Chapter 21

Error Recovery and Resilience

Production Deployment

Introduction

Production agent systems must handle failures gracefully. LLM APIs experience outages, external tools fail, network connections drop, and unexpected errors occur. Building resilient systems that recover automatically from failures is essential for maintaining service quality.

Learning Objectives: By the end of this section, you will understand how to classify errors for appropriate handling, implement intelligent retry strategies with backoff, design circuit breakers to prevent cascade failures, and build fallback mechanisms for degraded operation.

The key to resilience is accepting that failures will happen and designing systems that handle them gracefully. Rather than trying to prevent all failures, resilient systems focus on detection, containment, and recovery.


Error Classification

Not all errors should be handled the same way. Transient errors may resolve with a retry, while permanent errors require different handling. Proper classification enables appropriate recovery strategies.

Error Taxonomy

🐍python
from collections import Counter
from dataclasses import dataclass, field
from enum import Enum, auto
from typing import Any, Optional, Type
import re
import time


class ErrorCategory(Enum):
    """Categories of errors for handling decisions."""
    TRANSIENT = auto()      # Temporary, may resolve with retry
    RATE_LIMITED = auto()   # API rate limit hit
    AUTHENTICATION = auto() # Auth failure, needs refresh
    VALIDATION = auto()     # Invalid input, no retry
    NOT_FOUND = auto()      # Resource not found
    SERVER_ERROR = auto()   # Remote server error
    TIMEOUT = auto()        # Request timed out
    NETWORK = auto()        # Network connectivity issue
    UNKNOWN = auto()        # Unclassified error


class ErrorSeverity(Enum):
    """Severity levels for errors."""
    LOW = "low"           # Log and continue
    MEDIUM = "medium"     # Retry or fallback
    HIGH = "high"         # Immediate attention needed
    CRITICAL = "critical" # System failure


@dataclass
class ClassifiedError:
    """An error with classification metadata."""
    original_error: Exception
    category: ErrorCategory
    severity: ErrorSeverity
    retryable: bool
    retry_after_seconds: Optional[float] = None
    message: str = ""
    context: dict[str, Any] = field(default_factory=dict)

    def __post_init__(self):
        if not self.message:
            self.message = str(self.original_error)


class ErrorClassifier:
    """Classifies errors for appropriate handling."""

    def __init__(self):
        self._patterns: list[tuple[type, ErrorCategory, ErrorSeverity, bool]] = []
        self._message_patterns: list[tuple[str, ErrorCategory, ErrorSeverity, bool]] = []
        self._setup_default_patterns()

    def _setup_default_patterns(self) -> None:
        """Set up default error classification patterns."""
        # Exception type patterns
        self.register_exception_pattern(
            TimeoutError,
            ErrorCategory.TIMEOUT,
            ErrorSeverity.MEDIUM,
            retryable=True
        )
        self.register_exception_pattern(
            ConnectionError,
            ErrorCategory.NETWORK,
            ErrorSeverity.MEDIUM,
            retryable=True
        )
        self.register_exception_pattern(
            ValueError,
            ErrorCategory.VALIDATION,
            ErrorSeverity.LOW,
            retryable=False
        )

        # Message patterns
        self.register_message_pattern(
            r"rate.?limit",
            ErrorCategory.RATE_LIMITED,
            ErrorSeverity.MEDIUM,
            retryable=True
        )
        self.register_message_pattern(
            r"(401|unauthorized|authentication)",
            ErrorCategory.AUTHENTICATION,
            ErrorSeverity.HIGH,
            retryable=False
        )
        self.register_message_pattern(
            r"(404|not.?found)",
            ErrorCategory.NOT_FOUND,
            ErrorSeverity.LOW,
            retryable=False
        )
        self.register_message_pattern(
            r"(500|502|503|504|internal.?server)",
            ErrorCategory.SERVER_ERROR,
            ErrorSeverity.MEDIUM,
            retryable=True
        )

    def register_exception_pattern(
        self,
        exception_type: Type[Exception],
        category: ErrorCategory,
        severity: ErrorSeverity,
        retryable: bool
    ) -> None:
        """Register a pattern for exception types."""
        self._patterns.append((exception_type, category, severity, retryable))

    def register_message_pattern(
        self,
        pattern: str,
        category: ErrorCategory,
        severity: ErrorSeverity,
        retryable: bool
    ) -> None:
        """Register a pattern for error messages."""
        self._message_patterns.append((pattern, category, severity, retryable))

    def classify(
        self,
        error: Exception,
        context: Optional[dict[str, Any]] = None
    ) -> ClassifiedError:
        """Classify an error."""
        # Check exception type patterns first
        for exc_type, category, severity, retryable in self._patterns:
            if isinstance(error, exc_type):
                return ClassifiedError(
                    original_error=error,
                    category=category,
                    severity=severity,
                    retryable=retryable,
                    context=context or {}
                )

        # Check message patterns
        error_message = str(error).lower()
        for pattern, category, severity, retryable in self._message_patterns:
            if re.search(pattern, error_message, re.IGNORECASE):
                # Extract retry-after if present
                retry_after = self._extract_retry_after(error_message)

                return ClassifiedError(
                    original_error=error,
                    category=category,
                    severity=severity,
                    retryable=retryable,
                    retry_after_seconds=retry_after,
                    context=context or {}
                )

        # Default classification
        return ClassifiedError(
            original_error=error,
            category=ErrorCategory.UNKNOWN,
            severity=ErrorSeverity.MEDIUM,
            retryable=True,  # Default to retryable for unknown errors
            context=context or {}
        )

    def _extract_retry_after(self, message: str) -> Optional[float]:
        """Extract retry-after duration from error message."""
        # Look for patterns like "retry after 30 seconds" (note \d, not d)
        match = re.search(r"retry.?after.?(\d+)", message, re.IGNORECASE)
        if match:
            return float(match.group(1))
        return None


class ErrorAggregator:
    """Aggregates and analyzes error patterns."""

    def __init__(self, window_seconds: int = 300):
        self.window_seconds = window_seconds
        self._errors: list[tuple[float, ClassifiedError]] = []

    def record(self, error: ClassifiedError) -> None:
        """Record an error occurrence."""
        now = time.time()
        self._errors.append((now, error))

        # Prune old errors
        cutoff = now - self.window_seconds
        self._errors = [(t, e) for t, e in self._errors if t > cutoff]

    def get_error_rate(self) -> float:
        """Get errors per second in the window."""
        if not self._errors:
            return 0.0
        return len(self._errors) / self.window_seconds

    def get_category_counts(self) -> dict[ErrorCategory, int]:
        """Get count of errors by category."""
        return Counter(e.category for _, e in self._errors)

    def is_pattern_detected(
        self,
        category: ErrorCategory,
        threshold: int = 5
    ) -> bool:
        """Detect if an error pattern is emerging."""
        counts = self.get_category_counts()
        return counts.get(category, 0) >= threshold

The error classifier uses both exception types and message patterns to categorize errors. This enables different handling strategies based on the nature of the error rather than treating all failures the same way.

Category       | Example            | Retryable | Action
Transient      | Network glitch     | Yes       | Immediate retry
Rate Limited   | API quota exceeded | Yes       | Backoff retry
Authentication | Token expired      | No        | Refresh credentials
Validation     | Invalid input      | No        | Return error to user
Server Error   | 500 response       | Yes       | Retry with backoff
Timeout        | Request timeout    | Yes       | Retry with longer timeout

Retry Strategies

Retry strategies determine when and how to retry failed operations. Effective retry logic uses exponential backoff, jitter, and respects retry-after headers to avoid overwhelming recovering services.

Advanced Retry Implementation

🐍python
from dataclasses import dataclass
from typing import Callable, Optional, Awaitable, TypeVar
import asyncio
import logging
import random
import time


T = TypeVar('T')


@dataclass
class RetryConfig:
    """Configuration for retry behavior."""
    max_attempts: int = 3
    initial_delay_seconds: float = 1.0
    max_delay_seconds: float = 60.0
    exponential_base: float = 2.0
    jitter_factor: float = 0.1
    retryable_exceptions: tuple = (Exception,)


class RetryStrategy:
    """Implements retry with exponential backoff and jitter."""

    def __init__(self, config: RetryConfig):
        self.config = config
        self.logger = logging.getLogger(__name__)

    def calculate_delay(
        self,
        attempt: int,
        retry_after: Optional[float] = None
    ) -> float:
        """Calculate delay before next retry."""
        if retry_after is not None:
            return retry_after

        # Exponential backoff
        delay = self.config.initial_delay_seconds * (
            self.config.exponential_base ** attempt
        )

        # Cap at max delay
        delay = min(delay, self.config.max_delay_seconds)

        # Add jitter
        jitter = delay * self.config.jitter_factor * random.random()
        delay += jitter

        return delay

    async def execute(
        self,
        operation: Callable[[], Awaitable[T]],
        classifier: Optional[ErrorClassifier] = None
    ) -> T:
        """Execute operation with retry logic."""
        classifier = classifier or ErrorClassifier()
        last_error: Optional[Exception] = None

        for attempt in range(self.config.max_attempts):
            try:
                return await operation()

            except self.config.retryable_exceptions as e:
                last_error = e
                classified = classifier.classify(e)

                if not classified.retryable:
                    self.logger.warning(
                        f"Non-retryable error: {classified.category}"
                    )
                    raise

                if attempt < self.config.max_attempts - 1:
                    delay = self.calculate_delay(
                        attempt,
                        classified.retry_after_seconds
                    )
                    self.logger.info(
                        f"Retry {attempt + 1}/{self.config.max_attempts} "
                        f"after {delay:.2f}s: {e}"
                    )
                    await asyncio.sleep(delay)

        raise last_error


class AdaptiveRetry:
    """Adapts retry behavior based on success/failure patterns."""

    def __init__(
        self,
        base_config: RetryConfig,
        success_threshold: int = 10,
        failure_threshold: int = 5
    ):
        self.base_config = base_config
        self.success_threshold = success_threshold
        self.failure_threshold = failure_threshold

        self._consecutive_successes = 0
        self._consecutive_failures = 0
        self._current_max_attempts = base_config.max_attempts

    async def execute(
        self,
        operation: Callable[[], Awaitable[T]]
    ) -> T:
        """Execute with adaptive retry."""
        strategy = RetryStrategy(RetryConfig(
            max_attempts=self._current_max_attempts,
            initial_delay_seconds=self.base_config.initial_delay_seconds,
            max_delay_seconds=self.base_config.max_delay_seconds,
            exponential_base=self.base_config.exponential_base,
            jitter_factor=self.base_config.jitter_factor,
            retryable_exceptions=self.base_config.retryable_exceptions
        ))

        try:
            result = await strategy.execute(operation)
            self._record_success()
            return result

        except Exception:
            self._record_failure()
            raise

    def _record_success(self) -> None:
        """Record successful execution."""
        self._consecutive_successes += 1
        self._consecutive_failures = 0

        # Increase retry budget after consistent success
        if self._consecutive_successes >= self.success_threshold:
            self._current_max_attempts = min(
                self._current_max_attempts + 1,
                self.base_config.max_attempts * 2
            )
            self._consecutive_successes = 0

    def _record_failure(self) -> None:
        """Record failed execution."""
        self._consecutive_failures += 1
        self._consecutive_successes = 0

        # Decrease retry budget after consistent failure
        if self._consecutive_failures >= self.failure_threshold:
            self._current_max_attempts = max(
                self._current_max_attempts - 1,
                1
            )
            self._consecutive_failures = 0


class RetryBudget:
    """Limits retry attempts across the system."""

    def __init__(
        self,
        max_retries_per_second: float = 10.0,
        budget_window_seconds: float = 10.0
    ):
        self.max_retries_per_second = max_retries_per_second
        self.budget_window_seconds = budget_window_seconds
        self._retries: list[float] = []
        self._lock = asyncio.Lock()

    async def acquire(self) -> bool:
        """Attempt to acquire retry budget."""
        async with self._lock:
            now = time.time()

            # Prune old retries
            cutoff = now - self.budget_window_seconds
            self._retries = [t for t in self._retries if t > cutoff]

            # Check if under budget
            max_retries = self.max_retries_per_second * self.budget_window_seconds
            if len(self._retries) >= max_retries:
                return False

            self._retries.append(now)
            return True

    @property
    def remaining_budget(self) -> float:
        """Get remaining retry budget as fraction."""
        max_retries = self.max_retries_per_second * self.budget_window_seconds
        return max(0, 1 - len(self._retries) / max_retries)


class BudgetedRetry:
    """Retry with system-wide budget awareness."""

    def __init__(
        self,
        config: RetryConfig,
        budget: RetryBudget
    ):
        self.config = config
        self.budget = budget
        self.strategy = RetryStrategy(config)

    async def execute(
        self,
        operation: Callable[[], Awaitable[T]]
    ) -> T:
        """Execute with budget-aware retry."""
        for attempt in range(self.config.max_attempts):
            try:
                return await operation()

            except Exception as e:
                if attempt >= self.config.max_attempts - 1:
                    raise

                # Check budget before retrying
                if not await self.budget.acquire():
                    raise RuntimeError(
                        "Retry budget exhausted"
                    ) from e

                delay = self.strategy.calculate_delay(attempt)
                await asyncio.sleep(delay)

The retry implementation includes exponential backoff with jitter to prevent thundering herd problems, adaptive retry that adjusts based on success patterns, and system-wide retry budgets to prevent retry storms.
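To make the schedule concrete, here is the delay formula from `calculate_delay` in isolation (a standalone sketch with the chapter's default parameters): base delays grow 1s, 2s, 4s, 8s, ... up to the 60s cap, with up to 10% jitter added on top.

```python
import random

def backoff_delay(attempt: int, initial: float = 1.0, base: float = 2.0,
                  max_delay: float = 60.0, jitter_factor: float = 0.1) -> float:
    """Exponential backoff with jitter, as in calculate_delay above."""
    delay = min(initial * (base ** attempt), max_delay)      # cap the growth
    return delay + delay * jitter_factor * random.random()   # add jitter

# Attempt 0 waits between 1.0s and 1.1s, attempt 3 between 8.0s and 8.8s;
# by attempt 10 the base delay is capped at 60s (plus up to 6s of jitter).
```

The jitter term is what prevents many clients, all failing at the same moment, from retrying in lockstep.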


Circuit Breakers

Circuit breakers prevent cascade failures by stopping requests to failing services. When a service fails repeatedly, the circuit opens and requests fail fast rather than waiting for timeouts.

Circuit Breaker Implementation

🐍python
from dataclasses import dataclass
from enum import Enum
from typing import Any, Callable, Awaitable, Optional, TypeVar
import asyncio
import time


class CircuitState(Enum):
    """States of the circuit breaker."""
    CLOSED = "closed"       # Normal operation
    OPEN = "open"           # Failing fast
    HALF_OPEN = "half_open" # Testing recovery


@dataclass
class CircuitBreakerConfig:
    """Configuration for circuit breaker."""
    failure_threshold: int = 5
    success_threshold: int = 3
    timeout_seconds: float = 30.0
    half_open_max_calls: int = 3


T = TypeVar('T')


class CircuitBreaker:
    """Circuit breaker for fault tolerance."""

    def __init__(
        self,
        name: str,
        config: CircuitBreakerConfig
    ):
        self.name = name
        self.config = config

        self._state = CircuitState.CLOSED
        self._failure_count = 0
        self._success_count = 0
        self._last_failure_time: Optional[float] = None
        self._half_open_calls = 0
        self._lock = asyncio.Lock()

    @property
    def state(self) -> CircuitState:
        return self._state

    async def execute(
        self,
        operation: Callable[[], Awaitable[T]],
        fallback: Optional[Callable[[], Awaitable[T]]] = None
    ) -> T:
        """Execute operation through circuit breaker."""
        async with self._lock:
            # Check if circuit should transition
            await self._check_state_transition()

            if self._state == CircuitState.OPEN:
                if fallback:
                    return await fallback()
                raise CircuitOpenError(
                    f"Circuit {self.name} is open"
                )

            if self._state == CircuitState.HALF_OPEN:
                if self._half_open_calls >= self.config.half_open_max_calls:
                    if fallback:
                        return await fallback()
                    raise CircuitOpenError(
                        f"Circuit {self.name} half-open call limit reached"
                    )
                self._half_open_calls += 1

        try:
            result = await operation()
            await self._record_success()
            return result

        except Exception:
            await self._record_failure()
            raise

    async def _check_state_transition(self) -> None:
        """Check if circuit should transition state."""
        if self._state == CircuitState.OPEN:
            # Check if timeout has passed
            if self._last_failure_time:
                elapsed = time.time() - self._last_failure_time
                if elapsed >= self.config.timeout_seconds:
                    self._state = CircuitState.HALF_OPEN
                    self._half_open_calls = 0
                    self._success_count = 0

    async def _record_success(self) -> None:
        """Record successful execution."""
        async with self._lock:
            if self._state == CircuitState.HALF_OPEN:
                self._success_count += 1
                if self._success_count >= self.config.success_threshold:
                    self._state = CircuitState.CLOSED
                    self._failure_count = 0
            else:
                self._failure_count = 0

    async def _record_failure(self) -> None:
        """Record failed execution."""
        async with self._lock:
            self._failure_count += 1
            self._last_failure_time = time.time()

            if self._state == CircuitState.HALF_OPEN:
                # Any failure in half-open opens the circuit
                self._state = CircuitState.OPEN
            elif self._failure_count >= self.config.failure_threshold:
                self._state = CircuitState.OPEN

    def get_stats(self) -> dict[str, Any]:
        """Get circuit breaker statistics."""
        return {
            "name": self.name,
            "state": self._state.value,
            "failure_count": self._failure_count,
            "success_count": self._success_count,
            "last_failure": self._last_failure_time
        }


class CircuitOpenError(Exception):
    """Raised when circuit is open."""
    pass


class CircuitBreakerRegistry:
    """Registry for managing multiple circuit breakers."""

    def __init__(self):
        self._breakers: dict[str, CircuitBreaker] = {}
        self._default_config = CircuitBreakerConfig()

    def get_or_create(
        self,
        name: str,
        config: Optional[CircuitBreakerConfig] = None
    ) -> CircuitBreaker:
        """Get existing or create new circuit breaker."""
        if name not in self._breakers:
            self._breakers[name] = CircuitBreaker(
                name=name,
                config=config or self._default_config
            )
        return self._breakers[name]

    def get_all_stats(self) -> dict[str, dict[str, Any]]:
        """Get stats for all circuit breakers."""
        return {
            name: breaker.get_stats()
            for name, breaker in self._breakers.items()
        }

    def reset_all(self) -> None:
        """Reset all circuit breakers (reaches into private state for simplicity)."""
        for breaker in self._breakers.values():
            breaker._state = CircuitState.CLOSED
            breaker._failure_count = 0
            breaker._success_count = 0


class HierarchicalCircuitBreaker:
    """Circuit breaker with parent-child relationships."""

    def __init__(
        self,
        name: str,
        config: CircuitBreakerConfig,
        parent: Optional["HierarchicalCircuitBreaker"] = None
    ):
        self.breaker = CircuitBreaker(name, config)
        self.parent = parent
        self.children: list["HierarchicalCircuitBreaker"] = []

        if parent:
            parent.children.append(self)

    async def execute(
        self,
        operation: Callable[[], Awaitable[T]],
        fallback: Optional[Callable[[], Awaitable[T]]] = None
    ) -> T:
        """Execute with hierarchical circuit breaking."""
        # Check parent circuit first
        if self.parent and self.parent.breaker.state == CircuitState.OPEN:
            if fallback:
                return await fallback()
            raise CircuitOpenError(
                f"Parent circuit {self.parent.breaker.name} is open"
            )

        return await self.breaker.execute(operation, fallback)

    def propagate_open(self) -> None:
        """Open all child circuits when parent opens."""
        for child in self.children:
            child.breaker._state = CircuitState.OPEN
            child.propagate_open()

The circuit breaker transitions between closed (normal), open (failing fast), and half-open (testing recovery) states. The hierarchical variant enables parent circuits to control child circuits.
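The state machine can be exercised in a few lines. This synchronous sketch mirrors the transitions above, with the timeout-driven open-to-half-open move triggered explicitly via `try_recover()` (the names here are illustrative, not part of the chapter's API):

```python
class MiniBreaker:
    """Synchronous sketch of the circuit-breaker state machine:
    CLOSED -> OPEN after failure_threshold failures; OPEN -> HALF_OPEN
    via try_recover() (the async version uses a timeout); HALF_OPEN ->
    CLOSED after success_threshold successes, HALF_OPEN -> OPEN on any
    failure."""

    def __init__(self, failure_threshold: int = 5, success_threshold: int = 3):
        self.state = "closed"
        self.failures = 0
        self.successes = 0
        self.failure_threshold = failure_threshold
        self.success_threshold = success_threshold

    def record_failure(self) -> None:
        self.failures += 1
        if self.state == "half_open" or self.failures >= self.failure_threshold:
            self.state = "open"

    def record_success(self) -> None:
        if self.state == "half_open":
            self.successes += 1
            if self.successes >= self.success_threshold:
                self.state = "closed"
                self.failures = 0
        else:
            self.failures = 0  # any success in closed resets the count

    def try_recover(self) -> None:
        """Stand-in for the timeout-based OPEN -> HALF_OPEN transition."""
        if self.state == "open":
            self.state = "half_open"
            self.successes = 0
```

Two failures with `failure_threshold=2` open the circuit; after `try_recover()`, two successes close it again, while a single failure in half-open would reopen it immediately.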


Fallback Patterns

When primary operations fail, fallback patterns provide alternative responses or degraded functionality. Effective fallbacks maintain user experience even when services are unavailable.

Fallback Strategies

🐍python
from abc import ABC, abstractmethod
from typing import Any, Callable, Awaitable, Optional, TypeVar, Generic
from dataclasses import dataclass
import asyncio


T = TypeVar('T')


class FallbackStrategy(ABC, Generic[T]):
    """Abstract fallback strategy."""

    @abstractmethod
    async def execute(self, error: Exception) -> T:
        """Execute fallback logic."""
        pass


class StaticFallback(FallbackStrategy[T]):
    """Returns a static fallback value."""

    def __init__(self, value: T):
        self.value = value

    async def execute(self, error: Exception) -> T:
        return self.value


class CachedFallback(FallbackStrategy[T]):
    """Returns cached value as fallback."""

    def __init__(self, cache_key: str, cache: dict[str, T]):
        self.cache_key = cache_key
        self.cache = cache

    async def execute(self, error: Exception) -> T:
        if self.cache_key in self.cache:
            return self.cache[self.cache_key]
        raise FallbackFailedError("No cached value available")


class DegradedFallback(FallbackStrategy[T]):
    """Provides degraded functionality."""

    def __init__(
        self,
        degraded_operation: Callable[[Exception], Awaitable[T]]
    ):
        self.degraded_operation = degraded_operation

    async def execute(self, error: Exception) -> T:
        return await self.degraded_operation(error)


class ChainedFallback(FallbackStrategy[T]):
    """Chains multiple fallback strategies."""

    def __init__(self, fallbacks: list[FallbackStrategy[T]]):
        self.fallbacks = fallbacks

    async def execute(self, error: Exception) -> T:
        last_error = error

        for fallback in self.fallbacks:
            try:
                return await fallback.execute(error)
            except Exception as e:
                last_error = e
                continue

        raise FallbackFailedError(
            f"All fallbacks failed: {last_error}"
        )


class FallbackFailedError(Exception):
    """Raised when all fallbacks fail."""
    pass


class ResilientOperation(Generic[T]):
    """Operation with integrated resilience patterns."""

    def __init__(
        self,
        operation: Callable[[], Awaitable[T]],
        circuit_breaker: Optional[CircuitBreaker] = None,
        retry_config: Optional[RetryConfig] = None,
        fallback: Optional[FallbackStrategy[T]] = None,
        timeout_seconds: Optional[float] = None
    ):
        self.operation = operation
        self.circuit_breaker = circuit_breaker
        self.retry_config = retry_config
        self.fallback = fallback
        self.timeout_seconds = timeout_seconds

    async def execute(self) -> T:
        """Execute with all resilience patterns applied."""
        try:
            result = await self._execute_with_patterns()
            return result

        except Exception as e:
            if self.fallback:
                return await self.fallback.execute(e)
            raise

    async def _execute_with_patterns(self) -> T:
        """Execute with retry, circuit breaker, and timeout."""
        async def wrapped_operation():
            # Apply timeout
            if self.timeout_seconds:
                return await asyncio.wait_for(
                    self.operation(),
                    timeout=self.timeout_seconds
                )
            return await self.operation()

        # Apply retry
        if self.retry_config:
            retry_strategy = RetryStrategy(self.retry_config)
            operation = lambda: retry_strategy.execute(wrapped_operation)
        else:
            operation = wrapped_operation

        # Apply circuit breaker
        if self.circuit_breaker:
            return await self.circuit_breaker.execute(operation)

        return await operation()


@dataclass
class GracefulDegradation:
    """Configuration for graceful degradation."""
    feature: str
    enabled: bool = True
    fallback_value: Any = None
    degraded_message: str = ""


class GracefulDegradationManager:
    """Manages graceful degradation of features."""

    def __init__(self):
        self._features: dict[str, GracefulDegradation] = {}
        self._disabled_features: set[str] = set()

    def register_feature(
        self,
        config: GracefulDegradation
    ) -> None:
        """Register a feature for degradation management."""
        self._features[config.feature] = config

    def disable_feature(self, feature: str) -> None:
        """Disable a feature."""
        self._disabled_features.add(feature)

    def enable_feature(self, feature: str) -> None:
        """Enable a feature."""
        self._disabled_features.discard(feature)

    def is_enabled(self, feature: str) -> bool:
        """Check if feature is enabled."""
        if feature in self._disabled_features:
            return False
        config = self._features.get(feature)
        return config.enabled if config else True

    def get_fallback(self, feature: str) -> Any:
        """Get fallback value for disabled feature."""
        config = self._features.get(feature)
        return config.fallback_value if config else None

    async def execute_if_enabled(
        self,
        feature: str,
        operation: Callable[[], Awaitable[T]],
        fallback: Optional[T] = None
    ) -> Optional[T]:
        """Execute operation only if feature is enabled."""
        if not self.is_enabled(feature):
            # `is not None` rather than `or`, so falsy fallbacks like 0 or "" are honored
            return fallback if fallback is not None else self.get_fallback(feature)

        try:
            return await operation()
        except Exception:
            # Disable feature on error
            self.disable_feature(feature)
            return fallback if fallback is not None else self.get_fallback(feature)

Fallback strategies range from simple static values to complex chained fallbacks. The resilient operation wrapper combines timeout, retry, circuit breaker, and fallback patterns into a cohesive unit.
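The core of the chaining idea fits in a few lines. This standalone sketch (plain functions rather than the strategy classes above) tries each source in order and returns the first success:

```python
import asyncio

async def fetch_live() -> str:
    """Primary source; fails here to demonstrate the fallback path."""
    raise ConnectionError("primary unavailable")

async def fetch_cached() -> str:
    """Secondary source, e.g. a stale cache."""
    return "cached answer"

async def with_fallbacks(*sources) -> str:
    """Try each source in order; raise only if every one fails."""
    last_error: Exception | None = None
    for source in sources:
        try:
            return await source()
        except Exception as e:
            last_error = e
    raise RuntimeError(f"all fallbacks failed: {last_error}")

result = asyncio.run(with_fallbacks(fetch_live, fetch_cached))
# result is "cached answer"
```

The ordering matters: put the freshest source first and the cheapest, most reliable degraded option last.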


Recovery Procedures

Recovery procedures define how the system returns to normal operation after failures. This includes health checks, gradual traffic restoration, and verification of recovered services.
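The "gradual traffic restoration" step can be as simple as a linear ramp. This helper is illustrative, not part of the RecoveryManager API; it produces the per-step percentages implied by a `traffic_ramp_steps` setting:

```python
def traffic_ramp(steps: int = 5) -> list[float]:
    """Traffic percentages for gradually restoring a recovered service."""
    return [100.0 * (i + 1) / steps for i in range(steps)]

# traffic_ramp(5) -> [20.0, 40.0, 60.0, 80.0, 100.0]
```

At each step the service must stay healthy for the configured interval before the next share of traffic is routed to it; any failure drops it back to the recovering state.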

Recovery Manager

🐍python
1from dataclasses import dataclass, field
2from typing import Callable, Awaitable, Optional
3from enum import Enum
4import asyncio
5import time
6import logging
7
8
9class RecoveryState(Enum):
10    """States during recovery."""
11    HEALTHY = "healthy"
12    DEGRADED = "degraded"
13    RECOVERING = "recovering"
14    FAILED = "failed"
15
16
17@dataclass
18class RecoveryConfig:
19    """Configuration for recovery procedures."""
20    health_check_interval: float = 10.0
21    recovery_check_interval: float = 5.0
22    min_healthy_checks: int = 3
23    traffic_ramp_steps: int = 5
24    traffic_ramp_interval: float = 30.0
25
26
27@dataclass
28class ServiceHealth:
29    """Health status of a service."""
30    name: str
31    state: RecoveryState = RecoveryState.HEALTHY
32    consecutive_healthy: int = 0
33    consecutive_unhealthy: int = 0
34    traffic_percentage: float = 100.0
35    last_check: Optional[float] = None
36    last_error: Optional[str] = None
37
38
39class RecoveryManager:
40    """Manages service recovery procedures."""
41
42    def __init__(self, config: RecoveryConfig):
43        self.config = config
44        self._services: dict[str, ServiceHealth] = {}
45        self._health_checks: dict[str, Callable[[], Awaitable[bool]]] = {}
46        self._running = False
47        self.logger = logging.getLogger(__name__)
48
49    def register_service(
50        self,
51        name: str,
52        health_check: Callable[[], Awaitable[bool]]
53    ) -> None:
54        """Register a service for recovery management."""
55        self._services[name] = ServiceHealth(name=name)
56        self._health_checks[name] = health_check
57
58    async def start(self) -> None:
59        """Start recovery manager."""
60        self._running = True
61        self._task = asyncio.create_task(self._health_check_loop())  # keep a reference so the task is not garbage collected
62
63    async def stop(self) -> None:
64        """Stop recovery manager."""
65        self._running = False
66
67    async def _health_check_loop(self) -> None:
68        """Main health check loop."""
69        while self._running:
70            for name in self._services:
71                await self._check_service(name)
72
73            await asyncio.sleep(self.config.health_check_interval)
74
75    async def _check_service(self, name: str) -> None:
76        """Check health of a single service."""
77        service = self._services[name]
78        health_check = self._health_checks[name]
79
80        try:
81            healthy = await health_check()
82            service.last_check = time.time()
83
84            if healthy:
85                service.consecutive_healthy += 1
86                service.consecutive_unhealthy = 0
87                service.last_error = None
88                await self._handle_healthy(service)
89            else:
90                await self._handle_unhealthy(service, "Health check returned False")
91
92        except Exception as e:
93            await self._handle_unhealthy(service, str(e))
94
95    async def _handle_healthy(self, service: ServiceHealth) -> None:
96        """Handle healthy service."""
97        if service.state == RecoveryState.RECOVERING:
98            if service.consecutive_healthy >= self.config.min_healthy_checks:
99                await self._ramp_traffic(service)
100
101        elif service.state in (RecoveryState.DEGRADED, RecoveryState.FAILED):
102            service.state = RecoveryState.RECOVERING
103            service.traffic_percentage = 0
104            self.logger.info(f"Service {service.name} starting recovery")
105
106    async def _handle_unhealthy(
107        self,
108        service: ServiceHealth,
109        error: str
110    ) -> None:
111        """Handle unhealthy service."""
112        service.consecutive_unhealthy += 1
113        service.consecutive_healthy = 0
114        service.last_error = error
115
116        if service.state == RecoveryState.HEALTHY:
117            if service.consecutive_unhealthy >= 3:
118                service.state = RecoveryState.DEGRADED
119                self.logger.warning(
120                    f"Service {service.name} degraded: {error}"
121                )
122
123        elif service.state == RecoveryState.RECOVERING:
124            service.state = RecoveryState.DEGRADED
125            service.traffic_percentage = 0
126            self.logger.warning(
127                f"Service {service.name} recovery failed: {error}"
128            )
129
130        elif service.state == RecoveryState.DEGRADED:
131            if service.consecutive_unhealthy >= 10:
132                service.state = RecoveryState.FAILED
133                service.traffic_percentage = 0
134                self.logger.error(
135                    f"Service {service.name} failed: {error}"
136                )
137
138    async def _ramp_traffic(self, service: ServiceHealth) -> None:
139        """Gradually ramp up traffic to recovering service."""
140        step_size = 100.0 / self.config.traffic_ramp_steps
141        service.traffic_percentage = min(
142            100.0,
143            service.traffic_percentage + step_size
144        )
145
146        self.logger.info(
147            f"Service {service.name} traffic: {service.traffic_percentage}%"
148        )
149
150        if service.traffic_percentage >= 100:
151            service.state = RecoveryState.HEALTHY
152            self.logger.info(f"Service {service.name} fully recovered")
153
154    def should_route_to(self, name: str) -> bool:
155        """Determine if traffic should route to service."""
156        service = self._services.get(name)
157        if not service:
158            return True
159
160        if service.state == RecoveryState.FAILED:
161            return False
162
163        # Probabilistic routing based on traffic percentage
164        import random
165        return random.random() * 100 < service.traffic_percentage
166
167    def get_status(self) -> dict[str, dict[str, Any]]:
168        """Get status of all services."""
169        return {
170            name: {
171                "state": service.state.value,
172                "traffic_percentage": service.traffic_percentage,
173                "consecutive_healthy": service.consecutive_healthy,
174                "consecutive_unhealthy": service.consecutive_unhealthy,
175                "last_check": service.last_check,
176                "last_error": service.last_error
177            }
178            for name, service in self._services.items()
179        }
180
181
182class HealthCheckSuite:
183    """Suite of health checks for comprehensive verification."""
184
185    def __init__(self):
186        self._checks: list[tuple[str, Callable[[], Awaitable[bool]]]] = []
187
188    def add_check(
189        self,
190        name: str,
191        check: Callable[[], Awaitable[bool]]
192    ) -> None:
193        """Add a health check."""
194        self._checks.append((name, check))
195
196    async def run_all(self) -> dict[str, bool]:
197        """Run all health checks."""
198        results = {}
199
200        for name, check in self._checks:
201            try:
202                results[name] = await check()
203            except Exception:
204                results[name] = False
205
206        return results
207
208    async def is_healthy(self) -> bool:
209        """Check if all checks pass."""
210        results = await self.run_all()
211        return all(results.values())

The recovery manager implements gradual traffic restoration with health verification. Services transition through degraded and recovering states before returning to full health, preventing premature load on recovering systems.
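The ramp and routing arithmetic can be verified in isolation. This standalone sketch mirrors the logic of `_ramp_traffic` and `should_route_to` above (the function names here are illustrative):

```python
import random


def ramp_schedule(steps: int) -> list[float]:
    """Traffic percentages after each qualifying healthy check."""
    step_size = 100.0 / steps
    pct, schedule = 0.0, []
    while pct < 100.0:
        pct = min(100.0, pct + step_size)
        schedule.append(pct)
    return schedule


def should_route(traffic_percentage: float) -> bool:
    """Route roughly traffic_percentage percent of requests."""
    return random.random() * 100 < traffic_percentage


print(ramp_schedule(5))  # [20.0, 40.0, 60.0, 80.0, 100.0]

random.seed(0)
hits = sum(should_route(40.0) for _ in range(10_000))
print(hits / 10_000)  # roughly 0.40
```

With the defaults above (min_healthy_checks=3, traffic_ramp_steps=5), the third consecutive healthy check triggers the first ramp step and full traffic returns on the seventh, since each subsequent healthy check adds one step.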


Summary

Error recovery and resilience are critical for production agent systems. Proper error classification, intelligent retries, circuit breakers, fallbacks, and recovery procedures work together to maintain service quality despite failures.

Key Takeaways

  • Error Classification - Categorize errors to enable appropriate handling strategies
  • Intelligent Retry - Use exponential backoff with jitter and respect system-wide retry budgets
  • Circuit Breakers - Prevent cascade failures by failing fast when services are down
  • Fallback Patterns - Provide degraded functionality rather than complete failure
  • Gradual Recovery - Ramp traffic slowly to recovering services with health verification

Next Steps: The next section covers API design for agent services, including best practices for exposing agent capabilities through well-designed REST and streaming APIs.