Bootstrap

Kafka系列8:一网打尽常用脚本及配置,宜收藏落灰!

前言

通过前面 7 篇文章的介绍,小伙伴们应该对 Kafka 运行工作原理有一个相对比较清晰的认识了。为了提高平时的工作效率,帮助我们快速定位一些线上问题,比如查看部分 Partition 堆积机器 IP 等操作,这篇文章总结了一些平时常用到的一些 Kafka 命令及常用配置,方便日后查阅(该文章中提到的相关配置会持续更新)。

文章概览

常用命令总结

一.  脚本相关常用命令,主要操作 Topic。

$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partition 3 --topic op_log
$ bin/kafka-topics.sh --list --zookeeper localhost:2181
$ bin/kafka-topics.sh --zookeeper localhost:2181 --describe op_log
$ bin/kafka-topics.sh --alter --zookeeper localhost:2181 --topic op_log1 --partition 4
$ bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic op_log

二.  脚本常用命令,主要用于操作消费组相关的。

$ bin/kafka-consumer-groups.sh --zookeeper localhost:2181 --list
$ bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server localhost:9092 --list
$ bin/kafka-consumer-groups.sh --zookeeper localhost:2181 --group console-consumer-1291 --describe
$ bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server localhost:9092 --group mygroup --describe

三.  脚本常用命令,用于检查 OffSet 相关信息。(注意:该脚本在 0.9 以后可以使用 kafka-consumer-groups.sh 脚本代替,官方已经标注为 deprecated 了)

$ bin/kafka-consumer-offset-checker.sh --zookeeper localhost:2181 --topic mytopic --group console-consumer-1291
$ bin/kafka-consumer-offset-checker.sh --zookeeper localhost:2181 --topic mytopic --group console-consumer-1291 --broker-info

四.  脚本常用命令,该脚本主要用于增加/修改 Kafka 集群的配置信息。

$ bin/kafka-configs.sh --zookeeper localhost:2181 --alter --entity-type topics --entity-name mytopic --add-config 'max.message.bytes=50000000,flush.message=5'
$ bin/kafka-configs.sh --zookeeper localhost:2181 --alter --entity-type topics --entity-name mytopic --delete-config 'flush.message'
$ bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name mytopic --describe

五.  脚本相关常用命令,主要操作 Partition 的负载情况。

$ bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --broker-list "0" --topics-to-move-json-file ~/json/op_log_topic-to-move.json --generate
$ bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file ~/json/op_log_reassignment.json --execute
$ bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file ~/json/op_log_reassignment.json --verify

常用配置及说明

kafka 常见重要配置说明,分为四部分

  • Broker Config:kafka 服务端的配置

  • Producer Config:生产端的配置

  • Consumer Config:消费端的配置

  • Kafka Connect Config:kafka 连接相关的配置

Broker Config

连接 zookeeper 集群的地址,用于将 kafka 集群相关的元数据信息存储到指定的 zookeeper 集群中

注册到 zookeeper 中的地址端口信息,在 IaaS 环境中,默认注册到 zookeeper 中的是内网地址,通过该配置指定外网访问的地址及端口,advertised.host.name 和 advertised.port 作用和 advertised.port 差不多,在 0.10.x 之后,直接配置 advertised.port 即可,前两个参数被废弃掉了。

自动创建 topic,默认为 true。其作用是当向一个还没有创建的 topic 发送消息时,此时会自动创建一个 topic,并同时设置 -num.partition 1 (partition 个数) 和 default.replication.factor (副本个数,默认为 1) 参数。

一般该参数需要手动关闭,因为自动创建会影响 topic 的管理,我们可以通过 kafka-topic.sh 脚本手动创建 topic,通常也是建议使用这种方式创建 topic。在 0.10.x 之后提供了 kafka-admin 包,可以使用其来创建 topic。

自动 rebalance,默认为 true。其作用是通过后台线程定期扫描检查,在一定条件下触发重新 leader 选举;在生产环境中,不建议开启,因为替换 leader 在性能上没有什么提升。

后台线程数,默认为 10。用于后台操作的线程,可以不用改动。

Broker 的唯一标识,用于区分不同的 Broker。kafka 的检查就是基于此 id 是否在 zookeeper 中/brokers/ids 目录下是否有相应的 id 目录来判定 Broker 是否健康。

压缩类型。此配置可以接受的压缩类型有 gzip、snappy、lz4。另外可以不设置,即保持和生产端相同的压缩格式。

启用删除 topic。如果关闭,则无法使用 admin 工具进行 topic 的删除操作。

partition 检查重新 rebalance 的周期时间

标识每个 Broker 失去平衡的比率,如果超过改比率,则执行重新选举 Broker 的 leader

保存 kafka 日志数据的位置。如果 log.dirs 没有设置,则使用 log.dir 指定的目录进行日志数据存储。

partition 分区的数据量达到指定大小时,对数据进行一次刷盘操作。比如设置了 1024k 大小,当 partition 积累的数据量达到这个数值时则将数据写入到磁盘上。

数据写入磁盘时间间隔,即内存中的数据保留多久就持久化一次,如果没有设置,则使用 log.flush.scheduler.interval.ms 参数指定的值。

表示 topic 的容量达到指定大小时,则对其数据进行清除操作,默认为-1,永远不删除。

标示 topic 的数据最长保留多久,单位是小时

表示 topic 的数据最长保留多久,单位是分钟,如果没有设置该参数,则使用 log.retention.hours 参数

表示 topic 的数据最长保留多久,单位是毫秒,如果没有设置该参数,则使用 log.retention.minutes 参数

新的 segment 创建周期,单位小时。kafka 数据是以 segment 存储的,当周期时间到达时,就创建一个新的 segment 来存储数据。

topic 能够接收的最大文件大小。需要注意的是 producer 和 consumer 端设置的大小需要一致。

最小副本同步个数。当 producer 设置了 request.required.acks 为-1 时,则 topic 的副本数要同步至该参数指定的个数,如果达不到,则 producer 端会产生异常。

指定 io 操作的线程数

执行网络操作的线程数

每个数据目录用于恢复数据的线程数

从 leader 备份数据的线程数

允许消费者端保存 offset 的最大个数

offset 提交的延迟时间

topic 的 offset 的备份数量。该参数建议设置更高保证系统更高的可用性

端口号,Broker 对外提供访问的端口号。

Broker 接收到请求后的最长等待时间,如果超过设定时间,则会给客户端发送错误信息

客户端和 zookeeper 建立连接的超时时间,如果没有设置该参数,则使用 zookeeper.session.timeout.ms 值

空连接的超时时间。即空闲连接超过该时间时则自动销毁连接。

Producer Config

服务端列表。即接收生产消息的服务端列表

消息键的序列化方式。指定 key 的序列化类型

消息内容的序列化方式。指定 value 的序列化类型

消息写入 Partition 的个数。通常可以设置为 0,1,all;当设置为 0 时,只需要将消息发送完成后就完成消息发送功能;当设置为 1 时,即 Leader Partition 接收到数据并完成落盘;当设置为 all 时,即主从 Partition 都接收到数据并落盘。

客户端缓存大小。即 Producer 生产的数据量缓存达到指定值时,将缓存数据一次发送的 Broker 上。

压缩类型。指定消息发送前的压缩类型,通常有 none, gzip, snappy, or, lz4 四种。不指定时消息默认不压缩。

消息发送失败时重试次数。当该值设置大于 0 时,消息因为网络异常等因素导致消息发送失败进行重新发送的次数。

Consumer Config

服务端列表。即消费端从指定的服务端列表中拉取消息进行消费。

消息键的反序列化方式。指定 key 的反序列化类型,与序列化时指定的类型相对应。

消息内容的反序列化方式。指定 value 的反序列化类型,与序列化时指定的类型相对应。

抓取消息的最小内容。指定每次向服务端拉取的最小消息量。

消费组中每个消费者的唯一表示。

心跳检查周期。即在周期性的向 group coordinator 发送心跳,当服务端发生 rebalance 时,会将消息发送给各个消费者。该参数值需要小于 session.timeout.ms,通常为后者的 1/3。

Partition 每次返回的最大数据量大小。

consumer 失效的时间。即 consumer 在指定的时间后,还没有响应则认为该 consumer 已经发生故障了。

当 kafka 中没有初始偏移量或服务器上不存在偏移量时,指定从哪个位置开始消息消息。earliest:指定从头开始;latest:从最新的数据开始消费。

Kafka Connect Config

消费者在消费组中的唯一标识

内部 key 的转换类型。

内部 value 的转换类型。

服务端接收到 key 时指定的转换类型。

服务端接收到 value 时指定的转换类型。

服务端列表。

心跳检测,与 consumer 中指定的配置含义相同。

session 有效时间,与 consumer 中指定的配置含义相同。

总结

本文总结了平时经常用到的一些 Kafka 配置及命令说明,方便随时查看;喜欢的朋友可以收藏以备不时之需。

下篇文章我们来分析一些经常在面试中碰到的问题及相应的解决办法,敬请期待。

历史精彩文章推荐

微信公众号搜索【z小赵】,更多系列精彩文章等你解锁。