Introduction
AI agents that can take actions in the real world introduce unique safety challenges. Unlike traditional chatbots that only generate text, agents can execute code, access files, make API calls, and interact with external systems. This power demands robust safety mechanisms.
Chapter Overview: This chapter covers comprehensive safety strategies for agentic systems, from input validation to output filtering, action boundaries, and human oversight mechanisms.
Why Safety Matters
The Agent Risk Landscape
| Risk Category | Example | Potential Impact |
|---|---|---|
| Data exposure | Agent leaks API keys | Security breach |
| Unintended actions | Agent deletes wrong files | Data loss |
| Resource abuse | Infinite API loop | Cost overrun |
| Prompt injection | Malicious instructions | System compromise |
| Scope creep | Agent exceeds boundaries | Unauthorized access |
| Hallucination | Agent acts on false info | Wrong decisions |
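One common way to triage risks like those in the table above is a simple likelihood × impact score. The sketch below is illustrative only — the numeric values are assumptions for demonstration, not measurements:

```python
# Minimal risk-triage sketch for a few of the categories above.
# Likelihood/impact values (0-1) are illustrative assumptions.
RISKS = {
    "data_exposure":     {"likelihood": 0.4, "impact": 0.9},
    "unintended_action": {"likelihood": 0.5, "impact": 0.8},
    "resource_abuse":    {"likelihood": 0.6, "impact": 0.5},
    "prompt_injection":  {"likelihood": 0.8, "impact": 0.7},
}

def risk_score(risk: dict) -> float:
    """Score a risk as likelihood * impact (both in [0, 1])."""
    return risk["likelihood"] * risk["impact"]

# Rank categories, highest risk first
ranked = sorted(RISKS, key=lambda name: risk_score(RISKS[name]), reverse=True)
```

Even with rough numbers, this kind of ranking helps decide which mitigations to build first — here, prompt injection scores highest despite not having the highest impact, because it is so likely to be attempted.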
```python
"""
Why Agent Safety is Critical

Traditional Software vs AI Agents:

Traditional Software:
- Deterministic behavior
- Explicit control flow
- Predictable outputs
- Static capabilities

AI Agents:
- Non-deterministic behavior
- Emergent decision-making
- Variable outputs
- Dynamic capabilities

The combination of autonomy + capability + unpredictability
creates unique safety challenges.
"""

from dataclasses import dataclass
from enum import Enum


class RiskLevel(Enum):
    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"
    CRITICAL = "critical"


@dataclass
class SafetyIncident:
    """Record of a safety-related incident."""
    incident_type: str
    severity: RiskLevel
    description: str
    root_cause: str
    mitigation: str


# Real-world incident examples
INCIDENT_EXAMPLES = [
    SafetyIncident(
        incident_type="data_exposure",
        severity=RiskLevel.CRITICAL,
        description="Agent included database credentials in API request",
        root_cause="No secret detection in output pipeline",
        mitigation="Add secret scanning to all outbound data"
    ),
    SafetyIncident(
        incident_type="unintended_action",
        severity=RiskLevel.HIGH,
        description="Agent deleted production database during cleanup",
        root_cause="Overly broad file system permissions",
        mitigation="Implement allowlist for file operations"
    ),
    SafetyIncident(
        incident_type="resource_abuse",
        severity=RiskLevel.MEDIUM,
        description="Agent made 10,000 API calls in retry loop",
        root_cause="No rate limiting or circuit breaker",
        mitigation="Add exponential backoff and limits"
    ),
    SafetyIncident(
        incident_type="prompt_injection",
        severity=RiskLevel.CRITICAL,
        description="Web page content hijacked agent behavior",
        root_cause="No content sanitization from external sources",
        mitigation="Sandbox external content, validate instructions"
    ),
]
```

Agent Threat Model
Attack Surfaces
```python
"""
Agent Threat Model

Attack vectors specific to AI agents:

1. INPUT ATTACKS
   - Prompt injection in user messages
   - Malicious content in retrieved documents
   - Adversarial data in tool responses

2. PROCESSING ATTACKS
   - Model jailbreaking attempts
   - Context manipulation
   - Hallucination exploitation

3. OUTPUT ATTACKS
   - Exfiltration via tool calls
   - Command injection in generated code
   - Social engineering through responses

4. ENVIRONMENTAL ATTACKS
   - Tool/API compromise
   - Supply chain attacks on dependencies
   - Infrastructure manipulation
"""

from dataclasses import dataclass


@dataclass
class ThreatVector:
    """Represents a potential attack vector."""
    name: str
    category: str
    description: str
    likelihood: float  # 0-1
    impact: float  # 0-1
    mitigations: list[str]

    @property
    def risk_score(self) -> float:
        """Calculate risk score."""
        return self.likelihood * self.impact


class AgentThreatModel:
    """Comprehensive threat model for agents."""

    def __init__(self):
        self.threats: list[ThreatVector] = []
        self._load_threats()

    def _load_threats(self):
        """Load known threat vectors."""
        self.threats = [
            ThreatVector(
                name="Direct Prompt Injection",
                category="input",
                description="Malicious instructions in user input",
                likelihood=0.8,
                impact=0.7,
                mitigations=[
                    "Input validation",
                    "Instruction hierarchy",
                    "Output verification"
                ]
            ),
            ThreatVector(
                name="Indirect Prompt Injection",
                category="input",
                description="Malicious content in external data",
                likelihood=0.6,
                impact=0.8,
                mitigations=[
                    "Content sanitization",
                    "Source verification",
                    "Sandboxed processing"
                ]
            ),
            ThreatVector(
                name="Tool Abuse",
                category="processing",
                description="Agent misuses tools beyond intended scope",
                likelihood=0.5,
                impact=0.9,
                mitigations=[
                    "Permission boundaries",
                    "Action allowlists",
                    "Human approval for sensitive ops"
                ]
            ),
            ThreatVector(
                name="Data Exfiltration",
                category="output",
                description="Sensitive data leaked through outputs",
                likelihood=0.4,
                impact=0.9,
                mitigations=[
                    "Output scanning",
                    "Secret detection",
                    "Data classification"
                ]
            ),
        ]

    def get_high_priority_threats(self) -> list[ThreatVector]:
        """Get threats with highest risk scores."""
        return sorted(
            self.threats,
            key=lambda t: t.risk_score,
            reverse=True
        )[:5]

    def get_mitigations_for_category(
        self,
        category: str
    ) -> list[str]:
        """Get all mitigations for a threat category."""
        mitigations = set()
        for threat in self.threats:
            if threat.category == category:
                mitigations.update(threat.mitigations)
        return list(mitigations)
```

Defense in Depth
Multi-Layer Safety Architecture
```python
"""
Defense in Depth for AI Agents

Multiple safety layers ensure that if one fails,
others provide protection:

Layer 1: INPUT VALIDATION
├── Sanitize user input
├── Validate tool responses
└── Filter external content

Layer 2: INSTRUCTION BOUNDARIES
├── System prompt protections
├── Role separation
└── Permission scoping

Layer 3: ACTION CONTROLS
├── Allowlists/blocklists
├── Rate limiting
└── Resource quotas

Layer 4: OUTPUT FILTERING
├── Content moderation
├── Secret scanning
└── Format validation

Layer 5: MONITORING
├── Anomaly detection
├── Audit logging
└── Alerting

Layer 6: HUMAN OVERSIGHT
├── Approval workflows
├── Kill switches
└── Escalation paths
"""

import re
from abc import ABC, abstractmethod
from typing import Any


class SafetyLayer(ABC):
    """Abstract base class for safety layers."""

    @abstractmethod
    def check(self, data: Any) -> tuple[bool, str]:
        """Check if data passes this safety layer."""
        pass


class InputValidationLayer(SafetyLayer):
    """Layer 1: Input validation and sanitization."""

    def __init__(self):
        self.blocked_patterns = [
            r"ignore (?:all )?(?:previous )?instructions",
            r"you are now",
            r"disregard (?:your )?(?:previous )?instructions",
            r"new persona",
        ]

    def check(self, data: str) -> tuple[bool, str]:
        for pattern in self.blocked_patterns:
            if re.search(pattern, data.lower()):
                return False, f"Blocked pattern detected: {pattern}"
        return True, "Input validation passed"


class ActionControlLayer(SafetyLayer):
    """Layer 3: Action boundaries and permissions."""

    def __init__(self, allowed_actions: list[str]):
        self.allowed_actions = set(allowed_actions)
        self.action_counts: dict[str, int] = {}
        self.rate_limits: dict[str, int] = {
            "file_write": 10,
            "api_call": 100,
            "code_execution": 5,
        }

    def check(self, action: dict) -> tuple[bool, str]:
        action_type = action.get("type", "")

        # Check allowlist
        if action_type not in self.allowed_actions:
            return False, f"Action not allowed: {action_type}"

        # Check rate limits
        self.action_counts[action_type] = (
            self.action_counts.get(action_type, 0) + 1
        )
        limit = self.rate_limits.get(action_type, 1000)
        if self.action_counts[action_type] > limit:
            return False, f"Rate limit exceeded for: {action_type}"

        return True, "Action approved"


class OutputFilterLayer(SafetyLayer):
    """Layer 4: Output filtering and moderation."""

    def __init__(self):
        self.secret_patterns = [
            r"(?:api[_-]?key|apikey)[=:]\s*['\"]?[\w-]+",
            r"(?:password|passwd|pwd)[=:]\s*['\"]?\S+",
            r"(?:secret|token)[=:]\s*['\"]?[\w-]+",
            r"(?:aws|azure|gcp)[_-]?(?:access|secret)[_-]?key",
        ]

    def check(self, data: str) -> tuple[bool, str]:
        for pattern in self.secret_patterns:
            if re.search(pattern, data.lower()):
                return False, "Potential secret detected in output"
        return True, "Output filter passed"


class SafetyPipeline:
    """Combines multiple safety layers."""

    def __init__(self):
        self.layers: list[SafetyLayer] = []

    def add_layer(self, layer: SafetyLayer):
        """Add a safety layer to the pipeline."""
        self.layers.append(layer)

    def check_all(self, data: Any) -> tuple[bool, list[str]]:
        """Run data through all safety layers."""
        results = []
        all_passed = True

        for layer in self.layers:
            passed, message = layer.check(data)
            results.append(f"{layer.__class__.__name__}: {message}")
            if not passed:
                all_passed = False

        return all_passed, results


# Example usage
pipeline = SafetyPipeline()
pipeline.add_layer(InputValidationLayer())
pipeline.add_layer(ActionControlLayer(["search", "read", "summarize"]))
pipeline.add_layer(OutputFilterLayer())
```

Core Safety Principles
Principle 1: Least Privilege
```python
"""
Principle 1: Least Privilege

Agents should only have the minimum permissions
required to accomplish their task.
"""

from dataclasses import dataclass


@dataclass
class Permission:
    """Represents a specific permission."""
    resource: str
    action: str
    scope: str


class LeastPrivilegeManager:
    """Enforce least privilege for agents."""

    def __init__(self):
        self.role_permissions: dict[str, list[Permission]] = {
            "researcher": [
                Permission("web", "read", "public_urls"),
                Permission("files", "read", "research_folder"),
            ],
            "writer": [
                Permission("files", "read", "content_folder"),
                Permission("files", "write", "drafts_folder"),
            ],
            "admin": [
                Permission("files", "read", "*"),
                Permission("files", "write", "*"),
                Permission("system", "execute", "safe_commands"),
            ],
        }

    def get_permissions(self, role: str) -> list[Permission]:
        """Get permissions for a role."""
        return self.role_permissions.get(role, [])

    def check_permission(
        self,
        role: str,
        resource: str,
        action: str,
        scope: str
    ) -> bool:
        """Check if role has required permission."""
        permissions = self.get_permissions(role)
        for perm in permissions:
            if (perm.resource == resource and
                    perm.action == action and
                    (perm.scope == "*" or perm.scope == scope)):
                return True
        return False
```

Principle 2: Fail Safe
```python
"""
Principle 2: Fail Safe

When in doubt, deny. Errors should result in
safe states, not dangerous ones.
"""


class FailSafeAgent:
    """Agent that fails safely."""

    def __init__(self):
        self.safe_mode = False
        self.error_count = 0
        self.error_threshold = 3

    def execute_action(self, action: dict) -> dict:
        """Execute action with fail-safe behavior."""
        try:
            # Check if in safe mode
            if self.safe_mode:
                return {
                    "success": False,
                    "error": "Agent in safe mode - manual reset required"
                }

            # Attempt action
            result = self._do_action(action)

            # Reset error count on success
            self.error_count = 0

            return result

        except Exception as e:
            self.error_count += 1

            # Enter safe mode after too many errors
            if self.error_count >= self.error_threshold:
                self.safe_mode = True
                self._alert_humans("Agent entered safe mode")

            # Return safe failure
            return {
                "success": False,
                "error": str(e),
                "safe_state": True
            }

    def _do_action(self, action: dict) -> dict:
        """Perform the actual action."""
        pass  # Implementation

    def _alert_humans(self, message: str):
        """Alert human operators."""
        pass  # Implementation

    def _verify_authorization(self, authorization: str) -> bool:
        """Verify that a reset request is authorized."""
        pass  # Implementation

    def reset_safe_mode(self, authorization: str):
        """Manually reset safe mode with authorization."""
        if self._verify_authorization(authorization):
            self.safe_mode = False
            self.error_count = 0
```

Principle 3: Defense in Depth
```python
"""
Principle 3: Defense in Depth

Never rely on a single safety mechanism.
Layer multiple independent controls.
"""


class DefenseInDepthAgent:
    """Agent with multiple safety layers."""

    def process_request(self, request: str) -> str:
        """Process request through multiple safety layers."""

        # Layer 1: Input validation
        if not self._validate_input(request):
            return "Request blocked by input validation"

        # Layer 2: Intent classification
        intent = self._classify_intent(request)
        if intent in ["harmful", "manipulation"]:
            return "Request blocked by intent classifier"

        # Layer 3: Permission check
        if not self._check_permissions(intent):
            return "Request blocked by permission check"

        # Layer 4: Execute with sandboxing
        result = self._sandboxed_execution(request)

        # Layer 5: Output filtering
        filtered_result = self._filter_output(result)

        # Layer 6: Audit logging
        self._audit_log(request, filtered_result)

        return filtered_result

    def _validate_input(self, request: str) -> bool:
        """Layer 1: Validate input."""
        pass  # Implementation

    def _classify_intent(self, request: str) -> str:
        """Layer 2: Classify intent."""
        pass  # Implementation

    def _check_permissions(self, intent: str) -> bool:
        """Layer 3: Check permissions."""
        pass  # Implementation

    def _sandboxed_execution(self, request: str) -> str:
        """Layer 4: Execute in sandbox."""
        pass  # Implementation

    def _filter_output(self, result: str) -> str:
        """Layer 5: Filter output."""
        pass  # Implementation

    def _audit_log(self, request: str, result: str):
        """Layer 6: Log for audit."""
        pass  # Implementation
```

Key Takeaways
- Agent safety is critical because agents combine autonomy, capability, and unpredictability.
- Understand your threat model: know the attack vectors specific to your agent's capabilities.
- Defense in depth ensures multiple safety layers protect against failures in any single control.
- Least privilege limits the damage an agent can cause if compromised.
- Fail safe means errors result in safe states, not dangerous ones.
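These takeaways compose naturally in code. The sketch below is a minimal, illustrative example (all names are hypothetical) of a guarded tool call that denies by default (fail safe), checks an allowlist before acting (least privilege), and records every decision (monitoring):

```python
from typing import Callable

ALLOWED_ACTIONS = {"search", "read", "summarize"}  # least privilege: explicit allowlist
audit_log: list[tuple[str, str]] = []              # monitoring: every decision recorded

def guarded_call(action: str, handler: Callable[[], str]) -> str:
    """Run handler only if action is allowlisted; resolve any error to denial."""
    if action not in ALLOWED_ACTIONS:
        audit_log.append((action, "denied: not allowlisted"))
        return "denied"
    try:
        result = handler()
        audit_log.append((action, "allowed"))
        return result
    except Exception:
        # Fail safe: errors end in the safe state, never a partial action
        audit_log.append((action, "denied: handler error"))
        return "denied"

print(guarded_call("read", lambda: "file contents"))  # allowlisted action runs
print(guarded_call("delete", lambda: "gone"))         # unlisted action is denied
```

Even this toy version shows the layering: the allowlist and the exception handler are independent controls, and either one alone would have blocked the unsafe paths above.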
Next Section Preview: We'll dive into input validation and sanitization techniques for protecting agents from malicious inputs.