Chapter 12

Code Execution Sandbox

Building a Coding Agent

Introduction

A coding agent that can execute code is powerful but dangerous. Without proper sandboxing, an AI agent could accidentally (or through prompt injection) execute malicious commands, consume excessive resources, or damage the host system. In this section, we'll build a robust execution sandbox that enables safe code execution.

Security First: Every command an agent executes should be treated as potentially hostile. Defense in depth—multiple layers of protection—is essential for production systems.

Why Sandboxing Matters

Consider these scenarios where unsandboxed execution could go wrong:

| Scenario | Risk | Consequence |
|----------|------|-------------|
| Infinite loop in generated code | Resource exhaustion | System freeze, denial of service |
| `rm -rf /` in shell command | Data destruction | Complete system wipe |
| Fork bomb | Process exhaustion | System becomes unresponsive |
| Network requests to internal services | Data exfiltration | Sensitive data leaked |
| Reading `/etc/passwd` | Information disclosure | User enumeration |
| Installing malicious packages | Supply chain attack | Persistent compromise |

Attack Vectors

Attacks can come from multiple sources:

  1. Direct prompting: Malicious user requests the agent to execute harmful commands
  2. Prompt injection: Malicious content in files or web pages tricks the agent
  3. Accidental harm: Well-intentioned but buggy generated code causes damage
  4. Dependency attacks: Installed packages contain malicious code

Sandboxing Strategies

Different sandboxing approaches offer varying levels of isolation and convenience:

| Strategy | Isolation Level | Performance | Complexity |
|----------|-----------------|-------------|------------|
| Command allowlist | Low | High | Low |
| Process limits (ulimit) | Medium | High | Low |
| Subprocess with restrictions | Medium | High | Medium |
| Docker containers | High | Medium | Medium |
| Firecracker microVMs | Very High | Medium | High |
| gVisor/Kata containers | Very High | Medium | High |
| Remote execution (cloud) | Complete | Low | High |

Defense in Depth

The best approach combines multiple strategies:

```python
from dataclasses import dataclass
from enum import Enum
from typing import List, Optional, Set


class IsolationLevel(Enum):
    """Levels of execution isolation."""
    NONE = "none"              # Direct execution (development only)
    RESTRICTED = "restricted"  # Allowlist + resource limits
    CONTAINER = "container"    # Docker isolation
    MICROVM = "microvm"        # Firecracker/gVisor
    REMOTE = "remote"          # Cloud sandbox


@dataclass
class SandboxConfig:
    """Configuration for the execution sandbox."""
    isolation_level: IsolationLevel = IsolationLevel.RESTRICTED

    # Command restrictions (None = use defaults from __post_init__)
    allowed_commands: Optional[Set[str]] = None
    blocked_commands: Optional[Set[str]] = None

    # Resource limits
    max_cpu_time: int = 30       # seconds
    max_memory_mb: int = 512     # megabytes
    max_processes: int = 50      # concurrent processes
    max_file_size_mb: int = 100  # megabytes
    max_open_files: int = 100    # file descriptors

    # Network restrictions
    allow_network: bool = False
    allowed_hosts: Optional[List[str]] = None

    # Docker-specific
    docker_image: str = "python:3.11-slim"
    mount_workspace: bool = True
    read_only_workspace: bool = False

    def __post_init__(self):
        if self.allowed_commands is None:
            self.allowed_commands = {
                # Build tools
                "npm", "npx", "yarn", "pnpm",
                "pip", "pip3", "python", "python3",
                "node", "deno", "bun",
                "go", "cargo", "rustc",
                "make", "cmake",

                # Testing
                "pytest", "jest", "vitest", "mocha",
                "go test", "cargo test",

                # Linting
                "eslint", "prettier", "black", "flake8",
                "mypy", "tsc", "rustfmt", "clippy",

                # Version control (read-only by default)
                "git status", "git log", "git diff", "git branch",

                # System (safe subset)
                "ls", "cat", "head", "tail", "grep", "find",
                "wc", "sort", "uniq", "diff",
            }

        if self.blocked_commands is None:
            self.blocked_commands = {
                # Destructive commands
                "rm -rf", "rm -r /", "mkfs", "dd if=",

                # Network tools (potential exfiltration)
                "curl", "wget", "nc", "netcat",

                # System modification
                "chmod 777", "chown", "sudo", "su",

                # Process control
                "kill -9", "killall", "pkill",

                # Dangerous git operations
                "git push", "git remote add",
            }
```

Docker-Based Sandbox

Docker provides excellent isolation for code execution. Here's a complete Docker-based sandbox implementation:

```python
import asyncio
import json
import shlex
import uuid
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Dict, Optional


@dataclass
class ExecutionResult:
    """Result of code execution in sandbox."""
    success: bool
    exit_code: int
    stdout: str
    stderr: str
    execution_time: float
    resource_usage: Optional[Dict[str, Any]] = None
    error: Optional[str] = None


class DockerSandbox:
    """
    Docker-based sandbox for secure code execution.
    """

    DEFAULT_IMAGE = "python:3.11-slim"
    CONTAINER_WORKSPACE = "/workspace"

    def __init__(
        self,
        config: SandboxConfig,
        workspace: Path
    ):
        self.config = config
        self.workspace = Path(workspace).resolve()
        self.container_id: Optional[str] = None

    async def execute(
        self,
        command: str,
        timeout: Optional[int] = None,
        env: Optional[Dict[str, str]] = None
    ) -> ExecutionResult:
        """Execute a command in the Docker sandbox."""
        timeout = timeout or self.config.max_cpu_time

        # Validate command
        validation = self._validate_command(command)
        if not validation["allowed"]:
            return ExecutionResult(
                success=False,
                exit_code=-1,
                stdout="",
                stderr="",
                execution_time=0,
                error=validation["reason"]
            )

        try:
            # Build docker run command
            docker_cmd = self._build_docker_command(command, env)

            # Execute with timeout
            start_time = asyncio.get_event_loop().time()

            process = await asyncio.create_subprocess_shell(
                docker_cmd,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE
            )

            try:
                stdout, stderr = await asyncio.wait_for(
                    process.communicate(),
                    timeout=timeout
                )
            except asyncio.TimeoutError:
                process.kill()
                await process.wait()
                return ExecutionResult(
                    success=False,
                    exit_code=-1,
                    stdout="",
                    stderr="",
                    execution_time=timeout,
                    error=f"Execution timed out after {timeout}s"
                )

            execution_time = asyncio.get_event_loop().time() - start_time

            return ExecutionResult(
                success=process.returncode == 0,
                exit_code=process.returncode,
                stdout=stdout.decode("utf-8", errors="replace"),
                stderr=stderr.decode("utf-8", errors="replace"),
                execution_time=execution_time
            )

        except Exception as e:
            return ExecutionResult(
                success=False,
                exit_code=-1,
                stdout="",
                stderr="",
                execution_time=0,
                error=str(e)
            )

    def _build_docker_command(
        self,
        command: str,
        env: Optional[Dict[str, str]] = None
    ) -> str:
        """Build the docker run command with security options."""
        parts = ["docker", "run", "--rm"]

        # Resource limits
        parts.extend([
            f"--memory={self.config.max_memory_mb}m",
            "--cpus=1",
            f"--pids-limit={self.config.max_processes}",
            "--ulimit", f"nofile={self.config.max_open_files}:{self.config.max_open_files}",
            "--ulimit", f"fsize={self.config.max_file_size_mb * 1024 * 1024}",
        ])

        # Security options
        parts.extend([
            "--security-opt=no-new-privileges",
            "--cap-drop=ALL",
        ])
        if self.config.read_only_workspace:
            parts.append("--read-only")

        # Network isolation
        if not self.config.allow_network:
            parts.append("--network=none")

        # Mount workspace
        if self.config.mount_workspace:
            mount_option = "ro" if self.config.read_only_workspace else "rw"
            parts.extend([
                "-v", f"{self.workspace}:{self.CONTAINER_WORKSPACE}:{mount_option}",
                "-w", self.CONTAINER_WORKSPACE,
            ])

        # Environment variables
        if env:
            for key, value in env.items():
                # Sanitize environment variables
                if self._is_safe_env_var(key, value):
                    parts.extend(["-e", f"{key}={value}"])

        # Image and command; shlex.quote keeps the command from
        # escaping the sh -c argument
        parts.append(self.config.docker_image)
        parts.extend(["sh", "-c", shlex.quote(command)])

        return " ".join(parts)

    def _validate_command(self, command: str) -> Dict[str, Any]:
        """Validate command against allowlist/blocklist."""
        # Check blocklist first
        for blocked in self.config.blocked_commands:
            if blocked in command:
                return {
                    "allowed": False,
                    "reason": f"Command contains blocked pattern: {blocked}"
                }

        # Check if command starts with an allowed command
        cmd_parts = command.split()
        if not cmd_parts:
            return {"allowed": False, "reason": "Empty command"}

        base_cmd = cmd_parts[0]

        # Check direct match
        if base_cmd in self.config.allowed_commands:
            return {"allowed": True, "reason": None}

        # Check multi-word allowlist entries such as "git status"
        for entry in self.config.allowed_commands:
            if " " in entry and command.startswith(entry):
                return {"allowed": True, "reason": None}

        # Check if it's a path to an allowed command
        if "/" in base_cmd:
            base_cmd = base_cmd.split("/")[-1]
            if base_cmd in self.config.allowed_commands:
                return {"allowed": True, "reason": None}

        return {
            "allowed": False,
            "reason": f"Command not in allowlist: {base_cmd}"
        }

    def _is_safe_env_var(self, key: str, value: str) -> bool:
        """Check if environment variable is safe to pass."""
        # Block loader overrides and identity variables
        if key.startswith(("LD_", "DYLD_")) or key in {"PATH", "HOME", "USER"}:
            return False

        # Block env vars that could contain secrets
        secret_patterns = ["KEY", "SECRET", "PASSWORD", "TOKEN", "CREDENTIAL"]
        for pattern in secret_patterns:
            if pattern in key.upper():
                return False

        return True

    async def execute_script(
        self,
        script: str,
        language: str,
        timeout: Optional[int] = None
    ) -> ExecutionResult:
        """Execute a script in the sandbox."""
        # Create temporary script file
        extensions = {
            "python": ".py",
            "javascript": ".js",
            "typescript": ".ts",
            "bash": ".sh",
        }

        ext = extensions.get(language, ".txt")
        script_name = f"script_{uuid.uuid4().hex[:8]}{ext}"
        script_path = self.workspace / script_name

        try:
            # Write script to workspace
            script_path.write_text(script)

            # Build execution command
            commands = {
                "python": f"python {script_name}",
                "javascript": f"node {script_name}",
                "typescript": f"npx ts-node {script_name}",
                "bash": f"bash {script_name}",
            }

            command = commands.get(language, f"./{script_name}")
            result = await self.execute(command, timeout)

            return result

        finally:
            # Clean up script file
            if script_path.exists():
                script_path.unlink()

    async def run_tests(
        self,
        test_command: Optional[str] = None,
        test_path: Optional[str] = None
    ) -> ExecutionResult:
        """Run tests in the sandbox."""
        if test_command:
            return await self.execute(test_command)

        # Auto-detect test framework
        if (self.workspace / "pytest.ini").exists() or \
           (self.workspace / "pyproject.toml").exists():
            cmd = f"pytest {test_path or ''} -v"
        elif (self.workspace / "package.json").exists():
            # Check for test script in package.json
            pkg = json.loads((self.workspace / "package.json").read_text())
            if "test" in pkg.get("scripts", {}):
                cmd = "npm test"
            else:
                cmd = f"npx jest {test_path or ''}"
        else:
            cmd = f"python -m pytest {test_path or ''}"

        return await self.execute(cmd)

    async def install_dependencies(self) -> ExecutionResult:
        """Install project dependencies in the sandbox."""
        # Detect package manager (check lockfiles before package.json,
        # since every Node project has a package.json)
        if (self.workspace / "requirements.txt").exists():
            return await self.execute("pip install -r requirements.txt")
        elif (self.workspace / "pyproject.toml").exists():
            return await self.execute("pip install -e .")
        elif (self.workspace / "package-lock.json").exists():
            return await self.execute("npm ci")
        elif (self.workspace / "yarn.lock").exists():
            return await self.execute("yarn install --frozen-lockfile")
        elif (self.workspace / "package.json").exists():
            return await self.execute("npm install")
        else:
            return ExecutionResult(
                success=True,
                exit_code=0,
                stdout="No dependency file found",
                stderr="",
                execution_time=0
            )
```

For production systems, consider using pre-built Docker images with dependencies already installed. This significantly reduces execution time and eliminates supply chain risks from installing packages at runtime.
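The allowlist logic above can be exercised in isolation. Below is a simplified standalone mirror of `_validate_command` (the `validate_command` helper is written for illustration, not part of the sandbox API); note how blocklist patterns win over the allowlist and how multi-word entries like `git status` are matched by prefix:

```python
from typing import Dict, Set


def validate_command(command: str,
                     allowed: Set[str],
                     blocked: Set[str]) -> Dict[str, object]:
    """Simplified mirror of DockerSandbox._validate_command."""
    # Blocklist is checked first and always wins.
    for pattern in blocked:
        if pattern in command:
            return {"allowed": False, "reason": f"blocked pattern: {pattern}"}

    parts = command.split()
    if not parts:
        return {"allowed": False, "reason": "empty command"}

    # Match either the base command (path-stripped) or a multi-word
    # allowlist entry such as "git status".
    base = parts[0].split("/")[-1]
    if base in allowed or any(
        command.startswith(entry) for entry in allowed if " " in entry
    ):
        return {"allowed": True, "reason": None}

    return {"allowed": False, "reason": f"not in allowlist: {base}"}


allowed = {"pytest", "git status"}
blocked = {"rm -rf", "curl"}
print(validate_command("pytest -v", allowed, blocked)["allowed"])        # True
print(validate_command("git status", allowed, blocked)["allowed"])       # True
print(validate_command("git push origin", allowed, blocked)["allowed"])  # False
print(validate_command("curl http://x", allowed, blocked)["allowed"])    # False
```

Substring checks like these are intentionally coarse: they are the first, cheapest layer of defense, backed by the container's resource and network isolation.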

Subprocess Isolation

When Docker isn't available, subprocess isolation with resource limits provides a lighter-weight alternative:

```python
import asyncio
import os
import resource
import signal
from pathlib import Path
from typing import Dict, Optional


class SubprocessSandbox:
    """
    Lightweight sandbox using subprocess with resource limits.
    Works on Unix systems without Docker.
    """

    def __init__(self, config: SandboxConfig, workspace: Path):
        self.config = config
        self.workspace = Path(workspace).resolve()

    async def execute(
        self,
        command: str,
        timeout: Optional[int] = None,
        env: Optional[Dict[str, str]] = None
    ) -> ExecutionResult:
        """Execute command with resource limits."""
        timeout = timeout or self.config.max_cpu_time

        # Validate command
        if not self._validate_command(command):
            return ExecutionResult(
                success=False,
                exit_code=-1,
                stdout="",
                stderr="",
                execution_time=0,
                error="Command not allowed"
            )

        # Prepare environment
        safe_env = self._prepare_environment(env)

        try:
            start_time = asyncio.get_event_loop().time()

            # Create subprocess with resource limits
            process = await asyncio.create_subprocess_shell(
                command,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE,
                cwd=self.workspace,
                env=safe_env,
                preexec_fn=self._set_resource_limits
            )

            try:
                stdout, stderr = await asyncio.wait_for(
                    process.communicate(),
                    timeout=timeout
                )
            except asyncio.TimeoutError:
                # Kill the whole process group, not just the shell
                try:
                    os.killpg(os.getpgid(process.pid), signal.SIGKILL)
                except (ProcessLookupError, PermissionError):
                    process.kill()
                await process.wait()

                return ExecutionResult(
                    success=False,
                    exit_code=-1,
                    stdout="",
                    stderr="",
                    execution_time=timeout,
                    error=f"Timeout after {timeout}s"
                )

            execution_time = asyncio.get_event_loop().time() - start_time

            return ExecutionResult(
                success=process.returncode == 0,
                exit_code=process.returncode,
                stdout=stdout.decode("utf-8", errors="replace"),
                stderr=stderr.decode("utf-8", errors="replace"),
                execution_time=execution_time
            )

        except Exception as e:
            return ExecutionResult(
                success=False,
                exit_code=-1,
                stdout="",
                stderr="",
                execution_time=0,
                error=str(e)
            )

    def _set_resource_limits(self):
        """Set resource limits for the subprocess (called in child process)."""
        # Create new process group for clean termination
        os.setpgrp()

        # CPU time limit (soft limit, then hard kill 5s later)
        resource.setrlimit(
            resource.RLIMIT_CPU,
            (self.config.max_cpu_time, self.config.max_cpu_time + 5)
        )

        # Memory limit
        memory_bytes = self.config.max_memory_mb * 1024 * 1024
        resource.setrlimit(
            resource.RLIMIT_AS,
            (memory_bytes, memory_bytes)
        )

        # Process limit (note: RLIMIT_NPROC counts all processes owned
        # by this user, not just children of the sandbox)
        resource.setrlimit(
            resource.RLIMIT_NPROC,
            (self.config.max_processes, self.config.max_processes)
        )

        # File descriptor limit
        resource.setrlimit(
            resource.RLIMIT_NOFILE,
            (self.config.max_open_files, self.config.max_open_files)
        )

        # File size limit
        file_size_bytes = self.config.max_file_size_mb * 1024 * 1024
        resource.setrlimit(
            resource.RLIMIT_FSIZE,
            (file_size_bytes, file_size_bytes)
        )

    def _prepare_environment(
        self, env: Optional[Dict[str, str]] = None
    ) -> Dict[str, str]:
        """Prepare a safe environment for execution."""
        # Start with minimal environment
        safe_env = {
            "PATH": "/usr/local/bin:/usr/bin:/bin",
            "HOME": str(self.workspace),
            "LANG": "C.UTF-8",
            "TERM": "xterm",
        }

        # Add project-specific paths
        if (self.workspace / "node_modules" / ".bin").exists():
            safe_env["PATH"] = f"{self.workspace}/node_modules/.bin:" + safe_env["PATH"]

        if (self.workspace / ".venv" / "bin").exists():
            safe_env["PATH"] = f"{self.workspace}/.venv/bin:" + safe_env["PATH"]
            safe_env["VIRTUAL_ENV"] = str(self.workspace / ".venv")

        # Add user-provided env vars (filtered)
        if env:
            for key, value in env.items():
                if self._is_safe_env_var(key):
                    safe_env[key] = value

        return safe_env

    def _validate_command(self, command: str) -> bool:
        """Validate command against security rules."""
        # Check blocklist
        for blocked in self.config.blocked_commands:
            if blocked in command:
                return False

        # Block command substitution and embedded newlines outright.
        # Chaining (&&, ||, ;) and redirection (>, <, |) are allowed for
        # now; a stricter policy could reject those too. In production,
        # use proper shell parsing rather than substring checks.
        for pattern in ("$(", "`", "\n"):
            if pattern in command and not self._is_in_quotes(command, pattern):
                return False

        return True

    def _is_in_quotes(self, text: str, pattern: str) -> bool:
        """Check if pattern is inside quotes (simplified)."""
        idx = text.find(pattern)
        if idx == -1:
            return False

        before = text[:idx]
        single_quotes = before.count("'") % 2
        double_quotes = before.count('"') % 2

        return single_quotes == 1 or double_quotes == 1

    def _is_safe_env_var(self, key: str) -> bool:
        """Check if environment variable key is safe."""
        dangerous = {"LD_PRELOAD", "LD_LIBRARY_PATH", "DYLD_INSERT_LIBRARIES"}
        return key not in dangerous and not key.startswith("LD_")
```

Subprocess isolation is less secure than container isolation. It protects against resource exhaustion but doesn't provide filesystem or network isolation. Use Docker or VMs for untrusted code.
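The minimal-environment idea behind `_prepare_environment` can be sketched standalone (the `prepare_env` helper here is illustrative, not part of the sandbox API): start from a tiny known-safe dictionary and filter out loader-override variables, rather than inheriting `os.environ`:

```python
from typing import Dict, Optional

# Loader-override variables that could hijack execution in the child.
DANGEROUS = {"LD_PRELOAD", "LD_LIBRARY_PATH", "DYLD_INSERT_LIBRARIES"}


def prepare_env(workspace: str,
                extra: Optional[Dict[str, str]] = None) -> Dict[str, str]:
    """Build a minimal environment instead of inheriting os.environ."""
    env = {
        "PATH": "/usr/local/bin:/usr/bin:/bin",
        "HOME": workspace,  # keeps tools from writing outside the sandbox
        "LANG": "C.UTF-8",
    }
    for key, value in (extra or {}).items():
        if key in DANGEROUS or key.startswith("LD_"):
            continue  # silently drop dangerous overrides
        env[key] = value
    return env


env = prepare_env("/tmp/ws", {"DEBUG": "1", "LD_PRELOAD": "/evil.so"})
print("LD_PRELOAD" in env)  # False
print(env["DEBUG"])         # 1
print(env["HOME"])          # /tmp/ws
```

Starting from an empty dictionary and adding known-safe entries (default-deny) is safer than starting from `os.environ` and removing known-bad ones, because secrets you forgot to blocklist never leak into the child process.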

Command Execution Framework

A unified framework allows the agent to use the best available sandbox:

```python
import asyncio
import os
import re
import tempfile
from abc import ABC, abstractmethod
from pathlib import Path
from typing import Any, Dict, Optional


class Sandbox(ABC):
    """Abstract base class for sandboxes."""

    @abstractmethod
    async def execute(
        self,
        command: str,
        timeout: Optional[int] = None,
        env: Optional[Dict[str, str]] = None
    ) -> ExecutionResult:
        pass

    @abstractmethod
    async def execute_script(
        self,
        script: str,
        language: str,
        timeout: Optional[int] = None
    ) -> ExecutionResult:
        pass


class SandboxManager:
    """
    Manages sandbox selection and lifecycle.
    """

    def __init__(
        self,
        workspace: Path,
        config: Optional[SandboxConfig] = None
    ):
        self.workspace = Path(workspace).resolve()
        self.config = config or SandboxConfig()
        self._sandbox: Optional[Sandbox] = None

    async def initialize(self):
        """Initialize the appropriate sandbox based on config and availability."""
        if self.config.isolation_level == IsolationLevel.CONTAINER:
            if await self._docker_available():
                self._sandbox = DockerSandbox(self.config, self.workspace)
            else:
                # Fallback to subprocess
                print("Docker not available, falling back to subprocess isolation")
                self._sandbox = SubprocessSandbox(self.config, self.workspace)

        elif self.config.isolation_level == IsolationLevel.RESTRICTED:
            self._sandbox = SubprocessSandbox(self.config, self.workspace)

        elif self.config.isolation_level == IsolationLevel.NONE:
            self._sandbox = DirectSandbox(self.config, self.workspace)

        else:
            raise ValueError(f"Unsupported isolation level: {self.config.isolation_level}")

    async def _docker_available(self) -> bool:
        """Check if Docker is available."""
        try:
            process = await asyncio.create_subprocess_shell(
                "docker info",
                stdout=asyncio.subprocess.DEVNULL,
                stderr=asyncio.subprocess.DEVNULL
            )
            await process.wait()
            return process.returncode == 0
        except OSError:
            return False

    async def execute(
        self,
        command: str,
        timeout: Optional[int] = None,
        env: Optional[Dict[str, str]] = None
    ) -> ExecutionResult:
        """Execute a command in the sandbox."""
        if not self._sandbox:
            await self.initialize()

        return await self._sandbox.execute(command, timeout, env)

    async def execute_script(
        self,
        script: str,
        language: str,
        timeout: Optional[int] = None
    ) -> ExecutionResult:
        """Execute a script in the sandbox."""
        if not self._sandbox:
            await self.initialize()

        return await self._sandbox.execute_script(script, language, timeout)

    async def run_tests(
        self,
        test_command: Optional[str] = None,
        test_path: Optional[str] = None
    ) -> Dict[str, Any]:
        """Run tests and parse results."""
        if not self._sandbox:
            await self.initialize()

        if hasattr(self._sandbox, "run_tests"):
            result = await self._sandbox.run_tests(test_command, test_path)
        else:
            result = await self._sandbox.execute(test_command or "pytest -v")

        # Parse test results
        return self._parse_test_results(result)

    def _parse_test_results(self, result: ExecutionResult) -> Dict[str, Any]:
        """Parse test output into structured results."""
        output = result.stdout + result.stderr

        # Try to parse pytest output
        pytest_patterns = {
            "passed": r"(\d+) passed",
            "failed": r"(\d+) failed",
            "error": r"(\d+) error",
            "skipped": r"(\d+) skipped",
        }

        parsed = {"raw_output": output, "success": result.success}

        for key, pattern in pytest_patterns.items():
            match = re.search(pattern, output)
            parsed[key] = int(match.group(1)) if match else 0

        # Extract failure details
        if not result.success:
            failure_match = re.search(
                r"FAILED (.+?) - (.+?)(?:\n|$)",
                output,
                re.MULTILINE
            )
            if failure_match:
                parsed["first_failure"] = {
                    "test": failure_match.group(1),
                    "reason": failure_match.group(2)
                }

        return parsed


class DirectSandbox(Sandbox):
    """
    Direct execution without sandboxing.
    FOR DEVELOPMENT ONLY - never use in production!
    """

    def __init__(self, config: SandboxConfig, workspace: Path):
        self.config = config
        self.workspace = workspace

    async def execute(
        self,
        command: str,
        timeout: Optional[int] = None,
        env: Optional[Dict[str, str]] = None
    ) -> ExecutionResult:
        """Execute command directly (unsafe!)."""
        timeout = timeout or 30
        start_time = asyncio.get_event_loop().time()

        process = await asyncio.create_subprocess_shell(
            command,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
            cwd=self.workspace,
            env={**os.environ, **(env or {})}
        )

        try:
            stdout, stderr = await asyncio.wait_for(
                process.communicate(),
                timeout=timeout
            )
        except asyncio.TimeoutError:
            process.kill()
            await process.wait()
            return ExecutionResult(
                success=False,
                exit_code=-1,
                stdout="",
                stderr="",
                execution_time=timeout,
                error="Timeout"
            )

        return ExecutionResult(
            success=process.returncode == 0,
            exit_code=process.returncode,
            stdout=stdout.decode("utf-8", errors="replace"),
            stderr=stderr.decode("utf-8", errors="replace"),
            execution_time=asyncio.get_event_loop().time() - start_time
        )

    async def execute_script(
        self,
        script: str,
        language: str,
        timeout: Optional[int] = None
    ) -> ExecutionResult:
        """Execute script directly (unsafe!)."""
        # Write to temp file and execute
        suffix = {
            "python": ".py",
            "javascript": ".js",
            "bash": ".sh"
        }.get(language, ".txt")

        with tempfile.NamedTemporaryFile(
            suffix=suffix,
            delete=False,
            mode="w"
        ) as f:
            f.write(script)
            script_path = f.name

        try:
            interpreters = {
                "python": f"python {script_path}",
                "javascript": f"node {script_path}",
                "bash": f"bash {script_path}"
            }
            command = interpreters.get(language, script_path)
            return await self.execute(command, timeout)
        finally:
            os.unlink(script_path)
```

The SandboxManager automatically selects the best available sandbox. In production, ensure Docker is always available for maximum isolation.
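The regexes in `_parse_test_results` can be tried against a sample pytest summary (a standalone sketch; real pytest output varies between versions and plugins, so treat these patterns as a starting point):

```python
import re

# Hypothetical pytest output for illustration.
SAMPLE = """\
FAILED tests/test_api.py::test_login - AssertionError: expected 200
=========== 1 failed, 4 passed, 2 skipped in 0.31s ===========
"""

patterns = {
    "passed": r"(\d+) passed",
    "failed": r"(\d+) failed",
    "skipped": r"(\d+) skipped",
}

parsed = {}
for key, pattern in patterns.items():
    match = re.search(pattern, SAMPLE)
    parsed[key] = int(match.group(1)) if match else 0

# Pull the first failing test and its reason from a FAILED line.
failure = re.search(r"FAILED (.+?) - (.+?)(?:\n|$)", SAMPLE)
print(parsed)            # {'passed': 4, 'failed': 1, 'skipped': 2}
print(failure.group(1))  # tests/test_api.py::test_login
```

Structured counts like these let the agent decide its next action (retry, fix, or report) without re-reading raw test output.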

Resource Limits and Timeouts

Proper resource management prevents runaway processes from affecting system stability:

🐍python
1from dataclasses import dataclass
2from typing import Optional, Callable
3import asyncio
4import psutil
5import os
6
7
8@dataclass
9class ResourceUsage:
10    """Track resource usage during execution."""
11    cpu_time: float        # CPU seconds used
12    memory_peak_mb: float  # Peak memory usage
13    io_read_mb: float      # Bytes read
14    io_write_mb: float     # Bytes written
15    wall_time: float       # Wall clock time
16
17
18class ResourceMonitor:
19    """
20    Monitor and limit resource usage during execution.
21    """
22
23    def __init__(
24        self,
25        max_cpu_seconds: int = 30,
26        max_memory_mb: int = 512,
27        check_interval: float = 0.5
28    ):
29        self.max_cpu_seconds = max_cpu_seconds
30        self.max_memory_mb = max_memory_mb
31        self.check_interval = check_interval
32
33        self._monitoring = False
34        self._process: Optional[psutil.Process] = None
35        self._usage = ResourceUsage(0, 0, 0, 0, 0)
36
37    async def monitor_process(
38        self,
39        pid: int,
40        on_limit_exceeded: Optional[Callable[[str], None]] = None
41    ):
42        """Monitor a process for resource limits."""
43        try:
44            self._process = psutil.Process(pid)
45            self._monitoring = True
46            start_time = asyncio.get_event_loop().time()
47
48            while self._monitoring:
49                await asyncio.sleep(self.check_interval)
50
51                try:
52                    # Check if process still exists
53                    if not self._process.is_running():
54                        break
55
56                    # Get resource usage
57                    with self._process.oneshot():
58                        cpu_times = self._process.cpu_times()
59                        memory_info = self._process.memory_info()
60                        io_counters = self._process.io_counters()
61
62                    # Update usage tracking
63                    self._usage.cpu_time = cpu_times.user + cpu_times.system
64                    self._usage.memory_peak_mb = max(
65                        self._usage.memory_peak_mb,
66                        memory_info.rss / (1024 * 1024)
67                    )
68                    self._usage.io_read_mb = io_counters.read_bytes / (1024 * 1024)
69                    self._usage.io_write_mb = io_counters.write_bytes / (1024 * 1024)
70                    self._usage.wall_time = asyncio.get_event_loop().time() - start_time
71
72                    # Check limits
73                    if self._usage.cpu_time > self.max_cpu_seconds:
74                        self._kill_process("CPU time limit exceeded")
75                        if on_limit_exceeded:
76                            on_limit_exceeded("cpu")
77                        break
78
79                    if self._usage.memory_peak_mb > self.max_memory_mb:
80                        self._kill_process("Memory limit exceeded")
81                        if on_limit_exceeded:
82                            on_limit_exceeded("memory")
83                        break
84
85                except psutil.NoSuchProcess:
86                    break
87                except psutil.AccessDenied:
88                    break
89
90        except Exception:
91            pass  # Monitoring must never crash the caller
92        finally:
93            self._monitoring = False
94
95    def _kill_process(self, reason: str):
96        """Kill the monitored process and its children."""
97        if not self._process:
98            return
99
100        try:
101            # Kill child processes first
102            children = self._process.children(recursive=True)
103            for child in children:
104                try:
105                    child.kill()
106                except psutil.Error:
107                    pass
108
109            # Kill main process
110            self._process.kill()
111        except psutil.Error:
112            pass
113
114    def stop(self):
115        """Stop monitoring."""
116        self._monitoring = False
117
118    def get_usage(self) -> ResourceUsage:
119        """Get current resource usage."""
120        return self._usage
121
122
123class TimeoutManager:
124    """
125    Manage execution timeouts with graceful shutdown.
126    """
127
128    def __init__(
129        self,
130        soft_timeout: int = 25,
131        hard_timeout: int = 30
132    ):
133        self.soft_timeout = soft_timeout
134        self.hard_timeout = hard_timeout
135
136    async def execute_with_timeout(
137        self,
138        coro,
139        on_soft_timeout: Optional[Callable] = None
140    ):
141        """
142        Execute coroutine with soft and hard timeouts.
143
144        Soft timeout sends a warning/signal.
145        Hard timeout forcefully terminates.
146        """
147        # Run as a task so the coroutine survives the soft timeout;
148        # shield() absorbs the cancellation and the task keeps running
149        task = asyncio.ensure_future(coro)
150        try:
151            return await asyncio.wait_for(asyncio.shield(task), self.soft_timeout)
152        except asyncio.TimeoutError:
153            # Soft timeout reached
154            if on_soft_timeout:
155                on_soft_timeout()
156
157            # Give a little more time (hard - soft)
158            remaining = self.hard_timeout - self.soft_timeout
159            try:
160                return await asyncio.wait_for(task, timeout=remaining)
161            except asyncio.TimeoutError:
162                # Hard timeout reached
163                raise TimeoutError(
164                    f"Execution exceeded hard timeout of {self.hard_timeout}s"
165                )
166
167
168class ExecutionPool:
169    """
170    Manage concurrent execution with global resource limits.
171    """
172
173    def __init__(
174        self,
175        max_concurrent: int = 5,
176        max_total_memory_mb: int = 2048
177    ):
178        self.max_concurrent = max_concurrent
179        self.max_total_memory_mb = max_total_memory_mb
180
181        self._semaphore = asyncio.Semaphore(max_concurrent)
182        self._current_memory = 0
183        self._lock = asyncio.Lock()
184
185    async def execute(
186        self,
187        sandbox: Sandbox,
188        command: str,
189        memory_estimate_mb: int = 256
190    ) -> ExecutionResult:
191        """Execute with global resource management."""
192        # Wait for slot
193        async with self._semaphore:
194            # Check memory budget
195            async with self._lock:
196                if self._current_memory + memory_estimate_mb > self.max_total_memory_mb:
197                    return ExecutionResult(
198                        success=False,
199                        exit_code=-1,
200                        stdout="",
201                        stderr="",
202                        execution_time=0,
203                        error="Global memory limit would be exceeded"
204                    )
205                self._current_memory += memory_estimate_mb
206
207            try:
208                result = await sandbox.execute(command)
209                return result
210            finally:
211                async with self._lock:
212                    self._current_memory -= memory_estimate_mb
Use soft timeouts to give processes a chance to clean up gracefully before hard termination. This prevents resource leaks and incomplete operations.
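The two-stage timeout can be demonstrated in isolation. A sketch assuming nothing beyond asyncio: wrapping the coroutine in a task and shielding it lets the work continue through the soft timeout, so a 0.3 s task survives a 0.1 s soft limit and completes inside the grace period (the durations are illustrative):

```python
import asyncio


async def slow_task() -> str:
    # Stand-in for sandboxed work; finishes within the grace period
    await asyncio.sleep(0.3)
    return "done"


async def run_with_grace(soft: float, hard: float) -> str:
    # Wrap in a task so the soft timeout's cancellation doesn't kill it
    task = asyncio.ensure_future(slow_task())
    try:
        # shield() absorbs the cancellation; the task itself keeps running
        return await asyncio.wait_for(asyncio.shield(task), timeout=soft)
    except asyncio.TimeoutError:
        print("soft timeout reached - entering grace period")
        try:
            return await asyncio.wait_for(task, timeout=hard - soft)
        except asyncio.TimeoutError:
            raise TimeoutError(f"exceeded hard timeout of {hard}s")


outcome = asyncio.run(run_with_grace(soft=0.1, hard=1.0))
print(outcome)  # done
```

Without the task/shield wrapper, the second `wait_for` would re-await an already-cancelled coroutine and raise `RuntimeError` instead of granting the grace period.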

Summary

In this section, we built a comprehensive code execution sandbox for our coding agent:

  • Sandboxing Strategies: From command allowlists to Docker containers, understanding the security/convenience tradeoffs
  • Docker Sandbox: Full container isolation with resource limits, network restrictions, and filesystem mounting
  • Subprocess Isolation: Lighter-weight protection using ulimits and process groups for when Docker isn't available
  • Unified Framework: A sandbox manager that selects the best available isolation strategy
  • Resource Management: Monitoring and limiting CPU, memory, and I/O with soft and hard timeouts
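As a standalone illustration of the ulimit-style subprocess isolation recapped above, a sketch using the POSIX `resource` module to cap a child process before it runs; the specific limits are illustrative, and `preexec_fn` is POSIX-only:

```python
import resource
import subprocess
import sys


def limit_resources():
    # Runs in the child between fork() and exec(): cap CPU time and memory.
    # (soft, hard) limits shown here are illustrative; tune per workload.
    resource.setrlimit(resource.RLIMIT_CPU, (5, 5))  # 5 CPU-seconds
    resource.setrlimit(resource.RLIMIT_AS, (2 * 2**30, 2 * 2**30))  # 2 GB address space


proc = subprocess.run(
    [sys.executable, "-c", "print('ok')"],
    capture_output=True,
    text=True,
    preexec_fn=limit_resources,  # POSIX-only; applied in the child
    timeout=10,  # wall-clock bound, enforced by the parent
)
print(proc.stdout.strip())
```

A child that exceeds the CPU limit is killed by the kernel (SIGXCPU/SIGKILL), independent of any monitoring loop in the parent, which is what makes this layer useful even when Docker is unavailable.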

In the next section, we'll add Git integration to our coding agent, enabling it to create branches, commits, and track changes.