Introduction
Agents with tool access can take real-world actions — executing code, modifying files, making API calls. Controlling what actions an agent can take is crucial for preventing unintended consequences and limiting potential damage from errors or attacks.
Section Overview: We'll explore permission systems, action allowlists, resource limits, and execution sandboxing for controlling agent capabilities.
Permission Systems
Role-Based Access Control
| Role | Permissions | Use Case |
|---|---|---|
| Reader | Read files, search | Research agents |
| Writer | Read + write to specific dirs | Content agents |
| Developer | Read + write + execute code | Coding assistants |
| Admin | Full access with approval | System maintenance |
| Restricted | Minimal permissions | Untrusted tasks |
🐍python
1"""
2Role-Based Permission System
3
4Implements RBAC (Role-Based Access Control) for agents.
5"""
6
7from dataclasses import dataclass, field
8from enum import Enum, auto
9from typing import Callable
10
11
class Action(Enum):
    """Operations an agent can ask permission to perform.

    Values are assigned by ``auto()``; only the member identity is
    meaningful, not the number behind it.
    """

    READ_FILE = auto()
    WRITE_FILE = auto()
    DELETE_FILE = auto()
    EXECUTE_CODE = auto()
    MAKE_HTTP_REQUEST = auto()
    SEND_EMAIL = auto()
    ACCESS_DATABASE = auto()
    MODIFY_SYSTEM = auto()
21
22
class Resource(Enum):
    """Categories of resource that a permission can apply to.

    Each member's value is its lowercase string name, so a member can be
    looked up from that string, e.g. ``Resource("files")``.
    """

    FILES = "files"
    CODE = "code"
    NETWORK = "network"
    DATABASE = "database"
    EMAIL = "email"
    SYSTEM = "system"
30
31
@dataclass
class Permission:
    """Single permission definition: one action on one resource in a scope."""

    action: Action
    resource: Resource
    # "*" for all, or a specific path / glob-style pattern.
    scope: str
    # Optional extra constraints; every instance gets its own empty dict.
    conditions: dict = field(default_factory=dict)
39
40
@dataclass
class Role:
    """Role with associated permissions."""

    name: str
    # Permissions granted directly by this role.
    permissions: list[Permission]
    # Names of roles whose permissions this role also receives.
    inherit_from: list[str] = field(default_factory=list)
47
48
class PermissionManager:
    """Manage role-based permissions for agents.

    Each agent is mapped to one role name. A role grants its own
    permissions plus those of every role reachable through
    ``inherit_from``. Anything that is not explicitly granted is denied.
    """

    def __init__(self):
        self.roles: dict[str, Role] = {}
        self.agent_roles: dict[str, str] = {}
        self._setup_default_roles()

    def _setup_default_roles(self):
        """Register the built-in reader, writer, developer and restricted roles."""

        # Reader role - minimal permissions
        self.roles["reader"] = Role(
            name="reader",
            permissions=[
                Permission(Action.READ_FILE, Resource.FILES, "public/*"),
                Permission(Action.MAKE_HTTP_REQUEST, Resource.NETWORK, "*.wikipedia.org"),
            ],
        )

        # Writer role - extends reader
        self.roles["writer"] = Role(
            name="writer",
            permissions=[
                Permission(Action.READ_FILE, Resource.FILES, "*"),
                Permission(Action.WRITE_FILE, Resource.FILES, "workspace/*"),
                Permission(Action.MAKE_HTTP_REQUEST, Resource.NETWORK, "*"),
            ],
            inherit_from=["reader"],
        )

        # Developer role - code execution, extends writer
        self.roles["developer"] = Role(
            name="developer",
            permissions=[
                Permission(Action.READ_FILE, Resource.FILES, "*"),
                Permission(Action.WRITE_FILE, Resource.FILES, "src/*"),
                Permission(Action.EXECUTE_CODE, Resource.CODE, "sandbox"),
                Permission(Action.MAKE_HTTP_REQUEST, Resource.NETWORK, "*"),
            ],
            inherit_from=["writer"],
        )

        # Restricted role - minimal for untrusted tasks
        self.roles["restricted"] = Role(
            name="restricted",
            permissions=[
                Permission(Action.READ_FILE, Resource.FILES, "public/*.txt"),
            ],
        )

    def assign_role(self, agent_id: str, role_name: str):
        """Assign a role to an agent, replacing any previous assignment.

        Raises:
            ValueError: if ``role_name`` is not a registered role.
        """
        if role_name not in self.roles:
            raise ValueError(f"Unknown role: {role_name}")
        self.agent_roles[agent_id] = role_name

    def check_permission(
        self,
        agent_id: str,
        action: Action,
        resource: Resource,
        scope: str,
    ) -> bool:
        """Return True if the agent's role grants ``action`` on ``resource``.

        ``scope`` is the concrete target being requested (a path, a host
        name, ...). Agents without an assigned role are always denied.
        """
        role_name = self.agent_roles.get(agent_id)
        if not role_name:
            return False

        return any(
            self._matches_permission(perm, action, resource, scope)
            for perm in self._get_all_permissions(role_name)
        )

    def _get_all_permissions(self, role_name: str) -> list[Permission]:
        """Collect a role's permissions followed by its inherited ones.

        The inheritance graph is walked depth-first, own permissions
        first. Each role is visited at most once, so a cycle such as
        a -> b -> a terminates instead of recursing forever, and a role
        inherited along two paths contributes its permissions only once.
        Unknown role names are skipped.
        """
        collected: list[Permission] = []
        visited: set[str] = set()
        pending = [role_name]

        while pending:
            current = pending.pop()
            if current in visited:
                continue
            visited.add(current)

            role = self.roles.get(current)
            if role is None:
                continue

            collected.extend(role.permissions)
            # Reversed so that parents are popped in their declared order.
            pending.extend(reversed(role.inherit_from))

        return collected

    def _matches_permission(
        self,
        perm: Permission,
        action: Action,
        resource: Resource,
        scope: str,
    ) -> bool:
        """Return True if ``perm`` covers the requested action, resource and scope."""
        if perm.action != action or perm.resource != resource:
            return False
        return self._scope_matches(perm.scope, scope)

    def _scope_matches(self, pattern: str, scope: str) -> bool:
        """Return True if the requested ``scope`` matches the glob ``pattern``.

        Matching is always case-sensitive, so the same request gets the
        same answer on every platform.
        """
        from fnmatch import fnmatchcase

        # A glob "*" also matches "/", so "public/../secret" would satisfy
        # "public/*". Refuse any request that contains a parent-directory
        # segment rather than trying to resolve it.
        if ".." in scope.replace("\\", "/").split("/"):
            return False

        return fnmatchcase(scope, pattern)
162
163
# Usage example
manager = PermissionManager()

# Assign roles
manager.assign_role("research_agent", "reader")
manager.assign_role("code_agent", "developer")
manager.assign_role("untrusted_agent", "restricted")

# Check permissions
print(manager.check_permission(
    "research_agent",
    Action.READ_FILE,
    Resource.FILES,
    "public/data.txt",
))  # True

print(manager.check_permission(
    "research_agent",
    Action.WRITE_FILE,
    Resource.FILES,
    "workspace/output.txt",
))  # False


# Action Allowlists
Controlling Tool Usage
🐍python
1"""
2Action Allowlist System
3
4Explicitly define what actions an agent CAN do,
5rather than what it cannot (deny by default).
6"""
7
8from dataclasses import dataclass
9from typing import Any, Callable
10
11
12@dataclass
13class AllowedAction:
14 """Definition of an allowed action."""
15 name: str
16 description: str
17 parameters: dict[str, type]
18 validator: Callable[[dict], bool] | None = None
19 requires_approval: bool = False
20
21
class ActionAllowlist:
    """Manage allowed actions for an agent (deny by default).

    Only actions that were explicitly registered can pass validation.
    """

    def __init__(self):
        self.allowed_actions: dict[str, AllowedAction] = {}

    def register(self, action: AllowedAction):
        """Register an allowed action, replacing any entry with the same name."""
        self.allowed_actions[action.name] = action

    def is_allowed(self, action_name: str) -> bool:
        """Return True if an action with this name has been registered."""
        return action_name in self.allowed_actions

    def validate_action(
        self,
        action_name: str,
        parameters: dict,
    ) -> tuple[bool, str]:
        """Validate an action request.

        Returns:
            ``(True, "Action allowed")`` when the action is registered,
            every declared parameter is present with the declared type and
            the custom validator (if any) accepts the parameters;
            otherwise ``(False, reason)``.

        Parameters that the action does not declare are not inspected.
        """
        action = self.allowed_actions.get(action_name)
        if action is None:
            return False, f"Action not allowed: {action_name}"

        # Check required parameters
        for param_name, param_type in action.parameters.items():
            if param_name not in parameters:
                return False, f"Missing parameter: {param_name}"
            if not isinstance(parameters[param_name], param_type):
                return False, f"Invalid type for {param_name}"

        # Run custom validator if present
        if action.validator and not action.validator(parameters):
            return False, "Custom validation failed"

        return True, "Action allowed"

    def requires_approval(self, action_name: str) -> bool:
        """Return True if the action needs human approval.

        Unregistered actions default to requiring approval.
        """
        action = self.allowed_actions.get(action_name)
        if action is None:
            return True
        return action.requires_approval
66
67
def create_safe_file_allowlist() -> ActionAllowlist:
    """Create an allowlist for safe file operations.

    Registers three actions:

    * ``read_file``   - any path except those under ``/etc``.
    * ``write_file``  - only paths that stay inside ``workspace/``.
    * ``delete_file`` - no path restriction, but always needs approval.

    Paths are checked both as given and after lexical normalisation, so
    spellings such as ``workspace/../x`` or ``/tmp/../etc/passwd`` cannot
    slip past a plain prefix test. Normalisation is purely textual:
    symlinks and the current working directory are not resolved.
    """
    import posixpath

    def _canonical(path: str) -> str:
        """Collapse ``.``/``..`` segments and repeated slashes."""
        collapsed = posixpath.normpath(path)
        # normpath keeps exactly two leading slashes; fold them to one.
        if collapsed.startswith("//"):
            collapsed = "/" + collapsed.lstrip("/")
        return collapsed

    def _may_read(params: dict) -> bool:
        raw = params["path"]
        return not (raw.startswith("/etc") or _canonical(raw).startswith("/etc"))

    def _may_write(params: dict) -> bool:
        raw = params["path"]
        return raw.startswith("workspace/") and _canonical(raw).startswith("workspace/")

    allowlist = ActionAllowlist()

    # Safe read operation
    allowlist.register(AllowedAction(
        name="read_file",
        description="Read contents of a file",
        parameters={"path": str},
        validator=_may_read,
    ))

    # Restricted write operation
    allowlist.register(AllowedAction(
        name="write_file",
        description="Write to a file in workspace",
        parameters={"path": str, "content": str},
        validator=_may_write,
        requires_approval=False,
    ))

    # Delete requires approval
    allowlist.register(AllowedAction(
        name="delete_file",
        description="Delete a file",
        parameters={"path": str},
        requires_approval=True,  # Always needs human approval
    ))

    return allowlist
99
100
# Usage
allowlist = create_safe_file_allowlist()

# Check actions
print(allowlist.validate_action("read_file", {"path": "data/input.txt"}))
# (True, 'Action allowed')

print(allowlist.validate_action("read_file", {"path": "/etc/passwd"}))
# (False, 'Custom validation failed')

print(allowlist.validate_action("execute_code", {"code": "print('hi')"}))
# (False, 'Action not allowed: execute_code')


# Dynamic Action Control
🐍python
1"""
2Dynamic Action Control
3
4Adjust allowed actions based on:
5- Agent behavior history
6- Current context
7- Risk assessment
8"""
9
10from dataclasses import dataclass
11from datetime import datetime, timedelta
12from typing import Any
13
14
15@dataclass
16class ActionRecord:
17 """Record of an action taken."""
18 action_name: str
19 parameters: dict
20 timestamp: datetime
21 success: bool
22 error: str | None = None
23
24
class DynamicActionController:
    """Dynamically control agent actions based on behavior.

    Wraps a base allowlist and adds two runtime restrictions: a trust
    score in [0.0, 1.0] that rises slowly on success and falls faster on
    failure, and a per-action lock that trips after repeated recent
    failures.
    """

    def __init__(self, base_allowlist: ActionAllowlist):
        self.base_allowlist = base_allowlist
        self.action_history: list[ActionRecord] = []
        self.trust_score: float = 0.5
        self.locked_actions: set[str] = set()

    def check_action(
        self,
        action_name: str,
        parameters: dict,
    ) -> tuple[bool, str]:
        """Check if action is allowed given current state.

        Checks run in order: lock, base allowlist, trust level. The first
        failing check determines the returned ``(False, reason)``.
        """
        # Check if action is locked
        if action_name in self.locked_actions:
            return False, f"Action locked due to previous issues: {action_name}"

        # Check base allowlist
        allowed, message = self.base_allowlist.validate_action(
            action_name, parameters
        )
        if not allowed:
            return False, message

        # Check trust-based restrictions
        if self.trust_score < 0.3:
            # Low trust: only read operations
            if action_name not in ("read_file", "search"):
                return False, "Low trust: only read operations allowed"
        elif self.trust_score < 0.7:
            # Medium trust: no destructive operations
            if action_name in ("delete_file", "modify_system"):
                return False, "Medium trust: destructive operations blocked"

        return True, "Action allowed"

    def record_action(self, record: ActionRecord):
        """Record an action and update trust.

        A success adds 0.02 to the trust score; a failure subtracts 0.1.
        Three or more failures of the same action inside the recent-failure
        window lock that action.
        """
        self.action_history.append(record)

        if record.success:
            self.trust_score = min(1.0, self.trust_score + 0.02)
            return

        self.trust_score = max(0.0, self.trust_score - 0.1)

        # Lock action if repeated failures
        if self._count_recent_failures(record.action_name) >= 3:
            self.locked_actions.add(record.action_name)

    def _count_recent_failures(
        self,
        action_name: str,
        window: timedelta = timedelta(minutes=5),
    ) -> int:
        """Count failures of ``action_name`` recorded within ``window`` of now."""
        cutoff = datetime.now() - window
        return sum(
            1
            for record in self.action_history
            if record.action_name == action_name
            and not record.success
            and record.timestamp > cutoff
        )

    def reset_locks(self, authorization: str):
        """Clear every lock and restore the trust score to 0.5.

        Returns:
            True if ``authorization`` was accepted and the reset happened;
            False if it was rejected and nothing changed.
        """
        if not self._verify_authorization(authorization):
            return False

        self.locked_actions.clear()
        self.trust_score = 0.5
        return True

    def _verify_authorization(self, auth: str) -> bool:
        """Verify authorization code."""
        # NOTE(security): placeholder only. A hard-coded shared code is not
        # authentication; in production, use proper authentication.
        return auth == "admin_reset_code"


# Resource Limits
Preventing Resource Abuse
🐍python
1"""
2Resource Limits
3
4Prevent agents from:
5- Making too many API calls
6- Using too much compute
7- Generating too much output
8- Running too long
9"""
10
11from dataclasses import dataclass, field
12from datetime import datetime
13from typing import Any
14
15
@dataclass
class ResourceLimits:
    """Define resource limits for an agent."""

    max_api_calls: int = 100
    max_tokens: int = 100000
    max_file_size_bytes: int = 10 * 1024 * 1024  # 10MB
    max_files_created: int = 10
    max_execution_time_seconds: int = 300  # 5 minutes
    max_memory_bytes: int = 512 * 1024 * 1024  # 512MB
    max_output_length: int = 50000
26
27
@dataclass
class ResourceUsage:
    """Track current resource usage."""

    api_calls: int = 0
    tokens_used: int = 0
    bytes_written: int = 0
    files_created: int = 0
    # Set to the current time when the instance is created.
    start_time: datetime = field(default_factory=datetime.now)
    peak_memory: int = 0
37
38
class ResourceMonitor:
    """Monitor and enforce resource limits.

    ``check_limit`` answers "would this fit?" without changing state;
    ``record_usage`` is what actually advances the counters.
    """

    def __init__(self, limits: ResourceLimits):
        self.limits = limits
        self.usage = ResourceUsage()

    def check_limit(self, resource: str, amount: int = 1) -> tuple[bool, str]:
        """Check if resource usage is within limits.

        Args:
            resource: one of "api_calls", "tokens", "file_size", "files"
                or "time".
            amount: how much is about to be consumed. For "file_size" it
                is the size of the single file; for "time" it is ignored.

        Returns:
            ``(True, "Within limits")`` or ``(False, reason)``. Resource
            names not listed above are not checked and report as within
            limits.
        """
        limits, usage = self.limits, self.usage

        if resource == "api_calls":
            if usage.api_calls + amount > limits.max_api_calls:
                return False, f"API call limit exceeded ({limits.max_api_calls})"

        elif resource == "tokens":
            if usage.tokens_used + amount > limits.max_tokens:
                return False, f"Token limit exceeded ({limits.max_tokens})"

        elif resource == "file_size":
            if amount > limits.max_file_size_bytes:
                return False, f"File too large (max {limits.max_file_size_bytes})"

        elif resource == "files":
            if usage.files_created + amount > limits.max_files_created:
                return False, f"File creation limit exceeded ({limits.max_files_created})"

        elif resource == "time":
            elapsed = (datetime.now() - usage.start_time).total_seconds()
            if elapsed > limits.max_execution_time_seconds:
                return False, f"Execution time limit exceeded ({limits.max_execution_time_seconds}s)"

        return True, "Within limits"

    def record_usage(self, resource: str, amount: int):
        """Record resource usage.

        Recognised names are "api_calls", "tokens", "bytes_written" and
        "files"; any other name is ignored.
        """
        if resource == "api_calls":
            self.usage.api_calls += amount
        elif resource == "tokens":
            self.usage.tokens_used += amount
        elif resource == "bytes_written":
            self.usage.bytes_written += amount
        elif resource == "files":
            self.usage.files_created += amount

    def get_remaining(self) -> dict[str, int]:
        """Get remaining resources (values go negative once a limit is passed)."""
        elapsed_seconds = int(
            (datetime.now() - self.usage.start_time).total_seconds()
        )
        return {
            "api_calls": self.limits.max_api_calls - self.usage.api_calls,
            "tokens": self.limits.max_tokens - self.usage.tokens_used,
            "files": self.limits.max_files_created - self.usage.files_created,
            "time_seconds": self.limits.max_execution_time_seconds - elapsed_seconds,
        }

    def get_usage_report(self) -> dict:
        """Get detailed usage report.

        The report has four sections: "limits", "used", "remaining" and
        "utilization" (used / limit as a fraction).
        """
        return {
            "limits": {
                "api_calls": self.limits.max_api_calls,
                "tokens": self.limits.max_tokens,
                "files": self.limits.max_files_created,
            },
            "used": {
                "api_calls": self.usage.api_calls,
                "tokens": self.usage.tokens_used,
                "files": self.usage.files_created,
            },
            "remaining": self.get_remaining(),
            "utilization": {
                "api_calls": self._utilization(
                    self.usage.api_calls, self.limits.max_api_calls
                ),
                "tokens": self._utilization(
                    self.usage.tokens_used, self.limits.max_tokens
                ),
            },
        }

    @staticmethod
    def _utilization(used: int, limit: int) -> float:
        """Return used / limit, treating a non-positive limit as fully used.

        A limit of zero means no budget is available at all, so it is
        reported as 1.0 instead of raising ZeroDivisionError.
        """
        if limit <= 0:
            return 1.0
        return used / limit
114
115
class RateLimiter:
    """Rate limiting for agent actions (sliding window per action type)."""

    def __init__(self):
        self.action_timestamps: dict[str, list[datetime]] = {}
        self.rate_limits: dict[str, tuple[int, int]] = {
            # (max_calls, window_seconds)
            "api_call": (60, 60),  # 60 per minute
            "file_write": (10, 60),  # 10 per minute
            "code_execute": (5, 60),  # 5 per minute
        }

    def check_rate_limit(self, action_type: str) -> tuple[bool, str]:
        """Check if action is within rate limit.

        Does not record anything; call ``record_action`` once the action
        has actually been performed. Action types without a configured
        limit are always allowed.
        """
        if action_type not in self.rate_limits:
            return True, "No rate limit for action"

        max_calls, window_seconds = self.rate_limits[action_type]
        now = datetime.now()

        # Ages (in seconds) of the calls that are still inside the window,
        # in the order they were recorded.
        ages = [
            (now - stamp).total_seconds()
            for stamp in self.action_timestamps.get(action_type, [])
        ]
        recent_ages = [age for age in ages if age < window_seconds]

        if len(recent_ages) >= max_calls:
            # A slot frees up when the earliest recent call leaves the window.
            wait_time = window_seconds - recent_ages[0]
            return False, f"Rate limited. Wait {wait_time:.1f}s"

        return True, "Within rate limit"

    def record_action(self, action_type: str):
        """Record an action for rate limiting."""
        self.action_timestamps.setdefault(action_type, []).append(datetime.now())

        # Clean old entries
        self._cleanup_old_entries(action_type)

    def _cleanup_old_entries(self, action_type: str):
        """Drop timestamps that have fallen out of the action's window.

        Action types without a configured limit have no window, so their
        timestamps are left untouched.
        """
        limit = self.rate_limits.get(action_type)
        if limit is None:
            return

        _, window_seconds = limit
        now = datetime.now()
        self.action_timestamps[action_type] = [
            stamp
            for stamp in self.action_timestamps.get(action_type, [])
            if (now - stamp).total_seconds() < window_seconds
        ]


# Execution Sandboxing
Isolating Agent Execution
🐍python
1"""
2Execution Sandboxing
3
4Isolate agent code execution to prevent:
5- File system access outside allowed paths
6- Network access to unauthorized hosts
7- System command execution
8- Resource exhaustion
9"""
10
11import subprocess
12from dataclasses import dataclass
13from typing import Any
14
15
@dataclass
class SandboxConfig:
    """Configuration for execution sandbox."""

    allowed_paths: list[str]
    allowed_hosts: list[str]
    max_memory_mb: int
    max_time_seconds: int
    allow_network: bool
    allow_subprocess: bool
25
26
class ExecutionSandbox:
    """Best-effort, in-process sandbox for small Python snippets.

    NOTE(security): the snippet is run with ``exec`` inside the current
    interpreter. The AST screening and the trimmed builtins raise the bar
    but are not a security boundary, and this class does not enforce any
    of the limits stored in ``config`` (memory, time, paths, hosts). Use
    process- or container-level isolation for untrusted code.
    """

    _DANGEROUS_IMPORTS = frozenset({
        "os", "subprocess", "sys", "shutil",
        "socket", "requests", "urllib",
    })

    _DANGEROUS_CALLS = frozenset({
        "eval", "exec", "compile",
        "open", "__import__", "getattr",
    })

    def __init__(self, config: SandboxConfig):
        self.config = config

    def execute_python(self, code: str) -> dict:
        """Execute Python code in sandbox.

        Returns:
            A dict with keys "success", "output" and "error". Rejected or
            failing code yields ``success=False`` with the reason in
            "error" and an empty "output"; successful code yields the
            captured stdout in "output" and ``error=None``.
        """
        # Validate code before execution
        validation = self._validate_code(code)
        if not validation["safe"]:
            return {
                "success": False,
                "error": validation["reason"],
                "output": "",
            }

        # Execute in restricted environment
        try:
            result = self._run_in_sandbox(code)
        except Exception as e:
            # Boundary for user code: any failure is reported, not raised.
            return {
                "success": False,
                "error": str(e),
                "output": "",
            }

        return {
            "success": True,
            "output": result,
            "error": None,
        }

    def _validate_code(self, code: str) -> dict:
        """Validate code for dangerous patterns.

        Rejects code that does not parse, imports a module on the
        dangerous list, calls a dangerous builtin by bare name, or reads a
        double-underscore attribute.

        Returns:
            ``{"safe": bool, "reason": str | None}``.
        """
        import ast

        try:
            tree = ast.parse(code)
        except SyntaxError as e:
            return {"safe": False, "reason": f"Syntax error: {e}"}

        for node in ast.walk(tree):
            # Check imports
            if isinstance(node, ast.Import):
                for alias in node.names:
                    if alias.name.split(".")[0] in self._DANGEROUS_IMPORTS:
                        return {
                            "safe": False,
                            "reason": f"Dangerous import: {alias.name}",
                        }

            elif isinstance(node, ast.ImportFrom):
                if node.module and node.module.split(".")[0] in self._DANGEROUS_IMPORTS:
                    return {
                        "safe": False,
                        "reason": f"Dangerous import: {node.module}",
                    }

            # Check function calls
            elif isinstance(node, ast.Call):
                if (
                    isinstance(node.func, ast.Name)
                    and node.func.id in self._DANGEROUS_CALLS
                ):
                    return {
                        "safe": False,
                        "reason": f"Dangerous function: {node.func.id}",
                    }

            # Dunder attributes (``().__class__.__bases__`` and friends)
            # are the usual route from a plain object back to the full
            # interpreter, bypassing the trimmed builtins.
            elif isinstance(node, ast.Attribute):
                if node.attr.startswith("__"):
                    return {
                        "safe": False,
                        "reason": f"Dangerous attribute access: {node.attr}",
                    }

        return {"safe": True, "reason": None}

    def _run_in_sandbox(self, code: str) -> str:
        """Run code in-process with a minimal set of builtins.

        Returns everything the code printed to stdout. Exceptions raised
        by the code propagate to the caller.
        """
        import contextlib
        import io

        # Only these names are visible to the snippet as builtins.
        restricted_globals = {
            "__builtins__": {
                "print": print,
                "len": len,
                "range": range,
                "str": str,
                "int": int,
                "float": float,
                "list": list,
                "dict": dict,
                "True": True,
                "False": False,
                "None": None,
            }
        }

        # Capture output
        output = io.StringIO()
        with contextlib.redirect_stdout(output):
            # NOTE(security): exec on caller-supplied code; see class docstring.
            exec(code, restricted_globals)

        return output.getvalue()
136
137
class DockerSandbox:
    """Docker-based sandbox for stronger isolation.

    Each call runs the code in a fresh, throw-away container with no
    network, a read-only root filesystem and memory / CPU / process caps.
    Requires the ``docker`` CLI on PATH.
    """

    def __init__(self, image: str = "python:3.11-slim"):
        self.image = image
        self.memory_limit = "256m"
        self.cpu_limit = "0.5"
        self.network = "none"

    def execute(self, code: str, timeout: int = 30) -> dict:
        """Execute code in Docker container.

        Args:
            code: Python source to run.
            timeout: seconds to wait for the ``docker run`` command.

        Returns:
            A dict with keys "success" (exit status was 0), "output"
            (stdout) and "error" (stderr on failure, a timeout message on
            timeout, otherwise None).
        """
        import os
        import tempfile
        import uuid

        # A known name lets us remove the container if we give up on it.
        container_name = f"agent-sandbox-{uuid.uuid4().hex}"

        # Write code to temp file. UTF-8 is what Python assumes for source
        # files, independent of the host locale.
        with tempfile.NamedTemporaryFile(
            mode="w",
            suffix=".py",
            delete=False,
            encoding="utf-8",
        ) as f:
            f.write(code)
            code_file = f.name

        try:
            result = subprocess.run(
                [
                    "docker", "run",
                    "--rm",
                    "--name", container_name,
                    "--network", self.network,
                    "--memory", self.memory_limit,
                    "--cpus", self.cpu_limit,
                    "--pids-limit", "64",
                    "--security-opt", "no-new-privileges",
                    "--read-only",
                    "-v", f"{code_file}:/code.py:ro",
                    self.image,
                    "python", "/code.py",
                ],
                capture_output=True,
                text=True,
                timeout=timeout,
            )
        except subprocess.TimeoutExpired:
            # The timeout only kills the docker CLI process; the container
            # itself would keep running, so remove it explicitly.
            subprocess.run(
                ["docker", "rm", "-f", container_name],
                capture_output=True,
                check=False,
            )
            return {
                "success": False,
                "output": "",
                "error": f"Execution timed out after {timeout}s",
            }
        finally:
            os.unlink(code_file)

        return {
            "success": result.returncode == 0,
            "output": result.stdout,
            "error": result.stderr if result.returncode != 0 else None,
        }
194
195
# Usage
sandbox = ExecutionSandbox(SandboxConfig(
    allowed_paths=["workspace/"],
    allowed_hosts=[],
    max_memory_mb=256,
    max_time_seconds=30,
    allow_network=False,
    allow_subprocess=False,
))

result = sandbox.execute_python('''
x = [1, 2, 3, 4, 5]
print(f"Sum: {sum(x)}") # This will fail - sum not in restricted builtins
''')


# Key Takeaways
- Role-based permissions provide structured control over what agents can do based on their purpose.
- Action allowlists implement deny-by-default — only explicitly allowed actions can be taken.
- Resource limits prevent agents from consuming excessive compute, storage, or API calls.
- Sandboxing isolates code execution to prevent unauthorized system access.
- Dynamic controls adjust permissions based on agent behavior and trust level.
Next Section Preview: We'll explore human-in-the-loop controls for maintaining oversight of agent actions.