Guohao Lu's Blog

个人技术博客

Linux内存架构与页面缓存位置

  1. Linux内存空间划分

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    系统内存布局:
    ┌────────────────────┐
    │ 用户空间 │
    │ (User Space) │ <- 应用程序、JVM堆等
    │ │ 0 ~ 3G (32位系统)
    ├────────────────────┤
    │ 内核空间 │ <- 内核代码、内核数据结构
    │ (Kernel Space) │ 页面缓存位于此处
    │ │ 3G ~ 4G (32位系统)
    └────────────────────┘
  2. 页面缓存的位置

  • 位于内核空间
  • 由内核直接管理
  • 对用户空间透明
  • 通过系统调用访问
  1. 为什么在内核空间

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    原因:
    1. 安全性:
    - 防止用户程序直接修改缓存
    - 保护系统关键数据

    2. 性能:
    - 避免用户态和内核态切换
    - 直接与磁盘I/O子系统交互

    3. 统一管理:
    - 集中管理所有进程的文件访问
    - 优化整体系统性能
  2. 访问机制

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    应用程序访问文件数据流程:

    用户空间 内核空间
    ┌──────┐ 系统调用 ┌───────────┐
    │ App │ ==========> │页面缓存 │
    └──────┘ └───────────┘

    ┌───────────┐
    │ 磁盘 │
    └───────────┘
  3. 内存映射(mmap)机制

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    使用mmap后:

    用户空间 内核空间
    ┌──────┐ 映射 ┌───────────┐
    │ App │ <======> │页面缓存 │
    └──────┘ └───────────┘

    特点:
    - 在用户空间直接映射内核的页面缓存
    - 避免了数据复制
    - 仍然由内核管理

Kafka页面缓存工作原理

  1. 什么是页面缓存(Page Cache)

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    操作系统的内存分层:
    ┌────────────────────┐
    │ 用户空间 │ <- 应用程序(如JVM)使用
    ├────────────────────┤
    │ 页面缓存 │ <- 操作系统管理的磁盘数据缓存
    │ (Page Cache) │
    ├────────────────────┤
    │ 内核空间 │ <- 系统内核使用
    └────────────────────┘

    特点:
    - 由操作系统管理
    - 用于缓存磁盘数据
    - 采用LRU算法
    - 支持预读和回写
  2. 传统JVM缓存的问题

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    数据读取流程(双重缓存):
    磁盘 -> 页面缓存 -> JVM堆 -> 应用程序

    问题:
    1. 数据被复制两次:
    - 一次从磁盘到页面缓存
    - 一次从页面缓存到JVM堆

    2. 内存使用效率低:
    - 相同数据占用两份内存空间
    - JVM堆会触发GC
    - GC会导致停顿
  3. Kafka的页面缓存使用

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    Kafka读取流程:
    磁盘 -> 页面缓存 -> 应用程序(零拷贝)

    实现方式:
    1. sendfile系统调用:
    - 直接从页面缓存传输到网络接口
    - 避免数据复制到JVM堆

    2. mmap内存映射:
    - 将文件映射到内存地址空间
    - 直接操作页面缓存
  4. 性能优势

    1
    2
    3
    4
    5
    6
    7
    8
    // 传统方式
    FileInputStream in = new FileInputStream("message.log");
    byte[] buffer = new byte[8192];
    in.read(buffer); // 数据复制到JVM堆

    // Kafka方式(零拷贝)
    FileChannel channel = new FileInputStream("message.log").getChannel();
    channel.transferTo(position, count, socketChannel); // 直接传输
  5. 具体实现机制

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    // Kafka日志段读取代码
    public class FileRecords extends AbstractRecords {
    // 使用MappedByteBuffer直接操作页面缓存
    private final MappedByteBuffer mmap;

    public FileRecords(File file, boolean mmap) {
    if (mmap) {
    // 内存映射方式
    this.mmap = Utils.mmap(file, true);
    }
    }
    }
  6. 优化效果

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    传统方式 vs Kafka方式:

    内存拷贝次数:
    - 传统:4次 (磁盘->内核->JVM堆->socket缓冲区->网卡)
    - Kafka:2次 (磁盘->页面缓存->网卡)

    上下文切换:
    - 传统:4次
    - Kafka:2次

    性能提升:
    - 吞吐量提高2-3倍
    - 延迟降低40-50%
  7. 最佳实践

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    # 1. 预留足够的页面缓存空间
    # 内存配置建议:
    系统内存 = JVM堆大小(4-8G) + 页面缓存(剩余内存)

    # 2. 避免使用过大的JVM堆
    # 配置示例:
    export KAFKA_HEAP_OPTS="-Xms6g -Xmx6g"

    # 3. 监控页面缓存使用
    free -m
    vmstat 1

Redis+Lua分布式限流实现

  1. 基本架构

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    请求流转图:
    ┌──────────┐ 1.请求 ┌──────────┐
    │ Client 1 │─────────────►│ │
    └──────────┘ │ │
    ┌──────────┐ 2.限流 │ API │
    │ Client 2 │◄────────────►│ Gateway │
    └──────────┘ │ │
    ┌──────────┐ 3.计数 │ │
    │ Client 3 │─────────────►│ │
    └──────────┘ └────┬─────┘

    4.执行Lua脚本


    ┌──────────┐
    │ Redis │
    └──────────┘
  2. Lua限流脚本

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    -- 限流脚本
    -- KEYS[1]: 限流key
    -- ARGV[1]: 时间窗口大小(秒)
    -- ARGV[2]: 限流阈值
    -- ARGV[3]: 当前时间戳
    local key = KEYS[1]
    local window = tonumber(ARGV[1])
    local threshold = tonumber(ARGV[2])
    local now = tonumber(ARGV[3])

    -- 1. 移除时间窗口之前的数据
    redis.call('ZREMRANGEBYSCORE', key, 0, now - window * 1000)

    -- 2. 获取当前窗口的请求数
    local count = redis.call('ZCARD', key)

    -- 3. 判断是否超过阈值
    if count >= threshold then
    return 0
    end

    -- 4. 记录本次请求
    redis.call('ZADD', key, now, now)

    -- 5. 设置过期时间
    redis.call('EXPIRE', key, window)

    return 1
  3. Java实现

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    public class RedisLimiter {
    private final StringRedisTemplate redisTemplate;
    private final String luaScript;

    public RedisLimiter(StringRedisTemplate redisTemplate) {
    this.redisTemplate = redisTemplate;
    // 加载Lua脚本
    this.luaScript = loadLuaScript();
    }

    public boolean isAllowed(String key, int window, int threshold) {
    List<String> keys = Collections.singletonList(key);
    long now = System.currentTimeMillis();

    // 执行Lua脚本
    Long result = redisTemplate.execute(
    new DefaultRedisScript<>(luaScript, Long.class),
    keys,
    String.valueOf(window),
    String.valueOf(threshold),
    String.valueOf(now)
    );

    return result != null && result == 1;
    }
    }
  4. 动态调整实现

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    public class DynamicRateLimiter {
    private static final String LIMIT_CONFIG_KEY = "rate:limit:config";

    // 更新限流阈值
    public void updateThreshold(String key, int threshold) {
    redisTemplate.opsForHash().put(
    LIMIT_CONFIG_KEY,
    key,
    String.valueOf(threshold)
    );
    }

    // 获取当前阈值
    private int getCurrentThreshold(String key) {
    String value = (String) redisTemplate.opsForHash()
    .get(LIMIT_CONFIG_KEY, key);
    return value == null ?
    defaultThreshold : Integer.parseInt(value);
    }

    // 限流检查
    public boolean isAllowed(String key) {
    int threshold = getCurrentThreshold(key);
    return redisLimiter.isAllowed(
    key,
    window,
    threshold
    );
    }
    }
  5. 滑动时间窗口示意

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    时间窗口滑动:
    now-60s now
    │ │
    ▼ ▼
    ┌─────────────────────┐
    │ 时间窗口(60s) │
    └─────────────────────┘
    │ │
    │ ┌──────┐ │
    │ │请求量│ │
    │ └──────┘ │
    │ │
    过期的请求 新请求
  6. 使用示例

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    @RestController
    public class ApiController {
    private final DynamicRateLimiter limiter;

    @GetMapping("/api/test")
    public String test() {
    String key = "api:test";
    if (!limiter.isAllowed(key)) {
    throw new RuntimeException("请求被限流");
    }
    // 业务逻辑
    return "success";
    }

    @PostMapping("/limit/update")
    public void updateLimit(
    @RequestParam String key,
    @RequestParam int threshold
    ) {
    limiter.updateThreshold(key, threshold);
    }
    }

Seata AT模式详解

  1. AT模式原理

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    执行流程:
    ┌─────────────┐ ┌─────────────┐ ┌─────────────┐
    │ 业务SQL │ │ 解析SQL │ │ 生成回滚SQL │
    └──────┬──────┘ └──────┬──────┘ └──────┬──────┘
    │ │ │
    ▼ ▼ ▼
    ┌─────────────┐ ┌─────────────┐ ┌─────────────┐
    │ 记录前镜像 │ ──► │ 执行业务SQL │ ──► │ 记录后镜像 │
    └─────────────┘ └─────────────┘ └─────────────┘

    两阶段提交:
    Phase 1: Try
    - 业务SQL执行
    - 解析SQL
    - 记录前后镜像
    - 生成回滚日志

    Phase 2: Commit/Rollback
    - Commit: 删除回滚日志
    - Rollback: 根据镜像生成反向SQL
  2. 核心组件

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    // 1. 数据源代理
    @Bean
    public DataSource dataSource() {
    DruidDataSource dataSource = new DruidDataSource();
    // 配置数据源
    dataSource.setUrl("jdbc:mysql://localhost:3306/test");

    // 使用Seata代理数据源
    return new DataSourceProxy(dataSource);
    }

    // 2. 全局事务注解
    @GlobalTransactional
    @Transactional
    public void businessMethod() {
    // 订单服务
    orderService.create(order);
    // 库存服务
    stockService.deduct(stock);
    // 账户服务
    accountService.debit(money);
    }
  3. 实际应用场景

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    场景一:电商下单
    ┌─────────────┐
    │ 下单服务 │
    └──────┬──────┘


    ┌──────┴──────┐ ┌─────────────┐ ┌─────────────┐
    │ 订单服务 │ ──► │ 库存服务 │ ──► │ 账户服务 │
    └─────────────┘ └─────────────┘ └─────────────┘
    订单表 库存表 账户表
    - 创建订单 - 扣减库存 - 扣减余额

    场景二:积分兑换
    ┌─────────────┐
    │ 兑换服务 │
    └──────┬──────┘


    ┌──────┴──────┐ ┌─────────────┐
    │ 积分服务 │ ──► │ 商品服务 │
    └─────────────┘ └─────────────┘
    积分表 库存表
    - 扣减积分 - 扣减库存
  4. 实现示例

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    // 订单服务
    @GlobalTransactional
    public void createOrder(OrderDTO orderDTO) {
    // 1. 创建订单
    Order order = new Order();
    order.setUserId(orderDTO.getUserId());
    order.setAmount(orderDTO.getAmount());
    orderMapper.insert(order);

    // 2. 调用库存服务
    Result stockResult = stockService.deduct(
    orderDTO.getProductId(),
    orderDTO.getQuantity()
    );
    if (!stockResult.isSuccess()) {
    throw new BusinessException("库存不足");
    }

    // 3. 调用账户服务
    Result accountResult = accountService.debit(
    orderDTO.getUserId(),
    orderDTO.getAmount()
    );
    if (!accountResult.isSuccess()) {
    throw new BusinessException("余额不足");
    }
    }
  5. 优缺点分析

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    优点:
    1. 对业务无侵入
    2. 无需手动编写回滚逻辑
    3. 性能损耗小
    4. 兼容已有数据库

    缺点:
    1. 依赖数据库事务
    2. 需要代理数据源
    3. 无法处理复杂业务
    4. 表需要主键和索引
  6. 使用建议

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    适用场景:
    1. 简单的数据库操作
    2. 常规的CRUD业务
    3. 对性能要求不是特别高
    4. 数据库支持事务

    不适用场景:
    1. 复杂业务逻辑
    2. 高并发场景
    3. 涉及非事务性资源
    4. 跨数据库类型
  7. 性能优化

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    @Configuration
    public class SeataConfig {

    @Bean
    public GlobalTransactionScanner globalTransactionScanner() {
    return new GlobalTransactionScanner(
    "order-service", // 应用名
    "my_test_tx_group" // 事务分组
    );
    }

    // 配置事务分组
    @Bean
    public ConfigurationCache configurationCache() {
    ConfigurationCache cache = new ConfigurationCache();
    cache.putConfig(
    "service.vgroupMapping.my_test_tx_group",
    "default"
    );
    return cache;
    }
    }

供应链场景下 EventStore 的价值与自研构建

在 Keep 的供应链 DDD 改造实践中,多层级业务单据的状态依赖库存变更,若「改库」与「发事件」分离,容易导致上下游状态不一致。采用 EventStore「先持久化事件、再驱动下游」是解决该问题的典型思路。本文从为何用、价值、如何自研三方面整理,并辅以事件流 Mermaid 图。

参考:一个电商供应链系统的 DDD 实战


一、为什么要用 EventStore?

1.1 问题本质

库存变更会牵涉多层业务单据,上层单据状态依赖底层仓储单据状态:

  • 采购入库单「入库完成」→ 采购单「已完成」
  • 销售出库单「出库完成」→ 销售发货单「已发货」

若采用「先改库再发 MQ」或「先发事件再改库」:

  • 库已更新、事件没发出去 → 下游状态永远不更新
  • 事件已发、本地事务回滚 → 下游已按事件改了状态,与真实库存不一致

本质是:本地事务事件投递不在同一原子边界内。

1.2 多单据状态依赖示意

1
2
3
4
5
6
7
8
9
10
11
flowchart TB
subgraph 仓储层
A[出库单] -->|出库完成| B[入库单]
end
subgraph 业务层
C[销售发货单] --> D[采购单]
end
A -.->|状态同步| C
B -.->|状态同步| D
A -->|依赖| E[库存变更]
B -->|依赖| E

EventStore 的思路:把「发生了什么」先作为事件持久化,再基于事件驱动下游。事件与写库纳入同一套一致性模型(如先落事件再改库、或同事务),下游只消费「已持久化」的事件,避免上述两类不一致。


二、使用 EventStore 的价值

价值 说明
状态一致性 上层单据状态由「已落库的领域事件」驱动,事件必先持久化再投递,避免发信与本地事务割裂。
可追溯 / 可审计 事件即流水,谁在何时因何单据触发何种库存变更,有据可查;出问题可回溯、对账。
解耦 库存上下文只发布「库存已扣减 / 已入库」等事件,订单、采购、售后通过订阅更新自身状态,不直接强依赖库存表或 RPC。
可复用 文中提到「沉淀出较通用的事件组件 EventStore,后续在 Keep 电商内部快速推广复用」。
可扩展 事件持久化后可重放、对账、离线分析,甚至向 Event Sourcing 演进。

三、EventStore 自研构建思路

文中未给出完整实现细节,仅给出「发布领域事件」「订阅组注册」「在订阅组中声明订阅事件」及「领域事件异常处理」等使用方式。下面结合通用实践,给出可落地的自研构建思路与事件流。

3.1 核心原则

  • 事件先持久化、再投递:事件表与业务表同库同事务,要么一起成功要么一起回滚。
  • 至少一次投递:通过扫表或 Outbox 把待发送事件推到 MQ,发送失败可重试。
  • 消费幂等:下游按事件 ID + 消费者组做幂等,避免重复消费导致状态错乱。

3.2 事件流总览

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
sequenceDiagram
participant Domain as 领域层
participant App as 应用/事务
participant EventStore as 事件表
participant Scanner as 扫表/Outbox
participant MQ as 消息队列
participant Consumer as 订阅者

Domain->>App: 库存变更 + 发布事件
App->>EventStore: 同事务:写业务表 + 写事件行
Note over App,EventStore: 事件状态:待发送
App-->>Domain: 事务提交

Scanner->>EventStore: 扫描待发送事件
Scanner->>MQ: 投递事件
Scanner->>EventStore: 标记已发送

MQ->>Consumer: 按订阅组推送
Consumer->>Consumer: 幂等校验 + 更新下游状态
Consumer->>MQ: ACK

3.3 事件表与发布

  • 事件表字段(示意):事件 ID、聚合根类型/ID、事件类型、payload、状态(待发送 / 已发送)、创建时间等。
  • 发布流程:领域层在库存变更后调用「发布接口」;应用层在同一事务内写业务表并插入事件行;事务提交后,事件行处于「待发送」状态。

3.4 投递与订阅

1
2
3
4
5
6
7
8
9
10
11
12
13
14
flowchart LR
subgraph 持久化
E[事件表]
end
subgraph 投递
S[扫表/Outbox] --> M[MQ]
end
subgraph 消费
M --> G1[订阅组 A]
M --> G2[订阅组 B]
G1 --> C1[订单状态]
G2 --> C2[采购单状态]
end
E --> S
  • 投递:定时任务或 Binlog 解析扫描「待发送」事件,推送到 MQ;发送成功后更新为「已发送」,失败则保留待发送便于重试。
  • 订阅:按订阅组管理消费者;在组内声明本组关心的事件类型(文中「在订阅组中声明订阅事件」);消费时用事件 ID + 组 ID 做幂等。

3.5 异常处理(对应文中「黄色部分」)

  • 发送失败:事件仍为待发送,下次扫表再发;可加告警与最大重试次数。
  • 消费失败:重试、死信、告警;保证不丢事件、不重复生效(幂等)。
  • 监控:对「已持久化但长期未成功投递」的事件做监控与补偿。

四、小结

  • 为何用:多层级单据状态依赖库存变更,「改库」与「发事件」分离易导致不一致;EventStore 用「事件先持久化再驱动」把二者绑在同一套一致性边界内。
  • 价值:状态一致、可追溯、解耦、组件可复用、便于重放与扩展。
  • 如何自研:事件表与业务同事务写入 → 扫表/Outbox 推 MQ → 按订阅组 + 事件类型消费 → 幂等 + 异常与重试;领域层只依赖「事件发布接口」,技术细节在基础设施层实现。

把 EventStore 作为「领域事件持久化与投递」的单一职责组件,是供应链等多单据、强一致场景下常见且可复用的做法。

电商库存模型:占用、可用、在途、冻结的划分与落地

电商供应链里,库存不只是一串数字,而是多种「状态」的组合。把实物、占用、可用、在途、冻结五类拆清楚,并显式刻画它们之间的流转,才能避免超卖、少卖和对账困难。本文先说划分,再画流转,最后落库与一致性要点。


一、五类库存的定义与关系

单 SKU、单仓库为维度,可抽象出五类数量(均为非负整数):

类型 含义 典型来源
实物库存 仓内实际可盘点的数量 采购入库、销售出库、调拨、盘点
占用库存 已成交未出库(订单已下单未发货) 销售下单、预售确认
可用库存 可被新订单占用的数量 由公式算得,不单独持久
在途库存 已采购未入库 采购单下发、在途入库
冻结库存 被活动/调拨预占,尚未转成占用或出库 秒杀预占、仓间调拨预占

数量关系(核心公式):

1
可用库存 = 实物库存 - 占用库存 - 冻结库存

注意:在途不参与「可售」计算,只有入库后才变成实物,再通过可用被售卖;占用冻结都是从「可用」里扣出来的,只是业务含义不同(前者是已卖,后者是预占)。


二、库存状态的流转(核心)

下面用「谁触发 → 哪些数量增减」的方式描述流转,便于对照落库与事件。

2.1 采购链路

1
2
3
4
flowchart LR
A[采购下单] -->|在途 +N| B[在途]
B --> C[采购入库]
C -->|在途 -N,实物 +N| D[实物]
  • 采购单确认:只在「在途」上 +N,不动实物与可用。
  • 入库单完成:在途 -N,实物 +N;可用因公式自动增加(占用、冻结不变时)。

2.2 销售链路(正常下单 → 发货)

1
2
3
4
flowchart LR
A[下单占库存] -->|可用 -N,占用 +N| B[占用]
B --> C[发货出库]
C -->|占用 -N,实物 -N| D((出库完成))
  • 下单:先扣可用(等价于占用增加),占用 +N;实物暂时不变,等出库再减。
  • 出库:占用 -N、实物 -N;取消则只做「占用 -N」,可用通过公式回升。

2.3 订单取消

1
2
3
flowchart LR
A[订单取消] -->|占用 -N| B[占用释放]
B -->|可用 +N(公式)| C[可用]

只减占用,不碰实物;可用随公式恢复。

2.4 促销/秒杀预占(活动未开始)

1
2
3
4
5
flowchart TB
A[活动预占] -->|可用 -N,冻结 +N| B[冻结]
B --> C{后续操作}
C -->|活动开始/用户下单<br/>冻结 -N,占用 +N| D[占用]
C -->|活动取消/释放<br/>冻结 -N,可用 +N| E[可用]
  • 预占:把「可售」从可用挪到冻结,避免被普通订单卖光。
  • 开卖:冻结减、占用加,进入正常销售链路。
  • 取消/过期:只减冻结,可用通过公式恢复。

2.5 仓间调拨

1
2
3
4
5
6
7
8
9
10
11
flowchart TB
subgraph 调出仓
A1[调拨单创建/预占] -->|可用 -N,冻结 +N| A2[冻结]
A2 --> A3[调拨出库]
A3 -->|冻结 -N,实物 -N| A4((出库))
end
subgraph 调入仓
B1[在途] --> B2[调拨入库]
B2 -->|在途 -N,实物 +N| B3[实物]
end
A4 -.-> B1

调出仓:先冻结再出库,避免调拨与销售抢同一批可用;调入仓可按「在途」建模(在途 +N 在调拨发运时,在途 -N、实物 +N 在入库时)。

2.6 流转总览(概念图)

1
2
3
4
5
6
7
8
9
10
11
12
13
flowchart LR
subgraph 入库
PO[采购下单] -->|在途 +N| IT[在途]
IT -->|采购入库| PH[实物]
end
subgraph 销售与预占
PH -->|下单占库| OC[占用]
OC -->|发货出库| OUT1((出库))
AV[可用] -->|活动预占| FR[冻结]
FR -->|活动开始| OC
OC -->|取消订单| AV
FR -->|释放| AV
end

三、落地要点

3.1 表与字段设计(示意)

  • 库存主表(按 仓库 + SKU):physical_qtyoccupied_qtyfrozen_qtyin_transit_qty
  • 可用不落库,查询时算:available_qty = physical_qty - occupied_qty - frozen_qty
  • 约束available_qty 计算结果 ≥ 0;扣减可用时等价为「占用+」或「冻结+」,并做乐观锁或行锁,防止超卖。

3.2 扣减顺序与事务

  • 下单占库存:在事务内「可用检查 → 占用+」或「实物 - 占用 - 冻结 ≥ 需求」后 occupied_qty += N
  • 出库:同一事务内「占用-」「实物-」;取消订单只「占用-」。
  • 预占、释放、在途增减同理,保证一次业务操作内,涉及的多列增减原子完成

3.3 流水与可追溯

每次数量变动落库存流水表:仓库、SKU、变动类型(占用+/-、冻结+/-、实物+/-、在途+/-)、变动量、单据号(订单号/采购单号/调拨单号等)、前后快照。便于对账、排查超卖和做审计。

3.4 与 DDD / 事件结合

  • 聚合根:以「仓库+SKU」为库存聚合根,变更通过领域方法(如 reserve()confirmOutbound())完成,内部维护五类数量。
  • 跨上下文:出库完成可发「库存已扣减」领域事件,驱动订单侧更新发货状态;事件 payload 带单据号与数量,便于幂等与对账。

四、小结

  • 五类:实物(仓内实有)、占用(已卖未发)、可用(公式)、在途(已采未入)、冻结(预占未用)。
  • 关系:可用 = 实物 - 占用 - 冻结;在途独立,入库后才参与实物与可用。
  • 流转:采购动在途与实物;销售动可用↔占用与实物;活动/调拨动可用↔冻结,再转占用或实物。
  • 落地:主表存四类数量、可用计算得出;事务内原子增减;流水表记录每次变动;领域内封装变更并可选发事件,保证一致与可追溯。

把「划分 + 流转」在设计与代码里显式化,是库存准确、可解释、易扩展的基础。

解决思路

  1. 暴力解法
  2. 滑动窗口
  3. 动态规划

特点

  1. 最大子序和:连续、子串和最大

    1
    2
    dp[i]:以nums[i]为结尾的子串的最大和
    dp[i] = max(dp[i-1] + nums[i], nums[i])
  2. 最长回文子串:连续、子串为回文

    1
    2
    dp[i][j]:s[i]到s[j]是否为回文
    dp[i][j] = dp[i+1][j-1] && s[i] == s[j]
    1
    2
    3
    4
    5
    6
    for (int k = 0; k < s.length(); k++) {
    for (int i = 0, j = k; i < s.length() && j < s.length(); i++, j++) {
    boolean isSym = s.charAt(i) == s.charAt(j);
    dp[i][j] = isSym && ((j <= (i + 1)) || (dp[i + 1][j - 1] == true));
    }
    }

问题描述

给定一个整数数组 nums,找出具有最大和的连续子数组(至少包含一个元素),返回其最大和。

分析思路

1. 问题可视化

1
2
3
4
5
6
7
8
9
10
11
12
示例数组: [-2, 1, -3, 4, -1, 2, 1, -5, 4]

可视化表示:
┌─┐
│4│
└─┘ ┌─┐
┌─┐ │ ┌─┐│2│┌─┐
│1│ │ │ ││ ││1│
└─┘ │ └─┘└─┘└─┘ ┌─┐
│ │ │ │ │4│
└─────┴─────┴─────┴──────└─┘
-2 1 -3 4 -1 2 1 -5 4

2. 思路分析

  1. 局部最优解

    1
    2
    3
    4
    5
    6
    7
    8
    位置i的最大子数组和有两种可能:
    ┌────────────────┐
    │ 1. 加入前面的和 │
    └────────────────┘

    ┌────────────┐
    │ 2. 从自己开始 │
    └────────────┘
  2. 状态转移

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    dp[i] = max(nums[i], dp[i-1] + nums[i])

    示意图:
    dp[i-1] < 0 → ┌─────┐
    │重新开始│
    └─────┘

    dp[i-1] > 0 → ┌─────┐
    │继续累加│
    └─────┘

3. 解题思路图解

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
数组: [-2, 1, -3, 4, -1, 2, 1, -5, 4]

步骤分析:
1) -2: [-2]
└── 当前最大和: -2

2) 1: [-2, 1]
└── 重新开始: 1

3) -3: [1, -3]
└── 继续累加: -2,重新开始

4) 4: [4]
└── 重新开始: 4

5) -1: [4, -1]
└── 继续累加: 3

6) 2: [4, -1, 2]
└── 继续累加: 5

7) 1: [4, -1, 2, 1]
└── 继续累加: 6

8) -5: [4, -1, 2, 1, -5]
└── 继续累加: 1

9) 4: [4, -1, 2, 1, -5, 4]
└── 继续累加: 5

4. 关键观察

  1. 连续性质

    1
    2
    [a] → [a,b] → [a,b,c]
    连续扩展
  2. 决策点

    1
    2
    3
    4
    5
    6
    7
    当前位置i的决策:
    ┌─────────────┐
    │ 加入前面的和 │ ← dp[i-1] > 0
    └─────────────┘
    ┌─────────┐
    │ 重新开始 │ ← dp[i-1] ≤ 0
    └─────────┘

5. 优化思路

  1. 空间优化

    1
    2
    3
    4
    只需要记录:
    ┌──────────┐ ┌──────────┐
    │当前最大和│ 和 │全局最大和│
    └──────────┘ └──────────┘
  2. 时间优化

    1
    2
    3
    一次遍历 → O(n)
    无需回溯
    无需额外空间

总结

  1. 问题的核心是理解:在每个位置,我们要决定是加入之前的和,还是重新开始。

  2. 使用动态规划思想,但可以优化为O(1)空间复杂度。

  3. 关键是理解局部最优和全局最优的关系。

架构概览

MPP (Massive Parallel Processing)

1
2
3
4
5
6
7
8
9
10
11
12
13
┌──────────────────────────────────────────────┐
│ MPP 架构 │
│ │
│ ┌─────────┐ ┌─────────┐ ┌─────────┐ │
│ │ Node 1 │ │ Node 2 │ │ Node 3 │ │
│ │ CPU │ │ CPU │ │ CPU │ │
│ │ Memory │◄─►│ Memory │◄─►│ Memory │ │
│ │ Storage │ │ Storage │ │ Storage │ │
│ └─────────┘ └─────────┘ └─────────┘ │
│ ▲ ▲ ▲ │
│ └────────────┴────────────┘ │
│ 高速互联网络 │
└──────────────────────────────────────────────┘

MapReduce

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
┌──────────────────────────────────────────────┐
│ MapReduce 架构 │
│ │
│ ┌─────────┐ ┌─────────┐ ┌─────────┐ │
│ │ Map 1 │ │ Map 2 │ │ Map 3 │ │
│ └────┬────┘ └────┬────┘ └────┬────┘ │
│ │ │ │ │
│ ┌────▼────┐ ┌────▼────┐ ┌────▼────┐ │
│ │Shuffle 1│ │Shuffle 2│ │Shuffle 3│ │
│ └────┬────┘ └────┬────┘ └────┬────┘ │
│ │ │ │ │
│ ┌────▼────┐ ┌────▼────┐ ┌────▼────┐ │
│ │Reduce 1 │ │Reduce 2 │ │Reduce 3 │ │
│ └─────────┘ └─────────┘ └─────────┘ │
└──────────────────────────────────────────────┘

核心区别

1. 数据处理模式

MPP

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// MPP数据处理示例
public class MPPProcessor {
public void processQuery(String sql) {
// 1. 并行分发查询
List<Node> nodes = getAvailableNodes();
CompletableFuture<?>[] futures = nodes.stream()
.map(node -> CompletableFuture.runAsync(() -> {
// 每个节点并行执行相同的查询
node.executeQuery(sql);
}))
.toArray(CompletableFuture[]::new);

// 2. 等待所有节点执行完成
CompletableFuture.allOf(futures).join();

// 3. 合并结果
mergeResults(nodes);
}
}

MapReduce

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// MapReduce处理示例
public class MapReduceProcessor {
public void processData(List<String> input) {
// 1. Map阶段
Map<String, List<String>> mappedData = input.stream()
.parallel()
.map(this::mapFunction)
.collect(Collectors.groupingBy(MapResult::getKey));

// 2. Shuffle阶段
shuffleData(mappedData);

// 3. Reduce阶段
List<String> result = mappedData.entrySet().stream()
.parallel()
.map(entry -> reduceFunction(entry.getKey(), entry.getValue()))
.collect(Collectors.toList());
}
}

2. 数据交互方式

1
2
3
4
5
6
7
8
9
MPP数据交互:
┌────────┐ ┌────────┐
│Node 1 │◄───►│Node 2 │ 实时数据交换
└────────┘ └────────┘

MapReduce数据交互:
┌────────┐ ┌────────┐ ┌────────┐
│Map │────►│Shuffle │────►│Reduce │ 阶段性数据传输
└────────┘ └────────┘ └────────┘

3. 资源管理

MPP

1
2
3
4
5
6
7
8
9
10
11
# MPP资源配置示例
cluster:
nodes:
- id: node1
cpu: 16
memory: 64GB
storage: 2TB
network: 10Gbps
interconnect:
type: InfiniBand
bandwidth: 100Gbps

MapReduce

1
2
3
4
5
6
7
8
9
10
11
12
13
14
# MapReduce资源配置示例
job:
mappers: 100
reducers: 20
resources:
map:
cpu: 2
memory: 4GB
reduce:
cpu: 4
memory: 8GB
intermediate:
compression: true
storage: HDFS

性能对比

1. 延迟比较

1
2
3
4
5
6
7
响应时间对比:
MPP: ├────────┤ (毫秒级)
MapReduce: ├────────────────────┤ (分钟级)

适用场景:
MPP: 实时分析、交互式查询
MapReduce: 批量处理、大规模ETL

2. 扩展性对比

1
2
3
4
5
6
7
8
9
10
11
节点扩展效果:

MPP扩展:
性能 ─────────►
节点数 ─────────►
(近似线性扩展,但有上限)

MapReduce扩展:
性能 ─────────────►
节点数 ─────────────►
(可以持续线性扩展)

应用场景

MPP最适合:

  1. OLAP分析场景
  2. 实时数据仓库
  3. 交互式查询
  4. 复杂SQL处理

MapReduce最适合:

  1. 大规模数据批处理
  2. ETL作业
  3. 日志分析
  4. 数据清洗

选型建议

1
2
3
4
5
6
7
8
9
10
11
12
13
public class ArchitectureSelector {
public String selectArchitecture(Requirements req) {
if (req.needsRealTimeProcessing() &&
req.dataSize() < 100_000_000 &&
req.requiresComplexSQL()) {
return "MPP";
} else if (req.isBatchProcessing() &&
req.dataSize() > 1_000_000_000) {
return "MapReduce";
}
return "Need further analysis";
}
}

总结对比表

特性 MPP MapReduce
处理模式 并行处理 分阶段处理
数据交互 实时 阶段性
延迟 毫秒级 分钟级
扩展性 有限制 近乎无限
适用场景 实时分析 批处理
数据规模 GB~TB TB~PB
计算复杂度 中等
资源消耗 较大 可控

热key识别方案

1. 客户端采样统计

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
@Slf4j
public class HotKeyDetector {
private LoadingCache<String, LongAdder> keyCounterCache = CacheBuilder.newBuilder()
.expireAfterWrite(1, TimeUnit.MINUTES)
.build(new CacheLoader<String, LongAdder>() {
@Override
public LongAdder load(String key) {
return new LongAdder();
}
});

// 采样率1%
private static final double SAMPLE_RATE = 0.01;

public void recordKeyAccess(String key) {
// 采样统计
if (ThreadLocalRandom.current().nextDouble() < SAMPLE_RATE) {
keyCounterCache.getUnchecked(key).increment();
}
}

// 定时任务,每分钟统计热key
@Scheduled(fixedRate = 60000)
public void detectHotKeys() {
Map<String, LongAdder> counters = keyCounterCache.asMap();
// 按访问量排序,取Top N
List<Map.Entry<String, LongAdder>> hotKeys = counters.entrySet().stream()
.sorted((e1, e2) -> Long.compare(e2.getValue().sum(), e1.getValue().sum()))
.limit(100)
.collect(Collectors.toList());

// 推送到本地缓存
updateLocalCache(hotKeys);
}
}

2. Redis Server端监控

1
2
3
4
5
6
7
8
┌─────────────┐    ┌─────────────┐    ┌─────────────┐
│Redis Monitor│--->│日志分析服务 │--->│热key计算 │
└─────────────┘ └─────────────┘ └─────────────┘


┌─────────────┐
│本地缓存更新 │
└─────────────┘
1
2
# Redis MONITOR命令采样
redis-cli MONITOR | grep -v "PING" | awk '{print $4}' | sort | uniq -c | sort -nr | head -n 10

3. 代理层统计

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
@Aspect
@Component
public class RedisAccessAspect {
private static final int WINDOW_SIZE_SECONDS = 60;
private TimeWindowCounter counter = new TimeWindowCounter(WINDOW_SIZE_SECONDS);

@Around("execution(* org.springframework.data.redis.core.RedisTemplate.*(..))")
public Object around(ProceedingJoinPoint point) throws Throwable {
String key = extractKey(point);
counter.increment(key);
return point.proceed();
}
}

public class TimeWindowCounter {
private Queue<Map<String, AtomicInteger>> windows = new LinkedList<>();
private final int windowSize;

public void increment(String key) {
getCurrentWindow().computeIfAbsent(key, k -> new AtomicInteger()).incrementAndGet();
}

public Map<String, Integer> getTopKeys(int n) {
// 合并所有时间窗口的统计数据
return mergeWindows().entrySet().stream()
.sorted((e1, e2) -> e2.getValue().compareTo(e1.getValue()))
.limit(n)
.collect(Collectors.toMap(
Map.Entry::getKey,
Map.Entry::getValue,
(v1, v2) -> v1,
LinkedHashMap::new
));
}
}

大key识别方案

1. Redis命令扫描

1
2
# 使用SCAN命令渐进式扫描
redis-cli --bigkeys -i 0.1

2. 自定义扫描工具

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
@Service
public class BigKeyScanner {
@Autowired
private RedisTemplate<String, Object> redisTemplate;

public List<KeySize> scanBigKeys(int threshold) {
List<KeySize> bigKeys = new ArrayList<>();
ScanOptions options = ScanOptions.scanOptions().count(100).build();
Cursor<String> cursor = redisTemplate.scan(options);

while(cursor.hasNext()) {
String key = cursor.next();
long size = getKeySize(key);
if (size > threshold) {
bigKeys.add(new KeySize(key, size));
}
}
return bigKeys;
}

private long getKeySize(String key) {
DataType type = redisTemplate.type(key);
switch (type) {
case STRING:
return redisTemplate.opsForValue().get(key).toString().length();
case HASH:
return redisTemplate.opsForHash().size(key);
case LIST:
return redisTemplate.opsForList().size(key);
case SET:
return redisTemplate.opsForSet().size(key);
case ZSET:
return redisTemplate.opsForZSet().size(key);
default:
return 0;
}
}
}

本地缓存优化方案

1. 多级缓存架构

1
2
3
4
5
请求 --> 本地缓存(Caffeine) --> Redis集群 --> 数据库
│ │
│ │
└──────────────────────┘
热key直接返回

2. 本地缓存实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
@Service
public class MultiLevelCache {
private LoadingCache<String, Object> localCache = Caffeine.newBuilder()
.maximumSize(10000)
.expireAfterWrite(5, TimeUnit.MINUTES)
.recordStats()
.build(key -> null); // 缓存未命中时返回null

@Autowired
private RedisTemplate<String, Object> redisTemplate;

public Object get(String key) {
// 1. 查询本地缓存
Object value = localCache.getIfPresent(key);
if (value != null) {
return value;
}

// 2. 查询Redis
value = redisTemplate.opsForValue().get(key);
if (value != null) {
// 如果是热key,放入本地缓存
if (isHotKey(key)) {
localCache.put(key, value);
}
}

return value;
}

private boolean isHotKey(String key) {
// 从热key统计结果中判断
return HotKeyDetector.isHot(key);
}
}

3. 缓存一致性保证

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Service
public class CacheConsistencyManager {
@Autowired
private MultiLevelCache multiLevelCache;

// 监听数据变更消息
@KafkaListener(topics = "cache-update")
public void handleCacheUpdate(CacheUpdateMessage message) {
// 删除本地缓存
multiLevelCache.evict(message.getKey());
// 更新Redis缓存
multiLevelCache.refreshRedis(message.getKey());
}
}

监控指标

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
metrics:
- name: "hot_key_count"
type: "gauge"
labels:
- "key"
- "qps"

- name: "big_key_size"
type: "gauge"
labels:
- "key"
- "size"

- name: "local_cache_hit_rate"
type: "gauge"
labels:
- "application"

最佳实践

  1. 采样率控制

    • 客户端采样率动态调整
    • 重点监控高QPS时段
  2. 本地缓存策略

    • 仅缓存热key
    • 设置合理的过期时间
    • 控制缓存数量
  3. 一致性保证

    • 消息队列通知更新
    • 定时刷新机制
    • 版本号控制
  4. 监控告警

    • 热key变化趋势
    • 内存使用监控
    • 缓存命中率监控