步骤3:工具执行监控和错误恢复 构建包含监控和恢复机制的工具执行系统: 步骤4:高级工具编排 构建复杂的工具编排系统,实现工具链和条件调用:
构建包含监控和恢复机制的工具执行系统:
import time from dataclasses import dataclass from typing import Optional, Callable from enum import Enum class ExecutionStatus(Enum): """执行状态""" PENDING = "pending" RUNNING = "running" COMPLETED = "completed" FAILED = "failed" RETRYING = "retrying" @dataclass class ToolExecutionResult: """工具执行结果""" tool_name: str status: ExecutionStatus result: Optional[str] error_message: Optional[str] execution_time: float retry_count: int = 0 class ToolMonitor: """工具执行监控器""" def __init__(self, max_retries: int = 3, timeout: float = 30.0): self.max_retries = max_retries self.timeout = timeout self.execution_history: List[ToolExecutionResult] = [] def execute_with_monitoring(self, tool: Any, *args, **kwargs) -> ToolExecutionResult: """带监控的工具执行""" start_time = time.time() for attempt in range(self.max_retries + 1): try: print(f"执行 {tool.name} (尝试 {attempt + 1}/{self.max_retries + 1})") # 检查超时 result = self._execute_with_timeout(tool, *args, **kwargs) execution_time = time.time() - start_time # 记录成功 if attempt == 0: result_obj = ToolExecutionResult( tool_name=tool.name, status=ExecutionStatus.COMPLETED, result=result, error_message=None, execution_time=execution_time ) else: result_obj = ToolExecutionResult( tool_name=tool.name, status=ExecutionStatus.COMPLETED, result=result, error_message=f"在尝试 {attempt + 1} 成功", execution_time=execution_time, retry_count=attempt ) self.execution_history.append(result_obj) return result_obj except Exception as e: execution_time = time.time() - start_time if attempt < self.max_retries: print(f"尝试 {attempt + 1} 失败,正在重试: {str(e)}") time.sleep(2 ** attempt) # 指数退避 result_obj = ToolExecutionResult( tool_name=tool.name, status=ExecutionStatus.RETRYING, result=None, error_message=str(e), execution_time=execution_time, retry_count=attempt + 1 ) else: print(f"达到最大重试次数,执行失败: {str(e)}") result_obj = ToolExecutionResult( tool_name=tool.name, status=ExecutionStatus.FAILED, result=None, error_message=str(e), execution_time=execution_time, retry_count=self.max_retries ) self.execution_history.append(result_obj) return result_obj def _execute_with_timeout(self, tool: Any, *args, **kwargs) -> str: """带超时的工具执行""" import threading result_container = {"result": None, "error": None} def target(): try: result_container["result"] = tool.invoke(*args, **kwargs) except Exception as e: result_container["error"] = e thread = threading.Thread(target=target) thread.daemon = True thread.start() thread.join(timeout=self.timeout) if thread.is_alive(): raise TimeoutError(f"工具执行超时 ({self.timeout}s)") elif result_container["error"]: raise result_container["error"] else: return result_container["result"] def get_statistics(self) -> Dict[str, Any]: """获取执行统计""" if not self.execution_history: return {"total_executions": 0} total = len(self.execution_history) successful = sum(1 for r in self.execution_history if r.status == ExecutionStatus.COMPLETED) failed = sum(1 for r in self.execution_history if r.status == ExecutionStatus.FAILED) avg_time = sum(r.execution_time for r in self.execution_history) / total avg_retries = sum(r.retry_count for r in self.execution_history) / total return { "total_executions": total, "successful_executions": successful, "failed_executions": failed, "success_rate": successful / total if total > 0 else 0, "average_execution_time": avg_time, "average_retries": avg_retries } class SmartToolExecutor: """智能工具执行器""" def __init__(self): self.monitor = ToolMonitor() def execute_tool_safely(self, tool: Any, *args, **kwargs) -> str: """安全执行工具""" result = self.monitor.execute_with_monitoring(tool, *args, **kwargs) if result.status == ExecutionStatus.COMPLETED: return result.result else: # 尝试备用方案 return self._execute_fallback(tool, args, kwargs, result) def _execute_fallback(self, tool: Any, args: tuple, kwargs: dict, result: ToolExecutionResult) -> str: """执行备用方案""" print(f"工具 {tool.name} 执行失败,尝试备用方案") # 根据错误类型选择不同的备用策略 error_msg = result.error_message.lower() if "timeout" in error_msg: # 超时:尝试简化参数 simplified_kwargs = kwargs.copy() if 'max_results' in simplified_kwargs: simplified_kwargs['max_results'] = 5 # 减少结果数 return tool.invoke(*args, **simplified_kwargs) elif "connection" in error_msg or "network" in error_msg: # 网络错误:返回模拟数据 return f"网络错误,使用缓存数据: {tool.name} 的默认响应" else: # 其他错误:返回错误信息 return f"工具 {tool.name} 执行失败: {result.error_message}" # 创建智能执行器 executor = SmartToolExecutor() def test_tool_monitoring(): """测试工具监控""" print("\n=== 测试工具执行监控 ===") # 测试成功执行 print("测试成功执行:") result = executor.execute_tool_safely(calculator, "2 + 2") print(f"结果: {result}") # 测试超时(模拟) print("\n测试超时处理:") # 这里不能真的模拟超时,但代码结构已经准备好了 # 获取统计信息 stats = executor.monitor.get_statistics() print(f"执行统计: {stats}") if __name__ == "__main__": test_tool_monitoring()
构建复杂的工具编排系统,实现工具链和条件调用:
from dataclasses import dataclass from typing import List, Dict, Optional, Callable from enum import Enum class ToolOperationType(Enum): """工具操作类型""" SEQUENTIAL = "sequential" # 顺序执行 PARALLEL = "parallel" # 并行执行 CONDITIONAL = "conditional" # 条件执行 LOOP = "loop" # 循环执行 @dataclass class ToolOperation: """工具操作""" type: ToolOperationType tool_name: str parameters: Dict[str, Any] condition: Optional[str] = None max_iterations: Optional[int] = None class ToolOrchestrator: """工具编排器""" def __init__(self): self.operations: List[ToolOperation] = [] self.tools: Dict[str, Any] = {} def add_tool(self, name: str, tool: Any): """添加工具""" self.tools[name] = tool def add_sequential_operation(self, tool_name: str, **parameters): """添加顺序操作""" operation = ToolOperation( type=ToolOperationType.SEQUENTIAL, tool_name=tool_name, parameters=parameters ) self.operations.append(operation) def add_parallel_operation(self, tool_name: str, **parameters): """添加并行操作""" operation = ToolOperation( type=ToolOperationType.PARALLEL, tool_name=tool_name, parameters=parameters ) self.operations.append(operation) def add_conditional_operation(self, tool_name: str, condition: str, **parameters): """添加条件操作""" operation = ToolOperation( type=ToolOperationType.CONDITIONAL, tool_name=tool_name, condition=condition, parameters=parameters ) self.operations.append(operation) def add_loop_operation(self, tool_name: str, max_iterations: int, **parameters): """添加循环操作""" operation = ToolOperation( type=ToolOperationType.LOOP, tool_name=tool_name, parameters=parameters, max_iterations=max_iterations ) self.operations.append(operation) def execute_orchestration(self) -> Dict[str, Any]: """执行编排""" results = {} for operation in self.operations: if operation.type == ToolOperationType.SEQUENTIAL: result = self._execute_sequential(operation) results[operation.tool_name] = result elif operation.type == ToolOperationType.PARALLEL: result = self._execute_parallel(operation) results[operation.tool_name] = result elif operation.type == ToolOperationType.CONDITIONAL: result = self._execute_conditional(operation) results[operation.tool_name] = result elif operation.type == ToolOperationType.LOOP: result = self._execute_loop(operation) results[operation.tool_name] = result return results def _execute_sequential(self, operation: ToolOperation) -> List[str]: """执行顺序操作""" results = [] tool = self.tools[operation.tool_name] for i in range(3): # 模拟多次调用 result = tool.invoke(**operation.parameters) results.append(result) return results def _execute_parallel(self, operation: ToolOperation) -> List[str]: """执行并行操作""" import concurrent.futures import time tool = self.tools[operation.tool_name] with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor: # 创建多个任务 futures = [] for i in range(3): future = executor.submit(tool.invoke, **operation.parameters) futures.append(future) # 等待所有任务完成 results = [] for future in concurrent.futures.as_completed(futures): results.append(future.result()) return results def _execute_conditional(self, operation: ToolOperation) -> Optional[str]: """执行条件操作""" tool = self.tools[operation.tool_name] # 简单的条件判断 condition = operation.condition.lower() if "计算" in condition or "math" in condition: return tool.invoke(**operation.parameters) else: print(f"条件不满足,跳过操作: {condition}") return None def _execute_loop(self, operation: ToolOperation) -> List[str]: """执行循环操作""" results = [] tool = self.tools[operation.tool_name] max_iterations = operation.max_iterations or 5 for i in range(max_iterations): try: result = tool.invoke(**operation.parameters) results.append(result) print(f"循环执行 {i + 1}/{max_iterations}: {result}") except Exception as e: print(f"循环执行 {i + 1} 失败: {str(e)}") break return results # 创建工具编排器 orchestrator = ToolOrchestrator() # 添加工具 orchestrator.add_tool("web_search", web_search) orchestrator.add_tool("calculator", calculator) # 构建复杂的工具编排 print("\n=== 测试高级工具编排 ===") # 1. 顺序操作:先搜索,再计算 orchestrator.add_sequential_operation( "web_search", query="AI技术发展", max_results=5 ) orchestrator.add_sequential_operation( "calculator", expression="5 * 3 + 2" ) # 2. 并行操作:同时进行多个计算 orchestrator.add_parallel_operation( "calculator", expression="2 + 2" ) # 3. 条件操作:只在满足条件时执行 orchestrator.add_conditional_operation( "calculator", condition="数学计算", expression="10 / 2" ) # 4. 循环操作:重复计算 orchestrator.add_loop_operation( "calculator", max_iterations=3, expression="i * 2", # 注意:这里的参数可能需要调整 i=1 ) # 执行编排 try: results = orchestrator.execute_orchestration() print(f"编排结果: {results}") except Exception as e: print(f"编排执行错误: {str(e)}") # 更简单的测试 print("\n=== 简单测试 ===") simple_orchestrator = ToolOrchestrator() simple_orchestrator.add_tool("calculator", calculator) simple_orchestrator.add_sequential_operation("calculator", expression="2 + 3") simple_results = simple_orchestrator.execute_orchestration() print(f"简单编排结果: {simple_results}")