Bootstrap

构建Apache Flink 开发环境(四)

写在前面:

大家好,我是强哥,一个热爱分享的技术狂。目前已有 12 年大数据与AI相关项目经验, 10 年推荐系统研究及实践经验。平时喜欢读书、暴走和写作。

业余时间专注于输出大数据、AI等相关文章,目前已经输出了40万字的推荐系统系列精品文章,强哥的畅销书「构建企业级推荐系统:算法、工程实现与案例分析」已经出版,需要提升可以私信我呀。如果这些文章能够帮助你快速入门,实现职场升职加薪,我将不胜欢喜。

想要获得更多免费学习资料或内推信息,一定要看到文章最后喔。

内推信息

如果你正在看相关的招聘信息,请加我微信:liuq4360,我这里有很多内推资源等着你,欢迎投递简历。

免费学习资料

如果你想获得更多免费的学习资料,请关注同名公众号【数据与智能】,输入“资料”即可!

学习交流群

如果你想找到组织,和大家一起学习成长,交流经验,也可以加入我们的学习成长群。群里有老司机带你飞,另有小哥哥、小姐姐等你来勾搭!加小姐姐微信:epsila,她会带你入群。

现在我们已经了解了前三章涉及到的很多理论知识,是时候开始动手操作一下了,我们准备开始开发第一个Flink应用程序了! 在本章中,你将学习如何设置开发、运行和调试Flink应用程序的环境。 我们将首先讨论需要准备好必需的软件,以及从哪里可以获得这本书所用到的示例代码。 通过使用这些示例,我们将了解如何在IDE中执行和调试Flink应用程序。 最后,我们展示了如何创建一个Flink Maven项目,这是构建一个新的Flink项目的第一步。

必需软件

首先,让我们讨论开发Flink应用程序所需的软件。 你可以在Linux、MacOS和Windows上开发和执行Flink应用程序。 但是,基于UNIX的环境是最合适的,主要是由于UNIX环境拥有最丰富的工具支持,因此这个环境是大多数Flink开发人员首选的。 我们将在本章的其余部分中基于一个假设出来的UNIX的环境来运行Flink项目。 作为Windows用户,你可以使用Linux(WSL)、Cygwin或Linux虚拟机的Windows子系统在UNIX环境中运行Flink。

Flink的DataStream API可用于Java和Scala开发。 因此,需要一个Java JDK环境来实现Flink数据流应用程序,一般选JDK 8(或更高版本)。 单纯的JRE环境是不够的。

假设我们安装了以下软件,尽管有一些不是开发Flink应用程序所严格要求的,但是一般都需要具备:

l Apache Maven 3.x版本,本书的代码示例使用Maven构建管理,而且Flink还提供了Maven项目原型去构建一个新的Flink Maven项目。

l 用于开发Java或者Scala的IDE工具,通常选择具有丰富插件支持的IntelliJ IDEA、Eclipse或Netbeans等IDE工具(例如Maven、Git和Scala支持),这里我们建议使用IntelliJ IDEA。你可以按照IntelliJ IDEA()网站上的说明去下载和安装它。

在IDE中运行和调试Flink程序 

即使Flink是一个分布式数据处理系统,你通常会在本地机器上开发和运行初始化测试。 这使开发变得更容易,并简化了集群部署,因为你可以在集群环境中运行完全相同的代码,而无需进行任何更改。 在下面,我们将描述如何获取我们在这里使用的代码示例,如何将它们导入IntelliJ,如何运行示例应用程序,以及如何调试它。

在IDE 中导入书中示例 

本书的代码示例托管在GitHub上。 在上,你将找到一个包含Scala示例的项目仓库和一个包含Java示例的项目仓库。 我们将使用Scala仓库进行设置,但如果你更喜欢Java,你也可以按照说明进行设置。

打开终端并运行下面的Git命令,以将scala示例仓库克隆到本地计算机。

git clone https://github.com/streaming-with-flink/examples-scala

你也可以从Github上下载样例源代码的zip文件。

> wget https://github.com/streaming-with-flink/examples-

scala/archive/master.zip

> unzip master.zip 

本书的示例是一个Maven项目。 你可以在src / 目录下找到按章节分组的源代码:

现在打开IDE并导入Maven项目,大多数IDE的导入步骤都类似。 在下文中,我们将详细解释IntelliJ如何操作这一步骤。

打开文件导航栏,新建文件,从已有的项目导入,找到本书示例所在的文件夹,选择Maven项目模型,点击下一步,选择要导入的项目(应该只有一个),设置SDK,并为项目命名,单机完成即可(File -> New->  Import project from external model -> Maven -> Next ->Root directory -> Select Maven projects to import -> Next -> set SDK -> Project name -> Finish):

图4-1 、图4-2和图 4-3体现了项目导入的部分操作步骤。

完成这一步就算是大功告成,接下来你就可以开始随意浏览和查阅本书的示例代码。

在IDE 中运行Flink 程序 

接下来,让我们在IDE中运行本书的一个示例应用程序,双击Shit键,搜索AverageSensorReadings类并打开它。正如第1章所述,该程序为多个热传感器生成模拟数据,作为事件输入源,并将事件中的温度从华氏温度转换为摄氏温度,并计算每个传感器每秒的平均温度。程序的结果被发送到标准输出。就像许多DataStream应用程序一样,程序的发生器、接收器和算子操作都是在AverageSensorReadings类的main()方法中组合起来的。

通过运行该类的main()方法启动应用程序,程序的输出被写入到IDE的标准输出窗口(或控制台)。一开始输出的日志信息会提示程序并行算子任务不同阶段的状态信息,例如SCHEDULING, DEPLOYING和RUNNING等状态。一旦所有任务都启动并运行,程序就会开始生成传感器模拟数据,看起来类似于以下几行的结果:

 

2> SensorReading(sensor_31,1515014051000,23.924656183848732)

4> SensorReading(sensor_32,1515014051000,4.118569049862492)

1> SensorReading(sensor_38,1515014051000,14.781835420242471)

3> SensorReading(sensor_34,1515014051000,23.871433252250583)

 

程序会不断模拟生成新事件,进一步处理它们,并且会每秒钟得到新的结果(即平均温度),直到你停止程序为止。

现在让我们快速讨论一下这个过程背后究竟发生了什么。如第3章所述,Flink应用程序首先被提交给JobManager (master,也就是程序的管理者),它将执行任务分配给一个或多个TaskManager (workers,实际执行计算任务的进程)。由于Flink是一个分布式系统,JobManager和TaskManagers往往作为独立的JVM进程在不同的机器上运行。通常来说,程序的main()方法负责组装数据流,并在调用StreamExecutionEnvironment.execute()方法时将其提交给远程JobManager。

但是,还有另一种模式,在这种模式中,当我们调用execute()方法时,会在同一JVM进程中将JobManager和TaskManager(默认情况下,具有与可用CPU线程一样多的插槽)作为独立的线程启动。因此,整个Flink应用程序是多线程的,并在同一个JVM进程中执行。这种模式适用于在IDE中执行Flink程序。

在IDE 中调试Flink 程序

由于是单个JVM执行模式,所以你也可以在IDE中调试Flink应用程序,跟IDE中的其他程序一样。你可以像在正常情况下那样在代码中定义断点并调试应用程序。

然而,在IDE中调试Flink应用程序时,有需要注意以下几个方面:

 除非你在程序中指定了并行度,不然的话程序默认由与本机CPU线程数量相同的线程执行,所以,你应该意识到你可能调试的是一个多线程程序。

与通过将Flink程序发送到远程作业管理器来执行Flink程序相比,该应用程序是在单个JVM中执行的,因此,某些像类加载这样的问题不能正确调试。

 尽管程序是在单个JVM中执行的,但是记录被序列化以用于跨线程通信,并且可能持久化内部状态。

 

创建Flink Maven项目

将本书的示例仓库导入IDE以体验Flink项目是很好的开始。但是除此之外,你还应该知道如何从头创建一个全新的Flink项目。

Flink提供了Maven原型,用于为Java或Scala Flink应用程序生成Maven项目。首先打开终端,运行以下命令创建一个Flink Maven Quickstart Scala项目,作为Flink应用程序的起点:

mvn archetype:generate \

-DarchetypeGroupId=org.apache.flink \

-DarchetypeArtifactId=flink-quickstart-scala \

-DarchetypeVersion=1.7.1 \

-DgroupId=org.apache.flink.quickstart \

-DartifactId=flink-scala-project \

-Dversion=0.1 \

-Dpackage=org.apache.flink.quickstart \

-DinteractiveMode=false 

这将在一个名为flink-scala-project的文件夹中为Flink 1.7.1版本你的Flink生成一个Maven项目。你可以通过更改上述mvn命令的各个参数来更改Flink版本、group和artifact ID、版本和生成的包。最终生成的文件夹包含一个src/文件夹和一个pom.xml文件。src/ 文件夹的结构如下:

该项目包含两个样例类文件,BatchJob.scala和StreamingJob.scala,对应批处理作业和流处理作业,作为项目的启动文件,如果你觉得不需要,可以选择删除它们。

可以按照上一节中描述的步骤在IDE中导入项目,也可以执行以下命令来构建JAR文件:

mvn clean package -Pbuild-jar

如果命令成功完成,你将在项目文件夹中找到一个新的目标文件夹(target)。 该文件夹包含一个文件flink-scala-project-0.1.jar,它是Flink应用程序的JAR文件。 生成的pom.xml文件还包含关于如何向项目添加新依赖项的说明。

小结 

在本章中,我们学习了如何设置开发和调试Flink数据流应用程序的环境,以及如何使用Flink的Maven原型生成Maven项目。 显而易见的下一步将是学习如何实际实现数据流程序。第5章将向你介绍Data Stream API的基本使用,第6、7和8章将为你介绍需要了解的关于基于时间的算子、有状态函数以及源连接器(source connector)和接收端连接器(sink connector)的所有内容。

注意:我们同样提供了基于Java实现的样例仓库。