Chapter 18
18 min read
Section 113 of 175

Action Boundaries and Permissions

Agent Safety and Guardrails

Introduction

Agents with tool access can take real-world actions - executing code, modifying files, making API calls. Controlling what actions an agent can take is crucial for preventing unintended consequences and limiting potential damage from errors or attacks.

Section Overview: We'll explore permission systems, action allowlists, resource limits, and execution sandboxing for controlling agent capabilities.

Permission Systems

Role-Based Access Control

| Role | Permissions | Use Case |
|------|-------------|----------|
| Reader | Read files, search | Research agents |
| Writer | Read + write to specific dirs | Content agents |
| Developer | Read + write + execute code | Coding assistants |
| Admin | Full access with approval | System maintenance |
| Restricted | Minimal permissions | Untrusted tasks |
🐍python
1"""
2Role-Based Permission System
3
4Implements RBAC (Role-Based Access Control) for agents.
5"""
6
7from dataclasses import dataclass, field
8from enum import Enum, auto
9from typing import Callable
10
11
class Action(Enum):
    """Kinds of operation an agent may request.

    Member values are assigned by ``auto()`` in declaration order.
    """

    READ_FILE = auto()
    WRITE_FILE = auto()
    DELETE_FILE = auto()
    EXECUTE_CODE = auto()
    MAKE_HTTP_REQUEST = auto()
    SEND_EMAIL = auto()
    ACCESS_DATABASE = auto()
    MODIFY_SYSTEM = auto()
21
22
class Resource(Enum):
    """Categories of resource that a permission can apply to."""

    FILES = "files"
    CODE = "code"
    NETWORK = "network"
    DATABASE = "database"
    EMAIL = "email"
    SYSTEM = "system"
30
31
@dataclass
class Permission:
    """Single permission definition: one action on one resource within a scope."""

    action: Action
    resource: Resource
    scope: str  # "*" for all, or specific path/pattern
    # Optional extra conditions; each instance gets its own empty dict.
    conditions: dict = field(default_factory=dict)
39
40
@dataclass
class Role:
    """Role with associated permissions."""

    name: str
    permissions: list[Permission]
    # Names of parent roles whose permissions this role also receives.
    inherit_from: list[str] = field(default_factory=list)
47
48
class PermissionManager:
    """Manage role-based permissions for agents.

    Each agent is assigned a single role by name. A request is granted when
    any permission of that role (or of a role it inherits from) has the same
    action and resource and a scope pattern that matches the requested scope.
    Agents without a role are denied everything.
    """

    def __init__(self):
        self.roles: dict[str, Role] = {}
        self.agent_roles: dict[str, str] = {}
        self._setup_default_roles()

    def _setup_default_roles(self):
        """Set up default role hierarchy."""

        # Reader role - minimal permissions
        self.roles["reader"] = Role(
            name="reader",
            permissions=[
                Permission(Action.READ_FILE, Resource.FILES, "public/*"),
                Permission(Action.MAKE_HTTP_REQUEST, Resource.NETWORK, "*.wikipedia.org"),
            ]
        )

        # Writer role - extends reader
        self.roles["writer"] = Role(
            name="writer",
            permissions=[
                Permission(Action.READ_FILE, Resource.FILES, "*"),
                Permission(Action.WRITE_FILE, Resource.FILES, "workspace/*"),
                Permission(Action.MAKE_HTTP_REQUEST, Resource.NETWORK, "*"),
            ],
            inherit_from=["reader"]
        )

        # Developer role - code execution
        self.roles["developer"] = Role(
            name="developer",
            permissions=[
                Permission(Action.READ_FILE, Resource.FILES, "*"),
                Permission(Action.WRITE_FILE, Resource.FILES, "src/*"),
                Permission(Action.EXECUTE_CODE, Resource.CODE, "sandbox"),
                Permission(Action.MAKE_HTTP_REQUEST, Resource.NETWORK, "*"),
            ],
            inherit_from=["writer"]
        )

        # Restricted role - minimal for untrusted
        self.roles["restricted"] = Role(
            name="restricted",
            permissions=[
                Permission(Action.READ_FILE, Resource.FILES, "public/*.txt"),
            ]
        )

    def assign_role(self, agent_id: str, role_name: str):
        """Assign a role to an agent.

        Raises:
            ValueError: if ``role_name`` is not a registered role.
        """
        if role_name not in self.roles:
            raise ValueError(f"Unknown role: {role_name}")
        self.agent_roles[agent_id] = role_name

    def check_permission(
        self,
        agent_id: str,
        action: Action,
        resource: Resource,
        scope: str
    ) -> bool:
        """Return True if the agent's role grants ``action`` on ``resource`` for ``scope``.

        Deny by default: an agent with no assigned role gets False.
        """
        role_name = self.agent_roles.get(agent_id)
        if not role_name:
            return False

        return any(
            self._matches_permission(perm, action, resource, scope)
            for perm in self._get_all_permissions(role_name)
        )

    def _get_all_permissions(self, role_name: str) -> list[Permission]:
        """Collect the permissions of ``role_name`` and of every role it inherits from.

        The hierarchy is walked depth-first in declaration order. Each role is
        visited at most once, so a cyclic ``inherit_from`` chain terminates
        instead of recursing without bound. Unknown role names contribute
        nothing.
        """
        collected: list[Permission] = []
        visited: set[str] = set()
        pending = [role_name]

        while pending:
            current = pending.pop()
            if current in visited:
                continue
            visited.add(current)

            role = self.roles.get(current)
            if not role:
                continue

            collected.extend(role.permissions)
            # Reversed so the first-declared parent is popped (visited) first.
            pending.extend(reversed(role.inherit_from))

        return collected

    def _matches_permission(
        self,
        perm: Permission,
        action: Action,
        resource: Resource,
        scope: str
    ) -> bool:
        """Check if permission matches request (same action, same resource, scope in pattern)."""
        if perm.action != action or perm.resource != resource:
            return False

        return self._scope_matches(perm.scope, scope)

    def _scope_matches(self, pattern: str, scope: str) -> bool:
        """Check if the requested ``scope`` matches the glob ``pattern``.

        A scope containing a ``..`` path segment never matches: otherwise
        ``public/../secret`` would satisfy ``public/*`` while actually
        referring to something outside ``public/``.

        Note that ``fnmatch``'s ``*`` also matches ``/``, so ``public/*``
        covers nested paths such as ``public/a/b.txt``.
        """
        import fnmatch

        segments = scope.replace("\\", "/").split("/")
        if ".." in segments:
            return False
        return fnmatch.fnmatch(scope, pattern)
162
163
# Usage example
manager = PermissionManager()

# Assign roles
manager.assign_role("research_agent", "reader")
manager.assign_role("code_agent", "developer")
manager.assign_role("untrusted_agent", "restricted")

# Check permissions
# The reader role may read anything under public/.
print(manager.check_permission(
    "research_agent",
    Action.READ_FILE,
    Resource.FILES,
    "public/data.txt"
))  # True

# The reader role has no WRITE_FILE permission at all.
print(manager.check_permission(
    "research_agent",
    Action.WRITE_FILE,
    Resource.FILES,
    "workspace/output.txt"
))  # False

Action Allowlists

Controlling Tool Usage

🐍python
1"""
2Action Allowlist System
3
4Explicitly define what actions an agent CAN do,
5rather than what it cannot (deny by default).
6"""
7
8from dataclasses import dataclass
9from typing import Any, Callable
10
11
@dataclass
class AllowedAction:
    """Definition of an allowed action."""

    name: str
    description: str
    # Required parameter names mapped to the type each value must be an instance of.
    parameters: dict[str, type]
    # Optional extra check; receives the parameter dict and returns True to accept.
    validator: Callable[[dict], bool] | None = None
    requires_approval: bool = False
20
21
class ActionAllowlist:
    """Manage allowed actions for an agent (deny by default).

    Only actions that have been registered can pass validation; everything
    else is rejected.
    """

    def __init__(self):
        self.allowed_actions: dict[str, AllowedAction] = {}

    def register(self, action: AllowedAction):
        """Register an allowed action, replacing any existing one with the same name."""
        self.allowed_actions[action.name] = action

    def is_allowed(self, action_name: str) -> bool:
        """Check if action is allowed (i.e. has been registered)."""
        return action_name in self.allowed_actions

    def validate_action(
        self,
        action_name: str,
        parameters: dict
    ) -> tuple[bool, str]:
        """Validate an action request.

        Returns:
            ``(True, "Action allowed")`` on success, otherwise ``(False, reason)``.
            Checks run in order: the action is registered, every declared
            parameter is present with the declared type, then the custom
            validator (if any) accepts the parameters.
        """
        action = self.allowed_actions.get(action_name)
        if action is None:
            return False, f"Action not allowed: {action_name}"

        # Check required parameters
        for param_name, param_type in action.parameters.items():
            if param_name not in parameters:
                return False, f"Missing parameter: {param_name}"
            if not isinstance(parameters[param_name], param_type):
                return False, f"Invalid type for {param_name}"

        # Run custom validator if present
        if action.validator and not action.validator(parameters):
            return False, "Custom validation failed"

        return True, "Action allowed"

    def requires_approval(self, action_name: str) -> bool:
        """Check if action requires human approval.

        Unregistered actions default to requiring approval.
        """
        action = self.allowed_actions.get(action_name)
        if action is None:
            return True  # Default to requiring approval
        return action.requires_approval
66
67
def _normalize_path(path: str) -> str:
    """Collapse ``.``/``..`` segments and duplicate slashes in a POSIX-style path."""
    import posixpath

    cleaned = posixpath.normpath(path.replace("\\", "/"))
    # normpath deliberately preserves exactly two leading slashes; fold them
    # so "//etc/passwd" is compared as "/etc/passwd".
    if cleaned.startswith("//"):
        cleaned = "/" + cleaned.lstrip("/")
    return cleaned


def _is_readable_path(params: dict) -> bool:
    """Reject reads under /etc and relative paths that climb above the start directory."""
    raw = params["path"]
    resolved = _normalize_path(raw)
    if resolved == ".." or resolved.startswith("../"):
        return False
    # Both the raw and the normalized spelling must stay clear of /etc, so
    # this check is never more permissive than a plain prefix test on `raw`.
    return not raw.startswith("/etc") and not resolved.startswith("/etc")


def _is_workspace_path(params: dict) -> bool:
    """Accept only paths that still live under workspace/ after normalization."""
    raw = params["path"]
    return raw.startswith("workspace/") and _normalize_path(raw).startswith("workspace/")


def create_safe_file_allowlist() -> ActionAllowlist:
    """Create an allowlist for safe file operations.

    Registers three actions:

    * ``read_file``  - any path except those under ``/etc`` or escaping
      upward via ``..``.
    * ``write_file`` - only paths inside ``workspace/``; traversal such as
      ``workspace/../x`` is rejected.
    * ``delete_file`` - no path restriction, but always requires approval.
    """

    allowlist = ActionAllowlist()

    # Safe read operation
    allowlist.register(AllowedAction(
        name="read_file",
        description="Read contents of a file",
        parameters={"path": str},
        validator=_is_readable_path
    ))

    # Restricted write operation
    allowlist.register(AllowedAction(
        name="write_file",
        description="Write to a file in workspace",
        parameters={"path": str, "content": str},
        validator=_is_workspace_path,
        requires_approval=False
    ))

    # Delete requires approval
    allowlist.register(AllowedAction(
        name="delete_file",
        description="Delete a file",
        parameters={"path": str},
        requires_approval=True  # Always needs human approval
    ))

    return allowlist
99
100
# Usage
allowlist = create_safe_file_allowlist()

# Check actions
print(allowlist.validate_action("read_file", {"path": "data/input.txt"}))
# (True, "Action allowed")

print(allowlist.validate_action("read_file", {"path": "/etc/passwd"}))
# (False, "Custom validation failed")

# execute_code was never registered, so it is denied by default.
print(allowlist.validate_action("execute_code", {"code": "print('hi')"}))
# (False, "Action not allowed: execute_code")

Dynamic Action Control

🐍python
1"""
2Dynamic Action Control
3
4Adjust allowed actions based on:
5- Agent behavior history
6- Current context
7- Risk assessment
8"""
9
10from dataclasses import dataclass
11from datetime import datetime, timedelta
12from typing import Any
13
14
@dataclass
class ActionRecord:
    """Record of an action taken."""

    action_name: str
    parameters: dict
    timestamp: datetime
    success: bool
    # Error description for a failed action; None when not provided.
    error: str | None = None
23
24
class DynamicActionController:
    """Dynamically control agent actions based on behavior.

    Wraps a base allowlist and adds two behaviour-driven restrictions:

    * a trust score in [0.0, 1.0] (starting at 0.5) that rises by 0.02 on
      each recorded success and falls by 0.1 on each recorded failure;
    * per-action locks, set once an action has failed three times within the
      recent-failure window.
    """

    def __init__(
        self,
        base_allowlist: ActionAllowlist,
        reset_code: str = "admin_reset_code",
    ):
        """
        Args:
            base_allowlist: allowlist consulted for every request.
            reset_code: shared secret accepted by ``reset_locks``. The
                default is a placeholder - in production, use proper
                authentication.
        """
        self.base_allowlist = base_allowlist
        self.action_history: list[ActionRecord] = []
        self.trust_score: float = 0.5
        self.locked_actions: set[str] = set()
        self._reset_code = reset_code

    def check_action(
        self,
        action_name: str,
        parameters: dict
    ) -> tuple[bool, str]:
        """Check if action is allowed given current state.

        Order of checks: lock list, base allowlist, then trust tier
        (< 0.3 read-only, < 0.7 no destructive operations).
        """

        # Check if action is locked
        if action_name in self.locked_actions:
            return False, f"Action locked due to previous issues: {action_name}"

        # Check base allowlist
        allowed, message = self.base_allowlist.validate_action(
            action_name, parameters
        )
        if not allowed:
            return False, message

        # Check trust-based restrictions
        if self.trust_score < 0.3:
            # Low trust: only read operations
            if action_name not in ["read_file", "search"]:
                return False, "Low trust: only read operations allowed"

        elif self.trust_score < 0.7:
            # Medium trust: no destructive operations
            if action_name in ["delete_file", "modify_system"]:
                return False, "Medium trust: destructive operations blocked"

        return True, "Action allowed"

    def record_action(self, record: ActionRecord):
        """Record an action and update trust."""
        self.action_history.append(record)

        # Update trust based on outcome
        if record.success:
            self.trust_score = min(1.0, self.trust_score + 0.02)
            return

        self.trust_score = max(0.0, self.trust_score - 0.1)

        # Lock action if repeated failures
        if self._count_recent_failures(record.action_name) >= 3:
            self.locked_actions.add(record.action_name)

    def _count_recent_failures(
        self,
        action_name: str,
        window: timedelta = timedelta(minutes=5)
    ) -> int:
        """Count failures of ``action_name`` whose timestamp lies within ``window`` of now."""
        failures = 0
        for past in self.action_history:
            if past.action_name != action_name or past.success:
                continue
            # Take "now" in the record's own timezone so naive and aware
            # timestamps can both be compared without a TypeError.
            cutoff = datetime.now(past.timestamp.tzinfo) - window
            if past.timestamp > cutoff:
                failures += 1
        return failures

    def reset_locks(self, authorization: str) -> bool:
        """Reset locked actions with authorization.

        Returns:
            True if the authorization was accepted (locks cleared and trust
            reset to 0.5); False if it was rejected and nothing changed.
        """
        if not self._verify_authorization(authorization):
            return False
        self.locked_actions.clear()
        self.trust_score = 0.5
        return True

    def _verify_authorization(self, auth: str) -> bool:
        """Verify authorization code against the configured reset code."""
        import hmac

        # In production, use proper authentication
        if not isinstance(auth, str):
            return False
        # Constant-time comparison avoids leaking the code through timing.
        return hmac.compare_digest(
            auth.encode("utf-8"), self._reset_code.encode("utf-8")
        )

Resource Limits

Preventing Resource Abuse

🐍python
1"""
2Resource Limits
3
4Prevent agents from:
5- Making too many API calls
6- Using too much compute
7- Generating too much output
8- Running too long
9"""
10
11from dataclasses import dataclass, field
12from datetime import datetime
13from typing import Any
14
15
@dataclass
class ResourceLimits:
    """Define resource limits for an agent."""

    max_api_calls: int = 100
    max_tokens: int = 100000
    max_file_size_bytes: int = 10 * 1024 * 1024  # 10MB
    max_files_created: int = 10
    max_execution_time_seconds: int = 300  # 5 minutes
    max_memory_bytes: int = 512 * 1024 * 1024  # 512MB
    max_output_length: int = 50000
26
27
@dataclass
class ResourceUsage:
    """Track current resource usage."""

    api_calls: int = 0
    tokens_used: int = 0
    bytes_written: int = 0
    files_created: int = 0
    # Captured when the instance is created; elapsed time is measured from here.
    start_time: datetime = field(default_factory=datetime.now)
    peak_memory: int = 0
37
38
class ResourceMonitor:
    """Monitor and enforce resource limits.

    ``check_limit`` answers "may I consume this much more?" without changing
    any counters; callers record actual consumption with ``record_usage``.
    """

    def __init__(self, limits: ResourceLimits):
        self.limits = limits
        self.usage = ResourceUsage()

    def check_limit(self, resource: str, amount: int = 1) -> tuple[bool, str]:
        """Check if resource usage is within limits.

        Args:
            resource: one of ``"api_calls"``, ``"tokens"``, ``"file_size"``,
                ``"files"`` or ``"time"``.
            amount: additional usage being requested (for ``"file_size"``,
                the size of the single file; ignored for ``"time"``).

        Returns:
            ``(True, "Within limits")`` or ``(False, reason)``. An
            unrecognised resource name is refused rather than silently
            allowed, so a misspelled name cannot bypass enforcement.
        """
        limits, usage = self.limits, self.usage

        if resource == "api_calls":
            if usage.api_calls + amount > limits.max_api_calls:
                return False, f"API call limit exceeded ({limits.max_api_calls})"

        elif resource == "tokens":
            if usage.tokens_used + amount > limits.max_tokens:
                return False, f"Token limit exceeded ({limits.max_tokens})"

        elif resource == "file_size":
            if amount > limits.max_file_size_bytes:
                return False, f"File too large (max {limits.max_file_size_bytes})"

        elif resource == "files":
            if usage.files_created + amount > limits.max_files_created:
                return False, f"File creation limit exceeded ({limits.max_files_created})"

        elif resource == "time":
            elapsed = (datetime.now() - usage.start_time).total_seconds()
            if elapsed > limits.max_execution_time_seconds:
                return False, f"Execution time limit exceeded ({limits.max_execution_time_seconds}s)"

        else:
            return False, f"Unknown resource: {resource}"

        return True, "Within limits"

    def record_usage(self, resource: str, amount: int):
        """Record resource usage.

        Recognised names are ``"api_calls"``, ``"tokens"``,
        ``"bytes_written"`` and ``"files"``; any other name is ignored.
        """
        if resource == "api_calls":
            self.usage.api_calls += amount
        elif resource == "tokens":
            self.usage.tokens_used += amount
        elif resource == "bytes_written":
            self.usage.bytes_written += amount
        elif resource == "files":
            self.usage.files_created += amount

    def get_remaining(self) -> dict[str, int]:
        """Get remaining resources (values go negative once a limit is exceeded)."""
        elapsed_seconds = int(
            (datetime.now() - self.usage.start_time).total_seconds()
        )
        return {
            "api_calls": self.limits.max_api_calls - self.usage.api_calls,
            "tokens": self.limits.max_tokens - self.usage.tokens_used,
            "files": self.limits.max_files_created - self.usage.files_created,
            "time_seconds": self.limits.max_execution_time_seconds - elapsed_seconds,
        }

    @staticmethod
    def _utilization(used: int, limit: int) -> float:
        """Fraction of ``limit`` consumed; a non-positive limit counts as fully used."""
        if limit <= 0:
            return 1.0
        return used / limit

    def get_usage_report(self) -> dict:
        """Get detailed usage report with limits, usage, remaining and utilization."""
        return {
            "limits": {
                "api_calls": self.limits.max_api_calls,
                "tokens": self.limits.max_tokens,
                "files": self.limits.max_files_created,
            },
            "used": {
                "api_calls": self.usage.api_calls,
                "tokens": self.usage.tokens_used,
                "files": self.usage.files_created,
            },
            "remaining": self.get_remaining(),
            "utilization": {
                "api_calls": self._utilization(
                    self.usage.api_calls, self.limits.max_api_calls
                ),
                "tokens": self._utilization(
                    self.usage.tokens_used, self.limits.max_tokens
                ),
            }
        }
114
115
class RateLimiter:
    """Rate limiting for agent actions.

    Keeps, per action type, the timestamps of recorded calls and compares the
    number falling inside a sliding window against a configured maximum.
    Action types without an entry in ``rate_limits`` are never limited.
    """

    def __init__(self):
        self.action_timestamps: dict[str, list[datetime]] = {}
        self.rate_limits: dict[str, tuple[int, int]] = {
            # (max_calls, window_seconds)
            "api_call": (60, 60),      # 60 per minute
            "file_write": (10, 60),    # 10 per minute
            "code_execute": (5, 60),   # 5 per minute
        }

    def check_rate_limit(self, action_type: str) -> tuple[bool, str]:
        """Check if action is within rate limit.

        Returns:
            ``(True, message)`` when another call is permitted, otherwise
            ``(False, "Rate limited. Wait <seconds>s")`` where the wait is the
            time until the oldest call in the window expires.
        """
        from datetime import timedelta

        if action_type not in self.rate_limits:
            return True, "No rate limit for action"

        max_calls, window_seconds = self.rate_limits[action_type]
        cutoff = datetime.now() - timedelta(seconds=window_seconds)

        # Get recent calls
        recent = [
            ts for ts in self.action_timestamps.get(action_type, [])
            if ts > cutoff
        ]

        if len(recent) >= max_calls:
            # The oldest in-window call leaves the window first.
            wait_time = (min(recent) - cutoff).total_seconds()
            return False, f"Rate limited. Wait {wait_time:.1f}s"

        return True, "Within rate limit"

    def record_action(self, action_type: str):
        """Record an action for rate limiting."""
        self.action_timestamps.setdefault(action_type, []).append(datetime.now())

        # Clean old entries
        self._cleanup_old_entries(action_type)

    def _cleanup_old_entries(self, action_type: str):
        """Drop timestamps of ``action_type`` that have aged out of its window.

        Action types with no configured rate limit have no window, so their
        timestamps are left untouched.
        """
        from datetime import timedelta

        limit = self.rate_limits.get(action_type)
        if limit is None:
            return

        _, window_seconds = limit
        cutoff = datetime.now() - timedelta(seconds=window_seconds)
        self.action_timestamps[action_type] = [
            ts for ts in self.action_timestamps.get(action_type, [])
            if ts > cutoff
        ]

Execution Sandboxing

Isolating Agent Execution

🐍python
1"""
2Execution Sandboxing
3
4Isolate agent code execution to prevent:
5- File system access outside allowed paths
6- Network access to unauthorized hosts
7- System command execution
8- Resource exhaustion
9"""
10
11import subprocess
12from dataclasses import dataclass
13from typing import Any
14
15
@dataclass
class SandboxConfig:
    """Configuration for execution sandbox."""

    allowed_paths: list[str]
    allowed_hosts: list[str]
    max_memory_mb: int
    max_time_seconds: int
    allow_network: bool
    allow_subprocess: bool
25
26
class ExecutionSandbox:
    """Sandboxed code execution environment.

    Code is first screened with a static AST check and then run in-process
    with a small allowlist of builtins.

    NOTE(review): the configuration is stored but none of its limits (paths,
    hosts, memory, time) are enforced by this class, and in-process ``exec``
    is not a real security boundary. Untrusted code should run under
    OS-level isolation instead.
    """

    # Top-level modules that may not be imported.
    _DANGEROUS_IMPORTS = frozenset({
        "os", "subprocess", "sys", "shutil",
        "socket", "requests", "urllib",
    })

    # Builtins that may not be called by name.
    _DANGEROUS_CALLS = frozenset({
        "eval", "exec", "compile",
        "open", "__import__", "getattr",
    })

    def __init__(self, config: SandboxConfig):
        self.config = config

    def execute_python(self, code: str) -> dict:
        """Execute Python code in sandbox.

        Returns:
            A dict with keys ``success`` (bool), ``output`` (captured stdout,
            empty on failure) and ``error`` (message, or None on success).
            Validation failures and exceptions raised by the code are both
            reported through this dict rather than raised.
        """

        # Validate code before execution
        validation = self._validate_code(code)
        if not validation["safe"]:
            return {
                "success": False,
                "error": validation["reason"],
                "output": ""
            }

        # Execute in restricted environment
        try:
            result = self._run_in_sandbox(code)
        except Exception as e:
            return {
                "success": False,
                "error": str(e),
                "output": ""
            }

        return {
            "success": True,
            "output": result,
            "error": None
        }

    def _validate_code(self, code: str) -> dict:
        """Validate code for dangerous patterns.

        Rejects syntax errors, imports of deny-listed modules, direct calls to
        deny-listed builtins, and any use of double-underscore attributes or
        names (the usual route for reaching ``object.__subclasses__()`` and
        escaping a restricted-builtins environment).

        Returns:
            ``{"safe": bool, "reason": str | None}``.
        """
        import ast

        try:
            tree = ast.parse(code)
        except SyntaxError as e:
            return {"safe": False, "reason": f"Syntax error: {e}"}

        for node in ast.walk(tree):
            # Check imports
            if isinstance(node, ast.Import):
                for alias in node.names:
                    if alias.name.split(".")[0] in self._DANGEROUS_IMPORTS:
                        return {
                            "safe": False,
                            "reason": f"Dangerous import: {alias.name}"
                        }

            elif isinstance(node, ast.ImportFrom):
                if node.module and node.module.split(".")[0] in self._DANGEROUS_IMPORTS:
                    return {
                        "safe": False,
                        "reason": f"Dangerous import: {node.module}"
                    }

            # Check function calls
            elif isinstance(node, ast.Call):
                if isinstance(node.func, ast.Name):
                    if node.func.id in self._DANGEROUS_CALLS:
                        return {
                            "safe": False,
                            "reason": f"Dangerous function: {node.func.id}"
                        }

            # Check dunder attribute access, e.g. ().__class__.__base__
            elif isinstance(node, ast.Attribute):
                if node.attr.startswith("__"):
                    return {
                        "safe": False,
                        "reason": f"Dangerous attribute access: {node.attr}"
                    }

            # Check dunder names, e.g. __builtins__
            elif isinstance(node, ast.Name):
                if node.id.startswith("__"):
                    return {
                        "safe": False,
                        "reason": f"Dangerous name: {node.id}"
                    }

        return {"safe": True, "reason": None}

    def _run_in_sandbox(self, code: str) -> str:
        """Run code in-process with restricted builtins and return its stdout.

        Despite the name, no subprocess is involved: the code is passed to
        ``exec`` in the current interpreter.
        """
        import io
        import contextlib

        # Create restricted globals
        restricted_globals = {
            "__builtins__": {
                "print": print,
                "len": len,
                "range": range,
                "str": str,
                "int": int,
                "float": float,
                "list": list,
                "dict": dict,
                "True": True,
                "False": False,
                "None": None,
            }
        }

        # Capture output
        output = io.StringIO()
        with contextlib.redirect_stdout(output):
            # SECURITY: exec on agent-supplied code; the AST screen above is a
            # mitigation, not a guarantee.
            exec(code, restricted_globals)

        return output.getvalue()
136
137
class DockerSandbox:
    """Docker-based sandbox for stronger isolation.

    Each execution runs in a fresh, auto-removed container with no network,
    a read-only root filesystem, and memory/CPU caps.
    """

    def __init__(self, image: str = "python:3.11-slim"):
        self.image = image
        self.memory_limit = "256m"
        self.cpu_limit = "0.5"
        self.network = "none"

    def execute(self, code: str, timeout: int = 30) -> dict:
        """Execute code in Docker container.

        Args:
            code: Python source to run.
            timeout: seconds to wait for the container before giving up.

        Returns:
            A dict with ``success`` (exit code was 0), ``output`` (stdout)
            and ``error`` (stderr on non-zero exit, a timeout message, or
            None).
        """
        import os
        import tempfile
        import uuid

        # Write code to temp file
        with tempfile.NamedTemporaryFile(
            mode="w",
            suffix=".py",
            delete=False,
            encoding="utf-8"
        ) as f:
            f.write(code)
            code_file = f.name

        # A known name lets us stop the container if the CLI call times out.
        container_name = f"agent-sandbox-{uuid.uuid4().hex}"

        try:
            result = subprocess.run(
                [
                    "docker", "run",
                    "--rm",
                    "--name", container_name,
                    "--network", self.network,
                    "--memory", self.memory_limit,
                    "--cpus", self.cpu_limit,
                    "--read-only",
                    "-v", f"{code_file}:/code.py:ro",
                    self.image,
                    "python", "/code.py"
                ],
                capture_output=True,
                text=True,
                timeout=timeout
            )

            return {
                "success": result.returncode == 0,
                "output": result.stdout,
                "error": result.stderr if result.returncode != 0 else None
            }

        except subprocess.TimeoutExpired:
            # The timeout only kills the docker CLI process; the container
            # itself must be stopped explicitly or it keeps running.
            self._kill_container(container_name)
            return {
                "success": False,
                "output": "",
                "error": f"Execution timed out after {timeout}s"
            }

        finally:
            os.unlink(code_file)

    @staticmethod
    def _kill_container(container_name: str) -> None:
        """Best-effort ``docker kill`` of a container left behind by a timeout."""
        try:
            subprocess.run(
                ["docker", "kill", container_name],
                capture_output=True,
                timeout=10
            )
        except (OSError, subprocess.SubprocessError):
            # Deliberately ignored: the caller is already reporting the
            # timeout, and the container may have exited on its own.
            pass
194
195
# Usage
sandbox = ExecutionSandbox(SandboxConfig(
    allowed_paths=["workspace/"],
    allowed_hosts=[],
    max_memory_mb=256,
    max_time_seconds=30,
    allow_network=False,
    allow_subprocess=False
))

# The failure is reported in the returned dict (success=False), not raised.
result = sandbox.execute_python('''
x = [1, 2, 3, 4, 5]
print(f"Sum: {sum(x)}")  # This will fail - sum not in restricted builtins
''')

Key Takeaways

  • Role-based permissions provide structured control over what agents can do based on their purpose.
  • Action allowlists implement deny-by-default - only explicitly allowed actions can be taken.
  • Resource limits prevent agents from consuming excessive compute, storage, or API calls.
  • Sandboxing isolates code execution to prevent unauthorized system access.
  • Dynamic controls adjust permissions based on agent behavior and trust level.
Next Section Preview: We'll explore human-in-the-loop controls for maintaining oversight of agent actions.