Chapter 19
15 min read
Section 121 of 175

Error Analysis and Recovery

Observability and Debugging

Introduction

Errors are inevitable in agent systems due to their complexity and reliance on external services. Effective error handling means not just catching errors, but understanding them, recovering gracefully, and learning from them to prevent future occurrences.

Section Overview: We'll explore error classification, recovery strategies, resilience patterns, and error reporting for building robust agent systems.

Error Classification

Types of Agent Errors

| Error Type | Source | Recoverable |
| --- | --- | --- |
| LLM API error | Rate limits, timeouts | Usually yes |
| Tool failure | External service down | Often yes |
| Parsing error | Invalid LLM output | Yes, with retry |
| Logic error | Wrong agent decision | Sometimes |
| Context overflow | Too much history | Yes, with truncation |
| Safety violation | Blocked action | Depends on design |
🐍python
1"""
2Error Classification System
3
4Classify errors for appropriate handling.
5"""
6
7from dataclasses import dataclass
8from enum import Enum
9from typing import Any
10
11
class ErrorCategory(Enum):
    """Where an agent error originated; used to pick a recovery action."""

    # Model-provider side
    LLM_API = "llm_api"
    CONTEXT = "context"
    PARSING = "parsing"

    # Tooling and transport
    TOOL_EXECUTION = "tool_execution"
    NETWORK = "network"

    # Agent behaviour and policy
    LOGIC = "logic"
    SAFETY = "safety"
    VALIDATION = "validation"

    # Fallback when nothing else applies
    UNKNOWN = "unknown"
22
23
class ErrorSeverity(Enum):
    """How serious an error is, from least to most disruptive."""

    # Can continue with degraded functionality
    LOW = "low"
    # Requires retry or fallback
    MEDIUM = "medium"
    # Requires intervention
    HIGH = "high"
    # Agent must stop
    CRITICAL = "critical"
29
30
@dataclass
class ClassifiedError:
    """An error together with the classification assigned to it."""

    category: ErrorCategory  # which kind of failure this is
    severity: ErrorSeverity  # how disruptive the failure is
    message: str  # human-readable description of the error
    original_exception: Exception | None  # underlying exception, if one exists
    recoverable: bool  # whether automatic recovery may be attempted
    suggested_action: str  # recommended next step for handling the error
    context: dict  # caller-supplied details about where the error occurred
41
42
class ErrorClassifier:
    """Classify exceptions into a category and severity for handling.

    ``classification_rules`` is an ordered list of
    ``(predicate, category, severity)`` tuples. ``classify`` walks the list
    and the first predicate that returns true decides the result; an error
    that matches no rule is reported as ``ErrorCategory.UNKNOWN``.
    """

    def __init__(self):
        # Local import: only needed to spell out the annotation below.
        from typing import Callable

        # Message-specific rules come first. The generic exception-type rule
        # is deliberately last so that it cannot shadow them: for example a
        # ValueError whose message says the action was "blocked" must be
        # classified as SAFETY, not as a retryable PARSING error.
        self.classification_rules: list[
            tuple[Callable[[Exception], bool], ErrorCategory, ErrorSeverity]
        ] = [
            # LLM API errors
            (lambda e: "rate limit" in str(e).lower(), ErrorCategory.LLM_API, ErrorSeverity.MEDIUM),
            (lambda e: "timeout" in str(e).lower(), ErrorCategory.LLM_API, ErrorSeverity.MEDIUM),
            (lambda e: "context length" in str(e).lower(), ErrorCategory.CONTEXT, ErrorSeverity.HIGH),

            # Tool errors
            (lambda e: "connection" in str(e).lower(), ErrorCategory.NETWORK, ErrorSeverity.MEDIUM),
            (lambda e: "permission denied" in str(e).lower(), ErrorCategory.TOOL_EXECUTION, ErrorSeverity.HIGH),

            # Parsing errors identified by message
            (lambda e: "json" in str(e).lower(), ErrorCategory.PARSING, ErrorSeverity.LOW),

            # Safety errors
            (lambda e: "blocked" in str(e).lower(), ErrorCategory.SAFETY, ErrorSeverity.HIGH),

            # Parsing errors identified by exception type (generic catch-all)
            (lambda e: isinstance(e, (ValueError, KeyError)), ErrorCategory.PARSING, ErrorSeverity.LOW),
        ]

    def classify(self, error: Exception, context: dict | None = None) -> ClassifiedError:
        """Classify ``error`` using the first matching rule.

        Args:
            error: The exception to classify.
            context: Optional details about where the error occurred. A
                missing or empty value is stored as a new empty dict.

        Returns:
            A ``ClassifiedError`` describing the error. When no rule matches,
            the result is UNKNOWN / MEDIUM, marked recoverable, with the
            action "Retry with backoff".
        """
        for rule, category, severity in self.classification_rules:
            if rule(error):
                return ClassifiedError(
                    category=category,
                    severity=severity,
                    message=str(error),
                    original_exception=error,
                    recoverable=self._is_recoverable(category, severity),
                    suggested_action=self._suggest_action(category),
                    context=context or {},
                )

        # Default classification
        return ClassifiedError(
            category=ErrorCategory.UNKNOWN,
            severity=ErrorSeverity.MEDIUM,
            message=str(error),
            original_exception=error,
            recoverable=True,
            suggested_action="Retry with backoff",
            context=context or {},
        )

    def _is_recoverable(self, category: ErrorCategory, severity: ErrorSeverity) -> bool:
        """Return False for CRITICAL severity or SAFETY category, else True."""
        if severity == ErrorSeverity.CRITICAL:
            return False
        if category == ErrorCategory.SAFETY:
            return False
        return True

    def _suggest_action(self, category: ErrorCategory) -> str:
        """Return the recovery hint for ``category``.

        Categories without a specific entry get "Investigate and retry".
        """
        suggestions = {
            ErrorCategory.LLM_API: "Retry with exponential backoff",
            ErrorCategory.TOOL_EXECUTION: "Try alternative tool or fallback",
            ErrorCategory.PARSING: "Retry with clearer instructions",
            ErrorCategory.LOGIC: "Review agent reasoning and constraints",
            ErrorCategory.CONTEXT: "Summarize or truncate context",
            ErrorCategory.SAFETY: "Escalate to human review",
            ErrorCategory.NETWORK: "Retry with backoff, check connectivity",
            ErrorCategory.VALIDATION: "Check input format and constraints",
        }
        return suggestions.get(category, "Investigate and retry")
114
115
# Usage: classify whatever the guarded operation raises and print the verdict.
classifier = ErrorClassifier()

try:
    ...  # Some operation
except Exception as exc:
    classified = classifier.classify(exc, context={"operation": "tool_call"})
    print(f"Category: {classified.category}")
    print(f"Severity: {classified.severity}")
    print(f"Suggested: {classified.suggested_action}")

Recovery Strategies

Automatic Error Recovery

🐍python
1"""
2Error Recovery Strategies
3
4Implement automatic recovery for common errors.
5"""
6
7import asyncio
8import time
9from dataclasses import dataclass
10from typing import Any, Callable, TypeVar
11
12T = TypeVar("T")
13
14
class RetryStrategy:
    """Exponential-backoff delay calculator with optional jitter.

    Args:
        max_retries: Number of attempts a caller should make.
        initial_delay: Delay in seconds for attempt 0.
        max_delay: Upper bound in seconds for any returned delay.
        exponential_base: Growth factor applied per attempt.
        jitter: When true, scale each delay by a random factor in
            [0.5, 1.5) before capping.
    """

    def __init__(
        self,
        max_retries: int = 3,
        initial_delay: float = 1.0,
        max_delay: float = 60.0,
        exponential_base: float = 2.0,
        jitter: bool = True
    ):
        self.max_retries = max_retries
        self.initial_delay = initial_delay
        self.max_delay = max_delay
        self.exponential_base = exponential_base
        self.jitter = jitter

    def get_delay(self, attempt: int) -> float:
        """Return the delay in seconds to wait after ``attempt`` (0-based).

        The result never exceeds ``max_delay``, with or without jitter.
        """
        try:
            delay = self.initial_delay * (self.exponential_base ** attempt)
        except OverflowError:
            # Very large attempt numbers overflow the exponent; the delay
            # would be capped anyway, so saturate instead of raising.
            delay = self.max_delay
        delay = min(delay, self.max_delay)

        if self.jitter:
            import random
            # Cap again after jitter so the factor of up to 1.5 cannot push
            # the delay past max_delay.
            delay = min(delay * (0.5 + random.random()), self.max_delay)

        return delay
42
43
class ErrorRecoveryManager:
    """Dispatch classified errors to per-category recovery strategies.

    ``strategies`` maps an ``ErrorCategory`` to an async callable taking
    ``(error, operation, *args, **kwargs)``. ``fallbacks`` maps a tool name
    to an async replacement used for tool-execution errors.

    Args:
        retry_strategy: Backoff configuration used when retrying LLM API
            errors. Defaults to ``RetryStrategy()``.
    """

    def __init__(self, retry_strategy: RetryStrategy | None = None):
        self.strategies: dict[ErrorCategory, Callable] = {}
        self.fallbacks: dict[str, Callable] = {}
        self.retry_strategy = (
            retry_strategy if retry_strategy is not None else RetryStrategy()
        )
        self._setup_default_strategies()

    def _setup_default_strategies(self):
        """Register the built-in strategy for each supported category."""
        self.strategies[ErrorCategory.LLM_API] = self._retry_with_backoff
        self.strategies[ErrorCategory.PARSING] = self._retry_with_clearer_prompt
        self.strategies[ErrorCategory.CONTEXT] = self._truncate_and_retry
        self.strategies[ErrorCategory.TOOL_EXECUTION] = self._try_fallback_tool

    @staticmethod
    def _as_exception(error: ClassifiedError) -> Exception:
        """Return an exception that can be raised for ``error``.

        ``original_exception`` may be None; raising None would itself fail
        with a TypeError, so fall back to a RuntimeError with the message.
        """
        if error.original_exception is not None:
            return error.original_exception
        return RuntimeError(error.message)

    async def recover(
        self,
        error: ClassifiedError,
        operation: Callable,
        *args,
        **kwargs
    ) -> Any:
        """Attempt to recover from ``error`` by re-running ``operation``.

        Args:
            error: The classified error to recover from.
            operation: Async callable to re-invoke.
            *args: Positional arguments forwarded to ``operation``.
            **kwargs: Keyword arguments forwarded to ``operation``.

        Returns:
            Whatever the selected strategy returns.

        Raises:
            Exception: The original exception (or a RuntimeError when none
                was recorded) if the error is not recoverable, plus anything
                the strategy or ``operation`` raises.
        """
        if not error.recoverable:
            raise self._as_exception(error)

        strategy = self.strategies.get(error.category)
        if strategy:
            return await strategy(error, operation, *args, **kwargs)

        # Default: simple retry
        return await self._simple_retry(error, operation, *args, **kwargs)

    async def _retry_with_backoff(
        self,
        error: ClassifiedError,
        operation: Callable,
        *args,
        **kwargs
    ) -> Any:
        """Retry ``operation`` with exponential backoff between attempts.

        The exception from the final attempt is re-raised.
        """
        # Always make at least one attempt, even if max_retries is <= 0.
        attempts = max(1, self.retry_strategy.max_retries)

        for attempt in range(attempts):
            try:
                return await operation(*args, **kwargs)
            except Exception:
                if attempt == attempts - 1:
                    raise

                await asyncio.sleep(self.retry_strategy.get_delay(attempt))

    async def _retry_with_clearer_prompt(
        self,
        error: ClassifiedError,
        operation: Callable,
        *args,
        **kwargs
    ) -> Any:
        """Retry once, appending a formatting reminder to ``messages``.

        The caller's ``messages`` list is not mutated; a new list is passed.
        Without a ``messages`` keyword argument this is a plain retry.
        """
        if "messages" in kwargs:
            kwargs["messages"] = kwargs["messages"] + [{
                "role": "system",
                "content": "Please respond with valid, parseable output. "
                          "Use proper formatting as requested."
            }]

        return await operation(*args, **kwargs)

    async def _truncate_and_retry(
        self,
        error: ClassifiedError,
        operation: Callable,
        *args,
        **kwargs
    ) -> Any:
        """Retry once with a shortened ``messages`` history.

        With more than five messages, only the first and the last four are
        kept. Shorter histories are retried unchanged.
        """
        if "messages" in kwargs:
            messages = kwargs["messages"]
            # Keep the first message (assumed to be the system message —
            # confirm against callers) and the last few messages.
            if len(messages) > 5:
                kwargs["messages"] = [messages[0]] + messages[-4:]

        return await operation(*args, **kwargs)

    async def _try_fallback_tool(
        self,
        error: ClassifiedError,
        operation: Callable,
        *args,
        **kwargs
    ) -> Any:
        """Run the fallback registered for ``error.context["tool_name"]``.

        Raises the original exception (or a RuntimeError when none was
        recorded) if no fallback is registered for that tool.
        """
        tool_name = error.context.get("tool_name")
        fallback = self.fallbacks.get(tool_name)

        if fallback:
            return await fallback(*args, **kwargs)

        raise self._as_exception(error)

    async def _simple_retry(
        self,
        error: ClassifiedError,
        operation: Callable,
        *args,
        **kwargs
    ) -> Any:
        """Re-run ``operation`` once with unchanged arguments."""
        return await operation(*args, **kwargs)

    def register_fallback(self, tool_name: str, fallback: Callable):
        """Register ``fallback`` as the replacement for ``tool_name``."""
        self.fallbacks[tool_name] = fallback

Resilience Patterns

Building Resilient Agents

🐍python
1"""
2Resilience Patterns
3
4Patterns for building fault-tolerant agents.
5"""
6
7from dataclasses import dataclass
8from enum import Enum
9import time
10
11
class CircuitState(Enum):
    """The three states a circuit breaker can be in."""

    # Normal operation
    CLOSED = "closed"
    # Failing, reject requests
    OPEN = "open"
    # Testing recovery
    HALF_OPEN = "half_open"
16
17
class CircuitBreaker:
    """Circuit breaker pattern for external calls.

    Starts CLOSED. After ``failure_threshold`` failures it becomes OPEN and
    rejects calls. Once ``recovery_timeout`` seconds have passed since the
    last failure it becomes HALF_OPEN; ``success_threshold`` successes then
    close it again, while any failure re-opens it.

    Args:
        failure_threshold: Failure count at which the circuit opens.
        recovery_timeout: Seconds to stay OPEN before allowing a trial call.
        success_threshold: Successes needed in HALF_OPEN to close again.
    """

    def __init__(
        self,
        failure_threshold: int = 5,
        recovery_timeout: float = 30.0,
        success_threshold: int = 2
    ):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.success_threshold = success_threshold

        self.state = CircuitState.CLOSED
        self.failure_count = 0
        self.success_count = 0
        # time.monotonic() reading of the most recent failure; not a
        # wall-clock timestamp.
        self.last_failure_time: float | None = None

    def can_execute(self) -> bool:
        """Return whether a call is currently allowed.

        May move the breaker from OPEN to HALF_OPEN as a side effect when
        the recovery timeout has elapsed.
        """
        if self.state == CircuitState.CLOSED:
            return True

        if self.state == CircuitState.OPEN:
            # Compare against None explicitly: a monotonic reading of 0.0 is
            # a valid time and must not be treated as "no failure recorded".
            if self.last_failure_time is not None:
                # Monotonic clock: elapsed time is unaffected by system
                # clock adjustments.
                elapsed = time.monotonic() - self.last_failure_time
                if elapsed >= self.recovery_timeout:
                    self.state = CircuitState.HALF_OPEN
                    return True
            return False

        # HALF_OPEN: allow execution
        return True

    def record_success(self):
        """Record a successful call.

        In HALF_OPEN this counts towards closing the circuit; otherwise it
        resets the failure count.
        """
        if self.state == CircuitState.HALF_OPEN:
            self.success_count += 1
            if self.success_count >= self.success_threshold:
                self.state = CircuitState.CLOSED
                self.failure_count = 0
                self.success_count = 0
        else:
            self.failure_count = 0

    def record_failure(self):
        """Record a failed call and open the circuit when warranted.

        A failure in HALF_OPEN re-opens immediately; otherwise the circuit
        opens once ``failure_count`` reaches ``failure_threshold``.
        """
        self.failure_count += 1
        self.last_failure_time = time.monotonic()

        if self.state == CircuitState.HALF_OPEN:
            self.state = CircuitState.OPEN
            self.success_count = 0
        elif self.failure_count >= self.failure_threshold:
            self.state = CircuitState.OPEN
74
75
class Bulkhead:
    """Cap the number of in-flight operations to contain failures.

    Callers pair each successful ``acquire`` with one ``release``.
    """

    def __init__(self, max_concurrent: int = 10):
        self.current_count = 0
        self.max_concurrent = max_concurrent
        # Placeholder only; a real implementation would use asyncio.Lock.
        self._lock = None

    async def acquire(self) -> bool:
        """Claim a slot; return True on success, False when at capacity."""
        has_room = not (self.current_count >= self.max_concurrent)
        if has_room:
            self.current_count += 1
        return has_room

    async def release(self):
        """Give back a slot; does nothing when none are held."""
        if not self.current_count > 0:
            return
        self.current_count -= 1
95
96
class GracefulDegradation:
    """Run a capability through an ordered list of implementations.

    Implementations are tried in registration order; a later one is used
    only when every earlier one raised.
    """

    def __init__(self):
        self.capability_levels: dict[str, list[Callable]] = {}

    def register_capability(
        self,
        name: str,
        implementations: list[Callable]
    ):
        """Store ``implementations`` (preferred first) under ``name``."""
        self.capability_levels[name] = implementations

    async def execute_with_degradation(
        self,
        capability: str,
        *args,
        **kwargs
    ) -> tuple[Any, int]:
        """Call the first implementation of ``capability`` that succeeds.

        Returns:
            ``(result, level)`` where ``level`` is the index of the
            implementation that produced ``result``.

        Raises:
            ValueError: If no implementations are registered for the name.
            Exception: Whatever the last implementation raised, when all
                of them fail.
        """
        levels = self.capability_levels.get(capability, [])

        for depth, implementation in enumerate(levels):
            try:
                outcome = await implementation(*args, **kwargs)
            except Exception:
                # Only the final implementation's failure is propagated.
                if depth + 1 == len(levels):
                    raise
            else:
                return outcome, depth

        raise ValueError(f"No implementations for capability: {capability}")
130
131
class ResilientAgent:
    """Agent helper that guards calls with a circuit breaker and a bulkhead.

    Breakers and bulkheads are created lazily, one per name, on first use.
    """

    def __init__(self):
        self.circuit_breakers: dict[str, CircuitBreaker] = {}
        self.bulkheads: dict[str, Bulkhead] = {}
        self.degradation = GracefulDegradation()

    def get_circuit_breaker(self, service: str) -> CircuitBreaker:
        """Return the breaker for ``service``, creating it if absent."""
        try:
            return self.circuit_breakers[service]
        except KeyError:
            breaker = CircuitBreaker()
            self.circuit_breakers[service] = breaker
            return breaker

    def get_bulkhead(self, category: str) -> Bulkhead:
        """Return the bulkhead for ``category``, creating it if absent."""
        try:
            return self.bulkheads[category]
        except KeyError:
            compartment = Bulkhead()
            self.bulkheads[category] = compartment
            return compartment

    async def call_with_resilience(
        self,
        service: str,
        operation: Callable,
        *args,
        **kwargs
    ) -> Any:
        """Run ``operation`` behind the breaker and bulkhead of ``service``.

        Returns:
            The awaited result of ``operation(*args, **kwargs)``.

        Raises:
            Exception: If the breaker rejects the call or the bulkhead has
                no free slot, plus anything ``operation`` raises (which is
                also recorded on the breaker as a failure).
        """
        breaker = self.get_circuit_breaker(service)
        compartment = self.get_bulkhead(service)

        # Reject immediately while the circuit is open.
        if not breaker.can_execute():
            raise Exception(f"Circuit breaker open for {service}")

        # Reject when the concurrency limit has been reached.
        admitted = await compartment.acquire()
        if not admitted:
            raise Exception(f"Bulkhead full for {service}")

        try:
            outcome = await operation(*args, **kwargs)
            breaker.record_success()
            return outcome
        except Exception:
            breaker.record_failure()
            raise
        finally:
            # The slot is returned whether the call succeeded or failed.
            await compartment.release()

Error Reporting

Comprehensive Error Reports

🐍python
1"""
2Error Reporting System
3
4Generate detailed error reports for debugging.
5"""
6
7from dataclasses import dataclass, field
8from datetime import datetime
9from typing import Any
10import traceback
11
12
@dataclass
class ErrorReport:
    """Snapshot of a single error and the circumstances around it."""

    error_id: str  # unique identifier for this report
    timestamp: datetime  # when the report was created
    error: ClassifiedError  # the classified error being reported
    stack_trace: str  # formatted traceback text
    agent_state: dict  # agent state captured for the report
    recent_actions: list[dict]  # most recent agent actions
    environment: dict  # runtime environment details
    suggested_fixes: list[str]  # recommended remediation steps
24
25
class ErrorReporter:
    """Generate, store, format, and summarize error reports.

    Every report created through ``create_report`` is appended to
    ``self.reports`` so that ``get_error_trends`` can aggregate them.
    """

    def __init__(self):
        self.reports: list[ErrorReport] = []

    def create_report(
        self,
        error: ClassifiedError,
        exception: Exception,
        agent_state: dict,
        recent_actions: list[dict]
    ) -> ErrorReport:
        """Create a report for ``error`` and record it.

        Args:
            error: The classified error being reported.
            exception: The exception whose traceback is included. When
                None, the exception currently being handled is used.
            agent_state: Agent state; sanitized before being stored.
            recent_actions: Action history; only the last five are kept.

        Returns:
            The newly created report.
        """
        import os
        import sys
        import uuid
        from datetime import timezone

        report = ErrorReport(
            error_id=str(uuid.uuid4()),
            # Timezone-aware UTC; datetime.utcnow() is deprecated and naive.
            timestamp=datetime.now(timezone.utc),
            error=error,
            stack_trace=self._format_stack_trace(exception),
            agent_state=self._sanitize_state(agent_state),
            recent_actions=recent_actions[-5:],
            environment={
                "python_version": sys.version,
                "hostname": os.environ.get("HOSTNAME", "unknown"),
                "env": os.environ.get("ENV", "unknown"),
            },
            suggested_fixes=self._generate_suggestions(error)
        )

        self.reports.append(report)
        return report

    @staticmethod
    def _format_stack_trace(exception: Exception | None) -> str:
        """Format the traceback of ``exception``.

        Formatting the passed exception directly works even when this is
        called outside an ``except`` block, where ``traceback.format_exc()``
        would have no active exception to report.
        """
        if exception is None:
            return traceback.format_exc()
        return "".join(
            traceback.format_exception(
                type(exception), exception, exception.__traceback__
            )
        )

    def _sanitize_state(self, state: dict) -> dict:
        """Return a copy of ``state`` that is safe to store in a report.

        Values whose key contains "password", "token", "key", or "secret"
        (case-insensitive) are replaced with "[REDACTED]". Nested dicts are
        sanitized recursively. All other values are converted to strings
        and truncated to 500 characters.
        """
        sanitized = {}
        sensitive_keys = ["password", "token", "key", "secret"]

        for key, value in state.items():
            # str() so that non-string keys (e.g. ints) do not crash .lower().
            key_text = str(key).lower()
            if any(s in key_text for s in sensitive_keys):
                sanitized[key] = "[REDACTED]"
            elif isinstance(value, dict):
                sanitized[key] = self._sanitize_state(value)
            else:
                sanitized[key] = str(value)[:500]

        return sanitized

    def _generate_suggestions(self, error: ClassifiedError) -> list[str]:
        """Return the error's own suggested action plus category-specific tips."""
        suggestions = [error.suggested_action]

        category_suggestions = {
            ErrorCategory.LLM_API: [
                "Check API key and quotas",
                "Consider using a fallback model",
                "Implement request queuing"
            ],
            ErrorCategory.PARSING: [
                "Add output format examples to prompt",
                "Use structured output mode if available",
                "Implement lenient parsing"
            ],
            ErrorCategory.CONTEXT: [
                "Implement context summarization",
                "Use sliding window for history",
                "Consider retrieval instead of full context"
            ],
            ErrorCategory.TOOL_EXECUTION: [
                "Check tool configuration",
                "Verify external service status",
                "Add fallback tools"
            ],
        }

        suggestions.extend(category_suggestions.get(error.category, []))
        return suggestions

    def format_report(self, report: ErrorReport) -> str:
        """Render ``report`` as a multi-line, human-readable string."""
        lines = [
            "=" * 60,
            f"ERROR REPORT: {report.error_id}",
            "=" * 60,
            f"Timestamp: {report.timestamp.isoformat()}",
            f"Category: {report.error.category.value}",
            f"Severity: {report.error.severity.value}",
            f"Message: {report.error.message}",
            "",
            "--- Stack Trace ---",
            report.stack_trace,
            "",
            "--- Recent Actions ---",
        ]

        for action in report.recent_actions:
            # An action's result may be missing or not a dict; show N/A then
            # instead of failing on .get().
            result = action.get("result")
            success = result.get("success", "N/A") if isinstance(result, dict) else "N/A"
            lines.append(f"  - {action.get('type', 'unknown')}: {success}")

        lines.extend([
            "",
            "--- Suggested Fixes ---",
        ])
        for fix in report.suggested_fixes:
            lines.append(f"  - {fix}")

        return "\n".join(lines)

    def get_error_trends(self) -> dict:
        """Count stored reports overall, by category, and by severity.

        Returns:
            A dict with keys "total_errors", "by_category", and
            "by_severity"; the latter two map enum values to counts.
        """
        by_category: dict[str, int] = {}
        by_severity: dict[str, int] = {}

        for report in self.reports:
            cat = report.error.category.value
            by_category[cat] = by_category.get(cat, 0) + 1

            sev = report.error.severity.value
            by_severity[sev] = by_severity.get(sev, 0) + 1

        return {
            "total_errors": len(self.reports),
            "by_category": by_category,
            "by_severity": by_severity
        }

Key Takeaways

  • Classify errors by category and severity to enable appropriate handling strategies.
  • Implement automatic recovery for transient errors using retry with backoff and fallbacks.
  • Use resilience patterns like circuit breakers, bulkheads, and graceful degradation.
  • Generate comprehensive reports that include context, history, and suggested fixes.
  • Learn from errors - analyze trends to prevent recurring issues.
Next Section Preview: We'll bring everything together to build a comprehensive observability dashboard.