从源码的角度解析ByKey类算子

reduceByKey和groupByKey的区别

我们都知道reduceByKey和groupByKey的区别在于

  1. reduceByKey用于对每个key对应的多个value进行merge操作,并且在map端进行了预聚合操作。
  2. groupByKey也是对每个key进行操作,但只将key对应的value组成一个集合,没有预聚合的操作。

那么它们的源码使怎样的呢?

# reduceByKey源码
def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = self.withScope {
combineByKeyWithClassTag[V]((v: V) => v, func, func, partitioner)
}
# groupByKey源码
def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])] = self.withScope {
val createCombiner = (v: V) => CompactBuffer(v)
val mergeValue = (buf: CompactBuffer[V], v: V) => buf += v
val mergeCombiners = (c1: CompactBuffer[V], c2: CompactBuffer[V]) => c1 ++= c2
val bufs = combineByKeyWithClassTag[CompactBuffer[V]](
createCombiner, mergeValue, mergeCombiners, partitioner, mapSideCombine = false)
bufs.asInstanceOf[RDD[(K, Iterable[V])]]
}

combineByKeyWithClassTag算子是大多数ByKey类算子的最底层实现,我们看看源码。

 def combineByKeyWithClassTag[C](
createCombiner: V => C, //聚合值的类型
mergeValue: (C, V) => C, //局部聚合
mergeCombiners: (C, C) => C, //全局集合
partitioner: Partitioner, //分区数
mapSideCombine: Boolean = true, //默认开启map端预聚合
serializer: Serializer = null)
...

从源码中可以看到reduceByKey和groupByKey都调用了combineByKeyWithClassTag算子,区别在于

  1. reduceByKey中传入combineByKeyWithClassTag的前三个参数做了计算,并且没有修改mapSideCombine参数
  2. groupByKey中传入combineByKeyWithClassTag的前三个参数没有做计算(集合),并且修改mapSideCombine的参数为false

此上两点决定了reduceByKey做了map端的预聚合而groupByKey没做,reduceByKey做了计算返回结果而groupByKey没有做计算返回集合。

combineByKeyWithClassTag做计算

下面我们通过直接使用最底层的combineByKeyWithClassTag实现reduceByKey的功能来了解combineByKeyWithTag的计算原理。

# combineByKey源码
def combineByKey[C](
createCombiner: V => C,
mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C): RDD[(K, C)] = self.withScope {
combineByKeyWithClassTag(createCombiner, mergeValue, mergeCombiners)(null)
}

由于combineByKey的底层调用的combineByKeyWithClassTag,我们从combineByKey开始

案例1求和

val rdd = sc.parallelize(List((1, 3), (1, 4), (1, 2), (2, 3), (3, 6), (3, 8)), 3)

// 求和
val result = rdd.combineByKey(
x => x, // 初始化类型
(a: Int, b: Int) => a + b, // 局部聚合
(x: Int, y: Int) => x + y // 全局聚合
).collect()

println(result.mkString) // (3,14)(1,9)(2,3)

案例2求平均值

// 求平均值
val rdd = sc.parallelize(List(("a", 88), ("b", 95), ("a", 91), ("b", 93), ("a", 95), ("b", 98)), 2)
val result = rdd.combineByKey(
(_, 1), // 初始化类型
(a: (Int, Int), b: Int) => (a._1 + b, a._2 + 1), // 局部聚合
(x: (Int, Int), y: (Int, Int)) => (x._1 + y._1, x._2 + y._2)// 全局聚合
).map{
case (key, value) => { // 匹配key,value
(key,(value._1/value._2).toDouble)
}
}.collect()

println(result.mkString) // (b,95.0)(a,91.0)

使用combineByKeyWithClassTag算子重构

val rdd = sc.parallelize(List((1, 3), (1, 4), (1, 2), (2, 3), (3, 6), (3, 8)), 3)

// 求和
val result = rdd.combineByKeyWithClassTag(
x => x, // 初始化类型
(a: Int, b: Int) => a + b, // 局部聚合
(x: Int, y: Int) => x + y // 全局聚合
).collect()

println(result.mkString) // (3,14)(1,9)(2,3)

从代码可以发现,combineByKeyWithClassTag的计算原理无非就是如何实现初始化类型、局部聚合和全局聚合。

大多数ByKey类的算子都是如此。

Author: Tunan
Link: http://yerias.github.io/2019/11/04/spark/34/
Copyright Notice: All articles in this blog are licensed under CC BY-NC-SA 4.0 unless stating additionally.