# 3.2 任务分发 — AutoGen任务分配机制与负载均衡

> 本节导读:深入掌握AutoGen的任务分发机制,学习如何实现智能的任务分配、负载均衡和错误处理,构建高效可靠的多智能体协作系统。

## 学习目标

- 理解AutoGen任务分发的核心机制和架构
- 掌握任务队列、调度器和负载均衡策略
- 学会实现智能的任务分配算法
- 了解任务优先级管理和依赖关系处理
- 掌握任务监控和异常处理机制

## 核心概念

AutoGen的任务分发系统采用了先进的分布式任务管理架构,实现了智能化的任务分配和高效的负载均衡。

### 任务分发架构

## 环境准备 / 前置知识

### 基础依赖

## 分步实战

### 步骤 1:基础任务队列实现

### 步骤 2:智能调度器实现

### 步骤 3:负载均衡与容错机制

## 常见问题 FAQ

### Q1:如何处理任务分配中的负载不均衡问题?
本节导读:深入掌握AutoGen的任务分发机制,学习如何实现智能的任务分配、负载均衡和错误处理,构建高效可靠的多智能体协作系统。
AutoGen的任务分发系统采用了先进的分布式任务管理架构,实现了智能化的任务分配和高效的负载均衡。
# Core dependencies
pip install "autogen-core>=0.4.0"
pip install "autogen-agentchat>=0.4.0"
pip install "autogen-ext>=0.4.0"

# Task-processing extras
pip install "celery>=5.3.0"  # distributed task queue
pip install "redis>=4.5.0"   # task queue storage

# asyncio is part of the Python standard library; do not install it from PyPI
# (the PyPI package of that name is an obsolete backport).
import asyncio
import heapq
import time
from typing import List, Dict, Optional
from dataclasses import dataclass, field
from enum import Enum


class TaskPriority(Enum):
    """Scheduling priority of a task; a larger value is dequeued first."""

    LOW = 1
    NORMAL = 2
    HIGH = 3
    CRITICAL = 4


class TaskStatus(Enum):
    """Lifecycle state of a task."""

    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"


@dataclass
class Task:
    """A unit of work tracked by ``TaskManager``."""

    id: str
    content: str
    priority: TaskPriority = TaskPriority.NORMAL
    status: TaskStatus = TaskStatus.PENDING
    # A monotonic timestamp used only to order tasks of equal priority.
    # time.monotonic() works with or without a running event loop, whereas
    # asyncio.get_event_loop() raises RuntimeError once a previous
    # asyncio.run() has finished in the same thread.
    created_at: float = field(default_factory=time.monotonic)
    assigned_to: Optional[str] = None
    retry_count: int = 0
    max_retries: int = 3
    result: Optional[str] = None
    error: Optional[str] = None


class PriorityQueue:
    """Asynchronous priority queue of ``Task`` objects backed by a binary heap."""

    def __init__(self):
        self._queue = []
        # Strictly increasing counter: breaks ties so that two entries never
        # fall through to comparing the Task objects themselves.
        self._index = 0
        self._lock = asyncio.Lock()

    async def put(self, task: Task):
        """Insert *task*; higher priority first, then older ``created_at`` first."""
        async with self._lock:
            # heapq is a min-heap, so the priority value is negated.
            entry = (-task.priority.value, task.created_at, self._index, task)
            heapq.heappush(self._queue, entry)
            self._index += 1

    async def get(self) -> Optional[Task]:
        """Remove and return the next task, or ``None`` when the queue is empty."""
        async with self._lock:
            if not self._queue:
                return None
            return heapq.heappop(self._queue)[3]


class TaskManager:
    """Tracks tasks through pending -> running -> completed/failed, with retries."""

    def __init__(self):
        self.priority_queue = PriorityQueue()
        self.running_tasks: Dict[str, Task] = {}
        self.completed_tasks: Dict[str, Task] = {}
        self.failed_tasks: Dict[str, Task] = {}

    async def submit_task(self, task: Task):
        """Queue *task* for execution."""
        await self.priority_queue.put(task)
        print(f"📝 任务已提交: {task.id} (优先级: {task.priority.name})")

    async def get_next_task(self) -> Optional[Task]:
        """Pop the next task, mark it RUNNING and return it (``None`` if idle)."""
        task = await self.priority_queue.get()
        if task:
            task.status = TaskStatus.RUNNING
            self.running_tasks[task.id] = task
        return task

    async def complete_task(self, task_id: str, result: str):
        """Record *result* for a running task; unknown ids are ignored."""
        if task_id not in self.running_tasks:
            return
        task = self.running_tasks.pop(task_id)
        task.status = TaskStatus.COMPLETED
        task.result = result
        self.completed_tasks[task_id] = task
        print(f"✅ 任务已完成: {task_id}")

    async def fail_task(self, task_id: str, error: str):
        """Record a failure of a running task; unknown ids are ignored.

        The task is re-queued as PENDING while ``retry_count`` (after being
        incremented) is still below ``max_retries``; otherwise it is moved to
        ``failed_tasks`` with status FAILED.
        """
        if task_id not in self.running_tasks:
            return
        task = self.running_tasks.pop(task_id)
        task.status = TaskStatus.FAILED
        task.error = error
        task.retry_count += 1
        if task.retry_count < task.max_retries:
            task.status = TaskStatus.PENDING
            await self.priority_queue.put(task)
            print(f"🔄 任务将重试: {task_id} (第{task.retry_count}次)")
        else:
            self.failed_tasks[task_id] = task
            print(f"❌ 任务失败: {task_id} - {error}")
import asyncio
import time
from typing import Dict, List, Optional
from dataclasses import dataclass, field
from enum import Enum


class AgentStatus(Enum):
    """Availability of an agent."""

    IDLE = "idle"
    BUSY = "busy"
    OFFLINE = "offline"


class AgentCapability(Enum):
    """Kinds of work an agent can perform; also used as the task type."""

    CODING = "coding"
    ANALYSIS = "analysis"
    COMMUNICATION = "communication"
    TESTING = "testing"


@dataclass
class Agent:
    """An agent together with its scheduling statistics."""

    id: str
    name: str
    capabilities: List[AgentCapability]
    status: AgentStatus = AgentStatus.IDLE
    current_task: Optional[str] = None
    task_queue: List[str] = field(default_factory=list)
    load_score: float = 0.0      # kept within [0, 1] by update_load_score
    success_rate: float = 1.0    # completed_tasks / total_tasks once work has run
    total_tasks: int = 0
    completed_tasks: int = 0

    def can_handle_task(self, task_type: AgentCapability) -> bool:
        """Return whether *task_type* is among this agent's capabilities."""
        return task_type in self.capabilities

    def update_load_score(self, task_duration: float):
        """Raise ``load_score`` by up to 0.3 for a task of *task_duration* seconds.

        Durations of 60 seconds or more count as the full 0.3; the score is
        capped at 1.0.
        """
        load_factor = min(task_duration / 60.0, 1.0)
        self.load_score = min(self.load_score + load_factor * 0.3, 1.0)

    def get_efficiency_score(self) -> float:
        """Return 0.7 * success_rate + 0.3 * (1 - load_score)."""
        return self.success_rate * 0.7 + (1 - self.load_score) * 0.3


class SmartScheduler:
    """Assigns queued tasks to the registered agent with the best efficiency score."""

    # An agent is skipped once its queue holds MORE than this many tasks.
    MAX_QUEUED_TASKS = 5
    # Task types that add a flat 0.2 bonus to a capable agent's score.
    _BONUS_TASK_TYPES = (
        AgentCapability.CODING,
        AgentCapability.ANALYSIS,
        AgentCapability.TESTING,
    )

    def __init__(self):
        self.agents: Dict[str, Agent] = {}
        self.task_queue: List[Dict] = []
        self.task_agents: Dict[str, str] = {}        # task id -> agent id
        self.agent_tasks: Dict[str, List[str]] = {}  # agent id -> task ids
        self._lock = asyncio.Lock()

    async def register_agent(self, agent: Agent):
        """Add *agent* to the pool (replacing any agent with the same id)."""
        async with self._lock:
            self.agents[agent.id] = agent
            self.agent_tasks[agent.id] = []
        print(f"🤖 智能体已注册: {agent.name} ({agent.id})")

    async def submit_task(self, task_id: str, task_type: AgentCapability, priority: int = 1):
        """Queue a task; the queue stays sorted by priority (desc), then age."""
        task = {
            "id": task_id,
            "type": task_type,
            "priority": priority,
            "created_at": time.time(),
        }
        async with self._lock:
            self.task_queue.append(task)
            self.task_queue.sort(key=lambda t: (-t["priority"], t["created_at"]))
        print(f"📋 任务已提交: {task_id}")

    async def find_best_agent(self, task_type: AgentCapability) -> Optional[Agent]:
        """Return the highest-scoring eligible agent for *task_type*, or ``None``.

        Agents that lack the capability, are OFFLINE, or already have more
        than ``MAX_QUEUED_TASKS`` queued tasks are not eligible. The first
        agent registered wins ties.
        """
        best_agent = None
        best_score = -1
        for agent in self.agents.values():
            if not agent.can_handle_task(task_type):
                continue
            if agent.status == AgentStatus.OFFLINE:
                continue
            if len(agent.task_queue) > self.MAX_QUEUED_TASKS:
                continue
            score = agent.get_efficiency_score()
            if task_type in self._BONUS_TASK_TYPES:
                score += 0.2
            if score > best_score:
                best_score = score
                best_agent = agent
        return best_agent

    async def assign_tasks(self):
        """Assign every queued task that currently has an eligible agent.

        Tasks are visited in queue order. A task without an eligible agent is
        kept in the queue (order preserved) and does not prevent later tasks
        from being assigned, so one unassignable task cannot stall the queue.
        """
        async with self._lock:
            still_waiting = []
            # Iterate over a snapshot; the queue is replaced only at the end.
            for task in list(self.task_queue):
                agent = await self.find_best_agent(task["type"])
                if agent is None:
                    still_waiting.append(task)
                    print(f"⚠️ 无法为任务{task['id']}找到合适智能体")
                    continue
                agent.task_queue.append(task["id"])
                self.task_agents[task["id"]] = agent.id
                self.agent_tasks[agent.id].append(task["id"])
                print(f"🎯 任务已分配: {task['id']} -> {agent.name}")
            self.task_queue = still_waiting

    async def process_task(self, agent_id: str, task_id: str):
        """Simulate *agent_id* executing *task_id* and update its statistics.

        The work is a 2-second sleep followed by a random outcome (a draw
        above 0.15 counts as success). Returns the outcome, or ``False`` when
        the agent is unknown or an exception occurs. The agent is always
        returned to IDLE and the task removed from the bookkeeping maps.
        """
        import random

        agent = self.agents.get(agent_id)
        if not agent:
            return False
        agent.status = AgentStatus.BUSY
        agent.current_task = task_id
        try:
            print(f"🔄 {agent.name} 开始处理任务: {task_id}")
            await asyncio.sleep(2)
            success = random.random() > 0.15
            agent.total_tasks += 1
            if success:
                agent.completed_tasks += 1
                print(f"✅ {agent.name} 完成任务: {task_id}")
            else:
                print(f"❌ {agent.name} 任务失败: {task_id}")
            # Recompute after every attempt so a failure lowers the rate
            # immediately instead of only at the next success.
            agent.success_rate = agent.completed_tasks / agent.total_tasks
            agent.update_load_score(random.uniform(1, 5))
            return success
        except Exception as e:
            print(f"❌ {agent.name} 任务执行异常: {e}")
            return False
        finally:
            agent.status = AgentStatus.IDLE
            agent.current_task = None
            if task_id in agent.task_queue:
                agent.task_queue.remove(task_id)
            if task_id in self.task_agents:
                del self.task_agents[task_id]
            if agent_id in self.agent_tasks and task_id in self.agent_tasks[agent_id]:
                self.agent_tasks[agent_id].remove(task_id)
import asyncio
import time
from typing import Dict, List, Optional
from dataclasses import dataclass, field
from enum import Enum


class HealthStatus(Enum):
    """Health classification of an agent."""

    HEALTHY = "healthy"
    WARNING = "warning"
    CRITICAL = "critical"


@dataclass
class AgentHealth:
    """Rolling health indicators for a single agent."""

    agent_id: str
    status: HealthStatus = HealthStatus.HEALTHY
    last_heartbeat: float = field(default_factory=time.time)
    response_time: float = 0.0
    error_rate: float = 0.0
    consecutive_failures: int = 0
    max_failures: int = 3

    def update_heartbeat(self, response_time: float, success: bool):
        """Record one task outcome and re-derive ``status``.

        A success resets ``consecutive_failures`` and lowers ``error_rate`` by
        0.1 (floor 0); a failure increments the counter and raises the rate by
        0.2 (cap 1). Status becomes CRITICAL at ``max_failures`` consecutive
        failures, WARNING when the error rate exceeds 0.5 or the response time
        exceeds 5 seconds, and HEALTHY otherwise.
        """
        self.last_heartbeat = time.time()
        self.response_time = response_time
        if success:
            self.consecutive_failures = 0
            self.error_rate = max(0, self.error_rate - 0.1)
        else:
            self.consecutive_failures += 1
            self.error_rate = min(1, self.error_rate + 0.2)

        if self.consecutive_failures >= self.max_failures:
            self.status = HealthStatus.CRITICAL
        elif self.error_rate > 0.5 or response_time > 5.0:
            self.status = HealthStatus.WARNING
        else:
            self.status = HealthStatus.HEALTHY


class LoadBalancer:
    """Chooses agents by load, health and response time, with heartbeat checks."""

    def __init__(self, check_interval: float = 5.0):
        self.agents: Dict[str, AgentHealth] = {}
        self.agent_capacities: Dict[str, int] = {}
        self.current_loads: Dict[str, int] = {}
        self.check_interval = check_interval
        self.running = False
        # Strong reference to the background task: the event loop only keeps
        # weak references, so an unreferenced task may be garbage-collected
        # before it finishes.
        self._health_task: Optional[asyncio.Task] = None

    async def start_health_check(self):
        """Start the periodic health check; a second call while running is a no-op."""
        if self._health_task is not None and not self._health_task.done():
            return
        self.running = True
        self._health_task = asyncio.create_task(self._health_check_loop())
        print("🏥 负载均衡器健康检查已启动")

    async def stop_health_check(self):
        """Stop the periodic health check if it is running."""
        self.running = False
        task, self._health_task = self._health_task, None
        if task is not None:
            task.cancel()

    async def _health_check_loop(self):
        """Run a health check every ``check_interval`` seconds while ``running``."""
        while self.running:
            await self._perform_health_check()
            await asyncio.sleep(self.check_interval)

    async def _perform_health_check(self):
        """Mark agents silent for more than 30 seconds as CRITICAL and clear their load."""
        for agent_id, health in self.agents.items():
            time_diff = time.time() - health.last_heartbeat
            if time_diff > 30:
                print(f"⚠️ 智能体{agent_id}已离线")
                health.status = HealthStatus.CRITICAL
                self.current_loads[agent_id] = 0

    async def register_agent(self, agent_id: str, capacity: int = 10):
        """Register *agent_id* as healthy with zero load and the given capacity."""
        self.agents[agent_id] = AgentHealth(agent_id)
        self.agent_capacities[agent_id] = capacity
        self.current_loads[agent_id] = 0
        print(f"🤖 智能体已注册: {agent_id} (容量: {capacity})")

    async def get_best_agent(self, task_priority: int = 1) -> Optional[str]:
        """Return the id of the best HEALTHY agent with spare capacity, or ``None``.

        Score = 0.6 * free-capacity ratio + 0.3 (health) + 0.1 * response
        score; tasks with ``task_priority`` above 2 weight free capacity by a
        further 0.2. Agents whose capacity is not positive or whose load has
        reached capacity are skipped, which also avoids dividing by zero.
        The first registered agent wins ties.
        """
        best_agent = None
        best_score = -1
        for agent_id, health in self.agents.items():
            if health.status != HealthStatus.HEALTHY:
                continue
            capacity = self.agent_capacities[agent_id]
            load = self.current_loads[agent_id]
            if capacity <= 0 or load >= capacity:
                continue
            load_ratio = load / capacity
            load_score = 1 - load_ratio
            # Only HEALTHY agents reach this point, so the health term is constant.
            health_score = 1
            response_score = 1 - min(health.response_time / 5.0, 1.0)
            total_score = load_score * 0.6 + health_score * 0.3 + response_score * 0.1
            if task_priority > 2:
                total_score += (1 - load_ratio) * 0.2
            if total_score > best_score:
                best_score = total_score
                best_agent = agent_id
        return best_agent

    async def assign_task(self, agent_id: str, task_id: str):
        """Add one unit of load to *agent_id*; return ``False`` unless it is HEALTHY.

        Capacity is not checked here: an explicit assignment is honoured even
        when the agent is full, unlike selection via ``get_best_agent``.
        """
        health = self.agents.get(agent_id)
        if health is not None and health.status == HealthStatus.HEALTHY:
            self.current_loads[agent_id] += 1
            print(f"🎯 任务{task_id}已分配到智能体{agent_id}")
            return True
        print(f"❌ 无法分配任务{task_id}到智能体{agent_id}")
        return False

    async def complete_task(self, agent_id: str, task_id: str, success: bool = True, response_time: float = 1.0):
        """Release one unit of load and feed the outcome into the agent's health."""
        if agent_id not in self.agents:
            return
        self.current_loads[agent_id] = max(0, self.current_loads[agent_id] - 1)
        self.agents[agent_id].update_heartbeat(response_time, success)
        print(f"✅ 任务{task_id}完成,智能体{agent_id}负载: {self.current_loads[agent_id]}")
A:可以通过动态负载均衡算法解决:
async def dynamic_load_balancing(self):
    """Detect load imbalance between healthy agents.

    Computes load / capacity for every HEALTHY agent and, when the gap
    between the most and least loaded agent exceeds 0.3, reports that a
    rebalance is in progress. Returns without doing anything when no agent
    is healthy.
    """
    healthy_ids = [
        agent_id
        for agent_id, health in self.agents.items()
        if health.status == HealthStatus.HEALTHY
    ]
    if not healthy_ids:
        return

    ratios = {
        agent_id: self.current_loads[agent_id] / self.agent_capacities[agent_id]
        for agent_id in healthy_ids
    }
    busiest = max(ratios, key=ratios.get)
    idlest = min(ratios, key=ratios.get)
    if ratios[busiest] - ratios[idlest] > 0.3:
        # Placeholder: tasks would be moved from the most loaded agent to the
        # least loaded one here; for now only a notice is printed.
        print(f"🔄 正在调整负载分布...")
A:可以使用有向无环图(DAG)来管理任务依赖:
from collections import defaultdict, deque
from typing import List, Optional


class TaskDependencyManager:
    """Releases tasks for execution once all of their prerequisites are completed.

    Dependencies are expected to form a directed acyclic graph; cycles are
    not detected, and tasks on a cycle simply never become ready.
    """

    def __init__(self):
        self.tasks = {}
        self.dependencies = defaultdict(list)          # task_id -> [dependent_task_ids]
        self.reverse_dependencies = defaultdict(list)  # task_id -> [prerequisite_task_ids]
        self.ready_tasks = set()
        self.completed_tasks = set()

    def add_task(self, task_id: str, priority: int = 1, depends_on: Optional[List[str]] = None):
        """Register a task and the tasks it depends on.

        A task with no prerequisites, or whose prerequisites are all already
        completed, becomes ready immediately.
        """
        self.tasks[task_id] = {"id": task_id, "priority": priority, "ready": False}
        for dep_id in depends_on or ():
            self.dependencies[dep_id].append(task_id)
            self.reverse_dependencies[task_id].append(dep_id)
        self._release_if_ready(task_id)

    def _release_if_ready(self, task_id: str):
        """Move *task_id* into ``ready_tasks`` once, when every prerequisite is done."""
        task = self.tasks[task_id]
        # The "ready" flag records that the task has already been released,
        # so a repeated completion notice cannot queue it a second time.
        if task["ready"] or task_id in self.completed_tasks:
            return
        prerequisites = self.reverse_dependencies.get(task_id, ())
        if all(prereq_id in self.completed_tasks for prereq_id in prerequisites):
            task["ready"] = True
            self.ready_tasks.add(task_id)

    def mark_task_completed(self, task_id: str):
        """Mark a task completed and release any dependents that are now unblocked."""
        self.completed_tasks.add(task_id)
        # A completed task must not be handed out again.
        self.ready_tasks.discard(task_id)
        # .get avoids creating empty defaultdict entries for leaf tasks.
        for dependent_id in self.dependencies.get(task_id, ()):
            self._release_if_ready(dependent_id)

    def get_next_ready_task(self) -> Optional[str]:
        """Remove and return the ready task with the highest priority.

        Ties are broken by task id so the result does not depend on set
        iteration order. Returns ``None`` when nothing is ready.
        """
        if not self.ready_tasks:
            return None
        task_id = min(
            self.ready_tasks,
            key=lambda tid: (-self.tasks[tid]["priority"], tid),
        )
        self.ready_tasks.remove(task_id)
        return task_id
通过本节的学习,我们深入掌握了AutoGen任务分发系统的核心机制和实现方法。从基础的优先级任务队列到智能调度器,再到负载均衡和容错机制,我们构建了一个完整的任务分发系统。
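关键要点包括:

- 优先级任务队列:基于堆实现,按优先级和提交时间出队,失败任务在重试次数内自动重新入队;
- 智能调度器:根据智能体的能力、成功率和当前负载选择最合适的执行者;
- 负载均衡与容错:通过心跳和健康状态检查,只把任务分配给健康的智能体;
- 任务依赖管理:使用有向无环图(DAG),在前置任务全部完成后释放后续任务。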
下一节我们将探讨多智能体协作模式,学习如何实现高效的智能体间协作。
关键词:AutoGen, 任务分发, 负载均衡, 智能调度, 容错机制, 任务队列
难度:进阶
预计阅读:20分钟