Skip to main content

09小节:通过线程池和延时队列优化接口响应时间

作者:程序员马丁

在线博客:https://nageoffer.com

note

热门项目实战社群,收获国内众多知名公司面试青睐,近千名同学面试成功!助力你在校招或社招上拿个offer。

通过线程池和延时队列优化接口响应时间,元数据信息:

©版权所有 - 拿个offer-开源&项目实战星球专属学习项目,依据《中华人民共和国著作权法实施条例》《知识星球产权保护》,严禁未经本项目原作者明确书面授权擅自分享至 GitHub、Gitee 等任何开放平台。违者将面临法律追究。


内容摘要:这节课我们通过线程池和 Redis 延迟队列的形式优化 EasyExcel 解析百万数据接口耗时问题,接口响应从从 5 秒提升到毫秒级。

课程目录如下所示:

  • 业务背景
  • Git 分支
  • 线程池异步解析 Excel 行数
  • Redis 延时队列兜底任务
  • 添加 Spring 事务
  • 文末总结

业务背景

在上一节中,我们通过 EasyExcel 解析百万数据量的 Excel 行数,避免了 JVM 内存占用过多问题。但是末了还有一个小问题没有说,那就是接口响应太慢,百万数据量需要解析 5 秒,这种在后管系统里不是不能接受,但是能优化还是要优化。在这节课我们通过线程池和 Redis 延迟队列的形式优化接口响应时间。

Git 分支

20240823_optimize_create-coupon-task_threadpool-delayqueue_ding.ma

线程池异步解析 Excel 行数

1. 创建线程池

创建一个公共线程池,因为咱们这个逻辑比较简单,所以直接定义即可。

@Service
@RequiredArgsConstructor
public class CouponTaskServiceImpl extends ServiceImpl<CouponTaskMapper, CouponTaskDO> implements CouponTaskService {

private final ExecutorService executorService = new ThreadPoolExecutor(
Runtime.getRuntime().availableProcessors(),
Runtime.getRuntime().availableProcessors() << 1,
60,
TimeUnit.SECONDS,
new SynchronousQueue<>(),
new ThreadPoolExecutor.DiscardPolicy()
);

// ......
}

有些同学可能习惯使用 Executors 工具类直接创建线程池,这种是不推荐的。虽然 Executors 提供了创建线程池的便捷方法,然而,Executors 基于默认配置创建的线程池可能并不适合所有场景,这里我们说下每个方法创建的线程池都有哪些弊端:

  • newFixedThreadPoolnewSingleThreadExecutor:这两种固定大小的线程池使用无界的 LinkedBlockingQueue 作为工作队列。当任务提交速度超过处理速度时,工作队列会不断增长,可能导致内存溢出。
  • newScheduledThreadPool:虽然最大线程数是 Integer 最大值,但是因为阻塞队列是无界的,所以核心问题同上。
  • newCachedThreadPool:核心线程数为 0,使用同步的 SynchronousQueue,并且允许创建无限数量的线程。在高并发情况下,可能会创建大量线程,导致系统资源耗尽,甚至使系统崩溃。

扩展知识,线程池处理逻辑如下:

2. 线程池参数解析

解析下我们线程池中的参数为什么这么设置:

  • corePoolSize:因为属于后管任务,大概率不会很频繁,所以直接取服务器 CPU 核数。
  • maximumPoolSize:运行任务属于 IO 密集型,最大线程数直接服务器 CPU 核数 2 倍。
  • workQueue:理论上说我们不会有阻塞的情况,因为设置的线程数不少,所以如果使用不存储任务的同步队列。
  • handler:如果线程数都在运行,直接将任务丢弃即可,因为我们还有延时队列兜底。

3. 使用线程池异步处理

因为线程池和延时队列都可能会用到 Excel 解析的代码,所以我们把这一块逻辑抽象出来一个方法。因为用到了两个参数,为了避免复杂,直接使用 JSONObject 即可。

private void refreshCouponTaskSendNum(JSONObject delayJsonObject) {
// 通过 EasyExcel 监听器获取 Excel 中所有行数
RowCountListener listener = new RowCountListener();
EasyExcel.read(delayJsonObject.getString("fileAddress"), listener).sheet().doRead();
int totalRows = listener.getRowCount();

// 刷新优惠券推送记录中发送行数
CouponTaskDO updateCouponTaskDO = CouponTaskDO.builder()
.id(delayJsonObject.getLong("couponTaskId"))
.sendNum(totalRows)
.build();
couponTaskMapper.updateById(updateCouponTaskDO);
}

使用线程池异步解析用户上传的 Excel 文件,代码如下:

@Service
@RequiredArgsConstructor
public class CouponTaskServiceImpl extends ServiceImpl<CouponTaskMapper, CouponTaskDO> implements CouponTaskService {

private final ExecutorService executorService = new ThreadPoolExecutor(
Runtime.getRuntime().availableProcessors(),
Runtime.getRuntime().availableProcessors() << 1,
60,
TimeUnit.SECONDS,
new SynchronousQueue<>(),
new ThreadPoolExecutor.DiscardPolicy()
);

@Transactional(rollbackFor = Exception.class)
@Override
public void createCouponTask(CouponTaskCreateReqDTO requestParam) {
// 验证非空参数
// 验证参数是否正确,比如文件地址是否为我们期望的格式等
// 验证参数依赖关系,比如选择定时发送,发送时间是否不为空等
CouponTemplateQueryRespDTO couponTemplate = couponTemplateService.findCouponTemplateById(requestParam.getCouponTemplateId());
if (couponTemplate == null) {
throw new ClientException("优惠券模板不存在,请检查提交信息是否正确");
}
// ......

// 构建优惠券推送任务数据库持久层实体
CouponTaskDO couponTaskDO = BeanUtil.copyProperties(requestParam, CouponTaskDO.class);
couponTaskDO.setBatchId(IdUtil.getSnowflakeNextId());
couponTaskDO.setOperatorId(Long.parseLong(UserContext.getUserId()));
couponTaskDO.setShopNumber(UserContext.getShopNumber());
couponTaskDO.setStatus(
Objects.equals(requestParam.getSendType(), CouponTaskSendTypeEnum.IMMEDIATE.getType())
? CouponTaskStatusEnum.IN_PROGRESS.getStatus()
: CouponTaskStatusEnum.PENDING.getStatus()
);

// 保存优惠券推送任务记录到数据库
couponTaskMapper.insert(couponTaskDO);

// 为什么需要统计行数?因为发送后需要比对所有优惠券是否都已发放到用户账号
// 100 万数据大概需要 4 秒才能返回前端,如果加上验证将会时间更长,所以这里将最耗时的统计操作异步化
JSONObject delayJsonObject = JSONObject
.of("fileAddress", requestParam.getFileAddress(), "couponTaskId", couponTaskDO.getId());
executorService.execute(() -> refreshCouponTaskSendNum(delayJsonObject));
}
}

不关使用线程池执行什么类型的任务,都会有一个通用的致命问题,那就是刚投递到线程池,还没有运行完,应用宕机了怎么整?

所以就需要我们接下来讲到的延时队列兜底,避免这种宕机行为。

解锁付费内容,👉 戳