Skip to main content

用户点了停止生成集群里发生了什么

开篇引言

上一篇讲了流式生成的正常路径——LLM 一个字一个字吐内容,StreamCallback.onContent() 接住,StreamChatEventHandler 累积并通过 SseEmitterSender 推给前端,onComplete() 触发落库,最后推 finish + done 事件,前端关闭连接。整个过程一气呵成,从第一个 Token 到最后一个 Token,没有意外。

但现实中,用户经常中途点停止按钮。可能是问错了想换个问题,可能是答案已经看到关键部分不想等了,也可能是模型答偏了及时止损。ChatGPT、通义千问、Kimi 这些产品都有停止生成按钮,用户对这个交互已经很熟悉了。

问题是:停止生成不是前端单方面的事。

前端关掉连接只是断了接收端,后端的 LLM 流还在跑,Token 还在消耗,线程还在占用。更复杂的是,假设你部署了两台服务器——节点 A 和节点 B。用户的停止请求被 Nginx 分配到了节点 A,但 LLM 的流式推理跑在节点 B 上。节点 A 收到停止信号,但它本地根本没有这个任务,怎么把取消信号送到节点 B?节点 B 收到信号后,怎么打断正在 while 循环里一行一行读 LLM 响应的线程?已经生成了一半的内容要不要保存?SSE 连接怎么优雅关闭——先推 cancel 事件再推 done,还是直接断开?

这就是本篇要解决的问题。从前端按钮到集群广播,到 LLM 流中断,到资源释放——取消机制的完整链路。

前端点了停止,后端收到了什么

1. 前端的停止按钮

回忆一下上一篇讲的正常流程:前端通过 EventSource 连接 GET /rag/v3/chat,建立 SSE 长连接。连接建立后,后端会推一个 meta 事件,里面带着 conversationIdtaskId。前端把这个 taskId 存下来——它就是后面停止时的凭证。

用户点停止按钮后,前端做两件事:

  1. POST /rag/v3/stop?taskId=xxx,告诉后端取消这个任务
  2. 等待后端推回 cancel 事件,收到后再关闭 EventSource

注意,前端不是直接断开连接,而是先发停止请求,等后端推回确认事件后再断。这样做的好处是:后端有机会保存已生成的内容,推送一个带 messageIdcancel 事件,前端可以据此展示部分回复而不是一片空白。

2. 后端入口:Controller → Service → TaskManager

停止请求的入口很简单:

@RestController
@RequiredArgsConstructor
public class RAGChatController {

private final RAGChatService ragChatService;

/**
* 停止指定任务
*/
@IdempotentSubmit
@PostMapping(value = "/rag/v3/stop")
public Result<Void> stop(@RequestParam String taskId) {
ragChatService.stopTask(taskId);
return Results.success();
}
}

@IdempotentSubmit 做幂等保护——用户可能紧张地连点好几下停止按钮,只处理一次就够了。

Service 层更简单,一行转发:

@Override
public void stopTask(String taskId) {
taskManager.cancel(taskId);
}

整条调用链:stop(taskId)stopTask(taskId)taskManager.cancel(taskId)。所有的复杂度都在 StreamTaskManager 里。

StreamTaskManager:跨集群任务管控的核心

StreamTaskManager 是本篇的主角。它要解决的核心问题是:在多机部署下,取消信号怎么从接收停止请求的节点传播到真正跑 LLM 流的节点,并且在各种时序竞争下都能正确工作。

1. 一张图看完整的取消流程

用一个具体场景贯穿:用户提了一个问题,LLM 正在节点 B 上流式生成回复。用户看到前几行发现答偏了,点击停止按钮,停止请求被负载均衡分配到了节点 A。

整个流程的关键在于:节点 A 不直接联系节点 B,而是通过 Redis 做中转。Redis 既做持久化标记(防止时序问题),又做 Pub/Sub 广播(实时通知)。节点 B 收到广播后在本地执行取消逻辑:中断 LLM 流、保存已生成内容、推 SSE 事件、关闭连接。

2. 两层存储:Guava Cache + Redis

StreamTaskManager 用了两层存储,各管各的事:

2.1 本地 Guava Cache

private final Cache<String, StreamTaskInfo> tasks = CacheBuilder.newBuilder()
.expireAfterWrite(CANCEL_TTL) // 30 分钟自动过期
.maximumSize(10000) // 最多存 1 万个任务
.build();

Guava Cache 存的是 StreamTaskInfo——每个正在进行的流式任务在本地的注册信息:

private static final class StreamTaskInfo {
private final AtomicBoolean cancelled = new AtomicBoolean(false);
private volatile StreamCancellationHandle handle;
private volatile SseEmitterSender sender;
private volatile Supplier<CompletionPayload> onCancelSupplier;
}

四个字段分别是:取消标志位(AtomicBoolean,CAS 保证幂等)、LLM 流的取消句柄(用来中断 OkHttp 连接)、SSE 发送器(用来推事件和关连接)、取消时的回调(用来保存已生成内容并构造 CompletionPayload)。

为什么用 Guava Cache 而不是普通的 ConcurrentHashMap?两个原因:

  • expireAfterWrite(30 分钟):任务完成后正常路径会调 unregister() 清理,但如果进程崩溃或者某些异常路径没走到清理逻辑,Guava Cache 的 TTL 会自动回收条目,不会内存泄漏
  • maximumSize(10000):限制最大容量,即使出现异常情况导致大量任务堆积,也不会把内存撑爆

这两个能力是 ConcurrentHashMap 不具备的。

Guava Cache 的作用是管本地状态——这台机器上有哪些正在跑的任务,每个任务的取消句柄和 SSE 连接在哪里。读写都在本地内存,没有网络开销。这一点很重要,因为 StreamChatEventHandler 在每次 onContent() 回调中都要调 isCancelled() 检查任务是否已被取消。LLM 生成过程中这个检查可能每秒执行几十次,如果每次都查 Redis,延迟和负载都不可接受。

2.2 Redis 标记 + Pub/Sub

Redis 在取消流程中扮演两个角色:

RBucket(Key-Value 存储)—— 持久化取消标记

Key:   ragent:stream:cancel:{taskId}
Value: true
TTL: 30 分钟

取消标记写入 Redis 后,30 分钟内任何节点都能查到这个任务已被取消。它解决的是时序问题——如果取消信号到达时任务还没注册到本地 Cache(比如 StreamChatEventHandler.initialize() 还没执行),等任务注册时可以去 Redis 查一下,发现已经被取消了,直接推 cancel 事件关连接。

RTopic(Pub/Sub)—— 实时广播取消信号

取消发生时,通过 RTopic.publish(taskId) 把 taskId 广播到所有订阅了 ragent:stream:cancel 频道的节点。每个节点收到后在本地查找并执行取消逻辑。注意取消方调 cancel() 时不区分本节点还是远端节点,广播会回到本机,统一由监听器处理,避免本地直调和远端调用走两条路径。

为什么两者都要用?因为它们各有短板:

  • Pub/Sub 是即时通知但不持久:Redis Pub/Sub 是 fire-and-forget,消息发出后不落盘、不重投。这里真正会丢的不是网络层面的消息丢失,而是消息到达节点时,对应的 taskId 还没走到 register() 那一步——订阅器收到 taskId,但本地 Cache 里查不到,这条广播对该任务就等于无效
  • Bucket 是持久化但不通知:写入 Redis 后 30 分钟内都能查到,但它不会主动通知各节点。如果只用 Bucket,就得靠轮询来发现取消状态,延迟太高

两者配合:Pub/Sub 负责实时通知已注册的任务立刻取消,Bucket 负责兜底未注册的任务在注册时发现自己已被取消。

3. cancel():发起取消

public void cancel(String taskId) {
RBucket<Boolean> bucket = redissonClient.getBucket(cancelKey(taskId));
bucket.set(Boolean.TRUE, CANCEL_TTL);
redissonClient.getTopic(CANCEL_TOPIC).publish(taskId);
}

两行代码,两步操作:

  1. 先写标记bucket.set(Boolean.TRUE, CANCEL_TTL) 把取消标记写入 Redis,TTL 30 分钟
  2. 再广播topic.publish(taskId) 通过 Pub/Sub 通知所有节点

顺序是先写标记再广播。这样做是为了防止一种极端情况:如果先广播再写标记,广播到达某个节点时,该节点发现本地没有这个任务,就忽略了;紧接着任务注册到本地,去 Redis 查标记——但标记还没写入(虽然在同一个 Redis 实例中 Pub/Sub 和 SET 的时序很接近,但语义上先写标记更稳妥)。

注意 cancel() 方法没有操作本地 Guava Cache。它只写 Redis 和广播。本地取消逻辑由 Pub/Sub 的监听回调 cancelLocal() 处理——即使取消请求打到的正好是跑任务的那台机器,也是走广播再回到本地执行,逻辑路径统一。

4. subscribe():监听取消频道

解锁付费内容,👉 戳