Kafka消费时保证消息的顺序性

Kafka架构通过多个Partition提高并发,Producer生产数据的时候默认使用Hash发送数据到每个Partition,这样就造成了消费数据的时候只能保证分区内有序,而分区间无序(每个partition是一个有序的队列)。

现象

最直接的现象就是kafka消费乱序,造成日志处理的先后顺序发生了变化,比如我的业务逻辑如下:

insert into 100,100
update 100,1
update 100,2
update 100,3
delete where id=100

采集到Kafka再使用Spark Streaming处理的时候的顺序却变成了

update  100,1
delete where id=100
update 100,2
update 100,3
insert into 100,100

因为处理的顺序乱了,所以会造成丢数、多数、数据数量是一样但内容不一样等问题。

解决方案

  1. 设置一个分区,但是这样虽然保证了数据有序,但是吞吐量直线下降。

  2. 设置一个符合特征数据的key,如db-table-id

    public ProducerRecord(String topic, Integer partition, K key, V value) {
    this(topic, partition, null, key, value, null);
    }

    如上代码,在Producer发送数据时候,可以指定一个key,比如说我们指定了某个订单 id 作为 key,那么这个订单相关的数据,一定会被分发到同一个 partition 中去,而且这个 partition 中的数据一定是有顺序的。

    在maxwell -> kafka的时候,需要设置producer_partition_by=primary_key

    参数可以设置为:

    PARTITION_BY: [ database | table | primary_key | transaction_id | column | random ]

接着,我们在消费者里可能会搞多个线程来并发处理消息。因为如果消费者是单线程消费处理,而处理比较耗时的话,比如处理一条消息耗时几十 ms,那么 1 秒钟只能处理几十条消息,这吞吐量太低了。而多个线程并发跑的话,顺序可能就乱掉了。

这时候引入另外一个Producer的参数max.in.flight.requests.per.connection=1,限制客户端在单个连接上能够发送的未响应请求的个数。设置此值是1表示kafka broker在响应请求之前client不能再向同一个broker发送请求。设置此参数是为了避免消息乱序。

flume 发到 kafka也是类似的思路(未测试),及时没有提供key的设置,还可以修改源码。

其他参考链接: https://www.cnblogs.com/huxi2b/p/6056364.html

Author: Tunan
Link: http://yerias.github.io/2020/04/24/kafka/4/
Copyright Notice: All articles in this blog are licensed under CC BY-NC-SA 4.0 unless stating additionally.