Kafka的消费者是线程安全的吗?
Kafka的消费者(KafkaConsumer)不是线程安全的,具体表现在:
- 官方说明(1)
- KafkaConsumer不是线程安全的
- 所有网络I/O操作都发生在进行调用的线程中
- 可以安全地关闭消费者或者从另一个线程唤醒轮询
- 正确使用方式
- 单线程消费:一个消费者实例对应一个线程
- 多线程处理:消费单线程,处理多线程
- 多消费者实例:每个线程一个独立的消费者实例
错误使用示例
1
2
3
4
5// 错误示例:多线程共享一个消费者实例
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
ExecutorService executor = Executors.newFixedThreadPool(2);
executor.submit(() -> consumer.poll(Duration.ofMillis(100))); // 线程1
executor.submit(() -> consumer.poll(Duration.ofMillis(100))); // 线程2正确使用示例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19// 正确示例1:每个线程一个消费者实例
ExecutorService executor = Executors.newFixedThreadPool(2);
executor.submit(() -> {
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
// 处理消息
}
});
// 正确示例2:消费单线程,处理多线程
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
ExecutorService executor = Executors.newFixedThreadPool(2);
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
executor.submit(() -> processRecord(record)); // 异步处理
}
}最佳实践建议
- 使用消费者组机制实现并行消费
- 处理逻辑放入线程池异步执行
- 注意消费位移的正确提交
- 合理设置消费者数量和分区数