In Spark, a grouped TopN is easy to write, but writing one that actually performs well is much harder. Below we implement TopN in several ways, spot the problems with each version, and fix them step by step.
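The code below assumes site.log is a tab-separated access log whose first two columns are a domain and a URL; the real file is not shown in this post, so the following is only a hypothetical sample of the expected layout:

www.baidu.com	/search/a
www.baidu.com	/search/a
www.google.com	/mail/inbox

The task is to find, for each domain, its most visited URLs (the examples take the top 2).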
Grouping, summing and sorting directly with reduceByKey
import org.apache.spark.{SparkConf, SparkContext}

def main(args: Array[String]): Unit = {
  val in = "file:///home/hadoop/data/site.log"
  // connect to the Spark master (local[2] = local mode, 2 threads)
  val conf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[2]")
  val sc = new SparkContext(conf)
  val fileRDD = sc.textFile(in)
  val mapRDD = fileRDD.map(lines => {
    val words = lines.split("\t")
    ((words(0), words(1)), 1) // ((domain, url), 1)
  })
  // sum hits per (domain, url), regroup by domain, then sort each group in memory and take the top 2
  val result = mapRDD.reduceByKey(_ + _)
    .groupBy(x => x._1._1)
    .mapValues(x => x.toList.sortBy(x => -x._2).map(x => (x._1._1, x._1._2, x._2)).take(2))
  result.foreach(println)
}

This approach is the most direct, but reduceByKey and groupBy each go through a shuffle, and x.toList materializes an entire group in memory at once; with a large data set this is a straight path to an OOM.
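To see where the two shuffles happen, you can print the lineage of the final RDD (a quick optional check, not part of the original code; result is the RDD built above):

// Each shuffle dependency shows up as a ShuffledRDD in the lineage:
// one from reduceByKey, one from groupBy.
println(result.toDebugString)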
Filtering each domain with a hand-built array

def main(args: Array[String]): Unit = {
  val in = "tunan-spark-core/data/site.log"
  // connect to the Spark master (local[2] = local mode, 2 threads)
  val conf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[2]")
  val sc = new SparkContext(conf)
  val fileRDD = sc.textFile(in)
  val mapRDD = fileRDD.map(lines => {
    val words = lines.split("\t")
    ((words(0), words(1)), 1)
  })
  // the grouping keys (domains) are listed by hand
  val domains = Array("www.google.com", "www.ruozedata.com", "www.baidu.com")
  for (domain <- domains) {
    // one pass per domain: filter, sum per URL, sort descending, take the top 2
    mapRDD.filter(x => x._1._1.equals(domain)).reduceByKey(_ + _).sortBy(x => -x._2).take(2).foreach(println)
  }
}

Core idea: pull the grouping keys out ahead of time and filter on them, handling one group per pass. This saves a shuffle, but in practice we cannot always know the keys we need in advance.
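A side note: take(2) is an action, so every iteration of the loop launches its own job over mapRDD and re-reads the input file. Caching the parsed RDD before the loop avoids the repeated reads (a small optional tweak, not in the original code):

// Keep the parsed ((domain, url), 1) records in memory so the
// per-domain jobs reuse them instead of re-reading site.log each time.
mapRDD.cache()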
Replacing the hand-built array with the array returned by distinct().collect()
def main(args: Array[String]): Unit = {
  val in = "tunan-spark-core/data/site.log"
  // connect to the Spark master (local[2] = local mode, 2 threads)
  val conf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[2]")
  val sc = new SparkContext(conf)
  val fileRDD = sc.textFile(in)
  val mapRDD = fileRDD.map(lines => {
    val words = lines.split("\t")
    ((words(0), words(1)), 1)
  })
  // collect the distinct domains from the data instead of hard-coding them
  val domains = mapRDD.map(x => x._1._1).distinct().collect()
  for (domain <- domains) {
    mapRDD.filter(x => domain.equals(x._1._1)).reduceByKey(_ + _).sortBy(x => -x._2).take(2).foreach(println)
  }
}

Some will say distinct performs poorly, but here we only deduplicate the domains, which is a small amount of data, so it is acceptable. We are still driving the computation with a for loop, one job per domain; can this be optimized further?
Replacing the for loop with per-partition execution
def main(args: Array[String]): Unit = {
  val in = "tunan-spark-core/data/site.log"
  // connect to the Spark master (local[2] = local mode, 2 threads)
  val conf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[2]")
  val sc = new SparkContext(conf)
  val fileRDD = sc.textFile(in)
  val mapRDD = fileRDD.map(lines => {
    val words = lines.split("\t")
    ((words(0), words(1)), 1)
  })
  val domains = mapRDD.map(x => x._1._1).distinct().collect()
  // reduceByKey with a custom partitioner puts each domain into its own partition,
  // so the top 2 can be computed independently inside every partition
  val mapPartRDD = mapRDD.reduceByKey(new MyPartitioner(domains), _ + _).mapPartitions(partition => {
    partition.toList.sortBy(x => -x._2).take(2).iterator
  })
  mapPartRDD.foreach(println)
}

The custom partitioner class:
import org.apache.spark.Partitioner
import scala.collection.mutable

class MyPartitioner(domains: Array[String]) extends Partitioner {
  // map each domain to its own partition index
  val map = mutable.HashMap[String, Int]()
  for (i <- 0 until domains.length) {
    map(domains(i)) = i
  }

  override def numPartitions: Int = domains.length

  override def getPartition(key: Any): Int = {
    val domain = key.asInstanceOf[(String, String)]._1
    map(domain)
  }
}

The benefit is that what used to be computed over the whole RDD at once is now computed inside each partition. toList is still memory-hungry, but it is tolerable here; the final version replaces toList as well.
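To convince yourself that the partitioner really isolates one domain per partition, a quick optional check (not in the original code) is to tag every record with its partition index:

// Emit (partitionIndex, domain) pairs; with MyPartitioner every
// partition index should map to exactly one domain.
mapRDD.reduceByKey(new MyPartitioner(domains), _ + _)
  .mapPartitionsWithIndex((idx, it) => it.map(x => (idx, x._1._1)))
  .distinct()
  .collect()
  .foreach(println)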
Replacing toList with a TreeSet for the final sort
def main(args: Array[String]): Unit = {
  val in = "tunan-spark-core/data/site.log"
  // connect to the Spark master (local[2] = local mode, 2 threads)
  val conf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[2]")
  val sc = new SparkContext(conf)
  val fileRDD = sc.textFile(in)
  val mapRDD = fileRDD.map(lines => {
    val words = lines.split("\t")
    ((words(0), words(1)), 1)
  })
  val domains = mapRDD.map(x => x._1._1).distinct().collect()

  // order by count, descending; when two different keys have the same count,
  // return a non-zero value so the TreeSet keeps both instead of treating them as duplicates
  val ord: Ordering[((String, String), Int)] = new Ordering[((String, String), Int)]() {
    override def compare(x: ((String, String), Int), y: ((String, String), Int)): Int = {
      if (!x._1.equals(y._1) && x._2 == y._2) {
        return 1
      }
      // sort descending by count
      y._2 - x._2
    }
  }

  val treeSort = mapRDD.reduceByKey(new MyPartitioner(domains), _ + _).mapPartitions(partition => {
    // keep at most 2 elements per partition: add each record, then evict the smallest
    val set = mutable.TreeSet.empty(ord)
    partition.foreach(x => {
      set.add(x)
      if (set.size > 2) {
        set.remove(set.lastKey) // remove the last (smallest-count) element
      }
    })
    set.toIterator
  }).collect()
  treeSort.foreach(println)
}

With a TreeSet and a custom Ordering, each partition only ever keeps the few records it actually needs, so memory usage stays low and this is the most efficient version.
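The bounded-TreeSet trick itself is not Spark-specific; here is a minimal standalone sketch (the object name and the sample data are made up) of keeping only the top 2 counts while streaming through records:

import scala.collection.mutable

object BoundedTopNDemo {
  def main(args: Array[String]): Unit = {
    // descending by count; different keys with equal counts are both kept, not deduplicated
    val ord: Ordering[(String, Int)] = new Ordering[(String, Int)] {
      override def compare(x: (String, Int), y: (String, Int)): Int =
        if (!x._1.equals(y._1) && x._2 == y._2) 1 else y._2 - x._2
    }

    val records = Seq(("/a", 5), ("/b", 9), ("/c", 2), ("/d", 9), ("/e", 1))
    val topN = mutable.TreeSet.empty(ord)
    records.foreach { r =>
      topN.add(r)
      if (topN.size > 2) topN.remove(topN.lastKey) // never more than 2 elements held in memory
    }
    topN.foreach(println) // prints (/b,9) and (/d,9)
  }
}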