Kafka集群缩容实战
CDH 版本5.15.1,Kafka版本1.0.1
由于历史原因,我们有两套Kafka集群,老集群的规模对于它所承载的流量有些过剩,为了充分利用资源,需要对老集群进行缩容。
本文记录了Kafka集群缩容的过程,仅供参考。
缩容过程中用到的脚本已经上传到github:https://github.com/iamabug/kafka-shrink-scripts
公众号:大数据学徒
缩容思路
假设集群有15台机器,预计缩到10台机器,那么需要做5次缩容操作,每次将一个节点下线,那么现在问题就是如何正确、安全地从Kafka集群中移除一台broker?搞定这个之后,重复5次即可。
众所周知,一个broker下线,它上面的所有partition都会处于副本不足的状态,并且Kafka集群不会在其它的broker上生成这些副本,因此,在将一个broker从集群中移除之前,需要将这个broker上的partition副本都转移到最终会保留的10台机器上,怎么实现这个呢?Kafka自带的分区重分配工具。
在集群数据量较大的情况下,分区的转移可能会花费较长时间,那么在转移过程中最好不要创建新topic,不然新的topic有可能又创建到要被移除的broker上,当然如果实在无法避免的话,可以再对新的topic进行一次额外的转移。
总结一下,从Kafka集群中移除一个broker的步骤如下:
其中重点是partition的转移或者说重分配。
分区重分配
分区重分配主要使用的是kafka-reassign-partitions脚本,参考的说明,这个脚本有三种模式:
--generate,给定topic列表和broker列表,生成一个备选的重分配方案,重分配方案是一个JSON;
--execute,根据一个JSON文件里的重分配方案,进行分区重分配;
--verify,根据集群当前partition的分布情况,验证和JSON文件提供的重分配方案是否一致;
根据这个脚本的使用方式,分区重分配包括以下几步。
获取broker id
在测试集群的CM中Kafka的配置页面里,在搜索栏里输入“id”,可以看到broker id和主机名的对应关系:

可以看到,共有5个broker,broker id分别为150,151,152,165,294,选择 294 作为被移除的broker。
获取topic列表及输出格式转换
使用 脚本获取集群所有的topic:
$ kafka-topics --list --zookeeper ${ZK_HOSTS}
把输出的topic列表保存在 里,然后用python脚本把它处理成json格式:
import json
obj = {}
obj["version"] = 1
obj["topics"] = []
with open("topics.txt") as f:
for line in f.readlines():
topic = {"topic": line.strip()}
obj["topics"].append(topic)
with open("topics.json", "w") as f:
json.dump(obj, f, indent=2)
输出的json文件是 ,其中的内容格式如下:
{
"version": 1,
"topics": [
{
"topic": "first"
},
{
"topic": "second"
},
...
]
}
获取当前partition分配方案
使用 脚本的 来获取当前的partition分配方案:
$ kafka-reassign-partitions --zookeeper ${ZK_HOSTS} --topics-to-move-json-file topics.json --broker-list "150,151,152,165" --generate
# 注意 --broker-list 这个参数传入的值里面没有 294 这个broker id
这个命令会有非常多的输出,分为两部分,一部分是当前partition的分配情况,一部分是这个脚本生成的重分配方案:
Current partition replica assignment
{
"version": 1,
"partitions": [
{
"topic": "first",
"partition": 0,
"replicas": [150, 151, 294]
},
...
]
}
Proposed partition reassignment configuration
{
"version": 1,
"partitions": [
{
"topic": "first",
"partition": 0,
"replicas": [150, 151, 165]
},
...
]
}
可以看到,脚本生成的重分配方案里面,replicas里面是没有 294 这个broker id的。
注意:通过比较可以发现,脚本生成的重分配方案不光将原来在294上的partition转移到了其它broker上,即使和294无关的partition也会被调整,这样既增大了重分配的耗时,也带来不必要的leader切换,不是特别理想,在这种情况下,可以手动生成重分配方案。
生成重分配方案
我希望的重分配方案是:把partition从待移除的broker上转移到不会被移除的broker上,且不存在同一partition的两个副本在同一个broker上的情况即可。
将上一步输出的当前分配方案保存到为 ,然后使用python脚本进行处理:
import json
import random
BROKER_ID = 294
# 不会被移除的broker id列表
BROKER_IDS = [150, 151, 152, 165]
n = len(BROKER_IDS)
topics = {}
with open("current.json") as f:
topics = json.load(f)
# 遍历每个partition
for partition in topics["partitions"]:
# 遍历当前partition的每个副本
replicas = partition['replicas']
for i in range(len(replicas)):
# 当前副本需要转移
if replicas[i] == BROKER_ID:
# 随机生成一个broker id,确保新的副本不和当前partition的其他副本在同一个broker上
choice = random.randint(0, n-1)
while BROKER_IDS[choice] in replicas:
choice = random.randint(0, n-1)
#print("before: ", replicas)
replicas[i] = BROKER_IDS[choice]
#print("after: ", replicas)
with open("reassigned.json", "w") as f:
json.dump(topics, f, indent=2)
生成的重分配方案保存在 中。
执行分区重分配
执行分区重分配命令:
$ kafka-reassign-partitions --zookeeper ${ZK_HOSTS} --reassignment-json-file reassigned.json --execute
这个命令是在后台运行的,如果你安装了kafka-manager的话,可以在“Reassign Partitions”页面看到状态:

pending 表示还未完成。
参考时间:15个节点、160T总磁盘占用的Kafka集群对一个节点做分区重分配大约花费时间为6个小时。
注意事项
在分区重分配的过程中,因为partition的leader会发生切换,客户端有可能报 的错,这个报错一般来说是可恢复的,但还是需要密切关注。
验证
可以通过三种方式进行验证:
$ kafka-reassign-partitions --zookeeper ${ZK_HOSTS} --reassignment-json-file reassigned.json --verify | grep -v successsfully
在重分配完成之前会得到很多类似 的输出,完成之后执行的话就没有太多输出。

topic数、分区数都是0,符合预期。
# 假设kafka的数据目录为/data*/kafka/data
$ ls /data*/kafka/data
/data01/kafka/data:
cleaner-offset-checkpoint meta.properties replication-offset-checkpoint
log-start-offset-checkpoint recovery-point-offset-checkpoint
/data02/kafka/data:
cleaner-offset-checkpoint meta.properties replication-offset-checkpoint
log-start-offset-checkpoint recovery-point-offset-checkpoint
...
可以看到,kafka的数据目录下只剩下kafka自己的数据文件,分区重分配成功。
broker下线
确认待移除的broker上没有任何分区之后,在CM里,先停止broker,再删除broker,broker下线完成。
根据需要下线的broker数量,重复上面的操作,就可以实现多台broker的缩容。
欢迎批评指正沟通建议。
公众号:大数据学徒