Skip to main content

07小节:基于注解实现去重表消息防止重复消费

作者:程序员马丁

在线博客:https://nageoffer.com

note

热门项目实战社群,收获国内众多知名公司面试青睐,近千名同学面试成功!助力你在校招或社招上拿个offer。

基于注解实现去重表消息防止重复消费,元数据信息:

©版权所有 - 拿个offer-开源&项目实战星球专属学习项目,依据《中华人民共和国著作权法实施条例》《知识星球产权保护》,严禁未经本项目原作者明确书面授权擅自分享至 GitHub、Gitee 等任何开放平台。违者将面临法律追究。


内容摘要:在本章节中,我们通过结合消息去重表和 Redis,设计并实现了高性能的幂等方案。这种方案确保在处理消息时,即使是重复消费的场景,也能保持系统的状态一致性,避免重复处理带来的问题。特别是在使用像 RocketMQ、Kafka 等消息队列产品时,幂等性处理是必须要关注的核心问题。

课程目录如下所示:

  • 业务背景
  • 消息幂等性
  • Git 分支
  • 幂等设计
  • 抽象通用幂等组件
  • 常见问题答疑
  • 更复杂的幂等场景

业务背景

当使用消息队列时,客户端重复消费可能会成为一个严重的问题。

这是因为消息队列具有持久性和可靠性的特性,确保消息能够被成功传递给消费者。然而,这也会导致客户端在某些情况下重复消费消息,例如网络故障(或延迟)、客户端崩溃、消息处理失败等情况。

为了避免这种情况发生,需要在客户端实现一些机制来确保消息不会被重复消费,例如记录消费者已经处理的消息 ID、使用分布式锁来控制消费进程的唯一性等。这些机制能够保证消息被成功处理,同时也能够提高系统的可靠性和稳定性。本小节将探讨如何确保消息队列中的消息不会被重复消费,下文将以 RocketMQ 为例说明。

消息幂等性

在使用消息队列 RocketMQ 实现异步化、解耦、削峰等功能的情况下,我们认为消息中间件是一个可靠的组件,这里的可靠性指的是,只要消息被成功投递到了消息中间件,它就不会丢失,至少能够被消费者成功消费一次。这是消息中间件最基本的特性之一,也就是我们通常所说的 “AT LEAST ONCE”,即消息至少会被成功消费一遍。

举个例子,假设一个消息 M 被发送到消息中间件并被消费程序 A 接收到,A 开始消费这个消息,但是在消费过程中程序重启了。由于这个消息没有被标记为已经被消费成功,消息中间件会持续地将这个消息投递给消费者,直到消息被成功消费为止。

然而,这种可靠性特性也会导致消息被多次投递的情况。举个例子,仍然以之前的例子为例,如果消费程序 A 接收并完成消息 M 的消费逻辑后,正准备通知消息中间件“我已经消费成功了”,但在此之前程序A又重启了,那么对于消息中间件来说,这个消息 M 并没有被成功消费过,因此消息中间件会继续投递这个消息。而对于消费程序A来说,尽管它已经成功消费了这个消息,但由于程序重启导致消息中间件继续投递,看起来就好像这个消息还没有被成功消费过一样。

在 RockectMQ 的场景中,这意味着同一个 messageId 的消息会被重复投递。由于消息的可靠投递是更重要的,所以避免消息重复投递的任务转移给了应用程序自身来实现。这也是 RocketMQ 文档强调消费逻辑需要自行实现幂等性的原因。实际上,这背后的逻辑是:在分布式场景下,保证消息不丢和避免消息重复投递是矛盾的,但是消息重复投递是可以解决的,而消息丢失则非常麻烦。

Git 分支

20240905_dev_coupon-distribute-v4_idempotent_ding.ma

幂等设计

下述方案的优点在于,使用 Redis 消息去重表,不依赖事务,针对消息表本身做了状态的区分:消费中、消费完成。

如果消息已经在消费中,抛出异常,消息会触发延迟消费,在 RocketMQ 的场景下如果消息消费失败,会间隔时间后再次发起消费流程。

通过该方案可以解决什么问题?

  1. 消息已经消费成功了,第二条消息将被直接幂等处理掉(消费成功)。
  2. 并发场景下的消息,依旧能满足不会出现消息重复,即穿透幂等挡板的问题。
  3. 支持上游业务生产者重发的业务重复的消息幂等问题。

为什么要给初始化的幂等标识新增 10 分钟过期时间?

在并发场景下,我们使用消息状态来实现并发控制,以使第二条消息被不断延迟消费(即重试)。但如果在此期间第一条消息也因某些异常原因(例如机器重启或外部异常)未成功消费,该怎么办呢?因为每次查询时都会显示消费中的状态,所以延迟消费会一直进行下去,直到最终被视为消费失败并被投递到死信 Topic 中(RocketMQ 默认最多可以重复消费 16 次)。

针对这个问题,我们采取了一种解决方案:在插入消息表时,必须为每条消息设置一个最长消费过期时间,例如 10 分钟。这意味着,如果某个消息在消费过程中超过了 10 分钟,就会被视为消费失败并从消息表中删除。

抽象通用幂等组件

消息防重复消费幂等组件是通用的,接下来的代码开发还是会放到 framework 基础架构组件里。

1. 自定义幂等注解

我们提供了一种通用的幂等注解,并通过 SpEL 的形式生成去重表全局唯一 Key。如果对 SpEL 不熟悉的同学,移步查看 历史章节 进行学习。

package com.nageoffer.onecoupon.framework.idempotent;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/**
* 幂等注解,防止消息队列消费者重复消费消息
* <p>
* 作者:马丁
* 加项目群:早加入就是优势!500人内部沟通群,分享的知识总有你需要的 <a href="https://t.zsxq.com/cw7b9" />
* 开发时间:2024-07-26
*/
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface NoMQDuplicateConsume {

/**
* 设置防重令牌 Key 前缀
*/
String keyPrefix() default "";

/**
* 通过 SpEL 表达式生成的唯一 Key
*/
String key();

/**
* 设置防重令牌 Key 过期时间,单位秒,默认 1 小时
*/
long keyTimeout() default 3600L;
}

2. 定义 AOP 逻辑增强

上面我们有说到,幂等需要设置两个状态,消费中和已消费,创建对应的枚举:

package com.nageoffer.onecoupon.framework.idempotent;

import lombok.Getter;
import lombok.RequiredArgsConstructor;

import java.util.Objects;

/**
* 幂等 MQ 消费状态枚举
* <p>
* 作者:马丁
* 加项目群:早加入就是优势!500人内部项目群,分享的知识总有你需要的 <a href="https://t.zsxq.com/cw7b9" />
* 开发时间:2024-07-10
*/
@RequiredArgsConstructor
public enum IdempotentMQConsumeStatusEnum {

/**
* 消费中
*/
CONSUMING("0"),

/**
* 已消费
*/
CONSUMED("1");

@Getter
private final String code;

/**
* 如果消费状态等于消费中,返回失败
*
* @param consumeStatus 消费状态
* @return 是否消费失败
*/
public static boolean isError(String consumeStatus) {
return Objects.equals(CONSUMING.code, consumeStatus);
}
}

解锁付费内容,👉 戳