目录
- Transformations
- Action
Transformations
Transformations的特点是lazy的,和Scala中的lazy该念一致:延迟/懒加载,也就是不会立刻执行,只有等待遇到第一个action才会去提交作业到Spark上
转换算子
map 作用到每一个元素
输入:任意类型的函数,输出:泛型U类型的函数,返回RDD
def map[U: ClassTag](f: T => U): RDD[U] |
mapPartitions 作用到每一个分区
输入:一个可迭代的类型T,输出:一个可迭代的类型U,返回RDD
def mapPartitions[U: ClassTag]( |
mapPartitionsWithIndex 作用到每一个分区并打印分区数
输入:分区索引,可迭代的类型T,输出:可迭代的类型U,返回RDD
def mapPartitionsWithIndex[U: ClassTag]( |
glom() 按分区返回数组
def glom(): RDD[Array[T]] |
案例:
listRDD.glom().collect().foreach(f=>f.foreach(x => println(_))) |
filter() 过滤
输入:输入一个函数T,输出:一个布尔值,返回一个RDD
def filter(f: T => Boolean): RDD[T] |
sample() 取样
输入:是否放回的布尔值,抽出来的概率,返回一个RDD
def sample( |
distinct(x) 去重 ==> numPartitions可指定分区
输入的必须是RDD,返回的也是一个RDD
def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] |
案例:
val dist: RDD[Int] = listRDD3.distinct() |
coalesce(x) 重点(小文件相关场景大量使用): ==> reduce数量决定最终输出的文件数,coalesce的作用是减少到指定分区数(x),减少分区是窄依赖
==> Spark作业遇到shuffle 会切分stage
输入一个分区数,返回一个重分区后的RDD
def coalesce(numPartitions: Int, shuffle: Boolean = false, |
案例:
listRDD2.coalesce(1).getNumPartitions |
双value算子
zip() 拉链 ==> 不同分区和不同元素都不能用
def zip[A1 >: A, B, That](that: GenIterable[B])(implicit bf: CanBuildFrom[Repr, (A1, B), That]): That |
zipWithIndex() 打印拉链所在的分区
def zipWithIndex[A1 >: A, That](implicit bf: CanBuildFrom[Repr, (A1, Int), That]): That |
案例:
val name = List("张三", "李四", "王五") |
union() 并集 ==> 分区数相加
override def union[B >: A, That](that: GenSeq[B])(implicit bf: CanBuildFrom[Repr, B, That]): That |
案例:
val list1 = List(1,2,3,4,5,6) |
intersection() 交集
def intersect[B >: A](that: GenSeq[B]): Repr |
案例:
val inter: List[Int] = list1.intersect(list2) |
subtract() 差集
输入的必须是RDD,返回的也是一个RDD
def subtract(other: RDD[T]): RDD[T] |
案例:
val sub: RDD[Int] = listRDD2.subtract(listRDD3) |
cartesian() 笛卡尔积
输入的必须是RDD,返回的也是一个RDD
def cartesian[U: ClassTag](other: RDD[U]): RDD[(T, U)] |
案例:
val car = listRDD2.cartesian(listRDD3) |
kv算子
mapValues 得到所有ky的函数
输入:一个函数V,输出:一个值U,返回key为K,value为U的键函数对RDD
def mapValues[U](f: V => U): RDD[(K, U)] |
案例:
mapRDD.groupByKey().mapValues(_.sum).print() |
sortBy(x) 降序指定-x,指定任意参数
输入键值对,指定排序的值,默认升序
def sortBy[K]( |
sortByKey(true|false) 只能根据key排序
默认升序为true,可指定降序为false
def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length) |
案例:
mapRDD.map(x=>(x._2,x._1)).sortByKey(false).map(x=>(x._2,x._1)).print() |
groupByKey 返回的kv对中的函数可迭代
==>每个数据都经过shuffle,到reduce聚合,数据量大
可指定分区数,返回一个PariRDD,包含一个Key和一个可迭代的Value
def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])] |
案例:
mapRDD.groupByKey().mapValues(_.sum).print() |
reduceByKey() 对value做指定的操作,直接返回函数
==>map端有Combiner先进行了一次预聚合操作,减少了网络IO传输的数据量,所以比groupByKey快
==>groupByKey的shuffle数据量明显多于reduceByKey,所以建议使用reduceByKey
输入两个值,输出一个值,返回一个PariRDD,包含一个明确的Key和一个明确的Value
def reduceByKey(func: (V, V) => V): RDD[(K, V)] |
join()
两个RDDjoin,返回一个PariRDD包含一个key,两个Value
val mapRDD2 = sc.parallelize(List(("zhaoliu", 18), ("zhangsan", 22), ("list", 21), ("wangwu", 26))) |
leftOuterJoin
两个RDDjoin,返回一个PariRDD包含一个key,一个确定的左表Value值,一个Option类型的右表Value值,即可能为空
def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))] |
rightOuterJoin
两个RDDjoin,返回一个PariRDD包含一个key,一个确定的右表Value值,一个Option类型的左表Value值,即可能为空
def rightOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], W))] |
fullOuterJoin
两个RDDjoin,返回一个PariRDD包含一个key,一个Option类型的右表Value值,一个Option类型的左表Value值,即都可能为空
def fullOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], Option[W]))] |
cogroup
作用和join类似,不同的是返回的结果是可迭代的,而join返回的是值,原因是join底层调用了cogroup
def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))] |
面试题:Spark Core 不使用distinct去重
val listRDD3 = sc.parallelize(List(4, 5, 6, 7, 8, 9, 9, 9, 8, 5)) |
Action
first()
返回第一个元素,等于take(1)
def first(): T |
take()
拿出指定的前N个元素,返回一个数组,结果为原始顺序
def take(num: Int): Array[T] |
count()
返回元素数量,是个Long型
def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum |
sum
求和,返回一个Double型
def sum(): Double |
max
返回最大值,结果通过隐式转换排序过
def max()(implicit ord: Ordering[T]): T |
min
返回最小值,结果通过隐式转换排序过
def min()(implicit ord: Ordering[T]): T |
top()
先排降序再返回前N个元素组成的数组,字典序
def top(num: Int)(implicit ord: Ordering[T]): Array[T] |
案例:升序排序
listRDD.top(3)(Ordering.by(x => -x)).foreach(println) |
takeOrdered
先排降序再返回N个元素组成的数组
def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T] |
案例:升序排序
listRDD.takeOrdered(3)(Ordering.by(x => x)).foreach(println) |
reduce
聚合,输入两个元素输出一个元素,类型相同
def reduce(f: (T, T) => T): T |
foreach
循环输出
def foreach(f: T => Unit): Unit |
foreachPartition
分区循环输出
输入的是一个可迭代的类型T,输出Unit
def foreachPartition(f: Iterator[T] => Unit): Unit |
案例:
listRDD.foreachPartition(x=>x.foreach(println)) |
countByKey
根据key统计个数,用作检测数据倾斜
def countByKey(): Map[K, Long] |
lookup
根据map中的键来取出相应的值的,
def lookup(key: K): Seq[V] |
案例:
mapRDD.lookup("zhangsan").foreach(println) |