Kafka: Batch Tuning, Partition Data Migration, and the Aftermath of Deleting a Topic

Contents

  1. Kafka batch tuning
  2. Partition data migration
  3. Aftermath of deleting a topic

Kafka batch tuning

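When you create a topic you must set two values: the number of partitions and the replication factor. The partition count determines the topic's throughput (its parallelism). The replication factor sets how many copies of each partition Kafka keeps, which determines its reliability.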

Choose the number of partitions according to your company's capacity, with a minimum of 3.

Scenario (SS below means Spark Streaming):

1250 * 8 = 10,000   # SS rate per partition * partitions
2500 * 8 = 20,000   # adjusted SS rate per partition * partitions
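The arithmetic above is the per-partition rate multiplied by the partition count. The following is a minimal sketch. It assumes the SS rate is a per-partition cap in records per second. In Spark Streaming's direct Kafka integration, this cap is presumably `spark.streaming.kafka.maxRatePerPartition`. The function name is hypothetical.

```python
def max_records_per_second(rate_per_partition: int, partitions: int) -> int:
    """Upper bound on records/second pulled from Kafka.

    Assumes Kafka partitions map 1:1 to Spark Streaming partitions and that
    rate_per_partition is the per-partition cap in records/second.
    """
    return rate_per_partition * partitions


if __name__ == "__main__":
    print(max_records_per_second(1250, 8))  # original rate -> 10000
    print(max_records_per_second(2500, 8))  # adjusted rate -> 20000
```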

Kafka consumption throughput is usually tuned from two directions: the consumption rate configured in SS, and the number of Kafka partitions.

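Kafka partitions map 1:1 to SS partitions. You must make sure the adjusted rate does not introduce processing delay. That delay is governed by SS's processing time, because SS can only process a limited amount of data in a fixed time window.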

Case study:

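Suppose the batch interval is 3 s and each batch originally took 3 s to process. If processing takes 5 s after the rate increase, each batch adds 2 s of delay. The next batch is delayed by another 2 s, and the one after that by 2 s more. The backlog keeps growing until it snowballs.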
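The snowball effect can be reproduced with a small simulation. This is a sketch under simplifying assumptions: a new batch is created every `batch_interval` seconds, batches are processed strictly one at a time, and processing time is constant. The function name is hypothetical.

```python
from typing import List


def scheduling_delays(batch_interval: float, processing_time: float, batches: int) -> List[float]:
    """Return how long each batch waits (seconds) before its processing can start."""
    delays: List[float] = []
    busy_until = 0.0  # moment the previous batch finishes processing
    for i in range(batches):
        created_at = i * batch_interval       # when the batch is generated
        start = max(created_at, busy_until)   # it must wait for the previous batch
        delays.append(start - created_at)
        busy_until = start + processing_time
    return delays


if __name__ == "__main__":
    print(scheduling_delays(3, 3, 4))  # processing keeps up: [0, 0, 0, 0]
    print(scheduling_delays(3, 5, 4))  # 2 s added per batch: [0, 2, 4, 6]
```

Whenever the processing time exceeds the batch interval, the delay grows linearly and never recovers on its own.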

3 s batch: 3 * 1250 * 8 = 30,000 records   # empirical, stable value, verified over a long period
3 s batch: 3 * 2500 * 8 = 60,000 records   # the job hung
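Records per batch equals the batch interval times the per-partition rate times the partition count. Inverting the formula gives the highest per-partition rate that stays within a batch size you have already proven stable. The function names below are hypothetical. Treating 30,000 records per 3 s batch as the safe limit is the author's empirical figure for their own cluster, not a general rule.

```python
def records_per_batch(batch_seconds: int, rate_per_partition: int, partitions: int) -> int:
    """Maximum number of records a single batch can contain."""
    return batch_seconds * rate_per_partition * partitions


def max_safe_rate(safe_batch_records: int, batch_seconds: int, partitions: int) -> int:
    """Largest per-partition rate whose batches stay within a proven-stable size."""
    return safe_batch_records // (batch_seconds * partitions)


if __name__ == "__main__":
    print(records_per_batch(3, 1250, 8))  # 30000: the stable value
    print(records_per_batch(3, 2500, 8))  # 60000: the job hung
    print(max_safe_rate(30000, 3, 8))     # 1250
```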

Partition data migration

Source: the official Kafka documentation. Remember: when you create a topic, think carefully about how many partitions to give it.

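The partition reassignment tool can move some topics off the current set of brokers and onto newly added brokers. This is typically useful when expanding an existing cluster, because moving entire topics to the new set of brokers is easier than moving one partition at a time. When used this way, the user provides a list of topics to move and a target list of new brokers. The tool then distributes all partitions of the given topics evenly across the new set of brokers. The replication factor of each topic stays the same during the move. In effect, the replicas of every partition of the listed topics move from the old set of brokers to the newly added brokers.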
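The idea of spreading every partition of the chosen topics evenly over the new brokers, while keeping the replication factor, can be sketched as a simple round-robin placement. This is an illustration only. Kafka's own placement logic differs (for example, it picks a random starting broker), so its proposals will not match this output exactly. The function name is hypothetical.

```python
from typing import Dict, List


def propose_assignment(partition_counts: Dict[str, int], brokers: List[int],
                       replication_factor: int) -> List[dict]:
    """Round-robin every partition of the given topics over `brokers`.

    Each partition keeps `replication_factor` replicas. The first replica in
    each list (the preferred leader) rotates, so leaders are spread evenly.
    """
    if replication_factor > len(brokers):
        raise ValueError("replication factor cannot exceed the number of target brokers")
    assignment = []
    slot = 0
    for topic in sorted(partition_counts):
        for partition in range(partition_counts[topic]):
            replicas = [brokers[(slot + r) % len(brokers)] for r in range(replication_factor)]
            assignment.append({"topic": topic, "partition": partition, "replicas": replicas})
            slot += 1
    return assignment


if __name__ == "__main__":
    # foo1 and foo2 each have 3 partitions with replication factor 2, as in the foo1/foo2 example.
    for entry in propose_assignment({"foo1": 3, "foo2": 3}, [5, 6], 2):
        print(entry)
```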

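For example, the following moves all partitions of topics foo1 and foo2 to the new set of brokers 5 and 6. When this step finishes, all partitions of foo1 and foo2 exist only on brokers 5 and 6.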

The tool accepts the input list of topics as a JSON file. First identify the topics you want to move, then create the JSON file as follows:

> cat topics-to-move.json
{"topics": [{"topic": "foo1"},
            {"topic": "foo2"}],
"version":1
}
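When many topics have to be moved, generating this file is less error-prone than typing it by hand. The following minimal sketch produces the same structure as the file above. The function name is hypothetical.

```python
import json
from typing import List


def build_topics_to_move(topics: List[str]) -> dict:
    """Build the document passed to --topics-to-move-json-file."""
    return {"topics": [{"topic": t} for t in topics], "version": 1}


if __name__ == "__main__":
    with open("topics-to-move.json", "w", encoding="utf-8") as f:
        json.dump(build_topics_to_move(["foo1", "foo2"]), f, indent=2)
```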

Once the JSON file is ready, use the partition reassignment tool to generate a candidate assignment:

> bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --topics-to-move-json-file topics-to-move.json --broker-list "5,6" --generate
Current partition replica assignment

{"version":1,
"partitions":[{"topic":"foo1","partition":2,"replicas":[1,2]},
{"topic":"foo1","partition":0,"replicas":[3,4]},
{"topic":"foo2","partition":2,"replicas":[1,2]},
{"topic":"foo2","partition":0,"replicas":[3,4]},
{"topic":"foo1","partition":1,"replicas":[2,3]},
{"topic":"foo2","partition":1,"replicas":[2,3]}]
}

Proposed partition reassignment configuration

{"version":1,
"partitions":[{"topic":"foo1","partition":2,"replicas":[5,6]},
{"topic":"foo1","partition":0,"replicas":[5,6]},
{"topic":"foo2","partition":2,"replicas":[5,6]},
{"topic":"foo2","partition":0,"replicas":[5,6]},
{"topic":"foo1","partition":1,"replicas":[5,6]},
{"topic":"foo2","partition":1,"replicas":[5,6]}]
}

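The tool generates a candidate assignment that moves all partitions of topics foo1 and foo2 to brokers 5 and 6. Note that no partitions have moved yet. At this point the tool only shows the current assignment and the proposed new one. Save the current assignment in case you want to roll back to it. Save the new assignment in a JSON file (e.g. expand-cluster-reassignment.json) and pass it to the tool with the --execute option, as follows: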

> bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file expand-cluster-reassignment.json --execute
Current partition replica assignment

{"version":1,
"partitions":[{"topic":"foo1","partition":2,"replicas":[1,2]},
{"topic":"foo1","partition":0,"replicas":[3,4]},
{"topic":"foo2","partition":2,"replicas":[1,2]},
{"topic":"foo2","partition":0,"replicas":[3,4]},
{"topic":"foo1","partition":1,"replicas":[2,3]},
{"topic":"foo2","partition":1,"replicas":[2,3]}]
}

Save this to use as the --reassignment-json-file option during rollback
Successfully started reassignment of partitions
{"version":1,
"partitions":[{"topic":"foo1","partition":2,"replicas":[5,6]},
{"topic":"foo1","partition":0,"replicas":[5,6]},
{"topic":"foo2","partition":2,"replicas":[5,6]},
{"topic":"foo2","partition":0,"replicas":[5,6]},
{"topic":"foo1","partition":1,"replicas":[5,6]},
{"topic":"foo2","partition":1,"replicas":[5,6]}]
}
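Instead of copying the two JSON documents out of the --generate output by hand, you can split the captured output and save both. The current assignment is kept for rollback, and the proposed one becomes expand-cluster-reassignment.json. This sketch assumes the output layout shown above, and the header strings are copied from it; other Kafka versions may print something different. The function names and the rollback file name are hypothetical.

```python
import json
import sys
from typing import Tuple

CURRENT_HEADER = "Current partition replica assignment"
PROPOSED_HEADER = "Proposed partition reassignment configuration"


def split_generate_output(text: str) -> Tuple[dict, dict]:
    """Return (current, proposed) assignments parsed from --generate output."""
    before, found, after = text.partition(PROPOSED_HEADER)
    if not found:
        raise ValueError("proposed assignment header not found")
    # The text between the 'Current ...' header and the 'Proposed ...' header is one JSON document.
    current_json = before.split(CURRENT_HEADER, 1)[-1]
    return json.loads(current_json), json.loads(after)


def save_json(doc: dict, path: str) -> None:
    with open(path, "w", encoding="utf-8") as f:
        json.dump(doc, f)


if __name__ == "__main__":
    # Read the captured --generate output from stdin.
    current, proposed = split_generate_output(sys.stdin.read())
    save_json(current, "rollback-reassignment.json")        # hypothetical file name
    save_json(proposed, "expand-cluster-reassignment.json")
```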

Finally, use the tool's --verify option to check the status of the partition reassignment. Pass --verify the same expand-cluster-reassignment.json that was used with --execute:

> bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file expand-cluster-reassignment.json --verify
Status of partition reassignment:
Reassignment of partition [foo1,0] completed successfully
Reassignment of partition [foo1,1] is in progress
Reassignment of partition [foo1,2] is in progress
Reassignment of partition [foo2,0] completed successfully
Reassignment of partition [foo2,1] completed successfully
Reassignment of partition [foo2,2] completed successfully
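With many partitions, checking the --verify output by eye is tedious. The following sketch parses lines in the format shown above. Newer Kafka versions word these lines differently, so the regular expression is an assumption tied to this output. The function names are hypothetical.

```python
import re
from typing import Dict, Tuple

# Matches e.g. "Reassignment of partition [foo1,0] completed successfully"
STATUS_LINE = re.compile(
    r"Reassignment of partition \[(?P<topic>[^,\]]+),(?P<partition>\d+)\] (?P<status>.+)"
)


def parse_verify_output(text: str) -> Dict[Tuple[str, int], str]:
    """Map (topic, partition) to the status text reported by --verify."""
    statuses = {}
    for line in text.splitlines():
        match = STATUS_LINE.match(line.strip())
        if match:
            statuses[(match["topic"], int(match["partition"]))] = match["status"]
    return statuses


def reassignment_finished(statuses: Dict[Tuple[str, int], str]) -> bool:
    """True only if at least one partition was reported and all completed successfully."""
    return bool(statuses) and all(s == "completed successfully" for s in statuses.values())
```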

Aftermath of deleting a topic

Key points to keep in mind when creating a topic:

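  1. Topic naming convention: lowercase English letters
  2. Be sure you have really settled on the name before creating the topic
  3. Don't delete a topic casually unless it is truly no longer used in production. A large data volume is not a reason either, because data is deleted automatically after 7 days (the default log.retention.hours=168)
  4. Resist the urge to tidy things up
  5. Deletion is risky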
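These naming rules can be checked before a topic is ever created. The following sketch combines the author's lowercase convention with Kafka's own limits. Kafka topic names may only use ASCII letters, digits, '.', '_' and '-', may be at most 249 characters long, and cannot be '.' or '..'. Kafka also warns that '.' and '_' can collide in metric names. The function name is hypothetical.

```python
import re
from typing import List

MAX_TOPIC_NAME_LENGTH = 249
LEGAL_CHARS = re.compile(r"[a-zA-Z0-9._-]+")


def topic_name_problems(name: str) -> List[str]:
    """Return the problems found with a proposed topic name (empty list if none)."""
    problems = []
    if name in ("", ".", ".."):
        problems.append("name must not be empty, '.' or '..'")
    if len(name) > MAX_TOPIC_NAME_LENGTH:
        problems.append(f"name is longer than {MAX_TOPIC_NAME_LENGTH} characters")
    if name and not LEGAL_CHARS.fullmatch(name):
        problems.append("only ASCII letters, digits, '.', '_' and '-' are allowed")
    if name != name.lower():
        problems.append("convention: use lowercase letters only")
    if "." in name and "_" in name:
        problems.append("mixing '.' and '_' can make metric names collide")
    return problems


if __name__ == "__main__":
    for candidate in ["jj", "UserEvents", "user.events_v2"]:
        print(candidate, topic_name_problems(candidate))
```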

If a normal delete does not succeed, remove the topic by brute force:

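# Delete the topic's partition directories from the log directory (on every broker that hosts them)
rm -rf /home/hadoop/tmp/kafka-logs/jj-*

# Delete the topic's znodes in ZooKeeper (run inside zkCli.sh; /kafka is the chroot;
# newer ZooKeeper releases deprecate rmr in favor of deleteall)
rmr /kafka/brokers/topics/jj
rmr /kafka/config/topics/jj
rmr /kafka/admin/delete_topics/jj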
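A shell glob is a blunt tool here. `jj*` also matches a hypothetical topic `jjx` (directory `jjx-0`), and even `jj-*` matches a hypothetical topic `jj-1` (directory `jj-1-0`). The following sketch lists only the directories that belong to exactly one topic, so you can review them before deleting anything. It assumes partition directories are named `<topic>-<partition>`, optionally followed by a suffix ending in `-delete` once the broker has marked them for deletion. The function names are hypothetical.

```python
import os
import re
from typing import Iterable, List


def matching_partition_dirs(names: Iterable[str], topic: str) -> List[str]:
    """Keep only directory names that belong to exactly `topic`.

    Matches <topic>-<partition> and, as an assumption, <topic>-<partition>.<id>-delete.
    """
    pattern = re.compile(re.escape(topic) + r"-\d+(\..*-delete)?")
    return sorted(name for name in names if pattern.fullmatch(name))


def partition_dirs(log_dir: str, topic: str) -> List[str]:
    """Full paths of the topic's partition directories under one log directory."""
    entries = [e for e in os.listdir(log_dir) if os.path.isdir(os.path.join(log_dir, e))]
    return [os.path.join(log_dir, e) for e in matching_partition_dirs(entries, topic)]


if __name__ == "__main__":
    # Print the candidates for review instead of deleting them.
    for path in partition_dirs("/home/hadoop/tmp/kafka-logs", "jj"):
        print(path)
```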

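Then recreate a topic with the same name and restart the Kafka cluster. Be aware that the restart itself is risky: the brokers may fail to come back up.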

Workaround:

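# kafka.Kafka is the broker's main class; a bare "kafka" pattern can also match a ZooKeeper started from Kafka's own scripts
kill -9 $(pgrep -f kafka.Kafka)
# Give it a moment: if the old process has not fully exited, Kafka will not start
/home/hadoop/app/kafka/bin/kafka-server-start.sh -daemon /home/hadoop/app/kafka/config/server.properties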
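The "give it a moment" step can be made explicit by polling until no broker process is left, and only then starting Kafka again. This sketch assumes `pgrep` is available and that the broker's command line contains its main class, `kafka.Kafka`. The function names, timeout and polling interval are hypothetical choices.

```python
import subprocess
import time
from typing import Callable


def broker_running(pattern: str = "kafka.Kafka") -> bool:
    """True if pgrep finds a process whose full command line matches `pattern`."""
    result = subprocess.run(["pgrep", "-f", pattern], stdout=subprocess.DEVNULL)
    return result.returncode == 0


def wait_until_stopped(is_running: Callable[[], bool] = broker_running,
                       timeout: float = 30.0, poll: float = 1.0) -> bool:
    """Poll until is_running() is False; return False if it is still running after `timeout`."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if not is_running():
            return True
        time.sleep(poll)
    return not is_running()


if __name__ == "__main__":
    if wait_until_stopped():
        print("broker stopped; safe to run kafka-server-start.sh")
    else:
        print("broker still running; do not start a second instance")
```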
Author: Tunan
Link: http://yerias.github.io/2020/04/25/kafka/5/
Copyright Notice: All articles in this blog are licensed under CC BY-NC-SA 4.0 unless stating additionally.