Spark SQL数据加载和保存实例讲解
Spark SQL是Apache Spark的一个模块,它提供了一个简单而强大的接口来访问结构化数据。与传统的RDD编程不同,Spark SQL提供了一种名为DataFrame的分布式数据集,它不仅可以让开发者使用SQL查询和处理数据,还可以非常容易地与其他数据源集成,如Hive、Avro、Parquet等。
1. 加载本地数据
在Spark SQL中,通常使用DataFrames来处理结构化数据。使用spark.read
方法和特定的数据源读取数据并生成DataFrame。以下是一个加载CSV格式数据的示例:
val df = spark.read.format("csv")
.option("header", "true")
.option("inferSchema", "true")
.load("path/to/file.csv")
这里使用了csv
作为数据源的格式,还设置了选项header
和inferSchema
。header为true表示数据包含头部,第一行将被视为数据列名;inferSchema为true表示由程序自动推断数据类型。
2. 加载远程数据源
Spark SQL除了可以加载本地文件,还可以从远程数据源加载数据。以下是从MySQL数据库读取数据的示例:
val jdbcDF = spark.read.format("jdbc")
.option("url", "jdbc:mysql://localhost/test")
.option("dbtable", "data")
.option("user", "root")
.option("password", "password")
.load()
这里使用了jdbc
作为数据源的格式,通过设置url
参数指定数据库连接地址,dbtable
参数指定数据表名,user
和password
参数指定数据库登陆账号和密码。
3. 保存数据
在Spark SQL中,除了可以从各种数据源加载数据,还可以将数据保存到各种形式的数据存储系统中。以下是将数据保存到CSV文件中的示例:
df.write.mode("overwrite").format("csv")
.option("header", "true")
.save("path/to/output")
这里使用了csv
作为格式,设置了选项header
为true表示包含头部信息。此外使用了overwrite
模式表示覆盖掉已经存在的文件。
除了CSV格式的文件外,还可以将数据保存到Hive表、关系型数据库等等。以下是将数据保存到MySQL数据库的示例:
jdbcDF.write.mode(SaveMode.Append)
.jdbc("jdbc:mysql://localhost/test", "data",
new Properties().setProperty("user", "root")
.setProperty("password", "password")
)
这里的jdbc
还是作为数据源的格式,通过设置url
参数指定数据库连接地址,dbtable
参数指定数据要插入的表名。此外还需要设置数据库的登陆账号和密码等操作。
4. 运行示例
在加载和保存数据的实现过程中,本文使用的实例数据是titanic.csv
,它记录了1912年泰坦尼克号沉船事件中乘客的生还信息。
下面我们来看一下如何使用Spark SQL加载和处理这份数据。首先,在Spark Shell中进入SQL模式:
bin/spark-shell --master local[*] --packages com.databricks:spark-csv_2.11:1.5.0
这里设置了Spark执行模式为local模式,使用spark-csv
包读取CSV文件。
接下来,将数据加载进来:
val df = spark.read.format("csv")
.option("header", "true")
.option("inferSchema", "true")
.load("titanic.csv")
数据已经成功加载,我们可以使用DataFrame中的一些方法探索数据结构和内容。例如,查看数据类型:
df.printSchema()
这里的printSchema()
方法将DataFrame的结构打印出来,包括每一列的名称和数据类型。其中,重要的信息是每列数据的类型,它对之后DataFrame的处理至关重要。
接下来,在Spark SQL中使用SQL语句查询数据。例如,查找年龄大于30岁的生还者的数量:
df.createOrReplaceTempView("titanic")
val age30Survived = spark.sql("SELECT COUNT(*) FROM titanic WHERE Age > 30 AND Survived = 1")
age30Survived.show()
这里使用了createOrReplaceTempView()
方法将DataFrame转化为表,并使用Spark SQL执行了一条查询。最后,将结果打印出来,我们可以看到符合条件的结果有112条。
最后,我们将处理结果保存到CSV文件中:
age30Survived.write.mode("overwrite").format("csv")
.option("header", "true")
.save("age30Survived.csv")
此时处理结果已经保存到了age30Survived.csv
文件中。