Chapter 11

Building Your First Agent

Error Handling and Recovery

Introduction

Agents operate in unpredictable environments. APIs fail, tools return unexpected data, LLMs misinterpret instructions. Without proper error handling, a single failure can crash the entire agent. With it, agents can recover, adapt, and still deliver value.

This section covers error classification, handling strategies, recovery patterns, and graceful degradation—everything you need to build resilient agents.

Core Principle: Design for failure. Every external call can fail, every tool can error, every LLM response can be unexpected. Plan for these cases upfront.

Types of Agent Errors

Understanding error types helps you handle them appropriately:

| Category | Examples | Typical Response |
|---|---|---|
| Transient | Network timeout, rate limit, temporary outage | Retry with backoff |
| Tool Errors | Invalid input, tool bug, resource not found | Fix input or try alternative |
| LLM Errors | Invalid JSON, hallucination, context overflow | Reprompt or constrain output |
| Logic Errors | Infinite loop, wrong tool selection, stuck state | Reset or escalate |
| Fatal Errors | Auth failure, critical service down, out of budget | Fail gracefully, alert |

Error Classification

```python
from enum import Enum
from dataclasses import dataclass
from typing import Optional

class ErrorCategory(Enum):
    TRANSIENT = "transient"      # Retry likely to succeed
    TOOL = "tool"                # Tool-related issue
    LLM = "llm"                  # LLM response issue
    LOGIC = "logic"              # Agent logic issue
    FATAL = "fatal"              # Cannot recover

class ErrorSeverity(Enum):
    LOW = "low"           # Log and continue
    MEDIUM = "medium"     # Attempt recovery
    HIGH = "high"         # Major recovery needed
    CRITICAL = "critical" # Cannot continue

@dataclass
class AgentError(Exception):
    """Structured agent error."""
    message: str
    category: ErrorCategory
    severity: ErrorSeverity
    recoverable: bool
    original_error: Optional[Exception] = None
    context: Optional[dict] = None

    def __str__(self) -> str:
        return f"[{self.category.value}:{self.severity.value}] {self.message}"

def classify_error(error: Exception) -> AgentError:
    """Classify an exception into an AgentError."""
    error_str = str(error).lower()

    # Check for transient errors
    if any(x in error_str for x in ["timeout", "rate limit", "503", "429"]):
        return AgentError(
            message=str(error),
            category=ErrorCategory.TRANSIENT,
            severity=ErrorSeverity.MEDIUM,
            recoverable=True,
            original_error=error
        )

    # Check for auth errors
    if any(x in error_str for x in ["401", "403", "unauthorized", "forbidden"]):
        return AgentError(
            message=str(error),
            category=ErrorCategory.FATAL,
            severity=ErrorSeverity.CRITICAL,
            recoverable=False,
            original_error=error
        )

    # Check for tool errors
    if any(x in error_str for x in ["tool", "function", "not found"]):
        return AgentError(
            message=str(error),
            category=ErrorCategory.TOOL,
            severity=ErrorSeverity.MEDIUM,
            recoverable=True,
            original_error=error
        )

    # Default to logic error
    return AgentError(
        message=str(error),
        category=ErrorCategory.LOGIC,
        severity=ErrorSeverity.HIGH,
        recoverable=True,
        original_error=error
    )
```
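To see the keyword matching in action, here is a condensed, self-contained sketch of the same checks (categories only; the dataclass and severity fields are omitted, and the sample messages are illustrative):

```python
# Condensed sketch of the keyword checks in classify_error.
# Order matters: transient markers are tested before fatal ones.
def classify(message: str) -> str:
    m = message.lower()
    if any(x in m for x in ["timeout", "rate limit", "503", "429"]):
        return "transient"
    if any(x in m for x in ["401", "403", "unauthorized", "forbidden"]):
        return "fatal"
    if any(x in m for x in ["tool", "function", "not found"]):
        return "tool"
    return "logic"

print(classify("Request timeout after 30s"))   # transient
print(classify("HTTP 403 Forbidden"))          # fatal
print(classify("Tool 'search' not found"))     # tool
print(classify("unexpected planner state"))    # logic
```

String matching like this is a heuristic, not a contract: prefer checking exception types (as the retry helper later does with its `exceptions` tuple) when the underlying library exposes them.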

Error Handling Strategies

Retry with Exponential Backoff

```python
import asyncio
from typing import TypeVar, Callable, Awaitable
import random

T = TypeVar('T')

async def retry_with_backoff(
    fn: Callable[[], Awaitable[T]],
    max_retries: int = 3,
    base_delay: float = 1.0,
    max_delay: float = 60.0,
    exceptions: tuple = (Exception,)
) -> T:
    """Retry an async function with exponential backoff."""
    for attempt in range(max_retries + 1):
        try:
            return await fn()
        except exceptions as e:
            if attempt == max_retries:
                raise

            # Calculate delay with jitter
            delay = min(base_delay * (2 ** attempt), max_delay)
            jitter = random.uniform(0, delay * 0.1)
            total_delay = delay + jitter

            print(f"Attempt {attempt + 1} failed: {e}. "
                  f"Retrying in {total_delay:.1f}s...")

            await asyncio.sleep(total_delay)


# Usage (assumes an LLM client with an async messages.create method)
async def make_api_call():
    return await retry_with_backoff(
        lambda: client.messages.create(...),
        max_retries=3,
        exceptions=(TimeoutError, ConnectionError)
    )
```
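The delay formula is easier to audit in isolation. This sketch computes the pre-jitter schedule for a given configuration (the helper name is illustrative):

```python
def backoff_schedule(max_retries: int = 5, base_delay: float = 1.0,
                     max_delay: float = 60.0) -> list[float]:
    """Pre-jitter delays: base_delay doubles each attempt, capped at max_delay."""
    return [min(base_delay * (2 ** attempt), max_delay)
            for attempt in range(max_retries)]

print(backoff_schedule())       # [1.0, 2.0, 4.0, 8.0, 16.0]
print(backoff_schedule(8)[-1])  # 60.0 (attempt 7 would be 128s, capped)
```

The jitter term (up to 10% of the delay) is added on top of these values so that many clients recovering from the same outage do not retry in lockstep.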

Circuit Breaker

```python
import time
from dataclasses import dataclass

@dataclass
class CircuitBreaker:
    """Prevents repeated calls to failing services."""
    failure_threshold: int = 5
    reset_timeout: float = 60.0

    # State
    failures: int = 0
    last_failure_time: float = 0
    state: str = "closed"  # closed, open, half-open

    def can_proceed(self) -> bool:
        """Check if we can make a call."""
        if self.state == "open":
            # Check if reset timeout has passed
            if time.time() - self.last_failure_time > self.reset_timeout:
                self.state = "half-open"
                return True
            return False

        # closed, or half-open (allow a probe call through)
        return True

    def record_success(self) -> None:
        """Record a successful call."""
        self.failures = 0
        self.state = "closed"

    def record_failure(self) -> None:
        """Record a failed call."""
        self.failures += 1
        self.last_failure_time = time.time()

        if self.failures >= self.failure_threshold:
            self.state = "open"
            print(f"Circuit breaker opened after {self.failures} failures")


class ToolWithCircuitBreaker:
    """Tool wrapper with circuit breaker."""

    def __init__(self, tool: Tool):
        self.tool = tool
        self.breaker = CircuitBreaker()

    async def execute(self, **kwargs) -> str:
        if not self.breaker.can_proceed():
            return f"Error: Tool {self.tool.name} is temporarily unavailable"

        try:
            result = await self.tool.execute(**kwargs)
            self.breaker.record_success()
            return result
        except Exception:
            self.breaker.record_failure()
            raise
```
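The closed → open → half-open transitions are easiest to verify with a clock you control. This self-contained sketch mirrors the breaker logic above with an injectable time source (class names are illustrative):

```python
class ManualClock:
    """Fake clock so the test does not have to sleep."""
    def __init__(self):
        self.now = 0.0
    def advance(self, seconds: float):
        self.now += seconds

class SketchBreaker:
    """Same closed/open/half-open logic as CircuitBreaker, with injected time."""
    def __init__(self, clock, failure_threshold=3, reset_timeout=60.0):
        self.clock = clock
        self.failure_threshold = failure_threshold
        self.reset_timeout = reset_timeout
        self.failures = 0
        self.last_failure_time = 0.0
        self.state = "closed"

    def can_proceed(self) -> bool:
        if self.state == "open":
            if self.clock.now - self.last_failure_time > self.reset_timeout:
                self.state = "half-open"
                return True
            return False
        return True  # closed or half-open

    def record_failure(self):
        self.failures += 1
        self.last_failure_time = self.clock.now
        if self.failures >= self.failure_threshold:
            self.state = "open"

    def record_success(self):
        self.failures = 0
        self.state = "closed"

clock = ManualClock()
breaker = SketchBreaker(clock)
for _ in range(3):
    breaker.record_failure()
print(breaker.state)          # open
print(breaker.can_proceed())  # False

clock.advance(61)
print(breaker.can_proceed())  # True (probe allowed: now half-open)
breaker.record_success()
print(breaker.state)          # closed
```

Injecting the clock is purely a testing convenience; the production version's `time.time()` behaves identically.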

Fallback Strategies

```python
class FallbackHandler:
    """Handle failures with fallback options."""

    def __init__(self):
        self.fallbacks: dict[str, list[Callable]] = {}

    def register_fallback(
        self,
        tool_name: str,
        fallback_fn: Callable
    ) -> None:
        """Register a fallback for a tool."""
        if tool_name not in self.fallbacks:
            self.fallbacks[tool_name] = []
        self.fallbacks[tool_name].append(fallback_fn)

    async def execute_with_fallback(
        self,
        tool: Tool,
        **kwargs
    ) -> str:
        """Execute tool with fallbacks."""

        # Try primary tool
        try:
            return await tool.execute(**kwargs)
        except Exception as primary_error:
            print(f"Primary tool {tool.name} failed: {primary_error}")

        # Try fallbacks
        fallbacks = self.fallbacks.get(tool.name, [])
        for i, fallback in enumerate(fallbacks):
            try:
                print(f"Trying fallback {i + 1} for {tool.name}")
                return await fallback(**kwargs)
            except Exception as e:
                print(f"Fallback {i + 1} failed: {e}")

        # All failed
        return f"Error: {tool.name} and all fallbacks failed"


# Example usage
fallback_handler = FallbackHandler()

# Register fallback search providers
fallback_handler.register_fallback(
    "web_search",
    lambda query, **_: f"Fallback results for: {query}"
)
```
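Stripped of the tool plumbing, the fallback pattern reduces to "try each callable in order, return the first success." A synchronous sketch (function names are illustrative):

```python
def first_success(callables, *args, **kwargs):
    """Return the result of the first callable that does not raise."""
    errors = []
    for fn in callables:
        try:
            return fn(*args, **kwargs)
        except Exception as e:
            errors.append(e)
    raise RuntimeError(f"All {len(callables)} options failed: {errors}")

def flaky_search(query):
    # Stand-in for a primary provider that is currently down
    raise ConnectionError("primary search down")

def backup_search(query):
    return f"Backup results for: {query}"

print(first_success([flaky_search, backup_search], "llm agents"))
# Backup results for: llm agents
```

Collecting the intermediate errors, as above, matters in practice: when every option fails you want to report all causes, not just the last one.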

Recovery Patterns

Automatic Retry Loop

```python
class RecoverableAgent:
    """Agent with built-in recovery."""

    def __init__(self, max_recovery_attempts: int = 3):
        self.max_recovery_attempts = max_recovery_attempts

    async def execute_step_with_recovery(
        self,
        step: dict,
        context: dict
    ) -> dict:
        """Execute a step with automatic recovery."""
        for attempt in range(self.max_recovery_attempts):
            try:
                result = await self._execute_step(step, context)
                return {"success": True, "result": result}

            except AgentError as e:
                if not e.recoverable:
                    return {"success": False, "error": str(e)}

                # Try recovery based on error category
                recovery_result = await self._attempt_recovery(
                    e, step, context, attempt
                )

                if recovery_result.get("skip_step"):
                    return {"success": True, "result": "Step skipped", "skipped": True}

                if recovery_result.get("retry"):
                    step = recovery_result.get("modified_step", step)
                    continue

                if recovery_result.get("alternative"):
                    return await self.execute_step_with_recovery(
                        recovery_result["alternative"],
                        context
                    )

                # No applicable recovery strategy: fail fast rather than
                # blindly re-running the same step
                return {"success": False, "error": str(e)}

        return {"success": False, "error": "Max recovery attempts exceeded"}

    async def _attempt_recovery(
        self,
        error: AgentError,
        step: dict,
        context: dict,
        attempt: int
    ) -> dict:
        """Attempt to recover from an error."""
        if error.category == ErrorCategory.TRANSIENT:
            # Wait and retry
            await asyncio.sleep(2 ** attempt)
            return {"retry": True}

        if error.category == ErrorCategory.TOOL:
            # Try to fix the input
            fixed_step = await self._fix_tool_input(step, error)
            if fixed_step:
                return {"retry": True, "modified_step": fixed_step}
            return {"skip_step": True}

        if error.category == ErrorCategory.LLM:
            # Reprompt with clarification
            clarified_step = await self._clarify_step(step, error)
            return {"retry": True, "modified_step": clarified_step}

        return {"retry": False}
```

State Checkpointing

```python
import json
import uuid
from datetime import datetime
from pathlib import Path
from typing import Optional

class CheckpointManager:
    """Save and restore agent state for recovery."""

    def __init__(self, checkpoint_dir: str = ".checkpoints"):
        self.checkpoint_dir = Path(checkpoint_dir)
        self.checkpoint_dir.mkdir(exist_ok=True)

    def save(self, agent_id: str, state: dict) -> str:
        """Save agent state to checkpoint."""
        checkpoint_path = self.checkpoint_dir / f"{agent_id}.json"

        with open(checkpoint_path, "w") as f:
            json.dump({
                "agent_id": agent_id,
                "timestamp": datetime.now().isoformat(),
                "state": state
            }, f, default=str)

        return str(checkpoint_path)

    def load(self, agent_id: str) -> Optional[dict]:
        """Load agent state from checkpoint."""
        checkpoint_path = self.checkpoint_dir / f"{agent_id}.json"

        if not checkpoint_path.exists():
            return None

        with open(checkpoint_path, "r") as f:
            data = json.load(f)
            return data["state"]

    def delete(self, agent_id: str) -> bool:
        """Delete checkpoint after successful completion."""
        checkpoint_path = self.checkpoint_dir / f"{agent_id}.json"

        if checkpoint_path.exists():
            checkpoint_path.unlink()
            return True
        return False


class CheckpointedAgent:
    """Agent that checkpoints state for crash recovery."""

    def __init__(self, agent_id: Optional[str] = None):
        self.checkpoints = CheckpointManager()
        # A stable agent_id must be supplied to resume a previous run;
        # otherwise a fresh one is generated
        self.agent_id = agent_id or str(uuid.uuid4())[:8]

    async def run(self, task: str, resume: bool = False) -> str:
        """Run with checkpointing."""

        # Check for existing checkpoint
        state = None
        if resume:
            state = self.checkpoints.load(self.agent_id)
            if state:
                print("Resuming from checkpoint")
        if state is None:
            state = self._initialize_state(task)

        # Main loop with checkpointing
        while not state["complete"]:
            try:
                # Execute next step
                step_result = await self._execute_next_step(state)
                state["steps"].append(step_result)
                state["current_step"] += 1

                # Checkpoint after each step
                self.checkpoints.save(self.agent_id, state)

            except Exception as e:
                print(f"Error at step {state['current_step']}: {e}")
                self.checkpoints.save(self.agent_id, state)
                raise

        # Clean up checkpoint on success
        self.checkpoints.delete(self.agent_id)
        return state["result"]
```
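A quick round trip makes the checkpoint file format concrete. This self-contained sketch uses a temporary directory instead of `.checkpoints`, and the state fields mirror the agent loop above:

```python
import json
import tempfile
from pathlib import Path

# Save and restore a checkpoint in the same JSON shape CheckpointManager uses
with tempfile.TemporaryDirectory() as d:
    path = Path(d) / "agent-1234.json"

    state = {
        "current_step": 3,
        "steps": ["planned", "searched", "drafted"],
        "complete": False,
    }
    path.write_text(json.dumps({"agent_id": "agent-1234", "state": state},
                               default=str))

    restored = json.loads(path.read_text())["state"]
    print(restored == state)         # True
    print(restored["current_step"])  # 3
```

Note the `default=str` escape hatch: anything non-serializable (datetimes, custom objects) is stringified on save, so it comes back as a string on load. If you need those objects intact, serialize them explicitly.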

Graceful Degradation

When full recovery isn't possible, degrade gracefully:

```python
class GracefulDegradation:
    """Handle partial failures gracefully."""

    def __init__(self):
        self.degradation_levels = [
            "full",        # All features available
            "limited",     # Some tools unavailable
            "basic",       # Core functionality only
            "readonly",    # Can only retrieve, not act
            "offline"      # No external services
        ]
        self.current_level = "full"
        self.unavailable_tools: set[str] = set()

    def degrade_to(self, level: str) -> None:
        """Degrade to a specific level."""
        if level in self.degradation_levels:
            self.current_level = level
            print(f"Degraded to {level} mode")

    def mark_tool_unavailable(self, tool_name: str) -> None:
        """Mark a tool as temporarily unavailable."""
        self.unavailable_tools.add(tool_name)

        # Check if we need to degrade further
        if len(self.unavailable_tools) > 2:
            self.degrade_to("limited")

    def is_tool_available(self, tool_name: str) -> bool:
        """Check if a tool is currently available."""
        if self.current_level == "offline":
            return False
        if self.current_level == "readonly" and tool_name not in ["read_file", "search"]:
            return False
        return tool_name not in self.unavailable_tools

    def get_available_tools(self, all_tools: list[Tool]) -> list[Tool]:
        """Get list of currently available tools."""
        return [
            t for t in all_tools
            if self.is_tool_available(t.name)
        ]

    def get_degradation_message(self) -> str:
        """Get user-friendly degradation message."""
        messages = {
            "full": "",
            "limited": "Some features are temporarily unavailable.",
            "basic": "Running in basic mode with limited functionality.",
            "readonly": "Running in read-only mode. Cannot make changes.",
            "offline": "Running offline. External services unavailable."
        }
        return messages.get(self.current_level, "")


class DegradableAgent:
    """Agent with graceful degradation."""

    def __init__(self, tools: list[Tool]):
        self.all_tools = tools
        self.degradation = GracefulDegradation()

    async def run(self, task: str) -> str:
        """Run with degradation handling."""

        # Check current state
        available_tools = self.degradation.get_available_tools(self.all_tools)

        if not available_tools:
            return "Cannot proceed: all tools are currently unavailable."

        # Add degradation context to system prompt
        system_suffix = ""
        if self.degradation.current_level != "full":
            system_suffix = f"""

Note: {self.degradation.get_degradation_message()}
Available tools: {[t.name for t in available_tools]}
Unavailable: {list(self.degradation.unavailable_tools)}

Work within these constraints. Inform the user if you cannot fully complete the task.
"""

        # Run with available tools
        try:
            return await self._run_with_tools(task, available_tools, system_suffix)
        except Exception as e:
            # Degrade once and retry; if already degraded, stop rather
            # than recursing indefinitely
            if self.degradation.current_level != "full":
                return f"Task failed even in degraded mode: {e}"
            self.degradation.degrade_to("basic")
            return await self.run(task)  # Retry with degraded capabilities
```

Always inform users when operating in degraded mode. Unexpected limitations are worse than known constraints.
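The availability check above reduces to set membership plus a mode gate, which is worth seeing in isolation. A self-contained sketch of the filtering (function and tool names are illustrative):

```python
# Tools still permitted in read-only mode, mirroring is_tool_available above
READONLY_ALLOWED = {"read_file", "search"}

def available_tools(all_tools: list[str], level: str,
                    unavailable: set[str]) -> list[str]:
    """Filter tool names by degradation level and per-tool outages."""
    if level == "offline":
        return []
    tools = [t for t in all_tools if t not in unavailable]
    if level == "readonly":
        tools = [t for t in tools if t in READONLY_ALLOWED]
    return tools

tools = ["read_file", "search", "write_file", "send_email"]
print(available_tools(tools, "full", set()))
# ['read_file', 'search', 'write_file', 'send_email']
print(available_tools(tools, "readonly", set()))
# ['read_file', 'search']
print(available_tools(tools, "limited", {"send_email"}))
# ['read_file', 'search', 'write_file']
print(available_tools(tools, "offline", set()))
# []
```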

Complete Error System

```python
from dataclasses import dataclass
from typing import Optional
import asyncio
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

@dataclass
class ErrorConfig:
    """Error handling configuration."""
    max_retries: int = 3
    base_delay: float = 1.0
    circuit_breaker_threshold: int = 5
    checkpoint_frequency: int = 1  # Every N steps
    enable_degradation: bool = True

class RobustAgent:
    """
    Production-ready agent with comprehensive error handling.
    """

    def __init__(
        self,
        tools: list[Tool],
        config: Optional[ErrorConfig] = None
    ):
        self.tools = {t.name: t for t in tools}
        self.config = config or ErrorConfig()

        # Error handling components
        self.circuit_breakers: dict[str, CircuitBreaker] = {}
        self.fallback_handler = FallbackHandler()
        self.degradation = GracefulDegradation()
        self.checkpoints = CheckpointManager()

        # Metrics
        self.error_counts: dict[str, int] = {}
        self.recovery_counts: dict[str, int] = {}

    async def execute_tool(
        self,
        tool_name: str,
        **kwargs
    ) -> tuple[bool, str]:
        """Execute a tool with full error handling."""

        # Check degradation
        if not self.degradation.is_tool_available(tool_name):
            return False, f"Tool {tool_name} is currently unavailable"

        # Check circuit breaker
        if tool_name not in self.circuit_breakers:
            self.circuit_breakers[tool_name] = CircuitBreaker()

        breaker = self.circuit_breakers[tool_name]
        if not breaker.can_proceed():
            return False, f"Tool {tool_name} is circuit-broken"

        # Execute with retry
        tool = self.tools.get(tool_name)
        if not tool:
            return False, f"Unknown tool: {tool_name}"

        for attempt in range(self.config.max_retries):
            try:
                result = await asyncio.wait_for(
                    tool.execute(**kwargs),
                    timeout=30.0
                )
                breaker.record_success()
                return True, result

            except asyncio.TimeoutError:
                logger.warning(f"Tool {tool_name} timed out (attempt {attempt + 1})")
                self._record_error(tool_name, "timeout")

            except Exception as e:
                logger.warning(f"Tool {tool_name} error: {e} (attempt {attempt + 1})")
                self._record_error(tool_name, type(e).__name__)

                error = classify_error(e)
                if not error.recoverable:
                    breaker.record_failure()
                    return False, str(e)

            # Wait before retry
            await asyncio.sleep(self.config.base_delay * (2 ** attempt))

        # All retries failed
        breaker.record_failure()
        self.degradation.mark_tool_unavailable(tool_name)

        # Try fallback
        fallback_result = await self.fallback_handler.execute_with_fallback(
            tool, **kwargs
        )
        if not fallback_result.startswith("Error"):
            self._record_recovery(tool_name)
            return True, fallback_result

        return False, f"Tool {tool_name} failed after {self.config.max_retries} attempts"

    def _record_error(self, tool_name: str, error_type: str) -> None:
        """Record error for metrics."""
        key = f"{tool_name}:{error_type}"
        self.error_counts[key] = self.error_counts.get(key, 0) + 1

    def _record_recovery(self, tool_name: str) -> None:
        """Record successful recovery."""
        self.recovery_counts[tool_name] = self.recovery_counts.get(tool_name, 0) + 1

    def get_health_status(self) -> dict:
        """Get agent health status."""
        return {
            "degradation_level": self.degradation.current_level,
            "unavailable_tools": list(self.degradation.unavailable_tools),
            "circuit_breakers": {
                name: breaker.state
                for name, breaker in self.circuit_breakers.items()
            },
            "error_counts": self.error_counts,
            "recovery_counts": self.recovery_counts
        }
```

Summary

Robust error handling makes agents reliable. We covered:

  • Error types: Transient, tool, LLM, logic, and fatal errors
  • Handling strategies: Retry with backoff, circuit breakers, fallbacks
  • Recovery patterns: Automatic retry loops, state checkpointing
  • Graceful degradation: Maintaining partial functionality when systems fail
  • Complete system: Integrated error handling for production agents

In the next section, we'll cover testing your agent—ensuring it works correctly before deployment.