大数据分析技术与实战之Spark Streaming

随着信息技术的迅猛发展,数据量呈现出爆炸式增长趋势,数据的种类与变化速度也远远超出人们的想象,因此人们对大数据处理提出了更高的要求,越来越多的领域迫切需要大数据技术来解决领域内的关键问题。在一些特定的领域中(例如金融、灾害预警等),时间就是金钱、时间可能就是生命!然而传统的批处理框架却一直难以满足这些领域中的实时性需求。为此,涌现出了一批如S4、Storm的流式计算框架。Spark是基于内存的大数据综合处理引擎,具有优秀的作业调度机制和快速的分布式计算能力,使其能够更加高效地进行迭代计算,因此Spark能够在一定程度上实现大数据的流式处理。


Spark Streaming是Spark上的一个流式处理框架,可以面向海量数据实现高吞吐量、高容错的实时计算。Spark Streaming支持多种类型数据源,包括Kafka、Flume、trwitter、zeroMQ、Kinesis以及TCP sockets等,如图1所示。Spark Streaming实时接收数据流,并按照一定的时间间隔将连续的数据流拆分成一批批离散的数据集;然后应用诸如map、reducluce、join和window等丰富的API进行复杂的数据处理;最后提交给Spark引擎进行运算,得到批量结果数据,因此其也被称为准实时处理系统。


图1  Spark Streaming支持多种类型数据源


目前应用最广泛的大数据流式处理框架是Storm。Spark Streaming 最低0.5~2s做一次处理(而Storm最快可达0.1s),在实时性和容错方面不如Storm。然而Spark Streaming的集成性非常好,通过RDD不仅能够与Spark上的所有组件无缝衔接共享数据,还能非常容易地与Kafka、Flume等分布式日志收集框架进行集成;同时Spark Streaming的吞吐量非常高,远远优于Storm的吞吐量,如图2所示。所以虽然Spark Streaming的处理延迟高于Storm,但是在集成性与吞吐量方面的优势使其更适用于大数据背景。


图2  Spark Streaming与Storm吞吐量比较图


Spark Streaming基础概念


批处理时间间隔


在Spark Streaming中,对数据的采集是实时、逐条进行的,但是对数据的处理却是分批进行的。因此,Spark Streaming需要设定一个时间间隔,将该时间间隔内采集到的数据统一进行处理,这个间隔称为批处理时间间隔。


也就是说对于源源不断的数据,Spark Streaming是通过切分的方式,先将连续的数据流进行离散化处理。数据流每被切分一次,对应生成一个RDD,每个RDD都包含了一个时间间隔内所获取到的所有数据,因此数据流被转换为由若干个RDD构成的有序集合,而批处理时间间隔决定了Spark Streaming需要多久对数据流切分一次。Spark Streaming是Spark上的组件,其获取的数据和数据上的操作最终仍以Spark作业的形式在底层的Spark内核中进行计算,因此批处理时间间隔不仅影响数据处理的吞吐量,同时也决定了Spark Streaming向Spark提交作业的频率和数据处理的延迟。需要注意的是,批处理时间间隔的设置会伴随Spark Streaming应用程序的整个生命周期,无法在程序运行期间动态修改,所以需要综合考虑实际应用场景中的数据流特点和集群的处理性能等多种因素进行设定。


窗口时间间隔


窗口时间间隔又称为窗口长度,它是一个抽象的时间概念,决定了Spark Streaming对RDD序列进行处理的范围与粒度,即用户可以通过设置窗口长度来对一定时间范围内的数据进行统计和分析。如果设批处理时间设为1s,窗口时间间隔为3s,如3图所示,其中每个实心矩形表示Spark Streaming每1秒钟切分出的一个RDD,若干个实心矩形块表示一个以时间为序的RDD序列,而透明矩形框表示窗口时间间隔。易知窗口内RDD的数量最多为3个,即Spark Streming 每次最多对3个RDD中的数据进行统计和分析。对于窗口时间间隔还需要注意以下几点:


  • 以图3为例,在系统启动后的前3s内,因进入窗口的RDD不足3个,但是随着时间的推移,最终窗口将被填满。

  • 不同窗口内所包含的RDD可能会有重叠,即当前窗口内的数据可能被其后续若干个窗口所包含,因此在一些应用场景中,对于已经处理过的数据不能立即删除,以备后续计算使用。

  • 窗口时间间隔必须是批处理时间间隔的整数倍。


图3  窗口时间间隔示意图


滑动时间间隔


滑动时间间隔决定了Spark Streaming对数据进行统计与分析的频率,多出现在与窗口相关的操作中。滑动时间间隔是基于批处理时间间隔提出的,其必须是批处理时间间隔的整数倍。在默认的情况下滑动时间间隔设置为与批处理时间间隔相同的值。如果批处理时间间隔为1s,窗口间隔为3s,滑动时间间隔为2s,如图4所示,其含义是每隔2s对过去3s内产生的3个RDD进行统计分析。


图4  滑动时间间隔、窗口时间间隔、批处理时间间隔综合示意图


DStream基本概念


DStream是Spark Streaming的一个基本抽象,它以离散化的RDD序列的形式近似描述了连续的数据流。DStream本质上是一个以时间为键,RDD为值的哈希表,保存了按时间顺序产生的RDD,而每个RDD封装了批处理时间间隔内获取到的数据。Spark Streaming每次将新产生的RDD添加到哈希表中,而对于已经不再需要的RDD则会从这个哈希表中删除,所以DStream也可以简单地理解为以时间为键的RDD的动态序列。设批处理时间间隔为1s,图5为4s内产生的DStream示意图。


图5  DStream示意图


Spark Streaming编程模式与案例分析


Spark Streaming编程模式


下面以Spark Streaming官方提供的WordCount代码为例来介绍Spark Streaming的使用方式。


示例1:


import org.apache.spark._

import org.apache.spark.streaming._

import org.apache.spark.streaming.StreamingContext._

/*创建一个本地模式的StreamingContext,并设定master节点工作线程数为2,并以1秒作为批处理时间间隔。*/

val conf =  new SparkConf().setMaster("local[2]").

setAppName("NetworkWordCount")

val ssc = new StreamingContext(conf, Seconds(1)) 

/*通过获取”localhost”节点9999端口中的实时数据流创建DStream。*/

val lines = ssc.socketTextStream("localhost", 9999)

/*以空格作为分割DStream中数据的依据,使得每一行文本转换为若干个单词。*/ 

val words = lines.flatMap(_.split(" "))

import org.apache.spark.streaming.StreamingContext._

/*对于words中的每个单词word,转换为相应的二元组形式(word,1),在此基础上统计每个单词出现的次数。*/ 

val pairs = words.map(word => (word, 1))

val wordCounts = pairs.reduceByKey(_ + _)

//输出DStream中每个RDD中前10个元素。

wordCounts.print()

//启动Spark Streaming应用程序。

ssc.start()              

//等待计算完成。

ssc.awaitTermination()


Spark Streaming应用程序在功能结构上通常包含以下五部分,如上述示例1所示。


  • 导入Spark Streaming相关包:Spark Streaming作为Spark框架上的一个组件,具有很好的集成性。在开发Spark Streaming应用程序时,只需导入Spark Streaming相关包,无需额外的参数配置。

  • 创建StreamingContext对象:同Spark应用程序中的SparkContext对象一样, StreamingContext对象是Spark Streaming应用程序与集群进行交互的唯一通道,其中封装了Spark集群的环境信息和应用程序的一些属性信息。在该对象中通常需要指明应用程序的运行模式(示例1中设为local[2])、设定应用程序名称(示例1中设为NetworkWordCount)、设定批处理时间间隔(示例1中设为Seconds(1)即1秒钟),其中批处理时间间隔需要根据用户的需求和集群的处理能力进行适当地设置。

  • 创建InputDStream:Spark Streaming需要根据数据源类型选择相应的创建DStream的方法。示例1中Spark Streaming通过StreamingContext对象调用socketTextStream方法处理以socket连接类型数据源,创建出DStream即lines。Spark Streaming同时支持多种不同的数据源类型,其中包括Kafka、Flume、HDFS/S3、Kinesis和Twitter等数据源。

  • 操作DStream:对于从数据源得到的DStream,用户可以调用丰富的操作对其进行处理。示例1中针对lines的一系列操作就是一个典型的WordCount执行流程:对于当前批处理时间间隔内的文本数据以空格进行切分,进而得到words;再将words中每个单词转换为二元组,进而得到pairs;最后利用reduceByKey方法进行统计。

  • 启动与停止Spark Streaming应用程序:在启动Spark Streaming应用程序之前,DStream上所有的操作仅仅是定义了数据的处理流程,程序并没有真正连接上数据源,也没有对数据进行任何操作,当ssc.start()启动后程序中定义的操作才会真正开始执行。


文本文件数据处理案例


功能需求


实时监听并获取本地home/dong/Streamingtext目录中新生成的文件(文件均为英文文本文件,单词之间使用空格进行间隔),并对文件中各单词出现的次数进行统计。


代码实现


package dong.spark

import org.apache.spark.SparkConf

import org.apache.spark.streaming.{Seconds,StreamingContext}

import org.apache.spark.streaming.StreamingContext._

object StreamingFileWordCount {

  def main(args: Array[String]): Unit ={

//以local模式运行,并设定master节点工作线程数为2。

    val sparkConf = new SparkConf().

setAppName("StreamingFileWordCount").

setMaster("local[2]")

/*创建StreamingContext实例,设定批处理时间间隔为20秒。*/ 

    val ssc = new StreamingContext(sparkConf,Seconds(20))

/*指定数据源来自本地home/dong/Streamingtext。*/

    val lines = ssc.textFileStream("/home/dong/Streamingtext")

/*在每个批处理时间间隔内,对指定文件夹中变化的数据进行单词统计并打印。*/

    val words= lines.flatMap(_.split(" "))

    val wordcounts=words.map(x=>(x,1)).reduceByKey(_+_)

    wordcounts.print()

    ssc.start()

    ssc.awaitTermination()

  }

}


运行演示


第1步,启动Hadoop与Spark。


$ start-all.sh

$ cd spark-1.4.0-bin-hadoop2.4

$ sbin/start-all.sh


第2步,创建Streaming监控目录。


$ mkdir /home/dong/Streamingtext


在dong用户主目录下创建Streamingtext为Spark Streaming监控的目录,创建后如图6所示。


图6 dong用户主目录下创建Streamingtext文件夹


第3步,在IntelliJ IDEA中编辑运行Streaming程序。在IntelliJ IDEA中创建工程StreamingFileWordCount,编辑对象StreamingFileWordCount,如图7所示。


图7 IntelliJ IDEA中StreamingFileWordCount示意图


由于该示例没有输入参数,因此不需要配置参数,可直接单击右键->单击"Run‘StreamingFileWordCount’ "。


第4步,在监听目录下创建文本文件。在master节点上的/home/dong/Streamingtext中分别创建file1.txt与file2.txt。


file1.txt内容如下:


aa

bb


file2.txt内容如下:


ee

dd

cc


创建后,/home/dong/Streamingtext中内容如图8所示。


图8 Streamingtext文件夹内容示意图


查看结果


终端窗口输出了每个批处理时间间隔(20秒)内,/home/dong/Streamingtext中新生成文件所包含的各单词个数,如图9所示。


图9 StreamingFileWordCount运行结果示意图


网络数据处理案例


功能需求


监听本地节点指定端口传输的数据流(本案例为master节点9999端口的英文文本数据,以逗号间隔单词),每5秒统计一次该时间间隔内收集到的各单词的个数。


代码实现


本案例涉及数据流模拟器和分析器两部分。为了更接近真实的网络环境,首先定义数据流模拟器,该模拟器以Socket方式监听网络中指定节点上的指定端口号(master节点9999端口),当外部程序通过该端口连接并请求数据时,数据流模拟器将定时地从指定文本文件中随机选取数据发送至指定端口(每间隔1秒钟数据流模拟器从master节点上的/home/dong/Streamingtext/file1.txt中随机截取一行文本发送给master节点的9999端口),通过这种方式模拟网络环境下源源不断的数据流。针对获取到的实时数据,再定义分析器(Spark Streaming应用程序),用以统计时间间隔(5秒)内收集到的单词个数。


数据流模拟器代码实现如下:


package dong.spark

import java.io.{PrintWriter}

import java.net.ServerSocket

import scala.io.Source

objectSocketSimulation {

//定义随机获取整数的方法。

  def index(length: Int)={

    import java.util.Random

    val rdm = new Random

    rdm.nextInt(length)

  }

  def main(args:Array[String]): Unit ={

    if(args.length!=3){

/*调用数据流模拟器需要三个参数:文件路径、端口号和批处理时间间隔时间(单位:毫秒)。*/

      System.err.println("Usage:<filename><port><millisecond>")

      System.exit(1)

    }

//获取指定文件总的行数。

    val filename = args(0)

    val lines = Source.fromFile(filename).getLines().toList

    val filerow=lines.length

//指定监听参数args(1)指定的端口,当外部程序请求时建立连接。

    val listener =new ServerSocket(args(1).toInt)

    while(true){

      val socket = listener.accept()

      new Thread(){

        override def run={

          println("Got client connected from:"+socket.getInetAddress)

          val out = new PrintWriter(socket.getOutputStream(),true)

          while(true){

            Thread.sleep(args(2).toLong)

//当该端口接受请求时,随机获取某行数据发送给对方。

            val content= lines(index(filerow))

            println(content)

            out.write(content+'\n')

            out.flush()

          }

          socket.close()

        }

      }.start()

    }

  }

}


分析器代码如下:


package dong.spark

import org.apache.spark.streaming.{Milliseconds,Seconds, StreamingContext}

import org.apache.spark.streaming.StreamingContext._

import org.apache.spark.storage.StorageLevel

object NetworkWordCount {

  def main (args:Array[String]) ={

//以local模式运行,并设定master节点工作线程数为2。

    val conf=new SparkConf().setAppName("NetworkWordCount").

               setMaster("local[2]")

    val sc=new SparkContext(conf)

    val ssc=new StreamingContext(sc, Seconds(5))

/*通过socketTextStream获取指定节点指定端口的数据创建DStream,并保存在内存和硬盘中,其中节点与端口分别对应参数args(0)和args(1)。*/


val lines=ssc.socketTextStream(args(0),

args(1).toInt,

StorageLevel.MEMORY_AND_DISK_SER)

//在每个批处理时间间隔内对获取到的数据进行单词统计并且打印。

    val words= lines.flatMap(_.split(","))

    val wordcounts = words.map(x=>(x,1)).reduceByKey(_+_)

    wordcounts.print()

    ssc.start()

    ssc.awaitTermination()

  }

}


运行演示


第1步,在IntelliJ IDEA中编辑运行Streaming程序。master节点启动IntelliJ IDEA,创建工程NetworkWordCount,编辑模拟器与分析器。模拟器如图10所示,分析器如图11所示。


图10 IntelliJ IDEA中数据流模拟器示意图


图11 IntelliJ IDEA中分析器示意图


第2步,创建模拟器数据源文件。在master节点创建/home/dong/Streamingtext目录,在其中创建文本文件file1.txt。


file1.txt内容如下:


spark,

hello,

hbase,

world,


第3步,打包数据流模拟器。打包过程详见本书4.3.3节。在Artifacts打包配置界面中,根据用户实际scala安装目录,在Class Path中添加下述scala依赖包,如图12所示。


/usr/scala-2.10.4/lib/scala-swing.jar

/usr/scala-2.10.4/lib/scala-library.jar

/usr/scala-2.10.4/lib/scala-actors.jar


图12 在Class Path中添加scala依赖包


打包后在主目录下生成NetworkWordCount.jar,如图13所示。



图13 在dong用户主目录下生成NetworkWordCount.jar示意图


第4步,启动数据流模拟器。在master节点开启控制终端,通过下面代码启动数据流模拟器。


$ java -cp /home/dong/NetworkWordCount.jar dong.spark.SocketSimulation/ home/dong/Streamingtest/file1.txt 9999 1000


数据流模拟器每间隔1000毫秒从/home/dong/Streamingtext/file1.txt中随机截取一行文本发送给master节点的9999端口。在分析器未连接时,数据流模拟器处于阻塞状态,终端不会显示输出的文本。


第5步,运行分析器。在master上启动IntelliJ IDEA编写分析器代码,然后单击菜单"Build->"Build Artifacts",通过Application选项配置分析器运行所需的参数,其中Socket主机名为master、端口号为9999,参数之间用空格间隔,如图13所示。


图13 分析器参数配置示意图


配置好参数后返回IntelliJ IDEA菜单栏,单击"Run"->"Build Artifacts"运行分析器。


查看结果


第1步,在master上查看数据流模拟器运行情况。IntelliJ IDEA运行分析器从而与数据流模拟器建立连接。当检测到外部连接时,数据流模拟器将每隔1000毫秒从/home/dong/Streamingtext/file1.txt中随机截取一行文本发送给master节点上的9999端口。为方便讲解和说明,file1.txt中每一行只包含一个单词,因此数据流模拟器每次仅发送一个单词给端口,如图14所示。


图14 在master上模拟器运行结果


第2步,在master的IntelliJ IDEA中查看分析器运行情况。在IntelliJ IDEA的运行日志窗口中,可以观察到统计结果。通过分析可知Spark Streaming每个批处理时间间隔内获取的单词数为5,刚好是5秒内发送单词的总数,并对各单词进行了统计,如图15所示。


图15 IntelliJ IDEA中分析器运行结果


stateful应用案例


在很多数据流相关的实际应用场景中,对当前数据的统计分析需要借助于先前的数据处理结果完成。例如电商每间隔10分钟统计某一商品当前累计销售总额、车站每隔3小时统计当前客流总量,等等。此类应用需求可借助于Spark Streaming的有状态转换操作实现。


功能需求


监听网络中某节点上指定端口传输的数据流(slave1节点9999端口的英文文本数据,以逗号间隔单词),每5秒分别统计各单词的累计出现次数。


代码实现


本案例功能的实现涉及数据流模拟器和分析器两部分。


分析器代码:


package dong.spark

import org.apache.spark.{SparkContext, SparkConf}

import org.apache.spark.streaming.{Milliseconds,Seconds, StreamingContext}

import org.apache.spark.streaming.StreamingContext._

object StatefulWordCount {

  def main(args:Array[String]): Unit ={

/*定义更新状态方法,参数values为当前批处理时间间隔内各单词出现的次数,state为以往所有批次各单词累计出现次数。*/

    val updateFunc=(values: Seq[Int],state:Option[Int])=>{

val currentCount=values.foldLeft(0)(_+_)

val previousCount=state.getOrElse(0)

Some(currentCount+previousCount)

    }

    val conf=new SparkConf().

setAppName("StatefulWordCount").


setMaster("spark://192.168.149.132:7077")

    val sc=new SparkContext(conf)

//创建StreamingContext,Spark Steaming运行时间间隔为5秒。

    val ssc=new StreamingContext(sc, Seconds(5))

/*使用updateStateByKey时需要checkpoint持久化接收到的数据。在集群模式下运行时,需要将持久化目录设为HDFS上的目录。*/

ssc.checkpoint("hdfs://master:9000/user/dong/input/StatefulWordCountlog")

/*通过Socket获取指定节点指定端口的数据创建DStream,其中节点与端口分别由参数args(0)和args(1)给出。*/

    val lines=ssc.socketTextStream(args(0),args(1).toInt)

    val words=lines.flatMap(_.split(","))

    val wordcounts=words.map(x=>(x,1))

//使用updateStateByKey来更新状态,统计从运行开始以来单词总的次数。

    val stateDstream=wordcounts.updateStateByKey[Int](updateFunc)

    stateDstream.print()

    ssc.start()

    ssc.awaitTermination()

  }

}


运行演示


第1步,slave1节点启动数据流模拟器。


第2步,打包分析器。master节点启动IntelliJ IDEA创建工程StatefulWordCount编辑分析器,如图16所示,并将分析器直接打包至master节点dong用户的主目录下,如图17所示。


图16 IntelliJ IDEA中StatefulWordCount示意图


图17 master上的StatefulWordCount.jar示意图


第3步,运行分析器。在master节点开启终端,通过下面代码向Spark集群提交应用程序。


$ bin/spark-submit ~/StatefulWordCount.jar slave1 9999


查看结果


第1步,查看slave1上数据流模拟器运行情况。分析器在集群上提交运行后与slave1上运行的数据流模拟器建立连接。当检测到外部连接时,数据流模拟器将每隔1000毫秒从/home/dong/Streamingtext/file1.txt中随机截取一行文本发送给slave1节点上的9999端口。由于该文本文件中每一行只包含一个单词,因此每秒仅发送一个单词给端口。如图18所示。


图18 slave1上数据流模拟器运行示意图


第2步,查看master上分析器运行情况。在master节点的提交窗口中可以查看到统计结果,如图19所示。


图19 master上分析器运行示意图


图中表明截至147920770500ms分析器共接收到14个单词,其中"spark"累计出现3次,"hbase"累计出现5次,"hello"累计出现3次,"world"累计出现3次。由于批处理时间间隔是5s,模拟器每1秒发送1个单词,使得分析器在5s内共接收到5个单词,因此截止至147920771000ms,分析器共收到19个单词,其中"spark"累计出现5次,"hbase"累计出现7次,"hello"累计出现4次,"world"累计出现3次。


第3步,查看HDFS中持久化目录。运行后查看HDFS上的持久化目录/user/dong/input/StatefulWordCountlog,如图20所示。Streaming应用程序将接收到的网络数据持久化至该目录下,便于容错处理。


图20 HDFS上持久化目录示意图


window应用案例


在实际生产环境中,与窗口相关的应用场景很常见,例如电商每间隔10分钟小时统计某一商品前30分钟内累计销售总额、车站每隔1小时统计前3个小时内的客流量等,此类需求可借助Spark Streaming中的window相关操作实现。window应用案例同时涉及批处理时间间隔、窗口时间间隔与滑动时间间隔。


功能需求


监听网络中某节点上指定端口传输的数据流(slave1节点上9999端口的英文文本数据,以逗号间隔单词),每10秒统计前30秒各单词累计出现的次数。


代码实现


本例功能的实现涉及数据流模拟器和分析器两部分。


分析器代码:


package dong.spark

import org.apache.spark.{SparkContext, SparkConf}

import org.apache.spark.streaming.StreamingContext._

import org.apache.spark.streaming._

import org.apache.spark.storage.StorageLevel

object WindowWordCount {

  def main(args:Array[String]) ={

    val conf=new SparkConf().setAppName("WindowWordCount").

setMaster("spark://192.168.149.132:7077")

    val sc=new SparkContext(conf)

    val ssc=new StreamingContext(sc, Seconds(5))

    ssc.checkpoint("hdfs://master:9000/user/dong/WindowWordCountlog")

    val lines=ssc.socketTextStream( args(0),

args(1).toInt,

StorageLevel.MEMORY_ONLY_SER)

    val words= lines.flatMap(_.split(","))

/*采用reduceByKeyAndWindow操作进行叠加处理,窗口时间间隔与滑动时间间隔分别由参数args(2)和args(3)给出。*/

    val wordcounts=words.map(x=>(x,1)).

reduceByKeyAndWindow((a:Int,b:Int)=>(a+b),Seconds(ar

gs(2).toInt),Seconds(args(3).toInt))

    wordcounts.print()

    ssc.start()

    ssc.awaitTermination()

  }

}


运行演示


第1步,slave1节点启动数据流模拟器。


第2步,打包分析器。在master节点启动IntelliJ IDEA创建工程WindowWordCount编辑分析器,如图21,并将分析器直接打包至master节点dong用户的主目录下,如图22所示。


图21 IntelliJ IDEA中WindowWordCount示意图


图22  master上WindowWordCount.jar示意图


第3步,运行分析器。在master节点开启终端,通过下面代码向Spark集群提交应用程序。


$ bin/spark-submit ~/WindowWordCount.jar slave1 9999 30 10


查看结果


第1步  在slave1上查看数据流模拟器运行情况。分析器在集群上提交运行后与slave1上运行的数据流模拟器建立连接。当检测到外部连接时,数据流模拟器将每隔1000毫秒从/home/dong/Streamingtext/file1.txt中随机截取一行文本发送给slave1节点的9999端口。由于该文本文件中每一行只包含一个单词和一个逗号,因此每秒仅发送一个单词和一个逗号给端口,如图23所示。


图23 在slave1上数据流模拟器运行示意图


第2步,在master上查看分析器运行情况。在master节点的提交窗口中可以查看到统计结果。在WindowWordCount应用程序启动初期,窗口并没有被接收到的单词填满,但随着时间的推进,每个窗口中的单词数目最终固定为30个。图7.35只是截取了运行结果中的三个批次。由于设定了窗口时间间隔是30s,滑动时间间隔是10s,且数据流模拟器每间隔1s发送一个单词,因此WindowWordCount每间隔10s对过去30s内收到的各单词个数进行统计。图24中截至1479276925000ms分析器对过去30s内收到的30个单词进行统计,其中"spark"累计出现5次,"hbase"累计出现8次,"hello"累计出现9次,"world"累计出现8次。再间隔10s,截至1479276935000ms,分析器对过去30s内收到的30个单词进行统计,其中"spark"累计出现8次,"hbase"累计出现9次,"hello"累计出现7次,"world"累计出现6次。


图24 在master上分析器运行示意图


第3步,查看持久化数据。运行后查看HDFS上的持久化目录/user/dong/input/WindowWordCountlog,如图25所示。Streaming应用程序将接收到的网络数据持久化至该目录下,便于容错处理。


图25 HDFS上持久化目录示意图


性能考量


在开发Spark Streaming应用程序时,要结合集群中各节点的配置情况尽可能地提高数据处理的实时性。在调优的过程中,一方面要尽可能利用集群资源来减少每个批处理的时间;另一方面要确保接收到的数据能及时处理掉。


运行时间优化


  • 设置合理的批处理时间和窗口大小


Spark Streaming中作业之间通常存在依赖关系,后面的作业必须确保前面的作业执行结束后才能提交,若前面的作业的执行时间超过了设置的批处理时间间隔,那么后续的作业将无法按时提交执行,造成作业的堵塞。也就是说若想Spark Streaming应用程序稳定地在集群中运行,对于接收到的数据必须尽快处理掉。例如若设定批处理时间为1秒钟,那么系统每1秒钟生成一个RDD,如果系统计算一个RDD的时间大于1秒,那么当前的RDD还没来得及处理,后续的RDD已经提交上来在等待处理了,这就产生了堵塞。因此需要设置一个合理的批处理时间间隔以确保作业能够在这个批处理时间间隔时间内结束。许多实验数据表明,500毫秒对大多Spark Streaming应用而言是较好的批处理时间间隔。


类似地,对于窗口操作,滑动时间间隔对于性能也有很大的影响。当单批次数据计算代价过高时,可以考虑适当增大滑动时间间隔。


对于批处理时间和窗口大小的设定,并没有统一的标准。通常是先从一个比较大的批处理时间(10秒左右)开始,然后不断地使用更小的值进行对比测试。如果Spark Streaming用户界面中显示的处理时间保持不变,则可以进一步设定更小的值;如果处理时间开始增加,则可能已经达到了应用的极限,再减小该值则可能会影响系统的性能。


  • 提高并行度


提高并行度也是一种减少批处理所消耗时间的常见方法。有以下三种方式可以提高并行度。一种方法是增加接收器数目。如果获取的数据太多,则可能导致单个节点来不及对数据进行读入与分发,使得接收器成为系统瓶颈。这时可以通过创建多个输入DStream来增加接收器数目,然后再使用union来把数据合并为一个数据源。第二种方法是将收到的数据显式地重新分区。如果接收器数目无法再增加,可以通过使用DStream.repartition、spark.streaming.blocklnterval等参数显式地对Dstream进行重新分区。第三种方法是提高聚合计算的并行度。对于会导致shuffle的操作,例如reduceByKey、reduceByKeyAndWindow等操作,可通过显示设置更高的行度参数确保更为充分地使用集群资源。


内存使用与垃圾回收


  • 控制批处理时间间隔内的数据量


Spark Streaming会把批处理时间间隔内获取到的所有数据存放在Spark内部可用的内存中。因此必须确保在当前节点上SparkStreaming可用的内存容量至少能容下一个批处理时间间隔内所有的数据。比如一个批处理时间间隔是1秒,但是1秒产生了1GB的数据,那么要确保当前的节点上至少有可供SparkStreaming使用的1GB内存。


  • 及时清理不再使用的数据


对于内存中处理过的、不再需要的数据应及时清理,以确保Spark Streaming能够拥有足够的内存空间可以使用。一种方法是可以通过设置合理的spark.cleaner.ttl时长来及时清理超时的无用数据,但该方法应慎重使用,以免后续数据在需要时被错误清理。另一种方法是将spark.streaming.unpersist设置为true,系统将自动清理已经不需要的RDD。该方法能显著减少RDD对内存的需要,同时潜在地提高GC的性能。此外用户还可以通过配置参数streamingContext.remember为数据设置更长的保留时间。


  • 减少序列化与反序列化的负担


SparkStreaming默认将接收到的数据序列化后放入内存,以减少内存使用。序列化和反序列化需要更多的CPU资源,因此使用适当的序列化工具(例如Kryo)和自定义的序列化接口可以更高效地使用CPU。除了使用更好的序列化工具外还可以结合压缩机制,通过配置spark.rdd.compress,以CPU的时间开销来换取内存资源,降低GC开销。




本文节选并整理自CDA数据分析师系列丛书Spark大数据分析技术与实战一书,经管之家 主编。点击阅读原文了解图书详情。


相关文章
相关标签/搜索