Spark之WC产生多少个RDD

目录

  1. WC产生多少个RDD

WC产生多少个RDD

一句标准的WC产生了多少个RDD?

val result = sc.textFile("E:\\Java\\spark\\tunan-spark\\tunan-spark-core\\data\\wc.txt").flatMap(_.split("\t")).map((_, 1)).reduceByKey(_ + _)
result.saveAsTextFile("out")
  1. 使用toDebugString方法查看RDD的数量

    result.toDebugString(不包括saveAsTextFile方法)

    (2) ShuffledRDD[4] at reduceByKey at wordcount.scala:11 []
    +-(2) MapPartitionsRDD[3] at map at wordcount.scala:11 []
    | MapPartitionsRDD[2] at flatMap at wordcount.scala:11 []
    | E:\Java\spark\tunan-spark\tunan-spark-core\data\wc.txt MapPartitionsRDD[1] at textFile at wordcount.scala:11 []
    | E:\Java\spark\tunan-spark\tunan-spark-core\data\wc.txt HadoopRDD[0] at textFile at wordcount.scala:11 []

    上面的方法中:textFile算子中有HadoopRDD和MapPartitionsRDD;flatMap方法有MapPartitionsRDD;map方法有MapPartitionsRDD;reduceByKey方法有ShuffledRDD。

    这里一共是5个RDD,如果加上saveAsTextFile方法中的一个MapPartitionsRDD,则一共是6个RDD,如果加上sort方法也是一样的算法。

  2. 查看源码的方式计算RDD的数量

    sc.textFile("...")

    textFile的作用是从HDFS、本地文件系统(在所有节点上可用)或任何hadoop支持的文件系统URI读取文本文件,并将其作为字符串的RDD返回。

    path:受支持的文件系统上的文本文件的路径

    minPartitions:建议的结果RDD的最小分区数,默认值是2

    def textFile(
    path: String,
    minPartitions: Int = defaultMinPartitions): RDD[String] = withScope {
    assertNotStopped()
    hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
    minPartitions).map(pair => pair._2.toString).setName(path)
    }

    在textFile值调用了hadoopFile方法,该方法传入了pathTextInputFormat(也就是mapreduce中的FileInputFormat方法,特点是按行读取),LongWritable(mapreduce计算中的key,记录的是offset),Text(mapreduce计算中的key,记录的是每行的内容),minPartitions(分区数),然后返回一个tuple,tuple记录的是key和value,我们这里做了一个处理,.map(pair => pair._2.toString)方法让结果只有内容,而忽略掉了offset。

    继续看hadoopFile的源码

    使用任意的InputFormat获取Hadoop文件的RDD,返回一个RDD类型的包含(offset,value)的元组

    def hadoopFile[K, V](
    path: String, //目录下的输入数据文件,路径可以用逗号分隔路径作为输入列表
    inputFormatClass: Class[_ <: InputFormat[K, V]], //要读取的数据的存储格式
    keyClass: Class[K], //key的类型
    valueClass: Class[V], //value的类型
    minPartitions: Int = defaultMinPartitions): RDD[(K, V)] //分区数,默认值是2
    = withScope {assertNotStopped()

    // 这是一种强制加载hdfs-site.xml的方法
    FileSystem.getLocal(hadoopConfiguration)

    //一个Hadoop的配置文件大概10 KB,这是相当大的,所以广播它
    val confBroadcast = broadcast(new SerializableConfiguration(hadoopConfiguration))
    val setInputPathsFunc = (jobConf: JobConf) => FileInputFormat.setInputPaths(jobConf, path) //设置作业环境
    new HadoopRDD( //创建一个HadoopRDD
    this,
    confBroadcast, //广播配置
    Some(setInputPathsFunc), //作业环境也许可能出错,所以使用Some()
    inputFormatClass, //读取文件的格式化类
    keyClass, //key类型
    valueClass, //value类型
    minPartitions).setName(path) //分片数
    }

    因为Hadoop的RecordReader类为每条记录使用相同的Writable对象,直接保存或者直接使用aggregation或者shuffle将会产生很多对同一个对象的引用,所以我们保存、排序或者聚合操作writable对象前,要使用map方法做一个映射。

    回到上一步的textFile方法中,Hadoop的返回值是一个包含offset和value的元组,我们只需要内容,所以使用map方法做一个映射,只拿元祖中的value即可

    .map(pair => pair._2.toString)

    def map[U: ClassTag](f: T => U): RDD[U] = withScope {
    val cleanF = sc.clean(f) //初始化检查
    new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF)) ==>(context, pid, iter) ==> hadoopRDD, 分区ID, 迭代器
    }

    该map方法又创建一个MapPartitionsRDD,将map算子应用于每个分区的子RDD。这应用到了RDD五大特性之一的,对每个RDD做计算,实际上是对每个RDD的partition或者split做计算。由于MapPartitionsRDD较为复杂,暂不解析。

    到此,textFile产生了两个RDD,分别是HadoopRDD和MapPartitionsRDD。共两个RDD

.flatMap(_.split("\t"))

flatMap首先作用在每一个元素上,然后将结果扁平化,最后返回一个新的RDD

def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] = withScope {
val cleanF = sc.clean(f)
new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.flatMap(cleanF)) //对RDD的所有分区做计算
}

到此,flatMap产生了一个RDD,是MapPartitionsRDD。共三个RDD

.map((_, 1))

将map作用在每一个元素上,然后返回一个新的RDD

def map[U: ClassTag](f: T => U): RDD[U] = withScope {
val cleanF = sc.clean(f)
new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
}

到此,map产生了一个RDD,是MapPartitionsRDD。共四个RDD

.reduceByKey(_ + _)

使用联合和交换reduce函数合并每个键的值。在将结果发送到reduce之前,这也将在每个mapper上本地执行合并,类似于MapReduce中的“combiner”。输出将使用现有分区器/并行度级别进行哈希分区。

def reduceByKey(func: (V, V) => V): RDD[(K, V)] = self.withScope {
reduceByKey(defaultPartitioner(self), func)
}

继续查看reduceByKey(defaultPartitioner(self), func)方法

def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = self.withScope {
combineByKeyWithClassTag[V]((v: V) => v, func, func, partitioner)
}

继续查看combineByKeyWithClassTag[V]((v: V) => v, func, func, partitioner)方法

这是一个具体干活的方法,它使用一组自定义聚合函数组合每个键的元素。将RDD[(K, V)]转换为RDD[(K, C)]类型的结果,用于“组合类型”C。这是一个复杂的方法,暂不做解析。

def combineByKeyWithClassTag[C](
createCombiner: V => C,
mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C,
partitioner: Partitioner,
mapSideCombine: Boolean = true,
serializer: Serializer = null)(implicit ct: ClassTag[C]): RDD[(K, C)] = self.withScope {
require(mergeCombiners != null, "mergeCombiners must be defined") // required as of Spark 0.9.0
if (keyClass.isArray) {
if (mapSideCombine) {
throw new SparkException("Cannot use map-side combining with array keys.")
}
if (partitioner.isInstanceOf[HashPartitioner]) {
throw new SparkException("HashPartitioner cannot partition array keys.")
}
}
val aggregator = new Aggregator[K, V, C](
self.context.clean(createCombiner),
self.context.clean(mergeValue),
self.context.clean(mergeCombiners))
if (self.partitioner == Some(partitioner)) {
self.mapPartitions(iter => {
val context = TaskContext.get()
new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context))
}, preservesPartitioning = true)
} else {
new ShuffledRDD[K, V, C](self, partitioner) //创建ShuffledRDD
.setSerializer(serializer)
.setAggregator(aggregator)
.setMapSideCombine(mapSideCombine)
}
}

到此,reduceByKey产生了一个RDD,是ShuffledRDD。共五个RDD

.saveAsTextFile("...")

将每个元素使用字符串表示形式,将此RDD保存为文本文件。

def saveAsTextFile(path: String): Unit = withScope {
val nullWritableClassTag = implicitly[ClassTag[NullWritable]]
val textClassTag = implicitly[ClassTag[Text]]
val r = this.mapPartitions { iter => //注意这里
val text = new Text()
iter.map { x =>
text.set(x.toString)
(NullWritable.get(), text)
}
}
RDD.rddToPairRDDFunctions(r)(nullWritableClassTag, textClassTag, null)
.saveAsHadoopFile[TextOutputFormat[NullWritable, Text]](path)
}

通过每次将一个分区的数据以流的方式传入到HDFS中再关闭流

def mapPartitions[U: ClassTag](
f: Iterator[T] => Iterator[U],
preservesPartitioning: Boolean = false): RDD[U] = withScope {
val cleanedF = sc.clean(f)
new MapPartitionsRDD( //创建MapPartitionsRDD
this,
(context: TaskContext, index: Int, iter: Iterator[T]) => cleanedF(iter),
preservesPartitioning)
}

到此,saveAsTextFile产生了一个RDD,是MapPartitionsRDD。共六个RDD

20200325更新:

在最后的saveAsTextFile()算子中,我们忽略了一个RDD,它就是是PairRDDFunctions,这个RDD是通过RDD隐式转换过来的

implicit def rddToPairRDDFunctions[K, V](rdd: RDD[(K, V)])
(implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null): PairRDDFunctions[K, V] = {
new PairRDDFunctions(rdd)
}

修正后的 RDD数量是 7个

  1. 总结:使用toDebugString方法,简单的看到了生成了多少个RDD,通过阅读源码的方式,详细了解到了生成了多少个RDD,他们分别做了什么事情。我们这个流程生成了67个RDD,如果对结果进行排序,也是相同的方法可以看到答案。
Author: Tunan
Link: http://yerias.github.io/2019/10/03/spark/3/
Copyright Notice: All articles in this blog are licensed under CC BY-NC-SA 4.0 unless stating additionally.