Chapter 10

Implementing a Planner Agent

Planning and Reasoning

Introduction

Throughout this chapter, we've explored individual planning techniques: task decomposition, hierarchical planning, self-reflection, chain-of-thought reasoning, and tree of thoughts. Now it's time to bring them together into a complete Planner Agent: an intelligent system that can take complex goals, break them down, execute plans, monitor progress, and adapt when things don't go as expected.

This section provides a production-ready implementation that you can use as a foundation for building sophisticated planning agents.

Goal: Build a complete planner agent that combines decomposition, execution, reflection, and adaptation into a coherent system capable of handling real-world complexity.

Planner Agent Architecture

Our planner agent follows a modular architecture with clear separation of concerns:

| Component | Responsibility | Key Methods |
|---|---|---|
| TaskDecomposer | Break goals into executable tasks | decompose(), refine() |
| PlanManager | Track plan state and dependencies | add_task(), get_ready_tasks() |
| Executor | Execute individual tasks | execute(), handle_failure() |
| Reflector | Evaluate quality and progress | reflect(), assess_plan() |
| Replanner | Adapt plans when needed | replan(), adjust_strategy() |
| PlannerAgent | Orchestrate the full workflow | run(), step() |

Information Flow

📝text
┌─────────────────────────────────────────────────────────────┐
│                      PlannerAgent                           │
│                                                             │
│  ┌──────────────┐    ┌──────────────┐    ┌──────────────┐   │
│  │   Goal       │───▶│ Decomposer   │───▶│ PlanManager  │   │
│  └──────────────┘    └──────────────┘    └──────────────┘   │
│                                                  │          │
│                                                  ▼          │
│  ┌──────────────┐    ┌──────────────┐    ┌──────────────┐   │
│  │  Replanner   │◀───│  Reflector   │◀───│  Executor    │   │
│  └──────────────┘    └──────────────┘    └──────────────┘   │
│         │                                        ▲          │
│         └────────────────────────────────────────┘          │
│                    (if replanning needed)                   │
└─────────────────────────────────────────────────────────────┘

Core Components

Let's implement each component with clear interfaces:

Data Models

🐍python
from dataclasses import dataclass, field
from typing import Any, Optional, Callable
from enum import Enum
from datetime import datetime
import uuid

class TaskStatus(Enum):
    PENDING = "pending"
    READY = "ready"
    IN_PROGRESS = "in_progress"
    COMPLETED = "completed"
    FAILED = "failed"
    BLOCKED = "blocked"
    CANCELLED = "cancelled"

class TaskPriority(Enum):
    LOW = 1
    MEDIUM = 2
    HIGH = 3
    CRITICAL = 4

@dataclass
class Task:
    """A single task in the plan."""
    id: str = field(default_factory=lambda: str(uuid.uuid4())[:8])
    description: str = ""
    status: TaskStatus = TaskStatus.PENDING
    priority: TaskPriority = TaskPriority.MEDIUM
    dependencies: list[str] = field(default_factory=list)
    result: Any = None
    error: Optional[str] = None
    attempts: int = 0
    max_attempts: int = 3
    created_at: datetime = field(default_factory=datetime.now)
    completed_at: Optional[datetime] = None
    metadata: dict = field(default_factory=dict)

    def is_ready(self, completed_tasks: set[str]) -> bool:
        """Check if all dependencies are satisfied."""
        return all(dep in completed_tasks for dep in self.dependencies)

@dataclass
class Plan:
    """A complete plan with tasks and metadata."""
    id: str = field(default_factory=lambda: str(uuid.uuid4())[:8])
    goal: str = ""
    tasks: dict[str, Task] = field(default_factory=dict)
    execution_order: list[str] = field(default_factory=list)
    created_at: datetime = field(default_factory=datetime.now)
    status: str = "active"
    metadata: dict = field(default_factory=dict)

    def add_task(self, task: Task) -> None:
        """Add a task to the plan."""
        self.tasks[task.id] = task
        if task.id not in self.execution_order:
            self.execution_order.append(task.id)

    def get_completed_task_ids(self) -> set[str]:
        """Get IDs of completed tasks."""
        return {
            tid for tid, task in self.tasks.items()
            if task.status == TaskStatus.COMPLETED
        }

    def get_ready_tasks(self) -> list[Task]:
        """Get tasks ready for execution, highest priority first."""
        completed = self.get_completed_task_ids()
        ready = []
        for task in self.tasks.values():
            if task.status == TaskStatus.PENDING and task.is_ready(completed):
                ready.append(task)
        return sorted(ready, key=lambda t: -t.priority.value)

    def get_progress(self) -> dict:
        """Get plan progress statistics."""
        total = len(self.tasks)
        if total == 0:
            # Keep the same keys as the non-empty case so callers can rely on them
            return {"total": 0, "by_status": {}, "completed": 0, "percentage": 0}

        # Count tasks per status value, e.g. {"pending": 2, "completed": 1}
        by_status: dict[str, int] = {}
        for task in self.tasks.values():
            status = task.status.value
            by_status[status] = by_status.get(status, 0) + 1

        completed = by_status.get("completed", 0)
        return {
            "total": total,
            "by_status": by_status,
            "completed": completed,
            "percentage": round(100 * completed / total, 1)
        }

@dataclass
class ExecutionResult:
    """Result of executing a task."""
    task_id: str
    success: bool
    result: Any = None
    error: Optional[str] = None
    duration_seconds: float = 0.0

@dataclass
class ReflectionResult:
    """Result of reflecting on progress."""
    quality_score: float
    issues: list[str]
    suggestions: list[str]
    should_replan: bool
    reasoning: str
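
The `dependencies` lists above form a directed graph, and `get_ready_tasks()` only returns tasks whose prerequisites have all completed. A plan whose dependencies reference unknown IDs or form a cycle would therefore never finish. As a minimal sketch of a pre-flight check, assuming the plan is reduced to a plain mapping of task ID to dependency IDs, the standard library's `graphlib` can validate the graph and produce one valid execution order. The `validate_dependencies` helper is hypothetical and not part of the classes above:

```python
from graphlib import CycleError, TopologicalSorter


def validate_dependencies(deps: dict[str, list[str]]) -> list[str]:
    """Return one valid execution order, or raise ValueError if none exists.

    `deps` maps each task ID to the IDs it depends on (a hypothetical input
    shape, e.g. built from {task.id: task.dependencies}).
    """
    # Reject dependencies that point at tasks missing from the plan
    for task_id, prerequisites in deps.items():
        unknown = [d for d in prerequisites if d not in deps]
        if unknown:
            raise ValueError(f"Task {task_id!r} depends on unknown tasks: {unknown}")

    try:
        # static_order() yields every prerequisite before its dependents
        return list(TopologicalSorter(deps).static_order())
    except CycleError as exc:
        # The second element of args holds the nodes that form the cycle
        raise ValueError(f"Circular dependency detected: {exc.args[1]}") from exc


order = validate_dependencies({
    "research": [],
    "draft": ["research"],
    "review": ["draft"],
})
print(order)
```

Running a check like this right after decomposition surfaces a malformed plan immediately, rather than after the execution loop has stalled.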

Task Decomposer

🐍python
import asyncio
import json

from anthropic import Anthropic

class TaskDecomposer:
    """Decomposes goals into executable tasks."""

    def __init__(self, model: str = "claude-sonnet-4-20250514"):
        self.client = Anthropic()
        self.model = model

    async def decompose(
        self,
        goal: str,
        context: Optional[dict] = None,
        constraints: Optional[list[str]] = None
    ) -> list[Task]:
        """Decompose a goal into tasks."""

        prompt = f"""Decompose this goal into concrete, executable tasks.

Goal: {goal}
Context: {json.dumps(context) if context else 'None'}
Constraints: {constraints if constraints else 'None'}

Requirements:
1. Each task should be completable in a single focused effort
2. Tasks should have clear success criteria
3. Identify dependencies between tasks
4. Estimate priority (low/medium/high/critical)

Return JSON:
{{
    "tasks": [
        {{
            "id": "unique_short_id",
            "description": "what to do",
            "dependencies": ["ids of prerequisite tasks"],
            "priority": "low|medium|high|critical",
            "estimated_effort": "quick|moderate|substantial"
        }}
    ]
}}"""

        # The Anthropic client is synchronous, so run the request in a worker
        # thread (Python 3.9+) to avoid blocking the event loop
        response = await asyncio.to_thread(
            self.client.messages.create,
            model=self.model,
            max_tokens=4096,
            messages=[{"role": "user", "content": prompt}]
        )

        # Assumes the model returns bare JSON with no surrounding text
        result = json.loads(response.content[0].text)

        # Map the model's priority labels onto the TaskPriority enum
        priority_map = {
            "low": TaskPriority.LOW,
            "medium": TaskPriority.MEDIUM,
            "high": TaskPriority.HIGH,
            "critical": TaskPriority.CRITICAL
        }

        tasks = []
        for task_data in result["tasks"]:
            task = Task(
                id=task_data["id"],
                description=task_data["description"],
                dependencies=task_data.get("dependencies", []),
                priority=priority_map.get(task_data.get("priority", "medium"), TaskPriority.MEDIUM),
                metadata={"estimated_effort": task_data.get("estimated_effort", "moderate")}
            )
            tasks.append(task)

        return tasks

    async def refine_task(self, task: Task, feedback: str) -> list[Task]:
        """Further decompose a task based on feedback."""

        prompt = f"""This task needs to be broken down further.

Task: {task.description}
Feedback: {feedback}

Break this into smaller, more specific subtasks.

Return JSON:
{{
    "subtasks": [
        {{"id": "...", "description": "...", "dependencies": []}}
    ]
}}"""

        response = await asyncio.to_thread(
            self.client.messages.create,
            model=self.model,
            max_tokens=2048,
            messages=[{"role": "user", "content": prompt}]
        )

        result = json.loads(response.content[0].text)

        subtasks = []
        for st_data in result["subtasks"]:
            # Prefix IDs with the parent ID so subtasks stay unique within the plan
            subtask = Task(
                id=f"{task.id}_{st_data['id']}",
                description=st_data["description"],
                dependencies=[f"{task.id}_{d}" for d in st_data.get("dependencies", [])],
                priority=task.priority
            )
            subtasks.append(subtask)

        return subtasks
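
Both methods above pass `response.content[0].text` straight to `json.loads`, which fails whenever the model wraps its answer in a Markdown code fence or adds a sentence around it. The sketch below shows one way to make that step more forgiving. `extract_json` is a hypothetical helper, not part of the Anthropic SDK, and its heuristics (prefer the first fenced block, otherwise take the outermost braces) are assumptions that may need tuning for your prompts:

```python
import json
import re

# Build the three-backtick marker programmatically to keep this example readable
FENCE = "`" * 3
FENCED_BLOCK = re.compile(FENCE + r"(?:json)?\s*(.*?)" + FENCE, re.DOTALL)


def extract_json(text: str) -> dict:
    """Parse a JSON object from model output, tolerating fences and extra prose."""
    # Prefer the contents of the first fenced block if one is present
    match = FENCED_BLOCK.search(text)
    candidate = match.group(1) if match else text

    try:
        return json.loads(candidate)
    except json.JSONDecodeError:
        pass

    # Fall back to the span between the first "{" and the last "}"
    start, end = candidate.find("{"), candidate.rfind("}")
    if start == -1 or end <= start:
        raise ValueError("No JSON object found in model output")
    return json.loads(candidate[start:end + 1])


reply = "Here is the plan:\n" + FENCE + 'json\n{"tasks": []}\n' + FENCE
print(extract_json(reply))
```

With a helper like this, `json.loads(response.content[0].text)` could become `extract_json(response.content[0].text)`, leaving the rest of each method unchanged.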

Task Executor

🐍python
import time
import asyncio

class TaskExecutor:
    """Executes individual tasks."""

    def __init__(
        self,
        tools: Optional[dict[str, Callable]] = None,
        model: str = "claude-sonnet-4-20250514"
    ):
        self.tools = tools or {}
        self.client = Anthropic()
        self.model = model

    async def execute(
        self,
        task: Task,
        context: Optional[dict] = None
    ) -> ExecutionResult:
        """Execute a single task."""
        start_time = time.time()

        try:
            # Mark as in progress
            task.status = TaskStatus.IN_PROGRESS
            task.attempts += 1

            # Determine execution strategy
            if self._requires_tool(task):
                result = await self._execute_with_tools(task, context)
            else:
                result = await self._execute_with_llm(task, context)

            # Mark completed
            task.status = TaskStatus.COMPLETED
            task.result = result
            task.completed_at = datetime.now()

            return ExecutionResult(
                task_id=task.id,
                success=True,
                result=result,
                duration_seconds=time.time() - start_time
            )

        except Exception as e:
            task.error = str(e)

            if task.attempts >= task.max_attempts:
                task.status = TaskStatus.FAILED
            else:
                task.status = TaskStatus.PENDING  # Will retry

            return ExecutionResult(
                task_id=task.id,
                success=False,
                error=str(e),
                duration_seconds=time.time() - start_time
            )

    def _requires_tool(self, task: Task) -> bool:
        """Check if task requires specific tools."""
        # Keyword heuristic; only take the tool path when tools are registered
        tool_keywords = ["search", "fetch", "run", "execute", "calculate"]
        description = task.description.lower()
        return bool(self.tools) and any(kw in description for kw in tool_keywords)

    async def _execute_with_tools(
        self,
        task: Task,
        context: Optional[dict]
    ) -> Any:
        """Execute task using available tools."""

        prompt = f"""Execute this task using available tools.

Task: {task.description}
Context: {json.dumps(context) if context else 'None'}
Available tools: {list(self.tools.keys())}

Determine which tool to use and what arguments to pass.
Return JSON:
{{"tool": "tool_name", "arguments": {{...}}}}"""

        # The client is synchronous; a worker thread keeps the event loop free
        response = await asyncio.to_thread(
            self.client.messages.create,
            model=self.model,
            max_tokens=1024,
            messages=[{"role": "user", "content": prompt}]
        )

        tool_call = json.loads(response.content[0].text)
        tool_name = tool_call["tool"]
        arguments = tool_call.get("arguments", {})

        if tool_name not in self.tools:
            raise ValueError(f"Tool not found: {tool_name}")

        tool_fn = self.tools[tool_name]
        # Await coroutine tools; call plain functions directly
        if asyncio.iscoroutinefunction(tool_fn):
            return await tool_fn(**arguments)
        return tool_fn(**arguments)

    async def _execute_with_llm(
        self,
        task: Task,
        context: Optional[dict]
    ) -> str:
        """Execute task using pure LLM reasoning."""

        prompt = f"""Complete this task thoroughly.

Task: {task.description}
Context: {json.dumps(context) if context else 'None'}

Think step by step and provide a complete result."""

        response = await asyncio.to_thread(
            self.client.messages.create,
            model=self.model,
            max_tokens=4096,
            messages=[{"role": "user", "content": prompt}]
        )

        return response.content[0].text
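
`_execute_with_tools` calls the selected tool with whatever arguments the model proposes, so a misspelled parameter only surfaces as a `TypeError` raised from inside the call. One possible guard, sketched here with the standard `inspect` module, checks the proposed arguments against the tool's signature before anything runs. The `check_tool_call` name and the two-parameter `search_web` stand-in are illustrative assumptions:

```python
import inspect
from typing import Any, Callable


def check_tool_call(
    tools: dict[str, Callable[..., Any]],
    tool_name: str,
    arguments: dict[str, Any],
) -> None:
    """Raise ValueError if the tool is unknown or the arguments do not fit it."""
    if tool_name not in tools:
        raise ValueError(f"Tool not found: {tool_name}")

    signature = inspect.signature(tools[tool_name])
    try:
        # bind() applies Python's argument-matching rules without calling the tool
        signature.bind(**arguments)
    except TypeError as exc:
        raise ValueError(f"Invalid arguments for {tool_name}: {exc}") from exc


def search_web(query: str, limit: int = 5) -> str:
    """Stand-in tool used only for this example."""
    return f"Search results for: {query} (top {limit})"


tools = {"search_web": search_web}
check_tool_call(tools, "search_web", {"query": "planner agents"})  # passes silently
```

Because the check raises `ValueError`, the executor's existing `except Exception` branch would record the message on the task and schedule a retry as usual.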

Reflector

🐍python
import asyncio

class PlanReflector:
    """Reflects on plan quality and progress."""

    def __init__(self, model: str = "claude-sonnet-4-20250514"):
        self.client = Anthropic()
        self.model = model

    async def reflect(
        self,
        plan: Plan,
        recent_results: list[ExecutionResult]
    ) -> ReflectionResult:
        """Reflect on plan progress and quality."""

        progress = plan.get_progress()
        by_status = progress.get("by_status", {})

        # Summarize recent execution
        recent_summary = []
        for result in recent_results[-5:]:  # Last 5 results
            task = plan.tasks.get(result.task_id)
            status = "✓" if result.success else "✗"
            recent_summary.append(
                f"{status} {task.description if task else result.task_id}: "
                f"{result.result if result.success else result.error}"
            )

        # Tasks that have not started yet
        remaining = [
            f"- {t.description}" for t in plan.tasks.values()
            if t.status == TaskStatus.PENDING
        ]

        prompt = f"""Evaluate the current state of this plan.

Goal: {plan.goal}

Progress: {progress['percentage']}% complete ({progress['completed']}/{progress['total']} tasks)

Task status breakdown: {by_status}

Recent executions:
{chr(10).join(recent_summary)}

Remaining tasks:
{chr(10).join(remaining) or "None"}

Evaluate:
1. Quality of progress so far (0-100)
2. Any issues or blockers
3. Suggestions for improvement
4. Whether replanning is needed

Return JSON:
{{
    "quality_score": 0-100,
    "issues": ["list of problems"],
    "suggestions": ["improvement ideas"],
    "should_replan": true/false,
    "reasoning": "explanation"
}}"""

        # The client is synchronous; a worker thread keeps the event loop free
        response = await asyncio.to_thread(
            self.client.messages.create,
            model=self.model,
            max_tokens=1024,
            messages=[{"role": "user", "content": prompt}]
        )

        result = json.loads(response.content[0].text)

        # Scale the 0-100 score from the model to the 0.0-1.0 range
        return ReflectionResult(
            quality_score=result["quality_score"] / 100.0,
            issues=result["issues"],
            suggestions=result["suggestions"],
            should_replan=result["should_replan"],
            reasoning=result["reasoning"]
        )
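
`reflect()` trusts the model to return a numeric 0-100 score, real lists, and a real boolean. A small normalization step can make the result safer to act on before it drives a replanning decision. The following is a sketch under assumptions: `normalize_reflection` is a hypothetical helper, and falling back to a score of 0 for unparseable values is one possible policy rather than the only sensible one:

```python
from typing import Any


def normalize_reflection(raw: dict[str, Any]) -> dict[str, Any]:
    """Coerce a parsed reflection payload into predictable types and ranges."""
    try:
        # Accept numbers or numeric strings on the prompt's 0-100 scale
        score = float(raw.get("quality_score", 0)) / 100.0
    except (TypeError, ValueError):
        score = 0.0  # assumed policy: treat an unparseable score as the worst case

    should_replan = raw.get("should_replan", False)
    if isinstance(should_replan, str):
        # Models occasionally return "true"/"false" as strings
        should_replan = should_replan.strip().lower() == "true"

    return {
        "quality_score": min(1.0, max(0.0, score)),  # clamp to [0.0, 1.0]
        "issues": list(raw.get("issues") or []),
        "suggestions": list(raw.get("suggestions") or []),
        "should_replan": bool(should_replan),
        "reasoning": str(raw.get("reasoning", "")),
    }


cleaned = normalize_reflection({"quality_score": "85", "should_replan": "True", "issues": None})
print(cleaned["quality_score"], cleaned["should_replan"], cleaned["issues"])
```

The returned dictionary uses the same field names as `ReflectionResult`, so it could be unpacked into that dataclass with `ReflectionResult(**cleaned)`.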

Replanner

🐍python
import asyncio

class PlanReplanner:
    """Adapts plans based on feedback and failures."""

    def __init__(self, model: str = "claude-sonnet-4-20250514"):
        self.client = Anthropic()
        self.model = model

    async def replan(
        self,
        plan: Plan,
        reflection: ReflectionResult,
        failed_tasks: list[Task]
    ) -> list[Task]:
        """Generate new or modified tasks based on reflection."""

        completed_tasks = [
            t.description for t in plan.tasks.values()
            if t.status == TaskStatus.COMPLETED
        ]

        pending_tasks = [
            t.description for t in plan.tasks.values()
            if t.status == TaskStatus.PENDING
        ]

        failed_descriptions = [t.description for t in failed_tasks]

        prompt = f"""Revise the plan based on current progress and issues.

Original Goal: {plan.goal}

Completed tasks:
{chr(10).join(f"- {t}" for t in completed_tasks) or "None yet"}

Failed tasks:
{chr(10).join(f"- {t}" for t in failed_descriptions) or "None"}

Current issues:
{chr(10).join(f"- {i}" for i in reflection.issues)}

Suggestions:
{chr(10).join(f"- {s}" for s in reflection.suggestions)}

Currently pending tasks:
{chr(10).join(f"- {t}" for t in pending_tasks)}

Create new or revised tasks that:
1. Work around failed tasks if possible
2. Address the identified issues
3. Still achieve the original goal
4. Build on what's already completed

Return JSON:
{{
    "remove_tasks": ["descriptions of tasks to remove"],
    "new_tasks": [
        {{"id": "...", "description": "...", "dependencies": [], "priority": "..."}}
    ],
    "reasoning": "explanation of changes"
}}"""

        # The client is synchronous; a worker thread keeps the event loop free
        response = await asyncio.to_thread(
            self.client.messages.create,
            model=self.model,
            max_tokens=4096,
            messages=[{"role": "user", "content": prompt}]
        )

        result = json.loads(response.content[0].text)

        # Only "new_tasks" is applied here; "remove_tasks" and "reasoning"
        # are returned by the model but not acted on by this method
        priority_map = {
            "low": TaskPriority.LOW,
            "medium": TaskPriority.MEDIUM,
            "high": TaskPriority.HIGH,
            "critical": TaskPriority.CRITICAL
        }

        # Create new task objects
        new_tasks = []
        for task_data in result["new_tasks"]:
            task = Task(
                id=task_data["id"],
                description=task_data["description"],
                dependencies=task_data.get("dependencies", []),
                priority=priority_map.get(task_data.get("priority", "medium"), TaskPriority.MEDIUM),
                metadata={"source": "replanning"}
            )
            new_tasks.append(task)

        return new_tasks

The Planning-Execution Loop

The core of the planner agent is its execution loop, which coordinates all components:

🐍python
class PlanningLoop:
    """
    Orchestrates the plan-execute-reflect-replan cycle.
    """

    def __init__(
        self,
        decomposer: TaskDecomposer,
        executor: TaskExecutor,
        reflector: PlanReflector,
        replanner: PlanReplanner
    ):
        self.decomposer = decomposer
        self.executor = executor
        self.reflector = reflector
        self.replanner = replanner

        self.execution_results: list[ExecutionResult] = []
        self.reflection_history: list[ReflectionResult] = []

    async def run(
        self,
        goal: str,
        context: Optional[dict] = None,
        max_iterations: int = 50,
        parallel_execution: bool = True
    ) -> tuple[Plan, list[ExecutionResult]]:
        """Run the complete planning loop."""

        # Phase 1: Decompose goal into plan
        print(f"Decomposing goal: {goal}")
        tasks = await self.decomposer.decompose(goal, context)

        plan = Plan(goal=goal)
        for task in tasks:
            plan.add_task(task)

        print(f"Created plan with {len(tasks)} tasks")

        # Phase 2: Execute with reflection
        iteration = 0
        while iteration < max_iterations:
            iteration += 1

            # Get ready tasks
            ready_tasks = plan.get_ready_tasks()

            if not ready_tasks:
                # Every dispatched task has finished by this point, so an empty
                # ready list means the plan is either done or stuck
                progress = plan.get_progress()
                if progress["completed"] == progress["total"]:
                    print("Plan completed successfully!")
                else:
                    # Pending tasks can never become ready (failed, missing, or
                    # circular dependencies): mark them blocked and stop rather
                    # than spinning until max_iterations
                    for t in plan.tasks.values():
                        if t.status == TaskStatus.PENDING:
                            t.status = TaskStatus.BLOCKED
                    print("Plan blocked - no more executable tasks")
                break

            # Execute ready tasks
            if parallel_execution and len(ready_tasks) > 1:
                results = await self._execute_parallel(ready_tasks, context)
            else:
                results = await self._execute_sequential(ready_tasks, context)

            self.execution_results.extend(results)

            # Reflect every fifth iteration, or immediately after any failure
            if iteration % 5 == 0 or any(not r.success for r in results):
                reflection = await self.reflector.reflect(plan, self.execution_results)
                self.reflection_history.append(reflection)

                print(f"Reflection: quality={reflection.quality_score:.2f}, "
                      f"replan={reflection.should_replan}")

                # Replan if needed
                if reflection.should_replan:
                    failed_tasks = [
                        plan.tasks[r.task_id] for r in results
                        if not r.success and r.task_id in plan.tasks
                    ]

                    new_tasks = await self.replanner.replan(
                        plan, reflection, failed_tasks
                    )

                    for task in new_tasks:
                        plan.add_task(task)

                    print(f"Replanned: added {len(new_tasks)} new tasks")

            # Progress report
            progress = plan.get_progress()
            print(f"Progress: {progress['percentage']}%")

        return plan, self.execution_results

    async def _execute_parallel(
        self,
        tasks: list[Task],
        context: Optional[dict]
    ) -> list[ExecutionResult]:
        """Execute multiple tasks concurrently."""
        coros = [self.executor.execute(task, context) for task in tasks]
        return await asyncio.gather(*coros)

    async def _execute_sequential(
        self,
        tasks: list[Task],
        context: Optional[dict]
    ) -> list[ExecutionResult]:
        """Execute tasks one by one."""
        results = []
        for task in tasks:
            result = await self.executor.execute(task, context)
            results.append(result)
        return results
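
`_execute_parallel` launches every ready task at once, which can run into API rate limits when a plan fans out widely. One common way to cap concurrency is an `asyncio.Semaphore`. The sketch below is illustrative only: `gather_bounded`, its `limit` parameter, and the `fake_execute` worker are hypothetical names, with `fake_execute` standing in for `executor.execute`:

```python
import asyncio
from typing import Awaitable, Callable, Iterable, TypeVar

T = TypeVar("T")
R = TypeVar("R")


async def gather_bounded(
    items: Iterable[T],
    worker: Callable[[T], Awaitable[R]],
    limit: int = 3,
) -> list[R]:
    """Run worker(item) for every item with at most `limit` running at once."""
    semaphore = asyncio.Semaphore(limit)

    async def run_one(item: T) -> R:
        async with semaphore:  # waits here while `limit` workers are active
            return await worker(item)

    # gather() returns results in the same order as the inputs
    return await asyncio.gather(*(run_one(item) for item in items))


async def fake_execute(task_id: str) -> str:
    """Stand-in for executor.execute(); sleeps briefly instead of calling an API."""
    await asyncio.sleep(0.01)
    return f"done:{task_id}"


results = asyncio.run(gather_bounded(["t1", "t2", "t3", "t4"], fake_execute, limit=2))
print(results)
```

Because `gather()` keeps input order, the results still line up with the priority-sorted list returned by `get_ready_tasks()`.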

Complete Implementation

Here's the complete PlannerAgent that ties everything together:

🐍python
1from anthropic import Anthropic
2from dataclasses import dataclass, field
3from typing import Any, Optional, Callable, Dict, List
4from enum import Enum
5from datetime import datetime
6import json
7import asyncio
8import uuid
9
10class PlannerAgent:
11    """
12    Complete Planner Agent implementation.
13
14    Combines task decomposition, execution, reflection,
15    and replanning into a cohesive planning system.
16    """
17
18    def __init__(
19        self,
20        tools: dict[str, Callable] = None,
21        model: str = "claude-sonnet-4-20250514",
22        config: dict = None
23    ):
24        self.model = model
25        self.tools = tools or {}
26        self.config = config or {
27            "max_iterations": 50,
28            "parallel_execution": True,
29            "reflection_frequency": 5,
30            "max_replan_attempts": 3
31        }
32
33        # Initialize components
34        self.decomposer = TaskDecomposer(model=model)
35        self.executor = TaskExecutor(tools=tools, model=model)
36        self.reflector = PlanReflector(model=model)
37        self.replanner = PlanReplanner(model=model)
38
39        # State
40        self.current_plan: Optional[Plan] = None
41        self.execution_history: list[ExecutionResult] = []
42        self.reflections: list[ReflectionResult] = []
43        self.replan_count = 0
44
45    async def run(
46        self,
47        goal: str,
48        context: dict = None,
49        callbacks: dict = None
50    ) -> dict:
51        """
52        Run the planner agent to achieve a goal.
53
54        Args:
55            goal: The goal to achieve
56            context: Additional context for planning
57            callbacks: Optional callbacks for progress updates
58
59        Returns:
60            dict with plan, results, and summary
61        """
62        callbacks = callbacks or {}
63
64        try:
65            # Phase 1: Planning
66            if callbacks.get("on_phase"):
67                callbacks["on_phase"]("planning")
68
69            print(f"\n{'='*60}")
70            print(f"GOAL: {goal}")
71            print(f"{'='*60}\n")
72
73            tasks = await self.decomposer.decompose(goal, context)
74            self.current_plan = Plan(goal=goal)
75
76            for task in tasks:
77                self.current_plan.add_task(task)
78
79            print(f"Created plan with {len(tasks)} tasks:")
80            for task in tasks:
81                deps = ", ".join(task.dependencies) or "none"
82                print(f"  [{task.id}] {task.description} (deps: {deps})")
83
84            # Phase 2: Execution
85            if callbacks.get("on_phase"):
86                callbacks["on_phase"]("execution")
87
88            await self._execution_loop(context, callbacks)
89
90            # Phase 3: Summary
91            summary = self._create_summary()
92
93            if callbacks.get("on_complete"):
94                callbacks["on_complete"](summary)
95
96            return summary
97
98        except Exception as e:
99            error_summary = {
100                "success": False,
101                "error": str(e),
102                "plan": self.current_plan,
103                "results": self.execution_history
104            }
105            if callbacks.get("on_error"):
106                callbacks["on_error"](e)
107            return error_summary
108
109    async def _execution_loop(
110        self,
111        context: dict,
112        callbacks: dict
113    ) -> None:
114        """Main execution loop."""
115        iteration = 0
116        max_iterations = self.config["max_iterations"]
117
118        while iteration < max_iterations:
119            iteration += 1
120
121            # Get ready tasks
122            ready_tasks = self.current_plan.get_ready_tasks()
123
124            if not ready_tasks:
125                if self._is_plan_complete():
126                    break
127                continue
128
129            # Notify
130            if callbacks.get("on_iteration"):
131                callbacks["on_iteration"](iteration, ready_tasks)
132
133            # Execute
134            results = await self._execute_tasks(ready_tasks, context)
135            self.execution_history.extend(results)
136
137            # Notify results
138            for result in results:
139                task = self.current_plan.tasks.get(result.task_id)
140                if callbacks.get("on_task_complete"):
141                    callbacks["on_task_complete"](task, result)
142
143                print(f"  {'βœ“' if result.success else 'βœ—'} {task.description if task else result.task_id}")
144
145            # Reflect and potentially replan
146            if self._should_reflect(iteration, results):
147                await self._reflect_and_adapt(context)
148
149            # Progress
150            progress = self.current_plan.get_progress()
151            print(f"\nProgress: {progress['percentage']}%\n")
152
153    async def _execute_tasks(
154        self,
155        tasks: list[Task],
156        context: dict
157    ) -> list[ExecutionResult]:
158        """Execute a batch of tasks."""
159        if self.config["parallel_execution"] and len(tasks) > 1:
160            coros = [self.executor.execute(t, context) for t in tasks]
161            return await asyncio.gather(*coros)
162        else:
163            results = []
164            for task in tasks:
165                result = await self.executor.execute(task, context)
166                results.append(result)
167            return results
168
169    def _should_reflect(
170        self,
171        iteration: int,
172        results: list[ExecutionResult]
173    ) -> bool:
174        """Determine if reflection is needed."""
175        # Reflect on failures
176        if any(not r.success for r in results):
177            return True
178
179        # Periodic reflection
180        if iteration % self.config["reflection_frequency"] == 0:
181            return True
182
183        return False
184
185    async def _reflect_and_adapt(self, context: dict) -> None:
186        """Perform reflection and adapt plan if needed."""
187        reflection = await self.reflector.reflect(
188            self.current_plan,
189            self.execution_history
190        )
191        self.reflections.append(reflection)
192
193        print(f"\nReflection: quality={reflection.quality_score:.2f}")
194        if reflection.issues:
            print(f"  Issues: {reflection.issues}")
        if reflection.suggestions:
            print(f"  Suggestions: {reflection.suggestions}")

        if reflection.should_replan:
            if self.replan_count < self.config["max_replan_attempts"]:
                await self._replan(reflection, context)
            else:
                print("  Max replan attempts reached, continuing with current plan")

    async def _replan(
        self,
        reflection: ReflectionResult,
        context: dict
    ) -> None:
        """Replan based on reflection."""
        self.replan_count += 1

        failed_tasks = [
            self.current_plan.tasks[r.task_id]
            for r in self.execution_history
            if not r.success and r.task_id in self.current_plan.tasks
        ]

        new_tasks = await self.replanner.replan(
            self.current_plan,
            reflection,
            failed_tasks
        )

        if new_tasks:
            print(f"\nReplanning: adding {len(new_tasks)} new tasks")
            for task in new_tasks:
                self.current_plan.add_task(task)
                print(f"  + {task.description}")

    def _is_plan_complete(self) -> bool:
        """Check if plan is complete or stuck."""
        progress = self.current_plan.get_progress()

        # All done
        if progress["completed"] == progress["total"]:
            return True

        # All non-completed tasks are failed or blocked
        active_statuses = [TaskStatus.PENDING, TaskStatus.READY, TaskStatus.IN_PROGRESS]
        has_active = any(
            t.status in active_statuses
            for t in self.current_plan.tasks.values()
        )

        return not has_active

    def _create_summary(self) -> dict:
        """Create execution summary."""
        progress = self.current_plan.get_progress()

        successful_results = [r for r in self.execution_history if r.success]
        failed_results = [r for r in self.execution_history if not r.success]

        total_duration = sum(r.duration_seconds for r in self.execution_history)

        return {
            "success": progress["completed"] == progress["total"],
            "goal": self.current_plan.goal,
            "progress": progress,
            "tasks_executed": len(self.execution_history),
            "tasks_successful": len(successful_results),
            "tasks_failed": len(failed_results),
            "replans": self.replan_count,
            "reflections": len(self.reflections),
            "total_duration_seconds": total_duration,
            "final_quality_score": self.reflections[-1].quality_score if self.reflections else None,
            "plan": self.current_plan,
            "results": self.execution_history
        }

    def get_task_results(self) -> dict[str, Any]:
        """Get results organized by task."""
        results_by_task = {}
        for result in self.execution_history:
            task = self.current_plan.tasks.get(result.task_id)
            if task:
                results_by_task[task.description] = {
                    "success": result.success,
                    "result": result.result,
                    "error": result.error
                }
        return results_by_task
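
Note that `_is_plan_complete` returns `True` in two different situations: every task has finished, or no remaining task can make progress. The standalone sketch below illustrates that distinction. It uses a simplified stand-in for `TaskStatus` and a plain dictionary instead of the plan object, and the `COMPLETED`, `FAILED`, and `BLOCKED` member names as well as the `classify_plan` helper are assumptions made for illustration.

```python
from enum import Enum


class TaskStatus(Enum):
    # Simplified stand-in for the chapter's enum; member names are assumed.
    PENDING = "pending"
    READY = "ready"
    IN_PROGRESS = "in_progress"
    COMPLETED = "completed"
    FAILED = "failed"
    BLOCKED = "blocked"


# Statuses that mean a task can still run or is running.
ACTIVE = {TaskStatus.PENDING, TaskStatus.READY, TaskStatus.IN_PROGRESS}


def classify_plan(statuses: dict[str, TaskStatus]) -> str:
    """Return 'done', 'running', or 'stuck' for a mapping of task id -> status."""
    if all(s is TaskStatus.COMPLETED for s in statuses.values()):
        return "done"
    if any(s in ACTIVE for s in statuses.values()):
        return "running"
    # Nothing left to run, yet not everything completed.
    return "stuck"


print(classify_plan({"t1": TaskStatus.COMPLETED, "t2": TaskStatus.READY}))
print(classify_plan({"t1": TaskStatus.COMPLETED, "t2": TaskStatus.COMPLETED}))
print(classify_plan({"t1": TaskStatus.FAILED, "t2": TaskStatus.BLOCKED}))
```

Both "done" and "stuck" end the main loop, which is why the summary's `success` field compares completed tasks against the total rather than relying on the loop having stopped.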

Testing the Planner

Let's test our planner agent with a concrete example:

🐍python
import asyncio

# Define some tools the agent can use
async def search_web(query: str) -> str:
    """Simulate web search."""
    return f"Search results for: {query}"

async def read_file(path: str) -> str:
    """Simulate file reading."""
    return f"Contents of {path}"

async def write_file(path: str, content: str) -> str:
    """Simulate file writing."""
    return f"Wrote {len(content)} chars to {path}"

async def run_tests(test_path: str) -> dict:
    """Simulate running tests."""
    return {"passed": 5, "failed": 0, "total": 5}

# Create the agent
tools = {
    "search_web": search_web,
    "read_file": read_file,
    "write_file": write_file,
    "run_tests": run_tests
}

agent = PlannerAgent(
    tools=tools,
    config={
        "max_iterations": 30,
        "parallel_execution": True,
        "reflection_frequency": 3,
        "max_replan_attempts": 2
    }
)

# Define callbacks for monitoring
def on_phase(phase: str):
    print(f"\n{'='*40}")
    print(f"Phase: {phase.upper()}")
    print(f"{'='*40}")

def on_task_complete(task, result):
    status = "SUCCESS" if result.success else "FAILED"
    print(f"  [{status}] {task.description}")

def on_complete(summary):
    print(f"\n{'='*60}")
    print("EXECUTION COMPLETE")
    print(f"{'='*60}")
    print(f"Success: {summary['success']}")
    print(f"Tasks: {summary['tasks_successful']}/{summary['tasks_executed']} successful")
    print(f"Duration: {summary['total_duration_seconds']:.2f}s")
    print(f"Replans: {summary['replans']}")

async def main():
    goal = """
    Create a Python utility library with the following:
    1. A function to validate email addresses
    2. A function to format phone numbers
    3. A function to generate random passwords
    4. Unit tests for all functions
    5. Documentation for the library
    """

    context = {
        "language": "Python",
        "testing_framework": "pytest",
        "output_directory": "./utils"
    }

    summary = await agent.run(
        goal=goal,
        context=context,
        callbacks={
            "on_phase": on_phase,
            "on_task_complete": on_task_complete,
            "on_complete": on_complete
        }
    )

    # Print detailed results
    print("\nTask Results:")
    for desc, result in agent.get_task_results().items():
        status = "✓" if result["success"] else "✗"
        print(f"  {status} {desc}")

asyncio.run(main())

Expected Output

πŸ“text
========================================
Phase: PLANNING
========================================

============================================================
GOAL: Create a Python utility library...
============================================================

Created plan with 8 tasks:
  [t1] Design library structure and API (deps: none)
  [t2] Implement email validation function (deps: t1)
  [t3] Implement phone number formatting function (deps: t1)
  [t4] Implement password generation function (deps: t1)
  [t5] Write unit tests for email validation (deps: t2)
  [t6] Write unit tests for phone formatting (deps: t3)
  [t7] Write unit tests for password generation (deps: t4)
  [t8] Create documentation (deps: t5, t6, t7)

========================================
Phase: EXECUTION
========================================

  [SUCCESS] Design library structure and API

Progress: 12.5%

  [SUCCESS] Implement email validation function
  [SUCCESS] Implement phone number formatting function
  [SUCCESS] Implement password generation function

Progress: 50.0%

Reflection: quality=0.85
  Suggestions: ['Consider adding type hints']

  [SUCCESS] Write unit tests for email validation
  [SUCCESS] Write unit tests for phone formatting
  [SUCCESS] Write unit tests for password generation

Progress: 87.5%

  [SUCCESS] Create documentation

Progress: 100.0%

============================================================
EXECUTION COMPLETE
============================================================
Success: True
Tasks: 8/8 successful
Duration: 4.32s
Replans: 0

Add robust error handling, logging, and metrics collection for production deployments. Consider persisting plan state for long-running tasks.
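
As one possible starting point for persisting plan state, the sketch below writes a snapshot of task statuses to JSON and reloads it, logging what it does. The `save_snapshot` and `load_snapshot` helpers, the snapshot layout, and the file name are all hypothetical; they are not part of the `PlannerAgent` shown above and would need to be adapted to the real plan and task classes.

```python
import json
import logging
import os
import tempfile
from pathlib import Path
from typing import Optional

logger = logging.getLogger("planner.persistence")


def save_snapshot(path: Path, goal: str, task_statuses: dict[str, str]) -> None:
    """Atomically write a plan snapshot (hypothetical layout) as JSON."""
    payload = {"goal": goal, "tasks": task_statuses}
    # Write to a temp file in the same directory, then rename it into place,
    # so a crash mid-write never leaves a half-written snapshot behind.
    fd, tmp_name = tempfile.mkstemp(dir=path.parent, suffix=".tmp")
    with os.fdopen(fd, "w", encoding="utf-8") as f:
        json.dump(payload, f, indent=2)
    os.replace(tmp_name, path)
    logger.info("Saved snapshot with %d tasks to %s", len(task_statuses), path)


def load_snapshot(path: Path) -> Optional[dict]:
    """Return the saved snapshot, or None if it is missing or unreadable."""
    try:
        with open(path, encoding="utf-8") as f:
            return json.load(f)
    except FileNotFoundError:
        return None
    except json.JSONDecodeError:
        logger.warning("Snapshot at %s is corrupt; ignoring it", path)
        return None


# Round-trip demo in a throwaway directory.
with tempfile.TemporaryDirectory() as tmp_dir:
    snapshot_path = Path(tmp_dir) / "plan_state.json"
    save_snapshot(
        snapshot_path,
        "Create a Python utility library",
        {"t1": "completed", "t2": "pending"},
    )
    restored = load_snapshot(snapshot_path)
    print(restored["tasks"])
```

In a real agent, a natural place to call something like `save_snapshot` would be after each task completes, so that a restarted process can skip work that already succeeded.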

Chapter Summary

This chapter has taken you through the complete journey of planning and reasoning for AI agents. We covered:

  • Task Decomposition: Breaking complex goals into manageable, executable subtasks with dependency tracking
  • Hierarchical Planning: Organizing tasks at multiple abstraction levels for both strategic and tactical execution
  • Self-Reflection: Evaluating plan quality, detecting issues, and improving outputs through iterative refinement
  • Chain-of-Thought: Making reasoning explicit for better problem-solving and debuggability
  • Tree of Thoughts: Exploring multiple reasoning paths to find optimal solutions
  • Complete Planner Agent: Integrating all components into a production-ready system

Chapter Complete: You now have a comprehensive understanding of planning and reasoning for AI agents. The techniques covered here (decomposition, hierarchy, reflection, and search) form the cognitive foundation that enables agents to tackle complex, real-world problems systematically.

In the next chapter, we'll move from single agents to multi-agent systems: teams of specialized agents that collaborate to achieve goals beyond what any single agent could accomplish alone.