Skip to main content

三态熔断器与故障转移

上一篇拆解了 ModelSelector 的选择算法,知道了它如何从一份 YAML 配置中选出一个有序的、可用的候选列表。但选好了之后呢?候选列表交给谁执行?调用第一个模型失败了怎么办?什么时候该认定一个模型挂了不再尝试?挂了的模型什么时候重新启用?

这一篇回答这些问题。我们会深入两个核心组件:ModelHealthStore(三态熔断器)和 ModelRoutingExecutor(故障转移执行器)。前者管健康状态——谁能调、谁不能调;后者管执行逻辑——怎么调、失败了怎么切换。

三态熔断器:ModelHealthStore

1. 为什么需要熔断器

假设你的电商客服系统配了四个 Chat 模型:qwen3-maxglm-4.7qwen-plusqwen3-local。某天百炼平台出了故障,qwen3-max 连接超时。如果不做任何处理,每次用户提问都会先尝试调 qwen3-max,等 30 秒超时(或者直接失败),然后才切换到 glm-4.7。用户的每一个问题都要多等 30 秒——这显然不可接受。

熔断器的作用就是:连续失败达到阈值后,直接标记这个模型为不可用,后续请求跳过它,不再白等。等一段冷却时间过去,再试探性地放一个请求过去看看恢复了没有。恢复了就重新启用,没恢复就继续熔断。

这就是经典的断路器(Circuit Breaker)模式,ModelHealthStore 是它在项目中的实现。

2. 数据结构

ModelHealthStore 的核心是一个 ConcurrentHashMap,key 是模型 id,value 是这个模型的健康状态:

@Component
@RequiredArgsConstructor
public class ModelHealthStore {

private final AIModelProperties properties;

private final Map<String, ModelHealth> healthById = new ConcurrentHashMap<>();
// ...
}

每个模型的健康状态由 ModelHealth 内部类表示:

private static class ModelHealth {
private int consecutiveFailures; // 连续失败次数
private long openUntil; // 熔断截止时间戳(毫秒)
private boolean halfOpenInFlight; // 是否有探测请求在飞行中
private State state; // 当前状态

private ModelHealth() {
this.consecutiveFailures = 0;
this.openUntil = 0L;
this.halfOpenInFlight = false;
this.state = State.CLOSED;
}
}

状态枚举只有三个值:

private enum State {
CLOSED,
OPEN,
HALF_OPEN
}

新创建的 ModelHealth 默认是 CLOSED 状态,所有字段归零:

字段初始值含义
stateCLOSED健康,正常放行
consecutiveFailures0连续失败次数,CLOSED 状态下每次失败 +1
openUntil0L熔断截止时间戳,OPEN 状态下有值
halfOpenInFlightfalse是否有探测请求正在执行,HALF_OPEN 状态下使用

熔断行为由两个配置参数控制,来自 AIModelProperties.Selection

  • failureThreshold:连续失败多少次触发熔断,默认 2
  • openDurationMs:熔断持续时间(毫秒),默认 30000(30 秒)

也就是说,默认配置下,一个模型连续失败 2 次就熔断,30 秒后才允许探测恢复。

3. 状态转换图

三态之间的转换关系用一张状态图表示:

四条转换边,每条都有明确的触发方法和条件。接下来逐个拆解这些方法的实现。

4. allowCall——放行检查

allowCallModelRoutingExecutor 在实际调用模型之前调用的检查方法。它不只是读状态——它还会在条件满足时修改状态(OPEN → HALF_OPEN 的转换就发生在这里)。

@SuppressWarnings("BooleanMethodIsAlwaysInverted")
public boolean allowCall(String id) {
if (id == null) {
return false;
}
long now = System.currentTimeMillis();
AtomicBoolean allowed = new AtomicBoolean(false);
healthById.compute(id, (k, v) -> {
if (v == null) {
v = new ModelHealth();
}
if (v.state == State.OPEN) {
if (v.openUntil > now) {
return v;
}
v.state = State.HALF_OPEN;
v.halfOpenInFlight = true;
allowed.set(true);
return v;
}
if (v.state == State.HALF_OPEN) {
if (v.halfOpenInFlight) {
return v;
}
v.halfOpenInFlight = true;
allowed.set(true);
return v;
}
allowed.set(true);
return v;
});
return allowed.get();
}

整个方法包在一个 ConcurrentHashMap.compute() 调用里,对同一个 key 的操作是原子的。lambda 内部的逻辑分五种情况:

当前状态条件操作返回
无记录(v == null创建初始 ModelHealth(CLOSED)true(允许)
OPENopenUntil > now(冷却未到)不修改false(拒绝)
OPENopenUntil <= now(冷却已到)转 HALF_OPEN,设 halfOpenInFlight = truetrue(允许探测)
HALF_OPENhalfOpenInFlight = true(已有探测在飞)不修改false(拒绝)
HALF_OPENhalfOpenInFlight = false(无探测在飞)halfOpenInFlight = truetrue(允许探测)
CLOSED不修改true(允许)

注意 AtomicBoolean allowed 的用法。compute 方法的返回值是 ModelHealth(Map 的 value 类型),不是 boolean。lambda 内部没办法直接返回一个 boolean 给外层。所以用 AtomicBoolean 做中转——lambda 内部设值,lambda 外部读值。

为什么用 AtomicBoolean 而不是普通的 boolean[]?因为 lambda 捕获的局部变量必须是 effectively final。AtomicBoolean 本身是 final 的引用,但它的值可以通过 set 修改,满足这个约束。用 boolean[1] 也行,但 AtomicBoolean 语义更清晰。

4.1 为什么用 ConcurrentHashMap.compute()

解锁付费内容,👉 戳