Introduction
Throughout this chapter, we've explored individual planning techniques: task decomposition, hierarchical planning, self-reflection, chain-of-thought reasoning, and tree of thoughts. Now it's time to bring them together into a complete Planner Agent: an intelligent system that can take complex goals, break them down, execute plans, monitor progress, and adapt when things don't go as expected.
This section provides a complete reference implementation that you can use as a foundation for building sophisticated planning agents.
Goal: Build a complete planner agent that combines decomposition, execution, reflection, and adaptation into a coherent system capable of handling real-world complexity.
Planner Agent Architecture
Our planner agent follows a modular architecture with clear separation of concerns:
| Component | Class | Responsibility | Key Methods |
|---|---|---|---|
| TaskDecomposer | TaskDecomposer | Break goals into executable tasks | decompose(), refine_task() |
| PlanManager | Plan | Track plan state and dependencies | add_task(), get_ready_tasks(), get_progress() |
| Executor | TaskExecutor | Execute individual tasks and schedule retries on failure | execute() |
| Reflector | PlanReflector | Evaluate quality and progress | reflect() |
| Replanner | PlanReplanner | Adapt plans when needed | replan() |
| PlannerAgent | PlannerAgent | Orchestrate the full workflow | run() |
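The table describes contracts more than concrete classes. One way to make those contracts explicit is to declare the async methods each component must provide using `typing.Protocol`. The method names below mirror the classes implemented later in this section. The protocol types themselves (`Decomposer`, `Executor`, `Reflector`, `Replanner`) and the toy `EchoExecutor` are hypothetical additions, and `Any` stands in for the `Task`, `Plan`, and result types defined below.

```python
import asyncio
from typing import Any, Optional, Protocol, runtime_checkable


@runtime_checkable
class Decomposer(Protocol):
    async def decompose(self, goal: str, context: Optional[dict] = None,
                        constraints: Optional[list[str]] = None) -> list[Any]: ...


@runtime_checkable
class Executor(Protocol):
    async def execute(self, task: Any, context: Optional[dict] = None) -> Any: ...


@runtime_checkable
class Reflector(Protocol):
    async def reflect(self, plan: Any, recent_results: list[Any]) -> Any: ...


@runtime_checkable
class Replanner(Protocol):
    async def replan(self, plan: Any, reflection: Any,
                     failed_tasks: list[Any]) -> list[Any]: ...


class EchoExecutor:
    """Toy executor that satisfies the Executor protocol structurally (no inheritance)."""

    async def execute(self, task: Any, context: Optional[dict] = None) -> Any:
        return f"done: {task}"


executor = EchoExecutor()
print(isinstance(executor, Executor))       # True: it defines execute()
print(asyncio.run(executor.execute("t1")))  # done: t1
```

Protocols are structural, so any class with the right method names satisfies them without inheriting from anything. A stub like `EchoExecutor` can therefore replace a real component in tests. A runtime `isinstance` check against a protocol only confirms that the methods exist. It does not check their signatures.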
Information Flow
```text
┌──────────────────────────────────────────────────────────────┐
│                         PlannerAgent                         │
│                                                              │
│  ┌────────────┐     ┌──────────────┐     ┌──────────────┐    │
│  │    Goal    │────▶│  Decomposer  │────▶│ PlanManager  │    │
│  └────────────┘     └──────────────┘     └──────────────┘    │
│                                                 │            │
│                                                 ▼            │
│  ┌────────────┐     ┌──────────────┐     ┌──────────────┐    │
│  │ Replanner  │◀────│  Reflector   │◀────│   Executor   │    │
│  └────────────┘     └──────────────┘     └──────────────┘    │
│        │                                        ▲            │
│        └────────────────────────────────────────┘            │
│                    (if replanning needed)                    │
└──────────────────────────────────────────────────────────────┘
```

Core Components
Let's implement each component with clear interfaces:
Data Models
```python
from dataclasses import dataclass, field
from typing import Any, Optional, Callable
from enum import Enum
from datetime import datetime
import uuid


class TaskStatus(Enum):
    PENDING = "pending"
    READY = "ready"
    IN_PROGRESS = "in_progress"
    COMPLETED = "completed"
    FAILED = "failed"
    BLOCKED = "blocked"
    CANCELLED = "cancelled"


class TaskPriority(Enum):
    LOW = 1
    MEDIUM = 2
    HIGH = 3
    CRITICAL = 4


@dataclass
class Task:
    """A single task in the plan."""
    id: str = field(default_factory=lambda: str(uuid.uuid4())[:8])
    description: str = ""
    status: TaskStatus = TaskStatus.PENDING
    priority: TaskPriority = TaskPriority.MEDIUM
    dependencies: list[str] = field(default_factory=list)
    result: Any = None
    error: Optional[str] = None
    attempts: int = 0
    max_attempts: int = 3
    created_at: datetime = field(default_factory=datetime.now)
    completed_at: Optional[datetime] = None
    metadata: dict = field(default_factory=dict)

    def is_ready(self, completed_tasks: set[str]) -> bool:
        """Check if all dependencies are satisfied."""
        return all(dep in completed_tasks for dep in self.dependencies)


@dataclass
class Plan:
    """A complete plan with tasks and metadata."""
    id: str = field(default_factory=lambda: str(uuid.uuid4())[:8])
    goal: str = ""
    tasks: dict[str, Task] = field(default_factory=dict)
    execution_order: list[str] = field(default_factory=list)
    created_at: datetime = field(default_factory=datetime.now)
    status: str = "active"
    metadata: dict = field(default_factory=dict)

    def add_task(self, task: Task) -> None:
        """Add a task to the plan (a task with an existing id replaces it)."""
        self.tasks[task.id] = task
        if task.id not in self.execution_order:
            self.execution_order.append(task.id)

    def get_completed_task_ids(self) -> set[str]:
        """Get IDs of completed tasks."""
        return {
            tid for tid, task in self.tasks.items()
            if task.status == TaskStatus.COMPLETED
        }

    def get_ready_tasks(self) -> list[Task]:
        """Get pending tasks whose dependencies are complete, highest priority first."""
        completed = self.get_completed_task_ids()
        ready = [
            task for task in self.tasks.values()
            if task.status == TaskStatus.PENDING and task.is_ready(completed)
        ]
        return sorted(ready, key=lambda t: -t.priority.value)

    def get_progress(self) -> dict:
        """Get plan progress statistics."""
        total = len(self.tasks)
        if total == 0:
            # Same keys as the non-empty case so callers can index them safely
            return {"total": 0, "by_status": {}, "completed": 0, "percentage": 0.0}

        by_status: dict[str, int] = {}
        for task in self.tasks.values():
            status = task.status.value
            by_status[status] = by_status.get(status, 0) + 1

        completed = by_status.get("completed", 0)
        return {
            "total": total,
            "by_status": by_status,
            "completed": completed,
            "percentage": round(100 * completed / total, 1)
        }


@dataclass
class ExecutionResult:
    """Result of executing a task."""
    task_id: str
    success: bool
    result: Any = None
    error: Optional[str] = None
    duration_seconds: float = 0.0


@dataclass
class ReflectionResult:
    """Result of reflecting on progress."""
    quality_score: float
    issues: list[str]
    suggestions: list[str]
    should_replan: bool
    reasoning: str
```

Task Decomposer
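The decomposer below trusts whatever dependency graph the model returns. If a task names a prerequisite id that does not exist, or two tasks depend on each other, `Plan.get_ready_tasks()` will never mark them ready. A cheap guard is to check for unknown ids first and then run Kahn's topological sort to detect cycles. The sketch below does this in a hypothetical `validate_dependencies` helper that works on a plain `{task_id: [dependency_ids]}` mapping.

```python
from collections import deque


def validate_dependencies(graph: dict[str, list[str]]) -> list[str]:
    """Return a list of problems found in a {task_id: [dependency_ids]} graph."""
    problems = []

    # 1. Every dependency must refer to a task that exists in the plan.
    for task_id, deps in graph.items():
        for dep in deps:
            if dep not in graph:
                problems.append(f"{task_id} depends on unknown task {dep}")

    # 2. Kahn's algorithm over the known edges: count unmet prerequisites...
    indegree = {tid: sum(1 for d in deps if d in graph) for tid, deps in graph.items()}
    dependents: dict[str, list[str]] = {tid: [] for tid in graph}
    for tid, deps in graph.items():
        for dep in deps:
            if dep in graph:
                dependents[dep].append(tid)

    # ...then repeatedly "complete" tasks that have none left.
    queue = deque(tid for tid, n in indegree.items() if n == 0)
    visited = 0
    while queue:
        tid = queue.popleft()
        visited += 1
        for child in dependents[tid]:
            indegree[child] -= 1
            if indegree[child] == 0:
                queue.append(child)

    # Tasks that never reach zero sit on, or downstream of, a cycle.
    if visited < len(graph):
        stuck = sorted(tid for tid, n in indegree.items() if n > 0)
        problems.append(f"dependency cycle involving: {', '.join(stuck)}")

    return problems


print(validate_dependencies({"t1": [], "t2": ["t1"], "t3": ["t2"]}))  # []
print(validate_dependencies({"a": ["b"], "b": ["a"], "c": ["x"]}))
# ['c depends on unknown task x', 'dependency cycle involving: a, b']
```

One option is to call it on `{t.id: t.dependencies for t in tasks}` right after `decompose()`. Any problems it finds can be fed back to the model in a follow-up prompt.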
```python
import json
from typing import Optional

from anthropic import AsyncAnthropic

# Maps the priority strings the model returns onto TaskPriority members
PRIORITY_MAP = {
    "low": TaskPriority.LOW,
    "medium": TaskPriority.MEDIUM,
    "high": TaskPriority.HIGH,
    "critical": TaskPriority.CRITICAL,
}


class TaskDecomposer:
    """Decomposes goals into executable tasks."""

    def __init__(self, model: str = "claude-sonnet-4-20250514"):
        # The async client keeps API calls from blocking the event loop
        self.client = AsyncAnthropic()
        self.model = model

    async def decompose(
        self,
        goal: str,
        context: Optional[dict] = None,
        constraints: Optional[list[str]] = None
    ) -> list[Task]:
        """Decompose a goal into tasks."""

        prompt = f"""Decompose this goal into concrete, executable tasks.

Goal: {goal}
Context: {json.dumps(context, default=str) if context else 'None'}
Constraints: {constraints if constraints else 'None'}

Requirements:
1. Each task should be completable in a single focused effort
2. Tasks should have clear success criteria
3. Identify dependencies between tasks
4. Estimate priority (low/medium/high/critical)

Return only JSON:
{{
  "tasks": [
    {{
      "id": "unique_short_id",
      "description": "what to do",
      "dependencies": ["ids of prerequisite tasks"],
      "priority": "low|medium|high|critical",
      "estimated_effort": "quick|moderate|substantial"
    }}
  ]
}}"""

        response = await self.client.messages.create(
            model=self.model,
            max_tokens=4096,
            messages=[{"role": "user", "content": prompt}]
        )

        # Assumes the model returns bare JSON (no prose or Markdown fence)
        result = json.loads(response.content[0].text)

        tasks = []
        for task_data in result["tasks"]:
            priority = str(task_data.get("priority", "medium")).lower()
            tasks.append(Task(
                id=task_data["id"],
                description=task_data["description"],
                dependencies=task_data.get("dependencies", []),
                priority=PRIORITY_MAP.get(priority, TaskPriority.MEDIUM),
                metadata={"estimated_effort": task_data.get("estimated_effort", "moderate")}
            ))

        return tasks

    async def refine_task(self, task: Task, feedback: str) -> list[Task]:
        """Further decompose a task based on feedback."""

        prompt = f"""This task needs to be broken down further.

Task: {task.description}
Feedback: {feedback}

Break this into smaller, more specific subtasks.

Return only JSON:
{{
  "subtasks": [
    {{"id": "...", "description": "...", "dependencies": []}}
  ]
}}"""

        response = await self.client.messages.create(
            model=self.model,
            max_tokens=2048,
            messages=[{"role": "user", "content": prompt}]
        )

        result = json.loads(response.content[0].text)

        # Prefix subtask ids with the parent id so they stay unique within the plan
        subtasks = []
        for st_data in result["subtasks"]:
            subtasks.append(Task(
                id=f"{task.id}_{st_data['id']}",
                description=st_data["description"],
                dependencies=[f"{task.id}_{d}" for d in st_data.get("dependencies", [])],
                priority=task.priority
            ))

        return subtasks
```

Task Executor
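As written, every task receives the same shared `context`. A task such as "Write unit tests for email validation" therefore never sees the code produced by its prerequisite. A hypothetical `build_task_context` helper can merge the results of completed dependencies into a per-task context before `execute()` is called. The sketch below uses a minimal `MiniTask` stand-in for the chapter's `Task`. The `dependency_results` key and the 2,000-character truncation are assumptions, chosen to keep prompts bounded.

```python
from dataclasses import dataclass, field
from typing import Any, Optional


@dataclass
class MiniTask:
    """Stand-in for the chapter's Task with only the fields this helper needs."""
    id: str
    description: str
    dependencies: list[str] = field(default_factory=list)
    result: Any = None
    completed: bool = False


def build_task_context(
    task: MiniTask,
    tasks: dict[str, MiniTask],
    base_context: Optional[dict] = None,
    max_chars: int = 2000,
) -> dict:
    """Copy the shared context and attach truncated results of finished prerequisites."""
    context = dict(base_context or {})
    dep_results = {}
    for dep_id in task.dependencies:
        dep = tasks.get(dep_id)
        if dep is not None and dep.completed:
            dep_results[dep.description] = str(dep.result)[:max_chars]
    context["dependency_results"] = dep_results
    return context


tasks = {
    "t2": MiniTask("t2", "Implement email validation",
                   result="def is_valid(email): ...", completed=True),
    "t5": MiniTask("t5", "Write unit tests for email validation", dependencies=["t2"]),
}
ctx = build_task_context(tasks["t5"], tasks, {"language": "Python"})
print(ctx["dependency_results"])
# {'Implement email validation': 'def is_valid(email): ...'}
```

With the real classes, the call in `PlannerAgent._execute_tasks` might look something like `self.executor.execute(t, build_task_context(t, self.current_plan.tasks, context))`. In that version, `dep.status == TaskStatus.COMPLETED` replaces the `completed` flag.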
```python
import asyncio
import inspect
import json
import time
from datetime import datetime
from typing import Any, Callable, Optional

from anthropic import AsyncAnthropic


class TaskExecutor:
    """Executes individual tasks."""

    def __init__(
        self,
        tools: Optional[dict[str, Callable]] = None,
        model: str = "claude-sonnet-4-20250514"
    ):
        self.tools = tools or {}
        self.client = AsyncAnthropic()
        self.model = model

    async def execute(
        self,
        task: Task,
        context: Optional[dict] = None
    ) -> ExecutionResult:
        """Execute a single task, recording the outcome on the task itself."""
        start_time = time.time()

        try:
            # Mark as in progress
            task.status = TaskStatus.IN_PROGRESS
            task.attempts += 1

            # Use a tool only when tools exist and the description suggests one
            if self.tools and self._requires_tool(task):
                result = await self._execute_with_tools(task, context)
            else:
                result = await self._execute_with_llm(task, context)

            # Mark completed
            task.status = TaskStatus.COMPLETED
            task.result = result
            task.error = None
            task.completed_at = datetime.now()

            return ExecutionResult(
                task_id=task.id,
                success=True,
                result=result,
                duration_seconds=time.time() - start_time
            )

        except Exception as e:
            task.error = str(e)

            if task.attempts >= task.max_attempts:
                task.status = TaskStatus.FAILED
            else:
                # Back to PENDING so get_ready_tasks() retries it next iteration
                task.status = TaskStatus.PENDING

            return ExecutionResult(
                task_id=task.id,
                success=False,
                error=str(e),
                duration_seconds=time.time() - start_time
            )

    def _requires_tool(self, task: Task) -> bool:
        """Heuristic: does the task description suggest a tool call?"""
        tool_keywords = ["search", "fetch", "run", "execute", "calculate"]
        return any(kw in task.description.lower() for kw in tool_keywords)

    async def _execute_with_tools(
        self,
        task: Task,
        context: Optional[dict]
    ) -> Any:
        """Execute task using available tools."""

        # Show each tool's signature so the model knows the argument names
        tool_list = "\n".join(
            f"- {name}{inspect.signature(fn)}" for name, fn in self.tools.items()
        )

        prompt = f"""Execute this task using available tools.

Task: {task.description}
Context: {json.dumps(context, default=str) if context else 'None'}
Available tools:
{tool_list}

Determine which tool to use and what arguments to pass.
Return only JSON:
{{"tool": "tool_name", "arguments": {{...}}}}"""

        response = await self.client.messages.create(
            model=self.model,
            max_tokens=1024,
            messages=[{"role": "user", "content": prompt}]
        )

        tool_call = json.loads(response.content[0].text)
        tool_name = tool_call["tool"]
        arguments = tool_call.get("arguments", {})

        if tool_name not in self.tools:
            raise ValueError(f"Tool not found: {tool_name}")

        # Support both async and plain synchronous tool functions
        tool_fn = self.tools[tool_name]
        if asyncio.iscoroutinefunction(tool_fn):
            return await tool_fn(**arguments)
        return tool_fn(**arguments)

    async def _execute_with_llm(
        self,
        task: Task,
        context: Optional[dict]
    ) -> str:
        """Execute task using pure LLM reasoning."""

        prompt = f"""Complete this task thoroughly.

Task: {task.description}
Context: {json.dumps(context, default=str) if context else 'None'}

Think step by step and provide a complete result."""

        response = await self.client.messages.create(
            model=self.model,
            max_tokens=4096,
            messages=[{"role": "user", "content": prompt}]
        )

        return response.content[0].text
```

Reflector
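The reflector below, like every component in this section, parses the model's reply with `json.loads(response.content[0].text)`. That call fails if the model wraps its JSON in a Markdown code fence or adds a sentence of preamble. The sketch below shows a more tolerant approach using two hypothetical helpers. `extract_json` strips an optional fence and falls back to the outermost `{...}` span. `normalize_reflection` fills in missing fields, coerces a string `"true"`/`"false"`, and clamps the 0-100 score into the 0.0-1.0 range that `ReflectionResult.quality_score` uses.

```python
import json
import re

# An optional Markdown code fence: three backticks, optional "json" tag, body, three backticks.
FENCE_RE = re.compile(r"`{3}(?:json)?\s*(.*?)\s*`{3}", re.DOTALL)


def extract_json(text: str) -> dict:
    """Parse a JSON object from model output that may include a fence or prose."""
    match = FENCE_RE.search(text)
    if match:
        text = match.group(1)
    start, end = text.find("{"), text.rfind("}")
    if start == -1 or end < start:
        raise ValueError("no JSON object found in model output")
    return json.loads(text[start:end + 1])


def normalize_reflection(raw: dict) -> dict:
    """Fill defaults and clamp the 0-100 score into 0.0-1.0."""
    score = float(raw.get("quality_score", 0))
    replan = raw.get("should_replan", False)
    if isinstance(replan, str):  # models sometimes return "true"/"false" as strings
        replan = replan.strip().lower() == "true"
    return {
        "quality_score": min(max(score, 0.0), 100.0) / 100.0,
        "issues": list(raw.get("issues", [])),
        "suggestions": list(raw.get("suggestions", [])),
        "should_replan": bool(replan),
        "reasoning": str(raw.get("reasoning", "")),
    }


fence = "`" * 3
reply = f'Here is my evaluation:\n{fence}json\n{{"quality_score": 85, "should_replan": false}}\n{fence}'
print(normalize_reflection(extract_json(reply)))
# {'quality_score': 0.85, 'issues': [], 'suggestions': [], 'should_replan': False, 'reasoning': ''}
```

The returned keys match the `ReflectionResult` fields. The last step of `reflect()` could therefore become `ReflectionResult(**normalize_reflection(extract_json(response.content[0].text)))`.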
```python
import json

from anthropic import AsyncAnthropic


class PlanReflector:
    """Reflects on plan quality and progress."""

    def __init__(self, model: str = "claude-sonnet-4-20250514"):
        self.client = AsyncAnthropic()
        self.model = model

    async def reflect(
        self,
        plan: Plan,
        recent_results: list[ExecutionResult]
    ) -> ReflectionResult:
        """Reflect on plan progress and quality."""

        progress = plan.get_progress()
        by_status = progress.get("by_status", {})

        # Summarize the last 5 executions, truncating long outputs to bound the prompt
        recent_summary = []
        for result in recent_results[-5:]:
            task = plan.tasks.get(result.task_id)
            status = "✓" if result.success else "✗"
            detail = result.result if result.success else result.error
            recent_summary.append(
                f"{status} {task.description if task else result.task_id}: "
                f"{str(detail)[:200]}"
            )

        remaining = [
            f"- {t.description}" for t in plan.tasks.values()
            if t.status == TaskStatus.PENDING
        ]
        recent_text = "\n".join(recent_summary) or "None"
        remaining_text = "\n".join(remaining) or "None"

        prompt = f"""Evaluate the current state of this plan.

Goal: {plan.goal}

Progress: {progress['percentage']}% complete ({progress['completed']}/{progress['total']} tasks)

Task status breakdown: {by_status}

Recent executions:
{recent_text}

Remaining tasks:
{remaining_text}

Evaluate:
1. Quality of progress so far (0-100)
2. Any issues or blockers
3. Suggestions for improvement
4. Whether replanning is needed

Return only JSON:
{{
  "quality_score": 0-100,
  "issues": ["list of problems"],
  "suggestions": ["improvement ideas"],
  "should_replan": true/false,
  "reasoning": "explanation"
}}"""

        response = await self.client.messages.create(
            model=self.model,
            max_tokens=1024,
            messages=[{"role": "user", "content": prompt}]
        )

        result = json.loads(response.content[0].text)

        return ReflectionResult(
            quality_score=result["quality_score"] / 100.0,  # normalize to 0.0-1.0
            issues=result.get("issues", []),
            suggestions=result.get("suggestions", []),
            should_replan=result.get("should_replan", False),
            reasoning=result.get("reasoning", "")
        )
```

Replanner
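The replanner is mostly triggered by failures. Once a task exhausts its retries and becomes `FAILED`, every task that depends on it, directly or transitively, can never become ready. Identifying that set shows which part of the plan actually needs rework. The sketch below computes it with a hypothetical `find_blocked` helper over a `{task_id: [dependency_ids]}` graph and a set of failed ids.

```python
def find_blocked(graph: dict[str, list[str]], failed: set[str]) -> set[str]:
    """Return ids of tasks that transitively depend on any failed task."""
    # Invert the graph so we can walk from a prerequisite to its dependents.
    dependents: dict[str, list[str]] = {tid: [] for tid in graph}
    for tid, deps in graph.items():
        for dep in deps:
            dependents.setdefault(dep, []).append(tid)

    # Depth-first walk outward from every failed task.
    blocked: set[str] = set()
    stack = list(failed)
    while stack:
        current = stack.pop()
        for child in dependents.get(current, []):
            if child not in blocked and child not in failed:
                blocked.add(child)
                stack.append(child)
    return blocked


graph = {
    "t1": [], "t2": ["t1"], "t3": ["t1"],
    "t5": ["t2"], "t6": ["t3"], "t8": ["t5", "t6"],
}
print(sorted(find_blocked(graph, {"t3"})))  # ['t6', 't8']
```

In the agent, marking these tasks `TaskStatus.BLOCKED` would let the completion check recognize a stuck plan immediately. The blocked tasks are also natural candidates to hand to the replanner for removal or replacement.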
```python
import json

from anthropic import AsyncAnthropic


def _bullets(items: list[str], empty: str) -> str:
    """Render items as a '- ' list, or a placeholder when there are none."""
    return "\n".join(f"- {item}" for item in items) or empty


class PlanReplanner:
    """Adapts plans based on feedback and failures."""

    def __init__(self, model: str = "claude-sonnet-4-20250514"):
        self.client = AsyncAnthropic()
        self.model = model

    async def replan(
        self,
        plan: Plan,
        reflection: ReflectionResult,
        failed_tasks: list[Task]
    ) -> list[Task]:
        """Generate new or modified tasks based on reflection.

        Pending tasks the model asks to remove are marked CANCELLED in place;
        the returned new tasks must still be added to the plan by the caller.
        """

        # Show ids so new tasks can depend on existing ones
        completed_text = _bullets(
            [f"[{t.id}] {t.description}" for t in plan.tasks.values()
             if t.status == TaskStatus.COMPLETED],
            "None yet",
        )
        pending_text = _bullets(
            [f"[{t.id}] {t.description}" for t in plan.tasks.values()
             if t.status == TaskStatus.PENDING],
            "None",
        )
        failed_text = _bullets([f"[{t.id}] {t.description}" for t in failed_tasks], "None")
        issues_text = _bullets(reflection.issues, "None")
        suggestions_text = _bullets(reflection.suggestions, "None")

        prompt = f"""Revise the plan based on current progress and issues.

Original Goal: {plan.goal}

Completed tasks:
{completed_text}

Failed tasks:
{failed_text}

Current issues:
{issues_text}

Suggestions:
{suggestions_text}

Currently pending tasks:
{pending_text}

Task ids are shown in brackets; new tasks may list them as dependencies.

Create new or revised tasks that:
1. Work around failed tasks if possible
2. Address the identified issues
3. Still achieve the original goal
4. Build on what's already completed

Return only JSON:
{{
  "remove_tasks": ["ids of pending tasks to remove"],
  "new_tasks": [
    {{"id": "...", "description": "...", "dependencies": [], "priority": "..."}}
  ],
  "reasoning": "explanation of changes"
}}"""

        response = await self.client.messages.create(
            model=self.model,
            max_tokens=4096,
            messages=[{"role": "user", "content": prompt}]
        )

        result = json.loads(response.content[0].text)

        # Cancel pending tasks the model asked to drop (completed work is kept)
        for task_id in result.get("remove_tasks", []):
            task = plan.tasks.get(task_id)
            if task is not None and task.status == TaskStatus.PENDING:
                task.status = TaskStatus.CANCELLED

        # Rename new ids that collide with existing ones so add_task() cannot
        # overwrite an existing task; dependencies are rewritten to match
        id_map: dict[str, str] = {}
        for task_data in result.get("new_tasks", []):
            new_id = task_data["id"]
            while new_id in plan.tasks or new_id in id_map.values():
                new_id = f"{new_id}_r"
            id_map[task_data["id"]] = new_id

        priority_map = {
            "low": TaskPriority.LOW,
            "medium": TaskPriority.MEDIUM,
            "high": TaskPriority.HIGH,
            "critical": TaskPriority.CRITICAL
        }

        # Create new task objects
        new_tasks = []
        for task_data in result.get("new_tasks", []):
            priority = str(task_data.get("priority", "medium")).lower()
            new_tasks.append(Task(
                id=id_map[task_data["id"]],
                description=task_data["description"],
                dependencies=[id_map.get(d, d) for d in task_data.get("dependencies", [])],
                priority=priority_map.get(priority, TaskPriority.MEDIUM),
                metadata={"source": "replanning"}
            ))

        return new_tasks
```

The Planning-Execution Loop
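Before looking at the full loop, it helps to see how it schedules work. Each iteration executes every task whose dependencies are already complete, so the plan advances in dependency "waves". You can simulate that scheduling without any LLM calls. The sketch below assumes every task succeeds on its first attempt and uses the eight-task plan from the test run at the end of this section. The `execution_waves` helper is hypothetical.

```python
def execution_waves(graph: dict[str, list[str]]) -> list[list[str]]:
    """Group task ids into the batches a ready-task loop would run, assuming all succeed."""
    completed: set[str] = set()
    remaining = dict(graph)
    waves = []
    while remaining:
        # Same readiness rule as Plan.get_ready_tasks() (sorted by id here, not priority)
        ready = sorted(tid for tid, deps in remaining.items()
                       if all(d in completed for d in deps))
        if not ready:
            raise ValueError(f"unsatisfiable dependencies: {sorted(remaining)}")
        waves.append(ready)
        completed.update(ready)
        for tid in ready:
            del remaining[tid]
    return waves


plan = {
    "t1": [], "t2": ["t1"], "t3": ["t1"], "t4": ["t1"],
    "t5": ["t2"], "t6": ["t3"], "t7": ["t4"], "t8": ["t5", "t6", "t7"],
}
for i, wave in enumerate(execution_waves(plan), start=1):
    print(f"iteration {i}: {wave}")
# iteration 1: ['t1']
# iteration 2: ['t2', 't3', 't4']
# iteration 3: ['t5', 't6', 't7']
# iteration 4: ['t8']
```

The four waves of 1, 3, 3, and 1 tasks are why that run reports progress of 12.5%, 50%, 87.5%, and 100%. With `parallel_execution` disabled, the waves are the same, but the tasks inside each wave run one after another.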
The core of the planner agent is its execution loop, which coordinates all components:
```python
import asyncio
from typing import Optional


class PlanningLoop:
    """
    Orchestrates the plan-execute-reflect-replan cycle.
    """

    def __init__(
        self,
        decomposer: TaskDecomposer,
        executor: TaskExecutor,
        reflector: PlanReflector,
        replanner: PlanReplanner
    ):
        self.decomposer = decomposer
        self.executor = executor
        self.reflector = reflector
        self.replanner = replanner

        self.execution_results: list[ExecutionResult] = []
        self.reflection_history: list[ReflectionResult] = []

    async def run(
        self,
        goal: str,
        context: Optional[dict] = None,
        max_iterations: int = 50,
        parallel_execution: bool = True
    ) -> tuple[Plan, list[ExecutionResult]]:
        """Run the complete planning loop."""

        # Phase 1: Decompose goal into plan
        print(f"Decomposing goal: {goal}")
        tasks = await self.decomposer.decompose(goal, context)

        plan = Plan(goal=goal)
        for task in tasks:
            plan.add_task(task)

        print(f"Created plan with {len(tasks)} tasks")

        # Phase 2: Execute with reflection
        iteration = 0
        while iteration < max_iterations:
            iteration += 1

            # Get ready tasks
            ready_tasks = plan.get_ready_tasks()

            if not ready_tasks:
                progress = plan.get_progress()
                if progress["completed"] == progress["total"]:
                    print("Plan completed successfully!")
                else:
                    # Nothing is ready and nothing is running, so later iterations
                    # cannot make progress: the remaining tasks wait on failed or
                    # cancelled prerequisites.
                    print("Plan blocked - no more executable tasks")
                break

            # Execute ready tasks
            if parallel_execution and len(ready_tasks) > 1:
                results = await self._execute_parallel(ready_tasks, context)
            else:
                results = await self._execute_sequential(ready_tasks, context)

            self.execution_results.extend(results)

            # Reflect every 5 iterations, or immediately after any failure
            if iteration % 5 == 0 or any(not r.success for r in results):
                reflection = await self.reflector.reflect(plan, self.execution_results)
                self.reflection_history.append(reflection)

                print(f"Reflection: quality={reflection.quality_score:.2f}, "
                      f"replan={reflection.should_replan}")

                # Replan if needed
                if reflection.should_replan:
                    failed_tasks = [
                        plan.tasks[r.task_id] for r in results
                        if not r.success and r.task_id in plan.tasks
                    ]

                    new_tasks = await self.replanner.replan(
                        plan, reflection, failed_tasks
                    )

                    for task in new_tasks:
                        plan.add_task(task)

                    print(f"Replanned: added {len(new_tasks)} new tasks")

            # Progress report
            progress = plan.get_progress()
            print(f"Progress: {progress['percentage']}%")

        return plan, self.execution_results

    async def _execute_parallel(
        self,
        tasks: list[Task],
        context: Optional[dict]
    ) -> list[ExecutionResult]:
        """Execute multiple tasks concurrently."""
        coros = [self.executor.execute(task, context) for task in tasks]
        return list(await asyncio.gather(*coros))

    async def _execute_sequential(
        self,
        tasks: list[Task],
        context: Optional[dict]
    ) -> list[ExecutionResult]:
        """Execute tasks one by one."""
        results = []
        for task in tasks:
            result = await self.executor.execute(task, context)
            results.append(result)
        return results
```

Complete Implementation
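The agent below takes its settings as a plain dict, so a caller who overrides one key has to supply the rest or rely on defaults being merged in. One alternative is a typed config object that keeps the same four settings with the same defaults and validates them up front. The sketch below shows this as a hypothetical `PlannerConfig` dataclass, which the listing that follows does not use. It checks `reflection_frequency`, for example, because the agent uses that value as a modulus.

```python
from dataclasses import asdict, dataclass, replace


@dataclass(frozen=True)
class PlannerConfig:
    """Typed settings mirroring the PlannerAgent config keys and defaults."""
    max_iterations: int = 50
    parallel_execution: bool = True
    reflection_frequency: int = 5
    max_replan_attempts: int = 3

    def __post_init__(self) -> None:
        if self.max_iterations < 1:
            raise ValueError("max_iterations must be at least 1")
        if self.reflection_frequency < 1:
            raise ValueError("reflection_frequency must be at least 1")  # used with %
        if self.max_replan_attempts < 0:
            raise ValueError("max_replan_attempts cannot be negative")


config = PlannerConfig(reflection_frequency=3)
print(asdict(config))
# {'max_iterations': 50, 'parallel_execution': True, 'reflection_frequency': 3, 'max_replan_attempts': 3}
print(replace(config, max_replan_attempts=2).max_replan_attempts)  # 2
```

`asdict(config)` produces exactly the keys the agent reads, so it could be passed as `PlannerAgent(config=asdict(config))`.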
Here's the complete PlannerAgent that ties everything together:
```python
import asyncio
from typing import Any, Callable, Optional

# Uses Task, TaskStatus, Plan, ExecutionResult, ReflectionResult,
# TaskDecomposer, TaskExecutor, PlanReflector and PlanReplanner from above.

DEFAULT_CONFIG = {
    "max_iterations": 50,
    "parallel_execution": True,
    "reflection_frequency": 5,
    "max_replan_attempts": 3,
}


class PlannerAgent:
    """
    Complete Planner Agent implementation.

    Combines task decomposition, execution, reflection,
    and replanning into a cohesive planning system.
    """

    def __init__(
        self,
        tools: Optional[dict[str, Callable]] = None,
        model: str = "claude-sonnet-4-20250514",
        config: Optional[dict] = None
    ):
        self.model = model
        self.tools = tools or {}
        # Merge over the defaults so a partial config cannot cause KeyErrors
        self.config = {**DEFAULT_CONFIG, **(config or {})}

        # Initialize components
        self.decomposer = TaskDecomposer(model=model)
        self.executor = TaskExecutor(tools=self.tools, model=model)
        self.reflector = PlanReflector(model=model)
        self.replanner = PlanReplanner(model=model)

        # State
        self.current_plan: Optional[Plan] = None
        self.execution_history: list[ExecutionResult] = []
        self.reflections: list[ReflectionResult] = []
        self.replan_count = 0

    async def run(
        self,
        goal: str,
        context: Optional[dict] = None,
        callbacks: Optional[dict] = None
    ) -> dict:
        """
        Run the planner agent to achieve a goal.

        Args:
            goal: The goal to achieve
            context: Additional context for planning
            callbacks: Optional callbacks for progress updates
                (on_phase, on_iteration, on_task_complete, on_complete, on_error)

        Returns:
            dict with plan, results, and summary
        """
        callbacks = callbacks or {}

        # Reset per-run state so one agent instance can be reused for several goals
        self.current_plan = None
        self.execution_history = []
        self.reflections = []
        self.replan_count = 0

        try:
            # Phase 1: Planning
            if callbacks.get("on_phase"):
                callbacks["on_phase"]("planning")

            print(f"\n{'='*60}")
            print(f"GOAL: {goal}")
            print(f"{'='*60}\n")

            tasks = await self.decomposer.decompose(goal, context)
            self.current_plan = Plan(goal=goal)

            for task in tasks:
                self.current_plan.add_task(task)

            print(f"Created plan with {len(tasks)} tasks:")
            for task in tasks:
                deps = ", ".join(task.dependencies) or "none"
                print(f"  [{task.id}] {task.description} (deps: {deps})")

            # Phase 2: Execution
            if callbacks.get("on_phase"):
                callbacks["on_phase"]("execution")

            await self._execution_loop(context, callbacks)

            # Phase 3: Summary
            summary = self._create_summary()

            if callbacks.get("on_complete"):
                callbacks["on_complete"](summary)

            return summary

        except Exception as e:
            error_summary = {
                "success": False,
                "error": str(e),
                "plan": self.current_plan,
                "results": self.execution_history
            }
            if callbacks.get("on_error"):
                callbacks["on_error"](e)
            return error_summary

    async def _execution_loop(
        self,
        context: Optional[dict],
        callbacks: dict
    ) -> None:
        """Main execution loop."""
        iteration = 0
        max_iterations = self.config["max_iterations"]

        while iteration < max_iterations:
            iteration += 1

            # Get ready tasks
            ready_tasks = self.current_plan.get_ready_tasks()

            if not ready_tasks:
                if not self._is_plan_complete():
                    # Remaining tasks wait on failed or cancelled prerequisites;
                    # with nothing to execute, further iterations cannot help
                    print("Plan blocked: no executable tasks remain")
                break

            # Notify
            if callbacks.get("on_iteration"):
                callbacks["on_iteration"](iteration, ready_tasks)

            # Execute
            results = await self._execute_tasks(ready_tasks, context)
            self.execution_history.extend(results)

            # Notify results
            for result in results:
                task = self.current_plan.tasks.get(result.task_id)
                if callbacks.get("on_task_complete"):
                    callbacks["on_task_complete"](task, result)

                print(f"  {'✓' if result.success else '✗'} {task.description if task else result.task_id}")

            # Reflect and potentially replan
            if self._should_reflect(iteration, results):
                await self._reflect_and_adapt(context)

            # Progress
            progress = self.current_plan.get_progress()
            print(f"\nProgress: {progress['percentage']}%\n")

    async def _execute_tasks(
        self,
        tasks: list[Task],
        context: Optional[dict]
    ) -> list[ExecutionResult]:
        """Execute a batch of tasks, concurrently if configured."""
        if self.config["parallel_execution"] and len(tasks) > 1:
            coros = [self.executor.execute(t, context) for t in tasks]
            return list(await asyncio.gather(*coros))

        results = []
        for task in tasks:
            result = await self.executor.execute(task, context)
            results.append(result)
        return results

    def _should_reflect(
        self,
        iteration: int,
        results: list[ExecutionResult]
    ) -> bool:
        """Determine if reflection is needed."""
        # Reflect on failures
        if any(not r.success for r in results):
            return True

        # Periodic reflection
        return iteration % self.config["reflection_frequency"] == 0

    async def _reflect_and_adapt(self, context: Optional[dict]) -> None:
        """Perform reflection and adapt plan if needed."""
        reflection = await self.reflector.reflect(
            self.current_plan,
            self.execution_history
        )
        self.reflections.append(reflection)

        print(f"\nReflection: quality={reflection.quality_score:.2f}")
        if reflection.issues:
            print(f"  Issues: {reflection.issues}")
        if reflection.suggestions:
            print(f"  Suggestions: {reflection.suggestions}")

        if reflection.should_replan:
            if self.replan_count < self.config["max_replan_attempts"]:
                await self._replan(reflection, context)
            else:
                print("  Max replan attempts reached, continuing with current plan")

    async def _replan(
        self,
        reflection: ReflectionResult,
        context: Optional[dict]
    ) -> None:
        """Replan based on reflection."""
        self.replan_count += 1

        # Report tasks whose retries are exhausted, not every task that failed
        # once and may have succeeded on a later attempt
        failed_tasks = [
            t for t in self.current_plan.tasks.values()
            if t.status == TaskStatus.FAILED
        ]

        new_tasks = await self.replanner.replan(
            self.current_plan,
            reflection,
            failed_tasks
        )

        if new_tasks:
            print(f"\nReplanning: adding {len(new_tasks)} new tasks")
            for task in new_tasks:
                self.current_plan.add_task(task)
                print(f"  + {task.description}")

    def _is_plan_complete(self) -> bool:
        """Check if plan is complete or has no active tasks left."""
        progress = self.current_plan.get_progress()

        # All done
        if progress["completed"] == progress["total"]:
            return True

        # No task is still pending, ready, or in progress
        active_statuses = [TaskStatus.PENDING, TaskStatus.READY, TaskStatus.IN_PROGRESS]
        has_active = any(
            t.status in active_statuses
            for t in self.current_plan.tasks.values()
        )

        return not has_active

    def _create_summary(self) -> dict:
        """Create execution summary."""
        progress = self.current_plan.get_progress()

        successful_results = [r for r in self.execution_history if r.success]
        failed_results = [r for r in self.execution_history if not r.success]

        # Sum of per-task durations; exceeds wall-clock time under parallel execution
        total_duration = sum(r.duration_seconds for r in self.execution_history)

        return {
            "success": progress["completed"] == progress["total"],
            "goal": self.current_plan.goal,
            "progress": progress,
            # Counts execution attempts, so a retried task is counted more than once
            "tasks_executed": len(self.execution_history),
            "tasks_successful": len(successful_results),
            "tasks_failed": len(failed_results),
            "replans": self.replan_count,
            "reflections": len(self.reflections),
            "total_duration_seconds": total_duration,
            "final_quality_score": self.reflections[-1].quality_score if self.reflections else None,
            "plan": self.current_plan,
            "results": self.execution_history
        }

    def get_task_results(self) -> dict[str, Any]:
        """Get results organized by task (the latest attempt wins)."""
        results_by_task = {}
        for result in self.execution_history:
            task = self.current_plan.tasks.get(result.task_id)
            if task:
                results_by_task[task.description] = {
                    "success": result.success,
                    "result": result.result,
                    "error": result.error
                }
        return results_by_task
```

Testing the Planner
Let's test our planner agent with a concrete example:
```python
import asyncio


# Define some tools the agent can use (simulated here)
async def search_web(query: str) -> str:
    """Simulate web search."""
    return f"Search results for: {query}"


async def read_file(path: str) -> str:
    """Simulate file reading."""
    return f"Contents of {path}"


async def write_file(path: str, content: str) -> str:
    """Simulate file writing."""
    return f"Wrote {len(content)} chars to {path}"


async def run_tests(test_path: str) -> dict:
    """Simulate running tests."""
    return {"passed": 5, "failed": 0, "total": 5}


# Create the agent. Every component calls the Anthropic API, so the
# ANTHROPIC_API_KEY environment variable must be set.
tools = {
    "search_web": search_web,
    "read_file": read_file,
    "write_file": write_file,
    "run_tests": run_tests
}

agent = PlannerAgent(
    tools=tools,
    config={
        "max_iterations": 30,
        "parallel_execution": True,
        "reflection_frequency": 3,
        "max_replan_attempts": 2
    }
)


# Define callbacks for monitoring
def on_phase(phase: str):
    print(f"\n{'='*40}")
    print(f"Phase: {phase.upper()}")
    print(f"{'='*40}")


def on_task_complete(task, result):
    status = "SUCCESS" if result.success else "FAILED"
    name = task.description if task else result.task_id
    print(f"  [{status}] {name}")


def on_complete(summary):
    print(f"\n{'='*60}")
    print("EXECUTION COMPLETE")
    print(f"{'='*60}")
    print(f"Success: {summary['success']}")
    print(f"Tasks: {summary['tasks_successful']}/{summary['tasks_executed']} successful")
    print(f"Duration: {summary['total_duration_seconds']:.2f}s")
    print(f"Replans: {summary['replans']}")


async def main():
    goal = """
    Create a Python utility library with the following:
    1. A function to validate email addresses
    2. A function to format phone numbers
    3. A function to generate random passwords
    4. Unit tests for all functions
    5. Documentation for the library
    """

    context = {
        "language": "Python",
        "testing_framework": "pytest",
        "output_directory": "./utils"
    }

    await agent.run(
        goal=goal,
        context=context,
        callbacks={
            "on_phase": on_phase,
            "on_task_complete": on_task_complete,
            "on_complete": on_complete
        }
    )

    # Print detailed results
    print("\nTask Results:")
    for desc, result in agent.get_task_results().items():
        status = "✓" if result["success"] else "✗"
        print(f"  {status} {desc}")


asyncio.run(main())
```

Expected Output
Illustrative output (abridged; the generated plan, quality score, and timings will vary between runs):

```text
========================================
Phase: PLANNING
========================================

============================================================
GOAL: Create a Python utility library...
============================================================

Created plan with 8 tasks:
  [t1] Design library structure and API (deps: none)
  [t2] Implement email validation function (deps: t1)
  [t3] Implement phone number formatting function (deps: t1)
  [t4] Implement password generation function (deps: t1)
  [t5] Write unit tests for email validation (deps: t2)
  [t6] Write unit tests for phone formatting (deps: t3)
  [t7] Write unit tests for password generation (deps: t4)
  [t8] Create documentation (deps: t5, t6, t7)

========================================
Phase: EXECUTION
========================================
  [SUCCESS] Design library structure and API

Progress: 12.5%

  [SUCCESS] Implement email validation function
  [SUCCESS] Implement phone number formatting function
  [SUCCESS] Implement password generation function

Progress: 50.0%

  [SUCCESS] Write unit tests for email validation
  [SUCCESS] Write unit tests for phone formatting
  [SUCCESS] Write unit tests for password generation

Reflection: quality=0.85
  Suggestions: ['Consider adding type hints']

Progress: 87.5%

  [SUCCESS] Create documentation

Progress: 100.0%


============================================================
EXECUTION COMPLETE
============================================================
Success: True
Tasks: 8/8 successful
Duration: 4.32s
Replans: 0
```

Chapter Summary
This chapter has taken you through the complete journey of planning and reasoning for AI agents. We covered:
- Task Decomposition: Breaking complex goals into manageable, executable subtasks with dependency tracking
- Hierarchical Planning: Organizing tasks at multiple abstraction levels for both strategic and tactical execution
- Self-Reflection: Evaluating plan quality, detecting issues, and improving outputs through iterative refinement
- Chain-of-Thought: Making reasoning explicit for better problem-solving and debuggability
- Tree of Thoughts: Exploring multiple reasoning paths to find optimal solutions
- Complete Planner Agent: Integrating all components into a working end-to-end system
Chapter Complete: You now have a comprehensive understanding of planning and reasoning for AI agents. The techniques covered here (decomposition, hierarchy, reflection, and search) form the cognitive foundation that enables agents to tackle complex, real-world problems systematically.
In the next chapter, we'll move from single agents to multi-agent systems: teams of specialized agents that collaborate to achieve goals beyond what any single agent could accomplish alone.