Chapter 18
15 min read
Section 110 of 175

Safety Fundamentals

Agent Safety and Guardrails

Introduction

AI agents that can take actions in the real world introduce unique safety challenges. Unlike traditional chatbots that only generate text, agents can execute code, access files, make API calls, and interact with external systems. This power demands robust safety mechanisms.

Chapter Overview: This chapter covers comprehensive safety strategies for agentic systems, from input validation to output filtering, action boundaries, and human oversight mechanisms.

Why Safety Matters

The Agent Risk Landscape

Risk CategoryExamplePotential Impact
Data exposureAgent leaks API keysSecurity breach
Unintended actionsAgent deletes wrong filesData loss
Resource abuseInfinite API loopCost overrun
Prompt injectionMalicious instructionsSystem compromise
Scope creepAgent exceeds boundariesUnauthorized access
HallucinationAgent acts on false infoWrong decisions
🐍python
1"""
2Why Agent Safety is Critical
3
4Traditional Software vs AI Agents:
5
6Traditional Software:
7- Deterministic behavior
8- Explicit control flow
9- Predictable outputs
10- Static capabilities
11
12AI Agents:
13- Non-deterministic behavior
14- Emergent decision-making
15- Variable outputs
16- Dynamic capabilities
17
18The combination of autonomy + capability + unpredictability
19creates unique safety challenges.
20"""
21
22from dataclasses import dataclass
23from enum import Enum
24
25
26class RiskLevel(Enum):
27    LOW = "low"
28    MEDIUM = "medium"
29    HIGH = "high"
30    CRITICAL = "critical"
31
32
33@dataclass
34class SafetyIncident:
35    """Record of a safety-related incident."""
36    incident_type: str
37    severity: RiskLevel
38    description: str
39    root_cause: str
40    mitigation: str
41
42
43# Real-world incident examples
44INCIDENT_EXAMPLES = [
45    SafetyIncident(
46        incident_type="data_exposure",
47        severity=RiskLevel.CRITICAL,
48        description="Agent included database credentials in API request",
49        root_cause="No secret detection in output pipeline",
50        mitigation="Add secret scanning to all outbound data"
51    ),
52    SafetyIncident(
53        incident_type="unintended_action",
54        severity=RiskLevel.HIGH,
55        description="Agent deleted production database during cleanup",
56        root_cause="Overly broad file system permissions",
57        mitigation="Implement allowlist for file operations"
58    ),
59    SafetyIncident(
60        incident_type="resource_abuse",
61        severity=RiskLevel.MEDIUM,
62        description="Agent made 10,000 API calls in retry loop",
63        root_cause="No rate limiting or circuit breaker",
64        mitigation="Add exponential backoff and limits"
65    ),
66    SafetyIncident(
67        incident_type="prompt_injection",
68        severity=RiskLevel.CRITICAL,
69        description="Web page content hijacked agent behavior",
70        root_cause="No content sanitization from external sources",
71        mitigation="Sandbox external content, validate instructions"
72    ),
73]

Agent Threat Model

Attack Surfaces

🐍python
1"""
2Agent Threat Model
3
4Attack vectors specific to AI agents:
5
61. INPUT ATTACKS
7   - Prompt injection in user messages
8   - Malicious content in retrieved documents
9   - Adversarial data in tool responses
10
112. PROCESSING ATTACKS
12   - Model jailbreaking attempts
13   - Context manipulation
14   - Hallucination exploitation
15
163. OUTPUT ATTACKS
17   - Exfiltration via tool calls
18   - Command injection in generated code
19   - Social engineering through responses
20
214. ENVIRONMENTAL ATTACKS
22   - Tool/API compromise
23   - Supply chain attacks on dependencies
24   - Infrastructure manipulation
25"""
26
27from dataclasses import dataclass
28
29
30@dataclass
31class ThreatVector:
32    """Represents a potential attack vector."""
33    name: str
34    category: str
35    description: str
36    likelihood: float  # 0-1
37    impact: float      # 0-1
38    mitigations: list[str]
39
40    @property
41    def risk_score(self) -> float:
42        """Calculate risk score."""
43        return self.likelihood * self.impact
44
45
46class AgentThreatModel:
47    """Comprehensive threat model for agents."""
48
49    def __init__(self):
50        self.threats: list[ThreatVector] = []
51        self._load_threats()
52
53    def _load_threats(self):
54        """Load known threat vectors."""
55        self.threats = [
56            ThreatVector(
57                name="Direct Prompt Injection",
58                category="input",
59                description="Malicious instructions in user input",
60                likelihood=0.8,
61                impact=0.7,
62                mitigations=[
63                    "Input validation",
64                    "Instruction hierarchy",
65                    "Output verification"
66                ]
67            ),
68            ThreatVector(
69                name="Indirect Prompt Injection",
70                category="input",
71                description="Malicious content in external data",
72                likelihood=0.6,
73                impact=0.8,
74                mitigations=[
75                    "Content sanitization",
76                    "Source verification",
77                    "Sandboxed processing"
78                ]
79            ),
80            ThreatVector(
81                name="Tool Abuse",
82                category="processing",
83                description="Agent misuses tools beyond intended scope",
84                likelihood=0.5,
85                impact=0.9,
86                mitigations=[
87                    "Permission boundaries",
88                    "Action allowlists",
89                    "Human approval for sensitive ops"
90                ]
91            ),
92            ThreatVector(
93                name="Data Exfiltration",
94                category="output",
95                description="Sensitive data leaked through outputs",
96                likelihood=0.4,
97                impact=0.9,
98                mitigations=[
99                    "Output scanning",
100                    "Secret detection",
101                    "Data classification"
102                ]
103            ),
104        ]
105
106    def get_high_priority_threats(self) -> list[ThreatVector]:
107        """Get threats with highest risk scores."""
108        return sorted(
109            self.threats,
110            key=lambda t: t.risk_score,
111            reverse=True
112        )[:5]
113
114    def get_mitigations_for_category(
115        self,
116        category: str
117    ) -> list[str]:
118        """Get all mitigations for a threat category."""
119        mitigations = set()
120        for threat in self.threats:
121            if threat.category == category:
122                mitigations.update(threat.mitigations)
123        return list(mitigations)

Defense in Depth

Multi-Layer Safety Architecture

🐍python
1"""
2Defense in Depth for AI Agents
3
4Multiple safety layers ensure that if one fails,
5others provide protection:
6
7Layer 1: INPUT VALIDATION
8├── Sanitize user input
9├── Validate tool responses
10└── Filter external content
11
12Layer 2: INSTRUCTION BOUNDARIES
13├── System prompt protections
14├── Role separation
15└── Permission scoping
16
17Layer 3: ACTION CONTROLS
18├── Allowlists/blocklists
19├── Rate limiting
20└── Resource quotas
21
22Layer 4: OUTPUT FILTERING
23├── Content moderation
24├── Secret scanning
25└── Format validation
26
27Layer 5: MONITORING
28├── Anomaly detection
29├── Audit logging
30└── Alerting
31
32Layer 6: HUMAN OVERSIGHT
33├── Approval workflows
34├── Kill switches
35└── Escalation paths
36"""
37
38from abc import ABC, abstractmethod
39from typing import Any
40
41
42class SafetyLayer(ABC):
43    """Abstract base class for safety layers."""
44
45    @abstractmethod
46    def check(self, data: Any) -> tuple[bool, str]:
47        """Check if data passes this safety layer."""
48        pass
49
50
51class InputValidationLayer(SafetyLayer):
52    """Layer 1: Input validation and sanitization."""
53
54    def __init__(self):
55        self.blocked_patterns = [
56            r"ignore (?:all )?(?:previous )?instructions",
57            r"you are now",
58            r"disregard (?:your )?(?:previous )?",
59            r"new persona",
60        ]
61
62    def check(self, data: str) -> tuple[bool, str]:
63        import re
64        for pattern in self.blocked_patterns:
65            if re.search(pattern, data.lower()):
66                return False, f"Blocked pattern detected: {pattern}"
67        return True, "Input validation passed"
68
69
70class ActionControlLayer(SafetyLayer):
71    """Layer 3: Action boundaries and permissions."""
72
73    def __init__(self, allowed_actions: list[str]):
74        self.allowed_actions = set(allowed_actions)
75        self.action_counts: dict[str, int] = {}
76        self.rate_limits: dict[str, int] = {
77            "file_write": 10,
78            "api_call": 100,
79            "code_execution": 5,
80        }
81
82    def check(self, action: dict) -> tuple[bool, str]:
83        action_type = action.get("type", "")
84
85        # Check allowlist
86        if action_type not in self.allowed_actions:
87            return False, f"Action not allowed: {action_type}"
88
89        # Check rate limits
90        self.action_counts[action_type] = (
91            self.action_counts.get(action_type, 0) + 1
92        )
93        limit = self.rate_limits.get(action_type, 1000)
94        if self.action_counts[action_type] > limit:
95            return False, f"Rate limit exceeded for: {action_type}"
96
97        return True, "Action approved"
98
99
100class OutputFilterLayer(SafetyLayer):
101    """Layer 4: Output filtering and moderation."""
102
103    def __init__(self):
104        self.secret_patterns = [
105            r"(?:api[_-]?key|apikey)[=:]s*['"]?[w-]+",
106            r"(?:password|passwd|pwd)[=:]s*['"]?S+",
107            r"(?:secret|token)[=:]s*['"]?[w-]+",
108            r"(?:aws|azure|gcp)[_-]?(?:access|secret)[_-]?key",
109        ]
110
111    def check(self, data: str) -> tuple[bool, str]:
112        import re
113        for pattern in self.secret_patterns:
114            if re.search(pattern, data.lower()):
115                return False, "Potential secret detected in output"
116        return True, "Output filter passed"
117
118
119class SafetyPipeline:
120    """Combines multiple safety layers."""
121
122    def __init__(self):
123        self.layers: list[SafetyLayer] = []
124
125    def add_layer(self, layer: SafetyLayer):
126        """Add a safety layer to the pipeline."""
127        self.layers.append(layer)
128
129    def check_all(self, data: Any) -> tuple[bool, list[str]]:
130        """Run data through all safety layers."""
131        results = []
132        all_passed = True
133
134        for layer in self.layers:
135            passed, message = layer.check(data)
136            results.append(f"{layer.__class__.__name__}: {message}")
137            if not passed:
138                all_passed = False
139
140        return all_passed, results
141
142
143# Example usage
144pipeline = SafetyPipeline()
145pipeline.add_layer(InputValidationLayer())
146pipeline.add_layer(ActionControlLayer(["search", "read", "summarize"]))
147pipeline.add_layer(OutputFilterLayer())

Core Safety Principles

Principle 1: Least Privilege

🐍python
1"""
2Principle 1: Least Privilege
3
4Agents should only have the minimum permissions
5required to accomplish their task.
6"""
7
8from dataclasses import dataclass
9
10
11@dataclass
12class Permission:
13    """Represents a specific permission."""
14    resource: str
15    action: str
16    scope: str
17
18
19class LeastPrivilegeManager:
20    """Enforce least privilege for agents."""
21
22    def __init__(self):
23        self.role_permissions: dict[str, list[Permission]] = {
24            "researcher": [
25                Permission("web", "read", "public_urls"),
26                Permission("files", "read", "research_folder"),
27            ],
28            "writer": [
29                Permission("files", "read", "content_folder"),
30                Permission("files", "write", "drafts_folder"),
31            ],
32            "admin": [
33                Permission("files", "read", "*"),
34                Permission("files", "write", "*"),
35                Permission("system", "execute", "safe_commands"),
36            ],
37        }
38
39    def get_permissions(self, role: str) -> list[Permission]:
40        """Get permissions for a role."""
41        return self.role_permissions.get(role, [])
42
43    def check_permission(
44        self,
45        role: str,
46        resource: str,
47        action: str,
48        scope: str
49    ) -> bool:
50        """Check if role has required permission."""
51        permissions = self.get_permissions(role)
52        for perm in permissions:
53            if (perm.resource == resource and
54                perm.action == action and
55                (perm.scope == "*" or perm.scope == scope)):
56                return True
57        return False

Principle 2: Fail Safe

🐍python
1"""
2Principle 2: Fail Safe
3
4When in doubt, deny. Errors should result in
5safe states, not dangerous ones.
6"""
7
8
9class FailSafeAgent:
10    """Agent that fails safely."""
11
12    def __init__(self):
13        self.safe_mode = False
14        self.error_count = 0
15        self.error_threshold = 3
16
17    def execute_action(self, action: dict) -> dict:
18        """Execute action with fail-safe behavior."""
19        try:
20            # Check if in safe mode
21            if self.safe_mode:
22                return {
23                    "success": False,
24                    "error": "Agent in safe mode - manual reset required"
25                }
26
27            # Attempt action
28            result = self._do_action(action)
29
30            # Reset error count on success
31            self.error_count = 0
32
33            return result
34
35        except Exception as e:
36            self.error_count += 1
37
38            # Enter safe mode after too many errors
39            if self.error_count >= self.error_threshold:
40                self.safe_mode = True
41                self._alert_humans("Agent entered safe mode")
42
43            # Return safe failure
44            return {
45                "success": False,
46                "error": str(e),
47                "safe_state": True
48            }
49
50    def _do_action(self, action: dict) -> dict:
51        """Perform the actual action."""
52        pass  # Implementation
53
54    def _alert_humans(self, message: str):
55        """Alert human operators."""
56        pass  # Implementation
57
58    def reset_safe_mode(self, authorization: str):
59        """Manually reset safe mode with authorization."""
60        if self._verify_authorization(authorization):
61            self.safe_mode = False
62            self.error_count = 0

Principle 3: Defense in Depth

🐍python
1"""
2Principle 3: Defense in Depth
3
4Never rely on a single safety mechanism.
5Layer multiple independent controls.
6"""
7
8
9class DefenseInDepthAgent:
10    """Agent with multiple safety layers."""
11
12    def process_request(self, request: str) -> str:
13        """Process request through multiple safety layers."""
14
15        # Layer 1: Input validation
16        if not self._validate_input(request):
17            return "Request blocked by input validation"
18
19        # Layer 2: Intent classification
20        intent = self._classify_intent(request)
21        if intent in ["harmful", "manipulation"]:
22            return "Request blocked by intent classifier"
23
24        # Layer 3: Permission check
25        if not self._check_permissions(intent):
26            return "Request blocked by permission check"
27
28        # Layer 4: Execute with sandboxing
29        result = self._sandboxed_execution(request)
30
31        # Layer 5: Output filtering
32        filtered_result = self._filter_output(result)
33
34        # Layer 6: Audit logging
35        self._audit_log(request, filtered_result)
36
37        return filtered_result
38
39    def _validate_input(self, request: str) -> bool:
40        """Layer 1: Validate input."""
41        pass
42
43    def _classify_intent(self, request: str) -> str:
44        """Layer 2: Classify intent."""
45        pass
46
47    def _check_permissions(self, intent: str) -> bool:
48        """Layer 3: Check permissions."""
49        pass
50
51    def _sandboxed_execution(self, request: str) -> str:
52        """Layer 4: Execute in sandbox."""
53        pass
54
55    def _filter_output(self, result: str) -> str:
56        """Layer 5: Filter output."""
57        pass
58
59    def _audit_log(self, request: str, result: str):
60        """Layer 6: Log for audit."""
61        pass

Key Takeaways

  • Agent safety is critical because agents combine autonomy, capability, and unpredictability.
  • Understand your threat model - know the attack vectors specific to your agent's capabilities.
  • Defense in depth ensures multiple safety layers protect against failures in any single control.
  • Least privilege limits the damage an agent can cause if compromised.
  • Fail safe means errors result in safe states, not dangerous ones.
Next Section Preview: We'll dive into input validation and sanitization techniques for protecting agents from malicious inputs.