流式路由的首包探测机制
上一篇拆解了供应商级别的流式调用实现——doStreamChat 如何通过 StreamAsyncExecutor 异步提交、OpenAIStyleSseParser 逐行解析 SSE、StreamCallback 回调推送、StreamCancellationHandle 取消机制,把一次流式调用从请求构建到数据推送全链路跑通。
但那些都是一个 ChatClient 连接一个供应商的实现。在路由层面,还有一个更大的问题没解决。
之前讲的 executeWithFallback——同步调用可以逐个尝试候选,调成功了返回结果,调失败了 catch 住切换下一个。但流式调用的 client.streamChat() 调用后立即返回取消句柄,真正的数据在异步线程通过回调推送。等你发现第一个供应商不行(onError 被调用了),取消句柄已经返回给调用方了,前端可能已经收到了几个 token——来不及切换了。
这一篇解答这个问题:RoutingLLMService.streamChat() 是怎么在流式调用的异步特性下实现故障转移的。
阅读提示: 本篇讲的首包探测机制,核心解决的是本地部署模型(Ollama、vLLM 等)在承压时宕机的故障转移问题——HTTP 连接已建立、模型接受了请求,但在生成第一个 token 之前崩溃(GPU 显存溢出、进程崩溃等)。这类问题在企业级本地部署场景中很常见,本地模型直接暴露给应用,没有云平台的负载均衡保护。如果你的项目只使用云 API(百炼、硅基流动等),云端供应商通常自行处理节点故障,请求方 看到的要么是快速的 HTTP 错误、要么是正常响应,出现连接成功但首包前崩溃的概率极低。这种情况下首包探测的价值有限,加上整套机制(
ProbeStreamBridge+CompletableFuture同步等待)设计较为复杂,可以选择性查看本篇,或者只看 probe-and-commit 的设计思路,跳过具体实现细节。
流式路由的核心挑战
1. 为什么 executeWithFallback 不够用
第三篇讲的 executeWithFallback 是为同步调用设计的:
// 同步路由——一行代码搞定
public String chat(ChatRequest request) {
return executor.executeWithFallback(
ModelCapability.CHAT,
selector.selectChatCandidates(Boolean.TRUE.equals(request.getThinking())),
target -> clientsByProvider.get(target.candidate().getProvider()),
(client, target) -> client.chat(request, target) // ← 同步调用,成功返回,失败抛异常
);
}
client.chat(request, target) 是同步的——方法返回时结果已经确定,要么返回正确的 String,要么抛异常。executeWithFallback 的 try-catch 自然能捕获异常并切换到下一个候选。
流式调用就不行了。client.streamChat() 调用后立即返回一个 StreamCancellationHandle,方法返回的那一刻,HTTP 连接可能刚建立,第一个 token 还没到。真正的数据在异步线程上通过 StreamCallback.onContent() 推送,异常通过 StreamCallback.onError() 推送——这些都发生在 streamChat() 方法返回之后,try-catch 捕获不到。
如果硬要用 executeWithFallback 包流式调用,它只能捕获前置校验阶段的同步异常(比如 API Key 缺失),对流式传输过程中的 HTTP 500、网络断开等异步错误无能为力。
1.1 问题一:无法在返回前判断成功失败
executeWithFallback 的前提是:caller.call(client, target) 返回时,调用已经有了明确结果。但流式调用返回的是一个取消句柄,不是调用结果。调用是否成功,要等异步线程开始读取 SSE 数据、收到第一个 token 或错误之后才能确定。
1.2 问题二:数据已推送无法撤回
这个问题更致命。假设直接把真实 callback 传给第一个供应商:
- 百炼开始流式响应,推了
callback.onContent("AirPods")、callback.onContent(" Pro") - 前端已经渲染了 AirPods Pro 三个字
- 百炼突然报错,
callback.onError(ex)被调用 - 路由层想切换到硅基流动重试——但那两个 token 已经推给前端了
- 硅基流动重新生成完整回答,又推了
callback.onContent("AirPods")、callback.onContent(" Pro") - 前端看到的是:
AirPods Pro(百炼的残片)+ 一个错误 +AirPods Pro 2 的保修期是...(硅基的完整回答)——内容重复且断裂
用户体验完全崩了。核心问题在于:真实 callback 收到的数据无法撤回。一旦 onContent 被调用,前端就渲染了。
1.3 流式路由 ≠ fire-and-forget
看到这里可能有一个疑问:上一篇讲了 client.streamChat() 调用后立即返回,请求在异步线程执行。既然是异步的,路由层怎么知道成功还是失败?提交出去不就管不了了吗?
这个疑问混淆了两层 streamChat 的行为:
- 供应商层的
client.streamChat()——确实是 fire-and-forget。StreamAsyncExecutor.submit()把任务提交到线程池后立即返回取消句柄,不等结果。上一篇讲的就是这一层 - 路由层的
RoutingLLMService.streamChat()——在底层异步之上加了一层同步等待。它调完client.streamChat()之后,不是直接返回,而是阻塞在ProbeStreamBridge.awaitFirstPacket()上,等异步线程的首包信号
两层的行为差异用代码说最直接:
// 供应商层:提交后立即返回
handle = client.streamChat(request, bridge, target);
// ← 这里 client.streamChat() 已经 return 了,但路由层没有 return
// 路由层:阻塞等待首包信号,最多 60 秒
ProbeStreamBridge.ProbeResult result = awaitFirstPacket(bridge, handle, callback);
// ← 路由线程在这里等着,直到异步线程推了第一个 token 或报了错
所以 RoutingLLMService.streamChat() 的 for 循环在每个候选上都会同步等待首包结果,拿到结果后才决定 commit 还是切换下一个。它不是提交就不管了,而是用 CompletableFuture 把首包这一个关键节点从异步拉回了同步——只等这一个信号,后续 token 推送仍然走异步。
2. probe-and-commit 模式
解决思路是:不要急着把数据推给真实 callback,先探测一下供应商是不是正常的。
具体做法:在真实 callback 和供应商之间插入一个中间层——ProbeStreamBridge。它拦截所有回调事件,在确认供应商可用之前先缓冲起来,不转发。
整个流程分三个阶段:
探测阶段——调用 client.streamChat(request, bridge, target),传入的是 ProbeStreamBridge 而非真实 callback。异步线程开始流式读取,第一个 token(或错误)到达时通过 CompletableFuture.complete() 发出信号。路由线程阻塞在 bridge.awaitFirstPacket() 上等待这个信号。
判断阶段——路由线程被唤醒,检查结果。收到了内容?说明供应商正常工作。收到了错误?说明供应商有问题。超时了?说明供应商响应太慢。
提交/切换阶段——如果探测成功,awaitFirstPacket() 内部自动调 commit():把缓冲的事件一次性刷给真实 callback,后续事件直通不再缓冲。如果探测失败,取消当前连接,bridge 里缓冲的事件直接丢弃(不会转发给真实 callback),然后切换到下一个候选重来。
关键在于:awaitFirstPacket() 返回之前,真实 callback 没有收到任何数据。所以切换供应商时前端完全无感知——它既没有收到错误的 token,也没有收到错误通知。
打个比方:你点了一道菜,厨房开始做,但服务员先确认菜没问题才端给你。如果这道菜做坏了,服务员直接倒掉让厨房重做一份,你完全不知道中间废了一份。
RoutingLLMService.streamChat() 完整实现
1. chat() vs streamChat() 的路由策略
先看两者的代码放在一起对比。chat() 很简洁:
@Override
@RagTraceNode(name = "llm-chat-routing", type = "LLM_ROUTING")
public String chat(ChatRequest request) {
return executor.executeWithFallback(
ModelCapability.CHAT,
selector.selectChatCandidates(Boolean.TRUE.equals(request.getThinking())),
target -> clientsByProvider.get(target.candidate().getProvider()),
(client, target) -> client.chat(request, target)
);
}
一行 executeWithFallback 搞定——选候选、查客户端、调用、捕获异常、切换,全部由通用执行器处理。
streamChat() 自己实现了整套遍历和 故障转移逻辑。为什么不能复用 executeWithFallback?用一张表格说清楚:
| 维度 | chat()(同步路由) | streamChat()(流式路由) |
|---|---|---|
| 路由机制 | executeWithFallback() 通用执行器 | 自己实现遍历 + 故障转移 |
| 故障检测 | try-catch 捕获同步异常 | ProbeStreamBridge.awaitFirstPacket() 等待首包结果 |
| 结果判断 | 返回值(成功)或异常(失败) | 四种结果:SUCCESS / ERROR / TIMEOUT / NO_CONTENT |
| 数据保护 | 不需要(同步返回,无中间状态) | ProbeStreamBridge 缓冲事件 |
| 切换代价 | 零(失败直接调下一个) | 需要 handle.cancel() 取消当前连接 |
| 返回值 | String(完整回答) | StreamCancellationHandle(取消句柄) |
| 探测开销 | 无 | 每个候选创建独立的 ProbeStreamBridge |
不是不想复用 executeWithFallback,而是用不了。它的设计前提是调用是同步的、结果通过返回值或异常传递。流式调用的异步 + 回调模式打破了这个前提。
2. streamChat() 完整代码
@Override
@RagTraceNode(name = "llm-stream-routing", type = "LLM_ROUTING")
public StreamCancellationHandle streamChat(ChatRequest request, StreamCallback callback) {
List<ModelTarget> targets = selector.selectChatCandidates(Boolean.TRUE.equals(request.getThinking()));
if (CollUtil.isEmpty(targets)) {
throw new RemoteException(STREAM_NO_PROVIDER_MESSAGE);
}
String label = ModelCapability.CHAT.getDisplayName();
Throwable lastError = null;
for (ModelTarget target : targets) {
ChatClient client = resolveClient(target, label);
if (client == null) {
continue;
}
if (!healthStore.allowCall(target.id())) {
continue;
}
ProbeStreamBridge bridge = new ProbeStreamBridge(callback);
StreamCancellationHandle handle;
try {
handle = client.streamChat(request, bridge, target);
} catch (Exception e) {
healthStore.markFailure(target.id());
lastError = e;
log.warn("{} 流式请求启动失败,切换下一个模型。modelId:{},provider:{}",
label, target.id(), target.candidate().getProvider(), e);
continue;
}
if (handle == null) {
healthStore.markFailure(target.id());
lastError = new RemoteException(STREAM_START_FAILED_MESSAGE, BaseErrorCode.REMOTE_ERROR);
log.warn("{} 流式请求未返回取消句柄,切换下一个模型。modelId:{},provider:{}",
label, target.id(), target.candidate().getProvider());
continue;
}
ProbeStreamBridge.ProbeResult result = awaitFirstPacket(bridge, handle, callback);
if (result.isSuccess()) {
healthStore.markSuccess(target.id());
return handle;
}
// 失败处理
healthStore.markFailure(target.id());
handle.cancel();
lastError = buildLastErrorAndLog(result, target, label);
}
// 所有模型都失败了,通知客户端错误
throw notifyAllFailed(callback, lastError);
}
代码不短,逐段拆解。