Guohao Lu's Blog

个人技术博客

Kafka堆内存与GC性能的关系

  1. JVM内存特性(1)
  • JVM对象开销大,通常是原始数据大小的2倍或更多
  • 堆内存越大,GC扫描和处理的对象越多
  • Full GC时需要扫描整个堆空间
  1. Kafka的内存使用策略
  • 不在JVM堆上缓存消息数据
  • 利用操作系统的页面缓存(Page Cache)
  • 主要使用堆内存存储元数据
  1. 大堆内存的问题

    1
    2
    3
    4
    举例:32GB堆内存的GC影响
    - Minor GC:扫描年轻代,可能需要100-200ms
    - Full GC:扫描整个堆,可能需要1-2s
    - GC期间:服务暂停,无法处理请求
  2. 最佳实践

  • 推荐堆内存配置:4-8GB
  • 剩余内存留给页面缓存
  • 配置示例:
    1
    2
    3
    4
    # 32GB物理内存的配置建议
    系统预留:4GB
    Kafka堆内存:6GB
    页面缓存:22GB
  1. 性能对比
    1
    2
    3
    4
    5
    6GB堆内存 vs 24GB堆内存
    - GC频率:较高 vs 较低
    - GC时间:较短(~100ms) vs 较长(~1s)
    - 服务影响:短暂停顿 vs 长时间停顿
    - 页面缓存:更多 vs 更少

获取List最后一个元素的工具类方法

  1. Apache Commons Collections

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    import org.apache.commons.collections4.CollectionUtils;

    List<String> list = Arrays.asList("a", "b", "c");

    // 方法1:使用CollectionUtils.get()
    String last = CollectionUtils.get(list, list.size() - 1);

    // 方法2:使用ListUtils.defaultIfEmpty()
    String last = ListUtils.defaultIfEmpty(list, Collections.emptyList())
    .get(list.size() - 1);
  2. Google Guava

    1
    2
    3
    4
    5
    6
    7
    8
    9
    import com.google.common.collect.Iterables;

    List<String> list = Arrays.asList("a", "b", "c");

    // 方法1:使用Iterables.getLast()
    String last = Iterables.getLast(list);

    // 方法2:带默认值的方式
    String last = Iterables.getLast(list, "default");
  3. Java 8 Stream API

    1
    2
    3
    4
    5
    6
    7
    8
    9
    List<String> list = Arrays.asList("a", "b", "c");

    // 方法1:使用Stream
    Optional<String> last = list.stream().reduce((first, second) -> second);

    // 方法2:使用skip
    Optional<String> last = list.stream()
    .skip(Math.max(0, list.size() - 1))
    .findFirst();
  4. 最佳实践建议

    1
    2
    3
    4
    5
    6
    7
    8
    9
    // 推荐使用Guava的方式,代码最简洁且自带空值处理
    public String getLastElement(List<String> list) {
    return Iterables.getLast(list, null);
    }

    // 如果需要返回Optional
    public Optional<String> getLastElementSafe(List<String> list) {
    return Optional.ofNullable(Iterables.getLast(list, null));
    }
  5. 注意事项

  • 所有方法在list为空时的处理:
    • Apache Commons:抛出IndexOutOfBoundsException
    • Guava:如果没有默认值会抛出NoSuchElementException
    • Stream:返回Optional.empty()
  • 建议先判断list是否为空
  • 考虑是否需要返回Optional

EasyExcel写入Excel函数

  1. 基本实现方案(1)

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    @Test
    public void writeWithFormula() {
    String fileName = "测试公式.xlsx";

    try (ExcelWriter excelWriter = EasyExcel.write(fileName, DemoData.class).build()) {
    WriteSheet writeSheet = EasyExcel.writerSheet("模板").build();

    // 1. 写入正常数据
    List<DemoData> dataList = getData();
    excelWriter.write(dataList, writeSheet);

    // 2. 写入公式行
    // 假设金额在第3列,数据从第2行开始(第1行是表头)
    int lastRow = dataList.size() + 1; // 最后一行的位置
    List<List<Object>> formulaRowList = new ArrayList<>();
    List<Object> formulaRow = new ArrayList<>();
    formulaRow.add("合计"); // 第1列
    formulaRow.add(null); // 第2列
    // 第3列写入SUM函数,计算从第2行到最后一行的和
    formulaRow.add(new FormulaData("SUM(C2:C" + lastRow + ")"));
    formulaRowList.add(formulaRow);

    excelWriter.write(formulaRowList, writeSheet);
    }
    }

    // FormulaData类用于包装公式
    @Data
    public class FormulaData {
    private String formula;

    public FormulaData(String formula) {
    this.formula = formula;
    }
    }
  2. 使用CellWriteHandler实现

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    public class FormulaCellWriteHandler implements CellWriteHandler {
    @Override
    public void afterCellDispose(CellWriteHandlerContext context) {
    Cell cell = context.getCell();
    // 判断是否是最后一行的第三列
    if (cell.getRowIndex() == context.getWriteSheetHolder().getSheetNo()
    && cell.getColumnIndex() == 2) {
    // 设置公式
    cell.setCellFormula("SUM(C2:C" + (cell.getRowIndex()) + ")");
    }
    }
    }
  3. 支持的常用Excel函数

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    // 求和函数
    "SUM(C2:C10)"

    // 平均值函数
    "AVERAGE(C2:C10)"

    // 最大值函数
    "MAX(C2:C10)"

    // 最小值函数
    "MIN(C2:C10)"

    // 计数函数
    "COUNT(C2:C10)"

    // IF条件函数
    "IF(C2>100,\"高\",\"低\")"
  4. 注意事项

  • 函数单元格需要使用FormulaData包装
  • 函数引用的单元格范围要准确
  • 注意行列的索引从0开始
  • 可以使用相对引用和绝对引用
  • 复杂函数建议使用CellWriteHandler实现

EasyExcel写入公式完整示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
@Test
public void writeWithFormula() {
String fileName = "测试公式.xlsx";

// 关键点:在这里注册CellWriteHandler
try (ExcelWriter excelWriter = EasyExcel.write(fileName, DemoData.class)
.registerWriteHandler(new FormulaCellWriteHandler()) // 注册处理器
.build()) {

WriteSheet writeSheet = EasyExcel.writerSheet("模板").build();

// 写入数据
List<DemoData> dataList = getData();
excelWriter.write(dataList, writeSheet);
}
}

// 处理器实现
public class FormulaCellWriteHandler implements CellWriteHandler {
@Override
public void afterCellDispose(CellWriteHandlerContext context) {
Cell cell = context.getCell();
// 判断是否是最后一行的第三列
if (cell.getRowIndex() == context.getWriteSheetHolder().getSheetNo()
&& cell.getColumnIndex() == 2) {
// 设置公式
cell.setCellFormula("SUM(C2:C" + (cell.getRowIndex()) + ")");
}
}
}

// 数据对象
@Data
public class DemoData {
private String name;
private String date;
private BigDecimal amount;
}

关键点说明:

  1. 使用.registerWriteHandler()注册处理器
  2. 处理器会在每个单元格写入时被调用
  3. 不需要单独写入最后一行,处理器会自动处理
  4. 相比之前的方案更加简洁和自动化

CellWriteHandlerContext上下文信息获取

  1. 获取总行数

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    public class FormulaCellWriteHandler implements CellWriteHandler {
    @Override
    public void afterCellDispose(CellWriteHandlerContext context) {
    // 获取当前sheet
    WriteSheetHolder writeSheetHolder = context.getWriteSheetHolder();
    // 获取当前sheet的总行数
    int totalRows = writeSheetHolder.getSheet().getLastRowNum();

    Cell cell = context.getCell();
    // 判断是否是最后一行的第三列(C列)
    if (cell.getRowIndex() == totalRows && cell.getColumnIndex() == 2) {
    // 设置求和公式,从第2行开始到最后一行
    cell.setCellFormula("SUM(C2:C" + totalRows + ")");
    }
    }
    }
  2. 上下文中可获取的信息

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    // 行相关
    context.getRow(); // 当前行
    context.getRowIndex(); // 当前行号
    context.getRelativeRowIndex(); // 相对行号(不包括表头)

    // 列相关
    context.getCell(); // 当前单元格
    context.getColumnIndex(); // 当前列号
    context.getHead(); // 表头信息

    // Sheet相关
    context.getWriteSheetHolder(); // Sheet相关信息
    context.getWriteTableHolder(); // Table相关信息

    // 工作簿相关
    context.getWriteWorkbookHolder(); // 工作簿相关信息
  3. 实际应用示例

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    public class FormulaCellWriteHandler implements CellWriteHandler {
    @Override
    public void afterCellDispose(CellWriteHandlerContext context) {
    WriteSheetHolder writeSheetHolder = context.getWriteSheetHolder();
    Sheet sheet = writeSheetHolder.getSheet();

    // 当前行号
    int currentRowIndex = context.getRowIndex();
    // 总行数
    int totalRows = sheet.getLastRowNum();
    // 当前列号
    int columnIndex = context.getColumnIndex();

    // 判断是否是最后一行的金额列
    if (currentRowIndex == totalRows && columnIndex == 2) {
    Cell cell = context.getCell();
    // 设置公式
    cell.setCellFormula("SUM(C2:C" + totalRows + ")");

    // 设置样式
    CellStyle style = context.getWriteWorkbookHolder().getWorkbook().createCellStyle();
    style.setFillForegroundColor(IndexedColors.GREY_25_PERCENT.getIndex());
    style.setFillPattern(FillPatternType.SOLID_FOREGROUND);
    cell.setCellStyle(style);
    }
    }
    }
  4. 注意事项

  • getLastRowNum()返回的是最后一行的索引(从0开始)
  • 需要考虑表头行的影响
  • 相对行号不包括表头行
  • 可以通过上下文获取完整的写入信息

EasyExcel导出并在最后插入汇总行

  1. 实现方案(1)

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    @Test
    public void writeWithLastRow() {
    String fileName = "测试导出.xlsx";

    // 使用ExcelWriter可以实现更灵活的写入
    try (ExcelWriter excelWriter = EasyExcel.write(fileName, DemoData.class).build()) {
    WriteSheet writeSheet = EasyExcel.writerSheet("sheet1").build();

    // 1. 先写入正常的数据列表
    List<DemoData> dataList = getData();
    excelWriter.write(dataList, writeSheet);

    // 2. 创建最后一行的汇总数据
    List<List<String>> lastRowList = new ArrayList<>();
    List<String> lastRow = new ArrayList<>();
    lastRow.add("合计"); // 第一列
    lastRow.add("100"); // 第二列
    lastRow.add("200"); // 第三列
    lastRowList.add(lastRow);

    // 3. 写入最后一行
    excelWriter.write(lastRowList, writeSheet);
    }
    }
  2. 进阶实现:添加样式

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    public class LastRowWriteHandler implements RowWriteHandler {
    private int totalRows;

    public LastRowWriteHandler(int totalRows) {
    this.totalRows = totalRows;
    }

    @Override
    public void afterRowDispose(RowWriteHandlerContext context) {
    // 判断是否是最后一行
    if (context.getRow().getRowNum() == totalRows) {
    Row row = context.getRow();
    CellStyle style = context.getWriteWorkbookHolder().getWorkbook().createCellStyle();
    // 设置背景色为灰色
    style.setFillForegroundColor(IndexedColors.GREY_25_PERCENT.getIndex());
    style.setFillPattern(FillPatternType.SOLID_FOREGROUND);
    // 设置字体加粗
    Font font = context.getWriteWorkbookHolder().getWorkbook().createFont();
    font.setBold(true);
    style.setFont(font);

    // 为最后一行的每个单元格设置样式
    for (Cell cell : row) {
    cell.setCellStyle(style);
    }
    }
    }
    }
  3. 完整示例

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    @Test
    public void writeWithStyledLastRow() {
    String fileName = "测试导出.xlsx";
    List<DemoData> dataList = getData();

    try (ExcelWriter excelWriter = EasyExcel.write(fileName, DemoData.class)
    .registerWriteHandler(new LastRowWriteHandler(dataList.size() + 1)) // +1是因为有表头
    .build()) {
    WriteSheet writeSheet = EasyExcel.writerSheet("sheet1").build();

    // 写入数据
    excelWriter.write(dataList, writeSheet);

    // 写入汇总行
    List<List<String>> lastRowList = new ArrayList<>();
    List<String> lastRow = new ArrayList<>();
    lastRow.add("合计");
    lastRow.add(calculateTotal(dataList)); // 计算汇总值
    lastRowList.add(lastRow);

    excelWriter.write(lastRowList, writeSheet);
    }
    }

    // 计算汇总值的辅助方法
    private String calculateTotal(List<DemoData> dataList) {
    return dataList.stream()
    .map(DemoData::getAmount)
    .reduce(BigDecimal.ZERO, BigDecimal::add)
    .toString();
    }
  4. 注意事项

  • 需要使用ExcelWriter而不是简单的doWrite方法
  • 最后一行数据需要单独构造List<List>格式
  • 可以通过RowWriteHandler自定义最后一行样式
  • 注意关闭ExcelWriter资源
  • 如果有合并单元格需求,可以使用CellWriteHandler

Kafka Rebalance协议详解

  1. 核心角色

    1
    2
    3
    4
    5
    6
    7
    8
    9
    参与者:
    ┌─────────────────┐ ┌─────────────────┐
    │ Group │ │ Consumer │
    │ Coordinator │ │ Group │
    │ (协调者) │ │ (消费组) │
    └─────────────────┘ └─────────────────┘

    - Coordinator:负责管理消费组的组件,运行在Broker上
    - Consumer Group:消费者组的所有成员
  2. Join Group阶段

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    步骤1:所有消费者向Coordinator发送JoinGroup请求
    ┌─────────┐ ┌──────────┐
    │Consumer1│─────┐ │ │
    └─────────┘ │ │ │
    ┌─────────┐ ├──>│Coordinator
    │Consumer2│─────┤ │ │
    └─────────┘ │ │ │
    ┌─────────┐ │ │ │
    │Consumer3│─────┘ │ │
    └─────────┘ └──────────┘

    步骤2:Coordinator选择一个消费者作为Leader
    ┌─────────┐ ┌──────────┐
    │Consumer1│◄────────│ │
    └─────────┘(Leader) │ │
    ┌─────────┐ │Coordinator
    │Consumer2│◄────────│ │
    └─────────┘ │ │
    ┌─────────┐ │ │
    │Consumer3│◄────────│ │
    └─────────┘ └──────────┘
  3. Sync Group阶段

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    步骤1:Leader消费者制定分区分配方案
    ┌─────────┐
    │Consumer1│ 制定分配方案:
    └─────────┘ Consumer1 -> Partition[0,1]
    (Leader) Consumer2 -> Partition[2,3]
    Consumer3 -> Partition[4,5]

    步骤2:Leader将方案发送给Coordinator
    ┌─────────┐ ┌──────────┐
    │Consumer1│────────>│ │
    └─────────┘ │Coordinator
    │ │
    └──────────┘

    步骤3:Coordinator将方案下发给所有消费者
    ┌─────────┐ ┌──────────┐
    │Consumer1│◄────────│ │
    └─────────┘ │ │
    ┌─────────┐ │Coordinator
    │Consumer2│◄────────│ │
    └─────────┘ │ │
    ┌─────────┐ │ │
    │Consumer3│◄────────│ │
    └─────────┘ └──────────┘
  4. Heartbeat阶段

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    所有消费者定期发送心跳
    ┌─────────┐ 心跳 ┌──────────┐
    │Consumer1│─────────>│ │
    └─────────┘ │ │
    ┌─────────┐ │Coordinator
    │Consumer2│─────────>│ │
    └─────────┘ │ │
    ┌─────────┐ │ │
    │Consumer3│─────────>│ │
    └─────────┘ └──────────┘

    心跳超时:
    - session.timeout.ms:心跳超时时间
    - heartbeat.interval.ms:心跳发送间隔
    - max.poll.interval.ms:消息处理超时时间
  5. 完整流程示例

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    // 消费者配置
    Properties props = new Properties();
    props.put("group.id", "my-group");
    props.put("session.timeout.ms", "10000");
    props.put("heartbeat.interval.ms", "3000");

    // Rebalance监听器
    consumer.subscribe(topics, new ConsumerRebalanceListener() {
    // 再均衡开始前
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
    // 提交偏移量
    consumer.commitSync(currentOffsets);
    }

    // 再均衡完成后
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
    // 初始化新分区的消费位置
    for (TopicPartition partition : partitions) {
    consumer.seek(partition, getLastOffset(partition));
    }
    }
    });

Kafka的ISR和OSR机制

  1. 基本概念

    1
    2
    3
    4
    5
    AR (Assigned Replicas): 所有副本
    ISR (In-Sync Replicas): 同步副本集合
    OSR (Out-of-Sync Replicas): 非同步副本集合

    关系:AR = ISR + OSR
  2. ISR(In-Sync Replicas)

    1
    2
    3
    4
    5
    6
    7
    8
    特点:
    - 与leader保持同步的follower集合
    - 包含leader副本自身
    - 动态调整:follower可能进入或退出ISR

    判断标准:
    1. replica.lag.time.max.ms内有同步请求
    2. 副本落后leader的消息数不超过replica.lag.max.messages
  3. OSR(Out-of-Sync Replicas)

    1
    2
    3
    4
    5
    6
    7
    8
    9
    特点:
    - 与leader副本同步滞后的follower集合
    - 暂时无法参与副本选举
    - 会尝试追赶leader数据

    产生原因:
    1. 网络延迟或阻塞
    2. follower所在broker负载过高
    3. follower崩溃或重启
  4. 动态维护机制

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    // 伪代码展示ISR维护逻辑
    class Partition {
    void checkISRUpdate() {
    // 1. 检查follower延迟
    for (Replica follower : AR) {
    if (inISR(follower)) {
    if (isLagging(follower)) {
    moveToOSR(follower);
    }
    } else {
    if (!isLagging(follower)) {
    moveToISR(follower);
    }
    }
    }

    // 2. 持久化ISR变更
    if (isrChanged) {
    updateZkIsrChange();
    }
    }
    }
  5. 与可用性的关系

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    acks配置影响:

    acks=all (-1):
    - 等待ISR中所有副本确认
    - 最高可靠性
    - 较低吞吐量

    acks=1:
    - 仅等待leader确认
    - 中等可靠性
    - 中等吞吐量

    acks=0:
    - 不等待确认
    - 最低可靠性
    - 最高吞吐量
  6. 监控指标

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    关键监控项:
    1. UnderReplicatedPartitions
    - 表示副本同步滞后的分区数
    - 反映系统健康状况

    2. IsrExpandsPerSec/IsrShrinksPerSec
    - ISR集合扩大/收缩的频率
    - 反映系统稳定性

    3. ReplicaMaxLag
    - follower落后leader的最大消息数
    - 反映副本同步状况

Kafka性能与Swap的关系

  1. Kafka的内存使用模型

    1
    2
    3
    4
    5
    6
    7
    Kafka Broker内存布局:
    ┌────────────────────┐
    │ JVM堆内存 │ <- 存储元数据、缓存等
    ├────────────────────┤
    │ 页面缓存 │ <- 消息数据缓存
    │ (Page Cache) │
    └────────────────────┘
  2. 为什么Kafka依赖页面缓存

  • 设计理念:
    • 利用操作系统的页面缓存而不是JVM堆
    • 避免数据在JVM堆和页面缓存的双重缓存
    • 减少GC压力
    • 提供快速的消息读写
  1. Swap对性能的影响

    1
    2
    3
    4
    5
    6
    7
    8
    当发生Swap时:
    1. 页面缓存被换出到磁盘
    内存页 -> Swap分区 (写磁盘)

    2. 需要数据时再换入内存
    Swap分区 -> 内存页 (读磁盘)

    结果:一次数据访问变成多次磁盘I/O
  2. 性能下降原因

  • 延迟增加

    • 正常:内存访问 (~100ns)
    • Swap:磁盘I/O (~10ms)
    • 性能差距:约10万倍
  • 吞吐量下降

    • 频繁的页面换入换出
    • 产生额外的I/O负载
    • 影响正常的消息读写
  1. 实际影响示例

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    场景:生产者写入10MB消息

    无Swap情况:
    1. 数据写入页面缓存 (内存操作)
    2. 异步刷盘 (后台I/O)
    总耗时:约10ms

    有Swap情况:
    1. 换出页面缓存 (I/O)
    2. 写入新数据 (内存操作)
    3. 可能需要换入其他数据 (I/O)
    4. 异步刷盘 (后台I/O)
    总耗时:可能超过100ms
  2. 最佳实践

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    # 1. 设置低swappiness
    vm.swappiness=1

    # 2. 预留足够物理内存
    # 计算公式:
    需要内存 = JVM堆大小 +
    (分区数 × segment.bytes) × 25% +
    预留系统内存

    # 3. 监控Swap使用
    vmstat 1
    free -m
  3. 监控指标

  • si (swap in):换入数据量
  • so (swap out):换出数据量
  • 当这两个值大于0时,说明系统正在使用swap

Kafka Rebalance机制详解

  1. 触发Rebalance的场景

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    1. 消费组成员变更
    - 新消费者加入消费组
    - 消费者主动离开消费组
    - 消费者崩溃离开消费组

    2. Topic分区数变更
    - 增加分区
    - 管理员手动分区重分配

    3. 订阅Topic数变更
    - 消费者订阅新Topic
    - 正则表达式订阅匹配新Topic
  2. Rebalance协议流程

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    第一阶段:Join Group
    ┌─────────┐ ┌─────────┐
    │Consumer1│ │Group │
    │Consumer2│ --> │Coordinator
    │Consumer3│ │ │
    └─────────┘ └─────────┘
    发送JoinGroup请求

    第二阶段:Sync Group
    ┌─────────┐ ┌─────────┐
    │Leader │ │Group │
    │Consumer │ --> │Coordinator
    │ │ │ │
    └─────────┘ └─────────┘
    制定分配方案

    第三阶段:Heart Beat
    定期发送心跳保持分配方案
  3. 分区分配策略

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    // 1. RangeAssignor (默认)
    public class RangeAssignor {
    // 按照分区序号范围划分
    // 例如:6个分区,2个消费者
    // consumer1: [0,1,2]
    // consumer2: [3,4,5]
    }

    // 2. RoundRobinAssignor
    public class RoundRobinAssignor {
    // 轮询分配
    // consumer1: [0,2,4]
    // consumer2: [1,3,5]
    }

    // 3. StickyAssignor
    public class StickyAssignor {
    // 粘性分配,尽量保持原有分配
    // 减少分区迁移
    }
  4. 性能影响

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    Rebalance过程中:
    1. 消费者停止消费
    2. 释放分区所有权
    3. 重新分配分区
    4. 重新建立连接
    5. 从新位置开始消费

    影响:
    - 消费延迟增加
    - 消费者暂时不可用
    - 可能重复消费
  5. 优化建议

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    // 1. 合理配置超时时间
    props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10000);
    props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 3000);

    // 2. 选择合适的分配策略
    props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
    StickyAssignor.class.getName());

    // 3. 优雅关闭消费者
    Runtime.getRuntime().addShutdownHook(new Thread(() -> {
    consumer.wakeup();
    consumer.close();
    }));
  6. 监控指标

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    关键指标:
    1. Rebalance频率
    2. Rebalance持续时间
    3. 消费延迟变化
    4. 分区分配不均衡度

    告警阈值:
    - Rebalance频率 > 10次/小时
    - Rebalance时间 > 10秒
    - 消费延迟突增 > 1000条

长期激励系统的事件驱动架构设计

  1. 领域事件模型设计

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    // 1. 资格变更事件
    @Data
    public class QualificationChangedEvent {
    private String eventId; // 事件ID
    private String employeeId; // 员工ID
    private String planId; // 激励方案ID
    private QualificationStatus status; // 资格状态
    private LocalDateTime changeTime; // 变更时间
    private Map<String, Object> details;// 变更详情
    }

    // 2. 评议完成事件
    @Data
    public class ReviewCompletedEvent {
    private String eventId;
    private String employeeId;
    private String reviewId;
    private ReviewResult result;
    private List<ReviewComment> comments;
    }

    // 3. 合同签署事件
    @Data
    public class ContractSignedEvent {
    private String eventId;
    private String employeeId;
    private String contractId;
    private ContractStatus status;
    private LocalDateTime signTime;
    }
  2. Topic设计与分区策略

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    Topic设计:
    ├── incentive.qualification.changed.v1 # 资格变更事件
    ├── incentive.review.completed.v1 # 评议完成事件
    ├── incentive.contract.signed.v1 # 合同签署事件
    └── incentive.payment.initiated.v1 # 资金发放事件

    分区策略:
    - 按激励方案类型分区
    - 确保同一方案的事件顺序性
    - 支持并行处理不同方案
  3. 事件驱动流程

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    @Service
    public class QualificationService {
    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;

    @Transactional
    public void updateQualification(QualificationDTO dto) {
    // 1. 更新资格状态
    qualificationRepository.update(dto);

    // 2. 发送资格变更事件
    QualificationChangedEvent event = new QualificationChangedEvent();
    // ... 设置事件属性

    // 3. 使用方案ID作为key,确保分区顺序
    kafkaTemplate.send("incentive.qualification.changed.v1",
    dto.getPlanId(),
    event);
    }
    }

    @Service
    public class ReviewService {
    @KafkaListener(
    topics = "incentive.qualification.changed.v1",
    groupId = "review-service-group"
    )
    public void handleQualificationChanged(QualificationChangedEvent event) {
    // 1. 幂等性检查
    if (eventProcessed(event.getEventId())) {
    return;
    }

    // 2. 创建评议任务
    ReviewTask task = createReviewTask(event);

    // 3. 发送评议创建事件
    kafkaTemplate.send("incentive.review.created.v1",
    event.getPlanId(),
    new ReviewCreatedEvent(task));
    }
    }
  4. 事件驱动架构价值

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    1. 业务解耦
    - 服务间通过事件异步通信
    - 降低系统耦合度
    - 便于服务独立扩展

    2. 数据一致性
    - 最终一致性保证
    - 通过事件溯源恢复状态
    - 完整的业务链路追踪

    3. 性能提升
    - 异步处理提高响应速度
    - 削峰填谷
    - 支持水平扩展

    4. 业务灵活性
    - 新增功能只需订阅事件
    - 便于实现业务监控
    - 支持事件回放和重试
  5. 可靠性保证

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    @Configuration
    public class KafkaConfig {
    @Bean
    public ProducerFactory<String, Object> producerFactory() {
    Map<String, Object> config = new HashMap<>();
    // 可靠性配置
    config.put(ProducerConfig.ACKS_CONFIG, "all");
    config.put(ProducerConfig.RETRIES_CONFIG, 3);
    config.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
    return new DefaultKafkaProducerFactory<>(config);
    }
    }