Introduction
File system tools are the foundation of any coding agent. The ability to read, write, search, and navigate the codebase determines how effectively the agent can understand and modify code. In this section, we'll build a comprehensive toolkit for file system operations optimized for coding tasks.
Why This Matters: A coding agent's effectiveness is directly proportional to how well it can navigate and understand the codebase. Poor file system tools lead to incorrect context gathering, missed dependencies, and broken code changes.
File Operations Overview
Coding agents need several categories of file operations, each with specific requirements:
| Category | Operations | Key Requirements |
|---|---|---|
| Reading | Read file, read lines, read with context | Efficient for large files, line-aware |
| Writing | Create, overwrite, append | Atomic writes, backup capability |
| Editing | Replace text, insert, delete | Precise targeting, rollback support |
| Navigation | List files, tree view, glob patterns | Ignore patterns (.git, node_modules) |
| Search | Text search, regex, symbol search | Fast, context-aware results |
The Tool Interface
All file system tools follow a consistent interface that makes them easy for the LLM to understand and use:
1from abc import ABC, abstractmethod
2from dataclasses import dataclass
3from typing import Any, Dict, Optional, List
4from pathlib import Path
5import json
6
7
@dataclass
class ToolResult:
    """Standardized result from any tool execution.

    Attributes:
        success: Whether the tool ran without error.
        data: Tool payload; a string is passed through verbatim to the agent,
            anything else is JSON-encoded by ``to_observation``.
        error: Human-readable failure description when ``success`` is False.
        metadata: Optional extra details (sizes, counts, diffs) for logging.
    """
    success: bool
    data: Any
    error: Optional[str] = None
    # Fixed annotation: the default is None, so the type must be Optional.
    metadata: Optional[Dict[str, Any]] = None

    def to_observation(self) -> str:
        """Convert this result to a text observation for the agent."""
        if not self.success:
            return f"Error: {self.error}"
        if isinstance(self.data, str):
            return self.data
        return json.dumps(self.data, indent=2)
23
24
class FileSystemTool(ABC):
    """Base class for all file system tools.

    Subclasses expose a name, description, and JSON-Schema parameter spec for
    the LLM, plus an async ``run``. Shared helpers enforce workspace
    containment and ignore-pattern filtering.
    """

    def __init__(self, workspace: Path, config: Optional[Dict[str, Any]] = None):
        # Resolve once so later containment checks compare canonical paths.
        self.workspace = Path(workspace).resolve()
        self.config = config or {}

    @property
    @abstractmethod
    def name(self) -> str:
        """Tool name for the agent to reference."""

    @property
    @abstractmethod
    def description(self) -> str:
        """Description for the LLM to understand usage."""

    @property
    @abstractmethod
    def parameters_schema(self) -> Dict[str, Any]:
        """JSON Schema for tool parameters."""

    @abstractmethod
    async def run(self, **params) -> "ToolResult":
        """Execute the tool with given parameters."""

    def _validate_path(self, path: str) -> tuple[bool, Optional[Path]]:
        """
        Validate and resolve a path, ensuring it's within workspace.
        Returns (is_valid, resolved_path).
        """
        try:
            # Handle relative and absolute paths
            if Path(path).is_absolute():
                resolved = Path(path).resolve()
            else:
                resolved = (self.workspace / path).resolve()

            # Security check: must be within workspace. A plain string-prefix
            # test is unsafe here (workspace "/ws" would admit "/ws2/..."),
            # so compare whole path components instead.
            if not resolved.is_relative_to(self.workspace):
                return False, None

            return True, resolved
        except Exception:
            # Any resolution failure (bad characters, OS errors) is rejected.
            return False, None

    def _should_ignore(self, path: Path) -> bool:
        """Check if a path should be ignored (VCS dirs, deps, hidden files)."""
        ignore_patterns = self.config.get("ignore_patterns", [
            ".git", "node_modules", "__pycache__", ".venv",
            "venv", ".env", ".idea", ".vscode", "dist", "build"
        ])

        for part in path.parts:
            if part in ignore_patterns:
                return True
            if part.startswith(".") and part not in [".", ".."]:
                # Skip hidden files unless explicitly configured
                if not self.config.get("include_hidden", False):
                    return True

        return False
Reading Files Intelligently
Reading files seems simple, but coding agents need intelligent reading capabilities that handle large files, binary detection, and context-aware retrieval.
1import mimetypes
2from typing import Optional, List, Tuple
3
4
class ReadFileTool(FileSystemTool):
    """Read file contents with intelligent handling.

    Handles binary detection, encoding fallback, large-file truncation, and
    optional line-range extraction with surrounding context.
    """

    MAX_FILE_SIZE = 1024 * 1024  # 1MB default limit
    BINARY_CHECK_BYTES = 8192  # how much of the file to sniff for binary data

    @property
    def name(self) -> str:
        return "read_file"

    @property
    def description(self) -> str:
        return """Read the contents of a file. Supports:
- Reading entire file or specific line ranges
- Automatic encoding detection
- Binary file detection
- Large file handling with truncation warnings"""

    @property
    def parameters_schema(self) -> Dict[str, Any]:
        return {
            "type": "object",
            "properties": {
                "path": {
                    "type": "string",
                    "description": "Path to the file (relative to workspace)"
                },
                "start_line": {
                    "type": "integer",
                    "description": "Start reading from this line (1-indexed, inclusive)"
                },
                "end_line": {
                    "type": "integer",
                    "description": "Stop reading at this line (inclusive)"
                },
                "context_lines": {
                    "type": "integer",
                    "description": "Number of context lines around the range"
                }
            },
            "required": ["path"]
        }

    async def run(
        self,
        path: str,
        start_line: Optional[int] = None,
        end_line: Optional[int] = None,
        context_lines: int = 0
    ) -> "ToolResult":
        """Read *path*, optionally restricted to a line range with context."""
        # Validate path
        valid, resolved_path = self._validate_path(path)
        if not valid:
            return ToolResult(
                success=False,
                data=None,
                error=f"Invalid path: {path}"
            )

        if not resolved_path.exists():
            return ToolResult(
                success=False,
                data=None,
                error=f"File not found: {path}"
            )

        if resolved_path.is_dir():
            return ToolResult(
                success=False,
                data=None,
                error=f"Path is a directory, not a file: {path}"
            )

        # Check file size
        file_size = resolved_path.stat().st_size

        # Detect binary files: report size instead of dumping raw bytes.
        if self._is_binary(resolved_path):
            return ToolResult(
                success=True,
                data=f"[Binary file: {file_size} bytes]",
                metadata={"is_binary": True, "size": file_size}
            )

        try:
            # Read file with encoding detection
            content = self._read_with_encoding(resolved_path)

            # Handle line ranges
            if start_line is not None or end_line is not None:
                content, metadata = self._extract_lines(
                    content,
                    start_line,
                    end_line,
                    context_lines
                )
            else:
                # Count lines BEFORE truncation so metadata reflects the
                # real file, not the truncated excerpt.
                total_lines = content.count("\n") + 1

                # Check if file is too large
                if file_size > self.MAX_FILE_SIZE:
                    lines = content.split("\n")
                    content = "\n".join(lines[:1000])
                    content += f"\n\n... [Truncated: {len(lines)} total lines]"

                metadata = {
                    "total_lines": total_lines,
                    "size": file_size
                }

            return ToolResult(
                success=True,
                data=content,
                metadata=metadata
            )

        except Exception as e:
            return ToolResult(
                success=False,
                data=None,
                error=f"Failed to read file: {str(e)}"
            )

    def _is_binary(self, path: Path) -> bool:
        """Check if a file is binary (mimetype first, then content sniff)."""
        # Check by mimetype first
        mime, _ = mimetypes.guess_type(str(path))
        if mime:
            if mime.startswith("text/"):
                return False
            if mime in ["application/json", "application/xml", "application/javascript"]:
                return False

        # Check by content
        try:
            with open(path, "rb") as f:
                chunk = f.read(self.BINARY_CHECK_BYTES)
            # Look for null bytes (common in binary files)
            if b"\x00" in chunk:
                return True
            # Check if most bytes are printable (ASCII控制 chars 7-13, 27,
            # plus everything >= 0x20 count as "text").
            text_chars = bytearray({7, 8, 9, 10, 12, 13, 27} | set(range(0x20, 0x100)))
            non_text = sum(1 for b in chunk if b not in text_chars)
            return non_text / len(chunk) > 0.3 if chunk else False
        except OSError:
            # Unreadable files are treated as binary so we never dump them.
            return True

    def _read_with_encoding(self, path: Path) -> str:
        """Read file with automatic encoding detection.

        latin-1 is deliberately NOT in the candidate list: it accepts any
        byte sequence, so listing it before cp1252 would make cp1252
        unreachable. It serves as the final fallback instead.
        """
        encodings = ["utf-8", "utf-16", "cp1252"]

        for encoding in encodings:
            try:
                return path.read_text(encoding=encoding)
            except UnicodeDecodeError:
                continue

        # Fallback: read as latin-1 (accepts any byte sequence)
        return path.read_text(encoding="latin-1")

    def _extract_lines(
        self,
        content: str,
        start: Optional[int],
        end: Optional[int],
        context: int
    ) -> Tuple[str, Dict]:
        """Extract specific lines from content, with context and markers.

        Lines inside the requested range are prefixed with ">", context
        lines with a space; all lines carry 1-indexed line numbers.
        """
        lines = content.split("\n")
        total_lines = len(lines)

        # Default values
        start = max(1, start or 1)
        end = min(total_lines, end or total_lines)

        # Add context
        actual_start = max(1, start - context)
        actual_end = min(total_lines, end + context)

        # Extract lines (convert to 0-indexed)
        extracted = lines[actual_start - 1:actual_end]

        # Add line numbers
        numbered_lines = []
        for i, line in enumerate(extracted, start=actual_start):
            prefix = ">" if start <= i <= end else " "
            numbered_lines.append(f"{prefix} {i:4d} | {line}")

        return "\n".join(numbered_lines), {
            "start_line": actual_start,
            "end_line": actual_end,
            "total_lines": total_lines,
            "requested_range": (start, end)
        }
197
198
199class ReadMultipleFilesTool(FileSystemTool):
200 """Read multiple files in a single operation."""
201
202 @property
203 def name(self) -> str:
204 return "read_multiple_files"
205
206 @property
207 def description(self) -> str:
208 return "Read multiple files at once. Efficient for gathering context from related files."
209
210 @property
211 def parameters_schema(self) -> Dict[str, Any]:
212 return {
213 "type": "object",
214 "properties": {
215 "paths": {
216 "type": "array",
217 "items": {"type": "string"},
218 "description": "List of file paths to read"
219 },
220 "max_lines_per_file": {
221 "type": "integer",
222 "description": "Maximum lines to read from each file"
223 }
224 },
225 "required": ["paths"]
226 }
227
228 async def run(
229 self,
230 paths: List[str],
231 max_lines_per_file: int = 200
232 ) -> ToolResult:
233 results = {}
234 errors = []
235
236 read_tool = ReadFileTool(self.workspace, self.config)
237
238 for path in paths:
239 result = await read_tool.run(path)
240
241 if result.success:
242 content = result.data
243 lines = content.split("\n")
244
245 if len(lines) > max_lines_per_file:
246 content = "\n".join(lines[:max_lines_per_file])
247 content += f"\n... [{len(lines) - max_lines_per_file} more lines]"
248
249 results[path] = content
250 else:
251 errors.append(f"{path}: {result.error}")
252
253 return ToolResult(
254 success=len(errors) == 0,
255 data=results,
256 error="; ".join(errors) if errors else None,
257 metadata={
258 "files_read": len(results),
259 "files_failed": len(errors)
260 }
261 )read_multiple_files tool is particularly useful for coding agents that need to understand how multiple files relate to each other, such as a class and its tests, or a component and its dependencies.Writing and Editing Files
Writing and editing files requires careful handling to avoid data loss and ensure changes can be rolled back if needed.
1import shutil
2from datetime import datetime
3from typing import Optional, List, Dict
4import difflib
5
6
class WriteFileTool(FileSystemTool):
    """Write or create files with safety features.

    Creates parent directories on demand and can keep a timestamped backup
    of any file it overwrites.
    """

    @property
    def name(self) -> str:
        return "write_file"

    @property
    def description(self) -> str:
        return """Write content to a file. Creates parent directories if needed.
Options for backup and overwrite behavior."""

    @property
    def parameters_schema(self) -> Dict[str, Any]:
        return {
            "type": "object",
            "properties": {
                "path": {
                    "type": "string",
                    "description": "Path for the file"
                },
                "content": {
                    "type": "string",
                    "description": "Content to write"
                },
                "create_backup": {
                    "type": "boolean",
                    "description": "Create backup of existing file"
                }
            },
            "required": ["path", "content"]
        }

    async def run(
        self,
        path: str,
        content: str,
        create_backup: bool = True
    ) -> "ToolResult":
        """Write *content* to *path*, optionally backing up an existing file."""
        valid, resolved_path = self._validate_path(path)
        if not valid:
            return ToolResult(
                success=False,
                data=None,
                error=f"Invalid path: {path}"
            )

        try:
            # Create backup if file exists
            backup_path = None
            original_content = None

            if resolved_path.exists():
                original_content = resolved_path.read_text()

                if create_backup:
                    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
                    # Append to the full name instead of with_suffix():
                    # with_suffix would REPLACE the extension, so "app.py"
                    # would back up to "app.<ts>.bak" (losing ".py") and
                    # "a.py"/"a.txt" backups could collide.
                    backup_path = resolved_path.with_name(
                        f"{resolved_path.name}.{timestamp}.bak"
                    )
                    shutil.copy2(resolved_path, backup_path)

            # Create parent directories
            resolved_path.parent.mkdir(parents=True, exist_ok=True)

            # Write file
            resolved_path.write_text(content)

            # Generate diff for logging. Compare against None explicitly so
            # an empty (but existing) original file still produces a diff.
            diff = None
            if original_content is not None:
                diff = self._generate_diff(original_content, content, path)

            return ToolResult(
                success=True,
                data=f"Wrote {len(content)} bytes to {path}",
                metadata={
                    # NOTE: len(content) counts characters, matching the
                    # original tool's reporting convention.
                    "bytes_written": len(content),
                    "backup": str(backup_path) if backup_path else None,
                    "is_new_file": original_content is None,
                    "diff_summary": diff
                }
            )

        except Exception as e:
            return ToolResult(
                success=False,
                data=None,
                error=f"Failed to write file: {str(e)}"
            )

    def _generate_diff(self, old: str, new: str, path: str) -> str:
        """Generate a compact "+N -M lines" summary of the change."""
        old_lines = old.splitlines(keepends=True)
        new_lines = new.splitlines(keepends=True)

        diff = list(difflib.unified_diff(
            old_lines, new_lines,
            fromfile=f"a/{path}",
            tofile=f"b/{path}",
            lineterm=""
        ))

        if not diff:
            return "No changes"

        # Count changes, excluding the "+++"/"---" header lines.
        additions = sum(1 for line in diff if line.startswith("+") and not line.startswith("+++"))
        deletions = sum(1 for line in diff if line.startswith("-") and not line.startswith("---"))

        return f"+{additions} -{deletions} lines"
119
class EditFileTool(FileSystemTool):
    """Make precise edits to existing files.

    Replaces one exact occurrence of a snippet; refuses ambiguous (multiple
    match) edits and suggests near-matches when the snippet is not found.
    """

    @property
    def name(self) -> str:
        return "edit_file"

    @property
    def description(self) -> str:
        return """Edit a file by replacing specific content.
Use this for targeted changes rather than rewriting entire files."""

    @property
    def parameters_schema(self) -> Dict[str, Any]:
        return {
            "type": "object",
            "properties": {
                "path": {
                    "type": "string",
                    "description": "Path to the file"
                },
                "old_content": {
                    "type": "string",
                    "description": "Exact content to find and replace"
                },
                "new_content": {
                    "type": "string",
                    "description": "New content to insert"
                }
            },
            "required": ["path", "old_content", "new_content"]
        }

    async def run(
        self,
        path: str,
        old_content: str,
        new_content: str
    ) -> ToolResult:
        """Replace the single occurrence of *old_content* with *new_content*."""
        valid, resolved_path = self._validate_path(path)
        if not valid:
            return ToolResult(
                success=False,
                data=None,
                error=f"Invalid path: {path}"
            )

        if not resolved_path.exists():
            return ToolResult(
                success=False,
                data=None,
                error=f"File not found: {path}"
            )

        try:
            current = resolved_path.read_text()
            occurrences = current.count(old_content)

            if occurrences == 0:
                # Not present verbatim — offer the closest candidate so the
                # agent can correct whitespace/typo differences.
                suggestion = self._find_similar(current, old_content)
                if suggestion:
                    return ToolResult(
                        success=False,
                        data=None,
                        error=f"Exact content not found. Did you mean:\n{suggestion}"
                    )
                return ToolResult(
                    success=False,
                    data=None,
                    error="Content to replace not found in file"
                )

            if occurrences > 1:
                # Ambiguous target: require a longer, unique snippet.
                return ToolResult(
                    success=False,
                    data=None,
                    error=f"Content appears {occurrences} times. Please provide more context for unique match."
                )

            resolved_path.write_text(
                current.replace(old_content, new_content, 1)
            )

            return ToolResult(
                success=True,
                data=f"Edited {path}: replaced {len(old_content)} chars with {len(new_content)} chars",
                metadata={
                    "chars_removed": len(old_content),
                    "chars_added": len(new_content)
                }
            )

        except Exception as e:
            return ToolResult(
                success=False,
                data=None,
                error=f"Failed to edit file: {str(e)}"
            )

    def _find_similar(self, content: str, target: str, threshold: float = 0.6) -> Optional[str]:
        """Return the first same-length window of lines whose similarity to
        *target* exceeds *threshold*, or None if nothing is close enough."""
        haystack = content.split("\n")
        window = len(target.split("\n"))

        for offset in range(len(haystack) - window + 1):
            candidate = "\n".join(haystack[offset:offset + window])
            if difflib.SequenceMatcher(None, target, candidate).ratio() > threshold:
                return candidate

        return None
237
238
239class InsertContentTool(FileSystemTool):
240 """Insert content at a specific location in a file."""
241
242 @property
243 def name(self) -> str:
244 return "insert_content"
245
246 @property
247 def description(self) -> str:
248 return "Insert new content at a specific line or after matching content."
249
250 @property
251 def parameters_schema(self) -> Dict[str, Any]:
252 return {
253 "type": "object",
254 "properties": {
255 "path": {
256 "type": "string",
257 "description": "Path to the file"
258 },
259 "content": {
260 "type": "string",
261 "description": "Content to insert"
262 },
263 "after_line": {
264 "type": "integer",
265 "description": "Insert after this line number"
266 },
267 "after_content": {
268 "type": "string",
269 "description": "Insert after this content (first occurrence)"
270 }
271 },
272 "required": ["path", "content"]
273 }
274
275 async def run(
276 self,
277 path: str,
278 content: str,
279 after_line: int = None,
280 after_content: str = None
281 ) -> ToolResult:
282 valid, resolved_path = self._validate_path(path)
283 if not valid or not resolved_path.exists():
284 return ToolResult(
285 success=False,
286 data=None,
287 error=f"Invalid or missing file: {path}"
288 )
289
290 try:
291 file_content = resolved_path.read_text()
292 lines = file_content.split("\n")
293
294 if after_line is not None:
295 # Insert after specific line
296 if after_line < 0 or after_line > len(lines):
297 return ToolResult(
298 success=False,
299 data=None,
300 error=f"Invalid line number: {after_line}"
301 )
302 insert_pos = after_line
303 elif after_content is not None:
304 # Find the line containing the content
305 insert_pos = None
306 for i, line in enumerate(lines):
307 if after_content in line:
308 insert_pos = i + 1
309 break
310 if insert_pos is None:
311 return ToolResult(
312 success=False,
313 data=None,
314 error=f"Content not found: {after_content}"
315 )
316 else:
317 # Insert at end
318 insert_pos = len(lines)
319
320 # Insert the content
321 new_lines = lines[:insert_pos] + content.split("\n") + lines[insert_pos:]
322 resolved_path.write_text("\n".join(new_lines))
323
324 return ToolResult(
325 success=True,
326 data=f"Inserted {len(content.split(chr(10)))} lines after line {insert_pos}",
327 metadata={"insert_position": insert_pos}
328 )
329
330 except Exception as e:
331 return ToolResult(
332 success=False,
333 data=None,
334 error=str(e)
335 )Directory Navigation
Navigating the directory structure helps agents understand project layout and find relevant files quickly.
1import os
2from pathlib import Path
3from typing import List, Dict, Any, Optional
4import fnmatch
5
6
class ListFilesTool(FileSystemTool):
    """List files and directories with filtering."""

    @property
    def name(self) -> str:
        return "list_files"

    @property
    def description(self) -> str:
        return """List files and directories. Supports:
- Recursive listing
- Glob pattern filtering
- File type filtering"""

    @property
    def parameters_schema(self) -> Dict[str, Any]:
        return {
            "type": "object",
            "properties": {
                "path": {
                    "type": "string",
                    "description": "Directory path (default: workspace root)"
                },
                "pattern": {
                    "type": "string",
                    "description": "Glob pattern to filter files (e.g., '*.py')"
                },
                "recursive": {
                    "type": "boolean",
                    "description": "List files recursively"
                },
                "max_depth": {
                    "type": "integer",
                    "description": "Maximum depth for recursive listing"
                },
                "include_hidden": {
                    "type": "boolean",
                    "description": "Include hidden files"
                }
            }
        }

    async def run(
        self,
        path: str = ".",
        pattern: Optional[str] = None,
        recursive: bool = False,
        max_depth: int = 5,
        include_hidden: bool = False
    ) -> "ToolResult":
        """List directory contents, optionally recursive and filtered."""
        valid, resolved_path = self._validate_path(path)
        if not valid:
            return ToolResult(
                success=False,
                data=None,
                error=f"Invalid path: {path}"
            )

        if not resolved_path.is_dir():
            return ToolResult(
                success=False,
                data=None,
                error=f"Not a directory: {path}"
            )

        # BUG FIX: _should_ignore() decides hidden-file visibility from
        # self.config, so the per-call include_hidden parameter was silently
        # ignored (hidden entries were dropped before the explicit checks
        # below ever ran). Scope a config override for the duration of the
        # call so the parameter actually takes effect.
        _missing = object()
        previous = self.config.get("include_hidden", _missing)
        self.config["include_hidden"] = include_hidden

        try:
            files = []
            dirs = []

            if recursive:
                for item in self._walk_directory(resolved_path, max_depth):
                    rel_path = item.relative_to(self.workspace)

                    if self._should_ignore(item):
                        continue

                    if not include_hidden and any(p.startswith(".") for p in rel_path.parts):
                        continue

                    if pattern and not fnmatch.fnmatch(item.name, pattern):
                        continue

                    if item.is_file():
                        files.append({
                            "path": str(rel_path),
                            "size": item.stat().st_size,
                            "type": self._get_file_type(item)
                        })
                    else:
                        dirs.append(str(rel_path))
            else:
                for item in resolved_path.iterdir():
                    if self._should_ignore(item):
                        continue

                    if not include_hidden and item.name.startswith("."):
                        continue

                    if pattern and not fnmatch.fnmatch(item.name, pattern):
                        continue

                    if item.is_file():
                        files.append({
                            "path": str(item.relative_to(self.workspace)),
                            "size": item.stat().st_size,
                            "type": self._get_file_type(item)
                        })
                    else:
                        dirs.append(str(item.relative_to(self.workspace)))

            return ToolResult(
                success=True,
                data={
                    "directories": sorted(dirs),
                    "files": sorted(files, key=lambda x: x["path"])
                },
                metadata={
                    "total_files": len(files),
                    "total_directories": len(dirs)
                }
            )

        except Exception as e:
            return ToolResult(
                success=False,
                data=None,
                error=str(e)
            )
        finally:
            # Restore the caller's config exactly as we found it.
            if previous is _missing:
                self.config.pop("include_hidden", None)
            else:
                self.config["include_hidden"] = previous

    def _walk_directory(self, path: Path, max_depth: int, current_depth: int = 0):
        """Walk directory with depth limit, pruning ignored subtrees."""
        if current_depth > max_depth:
            return

        try:
            for item in path.iterdir():
                yield item
                if item.is_dir() and not self._should_ignore(item):
                    yield from self._walk_directory(item, max_depth, current_depth + 1)
        except PermissionError:
            # Unreadable directories are skipped silently.
            pass

    def _get_file_type(self, path: Path) -> str:
        """Determine file type label from extension."""
        ext_map = {
            ".py": "python",
            ".js": "javascript",
            ".ts": "typescript",
            ".tsx": "typescript-react",
            ".jsx": "javascript-react",
            ".json": "json",
            ".yaml": "yaml",
            ".yml": "yaml",
            ".md": "markdown",
            ".css": "css",
            ".html": "html",
            ".sql": "sql",
        }
        return ext_map.get(path.suffix.lower(), "other")
166
167
168class FileTreeTool(FileSystemTool):
169 """Generate a tree view of the directory structure."""
170
171 @property
172 def name(self) -> str:
173 return "file_tree"
174
175 @property
176 def description(self) -> str:
177 return "Generate a tree view of the project structure. Useful for understanding project layout."
178
179 @property
180 def parameters_schema(self) -> Dict[str, Any]:
181 return {
182 "type": "object",
183 "properties": {
184 "path": {
185 "type": "string",
186 "description": "Root path for the tree"
187 },
188 "max_depth": {
189 "type": "integer",
190 "description": "Maximum depth to display"
191 },
192 "show_files": {
193 "type": "boolean",
194 "description": "Include files in the tree"
195 }
196 }
197 }
198
199 async def run(
200 self,
201 path: str = ".",
202 max_depth: int = 3,
203 show_files: bool = True
204 ) -> ToolResult:
205 valid, resolved_path = self._validate_path(path)
206 if not valid:
207 return ToolResult(
208 success=False,
209 data=None,
210 error=f"Invalid path: {path}"
211 )
212
213 try:
214 tree_lines = [str(resolved_path.name) + "/"]
215 self._build_tree(
216 resolved_path,
217 tree_lines,
218 prefix="",
219 max_depth=max_depth,
220 show_files=show_files
221 )
222
223 return ToolResult(
224 success=True,
225 data="\n".join(tree_lines)
226 )
227
228 except Exception as e:
229 return ToolResult(
230 success=False,
231 data=None,
232 error=str(e)
233 )
234
235 def _build_tree(
236 self,
237 path: Path,
238 lines: List[str],
239 prefix: str,
240 max_depth: int,
241 show_files: bool,
242 depth: int = 0
243 ):
244 if depth >= max_depth:
245 return
246
247 items = sorted(path.iterdir(), key=lambda x: (x.is_file(), x.name.lower()))
248 items = [i for i in items if not self._should_ignore(i)]
249
250 if not show_files:
251 items = [i for i in items if i.is_dir()]
252
253 for i, item in enumerate(items):
254 is_last = i == len(items) - 1
255 connector = "└── " if is_last else "├── "
256 new_prefix = prefix + (" " if is_last else "│ ")
257
258 if item.is_dir():
259 lines.append(f"{prefix}{connector}{item.name}/")
260 self._build_tree(
261 item, lines, new_prefix,
262 max_depth, show_files, depth + 1
263 )
264 else:
265 lines.append(f"{prefix}{connector}{item.name}")Code Search Tools
Search tools help agents find relevant code quickly. We implement both text search and semantic search capabilities.
1import re
2from typing import List, Dict, Any, Optional, Tuple
3from dataclasses import dataclass
4
5
@dataclass
class SearchMatch:
    """A single search match.

    Records where the match occurred (file path, 1-indexed line, column)
    together with the matched line's text and the surrounding context lines.
    """
    file: str
    line: int
    column: int
    content: str
    context_before: List[str]
    context_after: List[str]
15
16
class GrepTool(FileSystemTool):
    """Search for patterns in files.

    Walks the workspace (pruning ignored directories), skips likely-binary
    files, and returns matching lines with optional context.
    """

    @property
    def name(self) -> str:
        return "grep"

    @property
    def description(self) -> str:
        return """Search for text or regex patterns in files.
Returns matching lines with context. Supports case-insensitive search."""

    @property
    def parameters_schema(self) -> Dict[str, Any]:
        return {
            "type": "object",
            "properties": {
                "pattern": {
                    "type": "string",
                    "description": "Search pattern (text or regex)"
                },
                "path": {
                    "type": "string",
                    "description": "Directory or file to search"
                },
                "file_pattern": {
                    "type": "string",
                    "description": "Glob pattern to filter files (e.g., '*.py')"
                },
                "case_sensitive": {
                    "type": "boolean",
                    "description": "Case-sensitive search"
                },
                "context_lines": {
                    "type": "integer",
                    "description": "Lines of context around matches"
                },
                "max_results": {
                    "type": "integer",
                    "description": "Maximum number of results"
                }
            },
            "required": ["pattern"]
        }

    async def run(
        self,
        pattern: str,
        path: str = ".",
        file_pattern: Optional[str] = None,
        case_sensitive: bool = True,
        context_lines: int = 2,
        max_results: int = 50
    ) -> "ToolResult":
        """Search for *pattern* under *path*; returns at most *max_results* hits."""
        valid, resolved_path = self._validate_path(path)
        if not valid:
            return ToolResult(
                success=False,
                data=None,
                error=f"Invalid path: {path}"
            )

        try:
            # Compile regex once for the whole search.
            flags = 0 if case_sensitive else re.IGNORECASE
            try:
                regex = re.compile(pattern, flags)
            except re.error as e:
                return ToolResult(
                    success=False,
                    data=None,
                    error=f"Invalid regex pattern: {e}"
                )

            matches = []
            files_searched = 0

            # Get files to search
            if resolved_path.is_file():
                files_to_search = [resolved_path]
            else:
                files_to_search = list(self._iter_files(resolved_path, file_pattern))

            for file_path in files_to_search:
                if len(matches) >= max_results:
                    break

                file_matches = self._search_file(
                    file_path, regex, context_lines
                )
                matches.extend(file_matches)
                files_searched += 1

            # Format results (cap at max_results even if the last file
            # pushed us past the limit).
            formatted = []
            for match in matches[:max_results]:
                result = {
                    "file": str(Path(match.file).relative_to(self.workspace)),
                    "line": match.line,
                    "content": match.content,
                }
                if context_lines > 0:
                    result["context_before"] = match.context_before
                    result["context_after"] = match.context_after
                formatted.append(result)

            return ToolResult(
                success=True,
                data=formatted,
                metadata={
                    "total_matches": len(matches),
                    "files_searched": files_searched,
                    "pattern": pattern
                }
            )

        except Exception as e:
            return ToolResult(
                success=False,
                data=None,
                error=str(e)
            )

    def _iter_files(self, path: Path, file_pattern: Optional[str] = None):
        """Iterate over searchable files, pruning ignored directories."""
        for root, dirs, files in os.walk(path):
            # Filter ignored directories in place so os.walk never descends
            # into them.
            dirs[:] = [d for d in dirs if not self._should_ignore(Path(root) / d)]

            for file in files:
                file_path = Path(root) / file

                if self._should_ignore(file_path):
                    continue

                if file_pattern and not fnmatch.fnmatch(file, file_pattern):
                    continue

                # Skip binary files
                if self._is_likely_binary(file_path):
                    continue

                yield file_path

    def _is_likely_binary(self, path: Path) -> bool:
        """Quick extension-based check for binary files."""
        binary_extensions = {
            ".pyc", ".pyo", ".exe", ".dll", ".so", ".dylib",
            ".png", ".jpg", ".jpeg", ".gif", ".ico", ".pdf",
            ".zip", ".tar", ".gz", ".whl"
        }
        return path.suffix.lower() in binary_extensions

    def _search_file(
        self,
        file_path: Path,
        regex: re.Pattern,
        context_lines: int
    ) -> List[SearchMatch]:
        """Search a single file for matches; unreadable files yield nothing."""
        matches = []

        try:
            content = file_path.read_text()
            lines = content.split("\n")

            for i, line in enumerate(lines):
                if regex.search(line):
                    # Get context window, clamped to file bounds.
                    start = max(0, i - context_lines)
                    end = min(len(lines), i + context_lines + 1)

                    matches.append(SearchMatch(
                        file=str(file_path),
                        line=i + 1,  # 1-indexed
                        column=0,  # Could be enhanced to find exact position
                        content=line.strip(),
                        context_before=lines[start:i],
                        context_after=lines[i + 1:end]
                    ))
        except (OSError, UnicodeDecodeError, ValueError):
            # Narrowed from a bare except: skip files we cannot read or
            # decode, but let programming errors propagate.
            pass

        return matches
201
202
203class FindSymbolTool(FileSystemTool):
204 """Find function, class, and variable definitions."""
205
206 @property
207 def name(self) -> str:
208 return "find_symbol"
209
210 @property
211 def description(self) -> str:
212 return """Find definitions of functions, classes, or variables by name.
213More precise than grep for finding code symbols."""
214
215 @property
216 def parameters_schema(self) -> Dict[str, Any]:
217 return {
218 "type": "object",
219 "properties": {
220 "symbol": {
221 "type": "string",
222 "description": "Name of the symbol to find"
223 },
224 "symbol_type": {
225 "type": "string",
226 "enum": ["function", "class", "variable", "any"],
227 "description": "Type of symbol to find"
228 },
229 "path": {
230 "type": "string",
231 "description": "Directory to search"
232 }
233 },
234 "required": ["symbol"]
235 }
236
237 # Symbol patterns for different languages
238 PATTERNS = {
239 "python": {
240 "function": r"^\s*(?:async\s+)?def\s+{symbol}\s*\(",
241 "class": r"^\s*class\s+{symbol}\s*[:\(]",
242 "variable": r"^\s*{symbol}\s*="
243 },
244 "javascript": {
245 "function": r"(?:function\s+{symbol}\s*\(|const\s+{symbol}\s*=\s*(?:async\s+)?(?:\([^)]*\)\s*=>|function))",
246 "class": r"class\s+{symbol}\s*(?:extends|\{{)",
247 "variable": r"(?:const|let|var)\s+{symbol}\s*="
248 },
249 "typescript": {
250 "function": r"(?:function\s+{symbol}\s*[<\(]|(?:const|let)\s+{symbol}\s*(?::\s*[^=]+)?=\s*(?:async\s+)?(?:\([^)]*\)\s*=>|function))",
251 "class": r"(?:class|interface|type)\s+{symbol}\s*(?:<[^>]*>)?\s*(?:extends|implements|=|\{{)",
252 "variable": r"(?:const|let|var)\s+{symbol}\s*(?::\s*[^=]+)?="
253 }
254 }
255
256 async def run(
257 self,
258 symbol: str,
259 symbol_type: str = "any",
260 path: str = "."
261 ) -> ToolResult:
262 valid, resolved_path = self._validate_path(path)
263 if not valid:
264 return ToolResult(
265 success=False,
266 data=None,
267 error=f"Invalid path: {path}"
268 )
269
270 results = []
271 escaped_symbol = re.escape(symbol)
272
273 for file_path in self._iter_source_files(resolved_path):
274 language = self._detect_language(file_path)
275 if language not in self.PATTERNS:
276 continue
277
278 patterns = self.PATTERNS[language]
279
280 try:
281 content = file_path.read_text()
282 lines = content.split("\n")
283
284 types_to_check = [symbol_type] if symbol_type != "any" else ["function", "class", "variable"]
285
286 for stype in types_to_check:
287 pattern_template = patterns.get(stype)
288 if not pattern_template:
289 continue
290
291 pattern = pattern_template.format(symbol=escaped_symbol)
292 regex = re.compile(pattern, re.MULTILINE)
293
294 for i, line in enumerate(lines):
295 if regex.search(line):
296 results.append({
297 "file": str(file_path.relative_to(self.workspace)),
298 "line": i + 1,
299 "type": stype,
300 "content": line.strip(),
301 "language": language
302 })
303 except:
304 continue
305
306 return ToolResult(
307 success=True,
308 data=results,
309 metadata={
310 "symbol": symbol,
311 "total_found": len(results)
312 }
313 )
314
315 def _iter_source_files(self, path: Path):
316 """Iterate over source code files."""
317 source_extensions = {".py", ".js", ".ts", ".tsx", ".jsx", ".java", ".go", ".rs"}
318
319 for root, dirs, files in os.walk(path):
320 dirs[:] = [d for d in dirs if not self._should_ignore(Path(root) / d)]
321
322 for file in files:
323 file_path = Path(root) / file
324 if file_path.suffix in source_extensions:
325 yield file_path
326
327 def _detect_language(self, path: Path) -> str:
328 """Detect programming language from file extension."""
329 ext_map = {
330 ".py": "python",
331 ".js": "javascript",
332 ".jsx": "javascript",
333 ".ts": "typescript",
334 ".tsx": "typescript",
335 }
336 return ext_map.get(path.suffix, "unknown")find_symbol tool is more reliable than grep for finding code definitions because it understands language syntax. Use it to find where functions and classes are defined.File System Safety
Safety is paramount when giving an AI agent access to the file system. We implement multiple layers of protection:
1from pathlib import Path
2from typing import Set, List, Optional
3import hashlib
4from datetime import datetime
5
6
7class FileSystemGuard:
8 """
9 Safety layer for file system operations.
10 Prevents dangerous operations and tracks changes.
11 """
12
13 def __init__(
14 self,
15 workspace: Path,
16 protected_patterns: List[str] = None,
17 max_file_size: int = 10 * 1024 * 1024, # 10MB
18 max_files_per_operation: int = 10
19 ):
20 self.workspace = Path(workspace).resolve()
21 self.max_file_size = max_file_size
22 self.max_files_per_operation = max_files_per_operation
23
24 # Protected patterns that cannot be modified
25 self.protected_patterns = protected_patterns or [
26 ".git/*",
27 "*.env",
28 "*.env.*",
29 "**/secrets/*",
30 "**/credentials/*",
31 "**/.ssh/*",
32 "**/id_rsa*",
33 "**/*.pem",
34 "**/*.key",
35 ]
36
37 # Track all changes for potential rollback
38 self.change_history: List[dict] = []
39 self.file_checksums: dict[str, str] = {}
40
41 def validate_read(self, path: Path) -> tuple[bool, Optional[str]]:
42 """Validate a read operation."""
43 resolved = path.resolve()
44
45 # Must be within workspace
46 if not self._is_within_workspace(resolved):
47 return False, "Path is outside workspace"
48
49 # Check file size
50 if resolved.exists() and resolved.stat().st_size > self.max_file_size:
51 return False, f"File exceeds maximum size of {self.max_file_size} bytes"
52
53 return True, None
54
55 def validate_write(self, path: Path, content: str) -> tuple[bool, Optional[str]]:
56 """Validate a write operation."""
57 resolved = path.resolve()
58
59 # Must be within workspace
60 if not self._is_within_workspace(resolved):
61 return False, "Path is outside workspace"
62
63 # Check against protected patterns
64 if self._is_protected(resolved):
65 return False, f"Path matches protected pattern: {path}"
66
67 # Check content size
68 if len(content.encode()) > self.max_file_size:
69 return False, f"Content exceeds maximum size of {self.max_file_size} bytes"
70
71 # Validate content doesn't look like it contains secrets
72 if self._contains_potential_secrets(content):
73 return False, "Content appears to contain secrets/credentials"
74
75 return True, None
76
77 def validate_delete(self, path: Path) -> tuple[bool, Optional[str]]:
78 """Validate a delete operation."""
79 resolved = path.resolve()
80
81 if not self._is_within_workspace(resolved):
82 return False, "Path is outside workspace"
83
84 if self._is_protected(resolved):
85 return False, "Cannot delete protected file"
86
87 # Prevent deleting directories with many files
88 if resolved.is_dir():
89 file_count = sum(1 for _ in resolved.rglob("*") if _.is_file())
90 if file_count > self.max_files_per_operation:
91 return False, f"Directory contains {file_count} files (max: {self.max_files_per_operation})"
92
93 return True, None
94
95 def _is_within_workspace(self, path: Path) -> bool:
96 """Check if path is within the workspace."""
97 try:
98 path.relative_to(self.workspace)
99 return True
100 except ValueError:
101 return False
102
103 def _is_protected(self, path: Path) -> bool:
104 """Check if path matches any protected pattern."""
105 import fnmatch
106
107 rel_path = str(path.relative_to(self.workspace))
108
109 for pattern in self.protected_patterns:
110 if fnmatch.fnmatch(rel_path, pattern):
111 return True
112 if fnmatch.fnmatch(path.name, pattern):
113 return True
114
115 return False
116
117 def _contains_potential_secrets(self, content: str) -> bool:
118 """Check if content appears to contain secrets."""
119 import re
120
121 secret_patterns = [
122 r"(?i)api[_-]?key\s*[=:]\s*['"][^'"]+['"]",
123 r"(?i)secret[_-]?key\s*[=:]\s*['"][^'"]+['"]",
124 r"(?i)password\s*[=:]\s*['"][^'"]+['"]",
125 r"(?i)aws[_-]?access[_-]?key",
126 r"-----BEGIN (?:RSA |DSA |EC )?PRIVATE KEY-----",
127 r"(?i)bearer\s+[a-zA-Z0-9_\-\.]+",
128 ]
129
130 for pattern in secret_patterns:
131 if re.search(pattern, content):
132 return True
133
134 return False
135
136 def record_change(
137 self,
138 operation: str,
139 path: Path,
140 original_content: Optional[str] = None,
141 new_content: Optional[str] = None
142 ):
143 """Record a change for potential rollback."""
144 change = {
145 "timestamp": datetime.now().isoformat(),
146 "operation": operation,
147 "path": str(path.relative_to(self.workspace)),
148 "original_checksum": None,
149 "new_checksum": None
150 }
151
152 if original_content:
153 change["original_checksum"] = hashlib.sha256(
154 original_content.encode()
155 ).hexdigest()
156 change["original_content"] = original_content
157
158 if new_content:
159 change["new_checksum"] = hashlib.sha256(
160 new_content.encode()
161 ).hexdigest()
162
163 self.change_history.append(change)
164
165 def get_rollback_actions(self, n: int = None) -> List[dict]:
166 """Get actions needed to rollback recent changes."""
167 changes = self.change_history[-n:] if n else self.change_history
168 rollback_actions = []
169
170 for change in reversed(changes):
171 path = self.workspace / change["path"]
172
173 if change["operation"] == "create":
174 # Rollback: delete the file
175 rollback_actions.append({
176 "action": "delete",
177 "path": change["path"]
178 })
179 elif change["operation"] == "modify":
180 # Rollback: restore original content
181 if "original_content" in change:
182 rollback_actions.append({
183 "action": "write",
184 "path": change["path"],
185 "content": change["original_content"]
186 })
187 elif change["operation"] == "delete":
188 # Rollback: recreate file
189 if "original_content" in change:
190 rollback_actions.append({
191 "action": "write",
192 "path": change["path"],
193 "content": change["original_content"]
194 })
195
196 return rollback_actions
197
198 def create_snapshot(self) -> dict:
199 """Create a snapshot of current file states."""
200 snapshot = {
201 "timestamp": datetime.now().isoformat(),
202 "files": {}
203 }
204
205 for path in self.workspace.rglob("*"):
206 if path.is_file() and not self._should_ignore_for_snapshot(path):
207 try:
208 content = path.read_bytes()
209 snapshot["files"][str(path.relative_to(self.workspace))] = {
210 "checksum": hashlib.sha256(content).hexdigest(),
211 "size": len(content)
212 }
213 except:
214 pass
215
216 return snapshot
217
218 def _should_ignore_for_snapshot(self, path: Path) -> bool:
219 """Check if path should be ignored in snapshots."""
220 ignore = {".git", "node_modules", "__pycache__", ".venv", "venv"}
221 return any(part in ignore for part in path.parts)Protected File Patterns
Configure protected patterns based on your project:
| Pattern | Purpose |
|---|---|
| .git/* | Prevent corruption of git repository |
| *.env, *.env.* | Protect environment files with secrets |
| **/secrets/* | Protect secrets directories |
| **/credentials/* | Protect credential files |
| **/*.pem, **/*.key | Protect private keys |
| package-lock.json | Prevent accidental lockfile changes |
Summary
In this section, we built a comprehensive file system toolkit for our coding agent:
- Intelligent Reading: Handle large files, detect binary content, support line ranges and context
- Safe Writing: Atomic writes with backup support, diff generation, and rollback capability
- Precise Editing: Replace specific content, insert at locations, handle ambiguous matches gracefully
- Efficient Navigation: List files with filtering, tree views, respect ignore patterns
- Powerful Search: Text and regex search with context, symbol-aware search for code definitions
- Security Guards: Path validation, protected patterns, secret detection, change tracking
In the next section, we'll build the code execution sandbox that allows our agent to safely run code and commands.