Introduction
Observability is the ability to understand what's happening inside your agent system by examining its outputs. For AI agents, this is particularly important because their behavior is non-deterministic and can be difficult to predict or reproduce.
Chapter Overview: This chapter covers logging best practices, distributed tracing, debugging techniques, performance profiling, and building observability dashboards for agent systems.
Structured Logging
Why Structured Logs Matter
| Aspect | Unstructured | Structured |
|---|---|---|
| Format | Free-form text | JSON/key-value pairs |
| Parsing | Regex required | Direct field access |
| Querying | Limited | Full database queries |
| Analysis | Manual | Automated dashboards |
| Correlation | Difficult | Easy with trace IDs |
🐍python
1"""
2Structured Logging for AI Agents
3
4Key principles:
51. Use JSON or structured format
62. Include consistent fields
73. Add context at each level
84. Enable correlation across requests
9"""
10
11import json
12import logging
13from dataclasses import dataclass, field, asdict
14from datetime import datetime
15from typing import Any
16import uuid
17
18
@dataclass
class LogContext:
    """Correlation context that propagates through the agent.

    Every field has a default, so a context can be created empty and
    filled in incrementally (see StructuredLogger.set_context).
    """
    # Unique ID shared by all log entries of one end-to-end request.
    trace_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    # ID of the current unit of work within the trace.
    span_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    agent_id: str = ""    # which agent instance produced the entry
    session_id: str = ""  # conversation/session identifier
    user_id: str = ""     # end user on whose behalf the agent runs
    task_id: str = ""     # task currently being processed
29
@dataclass
class StructuredLogEntry:
    """One structured log record together with its correlation context."""
    timestamp: str
    level: str
    message: str
    context: LogContext
    event_type: str
    data: dict = field(default_factory=dict)
    duration_ms: float | None = None
    error: dict | None = None

    def to_json(self) -> str:
        """Serialize to a flat JSON object; context fields are inlined."""
        ctx = self.context
        payload = {
            "timestamp": self.timestamp,
            "level": self.level,
            "message": self.message,
            "trace_id": ctx.trace_id,
            "span_id": ctx.span_id,
            "agent_id": ctx.agent_id,
            "session_id": ctx.session_id,
            "user_id": ctx.user_id,
            "task_id": ctx.task_id,
            "event_type": self.event_type,
            "data": self.data,
            "duration_ms": self.duration_ms,
            "error": self.error,
        }
        return json.dumps(payload)
58
59
class StructuredLogger:
    """Logger that emits one JSON document per log call.

    Fixes over the naive version:
    - the stream handler is attached only once per logger name, so
      constructing two StructuredLoggers with the same service_name no
      longer emits every record twice (logging.getLogger returns the
      same Logger object for the same name);
    - timestamps use an aware UTC clock instead of the deprecated
      datetime.utcnow(), keeping the same trailing-"Z" format.
    """

    def __init__(self, service_name: str):
        self.service_name = service_name
        self.context = LogContext()
        self._setup_handler()

    def _setup_handler(self):
        """Attach a JSON stream handler exactly once per named logger."""
        self.logger = logging.getLogger(self.service_name)
        self.logger.setLevel(logging.DEBUG)

        # Guard against duplicate handlers on repeated construction.
        if not self.logger.handlers:
            handler = logging.StreamHandler()
            handler.setFormatter(JSONFormatter())
            self.logger.addHandler(handler)

    def set_context(self, **kwargs):
        """Update known context fields; unknown keys are silently ignored."""
        for key, value in kwargs.items():
            if hasattr(self.context, key):
                setattr(self.context, key, value)

    def log(
        self,
        level: str,
        message: str,
        event_type: str,
        **data
    ):
        """Create and output a structured log entry at *level*.

        Extra keyword arguments land in the entry's "data" field.
        """
        # Local import keeps this a drop-in for modules that only do
        # `from datetime import datetime` at the top.
        from datetime import timezone

        entry = StructuredLogEntry(
            # Aware replacement for deprecated datetime.utcnow();
            # .replace() preserves the original "...Z" suffix format.
            timestamp=datetime.now(timezone.utc).isoformat().replace("+00:00", "Z"),
            level=level,
            message=message,
            context=self.context,
            event_type=event_type,
            data=data
        )

        # Unrecognized level names fall back to info().
        log_method = getattr(self.logger, level.lower(), self.logger.info)
        log_method(entry.to_json())

    def info(self, message: str, event_type: str, **data):
        self.log("INFO", message, event_type, **data)

    def debug(self, message: str, event_type: str, **data):
        self.log("DEBUG", message, event_type, **data)

    def warning(self, message: str, event_type: str, **data):
        self.log("WARNING", message, event_type, **data)

    def error(self, message: str, event_type: str, error: Exception | None = None, **data):
        """Log at ERROR level, capturing the exception type/message if given."""
        error_data = None
        if error:
            error_data = {
                "type": type(error).__name__,
                "message": str(error),
            }
        self.log("ERROR", message, event_type, error=error_data, **data)
121
class JSONFormatter(logging.Formatter):
    """Pass-through formatter: the record's message is already JSON text."""

    def format(self, record: logging.LogRecord) -> str:
        rendered = record.getMessage()
        return rendered
128
129# Usage
130logger = StructuredLogger("agent_service")
131logger.set_context(
132 agent_id="agent_001",
133 session_id="session_abc123",
134 user_id="user_42"
135)
136
137logger.info(
138 "Agent started processing task",
139 event_type="task_started",
140 task_type="research",
141 estimated_steps=5
)
Log Levels and Categories
Appropriate Log Levels
| Level | Use For | Example |
|---|---|---|
| DEBUG | Detailed diagnostic info | LLM prompt/response |
| INFO | Normal operations | Task started/completed |
| WARNING | Unexpected but handled | Retry after failure |
| ERROR | Failures requiring attention | API call failed |
| CRITICAL | System-level failures | Agent crashed |
🐍python
1"""
2Log Categories for Agent Systems
3
4Organize logs by category for easier filtering and analysis.
5"""
6
7from enum import Enum
8
9
class LogCategory(Enum):
    """Closed set of category tags used to filter and group agent logs.

    Values are the lowercase strings that appear in emitted log entries.
    """
    # Agent lifecycle
    LIFECYCLE = "lifecycle"  # Start, stop, restart
    TASK = "task"  # Task processing
    DECISION = "decision"  # Agent decisions

    # LLM interactions
    LLM_REQUEST = "llm_request"  # Prompts sent to LLM
    LLM_RESPONSE = "llm_response"  # LLM responses
    LLM_ERROR = "llm_error"  # LLM failures

    # Tool usage
    TOOL_CALL = "tool_call"  # Tool invocations
    TOOL_RESULT = "tool_result"  # Tool results
    TOOL_ERROR = "tool_error"  # Tool failures

    # Memory operations
    MEMORY_READ = "memory_read"  # Reading from memory
    MEMORY_WRITE = "memory_write"  # Writing to memory

    # External interactions
    API_CALL = "api_call"  # External API calls
    USER_INPUT = "user_input"  # User messages
    USER_OUTPUT = "user_output"  # Responses to user

    # Safety and security
    SAFETY = "safety"  # Safety checks
    SECURITY = "security"  # Security events

    # Performance
    PERFORMANCE = "performance"  # Timing, resource usage
42
class AgentEventLogger:
    """Logger with predefined event types for agents.

    Thin convenience wrapper around a StructuredLogger: each method
    pins the event_type/category pair so call sites stay consistent.
    """

    def __init__(self, base_logger: StructuredLogger):
        self.logger = base_logger

    # Lifecycle events
    def log_agent_start(self, config: dict):
        """Record agent initialization along with its configuration."""
        self.logger.info(
            "Agent initialized",
            event_type="agent_start",
            category=LogCategory.LIFECYCLE.value,
            config=config
        )

    def log_agent_stop(self, reason: str):
        """Record agent shutdown and the reason for it."""
        self.logger.info(
            f"Agent stopped: {reason}",
            event_type="agent_stop",
            category=LogCategory.LIFECYCLE.value,
            reason=reason
        )

    # Task events
    def log_task_start(self, task_id: str, task_type: str, description: str):
        """Record the start of a task; description is truncated to 50 chars."""
        self.logger.info(
            f"Starting task: {description[:50]}",
            event_type="task_start",
            category=LogCategory.TASK.value,
            task_id=task_id,
            task_type=task_type
        )

    def log_task_complete(self, task_id: str, success: bool, duration_ms: float):
        """Record task completion; failures log at WARNING instead of INFO."""
        level = "info" if success else "warning"
        getattr(self.logger, level)(
            f"Task completed: {'success' if success else 'failed'}",
            event_type="task_complete",
            category=LogCategory.TASK.value,
            task_id=task_id,
            success=success,
            duration_ms=duration_ms
        )

    # LLM events
    def log_llm_request(
        self,
        model: str,
        prompt_tokens: int,
        messages_count: int
    ):
        """Record an outbound LLM request (DEBUG: high volume)."""
        self.logger.debug(
            f"LLM request to {model}",
            event_type="llm_request",
            category=LogCategory.LLM_REQUEST.value,
            model=model,
            prompt_tokens=prompt_tokens,
            messages_count=messages_count
        )

    def log_llm_response(
        self,
        model: str,
        completion_tokens: int,
        duration_ms: float
    ):
        """Record an LLM response with token usage and latency."""
        self.logger.debug(
            f"LLM response from {model}",
            event_type="llm_response",
            category=LogCategory.LLM_RESPONSE.value,
            model=model,
            completion_tokens=completion_tokens,
            duration_ms=duration_ms
        )

    # Tool events
    def log_tool_call(self, tool_name: str, parameters: dict):
        """Record a tool invocation; parameters are sanitized first."""
        self.logger.info(
            f"Calling tool: {tool_name}",
            event_type="tool_call",
            category=LogCategory.TOOL_CALL.value,
            tool_name=tool_name,
            # Redact credential-like keys before they reach log storage.
            parameters=self._sanitize_params(parameters)
        )

    def log_tool_result(
        self,
        tool_name: str,
        success: bool,
        duration_ms: float
    ):
        """Record a tool's outcome and how long it took."""
        self.logger.info(
            f"Tool result: {tool_name} - {'success' if success else 'failed'}",
            event_type="tool_result",
            category=LogCategory.TOOL_RESULT.value,
            tool_name=tool_name,
            success=success,
            duration_ms=duration_ms
        )

    # Safety events
    def log_safety_check(
        self,
        check_type: str,
        passed: bool,
        details: dict
    ):
        """Record a safety-check outcome; failures log at WARNING."""
        level = "info" if passed else "warning"
        getattr(self.logger, level)(
            f"Safety check: {check_type} - {'passed' if passed else 'failed'}",
            event_type="safety_check",
            category=LogCategory.SAFETY.value,
            check_type=check_type,
            passed=passed,
            details=details
        )
159
160 def _sanitize_params(self, params: dict) -> dict:
161 """Remove sensitive data from parameters."""
162 sensitive = ["password", "token", "key", "secret"]
163 return {
164 k: "[REDACTED]" if any(s in k.lower() for s in sensitive) else v
165 for k, v in params.items()
        }
Context Propagation
Maintaining Context Across Operations
🐍python
1"""
2Context Propagation
3
4Ensure trace context flows through:
51. LLM calls
62. Tool invocations
73. Memory operations
84. External API calls
95. Async operations
10"""
11
12import contextvars
13from contextlib import contextmanager
14from dataclasses import dataclass
15from typing import Any, Generator
16import uuid
17
18
# Context variable for thread-/async-task-safe propagation.
# NOTE: default is None rather than {} — a mutable default dict is a
# single shared object, so baggage written through it would leak
# between unrelated traces. All readers in this module already treat a
# falsy value as "no context set yet".
_trace_context: contextvars.ContextVar[dict | None] = contextvars.ContextVar(
    "trace_context",
    default=None
)
24
25
26@dataclass
27class TraceContext:
28 """Immutable trace context."""
29 trace_id: str
30 span_id: str
31 parent_span_id: str | None = None
32 baggage: dict = None
33
34 def __post_init__(self):
35 if self.baggage is None:
36 self.baggage = {}
37
38
class ContextManager:
    """Manage trace context propagation via the module-level ContextVar.

    All methods are static: state lives in _trace_context, which
    contextvars isolates per thread / asyncio task.
    """

    @staticmethod
    def get_current_context() -> TraceContext:
        """Return the current context, or a fresh root context if none is set.

        The fresh context is NOT stored back into the ContextVar; callers
        that need it to persist should enter a span().
        """
        ctx = _trace_context.get()
        if not ctx:
            return ContextManager.create_new_context()
        return TraceContext(**ctx)

    @staticmethod
    def create_new_context() -> TraceContext:
        """Create a new root context with random trace and span IDs."""
        return TraceContext(
            trace_id=str(uuid.uuid4()),
            span_id=str(uuid.uuid4())
        )

    @staticmethod
    def create_child_context(parent: TraceContext) -> TraceContext:
        """Create a child span: same trace_id, new span_id, parent recorded.

        Baggage is shallow-copied so child writes don't mutate the parent.
        """
        return TraceContext(
            trace_id=parent.trace_id,
            span_id=str(uuid.uuid4()),
            parent_span_id=parent.span_id,
            baggage=parent.baggage.copy()
        )

    @staticmethod
    @contextmanager
    def span(name: str, **attributes) -> Generator[TraceContext, None, None]:
        """Make a child span the current context for the with-block.

        NOTE(review): *name* and **attributes are accepted but not stored
        anywhere in this implementation — presumably reserved for a span
        exporter; confirm before relying on them.
        """
        parent = ContextManager.get_current_context()
        child = ContextManager.create_child_context(parent)

        # Set child as current; keep the token to restore on exit.
        token = _trace_context.set({
            "trace_id": child.trace_id,
            "span_id": child.span_id,
            "parent_span_id": child.parent_span_id,
            "baggage": child.baggage
        })

        try:
            yield child
        finally:
            # Restore the parent context even if the block raised.
            _trace_context.reset(token)

    @staticmethod
    def add_baggage(key: str, value: Any):
        """Attach a key/value to the current context's baggage.

        No-op when no context has been set (ContextVar value is falsy).
        """
        ctx = _trace_context.get()
        if ctx:
            ctx["baggage"] = ctx.get("baggage", {})
            ctx["baggage"][key] = value
            _trace_context.set(ctx)
98
class TracedAgent:
    """Agent whose pipeline stages are wrapped in trace spans.

    Input processing, planning, each execution step, and response
    generation all run in their own child span so log entries can be
    correlated end-to-end via trace_id/span_id.
    """

    def __init__(self, agent_id: str, logger: AgentEventLogger):
        self.agent_id = agent_id
        self.logger = logger

    async def run_task(self, task: str) -> dict:
        """Run a task with full context propagation.

        Returns {"success": True, "response": ...} on success, or
        {"success": False, "error": ...} on any exception; errors are
        logged, never re-raised.
        """
        # Start a new span for this task; only the first 50 characters
        # of the task text are attached as a span attribute.
        with ContextManager.span("run_task", task=task[:50]) as ctx:
            ContextManager.add_baggage("agent_id", self.agent_id)

            self.logger.log_task_start(
                task_id=ctx.span_id,  # span ID doubles as the task ID
                task_type="general",
                description=task
            )

            try:
                # Each operation gets its own span
                with ContextManager.span("process_input"):
                    processed = await self._process_input(task)

                with ContextManager.span("generate_plan"):
                    plan = await self._generate_plan(processed)

                results = []
                for i, step in enumerate(plan):
                    with ContextManager.span(f"execute_step_{i}"):
                        result = await self._execute_step(step)
                        results.append(result)

                with ContextManager.span("generate_response"):
                    response = await self._generate_response(results)

                return {"success": True, "response": response}

            except Exception as e:
                # Reach through to the wrapped StructuredLogger, whose
                # error() accepts the exception object itself.
                self.logger.logger.error(
                    f"Task failed: {e}",
                    event_type="task_error",
                    error=e
                )
                return {"success": False, "error": str(e)}

    async def _process_input(self, task: str) -> str:
        """Process input (placeholder); logs the current trace/span IDs."""
        ctx = ContextManager.get_current_context()
        self.logger.logger.debug(
            "Processing input",
            event_type="input_processing",
            trace_id=ctx.trace_id,
            span_id=ctx.span_id
        )
        return task

    async def _generate_plan(self, task: str) -> list:
        """Generate a plan (placeholder returning a fixed step list)."""
        return ["step1", "step2", "step3"]

    async def _execute_step(self, step: str) -> dict:
        """Execute a single plan step (placeholder)."""
        return {"step": step, "result": "completed"}
164
165 async def _generate_response(self, results: list) -> str:
166 """Generate response with context."""
        return "Task completed successfully"
Log Aggregation
Centralizing Agent Logs
🐍python
1"""
2Log Aggregation
3
4Collect and centralize logs from:
51. Multiple agent instances
62. Different services
73. Various environments
8"""
9
10from abc import ABC, abstractmethod
11from dataclasses import dataclass
12from datetime import datetime
13from typing import Any
14import json
15
16
class LogDestination(ABC):
    """Interface for anything that can receive log entries."""

    @abstractmethod
    def send(self, entry: dict):
        """Deliver a single log entry to this destination."""
        pass
25
class ConsoleDestination(LogDestination):
    """Write each entry to stdout as pretty-printed JSON."""

    def send(self, entry: dict):
        rendered = json.dumps(entry, indent=2)
        print(rendered)
32
class FileDestination(LogDestination):
    """Append entries to a local file, one JSON document per line (JSONL)."""

    def __init__(self, file_path: str):
        self.file_path = file_path

    def send(self, entry: dict):
        """Append *entry* as one JSON line.

        Encoding is pinned to UTF-8 so output doesn't depend on the
        platform default, and non-ASCII text is written verbatim.
        The file is opened per call (simple, but one open/close per entry).
        """
        with open(self.file_path, "a", encoding="utf-8") as f:
            f.write(json.dumps(entry, ensure_ascii=False) + "\n")
43
class ElasticsearchDestination(LogDestination):
    """Send entries to an Elasticsearch index via its HTTP API."""

    def __init__(self, host: str, index: str):
        self.host = host
        self.index = index

    def send(self, entry: dict):
        """Index one document.

        In production, use the elasticsearch-py client with bulk
        indexing instead of one HTTP request per entry.
        """
        import requests
        url = f"{self.host}/{self.index}/_doc"
        # timeout added: requests.post with no timeout can block the
        # aggregator's flush loop forever on a stalled connection.
        requests.post(url, json=entry, timeout=5)
57
class CloudWatchDestination(LogDestination):
    """Send logs to AWS CloudWatch (stub: send() is not implemented)."""

    def __init__(self, log_group: str, log_stream: str):
        self.log_group = log_group
        self.log_stream = log_stream
        # Initialize boto3 client in production
        # (also requires log group/stream creation and error handling).

    def send(self, entry: dict):
        # In production, use boto3 CloudWatch Logs client
        pass
70
71class LogAggregator:
72 """Aggregate logs and send to multiple destinations."""
73
74 def __init__(self):
75 self.destinations: list[LogDestination] = []
76 self.buffer: list[dict] = []
77 self.buffer_size = 100
78 self.enrichers: list = []
79
80 def add_destination(self, destination: LogDestination):
81 """Add a log destination."""
82 self.destinations.append(destination)
83
84 def add_enricher(self, enricher):
85 """Add an enricher function."""
86 self.enrichers.append(enricher)
87
88 def log(self, entry: dict):
89 """Process and send a log entry."""
90 # Enrich the entry
91 enriched = entry.copy()
92 for enricher in self.enrichers:
93 enriched = enricher(enriched)
94
95 # Add to buffer
96 self.buffer.append(enriched)
97
98 # Flush if buffer is full
99 if len(self.buffer) >= self.buffer_size:
100 self.flush()
101
102 def flush(self):
103 """Flush buffered logs to all destinations."""
104 for entry in self.buffer:
105 for dest in self.destinations:
106 try:
107 dest.send(entry)
108 except Exception as e:
109 print(f"Failed to send log to {dest}: {e}")
110
111 self.buffer.clear()
112
113
def environment_enricher(entry: dict) -> dict:
    """Attach deployment metadata (env name, host, version) to *entry*.

    Mutates and returns the same dict; existing keys with these names
    are overwritten.
    """
    import os
    env = os.environ
    entry.update(
        environment=env.get("ENV", "development"),
        hostname=env.get("HOSTNAME", "unknown"),
        service_version=env.get("VERSION", "unknown"),
    )
    return entry
121
122
def timestamp_enricher(entry: dict) -> dict:
    """Ensure the entry carries an ISO-8601 UTC timestamp ending in "Z".

    Existing timestamps are left untouched so upstream values win.
    """
    if "timestamp" not in entry:
        # Local import keeps this drop-in for files that only import
        # `datetime`. datetime.utcnow() is deprecated (naive), so take
        # an aware "now" and rewrite "+00:00" to the original "Z" form.
        from datetime import timezone
        entry["timestamp"] = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")
    return entry
128
129
130# Usage
131aggregator = LogAggregator()
132aggregator.add_destination(ConsoleDestination())
133aggregator.add_destination(FileDestination("/var/log/agent.log"))
134aggregator.add_enricher(environment_enricher)
135aggregator.add_enricher(timestamp_enricher)
136
137# Log an entry
138aggregator.log({
139 "level": "INFO",
140 "message": "Agent started",
141 "agent_id": "agent_001"
})
Key Takeaways
- Use structured logging with JSON format for easy parsing, querying, and analysis.
- Include consistent context like trace IDs, agent IDs, and session IDs in every log entry.
- Use appropriate log levels and categories to organize logs for different use cases.
- Propagate context through all operations to enable end-to-end tracing.
- Aggregate logs centrally from all agent instances for unified observability.
Next Section Preview: We'll explore distributed tracing and spans for understanding agent request flows.