Chapter 7

Tool Use and Function Calling

Error Handling and Retries

Introduction

Tools fail. Networks are unreliable, APIs impose rate limits, and files go missing. Robust error handling transforms these failures from dead ends into opportunities for the agent to adapt and succeed.

Error Handling Philosophy: Don't hide errors from the LLM. Surface them clearly so the agent can reason about alternatives. A good agent treats failures as information, not catastrophes.
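
As a minimal sketch of this philosophy, a failed call can be converted into a plain-text observation instead of propagating the exception (the helper name and wording here are illustrative):

```python
def error_to_observation(tool_name: str, error: Exception) -> str:
    """Turn a tool failure into a plain-text observation for the LLM."""
    return (
        f"Tool '{tool_name}' failed: {type(error).__name__}: {error}. "
        "Consider retrying with different parameters or using another tool."
    )

# A missing file becomes information, not a crash:
try:
    open("/no/such/config.yaml")
except OSError as e:
    observation = error_to_observation("read_file", e)

print(observation)
```

The agent sees what failed and why, and can decide what to do next.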

Types of Tool Errors

Understanding error types helps design appropriate handling strategies:

| Error Type | Example | Retry? | Strategy |
|------------|---------|--------|----------|
| Transient | Network timeout, 503 | Yes | Exponential backoff |
| Rate limit | 429 Too Many Requests | Yes | Wait then retry |
| Validation | Invalid parameters | No | Fix params, try again |
| Not found | File doesn't exist | No | Search for alternative |
| Permission | Access denied | No | Request permission |
| Permanent | API deprecated | No | Use alternative tool |
🐍error_classification.py
from enum import Enum
from dataclasses import dataclass

class ErrorCategory(Enum):
    TRANSIENT = "transient"        # Retry with backoff
    RATE_LIMITED = "rate_limited"  # Wait and retry
    VALIDATION = "validation"      # Fix input and retry
    NOT_FOUND = "not_found"        # Try alternative
    PERMISSION = "permission"      # Request permission
    PERMANENT = "permanent"        # Cannot recover

@dataclass
class CategorizedError:
    """Error with category and handling hints."""
    category: ErrorCategory
    message: str
    retry_after: float | None = None
    suggested_fix: str | None = None

class ErrorClassifier:
    """Classify errors for appropriate handling."""

    def classify(self, error: Exception, context: dict | None = None) -> CategorizedError:
        """Classify an error into a category."""

        error_str = str(error).lower()

        # Rate limiting
        if "429" in error_str or "rate limit" in error_str:
            retry_after = self._extract_retry_after(error)
            return CategorizedError(
                category=ErrorCategory.RATE_LIMITED,
                message=str(error),
                retry_after=retry_after,
                suggested_fix="Wait and retry"
            )

        # Transient network errors
        if any(x in error_str for x in ["timeout", "503", "connection", "temporary"]):
            return CategorizedError(
                category=ErrorCategory.TRANSIENT,
                message=str(error),
                suggested_fix="Retry with exponential backoff"
            )

        # Validation errors
        if any(x in error_str for x in ["invalid", "validation", "required", "missing"]):
            return CategorizedError(
                category=ErrorCategory.VALIDATION,
                message=str(error),
                suggested_fix="Check and fix the parameters"
            )

        # Not found
        if any(x in error_str for x in ["not found", "404", "doesn't exist", "no such file"]):
            return CategorizedError(
                category=ErrorCategory.NOT_FOUND,
                message=str(error),
                suggested_fix="Search for the correct resource"
            )

        # Permission errors
        if any(x in error_str for x in ["permission", "forbidden", "403", "unauthorized"]):
            return CategorizedError(
                category=ErrorCategory.PERMISSION,
                message=str(error),
                suggested_fix="Request appropriate permissions"
            )

        # Default to permanent
        return CategorizedError(
            category=ErrorCategory.PERMANENT,
            message=str(error),
            suggested_fix="Consider alternative approaches"
        )

    def _extract_retry_after(self, error: Exception) -> float | None:
        """Extract retry-after from rate limit error if available."""
        # Implementation depends on the error format
        return 60.0  # Default to 60 seconds

Retry Strategies

Different errors require different retry approaches:

🐍retry_strategies.py
import asyncio
import random
from functools import wraps
from typing import Callable, TypeVar

import aiohttp  # third-party; used by the example at the bottom

T = TypeVar('T')

class RetryConfig:
    """Configuration for retry behavior."""

    def __init__(
        self,
        max_attempts: int = 3,
        base_delay: float = 1.0,
        max_delay: float = 60.0,
        exponential_base: float = 2.0,
        jitter: bool = True
    ):
        self.max_attempts = max_attempts
        self.base_delay = base_delay
        self.max_delay = max_delay
        self.exponential_base = exponential_base
        self.jitter = jitter

    def get_delay(self, attempt: int) -> float:
        """Calculate delay for attempt number."""
        delay = self.base_delay * (self.exponential_base ** attempt)
        delay = min(delay, self.max_delay)

        if self.jitter:
            # Add ±25% jitter to prevent thundering herd
            jitter_range = delay * 0.25
            delay += random.uniform(-jitter_range, jitter_range)

        return max(0, delay)


async def retry_with_backoff(
    func: Callable[..., T],
    *args,
    config: RetryConfig | None = None,
    retryable_exceptions: tuple = (Exception,),
    **kwargs
) -> T:
    """Execute function with exponential backoff retry."""

    config = config or RetryConfig()
    last_exception = None

    for attempt in range(config.max_attempts):
        try:
            if asyncio.iscoroutinefunction(func):
                return await func(*args, **kwargs)
            else:
                return func(*args, **kwargs)

        except retryable_exceptions as e:
            last_exception = e

            if attempt < config.max_attempts - 1:
                delay = config.get_delay(attempt)
                await asyncio.sleep(delay)

    raise last_exception


def with_retry(
    config: RetryConfig | None = None,
    retryable: tuple = (Exception,)
):
    """Decorator for retry with backoff."""

    def decorator(func: Callable) -> Callable:
        @wraps(func)
        async def wrapper(*args, **kwargs):
            return await retry_with_backoff(
                func, *args,
                config=config,
                retryable_exceptions=retryable,
                **kwargs
            )
        return wrapper
    return decorator


# Usage
@with_retry(
    config=RetryConfig(max_attempts=3, base_delay=2.0),
    retryable=(ConnectionError, TimeoutError)
)
async def fetch_data(url: str) -> dict:
    """Fetch data with automatic retry."""
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            return await response.json()
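
To make the schedule concrete, the defaults above (`base_delay=1.0`, `exponential_base=2.0`, `max_delay=60.0`) can be computed by hand with jitter disabled:

```python
# The schedule RetryConfig.get_delay produces with jitter off:
# base_delay * exponential_base**attempt, capped at max_delay.
base_delay, exponential_base, max_delay = 1.0, 2.0, 60.0
delays = [min(base_delay * exponential_base**attempt, max_delay)
          for attempt in range(7)]
print(delays)  # [1.0, 2.0, 4.0, 8.0, 16.0, 32.0, 60.0]
```

Note the cap: attempt 6 would be 64 seconds but is clipped to `max_delay`. Jitter then spreads each delay by ±25% so that many clients do not retry in lockstep.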

Smart Retry Based on Error Type

🐍smart_retry.py
import asyncio
from typing import Any, Callable

# ErrorClassifier and ErrorCategory are defined in error_classification.py above.

class SmartRetryExecutor:
    """Execute with intelligent retry based on error type."""

    def __init__(self, classifier: ErrorClassifier):
        self.classifier = classifier

    async def execute_with_retry(
        self,
        func: Callable,
        *args,
        max_attempts: int = 3,
        **kwargs
    ) -> Any:
        """Execute with smart retry logic."""

        for attempt in range(max_attempts):
            try:
                return await func(*args, **kwargs)

            except Exception as e:
                categorized = self.classifier.classify(e)

                # Never retry permanent, permission, or validation errors
                if categorized.category in [
                    ErrorCategory.PERMANENT,
                    ErrorCategory.PERMISSION,
                    ErrorCategory.VALIDATION
                ]:
                    raise

                # Check if we've exhausted retries
                if attempt >= max_attempts - 1:
                    raise

                # Calculate delay based on error type
                if categorized.category == ErrorCategory.RATE_LIMITED:
                    delay = categorized.retry_after or 60.0
                elif categorized.category == ErrorCategory.TRANSIENT:
                    delay = 2 ** attempt  # Exponential backoff
                else:
                    delay = 1.0

                await asyncio.sleep(delay)

        raise RuntimeError("Max retries exceeded")

Circuit Breaker Pattern

Prevent cascading failures by stopping calls to failing services:

🐍circuit_breaker.py
from enum import Enum
from dataclasses import dataclass
from datetime import datetime

# ToolExecutor, ExecutionResult, and ExecutionStatus are defined earlier
# in this chapter.

class CircuitState(Enum):
    CLOSED = "closed"        # Normal operation
    OPEN = "open"            # Failing, reject calls
    HALF_OPEN = "half_open"  # Testing if recovered

@dataclass
class CircuitBreaker:
    """Circuit breaker for failing tools."""

    failure_threshold: int = 5
    recovery_timeout: float = 60.0
    half_open_max_calls: int = 3

    # State
    state: CircuitState = CircuitState.CLOSED
    failure_count: int = 0
    last_failure_time: datetime | None = None
    half_open_successes: int = 0

    def can_execute(self) -> bool:
        """Check if execution is allowed."""

        if self.state == CircuitState.CLOSED:
            return True

        if self.state == CircuitState.OPEN:
            # Check if recovery timeout has passed
            if self._recovery_timeout_elapsed():
                self.state = CircuitState.HALF_OPEN
                self.half_open_successes = 0
                return True
            return False

        if self.state == CircuitState.HALF_OPEN:
            # Allow trial calls while testing recovery
            return True

        return False

    def record_success(self):
        """Record a successful call."""

        if self.state == CircuitState.HALF_OPEN:
            self.half_open_successes += 1
            if self.half_open_successes >= self.half_open_max_calls:
                # Recovered - close the circuit
                self.state = CircuitState.CLOSED
                self.failure_count = 0

        elif self.state == CircuitState.CLOSED:
            # Reset failure count on success
            self.failure_count = 0

    def record_failure(self):
        """Record a failed call."""

        self.failure_count += 1
        self.last_failure_time = datetime.now()

        if self.state == CircuitState.HALF_OPEN:
            # Failure in half-open means still broken
            self.state = CircuitState.OPEN

        elif self.state == CircuitState.CLOSED:
            if self.failure_count >= self.failure_threshold:
                # Too many failures - open the circuit
                self.state = CircuitState.OPEN

    def _recovery_timeout_elapsed(self) -> bool:
        """Check if recovery timeout has passed."""
        if not self.last_failure_time:
            return True

        elapsed = datetime.now() - self.last_failure_time
        return elapsed.total_seconds() >= self.recovery_timeout


class CircuitBreakerRegistry:
    """Manage circuit breakers for multiple tools."""

    def __init__(self):
        self._breakers: dict[str, CircuitBreaker] = {}

    def get_or_create(
        self,
        tool_name: str,
        **kwargs
    ) -> CircuitBreaker:
        """Get or create a circuit breaker for a tool."""

        if tool_name not in self._breakers:
            self._breakers[tool_name] = CircuitBreaker(**kwargs)

        return self._breakers[tool_name]


class CircuitBreakerExecutor:
    """Execute tools with circuit breaker protection."""

    def __init__(self, executor: ToolExecutor):
        self.executor = executor
        self.registry = CircuitBreakerRegistry()

    async def execute(
        self,
        tool_name: str,
        arguments: dict
    ) -> ExecutionResult:
        """Execute with circuit breaker."""

        breaker = self.registry.get_or_create(tool_name)

        if not breaker.can_execute():
            return ExecutionResult(
                status=ExecutionStatus.ERROR,
                error=f"Circuit breaker open for {tool_name}. "
                      f"Tool is temporarily unavailable due to failures.",
                tool_name=tool_name
            )

        result = await self.executor.execute(tool_name, arguments)

        if result.status == ExecutionStatus.SUCCESS:
            breaker.record_success()
        else:
            breaker.record_failure()

        return result

Fallback Strategies

When tools fail, having fallbacks maintains agent functionality:

🐍fallback_strategies.py
from dataclasses import dataclass

# ToolExecutor, ExecutionResult, and ExecutionStatus are defined earlier
# in this chapter.

@dataclass
class ToolWithFallback:
    """Tool with fallback options."""
    primary: str
    fallbacks: list[str]
    description: str

class FallbackExecutor:
    """Execute tools with automatic fallback."""

    def __init__(self, executor: ToolExecutor):
        self.executor = executor
        self.fallback_chains: dict[str, list[str]] = {}

    def register_fallback_chain(
        self,
        primary: str,
        fallbacks: list[str]
    ):
        """Register fallback tools for a primary tool."""
        self.fallback_chains[primary] = fallbacks

    async def execute_with_fallback(
        self,
        tool_name: str,
        arguments: dict,
        context: dict | None = None
    ) -> ExecutionResult:
        """Execute tool with automatic fallback on failure."""

        # Try primary tool
        result = await self.executor.execute(tool_name, arguments, context)

        if result.status == ExecutionStatus.SUCCESS:
            return result

        # Try fallbacks
        fallbacks = self.fallback_chains.get(tool_name, [])

        for fallback_name in fallbacks:
            # Adapt arguments if needed
            adapted_args = self._adapt_arguments(
                tool_name, fallback_name, arguments
            )

            if adapted_args is None:
                continue

            fallback_result = await self.executor.execute(
                fallback_name, adapted_args, context
            )

            if fallback_result.status == ExecutionStatus.SUCCESS:
                # Mark that we used a fallback
                fallback_result.tool_name = f"{tool_name} (via {fallback_name})"
                return fallback_result

        # All fallbacks failed - return the primary tool's error
        return result

    def _adapt_arguments(
        self,
        from_tool: str,
        to_tool: str,
        arguments: dict
    ) -> dict | None:
        """Adapt arguments from one tool to another."""
        # Implementation depends on tool schemas
        # Return None if adaptation not possible
        return arguments


# Example: Search fallback chain
executor = FallbackExecutor(tool_executor)

executor.register_fallback_chain(
    primary="search_google",
    fallbacks=["search_bing", "search_duckduckgo"]
)

# If Google search fails, automatically try Bing, then DuckDuckGo
result = await executor.execute_with_fallback(
    "search_google",
    {"query": "Python tutorials"}
)

Cached Fallback

🐍cached_fallback.py
import hashlib
import json
from datetime import datetime, timedelta
from typing import Any

class CachedFallbackExecutor:
    """Use cached results as fallback when tool fails."""

    def __init__(self, executor: ToolExecutor, cache_ttl: float = 3600):
        self.executor = executor
        self.cache: dict[str, tuple[Any, datetime]] = {}
        self.cache_ttl = timedelta(seconds=cache_ttl)

    def _cache_key(self, tool: str, args: dict) -> str:
        """Generate cache key."""
        arg_str = json.dumps(args, sort_keys=True)
        return f"{tool}:{hashlib.md5(arg_str.encode()).hexdigest()}"

    async def execute(
        self,
        tool_name: str,
        arguments: dict
    ) -> ExecutionResult:
        """Execute with cache as fallback."""

        cache_key = self._cache_key(tool_name, arguments)

        # Try to execute
        result = await self.executor.execute(tool_name, arguments)

        if result.status == ExecutionStatus.SUCCESS:
            # Cache successful result
            self.cache[cache_key] = (result.result, datetime.now())
            return result

        # Execution failed - try cache
        if cache_key in self.cache:
            cached_result, cached_time = self.cache[cache_key]

            if datetime.now() - cached_time < self.cache_ttl:
                return ExecutionResult(
                    status=ExecutionStatus.SUCCESS,
                    result=cached_result,
                    tool_name=f"{tool_name} (cached)",
                    # Include warning about cached data
                    error=f"Using cached result from {cached_time.isoformat()}"
                )

        return result
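
A standalone check of the cache-key scheme: `sort_keys=True` is what guarantees that the same tool and arguments map to the same key regardless of dict ordering (tool and argument names below are illustrative):

```python
import hashlib
import json

def cache_key(tool: str, args: dict) -> str:
    """Same scheme as CachedFallbackExecutor._cache_key above."""
    arg_str = json.dumps(args, sort_keys=True)
    return f"{tool}:{hashlib.md5(arg_str.encode()).hexdigest()}"

# Argument order does not affect the key:
assert cache_key("search", {"q": "x", "n": 5}) == cache_key("search", {"n": 5, "q": "x"})
# Different tools or arguments produce different keys:
assert cache_key("search", {"q": "x"}) != cache_key("fetch", {"q": "x"})
```

Without key sorting, two semantically identical calls could miss each other's cached results.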

LLM Self-Correction

The most powerful error-handling technique is letting the LLM learn from its own errors:

🐍llm_self_correction.py
import json

# ToolSystem, ExecutionResult, and ExecutionStatus are defined earlier
# in this chapter.

class SelfCorrectingAgent:
    """Agent that learns from tool errors."""

    def __init__(self, llm, tools: ToolSystem, max_corrections: int = 3):
        self.llm = llm
        self.tools = tools
        self.max_corrections = max_corrections

    async def run(self, task: str) -> str:
        """Run task with self-correction on errors."""

        messages = [{"role": "user", "content": task}]
        corrections = 0

        while corrections < self.max_corrections:
            # Get LLM response
            response = await self.llm.generate(
                messages,
                tools=self.tools.get_tools_for_api()
            )

            # Check for tool calls
            tool_calls = self._extract_tool_calls(response)

            if not tool_calls:
                # No tool calls, return response
                return self._extract_text(response)

            # Execute tools
            results = await self.tools.execute_calls(tool_calls)

            # Check for errors
            errors = [r for r in results.values()
                      if r.status != ExecutionStatus.SUCCESS]

            if errors:
                # Format errors so the LLM can reason about them
                error_message = self._format_errors_for_llm(errors)

                messages.extend([
                    {"role": "assistant", "content": response.content},
                    {"role": "user", "content": error_message}
                ])

                corrections += 1
            else:
                # Success - feed results back
                messages.extend([
                    {"role": "assistant", "content": response.content},
                    {"role": "user", "content": self._format_success(results)}
                ])

        return "Max corrections exceeded"

    def _format_errors_for_llm(self, errors: list[ExecutionResult]) -> str:
        """Format errors to help the LLM self-correct."""

        parts = ["The following tool calls failed:\n"]

        for error in errors:
            parts.append(f"""
Tool: {error.tool_name}
Error: {error.error}
Parameters used: {json.dumps(error.parameters, indent=2)}

Please analyze what went wrong and try an alternative approach.
Consider:
- Were the parameters correct?
- Is there a different tool that could work?
- Is the requested resource available?
""")

        return "\n".join(parts)

    def _format_success(self, results: dict[str, ExecutionResult]) -> list[dict]:
        """Format successful results as tool_result content blocks."""
        formatted = []
        for call_id, result in results.items():
            formatted.append({
                "type": "tool_result",
                "tool_use_id": call_id,
                "content": result.to_observation()
            })
        return formatted

Errors as Observations

Frame errors as observations, not failures. "The file was not found" gives the LLM information to work with. The agent might then search for the file or ask the user for the correct path.
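
In message terms, such an observation might travel back to the model as a tool result rather than an exception. A hedged sketch using an Anthropic-style `tool_result` block (the id, filenames, and directory are placeholders; adjust the shape for your provider):

```python
# The error text carries actionable information the agent can use
# to recover: what was missing, where it looked, and what is nearby.
tool_result = {
    "type": "tool_result",
    "tool_use_id": "toolu_01",  # placeholder id echoed from the tool call
    "is_error": True,
    "content": (
        "File 'report.pdf' not found in /data. "
        "Similar files present: report_v2.pdf, report_final.pdf."
    ),
}
print(tool_result["content"])
```

Given that content, the model can plausibly pick `report_v2.pdf` or ask the user which file was meant, instead of giving up.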

Summary

Error handling and retries:

  1. Classify errors: Different types need different handling
  2. Exponential backoff: For transient errors, wait and retry
  3. Circuit breaker: Prevent cascading failures
  4. Fallback chains: Alternative tools when primary fails
  5. Cache fallback: Use cached results when live fails
  6. LLM self-correction: Let the agent learn from errors
Next: Let's explore advanced tool patterns for complex agent scenarios.