Introduction
Git integration transforms a coding agent from a simple code generator into a proper development collaborator. With Git, agents can create branches for isolated work, make atomic commits, track changes, and even create pull requests. This section covers implementing comprehensive Git tooling for your coding agent.
Why Git Matters: Git provides a safety net for agent actions. If something goes wrong, you can always revert. It also creates a clear audit trail of what the agent changed and why.
Git Operations for Agents
Coding agents need access to specific Git operations, carefully balanced between capability and safety:
| Operation | Agent Need | Safety Consideration |
|---|---|---|
| status | See what changed | Read-only, always safe |
| diff | Understand changes | Read-only, always safe |
| log | Understand history | Read-only, always safe |
| branch | Create isolated work | Safe if naming controlled |
| checkout | Switch contexts | Can lose uncommitted work |
| add | Stage changes | Safe, reversible |
| commit | Save work | Creates history, should be controlled |
| push | Share work | Dangerous, should require approval |
| reset | Undo changes | Can lose work, needs care |
| merge | Combine work | Complex, may need human review |
Operation Categories
We categorize operations by their risk level:
1from enum import Enum
2from typing import Set
3
4
class GitOperationRisk(Enum):
    """Risk level assigned to a Git operation.

    Members are declared from the least to the most dangerous level.
    """

    # Always safe: nothing is modified.
    READ_ONLY = "read_only"
    # Local modifications that are considered safe.
    LOCAL_SAFE = "local_safe"
    # Local modifications that could lose local work.
    LOCAL_RISKY = "local_risky"
    # Operations that involve a remote repository.
    REMOTE = "remote"
    # Operations that could permanently lose work.
    DESTRUCTIVE = "destructive"


14GIT_OPERATIONS = {
15 # Read-only operations
16 "status": GitOperationRisk.READ_ONLY,
17 "diff": GitOperationRisk.READ_ONLY,
18 "log": GitOperationRisk.READ_ONLY,
19 "show": GitOperationRisk.READ_ONLY,
20 "branch --list": GitOperationRisk.READ_ONLY,
21 "stash list": GitOperationRisk.READ_ONLY,
22
23 # Safe local operations
24 "add": GitOperationRisk.LOCAL_SAFE,
25 "commit": GitOperationRisk.LOCAL_SAFE,
26 "branch": GitOperationRisk.LOCAL_SAFE,
27 "checkout -b": GitOperationRisk.LOCAL_SAFE,
28 "stash": GitOperationRisk.LOCAL_SAFE,
29 "stash pop": GitOperationRisk.LOCAL_SAFE,
30
31 # Risky local operations
32 "checkout": GitOperationRisk.LOCAL_RISKY,
33 "reset --soft": GitOperationRisk.LOCAL_RISKY,
34 "revert": GitOperationRisk.LOCAL_RISKY,
35
36 # Remote operations (need approval)
37 "push": GitOperationRisk.REMOTE,
38 "pull": GitOperationRisk.REMOTE,
39 "fetch": GitOperationRisk.REMOTE,
40
41 # Destructive operations (should be blocked)
42 "reset --hard": GitOperationRisk.DESTRUCTIVE,
43 "clean -fd": GitOperationRisk.DESTRUCTIVE,
44 "push --force": GitOperationRisk.DESTRUCTIVE,
45 "branch -D": GitOperationRisk.DESTRUCTIVE,
46}Git Tools Implementation
Let's implement a complete Git toolset for our coding agent:
1import asyncio
2import re
3from dataclasses import dataclass, field
4from typing import List, Dict, Any, Optional, Tuple
5from pathlib import Path
6from datetime import datetime
7
8
@dataclass
class GitStatus:
    """Parsed result of ``git status``.

    The five list fields hold file paths grouped by the kind of pending
    change; the working tree counts as clean only when all of them are empty.
    """

    # Name of the checked-out branch.
    branch: str
    # Commits ahead of / behind the upstream branch.
    ahead: int = 0
    behind: int = 0
    # Paths grouped by state.
    staged: List[str] = field(default_factory=list)
    modified: List[str] = field(default_factory=list)
    untracked: List[str] = field(default_factory=list)
    deleted: List[str] = field(default_factory=list)
    conflicted: List[str] = field(default_factory=list)

    @property
    def is_clean(self) -> bool:
        """True when no path is staged, modified, untracked, deleted or conflicted."""
        pending = (
            self.staged,
            self.modified,
            self.untracked,
            self.deleted,
            self.conflicted,
        )
        for group in pending:
            if group:
                return False
        return True

    @property
    def has_changes(self) -> bool:
        """Inverse of :attr:`is_clean`."""
        if self.is_clean:
            return False
        return True


@dataclass
class GitCommit:
    """A single commit entry as reported by git."""

    # Full and abbreviated commit identifiers.
    hash: str
    short_hash: str
    # Author name and commit date, kept as the strings git printed.
    author: str
    date: str
    # Commit message text.
    message: str
    # Number of files touched; stays 0 unless the caller fills it in.
    files_changed: int = 0


@dataclass
class GitDiff:
    """Per-file diff statistics."""

    # Path of the changed file.
    file: str
    # Counts of added and removed lines.
    additions: int
    deletions: int
    # Optional parsed hunks; each instance gets its own empty list by default.
    chunks: List[Dict[str, Any]] = field(default_factory=list)


class GitTools:
    """Git integration tools for the coding agent.

    Every method shells out to the ``git`` executable with ``workspace`` as
    the working directory and turns the textual output into plain Python
    data.
    """

    # Prefix for agent-created branches.
    BRANCH_PREFIX = "agent/"

    # Number of space-separated fields that precede the path in each kind of
    # ``git status --porcelain=v2`` entry: "1" ordinary change, "2" rename or
    # copy, "u" unmerged.
    _FIELDS_BEFORE_PATH = {"1": 8, "2": 9, "u": 10}

    def __init__(
        self,
        workspace: Path,
        allowed_risks: Optional[set] = None,
        require_approval_for: Optional[set] = None,
    ):
        """Bind the tools to one repository checkout.

        Args:
            workspace: Directory in which every git command is executed.
            allowed_risks: Risk levels the agent may use. When omitted (or
                empty) this defaults to read-only plus both local levels.
            require_approval_for: Risk levels that need approval. When
                omitted (or empty) this defaults to remote operations.
        """
        self.workspace = Path(workspace)
        self.allowed_risks = allowed_risks or {
            GitOperationRisk.READ_ONLY,
            GitOperationRisk.LOCAL_SAFE,
            GitOperationRisk.LOCAL_RISKY,
        }
        self.require_approval_for = require_approval_for or {
            GitOperationRisk.REMOTE,
        }

    async def _run_git(
        self,
        *args: str,
        check: bool = True
    ) -> Tuple[bool, str, str]:
        """Run ``git <args>`` in the workspace.

        Args:
            *args: Arguments handed to git verbatim; no shell is involved.
            check: Accepted for interface compatibility only. The outcome is
                always returned, never raised, so callers must inspect the
                success flag themselves.

        Returns:
            ``(success, stdout, stderr)``. ``success`` is True for exit
            status 0; both streams are decoded as UTF-8 (undecodable bytes
            replaced) and stripped of surrounding whitespace.
        """
        process = await asyncio.create_subprocess_exec(
            "git",
            *args,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
            cwd=self.workspace,
        )
        raw_out, raw_err = await process.communicate()

        return (
            process.returncode == 0,
            raw_out.decode("utf-8", errors="replace").strip(),
            raw_err.decode("utf-8", errors="replace").strip(),
        )

    @classmethod
    def _parse_status_entry(cls, line: str) -> Tuple[str, str]:
        """Extract ``(XY, path)`` from a porcelain-v2 "1", "2" or "u" line.

        The path is the final field and may itself contain spaces, so the
        line is split a fixed number of times instead of on all whitespace.
        For rename/copy entries the original path (after the tab) is dropped.
        """
        kind = line[0]
        parts = line.split(" ", cls._FIELDS_BEFORE_PATH[kind])
        xy, path = parts[1], parts[-1]
        if kind == "2":
            path = path.split("\t", 1)[0]
        return xy, path

    async def status(self) -> "GitStatus":
        """Get the current git status.

        Returns:
            A ``GitStatus`` built from ``git status --porcelain=v2 --branch``.

        Raises:
            Exception: If ``git status`` exits with a non-zero status.
        """
        success, stdout, stderr = await self._run_git(
            "status", "--porcelain=v2", "--branch"
        )
        if not success:
            raise Exception(f"Git status failed: {stderr}")

        status = GitStatus(branch="unknown")

        for line in stdout.split("\n"):
            if not line:
                continue

            if line.startswith("# branch.head"):
                status.branch = line.split()[-1]
            elif line.startswith("# branch.ab"):
                # Format: "# branch.ab +<ahead> -<behind>"
                for part in line.split():
                    if part.startswith("+"):
                        status.ahead = int(part[1:])
                    elif part.startswith("-"):
                        status.behind = int(part[1:])
            elif line.startswith(("1 ", "2 ")):  # Changed / renamed entries
                xy, path = self._parse_status_entry(line)
                if xy[0] != ".":  # Staged
                    status.staged.append(path)
                if xy[1] == "M":  # Modified in work tree
                    status.modified.append(path)
                elif xy[1] == "D":  # Deleted in work tree
                    status.deleted.append(path)
            elif line.startswith("? "):  # Untracked
                status.untracked.append(line[2:])
            elif line.startswith("u "):  # Conflicted
                status.conflicted.append(self._parse_status_entry(line)[1])

        return status

    async def diff(
        self,
        staged: bool = False,
        file_path: Optional[str] = None,
        commit: Optional[str] = None
    ) -> List["GitDiff"]:
        """Get per-file added/removed line counts.

        Args:
            staged: Compare the index instead of the working tree.
            file_path: Restrict the diff to this path.
            commit: Commit (or revision expression) to diff against.

        Returns:
            One ``GitDiff`` per ``--numstat`` row. Rows that report ``-``
            instead of a number are recorded with a count of 0.

        Raises:
            Exception: If ``git diff`` exits with a non-zero status.
        """
        args = ["diff", "--numstat"]
        if staged:
            args.append("--cached")
        if commit:
            args.append(commit)
        if file_path:
            args.extend(["--", file_path])

        success, stdout, stderr = await self._run_git(*args)
        if not success:
            raise Exception(f"Git diff failed: {stderr}")

        diffs = []
        for line in stdout.split("\n"):
            if not line:
                continue
            parts = line.split("\t")
            if len(parts) >= 3:
                diffs.append(GitDiff(
                    file=parts[2],
                    additions=int(parts[0]) if parts[0] != "-" else 0,
                    deletions=int(parts[1]) if parts[1] != "-" else 0,
                ))

        return diffs

    async def diff_content(
        self,
        file_path: Optional[str] = None,
        staged: bool = False
    ) -> str:
        """Get the actual diff text.

        Args:
            file_path: Restrict the diff to this path.
            staged: Show the staged diff instead of the working-tree diff.

        Raises:
            Exception: If ``git diff`` exits with a non-zero status, matching
                the behaviour of :meth:`diff`.
        """
        args = ["diff"]
        if staged:
            args.append("--cached")
        if file_path:
            args.extend(["--", file_path])

        success, stdout, stderr = await self._run_git(*args)
        if not success:
            raise Exception(f"Git diff failed: {stderr}")
        return stdout

    async def log(
        self,
        count: int = 10,
        file_path: Optional[str] = None,
        since: Optional[str] = None
    ) -> List["GitCommit"]:
        """Get recent commits, newest first as printed by ``git log``.

        Args:
            count: Maximum number of commits to return.
            file_path: Only include commits touching this path.
            since: Value for git's ``--since`` filter.

        Raises:
            Exception: If ``git log`` exits with a non-zero status.
        """
        # Fields are separated by the ASCII unit separator (0x1f) rather than
        # a printable character, so names or subjects containing "|" parse
        # correctly.
        separator = "\x1f"
        format_str = "%x1f".join(["%H", "%h", "%an", "%ai", "%s"])
        args = ["log", f"--format={format_str}", f"-n{count}"]

        if since:
            args.append(f"--since={since}")
        if file_path:
            args.extend(["--", file_path])

        success, stdout, stderr = await self._run_git(*args)
        if not success:
            raise Exception(f"Git log failed: {stderr}")

        commits = []
        for line in stdout.split("\n"):
            if not line:
                continue
            parts = line.split(separator, 4)
            if len(parts) >= 5:
                commits.append(GitCommit(
                    hash=parts[0],
                    short_hash=parts[1],
                    author=parts[2],
                    date=parts[3],
                    message=parts[4],
                ))

        return commits

232 async def show_commit(self, commit_hash: str) -> Dict[str, Any]:
233 """Show details of a specific commit."""
234 # Get commit info
235 success, stdout, stderr = await self._run_git(
236 "show", commit_hash,
237 "--format=%H%n%an%n%ae%n%ai%n%s%n%b",
238 "--numstat"
239 )
240
241 if not success:
242 raise Exception(f"Git show failed: {stderr}")
243
244 lines = stdout.split("\n")
245 if len(lines) < 5:
246 raise Exception("Unexpected git show output")
247
248 # Parse header
249 info = {
250 "hash": lines[0],
251 "author": lines[1],
252 "email": lines[2],
253 "date": lines[3],
254 "subject": lines[4],
255 "body": "",
256 "files": []
257 }
258
259 # Parse body and files
260 body_lines = []
261 in_body = True
262
263 for line in lines[5:]:
264 if line.strip() == "" and in_body:
265 continue
266 if "\t" in line and not line.startswith("\t"):
267 in_body = False
268 parts = line.split("\t")
269 if len(parts) >= 3:
270 info["files"].append({
271 "additions": int(parts[0]) if parts[0] != "-" else 0,
272 "deletions": int(parts[1]) if parts[1] != "-" else 0,
273 "file": parts[2]
274 })
275 elif in_body:
276 body_lines.append(line)
277
278 info["body"] = "\n".join(body_lines).strip()
279
280 return infoBranch Management
Agents should create branches for their work, keeping the main branch clean and allowing easy review of changes:
class GitTools:
    # ... (continued from above)
    # NOTE: these methods belong in the same GitTools class body as the ones
    # shown earlier; the header is repeated only to split the listing.

    async def list_branches(self, remote: bool = False) -> List[Dict[str, Any]]:
        """List branches.

        Args:
            remote: When True, ``-r`` is passed so remote-tracking branches
                are listed.

        Returns:
            One dict per branch with keys ``name``, ``commit`` (abbreviated
            hash), ``upstream`` (``None`` when git prints nothing) and
            ``last_commit`` (relative committer date).

        Raises:
            Exception: If ``git branch`` exits with a non-zero status.
        """
        args = [
            "branch",
            "-v",
            "--format=%(refname:short)|%(objectname:short)|%(upstream:short)|%(committerdate:relative)",
        ]
        if remote:
            args.append("-r")

        success, stdout, stderr = await self._run_git(*args)
        if not success:
            raise Exception(f"Git branch failed: {stderr}")

        branches = []
        for line in stdout.split("\n"):
            if not line:
                continue
            parts = line.split("|")
            if len(parts) >= 4:
                branches.append({
                    "name": parts[0],
                    "commit": parts[1],
                    "upstream": parts[2] if parts[2] else None,
                    "last_commit": parts[3],
                })

        return branches

    async def current_branch(self) -> str:
        """Return the output of ``git branch --show-current``.

        Raises:
            Exception: If the git command exits with a non-zero status.
        """
        success, stdout, stderr = await self._run_git("branch", "--show-current")
        if not success:
            raise Exception(f"Could not get current branch: {stderr}")
        return stdout

    async def create_branch(
        self,
        name: str,
        from_ref: Optional[str] = None,
        checkout: bool = True
    ) -> bool:
        """Create a new agent branch, optionally checking it out.

        The name is forced under ``BRANCH_PREFIX`` and then sanitized, so the
        branch that is actually created may differ from ``name``.

        Args:
            name: Requested branch name.
            from_ref: Start point; when omitted git uses the current HEAD.
            checkout: Switch to the new branch (``checkout -b``) instead of
                only creating it (``branch``).

        Returns:
            True on success.

        Raises:
            Exception: If git refuses to create the branch.
        """
        if not name.startswith(self.BRANCH_PREFIX):
            name = f"{self.BRANCH_PREFIX}{name}"
        name = self._sanitize_branch_name(name)

        args = ["checkout", "-b", name] if checkout else ["branch", name]
        if from_ref:
            args.append(from_ref)

        success, stdout, stderr = await self._run_git(*args)
        if not success:
            raise Exception(f"Failed to create branch: {stderr}")

        return True

    async def checkout_branch(
        self,
        name: str,
        create: bool = False
    ) -> bool:
        """Checkout a branch, refusing when there are pending changes.

        Args:
            name: Branch to switch to.
            create: Pass ``-b`` so the branch is created first.

        Returns:
            True on success.

        Raises:
            Exception: If the status reports any change, or git fails.
        """
        status = await self.status()
        if status.has_changes:
            raise Exception(
                "Cannot checkout with uncommitted changes. "
                "Commit or stash your changes first."
            )

        args = ["checkout"]
        if create:
            args.append("-b")
        args.append(name)

        success, stdout, stderr = await self._run_git(*args)
        if not success:
            raise Exception(f"Failed to checkout branch: {stderr}")

        return True

    async def delete_branch(
        self,
        name: str,
        force: bool = False
    ) -> bool:
        """Delete a branch (only agent branches can be deleted).

        Args:
            name: Branch to delete; must start with ``BRANCH_PREFIX``.
            force: Use ``-D`` instead of ``-d``.

        Returns:
            True on success.

        Raises:
            Exception: If the name lacks the agent prefix or git fails.
        """
        if not name.startswith(self.BRANCH_PREFIX):
            raise Exception(
                f"Can only delete agent branches (prefix: {self.BRANCH_PREFIX})"
            )

        args = ["branch", "-D" if force else "-d", name]
        success, stdout, stderr = await self._run_git(*args)
        if not success:
            raise Exception(f"Failed to delete branch: {stderr}")

        return True

    def _sanitize_branch_name(self, name: str) -> str:
        """Sanitize a branch name to be valid.

        Characters outside ``[a-zA-Z0-9/_-]`` become dashes; runs of dashes
        and runs of slashes are collapsed; dashes and slashes are trimmed
        from both ends (an empty path component or a trailing slash is not a
        valid ref name).
        """
        name = re.sub(r"[^a-zA-Z0-9/_-]", "-", name)
        name = re.sub(r"-+", "-", name)
        name = re.sub(r"/+", "/", name)
        return name.strip("-/")

    async def stash_changes(
        self,
        message: Optional[str] = None,
        include_untracked: bool = True
    ) -> bool:
        """Stash current changes.

        Args:
            message: Optional stash description.
            include_untracked: Also stash untracked files. This defaults to
                True because ``GitStatus.has_changes`` counts untracked
                files, so leaving them behind would make a following
                :meth:`checkout_branch` refuse to run.

        Returns:
            True when ``git stash push`` exits with status 0.
        """
        args = ["stash", "push"]
        if include_untracked:
            args.append("--include-untracked")
        if message:
            args.extend(["-m", message])

        success, stdout, stderr = await self._run_git(*args)
        return success

    async def pop_stash(self) -> bool:
        """Pop the most recent stash; True when git exits with status 0."""
        success, stdout, stderr = await self._run_git("stash", "pop")
        return success

133 async def ensure_clean_state(self) -> bool:
134 """Ensure the working directory is in a clean state."""
135 status = await self.status()
136
137 if status.has_changes:
138 # Try to stash changes
139 await self.stash_changes(
140 f"Auto-stash by agent at {datetime.now().isoformat()}"
141 )
142
143 return Trueagent/) making them easy to identify and clean up later.Commit Generation
Generating meaningful commit messages is an important capability for coding agents. We combine analysis of the changes with LLM generation:
class GitTools:
    # ... (continued)
    # NOTE: these methods belong in the same GitTools class body as the ones
    # shown earlier; the header is repeated only to split the listing.

    async def add_files(
        self,
        paths: Optional[List[str]] = None,
        all_changes: bool = False
    ) -> bool:
        """Stage files for commit.

        Args:
            paths: Paths to stage. Ignored when ``all_changes`` is True.
            all_changes: Stage everything with ``git add -A``.

        Returns:
            True on success.

        Raises:
            Exception: If neither argument selects anything, or git fails.
        """
        if all_changes:
            args = ["add", "-A"]
        elif paths:
            # "--" stops option parsing, so a path that starts with "-" is
            # staged instead of being read as a flag.
            args = ["add", "--", *paths]
        else:
            raise Exception("Must specify paths or all_changes=True")

        success, stdout, stderr = await self._run_git(*args)
        if not success:
            raise Exception(f"Failed to stage files: {stderr}")

        return True

    async def commit(
        self,
        message: str,
        author: Optional[str] = None
    ) -> str:
        """Create a commit and return its abbreviated hash.

        Args:
            message: Commit message passed with ``-m``.
            author: Optional value for ``--author``.

        Returns:
            The first 8 characters of ``git rev-parse HEAD``, or
            ``"unknown"`` when that lookup fails.

        Raises:
            Exception: If ``git commit`` exits with a non-zero status.
        """
        args = ["commit", "-m", message]
        if author:
            args.extend(["--author", author])

        success, stdout, stderr = await self._run_git(*args)
        if not success:
            raise Exception(f"Failed to commit: {stderr}")

        success, hash_out, _ = await self._run_git("rev-parse", "HEAD")
        return hash_out[:8] if success else "unknown"

    async def generate_commit_message(
        self,
        llm_client,
        include_body: bool = True
    ) -> str:
        """Generate a commit message based on staged changes.

        Args:
            llm_client: Object exposing an awaitable ``generate(prompt)``.
            include_body: When False only the first line of the model's
                answer is returned.

        Returns:
            The model's answer with surrounding whitespace stripped.

        Raises:
            Exception: If nothing is staged.
        """
        status = await self.status()
        if not status.staged:
            raise Exception("No staged changes to commit")

        # Only fetch the diff once we know there is something to describe.
        diff_content = await self.diff_content(staged=True)

        changes_summary = [f"- {file}" for file in status.staged]

        # Truncate if too long. This note must stay outside the f-string,
        # otherwise it becomes part of the prompt text.
        diff_excerpt = diff_content[:3000]

        prompt = f"""Generate a git commit message for the following changes.

Files changed:
{chr(10).join(changes_summary)}

Diff:
{diff_excerpt}

Requirements:
1. First line: imperative mood, max 50 characters, no period
2. Second line: blank
3. Body: explain what and why (not how), wrap at 72 characters
4. Use conventional commit format if applicable (feat:, fix:, refactor:, etc.)

Examples of good first lines:
- "Add user authentication middleware"
- "Fix memory leak in cache module"
- "Refactor database connection pooling"

Respond with ONLY the commit message, no explanation."""

        response = await llm_client.generate(prompt)
        message = response.strip()

        if not include_body:
            # Take only first line
            message = message.split("\n")[0]

        return message

    async def create_semantic_commit(
        self,
        commit_type: str,
        scope: Optional[str] = None,
        description: str = "",
        body: Optional[str] = None,
        breaking: bool = False
    ) -> str:
        """Create a commit following conventional commit format.

        The header is ``<type>[(<scope>)][!]: <description>``; ``body`` is
        appended after a blank line. Note that this stages ALL changes
        (``git add -A``) before committing.

        Returns:
            The abbreviated hash returned by :meth:`commit`.
        """
        type_scope = f"{commit_type}({scope})" if scope else commit_type
        if breaking:
            type_scope = f"{type_scope}!"

        message = f"{type_scope}: {description}"
        if body:
            message = f"{message}\n\n{body}"

        await self.add_files(all_changes=True)
        return await self.commit(message)


class CommitMessageGenerator:
    """Specialized commit message generator.

    Combines a rule-based analysis of the changed files with an LLM call.
    """

    COMMIT_TYPES = {
        "feat": "A new feature",
        "fix": "A bug fix",
        "docs": "Documentation only changes",
        "style": "Changes that do not affect the meaning of the code",
        "refactor": "A code change that neither fixes a bug nor adds a feature",
        "perf": "A code change that improves performance",
        "test": "Adding missing tests or correcting existing tests",
        "chore": "Changes to the build process or auxiliary tools",
    }

    # Bucket for files that live at the repository root. "." cannot collide
    # with a real top-level directory name.
    _ROOT_BUCKET = "."

    def __init__(self, llm_client):
        """Store the client; it must expose an awaitable ``generate(prompt)``."""
        self.llm = llm_client

    async def analyze_changes(self, diffs: List["GitDiff"]) -> Dict[str, Any]:
        """Analyze changes to determine commit type and scope.

        Args:
            diffs: Objects exposing ``file``, ``additions`` and ``deletions``.

        Returns:
            A dict with keys ``type`` (defaults to ``"chore"``), ``scope``
            (``None`` unless every file falls into one non-root bucket),
            ``files_by_type``, ``total_additions`` and ``total_deletions``.
        """
        analysis = {
            "type": "chore",  # default
            "scope": None,
            "files_by_type": {},
            "total_additions": 0,
            "total_deletions": 0,
        }
        file_types = analysis["files_by_type"]

        for diff in diffs:
            analysis["total_additions"] += diff.additions
            analysis["total_deletions"] += diff.deletions

            # Categorize by file type/location
            path = diff.file
            if "/test" in path or "test_" in path or "_test." in path:
                bucket = "test"
            elif path.endswith(".md"):
                bucket = "docs"
            else:
                # Top-level directory, or the root bucket for files without
                # one. Root files must be counted too: otherwise a change to
                # README.md plus main.py would look like a docs-only change.
                top_level, has_dir, _ = path.partition("/")
                bucket = top_level if has_dir else self._ROOT_BUCKET
            file_types.setdefault(bucket, []).append(path)

        # Determine type based on file categories
        if "test" in file_types and len(file_types) == 1:
            analysis["type"] = "test"
        elif "docs" in file_types and len(file_types) == 1:
            analysis["type"] = "docs"
        elif analysis["total_additions"] > analysis["total_deletions"] * 2:
            analysis["type"] = "feat"  # Likely new feature
        elif analysis["total_deletions"] > analysis["total_additions"] * 2:
            analysis["type"] = "refactor"  # Likely cleanup

        # Determine scope: only meaningful when a single named bucket is hit.
        if len(file_types) == 1:
            only_bucket = next(iter(file_types))
            if only_bucket != self._ROOT_BUCKET:
                analysis["scope"] = only_bucket

        return analysis

183 async def generate(
184 self,
185 diffs: List[GitDiff],
186 diff_content: str,
187 context: str = None
188 ) -> str:
189 """Generate a complete commit message."""
190 analysis = await self.analyze_changes(diffs)
191
192 prompt = f"""Generate a git commit message.
193
194Change Analysis:
195- Type: {analysis['type']}
196- Scope: {analysis['scope'] or 'none'}
197- Additions: {analysis['total_additions']} lines
198- Deletions: {analysis['total_deletions']} lines
199
200Files:
201{chr(10).join(f'- {d.file} (+{d.additions}/-{d.deletions})' for d in diffs)}
202
203Diff excerpt:
204{diff_content[:2000]}
205
206{f'Additional context: {context}' if context else ''}
207
208Generate a conventional commit message:
209<type>(<scope>): <short description>
210
211<body explaining what changed and why>
212
213Use imperative mood. Be concise but complete."""
214
215 return await self.llm.generate(prompt)Diff Analysis
Understanding diffs helps the agent verify its changes and identify potential issues:
1from typing import List, Dict, Any, Tuple
2import re
3
4
class DiffAnalyzer:
    """Analyze git diffs for the coding agent."""

    # "@@ -<old_start>[,<old_count>] +<new_start>[,<new_count>] @@ <header>"
    _HUNK_HEADER = re.compile(
        r"@@ -(\d+)(?:,(\d+))? \+(\d+)(?:,(\d+))? @@(.*)"
    )

    def __init__(self, llm_client=None):
        """Store the optional LLM client used by :meth:`review_diff`."""
        self.llm = llm_client

    def parse_unified_diff(self, diff_content: str) -> List[Dict[str, Any]]:
        """Parse a unified diff into structured data.

        Files are delimited by ``diff --git`` lines.

        Returns:
            One dict per file with keys ``path``, ``hunks``, ``additions``
            and ``deletions``. Each hunk has ``old_start``, ``old_count``,
            ``new_start``, ``new_count``, ``header`` and ``lines``, where
            every line is ``{"type": "add" | "del" | "ctx", "content": ...}``
            with the leading marker character removed.
        """
        files = []
        current_file = None
        current_hunk = None

        for line in diff_content.split("\n"):
            # New file
            if line.startswith("diff --git"):
                if current_file:
                    files.append(current_file)
                # Anchor on " b/" so a "b/" directory inside the a/ path is
                # not mistaken for the start of the new path.
                match = re.search(r" b/(.+)$", line)
                current_file = {
                    "path": match.group(1) if match else "unknown",
                    "hunks": [],
                    "additions": 0,
                    "deletions": 0,
                }
                current_hunk = None

            # File header. Only recognised before the first hunk of a file:
            # inside a hunk, "--- x" / "+++ x" are a removed "-- x" or an
            # added "++ x" line and must be counted.
            elif current_hunk is None and line.startswith(("--- ", "+++ ")):
                continue

            # Hunk header
            elif line.startswith("@@"):
                match = self._HUNK_HEADER.match(line)
                if match:
                    current_hunk = {
                        "old_start": int(match.group(1)),
                        # An omitted count means 1.
                        "old_count": int(match.group(2) or 1),
                        "new_start": int(match.group(3)),
                        "new_count": int(match.group(4) or 1),
                        "header": match.group(5).strip(),
                        "lines": [],
                    }
                    if current_file:
                        current_file["hunks"].append(current_hunk)

            # Diff lines
            elif current_hunk is not None:
                if line.startswith("+"):
                    current_hunk["lines"].append({"type": "add", "content": line[1:]})
                    if current_file:
                        current_file["additions"] += 1
                elif line.startswith("-"):
                    current_hunk["lines"].append({"type": "del", "content": line[1:]})
                    if current_file:
                        current_file["deletions"] += 1
                elif line.startswith(" "):
                    current_hunk["lines"].append({"type": "ctx", "content": line[1:]})

        if current_file:
            files.append(current_file)

        return files

    def summarize_changes(self, parsed_diff: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Generate a summary of the changes.

        Args:
            parsed_diff: Output of :meth:`parse_unified_diff`.

        Returns:
            A dict with ``files_changed``, ``total_additions``,
            ``total_deletions``, ``files`` (per-file info including the
            classification from :meth:`_classify_change`) and ``by_type``
            (classification -> list of paths).
        """
        summary = {
            "files_changed": len(parsed_diff),
            "total_additions": sum(f["additions"] for f in parsed_diff),
            "total_deletions": sum(f["deletions"] for f in parsed_diff),
            "files": [],
            "by_type": {},
        }

        for file in parsed_diff:
            change_type = self._classify_change(file)
            summary["files"].append({
                "path": file["path"],
                "additions": file["additions"],
                "deletions": file["deletions"],
                "type": change_type,
            })
            # Group by change type
            summary["by_type"].setdefault(change_type, []).append(file["path"])

        return summary

    def _classify_change(self, file: Dict[str, Any]) -> str:
        """Classify the type of change made to a file.

        Returns one of ``add_only``, ``delete_only``, ``mostly_additions``
        (more than 3x as many additions as deletions), ``mostly_deletions``
        (the reverse) or ``mixed``.
        """
        additions, deletions = file["additions"], file["deletions"]
        if additions > 0 and deletions == 0:
            return "add_only"
        if deletions > 0 and additions == 0:
            return "delete_only"
        if additions > deletions * 3:
            return "mostly_additions"
        if deletions > additions * 3:
            return "mostly_deletions"
        return "mixed"

    async def review_diff(
        self,
        diff_content: str,
        context: str = None
    ) -> Dict[str, Any]:
        """Use LLM to review a diff for potential issues.

        Args:
            diff_content: Diff text; only the first 4000 characters are sent.
            context: Optional description added to the prompt.

        Returns:
            The model's JSON answer parsed into a dict. When no client is
            configured, ``{"error": ...}``. When the answer is not valid
            JSON, an empty review marked ``needs_discussion`` that carries
            the raw answer under ``raw_response``.
        """
        import json

        if not self.llm:
            return {"error": "LLM client not configured"}

        prompt = f"""Review this code diff for potential issues.

{f'Context: {context}' if context else ''}

Diff:
{diff_content[:4000]}

Check for:
1. Obvious bugs or logic errors
2. Security vulnerabilities
3. Performance issues
4. Missing error handling
5. Incomplete changes (e.g., missing imports, undefined variables)
6. Style inconsistencies

Respond in JSON format:
{{
  "issues": [
    {{"severity": "high|medium|low", "description": "...", "line": optional_line_number}}
  ],
  "suggestions": ["..."],
  "overall_assessment": "approve|request_changes|needs_discussion"
}}"""

        response = await self.llm.generate(prompt)

        try:
            return json.loads(response)
        except (ValueError, TypeError):
            # Not JSON (or not text at all): hand the raw answer back
            # instead of failing the review.
            return {
                "issues": [],
                "suggestions": [],
                "overall_assessment": "needs_discussion",
                "raw_response": response,
            }

159 def check_for_common_issues(
160 self,
161 parsed_diff: List[Dict[str, Any]]
162 ) -> List[Dict[str, Any]]:
163 """Check diff for common programming issues."""
164 issues = []
165
166 for file in parsed_diff:
167 for hunk in file["hunks"]:
168 for i, line in enumerate(hunk["lines"]):
169 if line["type"] != "add":
170 continue
171
172 content = line["content"]
173
174 # Check for debug statements
175 if re.search(r"\b(console\.log|print|debugger|pdb\.set_trace)\b", content):
176 issues.append({
177 "file": file["path"],
178 "type": "debug_statement",
179 "severity": "low",
180 "content": content.strip()
181 })
182
183 # Check for TODO/FIXME
184 if re.search(r"\b(TODO|FIXME|HACK|XXX)\b", content, re.IGNORECASE):
185 issues.append({
186 "file": file["path"],
187 "type": "todo_marker",
188 "severity": "info",
189 "content": content.strip()
190 })
191
192 # Check for hardcoded credentials (simplified)
193 if re.search(r"(password|secret|api_key)\s*=\s*['"][^'"]+['"]", content, re.IGNORECASE):
194 issues.append({
195 "file": file["path"],
196 "type": "hardcoded_secret",
197 "severity": "high",
198 "content": "[REDACTED]"
199 })
200
201 return issuesSafe Git Operations
Implementing safety guards ensures the agent doesn't accidentally cause data loss or push unwanted changes:
1from typing import Callable, Optional
2from enum import Enum
3
4
class GitSafetyGuard:
    """Safety layer for git operations.

    Wraps a ``GitTools`` instance and reports outcomes as
    ``(success, message)`` tuples.
    """

    # Values accepted for ``git reset --<mode>``.
    _RESET_MODES = frozenset({"soft", "mixed", "hard", "merge", "keep"})

    def __init__(
        self,
        git_tools: "GitTools",
        on_approval_needed: Callable[[str, str], bool] = None,
        llm_client=None
    ):
        """Create the guard.

        Args:
            git_tools: The git wrapper that performs the actual commands.
            on_approval_needed: Called as ``(operation, details)`` and must
                return True to allow the operation. Defaults to a handler
                that always refuses.
            llm_client: Optional client used by :meth:`safe_commit` to write
                a commit message when none is supplied.
        """
        self.git = git_tools
        self.on_approval_needed = on_approval_needed or self._default_approval
        self.llm = llm_client

    def _default_approval(self, operation: str, details: str) -> bool:
        """Default approval handler (always denies)."""
        return False

    async def safe_checkout(self, branch: str) -> Tuple[bool, str]:
        """Safely checkout a branch, stashing changes if needed.

        Pending changes are stashed first. If the checkout then fails, only
        a stash made by this call is popped again, so an older, unrelated
        stash is never applied by accident.
        """
        status = await self.git.status()

        stashed = False
        if status.has_changes:
            stashed = await self.git.stash_changes(
                f"Auto-stash before checkout to {branch}"
            )
            if not stashed:
                return False, "Could not stash local changes before checkout"

        try:
            await self.git.checkout_branch(branch)
        except Exception as e:
            message = str(e)
            if stashed:
                # Best effort: put the user's changes back.
                try:
                    restored = await self.git.pop_stash()
                except Exception:
                    restored = False
                if not restored:
                    message += " (stashed changes could not be restored automatically)"
            return False, message

        return True, f"Checked out {branch}"

    async def safe_commit(
        self,
        message: str = None,
        files: List[str] = None
    ) -> Tuple[bool, str]:
        """Safely create a commit with validation.

        Args:
            message: Commit message. When omitted, one is generated with the
                configured LLM client.
            files: Paths to stage. When omitted, modified and deleted files
                are staged and untracked files are left alone.

        Returns:
            ``(True, "Created commit <hash>")`` or ``(False, reason)``.
            Staging, message generation and commit errors are all reported
            through the tuple rather than raised.
        """
        status = await self.git.status()
        if not status.has_changes:
            return False, "No changes to commit"

        try:
            # Stage files
            if files:
                await self.git.add_files(files)
            else:
                # Stage all modified files (not untracked)
                files_to_stage = status.modified + status.deleted
                if files_to_stage:
                    await self.git.add_files(files_to_stage)

            # Generate message if not provided
            if not message:
                llm = self.llm or getattr(self.git, "llm", None)
                if llm is None:
                    return False, "No commit message provided and no LLM client configured"
                message = await self.git.generate_commit_message(llm)

            commit_hash = await self.git.commit(message)
        except Exception as e:
            return False, str(e)

        return True, f"Created commit {commit_hash}"

    async def safe_push(
        self,
        remote: str = "origin",
        branch: str = None
    ) -> Tuple[bool, str]:
        """Request approval before pushing.

        Args:
            remote: Remote name.
            branch: Branch to push; defaults to the current branch.

        Returns:
            ``(True, ...)`` after a successful push; otherwise ``(False,
            reason)``: not approved, no branch to push, or git's stderr.
        """
        branch = branch or await self.git.current_branch()
        if not branch:
            return False, "Cannot push: no branch is checked out"

        # NOTE: this counts the most recent commits (at most 5), not the
        # commits the remote is actually missing.
        commits = await self.git.log(count=5)
        details = f"Push {len(commits)} commits to {remote}/{branch}"

        if not self.on_approval_needed("push", details):
            return False, "Push not approved"

        success, stdout, stderr = await self.git._run_git(
            "push", remote, branch
        )
        if success:
            return True, f"Pushed to {remote}/{branch}"
        return False, stderr

    async def safe_reset(
        self,
        target: str = "HEAD~1",
        mode: str = "soft"
    ) -> Tuple[bool, str]:
        """Safely reset, creating a backup branch first.

        The backup branch points at the commit that was checked out before
        the reset. It preserves committed history only: uncommitted changes
        discarded by a ``hard`` reset are NOT recoverable from it.

        Args:
            target: Revision to reset to.
            mode: One of ``soft``, ``mixed``, ``hard``, ``merge``, ``keep``.
        """
        if mode not in self._RESET_MODES:
            return False, f"Unsupported reset mode: {mode}"

        # An empty name means no branch is checked out; use a placeholder so
        # the backup name has no empty path component.
        current = await self.git.current_branch()
        current = current or "detached"
        stamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        # Sanitize here so the name reported back is the one actually created.
        backup_branch = self.git._sanitize_branch_name(
            f"{self.git.BRANCH_PREFIX}backup/{current}/{stamp}"
        )

        # Create backup branch
        try:
            await self.git.create_branch(backup_branch, checkout=False)
        except Exception as e:
            return False, f"Could not create backup branch: {e}"

        # Perform reset
        success, stdout, stderr = await self.git._run_git(
            "reset", f"--{mode}", target
        )
        if success:
            return True, f"Reset to {target}. Backup at {backup_branch}"
        return False, stderr

    async def undo_last_commit(self) -> Tuple[bool, str]:
        """Undo the last commit, keeping changes staged."""
        return await self.safe_reset("HEAD~1", "soft")

    async def discard_changes(self, paths: List[str]) -> Tuple[bool, str]:
        """Discard changes to specific files.

        Every path must resolve to a location inside the workspace. Paths
        that no longer exist on disk are allowed, so a deleted tracked file
        can be restored.
        """
        if not paths:
            return False, "No paths specified"

        # Validate paths are in workspace
        root = Path(self.git.workspace).resolve()
        for path in paths:
            target = (root / path).resolve()
            if target != root and root not in target.parents:
                return False, f"Path is outside the workspace: {path}"

        success, stdout, stderr = await self.git._run_git(
            "checkout", "--", *paths
        )
        if success:
            return True, f"Discarded changes to {len(paths)} files"
        return False, stderr


class GitWorkflow:
    """High-level git workflows for the coding agent."""

    def __init__(self, git_tools: "GitTools", safety_guard: "GitSafetyGuard"):
        """Store the low-level tools and the safety layer."""
        self.git = git_tools
        self.safety = safety_guard

    async def start_task(self, task_id: str) -> str:
        """Start a new task by creating a feature branch.

        Pending changes are stashed first. The branch is created from
        whatever is currently checked out; this method does not switch to
        main/master beforehand.

        Returns:
            The name of the branch that is checked out afterwards. This is
            read back from git because ``create_branch`` may prefix and
            sanitize the requested ``task/<task_id>`` name.
        """
        branch_name = f"task/{task_id}"

        status = await self.git.status()
        if status.has_changes:
            await self.git.stash_changes(f"Pre-task stash for {task_id}")

        # Create and checkout the branch
        await self.git.create_branch(branch_name)

        actual_name = await self.git.current_branch()
        return actual_name or branch_name

    async def checkpoint(self, message: str = None) -> str:
        """Create a work-in-progress commit.

        Stages all changes and commits them as ``wip: <message>``; without a
        message a timestamped default is used.

        Returns:
            A human-readable result string.
        """
        status = await self.git.status()
        if not status.has_changes:
            return "No changes to checkpoint"

        # Stage all changes
        await self.git.add_files(all_changes=True)

        # The "wip: " prefix is added exactly once, below.
        commit_msg = message or f"checkpoint at {datetime.now().isoformat()}"
        commit_hash = await self.git.commit(f"wip: {commit_msg}")

        return f"Checkpoint created: {commit_hash}"

182 async def finish_task(
183 self,
184 task_id: str,
185 final_message: str = None
186 ) -> Dict[str, Any]:
187 """Complete a task with a final commit."""
188 # Ensure all changes are committed
189 status = await self.git.status()
190
191 if status.has_changes:
192 await self.git.add_files(all_changes=True)
193
194 if final_message:
195 await self.git.commit(final_message)
196 else:
197 message = await self.git.generate_commit_message(self.git.llm)
198 await self.git.commit(message)
199
200 # Get summary of work done
201 branch = await self.git.current_branch()
202 commits = await self.git.log(count=20)
203
204 return {
205 "branch": branch,
206 "commits": len(commits),
207 "status": "completed"
208 }git push without explicit user approval. Remote operations should always require human confirmation.Summary
In this section, we implemented comprehensive Git integration for our coding agent:
- Git Operations: Status, diff, log, branch management, and commit operations with proper parsing
- Branch Management: Creating, switching, and deleting branches with safety prefixes for agent work
- Commit Generation: LLM-powered commit message generation following conventional commit format
- Diff Analysis: Parsing and analyzing changes, detecting common issues automatically
- Safety Guards: Protection against data loss, approval workflows for remote operations
In the next section, we'll explore test-driven development patterns for coding agents, enabling them to write tests and verify their own work.