第 5 章 memorize 写入流水线 是 memU 的 WRITE 路径:把一份原始来源编译进结构化记忆。本章按流水线七步拆解原理,并给出可落地的调用模式。 5.1 流水线总览 每一步都是 Workflow 中的一个 ,带明确的输入/输出键与能力标签(llm / vector / db / io / vision)。 5.2 第一步:ingestresource 作用:根据 获取原始字节,登记为 Resource 记录。
memorize()是 memU 的 WRITE 路径:把一份原始来源编译进结构化记忆。本章按流水线七步拆解原理,并给出可落地的调用模式。
resource_url + modality │ ▼ ┌───────────────┐ │ ingest_resource│ 拉取本地/远程文件到 blob 存储 └───────┬───────┘ ▼ ┌───────────────────┐ │ preprocess_multimodal│ 解析文本 / 视觉描述 / 音频转写 └───────┬───────────┘ ▼ ┌───────────────┐ │ extract_items │ LLM 按 memory_type 提取结构化条目 └───────┬───────┘ ▼ ┌───────────────┐ │ dedupe_merge │ 去重合并(当前为占位 pass-through) └───────┬───────┘ ▼ ┌───────────────┐ │ categorize_items│ 持久化 Resource + Items + Relations + Embeddings └───────┬───────┘ ▼ ┌───────────────┐ │ persist_index │ 更新 Category 摘要 └───────┬───────┘ ▼ ┌───────────────┐ │ build_response │ 组装返回字典 └───────────────┘
每一步都是 Workflow 中的一个 WorkflowStep,带明确的输入/输出键与能力标签(llm / vector / db / io / vision)。
作用:根据 resource_url 获取原始字节,登记为 Resource 记录。
result = await service.memorize( resource_url="https://cdn.example.com/standup-notes.md", modality="document", )
支持的 URL 类型:
| 类型 | 示例 |
|---|---|
| HTTP/HTTPS | https://example.com/file.pdf |
| 本地路径 | 开发机上的绝对或相对路径 |
| 已挂载存储 | 配置 blob 存储后的内部地址 |
modality 参数告诉预处理管道用哪种解析器:
| modality | 预处理行为 |
|---|---|
conversation |
解析 JSON / 文本对话格式 |
document |
提取纯文本(Markdown、TXT、日志等) |
image |
Vision 模型生成 caption |
video |
抽帧 / 描述生成 caption |
audio |
Transcribe 模型转文字 |
modality 应与真实内容匹配,否则提取质量下降。
预处理产出 模态无关的文本描述,作为后续 LLM 提取的输入:
conversation → "User: ... Assistant: ..." document → 正文纯文本 image → "Screenshot showing a dashboard with ..." audio → 转写文本 video → 内容摘要 / 关键帧描述
该描述同时写入 Resource 的 caption 字段,供检索时回退阅读。
核心 LLM 步骤:把预处理文本拆成多条 MemoryItem。
内部逻辑(概念性):
# memU 内部类似如下过程(示意,非真实源码) for memory_type in ["profile", "event", "knowledge", "behavior", "skill", "tool"]: candidates = await llm.extract( text=preprocessed_text, target_type=memory_type, prompt=EXTRACTION_PROMPT, ) items.extend(candidates)
质量取决于:
当前版本为 占位阶段(pass-through),未来可能支持:
工程上现阶段需接受「可能产生语义相近的多条 Item」,可通过 CRUD API 手工清理(第 10 章提及)。
将新 Item 写入数据库,并:
Category 引导是 lazy + scoped:在需要时按作用域初始化,按标准化名称映射。
更新已有 Category 的 summary,反映新 Item 带来的主题变化。
这保证 Category 层摘要始终「活着」——不是静态标签,而是演进式综述。
组装第 3 章所述的返回字典,同步返回给调用方(非 Cloud 异步模式)。
result = await service.memorize( resource_url="...", # 必填:来源地址 modality="conversation", # 必填:模态类型 user={"user_id": "u1"}, # 可选:作用域 agent={"agent_id": "a1"}, # 可选:智能体 scope # 其他 scope 字段取决于 UserConfig )
sources = [ ("https://example.com/doc1.txt", "document"), ("https://example.com/chat.json", "conversation"), ("https://example.com/diagram.png", "image"), ] for url, mod in sources: r = await service.memorize(resource_url=url, modality=mod, user={"user_id": "u1"}) print(f"{mod}: {len(r['items'])} items")
顺序写入即可;memU 会自动合并 Category。大量并发写入时注意 LLM 速率限制。
async def ingest_conversation(service, user_id, conversation_url): result = await service.memorize( resource_url=conversation_url, modality="conversation", user={"user_id": user_id}, ) profiles = [i for i in result["items"] if i["memory_type"] == "profile"] events = [i for i in result["items"] if i["memory_type"] == "event"] return { "profiles": profiles, "events": events, "categories": result["categories"], }
典型产出:
user_preferences、project_context编码智能体可把工具执行日志作为 document 摄入:
await service.memorize( resource_url="https://logs.example.com/agent-run-20250623.txt", modality="document", user={"user_id": "dev_team", "agent_id": "code_agent"}, )
期望提取 memory_type=tool 的条目,例如:
「编辑 YAML 配置前先全库搜索 key,避免漏改 duplicate。」
后续 retrieve 时查询「配置编辑最佳实践」即可召回。
| 错误类型 | 可能原因 | 建议 |
|---|---|---|
| 下载失败 | URL 不可达 | 检查网络与权限 |
| Vision 失败 | 模型不支持图像 | 配置 vision Profile |
| 提取为空 | 文本过短或无信息 | 检查 modality 与内容 |
| 速率限制 | LLM QPS | 指数退避重试 |
import asyncio async def memorize_with_retry(service, url, modality, retries=3): for attempt in range(retries): try: return await service.memorize(resource_url=url, modality=modality) except Exception as e: if attempt == retries - 1: raise await asyncio.sleep(2 ** attempt)
| 维度 | 自托管 memorize | Cloud API |
|---|---|---|
| 返回时机 | 处理完成后同步返回 | 异步 task_id,需轮询 status |
| 认证 | 本地 API Key | Bearer Token |
| 端点 | Python SDK | POST /api/v3/memory/memorize |
Cloud 对接详见第 9 章。
modality 决定预处理管道;user 决定作用域。conversation 和 document modality 写入,对比 items 差异。memory_type 分布,验证是否符合第 2 章预期。下一章:第 6 章 — retrieve 检索策略与分层召回。
回顾:第 3 章 memorize 返回结构。