2026年04月03日-AI Agent技能每日速递 今日主题:Multi-Agent协作 - 从单一Agent到Agent社会 核心概念 Multi-Agent系统(MAS)是AI Agent领域的前沿方向。不同于单一Agent独立完成任务,Multi-Agent系统通过多个专门化Agent的协作,解决更复杂的问题。 为什么需要Multi-Agent? 任务分解:复杂任务拆解为子任务,每个Agent负责一部分 专长互补:不同Agent拥有不同技能和知识 并行处理:多Agent并行工作,提升效率 容错能力:单个Agent失败不影响整体任务 核心技能:Agent通信协议 消息传递协议 协商机制 核心技能:任务分配 Contract Net协议 核心技能:协作模式 层次式协作 平等式协作
Multi-Agent系统(MAS)是AI Agent领域的前沿方向。不同于单一Agent独立完成任务,Multi-Agent系统通过多个专门化Agent的协作,解决更复杂的问题。
from typing import List, Dict, Any from dataclasses import dataclass @dataclass class Message: """Agent间通信的消息格式""" sender: str # 发送者ID receiver: str # 接收者ID content: Dict[str, Any] # 消息内容 message_type: str # 消息类型 timestamp: float # 时间戳 conversation_id: str # 会话ID(用于关联对话) class AgentCommunicationProtocol: """Agent通信协议""" MESSAGE_TYPES = { "REQUEST": "请求协助", "RESPONSE": "响应结果", "NOTIFICATION": "通知", "QUERY": "查询", "COMMAND": "命令", "INFORM": "信息告知" } @staticmethod def create_message(sender: str, receiver: str, message_type: str, content: Dict[str, Any]) -> Message: """创建标准消息""" import time return Message( sender=sender, receiver=receiver, content=content, message_type=message_type, timestamp=time.time(), conversation_id=f"{sender}-{receiver}-{int(time.time())}" ) # 使用示例 message = AgentCommunicationProtocol.create_message( sender="researcher_agent", receiver="writer_agent", message_type="REQUEST", content={ "task": "write_summary", "data": {"topic": "AI trends", "points": [...]}, "deadline": 3600 # 1小时内完成 } )
class AgentNegotiator: """Agent协商机制""" def __init__(self, agent_id: str): self.agent_id = agent_id self.proposals = {} # 存储提案 def make_proposal(self, task: Dict, offer: Dict) -> str: """提出提案""" proposal_id = f"proposal-{len(self.proposals)}" self.proposals[proposal_id] = { "task": task, "offer": offer, "status": "pending" } return proposal_id def evaluate_proposal(self, proposal: Dict) -> float: """评估提案(返回0-1的分数)""" # 计算能力匹配度 capability_score = self._check_capability(proposal["task"]) # 计算资源可用性 resource_score = self._check_resource_availability() # 计算时间可行性 time_score = self._check_time_feasibility(proposal["task"]["deadline"]) # 加权平均 total_score = ( capability_score * 0.5 + resource_score * 0.3 + time_score * 0.2 ) return total_score def accept_proposal(self, proposal_id: str) -> bool: """接受提案""" if proposal_id in self.proposals: self.proposals[proposal_id]["status"] = "accepted" return True return False def reject_proposal(self, proposal_id: str, reason: str) -> bool: """拒绝提案""" if proposal_id in self.proposals: self.proposals[proposal_id]["status"] = "rejected" self.proposals[proposal_id]["reason"] = reason return True return False
from typing import List import random class ContractNetProtocol: """Contract Net协议:用于任务分配""" def __init__(self, manager_agent: str, worker_agents: List[str]): self.manager_agent = manager_agent self.worker_agents = worker_agents def announce_task(self, task: Dict) -> List[Dict]: """1. Manager发布任务(Call for Proposals)""" announcements = [] for worker in self.worker_agents: message = { "type": "CFP", # Call for Proposals "task": task, "deadline": task.get("deadline", 3600) } announcements.append((worker, message)) return announcements def submit_bid(self, worker_agent: str, task: Dict, bid: Dict) -> Dict: """2. Worker提交投标""" return { "type": "BID", "worker": worker_agent, "task_id": task["id"], "bid": bid, # 包含:预估成本、时间、质量保证 "confidence": bid.get("confidence", 0.8) } def evaluate_bids(self, bids: List[Dict]) -> str: """3. Manager评估投标,选择最佳Worker""" # 根据多个因素评分 scored_bids = [] for bid in bids: score = ( bid["bid"]["cost"] * -0.3 + # 成本越低越好 bid["bid"]["time"] * -0.3 + # 时间越短越好 bid["confidence"] * 0.4 # 信心越高越好 ) scored_bids.append((score, bid)) # 选择最高分 scored_bids.sort(reverse=True) return scored_bids[0][1]["worker"] def award_contract(self, worker: str, task: Dict): """4. Manager授予合同""" return { "type": "AWARD", "worker": worker, "task": task, "status": "awarded" } # 使用示例 cnp = ContractNetProtocol("manager", ["worker1", "worker2", "worker3"]) # 发布任务 task = { "id": "task-001", "type": "research", "topic": "Multi-Agent Systems", "deadline": 7200 } announcements = cnp.announce_task(task) # Workers提交投标 bids = [] for worker in ["worker1", "worker2", "worker3"]: bid = { "cost": random.randint(50, 100), "time": random.randint(3000, 6000), "confidence": random.random() } bids.append(cnp.submit_bid(worker, task, bid)) # Manager选择最佳投标 best_worker = cnp.evaluate_bids(bids) contract = cnp.award_contract(best_worker, task) print(f"Contract awarded to: {best_worker}")
class HierarchicalMultiAgent: """层次式Multi-Agent系统""" def __init__(self): self.coordinator = CoordinatorAgent("coordinator") self.agents = { "researcher": ResearcherAgent("researcher"), "writer": WriterAgent("writer"), "reviewer": ReviewerAgent("reviewer"), "publisher": PublisherAgent("publisher") } def execute_task(self, task: Dict): """执行任务(通过协调器)""" # 1. 任务分解 subtasks = self.coordinator.decompose_task(task) # 2. 分配子任务 assignments = self.coordinator.assign_subtasks(subtasks, self.agents) # 3. 并行执行 results = {} for agent_id, subtask in assignments.items(): agent = self.agents[agent_id] result = agent.execute(subtask) results[agent_id] = result # 4. 聚合结果 final_result = self.coordinator.aggregate_results(results) return final_result class CoordinatorAgent: """协调者Agent""" def decompose_task(self, task: Dict) -> List[Dict]: """分解任务""" if task["type"] == "create_article": return [ {"type": "research", "topic": task["topic"]}, {"type": "write", "outline": None}, # 将由研究填充 {"type": "review", "draft": None}, {"type": "publish", "article": None} ] def assign_subtasks(self, subtasks: List[Dict], agents: Dict) -> Dict: """分配子任务""" mapping = { "research": "researcher", "write": "writer", "review": "reviewer", "publish": "publisher" } assignments = {} for subtask in subtasks: agent_id = mapping[subtask["type"]] assignments[agent_id] = subtask return assignments def aggregate_results(self, results: Dict) -> Dict: """聚合结果""" # 简单聚合(实际可能需要更复杂的逻辑) return { "research": results["researcher"], "article": results["writer"], "review": results["reviewer"], "published_url": results["publisher"] }
class EgalitarianMultiAgent: """平等式Multi-Agent系统(无中央协调器)""" def __init__(self, agents: List): self.agents = agents self.shared_memory = {} # 共享内存 self.blackboard = {} # 黑板(公开工作区) def collaborate(self, task: Dict): """协作完成任务""" # 1. 发布任务到黑板 self.blackboard["task"] = task self.blackboard["status"] = "pending" # 2. Agents自主决定是否参与 participants = [] for agent in self.agents: if agent.can_handle(task): participants.append(agent) # 3. 通过投票选择方案 proposals = [] for agent in participants: proposal = agent.propose_solution(task) proposals.append(proposal) # 4. 投票 best_proposal = self._vote(proposals, participants) # 5. 执行最佳方案 result = self._execute_proposal(best_proposal, participants) return result def _vote(self, proposals: List, agents: List) -> Dict: """投票机制""" votes = {} for agent in agents: preference = agent.rank_proposals(proposals) for i, proposal in enumerate(proposals): score = len(proposals) - i # 简单排名投票 votes[proposal["id"]] = votes.get(proposal["id"], 0) + score # 返回得票最高的提案 best_id = max(votes, key=votes.get) return next(p for p in proposals if p["id"] == best_id)
class FederationAgent: """联邦Agent(自治的本地协作)""" def __init__(self, agent_id: str, local_agents: List): self.agent_id = agent_id self.local_agents = local_agents # 本地管理的Agents self.peer_federations = [] # 其他联邦 def local_collaboration(self, task: Dict): """本地协作""" for agent in self.local_agents: if agent.can_handle(task): return agent.execute(task) return None def request_peer_assistance(self, task: Dict): """请求对等联邦协助""" for federation in self.peer_federations: result = federation.handle_external_request(task, self.agent_id) if result: return result return None def handle_external_request(self, task: Dict, requester: str): """处理外部请求""" # 检查是否愿意协助(基于信任度、资源等) if self._should_assist(requester, task): return self.local_collaboration(task) return None
class ReportGenerationSystem: """Multi-Agent报告生成系统""" def __init__(self): # 创建Agents self.planner = PlannerAgent("planner") self.researchers = [ WebResearcherAgent("web_researcher"), DatabaseResearcherAgent("db_researcher") ] self.writer = WriterAgent("writer") self.reviewer = ReviewerAgent("reviewer") def generate_report(self, topic: str): """生成报告""" # 1. 规划 plan = self.planner.create_plan(topic) print(f"Plan created: {len(plan['sections'])} sections") # 2. 研究(并行) research_tasks = [] for section in plan["sections"]: for researcher in self.researchers: task = researcher.create_task(section) research_tasks.append(task) # 并行执行研究 research_results = self._parallel_execute(research_tasks) # 3. 撰写 draft = self.writer.write_report(plan, research_results) # 4. 审查(可能需要迭代) review = self.reviewer.review(draft) if review["needs_revision"]: revised_draft = self.writer.revise(draft, review["comments"]) final_report = revised_draft else: final_report = draft return final_report def _parallel_execute(self, tasks: List) -> List: """并行执行任务""" from concurrent.futures import ThreadPoolExecutor with ThreadPoolExecutor(max_workers=5) as executor: results = list(executor.map(lambda t: t.execute(), tasks)) return results # 具体Agent实现 class PlannerAgent: def create_plan(self, topic: str) -> Dict: return { "topic": topic, "sections": [ {"title": "Introduction", "points": 5}, {"title": "Background", "points": 10}, {"title": "Analysis", "points": 15}, {"title": "Conclusion", "points": 5} ], "timeline": 3600 # 1小时 } class WebResearcherAgent: def create_task(self, section: Dict): return lambda: self._research(section["title"]) def _research(self, topic: str) -> Dict: # 实际实现会调用搜索API return {"topic": topic, "content": f"Research on {topic}"} class WriterAgent: def write_report(self, plan: Dict, research: List) -> str: # 使用LLM撰写报告 prompt = self._create_prompt(plan, research) # 调用LLM... return "Generated report content" def revise(self, draft: str, comments: List) -> str: # 根据审查意见修改 revised_prompt = f"Revise the following based on comments: {comments}\n\n{draft}" # 调用LLM... return "Revised report"
from langgraph.graph import StateGraph from langgraph.prebuilt import ToolExecutor # 定义Multi-Agent工作流 workflow = StateGraph(AgentState) # 添加Agent节点 workflow.add_node("researcher", research_agent) workflow.add_node("writer", writer_agent) workflow.add_node("reviewer", reviewer_agent) # 定义边(连接规则) workflow.add_conditional_edges( "researcher", should_continue, { "continue": "writer", "end": END } ) workflow.add_edge("writer", "reviewer") workflow.add_edge("reviewer", END) # 编译工作流 app = workflow.compile()
import autogen # 创建Agents assistant = autogen.AssistantAgent( name="assistant", llm_config={"model": "gpt-4"} ) user_proxy = autogen.UserProxyAgent( name="user_proxy", human_input_mode="NEVER", max_consecutive_auto_reply=10 ) # 启动对话 user_proxy.initiate_chat( assistant, message="Write a report on Multi-Agent Systems" )
from crewai import Agent, Task, Crew # 定义Agents researcher = Agent( role='Researcher', goal='Research AI trends', backstory='An expert researcher', tools=[search_tool] ) writer = Agent( role='Writer', goal='Write reports', backstory='A skilled writer', tools=[file_tool] ) # 定义任务 task1 = Task( description='Research recent AI trends', agent=researcher ) task2 = Task( description='Write a report based on research', agent=writer ) # 创建Crew crew = Crew( agents=[researcher, writer], tasks=[task1, task2], verbose=2 ) # 执行 result = crew.kickoff()
Multi-Agent系统的学习与进化 - Agent如何通过交互学习和改进
💡 核心洞察:Multi-Agent系统的核心不是"多个Agent",而是"有效的协作"。
📊 数据点:研究表明,3-5个专门化Agent的协作可以比单一Agent效率提升200-500%。
🔗 关键论文: