Skip to main content

流式路由的首包探测机制

上一篇拆解了供应商级别的流式调用实现——doStreamChat 如何通过 StreamAsyncExecutor 异步提交、OpenAIStyleSseParser 逐行解析 SSE、StreamCallback 回调推送、StreamCancellationHandle 取消机制,把一次流式调用从请求构建到数据推送全链路跑通。

但那些都是一个 ChatClient 连接一个供应商的实现。在路由层面,还有一个更大的问题没解决。

之前讲的 executeWithFallback——同步调用可以逐个尝试候选,调成功了返回结果,调失败了 catch 住切换下一个。但流式调用的 client.streamChat() 调用后立即返回取消句柄,真正的数据在异步线程通过回调推送。等你发现第一个供应商不行(onError 被调用了),取消句柄已经返回给调用方了,前端可能已经收到了几个 token——来不及切换了。

这一篇解答这个问题:RoutingLLMService.streamChat() 是怎么在流式调用的异步特性下实现故障转移的。

阅读提示: 本篇讲的首包探测机制,核心解决的是本地部署模型(Ollama、vLLM 等)在承压时宕机的故障转移问题——HTTP 连接已建立、模型接受了请求,但在生成第一个 token 之前崩溃(GPU 显存溢出、进程崩溃等)。这类问题在企业级本地部署场景中很常见,本地模型直接暴露给应用,没有云平台的负载均衡保护。如果你的项目只使用云 API(百炼、硅基流动等),云端供应商通常自行处理节点故障,请求方看到的要么是快速的 HTTP 错误、要么是正常响应,出现连接成功但首包前崩溃的概率极低。这种情况下首包探测的价值有限,加上整套机制(ProbeStreamBridge + CompletableFuture 同步等待)设计较为复杂,可以选择性查看本篇,或者只看 probe-and-commit 的设计思路,跳过具体实现细节。

流式路由的核心挑战

1. 为什么 executeWithFallback 不够用

第三篇讲的 executeWithFallback 是为同步调用设计的:

// 同步路由——一行代码搞定
public String chat(ChatRequest request) {
return executor.executeWithFallback(
ModelCapability.CHAT,
selector.selectChatCandidates(Boolean.TRUE.equals(request.getThinking())),
target -> clientsByProvider.get(target.candidate().getProvider()),
(client, target) -> client.chat(request, target) // ← 同步调用,成功返回,失败抛异常
);
}

client.chat(request, target) 是同步的——方法返回时结果已经确定,要么返回正确的 String,要么抛异常。executeWithFallback 的 try-catch 自然能捕获异常并切换到下一个候选。

流式调用就不行了。client.streamChat() 调用后立即返回一个 StreamCancellationHandle,方法返回的那一刻,HTTP 连接可能刚建立,第一个 token 还没到。真正的数据在异步线程上通过 StreamCallback.onContent() 推送,异常通过 StreamCallback.onError() 推送——这些都发生在 streamChat() 方法返回之后,try-catch 捕获不到。

如果硬要用 executeWithFallback 包流式调用,它只能捕获前置校验阶段的同步异常(比如 API Key 缺失),对流式传输过程中的 HTTP 500、网络断开等异步错误无能为力。

1.1 问题一:无法在返回前判断成功失败

executeWithFallback 的前提是:caller.call(client, target) 返回时,调用已经有了明确结果。但流式调用返回的是一个取消句柄,不是调用结果。调用是否成功,要等异步线程开始读取 SSE 数据、收到第一个 token 或错误之后才能确定。

1.2 问题二:数据已推送无法撤回

这个问题更致命。假设直接把真实 callback 传给第一个供应商:

  1. 百炼开始流式响应,推了 callback.onContent("AirPods")callback.onContent(" Pro")
  2. 前端已经渲染了 AirPods Pro 三个字
  3. 百炼突然报错,callback.onError(ex) 被调用
  4. 路由层想切换到硅基流动重试——但那两个 token 已经推给前端了
  5. 硅基流动重新生成完整回答,又推了 callback.onContent("AirPods")callback.onContent(" Pro")
  6. 前端看到的是:AirPods Pro(百炼的残片)+ 一个错误 + AirPods Pro 2 的保修期是...(硅基的完整回答)——内容重复且断裂

用户体验完全崩了。核心问题在于:真实 callback 收到的数据无法撤回。一旦 onContent 被调用,前端就渲染了。

1.3 流式路由 ≠ fire-and-forget

看到这里可能有一个疑问:上一篇讲了 client.streamChat() 调用后立即返回,请求在异步线程执行。既然是异步的,路由层怎么知道成功还是失败?提交出去不就管不了了吗?

这个疑问混淆了两层 streamChat 的行为:

  • 供应商层client.streamChat()——确实是 fire-and-forget。StreamAsyncExecutor.submit() 把任务提交到线程池后立即返回取消句柄,不等结果。上一篇讲的就是这一层
  • 路由层RoutingLLMService.streamChat()——在底层异步之上加了一层同步等待。它调完 client.streamChat() 之后,不是直接返回,而是阻塞在 ProbeStreamBridge.awaitFirstPacket() 上,等异步线程的首包信号

两层的行为差异用代码说最直接:

// 供应商层:提交后立即返回
handle = client.streamChat(request, bridge, target);

// ← 这里 client.streamChat() 已经 return 了,但路由层没有 return

// 路由层:阻塞等待首包信号,最多 60 秒
ProbeStreamBridge.ProbeResult result = awaitFirstPacket(bridge, handle, callback);
// ← 路由线程在这里等着,直到异步线程推了第一个 token 或报了错

所以 RoutingLLMService.streamChat() 的 for 循环在每个候选上都会同步等待首包结果,拿到结果后才决定 commit 还是切换下一个。它不是提交就不管了,而是用 CompletableFuture 把首包这一个关键节点从异步拉回了同步——只等这一个信号,后续 token 推送仍然走异步。

2. probe-and-commit 模式

解决思路是:不要急着把数据推给真实 callback,先探测一下供应商是不是正常的。

具体做法:在真实 callback 和供应商之间插入一个中间层——ProbeStreamBridge。它拦截所有回调事件,在确认供应商可用之前先缓冲起来,不转发。

整个流程分三个阶段:

探测阶段——调用 client.streamChat(request, bridge, target),传入的是 ProbeStreamBridge 而非真实 callback。异步线程开始流式读取,第一个 token(或错误)到达时通过 CompletableFuture.complete() 发出信号。路由线程阻塞在 bridge.awaitFirstPacket() 上等待这个信号。

判断阶段——路由线程被唤醒,检查结果。收到了内容?说明供应商正常工作。收到了错误?说明供应商有问题。超时了?说明供应商响应太慢。

提交/切换阶段——如果探测成功,awaitFirstPacket() 内部自动调 commit():把缓冲的事件一次性刷给真实 callback,后续事件直通不再缓冲。如果探测失败,取消当前连接,bridge 里缓冲的事件直接丢弃(不会转发给真实 callback),然后切换到下一个候选重来。

关键在于:awaitFirstPacket() 返回之前,真实 callback 没有收到任何数据。所以切换供应商时前端完全无感知——它既没有收到错误的 token,也没有收到错误通知。

打个比方:你点了一道菜,厨房开始做,但服务员先确认菜没问题才端给你。如果这道菜做坏了,服务员直接倒掉让厨房重做一份,你完全不知道中间废了一份。

RoutingLLMService.streamChat() 完整实现

1. chat() vs streamChat() 的路由策略

先看两者的代码放在一起对比。chat() 很简洁:

@Override
@RagTraceNode(name = "llm-chat-routing", type = "LLM_ROUTING")
public String chat(ChatRequest request) {
return executor.executeWithFallback(
ModelCapability.CHAT,
selector.selectChatCandidates(Boolean.TRUE.equals(request.getThinking())),
target -> clientsByProvider.get(target.candidate().getProvider()),
(client, target) -> client.chat(request, target)
);
}

一行 executeWithFallback 搞定——选候选、查客户端、调用、捕获异常、切换,全部由通用执行器处理。

streamChat() 自己实现了整套遍历和故障转移逻辑。为什么不能复用 executeWithFallback?用一张表格说清楚:

维度chat()(同步路由)streamChat()(流式路由)
路由机制executeWithFallback() 通用执行器自己实现遍历 + 故障转移
故障检测try-catch 捕获同步异常ProbeStreamBridge.awaitFirstPacket() 等待首包结果
结果判断返回值(成功)或异常(失败)四种结果:SUCCESS / ERROR / TIMEOUT / NO_CONTENT
数据保护不需要(同步返回,无中间状态)ProbeStreamBridge 缓冲事件
切换代价零(失败直接调下一个)需要 handle.cancel() 取消当前连接
返回值String(完整回答)StreamCancellationHandle(取消句柄)
探测开销每个候选创建独立的 ProbeStreamBridge

不是不想复用 executeWithFallback,而是用不了。它的设计前提是调用是同步的、结果通过返回值或异常传递。流式调用的异步 + 回调模式打破了这个前提。

2. streamChat() 完整代码

@Override
@RagTraceNode(name = "llm-stream-routing", type = "LLM_ROUTING")
public StreamCancellationHandle streamChat(ChatRequest request, StreamCallback callback) {
List<ModelTarget> targets = selector.selectChatCandidates(Boolean.TRUE.equals(request.getThinking()));
if (CollUtil.isEmpty(targets)) {
throw new RemoteException(STREAM_NO_PROVIDER_MESSAGE);
}

String label = ModelCapability.CHAT.getDisplayName();
Throwable lastError = null;

for (ModelTarget target : targets) {
ChatClient client = resolveClient(target, label);
if (client == null) {
continue;
}
if (!healthStore.allowCall(target.id())) {
continue;
}

ProbeStreamBridge bridge = new ProbeStreamBridge(callback);

StreamCancellationHandle handle;
try {
handle = client.streamChat(request, bridge, target);
} catch (Exception e) {
healthStore.markFailure(target.id());
lastError = e;
log.warn("{} 流式请求启动失败,切换下一个模型。modelId:{},provider:{}",
label, target.id(), target.candidate().getProvider(), e);
continue;
}
if (handle == null) {
healthStore.markFailure(target.id());
lastError = new RemoteException(STREAM_START_FAILED_MESSAGE, BaseErrorCode.REMOTE_ERROR);
log.warn("{} 流式请求未返回取消句柄,切换下一个模型。modelId:{},provider:{}",
label, target.id(), target.candidate().getProvider());
continue;
}

ProbeStreamBridge.ProbeResult result = awaitFirstPacket(bridge, handle, callback);

if (result.isSuccess()) {
healthStore.markSuccess(target.id());
return handle;
}

// 失败处理
healthStore.markFailure(target.id());
handle.cancel();

lastError = buildLastErrorAndLog(result, target, label);
}

// 所有模型都失败了,通知客户端错误
throw notifyAllFailed(callback, lastError);
}

代码不短,逐段拆解。

3. 执行流程逐段讲解

选择候选列表selector.selectChatCandidates() 获取按优先级排序的候选列表(第二篇讲过)。空列表直接抛异常——没有可用模型,无法继续。

遍历候选:和 executeWithFallback 一样的 for 循环结构,逐个尝试候选。

客户端查找 + 熔断检查resolveClient(target, label)clientsByProvider Map 中按供应商查找客户端,null 则 warn + continue。healthStore.allowCall(target.id()) 做熔断检查(第三篇讲过),false 则 continue。这两步和同步路由完全一样。

这里可能有疑问:healthStore.allowCall() 已经做了熔断检查,为什么还需要后面的首包探测?因为两者管的维度不同。ModelHealthStore 基于历史统计——过去 N 次调用的成功失败比例,它回答的是这个模型最近整体靠不靠谱。而 ProbeStreamBridge 探测的是当前这一次请求的实时状态——一个模型可能过去 100 次调用全部成功(熔断器 CLOSED,放行),但这一次恰好赶上 API 限流、服务滚动重启、本地模型崩溃,请求实际会失败。熔断器是粗筛,过滤掉已知不健康的模型,避免浪费时间去探测一个大概率失败的候选。首包探测是细筛,对熔断器放行的模型做当次请求的实时验证。两者互补,不是替代。

创建探测桥接器——这是流式路由独有的:

ProbeStreamBridge bridge = new ProbeStreamBridge(callback);

每个候选都创建独立的 ProbeStreamBridge。为什么不能复用?因为内部的 CompletableFuture 是一次性的——complete() 之后状态不可逆。缓冲列表和 committed 状态也是一次性的。每次重试是全新的探测过程,必须用全新的桥接器。

启动流式调用

handle = client.streamChat(request, bridge, target);

注意传入的是 bridge 而非真实 callback。这是整个 probe-and-commit 模式的关键——供应商的所有回调事件都先经过 bridge,不直接到达真实 callback。

启动异常处理client.streamChat() 本身可能抛同步异常(比如前置校验失败、StreamAsyncExecutor 线程池拒绝等)。这类异常不需要等首包,直接 markFailure + continue 到下一个候选。

handle null 检查:防御性编程,streamChat() 理论上不应该返回 null,但加一层检查更安全。

阻塞等待首包

ProbeStreamBridge.ProbeResult result = awaitFirstPacket(bridge, handle, callback);

这一行是路由线程的阻塞点——在这里等待异步线程的首包信号,最多等 60 秒。详细机制后面讲 ProbeStreamBridge 时展开。

结果判断——四种情况对应不同处理:

if (result.isSuccess()) {
healthStore.markSuccess(target.id()); // 标记模型健康
return handle; // 返回取消句柄给调用方
}

// 失败
healthStore.markFailure(target.id()); // 标记模型失败
handle.cancel(); // 取消当前流式连接
lastError = buildLastErrorAndLog(result, target, label); // 记日志,continue 到下一个

成功路径:markSuccessreturn handleawaitFirstPacket() 内部已经自动完成了 commit——缓冲的事件已刷给真实 callback,后续 token 直通不再缓冲。返回的 handle 是当前供应商的取消句柄,调用方可以用它停止生成。

失败路径:markFailurehandle.cancel() → continue。取消当前供应商的连接(释放资源),切换到下一个候选。bridge 里缓冲的事件(比如错误的 onError)不会被 commit,直接随 bridge 对象一起被 GC 回收——真实 callback 永远看不到它们。

所有候选失败

throw notifyAllFailed(callback, lastError);

循环结束都没有成功返回,说明所有候选都失败了。notifyAllFailed 做两件事:callback.onError() 通知前端显示错误信息,然后抛 RemoteException 通知调用链上层。

4. awaitFirstPacket 方法

解锁付费内容,👉 戳