3.3 多代理编排与子代理模式


文档摘要

3.3 多代理编排与子代理模式 | Loop Engineering 多 Agent 协作架构与并行执行策略 导读 当单个 Agent 的循环能力达到瓶颈时,多代理编排提供了突破路径。本节深入讲解 Loop Engineering 中的三大子代理编排模式——Orchestrator-Workers、Maker-Checker、Pipeline,结合 Claude Code 的 和 Codex 的 实际配置,帮你构建高效的多代理协作系统。

3.3 多代理编排与子代理模式 | Loop Engineering 多 Agent 协作架构与并行执行策略

导读

当单个 Agent 的循环能力达到瓶颈时,多代理编排提供了突破路径。本节深入讲解 Loop Engineering 中的三大子代理编排模式——Orchestrator-Workers、Maker-Checker、Pipeline,结合 Claude Code 的 .claude/agents/ 和 Codex 的 .codex/agents/ 实际配置,帮你构建高效的多代理协作系统。

学习目标

  • 对比 Orchestrator-Workers、Maker-Checker、Pipeline 三种核心编排模式的适用场景
  • 实现基于任务队列的多代理编排框架,支持并行与串行执行策略
  • 理解子代理间的通信机制与结果聚合策略
  • 评估多代理架构的 Token 成本收益比,做出合理的架构决策

核心概念

为什么需要多代理?

在 Loop Engineering 的实践中,单个 Agent 循环面临三个固有瓶颈:(1)上下文窗口限制——复杂任务的上下文可能超出单 Agent 的处理能力;(2)关注点冲突——同时负责规划、编码、测试容易导致漂移;(3)串行效率低——可并行的子任务被强制串行执行。多代理编排通过分离关注点、并行执行和专业化分工来突破这些限制。

三大编排模式

Claude Code 与 Codex 的子代理支持

Claude Code 通过 .claude/agents/ 目录定义子代理配置,每个 .md 文件描述一个子代理的系统指令和权限范围。Codex 则使用 .codex/agents/ 实现类似功能。在 OpenClaw 中,subagents 原语提供了更灵活的运行时子代理创建能力,可以通过编程方式动态生成和管理子代理。

环境准备

前置知识

  • 已掌握 3.1-3.2 节的状态持久化和漂移检测知识
  • 理解 Loop Engineering 六大原语中的 Sub-agents 概念
  • 具备 Python 并发编程基础(threading、asyncio)

环境要求

  • Python 3.10+
  • 依赖包:pip install asyncio aiohttp

分步实战

步骤一:实现子代理基类与任务模型

首先定义通用的子代理抽象和任务数据结构,为后续三种编排模式提供统一基础。

"""multi_agent_framework.py — 多代理编排框架核心""" import asyncio, time, uuid from dataclasses import dataclass, field from enum import Enum from typing import Any, Callable, Coroutine class AgentRole(Enum): ORCHESTRATOR = "orchestrator" WORKER = "worker" MAKER = "maker" CHECKER = "checker" PIPELINE_STAGE = "pipeline_stage" @dataclass class Task: """代理任务""" id: str = field(default_factory=lambda: uuid.uuid4().hex[:8]) description: str = "" role: AgentRole = AgentRole.WORKER input_data: dict = field(default_factory=dict) output_data: dict = field(default_factory=dict) status: str = "pending" # pending/running/success/failed agent_id: str = "" created_at: float = field(default_factory=time.time) completed_at: float = 0.0 @dataclass class SubAgent: """子代理定义""" id: str name: str role: AgentRole system_prompt: str # 子代理的系统指令 max_tokens: int = 4096 allowed_tools: list[str] = field(default_factory=list) class AgentMessage: """代理间消息传递""" def __init__(self, sender: str, receiver: str, content: dict): self.sender = sender; self.receiver = receiver self.content = content; self.timestamp = time.time()

步骤二:实现三种编排模式

基于统一的子代理基类,分别实现 Orchestrator-Workers、Maker-Checker 和 Pipeline 三种模式。

"""orchestration_patterns.py — 三种编排模式实现""" import asyncio, time, json from multi_agent_framework import Task, SubAgent, AgentRole class OrchestratorWorkersPattern: """Orchestrator-Workers 模式: 一个编排器分配任务给多个并行 Worker""" def __init__(self, orchestrator: SubAgent, workers: list[SubAgent], max_parallel: int = 3): self.orchestrator = orchestrator self.workers = workers self.max_parallel = max_parallel self.task_queue: asyncio.Queue = asyncio.Queue() self.results: list[dict] = [] async def submit_tasks(self, tasks: list[dict]) -> list[dict]: """提交任务并返回聚合结果""" print(f"📋 编排器({self.orchestrator.name})接收{len(tasks)}个任务") # 编排器分配任务到Worker(模拟) worker_assignments = self._assign_tasks(tasks) # 并行执行(受max_parallel限制) semaphore = asyncio.Semaphore(self.max_parallel) async def run_with_limit(task_info): async with semaphore: worker, task = task_info print(f" 🔧 {worker.name} 开始: {task['description'][:40]}") await asyncio.sleep(0.5) # 模拟执行 result = {"task_id": task["id"], "worker": worker.name, "output": f"{worker.name}完成: {task['description'][:30]}"} return result assignments = [(w, t) for w, tasks in worker_assignments.items() for t in tasks] results = await asyncio.gather(*[run_with_limit(a) for a in assignments]) # 聚合结果 self.results = [r for r in results] print(f"✅ 编排完成: {len(self.results)}个结果已聚合") return self.results def _assign_tasks(self, tasks: list[dict]) -> dict[SubAgent, list]: """简单的轮询分配策略""" assignments = {w: [] for w in self.workers} for i, task in enumerate(tasks): worker = self.workers[i % len(self.workers)] task["agent_id"] = worker.id assignments[worker].append(task) return assignments class MakerCheckerPattern: """Maker-Checker 模式: Maker产出, Checker审核, 不通过则返回重做""" def __init__(self, maker: SubAgent, checker: SubAgent, max_revisions: int = 3): self.maker = maker; self.checker = checker self.max_revisions = max_revisions self.review_history: list[dict] = [] async def produce_and_review(self, task: dict) -> dict: """执行 Maker-Checker 循环""" revision = 0 while revision <= self.max_revisions: revision += 1 print(f"\n🔄 第{revision}轮:") # Maker 产出 print(f" 📝 Maker({self.maker.name}) 产出中...") await asyncio.sleep(0.3) output = self._simulate_maker_output(task) # Checker 审核 print(f" 🔍 Checker({self.checker.name}) 审核中...") await asyncio.sleep(0.2) review = self._simulate_checker_review(output) self.review_history.append({ "revision": revision, "passed": review["passed"], "issues": review.get("issues", [])}) if review["passed"]: print(f" ✅ 第{revision}轮审核通过!") return {"output": output, "revisions": revision, "reviews": self.review_history} else: print(f" ❌ 审核未通过: {review['issues']}") print(f" ⚠ 达到最大修订次数({self.max_revisions}), 返回最新产出") return {"output": output, "revisions": revision, "accepted": False, "reviews": self.review_history} def _simulate_maker_output(self, task): return f"代码实现: {task['description']}\n# 示例代码...\noutput = result" def _simulate_checker_review(self, output): # 模拟: 第二轮开始通过 revision_count = len(self.review_history) + 1 if revision_count >= 2: return {"passed": True, "issues": []} return {"passed": False, "issues": ["缺少错误处理", "未添加类型注解"]} class PipelinePattern: """Pipeline 模式: 任务按阶段顺序流经多个处理阶段""" def __init__(self, stages: list[SubAgent]): self.stages = stages async def process(self, initial_data: dict) -> dict: """数据按阶段流经 Pipeline""" current_data = {"stage": 0, "data": initial_data, "logs": []} for i, stage in enumerate(self.stages, 1): print(f"\n管道阶段 {i}/{len(self.stages)}: {stage.name}") print(f" 输入: {str(current_data['data'])[:60]}") await asyncio.sleep(0.3) # 模拟阶段处理 current_data = self._process_stage(stage, current_data) print(f" 输出: {str(current_data['data'])[:60]}") print(f"\n✅ Pipeline 完成: 数据经过 {len(self.stages)} 个阶段") return current_data def _process_stage(self, stage, data): return { "stage": data["stage"] + 1, "data": f"[{stage.name}] 已处理: {data['data']}", "logs": data["logs"] + [f"阶段{data['stage']+1}({stage.name})完成"] }

步骤三:成本分析与优化策略

每个子代理独立消耗 Token,多代理架构的 Token 成本需要仔细评估。下面的工具帮助分析成本收益比。

"""token_cost_analyzer.py — 多代理Token成本分析""" from dataclasses import dataclass @dataclass class AgentCost: agent_id: str name: str input_tokens: int output_tokens: int iterations: int @property def total_tokens(self): return self.input_tokens + self.output_tokens def cost_estimate(self, input_price=3.0, output_price=15.0): """按 Claude 3.5 Sonnet 定价估算 (美元/百万Token)""" return (self.input_tokens/1e6*input_price + self.output_tokens/1e6*output_price) def compare_architectures(): """对比单代理 vs 多代理的成本""" single = AgentCost("single","单Agent",50000,15000,10) multi = [ AgentCost("orch","编排器",20000,5000,10), AgentCost("worker1","代码Worker",15000,8000,5), AgentCost("worker2","测试Worker",10000,4000,5), AgentCost("checker","审核Agent",12000,3000,5), ] single_cost = single.cost_estimate() multi_cost = sum(a.cost_estimate() for a in multi) print("=== 架构成本对比 ===") print(f"单Agent: {single.total_tokens:,} tokens, ${single_cost:.4f}") print(f"多Agent: {sum(a.total_tokens for a in multi):,} tokens, " f"${multi_cost:.4f}") print(f"成本差异: {'多Agent +{:.0%}'.format((multi_cost-single_cost)/single_cost) if multi_cost>single_cost else '多Agent节省{:.0%}'.format((single_cost-multi_cost)/single_cost)}") # 但多Agent可能更快(并行) single_time = single.iterations * 30 # 假设每次30秒 multi_time = 10 * 30 # 并行, 编排器主导 print(f"\n单Agent时间: {single_time}s | 多Agent时间: {multi_time}s") print(f"速度提升: {single_time/multi_time:.1f}x") if __name__ == "__main__": compare_architectures()

完整示例:端到端多代理代码审查系统

下面的示例构建一个完整的多代理代码审查系统:Orchestrator 分配任务,Worker 生成代码和测试,Maker-Checker 确保质量,最后通过 Pipeline 输出最终结果。

"""e2e_multi_agent.py — 端到端多代理代码审查系统""" import asyncio from orchestration_patterns import ( OrchestratorWorkersPattern, MakerCheckerPattern, PipelinePattern ) from multi_agent_framework import SubAgent, AgentRole async def main(): # 定义子代理 orchestrator = SubAgent("orch1","主编排器",AgentRole.ORCHESTRATOR, "你负责分解任务并分配给合适的Worker") code_worker = SubAgent("w1","代码编写者",AgentRole.WORKER, "你负责根据需求编写高质量Python代码", allowed_tools=["read","write"]) test_worker = SubAgent("w2","测试工程师",AgentRole.WORKER, "你负责编写单元测试和集成测试", allowed_tools=["read","exec"]) doc_worker = SubAgent("w3","文档撰写者",AgentRole.WORKER, "你负责编写技术文档和API说明", allowed_tools=["read","write"]) maker = SubAgent("mk","代码生成器",AgentRole.MAKER,"你负责编写代码") checker = SubAgent("ck","代码审核员",AgentRole.CHECKER, "你负责审核代码质量、安全性和规范性") # === 模式1: Orchestrator-Workers === print("="*50) print("📋 模式1: Orchestrator-Workers 并行开发") print("="*50) ow = OrchestratorWorkersPattern(orchestrator, [code_worker, test_worker, doc_worker]) tasks = [ {"id":"t1","description":"实现用户认证API端点"}, {"id":"t2","description":"编写认证模块的单元测试"}, {"id":"t3","description":"编写API使用文档"}, {"id":"t4","description":"添加速率限制中间件"}, ] results = await ow.submit_tasks(tasks) # === 模式2: Maker-Checker === print("\n" + "="*50) print("🔍 模式2: Maker-Checker 质量审核") print("="*50) mc = MakerCheckerPattern(maker, checker, max_revisions=3) review_result = await mc.produce_and_review({ "description": "实现JWT认证中间件", "requirements": ["支持token刷新", "处理过期token"] }) print(f"最终结果: 修订{review_result['revisions']}次, " f"审核{'通过' if review_result.get('accepted', True) else '未通过'}") # === 模式3: Pipeline === print("\n" + "="*50) print("🏭 模式3: Pipeline 阶段处理") print("="*50) pipeline_stages = [ SubAgent("s1","需求分析",AgentRole.PIPELINE_STAGE,"分析需求文档"), SubAgent("s2","架构设计",AgentRole.PIPELINE_STAGE,"设计系统架构"), SubAgent("s3","代码实现",AgentRole.PIPELINE_STAGE,"实现代码"), SubAgent("s4","集成测试",AgentRole.PIPELINE_STAGE,"执行集成测试"), ] pipeline = PipelinePattern(pipeline_stages) final = await pipeline.process({"project": "用户认证系统"}) print(f"Pipeline日志: {final['logs']}") if __name__ == "__main__": asyncio.run(main())

常见问题FAQ

Q1:Orchestrator-Workers 和 Pipeline 模式看起来很像,怎么选择?

核心区别在于任务关系。Pipeline 适合有明确前后依赖的顺序任务(分析→设计→实现→测试),每个阶段的输出是下一阶段的输入。Orchestrator-Workers 适合彼此独立、可并行执行的任务(同时写代码、写测试、写文档)。如果任务间有依赖用 Pipeline,如果任务互相独立用 Orchestrator-Workers。实际项目中常混合使用:编排器将任务分组,每组内部按 Pipeline 串行,组间并行。

Q2:每个子代理都需要独立的 API 调用吗?Token 成本怎么控制?

是的,每个子代理都是独立的 LLM 会话,每次交互都消耗 Token。成本控制的关键策略:(1)精简子代理的系统指令,避免冗余描述;(2)严格限制每个子代理的 allowed_tools,减少不必要的工具调用和输出;(3)使用更小的模型处理简单任务——测试 Worker 可能不需要最强的模型;(4)设置子代理的 max_tokens 限制,防止单次输出过长。Addy Osmani 警告过:"usage patterns can vary wildly if you are token rich or poor",多代理架构在 Token 受限时需要更加克制。

Q3:Claude Code 的 subagents 和 OpenClaw 的 subagents 有什么区别?

Claude Code 的子代理通过 .claude/agents/ 目录预定义,每个子代理是一个包含系统指令的 Markdown 文件,通过 /agents 命令调用,生命周期与主会话绑定。OpenClaw 的 subagents 是运行时动态创建的独立进程,支持通过 sessions_yield 机制返回结果,更适合长时间运行的独立任务循环。Codex 的 .codex/agents/ 与 Claude Code 类似。选择取决于你的使用场景:预定义角色用文件配置,动态任务用运行时创建。

最佳实践与避坑

✅ 最佳实践

按关注点分离定义子代理。 每个子代理应有明确单一的职责(只写代码、只写测试、只做审查),避免万能型子代理。这减少了每个代理的上下文压力,也降低了漂移风险。

使用 Maker-Checker 分离产出与验证。 不要让同一个 Agent 既产出又审核自己的工作。Maker-Checker 模式是 Loop Engineering 中验证器瓶颈的重要缓解手段——独立的 Checker 不会受到 Maker 的认知偏置影响。

控制并行度上限。 即使有 10 个 Worker,同时运行的不应超过 2-3 个。过高的并行度会导致:Token 消耗暴增、结果聚合困难、编排器成为瓶颈。

记录子代理间的所有通信。 多代理系统的可调试性远低于单代理,完整的通信日志是排查问题的关键。

⚠️ 避坑指南

不要为简单任务创建多代理。 单 Agent 能高效完成的任务,拆成多代理只会增加 Token 开销和编排复杂度。只有当任务确实涉及不同专业领域、或需要并行加速时才考虑多代理。

避免子代理间产生反馈循环。 如果 Worker A 的输出依赖 Worker B,而 B 又依赖 A,会形成死锁或无限循环。在设计编排模式时确保任务依赖图是 DAG(有向无环图)。

注意编排器本身也可能漂移。 编排器作为"总指挥"同样会受到上下文漂移的影响,而且它的漂移会影响所有 Worker。3.2 节的漂移检测对编排器尤为重要。

本节小结

本节深入探讨了 Loop Engineering 的多代理编排能力。我们学习了三大编排模式:Orchestrator-Workers 适合并行独立任务,Maker-Checker 适合质量把关,Pipeline 适合顺序依赖任务。我们实现了完整的多代理编排框架,并分析了 Token 成本收益权衡。

核心要点:多代理架构突破单 Agent 的上下文和效率瓶颈;编排模式选择取决于任务关系(依赖→Pipeline,独立→Workers);每个子代理独立消耗 Token,需要严格成本控制;Maker-Checker 分离产出与验证是缓解验证器瓶颈的关键。

下一节将深入探讨多代理架构带来的核心挑战——Token 经济学与成本控制,学习如何在不牺牲质量的前提下最大化循环效率。

延伸阅读

  1. Boris Cherny, "Claude Code Sub-agents: Architecture and Best Practices" — 子代理设计指南
  2. Addy Osmani, "AI Engineering: Token Economics and Agent Orchestration" — Token 经济学与编排策略
  3. OpenClaw Documentation, "Subagents Runtime API" — 运行时子代理 API 参考
  4. Research: "Multi-Agent Systems for Software Engineering" (2025) — 多代理软件工程研究
  5. Claude Code Docs, ".claude/agents/ Configuration Guide" — 子代理配置指南

关键词:多代理编排、Multi-Agent、子代理、Sub-agents、Orchestrator-Workers、Maker-Checker、Pipeline、Claude Code、Codex、并行执行、Token成本、Agent协作
难度:⭐⭐⭐⭐ 中高级
预计阅读时间:25 分钟


发布者: 作者: 转发
评论区 (0)
U