一次提问同时查三个知识库——多通道并行检索架构
开篇引言
上一篇讲完了意图分数到检索动作的映射——SYSTEM 意图短路直接回复,MCP 意图触发工具调用,KB 意图走定向检索或全局兜底。四个电商客服场景跑了一遍,每种意图类型各走一条路径。读者已经知道哪些通道会被激活、每个通道查多少条。
用一句话概括第 9 篇和本篇的分工:第 9 篇解决的是查什么——哪些通道该激活、查哪个 Collection、TopK 多少。本篇解决的是怎么查——通道怎么并行跑、线程池怎么隔离、结果怎么收回来、挂了怎么容错。
来看一个具体场景。假设用户在电商客服界面发了一句:
AirPods 保修多久,顺便退货运费谁出?
经过查询重写拆成两个子问题,意图分类分别命中了 3C 数码 > 保修政策(score=0.88)和 3C 数码 > 退货政策(score=0.85)。第 9 篇告诉我们,定向检索通道被激活了,要查 kb_3c_warranty 和 kb_3c_return 两个 Milvus Collection。分数都在 0.85 以上,全局兜底通道不启用。
两个 Collection 的向量检索各花 50ms 左右。如果串行查,合计 100ms。并行查,50ms 就出结果。快了一倍听起来不错,但问题不只是快不快——
-
通道之间用什么线程池?
-
通道内部多个 Collection 又用什么线程池?会不会互相抢资源?
-
如果
kb_3c_warranty对应的 Milvus 分区正在 compaction 导致超时了怎么办?
这就是多通道并行检索(Multi-Channel Parallel Retrieval)要解决的问题。
总览:MultiChannelRetrievalEngine 的两阶段架构
1. 入口方法
MultiChannelRetrievalEngine 是多通道检索的核心编排者。它的入口方法 retrieveKnowledgeChannels 把整个检索过程分成两个阶段:
@RagTraceNode(name = "multi-channel-retrieval", type = "RETRIEVE_CHANNEL")
public List<RetrievedChunk> retrieveKnowledgeChannels(List<SubQuestionIntent> subIntents, int topK) {
// 构建检索上下文
SearchContext context = buildSearchContext(subIntents, topK);
// 【阶段1:多通道并行检索】
List<SearchChannelResult> channelResults = executeSearchChannels(context);
if (CollUtil.isEmpty(channelResults)) {
return List.of();
}
// 【阶段2:后置处理器链】
return executePostProcessors(channelResults, context);
}
两阶段的职责很清楚:
- 阶段 1:
executeSearchChannels——并行执行所有启用的 Channel,收集每个通道的原始结果 - 阶段 2:
executePostProcessors——串行执行后处理器链,做去重、精排、截断
本篇聚焦阶段 1。阶段 2 的后处理器链(去重 + Cross-Encoder 精排)在第 11 篇详细展开,这里只需要知道它的输入是阶段 1 收回来的所有 Chunk,输出是精选后的最终 Chunk 列表。
2. 两阶段的执行流程
为了展示完整的两阶段流程,下面这张时序图以两个通道同时启用为例(比如用户问题的意图分数在 0.55 左右,定向和全局通道都被激活),展示从入口到向量数据库再到后处理的执行过程:
图里有几个关键细节:
- 两个 Channel 在外层线程池(
ragRetrievalExecutor)里并行执行,耗时取决于最慢那个 - 定向检索通道内部又在内层线程池(
ragInnerRetrievalExecutor)里并行查两个 Collection - 所有原始结果汇聚后,串行经过后处理器链,最终输出精选结果
3. SearchContext:所有通道共享的上下文
所有通道拿到的是同一个 SearchContext 对象:
@Data
@Builder
public class SearchContext {
private String originalQuestion; // 原始问题
private String rewrittenQuestion; // 改写后的问题
private List<String> subQuestions; // 子问题列表
private List<SubQuestionIntent> intents; // 意图识别结果
private int topK; // 期望返回的结果数量
@Builder.Default
private Map<String, Object> metadata = new HashMap<>();
// 优先使用改写后的问题做检索
public String getMainQuestion() {
return rewrittenQuestion != null ? rewrittenQuestion : originalQuestion;
}
}
getMainQuestion() 优先用改写后的问题做检索——第 4 篇查询重写把那它保修多久改写成了 AirPods Pro 的保修期是多久,检索用改写后的版本效果更好。
4. SearchChannelResult:通道的输出
每个通道返回一个 SearchChannelResult:
@Data
@Builder
public class SearchChannelResult {
private SearchChannelType channelType; // 通道类型标识
private String channelName; // 通道名称
private List<RetrievedChunk> chunks; // 检索到的 Chunk 列表
private long latencyMs; // 检索耗时(毫秒)
@Builder.Default
private Map<String, Object> metadata = new HashMap<>();
}
channelType 不只是打日志用——后处理器做去重时会根据这个字段判断优先级。同一个 Chunk 被定向检索(INTENT_DIRECTED)和全局检索(VECTOR_GLOBAL)同时命中时,保留定向检索的版本,因为定向检索是基于意图识别的精准匹配,可信度更高。latencyMs 则方便排查哪个通道拖了后腿。
SearchChannel 接口:通道的统一契约
1. 五个方法各干什么
SearchChannel 是所有检索通道的统一接口:
public interface SearchChannel {
/** 通道名称(用于日志和监控) */
String getName();
/** 通道优先级(数字越小优先级越高) */
int getPriority();
/** 是否启用该通道 */
boolean isEnabled(SearchContext context);
/** 执行检索 */
SearchChannelResult search(SearchContext context);
/** 通道类型 */
SearchChannelType getType();
}
五个方法各司其职:
| 方法 | 职责 | 举例 |
|---|---|---|
getName() | 日志打印和监控标识 | IntentDirectedSearch |
getPriority() | 去重时的优先级,数字越小越优先 | 定向=1,全局=10 |
isEnabled(context) | 根据当前请求动态判断是否启用 | 第 9 篇已详细讲过 |
search(context) | 执行实际检索,返回 Chunk 列表 | 本篇重点 |
getType() | 标识通道类型,后处理器据此判断来源 | INTENT_DIRECTED |
isEnabled 接收 SearchContext 而不是无参方法,这个设计很关键——同一个 Channel 对不同请求的表现可以完全不同。用户问 AirPods 保修多久时定向检索启用、全局检索不启用;用户含糊地问售后怎么弄时两个通道同时启用。激活条件的具体逻辑在第 9 篇已经拆过,这里不重复。
2. 接口驱动的扩展性
MultiChannelRetrievalEngine 通 过 Spring 的列表注入拿到所有通道实现:
private final List<SearchChannel> searchChannels;
Spring 会自动收集所有实现了 SearchChannel 接口的 @Component,注入到这个列表里。这意味着新增一个检索通道的步骤是:
- 写一个新类实现
SearchChannel - 加上
@Component注解 - 搞定——主引擎代码零改动,新通道自动生效
SearchChannelType 枚举也为未来扩展留了位置:
public enum SearchChannelType {
VECTOR_GLOBAL, // 当前实现
INTENT_DIRECTED, // 当前实现
KEYWORD_ES, // 预留:ES 关键词检索
HYBRID // 预留:混合检索
}
当前只有 INTENT_DIRECTED 和 VECTOR_GLOBAL 两种实现,但架构上随时可以加 ES 关键词检索或混合检索 通道。
3. 过滤、排序与并行执行
executeSearchChannels 里的编排逻辑值得看一下:
private List<SearchChannelResult> executeSearchChannels(SearchContext context) {
// 过滤启用的通道,按优先级排序
List<SearchChannel> enabledChannels = searchChannels.stream()
.filter(channel -> channel.isEnabled(context))
.sorted(Comparator.comparingInt(SearchChannel::getPriority))
.toList();
if (enabledChannels.isEmpty()) {
return List.of();
}
// 并行提交到外层线程池
List<CompletableFuture<SearchChannelResult>> futures = enabledChannels.stream()
.map(channel -> CompletableFuture.supplyAsync(
() -> {
try {
return channel.search(context);
} catch (Exception e) {
log.error("检索通道 {} 执行失败", channel.getName(), e);
return emptyResult(channel);
}
},
ragRetrievalExecutor
))
.toList();
// 等待所有通道完成
List<SearchChannelResult> results = futures.stream()
.map(CompletableFuture::join)
.filter(Objects::nonNull)
.toList();
return results;
}
这段代码有个容易被忽略的点:虽然 enabledChannels 按 getPriority() 排了序,但所有通道都是通过 CompletableFuture.supplyAsync 并行执行的——排序不影响执行顺序,只影响日志打印和结果列表的稳定性。
还有一个关键设计——每个 Channel 的 search() 被 try-catch 包裹,失败时返回 emptyResult(空 Chunk 列表)而不是抛异常。这保证了单个通道崩溃不会拖垮整个检索。
两个通道各自怎么搜
1. IntentDirectedSearchChannel:按意图并行查多个 Collection
1.1 内部执行流程
定向检索通道的优先级是 1(最高),它的 search() 方法做的事情很明确——把 KB 意图列表交给 IntentParallelRetriever,对每个意图绑定的 Collection 做并行检索:
@Override
public SearchChannelResult search(SearchContext context) {
long startTime = System.currentTimeMillis();
try {
List<NodeScore> kbIntents = extractKbIntents(context);
// 并行检索所有意图对应的知识库
int topKMultiplier = properties.getChannels().getIntentDirected().getTopKMultiplier();
List<RetrievedChunk> allChunks = parallelRetriever.executeParallelRetrieval(
context.getMainQuestion(),
kbIntents,
context.getTopK(),
topKMultiplier
);
long latency = System.currentTimeMillis() - startTime;
return SearchChannelResult.builder()
.channelType(SearchChannelType.INTENT_DIRECTED)
.channelName(getName())
.chunks(allChunks)
.latencyMs(latency)
.metadata(Map.of("intentCount", kbIntents.size()))
.build();
} catch (Exception e) {
log.error("意图定向检索失败", e);
return SearchChannelResult.builder()
.channelType(SearchChannelType.INTENT_DIRECTED)
.channelName(getName())
.chunks(List.of())
.latencyMs(System.currentTimeMillis() - startTime)
.build();
}
}
流程很清晰:提取 KB 意图 → 委托并行检索器 → 包装成 SearchChannelResult 返回。整个 search() 外面包了 try-catch——即使 parallelRetriever 内部出了意料之外的异常,也返回空结果而不是让调用方崩溃。