需求分析 Spark Streaming实现两个流的join操作,如:一个流是订单数据,另一个流是订单详情数据,现需要将两个流按照某个公共字段连接进行join操作,同时订单数据和订单详情数据理论上是同时产生的,但考虑到实际情况即:延迟,结合Spark Streaming的批次处理实时数据的理念,这两个流的数据不一定是同时到达的,意思就是订单一的数据已经过来了,可能订单详情数据早到或者迟到,这就导致直接做join时会join不上,且数据会丢失。
需求实现 为了保证早到的数据或者迟到的数据在某个时间点能够被join上,因此需要对数据做缓存处理。前提是两个流的批次大小必须一样(比如都是10s一个批次) 。首先需要明白订单数据和订单详情数据是一对多 的关系,即一个订单数据可能对应多个订单详情数据(一个订单买了很多商品)。
考虑订单数据,不管订单数据来早还是来晚都需要做缓存 ,站在订单数据的角度上,即使同批次join上了也不能确定还有没有对应的其他订单详情数据有没有来,是来过了还是没有来,因此订单数据需要先和同批次进行join,然后无条件缓存自身同时还需要查询订单详情的缓存,缓存自身为了等待来晚的订单详情数据,查询订单详情缓存为了寻找来早的订单详情数据。
考虑订单详情数据,若同批次join上,那就结束了(该数据的生命周期结束),若同批次没有join上需要查询订单数据缓存尝试join为了确保订单数据有没有先到,若还是没有join上那就以为了这条数据来早了,因此就缓存自身等待订单数据 ,也就是说同批次join上意味着没有没有延迟;查找订单缓存join上意味着自己来晚了,没有join上缓存自身意味着自己来早了
缓存数据问题 首先任何支持存储的组件都能实现,但需要考虑到实时性以及数据量的问题,因此可以使用redis
或者Hbase
,本文选择redis
作为缓存,基于如下考虑:
订单数据无条件缓存,那么是否无条件缓存下去?
订单详情数据缓存后,是否无条件缓存下去?
答案都是不会,首选订单详情数据的数据量往往是很大的,因此缓存中的数据一旦被join命中立刻删除,此时这条数据已经没有用了(不删也不会有影响,考虑一下?不删会不会造成数据重复,即会不会再次被命中),对于订单数据也无需要无限缓存下去,只要等待一段时间 (大于实际的延迟时间即可)就可以删除,即使极端情况下有某条订单详情数据过来了,那也没有join的必要,已经过了实时计算的范畴了。因此使用redis
来管理这些数据的生命周期(设置key的过期时间)
采用什么样的 join 与flink流的join原理不同的是,Spark双流join是对俩个流做满外连接 ,因为网络延迟等关系,不能保证每个窗口中的数据key都能匹配上,这样势必会出现三种情况:(some,some),(None,Some),(Some,None) ,根据这三种情况,下面做一下详细解析:
(Some,Some) : 1号流和2号流中key能正常进行逻辑运算,但是考虑到2号流后续可能会有剩下的数据到来,所以需要将1号流中的key保存到redis,以等待接下来的数据
(None,Some) : 找不到1号流中对应key的数据,需要去redis中查找1号流的缓存,如果找不到,则缓存起来,等待1号流
(Some,None) : 找不到2号流中的数据,需要将key保存到redis,以等待接下来的数据,并且去reids中找2号流的缓存,如果有,则join,然后删除2号流的缓存
代码实现 样例类 object domain { case class Order (orderId: String, userId: String) case class OrderInfo (infoId: String, orderId: String, skuId: String, skuName: String, skuNum: Int) case class OrderDetail (var orderId: String = "" , var userId: String = "" , var infoId: String = "" , var skuId: String = "" , var skuName: String = "" , var skuNum: Int = 0 ) { def mergeOrder (order: Order) : OrderDetail = { if (order != null ) { this .orderId = order.orderId this .userId = order.userId } this } def mergeOrderInfo (orderInfo: OrderInfo) : OrderDetail = { if (orderInfo != null ) { this .infoId = orderInfo.infoId this .skuId = orderInfo.skuId this .skuName = orderInfo.skuName this .skuNum = orderInfo.skuNum } this } } }
逻辑实现 object DoubleStreamJoin { def main (args: Array[String]) : Unit = { val conf = new SparkConf().setMaster("local[3]" ).setAppName("JoinAppV1" ) val ssc = new StreamingContext(conf, Seconds(10 )) val orderStream = ssc.socketTextStream("aliyun" , 9001 ).map(x => { val lines = x.split("," ) (lines(0 ), Order( lines(0 ).trim, lines(1 ).trim )) }) val orderInfoStream = ssc.socketTextStream("aliyun" , 9002 ).map(x => { val lines = x.split("," ) (lines(1 ), OrderInfo( lines(0 ).trim, lines(1 ).trim, lines(2 ).trim, lines(3 ).trim, lines(4 ).trim.toInt )) }) val orderDetailRDD = orderStream.fullOuterJoin(orderInfoStream).mapPartitions(partition => { println(" ----------------10s---------------------" ) val jedis = RedisUtils.getJedis val gson = new Gson() val OrderDetails = partition.map({ case (orderId, (Some(order), Some(orderInfo))) => { jedis.setex(s"order:${orderId}" , 30 , gson.toJson(order)) val orderDetail = OrderDetail().mergeOrder(order).mergeOrderInfo(orderInfo) import scala.collection.JavaConverters._ val keys = jedis.keys(s"orderInfo:${orderId}:*" ).asScala val orderDetails: mutable.Set[OrderDetail] = keys.map(key => { val orderInfo = gson.fromJson(jedis.get(key), classOf[OrderInfo]) jedis.del(key) OrderDetail().mergeOrder(order).mergeOrderInfo(orderInfo) }) orderDetails += orderDetail } case (orderId, (None, Some(orderInfo))) => { val jedisOrder = jedis.get(s"order:${orderId}" ) if (jedisOrder != null ) { val order = gson.fromJson(jedisOrder, classOf[Order]) OrderDetail().mergeOrder(order).mergeOrderInfo(orderInfo) } else { jedis.set(s"orderInfo:${orderId}:${orderInfo.infoId}" , gson.toJson(orderInfo)) } } case (orderId, (Some(order), None)) => { jedis.setex(s"order:${orderId}" , 30 , gson.toJson(order)) import scala.collection.JavaConverters._ val keys = jedis.keys(s"orderInfo:${orderId}:*" ).asScala val details = keys.map(key => { val orderInfo = gson.fromJson(jedis.get(key), classOf[OrderInfo]) jedis.del(key) OrderDetail().mergeOrder(order).mergeOrderInfo(orderInfo) }) details } }) RedisUtils.closeJedis(jedis) OrderDetails }) orderDetailRDD.foreachRDD(rdd => rdd.foreach(println)) ssc.start() ssc.awaitTermination() } }