3. 检索引擎


文档摘要

检索引擎 本节导读:深入理解RAG检索核心技术,构建高效精准的文档检索系统 学习目标 掌握向量检索和关键词检索的原理与实现 理解混合检索策略的设计和优化 学会评估检索性能和调优方法 构建高精度的企业级检索引擎 核心概念 检索系统架构 Haystack检索引擎采用多层架构设计,支持多种检索算法和优化策略,确保检索结果的准确性和响应速度。 语义与关键词的协同 传统关键词检索与语义向量检索的优势互补,实现更全面的信息覆盖和更高的相关性判断。

3. 检索引擎

本节导读:深入理解RAG检索核心技术,构建高效精准的文档检索系统

学习目标

  • 掌握向量检索和关键词检索的原理与实现
  • 理解混合检索策略的设计和优化
  • 学会评估检索性能和调优方法
  • 构建高精度的企业级检索引擎

核心概念

检索系统架构

Haystack检索引擎采用多层架构设计,支持多种检索算法和优化策略,确保检索结果的准确性和响应速度。

语义与关键词的协同

传统关键词检索与语义向量检索的优势互补,实现更全面的信息覆盖和更高的相关性判断。

向量检索

向量化基础

from haystack.components.embedders import SentenceTransformersTextEmbedder # 文本向量化 embedder = SentenceTransformersTextEmbedder( model="sentence-transformers/all-MiniLM-L6-v2", device="cpu" ) # 生成嵌入向量 embeddings = embedder.run(texts=["查询文本"])

向量存储配置

from haystack.document_stores import FAISSDocumentStore # FAISS存储配置 faiss_store = FAISSDocumentStore( embedding_dim=384, # 匹配模型维度 faiss_index_factory="Flat", # 索引类型 return_embedding=True, # 返回向量 similarity="cosine" # 相似度计算 )

相似度计算方法

方法 公式 适用场景
余弦相似度 cos(θ) = A·B/( A
欧氏距离 d = sqrt(∑(x_i-y_i)²) 距离计算
内积相似度 sim = A·B 效率优先

向量检索实现

from haystack.components.retrievers import EmbeddingRetriever # 向量检索器 retriever = EmbeddingRetriever( document_store=faiss_store, top_k=5, # 返回前5个结果 scale_score=True, # 分数缩放 ) # 执行检索 retrieved_docs = retriever.run(query="用户查询")

关键词检索

BM25算法原理

from haystack.components.retrievers import BM25Retriever # BM25检索器 bm25_retriever = BM25Retriever(document_store=document_store) # 参数配置 bm25_retriever = BM25Retriever( document_store=document_store, top_k=10, all_terms_must_match=False, # 是否要求所有词匹配 )

关键词预处理

from haystack.components.preprocessors import TextPreprocessor # 文本预处理 preprocessor = TextPreprocessor( split_by="word", split_length=None, split_overlap=0, remove_punctuation=True, remove_numbers=False, remove_stopwords=True, stemming=True, lowercase=True )

模糊搜索支持

# 模糊BM25检索 fuzzy_bm25 = BM25Retriever( document_store=document_store, top_k=10, fuzzy=True, # 启用模糊匹配 fuzziness=2, # 模糊程度 )

混合检索策略

线性组合策略

from haystack.components.retrievers import HybridRetriever # 混合检索器 hybrid_retriever = HybridRetriever( document_store=document_store, retrievers=[bm25_retriever, vector_retriever], weights=[0.6, 0.4] # 权重分配 ) # 执行混合检索 hybrid_results = hybrid_retriever.run(query="混合查询")

Reciprocal Rank Fusion (RRF)

from haystack.components.rankers import Ranker # RRF重排序 rrf_ranker = Ranker( algorithm="rrf", k=60, # RRF参数 top_k=5 ) # 融合结果 ranked_results = rrf_ranker.run(documents=retrieved_docs)

动态权重策略

class DynamicWeightRetriever: def __init__(self, retrievers, query_analyzer): self.retrievers = retrievers self.query_analyzer = query_analyzer def run(self, query): # 分析查询类型 query_type = self.query_analyzer.analyze(query) # 根据查询类型动态调整权重 if query_type["technical"]: weights = [0.3, 0.7] # 向量权重更高 elif query_type["factual"]: weights = [0.7, 0.3] # 关键词权重更高 else: weights = [0.5, 0.5] # 均衡权重 # 执行混合检索 results = hybrid_retriever.run(query, weights=weights) return results

检索性能优化

索引优化

# 批量索引构建 def batch_index_documents(document_store, documents, batch_size=1000): for i in range(0, len(documents), batch_size): batch = documents[i:i + batch_size] document_store.write_documents(batch) print(f"索引完成 {min(i + batch_size, len(documents))}/{len(documents)}")

缓存策略

from functools import lru_cache @lru_cache(maxsize=1000) def cached_retrieval(query, retriever_type="hybrid"): """带缓存的检索""" if retriever_type == "hybrid": return hybrid_retriever.run(query) elif retriever_type == "bm25": return bm25_retriever.run(query) else: return vector_retriever.run(query)

查询优化

def optimize_query(query): """查询预处理优化""" # 查询扩展 expanded_query = expand_query_with_synonyms(query) # 查询重构 refined_query = refine_query_syntax(expanded_query) # 长查询分解 if len(query.split()) > 20: refined_query = decompose_long_query(refined_query) return refined_query

性能评估与监控

评估指标

from sklearn.metrics import precision_score, recall_score, f1_score def evaluate_retrieval(retrieved_docs, relevant_docs): precision = precision_score(relevant_docs, retrieved_docs) recall = recall_score(relevant_docs, retrieved_docs) f1 = f1_score(relevant_docs, retrieved_docs) return { "precision": precision, "recall": recall, "f1_score": f1 }

实时监控

import time from datetime import datetime class RetrievalMonitor: def __init__(self): self.metrics = [] def log_retrieval(self, query, results, duration): metric = { "timestamp": datetime.now(), "query": query, "result_count": len(results), "duration": duration, "avg_score": sum(doc.score for doc in results) / len(results) if results else 0 } self.metrics.append(metric) def get_performance_stats(self): if not self.metrics: return None durations = [m["duration"] for m in self.metrics] return { "avg_duration": sum(durations) / len(durations), "max_duration": max(durations), "min_duration": min(durations), "total_queries": len(self.metrics) }

企业级部署优化

分布式检索

# 分布式检索架构 class DistributedRetriever: def __init__(self, shard_configs): self.shards = [] for config in shard_configs: shard = create_retriever_shard(config) self.shards.append(shard) def run(self, query): # 并行查询各个分片 shard_results = [] for shard in self.shards: result = shard.run(query) shard_results.append(result) # 合并和重排序结果 final_results = merge_and_rank_results(shard_results) return final_results

负载均衡

from roundrobin import RoundRobin class LoadBalancedRetriever: def __init__(self, retrievers): self.retrievers = retrievers self.balancer = RoundRobin() def run(self, query): # 选择可用的检索器 retriever = self.balancer.select(self.retrievers) # 执行检索 result = retriever.run(query) return result

本节小结

本章节详细讲解了Haystack检索引擎的核心技术,包括向量检索、关键词检索和混合检索策略。通过性能优化和监控机制,可以构建高精度的企业级检索系统。下一章我们将关注生成模块和提示工程,这是RAG系统的输出核心。

延伸阅读

关键词:向量检索, BM25, 混合检索, Haystack, 企业级搜索, 性能优化
难度:进阶
预计阅读:75分钟


发布者: 作者: 转发
评论区 (0)
U