4.1 执行器配置 — AutoGen代码执行引擎详解 本节导读:深入理解AutoGen代码执行器的核心机制,掌握不同执行器的配置方法和适用场景,构建安全高效的代码运行环境 学习目标 掌握AutoGen代码执行器的核心架构和实现原理 理解不同执行器的特性和适用场景 学会配置和优化本地与Docker执行器 能够根据业务需求选择合适的执行器配置 核心概念 AutoGen的代码执行系统是整个框架的核心组成部分,它提供了安全、可控的代码运行环境。执行器负责将智能体生成的代码转换为可执行的程序,并捕获执行结果。 代码执行架构图:代码生成、安全检查、环境执行、结果反馈的完整流程 环境准备 / 前置知识 Python 3.
本节导读:深入理解AutoGen代码执行器的核心机制,掌握不同执行器的配置方法和适用场景,构建安全高效的代码运行环境
AutoGen的代码执行系统是整个框架的核心组成部分,它提供了安全、可控的代码运行环境。执行器负责将智能体生成的代码转换为可执行的程序,并捕获执行结果。

本地命令行执行器是最基础的执行器类型,直接在宿主机上执行代码。
from autogen.code_executor import LocalCommandLineCodeExecutor from autogen_agentchat.agents import AssistantAgent from autogen_ext.models.openai import OpenAIChatCompletionClient # 基础本地执行器配置 local_executor = LocalCommandLineCodeExecutor( timeout=300, # 总体超时时间(秒) work_dir="/tmp/autogen_work", # 工作目录 execution_timeout=30, # 单次执行超时时间 max_output=10000 # 最大输出字符数 ) # 创建使用本地执行器的智能体 coding_assistant = AssistantAgent( name="python_coder", model_client=OpenAIChatCompletionClient(model="gpt-4o"), code_execution_config={"executor": local_executor} ) # 使用执行器 async def basic_local_execution(): code = "print('Hello, AutoGen!'); import math; print(f'Sqrt(16) = {math.sqrt(16)}')" result = local_executor.execute_code(code, "test.py") return result
特点分析:
Docker容器执行器提供了更好的隔离性和安全性。
from autogen.code_executor import DockerCommandLineCodeExecutor # Docker执行器高级配置 docker_executor = DockerCommandLineCodeExecutor( image="python:3.10-slim", # 基础镜像 timeout=600, # 10分钟超时 work_dir="/workspace", # 容器内工作目录 container_name="autogen-executor-{uuid}", # 容器名称模板 auto_remove=True, # 自动删除容器 network_mode="bridge", # 网络模式 port_bindings={8080: 8080}, # 端口映射 volume_mounts={ "/host/data": "/container/data", # 数据卷挂载 "/host/code": "/workspace/code" # 代码目录挂载 }, environment_vars={ "PYTHONPATH": "/workspace", "DATA_DIR": "/workspace/data", "AUTOKEN_VERSION": "1.0.0" }, resource_limits={ "memory": "2g", "cpu": "1.0", "pids": 100 } ) # 使用Docker执行器 async def docker_execution_example(): code = """ import pandas as pd import numpy as np import matplotlib.pyplot as plt # 生成数据分析代码 np.random.seed(42) data = pd.DataFrame({ 'timestamp': pd.date_range('2024-01-01', periods=100, freq='D'), 'value': np.random.randn(100).cumsum(), 'category': np.random.choice(['A', 'B', 'C'], 100) }) # 基础统计 print(data.head()) print(f"数据行数: {len(data)}") print(f"分类统计:\n{data['category'].value_counts()}") # 生成趋势图 plt.figure(figsize=(12, 6)) plt.plot(data['timestamp'], data['value'], label='趋势线') plt.title('时间序列数据分析') plt.xlabel('时间') plt.ylabel('数值') plt.legend() plt.grid(True) plt.tight_layout() plt.savefig('/workspace/trend_analysis.png') plt.close() print("分析完成,图表已保存") """ result = docker_executor.execute_code(code, "data_analysis.py") return result
特点分析:
基础执行器类支持自定义扩展:
from autogen.code_executor import CodeExecutor from typing import Dict, Any, Optional import asyncio import logging class AdvancedCodeExecutor(CodeExecutor): """高级代码执行器,支持多种执行模式和监控""" def __init__( self, execution_modes: Optional[Dict[str, str]] = None, monitoring_interval: int = 5, max_retries: int = 3, **kwargs ): super().__init__(**kwargs) self.execution_modes = execution_modes or { "standard": "标准执行模式", "debug": "调试模式,启用详细日志", "performance": "性能模式,优化执行速度" } self.monitoring_interval = monitoring_interval self.max_retries = max_retries self.logger = logging.getLogger("AdvancedCodeExecutor") def configure_environment(self, mode: str = "standard") -> Dict[str, Any]: """根据模式配置执行环境""" env_config = {} if mode == "debug": env_config.update({ "PYTHONPATH": "/workspace/debug", "DEBUG": "1", "LOG_LEVEL": "DEBUG" }) elif mode == "performance": env_config.update({ "PYTHONPATH": "/workspace/optimized", "OMP_NUM_THREADS": "4", "MKL_NUM_THREADS": "4" }) else: # standard env_config.update({ "PYTHONPATH": "/workspace", "PYTHONUNBUFFERED": "1" }) return env_config async def execute_with_retry(self, code: str, filename: str, mode: str = "standard") -> Dict[str, Any]: """支持重试机制的执行方法""" for attempt in range(self.max_retries): try: execution_task = asyncio.create_task( self.execute_code(code, filename) ) result = await execution_task return { 'success': True, 'result': result, 'attempts': attempt + 1, 'mode': mode } except Exception as e: self.logger.warning(f"执行失败 (尝试 {attempt + 1}/{self.max_retries}): {str(e)}") if attempt == self.max_retries - 1: return { 'success': False, 'error': str(e), 'attempts': attempt + 1, 'mode': mode } await asyncio.sleep(2 ** attempt)
为不同场景提供标准的配置模板:
from autogen.code_executor import DockerCommandLineCodeExecutor, LocalCommandLineCodeExecutor from enum import Enum class ExecutionScenario(Enum): """执行场景枚举""" DEVELOPMENT = "development" # 开发环境 PRODUCTION = "production" # 生产环境 TESTING = "testing" # 测试环境 SANDBOX = "sandbox" # 沙盒环境 class ExecutorTemplateFactory: """执行器配置模板工厂""" @staticmethod def get_template(scenario: ExecutionScenario) -> Dict[str, Any]: """根据场景获取配置模板""" templates = { ExecutionScenario.DEVELOPMENT: { "type": "local", "config": { "timeout": 120, "work_dir": "/tmp/dev", "execution_timeout": 30, "auto_remove": False, "debug_mode": True } }, ExecutionScenario.PRODUCTION: { "type": "docker", "config": { "image": "python:3.10-slim", "timeout": 1800, "work_dir": "/workspace", "auto_remove": True, "resource_limits": {"memory": "4g", "cpu": "2.0"} } }, ExecutionScenario.TESTING: { "type": "docker", "config": { "image": "python:3.10-slim", "timeout": 300, "work_dir": "/workspace/test", "auto_remove": True, "test_mode": True } }, ExecutionScenario.SANDBOX: { "type": "docker", "config": { "image": "python:3.10-slim", "timeout": 60, "work_dir": "/workspace/sandbox", "auto_remove": True, "blocked_imports": ["os", "subprocess", "sys"] } } } return templates.get(scenario, templates[ExecutionScenario.DEVELOPMENT]) # 使用模板工厂 def create_executors_by_scenario(): """根据场景创建执行器""" executors = {} for scenario in ExecutionScenario: template = ExecutorTemplateFactory.get_template(scenario) if template["type"] == "local": executor = LocalCommandLineCodeExecutor(**template["config"]) elif template["type"] == "docker": executor = DockerCommandLineCodeExecutor(**template["config"]) else: raise ValueError(f"不支持的执行器类型: {template['type']}") executors[scenario.value] = executor return executors
对于复杂的业务场景,可能需要多个执行器协同工作:
import asyncio from typing import List, Dict, Any from autogen.code_executor import DockerCommandLineCodeExecutor, LocalCommandLineCodeExecutor class MultiExecutorOrchestrator: """多执行器协调器""" def __init__(self): self.executors = { "preprocessing": LocalCommandLineCodeExecutor( timeout=60, work_dir="/tmp/preprocessing" ), "analysis": DockerCommandLineCodeExecutor( image="python:3.10-slim", timeout=300, work_dir="/workspace/analysis" ), "postprocessing": LocalCommandLineCodeExecutor( timeout=60, work_dir="/tmp/postprocessing" ) } async def execute_pipeline(self, code_chunks: List[Dict[str, Any]]) -> Dict[str, Any]: """执行管道化的代码任务""" results = {} tasks = [] # 创建执行任务 for chunk in code_chunks: executor_name = chunk["executor"] code = chunk["code"] filename = chunk["filename"] executor = self.executors[executor_name] task = asyncio.create_task( executor.execute_code(code, filename) ) tasks.append({ "name": executor_name, "task": task, "filename": filename }) # 并行执行 for task_info in tasks: try: result = await task_info["task"] results[task_info["name"]] = { "success": True, "result": result, "filename": task_info["filename"] } except Exception as e: results[task_info["name"]] = { "success": False, "error": str(e), "filename": task_info["filename"] } return results
import time from dataclasses import dataclass from autogen.code_executor import LocalCommandLineCodeExecutor @dataclass class ExecutionMetrics: """执行指标""" start_time: float end_time: Optional[float] = None memory_usage_mb: Optional[float] = None execution_status: str = "running" output_length: int = 0 error_message: Optional[str] = None class MonitoredCodeExecutor(LocalCommandLineCodeExecutor): """带监控的代码执行器""" def __init__(self, enable_monitoring: bool = True, **kwargs): super().__init__(**kwargs) self.enable_monitoring = enable_monitoring async def execute_code_monitoring(self, code: str, filename: str) -> Dict[str, Any]: """执行代码并收集监控数据""" metrics = ExecutionMetrics(start_time=time.time()) try: if self.enable_monitoring: result = await super().execute_code(code, filename) metrics.output_length = len(result.get('output', '')) else: result = await super().execute_code(code, filename) metrics.end_time = time.time() metrics.execution_status = "completed" return { "success": True, "result": result, "metrics": self._serialize_metrics(metrics) } except Exception as e: metrics.end_time = time.time() metrics.execution_status = "failed" metrics.error_message = str(e) return { "success": False, "error": str(e), "metrics": self._serialize_metrics(metrics) } def _serialize_metrics(self, metrics: ExecutionMetrics) -> Dict[str, Any]: """序列化指标数据""" duration = (metrics.end_time or time.time()) - metrics.start_time return { "duration_seconds": round(duration, 2), "memory_usage_mb": metrics.memory_usage_mb, "execution_status": metrics.execution_status, "output_length": metrics.output_length, "error_message": metrics.error_message }
A:根据具体需求选择:
A:合理设置timeout,使用异步监控,实现优雅的中断机制。对于计算密集型任务,考虑分块执行或使用专门的作业队列。
A:使用Docker容器隔离,限制模块导入,设置资源限制,实现代码审查机制,使用只读文件系统等。
A:为每个用户分配独立的执行环境,使用命名容器,隔离网络和存储,实现用户配额管理。
本节详细介绍了AutoGen代码执行器的核心机制,包括本地执行器、Docker执行器的配置方法,以及自定义执行器的开发。通过实际案例,我们学习了如何根据不同场景选择合适的执行器配置,如何实现多执行器协作,以及如何进行性能监控和安全控制。
关键要点:
下一节将深入探讨Docker集成的具体实现,包括容器环境配置、网络管理和安全策略。
关键词:代码执行, 执行器配置, Docker容器, 安全控制, 性能监控, 智能管理
难度:进阶
预计阅读:30分钟