3.3 混合检索策略 本节导读:掌握关键词检索与向量检索的融合技术,构建高性能的混合检索系统,实现更精准的信息检索 学习目标 理解混合检索的原理和价值 掌握线性组合和RRF等融合策略 学会实现动态权重和自适应检索 构建企业级混合检索系统 评估和优化混合检索性能 核心概念 混合检索原理 混合检索结合了关键词检索和向量检索的优势,通过多层次的信息融合机制,提供更全面、准确的相关性判断。 多路检索 多路检索同时使用多种检索算法,如BM25关键词检索、向量检索、语义检索等,从不同角度理解查询意图。 结果融合策略 结果融合是将多个检索源的结果合并并重新排序的过程,常见的策略包括线性组合、Reciprocal Rank Fusion (RRF)、机器学习排序等。
本节导读:掌握关键词检索与向量检索的融合技术,构建高性能的混合检索系统,实现更精准的信息检索
混合检索结合了关键词检索和向量检索的优势,通过多层次的信息融合机制,提供更全面、准确的相关性判断。
多路检索同时使用多种检索算法,如BM25关键词检索、向量检索、语义检索等,从不同角度理解查询意图。
结果融合是将多个检索源的结果合并并重新排序的过程,常见的策略包括线性组合、Reciprocal Rank Fusion (RRF)、机器学习排序等。
pip install haystack-ai sentence-transformers scikit-learn numpy
from haystack.components.retrievers import BM25Retriever, EmbeddingRetriever from haystack.components.rankers import Ranker from haystack.document_stores import InMemoryDocumentStore, FAISSDocumentStore from haystack.core import Pipeline # 1. 创建文档存储 document_store = InMemoryDocumentStore() # 2. 准备文档数据 documents = [ { "id": "doc1", "content": "机器学习是人工智能的一个分支,专注于让计算机系统从数据中学习模式和规律。监督学习、无监督学习和强化学习是机器学习的三大主要范式。", "meta": {"category": "AI", "language": "zh"} }, { "id": "doc2", "content": "深度学习是机器学习的子集,使用多层神经网络来学习数据的复杂表示。卷积神经网络(CNN)在图像处理中表现出色,循环神经网络(RNN)和Transformer在序列数据处理中效果显著。", "meta": {"category": "AI", "language": "zh"} }, { "id": "doc3", "content": "自然语言处理(NLP)是AI的一个领域,专注于计算机与人类语言之间的交互。现代NLP主要基于Transformer架构,BERT、GPT等模型在理解、生成和翻译任务中取得了突破性进展。", "meta": {"category": "NLP", "language": "zh"} }, { "id": "doc4", "content": "计算机视觉使计算机能够从图像和视频中获取高级理解,类似于人类视觉系统。目标检测、图像分割、图像生成是计算机视觉的核心任务,YOLO、SSD、Mask R-CNN等算法被广泛应用。", "meta": {"category": "CV", "language": "zh"} }, { "id": "doc5", "content": "检索增强生成(RAG)结合了信息检索和文本生成,通过外部知识库增强语言模型的能力。这种方法解决了大语言模型的知识时效性和幻觉问题,提高了回答的准确性和可靠性。", "meta": {"category": "RAG", "language": "zh"} } ] # 3. 写入文档 document_store.write_documents(documents) # 4. 创建关键词检索器 bm25_retriever = BM25Retriever(document_store=document_store, top_k=5) # 5. 创建向量存储(模拟) faiss_store = FAISSDocumentStore( embedding_dim=384, faiss_index_factory="Flat", return_embedding=True, similarity="cosine" ) # 写入同样的文档到向量存储 faiss_store.write_documents(documents) # 6. 创建向量检索器 embedding_retriever = EmbeddingRetriever( document_store=faiss_store, top_k=5, scale_score=True ) # 7. 创建混合检索器 from haystack.components.retrievers import HybridRetriever hybrid_retriever = HybridRetriever( document_store=document_store, retrievers=[bm25_retriever, embedding_retriever], weights=[0.5, 0.5] # 线性组合权重 ) # 8. 执行混合检索 query = "深度学习是什么?" results = hybrid_retriever.run(query=query) print(f"查询: {query}") print(f"混合检索结果:") print(f"找到 {len(results['documents'])} 个文档\n") for i, doc in enumerate(results['documents'], 1): print(f"{i}. [分数: {doc.score:.4f}] {doc.content}") print(f" ID: {doc.id}\n")
from haystack.components.rankers import Ranker # 1. 创建RRF重排序器 rrf_ranker = Ranker( algorithm="rrf", k=60, # RRF参数,通常设置为文档数量的1-2倍 top_k=5 # 最终返回结果数量 ) # 2. 分别执行不同检索 bm25_results = bm25_retriever.run(query=query) embedding_results = embedding_retriever.run(query=query) # 3. 合并结果 combined_results = { "documents": bm25_results['documents'] + embedding_results['documents'] } # 4. 应用RRF重排序 ranked_results = rrf_ranker.run(documents=combined_results) print(f"RRF重排序结果:") print(f"重排序后 {len(ranked_results['documents'])} 个文档\n") for i, doc in enumerate(ranked_results['documents'], 1): print(f"{i}. [分数: {doc.score:.4f}] {doc.content}") print(f" ID: {doc.id}\n")
import re from typing import Dict, List, Tuple class QueryAnalyzer: """查询类型分析器""" def __init__(self): self.technical_keywords = [ "算法", "神经网络", "深度学习", "机器学习", "卷积", "循环", "transformer", "attention", "embedding", "向量", "矩阵" ] self.factual_keywords = [ "定义", "是什么", "概念", "原理", "介绍", "概述", "历史", "发展", "应用", "特点", "优势" ] def analyze(self, query: str) -> Dict[str, float]: """分析查询类型""" query_lower = query.lower() technical_score = sum(1 for kw in self.technical_keywords if kw in query_lower) factual_score = sum(1 for kw in self.factual_keywords if kw in query_lower) total_length = len(query.split()) length_factor = min(total_length / 10, 1.0) # 长度因子 return { "technical": technical_score * length_factor, "factual": factual_score * length_factor, "length": total_length, "complexity": (technical_score + factual_score) * length_factor } class DynamicHybridRetriever: """动态权重混合检索器""" def __init__(self, bm25_retriever, embedding_retriever, query_analyzer): self.bm25_retriever = bm25_retriever self.embedding_retriever = embedding_retriever self.query_analyzer = query_analyzer def get_weights(self, query: str) -> Tuple[float, float]: """根据查询类型获取动态权重""" analysis = self.query_analyzer.analyze(query) technical_ratio = analysis["technical"] / max(analysis["complexity"], 1) factual_ratio = analysis["factual"] / max(analysis["complexity"], 1) # 根据查询类型调整权重 if technical_ratio > factual_ratio: # 技术查询,向量的权重更高 bm25_weight = 0.3 embedding_weight = 0.7 else: # 事实查询,关键词的权重更高 bm25_weight = 0.7 embedding_weight = 0.3 return bm25_weight, embedding_weight def run(self, query: str) -> Dict[str, any]: """执行动态混合检索""" # 获取动态权重 bm25_weight, embedding_weight = self.get_weights(query) # 并行执行检索 bm25_results = self.bm25_retriever.run(query=query) embedding_results = self.embedding_retriever.run(query=query) # 合并并重排序结果 combined_documents = bm25_results['documents'] + embedding_results['documents'] # 使用RRF进行重排序 rrf_ranker = Ranker(algorithm="rrf", k=60, top_k=5) ranked_results = rrf_ranker.run(documents={"documents": combined_documents}) return { "documents": ranked_results['documents'], "weights": { "bm25": bm25_weight, "embedding": embedding_weight }, "query_analysis": self.query_analyzer.analyze(query), "method": "dynamic_rrf" } # 测试动态权重检索 query_analyzer = QueryAnalyzer() dynamic_retriever = DynamicHybridRetriever(bm25_retriever, embedding_retriever, query_analyzer) # 测试不同类型的查询 test_queries = [ "深度学习算法的原理是什么?", # 技术查询 "什么是检索增强生成?", # 事实查询 "机器学习有什么应用?", # 平衡查询 "Transformer架构的attention机制" # 高技术查询 ] print("动态权重混合检索测试:") print("=" * 60) for query in test_queries: results = dynamic_retriever.run(query) print(f"\n查询: '{query}'") print(f"查询分析: {results['query_analysis']}") print(f"权重分配: BM25={results['weights']['bm25']:.2f}, 向量={results['weights']['embedding']:.2f}") print(f"检索结果:") for i, doc in enumerate(results['documents'], 1): print(f" {i}. [分数: {doc.score:.4f}] {doc.content[:60]}...")
import time import logging from typing import List, Dict, Any, Optional from dataclasses import dataclass from concurrent.futures import ThreadPoolExecutor, as_completed import threading logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) @dataclass class HybridRetrievalConfig: """混合检索配置""" bm25_weight: float = 0.5 embedding_weight: float = 0.5 rrf_k: int = 60 top_k: int = 10 enable_dynamic: bool = True enable_parallel: bool = True cache_size: int = 1000 class EnterpriseHybridRetriever: """企业级混合检索系统""" def __init__(self, config: HybridRetrievalConfig): self.config = config self.bm25_retriever = None self.embedding_retriever = None self.rrf_ranker = None self.query_analyzer = None self.cache = {} self.cache_lock = threading.Lock() def initialize(self, document_store, faiss_store=None): """初始化检索系统""" logger.info("正在初始化企业级混合检索系统") # 初始化BM25检索器 self.bm25_retriever = BM25Retriever( document_store=document_store, top_k=20, all_terms_must_match=False ) # 初始化向量检索器 if faiss_store is None: faiss_store = FAISSDocumentStore( embedding_dim=384, faiss_index_factory="Flat", return_embedding=True, similarity="cosine" ) self.embedding_retriever = EmbeddingRetriever( document_store=faiss_store, top_k=15, scale_score=True ) # 初始化RRF重排序器 self.rrf_ranker = Ranker( algorithm="rrf", k=self.config.rrf_k, top_k=self.config.top_k ) # 初始化查询分析器 self.query_analyzer = QueryAnalyzer() logger.info("混合检索系统初始化完成") def run(self, query: str) -> Dict[str, Any]: """执行混合检索""" start_time = time.time() try: # 获取动态权重 if self.config.enable_dynamic: bm25_weight, embedding_weight = self._get_dynamic_weights(query) else: bm25_weight, embedding_weight = self.config.bm25_weight, self.config.embedding_weight # 并行检索 if self.config.enable_parallel: with ThreadPoolExecutor(max_workers=2) as executor: bm25_future = executor.submit(self.bm25_retriever.run, query=query) embedding_future = executor.submit(self.embedding_retriever.run, query=query) bm25_results = bm25_future.result() embedding_results = embedding_future.result() else: bm25_results = self.bm25_retriever.run(query=query) embedding_results = self.embedding_retriever.run(query=query) # 合并结果 all_documents = ( bm25_results["documents"] + embedding_results["documents"] ) # 重排序 ranked_results = self.rrf_ranker.run(documents={"documents": all_documents}) # 构建返回结果 result = { "documents": ranked_results["documents"], "query": query, "weights": { "bm25": bm25_weight, "embedding": embedding_weight }, "query_analysis": self.query_analyzer.analyze(query) if self.config.enable_dynamic else None, "retrieval_time": time.time() - start_time, "total_results": len(all_documents), "final_results": len(ranked_results["documents"]) } return result except Exception as e: logger.error(f"检索失败: {e}") raise def _get_dynamic_weights(self, query: str) -> tuple: """获取动态权重""" analysis = self.query_analyzer.analyze(query) # 计算技术性和事实性权重 technical_score = analysis["technical"] factual_score = analysis["factual"] total_score = technical_score + factual_score if total_score == 0: return self.config.bm25_weight, self.config.embedding_weight technical_ratio = technical_score / total_score factual_ratio = factual_score / total_score # 根据查询类型调整权重 if technical_ratio > factual_ratio: # 技术查询,增加向量权重 bm25_weight = 0.3 embedding_weight = 0.7 else: # 事实查询,增加关键词权重 bm25_weight = 0.7 embedding_weight = 0.3 # 归一化 total = bm25_weight + embedding_weight return bm25_weight / total, embedding_weight / total # 使用示例 if __name__ == "__main__": # 配置检索系统 config = HybridRetrievalConfig( bm25_weight=0.5, embedding_weight=0.5, rrf_k=60, top_k=10, enable_dynamic=True, enable_parallel=True ) # 创建检索系统 retriever = EnterpriseHybridRetriever(config) # 创建文档存储 document_store = InMemoryDocumentStore() faiss_store = FAISSDocumentStore( embedding_dim=384, faiss_index_factory="Flat", return_embedding=True, similarity="cosine" ) # 初始化 retriever.initialize(document_store, faiss_store) # 准备文档数据 documents = [ { "id": f"doc{i}", "content": f"这是第{i}个文档的内容。包含了关于机器学习、深度学习、自然语言处理、计算机视觉等各种人工智能技术的详细介绍。这些技术在现代应用中发挥着重要作用。", "meta": {"category": "AI", "language": "zh", "id": i} } for i in range(1, 51) # 50个文档 ] # 索引文档 document_store.write_documents(documents) faiss_store.write_documents(documents) # 测试查询 test_queries = [ "机器学习算法原理", "深度学习神经网络", "自然语言处理应用", "计算机视觉目标检测" ] print("企业级混合检索系统测试:") print("=" * 80) for i, query in enumerate(test_queries, 1): print(f"\n=== 测试 {i}: {query} ===") result = retriever.run(query) print(f"权重分配: BM25={result['weights']['bm25']:.3f}, 向量={result['weights']['embedding']:.3f}") print(f"检索时间: {result['retrieval_time']:.4f}秒") print(f"检索到 {result['total_results']} 个候选文档") print(f"最终结果: {result['final_results']} 个") if result.get('query_analysis'): print(f"查询分析: 技术性={result['query_analysis']['technical']:.2f}, 事实性={result['query_analysis']['factual']:.2f}") print("前3个结果:") for j, doc in enumerate(result['documents'][:3], 1): print(f" {j}. [分数: {doc.score:.4f}] {doc.content[:50]}...")
A:权重分配应考虑以下因素:
A:混合检索并不一定能提高质量,取决于具体场景:
A:结果冲突处理策略:
class ProgressiveHybridRetriever: """渐进式混合检索器""" def __init__(self, document_store): self.document_store = document_store self.bm25_retriever = BM25Retriever(document_store=document_store, top_k=10) self.embedding_retriever = EmbeddingRetriever( document_store=faiss_store, top_k=10 ) # 性能阈值 self.latency_threshold = 0.1 # 100ms self.accuracy_threshold = 0.8 # 80%准确率 def run(self, query): """根据当前性能选择检索策略""" start_time = time.time() # 第一步:快速关键词检索 bm25_results = self.bm25_retriever.run(query=query) if len(bm25_results['documents']) >= 3: # 检查性能 keyword_time = time.time() - start_time if keyword_time < self.latency_threshold: # 性能好,返回关键词结果 return { "documents": bm25_results['documents'][:5], "method": "keyword_only", "time": keyword_time, "reason": "快速检索满足要求" } # 第二步:执行混合检索 hybrid_results = self._run_hybrid(query) total_time = time.time() - start_time return { "documents": hybrid_results['documents'], "method": "hybrid", "time": total_time, "reason": "混合检索提供更准确结果" }
class CachedHybridRetriever: """带缓存的混合检索器""" def __init__(self, hybrid_retriever, cache_size=1000): self.hybrid_retriever = hybrid_retriever self.cache = {} self.cache_size = cache_size self.access_order = [] def run(self, query): """执行带缓存的混合检索""" # 检查缓存 if query in self.cache: # 更新访问顺序 self.access_order.remove(query) self.access_order.append(query) return self.cache[query] # 执行检索 result = self.hybrid_retriever.run(query) # 缓存结果 self._cache_result(query, result) return result def _cache_result(self, query, result): """缓存结果,实现LRU策略""" if len(self.cache) >= self.cache_size: # 删除最旧的条目 oldest_query = self.access_order.pop(0) del self.cache[oldest_query] # 添加新条目 self.cache[query] = result self.access_order.append(query)
常见错误:固定权重无法适应不同查询类型。
解决方案:实现动态权重机制,根据查询特征自动调整权重。
常见错误:混合检索导致响应时间过长。
解决方案:实现并行检索、缓存机制和渐进式检索策略。
本章节详细介绍了混合检索策略,包括线性组合、RRF重排序、动态权重和多级融合等方法。通过企业级混合检索系统的实现,我们学会了如何将关键词检索和向量检索的优势有机结合,提供更全面、准确的信息检索服务。关键要点包括:理解不同融合策略的适用场景、实现动态权重调整、构建并行检索架构、优化性能和缓存机制。下一章我们将进入第4章,学习RAG系统的生成与提示工程。
关键词:混合检索, 多路检索, RRF, 结果融合, 动态权重, 企业级检索
难度:高级
预计阅读:90分钟