Guohao Lu's Blog

个人技术博客

EasyExcel导出并在最后插入汇总行

  1. 实现方案(1)

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    @Test
    public void writeWithLastRow() {
    String fileName = "测试导出.xlsx";

    // 使用ExcelWriter可以实现更灵活的写入
    try (ExcelWriter excelWriter = EasyExcel.write(fileName, DemoData.class).build()) {
    WriteSheet writeSheet = EasyExcel.writerSheet("sheet1").build();

    // 1. 先写入正常的数据列表
    List<DemoData> dataList = getData();
    excelWriter.write(dataList, writeSheet);

    // 2. 创建最后一行的汇总数据
    List<List<String>> lastRowList = new ArrayList<>();
    List<String> lastRow = new ArrayList<>();
    lastRow.add("合计"); // 第一列
    lastRow.add("100"); // 第二列
    lastRow.add("200"); // 第三列
    lastRowList.add(lastRow);

    // 3. 写入最后一行
    excelWriter.write(lastRowList, writeSheet);
    }
    }
  2. 进阶实现:添加样式

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    public class LastRowWriteHandler implements RowWriteHandler {
    private int totalRows;

    public LastRowWriteHandler(int totalRows) {
    this.totalRows = totalRows;
    }

    @Override
    public void afterRowDispose(RowWriteHandlerContext context) {
    // 判断是否是最后一行
    if (context.getRow().getRowNum() == totalRows) {
    Row row = context.getRow();
    CellStyle style = context.getWriteWorkbookHolder().getWorkbook().createCellStyle();
    // 设置背景色为灰色
    style.setFillForegroundColor(IndexedColors.GREY_25_PERCENT.getIndex());
    style.setFillPattern(FillPatternType.SOLID_FOREGROUND);
    // 设置字体加粗
    Font font = context.getWriteWorkbookHolder().getWorkbook().createFont();
    font.setBold(true);
    style.setFont(font);

    // 为最后一行的每个单元格设置样式
    for (Cell cell : row) {
    cell.setCellStyle(style);
    }
    }
    }
    }
  3. 完整示例

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    @Test
    public void writeWithStyledLastRow() {
    String fileName = "测试导出.xlsx";
    List<DemoData> dataList = getData();

    try (ExcelWriter excelWriter = EasyExcel.write(fileName, DemoData.class)
    .registerWriteHandler(new LastRowWriteHandler(dataList.size() + 1)) // +1是因为有表头
    .build()) {
    WriteSheet writeSheet = EasyExcel.writerSheet("sheet1").build();

    // 写入数据
    excelWriter.write(dataList, writeSheet);

    // 写入汇总行
    List<List<String>> lastRowList = new ArrayList<>();
    List<String> lastRow = new ArrayList<>();
    lastRow.add("合计");
    lastRow.add(calculateTotal(dataList)); // 计算汇总值
    lastRowList.add(lastRow);

    excelWriter.write(lastRowList, writeSheet);
    }
    }

    // 计算汇总值的辅助方法
    private String calculateTotal(List<DemoData> dataList) {
    return dataList.stream()
    .map(DemoData::getAmount)
    .reduce(BigDecimal.ZERO, BigDecimal::add)
    .toString();
    }
  4. 注意事项

  • 需要使用ExcelWriter而不是简单的doWrite方法
  • 最后一行数据需要单独构造List<List>格式
  • 可以通过RowWriteHandler自定义最后一行样式
  • 注意关闭ExcelWriter资源
  • 如果有合并单元格需求,可以使用CellWriteHandler

Kafka的ISR和OSR机制

  1. 基本概念

    1
    2
    3
    4
    5
    AR (Assigned Replicas): 所有副本
    ISR (In-Sync Replicas): 同步副本集合
    OSR (Out-of-Sync Replicas): 非同步副本集合

    关系:AR = ISR + OSR
  2. ISR(In-Sync Replicas)

    1
    2
    3
    4
    5
    6
    7
    8
    特点:
    - 与leader保持同步的follower集合
    - 包含leader副本自身
    - 动态调整:follower可能进入或退出ISR

    判断标准:
    1. replica.lag.time.max.ms内有同步请求
    2. 副本落后leader的消息数不超过replica.lag.max.messages
  3. OSR(Out-of-Sync Replicas)

    1
    2
    3
    4
    5
    6
    7
    8
    9
    特点:
    - 与leader副本同步滞后的follower集合
    - 暂时无法参与副本选举
    - 会尝试追赶leader数据

    产生原因:
    1. 网络延迟或阻塞
    2. follower所在broker负载过高
    3. follower崩溃或重启
  4. 动态维护机制

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    // 伪代码展示ISR维护逻辑
    class Partition {
    void checkISRUpdate() {
    // 1. 检查follower延迟
    for (Replica follower : AR) {
    if (inISR(follower)) {
    if (isLagging(follower)) {
    moveToOSR(follower);
    }
    } else {
    if (!isLagging(follower)) {
    moveToISR(follower);
    }
    }
    }

    // 2. 持久化ISR变更
    if (isrChanged) {
    updateZkIsrChange();
    }
    }
    }
  5. 与可用性的关系

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    acks配置影响:

    acks=all (-1):
    - 等待ISR中所有副本确认
    - 最高可靠性
    - 较低吞吐量

    acks=1:
    - 仅等待leader确认
    - 中等可靠性
    - 中等吞吐量

    acks=0:
    - 不等待确认
    - 最低可靠性
    - 最高吞吐量
  6. 监控指标

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    关键监控项:
    1. UnderReplicatedPartitions
    - 表示副本同步滞后的分区数
    - 反映系统健康状况

    2. IsrExpandsPerSec/IsrShrinksPerSec
    - ISR集合扩大/收缩的频率
    - 反映系统稳定性

    3. ReplicaMaxLag
    - follower落后leader的最大消息数
    - 反映副本同步状况

Kafka Rebalance协议详解

  1. 核心角色

    1
    2
    3
    4
    5
    6
    7
    8
    9
    参与者:
    ┌─────────────────┐ ┌─────────────────┐
    │ Group │ │ Consumer │
    │ Coordinator │ │ Group │
    │ (协调者) │ │ (消费组) │
    └─────────────────┘ └─────────────────┘

    - Coordinator:负责管理消费组的组件,运行在Broker上
    - Consumer Group:消费者组的所有成员
  2. Join Group阶段

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    步骤1:所有消费者向Coordinator发送JoinGroup请求
    ┌─────────┐ ┌──────────┐
    │Consumer1│─────┐ │ │
    └─────────┘ │ │ │
    ┌─────────┐ ├──>│Coordinator
    │Consumer2│─────┤ │ │
    └─────────┘ │ │ │
    ┌─────────┐ │ │ │
    │Consumer3│─────┘ │ │
    └─────────┘ └──────────┘

    步骤2:Coordinator选择一个消费者作为Leader
    ┌─────────┐ ┌──────────┐
    │Consumer1│◄────────│ │
    └─────────┘(Leader) │ │
    ┌─────────┐ │Coordinator
    │Consumer2│◄────────│ │
    └─────────┘ │ │
    ┌─────────┐ │ │
    │Consumer3│◄────────│ │
    └─────────┘ └──────────┘
  3. Sync Group阶段

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    步骤1:Leader消费者制定分区分配方案
    ┌─────────┐
    │Consumer1│ 制定分配方案:
    └─────────┘ Consumer1 -> Partition[0,1]
    (Leader) Consumer2 -> Partition[2,3]
    Consumer3 -> Partition[4,5]

    步骤2:Leader将方案发送给Coordinator
    ┌─────────┐ ┌──────────┐
    │Consumer1│────────>│ │
    └─────────┘ │Coordinator
    │ │
    └──────────┘

    步骤3:Coordinator将方案下发给所有消费者
    ┌─────────┐ ┌──────────┐
    │Consumer1│◄────────│ │
    └─────────┘ │ │
    ┌─────────┐ │Coordinator
    │Consumer2│◄────────│ │
    └─────────┘ │ │
    ┌─────────┐ │ │
    │Consumer3│◄────────│ │
    └─────────┘ └──────────┘
  4. Heartbeat阶段

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    所有消费者定期发送心跳
    ┌─────────┐ 心跳 ┌──────────┐
    │Consumer1│─────────>│ │
    └─────────┘ │ │
    ┌─────────┐ │Coordinator
    │Consumer2│─────────>│ │
    └─────────┘ │ │
    ┌─────────┐ │ │
    │Consumer3│─────────>│ │
    └─────────┘ └──────────┘

    心跳超时:
    - session.timeout.ms:心跳超时时间
    - heartbeat.interval.ms:心跳发送间隔
    - max.poll.interval.ms:消息处理超时时间
  5. 完整流程示例

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    // 消费者配置
    Properties props = new Properties();
    props.put("group.id", "my-group");
    props.put("session.timeout.ms", "10000");
    props.put("heartbeat.interval.ms", "3000");

    // Rebalance监听器
    consumer.subscribe(topics, new ConsumerRebalanceListener() {
    // 再均衡开始前
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
    // 提交偏移量
    consumer.commitSync(currentOffsets);
    }

    // 再均衡完成后
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
    // 初始化新分区的消费位置
    for (TopicPartition partition : partitions) {
    consumer.seek(partition, getLastOffset(partition));
    }
    }
    });

Kafka Rebalance机制详解

  1. 触发Rebalance的场景

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    1. 消费组成员变更
    - 新消费者加入消费组
    - 消费者主动离开消费组
    - 消费者崩溃离开消费组

    2. Topic分区数变更
    - 增加分区
    - 管理员手动分区重分配

    3. 订阅Topic数变更
    - 消费者订阅新Topic
    - 正则表达式订阅匹配新Topic
  2. Rebalance协议流程

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    第一阶段:Join Group
    ┌─────────┐ ┌─────────┐
    │Consumer1│ │Group │
    │Consumer2│ --> │Coordinator
    │Consumer3│ │ │
    └─────────┘ └─────────┘
    发送JoinGroup请求

    第二阶段:Sync Group
    ┌─────────┐ ┌─────────┐
    │Leader │ │Group │
    │Consumer │ --> │Coordinator
    │ │ │ │
    └─────────┘ └─────────┘
    制定分配方案

    第三阶段:Heart Beat
    定期发送心跳保持分配方案
  3. 分区分配策略

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    // 1. RangeAssignor (默认)
    public class RangeAssignor {
    // 按照分区序号范围划分
    // 例如:6个分区,2个消费者
    // consumer1: [0,1,2]
    // consumer2: [3,4,5]
    }

    // 2. RoundRobinAssignor
    public class RoundRobinAssignor {
    // 轮询分配
    // consumer1: [0,2,4]
    // consumer2: [1,3,5]
    }

    // 3. StickyAssignor
    public class StickyAssignor {
    // 粘性分配,尽量保持原有分配
    // 减少分区迁移
    }
  4. 性能影响

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    Rebalance过程中:
    1. 消费者停止消费
    2. 释放分区所有权
    3. 重新分配分区
    4. 重新建立连接
    5. 从新位置开始消费

    影响:
    - 消费延迟增加
    - 消费者暂时不可用
    - 可能重复消费
  5. 优化建议

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    // 1. 合理配置超时时间
    props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10000);
    props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 3000);

    // 2. 选择合适的分配策略
    props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
    StickyAssignor.class.getName());

    // 3. 优雅关闭消费者
    Runtime.getRuntime().addShutdownHook(new Thread(() -> {
    consumer.wakeup();
    consumer.close();
    }));
  6. 监控指标

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    关键指标:
    1. Rebalance频率
    2. Rebalance持续时间
    3. 消费延迟变化
    4. 分区分配不均衡度

    告警阈值:
    - Rebalance频率 > 10次/小时
    - Rebalance时间 > 10秒
    - 消费延迟突增 > 1000条

Kafka性能与Swap的关系

  1. Kafka的内存使用模型

    1
    2
    3
    4
    5
    6
    7
    Kafka Broker内存布局:
    ┌────────────────────┐
    │ JVM堆内存 │ <- 存储元数据、缓存等
    ├────────────────────┤
    │ 页面缓存 │ <- 消息数据缓存
    │ (Page Cache) │
    └────────────────────┘
  2. 为什么Kafka依赖页面缓存

  • 设计理念:
    • 利用操作系统的页面缓存而不是JVM堆
    • 避免数据在JVM堆和页面缓存的双重缓存
    • 减少GC压力
    • 提供快速的消息读写
  1. Swap对性能的影响

    1
    2
    3
    4
    5
    6
    7
    8
    当发生Swap时:
    1. 页面缓存被换出到磁盘
    内存页 -> Swap分区 (写磁盘)

    2. 需要数据时再换入内存
    Swap分区 -> 内存页 (读磁盘)

    结果:一次数据访问变成多次磁盘I/O
  2. 性能下降原因

  • 延迟增加

    • 正常:内存访问 (~100ns)
    • Swap:磁盘I/O (~10ms)
    • 性能差距:约10万倍
  • 吞吐量下降

    • 频繁的页面换入换出
    • 产生额外的I/O负载
    • 影响正常的消息读写
  1. 实际影响示例

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    场景:生产者写入10MB消息

    无Swap情况:
    1. 数据写入页面缓存 (内存操作)
    2. 异步刷盘 (后台I/O)
    总耗时:约10ms

    有Swap情况:
    1. 换出页面缓存 (I/O)
    2. 写入新数据 (内存操作)
    3. 可能需要换入其他数据 (I/O)
    4. 异步刷盘 (后台I/O)
    总耗时:可能超过100ms
  2. 最佳实践

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    # 1. 设置低swappiness
    vm.swappiness=1

    # 2. 预留足够物理内存
    # 计算公式:
    需要内存 = JVM堆大小 +
    (分区数 × segment.bytes) × 25% +
    预留系统内存

    # 3. 监控Swap使用
    vmstat 1
    free -m
  3. 监控指标

  • si (swap in):换入数据量
  • so (swap out):换出数据量
  • 当这两个值大于0时,说明系统正在使用swap

长期激励系统的事件驱动架构设计

  1. 领域事件模型设计

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    // 1. 资格变更事件
    @Data
    public class QualificationChangedEvent {
    private String eventId; // 事件ID
    private String employeeId; // 员工ID
    private String planId; // 激励方案ID
    private QualificationStatus status; // 资格状态
    private LocalDateTime changeTime; // 变更时间
    private Map<String, Object> details;// 变更详情
    }

    // 2. 评议完成事件
    @Data
    public class ReviewCompletedEvent {
    private String eventId;
    private String employeeId;
    private String reviewId;
    private ReviewResult result;
    private List<ReviewComment> comments;
    }

    // 3. 合同签署事件
    @Data
    public class ContractSignedEvent {
    private String eventId;
    private String employeeId;
    private String contractId;
    private ContractStatus status;
    private LocalDateTime signTime;
    }
  2. Topic设计与分区策略

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    Topic设计:
    ├── incentive.qualification.changed.v1 # 资格变更事件
    ├── incentive.review.completed.v1 # 评议完成事件
    ├── incentive.contract.signed.v1 # 合同签署事件
    └── incentive.payment.initiated.v1 # 资金发放事件

    分区策略:
    - 按激励方案类型分区
    - 确保同一方案的事件顺序性
    - 支持并行处理不同方案
  3. 事件驱动流程

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    @Service
    public class QualificationService {
    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;

    @Transactional
    public void updateQualification(QualificationDTO dto) {
    // 1. 更新资格状态
    qualificationRepository.update(dto);

    // 2. 发送资格变更事件
    QualificationChangedEvent event = new QualificationChangedEvent();
    // ... 设置事件属性

    // 3. 使用方案ID作为key,确保分区顺序
    kafkaTemplate.send("incentive.qualification.changed.v1",
    dto.getPlanId(),
    event);
    }
    }

    @Service
    public class ReviewService {
    @KafkaListener(
    topics = "incentive.qualification.changed.v1",
    groupId = "review-service-group"
    )
    public void handleQualificationChanged(QualificationChangedEvent event) {
    // 1. 幂等性检查
    if (eventProcessed(event.getEventId())) {
    return;
    }

    // 2. 创建评议任务
    ReviewTask task = createReviewTask(event);

    // 3. 发送评议创建事件
    kafkaTemplate.send("incentive.review.created.v1",
    event.getPlanId(),
    new ReviewCreatedEvent(task));
    }
    }
  4. 事件驱动架构价值

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    1. 业务解耦
    - 服务间通过事件异步通信
    - 降低系统耦合度
    - 便于服务独立扩展

    2. 数据一致性
    - 最终一致性保证
    - 通过事件溯源恢复状态
    - 完整的业务链路追踪

    3. 性能提升
    - 异步处理提高响应速度
    - 削峰填谷
    - 支持水平扩展

    4. 业务灵活性
    - 新增功能只需订阅事件
    - 便于实现业务监控
    - 支持事件回放和重试
  5. 可靠性保证

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    @Configuration
    public class KafkaConfig {
    @Bean
    public ProducerFactory<String, Object> producerFactory() {
    Map<String, Object> config = new HashMap<>();
    // 可靠性配置
    config.put(ProducerConfig.ACKS_CONFIG, "all");
    config.put(ProducerConfig.RETRIES_CONFIG, 3);
    config.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
    return new DefaultKafkaProducerFactory<>(config);
    }
    }

Kafka内存架构与页面缓存关系

  1. Linux内存整体架构

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    物理内存布局:
    ┌────────────────────┐
    │ 用户空间 │ <- 应用程序、JVM堆
    │ (User Space) │ 进程私有内存
    ├────────────────────┤
    │ 内核空间 │ <- 内核代码、数据结构
    │ (Kernel Space) │ 共享内存区域
    │ ├─────────────┐ │
    │ │ 页面缓存 │ │ <- 磁盘文件缓存
    │ │(Page Cache) │ │ 所有进程共享
    │ └─────────────┘ │
    └────────────────────┘

    ┌────────────────────┐
    │ Swap空间 │ <- 虚拟内存,磁盘上的交换分区
    └────────────────────┘
  2. Kafka的内存使用

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    Kafka Broker内存分布:
    ┌────────────────────┐
    │ JVM堆(6-8G) │ <- 用户空间
    │ - 消息元数据 │ broker进程私有
    │ - 消费者元数据 │
    │ - 副本状态等 │
    ├────────────────────┤
    │ 页面缓存 │ <- 内核空间
    │ - 消息数据 │ 所有进程共享
    │ - 索引文件 │ 由操作系统管理
    └────────────────────┘
  3. 页面缓存的作用

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    Kafka使用页面缓存:
    1. 读操作
    磁盘 -> 页面缓存 -> 网络接口(零拷贝)

    2. 写操作
    网络 -> 页面缓存 -> 异步刷盘

    优势:
    - 避免JVM GC压力
    - 利用操作系统的缓存机制
    - 支持零拷贝技术
    - 多进程共享缓存数据
  4. Swap与页面缓存的区别

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    页面缓存:
    - 缓存磁盘文件数据
    - 位于内核空间
    - 所有进程共享
    - 命中时直接返回数据
    - 未命中时从磁盘读取

    Swap空间:
    - 存储内存页面数据
    - 位于磁盘上
    - 进程私有
    - 访问时需要换入内存
    - 会导致性能下降
  5. 内存压力场景

    1
    2
    3
    4
    5
    6
    7
    8
    当内存不足时的处理顺序:
    1. 释放页面缓存
    2. 回收可回收内存
    3. 使用Swap空间

    对Kafka的影响:
    - 页面缓存被释放:性能下降但可接受
    - 发生Swap:严重性能问题
  6. 最佳实践

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    内存配置建议:
    1. 预留足够页面缓存
    - 建议为总内存的50%
    - 例:16G内存预留8G页面缓存

    2. 控制JVM堆大小
    - 建议6-8G
    - 避免过大堆影响页面缓存

    3. 最小化Swap使用
    - 设置vm.swappiness=1
    - Swap大小控制在1-2G

Kafka的MMAP(Memory Mapped Files)实现

  1. 什么是MMAP

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    内存映射文件原理:
    ┌─────────────────┐
    │ 用户进程 │
    │ ┌───────────┐ │
    │ │映射的内存区域│ │ 映射
    └─│───────────│──┘ ←──────────┐
    └─────┬─────┘ │
    │ │
    ┌───────┼───────────────────┼─┐
    │ 页面缓存 │ │
    │ ┌─────────────┐ │ │
    │ │ 文件数据 │ ←───────┘ │
    │ └─────────────┘ │
    └─────────────────────────────┘
    内核空间
  2. Kafka中的使用场景

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    // Kafka源码中的应用
    public class FileRecords extends AbstractRecords {
    // 索引文件映射
    private final MappedByteBuffer mmap;

    public FileRecords(File file, boolean mmap) throws IOException {
    if (mmap) {
    // 创建内存映射
    this.mmap = Utils.mmap(file, true);
    }
    }
    }

    主要用途:
    1. 索引文件访问
    - offset索引
    - timestamp索引
    - leader epoch索引

    2. 小分区的消息日志
    - 默认配置:segment.bytes=1GB
    - 可配置是否使用mmap
  3. 映射过程

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    // 简化的映射过程
    public class MappedByteBuffer {

    // 1. 打开文件通道
    FileChannel channel = FileChannel.open(path);

    // 2. 创建映射
    MappedByteBuffer buffer = channel.map(
    FileChannel.MapMode.READ_WRITE, // 映射模式
    0, // 起始位置
    file.length() // 映射长度
    );

    // 3. 直接操作内存
    buffer.putInt(1); // 写入数据
    int value = buffer.getInt(); // 读取数据
    }
  4. 优缺点分析

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    优点:
    1. 零拷贝读写
    - 避免内核空间到用户空间的拷贝
    - 减少上下文切换

    2. 页面自动管理
    - 由操作系统管理页面调度
    - 支持预读和回写

    缺点:
    1. 内存占用
    - 映射文件会占用虚拟内存
    - 大文件映射需要注意内存限制

    2. 页面错误
    - 首次访问会触发页面错误
    - 可能导致延迟波动
  5. 配置与调优

    1
    2
    3
    4
    5
    6
    7
    8
    # broker配置
    # 是否使用mmap
    log.segment.bytes=1073741824
    file.mmap.enable=true

    # 系统配置
    # 最大映射区域
    vm.max_map_count=262144
  6. 最佳实践

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    使用建议:
    1. 小文件优先使用mmap
    - 索引文件
    - 小分区日志

    2. 大文件使用直接I/O
    - 大分区日志
    - 避免内存压力

    3. 监控指标
    - 页面错误率
    - 内存映射区域数量
    - 虚拟内存使用率

Kafka服务器内存配置最佳实践

  1. 16G内存的最佳分配

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    建议分配:
    ┌────────────────────┐
    │ 系统预留: 2G │ <- 操作系统和其他进程使用
    ├────────────────────┤
    │ Kafka堆内存: 6G │ <- broker的JVM堆内存
    ├────────────────────┤
    │ 页面缓存: 8G │ <- 用于消息数据缓存
    └────────────────────┘

    Swap建议:
    - 默认4G -> 调整为1-2G
    - 设置vm.swappiness=1
  2. 为什么要减少Swap

    1
    2
    3
    4
    5
    6
    7
    原因:
    1. Kafka性能严重依赖于页面缓存
    2. 一旦发生Swap:
    - 页面缓存被换出到磁盘
    - 数据访问延迟从ns级变为ms级
    - 吞吐量显著下降
    - 可能引发消息积压
  3. 性能对比

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    场景:生产者写入1GB数据

    正常配置(小Swap):
    - 写入延迟:~10ms
    - 吞吐量:~100MB/s
    - 页面缓存命中率:95%

    大Swap配置:
    - 写入延迟:可能超过100ms
    - 吞吐量:可能降至10MB/s
    - 频繁的页面换入换出
  4. 配置建议

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    # 1. 调整Swap大小
    sudo swapoff -a
    sudo dd if=/dev/zero of=/swapfile bs=1G count=2
    sudo mkswap /swapfile
    sudo swapon /swapfile

    # 2. 修改swappiness
    echo "vm.swappiness=1" >> /etc/sysctl.conf
    sysctl -p

    # 3. 设置Kafka堆内存
    export KAFKA_HEAP_OPTS="-Xms6g -Xmx6g"
  5. 监控指标

    1
    2
    3
    4
    5
    6
    7
    8
    # 监控Swap使用
    vmstat 1
    free -m

    关键指标:
    - si (swap in) 应接近0
    - so (swap out) 应接近0
    - swap used 应保持稳定
  6. 风险防范

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    1. 内存使用监控
    - 设置内存使用率告警
    - 阈值建议:80%

    2. Swap使用监控
    - 设置Swap使用告警
    - 一旦发生Swap及时处理

    3. 性能指标监控
    - 生产者延迟
    - 消费者延迟
    - 页面缓存命中率

Kafka分区分配规则详解

  1. 分配的基本原则

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    关键规则:
    1. 消费者只能消费其订阅的主题
    2. 一个分区只能分配给同一个消费组中的一个消费者
    3. 分配是以<主题,分区>为单位进行的

    示例:
    Consumer1订阅:topic2
    Consumer2订阅:topic1
    ┌─────────┐ ┌─────────┐
    │Topic1 │ │Consumer1│
    │Partition1│ ╳ │(topic2)│ ╳ 不会分配
    └─────────┘ └─────────┘

    ┌─────────┐ ┌─────────┐
    │Topic1 │ │Consumer2│
    │Partition1│ → │(topic1)│ ✓ 会分配
    └─────────┘ └─────────┘
  2. 订阅模式

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    // 1. 直接订阅指定主题
    consumer.subscribe(Arrays.asList("topic1", "topic2"));

    // 2. 正则表达式订阅
    consumer.subscribe(Pattern.compile("topic.*"));

    // 3. 手动分配分区
    consumer.assign(Arrays.asList(
    new TopicPartition("topic1", 0),
    new TopicPartition("topic1", 1)
    ));
  3. 分配过程示例

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    场景:
    - Topic1: 3个分区
    - Topic2: 2个分区
    - Consumer1: 订阅Topic2
    - Consumer2: 订阅Topic1
    - Consumer3: 订阅Topic1, Topic2

    最终分配:
    Consumer1 <- Topic2-P0, Topic2-P1
    Consumer2 <- Topic1-P0, Topic1-P1, Topic1-P2
    Consumer3 <- Topic2-P0, Topic2-P1, Topic1-P0, Topic1-P1, Topic1-P2

    注意:Consumer3因为订阅了两个主题,
    所以可能同时获得两个主题的分区
  4. 验证代码

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    public class KafkaSubscriptionTest {
    public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("group.id", "test-group");

    // Consumer1只订阅Topic2
    KafkaConsumer<String, String> consumer1 =
    new KafkaConsumer<>(props);
    consumer1.subscribe(Arrays.asList("topic2"));

    // 获取分配的分区
    Set<TopicPartition> assignment = consumer1.assignment();
    for (TopicPartition partition : assignment) {
    // 只会看到Topic2的分区
    System.out.println(partition.topic() +
    "-" + partition.partition());
    }
    }
    }