RAG系统构建完全指南


文档摘要

RAG系统构建完全指南 检索增强生成(RAG)结合了大模型的知识能力和外部数据的准确性,本文提供从零构建生产级RAG系统的完整指南。 RAG架构设计 基本流程 高级架构 文档处理 文档切分策略 智能切分 向量化与检索 嵌入模型选择 混合检索 重排序 Cross-Encoder重排序 上下文优化 上下文压缩 动态上下文选择 生成优化 提示词工程 引用生成 评估与优化 评估指标 A/B测试 生产部署 性能优化 监控 通过系统化的构建和优化,RAG系统可以提供准确、可靠的问答服务,成为企业知识管理的核心工具。

RAG系统构建完全指南

检索增强生成(RAG)结合了大模型的知识能力和外部数据的准确性,本文提供从零构建生产级RAG系统的完整指南。

1. RAG架构设计

基本流程

用户查询 → 检索相关文档 → 生成回答 → 返回结果 ↓ 向量数据库

高级架构

用户查询 ↓ 查询理解(重写、扩展) ↓ 混合检索 ├─ 向量检索(语义) ├─ 关键词检索(BM25) └─ 知识图谱检索 ↓ 结果重排序 ↓ 上下文构建 ↓ LLM生成 ↓ 答案验证 ↓ 返回结果 + 引用

2. 文档处理

文档切分策略

# 递归字符切分 from langchain.text_splitter import RecursiveCharacterTextSplitter splitter = RecursiveCharacterTextSplitter( chunk_size=1000, # 块大小 chunk_overlap=200, # 重叠大小 separators=["\n\n", "\n", "。", " ", ""] ) chunks = splitter.split_text(long_document)

智能切分

# 保持语义完整 def semantic_split(text): sentences = sent_tokenize(text) chunks = [] current_chunk = [] current_size = 0 for sent in sentences: if current_size + len(sent) > 1000: chunks.append(" ".join(current_chunk)) current_chunk = [sent] current_size = len(sent) else: current_chunk.append(sent) current_size += len(sent) return chunks

3. 向量化与检索

嵌入模型选择

# OpenAI Embedding from openai import OpenAI client = OpenAI() def embed(text): return client.embeddings.create( model="text-embedding-3-large", input=text ).data[0].embedding # 本地模型(更经济) from sentence_transformers import SentenceTransformer model = SentenceTransformer("BAAI/bge-large-zh-v1.5") def embed_local(text): return model.encode(text)

混合检索

# BM25 + 向量检索 from rank_bm25 import BM25Okapi import faiss class HybridRetriever: def __init__(self, documents): # BM25索引 self.bm25 = BM25Okapi([d.split() for d in documents]) # 向量索引 embeddings = [embed(d) for d in documents] self.index = faiss.IndexFlatL2(len(embeddings[0])) self.index.add(np.array(embeddings)) self.documents = documents def retrieve(self, query, k=10): # 关键词检索 tokenized_query = query.split() bm25_scores = self.bm25.get_scores(tokenized_query) # 向量检索 query_vector = embed(query) vector_distances, _ = self.index.search( np.array([query_vector]), k ) # 融合分数 combined_scores = [] for i in range(len(self.documents)): combined = ( 0.5 * bm25_scores[i] + 0.5 * (1 / (1 + vector_distances[0][i])) ) combined_scores.append((i, combined)) # 返回top-k top_k = sorted(combined_scores, key=lambda x: -x[1])[:k] return [self.documents[i] for i, _ in top_k]

4. 重排序

Cross-Encoder重排序

from sentence_transformers import CrossEncoder # 加载重排序模型 reranker = CrossEncoder("BAAI/bge-reranker-large") def rerank_documents(query, documents, top_k=5): # 计算查询-文档相关性 pairs = [[query, doc] for doc in documents] scores = reranker.predict(pairs) # 排序并返回top-k results = list(zip(documents, scores)) results.sort(key=lambda x: -x[1]) return [doc for doc, _ in results[:top_k]]

5. 上下文优化

上下文压缩

# 只保留相关信息 from langchain.retrievers import ContextualCompressionRetriever from langchain.retrievers.document_compressors import LLMChainExtractor compressor = LLMChainExtractor.from_llm(llm) compression_retriever = ContextualCompressionRetriever( base_compressor=compressor, base_retriever=retriever ) compressed_docs = compression_retriever.get_relevant_documents(query)

动态上下文选择

# 根据查询复杂度调整上下文 def adaptive_context_retrieval(query): complexity = estimate_complexity(query) if complexity == "simple": # 简单查询:top-3文档 k = 3 elif complexity == "medium": # 中等查询:top-7文档 k = 7 else: # 复杂查询:top-15文档 k = 15 return retrieve_and_rerank(query, k=k)

6. 生成优化

提示词工程

# RAG提示词模板 RAG_TEMPLATE = """ 基于以下上下文回答问题。如果上下文中没有相关信息,请明确说明。 上下文: {context} 问题:{question} 回答: """ def generate_answer(query, retrieved_docs): # 构建上下文 context = "\n\n".join(retrieved_docs) # 生成回答 prompt = RAG_TEMPLATE.format( context=context, question=query ) response = llm.complete(prompt) return response

引用生成

# 添加文档引用 def generate_with_citations(query, retrieved_docs): context_with_refs = [] for i, doc in enumerate(retrieved_docs): context_with_refs.append( f"[{i+1}] {doc}" ) prompt = f""" 上下文: {chr(10).join(context_with_refs)} 问题:{query} 回答时请注明引用,例如:根据[1],... """ return llm.complete(prompt)

7. 评估与优化

评估指标

# RAGAS评估框架 from ragas import evaluate from ragas.metrics import ( faithfulness, answer_relevancy, context_precision ) result = evaluate( dataset=test_dataset, metrics=[ faithfulness, # 忠实度 answer_relevancy, # 相关性 context_precision # 上下文精确度 ] ) print(result)

A/B测试

# 对比不同策略 def ab_test(query, strategy_a, strategy_b): answer_a = strategy_a(query) answer_b = strategy_b(query) # 人工或自动评估 score_a = evaluate_quality(answer_a, query) score_b = evaluate_quality(answer_b, query) return { "strategy_a": score_a, "strategy_b": score_b, "winner": "A" if score_a > score_b else "B" }

8. 生产部署

性能优化

# 批量处理 def batch_embed(texts, batch_size=32): embeddings = [] for i in range(0, len(texts), batch_size): batch = texts[i:i+batch_size] embeddings.extend(model.encode(batch)) return embeddings # 缓存 from functools import lru_cache @lru_cache(maxsize=1000) def cached_embed(text): return embed(text)

监控

# 跟踪关键指标 class RAGMonitor: def __init__(self): self.metrics = { "retrieval_time": [], "generation_time": [], "total_time": [], "relevance_scores": [] } def log(self, retrieval_time, generation_time, relevance): self.metrics["retrieval_time"].append(retrieval_time) self.metrics["generation_time"].append(generation_time) self.metrics["total_time"].append(retrieval_time + generation_time) self.metrics["relevance_scores"].append(relevance) def report(self): return { "avg_retrieval_time": np.mean(self.metrics["retrieval_time"]), "avg_generation_time": np.mean(self.metrics["generation_time"]), "avg_relevance": np.mean(self.metrics["relevance_scores"]) }

通过系统化的构建和优化,RAG系统可以提供准确、可靠的问答服务,成为企业知识管理的核心工具。


发布者: 作者: 转发
评论区 (0)
U