RAG 系统生产实践:从原型到生产环境


文档摘要

RAG 系统生产实践:从原型到生产环境 RAG 生产化概述 从原型到生产 原型阶段: 验证可行性 快速迭代 小规模测试 生产阶段: 高可用性 可扩展性 性能优化 成本控制 架构设计 整体架构 数据流水线 文档处理 文档切分策略 元数据提取 向量存储优化 分片策略 异步索引 检索优化 查询理解 混合检索 生成优化 提示词模板 上下文压缩 性能监控 指标收集 质量评估 成本优化 缓存策略 批处理 故障处理 重试机制 降级策略 \ python

RAG 系统生产实践:从原型到生产环境

RAG 生产化概述

从原型到生产

原型阶段

  • 验证可行性
  • 快速迭代
  • 小规模测试

生产阶段

  • 高可用性
  • 可扩展性
  • 性能优化
  • 成本控制

架构设计

1. 整体架构

# RAG 系统组件 class ProductionRAG: def __init__(self): # 文档处理 self.ingestor = DocumentIngestor() self.chunker = DocumentChunker() self.embedder = EmbeddingModel() # 向量存储 self.vector_store = VectorStore() # LLM self.llm = LLM() # 缓存 self.cache = RedisCache() # 监控 self.metrics = MetricsCollector()

2. 数据流水线

class DataPipeline: def __init__(self): self.extractor = TextExtractor() self.cleaner = TextCleaner() self.splitter = DocumentSplitter() self.embedder = EmbeddingModel() self.vector_store = VectorStore() def process(self, documents): """处理文档并索引""" for doc in documents: # 提取文本 text = self.extractor.extract(doc) # 清洗文本 text = self.cleaner.clean(text) # 分块 chunks = self.splitter.split(text) # 嵌入 embeddings = self.embedder.embed_batch(chunks) # 存储到向量数据库 self.vector_store.add_batch(chunks, embeddings)

文档处理

1. 文档切分策略

class SmartChunker: def __init__(self): self.chunk_size = 1000 self.overlap = 200 def split(self, document): """智能切分""" # 按段落切分 paragraphs = self.split_by_paragraph(document) chunks = [] current_chunk = [] current_length = 0 for para in paragraphs: para_length = len(para) if current_length + para_length > self.chunk_size: if current_chunk: chunks.append("\n".join(current_chunk)) # 处理跨段落内容 current_chunk = list(current_chunk[-self.overlap//200:]) current_length = sum(len(p) for p in current_chunk) else: current_chunk.append(para) current_length += para_length if current_chunk: chunks.append("\n".join(current_chunk)) return chunks def split_by_paragraph(self, document): """按段落切分""" # 识别段落标题 import re pattern = r'\n(?=#{1,3}\s)' parts = re.split(pattern, document) return parts

2. 元数据提取

class MetadataExtractor: def extract(self, document): """提取文档元数据""" metadata = { 'title': self.extract_title(document), 'author': self.extract_author(document), 'date': self.extract_date(document), 'tags': self.extract_tags(document), 'category': self.classify_category(document) } return metadata def extract_title(self, document): # 使用 LLM 提取标题 prompt = f""提取文档的标题:\n{document[:500]}...""" return self.llm.generate(prompt) def classify_category(self, document): """分类文档""" categories = ["技术", "商业", "科学", "艺术"] prompt = f""" 将以下文档分类到以下类别之一:{', '.join(categories)} 文档:{document[:500]}... 只返回类别名称。 """ return self.llm.generate(prompt)

向量存储优化

1. 分片策略

class ShardedVectorStore: def __init__(self, num_shards=10): self.shards = [ VectorStore(f"shard_{i}") for i in range(num_shards) ] def add(self, document, embedding): """添加到分片""" shard_id = hash(document['id']) % len(self.shards) self.shards[shard_id].add(document, embedding) def search(self, embedding, top_k=5): """搜索所有分片""" results = [] for shard in self.shards: shard_results = shard.search(embedding, top_k=top_k) results.extend(shard_results) # 重排序并返回 top_k results.sort(key=lambda x: x['score'], reverse=True) return results[:top_k]

2. 异步索引

import asyncio class AsyncIndexer: def __init__(self, embedder, vector_store): self.embedder = embedder self.vector_store = vector_store self.queue = asyncio.Queue() async def process_queue(self): """异步处理索引队列""" while True: batch = await self.get_batch() # 并行嵌入 embeddings = await self.embedder.embed_async(batch) # 批量添加 await self.vector_store.add_async(batch, embeddings) await asyncio.sleep(0.1) async def get_batch(self, batch_size=100): """获取一批文档""" batch = [] for _ in range(batch_size): try: doc = self.queue.get_nowait() batch.append(doc) except asyncio.QueueEmpty: break return batch

检索优化

1. 查询理解

class QueryUnderstanding: def __init__(self, llm): self.llm = llm def expand_query(self, query): """查询扩展""" prompt = f""` 原始查询:{query} 请生成 3-5 个相关的查询词,用于提高召回率。 每行一个查询词。 """ response = self.llm.generate(prompt) return response.strip().split("\n") def rewrite_query(self, query): """查询重写""" prompt = f""` 重写以下查询,使其更清晰、更具体: 原始查询:{query} 重写后的查询: """ return self.llm.generate(prompt)

2. 混合检索

class HybridRetriever: def __init__(self, vector_store, keyword_index, llm): self.vector_retriever = VectorRetriever(vector_store) self.keyword_retriever = KeywordRetriever(keyword_index) self.reranker = Reranker(llm) def retrieve(self, query, top_k=5): # 并行检索 import concurrent.futures with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor: future_vector = executor.submit( self.vector_retriever.retrieve, query, top_k * 2 ) future_keyword = executor.submit( self.keyword_retriever.retrieve, query, top_k * 2 ) vector_results = future_vector.result() keyword_results = future_keyword.result() # 融合结果 combined = self.merge(vector_results, keyword_results) # 重排序 reranked = self.reranker.rerank(query, combined, top_k) return reranked

生成优化

1. 提示词模板

class PromptTemplates: SYSTEM_PROMPT = """你是一个专业的问答助手。 请根据以下参考信息回答用户的问题。 参考信息: {context} 用户问题: {question} 答案要求: 1. 基于参考信息回答 2. 如果参考信息不足,明确说明 3. 使用清晰的结构 4. 引用来源 """ QA_PROMPT = """请回答以下问题: {question} 参考信息: {context} 要求: - 准确、简洁 - 引用来源 - 如果不确定,明确说明 """

2. 上下文压缩

class ContextCompressor: def compress(self, contexts, max_tokens=3000): """压缩上下文以适应 token 限制""" # 计算总 token 数 total_tokens = sum(self.count_tokens(ctx) for ctx in contexts) if total_tokens <= max_tokens: return contexts # 选择最重要的上下文 scored_contexts = [] for ctx in contexts: score = self.score_relevance(ctx, self.query) scored_contexts.append((score, ctx)) # 按相关性排序并选择 scored_contexts.sort(key=lambda x: x[0], reverse=True) selected = [] current_tokens = 0 for score, ctx in scored_contexts: ctx_tokens = self.count_tokens(ctx) if current_tokens + ctx_tokens <= max_tokens: selected.append(ctx) current_tokens += ctx_tokens else: break return selected

性能监控

1. 指标收集

class RAGMetrics: def __init__(self): self.metrics = { 'query_count': 0, 'retrieval_time': [], 'generation_time': [], 'total_time': [], 'cache_hit_rate': 0 } def record_query(self, retrieval_time, generation_time, cache_hit): self.metrics['query_count'] += 1 self.metrics['retrieval_time'].append(retrieval_time) self.metrics['generation_time'].append(generation_time) self.metrics['total_time'].append(retrieval_time + generation_time) if cache_hit: self.metrics['cache_hit_rate'] += 1 def get_stats(self): """获取统计信息""" from statistics import mean return { 'total_queries': self.metrics['query_count'], 'avg_retrieval_time': mean(self.metrics['retrieval_time']), 'avg_generation_time': mean(self.metrics['generation_time']), 'avg_total_time': mean(self.metrics['total_time']), 'cache_hit_rate': self.metrics['cache_hit_rate'] / self.metrics['query_count'] }

2. 质量评估

class QualityMetrics: def __init__(self, llm): self.llm = llm def evaluate_relevance(self, question, answer, retrieved_docs): """评估相关性""" prompt = f""" 问题:{question} 答案:{answer} 检索到的文档: {self.format_docs(retrieved_docs)} 请评估答案与问题的相关性(0-1 分)。 只返回分数。 """ return float(self.llm.generate(prompt)) def evaluate_faithfulness(self, answer, retrieved_docs): """评估忠实度""" prompt = f""" 检索到的文档: {self.format_docs(retrieved_docs)} 答案:{answer} 答案是否完全基于检索到的文档?如果有内容是检索文档中没有的,请说明。 """ return self.llm.generate(prompt)

成本优化

1. 缓存策略

class RAGCache: def __init__(self, redis_client): self.redis = redis_client self.query_cache_ttl = 3600 # 1小时 self.doc_cache_ttl = 86400 # 24小时 def get_cached_answer(self, query_hash): """获取缓存的答案""" return self.redis.get(f"answer:{query_hash}") def cache_answer(self, query_hash, answer): """缓存答案""" self.redis.setex( f"answer:{query_hash}", self.query_cache_ttl, answer ) def get_cached_docs(self, query_hash): """获取缓存的文档""" return self.redis.get(f"docs:{query_hash}") def cache_docs(self, query_hash, docs): """缓存的文档""" import json self.redis.setex( f"docs:{query_hash}", self.doc_cache_ttl, json.dumps(docs) )

2. 批处理

class BatchProcessor: def __init__(self, batch_size=10, timeout=5.0): self.batch_size = batch_size self.timeout = timeout self.queue = [] def add_query(self, query): """添加查询到批次""" self.queue.append({ 'query': query, 'timestamp': time.time() }) if len(self.queue) >= self.batch_size: return self.process_batch() return None def process_batch(self): """处理批次""" batch = self.queue[:] self.queue = [] # 批量检索 queries = [item['query'] for item in batch] results = self.vector_store.batch_search(queries) # 返回结果 return dict(zip([item['query'] for item in batch], results))

故障处理

1. 重试机制

from tenacity import retry, stop_after_attempt, wait_exponential class RobustRAG: @retry( stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=2, max=10) ) def retrieve_with_retry(self, query, top_k=5): return self.retriever.retrieve(query, top_k)

2. 降级策略

``python


发布者: 作者: 转发
评论区 (0)
U