1.1 LangGraph概述与架构设计 本节导读:通过本节学习,你将掌握LangGraph的核心定位和架构优势,理解其与传统智能体框架的本质区别,能够根据项目需求判断是否选择LangGraph。 学习目标 理解LangGraph的定位:低级编排框架vs高级智能体框架 掌握LangGraph的四大核心能力:持久化执行、流式输出、人类在回路、综合记忆 能够对比分析LangGraph与LangChain的适用场景 学会根据项目复杂度选择合适的技术栈 核心概念 LangGraph是什么?
本节导读:通过本节学习,你将掌握LangGraph的核心定位和架构优势,理解其与传统智能体框架的本质区别,能够根据项目需求判断是否选择LangGraph。
LangGraph是什么?LangGraph是一个低级编排框架(Low-level Orchestration Framework)和运行时,专门用于构建、管理和部署长时间运行、有状态(long-running, stateful)的AI智能体。
与传统智能体框架的根本区别在于:
| 特性 | LangChain | LangGraph |
|---|---|---|
| 抽象层次 | 高级抽象,开箱即用 | 低级控制,灵活定制 |
| 架构设计 | 预定义智能体循环 | 自定义图结构 |
| 适用场景 | 简单到中等复杂度任务 | 复杂、长时间运行任务 |
| 学习曲线 | 平缓,快速上手 | 陡峭,深度控制 |
| 核心优势 | 快速开发、集成丰富 | 状态持久化、流式处理 |
LangGraph专注于四个关键能力,这些能力对于构建复杂的长时间运行任务至关重要:
# LangGraph支持故障恢复和长时间运行 from langgraph.graph import StateGraph, START, END # 状态持久化:即使中间失败也能恢复 graph = StateGraph(AgentState) graph.add_node("process_data", process_data_node) graph.add_edge(START, "process_data") graph.add_edge("process_data", END) # 编译时指定检查点策略 compiled_graph = graph.compile(checkpointer=MemorySaver())
# 实时输出中间结果,支持多种流式模式 for chunk in graph.stream({"messages": [{"role": "user", "content": "分析销售数据"}]}, stream_mode="values"): print("当前状态:", chunk) # 或流式输出消息 for message in graph.stream({"messages": user_input}, stream_mode="messages"): print(message.content, end="", flush=True)
# 在任意暂停点加入人工干预 graph.add_node("human_review", human_review_node) graph.add_edge("human_review", "continue_processing") # 编译时配置中断策略 interrupt_before=["human_review"]
# 短期工作记忆 + 长期持久化记忆 class AgentState(TypedDict): messages: Annotated[Sequence[BaseMessage], add_messages] working_memory: dict # 短期记忆 persistent_memory: dict # 长期记忆
# 基础安装 pip install -U langgraph # 可选依赖:LangChain集成 pip install -U langchain langchain-openai langchain-anthropic # 可选依赖:持久化存储 pip install -U redis psycopg2-binary
from langgraph.graph import StateGraph, MessagesState, START, END from langchain_openai import ChatOpenAI # 定义状态 class State(MessagesState): pass # 创建LLM节点 def llm_node(state: State): llm = ChatOpenAI(model="gpt-4-turbo") response = llm.invoke(state["messages"]) return {"messages": [response]} # 构建图 graph = StateGraph(State) graph.add_node("llm", llm_node) graph.add_edge(START, "llm") graph.add_edge("llm", END) # 编译并运行 compiled_graph = graph.compile() result = compiled_graph.invoke({"messages": [{"role": "user", "content": "你好!"}]}) print(result["messages"][-1].content)
from langgraph.checkpoint.memory import MemorySaver # 创建带检查点的图 checkpointer = MemorySaver() def process_step(state: State): # 模拟复杂处理 last_message = state["messages"][-1] processed = f"处理后的内容: {last_message.content}" return {"messages": [{"role": "assistant", "content": processed}]} graph = StateGraph(State) graph.add_node("process", process_step) graph.add_edge(START, "process") graph.add_edge("process", END) # 编译时启用检查点 compiled_graph = graph.compile(checkpointer=checkpointer) # 第一次运行 initial_state = {"messages": [{"role": "user", "content": "第一步处理"}]} result1 = compiled_graph.invoke(initial_state) # 第二次运行(从检查点恢复) result2 = compiled_graph.invoke(initial_state)
# 创建支持流式输出的智能体 def analysis_node(state: State): llm = ChatOpenAI(model="gpt-4-turbo") # 模拟逐步分析过程 steps = [ "开始分析数据...", "识别关键模式...", "生成洞察报告...", "完成分析" ] messages = [] for step in steps: response = llm.invoke([ {"role": "system", "content": "你是数据分析专家,逐步报告你的分析过程"}, {"role": "user", "content": f"{step}: {state['messages'][0].content}"} ]) messages.append(response) return {"messages": messages} graph = StateGraph(State) graph.add_node("analysis", analysis_node) graph.add_edge(START, "analysis") graph.add_edge("analysis", END) # 使用流式模式 compiled_graph = graph.compile() # 流式执行 for chunk in compiled_graph.stream({"messages": [{"role": "user", "content": "分析销售趋势"}]}): print("当前状态:", chunk)
# 定义人工审核节点 def human_review_node(state: State): print("需要人工审核以下内容:") for message in state["messages"]: print(f"- {message.content}") # 模拟用户输入 user_decision = input("是否批准? (y/n): ") return {"messages": [{"role": "assistant", "content": f"用户决策: {user_decision}"}]} # 构建包含人工审核的图 graph = StateGraph(State) graph.add_node("process", process_step) graph.add_node("review", human_review_node) graph.add_edge(START, "process") graph.add_edge("process", "review") graph.add_edge("review", END) # 编译时配置中断 compiled_graph = graph.compile(checkpointer=checkpointer, interrupt_before=["review"]) # 执行到中断点 result = compiled_graph.invoke({"messages": [{"role": "user", "content": "需要审核的决策"}]}) # 人工审核后继续 final_result = compiled_graph.invoke(result)
from langgraph.graph import StateGraph, START, END from langgraph.checkpoint.memory import MemorySaver from langchain_openai import ChatOpenAI from typing import TypedDict, Annotated, Sequence import time class AnalysisState(TypedDict): messages: Annotated[Sequence[dict], add_messages] current_step: str analysis_results: dict user_feedback: str class DataAnalysisAgent: def __init__(self): self.llm = ChatOpenAI(model="gpt-4-turbo") self.checkpointer = MemorySaver() self.graph = self._build_graph() self.compiled_graph = self.graph.compile(checkpointer=self.checkpointer) def _build_graph(self): def data_collection(state: AnalysisState): print("📊 正在收集数据...") time.sleep(1) return { "messages": [{"role": "assistant", "content": "数据收集完成"}], "current_step": "data_collection", "analysis_results": {"data_collected": True} } def data_analysis(state: AnalysisState): print("🔍 正在分析数据...") time.sleep(2) return { "messages": [{"role": "assistant", "content": "数据分析完成,发现3个关键趋势"}], "current_step": "data_analysis", "analysis_results": {"trends_found": 3} } def human_review(state: AnalysisState): print("👤 需要人工审核分析结果") print("分析结果:", state["analysis_results"]) return { "messages": [{"role": "assistant", "content": "请审核分析结果"}], "current_step": "human_review" } def final_report(state: AnalysisState): feedback = state.get("user_feedback", "无反馈") report = f""" 最终分析报告: - 数据收集状态: 完成 - 发现趋势数量: {state['analysis_results'].get('trends_found', 0)} - 用户反馈: {feedback} - 生成时间: {time.strftime('%Y-%m-%d %H:%M:%S')} """ return { "messages": [{"role": "assistant", "content": report}], "current_step": "final_report" } graph = StateGraph(AnalysisState) graph.add_node("data_collection", data_collection) graph.add_node("data_analysis", data_analysis) graph.add_node("human_review", human_review) graph.add_node("final_report", final_report) graph.add_edge(START, "data_collection") graph.add_edge("data_collection", "data_analysis") graph.add_edge("data_analysis", "human_review") graph.add_edge("human_review", "final_report") graph.add_edge("final_report", END) return graph def run_analysis(self, user_request: str): return self.compiled_graph.invoke({ "messages": [{"role": "user", "content": user_request}], "current_step": "start", "analysis_results": {}, "user_feedback": "" }) # 使用示例 if __name__ == "__main__": agent = DataAnalysisAgent() # 第一次运行 result1 = agent.run_analysis("分析2024年Q1销售数据") print("第一次完成:", result1["current_step"]) # 第二次运行(从检查点恢复,模拟人工审核) result2 = agent.run_analysis("分析2024年Q1销售数据") print("最终结果:", result2["messages"][-1]["content"])
A:根据项目复杂度选择:
选择建议:从LangChain开始,当遇到其限制时再迁移到LangGraph。
A:相对较陡峭,主要挑战在于:
建议:先掌握LangChain基础,再学习LangGraph的高级特性。
A:不是必须的。LangGraph可以独立使用,但与LangChain集成后会更加强大:
A:LangGraph提供多种状态管理方案:
通过本节学习,我们掌握了LangGraph的核心定位和架构优势。LangGraph作为低级编排框架,在状态持久化、流式输出、人类协作和综合记忆方面具有独特优势,特别适合构建复杂的长时间运行AI智能体。
下一节我们将深入探讨LangGraph的基础组件——状态图(State Graph),这是理解LangGraph架构的关键。
关键词:LangGraph, 低级编排框架, 状态持久化, 流式输出, 人类在回路, 智能体架构
难度:入门
预计阅读:25分钟