2.3 消息传递


文档摘要

2.3 消息传递 — AgentChat通信机制详解 本节导读:深入理解AgentChat中消息类型、传递机制和处理流程,掌握智能体间高效通信的技术实现和最佳实践 学习目标 掌握AgentChat消息系统的核心架构和类型体系 理解消息传递机制和路由策略 学会处理不同类型消息的转换和过滤 能够实现复杂的消息处理逻辑 优化消息传递的性能和可靠性 核心概念 消息传递是多智能体系统的通信基础,AgentChat提供了丰富的消息类型和传递机制,支持智能体间的高效信息交换和协作协调。 消息传递架构图:消息类型、传递流程和处理机制的完整体系 环境准备 / 前置知识 Python 3.

2.3 消息传递 — AgentChat通信机制详解

本节导读:深入理解AgentChat中消息类型、传递机制和处理流程,掌握智能体间高效通信的技术实现和最佳实践

学习目标

  • 掌握AgentChat消息系统的核心架构和类型体系
  • 理解消息传递机制和路由策略
  • 学会处理不同类型消息的转换和过滤
  • 能够实现复杂的消息处理逻辑
  • 优化消息传递的性能和可靠性

核心概念

消息传递是多智能体系统的通信基础,AgentChat提供了丰富的消息类型和传递机制,支持智能体间的高效信息交换和协作协调。

消息传递架构图:消息类型、传递流程和处理机制的完整体系

环境准备 / 前置知识

  • Python 3.10+ 异步编程基础
  • AutoGen-AgentChat库:pip install autogen-agentchat
  • 消息处理和序列化概念
  • 异步事件驱动编程
  • 基础网络通信知识

消息类型体系

基础消息类型

from autogen_agentchat.messages import ( TextMessage, FunctionCallMessage, FunctionCallResultMessage, StopMessage, ResetMessage, ToolCallMessage, ToolCallResultMessage ) # 文本消息 - 基础消息类型 text_message = TextMessage( content="你好,我是一个智能助手", source="user" ) # 函数调用消息 function_call = FunctionCallMessage( name="get_weather", arguments={"city": "北京"}, source="assistant" ) # 函数调用结果消息 function_result = FunctionCallResultMessage( result={"temperature": 25, "condition": "sunny"}, source="function" ) # 停止消息 stop_message = StopMessage( content="任务完成", source="system" ) # 重置消息 reset_message = ResetMessage( content="对话重置", source="system" )

工具调用消息

from autogen_agentchat.messages import ( ToolCallMessage, ToolCallResultMessage, ToolMessage ) # 工具调用消息 tool_call = ToolCallMessage( tool="calculator", arguments={"expression": "2 + 2"}, source="assistant" ) # 工具调用结果消息 tool_result = ToolCallResultMessage( result={"result": 4, "status": "success"}, source="tool" ) # 工作消息 tool_message = ToolMessage( content="计算完成", source="tool_system" )

消息继承层次结构

# 消息类型层次关系 # Message (基类) # ├── TextMessage # ├── FunctionCallMessage # ├── FunctionCallResultMessage # ├── ToolCallMessage # ├── ToolCallResultMessage # ├── StopMessage # ├── ResetMessage # ├── ImageMessage # ├── AudioMessage # └── CompositeMessage # 自定义消息类型 from autogen_agentchat.messages import Message class CustomMessage(Message): """自定义消息类型""" def __init__(self, content: str, metadata: dict = None, **kwargs): super().__init__(content=content, **kwargs) self.metadata = metadata or {} def to_dict(self) -> dict: """转换为字典格式""" return { "type": "custom", "content": self.content, "metadata": self.metadata, "source": self.source, "timestamp": self.timestamp }

消息传递机制

基础消息传递

import asyncio from autogen_agentchat.agents import AssistantAgent from autogen_agentchat.messages import TextMessage from autogen_ext.models.openai import OpenAIChatCompletionClient # 创建智能体 agent1 = AssistantAgent( name="agent1", model_client=OpenAIChatCompletionClient(model="gpt-4o") ) agent2 = AssistantAgent( name="agent2", model_client=OpenAIChatCompletionClient(model="gpt-4o") ) async def basic_message_passing(): """基础消息传递示例""" # 智能体1发送消息给智能体2 message = TextMessage(content="你好,请问你需要什么帮助?", source="agent1") response = await agent2.on_message(message) print(f"智能体1发送: {message.content}") print(f"智能体2回复: {response.content}") return response # 执行消息传递 result = asyncio.run(basic_message_passing())

消息路由机制

from typing import List, Dict, Any from autogen_agentchat.messages import Message from autogen_agentchat.agents import AssistantAgent class MessageRouter: """消息路由器""" def __init__(self): self.message_handlers = {} self.subscribers = {} def register_handler(self, message_type: str, handler): """注册消息处理器""" self.message_handlers[message_type] = handler def subscribe(self, subscriber, message_types: List[str]): """订阅特定类型的消息""" for msg_type in message_types: if msg_type not in self.subscribers: self.subscribers[msg_type] = [] self.subscribers[msg_type].append(subscriber) async def route_message(self, message: Message) -> List[Any]: """路由消息到订阅者""" results = [] # 根据消息类型路由 msg_type = type(message).__name__ if msg_type in self.subscribers: for subscriber in self.subscribers[msg_type]: result = await subscriber.on_message(message) results.append(result) # 处理特定类型的消息 if msg_type in self.message_handlers: handler_result = await self.message_handlers[msg_type](message) results.append(handler_result) return results # 使用消息路由器 router = MessageRouter() # 注册处理器 def text_message_handler(message: TextMessage): """文本消息处理器""" print(f"处理文本消息: {message.content}") return {"status": "processed", "length": len(message.content)} router.register_handler("TextMessage", text_message_handler) # 订阅者 class MessageSubscriber: def __init__(self, name: str): self.name = name async def on_message(self, message: Message): """接收消息""" print(f"{self.name} 收到消息: {message.content}") return {"subscriber": self.name, "processed": True} # 创建订阅者 subscriber1 = MessageSubscriber("订阅者1") subscriber2 = MessageSubscriber("订阅者2") # 订阅消息类型 router.subscribe(subscriber1, ["TextMessage"]) router.subscribe(subscriber2, ["TextMessage"]) # 使用路由器 async def message_routing_example(): message = TextMessage("这是一条测试消息", source="sender") results = await router.route_message(message) print(f"路由结果: {results}") return results

消息处理和转换

消息过滤器

from typing import List, Callable from autogen_agentchat.messages import Message class MessageFilter: """消息过滤器""" def __init__(self): self.filters = [] def add_filter(self, filter_func: Callable[[Message], bool]): """添加过滤函数""" self.filters.append(filter_func) def apply_filters(self, messages: List[Message]) -> List[Message]: """应用所有过滤器""" filtered_messages = messages for filter_func in self.filters: filtered_messages = [ msg for msg in filtered_messages if filter_func(msg) ] return filtered_messages def clear_filters(self): """清除所有过滤器""" self.filters = [] # 使用消息过滤器 async def message_filter_example(): # 创建过滤器实例 filter_instance = MessageFilter() # 添加长度过滤器 def length_filter(message: Message) -> bool: return len(message.content) > 10 # 添加来源过滤器 def source_filter(message: Message) -> bool: return message.source == "user" # 添加内容过滤器 def content_filter(message: Message) -> bool: return "重要" in message.content # 注册过滤器 filter_instance.add_filter(length_filter) filter_instance.add_filter(source_filter) filter_instance.add_filter(content_filter) # 测试消息 messages = [ TextMessage("短消息", source="system"), TextMessage("这是一条包含重要信息的较长消息", source="user"), TextMessage("另一条消息", source="user"), TextMessage("包含重要信息,但来源不对", source="assistant"), ] # 应用过滤器 filtered = filter_instance.apply_filters(messages) print(f"过滤后的消息数量: {len(filtered)}") for msg in filtered: print(f"- {msg.source}: {msg.content}") return filtered

消息状态管理

消息历史管理

from typing import List, Dict, Optional from autogen_agentchat.messages import Message from datetime import datetime import json class MessageHistoryManager: """消息历史管理器""" def __init__(self, max_history: int = 100): self.max_history = max_history self.message_history: List[Message] = [] self.metadata: Dict[str, Dict] = {} def add_message(self, message: Message, metadata: Optional[Dict] = None): """添加消息到历史""" self.message_history.append(message) # 更新元数据 if metadata: self.metadata[str(len(self.message_history) - 1)] = metadata # 限制历史长度 if len(self.message_history) > self.max_history: self.message_history.pop(0) # 清理对应的元数据 oldest_key = str(len(self.message_history)) if oldest_key in self.metadata: del self.metadata[oldest_key] def get_recent_messages(self, count: int = 10) -> List[Message]: """获取最近的N条消息""" start_idx = max(0, len(self.message_history) - count) return self.message_history[start_idx:] def get_messages_by_source(self, source: str) -> List[Message]: """根据来源获取消息""" return [msg for msg in self.message_history if msg.source == source] def get_messages_by_type(self, message_type: str) -> List[Message]: """根据类型获取消息""" return [msg for msg in self.message_history if type(msg).__name__ == message_type] def export_history(self, filepath: str): """导出消息历史到文件""" export_data = { "messages": [ { "type": type(msg).__name__, "content": msg.content, "source": msg.source, "timestamp": getattr(msg, 'timestamp', None) } for msg in self.message_history ], "metadata": self.metadata, "export_time": datetime.now().isoformat() } with open(filepath, 'w', encoding='utf-8') as f: json.dump(export_data, f, indent=2, ensure_ascii=False) def get_statistics(self) -> Dict[str, int]: """获取消息统计信息""" stats = { "total_messages": len(self.message_history), "sources": {}, "types": {} } for msg in self.message_history: # 统计来源 source = msg.source stats["sources"][source] = stats["sources"].get(source, 0) + 1 # 统计类型 msg_type = type(msg).__name__ stats["types"][msg_type] = stats["types"].get(msg_type, 0) + 1 return stats # 使用消息历史管理器 async def message_history_example(): # 创建管理器 history_manager = MessageHistoryManager(max_history=50) # 添加消息 messages = [ TextMessage("用户:你好", source="user"), TextMessage("助手:你好!有什么可以帮助你的吗?", source="assistant"), TextMessage("用户:我想了解AutoGen", source="user") ] for msg in messages: history_manager.add_message(msg, {"priority": "normal"}) # 获取最近消息 recent = history_manager.get_recent_messages(2) print("最近消息:") for msg in recent: print(f"- {msg.source}: {msg.content}") # 获取统计信息 stats = history_manager.get_statistics() print(f"消息统计: {stats}") # 导出历史 history_manager.export_history("message_history.json") return history_manager

消息传递优化

消息批处理

import asyncio from typing import List, Dict, Any from autogen_agentchat.messages import Message from autogen_agentchat.agents import AssistantAgent class BatchMessageProcessor: """批量消息处理器""" def __init__(self, batch_size: int = 5, timeout: float = 10.0): self.batch_size = batch_size self.timeout = timeout self.message_queue = [] self.processing_lock = asyncio.Lock() async def add_message(self, message: Message, metadata: Dict = None): """添加消息到批量处理队列""" async with self.processing_lock: self.message_queue.append({ "message": message, "metadata": metadata or {}, "timestamp": asyncio.get_event_loop().time() }) # 如果达到批量大小,处理队列 if len(self.message_queue) >= self.batch_size: await self._process_batch() async def _process_batch(self): """处理一批消息""" if not self.message_queue: return # 复制队列并清空 batch = self.message_queue.copy() self.message_queue.clear() # 批量处理消息 try: await self._execute_batch(batch) except Exception as e: print(f"批量处理错误: {e}") # 将失败的消息重新加入队列 self.message_queue.extend(batch) async def _execute_batch(self, batch: List[Dict]): """执行批量处理""" # 这里可以实现具体的批量处理逻辑 print(f"处理批量消息,共 {len(batch)} 条") # 模拟处理延迟 await asyncio.sleep(1) # 返回处理结果 for item in batch: print(f"处理消息: {item['message'].content}") item["result"] = {"status": "success", "processed": True} async def flush(self): """刷新剩余消息""" while self.message_queue: await self._process_batch() await asyncio.sleep(0.1) # 使用批量处理器 async def batch_processing_example(): # 创建批量处理器 processor = BatchMessageProcessor(batch_size=3, timeout=5.0) # 模拟添加消息 messages = [ TextMessage(f"消息 {i}", source="user") for i in range(1, 8) # 8条消息,会被分成3批处理 ] # 并发添加消息 add_tasks = [ processor.add_message(msg, {"id": i}) for i, msg in enumerate(messages) ] await asyncio.gather(*add_tasks) # 刷新剩余消息 await processor.flush() print("批量处理完成")

常见问题 FAQ

Q1:如何处理大量消息的性能问题?

A:可以通过以下方式优化消息处理性能:

# 1. 批量处理消息 class HighPerformanceMessageProcessor: def __init__(self, batch_size=10): self.batch_size = batch_size self.batch_queue = [] async def process_batch(self, messages): """批量处理消息""" # 使用线程池或异步IO提高处理速度 results = await asyncio.gather(*[ self._process_single(msg) for msg in messages ]) return results async def _process_single(self, message): """处理单个消息""" # 实现具体的处理逻辑 await asyncio.sleep(0.01) # 模拟处理时间 return {"result": f"processed_{hash(message.content)}"} # 2. 使用消息队列 import asyncio from queue import Queue class AsyncMessageQueue: def __init__(self): self.queue = asyncio.Queue() self.processors = [] async def add_message(self, message): """添加消息到队列""" await self.queue.put(message) async def process_messages(self): """处理队列中的消息""" while True: message = await self.queue.get() await self._process_message(message) async def _process_message(self, message): """处理单个消息""" # 具体处理逻辑 pass

Q2:如何确保消息传递的可靠性?

A:消息可靠性保障策略:

# 1. 消息确认机制 class ReliableMessagePasser: def __init__(self): self.pending_messages = {} self.acknowledged_messages = set() async def send_with_ack(self, message): """发送消息并等待确认""" message_id = id(message) self.pending_messages[message_id] = message try: result = await self._send_message(message) self.acknowledged_messages.add(message_id) return result except Exception as e: # 处理失败,重试 return await self._retry_message(message) async def _retry_message(self, message, max_retries=3): """重试发送消息""" for attempt in range(max_retries): try: result = await self._send_message(message) self.acknowledged_messages.add(id(message)) return result except Exception as e: if attempt == max_retries - 1: raise await asyncio.sleep(2 ** attempt) # 指数退避

最佳实践与避坑

消息设计最佳实践

1. 合理的消息类型设计

# ✅ 好的实践:明确的消息类型 class ServiceRequestMessage(Message): """服务请求消息""" def __init__(self, service_type: str, request_data: dict, **kwargs): super().__init__( content=f"服务请求: {service_type}", **kwargs ) self.service_type = service_type self.request_data = request_data class ServiceResponseMessage(Message): """服务响应消息""" def __init__(self, response_data: dict, success: bool, **kwargs): super().__init__( content="服务响应", **kwargs ) self.response_data = response_data self.success = success # ❌ 避免模糊的消息类型 class AmbiguousMessage(Message): """模糊的消息类型""" def __init__(self, data, **kwargs): super().__init__(content="各种数据", **kwargs) # 数据类型不明确,难以处理 self.data = data

2. 消息结构化设计

# ✅ 好的实践:结构化的消息 class StructuredMessage(Message): """结构化消息""" def __init__(self, message_type: str, payload: dict, metadata: dict = None, **kwargs): super().__init__(content=self._format_content(message_type, payload), **kwargs) self.message_type = message_type self.payload = payload self.metadata = metadata or {} def _format_content(self, message_type: str, payload: dict) -> str: """格式化消息内容""" # 可以根据消息类型格式化不同的内容 return f"{message_type}: {payload}" def to_dict(self) -> dict: """转换为字典格式""" return { "type": self.message_type, "payload": self.payload, "metadata": self.metadata, "source": self.source, "timestamp": self.timestamp }

性能优化最佳实践

1. 消息缓存策略

# ✅ 好的实践:智能缓存 class SmartMessageCache: """智能消息缓存""" def __init__(self, max_size=1000, ttl=300): self.cache = {} self.max_size = max_size self.ttl = ttl # 5分钟TTL self.access_times = {} def get_message(self, message_key: str): """获取缓存的消息""" if message_key in self.cache: # 检查TTL current_time = time.time() if current_time - self.access_times[message_key] < self.ttl: self.access_times[message_key] = current_time return self.cache[message_key] else: # 过期,删除 del self.cache[message_key] del self.access_times[message_key] return None def set_message(self, message_key: str, message_data): """设置缓存消息""" if len(self.cache) >= self.max_size: # LRU淘汰 oldest_key = min

发布者: 作者: 转发
评论区 (0)
U