Kafka 常用命令
所有命令均为 Kafka 2.x/3.x 通用(含 KRaft & Zookeeper 模式差异说明)。
目录:
- Kafka 基础命令(生产者/消费者)
- Topic 相关命令(创建/查询/修改/删除)
- 分区与副本命令(扩分区/迁移分区)
- 消费者组命令(偏移量管理)
- 集群管理命令(Broker / Controller / 集群状态)
- KRaft 模式命令(无 ZK)
- ZooKeeper 模式命令(管理 ISR、Controller、Broker)
- Reassign Partition(集群扩容迁移分区)
- Kafka 诊断/检查命令(Dump 日志/检查 Consistency)
- Kafka 工具总结(运维必备)
kafka-console-producer.sh \
--broker-list localhost:9092 \
--topic test
kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--topic test \
--from-beginning
kafka-topics.sh \
--create \
--topic test \
--partitions 3 \
--replication-factor 3 \
--bootstrap-server localhost:9092
kafka-topics.sh \
--list \
--bootstrap-server localhost:9092
kafka-topics.sh \
--describe \
--topic test \
--bootstrap-server localhost:9092
输出包含:
- Leader
- ISR
- 副本列表
- 分区列表
kafka-topics.sh \
--delete \
--topic test \
--bootstrap-server localhost:9092
kafka-topics.sh \
--alter \
--topic test \
--partitions 6 \
--bootstrap-server localhost:9092
生成分配计划:
kafka-reassign-partitions.sh \
--generate \
--bootstrap-server localhost:9092 \
--topics-to-move-json-file topics.json \
--broker-list "1,2,3"
执行迁移:
kafka-reassign-partitions.sh \
--execute \
--bootstrap-server localhost:9092 \
--reassignment-json-file plan.json
检查迁移:
kafka-reassign-partitions.sh \
--verify \
--bootstrap-server localhost:9092 \
--reassignment-json-file plan.json
kafka-consumer-groups.sh \
--bootstrap-server localhost:9092 \
--list
kafka-consumer-groups.sh \
--bootstrap-server localhost:9092 \
--describe \
--group my-group
方式1:从最早
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
--group my-group \
--topic test \
--reset-offsets --to-earliest --execute
方式2:从最新
--to-latest
方式3:按具体 offset
--to-offset 1234
方式4:按日期时间
--to-datetime 2024-01-01T00:00:00.000
kafka-metadata-quorum.sh \
--bootstrap-server broker1:9092 \
describe --status
kafka-metadata-quorum.sh \
--bootstrap-server broker:9092 \
describe --cluster-metadata
ZooKeeper 模式:
zookeeper-shell.sh zk01:2181
ls /brokers/ids
get /controller
kafka-storage.sh format -t UUID -c server.properties
kafka-metadata-quorum.sh \
--bootstrap-server localhost:9092 \
describe --status
kafka-metadata-quorum.sh \
--bootstrap-server localhost:9092 \
describe --topic test
zookeeper-shell.sh 127.0.0.1:2181
ls /brokers/ids
ls /brokers/topics
get /brokers/topics/test
get /brokers/topics/test/partitions/0/state
get /controller
第一步:生成 topics.json
{
"topics": [{"topic": "test"}],
"version": 1
}
第二步:生成迁移计划:
kafka-reassign-partitions.sh \
--generate \
--broker-list "1,2,3" \
--topics-to-move-json-file topics.json \
--bootstrap-server localhost:9092
第三步:执行迁移
--execute
第四步:验证
--verify
kafka-dump-log.sh \
--deep-iteration \
--files /kafka-logs/test-0/00000000000000000000.log
kafka-run-class.sh kafka.tools.GetOffsetShell \
--broker-list localhost:9092 \
--topic test
kafka-topics.sh --describe --bootstrap-server localhost:9092
| 工具 | 功能 |
|---|---|
| kafka-console-producer.sh | 命令行生产者 |
| kafka-console-consumer.sh | 命令行消费者 |
| kafka-topics.sh | 管理 Topic |
| kafka-configs.sh | 配置变更 |
| kafka-consumer-groups.sh | 消费组管理 |
| kafka-reassign-partitions.sh | 分区迁移 |
| kafka-dump-log.sh | dump 日志段 |
| kafka-delete-records.sh | 删除指定 offset 以前的消息 |
| kafka-metadata-quorum.sh | KRaft 元数据管理 |
| kafka-storage.sh | KRaft 格式化工具 |
| kafka-acls.sh | ACL 权限管理 |
| kafka-get-offsets.sh | 获取 offset(新版本) |
| mirror-maker.sh | 集群跨 Region 同步 |