Introduction
Production agent systems must handle failures gracefully. LLM APIs experience outages, external tools fail, network connections drop, and unexpected errors occur. Building resilient systems that recover automatically from failures is essential for maintaining service quality.
Learning Objectives: By the end of this section, you will understand how to classify errors for appropriate handling, implement intelligent retry strategies with backoff, design circuit breakers to prevent cascade failures, and build fallback mechanisms for degraded operation.
The key to resilience is accepting that failures will happen and designing systems that handle them gracefully. Rather than trying to prevent all failures, resilient systems focus on detection, containment, and recovery.
Error Classification
Not all errors should be handled the same way. Transient errors may resolve with a retry, while permanent errors require different handling. Proper classification enables appropriate recovery strategies.
Error Taxonomy
```python
from collections import Counter
from dataclasses import dataclass, field
from enum import Enum, auto
from typing import Any, Optional, Type
import re
import time


class ErrorCategory(Enum):
    """Categories of errors for handling decisions."""
    TRANSIENT = auto()       # Temporary, may resolve with retry
    RATE_LIMITED = auto()    # API rate limit hit
    AUTHENTICATION = auto()  # Auth failure, needs refresh
    VALIDATION = auto()      # Invalid input, no retry
    NOT_FOUND = auto()       # Resource not found
    SERVER_ERROR = auto()    # Remote server error
    TIMEOUT = auto()         # Request timed out
    NETWORK = auto()         # Network connectivity issue
    UNKNOWN = auto()         # Unclassified error


class ErrorSeverity(Enum):
    """Severity levels for errors."""
    LOW = "low"            # Log and continue
    MEDIUM = "medium"      # Retry or fallback
    HIGH = "high"          # Immediate attention needed
    CRITICAL = "critical"  # System failure


@dataclass
class ClassifiedError:
    """An error with classification metadata."""
    original_error: Exception
    category: ErrorCategory
    severity: ErrorSeverity
    retryable: bool
    retry_after_seconds: Optional[float] = None
    message: str = ""
    context: dict[str, Any] = field(default_factory=dict)

    def __post_init__(self):
        if not self.message:
            self.message = str(self.original_error)


class ErrorClassifier:
    """Classifies errors for appropriate handling."""

    def __init__(self):
        self._patterns: list[tuple[type, ErrorCategory, ErrorSeverity, bool]] = []
        self._message_patterns: list[tuple[str, ErrorCategory, ErrorSeverity, bool]] = []
        self._setup_default_patterns()

    def _setup_default_patterns(self) -> None:
        """Set up default error classification patterns."""
        # Exception type patterns
        self.register_exception_pattern(
            TimeoutError,
            ErrorCategory.TIMEOUT,
            ErrorSeverity.MEDIUM,
            retryable=True
        )
        self.register_exception_pattern(
            ConnectionError,
            ErrorCategory.NETWORK,
            ErrorSeverity.MEDIUM,
            retryable=True
        )
        self.register_exception_pattern(
            ValueError,
            ErrorCategory.VALIDATION,
            ErrorSeverity.LOW,
            retryable=False
        )

        # Message patterns
        self.register_message_pattern(
            r"rate.?limit",
            ErrorCategory.RATE_LIMITED,
            ErrorSeverity.MEDIUM,
            retryable=True
        )
        self.register_message_pattern(
            r"(401|unauthorized|authentication)",
            ErrorCategory.AUTHENTICATION,
            ErrorSeverity.HIGH,
            retryable=False
        )
        self.register_message_pattern(
            r"(404|not.?found)",
            ErrorCategory.NOT_FOUND,
            ErrorSeverity.LOW,
            retryable=False
        )
        self.register_message_pattern(
            r"(500|502|503|504|internal.?server)",
            ErrorCategory.SERVER_ERROR,
            ErrorSeverity.MEDIUM,
            retryable=True
        )

    def register_exception_pattern(
        self,
        exception_type: Type[Exception],
        category: ErrorCategory,
        severity: ErrorSeverity,
        retryable: bool
    ) -> None:
        """Register a pattern for exception types."""
        self._patterns.append((exception_type, category, severity, retryable))

    def register_message_pattern(
        self,
        pattern: str,
        category: ErrorCategory,
        severity: ErrorSeverity,
        retryable: bool
    ) -> None:
        """Register a pattern for error messages."""
        self._message_patterns.append((pattern, category, severity, retryable))

    def classify(
        self,
        error: Exception,
        context: Optional[dict[str, Any]] = None
    ) -> ClassifiedError:
        """Classify an error."""
        # Check exception type patterns first
        for exc_type, category, severity, retryable in self._patterns:
            if isinstance(error, exc_type):
                return ClassifiedError(
                    original_error=error,
                    category=category,
                    severity=severity,
                    retryable=retryable,
                    context=context or {}
                )

        # Check message patterns
        error_message = str(error).lower()
        for pattern, category, severity, retryable in self._message_patterns:
            if re.search(pattern, error_message, re.IGNORECASE):
                # Extract retry-after if present
                retry_after = self._extract_retry_after(error_message)

                return ClassifiedError(
                    original_error=error,
                    category=category,
                    severity=severity,
                    retryable=retryable,
                    retry_after_seconds=retry_after,
                    context=context or {}
                )

        # Default classification
        return ClassifiedError(
            original_error=error,
            category=ErrorCategory.UNKNOWN,
            severity=ErrorSeverity.MEDIUM,
            retryable=True,  # Default to retryable for unknown errors
            context=context or {}
        )

    def _extract_retry_after(self, message: str) -> Optional[float]:
        """Extract retry-after duration from error message."""
        # Look for patterns like "retry after 30 seconds"
        match = re.search(r"retry.?after.?(\d+)", message, re.IGNORECASE)
        if match:
            return float(match.group(1))
        return None


class ErrorAggregator:
    """Aggregates and analyzes error patterns."""

    def __init__(self, window_seconds: int = 300):
        self.window_seconds = window_seconds
        self._errors: list[tuple[float, ClassifiedError]] = []

    def record(self, error: ClassifiedError) -> None:
        """Record an error occurrence."""
        now = time.time()
        self._errors.append((now, error))

        # Prune old errors
        cutoff = now - self.window_seconds
        self._errors = [(t, e) for t, e in self._errors if t > cutoff]

    def get_error_rate(self) -> float:
        """Get errors per second in the window."""
        if not self._errors:
            return 0.0
        return len(self._errors) / self.window_seconds

    def get_category_counts(self) -> dict[ErrorCategory, int]:
        """Get count of errors by category."""
        return Counter(e.category for _, e in self._errors)

    def is_pattern_detected(
        self,
        category: ErrorCategory,
        threshold: int = 5
    ) -> bool:
        """Detect if an error pattern is emerging."""
        counts = self.get_category_counts()
        return counts.get(category, 0) >= threshold
```

The error classifier uses both exception types and message patterns to categorize errors. This enables different handling strategies based on the nature of the error rather than treating all failures the same way.
| Category | Example | Retryable | Action |
|---|---|---|---|
| Transient | Network glitch | Yes | Immediate retry |
| Rate Limited | API quota exceeded | Yes | Backoff retry |
| Authentication | Token expired | No | Refresh credentials |
| Validation | Invalid input | No | Return error to user |
| Server Error | 500 response | Yes | Retry with backoff |
| Timeout | Request timeout | Yes | Retry with longer timeout |
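To make the message-pattern half of this classification concrete, here is a minimal standalone sketch in the spirit of the `ErrorClassifier` above; the pattern table, function name, and string category labels are simplified illustrations, not the class's actual API.

```python
import re

# Illustrative (pattern, category, retryable) table, mirroring the
# register_message_pattern defaults in the listing above.
PATTERNS = [
    (r"rate.?limit", "RATE_LIMITED", True),
    (r"(401|unauthorized)", "AUTHENTICATION", False),
    (r"(500|502|503|504)", "SERVER_ERROR", True),
]

def classify_message(message: str) -> tuple[str, bool]:
    """Return (category, retryable) for an error message."""
    for pattern, category, retryable in PATTERNS:
        if re.search(pattern, message, re.IGNORECASE):
            return category, retryable
    return "UNKNOWN", True  # default: retry unknown errors

print(classify_message("HTTP 429: rate limit exceeded"))  # ('RATE_LIMITED', True)
print(classify_message("401 Unauthorized"))               # ('AUTHENTICATION', False)
```

Ordering matters: the first matching pattern wins, so more specific patterns should be registered before general ones.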
Retry Strategies
Retry strategies determine when and how to retry failed operations. Effective retry logic uses exponential backoff, jitter, and respects retry-after headers to avoid overwhelming recovering services.
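The backoff arithmetic is worth seeing in isolation before the full implementation. This is a minimal sketch of capped exponential backoff with proportional jitter; the function name is illustrative, and the defaults match the `RetryConfig` defaults used later (1s initial delay, base 2, 60s cap, 10% jitter).

```python
import random

def backoff_delay(attempt: int, initial: float = 1.0, base: float = 2.0,
                  cap: float = 60.0, jitter: float = 0.1) -> float:
    """Exponential backoff, capped, plus up to `jitter` fraction of random slack."""
    delay = min(initial * base ** attempt, cap)
    return delay + delay * jitter * random.random()

# Without jitter, attempts 0..6 yield base delays of 1, 2, 4, 8, 16, 32, 60 (capped).
for attempt in range(7):
    print(f"attempt {attempt}: ~{min(1.0 * 2.0 ** attempt, 60.0):.0f}s base delay")
```

The jitter term spreads simultaneous retries from many clients across time rather than letting them hit a recovering service in lockstep.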
Advanced Retry Implementation
```python
from dataclasses import dataclass
from typing import Callable, Optional, Awaitable, TypeVar
import asyncio
import random
import time
import logging


T = TypeVar('T')


@dataclass
class RetryConfig:
    """Configuration for retry behavior."""
    max_attempts: int = 3
    initial_delay_seconds: float = 1.0
    max_delay_seconds: float = 60.0
    exponential_base: float = 2.0
    jitter_factor: float = 0.1
    retryable_exceptions: tuple = (Exception,)


class RetryStrategy:
    """Implements retry with exponential backoff and jitter."""

    def __init__(self, config: RetryConfig):
        self.config = config
        self.logger = logging.getLogger(__name__)

    def calculate_delay(
        self,
        attempt: int,
        retry_after: Optional[float] = None
    ) -> float:
        """Calculate delay before next retry."""
        if retry_after is not None:
            return retry_after

        # Exponential backoff
        delay = self.config.initial_delay_seconds * (
            self.config.exponential_base ** attempt
        )

        # Cap at max delay
        delay = min(delay, self.config.max_delay_seconds)

        # Add jitter
        jitter = delay * self.config.jitter_factor * random.random()
        delay += jitter

        return delay

    async def execute(
        self,
        operation: Callable[[], Awaitable[T]],
        classifier: Optional[ErrorClassifier] = None
    ) -> T:
        """Execute operation with retry logic."""
        classifier = classifier or ErrorClassifier()
        last_error: Optional[Exception] = None

        for attempt in range(self.config.max_attempts):
            try:
                return await operation()

            except self.config.retryable_exceptions as e:
                last_error = e
                classified = classifier.classify(e)

                if not classified.retryable:
                    self.logger.warning(
                        f"Non-retryable error: {classified.category}"
                    )
                    raise

                if attempt < self.config.max_attempts - 1:
                    delay = self.calculate_delay(
                        attempt,
                        classified.retry_after_seconds
                    )
                    self.logger.info(
                        f"Retry {attempt + 1}/{self.config.max_attempts} "
                        f"after {delay:.2f}s: {e}"
                    )
                    await asyncio.sleep(delay)

        raise last_error


class AdaptiveRetry:
    """Adapts retry behavior based on success/failure patterns."""

    def __init__(
        self,
        base_config: RetryConfig,
        success_threshold: int = 10,
        failure_threshold: int = 5
    ):
        self.base_config = base_config
        self.success_threshold = success_threshold
        self.failure_threshold = failure_threshold

        self._consecutive_successes = 0
        self._consecutive_failures = 0
        self._current_max_attempts = base_config.max_attempts

    async def execute(
        self,
        operation: Callable[[], Awaitable[T]]
    ) -> T:
        """Execute with adaptive retry."""
        strategy = RetryStrategy(RetryConfig(
            max_attempts=self._current_max_attempts,
            initial_delay_seconds=self.base_config.initial_delay_seconds,
            max_delay_seconds=self.base_config.max_delay_seconds,
            exponential_base=self.base_config.exponential_base,
            jitter_factor=self.base_config.jitter_factor,
            retryable_exceptions=self.base_config.retryable_exceptions
        ))

        try:
            result = await strategy.execute(operation)
            self._record_success()
            return result

        except Exception:
            self._record_failure()
            raise

    def _record_success(self) -> None:
        """Record successful execution."""
        self._consecutive_successes += 1
        self._consecutive_failures = 0

        # Increase retry budget after consistent success
        if self._consecutive_successes >= self.success_threshold:
            self._current_max_attempts = min(
                self._current_max_attempts + 1,
                self.base_config.max_attempts * 2
            )
            self._consecutive_successes = 0

    def _record_failure(self) -> None:
        """Record failed execution."""
        self._consecutive_failures += 1
        self._consecutive_successes = 0

        # Decrease retry budget after consistent failure
        if self._consecutive_failures >= self.failure_threshold:
            self._current_max_attempts = max(
                self._current_max_attempts - 1,
                1
            )
            self._consecutive_failures = 0


class RetryBudget:
    """Limits retry attempts across the system."""

    def __init__(
        self,
        max_retries_per_second: float = 10.0,
        budget_window_seconds: float = 10.0
    ):
        self.max_retries_per_second = max_retries_per_second
        self.budget_window_seconds = budget_window_seconds
        self._retries: list[float] = []
        self._lock = asyncio.Lock()

    async def acquire(self) -> bool:
        """Attempt to acquire retry budget."""
        async with self._lock:
            now = time.time()

            # Prune old retries
            cutoff = now - self.budget_window_seconds
            self._retries = [t for t in self._retries if t > cutoff]

            # Check if under budget
            max_retries = self.max_retries_per_second * self.budget_window_seconds
            if len(self._retries) >= max_retries:
                return False

            self._retries.append(now)
            return True

    @property
    def remaining_budget(self) -> float:
        """Get remaining retry budget as fraction."""
        max_retries = self.max_retries_per_second * self.budget_window_seconds
        return max(0, 1 - len(self._retries) / max_retries)


class BudgetedRetry:
    """Retry with system-wide budget awareness."""

    def __init__(
        self,
        config: RetryConfig,
        budget: RetryBudget
    ):
        self.config = config
        self.budget = budget
        self.strategy = RetryStrategy(config)

    async def execute(
        self,
        operation: Callable[[], Awaitable[T]]
    ) -> T:
        """Execute with budget-aware retry."""
        for attempt in range(self.config.max_attempts):
            try:
                return await operation()

            except Exception as e:
                if attempt >= self.config.max_attempts - 1:
                    raise

                # Check budget before retrying
                if not await self.budget.acquire():
                    raise RuntimeError(
                        "Retry budget exhausted"
                    ) from e

                delay = self.strategy.calculate_delay(attempt)
                await asyncio.sleep(delay)
```

The retry implementation includes exponential backoff with jitter to prevent thundering herd problems, adaptive retry that adjusts based on success patterns, and system-wide retry budgets to prevent retry storms.
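To see the whole loop in action, here is a compressed standalone demo rather than the `RetryStrategy` class itself: an operation that fails twice with a transient error and succeeds on the third attempt, with delays shortened so the demo runs quickly. All names here are illustrative.

```python
import asyncio

attempts = 0

async def flaky_operation() -> str:
    """Fails twice with a transient error, then succeeds."""
    global attempts
    attempts += 1
    if attempts < 3:
        raise ConnectionError(f"transient failure #{attempts}")
    return "ok"

async def retry(op, max_attempts: int = 5, initial_delay: float = 0.01):
    """Minimal retry loop with exponential backoff (no jitter, for brevity)."""
    for attempt in range(max_attempts):
        try:
            return await op()
        except ConnectionError:
            if attempt == max_attempts - 1:
                raise
            await asyncio.sleep(initial_delay * 2 ** attempt)

result = asyncio.run(retry(flaky_operation))
print(result, attempts)  # ok 3
```

In production the `except` clause would consult the classifier from the previous section so that validation errors fail immediately instead of burning retry attempts.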
Circuit Breakers
Circuit breakers prevent cascade failures by stopping requests to failing services. When a service fails repeatedly, the circuit opens and requests fail fast rather than waiting for timeouts.
Circuit Breaker Implementation
```python
from enum import Enum
from dataclasses import dataclass
from typing import Any, Callable, Awaitable, Optional, TypeVar
import asyncio
import time


class CircuitState(Enum):
    """States of the circuit breaker."""
    CLOSED = "closed"        # Normal operation
    OPEN = "open"            # Failing fast
    HALF_OPEN = "half_open"  # Testing recovery


@dataclass
class CircuitBreakerConfig:
    """Configuration for circuit breaker."""
    failure_threshold: int = 5
    success_threshold: int = 3
    timeout_seconds: float = 30.0
    half_open_max_calls: int = 3


T = TypeVar('T')


class CircuitBreaker:
    """Circuit breaker for fault tolerance."""

    def __init__(
        self,
        name: str,
        config: CircuitBreakerConfig
    ):
        self.name = name
        self.config = config

        self._state = CircuitState.CLOSED
        self._failure_count = 0
        self._success_count = 0
        self._last_failure_time: Optional[float] = None
        self._half_open_calls = 0
        self._lock = asyncio.Lock()

    @property
    def state(self) -> CircuitState:
        return self._state

    async def execute(
        self,
        operation: Callable[[], Awaitable[T]],
        fallback: Optional[Callable[[], Awaitable[T]]] = None
    ) -> T:
        """Execute operation through circuit breaker."""
        async with self._lock:
            # Check if circuit should transition
            await self._check_state_transition()

            if self._state == CircuitState.OPEN:
                if fallback:
                    return await fallback()
                raise CircuitOpenError(
                    f"Circuit {self.name} is open"
                )

            if self._state == CircuitState.HALF_OPEN:
                if self._half_open_calls >= self.config.half_open_max_calls:
                    if fallback:
                        return await fallback()
                    raise CircuitOpenError(
                        f"Circuit {self.name} half-open call limit reached"
                    )
                self._half_open_calls += 1

        # Run the operation outside the lock so slow calls don't serialize,
        # and so _record_success/_record_failure can re-acquire it
        try:
            result = await operation()
            await self._record_success()
            return result

        except Exception:
            await self._record_failure()
            raise

    async def _check_state_transition(self) -> None:
        """Check if circuit should transition state."""
        if self._state == CircuitState.OPEN:
            # Check if timeout has passed
            if self._last_failure_time:
                elapsed = time.time() - self._last_failure_time
                if elapsed >= self.config.timeout_seconds:
                    self._state = CircuitState.HALF_OPEN
                    self._half_open_calls = 0
                    self._success_count = 0

    async def _record_success(self) -> None:
        """Record successful execution."""
        async with self._lock:
            if self._state == CircuitState.HALF_OPEN:
                self._success_count += 1
                if self._success_count >= self.config.success_threshold:
                    self._state = CircuitState.CLOSED
                    self._failure_count = 0
            else:
                self._failure_count = 0

    async def _record_failure(self) -> None:
        """Record failed execution."""
        async with self._lock:
            self._failure_count += 1
            self._last_failure_time = time.time()

            if self._state == CircuitState.HALF_OPEN:
                # Any failure in half-open opens the circuit
                self._state = CircuitState.OPEN
            elif self._failure_count >= self.config.failure_threshold:
                self._state = CircuitState.OPEN

    def get_stats(self) -> dict[str, Any]:
        """Get circuit breaker statistics."""
        return {
            "name": self.name,
            "state": self._state.value,
            "failure_count": self._failure_count,
            "success_count": self._success_count,
            "last_failure": self._last_failure_time
        }


class CircuitOpenError(Exception):
    """Raised when circuit is open."""
    pass


class CircuitBreakerRegistry:
    """Registry for managing multiple circuit breakers."""

    def __init__(self):
        self._breakers: dict[str, CircuitBreaker] = {}
        self._default_config = CircuitBreakerConfig()

    def get_or_create(
        self,
        name: str,
        config: Optional[CircuitBreakerConfig] = None
    ) -> CircuitBreaker:
        """Get existing or create new circuit breaker."""
        if name not in self._breakers:
            self._breakers[name] = CircuitBreaker(
                name=name,
                config=config or self._default_config
            )
        return self._breakers[name]

    def get_all_stats(self) -> dict[str, dict[str, Any]]:
        """Get stats for all circuit breakers."""
        return {
            name: breaker.get_stats()
            for name, breaker in self._breakers.items()
        }

    def reset_all(self) -> None:
        """Reset all circuit breakers."""
        for breaker in self._breakers.values():
            breaker._state = CircuitState.CLOSED
            breaker._failure_count = 0
            breaker._success_count = 0


class HierarchicalCircuitBreaker:
    """Circuit breaker with parent-child relationships."""

    def __init__(
        self,
        name: str,
        config: CircuitBreakerConfig,
        parent: Optional["HierarchicalCircuitBreaker"] = None
    ):
        self.breaker = CircuitBreaker(name, config)
        self.parent = parent
        self.children: list["HierarchicalCircuitBreaker"] = []

        if parent:
            parent.children.append(self)

    async def execute(
        self,
        operation: Callable[[], Awaitable[T]],
        fallback: Optional[Callable[[], Awaitable[T]]] = None
    ) -> T:
        """Execute with hierarchical circuit breaking."""
        # Check parent circuit first
        if self.parent and self.parent.breaker.state == CircuitState.OPEN:
            if fallback:
                return await fallback()
            raise CircuitOpenError(
                f"Parent circuit {self.parent.breaker.name} is open"
            )

        return await self.breaker.execute(operation, fallback)

    def propagate_open(self) -> None:
        """Open all child circuits when parent opens."""
        for child in self.children:
            child.breaker._state = CircuitState.OPEN
            child.propagate_open()
```

The circuit breaker transitions between closed (normal), open (failing fast), and half-open (testing recovery) states. The hierarchical variant enables parent circuits to control child circuits.
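The state machine can be exercised in a few lines. This is a deliberately minimal synchronous sketch of the closed → open → half-open transitions; the class name, thresholds, and tiny timeout are illustrative, not the full `CircuitBreaker` above.

```python
import time

class MiniBreaker:
    """Toy circuit breaker demonstrating the three-state transitions."""

    def __init__(self, failure_threshold: int = 3, timeout: float = 0.05):
        self.state = "closed"
        self.failures = 0
        self.failure_threshold = failure_threshold
        self.timeout = timeout
        self.opened_at = 0.0

    def record_failure(self) -> None:
        self.failures += 1
        # Any failure in half-open, or too many in closed, opens the circuit
        if self.state == "half_open" or self.failures >= self.failure_threshold:
            self.state = "open"
            self.opened_at = time.monotonic()

    def allow_request(self) -> bool:
        if self.state == "open" and time.monotonic() - self.opened_at >= self.timeout:
            self.state = "half_open"  # cooldown elapsed: allow a probe request
        return self.state != "open"

b = MiniBreaker()
for _ in range(3):
    b.record_failure()
print(b.state)            # open: fail fast, no waiting on timeouts
time.sleep(0.06)
print(b.allow_request())  # True: cooldown elapsed, half-open probe allowed
print(b.state)            # half_open
```

The key property is that while open, callers learn of the outage immediately instead of each paying a full timeout against a dead service.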
Fallback Patterns
When primary operations fail, fallback patterns provide alternative responses or degraded functionality. Effective fallbacks maintain user experience even when services are unavailable.
Fallback Strategies
```python
from abc import ABC, abstractmethod
from typing import Any, Callable, Awaitable, Optional, TypeVar, Generic
from dataclasses import dataclass
import asyncio

# CircuitBreaker, RetryConfig, and RetryStrategy are defined in the
# earlier listings in this section.


T = TypeVar('T')


class FallbackStrategy(ABC, Generic[T]):
    """Abstract fallback strategy."""

    @abstractmethod
    async def execute(self, error: Exception) -> T:
        """Execute fallback logic."""
        pass


class StaticFallback(FallbackStrategy[T]):
    """Returns a static fallback value."""

    def __init__(self, value: T):
        self.value = value

    async def execute(self, error: Exception) -> T:
        return self.value


class CachedFallback(FallbackStrategy[T]):
    """Returns cached value as fallback."""

    def __init__(self, cache_key: str, cache: dict[str, T]):
        self.cache_key = cache_key
        self.cache = cache

    async def execute(self, error: Exception) -> T:
        if self.cache_key in self.cache:
            return self.cache[self.cache_key]
        raise FallbackFailedError("No cached value available")


class DegradedFallback(FallbackStrategy[T]):
    """Provides degraded functionality."""

    def __init__(
        self,
        degraded_operation: Callable[[Exception], Awaitable[T]]
    ):
        self.degraded_operation = degraded_operation

    async def execute(self, error: Exception) -> T:
        return await self.degraded_operation(error)


class ChainedFallback(FallbackStrategy[T]):
    """Chains multiple fallback strategies."""

    def __init__(self, fallbacks: list[FallbackStrategy[T]]):
        self.fallbacks = fallbacks

    async def execute(self, error: Exception) -> T:
        last_error = error

        for fallback in self.fallbacks:
            try:
                return await fallback.execute(error)
            except Exception as e:
                last_error = e
                continue

        raise FallbackFailedError(
            f"All fallbacks failed: {last_error}"
        )


class FallbackFailedError(Exception):
    """Raised when all fallbacks fail."""
    pass


class ResilientOperation(Generic[T]):
    """Operation with integrated resilience patterns."""

    def __init__(
        self,
        operation: Callable[[], Awaitable[T]],
        circuit_breaker: Optional[CircuitBreaker] = None,
        retry_config: Optional[RetryConfig] = None,
        fallback: Optional[FallbackStrategy[T]] = None,
        timeout_seconds: Optional[float] = None
    ):
        self.operation = operation
        self.circuit_breaker = circuit_breaker
        self.retry_config = retry_config
        self.fallback = fallback
        self.timeout_seconds = timeout_seconds

    async def execute(self) -> T:
        """Execute with all resilience patterns applied."""
        try:
            result = await self._execute_with_patterns()
            return result

        except Exception as e:
            if self.fallback:
                return await self.fallback.execute(e)
            raise

    async def _execute_with_patterns(self) -> T:
        """Execute with retry, circuit breaker, and timeout."""
        async def wrapped_operation():
            # Apply timeout
            if self.timeout_seconds:
                return await asyncio.wait_for(
                    self.operation(),
                    timeout=self.timeout_seconds
                )
            return await self.operation()

        # Apply retry
        if self.retry_config:
            retry_strategy = RetryStrategy(self.retry_config)
            operation = lambda: retry_strategy.execute(wrapped_operation)
        else:
            operation = wrapped_operation

        # Apply circuit breaker
        if self.circuit_breaker:
            return await self.circuit_breaker.execute(operation)

        return await operation()


@dataclass
class GracefulDegradation:
    """Configuration for graceful degradation."""
    feature: str
    enabled: bool = True
    fallback_value: Any = None
    degraded_message: str = ""


class GracefulDegradationManager:
    """Manages graceful degradation of features."""

    def __init__(self):
        self._features: dict[str, GracefulDegradation] = {}
        self._disabled_features: set[str] = set()

    def register_feature(
        self,
        config: GracefulDegradation
    ) -> None:
        """Register a feature for degradation management."""
        self._features[config.feature] = config

    def disable_feature(self, feature: str) -> None:
        """Disable a feature."""
        self._disabled_features.add(feature)

    def enable_feature(self, feature: str) -> None:
        """Enable a feature."""
        self._disabled_features.discard(feature)

    def is_enabled(self, feature: str) -> bool:
        """Check if feature is enabled."""
        if feature in self._disabled_features:
            return False
        config = self._features.get(feature)
        return config.enabled if config else True

    def get_fallback(self, feature: str) -> Any:
        """Get fallback value for disabled feature."""
        config = self._features.get(feature)
        return config.fallback_value if config else None

    async def execute_if_enabled(
        self,
        feature: str,
        operation: Callable[[], Awaitable[T]],
        fallback: Optional[T] = None
    ) -> Optional[T]:
        """Execute operation only if feature is enabled."""
        if not self.is_enabled(feature):
            return fallback if fallback is not None else self.get_fallback(feature)

        try:
            return await operation()
        except Exception:
            # Disable feature on error
            self.disable_feature(feature)
            return fallback if fallback is not None else self.get_fallback(feature)
```

Fallback strategies range from simple static values to complex chained fallbacks. The resilient operation wrapper combines timeout, retry, circuit breaker, and fallback patterns into a cohesive unit.
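The chaining behavior is easy to demonstrate standalone. This sketch is a simplified functional version of `ChainedFallback`: it tries a cache lookup first and falls back to a static message when the cache misses. All names here are illustrative.

```python
import asyncio

async def with_fallbacks(fallbacks):
    """Try each fallback in order; return the first that succeeds."""
    last_error = None
    for fb in fallbacks:
        try:
            return await fb()
        except Exception as e:
            last_error = e
    raise RuntimeError(f"all fallbacks failed: {last_error}")

cache = {}  # empty, so the cached lookup fails and the static value is used

async def cached():
    return cache["summary"]  # raises KeyError on a cache miss

async def static():
    return "Service temporarily unavailable"

result = asyncio.run(with_fallbacks([cached, static]))
print(result)  # Service temporarily unavailable
```

Ordering the chain from freshest to stalest (cache, then degraded computation, then static default) preserves as much fidelity as each failure mode allows.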
Recovery Procedures
Recovery procedures define how the system returns to normal operation after failures. This includes health checks, gradual traffic restoration, and verification of recovered services.
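A core mechanism in gradual traffic restoration is probabilistic routing: a recovering service admitted at N% traffic should receive roughly N% of requests. This standalone sketch checks that rule empirically; the function name is illustrative, mirroring the `should_route_to` logic in the recovery manager below.

```python
import random

def should_route(traffic_percentage: float) -> bool:
    """Route to a recovering service with probability traffic_percentage / 100."""
    return random.random() * 100 < traffic_percentage

random.seed(0)  # deterministic sampling for the demo
routed = sum(should_route(40.0) for _ in range(10_000))
print(routed)  # close to 4000 of 10,000 requests at 40% traffic
```

Per-request coin flips need no shared counters, which keeps routing decisions cheap and lock-free even under high concurrency.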
Recovery Manager
```python
from dataclasses import dataclass
from typing import Any, Callable, Awaitable, Optional
from enum import Enum
import asyncio
import random
import time
import logging


class RecoveryState(Enum):
    """States during recovery."""
    HEALTHY = "healthy"
    DEGRADED = "degraded"
    RECOVERING = "recovering"
    FAILED = "failed"


@dataclass
class RecoveryConfig:
    """Configuration for recovery procedures."""
    health_check_interval: float = 10.0
    recovery_check_interval: float = 5.0
    min_healthy_checks: int = 3
    traffic_ramp_steps: int = 5
    traffic_ramp_interval: float = 30.0


@dataclass
class ServiceHealth:
    """Health status of a service."""
    name: str
    state: RecoveryState = RecoveryState.HEALTHY
    consecutive_healthy: int = 0
    consecutive_unhealthy: int = 0
    traffic_percentage: float = 100.0
    last_check: Optional[float] = None
    last_error: Optional[str] = None


class RecoveryManager:
    """Manages service recovery procedures."""

    def __init__(self, config: RecoveryConfig):
        self.config = config
        self._services: dict[str, ServiceHealth] = {}
        self._health_checks: dict[str, Callable[[], Awaitable[bool]]] = {}
        self._running = False
        self._task: Optional[asyncio.Task] = None
        self.logger = logging.getLogger(__name__)

    def register_service(
        self,
        name: str,
        health_check: Callable[[], Awaitable[bool]]
    ) -> None:
        """Register a service for recovery management."""
        self._services[name] = ServiceHealth(name=name)
        self._health_checks[name] = health_check

    async def start(self) -> None:
        """Start recovery manager."""
        self._running = True
        # Keep a reference so the background task is not garbage collected
        self._task = asyncio.create_task(self._health_check_loop())

    async def stop(self) -> None:
        """Stop recovery manager."""
        self._running = False

    async def _health_check_loop(self) -> None:
        """Main health check loop."""
        while self._running:
            for name in self._services:
                await self._check_service(name)

            await asyncio.sleep(self.config.health_check_interval)

    async def _check_service(self, name: str) -> None:
        """Check health of a single service."""
        service = self._services[name]
        health_check = self._health_checks[name]

        try:
            healthy = await health_check()
            service.last_check = time.time()

            if healthy:
                service.consecutive_healthy += 1
                service.consecutive_unhealthy = 0
                service.last_error = None
                await self._handle_healthy(service)
            else:
                await self._handle_unhealthy(service, "Health check returned False")

        except Exception as e:
            await self._handle_unhealthy(service, str(e))

    async def _handle_healthy(self, service: ServiceHealth) -> None:
        """Handle healthy service."""
        if service.state == RecoveryState.RECOVERING:
            if service.consecutive_healthy >= self.config.min_healthy_checks:
                await self._ramp_traffic(service)

        elif service.state in (RecoveryState.DEGRADED, RecoveryState.FAILED):
            service.state = RecoveryState.RECOVERING
            service.traffic_percentage = 0
            self.logger.info(f"Service {service.name} starting recovery")

    async def _handle_unhealthy(
        self,
        service: ServiceHealth,
        error: str
    ) -> None:
        """Handle unhealthy service."""
        service.consecutive_unhealthy += 1
        service.consecutive_healthy = 0
        service.last_error = error

        if service.state == RecoveryState.HEALTHY:
            if service.consecutive_unhealthy >= 3:
                service.state = RecoveryState.DEGRADED
                self.logger.warning(
                    f"Service {service.name} degraded: {error}"
                )

        elif service.state == RecoveryState.RECOVERING:
            service.state = RecoveryState.DEGRADED
            service.traffic_percentage = 0
            self.logger.warning(
                f"Service {service.name} recovery failed: {error}"
            )

        elif service.state == RecoveryState.DEGRADED:
            if service.consecutive_unhealthy >= 10:
                service.state = RecoveryState.FAILED
                service.traffic_percentage = 0
                self.logger.error(
                    f"Service {service.name} failed: {error}"
                )

    async def _ramp_traffic(self, service: ServiceHealth) -> None:
        """Gradually ramp up traffic to recovering service."""
        step_size = 100.0 / self.config.traffic_ramp_steps
        service.traffic_percentage = min(
            100.0,
            service.traffic_percentage + step_size
        )

        self.logger.info(
            f"Service {service.name} traffic: {service.traffic_percentage}%"
        )

        if service.traffic_percentage >= 100:
            service.state = RecoveryState.HEALTHY
            self.logger.info(f"Service {service.name} fully recovered")

    def should_route_to(self, name: str) -> bool:
        """Determine if traffic should route to service."""
        service = self._services.get(name)
        if not service:
            return True

        if service.state == RecoveryState.FAILED:
            return False

        # Probabilistic routing based on traffic percentage
        return random.random() * 100 < service.traffic_percentage

    def get_status(self) -> dict[str, dict[str, Any]]:
        """Get status of all services."""
        return {
            name: {
                "state": service.state.value,
                "traffic_percentage": service.traffic_percentage,
                "consecutive_healthy": service.consecutive_healthy,
                "consecutive_unhealthy": service.consecutive_unhealthy,
                "last_check": service.last_check,
                "last_error": service.last_error
            }
            for name, service in self._services.items()
        }


class HealthCheckSuite:
    """Suite of health checks for comprehensive verification."""

    def __init__(self):
        self._checks: list[tuple[str, Callable[[], Awaitable[bool]]]] = []

    def add_check(
        self,
        name: str,
        check: Callable[[], Awaitable[bool]]
    ) -> None:
        """Add a health check."""
        self._checks.append((name, check))

    async def run_all(self) -> dict[str, bool]:
        """Run all health checks."""
        results = {}

        for name, check in self._checks:
            try:
                results[name] = await check()
            except Exception:
                results[name] = False

        return results

    async def is_healthy(self) -> bool:
        """Check if all checks pass."""
        results = await self.run_all()
        return all(results.values())
```

The recovery manager implements gradual traffic restoration with health verification. Services transition through degraded and recovering states before returning to full health, preventing premature load on recovering systems.
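The ramp arithmetic itself is simple: with `traffic_ramp_steps = 5`, each window of healthy checks adds 20 percentage points until the service is fully restored. A minimal standalone sketch of the schedule (the function name is illustrative):

```python
def ramp_schedule(steps: int) -> list[float]:
    """Traffic percentages a recovering service passes through, in order."""
    step_size = 100.0 / steps
    pct, schedule = 0.0, []
    while pct < 100.0:
        pct = min(100.0, pct + step_size)
        schedule.append(pct)
    return schedule

print(ramp_schedule(5))  # [20.0, 40.0, 60.0, 80.0, 100.0]
```

With the default `traffic_ramp_interval` of 30 seconds between steps, full restoration takes at least two and a half minutes, giving the service time to warm caches and reveal lingering problems under partial load.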
Summary
Error recovery and resilience are critical for production agent systems. Proper error classification, intelligent retries, circuit breakers, fallbacks, and recovery procedures work together to maintain service quality despite failures.
Key Takeaways
- Error Classification - Categorize errors to enable appropriate handling strategies
- Intelligent Retry - Use exponential backoff with jitter and respect system-wide retry budgets
- Circuit Breakers - Prevent cascade failures by failing fast when services are down
- Fallback Patterns - Provide degraded functionality rather than complete failure
- Gradual Recovery - Ramp traffic slowly to recovering services with health verification
Next Steps: The next section covers API design for agent services, including best practices for exposing agent capabilities through well-designed REST and streaming APIs.