06小节:自定义动态线程池基础类
作者:程序员马丁
热门项目实战社群,收获国内众多知名公司面试青睐,近千名同学面试成功!助力你在校 招或社招上拿个offer。
自定义动态线程池基础类,元数据信息:
- 什么是线程池oneThread:https://t.zsxq.com/5GfrN
- 代码仓库:https://gitcode.net/nageoffer/onethread —— 申请项目权限参考上述线程池项目链接
- 章节难度:★★☆☆☆ - 中等
- 视频地址:本章节内容简单,无
©版权所有 - 拿个offer-开源&项目实战星球专属学习项目,依据《中华人民共和国著作权法实施条例》和《知识星球产权保护》,严禁未经本项目原作者明确书面授权擅自分享至 GitHub、Gitee 等任何开放平台。违者将面临法律追究。
内容摘要:核心设计的提前规划与整体构思,是构建高质量动态线程池框架的基石,能够有效避免后期大幅返工,同时为系统的灵活扩展和维护奠定坚实基础。
课程目录如下所示:
- 核心设计概述
- 关键类详解
- Builder 设计模式
- 文末总结
核心设计概览
1. 设计思想
在上一篇文章中,我们已经初步了解了我在设计基础组件库时秉持的核心理念:尽可能减少对三方框架的依赖。
秉持这一设计思路,在最初开发 oneThread
框架时,我将大部分与第三方框架无关的通用逻辑,统一抽象封装在了 onethread-core
模块中。
该模块主要承担以下核心职责:
- 定义动态线程池的基础抽象类;
- 执行线程池运行时的告警扫描逻辑;
- 提供线程池运行状态的指标采集能力;
- 支持监听配置中心变更,并动态调整线程池参数及触发告警通知。
下面是 onethread-core
模块的 pom.xml
依赖,可以看到它并未强制依赖任何 Spring 相关的包,保持了良好的独立性和可移植性:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.nageoffer.onethread</groupId>
<artifactId>onethread-all</artifactId>
<version>0.0.1-SNAPSHOT</version>
</parent>
<artifactId>onethread-core</artifactId>
<dependencies>
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba.fastjson2</groupId>
<artifactId>fastjson2</artifactId>
</dependency>
<dependency>
<groupId>org.yaml</groupId>
<artifactId>snakeyaml</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
</dependency>
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-core</artifactId>
</dependency>
</dependencies>
</project>
有同学可能会疑惑:为什么上述依赖中没有显式指定版本号?
这是因为我们在项目的根目录(通常是 parent
项目的 pom.xml
)中,已经通过 <dependencyManagement>
统一定义了各依赖的版本号。因此,在 onethread-core
模块中只需声明依赖的坐标,无需重复指定版本,Maven 会自动从父模块继承对应的版本信息。这样不仅提高了依赖管理的统一性,也有助于减少版本冲突风险。
<properties>
<java.version>17</java.version>
<spring-boot.version>3.0.7</spring-boot.version>
<spring-cloud.version>2022.0.3</spring-cloud.version>
<spring-cloud-alibaba.version>2022.0.0.0-RC2</spring-cloud-alibaba.version>
<hutool-all.version>5.8.37</hutool-all.version>
<apollo-client-config-data.version>2.4.0</apollo-client-config-data.version>
<fastjson2.version>2.0.57</fastjson2.version>
<spotless-maven-plugin.version>2.22.1</spotless-maven-plugin.version>
<maven-compiler-plugin.version>3.6.1</maven-compiler-plugin.version>
</properties>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-dependencies</artifactId>
<version>${spring-boot.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>${spring-cloud.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-alibaba-dependencies</artifactId>
<version>${spring-cloud-alibaba.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>${hutool-all.version}</version>
</dependency>
<dependency>
<groupId>com.ctrip.framework.apollo</groupId>
<artifactId>apollo-client-config-data</artifactId>
<version>${apollo-client-config-data.version}</version>
</dependency>
<dependency>
<groupId>com.alibaba.fastjson2</groupId>
<artifactId>fastjson2</artifactId>
<version>${fastjson2.version}</version>
</dependency>
</dependencies>
</dependencyManagement>
2. package 说明
在前面的文章中,我们已经介绍过 oneThread
框架中各个模块(Module)所承担的职责。但当时并未展开到更细粒度的 package 层级结构。
本文将进一步说明 onethread-core
模块中各个包的具体作用,帮助 大家更清晰地理解其内部结构与设计意图。
具体包划分与功能如下所示:
.
├── src
│ ├── main
│ │ └── java
│ │ └── com
│ │ └── nageoffer
│ │ └── onethread
│ │ └── core
│ │ ├── alarm # 告警扫描
│ │ ├── config # 组件库基础配置
│ │ ├── constant # 常量类
│ │ ├── executor # 最最核心的线程池基础包
│ │ │ └── support # 核心基础包至上增强的功能,如果不拆分这个也行,但是类会比较多
│ │ ├── monitor # 运行时指标监控
│ │ ├── notification # 线程池配置变更和告警通知
│ │ │ ├── dto
│ │ │ └── service
│ │ ├── parser # 解析配置内容字符串为键值对 Map
│ │ └── toolkit # 工具包,比如:线程池和线程工厂构建者
大家最需要关注的是 alarm、executor、monitor、notification 四个包。
3. 类图设计
在 core
包中,有四个最核心的类,它们构成了整个动态线程池功能的基础。其关系如下所示:
关键类详解
1. OneThreadExecutor
有同学可能会问:为什么要单独定义一个线程池基类,而不是直接使用原生的 ThreadPoolExecutor
呢?
原因有很多,但为了循序渐进地展开说明,这里先讲其中一个最关键的点:
原生线程池并没有线程池 ID 或名称的概念。
在实际业务中,我们往往需要对线程池进行运行时变更、指标监控、告警通知等操作。而缺乏唯一标识,会让我们在面对多个线程池时难以准确定位、识别和管理。
因此,为了支持更强的可观测性与可运维性,我们抽象出了具备线程池标识能力的基类,作为后续扩展的统一入口。
值得一提的是,ThreadPoolExecutor
本身并不排斥继承,反而在设计上为继承扩展预留了几个关键的扩展点方法。通过继承并重写这些方法,我们可以轻松注入自定义行为,构建更智能的线程池体系:
/**
* 任务执行前钩子(可用于记录开始时间、设置线程上下文等)
*/
@Override
protected void beforeExecute(Thread t, Runnable r) {
super.beforeExecute(t, r);
// 自定义逻辑
}
/**
* 任务执行后钩子(可用于统计执行耗时、清理资源、日志记录等)
*/
@Override
protected void afterExecute(Runnable r, Throwable t) {
super.afterExecute(r, t);
// 自定义逻辑
}
/**
* 线程池终止钩子(可用于清理资源、发送通知等)
*/
@Override
protected void terminated() {
super.terminated();
// 自定义逻辑
}
下面是 oneThread
的线程池基类,目前我们只在其中集成了线程池唯一标识这一核心能力。
在阅读源码时,建议大家暂时聚焦这一功能点。至于类中其他相对复杂的逻辑,如果一时看不太懂也没关系,与其陷入细节,不如先跟着马哥一起按“主线思路”一步步深入理解。
/**
* 增强的动态、报警和受监控的线程池 oneThread
* <p>
* 作者:马丁
* 加项目群:早加入就是优势!500人内部项目群,分享的知识总有你需要的 <a href="https://t.zsxq.com/cw7b9" />
* 开发时间:2025-04-20
*/
@Slf4j
public class OneThreadExecutor extends ThreadPoolExecutor {
/**
* 线程池唯一标识,用来动态变更参数等
*/
@Getter
private final String threadPoolId;
// ......
/**
* Creates a new {@code ExtensibleThreadPoolExecutor} with the given initial parameters.
*
* @param threadPoolId thread-pool id
* @param corePoolSize the number of threads to keep in the pool, even
* if they are idle, unless {@code allowCoreThreadTimeOut} is set
* @param maximumPoolSize the maximum number of threads to allow in the
* pool
* @param keepAliveTime when the number of threads is greater than
* the core, this is the maximum time that excess idle threads
* will wait for new tasks before terminating.
* @param unit the time unit for the {@code keepAliveTime} argument
* @param workQueue the queue to use for holding tasks before they are
* executed. This queue will hold only the {@code Runnable}
* tasks submitted by the {@code execute} method.
* @param threadFactory the factory to use when the executor
* creates a new thread
* @param handler the handler to use when execution is blocked
* because the thread bounds and queue capacities are reached
* @param awaitTerminationMillis the maximum time to wait
* @throws IllegalArgumentException if one of the following holds:<br>
* {@code corePoolSize < 0}<br>
* {@code keepAliveTime < 0}<br>
* {@code maximumPoolSize <= 0}<br>
* {@code maximumPoolSize < corePoolSize}
* @throws NullPointerException if {@code workQueue} or {@code unit}
* or {@code threadFactory} or {@code handler} is null
*/
public OneThreadExecutor(
@NonNull String threadPoolId,
int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
@NonNull TimeUnit unit,
@NonNull BlockingQueue<Runnable> workQueue,
@NonNull ThreadFactory threadFactory,
@NonNull RejectedExecutionHandler handler,
long awaitTerminationMillis) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
// ......
// 设置动态线程池扩展属性:线程池 ID 标识
this.threadPoolId = threadPoolId;
// ......
}
// ......
}
2. ThreadPoolExecutorProperties
有同学可能会问:线程池的参数明明已经在 OneThreadExecutor
中定义了,为什么还要单独拆分出一个属性实体类?
这是因为线程池中的参数通常分散在多个字段中,并且像告警阈值、通知规则等配置在原生线程池中是根本不存在的。为了更好地支持动态配置和统一管理,我们通过 threadPoolId
将这些“外围参数”与线程池进行关联,形成一套完整的配置体系,便于在运行时查看、存储、变更和追踪。
另外,为了马哥写代码方便,告警和通知相关的参数类目前是以静态内部类的形式存在。大家在实际开发中,也可以根据需要将其拆分为独立的外部类,以提升可复用性。
/**
* 线程池属性参数
* <p>
* 作者:马丁
* 加项目群:早加入就是优势!500人内部项目群,分享的知识总有你需要的 <a href="https://t.zsxq.com/cw7b9" />
* 开发时间:2025-04-20
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
@Accessors(chain = true)
public class ThreadPoolExecutorProperties {
/**
* 线程池唯一标识
*/
private String threadPoolId;
/**
* 核心线程数
*/
private Integer corePoolSize;
/**
* 最大线程数
*/
private Integer maximumPoolSize;
/**
* 队列容量
*/
private Integer queueCapacity;
/**
* 阻塞队列类型
*/
private String workQueue;
/**
* 拒绝策略类型
*/
private String rejectedHandler;
/**
* 线程空闲存活时间(单位:秒)
*/
private Long keepAliveTime;
/**
* 是否允许核心线程超时
*/
private Boolean allowCoreThreadTimeOut;
/**
* 通知配置
*/
private NotifyConfig notify;
/**
* 报警配置,默认设置
*/
private AlarmConfig alarm = new AlarmConfig();
@Data
@NoArgsConstructor
@AllArgsConstructor
public static class NotifyConfig {
/**
* 接收人集合
*/
private String receives;
/**
* 告警间隔,单位分钟
*/
private Integer interval = 5;
}
@Data
@NoArgsConstructor
@AllArgsConstructor
public static class AlarmConfig {
/**
* 默认开启报警配配置
*/
private Boolean enable = Boolean.TRUE;
/**
* 队列阈值
*/
private Integer queueThreshold = 80;
/**
* 活跃线程阈值
*/
private Integer activeThreshold = 80;
}
}
3. ThreadPoolExecutorHolder
线程池实例对象和线程池参数配置对象本质上是两个独立的实体。但在实际使用中,我们往往需要同时操作这两部分信息,例如在执行动态变更、状态监控或告警处理时。
为此,我们引入了 Holder
的设计思路,将线程池实例与其对应的属性配置,通过 threadPoolId
进行绑定和聚合,封装成一个统一的结构,便于后续统一管理与访问。