Docker实战之Kafka集群

1. 概述

Apache Kafka是一个快速、可扩展的、高吞吐、可容错的分布式发布订阅消息系统。其具有高吞吐量、内置分区、支持数据副本和容错的特性,适合在大规模消息处理场景中使用。

笔者之前在物联网公司工作,其中Kafka作为物联网MQ选型的事实标准,这里优先给大家搭建Kafka集群环境。由于Kafka的安装需要依赖Zookeeper,对Zookeeper还不了解的小伙伴可以在 这里 先认识下Zookeeper。

Kafka能解决什么问题呢?先说一下消息队列常见的使用场景吧,其实场景有很多,但是比较核心的有 3 个:解耦、异步、削峰。

2. Kafka基本概念

Kafka部分名词解释如下:

  • Broker:消息中间件处理结点,一个Kafka节点就是一个broker,多个broker可以组成一个Kafka集群。
  • Topic:一类消息,例如page view日志、click日志等都可以以topic的形式存在,Kafka集群能够同时负责多个topic的分发。
  • Partition:topic物理上的分组,一个topic可以分为多个partition,每个partition是一个有序的队列。
  • Segment:partition物理上由多个segment组成,下面有详细说明。
  • offset:每个partition都由一系列有序的、不可变的消息组成,这些消息被连续的追加到partition中。partition中的每个消息都有一个连续的序列号叫做offset,用于partition唯一标识一条消息.每个partition中的消息都由offset=0开始记录消息。

3. Docker环境搭建

配合上一节的Zookeeper环境,计划搭建一个3节点的集群。宿主机IP为 192.168.124.5

docker-compose-kafka-cluster.yml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
version: '3.7'

networks:
docker_net:
external: true

services:

kafka1:
image: wurstmeister/kafka
restart: unless-stopped
container_name: kafka1
ports:
- "9093:9092"
external_links:
- zoo1
- zoo2
- zoo3
environment:
KAFKA_BROKER_ID: 1
KAFKA_ADVERTISED_HOST_NAME: 192.168.124.5 ## 修改:宿主机IP
KAFKA_ADVERTISED_PORT: 9093 ## 修改:宿主机映射port
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.124.5:9093 ## 绑定发布订阅的端口。修改:宿主机IP
KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181,zoo2:2181,zoo3:2181"
volumes:
- "./kafka/kafka1/docker.sock:/var/run/docker.sock"
- "./kafka/kafka1/data/:/kafka"
networks:
- docker_net


kafka2:
image: wurstmeister/kafka
restart: unless-stopped
container_name: kafka2
ports:
- "9094:9092"
external_links:
- zoo1
- zoo2
- zoo3
environment:
KAFKA_BROKER_ID: 2
KAFKA_ADVERTISED_HOST_NAME: 192.168.124.5 ## 修改:宿主机IP
KAFKA_ADVERTISED_PORT: 9094 ## 修改:宿主机映射port
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.124.5:9094 ## 修改:宿主机IP
KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181,zoo2:2181,zoo3:2181"
volumes:
- "./kafka/kafka2/docker.sock:/var/run/docker.sock"
- "./kafka/kafka2/data/:/kafka"
networks:
- docker_net

kafka3:
image: wurstmeister/kafka
restart: unless-stopped
container_name: kafka3
ports:
- "9095:9092"
external_links:
- zoo1
- zoo2
- zoo3
environment:
KAFKA_BROKER_ID: 3
KAFKA_ADVERTISED_HOST_NAME: 192.168.124.5 ## 修改:宿主机IP
KAFKA_ADVERTISED_PORT: 9095 ## 修改:宿主机映射port
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.124.5:9095 ## 修改:宿主机IP
KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181,zoo2:2181,zoo3:2181"
volumes:
- "./kafka/kafka3/docker.sock:/var/run/docker.sock"
- "./kafka/kafka3/data/:/kafka"
networks:
- docker_net

kafka-manager:
image: sheepkiller/kafka-manager:latest
restart: unless-stopped
container_name: kafka-manager
hostname: kafka-manager
ports:
- "9000:9000"
links: # 连接本compose文件创建的container
- kafka1
- kafka2
- kafka3
external_links: # 连接本compose文件以外的container
- zoo1
- zoo2
- zoo3
environment:
ZK_HOSTS: zoo1:2181,zoo2:2181,zoo3:2181 ## 修改:宿主机IP
TZ: CST-8
networks:
- docker_net

执行以下命令启动

1
docker-compose -f docker-compose-kafka-cluster.yml up -d

可以看到kafka集群已经启动成功。

4. Kafka初认识

4.1 可视化管理

细心的小伙伴发现上边的配置除了kafka外还有一个kafka-manager模块。它是kafka的可视化管理模块。因为kafka的元数据、配置信息由Zookeeper管理,这里我们在UI页面做下相关配置。

1. 访问 http:localhost:9000,按图示添加相关配置

2. 配置后我们可以看到默认有一个topic(__consumer_offsets),3个brokers。该topic分50个partition,用于记录kafka的消费偏移量。

4.2 Zookeeper在kafka环境中做了什么

1. 首先观察下根目录

kafka基于zookeeper,kafka启动会将元数据保存在zookeeper中。查看zookeeper节点目录,会发现多了很多和kafka相关的目录。结果如下:

1
2
3
4
5
6
7
8
9
10
➜  docker zkCli -server 127.0.0.1:2183
Connecting to 127.0.0.1:2183
Welcome to ZooKeeper!
JLine support is enabled

WATCHER::

WatchedEvent state:SyncConnected type:None path:null
[zk: 127.0.0.1:2183(CONNECTED) 0] ls /
[cluster, controller, brokers, zookeeper, admin, isr_change_notification, log_dir_event_notification, controller_epoch, zk-test0000000000, kafka-manager, consumers, latest_producer_id_block, config]

2. 查看我们映射的kafka目录,新版本的kafka偏移量不再存储在zk中,而是在kafka自己的环境中。

我们节选了部分目录(包含2个partition)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
├── kafka1
│   ├── data
│   │   └── kafka-logs-c4e2e9edc235
│   │   ├── __consumer_offsets-1
│   │   │   ├── 00000000000000000000.index // segment索引文件
│   │   │   ├── 00000000000000000000.log // 数据文件
│   │   │   ├── 00000000000000000000.timeindex // 消息时间戳索引文件
│   │   │   └── leader-epoch-checkpoint
...
│   │   ├── __consumer_offsets-7
│   │   │   ├── 00000000000000000000.index
│   │   │   ├── 00000000000000000000.log
│   │   │   ├── 00000000000000000000.timeindex
│   │   │   └── leader-epoch-checkpoint
│   │   ├── cleaner-offset-checkpoint
│   │   ├── log-start-offset-checkpoint
│   │   ├── meta.properties
│   │   ├── recovery-point-offset-checkpoint
│   │   └── replication-offset-checkpoint
│   └── docker.sock

结果与Kafka-Manage显示结果一致。图示的文件是一个Segment,00000000000000000000.log表示offset从0开始,随着数据不断的增加,会有多个Segment文件。

5. docker创建topic

1
2
3
4
5
6
➜  docker docker exec -it kafka1 /bin/bash   # 进入容器
bash-4.4# cd /opt/kafka/ # 进入安装目录
bash-4.4# ./bin/kafka-topics.sh --list --zookeeper zoo1:2181,zoo2:2181,zoo3:2181 # 查看主题列表
__consumer_offsets
bash-4.4# ./bin/kafka-topics.sh --create --zookeeper zoo1:2181,zoo2:2181,zoo3:2181 --replication-factor 2 --partitions 3 --topic test # 新建主题
Created topic test.

说明:
–replication-factor副本数;
–partitions分区数;
replication<=broker(一定);
有效消费者数<=partitions分区数(一定);

新建主题后, 再次查看映射目录, 由图可见,partition在3个broker上均匀分布。图示同步副本表示当前和主节点数据保持同步的副本数。kafka服务端可以设置 min.insync.replicas 参数(最小同步副本)。这个值必须大于 1,这个是要求一个 leader 至少感知到有至少一个 follower 还跟自己保持联系,没掉队,这样才能确保 leader 挂了还有一个 follower (一种保证消息可靠不丢失的策略)。

ISR是由leader维护,follower从leader同步数据有一些延迟 (包括延迟时间replica.lag.time.max.ms和延迟条数replica.lag.max.messages两个维度, 当前最新的版本0.10.x中只支持replica.lag.time.max.ms这个维度), 任意一个超过阈值都会把follower剔除出ISR, 存入OSR(Outof-Sync Replicas)列表,新加入的follower也会先存放在OSR中。AR=ISR+OSR。

docker中的data路径示例: /kafka/kafka-logs-c4e2e9edc235/test-0/00000000000000000000.log

6. 官方工具

brew安装的kafka指令路径 /usr/local/Cellar/kafka/2.6.0_1/bin

真正的脚本路径 /usr/local/Cellar/kafka/2.6.0_1/libexec/bin

配置文件路径 /usr/local/etc/kafka

数据存储在 /usr/local/var/lib/kafka-logs 目录下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
├── connect-distributed
├── connect-mirror-maker
├── connect-standalone
├── kafka-acls
├── kafka-broker-api-versions
├── kafka-configs
├── kafka-console-consumer
├── kafka-console-producer
├── kafka-consumer-groups
├── kafka-consumer-perf-test
├── kafka-delegation-tokens
├── kafka-delete-records
├── kafka-dump-log
├── kafka-leader-election
├── kafka-log-dirs
├── kafka-mirror-maker
├── kafka-preferred-replica-election
├── kafka-producer-perf-test
├── kafka-reassign-partitions
├── kafka-replica-verification
├── kafka-run-class
├── kafka-server-start
├── kafka-server-stop
├── kafka-streams-application-reset
├── kafka-topics
├── kafka-verifiable-consumer
├── kafka-verifiable-producer
├── trogdor
├── zookeeper-security-migration
├── zookeeper-server-start
├── zookeeper-server-stop
└── zookeeper-shell

运维管理类

  • kafka-topics.sh: 用来创建,删除,查看,改变一个topic参数的工具
  • kafka-reassign-partitions.sh: 用来对partition进行重新分配(管理员会较多使用)
  • kafka-log-dirs.sh: 用来查看指定broker下日志目录的使用空间
  • kafka-leader-election.sh: 用于一组Topic分区的leader重新分配,可以支持优先副本和非同步副本(不在ISR中),老版本中的kafka-preferred-replica-election.sh脚本
  • kafka-replica-verification.sh: 该工具可以用来检查topic的一组副本的数据是否一致
  • kafka-broker-api-versions.sh: 用来查看指定broker当前支持的各个接口的版本(kafka高版本已经保证了向下兼容)
  • kafka-configs.sh: 用来操作和查看topic, client, user or broker的实体配置

kafka操作类

  • kafka-console-consumer.sh: 通过终端来启动消费者
  • kafka-console-producer.sh: 通过终端来启动生产者
  • kafka-consumer-groups.sh: 用来查看,删除或者重置消费者组offset
  • kafka-consumer-perf-test.sh: 用来进行消费者压力测试
  • kafka-producer-perf-test.sh: 用来进行生产者压力测试
  • kafka-delete-records.sh: 删除指定分区的记录,直到指定的offset
  • kafka-mirror-maker.sh: 用于多集群之间同步topic数据
  • kafka-server-start.sh: broker启动脚本
  • kafka-server-stop.sh: broker关闭脚本
  • kafka-streams-application-reset.sh: 流式应用工具
  • zookeeper-shell.sh: kafka工具中也默认提供了zookeeper管理工具(不太好用)

kafka-topics.sh

topic创建

  • --create: 创建topic
    • --topic: 指定topic名称
    • --partitions: 指定分区数量
    • --replication-factor: 指定副本数量(仅在创建时可用)
    • --config: 指定topic级别的参数(动态参数,可修改)
    • --replica-assignment: 手动指定partition到broker的分配<part1_replica1:part1_replica2,part2_replica1:part2_replica2>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
# 创建topic
➜ bin ./kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --create --topic test
Created topic test.

# 查看默认创建topic的参数详情(由broker配置决定)
# 默认1个分区,1个副本
➜ bin ./kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --describe --topic test
Topic: test PartitionCount: 1 ReplicationFactor: 1 Configs: segment.bytes=1073741824
Topic: test Partition: 0 Leader: 1 Replicas: 1 Isr: 1

# 指定参数创建topic, 2个分片, 1个副本
# 指定分区partitions为2,副本replication-factor为1,topic数据保留2分钟
# replication<=broker(一定);
# 有效消费者数<=partitions分区数(一定);
➜ bin ./kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --create --topic test1 --partitions 2 --replication-factor 1 --config retention.ms=120000
Created topic test1.

# 分区,副本和指定参数都改变了,2个分片, 1个副本
➜ bin ./kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --describe --topic test1
Topic: test1 PartitionCount: 2 ReplicationFactor: 1 Configs: segment.bytes=1073741824,retention.ms=120000
Topic: test1 Partition: 0 Leader: 3 Replicas: 3 Isr: 3
Topic: test1 Partition: 1 Leader: 1 Replicas: 1 Isr: 1

# 3个broker, 设置3个分片, 2个副本(1主1从), 可以看到分布相当均匀
bash-5.1# ./kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --create --topic test2 --partitions 3 --replication-factor 2 --config retention.ms=120000
Created topic test2.

bash-5.1# ./kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --describe --topic test2
Topic: test2 PartitionCount: 3 ReplicationFactor: 2 Configs: segment.bytes=1073741824,retention.ms=120000
Topic: test2 Partition: 0 Leader: 3 Replicas: 3,1 Isr: 3,1
Topic: test2 Partition: 1 Leader: 1 Replicas: 1,2 Isr: 1,2
Topic: test2 Partition: 2 Leader: 2 Replicas: 2,3 Isr: 2,3

# 3个broker, 设置3个分片, 1个副本(1个leader)
bash-5.1# ./bin/kafka-topics.sh --create --zookeeper zoo1:2181,zoo2:2181,zoo3:2181 --replication-factor 1 --partitions 3 --topic test3
Created topic test3.
bash-5.1#
bash-5.1# ./bin/kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --describe --topic test3
Topic: test3 PartitionCount: 3 ReplicationFactor: 1 Configs: segment.bytes=1073741824
Topic: test3 Partition: 0 Leader: 2 Replicas: 2 Isr: 2
Topic: test3 Partition: 1 Leader: 3 Replicas: 3 Isr: 3
Topic: test3 Partition: 2 Leader: 1 Replicas: 1 Isr: 1

# 查看topic列表
➜ bin ./kafka-topics.sh --list --zookeeper 127.0.0.1:2181
__consumer_offsets
test
test1
test2

# 删除topic
➜ bin ./kafka-topics.sh --delete --zookeeper 127.0.0.1:2181 --topic test
Topic test is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.

# 查看集群leader不可用分区
➜ bin ./kafka-topics.sh --zookeeper 127.0.0.1:2181/test --describe --unavailable-partitions

# 查看副本不同步的分区详情
➜ bin ./kafka-topics.sh --zookeeper 127.0.0.1:2181/test --describe --under-replicated-partitions

kafka-run-class.sh

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
# 查看topic偏移量
➜ bin ./kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 127.0.0.1:9092 --topic test
test:0:7

# 查看topic指定分区offset的最大值
# -1为最大值
➜ bin ./kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 127.0.0.1:9092 --topic test --time -1
test:0:7

# 查看topic指定分区offset的最小值
# -2为最小值
➜ bin ./kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 127.0.0.1:9092 --topic test --time -2
test:0:0


# 查看.log数据文件
➜ bin ./kafka-run-class.sh kafka.tools.DumpLogSegments --files /usr/local/var/lib/kafka-logs/test-0/00000000000000000000.log --print-data-log
Dumping /usr/local/var/lib/kafka-logs/test-0/00000000000000000000.log
Starting offset: 0
baseOffset: 0 lastOffset: 1 count: 2 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 0 CreateTime: 1607746708893 size: 78 magic: 2 compresscodec: NONE crc: 3385071347 isvalid: true
| offset: 0 CreateTime: 1607746708347 keysize: -1 valuesize: 1 sequence: -1 headerKeys: [] payload: 1
| offset: 1 CreateTime: 1607746708893 keysize: -1 valuesize: 1 sequence: -1 headerKeys: [] payload: 2
baseOffset: 2 lastOffset: 4 count: 3 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 78 CreateTime: 1607746710358 size: 87 magic: 2 compresscodec: NONE crc: 2415539616 isvalid: true
| offset: 2 CreateTime: 1607746709397 keysize: -1 valuesize: 1 sequence: -1 headerKeys: [] payload: 3
| offset: 3 CreateTime: 1607746709874 keysize: -1 valuesize: 1 sequence: -1 headerKeys: [] payload: 4
| offset: 4 CreateTime: 1607746710358 keysize: -1 valuesize: 1 sequence: -1 headerKeys: [] payload: 5
baseOffset: 5 lastOffset: 6 count: 2 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 165 CreateTime: 1607746712081 size: 78 magic: 2 compresscodec: NONE crc: 449097919 isvalid: true
| offset: 5 CreateTime: 1607746711241 keysize: -1 valuesize: 1 sequence: -1 headerKeys: [] payload: 6
| offset: 6 CreateTime: 1607746712081 keysize: -1 valuesize: 1 sequence: -1 headerKeys: [] payload: 7
baseOffset: 7 lastOffset: 7 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 243 CreateTime: 1607750506795 size: 69 magic: 2 compresscodec: NONE crc: 1201654216 isvalid: true
| offset: 7 CreateTime: 1607750506795 keysize: -1 valuesize: 1 sequence: -1 headerKeys: [] payload: a
baseOffset: 8 lastOffset: 9 count: 2 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 312 CreateTime: 1607750508922 size: 78 magic: 2 compresscodec: NONE crc: 2249065123 isvalid: true
| offset: 8 CreateTime: 1607750508188 keysize: -1 valuesize: 1 sequence: -1 headerKeys: [] payload: b
| offset: 9 CreateTime: 1607750508922 keysize: -1 valuesize: 1 sequence: -1 headerKeys: [] payload: c
baseOffset: 10 lastOffset: 10 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 390 CreateTime: 1607750511875 size: 69 magic: 2 compresscodec: NONE crc: 926719390 isvalid: true
| offset: 10 CreateTime: 1607750511875 keysize: -1 valuesize: 1 sequence: -1 headerKeys: [] payload: d


# 查看.index索引文件
➜ bin ./kafka-run-class.sh kafka.tools.DumpLogSegments --files /usr/local/var/lib/kafka-logs/test-0/00000000000000000000.index
Dumping /usr/local/var/lib/kafka-logs/test-0/00000000000000000000.index
offset: 0 position: 0
Mismatches in :/usr/local/var/lib/kafka-logs/test-0/00000000000000000000.index
Index offset: 0, log offset: 1


# 查看.timeindex索引文件
➜ bin ./kafka-run-class.sh kafka.tools.DumpLogSegments --files /usr/local/var/lib/kafka-logs/test-0/00000000000000000000.timeindex --verify-index-only
Dumping /usr/local/var/lib/kafka-logs/test-0/00000000000000000000.timeindex
The following indexed offsets are not found in the log.
Indexed offset: 0, found log offset: 1

kafka-console-producer.sh

1
2
# 生产消息
➜ bin ./kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic test

kafka-console-consumer.sh

1
2
3
4
5
6
7
8
9
10
# 消费消息
➜ bin ./kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic test --from-beginning

# 指定offset消费消息
# 指定offset=2, 分片0, 最多消费3条
➜ bin ./kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic test --offset 2 --partition 0 --max-messages 3
3
4
5
Processed a total of 3 messages

kafka-consumer-groups.sh

消费组管理工具,可以列出所有的消费组,查看消费组详情,删除消费组信息以及重置消费组的offset

  • --all-groups: 应用到所有的消费组
  • --all-topics:
  • --delete: 删除topic分区的offset,以及拥有者和消费组信息(–group g1 –group g2)
  • --delete-offsets: 删除消费组的offset
  • --describe: 查看消费组信息以及消费者的offset lag
  • --execute: 指定操作,支持reset-offsets操作
  • --export: 导出操作执行到csv,支持reset-offsets
  • --from-file: 指定文件中指定的值重置offset (csv文件)
  • --group: 指定消费组
  • --list: 列出所有的消费组
  • --members: 查看消费组中的成员
  • --state: 查看消费组的状态
  • --offsets: 查看消费组并且列出每个消费组所有topic的分区以及消息的offset lag
  • --reset-offsets: 重置消费组的offset (需要指定如下一个参数)
    • --to-datetime:
    • --by-period:
    • --to-earliest:
    • --to-latest:
    • --shift-by:
    • --from-file:
    • --to-current:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
# 查看分组列表
➜ bin ./kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --list
console-consumer-10023
console-consumer-20994

# 查看消费组的成员
➜ bin ./kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --group console-consumer-32644 --members --describe


Consumer group 'console-consumer-32644' has no active members.

# 查看消费组的offset信息
➜ bin ./kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --group console-consumer-32644 --offsets --describe


Consumer group 'console-consumer-32644' has no active members.


# 查看消费组状态
➜ bin ./kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --group console-consumer-32644 --state --describe

Consumer group 'console-consumer-32644' has no active members.

GROUP COORDINATOR (ID) ASSIGNMENT-STRATEGY STATE #MEMBERS
console-consumer-32644 localhost:9092 (0) Empty 0


# 查看group详情
# CURRENT-OFFSET: 当前消费者群组最近提交的 offset,也就是消费者分区里读取的当前位置
# LOG-END-OFFSET: 当前最高水位偏移量,也就是最近一个读取消息的偏移量,同时也是最近一个提交到集群的偏移量
# LAG:消费者的 CURRENT-OFFSET 与 broker 的 LOG-END-OFFSET 之间的差距
➜ bin ./kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --group console-consumer-20994 --describe


Consumer group 'console-consumer-20994' has no active members.

GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
console-consumer-20994 test 0 4 7 3 - - -


# 重新设置设置group的offset
# 指定group=console-consumer-20994, 重置offset=4
➜ bin ./kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --group console-consumer-20994 --reset-offsets --topic test --to-offset 4 --execute

GROUP TOPIC PARTITION NEW-OFFSET
console-consumer-20994 test 0 4


# 把group=console-consumer-20994在topic=test上的offcet恢复到最初
➜ bin ./kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --group console-consumer-20994 --topic test --reset-offsets --to-earliest –execute
WARN: No action will be performed as the --execute option is missing.In a future major release, the default behavior of this command will be to prompt the user before executing the reset rather than doing a dry run. You should add the --dry-run option explicitly if you are scripting this command and want to keep the current default behavior without prompting.

GROUP TOPIC PARTITION NEW-OFFSET
console-consumer-20994 test 0 0


# 重置Consumer Group的Offset到最新位移
➜ bin ./kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --group console-consumer-20994 --reset-offsets --topic test --to-latest --execute


GROUP TOPIC PARTITION NEW-OFFSET
console-consumer-20994 test 0 7


# 把offcet从当前位置往前移动2个,如果是正数就是往后移动。
➜ bin ./kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --group console-consumer-20994 --topic test --reset-offsets --shift-by -2 --execute

GROUP TOPIC PARTITION NEW-OFFSET
console-consumer-20994 test 0 5

kafka-log-dirs.sh

1
2
3
4
5
# 查看kafka各个broker节点以及topic的磁盘使用率情况
➜ bin ./kafka-log-dirs.sh --bootstrap-server 127.0.0.1:9092 --describe --topic-list test
Querying brokers for log directories information
Received log directory information from brokers 0
{"version":1,"brokers":[{"broker":0,"logDirs":[{"logDir":"/usr/local/var/lib/kafka-logs","error":null,"partitions":[{"partition":"test-0","size":243,"offsetLag":0,"isFuture":false}]}]}]}

kafka-configs.sh

用来查看和修改kafka相关的配置信息,包含集群的动态配置,topic级别的动态配置等等

  • --all: 列出给定实体的全部配置文件(默认已经生效的全部参数,如果没有all仅对动态参数生效)
  • --entity-type: 实体类型[topics/clients/users/brokers/broker-loggers]
  • --entity-name: 实体名称[topic名称/client-id/user-name/broker-id]
  • --describe: 列出给定实体的配置文件
  • --force: 强制生效
  • --topic: 指定topic名称
  • --alter: 修改指定实体的配置文件 注意:当使用delete-config和add-config时必须使用--alter
  • --delete-config: 删除指定的配置"k1,k2"
  • --add-config: 给指定的实体增加配置(k=v,k2=[v1,v2,v3],k3=v3)

topic级别的动态参数

  • cleanup.policy: 清理策略
  • compression.type: 压缩类型(通常建议在produce端控制)
  • delete.retention.ms: 压缩日志的保留时间
  • flush.messages: 持久化message限制
  • flush.ms: 持久化频率
  • follower.replication.throttled.replicas: follower副本限流
  • leader.replication.throttled.replicas: leader副本限流
  • max.message.bytes: 最大的batch的message大小
  • message.downconversion.enable: message向下兼容
  • message.format.version: message格式版本
  • min.insync.replicas: 最小的ISR
  • retention.ms: 日志保留时间
  • retention.bytes: 日志保留大小(通常按照时间限制)
  • segment.bytes: segment的大小限制
  • segment.ms: segment的切割时间
  • unclean.leader.election.enable: 是否允许非同步副本选主(针对可用性设置的一个参数)

broker级别的动态参数

broker级别的动态参数比较多,这里只列举常用的几个

  • log.retention.ms: 日志保留时间
  • max.connections: 最大连接数
  • max.connections.per.ip: 每个ip的最大连接数
  • message.max.bytes: batch的message的最大限制
  • min.insync.replicas: 最小的ISR
  • num.io.threads: IO线程数(网络线程数的两倍)
  • num.network.threads: 网络线程数(cpu的2/3较好)
  • num.recovery.threads.per.data.dir: 每个数据目录的恢复线程
  • num.replica.fetchers: 副本的fetchers数量(默认为1,可适当调大)

user级别的参数

  • SCRAM-SHA-256:
  • SCRAM-SHA-512:
  • consumer_byte_rate: 针对消费者user进行限流
  • producer_byte_rate: 针对生产者进行限流
  • request_percentage: 请求百分比

clients级别参数

  • consumer_byte_rate: 针对消费者user进行限流
  • producer_byte_rate: 针对生产者进行限流
  • request_percentage: 请求百分比
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# 修改topic的数据保留时间
➜ bin ./kafka-configs.sh --bootstrap-server 127.0.0.1:9092 --topic test --add-config retention.ms=10000000 --alter
Completed updating config for topic test.


# 查看topic的动态参数配置
➜ bin ./kafka-configs.sh --bootstrap-server 127.0.0.1:9092 --topic test --describe
Dynamic configs for topic test are:
retention.ms=10000000 sensitive=false synonyms={DYNAMIC_TOPIC_CONFIG:retention.ms=10000000}

# 删除topic动态参数配置
➜ bin ./kafka-configs.sh --bootstrap-server 127.0.0.1:9092 --topic test --alter --delete-config retention.ms
Completed updating config for topic test.


查看broker全部的参数(--all会获取全部的参数)
➜ bin ./kafka-configs.sh --bootstrap-server 127.0.0.1:9092 --all --broker-defaults --describe
Default configs for brokers in the cluster are:

kafka-broker-api-versions.sh

查看kafka对外的各个api版本.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
# 查看当前kafka版本
➜ bin ./kafka-broker-api-versions.sh --bootstrap-server 127.0.0.1:9092 --version
2.6.0 (Commit:62abe01bee039651)

# 查看集群所有节点的api版本
➜ bin ./kafka-broker-api-versions.sh --bootstrap-server 127.0.0.1:9092
localhost:9092 (id: 0 rack: null) -> (
Produce(0): 0 to 8 [usable: 8],
Fetch(1): 0 to 11 [usable: 11],
ListOffsets(2): 0 to 5 [usable: 5],
Metadata(3): 0 to 9 [usable: 9],
LeaderAndIsr(4): 0 to 4 [usable: 4],
StopReplica(5): 0 to 3 [usable: 3],
UpdateMetadata(6): 0 to 6 [usable: 6],
ControlledShutdown(7): 0 to 3 [usable: 3],
OffsetCommit(8): 0 to 8 [usable: 8],
OffsetFetch(9): 0 to 7 [usable: 7],
FindCoordinator(10): 0 to 3 [usable: 3],
JoinGroup(11): 0 to 7 [usable: 7],
Heartbeat(12): 0 to 4 [usable: 4],
LeaveGroup(13): 0 to 4 [usable: 4],
SyncGroup(14): 0 to 5 [usable: 5],
DescribeGroups(15): 0 to 5 [usable: 5],
ListGroups(16): 0 to 4 [usable: 4],
SaslHandshake(17): 0 to 1 [usable: 1],
ApiVersions(18): 0 to 3 [usable: 3],
CreateTopics(19): 0 to 5 [usable: 5],
DeleteTopics(20): 0 to 4 [usable: 4],
DeleteRecords(21): 0 to 2 [usable: 2],
InitProducerId(22): 0 to 3 [usable: 3],
OffsetForLeaderEpoch(23): 0 to 3 [usable: 3],
AddPartitionsToTxn(24): 0 to 1 [usable: 1],
AddOffsetsToTxn(25): 0 to 1 [usable: 1],
EndTxn(26): 0 to 1 [usable: 1],
WriteTxnMarkers(27): 0 [usable: 0],
TxnOffsetCommit(28): 0 to 3 [usable: 3],
DescribeAcls(29): 0 to 2 [usable: 2],
CreateAcls(30): 0 to 2 [usable: 2],
DeleteAcls(31): 0 to 2 [usable: 2],
DescribeConfigs(32): 0 to 3 [usable: 3],
AlterConfigs(33): 0 to 1 [usable: 1],
AlterReplicaLogDirs(34): 0 to 1 [usable: 1],
DescribeLogDirs(35): 0 to 2 [usable: 2],
SaslAuthenticate(36): 0 to 2 [usable: 2],
CreatePartitions(37): 0 to 2 [usable: 2],
CreateDelegationToken(38): 0 to 2 [usable: 2],
RenewDelegationToken(39): 0 to 2 [usable: 2],
ExpireDelegationToken(40): 0 to 2 [usable: 2],
DescribeDelegationToken(41): 0 to 2 [usable: 2],
DeleteGroups(42): 0 to 2 [usable: 2],
ElectLeaders(43): 0 to 2 [usable: 2],
IncrementalAlterConfigs(44): 0 to 1 [usable: 1],
AlterPartitionReassignments(45): 0 [usable: 0],
ListPartitionReassignments(46): 0 [usable: 0],
OffsetDelete(47): 0 [usable: 0],
DescribeClientQuotas(48): 0 [usable: 0],
AlterClientQuotas(49): 0 [usable: 0]

7. SpringBoot集成

笔者SpringBoot版本是 2.2.2.RELEASE

pom.xml添加依赖

1
2
3
4
5
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.4.0.RELEASE</version>
</dependency>

生产者配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
@Configuration
public class KafkaProducerConfig {

/**
* producer配置
* @return
*/
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
// 指定多个kafka集群多个地址 127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.124.5:9093,192.168.124.5:9094,192.168.124.5:9095");
// 重试次数,0为不启用重试机制
props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
// acks=0 把消息发送到kafka就认为发送成功
// acks=1 把消息发送到kafka leader分区,并且写入磁盘就认为发送成功
// acks=all 把消息发送到kafka leader分区,并且leader分区的副本follower对消息进行了同步就任务发送成功
props.put(ProducerConfig.ACKS_CONFIG,"all");
// 生产者空间不足时,send()被阻塞的时间,默认60s
props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 6000);
// 控制批处理大小,单位为字节
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 4096);
// 批量发送,延迟为1毫秒,启用该功能能有效减少生产者发送消息次数,从而提高并发量
props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
// 生产者可以使用的总内存字节来缓冲等待发送到服务器的记录
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 40960);
// 消息的最大大小限制,也就是说send的消息大小不能超过这个限制, 默认1048576(1MB)
props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG,1048576);
// 客户端id
props.put(ProducerConfig.CLIENT_ID_CONFIG,"producer.client.id.topinfo");
// 键的序列化方式
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
// 值的序列化方式
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
// 压缩消息,支持四种类型,分别为:none、lz4、gzip、snappy,默认为none。
// 消费者默认支持解压,所以压缩设置在生产者,消费者无需设置。
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"none");
return props;
}

/**
* producer工厂配置
* @return
*/
public ProducerFactory<String, String> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}

/**
* Producer Template 配置
*/
@Bean(name="kafkaTemplate")
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}

消费者配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
@Configuration
public class KafkaConsumerConfig {


private static final String GROUP0_ID = "group0";
private static final String GROUP1_ID = "group1";

/**
* 1. setAckMode: 消费者手动提交ack
*
* RECORD: 每处理完一条记录后提交。
* BATCH(默认): 每次poll一批数据后提交一次,频率取决于每次poll的调用频率。
* TIME: 每次间隔ackTime的时间提交。
* COUNT: 处理完poll的一批数据后并且距离上次提交处理的记录数超过了设置的ackCount就提交。
* COUNT_TIME: TIME和COUNT中任意一条满足即提交。
* MANUAL: 手动调用Acknowledgment.acknowledge()后,并且处理完poll的这批数据后提交。
* MANUAL_IMMEDIATE: 手动调用Acknowledgment.acknowledge()后立即提交。
*
* 2. factory.setConcurrency(3);
* 此处设置的目的在于:假设 topic test 下有 0、1、2三个 partition,Spring Boot中只有一个 @KafkaListener() 消费者订阅此 topic,此处设置并发为3,
* 启动后 会有三个不同的消费者分别订阅 p0、p1、p2,本地实际有三个消费者线程。
* 而 factory.setConcurrency(1); 的话 本地只有一个消费者线程, p0、p1、p2被同一个消费者订阅。
* 由于 一个partition只能被同一个消费者组下的一个消费者订阅,对于只有一个 partition的topic,即使设置 并发为3,也只会有一个消费者,多余的消费者没有 partition可以订阅。
*
* 3. factory.setBatchListener(true);
* 设置批量消费 ,每个批次数量在Kafka配置参数ConsumerConfig.MAX_POLL_RECORDS_CONFIG中配置,
* 限制的是 一次批量接收的最大条数,而不是 等到达到最大条数才接收,这点容易被误解。
* 实际测试时,接收是实时的,当生产者大量写入时,一次批量接收的消息数量为 配置的最大条数。
*/
@Bean
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String>
factory = new ConcurrentKafkaListenerContainerFactory<>();
// 设置消费者工厂
factory.setConsumerFactory(consumerFactory());
// 设置为批量消费,每个批次数量在Kafka配置参数中设置ConsumerConfig.MAX_POLL_RECORDS_CONFIG
factory.setBatchListener(true);
// 消费者组中线程数量,消费者数量<=partition数量,即使配置的消费者数量大于partition数量,多余消费者无法消费到数据。
factory.setConcurrency(4);
// 拉取超时时间
factory.getContainerProperties().setPollTimeout(3000);
// 手动提交
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
return factory;
}

@Bean
public ConsumerFactory<Integer, String> consumerFactory() {
Map<String, Object> map = consumerConfigs();
map.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP0_ID);
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}

// @Bean
// KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaListenerContainerFactory1() {
// ConcurrentKafkaListenerContainerFactory<Integer, String>
// factory = new ConcurrentKafkaListenerContainerFactory<>();
// // 设置消费者工厂
// factory.setConsumerFactory(consumerFactory1());
// // 设置为批量消费,每个批次数量在Kafka配置参数中设置ConsumerConfig.MAX_POLL_RECORDS_CONFIG
// factory.setBatchListener(true);
// // 消费者组中线程数量,消费者数量<=partition数量,即使配置的消费者数量大于partition数量,多余消费者无法消费到数据。
// factory.setConcurrency(3);
// // 拉取超时时间
// factory.getContainerProperties().setPollTimeout(3000);
// // 手动提交
// factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
// return factory;
// }
//
// public ConsumerFactory<Integer, String> consumerFactory1() {
// Map<String, Object> map = consumerConfigs();
// map.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP1_ID);
// return new DefaultKafkaConsumerFactory<>(consumerConfigs());
// }

@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
// Kafka地址
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.124.5:9093,192.168.124.5:9094,192.168.124.5:9095");
// 是否自动提交offset偏移量(默认true)
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
// 批量消费
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "100");
// 消费者组
props.put(ConsumerConfig.GROUP_ID_CONFIG, "group-default");
// 自动提交的频率(ms)
// propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
// Session超时设置
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
// 键的反序列化方式
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
// 值的反序列化方式
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
// offset偏移量规则设置:
// (1)、earliest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
// (2)、latest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
// (3)、none:topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
return props;
}

}

主题配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
@Configuration
public class KafkaTopicConfig {

/**
* 定义一个KafkaAdmin的bean,可以自动检测集群中是否存在topic,不存在则创建
*/
@Bean
public KafkaAdmin kafkaAdmin() {
Map<String, Object> configs = new HashMap<>();
// 指定多个kafka集群多个地址,例如:192.168.2.11,9092,192.168.2.12:9092,192.168.2.13:9092
configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.124.5:9093,192.168.124.5:9094,192.168.124.5:9095");
return new KafkaAdmin(configs);
}

/**
* 创建 Topic
*/
@Bean
public NewTopic topicinfo() {
// 创建topic,需要指定创建的topic的"名称"、"分区数"、"副本数量(副本数数目设置要小于Broker数量)"
return new NewTopic("test", 3, (short) 2);
}

}

消费者服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
@Slf4j
@Service
public class KafkaConsumerService {


// /**
// * 单条消费
// * @param message
// */
// @KafkaListener(id = "id0", topics = {Constant.TOPIC}, containerFactory="kafkaListenerContainerFactory")
// public void kafkaListener0(String message){
// log.info("consumer:group0 --> message:{}", message);
// }
//
// @KafkaListener(id = "id1", topics = {Constant.TOPIC}, groupId = "group1")
// public void kafkaListener1(String message){
// log.info("consumer:group1 --> message:{}", message);
// }
// /**
// * 监听某个 Topic 的某个分区示例,也可以监听多个 Topic 的分区
// * 为什么找不到group2呢?
// * @param message
// */
// @KafkaListener(id = "id2", groupId = "group2", topicPartitions = { @TopicPartition(topic = Constant.TOPIC, partitions = { "0" }) })
// public void kafkaListener2(String message) {
// log.info("consumer:group2 --> message:{}", message);
// }
//
// /**
// * 获取监听的 topic 消息头中的元数据
// * @param message
// * @param topic
// * @param key
// */
// @KafkaListener(id = "id3", topics = Constant.TOPIC, groupId = "group3")
// public void kafkaListener(@Payload String message,
// @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
// @Header(KafkaHeaders.RECEIVED_PARTITION_ID) String partition,
// @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key) {
// Long threadId = Thread.currentThread().getId();
// log.info("consumer:group3 --> message:{}, topic:{}, partition:{}, key:{}, threadId:{}", message, topic, partition, key, threadId);
// }
//
// /**
// * 监听 topic 进行批量消费
// * @param messages
// */
// @KafkaListener(id = "id4", topics = Constant.TOPIC, groupId = "group4")
// public void kafkaListener(List<String> messages) {
// for(String msg:messages){
// log.info("consumer:group4 --> message:{}", msg);
// }
// }
//
// /**
// * 监听topic并手动提交偏移量
// * @param messages
// * @param acknowledgment
// */
// @KafkaListener(id = "id5", topics = Constant.TOPIC,groupId = "group5")
// public void kafkaListener(List<String> messages, Acknowledgment acknowledgment) {
// for(String msg:messages){
// log.info("consumer:group5 --> message:{}", msg);
// }
// // 触发提交offset偏移量
// acknowledgment.acknowledge();
// }
//
// /**
// * 模糊匹配多个 Topic
// * @param message
// */
// @KafkaListener(id = "id6", topicPattern = "test.*",groupId = "group6")
// public void annoListener2(String message) {
// log.error("consumer:group6 --> message:{}", message);
// }

/**
* 完整consumer
* @return
*/
@KafkaListener(id = "id7", topics = {Constant.TOPIC}, groupId = "group7")
public boolean consumer4(List<ConsumerRecord<?, ?>> data) {
for (int i=0; i<data.size(); i++) {
ConsumerRecord<?, ?> record = data.get(i);
Optional<?> kafkaMessage = Optional.ofNullable(record.value());

Long threadId = Thread.currentThread().getId();
if (kafkaMessage.isPresent()) {
Object message = kafkaMessage.get();
log.info("consumer:group7 --> message:{}, topic:{}, partition:{}, key:{}, offset:{}, threadId:{}", message.toString(), record.topic(), record.partition(), record.key(), record.offset(), threadId);
}
}

return true;
}

}

手动消费模型

  • 模型一: 每个线程维护1个KafkaConsumer, 可以保证消息partition有序, 增加消费速率需要增加partition, 会触发rebalance
  • 模型二: 拉取线程(1个KafkaConsumer)+工作线程(线程池), 难以维护分区内的消息顺序, 可以在不用增加Partition数量的情况下,只需要增加工作线程数量,便可进一步提升Kafka客户端的并行消费能力
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
/**
* 模型二
*/
public void init() {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
props.put(ConsumerConfig.GROUP_ID_CONFIG, group);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "20");
props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, String.valueOf(5 * 1024 * 1024));
consumer = new KafkaConsumer<>(props);
executorService = Executors.newSingleThreadExecutor();
executorService.execute(() -> {
List<String> topics = new ArrayList<>();
String[] arr = topicNames.split(",");
for(String topic : arr) {
topics.add(topic.trim());
}
consumer.subscribe(topics);
Set<TopicPartition> partitions = consumer.assignment();
while (!stoped) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
String json = record.value();
// futureQueue.add(json);
// if (futureQueue.size() >= messageQueueSize) {
// consumer.pause(partitions);
// } else {
// sonsumer.resume(partitions);
// }
}
}
});
}

生产者服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
@Service
public class KafkaProducerService {

@Autowired
private KafkaTemplate kafkaTemplate;

/**
* producer 同步方式发送数据
* @param topic topic名称
* @param key 一般用业务id,相同业务在同一partition保证消费顺序
* @param message producer发送的数据
*/
public void sendMessageSync(String topic, String key, String message) throws InterruptedException, ExecutionException, TimeoutException {
// 默认轮询partition
kafkaTemplate.send(topic, message).get(10, TimeUnit.SECONDS);
// // 根据key进行hash运算,再将运算结果写入到不同partition
// kafkaTemplate.send(topic, key, message).get(10, TimeUnit.SECONDS);
// // 第二个参数为partition,当partition和key同时设置时partition优先。
// kafkaTemplate.send(topic, 0, key, message);
// // 组装消息
// Message msg = MessageBuilder.withPayload("Send Message(payload,headers) Test")
// .setHeader(KafkaHeaders.MESSAGE_KEY, key)
// .setHeader(KafkaHeaders.TOPIC, topic)
// .setHeader(KafkaHeaders.PREFIX,"kafka_")
// .build();
// kafkaTemplate.send(msg).get(10, TimeUnit.SECONDS);
// // 组装消息
// ProducerRecord<String, String> producerRecord = new ProducerRecord<>("test", "Send ProducerRecord(topic,value) Test");
// kafkaTemplate.send(producerRecord).get(10, TimeUnit.SECONDS);
}

/**
* producer 异步方式发送数据
* @param topic topic名称
* @param message producer发送的数据
*/
public void sendMessageAsync(String topic, String message) {
ListenableFuture<SendResult<Integer, String>> future = kafkaTemplate.send(topic, message);

// 设置异步发送消息获取发送结果后执行的动作
ListenableFutureCallback listenableFutureCallback = new ListenableFutureCallback<SendResult<Integer, String>>() {
@Override
public void onSuccess(SendResult<Integer, String> result) {
System.out.println("success");
}

@Override
public void onFailure(Throwable ex) {
System.out.println("failure");
}
};

// 将listenableFutureCallback与异步发送消息对象绑定
future.addCallback(listenableFutureCallback);
}

public void test(String topic, Integer partition, String key, String message) throws InterruptedException, ExecutionException, TimeoutException {
kafkaTemplate.send(topic, partition, key, message).get(10, TimeUnit.SECONDS);
}
}

web测试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
@RestController
public class KafkaProducerController {

@Autowired
private KafkaProducerService producerService;

@GetMapping("/sync")
public void sendMessageSync(@RequestParam String topic) throws InterruptedException, ExecutionException, TimeoutException {
producerService.sendMessageSync(topic, null, "同步发送消息测试");
}

@GetMapping("/async")
public void sendMessageAsync(){
producerService.sendMessageAsync("test","异步发送消息测试");
}

@GetMapping("/test")
public void test(@RequestParam String topic, @RequestParam(required = false) Integer partition, @RequestParam(required = false) String key, @RequestParam String message) throws InterruptedException, ExecutionException, TimeoutException {
producerService.test(topic, partition, key, message);
}

}

消费太慢

考虑增加Topic的分区数,并且同时提升消费组的消费者数量,消费者数=分区数。(两者缺一不可)

消费太快

调整参数:

  • fetch.max.bytes
  • 单次获取数据的最大消息数。

  • max.poll.records <= 吞吐量
  • 单次poll调用返回的最大消息数,如果处理逻辑很轻量,可以适当提高该值。
  • 一次从kafka中poll出来的数据条数,max.poll.records条数据需要在在session.timeout.ms这个时间内处理完
  • 默认值为500

  • consumer.poll(1000) 重要参数
  • 新版本的Consumer的Poll方法使用了类似于Select I/O机制,因此所有相关事件(包括reblance,消息获取等)都发生在一个事件循环之中。
  • 1000是一个超时时间,一旦拿到足够多的数据(参数设置),consumer.poll(1000)会立即返回 ConsumerRecords<String, String> records。
  • 如果没有拿到足够多的数据,会阻塞1000ms,但不会超过1000ms就会返回。

7. AD

如果您觉得写的还不错,请关注公众号 【当我遇上你】, 您的支持是我最大的动力。

参考