Guohao Lu's Blog

个人技术博客

Rebalance后消费位置重置的问题

  1. 可能带来的问题

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    场景示例:
    Consumer1原本消费Partition0:
    ┌───────────────────────────┐
    │ Partition 0 │
    │ offset: 1000000 -> 1500000│
    └───────────────────────────┘

    Rebalance后分配给Consumer2:
    1. 需要初始化消费位置
    2. 可能需要建立新的TCP连接
    3. 重新填充消费者缓存
    4. 可能导致重复消费
  2. 性能影响

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    主要开销:
    1. 状态重建
    - 重新加载消费位置
    - 重建内部缓存

    2. 网络开销
    - 建立新的TCP连接
    - 首次拉取数据

    3. 消费延迟
    - 处理暂停
    - 消息积压增加
  3. 优化方案

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    // 1. 使用StickyAssignor策略
    properties.put(
    ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
    "org.apache.kafka.clients.consumer.StickyAssignor"
    );

    // 2. 合理配置缓存大小
    properties.put(
    ConsumerConfig.FETCH_MAX_BYTES_CONFIG,
    "5242880" // 5MB
    );

    // 3. 实现优雅的偏移量管理
    class MyConsumer {
    private Map<TopicPartition, OffsetAndMetadata> currentOffsets
    = new ConcurrentHashMap<>();

    @KafkaListener(topics = "my-topic")
    public void consume(ConsumerRecord<String, String> record) {
    // 处理消息
    processMessage(record);

    // 记录偏移量
    currentOffsets.put(
    new TopicPartition(record.topic(), record.partition()),
    new OffsetAndMetadata(record.offset() + 1)
    );
    }

    // Rebalance监听器
    class SaveOffsetOnRebalance implements ConsumerRebalanceListener {
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
    // Rebalance前提交偏移量
    consumer.commitSync(currentOffsets);
    }
    }
    }
  4. 最佳实践

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    1. 减少Rebalance频率
    - 合理设置session.timeout.ms
    - 避免频繁重启消费者

    2. 优化消费者配置
    - 使用StickyAssignor
    - 合理设置fetch.max.bytes
    - 启用消费者缓存

    3. 监控指标
    - 消费延迟
    - Rebalance频率
    - 处理时间
  5. 新版本优化(Kafka 2.4+)

    1
    2
    3
    4
    5
    6
    7
    8
    9
    增量式Rebalance:
    1. 只对变更的分区进行重分配
    2. 未变更的分区继续消费
    3. 显著减少重平衡影响

    Cooperative Rebalancing:
    1. 两阶段提交过程
    2. 允许消费者继续处理未撤销的分区
    3. 减少消费中断时间

Kafka消息消费失败处理方案

  1. 重试机制设计

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    @Configuration
    public class KafkaConsumerConfig {
    @Bean
    public ConsumerFactory<String, Object> consumerFactory() {
    Map<String, Object> props = new HashMap<>();
    // 禁用自动提交
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
    // 从最早的消息开始消费
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    return new DefaultKafkaConsumerFactory<>(props);
    }
    }

    @Service
    public class ReviewService {
    @KafkaListener(
    topics = "incentive.qualification.changed.v1",
    groupId = "review-service-group"
    )
    @RetryableKafkaHandler(
    attempts = "3",
    backoff = @Backoff(delay = 1000, multiplier = 2)
    )
    public void handleQualificationChanged(
    @Payload QualificationChangedEvent event,
    Acknowledgment ack) {
    try {
    // 1. 业务处理
    processEvent(event);

    // 2. 确认消息
    ack.acknowledge();

    } catch (Exception e) {
    // 3. 重试次数达到上限,进入死信队列
    if (!isRetryable(e)) {
    sendToDLQ(event);
    ack.acknowledge();
    return;
    }
    throw e; // 触发重试
    }
    }
    }
  2. 死信队列处理

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    // 1. 死信队列配置
    @Bean
    public TopicBuilder deadLetterTopic() {
    return TopicBuilder.name("incentive.dlq.v1")
    .partitions(3)
    .replicas(3)
    .build();
    }

    // 2. 死信消息处理服务
    @Service
    public class DeadLetterService {
    @KafkaListener(
    topics = "incentive.dlq.v1",
    groupId = "dlq-processor-group"
    )
    public void processDLQ(ConsumerRecord<String, String> record) {
    // 1. 记录错误日志
    log.error("Processing DLQ message: {}", record);

    // 2. 错误通知
    notifyOperators(record);

    // 3. 持久化到错误表
    saveToErrorTable(record);
    }

    // 4. 手动重试接口
    public void retryMessage(String messageId) {
    Message msg = errorRepository.find(messageId);
    kafkaTemplate.send(msg.getOriginalTopic(),
    msg.getKey(),
    msg.getPayload());
    }
    }
  3. 错误数据持久化

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    @Entity
    @Table(name = "kafka_error_messages")
    public class ErrorMessage {
    @Id
    private String messageId;
    private String topic;
    private String key;
    private String payload;
    private String errorMsg;
    private int retryCount;
    private LocalDateTime createTime;
    private MessageStatus status;
    }

    public enum MessageStatus {
    FAILED, // 失败待处理
    RETRYING, // 重试中
    RESOLVED // 已解决
    }
  4. 监控告警机制

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    @Component
    public class KafkaMonitor {
    // 1. 消费延迟监控
    @Scheduled(fixedRate = 60000)
    public void checkLag() {
    Map<TopicPartition, Long> lags = getLags();
    if (isLagTooHigh(lags)) {
    alertService.send("消费延迟过高");
    }
    }

    // 2. 错误率监控
    @Scheduled(fixedRate = 60000)
    public void checkErrorRate() {
    double errorRate = getErrorRate();
    if (errorRate > threshold) {
    alertService.send("错误率过高");
    }
    }
    }
  5. 补偿机制

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    @Service
    public class CompensationService {
    // 1. 定时补偿
    @Scheduled(cron = "0 0 */1 * * ?")
    public void compensate() {
    List<ErrorMessage> errors =
    errorRepository.findUnresolved();

    for (ErrorMessage error : errors) {
    // 根据业务类型选择补偿策略
    CompensationStrategy strategy =
    getStrategy(error.getTopic());
    strategy.compensate(error);
    }
    }

    // 2. 手动补偿接口
    public void manualCompensate(String messageId) {
    ErrorMessage error =
    errorRepository.find(messageId);
    retryMessage(error);
    }
    }

Kafka消费者组(Consumer Group)详解

  1. 基本概念

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    消费者组定义:
    - 多个消费者实例组成的逻辑分组
    - 共同消费一个或多个主题
    - 每个消费者组有唯一的group.id

    消费模型:
    ┌─────────────┐
    │ Topic-A │
    │ Partition 0 │────┐
    │ Partition 1 │──┐ │
    │ Partition 2 │┐ │ │ Consumer Group X
    └─────────────┘│ │ │ ┌─────────────────┐
    │ │ └──►│ Consumer 1 │
    │ └────►│ Consumer 2 │
    └──────►│ Consumer 3 │
    └─────────────────┘
  2. 关键特性

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    1. 分区所有权
    - 一个分区只能被组内一个消费者消费
    - 一个消费者可以消费多个分区

    2. 负载均衡
    - 组内成员自动分配分区
    - 支持动态扩缩容

    3. 消费位置管理
    - 每个组独立维护消费位置
    - 支持从任意位置开始消费

    4. 故障转移
    - 自动检测消费者故障
    - 自动重新分配分区
  3. 消费者组状态

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    状态流转:
    Empty ──► PreparingRebalance ──► CompletingRebalance ──► Stable
    ▲ │
    └────────────────────────────────────────────────────────┘

    主要状态:
    1. Empty: 组内无成员
    2. PreparingRebalance: 准备开始重平衡
    3. CompletingRebalance: 完成分区分配
    4. Stable: 稳定状态,正常消费
  4. 配置示例

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    // 消费者组配置
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("group.id", "my-group"); // 组ID
    props.put("enable.auto.commit", "true"); // 自动提交
    props.put("auto.commit.interval.ms", "1000"); // 提交间隔
    props.put("session.timeout.ms", "30000"); // 会话超时
    props.put("max.poll.interval.ms", "300000"); // 最大轮询间隔

    // 创建消费者
    KafkaConsumer<String, String> consumer =
    new KafkaConsumer<>(props);

    // 订阅主题
    consumer.subscribe(Arrays.asList("my-topic"));
  5. 消费示例

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    public class GroupConsumerExample {
    public static void main(String[] args) {
    KafkaConsumer<String, String> consumer =
    new KafkaConsumer<>(props);

    try {
    while (true) {
    // 批量拉取消息
    ConsumerRecords<String, String> records =
    consumer.poll(Duration.ofMillis(100));

    // 处理消息
    for (ConsumerRecord<String, String> record : records) {
    processRecord(record);
    }

    // 异步提交位移
    consumer.commitAsync();
    }
    } finally {
    try {
    // 同步提交最后的位移
    consumer.commitSync();
    } finally {
    consumer.close();
    }
    }
    }
    }
  6. 常见使用场景

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    1. 消息队列模式
    - 一个分区一个消费者
    - 组内负载均衡

    2. 发布订阅模式
    - 多个消费者组
    - 每个组都收到全量消息

    示例:
    Topic-A ──► Consumer Group 1 (日志处理)
    └──► Consumer Group 2 (数据分析)
    └──► Consumer Group 3 (监控告警)
  7. 监控指标

    1
    2
    3
    4
    5
    6
    关键指标:
    1. 消费延迟(lag)
    2. 消费速率
    3. 提交失败率
    4. 重平衡频率
    5. 活跃消费者数

消费者组内的分区分配详解

  1. 同组消费者分区分配

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    场景一:单主题多分区
    Topic-A (3个分区)
    ┌─────────────┐
    │ Partition 0 │────► Consumer1
    │ Partition 1 │────► Consumer2
    │ Partition 2 │────► Consumer1
    └─────────────┘

    Consumer Group X
    ├── Consumer1: 消费Partition 0,2
    └── Consumer2: 消费Partition 1

    特点:
    - 每个消费者确实消费不同分区
    - 但这些分区的数据合起来是主题的全量数据
    - 消费者组作为整体可以看到主题的所有数据
  2. 不同订阅的消费者能否在同组

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    场景二:订阅主题不同
    Consumer1订阅:Topic A,B,C
    Consumer2订阅:Topic A,B,C,D

    结论:不能在同一组!
    原因:
    1. 违反了消费者组的订阅原则
    2. 可能导致数据消费混乱
    3. Kafka会抛出异常

    正确做法:
    Consumer Group X
    ├── Consumer1: 订阅 A,B,C,D
    └── Consumer2: 订阅 A,B,C,D

    或者分成不同组:
    Group1: Consumer1 (A,B,C)
    Group2: Consumer2 (A,B,C,D)
  3. 实际案例说明

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    // 错误示例:同组不同订阅
    public class WrongGroupExample {
    public static void main(String[] args) {
    // Consumer1
    KafkaConsumer<String, String> consumer1 = new KafkaConsumer<>(props);
    consumer1.subscribe(Arrays.asList("A", "B", "C"));

    // Consumer2 (同组但订阅不同)
    KafkaConsumer<String, String> consumer2 = new KafkaConsumer<>(props);
    consumer2.subscribe(Arrays.asList("A", "B", "C", "D"));

    // 会导致异常!
    }
    }

    // 正确示例:同组相同订阅
    public class CorrectGroupExample {
    public static void main(String[] args) {
    // 所有消费者订阅相同的主题
    List<String> topics = Arrays.asList("A", "B", "C", "D");

    // Consumer1
    KafkaConsumer<String, String> consumer1 = new KafkaConsumer<>(props);
    consumer1.subscribe(topics);

    // Consumer2
    KafkaConsumer<String, String> consumer2 = new KafkaConsumer<>(props);
    consumer2.subscribe(topics);
    }
    }
  4. 消费者组的数据完整性

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    示例场景:
    Topic-A (4个分区)
    ┌─────────────┐
    │ Partition 0 │──┐
    │ Partition 1 │──┼─► Consumer Group X
    │ Partition 2 │──┤ ├── Consumer1
    │ Partition 3 │──┘ └── Consumer2
    └─────────────┘

    数据流:
    1. 生产者发送消息到各个分区
    2. 分区0,1 分配给Consumer1
    3. 分区2,3 分配给Consumer2
    4. 消费者组作为整体消费了所有数据
  5. 关键原则

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    1. 订阅原则
    - 同组消费者必须订阅相同主题
    - 违反会导致异常

    2. 分区分配
    - 分区是最小分配单位
    - 一个分区只能分配给一个消费者
    - 消费者组能看到所有数据

    3. 扩展性
    - 增加消费者可以提高并行度
    - 消费者数不应超过分区总数

Kafka页面缓存工作原理

  1. 什么是页面缓存(Page Cache)

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    操作系统的内存分层:
    ┌────────────────────┐
    │ 用户空间 │ <- 应用程序(如JVM)使用
    ├────────────────────┤
    │ 页面缓存 │ <- 操作系统管理的磁盘数据缓存
    │ (Page Cache) │
    ├────────────────────┤
    │ 内核空间 │ <- 系统内核使用
    └────────────────────┘

    特点:
    - 由操作系统管理
    - 用于缓存磁盘数据
    - 采用LRU算法
    - 支持预读和回写
  2. 传统JVM缓存的问题

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    数据读取流程(双重缓存):
    磁盘 -> 页面缓存 -> JVM堆 -> 应用程序

    问题:
    1. 数据被复制两次:
    - 一次从磁盘到页面缓存
    - 一次从页面缓存到JVM堆

    2. 内存使用效率低:
    - 相同数据占用两份内存空间
    - JVM堆会触发GC
    - GC会导致停顿
  3. Kafka的页面缓存使用

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    Kafka读取流程:
    磁盘 -> 页面缓存 -> 应用程序(零拷贝)

    实现方式:
    1. sendfile系统调用:
    - 直接从页面缓存传输到网络接口
    - 避免数据复制到JVM堆

    2. mmap内存映射:
    - 将文件映射到内存地址空间
    - 直接操作页面缓存
  4. 性能优势

    1
    2
    3
    4
    5
    6
    7
    8
    // 传统方式
    FileInputStream in = new FileInputStream("message.log");
    byte[] buffer = new byte[8192];
    in.read(buffer); // 数据复制到JVM堆

    // Kafka方式(零拷贝)
    FileChannel channel = new FileInputStream("message.log").getChannel();
    channel.transferTo(position, count, socketChannel); // 直接传输
  5. 具体实现机制

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    // Kafka日志段读取代码
    public class FileRecords extends AbstractRecords {
    // 使用MappedByteBuffer直接操作页面缓存
    private final MappedByteBuffer mmap;

    public FileRecords(File file, boolean mmap) {
    if (mmap) {
    // 内存映射方式
    this.mmap = Utils.mmap(file, true);
    }
    }
    }
  6. 优化效果

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    传统方式 vs Kafka方式:

    内存拷贝次数:
    - 传统:4次 (磁盘->内核->JVM堆->socket缓冲区->网卡)
    - Kafka:2次 (磁盘->页面缓存->网卡)

    上下文切换:
    - 传统:4次
    - Kafka:2次

    性能提升:
    - 吞吐量提高2-3倍
    - 延迟降低40-50%
  7. 最佳实践

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    # 1. 预留足够的页面缓存空间
    # 内存配置建议:
    系统内存 = JVM堆大小(4-8G) + 页面缓存(剩余内存)

    # 2. 避免使用过大的JVM堆
    # 配置示例:
    export KAFKA_HEAP_OPTS="-Xms6g -Xmx6g"

    # 3. 监控页面缓存使用
    free -m
    vmstat 1

Linux内存架构与页面缓存位置

  1. Linux内存空间划分

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    系统内存布局:
    ┌────────────────────┐
    │ 用户空间 │
    │ (User Space) │ <- 应用程序、JVM堆等
    │ │ 0 ~ 3G (32位系统)
    ├────────────────────┤
    │ 内核空间 │ <- 内核代码、内核数据结构
    │ (Kernel Space) │ 页面缓存位于此处
    │ │ 3G ~ 4G (32位系统)
    └────────────────────┘
  2. 页面缓存的位置

  • 位于内核空间
  • 由内核直接管理
  • 对用户空间透明
  • 通过系统调用访问
  1. 为什么在内核空间

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    原因:
    1. 安全性:
    - 防止用户程序直接修改缓存
    - 保护系统关键数据

    2. 性能:
    - 避免用户态和内核态切换
    - 直接与磁盘I/O子系统交互

    3. 统一管理:
    - 集中管理所有进程的文件访问
    - 优化整体系统性能
  2. 访问机制

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    应用程序访问文件数据流程:

    用户空间 内核空间
    ┌──────┐ 系统调用 ┌───────────┐
    │ App │ ==========> │页面缓存 │
    └──────┘ └───────────┘

    ┌───────────┐
    │ 磁盘 │
    └───────────┘
  3. 内存映射(mmap)机制

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    使用mmap后:

    用户空间 内核空间
    ┌──────┐ 映射 ┌───────────┐
    │ App │ <======> │页面缓存 │
    └──────┘ └───────────┘

    特点:
    - 在用户空间直接映射内核的页面缓存
    - 避免了数据复制
    - 仍然由内核管理

Redis+Lua分布式限流实现

  1. 基本架构

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    请求流转图:
    ┌──────────┐ 1.请求 ┌──────────┐
    │ Client 1 │─────────────►│ │
    └──────────┘ │ │
    ┌──────────┐ 2.限流 │ API │
    │ Client 2 │◄────────────►│ Gateway │
    └──────────┘ │ │
    ┌──────────┐ 3.计数 │ │
    │ Client 3 │─────────────►│ │
    └──────────┘ └────┬─────┘

    4.执行Lua脚本


    ┌──────────┐
    │ Redis │
    └──────────┘
  2. Lua限流脚本

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    -- 限流脚本
    -- KEYS[1]: 限流key
    -- ARGV[1]: 时间窗口大小(秒)
    -- ARGV[2]: 限流阈值
    -- ARGV[3]: 当前时间戳
    local key = KEYS[1]
    local window = tonumber(ARGV[1])
    local threshold = tonumber(ARGV[2])
    local now = tonumber(ARGV[3])

    -- 1. 移除时间窗口之前的数据
    redis.call('ZREMRANGEBYSCORE', key, 0, now - window * 1000)

    -- 2. 获取当前窗口的请求数
    local count = redis.call('ZCARD', key)

    -- 3. 判断是否超过阈值
    if count >= threshold then
    return 0
    end

    -- 4. 记录本次请求
    redis.call('ZADD', key, now, now)

    -- 5. 设置过期时间
    redis.call('EXPIRE', key, window)

    return 1
  3. Java实现

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    public class RedisLimiter {
    private final StringRedisTemplate redisTemplate;
    private final String luaScript;

    public RedisLimiter(StringRedisTemplate redisTemplate) {
    this.redisTemplate = redisTemplate;
    // 加载Lua脚本
    this.luaScript = loadLuaScript();
    }

    public boolean isAllowed(String key, int window, int threshold) {
    List<String> keys = Collections.singletonList(key);
    long now = System.currentTimeMillis();

    // 执行Lua脚本
    Long result = redisTemplate.execute(
    new DefaultRedisScript<>(luaScript, Long.class),
    keys,
    String.valueOf(window),
    String.valueOf(threshold),
    String.valueOf(now)
    );

    return result != null && result == 1;
    }
    }
  4. 动态调整实现

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    public class DynamicRateLimiter {
    private static final String LIMIT_CONFIG_KEY = "rate:limit:config";

    // 更新限流阈值
    public void updateThreshold(String key, int threshold) {
    redisTemplate.opsForHash().put(
    LIMIT_CONFIG_KEY,
    key,
    String.valueOf(threshold)
    );
    }

    // 获取当前阈值
    private int getCurrentThreshold(String key) {
    String value = (String) redisTemplate.opsForHash()
    .get(LIMIT_CONFIG_KEY, key);
    return value == null ?
    defaultThreshold : Integer.parseInt(value);
    }

    // 限流检查
    public boolean isAllowed(String key) {
    int threshold = getCurrentThreshold(key);
    return redisLimiter.isAllowed(
    key,
    window,
    threshold
    );
    }
    }
  5. 滑动时间窗口示意

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    时间窗口滑动:
    now-60s now
    │ │
    ▼ ▼
    ┌─────────────────────┐
    │ 时间窗口(60s) │
    └─────────────────────┘
    │ │
    │ ┌──────┐ │
    │ │请求量│ │
    │ └──────┘ │
    │ │
    过期的请求 新请求
  6. 使用示例

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    @RestController
    public class ApiController {
    private final DynamicRateLimiter limiter;

    @GetMapping("/api/test")
    public String test() {
    String key = "api:test";
    if (!limiter.isAllowed(key)) {
    throw new RuntimeException("请求被限流");
    }
    // 业务逻辑
    return "success";
    }

    @PostMapping("/limit/update")
    public void updateLimit(
    @RequestParam String key,
    @RequestParam int threshold
    ) {
    limiter.updateThreshold(key, threshold);
    }
    }

Seata AT模式详解

  1. AT模式原理

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    执行流程:
    ┌─────────────┐ ┌─────────────┐ ┌─────────────┐
    │ 业务SQL │ │ 解析SQL │ │ 生成回滚SQL │
    └──────┬──────┘ └──────┬──────┘ └──────┬──────┘
    │ │ │
    ▼ ▼ ▼
    ┌─────────────┐ ┌─────────────┐ ┌─────────────┐
    │ 记录前镜像 │ ──► │ 执行业务SQL │ ──► │ 记录后镜像 │
    └─────────────┘ └─────────────┘ └─────────────┘

    两阶段提交:
    Phase 1: Try
    - 业务SQL执行
    - 解析SQL
    - 记录前后镜像
    - 生成回滚日志

    Phase 2: Commit/Rollback
    - Commit: 删除回滚日志
    - Rollback: 根据镜像生成反向SQL
  2. 核心组件

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    // 1. 数据源代理
    @Bean
    public DataSource dataSource() {
    DruidDataSource dataSource = new DruidDataSource();
    // 配置数据源
    dataSource.setUrl("jdbc:mysql://localhost:3306/test");

    // 使用Seata代理数据源
    return new DataSourceProxy(dataSource);
    }

    // 2. 全局事务注解
    @GlobalTransactional
    @Transactional
    public void businessMethod() {
    // 订单服务
    orderService.create(order);
    // 库存服务
    stockService.deduct(stock);
    // 账户服务
    accountService.debit(money);
    }
  3. 实际应用场景

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    场景一:电商下单
    ┌─────────────┐
    │ 下单服务 │
    └──────┬──────┘


    ┌──────┴──────┐ ┌─────────────┐ ┌─────────────┐
    │ 订单服务 │ ──► │ 库存服务 │ ──► │ 账户服务 │
    └─────────────┘ └─────────────┘ └─────────────┘
    订单表 库存表 账户表
    - 创建订单 - 扣减库存 - 扣减余额

    场景二:积分兑换
    ┌─────────────┐
    │ 兑换服务 │
    └──────┬──────┘


    ┌──────┴──────┐ ┌─────────────┐
    │ 积分服务 │ ──► │ 商品服务 │
    └─────────────┘ └─────────────┘
    积分表 库存表
    - 扣减积分 - 扣减库存
  4. 实现示例

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    // 订单服务
    @GlobalTransactional
    public void createOrder(OrderDTO orderDTO) {
    // 1. 创建订单
    Order order = new Order();
    order.setUserId(orderDTO.getUserId());
    order.setAmount(orderDTO.getAmount());
    orderMapper.insert(order);

    // 2. 调用库存服务
    Result stockResult = stockService.deduct(
    orderDTO.getProductId(),
    orderDTO.getQuantity()
    );
    if (!stockResult.isSuccess()) {
    throw new BusinessException("库存不足");
    }

    // 3. 调用账户服务
    Result accountResult = accountService.debit(
    orderDTO.getUserId(),
    orderDTO.getAmount()
    );
    if (!accountResult.isSuccess()) {
    throw new BusinessException("余额不足");
    }
    }
  5. 优缺点分析

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    优点:
    1. 对业务无侵入
    2. 无需手动编写回滚逻辑
    3. 性能损耗小
    4. 兼容已有数据库

    缺点:
    1. 依赖数据库事务
    2. 需要代理数据源
    3. 无法处理复杂业务
    4. 表需要主键和索引
  6. 使用建议

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    适用场景:
    1. 简单的数据库操作
    2. 常规的CRUD业务
    3. 对性能要求不是特别高
    4. 数据库支持事务

    不适用场景:
    1. 复杂业务逻辑
    2. 高并发场景
    3. 涉及非事务性资源
    4. 跨数据库类型
  7. 性能优化

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    @Configuration
    public class SeataConfig {

    @Bean
    public GlobalTransactionScanner globalTransactionScanner() {
    return new GlobalTransactionScanner(
    "order-service", // 应用名
    "my_test_tx_group" // 事务分组
    );
    }

    // 配置事务分组
    @Bean
    public ConfigurationCache configurationCache() {
    ConfigurationCache cache = new ConfigurationCache();
    cache.putConfig(
    "service.vgroupMapping.my_test_tx_group",
    "default"
    );
    return cache;
    }
    }

CellWriteHandlerContext上下文信息获取

  1. 获取总行数

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    public class FormulaCellWriteHandler implements CellWriteHandler {
    @Override
    public void afterCellDispose(CellWriteHandlerContext context) {
    // 获取当前sheet
    WriteSheetHolder writeSheetHolder = context.getWriteSheetHolder();
    // 获取当前sheet的总行数
    int totalRows = writeSheetHolder.getSheet().getLastRowNum();

    Cell cell = context.getCell();
    // 判断是否是最后一行的第三列(C列)
    if (cell.getRowIndex() == totalRows && cell.getColumnIndex() == 2) {
    // 设置求和公式,从第2行开始到最后一行
    cell.setCellFormula("SUM(C2:C" + totalRows + ")");
    }
    }
    }
  2. 上下文中可获取的信息

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    // 行相关
    context.getRow(); // 当前行
    context.getRowIndex(); // 当前行号
    context.getRelativeRowIndex(); // 相对行号(不包括表头)

    // 列相关
    context.getCell(); // 当前单元格
    context.getColumnIndex(); // 当前列号
    context.getHead(); // 表头信息

    // Sheet相关
    context.getWriteSheetHolder(); // Sheet相关信息
    context.getWriteTableHolder(); // Table相关信息

    // 工作簿相关
    context.getWriteWorkbookHolder(); // 工作簿相关信息
  3. 实际应用示例

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    public class FormulaCellWriteHandler implements CellWriteHandler {
    @Override
    public void afterCellDispose(CellWriteHandlerContext context) {
    WriteSheetHolder writeSheetHolder = context.getWriteSheetHolder();
    Sheet sheet = writeSheetHolder.getSheet();

    // 当前行号
    int currentRowIndex = context.getRowIndex();
    // 总行数
    int totalRows = sheet.getLastRowNum();
    // 当前列号
    int columnIndex = context.getColumnIndex();

    // 判断是否是最后一行的金额列
    if (currentRowIndex == totalRows && columnIndex == 2) {
    Cell cell = context.getCell();
    // 设置公式
    cell.setCellFormula("SUM(C2:C" + totalRows + ")");

    // 设置样式
    CellStyle style = context.getWriteWorkbookHolder().getWorkbook().createCellStyle();
    style.setFillForegroundColor(IndexedColors.GREY_25_PERCENT.getIndex());
    style.setFillPattern(FillPatternType.SOLID_FOREGROUND);
    cell.setCellStyle(style);
    }
    }
    }
  4. 注意事项

  • getLastRowNum()返回的是最后一行的索引(从0开始)
  • 需要考虑表头行的影响
  • 相对行号不包括表头行
  • 可以通过上下文获取完整的写入信息

EasyExcel写入Excel函数

  1. 基本实现方案(1)

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    @Test
    public void writeWithFormula() {
    String fileName = "测试公式.xlsx";

    try (ExcelWriter excelWriter = EasyExcel.write(fileName, DemoData.class).build()) {
    WriteSheet writeSheet = EasyExcel.writerSheet("模板").build();

    // 1. 写入正常数据
    List<DemoData> dataList = getData();
    excelWriter.write(dataList, writeSheet);

    // 2. 写入公式行
    // 假设金额在第3列,数据从第2行开始(第1行是表头)
    int lastRow = dataList.size() + 1; // 最后一行的位置
    List<List<Object>> formulaRowList = new ArrayList<>();
    List<Object> formulaRow = new ArrayList<>();
    formulaRow.add("合计"); // 第1列
    formulaRow.add(null); // 第2列
    // 第3列写入SUM函数,计算从第2行到最后一行的和
    formulaRow.add(new FormulaData("SUM(C2:C" + lastRow + ")"));
    formulaRowList.add(formulaRow);

    excelWriter.write(formulaRowList, writeSheet);
    }
    }

    // FormulaData类用于包装公式
    @Data
    public class FormulaData {
    private String formula;

    public FormulaData(String formula) {
    this.formula = formula;
    }
    }
  2. 使用CellWriteHandler实现

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    public class FormulaCellWriteHandler implements CellWriteHandler {
    @Override
    public void afterCellDispose(CellWriteHandlerContext context) {
    Cell cell = context.getCell();
    // 判断是否是最后一行的第三列
    if (cell.getRowIndex() == context.getWriteSheetHolder().getSheetNo()
    && cell.getColumnIndex() == 2) {
    // 设置公式
    cell.setCellFormula("SUM(C2:C" + (cell.getRowIndex()) + ")");
    }
    }
    }
  3. 支持的常用Excel函数

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    // 求和函数
    "SUM(C2:C10)"

    // 平均值函数
    "AVERAGE(C2:C10)"

    // 最大值函数
    "MAX(C2:C10)"

    // 最小值函数
    "MIN(C2:C10)"

    // 计数函数
    "COUNT(C2:C10)"

    // IF条件函数
    "IF(C2>100,\"高\",\"低\")"
  4. 注意事项

  • 函数单元格需要使用FormulaData包装
  • 函数引用的单元格范围要准确
  • 注意行列的索引从0开始
  • 可以使用相对引用和绝对引用
  • 复杂函数建议使用CellWriteHandler实现

EasyExcel写入公式完整示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
@Test
public void writeWithFormula() {
String fileName = "测试公式.xlsx";

// 关键点:在这里注册CellWriteHandler
try (ExcelWriter excelWriter = EasyExcel.write(fileName, DemoData.class)
.registerWriteHandler(new FormulaCellWriteHandler()) // 注册处理器
.build()) {

WriteSheet writeSheet = EasyExcel.writerSheet("模板").build();

// 写入数据
List<DemoData> dataList = getData();
excelWriter.write(dataList, writeSheet);
}
}

// 处理器实现
public class FormulaCellWriteHandler implements CellWriteHandler {
@Override
public void afterCellDispose(CellWriteHandlerContext context) {
Cell cell = context.getCell();
// 判断是否是最后一行的第三列
if (cell.getRowIndex() == context.getWriteSheetHolder().getSheetNo()
&& cell.getColumnIndex() == 2) {
// 设置公式
cell.setCellFormula("SUM(C2:C" + (cell.getRowIndex()) + ")");
}
}
}

// 数据对象
@Data
public class DemoData {
private String name;
private String date;
private BigDecimal amount;
}

关键点说明:

  1. 使用.registerWriteHandler()注册处理器
  2. 处理器会在每个单元格写入时被调用
  3. 不需要单独写入最后一行,处理器会自动处理
  4. 相比之前的方案更加简洁和自动化