实战项目。本节导读:从理论到实践,构建完整的工业级RAG应用系统。学习目标:掌握端到端RAG系统架构设计;学会实现多场景RAG应用;理解企业级部署的关键考量;掌握性能监控和优化技术。核心概念:(1)实战项目架构——Haystack实战项目采用模块化、可扩展的架构设计,支持从简单问答到复杂AI代理的多种应用场景;(2)生产级要求——企业级RAG系统需要满足高可用、高性能、可扩展和安全可靠等关键要求。章节结构:基础问答系统(系统架构设计、完整实现代码、使用示例);多轮对话代理(对话代理架构、完整对话系统实现);企业级RAG部署(容器化部署、Kubernetes部署配置、监控和日志、性能优化策略);完整应用示例;本节小结。本章节通过完整的实战项目,从基础问答系统到企业级部署,展示了Haystack框架的完整应用场景。
本节导读:从理论到实践,构建完整的工业级RAG应用系统
Haystack实战项目采用模块化、可扩展的架构设计,支持从简单问答到复杂AI代理的多种应用场景。
企业级RAG系统需要满足高可用、高性能、可扩展和安全可靠等关键要求。
from haystack import Pipeline
from haystack.components.converters import TextConverter
from haystack.components.embedders import SentenceTransformersTextEmbedder
from haystack.components.generators import OpenAIGenerator
from haystack.components.preprocessors import DocumentCleaner, DocumentSplitter
from haystack.components.retrievers import EmbeddingRetriever


def _build_basic_qa_pipeline():
    """Assemble the linear basic question-answering pipeline.

    Every component is created with its default arguments, registered under
    its stage name, and then connected to the stage that follows it.
    """
    # (stage name, component class) pairs, listed in execution order:
    # document processing, then embedding, then retrieval and generation.
    stages = (
        ("converter", TextConverter),
        ("cleaner", DocumentCleaner),
        ("splitter", DocumentSplitter),
        ("embedder", SentenceTransformersTextEmbedder),
        ("retriever", EmbeddingRetriever),
        ("generator", OpenAIGenerator),
    )

    pipeline = Pipeline()
    for stage_name, component_cls in stages:
        pipeline.add_component(stage_name, component_cls())

    # Wire each stage to its successor.
    for (upstream, _), (downstream, _) in zip(stages, stages[1:]):
        pipeline.connect(upstream, downstream)
    return pipeline


# Basic question-answering pipeline.
basic_qa_pipeline = _build_basic_qa_pipeline()
import os
from typing import List, Dict, Any

from haystack import Document, Pipeline
from haystack.document_stores import FAISSDocumentStore
from haystack.components.preprocessors import DocumentCleaner, DocumentSplitter
from haystack.components.converters import TextConverter
from haystack.components.embedders import SentenceTransformersTextEmbedder
from haystack.components.retrievers import EmbeddingRetriever
from haystack.components.generators import OpenAIGenerator

# NOTE(review): these import paths look like a mix of the Haystack 1.x layout
# (FAISSDocumentStore, EmbeddingRetriever) and the 2.x layout
# (haystack.components.*). Confirm them against the installed Haystack version.


class BasicQASystem:
    """Question-answering system built around a single Haystack pipeline.

    The pipeline chains converter -> cleaner -> splitter -> embedder ->
    retriever -> generator. Retrieved documents come from a FAISS document
    store owned by this object.
    """

    def __init__(self, model_name="gpt-4", embedding_model="all-MiniLM-L6-v2",
                 embedding_dim=384):
        """Create the document store and the pipeline.

        Args:
            model_name: Model identifier passed to ``OpenAIGenerator``.
            embedding_model: Model identifier passed to the text embedder.
            embedding_dim: Vector dimensionality passed to the FAISS store.
                It has to match the vectors produced by ``embedding_model``;
                the default of 384 pairs with the default embedding model.
        """
        self.model_name = model_name
        self.embedding_model = embedding_model
        self.embedding_dim = embedding_dim
        self.pipeline = None
        self.document_store = None
        self._initialize_system()

    def _initialize_system(self):
        """Initialise the document store, then the pipeline that reads it."""
        self.document_store = FAISSDocumentStore(
            embedding_dim=self.embedding_dim,
            faiss_index_factory="Flat",
            return_embedding=True
        )
        self.pipeline = self._create_pipeline()

    def _create_pipeline(self) -> Pipeline:
        """Build and wire the processing pipeline.

        Returns:
            The assembled ``Pipeline``.
        """
        pipeline = Pipeline()

        # Document processing components.
        pipeline.add_component("converter", TextConverter())
        pipeline.add_component("cleaner", DocumentCleaner())
        pipeline.add_component("splitter", DocumentSplitter(
            split_by="word",
            split_length=200,
            split_overlap=20
        ))

        # Embedding and retrieval components.
        pipeline.add_component("embedder", SentenceTransformersTextEmbedder(
            model=self.embedding_model
        ))
        pipeline.add_component("retriever", EmbeddingRetriever(
            document_store=self.document_store,
            top_k=3
        ))

        # Generation component; the API key is read from the environment.
        pipeline.add_component("generator", OpenAIGenerator(
            model=self.model_name,
            api_key=os.getenv("OPENAI_API_KEY"),
            generation_kwargs={
                "temperature": 0.7,
                "max_tokens": 500,
                "top_p": 0.9
            }
        ))

        # Connect the components in execution order.
        pipeline.connect("converter", "cleaner")
        pipeline.connect("cleaner", "splitter")
        pipeline.connect("splitter", "embedder")
        pipeline.connect("embedder", "retriever")
        pipeline.connect("retriever", "generator")

        return pipeline

    def load_documents(self, document_paths: List[str]):
        """Convert the given paths and write the documents to the store.

        Args:
            document_paths: Paths handed to the converter component.
        """
        # NOTE(review): this runs the whole pipeline with only the converter
        # input and stores the converter's raw output, i.e. not the cleaned or
        # split documents. Confirm that this is what the installed Haystack
        # version supports and what is intended.
        converter_result = self.pipeline.run({
            "converter": {"paths": document_paths}
        })

        documents = converter_result["converter"]["documents"]
        self.document_store.write_documents(documents)

        print(f"已加载 {len(documents)} 个文档")

    def ask_question(self, question: str) -> Dict[str, Any]:
        """Answer a single question.

        Args:
            question: The query passed to the pipeline.

        Returns:
            A dict with the keys ``question``, ``answer`` (first generator
            reply), ``context`` (the ``context`` entry of the first generator
            meta record) and ``sources`` (the ``source`` meta value of every
            context document, ``"unknown"`` when absent).
        """
        result = self.pipeline.run({
            "generator": {"query": question}
        })

        generator_output = result["generator"]
        context_docs = generator_output["meta"][0]["context"]
        return {
            "question": question,
            "answer": generator_output["replies"][0],
            "context": context_docs,
            "sources": [
                doc.meta.get("source", "unknown") for doc in context_docs
            ]
        }

    def ask_question_with_context(self, question: str, context: str) -> Dict[str, Any]:
        """Answer a question with extra context (e.g. dialogue history).

        ``ConversationAgent.chat`` calls this method. The context is placed
        in front of the question and the combined text goes through
        ``ask_question``.

        Args:
            question: The user's current question.
            context: Free-form text to prepend; may be empty.

        Returns:
            The ``ask_question`` result, with ``question`` reset to the
            original question instead of the combined prompt.
        """
        prompt = f"{context}\n\n{question}" if context else question
        result = self.ask_question(prompt)
        result["question"] = question
        return result

    def batch_qa(self, questions: List[str]) -> List[Dict[str, Any]]:
        """Answer several questions one after another.

        Args:
            questions: Questions to answer, in order.

        Returns:
            One ``ask_question`` result per question, in the same order.
        """
        return [self.ask_question(question) for question in questions]
# Create the QA system with its default models.
qa_system = BasicQASystem()

# Load the document directories into the system.
document_paths = [
    "data/company_docs/",
    "data/product_docs/",
    "data/technical_docs/",
]
qa_system.load_documents(document_paths)

# Ask a few sample questions in one batch.
questions = [
    "公司的主要产品有哪些?",
    "如何安装软件?",
    "技术支持联系方式是什么?",
]
results = qa_system.batch_qa(questions)

# Print question, answer and sources for each result, then a separator line.
for result in results:
    print(
        f"问题:{result['question']}",
        f"答案:{result['answer']}",
        f"来源:{result['sources']}",
        "-" * 50,
        sep="\n",
    )
from dataclasses import dataclass
from datetime import datetime
from typing import Any, Dict, List, Optional


@dataclass
class ConversationMessage:
    """One message of a conversation."""

    role: str  # "user" or "assistant"
    content: str
    timestamp: datetime
    metadata: Optional[Dict] = None


class ConversationAgent:
    """Keeps per-conversation history and answers with that history as context."""

    def __init__(self, qa_system: Any, max_history: int = 5):
        """Create an agent.

        Args:
            qa_system: Object used to answer questions. It must provide
                ``ask_question_with_context(question, context)`` or, as a
                fallback, ``ask_question(question)``; either must return a
                mapping with an ``"answer"`` key.
            max_history: Maximum number of messages kept per conversation.
        """
        self.qa_system = qa_system
        self.max_history = max_history
        # conversation_id -> list of ConversationMessage, oldest first.
        self.conversations = {}

    def add_message(self, conversation_id: str, role: str, content: str):
        """Append a message and trim the history to ``max_history`` entries.

        Args:
            conversation_id: Conversation to append to; created on first use.
            role: ``"user"`` or ``"assistant"``.
            content: Message text.
        """
        history = self.conversations.setdefault(conversation_id, [])
        history.append(ConversationMessage(
            role=role,
            content=content,
            timestamp=datetime.now()
        ))

        # Trim by counting the surplus instead of slicing with
        # ``[-max_history:]``: for max_history == 0 that slice is ``[-0:]``,
        # which keeps the whole list instead of nothing.
        surplus = len(history) - self.max_history
        if surplus > 0:
            self.conversations[conversation_id] = history[surplus:]

    def get_conversation_history(self, conversation_id: str) -> List[str]:
        """Return the stored messages formatted as ``"role: content"``.

        Args:
            conversation_id: Conversation to read.

        Returns:
            Formatted messages, oldest first; an empty list for an unknown
            conversation.
        """
        messages = self.conversations.get(conversation_id, [])
        return [f"{msg.role}: {msg.content}" for msg in messages]

    def chat(self, conversation_id: str, user_input: str) -> str:
        """Run one dialogue turn and return the assistant's answer.

        The user input is stored first, so the context handed to the QA
        system already contains the current message.

        Args:
            conversation_id: Conversation the turn belongs to.
            user_input: The user's message.

        Returns:
            The ``"answer"`` value of the QA system's result.
        """
        self.add_message(conversation_id, "user", user_input)
        context = self._build_conversation_context(conversation_id)

        # Prefer the context-aware entry point. QA systems that only offer
        # ask_question() get the context prepended to the question instead.
        ask_with_context = getattr(self.qa_system, "ask_question_with_context", None)
        if callable(ask_with_context):
            result = ask_with_context(user_input, context)
        else:
            result = self.qa_system.ask_question(f"{context}\n\n{user_input}")

        answer = result["answer"]
        self.add_message(conversation_id, "assistant", answer)
        return answer

    def _build_conversation_context(self, conversation_id: str) -> str:
        """Render the conversation history as one context string."""
        history = self.get_conversation_history(conversation_id)
        return "对话历史:\n" + "\n".join(history)
from datetime import datetime
from typing import Any, Dict


class AdvancedConversationSystem:
    """Maps users to conversations and tracks per-user statistics."""

    def __init__(self, qa_system):
        """Create the system.

        Args:
            qa_system: QA backend handed to the internal ``ConversationAgent``.
        """
        self.qa_system = qa_system
        self.conversation_agent = ConversationAgent(qa_system)
        # user_id -> {"conversation_id", "start_time", "message_count"}
        self.user_profiles = {}

    def start_conversation(self, user_id: str, initial_message: str = None):
        """Start a new conversation for a user.

        Any existing profile of the user is replaced.

        Args:
            user_id: Owner of the conversation.
            initial_message: Optional first user message to store.

        Returns:
            The new conversation id.
        """
        conversation_id = f"conv_{user_id}_{datetime.now().timestamp()}"

        if initial_message:
            self.conversation_agent.add_message(
                conversation_id, "user", initial_message
            )

        self.user_profiles[user_id] = {
            "conversation_id": conversation_id,
            "start_time": datetime.now(),
            "message_count": 0
        }

        return conversation_id

    def send_message(self, user_id: str, message: str) -> Dict[str, Any]:
        """Send a user message and return the assistant's response.

        A conversation is started automatically for unknown users.

        Args:
            user_id: Sender of the message.
            message: Message text.

        Returns:
            A dict with ``user_id``, ``message``, ``response``,
            ``conversation_id`` and ``timestamp``.
        """
        profile = self.user_profiles.get(user_id)
        if profile is None:
            # Do not pass the message as initial_message: chat() below stores
            # the user message itself, so passing it here would record the
            # first message of every conversation twice.
            self.start_conversation(user_id)
            profile = self.user_profiles[user_id]
        conversation_id = profile["conversation_id"]

        response = self.conversation_agent.chat(conversation_id, message)

        profile["message_count"] += 1

        return {
            "user_id": user_id,
            "message": message,
            "response": response,
            "conversation_id": conversation_id,
            "timestamp": datetime.now()
        }

    def get_conversation_summary(self, user_id: str) -> Dict[str, Any]:
        """Summarise the user's current conversation.

        Args:
            user_id: User to look up.

        Returns:
            A dict with the conversation id, start time, message count and the
            stored messages; ``{"error": ...}`` when the user is unknown.
        """
        profile = self.user_profiles.get(user_id)
        if not profile:
            return {"error": "用户不存在"}

        conversation_id = profile["conversation_id"]
        messages = self.conversation_agent.conversations.get(conversation_id, [])

        return {
            "user_id": user_id,
            "conversation_id": conversation_id,
            "start_time": profile["start_time"],
            "message_count": profile["message_count"],
            "messages": [
                {
                    "role": msg.role,
                    "content": msg.content,
                    "timestamp": msg.timestamp
                }
                for msg in messages
            ]
        }
# Dockerfile
# Image for the Haystack RAG web service, served by gunicorn on port 8000.
FROM python:3.9-slim

# Install system dependencies, then drop the apt package lists.
RUN apt-get update && apt-get install -y \
    build-essential \
    curl \
    && rm -rf /var/lib/apt/lists/*

# Set the working directory.
WORKDIR /app

# Copy the dependency manifest before the rest of the source.
COPY requirements.txt .

# Install Python dependencies.
# NOTE(review): gunicorn is presumably listed in requirements.txt - confirm.
RUN pip install --no-cache-dir -r requirements.txt

# Copy the application code.
COPY . .

# Set environment variables.
ENV PYTHONPATH=/app
ENV HAYSTACK_API_PROXY=http://localhost:8000

# Expose the service port.
EXPOSE 8000

# Start command.
# NOTE(review): "app:app" expects a module-level `app` object in app.py -
# verify that the application module actually exposes one.
CMD ["gunicorn", "--bind", "0.0.0.0:8000", "app:app"]
# deployment.yaml
# Deployment running three replicas of the Haystack RAG service container.
apiVersion: apps/v1
kind: Deployment
metadata:
  name: haystack-rag-system
spec:
  replicas: 3
  selector:
    matchLabels:
      app: haystack-rag
  template:
    metadata:
      labels:
        app: haystack-rag
    spec:
      containers:
      - name: haystack-service
        image: your-registry/haystack-rag:latest
        ports:
        - containerPort: 8000
        # Credentials are read from Secrets rather than written inline.
        env:
        - name: OPENAI_API_KEY
          valueFrom:
            secretKeyRef:
              name: api-keys
              key: openai-key
        - name: DATABASE_URL
          valueFrom:
            secretKeyRef:
              name: database
              key: url
        resources:
          requests:
            memory: "2Gi"
            cpu: "1000m"
          limits:
            memory: "4Gi"
            cpu: "2000m"
        # Liveness check: GET /health on port 8000, first after 30 s and then
        # every 10 s.
        livenessProbe:
          httpGet:
            path: /health
            port: 8000
          initialDelaySeconds: 30
          periodSeconds: 10
import logging
import os
from datetime import datetime

from prometheus_client import Counter, Gauge, Histogram, generate_latest

_LOG_FILE = '/app/logs/haystack.log'

# FileHandler does not create missing parent directories, so make sure the
# log directory exists before the handler opens the file.
os.makedirs(os.path.dirname(_LOG_FILE), exist_ok=True)

# Logging configuration: write to the log file and to the console.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler(_LOG_FILE),
        logging.StreamHandler()
    ]
)

logger = logging.getLogger(__name__)

# Prometheus metrics. All of them are created once at module level: creating
# a metric with an already-registered name raises, so building them inside
# HaystackMonitor.__init__ would break on the second instance.
REQUEST_COUNT = Counter('haystack_requests_total', 'Total requests')
REQUEST_DURATION = Histogram('haystack_request_duration_seconds', 'Request duration')
ERROR_COUNT = Counter('haystack_errors_total', 'Total errors')
# Memory usage goes up and down, so it is a Gauge rather than a Counter.
MEMORY_USAGE = Gauge('haystack_memory_usage_bytes', 'Memory usage')


class HaystackMonitor:
    """Records request and memory metrics and mirrors them to the log."""

    def __init__(self):
        # Name -> metric lookup; every instance shares the module-level metrics.
        self.metrics = {
            "requests": REQUEST_COUNT,
            "duration": REQUEST_DURATION,
            "error_rate": ERROR_COUNT,
            "memory_usage": MEMORY_USAGE
        }

    def log_request(self, endpoint, method, status_code, duration):
        """Record the metrics of one handled request.

        Args:
            endpoint: Path of the request, used for the log line.
            method: HTTP method, used for the log line.
            status_code: Response status; values >= 400 count as errors.
            duration: Request duration in seconds, as measured by the caller.
        """
        self.metrics["requests"].inc()
        # Observe the duration supplied by the caller. Timing this method
        # with a decorator would measure the logging call, not the request.
        self.metrics["duration"].observe(duration)

        if status_code >= 400:
            self.metrics["error_rate"].inc()

        logger.info(
            "Request: %s %s - %s - %.2fs", method, endpoint, status_code, duration
        )

    def log_memory_usage(self):
        """Record the current resident set size of this process."""
        import psutil

        rss = psutil.Process().memory_info().rss
        # Set the current value; incrementing would sum successive readings.
        self.metrics["memory_usage"].set(rss)

        logger.info("Memory usage: %.2f MB", rss / 1024 / 1024)
import time
from typing import Any, Dict


class PerformanceOptimizer:
    """Caches QA results per query and tracks response-time statistics."""

    def __init__(self, qa_system, max_cache_size=1000):
        """Create the optimizer.

        Args:
            qa_system: Object whose ``ask_question(query)`` produces answers.
            max_cache_size: Maximum number of cached queries; the least
                recently used entry is evicted once the limit is exceeded.
        """
        self.qa_system = qa_system
        self.max_cache_size = max_cache_size
        # query -> result; insertion order doubles as recency order.
        self.cache = {}
        self.performance_stats = {
            "cache_hits": 0,
            "cache_misses": 0,
            "avg_response_time": 0,
            "total_requests": 0
        }

    def optimize_response(self, query: str) -> Dict[str, Any]:
        """Answer a query, serving it from the cache when possible.

        Args:
            query: The question text; also the cache key.

        Returns:
            The result of ``qa_system.ask_question``. A cache hit returns the
            same object as the call that populated the cache.
        """
        start_time = time.perf_counter()

        is_cache_hit = query in self.cache
        if is_cache_hit:
            self.performance_stats["cache_hits"] += 1
            # Re-insert the entry so it becomes the most recently used one.
            result = self.cache.pop(query)
            self.cache[query] = result
        else:
            self.performance_stats["cache_misses"] += 1
            result = self.qa_system.ask_question(query)
            self.cache[query] = result
            self._evict_oldest_entries()

        duration = time.perf_counter() - start_time
        self._update_performance_stats(duration, is_cache_hit)
        return result

    def _evict_oldest_entries(self):
        """Drop least recently used entries until the cache fits its limit."""
        while len(self.cache) > self.max_cache_size:
            # dicts iterate in insertion order, so the first key is the oldest.
            del self.cache[next(iter(self.cache))]

    def _update_performance_stats(self, duration: float, is_cache_hit: bool):
        """Fold one request's duration into the running average.

        Args:
            duration: Time spent on the request, in seconds.
            is_cache_hit: Whether the request was served from the cache
                (currently not used by the statistics).
        """
        stats = self.performance_stats
        stats["total_requests"] += 1
        # Incremental mean: avg_n = avg_(n-1) + (x_n - avg_(n-1)) / n.
        stats["avg_response_time"] += (
            (duration - stats["avg_response_time"]) / stats["total_requests"]
        )

    def get_performance_report(self) -> Dict[str, Any]:
        """Return the collected statistics.

        Returns:
            A dict with request counts, cache hits/misses, the hit rate in
            percent (0.0 before any request), the average response time in
            seconds and the current cache size.
        """
        stats = self.performance_stats
        lookups = stats["cache_hits"] + stats["cache_misses"]
        # Guard the division: no lookup has happened on a fresh optimizer.
        hit_rate = stats["cache_hits"] / lookups * 100 if lookups else 0.0

        return {
            "total_requests": stats["total_requests"],
            "cache_hits": stats["cache_hits"],
            "cache_misses": stats["cache_misses"],
            "hit_rate": hit_rate,
            "avg_response_time": stats["avg_response_time"],
            "cache_size": len(self.cache)
        }
def main():
    """Entry point: build the complete RAG application and serve it.

    Initialises the QA system, loads the documents, wraps the QA system in
    the conversation system and starts the Flask development server on
    port 8000.
    """
    import os

    # 1. Initialise the system.
    print("初始化RAG系统...")
    qa_system = BasicQASystem()

    # 2. Load the documents.
    print("加载文档...")
    document_paths = [
        "data/company_annual_report_2023.pdf",
        "data/product_manuals/",
        "data/technical_specifications/"
    ]
    qa_system.load_documents(document_paths)

    # 3. Create the conversation system.
    print("创建对话系统...")
    conversation_system = AdvancedConversationSystem(qa_system)

    # 4. Start the web service.
    print("启动Web服务...")
    app = create_flask_app(conversation_system)

    # The debugger must not be reachable on a server bound to 0.0.0.0, so
    # debug mode is opt-in through the FLASK_DEBUG environment variable.
    debug = os.getenv("FLASK_DEBUG", "").lower() in ("1", "true", "yes")
    app.run(host="0.0.0.0", port=8000, debug=debug)


def create_flask_app(conversation_system):
    """Create the Flask application.

    Args:
        conversation_system: Object providing ``send_message(user_id,
            message)``. If it has a ``performance_optimizer`` attribute, its
            ``get_performance_report()`` backs the ``/metrics`` endpoint.

    Returns:
        The configured Flask app with ``/chat``, ``/health`` and ``/metrics``.
    """
    from flask import Flask, request, jsonify

    app = Flask(__name__)

    @app.route('/chat', methods=['POST'])
    def chat():
        """Chat endpoint: expects a JSON body with user_id and message."""
        # A missing, malformed or non-object JSON body is treated like an
        # empty payload, so the client gets the 400 below.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            data = {}
        user_id = data.get('user_id')
        message = data.get('message')

        if not user_id or not message:
            return jsonify({"error": "缺少必要参数"}), 400

        try:
            result = conversation_system.send_message(user_id, message)
            return jsonify(result)
        except Exception as e:
            # Top-level request boundary: log with traceback and report the
            # failure to the client.
            app.logger.exception("Chat error: %s", e)
            return jsonify({"error": str(e)}), 500

    @app.route('/health', methods=['GET'])
    def health():
        """Health check."""
        return jsonify({"status": "healthy"})

    @app.route('/metrics', methods=['GET'])
    def metrics():
        """Performance metrics."""
        # The optimizer is optional; answer with 503 instead of an
        # AttributeError when the conversation system has none attached.
        optimizer = getattr(conversation_system, "performance_optimizer", None)
        if optimizer is None:
            return jsonify({"error": "性能优化器未配置"}), 503
        return jsonify(optimizer.get_performance_report())

    return app


if __name__ == "__main__":
    main()
本章节通过完整的实战项目,从基础问答系统到企业级部署,展示了Haystack框架的完整应用场景。通过性能优化和监控机制,可以构建稳定可靠的企业级RAG系统。至此,我们已经完成了“用Haystack搭一条工业级RAG流水线”的全部内容。
关键词:实战项目, 企业级部署, 对话代理, 性能优化, Haystack
难度:高级
预计阅读:120分钟