The difference between reduceByKey and groupByKey

We all know the difference between reduceByKey and groupByKey:
- reduceByKey merges the multiple values of each key with a given function, and performs pre-aggregation on the map side.
- groupByKey also operates per key, but it only collects each key's values into a collection; there is no pre-aggregation.
So what do their implementations look like in the source?
# reduceByKey source

```scala
def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = self.withScope {
  combineByKeyWithClassTag[V]((v: V) => v, func, func, partitioner)
}
```

# groupByKey source

```scala
def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])] = self.withScope {
  val createCombiner = (v: V) => CompactBuffer(v)
  val mergeValue = (buf: CompactBuffer[V], v: V) => buf += v
  val mergeCombiners = (c1: CompactBuffer[V], c2: CompactBuffer[V]) => c1 ++= c2
  val bufs = combineByKeyWithClassTag[CompactBuffer[V]](
    createCombiner, mergeValue, mergeCombiners, partitioner, mapSideCombine = false)
  bufs.asInstanceOf[RDD[(K, Iterable[V])]]
}
```
combineByKeyWithClassTag is the lowest-level implementation underlying most of the ByKey operators. Let's look at its signature:
```scala
def combineByKeyWithClassTag[C](
    createCombiner: V => C,
    mergeValue: (C, V) => C,
    mergeCombiners: (C, C) => C,
    partitioner: Partitioner,
    mapSideCombine: Boolean = true,
    serializer: Serializer = null) ...
```
As the source shows, both reduceByKey and groupByKey call combineByKeyWithClassTag. The differences are:
- reduceByKey passes three functions that actually compute (the reduce function serves as both mergeValue and mergeCombiners), and leaves mapSideCombine at its default of true.
- groupByKey passes three functions that only build up a collection (a CompactBuffer) without computing anything, and explicitly sets mapSideCombine to false.

These two points explain why reduceByKey pre-aggregates on the map side while groupByKey does not, and why reduceByKey returns computed results while groupByKey returns collections of values.
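To make the effect of mapSideCombine concrete, here is a plain-Scala sketch (no Spark; the object and method names are illustrative, not Spark APIs) that simulates two partitions and counts how many records would cross the shuffle with and without map-side pre-aggregation:

```scala
// Plain-Scala simulation of map-side combine (illustrative only, not Spark code).
object MapSideCombineDemo {
  // Two "partitions" of (key, value) records, as a reduceByKey(_ + _) job might see them.
  val partitions: Seq[Seq[(String, Int)]] = Seq(
    Seq(("a", 1), ("a", 1), ("b", 1)),
    Seq(("a", 1), ("b", 1), ("b", 1))
  )

  // reduceByKey style: pre-aggregate per key inside each partition before the shuffle.
  def preAggregate(parts: Seq[Seq[(String, Int)]]): Seq[Seq[(String, Int)]] =
    parts.map(_.groupBy(_._1).map { case (k, vs) => (k, vs.map(_._2).sum) }.toSeq)

  // How many records would cross the shuffle under each strategy.
  def shuffleRecords(parts: Seq[Seq[(String, Int)]]): Int = parts.map(_.size).sum

  def main(args: Array[String]): Unit = {
    val combined = preAggregate(partitions)
    println(s"with mapSideCombine: ${shuffleRecords(combined)} records")      // 4
    println(s"without mapSideCombine: ${shuffleRecords(partitions)} records") // 6
  }
}
```

With pre-aggregation, each partition ships at most one record per key, which is exactly why reduceByKey shuffles less data than groupByKey on skewed or repetitive keys.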
Computing with combineByKeyWithClassTag

Next, let's understand how combineByKeyWithClassTag computes by using it directly to reimplement the functionality of reduceByKey.
# combineByKey source

```scala
def combineByKey[C](
    createCombiner: V => C,
    mergeValue: (C, V) => C,
    mergeCombiners: (C, C) => C): RDD[(K, C)] = self.withScope {
  combineByKeyWithClassTag(createCombiner, mergeValue, mergeCombiners)(null)
}
```
Since combineByKey simply delegates to combineByKeyWithClassTag, we start with combineByKey.
Case 1: sum

```scala
val rdd = sc.parallelize(List((1, 3), (1, 4), (1, 2), (2, 3), (3, 6), (3, 8)), 3)
val result = rdd.combineByKey(
  x => x,                       // createCombiner: keep the first value as the combiner
  (a: Int, b: Int) => a + b,    // mergeValue: add values within a partition
  (x: Int, y: Int) => x + y     // mergeCombiners: add partial sums across partitions
).collect()
println(result.mkString)
```
Case 2: average

```scala
val rdd = sc.parallelize(List(("a", 88), ("b", 95), ("a", 91), ("b", 93), ("a", 95), ("b", 98)), 2)
val result = rdd.combineByKey(
  (v: Int) => (v, 1),                                           // createCombiner: (sum, count)
  (a: (Int, Int), b: Int) => (a._1 + b, a._2 + 1),              // mergeValue: add value, bump count
  (x: (Int, Int), y: (Int, Int)) => (x._1 + y._1, x._2 + y._2)  // mergeCombiners: merge partials
).map { case (key, (sum, count)) =>
  // Convert before dividing; (sum / count).toDouble would truncate to an integer first.
  (key, sum.toDouble / count)
}.collect()
println(result.mkString)
```
Rewriting with combineByKeyWithClassTag

```scala
val rdd = sc.parallelize(List((1, 3), (1, 4), (1, 2), (2, 3), (3, 6), (3, 8)), 3)
val result = rdd.combineByKeyWithClassTag(
  x => x,
  (a: Int, b: Int) => a + b,
  (x: Int, y: Int) => x + y
).collect()
println(result.mkString)
```
As the code shows, the computation model of combineByKeyWithClassTag boils down to three steps: creating the initial combiner from each key's first value, local (within-partition) aggregation, and global (cross-partition) aggregation. Most ByKey operators are built this way.
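The three-step contract can be captured in a local, single-machine sketch (no Spark; localCombineByKey is an illustrative name, not a Spark API) that runs the same createCombiner/mergeValue/mergeCombiners functions over simulated partitions:

```scala
// Local sketch of the combineByKeyWithClassTag contract (illustrative, not Spark code).
object CombineByKeyLocal {
  def localCombineByKey[K, V, C](
      partitions: Seq[Seq[(K, V)]],
      createCombiner: V => C,
      mergeValue: (C, V) => C,
      mergeCombiners: (C, C) => C): Map[K, C] = {
    // Step 1 + 2: initialize a combiner per key and aggregate locally within each partition.
    val perPartition: Seq[Map[K, C]] = partitions.map { part =>
      part.foldLeft(Map.empty[K, C]) { case (acc, (k, v)) =>
        acc.updated(k, acc.get(k).fold(createCombiner(v))(c => mergeValue(c, v)))
      }
    }
    // Step 3: merge the per-partition combiners globally across partitions.
    perPartition.flatten.foldLeft(Map.empty[K, C]) { case (acc, (k, c)) =>
      acc.updated(k, acc.get(k).fold(c)(mergeCombiners(_, c)))
    }
  }

  def main(args: Array[String]): Unit = {
    val parts = Seq(
      Seq(("a", 88), ("b", 95), ("a", 91)),
      Seq(("b", 93), ("a", 95), ("b", 98))
    )
    // Sum, reduceByKey style: combiner type C is the same as the value type.
    val sums = localCombineByKey[String, Int, Int](parts, v => v, _ + _, _ + _)
    // Average: the combiner carries (sum, count), as in Case 2 above.
    val avgs = localCombineByKey[String, Int, (Int, Int)](
      parts,
      v => (v, 1),
      (c, v) => (c._1 + v, c._2 + 1),
      (c1, c2) => (c1._1 + c2._1, c1._2 + c2._2)
    ).map { case (k, (s, n)) => (k, s.toDouble / n) }
    println(sums) // a -> 274, b -> 286
    println(avgs)
  }
}
```

Swapping only the three functions switches the operator from a sum to an average, which is exactly how reduceByKey, groupByKey, and aggregateByKey all specialize the same underlying primitive.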