Introduction
A coding agent that can execute code is powerful but dangerous. Without proper sandboxing, an AI agent could accidentally (or through prompt injection) execute malicious commands, consume excessive resources, or damage the host system. In this section, we'll build a robust execution sandbox that enables safe code execution.
Security First: Every command an agent executes should be treated as potentially hostile. Defense in depth—multiple layers of protection—is essential for production systems.
Why Sandboxing Matters
Consider these scenarios where unsandboxed execution could go wrong:
| Scenario | Risk | Consequence |
|---|---|---|
| Infinite loop in generated code | Resource exhaustion | System freeze, denial of service |
| rm -rf / in shell command | Data destruction | Complete system wipe |
| Fork bomb | Process exhaustion | System becomes unresponsive |
| Network requests to internal services | Data exfiltration | Sensitive data leaked |
| Reading /etc/passwd | Information disclosure | User enumeration |
| Installing malicious packages | Supply chain attack | Persistent compromise |
Attack Vectors
Attacks can come from multiple sources:
- Direct prompting: Malicious user requests the agent to execute harmful commands
- Prompt injection: Malicious content in files or web pages tricks the agent
- Accidental harm: Well-intentioned but buggy generated code causes damage
- Dependency attacks: Installed packages contain malicious code
Sandboxing Strategies
Different sandboxing approaches offer varying levels of isolation and convenience:
| Strategy | Isolation Level | Performance | Complexity |
|---|---|---|---|
| Command allowlist | Low | High | Low |
| Process limits (ulimit) | Medium | High | Low |
| Subprocess with restrictions | Medium | High | Medium |
| Docker containers | High | Medium | Medium |
| Firecracker microVMs | Very High | Medium | High |
| gVisor/Kata containers | Very High | Medium | High |
| Remote execution (cloud) | Complete | Low | High |
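As a rough decision rule, the table can be encoded in a few lines. This is a sketch with hypothetical names (`pick_strategy` and its string return values are not part of the implementation below); real systems weigh more inputs such as latency budget, multi-tenancy, and compliance requirements.

```python
def pick_strategy(trusted_input: bool, docker_available: bool) -> str:
    """Map trust level and environment to a sandboxing strategy (sketch)."""
    if not trusted_input:
        # Untrusted input warrants the strongest isolation on hand
        return "container" if docker_available else "remote"
    # Trusted, internal use: resource limits are usually enough
    return "subprocess-limits"
```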
Defense in Depth
The best approach combines multiple strategies:
```python
from dataclasses import dataclass
from enum import Enum
from typing import List, Optional, Set


class IsolationLevel(Enum):
    """Levels of execution isolation."""
    NONE = "none"              # Direct execution (development only)
    RESTRICTED = "restricted"  # Allowlist + resource limits
    CONTAINER = "container"    # Docker isolation
    MICROVM = "microvm"        # Firecracker/gVisor
    REMOTE = "remote"          # Cloud sandbox


@dataclass
class SandboxConfig:
    """Configuration for the execution sandbox."""
    isolation_level: IsolationLevel = IsolationLevel.RESTRICTED

    # Command restrictions
    allowed_commands: Optional[Set[str]] = None
    blocked_commands: Optional[Set[str]] = None

    # Resource limits
    max_cpu_time: int = 30       # seconds
    max_memory_mb: int = 512     # megabytes
    max_processes: int = 50      # concurrent processes
    max_file_size_mb: int = 100  # megabytes
    max_open_files: int = 100    # file descriptors

    # Network restrictions
    allow_network: bool = False
    allowed_hosts: Optional[List[str]] = None

    # Docker-specific
    docker_image: str = "python:3.11-slim"
    mount_workspace: bool = True
    read_only_workspace: bool = False

    def __post_init__(self):
        if self.allowed_commands is None:
            self.allowed_commands = {
                # Build tools
                "npm", "npx", "yarn", "pnpm",
                "pip", "pip3", "python", "python3",
                "node", "deno", "bun",
                "go", "cargo", "rustc",
                "make", "cmake",

                # Testing (multi-word entries are matched as prefixes
                # by the validators below)
                "pytest", "jest", "vitest", "mocha",
                "go test", "cargo test",

                # Linting
                "eslint", "prettier", "black", "flake8",
                "mypy", "tsc", "rustfmt", "clippy",

                # Version control (read-only subset by default)
                "git status", "git log", "git diff", "git branch",

                # System utilities (safe subset)
                "ls", "cat", "head", "tail", "grep", "find",
                "wc", "sort", "uniq", "diff",
            }

        if self.blocked_commands is None:
            self.blocked_commands = {
                # Destructive commands
                "rm -rf", "rm -r /", "mkfs", "dd if=",

                # Network tools (potential exfiltration)
                "curl", "wget", "nc", "netcat",

                # System modification
                "chmod 777", "chown", "sudo", "su",

                # Process control
                "kill -9", "killall", "pkill",

                # Dangerous git operations
                "git push", "git remote add",
            }
```
Docker-Based Sandbox
Docker provides excellent isolation for code execution. Here's a complete Docker-based sandbox implementation:
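Before the full class, the essential isolation flags fit in a few lines. The helper below (`docker_args` is a hypothetical name, not part of the implementation) sketches the kind of `docker run` invocation assembled later:

```python
def docker_args(image: str, workspace: str, memory_mb: int = 512,
                allow_network: bool = False) -> list:
    """Assemble core docker run security flags (sketch, not exhaustive)."""
    args = [
        "docker", "run", "--rm",
        f"--memory={memory_mb}m",            # hard memory cap
        "--cap-drop=ALL",                    # drop all Linux capabilities
        "--security-opt=no-new-privileges",  # block setuid escalation
        "--pids-limit=50",                   # fork-bomb protection
    ]
    if not allow_network:
        args.append("--network=none")        # no network stack at all
    args += ["-v", f"{workspace}:/workspace", "-w", "/workspace", image]
    return args
```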
```python
import asyncio
import json
import shlex
import uuid
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Dict, Optional


@dataclass
class ExecutionResult:
    """Result of code execution in sandbox."""
    success: bool
    exit_code: int
    stdout: str
    stderr: str
    execution_time: float
    resource_usage: Optional[Dict[str, Any]] = None
    error: Optional[str] = None


class DockerSandbox:
    """
    Docker-based sandbox for secure code execution.
    """

    DEFAULT_IMAGE = "python:3.11-slim"
    CONTAINER_WORKSPACE = "/workspace"

    def __init__(
        self,
        config: SandboxConfig,
        workspace: Path
    ):
        self.config = config
        self.workspace = Path(workspace).resolve()
        self.container_id: Optional[str] = None

    async def execute(
        self,
        command: str,
        timeout: Optional[int] = None,
        env: Optional[Dict[str, str]] = None
    ) -> ExecutionResult:
        """Execute a command in the Docker sandbox."""
        timeout = timeout or self.config.max_cpu_time

        # Validate command
        validation = self._validate_command(command)
        if not validation["allowed"]:
            return ExecutionResult(
                success=False, exit_code=-1, stdout="", stderr="",
                execution_time=0, error=validation["reason"]
            )

        try:
            # Build docker run command
            docker_cmd = self._build_docker_command(command, env)

            # Execute with timeout
            start_time = asyncio.get_event_loop().time()

            process = await asyncio.create_subprocess_shell(
                docker_cmd,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE
            )

            try:
                stdout, stderr = await asyncio.wait_for(
                    process.communicate(),
                    timeout=timeout
                )
            except asyncio.TimeoutError:
                process.kill()
                await process.wait()
                return ExecutionResult(
                    success=False, exit_code=-1, stdout="", stderr="",
                    execution_time=timeout,
                    error=f"Execution timed out after {timeout}s"
                )

            execution_time = asyncio.get_event_loop().time() - start_time

            return ExecutionResult(
                success=process.returncode == 0,
                exit_code=process.returncode,
                stdout=stdout.decode("utf-8", errors="replace"),
                stderr=stderr.decode("utf-8", errors="replace"),
                execution_time=execution_time
            )

        except Exception as e:
            return ExecutionResult(
                success=False, exit_code=-1, stdout="", stderr="",
                execution_time=0, error=str(e)
            )

    def _build_docker_command(
        self,
        command: str,
        env: Optional[Dict[str, str]] = None
    ) -> str:
        """Build the docker run command with security options."""
        parts = ["docker", "run", "--rm"]

        # Resource limits
        parts.extend([
            f"--memory={self.config.max_memory_mb}m",
            "--cpus=1",
            f"--pids-limit={self.config.max_processes}",
            "--ulimit", f"nofile={self.config.max_open_files}:{self.config.max_open_files}",
            "--ulimit", f"fsize={self.config.max_file_size_mb * 1024 * 1024}",
        ])

        # Security options
        parts.extend([
            "--security-opt=no-new-privileges",
            "--cap-drop=ALL",
        ])
        if self.config.read_only_workspace:
            # Also make the container's root filesystem read-only
            parts.append("--read-only")

        # Network isolation
        if not self.config.allow_network:
            parts.append("--network=none")

        # Mount workspace
        if self.config.mount_workspace:
            mount_option = "ro" if self.config.read_only_workspace else "rw"
            parts.extend([
                "-v", f"{self.workspace}:{self.CONTAINER_WORKSPACE}:{mount_option}",
                "-w", self.CONTAINER_WORKSPACE,
            ])

        # Environment variables
        if env:
            for key, value in env.items():
                # Sanitize environment variables
                if self._is_safe_env_var(key, value):
                    parts.extend(["-e", f"{key}={value}"])

        # Image and command
        parts.extend([self.config.docker_image, "sh", "-c", command])

        # Quote every argument so paths, env values, and the inner command
        # survive the shell round-trip without injection
        return " ".join(shlex.quote(p) for p in parts)

    def _validate_command(self, command: str) -> Dict[str, Any]:
        """Validate command against allowlist/blocklist."""
        # Check blocklist first
        for blocked in self.config.blocked_commands:
            if blocked in command:
                return {
                    "allowed": False,
                    "reason": f"Command contains blocked pattern: {blocked}"
                }

        # Check if command starts with an allowed command
        cmd_parts = command.split()
        if not cmd_parts:
            return {"allowed": False, "reason": "Empty command"}

        base_cmd = cmd_parts[0]

        # Check direct match, then two-token match for multi-word entries
        # like "git status" or "cargo test"
        if base_cmd in self.config.allowed_commands:
            return {"allowed": True, "reason": None}
        if len(cmd_parts) > 1 and " ".join(cmd_parts[:2]) in self.config.allowed_commands:
            return {"allowed": True, "reason": None}

        # Check if it's a path to an allowed command
        if "/" in base_cmd:
            base_cmd = base_cmd.rsplit("/", 1)[-1]
            if base_cmd in self.config.allowed_commands:
                return {"allowed": True, "reason": None}

        return {
            "allowed": False,
            "reason": f"Command not in allowlist: {base_cmd}"
        }

    def _is_safe_env_var(self, key: str, value: str) -> bool:
        """Check if an environment variable is safe to pass through."""
        # Block loader-injection and identity variables
        if key.startswith(("LD_", "DYLD_")) or key in {"PATH", "HOME", "USER"}:
            return False

        # Block env vars that likely contain secrets
        secret_patterns = ["KEY", "SECRET", "PASSWORD", "TOKEN", "CREDENTIAL"]
        return not any(p in key.upper() for p in secret_patterns)

    async def execute_script(
        self,
        script: str,
        language: str,
        timeout: Optional[int] = None
    ) -> ExecutionResult:
        """Execute a script in the sandbox."""
        # Create temporary script file
        extensions = {
            "python": ".py",
            "javascript": ".js",
            "typescript": ".ts",
            "bash": ".sh",
        }

        ext = extensions.get(language, ".txt")
        script_name = f"script_{uuid.uuid4().hex[:8]}{ext}"
        script_path = self.workspace / script_name

        try:
            # Write script to workspace
            script_path.write_text(script)

            # Build execution command
            commands = {
                "python": f"python {script_name}",
                "javascript": f"node {script_name}",
                "typescript": f"npx ts-node {script_name}",
                "bash": f"bash {script_name}",
            }

            command = commands.get(language, f"./{script_name}")
            return await self.execute(command, timeout)

        finally:
            # Clean up script file
            if script_path.exists():
                script_path.unlink()

    async def run_tests(
        self,
        test_command: Optional[str] = None,
        test_path: Optional[str] = None
    ) -> ExecutionResult:
        """Run tests in the sandbox."""
        if test_command:
            return await self.execute(test_command)

        # Auto-detect test framework
        if (self.workspace / "pytest.ini").exists() or \
           (self.workspace / "pyproject.toml").exists():
            cmd = f"pytest {test_path or ''} -v"
        elif (self.workspace / "package.json").exists():
            # Check for a test script in package.json
            pkg = json.loads((self.workspace / "package.json").read_text())
            if "test" in pkg.get("scripts", {}):
                cmd = "npm test"
            else:
                cmd = f"npx jest {test_path or ''}"
        else:
            cmd = f"python -m pytest {test_path or ''}"

        return await self.execute(cmd)

    async def install_dependencies(self) -> ExecutionResult:
        """Install project dependencies in the sandbox."""
        # Detect package manager; lockfiles are checked before generic
        # manifests so npm ci / frozen installs take precedence
        if (self.workspace / "requirements.txt").exists():
            return await self.execute("pip install -r requirements.txt")
        elif (self.workspace / "pyproject.toml").exists():
            return await self.execute("pip install -e .")
        elif (self.workspace / "package-lock.json").exists():
            return await self.execute("npm ci")
        elif (self.workspace / "yarn.lock").exists():
            return await self.execute("yarn install --frozen-lockfile")
        elif (self.workspace / "package.json").exists():
            return await self.execute("npm install")
        else:
            return ExecutionResult(
                success=True,
                exit_code=0,
                stdout="No dependency file found",
                stderr="",
                execution_time=0
            )
```
Subprocess Isolation
When Docker isn't available, subprocess isolation with resource limits provides a lighter-weight alternative:
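The key primitive is `preexec_fn`: a callable that runs in the child process between `fork()` and `exec()`, which is exactly where `setrlimit` must be applied so the limit binds the child but not the parent. A standalone demonstration (Unix only; note that `preexec_fn` is unsafe in multi-threaded parent processes):

```python
import resource
import subprocess
import sys

def limit_child():
    # Runs in the child after fork(), before exec()
    resource.setrlimit(resource.RLIMIT_NOFILE, (64, 64))

# The child reports the file-descriptor limit it actually sees
proc = subprocess.run(
    [sys.executable, "-c",
     "import resource; print(resource.getrlimit(resource.RLIMIT_NOFILE)[0])"],
    preexec_fn=limit_child,
    capture_output=True,
    text=True,
)
print(proc.stdout.strip())  # → 64
```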
```python
import asyncio
import os
import resource
import signal
from pathlib import Path
from typing import Dict, Optional


class SubprocessSandbox:
    """
    Lightweight sandbox using subprocess with resource limits.
    Works on Unix systems without Docker.
    """

    def __init__(self, config: SandboxConfig, workspace: Path):
        self.config = config
        self.workspace = Path(workspace).resolve()

    async def execute(
        self,
        command: str,
        timeout: Optional[int] = None,
        env: Optional[Dict[str, str]] = None
    ) -> ExecutionResult:
        """Execute command with resource limits."""
        timeout = timeout or self.config.max_cpu_time

        # Validate command
        if not self._validate_command(command):
            return ExecutionResult(
                success=False, exit_code=-1, stdout="", stderr="",
                execution_time=0, error="Command not allowed"
            )

        # Prepare environment
        safe_env = self._prepare_environment(env)

        try:
            start_time = asyncio.get_event_loop().time()

            # Create subprocess with resource limits.
            # preexec_fn runs in the child between fork() and exec();
            # it is not safe in multi-threaded parents.
            process = await asyncio.create_subprocess_shell(
                command,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE,
                cwd=self.workspace,
                env=safe_env,
                preexec_fn=self._set_resource_limits
            )

            try:
                stdout, stderr = await asyncio.wait_for(
                    process.communicate(),
                    timeout=timeout
                )
            except asyncio.TimeoutError:
                # Kill the whole process group, not just the shell
                try:
                    os.killpg(os.getpgid(process.pid), signal.SIGKILL)
                except ProcessLookupError:
                    process.kill()
                await process.wait()

                return ExecutionResult(
                    success=False, exit_code=-1, stdout="", stderr="",
                    execution_time=timeout,
                    error=f"Timeout after {timeout}s"
                )

            execution_time = asyncio.get_event_loop().time() - start_time

            return ExecutionResult(
                success=process.returncode == 0,
                exit_code=process.returncode,
                stdout=stdout.decode("utf-8", errors="replace"),
                stderr=stderr.decode("utf-8", errors="replace"),
                execution_time=execution_time
            )

        except Exception as e:
            return ExecutionResult(
                success=False, exit_code=-1, stdout="", stderr="",
                execution_time=0, error=str(e)
            )

    def _set_resource_limits(self):
        """Set resource limits (runs in the child process via preexec_fn)."""
        # Create new process group for clean termination
        os.setpgrp()

        # CPU time limit (soft limit sends SIGXCPU, hard limit kills)
        resource.setrlimit(
            resource.RLIMIT_CPU,
            (self.config.max_cpu_time, self.config.max_cpu_time + 5)
        )

        # Memory limit (address space)
        memory_bytes = self.config.max_memory_mb * 1024 * 1024
        resource.setrlimit(
            resource.RLIMIT_AS,
            (memory_bytes, memory_bytes)
        )

        # Process limit
        resource.setrlimit(
            resource.RLIMIT_NPROC,
            (self.config.max_processes, self.config.max_processes)
        )

        # File descriptor limit
        resource.setrlimit(
            resource.RLIMIT_NOFILE,
            (self.config.max_open_files, self.config.max_open_files)
        )

        # File size limit
        file_size_bytes = self.config.max_file_size_mb * 1024 * 1024
        resource.setrlimit(
            resource.RLIMIT_FSIZE,
            (file_size_bytes, file_size_bytes)
        )

    def _prepare_environment(self, env: Optional[Dict[str, str]] = None) -> Dict[str, str]:
        """Prepare a safe environment for execution."""
        # Start with a minimal environment
        safe_env = {
            "PATH": "/usr/local/bin:/usr/bin:/bin",
            "HOME": str(self.workspace),
            "LANG": "C.UTF-8",
            "TERM": "xterm",
        }

        # Add project-specific paths
        if (self.workspace / "node_modules" / ".bin").exists():
            safe_env["PATH"] = f"{self.workspace}/node_modules/.bin:" + safe_env["PATH"]

        if (self.workspace / ".venv" / "bin").exists():
            safe_env["PATH"] = f"{self.workspace}/.venv/bin:" + safe_env["PATH"]
            safe_env["VIRTUAL_ENV"] = str(self.workspace / ".venv")

        # Add user-provided env vars (filtered)
        if env:
            for key, value in env.items():
                if self._is_safe_env_var(key):
                    safe_env[key] = value

        return safe_env

    def _validate_command(self, command: str) -> bool:
        """Validate command against security rules."""
        # Check blocklist
        for blocked in self.config.blocked_commands:
            if blocked in command:
                return False

        # Reject command substitution and embedded newlines outright.
        # Chaining (&&, ||, ;) and redirection are tolerated here; a
        # production system should parse the command with shlex rather
        # than rely on substring checks.
        for pattern in ("$(", "`", "\n"):
            if pattern in command and not self._is_in_quotes(command, pattern):
                return False

        return True

    def _is_in_quotes(self, text: str, pattern: str) -> bool:
        """Check if pattern falls inside single quotes (simplified).

        Only single quotes count: in sh, double quotes do NOT neutralize
        $() or backtick substitution.
        """
        idx = text.find(pattern)
        if idx == -1:
            return False
        return text[:idx].count("'") % 2 == 1

    def _is_safe_env_var(self, key: str) -> bool:
        """Check if environment variable key is safe."""
        dangerous = {"LD_PRELOAD", "LD_LIBRARY_PATH", "DYLD_INSERT_LIBRARIES"}
        return key not in dangerous and not key.startswith("LD_")
```
Command Execution Framework
A unified framework allows the agent to use the best available sandbox:
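The selection logic reduces to capability detection with graceful fallback. A minimal sketch (`best_backend` is a hypothetical name, and it only checks for the `docker` binary rather than a reachable daemon, which the full manager below does properly):

```python
import shutil

def best_backend() -> str:
    """Pick the strongest isolation backend available on this host."""
    if shutil.which("docker"):
        return "docker"      # container isolation
    return "subprocess"      # fall back to ulimit-based restriction
```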
```python
import asyncio
import os
import re
import tempfile
from abc import ABC, abstractmethod
from pathlib import Path
from typing import Any, Dict, Optional


class Sandbox(ABC):
    """Abstract base class for sandboxes.

    DockerSandbox and SubprocessSandbox above satisfy this interface
    structurally; making them inherit from Sandbox (or registering them
    as virtual subclasses) keeps isinstance checks honest.
    """

    @abstractmethod
    async def execute(
        self,
        command: str,
        timeout: Optional[int] = None,
        env: Optional[Dict[str, str]] = None
    ) -> ExecutionResult:
        ...

    @abstractmethod
    async def execute_script(
        self,
        script: str,
        language: str,
        timeout: Optional[int] = None
    ) -> ExecutionResult:
        ...


class SandboxManager:
    """
    Manages sandbox selection and lifecycle.
    """

    def __init__(
        self,
        workspace: Path,
        config: Optional[SandboxConfig] = None
    ):
        self.workspace = Path(workspace).resolve()
        self.config = config or SandboxConfig()
        self._sandbox: Optional[Sandbox] = None

    async def initialize(self):
        """Initialize the appropriate sandbox based on config and availability."""
        if self.config.isolation_level == IsolationLevel.CONTAINER:
            if await self._docker_available():
                self._sandbox = DockerSandbox(self.config, self.workspace)
            else:
                # Fall back to subprocess isolation
                print("Docker not available, falling back to subprocess isolation")
                self._sandbox = SubprocessSandbox(self.config, self.workspace)

        elif self.config.isolation_level == IsolationLevel.RESTRICTED:
            self._sandbox = SubprocessSandbox(self.config, self.workspace)

        elif self.config.isolation_level == IsolationLevel.NONE:
            self._sandbox = DirectSandbox(self.config, self.workspace)

        else:
            raise ValueError(f"Unsupported isolation level: {self.config.isolation_level}")

    async def _docker_available(self) -> bool:
        """Check whether the Docker daemon is reachable."""
        try:
            process = await asyncio.create_subprocess_shell(
                "docker info",
                stdout=asyncio.subprocess.DEVNULL,
                stderr=asyncio.subprocess.DEVNULL
            )
            await process.wait()
            return process.returncode == 0
        except OSError:
            return False

    async def execute(
        self,
        command: str,
        timeout: Optional[int] = None,
        env: Optional[Dict[str, str]] = None
    ) -> ExecutionResult:
        """Execute a command in the sandbox."""
        if not self._sandbox:
            await self.initialize()

        return await self._sandbox.execute(command, timeout, env)

    async def execute_script(
        self,
        script: str,
        language: str,
        timeout: Optional[int] = None
    ) -> ExecutionResult:
        """Execute a script in the sandbox."""
        if not self._sandbox:
            await self.initialize()

        return await self._sandbox.execute_script(script, language, timeout)

    async def run_tests(
        self,
        test_command: Optional[str] = None,
        test_path: Optional[str] = None
    ) -> Dict[str, Any]:
        """Run tests and parse results."""
        if not self._sandbox:
            await self.initialize()

        if hasattr(self._sandbox, "run_tests"):
            result = await self._sandbox.run_tests(test_command, test_path)
        else:
            result = await self._sandbox.execute(test_command or "pytest -v")

        # Parse test results
        return self._parse_test_results(result)

    def _parse_test_results(self, result: ExecutionResult) -> Dict[str, Any]:
        """Parse test output into structured results."""
        output = result.stdout + result.stderr

        # Try to parse pytest-style summary counts
        pytest_patterns = {
            "passed": r"(\d+) passed",
            "failed": r"(\d+) failed",
            "error": r"(\d+) error",
            "skipped": r"(\d+) skipped",
        }

        parsed = {"raw_output": output, "success": result.success}

        for key, pattern in pytest_patterns.items():
            match = re.search(pattern, output)
            parsed[key] = int(match.group(1)) if match else 0

        # Extract first failure details
        if not result.success:
            failure_match = re.search(
                r"FAILED (.+?) - (.+?)(?:\n|$)",
                output,
                re.MULTILINE
            )
            if failure_match:
                parsed["first_failure"] = {
                    "test": failure_match.group(1),
                    "reason": failure_match.group(2)
                }

        return parsed


class DirectSandbox(Sandbox):
    """
    Direct execution without sandboxing.
    FOR DEVELOPMENT ONLY - never use in production!
    """

    def __init__(self, config: SandboxConfig, workspace: Path):
        self.config = config
        self.workspace = workspace

    async def execute(
        self,
        command: str,
        timeout: Optional[int] = None,
        env: Optional[Dict[str, str]] = None
    ) -> ExecutionResult:
        """Execute command directly (unsafe!)."""
        timeout = timeout or 30

        process = await asyncio.create_subprocess_shell(
            command,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
            cwd=self.workspace,
            env={**os.environ, **(env or {})}
        )

        try:
            stdout, stderr = await asyncio.wait_for(
                process.communicate(),
                timeout=timeout
            )
        except asyncio.TimeoutError:
            process.kill()
            await process.wait()
            return ExecutionResult(
                success=False, exit_code=-1, stdout="", stderr="",
                execution_time=timeout, error="Timeout"
            )

        return ExecutionResult(
            success=process.returncode == 0,
            exit_code=process.returncode,
            stdout=stdout.decode("utf-8", errors="replace"),
            stderr=stderr.decode("utf-8", errors="replace"),
            execution_time=0
        )

    async def execute_script(
        self,
        script: str,
        language: str,
        timeout: Optional[int] = None
    ) -> ExecutionResult:
        """Execute script directly (unsafe!)."""
        # Write to a temp file and execute
        suffix = {
            "python": ".py",
            "javascript": ".js",
            "bash": ".sh"
        }.get(language, ".txt")

        with tempfile.NamedTemporaryFile(
            suffix=suffix,
            delete=False,
            mode="w"
        ) as f:
            f.write(script)
            script_path = f.name

        try:
            interpreters = {
                "python": f"python {script_path}",
                "javascript": f"node {script_path}",
                "bash": f"bash {script_path}"
            }
            command = interpreters.get(language, f"sh {script_path}")
            return await self.execute(command, timeout)
        finally:
            os.unlink(script_path)
```
SandboxManager automatically selects the best available sandbox. In production, ensure Docker is always available for maximum isolation.
Resource Limits and Timeouts
Proper resource management prevents runaway processes from affecting system stability:
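The soft/hard timeout pattern implemented below can be reduced to a few lines of asyncio. The trick is wrapping the coroutine in a task and shielding it during the soft window, because a bare coroutine cancelled by `wait_for` cannot be awaited a second time. A standalone sketch (`run_with_grace` is a hypothetical name):

```python
import asyncio

async def run_with_grace(coro, soft: float, hard: float, warnings: list):
    """Warn at the soft deadline, cancel at the hard one."""
    task = asyncio.ensure_future(coro)
    try:
        # shield() keeps the task alive if only the soft deadline fires
        return await asyncio.wait_for(asyncio.shield(task), timeout=soft)
    except asyncio.TimeoutError:
        warnings.append("soft timeout reached")
        # wait_for cancels the task itself if the hard deadline also fires
        return await asyncio.wait_for(task, timeout=hard - soft)

async def main():
    warnings = []
    result = await run_with_grace(asyncio.sleep(0.2, result="done"),
                                  soft=0.05, hard=1.0, warnings=warnings)
    return result, warnings
```

Running `asyncio.run(main())` here returns `("done", ["soft timeout reached"])`: the sleep outlives the soft deadline, triggers the warning, and still completes inside the hard window.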
```python
import asyncio
from dataclasses import dataclass
from typing import Callable, Optional

import psutil  # third-party: pip install psutil


@dataclass
class ResourceUsage:
    """Track resource usage during execution."""
    cpu_time: float        # CPU seconds used
    memory_peak_mb: float  # peak resident memory, in megabytes
    io_read_mb: float      # megabytes read
    io_write_mb: float     # megabytes written
    wall_time: float       # wall clock time


class ResourceMonitor:
    """
    Monitor and limit resource usage during execution.
    """

    def __init__(
        self,
        max_cpu_seconds: int = 30,
        max_memory_mb: int = 512,
        check_interval: float = 0.5
    ):
        self.max_cpu_seconds = max_cpu_seconds
        self.max_memory_mb = max_memory_mb
        self.check_interval = check_interval

        self._monitoring = False
        self._process: Optional[psutil.Process] = None
        self._usage = ResourceUsage(0, 0, 0, 0, 0)

    async def monitor_process(
        self,
        pid: int,
        on_limit_exceeded: Optional[Callable[[str], None]] = None
    ):
        """Poll a process and enforce resource limits."""
        try:
            self._process = psutil.Process(pid)
            self._monitoring = True
            start_time = asyncio.get_event_loop().time()

            while self._monitoring:
                await asyncio.sleep(self.check_interval)

                try:
                    # Check if process still exists
                    if not self._process.is_running():
                        break

                    # Get resource usage in one batched read
                    with self._process.oneshot():
                        cpu_times = self._process.cpu_times()
                        memory_info = self._process.memory_info()
                        # Note: io_counters() is not available on macOS
                        io_counters = self._process.io_counters()

                    # Update usage tracking
                    self._usage.cpu_time = cpu_times.user + cpu_times.system
                    self._usage.memory_peak_mb = max(
                        self._usage.memory_peak_mb,
                        memory_info.rss / (1024 * 1024)
                    )
                    self._usage.io_read_mb = io_counters.read_bytes / (1024 * 1024)
                    self._usage.io_write_mb = io_counters.write_bytes / (1024 * 1024)
                    self._usage.wall_time = asyncio.get_event_loop().time() - start_time

                    # Check limits
                    if self._usage.cpu_time > self.max_cpu_seconds:
                        self._kill_process("CPU time limit exceeded")
                        if on_limit_exceeded:
                            on_limit_exceeded("cpu")
                        break

                    if self._usage.memory_peak_mb > self.max_memory_mb:
                        self._kill_process("Memory limit exceeded")
                        if on_limit_exceeded:
                            on_limit_exceeded("memory")
                        break

                except (psutil.NoSuchProcess, psutil.AccessDenied):
                    break
        finally:
            self._monitoring = False

    def _kill_process(self, reason: str):
        """Kill the monitored process and its children."""
        if not self._process:
            return

        try:
            # Kill child processes first so they cannot be reparented
            for child in self._process.children(recursive=True):
                try:
                    child.kill()
                except psutil.NoSuchProcess:
                    pass

            # Kill main process
            self._process.kill()
        except psutil.NoSuchProcess:
            pass

    def stop(self):
        """Stop monitoring."""
        self._monitoring = False

    def get_usage(self) -> ResourceUsage:
        """Get current resource usage."""
        return self._usage


class TimeoutManager:
    """
    Manage execution timeouts with graceful shutdown.
    """

    def __init__(
        self,
        soft_timeout: int = 25,
        hard_timeout: int = 30
    ):
        self.soft_timeout = soft_timeout
        self.hard_timeout = hard_timeout

    async def execute_with_timeout(
        self,
        coro,
        on_soft_timeout: Optional[Callable] = None
    ):
        """
        Execute a coroutine with soft and hard timeouts.

        The soft timeout fires a warning callback; the hard timeout
        forcefully cancels. The coroutine is wrapped in a shielded task
        because a bare coroutine cancelled by wait_for cannot be awaited
        a second time.
        """
        task = asyncio.ensure_future(coro)
        try:
            return await asyncio.wait_for(
                asyncio.shield(task), timeout=self.soft_timeout
            )
        except asyncio.TimeoutError:
            # Soft timeout reached; warn and grant the remaining time
            if on_soft_timeout:
                on_soft_timeout()

            try:
                return await asyncio.wait_for(
                    task, timeout=self.hard_timeout - self.soft_timeout
                )
            except asyncio.TimeoutError:
                # Hard timeout reached; wait_for has cancelled the task
                raise TimeoutError(
                    f"Execution exceeded hard timeout of {self.hard_timeout}s"
                )


class ExecutionPool:
    """
    Manage concurrent execution with global resource limits.
    """

    def __init__(
        self,
        max_concurrent: int = 5,
        max_total_memory_mb: int = 2048
    ):
        self.max_concurrent = max_concurrent
        self.max_total_memory_mb = max_total_memory_mb

        self._semaphore = asyncio.Semaphore(max_concurrent)
        self._current_memory = 0
        self._lock = asyncio.Lock()

    async def execute(
        self,
        sandbox: Sandbox,
        command: str,
        memory_estimate_mb: int = 256
    ) -> ExecutionResult:
        """Execute with global resource management."""
        # Wait for a concurrency slot
        async with self._semaphore:
            # Reserve memory from the shared budget
            async with self._lock:
                if self._current_memory + memory_estimate_mb > self.max_total_memory_mb:
                    return ExecutionResult(
                        success=False, exit_code=-1, stdout="", stderr="",
                        execution_time=0,
                        error="Global memory limit would be exceeded"
                    )
                self._current_memory += memory_estimate_mb

            try:
                return await sandbox.execute(command)
            finally:
                async with self._lock:
                    self._current_memory -= memory_estimate_mb
```
Summary
In this section, we built a comprehensive code execution sandbox for our coding agent:
- Sandboxing Strategies: From command allowlists to Docker containers, understanding the security/convenience tradeoffs
- Docker Sandbox: Full container isolation with resource limits, network restrictions, and filesystem mounting
- Subprocess Isolation: Lighter-weight protection using ulimits and process groups for when Docker isn't available
- Unified Framework: A sandbox manager that selects the best available isolation strategy
- Resource Management: Monitoring and limiting CPU, memory, and I/O with soft and hard timeouts
In the next section, we'll add Git integration to our coding agent, enabling it to create branches, commits, and track changes.