Skip to main content

一次提问同时查三个知识库——多通道并行检索架构

开篇引言

上一篇讲完了意图分数到检索动作的映射——SYSTEM 意图短路直接回复,MCP 意图触发工具调用,KB 意图走定向检索或全局兜底。四个电商客服场景跑了一遍,每种意图类型各走一条路径。读者已经知道哪些通道会被激活、每个通道查多少条。

用一句话概括第 9 篇和本篇的分工:第 9 篇解决的是查什么——哪些通道该激活、查哪个 Collection、TopK 多少。本篇解决的是怎么查——通道怎么并行跑、线程池怎么隔离、结果怎么收回来、挂了怎么容错。

来看一个具体场景。假设用户在电商客服界面发了一句:

AirPods 保修多久,顺便退货运费谁出?

经过查询重写拆成两个子问题,意图分类分别命中了 3C 数码 > 保修政策(score=0.88)和 3C 数码 > 退货政策(score=0.85)。第 9 篇告诉我们,定向检索通道被激活了,要查 kb_3c_warrantykb_3c_return 两个 Milvus Collection。分数都在 0.85 以上,全局兜底通道不启用。

两个 Collection 的向量检索各花 50ms 左右。如果串行查,合计 100ms。并行查,50ms 就出结果。快了一倍听起来不错,但问题不只是快不快——

  • 通道之间用什么线程池?

  • 通道内部多个 Collection 又用什么线程池?会不会互相抢资源?

  • 如果 kb_3c_warranty 对应的 Milvus 分区正在 compaction 导致超时了怎么办?

这就是多通道并行检索(Multi-Channel Parallel Retrieval)要解决的问题。

总览:MultiChannelRetrievalEngine 的两阶段架构

1. 入口方法

MultiChannelRetrievalEngine 是多通道检索的核心编排者。它的入口方法 retrieveKnowledgeChannels 把整个检索过程分成两个阶段:

@RagTraceNode(name = "multi-channel-retrieval", type = "RETRIEVE_CHANNEL")
public List<RetrievedChunk> retrieveKnowledgeChannels(List<SubQuestionIntent> subIntents, int topK) {
// 构建检索上下文
SearchContext context = buildSearchContext(subIntents, topK);

// 【阶段1:多通道并行检索】
List<SearchChannelResult> channelResults = executeSearchChannels(context);
if (CollUtil.isEmpty(channelResults)) {
return List.of();
}

// 【阶段2:后置处理器链】
return executePostProcessors(channelResults, context);
}

两阶段的职责很清楚:

  • 阶段 1executeSearchChannels——并行执行所有启用的 Channel,收集每个通道的原始结果
  • 阶段 2executePostProcessors——串行执行后处理器链,做去重、精排、截断

本篇聚焦阶段 1。阶段 2 的后处理器链(去重 + Cross-Encoder 精排)在第 11 篇详细展开,这里只需要知道它的输入是阶段 1 收回来的所有 Chunk,输出是精选后的最终 Chunk 列表。

2. 两阶段的执行流程

为了展示完整的两阶段流程,下面这张时序图以两个通道同时启用为例(比如用户问题的意图分数在 0.55 左右,定向和全局通道都被激活),展示从入口到向量数据库再到后处理的执行过程:

图里有几个关键细节:

  • 两个 Channel 在外层线程池(ragRetrievalExecutor)里并行执行,耗时取决于最慢那个
  • 定向检索通道内部又在内层线程池(ragInnerRetrievalExecutor)里并行查两个 Collection
  • 所有原始结果汇聚后,串行经过后处理器链,最终输出精选结果

3. SearchContext:所有通道共享的上下文

所有通道拿到的是同一个 SearchContext 对象:

@Data
@Builder
public class SearchContext {

private String originalQuestion; // 原始问题
private String rewrittenQuestion; // 改写后的问题
private List<String> subQuestions; // 子问题列表
private List<SubQuestionIntent> intents; // 意图识别结果
private int topK; // 期望返回的结果数量

@Builder.Default
private Map<String, Object> metadata = new HashMap<>();

// 优先使用改写后的问题做检索
public String getMainQuestion() {
return rewrittenQuestion != null ? rewrittenQuestion : originalQuestion;
}
}

getMainQuestion() 优先用改写后的问题做检索——第 4 篇查询重写把那它保修多久改写成了 AirPods Pro 的保修期是多久,检索用改写后的版本效果更好。

4. SearchChannelResult:通道的输出

每个通道返回一个 SearchChannelResult

@Data
@Builder
public class SearchChannelResult {

private SearchChannelType channelType; // 通道类型标识
private String channelName; // 通道名称
private List<RetrievedChunk> chunks; // 检索到的 Chunk 列表
private long latencyMs; // 检索耗时(毫秒)

@Builder.Default
private Map<String, Object> metadata = new HashMap<>();
}

channelType 不只是打日志用——后处理器做去重时会根据这个字段判断优先级。同一个 Chunk 被定向检索(INTENT_DIRECTED)和全局检索(VECTOR_GLOBAL)同时命中时,保留定向检索的版本,因为定向检索是基于意图识别的精准匹配,可信度更高。latencyMs 则方便排查哪个通道拖了后腿。

SearchChannel 接口:通道的统一契约

1. 五个方法各干什么

SearchChannel 是所有检索通道的统一接口:

public interface SearchChannel {

/** 通道名称(用于日志和监控) */
String getName();

/** 通道优先级(数字越小优先级越高) */
int getPriority();

/** 是否启用该通道 */
boolean isEnabled(SearchContext context);

/** 执行检索 */
SearchChannelResult search(SearchContext context);

/** 通道类型 */
SearchChannelType getType();
}

五个方法各司其职:

方法职责举例
getName()日志打印和监控标识IntentDirectedSearch
getPriority()去重时的优先级,数字越小越优先定向=1,全局=10
isEnabled(context)根据当前请求动态判断是否启用第 9 篇已详细讲过
search(context)执行实际检索,返回 Chunk 列表本篇重点
getType()标识通道类型,后处理器据此判断来源INTENT_DIRECTED

isEnabled 接收 SearchContext 而不是无参方法,这个设计很关键——同一个 Channel 对不同请求的表现可以完全不同。用户问 AirPods 保修多久时定向检索启用、全局检索不启用;用户含糊地问售后怎么弄时两个通道同时启用。激活条件的具体逻辑在第 9 篇已经拆过,这里不重复。

2. 接口驱动的扩展性

MultiChannelRetrievalEngine 通过 Spring 的列表注入拿到所有通道实现:

private final List<SearchChannel> searchChannels;

Spring 会自动收集所有实现了 SearchChannel 接口的 @Component,注入到这个列表里。这意味着新增一个检索通道的步骤是:

  1. 写一个新类实现 SearchChannel
  2. 加上 @Component 注解
  3. 搞定——主引擎代码零改动,新通道自动生效

SearchChannelType 枚举也为未来扩展留了位置:

public enum SearchChannelType {
VECTOR_GLOBAL, // 当前实现
INTENT_DIRECTED, // 当前实现
KEYWORD_ES, // 预留:ES 关键词检索
HYBRID // 预留:混合检索
}

当前只有 INTENT_DIRECTEDVECTOR_GLOBAL 两种实现,但架构上随时可以加 ES 关键词检索或混合检索通道。

3. 过滤、排序与并行执行

executeSearchChannels 里的编排逻辑值得看一下:

private List<SearchChannelResult> executeSearchChannels(SearchContext context) {
// 过滤启用的通道,按优先级排序
List<SearchChannel> enabledChannels = searchChannels.stream()
.filter(channel -> channel.isEnabled(context))
.sorted(Comparator.comparingInt(SearchChannel::getPriority))
.toList();

if (enabledChannels.isEmpty()) {
return List.of();
}

// 并行提交到外层线程池
List<CompletableFuture<SearchChannelResult>> futures = enabledChannels.stream()
.map(channel -> CompletableFuture.supplyAsync(
() -> {
try {
return channel.search(context);
} catch (Exception e) {
log.error("检索通道 {} 执行失败", channel.getName(), e);
return emptyResult(channel);
}
},
ragRetrievalExecutor
))
.toList();

// 等待所有通道完成
List<SearchChannelResult> results = futures.stream()
.map(CompletableFuture::join)
.filter(Objects::nonNull)
.toList();

return results;
}

这段代码有个容易被忽略的点:虽然 enabledChannelsgetPriority() 排了序,但所有通道都是通过 CompletableFuture.supplyAsync 并行执行的——排序不影响执行顺序,只影响日志打印和结果列表的稳定性。

还有一个关键设计——每个 Channel 的 search() 被 try-catch 包裹,失败时返回 emptyResult(空 Chunk 列表)而不是抛异常。这保证了单个通道崩溃不会拖垮整个检索。

两个通道各自怎么搜

1. IntentDirectedSearchChannel:按意图并行查多个 Collection

1.1 内部执行流程

定向检索通道的优先级是 1(最高),它的 search() 方法做的事情很明确——把 KB 意图列表交给 IntentParallelRetriever,对每个意图绑定的 Collection 做并行检索:

@Override
public SearchChannelResult search(SearchContext context) {
long startTime = System.currentTimeMillis();

try {
List<NodeScore> kbIntents = extractKbIntents(context);

// 并行检索所有意图对应的知识库
int topKMultiplier = properties.getChannels().getIntentDirected().getTopKMultiplier();
List<RetrievedChunk> allChunks = parallelRetriever.executeParallelRetrieval(
context.getMainQuestion(),
kbIntents,
context.getTopK(),
topKMultiplier
);

long latency = System.currentTimeMillis() - startTime;

return SearchChannelResult.builder()
.channelType(SearchChannelType.INTENT_DIRECTED)
.channelName(getName())
.chunks(allChunks)
.latencyMs(latency)
.metadata(Map.of("intentCount", kbIntents.size()))
.build();

} catch (Exception e) {
log.error("意图定向检索失败", e);
return SearchChannelResult.builder()
.channelType(SearchChannelType.INTENT_DIRECTED)
.channelName(getName())
.chunks(List.of())
.latencyMs(System.currentTimeMillis() - startTime)
.build();
}
}

流程很清晰:提取 KB 意图 → 委托并行检索器 → 包装成 SearchChannelResult 返回。整个 search() 外面包了 try-catch——即使 parallelRetriever 内部出了意料之外的异常,也返回空结果而不是让调用方崩溃。

1.2 构造器注入内层线程池

解锁付费内容,👉 戳