TOPIC

说明:

  1. Kafka使用版本:2.12-0.10.2.2(instruction 服务使用的此版本)
  2. kafka2.2.0版本以上, 使用--bootstrap-server 127.0.0.1:9092 连接,--zookeepker 127.0.0.1:2181已经废弃

新建topic

bin/kafka-topics.sh --create --replication-factor 1 --partitions 2 --zookeeper 127.0.0.1:2181 --topic kafka-topic-demo
# kafka版本>=2.2.0
bin/kafka-topics.sh --create --replication-factor 1 --partitions 2 --bootstrap-server 127.0.0.1:9092 --topic kafka-topic-demo

查看 topic 列表

bin/kafka-topics.sh  --zookeeper 127.0.0.1:2181 --list
# kafka版本>=2.2.0
bin/kafka-topics.sh  --zookeeper 127.0.0.1:2181 --list

删除 某个topic

bin/kafka-topics.sh  --zookeeper 127.0.0.1:2181 --delete -topic kafka-topic-demo
# kafka版本>=2.2.0
bin/kafka-topics.sh  --bootstrap-server 127.0.0.1:9092 --delete -topic kafka-topic-demo

查看某个 topic 详情

bin/kafka-topics.sh  --zookeeper 127.0.0.1:2181 --describe --topic kafka-topic-demo 
# kafka版本>=2.2.0
bin/kafka-topics.sh  --bootstrap-server 127.0.0.1:9092 --describe -topic kafka-topic-demo

查看指定topic上每个partition的offset

 bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 127.0.0.1:9092 --topic kafka-topic-demo

单个topic增加partition

bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --alter --topic kafka-topic-demo --partitions 4
# kafka版本>=2.2.0
bin/kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --alter --topic kafka-topic-demo --partitions 4

批量增加topic partition

sh bin/kafka-topics.sh --topic "kafka-topic-demo*" --zookeeper 127.0.0.1:2181 --alter --partitions 4
# kafka版本>=2.2.0
bin/kafka-topics.sh --topic "kafka-topic-demo*" --bootstrap-server 127.0.0.1:9092 --alter --partitions 5

查看某个 topic 的 message 数量

bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 127.0.0.1:9092 --topic kafka-topic-demo

生产者(producer)

生产数据

bin/kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic kafka-topic-demo
# kafka版本>=2.5.0
bin/kafka-console-producer.sh --bootstrap-server 127.0.0.1:9092 --topic kafka-topic-demo

消费者(consumer)

消费数据

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic kafka-topic-demo --from-beginning
#kafka 版本>=2.2.0
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic kafka-topic-demo --from-beginning

查看特定consumer group 详情

bin/kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --group console-consumer-5354 --describe

查看group 列表(zk 里的group 列表有问题,待查询)

bin/kafka-consumer-groups.sh --zookeeper 127.0.0.1:2181 --list
# kafka版本>=2.2.0
bin/kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --list

删除group

./bin/kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --group console-consumer-5354 --delete

kafka自带压测(未验证过)

bin/kafka-producer-perf-test.sh --topic test --num-records 100 --record-size 1 --throughput 100  --producer-props bootstrap.servers=localhost:9092