Introduction
Agents operate in unpredictable environments: APIs fail, tools return unexpected data, and LLMs misinterpret instructions. Without proper error handling, a single failure can crash the entire agent. With it, agents can recover, adapt, and still deliver value.
This section covers error classification, handling strategies, recovery patterns, and graceful degradation—everything you need to build resilient agents.
Core Principle: Design for failure. Every external call can fail, every tool can error, every LLM response can be unexpected. Plan for these cases upfront.
Types of Agent Errors
Understanding error types helps you handle them appropriately:
| Category | Examples | Typical Response |
|---|---|---|
| Transient | Network timeout, rate limit, temporary outage | Retry with backoff |
| Tool Errors | Invalid input, tool bug, resource not found | Fix input or try alternative |
| LLM Errors | Invalid JSON, hallucination, context overflow | Reprompt or constrain output |
| Logic Errors | Infinite loop, wrong tool selection, stuck state | Reset or escalate |
| Fatal Errors | Auth failure, critical service down, out of budget | Fail gracefully, alert |
Error Classification
🐍python
1from enum import Enum
2from dataclasses import dataclass
3from typing import Optional
4
class ErrorCategory(Enum):
    """Broad classification of where an agent error originated.

    The category drives the recovery strategy: transient errors get
    retried, tool/LLM errors get their inputs repaired, logic errors
    need a reset, and fatal errors abort the run.
    """

    TRANSIENT = "transient"  # temporary condition; a retry will likely succeed
    TOOL = "tool"            # the tool call itself went wrong
    LLM = "llm"              # the model produced an unusable response
    LOGIC = "logic"          # the agent's own control flow is at fault
    FATAL = "fatal"          # unrecoverable; stop and surface the failure
11
class ErrorSeverity(Enum):
    """How serious an agent error is, from loggable to run-ending."""

    LOW = "low"            # log it and keep going
    MEDIUM = "medium"      # worth an automatic recovery attempt
    HIGH = "high"          # needs substantial recovery work
    CRITICAL = "critical"  # the run cannot continue
17
@dataclass
class AgentError(Exception):
    """Structured agent error carrying classification metadata.

    Attributes:
        message: Human-readable description of the failure.
        category: Where the error originated (see ErrorCategory).
        severity: How serious the failure is (see ErrorSeverity).
        recoverable: Whether a recovery attempt is worthwhile.
        original_error: The underlying exception, if any.
        context: Arbitrary extra data about the failure site.
    """

    message: str
    category: ErrorCategory
    severity: ErrorSeverity
    recoverable: bool
    original_error: Optional[Exception] = None
    # BUG FIX: was annotated `dict = None`, which both lies about the type
    # and forces every reader of `.context` to guard against None. A mutable
    # literal can't be the default, so default to None and normalize below.
    context: Optional[dict] = None

    def __post_init__(self) -> None:
        # Guarantee `err.context[...]` always works without a None check.
        if self.context is None:
            self.context = {}

    def __str__(self) -> str:
        return f"[{self.category.value}:{self.severity.value}] {self.message}"
30
31def classify_error(error: Exception) -> AgentError:
32 """Classify an exception into an AgentError."""
33
34 error_str = str(error).lower()
35
36 # Check for transient errors
37 if any(x in error_str for x in ["timeout", "rate limit", "503", "429"]):
38 return AgentError(
39 message=str(error),
40 category=ErrorCategory.TRANSIENT,
41 severity=ErrorSeverity.MEDIUM,
42 recoverable=True,
43 original_error=error
44 )
45
46 # Check for auth errors
47 if any(x in error_str for x in ["401", "403", "unauthorized", "forbidden"]):
48 return AgentError(
49 message=str(error),
50 category=ErrorCategory.FATAL,
51 severity=ErrorSeverity.CRITICAL,
52 recoverable=False,
53 original_error=error
54 )
55
56 # Check for tool errors
57 if any(x in error_str for x in ["tool", "function", "not found"]):
58 return AgentError(
59 message=str(error),
60 category=ErrorCategory.TOOL,
61 severity=ErrorSeverity.MEDIUM,
62 recoverable=True,
63 original_error=error
64 )
65
66 # Default to logic error
67 return AgentError(
68 message=str(error),
69 category=ErrorCategory.LOGIC,
70 severity=ErrorSeverity.HIGH,
71 recoverable=True,
72 original_error=error
73 )Error Handling Strategies
Retry with Exponential Backoff
🐍python
1import asyncio
2from typing import TypeVar, Callable, Awaitable
3import random
4
5T = TypeVar('T')
6
async def retry_with_backoff(
    fn: Callable[[], Awaitable[T]],
    max_retries: int = 3,
    base_delay: float = 1.0,
    max_delay: float = 60.0,
    exceptions: tuple = (Exception,)
) -> T:
    """
    Retry an async function with exponential backoff and jitter.

    Args:
        fn: Zero-argument callable returning an awaitable to (re)try.
        max_retries: Number of retries after the initial attempt.
        base_delay: Initial backoff in seconds; doubles each attempt.
        max_delay: Upper bound on the backoff delay.
        exceptions: Exception types that trigger a retry; anything else
            propagates immediately.

    Returns:
        The result of the first successful attempt.

    Raises:
        ValueError: If max_retries is negative.
        The last caught exception once all attempts are exhausted.
    """
    # BUG FIX: the original ended with an unreachable `raise last_error`;
    # with a negative max_retries the loop body never ran and that line
    # raised None (a TypeError) instead of a clear configuration error.
    if max_retries < 0:
        raise ValueError("max_retries must be >= 0")

    for attempt in range(max_retries + 1):
        try:
            return await fn()
        except exceptions as e:
            # Out of attempts: surface the final failure to the caller.
            if attempt == max_retries:
                raise

            # Exponential backoff, capped at max_delay, plus up to 10%
            # jitter so concurrent clients don't retry in lockstep.
            delay = min(base_delay * (2 ** attempt), max_delay)
            jitter = random.uniform(0, delay * 0.1)
            total_delay = delay + jitter

            print(f"Attempt {attempt + 1} failed: {e}. "
                  f"Retrying in {total_delay:.1f}s...")

            await asyncio.sleep(total_delay)
39
40
41# Usage
42async def make_api_call():
43 return await retry_with_backoff(
44 lambda: client.messages.create(...),
45 max_retries=3,
46 exceptions=(TimeoutError, ConnectionError)
47 )Circuit Breaker
🐍python
1import time
2from dataclasses import dataclass
3
@dataclass
class CircuitBreaker:
    """
    Prevents repeated calls to failing services.

    State machine: "closed" (normal operation) -> "open" once
    failure_threshold consecutive failures accumulate -> "half-open"
    after reset_timeout seconds, which allows probe calls whose outcome
    either closes the circuit again (success) or re-opens it (failure).
    """
    failure_threshold: int = 5
    reset_timeout: float = 60.0

    # Mutable state
    failures: int = 0
    last_failure_time: float = 0
    state: str = "closed"  # one of: closed, open, half-open

    def can_proceed(self) -> bool:
        """Return True when a call may be attempted right now."""
        if self.state == "open":
            elapsed = time.time() - self.last_failure_time
            if elapsed > self.reset_timeout:
                # Cool-down elapsed: allow a probe call.
                self.state = "half-open"
                return True
            return False
        # Both "closed" and "half-open" permit calls; anything else doesn't.
        return self.state in ("closed", "half-open")

    def record_success(self) -> None:
        """Reset the breaker after a successful call."""
        self.failures = 0
        self.state = "closed"

    def record_failure(self) -> None:
        """Count a failure and open the circuit once the threshold is hit."""
        self.failures += 1
        self.last_failure_time = time.time()

        if self.failures >= self.failure_threshold:
            self.state = "open"
            print(f"Circuit breaker opened after {self.failures} failures")
47
48
49class ToolWithCircuitBreaker:
50 """Tool wrapper with circuit breaker."""
51
52 def __init__(self, tool: Tool):
53 self.tool = tool
54 self.breaker = CircuitBreaker()
55
56 async def execute(self, **kwargs) -> str:
57 if not self.breaker.can_proceed():
58 return f"Error: Tool {self.tool.name} is temporarily unavailable"
59
60 try:
61 result = await self.tool.execute(**kwargs)
62 self.breaker.record_success()
63 return result
64 except Exception as e:
65 self.breaker.record_failure()
66 raiseFallback Strategies
🐍python
class FallbackHandler:
    """Routes failed tool calls through registered fallback callables."""

    def __init__(self):
        # tool name -> fallbacks, tried in registration order
        self.fallbacks: dict[str, list[Callable]] = {}

    def register_fallback(
        self,
        tool_name: str,
        fallback_fn: Callable
    ) -> None:
        """Register a fallback for a tool (fallbacks run in registration order)."""
        # Idiom: setdefault replaces the manual membership-check-then-append.
        self.fallbacks.setdefault(tool_name, []).append(fallback_fn)

    async def execute_with_fallback(
        self,
        tool: Tool,
        **kwargs
    ) -> str:
        """Execute `tool`, falling back through registered alternatives.

        Returns the first successful result, or an error string when the
        primary tool and every fallback fail.
        """

        # Try primary tool
        try:
            return await tool.execute(**kwargs)
        except Exception as primary_error:
            print(f"Primary tool {tool.name} failed: {primary_error}")

        # Try fallbacks, in registration order
        for i, fallback in enumerate(self.fallbacks.get(tool.name, [])):
            try:
                print(f"Trying fallback {i + 1} for {tool.name}")
                return await fallback(**kwargs)
            except Exception as e:
                print(f"Fallback {i + 1} failed: {e}")

        # All failed
        return f"Error: {tool.name} and all fallbacks failed"
41
42
43# Example usage
44fallback_handler = FallbackHandler()
45
46# Register fallback search providers
47fallback_handler.register_fallback(
48 "web_search",
49 lambda query, **_: f"Fallback results for: {query}"
50)Recovery Patterns
Automatic Retry Loop
🐍python
1class RecoverableAgent:
2 """Agent with built-in recovery."""
3
4 def __init__(self, max_recovery_attempts: int = 3):
5 self.max_recovery_attempts = max_recovery_attempts
6
7 async def execute_step_with_recovery(
8 self,
9 step: dict,
10 context: dict
11 ) -> dict:
12 """Execute a step with automatic recovery."""
13
14 for attempt in range(self.max_recovery_attempts):
15 try:
16 result = await self._execute_step(step, context)
17 return {"success": True, "result": result}
18
19 except AgentError as e:
20 if not e.recoverable:
21 return {"success": False, "error": str(e)}
22
23 # Try recovery based on error category
24 recovery_result = await self._attempt_recovery(
25 e, step, context, attempt
26 )
27
28 if recovery_result.get("skip_step"):
29 return {"success": True, "result": "Step skipped", "skipped": True}
30
31 if recovery_result.get("retry"):
32 step = recovery_result.get("modified_step", step)
33 continue
34
35 if recovery_result.get("alternative"):
36 return await self.execute_step_with_recovery(
37 recovery_result["alternative"],
38 context
39 )
40
41 return {"success": False, "error": "Max recovery attempts exceeded"}
42
43 async def _attempt_recovery(
44 self,
45 error: AgentError,
46 step: dict,
47 context: dict,
48 attempt: int
49 ) -> dict:
50 """Attempt to recover from an error."""
51
52 if error.category == ErrorCategory.TRANSIENT:
53 # Wait and retry
54 await asyncio.sleep(2 ** attempt)
55 return {"retry": True}
56
57 if error.category == ErrorCategory.TOOL:
58 # Try to fix the input
59 fixed_step = await self._fix_tool_input(step, error)
60 if fixed_step:
61 return {"retry": True, "modified_step": fixed_step}
62 return {"skip_step": True}
63
64 if error.category == ErrorCategory.LLM:
65 # Reprompt with clarification
66 clarified_step = await self._clarify_step(step, error)
67 return {"retry": True, "modified_step": clarified_step}
68
69 return {"retry": False}State Checkpointing
🐍python
1import json
2from pathlib import Path
3
class CheckpointManager:
    """Save and restore agent state for recovery."""

    def __init__(self, checkpoint_dir: str = ".checkpoints"):
        self.checkpoint_dir = Path(checkpoint_dir)
        # Create the directory eagerly so save() can assume it exists.
        self.checkpoint_dir.mkdir(exist_ok=True)

    def save(self, agent_id: str, state: dict) -> str:
        """Write `state` to <checkpoint_dir>/<agent_id>.json; returns the path."""
        # BUG FIX: `datetime` was used here without being imported anywhere
        # in this module; import locally to keep the snippet self-contained.
        from datetime import datetime

        checkpoint_path = self.checkpoint_dir / f"{agent_id}.json"

        with open(checkpoint_path, "w") as f:
            json.dump({
                "agent_id": agent_id,
                "timestamp": datetime.now().isoformat(),
                "state": state
            }, f, default=str)  # default=str stringifies non-JSON values

        return str(checkpoint_path)

    def load(self, agent_id: str) -> Optional[dict]:
        """Return the saved state for `agent_id`, or None when no checkpoint exists."""
        checkpoint_path = self.checkpoint_dir / f"{agent_id}.json"

        if not checkpoint_path.exists():
            return None

        with open(checkpoint_path, "r") as f:
            data = json.load(f)
            return data["state"]

    def delete(self, agent_id: str) -> bool:
        """Delete a checkpoint after successful completion; True if one was removed."""
        checkpoint_path = self.checkpoint_dir / f"{agent_id}.json"

        if checkpoint_path.exists():
            checkpoint_path.unlink()
            return True
        return False
43
44
45class CheckpointedAgent:
46 """Agent that checkpoints state for crash recovery."""
47
48 def __init__(self):
49 self.checkpoints = CheckpointManager()
50 self.agent_id = str(uuid.uuid4())[:8]
51
52 async def run(self, task: str, resume: bool = False) -> str:
53 """Run with checkpointing."""
54
55 # Check for existing checkpoint
56 if resume:
57 saved_state = self.checkpoints.load(self.agent_id)
58 if saved_state:
59 print(f"Resuming from checkpoint")
60 state = saved_state
61 else:
62 state = self._initialize_state(task)
63 else:
64 state = self._initialize_state(task)
65
66 # Main loop with checkpointing
67 while not state["complete"]:
68 try:
69 # Execute next step
70 step_result = await self._execute_next_step(state)
71 state["steps"].append(step_result)
72 state["current_step"] += 1
73
74 # Checkpoint after each step
75 self.checkpoints.save(self.agent_id, state)
76
77 except Exception as e:
78 print(f"Error at step {state['current_step']}: {e}")
79 self.checkpoints.save(self.agent_id, state)
80 raise
81
82 # Clean up checkpoint on success
83 self.checkpoints.delete(self.agent_id)
84 return state["result"]Graceful Degradation
When full recovery isn't possible, degrade gracefully:
🐍python
class GracefulDegradation:
    """Track degradation level and tool availability during partial failures."""

    def __init__(self):
        # Ordered least -> most degraded; position defines severity.
        self.degradation_levels = [
            "full",      # All features available
            "limited",   # Some tools unavailable
            "basic",     # Core functionality only
            "readonly",  # Can only retrieve, not act
            "offline"    # No external services
        ]
        self.current_level = "full"
        self.unavailable_tools: set[str] = set()

    def degrade_to(self, level: str) -> None:
        """Degrade to `level`; unknown levels are ignored and upgrades refused.

        BUG FIX: the original accepted any known level, so the automatic
        degrade_to("limited") in mark_tool_unavailable could silently
        *upgrade* an agent already running in "basic" or lower.
        """
        levels = self.degradation_levels
        if level in levels and levels.index(level) >= levels.index(self.current_level):
            self.current_level = level
            print(f"Degraded to {level} mode")

    def mark_tool_unavailable(self, tool_name: str) -> None:
        """Mark a tool as temporarily unavailable."""
        self.unavailable_tools.add(tool_name)

        # Several tools down at once suggests a broader outage.
        if len(self.unavailable_tools) > 2:
            self.degrade_to("limited")

    def is_tool_available(self, tool_name: str) -> bool:
        """Check if a tool is currently usable at this degradation level."""
        if self.current_level == "offline":
            return False
        if self.current_level == "readonly" and tool_name not in ["read_file", "search"]:
            return False
        return tool_name not in self.unavailable_tools

    def get_available_tools(self, all_tools: list[Tool]) -> list[Tool]:
        """Filter `all_tools` down to those currently available."""
        return [
            t for t in all_tools
            if self.is_tool_available(t.name)
        ]

    def get_degradation_message(self) -> str:
        """User-facing explanation of the current level ("" when full)."""
        messages = {
            "full": "",
            "limited": "Some features are temporarily unavailable.",
            "basic": "Running in basic mode with limited functionality.",
            "readonly": "Running in read-only mode. Cannot make changes.",
            "offline": "Running offline. External services unavailable."
        }
        return messages.get(self.current_level, "")
54
55
56class DegradableAgent:
57 """Agent with graceful degradation."""
58
59 def __init__(self, tools: list[Tool]):
60 self.all_tools = tools
61 self.degradation = GracefulDegradation()
62
63 async def run(self, task: str) -> str:
64 """Run with degradation handling."""
65
66 # Check current state
67 available_tools = self.degradation.get_available_tools(self.all_tools)
68
69 if not available_tools:
70 return "Cannot proceed: all tools are currently unavailable."
71
72 # Add degradation context to system prompt
73 system_suffix = ""
74 if self.degradation.current_level != "full":
75 system_suffix = f"""
76
77Note: {self.degradation.get_degradation_message()}
78Available tools: {[t.name for t in available_tools]}
79Unavailable: {list(self.degradation.unavailable_tools)}
80
81Work within these constraints. Inform the user if you cannot fully complete the task.
82"""
83
84 # Run with available tools
85 try:
86 result = await self._run_with_tools(task, available_tools, system_suffix)
87 return result
88 except Exception as e:
89 # Further degrade and retry
90 self.degradation.degrade_to("basic")
91 return await self.run(task) # Retry with degraded capabilitiesAlways inform users when operating in degraded mode. Unexpected limitations are worse than known constraints.
Complete Error System
🐍python
1from dataclasses import dataclass, field
2from typing import Any, Optional, Callable
3import asyncio
4import logging
5
6logging.basicConfig(level=logging.INFO)
7logger = logging.getLogger(__name__)
8
@dataclass
class ErrorConfig:
    """Tunable knobs for the agent's error-handling machinery."""
    max_retries: int = 3                # attempts per tool call
    base_delay: float = 1.0             # seconds; doubles each retry
    circuit_breaker_threshold: int = 5  # failures before the breaker opens
    checkpoint_frequency: int = 1       # checkpoint every N steps
    enable_degradation: bool = True     # allow graceful degradation
17
class RobustAgent:
    """
    Production-ready agent with comprehensive error handling.

    Combines per-tool circuit breakers, bounded retries with exponential
    backoff and a hard timeout, registered fallbacks, graceful
    degradation, and checkpointing, plus simple error/recovery metrics.
    """

    def __init__(
        self,
        tools: list[Tool],
        # BUG FIX: was annotated `ErrorConfig = None`; Optional states the
        # contract honestly.
        config: Optional[ErrorConfig] = None
    ):
        self.tools = {t.name: t for t in tools}
        self.config = config or ErrorConfig()

        # Error handling components
        self.circuit_breakers: dict[str, CircuitBreaker] = {}
        self.fallback_handler = FallbackHandler()
        self.degradation = GracefulDegradation()
        self.checkpoints = CheckpointManager()

        # Metrics
        self.error_counts: dict[str, int] = {}
        self.recovery_counts: dict[str, int] = {}

    async def execute_tool(
        self,
        tool_name: str,
        **kwargs
    ) -> tuple[bool, str]:
        """Execute a tool with full error handling.

        Returns:
            (success, result_or_error_message)
        """

        # 1. Degradation gate.
        if not self.degradation.is_tool_available(tool_name):
            return False, f"Tool {tool_name} is currently unavailable"

        # 2. Circuit-breaker gate (one breaker per tool, created lazily).
        if tool_name not in self.circuit_breakers:
            self.circuit_breakers[tool_name] = CircuitBreaker()

        breaker = self.circuit_breakers[tool_name]
        if not breaker.can_proceed():
            return False, f"Tool {tool_name} is circuit-broken"

        tool = self.tools.get(tool_name)
        if not tool:
            return False, f"Unknown tool: {tool_name}"

        # 3. Bounded retries with exponential backoff and a hard timeout.
        for attempt in range(self.config.max_retries):
            try:
                result = await asyncio.wait_for(
                    tool.execute(**kwargs),
                    timeout=30.0
                )
                breaker.record_success()
                return True, result

            except asyncio.TimeoutError:
                logger.warning(f"Tool {tool_name} timed out (attempt {attempt + 1})")
                self._record_error(tool_name, "timeout")

            except Exception as e:
                logger.warning(f"Tool {tool_name} error: {e} (attempt {attempt + 1})")
                self._record_error(tool_name, str(type(e).__name__))

                # Give up immediately on non-recoverable errors.
                error = classify_error(e)
                if not error.recoverable:
                    breaker.record_failure()
                    return False, str(e)

            # Wait before retry
            await asyncio.sleep(self.config.base_delay * (2 ** attempt))

        # 4. All retries failed: trip the breaker and mark the tool down.
        breaker.record_failure()
        self.degradation.mark_tool_unavailable(tool_name)

        # 5. Last resort: registered fallbacks.
        fallback_result = await self.fallback_handler.execute_with_fallback(
            tool, **kwargs
        )
        # NOTE(review): fallback success is detected by string prefix, which
        # would misclassify a legitimate result starting with "Error".
        if not fallback_result.startswith("Error"):
            self._record_recovery(tool_name)
            return True, fallback_result

        return False, f"Tool {tool_name} failed after {self.config.max_retries} attempts"

    def _record_error(self, tool_name: str, error_type: str) -> None:
        """Increment the error counter for (tool, error type)."""
        key = f"{tool_name}:{error_type}"
        self.error_counts[key] = self.error_counts.get(key, 0) + 1

    def _record_recovery(self, tool_name: str) -> None:
        """Increment the successful-recovery counter for a tool."""
        self.recovery_counts[tool_name] = self.recovery_counts.get(tool_name, 0) + 1

    def get_health_status(self) -> dict:
        """Snapshot of degradation level, breaker states, and counters."""
        return {
            "degradation_level": self.degradation.current_level,
            "unavailable_tools": list(self.degradation.unavailable_tools),
            "circuit_breakers": {
                name: breaker.state
                for name, breaker in self.circuit_breakers.items()
            },
            "error_counts": self.error_counts,
            "recovery_counts": self.recovery_counts
        }
Summary
Robust error handling makes agents reliable. We covered:
- Error types: Transient, tool, LLM, logic, and fatal errors
- Handling strategies: Retry with backoff, circuit breakers, fallbacks
- Recovery patterns: Automatic retry loops, state checkpointing
- Graceful degradation: Maintaining partial functionality when systems fail
- Complete system: Integrated error handling for production agents
In the next section, we'll cover testing your agent—ensuring it works correctly before deployment.