Spark SQL 2.4.8 操作 Dataframe的两种方式

1. 简介

Spark SQL是Apache Spark的模块之一,它提供了一个关系型数据处理引擎,能够从不同数据源读取数据并处理数据,支持使用SQL语句和DataFrame API进行数据操作。

2. Dataframe

2.1 简介

DataFrame是Spark SQL中最核心的数据结构之一,它是分布式的、可定制的、不可变的数据集合,可以方便地进行结构化数据处理。

DataFrame可以被理解为类似于关系型数据库中的一张表,由Row类型的行组成。每个行都有自己的数据类型,数据类型由StructType类型的模式定义。

2.2 读取数据

Spark SQL可以从不同的数据源读取数据,包括本地文件系统、Hadoop的HDFS、Apache Cassandra、Amazon S3等。

读取本地文件系统中的CSV文件,可以使用以下代码:

val df = spark.read.format("csv")

.option("header", "true")

.option("inferSchema", "true")

.load("file.csv")

spark.read.format("csv")指定读取的数据源格式为CSV格式。然后通过option()方法设置读取的参数,比如设置是否读取表头(header),是否自动推断数据类型(inferSchema)等。最后通过load()方法加载数据。

2.3 DataFrame API操作

使用DataFrame API可以方便地进行复杂的数据操作,包括数据筛选、数据聚合、数据排序等。下面介绍两种常见的DataFrame API操作方法。

2.4 操作方法一:SQL语句

Spark SQL支持使用SQL语句操作DataFrame,这意味着熟悉SQL语言的开发者可以非常方便地进行复杂的数据操作。

通过以下代码示例,我们可以使用Spark SQL语句读取本地CSV文件,对数据进行筛选和聚合并输出结果:

df.createOrReplaceTempView("people")

val sqlDF = spark.sql("SELECT COUNT(*) as count FROM people WHERE age BETWEEN 10 AND 20")

sqlDF.show()

df.createOrReplaceTempView("people")将读取的CSV文件生成一个名为“people”的临时视图,可以在随后的SQL查询中使用。

spark.sql("SELECT COUNT(*) as count FROM people WHERE age BETWEEN 10 AND 20")指定需要查询的SQL语句,通过show()方法输出结果。

2.5 操作方法二:DataFrame API

Spark SQL提供了一套API对DataFrame进行操作,这样可以完全避免使用SQL语句进行操作。

以下代码示例展示了如何使用DataFrame API对本地CSV文件进行筛选和聚合操作:

import spark.implicits._

val df = spark.read.format("csv")

.option("header", "true")

.option("inferSchema", "true")

.load("file.csv")

val result = df.select("name", "age")

.where($"age" between (10,20))

.groupBy("name")

.count()

result.show()

import spark.implicits._引入隐式转换,使得使用DataFrame API时可以使用一些便捷的操作符。

df.select("name", "age")选择需要的列。

where($"age" between (10,20))筛选年龄在10到20岁之间的行。

groupBy("name")按名称对数据进行分组。

count()对每个组进行计数。

result.show()输出结果。

3. 总结

本文介绍了Spark SQL中DataFrame的概念和操作方法,分别通过SQL语句和DataFrame API对本地CSV文件进行了筛选和聚合操作。

使用Spark SQL中的DataFrame可以方便地对结构化数据进行处理,并且可以选择使用SQL语句或DataFrame API进行操作,具有非常高的灵活性和可定制性。

数据库标签