1. 简介
Spark SQL是Apache Spark的模块之一,它提供了一个关系型数据处理引擎,能够从不同数据源读取数据并处理数据,支持使用SQL语句和DataFrame API进行数据操作。
2. Dataframe
2.1 简介
DataFrame是Spark SQL中最核心的数据结构之一,它是分布式的、可定制的、不可变的数据集合,可以方便地进行结构化数据处理。
DataFrame可以被理解为类似于关系型数据库中的一张表,由Row类型的行组成。每个行都有自己的数据类型,数据类型由StructType类型的模式定义。
2.2 读取数据
Spark SQL可以从不同的数据源读取数据,包括本地文件系统、Hadoop的HDFS、Apache Cassandra、Amazon S3等。
读取本地文件系统中的CSV文件,可以使用以下代码:
val df = spark.read.format("csv")
.option("header", "true")
.option("inferSchema", "true")
.load("file.csv")
spark.read.format("csv")指定读取的数据源格式为CSV格式。然后通过option()方法设置读取的参数,比如设置是否读取表头(header),是否自动推断数据类型(inferSchema)等。最后通过load()方法加载数据。
2.3 DataFrame API操作
使用DataFrame API可以方便地进行复杂的数据操作,包括数据筛选、数据聚合、数据排序等。下面介绍两种常见的DataFrame API操作方法。
2.4 操作方法一:SQL语句
Spark SQL支持使用SQL语句操作DataFrame,这意味着熟悉SQL语言的开发者可以非常方便地进行复杂的数据操作。
通过以下代码示例,我们可以使用Spark SQL语句读取本地CSV文件,对数据进行筛选和聚合并输出结果:
df.createOrReplaceTempView("people")
val sqlDF = spark.sql("SELECT COUNT(*) as count FROM people WHERE age BETWEEN 10 AND 20")
sqlDF.show()
df.createOrReplaceTempView("people")将读取的CSV文件生成一个名为“people”的临时视图,可以在随后的SQL查询中使用。
spark.sql("SELECT COUNT(*) as count FROM people WHERE age BETWEEN 10 AND 20")指定需要查询的SQL语句,通过show()方法输出结果。
2.5 操作方法二:DataFrame API
Spark SQL提供了一套API对DataFrame进行操作,这样可以完全避免使用SQL语句进行操作。
以下代码示例展示了如何使用DataFrame API对本地CSV文件进行筛选和聚合操作:
import spark.implicits._
val df = spark.read.format("csv")
.option("header", "true")
.option("inferSchema", "true")
.load("file.csv")
val result = df.select("name", "age")
.where($"age" between (10,20))
.groupBy("name")
.count()
result.show()
import spark.implicits._引入隐式转换,使得使用DataFrame API时可以使用一些便捷的操作符。
df.select("name", "age")选择需要的列。
where($"age" between (10,20))筛选年龄在10到20岁之间的行。
groupBy("name")按名称对数据进行分组。
count()对每个组进行计数。
result.show()输出结果。
3. 总结
本文介绍了Spark SQL中DataFrame的概念和操作方法,分别通过SQL语句和DataFrame API对本地CSV文件进行了筛选和聚合操作。
使用Spark SQL中的DataFrame可以方便地对结构化数据进行处理,并且可以选择使用SQL语句或DataFrame API进行操作,具有非常高的灵活性和可定制性。