2.2 对话管理 — AgentChat协调机制详解 本节导读:掌握AgentChat中多智能体对话的创建、控制和协调机制,学习如何设计高效的对话流程和解决协作冲突 学习目标 理解AgentChat对话管理的基本概念和架构 掌握多种对话创建方式和使用场景 学会配置对话流程控制参数 能够处理对话过程中的常见问题 设计高效的智能体协作模式 核心概念 对话管理是AgentChat框架的核心功能,负责协调多个智能体之间的交互和信息流动。它提供了丰富的对话模式和控制机制,使开发者能够构建复杂的多智能体协作系统。 环境准备 / 前置知识 Python 3.10+ 异步编程基础 AutoGen-AgentChat库: 基础智能体创建经验(2.
本节导读:掌握AgentChat中多智能体对话的创建、控制和协调机制,学习如何设计高效的对话流程和解决协作冲突
对话管理是AgentChat框架的核心功能,负责协调多个智能体之间的交互和信息流动。它提供了丰富的对话模式和控制机制,使开发者能够构建复杂的多智能体协作系统。
pip install autogen-agentchatimport asyncio from autogen_agentchat.agents import AssistantAgent, UserProxyAgent from autogen_agentchat.teams import RoundRobinGroupChat from autogen_ext.models.openai import OpenAIChatCompletionClient # 创建基础智能体 writer = AssistantAgent( name="content_writer", model_client=OpenAIChatCompletionClient(model="gpt-4o"), system_message="你是一个专业的内容创作者,擅长撰写高质量文章。" ) reviewer = AssistantAgent( name="content_reviewer", model_client=OpenAIChatCompletionClient(model="gpt-4o"), system_message="你是一个内容评审专家,专注于内容质量评估和改进建议。" ) # 创建对话团队 team = RoundRobinGroupChat(participants=[writer, reviewer]) # 执行对话任务 async def content_creation_workflow(): result = await team.run( task="请为一篇关于人工智能发展趋势的文章进行协作创作和评审。" ) return result # 运行对话流程 result = asyncio.run(content_creation_workflow())
| 团队类型 | 交互模式 | 适用场景 | 特点 |
|---|---|---|---|
| RoundRobinGroupChat | 轮流发言 | 讨论型任务 | 所有智能体依次发言 |
| SequentialGroupChat | 顺序执行 | 流程型任务 | 按指定顺序执行 |
| SelectorGroupChat | 选择发言 | 决策型任务 | 根据条件选择发言者 |
| TwoPartyGroupChat | 双向对话 | 交互型任务 | 两个智能体对话 |
from autogen_agentchat.teams import RoundRobinGroupChat # 基础配置 basic_team = RoundRobinGroupChat( participants=[writer, reviewer] ) # 带终止条件的配置 terminated_team = RoundRobinGroupChat( participants=[writer, reviewer], termination_condition=lambda messages: len([m for m in messages if "完成" in m.content]) > 0 ) # 带停止条件的配置 stopped_team = RoundRobinGroupChat( participants=[writer, reviewer], stop_condition=lambda messages: len(messages) > 20 # 对话超过20轮停止 ) # 带超时设置的配置 timed_team = RoundRobinGroupChat( participants=[writer, reviewer], timeout=300 # 5分钟超时 )
from autogen_agentchat.teams import SequentialGroupChat # 定义工作流程 content_team = SequentialGroupChat( participants=[ writer, # 内容创作 reviewer, # 内容评审 writer # 根据评审意见修改 ], participant_configs=[ {"name": "writer", "role": "创作"}, {"name": "reviewer", "role": "评审"}, {"name": "writer", "role": "修改"} ] ) # 执行顺序流程 async def sequential_content_creation(): result = await content_team.run( task="创建一篇关于机器学习基础概念的技术文章" ) return result
from autogen_agentchat.teams import RoundRobinGroupChat from autogen_agentchat.messages import TextMessage, FunctionCallMessage # 消息过滤函数 def filter_relevant_messages(messages): """过滤相关消息""" return [ msg for msg in messages if isinstance(msg, TextMessage) and len(msg.content) > 10 ] # 创建带消息过滤的团队 filtered_team = RoundRobinGroupChat( participants=[writer, reviewer], message_filter=filter_relevant_messages )
import asyncio from typing import List, Dict, Any from autogen_agentchat.teams import RoundRobinGroupChat from autogen_agentchat.messages import Message class StatefulTeam(RoundRobinGroupChat): """带状态管理的对话团队""" def __init__(self, participants, **kwargs): super().__init__(participants, **kwargs) self.conversation_state = { "current_phase": "initialization", "completed_tasks": [], "pending_tasks": [], "context": {} } async def run(self, task: str) -> Any: """带状态管理的对话执行""" # 更新任务状态 self.conversation_state["current_task"] = task self.conversation_state["current_phase"] = "task_analysis" # 执行对话 result = await super().run(task) # 更新完成状态 self.conversation_state["completed_tasks"].append(task) self.conversation_state["current_phase"] = "completed" return result def get_conversation_state(self) -> Dict[str, Any]: """获取当前对话状态""" return self.conversation_state.copy() def update_context(self, key: str, value: Any): """更新上下文信息""" self.conversation_state["context"][key] = value
import asyncio from autogen_agentchat.agents import AssistantAgent from autogen_agentchat.teams import RoundRobinGroupChat # 创建可中断的对话团队 interruptible_team = RoundRobinGroupChat( participants=[writer, reviewer] ) # 中断控制器 class ConversationController: def __init__(self, team): self.team = team self.should_interrupt = False async def run_with_interruption(self, task: str): """可中断的对话执行""" task_future = asyncio.create_task(self.team.run(task)) # 模拟中断条件 await asyncio.sleep(5) # 5秒后中断 self.should_interrupt = True if self.should_interrupt: task_future.cancel() print("对话被手动中断") return {"interrupted": True, "partial_result": None} return await task_future
import json import pickle from autogen_agentchat.teams import RoundRobinGroupChat class PersistableTeam(RoundRobinGroupChat): """可持久化的对话团队""" def __init__(self, participants, **kwargs): super().__init__(participants, **kwargs) self.conversation_history = [] async def run(self, task: str): """执行对话并记录历史""" result = await super().run(task) # 保存对话历史 self.conversation_history.append({ "task": task, "timestamp": asyncio.get_event_loop().time(), "result": result }) return result def save_state(self, filepath: str): """保存对话状态""" state = { "conversation_history": self.conversation_history, "participants": [p.name for p in self.participants] } with open(filepath, 'w', encoding='utf-8') as f: json.dump(state, f, indent=2, ensure_ascii=False)
import logging from datetime import datetime from autogen_agentchat.teams import RoundRobinGroupChat from autogen_agentchat.messages import Message class LoggingTeam(RoundRobinGroupChat): """带日志记录的对话团队""" def __init__(self, participants, **kwargs): super().__init__(participants, **kwargs) self.logger = logging.getLogger("autogen_conversation") self.logger.setLevel(logging.INFO) # 创建文件处理器 handler = logging.FileHandler('conversation.log') formatter = logging.Formatter( '%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) handler.setFormatter(formatter) self.logger.addHandler(handler) async def run(self, task: str): """记录对话执行""" start_time = datetime.now() self.logger.info(f"对话开始 - 任务: {task}") try: result = await super().run(task) end_time = datetime.now() duration = (end_time - start_time).total_seconds() self.logger.info(f"对话完成 - 耗时: {duration:.2f}秒") return result except Exception as e: self.logger.error(f"对话失败 - 错误: {str(e)}") raise
import asyncio from typing import List, Dict, Any from autogen_agentchat.agents import AssistantAgent, UserProxyAgent from autogen_agentchat.teams import RoundRobinGroupChat from autogen_ext.models.openai import OpenAIChatCompletionClient class DocumentCollaborationSystem: """多智能体文档协作系统""" def __init__(self): self.agents = {} self.team = None self.document_state = { "title": "", "content": "", "reviews": [], "status": "draft" } def setup_agents(self): """设置协作智能体""" # 创建文档写作智能体 self.agents["writer"] = AssistantAgent( name="document_writer", model_client=OpenAIChatCompletionClient(model="gpt-4o"), system_message="""你是一个专业文档作者,擅长: - 结构化文档写作 - 清晰表达技术概念 - 组织逻辑和层次 - 确保内容完整性""" ) # 创建技术审核智能体 self.agents["technical_reviewer"] = AssistantAgent( name="technical_reviewer", model_client=OpenAIChatCompletionClient(model="gpt-4o"), system_message="""你是一个技术审核专家,负责: - 技术准确性检查 - 代码质量评估 - 最佳实践遵循 - 技术深度分析""" ) # 创建编辑智能体 self.agents["editor"] = AssistantAgent( name="content_editor", model_client=OpenAIChatCompletionClient(model="gpt-4o"), system_message="""你是一个内容编辑,负责: - 语言表达优化 - 结构调整建议 - 逻辑流程完善 - 用户体验提升""" ) # 创建质量检查智能体 self.agents["quality_checker"] = AssistantAgent( name="quality_checker", model_client=OpenAIChatCompletionClient(model="gpt-4o"), system_message="""你是一个质量检查员,负责: - 整体质量评估 - 缺陷识别和修复 - 完整性检查 - 最终审核确认""" ) def create_collaboration_team(self): """创建协作团队""" self.team = RoundRobinGroupChat( participants=list(self.agents.values()), termination_condition=self._should_terminate, stop_condition=self._should_stop ) def _should_terminate(self, messages: List[Any]) -> bool: """判断是否应该终止对话""" # 检查是否有完成确认消息 completion_messages = [ msg for msg in messages if isinstance(msg, dict) and "完成" in str(msg.get("content", "")) ] return len(completion_messages) > 0 def _should_stop(self, messages: List[Any]) -> bool: """判断是否应该停止对话""" return len(messages) > 30 # 最多30轮对话 async def collaborate_document_creation(self, topic: str) -> Dict[str, Any]: """协作创建文档""" # 设置初始任务 task = f"请协作创建一篇关于'{topic}'的完整技术文档" print(f"开始文档协作:{topic}") # 执行协作过程 result = await self.team.run(task) # 更新文档状态 self.document_state["status"] = "completed" return { "result": result, "document_state": self.document_state, "collaboration_summary": self._generate_summary() } def _generate_summary(self) -> Dict[str, str]: """生成协作总结""" return { "total_participants": len(self.agents), "collaboration_mode": "round_robin", "document_status": self.document_state["status"], "collaboration_efficiency": "high" } # 使用文档协作系统 async def document_collaboration_example(): # 创建系统实例 system = DocumentCollaborationSystem() # 设置智能体 system.setup_agents() # 创建协作团队 system.create_collaboration_team() # 执行协作 result = await system.collaborate_document_creation("机器学习入门指南") print("协作完成:") print(f"文档状态: {result['document_state']}") print(f"协作总结: {result['collaboration_summary']}") return result
A:可以通过多种方式避免和处理死循环:
# 方法1:设置最大对话轮数 from autogen_agentchat.teams import RoundRobinGroupChat safe_team = RoundRobinGroupChat( participants=[agent1, agent2], max_turns=15, # 限制最大对话轮数 stop_condition=lambda messages: len(messages) > 20 ) # 方法2:使用超时机制 import asyncio class TimeoutTeam(RoundRobinGroupChat): def __init__(self, participants, timeout_seconds=300, **kwargs): super().__init__(participants, **kwargs) self.timeout_seconds = timeout_seconds async def run(self, task: str): """带超时的对话执行""" try: return await asyncio.wait_for( super().run(task), timeout=self.timeout_seconds ) except asyncio.TimeoutError: print("对话超时,停止执行") return {"timeout": True, "partial_result": None}
A:性能优化建议:
# 1. 智能体缓存 from functools import lru_cache class CachedAssistantAgent(AssistantAgent): """带缓存的智能体""" @lru_cache(maxsize=100) async def _generate_response_cached(self, task_hash: str): """缓存响应生成""" # 实现缓存逻辑 pass # 2. 异步执行优化 import asyncio class OptimizedTeam(RoundRobinGroupChat): """优化性能的团队""" async def run(self, task: str): """优化的对话执行""" # 使用任务并行化 tasks = [ asyncio.create_task(self._participant_task(participant, task)) for participant in self.participants ] # 等待任务完成 results = await asyncio.gather(*tasks, return_exceptions=True) return self._combine_results(results)
1. 明确的角色分工
# ✅ 好的实践:明确的角色定义 agent_roles = { "researcher": AssistantAgent( name="research_specialist", system_message="你是一个研究专家,专注于收集和分析信息,提供数据支持。" ), "writer": AssistantAgent( name="content_writer", system_message="你是一个内容创作者,负责将研究转化为清晰、有说服力的内容。" ), "reviewer": AssistantAgent( name="quality_reviewer", system_message="你是一个质量评审专家,确保内容的准确性、完整性和逻辑性。" ) }
2. 合理的对话流程设计
# ✅ 好的实践:顺序合理的流程 sequential_team = SequentialGroupChat( participants=[ agent_roles["researcher"], # 先研究 agent_roles["writer"], # 再写作 agent_roles["reviewer"] # 最后评审 ] )
1. 智能体复用
# ✅ 好的实践:智能体池管理 class AgentPool: def __init__(self): self._agents = {} def get_agent(self, role: str): """获取或创建智能体""" if role not in self._agents: self._agents[role] = self._create_agent(role) return self._agents[role]
本节详细介绍了AgentChat中的对话管理机制,从基础的对话创建到高级的流程控制和性能优化,我们学习了如何设计和管理多智能体对话系统。
关键要点回顾:
下一节将深入探讨AgentChat中的消息传递机制,学习智能体间如何高效交换信息和协调协作。
关键词:AgentChat, 对话管理, 多智能体协作, 团队配置, 流程控制, 对话优化
难度:进阶
预计阅读:40 分钟