Spark Streaming 常见操作

Posted by Ink Bai on 2018-08-01     & views

DStream 的转换操作与 RDD 的差不多,简单的像是 map,flatMap,repartition 我们就不讲了,我们讲几个关键特殊的。

DStream 的转换操作

UpdateStateByKey 操作

顾名思义,UpdateStateByKey 转换允许你在每个批次都可以对每个键更新他们的状态,至于这个 “状态” 到底是什么,就由我们自己定了,所以首先需要定义两个元素:

  • 定义一个状态:这个状态可以是任意数据类型
  • 定义一个更新函数:这个更新函数需要两个参数,一个是前一个批次的状态,另一个是本批次最新值的集合(因为一个 key 可能对应到多个值)

在每个批次中,Spark 都会将这个函数作用到所有现存的 key 上,不管本批次有没有这个 key 的数据。如果这个更新函数返回了 None,那么这个键值对就会被删除。

通过一个实例来讲解,还是使用最简单的 WordCount 的例子,我们现在不是要计算每个批次重复单词出现的次数,而是要计算这些重复单词出现过的总次数该怎么办呢?
可以通过如下方式实现:

/**
* Counts words in UTF8 encoded, '\n' delimited text received from the network every second.
*
* Usage: NetworkWordCount <hostname> <port> <checkpoint-directory>
* <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive data.
* <checkpoint-directory> directory to HDFS-compatible file system which checkpoint data.
*
* To run this on your local machine, you need to first run a Netcat server
* `$ nc -lk 9999`
* and then run the example
* `$ bin/run-example org.apache.spark.examples.streaming.NetworkWordCount localhost 9999 /Users/will/checkpoint/`
*/
object NetworkWithStateWordCount {

def updateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = {
val newCount = if (!newValues.isEmpty) newValues.size + runningCount.getOrElse(0)
else runningCount.getOrElse(0)
Some(newCount)
}

def main(args: Array[String]) {
if (args.length < 3) {
System.err.println("Usage: NetworkWordCount <hostname> <port>")
System.exit(1)
}

// Create a local StreamingContext with two working thread and batch interval of 1 second.
// The master requires 2 cores to prevent a starvation scenario.
val sparkConf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
val ssc = new StreamingContext(sparkConf, Seconds(1))
ssc.checkpoint(args(2))

// Create a DStream on target ip:port and count the
// words in input stream of \n delimited text (eg. generated by 'nc')
// Note that no duplication in storage level only for running locally.
// Replication necessary in distributed scenario for fault tolerance.
val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)
// Split each line into words
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1)).updateStateByKey[Int](updateFunction _)
wordCounts.print()

ssc.start() // Start the computation
ssc.awaitTermination() // Wait for the computation to terminate
}
}

输出如下:

-------------------------------------------
Time: 1533030262000 ms
-------------------------------------------
(ink,1)

-------------------------------------------
Time: 1533030263000 ms
-------------------------------------------
(ink,3)
(baixin,1)

程序中的更新函数 updateFunction 将会被作用到每个 key,其中参数 newValues 是 1 的集合,runningCount 是这个 key 之前的计数值。注意要使用 updateStateByKey 操作必须设置 checkpoint,如何设置可以见我的另一篇文章:Spark Streaming Checkpoint

transform 操作

transform 的作用就是弥补 DStream 函数的不足,会将一个 RDD-to-RDD 的函数作用在 DStream 上,这样我们就可以将适用于 RDD 的所有函数都应用在 DStream 上。例如我们想实时过滤掉数据中的广告记录,过滤的基础集是之前提前计算好的广告信息数据,那么可以通过如下方式实现:

val spamInfoRDD = ssc.sparkContext.newAPIHadoopRDD(...) // RDD containing spam information

val cleanedDStream = wordCounts.transform { rdd =>
rdd.join(spamInfoRDD).filter(...) // join data stream with spam information to do data cleaning
...
}

注意在每个批次都会调用这个函数,所以在此之间我们可以对这个提前计算的 RDD 进行一些 RDD 操作,例如重分区或者广播等。

窗口操作

Spark Streaming 也支持窗口计算,即可以把一个转换作用在滑动的窗口数据上,如下图所示:

上图显示的是一个跨度为 3,每次滑动步数为 2 窗口,一个窗口操作必须指定两个参数:

  • 窗口长度:窗口的持续时间(图中是 3)
  • 滑动间隔:每次窗口操作之间的间隔(图中是 2)

这两个参数必须是 Spark Streaming 批处理间隔的倍数(图中是 1)。

用一个实例来讲解一下,前面的入门例子我们会用 reduceByKey 统计每个批次相同单词的数量,现在我们想每 2 秒统计一次过去 4 秒相同单词的数量应该怎么做呢?需要用到 reduceByKeyAndWindow 操作。

// Reduce last 4 seconds of data, every 2 seconds
val windowedWordCounts = pairs.reduceByKeyAndWindow((a:Int,b:Int) => (a + b), Seconds(4), Seconds(2))

这个窗口操作按照键和窗口进行聚合,所以需要的参数是一个聚合函数,一个窗口长度,这里我们设置为 4 s,一个窗口滑动间隔,这里是 2 s,这个函数会每两秒计算一下过去 4 s 内重复单词出现的次数,全部源码看这里:WordCountByWindow1

下面是一些常用的窗口操作:

转换 含义
window(windowLength, slideInterval) 返回一个新的窗口 DStream。
countByWindow(windowLength, slideInterval) 返回滑动窗口中所包含元素的数量。
reduceByWindow(func, windowLength, slideInterval) 通过对所有元素依次进行聚合,返回一个包含单个元素的 DStream。从 (K, V) 集合 -> (K, V)
reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks]) (K, V) 集合 -> (K, V) 集合,根据键进行聚合。注意这个函数还有个 numTasks 参数,如果不设置的话并发任务数使用 Spark 默认配置(local 模式是 2,集群模式是 spark.default.parallelism 配置的数目),设置了的话就使用这个值。
reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks]) 与上面比这个多了一个参数 invFunc,是上面窗口函数的升级版本,特点是可以进行增量计算,这样说了应该也不容易理解,详细解释还是看下面的例子吧
countByValueAndWindow(windowLength, slideInterval, [numTasks]) 针对的数据类型不是键值对,而是单值,对单个值按照窗口进行计数

上面第二个 reduceByKeyAndWindow 与第一个类似,只不过多了一个参数 invFunc,也就是 inverse function。顾名思义,第一个参数 func 是指窗口向前移动,新的时间段的数据会被加进来,第二个参数 invFunc 与之相反,老的时间段的数据会被减出去。第一个 reduceByKeyAndWindow 操作是每次都全量计算整个时间窗口内的所有数据,如果窗口时间范围比较大的话相应的吞吐量也会很大。而我们这个改良版的 reduceByKeyAndWindow 操作可以实现增量更新,即每次窗口移动只会把新的加进来和把老的移除,而对没有变化的时间段内的数据不再进行计算,很大地提高了效率,相应的源码在这里:WordCountByWindow2

DStream 的输出操作