Skip to main content

08小节:自定义oneThread-SpringBoot-Starter基础组件

作者:程序员马丁

在线博客:https://nageoffer.com

note

热门项目实战社群,收获国内众多知名公司面试青睐,近千名同学面试成功!助力你在校招或社招上拿个offer。

自定义oneThread-SpringBoot-Starter基础组件,元数据信息:

©版权所有 - 拿个offer-开源&项目实战星球专属学习项目,依据《中华人民共和国著作权法实施条例》《知识星球产权保护》,严禁未经本项目原作者明确书面授权擅自分享至 GitHub、Gitee 等任何开放平台。违者将面临法律追究。


内容摘要:本章节深入探讨了如何通过 Spring Boot Starter 统一管理动态线程池,包括动态线程池标记远程配置覆盖Starter 开发以及可插拔设计等功能。

课程目录如下所示:

  • 如何发现动态线程池?
  • Spring 后置处理器
  • 开发 SpringBoot Starter
  • 关于启用动态线程池标识
  • 文末总结

本章节将涉及到 corespring-basestarter/common-spring-boot-starternacos-cloud-example 四个模块。

这篇文章涉及多个模块之间的协作,不像单点功能那样聚焦明确,对于没接触过复杂多模块系统的同学,可能阅读起来会有一点“晕”。

不过没关系,文中我已经尽可能梳理了关键流程和细节,帮助大家理解模块之间的关系。如果在阅读或实践过程中还有不清楚的地方,也欢迎随时留言提问,我们一起讨论~

如何发现动态线程池?

上一章节我们已经了解了 SpringBoot Starter 的基本概念,本章节将具体介绍如何借助 SpringBoot Starter 将线程池注册到统一的线程池容器 OneThreadRegistry 中。

在开始之前,先提出一个问题:如何统一管理动态线程池?我想到的一个简单易行的方法是,将每个线程池定义为一个 Spring 的 Bean,并通过自定义的注解标记为动态线程池,如下所示:

/**
* 动态线程池注解
* <p>
* 作者:马丁
* 加项目群:早加入就是优势!500人内部项目群,分享的知识总有你需要的 <a href="https://t.zsxq.com/cw7b9" />
* 开发时间:2025-04-23
*/
@Target({ElementType.METHOD, ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface DynamicThreadPool {
}

参考动态线程池创建的示例代码:

@Bean
@DynamicThreadPool
public ThreadPoolExecutor onethreadProducer() {
return ThreadPoolExecutorBuilder.builder()
.threadPoolId("onethread-producer")
.corePoolSize(2)
.maximumPoolSize(4)
.keepAliveTime(9999L)
.awaitTerminationMillis(5000L)
.workQueueType(BlockingQueueTypeEnum.SYNCHRONOUS_QUEUE)
.threadFactory("onethread-producer_")
.rejectedHandler(new ThreadPoolExecutor.CallerRunsPolicy())
.dynamicPool()
.build();
}

到这里,可能有同学会疑惑,仅仅标记 @Bean@DynamicThreadPool 就可以把动态线程池注册到统一的容器里吗?答案显然是否定的。

上述代码只是对动态线程池的标记,要想真正将它们加入统一管理的容器,还需要借助 Spring 提供的后置处理器 BeanPostProcessor

Spring 后置处理器

1. 逻辑概述

后置处理器除了将动态线程池注册到统一容器 OneThreadRegistry 外,还承担另一个重要功能:从配置中心读取远程线程池配置并覆盖本地配置。

通俗地讲,就是尽管你本地定义了线程池的配置参数,但这些参数可能并不会被使用,而是在项目启动时,自动从远程配置中心(如 Nacos)拉取最新的线程池参数并生效。

配置示例如下:

onethread:
nacos:
data-id: onethread-nacos-cloud-example-ding-ma.yaml
group: DEFAULT_GROUP
config-file-type: YAML
web:
core-pool-size: 17
maximum-pool-size: 26
keep-alive-time: 60
notify:
receives: xxx
notify-platforms:
platform: DING
url: https://oapi.dingtalk.com/robot/send?access_token=xxx
executors:
- thread-pool-id: onethread-producer
core-pool-size: 14
maximum-pool-size: 22
queue-capacity: 1999
work-queue: ResizableCapacityLinkedBlockingQueue
rejected-handler: DiscardOldestPolicy
keep-alive-time: 160
allow-core-thread-time-out: false
notify:
receives: xxx
interval: 10
alarm:
enable: false
queue-threshold: 90
active-threshold: 90
- thread-pool-id: onethread-consumer
core-pool-size: 10
maximum-pool-size: 20
queue-capacity: 1024
work-queue: LinkedBlockingQueue
rejected-handler: AbortPolicy
keep-alive-time: 9999
allow-core-thread-time-out: true
notify:
receives: xxx
interval: 5
alarm:
enable: false
queue-threshold: 80
active-threshold: 80

远端配置只覆盖配置中心中定义的参数,其他如线程工厂的定义则不会被覆盖。

2. 远端配置读取

远程配置读取逻辑如下,以 Nacos 示例程序为例:

server:
port: 18080 # 应用服务端口,启动后访问地址为 http://localhost:18080

spring:
application:
name: nacos-cloud-example${unique-name:} # Spring 应用名称,支持通过 unique-name 参数自定义服务名,方便多实例区分
config:
import: nacos:onethread-nacos-cloud-example${unique-name:}.yaml # 从 Nacos 导入指定配置文件
profiles:
active: dev # 激活的配置环境(开发环境)

cloud:
nacos:
config:
username: nacos # 连接 Nacos 的用户名
password: nacos # 连接 Nacos 的密码
file-extension: yaml # 指定配置文件的后缀类型为 YAML
extension-configs:
- data-id: onethread-nacos-cloud-example${unique-name:}.yaml # 指定扩展配置文件的 dataId
group: DEFAULT_GROUP # 配置文件所在的 Nacos 分组
refresh: true # 是否开启自动刷新,当 Nacos 配置变更时自动更新本地配置
server-addr: 127.0.0.1:8848 # Nacos 服务器地址,默认端口为 8848

在这段配置中,有两个关键点需要特别关注:

  • - data-id:这是我们在 Nacos 中创建的配置文件的唯一标识,应用会根据它来读取对应的远程配置内容。一个项目可以配置多个 data-id,实现灵活的模块化配置。
  • spring.config.import:这里尤其值得注意。在 SpringBoot2 中,远程配置默认会自动合并到本地配置中;但从 SpringBoot 3 开始,如果不通过 spring.config.import 明确指定远程配置文件的来源,Spring Boot 将不会加载这些配置。因此,如果省略了该项,下面提到的“远程配置覆盖本地配置”的功能将无法生效。

此外,配置中心中存储的参数本质上是字符串形式的键值对,直接使用时不够直观也不便于管理。在 Java 应用中,我们通常会将其绑定为配置类的属性对象,这样更便于类型转换、代码提示和后续维护。

/**
* oneThread 配置中心参数
* <p>
* 作者:马丁
* 加项目群:早加入就是优势!500人内部项目群,分享的知识总有你需要的 <a href="https://t.zsxq.com/cw7b9" />
* 开发时间:2025-04-23
*/
@Data
public class BootstrapConfigProperties {

public static final String PREFIX = "onethread";

/**
* 是否开启动态线程池开关
*/
private Boolean enable = Boolean.TRUE;

/**
* Nacos 配置文件
*/
private NacosConfig nacos;

/**
* Apollo 配置文件
*/
private ApolloConfig apollo;

/**
* Web 线程池配置
*/
private WebThreadPoolExecutorConfig web;

/**
* Nacos 远程配置文件格式类型
*/
private ConfigFileTypeEnum configFileType;

/**
* 通知配置
*/
private NotifyPlatformsConfig notifyPlatforms;

/**
* 监控配置
*/
private MonitorConfig monitorConfig = new MonitorConfig();

/**
* 线程池配置集合
*/
private List<ThreadPoolExecutorProperties> executors;

@Data
public static class NotifyPlatformsConfig {

/**
* 通知类型,比如:DING
*/
private String platform;

/**
* 完整 WebHook 地址
*/
private String url;
}

@Data
public static class MonitorConfig {

/**
* 默认开启监控配置
*/
private Boolean enable = Boolean.TRUE;

/**
* 监控类型
*/
private String collectType = "micrometer";

/**
* 采集间隔,默认 10 秒
*/
private Long collectInterval = 10L;
}

@Data
public static class NacosConfig {

private String dataId;

private String group;
}

@Data
public static class ApolloConfig {

private String namespace;
}

@Data
public static class WebThreadPoolExecutorConfig {

/**
* 核心线程数
*/
private Integer corePoolSize;

/**
* 最大线程数
*/
private Integer maximumPoolSize;

/**
* 线程空闲存活时间(单位:秒)
*/
private Long keepAliveTime;

/**
* 通知配置
*/
private NotifyConfig notify;
}

@Data
@NoArgsConstructor
@AllArgsConstructor
public static class NotifyConfig {

/**
* 接收人集合
*/
private String receives;
}

private static BootstrapConfigProperties INSTANCE = new BootstrapConfigProperties();

public static BootstrapConfigProperties getInstance() {
return INSTANCE;
}

public static void setInstance(BootstrapConfigProperties properties) {
INSTANCE = properties;
}
}

这里还涉及一层设计上的逻辑:core 包本身无法直接获取 Spring 容器中的 Bean,但我们又希望在 core 包中的组件(如线程池监控、告警模块)能够使用远程配置中心下发的线程池参数。

为了解决这个矛盾,在装配 BootstrapConfigProperties Bean 时,我们做了一些处理,使用了一个 “小技巧”:在 Bean 创建完成后,将其实例手动赋值给类中的静态单例变量,从而实现全局共享。

这样,即使在非 Spring 环境下的模块中(比如 core 包),也可以通过 BootstrapConfigProperties.getInstance() 的方式获取到线程池的配置参数。

public class CommonAutoConfiguration {

@Bean
public BootstrapConfigProperties bootstrapConfigProperties(Environment environment) {
BootstrapConfigProperties bootstrapConfigProperties = Binder.get(environment)
.bind(BootstrapConfigProperties.PREFIX, Bindable.of(BootstrapConfigProperties.class))
.get();
BootstrapConfigProperties.setInstance(bootstrapConfigProperties);
return bootstrapConfigProperties;
}
}

通常情况下,我们只需在 BootstrapConfigProperties 类上添加 @ConfigurationProperties(prefix = "onethread") 注解,Spring Boot 就会自动完成属性的绑定,无需如此复杂的处理逻辑。

@ConfigurationProperties(prefix = "onethread")
public class BootstrapConfigProperties {

// ......
}

public class CommonAutoConfiguration {

@Bean
public BootstrapConfigProperties bootstrapConfigProperties() {
return new BootstrapConfigProperties();
}
}

如果是一个常规的 SpringBoot Starter 项目,且不考虑兼容非 Spring 或早期 Spring 项目,使用 Spring Boot 提供的自动属性绑定机制(如 @ConfigurationProperties)就足够了,无需额外处理。

但考虑到我们希望框架具有更强的通用性和扩展性,因此采用了两个“小技巧”:

  1. 手动绑定配置属性:不使用 SpringBoot 默认的自动绑定方式,而是通过 Binder.bind(...) 手动加载配置,显式控制绑定过程,并确保 BootstrapConfigProperties 实例在绑定完成后即为完整对象;
  2. 维护内部单例:在 BootstrapConfigProperties 内部维护一个静态单例引用,Bean 创建并赋值后,即可通过静态方法全局访问该配置。

通过这种方式,即使在不依赖 Spring 容器的 core 包中,也能读取远程配置中心(如 Nacos)下发的线程池参数,实现配置的全局可用性与模块解耦。

这里需要补充一点说明:从架构设计角度来看,这种做法其实存在一定的职责不清晰问题。按照理想的模块边界划分,core 包应当保持纯净,专注于非 Spring 依赖的通用逻辑,不应该直接感知或依赖 Spring 环境。不过,为了降低理解成本、提升使用便利性,我们在这里做了一定程度的耦合处理,通过内部单例让 core 也能访问到配置中心下发的参数。

当然,这种耦合是可以避免的。例如,如果某些核心模块(如线程池告警)需要依赖配置项(如通知接收人、WebHook 地址),完全可以通过 参数传递 的方式将其注入进来。也就是说,由 Starter 作为入口,将相关配置作为方法参数传递给 core 层,既实现了功能,又保持了模块的独立性与解耦。

3. 远程参数替换

这里我们先通过一张时序图,帮助大家快速建立整体流程的认知。有了全局视角之后,再去调试具体的代码逻辑,会更加清晰、事半功倍。

代码如下所示:

/**
* 动态线程池后置处理器,扫描 Bean 是否为动态线程池,如果是的话进行属性填充和注册
* <p>
* 作者:马丁
* 加项目群:早加入就是优势!500人内部项目群,分享的知识总有你需要的 <a href="https://t.zsxq.com/cw7b9" />
* 开发时间:2025-04-23
*/
@Slf4j
@RequiredArgsConstructor
public class OneThreadBeanPostProcessor implements BeanPostProcessor {

private final BootstrapConfigProperties properties;

@Override
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
if (bean instanceof OneThreadExecutor) {
DynamicThreadPool dynamicThreadPool;
try {
// 通过 IOC 容器扫描 Bean 是否存在动态线程池注解
dynamicThreadPool = ApplicationContextHolder.findAnnotationOnBean(beanName, DynamicThreadPool.class);
if (Objects.isNull(dynamicThreadPool)) {
return bean;
}
} catch (Exception ex) {
log.error("Failed to create dynamic thread pool in annotation mode.", ex);
return bean;
}

OneThreadExecutor oneThreadExecutor = (OneThreadExecutor) bean;
// 从配置中心读取动态线程池配置并对线程池进行赋值
ThreadPoolExecutorProperties executorProperties = properties.getExecutors()
.stream()
.filter(each -> Objects.equals(oneThreadExecutor.getThreadPoolId(), each.getThreadPoolId()))
.findFirst()
.orElseThrow(() -> new RuntimeException("The thread pool id does not exist in the configuration."));

overrideLocalThreadPoolConfig(executorProperties, oneThreadExecutor);

// 注册到动态线程池注册器,后续监控和报警从注册器获取线程池实例。同时,参数动态变更需要依赖 ThreadPoolExecutorProperties 比对是否有边跟
OneThreadRegistry.putHolder(oneThreadExecutor.getThreadPoolId(), oneThreadExecutor, executorProperties);
}

return bean;
}

private void overrideLocalThreadPoolConfig(ThreadPoolExecutorProperties executorProperties, OneThreadExecutor oneThreadExecutor) {
Integer remoteCorePoolSize = executorProperties.getCorePoolSize();
Integer remoteMaximumPoolSize = executorProperties.getMaximumPoolSize();
Assert.isTrue(remoteCorePoolSize <= remoteMaximumPoolSize, "remoteCorePoolSize must be smaller than remoteMaximumPoolSize.");

// 如果不清楚为什么有这段逻辑,可以参考 Hippo4j Issue https://github.com/opengoofy/hippo4j/issues/1063
int originalMaximumPoolSize = oneThreadExecutor.getMaximumPoolSize();
if (remoteCorePoolSize > originalMaximumPoolSize) {
oneThreadExecutor.setMaximumPoolSize(remoteMaximumPoolSize);
oneThreadExecutor.setCorePoolSize(remoteCorePoolSize);
} else {
oneThreadExecutor.setCorePoolSize(remoteCorePoolSize);
oneThreadExecutor.setMaximumPoolSize(remoteMaximumPoolSize);
}

// 阻塞队列没有常规 set 方法,所以使用反射赋值
BlockingQueue workQueue = BlockingQueueTypeEnum.createBlockingQueue(executorProperties.getWorkQueue(), executorProperties.getQueueCapacity());
// Java 9+ 的模块系统(JPMS)默认禁止通过反射访问 JDK 内部 API 的私有字段,所以需要配置开放反射权限
// 在启动命令中增加以下参数,显式开放 java.util.concurrent 包
// IDE 中通过在 VM options 中添加参数:--add-opens=java.base/java.util.concurrent=ALL-UNNAMED
// 部署的时候,在启动脚本(如 java -jar 命令)中加入该参数:java -jar --add-opens=java.base/java.util.concurrent=ALL-UNNAMED your-app.jar
ReflectUtil.setFieldValue(oneThreadExecutor, "workQueue", workQueue);

// 赋值动态线程池其他核心参数
oneThreadExecutor.setKeepAliveTime(executorProperties.getKeepAliveTime(), TimeUnit.SECONDS);
oneThreadExecutor.allowCoreThreadTimeOut(executorProperties.getAllowCoreThreadTimeOut());
oneThreadExecutor.setRejectedExecutionHandler(RejectedPolicyTypeEnum.createPolicy(executorProperties.getRejectedHandler()));
}
}

这里可能有同学会担心:通过反射替换 workQueue 是否存在风险?比如队列中是否可能已经有未完成的任务?

实际上这种情况是不存在的。因为此时线程池仍处于 Bean 创建阶段,尚未对外提供服务,也就不会有任何任务提交进来。因此,替换 workQueue 是安全且可控的。

开发 SpringBoot Starter

前面我们已经讲解了动态线程池的标识方式远程配置的读取逻辑以及核心参数的覆盖机制,也就是说实现层的代码基本已经完成。

但有一个关键问题大家需要思考:代码写好了,如何才能让它在项目启动时自动生效?

这就要回到我们在上一章节提到的 Spring Boot Starter 机制,通过自动装配的方式,让相关逻辑在启动时被正确加载和执行。

我们可以把开发 Starter 比作“把大象装进冰箱”,只需要三步:

  1. 编写核心业务逻辑代码(比如远程配置读取、后置处理器等);
  2. 编写配置类,将这些逻辑注册为 Spring Bean;
  3. 通过自动装配机制,将配置类集成进应用启动流程中。

前两步我们已经介绍得差不多了,接下来我们重点演示后两步:如何进行配置类注册与自动装配。

1. Spring 装配类

之所以将 OneThreadBaseConfiguration 放在 spring-base 模块,而将 CommonAutoConfiguration 放在 common-spring-boot-starter 模块,是因为我们在最初设计时就考虑到了要兼容 普通 Spring 项目(非 Spring Boot)。因此,基础配置类与自动装配类进行了合理拆分,分别放置于不同模块中,便于按需引入与复用。

/**
* 动态线程池基础 Spring 配置类
* <p>
* 作者:马丁
* 加项目群:早加入就是优势!500人内部项目群,分享的知识总有你需要的 <a href="https://t.zsxq.com/cw7b9" />
* 开发时间:2025-04-23
*/
@Configuration
public class OneThreadBaseConfiguration {

@Bean
public ApplicationContextHolder applicationContextHolder() {
return new ApplicationContextHolder();
}

@Bean
@DependsOn("applicationContextHolder")
public OneThreadBeanPostProcessor oneThreadBeanPostProcessor(BootstrapConfigProperties properties) {
return new OneThreadBeanPostProcessor(properties);
}

// ......
}

/**
* 基于配置中心的公共自动装配配置
* <p>
* 作者:马丁
* 加项目群:早加入就是优势!500人内部项目群,分享的知识总有你需要的 <a href="https://t.zsxq.com/cw7b9" />
* 开发时间:2025-04-28
*/
@Import(OneThreadBaseConfiguration.class)
@AutoConfigureAfter(OneThreadBaseConfiguration.class)
public class CommonAutoConfiguration {

@Bean
public BootstrapConfigProperties bootstrapConfigProperties(Environment environment) {
BootstrapConfigProperties bootstrapConfigProperties = Binder.get(environment)
.bind(BootstrapConfigProperties.PREFIX, Bindable.of(BootstrapConfigProperties.class))
.get();
BootstrapConfigProperties.setInstance(bootstrapConfigProperties);
return bootstrapConfigProperties;
}
// ......
}

上述代码中包含三个关键细节,值得特别关注:

  • @DependsOn:由于 OneThreadBeanPostProcessor 依赖其他 Bean(如 ApplicationContextHolder),而 Spring 在初始化 Bean 时默认不保证顺序,因此通过 @DependsOn 显式声明依赖关系,以确保所需 Bean 已就绪,避免初始化异常。
  • @Import:用于在一个自动配置类中引入另一个配置类,从而让其一并生效。这是模块化 Starter 中常用的装配手段。
  • @AutoConfigureAfter:指定当前自动配置类应在某个配置类之后加载。由于 OneThreadBaseConfiguration 属于基础配置,因此需要确保它优先于其他自动配置类被加载。

2. 自动装配

通过上面的代码,我们已经明确了实际需要自动装配的配置类只有一个。接下来要做的,就是让 Spring Boot 能够发现并加载它

Spring Boot 3.x 中,官方引入了全新的自动装配机制:需要在 META-INF/spring/ 目录下,按类型维护对应的 imports 文件。对于自动配置类,需要创建如下文件:

META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports

# 以下为文件具体内容
com.nageoffer.onethread.config.common.starter.configuration.CommonAutoConfiguration

至此,我们的第一个 Spring Boot Starter —— common-spring-boot-starter 已经完成。

有同学可能会疑问:它本身还不包含动态线程池刷新的能力,那为什么还要单独定义这个 Starter 呢

解锁付费内容,👉 戳