3.1 工作流设计 — AutoGen工作流架构与设计模式 本节导读:深入理解AutoGen工作流的核心设计理念和架构模式,掌握如何设计和实现可扩展的多智能体协作流程,为构建复杂AI系统奠定坚实基础。 学习目标 理解AutoGen工作流的分层架构和设计理念 掌握工作流组件的配置和组合方法 学会使用状态管理和任务分发机制 能够设计符合实际需求的多智能体协作流程 了解工作流的性能优化和监控策略 核心概念 AutoGen的工作流设计基于事件驱动的异步架构,采用分层模式实现可扩展性和灵活性。
本节导读:深入理解AutoGen工作流的核心设计理念和架构模式,掌握如何设计和实现可扩展的多智能体协作流程,为构建复杂AI系统奠定坚实基础。
AutoGen的工作流设计基于事件驱动的异步架构,采用分层模式实现可扩展性和灵活性。工作流系统主要由以下几个核心组件构成:
# 核心依赖 pip install "autogen-core>=0.4.0" pip install "autogen-agentchat>=0.4.0" pip install "autogen-ext>=0.4.0" # 可选依赖 pip install "autogen-studio>=0.4.0" # Web界面 pip install "asyncio" # 异步编程
首先创建一个简单的工作流,实现两个智能体的基本协作:
import asyncio from autogen_agentchat.agents import AssistantAgent, UserProxyAgent from autogen_agentchat.teams import RoundRobinGroupChat async def basic_workflow(): # 创建智能体 assistant = AssistantAgent( name="assistant", description="AI助手,负责回答问题和生成内容" ) user_proxy = UserProxyAgent( name="user", description="用户代理,负责执行代码和提供反馈" ) # 创建工作流 team = RoundRobinGroupChat( [assistant, user_proxy], termination_condition="max_turns=5" ) # 执行工作流 async for message in team.run("创建一个简单的Python计算器程序"): print(f"Agent: {message.source.name}") print(f"Message: {message.content}") print("-" * 50) # 运行工作流 asyncio.run(basic_workflow())
创建具有状态管理能力的工作流,实现多轮对话和任务追踪:
from autogen_agentchat.agents import AssistantAgent, UserProxyAgent from autogen_agentchat.teams import RoundRobinGroupChat from autogen_agentchat.conditions import TextMentionTermination async def stateful_workflow(): # 创建具有记忆的智能体 coding_assistant = AssistantAgent( name="coding_assistant", description="编程助手,帮助用户解决编程问题", system_message="你是一位专业的编程助手,具有上下文记忆能力。" ) code_executor = UserProxyAgent( name="code_executor", description="代码执行器,负责运行代码并提供反馈", human_input_mode="NEVER", code_execution_config={"use_docker": False} ) # 创建状态感知的工作流 team = RoundRobinGroupChat( [coding_assistant, code_executor], termination_condition=TextMentionTermination("问题解决完成"), max_turns=10 ) # 定义任务 task = """ 创建一个待办事项管理系统的核心功能: 1. 添加任务 2. 查看所有任务 3. 标记任务完成 4. 删除任务 请逐步实现这些功能。 """ # 执行工作流 print("开始执行待办事项管理系统开发任务...") async for message in team.run(task): print(f"🤖 {message.source.name}: {message.content}") print("-" * 60) if __name__ == "__main__": asyncio.run(stateful_workflow())
设计更复杂的协作模式,实现团队分工和专业协作:
from autogen_agentchat.agents import AssistantAgent, UserProxyAgent from autogen_agentchat.teams import RoundRobinGroupChat, SelectorGroupChat from autogen_agentchat.conditions import MaxMessageTermination async def complex_workflow(): # 创建专业智能体 product_manager = AssistantAgent( name="product_manager", description="产品经理,负责需求分析和产品规划", system_message="你是产品经理,负责分析用户需求,制定产品功能规划。" ) frontend_developer = AssistantAgent( name="frontend_developer", description="前端开发者,负责用户界面实现", system_message="你是前端开发者,专注于用户界面的实现和优化。" ) backend_developer = AssistantAgent( name="backend_developer", description="后端开发者,负责业务逻辑实现", system_message="你是后端开发者,专注于服务端逻辑和数据库设计。" ) tester = AssistantAgent( name="tester", description="测试工程师,负责质量保证", system_message="你是测试工程师,负责功能测试和质量控制。" ) # 创建分层工作流 # 第一阶段:需求分析 planning_team = RoundRobinGroupChat( [product_manager, frontend_developer, backend_developer], termination_condition=MaxMessageTermination(3), name="planning_team" ) # 第二阶段:实现和测试 development_team = RoundRobinGroupChat( [frontend_developer, backend_developer, tester], termination_condition=MaxMessageTermination(10), name="development_team" ) # 创建主工作流协调器 def workflow_coordinator(context): if "规划完成" in context.get_last_message().content: return "development_team" elif "测试完成" in context.get_last_message().content: return "done" return "planning_team" main_team = SelectorGroupChat( [planning_team, development_team], workflow_coordinator, termination_condition=MaxMessageTermination(20), name="main_workflow" ) # 执行完整工作流 project_task = """ 开发一个在线图书商城系统,包含以下核心功能: - 用户注册和登录 - 图书浏览和搜索 - 购物车管理 - 订单处理 - 支付集成 请按照产品规划→前端开发→后端开发→测试的流程进行。 """ print("🚀 开始开发在线图书商城系统...") async for message in main_team.run(project_task): print(f"📋 {message.source.name} ({message.source.team}): {message.content}") print("-" * 70) if __name__ == "__main__": asyncio.run(complex_workflow())
实现高性能工作流,包含并行处理和结果聚合:
import asyncio from autogen_agentchat.agents import AssistantAgent, UserProxyAgent from autogen_agentchat.teams import RoundRobinGroupChat from autogen_ext.code_executors.docker import DockerCommandLineCodeExecutor async def optimized_workflow(): # 创建高性能代码执行器 code_executor = UserProxyAgent( name="code_executor", code_execution_config={ "executor": DockerCommandLineCodeExecutor( image="python:3.11-slim", work_dir="/workspace" ), "use_docker": True } ) # 创建并行处理智能体 data_analyst = AssistantAgent( name="data_analyst", description="数据分析专家,负责数据分析和可视化", system_message="你是数据分析专家,擅长处理大规模数据和创建可视化图表。" ) ml_engineer = AssistantAgent( name="ml_engineer", description="机器学习工程师,负责模型训练和优化", system_message="你是机器学习工程师,专注于模型开发和性能优化。" ) # 创建并行任务处理器 async def parallel_tasks(): # 任务1:数据分析 data_task = data_analyst.run("分析销售数据并生成月度报告") # 任务2:模型训练 ml_task = ml_engineer.run("训练销售预测模型并评估性能") # 任务3:并行执行 results = await asyncio.gather(data_task, ml_task) # 任务4:结果聚合 aggregator = AssistantAgent( name="aggregator", description="结果聚合器,整合分析结果" ) final_result = await aggregator.run( f"整合以下分析结果:\n" f"数据分析结果:{results[0]}\n" f"模型训练结果:{results[1]}\n" f"提供综合业务建议。" ) return final_result # 执行优化工作流 print("🔄 开始执行并行数据分析任务...") try: result = await parallel_tasks() print("✅ 任务完成,结果:") print(result) except Exception as e: print(f"❌ 执行失败:{e}") if __name__ == "__main__": asyncio.run(optimized_workflow())
""" AutoGen完整工作流示例:项目管理协作系统 实现产品规划、开发、测试的完整协作流程 """ import asyncio from typing import Dict, List from dataclasses import dataclass from autogen_agentchat.agents import AssistantAgent, UserProxyAgent from autogen_agentchat.teams import RoundRobinGroupChat, SelectorGroupChat from autogen_agentchat.conditions import TextMentionTermination, MaxMessageTermination @dataclass class Project: name: str description: str status: str = "planning" requirements: List[str] = None tasks: List[Dict] = None def __post_init__(self): if self.requirements is None: self.requirements = [] if self.tasks is None: self.tasks = [] class ProjectManagementWorkflow: def __init__(self): self.project = None self.agents = self._create_agents() self.teams = self._create_teams() def _create_agents(self): """创建项目团队智能体""" return { "product_owner": AssistantAgent( name="product_owner", description="产品负责人,负责需求分析和优先级排序", system_message="你是产品负责人,负责用户需求分析、产品规划和优先级排序。" ), "tech_lead": AssistantAgent( name="tech_lead", description="技术负责人,负责技术方案设计和架构规划", system_message="你是技术负责人,负责系统架构设计、技术选型和方案评估。" ), "frontend_dev": AssistantAgent( name="frontend_dev", description="前端开发工程师,负责用户界面实现", system_message="你是前端开发工程师,专注于用户界面实现和用户体验优化。" ), "backend_dev": AssistantAgent( name="backend_dev", description="后端开发工程师,负责业务逻辑实现", system_message="你是后端开发工程师,专注于服务端逻辑和数据库设计。" ), "qa_engineer": AssistantAgent( name="qa_engineer", description="质量保证工程师,负责测试和质量控制", system_message="你是质量保证工程师,负责功能测试、性能测试和质量保证。" ), "tech_writer": AssistantAgent( name="tech_writer", description="技术文档工程师,负责文档编写", system_message="你是技术文档工程师,负责技术文档编写、用户手册制作。" ) } def _create_teams(self): """创建协作团队""" return { "planning_team": RoundRobinGroupChat( [self.agents["product_owner"], self.agents["tech_lead"]], termination_condition=TextMentionTermination("规划完成"), name="planning_team" ), "development_team": RoundRobinGroupChat( [self.agents["frontend_dev"], self.agents["backend_dev"]], termination_condition=MaxMessageTermination(15), name="development_team" ), "qa_team": RoundRobinGroupChat( [self.agents["qa_engineer"], self.agents["tech_writer"]], termination_condition=TextMentionTermination("测试完成"), name="qa_team" ), "documentation_team": RoundRobinGroupChat( [self.agents["tech_writer"]], termination_condition=TextMentionTermination("文档完成"), name="documentation_team" ) } def _get_next_team(self, context): """工作流协调函数""" last_message = context.get_last_message() content = last_message.content if "需求分析完成" in content: return "development_team" elif "开发完成" in content: return "qa_team" elif "测试完成" in content: return "documentation_team" elif "文档完成" in content: return "done" else: return "planning_team" async def run_project(self, project_name: str, description: str): """执行项目管理工作流""" print(f"🚀 开始项目:{project_name}") print(f"📝 项目描述:{description}") print("=" * 60) # 初始化项目 self.project = Project(name=project_name, description=description) # 创建主工作流 main_workflow = SelectorGroupChat( list(self.teams.values()), self._get_next_team, termination_condition=MaxMessageTermination(50), name="project_management_workflow" ) # 执行完整工作流 project_task = f""" 请按照以下流程完成项目{project_name}的开发: 1. 需求分析阶段: - 分析用户需求 - 确定功能范围 - 制定技术方案 - 排定开发优先级 2. 开发阶段: - 前端界面开发 - 后端逻辑实现 - API接口设计 - 数据库设计 3. 测试阶段: - 功能测试 - 性能测试 - 用户验收测试 - Bug修复 4. 文档阶段: - 技术文档编写 - 用户手册制作 - 部署指南 - 维护文档 请确保每个阶段都有明确的结果和验收标准。 """ # 执行工作流 async for message in main_workflow.run(project_task): team_name = message.source.team if hasattr(message.source, 'team') else 'unknown' print(f"🏢 {message.source.name} ({team_name}): {message.content}") print("-" * 80) print(f"🎉 项目{project_name}执行完成!") # 使用示例 async def main(): workflow = ProjectManagementWorkflow() # 执行项目管理工作流 await workflow.run_project( "在线教育平台", "开发一个支持在线课程、作业提交、考试测验的综合性教育平台" ) if __name__ == "__main__": asyncio.run(main())
A:AutoGen提供了多种终止条件来防止死循环:
from autogen_agentchat.conditions import MaxMessageTermination, TextMentionTermination # 使用最大消息数限制 termination = MaxMessageTermination(20) # 使用特定文本触发终止 termination = TextMentionTermination("任务完成") # 组合使用 termination = lambda ctx: ( MaxMessageTermination(15)(ctx) or TextMentionTermination("完成")(ctx) )
A:可以通过状态管理器实现持久化:
from autogen_agentchat.state import ChatState import json class StatefulWorkflow: def __init__(self, state_file="workflow_state.json"): self.state_file = state_file self.state = self._load_state() def _load_state(self): try: with open(self.state_file, 'r') as f: return json.load(f) except FileNotFoundError: return {} def save_state(self): with open(self.state_file, 'w') as f: json.dump(self.state, f, indent=2) async def run(self, task): # 在工作流执行过程中更新状态 self.state['last_task'] = task self.save_state() # 执行工作流... await self._execute_workflow(task) # 更新完成状态 self.state['completed'] = True self.save_state()
A:可以通过性能监控装饰器实现:
import time import functools from typing import Callable, Any def monitor_performance(func: Callable) -> Callable: @functools.wraps(func) async def wrapper(*args, **kwargs): start_time = time.time() result = await func(*args, **kwargs) end_time = time.time() duration = end_time - start_time print(f"⏱️ {func.__name__} 执行耗时: {duration:.2f}秒") return result return wrapper # 使用示例 @monitor_performance async def team_execution(team, task): async for message in team.run(task): # 处理消息... pass
通过本节的学习,我们深入理解了AutoGen工作流的核心设计理念和架构模式。从基础的智能体协作到复杂的项目管理系统,我们掌握了工作流设计的完整方法论。关键要点包括:
下一节我们将深入探讨工作流中的任务分发机制,学习如何实现智能的任务分配和负载均衡。
关键词:AutoGen, 工作流, 设计模式, 异步编程, 智能体协作, 状态管理, 项目管理
难度:进阶
预计阅读:25分钟