Spark Streaming笔记——技术点汇总

本文转自博客园,作者ID「netoxi」

http://www.cnblogs.com/netoxi/p/7223414.html


Spark Streaming支持实时数据流的可扩展(scalable)、高吞吐(high-throughput)、容错(fault-tolerant)的流处理(stream processing)。


架构图


特性如下:


  • 可线性伸缩至超过数百个节点;

  • 实现亚秒级延迟处理;

  • 可与Spark批处理和交互式处理无缝集成;

  • 提供简单的API实现复杂算法;

  • 更多的流方式支持,包括Kafka、Flume、Kinesis、Twitter、ZeroMQ等。


原理


Spark在接收到实时输入数据流后,将数据划分成批次(divides the data into batches),然后转给Spark Engine处理,按批次生成最后的结果流(generate the final stream of results in batches)。 



API


DStream


DStream(Discretized Stream,离散流)是Spark Stream提供的高级抽象连续数据流。


  • 组成:一个DStream可看作一个RDDs序列。

  • 核心思想:将计算作为一系列较小时间间隔的、状态无关的、确定批次的任务,每个时间间隔内接收的输入数据被可靠存储在集群中,作为一个输入数据集。



  • 特性:一个高层次的函数式编程API、强一致性以及高校的故障恢复。

  • 应用程序模板:

  • 模板1

  • 模板2


WordCount示例



Input DStream


Input DStream是一种从流式数据源获取原始数据流的DStream,分为基本输入源(文件系统、Socket、Akka Actor、自定义数据源)和高级输入源(Kafka、Flume等)。


  • Receiver:

  • 每个Input DStream(文件流除外)都会对应一个单一的Receiver对象,负责从数据源接收数据并存入Spark内存进行处理。应用程序中可创建多个Input DStream并行接收多个数据流。

  • 每个Receiver是一个长期运行在Worker或者Executor上的Task,所以会占用该应用程序的一个核(core)。如果分配给Spark Streaming应用程序的核数小于或等于Input DStream个数(即Receiver个数),则只能接收数据,却没有能力全部处理(文件流除外,因为无需Receiver)。

  • Spark Streaming已封装各种数据源,需要时参考官方文档。


Transformation Operation


  • 常用Transformation


  • updateStateByKey(func)

  • updateStateByKey可对DStream中的数据按key做reduce,然后对各批次数据累加

  • WordCount的updateStateByKey版本


  • transform(func)

  • 通过对原DStream的每个RDD应用转换函数,创建一个新的DStream。

  • 官方文档代码举例


  • Window operations

  • 窗口操作:基于window对数据transformation(个人认为与Storm的tick相似,但功能更强大)。

  • 参数:窗口长度(window length)和滑动时间间隔(slide interval)必须是源DStream批次间隔的倍数。

  • 举例说明:窗口长度为3,滑动时间间隔为2;上一行是原始DStream,下一行是窗口化的DStream。

  • 常见window operation

  • 官方文档代码举例 


  • join(otherStream, [numTasks])

  • 连接数据流

  • 官方文档代码举例1

  • 官方文档代码举例2


Output Operation



缓存与持久化


  • 通过persist()将DStream中每个RDD存储在内存。

  • Window operations会自动持久化在内存,无需显示调用persist()。

  • 通过网络接收的数据流(如Kafka、Flume、Socket、ZeroMQ、RocketMQ等)执行persist()时,默认在两个节点上持久化序列化后的数据,实现容错。


Checkpoint


  • 用途:Spark基于容错存储系统(如HDFS、S3)进行故障恢复。

  • 分类:

  • 元数据检查点:保存流式计算信息用于Driver运行节点的故障恢复,包括创建应用程序的配置、应用程序定义的DStream operations、已入队但未完成的批次。

  • 数据检查点:保存生成的RDD。由于stateful transformation需要合并多个批次的数据,即生成的RDD依赖于前几个批次RDD的数据(dependency chain),为缩短dependency chain从而减少故障恢复时间,需将中间RDD定期保存至可靠存储(如HDFS)。

  • 使用时机:

  • Stateful transformation:updateStateByKey()以及window operations。

  • 需要Driver故障恢复的应用程序。

  • 使用方法

  • Stateful transformation

  • 需要Driver故障恢复的应用程序(以WordCount举例):如果checkpoint目录存在,则根据checkpoint数据创建新StreamingContext;否则(如首次运行)新建StreamingContext。


  • checkpoint时间间隔

  • 方法:

  • 原则:一般设置为滑动时间间隔的5-10倍。

  • 分析:checkpoint会增加存储开销、增加批次处理时间。当批次间隔较小(如1秒)时,checkpoint可能会减小operation吞吐量;反之,checkpoint时间间隔较大会导致lineage和task数量增长。


性能调优


降低批次处理时间


  • 数据接收并行度

  • 增加DStream:接收网络数据(如Kafka、Flume、Socket等)时会对数据反序列化再存储在Spark,由于一个DStream只有Receiver对象,如果成为瓶颈可考虑增加DStream。

  • 设置“spark.streaming.blockInterval”参数:接收的数据被存储在Spark内存前,会被合并成block,而block数量决定了Task数量;举例,当批次时间间隔为2秒且block时间间隔为200毫秒时,Task数量约为10;如果Task数量过低,则浪费了CPU资源;推荐的最小block时间间隔为50毫秒。

  • 显式对Input DStream重新分区:在进行更深层次处理前,先对输入数据重新分区。


  • 数据处理并行度:reduceByKey、reduceByKeyAndWindow等operation可通过设置“spark.default.parallelism”参数或显式设置并行度方法参数控制。

  • 数据序列化:可配置更高效的Kryo序列化。


设置合理批次时间间隔


  • 原则:处理数据的速度应大于或等于数据输入的速度,即批次处理时间大于或等于批次时间间隔。

  • 方法:

  • 先设置批次时间间隔为5-10秒以降低数据输入速度;

  • 再通过查看log4j日志中的“Total delay”,逐步调整批次时间间隔,保证“Total delay”小于批次时间间隔。


内存调优


  • 持久化级别:开启压缩,设置参数“spark.rdd.compress”。

  • GC策略:在Driver和Executor上开启CMS。


相关阅读:Spark SQL笔记——技术点汇总

相关文章
相关标签/搜索