Kafka消费者组(Consumer Group)详解
基本概念
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16消费者组定义:
- 多个消费者实例组成的逻辑分组
- 共同消费一个或多个主题
- 每个消费者组有唯一的group.id
消费模型:
┌─────────────┐
│ Topic-A │
│ Partition 0 │────┐
│ Partition 1 │──┐ │
│ Partition 2 │┐ │ │ Consumer Group X
└─────────────┘│ │ │ ┌─────────────────┐
│ │ └──►│ Consumer 1 │
│ └────►│ Consumer 2 │
└──────►│ Consumer 3 │
└─────────────────┘关键特性
1
2
3
4
5
6
7
8
9
10
11
12
13
14
151. 分区所有权
- 一个分区只能被组内一个消费者消费
- 一个消费者可以消费多个分区
2. 负载均衡
- 组内成员自动分配分区
- 支持动态扩缩容
3. 消费位置管理
- 每个组独立维护消费位置
- 支持从任意位置开始消费
4. 故障转移
- 自动检测消费者故障
- 自动重新分配分区消费者组状态
1
2
3
4
5
6
7
8
9
10状态流转:
Empty ──► PreparingRebalance ──► CompletingRebalance ──► Stable
▲ │
└────────────────────────────────────────────────────────┘
主要状态:
1. Empty: 组内无成员
2. PreparingRebalance: 准备开始重平衡
3. CompletingRebalance: 完成分区分配
4. Stable: 稳定状态,正常消费配置示例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15// 消费者组配置
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "my-group"); // 组ID
props.put("enable.auto.commit", "true"); // 自动提交
props.put("auto.commit.interval.ms", "1000"); // 提交间隔
props.put("session.timeout.ms", "30000"); // 会话超时
props.put("max.poll.interval.ms", "300000"); // 最大轮询间隔
// 创建消费者
KafkaConsumer<String, String> consumer =
new KafkaConsumer<>(props);
// 订阅主题
consumer.subscribe(Arrays.asList("my-topic"));消费示例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29public class GroupConsumerExample {
public static void main(String[] args) {
KafkaConsumer<String, String> consumer =
new KafkaConsumer<>(props);
try {
while (true) {
// 批量拉取消息
ConsumerRecords<String, String> records =
consumer.poll(Duration.ofMillis(100));
// 处理消息
for (ConsumerRecord<String, String> record : records) {
processRecord(record);
}
// 异步提交位移
consumer.commitAsync();
}
} finally {
try {
// 同步提交最后的位移
consumer.commitSync();
} finally {
consumer.close();
}
}
}
}常见使用场景
1
2
3
4
5
6
7
8
9
10
11
121. 消息队列模式
- 一个分区一个消费者
- 组内负载均衡
2. 发布订阅模式
- 多个消费者组
- 每个组都收到全量消息
示例:
Topic-A ──► Consumer Group 1 (日志处理)
└──► Consumer Group 2 (数据分析)
└──► Consumer Group 3 (监控告警)监控指标
1
2
3
4
5
6关键指标:
1. 消费延迟(lag)
2. 消费速率
3. 提交失败率
4. 重平衡频率
5. 活跃消费者数