Spark SQL数据加载和保存实例讲解

Spark SQL数据加载和保存实例讲解

Spark SQL是Apache Spark的一个模块,它提供了一个简单而强大的接口来访问结构化数据。与传统的RDD编程不同,Spark SQL提供了一种名为DataFrame的分布式数据集,它不仅可以让开发者使用SQL查询和处理数据,还可以非常容易地与其他数据源集成,如Hive、Avro、Parquet等。

1. 加载本地数据

在Spark SQL中,通常使用DataFrames来处理结构化数据。使用spark.read方法和特定的数据源读取数据并生成DataFrame。以下是一个加载CSV格式数据的示例:

val df = spark.read.format("csv")

.option("header", "true")

.option("inferSchema", "true")

.load("path/to/file.csv")

这里使用了csv作为数据源的格式,还设置了选项headerinferSchema。header为true表示数据包含头部,第一行将被视为数据列名;inferSchema为true表示由程序自动推断数据类型。

2. 加载远程数据源

Spark SQL除了可以加载本地文件,还可以从远程数据源加载数据。以下是从MySQL数据库读取数据的示例:

val jdbcDF = spark.read.format("jdbc")

.option("url", "jdbc:mysql://localhost/test")

.option("dbtable", "data")

.option("user", "root")

.option("password", "password")

.load()

这里使用了jdbc作为数据源的格式,通过设置url参数指定数据库连接地址,dbtable参数指定数据表名,userpassword参数指定数据库登陆账号和密码。

3. 保存数据

在Spark SQL中,除了可以从各种数据源加载数据,还可以将数据保存到各种形式的数据存储系统中。以下是将数据保存到CSV文件中的示例:

df.write.mode("overwrite").format("csv")

.option("header", "true")

.save("path/to/output")

这里使用了csv作为格式,设置了选项header为true表示包含头部信息。此外使用了overwrite模式表示覆盖掉已经存在的文件。

除了CSV格式的文件外,还可以将数据保存到Hive表、关系型数据库等等。以下是将数据保存到MySQL数据库的示例:

jdbcDF.write.mode(SaveMode.Append)

.jdbc("jdbc:mysql://localhost/test", "data",

new Properties().setProperty("user", "root")

.setProperty("password", "password")

)

这里的jdbc还是作为数据源的格式,通过设置url参数指定数据库连接地址,dbtable参数指定数据要插入的表名。此外还需要设置数据库的登陆账号和密码等操作。

4. 运行示例

在加载和保存数据的实现过程中,本文使用的实例数据是titanic.csv,它记录了1912年泰坦尼克号沉船事件中乘客的生还信息。

下面我们来看一下如何使用Spark SQL加载和处理这份数据。首先,在Spark Shell中进入SQL模式:

bin/spark-shell --master local[*] --packages com.databricks:spark-csv_2.11:1.5.0

这里设置了Spark执行模式为local模式,使用spark-csv包读取CSV文件。

接下来,将数据加载进来:

val df = spark.read.format("csv")

.option("header", "true")

.option("inferSchema", "true")

.load("titanic.csv")

数据已经成功加载,我们可以使用DataFrame中的一些方法探索数据结构和内容。例如,查看数据类型:

df.printSchema()

这里的printSchema()方法将DataFrame的结构打印出来,包括每一列的名称和数据类型。其中,重要的信息是每列数据的类型,它对之后DataFrame的处理至关重要。

接下来,在Spark SQL中使用SQL语句查询数据。例如,查找年龄大于30岁的生还者的数量:

df.createOrReplaceTempView("titanic")

val age30Survived = spark.sql("SELECT COUNT(*) FROM titanic WHERE Age > 30 AND Survived = 1")

age30Survived.show()

这里使用了createOrReplaceTempView()方法将DataFrame转化为表,并使用Spark SQL执行了一条查询。最后,将结果打印出来,我们可以看到符合条件的结果有112条。

最后,我们将处理结果保存到CSV文件中:

age30Survived.write.mode("overwrite").format("csv")

.option("header", "true")

.save("age30Survived.csv")

此时处理结果已经保存到了age30Survived.csv文件中。

数据库标签