Spark之Transformations&Action

目录

  1. Transformations
  2. Action

Transformations

Transformations的特点是lazy的,和Scala中的lazy该念一致:延迟/懒加载,也就是不会立刻执行,只有等待遇到第一个action才会去提交作业到Spark上

转换算子

map 作用到每一个元素

输入:任意类型的函数,输出:泛型U类型的函数,返回RDD

def map[U: ClassTag](f: T => U): RDD[U]

mapPartitions 作用到每一个分区

输入:一个可迭代的类型T,输出:一个可迭代的类型U,返回RDD

def mapPartitions[U: ClassTag](
f: Iterator[T] => Iterator[U],
preservesPartitioning: Boolean = false): RDD[U]

mapPartitionsWithIndex 作用到每一个分区并打印分区数

输入:分区索引,可迭代的类型T,输出:可迭代的类型U,返回RDD

def mapPartitionsWithIndex[U: ClassTag](
f: (Int, Iterator[T]) => Iterator[U],
preservesPartitioning: Boolean = false): RDD[U]

glom() 按分区返回数组

def glom(): RDD[Array[T]]

案例:

listRDD.glom().collect().foreach(f=>f.foreach(x => println(_)))

filter() 过滤

输入:输入一个函数T,输出:一个布尔值,返回一个RDD

def filter(f: T => Boolean): RDD[T]

sample() 取样

输入:是否放回的布尔值,抽出来的概率,返回一个RDD

def sample(
withReplacement: Boolean,
fraction: Double,
seed: Long = Utils.random.nextLong): RDD[T]

distinct(x) 去重 ==> numPartitions可指定分区

输入的必须是RDD,返回的也是一个RDD

def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T]

案例:

val dist: RDD[Int] = listRDD3.distinct()

coalesce(x) 重点(小文件相关场景大量使用): ==> reduce数量决定最终输出的文件数,coalesce的作用是减少到指定分区数(x),减少分区是窄依赖
==> Spark作业遇到shuffle 会切分stage
输入一个分区数,返回一个重分区后的RDD

def coalesce(numPartitions: Int, shuffle: Boolean = false,
partitionCoalescer: Option[PartitionCoalescer] = Option.empty)
(implicit ord: Ordering[T] = null)
: RDD[T]

案例:

listRDD2.coalesce(1).getNumPartitions

双value算子

zip() 拉链 ==> 不同分区和不同元素都不能用

def zip[A1 >: A, B, That](that: GenIterable[B])(implicit bf: CanBuildFrom[Repr, (A1, B), That]): That

zipWithIndex() 打印拉链所在的分区

def zipWithIndex[A1 >: A, That](implicit bf: CanBuildFrom[Repr, (A1, Int), That]): That

案例:

val name = List("张三", "李四", "王五")
val age = List(19, 26, 38)
val zipRDD: List[((String, Int), Int)] = name.zip(age).zipWithIndex

union() 并集 ==> 分区数相加

override def union[B >: A, That](that: GenSeq[B])(implicit bf: CanBuildFrom[Repr, B, That]): That

案例:

val list1 = List(1,2,3,4,5,6)
val list2 = List(4,5,6,7,8,8,8)
val ints: List[Int] = list1.union(list2)

intersection() 交集

def intersect[B >: A](that: GenSeq[B]): Repr

案例:

val inter: List[Int] = list1.intersect(list2)

subtract() 差集

输入的必须是RDD,返回的也是一个RDD

def subtract(other: RDD[T]): RDD[T]

案例:

val sub: RDD[Int] = listRDD2.subtract(listRDD3)

cartesian() 笛卡尔积

输入的必须是RDD,返回的也是一个RDD

def cartesian[U: ClassTag](other: RDD[U]): RDD[(T, U)]

案例:

val car = listRDD2.cartesian(listRDD3)

kv算子

mapValues 得到所有ky的函数

输入:一个函数V,输出:一个值U,返回key为K,value为U的键函数对RDD

def mapValues[U](f: V => U): RDD[(K, U)]

案例:

mapRDD.groupByKey().mapValues(_.sum).print()

sortBy(x) 降序指定-x,指定任意参数

输入键值对,指定排序的值,默认升序

def sortBy[K](
f: (T) => K,
ascending: Boolean = true,
numPartitions: Int = this.partitions.length)
(implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T]

sortByKey(true|false) 只能根据key排序

默认升序为true,可指定降序为false

def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length)
: RDD[(K, V)]

案例:

mapRDD.map(x=>(x._2,x._1)).sortByKey(false).map(x=>(x._2,x._1)).print()

groupByKey 返回的kv对中的函数可迭代
==>每个数据都经过shuffle,到reduce聚合,数据量大

可指定分区数,返回一个PariRDD,包含一个Key和一个可迭代的Value

def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])]

案例:

mapRDD.groupByKey().mapValues(_.sum).print()

reduceByKey() 对value做指定的操作,直接返回函数
==>map端有Combiner先进行了一次预聚合操作,减少了网络IO传输的数据量,所以比groupByKey快
==>groupByKey的shuffle数据量明显多于reduceByKey,所以建议使用reduceByKey

输入两个值,输出一个值,返回一个PariRDD,包含一个明确的Key和一个明确的Value

def reduceByKey(func: (V, V) => V): RDD[(K, V)]

join()

两个RDDjoin,返回一个PariRDD包含一个key,两个Value

val mapRDD2 = sc.parallelize(List(("zhaoliu", 18), ("zhangsan", 22), ("list", 21), ("wangwu", 26)))
val mapRDD3 = sc.parallelize(List(("hongqi", "男"), ("zhangsan", "男"), ("list", "女"), ("wangwu", "男")))

def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))]

leftOuterJoin

两个RDDjoin,返回一个PariRDD包含一个key,一个确定的左表Value值,一个Option类型的右表Value值,即可能为空

def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))]

rightOuterJoin

两个RDDjoin,返回一个PariRDD包含一个key,一个确定的右表Value值,一个Option类型的左表Value值,即可能为空

def rightOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], W))]

fullOuterJoin

两个RDDjoin,返回一个PariRDD包含一个key,一个Option类型的右表Value值,一个Option类型的左表Value值,即都可能为空

def fullOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], Option[W]))]

cogroup

作用和join类似,不同的是返回的结果是可迭代的,而join返回的是值,原因是join底层调用了cogroup

def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))]

面试题:Spark Core 不使用distinct去重

val listRDD3 = sc.parallelize(List(4, 5, 6, 7, 8, 9, 9, 9, 8, 5))
listRDD3.map(x=>(x,null)).reduceByKey((x,y)=>x).map(_._1).print()

Action

first()

返回第一个元素,等于take(1)

def first(): T

take()

拿出指定的前N个元素,返回一个数组,结果为原始顺序

def take(num: Int): Array[T]

count()

返回元素数量,是个Long型

def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum

sum

求和,返回一个Double型

def sum(): Double

max

返回最大值,结果通过隐式转换排序过

def max()(implicit ord: Ordering[T]): T

min

返回最小值,结果通过隐式转换排序过

def min()(implicit ord: Ordering[T]): T

top()

先排降序再返回前N个元素组成的数组,字典序

def top(num: Int)(implicit ord: Ordering[T]): Array[T]

案例:升序排序

listRDD.top(3)(Ordering.by(x => -x)).foreach(println)

takeOrdered

先排降序再返回N个元素组成的数组

def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T]

案例:升序排序

listRDD.takeOrdered(3)(Ordering.by(x => x)).foreach(println)

reduce

聚合,输入两个元素输出一个元素,类型相同

def reduce(f: (T, T) => T): T

foreach

循环输出

def foreach(f: T => Unit): Unit

foreachPartition

分区循环输出

输入的是一个可迭代的类型T,输出Unit

def foreachPartition(f: Iterator[T] => Unit): Unit

案例:

listRDD.foreachPartition(x=>x.foreach(println))

countByKey

根据key统计个数,用作检测数据倾斜

def countByKey(): Map[K, Long]

lookup

根据map中的键来取出相应的值的,

def lookup(key: K): Seq[V]

案例:

mapRDD.lookup("zhangsan").foreach(println)
Author: Tunan
Link: http://yerias.github.io/2019/10/02/spark/2/
Copyright Notice: All articles in this blog are licensed under CC BY-NC-SA 4.0 unless stating additionally.