经典案例&多目录输出&计数器&持久化&广播变量

目录

  1. 经典案例
  2. 多目录输出
  3. 计数器
  4. 持久化
  5. 广播变量

经典案例

/**
* 用户 节目 展示 点击
* 001,一起看|电视剧|军旅|亮剑,1,1
* 001,一起看|电视剧|军旅|亮剑,1,0
* 002,一起看|电视剧|军旅|士兵突击,1,1
* ==>
* 001,一起看,2,1
* 001,电视剧,2,1
* 001,军旅,2,1
* 001,亮剑,2,1
**/
object exercise02 {

def main(args: Array[String]): Unit = {

val sc = ContextUtils.getSparkContext(this.getClass.getSimpleName)

val linesRDD: RDD[String] = sc.textFile("tunan-spark-core/data/test2.txt")

//使用map返回的是一个数组,我不要数组,就使用flatMap
import com.tunan.spark.utils.ImplicitAspect.rdd2RichRDD
val map2RDD: RDD[((String, String), (Int, Int))] = linesRDD.flatMap(line => {
val words: Array[String] = line.split(",")
val programs: Array[String] = words(1).split("\\|")
val mapRDD: Array[((String, String), (Int, Int))] = programs.map(program => ((words(0), program), (words(2).toInt, words(3).toInt)))
mapRDD
})
val groupRDD: RDD[((String, String), Iterable[(Int, Int)])] = map2RDD.groupByKey()

//这里是mapValues很好的一个使用案例
val mapVRDD: RDD[((String, String), (Int, Int))] = groupRDD.mapValues(x => {
val imps: Int = x.map(_._1).sum
val check: Int = x.map(_._2).sum
(imps, check)
})

//格式化输出
mapVRDD.map(x => {
(x._1._1,x._1._2,x._2._1,x._2._1)
}).print()

sc.stop()
}
}

多目录输出

  1. 实现多目录输出自定义类

    import org.apache.hadoop.io.NullWritable
    import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat

    class MyMultipleTextOutputFormat extends MultipleTextOutputFormat[Any,Any] {
    //生成最终生成的key的类型,这里不要,给Null
    override def generateActualKey(key: Any, value: Any): Any = NullWritable.get()

    //生成最终生成的value的类型,这里是String
    override def generateActualValue(key: Any, value: Any): Any = {
    value.asInstanceOf[String]
    }

    //生成文件名
    override def generateFileNameForKeyValue(key: Any, value: Any, name: String): String = {
    s"$key/$name"
    }
    }
  2. 主类,使用saveAsHadoopFile(path, keyClass, valueClass, fm.runtimeClass.asInstanceOf[Class[F]])方法保存数据,指定参数

    object MultipleDirectory {
    def main(args: Array[String]): Unit = {
    val out = "tunan-spark-core/out"
    val sc = ContextUtils.getSparkContext(this.getClass.getSimpleName)
    CheckHDFSOutPath.ifExistsDeletePath(sc.hadoopConfiguration,out)
    //读取数组,转换成键值对的格式
    val lines = sc.textFile("tunan-spark-core/ip/access-result/*")
    val mapRDD: RDD[(String, String)] = lines.map(line => {
    val words = line.split(",")
    (words(12), line)
    })
    //多目录保存文件
    mapRDD.saveAsHadoopFile(out,classOf[String],classOf[String],classOf[MyMultipleTextOutputFormat])
    sc.stop()
    }
    }
  3. 结果

    多目录输出

在spark程序中,当一个传递给Spark操作(例如map和reduce)的函数在远程节点上面运行时,Spark操作实际上操作的是这个函数所用变量的一个独立副本。这些变量会被复制到每台机器上,并且这些变量在远程机器上的所有更新都不会传递回驱动程序。通常跨任务的读写变量是低效的,但是,Spark还是为两种常见的使用模式提供了两种有限的共享变量:广播变量(broadcast variable)和累加器(accumulator)

计数器

为什么要定义计数器?

在spark应用程序中,我们经常会有这样的需求,如异常监控调试记录符合某特性的数据的数目,这种需求都需要用到计数器,如果一个变量不被声明为一个累加器,那么它将在被改变时不会再driver端进行全局汇总,即在分布式运行时每个task运行的只是原始变量的一个副本,并不能改变原始变量的值,但是当这个变量被声明为累加器后,该变量就会有分布式计数的功能。

图解计数器

错误的图解

累加器错误的图解

正确的图解

累加器正确的图解

计数器种类很多,但是经常用的就是两种,longAccumulatorcollectionAccumulator

需要注意的是计数器是lazy的,只有触发action才会进行计数,在不持久化的情况下重复触发action,计数器会重复累加

LongAccumulator

Accumulators 是只能通过associative和commutative操作“added”的变量,因此可以有效地并行支持。它们可用于实现计数器(如MapReduce)和Spark本身支持数字类型的累加器,程序员还可以添加对新类型的支持

longAccumulator通过累加的方式计数

object MyLongAccumulator {

def main(args: Array[String]): Unit = {

val sc = ContextUtils.getSparkContext(this.getClass.getSimpleName)
var acc = sc.longAccumulator("计数")

val rdd = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9))

val forRDD = rdd.map(x => {
// 计数器做累加
acc.add(1L)
})
// action操作
forRDD.count()

println(acc.value) // 9
}
}

使用longAccumulator做计数的时候要小心重复执行action导致的acc.value的变化

object MyLongAccumulatorV2{
def main(args: Array[String]): Unit = {
val sc = ContextUtils.getSparkContext(this.getClass.getSimpleName)
//生成计数器
val acc = sc.longAccumulator("计数")
val rdd = sc.parallelize(1 to 8)
val forRDD = rdd.map(x => {
//计数器做累加
acc.add(1L)
})
forRDD.count()
println(acc.value) //8
forRDD.count()
println(acc.value) //16
}
}

由于重复执行了count(),累加器的数量成倍增长,解决这种错误累加也很简单,就是在count之前调用forRDD的cache方法(或persist),这样在count后数据集就会被缓存下来,reduce操作就会读取缓存的数据集,而无需从头开始计算。

object MyLongAccumulator {
def main(args: Array[String]): Unit = {
val sc = ContextUtils.getSparkContext(this.getClass.getSimpleName)
//生成计数器
val acc = sc.longAccumulator("计数")
val rdd = sc.parallelize(1 to 8)
val forRDD = rdd.map(x => {
//计数器做累加
acc.add(1L)
})
forRDD.cache().count()
println(acc.value) //8
forRDD.count()
println(acc.value) //8
}
}

CollectionAccumulator

collectionAccumulator,集合计数器,计数器中保存的是集合元素,通过泛型指定。

/**
* 需求:id后三位相同的加入计数器
*/
object MyCollectionAccumulator {
def main(args: Array[String]): Unit = {
val sc = ContextUtils.getSparkContext(this.getClass.getSimpleName)

//生成集合计数器
val acc = sc.collectionAccumulator[People]("集合计数器")

//生成RDD
val rdd: RDD[People] = sc.parallelize(Array(People("tunan", 100000), People("xiaoqi", 100001), People("张三", 100222), People("李四", 100003)))

//map操作
rdd.map(x => {
val id2 = x.id.toString.reverse
//满足条件就加入计数器,
if (id2(0) == id2(1) && id2(0) ==id2(2)){
acc.add(x)
}
}).count() //触发action

println(acc.value) //[People(张三,100222), People(tunan,100000)]
}
case class People(name:String,id:Long);
}

注意事项:

  1. 计数器在Driver端定义赋初始值,计数器只能在Driver端读取最后的值,在Excutor端更新。

  2. 计数器不是一个调优的操作,因为如果不这样做,结果是错的

持久化

Spark中最重要的功能之一是跨操作在内存中持久化数据集。持久化一个RDD时,每个节点在内存中存储它计算的任何分区,并在该数据集(或从中派生的数据集)的其他操作中重构它们。这使得将来的操作要快得多(通常超过10倍)。缓存是迭代算法和快速交互使用的关键工具。

可以使用其上的persist()或cache()方法将RDD标记为持久的。第一次在操作中计算它时,它将保存在节点的内存中。Spark的缓存是容错的——如果一个RDD的任何分区丢失了,它将使用最初创建它的转换自动重新计算。

持久化的存储级别很多,常用的是MEMORY_ONLY、MEMORY_ONLY_SER、MEMORY_AND_DISK

Storage Level Meaning
MEMORY_ONLY 将RDD作为不序列化的Java对象存储在JVM中。如果RDD不适合内存,那么一些分区将不会被缓存,而是在需要它们时动态地重新计算。这是默认级别。
MEMORY_AND_DISK 将RDD作为不序列化的Java对象存储在JVM中。如果RDD不适合内存,那么将不适合的分区存储在磁盘上,并在需要时从那里读取它们。
MEMORY_ONLY_SER (Java and Scala) 将RDD存储为序列化的Java对象(每个分区一个字节数组)。这通常比反序列化对象更节省空间,特别是在使用快速序列化器时,但读取时需要更多cpu。

如何选择它们?

Storage Level的选择是内存和CPU的权衡

  1. 内存多:MEMORY_ONLY (不进行序列化)
  2. CPU跟的上:MEMORY_ONLY_SER (进行了序列化,推介)
  3. 不建议写Disk

使用cache()和persist()进行持久化操作,它们都是lazy的,需要action才能触发,默认使用MEMORY_ONLY

scala> forRDD.cache
res18: forRDD.type = MapPartitionsRDD[9] at map at <console>:27

scala> forRDD.count
res19: Long = 8

结果可以在Web UI的Storage中查看

如果需要清除缓存,使用unpersist(),清除缓存数据是立即执行的

scala> forRDD.unpersist()
res8: forRDD.type = MapPartitionsRDD[3] at map at <console>:28

怎么修改存储级别?

val forRDD = rdd.map(x => {
//计数器做累加
acc.add(1L)
}).persist(StorageLevel.MEMORY_ONLY_SER).count()

StorageLevel是个object,需要的级别都可以从里面拿出来

考点:cache和persist有什么区别?

  • cache调用的persist,persist调用的persist(storage level)

考点:序列化和非序列化有什么区别?

  • 序列化将对象转换成字节数组了,节省空间,占CPU

Removing Data

Spark自动监视每个节点上的缓存使用情况,并以最近最少使用(LRU)的方式删除旧的数据分区。如果想要手动删除一个RDD,而不是等待它从缓存中消失,那么可以使用RDD.unpersist()方法。

伪代码以及画图表示出什么是LRU?

广播变量

为什么要将变量定义成广播变量?

如果我们要在分布式计算里面分发大对象,例如:字典集合黑白名单等,这个都会由Driver端进行分发,一般来讲,如果这个变量不是广播变量,那么每个task就会分发一份,这在task数目十分多的情况下Driver的带宽会成为系统的瓶颈,而且会大量消耗task服务器上的资源,如果将这个变量声明为广播变量,那么知识每个executor拥有一份,这个executor启动的task会共享这个变量,节省了通信的成本和服务器的资源。

广播变量图解

错误的,不使用广播变量

不使用广播变量

正确的,使用广播变量的情况

使用广播变量

小表广播案例

使用广播变量的场景很多, 我们都知道spark 一种常见的优化方式就是小表广播, 使用 map join 来代替 reduce join, 我们通过把小的数据集广播到各个节点上,节省了一次特别 expensive 的 shuffle 操作。

比如driver 上有一张数据量很小的表, 其他节点上的task 都需要 lookup 这张表, 那么 driver 可以先把这张表 copy 到这些节点,这样 task 就可以在本地查表了。

  1. Fact table 航线(起点机场, 终点机场, 航空公司, 起飞时间)

    SEA,JFK,DL,7:00
    SFO,LAX,AA,7:05
    SFO,JFK,VX,7:05
    JFK,LAX,DL,7:10
    LAX,SEA,DL,7:10
  2. Dimension table 机场(简称, 全称, 城市, 所处城市简称)

    JFK,John F. Kennedy International Airport,New York,NY
    LAX,Los Angeles International Airport,Los Angeles,CA
    SEA,Seattle-Tacoma International Airport,Seattle,WA
    SFO,San Francisco International Airport,San Francisco,CA
  3. Dimension table 航空公司(简称,全称)

    AA,American Airlines
    DL,Delta Airlines
    VX,Virgin America
  4. 思路:将机场维度表和航空公司维度表进行广播,生成Map,航线事实表从广播变量中通过key拿到value(计算在每个executor上)

  5. 代码

    object BroadcastApp {
    def main(args: Array[String]): Unit = {
    val sc: SparkContext = ContextUtils.getSparkContext(this.getClass.getSimpleName)

    // Fact table 航线(起点机场, 终点机场, 航空公司, 起飞时间)
    val flights = sc.textFile("tunan-spark-core/broadcast/flights.txt")

    // Dimension table 机场(简称, 全称, 城市, 所处城市简称)
    val airports: RDD[String] = sc.textFile("tunan-spark-core/broadcast/airports.txt")

    // Dimension table 航空公司(简称,全称)
    val airlines = sc.textFile("tunan-spark-core/broadcast/airlines.txt")

    /**
    * 最终统计结果:
    * 出发城市 终点城市 航空公司名称 起飞时间
    * Seattle New York Delta Airlines 7:00
    * San Francisco Los Angeles American Airlines 7:05
    * San Francisco New York Virgin America 7:05
    * New York Los Angeles Delta Airlines 7:10
    * Los Angeles Seattle Delta Airlines 7:10
    */

    //广播Dimension Table airport,生成Map
    val airportsBC = sc.broadcast(airports.map(x => {
    val words = x.split(",")
    (words(0), words(2))
    }).collectAsMap())

    //广播Dimension Table airlines,生成Map
    val airlinesBC = sc.broadcast(airlines.map(x => {
    val words = x.split(",")
    (words(0), words(1))
    }).collectAsMap())

    //通过key获取value
    flights.map(lines => {
    val words = lines.split(",")
    val a = airportsBC.value.get(words(0)).get
    val b = airportsBC.value.get(words(1)).get
    val c = airlinesBC.value.get(words(2)).get
    a+" "+b+" "+c+" "+words(3)
    }).foreach(println)
    sc.stop()
    }
    }
  6. 结果

    New York    	Los Angeles     Delta Airlines    7:10
    Los Angeles Seattle Delta Airlines 7:10
    Seattle New York Delta Airlines 7:00
    San Francisco Los Angeles American Airlines 7:05
    San Francisco New York Virgin America 7:05

为什么只能 broadcast 只读的变量

这就涉及一致性的问题,如果变量可以被更新,那么一旦变量被某个节点更新,其他节点要不要一块更新?如果多个节点同时在更新,更新顺序是什么?怎么做同步? 仔细想一下, 每个都很头疼, spark 目前就索性搞成了只读的。 因为分布式强一致性真的很蛋疼

注意事项

  1. 变量一旦被定义为一个广播变量,那么这个变量只能读,不能修改

  2. 能不能将一个RDD使用广播变量广播出去?因为RDD是不存储数据的。可以将RDD的结果广播出去。

  3. 广播变量只能在Driver端定义,不能在Executor端定义。

  4. 在Driver端可以修改广播变量的值,在Executor端无法修改广播变量的值。

  5. 如果executor端用到了Driver的变量,不使用广播变量在Executor有多少task就有多少Driver端的变量副本。

  6. 如果Executor端用到了Driver的变量,使用广播变量在每个Executor中只有一份Driver端的变量副本。

Author: Tunan
Link: http://yerias.github.io/2019/10/09/spark/9/
Copyright Notice: All articles in this blog are licensed under CC BY-NC-SA 4.0 unless stating additionally.