Kafka消息消费失败处理方案
重试机制设计
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
/**
 * Kafka consumer configuration.
 */
public class KafkaConsumerConfig {
    /**
     * Creates a consumer factory with auto-commit turned off and the
     * offset-reset policy set to {@code "earliest"}.
     *
     * <p>NOTE(review): only these two properties are set here. Bootstrap
     * servers, group id and key/value deserializers are not visible in this
     * method — confirm they are supplied elsewhere before relying on it.
     *
     * @return a consumer factory built from the properties above
     */
    public ConsumerFactory<String, Object> consumerFactory() {
        Map<String, Object> consumerSettings = new HashMap<>();
        // Offset-reset policy: start from the earliest message.
        consumerSettings.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        // Auto-commit is disabled; offsets are acknowledged by the listener.
        consumerSettings.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        return new DefaultKafkaConsumerFactory<>(consumerSettings);
    }
}
public class ReviewService {
/**
 * Processes a qualification-changed event and acknowledges it on success.
 *
 * <p>On failure the outcome depends on {@code isRetryable(e)}: a
 * non-retryable failure sends the event to the dead-letter queue and
 * acknowledges it; a retryable failure is rethrown without acknowledging.
 *
 * @param event the event to process
 * @param ack   handle used to acknowledge the message
 */
public void handleQualificationChanged(
        QualificationChangedEvent event,
        Acknowledgment ack) {
    try {
        // Business processing, then acknowledge.
        processEvent(event);
        ack.acknowledge();
    } catch (Exception e) {
        if (isRetryable(e)) {
            // Rethrow unacknowledged; the original note says this triggers a retry.
            throw e;
        }
        // Not retryable: park the event in the dead-letter queue and
        // acknowledge it. Note the decision is made by isRetryable(e);
        // no retry counter is consulted in this method.
        sendToDLQ(event);
        ack.acknowledge();
    }
}
}
死信队列处理
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
// 1. 死信队列配置
/**
 * Declares the dead-letter topic {@code incentive.dlq.v1} with 3 partitions
 * and 3 replicas.
 *
 * @return the topic definition produced by {@code TopicBuilder.build()}
 */
public org.apache.kafka.clients.admin.NewTopic deadLetterTopic() {
    // build() returns a NewTopic, not the builder, so the method must be
    // declared to return NewTopic (fully qualified: the file shows no imports).
    return TopicBuilder.name("incentive.dlq.v1")
            .partitions(3)
            .replicas(3)
            .build();
}
// 2. 死信消息处理服务
public class DeadLetterService {
/**
 * Handles one record from the dead-letter queue: logs it, saves it to the
 * error table, then notifies operators.
 *
 * <p>Any exception thrown by a step propagates to the caller.
 *
 * @param record the dead-letter record to handle
 */
public void processDLQ(ConsumerRecord<String, String> record) {
    // 1. Log the dead-letter record.
    log.error("Processing DLQ message: {}", record);
    // 2. Persist before notifying, so that a failing notification cannot
    //    prevent the record from being saved.
    saveToErrorTable(record);
    // 3. Notify operators.
    notifyOperators(record);
}
// 4. 手动重试接口
/**
 * Manually retries a stored error message by sending its key and payload
 * back to the topic recorded on it.
 *
 * @param messageId id of the stored error message to resend
 * @throws IllegalArgumentException if no stored message has this id
 */
public void retryMessage(String messageId) {
    // Use the ErrorMessage entity and its getTopic() accessor, matching how
    // errorRepository.find(...) is used by the compensation code in this file.
    ErrorMessage msg = errorRepository.find(messageId);
    if (msg == null) {
        // Fail with a clear message instead of a NullPointerException below.
        throw new IllegalArgumentException(
                "No error message found for id: " + messageId);
    }
    kafkaTemplate.send(msg.getTopic(),
            msg.getKey(),
            msg.getPayload());
}
}
错误数据持久化
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/**
 * Data holder for a failed message: identifier, topic, key, payload, error
 * text, retry count, creation time and processing status.
 *
 * <p>Accessors are provided because every field is private and other code in
 * this file reads the topic through {@code getTopic()}.
 */
public class ErrorMessage {
    private String messageId;
    private String topic;
    private String key;
    private String payload;
    private String errorMsg;
    private int retryCount;
    private LocalDateTime createTime;
    private MessageStatus status;

    /** @return the message identifier */
    public String getMessageId() {
        return messageId;
    }

    /** @param messageId the message identifier */
    public void setMessageId(String messageId) {
        this.messageId = messageId;
    }

    /** @return the topic name */
    public String getTopic() {
        return topic;
    }

    /** @param topic the topic name */
    public void setTopic(String topic) {
        this.topic = topic;
    }

    /** @return the message key */
    public String getKey() {
        return key;
    }

    /** @param key the message key */
    public void setKey(String key) {
        this.key = key;
    }

    /** @return the message payload */
    public String getPayload() {
        return payload;
    }

    /** @param payload the message payload */
    public void setPayload(String payload) {
        this.payload = payload;
    }

    /** @return the recorded error text */
    public String getErrorMsg() {
        return errorMsg;
    }

    /** @param errorMsg the error text to record */
    public void setErrorMsg(String errorMsg) {
        this.errorMsg = errorMsg;
    }

    /** @return the retry count */
    public int getRetryCount() {
        return retryCount;
    }

    /** @param retryCount the retry count */
    public void setRetryCount(int retryCount) {
        this.retryCount = retryCount;
    }

    /** @return the creation time */
    public LocalDateTime getCreateTime() {
        return createTime;
    }

    /** @param createTime the creation time */
    public void setCreateTime(LocalDateTime createTime) {
        this.createTime = createTime;
    }

    /** @return the processing status */
    public MessageStatus getStatus() {
        return status;
    }

    /** @param status the processing status */
    public void setStatus(MessageStatus status) {
        this.status = status;
    }
}
/** Processing status of an error message. */
public enum MessageStatus {
FAILED, // failed, waiting to be handled
RETRYING, // retry in progress
RESOLVED // resolved
}监控告警机制
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class KafkaMonitor {
// 1. 消费延迟监控
/**
 * Reads the current per-partition lag values and sends an alert when
 * {@code isLagTooHigh} reports them as too high.
 */
public void checkLag() {
    Map<TopicPartition, Long> partitionLags = getLags();
    boolean lagging = isLagTooHigh(partitionLags);
    if (!lagging) {
        return;
    }
    alertService.send("消费延迟过高");
}
// 2. 错误率监控
/**
 * Reads the current error rate and sends an alert when it is strictly
 * greater than {@code threshold}.
 */
public void checkErrorRate() {
    double currentRate = getErrorRate();
    // Kept as a strict ">" comparison: a rate equal to the threshold (or a
    // NaN rate) does not raise an alert.
    boolean aboveThreshold = currentRate > threshold;
    if (aboveThreshold) {
        alertService.send("错误率过高");
    }
}
}
补偿机制
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
/**
 * Compensation for error messages that are still unresolved, either as a
 * batch pass or for a single message on request.
 */
public class CompensationService {
    /**
     * Batch compensation pass (described in the original notes as run on a
     * timer): loads every unresolved error message and applies the
     * compensation strategy selected by its topic.
     *
     * <p>A failure on one message does not stop the pass. The remaining
     * messages are still attempted, and the first failure is rethrown at the
     * end with any later failures attached as suppressed exceptions.
     *
     * @throws RuntimeException the first failure raised while compensating,
     *                          if any message failed
     */
    public void compensate() {
        List<ErrorMessage> errors =
                errorRepository.findUnresolved();
        RuntimeException firstFailure = null;
        for (ErrorMessage error : errors) {
            try {
                // Pick the compensation strategy from the message's topic.
                CompensationStrategy strategy =
                        getStrategy(error.getTopic());
                strategy.compensate(error);
            } catch (RuntimeException e) {
                // Isolate the failure so one bad record cannot block the
                // records after it on every pass.
                if (firstFailure == null) {
                    firstFailure = e;
                } else if (e != firstFailure) {
                    // addSuppressed rejects self-suppression, hence the check.
                    firstFailure.addSuppressed(e);
                }
            }
        }
        if (firstFailure != null) {
            throw firstFailure;
        }
    }

    /**
     * Manual compensation entry point: looks up one error message by id and
     * hands it to {@code retryMessage}.
     *
     * @param messageId id of the error message to compensate
     */
    public void manualCompensate(String messageId) {
        ErrorMessage error =
                errorRepository.find(messageId);
        retryMessage(error);
    }
}