检索引擎 本节导读:深入理解RAG检索核心技术,构建高效精准的文档检索系统 学习目标 掌握向量检索和关键词检索的原理与实现 理解混合检索策略的设计和优化 学会评估检索性能和调优方法 构建高精度的企业级检索引擎 核心概念 检索系统架构 Haystack检索引擎采用多层架构设计,支持多种检索算法和优化策略,确保检索结果的准确性和响应速度。 语义与关键词的协同 传统关键词检索与语义向量检索的优势互补,实现更全面的信息覆盖和更高的相关性判断。
本节导读:深入理解RAG检索核心技术,构建高效精准的文档检索系统
Haystack检索引擎采用多层架构设计,支持多种检索算法和优化策略,确保检索结果的准确性和响应速度。
传统关键词检索与语义向量检索的优势互补,实现更全面的信息覆盖和更高的相关性判断。
from haystack.components.embedders import SentenceTransformersTextEmbedder # 文本向量化 embedder = SentenceTransformersTextEmbedder( model="sentence-transformers/all-MiniLM-L6-v2", device="cpu" ) # 生成嵌入向量 embeddings = embedder.run(texts=["查询文本"])
from haystack.document_stores import FAISSDocumentStore # FAISS存储配置 faiss_store = FAISSDocumentStore( embedding_dim=384, # 匹配模型维度 faiss_index_factory="Flat", # 索引类型 return_embedding=True, # 返回向量 similarity="cosine" # 相似度计算 )
| 方法 | 公式 | 适用场景 |
|---|---|---|
| 余弦相似度 | cos(θ) = A·B/( | A |
| 欧氏距离 | d = sqrt(∑(x_i-y_i)²) | 距离计算 |
| 内积相似度 | sim = A·B | 效率优先 |
from haystack.components.retrievers import EmbeddingRetriever # 向量检索器 retriever = EmbeddingRetriever( document_store=faiss_store, top_k=5, # 返回前5个结果 scale_score=True, # 分数缩放 ) # 执行检索 retrieved_docs = retriever.run(query="用户查询")
from haystack.components.retrievers import BM25Retriever # BM25检索器 bm25_retriever = BM25Retriever(document_store=document_store) # 参数配置 bm25_retriever = BM25Retriever( document_store=document_store, top_k=10, all_terms_must_match=False, # 是否要求所有词匹配 )
from haystack.components.preprocessors import TextPreprocessor # 文本预处理 preprocessor = TextPreprocessor( split_by="word", split_length=None, split_overlap=0, remove_punctuation=True, remove_numbers=False, remove_stopwords=True, stemming=True, lowercase=True )
# 模糊BM25检索 fuzzy_bm25 = BM25Retriever( document_store=document_store, top_k=10, fuzzy=True, # 启用模糊匹配 fuzziness=2, # 模糊程度 )
from haystack.components.retrievers import HybridRetriever # 混合检索器 hybrid_retriever = HybridRetriever( document_store=document_store, retrievers=[bm25_retriever, vector_retriever], weights=[0.6, 0.4] # 权重分配 ) # 执行混合检索 hybrid_results = hybrid_retriever.run(query="混合查询")
from haystack.components.rankers import Ranker # RRF重排序 rrf_ranker = Ranker( algorithm="rrf", k=60, # RRF参数 top_k=5 ) # 融合结果 ranked_results = rrf_ranker.run(documents=retrieved_docs)
class DynamicWeightRetriever: def __init__(self, retrievers, query_analyzer): self.retrievers = retrievers self.query_analyzer = query_analyzer def run(self, query): # 分析查询类型 query_type = self.query_analyzer.analyze(query) # 根据查询类型动态调整权重 if query_type["technical"]: weights = [0.3, 0.7] # 向量权重更高 elif query_type["factual"]: weights = [0.7, 0.3] # 关键词权重更高 else: weights = [0.5, 0.5] # 均衡权重 # 执行混合检索 results = hybrid_retriever.run(query, weights=weights) return results
# 批量索引构建 def batch_index_documents(document_store, documents, batch_size=1000): for i in range(0, len(documents), batch_size): batch = documents[i:i + batch_size] document_store.write_documents(batch) print(f"索引完成 {min(i + batch_size, len(documents))}/{len(documents)}")
from functools import lru_cache @lru_cache(maxsize=1000) def cached_retrieval(query, retriever_type="hybrid"): """带缓存的检索""" if retriever_type == "hybrid": return hybrid_retriever.run(query) elif retriever_type == "bm25": return bm25_retriever.run(query) else: return vector_retriever.run(query)
def optimize_query(query): """查询预处理优化""" # 查询扩展 expanded_query = expand_query_with_synonyms(query) # 查询重构 refined_query = refine_query_syntax(expanded_query) # 长查询分解 if len(query.split()) > 20: refined_query = decompose_long_query(refined_query) return refined_query
from sklearn.metrics import precision_score, recall_score, f1_score def evaluate_retrieval(retrieved_docs, relevant_docs): precision = precision_score(relevant_docs, retrieved_docs) recall = recall_score(relevant_docs, retrieved_docs) f1 = f1_score(relevant_docs, retrieved_docs) return { "precision": precision, "recall": recall, "f1_score": f1 }
import time from datetime import datetime class RetrievalMonitor: def __init__(self): self.metrics = [] def log_retrieval(self, query, results, duration): metric = { "timestamp": datetime.now(), "query": query, "result_count": len(results), "duration": duration, "avg_score": sum(doc.score for doc in results) / len(results) if results else 0 } self.metrics.append(metric) def get_performance_stats(self): if not self.metrics: return None durations = [m["duration"] for m in self.metrics] return { "avg_duration": sum(durations) / len(durations), "max_duration": max(durations), "min_duration": min(durations), "total_queries": len(self.metrics) }
# 分布式检索架构 class DistributedRetriever: def __init__(self, shard_configs): self.shards = [] for config in shard_configs: shard = create_retriever_shard(config) self.shards.append(shard) def run(self, query): # 并行查询各个分片 shard_results = [] for shard in self.shards: result = shard.run(query) shard_results.append(result) # 合并和重排序结果 final_results = merge_and_rank_results(shard_results) return final_results
from roundrobin import RoundRobin class LoadBalancedRetriever: def __init__(self, retrievers): self.retrievers = retrievers self.balancer = RoundRobin() def run(self, query): # 选择可用的检索器 retriever = self.balancer.select(self.retrievers) # 执行检索 result = retriever.run(query) return result
本章节详细讲解了Haystack检索引擎的核心技术,包括向量检索、关键词检索和混合检索策略。通过性能优化和监控机制,可以构建高精度的企业级检索系统。下一章我们将关注生成模块和提示工程,这是RAG系统的输出核心。
关键词:向量检索, BM25, 混合检索, Haystack, 企业级搜索, 性能优化
难度:进阶
预计阅读:75分钟