Kafka的消费者是线程安全的吗?

Kafka的消费者(KafkaConsumer)不是线程安全的,具体表现在:

  1. 官方说明(1)
  • KafkaConsumer不是线程安全的
  • 所有网络I/O操作都发生在进行调用的线程中
  • 可以安全地关闭消费者或者从另一个线程唤醒轮询
  1. 正确使用方式
  • 单线程消费:一个消费者实例对应一个线程
  • 多线程处理:消费单线程,处理多线程
  • 多消费者实例:每个线程一个独立的消费者实例
  1. 错误使用示例

    1
    2
    3
    4
    5
    // 错误示例:多线程共享一个消费者实例
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    ExecutorService executor = Executors.newFixedThreadPool(2);
    executor.submit(() -> consumer.poll(Duration.ofMillis(100))); // 线程1
    executor.submit(() -> consumer.poll(Duration.ofMillis(100))); // 线程2
  2. 正确使用示例

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    // 正确示例1:每个线程一个消费者实例
    ExecutorService executor = Executors.newFixedThreadPool(2);
    executor.submit(() -> {
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    // 处理消息
    }
    });

    // 正确示例2:消费单线程,处理多线程
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    ExecutorService executor = Executors.newFixedThreadPool(2);
    while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
    executor.submit(() -> processRecord(record)); // 异步处理
    }
    }
  3. 最佳实践建议

  • 使用消费者组机制实现并行消费
  • 处理逻辑放入线程池异步执行
  • 注意消费位移的正确提交
  • 合理设置消费者数量和分区数