Guohao Lu's Blog

个人技术博客

Kafka内存架构与页面缓存关系

  1. Linux内存整体架构

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    物理内存布局:
    ┌────────────────────┐
    │ 用户空间 │ <- 应用程序、JVM堆
    │ (User Space) │ 进程私有内存
    ├────────────────────┤
    │ 内核空间 │ <- 内核代码、数据结构
    │ (Kernel Space) │ 共享内存区域
    │ ├─────────────┐ │
    │ │ 页面缓存 │ │ <- 磁盘文件缓存
    │ │(Page Cache) │ │ 所有进程共享
    │ └─────────────┘ │
    └────────────────────┘

    ┌────────────────────┐
    │ Swap空间 │ <- 虚拟内存,磁盘上的交换分区
    └────────────────────┘
  2. Kafka的内存使用

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    Kafka Broker内存分布:
    ┌────────────────────┐
    │ JVM堆(6-8G) │ <- 用户空间
    │ - 消息元数据 │ broker进程私有
    │ - 消费者元数据 │
    │ - 副本状态等 │
    ├────────────────────┤
    │ 页面缓存 │ <- 内核空间
    │ - 消息数据 │ 所有进程共享
    │ - 索引文件 │ 由操作系统管理
    └────────────────────┘
  3. 页面缓存的作用

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    Kafka使用页面缓存:
    1. 读操作
    磁盘 -> 页面缓存 -> 网络接口(零拷贝)

    2. 写操作
    网络 -> 页面缓存 -> 异步刷盘

    优势:
    - 避免JVM GC压力
    - 利用操作系统的缓存机制
    - 支持零拷贝技术
    - 多进程共享缓存数据
  4. Swap与页面缓存的区别

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    页面缓存:
    - 缓存磁盘文件数据
    - 位于内核空间
    - 所有进程共享
    - 命中时直接返回数据
    - 未命中时从磁盘读取

    Swap空间:
    - 存储内存页面数据
    - 位于磁盘上
    - 进程私有
    - 访问时需要换入内存
    - 会导致性能下降
  5. 内存压力场景

    1
    2
    3
    4
    5
    6
    7
    8
    当内存不足时的处理顺序:
    1. 释放页面缓存
    2. 回收可回收内存
    3. 使用Swap空间

    对Kafka的影响:
    - 页面缓存被释放:性能下降但可接受
    - 发生Swap:严重性能问题
  6. 最佳实践

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    内存配置建议:
    1. 预留足够页面缓存
    - 建议为总内存的50%
    - 例:16G内存预留8G页面缓存

    2. 控制JVM堆大小
    - 建议6-8G
    - 避免过大堆影响页面缓存

    3. 最小化Swap使用
    - 设置vm.swappiness=1
    - Swap大小控制在1-2G

Kafka的MMAP(Memory Mapped Files)实现

  1. 什么是MMAP

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    内存映射文件原理:
    ┌─────────────────┐
    │ 用户进程 │
    │ ┌───────────┐ │
    │ │映射的内存区域│ │ 映射
    └─│───────────│──┘ ←──────────┐
    └─────┬─────┘ │
    │ │
    ┌───────┼───────────────────┼─┐
    │ 页面缓存 │ │
    │ ┌─────────────┐ │ │
    │ │ 文件数据 │ ←───────┘ │
    │ └─────────────┘ │
    └─────────────────────────────┘
    内核空间
  2. Kafka中的使用场景

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    // Kafka源码中的应用
    public class FileRecords extends AbstractRecords {
    // 索引文件映射
    private final MappedByteBuffer mmap;

    public FileRecords(File file, boolean mmap) throws IOException {
    if (mmap) {
    // 创建内存映射
    this.mmap = Utils.mmap(file, true);
    }
    }
    }

    主要用途:
    1. 索引文件访问
    - offset索引
    - timestamp索引
    - leader epoch索引

    2. 小分区的消息日志
    - 默认配置:segment.bytes=1GB
    - 可配置是否使用mmap
  3. 映射过程

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    // 简化的映射过程
    public class MappedByteBuffer {

    // 1. 打开文件通道
    FileChannel channel = FileChannel.open(path);

    // 2. 创建映射
    MappedByteBuffer buffer = channel.map(
    FileChannel.MapMode.READ_WRITE, // 映射模式
    0, // 起始位置
    file.length() // 映射长度
    );

    // 3. 直接操作内存
    buffer.putInt(1); // 写入数据
    int value = buffer.getInt(); // 读取数据
    }
  4. 优缺点分析

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    优点:
    1. 零拷贝读写
    - 避免内核空间到用户空间的拷贝
    - 减少上下文切换

    2. 页面自动管理
    - 由操作系统管理页面调度
    - 支持预读和回写

    缺点:
    1. 内存占用
    - 映射文件会占用虚拟内存
    - 大文件映射需要注意内存限制

    2. 页面错误
    - 首次访问会触发页面错误
    - 可能导致延迟波动
  5. 配置与调优

    1
    2
    3
    4
    5
    6
    7
    8
    # broker配置
    # 是否使用mmap
    log.segment.bytes=1073741824
    file.mmap.enable=true

    # 系统配置
    # 最大映射区域
    vm.max_map_count=262144
  6. 最佳实践

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    使用建议:
    1. 小文件优先使用mmap
    - 索引文件
    - 小分区日志

    2. 大文件使用直接I/O
    - 大分区日志
    - 避免内存压力

    3. 监控指标
    - 页面错误率
    - 内存映射区域数量
    - 虚拟内存使用率

Kafka服务器内存配置最佳实践

  1. 16G内存的最佳分配

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    建议分配:
    ┌────────────────────┐
    │ 系统预留: 2G │ <- 操作系统和其他进程使用
    ├────────────────────┤
    │ Kafka堆内存: 6G │ <- broker的JVM堆内存
    ├────────────────────┤
    │ 页面缓存: 8G │ <- 用于消息数据缓存
    └────────────────────┘

    Swap建议:
    - 默认4G -> 调整为1-2G
    - 设置vm.swappiness=1
  2. 为什么要减少Swap

    1
    2
    3
    4
    5
    6
    7
    原因:
    1. Kafka性能严重依赖于页面缓存
    2. 一旦发生Swap:
    - 页面缓存被换出到磁盘
    - 数据访问延迟从ns级变为ms级
    - 吞吐量显著下降
    - 可能引发消息积压
  3. 性能对比

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    场景:生产者写入1GB数据

    正常配置(小Swap):
    - 写入延迟:~10ms
    - 吞吐量:~100MB/s
    - 页面缓存命中率:95%

    大Swap配置:
    - 写入延迟:可能超过100ms
    - 吞吐量:可能降至10MB/s
    - 频繁的页面换入换出
  4. 配置建议

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    # 1. 调整Swap大小
    sudo swapoff -a
    sudo dd if=/dev/zero of=/swapfile bs=1G count=2
    sudo mkswap /swapfile
    sudo swapon /swapfile

    # 2. 修改swappiness
    echo "vm.swappiness=1" >> /etc/sysctl.conf
    sysctl -p

    # 3. 设置Kafka堆内存
    export KAFKA_HEAP_OPTS="-Xms6g -Xmx6g"
  5. 监控指标

    1
    2
    3
    4
    5
    6
    7
    8
    # 监控Swap使用
    vmstat 1
    free -m

    关键指标:
    - si (swap in) 应接近0
    - so (swap out) 应接近0
    - swap used 应保持稳定
  6. 风险防范

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    1. 内存使用监控
    - 设置内存使用率告警
    - 阈值建议:80%

    2. Swap使用监控
    - 设置Swap使用告警
    - 一旦发生Swap及时处理

    3. 性能指标监控
    - 生产者延迟
    - 消费者延迟
    - 页面缓存命中率

Kafka分区分配规则详解

  1. 分配的基本原则

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    关键规则:
    1. 消费者只能消费其订阅的主题
    2. 一个分区只能分配给同一个消费组中的一个消费者
    3. 分配是以<主题,分区>为单位进行的

    示例:
    Consumer1订阅:topic2
    Consumer2订阅:topic1
    ┌─────────┐ ┌─────────┐
    │Topic1 │ │Consumer1│
    │Partition1│ ╳ │(topic2)│ ╳ 不会分配
    └─────────┘ └─────────┘

    ┌─────────┐ ┌─────────┐
    │Topic1 │ │Consumer2│
    │Partition1│ → │(topic1)│ ✓ 会分配
    └─────────┘ └─────────┘
  2. 订阅模式

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    // 1. 直接订阅指定主题
    consumer.subscribe(Arrays.asList("topic1", "topic2"));

    // 2. 正则表达式订阅
    consumer.subscribe(Pattern.compile("topic.*"));

    // 3. 手动分配分区
    consumer.assign(Arrays.asList(
    new TopicPartition("topic1", 0),
    new TopicPartition("topic1", 1)
    ));
  3. 分配过程示例

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    场景:
    - Topic1: 3个分区
    - Topic2: 2个分区
    - Consumer1: 订阅Topic2
    - Consumer2: 订阅Topic1
    - Consumer3: 订阅Topic1, Topic2

    最终分配:
    Consumer1 <- Topic2-P0, Topic2-P1
    Consumer2 <- Topic1-P0, Topic1-P1, Topic1-P2
    Consumer3 <- Topic2-P0, Topic2-P1, Topic1-P0, Topic1-P1, Topic1-P2

    注意:Consumer3因为订阅了两个主题,
    所以可能同时获得两个主题的分区
  4. 验证代码

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    public class KafkaSubscriptionTest {
    public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("group.id", "test-group");

    // Consumer1只订阅Topic2
    KafkaConsumer<String, String> consumer1 =
    new KafkaConsumer<>(props);
    consumer1.subscribe(Arrays.asList("topic2"));

    // 获取分配的分区
    Set<TopicPartition> assignment = consumer1.assignment();
    for (TopicPartition partition : assignment) {
    // 只会看到Topic2的分区
    System.out.println(partition.topic() +
    "-" + partition.partition());
    }
    }
    }

RangeAssignor 是 Kafka 中的一种默认分区分配策略,它的工作方式比较简单直接。这个策略主要用于在消费者和分区之间进行一种连续的范围分配。

工作原理:

  1. 排序所有分区:首先,RangeAssignor 将所有的分区按照主题和分区号的顺序进行排序。
  2. 排序所有消费者:同样地,所有请求订阅的消费者也会被按照消费者ID进行排序。
  3. 分配分区:分配过程中,RangeAssignor 会将排序后的分区列表分成尽可能均等的N份(N是消费者的数量),每个消费者分得一份。对于不能均分的情况,靠前的消费者将分配到稍微多一点的分区。

分配示例:

假设有两个主题AB,主题A有3个分区(A0, A1, A2),主题B有2个分区(B0, B1),共有5个分区。如果有两个消费者C1C2:

  • 分区会被排序为:A0, A1, A2, B0, B1
  • 消费者会被排序为:C1, C2
  • 分区分配可能如下:
    • C1A0, A1, B0(前三个分区)
    • C2A2, B1(后两个分区)

特点和使用场景:

  • 公平性RangeAssignor 简单地尝试将分区均匀分配给每个消费者,这种方法算法简单,执行效率高。
  • 适用性:当消费者数量较少或每个消费者处理能力相近时,这种分配方法相对公平。
  • 缺点:在某些情况下,可能不是最优的负载均衡策略,特别是在分区数量不均匀分布于多个主题或消费者性能差异大的情况下。

总之,RangeAssignor 提供了一种基础而直观的分区分配方式,适用于简单应用场景,但可能不适合需要复杂负载均衡或性能优化的高级场景。在实际使用中,选择合适的分配策略应根据具体的业务需求和消费者设置来定。

Kafka消息重复消费分析

  1. 重复消费场景分类

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    场景分类:
    1. 生产端重复发送
    2. 消费端重复消费
    3. Rebalance导致重复
    4. 位移提交异常

    影响程度:
    ┌────────────────┬───────────┬────────────┐
    │ 场景类型 │ 影响范围 │ 发生概率 │
    ├────────────────┼───────────┼────────────┤
    │生产端重复 │ 单条消息 │ 低 │
    │消费端重复 │ 批量消息 │ 中 │
    │Rebalance重复 │ 分区数据 │ 高 │
    │位移提交异常 │ 批量消息 │ 中 │
    └────────────────┴───────────┴────────────┘
  2. 生产端重复发送

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    // 问题代码
    public void send(String topic, String message) {
    try {
    producer.send(new ProducerRecord<>(topic, message));
    } catch (Exception e) {
    // 捕获异常后重试,可能导致重复发送
    producer.send(new ProducerRecord<>(topic, message));
    }
    }

    // 解决方案:启用幂等性
    Properties props = new Properties();
    props.put("enable.idempotence", true);
    props.put("transactional.id", "tx-1");
    // 使用事务API
    producer.initTransactions();
    try {
    producer.beginTransaction();
    producer.send(record);
    producer.commitTransaction();
    } catch (Exception e) {
    producer.abortTransaction();
    }
  3. 消费端重复消费

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    // 问题代码
    @KafkaListener(topics = "my-topic")
    public void consume(ConsumerRecord<String, String> record) {
    processMessage(record); // 处理消息
    consumer.commitSync(); // 同步提交可能失败
    }

    // 解决方案:实现幂等消费
    public class IdempotentConsumer {
    private final Set<String> processedMessages =
    Collections.synchronizedSet(new HashSet<>());

    @KafkaListener(topics = "my-topic")
    public void consume(ConsumerRecord<String, String> record) {
    String messageId = generateMessageId(record);
    if (processedMessages.add(messageId)) {
    try {
    processMessage(record);
    // 持久化消息ID
    saveMessageId(messageId);
    } catch (Exception e) {
    processedMessages.remove(messageId);
    throw e;
    }
    }
    }
    }
  4. Rebalance导致重复

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    // 问题:Rebalance时未提交的消息会重复消费
    consumer.subscribe(topics, new ConsumerRebalanceListener() {
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
    // 分区被撤销时可能未及时提交位移
    }
    });

    // 解决方案:优化Rebalance配置和提交策略
    Properties props = new Properties();
    // 增加会话超时时间,减少不必要的Rebalance
    props.put("session.timeout.ms", "30000");
    // 使用手动提交,确保处理完成后再提交
    props.put("enable.auto.commit", "false");

    // 实现再均衡监听器
    class SaveOffsetsOnRebalance implements ConsumerRebalanceListener {
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
    // Rebalance前确保提交位移
    consumer.commitSync(currentOffsets);
    }
    }
  5. 位移提交异常

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    // 问题代码
    while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
    processRecord(record);
    }
    try {
    consumer.commitSync(); // 批量提交可能失败
    } catch (Exception e) {
    // 异常处理不当导致重复消费
    }
    }

    // 解决方案:实现精确的位移管理
    public class OffsetManager {
    private final Map<TopicPartition, OffsetAndMetadata> offsets =
    new ConcurrentHashMap<>();

    public void markOffset(String topic, int partition, long offset) {
    offsets.put(
    new TopicPartition(topic, partition),
    new OffsetAndMetadata(offset + 1)
    );
    }

    public void commitOffsets(KafkaConsumer<?, ?> consumer) {
    try {
    consumer.commitSync(offsets);
    offsets.clear();
    } catch (Exception e) {
    // 记录失败的位移,下次重试
    handleCommitFailure(offsets, e);
    }
    }
    }
  6. 最佳实践

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    1. 消息设计
    - 生成全局唯一消息ID
    - 包含业务去重字段
    - 添加时间戳信息

    2. 消费端优化
    - 实现幂等性检查
    - 使用本地缓存+持久化存储
    - 合理设置批处理大小

    3. 监控告警
    - 监控重复消费率
    - 设置消费延迟阈值
    - 跟踪位移提交状态

Rebalance后消费位置重置的问题

  1. 可能带来的问题

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    场景示例:
    Consumer1原本消费Partition0:
    ┌───────────────────────────┐
    │ Partition 0 │
    │ offset: 1000000 -> 1500000│
    └───────────────────────────┘

    Rebalance后分配给Consumer2:
    1. 需要初始化消费位置
    2. 可能需要建立新的TCP连接
    3. 重新填充消费者缓存
    4. 可能导致重复消费
  2. 性能影响

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    主要开销:
    1. 状态重建
    - 重新加载消费位置
    - 重建内部缓存

    2. 网络开销
    - 建立新的TCP连接
    - 首次拉取数据

    3. 消费延迟
    - 处理暂停
    - 消息积压增加
  3. 优化方案

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    // 1. 使用StickyAssignor策略
    properties.put(
    ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
    "org.apache.kafka.clients.consumer.StickyAssignor"
    );

    // 2. 合理配置缓存大小
    properties.put(
    ConsumerConfig.FETCH_MAX_BYTES_CONFIG,
    "5242880" // 5MB
    );

    // 3. 实现优雅的偏移量管理
    class MyConsumer {
    private Map<TopicPartition, OffsetAndMetadata> currentOffsets
    = new ConcurrentHashMap<>();

    @KafkaListener(topics = "my-topic")
    public void consume(ConsumerRecord<String, String> record) {
    // 处理消息
    processMessage(record);

    // 记录偏移量
    currentOffsets.put(
    new TopicPartition(record.topic(), record.partition()),
    new OffsetAndMetadata(record.offset() + 1)
    );
    }

    // Rebalance监听器
    class SaveOffsetOnRebalance implements ConsumerRebalanceListener {
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
    // Rebalance前提交偏移量
    consumer.commitSync(currentOffsets);
    }
    }
    }
  4. 最佳实践

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    1. 减少Rebalance频率
    - 合理设置session.timeout.ms
    - 避免频繁重启消费者

    2. 优化消费者配置
    - 使用StickyAssignor
    - 合理设置fetch.max.bytes
    - 启用消费者缓存

    3. 监控指标
    - 消费延迟
    - Rebalance频率
    - 处理时间
  5. 新版本优化(Kafka 2.4+)

    1
    2
    3
    4
    5
    6
    7
    8
    9
    增量式Rebalance:
    1. 只对变更的分区进行重分配
    2. 未变更的分区继续消费
    3. 显著减少重平衡影响

    Cooperative Rebalancing:
    1. 两阶段提交过程
    2. 允许消费者继续处理未撤销的分区
    3. 减少消费中断时间

Kafka消息消费失败处理方案

  1. 重试机制设计

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    @Configuration
    public class KafkaConsumerConfig {
    @Bean
    public ConsumerFactory<String, Object> consumerFactory() {
    Map<String, Object> props = new HashMap<>();
    // 禁用自动提交
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
    // 从最早的消息开始消费
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    return new DefaultKafkaConsumerFactory<>(props);
    }
    }

    @Service
    public class ReviewService {
    @KafkaListener(
    topics = "incentive.qualification.changed.v1",
    groupId = "review-service-group"
    )
    @RetryableKafkaHandler(
    attempts = "3",
    backoff = @Backoff(delay = 1000, multiplier = 2)
    )
    public void handleQualificationChanged(
    @Payload QualificationChangedEvent event,
    Acknowledgment ack) {
    try {
    // 1. 业务处理
    processEvent(event);

    // 2. 确认消息
    ack.acknowledge();

    } catch (Exception e) {
    // 3. 重试次数达到上限,进入死信队列
    if (!isRetryable(e)) {
    sendToDLQ(event);
    ack.acknowledge();
    return;
    }
    throw e; // 触发重试
    }
    }
    }
  2. 死信队列处理

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    // 1. 死信队列配置
    @Bean
    public TopicBuilder deadLetterTopic() {
    return TopicBuilder.name("incentive.dlq.v1")
    .partitions(3)
    .replicas(3)
    .build();
    }

    // 2. 死信消息处理服务
    @Service
    public class DeadLetterService {
    @KafkaListener(
    topics = "incentive.dlq.v1",
    groupId = "dlq-processor-group"
    )
    public void processDLQ(ConsumerRecord<String, String> record) {
    // 1. 记录错误日志
    log.error("Processing DLQ message: {}", record);

    // 2. 错误通知
    notifyOperators(record);

    // 3. 持久化到错误表
    saveToErrorTable(record);
    }

    // 4. 手动重试接口
    public void retryMessage(String messageId) {
    Message msg = errorRepository.find(messageId);
    kafkaTemplate.send(msg.getOriginalTopic(),
    msg.getKey(),
    msg.getPayload());
    }
    }
  3. 错误数据持久化

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    @Entity
    @Table(name = "kafka_error_messages")
    public class ErrorMessage {
    @Id
    private String messageId;
    private String topic;
    private String key;
    private String payload;
    private String errorMsg;
    private int retryCount;
    private LocalDateTime createTime;
    private MessageStatus status;
    }

    public enum MessageStatus {
    FAILED, // 失败待处理
    RETRYING, // 重试中
    RESOLVED // 已解决
    }
  4. 监控告警机制

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    @Component
    public class KafkaMonitor {
    // 1. 消费延迟监控
    @Scheduled(fixedRate = 60000)
    public void checkLag() {
    Map<TopicPartition, Long> lags = getLags();
    if (isLagTooHigh(lags)) {
    alertService.send("消费延迟过高");
    }
    }

    // 2. 错误率监控
    @Scheduled(fixedRate = 60000)
    public void checkErrorRate() {
    double errorRate = getErrorRate();
    if (errorRate > threshold) {
    alertService.send("错误率过高");
    }
    }
    }
  5. 补偿机制

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    @Service
    public class CompensationService {
    // 1. 定时补偿
    @Scheduled(cron = "0 0 */1 * * ?")
    public void compensate() {
    List<ErrorMessage> errors =
    errorRepository.findUnresolved();

    for (ErrorMessage error : errors) {
    // 根据业务类型选择补偿策略
    CompensationStrategy strategy =
    getStrategy(error.getTopic());
    strategy.compensate(error);
    }
    }

    // 2. 手动补偿接口
    public void manualCompensate(String messageId) {
    ErrorMessage error =
    errorRepository.find(messageId);
    retryMessage(error);
    }
    }

消费者组内的分区分配详解

  1. 同组消费者分区分配

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    场景一:单主题多分区
    Topic-A (3个分区)
    ┌─────────────┐
    │ Partition 0 │────► Consumer1
    │ Partition 1 │────► Consumer2
    │ Partition 2 │────► Consumer1
    └─────────────┘

    Consumer Group X
    ├── Consumer1: 消费Partition 0,2
    └── Consumer2: 消费Partition 1

    特点:
    - 每个消费者确实消费不同分区
    - 但这些分区的数据合起来是主题的全量数据
    - 消费者组作为整体可以看到主题的所有数据
  2. 不同订阅的消费者能否在同组

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    场景二:订阅主题不同
    Consumer1订阅:Topic A,B,C
    Consumer2订阅:Topic A,B,C,D

    结论:不能在同一组!
    原因:
    1. 违反了消费者组的订阅原则
    2. 可能导致数据消费混乱
    3. Kafka会抛出异常

    正确做法:
    Consumer Group X
    ├── Consumer1: 订阅 A,B,C,D
    └── Consumer2: 订阅 A,B,C,D

    或者分成不同组:
    Group1: Consumer1 (A,B,C)
    Group2: Consumer2 (A,B,C,D)
  3. 实际案例说明

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    // 错误示例:同组不同订阅
    public class WrongGroupExample {
    public static void main(String[] args) {
    // Consumer1
    KafkaConsumer<String, String> consumer1 = new KafkaConsumer<>(props);
    consumer1.subscribe(Arrays.asList("A", "B", "C"));

    // Consumer2 (同组但订阅不同)
    KafkaConsumer<String, String> consumer2 = new KafkaConsumer<>(props);
    consumer2.subscribe(Arrays.asList("A", "B", "C", "D"));

    // 会导致异常!
    }
    }

    // 正确示例:同组相同订阅
    public class CorrectGroupExample {
    public static void main(String[] args) {
    // 所有消费者订阅相同的主题
    List<String> topics = Arrays.asList("A", "B", "C", "D");

    // Consumer1
    KafkaConsumer<String, String> consumer1 = new KafkaConsumer<>(props);
    consumer1.subscribe(topics);

    // Consumer2
    KafkaConsumer<String, String> consumer2 = new KafkaConsumer<>(props);
    consumer2.subscribe(topics);
    }
    }
  4. 消费者组的数据完整性

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    示例场景:
    Topic-A (4个分区)
    ┌─────────────┐
    │ Partition 0 │──┐
    │ Partition 1 │──┼─► Consumer Group X
    │ Partition 2 │──┤ ├── Consumer1
    │ Partition 3 │──┘ └── Consumer2
    └─────────────┘

    数据流:
    1. 生产者发送消息到各个分区
    2. 分区0,1 分配给Consumer1
    3. 分区2,3 分配给Consumer2
    4. 消费者组作为整体消费了所有数据
  5. 关键原则

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    1. 订阅原则
    - 同组消费者必须订阅相同主题
    - 违反会导致异常

    2. 分区分配
    - 分区是最小分配单位
    - 一个分区只能分配给一个消费者
    - 消费者组能看到所有数据

    3. 扩展性
    - 增加消费者可以提高并行度
    - 消费者数不应超过分区总数

Kafka消费者组(Consumer Group)详解

  1. 基本概念

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    消费者组定义:
    - 多个消费者实例组成的逻辑分组
    - 共同消费一个或多个主题
    - 每个消费者组有唯一的group.id

    消费模型:
    ┌─────────────┐
    │ Topic-A │
    │ Partition 0 │────┐
    │ Partition 1 │──┐ │
    │ Partition 2 │┐ │ │ Consumer Group X
    └─────────────┘│ │ │ ┌─────────────────┐
    │ │ └──►│ Consumer 1 │
    │ └────►│ Consumer 2 │
    └──────►│ Consumer 3 │
    └─────────────────┘
  2. 关键特性

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    1. 分区所有权
    - 一个分区只能被组内一个消费者消费
    - 一个消费者可以消费多个分区

    2. 负载均衡
    - 组内成员自动分配分区
    - 支持动态扩缩容

    3. 消费位置管理
    - 每个组独立维护消费位置
    - 支持从任意位置开始消费

    4. 故障转移
    - 自动检测消费者故障
    - 自动重新分配分区
  3. 消费者组状态

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    状态流转:
    Empty ──► PreparingRebalance ──► CompletingRebalance ──► Stable
    ▲ │
    └────────────────────────────────────────────────────────┘

    主要状态:
    1. Empty: 组内无成员
    2. PreparingRebalance: 准备开始重平衡
    3. CompletingRebalance: 完成分区分配
    4. Stable: 稳定状态,正常消费
  4. 配置示例

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    // 消费者组配置
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("group.id", "my-group"); // 组ID
    props.put("enable.auto.commit", "true"); // 自动提交
    props.put("auto.commit.interval.ms", "1000"); // 提交间隔
    props.put("session.timeout.ms", "30000"); // 会话超时
    props.put("max.poll.interval.ms", "300000"); // 最大轮询间隔

    // 创建消费者
    KafkaConsumer<String, String> consumer =
    new KafkaConsumer<>(props);

    // 订阅主题
    consumer.subscribe(Arrays.asList("my-topic"));
  5. 消费示例

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    public class GroupConsumerExample {
    public static void main(String[] args) {
    KafkaConsumer<String, String> consumer =
    new KafkaConsumer<>(props);

    try {
    while (true) {
    // 批量拉取消息
    ConsumerRecords<String, String> records =
    consumer.poll(Duration.ofMillis(100));

    // 处理消息
    for (ConsumerRecord<String, String> record : records) {
    processRecord(record);
    }

    // 异步提交位移
    consumer.commitAsync();
    }
    } finally {
    try {
    // 同步提交最后的位移
    consumer.commitSync();
    } finally {
    consumer.close();
    }
    }
    }
    }
  6. 常见使用场景

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    1. 消息队列模式
    - 一个分区一个消费者
    - 组内负载均衡

    2. 发布订阅模式
    - 多个消费者组
    - 每个组都收到全量消息

    示例:
    Topic-A ──► Consumer Group 1 (日志处理)
    └──► Consumer Group 2 (数据分析)
    └──► Consumer Group 3 (监控告警)
  7. 监控指标

    1
    2
    3
    4
    5
    6
    关键指标:
    1. 消费延迟(lag)
    2. 消费速率
    3. 提交失败率
    4. 重平衡频率
    5. 活跃消费者数