Guohao Lu's Blog

个人技术博客

RangeAssignor 是 Kafka 中的一种默认分区分配策略,它的工作方式比较简单直接。这个策略主要用于在消费者和分区之间进行一种连续的范围分配。

工作原理:

  1. 排序所有分区:首先,RangeAssignor 将所有的分区按照主题和分区号的顺序进行排序。
  2. 排序所有消费者:同样地,所有请求订阅的消费者也会被按照消费者ID进行排序。
  3. 分配分区:分配过程中,RangeAssignor 会将排序后的分区列表分成尽可能均等的N份(N是消费者的数量),每个消费者分得一份。对于不能均分的情况,靠前的消费者将分配到稍微多一点的分区。

分配示例:

假设有两个主题AB,主题A有3个分区(A0, A1, A2),主题B有2个分区(B0, B1),共有5个分区。如果有两个消费者C1C2:

  • 分区会被排序为:A0, A1, A2, B0, B1
  • 消费者会被排序为:C1, C2
  • 分区分配可能如下:
    • C1A0, A1, B0(前三个分区)
    • C2A2, B1(后两个分区)

特点和使用场景:

  • 公平性RangeAssignor 简单地尝试将分区均匀分配给每个消费者,这种方法算法简单,执行效率高。
  • 适用性:当消费者数量较少或每个消费者处理能力相近时,这种分配方法相对公平。
  • 缺点:在某些情况下,可能不是最优的负载均衡策略,特别是在分区数量不均匀分布于多个主题或消费者性能差异大的情况下。

总之,RangeAssignor 提供了一种基础而直观的分区分配方式,适用于简单应用场景,但可能不适合需要复杂负载均衡或性能优化的高级场景。在实际使用中,选择合适的分配策略应根据具体的业务需求和消费者设置来定。

Kafka消息重复消费分析

  1. 重复消费场景分类

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    场景分类:
    1. 生产端重复发送
    2. 消费端重复消费
    3. Rebalance导致重复
    4. 位移提交异常

    影响程度:
    ┌────────────────┬───────────┬────────────┐
    │ 场景类型 │ 影响范围 │ 发生概率 │
    ├────────────────┼───────────┼────────────┤
    │生产端重复 │ 单条消息 │ 低 │
    │消费端重复 │ 批量消息 │ 中 │
    │Rebalance重复 │ 分区数据 │ 高 │
    │位移提交异常 │ 批量消息 │ 中 │
    └────────────────┴───────────┴────────────┘
  2. 生产端重复发送

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    // 问题代码
    public void send(String topic, String message) {
    try {
    producer.send(new ProducerRecord<>(topic, message));
    } catch (Exception e) {
    // 捕获异常后重试,可能导致重复发送
    producer.send(new ProducerRecord<>(topic, message));
    }
    }

    // 解决方案:启用幂等性
    Properties props = new Properties();
    props.put("enable.idempotence", true);
    props.put("transactional.id", "tx-1");
    // 使用事务API
    producer.initTransactions();
    try {
    producer.beginTransaction();
    producer.send(record);
    producer.commitTransaction();
    } catch (Exception e) {
    producer.abortTransaction();
    }
  3. 消费端重复消费

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    // 问题代码
    @KafkaListener(topics = "my-topic")
    public void consume(ConsumerRecord<String, String> record) {
    processMessage(record); // 处理消息
    consumer.commitSync(); // 同步提交可能失败
    }

    // 解决方案:实现幂等消费
    public class IdempotentConsumer {
    private final Set<String> processedMessages =
    Collections.synchronizedSet(new HashSet<>());

    @KafkaListener(topics = "my-topic")
    public void consume(ConsumerRecord<String, String> record) {
    String messageId = generateMessageId(record);
    if (processedMessages.add(messageId)) {
    try {
    processMessage(record);
    // 持久化消息ID
    saveMessageId(messageId);
    } catch (Exception e) {
    processedMessages.remove(messageId);
    throw e;
    }
    }
    }
    }
  4. Rebalance导致重复

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    // 问题:Rebalance时未提交的消息会重复消费
    consumer.subscribe(topics, new ConsumerRebalanceListener() {
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
    // 分区被撤销时可能未及时提交位移
    }
    });

    // 解决方案:优化Rebalance配置和提交策略
    Properties props = new Properties();
    // 增加会话超时时间,减少不必要的Rebalance
    props.put("session.timeout.ms", "30000");
    // 使用手动提交,确保处理完成后再提交
    props.put("enable.auto.commit", "false");

    // 实现再均衡监听器
    class SaveOffsetsOnRebalance implements ConsumerRebalanceListener {
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
    // Rebalance前确保提交位移
    consumer.commitSync(currentOffsets);
    }
    }
  5. 位移提交异常

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    // 问题代码
    while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
    processRecord(record);
    }
    try {
    consumer.commitSync(); // 批量提交可能失败
    } catch (Exception e) {
    // 异常处理不当导致重复消费
    }
    }

    // 解决方案:实现精确的位移管理
    public class OffsetManager {
    private final Map<TopicPartition, OffsetAndMetadata> offsets =
    new ConcurrentHashMap<>();

    public void markOffset(String topic, int partition, long offset) {
    offsets.put(
    new TopicPartition(topic, partition),
    new OffsetAndMetadata(offset + 1)
    );
    }

    public void commitOffsets(KafkaConsumer<?, ?> consumer) {
    try {
    consumer.commitSync(offsets);
    offsets.clear();
    } catch (Exception e) {
    // 记录失败的位移,下次重试
    handleCommitFailure(offsets, e);
    }
    }
    }
  6. 最佳实践

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    1. 消息设计
    - 生成全局唯一消息ID
    - 包含业务去重字段
    - 添加时间戳信息

    2. 消费端优化
    - 实现幂等性检查
    - 使用本地缓存+持久化存储
    - 合理设置批处理大小

    3. 监控告警
    - 监控重复消费率
    - 设置消费延迟阈值
    - 跟踪位移提交状态

Kafka堆内存与GC性能的关系

  1. JVM内存特性(1)
  • JVM对象开销大,通常是原始数据大小的2倍或更多
  • 堆内存越大,GC扫描和处理的对象越多
  • Full GC时需要扫描整个堆空间
  1. Kafka的内存使用策略
  • 不在JVM堆上缓存消息数据
  • 利用操作系统的页面缓存(Page Cache)
  • 主要使用堆内存存储元数据
  1. 大堆内存的问题

    1
    2
    3
    4
    举例:32GB堆内存的GC影响
    - Minor GC:扫描年轻代,可能需要100-200ms
    - Full GC:扫描整个堆,可能需要1-2s
    - GC期间:服务暂停,无法处理请求
  2. 最佳实践

  • 推荐堆内存配置:4-8GB
  • 剩余内存留给页面缓存
  • 配置示例:
    1
    2
    3
    4
    # 32GB物理内存的配置建议
    系统预留:4GB
    Kafka堆内存:6GB
    页面缓存:22GB
  1. 性能对比
    1
    2
    3
    4
    5
    6GB堆内存 vs 24GB堆内存
    - GC频率:较高 vs 较低
    - GC时间:较短(~100ms) vs 较长(~1s)
    - 服务影响:短暂停顿 vs 长时间停顿
    - 页面缓存:更多 vs 更少

获取List最后一个元素的工具类方法

  1. Apache Commons Collections

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    import org.apache.commons.collections4.CollectionUtils;

    List<String> list = Arrays.asList("a", "b", "c");

    // 方法1:使用CollectionUtils.get()
    String last = CollectionUtils.get(list, list.size() - 1);

    // 方法2:使用ListUtils.defaultIfEmpty()
    String last = ListUtils.defaultIfEmpty(list, Collections.emptyList())
    .get(list.size() - 1);
  2. Google Guava

    1
    2
    3
    4
    5
    6
    7
    8
    9
    import com.google.common.collect.Iterables;

    List<String> list = Arrays.asList("a", "b", "c");

    // 方法1:使用Iterables.getLast()
    String last = Iterables.getLast(list);

    // 方法2:带默认值的方式
    String last = Iterables.getLast(list, "default");
  3. Java 8 Stream API

    1
    2
    3
    4
    5
    6
    7
    8
    9
    List<String> list = Arrays.asList("a", "b", "c");

    // 方法1:使用Stream
    Optional<String> last = list.stream().reduce((first, second) -> second);

    // 方法2:使用skip
    Optional<String> last = list.stream()
    .skip(Math.max(0, list.size() - 1))
    .findFirst();
  4. 最佳实践建议

    1
    2
    3
    4
    5
    6
    7
    8
    9
    // 推荐使用Guava的方式,代码最简洁且自带空值处理
    public String getLastElement(List<String> list) {
    return Iterables.getLast(list, null);
    }

    // 如果需要返回Optional
    public Optional<String> getLastElementSafe(List<String> list) {
    return Optional.ofNullable(Iterables.getLast(list, null));
    }
  5. 注意事项

  • 所有方法在list为空时的处理:
    • Apache Commons:抛出IndexOutOfBoundsException
    • Guava:如果没有默认值会抛出NoSuchElementException
    • Stream:返回Optional.empty()
  • 建议先判断list是否为空
  • 考虑是否需要返回Optional

解决思路

  1. 暴力解法
  2. 滑动窗口
  3. 动态规划

特点

  1. 最大子序和:连续、子串和最大

    1
    2
    dp[i]:以nums[i]为结尾的子串的最大和
    dp[i] = max(dp[i-1] + nums[i], nums[i])
  2. 最长回文子串:连续、子串为回文

    1
    2
    dp[i][j]:s[i]到s[j]是否为回文
    dp[i][j] = dp[i+1][j-1] && s[i] == s[j]
    1
    2
    3
    4
    5
    6
    for (int k = 0; k < s.length(); k++) {
    for (int i = 0, j = k; i < s.length() && j < s.length(); i++, j++) {
    boolean isSym = s.charAt(i) == s.charAt(j);
    dp[i][j] = isSym && ((j <= (i + 1)) || (dp[i + 1][j - 1] == true));
    }
    }

问题描述

给定一个整数数组 nums,找出具有最大和的连续子数组(至少包含一个元素),返回其最大和。

分析思路

1. 问题可视化

1
2
3
4
5
6
7
8
9
10
11
12
示例数组: [-2, 1, -3, 4, -1, 2, 1, -5, 4]

可视化表示:
┌─┐
│4│
└─┘ ┌─┐
┌─┐ │ ┌─┐│2│┌─┐
│1│ │ │ ││ ││1│
└─┘ │ └─┘└─┘└─┘ ┌─┐
│ │ │ │ │4│
└─────┴─────┴─────┴──────└─┘
-2 1 -3 4 -1 2 1 -5 4

2. 思路分析

  1. 局部最优解

    1
    2
    3
    4
    5
    6
    7
    8
    位置i的最大子数组和有两种可能:
    ┌────────────────┐
    │ 1. 加入前面的和 │
    └────────────────┘

    ┌────────────┐
    │ 2. 从自己开始 │
    └────────────┘
  2. 状态转移

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    dp[i] = max(nums[i], dp[i-1] + nums[i])

    示意图:
    dp[i-1] < 0 → ┌─────┐
    │重新开始│
    └─────┘

    dp[i-1] > 0 → ┌─────┐
    │继续累加│
    └─────┘

3. 解题思路图解

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
数组: [-2, 1, -3, 4, -1, 2, 1, -5, 4]

步骤分析:
1) -2: [-2]
└── 当前最大和: -2

2) 1: [-2, 1]
└── 重新开始: 1

3) -3: [1, -3]
└── 继续累加: -2,重新开始

4) 4: [4]
└── 重新开始: 4

5) -1: [4, -1]
└── 继续累加: 3

6) 2: [4, -1, 2]
└── 继续累加: 5

7) 1: [4, -1, 2, 1]
└── 继续累加: 6

8) -5: [4, -1, 2, 1, -5]
└── 继续累加: 1

9) 4: [4, -1, 2, 1, -5, 4]
└── 继续累加: 5

4. 关键观察

  1. 连续性质

    1
    2
    [a] → [a,b] → [a,b,c]
    连续扩展
  2. 决策点

    1
    2
    3
    4
    5
    6
    7
    当前位置i的决策:
    ┌─────────────┐
    │ 加入前面的和 │ ← dp[i-1] > 0
    └─────────────┘
    ┌─────────┐
    │ 重新开始 │ ← dp[i-1] ≤ 0
    └─────────┘

5. 优化思路

  1. 空间优化

    1
    2
    3
    4
    只需要记录:
    ┌──────────┐ ┌──────────┐
    │当前最大和│ 和 │全局最大和│
    └──────────┘ └──────────┘
  2. 时间优化

    1
    2
    3
    一次遍历 → O(n)
    无需回溯
    无需额外空间

总结

  1. 问题的核心是理解:在每个位置,我们要决定是加入之前的和,还是重新开始。

  2. 使用动态规划思想,但可以优化为O(1)空间复杂度。

  3. 关键是理解局部最优和全局最优的关系。

架构概览

MPP (Massive Parallel Processing)

1
2
3
4
5
6
7
8
9
10
11
12
13
┌──────────────────────────────────────────────┐
│ MPP 架构 │
│ │
│ ┌─────────┐ ┌─────────┐ ┌─────────┐ │
│ │ Node 1 │ │ Node 2 │ │ Node 3 │ │
│ │ CPU │ │ CPU │ │ CPU │ │
│ │ Memory │◄─►│ Memory │◄─►│ Memory │ │
│ │ Storage │ │ Storage │ │ Storage │ │
│ └─────────┘ └─────────┘ └─────────┘ │
│ ▲ ▲ ▲ │
│ └────────────┴────────────┘ │
│ 高速互联网络 │
└──────────────────────────────────────────────┘

MapReduce

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
┌──────────────────────────────────────────────┐
│ MapReduce 架构 │
│ │
│ ┌─────────┐ ┌─────────┐ ┌─────────┐ │
│ │ Map 1 │ │ Map 2 │ │ Map 3 │ │
│ └────┬────┘ └────┬────┘ └────┬────┘ │
│ │ │ │ │
│ ┌────▼────┐ ┌────▼────┐ ┌────▼────┐ │
│ │Shuffle 1│ │Shuffle 2│ │Shuffle 3│ │
│ └────┬────┘ └────┬────┘ └────┬────┘ │
│ │ │ │ │
│ ┌────▼────┐ ┌────▼────┐ ┌────▼────┐ │
│ │Reduce 1 │ │Reduce 2 │ │Reduce 3 │ │
│ └─────────┘ └─────────┘ └─────────┘ │
└──────────────────────────────────────────────┘

核心区别

1. 数据处理模式

MPP

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// MPP数据处理示例
public class MPPProcessor {
public void processQuery(String sql) {
// 1. 并行分发查询
List<Node> nodes = getAvailableNodes();
CompletableFuture<?>[] futures = nodes.stream()
.map(node -> CompletableFuture.runAsync(() -> {
// 每个节点并行执行相同的查询
node.executeQuery(sql);
}))
.toArray(CompletableFuture[]::new);

// 2. 等待所有节点执行完成
CompletableFuture.allOf(futures).join();

// 3. 合并结果
mergeResults(nodes);
}
}

MapReduce

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// MapReduce处理示例
public class MapReduceProcessor {
public void processData(List<String> input) {
// 1. Map阶段
Map<String, List<String>> mappedData = input.stream()
.parallel()
.map(this::mapFunction)
.collect(Collectors.groupingBy(MapResult::getKey));

// 2. Shuffle阶段
shuffleData(mappedData);

// 3. Reduce阶段
List<String> result = mappedData.entrySet().stream()
.parallel()
.map(entry -> reduceFunction(entry.getKey(), entry.getValue()))
.collect(Collectors.toList());
}
}

2. 数据交互方式

1
2
3
4
5
6
7
8
9
MPP数据交互:
┌────────┐ ┌────────┐
│Node 1 │◄───►│Node 2 │ 实时数据交换
└────────┘ └────────┘

MapReduce数据交互:
┌────────┐ ┌────────┐ ┌────────┐
│Map │────►│Shuffle │────►│Reduce │ 阶段性数据传输
└────────┘ └────────┘ └────────┘

3. 资源管理

MPP

1
2
3
4
5
6
7
8
9
10
11
# MPP资源配置示例
cluster:
nodes:
- id: node1
cpu: 16
memory: 64GB
storage: 2TB
network: 10Gbps
interconnect:
type: InfiniBand
bandwidth: 100Gbps

MapReduce

1
2
3
4
5
6
7
8
9
10
11
12
13
14
# MapReduce资源配置示例
job:
mappers: 100
reducers: 20
resources:
map:
cpu: 2
memory: 4GB
reduce:
cpu: 4
memory: 8GB
intermediate:
compression: true
storage: HDFS

性能对比

1. 延迟比较

1
2
3
4
5
6
7
响应时间对比:
MPP: ├────────┤ (毫秒级)
MapReduce: ├────────────────────┤ (分钟级)

适用场景:
MPP: 实时分析、交互式查询
MapReduce: 批量处理、大规模ETL

2. 扩展性对比

1
2
3
4
5
6
7
8
9
10
11
节点扩展效果:

MPP扩展:
性能 ─────────►
节点数 ─────────►
(近似线性扩展,但有上限)

MapReduce扩展:
性能 ─────────────►
节点数 ─────────────►
(可以持续线性扩展)

应用场景

MPP最适合:

  1. OLAP分析场景
  2. 实时数据仓库
  3. 交互式查询
  4. 复杂SQL处理

MapReduce最适合:

  1. 大规模数据批处理
  2. ETL作业
  3. 日志分析
  4. 数据清洗

选型建议

1
2
3
4
5
6
7
8
9
10
11
12
13
public class ArchitectureSelector {
public String selectArchitecture(Requirements req) {
if (req.needsRealTimeProcessing() &&
req.dataSize() < 100_000_000 &&
req.requiresComplexSQL()) {
return "MPP";
} else if (req.isBatchProcessing() &&
req.dataSize() > 1_000_000_000) {
return "MapReduce";
}
return "Need further analysis";
}
}

总结对比表

特性 MPP MapReduce
处理模式 并行处理 分阶段处理
数据交互 实时 阶段性
延迟 毫秒级 分钟级
扩展性 有限制 近乎无限
适用场景 实时分析 批处理
数据规模 GB~TB TB~PB
计算复杂度 中等
资源消耗 较大 可控

热key识别方案

1. 客户端采样统计

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
@Slf4j
public class HotKeyDetector {
private LoadingCache<String, LongAdder> keyCounterCache = CacheBuilder.newBuilder()
.expireAfterWrite(1, TimeUnit.MINUTES)
.build(new CacheLoader<String, LongAdder>() {
@Override
public LongAdder load(String key) {
return new LongAdder();
}
});

// 采样率1%
private static final double SAMPLE_RATE = 0.01;

public void recordKeyAccess(String key) {
// 采样统计
if (ThreadLocalRandom.current().nextDouble() < SAMPLE_RATE) {
keyCounterCache.getUnchecked(key).increment();
}
}

// 定时任务,每分钟统计热key
@Scheduled(fixedRate = 60000)
public void detectHotKeys() {
Map<String, LongAdder> counters = keyCounterCache.asMap();
// 按访问量排序,取Top N
List<Map.Entry<String, LongAdder>> hotKeys = counters.entrySet().stream()
.sorted((e1, e2) -> Long.compare(e2.getValue().sum(), e1.getValue().sum()))
.limit(100)
.collect(Collectors.toList());

// 推送到本地缓存
updateLocalCache(hotKeys);
}
}

2. Redis Server端监控

1
2
3
4
5
6
7
8
┌─────────────┐    ┌─────────────┐    ┌─────────────┐
│Redis Monitor│--->│日志分析服务 │--->│热key计算 │
└─────────────┘ └─────────────┘ └─────────────┘


┌─────────────┐
│本地缓存更新 │
└─────────────┘
1
2
# Redis MONITOR命令采样
redis-cli MONITOR | grep -v "PING" | awk '{print $4}' | sort | uniq -c | sort -nr | head -n 10

3. 代理层统计

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
@Aspect
@Component
public class RedisAccessAspect {
private static final int WINDOW_SIZE_SECONDS = 60;
private TimeWindowCounter counter = new TimeWindowCounter(WINDOW_SIZE_SECONDS);

@Around("execution(* org.springframework.data.redis.core.RedisTemplate.*(..))")
public Object around(ProceedingJoinPoint point) throws Throwable {
String key = extractKey(point);
counter.increment(key);
return point.proceed();
}
}

public class TimeWindowCounter {
private Queue<Map<String, AtomicInteger>> windows = new LinkedList<>();
private final int windowSize;

public void increment(String key) {
getCurrentWindow().computeIfAbsent(key, k -> new AtomicInteger()).incrementAndGet();
}

public Map<String, Integer> getTopKeys(int n) {
// 合并所有时间窗口的统计数据
return mergeWindows().entrySet().stream()
.sorted((e1, e2) -> e2.getValue().compareTo(e1.getValue()))
.limit(n)
.collect(Collectors.toMap(
Map.Entry::getKey,
Map.Entry::getValue,
(v1, v2) -> v1,
LinkedHashMap::new
));
}
}

大key识别方案

1. Redis命令扫描

1
2
# 使用SCAN命令渐进式扫描
redis-cli --bigkeys -i 0.1

2. 自定义扫描工具

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
@Service
public class BigKeyScanner {
@Autowired
private RedisTemplate<String, Object> redisTemplate;

public List<KeySize> scanBigKeys(int threshold) {
List<KeySize> bigKeys = new ArrayList<>();
ScanOptions options = ScanOptions.scanOptions().count(100).build();
Cursor<String> cursor = redisTemplate.scan(options);

while(cursor.hasNext()) {
String key = cursor.next();
long size = getKeySize(key);
if (size > threshold) {
bigKeys.add(new KeySize(key, size));
}
}
return bigKeys;
}

private long getKeySize(String key) {
DataType type = redisTemplate.type(key);
switch (type) {
case STRING:
return redisTemplate.opsForValue().get(key).toString().length();
case HASH:
return redisTemplate.opsForHash().size(key);
case LIST:
return redisTemplate.opsForList().size(key);
case SET:
return redisTemplate.opsForSet().size(key);
case ZSET:
return redisTemplate.opsForZSet().size(key);
default:
return 0;
}
}
}

本地缓存优化方案

1. 多级缓存架构

1
2
3
4
5
请求 --> 本地缓存(Caffeine) --> Redis集群 --> 数据库
│ │
│ │
└──────────────────────┘
热key直接返回

2. 本地缓存实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
@Service
public class MultiLevelCache {
private LoadingCache<String, Object> localCache = Caffeine.newBuilder()
.maximumSize(10000)
.expireAfterWrite(5, TimeUnit.MINUTES)
.recordStats()
.build(key -> null); // 缓存未命中时返回null

@Autowired
private RedisTemplate<String, Object> redisTemplate;

public Object get(String key) {
// 1. 查询本地缓存
Object value = localCache.getIfPresent(key);
if (value != null) {
return value;
}

// 2. 查询Redis
value = redisTemplate.opsForValue().get(key);
if (value != null) {
// 如果是热key,放入本地缓存
if (isHotKey(key)) {
localCache.put(key, value);
}
}

return value;
}

private boolean isHotKey(String key) {
// 从热key统计结果中判断
return HotKeyDetector.isHot(key);
}
}

3. 缓存一致性保证

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Service
public class CacheConsistencyManager {
@Autowired
private MultiLevelCache multiLevelCache;

// 监听数据变更消息
@KafkaListener(topics = "cache-update")
public void handleCacheUpdate(CacheUpdateMessage message) {
// 删除本地缓存
multiLevelCache.evict(message.getKey());
// 更新Redis缓存
multiLevelCache.refreshRedis(message.getKey());
}
}

监控指标

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
metrics:
- name: "hot_key_count"
type: "gauge"
labels:
- "key"
- "qps"

- name: "big_key_size"
type: "gauge"
labels:
- "key"
- "size"

- name: "local_cache_hit_rate"
type: "gauge"
labels:
- "application"

最佳实践

  1. 采样率控制

    • 客户端采样率动态调整
    • 重点监控高QPS时段
  2. 本地缓存策略

    • 仅缓存热key
    • 设置合理的过期时间
    • 控制缓存数量
  3. 一致性保证

    • 消息队列通知更新
    • 定时刷新机制
    • 版本号控制
  4. 监控告警

    • 热key变化趋势
    • 内存使用监控
    • 缓存命中率监控

问题背景

在一次线上故障中,某热门商品的缓存key恰好过期,导致大量请求直接击穿到数据库,引发系统性能问题。本文将详细分析处理过程和解决方案。

故障现象

1
2
3
4
5
6
7
8
┌──────────┐         ┌──────────┐         ┌──────────┐
│ 用户请求 │ ──────> │ Redis │ ──────> │ 数据库 │
└──────────┘ └──────────┘ └──────────┘
│ × !!!
│ 缓存失效 QPS暴增
│ 响应变慢
└─────────────────────────────────────────┘
大量请求直接访问数据库

主要表现:

  1. Redis某个key突然失效
  2. 大量并发请求涌入数据库
  3. 数据库CPU使用率飙升
  4. 系统响应时间显著增加

紧急处理流程

1. 数据库限流保护

1
2
3
4
5
6
7
8
9
10
11
12
@Slf4j
public class DbProtector {
private RateLimiter rateLimiter = RateLimiter.create(100.0); // 限制QPS为100

public Product queryProduct(Long productId) {
if (!rateLimiter.tryAcquire()) {
log.warn("数据库访问被限流,productId: {}", productId);
throw new RuntimeException("系统繁忙,请稍后重试");
}
return productMapper.selectById(productId);
}
}

2. 问题商品下线

1
2
3
4
5
6
7
┌─────────────┐    ┌─────────────┐    ┌─────────────┐
│ 运维平台 │--->│ 配置中心 │--->│ 应用服务 │
└─────────────┘ └─────────────┘ └─────────────┘
│ │
│ │
└──────────────────────────────────────┘
更新商品状态为"已下线"

3. 手动Mock缓存

1
2
3
4
5
6
7
8
9
10
11
12
@Service
public class CacheRecoveryService {
@Autowired
private RedisTemplate redisTemplate;

public void mockProductCache(Long productId, Product product) {
String cacheKey = "product:" + productId;
// 设置较短的过期时间,便于后续恢复
redisTemplate.opsForValue().set(cacheKey, product, 5, TimeUnit.MINUTES);
log.info("Mock cache success for productId: {}", productId);
}
}

4. 重启服务

1
2
3
4
5
6
7
8
9
分批重启流程:
┌────────────┐ ┌────────────┐ ┌────────────┐
│ 实例1下线 │ --> │ 实例2下线 │ --> │ 实例3下线 │
└────────────┘ └────────────┘ └────────────┘
│ │ │
▼ ▼ ▼
┌────────────┐ ┌────────────┐ ┌────────────┐
│ 实例1上线 │ --> │ 实例2上线 │ --> │ 实例3上线 │
└────────────┘ └────────────┘ └────────────┘

长期解决方案

1. 缓存预热

1
2
3
4
5
6
7
8
9
10
11
@Component
public class CacheWarmer {
@Scheduled(cron = "0 0 3 * * ?") // 每天凌晨3点执行
public void warmHotProducts() {
List<Long> hotProductIds = getHotProductIds();
for (Long productId : hotProductIds) {
Product product = productService.getById(productId);
cacheService.setProductCache(productId, product);
}
}
}

2. 双重检查锁防击穿

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public Product getProduct(Long productId) {
String cacheKey = "product:" + productId;
Product product = redisTemplate.opsForValue().get(cacheKey);

if (product == null) {
String lockKey = "lock:" + productId;
try {
// 获取分布式锁
if (redisTemplate.opsForValue().setIfAbsent(lockKey, "1", 10, TimeUnit.SECONDS)) {
// 双重检查
product = redisTemplate.opsForValue().get(cacheKey);
if (product == null) {
product = productMapper.selectById(productId);
redisTemplate.opsForValue().set(cacheKey, product, 1, TimeUnit.HOURS);
}
}
} finally {
redisTemplate.delete(lockKey);
}
}
return product;
}

3. 缓存降级方案

1
2
3
4
5
6
7
8
9
正常访问流程:
┌──────────┐ ┌──────────┐ ┌──────────┐
│ 请求 │ --> │ Redis │ --> │ 数据库 │
└──────────┘ └──────────┘ └──────────┘

降级后流程:
┌──────────┐ ┌──────────┐ ┌──────────┐
│ 请求 │ --> │ 本地缓存 │ --> │ 数据库 │
└──────────┘ └──────────┘ └──────────┘

监控预警

  1. 缓存监控指标

    • 缓存命中率
    • 缓存过期监控
    • 数据库QPS监控
  2. 告警规则

    1
    2
    3
    4
    5
    6
    7
    8
    9
    rules:
    - name: "缓存击穿告警"
    conditions:
    - metric: "cache.miss.rate"
    threshold: 80% # 缓存未命中率超过80%
    - metric: "db.qps"
    threshold: 1000 # 数据库QPS超过1000
    duration: "1m" # 持续1分钟
    severity: "critical"

经验总结

  1. 预防措施

    • 热点数据永不过期
    • 定时缓存预热
    • 多级缓存设计
  2. 应急处理

    • 及时限流保护
    • 快速恢复服务
    • 分批重启降低影响
  3. 长期规划

    • 完善监控体系
    • 建立降级方案
    • 优化缓存策略

MySQL默认隔离级别

MySQL InnoDB存储引擎默认使用可重复读(REPEATABLE READ)隔离级别。该级别通过MVCC(多版本并发控制)和锁机制的配合来实现。

MVCC实现原理

1. 版本链

每行记录都存在一个版本链:

1
2
3
┌────────────┐ ┌────────────┐ ┌────────────┐
│ 最新记录 │ ──> │ 历史记录1 │ ──> │ 历史记录2 │
└────────────┘ └────────────┘ └────────────┘

2. 重要字段

每条记录都包含以下系统字段:

  • DB_TRX_ID:创建/最后修改该记录的事务ID
  • DB_ROLL_PTR:回滚指针,指向上一个版本
  • DB_ROW_ID:行ID(可选)

3. 快照读实现

在事务开始时,会创建一个快照(Read View),包含:

  • creator_trx_id:创建该Read View的事务ID
  • m_ids:活跃的事务ID列表
  • min_trx_id:活跃事务中最小的事务ID
  • max_trx_id:下一个将被分配的事务ID

快照读判断规则:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
             是否可见?

┌───────┴───────┐
▼ ▼
trx_id < min_trx_id trx_id >= max_trx_id
(可见) (不可见)
│ │
└───────┬───────┘

trx_id ∈ m_ids?
┌────┴────┐
▼ ▼
是 否
(不可见) (可见)

锁机制

1. 记录锁(Record Lock)

  • 锁定单个索引记录
  • 防止其他事务修改或删除

2. 间隙锁(Gap Lock)

  • 锁定索引记录之间的间隙
  • 防止其他事务在间隙插入记录

3. Next-Key Lock

  • 记录锁和间隙锁的组合
  • 可以防止幻读

可重复读的实现过程

  1. 事务开始

    • 创建Read View
    • 记录当前活跃事务
  2. 读操作

    • 快照读:通过MVCC实现
    • 当前读:使用锁机制
  3. 写操作

    • 加Next-Key Lock
    • 创建新的版本记录

示例说明

1
2
3
4
5
6
7
8
9
10
11
-- 事务1
START TRANSACTION;
SELECT * FROM users WHERE id = 1; -- 创建Read View
-- 其他事务修改数据
SELECT * FROM users WHERE id = 1; -- 使用相同Read View,看到相同结果
COMMIT;

-- 事务2
START TRANSACTION;
UPDATE users SET name = 'new_name' WHERE id = 1; -- 加锁,创建新版本
COMMIT;

总结

MySQL通过以下机制实现可重复读:

  1. MVCC保证读操作的一致性:

    • 版本链保存历史记录
    • Read View确定可见性
  2. 锁机制保证写操作的隔离性:

    • 记录锁防止并发修改
    • 间隙锁防止幻读

这种实现既保证了数据的一致性,又提供了较好的并发性能。