Bootstrap

Spark SQL和DataFrames:与外部数据源进行交互(五)

写在前面:

大家好,我是强哥,一个热爱分享的技术狂。目前已有 12 年大数据与AI相关项目经验, 10 年推荐系统研究及实践经验。平时喜欢读书、暴走和写作。

业余时间专注于输出大数据、AI等相关文章,目前已经输出了40万字的推荐系统系列精品文章,今年 6 月底会出版「构建企业级推荐系统:算法、工程实现与案例分析」一书。如果这些文章能够帮助你快速入门,实现职场升职加薪,我将不胜欢喜。

想要获得更多免费学习资料或内推信息,一定要看到文章最后喔。

内推信息

如果你正在看相关的招聘信息,请加我微信:liuq4360,我这里有很多内推资源等着你,欢迎投递简历。

免费学习资料

如果你想获得更多免费的学习资料,请关注同名公众号【数据与智能】,输入“资料”即可!

学习交流群

如果你想找到组织,和大家一起学习成长,交流经验,也可以加入我们的学习成长群。群里有老司机带你飞,另有小哥哥、小姐姐等你来勾搭!加小姐姐微信:epsila,她会带你入群。

在上一章中,我们介绍了与Spark中内置数据源的交互。我们还仔细研究了DataFrame API及其与Spark SQL的相互操作性。在本章中,我们将重点介绍Spark SQL与外部组件的接口。具体来说,我们讨论Spark SQL能够使你做什么:

1.使用用户自定义的函数对Apache Hive和Apache Spark进行操作。

2.与外部数据源连接,例如JDBC和SQL数据库,PostgreSQL,MySQL,Tableau,Azure Cosmos DB和MS SQL Server。

3.使用简单和复杂的类型,高阶函数以及常见的关系运算符。

我们还将介绍一些使用Spark SQL查询Spark的不同选项,例如Spark SQL shell,Beeline和Tableau。

Spark SQL和Apache Hive

Spark SQL是Apache Spark的基础组件,该组件将关系处理与Spark的功能编程API集成在一起。它的起源是在以前有关Shark的工作中。Shark最初基于Apache Spark之上的Hive代码库构建,并成为Hadoop系统上最早的交互式SQL查询引擎之一。这说明了兼顾两全其美是有可能的: 速度与企业数据仓库一样快,并且可以像Hive/MapReduce一样进行扩展。

Spark SQL使Spark程序员可以利用更快的性能和关系编程(例如,声明性查询和优化的存储)以及调用复杂的分析库(例如,机器学习)。如上一章所述,从Apache Spark 2.x开始,SparkSession提供了一个统一的入口点来操作Spark中的数据。

用户自定义函数

尽管Apache Spark具有大量内置功能,但Spark的灵活性允许数据工程师和数据科学家定义自己的功能。这些被称为用户自定义函数(UDF)。

SPARK SQL UDF

创建自己的PySpark或Scala UDF的好处是,你(和其他人)将能够使你在Spark SQL中进行使用。例如,数据科学家可以将ML模型包装在UDF中,以便数据分析人员可以在Spark SQL中查询其预测结果,而不必了解模型的内部结构。

这是创建Spark SQL UDF的简单示例。请注意,UDF在每个会话中运行,并且不会持久化在底层元存储中:

// In Scala

// Create cubed function

val cubed = (s: Long) => {

  s * s * s

}

 

// Register UDF

spark.udf.register("cubed", cubed)

 

// Create temporary view

spark.range(1, 9).createOrReplaceTempView("udf_test")

 

 In Python

from pyspark.sql.types import LongType

 

 Create cubed function

def cubed(s):

  return s * s * s

 

 Register UDF

spark.udf.register("cubed", cubed, LongType())

 

 Generate temporary view

spark.range(1, 9).createOrReplaceTempView("udf_test")

现在,你可以使用Spark SQL执行以下任意一个cubed()函数:

// In Scala/Python

// Query the cubed UDF

spark.sql("SELECT id, cubed(id) AS id_cubed FROM udf_test").show()

 

+---+--------+

| id|id_cubed|

+---+--------+

|  1|       1|

|  2|       8|

|  3|      27|

|  4|      64|

|  5|     125|

|  6|     216|

|  7|     343|

|  8|     512|

+---+--------+

SPARK SQL中的赋值顺序和空值校验

Spark SQL(包括SQL,DataFrame API和Dataset API)不保证子表达式的求值顺序。例如,以下查询不能保证“s is NOT NULL“子句在“strlen(s) > 1”子句之前执行:

spark.sql("SELECT s FROM test1 WHERE s IS NOT NULL AND strlen(s) > 1")

因此,为了执行正确的null校验,建议你执行以下操作:

1. 使UDF本身意识到可能会存在null值,并在UDF内部进行null检查。

2. 使用IF或CASE WHEN表达式进行null检查,并在条件分支中调用UDF。

使用PANDAS UDF加速和分发PYSPARK UDF

使用PySpark UDF之前存在的主要问题之一是,它们的性能比Scala UDF慢。这是因为PySpark UDF需要在JVM和Python之间进行数据移动,这个过程比较耗费性能。为了解决此问题,Pandas UDF(也称为向量化UDF)作为Apache Spark 2.3的一部分引入。Pandas UDF使用Apache Arrow传输数据,使用Pandas处理数据。你可以使用关键字pandas_udf作为修饰符来定义Pandas UDF ,或者包装函数本身。一旦数据采用Apache Arrow格式,不再需要对数据进行序列化处理,因为它已经是Python进程可使用的格式。你不是逐行操作单个输入源,而是在Pandas Series或DataFrame上进行操作(即向量化执行)。

从具有Python 3.6及更高版本的Apache Spark 3.0起,Pandas UDF分为两个API类别:Pandas UDF和Pandas Function API。

Pandas UDF

用Apache Spark 3.0,Pandas UDF从Pandas UDF中的Python类型提示推断Pandas  UDF类型,如 pandas.Series,pandas.DataFrame,Tuple,和Iterator。以前,你需要手动定义和指定每种Pandas UDF类型。现在,Pandas UDF中支持的Python类型提示案例是Series到Series,Series迭代器到Series迭代器、多重Series迭代器到Series迭代器和Series到标量(单个值)。

Pandas Function API

Pandas函数API允许你将本地Python函数直接应用于PySpark DataFrame,其中输入和输出均为Pandas实例。对于Spark 3.0,受支持的Pandas Function API为grouped map, map, cogrouped map。

 

欲了解更多信息,请参阅第12章中的“利用Python类型提示重新设计Pandas UDF”一节。

以下是用于Spark 3.0的标量Pandas UDF的示例:

 In Python

 Import pandas

import pandas as pd

 

 Import various pyspark SQL functions including pandas_udf

from pyspark.sql.functions import col, pandas_udf

from pyspark.sql.types import LongType

 

 Declare the cubed function

def cubed(a: pd.Series) -> pd.Series:

    return a * a * a

 

 Create the pandas UDF for the cubed function

cubed_udf = pandas_udf(cubed, returnType=LongType())

前面的代码片段声明了一个称为cubed()执行立方操作的函数。这是常规的Pandas函数,带有额外的cubed_udf = pandas_udf()调用来创建我们的Pandas UDF。

让我们从一个简单的Pandas系列(为定义x)开始,然后将本地函数cubed()应用于立方计算:

Create a Pandas Series

x = pd.Series([1, 2, 3])

 

 The function for a pandas_udf executed with local Pandas data

print(cubed(x))

输出如下:

0 1

1 8

2 27

dtype:int64

现在,让我们切换到Spark DataFrame。我们可以将其作为Spark向量化UDF执行,如下所示:

 Create a Spark DataFrame, 'spark' is an existing SparkSession

df = spark.range(1, 4)

 

 Execute function as a Spark vectorized UDF

df.select("id", cubed_udf(col("id"))).show()

这是输出:

+ --- + --------- +

| id | cubed(id)|

+ --- + --------- +

| 1 | 1 |

| 2 | 8 |

| 3 | 27 |

+ --- + --------- +

与局部函数相反,使用向量化的UDF将导致执行Spark作业。以前的本地函数是仅在Spark驱动程序上执行的Pandas函数。在此pandas_udf功能的一个阶段(图5-1)中查看Spark UI时,这一点变得更加明显。

要更深入地了解Pandas UDF,请参阅pandas用户自定义函数文档。

像许多Spark作业一样,该作业通过parallelize()方法将本地数据(二进制批处理)发送给Executor,并调用mapPartitions()将二进制数据转换为Spark的内部数据格式,该格式可以分发给Spark的工作节点。有许多WholeStageCodegen步骤代表了性能上的根本提升(这要归功于Tungsten项目的整个阶段代码生成,显着提高了CPU效率和性能)。但这是由ArrowEvalPython的步骤确定(在本例中)正在执行Pandas UDF的步骤。

使用Spark SQL Shell,Beeline和Tableau查询

存在各种机制可以用于查询Apache Spark,包括Spark SQL Shell,Beeline CLI实用程序以及诸如Tableau和Power BI之类的报表工具。在本节中,我们包含有关Tableau的说明;对于Power BI,请参阅文档。

使用Spark SQL Shell

spark-sql CLI是执行Spark SQL查询的便捷工具。虽然此实用程序在本地模式下与Hive Metastore服务进行通信,但它不会与Thrift JDBC/ODBC 服务(也称为Spark Thrift ServerSTS)通信。STS允许JDBC/ODBC客户端在Apache Spark上通过JDBC和ODBC协议执行SQL查询。

要启动Spark SQL CLI,进入$SPARK_HOME文件夹中执行以下命令:

./bin/spark-sql

启动Shell后,可以进行交互方式执行Spark SQL查询。让我们看几个例子。

创建表

要创建新的Spark SQL表,请执行以下语句:

spark-sql> CREATE TABLE people (name STRING, age int);

你的输出应与此类似,并注意创建Spark SQL表people及其文件位置(/user/hive/warehouse/people):

20/01/11 22:42:16 WARN HiveMetaStore: Location: file:/user/hive/warehouse/people

specified for non-external table:people

Time taken: 0.63 seconds

将数据插入表中

你可以通过执行以下的sql语句将数据插入Spark SQL表:

INSERT INTO people SELECT name, age FROM ...

由于你不依赖于从预先存在的表或文件中加载数据,因此可以使用INSERT...VALUES语句将数据插入表中。这三个sql语句将三个人信息(包括姓名和年龄,如果知道)插入到people表中:

spark-sql> INSERT INTO people VALUES ("Michael", NULL);

Time taken: 1.696 seconds

spark-sql> INSERT INTO people VALUES ("Andy", 30);

Time taken: 0.744 seconds

spark-sql> INSERT INTO people VALUES ("Samantha", 19);

Time taken: 0.637 seconds

spark-sql>

执行SPARK SQL查询

现在,表中已经有数据了,你可以对它执行Spark SQL查询。让我们从查看元存储中存在的表开始:

spark-sql> SHOW TABLES;

default people false

Time taken: 0.016 seconds, Fetched 1 row(s)

接下来,让我们找出表中20岁以下的年轻人:

spark-sql> SELECT * FROM people WHERE age < 20;

Samantha 19

Time taken: 0.593 seconds, Fetched 1 row(s)

同样,让我们​​看看年龄为空的人是谁,一般有这种情况说明可能是脏数据:

spark-sql> SELECT name FROM people WHERE age IS NULL;

Michael

Time taken: 0.272 seconds, Fetched 1 row(s)

使用Beeline

如果你使用过Apache Hive,则你可能熟悉命令行工具Beeline,这是一个用于对HiveServer2运行HiveQL查询的通用实用程序。Beeline是基于SQLLine CLI的JDBC客户端。你可以使用同一实用程序对Spark Thrift服务执行Spark SQL查询。请注意,当前实现的Thrift JDBC/ODBC服务对应于Hive 1.2.1中的HiveServer2。你可以使用Spark或Hive 1.2.1自身带有的Beeline脚本来测试JDBC服务。

启动THRIFT服务

要启动Spark Thrift JDBC/ODBC服务,进入$SPARK_HOME文件夹中执行以下命令:

./sbin/start-thriftserver.sh

 

如果尚未启动Spark driver和worker,请在执行start-thriftserver.sh命令之前先执行以下命令:

./sbin/start-all.sh

通过BEELINE连接到THRIFT服务

要使用Beeline测试Thrift JDBC/ODBC服务,请执行以下命令:

./bin/beeline

然后将Beeline配置为连接到本地Thrift服务:

!connect jdbc:hive2://localhost:10000 

 

默认情况下,beeline处于非安全模式。因此,用户名是你的登录名(例如user@learningspark.org),密码是空的。

使用BEELINE执行SPARK SQL查询

从这里,你可以运行Spark SQL查询,类似于使用Beeline进行Hive查询的方式。以下是一些查询示例及其输出:

0: jdbc:hive2://localhost:10000> SHOW tables;

+-----------+------------+--------------+

| database | tableName | isTemporary |

+-----------+------------+--------------+

| default | people | false |

+-----------+------------+--------------+

1 row selected (0.417 seconds)

0: jdbc:hive2://localhost:10000> SELECT * FROM people;

+-----------+-------+

| name | age |

+-----------+-------+

| Samantha | 19 |

| Andy | 30 |

| Michael | NULL |

+-----------+-------+

3 rows selected (1.512 seconds)

0: jdbc:hive2://localhost:10000>

停止THRIFT服务

完成spark sql操作后,可以使用以下命令停止Thrift服务:

./sbin/stop-thriftserver.sh

使用Tableau

与通过Beeline或Spark SQL CLI运行查询类似,你可以通过Thrift JDBC/ODBC服务将自己喜欢的BI工具连接到Spark SQL。在本节中,我们将向你展示如何将Tableau Desktop(2019.2版)连接到本地Apache Spark实例。

你需要已经安装Tableau的Spark ODBC驱动程序版本1.2.0或更高版本。如果你已经安装(或升级到)Tableau 2018.1或更高版本,则该驱动程序应该已经预先安装。

启动THRIFT服务

要启动Spark Thrift JDBC/ODBC服务,进入到$SPARK_HOME文件夹中执行以下命令:

./sbin/start-thriftserver.sh

 

如果尚未启动Spark驱动程序和worker服务,请在执行以下start-thriftserver.sh命令之前优先执行下面的命令启动相关服务:

 

./sbin/start-all.sh

启动TABLEAU

如果是第一次启动Tableau,将看到一个“连接”对话框,该对话框允许你连接到大量数据源。默认情况下,Spark SQL选项不会包含在左侧的“To a Server”菜单中(请参见图5-2)。

要访问Spark SQL选项,请单击该列表底部的“更多…”,然后从出现在主面板中的列表中选择Spark SQL,如图5-3所示。

这将弹出Spark SQL对话框(图5-4)。连接到本地Apache Spark实例时,可以使用以下参数使用非安全用户名身份验证模式:

 服务器:localhost

 端口:10000(默认)

 类型:SparkThriftServer(默认)

 身份验证:用户名

 用户名:你的登录名,例如,user@learningspark.org

 需要SSL:不选中

图5-4。Spark SQL对话框

一旦成功连接到Spark SQL数据源后,你将看到类似于图5-5的数据源连接视图。

从左侧的“Select schema”下拉菜单中,选择“default”。然后输入要查询的表的名称(请参见图5-6)。请注意,你可以单击放大镜图标以获取可用表的完整列表。

有关使用Tableau连接到Spark SQL数据库的更多信息,请参考Tableau的Spark SQL文档和Databricks Tableau文档。

输入people作为表名,然后将表从左侧拖放到主对话框中(在标记为“Drag tables here”的空间中)。你应该看到如图5-7所示的内容。

单击“立即更新(Update Now)”,然后Tableau将查询Spark SQL数据源(图5-8)。

现在,你可以对Spark数据源、关联表等执行查询,就像对任何其他Tableau数据源一样。

停止THRIFT服务

完成后,你可以使用以下命令停止Thrift服务:

./sbin/stop-thriftserver.sh

外部数据源

在本节中,我们将从JDBC和SQL数据库开始,着重介绍如何使用Spark SQL连接到外部数据源。

JDBC和SQL数据库

Spark SQL包含一个数据源API,可以使用JDBC从其他数据库读取数据。Spark SQL将结果返回为DataFrame,从而简化了查询这些数据源,进而提供了Spark SQL的所有优点(包括性能和与其他数据源关联的能力)。

首先,你需要为JDBC数据源指定JDBC驱动程序,并且该驱动程序必须位于Spark类路径上。从$SPARK_HOME文件夹中,你可以发出如下命令:

./bin/spark-shell --driver-class-path $database.jar --jars $database.jar 

使用数据源API,可以将远程数据库中的表作为DataFrame或Spark SQL临时视图加载。用户可以在数据源选项中指定JDBC连接属性。表5-1包含Spark支持的一些更常见的连接属性(不区分大小写)。

表5-1:通用连接属性

有关连接属性的完整列表,请参见Spark SQL文档。

分区的重要性

在Spark SQL和JDBC外部源之间传输大量数据时,对数据源进行分区很重要。你的所有数据都通过一个驱动程序进行连接,这可能导致饱和并显著降低提取性能,并有可能使源系统的资源饱和。虽然这些JDBC属性是可选的,但对于任何大规模操作,强烈建议使用表5-2中显示的属性。

表5-2:分区连接属性

让我们看一个示例,以帮助你了解这些属性的工作方式。假设我们使用以下设置:

l numPartitions: 10

l lowerBound: 1000

l upperBound: 10000

然后步长等于1000,将创建10个分区。这等效于执行以下10个查询(每个分区一个):

SELECT * FROM table WHERE partitionColumn BETWEEN 1000 and 2000

SELECT * FROM table WHERE partitionColumn BETWEEN 2000 and 3000

...

SELECT * FROM table WHERE partitionColumn BETWEEN 9000 and 10000

虽然没有所有内容,但在使用这些属性时,请牢记以下提示:

一个好的起点是numPartitions最好是Spark worker的倍数。例如,如果你有四个Spark worker节点,则可能从4个或8个分区开始。但是一定需要注意的是,源系统可以很好地处理读取请求也很重要。对于具有处理窗口的系统,你可以最大程度地增加对源系统的并发请求数。对于缺少处理窗口的系统(例如,OLTP系统连续处理数据),应减少并发请求的数量,以防止源系统负载过高。

最初,根据最小和最大实际值计算lowerBound。例如,如果你选择{numPartitions:10, lowerBound: 1000, upperBound: 10000},但所有值都在2000和4000之间,那么10个查询中只有2个(每个分区一个)将完成所有工作。在这种情况下,更好的配置将是{numPartitions:10, lowerBound: 2000, upperBound: 4000}

 选择可以均匀分布的partitionColumn,以避免数据倾斜。例如,如果你的大多数partitionColumn的值都是2500,则{numPartitions:10, lowerBound: 1000, upperBound: 10000}大多数工作将由请求在2000和3000之间的任务来执行。相反,请选择其他partitionColumn分区,或者在可能的情况下生成一个新分区(可能是多个列的哈希),以更均匀地分配你的分区。

PostgreSQL

要连接到PostgreSQL数据库,请从Maven构建或下载JDBC jar并将其添加到你的类路径中。然后启动一个Spark shell(spark-shell或pyspark),并指定该jar:

bin / spark-shell --jars postgresql-42.2.6.jar

以下示例显示了如何使用Scala中的Spark SQL数据源API和JDBC从PostgreSQL数据库加载并保存到PostgreSQL数据库:

// In Scala

// Read Option 1: Loading data from a JDBC source using load method

val jdbcDF1 = spark

  .read

  .format("jdbc")

  .option("url", "jdbc:postgresql:[DBSERVER]")

  .option("dbtable", "[SCHEMA].[TABLENAME]")

  .option("user", "[USERNAME]")

  .option("password", "[PASSWORD]")

  .load()

 

// Read Option 2: Loading data from a JDBC source using jdbc method

// Create connection properties

import java.util.Properties

val cxnProp = new Properties()

cxnProp.put("user", "[USERNAME]")

cxnProp.put("password", "[PASSWORD]")

 

// Load data using the connection properties

val jdbcDF2 = spark

  .read

  .jdbc("jdbc:postgresql:[DBSERVER]", "[SCHEMA].[TABLENAME]", cxnProp)

 

// Write Option 1: Saving data to a JDBC source using save method

jdbcDF1

  .write

  .format("jdbc")

  .option("url", "jdbc:postgresql:[DBSERVER]")

  .option("dbtable", "[SCHEMA].[TABLENAME]")

  .option("user", "[USERNAME]")

  .option("password", "[PASSWORD]")

  .save()

 

// Write Option 2: Saving data to a JDBC source using jdbc method

jdbcDF2.write

  .jdbc(s"jdbc:postgresql:[DBSERVER]", "[SCHEMA].[TABLENAME]", cxnProp)

这是在PySpark中的操作方法:

 In Python

 Read Option 1: Loading data from a JDBC source using load method

jdbcDF1 = (spark

  .read

  .format("jdbc")

  .option("url", "jdbc:postgresql://[DBSERVER]")

  .option("dbtable", "[SCHEMA].[TABLENAME]")

  .option("user", "[USERNAME]")

  .option("password", "[PASSWORD]")

  .load())

 

 Read Option 2: Loading data from a JDBC source using jdbc method

jdbcDF2 = (spark

  .read

  .jdbc("jdbc:postgresql://[DBSERVER]", "[SCHEMA].[TABLENAME]",

          properties={"user": "[USERNAME]", "password": "[PASSWORD]"}))

 

 Write Option 1: Saving data to a JDBC source using save method

(jdbcDF1

  .write

  .format("jdbc")

  .option("url", "jdbc:postgresql://[DBSERVER]")

  .option("dbtable", "[SCHEMA].[TABLENAME]")

  .option("user", "[USERNAME]")

  .option("password", "[PASSWORD]")

  .save())

 

 Write Option 2: Saving data to a JDBC source using jdbc method

(jdbcDF2

  .write

  .jdbc("jdbc:postgresql:[DBSERVER]", "[SCHEMA].[TABLENAME]",

          properties={"user": "[USERNAME]", "password": "[PASSWORD]"}))

MySQL

要连接到MySQL数据库,请从Maven或MySQL中构建或下载JDBC jar (后者更简单!)并将其添加到你的类路径中。然后启动一个Spark shell(spark-shell或pyspark),并指定该jar:

bin / spark-shell --jars mysql-connector-java_8.0.16-bin.jar

以下示例显示了如何使用Scala中的Spark SQL数据源API和JDBC从MySQL数据库加载数据并将其保存到MySQL数据库:

// In Scala

// Loading data from a JDBC source using load

val jdbcDF = spark

  .read

  .format("jdbc")

  .option("url", "jdbc:mysql://[DBSERVER]:3306/[DATABASE]")

  .option("driver", "com.mysql.jdbc.Driver")

  .option("dbtable", "[TABLENAME]")

  .option("user", "[USERNAME]")

  .option("password", "[PASSWORD]")

  .load()

 

// Saving data to a JDBC source using save

jdbcDF

  .write

  .format("jdbc")

  .option("url", "jdbc:mysql://[DBSERVER]:3306/[DATABASE]")

  .option("driver", "com.mysql.jdbc.Driver")

  .option("dbtable", "[TABLENAME]")

  .option("user", "[USERNAME]")

  .option("password", "[PASSWORD]")

  .save()

以下是在Python中执行此操作的方法:

 In Python

 Loading data from a JDBC source using load

jdbcDF = (spark

  .read

  .format("jdbc")

  .option("url", "jdbc:mysql://[DBSERVER]:3306/[DATABASE]")

  .option("driver", "com.mysql.jdbc.Driver")

  .option("dbtable", "[TABLENAME]")

  .option("user", "[USERNAME]")

  .option("password", "[PASSWORD]")

  .load())

 

 Saving data to a JDBC source using save

(jdbcDF

  .write

  .format("jdbc")

  .option("url", "jdbc:mysql://[DBSERVER]:3306/[DATABASE]")

  .option("driver", "com.mysql.jdbc.Driver")

  .option("dbtable", "[TABLENAME]")

  .option("user", "[USERNAME]")

  .option("password", "[PASSWORD]")

  .save())

Azure Cosmos数据库

若要连接到Azure Cosmos DB数据库,请从Maven或GitHub构建或下载JDBC jar ,并将其添加到你的类路径中。然后启动一个Scala或PySpark shell,并指定此jar(请注意,此示例使用的是Spark 2.4):

bin / spark-shell --jars azure-cosmosdb-spark_2.4.0_2.11-1.3.5-uber.jar

你还可以选择使用其Maven选项--packages从Spark Packages中获得连接器:

export PKG="com.microsoft.azure:azure-cosmosdb-spark_2.4.0_2.11:1.3.5"

bin/spark-shell --packages $PKG 

以下示例显示如何使用Scala和PySpark中的Spark SQL数据源API和JDBC从Azure Cosmos DB数据库加载数据并将其保存到Azure Cosmos DB数据库。请注意,通常使用query_custom配置来利用Cosmos DB中的各种索引:

// In Scala

// Import necessary libraries

import com.microsoft.azure.cosmosdb.spark.schema._

import com.microsoft.azure.cosmosdb.spark._

import com.microsoft.azure.cosmosdb.spark.config.Config

 

// Loading data from Azure Cosmos DB

// Configure connection to your collection

val query = "SELECT c.colA, c.coln FROM c WHERE c.origin = 'SEA'"

val readConfig = Config(Map(

  "Endpoint" -> "https://[ACCOUNT].documents.azure.com:443/",

  "Masterkey" -> "[MASTER KEY]",

  "Database" -> "[DATABASE]",

  "PreferredRegions" -> "Central US;East US2;",

  "Collection" -> "[COLLECTION]",

  "SamplingRatio" -> "1.0",

  "query_custom" -> query

))

 

// Connect via azure-cosmosdb-spark to create Spark DataFrame

val df = spark.read.cosmosDB(readConfig)

df.count

 

// Saving data to Azure Cosmos DB

// Configure connection to the sink collection

val writeConfig = Config(Map(

  "Endpoint" -> "https://[ACCOUNT].documents.azure.com:443/",

  "Masterkey" -> "[MASTER KEY]",

  "Database" -> "[DATABASE]",

  "PreferredRegions" -> "Central US;East US2;",

  "Collection" -> "[COLLECTION]",

  "WritingBatchSize" -> "100"

))

 

// Upsert the DataFrame to Azure Cosmos DB

import org.apache.spark.sql.SaveMode

df.write.mode(SaveMode.Overwrite).cosmosDB(writeConfig)

 

 

 In Python

 Loading data from Azure Cosmos DB

 Read configuration

query = "SELECT c.colA, c.coln FROM c WHERE c.origin = 'SEA'"

readConfig = {

  "Endpoint" : "https://[ACCOUNT].documents.azure.com:443/",

  "Masterkey" : "[MASTER KEY]",

  "Database" : "[DATABASE]",

  "preferredRegions" : "Central US;East US2",

  "Collection" : "[COLLECTION]",

  "SamplingRatio" : "1.0",

  "schema_samplesize" : "1000",

  "query_pagesize" : "2147483647",

  "query_custom" : query

}

 

 Connect via azure-cosmosdb-spark to create Spark DataFrame

df = (spark

  .read

  .format("com.microsoft.azure.cosmosdb.spark")

  .options(**readConfig)

  .load())

 

 Count the number of flights

df.count()

 

 Saving data to Azure Cosmos DB

 Write configuration

writeConfig = {

 "Endpoint" : "https://[ACCOUNT].documents.azure.com:443/",

 "Masterkey" : "[MASTER KEY]",

 "Database" : "[DATABASE]",

 "Collection" : "[COLLECTION]",

 "Upsert" : "true"

}

 

 Upsert the DataFrame to Azure Cosmos DB

(df.write

  .format("com.microsoft.azure.cosmosdb.spark")

  .options(**writeConfig)

  .save())

有关更多信息,请参考Azure Cosmos DB文档。

MS SQL服务

要连接到MS SQL Server数据库,请下载JDBC jar并将其添加到你的类路径中。然后启动Scala或PySpark shell,并指定以下jar:

bin / spark-shell --jars mssql-jdbc-7.2.2.jre8.jar

以下示例显示了如何使用Scala和PySpark中的Spark SQL数据源API和JDBC从MS SQL Server数据库加载数据并将其保存到MS SQL Server数据库:

// In Scala

// Loading data from a JDBC source

// Configure jdbcUrl

val jdbcUrl = "jdbc:sqlserver://[DBSERVER]:1433;database=[DATABASE]"

 

// Create a Properties() object to hold the parameters.

// Note, you can create the JDBC URL without passing in the

// user/password parameters directly.

val cxnProp = new Properties()

cxnProp.put("user", "[USERNAME]")

cxnProp.put("password", "[PASSWORD]")

cxnProp.put("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver")

 

// Load data using the connection properties

val jdbcDF = spark.read.jdbc(jdbcUrl, "[TABLENAME]", cxnProp)

 

// Saving data to a JDBC source

jdbcDF.write.jdbc(jdbcUrl, "[TABLENAME]", cxnProp)

 

 In Python

 Configure jdbcUrl

jdbcUrl = "jdbc:sqlserver://[DBSERVER]:1433;database=[DATABASE]"

 

 Loading data from a JDBC source

jdbcDF = (spark

  .read

  .format("jdbc")

  .option("url", jdbcUrl)

  .option("dbtable", "[TABLENAME]")

  .option("user", "[USERNAME]")

  .option("password", "[PASSWORD]")

  .load())

 

 Saving data to a JDBC source

(jdbcDF

  .write

  .format("jdbc")

  .option("url", jdbcUrl)

  .option("dbtable", "[TABLENAME]")

  .option("user", "[USERNAME]")

  .option("password", "[PASSWORD]")

  .save())

其他外部来源

Apache Spark可以连接许多外部数据源。其他流行的数据源包括:

Apache Cassandra

Snowflake

MongoDB

DataFrames和Spark SQL中的高阶函数

由于复杂数据类型是简单数据类型的组合,因此很容易直接对其进行操作。有两种用于处理复杂数据类型的典型解决方案:

将嵌套结构分解为单独的元素,并进行相应的处理,然后重新创建嵌套结构

构建用户自定义函数

这些方法的好处是可以让你以扁平化的格式考虑问题。它们通常涉及(但不限于)使用实用程序函数,例如get_json_object(),from_json(),to_json(),explode(),和selectExpr()。

让我们仔细看看这两种选项。

选项1:Explode和Collect

在此嵌套的SQL语句中,我们首先explode(values),它为每个元素(value)创建新的一行(带有id)的值:

-- In SQL

SELECT id, collect_list(value + 1) AS values

FROM  (SELECT id, EXPLODE(values) AS value

        FROM table) x

GROUP BY id

虽然collect_list()返回具有重复项的对象列表,但该GROUP BY语句需要进行洗牌操作,这意味着重新收集的数组的顺序不一定与原始数组的顺序相同。由于values可以是任意数量的维度(非常宽和/或非常长的数组),而我们正在做GROUP BY,这种方法的使用成本可能会非常昂贵。

选项2:用户自定义函数

为了执行相同的任务(添加1到中的每个元素values),我们还可以创建map()用于遍历每个元素(value)并执行添加操作的UDF :

// In Scala

def addOne(values: Seq[Int]): Seq[Int] = {

    values.map(value => value + 1)

}

val plusOneInt = spark.udf.register("plusOneInt", addOne(_: Seq[Int]): Seq[Int])

然后,我们可以在Spark SQL中使用此UDF,如下所示:

spark.sql("SELECT id, plusOneInt(values) AS values FROM table").show() 

尽管这比使用explode()并且collect_list()更好,因为不会有任何排序问题,但是序列化和反序列化过程本身可能会很昂贵。但是,还必须注意,这collect_list()可能会导致Executor遇到大数据集时出现内存不足问题,而使用UDF可以减轻这些问题。

复杂数据类型的内置函数

你可以使用Apache Spark 2.4及更高版本中包含的内置函数来处理复杂数据类型,而不必使用前面介绍的潜在的昂贵技术。表5-3(数组类型)和表5-4(映射类型)中列出了一些较常见的类型。有关完整列表,请参阅Databricks文档中的笔记。

表5-3数组类型的函数

表5-4 映射类型的函数

高阶函数

除了前面提到的内置函数外,还有一些将匿名lambda函数作为参数的高阶函数。下面是一个高阶函数的示例:

-- In SQL

transform(values, value -> lambda expression)

该transform()函数以数组(values)和匿名函数(lambda expression)作为输入。通过将匿名函数应用于每个元素,然后将结果分配给输出数组,该函数可以明确地创建一个新数组(类似于UDF方法,但效率更高)。

让我们创建一个样本数据集,以便我们可以运行一些示例:

 In Python

from pyspark.sql.types import *

schema = StructType([StructField("celsius", ArrayType(IntegerType()))])

 

t_list = [[35, 36, 32, 30, 40, 42, 38]], [[31, 32, 34, 55, 56]]

t_c = spark.createDataFrame(t_list, schema)

t_c.createOrReplaceTempView("tC")

 

# Show the DataFrame

t_c.show()

 

// In Scala

// Create DataFrame with two rows of two arrays (tempc1, tempc2)

val t1 = Array(35, 36, 32, 30, 40, 42, 38)

val t2 = Array(31, 32, 34, 55, 56)

val tC = Seq(t1, t2).toDF("celsius")

tC.createOrReplaceTempView("tC")

 

// Show the DataFrame

tC.show()

这里是输出:

+ -------------------- +

|               celsius|

+ -------------------- +

| [35、36、32、30,...  |

| [31,32,34,55,56]  |

+ -------------------- +

使用前面的DataFrame,你可以运行以下更高阶的函数查询。

transform()

transform(array ,function ):array

该transform()函数通过将一个函数应用于输入数组的每个元素来生成一个数组(类似于一个map()函数):

// In Scala/Python

// Calculate Fahrenheit from Celsius for an array of temperatures

spark.sql("""

SELECT celsius,

 transform(celsius, t -> ((t * 9) div 5) + 32) as fahrenheit

  FROM tC

""").show()

 

+--------------------+--------------------+

|             celsius|          fahrenheit|

+--------------------+--------------------+

|[35,36,32,30,... |[95,96,89,86,... |

|[31,32,34,55,56] |[87,89,93,131,...|

+--------------------+--------------------+

filter()

filter(array ,function ):array

该filter()函数产生一个数组,该数组仅由输入数组的布尔函数值为true的元素组成:

// In Scala/Python

// Filter temperatures > 38C for array of temperatures

spark.sql("""

SELECT celsius,

 filter(celsius, t -> t > 38) as high

  FROM tC

""").show()

 

+--------------------+--------+

|             celsius|    high|

+--------------------+--------+

|[35,36,32,30,...|[40,42]|

|[31,32,34,55,56]|[55,56]|

+--------------------+--------+

exists()

exists(array, function): Boolean

如果布尔函数对于输入数组中的某一个元素成立,则该exists()函数返回true:

// In Scala/Python

// Is there a temperature of 38C in the array of temperatures

spark.sql("""

SELECT celsius,

       exists(celsius, t -> t = 38) as threshold

  FROM tC

""").show()

 

+--------------------+---------+

|             celsius|threshold|

+--------------------+---------+

|[35,36,32,30,...|     true|

|[31,32,34,55,56]|    false|

+--------------------+---------+

reduce()

reduce(array ,B,function ,function

该reduce()函数通过函数function将元素合并到缓冲区B中,并在最终缓冲区上应用函数function,从而将数组的元素减少为单个值:

// In Scala/Python

// Calculate average temperature and convert to F

spark.sql("""

SELECT celsius,

       reduce(

          celsius,

          0,

          (t, acc) -> t + acc,

          acc -> (acc div size(celsius) * 9 div 5) + 32

        ) as avgFahrenheit

  FROM tC

""").show()

 

+--------------------+-------------+

|             celsius|avgFahrenheit|

+--------------------+-------------+

|[35,36,32,30,...|           96|

|[31,32,34,55,56]|          105|

+--------------------+-------------+

通用DataFrame和Spark SQL操作

Spark SQL的部分功能来自其支持的各种DataFrame操作(也称为无类型的Dataset操作)。操作列表非常广泛,包括:

汇总功能

收集功能

日期时间功能

数学函数

混合功能

非集合函数

排序功能

字符串函数

UDF功能

窗口函数

有关完整列表,请参见Spark SQL文档。

在本章中,我们将重点介绍以下常见的关系操作:

union和join

窗口

修改

为了执行这些DataFrame操作,我们首先准备一些数据。在以下代码段中,我们:

1. 导入两个文件并创建两个DataFrame,一个用于机场(airportsna)信息,一个用于美国航班延误(departureDelays)。

2. 使用expr(),将delay和distance列从STRING转换为INT。

3. 创建一个较小的表foo,我们可以将其作为演示示例的重点;它仅包含有关在短时间内从西雅图(SEA)到旧金山(SFO)的三班航班的信息。

让我们开始吧:

// In Scala

import org.apache.spark.sql.functions._

 

// Set file paths

val delaysPath =

  "/databricks-datasets/learning-spark-v2/flights/departuredelays.csv"

val airportsPath =

  "/databricks-datasets/learning-spark-v2/flights/airport-codes-na.txt"

 

// Obtain airports data set

val airports = spark.read

  .option("header", "true")

  .option("inferschema", "true")

  .option("delimiter", "\t")

  .csv(airportsPath)

airports.createOrReplaceTempView("airports_na")

 

// Obtain departure Delays data set

val delays = spark.read

  .option("header","true")

  .csv(delaysPath)

  .withColumn("delay", expr("CAST(delay as INT) as delay"))

  .withColumn("distance", expr("CAST(distance as INT) as distance"))

delays.createOrReplaceTempView("departureDelays")

 

// Create temporary small table

val foo = delays.filter(

  expr("""origin == 'SEA' AND destination == 'SFO' AND

      date like '01010%' AND delay > 0"""))

foo.createOrReplaceTempView("foo")

 

 In Python

 Set file paths

from pyspark.sql.functions import expr

tripdelaysFilePath =

  "/databricks-datasets/learning-spark-v2/flights/departuredelays.csv"

airportsnaFilePath =

  "/databricks-datasets/learning-spark-v2/flights/airport-codes-na.txt"

  

 Obtain airports data set

airportsna = (spark.read

  .format("csv")

  .options(header="true", inferSchema="true", sep="\t")

  .load(airportsnaFilePath))

 

airportsna.createOrReplaceTempView("airports_na")

 

 Obtain departure delays data set

departureDelays = (spark.read

  .format("csv")

  .options(header="true")

  .load(tripdelaysFilePath))

 

departureDelays = (departureDelays

  .withColumn("delay", expr("CAST(delay as INT) as delay"))

  .withColumn("distance", expr("CAST(distance as INT) as distance")))

 

departureDelays.createOrReplaceTempView("departureDelays")

 

 Create temporary small table

foo = (departureDelays

  .filter(expr("""origin == 'SEA' and destination == 'SFO' and

    date like '01010%' and delay > 0""")))

foo.createOrReplaceTempView("foo")

所述departureDelays DataFrame包含> 1.3M航班数据,而foo DataFrame包含从SEA到SFO的航班在特定时间范围内只有三条信息,见下面的输出:

// Scala/Python

spark.sql("SELECT * FROM airports_na LIMIT 10").show()

 

+-----------+-----+-------+----+

|       City|State|Country|IATA|

+-----------+-----+-------+----+

| Abbotsford|   BC| Canada| YXX|

|   Aberdeen|   SD|    USA| ABR|

|    Abilene|   TX|    USA| ABI|

|      Akron|   OH|    USA| CAK|

|    Alamosa|   CO|    USA| ALS|

|     Albany|   GA|    USA| ABY|

|     Albany|   NY|    USA| ALB|

|Albuquerque|   NM|    USA| ABQ|

| Alexandria|   LA|    USA| AEX|

|  Allentown|   PA|    USA| ABE|

+-----------+-----+-------+----+

 

spark.sql("SELECT * FROM departureDelays LIMIT 10").show()

 

+--------+-----+--------+------+-----------+

|    date|delay|distance|origin|destination|

+--------+-----+--------+------+-----------+

|01011245|    6|     602|   ABE|        ATL|

|01020600|   -8|     369|   ABE|        DTW|

|01021245|   -2|     602|   ABE|        ATL|

|01020605|   -4|     602|   ABE|        ATL|

|01031245|   -4|     602|   ABE|        ATL|

|01030605|    0|     602|   ABE|        ATL|

|01041243|   10|     602|   ABE|        ATL|

|01040605|   28|     602|   ABE|        ATL|

|01051245|   88|     602|   ABE|        ATL|

|01050605|    9|     602|   ABE|        ATL|

+--------+-----+--------+------+-----------+

 

spark.sql("SELECT * FROM foo").show()

 

+--------+-----+--------+------+-----------+

|    date|delay|distance|origin|destination|

+--------+-----+--------+------+-----------+

|01010710|   31|     590|   SEA|        SFO|

|01010955|  104|     590|   SEA|        SFO|

|01010730|    5|     590|   SEA|        SFO|

+--------+-----+--------+------+-----------+

在以下各节中,我们将使用此数据执行union,join和窗口化示例。

Unions

Apache Spark中的一种常见模式是将具有相同模式的两个不同的DataFrame联合在一起。可以使用以下union()方法实现:

// Scala

// Union two tables

val bar = delays.union(foo)

bar.createOrReplaceTempView("bar")

bar.filter(expr("""origin == 'SEA' AND destination == 'SFO'

AND date LIKE '01010%' AND delay > 0""")).show()

 

 In Python

# Union two tables

bar = departureDelays.union(foo)

bar.createOrReplaceTempView("bar")

 

 Show the union (filtering for SEA and SFO in a specific time range)

bar.filter(expr("""origin == 'SEA' AND destination == 'SFO'

AND date LIKE '01010%' AND delay > 0""")).show()

该bar DataFrame是foo和delays 两者union之后得到的。使用相同的过滤条件在bar DataFrame中得到foo结果,正如预期的那样,我们看到了重复的数据:

-- In SQL

spark.sql("""

SELECT *

  FROM bar

 WHERE origin = 'SEA'

   AND destination = 'SFO'

   AND date LIKE '01010%'

   AND delay > 0

""").show()

 

+--------+-----+--------+------+-----------+

|    date|delay|distance|origin|destination|

+--------+-----+--------+------+-----------+

|01010710|   31|     590|   SEA|        SFO|

|01010955|  104|     590|   SEA|        SFO|

|01010730|    5|     590|   SEA|        SFO|

|01010710|   31|     590|   SEA|        SFO|

|01010955|  104|     590|   SEA|        SFO|

|01010730|    5|     590|   SEA|        SFO|

+--------+-----+--------+------+-----------+

Joins

常见的DataFrame操作是将两个DataFrame(或表)连接在一起。默认情况下,Spark SQL默认连接是inner join,其中连接选项包括inner,cross,outer,full,full_outer,left,left_outer,right,right_outer,left_semi和left_anti这些连接方式。文档中提供了更多信息(适用于Scala和Python)。

以下代码示例执行airports 和 foo DataFrame之间的inner join:

 

// In Scala

foo.join(

  airports.as('air),

  $"air.IATA" === $"origin"

).select("City", "State", "date", "delay", "distance", "destination").show()

 

 In Python

 Join departure delays data (foo) with airport info

foo.join(

  airports,

  airports.IATA == foo.origin

).select("City", "State", "date", "delay", "distance", "destination").show()

 

-- In SQL

spark.sql("""

SELECT a.City, a.State, f.date, f.delay, f.distance, f.destination

  FROM foo f

  JOIN airports_na a

    ON a.IATA = f.origin

""").show()

前面的代码可以使你通过查看foo DataFrame 和airports DataFrame两者的关联信息,得到进入城市的日期,延迟,距离和目的地信息:

+-------+-----+--------+-----+--------+-----------+

| City|State| date|delay|distance|destination|

+-------+-----+--------+-----+--------+-----------+

|Seattle| WA|01010710| 31| 590| SFO|

|Seattle| WA|01010955| 104| 590| SFO|

|Seattle| WA|01010730| 5| 590| SFO|

+-------+-----+--------+-----+--------+-----------+

windowing

窗口函数使用窗口(输入的行范围)中行的值返回一组值,通常是以另一行的形式。使用窗口函数,可以在一组行上进行操作,同时仍为每个输入行返回一个值。在本节中,我们将展示如何使用dense_rank()窗口功能。如表5-5所示,还有许多其他功能。

让我们首先回顾一下从西雅图(SEA),旧金山(SFO)和纽约(JFK)出发并前往一组特定的目的地位置的航班所经历的TotalDelays(计算得出sum(Delay)),如以下查询所示:

-- In SQL

DROP TABLE IF EXISTS departureDelaysWindow;

 

CREATE TABLE departureDelaysWindow AS

SELECT origin, destination, SUM(delay) AS TotalDelays

  FROM departureDelays

 WHERE origin IN ('SEA', 'SFO', 'JFK')

   AND destination IN ('SEA', 'SFO', 'JFK', 'DEN', 'ORD', 'LAX', 'ATL')

 GROUP BY origin, destination;

 

SELECT * FROM departureDelaysWindow

 

+------+-----------+-----------+

|origin|destination|TotalDelays|

+------+-----------+-----------+

|   JFK|        ORD|       5608|

|   SEA|        LAX|       9359|

|   JFK|        SFO|      35619|

|   SFO|        ORD|      27412|

|   JFK|        DEN|       4315|

|   SFO|        DEN|      18688|

|   SFO|        SEA|      17080|

|   SEA|        SFO|      22293|

|   JFK|        ATL|      12141|

|   SFO|        ATL|       5091|

|   SEA|        DEN|      13645|

|   SEA|        ATL|       4535|

|   SEA|        ORD|      10041|

|   JFK|        SEA|       7856|

|   JFK|        LAX|      35755|

|   SFO|        JFK|      24100|

|   SFO|        LAX|      40798|

|   SEA|        JFK|       4667|

+------+-----------+-----------+

如果你想为每个这些始发机场找到三个延误最严重的目的地,该怎么办?你可以通过针对每个起点运行三个不同的查询,然后将结果合并在一起,来实现此目的,如下所示:

-- In SQL

SELECT origin, destination, SUM(TotalDelays) AS TotalDelays

 FROM departureDelaysWindow

WHERE origin = '[ORIGIN]'

GROUP BY origin, destination

ORDER BY SUM(TotalDelays) DESC

LIMIT 3

这里[ORIGIN]是三个不同产地的值:JFK,SEA和SFO。

但是更好的方法是使用窗口函数dense_rank()来执行以下计算:

-- In SQL

spark.sql("""

SELECT origin, destination, TotalDelays, rank

  FROM (

     SELECT origin, destination, TotalDelays, dense_rank()

       OVER (PARTITION BY origin ORDER BY TotalDelays DESC) as rank

       FROM departureDelaysWindow

  ) t

 WHERE rank <= 3

""").show()

 

+------+-----------+-----------+----+

|origin|destination|TotalDelays|rank|

+------+-----------+-----------+----+

|   SEA|        SFO|      22293|   1|

|   SEA|        DEN|      13645|   2|

|   SEA|        ORD|      10041|   3|

|   SFO|        LAX|      40798|   1|

|   SFO|        ORD|      27412|   2|

|   SFO|        JFK|      24100|   3|

|   JFK|        LAX|      35755|   1|

|   JFK|        SFO|      35619|   2|

|   JFK|        ATL|      12141|   3|

+------+-----------+-----------+----+

通过使用dense_rank()窗口功能,我们可以快速确定三个出发城市的延误最严重的目的地是:

l 西雅图(SEA):旧金山(SFO),丹佛(DEN)和芝加哥(ORD)

l 旧金山(SFO):洛杉矶(LAX),芝加哥(ORD)和纽约(JFK)

l 纽约(JFK):洛杉矶(LAX),旧金山(SFO)和亚特兰大(ATL)

重要的是要注意,每个窗口分组都需要适合一个执行程序,并且在执行过程中将组成一个分区。因此,你需要确保你的查询不是无界的(即限制窗口的大小)。

修改

另一个常见的操作是对DataFrame进行修改。尽管DataFrames本身是不可变的,但是你可以通过创建具有不同列的新的、不同的DataFrames的操作来修改它们。(从前面的章节中回想起,底层的RDD是不可变的,也就是说它们不能更改,以确保Spark操作具有数据血缘。)让我们从前面的小DataFrame示例开始:

// In Scala/Python

foo.show()

 

--------+-----+--------+------+-----------+

|    date|delay|distance|origin|destination|

+--------+-----+--------+------+-----------+

|01010710|   31|     590|   SEA|        SFO|

|01010955|  104|     590|   SEA|        SFO|

|01010730|    5|     590|   SEA|        SFO|

+--------+-----+--------+------+-----------+

添加新列

要将新列添加到foo DataFrame,请使用以下withColumn()方法:

// In Scala

import org.apache.spark.sql.functions.expr

val foo2 = foo.withColumn(

              "status",

              expr("CASE WHEN delay <= 10 THEN 'On-time' ELSE 'Delayed' END")

           )

# In Python

from pyspark.sql.functions import expr

foo2 = (foo.withColumn(

          "status",

          expr("CASE WHEN delay <= 10 THEN 'On-time' ELSE 'Delayed' END")

        ))

新创建的foo2 DataFrame具有原始foo DataFrame的内容以及status列,该列是由CASE语句定义的:

// In Scala/Python

foo2.show()

 

+--------+-----+--------+------+-----------+-------+

|    date|delay|distance|origin|destination| status|

+--------+-----+--------+------+-----------+-------+

|01010710|   31|     590|   SEA|        SFO|Delayed|

|01010955|  104|     590|   SEA|        SFO|Delayed|

|01010730|    5|     590|   SEA|        SFO|On-time|

+--------+-----+--------+------+-----------+-------+

删除列

要删除列,请使用drop()方法。例如,让我们删除该delay列,因为我们status在上一节中添加了一个列:

// In Scala

val foo3 = foo2.drop("delay")

foo3.show()

 

 In Python

foo3 = foo2.drop("delay")

foo3.show()

 

+--------+--------+------+-----------+-------+

|    date|distance|origin|destination| status|

+--------+--------+------+-----------+-------+

|01010710|     590|   SEA|        SFO|Delayed|

|01010955|     590|   SEA|        SFO|Delayed|

|01010730|     590|   SEA|        SFO|On-time|

+--------+--------+------+-----------+-------+

重命名列

你可以使用以下withColumnRenamed()方法重命名列:

// In Scala

val foo4 = foo3.withColumnRenamed("status", "flight_status")

foo4.show()

 

 In Python

foo4 = foo3.withColumnRenamed("status", "flight_status")

foo4.show()

 

+--------+--------+------+-----------+-------------+

|    date|distance|origin|destination|flight_status|

+--------+--------+------+-----------+-------------+

|01010710|     590|   SEA|        SFO|      Delayed|

|01010955|     590|   SEA|        SFO|      Delayed|

|01010730|     590|   SEA|        SFO|      On-time|

+--------+--------+------+-----------+-------------+

旋转

在处理数据时,有时你需要将列交换为行----旋转数据。让我们获取一些数据来说明这个概念:

-- In SQL

SELECT destination, CAST(SUBSTRING(date, 0, 2) AS int) AS month, delay

  FROM departureDelays

 WHERE origin = 'SEA'

 

+-----------+-----+-----+

|destination|month|delay|

+-----------+-----+-----+

|        ORD|    1|   92|

|        JFK|    1|   -7|

|        DFW|    1|   -5|

|        MIA|    1|   -3|

|        DFW|    1|   -3|

|        DFW|    1|    1|

|        ORD|    1|  -10|

|        DFW|    1|   -6|

|        DFW|    1|   -2|

|        ORD|    1|   -3|

+-----------+-----+-----+

only showing top 10 rows

通过数据透视,你可以将名称放置在month列中(而不是1和2,可以分别显示Jan和Feb),并且可以按目的地和月份对延迟执行汇总计算(在这种情况下,平均值和最大值):

-- In SQL

SELECT * FROM (

SELECT destination, CAST(SUBSTRING(date, 0, 2) AS int) AS month, delay

  FROM departureDelays WHERE origin = 'SEA'

)

PIVOT (

  CAST(AVG(delay) AS DECIMAL(4, 2)) AS AvgDelay, MAX(delay) AS MaxDelay

  FOR month IN (1 JAN, 2 FEB)

)

ORDER BY destination

 

+-----------+------------+------------+------------+------------+

|destination|JAN_AvgDelay|JAN_MaxDelay|FEB_AvgDelay|FEB_MaxDelay|

+-----------+------------+------------+------------+------------+

|        ABQ|       19.86|         316|       11.42|          69|

|        ANC|        4.44|         149|        7.90|         141|

|        ATL|       11.98|         397|        7.73|         145|

|        AUS|        3.48|          50|       -0.21|          18|

|        BOS|        7.84|         110|       14.58|         152|

|        BUR|       -2.03|          56|       -1.89|          78|

|        CLE|       16.00|          27|        null|        null|

|        CLT|        2.53|          41|       12.96|         228|

|        COS|        5.32|          82|       12.18|         203|

|        CVG|       -0.50|           4|        null|        null|

|        DCA|       -1.15|          50|        0.07|          34|

|        DEN|       13.13|         425|       12.95|         625|

|        DFW|        7.95|         247|       12.57|         356|

|        DTW|        9.18|         107|        3.47|          77|

|        EWR|        9.63|         236|        5.20|         212|

|        FAI|        1.84|         160|        4.21|          60|

|        FAT|        1.36|         119|        5.22|         232|

|        FLL|        2.94|          54|        3.50|          40|

|        GEG|        2.28|          63|        2.87|          60|

|        HDN|       -0.44|          27|       -6.50|           0|

+-----------+------------+------------+------------+------------+

only showing top 20 rows

总结

本章探讨了Spark SQL如何与外部组件进行交互的接口。我们讨论了创建用户自定义函数(包括Pandas UDF)的过程,并介绍了一些用于执行Spark SQL查询的选项(包括Spark SQL Shell,Beeline和Tableau)。然后,我们提供了有关如何使用Spark SQL连接各种外部数据源的示例,例如SQL数据库,PostgreSQL,MySQL,Tableau,Azure Cosmos DB,MS SQL Server等。

我们探索了Spark用于复杂数据类型的内置函数,并提供了一些使用高阶函数的示例。最后,我们讨论了一些常见的关系运算符,并展示了如何选择执行DataFrame的操作。

在下一章中,我们将探讨如何使用数据集,强类型操作的好处以及何时以及为何使用它们。