Grouped TopN in Spark

In Spark, a grouped TopN is easy to write, but writing one that performs well is much harder. Below we work through several TopN implementations, spotting the problems in each version and fixing them step by step.
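
The snippets below assume that site.log is a tab-separated access log with the domain in the first column and the URL in the second; this layout is inferred from the split("\t") calls and the ((domain, url), 1) comment in the code, and the lines in this sketch are purely illustrative.

    // Illustrative only: the assumed layout of site.log and how each line
    // is parsed into ((domain, url), 1) by the map step used in every version below.
    val sampleLines = Seq(
      "www.baidu.com\t/api/search",
      "www.baidu.com\t/api/search",
      "www.google.com\t/mail",
      "www.ruozedata.com\t/index"
    )
    sampleLines.map { line =>
      val words = line.split("\t")
      ((words(0), words(1)), 1) // e.g. (("www.baidu.com", "/api/search"), 1)
    }.foreach(println)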

  1. reduceByKey followed by groupBy: aggregate, group, and sort directly

    import org.apache.spark.{SparkConf, SparkContext}

    def main(args: Array[String]): Unit = {
      val in = "file:///home/hadoop/data/site.log"
      // Create the SparkConf and SparkContext (local mode, 2 threads)
      val conf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[2]")
      val sc = new SparkContext(conf)

      val fileRDD = sc.textFile(in)

      val mapRDD = fileRDD.map(lines => {
        val words = lines.split("\t")
        ((words(0), words(1)), 1) // ((domain, url), 1)
      })

      // Sum per (domain, url), group by domain, then sort each group in memory
      // and keep the top 2 urls per domain
      val result = mapRDD.reduceByKey(_ + _)
        .groupBy(x => x._1._1)
        .mapValues(x => x.toList.sortBy(x => -x._2).map(x => (x._1._1, x._1._2, x._2)).take(2))
      result.foreach(println)
    }

    This approach is straightforward, but reduceByKey and groupBy each trigger a shuffle, and x.toList materializes an entire group in memory; with a large dataset this leads straight to an OOM. The two shuffles can be confirmed in the lineage, as the sketch below shows.
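
    A quick way to see the double shuffle, assuming the result RDD from the snippet above, is to print its lineage with toDebugString (a standard RDD API); the shuffle boundaries introduced by reduceByKey and groupBy each show up as a ShuffledRDD stage:

    // Print the lineage of the step-1 pipeline; each shuffle boundary
    // (reduceByKey, groupBy) appears as a separate ShuffledRDD.
    println(result.toDebugString)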

  2. Pre-define the groups and filter one group at a time

    def main(args: Array[String]): Unit = {
      val in = "tunan-spark-core/data/site.log"
      // Create the SparkConf and SparkContext (local mode, 2 threads)
      val conf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[2]")
      val sc = new SparkContext(conf)

      val fileRDD = sc.textFile(in)

      val mapRDD = fileRDD.map(lines => {
        val words = lines.split("\t")
        ((words(0), words(1)), 1)
      })

      // Hard-coded array of the domains (groups) we expect to see
      val domains = Array("www.google.com", "www.ruozedata.com", "www.baidu.com")

      // One pass per domain: filter, aggregate, sort descending, take the top 2
      for (domain <- domains) {
        mapRDD.filter(x => x._1._1.equals(domain)).reduceByKey(_ + _).sortBy(x => -x._2).take(2).foreach(println)
      }
    }

    The core idea: pull the group keys out ahead of time and use filter to process one group per pass. This removes one shuffle, but in practice we cannot always know the full set of groups in advance.

  3. Replace the hand-written array with the array returned by distinct().collect()

    def main(args: Array[String]): Unit = {
      val in = "tunan-spark-core/data/site.log"
      // Create the SparkConf and SparkContext (local mode, 2 threads)
      val conf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[2]")
      val sc = new SparkContext(conf)

      val fileRDD = sc.textFile(in)

      val mapRDD = fileRDD.map(lines => {
        val words = lines.split("\t")
        ((words(0), words(1)), 1)
      })

      // Collect the distinct domains back to the driver instead of hard-coding them
      val domains = mapRDD.map(x => x._1._1).distinct().collect()

      for (domain <- domains) {
        mapRDD.filter(x => domain.equals(x._1._1)).reduceByKey(_ + _).sortBy(x => -x._2).take(2).foreach(println)
      }
    }

    Some argue that distinct performs poorly, but here we only deduplicate the domains, which is a small dataset, so the cost is acceptable. We still process the data with a for loop, though, launching one job per domain; can this be optimized further?

  4. Replace the for loop with per-partition execution using a custom partitioner

    def main(args: Array[String]): Unit = {
      val in = "tunan-spark-core/data/site.log"
      // Create the SparkConf and SparkContext (local mode, 2 threads)
      val conf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[2]")
      val sc = new SparkContext(conf)

      val fileRDD = sc.textFile(in)

      val mapRDD = fileRDD.map(lines => {
        val words = lines.split("\t")
        ((words(0), words(1)), 1)
      })

      val domains = mapRDD.map(x => x._1._1).distinct().collect()

      // reduceByKey with a custom partitioner sends each domain to its own
      // partition, so the top 2 can be computed independently inside each partition
      val mapPartRDD = mapRDD.reduceByKey(new MyPartitioner(domains), _ + _).mapPartitions(partition => {
        partition.toList.sortBy(x => -x._2).take(2).iterator
      })

      mapPartRDD.foreach(println)
    }

    The custom partitioner class:

    import org.apache.spark.Partitioner
    import scala.collection.mutable

    class MyPartitioner(domains: Array[String]) extends Partitioner {

      // Map each domain to a partition index: one partition per domain
      val map = mutable.HashMap[String, Int]()
      for (i <- 0 until domains.length) {
        map(domains(i)) = i
      }

      override def numPartitions: Int = domains.length

      override def getPartition(key: Any): Int = {
        // Keys are (domain, url) tuples; partition by the domain part
        val domain = key.asInstanceOf[(String, String)]._1
        map(domain)
      }
    }
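
    As a quick sanity check (not part of the original post), mapPartitionsWithIndex, a standard RDD API, can confirm that with this partitioner every partition holds records for exactly one domain; this snippet assumes the mapRDD and domains values from the step-4 main method:

    // List the distinct domains found in each partition; with MyPartitioner
    // each partition index should map to a single domain.
    mapRDD.reduceByKey(new MyPartitioner(domains), _ + _)
      .mapPartitionsWithIndex((idx, it) => it.map(x => (idx, x._1._1)).toSet.iterator)
      .foreach(println)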

    The benefit is that what used to be computed over the whole RDD is now computed inside each partition. toList still holds an entire partition in memory, which is tolerable here, so the final version replaces toList as well.

  5. Replace toList with a TreeSet for the final sort

    def main(args: Array[String]): Unit = {
      val in = "tunan-spark-core/data/site.log"
      // Create the SparkConf and SparkContext (local mode, 2 threads)
      val conf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[2]")
      val sc = new SparkContext(conf)

      val fileRDD = sc.textFile(in)

      val mapRDD = fileRDD.map(lines => {
        val words = lines.split("\t")
        ((words(0), words(1)), 1)
      })

      val domains = mapRDD.map(x => x._1._1).distinct().collect()

      val ord: Ordering[((String, String), Int)] = new Ordering[((String, String), Int)]() {
        override def compare(x: ((String, String), Int), y: ((String, String), Int)): Int = {
          // Different keys with the same count must not compare as equal,
          // otherwise the TreeSet would treat one of them as a duplicate and drop it
          if (!x._1.equals(y._1) && x._2 == y._2) {
            return 1
          }
          // Sort by count in descending order
          y._2 - x._2
        }
      }

      // Within each partition keep at most the current top 2 in a small TreeSet
      // (requires import scala.collection.mutable, as in the partitioner above)
      val treeSort = mapRDD.reduceByKey(new MyPartitioner(domains), _ + _).mapPartitions(partition => {
        val set = mutable.TreeSet.empty(ord)
        partition.foreach(x => {
          set.add(x)
          if (set.size > 2) {
            set.remove(set.lastKey) // evict the smallest of the three
          }
        })
        set.toIterator
      }).collect()

      treeSort.foreach(println)
    }

    With a TreeSet and a custom ordering, each partition only ever holds the handful of records it actually needs, so memory usage stays low and this version is the most efficient. The sketch below illustrates why the ordering must not report distinct keys with equal counts as equal.
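
    A minimal local sketch (illustrative data only, not from the original post) of why compare returns 1 for different keys with equal counts: with a naive ordering by count alone, a TreeSet would treat two different urls with the same count as duplicates and silently keep only one of them.

    // Naive ordering by count only: two different urls with the same count
    // compare as equal, so the TreeSet keeps just one of them.
    val naiveOrd = Ordering.by[((String, String), Int), Int](x => -x._2)
    val s = scala.collection.mutable.TreeSet.empty(naiveOrd)
    s.add((("a.com", "/x"), 5))
    s.add((("a.com", "/y"), 5)) // returns false: considered a duplicate of the element above
    println(s.size) // 1, not 2; the custom ordering in step 5 avoids this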

Author: Tunan
Link: http://yerias.github.io/2019/10/10/spark/10/
Copyright Notice: All articles in this blog are licensed under CC BY-NC-SA 4.0 unless stated otherwise.