2.2 Streaming Output Techniques



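Section guide: in this section you will learn LangGraph's streaming output techniques, including how to emit intermediate results in real time from long-running tasks to improve both user experience and system observability.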

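Learning objectives

  • Understand what streaming output is and why it matters
  • Master the different stream modes
  • Implement real-time responses and progress feedback
  • Optimize streaming performance
  • Understand how errors are handled during streaming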

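Core concepts

What is streaming output?

Streaming is LangGraph's mechanism for delivering data in real time: instead of waiting for the whole task to finish, it returns intermediate results step by step while execution is still in progress.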
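To make the contrast concrete, here is a minimal plain-Python sketch, independent of LangGraph. `run_to_completion` and `run_streaming` are hypothetical helpers and `process` stands in for real work; the streaming version is a generator, so the caller receives each result as soon as it exists.

```python
from typing import Iterator

STEPS = ["load", "clean", "analyze", "report"]


def process(step: str) -> str:
    """Stand-in for real work on one step."""
    return f"{step}: done"


def run_to_completion(steps: list[str]) -> list[str]:
    """Batch style: the caller sees nothing until every step has finished."""
    return [process(s) for s in steps]


def run_streaming(steps: list[str]) -> Iterator[str]:
    """Streaming style: each intermediate result is handed back immediately."""
    for s in steps:
        yield process(s)


if __name__ == "__main__":
    for partial in run_streaming(STEPS):
        print(partial)  # printed one at a time while later steps are still pending
```

Both functions produce the same results; only the moment at which the caller can see them differs.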

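Core value of streaming

Streaming gives agents the following key capabilities:

  1. Real-time responsiveness: users see progress immediately, which reduces the frustration of waiting
  2. Progress feedback: detailed information about how far execution has advanced
  3. Observability: long-running tasks are easier to debug and monitor
  4. Resource efficiency: results are handed off as they are produced instead of piling up in memory
  5. User experience: a smoother, more interactive experience overall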
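Item 4 can be illustrated without LangGraph: when results arrive as a stream, a consumer can aggregate them while holding only a running count and total. A minimal sketch, where `records` is a hypothetical data source and `running_stats` a hypothetical consumer:

```python
import sys
from typing import Iterable, Iterator


def records(n: int) -> Iterator[dict]:
    """Hypothetical data source that produces one record at a time."""
    for i in range(n):
        yield {"id": i, "value": i % 10}


def running_stats(stream: Iterable[dict]) -> dict:
    """Aggregate a stream while keeping only a count and a running total in memory."""
    count = total = 0
    for rec in stream:
        count += 1
        total += rec["value"]
    return {"count": count, "mean": total / count if count else 0.0}


if __name__ == "__main__":
    gen = records(100_000)
    # The generator object's size does not depend on how many records it will yield
    print(sys.getsizeof(gen), running_stats(gen))
```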

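Stream modes

LangGraph supports several stream modes, selected with the `stream_mode` argument of `stream()` / `astream()`, including:

  1. values mode: emits the complete state after each step
  2. updates mode: emits only the update each node returned, keyed by node name
  3. messages mode: emits LLM tokens as (message chunk, metadata) tuples
  4. custom mode: emits arbitrary data that nodes write through a stream writer

```python
# Comparison of stream modes
stream_modes = {
    "values": "full state after each step",
    "updates": "per-node state updates",
    "messages": "LLM token stream as (message chunk, metadata)",
    "custom": "user-defined data written from inside nodes",
}
```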
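The difference between a full-state snapshot and a per-node delta can be simulated in plain Python. This sketch is not LangGraph code: `simulate_stream` is a hypothetical helper, and it assumes every state key is simply overwritten by updates (no reducer such as `add_messages`).

```python
from typing import Iterator


def simulate_stream(initial: dict, node_updates: list[tuple[str, dict]], mode: str) -> Iterator[dict]:
    """Replay (node_name, update) pairs in "values" or "updates" style.

    Assumes every state key is overwritten by updates (no reducers).
    """
    state = dict(initial)
    for node, update in node_updates:
        state.update(update)
        if mode == "values":
            yield dict(state)           # full snapshot after the step
        elif mode == "updates":
            yield {node: dict(update)}  # only what this node changed
        else:
            raise ValueError(f"unsupported mode: {mode}")


if __name__ == "__main__":
    steps = [("fetch", {"rows": 10}), ("summarize", {"summary": "ok"})]
    for mode in ("values", "updates"):
        print(mode, list(simulate_stream({"rows": 0, "summary": ""}, steps, mode)))
```

The snapshot style is convenient for rendering the whole state; the delta style keeps each event small and tells you which node produced it.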

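Environment setup / prerequisites

Dependencies

```bash
# Core LangGraph packages
pip install -U langgraph langchain langchain-openai
# asyncio is part of the Python standard library, so it needs no pip install
# Optional: LangSmith for tracing and debugging
pip install -U langsmith
# Steps 1 and 2 call OpenAI models: set OPENAI_API_KEY before running them
```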

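Prerequisites

  • Familiarity with Python asynchronous programming (async/await)
  • A grasp of generators
  • Basic knowledge of stream processing
  • Familiarity with callback mechanisms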
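As a quick refresher on the first two prerequisites, here is a minimal async generator and its consumer. `ticker` and `collect` are hypothetical names; the `async for` loop is the same consumption pattern used with `graph.astream(...)`, which returns an async iterator.

```python
import asyncio
from typing import AsyncIterator


async def ticker(n: int, delay: float = 0.01) -> AsyncIterator[int]:
    """Async generator: yields 0..n-1, pausing without blocking the event loop."""
    for i in range(n):
        await asyncio.sleep(delay)
        yield i


async def collect(n: int) -> list[int]:
    # `async for` pulls each value as soon as the generator produces it
    return [i async for i in ticker(n)]


if __name__ == "__main__":
    print(asyncio.run(collect(3)))  # [0, 1, 2]
```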

Hands-on, step by step

Step 1: Basic streaming output

LangGraph applies a node's state update when the node finishes, so intermediate results must come either from running a node once per step or from a stream writer. Here the graph runs the `analysis` node once per analysis step and loops back until `is_complete` is true, so `values` mode emits a snapshot after every step:

```python
from typing import Annotated, TypedDict

from langchain_core.messages import AnyMessage
from langchain_openai import ChatOpenAI
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages

ANALYSIS_STEPS = [
    "Starting data analysis...",
    "Collecting raw data...",
    "Cleaning and preprocessing data...",
    "Identifying data patterns...",
    "Computing statistics...",
    "Generating the analysis report...",
    "Analysis complete",
]


class StreamingState(TypedDict):
    messages: Annotated[list[AnyMessage], add_messages]  # add_messages appends instead of overwriting
    current_step: str
    progress_data: dict
    is_complete: bool


llm = ChatOpenAI(model="gpt-4-turbo")  # reads OPENAI_API_KEY from the environment


def analysis_with_streaming(state: StreamingState):
    """Run exactly one analysis step; the graph loops back here until all steps are done."""
    completed = state["progress_data"].get("completed_steps", 0)
    step = ANALYSIS_STEPS[completed]
    print(f"📊 {step}")
    response = llm.invoke([
        {"role": "system", "content": "You are a data analysis expert. Report your analysis process step by step."},
        {"role": "user", "content": f"{step}: analyze the sales data"},
    ])
    completed += 1
    return {
        "messages": [response],
        "current_step": step,
        "progress_data": {
            "completed_steps": completed,
            "total_steps": len(ANALYSIS_STEPS),
            "progress_percent": int(completed / len(ANALYSIS_STEPS) * 100),
        },
        "is_complete": completed == len(ANALYSIS_STEPS),
    }


def route_after_analysis(state: StreamingState):
    """Loop back to the analysis node until every step has run."""
    return END if state["is_complete"] else "analysis"


# Build the graph: START -> analysis -> (analysis | END)
streaming_graph = StateGraph(StreamingState)
streaming_graph.add_node("analysis", analysis_with_streaming)
streaming_graph.add_edge(START, "analysis")
streaming_graph.add_conditional_edges("analysis", route_after_analysis, ["analysis", END])
compiled_streaming_graph = streaming_graph.compile()

print("=== Basic streaming test ===")
initial_state = {"messages": [], "current_step": "start", "progress_data": {}, "is_complete": False}

# "values" mode yields the full state after each step; the first chunk may be the
# input state, which has no progress yet, hence .get(..., 0)
for chunk in compiled_streaming_graph.stream(initial_state, stream_mode="values"):
    print(f"Progress: {chunk['progress_data'].get('progress_percent', 0)}%")
    print(f"Current step: {chunk['current_step']}")
    print("-" * 40)
```
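The progress bookkeeping in this step can be checked without an API key. This plain-Python sketch mimics a loop that runs one step per iteration and emits the full state each time, roughly what a `values`-mode consumer observes. `one_step` and `run_until_complete` are hypothetical stand-ins, not LangGraph functions, and the percentage uses integer arithmetic as a design choice.

```python
from typing import Iterator

STEPS = ["collect data", "clean data", "compute statistics", "write report"]


def one_step(state: dict) -> dict:
    """Process exactly one pending step and return only the changed fields."""
    done = state["completed_steps"] + 1
    return {
        "current_step": STEPS[done - 1],
        "completed_steps": done,
        "progress_percent": done * 100 // len(STEPS),  # integer math avoids float rounding surprises
        "is_complete": done == len(STEPS),
    }


def run_until_complete(state: dict) -> Iterator[dict]:
    """Re-run one_step until is_complete, yielding the full state after every step."""
    while not state["is_complete"]:
        state = {**state, **one_step(state)}
        yield state


if __name__ == "__main__":
    start = {"current_step": "start", "completed_steps": 0, "progress_percent": 0, "is_complete": False}
    for snapshot in run_until_complete(start):
        print(f"{snapshot['progress_percent']}% - {snapshot['current_step']}")
```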

Step 2: Message streaming mode

In `messages` mode LangGraph streams the tokens of chat-model calls made inside nodes as `(message_chunk, metadata)` tuples. Text that a node merely prints is not part of this stream, so the node below calls the model directly:

```python
import uuid
from typing import Annotated, TypedDict

from langchain_core.messages import AnyMessage
from langchain_openai import ChatOpenAI
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages


class ChatStreamingState(TypedDict):
    messages: Annotated[list[AnyMessage], add_messages]  # stores message objects, not dicts
    session_id: str
    response_complete: bool


chat_llm = ChatOpenAI(model="gpt-4-turbo")


def chat_with_streaming(state: ChatStreamingState):
    """Chat node: a single LLM call whose tokens "messages" mode streams as they are generated."""
    response = chat_llm.invoke(state["messages"])
    # The node finishes only once the full reply exists, so completion is set here
    return {"messages": [response], "response_complete": True}


# Build the message-streaming graph
chat_streaming_graph = StateGraph(ChatStreamingState)
chat_streaming_graph.add_node("chat_response", chat_with_streaming)
chat_streaming_graph.add_edge(START, "chat_response")
chat_streaming_graph.add_edge("chat_response", END)
compiled_chat_graph = chat_streaming_graph.compile()

print("\n=== Message streaming test ===")
initial_chat_state = {
    "messages": [{"role": "user", "content": "How can I optimize the performance of a large AI system?"}],
    "session_id": f"chat_session_{uuid.uuid4().hex[:8]}",
    "response_complete": False,
}
print(f"🧑 User: {initial_chat_state['messages'][0]['content']}")
print("🤖 Assistant: ", end="", flush=True)

# Each chunk is a (message_chunk, metadata) tuple; metadata names the emitting node
for message_chunk, metadata in compiled_chat_graph.stream(initial_chat_state, stream_mode="messages"):
    if metadata.get("langgraph_node") == "chat_response" and message_chunk.content:
        print(message_chunk.content, end="", flush=True)
print()
```
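A `response_complete` flag should only turn true after the final chunk has arrived. A plain-Python sketch of that accumulation using one-chunk lookahead; `accumulate` is a hypothetical helper and the string chunks stand in for model tokens.

```python
from typing import Iterable, Iterator


def accumulate(chunks: Iterable[str]) -> Iterator[dict]:
    """Yield the growing response after each chunk; flag completion only on the last one."""
    text = ""
    it = iter(chunks)
    pending = next(it, None)
    while pending is not None:
        text += pending
        pending = next(it, None)  # look one chunk ahead to learn whether this was the last
        yield {"content": text, "response_complete": pending is None}


if __name__ == "__main__":
    for snapshot in accumulate(["Hel", "lo", ", wor", "ld!"]):
        print(snapshot)
```

The lookahead costs one chunk of latency for the flag only; the text itself is still available as soon as each chunk arrives.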

Step 3: Custom streaming output

`custom` mode streams whatever a node passes to the writer obtained from `get_stream_writer()`; the node still returns a single state update when it finishes:

```python
import time
from typing import TypedDict

from langgraph.config import get_stream_writer
from langgraph.graph import StateGraph, START, END


class CustomStreamingState(TypedDict):
    data_points: list
    processing_results: dict
    metrics: dict


def custom_format_output(chunk: dict) -> dict:
    """Shape one progress event into the custom streaming format."""
    return {
        "timestamp": time.time(),
        "data_count": len(chunk.get("data_points", [])),
        "results_summary": len(chunk.get("processing_results", {})),
        "metrics_summary": chunk.get("metrics", {}),
        "custom_format": "custom streaming format",
    }


def data_processing_with_custom_streaming(state: CustomStreamingState):
    """Process each source and push a custom progress event as soon as it finishes."""
    writer = get_stream_writer()  # requires a LangGraph release that provides langgraph.config.get_stream_writer
    data_sources = ["API data", "database data", "file data", "cache data"]
    processing_results, metrics = {}, {}
    for i, source in enumerate(data_sources):
        print(f"🔄 Processing {source}...")
        time.sleep(0.8)  # simulate processing
        processing_results[f"result_{i + 1}"] = {
            "source": source,
            "status": "completed",
            "processing_time": 0.8,
            "data_size": f"{i + 1}MB",
            "quality_score": min(95 + i * 2, 100),
        }
        metrics = {
            "processed_sources": i + 1,
            "total_sources": len(data_sources),
            "avg_processing_time": 0.8,
            "total_data_size": f"{sum(range(i + 2))}MB",  # 1 + 2 + ... + (i + 1) MB
        }
        # Delivered immediately to consumers using stream_mode="custom"
        writer(custom_format_output({
            "data_points": data_sources[: i + 1],
            "processing_results": processing_results,
            "metrics": metrics,
        }))
    # The graph state itself is updated once, when the node returns
    return {"data_points": data_sources, "processing_results": processing_results, "metrics": metrics}


# Build the custom streaming graph
custom_streaming_graph = StateGraph(CustomStreamingState)
custom_streaming_graph.add_node("custom_processing", data_processing_with_custom_streaming)
custom_streaming_graph.add_edge(START, "custom_processing")
custom_streaming_graph.add_edge("custom_processing", END)
compiled_custom_graph = custom_streaming_graph.compile()

print("\n=== Custom streaming test ===")
initial_custom_state = {"data_points": [], "processing_results": {}, "metrics": {}}

# In "custom" mode each chunk is exactly what the node passed to writer(...)
for custom_chunk in compiled_custom_graph.stream(initial_custom_state, stream_mode="custom"):
    print(f"Timestamp: {custom_chunk['timestamp']}")
    print(f"Sources processed: {custom_chunk['data_count']}")
    print(f"Metrics summary: {custom_chunk['metrics_summary']}")
    print(f"Format: {custom_chunk['custom_format']}")
    print("-" * 50)
```
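Conceptually, a stream writer decouples the producer (the node) from the consumer (the loop over the stream). The following stdlib-only sketch reproduces that idea with a thread and a queue; `stream_custom` and `process_sources` are hypothetical names, and this is an analogy for illustration, not how LangGraph implements its writer.

```python
import queue
import threading
import time
from typing import Callable, Iterator

_DONE = object()  # sentinel that marks the end of the stream


def stream_custom(task: Callable[[Callable[[dict], None]], None]) -> Iterator[dict]:
    """Run `task` in a worker thread and yield each event it writes as soon as it is written.

    `task` receives a `write` callback, loosely analogous to a stream writer.
    """
    events: queue.Queue = queue.Queue()

    def worker() -> None:
        try:
            task(events.put)
        finally:
            events.put(_DONE)  # unblock the consumer even if `task` raises

    threading.Thread(target=worker, daemon=True).start()
    while (item := events.get()) is not _DONE:
        yield item


def process_sources(write: Callable[[dict], None]) -> None:
    """Example task: report progress after each (simulated) data source."""
    sources = ["API data", "database data", "file data"]
    for i, source in enumerate(sources, start=1):
        time.sleep(0.01)  # simulate work
        write({"source": source, "processed": i, "total": len(sources)})


if __name__ == "__main__":
    for event in stream_custom(process_sources):
        print(f"{event['processed']}/{event['total']}: {event['source']}")
```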

Step 4: Optimizing streaming performance

Batching keeps the number of streamed events manageable, and running the items of each batch concurrently with `asyncio.gather` cuts wall-clock time. Progress is reported once per batch through the custom stream:

```python
import asyncio
from typing import TypedDict

from langgraph.config import get_stream_writer
from langgraph.graph import StateGraph, START, END


class OptimizedStreamingState(TypedDict):
    batch_data: list
    processing_results: list
    performance_metrics: dict
    optimization_status: str


async def async_data_processing(item: dict) -> dict:
    """Process one item; asyncio.sleep stands in for non-blocking I/O."""
    await asyncio.sleep(0.3)
    return {"item_id": item["id"], "processed": True, "processing_time": 0.3, "result": f"processed_{item['data']}"}


async def process_batch(batch: list) -> list:
    """Run every item of one batch concurrently."""
    return await asyncio.gather(*(async_data_processing(item) for item in batch))


def batch_optimized_streaming(state: OptimizedStreamingState):
    """Process items in batches of 5 and stream a progress event after each batch."""
    writer = get_stream_writer()
    batch_data = state["batch_data"]
    batch_size = 5
    total_batches = (len(batch_data) + batch_size - 1) // batch_size
    results = []
    print(f"🚀 Starting batch processing of {len(batch_data)} items")
    for batch_number, batch_start in enumerate(range(0, len(batch_data), batch_size), start=1):
        current_batch = batch_data[batch_start:batch_start + batch_size]
        print(f"📦 Processing batch {batch_number}: {len(current_batch)} items")
        # asyncio.run creates and closes a fresh event loop per batch; it may raise
        # RuntimeError if the calling thread already runs a loop (e.g. in Jupyter)
        results.extend(asyncio.run(process_batch(current_batch)))
        writer({
            "processed_count": len(results),
            "total_count": len(batch_data),
            "batch_number": batch_number,
            "total_batches": total_batches,
            "estimated_completion": len(results) / len(batch_data) * 100,
            "optimization_status": f"Batch {batch_number} complete",
        })
    return {
        "processing_results": results,
        "performance_metrics": {
            "processed_count": len(results),
            "total_count": len(batch_data),
            "batches_completed": total_batches,
            "avg_time_per_item": 0.3,
            # Items in a batch overlap, so wall time is about one delay per batch
            "approx_wall_time_s": total_batches * 0.3,
        },
        "optimization_status": "Batch processing complete",
    }


# Build the optimized streaming graph
optimized_graph = StateGraph(OptimizedStreamingState)
optimized_graph.add_node("batch_processing", batch_optimized_streaming)
optimized_graph.add_edge(START, "batch_processing")
optimized_graph.add_edge("batch_processing", END)
compiled_optimized_graph = optimized_graph.compile()

print("\n=== Performance-optimized streaming test ===")
test_data = [{"id": i, "data": f"data_{i}"} for i in range(15)]  # 15 items -> 3 batches
initial_optimized_state = {
    "batch_data": test_data,
    "processing_results": [],
    "performance_metrics": {},
    "optimization_status": "started",
}

for optimized_chunk in compiled_optimized_graph.stream(initial_optimized_state, stream_mode="custom"):
    print(f"Progress: {optimized_chunk['estimated_completion']:.1f}%")
    print(f"Processed: {optimized_chunk['processed_count']}/{optimized_chunk['total_count']}")
    print(f"Batch status: {optimized_chunk['optimization_status']}")
    print("-" * 30)
```
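The batching pattern on its own, as a stdlib-only sketch: `handle`, `chunked` and `run_batches` are hypothetical helpers, and `asyncio.sleep` stands in for real I/O. Items inside a batch overlap, while batches run one after another, so total wall time scales with the number of batches rather than the number of items.

```python
import asyncio
import time


async def handle(item: int, delay: float = 0.05) -> int:
    """Simulated async I/O call that returns the item squared."""
    await asyncio.sleep(delay)
    return item * item


def chunked(items: list, size: int) -> list[list]:
    """Split items into consecutive batches of at most `size` elements."""
    return [items[i:i + size] for i in range(0, len(items), size)]


async def run_batches(items: list[int], size: int) -> list[int]:
    """Process batches sequentially; items within a batch run concurrently."""
    results: list[int] = []
    for batch in chunked(items, size):
        # gather preserves input order in its result list
        results.extend(await asyncio.gather(*(handle(x) for x in batch)))
    return results


if __name__ == "__main__":
    start = time.perf_counter()
    out = asyncio.run(run_batches(list(range(15)), 5))
    # 3 batches of overlapping 0.05 s waits: roughly 0.15 s rather than 15 x 0.05 s
    print(len(out), f"{time.perf_counter() - start:.2f}s")
```

The batch size is the knob: larger batches mean fewer progress events and more concurrency, smaller batches mean finer-grained feedback.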

Step 5: Error handling in streaming

The node handles failures per operation, attempts one recovery, and streams a status event for each operation so the consumer sees problems as they happen. Combining `custom` with `values` also delivers the final state, including the summary message:

```python
import random
import time
from typing import Annotated, TypedDict

from langchain_core.messages import AIMessage, AnyMessage
from langgraph.config import get_stream_writer
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages


class ErrorHandlingStreamingState(TypedDict):
    messages: Annotated[list[AnyMessage], add_messages]
    error_count: int
    successful_operations: list
    failed_operations: list
    recovery_attempts: list


OPERATIONS = [
    {"id": 1, "operation": "data validation", "success_rate": 0.9},
    {"id": 2, "operation": "data cleaning", "success_rate": 0.8},
    {"id": 3, "operation": "data analysis", "success_rate": 0.7},
    {"id": 4, "operation": "report generation", "success_rate": 0.95},
    {"id": 5, "operation": "result verification", "success_rate": 0.85},
]


def resilient_streaming_operation(state: ErrorHandlingStreamingState):
    """Run each operation, try one recovery on failure, and stream a status event per operation."""
    writer = get_stream_writer()
    successful, failed, recoveries, messages = [], [], [], []

    def report(text: str) -> None:
        messages.append(AIMessage(content=text))
        writer({"message": text, "error_count": len(failed),
                "successful": len(successful), "recovery_attempts": len(recoveries)})

    for op in OPERATIONS:
        op_id, name = op["id"], op["operation"]
        print(f"🔄 Running operation: {name} (ID: {op_id})")
        time.sleep(1)  # simulate operation time
        if random.random() < op["success_rate"]:  # simulated outcome
            successful.append({"id": op_id, "operation": name, "status": "completed", "timestamp": time.time()})
            report(f"Operation {name} completed successfully")
            continue
        failed.append({"id": op_id, "operation": name, "status": "failed", "error": "simulated error", "timestamp": time.time()})
        recovered = random.random() < 0.8  # 80% recovery rate
        recoveries.append({"operation_id": op_id, "operation": name, "recovery_success": recovered, "timestamp": time.time()})
        if recovered:
            successful.append({"id": op_id, "operation": name, "status": "recovered", "timestamp": time.time()})
            report(f"Operation {name} failed and was recovered")
        else:
            report(f"Operation {name} failed; recovery failed, skipping it")

    recovered_count = sum(r["recovery_success"] for r in recoveries)
    messages.append(AIMessage(content=(
        "Execution summary:\n"
        f"- Total operations: {len(OPERATIONS)}\n"
        f"- Successful (including recovered): {len(successful)}\n"
        f"- Failed on first attempt: {len(failed)}\n"
        f"- Recovered: {recovered_count}\n"
        f"- Recovery failed: {len(recoveries) - recovered_count}\n"
        f"- Success rate: {len(successful) / len(OPERATIONS) * 100:.1f}%"
    )))
    return {"messages": messages, "error_count": len(failed), "successful_operations": successful,
            "failed_operations": failed, "recovery_attempts": recoveries}


# Build the error-handling streaming graph
error_handling_graph = StateGraph(ErrorHandlingStreamingState)
error_handling_graph.add_node("resilient_operation", resilient_streaming_operation)
error_handling_graph.add_edge(START, "resilient_operation")
error_handling_graph.add_edge("resilient_operation", END)
compiled_error_graph = error_handling_graph.compile()

print("\n=== Error-handling streaming test ===")
initial_error_state = {
    "messages": [{"role": "user", "content": "Start the streaming run with error handling"}],
    "error_count": 0, "successful_operations": [], "failed_operations": [], "recovery_attempts": [],
}

# With a list of modes, each chunk is a (mode, data) tuple
final_state = None
for mode, error_chunk in compiled_error_graph.stream(initial_error_state, stream_mode=["custom", "values"]):
    if mode == "custom":
        print(f"Errors so far: {error_chunk['error_count']}")
        print(f"Successful operations: {error_chunk['successful']}")
        print(f"Recovery attempts: {error_chunk['recovery_attempts']}")
        print(f"Latest message: {error_chunk['message'][:100]}")
        print("-" * 40)
    else:
        final_state = error_chunk  # the last "values" chunk is the final state

# State messages are message objects, so read .content rather than ["content"]
print(final_state["messages"][-1].content)
```
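The recovery policy itself can be tested deterministically if the success check is injected instead of drawn from `random` inside the loop. A minimal sketch; `run_with_recovery`, `RecoveryReport` and the `attempt(op, n)` callback are hypothetical names, with `n` = 0 for the first try and 1 onward for retries.

```python
import random
from dataclasses import dataclass, field
from typing import Callable


@dataclass
class RecoveryReport:
    succeeded: list[str] = field(default_factory=list)
    recovered: list[str] = field(default_factory=list)
    skipped: list[str] = field(default_factory=list)

    @property
    def success_rate(self) -> float:
        """Percentage of operations that ended successfully, recovered ones included."""
        total = len(self.succeeded) + len(self.recovered) + len(self.skipped)
        return 100.0 * (len(self.succeeded) + len(self.recovered)) / total if total else 0.0


def run_with_recovery(ops: list[str], attempt: Callable[[str, int], bool], max_retries: int = 1) -> RecoveryReport:
    """Try each operation; on failure retry up to `max_retries` times, then skip it and continue."""
    report = RecoveryReport()
    for op in ops:
        if attempt(op, 0):
            report.succeeded.append(op)
        elif any(attempt(op, n) for n in range(1, max_retries + 1)):  # stops at the first successful retry
            report.recovered.append(op)
        else:
            report.skipped.append(op)
    return report


if __name__ == "__main__":
    rng = random.Random(42)  # seeded so reruns produce the same outcomes
    rep = run_with_recovery(["validate", "clean", "analyze"], lambda op, n: rng.random() < 0.7)
    print(rep, f"{rep.success_rate:.1f}%")
```

Keeping succeeded, recovered and skipped as disjoint lists makes the counts add up to the number of operations, which avoids double-counting a recovered operation as both a success and a failure.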
