Introduction
Coding agents represent one of the most powerful applications of agentic AI. Unlike simple code completion or chat assistants, coding agents can autonomously navigate codebases, understand context, write code, execute tests, debug failures, and iterate until tasks are complete. In this chapter, we'll build a complete coding agent from scratch, starting with its architecture.
Chapter Goals: By the end of this chapter, you'll have built a fully functional coding agent capable of reading files, writing code, executing commands, managing git operations, running tests, and debugging failures automatically.
What Makes Coding Agents Different
Coding agents have unique requirements that distinguish them from general-purpose AI agents. Understanding these differences is crucial for designing an effective architecture.
Key Characteristics
| Characteristic | General Agent | Coding Agent |
|---|---|---|
| Environment | Web, APIs, databases | File system, terminal, git |
| State | Often stateless | Highly stateful (codebase) |
| Verification | Subjective evaluation | Objective (tests pass/fail) |
| Iteration | Single response | Multiple edit-test cycles |
| Context | Conversation history | Entire codebase understanding |
| Safety | API rate limits | Sandbox execution, file protection |
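The "Verification" row is what makes a tight feedback loop possible: a test command either exits with status 0 or it does not. A minimal sketch of that idea, assuming checks are run as subprocesses; the `run_check` helper is hypothetical and not part of the agent built in this chapter:

```python
import subprocess
import sys
from typing import List, Tuple


def run_check(command: List[str], timeout: int = 30) -> Tuple[bool, str]:
    """Run a command and reduce the outcome to pass/fail plus captured output."""
    completed = subprocess.run(
        command, capture_output=True, text=True, timeout=timeout
    )
    passed = completed.returncode == 0
    return passed, completed.stdout + completed.stderr


# Stand-ins for a passing and a failing test run.
passing = [sys.executable, "-c", "assert 1 + 1 == 2"]
failing = [sys.executable, "-c", "assert 1 + 1 == 3"]

ok, _ = run_check(passing)
bad, output = run_check(failing)
print(ok, bad)
```

The captured output matters as much as the boolean: on failure it is the raw material the agent would feed back into its next attempt.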
The Coding Agent Challenge
A coding agent must solve several interconnected problems:
- Context Gathering: Understanding what code already exists, how it's structured, and what conventions are followed
- Task Understanding: Translating natural language requests into specific code changes
- Code Generation: Writing code that integrates correctly with the existing codebase
- Verification: Running tests and checking that changes work correctly
- Iteration: Debugging failures and refining the solution until it works
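The last two items, verification and iteration, form a loop: propose a change, check it, and feed any failure back into the next proposal. The sketch below illustrates that loop with toy stand-ins; `iterate_until_passing`, `fake_propose`, and `fake_verify` are hypothetical names chosen for illustration, and a real agent would call an LLM and a test runner instead:

```python
from typing import Callable, List, Optional, Tuple


def iterate_until_passing(
    propose: Callable[[Optional[str]], str],
    verify: Callable[[str], Tuple[bool, str]],
    max_attempts: int = 3,
) -> Tuple[Optional[str], List[str]]:
    """Propose a candidate, verify it, and feed failures back until it passes."""
    feedback: Optional[str] = None
    failures: List[str] = []
    for _ in range(max_attempts):
        candidate = propose(feedback)
        passed, message = verify(candidate)
        if passed:
            return candidate, failures
        failures.append(message)
        feedback = message
    return None, failures


# Toy "model": writes a buggy function first, then a fixed one after feedback.
def fake_propose(feedback: Optional[str]) -> str:
    if feedback is None:
        return "def add(a, b):\n    return a - b\n"
    return "def add(a, b):\n    return a + b\n"


# Toy "test suite": executes the candidate and checks a single case.
def fake_verify(source: str) -> Tuple[bool, str]:
    namespace: dict = {}
    exec(source, namespace)
    result = namespace["add"](2, 3)
    return result == 5, f"add(2, 3) returned {result}, expected 5"


solution, failures = iterate_until_passing(fake_propose, fake_verify)
print(failures)
```

Bounding the loop with `max_attempts` is a design choice worth keeping in any real version, since an agent that never converges should stop and report rather than retry forever.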
Core Architecture Components
A well-designed coding agent consists of several distinct layers, each with specific responsibilities. Here's the high-level architecture:
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import List, Dict, Any, Optional
from enum import Enum


class AgentState(Enum):
    """Current state of the coding agent."""
    IDLE = "idle"
    ANALYZING = "analyzing"
    PLANNING = "planning"
    CODING = "coding"
    TESTING = "testing"
    DEBUGGING = "debugging"
    COMPLETE = "complete"
    ERROR = "error"


@dataclass
class CodebaseContext:
    """Represents understanding of the current codebase."""
    root_path: str
    file_tree: Dict[str, Any] = field(default_factory=dict)
    open_files: Dict[str, str] = field(default_factory=dict)
    git_status: Optional[Dict[str, Any]] = None
    conventions: Dict[str, str] = field(default_factory=dict)
    dependencies: List[str] = field(default_factory=list)

    def summary(self) -> str:
        """Generate a summary for the LLM context."""
        files_count = self._count_files(self.file_tree)
        return f"""Codebase: {self.root_path}
Files: {files_count}
Open files: {list(self.open_files.keys())}
Git status: {'Clean' if not self.git_status else 'Modified'}
Dependencies: {len(self.dependencies)} packages"""

    def _count_files(self, tree: Dict) -> int:
        # Nested dicts are directories; any other value counts as one file
        count = 0
        for key, value in tree.items():
            if isinstance(value, dict):
                count += self._count_files(value)
            else:
                count += 1
        return count


@dataclass
class Task:
    """A coding task to be executed."""
    description: str
    task_type: str  # "feature", "bugfix", "refactor", "test"
    target_files: List[str] = field(default_factory=list)
    dependencies: List[str] = field(default_factory=list)
    acceptance_criteria: List[str] = field(default_factory=list)
    status: str = "pending"


@dataclass
class AgentMemory:
    """Memory system for the coding agent."""
    conversation_history: List[Dict[str, str]] = field(default_factory=list)
    executed_actions: List[Dict[str, Any]] = field(default_factory=list)
    test_results: List[Dict[str, Any]] = field(default_factory=list)
    errors_encountered: List[str] = field(default_factory=list)
    successful_patterns: List[str] = field(default_factory=list)

    def add_action(self, action: str, result: Any, success: bool):
        self.executed_actions.append({
            "action": action,
            "result": str(result)[:500],  # Truncate long results
            "success": success
        })

    def get_recent_context(self, n: int = 10) -> str:
        """Get recent actions for context."""
        recent = self.executed_actions[-n:]
        return "\n".join([
            f"- {a['action']}: {'✓' if a['success'] else '✗'}"
            for a in recent
        ])


class CodingAgent:
    """
    Main coding agent architecture.

    Components:
    - Context Layer: Understanding the codebase
    - Planning Layer: Breaking down tasks
    - Action Layer: Executing changes
    - Verification Layer: Testing and validation
    - Memory Layer: Learning from actions
    """

    def __init__(
        self,
        llm_client,
        workspace_path: str,
        tools: Dict[str, Any],
        config: Optional[Dict[str, Any]] = None
    ):
        self.llm = llm_client
        self.workspace = workspace_path
        self.tools = tools
        self.config = config or {}

        # Initialize components
        self.context = CodebaseContext(root_path=workspace_path)
        self.memory = AgentMemory()
        self.state = AgentState.IDLE
        self.current_task: Optional[Task] = None

    async def run(self, user_request: str) -> Dict[str, Any]:
        """Main agent loop for processing a request."""
        self.state = AgentState.ANALYZING

        try:
            # Phase 1: Analyze and understand
            await self._gather_context()

            # Phase 2: Plan the approach
            self.state = AgentState.PLANNING
            plan = await self._create_plan(user_request)

            # Phase 3: Execute the plan
            self.state = AgentState.CODING
            results = await self._execute_plan(plan)

            # Phase 4: Verify the changes
            self.state = AgentState.TESTING
            test_results = await self._run_verification()

            # Phase 5: Debug if needed
            if not test_results["success"]:
                self.state = AgentState.DEBUGGING
                results = await self._debug_and_fix(test_results)

            self.state = AgentState.COMPLETE
            return {
                "success": True,
                "results": results,
                "test_results": test_results
            }

        except Exception as e:
            self.state = AgentState.ERROR
            self.memory.errors_encountered.append(str(e))
            return {"success": False, "error": str(e)}

    async def _gather_context(self):
        """Gather information about the codebase."""
        # Build file tree
        self.context.file_tree = await self.tools["list_files"].run(
            self.workspace
        )

        # Get git status
        if await self._has_git():
            self.context.git_status = await self.tools["git_status"].run()

        # Detect conventions (from config files, README, etc.)
        self.context.conventions = await self._detect_conventions()

    async def _create_plan(self, request: str) -> List[Dict[str, Any]]:
        """Create an execution plan for the request."""
        prompt = f"""You are a coding agent planning how to complete a task.

Codebase Context:
{self.context.summary()}

User Request: {request}

Recent Actions:
{self.memory.get_recent_context()}

Create a step-by-step plan. For each step, specify:
1. action: The type of action (read_file, write_file, run_command, etc.)
2. target: The file or command target
3. description: What this step accomplishes
4. dependencies: Which previous steps must complete first
5. params: Keyword arguments to pass to the tool for this action

Return as JSON array."""

        response = await self.llm.generate(prompt)
        return self._parse_plan(response)

    async def _execute_plan(self, plan: List[Dict[str, Any]]) -> List[Any]:
        """Execute each step of the plan."""
        results = []

        for step in plan:
            action = step["action"]
            tool = self.tools.get(action)

            if not tool:
                self.memory.add_action(action, "Tool not found", False)
                continue

            try:
                # "params" holds the keyword arguments requested in the plan prompt
                result = await tool.run(**step.get("params", {}))
                self.memory.add_action(action, result, True)
                results.append({"step": step, "result": result, "success": True})
            except Exception as e:
                self.memory.add_action(action, str(e), False)
                results.append({"step": step, "error": str(e), "success": False})

        return results

    async def _run_verification(self) -> Dict[str, Any]:
        """Run tests to verify changes."""
        # Implementation in later sections. Raising makes the missing piece
        # explicit instead of returning None, which run() cannot index into.
        raise NotImplementedError("Verification is implemented in later sections")

    async def _debug_and_fix(self, test_results: Dict) -> Dict[str, Any]:
        """Debug failures and attempt fixes."""
        # Implementation in later sections
        raise NotImplementedError("Debugging is implemented in later sections")

    async def _has_git(self) -> bool:
        """Check if workspace is a git repository."""
        try:
            await self.tools["run_command"].run("git status")
            return True
        except Exception:
            return False

    async def _detect_conventions(self) -> Dict[str, str]:
        """Detect coding conventions from the codebase."""
        conventions = {}

        # Check for common config files
        config_files = [
            (".eslintrc", "eslint"),
            ("pyproject.toml", "python"),
            ("tsconfig.json", "typescript"),
            (".prettierrc", "prettier"),
        ]

        for filename, convention_type in config_files:
            if await self._file_exists(filename):
                conventions[convention_type] = await self.tools["read_file"].run(
                    f"{self.workspace}/{filename}"
                )

        return conventions

    async def _file_exists(self, path: str) -> bool:
        """Check if a file exists (by attempting to read it)."""
        try:
            await self.tools["read_file"].run(f"{self.workspace}/{path}")
            return True
        except Exception:
            return False

    def _parse_plan(self, response: str) -> List[Dict[str, Any]]:
        """Parse LLM response into structured plan."""
        import json
        try:
            # Extract the outermost JSON array from the response
            start = response.find("[")
            end = response.rfind("]") + 1
            return json.loads(response[start:end])
        except json.JSONDecodeError:
            # No parseable array: fall back to an empty plan
            return []

The Code Understanding Layer
Before a coding agent can make changes, it must understand the codebase. The code understanding layer handles parsing, indexing, and semantic analysis.
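Before the full layer, it may help to see the core idea in isolation. The sketch below uses Python's standard `ast` module to list the classes and functions in a piece of source text; the `list_symbols` helper and the `SOURCE` sample are illustrative assumptions, not part of the layer itself:

```python
import ast
from typing import List, Tuple

# A small sample module to analyze (line 1 is the blank line after the quotes).
SOURCE = '''
import os

class Greeter:
    def greet(self, name):
        return "hi " + name

def main():
    pass
'''


def list_symbols(source: str) -> List[Tuple[str, str, int]]:
    """Return (kind, name, line) for each class and function in the source."""
    symbols = []
    for node in ast.walk(ast.parse(source)):
        if isinstance(node, ast.ClassDef):
            symbols.append(("class", node.name, node.lineno))
        elif isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
            symbols.append(("function", node.name, node.lineno))
    # ast.walk is breadth-first, so sort by line for a stable reading order
    return sorted(symbols, key=lambda s: s[2])


symbols = list_symbols(SOURCE)
for kind, name, line in symbols:
    print(f"{line}: {kind} {name}")
```

Parsing into a syntax tree rather than matching text means comments and string literals that merely look like definitions are not reported as symbols.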
import ast
import os
from pathlib import Path
from dataclasses import dataclass, field
from typing import Any, List, Dict, Set, Optional  # Any is used in return annotations below
import re


@dataclass
class Symbol:
    """Represents a code symbol (function, class, variable)."""
    name: str
    symbol_type: str  # "function", "class", "variable", "import"
    file_path: str
    line_number: int
    signature: Optional[str] = None
    docstring: Optional[str] = None
    references: List[str] = field(default_factory=list)


@dataclass
class FileAnalysis:
    """Analysis results for a single file."""
    path: str
    language: str
    symbols: List[Symbol] = field(default_factory=list)
    imports: List[str] = field(default_factory=list)
    dependencies: Set[str] = field(default_factory=set)
    complexity_score: int = 0


class CodeUnderstandingLayer:
    """
    Analyzes and indexes the codebase for efficient querying.
    """

    LANGUAGE_EXTENSIONS = {
        ".py": "python",
        ".js": "javascript",
        ".ts": "typescript",
        ".tsx": "typescript",
        ".jsx": "javascript",
        ".java": "java",
        ".go": "go",
        ".rs": "rust",
    }

    def __init__(self, workspace_path: str):
        self.workspace = Path(workspace_path)
        self.file_index: Dict[str, FileAnalysis] = {}
        self.symbol_index: Dict[str, List[Symbol]] = {}
        self.dependency_graph: Dict[str, Set[str]] = {}

    async def index_codebase(self) -> Dict[str, Any]:
        """Build a complete index of the codebase."""
        stats = {"files": 0, "symbols": 0, "errors": 0}

        for file_path in self._iter_source_files():
            try:
                analysis = await self._analyze_file(file_path)
                self.file_index[str(file_path)] = analysis

                # Index symbols for quick lookup
                for symbol in analysis.symbols:
                    if symbol.name not in self.symbol_index:
                        self.symbol_index[symbol.name] = []
                    self.symbol_index[symbol.name].append(symbol)

                stats["files"] += 1
                stats["symbols"] += len(analysis.symbols)

            except Exception:
                # Count unreadable or unparseable files and keep indexing
                stats["errors"] += 1

        # Build dependency graph
        self._build_dependency_graph()

        return stats

    async def _analyze_file(self, file_path: Path) -> FileAnalysis:
        """Analyze a single source file."""
        content = file_path.read_text()
        language = self.LANGUAGE_EXTENSIONS.get(file_path.suffix, "unknown")

        analysis = FileAnalysis(
            path=str(file_path),
            language=language
        )

        if language == "python":
            analysis = self._analyze_python(content, analysis)
        elif language in ("javascript", "typescript"):
            analysis = self._analyze_javascript(content, analysis)

        return analysis

    def _analyze_python(self, content: str, analysis: FileAnalysis) -> FileAnalysis:
        """Analyze Python source code."""
        try:
            tree = ast.parse(content)
        except SyntaxError:
            return analysis

        for node in ast.walk(tree):
            if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
                # Extract function signature (positional arguments only)
                args = [arg.arg for arg in node.args.args]
                keyword = "async def" if isinstance(node, ast.AsyncFunctionDef) else "def"
                signature = f"{keyword} {node.name}({', '.join(args)})"

                analysis.symbols.append(Symbol(
                    name=node.name,
                    symbol_type="function",
                    file_path=analysis.path,
                    line_number=node.lineno,
                    signature=signature,
                    docstring=ast.get_docstring(node)
                ))

            elif isinstance(node, ast.ClassDef):
                # Get base classes
                bases = [self._get_name(base) for base in node.bases]
                signature = f"class {node.name}({', '.join(bases)})"

                analysis.symbols.append(Symbol(
                    name=node.name,
                    symbol_type="class",
                    file_path=analysis.path,
                    line_number=node.lineno,
                    signature=signature,
                    docstring=ast.get_docstring(node)
                ))

            elif isinstance(node, ast.Import):
                for alias in node.names:
                    analysis.imports.append(alias.name)
                    analysis.dependencies.add(alias.name.split(".")[0])

            elif isinstance(node, ast.ImportFrom):
                if node.module:
                    analysis.imports.append(node.module)
                    analysis.dependencies.add(node.module.split(".")[0])

        # Calculate complexity (simplified McCabe)
        analysis.complexity_score = self._calculate_complexity(tree)

        return analysis

    def _analyze_javascript(self, content: str, analysis: FileAnalysis) -> FileAnalysis:
        """Analyze JavaScript/TypeScript source code (regex-based)."""
        # Function declarations
        for match in re.finditer(
            r'(?:export\s+)?(?:async\s+)?function\s+(\w+)\s*\((.*?)\)',
            content
        ):
            analysis.symbols.append(Symbol(
                name=match.group(1),
                symbol_type="function",
                file_path=analysis.path,
                line_number=content[:match.start()].count('\n') + 1,
                signature=f"function {match.group(1)}({match.group(2)})"
            ))

        # Arrow functions assigned to const
        for match in re.finditer(
            r'(?:export\s+)?const\s+(\w+)\s*=\s*(?:async\s+)?\((.*?)\)\s*=>',
            content
        ):
            analysis.symbols.append(Symbol(
                name=match.group(1),
                symbol_type="function",
                file_path=analysis.path,
                line_number=content[:match.start()].count('\n') + 1,
                signature=f"const {match.group(1)} = ({match.group(2)}) =>"
            ))

        # Class declarations
        for match in re.finditer(
            r'(?:export\s+)?class\s+(\w+)(?:\s+extends\s+(\w+))?',
            content
        ):
            base = match.group(2) or ""
            analysis.symbols.append(Symbol(
                name=match.group(1),
                symbol_type="class",
                file_path=analysis.path,
                line_number=content[:match.start()].count('\n') + 1,
                signature=f"class {match.group(1)}" + (f" extends {base}" if base else "")
            ))

        # Imports
        for match in re.finditer(r"import\s+.*?from\s+['\"](.+?)['\"]", content):
            analysis.imports.append(match.group(1))

        return analysis

    def _get_name(self, node) -> str:
        """Get the name from an AST node."""
        if isinstance(node, ast.Name):
            return node.id
        elif isinstance(node, ast.Attribute):
            return f"{self._get_name(node.value)}.{node.attr}"
        return "unknown"

    def _calculate_complexity(self, tree: ast.AST) -> int:
        """Calculate simplified cyclomatic complexity."""
        complexity = 1
        for node in ast.walk(tree):
            if isinstance(node, (ast.If, ast.While, ast.For, ast.ExceptHandler)):
                complexity += 1
            elif isinstance(node, ast.BoolOp):
                complexity += len(node.values) - 1
        return complexity

    def _build_dependency_graph(self):
        """Build a graph of file dependencies."""
        for file_path, analysis in self.file_index.items():
            self.dependency_graph[file_path] = set()

            for imp in analysis.imports:
                # Try to resolve import to a file
                resolved = self._resolve_import(imp, file_path)
                if resolved:
                    self.dependency_graph[file_path].add(resolved)

    def _resolve_import(self, import_path: str, from_file: str) -> Optional[str]:
        """Resolve a relative import to an indexed file path."""
        # Only relative imports are resolved; package imports return None
        if import_path.startswith("."):
            base_dir = Path(from_file).parent
            if "/" in import_path:
                # JavaScript/TypeScript style: "./utils" or "../lib/helpers".
                # Splitting on "." here would yield "/utils", which pathlib
                # treats as an absolute path, so join and normalize instead.
                base = Path(os.path.normpath(base_dir / import_path))
            else:
                # Dotted style: ".utils.helpers"
                base = base_dir
                for part in import_path.split(".")[1:]:  # Skip leading dot
                    if part:
                        base = base / part

            for ext in self.LANGUAGE_EXTENSIONS:
                candidate = str(base) + ext
                if candidate in self.file_index:
                    return candidate

        return None

    def _iter_source_files(self):
        """Iterate over all source files in the workspace."""
        ignore_dirs = {".git", "node_modules", "__pycache__", ".venv", "venv"}

        for root, dirs, files in os.walk(self.workspace):
            # Skip ignored directories (in-place edit prunes the walk)
            dirs[:] = [d for d in dirs if d not in ignore_dirs]

            for file in files:
                path = Path(root) / file
                if path.suffix in self.LANGUAGE_EXTENSIONS:
                    yield path

    def find_symbol(self, name: str) -> List[Symbol]:
        """Find all occurrences of a symbol."""
        return self.symbol_index.get(name, [])

    def get_file_context(self, file_path: str, radius: int = 2) -> Dict[str, FileAnalysis]:
        """Get a file and its nearby dependencies."""
        context = {}
        visited = set()

        def collect(path: str, depth: int):
            if depth > radius or path in visited:
                return
            visited.add(path)

            if path in self.file_index:
                context[path] = self.file_index[path]

            # Collect dependencies
            for dep in self.dependency_graph.get(path, []):
                collect(dep, depth + 1)

        collect(file_path, 0)
        return context

    def search_code(self, query: str, limit: int = 10) -> List[Dict[str, Any]]:
        """Search for code matching a query."""
        results = []

        for file_path, analysis in self.file_index.items():
            # Search in symbol names
            for symbol in analysis.symbols:
                if query.lower() in symbol.name.lower():
                    results.append({
                        "type": "symbol",
                        "symbol": symbol,
                        "file": file_path,
                        "score": 1.0 if query.lower() == symbol.name.lower() else 0.5
                    })

            # Search in file content
            try:
                content = Path(file_path).read_text()
                if query.lower() in content.lower():
                    # Find the matching lines
                    for i, line in enumerate(content.split("\n")):
                        if query.lower() in line.lower():
                            results.append({
                                "type": "content",
                                "file": file_path,
                                "line": i + 1,
                                "content": line.strip(),
                                "score": 0.3
                            })
            except (OSError, UnicodeDecodeError):
                # Skip files that vanished or cannot be decoded as text
                pass

        # Sort by score and limit
        results.sort(key=lambda x: x["score"], reverse=True)
        return results[:limit]

The Action Execution Layer
The action layer is responsible for actually making changes to the codebase. It must handle file operations, command execution, and state management safely.
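One safety concern here is keeping file operations inside the workspace. A minimal sketch of a containment check, assuming Python 3.9+ for `Path.is_relative_to`; the `is_inside_workspace` helper is a hypothetical name used only for illustration:

```python
import tempfile
from pathlib import Path


def is_inside_workspace(workspace: Path, relative_path: str) -> bool:
    """True if the resolved target stays within the resolved workspace root."""
    # resolve() collapses ".." segments and follows symlinks before comparing
    target = (workspace / relative_path).resolve()
    return target.is_relative_to(workspace.resolve())  # Python 3.9+


with tempfile.TemporaryDirectory() as tmp:
    workspace = Path(tmp)
    inside = is_inside_workspace(workspace, "src/app.py")
    escaped = is_inside_workspace(workspace, "../outside.txt")
    # On POSIX, joining an absolute path discards the workspace prefix entirely
    absolute = is_inside_workspace(workspace, "/etc/passwd")
    print(inside, escaped, absolute)
```

Resolving both sides before comparing is the important part: a plain string-prefix test would accept a path such as `src/../../outside.txt`.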
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import Any, Dict, Optional, List
from pathlib import Path
import subprocess
import asyncio
import json


@dataclass
class ActionResult:
    """Result of executing an action."""
    success: bool
    output: Any
    error: Optional[str] = None
    # default_factory gives each result its own list instead of a shared None
    changes: List[Dict[str, Any]] = field(default_factory=list)


class Action(ABC):
    """Base class for all agent actions."""

    @property
    @abstractmethod
    def name(self) -> str:
        """Unique name for this action."""
        pass

    @property
    @abstractmethod
    def description(self) -> str:
        """Description for LLM context."""
        pass

    @property
    def schema(self) -> Dict[str, Any]:
        """JSON schema for action parameters."""
        return {}

    @abstractmethod
    async def execute(self, **params) -> ActionResult:
        """Execute the action with given parameters."""
        pass

    def validate_params(self, params: Dict[str, Any]) -> bool:
        """Validate parameters against schema."""
        # Basic validation - extend with jsonschema in production
        required = self.schema.get("required", [])
        return all(key in params for key in required)


class ReadFileAction(Action):
    """Read contents of a file."""

    def __init__(self, workspace: Path):
        self.workspace = workspace

    @property
    def name(self) -> str:
        return "read_file"

    @property
    def description(self) -> str:
        return "Read the contents of a file at the specified path"

    @property
    def schema(self) -> Dict[str, Any]:
        return {
            "type": "object",
            "properties": {
                "path": {
                    "type": "string",
                    "description": "Path to the file relative to workspace"
                },
                "start_line": {
                    "type": "integer",
                    "description": "Start reading from this line (1-indexed)"
                },
                "end_line": {
                    "type": "integer",
                    "description": "Stop reading at this line"
                }
            },
            "required": ["path"]
        }

    async def execute(
        self,
        path: str,
        start_line: Optional[int] = None,
        end_line: Optional[int] = None
    ) -> ActionResult:
        try:
            file_path = self.workspace / path

            # Security check - prevent path traversal
            if not file_path.resolve().is_relative_to(self.workspace.resolve()):
                return ActionResult(
                    success=False,
                    output=None,
                    error="Path traversal not allowed"
                )

            if not file_path.exists():
                return ActionResult(
                    success=False,
                    output=None,
                    error=f"File not found: {path}"
                )

            content = file_path.read_text()

            # Handle line range (start_line is 1-indexed, end_line is inclusive)
            if start_line or end_line:
                lines = content.split("\n")
                start = (start_line - 1) if start_line else 0
                end = end_line if end_line else len(lines)
                content = "\n".join(lines[start:end])

            return ActionResult(
                success=True,
                output=content
            )

        except Exception as e:
            return ActionResult(
                success=False,
                output=None,
                error=str(e)
            )


class WriteFileAction(Action):
    """Write or create a file."""

    def __init__(self, workspace: Path):
        self.workspace = workspace

    @property
    def name(self) -> str:
        return "write_file"

    @property
    def description(self) -> str:
        return "Write content to a file, creating it if it doesn't exist"

    @property
    def schema(self) -> Dict[str, Any]:
        return {
            "type": "object",
            "properties": {
                "path": {
                    "type": "string",
                    "description": "Path to the file relative to workspace"
                },
                "content": {
                    "type": "string",
                    "description": "Content to write to the file"
                }
            },
            "required": ["path", "content"]
        }

    async def execute(self, path: str, content: str) -> ActionResult:
        try:
            file_path = self.workspace / path

            # Security check - prevent path traversal
            if not file_path.resolve().is_relative_to(self.workspace.resolve()):
                return ActionResult(
                    success=False,
                    output=None,
                    error="Path traversal not allowed"
                )

            # Store original for rollback (None means the file is new)
            original = None
            if file_path.exists():
                original = file_path.read_text()

            # Create parent directories
            file_path.parent.mkdir(parents=True, exist_ok=True)

            # Write the file
            file_path.write_text(content)

            return ActionResult(
                success=True,
                # len() of a str counts characters, not encoded bytes
                output=f"Wrote {len(content)} characters to {path}",
                changes=[{
                    "type": "write",
                    "path": path,
                    "original": original,
                    "new": content
                }]
            )

        except Exception as e:
            return ActionResult(
                success=False,
                output=None,
                error=str(e)
            )


class EditFileAction(Action):
    """Edit a specific portion of a file."""

    def __init__(self, workspace: Path):
        self.workspace = workspace

    @property
    def name(self) -> str:
        return "edit_file"

    @property
    def description(self) -> str:
        return "Replace a specific section of a file with new content"

    @property
    def schema(self) -> Dict[str, Any]:
        return {
            "type": "object",
            "properties": {
                "path": {
                    "type": "string",
                    "description": "Path to the file"
                },
                "old_content": {
                    "type": "string",
                    "description": "Exact content to replace"
                },
                "new_content": {
                    "type": "string",
                    "description": "New content to insert"
                }
            },
            "required": ["path", "old_content", "new_content"]
        }

    async def execute(
        self,
        path: str,
        old_content: str,
        new_content: str
    ) -> ActionResult:
        try:
            file_path = self.workspace / path

            # Security check - same traversal guard as the read and write actions
            if not file_path.resolve().is_relative_to(self.workspace.resolve()):
                return ActionResult(
                    success=False,
                    output=None,
                    error="Path traversal not allowed"
                )

            if not file_path.exists():
                return ActionResult(
                    success=False,
                    output=None,
                    error=f"File not found: {path}"
                )

            content = file_path.read_text()

            # Check if old_content exists
            if old_content not in content:
                return ActionResult(
                    success=False,
                    output=None,
                    error="Old content not found in file"
                )

            # Replace only the first occurrence
            new_file_content = content.replace(old_content, new_content, 1)
            file_path.write_text(new_file_content)

            return ActionResult(
                success=True,
                output=f"Edited {path}",
                changes=[{
                    "type": "edit",
                    "path": path,
                    "old": old_content,
                    "new": new_content
                }]
            )

        except Exception as e:
            return ActionResult(
                success=False,
                output=None,
                error=str(e)
            )


class RunCommandAction(Action):
    """Execute an allow-listed command without going through a shell."""

    def __init__(self, workspace: Path, allowed_commands: List[str] = None):
        self.workspace = workspace
        self.allowed_commands = allowed_commands or [
            "npm", "npx", "python", "pytest", "pip",
            "git", "ls", "cat", "grep", "find"
        ]

    @property
    def name(self) -> str:
        return "run_command"

    @property
    def description(self) -> str:
        return f"Execute a command. Allowed: {', '.join(self.allowed_commands)}"

    @property
    def schema(self) -> Dict[str, Any]:
        return {
            "type": "object",
            "properties": {
                "command": {
                    "type": "string",
                    "description": "The command to execute"
                },
                "timeout": {
                    "type": "integer",
                    "description": "Timeout in seconds (default 30)"
                }
            },
            "required": ["command"]
        }

    async def execute(self, command: str, timeout: int = 30) -> ActionResult:
        import shlex  # Local import so this class does not depend on other edits

        try:
            # Security check - tokenize the command ourselves. Checking only the
            # first word and then handing the string to a shell would let
            # "git status; rm -rf ." slip past the allow-list.
            cmd_parts = shlex.split(command)
            if not cmd_parts:
                return ActionResult(
                    success=False,
                    output=None,
                    error="Empty command"
                )

            base_command = cmd_parts[0]
            if base_command not in self.allowed_commands:
                return ActionResult(
                    success=False,
                    output=None,
                    error=f"Command not allowed: {base_command}"
                )

            # Run command directly (no shell), so ";", "&&" and "|" are
            # passed as literal arguments rather than interpreted
            process = await asyncio.create_subprocess_exec(
                *cmd_parts,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE,
                cwd=self.workspace
            )

            try:
                stdout, stderr = await asyncio.wait_for(
                    process.communicate(),
                    timeout=timeout
                )
            except asyncio.TimeoutError:
                process.kill()
                await process.wait()  # Reap the killed process
                return ActionResult(
                    success=False,
                    output=None,
                    error=f"Command timed out after {timeout}s"
                )

            output = stdout.decode() if stdout else ""
            error = stderr.decode() if stderr else ""

            return ActionResult(
                success=process.returncode == 0,
                output=output,
                error=error if process.returncode != 0 else None
            )

        except Exception as e:
            return ActionResult(
                success=False,
                output=None,
                error=str(e)
            )


class ActionExecutor:
    """Manages and executes agent actions."""

    def __init__(self, workspace: Path):
        self.workspace = workspace
        self.actions: Dict[str, Action] = {}
        self.history: List[Dict[str, Any]] = []

        # Register default actions
        self.register(ReadFileAction(workspace))
        self.register(WriteFileAction(workspace))
        self.register(EditFileAction(workspace))
        self.register(RunCommandAction(workspace))

    def register(self, action: Action):
        """Register an action."""
        self.actions[action.name] = action

    def get_action_descriptions(self) -> str:
        """Get descriptions of all actions for LLM context."""
        descriptions = []
        for action in self.actions.values():
            desc = f"- {action.name}: {action.description}"
            if action.schema.get("properties"):
                params = list(action.schema["properties"].keys())
                desc += f" (params: {', '.join(params)})"
            descriptions.append(desc)
        return "\n".join(descriptions)

    async def execute(
        self,
        action_name: str,
        params: Dict[str, Any]
    ) -> ActionResult:
        """Execute an action by name."""
        if action_name not in self.actions:
            return ActionResult(
                success=False,
                output=None,
                error=f"Unknown action: {action_name}"
            )

        action = self.actions[action_name]

        # Validate parameters
        if not action.validate_params(params):
            return ActionResult(
                success=False,
                output=None,
                error=f"Invalid parameters for {action_name}"
            )

        # Execute and record
        result = await action.execute(**params)

        self.history.append({
            "action": action_name,
            "params": params,
            "result": {
                "success": result.success,
                "output": str(result.output)[:500] if result.output else None,
                "error": result.error,
                # Keep the change records; get_undo_actions() relies on them
                "changes": result.changes or []
            }
        })

        return result

    def get_undo_actions(self) -> List[Dict[str, Any]]:
        """Get actions needed to undo recent changes."""
        undo_actions = []

        for entry in reversed(self.history):
            result = entry.get("result", {})
            changes = result.get("changes") or []
            # Only WriteFile and EditFile can be undone
            if entry["action"] == "write_file":
                for change in changes:
                    # Newly created files have no original content to restore
                    if change.get("original") is not None:
                        undo_actions.append({
                            "action": "write_file",
                            "params": {
                                "path": change["path"],
                                "content": change["original"]
                            }
                        })
            elif entry["action"] == "edit_file":
                for change in changes:
                    # Reverse the edit by swapping old and new content
                    undo_actions.append({
                        "action": "edit_file",
                        "params": {
                            "path": change["path"],
                            "old_content": change["new"],
                            "new_content": change["old"]
                        }
                    })

        return undo_actions

Designing the Agent Loop
The agent loop ties all components together, orchestrating the flow from user request to completed task. A well-designed loop handles errors gracefully and knows when to ask for help.
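Stripped of prompts and tooling, the loop is a small state machine with three exits: the task completes, the agent stalls on repeated failures, or the step budget runs out. A minimal sketch under those assumptions; `run_loop` and the scripted stand-ins are hypothetical and replace the LLM and executor with plain functions:

```python
from typing import Callable, Dict, List, Tuple

Decision = Tuple[str, Dict[str, str]]  # (action name, parameters)


def run_loop(
    think: Callable[[List[str]], Decision],
    act: Callable[[str, Dict[str, str]], Tuple[bool, str]],
    max_iterations: int = 10,
    max_consecutive_failures: int = 3,
) -> Tuple[str, List[str]]:
    """Alternate think/act/observe until completion, a stall, or the budget ends."""
    observations: List[str] = []
    failures = 0
    for _ in range(max_iterations):
        action, params = think(observations)        # Think
        if action == "complete":
            return "complete", observations
        success, output = act(action, params)       # Act
        observations.append(f"{action}: {output}")  # Observe
        failures = 0 if success else failures + 1
        if failures >= max_consecutive_failures:
            return "stuck", observations
    return "max_iterations", observations


# Scripted stand-in for the LLM: read one file, then declare the task done.
def scripted_think(observations: List[str]) -> Decision:
    if not observations:
        return "read_file", {"path": "README.md"}
    return "complete", {}


def fake_act(action: str, params: Dict[str, str]) -> Tuple[bool, str]:
    return True, f"ok ({params['path']})"


# A "model" that keeps retrying an action that never succeeds.
def always_retry_think(observations: List[str]) -> Decision:
    return "run_command", {"path": "n/a"}


status, log = run_loop(scripted_think, fake_act)
stuck_status, stuck_log = run_loop(always_retry_think, lambda a, p: (False, "boom"))
print(status, stuck_status)
```

Resetting the failure counter on any success means only an unbroken run of failures counts as being stuck, which tolerates the occasional failed step during normal exploration.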
import asyncio
from typing import AsyncGenerator, Dict, Any, List
from dataclasses import dataclass
from enum import Enum


class LoopState(Enum):
    THINKING = "thinking"
    ACTING = "acting"
    OBSERVING = "observing"
    COMPLETE = "complete"
    STUCK = "stuck"


@dataclass
class LoopStep:
    """A single step in the agent loop."""
    thought: str
    action: str
    action_params: Dict[str, Any]
    observation: str
    success: bool


class CodingAgentLoop:
    """
    The main agent loop implementing think-act-observe.
    """

    MAX_ITERATIONS = 20
    MAX_CONSECUTIVE_FAILURES = 3

    def __init__(
        self,
        llm_client,
        action_executor,
        code_understanding,
        memory
    ):
        self.llm = llm_client
        self.executor = action_executor
        self.code_index = code_understanding
        self.memory = memory

        self.state = LoopState.THINKING
        self.steps: List[LoopStep] = []
        self.consecutive_failures = 0

    async def run(
        self,
        task: str,
        context: Dict[str, Any] = None
    ) -> AsyncGenerator[Dict[str, Any], None]:
        """
        Run the agent loop, yielding updates at each step.
        """
        iteration = 0

        while iteration < self.MAX_ITERATIONS:
            iteration += 1

            # Phase 1: Think
            self.state = LoopState.THINKING
            yield {"state": "thinking", "iteration": iteration}

            thought, action, params = await self._think(task, context)

            # Check for completion
            if action == "complete":
                self.state = LoopState.COMPLETE
                yield {
                    "state": "complete",
                    "thought": thought,
                    "summary": params.get("summary", "Task completed")
                }
                return

            # Check if stuck
            if action == "ask_user":
                self.state = LoopState.STUCK
                yield {
                    "state": "stuck",
                    "thought": thought,
                    "question": params.get("question")
                }
                return

            # Phase 2: Act
            self.state = LoopState.ACTING
            yield {
                "state": "acting",
                "thought": thought,
                "action": action,
                "params": params
            }

            result = await self.executor.execute(action, params)

            # Phase 3: Observe
            self.state = LoopState.OBSERVING

            observation = self._format_observation(result)

            step = LoopStep(
                thought=thought,
                action=action,
                action_params=params,
                observation=observation,
                success=result.success
            )
            self.steps.append(step)

            yield {
                "state": "observing",
                "observation": observation,
                "success": result.success
            }

            # Track failures
            if not result.success:
                self.consecutive_failures += 1
                if self.consecutive_failures >= self.MAX_CONSECUTIVE_FAILURES:
                    self.state = LoopState.STUCK
                    yield {
                        "state": "stuck",
                        "reason": "Too many consecutive failures",
                        "last_error": result.error
                    }
                    return
            else:
                self.consecutive_failures = 0

            # Update memory
            self.memory.add_action(action, result.output, result.success)

        # Max iterations reached
        yield {
            "state": "max_iterations",
            "iterations": iteration
        }

142 async def _think(
143 self,
144 task: str,
145 context: Dict[str, Any] = None
146 ) -> tuple[str, str, Dict[str, Any]]:
147 """
148 Decide what to do next.
149 """
150 # Build prompt with history
151 history_text = self._format_history()
152 actions_text = self.executor.get_action_descriptions()
153
154 prompt = f"""You are a coding agent working on a task.
155
156TASK: {task}
157
158AVAILABLE ACTIONS:
159{actions_text}
160- complete: Mark the task as done (params: summary)
161- ask_user: Ask the user for clarification (params: question)
162
163PREVIOUS STEPS:
164{history_text}
165
166CODEBASE CONTEXT:
167{context.get('summary', 'No context gathered yet') if context else 'No context'}
168
169Based on the task and previous steps, decide what to do next.
170
171Respond in this JSON format:
172{{
173 "thought": "Your reasoning about what to do next",
174 "action": "The action to take",
175 "params": {{ ... action parameters ... }}
176}}
177
178Think step by step. If the task is complete, use the "complete" action.
179If you need more information from the user, use "ask_user"."""
180
181 response = await self.llm.generate(prompt)
182
183 # Parse response
184 return self._parse_response(response)
185
186 def _parse_response(
187 self,
188 response: str
189 ) -> tuple[str, str, Dict[str, Any]]:
190 """Parse the LLM response into thought, action, and params."""
191 import json
192
193 try:
194 # Find JSON in response
195 start = response.find("{")
196 end = response.rfind("}") + 1
197 data = json.loads(response[start:end])
198
199 return (
200 data.get("thought", ""),
201 data.get("action", ""),
202 data.get("params", {})
203 )
204 except:
205 # Fallback
206 return ("Failed to parse response", "ask_user", {
207 "question": "I encountered an error. Could you clarify the task?"
208 })
209
210 def _format_history(self) -> str:
211 """Format step history for the prompt."""
212 if not self.steps:
213 return "No previous steps"
214
215 lines = []
216 for i, step in enumerate(self.steps[-5:]): # Last 5 steps
217 status = "✓" if step.success else "✗"
218 lines.append(f"{i+1}. [{status}] {step.action}")
219 lines.append(f" Thought: {step.thought[:100]}...")
220 lines.append(f" Result: {step.observation[:100]}...")
221
222 return "\n".join(lines)
223
224 def _format_observation(self, result) -> str:
225 """Format action result as observation."""
226 if result.success:
227 output = str(result.output)
228 if len(output) > 500:
229 output = output[:500] + "... (truncated)"
230 return f"Success: {output}"
231 else:
232 return f"Error: {result.error}"
233
234 def get_summary(self) -> Dict[str, Any]:
235 """Get a summary of the loop execution."""
236 return {
237 "total_steps": len(self.steps),
238 "successful_steps": sum(1 for s in self.steps if s.success),
239 "failed_steps": sum(1 for s in self.steps if not s.success),
240 "final_state": self.state.value,
241 "actions_used": list(set(s.action for s in self.steps))
242 }Architecture Patterns
Different coding tasks benefit from different architectural patterns. Here are the most common approaches:
Pattern 1: Simple ReAct
Best for straightforward tasks like simple bug fixes or adding a single function.
class SimpleReActCodingAgent:
    """
    Simple ReAct pattern for coding tasks.
    Think -> Act -> Observe -> Repeat
    """

    async def run(self, task: str):
        # is_complete(), think(), parse_action(), execute(), and
        # update_state() are left for a concrete agent to implement
        while not self.is_complete():
            # Single step: think and act
            thought = await self.think(task)
            action, params = self.parse_action(thought)
            result = await self.execute(action, params)
            self.update_state(result)
Pattern 2: Plan-then-Execute
Better for larger features that require coordinated changes across multiple files.
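class PlanExecuteCodingAgent:
    """
    Two-phase pattern: plan first, then execute.
    """

    MAX_REPLANS = 3  # Illustrative cap so repeated failures cannot loop forever

    async def run(self, task: str):
        # Phase 1: Create detailed plan
        pending = list(await self.create_plan(task))
        replans = 0

        # Phase 2: Execute each step
        while pending:
            step = pending.pop(0)
            result = await self.execute_step(step)

            if not result.success:
                if replans >= self.MAX_REPLANS:
                    raise RuntimeError(f"Gave up after {replans} replans: {result.error}")
                # Replan from current state. Rebinding the plan inside a
                # `for step in plan` loop would keep iterating the old list,
                # so the pending queue is replaced explicitly.
                pending = list(await self.replan(task, step, result.error))
                replans += 1
Pattern 3: Test-Driven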
Ideal for bug fixes and refactoring where correctness is paramount.
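class TestDrivenCodingAgent:
    """
    Test-driven approach: write/run tests first.
    """

    MAX_FIX_ATTEMPTS = 5  # Illustrative cap on fix-and-retest cycles

    async def run(self, task: str):
        # Step 1: Understand the expected behavior
        tests = await self.identify_tests(task)

        # Step 2: Run existing tests to record a baseline, so pre-existing
        # failures can be told apart from regressions
        self.baseline = await self.run_tests(tests)

        # Step 3: Make changes
        await self.implement(task)

        # Step 4: Verify tests pass, fixing errors until they do
        for _ in range(self.MAX_FIX_ATTEMPTS):
            if await self.tests_pass(tests):
                return True
            errors = await self.get_test_errors()
            await self.fix_errors(errors)

        # Report whether the last fix attempt succeeded
        return await self.tests_pass(tests)
Choosing the Right Pattern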
| Task Type | Recommended Pattern | Reason |
|---|---|---|
| Simple bug fix | Simple ReAct | Quick iteration, minimal planning needed |
| New feature | Plan-then-Execute | Requires coordinated multi-file changes |
| Refactoring | Test-Driven | Need to preserve existing behavior |
| API integration | Plan-then-Execute | Multiple steps with dependencies |
| Documentation | Simple ReAct | Low complexity, quick feedback |
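The table above can be encoded as a simple lookup, which is one way an agent might pick a pattern once a task has been classified. This is a minimal sketch: the `Pattern` enum, the task-type keys, and the `choose_pattern` helper are illustrative names introduced here, and falling back to Simple ReAct for unknown task types is an assumption, not a rule from the table.

```python
from enum import Enum


class Pattern(Enum):
    SIMPLE_REACT = "Simple ReAct"
    PLAN_THEN_EXECUTE = "Plan-then-Execute"
    TEST_DRIVEN = "Test-Driven"


# Mirrors the recommendation table; the keys are illustrative labels.
PATTERN_BY_TASK = {
    "simple_bug_fix": Pattern.SIMPLE_REACT,
    "new_feature": Pattern.PLAN_THEN_EXECUTE,
    "refactoring": Pattern.TEST_DRIVEN,
    "api_integration": Pattern.PLAN_THEN_EXECUTE,
    "documentation": Pattern.SIMPLE_REACT,
}


def choose_pattern(
    task_type: str,
    default: Pattern = Pattern.SIMPLE_REACT,
) -> Pattern:
    """Return the recommended pattern for a task type.

    The input is normalized ("API integration" -> "api_integration").
    Unknown task types fall back to `default` (an assumption of this sketch).
    """
    key = task_type.strip().lower().replace("-", "_").replace(" ", "_")
    return PATTERN_BY_TASK.get(key, default)


if __name__ == "__main__":
    for label in ("Simple bug fix", "New feature", "Refactoring"):
        print(f"{label}: {choose_pattern(label).value}")
```

In practice the task type would itself come from a classification step, for example an LLM call or a user-supplied flag; that step is outside the scope of this sketch.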
Summary
In this section, we've established the architectural foundation for our coding agent. Key concepts include:
- Layered Architecture: Separating context, planning, execution, and verification into distinct components
- Code Understanding: Building an index of the codebase for efficient navigation and search
- Action System: A pluggable system of actions that can be validated and executed safely
- Agent Loop: The core think-act-observe cycle that drives the agent forward
- Architecture Patterns: Different approaches for different types of coding tasks
In the next section, we'll implement the file system tools that allow our agent to navigate and manipulate the codebase.