3.5 错误恢复与自愈循环设计 | Loop Engineering 异常处理、自动回滚与自愈机制实战 导读 在自主循环中,错误不是异常而是常态。Agent 调用工具可能超时、验证器可能给出矛盾反馈、环境可能发生意外变化。关键不是避免错误,而是让循环具备从错误中自愈的能力。本节系统分类循环中的错误类型,设计可恢复/不可恢复的区分机制,实现完整的自愈框架。 学习目标 识别 Loop Engineering 循环中四类错误(Agent、工具、验证器、环境)的特征 实现基于错误分类的自动恢复框架,支持重试、回滚、降级和跳过策略 设计指数退避、有限重试、智能跳过等重试策略 将自愈机制集成到 discover → plan → execute → verify 循环中 核心概念 错误分类体系 可恢复
在自主循环中,错误不是异常而是常态。Agent 调用工具可能超时、验证器可能给出矛盾反馈、环境可能发生意外变化。关键不是避免错误,而是让循环具备从错误中自愈的能力。本节系统分类循环中的错误类型,设计可恢复/不可恢复的区分机制,实现完整的自愈框架。
首先建立统一的错误分类体系,为后续的恢复策略选择提供基础。
"""error_classifier.py — 循环错误分类与诊断""" import re, time, traceback from dataclasses import dataclass, field from enum import Enum class ErrorCategory(Enum): AGENT = "agent" # Agent 自身产生的错误 TOOL = "tool" # 工具调用失败 VERIFIER = "verifier" # 验证器问题 ENVIRONMENT = "environment" # 环境问题 UNKNOWN = "unknown" class RecoveryAction(Enum): RETRY = "retry" # 带退避重试 ROLLBACK = "rollback" # 回滚到检查点 DEGRADE = "degrade" # 降级到替代方案 SKIP = "skip" # 跳过当前子任务 ESCALATE = "escalate" # 升级人工处理 class ErrorSeverity(Enum): LOW = "low" # 不影响主流程 MEDIUM = "medium" # 影响当前步骤 HIGH = "high" # 影响当前迭代 CRITICAL = "critical" # 需要人工介入 @dataclass class ClassifiedError: """分类后的错误""" original_error: Exception category: ErrorCategory severity: ErrorSeverity recoverable: bool suggested_action: RecoveryAction context: str = "" # 错误发生时的上下文 timestamp: float = field(default_factory=time.time) diagnosis: str = "" # 诊断说明 class ErrorClassifier: """错误分类器""" # 错误模式 → 分类映射 TOOL_PATTERNS = { ErrorCategory.TOOL: [ (r"timeout", "工具调用超时"), (r"connection.*refused", "连接被拒绝"), (r"403|401|permission", "权限不足"), (r"rate.?limit", "API 限流"), (r"file.*not.?found", "文件不存在"), (r"disk.*full|no.?space", "磁盘空间不足"), ], } ENV_PATTERNS = { ErrorCategory.ENVIRONMENT: [ (r"OOM|memory", "内存不足"), (r"network.*unreachable", "网络不可达"), (r"service.*unavailable", "服务不可用"), ] } VERIFIER_PATTERNS = { ErrorCategory.VERIFIER: [ (r"verification.*failed", "验证失败"), (r"contradict", "验证标准矛盾"), (r"false.?positive", "误报"), ] } def classify(self, error: Exception, context: str = "") -> ClassifiedError: """对异常进行分类""" error_str = str(error).lower() error_type = type(error).__name__ # 1. 匹配已知模式 category, diagnosis = self._match_patterns(error_str) # 2. 
根据异常类型判断 if category == ErrorCategory.UNKNOWN: if error_type in ("TimeoutError", "ConnectionError"): category = ErrorCategory.TOOL diagnosis = "网络相关错误" elif error_type in ("PermissionError",): category = ErrorCategory.TOOL diagnosis = "权限错误" elif error_type in ("ValueError", "TypeError"): category = ErrorCategory.AGENT diagnosis = "Agent 生成了无效值" elif error_type in ("FileNotFoundError", "IOError"): category = ErrorCategory.ENVIRONMENT diagnosis = "环境文件问题" # 3. 判断严重程度和可恢复性 severity, recoverable, action = self._assess_severity( category, error_str) return ClassifiedError( original_error=error, category=category, severity=severity, recoverable=recoverable, suggested_action=action, context=context, diagnosis=diagnosis or f"未识别的错误: {error_type}") def _match_patterns(self, error_str: str): for category, patterns in [ *self.TOOL_PATTERNS.items(), *self.ENV_PATTERNS.items(), *self.VERIFIER_PATTERNS.items(), ]: for pattern, desc in patterns: if re.search(pattern, error_str): return category, desc return ErrorCategory.UNKNOWN, "" def _assess_severity(self, category, error_str): critical_keywords = {"oom", "disk full", "memory", "corrupt"} if any(k in error_str for k in critical_keywords): return ErrorSeverity.CRITICAL, False, RecoveryAction.ESCALATE if category == ErrorCategory.ENVIRONMENT: return ErrorSeverity.HIGH, False, RecoveryAction.ESCALATE if "rate" in error_str or "limit" in error_str: return ErrorSeverity.MEDIUM, True, RecoveryAction.RETRY if "timeout" in error_str: return ErrorSeverity.MEDIUM, True, RecoveryAction.RETRY if "permission" in error_str or "403" in error_str: return ErrorSeverity.MEDIUM, True, RecoveryAction.DEGRADE return ErrorSeverity.LOW, True, RecoveryAction.RETRY # 演示 if __name__ == "__main__": classifier = ErrorClassifier() test_errors = [ TimeoutError("API call timeout after 30s"), PermissionError("Permission denied: /etc/config"), ValueError("Invalid JSON generated by agent"), RuntimeError("Disk full, cannot write output"), ConnectionError("Rate 
limit exceeded (429)"), ] for err in test_errors: result = classifier.classify(err) print(f"[{result.severity.value:8s}] {type(err).__name__}: " f"{result.category.value} | 可恢复:{result.recoverable} | " f"策略:{result.suggested_action.value} | {result.diagnosis}")
基于错误分类结果,执行对应的恢复策略。
"""recovery_executor.py — 错误恢复策略执行器""" import time, json, os from dataclasses import dataclass, field from error_classifier import ( ClassifiedError, RecoveryAction, ErrorSeverity ) @dataclass class RecoveryResult: """恢复结果""" success: bool action_taken: RecoveryAction attempts: int total_time_ms: float message: str fallback_used: bool = False class CheckpointManager: """检查点管理器: 支持状态回滚""" def __init__(self, checkpoint_dir: str = "/tmp/loop_checkpoints"): self.checkpoint_dir = checkpoint_dir self.checkpoints: dict[str, dict] = {} os.makedirs(checkpoint_dir, exist_ok=True) def save(self, name: str, state: dict): """保存检查点""" path = f"{self.checkpoint_dir}/{name}.json" with open(path, 'w') as f: json.dump(state, f, ensure_ascii=False) self.checkpoints[name] = {"path": path, "time": time.time()} def restore(self, name: str) -> dict | None: """恢复到指定检查点""" if name not in self.checkpoints: return None path = self.checkpoints[name]["path"] with open(path) as f: return json.load(f) def list_checkpoints(self) -> list[str]: return list(self.checkpoints.keys()) class RecoveryExecutor: """恢复策略执行器""" def __init__(self, checkpoint_mgr: CheckpointManager = None): self.checkpoints = checkpoint_mgr or CheckpointManager() self.recovery_log: list[dict] = [] self.recovery_counts: dict[str, int] = {} def execute(self, classified: ClassifiedError, retry_fn=None, rollback_target: str = None, degrade_fn=None) -> RecoveryResult: """执行恢复策略""" start = time.time() action = classified.suggested_action if action == RecoveryAction.RETRY: result = self._retry_with_backoff(classified, retry_fn) elif action == RecoveryAction.ROLLBACK: result = self._rollback(classified, rollback_target) elif action == RecoveryAction.DEGRADE: result = self._degrade(classified, degrade_fn) elif action == RecoveryAction.SKIP: result = self._skip(classified) else: result = RecoveryResult( success=False, action=action, attempts=0, total_time_ms=(time.time()-start)*1000, message="需要人工介入,已升级处理") self.recovery_log.append({ 
"action": action.value, "success": result.success, "category": classified.category.value, "attempts": result.attempts}) return result def _retry_with_backoff(self, classified, retry_fn, max_retries: int = 3) -> RecoveryResult: """指数退避重试""" for attempt in range(1, max_retries + 1): delay = min(2 ** attempt, 30) # 最大30秒 print(f" 🔄 重试 {attempt}/{max_retries} (等待{delay}s)...") time.sleep(delay) try: if retry_fn: retry_fn() return RecoveryResult( success=True, action=RecoveryAction.RETRY, attempts=attempt, total_time_ms=0, message=f"第{attempt}次重试成功") except Exception: continue return RecoveryResult( success=False, action=RecoveryAction.RETRY, attempts=max_retries, total_time_ms=0, message=f"重试{max_retries}次后仍失败") def _rollback(self, classified, target) -> RecoveryResult: """回滚到检查点""" state = self.checkpoints.restore(target) if state: return RecoveryResult( success=True, action=RecoveryAction.ROLLBACK, attempts=1, total_time_ms=0, message=f"已回滚到检查点: {target}") return RecoveryResult( success=False, action=RecoveryAction.ROLLBACK, attempts=1, total_time_ms=0, message=f"检查点 {target} 不存在") def _degrade(self, classified, degrade_fn) -> RecoveryResult: """降级到替代方案""" try: if degrade_fn: degrade_fn() return RecoveryResult( success=True, action=RecoveryAction.DEGRADE, attempts=1, total_time_ms=0, message="已降级到替代方案", fallback_used=True) except Exception as e: return RecoveryResult( success=False, action=RecoveryAction.DEGRADE, attempts=1, total_time_ms=0, message=f"降级也失败: {e}") def _skip(self, classified) -> RecoveryResult: """跳过当前子任务""" return RecoveryResult( success=True, action=RecoveryAction.SKIP, attempts=0, total_time_ms=0, message=f"已跳过: {classified.diagnosis}", fallback_used=True) def get_recovery_summary(self) -> dict: """恢复操作摘要""" total = len(self.recovery_log) success = sum(1 for r in self.recovery_log if r["success"]) return { "total_recoveries": total, "successful": success, "success_rate": success / total if total > 0 else 0, "by_action": self._count_by_action() } 
def _count_by_action(self) -> dict: counts = {} for r in self.recovery_log: a = r["action"] counts[a] = counts.get(a, 0) + 1 return counts
将错误分类、恢复策略和检查点管理集成到完整的循环引擎中。
"""self_healing_loop.py — 自愈循环引擎""" import json, time from error_classifier import ErrorClassifier from recovery_executor import RecoveryExecutor, CheckpointManager class SelfHealingLoop: """自愈循环引擎""" def __init__(self, name: str, max_iterations: int = 20, checkpoint_interval: int = 3): self.name = name self.max_iterations = max_iterations self.checkpoint_interval = checkpoint_interval self.classifier = ErrorClassifier() self.recovery = RecoveryExecutor() self.iteration = 0 self.errors_handled = 0 self.skipped_tasks = 0 def _save_checkpoint(self, state: dict): self.recovery.checkpoints.save(f"iter_{self.iteration}", state) def run(self, step_fn, state: dict = None): """ step_fn(iteration, state) -> (output, should_continue) state 是可变状态, 支持检查点恢复 """ if state is None: state = {"progress": [], "artifacts": {}} print(f"🔄 自愈循环启动: {self.name}") print(f" 最大迭代: {self.max_iterations} | " f"检查点间隔: 每{self.checkpoint_interval}次") while self.iteration < self.max_iterations: self.iteration += 1 # 定期保存检查点 if self.iteration % self.checkpoint_interval == 0: self._save_checkpoint(state) # 执行步骤 try: output, should_continue = step_fn(self.iteration, state) print(f" ✅ 迭代{self.iteration}: {output[:50]}") if not should_continue: print(f"\n🏁 循环正常完成") break except Exception as e: self.errors_handled += 1 classified = self.classifier.classify(e, context=f"iteration={self.iteration}") print(f" ❌ 迭代{self.iteration} 错误: " f"[{classified.category.value}] {e}") print(f" 诊断: {classified.diagnosis}") if not classified.recoverable: print(f" 🛑 不可恢复错误, 升级人工处理") break # 执行恢复 result = self.recovery.execute( classified, retry_fn=lambda: step_fn(self.iteration, state), rollback_target=f"iter_{max(1, self.iteration - self.checkpoint_interval)}") if result.success: print(f" 🔧 恢复成功: {result.message}") if result.action.value == "skip": self.skipped_tasks += 1 else: print(f" 🛑 恢复失败: {result.message}, 停止循环") break summary = self.recovery.get_recovery_summary() print(f"\n📊 循环摘要:") print(f" 迭代: {self.iteration} 
| 错误处理: {self.errors_handled} | " f"跳过: {self.skipped_tasks}") print(f" 恢复成功率: {summary['success_rate']:.0%}") print(f" 恢复操作: {summary['by_action']}") return {"iterations": self.iteration, "errors": self.errors_handled, "recovery_summary": summary} # 使用示例 if __name__ == "__main__": def my_step(i, state): state["progress"].append(f"step_{i}") # 模拟不同类型的错误 if i == 3: raise TimeoutError("API call timeout") if i == 5: raise ConnectionError("Rate limit exceeded (429)") if i == 7: raise PermissionError("Permission denied: /etc/config") return f"完成步骤{i}", i < 10 loop = SelfHealingLoop("API开发任务", max_iterations=12) result = loop.run(my_step) print(json.dumps(result, ensure_ascii=False, indent=2))
"""e2e_self_healing.py — 端到端自愈演示""" from self_healing_loop import SelfHealingLoop # 模拟一个完整的开发循环,包含多种错误场景 def development_step(i, state): state["progress"].append(f"iter_{i}") if i == 2: raise TimeoutError("npm install timeout after 60s") if i == 4: raise PermissionError("Cannot write to /var/log/app.log") if i == 6: raise ValueError("Agent generated invalid Python syntax") if i == 8: raise ConnectionError("Rate limit: too many API calls") output = f"成功完成迭代{i}: 生成代码并验证" state["artifacts"][f"artifact_{i}"] = output return output, i < 10 loop = SelfHealingLoop("全栈开发任务", max_iterations=12, checkpoint_interval=2) result = loop.run(development_step) print(f"\n最终状态: artifacts={len(result.get('state',{}).get('artifacts',{}))}")
判断标准有三个维度:(1)错误是否是瞬时的——超时、限流通常是瞬时的,磁盘满、内存溢出通常不是;(2)是否有替代路径——权限不足可能可以换一个写入位置,但服务永久宕机无法替代;(3)重试是否会改变结果——如果是参数错误,重试多少次都一样。实际中建议采用"乐观恢复"策略:先假设可恢复并尝试恢复,失败后再升级。错误分类器(如本节实现的 ErrorClassifier)只提供初始判断,恢复执行器的实际结果才是最终裁决。
检查点的 I/O 开销通常很小(JSON 序列化几十 KB 的状态文件),频率的主要考量不是性能而是恢复粒度。每 2-3 次迭代保存一个检查点是合理的起点。频率太低意味着回滚时损失更多工作;太高则检查点管理本身变得复杂。建议:关键操作(如大规模文件变更)前后各保存一次;常规迭代按固定间隔保存。
它们是互补的关系。漂移检测处理的是"方向性偏移"——Agent 在正常工作但方向错了;自愈处理的是"执行性错误"——某个具体操作失败了。两者可以同时存在于一个循环中:漂移检测在每次迭代时检查方向,自愈机制在错误发生时执行恢复。它们不会冲突,因为触发时机不同——漂移是定期检查,自愈是错误驱动。极端情况下,如果自愈恢复后 Agent 继续在同一错误上失败,漂移检测可能会识别出策略漂移并触发重新锚定。
每次迭代前保存检查点。 即使是轻量级的状态快照也比没有好。检查点是回滚的基础,没有检查点就没有回滚能力。
为每种错误类型预设恢复策略。 不要在错误发生时才临时决定怎么做。在循环设计阶段就为可能出现的错误类型制定恢复计划,减少恢复时的决策延迟。
记录所有恢复操作的详细日志。 自愈机制是"隐式"执行的——没有人工介入就意味着可能不知道发生过错误。完整日志对事后分析和系统改进至关重要。
设置恢复操作的上限。 如果同一个错误连续恢复 3 次仍然失败,立即停止并升级。避免无限恢复循环浪费 Token。
不要让自愈机制隐藏严重问题。 某些错误(如数据损坏、安全违规)不应被自动恢复,而应立即停止并告警。恢复策略的 ESCALATE 路径必须通畅。
不要在回滚后不分析失败原因。 回滚只是恢复了状态,如果不分析为什么失败,恢复后的循环很可能再次遇到同样的问题。每次恢复后应将失败信息注入上下文,指导 Agent 避免重蹈覆辙。
避免恢复策略过于激进。 频繁的降级和跳过会导致最终产出的质量逐步下降,形成"降级螺旋"。设定降级次数上限。
本节构建了完整的错误恢复与自愈体系。我们学习了四类循环错误的特征,实现了基于模式匹配的错误分类器,设计了检查点管理器支持状态回滚,构建了支持重试/回滚/降级/跳过的恢复执行器,并将所有组件集成到自愈循环引擎中。
核心要点:循环中错误是常态而非异常;错误分类是恢复决策的基础;检查点为回滚提供基础;恢复策略应预设而非临时决定;自愈与漂移检测互补而非冲突。
下一节是第3章的最后一节——安全边界与权限设计,探讨如何在赋予 Agent 自主执行能力的同时确保系统的安全可控。
关键词:错误恢复、自愈循环、Self-Healing、Loop Engineering、检查点回滚、指数退避、降级执行、错误分类、容错设计、自主循环
难度:⭐⭐⭐⭐ 中高级
预计阅读时间:25 分钟