第 9 章 智能体集成 memU 不是孤立的记忆库——本章讲如何把它嵌入 LangGraph 工具链、OpenAI 兼容客户端,以及 memU Cloud 托管 API,形成可上线的智能体记忆层。 9.1 集成模式总览 选型建议: 模式 | 何时用 自托管 SDK | 数据在内网、需定制 Pipeline LangGraph 工具 | 已用 LangChain 生态 Cloud API | 快速验证、免运维 OpenAI Wrapper | 最小改动接入现有 OpenAI 调用 9.
memU 不是孤立的记忆库——本章讲如何把它嵌入 LangGraph 工具链、OpenAI 兼容客户端,以及 memU Cloud 托管 API,形成可上线的智能体记忆层。
┌────────────────────────────────────────────────────────┐ │ 你的智能体应用 │ ├─────────────┬─────────────────┬────────────────────────┤ │ 自托管 SDK │ LangGraph 工具 │ Cloud REST API │ │ MemoryService│ save/search │ /api/v3/memory/* │ ├─────────────┴─────────────────┴────────────────────────┤ │ OpenAI Wrapper(自动注入记忆) │ └────────────────────────────────────────────────────────┘
选型建议:
| 模式 | 何时用 |
|---|---|
| 自托管 SDK | 数据在内网、需定制 Pipeline |
| LangGraph 工具 | 已用 LangChain 生态 |
| Cloud API | 快速验证、免运维 |
| OpenAI Wrapper | 最小改动接入现有 OpenAI 调用 |
import asyncio from memu import MemoryService async def agent_turn(service, user_id, user_message): # 1. 检索相关记忆 context = await service.retrieve( queries=[{"role": "user", "content": {"text": user_message}}], where={"user_id": user_id}, ) # 2. 组装 Prompt(概念性) memory_block = format_context(context) system = f"你是助手。相关记忆:\n{memory_block}" reply = await call_main_llm(system, user_message) # 3. 可选:把本轮对话写回记忆 conversation = [ {"role": "user", "content": user_message}, {"role": "assistant", "content": reply}, ] # 将 conversation 序列化后上传到可访问 URL,再 memorize await service.memorize( resource_url="https://temp-storage.example.com/last-turn.json", modality="conversation", user={"user_id": user_id}, ) return reply
节奏:
安装:
pip install "memu-py[langgraph]"
memU 提供 LangChain / LangGraph 适配工具,典型暴露两个 tool:
| Tool | 作用 |
|---|---|
save_memory |
包装 memorize |
search_memory |
包装 retrieve |
概念性用法:
from memu.integrations.langgraph import create_memory_tools from memu import MemoryService service = MemoryService(...) tools = create_memory_tools(service) # 将 tools 注册进 LangGraph agent # agent 可在推理中自主决定何时存/取记忆
智能体在 ReAct 循环中:
search_memorysave_memory 沉淀 tool / skill 记忆memU 提供 opt-in 的 OpenAI 客户端包装:在每次 chat completion 前自动 retrieve,把记忆注入 system context。
概念性初始化:
from memu.client.openai_wrapper import MemUOpenAI client = MemUOpenAI( api_key="your_openai_key", memu_service=service, user_id="u123", ) response = await client.chat.completions.create( model="gpt-4o-mini", messages=[{"role": "user", "content": "继续我们上次的讨论"}], )
适合:已有 OpenAI 调用栈,希望最小侵入加记忆。
| 项目 | 值 |
|---|---|
| Base URL | https://api.memu.so |
| Header | Authorization: Bearer YOUR_API_KEY |
| 方法 | 路径 | 说明 |
|---|---|---|
| POST | /api/v3/memory/memorize |
提交摄入任务 |
| GET | /api/v3/memory/memorize/status/{task_id} |
轮询任务状态 |
| POST | /api/v3/memory/categories |
列出自动类别 |
| POST | /api/v3/memory/retrieve |
检索记忆 |
import httpx import asyncio BASE = "https://api.memu.so" HEADERS = {"Authorization": "Bearer YOUR_API_KEY"} async def cloud_memorize(payload): async with httpx.AsyncClient() as client: # 1. 提交任务 resp = await client.post( f"{BASE}/api/v3/memory/memorize", headers=HEADERS, json=payload, ) task_id = resp.json()["task_id"] # 2. 轮询状态 while True: status = await client.get( f"{BASE}/api/v3/memory/memorize/status/{task_id}", headers=HEADERS, ) data = status.json() if data["status"] == "completed": return data["result"] if data["status"] == "failed": raise RuntimeError(data.get("error")) await asyncio.sleep(2) async def cloud_retrieve(query, user_id): async with httpx.AsyncClient() as client: resp = await client.post( f"{BASE}/api/v3/memory/retrieve", headers=HEADERS, json={ "queries": [{"role": "user", "content": {"text": query}}], "where": {"user_id": user_id}, }, ) return resp.json()
Cloud 与自托管返回结构对齐,便于 hybrid 迁移。
| 组件 | 作用 |
|---|---|
| memU(核心) | 摄入、提取、检索 |
| memU-server | 实时同步、Webhook |
| memU-ui | 可视化浏览记忆 |
| memU Cloud | 全托管 API |
架构建议:
Agent App ──► memU-server ──► Postgres │ ▼ memU-ui(运维 / 调试)
memU README 未默认提供 MCP Server,但你可以:
retrieve() / memorize()概念性 MCP tool 定义:
async def search_memory_tool(query: str, user_id: str) -> str: ctx = await service.retrieve( queries=[{"role": "user", "content": {"text": query}}], where={"user_id": user_id}, ) return format_context(ctx)
通过 scope 隔离不同 agent:
# 编码 agent 的记忆 await service.memorize(..., user={"user_id": "proj_x", "agent_id": "coder"}) # 产品 agent 的记忆 await service.memorize(..., user={"user_id": "proj_x", "agent_id": "pm"}) # 检索时可指定 agent_id await service.retrieve(..., where={"user_id": "proj_x", "agent_id": "coder"})
同项目多角色共存,互不污染。
| 风险 | 对策 |
|---|---|
| 记忆泄露 | 严格 where;租户级 DB 隔离 |
| PII 存储 | memorize 前脱敏;访问审计 |
| Prompt 注入 | retrieve 结果当数据,不当指令 |
| API Key | 环境变量;Cloud 用短期 Token |
agent_id scope 隔离。下一章:第 10 章 — 工程实践与进阶拓展。
回顾:第 6 章 retrieve;第 8 章 Postgres。