Introduction
Effective monitoring provides visibility into agent behavior, enabling early detection of problems and continuous improvement. This section covers comprehensive logging, anomaly detection, alerting systems, and audit trails for production agent systems.
Section Overview: We'll explore logging strategies, anomaly detection algorithms, alerting systems, and audit trail implementation for agent observability.
Logging Strategies
What to Log
| Category | Data Points | Retention |
|---|---|---|
| Actions | Type, parameters, result | 30 days |
| Decisions | Context, options, choice | 90 days |
| Errors | Type, stack trace, context | 1 year |
| Security | Auth, access, violations | 2 years |
| Performance | Latency, tokens, costs | 30 days |
| User interactions | Inputs, outputs, feedback | Varies |
🐍python
"""
Comprehensive Agent Logging

Log everything needed to:
1. Debug issues
2. Understand behavior
3. Audit actions
4. Improve performance
"""
10
11import json
12from dataclasses import dataclass, field, asdict
13from datetime import datetime
14from enum import Enum
15from typing import Any
16import uuid
17
18
class LogLevel(Enum):
    """Severity level attached to a log entry."""

    DEBUG = "debug"
    INFO = "info"
    WARNING = "warning"
    ERROR = "error"
    CRITICAL = "critical"
25
26
class LogCategory(Enum):
    """Kind of activity a log entry records."""

    ACTION = "action"
    DECISION = "decision"
    ERROR = "error"
    SECURITY = "security"
    PERFORMANCE = "performance"
    USER = "user"
34
35
@dataclass
class AgentLogEntry:
    """Structured log entry for agent activity.

    ``data`` defaults to an empty dict and ``trace_id`` to a fresh UUID4
    string, both created per instance.
    """

    timestamp: datetime
    agent_id: str
    session_id: str
    category: LogCategory
    level: LogLevel
    event_type: str
    message: str
    data: dict = field(default_factory=dict)
    trace_id: str = field(default_factory=lambda: str(uuid.uuid4()))

    def to_dict(self) -> dict:
        """Return the entry as a plain dict.

        The timestamp is rendered with ``isoformat()`` and the two enums are
        replaced by their ``.value`` strings; ``data`` is passed through as-is.
        """
        return {
            "timestamp": self.timestamp.isoformat(),
            "agent_id": self.agent_id,
            "session_id": self.session_id,
            "category": self.category.value,
            "level": self.level.value,
            "event_type": self.event_type,
            "message": self.message,
            "data": self.data,
            "trace_id": self.trace_id,
        }
61
62
class AgentLogger:
    """Comprehensive logging for agent systems.

    Builds an ``AgentLogEntry`` for each event and passes it to every callable
    in ``log_handlers`` in registration order.
    """

    # Substrings that mark a parameter name as sensitive (case-insensitive).
    _SENSITIVE_KEYS = ("password", "token", "key", "secret", "credential")

    def __init__(self, agent_id: str, session_id: str | None = None):
        """Bind the logger to an agent; a UUID4 session id is generated if none is given."""
        self.agent_id = agent_id
        self.session_id = session_id or str(uuid.uuid4())
        self.log_handlers: list = []

    def log(
        self,
        category: LogCategory,
        level: LogLevel,
        event_type: str,
        message: str,
        **data
    ):
        """Create and dispatch a log entry.

        Extra keyword arguments become the entry's ``data`` payload. Because
        they travel as keywords, they cannot be named ``category``, ``level``,
        ``event_type`` or ``message``.
        """
        self._dispatch(category, level, event_type, message, data)

    def _dispatch(
        self,
        category: LogCategory,
        level: LogLevel,
        event_type: str,
        message: str,
        data: dict
    ) -> None:
        """Build the entry from an explicit payload dict and hand it to all handlers."""
        entry = AgentLogEntry(
            timestamp=datetime.now(),
            agent_id=self.agent_id,
            session_id=self.session_id,
            category=category,
            level=level,
            event_type=event_type,
            message=message,
            data=data,
        )
        for handler in self.log_handlers:
            handler(entry)

    # Convenience methods
    def log_action(
        self,
        action_type: str,
        parameters: dict,
        result: Any,
        success: bool
    ):
        """Log an agent action (INFO on success, WARNING on failure).

        Parameters are sanitized and the stringified result is truncated to
        1000 characters.
        """
        self.log(
            category=LogCategory.ACTION,
            level=LogLevel.INFO if success else LogLevel.WARNING,
            event_type="action_executed",
            message=f"Action {action_type}: {'success' if success else 'failed'}",
            action_type=action_type,
            parameters=self._sanitize_params(parameters),
            result=str(result)[:1000],  # Truncate
            success=success,
        )

    def log_decision(
        self,
        decision_type: str,
        options: list[str],
        chosen: str,
        confidence: float,
        reasoning: str
    ):
        """Log a decision made by the agent; reasoning is truncated to 500 characters."""
        self.log(
            category=LogCategory.DECISION,
            level=LogLevel.INFO,
            event_type="decision_made",
            message=f"Decision: {decision_type} -> {chosen}",
            decision_type=decision_type,
            options=options,
            chosen=chosen,
            confidence=confidence,
            reasoning=reasoning[:500],
        )

    def log_error(
        self,
        error_type: str,
        error_message: str,
        stack_trace: str | None = None,
        context: dict | None = None
    ):
        """Log an error at ERROR level; a missing context is recorded as ``{}``."""
        self.log(
            category=LogCategory.ERROR,
            level=LogLevel.ERROR,
            event_type="error_occurred",
            message=f"Error: {error_type}",
            error_type=error_type,
            error_message=error_message,
            stack_trace=stack_trace,
            context=context or {},
        )

    def log_security_event(
        self,
        event_type: str,
        severity: str,
        details: dict
    ):
        """Log a security-related event.

        Severity ``"high"`` maps to CRITICAL, anything else to WARNING. The
        payload is ``details`` plus a ``severity`` key.
        """
        level = LogLevel.CRITICAL if severity == "high" else LogLevel.WARNING
        # Build the payload as a dict instead of splatting **details into
        # log(): a details key such as "severity" or "message" would otherwise
        # raise TypeError and the security event would be lost.
        payload = {"severity": severity, **details}
        payload["severity"] = severity  # the explicit argument wins over details
        self._dispatch(
            LogCategory.SECURITY,
            level,
            event_type,
            f"Security event: {event_type}",
            payload,
        )

    def _sanitize_params(self, params: dict) -> dict:
        """Return a copy of ``params`` with sensitive values replaced by ``"[REDACTED]"``.

        A key is sensitive when its lower-cased text contains any of
        ``_SENSITIVE_KEYS``. Nested dicts, lists and tuples are sanitized
        recursively; the input is not modified.
        """
        sanitized = {}
        for key, value in params.items():
            # str() so non-string keys (ints, enums) do not crash the check.
            lowered = str(key).lower()
            if any(marker in lowered for marker in self._SENSITIVE_KEYS):
                sanitized[key] = "[REDACTED]"
            else:
                sanitized[key] = self._sanitize_value(value)
        return sanitized

    def _sanitize_value(self, value: Any) -> Any:
        """Sanitize dicts found inside ``value``; other values pass through unchanged."""
        if isinstance(value, dict):
            return self._sanitize_params(value)
        if isinstance(value, list):
            return [self._sanitize_value(item) for item in value]
        if isinstance(value, tuple):
            return tuple(self._sanitize_value(item) for item in value)
        return value
181
182
# Usage
logger = AgentLogger("agent_001")

# Add handlers: pretty-print each entry as JSON. default=str keeps the handler
# from raising when an entry's data holds a non-JSON value (e.g. a datetime).
logger.log_handlers.append(
    lambda e: print(json.dumps(e.to_dict(), indent=2, default=str))
)
188
189# Log actions
190logger.log_action(
191 action_type="search",
192 parameters={"query": "AI agents"},
193 result={"count": 10},
194 success=True
195)
Anomaly Detection
Detecting Abnormal Behavior
🐍python
"""
Anomaly Detection for Agent Behavior

Detect unusual patterns that may indicate:
1. Compromised agent
2. Goal drift
3. Infinite loops
4. Resource abuse
5. Policy violations
"""
11
12from dataclasses import dataclass
13from datetime import datetime, timedelta
14from typing import Any
15import statistics
16
17
@dataclass
class AnomalyDetection:
    """Result of anomaly detection."""

    is_anomaly: bool
    anomaly_type: str
    severity: float  # 0-1
    description: str
    recommended_action: str
26
27
class AnomalyDetector:
    """Detect anomalies in agent behavior.

    Every checked action and metrics sample is appended to an in-memory
    history. Three heuristics are applied: repetition within the last five
    actions, keyword matching on the action type, and a z-score test against
    ``baseline_metrics``.
    """

    def __init__(self):
        self.action_history: list[dict] = []
        self.metrics_history: list[dict] = []

        # Baseline statistics (would be learned in production)
        self.baseline_metrics = {
            "actions_per_minute": {"mean": 5, "std": 2},
            "api_calls_per_task": {"mean": 10, "std": 5},
            "tokens_per_action": {"mean": 1000, "std": 500},
            "error_rate": {"mean": 0.05, "std": 0.02},
        }

    def check_action(self, action: dict) -> AnomalyDetection | None:
        """Record ``action`` and return the first anomaly found, or ``None``.

        Loop detection runs before the suspicious-type check.
        """
        self.action_history.append(action)

        # Check for loops
        loop_detection = self._detect_loop()
        if loop_detection:
            return loop_detection

        # Check for unusual action types
        return self._detect_unusual_action_type(action)

    def check_metrics(self, metrics: dict) -> AnomalyDetection | None:
        """Record ``metrics`` and return the first statistical anomaly, or ``None``.

        Only metric names present in ``baseline_metrics`` are tested.
        """
        self.metrics_history.append({
            "timestamp": datetime.now(),
            **metrics
        })

        for metric_name, value in metrics.items():
            if metric_name not in self.baseline_metrics:
                continue
            anomaly = self._check_statistical_anomaly(metric_name, value)
            if anomaly:
                return anomaly

        return None

    def _detect_loop(self) -> AnomalyDetection | None:
        """Detect if agent is stuck in a loop, based on the last five actions."""
        if len(self.action_history) < 5:
            return None

        # Check last 5 actions
        recent = self.action_history[-5:]
        action_types = [a.get("type") for a in recent]

        # All same type
        if len(set(action_types)) == 1:
            return AnomalyDetection(
                is_anomaly=True,
                anomaly_type="infinite_loop",
                severity=0.8,
                description=f"Agent repeated '{action_types[0]}' 5 times",
                recommended_action="pause_and_review"
            )

        # Same action with same parameters. str() first: the input may be a
        # dict, a number or None, none of which can be sliced directly.
        action_keys = [
            f"{a.get('type')}:{str(a.get('input', ''))[:50]}"
            for a in recent
        ]
        if len(set(action_keys)) <= 2:
            return AnomalyDetection(
                is_anomaly=True,
                anomaly_type="stuck_pattern",
                severity=0.6,
                description="Agent stuck in repetitive pattern",
                recommended_action="inject_guidance"
            )

        return None

    def _detect_unusual_action_type(
        self,
        action: dict
    ) -> AnomalyDetection | None:
        """Flag actions whose type contains a destructive or outbound keyword."""
        suspicious_actions = [
            "delete", "remove", "drop", "truncate",
            "send_email", "transfer", "payment"
        ]

        # A missing or None type is treated as "" instead of raising.
        action_type = str(action.get("type") or "").lower()
        if any(suspicious in action_type for suspicious in suspicious_actions):
            return AnomalyDetection(
                is_anomaly=True,
                anomaly_type="suspicious_action",
                severity=0.7,
                description=f"Suspicious action attempted: {action_type}",
                recommended_action="require_approval"
            )

        return None

    def _check_statistical_anomaly(
        self,
        metric_name: str,
        value: float
    ) -> AnomalyDetection | None:
        """Check if metric value is more than 3 standard deviations from baseline.

        Values that cannot be converted to ``float`` are ignored. With a
        zero-std baseline, any deviation from the mean counts as anomalous.
        """
        try:
            observed = float(value)
        except (TypeError, ValueError):
            return None  # not a number: nothing to compare

        baseline = self.baseline_metrics[metric_name]
        mean = baseline["mean"]
        std = baseline["std"]

        # Z-score calculation
        deviation = abs(observed - mean)
        if std > 0:
            z_score = deviation / std
        else:
            # A constant baseline tolerates no deviation at all.
            z_score = float("inf") if deviation > 0 else 0.0

        # Anomaly if more than 3 standard deviations
        if z_score > 3:
            return AnomalyDetection(
                is_anomaly=True,
                anomaly_type="statistical_anomaly",
                severity=min(z_score / 5, 1.0),
                description=f"{metric_name} is {z_score:.1f} std devs from normal",
                recommended_action="investigate"
            )

        return None
158
159
class BehaviorProfiler:
    """Build and compare behavior profiles.

    History entries are dicts. The helpers read the keys ``type``,
    ``session_id``, ``task_id`` and ``success``.
    NOTE(review): this schema is an assumption; confirm it against whatever
    produces the history.
    """

    def __init__(self):
        self.normal_profile: dict = {}
        self.current_session_profile: dict = {}

    def update_normal_profile(self, agent_id: str, history: list[dict]):
        """Update what 'normal' looks like for this agent."""
        self.normal_profile[agent_id] = {
            "common_actions": self._get_action_distribution(history),
            "avg_session_length": self._avg_session_length(history),
            "avg_actions_per_task": self._avg_actions_per_task(history),
            "typical_error_rate": self._error_rate(history),
        }

    def compare_to_normal(
        self,
        agent_id: str,
        current_behavior: dict
    ) -> float:
        """Compare current behavior to normal profile. Returns deviation score.

        The score is the summed absolute difference between the baseline and
        current action shares, capped at 1.0. It is 0.0 when the agent has no
        baseline.
        """
        normal = self.normal_profile.get(agent_id)
        if normal is None:
            return 0.0  # No baseline

        normal_actions = normal.get("common_actions") or {}
        current_actions = current_behavior.get("actions") or {}

        deviation = 0.0
        for action, normal_pct in normal_actions.items():
            deviation += abs(normal_pct - current_actions.get(action, 0))
        # Actions never seen in the baseline count in full; otherwise a brand
        # new action type would add nothing to the score.
        for action, current_pct in current_actions.items():
            if action not in normal_actions:
                deviation += abs(current_pct)

        return min(deviation, 1.0)

    def _get_action_distribution(self, history: list[dict]) -> dict:
        """Return each action type's share of the history (empty dict if none)."""
        from collections import Counter

        counts = Counter(
            entry["type"] for entry in history if entry.get("type") is not None
        )
        total = sum(counts.values())
        if not total:
            return {}
        return {action: count / total for action, count in counts.items()}

    def _avg_session_length(self, history: list[dict]) -> float:
        """Return the mean number of entries per distinct ``session_id``."""
        return self._avg_group_size(history, "session_id")

    def _avg_actions_per_task(self, history: list[dict]) -> float:
        """Return the mean number of entries per distinct ``task_id``."""
        return self._avg_group_size(history, "task_id")

    def _error_rate(self, history: list[dict]) -> float:
        """Return the fraction of entries whose ``success`` is ``False``."""
        if not history:
            return 0.0
        failures = sum(1 for entry in history if entry.get("success") is False)
        return failures / len(history)

    @staticmethod
    def _avg_group_size(history: list[dict], key: str) -> float:
        """Return entries per distinct value of ``key``; 0.0 for empty history.

        Entries without ``key`` are grouped together under ``None``.
        """
        from collections import Counter

        groups = Counter(entry.get(key) for entry in history)
        return len(history) / len(groups) if groups else 0.0
209 pass
Alerting Systems
Alert Configuration
🐍python
"""
Alerting System for Agent Safety

Configure alerts for:
1. Security incidents
2. Performance degradation
3. Error thresholds
4. Anomalies
5. Resource limits
"""
11
12from dataclasses import dataclass
13from datetime import datetime
14from enum import Enum
15from typing import Callable
16
17
class AlertSeverity(Enum):
    """Severity assigned to an alert rule and the alerts it produces."""

    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"
    CRITICAL = "critical"
23
24
class AlertChannel(Enum):
    """Destination an alert can be routed to."""

    EMAIL = "email"
    SLACK = "slack"
    PAGERDUTY = "pagerduty"
    WEBHOOK = "webhook"
    LOG = "log"
31
32
@dataclass
class AlertRule:
    """Definition of an alert rule."""

    name: str
    condition: Callable[[dict], bool]  # called with the metrics dict
    severity: AlertSeverity
    channels: list[AlertChannel]
    cooldown_minutes: int = 5
    message_template: str = ""  # str.format template filled from the metrics
42
43
@dataclass
class Alert:
    """An alert instance."""

    id: str
    rule_name: str
    severity: AlertSeverity
    message: str
    timestamp: datetime
    data: dict
    acknowledged: bool = False
54
55
class AlertManager:
    """Manage alerts for agent systems.

    Rules are evaluated against a metrics dict. A triggered rule produces an
    ``Alert`` unless the same rule fired within its cooldown window, and the
    alert is passed to the handler of each channel listed on the rule.
    Channels without a registered handler (WEBHOOK by default) are skipped.
    """

    def __init__(self):
        self.rules: dict[str, AlertRule] = {}
        self.active_alerts: list[Alert] = []
        self.alert_history: list[Alert] = []
        self.last_alert_time: dict[str, datetime] = {}

        # Channel handlers
        self.channel_handlers: dict[AlertChannel, Callable] = {
            AlertChannel.LOG: self._log_alert,
            AlertChannel.SLACK: self._slack_alert,
            AlertChannel.EMAIL: self._email_alert,
            AlertChannel.PAGERDUTY: self._pagerduty_alert,
        }

        self._setup_default_rules()

    def _setup_default_rules(self):
        """Set up default alerting rules."""

        # High error rate
        self.add_rule(AlertRule(
            name="high_error_rate",
            condition=lambda m: m.get("error_rate", 0) > 0.1,
            severity=AlertSeverity.HIGH,
            channels=[AlertChannel.SLACK, AlertChannel.LOG],
            cooldown_minutes=10,
            message_template="Error rate is {error_rate:.1%}, exceeds 10% threshold"
        ))

        # Security violation
        self.add_rule(AlertRule(
            name="security_violation",
            condition=lambda m: m.get("security_violation", False),
            severity=AlertSeverity.CRITICAL,
            channels=[AlertChannel.PAGERDUTY, AlertChannel.SLACK, AlertChannel.LOG],
            cooldown_minutes=0,  # Always alert
            message_template="Security violation detected: {violation_type}"
        ))

        # Resource limit approaching
        self.add_rule(AlertRule(
            name="resource_limit_warning",
            condition=lambda m: m.get("resource_usage", 0) > 0.8,
            severity=AlertSeverity.MEDIUM,
            channels=[AlertChannel.SLACK, AlertChannel.LOG],
            cooldown_minutes=30,
            message_template="Resource usage at {resource_usage:.1%}"
        ))

        # Agent stuck
        self.add_rule(AlertRule(
            name="agent_stuck",
            condition=lambda m: m.get("loop_detected", False),
            severity=AlertSeverity.HIGH,
            channels=[AlertChannel.SLACK, AlertChannel.LOG],
            cooldown_minutes=5,
            message_template="Agent {agent_id} appears stuck in loop"
        ))

    def add_rule(self, rule: AlertRule):
        """Add an alert rule, replacing any existing rule with the same name."""
        self.rules[rule.name] = rule

    def check_metrics(self, metrics: dict):
        """Check metrics against all rules.

        A failure while evaluating or firing one rule is printed and does not
        stop the remaining rules from being checked.
        """
        for rule_name, rule in self.rules.items():
            try:
                if rule.condition(metrics):
                    self._maybe_fire_alert(rule, metrics)
            except Exception as e:  # rule callables are user-supplied
                print(f"Error checking rule {rule_name}: {e}")

    def _maybe_fire_alert(self, rule: AlertRule, metrics: dict):
        """Fire alert if not in cooldown."""
        import uuid
        from datetime import timedelta

        now = datetime.now()
        last_time = self.last_alert_time.get(rule.name)

        # Check cooldown
        if last_time is not None:
            cooldown = timedelta(minutes=rule.cooldown_minutes)
            if now - last_time < cooldown:
                return  # Still in cooldown

        # Create alert
        alert = Alert(
            id=str(uuid.uuid4()),
            rule_name=rule.name,
            severity=rule.severity,
            message=self._render_message(rule, metrics),
            timestamp=now,
            data=dict(metrics),  # snapshot: later caller edits must not alter the alert
        )

        self.active_alerts.append(alert)
        self.last_alert_time[rule.name] = now

        # Send through channels; one failing channel must not block the rest.
        for channel in rule.channels:
            handler = self.channel_handlers.get(channel)
            if handler is None:
                continue
            try:
                handler(alert)
            except Exception as e:
                channel_name = getattr(channel, "value", channel)
                print(f"Error sending alert {alert.id} via {channel_name}: {e}")

    @staticmethod
    def _render_message(rule: AlertRule, metrics: dict) -> str:
        """Fill the rule's template from ``metrics``, with a safe fallback.

        A missing placeholder or an incompatible value must not stop the
        alert from firing, so a generic message is used when the template is
        empty or cannot be rendered.
        """
        try:
            rendered = rule.message_template.format(**metrics)
        except (KeyError, IndexError, ValueError, TypeError):
            rendered = ""
        return rendered or f"Alert '{rule.name}' triggered (metrics: {metrics})"

    def acknowledge(self, alert_id: str, acknowledged_by: str):
        """Acknowledge an alert and move it from active alerts to history.

        Returns ``True`` if an active alert with ``alert_id`` was found,
        ``False`` otherwise. ``acknowledged_by`` is currently not recorded.
        """
        for alert in self.active_alerts:
            if alert.id == alert_id:
                alert.acknowledged = True
                self.alert_history.append(alert)
                self.active_alerts.remove(alert)
                return True
        return False

    def _log_alert(self, alert: Alert):
        """Log alert to console/file."""
        print(f"[{alert.severity.value.upper()}] {alert.message}")

    def _slack_alert(self, alert: Alert):
        """Send alert to Slack."""
        # Implementation would use Slack API
        pass

    def _email_alert(self, alert: Alert):
        """Send alert via email."""
        # Implementation would use email service
        pass

    def _pagerduty_alert(self, alert: Alert):
        """Send alert to PagerDuty."""
        # Implementation would use PagerDuty API
        pass
190
191
# Usage
alert_manager = AlertManager()
194
195# Check some metrics
196alert_manager.check_metrics({
197 "error_rate": 0.15,
198 "resource_usage": 0.75,
199 "agent_id": "agent_001"
200})
Audit Trails
Comprehensive Audit Logging
🐍python
"""
Audit Trail System

Maintain immutable records of:
1. All agent actions
2. Decision rationale
3. Human interventions
4. Policy changes
5. Access events
"""
11
12from dataclasses import dataclass, field
13from datetime import datetime
14from typing import Any
15import hashlib
16import json
17
18
@dataclass
class AuditEntry:
    """Tamper-evident audit log entry.

    The instance itself is mutable. Integrity comes from ``hash``, a SHA-256
    digest over every other field, which ``verify()`` recomputes. If ``hash``
    is empty at construction it is computed in ``__post_init__``.

    NOTE: the digest covers ``id`` and ``actor_type`` and uses a canonical
    JSON encoding, so hashes produced by an earlier concatenation-based
    scheme will not verify and must be recomputed.
    """

    id: str
    timestamp: datetime
    event_type: str
    actor: str  # Agent ID or user ID
    actor_type: str  # "agent", "user", "system"
    action: str
    resource: str
    details: dict
    outcome: str
    previous_hash: str
    hash: str = ""

    def __post_init__(self):
        if not self.hash:
            self.hash = self._compute_hash()

    def _compute_hash(self) -> str:
        """Compute hash for integrity verification.

        All fields except ``hash`` are serialized as one JSON object with
        sorted keys. This keeps field boundaries unambiguous and makes the
        digest independent of the key order of ``details``.
        """
        payload = {
            "id": self.id,
            "timestamp": self.timestamp.isoformat(),
            "event_type": self.event_type,
            "actor": self.actor,
            "actor_type": self.actor_type,
            "action": self.action,
            "resource": self.resource,
            "details": self.details,
            "outcome": self.outcome,
            "previous_hash": self.previous_hash,
        }
        # default=str is deliberate: hashing only needs stable text, and an
        # entry must not become unloggable because details holds e.g. a datetime.
        canonical = json.dumps(
            payload, sort_keys=True, separators=(",", ":"), default=str
        )
        return hashlib.sha256(canonical.encode("utf-8")).hexdigest()

    def verify(self) -> bool:
        """Verify entry integrity by recomputing the hash from current field values."""
        return self.hash == self._compute_hash()
48
49
class AuditTrail:
    """Manage audit trail with integrity verification.

    Entries form a hash chain. Each entry stores the previous entry's hash,
    the first one stores ``"genesis"``, and ``_last_hash`` tracks the current
    chain head.
    """

    # Outcomes reported under "failed_actions". "blocked" is included because
    # refused access attempts are recorded with that outcome.
    _FAILURE_OUTCOMES = frozenset({"failed", "denied", "error", "blocked"})

    def __init__(self, storage_backend=None):
        self.entries: list[AuditEntry] = []
        self.storage = storage_backend
        self._last_hash = "genesis"

    def log(
        self,
        event_type: str,
        actor: str,
        actor_type: str,
        action: str,
        resource: str,
        details: dict,
        outcome: str
    ) -> AuditEntry:
        """Create an audit log entry, append it to the chain and persist it.

        The entry is passed to ``storage.save`` when a storage backend was
        configured.
        """
        import copy
        import uuid

        entry = AuditEntry(
            id=str(uuid.uuid4()),
            timestamp=datetime.now(),
            event_type=event_type,
            actor=actor,
            actor_type=actor_type,
            action=action,
            resource=resource,
            # Snapshot the details: if the caller mutated the same dict later,
            # the entry would stop matching its own hash.
            details=copy.deepcopy(details),
            outcome=outcome,
            previous_hash=self._last_hash
        )

        self.entries.append(entry)
        self._last_hash = entry.hash

        # Persist if storage configured
        if self.storage:
            self.storage.save(entry)

        return entry

    def verify_chain(self) -> tuple[bool, list[str]]:
        """Verify the entire audit chain integrity.

        Returns ``(is_valid, issues)``. Checks the genesis link, every entry's
        own hash, the link between consecutive entries, and that the last
        entry matches the recorded chain head (so removed tail entries are
        noticed).
        """
        issues = []

        if not self.entries:
            if self._last_hash != "genesis":
                issues.append("Chain head recorded but no entries present")
            return len(issues) == 0, issues

        # Verify first entry
        if self.entries[0].previous_hash != "genesis":
            issues.append("First entry has invalid previous hash")

        # Verify each entry
        expected_previous = None
        for entry in self.entries:
            # Verify entry hash
            if not entry.verify():
                issues.append(f"Entry {entry.id} failed hash verification")

            # Verify chain continuity
            if expected_previous is not None and entry.previous_hash != expected_previous:
                issues.append(f"Chain break at entry {entry.id}")
            expected_previous = entry.hash

        # Verify the tail against the recorded head
        if self.entries[-1].hash != self._last_hash:
            issues.append("Last entry does not match recorded chain head")

        return len(issues) == 0, issues

    def query(
        self,
        start_time: datetime | None = None,
        end_time: datetime | None = None,
        actor: str | None = None,
        event_type: str | None = None,
        action: str | None = None
    ) -> list[AuditEntry]:
        """Query audit entries with filters.

        A filter left as ``None`` is not applied; time bounds are inclusive.
        Always returns a new list, so callers cannot modify the trail through
        the result.
        """
        def matches(entry) -> bool:
            if start_time is not None and entry.timestamp < start_time:
                return False
            if end_time is not None and entry.timestamp > end_time:
                return False
            if actor is not None and entry.actor != actor:
                return False
            if event_type is not None and entry.event_type != event_type:
                return False
            if action is not None and entry.action != action:
                return False
            return True

        return [entry for entry in self.entries if matches(entry)]

    def generate_report(
        self,
        start_time: datetime,
        end_time: datetime
    ) -> dict:
        """Generate an audit report for a time period.

        The report holds counts by actor type, event type and outcome, plus
        the entries whose event type starts with ``"security"`` and the
        entries whose outcome is a failure.
        """
        entries = self.query(start_time=start_time, end_time=end_time)

        report = {
            "period_start": start_time.isoformat(),
            "period_end": end_time.isoformat(),
            "total_entries": len(entries),
            "by_actor_type": {},
            "by_event_type": {},
            "by_outcome": {},
            "security_events": [],
            "failed_actions": [],
        }

        for entry in entries:
            # Count by actor type, event type and outcome
            for bucket, value in (
                ("by_actor_type", entry.actor_type),
                ("by_event_type", entry.event_type),
                ("by_outcome", entry.outcome),
            ):
                report[bucket][value] = report[bucket].get(value, 0) + 1

            # Collect security events
            if entry.event_type.startswith("security"):
                report["security_events"].append(entry)

            # Collect failures
            if entry.outcome in self._FAILURE_OUTCOMES:
                report["failed_actions"].append(entry)

        return report
182
183
# Usage
audit = AuditTrail()

# Log some events
audit.log(
    event_type="action",
    actor="agent_001",
    actor_type="agent",
    action="read_file",
    resource="/data/input.txt",
    details={"bytes_read": 1024},
    outcome="success"
)

audit.log(
    event_type="security",
    actor="agent_001",
    actor_type="agent",
    action="access_denied",
    resource="/etc/passwd",
    details={"reason": "restricted_path"},
    outcome="blocked"
)

# Verify chain
valid, issues = audit.verify_chain()
210print(f"Chain valid: {valid}")
Key Takeaways
- Comprehensive logging captures actions, decisions, errors, and security events with appropriate retention.
- Anomaly detection identifies unusual patterns like loops, suspicious actions, and statistical outliers.
- Alerting systems route notifications based on severity through appropriate channels with cooldowns.
- Audit trails provide immutable, verifiable records for compliance and forensics.
- Observability enables safety - you can't secure what you can't see.
Next Section Preview: We'll bring everything together to build a comprehensive safe agent system.