Introduction
Tools fail. Networks are unreliable, APIs have rate limits, files don't exist. Robust error handling transforms these failures from dead ends into opportunities for the agent to adapt and succeed.
Error Handling Philosophy: Don't hide errors from the LLM. Surface them clearly so the agent can reason about alternatives. A good agent treats failures as information, not catastrophes.
Types of Tool Errors
Understanding error types helps design appropriate handling strategies:
| Error Type | Example | Retry? | Strategy |
|---|---|---|---|
| Transient | Network timeout, 503 | Yes | Exponential backoff |
| Rate limit | 429 Too Many Requests | Yes | Wait then retry |
| Validation | Invalid parameters | No | Fix params, try again |
| Not found | File doesn't exist | No | Search for alternative |
| Permission | Access denied | No | Request permission |
| Permanent | API deprecated | No | Use alternative tool |
🐍error_classification.py
1from enum import Enum
2from dataclasses import dataclass
3
4class ErrorCategory(Enum):
5 TRANSIENT = "transient" # Retry with backoff
6 RATE_LIMITED = "rate_limited" # Wait and retry
7 VALIDATION = "validation" # Fix input and retry
8 NOT_FOUND = "not_found" # Try alternative
9 PERMISSION = "permission" # Request permission
10 PERMANENT = "permanent" # Cannot recover
11
12@dataclass
13class CategorizedError:
14 """Error with category and handling hints."""
15 category: ErrorCategory
16 message: str
17 retry_after: float | None = None
18 suggested_fix: str | None = None
19
20class ErrorClassifier:
21 """Classify errors for appropriate handling."""
22
23 def classify(self, error: Exception, context: dict = None) -> CategorizedError:
24 """Classify an error into a category."""
25
26 error_str = str(error).lower()
27 error_type = type(error).__name__
28
29 # Rate limiting
30 if "429" in error_str or "rate limit" in error_str:
31 retry_after = self._extract_retry_after(error)
32 return CategorizedError(
33 category=ErrorCategory.RATE_LIMITED,
34 message=str(error),
35 retry_after=retry_after,
36 suggested_fix="Wait and retry"
37 )
38
39 # Transient network errors
40 if any(x in error_str for x in ["timeout", "503", "connection", "temporary"]):
41 return CategorizedError(
42 category=ErrorCategory.TRANSIENT,
43 message=str(error),
44 suggested_fix="Retry with exponential backoff"
45 )
46
47 # Validation errors
48 if any(x in error_str for x in ["invalid", "validation", "required", "missing"]):
49 return CategorizedError(
50 category=ErrorCategory.VALIDATION,
51 message=str(error),
52 suggested_fix="Check and fix the parameters"
53 )
54
55 # Not found
56 if any(x in error_str for x in ["not found", "404", "doesn't exist", "no such file"]):
57 return CategorizedError(
58 category=ErrorCategory.NOT_FOUND,
59 message=str(error),
60 suggested_fix="Search for the correct resource"
61 )
62
63 # Permission errors
64 if any(x in error_str for x in ["permission", "forbidden", "403", "unauthorized"]):
65 return CategorizedError(
66 category=ErrorCategory.PERMISSION,
67 message=str(error),
68 suggested_fix="Request appropriate permissions"
69 )
70
71 # Default to permanent
72 return CategorizedError(
73 category=ErrorCategory.PERMANENT,
74 message=str(error),
75 suggested_fix="Consider alternative approaches"
76 )
77
78 def _extract_retry_after(self, error: Exception) -> float | None:
79 """Extract retry-after from rate limit error if available."""
80 # Implementation depends on error format
        return 60.0  # Default to 60 seconds

Retry Strategies
Different errors require different retry approaches:
🐍retry_strategies.py
1import asyncio
2import random
3from functools import wraps
4from typing import Callable, TypeVar, Any
5
6T = TypeVar('T')
7
8class RetryConfig:
9 """Configuration for retry behavior."""
10
11 def __init__(
12 self,
13 max_attempts: int = 3,
14 base_delay: float = 1.0,
15 max_delay: float = 60.0,
16 exponential_base: float = 2.0,
17 jitter: bool = True
18 ):
19 self.max_attempts = max_attempts
20 self.base_delay = base_delay
21 self.max_delay = max_delay
22 self.exponential_base = exponential_base
23 self.jitter = jitter
24
25 def get_delay(self, attempt: int) -> float:
26 """Calculate delay for attempt number."""
27 delay = self.base_delay * (self.exponential_base ** attempt)
28 delay = min(delay, self.max_delay)
29
30 if self.jitter:
31 # Add ±25% jitter to prevent thundering herd
32 jitter_range = delay * 0.25
33 delay += random.uniform(-jitter_range, jitter_range)
34
35 return max(0, delay)
36
37
38async def retry_with_backoff(
39 func: Callable[..., T],
40 *args,
41 config: RetryConfig | None = None,
42 retryable_exceptions: tuple = (Exception,),
43 **kwargs
44) -> T:
45 """Execute function with exponential backoff retry."""
46
47 config = config or RetryConfig()
48 last_exception = None
49
50 for attempt in range(config.max_attempts):
51 try:
52 if asyncio.iscoroutinefunction(func):
53 return await func(*args, **kwargs)
54 else:
55 return func(*args, **kwargs)
56
57 except retryable_exceptions as e:
58 last_exception = e
59
60 if attempt < config.max_attempts - 1:
61 delay = config.get_delay(attempt)
62 await asyncio.sleep(delay)
63
64 raise last_exception
65
66
67def with_retry(
68 config: RetryConfig | None = None,
69 retryable: tuple = (Exception,)
70):
71 """Decorator for retry with backoff."""
72
73 def decorator(func: Callable) -> Callable:
74 @wraps(func)
75 async def wrapper(*args, **kwargs):
76 return await retry_with_backoff(
77 func, *args,
78 config=config,
79 retryable_exceptions=retryable,
80 **kwargs
81 )
82 return wrapper
83 return decorator
84
85
86# Usage
87@with_retry(
88 config=RetryConfig(max_attempts=3, base_delay=2.0),
89 retryable=(ConnectionError, TimeoutError)
90)
91async def fetch_data(url: str) -> dict:
92 """Fetch data with automatic retry."""
93 async with aiohttp.ClientSession() as session:
94 async with session.get(url) as response:
            return await response.json()

Smart Retry Based on Error Type
🐍smart_retry.py
class SmartRetryExecutor:
    """Execute with intelligent retry based on error type."""

    def __init__(self, classifier: ErrorClassifier):
        # The classifier decides which failures are worth retrying.
        self.classifier = classifier

    async def execute_with_retry(
        self,
        func: Callable,
        *args,
        max_attempts: int = 3,
        **kwargs
    ) -> Any:
        """Run ``func``, retrying only error categories that can recover."""
        # Categories for which retrying can never help.
        non_retryable = (
            ErrorCategory.PERMANENT,
            ErrorCategory.PERMISSION,
            ErrorCategory.VALIDATION,
        )

        for attempt in range(max_attempts):
            try:
                return await func(*args, **kwargs)

            except Exception as exc:
                categorized = self.classifier.classify(exc)

                # Permanent/permission/validation errors are never retried.
                if categorized.category in non_retryable:
                    raise

                # Last attempt just failed: surface the error.
                if attempt >= max_attempts - 1:
                    raise

                # Pick a delay appropriate to the error category.
                if categorized.category == ErrorCategory.RATE_LIMITED:
                    delay = categorized.retry_after or 60.0
                elif categorized.category == ErrorCategory.TRANSIENT:
                    delay = 2 ** attempt  # Exponential backoff
                else:
                    delay = 1.0

                await asyncio.sleep(delay)
        raise RuntimeError("Max retries exceeded")

Circuit Breaker Pattern
Prevent cascading failures by stopping calls to failing services:
🐍circuit_breaker.py
1from enum import Enum
2from dataclasses import dataclass, field
3from datetime import datetime, timedelta
4import asyncio
5
6class CircuitState(Enum):
7 CLOSED = "closed" # Normal operation
8 OPEN = "open" # Failing, reject calls
9 HALF_OPEN = "half_open" # Testing if recovered
10
11@dataclass
12class CircuitBreaker:
13 """Circuit breaker for failing tools."""
14
15 failure_threshold: int = 5
16 recovery_timeout: float = 60.0
17 half_open_max_calls: int = 3
18
19 # State
20 state: CircuitState = CircuitState.CLOSED
21 failure_count: int = 0
22 last_failure_time: datetime | None = None
23 half_open_successes: int = 0
24
25 def can_execute(self) -> bool:
26 """Check if execution is allowed."""
27
28 if self.state == CircuitState.CLOSED:
29 return True
30
31 if self.state == CircuitState.OPEN:
32 # Check if recovery timeout has passed
33 if self._recovery_timeout_elapsed():
34 self.state = CircuitState.HALF_OPEN
35 self.half_open_successes = 0
36 return True
37 return False
38
39 if self.state == CircuitState.HALF_OPEN:
40 # Allow limited calls in half-open state
41 return True
42
43 return False
44
45 def record_success(self):
46 """Record a successful call."""
47
48 if self.state == CircuitState.HALF_OPEN:
49 self.half_open_successes += 1
50 if self.half_open_successes >= self.half_open_max_calls:
51 # Recovered - close the circuit
52 self.state = CircuitState.CLOSED
53 self.failure_count = 0
54
55 elif self.state == CircuitState.CLOSED:
56 # Reset failure count on success
57 self.failure_count = 0
58
59 def record_failure(self):
60 """Record a failed call."""
61
62 self.failure_count += 1
63 self.last_failure_time = datetime.now()
64
65 if self.state == CircuitState.HALF_OPEN:
66 # Failure in half-open means still broken
67 self.state = CircuitState.OPEN
68
69 elif self.state == CircuitState.CLOSED:
70 if self.failure_count >= self.failure_threshold:
71 # Too many failures - open the circuit
72 self.state = CircuitState.OPEN
73
74 def _recovery_timeout_elapsed(self) -> bool:
75 """Check if recovery timeout has passed."""
76 if not self.last_failure_time:
77 return True
78
79 elapsed = datetime.now() - self.last_failure_time
80 return elapsed.total_seconds() >= self.recovery_timeout
81
82
class CircuitBreakerRegistry:
    """Manage circuit breakers for multiple tools."""

    def __init__(self):
        # One breaker per tool name, created lazily on first request.
        self._breakers: dict[str, CircuitBreaker] = {}

    def get_or_create(
        self,
        tool_name: str,
        **kwargs
    ) -> CircuitBreaker:
        """Return the breaker for ``tool_name``, creating it on first use."""
        breaker = self._breakers.get(tool_name)
        if breaker is None:
            breaker = CircuitBreaker(**kwargs)
            self._breakers[tool_name] = breaker
        return breaker
class CircuitBreakerExecutor:
    """Execute tools with circuit breaker protection."""

    def __init__(self, executor: ToolExecutor):
        self.executor = executor
        self.registry = CircuitBreakerRegistry()

    async def execute(
        self,
        tool_name: str,
        arguments: dict
    ) -> ExecutionResult:
        """Execute ``tool_name``, short-circuiting while its breaker is open."""
        breaker = self.registry.get_or_create(tool_name)

        # Reject immediately while the breaker refuses calls.
        if not breaker.can_execute():
            return ExecutionResult(
                status=ExecutionStatus.ERROR,
                error=f"Circuit breaker open for {tool_name}. "
                      f"Tool is temporarily unavailable due to failures.",
                tool_name=tool_name
            )

        result = await self.executor.execute(tool_name, arguments)

        # Feed the outcome back so the breaker can open or close.
        succeeded = result.status == ExecutionStatus.SUCCESS
        if succeeded:
            breaker.record_success()
        else:
            breaker.record_failure()

        return result
        return result

Fallback Strategies
When tools fail, having fallbacks maintains agent functionality:
🐍fallback_strategies.py
from dataclasses import dataclass
from typing import Callable, Any


@dataclass
class ToolWithFallback:
    """Tool with fallback options."""

    primary: str
    fallbacks: list[str]
    description: str


class FallbackExecutor:
    """Execute tools with automatic fallback."""

    def __init__(self, executor: ToolExecutor):
        self.executor = executor
        # Maps a primary tool name to an ordered list of substitutes.
        self.fallback_chains: dict[str, list[str]] = {}

    def register_fallback_chain(
        self,
        primary: str,
        fallbacks: list[str]
    ):
        """Register fallback tools for a primary tool."""
        self.fallback_chains[primary] = fallbacks

    async def execute_with_fallback(
        self,
        tool_name: str,
        arguments: dict,
        context: dict | None = None
    ) -> ExecutionResult:
        """Run ``tool_name``; on failure, walk its fallback chain in order."""
        primary_result = await self.executor.execute(tool_name, arguments, context)
        if primary_result.status == ExecutionStatus.SUCCESS:
            return primary_result

        for fallback_name in self.fallback_chains.get(tool_name, []):
            # Translate arguments into the fallback tool's schema.
            adapted_args = self._adapt_arguments(tool_name, fallback_name, arguments)
            if adapted_args is None:
                # No sensible translation exists -- skip this fallback.
                continue

            fallback_result = await self.executor.execute(
                fallback_name, adapted_args, context
            )
            if fallback_result.status == ExecutionStatus.SUCCESS:
                # Mark that we used a fallback
                fallback_result.tool_name = f"{tool_name} (via {fallback_name})"
                return fallback_result

        # Every fallback failed -- report the primary tool's failure.
        return primary_result

    def _adapt_arguments(
        self,
        from_tool: str,
        to_tool: str,
        arguments: dict
    ) -> dict | None:
        """Adapt arguments from one tool to another.

        Returns None when adaptation is not possible; the default
        implementation passes arguments through unchanged.
        """
        return arguments
76# Example: Search fallback chain
77executor = FallbackExecutor(tool_executor)
78
79executor.register_fallback_chain(
80 primary="search_google",
81 fallbacks=["search_bing", "search_duckduckgo"]
82)
83
84# If Google search fails, automatically try Bing, then DuckDuckGo
85result = await executor.execute_with_fallback(
86 "search_google",
87 {"query": "Python tutorials"}
)

Cached Fallback
🐍cached_fallback.py
from datetime import datetime, timedelta
from typing import Any


class CachedFallbackExecutor:
    """Use cached results as fallback when tool fails."""

    def __init__(self, executor: ToolExecutor, cache_ttl: float = 3600):
        self.executor = executor
        # cache key -> (result payload, time it was cached)
        self.cache: dict[str, tuple[Any, datetime]] = {}
        self.cache_ttl = timedelta(seconds=cache_ttl)

    def _cache_key(self, tool: str, args: dict) -> str:
        """Generate a deterministic cache key from tool name and arguments."""
        import hashlib
        import json
        # sort_keys makes the key independent of dict insertion order.
        # md5 is acceptable here: the hash is a cache key, not a security boundary.
        arg_str = json.dumps(args, sort_keys=True)
        return f"{tool}:{hashlib.md5(arg_str.encode()).hexdigest()}"

    async def execute(
        self,
        tool_name: str,
        arguments: dict
    ) -> ExecutionResult:
        """Execute with cache as fallback.

        Successful results are cached; on failure, a fresh-enough cached
        result is returned instead, flagged so callers know it is stale.
        """
        cache_key = self._cache_key(tool_name, arguments)

        # Try to execute
        result = await self.executor.execute(tool_name, arguments)

        if result.status == ExecutionStatus.SUCCESS:
            # Cache successful result
            self.cache[cache_key] = (result.result, datetime.now())
            return result

        # Execution failed - try cache
        if cache_key in self.cache:
            cached_result, cached_time = self.cache[cache_key]

            if datetime.now() - cached_time < self.cache_ttl:
                return ExecutionResult(
                    status=ExecutionStatus.SUCCESS,
                    result=cached_result,
                    tool_name=f"{tool_name} (cached)",
                    # Include warning about cached data
                    error=f"Using cached result from {cached_time.isoformat()}"
                )

            # FIX: evict stale entries so the cache does not grow without bound.
            del self.cache[cache_key]
        return result

LLM Self-Correction
The most powerful error handling: let the LLM learn from errors:
🐍llm_self_correction.py
class SelfCorrectingAgent:
    """Agent that learns from tool errors."""

    def __init__(self, llm, tools: ToolSystem, max_corrections: int = 3):
        self.llm = llm
        self.tools = tools
        # Upper bound on failed tool rounds before giving up.
        self.max_corrections = max_corrections

    async def run(self, task: str) -> str:
        """Run task with self-correction on errors.

        Loop: ask the LLM, execute any tool calls, and feed results --
        including failures -- back into the conversation so the model can
        adjust. Stops when the LLM answers without tool calls, or after
        ``max_corrections`` failed rounds.
        """
        messages = [{"role": "user", "content": task}]
        corrections = 0

        while corrections < self.max_corrections:
            # Get LLM response
            response = await self.llm.generate(
                messages,
                tools=self.tools.get_tools_for_api()
            )

            # Check for tool calls
            tool_calls = self._extract_tool_calls(response)

            if not tool_calls:
                # No tool calls, return response
                return self._extract_text(response)

            # Execute tools
            results = await self.tools.execute_calls(tool_calls)

            # Check for errors
            errors = [r for r in results.values()
                      if r.status != ExecutionStatus.SUCCESS]

            if errors:
                # Surface the failures so the LLM can reason about them.
                # Only failed rounds count against the correction budget.
                error_message = self._format_errors_for_llm(errors)
                messages.extend([
                    {"role": "assistant", "content": response.content},
                    {"role": "user", "content": error_message}
                ])
                corrections += 1
            else:
                # Success - feed results back
                messages.extend([
                    {"role": "assistant", "content": response.content},
                    {"role": "user", "content": self._format_success(results)}
                ])

        return "Max corrections exceeded"

    def _format_errors_for_llm(self, errors: list[ExecutionResult]) -> str:
        """Format errors to help LLM self-correct."""
        import json  # FIX: json was used here without being imported anywhere

        parts = ["The following tool calls failed:\n"]

        for error in errors:
            parts.append(f"""
Tool: {error.tool_name}
Error: {error.error}
Parameters used: {json.dumps(error.parameters, indent=2)}

Please analyze what went wrong and try an alternative approach.
Consider:
- Were the parameters correct?
- Is there a different tool that could work?
- Is the requested resource available?
""")

        return "\n".join(parts)

    def _format_success(self, results: dict[str, ExecutionResult]) -> list[dict]:
        """Format successful results as tool_result content blocks.

        FIX: annotated ``-> str`` before, but this builds and returns a
        list of structured content blocks, matching message formats that
        accept a list as message content.
        """
        formatted = []
        for call_id, result in results.items():
            formatted.append({
                "type": "tool_result",
                "tool_use_id": call_id,
                "content": result.to_observation()
            })
        return formatted

Errors as Observations
Frame errors as observations, not failures. "The file was not found" gives the LLM information to work with. The agent might then search for the file or ask the user for the correct path.
Summary
Error handling and retries:
- Classify errors: Different types need different handling
- Exponential backoff: For transient errors, wait and retry
- Circuit breaker: Prevent cascading failures
- Fallback chains: Alternative tools when primary fails
- Cache fallback: Use cached results when live fails
- LLM self-correction: Let the agent learn from errors
Next: Let's explore advanced tool patterns for complex agent scenarios.