第 5 章 memorize 写入流水线


文档摘要

第 5 章 memorize 写入流水线 是 memU 的 WRITE 路径:把一份原始来源编译进结构化记忆。本章按流水线七步拆解原理,并给出可落地的调用模式。 5.1 流水线总览 每一步都是 Workflow 中的一个 ,带明确的输入/输出键与能力标签(llm / vector / db / io / vision)。 5.2 第一步:ingestresource 作用:根据 获取原始字节,登记为 Resource 记录。

第 5 章 memorize 写入流水线

memorize() 是 memU 的 WRITE 路径:把一份原始来源编译进结构化记忆。本章按流水线七步拆解原理,并给出可落地的调用模式。

5.1 流水线总览

resource_url + modality │ ▼ ┌───────────────┐ │ ingest_resource│ 拉取本地/远程文件到 blob 存储 └───────┬───────┘ ▼ ┌───────────────────┐ │ preprocess_multimodal│ 解析文本 / 视觉描述 / 音频转写 └───────┬───────────┘ ▼ ┌───────────────┐ │ extract_items │ LLM 按 memory_type 提取结构化条目 └───────┬───────┘ ▼ ┌───────────────┐ │ dedupe_merge │ 去重合并(当前为占位 pass-through) └───────┬───────┘ ▼ ┌───────────────┐ │ categorize_items│ 持久化 Resource + Items + Relations + Embeddings └───────┬───────┘ ▼ ┌───────────────┐ │ persist_index │ 更新 Category 摘要 └───────┬───────┘ ▼ ┌───────────────┐ │ build_response │ 组装返回字典 └───────────────┘

每一步都是 Workflow 中的一个 WorkflowStep,带明确的输入/输出键与能力标签(llm / vector / db / io / vision)。

5.2 第一步:ingest_resource

作用:根据 resource_url 获取原始字节,登记为 Resource 记录。

result = await service.memorize( resource_url="https://cdn.example.com/standup-notes.md", modality="document", )

支持的 URL 类型:

类型 示例
HTTP/HTTPS https://example.com/file.pdf
本地路径 开发机上的绝对或相对路径
已挂载存储 配置 blob 存储后的内部地址

modality 参数告诉预处理管道用哪种解析器:

modality 预处理行为
conversation 解析 JSON / 文本对话格式
document 提取纯文本(Markdown、TXT、日志等)
image Vision 模型生成 caption
video 抽帧 / 描述生成 caption
audio Transcribe 模型转文字

modality 应与真实内容匹配,否则提取质量下降。

5.3 第二步:preprocess_multimodal

预处理产出 模态无关的文本描述,作为后续 LLM 提取的输入:

conversation → "User: ... Assistant: ..." document → 正文纯文本 image → "Screenshot showing a dashboard with ..." audio → 转写文本 video → 内容摘要 / 关键帧描述

该描述同时写入 Resource 的 caption 字段,供检索时回退阅读。

5.4 第三步:extract_items

核心 LLM 步骤:把预处理文本拆成多条 MemoryItem

内部逻辑(概念性):

# memU 内部类似如下过程(示意,非真实源码) for memory_type in ["profile", "event", "knowledge", "behavior", "skill", "tool"]: candidates = await llm.extract( text=preprocessed_text, target_type=memory_type, prompt=EXTRACTION_PROMPT, ) items.extend(candidates)

质量取决于

  • Chat 模型能力(建议 ≥ GPT-4o-mini 级别)
  • 原始文本信息密度
  • modality 是否正确

5.5 第四步:dedupe_merge

当前版本为 占位阶段(pass-through),未来可能支持:

  • 跨 Session 合并相似 profile
  • 冲突检测(新旧偏好矛盾)

工程上现阶段需接受「可能产生语义相近的多条 Item」,可通过 CRUD API 手工清理(第 10 章提及)。

5.6 第五步:categorize_items

将新 Item 写入数据库,并:

  1. 创建或匹配 MemoryCategory(按规范化 name)
  2. 写入 CategoryItem 关系边
  3. 为 Item 与 Category 计算 embedding

Category 引导是 lazy + scoped:在需要时按作用域初始化,按标准化名称映射。

5.7 第六步:persist_index

更新已有 Category 的 summary,反映新 Item 带来的主题变化。

这保证 Category 层摘要始终「活着」——不是静态标签,而是演进式综述。

5.8 第七步:build_response

组装第 3 章所述的返回字典,同步返回给调用方(非 Cloud 异步模式)。

5.9 API 参数完整说明

result = await service.memorize( resource_url="...", # 必填:来源地址 modality="conversation", # 必填:模态类型 user={"user_id": "u1"}, # 可选:作用域 agent={"agent_id": "a1"}, # 可选:智能体 scope # 其他 scope 字段取决于 UserConfig )

批量写入模式

sources = [ ("https://example.com/doc1.txt", "document"), ("https://example.com/chat.json", "conversation"), ("https://example.com/diagram.png", "image"), ] for url, mod in sources: r = await service.memorize(resource_url=url, modality=mod, user={"user_id": "u1"}) print(f"{mod}: {len(r['items'])} items")

顺序写入即可;memU 会自动合并 Category。大量并发写入时注意 LLM 速率限制。

5.10 实战:对话记忆

async def ingest_conversation(service, user_id, conversation_url): result = await service.memorize( resource_url=conversation_url, modality="conversation", user={"user_id": user_id}, ) profiles = [i for i in result["items"] if i["memory_type"] == "profile"] events = [i for i in result["items"] if i["memory_type"] == "event"] return { "profiles": profiles, "events": events, "categories": result["categories"], }

典型产出:

  • profile:用户偏好、沟通风格
  • event:本次对话中的决策、待办
  • categories:如 user_preferencesproject_context

5.11 实战:工具日志学习

编码智能体可把工具执行日志作为 document 摄入:

await service.memorize( resource_url="https://logs.example.com/agent-run-20250623.txt", modality="document", user={"user_id": "dev_team", "agent_id": "code_agent"}, )

期望提取 memory_type=tool 的条目,例如:

「编辑 YAML 配置前先全库搜索 key,避免漏改 duplicate。」

后续 retrieve 时查询「配置编辑最佳实践」即可召回。

5.12 错误处理与重试

错误类型 可能原因 建议
下载失败 URL 不可达 检查网络与权限
Vision 失败 模型不支持图像 配置 vision Profile
提取为空 文本过短或无信息 检查 modality 与内容
速率限制 LLM QPS 指数退避重试
import asyncio async def memorize_with_retry(service, url, modality, retries=3): for attempt in range(retries): try: return await service.memorize(resource_url=url, modality=modality) except Exception as e: if attempt == retries - 1: raise await asyncio.sleep(2 ** attempt)

5.13 与 Cloud API 的差异

维度 自托管 memorize Cloud API
返回时机 处理完成后同步返回 异步 task_id,需轮询 status
认证 本地 API Key Bearer Token
端点 Python SDK POST /api/v3/memory/memorize

Cloud 对接详见第 9 章。

5.14 本章小结

  • memorize 七步:摄入 → 预处理 → 提取 → 分类 → 索引 → 返回。
  • modality 决定预处理管道;user 决定作用域。
  • 提取质量核心依赖 LLM;Category 与 embedding 自动维护。
  • 支持顺序批量写入多来源。

动手实验

  1. 对同一段对话分别以 conversationdocument modality 写入,对比 items 差异。
  2. 写入三条不同主题文档,观察 categories 如何自组织。
  3. 打印每条 item 的 memory_type 分布,验证是否符合第 2 章预期。

下一章:第 6 章 — retrieve 检索策略与分层召回。

回顾:第 3 章 memorize 返回结构。


发布者: 作者: 转发
评论区 (0)
U