Chapter 12

Coding Agent Architecture

Building a Coding Agent

Introduction

Coding agents represent one of the most powerful applications of agentic AI. Unlike simple code completion or chat assistants, coding agents can autonomously navigate codebases, understand context, write code, execute tests, debug failures, and iterate until tasks are complete. In this chapter, we'll build a complete coding agent from scratch, starting with its architecture.

Chapter Goals: By the end of this chapter, you'll have built a fully functional coding agent capable of reading files, writing code, executing commands, managing git operations, running tests, and debugging failures automatically.

What Makes Coding Agents Different

Coding agents have unique requirements that distinguish them from general-purpose AI agents. Understanding these differences is crucial for designing an effective architecture.

Key Characteristics

| Characteristic | General Agent | Coding Agent |
| --- | --- | --- |
| Environment | Web, APIs, databases | File system, terminal, git |
| State | Often stateless | Highly stateful (codebase) |
| Verification | Subjective evaluation | Objective (tests pass/fail) |
| Iteration | Single response | Multiple edit-test cycles |
| Context | Conversation history | Entire codebase understanding |
| Safety | API rate limits | Sandbox execution, file protection |

The Coding Agent Challenge

A coding agent must solve several interconnected problems:

  1. Context Gathering: Understanding what code already exists, how it's structured, and what conventions are followed
  2. Task Understanding: Translating natural language requests into specific code changes
  3. Code Generation: Writing code that integrates correctly with the existing codebase
  4. Verification: Running tests and checking that changes work correctly
  5. Iteration: Debugging failures and refining the solution until it works
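
The way these five problems feed into each other can be sketched as a bounded edit-test loop. This is a minimal illustration, not the agent built in this chapter: `edit_test_loop`, `fake_propose`, and `fake_test` are hypothetical names, and the two callables stand in for the LLM and the test runner.

```python
from typing import Callable, List, Tuple


def edit_test_loop(
    propose_change: Callable[[str, List[str]], str],
    apply_and_test: Callable[[str], Tuple[bool, str]],
    task: str,
    max_attempts: int = 3,
) -> Tuple[bool, List[str]]:
    """Propose a change, test it, and retry with the failure logs as extra context."""
    failures: List[str] = []
    for _ in range(max_attempts):
        # Task understanding + code generation (failure logs act as context)
        change = propose_change(task, failures)
        # Verification: apply the change and run the tests
        passed, log = apply_and_test(change)
        if passed:
            return True, failures
        # Iteration: remember why it failed so the next proposal can differ
        failures.append(log)
    return False, failures


# Toy stand-ins (assumptions for the demo, not real LLM or test-runner calls)
def fake_propose(task: str, failures: List[str]) -> str:
    return "fixed" if failures else "buggy"


def fake_test(change: str) -> Tuple[bool, str]:
    return change == "fixed", f"test failed for {change!r}"


ok, history = edit_test_loop(fake_propose, fake_test, "add a parser")
print(ok, history)
```

The `max_attempts` cap is a design choice worth keeping in any real loop: without it, an agent that cannot fix a failing test would iterate forever.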

Core Architecture Components

A well-designed coding agent consists of several distinct layers, each with specific responsibilities. Here's the high-level architecture:

🐍python
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import List, Dict, Any, Optional
from enum import Enum
import json


class AgentState(Enum):
    """Current state of the coding agent."""
    IDLE = "idle"
    ANALYZING = "analyzing"
    PLANNING = "planning"
    CODING = "coding"
    TESTING = "testing"
    DEBUGGING = "debugging"
    COMPLETE = "complete"
    ERROR = "error"


@dataclass
class CodebaseContext:
    """Represents understanding of the current codebase."""
    root_path: str
    file_tree: Dict[str, Any] = field(default_factory=dict)
    open_files: Dict[str, str] = field(default_factory=dict)
    git_status: Optional[Dict[str, Any]] = None
    conventions: Dict[str, str] = field(default_factory=dict)
    dependencies: List[str] = field(default_factory=list)

    def summary(self) -> str:
        """Generate a summary for the LLM context."""
        files_count = self._count_files(self.file_tree)
        return f"""Codebase: {self.root_path}
Files: {files_count}
Open files: {list(self.open_files.keys())}
Git status: {'Clean' if not self.git_status else 'Modified'}
Dependencies: {len(self.dependencies)} packages"""

    def _count_files(self, tree: Dict) -> int:
        # Nested dicts are directories; every other value is a file
        count = 0
        for key, value in tree.items():
            if isinstance(value, dict):
                count += self._count_files(value)
            else:
                count += 1
        return count


@dataclass
class Task:
    """A coding task to be executed."""
    description: str
    task_type: str  # "feature", "bugfix", "refactor", "test"
    target_files: List[str] = field(default_factory=list)
    dependencies: List[str] = field(default_factory=list)
    acceptance_criteria: List[str] = field(default_factory=list)
    status: str = "pending"


@dataclass
class AgentMemory:
    """Memory system for the coding agent."""
    conversation_history: List[Dict[str, str]] = field(default_factory=list)
    executed_actions: List[Dict[str, Any]] = field(default_factory=list)
    test_results: List[Dict[str, Any]] = field(default_factory=list)
    errors_encountered: List[str] = field(default_factory=list)
    successful_patterns: List[str] = field(default_factory=list)

    def add_action(self, action: str, result: Any, success: bool):
        self.executed_actions.append({
            "action": action,
            "result": str(result)[:500],  # Truncate long results
            "success": success
        })

    def get_recent_context(self, n: int = 10) -> str:
        """Get recent actions for context."""
        recent = self.executed_actions[-n:]
        return "\n".join([
            f"- {a['action']}: {'✓' if a['success'] else '✗'}"
            for a in recent
        ])


class CodingAgent:
    """
    Main coding agent architecture.

    Components:
    - Context Layer: Understanding the codebase
    - Planning Layer: Breaking down tasks
    - Action Layer: Executing changes
    - Verification Layer: Testing and validation
    - Memory Layer: Learning from actions
    """

    def __init__(
        self,
        llm_client,
        workspace_path: str,
        tools: Dict[str, Any],
        config: Optional[Dict[str, Any]] = None
    ):
        self.llm = llm_client
        self.workspace = workspace_path
        # Maps an action name to an object exposing an async run(...) method
        self.tools = tools
        self.config = config or {}

        # Initialize components
        self.context = CodebaseContext(root_path=workspace_path)
        self.memory = AgentMemory()
        self.state = AgentState.IDLE
        self.current_task: Optional[Task] = None

    async def run(self, user_request: str) -> Dict[str, Any]:
        """Main agent loop for processing a request."""
        self.state = AgentState.ANALYZING

        try:
            # Phase 1: Analyze and understand
            await self._gather_context()

            # Phase 2: Plan the approach
            self.state = AgentState.PLANNING
            plan = await self._create_plan(user_request)

            # Phase 3: Execute the plan
            self.state = AgentState.CODING
            results = await self._execute_plan(plan)

            # Phase 4: Verify the changes
            self.state = AgentState.TESTING
            test_results = await self._run_verification()

            # Phase 5: Debug if needed
            if not test_results["success"]:
                self.state = AgentState.DEBUGGING
                results = await self._debug_and_fix(test_results)

            self.state = AgentState.COMPLETE
            return {
                "success": True,
                "results": results,
                "test_results": test_results
            }

        except Exception as e:
            self.state = AgentState.ERROR
            self.memory.errors_encountered.append(str(e))
            return {"success": False, "error": str(e)}

    async def _gather_context(self):
        """Gather information about the codebase."""
        # Build file tree
        self.context.file_tree = await self.tools["list_files"].run(
            self.workspace
        )

        # Get git status
        if await self._has_git():
            self.context.git_status = await self.tools["git_status"].run()

        # Detect conventions (from config files, README, etc.)
        self.context.conventions = await self._detect_conventions()

    async def _create_plan(self, request: str) -> List[Dict[str, Any]]:
        """Create an execution plan for the request."""
        prompt = f"""You are a coding agent planning how to complete a task.

Codebase Context:
{self.context.summary()}

User Request: {request}

Recent Actions:
{self.memory.get_recent_context()}

Create a step-by-step plan. For each step, specify:
1. action: The type of action (read_file, write_file, run_command, etc.)
2. target: The file or command target
3. description: What this step accomplishes
4. dependencies: Which previous steps must complete first
5. params: Keyword arguments to pass to the action (may be empty)

Return as JSON array."""

        response = await self.llm.generate(prompt)
        return self._parse_plan(response)

    async def _execute_plan(self, plan: List[Dict[str, Any]]) -> List[Any]:
        """Execute each step of the plan."""
        results = []

        for step in plan:
            action = step["action"]
            tool = self.tools.get(action)

            if not tool:
                # Record the miss and move on to the next step
                self.memory.add_action(action, "Tool not found", False)
                continue

            try:
                result = await tool.run(**step.get("params", {}))
                self.memory.add_action(action, result, True)
                results.append({"step": step, "result": result, "success": True})
            except Exception as e:
                self.memory.add_action(action, str(e), False)
                results.append({"step": step, "error": str(e), "success": False})

        return results

    async def _run_verification(self) -> Dict[str, Any]:
        """Run tests to verify changes."""
        # Implementation in later sections. Until then, return a well-formed
        # placeholder so run() does not fail on a None result.
        return {"success": True, "skipped": True}

    async def _debug_and_fix(self, test_results: Dict) -> Dict[str, Any]:
        """Debug failures and attempt fixes."""
        # Implementation in later sections
        raise NotImplementedError("Debugging is implemented in later sections")

    async def _has_git(self) -> bool:
        """Check if workspace is a git repository."""
        try:
            await self.tools["run_command"].run("git status")
            return True
        except Exception:
            return False

    async def _detect_conventions(self) -> Dict[str, str]:
        """Detect coding conventions from the codebase."""
        conventions = {}

        # Check for common config files
        config_files = [
            (".eslintrc", "eslint"),
            ("pyproject.toml", "python"),
            ("tsconfig.json", "typescript"),
            (".prettierrc", "prettier"),
        ]

        for filename, convention_type in config_files:
            if await self._file_exists(filename):
                conventions[convention_type] = await self.tools["read_file"].run(
                    f"{self.workspace}/{filename}"
                )

        return conventions

    async def _file_exists(self, path: str) -> bool:
        """Check if a file exists (by attempting to read it)."""
        try:
            await self.tools["read_file"].run(f"{self.workspace}/{path}")
            return True
        except Exception:
            return False

    def _parse_plan(self, response: str) -> List[Dict[str, Any]]:
        """Parse LLM response into structured plan."""
        try:
            # Extract the outermost JSON array from the response
            start = response.find("[")
            end = response.rfind("]") + 1
            return json.loads(response[start:end])
        except ValueError:
            # json.JSONDecodeError is a ValueError; treat bad output as no plan
            return []
The architecture separates concerns cleanly: context gathering, planning, execution, and verification are distinct phases. This makes the agent easier to debug and extend.
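
Because planning is its own phase, the plan can be checked before anything is executed. The sketch below is one possible hardening of the plan-parsing step, not part of the chapter's agent: `parse_plan`, `REQUIRED_KEYS`, and the sample reply are hypothetical, and it assumes each step is a JSON object with at least `action` and `description`.

```python
import json
from typing import Any, Dict, List, Set

# Assumed minimum set of keys a usable step must carry
REQUIRED_KEYS = {"action", "description"}


def parse_plan(response: str, known_actions: Set[str]) -> List[Dict[str, Any]]:
    """Extract a JSON array from an LLM reply and keep only well-formed steps."""
    start = response.find("[")
    end = response.rfind("]") + 1
    if start == -1 or end <= start:
        return []
    try:
        raw = json.loads(response[start:end])
    except json.JSONDecodeError:
        return []
    if not isinstance(raw, list):
        return []

    steps: List[Dict[str, Any]] = []
    for step in raw:
        if not isinstance(step, dict):
            continue  # Skip entries that are not JSON objects
        if not REQUIRED_KEYS.issubset(step):
            continue  # Skip steps missing required keys
        if step["action"] not in known_actions:
            continue  # Skip actions the agent has no tool for
        step.setdefault("params", {})  # Executors can rely on params existing
        steps.append(step)
    return steps


reply = (
    "Here is the plan:\n"
    '[{"action": "read_file", "description": "Inspect app", '
    '"params": {"path": "app.py"}}, '
    '{"action": "launch_rocket", "description": "not a real tool"}]'
)
plan = parse_plan(reply, {"read_file", "write_file", "run_command"})
print(plan)
```

Dropping unknown actions at parse time keeps the execution phase simple: it only ever sees steps it has a tool for.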

The Code Understanding Layer

Before a coding agent can make changes, it must understand the codebase. The code understanding layer handles parsing, indexing, and semantic analysis.

🐍python
import ast
import os
from pathlib import Path
from dataclasses import dataclass, field
from typing import Any, List, Dict, Set, Optional
import re


@dataclass
class Symbol:
    """Represents a code symbol (function, class, variable)."""
    name: str
    symbol_type: str  # "function", "class", "variable", "import"
    file_path: str
    line_number: int
    signature: Optional[str] = None
    docstring: Optional[str] = None
    references: List[str] = field(default_factory=list)


@dataclass
class FileAnalysis:
    """Analysis results for a single file."""
    path: str
    language: str
    symbols: List[Symbol] = field(default_factory=list)
    imports: List[str] = field(default_factory=list)
    dependencies: Set[str] = field(default_factory=set)
    complexity_score: int = 0


class CodeUnderstandingLayer:
    """
    Analyzes and indexes the codebase for efficient querying.
    """

    LANGUAGE_EXTENSIONS = {
        ".py": "python",
        ".js": "javascript",
        ".ts": "typescript",
        ".tsx": "typescript",
        ".jsx": "javascript",
        ".java": "java",
        ".go": "go",
        ".rs": "rust",
    }

    def __init__(self, workspace_path: str):
        self.workspace = Path(workspace_path)
        self.file_index: Dict[str, FileAnalysis] = {}
        self.symbol_index: Dict[str, List[Symbol]] = {}
        self.dependency_graph: Dict[str, Set[str]] = {}

    async def index_codebase(self) -> Dict[str, Any]:
        """Build a complete index of the codebase."""
        stats = {"files": 0, "symbols": 0, "errors": 0}

        for file_path in self._iter_source_files():
            try:
                analysis = await self._analyze_file(file_path)
                self.file_index[str(file_path)] = analysis

                # Index symbols for quick lookup
                for symbol in analysis.symbols:
                    if symbol.name not in self.symbol_index:
                        self.symbol_index[symbol.name] = []
                    self.symbol_index[symbol.name].append(symbol)

                stats["files"] += 1
                stats["symbols"] += len(analysis.symbols)

            except Exception:
                # Unreadable or undecodable files are counted, not fatal
                stats["errors"] += 1

        # Build dependency graph
        self._build_dependency_graph()

        return stats

    async def _analyze_file(self, file_path: Path) -> FileAnalysis:
        """Analyze a single source file."""
        content = file_path.read_text()
        language = self.LANGUAGE_EXTENSIONS.get(file_path.suffix, "unknown")

        analysis = FileAnalysis(
            path=str(file_path),
            language=language
        )

        if language == "python":
            analysis = self._analyze_python(content, analysis)
        elif language in ("javascript", "typescript"):
            analysis = self._analyze_javascript(content, analysis)

        return analysis

    def _analyze_python(self, content: str, analysis: FileAnalysis) -> FileAnalysis:
        """Analyze Python source code."""
        try:
            tree = ast.parse(content)
        except SyntaxError:
            return analysis

        for node in ast.walk(tree):
            if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
                # Extract function signature (positional arguments only)
                args = [arg.arg for arg in node.args.args]
                prefix = "async def" if isinstance(node, ast.AsyncFunctionDef) else "def"
                signature = f"{prefix} {node.name}({', '.join(args)})"

                analysis.symbols.append(Symbol(
                    name=node.name,
                    symbol_type="function",
                    file_path=analysis.path,
                    line_number=node.lineno,
                    signature=signature,
                    docstring=ast.get_docstring(node)
                ))

            elif isinstance(node, ast.ClassDef):
                # Get base classes
                bases = [self._get_name(base) for base in node.bases]
                signature = f"class {node.name}({', '.join(bases)})"

                analysis.symbols.append(Symbol(
                    name=node.name,
                    symbol_type="class",
                    file_path=analysis.path,
                    line_number=node.lineno,
                    signature=signature,
                    docstring=ast.get_docstring(node)
                ))

            elif isinstance(node, ast.Import):
                for alias in node.names:
                    analysis.imports.append(alias.name)
                    analysis.dependencies.add(alias.name.split(".")[0])

            elif isinstance(node, ast.ImportFrom):
                # Keep the leading dots so relative imports can be resolved later
                module = "." * node.level + (node.module or "")
                if module:
                    analysis.imports.append(module)
                # Only absolute imports count as package dependencies
                if node.module and node.level == 0:
                    analysis.dependencies.add(node.module.split(".")[0])

        # Calculate complexity (simplified McCabe)
        analysis.complexity_score = self._calculate_complexity(tree)

        return analysis

    def _analyze_javascript(self, content: str, analysis: FileAnalysis) -> FileAnalysis:
        """Analyze JavaScript/TypeScript source code (regex-based)."""
        # Function declarations
        for match in re.finditer(
            r'(?:export\s+)?(?:async\s+)?function\s+(\w+)\s*\((.*?)\)',
            content
        ):
            analysis.symbols.append(Symbol(
                name=match.group(1),
                symbol_type="function",
                file_path=analysis.path,
                line_number=content[:match.start()].count('\n') + 1,
                signature=f"function {match.group(1)}({match.group(2)})"
            ))

        # Arrow functions assigned to const
        for match in re.finditer(
            r'(?:export\s+)?const\s+(\w+)\s*=\s*(?:async\s+)?\((.*?)\)\s*=>',
            content
        ):
            analysis.symbols.append(Symbol(
                name=match.group(1),
                symbol_type="function",
                file_path=analysis.path,
                line_number=content[:match.start()].count('\n') + 1,
                signature=f"const {match.group(1)} = ({match.group(2)}) =>"
            ))

        # Class declarations
        for match in re.finditer(
            r'(?:export\s+)?class\s+(\w+)(?:\s+extends\s+(\w+))?',
            content
        ):
            base = match.group(2) or ""
            analysis.symbols.append(Symbol(
                name=match.group(1),
                symbol_type="class",
                file_path=analysis.path,
                line_number=content[:match.start()].count('\n') + 1,
                signature=f"class {match.group(1)}" + (f" extends {base}" if base else "")
            ))

        # Imports
        for match in re.finditer(r"import\s+.*?from\s+['\"](.+?)['\"]", content):
            analysis.imports.append(match.group(1))

        return analysis

    def _get_name(self, node) -> str:
        """Get the name from an AST node."""
        if isinstance(node, ast.Name):
            return node.id
        elif isinstance(node, ast.Attribute):
            return f"{self._get_name(node.value)}.{node.attr}"
        return "unknown"

    def _calculate_complexity(self, tree: ast.AST) -> int:
        """Calculate simplified cyclomatic complexity."""
        complexity = 1
        for node in ast.walk(tree):
            if isinstance(node, (ast.If, ast.While, ast.For, ast.ExceptHandler)):
                complexity += 1
            elif isinstance(node, ast.BoolOp):
                complexity += len(node.values) - 1
        return complexity

    def _build_dependency_graph(self):
        """Build a graph of file dependencies."""
        for file_path, analysis in self.file_index.items():
            self.dependency_graph[file_path] = set()

            for imp in analysis.imports:
                # Try to resolve import to a file
                resolved = self._resolve_import(imp, file_path)
                if resolved:
                    self.dependency_graph[file_path].add(resolved)

    def _resolve_import(self, import_path: str, from_file: str) -> Optional[str]:
        """Resolve a relative import to an indexed file path."""
        # Only relative imports are resolved; package imports return None
        if not import_path.startswith("."):
            return None

        base = Path(from_file).parent
        if "/" in import_path:
            # JavaScript/TypeScript style: "./utils", "../lib/api"
            target = os.path.normpath(str(base / import_path))
        else:
            # Python style: ".utils", "..pkg.mod" (each extra dot goes up one level)
            module = import_path.lstrip(".")
            level = len(import_path) - len(module)
            for _ in range(level - 1):
                base = base.parent
            for part in module.split("."):
                if part:
                    base = base / part
            target = str(base)

        for ext in self.LANGUAGE_EXTENSIONS:
            candidate = target + ext
            if candidate in self.file_index:
                return candidate

        return None

    def _iter_source_files(self):
        """Iterate over all source files in the workspace."""
        ignore_dirs = {".git", "node_modules", "__pycache__", ".venv", "venv"}

        for root, dirs, files in os.walk(self.workspace):
            # Skip ignored directories (in-place edit prunes the walk)
            dirs[:] = [d for d in dirs if d not in ignore_dirs]

            for file in files:
                path = Path(root) / file
                if path.suffix in self.LANGUAGE_EXTENSIONS:
                    yield path

    def find_symbol(self, name: str) -> List[Symbol]:
        """Find all occurrences of a symbol."""
        return self.symbol_index.get(name, [])

    def get_file_context(self, file_path: str, radius: int = 2) -> Dict[str, FileAnalysis]:
        """Get a file and its nearby dependencies."""
        context = {}
        visited = set()

        def collect(path: str, depth: int):
            if depth > radius or path in visited:
                return
            visited.add(path)

            if path in self.file_index:
                context[path] = self.file_index[path]

                # Collect dependencies
                for dep in self.dependency_graph.get(path, []):
                    collect(dep, depth + 1)

        collect(file_path, 0)
        return context

    def search_code(self, query: str, limit: int = 10) -> List[Dict[str, Any]]:
        """Search for code matching a query."""
        results = []

        for file_path, analysis in self.file_index.items():
            # Search in symbol names
            for symbol in analysis.symbols:
                if query.lower() in symbol.name.lower():
                    results.append({
                        "type": "symbol",
                        "symbol": symbol,
                        "file": file_path,
                        "score": 1.0 if query.lower() == symbol.name.lower() else 0.5
                    })

            # Search in file content
            try:
                content = Path(file_path).read_text()
                if query.lower() in content.lower():
                    # Find the line
                    for i, line in enumerate(content.split("\n")):
                        if query.lower() in line.lower():
                            results.append({
                                "type": "content",
                                "file": file_path,
                                "line": i + 1,
                                "content": line.strip(),
                                "score": 0.3
                            })
            except (OSError, UnicodeDecodeError):
                # Skip files that disappeared or cannot be decoded
                pass

        # Sort by score and limit
        results.sort(key=lambda x: x["score"], reverse=True)
        return results[:limit]
Code understanding is computationally expensive. In production, you'd want to cache the index and update it incrementally as files change, rather than re-indexing the entire codebase for each request.
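
One simple way to update the index incrementally is to remember each file's modification time and re-analyze only files whose timestamp changed. The sketch below assumes that approach; `IncrementalIndex` and its `analyze` callback are hypothetical names, and a production system might prefer content hashes or file-system watchers.

```python
import os
import tempfile
from pathlib import Path
from typing import Any, Callable, Dict, List, Tuple


class IncrementalIndex:
    """Cache per-file analysis results keyed by modification time."""

    def __init__(self, analyze: Callable[[Path], Any]):
        self.analyze = analyze
        # path -> (mtime in nanoseconds, analysis result)
        self._cache: Dict[str, Tuple[int, Any]] = {}

    def refresh(self, files: List[Path]) -> List[str]:
        """Re-analyze new or modified files; return the paths that were re-analyzed."""
        updated: List[str] = []
        seen = set()
        for path in files:
            key = str(path)
            seen.add(key)
            mtime = path.stat().st_mtime_ns
            cached = self._cache.get(key)
            if cached is None or cached[0] != mtime:
                self._cache[key] = (mtime, self.analyze(path))
                updated.append(key)
        # Forget files that no longer exist in the workspace listing
        for key in list(self._cache):
            if key not in seen:
                del self._cache[key]
        return updated


with tempfile.TemporaryDirectory() as tmp:
    a, b = Path(tmp) / "a.py", Path(tmp) / "b.py"
    a.write_text("x = 1\n")
    b.write_text("y = 2\n")

    # The "analysis" here is just a line count, standing in for real parsing
    index = IncrementalIndex(analyze=lambda p: len(p.read_text().splitlines()))
    first = index.refresh([a, b])    # Both files are new
    second = index.refresh([a, b])   # Nothing changed

    a.write_text("x = 1\nz = 3\n")
    bumped = a.stat().st_mtime_ns + 1_000_000_000
    os.utime(a, ns=(bumped, bumped))  # Force a visibly newer timestamp
    third = index.refresh([a, b])     # Only a.py is re-analyzed

print(len(first), second, [Path(p).name for p in third])
```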

The Action Execution Layer

The action layer is responsible for actually making changes to the codebase. It must handle file operations, command execution, and state management safely.
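
For command execution, "safely" usually starts with deciding which commands may run at all. The sketch below shows one conservative allow-list check under stated assumptions: `check_command`, `DEFAULT_ALLOWED`, and `SHELL_METACHARS` are hypothetical names, and rejecting every shell metacharacter is deliberately strict (it also rejects harmless uses such as a `$` inside a quoted grep pattern).

```python
import shlex
from typing import Sequence, Tuple

# Example allow-list of executables (an assumption for this sketch)
DEFAULT_ALLOWED = ("npm", "npx", "python", "pytest", "pip",
                   "git", "ls", "cat", "grep", "find")

# Characters that let one command string chain or redirect other commands
SHELL_METACHARS = set(";&|<>`$\n")


def check_command(
    command: str,
    allowed: Sequence[str] = DEFAULT_ALLOWED,
) -> Tuple[bool, str]:
    """Return (is_allowed, reason) for a command string."""
    if any(ch in SHELL_METACHARS for ch in command):
        return False, "shell metacharacters are not allowed"
    try:
        argv = shlex.split(command)  # Split the way a POSIX shell would
    except ValueError as exc:        # For example, an unterminated quote
        return False, f"could not parse command: {exc}"
    if not argv:
        return False, "empty command"
    if argv[0] not in allowed:
        return False, f"command not allowed: {argv[0]}"
    return True, "ok"


for cmd in ["pytest -q", "rm -rf /", "git status; rm -rf /"]:
    print(cmd, "->", check_command(cmd))
```

Checking only the first token is not a sandbox: an allowed program such as `python` or `find` can still run arbitrary code, so a check like this complements isolation rather than replacing it.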

🐍python
1from abc import ABC, abstractmethod
2from dataclasses import dataclass
3from typing import Any, Dict, Optional, List
4from pathlib import Path
5import subprocess
6import asyncio
7import json
8
9
10@dataclass
11class ActionResult:
12    """Result of executing an action."""
13    success: bool
14    output: Any
15    error: Optional[str] = None
16    changes: List[Dict[str, Any]] = None
17
18    def __post_init__(self):
19        if self.changes is None:
20            self.changes = []
21
22
23class Action(ABC):
24    """Base class for all agent actions."""
25
26    @property
27    @abstractmethod
28    def name(self) -> str:
29        """Unique name for this action."""
30        pass
31
32    @property
33    @abstractmethod
34    def description(self) -> str:
35        """Description for LLM context."""
36        pass
37
38    @property
39    def schema(self) -> Dict[str, Any]:
40        """JSON schema for action parameters."""
41        return {}
42
43    @abstractmethod
44    async def execute(self, **params) -> ActionResult:
45        """Execute the action with given parameters."""
46        pass
47
48    def validate_params(self, params: Dict[str, Any]) -> bool:
49        """Validate parameters against schema."""
50        # Basic validation - extend with jsonschema in production
51        required = self.schema.get("required", [])
52        return all(key in params for key in required)
53
54
55class ReadFileAction(Action):
56    """Read contents of a file."""
57
58    def __init__(self, workspace: Path):
59        self.workspace = workspace
60
61    @property
62    def name(self) -> str:
63        return "read_file"
64
65    @property
66    def description(self) -> str:
67        return "Read the contents of a file at the specified path"
68
69    @property
70    def schema(self) -> Dict[str, Any]:
71        return {
72            "type": "object",
73            "properties": {
74                "path": {
75                    "type": "string",
76                    "description": "Path to the file relative to workspace"
77                },
78                "start_line": {
79                    "type": "integer",
80                    "description": "Start reading from this line (1-indexed)"
81                },
82                "end_line": {
83                    "type": "integer",
84                    "description": "Stop reading at this line"
85                }
86            },
87            "required": ["path"]
88        }
89
90    async def execute(self, path: str, start_line: int = None, end_line: int = None) -> ActionResult:
91        try:
92            file_path = self.workspace / path
93
94            # Security check - prevent path traversal
95            if not file_path.resolve().is_relative_to(self.workspace.resolve()):
96                return ActionResult(
97                    success=False,
98                    output=None,
99                    error="Path traversal not allowed"
100                )
101
102            if not file_path.exists():
103                return ActionResult(
104                    success=False,
105                    output=None,
106                    error=f"File not found: {path}"
107                )
108
109            content = file_path.read_text()
110
111            # Handle line range
112            if start_line or end_line:
113                lines = content.split("\n")
114                start = (start_line - 1) if start_line else 0
115                end = end_line if end_line else len(lines)
116                content = "\n".join(lines[start:end])
117
118            return ActionResult(
119                success=True,
120                output=content
121            )
122
123        except Exception as e:
124            return ActionResult(
125                success=False,
126                output=None,
127                error=str(e)
128            )
129
130
131class WriteFileAction(Action):
132    """Write or create a file."""
133
134    def __init__(self, workspace: Path):
135        self.workspace = workspace
136
137    @property
138    def name(self) -> str:
139        return "write_file"
140
141    @property
142    def description(self) -> str:
143        return "Write content to a file, creating it if it doesn't exist"
144
145    @property
146    def schema(self) -> Dict[str, Any]:
147        return {
148            "type": "object",
149            "properties": {
150                "path": {
151                    "type": "string",
152                    "description": "Path to the file relative to workspace"
153                },
154                "content": {
155                    "type": "string",
156                    "description": "Content to write to the file"
157                }
158            },
159            "required": ["path", "content"]
160        }
161
162    async def execute(self, path: str, content: str) -> ActionResult:
163        try:
164            file_path = self.workspace / path
165
166            # Security check
167            if not file_path.resolve().is_relative_to(self.workspace.resolve()):
168                return ActionResult(
169                    success=False,
170                    output=None,
171                    error="Path traversal not allowed"
172                )
173
174            # Store original for rollback
175            original = None
176            if file_path.exists():
177                original = file_path.read_text()
178
179            # Create parent directories
180            file_path.parent.mkdir(parents=True, exist_ok=True)
181
182            # Write the file
183            file_path.write_text(content)
184
185            return ActionResult(
186                success=True,
187                output=f"Wrote {len(content)} bytes to {path}",
188                changes=[{
189                    "type": "write",
190                    "path": path,
191                    "original": original,
192                    "new": content
193                }]
194            )
195
196        except Exception as e:
197            return ActionResult(
198                success=False,
199                output=None,
200                error=str(e)
201            )
202
203
class EditFileAction(Action):
    """Edit a specific portion of a file."""

    def __init__(self, workspace: Path):
        self.workspace = workspace

    @property
    def name(self) -> str:
        return "edit_file"

    @property
    def description(self) -> str:
        return "Replace a specific section of a file with new content"

    @property
    def schema(self) -> Dict[str, Any]:
        return {
            "type": "object",
            "properties": {
                "path": {
                    "type": "string",
                    "description": "Path to the file"
                },
                "old_content": {
                    "type": "string",
                    "description": "Exact content to replace"
                },
                "new_content": {
                    "type": "string",
                    "description": "New content to insert"
                }
            },
            "required": ["path", "old_content", "new_content"]
        }

    async def execute(
        self,
        path: str,
        old_content: str,
        new_content: str
    ) -> ActionResult:
        try:
            file_path = self.workspace / path

            if not file_path.exists():
                return ActionResult(
                    success=False,
                    output=None,
                    error=f"File not found: {path}"
                )

            content = file_path.read_text()

            # Check if old_content exists
            if old_content not in content:
                return ActionResult(
                    success=False,
                    output=None,
                    error="Old content not found in file"
                )

            # Replace content
            new_file_content = content.replace(old_content, new_content, 1)
            file_path.write_text(new_file_content)

            return ActionResult(
                success=True,
                output=f"Edited {path}",
                changes=[{
                    "type": "edit",
                    "path": path,
                    "old": old_content,
                    "new": new_content
                }]
            )

        except Exception as e:
            return ActionResult(
                success=False,
                output=None,
                error=str(e)
            )


class RunCommandAction(Action):
    """Execute an allowlisted command without invoking a shell."""

    def __init__(self, workspace: Path, allowed_commands: List[str] = None):
        self.workspace = workspace
        self.allowed_commands = allowed_commands or [
            "npm", "npx", "python", "pytest", "pip",
            "git", "ls", "cat", "grep", "find"
        ]

    @property
    def name(self) -> str:
        return "run_command"

    @property
    def description(self) -> str:
        return (
            "Execute a command (no pipes, redirects, or other shell syntax). "
            f"Allowed: {', '.join(self.allowed_commands)}"
        )

    @property
    def schema(self) -> Dict[str, Any]:
        return {
            "type": "object",
            "properties": {
                "command": {
                    "type": "string",
                    "description": "The command to execute"
                },
                "timeout": {
                    "type": "integer",
                    "description": "Timeout in seconds (default 30)"
                }
            },
            "required": ["command"]
        }

    async def execute(self, command: str, timeout: int = 30) -> ActionResult:
        import shlex  # local import; only this method needs it

        try:
            # Security check: tokenize the way a POSIX shell would, then
            # validate the program name against the allowlist
            cmd_parts = shlex.split(command)
            if not cmd_parts:
                return ActionResult(
                    success=False,
                    output=None,
                    error="Empty command"
                )

            base_command = cmd_parts[0]
            if base_command not in self.allowed_commands:
                return ActionResult(
                    success=False,
                    output=None,
                    error=f"Command not allowed: {base_command}"
                )

            # Run the program directly rather than through a shell, so input
            # such as "ls; rm -rf ." cannot chain a second command
            process = await asyncio.create_subprocess_exec(
                *cmd_parts,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE,
                cwd=self.workspace
            )

            try:
                stdout, stderr = await asyncio.wait_for(
                    process.communicate(),
                    timeout=timeout
                )
            except asyncio.TimeoutError:
                process.kill()
                await process.wait()  # reap the killed process
                return ActionResult(
                    success=False,
                    output=None,
                    error=f"Command timed out after {timeout}s"
                )

            output = stdout.decode() if stdout else ""
            error = stderr.decode() if stderr else ""

            return ActionResult(
                success=process.returncode == 0,
                output=output,
                error=error if process.returncode != 0 else None
            )

        except Exception as e:
            # Includes ValueError from shlex.split on unbalanced quotes
            return ActionResult(
                success=False,
                output=None,
                error=str(e)
            )


class ActionExecutor:
    """Manages and executes agent actions."""

    def __init__(self, workspace: Path):
        self.workspace = workspace
        self.actions: Dict[str, Action] = {}
        self.history: List[Dict[str, Any]] = []

        # Register default actions
        self.register(ReadFileAction(workspace))
        self.register(WriteFileAction(workspace))
        self.register(EditFileAction(workspace))
        self.register(RunCommandAction(workspace))

    def register(self, action: Action):
        """Register an action."""
        self.actions[action.name] = action

    def get_action_descriptions(self) -> str:
        """Get descriptions of all actions for LLM context."""
        descriptions = []
        for action in self.actions.values():
            desc = f"- {action.name}: {action.description}"
            if action.schema.get("properties"):
                params = list(action.schema["properties"].keys())
                desc += f" (params: {', '.join(params)})"
            descriptions.append(desc)
        return "\n".join(descriptions)

    async def execute(
        self,
        action_name: str,
        params: Dict[str, Any]
    ) -> ActionResult:
        """Execute an action by name."""
        if action_name not in self.actions:
            return ActionResult(
                success=False,
                output=None,
                error=f"Unknown action: {action_name}"
            )

        action = self.actions[action_name]

        # Validate parameters
        if not action.validate_params(params):
            return ActionResult(
                success=False,
                output=None,
                error=f"Invalid parameters for {action_name}"
            )

        # Execute and record
        result = await action.execute(**params)

        self.history.append({
            "action": action_name,
            "params": params,
            "result": {
                "success": result.success,
                "output": str(result.output)[:500] if result.output else None,
                "error": result.error,
                # Keep file changes so get_undo_actions() can reverse them
                "changes": result.changes or []
            }
        })

        return result

    def get_undo_actions(self) -> List[Dict[str, Any]]:
        """Get actions needed to undo recent changes, newest first."""
        undo_actions = []

        for entry in reversed(self.history):
            result = entry.get("result", {})
            # Only write_file and edit_file results carry a changes list
            for change in reversed(result.get("changes") or []):
                if change.get("type") == "write":
                    # A newly created file has no original content to restore
                    if change.get("original") is not None:
                        undo_actions.append({
                            "action": "write_file",
                            "params": {
                                "path": change["path"],
                                "content": change["original"]
                            }
                        })
                elif change.get("type") == "edit" and change.get("new"):
                    # Best effort: swap the new text back for the old text.
                    # An edit that inserted an empty string cannot be located.
                    undo_actions.append({
                        "action": "edit_file",
                        "params": {
                            "path": change["path"],
                            "old_content": change["new"],
                            "new_content": change["old"]
                        }
                    })

        return undo_actions
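Command execution is inherently dangerous. Always validate commands against an allowlist and, in production, run them in a sandboxed environment. An allowlist of program names is only a first line of defense: allowed programs such as python, git, and find can themselves run arbitrary code or delete files. We'll cover sandboxing in detail in Section 3.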
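
A first-word allowlist check only means something if the rest of the string cannot smuggle in a second command. The following is a minimal sketch of that validation step in isolation, not the chapter's implementation: `validate_command`, `ALLOWED_COMMANDS`, and `SHELL_METACHARACTERS` are hypothetical names, and rejecting every metacharacter is a deliberately conservative assumption (it also blocks harmless uses such as a `$` inside a quoted grep pattern).

```python
import shlex
from typing import List, Optional, Tuple

# Hypothetical allowlist, mirroring the defaults used by RunCommandAction
ALLOWED_COMMANDS = {"npm", "npx", "python", "pytest", "pip",
                    "git", "ls", "cat", "grep", "find"}

# Characters a shell would treat as chaining, piping, substitution, or redirection
SHELL_METACHARACTERS = set(";&|`$<>\n")


def validate_command(command: str) -> Tuple[bool, Optional[List[str]], str]:
    """Return (ok, argv, reason) for a command string."""
    if any(ch in SHELL_METACHARACTERS for ch in command):
        return False, None, "shell metacharacters are not allowed"
    try:
        argv = shlex.split(command)
    except ValueError as exc:  # for example, unbalanced quotes
        return False, None, f"could not parse command: {exc}"
    if not argv:
        return False, None, "empty command"
    if argv[0] not in ALLOWED_COMMANDS:
        return False, None, f"command not allowed: {argv[0]}"
    return True, argv, ""


if __name__ == "__main__":
    for cmd in ["pytest -q tests/", "ls; rm -rf .", "curl example.com"]:
        print(cmd, "->", validate_command(cmd))
```

The returned `argv` list can be passed to a process API that does not invoke a shell, so the validated tokens are exactly what gets executed.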

Designing the Agent Loop

The agent loop ties all components together, orchestrating the flow from user request to completed task. A well-designed loop handles errors gracefully and knows when to ask for help.

🐍python
import asyncio
from typing import AsyncGenerator, Dict, Any, List
from dataclasses import dataclass
from enum import Enum


class LoopState(Enum):
    THINKING = "thinking"
    ACTING = "acting"
    OBSERVING = "observing"
    COMPLETE = "complete"
    STUCK = "stuck"


@dataclass
class LoopStep:
    """A single step in the agent loop."""
    thought: str
    action: str
    action_params: Dict[str, Any]
    observation: str
    success: bool


class CodingAgentLoop:
    """
    The main agent loop implementing think-act-observe.
    """

    MAX_ITERATIONS = 20
    MAX_CONSECUTIVE_FAILURES = 3

    def __init__(
        self,
        llm_client,
        action_executor,
        code_understanding,
        memory
    ):
        self.llm = llm_client
        self.executor = action_executor
        self.code_index = code_understanding
        self.memory = memory

        self.state = LoopState.THINKING
        self.steps: List[LoopStep] = []
        self.consecutive_failures = 0

    async def run(
        self,
        task: str,
        context: Dict[str, Any] = None
    ) -> AsyncGenerator[Dict[str, Any], None]:
        """
        Run the agent loop, yielding updates at each step.
        """
        iteration = 0

        while iteration < self.MAX_ITERATIONS:
            iteration += 1

            # Phase 1: Think
            self.state = LoopState.THINKING
            yield {"state": "thinking", "iteration": iteration}

            thought, action, params = await self._think(task, context)

            # Check for completion
            if action == "complete":
                self.state = LoopState.COMPLETE
                yield {
                    "state": "complete",
                    "thought": thought,
                    "summary": params.get("summary", "Task completed")
                }
                return

            # Check if stuck
            if action == "ask_user":
                self.state = LoopState.STUCK
                yield {
                    "state": "stuck",
                    "thought": thought,
                    "question": params.get("question")
                }
                return

            # Phase 2: Act
            self.state = LoopState.ACTING
            yield {
                "state": "acting",
                "thought": thought,
                "action": action,
                "params": params
            }

            result = await self.executor.execute(action, params)

            # Phase 3: Observe
            self.state = LoopState.OBSERVING

            observation = self._format_observation(result)

            step = LoopStep(
                thought=thought,
                action=action,
                action_params=params,
                observation=observation,
                success=result.success
            )
            self.steps.append(step)

            yield {
                "state": "observing",
                "observation": observation,
                "success": result.success
            }

            # Update memory before any early return, so that a failing
            # final action is recorded too
            self.memory.add_action(action, result.output, result.success)

            # Track failures
            if not result.success:
                self.consecutive_failures += 1
                if self.consecutive_failures >= self.MAX_CONSECUTIVE_FAILURES:
                    self.state = LoopState.STUCK
                    yield {
                        "state": "stuck",
                        "reason": "Too many consecutive failures",
                        "last_error": result.error
                    }
                    return
            else:
                self.consecutive_failures = 0

        # Max iterations reached
        yield {
            "state": "max_iterations",
            "iterations": iteration
        }

    async def _think(
        self,
        task: str,
        context: Dict[str, Any] = None
    ) -> tuple[str, str, Dict[str, Any]]:
        """
        Decide what to do next.
        """
        # Build prompt with history
        history_text = self._format_history()
        actions_text = self.executor.get_action_descriptions()

        prompt = f"""You are a coding agent working on a task.

TASK: {task}

AVAILABLE ACTIONS:
{actions_text}
- complete: Mark the task as done (params: summary)
- ask_user: Ask the user for clarification (params: question)

PREVIOUS STEPS:
{history_text}

CODEBASE CONTEXT:
{context.get('summary', 'No context gathered yet') if context else 'No context'}

Based on the task and previous steps, decide what to do next.

Respond in this JSON format:
{{
    "thought": "Your reasoning about what to do next",
    "action": "The action to take",
    "params": {{ ... action parameters ... }}
}}

Think step by step. If the task is complete, use the "complete" action.
If you need more information from the user, use "ask_user"."""

        response = await self.llm.generate(prompt)

        # Parse response
        return self._parse_response(response)

    def _parse_response(
        self,
        response: str
    ) -> tuple[str, str, Dict[str, Any]]:
        """Parse the LLM response into thought, action, and params."""
        import json

        try:
            # Find the outermost JSON object in the response
            start = response.find("{")
            end = response.rfind("}") + 1
            data = json.loads(response[start:end])

            return (
                data.get("thought", ""),
                data.get("action", ""),
                data.get("params") or {}  # tolerate "params": null
            )
        except (json.JSONDecodeError, AttributeError, TypeError):
            # Fallback: malformed JSON, or JSON that is not an object
            return ("Failed to parse response", "ask_user", {
                "question": "I encountered an error. Could you clarify the task?"
            })

    def _format_history(self) -> str:
        """Format step history for the prompt."""
        if not self.steps:
            return "No previous steps"

        lines = []
        # Show the last 5 steps, numbered by their position in the full run
        first = max(len(self.steps) - 5, 0)
        for i, step in enumerate(self.steps[first:], start=first + 1):
            status = "✓" if step.success else "✗"
            lines.append(f"{i}. [{status}] {step.action}")
            lines.append(f"   Thought: {step.thought[:100]}...")
            lines.append(f"   Result: {step.observation[:100]}...")

        return "\n".join(lines)

    def _format_observation(self, result) -> str:
        """Format action result as observation."""
        if result.success:
            output = str(result.output)
            if len(output) > 500:
                output = output[:500] + "... (truncated)"
            return f"Success: {output}"
        else:
            return f"Error: {result.error}"

    def get_summary(self) -> Dict[str, Any]:
        """Get a summary of the loop execution."""
        return {
            "total_steps": len(self.steps),
            "successful_steps": sum(1 for s in self.steps if s.success),
            "failed_steps": sum(1 for s in self.steps if not s.success),
            "final_state": self.state.value,
            "actions_used": list(set(s.action for s in self.steps))
        }
The agent loop yields updates at each step, allowing the UI to show real-time progress. This is especially important for coding agents where tasks can take minutes to complete.
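
As a rough illustration of the consuming side, the sketch below renders each update as one progress line. It does not use the real `CodingAgentLoop`: `fake_agent_run` is a hypothetical stand-in that yields update dictionaries with the same keys as the loop above, and `render_update` and `show_progress` are illustrative names, not part of the chapter's code.

```python
import asyncio
from typing import Any, AsyncGenerator, Dict, List


async def fake_agent_run(task: str) -> AsyncGenerator[Dict[str, Any], None]:
    """Stand-in for CodingAgentLoop.run(): yields the same kinds of updates."""
    yield {"state": "thinking", "iteration": 1}
    yield {"state": "acting", "thought": "Read the file first",
           "action": "read_file", "params": {"path": "app.py"}}
    yield {"state": "observing", "observation": "Success: ...", "success": True}
    yield {"state": "complete", "thought": "Done", "summary": f"Finished: {task}"}


def render_update(update: Dict[str, Any]) -> str:
    """Turn one update dict into a single progress line."""
    state = update["state"]
    if state == "thinking":
        return f"[{update['iteration']}] thinking..."
    if state == "acting":
        return f"    -> {update['action']} {update['params']}"
    if state == "observing":
        mark = "ok" if update["success"] else "failed"
        return f"    <- {mark}: {update['observation']}"
    if state == "complete":
        return f"done: {update['summary']}"
    return f"{state}: {update}"  # stuck, max_iterations, or anything else


async def show_progress(
    updates: AsyncGenerator[Dict[str, Any], None]
) -> List[str]:
    """Print each update as it arrives and return the rendered lines."""
    lines = []
    async for update in updates:
        line = render_update(update)
        print(line)
        lines.append(line)
    return lines


if __name__ == "__main__":
    asyncio.run(show_progress(fake_agent_run("add a health check endpoint")))
```

Because the consumer only depends on the shape of the update dictionaries, the same rendering code could sit behind a terminal, a web socket, or a log file.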

Architecture Patterns

Different coding tasks benefit from different architectural patterns. Here are the most common approaches:

Pattern 1: Simple ReAct

Best for straightforward tasks like simple bug fixes or adding a single function.

🐍python
class SimpleReActCodingAgent:
    """
    Simple ReAct pattern for coding tasks.
    Think -> Act -> Observe -> Repeat
    """

    async def run(self, task: str):
        while not self.is_complete():
            # Single step: think and act
            thought = await self.think(task)
            action, params = self.parse_action(thought)
            result = await self.execute(action, params)
            self.update_state(result)

Pattern 2: Plan-then-Execute

Better for larger features that require coordinated changes across multiple files.

🐍python
class PlanExecuteCodingAgent:
    """
    Two-phase pattern: plan first, then execute.
    """

    MAX_REPLANS = 3  # illustrative cap so replanning cannot loop forever

    async def run(self, task: str):
        # Phase 1: Create detailed plan
        remaining = list(await self.create_plan(task))
        replans = 0

        # Phase 2: Execute each step
        while remaining:
            step = remaining.pop(0)
            result = await self.execute_step(step)

            if not result.success:
                if replans >= self.MAX_REPLANS:
                    break
                replans += 1
                # Replan from current state; the new plan replaces the
                # steps that were still pending
                remaining = list(await self.replan(task, step, result.error))

Pattern 3: Test-Driven

Ideal for bug fixes and refactoring where correctness is paramount.

🐍python
class TestDrivenCodingAgent:
    """
    Test-driven approach: write/run tests first.
    """

    MAX_FIX_ATTEMPTS = 5  # illustrative cap on fix attempts

    async def run(self, task: str):
        # Step 1: Understand the expected behavior
        tests = await self.identify_tests(task)

        # Step 2: Run existing tests to record a baseline
        baseline = await self.run_tests(tests)

        # Step 3: Make changes
        await self.implement(task)

        # Step 4: Fix until the tests pass, within a bounded number of attempts
        for _ in range(self.MAX_FIX_ATTEMPTS):
            if await self.tests_pass(tests):
                return True
            errors = await self.get_test_errors()
            await self.fix_errors(errors)

        return await self.tests_pass(tests)
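
The baseline run in step 2 is what lets an agent tell failures it introduced apart from failures that were already there. The sketch below shows one way to make that comparison. It assumes, purely for illustration, that a test run is summarized as a dictionary from test name to pass/fail; `find_regressions` and `find_fixed` are hypothetical helpers, not methods of the class above.

```python
from typing import Dict, List


def find_regressions(baseline: Dict[str, bool], after: Dict[str, bool]) -> List[str]:
    """Tests that passed before the change and fail (or are missing) after it."""
    return sorted(
        name for name, passed in baseline.items()
        if passed and not after.get(name, False)
    )


def find_fixed(baseline: Dict[str, bool], after: Dict[str, bool]) -> List[str]:
    """Tests that failed before the change and pass after it."""
    return sorted(
        name for name, passed in baseline.items()
        if not passed and after.get(name, False)
    )


if __name__ == "__main__":
    baseline = {"test_login": True, "test_logout": True, "test_reset": False}
    after = {"test_login": True, "test_logout": False, "test_reset": True}
    print("regressions:", find_regressions(baseline, after))
    print("fixed:", find_fixed(baseline, after))
```

A change would then count as safe only when the regression list is empty, which matches the goal of preserving existing behavior during refactoring.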

Choosing the Right Pattern

| Task Type | Recommended Pattern | Reason |
| --- | --- | --- |
| Simple bug fix | Simple ReAct | Quick iteration, minimal planning needed |
| New feature | Plan-then-Execute | Requires coordinated multi-file changes |
| Refactoring | Test-Driven | Need to preserve existing behavior |
| API integration | Plan-then-Execute | Multiple steps with dependencies |
| Documentation | Simple ReAct | Low complexity, quick feedback |
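
If the task type is known up front, the recommendations in this table can be written down as a plain lookup. This is only a sketch: the task-type keys, the `Pattern` enum, and `choose_pattern` are hypothetical names, and falling back to Simple ReAct for unknown task types is an assumption rather than a recommendation from the table.

```python
from enum import Enum


class Pattern(Enum):
    SIMPLE_REACT = "Simple ReAct"
    PLAN_THEN_EXECUTE = "Plan-then-Execute"
    TEST_DRIVEN = "Test-Driven"


# Direct transcription of the table; the keys are hypothetical task labels
PATTERN_BY_TASK = {
    "simple_bug_fix": Pattern.SIMPLE_REACT,
    "new_feature": Pattern.PLAN_THEN_EXECUTE,
    "refactoring": Pattern.TEST_DRIVEN,
    "api_integration": Pattern.PLAN_THEN_EXECUTE,
    "documentation": Pattern.SIMPLE_REACT,
}


def choose_pattern(task_type: str, default: Pattern = Pattern.SIMPLE_REACT) -> Pattern:
    """Look up the recommended pattern, falling back to an assumed default."""
    return PATTERN_BY_TASK.get(task_type, default)


if __name__ == "__main__":
    for task_type in ["refactoring", "new_feature", "something_else"]:
        print(task_type, "->", choose_pattern(task_type).value)
```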

Summary

In this section, we've established the architectural foundation for our coding agent. Key concepts include:

  • Layered Architecture: Separating context, planning, execution, and verification into distinct components
  • Code Understanding: Building an index of the codebase for efficient navigation and search
  • Action System: A pluggable system of actions that can be validated and executed safely
  • Agent Loop: The core think-act-observe cycle that drives the agent forward
  • Architecture Patterns: Different approaches for different types of coding tasks

In the next section, we'll implement the file system tools that allow our agent to navigate and manipulate the codebase.