Introduction
Now let's build a production-quality ReAct agent with advanced features: robust error recovery, conversation memory, tool validation, and streaming output.
From Prototype to Production: The basic ReAct implementation works, but production agents need error handling, memory, and resilience. This section adds those capabilities.
Enhanced Architecture
🐍enhanced_architecture.py
1from dataclasses import dataclass, field
2from typing import Callable, Any
3from enum import Enum
4import time
5
class AgentStatus(Enum):
    """Lifecycle states reported for an agent run."""

    RUNNING = "running"
    # Reported when the agent reaches its "finish" action.
    COMPLETED = "completed"
    # Reported on timeout, step exhaustion, or an unhandled exception.
    FAILED = "failed"
    WAITING = "waiting"
11
@dataclass
class AgentConfig:
    """Limits and switches that govern one agent run."""

    # Maximum number of thought/action iterations before the run is failed.
    max_steps: int = 15
    # Number of attempts allowed for a single tool call.
    max_retries_per_step: int = 3
    # Per-step budget. NOTE(review): not read by the code in this chapter;
    # presumably seconds, like total_timeout - confirm before relying on it.
    step_timeout: int = 60
    # Whole-run budget, compared against time.time() differences (seconds).
    total_timeout: int = 300
    # When true, each step's thought, action and observation are printed.
    verbose: bool = True
19
@dataclass
class Tool:
    """A named callable the agent can invoke, with its parameter metadata."""

    name: str
    description: str
    function: Callable[..., str]
    parameters: dict[str, dict]
    required_params: list[str] = field(default_factory=list)

    def validate_params(self, params: dict) -> tuple[bool, str]:
        """Check that every required parameter was supplied.

        Returns:
            ``(True, "Valid")`` when nothing is missing, otherwise
            ``(False, message)`` naming the first missing parameter in
            ``required_params`` order.
        """
        absent = [key for key in self.required_params if key not in params]
        if absent:
            return False, f"Missing required parameter: {absent[0]}"
        return True, "Valid"

    def execute(self, **params) -> str:
        """Validate ``params`` and call the wrapped function.

        Never raises: validation problems and exceptions thrown by the
        function are both reported as strings starting with ``"Error"``.
        """
        ok, reason = self.validate_params(params)
        if not ok:
            return f"Error: {reason}"

        try:
            outcome = self.function(**params)
        except Exception as exc:
            return f"Error executing {self.name}: {exc!s}"
        else:
            return outcome
45
@dataclass
class Step:
    """Record of one thought/action/observation iteration."""

    # Reasoning text parsed from the model output.
    thought: str
    # Name of the tool that was requested.
    action: str
    # Keyword arguments parsed for that tool.
    action_params: dict
    # Text returned by the tool (or an error description).
    observation: str
    # False when the tool was unknown or its result was treated as an error.
    success: bool
    # Wall-clock time spent on the step, as a time.time() difference.
    duration: float
    # Creation time of the record; defaults to the current time.time().
    timestamp: float = field(default_factory=time.time)
55
@dataclass
class AgentResult:
    """Outcome of a complete agent run."""

    # Final state of the run.
    status: AgentStatus
    # Final answer, or None when the run did not complete.
    answer: str | None
    # Every step that was recorded, in execution order.
    steps: list[Step]
    # Wall-clock time for the whole run, as a time.time() difference.
    total_duration: float
    # Failure description; left as None when nothing went wrong.
    error: str | None = None


# Error Recovery
🐍error_recovery.py
class ErrorRecovery:
    """Turn an error message into a recovery plan.

    Known failure kinds are recognised by a case-insensitive substring match
    and handled by a dedicated method; anything else is sent to the LLM.
    Every plan is a ``dict``; the built-in handlers always provide the keys
    ``recoverable``, ``strategy``, ``suggestion`` and ``retry_action``, and
    some add ``delay``.
    """

    def __init__(self, llm):
        """Store the LLM client (must expose ``generate(prompt) -> str``)."""
        self.llm = llm
        # Checked in insertion order; the first pattern found in the error wins.
        self.error_patterns = {
            "file not found": self.handle_file_not_found,
            "permission denied": self.handle_permission_denied,
            "timeout": self.handle_timeout,
            "connection": self.handle_connection_error,
            "rate limit": self.handle_rate_limit,
        }

    def analyze_error(self, error: str, context: dict) -> dict:
        """Return a recovery plan for ``error``.

        Args:
            error: The error text to classify.
            context: Run information; the handlers read ``last_action`` and
                the LLM fallback also reads ``task`` and ``history``.
        """
        error_lower = error.lower()

        for pattern, handler in self.error_patterns.items():
            if pattern in error_lower:
                return handler(error, context)

        # Use LLM for unknown errors
        return self.llm_analyze_error(error, context)

    def handle_file_not_found(
        self,
        error: str,
        context: dict,
    ) -> dict:
        """Suggest listing the current directory to find an alternative."""
        return {
            "recoverable": True,
            "strategy": "find_alternative",
            "suggestion": (
                "File not found. Try listing directory contents "
                "or searching for similar files."
            ),
            "retry_action": {
                "name": "list_files",
                "params": {"directory": "."},
            },
        }

    def handle_permission_denied(
        self,
        error: str,
        context: dict,
    ) -> dict:
        """Escalate permission errors; no automatic retry is offered."""
        return {
            "recoverable": True,
            "strategy": "escalate",
            "suggestion": (
                "Permission denied. Ask user for elevated "
                "permissions or try alternative approach."
            ),
            "retry_action": None,  # Needs user intervention
        }

    def handle_timeout(
        self,
        error: str,
        context: dict,
    ) -> dict:
        """Advise a smaller scope after a timeout; no automatic retry."""
        return {
            "recoverable": True,
            "strategy": "retry_with_smaller_scope",
            "suggestion": (
                "Operation timed out. Try with smaller input "
                "or break into smaller steps."
            ),
            "retry_action": None,
        }

    def handle_connection_error(
        self,
        error: str,
        context: dict,
    ) -> dict:
        """Retry the last action after a short delay."""
        return {
            "recoverable": True,
            "strategy": "retry",
            "suggestion": "Connection failed. Will retry after delay.",
            "delay": 2,
            "retry_action": context.get("last_action"),
        }

    def handle_rate_limit(
        self,
        error: str,
        context: dict,
    ) -> dict:
        """Retry the last action after a longer back-off delay."""
        return {
            "recoverable": True,
            "strategy": "backoff",
            "suggestion": "Rate limited. Waiting before retry.",
            "delay": 10,
            "retry_action": context.get("last_action"),
        }

    def llm_analyze_error(
        self,
        error: str,
        context: dict,
    ) -> dict:
        """Ask the LLM for a plan when no built-in pattern matches.

        Returns the JSON object from the reply, or an "abort" plan when the
        reply contains no JSON object. Exceptions raised by
        ``self.llm.generate`` itself are not caught here.
        """
        prompt = f"""
Analyze this error and suggest recovery:

Error: {error}

Context:
- Task: {context.get('task')}
- Last action: {context.get('last_action')}
- History length: {len(context.get('history', []))}

Provide:
1. Is this recoverable? (yes/no)
2. What strategy should we use?
3. What should we try next?

Response (JSON):
"""
        response = self.llm.generate(prompt)
        plan = self._parse_plan(response)
        if plan is not None:
            return plan
        return {
            "recoverable": False,
            "strategy": "abort",
            "suggestion": "Unable to analyze error",
        }

    @staticmethod
    def _parse_plan(response):
        """Extract a JSON object from an LLM reply, or return None."""
        import json

        candidates = [response]
        if isinstance(response, str):
            # Models often wrap JSON in prose or code fences, so also try the
            # outermost {...} span.
            start, end = response.find("{"), response.rfind("}")
            if 0 <= start < end:
                candidates.append(response[start:end + 1])

        for text in candidates:
            try:
                plan = json.loads(text)
            except (TypeError, ValueError):  # JSONDecodeError is a ValueError
                continue
            # Callers use plan.get(...), so only a JSON object is acceptable.
            if isinstance(plan, dict):
                return plan
        return None


# Adding Memory
🐍agent_memory.py
1from dataclasses import dataclass, field
2from typing import Optional
3import json
4
@dataclass
class ConversationMemory:
    """Short-term memory for the current conversation.

    Holds at most ``max_messages`` entries. When the limit is exceeded the
    first message is kept and the oldest of the remaining ones are dropped.
    """

    messages: list[dict] = field(default_factory=list)
    max_messages: int = 50

    def add(self, role: str, content: str) -> None:
        """Append a message, trimming the history if it grew past the limit."""
        self.messages.append({"role": role, "content": content})
        if len(self.messages) > self.max_messages:
            # Keep first (system) and recent messages. The tail size is
            # computed explicitly because a slice of [-0:] would select the
            # whole list and duplicate the first message.
            tail_size = max(self.max_messages - 1, 0)
            tail = self.messages[-tail_size:] if tail_size else []
            self.messages = self.messages[:1] + tail

    def get_context(self, n_recent: int = 10) -> list[dict]:
        """Return the ``n_recent`` newest messages (empty when ``n_recent <= 0``)."""
        if n_recent <= 0:
            # messages[-0:] would return everything rather than nothing.
            return []
        return self.messages[-n_recent:]

    def summarize(self, llm) -> str:
        """Summarize conversation so far.

        Returns an empty string without calling the LLM when fewer than five
        messages are stored; otherwise returns ``llm.generate(prompt)``.
        """
        if len(self.messages) < 5:
            return ""

        prompt = f"""
Summarize this conversation briefly:

{json.dumps(self.messages, indent=2)}

Summary:
"""
        return llm.generate(prompt)
33
34
@dataclass
class WorkingMemory:
    """Notes collected while working on the current task."""

    task: str = ""
    findings: list[str] = field(default_factory=list)
    hypotheses: list[str] = field(default_factory=list)
    decisions: list[str] = field(default_factory=list)

    def add_finding(self, finding: str) -> None:
        """Record a finding."""
        self.findings.append(finding)

    def add_hypothesis(self, hypothesis: str) -> None:
        """Record a hypothesis."""
        self.hypotheses.append(hypothesis)

    def add_decision(self, decision: str) -> None:
        """Record a decision."""
        self.decisions.append(decision)

    def to_context(self) -> str:
        """Render the task and the newest notes as prompt text.

        Shows the last 5 findings, last 3 hypotheses and last 3 decisions;
        a section is omitted entirely when its list is empty.
        """
        sections = (
            ("Findings", self.findings, 5),
            ("Hypotheses", self.hypotheses, 3),
            ("Decisions", self.decisions, 3),
        )
        pieces = [f"Task: {self.task}\n"]
        for title, entries, limit in sections:
            if not entries:
                continue
            pieces.append(f"\n{title}:\n")
            pieces.extend(f"- {entry}\n" for entry in entries[-limit:])
        return "".join(pieces)
71
72
class AgentMemory:
    """Complete memory system for agent.

    Combines conversation memory, working memory and two lists of remembered
    action outcomes (successes and failures).
    """

    # Punctuation trimmed from the ends of words before they are compared.
    _WORD_EDGE_CHARS = ".,;:!?\"'()[]{}"

    def __init__(self, llm):
        self.llm = llm
        self.conversation = ConversationMemory()
        self.working = WorkingMemory()
        self.successful_patterns: list[dict] = []
        self.failed_patterns: list[dict] = []

    def record_success(
        self,
        action: str,
        context: str,
        result: str,
    ) -> None:
        """Record successful pattern (context and result capped at 200 chars)."""
        self.successful_patterns.append({
            "action": action,
            "context": context[:200],
            "result": result[:200],
        })

    def record_failure(
        self,
        action: str,
        context: str,
        error: str,
    ) -> None:
        """Record failed pattern (context and error capped at 200 chars)."""
        self.failed_patterns.append({
            "action": action,
            "context": context[:200],
            "error": error[:200],
        })

    def get_relevant_patterns(self, context: str) -> str:
        """Get patterns relevant to current context.

        A stored pattern is relevant when its context shares at least one
        whole word (case-insensitive, edge punctuation ignored) with
        ``context``. Only the 10 newest patterns of each kind are examined
        and at most 3 of each kind are reported. Returns an empty string when
        nothing matches.
        """
        # Whole-word comparison: a substring test would let a stored word
        # such as "a" match nearly every context.
        wanted = self._keywords(context)

        def related(patterns: list[dict]) -> list[dict]:
            return [
                entry
                for entry in patterns[-10:]
                if wanted & self._keywords(entry["context"])
            ]

        lines: list[str] = []

        hits = related(self.successful_patterns)
        if hits:
            lines.append("\nSuccessful patterns to consider:\n")
            lines.extend(f"- {p['action']}: {p['result'][:50]}\n" for p in hits[:3])

        misses = related(self.failed_patterns)
        if misses:
            lines.append("\nPatterns to avoid:\n")
            lines.extend(f"- {p['action']}: {p['error'][:50]}\n" for p in misses[:3])

        return "".join(lines)

    @classmethod
    def _keywords(cls, text: str) -> set[str]:
        """Split ``text`` into lower-cased words without edge punctuation."""
        trimmed = (word.strip(cls._WORD_EDGE_CHARS) for word in text.lower().split())
        return {word for word in trimmed if word}


# Complete Enhanced Agent
🐍enhanced_react_agent.py
1import anthropic
2import time
3import re
4from dataclasses import dataclass
5
class EnhancedReActAgent:
    """Production-ready ReAct agent.

    Each iteration asks the model for a thought plus one tool call, executes
    the tool with retries and error recovery, and records the step. The run
    ends on the ``finish`` action, when ``max_steps`` is used up, or when
    ``total_timeout`` is exceeded.

    Types defined in the other listings (``Tool``, ``AgentConfig``, ``Step``,
    ``AgentResult``) are written as quoted annotations.
    """

    # Model and reply budget used for every LLM request made by the agent.
    _MODEL = "claude-sonnet-4-20250514"
    _MAX_TOKENS = 1024

    def __init__(
        self,
        tools: "list[Tool]",
        config: "AgentConfig | None" = None,
    ):
        """Create the agent.

        Args:
            tools: Tools the agent may call; indexed by their ``name``.
            config: Run limits; a default ``AgentConfig`` is used when omitted.
        """
        self.client = anthropic.Anthropic()
        self.tools = {t.name: t for t in tools}
        self.config = config or AgentConfig()
        # Both helpers call ``llm.generate(prompt)``; the agent provides it.
        self.memory = AgentMemory(self)
        self.error_recovery = ErrorRecovery(self)

    def generate(self, prompt: str) -> str:
        """Send ``prompt`` as a single user message and return the reply text.

        This is the ``generate`` interface the memory and error-recovery
        helpers expect from the object passed to them as ``llm``.
        """
        response = self.client.messages.create(
            model=self._MODEL,
            max_tokens=self._MAX_TOKENS,
            messages=[{"role": "user", "content": prompt}],
        )
        return response.content[0].text

    def run(self, task: str) -> "AgentResult":
        """Run agent on task with full error handling.

        Never raises: any exception escaping the loop is reported as a FAILED
        result carrying the steps recorded so far.
        """
        start_time = time.time()
        steps = []
        self.memory.working.task = task

        try:
            return self._run_loop(task, steps, start_time)
        except Exception as e:
            return AgentResult(
                status=AgentStatus.FAILED,
                answer=None,
                steps=steps,
                total_duration=time.time() - start_time,
                error=str(e),
            )

    def _run_loop(
        self,
        task: str,
        steps: "list[Step]",
        start_time: float,
    ) -> "AgentResult":
        """Main agent loop; appends each executed step to ``steps``."""
        for step_num in range(1, self.config.max_steps + 1):
            # Check total timeout
            if time.time() - start_time > self.config.total_timeout:
                return AgentResult(
                    status=AgentStatus.FAILED,
                    answer=None,
                    steps=steps,
                    total_duration=time.time() - start_time,
                    error="Total timeout exceeded",
                )

            step_start = time.time()

            if self.config.verbose:
                print(f"\n{'='*50}")
                print(f"Step {step_num}")
                print('='*50)

            # Generate thought and action
            thought, action_name, action_params = self._generate_step(
                task, steps
            )

            if self.config.verbose:
                print(f"Thought: {thought}")
                print(f"Action: {action_name}({action_params})")

            # Execute with retry
            observation, success = self._execute_with_retry(
                action_name,
                action_params,
                {"task": task, "history": steps},
            )

            if self.config.verbose:
                print(f"Observation: {observation[:300]}...")

            # Record step
            steps.append(
                Step(
                    thought=thought,
                    action=action_name,
                    action_params=action_params,
                    observation=observation,
                    success=success,
                    duration=time.time() - step_start,
                )
            )

            # Update memory
            if success:
                self.memory.record_success(action_name, thought, observation)
            else:
                self.memory.record_failure(action_name, thought, observation)

            # Check for completion
            if action_name == "finish":
                return AgentResult(
                    status=AgentStatus.COMPLETED,
                    answer=action_params.get("answer", observation),
                    steps=steps,
                    total_duration=time.time() - start_time,
                )

        # Max steps reached
        return AgentResult(
            status=AgentStatus.FAILED,
            answer=None,
            steps=steps,
            total_duration=time.time() - start_time,
            error="Max steps reached",
        )

    def _generate_step(
        self,
        task: str,
        history: "list[Step]",
    ) -> tuple[str, str, dict]:
        """Generate next thought and action as ``(thought, action, params)``."""
        prompt = self._build_prompt(task, history)
        return self._parse_output(self.generate(prompt))

    def _build_prompt(self, task: str, history: "list[Step]") -> str:
        """Build prompt with tool list, memory context and step history."""
        # Tool descriptions
        tools_desc = "\n".join(
            f"- {name}: {tool.description}"
            for name, tool in self.tools.items()
        )

        # History
        history_str = "".join(
            f"Thought: {step.thought}\n"
            f"Action: {step.action}({step.action_params})\n"
            f"Observation: {step.observation}\n\n"
            for step in history
        )

        # Working memory
        working_context = self.memory.working.to_context()

        # Relevant patterns
        patterns = self.memory.get_relevant_patterns(task)

        return f'''You solve tasks by reasoning and taking actions.

Tools available:
{tools_desc}

Format:
Thought: [Your reasoning]
Action: tool_name(param="value")

{working_context}
{patterns}

Task: {task}

{history_str}Thought:'''

    def _parse_output(
        self,
        output: str,
    ) -> tuple[str, str, dict]:
        """Parse LLM output into ``(thought, action_name, params)``.

        When no ``Action: name(...)`` is found, the action defaults to
        ``finish`` with the answer "Could not determine action".
        """
        # Extract thought
        thought_match = re.search(
            r"Thought:\s*(.+?)(?=Action:|$)",
            output,
            re.DOTALL,
        )
        if thought_match:
            thought = thought_match.group(1).strip()
        else:
            # The prompt already ends with "Thought:", so the reply normally
            # continues the thought without repeating the label.
            thought = output.split("Action:", 1)[0].strip()

        # Extract action
        action_match = re.search(r"Action:\s*(\w+)\((.*)\)", output)
        if not action_match:
            # Default to asking for clarification
            return thought, "finish", {"answer": "Could not determine action"}

        action_name, args_str = action_match.group(1), action_match.group(2)

        # Parse params. The backreference makes the closing quote match the
        # opening one, so an apostrophe inside "..." no longer ends the value.
        params = {
            key: value
            for key, _quote, value in re.findall(
                r'(\w+)\s*=\s*(["\'])(.*?)\2', args_str
            )
        }

        return thought, action_name, params

    def _execute_with_retry(
        self,
        action_name: str,
        params: dict,
        context: dict,
    ) -> tuple[str, bool]:
        """Execute action with retry on failure.

        Makes up to ``config.max_retries_per_step`` attempts. A result whose
        text contains "error" is passed to the error-recovery helper and the
        call is repeated only when the plan has a ``retry_action``.

        Returns:
            ``(observation, success)``.
        """
        tool = self.tools.get(action_name)
        if not tool:
            return f"Unknown tool: {action_name}", False

        max_attempts = self.config.max_retries_per_step
        for attempt in range(max_attempts):
            is_last_attempt = attempt == max_attempts - 1
            try:
                result = tool.execute(**params)

                # Check if result indicates error
                if "error" not in result.lower():
                    return result, True

                if is_last_attempt:
                    # No attempts left, so a recovery plan could not be used.
                    return result, False

                recovery = self.error_recovery.analyze_error(
                    result,
                    {**context, "last_action": (action_name, params)},
                )
                if not recovery.get("retry_action"):
                    return result, False

                # Wait only when another attempt actually follows.
                if recovery.get("delay"):
                    time.sleep(recovery["delay"])

            except Exception as e:
                if is_last_attempt:
                    return f"Error after {attempt + 1} attempts: {e}", False
                time.sleep(1)  # Brief delay before retry

        return "Max retries exceeded", False


# Using the Enhanced Agent
🐍usage_example.py
def _read_file(path: str) -> str:
    """Return the contents of ``path``, closing the file afterwards."""
    with open(path) as handle:
        return handle.read()


# Define tools
tools = [
    Tool(
        name="search",
        description="Search the web",
        # Placeholder implementation: echoes the query instead of searching.
        function=lambda query: f"Results for: {query}",
        parameters={"query": {"type": "string", "description": "Search query"}},
        required_params=["query"],
    ),
    Tool(
        name="read_file",
        description="Read a file",
        # A bare open(path).read() would leave the file handle open.
        function=_read_file,
        parameters={"path": {"type": "string", "description": "File path"}},
        required_params=["path"],
    ),
    Tool(
        name="finish",
        description="Complete with answer",
        function=lambda answer: answer,
        parameters={"answer": {"type": "string", "description": "Final answer"}},
        required_params=["answer"],
    ),
]

# Create agent
agent = EnhancedReActAgent(
    tools=tools,
    config=AgentConfig(
        max_steps=15,
        max_retries_per_step=3,
        verbose=True,
    ),
)

# Run task
result = agent.run(
    "Find information about Python async programming and "
    "summarize the key concepts."
)

print(f"\nStatus: {result.status}")
print(f"Answer: {result.answer}")
print(f"Steps: {len(result.steps)}")
print(f"Duration: {result.total_duration:.2f}s")


# Production Considerations
In production, add: rate limiting, cost tracking, audit logging, user authentication, input sanitization, and output validation. This implementation is a starting point.
Summary
Building a production ReAct agent:
- Architecture: Proper types, configuration, and structure
- Error recovery: Pattern-based and LLM-assisted recovery
- Memory: Working memory, conversation history, learned patterns
- Robustness: Retries, timeouts, validation
- Complete agent: All components integrated
Chapter Complete: You now understand ReAct deeply and can build production-quality agents. In the next chapter, we'll explore tool use and function calling in detail.