Chapter 19
15 min read
Section 117 of 175

Logging Best Practices

Observability and Debugging

Introduction

Observability is the ability to understand what's happening inside your agent system by examining its outputs. For AI agents, this is particularly important because their behavior is non-deterministic and can be difficult to predict or reproduce.

Chapter Overview: This chapter covers logging best practices, distributed tracing, debugging techniques, performance profiling, and building observability dashboards for agent systems.

Structured Logging

Why Structured Logs Matter

| Aspect      | Unstructured   | Structured            |
|-------------|----------------|-----------------------|
| Format      | Free-form text | JSON/key-value pairs  |
| Parsing     | Regex required | Direct field access   |
| Querying    | Limited        | Full database queries |
| Analysis    | Manual         | Automated dashboards  |
| Correlation | Difficult      | Easy with trace IDs   |
🐍python
1"""
2Structured Logging for AI Agents
3
4Key principles:
51. Use JSON or structured format
62. Include consistent fields
73. Add context at each level
84. Enable correlation across requests
9"""
10
import json
import logging
import uuid
from dataclasses import dataclass, field, asdict
from datetime import datetime, timezone
from typing import Any
17
18
@dataclass
class LogContext:
    """Correlation identifiers that propagate through the agent.

    trace_id and span_id identify the request and the current operation;
    they default to fresh UUID4 strings per instance (via default_factory,
    so instances never share IDs). The remaining identifiers default to
    empty strings until populated via StructuredLogger.set_context().
    """
    trace_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    span_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    agent_id: str = ""
    session_id: str = ""
    user_id: str = ""
    task_id: str = ""
28
29
@dataclass
class StructuredLogEntry:
    """One log record: message, correlation context, and event payload."""
    timestamp: str
    level: str
    message: str
    context: LogContext
    event_type: str
    data: dict = field(default_factory=dict)
    duration_ms: float | None = None
    error: dict | None = None

    def to_json(self) -> str:
        """Serialize to a single JSON document, flattening the context
        fields into top-level keys so they are directly queryable."""
        ctx = self.context
        document = {
            "timestamp": self.timestamp,
            "level": self.level,
            "message": self.message,
            "trace_id": ctx.trace_id,
            "span_id": ctx.span_id,
            "agent_id": ctx.agent_id,
            "session_id": ctx.session_id,
            "user_id": ctx.user_id,
            "task_id": ctx.task_id,
            "event_type": self.event_type,
            "data": self.data,
            "duration_ms": self.duration_ms,
            "error": self.error,
        }
        return json.dumps(document)
58
59
class StructuredLogger:
    """Logger that emits one JSON document per log call.

    Wraps the stdlib ``logging`` module: entries are built as
    StructuredLogEntry objects, serialized, and handed to a named logger
    whose handler passes the JSON string through verbatim.
    """

    def __init__(self, service_name: str):
        self.service_name = service_name
        self.context = LogContext()
        self._setup_handler()

    def _setup_handler(self):
        """Attach the JSON pass-through handler exactly once.

        ``logging.getLogger(name)`` returns a per-name singleton, so the
        original unconditional ``addHandler`` duplicated every log line
        whenever a second StructuredLogger was created for the same
        service name. Guarding on existing handlers fixes that.
        """
        self.logger = logging.getLogger(self.service_name)
        self.logger.setLevel(logging.DEBUG)

        if not self.logger.handlers:
            handler = logging.StreamHandler()
            handler.setFormatter(JSONFormatter())
            self.logger.addHandler(handler)

    def set_context(self, **kwargs):
        """Update known LogContext fields; unknown keys are silently ignored."""
        for key, value in kwargs.items():
            if hasattr(self.context, key):
                setattr(self.context, key, value)

    def log(
        self,
        level: str,
        message: str,
        event_type: str,
        **data
    ):
        """Create and output a structured log entry at the given level."""
        entry = StructuredLogEntry(
            # Timezone-aware replacement for deprecated datetime.utcnow();
            # rewrites the "+00:00" offset to keep the original "Z" suffix.
            timestamp=datetime.now(timezone.utc).isoformat().replace("+00:00", "Z"),
            level=level,
            message=message,
            context=self.context,
            event_type=event_type,
            data=data,
        )

        # Unrecognized level names fall back to info() rather than raising.
        log_method = getattr(self.logger, level.lower(), self.logger.info)
        log_method(entry.to_json())

    def info(self, message: str, event_type: str, **data):
        self.log("INFO", message, event_type, **data)

    def debug(self, message: str, event_type: str, **data):
        self.log("DEBUG", message, event_type, **data)

    def warning(self, message: str, event_type: str, **data):
        self.log("WARNING", message, event_type, **data)

    def error(self, message: str, event_type: str, error: Exception | None = None, **data):
        """Log at ERROR level, serializing the exception's type and message if given."""
        error_data = None
        if error:
            error_data = {
                "type": type(error).__name__,
                "message": str(error),
            }
        self.log("ERROR", message, event_type, error=error_data, **data)
120
121
class JSONFormatter(logging.Formatter):
    """Pass-through formatter: the record message is already a JSON string."""

    def format(self, record: logging.LogRecord) -> str:
        # StructuredLogger serializes before logging, so no further
        # formatting is needed — just interpolate any %-args and return.
        return record.getMessage()
127
128
# Usage example: build a logger, attach correlation IDs, emit one event.
logger = StructuredLogger("agent_service")
logger.set_context(
    agent_id="agent_001",
    session_id="session_abc123",
    user_id="user_42",
)

logger.info(
    "Agent started processing task",
    event_type="task_started",
    task_type="research",
    estimated_steps=5,
)

Log Levels and Categories

Appropriate Log Levels

| Level    | Use For                      | Example                |
|----------|------------------------------|------------------------|
| DEBUG    | Detailed diagnostic info     | LLM prompt/response    |
| INFO     | Normal operations            | Task started/completed |
| WARNING  | Unexpected but handled       | Retry after failure    |
| ERROR    | Failures requiring attention | API call failed        |
| CRITICAL | System-level failures        | Agent crashed          |
🐍python
1"""
2Log Categories for Agent Systems
3
4Organize logs by category for easier filtering and analysis.
5"""
6
7from enum import Enum
8
9
class LogCategory(Enum):
    """Closed set of category tags attached to log entries.

    Categories group events along functional lines (lifecycle, LLM, tools,
    memory, external I/O, safety, performance) so dashboards and queries
    can filter on a single field.
    """

    # Agent lifecycle
    LIFECYCLE = "lifecycle"          # Start, stop, restart
    TASK = "task"                    # Task processing
    DECISION = "decision"            # Agent decisions

    # LLM interactions
    LLM_REQUEST = "llm_request"      # Prompts sent to LLM
    LLM_RESPONSE = "llm_response"    # LLM responses
    LLM_ERROR = "llm_error"          # LLM failures

    # Tool usage
    TOOL_CALL = "tool_call"          # Tool invocations
    TOOL_RESULT = "tool_result"      # Tool results
    TOOL_ERROR = "tool_error"        # Tool failures

    # Memory operations
    MEMORY_READ = "memory_read"      # Reading from memory
    MEMORY_WRITE = "memory_write"    # Writing to memory

    # External interactions
    API_CALL = "api_call"            # External API calls
    USER_INPUT = "user_input"        # User messages
    USER_OUTPUT = "user_output"      # Responses to user

    # Safety and security
    SAFETY = "safety"                # Safety checks
    SECURITY = "security"            # Security events

    # Performance
    PERFORMANCE = "performance"      # Timing, resource usage
41
42
class AgentEventLogger:
    """Convenience wrapper exposing one method per well-known agent event.

    Each method delegates to the underlying StructuredLogger with a fixed
    event_type and LogCategory value so call sites stay short and all
    entries of a kind share the same schema.
    """

    def __init__(self, base_logger: StructuredLogger):
        self.logger = base_logger

    # --- Lifecycle events ---

    def log_agent_start(self, config: dict):
        self.logger.info(
            "Agent initialized",
            event_type="agent_start",
            category=LogCategory.LIFECYCLE.value,
            config=config,
        )

    def log_agent_stop(self, reason: str):
        self.logger.info(
            f"Agent stopped: {reason}",
            event_type="agent_stop",
            category=LogCategory.LIFECYCLE.value,
            reason=reason,
        )

    # --- Task events ---

    def log_task_start(self, task_id: str, task_type: str, description: str):
        # Truncate the description so the message line stays scannable.
        self.logger.info(
            f"Starting task: {description[:50]}",
            event_type="task_start",
            category=LogCategory.TASK.value,
            task_id=task_id,
            task_type=task_type,
        )

    def log_task_complete(self, task_id: str, success: bool, duration_ms: float):
        # Failed tasks are logged at WARNING so they stand out when filtering.
        outcome = "success" if success else "failed"
        log_fn = self.logger.info if success else self.logger.warning
        log_fn(
            f"Task completed: {outcome}",
            event_type="task_complete",
            category=LogCategory.TASK.value,
            task_id=task_id,
            success=success,
            duration_ms=duration_ms,
        )

    # --- LLM events (DEBUG level: high volume, mainly for diagnostics) ---

    def log_llm_request(self, model: str, prompt_tokens: int, messages_count: int):
        self.logger.debug(
            f"LLM request to {model}",
            event_type="llm_request",
            category=LogCategory.LLM_REQUEST.value,
            model=model,
            prompt_tokens=prompt_tokens,
            messages_count=messages_count,
        )

    def log_llm_response(self, model: str, completion_tokens: int, duration_ms: float):
        self.logger.debug(
            f"LLM response from {model}",
            event_type="llm_response",
            category=LogCategory.LLM_RESPONSE.value,
            model=model,
            completion_tokens=completion_tokens,
            duration_ms=duration_ms,
        )

    # --- Tool events ---

    def log_tool_call(self, tool_name: str, parameters: dict):
        self.logger.info(
            f"Calling tool: {tool_name}",
            event_type="tool_call",
            category=LogCategory.TOOL_CALL.value,
            tool_name=tool_name,
            # Never log raw parameters — they may carry credentials.
            parameters=self._sanitize_params(parameters),
        )

    def log_tool_result(self, tool_name: str, success: bool, duration_ms: float):
        outcome = "success" if success else "failed"
        self.logger.info(
            f"Tool result: {tool_name} - {outcome}",
            event_type="tool_result",
            category=LogCategory.TOOL_RESULT.value,
            tool_name=tool_name,
            success=success,
            duration_ms=duration_ms,
        )

    # --- Safety events ---

    def log_safety_check(self, check_type: str, passed: bool, details: dict):
        # Failed checks get WARNING level for visibility.
        outcome = "passed" if passed else "failed"
        log_fn = self.logger.info if passed else self.logger.warning
        log_fn(
            f"Safety check: {check_type} - {outcome}",
            event_type="safety_check",
            category=LogCategory.SAFETY.value,
            check_type=check_type,
            passed=passed,
            details=details,
        )

    def _sanitize_params(self, params: dict) -> dict:
        """Return a copy of params with likely-sensitive values redacted.

        Matches on key-name substrings only; values and nested dicts are
        not inspected.
        """
        sensitive = ["password", "token", "key", "secret"]
        cleaned = {}
        for name, value in params.items():
            lowered = name.lower()
            if any(marker in lowered for marker in sensitive):
                cleaned[name] = "[REDACTED]"
            else:
                cleaned[name] = value
        return cleaned

Context Propagation

Maintaining Context Across Operations

🐍python
1"""
2Context Propagation
3
4Ensure trace context flows through:
51. LLM calls
62. Tool invocations
73. Memory operations
84. External API calls
95. Async operations
10"""
11
12import contextvars
13from contextlib import contextmanager
14from dataclasses import dataclass
15from typing import Any, Generator
16import uuid
17
18
# Context variable for thread-/task-safe trace propagation.
# The default is None rather than a dict literal: a mutable default would be
# a single shared object visible to every context that has not set its own
# value, so an accidental in-place mutation would leak across requests.
# Every reader in this module treats a falsy value as "no context yet", so
# None behaves identically to the old empty-dict default.
_trace_context: contextvars.ContextVar[dict | None] = contextvars.ContextVar(
    "trace_context",
    default=None
)
24
25
@dataclass
class TraceContext:
    """Snapshot of one span's identity within a trace.

    Note: despite the original description, this is a plain (mutable)
    dataclass — treat instances as read-only by convention.

    Attributes:
        trace_id: ID shared by every span belonging to one trace.
        span_id: Unique ID of this span.
        parent_span_id: span_id of the enclosing span, or None for a root.
        baggage: Key/value items copied into child spans.
    """
    trace_id: str
    span_id: str
    parent_span_id: str | None = None
    # None sentinel avoids the shared-mutable-default pitfall; normalized
    # to a fresh per-instance dict in __post_init__.
    baggage: dict | None = None

    def __post_init__(self):
        if self.baggage is None:
            self.baggage = {}
37
38
class ContextManager:
    """Static helpers for reading, creating, and scoping trace contexts."""

    @staticmethod
    def get_current_context() -> TraceContext:
        """Return the active context, or a brand-new root if none is set.

        Note the fresh root is returned but not stored in the context var.
        """
        stored = _trace_context.get()
        if not stored:
            return ContextManager.create_new_context()
        return TraceContext(**stored)

    @staticmethod
    def create_new_context() -> TraceContext:
        """Build a root context with fresh trace and span IDs."""
        return TraceContext(
            trace_id=str(uuid.uuid4()),
            span_id=str(uuid.uuid4()),
        )

    @staticmethod
    def create_child_context(parent: TraceContext) -> TraceContext:
        """Derive a child span: same trace, new span ID, copied baggage."""
        return TraceContext(
            trace_id=parent.trace_id,
            span_id=str(uuid.uuid4()),
            parent_span_id=parent.span_id,
            baggage=parent.baggage.copy(),
        )

    @staticmethod
    @contextmanager
    def span(name: str, **attributes) -> Generator[TraceContext, None, None]:
        """Enter a child span for the duration of the with-block.

        The child becomes the current context; the previous context is
        restored on exit even if the body raises.
        """
        parent = ContextManager.get_current_context()
        child = ContextManager.create_child_context(parent)

        token = _trace_context.set({
            "trace_id": child.trace_id,
            "span_id": child.span_id,
            "parent_span_id": child.parent_span_id,
            "baggage": child.baggage,
        })
        try:
            yield child
        finally:
            _trace_context.reset(token)

    @staticmethod
    def add_baggage(key: str, value: Any):
        """Attach a key/value pair to the current span's baggage.

        No-op when no span is active (the context var is unset/falsy).
        """
        stored = _trace_context.get()
        if stored:
            stored.setdefault("baggage", {})[key] = value
            _trace_context.set(stored)
97
98
class TracedAgent:
    """Demo agent whose pipeline stages each run inside their own span."""

    def __init__(self, agent_id: str, logger: AgentEventLogger):
        self.agent_id = agent_id
        self.logger = logger

    async def run_task(self, task: str) -> dict:
        """Run a task end-to-end under a single trace.

        Opens a root span, tags it with the agent ID via baggage, then
        executes each stage — input processing, planning, per-step
        execution, response generation — in its own child span.
        Returns {"success": True, "response": ...} or
        {"success": False, "error": ...}.
        """
        with ContextManager.span("run_task", task=task[:50]) as ctx:
            ContextManager.add_baggage("agent_id", self.agent_id)

            # The root span's ID doubles as the task ID for correlation.
            self.logger.log_task_start(
                task_id=ctx.span_id,
                task_type="general",
                description=task,
            )

            try:
                with ContextManager.span("process_input"):
                    processed = await self._process_input(task)

                with ContextManager.span("generate_plan"):
                    plan = await self._generate_plan(processed)

                step_results = []
                for index, step in enumerate(plan):
                    with ContextManager.span(f"execute_step_{index}"):
                        step_results.append(await self._execute_step(step))

                with ContextManager.span("generate_response"):
                    response = await self._generate_response(step_results)

                return {"success": True, "response": response}

            except Exception as e:
                # Reach through to the base StructuredLogger for its error() helper.
                self.logger.logger.error(
                    f"Task failed: {e}",
                    event_type="task_error",
                    error=e,
                )
                return {"success": False, "error": str(e)}

    async def _process_input(self, task: str) -> str:
        """Placeholder input stage; logs the current trace/span IDs."""
        ctx = ContextManager.get_current_context()
        self.logger.logger.debug(
            "Processing input",
            event_type="input_processing",
            trace_id=ctx.trace_id,
            span_id=ctx.span_id,
        )
        return task

    async def _generate_plan(self, task: str) -> list:
        """Placeholder planner: returns a fixed three-step plan."""
        return ["step1", "step2", "step3"]

    async def _execute_step(self, step: str) -> dict:
        """Placeholder executor: marks the step completed."""
        return {"step": step, "result": "completed"}

    async def _generate_response(self, results: list) -> str:
        """Placeholder responder: returns a canned success message."""
        return "Task completed successfully"

Log Aggregation

Centralizing Agent Logs

🐍python
1"""
2Log Aggregation
3
4Collect and centralize logs from:
51. Multiple agent instances
62. Different services
73. Various environments
8"""
9
10from abc import ABC, abstractmethod
11from dataclasses import dataclass
12from datetime import datetime
13from typing import Any
14import json
15
16
class LogDestination(ABC):
    """Interface for anything that can receive a structured log entry."""

    @abstractmethod
    def send(self, entry: dict):
        """Deliver one log entry (a plain dict) to this destination."""
        pass
24
25
class ConsoleDestination(LogDestination):
    """Pretty-print entries to stdout as indented JSON."""

    def send(self, entry: dict):
        # indent=2 keeps console output human-readable during development.
        print(json.dumps(entry, indent=2))
31
32
class FileDestination(LogDestination):
    """Append entries to a local file, one JSON document per line (JSONL)."""

    def __init__(self, file_path: str):
        self.file_path = file_path

    def send(self, entry: dict):
        # Open-per-call is simple and crash-safe, at some I/O cost.
        with open(self.file_path, "a") as f:
            f.write(json.dumps(entry) + "\n")
42
43
class ElasticsearchDestination(LogDestination):
    """Index entries into Elasticsearch via its HTTP document API."""

    def __init__(self, host: str, index: str):
        self.host = host
        self.index = index

    def send(self, entry: dict):
        # Demo only — in production use the official elasticsearch-py
        # client (connection pooling, retries, bulk indexing).
        import requests
        url = f"{self.host}/{self.index}/_doc"
        requests.post(url, json=entry)
56
57
class CloudWatchDestination(LogDestination):
    """Ship entries to an AWS CloudWatch Logs stream (stub)."""

    def __init__(self, log_group: str, log_stream: str):
        self.log_group = log_group
        self.log_stream = log_stream
        # Initialize a boto3 CloudWatch Logs client here in production.

    def send(self, entry: dict):
        # Stub: production code would call put_log_events via boto3.
        pass
69
70
71class LogAggregator:
72    """Aggregate logs and send to multiple destinations."""
73
74    def __init__(self):
75        self.destinations: list[LogDestination] = []
76        self.buffer: list[dict] = []
77        self.buffer_size = 100
78        self.enrichers: list = []
79
80    def add_destination(self, destination: LogDestination):
81        """Add a log destination."""
82        self.destinations.append(destination)
83
84    def add_enricher(self, enricher):
85        """Add an enricher function."""
86        self.enrichers.append(enricher)
87
88    def log(self, entry: dict):
89        """Process and send a log entry."""
90        # Enrich the entry
91        enriched = entry.copy()
92        for enricher in self.enrichers:
93            enriched = enricher(enriched)
94
95        # Add to buffer
96        self.buffer.append(enriched)
97
98        # Flush if buffer is full
99        if len(self.buffer) >= self.buffer_size:
100            self.flush()
101
102    def flush(self):
103        """Flush buffered logs to all destinations."""
104        for entry in self.buffer:
105            for dest in self.destinations:
106                try:
107                    dest.send(entry)
108                except Exception as e:
109                    print(f"Failed to send log to {dest}: {e}")
110
111        self.buffer.clear()
112
113
def environment_enricher(entry: dict) -> dict:
    """Stamp deployment metadata (env name, host, version) onto an entry.

    Mutates and returns the same dict; missing env vars get fallbacks.
    """
    import os
    env = os.environ
    entry["environment"] = env.get("ENV", "development")
    entry["hostname"] = env.get("HOSTNAME", "unknown")
    entry["service_version"] = env.get("VERSION", "unknown")
    return entry
121
122
def timestamp_enricher(entry: dict) -> dict:
    """Add a UTC ISO-8601 timestamp (trailing "Z") if the entry lacks one.

    An existing "timestamp" value is left untouched.
    """
    if "timestamp" not in entry:
        # datetime.utcnow() is deprecated (Python 3.12+): use an aware UTC
        # datetime and rewrite the "+00:00" offset as the original "Z"
        # suffix. Local import mirrors environment_enricher's style.
        from datetime import datetime, timezone
        entry["timestamp"] = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")
    return entry
128
129
# Usage example: console + file outputs with two enrichers attached.
aggregator = LogAggregator()
aggregator.add_destination(ConsoleDestination())
aggregator.add_destination(FileDestination("/var/log/agent.log"))
aggregator.add_enricher(environment_enricher)
aggregator.add_enricher(timestamp_enricher)

# Log a single entry (held in the buffer until a flush).
aggregator.log({
    "level": "INFO",
    "message": "Agent started",
    "agent_id": "agent_001",
})

Key Takeaways

  • Use structured logging with JSON format for easy parsing, querying, and analysis.
  • Include consistent context like trace IDs, agent IDs, and session IDs in every log entry.
  • Use appropriate log levels and categories to organize logs for different use cases.
  • Propagate context through all operations to enable end-to-end tracing.
  • Aggregate logs centrally from all agent instances for unified observability.
Next Section Preview: We'll explore distributed tracing and spans for understanding agent request flows.