3.3 协作模式


文档摘要

3.3 协作模式 — AutoGen多智能体协作机制与模式 本节导读:深入理解AutoGen中多智能体的协作模式,学习不同的协作策略、冲突解决机制和团队构建方法,构建高效的多智能体协作系统。 学习目标 理解AutoGen多智能体协作的核心概念和模式 掌握不同协作模式的适用场景和实现方法 学会实现智能体间的通信和状态共享 了解冲突解决和共识达成机制 掌握团队构建和动态调整策略 核心概念 AutoGen的多智能体协作系统支持多种协作模式,从简单的对话交互到复杂的团队协作,为不同场景提供了灵活的解决方案。

3.3 协作模式 — AutoGen多智能体协作机制与模式

本节导读:深入理解AutoGen中多智能体的协作模式,学习不同的协作策略、冲突解决机制和团队构建方法,构建高效的多智能体协作系统。

学习目标

  • 理解AutoGen多智能体协作的核心概念和模式
  • 掌握不同协作模式的适用场景和实现方法
  • 学会实现智能体间的通信和状态共享
  • 了解冲突解决和共识达成机制
  • 掌握团队构建和动态调整策略

核心概念

AutoGen的多智能体协作系统支持多种协作模式,从简单的对话交互到复杂的团队协作,为不同场景提供了灵活的解决方案。

协作模式分类

分步实战

步骤 1:基础协作模式实现

首先实现基础的Round Robin协作模式:

import asyncio from typing import List, Dict, Optional from dataclasses import dataclass, field from enum import Enum class AgentStatus(Enum): IDLE = "idle" BUSY = "busy" WAITING = "waiting" ERROR = "error" class MessageType(Enum): TASK = "task" RESULT = "result" ERROR = "error" HEARTBEAT = "heartbeat" COORDINATION = "coordination" @dataclass class Message: id: str type: MessageType content: str sender: str receiver: str timestamp: float = field(default_factory=lambda: asyncio.get_event_loop().time()) priority: int = 1 @dataclass class Agent: id: str name: str capabilities: List[str] status: AgentStatus = AgentStatus.IDLE current_task: Optional[str] = None message_queue: List[Message] = field(default_factory=list) neighbors: List[str] = field(default_factory=list) processing_time: float = 0.0 success_rate: float = 1.0 def send_message(self, message: Message): """发送消息""" self.message_queue.append(message) print(f"📤 {self.name} 发送消息: {message.type.value} -> {message.receiver}") def receive_message(self, message: Message) -> bool: """接收消息""" if self.id == message.receiver: self.message_queue.append(message) print(f"📥 {self.name} 收到消息: {message.type.value} <- {message.sender}") return True return False def process_messages(self) -> List[Message]: """处理消息队列""" processed = [] while self.message_queue: message = self.message_queue.pop(0) processed.append(message) self._handle_message(message) return processed def _handle_message(self, message: Message): """处理单条消息""" if message.type == MessageType.TASK: self.current_task = message.content self.status = AgentStatus.BUSY elif message.type == MessageType.RESULT: print(f"✅ {self.name} 收到结果: {message.content}") elif message.type == MessageType.ERROR: print(f"❌ {self.name} 收到错误: {message.content}") elif message.type == MessageType.COORDINATION: print(f"🤝 {self.name} 处理协调请求: {message.content}") class RoundRobinCollaboration: def __init__(self, agents: List[Agent]): self.agents = {agent.id: agent for agent in agents} self.current_agent_index = 0 self.round_count = 0 self.message_bus = MessageBus() def add_neighbor_connections(self): """添加邻居连接""" agent_ids = list(self.agents.keys()) for i, agent in enumerate(self.agents.values()): # 每个智能体连接到其他智能体 neighbor_ids = [aid for aid in agent_ids if aid != agent.id] agent.neighbors = neighbor_ids print(f"🔗 {agent.name} 的邻居: {[self.agents[aid].name for aid in neighbor_ids]}") async def execute_round_robin_task(self, task: str): """执行Round Robin协作任务""" print(f"🔄 开始Round Robin协作,任务: {task}") print("=" * 60) # 分配任务给所有智能体 for agent_id, agent in self.agents.items(): message = Message( id=f"msg_{self.round_count}_{agent_id}", type=MessageType.TASK, content=task, sender="coordinator", receiver=agent_id ) self.message_bus.send_message(message) # 等待任务完成 await self._wait_for_completion() async def _wait_for_completion(self): """等待所有智能体完成任务""" completed_agents = set() while len(completed_agents) < len(self.agents): await asyncio.sleep(1) # 等待处理 # 检查每个智能体的状态 for agent_id, agent in self.agents.items(): if agent_id not in completed_agents and agent.status == AgentStatus.IDLE: completed_agents.add(agent_id) print(f"✅ {agent.name} 任务完成") print(f"🎉 所有智能体任务完成,共耗时: {len(completed_agents)}轮") class MessageBus: """消息总线,负责消息传递""" def __init__(self): self.messages: List[Message] = [] def send_message(self, message: Message): """发送消息到总线""" self.messages.append(message) def get_messages_for_agent(self, agent_id: str) -> List[Message]: """获取指定智能体的消息""" return [msg for msg in self.messages if msg.receiver == agent_id] # 使用示例 async def round_robin_demo(): # 创建智能体 agents = [ Agent("agent1", "数据分析员", ["analysis", "reporting"]), Agent("agent2", "代码工程师", ["coding", "testing"]), Agent("agent3", "测试工程师", ["testing", "validation"]), Agent("agent4", "文档工程师", ["documentation", "review"]) ] # 创建协作系统 collaboration = RoundRobinCollaboration(agents) collaboration.add_neighbor_connections() # 执行协作任务 task = "开发一个用户认证系统并生成测试报告" await collaboration.execute_round_robin_task(task) # 模拟通信 await collaboration.simulate_communication() if __name__ == "__main__": asyncio.run(round_robin_demo())

步骤 2:Sequential协作模式实现

实现Sequential(顺序)协作模式:

import asyncio from typing import List, Dict, Optional from dataclasses import dataclass, field from enum import Enum class TaskStage(Enum): ANALYSIS = "analysis" DESIGN = "design" IMPLEMENTATION = "implementation" TESTING = "testing" DOCUMENTATION = "documentation" @dataclass class SequentialTask: id: str name: str stage: TaskStage dependencies: List[str] = field(default_factory=list) estimated_duration: float = 10.0 assigned_agent: Optional[str] = None status: str = "pending" result: Optional[str] = None class SequentialCollaboration: def __init__(self, agents: Dict[str, 'Agent']): self.agents = agents self.tasks: Dict[str, SequentialTask] = {} self.task_graph: Dict[str, List[str]] = {} self.completed_tasks: set = set() self.current_stage = TaskStage.ANALYSIS def add_task(self, task: SequentialTask): """添加任务""" self.tasks[task.id] = task if task.dependencies: self.task_graph[task.id] = task.dependencies else: self.task_graph[task.id] = [] def get_ready_tasks(self) -> List[SequentialTask]: """获取可以执行的任务""" ready_tasks = [] for task in self.tasks.values(): if task.status == "pending": # 检查依赖任务是否完成 dependencies_met = all( dep_id in self.completed_tasks for dep_id in task.dependencies ) if dependencies_met: ready_tasks.append(task) return ready_tasks def _find_best_agent_for_task(self, task: SequentialTask) -> Optional[str]: """为任务找到最佳智能体""" best_agent = None best_score = -1 for agent_id, agent in self.agents.items(): # 检查智能体是否空闲 if agent.status != "idle": continue # 检查智能体能力是否匹配任务类型 capability_mapping = { TaskStage.ANALYSIS: ["analysis", "data_analysis"], TaskStage.DESIGN: ["design", "architecture"], TaskStage.IMPLEMENTATION: ["coding", "development"], TaskStage.TESTING: ["testing", "qa"], TaskStage.DOCUMENTATION: ["documentation", "writing"] } required_capabilities = capability_mapping.get(task.stage, []) if not any(cap in agent.capabilities for cap in required_capabilities): continue # 计算匹配度分数 score = len(set(agent.capabilities) & set(required_capabilities)) if score > best_score: best_score = score best_agent = agent_id return best_agent async def execute_sequential_collaboration(self): """执行顺序协作""" print("🚀 开始顺序协作任务") print("=" * 60) stage_order = [ TaskStage.ANALYSIS, TaskStage.DESIGN, TaskStage.IMPLEMENTATION, TaskStage.TESTING, TaskStage.DOCUMENTATION ] for stage in stage_order: print(f"\n📋 当前阶段: {stage.value}") await self._execute_stage(stage) print(f"\n🎉 所有阶段完成!") # 输出最终结果 print("\n📊 任务完成情况:") for task in self.tasks.values(): status_emoji = "✅" if task.status == "completed" else "❌" print(f"{status_emoji} {task.name}: {task.status}") async def _execute_stage(self, stage: TaskStage): """执行单个阶段""" self.current_stage = stage # 执行该阶段的任务 ready_tasks = self.get_ready_tasks() for task in ready_tasks: best_agent = self._find_best_agent_for_task(task) if best_agent: task.assigned_agent = best_agent task.status = "assigned" await self._execute_agent_task(best_agent, task) await asyncio.sleep(2) async def _execute_agent_task(self, agent_id: str, task: SequentialTask): """执行智能体任务""" agent = self.agents[agent_id] print(f"🔄 {agent.name} 开始执行: {task.name}") # 模拟任务执行 await asyncio.sleep(task.estimated_duration) # 随机成功或失败 import random success = random.random() > 0.1 # 90%成功率 if success: task.status = "completed" self.completed_tasks.add(task.id) task.result = f"任务{task.name}执行成功" else: task.status = "failed" task.result = f"任务{task.name}执行失败"

步骤 3:Selector协作模式实现

实现Selector(选择器)协作模式:

import asyncio from typing import List, Dict, Optional from dataclasses import dataclass, field from enum import Enum class DecisionType(Enum): MAJORITY_VOTE = "majority_vote" WEIGHTED_VOTE = "weighted_vote" EXPERT_OPINION = "expert_opinion" CONDITIONAL = "conditional" class DecisionContext: def __init__(self, problem: str, options: List[str], constraints: List[str] = None): self.problem = problem self.options = options self.constraints = constraints or [] self.votes = {} self.decision = None def add_vote(self, agent_id: str, option: str, weight: float = 1.0): """添加投票""" if agent_id not in self.votes: self.votes[agent_id] = [] self.votes[agent_id].append((option, weight)) def get_vote_results(self) -> Dict[str, float]: """获取投票结果""" results = {} for option in self.options: total_weight = 0 for votes in self.votes.values(): for vote_option, weight in votes: if vote_option == option: total_weight += weight results[option] = total_weight return results class SelectorCollaboration: def __init__(self, agents: Dict[str, 'Agent']): self.agents = agents self.decision_history: List[DecisionContext] = [] self.decision_mechanism = DecisionType.MAJORITY_VOTE def set_decision_mechanism(self, mechanism: DecisionType): """设置决策机制""" self.decision_mechanism = mechanism print(f"🎯 决策机制已设置为: {mechanism.value}") async def make_collective_decision(self, problem: str, options: List[str], constraints: List[str] = None) -> str: """做出集体决策""" print(f"🤝 开始集体决策: {problem}") print(f"📋 选项: {', '.join(options)}") # 创建决策上下文 context = DecisionContext(problem, options, constraints) # 收集智能体意见 await self._collect_agent_opinions(context) # 根据决策机制做出决策 decision = await self._apply_decision_mechanism(context) # 记录决策历史 self.decision_history.append(context) print(f"🎉 最终决策: {decision}") return decision async def _collect_agent_opinions(self, context: DecisionContext): """收集智能体意见""" print("\n📊 收集智能体意见...") # 并行收集意见 opinion_tasks = [] for agent_id, agent in self.agents.items(): task = self._get_agent_opinion(agent_id, agent, context) opinion_tasks.append(task) opinions = await asyncio.gather(*opinion_tasks) # 处理意见 for agent_id, option, weight in opinions: context.add_vote(agent_id, option, weight) print(f"🤔 {self.agents[agent_id].name} 支持: {option} (权重: {weight})") async def _get_agent_opinion(self, agent_id: str, agent: 'Agent', context: DecisionContext) -> tuple: """获取智能体意见""" print(f"🤔 {agent.name} 正在分析问题...") await asyncio.sleep(1) # 根据智能体特点生成意见 opinion = self._generate_agent_opinion(agent, context) # 计算权重 weight = self._calculate_vote_weight(agent, opinion, context) return (agent_id, opinion, weight) def _generate_agent_opinion(self, agent: 'Agent', context: DecisionContext) -> str: """生成智能体意见""" if "analytical" in agent.capabilities: return self._get_technical_optimal(context.options) elif "creative" in agent.capabilities: return self._get_innovative_option(context.options) elif "practical" in agent.capabilities: return self._get_practical_option(context.options) else: return context.options[0] def _calculate_vote_weight(self, agent: 'Agent', opinion: str, context: DecisionContext) -> float: """计算投票权重""" base_weight = 1.0 if "expert" in agent.capabilities: base_weight = 2.0 elif "novice" in agent.capabilities: base_weight = 0.5 return base_weight def _majority_vote(self, vote_results: Dict[str, float]) -> str: """多数投票决策""" winner = max(vote_results.items(), key=lambda x: x[1]) print(f"🗳️ 多数投票结果: {winner[0]} ({winner[1]}票)") return winner[0] async def _apply_decision_mechanism(self, context: DecisionContext) -> str: """应用决策机制""" vote_results = context.get_vote_results() if self.decision_mechanism == DecisionType.MAJORITY_VOTE: return self._majority_vote(vote_results) elif self.decision_mechanism == DecisionType.WEIGHTED_VOTE: return self._weighted_vote(vote_results) else: return self._majority_vote(vote_results)

常见问题 FAQ

Q1:如何智能体间出现意见冲突如何解决?

A:可以通过层次决策机制解决冲突:

class HierarchicalConflictResolver: def __init__(self, agents: Dict[str, 'Agent']): self.agents = agents self.authority_levels = { "system_admin": ["analyst", "manager"], "technical_lead": ["developer", "designer"], "qa_lead": ["tester"] } async def resolve_conflict(self, conflicting_agents: List[str], decision_context: DecisionContext) -> str: """通过层次结构解决冲突""" # 按权限级别分组 groups = self._group_by_authority(conflicting_agents) # 逐级决策 for level, agents in groups.items(): if len(agents) > 1: # 同级冲突,投票解决 decision = await self._resolve_level_conflict(agents, decision_context) else: # 单个智能体,直接采用其意见 decision = self._get_agent_opinion(agents[0], decision_context) return decision

Q2:如何实现智能体间的状态同步?

A:可以使用共享状态管理器:

class SharedStateManager: def __init__(self): self.shared_state = {} self.subscribers = defaultdict(list) self.lock = asyncio.Lock() async def update_state(self, key: str, value: Any): """更新共享状态""" async with self.lock: self.shared_state[key] = value # 通知订阅者 for callback in self.subscribers[key]: await callback(key, value) async def subscribe_to_state(self, key: str, callback: Callable): """订阅状态变化""" self.subscribers[key].append(callback)

最佳实践与避坑

最佳实践

  1. 明确协作模式:根据问题类型选择合适的协作模式
  2. 定义清晰的协议:智能体间通信使用标准化的消息格式
  3. 实现容错机制:处理智能体失败和异常情况
  4. 建立监控体系:实时监控协作性能和状态
  5. 文档化协作规则:记录协作策略和决策流程

常见避坑

  1. 过度复杂化:避免设计过于复杂的协作规则
  2. 状态不一致:确保所有智能体对共享状态的理解一致
  3. 死锁风险:避免循环依赖和资源竞争
  4. 性能瓶颈:减少不必要的通信和同步开销
  5. 可扩展性问题:设计可扩展的协作架构

本节小结

通过本节的学习,我们深入理解了AutoGen多智能体协作的不同模式和实现方法。从基础的Round Robin模式到复杂的Selector决策模式,我们掌握了构建高效协作系统的关键技术。

关键要点包括:

  1. 协作模式选择:根据任务特点选择合适的协作模式
  2. 智能通信机制:实现高效的消息传递和状态同步
  3. 冲突解决策略:建立智能的冲突检测和解决机制
  4. 性能优化:通过负载均衡和缓存提升协作效率

下一节我们将开始第4章代码执行与环境的探讨。

延伸阅读

关键词:AutoGen, 协作模式, 多智能体, 冲突解决, 状态管理
难度:进阶
预计阅读:15分钟


发布者: 作者: 转发
评论区 (0)
U