Skip to main content
Documents
Toggle Dark/Light/Auto mode Toggle Dark/Light/Auto mode Toggle Dark/Light/Auto mode Back to homepage

Kafka 常用命令

所有命令均为 Kafka 2.x/3.x 通用(含 KRaft & Zookeeper 模式差异说明)。

🚀 Kafka 常用命令大全(含集群)

目录:

  1. Kafka 基础命令(生产者/消费者)
  2. Topic 相关命令(创建/查询/修改/删除)
  3. 分区与副本命令(扩分区/迁移分区)
  4. 消费者组命令(偏移量管理)
  5. 集群管理命令(Broker / Controller / 集群状态)
  6. KRaft 模式命令(无 ZK)
  7. ZooKeeper 模式命令(管理 ISR、Controller、Broker)
  8. Reassign Partition(集群扩容迁移分区)
  9. Kafka 诊断/检查命令(Dump 日志/检查 Consistency)
  10. Kafka 工具总结(运维必备)

1. Kafka 基础命令

1.1 启动 Producer(命令行)

kafka-console-producer.sh \
    --broker-list localhost:9092 \
    --topic test

1.2 启动 Consumer(命令行)

kafka-console-consumer.sh \
    --bootstrap-server localhost:9092 \
    --topic test \
    --from-beginning

2. Topic 相关命令

2.1 创建 Topic

kafka-topics.sh \
    --create \
    --topic test \
    --partitions 3 \
    --replication-factor 3 \
    --bootstrap-server localhost:9092

2.2 查看 Topic 列表

kafka-topics.sh \
    --list \
    --bootstrap-server localhost:9092

2.3 查看 Topic 详细信息

kafka-topics.sh \
    --describe \
    --topic test \
    --bootstrap-server localhost:9092

输出包含:

  • Leader
  • ISR
  • 副本列表
  • 分区列表

2.4 删除 Topic(软删除)

kafka-topics.sh \
    --delete \
    --topic test \
    --bootstrap-server localhost:9092

3. 分区 / 副本相关命令

3.1 增加 Topic 分区(不可减少)

kafka-topics.sh \
    --alter \
    --topic test \
    --partitions 6 \
    --bootstrap-server localhost:9092

3.2 修改副本分布(Reassign Partition)

生成分配计划:

kafka-reassign-partitions.sh \
    --generate \
    --bootstrap-server localhost:9092 \
    --topics-to-move-json-file topics.json \
    --broker-list "1,2,3"

执行迁移:

kafka-reassign-partitions.sh \
    --execute \
    --bootstrap-server localhost:9092 \
    --reassignment-json-file plan.json

检查迁移:

kafka-reassign-partitions.sh \
    --verify \
    --bootstrap-server localhost:9092 \
    --reassignment-json-file plan.json

4. 消费者组命令(必会!)

4.1 查看消费者组列表

kafka-consumer-groups.sh \
    --bootstrap-server localhost:9092 \
    --list

4.2 查看某个消费组状态 / Lag

kafka-consumer-groups.sh \
    --bootstrap-server localhost:9092 \
    --describe \
    --group my-group

4.3 重置 Offset(最常用)

方式1:从最早

kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
    --group my-group \
    --topic test \
    --reset-offsets --to-earliest --execute

方式2:从最新

--to-latest

方式3:按具体 offset

--to-offset 1234

方式4:按日期时间

--to-datetime 2024-01-01T00:00:00.000

5. 集群管理命令(Broker / Controller)

5.1 查看集群中的 Broker 列表(KRaft)

kafka-metadata-quorum.sh \
    --bootstrap-server broker1:9092 \
    describe --status

5.2 查看 Controller ID(KRaft)

kafka-metadata-quorum.sh \
    --bootstrap-server broker:9092 \
    describe --cluster-metadata

ZooKeeper 模式:

zookeeper-shell.sh zk01:2181
ls /brokers/ids
get /controller

6. KRaft 模式命令(无 ZK)

6.1 启动 KRaft Controller

kafka-storage.sh format -t UUID -c server.properties

6.2 查看 KRaft 集群状态

kafka-metadata-quorum.sh \
    --bootstrap-server localhost:9092 \
    describe --status

6.3 查看元数据(Topic/Partition 集群信息)

kafka-metadata-quorum.sh \
    --bootstrap-server localhost:9092 \
    describe --topic test

7. ZooKeeper 模式命令(仍大量使用)

7.1 查看 Broker 列表

zookeeper-shell.sh 127.0.0.1:2181
ls /brokers/ids

7.2 查看某个 Topic 的分区元数据

ls /brokers/topics
get /brokers/topics/test

7.3 查看 ISR

get /brokers/topics/test/partitions/0/state

7.4 查看当前 Controller

get /controller

8. 分区迁移(Reassign Partition)

第一步:生成 topics.json

{
  "topics": [{"topic": "test"}],
  "version": 1
}

第二步:生成迁移计划:

kafka-reassign-partitions.sh \
    --generate \
    --broker-list "1,2,3" \
    --topics-to-move-json-file topics.json \
    --bootstrap-server localhost:9092

第三步:执行迁移

--execute

第四步:验证

--verify

9. Kafka 调试 + 诊断命令

9.1 检查日志文件信息(Dump Log Segment)

kafka-dump-log.sh \
    --deep-iteration \
    --files /kafka-logs/test-0/00000000000000000000.log

9.2 检查副本同步延迟

kafka-run-class.sh kafka.tools.GetOffsetShell \
    --broker-list localhost:9092 \
    --topic test

9.3 查分区 Leader 分布

kafka-topics.sh --describe --bootstrap-server localhost:9092

10. Kafka 相关所有 CLI 工具列表(最全)

工具 功能
kafka-console-producer.sh 命令行生产者
kafka-console-consumer.sh 命令行消费者
kafka-topics.sh 管理 Topic
kafka-configs.sh 配置变更
kafka-consumer-groups.sh 消费组管理
kafka-reassign-partitions.sh 分区迁移
kafka-dump-log.sh dump 日志段
kafka-delete-records.sh 删除指定 offset 以前的消息
kafka-metadata-quorum.sh KRaft 元数据管理
kafka-storage.sh KRaft 格式化工具
kafka-acls.sh ACL 权限管理
kafka-get-offsets.sh 获取 offset(新版本)
mirror-maker.sh 集群跨 Region 同步