Spark Streaming 常见操作

Posted by Ink Bai on 2018-08-01, & views

DStream 的转换操作与 RDD 的差不多,简单的像是 map,flatMap,repartition 我们就不讲了,我们讲几个关键特殊的。

DStream 的转换操作

UpdateStateByKey 操作

顾名思义,UpdateStateByKey 转换允许你在每个批次都可以对每个键更新他们的状态,至于这个 “状态” 到底是什么,就由我们自己定了,所以首先需要定义两个元素:

  • 定义一个状态:这个状态可以是任意数据类型
  • 定义一个更新函数:这个更新函数需要两个参数,一个是前一个批次的状态,另一个是本批次最新值的集合(因为一个 key 可能对应到多个值)

在每个批次中,Spark 都会将这个函数作用到所有现存的 key 上,不管本批次有没有这个 key 的数据。如果这个更新函数返回了 None,那么这个键值对就会被删除。

通过一个实例来讲解,还是使用最简单的 WordCount 的例子,我们现在不是要计算每个批次重复单词出现的次数,而是要计算这些重复单词出现过的总次数该怎么办呢?
可以通过如下方式实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
/**
* Counts words in UTF8 encoded, '\n' delimited text received from the network every second.
*
* Usage: NetworkWordCount <hostname> <port> <checkpoint-directory>
* <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive data.
* <checkpoint-directory> directory to HDFS-compatible file system which checkpoint data.
*
* To run this on your local machine, you need to first run a Netcat server
* `$ nc -lk 9999`
* and then run the example
* `$ bin/run-example org.apache.spark.examples.streaming.NetworkWordCount localhost 9999 /Users/will/checkpoint/`
*/
object NetworkWithStateWordCount {

def updateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = {
val newCount = if (!newValues.isEmpty) newValues.size + runningCount.getOrElse(0)
else runningCount.getOrElse(0)
Some(newCount)
}

def main(args: Array[String]) {
if (args.length < 3) {
System.err.println("Usage: NetworkWordCount <hostname> <port>")
System.exit(1)
}

// Create a local StreamingContext with two working thread and batch interval of 1 second.
// The master requires 2 cores to prevent a starvation scenario.
val sparkConf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
val ssc = new StreamingContext(sparkConf, Seconds(1))
ssc.checkpoint(args(2))

// Create a DStream on target ip:port and count the
// words in input stream of \n delimited text (eg. generated by 'nc')
// Note that no duplication in storage level only for running locally.
// Replication necessary in distributed scenario for fault tolerance.
val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)
// Split each line into words
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1)).updateStateByKey[Int](updateFunction _)
wordCounts.print()

ssc.start() // Start the computation
ssc.awaitTermination() // Wait for the computation to terminate
}
}

输出如下:

1
2
3
4
5
6
7
8
9
10
-------------------------------------------
Time: 1533030262000 ms
-------------------------------------------
(ink,1)

-------------------------------------------
Time: 1533030263000 ms
-------------------------------------------
(ink,3)
(baixin,1)

程序中的更新函数 updateFunction 将会被作用到每个 key,其中参数 newValues 是 1 的集合,runningCount 是这个 key 之前的计数值。注意要使用 updateStateByKey 操作必须设置 checkpoint,如何设置可以见我的另一篇文章:Spark Streaming Checkpoint

transform 操作

transform 的作用就是弥补 DStream 函数的不足,会将一个 RDD-to-RDD 的函数作用在 DStream 上,这样我们就可以将适用于 RDD 的所有函数都应用在 DStream 上。例如我们想实时过滤掉数据中的广告记录,过滤的基础集是之前提前计算好的广告信息数据,那么可以通过如下方式实现:

1
2
3
4
5
6
val spamInfoRDD = ssc.sparkContext.newAPIHadoopRDD(...) // RDD containing spam information

val cleanedDStream = wordCounts.transform { rdd =>
rdd.join(spamInfoRDD).filter(...) // join data stream with spam information to do data cleaning
...
}

注意在每个批次都会调用这个函数,所以在此之间我们可以对这个提前计算的 RDD 进行一些 RDD 操作,例如重分区或者广播等。

窗口操作

Spark Streaming 也支持窗口计算,即可以把一个转换作用在滑动的窗口数据上,如下图所示:

上图显示的是一个跨度为 3,每次滑动步数为 2 窗口,一个窗口操作必须指定两个参数:

  • 窗口长度:窗口的持续时间(图中是 3)
  • 滑动间隔:每次窗口操作之间的间隔(图中是 2)

这两个参数必须是 Spark Streaming 批处理间隔的倍数(图中是 1)。

用一个实例来讲解一下,前面的入门例子我们会用 reduceByKey 统计每个批次相同单词的数量,现在我们想每 2 秒统计一次过去 4 秒相同单词的数量应该怎么做呢?需要用到 reduceByKeyAndWindow 操作。

1
2
// Reduce last 4 seconds of data, every 2 seconds
val windowedWordCounts = pairs.reduceByKeyAndWindow((a:Int,b:Int) => (a + b), Seconds(4), Seconds(2))

这个窗口操作按照键和窗口进行聚合,所以需要的参数是一个聚合函数,一个窗口长度,这里我们设置为 4 s,一个窗口滑动间隔,这里是 2 s,这个函数会每两秒计算一下过去 4 s 内重复单词出现的次数,全部源码看这里:WordCountByWindow1

下面是一些常用的窗口操作:

转换 含义
window(windowLength, slideInterval) 返回一个新的窗口 DStream。
countByWindow(windowLength, slideInterval) 返回滑动窗口中所包含元素的数量。
reduceByWindow(func, windowLength, slideInterval) 通过对所有元素依次进行聚合,返回一个包含单个元素的 DStream。从 (K, V) 集合 -> (K, V)
reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks]) (K, V) 集合 -> (K, V) 集合,根据键进行聚合。注意这个函数还有个 numTasks 参数,如果不设置的话并发任务数使用 Spark 默认配置(local 模式是 2,集群模式是 spark.default.parallelism 配置的数目),设置了的话就使用这个值。
reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks]) 与上面比这个多了一个参数 invFunc,是上面窗口函数的升级版本,特点是可以进行增量计算,这样说了应该也不容易理解,详细解释还是看下面的例子吧
countByValueAndWindow(windowLength, slideInterval, [numTasks]) 针对的数据类型不是键值对,而是单值,对单个值按照窗口进行计数

上面第二个 reduceByKeyAndWindow 与第一个类似,只不过多了一个参数 invFunc,也就是 inverse function。顾名思义,第一个参数 func 是指窗口向前移动,新的时间段的数据会被加进来,第二个参数 invFunc 与之相反,老的时间段的数据会被减出去。第一个 reduceByKeyAndWindow 操作是每次都全量计算整个时间窗口内的所有数据,如果窗口时间范围比较大的话相应的吞吐量也会很大。而我们这个改良版的 reduceByKeyAndWindow 操作可以实现增量更新,即每次窗口移动只会把新的加进来和把老的移除,而对没有变化的时间段内的数据不再进行计算,很大地提高了效率,相应的源码在这里:WordCountByWindow2

DStream 的输出操作

常见操作
输出操作 含义
print() 在 driver 结点运行,打印每个批次的 DStream 数据的前 10 条记录,一般用于开发的 debug。
saveAsTextFiles(prefix, [suffix]) 将 DStream 的内容存为文本文件,前缀是文件路径,后缀是文件后缀名称,最后文件的组合形式是 prefix-TIME_IN_MS[.suffix],如 /user/spark/file-1540540829.txt
saveAsObjectFiles(prefix, [suffix]) 将 DStream 内容保存为 Java 对象的序列化文件。
saveAsHadoopFiles(prefix, [suffix]) 将 DStream 的内容保存为 Hadoop 文件。
foreachRDD(func) 最通用的输出操作,它会将一个函数应用在 DStream 形成的每一个 RDD 上。这个函数会将数据输出到外部系统中,例如讲 RDD 保存为文件,或者存到数据库中。注意函数 func 运行在 driver 进程内。

如何正确使用 foreachRDD?

dstream.foreachRDD 是一种简单但却强大的将数据输出到外部的方法,我们应该避免下面这样的错误。

首先要将数据输出到外部我们就需要创建一个连接(例如到远程服务器的 TCP 连接),这时我们可能会这样写代码:

1
2
3
4
5
6
dstream.foreachRDD { rdd =>
val connection = createNewConnection() // executed at the driver
rdd.foreach { record =>
connection.send(record) // executed at the worker
}
}

foreachRDD 运行在 driver 节点上,而 rdd.foreach 运行在 worker 节点,这就需要将这个连接对象序列化然后从 driver 节点传输到 worker 节点,一般我们不会这样做,因此会报序列化错误 connection object not serializable 或者初始化错误 connection object needs to be initialized at the workers 等等。正确的做法应该是在 worker 节点创建连接对象。

现在我们改一下代码如下,这样是否正确呢?

1
2
3
4
5
6
7
dstream.foreachRDD { rdd =>
rdd.foreach { record =>
val connection = createNewConnection()
connection.send(record)
connection.close()
}
}

这样也是有问题的,因为这样会为每条 record 都创建一个连接对象,我们知道创建一个连接对象需要消耗时间和资源,因此为每一条记录创建并且销毁连接会导致不必要的高负载并且极大降低系统的吞吐量,更好的解决方式是使用 rdd.foreachPartition,为一个 RDD 分区内的所有记录创建一个单独的连接。

1
2
3
4
5
6
7
dstream.foreachRDD { rdd =>
rdd.foreachPartition { partitionOfRecords =>
val connection = createNewConnection()
partitionOfRecords.foreach(record => connection.send(record))
connection.close()
}
}

当然以上代码还是有可以优化的空间,我们可以使用一个固定连接池来复用连接。

1
2
3
4
5
6
7
8
dstream.foreachRDD { rdd =>
rdd.foreachPartition { partitionOfRecords =>
// ConnectionPool is a static, lazily initialized pool of connections
val connection = ConnectionPool.getConnection()
partitionOfRecords.foreach(record => connection.send(record))
ConnectionPool.returnConnection(connection) // return to the pool for future reuse
}
}

这个连接池应当在需要的时候延迟创建并且设置超时时间如果有一段时间不使用的话,这样就可以实现最高效地向外部传输数据。