Spark结构化数据流StructuredStreaming
是一个构建在Spark SQL之上的一个高容错可扩展的流处理引擎。当然你的流计算也可以
用相同的方式对静态数据进行批处理计算。
它可以不断更新持续流进来的递增数据,并且将计算结果也持续的更新。目前的
Dataset/DataFrame的API支持的语言有Scala,Java和Python三种语言。可以实现流聚合,事件窗口,Join聚合等。
结构化流处理是建立在Spark SQL优化引擎Catylist之上的又一引擎。所以,其性能也是
非常好的。未来的发展的空间也是巨大的。
该种流处理保证有且只处理数据一次,同时实现端到端的数据。通过检查点checkpoint和Write Ahead Logs机制实现高容错。
在Spark2.1版本中,其API仍然还是试验性质的,接下来我们就来一起走进StructuredStreaming的殿堂吧。
import org.apache.spark.sql.SparkSession
import org.apache.spark.SparkConf
object Test {
def main(args: Array[String]): Unit = {
/*
* 配置信息:配置应用名称
*StructuredStreaming Test
* Master为local[3]
*/
val conf = new SparkConf()
.setAppName("StructuredStreaming Test")
.setMaster("local[3]")
val spark =SparkSession
.builder()
.config(conf)
.getOrCreate()
import spark.implicits._
//获取一行一行的数据,注意,此时的host对应的、
//master为数据源那台机器的主机名
//并且已经配置好了hosts文件里的ip与主机映射
//此时的lines是一个DataFrame对象
val lines = spark
.readStream
.format("socket")
.option("host", "master")
.option("port", 9999)
.load()
//用Dataset里的flatMap方法将每行转换为一个一//个的单词
val words = lines.as[String].flatMap(_.split(" ")) //单词计数
val wordCounts = words.groupBy("value").count()
//返回一个流查询对象
val query =
wordCounts.writeStream.outputMode("complet")
.format("console")
.start()
//等待终端
query.awaitTermination()
}
}
图1-1 StructuredNetworkWordCount代码实现
以上这段代码是处理的数据源是一台主机名为master的9999端口。通过监听TCP端口实时监听获取流数据源。因此,我们需要通过以下的方式来开启TCP端口的监听
nc -lk 9999
通过观察console的终端输出的结果,我们可以判定,这段程序处理的是以10秒为单位的微批数据,也就是说,每隔10秒钟进行一次计算。
此外,你还可以通过Spark官方提供的案例来运行你的第一个结构化流处理的应用程序。在Spark官方提供的二进制包里面的bin目录下有一个run-examples的脚本,当加上org.apache.spark.examples.sql.streaming.StructuredNetworkWordCount的参数后,
运行这个脚本,这样也能运行您的第一个结构化流处理程序。
运行了第一个结构化流程序之后,我们再回过头来看看Spark源码中提供给我们的三类案例。这三类案例在Spark源码包中的路径为
examples/src/main/java/org.apache.spark.sql.streaming/streaming和
examples/src/main/scala/org.apache.spark.sql.streaming/streaming
(源码的导入方法详见第xxx章第xxx节)
在Java代码包路径下,一共有三个案例,分别为Java StructuredNetworkWo rdCount
, Java StructuredKafkaworkWordCount, Java StructuredNetworkWor dCountWindowed。
在Scala代码包路径下,也是有三个案例,分别为StructuredNetworkWordC ount
,StructuredKafkaworkWordCount,StructuredNetworkWordCount Windowed。
Java StructuredNetworkWordCount和StructuredNetworkWordCoun t的实现方法和图1-1是一样的。
那么我们接下来以kafka作为数据源来试验一下结构化流处理的实现方法。
首先创建一个类StructuredKafkaworkWordCountTest
这里还有俩个案例没有试验完成:
程序模型介绍:
结构化流处理Structured Streaming的关键的思想是将一个实时流数据看做是一张持续动态追加数据的一张表。这种思想产生了一种新的流处理模型。这种流处理模型从某种程度上与批处理模型相似。通过这种模型,我们可以采用批处理编程模型类似的方式进行编程,从而对流数据进行处理,就像标准批处理一样,对静态表数据进行查询,只不过这种查询通常会伴随着频繁的增量数据查询。Spark官方也把这
种处理数据的方式叫做类批处理。
值得注意的是,在图1-1的程序中,outputMode()的参数有三种方式,也就是说流
数据写出到外部的存储系统有三种方式:
Complete
Append
Update
基本组件:
Concepts
将输入的数据流当做一张表,每一条新的数据流进来的时候,就被追加到新的输入表中,基于输入的查询操作将会生成一张结果表,无论什么时候结果表进行了更新,我们都将要写数据到外部的下游数据系统中。
处理Event-Time和延迟
数据本身就内嵌到了Event-time之中了,对于一些应用来说,有可能会基于
event-time进行统计,比如,如果你想要统计IoT设备每一分钟产生的数据的数量。
然后,你可能想基于数据产生的时间来统计,而不是数据到达Spark这一端时的处理时间,event这个概念在结构化流处理中非常的有用。设备产生的每一个event
在表中就是一行,而event-time就是该行中的一个字段属性,这使得基于window
的聚合操作非常方便。实质上,所谓的窗口函数,其实就是根据event-time或者Process-time进行特殊的分组,然后聚合。每个窗口,就是一组。
此外,这个模型很自然的解决了处理延迟数据的问题,究竟要不要将延迟的数据加入统计结果当中。如果要加,那么是将所有的都加吗?还是符合条件的才加,也就是延迟时间超过一定大小,我们就丢掉延迟数据,反之,我们仍然将数据添加进来。Spark官方对此应用了watermarking的概念。通过watermarking来作为评判标准和二
者的分界线。
高容错语义的实现
端到端的exactly-once语义是结构化流处理设计出来的关键目标。Spark官方为了实现它,因而设计了结构化流处理的sources,sinks以及执行引擎engine去追踪数据的处理的进度,从而可以处理重新开始或者数据重复消费等任何情况下的各种失败。首先,我们假设
每个数据源都有offsets偏移量(类似kafka的偏移量),我们定位到了上一次消费的流数
据的位置,引擎用checkpinting和write ahead logs去记录偏移量。同时,结构化流sinks
被设计成是幂等的。这样,二者合在一起,通过可复制的数据源,Write Ahead Logs记录offset,checkpoint记录state,数据处理后的下游sinks又是幂等的。最终就可以做到exactly once。
Spark2.0以及之后,DataFrame和Dat aset都可以通过SparkSession对象来静态调用对
应的方法来创建,如果您还不太熟悉SparkSession,那就应该多看看Spark1.6后的
Spark2.0了。
SparkSession.readStream()会返回一个DataStreamReader
Data Source
?File source
?Kafka source
?Socket source (for testing) -
DataStreamReader在Spark2.1.0的源码中是一个final类型的类,其只能是
org.apache.spark.sql包下的其他类才能访问。
其成员方法format()指定输入数据源的数据格式,schema()指定数据的结构信息。一些像Json这样的数据格式,DataStreamReader可以推断出来它的数据结构,所以不用显示定义其schema。但是如果显示指定了其schema的话,加载数据的时候会跳过这个步骤,从而加速读取数据过程。
除此之外,还有如下的一些重要的方法。在开发和性能调优的过程中可能会经常用到。
Option()方法:该方法通过传入字符串key和value的值,从而显示指定应用程序的配置属性值,比如应用的名称,应用的Master等。每调用一次,就返回一个新的DataStreamReader。
Load()方法:该方法返回一个DataSet对象(也可以说是Dataframe对象,Dataframe是Dataset 的一种特殊形式,但是2.1.0版本的源码中又没有这个类存在了)。
DataStreamWriter同DataStreamReader类似,也是final类型,且访问权限也只是开放给sql 包。其主要的方法和作用如下所示
对于progress,这是一个只在outputMode为update的时候才用到的一个类,用于记录更新读写的进度。由于都还处于试验阶段的API并且用到的场景比较有限,所以笔者就不过多的介绍了。
StreamingQuery 流查询,流查询是一个trait,对于StreamingQuery来说,我们只需要大概了解一下其定义的一些抽象的字段,从而对流查询这个概念有个大致的印象。
其主要字段见下表
顾名思义,这些字段都非常容易理解。只是可以关注一些explain,explain非常有用。因Explain 可以打印物理执行的过程,有助于定位错误。这在排查错误的时候比较有用。
StreamingQueryManager
StreamingQueryListenner
在实际的生产环境中,难免会遇到关机或者各种复杂的错误。我们该如何提前预防这样的情况发生呢?
DataFrameReader通过.option("checkpointLocation","path/to/HDFS/dir")方法将当前的状态进行持久化,以便在出现错误的时候,从错误状态中恢复出来,
1 在结构化流中使用DataSet的SQL相关的API的时候,不支持多个聚合查询操作的问题。