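1. Overview
Apache Kafka is a fast, scalable, high-throughput, fault-tolerant distributed publish-subscribe messaging system. With built-in partitioning, data replication, and fault tolerance, it is well suited to large-scale message processing.
I previously worked at an IoT company, where Kafka was the de facto standard when choosing a message queue, so we start by building a Kafka cluster. Because installing Kafka depends on Zookeeper, readers who are not yet familiar with Zookeeper should read the earlier Zookeeper article first.
What problems does Kafka solve? Message queues have many use cases, but three are core: decoupling, asynchronous processing, and peak shaving.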
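2. Kafka Basic Concepts
Some Kafka terms are explained below:
Broker: a message middleware processing node. One Kafka node is one broker, and multiple brokers form a Kafka cluster.
Topic: a category of messages. For example, page view logs and click logs can each exist as a topic. A Kafka cluster can serve many topics at the same time.
Partition: a physical grouping within a topic. A topic can be split into multiple partitions, and each partition is an ordered queue.
Segment: a partition is physically made up of multiple segments, described in detail below.
offset: each partition consists of an ordered, immutable sequence of messages that are continuously appended to it. Every message in a partition has a sequential number called the offset, which uniquely identifies the message within that partition. Offsets in each partition start at 0.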
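The relationship between a partition and its offsets can be sketched with a small in-memory model. This is only an illustration, assuming a hypothetical `PartitionLog` class backed by a list; it is not how Kafka stores data on disk.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory stand-in for a single partition (not Kafka code).
final class PartitionLog {
    // Messages are only ever appended, never modified in place.
    private final List<String> messages = new ArrayList<>();

    // Appends a message and returns the offset assigned to it.
    long append(String message) {
        messages.add(message);
        return messages.size() - 1L;
    }

    // Reads the message stored at the given offset.
    String read(long offset) {
        return messages.get((int) offset);
    }

    // Offset that the next appended message will receive.
    long logEndOffset() {
        return messages.size();
    }

    public static void main(String[] args) {
        PartitionLog partition = new PartitionLog();
        System.out.println(partition.append("page-view")); // first offset is 0
        System.out.println(partition.append("click"));     // next offset is 1
        System.out.println(partition.read(1));              // click
        System.out.println(partition.logEndOffset());       // 2
    }
}
```

Because offsets are just positions in an append-only sequence, a consumer only needs to remember one number per partition to know where to resume.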
3. Docker Environment Setup
Building on the Zookeeper environment from the previous section, we will set up a 3-node cluster. The host IP is 192.168.124.5.
docker-compose-kafka-cluster.yml
    version: '3.7'

    networks:
      docker_net:
        external: true

    services:
      kafka1:
        image: wurstmeister/kafka
        restart: unless-stopped
        container_name: kafka1
        ports:
          - "9093:9092"
        external_links:
          - zoo1
          - zoo2
          - zoo3
        environment:
          KAFKA_BROKER_ID: 1
          KAFKA_ADVERTISED_HOST_NAME: 192.168.124.5
          KAFKA_ADVERTISED_PORT: 9093
          KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.124.5:9093
          KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181,zoo2:2181,zoo3:2181"
        volumes:
          - "./kafka/kafka1/docker.sock:/var/run/docker.sock"
          - "./kafka/kafka1/data/:/kafka"
        networks:
          - docker_net

      kafka2:
        image: wurstmeister/kafka
        restart: unless-stopped
        container_name: kafka2
        ports:
          - "9094:9092"
        external_links:
          - zoo1
          - zoo2
          - zoo3
        environment:
          KAFKA_BROKER_ID: 2
          KAFKA_ADVERTISED_HOST_NAME: 192.168.124.5
          KAFKA_ADVERTISED_PORT: 9094
          KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.124.5:9094
          KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181,zoo2:2181,zoo3:2181"
        volumes:
          - "./kafka/kafka2/docker.sock:/var/run/docker.sock"
          - "./kafka/kafka2/data/:/kafka"
        networks:
          - docker_net

      kafka3:
        image: wurstmeister/kafka
        restart: unless-stopped
        container_name: kafka3
        ports:
          - "9095:9092"
        external_links:
          - zoo1
          - zoo2
          - zoo3
        environment:
          KAFKA_BROKER_ID: 3
          KAFKA_ADVERTISED_HOST_NAME: 192.168.124.5
          KAFKA_ADVERTISED_PORT: 9095
          KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.124.5:9095
          KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181,zoo2:2181,zoo3:2181"
        volumes:
          - "./kafka/kafka3/docker.sock:/var/run/docker.sock"
          - "./kafka/kafka3/data/:/kafka"
        networks:
          - docker_net

      kafka-manager:
        image: sheepkiller/kafka-manager:latest
        restart: unless-stopped
        container_name: kafka-manager
        hostname: kafka-manager
        ports:
          - "9000:9000"
        links:
          - kafka1
          - kafka2
          - kafka3
        external_links:
          - zoo1
          - zoo2
          - zoo3
        environment:
          ZK_HOSTS: zoo1:2181,zoo2:2181,zoo3:2181
          TZ: CST-8
        networks:
          - docker_net
Run the following command to start the cluster:

    docker-compose -f docker-compose-kafka-cluster.yml up -d

You should see that the Kafka cluster has started successfully.
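4. First Look at Kafka
4.1 Visual Management
Attentive readers will have noticed that, besides Kafka, the configuration above also contains a kafka-manager service. It is a visual management tool for Kafka. Because Kafka's metadata and configuration are managed by Zookeeper, we do the related setup in its UI.
1. Open http://localhost:9000 and add the cluster configuration as shown in the screenshot.
2. After configuring, we can see one default topic (__consumer_offsets) and 3 brokers. This topic has 50 partitions and is used to record Kafka consumer offsets.
4.2 What Zookeeper Does in a Kafka Environment
1. First, look at the root directory
Kafka is built on Zookeeper: when Kafka starts, it stores its metadata in Zookeeper. Listing the Zookeeper root shows many new Kafka-related nodes: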
    ➜ docker zkCli -server 127.0.0.1:2183
    Connecting to 127.0.0.1:2183
    Welcome to ZooKeeper!
    JLine support is enabled

    WATCHER::

    WatchedEvent state:SyncConnected type:None path:null
    [zk: 127.0.0.1:2183(CONNECTED) 0] ls /
    [cluster, controller, brokers, zookeeper, admin, isr_change_notification, log_dir_event_notification, controller_epoch, zk-test0000000000, kafka-manager, consumers, latest_producer_id_block, config]
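2. Look at the Kafka directory we mapped. Newer Kafka versions no longer store consumer offsets in zk; they are kept in Kafka itself.
Below is an excerpt of the directory (showing 2 partitions):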
    ├── kafka1
    │   ├── data
    │   │   └── kafka-logs-c4e2e9edc235
    │   │       ├── __consumer_offsets-1
    │   │       │   ├── 00000000000000000000.index      // segment index file
    │   │       │   ├── 00000000000000000000.log        // data file
    │   │       │   ├── 00000000000000000000.timeindex  // message timestamp index file
    │   │       │   └── leader-epoch-checkpoint
    ...
    │   │       ├── __consumer_offsets-7
    │   │       │   ├── 00000000000000000000.index
    │   │       │   ├── 00000000000000000000.log
    │   │       │   ├── 00000000000000000000.timeindex
    │   │       │   └── leader-epoch-checkpoint
    │   │       ├── cleaner-offset-checkpoint
    │   │       ├── log-start-offset-checkpoint
    │   │       ├── meta.properties
    │   │       ├── recovery-point-offset-checkpoint
    │   │       └── replication-offset-checkpoint
    │   └── docker.sock
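This matches what Kafka-Manager shows. The files listed for a partition together form one Segment. The name 00000000000000000000.log means the segment starts at offset 0; as data keeps growing, additional Segment files are created.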
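To make the naming rule concrete, here is a minimal sketch of how a segment file name can be derived from its starting offset and how the segment containing a given offset can be located. The class `SegmentLookup` and the sample base offsets are hypothetical, and the lookup is a simplification that ignores the .index files.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper illustrating segment naming; not Kafka's own code.
final class SegmentLookup {
    // Maps each segment's base offset to its file name, sorted by base offset.
    private final TreeMap<Long, String> segments = new TreeMap<>();

    // A segment is named after its first offset, zero-padded to 20 digits.
    static String fileName(long baseOffset) {
        return String.format("%020d.log", baseOffset);
    }

    void addSegment(long baseOffset) {
        segments.put(baseOffset, fileName(baseOffset));
    }

    // The segment holding an offset is the one with the greatest base offset <= offset.
    String segmentFor(long offset) {
        Map.Entry<Long, String> entry = segments.floorEntry(offset);
        if (entry == null) {
            throw new IllegalArgumentException("offset " + offset + " is before the first segment");
        }
        return entry.getValue();
    }

    public static void main(String[] args) {
        SegmentLookup lookup = new SegmentLookup();
        lookup.addSegment(0L);
        lookup.addSegment(368769L);   // sample base offsets, chosen only for the demo
        lookup.addSegment(737337L);
        System.out.println(fileName(0L));
        System.out.println(lookup.segmentFor(400000L)); // falls in the segment starting at 368769
    }
}
```

Keeping the base offset in the file name means a sorted directory listing is already enough to narrow a lookup down to one file.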
5. Creating a Topic in Docker

    ➜ docker docker exec -it kafka1 /bin/bash
    bash-4.4
    bash-4.4
    __consumer_offsets
    bash-4.4
    Created topic test.

Notes:
--replication-factor: number of replicas;
--partitions: number of partitions;
replication factor <= number of brokers (always);
number of active consumers in a group <= number of partitions (always);
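After creating the topic, look at the mapped directory again: the partitions are distributed evenly across the 3 brokers. The in-sync replicas shown in the figure are the replicas that are currently keeping up with the leader. On the broker side you can set the min.insync.replicas parameter (minimum in-sync replicas). Its default is 1; for durability it should be greater than 1, which requires the leader to see at least one follower that is still keeping up with it, so that if the leader fails there is still a follower holding the data. Used together with acks=all on the producer, this is one strategy for making sure messages are not lost.
The ISR is maintained by the leader. Followers replicate from the leader with some delay, historically measured in two dimensions: lag time (replica.lag.time.max.ms) and lag in messages (replica.lag.max.messages). The message-count setting was removed in 0.9.0, so 0.10.x and later support only replica.lag.time.max.ms. A follower that exceeds the threshold is removed from the ISR and placed in the OSR (Out-of-Sync Replicas) list; newly added followers also start in the OSR. AR = ISR + OSR.
Example data path inside the container: /kafka/kafka-logs-c4e2e9edc235/test-0/00000000000000000000.log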
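The ISR/OSR bookkeeping described above can be sketched as follows. This is a simplified model under stated assumptions: the class `IsrTracker`, its method names, and the timestamps are hypothetical, and the single `maxLagMs` threshold only plays the role of replica.lag.time.max.ms; the real broker logic is more involved.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical model of a leader tracking which replicas are in sync.
final class IsrTracker {
    private final long maxLagMs;                                   // stands in for replica.lag.time.max.ms
    private final Set<Integer> assigned = new TreeSet<>();         // AR: all assigned replicas
    private final Map<Integer, Long> lastCaughtUpMs = new HashMap<>();

    IsrTracker(long maxLagMs) {
        this.maxLagMs = maxLagMs;
    }

    // A newly added replica has never caught up, so it starts outside the ISR.
    void addReplica(int brokerId) {
        assigned.add(brokerId);
    }

    // Records the time at which a replica was fully caught up with the leader.
    void recordCaughtUp(int brokerId, long nowMs) {
        assigned.add(brokerId);
        lastCaughtUpMs.put(brokerId, nowMs);
    }

    // ISR: replicas that caught up within the last maxLagMs milliseconds.
    Set<Integer> isr(long nowMs) {
        Set<Integer> result = new TreeSet<>();
        for (int brokerId : assigned) {
            Long caughtUp = lastCaughtUpMs.get(brokerId);
            if (caughtUp != null && nowMs - caughtUp <= maxLagMs) {
                result.add(brokerId);
            }
        }
        return result;
    }

    // OSR: everything in AR that is not in the ISR, so AR = ISR + OSR.
    Set<Integer> osr(long nowMs) {
        Set<Integer> result = new TreeSet<>(assigned);
        result.removeAll(isr(nowMs));
        return result;
    }

    public static void main(String[] args) {
        IsrTracker tracker = new IsrTracker(10_000L);
        tracker.recordCaughtUp(1, 12_000L);
        tracker.recordCaughtUp(2, 12_000L);
        tracker.recordCaughtUp(3, 0L);   // replica 3 stopped keeping up long ago
        tracker.addReplica(4);           // replica 4 was just added
        System.out.println("ISR=" + tracker.isr(15_000L)); // ISR=[1, 2]
        System.out.println("OSR=" + tracker.osr(15_000L)); // OSR=[3, 4]
    }
}
```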
6. Official Tools
Command path for a brew-installed Kafka: /usr/local/Cellar/kafka/2.6.0_1/bin
Actual script path: /usr/local/Cellar/kafka/2.6.0_1/libexec/bin
Configuration file path: /usr/local/etc/kafka
Data is stored under /usr/local/var/lib/kafka-logs
    ├── connect-distributed
    ├── connect-mirror-maker
    ├── connect-standalone
    ├── kafka-acls
    ├── kafka-broker-api-versions
    ├── kafka-configs
    ├── kafka-console-consumer
    ├── kafka-console-producer
    ├── kafka-consumer-groups
    ├── kafka-consumer-perf-test
    ├── kafka-delegation-tokens
    ├── kafka-delete-records
    ├── kafka-dump-log
    ├── kafka-leader-election
    ├── kafka-log-dirs
    ├── kafka-mirror-maker
    ├── kafka-preferred-replica-election
    ├── kafka-producer-perf-test
    ├── kafka-reassign-partitions
    ├── kafka-replica-verification
    ├── kafka-run-class
    ├── kafka-server-start
    ├── kafka-server-stop
    ├── kafka-streams-application-reset
    ├── kafka-topics
    ├── kafka-verifiable-consumer
    ├── kafka-verifiable-producer
    ├── trogdor
    ├── zookeeper-security-migration
    ├── zookeeper-server-start
    ├── zookeeper-server-stop
    └── zookeeper-shell
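Operations and administration tools
kafka-topics.sh: create, delete, inspect, or change the parameters of a topic
kafka-reassign-partitions.sh: reassign partitions (used mostly by administrators)
kafka-log-dirs.sh: show the disk space used by the log directories of the specified brokers
kafka-leader-election.sh: re-elect leaders for a set of topic partitions. It supports both preferred-replica and unclean (replica not in the ISR) election, and replaces the kafka-preferred-replica-election.sh script of older versions
kafka-replica-verification.sh: check whether the data in a topic's set of replicas is consistent
kafka-broker-api-versions.sh: show the version of each API currently supported by the specified broker (newer Kafka versions guarantee backward compatibility)
kafka-configs.sh: view and modify entity configs for topics, clients, users, or brokers
Kafka usage tools
kafka-console-consumer.sh: start a consumer from the terminal
kafka-console-producer.sh: start a producer from the terminal
kafka-consumer-groups.sh: view or delete consumer groups, or reset their offsets
kafka-consumer-perf-test.sh: consumer load testing
kafka-producer-perf-test.sh: producer load testing
kafka-delete-records.sh: delete the records of the specified partitions up to a given offset
kafka-mirror-maker.sh: synchronize topic data between clusters
kafka-server-start.sh: broker start script
kafka-server-stop.sh: broker stop script
kafka-streams-application-reset.sh: reset tool for Kafka Streams applications
zookeeper-shell.sh: the Kafka distribution also ships a Zookeeper management tool (not very convenient to use)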
Topic creation
--create: create a topic
--replication-factor: number of replicas (only available at creation time)
--config: topic-level parameters (dynamic, can be changed later)
--replica-assignment: manually assign partitions to brokers <part1_replica1:part1_replica2,part2_replica1:part2_replica2>
    # Create a topic
    ➜ bin ./kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --create --topic test
    Created topic test.

    # Show the parameters of a topic created with defaults (determined by the broker config)
    # Default: 1 partition, 1 replica
    ➜ bin ./kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --describe --topic test
    Topic: test  PartitionCount: 1  ReplicationFactor: 1  Configs: segment.bytes=1073741824
      Topic: test  Partition: 0  Leader: 1  Replicas: 1  Isr: 1

    # Create a topic with explicit parameters: 2 partitions, 1 replica
    # --partitions 2, --replication-factor 1, topic data retained for 2 minutes
    # replication factor <= number of brokers (always);
    # number of active consumers in a group <= number of partitions (always);
    ➜ bin ./kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --create --topic test1 --partitions 2 --replication-factor 1 --config retention.ms=120000
    Created topic test1.

    # Partitions, replicas, and the extra config have all changed: 2 partitions, 1 replica
    ➜ bin ./kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --describe --topic test1
    Topic: test1  PartitionCount: 2  ReplicationFactor: 1  Configs: segment.bytes=1073741824,retention.ms=120000
      Topic: test1  Partition: 0  Leader: 3  Replicas: 3  Isr: 3
      Topic: test1  Partition: 1  Leader: 1  Replicas: 1  Isr: 1

    # 3 brokers, 3 partitions, 2 replicas (1 leader, 1 follower): the distribution is quite even
    bash-5.1# ./kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --create --topic test2 --partitions 3 --replication-factor 2 --config retention.ms=120000
    Created topic test2.
    bash-5.1# ./kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --describe --topic test2
    Topic: test2  PartitionCount: 3  ReplicationFactor: 2  Configs: segment.bytes=1073741824,retention.ms=120000
      Topic: test2  Partition: 0  Leader: 3  Replicas: 3,1  Isr: 3,1
      Topic: test2  Partition: 1  Leader: 1  Replicas: 1,2  Isr: 1,2
      Topic: test2  Partition: 2  Leader: 2  Replicas: 2,3  Isr: 2,3

    # 3 brokers, 3 partitions, 1 replica (leader only)
    bash-5.1# ./bin/kafka-topics.sh --create --zookeeper zoo1:2181,zoo2:2181,zoo3:2181 --replication-factor 1 --partitions 3 --topic test3
    Created topic test3.
    bash-5.1#
    bash-5.1# ./bin/kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --describe --topic test3
    Topic: test3  PartitionCount: 3  ReplicationFactor: 1  Configs: segment.bytes=1073741824
      Topic: test3  Partition: 0  Leader: 2  Replicas: 2  Isr: 2
      Topic: test3  Partition: 1  Leader: 3  Replicas: 3  Isr: 3
      Topic: test3  Partition: 2  Leader: 1  Replicas: 1  Isr: 1

    # List topics
    ➜ bin ./kafka-topics.sh --list --zookeeper 127.0.0.1:2181
    __consumer_offsets
    test
    test1
    test2

    # Delete a topic
    ➜ bin ./kafka-topics.sh --delete --zookeeper 127.0.0.1:2181 --topic test
    Topic test is marked for deletion.
    Note: This will have no impact if delete.topic.enable is not set to true.

    # Show partitions whose leader is unavailable
    ➜ bin ./kafka-topics.sh --zookeeper 127.0.0.1:2181/test --describe --unavailable-partitions

    # Show details of under-replicated partitions
    ➜ bin ./kafka-topics.sh --zookeeper 127.0.0.1:2181/test --describe --under-replicated-partitions
    # Show the topic's offsets
    ➜ bin ./kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 127.0.0.1:9092 --topic test
    test:0:7

    # Show the maximum offset of the topic's partitions
    # --time -1 means the latest offset
    ➜ bin ./kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 127.0.0.1:9092 --topic test --time -1
    test:0:7

    # Show the minimum offset of the topic's partitions
    # --time -2 means the earliest offset
    ➜ bin ./kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 127.0.0.1:9092 --topic test --time -2
    test:0:0

    # Dump a .log data file
    ➜ bin ./kafka-run-class.sh kafka.tools.DumpLogSegments --files /usr/local/var/lib/kafka-logs/test-0/00000000000000000000.log --print-data-log
    Dumping /usr/local/var/lib/kafka-logs/test-0/00000000000000000000.log
    Starting offset: 0
    baseOffset: 0 lastOffset: 1 count: 2 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 0 CreateTime: 1607746708893 size: 78 magic: 2 compresscodec: NONE crc: 3385071347 isvalid: true
    | offset: 0 CreateTime: 1607746708347 keysize: -1 valuesize: 1 sequence: -1 headerKeys: [] payload: 1
    | offset: 1 CreateTime: 1607746708893 keysize: -1 valuesize: 1 sequence: -1 headerKeys: [] payload: 2
    baseOffset: 2 lastOffset: 4 count: 3 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 78 CreateTime: 1607746710358 size: 87 magic: 2 compresscodec: NONE crc: 2415539616 isvalid: true
    | offset: 2 CreateTime: 1607746709397 keysize: -1 valuesize: 1 sequence: -1 headerKeys: [] payload: 3
    | offset: 3 CreateTime: 1607746709874 keysize: -1 valuesize: 1 sequence: -1 headerKeys: [] payload: 4
    | offset: 4 CreateTime: 1607746710358 keysize: -1 valuesize: 1 sequence: -1 headerKeys: [] payload: 5
    baseOffset: 5 lastOffset: 6 count: 2 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 165 CreateTime: 1607746712081 size: 78 magic: 2 compresscodec: NONE crc: 449097919 isvalid: true
    | offset: 5 CreateTime: 1607746711241 keysize: -1 valuesize: 1 sequence: -1 headerKeys: [] payload: 6
    | offset: 6 CreateTime: 1607746712081 keysize: -1 valuesize: 1 sequence: -1 headerKeys: [] payload: 7
    baseOffset: 7 lastOffset: 7 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 243 CreateTime: 1607750506795 size: 69 magic: 2 compresscodec: NONE crc: 1201654216 isvalid: true
    | offset: 7 CreateTime: 1607750506795 keysize: -1 valuesize: 1 sequence: -1 headerKeys: [] payload: a
    baseOffset: 8 lastOffset: 9 count: 2 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 312 CreateTime: 1607750508922 size: 78 magic: 2 compresscodec: NONE crc: 2249065123 isvalid: true
    | offset: 8 CreateTime: 1607750508188 keysize: -1 valuesize: 1 sequence: -1 headerKeys: [] payload: b
    | offset: 9 CreateTime: 1607750508922 keysize: -1 valuesize: 1 sequence: -1 headerKeys: [] payload: c
    baseOffset: 10 lastOffset: 10 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 390 CreateTime: 1607750511875 size: 69 magic: 2 compresscodec: NONE crc: 926719390 isvalid: true
    | offset: 10 CreateTime: 1607750511875 keysize: -1 valuesize: 1 sequence: -1 headerKeys: [] payload: d

    # Dump an .index index file
    ➜ bin ./kafka-run-class.sh kafka.tools.DumpLogSegments --files /usr/local/var/lib/kafka-logs/test-0/00000000000000000000.index
    Dumping /usr/local/var/lib/kafka-logs/test-0/00000000000000000000.index
    offset: 0 position: 0
    Mismatches in :/usr/local/var/lib/kafka-logs/test-0/00000000000000000000.index
      Index offset: 0, log offset: 1

    # Dump a .timeindex index file
    ➜ bin ./kafka-run-class.sh kafka.tools.DumpLogSegments --files /usr/local/var/lib/kafka-logs/test-0/00000000000000000000.timeindex --verify-index-only
    Dumping /usr/local/var/lib/kafka-logs/test-0/00000000000000000000.timeindex
    The following indexed offsets are not found in the log.
    Indexed offset: 0, found log offset: 1
    # Produce messages
    ➜ bin ./kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic test

    # Consume messages
    ➜ bin ./kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic test --from-beginning

    # Consume from a given offset
    # offset=2, partition 0, at most 3 messages
    ➜ bin ./kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic test --offset 2 --partition 0 --max-messages 3
    3
    4
    5
    Processed a total of 3 messages
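kafka-consumer-groups.sh is the consumer group management tool. It can list all consumer groups, show the details of a group, delete group information, and reset a group's offsets.
--all-groups: apply to all consumer groups
--all-topics: apply to all topics of the group (used with reset-offsets)
--delete: delete the topic partition offsets together with the ownership and group information (--group g1 --group g2)
--delete-offsets: delete the consumer group's offsets
--describe: show consumer group information and the consumers' offset lag
--execute: actually perform the operation; supported for reset-offsets
--export: export the operation to CSV; supported for reset-offsets
--from-file: reset offsets to the values given in a file (CSV)
--group: specify the consumer group
--list: list all consumer groups
--members: show the members of a consumer group
--state: show the state of a consumer group
--offsets: show the consumer group and list, for every topic partition it consumes, the offsets and the offset lag
--reset-offsets: reset the consumer group's offsets (requires one target option, such as --to-offset, --to-earliest, --to-latest, or --shift-by as used in the examples below)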
    # List consumer groups
    ➜ bin ./kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --list
    console-consumer-10023
    console-consumer-20994

    # Show the members of a consumer group
    ➜ bin ./kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --group console-consumer-32644 --members --describe
    Consumer group 'console-consumer-32644' has no active members.

    # Show the offset information of a consumer group
    ➜ bin ./kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --group console-consumer-32644 --offsets --describe
    Consumer group 'console-consumer-32644' has no active members.

    # Show the state of a consumer group
    ➜ bin ./kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --group console-consumer-32644 --state --describe
    Consumer group 'console-consumer-32644' has no active members.

    GROUP                   COORDINATOR (ID)    ASSIGNMENT-STRATEGY  STATE  #MEMBERS
    console-consumer-32644  localhost:9092 (0)                       Empty  0

    # Show group details
    # CURRENT-OFFSET: the offset most recently committed by the consumer group, i.e. its current read position in the partition
    # LOG-END-OFFSET: the partition's current high watermark, i.e. the offset of the latest message written and committed to the cluster
    # LAG: the gap between the consumer's CURRENT-OFFSET and the broker's LOG-END-OFFSET
    ➜ bin ./kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --group console-consumer-20994 --describe
    Consumer group 'console-consumer-20994' has no active members.

    GROUP                   TOPIC  PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID  HOST  CLIENT-ID
    console-consumer-20994  test   0          4               7               3    -            -     -

    # Reset the group's offset
    # group=console-consumer-20994, reset to offset=4
    ➜ bin ./kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --group console-consumer-20994 --reset-offsets --topic test --to-offset 4 --execute

    GROUP                   TOPIC  PARTITION  NEW-OFFSET
    console-consumer-20994  test   0          4

    # Reset the offset of group=console-consumer-20994 on topic=test to the earliest position
    # Note: the final flag here is typed with an en dash (–execute) instead of --execute,
    # so the tool does not see --execute and only does a dry run, which matches the WARN below.
    ➜ bin ./kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --group console-consumer-20994 --topic test --reset-offsets --to-earliest –execute
    WARN: No action will be performed as the --execute option is missing.In a future major release, the default behavior of this command will be to prompt the user before executing the reset rather than doing a dry run. You should add the --dry-run option explicitly if you are scripting this command and want to keep the current default behavior without prompting.

    GROUP                   TOPIC  PARTITION  NEW-OFFSET
    console-consumer-20994  test   0          0

    # Reset the consumer group's offset to the latest position
    ➜ bin ./kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --group console-consumer-20994 --reset-offsets --topic test --to-latest --execute

    GROUP                   TOPIC  PARTITION  NEW-OFFSET
    console-consumer-20994  test   0          7

    # Move the offset back by 2 from the current position; a positive number moves it forward
    ➜ bin ./kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --group console-consumer-20994 --topic test --reset-offsets --shift-by -2 --execute

    GROUP                   TOPIC  PARTITION  NEW-OFFSET
    console-consumer-20994  test   0          5
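The arithmetic behind the LAG column and the --shift-by reset can be written down in a few lines. This is a sketch under assumptions: `OffsetMath` is a hypothetical helper, and clamping the shifted offset into the partition's [earliest, latest] range is an assumption about how out-of-range shifts are handled, not something shown in the output above.

```java
// Hypothetical helper reproducing the offset arithmetic from the examples.
final class OffsetMath {
    // LAG = LOG-END-OFFSET - CURRENT-OFFSET
    static long lag(long currentOffset, long logEndOffset) {
        return logEndOffset - currentOffset;
    }

    // New offset after shifting; assumed to be clamped into [earliest, latest].
    static long shiftBy(long currentOffset, long shift, long earliest, long latest) {
        long target = currentOffset + shift;
        return Math.max(earliest, Math.min(latest, target));
    }

    public static void main(String[] args) {
        System.out.println(lag(4, 7));             // 3, as in the --describe output
        System.out.println(shiftBy(7, -2, 0, 7));  // 5, as in the --shift-by -2 example
        System.out.println(shiftBy(1, -5, 0, 7));  // 0 under the clamping assumption
    }
}
```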
    # Show disk usage per broker and per topic
    ➜ bin ./kafka-log-dirs.sh --bootstrap-server 127.0.0.1:9092 --describe --topic-list test
    Querying brokers for log directories information
    Received log directory information from brokers 0
    {"version":1,"brokers":[{"broker":0,"logDirs":[{"logDir":"/usr/local/var/lib/kafka-logs","error":null,"partitions":[{"partition":"test-0","size":243,"offsetLag":0,"isFuture":false}]}]}]}
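kafka-configs.sh is used to view and modify Kafka configuration, including cluster-wide dynamic configs, topic-level dynamic configs, and so on.
--all: list all configs of the given entity (every parameter currently in effect; without --all only dynamic parameters are shown)
--entity-type: entity type [topics/clients/users/brokers/broker-loggers]
--entity-name: entity name [topic name/client-id/user-name/broker-id]
--describe: list the configs of the given entity
--force: force the change to take effect
--topic: specify the topic name
--alter: modify the configs of the given entity. Note: --alter is required when using --delete-config or --add-config
--delete-config: delete the given configs "k1,k2"
--add-config: add configs to the given entity (k=v,k2=[v1,v2,v3],k3=v3)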
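Topic-level dynamic parameters
cleanup.policy: cleanup policy
compression.type: compression type (usually best controlled on the producer side)
delete.retention.ms: how long delete markers (tombstones) are retained on log-compacted topics
flush.messages: number of messages after which the log is flushed to disk
flush.ms: how often the log is flushed to disk
follower.replication.throttled.replicas: follower replicas subject to replication throttling
leader.replication.throttled.replicas: leader replicas subject to replication throttling
max.message.bytes: maximum size of a message batch
message.downconversion.enable: whether messages may be down-converted for older clients
message.format.version: message format version
min.insync.replicas: minimum ISR size
retention.ms: log retention time
retention.bytes: log retention size (retention is usually limited by time instead)
segment.bytes: segment size limit
segment.ms: time after which a segment is rolled
unclean.leader.election.enable: whether a replica outside the ISR may be elected leader (a setting that trades durability for availability)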
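Broker-level dynamic parameters
There are many broker-level dynamic parameters; only the commonly used ones are listed here.
log.retention.ms: log retention time
max.connections: maximum number of connections
max.connections.per.ip: maximum number of connections per IP
message.max.bytes: maximum size of a message batch
min.insync.replicas: minimum ISR size
num.io.threads: number of I/O threads (rule of thumb: twice the number of network threads)
num.network.threads: number of network threads (about 2/3 of the CPU cores works well)
num.recovery.threads.per.data.dir: number of recovery threads per data directory
num.replica.fetchers: number of replica fetcher threads (default 1, can be raised moderately)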
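User-level parameters
SCRAM-SHA-256: SCRAM credentials for the user (SCRAM-SHA-256 mechanism)
SCRAM-SHA-512: SCRAM credentials for the user (SCRAM-SHA-512 mechanism)
consumer_byte_rate: throttle consumption for the user
producer_byte_rate: throttle production for the user
request_percentage: request percentage quota
Client-level parameters
consumer_byte_rate: throttle consumption for the client-id
producer_byte_rate: throttle production for the client-id
request_percentage: request percentage quota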
    # Change the topic's data retention time
    ➜ bin ./kafka-configs.sh --bootstrap-server 127.0.0.1:9092 --topic test --add-config retention.ms=10000000 --alter
    Completed updating config for topic test.

    # Show the topic's dynamic configs
    ➜ bin ./kafka-configs.sh --bootstrap-server 127.0.0.1:9092 --topic test --describe
    Dynamic configs for topic test are:
      retention.ms=10000000 sensitive=false synonyms={DYNAMIC_TOPIC_CONFIG:retention.ms=10000000}

    # Delete a dynamic topic config
    ➜ bin ./kafka-configs.sh --bootstrap-server 127.0.0.1:9092 --topic test --alter --delete-config retention.ms
    Completed updating config for topic test.

    # Show all broker parameters (--all fetches every parameter)
    ➜ bin ./kafka-configs.sh --bootstrap-server 127.0.0.1:9092 --all --broker-defaults --describe
    Default configs for brokers in the cluster are:
kafka-broker-api-versions.sh shows the version of each API that Kafka exposes.
    # Show the current Kafka version
    ➜ bin ./kafka-broker-api-versions.sh --bootstrap-server 127.0.0.1:9092 --version
    2.6.0 (Commit:62abe01bee039651)

    # Show the API versions of every node in the cluster
    ➜ bin ./kafka-broker-api-versions.sh --bootstrap-server 127.0.0.1:9092
    localhost:9092 (id: 0 rack: null) -> (
      Produce(0): 0 to 8 [usable: 8],
      Fetch(1): 0 to 11 [usable: 11],
      ListOffsets(2): 0 to 5 [usable: 5],
      Metadata(3): 0 to 9 [usable: 9],
      LeaderAndIsr(4): 0 to 4 [usable: 4],
      StopReplica(5): 0 to 3 [usable: 3],
      UpdateMetadata(6): 0 to 6 [usable: 6],
      ControlledShutdown(7): 0 to 3 [usable: 3],
      OffsetCommit(8): 0 to 8 [usable: 8],
      OffsetFetch(9): 0 to 7 [usable: 7],
      FindCoordinator(10): 0 to 3 [usable: 3],
      JoinGroup(11): 0 to 7 [usable: 7],
      Heartbeat(12): 0 to 4 [usable: 4],
      LeaveGroup(13): 0 to 4 [usable: 4],
      SyncGroup(14): 0 to 5 [usable: 5],
      DescribeGroups(15): 0 to 5 [usable: 5],
      ListGroups(16): 0 to 4 [usable: 4],
      SaslHandshake(17): 0 to 1 [usable: 1],
      ApiVersions(18): 0 to 3 [usable: 3],
      CreateTopics(19): 0 to 5 [usable: 5],
      DeleteTopics(20): 0 to 4 [usable: 4],
      DeleteRecords(21): 0 to 2 [usable: 2],
      InitProducerId(22): 0 to 3 [usable: 3],
      OffsetForLeaderEpoch(23): 0 to 3 [usable: 3],
      AddPartitionsToTxn(24): 0 to 1 [usable: 1],
      AddOffsetsToTxn(25): 0 to 1 [usable: 1],
      EndTxn(26): 0 to 1 [usable: 1],
      WriteTxnMarkers(27): 0 [usable: 0],
      TxnOffsetCommit(28): 0 to 3 [usable: 3],
      DescribeAcls(29): 0 to 2 [usable: 2],
      CreateAcls(30): 0 to 2 [usable: 2],
      DeleteAcls(31): 0 to 2 [usable: 2],
      DescribeConfigs(32): 0 to 3 [usable: 3],
      AlterConfigs(33): 0 to 1 [usable: 1],
      AlterReplicaLogDirs(34): 0 to 1 [usable: 1],
      DescribeLogDirs(35): 0 to 2 [usable: 2],
      SaslAuthenticate(36): 0 to 2 [usable: 2],
      CreatePartitions(37): 0 to 2 [usable: 2],
      CreateDelegationToken(38): 0 to 2 [usable: 2],
      RenewDelegationToken(39): 0 to 2 [usable: 2],
      ExpireDelegationToken(40): 0 to 2 [usable: 2],
      DescribeDelegationToken(41): 0 to 2 [usable: 2],
      DeleteGroups(42): 0 to 2 [usable: 2],
      ElectLeaders(43): 0 to 2 [usable: 2],
      IncrementalAlterConfigs(44): 0 to 1 [usable: 1],
      AlterPartitionReassignments(45): 0 [usable: 0],
      ListPartitionReassignments(46): 0 [usable: 0],
      OffsetDelete(47): 0 [usable: 0],
      DescribeClientQuotas(48): 0 [usable: 0],
      AlterClientQuotas(49): 0 [usable: 0]
7. SpringBoot Integration
The Spring Boot version used here is 2.2.2.RELEASE.
Add the dependency to pom.xml:

    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
        <version>2.4.0.RELEASE</version>
    </dependency>
Producer configuration

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.common.serialization.StringSerializer;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.kafka.core.DefaultKafkaProducerFactory;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.kafka.core.ProducerFactory;

    @Configuration
    public class KafkaProducerConfig {

        public Map<String, Object> producerConfigs() {
            Map<String, Object> props = new HashMap<>();
            // The three brokers exposed on the host by docker-compose
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.124.5:9093,192.168.124.5:9094,192.168.124.5:9095");
            // Keep retrying failed sends
            props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
            // Wait until all in-sync replicas have acknowledged the write
            props.put(ProducerConfig.ACKS_CONFIG, "all");
            // Maximum time send() may block, in ms
            props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 6000);
            // Batch size in bytes and how long to wait for a batch to fill, in ms
            props.put(ProducerConfig.BATCH_SIZE_CONFIG, 4096);
            props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
            // Total producer buffer memory in bytes
            props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 40960);
            // Maximum size of a single request in bytes
            props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 1048576);
            props.put(ProducerConfig.CLIENT_ID_CONFIG, "producer.client.id.topinfo");
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "none");
            return props;
        }

        public ProducerFactory<String, String> producerFactory() {
            return new DefaultKafkaProducerFactory<>(producerConfigs());
        }

        @Bean(name = "kafkaTemplate")
        public KafkaTemplate<String, String> kafkaTemplate() {
            return new KafkaTemplate<>(producerFactory());
        }
    }
Consumer configuration

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
    import org.springframework.kafka.config.KafkaListenerContainerFactory;
    import org.springframework.kafka.core.ConsumerFactory;
    import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
    import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
    import org.springframework.kafka.listener.ContainerProperties;

    @Configuration
    public class KafkaConsumerConfig {

        private static final String GROUP0_ID = "group0";
        private static final String GROUP1_ID = "group1";

        // Keys are typed as String to match the StringDeserializer configured below
        @Bean
        KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
            ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConsumerFactory(consumerFactory());
            // Deliver records to the listener in batches
            factory.setBatchListener(true);
            // Number of consumer threads
            factory.setConcurrency(4);
            factory.getContainerProperties().setPollTimeout(3000);
            // Offsets are committed only when the listener acknowledges
            factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
            return factory;
        }

        @Bean
        public ConsumerFactory<String, String> consumerFactory() {
            // Copy the shared config so the group id override stays local to this factory
            Map<String, Object> map = new HashMap<>(consumerConfigs());
            map.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP0_ID);
            return new DefaultKafkaConsumerFactory<>(map);
        }

        @Bean
        public Map<String, Object> consumerConfigs() {
            Map<String, Object> props = new HashMap<>();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.124.5:9093,192.168.124.5:9094,192.168.124.5:9095");
            // Auto commit is off because offsets are acknowledged manually
            props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
            props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "100");
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "group-default");
            props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
            return props;
        }
    }
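Topic configuration

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.clients.admin.NewTopic;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.kafka.core.KafkaAdmin;

    @Configuration
    public class KafkaTopicConfig {

        @Bean
        public KafkaAdmin kafkaAdmin() {
            Map<String, Object> configs = new HashMap<>();
            configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.124.5:9093,192.168.124.5:9094,192.168.124.5:9095");
            return new KafkaAdmin(configs);
        }

        // Topic "test" with 3 partitions and a replication factor of 2
        @Bean
        public NewTopic topicinfo() {
            return new NewTopic("test", 3, (short) 2);
        }
    }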
Consumer service

    import java.util.List;
    import java.util.Optional;

    import lombok.extern.slf4j.Slf4j;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.kafka.support.Acknowledgment;
    import org.springframework.stereotype.Service;

    @Slf4j
    @Service
    public class KafkaConsumerService {

        // Batch listener; Constant.TOPIC is the project's own constant holding the topic name
        @KafkaListener(id = "id7", topics = {Constant.TOPIC}, groupId = "group7")
        public void consumer4(List<ConsumerRecord<?, ?>> data, Acknowledgment ack) {
            for (int i = 0; i < data.size(); i++) {
                ConsumerRecord<?, ?> record = data.get(i);
                Optional<?> kafkaMessage = Optional.ofNullable(record.value());
                Long threadId = Thread.currentThread().getId();
                if (kafkaMessage.isPresent()) {
                    Object message = kafkaMessage.get();
                    log.info("consumer:group7 --> message:{}, topic:{}, partition:{}, key:{}, offset:{}, threadId:{}",
                            message.toString(), record.topic(), record.partition(), record.key(), record.offset(), threadId);
                }
            }
            // The container factory uses AckMode.MANUAL_IMMEDIATE, so the batch must be acknowledged
            // explicitly; otherwise its offsets are never committed
            ack.acknowledge();
        }
    }
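Manual consumption models
Model 1: each thread maintains its own KafkaConsumer. This preserves message order within a partition, but raising the consumption rate requires adding partitions, which triggers a rebalance.
Model 2: one polling thread (a single KafkaConsumer) plus worker threads (a thread pool). Order within a partition is hard to maintain, but parallel consumption on the client can be increased just by adding worker threads, without adding partitions.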
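    import java.time.Duration;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Properties;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;

    // The enclosing class and its fields are illustrative; the original fragment showed only init()
    public class ManualConsumer {
        private final String servers;
        private final String group;
        private final String topicNames;   // comma-separated topic names
        private volatile boolean stoped;
        private KafkaConsumer<String, String> consumer;
        private ExecutorService executorService;

        public ManualConsumer(String servers, String group, String topicNames) {
            this.servers = servers;
            this.group = group;
            this.topicNames = topicNames;
        }

        public void init() {
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
            props.put(ConsumerConfig.GROUP_ID_CONFIG, group);
            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            // Offsets are committed automatically once per second
            props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
            props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
            props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "20");
            props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, String.valueOf(5 * 1024 * 1024));
            consumer = new KafkaConsumer<>(props);

            // A single thread owns the consumer, since KafkaConsumer is not thread-safe
            executorService = Executors.newSingleThreadExecutor();
            executorService.execute(() -> {
                List<String> topics = new ArrayList<>();
                for (String topic : topicNames.split(",")) {
                    topics.add(topic.trim());
                }
                consumer.subscribe(topics);
                try {
                    while (!stoped) {
                        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                        for (ConsumerRecord<String, String> record : records) {
                            String json = record.value();
                            // handle the message here
                        }
                    }
                } finally {
                    consumer.close();
                }
            });
        }
    }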
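The second model (one polling thread feeding a worker pool) can be sketched without a broker by treating a plain list as the batch returned from one poll. Everything here is hypothetical and for illustration only: the class `PollerWithWorkers`, the method `processBatch`, and the sample messages are not from the original code, and a real implementation would also have to decide when offsets may be committed.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of "poller + worker pool"; the batch stands in for one poll() result.
final class PollerWithWorkers {

    // Hands every message of the batch to the pool, then waits for the whole batch to finish.
    // Workers may complete in any order, which is why per-partition ordering is lost.
    static int processBatch(List<String> batch, ExecutorService workers) {
        AtomicInteger processed = new AtomicInteger();
        List<Future<?>> pending = new ArrayList<>();
        for (String message : batch) {
            pending.add(workers.submit(() -> {
                // Real business logic for `message` would run here.
                if (!message.isEmpty()) {
                    processed.incrementAndGet();
                }
            }));
        }
        for (Future<?> future : pending) {
            try {
                future.get();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting for workers", e);
            } catch (ExecutionException e) {
                throw new IllegalStateException("worker failed", e.getCause());
            }
        }
        return processed.get();
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService workers = Executors.newFixedThreadPool(4);
        try {
            List<String> batch = Arrays.asList("m0", "m1", "m2", "m3", "m4");
            System.out.println(processBatch(batch, workers)); // 5
        } finally {
            workers.shutdown();
            workers.awaitTermination(5, TimeUnit.SECONDS);
        }
    }
}
```

Waiting for the whole batch before polling again keeps the sketch simple; it also means the slowest record in a batch determines when the next poll happens.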
Producer service

    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;

    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.kafka.support.SendResult;
    import org.springframework.stereotype.Service;
    import org.springframework.util.concurrent.ListenableFuture;
    import org.springframework.util.concurrent.ListenableFutureCallback;

    @Service
    public class KafkaProducerService {

        @Autowired
        private KafkaTemplate<String, String> kafkaTemplate;

        // Synchronous send: block until the broker acknowledges or 10 seconds pass
        public void sendMessageSync(String topic, String key, String message)
                throws InterruptedException, ExecutionException, TimeoutException {
            // Pass the key through; the original accepted it but never used it
            kafkaTemplate.send(topic, key, message).get(10, TimeUnit.SECONDS);
        }

        // Asynchronous send: the callback is invoked when the result is known
        public void sendMessageAsync(String topic, String message) {
            ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, message);
            future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
                @Override
                public void onSuccess(SendResult<String, String> result) {
                    System.out.println("success");
                }

                @Override
                public void onFailure(Throwable ex) {
                    System.out.println("failure");
                }
            });
        }

        // Send to an explicit partition with an explicit key
        public void test(String topic, Integer partition, String key, String message)
                throws InterruptedException, ExecutionException, TimeoutException {
            kafkaTemplate.send(topic, partition, key, message).get(10, TimeUnit.SECONDS);
        }
    }
Web test

    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.TimeoutException;

    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.RequestParam;
    import org.springframework.web.bind.annotation.RestController;

    @RestController
    public class KafkaProducerController {

        @Autowired
        private KafkaProducerService producerService;

        @GetMapping("/sync")
        public void sendMessageSync(@RequestParam String topic)
                throws InterruptedException, ExecutionException, TimeoutException {
            producerService.sendMessageSync(topic, null, "synchronous send test");
        }

        @GetMapping("/async")
        public void sendMessageAsync() {
            producerService.sendMessageAsync("test", "asynchronous send test");
        }

        @GetMapping("/test")
        public void test(@RequestParam String topic,
                         @RequestParam(required = false) Integer partition,
                         @RequestParam(required = false) String key,
                         @RequestParam String message)
                throws InterruptedException, ExecutionException, TimeoutException {
            producerService.test(topic, partition, key, message);
        }
    }
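Consumption too slow
Consider increasing the topic's partition count and, at the same time, raising the number of consumers in the group so that consumers = partitions. Both changes are needed; neither helps without the other.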
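Why both changes are needed can be seen from a toy assignment. The sketch below spreads partitions over consumers round-robin, which is a simplification chosen for illustration and not Kafka's actual assignor; `GroupAssignment` and its methods are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of partition-to-consumer assignment inside one group.
final class GroupAssignment {

    // Assigns partitions 0..partitions-1 to consumers in round-robin order.
    static List<List<Integer>> assign(int partitions, int consumers) {
        List<List<Integer>> result = new ArrayList<>();
        for (int c = 0; c < consumers; c++) {
            result.add(new ArrayList<>());
        }
        for (int p = 0; p < partitions; p++) {
            result.get(p % consumers).add(p);
        }
        return result;
    }

    // Consumers that end up with no partition do no work at all.
    static long idleConsumers(int partitions, int consumers) {
        return assign(partitions, consumers).stream().filter(List::isEmpty).count();
    }

    public static void main(String[] args) {
        System.out.println(assign(3, 5));        // [[0], [1], [2], [], []]
        System.out.println(idleConsumers(3, 5)); // 2 consumers sit idle
        System.out.println(assign(6, 3));        // [[0, 3], [1, 4], [2, 5]]
    }
}
```

With 3 partitions, adding a fourth or fifth consumer changes nothing; with 6 partitions and 3 consumers, each consumer still handles two partitions serially, so parallelism only grows when both numbers grow together.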
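Consumption too fast
Tune these parameters:
fetch.max.bytes
The maximum amount of data, in bytes, returned by a single fetch.
max.poll.records <= throughput
The maximum number of records returned by a single poll call. If the processing logic is lightweight, this value can be raised.
It is the number of records taken from Kafka in one poll; all max.poll.records records must be processed within max.poll.interval.ms (in clients older than 0.10.1 the limit was session.timeout.ms), otherwise the consumer is considered failed and a rebalance is triggered.
The default is 500.
consumer.poll(1000): an important parameter
The poll method of the new consumer uses a mechanism similar to select I/O, so all related events (including rebalance, message fetching, and so on) happen inside a single event loop.
1000 is a timeout in milliseconds. As soon as enough data is available (as determined by the fetch settings), consumer.poll(1000) returns ConsumerRecords<String, String> records immediately.
If not enough data is available, it blocks for at most 1000 ms and then returns. Since Kafka 2.0 the poll(Duration) overload is preferred over poll(long).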
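A quick way to sanity-check max.poll.records is to compare the worst-case time needed to process one full batch with the time allowed between two polls. The sketch below does that arithmetic; `PollBudget` is a hypothetical helper, the per-record times are made-up inputs, and 300000 ms is used on the assumption that the client's max.poll.interval.ms is left at its default.

```java
// Hypothetical helper for sizing max.poll.records against the allowed time between polls.
final class PollBudget {

    // Worst-case processing time for one full batch, in milliseconds.
    static long worstCaseBatchMs(int maxPollRecords, long perRecordMs) {
        return maxPollRecords * perRecordMs;
    }

    // True when a full batch can be processed before the next poll is due.
    static boolean fits(int maxPollRecords, long perRecordMs, long maxPollIntervalMs) {
        return worstCaseBatchMs(maxPollRecords, perRecordMs) < maxPollIntervalMs;
    }

    // Largest batch size whose worst-case processing time stays strictly below the interval.
    static long largestSafeBatch(long perRecordMs, long maxPollIntervalMs) {
        return (maxPollIntervalMs - 1) / perRecordMs;
    }

    public static void main(String[] args) {
        long intervalMs = 300_000L; // assumed max.poll.interval.ms
        System.out.println(fits(500, 100, intervalMs));         // true: 50 s of work per batch
        System.out.println(fits(500, 1_000, intervalMs));       // false: 500 s of work per batch
        System.out.println(largestSafeBatch(1_000, intervalMs)); // 299
    }
}
```

If the check fails, either lower max.poll.records or move the heavy work off the polling thread.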