Spark之排序模块

目录

  1. 算子排序
  2. 面向对象排序
  3. 隐式转换排序
  4. Ordering.on排序

Spark中的排序模块,顾名思义,这篇文章都是说如何排序

算子排序

的确,在Spark中有很多算子可以排序,可以给数组排序,可以给键值对排序,我们会使用算子引入排序,然后再重点介绍如何使用隐式转换达到排序的效果。

("西瓜 20 100", "苹果 10 500", "香蕉 10 30", "菠萝 30 200")

现在我们有一行数据,我们如何使它按价格降序排序?

//TODO List中每个东西的含义:名称name、价格price、库存amount
val rdd = sc.parallelize(List("西瓜 20 100", "苹果 10 500", "香蕉 10 30", "菠萝 30 200"))
//拆分map拿出来 ==> 返回tuple(productData) ==> 排序sortBy
val mapRDD = rdd.map(x => {
val split = x.split(" ")
val name = split(0)
val price = split(1).toDouble
val amount = split(2).toInt
(name, price, amount)
})
mapRDD.sortBy(-_._2).collect().foreach(println);

也许你会认为很简单,那么现在要求按价格降序排序,价格相同库存降序排序

//拆分 ==> 返回tuple() ==> 排序sortBy(x => (-x._2,-x._3))
val mapRDD2 = rdd.map(x => {
val split = x.split(" ")
val name = split(0)
val price = split(1).toDouble
val amount = split(2).toInt
(name, price, amount)
})
mapRDD2.sortBy(x=>(-x._2,-x._3)).collect().foreach(println);

的确,使用算子排序很简单,但是简洁的代码,能让你回想起几个月前的这几行是什么意思吗?

面向对象排序

现在我们引入面向对象的方式来排序

//TODO 面向对象的方式实现
val productRDD: RDD[ProductCaseClass] = rdd.map(x => {
val split = x.split(" ")
val name = split(0)
val price = split(1).toDouble
val amount = split(2).toInt
new Products(name, price, amount)
})
productRDD.sortBy(x=>x).collect().foreach(println)
class Products(val name: String, val price: Double, val amount: Int)
extends Ordered[Products]
with Serializable {
override def toString: String = s"$name | $price | $amount"

override def compare(that: Products): Int = {
that.price.toInt-this.price.toInt
}
}

你是不是傻?case class 自己实现了序列化,实现了toString、equals、hashCode,用起来还不需要new,创建class干啥?

于是再次改造

val productRDD: RDD[ProductCaseClass] = rdd.map(x => {
val split = x.split(" ")
val name = split(0)
val price = split(1).toDouble
val amount = split(2).toInt
ProductCaseClass(name, price, amount)
})
productRDD.sortBy(x=>x).collect().foreach(println)
case class ProductCaseClass(name:String,price:Double,amount:Int) extends Ordered[ProductCaseClass]{
override def toString: String = s"case class: $name | $price | $amount"
override def compare(that: ProductCaseClass): Int = that.price.toInt - this.price.toInt
}

隐式转换排序

于是呼~ 不爽,现在只给你一个最普通的类,给我增强出带排序功能的类。这不是隐式转换吗?

//一个最普通的类,不实现ordered排序的功能
class ProductsInfo(val name: String, val price: Double, val amount: Int) extends Serializable {
override def toString: String = s"ProductsInfo: $name | $price | $amount"
}

那么我们该如何改造它?

val productRDD: RDD[ProductsInfo] = rdd.map(x => {
val split = x.split(" ")
val name = split(0)
val price = split(1).toDouble
val amount = split(2).toInt
new ProductsInfo(name, price, amount)
})

我就把数据切分出来,返回一个ProductsInfo,其他啥都不干

现在最重要的就是实现隐式转换,将ProductsInfo增加排序的功能,排序的规则还要自定义

  1. 隐式方法/转换

    implicit def ProductsInfo2Orderding(productsInfo:ProductsInfo):Ordered[ProductsInfo] = new Ordered[ProductsInfo] {
    override def compare(that: ProductsInfo): Int = {
    if (that.price-productsInfo.price>0){
    1
    }else if (that.price-productsInfo.price==0 && that.amount-productsInfo.amount >0){
    1
    }else{
    -1
    }
    }
    }
  2. 隐式变量

    implicit val ProductsInfo2Orderding:Ordering[ProductsInfo] = new Ordering[ProductsInfo] {
    override def compare(x: ProductsInfo, y: ProductsInfo): Int = {
    y.amount - x.amount
    }
    }
  3. 隐式对象

    implicit object ProductsInfo22Orderding extends Ordering[ProductsInfo]{
    override def compare(x: ProductsInfo, y: ProductsInfo): Int = {
    y.amount - x.amount
    }
    }

这三种方式都可以达到增强ProductsInfo的功能,他们都离不开一个公式:

implicit def x2y(普通的x):牛逼的y = new 牛逼的y(普通的x)

上面的隐式转换就是这个公式的直接应用

Ordering.on排序

你也许会说,够了吧,这么多种方式排序了,累都累死人了,那么我告诉有一种方法,不需要case class、不需要class、不需要隐式转换,只有一行代码就能排序,学不学?

val productRDD = rdd.map(x => {
val split = x.split(" ")
val name = split(0)
val price = split(1).toDouble
val amount = split(2).toInt
(name, price, amount) //请注意,我们这里productRDD返回的是一个tuple
})

实现排序

/**
* (Double,Int) : 定义排序规则的返回值类型,可以是class
* (String,Double,Int) : 进来数据的类型
* (x => (-x._2,-x._3)) : 定义排序的规则
*/
implicit val order = Ordering[(Double,Int)].on[(String,Double,Int)](x => (-x._2,-x._3))

是不是看懵逼了?使用productRDD调用sortBy()就输出了排序后的结果

productRDD.sortBy(x => x).print()

结果:

(菠萝,30.0,200)
(西瓜,20.0,100)
(苹果,10.0,500)
(香蕉,10.0,30)
Author: Tunan
Link: http://yerias.github.io/2019/10/04/spark/4/
Copyright Notice: All articles in this blog are licensed under CC BY-NC-SA 4.0 unless stating additionally.