Skip to main content

11小节:推送用户已预约优惠券提醒

作者:程序员马丁

在线博客:https://nageoffer.com

note

热门项目实战社群,收获国内众多知名公司面试青睐,近千名同学面试成功!助力你在校招或社招上拿个offer。

开发优惠券预约通知功能(二),元数据信息:

©版权所有 - 拿个offer-开源&项目实战星球专属学习项目,依据《中华人民共和国著作权法实施条例》《知识星球产权保护》,严禁未经本项目原作者明确书面授权擅自分享至 GitHub、Gitee 等任何开放平台。违者将面临法律追究。


内容摘要:开发了优惠券预约通知的消息队列消费者,采用线程池并行发送用户预约提醒,确保用户能够及时收到通知。同时,为了保障线程池中的任务不丢失,我们通过 Redis Stream 延迟队列的方式来实现任务的可靠处理。这种方式既保证了提醒的实时性,也确保了高并发场景下任务的稳定性和可靠性。

课程目录如下所示:

  • 业务背景
  • Git 分支
  • 推送用户预约提醒
  • 常见问题答疑
  • 文末总结

业务背景

在上一个章节中,我们讨论了优惠券预约的设计,重点介绍了如何将预约提醒发送到 RocketMQ 延时队列中。在本章节,我们将详细说明如何将消息推送给用户,确保他们能够及时接收到预约提醒。

在 v1 分支的基础上,对优惠券模板提醒相关的类进行了名称重构,大家查看当前分支提交记录即可。

Git 分支

20240919_dev_coupon-remind-v2_rocketmq-bitmap_youya

本章节预约提醒优惠券核心代码由优雅同学贡献,感谢优雅提供的优秀代码设计。

推送用户预约提醒

1. 预约消息消费者

开发 RocketMQ 消息队列消费者,相关的用户提醒代码进行了抽象,放在了一个执行器中。

代码如下所示:

/**
* 提醒抢券消费者
* <p>
* 作者:优雅
* 加项目群:早加入就是优势!500人内部项目群,分享的知识总有你需要的 <a href="https://t.zsxq.com/cw7b9" />
* 开发时间:2024-07-21
*/
@Component
@RequiredArgsConstructor
@RocketMQMessageListener(
topic = "one-coupon_engine-service_coupon-remind_topic${unique-name:}",
consumerGroup = "one-coupon_engine-service_coupon-remind_cg${unique-name:}"
)
@Slf4j(topic = "CouponTemplateRemindDelayConsumer")
public class CouponTemplateRemindDelayConsumer implements RocketMQListener<MessageWrapper<CouponTemplateRemindDelayEvent>> {

private final CouponTemplateRemindExecutor couponTemplateRemindExecutor;

@Override
public void onMessage(MessageWrapper<CouponTemplateRemindDelayEvent> messageWrapper) {
// 开头打印日志,平常可 Debug 看任务参数,线上可报平安(比如消息是否消费,重新投递时获取参数等)
log.info("[消费者] 提醒用户抢券 - 执行消费逻辑,消息体:{}", JSON.toJSONString(messageWrapper));

CouponTemplateRemindDelayEvent event = messageWrapper.getMessage();
CouponTemplateRemindDTO couponTemplateRemindDTO = BeanUtil.toBean(event, CouponTemplateRemindDTO.class);

// 根据不同策略向用户发送消息提醒
couponTemplateRemindExecutor.executeRemindCouponTemplate(couponTemplateRemindDTO);
}
}

向用户发送消息提醒的逻辑在 CouponTemplateRemindExecutor 执行器,分为几个版本演进。

代码如下所示:

/**
* 执行提醒
*
* @param couponTemplateRemindDTO 用户预约提醒请求信息
*/
public void executeRemindCouponTemplate(CouponTemplateRemindDTO couponTemplateRemindDTO) {
// 向用户发起消息提醒
switch (Objects.requireNonNull(CouponRemindTypeEnum.getByType(couponTemplateRemindDTO.getType()))) {
case APP -> sendAppMessageRemindCouponTemplate.remind(couponTemplateRemindDTO);
case EMAIL -> sendEmailRemindCouponTemplate.remind(couponTemplateRemindDTO);
default -> {
}
}
}

2. 线程池并行发送提醒

通过第一版代码,我们可以明显发现一个问题:消息队列的消费速度较慢,因为是逐条处理每条消息后才消费下一条。而我们希望尽快将这些预约信息发送给用户。基于这一前提,我们可以采用线程池进行并行发送,以提高消息处理和发送的效率。

代码如下所示:

// 提醒用户属于 IO 密集型任务
private final ExecutorService executorService = new ThreadPoolExecutor(
Runtime.getRuntime().availableProcessors() << 1,
Runtime.getRuntime().availableProcessors() << 2,
60,
TimeUnit.SECONDS,
new SynchronousQueue<>(),
new ThreadPoolExecutor.CallerRunsPolicy()
);

/**
* 执行提醒
*
* @param couponTemplateRemindDTO 用户预约提醒请求信息
*/
public void executeRemindCouponTemplate(CouponTemplateRemindDTO couponTemplateRemindDTO) {
executorService.execute(() -> {
// 向用户发起消息提醒
switch (Objects.requireNonNull(CouponRemindTypeEnum.getByType(couponTemplateRemindDTO.getType()))) {
case APP -> sendAppMessageRemindCouponTemplate.remind(couponTemplateRemindDTO);
case EMAIL -> sendEmailRemindCouponTemplate.remind(couponTemplateRemindDTO);
default -> {
}
}
});
}

线程池参数解析如下:

  • 核心线程数:Runtime.getRuntime().availableProcessors() << 1 CPU 核数 * 2,因为是 IO 密集型线程数可以多些。
  • 最大线程数:Runtime.getRuntime().availableProcessors() << 2 CPU 核数 * 4,因为是 IO 密集型线程数可以多些。
  • 阻塞队列:SynchronousQueue,不缓冲任务。
  • 拒绝策略:CallerRunsPolicy,通过提交任务线程运行被拒绝的任务。

我们通过线程池来加快预约任务的通知提醒处理。当线程池达到其处理能力的瓶颈时,采用任务拒绝策略,将被拒绝的任务交由提交任务的线程自行执行,以确保通知任务不会被丢弃并尽可能提高系统的处理效率。

解锁付费内容,👉 戳