Skip to main content

10小节:基于模板方法重构消息发送者

作者:程序员马丁

在线博客:https://nageoffer.com

note

热门项目实战社群,收获国内众多知名公司面试青睐,近千名同学面试成功!助力你在校招或社招上拿个offer。

基于模板方法模式重构消息队列发送功能,元数据信息:

©版权所有 - 拿个offer-开源&项目实战星球专属学习项目,依据《中华人民共和国著作权法实施条例》《知识星球产权保护》,严禁未经本项目原作者明确书面授权擅自分享至 GitHub、Gitee 等任何开放平台。违者将面临法律追究。


内容摘要:通过消息队列执行即时分发任务,对于定时分发任务,使用 XXL-Job 定期扫描并将任务发送至消息队列。同时,采用模板方法设计模式抽象消息队列的发送逻辑。

课程目录如下所示:

  • 业务背景
  • Git 分支
  • 优惠券分发调用消息队列
  • 什么是模板方法设计模式?
  • 模板方法设计模式重构消息发送

业务背景

优惠券分发任务有两种执行类型,分别是立即发送和定时发送。如果用户创建的是立即发送类型的分发任务,需要通过消息队列执行分发请求发送逻辑,distribution 分发服务监听到这个消息后开始正式执行用户优惠券分发流程。我们本章节先开发立即发送类型的消息队列发送,以及使用模板方法模式重构消息队列发送流程。

Git 分支

20240824_dev_coupon-task-execute_template-method_ding.ma

优惠券分发调用消息队列

1. 使用 RocketMQ 发送普通消息

之前我们发送过消息队列的延时消息,这次发送普通消息即可。代码如下所示:

@Transactional(rollbackFor = Exception.class)
@Override
public void createCouponTask(CouponTaskCreateReqDTO requestParam) {
// ......
// 如果是立即发送任务,直接调用消息队列进行发送流程
if (Objects.equals(requestParam.getSendType(), CouponTaskSendTypeEnum.IMMEDIATE.getType())) {
// 使用 RocketMQ5.x 发送任意时间延时消息
// 定义 Topic
String couponTemplateDelayCloseTopic = "one-coupon_distribution-service_coupon-task-execute_topic${unique-name:}";

// 通过 Spring 上下文解析占位符,也就是把咱们 VM 参数里的 unique-name 替换到字符串中
couponTemplateDelayCloseTopic = configurableEnvironment.resolvePlaceholders(couponTemplateDelayCloseTopic);

// 构建消息体
String messageKeys = UUID.randomUUID().toString();
Message<Long> message = MessageBuilder
.withPayload(couponTaskDO.getId())
.setHeader(MessageConst.PROPERTY_KEYS, messageKeys)
.build();

// 执行 RocketMQ5.x 消息队列发送&异常处理逻辑
SendResult sendResult;
try {
sendResult = rocketMQTemplate.syncSend(couponTemplateDelayCloseTopic, message, 2000L);
log.info("[生产者] 执行优惠券分发任务 - 发送结果:{},消息ID:{},消息Keys:{}", sendResult.getSendStatus(), sendResult.getMsgId(), messageKeys);
} catch (Exception ex) {
log.error("[生产者] 执行优惠券分发任务 - 消息发送失败,消息体:{}", couponTaskDO.getId(), ex);
}
}
}

这段代码逻辑如下:

  1. 通过判断入参发送是否为立即发送任务,如果是的话直接调用消息队列进行发送流程;
  2. 基于 configurableEnvironment 解析出目标 Topic 字符串;
  3. 构建消息体,主要设置分发任务 ID 和 Keys,前者用于到分发服务里执行,后者利用问题排查;
  4. 调用同步发送消息,并记录详细的成功和错误日志,方便后续问题排查。

2. 使用立即发送创建分发任务

在前端 API 接口处通过创建优惠券推送任务接口发起请求,sendType 字段使用值 0 立即发送。

{
"taskName": "发送百万优惠券推送任务",
"fileAddress": "/Users/machen/workspace/opensource/onecoupon-rebuild/tmp/oneCoupon任务推送Excel.xlsx",
"notifyType": "0,3",
"couponTemplateId": 1826268813595824129,
"sendType": 0,
"sendTime": "2024-07-12 12:00:00"
}

消息队列发送日志如下所示:

2024-08-24T20:04:48.025+08:00  INFO 18768 --- [io-10010-exec-3] c.n.o.m.a.s.impl.CouponTaskServiceImpl   : [生产者] 执行优惠券分发任务 - 发送结果:SEND_OK,消息ID2408820760D4CCC0F015B3A56F47B9554950251A69D77AC14AFA0001,消息Keys:d14090a8-ee9b-4ae6-b4db-43c9d9f72968

3. RocketMQ 控制台查看发送结果

因为我们目前还没有开发分发服务,所以没办法查看消费日志,只能通过控制台查看。通过下图得知消息是正常发送成功了。

什么是模板方法设计模式?

1. 需求背景

大家可能会注意到,每次发送消息时,总是充斥着大量相同的冗余代码,这些逻辑散落在业务代码中,不利于对核心业务的理解和维护。那我们有没有方法进行抽象出来?

在开始写代码前,我们一般会做方案设计,其中主要的就是我们的预期是什么?

  • 将业务代码和消息发送的代码解耦,最好情况下业务代码里只有一行消息发送逻辑。
  • 抽象消息发送逻辑,比如调用发送接口和发送日志打印,不同的 Topic 配置等由业务方自定义。

解锁付费内容,👉 戳