Kafka架构通过多个Partition提高并发,Producer生产数据的时候默认使用Hash发送数据到每个Partition,这样就造成了消费数据的时候只能保证分区内有序,而分区间无序(每个partition是一个有序的队列)。
现象
最直接的现象就是kafka消费乱序,造成日志处理的先后顺序发生了变化,比如我的业务逻辑如下:
insert into 100,100 |
采集到Kafka再使用Spark Streaming处理的时候的顺序却变成了
update 100,1 |
因为处理的顺序乱了,所以会造成丢数、多数、数据数量是一样但内容不一样等问题。
解决方案
设置一个分区,但是这样虽然保证了数据有序,但是吞吐量直线下降。
设置一个符合特征数据的key,如
db-table-id
public ProducerRecord(String topic, Integer partition, K key, V value) {
this(topic, partition, null, key, value, null);
}如上代码,在Producer发送数据时候,可以指定一个key,比如说我们指定了某个订单 id 作为 key,那么这个订单相关的数据,一定会被分发到同一个 partition 中去,而且这个 partition 中的数据一定是有顺序的。
在maxwell -> kafka的时候,需要设置
producer_partition_by=primary_key
参数可以设置为:
PARTITION_BY: [ database | table | primary_key | transaction_id | column | random ]
接着,我们在消费者里可能会搞多个线程来并发处理消息。因为如果消费者是单线程消费处理,而处理比较耗时的话,比如处理一条消息耗时几十 ms,那么 1 秒钟只能处理几十条消息,这吞吐量太低了。而多个线程并发跑的话,顺序可能就乱掉了。
这时候引入另外一个Producer的参数max.in.flight.requests.per.connection=1
,限制客户端在单个连接上能够发送的未响应请求的个数。设置此值是1表示kafka broker在响应请求之前client不能再向同一个broker发送请求。设置此参数是为了避免消息乱序。
flume 发到 kafka也是类似的思路(未测试),及时没有提供key的设置,还可以修改源码。