Chapter 18
15 min read
Section 115 of 175

Monitoring and Alerting

Agent Safety and Guardrails

Introduction

Effective monitoring provides visibility into agent behavior, enabling early detection of problems and continuous improvement. This section covers comprehensive logging, anomaly detection, alerting systems, and audit trails for production agent systems.

Section Overview: We'll explore logging strategies, anomaly detection algorithms, alerting systems, and audit trail implementation for agent observability.

Logging Strategies

What to Log

| Category          | Data Points                | Retention |
| ----------------- | -------------------------- | --------- |
| Actions           | Type, parameters, result   | 30 days   |
| Decisions         | Context, options, choice   | 90 days   |
| Errors            | Type, stack trace, context | 1 year    |
| Security          | Auth, access, violations   | 2 years   |
| Performance       | Latency, tokens, costs     | 30 days   |
| User interactions | Inputs, outputs, feedback  | Varies    |
🐍python
1"""
2Comprehensive Agent Logging
3
4Log everything needed to:
51. Debug issues
62. Understand behavior
73. Audit actions
84. Improve performance
9"""
10
11import json
12from dataclasses import dataclass, field, asdict
13from datetime import datetime
14from enum import Enum
15from typing import Any
16import uuid
17
18
class LogLevel(Enum):
    """Severity of a log entry; mirrors the standard logging levels."""
    DEBUG = "debug"
    INFO = "info"
    WARNING = "warning"
    ERROR = "error"
    CRITICAL = "critical"
25
26
class LogCategory(Enum):
    """High-level category of a log entry (drives retention policy)."""
    ACTION = "action"
    DECISION = "decision"
    ERROR = "error"
    SECURITY = "security"
    PERFORMANCE = "performance"
    USER = "user"
34
35
@dataclass
class AgentLogEntry:
    """One structured record of agent activity.

    Carries identity (agent/session/trace ids), a category and severity,
    a human-readable message, and arbitrary structured payload data.
    """
    timestamp: datetime
    agent_id: str
    session_id: str
    category: LogCategory
    level: LogLevel
    event_type: str
    message: str
    data: dict = field(default_factory=dict)
    trace_id: str = field(default_factory=lambda: str(uuid.uuid4()))

    def to_dict(self) -> dict:
        """Flatten the entry into a JSON-serializable dict.

        Enums are reduced to their string values and the timestamp to its
        ISO-8601 form; key order matches the field order for stable output.
        """
        serialized: dict = {}
        serialized["timestamp"] = self.timestamp.isoformat()
        serialized["agent_id"] = self.agent_id
        serialized["session_id"] = self.session_id
        serialized["category"] = self.category.value
        serialized["level"] = self.level.value
        serialized["event_type"] = self.event_type
        serialized["message"] = self.message
        serialized["data"] = self.data
        serialized["trace_id"] = self.trace_id
        return serialized
61
62
class AgentLogger:
    """Comprehensive structured logging for agent systems.

    Every log call builds an ``AgentLogEntry`` and fans it out to each
    callable registered in ``log_handlers`` (e.g. a console printer, a
    file writer, a log-shipping client).
    """

    # Substrings that mark a parameter name as sensitive; matching values
    # are redacted before they are logged.
    SENSITIVE_KEYS = ("password", "token", "key", "secret", "credential")

    def __init__(self, agent_id: str, session_id: str | None = None):
        self.agent_id = agent_id
        # A fresh session id is generated when the caller supplies none.
        self.session_id = session_id or str(uuid.uuid4())
        # Callables invoked with each AgentLogEntry; append to subscribe.
        self.log_handlers: list = []

    def log(
        self,
        category: LogCategory,
        level: LogLevel,
        event_type: str,
        message: str,
        **data
    ):
        """Create a log entry and dispatch it to all registered handlers."""
        entry = AgentLogEntry(
            timestamp=datetime.now(),
            agent_id=self.agent_id,
            session_id=self.session_id,
            category=category,
            level=level,
            event_type=event_type,
            message=message,
            data=data
        )

        for handler in self.log_handlers:
            handler(entry)

    # Convenience methods
    def log_action(
        self,
        action_type: str,
        parameters: dict,
        result: Any,
        success: bool
    ):
        """Log an agent action; failures are logged at WARNING level."""
        self.log(
            category=LogCategory.ACTION,
            level=LogLevel.INFO if success else LogLevel.WARNING,
            event_type="action_executed",
            message=f"Action {action_type}: {'success' if success else 'failed'}",
            action_type=action_type,
            parameters=self._sanitize_params(parameters),
            result=str(result)[:1000],  # Truncate to bound entry size
            success=success
        )

    def log_decision(
        self,
        decision_type: str,
        options: list[str],
        chosen: str,
        confidence: float,
        reasoning: str
    ):
        """Log a decision made by the agent, with truncated rationale."""
        self.log(
            category=LogCategory.DECISION,
            level=LogLevel.INFO,
            event_type="decision_made",
            message=f"Decision: {decision_type} -> {chosen}",
            decision_type=decision_type,
            options=options,
            chosen=chosen,
            confidence=confidence,
            reasoning=reasoning[:500]
        )

    def log_error(
        self,
        error_type: str,
        error_message: str,
        stack_trace: str | None = None,
        context: dict | None = None
    ):
        """Log an error with optional stack trace and context."""
        self.log(
            category=LogCategory.ERROR,
            level=LogLevel.ERROR,
            event_type="error_occurred",
            message=f"Error: {error_type}",
            error_type=error_type,
            error_message=error_message,
            stack_trace=stack_trace,
            context=context or {}
        )

    def log_security_event(
        self,
        event_type: str,
        severity: str,
        details: dict
    ):
        """Log a security-related event; "high" severity logs as CRITICAL."""
        level = LogLevel.CRITICAL if severity == "high" else LogLevel.WARNING
        self.log(
            category=LogCategory.SECURITY,
            level=level,
            event_type=event_type,
            message=f"Security event: {event_type}",
            severity=severity,
            **details
        )

    def _sanitize_params(self, params: dict) -> dict:
        """Return a copy of ``params`` with sensitive values redacted.

        Recurses into nested dicts and into dicts inside lists, so a
        secret buried in a nested payload is redacted too (the previous
        implementation only checked top-level keys).
        """
        sanitized: dict = {}
        for key, value in params.items():
            if any(s in key.lower() for s in self.SENSITIVE_KEYS):
                sanitized[key] = "[REDACTED]"
            elif isinstance(value, dict):
                sanitized[key] = self._sanitize_params(value)
            elif isinstance(value, list):
                sanitized[key] = [
                    self._sanitize_params(item) if isinstance(item, dict) else item
                    for item in value
                ]
            else:
                sanitized[key] = value
        return sanitized
181
182
# Usage
logger = AgentLogger("agent_001")

# Add handlers: each handler is a callable taking one AgentLogEntry.
# Here entries are pretty-printed to stdout as JSON.
logger.log_handlers.append(lambda e: print(json.dumps(e.to_dict(), indent=2)))

# Log an action; parameters pass through sanitization before logging.
logger.log_action(
    action_type="search",
    parameters={"query": "AI agents"},
    result={"count": 10},
    success=True
)

Anomaly Detection

Detecting Abnormal Behavior

🐍python
1"""
2Anomaly Detection for Agent Behavior
3
4Detect unusual patterns that may indicate:
51. Compromised agent
62. Goal drift
73. Infinite loops
84. Resource abuse
95. Policy violations
10"""
11
12from dataclasses import dataclass
13from datetime import datetime, timedelta
14from typing import Any
15import statistics
16
17
@dataclass
class AnomalyDetection:
    """Result of anomaly detection."""
    is_anomaly: bool
    anomaly_type: str  # e.g. "infinite_loop", "suspicious_action", "statistical_anomaly"
    severity: float  # 0-1
    description: str  # human-readable explanation of what was detected
    recommended_action: str  # e.g. "pause_and_review", "require_approval", "investigate"
26
27
class AnomalyDetector:
    """Detect anomalies in agent behavior.

    Keeps a bounded rolling history of actions and metrics, and flags
    loops, suspicious action types, and statistical outliers relative to
    a baseline.
    """

    # Cap in-memory history so a long-running agent cannot grow it
    # without bound (the previous implementation leaked memory).
    MAX_HISTORY = 1000

    def __init__(self):
        self.action_history: list[dict] = []
        self.metrics_history: list[dict] = []

        # Baseline statistics (would be learned in production)
        self.baseline_metrics = {
            "actions_per_minute": {"mean": 5, "std": 2},
            "api_calls_per_task": {"mean": 10, "std": 5},
            "tokens_per_action": {"mean": 1000, "std": 500},
            "error_rate": {"mean": 0.05, "std": 0.02},
        }

    def check_action(self, action: dict) -> AnomalyDetection | None:
        """Check a single action for anomalies; None means it looks normal."""
        self.action_history.append(action)
        self._trim(self.action_history)

        # Check for loops
        loop_detection = self._detect_loop()
        if loop_detection:
            return loop_detection

        # Check for unusual action types
        type_anomaly = self._detect_unusual_action_type(action)
        if type_anomaly:
            return type_anomaly

        return None

    def check_metrics(self, metrics: dict) -> AnomalyDetection | None:
        """Check metrics for statistical anomalies against the baseline."""
        self.metrics_history.append({
            "timestamp": datetime.now(),
            **metrics
        })
        self._trim(self.metrics_history)

        for metric_name, value in metrics.items():
            if metric_name in self.baseline_metrics:
                anomaly = self._check_statistical_anomaly(
                    metric_name, value
                )
                if anomaly:
                    return anomaly

        return None

    def _trim(self, history: list) -> None:
        """Drop the oldest records once a history list exceeds MAX_HISTORY."""
        if len(history) > self.MAX_HISTORY:
            del history[: len(history) - self.MAX_HISTORY]

    def _detect_loop(self) -> AnomalyDetection | None:
        """Detect if the agent is stuck in a loop over its last 5 actions."""
        if len(self.action_history) < 5:
            return None

        # Check last 5 actions
        recent = self.action_history[-5:]
        action_types = [a.get("type") for a in recent]

        # All same type
        if len(set(action_types)) == 1:
            return AnomalyDetection(
                is_anomaly=True,
                anomaly_type="infinite_loop",
                severity=0.8,
                description=f"Agent repeated '{action_types[0]}' 5 times",
                recommended_action="pause_and_review"
            )

        # Same action with same parameters. str() guards against a
        # non-string "input" payload (dict, int, ...) which is not
        # sliceable and previously raised TypeError here.
        action_keys = [
            f"{a.get('type')}:{str(a.get('input', ''))[:50]}"
            for a in recent
        ]
        if len(set(action_keys)) <= 2:
            return AnomalyDetection(
                is_anomaly=True,
                anomaly_type="stuck_pattern",
                severity=0.6,
                description="Agent stuck in repetitive pattern",
                recommended_action="inject_guidance"
            )

        return None

    def _detect_unusual_action_type(
        self,
        action: dict
    ) -> AnomalyDetection | None:
        """Flag destructive or high-impact action types for approval."""
        suspicious_actions = [
            "delete", "remove", "drop", "truncate",
            "send_email", "transfer", "payment"
        ]

        action_type = action.get("type", "").lower()
        for suspicious in suspicious_actions:
            if suspicious in action_type:
                return AnomalyDetection(
                    is_anomaly=True,
                    anomaly_type="suspicious_action",
                    severity=0.7,
                    description=f"Suspicious action attempted: {action_type}",
                    recommended_action="require_approval"
                )

        return None

    def _check_statistical_anomaly(
        self,
        metric_name: str,
        value: float
    ) -> AnomalyDetection | None:
        """Check if a metric value is statistically anomalous (|z| > 3)."""
        baseline = self.baseline_metrics[metric_name]
        mean = baseline["mean"]
        std = baseline["std"]

        # Z-score calculation (zero std means no spread to compare against)
        z_score = abs(value - mean) / std if std > 0 else 0

        # Anomaly if more than 3 standard deviations
        if z_score > 3:
            return AnomalyDetection(
                is_anomaly=True,
                anomaly_type="statistical_anomaly",
                severity=min(z_score / 5, 1.0),  # saturates at z = 5
                description=f"{metric_name} is {z_score:.1f} std devs from normal",
                recommended_action="investigate"
            )

        return None
158
159
class BehaviorProfiler:
    """Build and compare behavior profiles.

    ``history`` is a list of per-action records. The helpers read these
    keys when present (records missing a key are skipped):
      - "type": action type, used for the action distribution
      - "session_length": numeric session duration  # NOTE(review): key name assumed — confirm against producer
      - "actions_count": actions taken for the task  # NOTE(review): key name assumed — confirm against producer
      - "error": truthy when the action failed

    The previous version left the four statistic helpers as ``pass``
    stubs, so profiles stored None and ``compare_to_normal`` crashed on
    ``None.items()``; they are now implemented.
    """

    def __init__(self):
        self.normal_profile: dict = {}
        self.current_session_profile: dict = {}

    def update_normal_profile(self, agent_id: str, history: list[dict]):
        """Update what 'normal' looks like for this agent."""
        profile = {
            "common_actions": self._get_action_distribution(history),
            "avg_session_length": self._avg_session_length(history),
            "avg_actions_per_task": self._avg_actions_per_task(history),
            "typical_error_rate": self._error_rate(history),
        }
        self.normal_profile[agent_id] = profile

    def compare_to_normal(
        self,
        agent_id: str,
        current_behavior: dict
    ) -> float:
        """Compare current behavior to the normal profile.

        Returns a deviation score in [0, 1]; 0.0 when no baseline exists.
        """
        if agent_id not in self.normal_profile:
            return 0.0  # No baseline

        normal = self.normal_profile[agent_id]
        deviation = 0.0

        # Compare action frequency distributions: sum of absolute
        # differences, capped at 1.0.
        normal_actions = normal.get("common_actions", {})
        current_actions = current_behavior.get("actions", {})

        for action, normal_pct in normal_actions.items():
            current_pct = current_actions.get(action, 0)
            deviation += abs(normal_pct - current_pct)

        return min(deviation, 1.0)

    def _get_action_distribution(self, history: list[dict]) -> dict:
        """Return {action_type: fraction of all typed actions}."""
        counts: dict = {}
        for record in history:
            action_type = record.get("type")
            if action_type is not None:
                counts[action_type] = counts.get(action_type, 0) + 1
        total = sum(counts.values())
        if total == 0:
            return {}
        return {t: c / total for t, c in counts.items()}

    def _avg_session_length(self, history: list[dict]) -> float:
        """Average of the "session_length" values present; 0.0 if none."""
        values = [r["session_length"] for r in history if "session_length" in r]
        return sum(values) / len(values) if values else 0.0

    def _avg_actions_per_task(self, history: list[dict]) -> float:
        """Average of the "actions_count" values present; 0.0 if none."""
        values = [r["actions_count"] for r in history if "actions_count" in r]
        return sum(values) / len(values) if values else 0.0

    def _error_rate(self, history: list[dict]) -> float:
        """Fraction of records flagged with a truthy "error" field."""
        if not history:
            return 0.0
        errors = sum(1 for r in history if r.get("error"))
        return errors / len(history)

Alerting Systems

Alert Configuration

🐍python
1"""
2Alerting System for Agent Safety
3
4Configure alerts for:
51. Security incidents
62. Performance degradation
73. Error thresholds
84. Anomalies
95. Resource limits
10"""
11
12from dataclasses import dataclass
13from datetime import datetime
14from enum import Enum
15from typing import Callable
16
17
class AlertSeverity(Enum):
    """Urgency of an alert; drives which channels a rule notifies."""
    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"
    CRITICAL = "critical"
23
24
class AlertChannel(Enum):
    """Delivery channel through which an alert is sent."""
    EMAIL = "email"
    SLACK = "slack"
    PAGERDUTY = "pagerduty"
    WEBHOOK = "webhook"
    LOG = "log"
31
32
@dataclass
class AlertRule:
    """Definition of an alert rule."""
    name: str  # unique key; used for cooldown tracking
    condition: Callable[[dict], bool]  # receives the metrics dict; True fires the rule
    severity: AlertSeverity
    channels: list[AlertChannel]  # where fired alerts are delivered
    cooldown_minutes: int = 5  # minimum minutes between repeat alerts (0 = always fire)
    message_template: str = ""  # str.format template filled from the metrics dict
42
43
@dataclass
class Alert:
    """An alert instance fired by a rule."""
    id: str  # unique id (uuid4)
    rule_name: str  # name of the AlertRule that fired
    severity: AlertSeverity
    message: str  # rendered from the rule's message_template
    timestamp: datetime
    data: dict  # snapshot of the metrics that triggered the rule
    acknowledged: bool = False  # set via AlertManager.acknowledge()
54
55
class AlertManager:
    """Manage alerts for agent systems.

    Rules are evaluated against metric dicts; when a rule's condition is
    true and its cooldown has elapsed, an Alert is created, recorded as
    active, and dispatched through each of the rule's channels.
    """

    def __init__(self):
        self.rules: dict[str, AlertRule] = {}
        self.active_alerts: list[Alert] = []
        self.alert_history: list[Alert] = []  # acknowledged alerts
        self.last_alert_time: dict[str, datetime] = {}

        # Channel handlers: map each channel to the callable that delivers it.
        self.channel_handlers: dict[AlertChannel, Callable] = {
            AlertChannel.LOG: self._log_alert,
            AlertChannel.SLACK: self._slack_alert,
            AlertChannel.EMAIL: self._email_alert,
            AlertChannel.PAGERDUTY: self._pagerduty_alert,
        }

        self._setup_default_rules()

    def _setup_default_rules(self):
        """Set up default alerting rules."""

        # High error rate
        self.add_rule(AlertRule(
            name="high_error_rate",
            condition=lambda m: m.get("error_rate", 0) > 0.1,
            severity=AlertSeverity.HIGH,
            channels=[AlertChannel.SLACK, AlertChannel.LOG],
            cooldown_minutes=10,
            message_template="Error rate is {error_rate:.1%}, exceeds 10% threshold"
        ))

        # Security violation
        self.add_rule(AlertRule(
            name="security_violation",
            condition=lambda m: m.get("security_violation", False),
            severity=AlertSeverity.CRITICAL,
            channels=[AlertChannel.PAGERDUTY, AlertChannel.SLACK, AlertChannel.LOG],
            cooldown_minutes=0,  # Always alert
            message_template="Security violation detected: {violation_type}"
        ))

        # Resource limit approaching
        self.add_rule(AlertRule(
            name="resource_limit_warning",
            condition=lambda m: m.get("resource_usage", 0) > 0.8,
            severity=AlertSeverity.MEDIUM,
            channels=[AlertChannel.SLACK, AlertChannel.LOG],
            cooldown_minutes=30,
            message_template="Resource usage at {resource_usage:.1%}"
        ))

        # Agent stuck
        self.add_rule(AlertRule(
            name="agent_stuck",
            condition=lambda m: m.get("loop_detected", False),
            severity=AlertSeverity.HIGH,
            channels=[AlertChannel.SLACK, AlertChannel.LOG],
            cooldown_minutes=5,
            message_template="Agent {agent_id} appears stuck in loop"
        ))

    def add_rule(self, rule: AlertRule):
        """Add (or replace) an alert rule, keyed by its name."""
        self.rules[rule.name] = rule

    def check_metrics(self, metrics: dict):
        """Check metrics against all rules; one broken rule never blocks the rest."""
        for rule_name, rule in self.rules.items():
            try:
                if rule.condition(metrics):
                    self._maybe_fire_alert(rule, metrics)
            except Exception as e:
                print(f"Error checking rule {rule_name}: {e}")

    def _maybe_fire_alert(self, rule: AlertRule, metrics: dict):
        """Fire the alert unless the rule is inside its cooldown window."""
        from datetime import timedelta
        import uuid

        now = datetime.now()
        last_time = self.last_alert_time.get(rule.name)

        # Check cooldown (cooldown_minutes=0 means the rule always fires).
        if last_time:
            cooldown = timedelta(minutes=rule.cooldown_minutes)
            if now - last_time < cooldown:
                return  # Still in cooldown

        alert = Alert(
            id=str(uuid.uuid4()),
            rule_name=rule.name,
            severity=rule.severity,
            message=self._render_message(rule, metrics),
            timestamp=now,
            data=metrics
        )

        self.active_alerts.append(alert)
        self.last_alert_time[rule.name] = now

        # Send through channels
        for channel in rule.channels:
            handler = self.channel_handlers.get(channel)
            if handler:
                handler(alert)

    def _render_message(self, rule: AlertRule, metrics: dict) -> str:
        """Render the rule's message template from the metrics dict.

        A placeholder missing from the metrics (e.g. the security rule's
        {violation_type}) previously raised KeyError, which the broad
        except in check_metrics swallowed -- silently dropping a CRITICAL
        alert. Formatting errors now fall back to the raw template text.
        """
        try:
            return rule.message_template.format(**metrics)
        except (KeyError, IndexError, ValueError):
            return rule.message_template or f"Alert rule '{rule.name}' triggered"

    def acknowledge(self, alert_id: str, acknowledged_by: str):
        """Acknowledge an alert, moving it from active to history."""
        for alert in self.active_alerts:
            if alert.id == alert_id:
                alert.acknowledged = True
                self.alert_history.append(alert)
                self.active_alerts.remove(alert)
                break

    def _log_alert(self, alert: Alert):
        """Log alert to console/file."""
        print(f"[{alert.severity.value.upper()}] {alert.message}")

    def _slack_alert(self, alert: Alert):
        """Send alert to Slack."""
        # Implementation would use Slack API
        pass

    def _email_alert(self, alert: Alert):
        """Send alert via email."""
        # Implementation would use email service
        pass

    def _pagerduty_alert(self, alert: Alert):
        """Send alert to PagerDuty."""
        # Implementation would use PagerDuty API
        pass
190
191
# Usage
alert_manager = AlertManager()

# Check some metrics: error_rate 15% exceeds the 10% threshold, so the
# "high_error_rate" rule fires; resource_usage 75% stays under its 80% limit.
alert_manager.check_metrics({
    "error_rate": 0.15,
    "resource_usage": 0.75,
    "agent_id": "agent_001"
})

Audit Trails

Comprehensive Audit Logging

🐍python
1"""
2Audit Trail System
3
4Maintain immutable records of:
51. All agent actions
62. Decision rationale
73. Human interventions
84. Policy changes
95. Access events
10"""
11
12from dataclasses import dataclass, field
13from datetime import datetime
14from typing import Any
15import hashlib
16import json
17
18
@dataclass
class AuditEntry:
    """Immutable audit log entry, hash-chained to its predecessor.

    ``hash`` covers the entry content plus ``previous_hash``, so tampering
    with a stored entry -- or reordering the chain -- is detectable via
    ``verify()``. Note that ``id`` is not part of the hashed content.
    """
    id: str
    timestamp: datetime
    event_type: str
    actor: str  # Agent ID or user ID
    actor_type: str  # "agent", "user", "system"
    action: str
    resource: str
    details: dict
    outcome: str
    previous_hash: str
    hash: str = ""

    def __post_init__(self):
        # Compute the hash once at creation unless one was supplied
        # (e.g. when rehydrating a persisted entry).
        if not self.hash:
            self.hash = self._compute_hash()

    def _compute_hash(self) -> str:
        """Compute a SHA-256 digest of the entry content for integrity checks.

        sort_keys makes the digest independent of dict insertion order,
        so logically-equal ``details`` always hash identically (previously
        {"a": 1, "b": 2} and {"b": 2, "a": 1} produced different hashes).
        """
        content = f"{self.timestamp.isoformat()}{self.event_type}{self.actor}"
        content += f"{self.action}{self.resource}{json.dumps(self.details, sort_keys=True)}"
        content += f"{self.outcome}{self.previous_hash}"
        return hashlib.sha256(content.encode()).hexdigest()

    def verify(self) -> bool:
        """Return True if the stored hash matches a fresh recomputation."""
        return self.hash == self._compute_hash()
48
49
class AuditTrail:
    """Manage an append-only audit trail with hash-chain integrity checks."""

    def __init__(self, storage_backend=None):
        self.entries: list[AuditEntry] = []
        self.storage = storage_backend
        # Hash of the most recent entry; the chain anchor is "genesis".
        self._last_hash = "genesis"

    def log(
        self,
        event_type: str,
        actor: str,
        actor_type: str,
        action: str,
        resource: str,
        details: dict,
        outcome: str
    ) -> AuditEntry:
        """Append a new entry, chained to the previous one, and return it."""
        import uuid

        new_entry = AuditEntry(
            id=str(uuid.uuid4()),
            timestamp=datetime.now(),
            event_type=event_type,
            actor=actor,
            actor_type=actor_type,
            action=action,
            resource=resource,
            details=details,
            outcome=outcome,
            previous_hash=self._last_hash
        )

        self.entries.append(new_entry)
        self._last_hash = new_entry.hash

        # Persist if storage configured
        if self.storage:
            self.storage.save(new_entry)

        return new_entry

    def verify_chain(self) -> tuple[bool, list[str]]:
        """Verify the whole chain; returns (ok, list of problems found)."""
        problems: list[str] = []

        if not self.entries:
            return True, problems

        # The first entry must anchor to the genesis marker.
        if self.entries[0].previous_hash != "genesis":
            problems.append("First entry has invalid previous hash")

        for idx, current in enumerate(self.entries):
            # Each entry's stored hash must match a recomputation.
            if not current.verify():
                problems.append(f"Entry {current.id} failed hash verification")

            # Each entry must point at its predecessor's hash.
            if idx > 0 and current.previous_hash != self.entries[idx - 1].hash:
                problems.append(f"Chain break at entry {current.id}")

        return not problems, problems

    def query(
        self,
        start_time: datetime | None = None,
        end_time: datetime | None = None,
        actor: str | None = None,
        event_type: str | None = None,
        action: str | None = None
    ) -> list[AuditEntry]:
        """Return entries matching every supplied (truthy) filter."""
        filter_table = [
            (start_time, lambda e: e.timestamp >= start_time),
            (end_time, lambda e: e.timestamp <= end_time),
            (actor, lambda e: e.actor == actor),
            (event_type, lambda e: e.event_type == event_type),
            (action, lambda e: e.action == action),
        ]

        results = self.entries
        for filter_value, keep in filter_table:
            if filter_value:
                results = [e for e in results if keep(e)]

        return results

    def generate_report(
        self,
        start_time: datetime,
        end_time: datetime
    ) -> dict:
        """Summarize audit activity for a time period."""
        selected = self.query(start_time=start_time, end_time=end_time)

        def tally(bucket: dict, key: str) -> None:
            # Increment one counter inside the report.
            bucket[key] = bucket.get(key, 0) + 1

        report = {
            "period_start": start_time.isoformat(),
            "period_end": end_time.isoformat(),
            "total_entries": len(selected),
            "by_actor_type": {},
            "by_event_type": {},
            "by_outcome": {},
            "security_events": [],
            "failed_actions": [],
        }

        for item in selected:
            tally(report["by_actor_type"], item.actor_type)
            tally(report["by_event_type"], item.event_type)
            tally(report["by_outcome"], item.outcome)

            # Collect security events
            if item.event_type.startswith("security"):
                report["security_events"].append(item)

            # Collect failures
            if item.outcome in ["failed", "denied", "error"]:
                report["failed_actions"].append(item)

        return report
182
183
# Usage
audit = AuditTrail()

# Log a successful agent action.
audit.log(
    event_type="action",
    actor="agent_001",
    actor_type="agent",
    action="read_file",
    resource="/data/input.txt",
    details={"bytes_read": 1024},
    outcome="success"
)

# Log a blocked security event (chained to the entry above).
audit.log(
    event_type="security",
    actor="agent_001",
    actor_type="agent",
    action="access_denied",
    resource="/etc/passwd",
    details={"reason": "restricted_path"},
    outcome="blocked"
)

# Verify chain integrity end-to-end.
valid, issues = audit.verify_chain()
print(f"Chain valid: {valid}")

Key Takeaways

  • Comprehensive logging captures actions, decisions, errors, and security events with appropriate retention.
  • Anomaly detection identifies unusual patterns like loops, suspicious actions, and statistical outliers.
  • Alerting systems route notifications based on severity through appropriate channels with cooldowns.
  • Audit trails provide immutable, verifiable records for compliance and forensics.
  • Observability enables safety - you can't secure what you can't see.
Next Section Preview: We'll bring everything together to build a comprehensive safe agent system.