Spark Streaming 双流Join

需求分析

Spark Streaming实现两个流的join操作,如:一个流是订单数据,另一个流是订单详情数据,现需要将两个流按照某个公共字段连接进行join操作,同时订单数据和订单详情数据理论上是同时产生的,但考虑到实际情况即:延迟,结合Spark Streaming的批次处理实时数据的理念,这两个流的数据不一定是同时到达的,意思就是订单一的数据已经过来了,可能订单详情数据早到或者迟到,这就导致直接做join时会join不上,且数据会丢失。

需求实现

为了保证早到的数据或者迟到的数据在某个时间点能够被join上,因此需要对数据做缓存处理。前提是两个流的批次大小必须一样(比如都是10s一个批次)。首先需要明白订单数据和订单详情数据是一对多的关系,即一个订单数据可能对应多个订单详情数据(一个订单买了很多商品)。

考虑订单数据,不管订单数据来早还是来晚都需要做缓存,站在订单数据的角度上,即使同批次join上了也不能确定还有没有对应的其他订单详情数据有没有来,是来过了还是没有来,因此订单数据需要先和同批次进行join,然后无条件缓存自身同时还需要查询订单详情的缓存,缓存自身为了等待来晚的订单详情数据,查询订单详情缓存为了寻找来早的订单详情数据。

考虑订单详情数据,若同批次join上,那就结束了(该数据的生命周期结束),若同批次没有join上需要查询订单数据缓存尝试join为了确保订单数据有没有先到,若还是没有join上那就以为了这条数据来早了,因此就缓存自身等待订单数据,也就是说同批次join上意味着没有没有延迟;查找订单缓存join上意味着自己来晚了,没有join上缓存自身意味着自己来早了

缓存数据问题

首先任何支持存储的组件都能实现,但需要考虑到实时性以及数据量的问题,因此可以使用redis或者Hbase,本文选择redis作为缓存,基于如下考虑:

  1. 订单数据无条件缓存,那么是否无条件缓存下去?
  2. 订单详情数据缓存后,是否无条件缓存下去?

答案都是不会,首选订单详情数据的数据量往往是很大的,因此缓存中的数据一旦被join命中立刻删除,此时这条数据已经没有用了(不删也不会有影响,考虑一下?不删会不会造成数据重复,即会不会再次被命中),对于订单数据也无需要无限缓存下去,只要等待一段时间(大于实际的延迟时间即可)就可以删除,即使极端情况下有某条订单详情数据过来了,那也没有join的必要,已经过了实时计算的范畴了。因此使用redis来管理这些数据的生命周期(设置key的过期时间)

采用什么样的 join

与flink流的join原理不同的是,Spark双流join是对俩个流做满外连接 ,因为网络延迟等关系,不能保证每个窗口中的数据key都能匹配上,这样势必会出现三种情况:(some,some),(None,Some),(Some,None) ,根据这三种情况,下面做一下详细解析:

(Some,Some) : 1号流和2号流中key能正常进行逻辑运算,但是考虑到2号流后续可能会有剩下的数据到来,所以需要将1号流中的key保存到redis,以等待接下来的数据

(None,Some) : 找不到1号流中对应key的数据,需要去redis中查找1号流的缓存,如果找不到,则缓存起来,等待1号流

(Some,None) : 找不到2号流中的数据,需要将key保存到redis,以等待接下来的数据,并且去reids中找2号流的缓存,如果有,则join,然后删除2号流的缓存

代码实现

样例类

object domain {
case class Order(orderId: String, userId: String)
case class OrderInfo(infoId: String, orderId: String, skuId: String, skuName: String, skuNum: Int)
case class OrderDetail(var orderId: String = "",
var userId: String = "",
var infoId: String = "",
var skuId: String = "",
var skuName: String = "",
var skuNum: Int = 0) {

def mergeOrder(order: Order): OrderDetail = {
if (order != null) {
this.orderId = order.orderId
this.userId = order.userId
}
this
}

def mergeOrderInfo(orderInfo: OrderInfo): OrderDetail = {
if (orderInfo != null) {
this.infoId = orderInfo.infoId
this.skuId = orderInfo.skuId
this.skuName = orderInfo.skuName
this.skuNum = orderInfo.skuNum
}
this
}
}
}

逻辑实现

object DoubleStreamJoin {

def main(args: Array[String]): Unit = {

// 10s 一个批次
val conf = new SparkConf().setMaster("local[3]").setAppName("JoinAppV1")
val ssc = new StreamingContext(conf, Seconds(10))

// Order表
val orderStream = ssc.socketTextStream("aliyun", 9001).map(x => {
val lines = x.split(",")
(lines(0), Order(
lines(0).trim,
lines(1).trim
))
})

// Order详情表
val orderInfoStream = ssc.socketTextStream("aliyun", 9002).map(x => {
val lines = x.split(",")
(lines(1), OrderInfo(
lines(0).trim,
lines(1).trim,
lines(2).trim,
lines(3).trim,
lines(4).trim.toInt
))
})

val orderDetailRDD = orderStream.fullOuterJoin(orderInfoStream).mapPartitions(partition => {
println(" ----------------10s---------------------")
val jedis = RedisUtils.getJedis
val gson = new Gson()
val OrderDetails = partition.map({
// order:orderInfo => 1:n
case (orderId, (Some(order), Some(orderInfo))) => {
// order加入缓存,设置过期时间
jedis.setex(s"order:${orderId}", 30, gson.toJson(order))
// 得到join后的结果
val orderDetail = OrderDetail().mergeOrder(order).mergeOrderInfo(orderInfo)
// 要不要根据订单id去缓存中拿到所有缓存的订单信息(1:n 对应的订单信息可能延迟)
// orderInfo可能有多个,都是orderId对应的,如果key相同会覆盖,需要对key做处理 => orderId:orderInfoId
import scala.collection.JavaConverters._
val keys = jedis.keys(s"orderInfo:${orderId}:*").asScala
val orderDetails: mutable.Set[OrderDetail] = keys.map(key => {
// 拿到该订单所有的订单信息
val orderInfo = gson.fromJson(jedis.get(key), classOf[OrderInfo])
// 拿到信息后删除该订单信息,否则数据会重复
jedis.del(key)
// join
OrderDetail().mergeOrder(order).mergeOrderInfo(orderInfo)
})
orderDetails += orderDetail
}

// 1:n => 1:1
case (orderId, (None, Some(orderInfo))) => {
// redis中查找order
val jedisOrder = jedis.get(s"order:${orderId}")

if (jedisOrder != null) {
// 找到了进行join
val order = gson.fromJson(jedisOrder, classOf[Order])
OrderDetail().mergeOrder(order).mergeOrderInfo(orderInfo)
} else {
// 找不到了缓存到redis,不需要设置过期时间,下次被找到时立即删除
jedis.set(s"orderInfo:${orderId}:${orderInfo.infoId}", gson.toJson(orderInfo))
}
}

// 1:n
case (orderId, (Some(order), None)) => {
// 首先缓存order,给定过期时间
jedis.setex(s"order:${orderId}", 30, gson.toJson(order))
// redis中查找orderInfo
import scala.collection.JavaConverters._
val keys = jedis.keys(s"orderInfo:${orderId}:*").asScala
val details = keys.map(key => {
val orderInfo = gson.fromJson(jedis.get(key), classOf[OrderInfo])
// join上了删除orderInfo
jedis.del(key)
OrderDetail().mergeOrder(order).mergeOrderInfo(orderInfo)
})
details
}
})
RedisUtils.closeJedis(jedis)
OrderDetails
})

orderDetailRDD.foreachRDD(rdd => rdd.foreach(println))


ssc.start()
ssc.awaitTermination()
}
}
Author: Tunan
Link: http://yerias.github.io/2020/11/16/spark/39/
Copyright Notice: All articles in this blog are licensed under CC BY-NC-SA 4.0 unless stating additionally.