Chapter 10
20 min read
Section 60 of 175

Hierarchical Planning

Planning and Reasoning

Introduction

Task decomposition gives us subtasks, but how do we organize them into coherent plans? Hierarchical planning structures tasks at multiple levels of abstraction—from high-level goals to concrete actions. This mirrors how humans naturally think: "I want to go on vacation" decomposes into "book travel" which further breaks into "search flights" and "compare prices."

In this section, we'll explore how to build agents that plan at multiple abstraction levels, enabling them to handle complex, long-horizon tasks while maintaining flexibility to adapt when things change.

Core Insight: Hierarchical planning separates "what to achieve" from "how to achieve it," allowing agents to reason at the right level of detail for each decision.

Levels of Abstraction

Effective hierarchical plans operate at distinct levels, each serving a different purpose:

LevelFocusExampleTimeframe
StrategicOverall goals and approachBuild a SaaS productMonths
TacticalMajor milestones and phasesLaunch MVP to beta usersWeeks
OperationalSpecific tasks and workflowsImplement user authenticationDays
AtomicIndividual executable actionsWrite login endpointHours

Why Multiple Levels Matter

  • Cognitive load management: Each level deals with appropriate complexity
  • Flexible adaptation: High-level plans remain stable while details change
  • Progress visibility: Stakeholders track at their relevant level
  • Error recovery: Failures at low levels don't invalidate the overall strategy
🐍python
1from dataclasses import dataclass, field
2from enum import Enum
3from typing import Optional
4
5class AbstractionLevel(Enum):
6    STRATEGIC = "strategic"    # Why are we doing this?
7    TACTICAL = "tactical"      # What milestones do we need?
8    OPERATIONAL = "operational"  # What tasks must we complete?
9    ATOMIC = "atomic"          # What single actions do we take?
10
11@dataclass
12class HierarchicalTask:
13    """A task at any abstraction level."""
14    id: str
15    description: str
16    level: AbstractionLevel
17    parent_id: Optional[str] = None
18    children: list[str] = field(default_factory=list)
19    status: str = "pending"
20
21    def is_leaf(self) -> bool:
22        """Leaf tasks are directly executable."""
23        return len(self.children) == 0 or self.level == AbstractionLevel.ATOMIC
24
25@dataclass
26class HierarchicalPlan:
27    """Multi-level plan structure."""
28    goal: str
29    tasks: dict[str, HierarchicalTask] = field(default_factory=dict)
30
31    def add_task(self, task: HierarchicalTask) -> None:
32        """Add task and update parent's children."""
33        self.tasks[task.id] = task
34        if task.parent_id and task.parent_id in self.tasks:
35            parent = self.tasks[task.parent_id]
36            if task.id not in parent.children:
37                parent.children.append(task.id)
38
39    def get_level(self, level: AbstractionLevel) -> list[HierarchicalTask]:
40        """Get all tasks at a specific abstraction level."""
41        return [t for t in self.tasks.values() if t.level == level]
42
43    def get_executable_tasks(self) -> list[HierarchicalTask]:
44        """Get leaf tasks ready for execution."""
45        ready = []
46        for task in self.tasks.values():
47            if not task.is_leaf():
48                continue
49            if task.status != "pending":
50                continue
51
52            # Check if parent allows execution
53            if task.parent_id:
54                parent = self.tasks.get(task.parent_id)
55                if parent and parent.status not in ["pending", "in_progress"]:
56                    continue
57
58            ready.append(task)
59        return ready

Hierarchical Task Networks

Hierarchical Task Networks (HTN) are a classical AI planning formalism. They decompose compound tasks into subtasks using predefined methods until reaching primitive actions.

HTN Components

  1. Tasks: Can be primitive (directly executable) or compound (needs decomposition)
  2. Methods: Rules for decomposing compound tasks into subtask sequences
  3. Preconditions: State conditions required for method application
  4. Effects: State changes after task completion
🐍python
1from dataclasses import dataclass
2from typing import Callable, Any
3
4@dataclass
5class Method:
6    """A method for decomposing a compound task."""
7    name: str
8    task_type: str  # What task this decomposes
9    preconditions: Callable[[dict], bool]  # State -> applicable?
10    subtasks: list[str]  # Ordered subtask types
11    ordering: str = "sequential"  # or "parallel"
12
13@dataclass
14class PrimitiveTask:
15    """Directly executable action."""
16    name: str
17    preconditions: Callable[[dict], bool]
18    effects: Callable[[dict], dict]  # State transformation
19
20class HTNPlanner:
21    """Hierarchical Task Network planner."""
22
23    def __init__(self):
24        self.methods: dict[str, list[Method]] = {}
25        self.primitives: dict[str, PrimitiveTask] = {}
26
27    def register_method(self, method: Method) -> None:
28        """Register a decomposition method."""
29        if method.task_type not in self.methods:
30            self.methods[method.task_type] = []
31        self.methods[method.task_type].append(method)
32
33    def register_primitive(self, primitive: PrimitiveTask) -> None:
34        """Register a primitive action."""
35        self.primitives[primitive.name] = primitive
36
37    def decompose(
38        self,
39        task_type: str,
40        state: dict[str, Any]
41    ) -> list[str]:
42        """Decompose a task given current state."""
43
44        # Check if primitive
45        if task_type in self.primitives:
46            primitive = self.primitives[task_type]
47            if primitive.preconditions(state):
48                return [task_type]
49            else:
50                return []  # Not applicable
51
52        # Find applicable method
53        methods = self.methods.get(task_type, [])
54        for method in methods:
55            if method.preconditions(state):
56                # Recursively decompose subtasks
57                plan = []
58                current_state = state.copy()
59
60                for subtask in method.subtasks:
61                    subplan = self.decompose(subtask, current_state)
62                    if not subplan:
63                        break  # Method failed
64
65                    plan.extend(subplan)
66
67                    # Update state for sequential subtasks
68                    if method.ordering == "sequential":
69                        for action in subplan:
70                            if action in self.primitives:
71                                current_state = self.primitives[action].effects(
72                                    current_state
73                                )
74                else:
75                    return plan  # Success
76
77        return []  # No applicable method found

HTN Example: Travel Planning

🐍python
1# Define primitive actions
2primitives = [
3    PrimitiveTask(
4        name="search_flights",
5        preconditions=lambda s: "destination" in s,
6        effects=lambda s: {**s, "flights_found": True}
7    ),
8    PrimitiveTask(
9        name="book_flight",
10        preconditions=lambda s: s.get("flights_found"),
11        effects=lambda s: {**s, "flight_booked": True}
12    ),
13    PrimitiveTask(
14        name="search_hotels",
15        preconditions=lambda s: "destination" in s,
16        effects=lambda s: {**s, "hotels_found": True}
17    ),
18    PrimitiveTask(
19        name="book_hotel",
20        preconditions=lambda s: s.get("hotels_found"),
21        effects=lambda s: {**s, "hotel_booked": True}
22    ),
23    PrimitiveTask(
24        name="create_itinerary",
25        preconditions=lambda s: s.get("flight_booked") and s.get("hotel_booked"),
26        effects=lambda s: {**s, "itinerary_ready": True}
27    ),
28]
29
30# Define decomposition methods
31methods = [
32    Method(
33        name="plan_trip_standard",
34        task_type="plan_trip",
35        preconditions=lambda s: "destination" in s and "budget" in s,
36        subtasks=["arrange_transport", "arrange_accommodation", "create_itinerary"]
37    ),
38    Method(
39        name="arrange_transport_by_air",
40        task_type="arrange_transport",
41        preconditions=lambda s: s.get("budget", 0) > 500,
42        subtasks=["search_flights", "book_flight"]
43    ),
44    Method(
45        name="arrange_accommodation_hotel",
46        task_type="arrange_accommodation",
47        preconditions=lambda s: True,
48        subtasks=["search_hotels", "book_hotel"]
49    ),
50]
51
52# Create and run planner
53planner = HTNPlanner()
54for p in primitives:
55    planner.register_primitive(p)
56for m in methods:
57    planner.register_method(m)
58
59initial_state = {
60    "destination": "Tokyo",
61    "budget": 2000
62}
63
64plan = planner.decompose("plan_trip", initial_state)
65print("Plan:", plan)
66# Output: ['search_flights', 'book_flight', 'search_hotels', 'book_hotel', 'create_itinerary']

Goal-Subgoal Decomposition

While HTN uses predefined methods, LLM-based agents can dynamically generate goal-subgoal hierarchies. This is more flexible but requires careful prompting.

Means-Ends Analysis

Work backward from the goal, identifying obstacles and creating subgoals to address them:

🐍python
1from anthropic import Anthropic
2import json
3
4class MeansEndsAnalyzer:
5    """
6    Performs means-ends analysis to decompose goals.
7
8    Works backward from goal, identifying gaps between
9    current state and goal state, then creating subgoals
10    to close those gaps.
11    """
12
13    def __init__(self):
14        self.client = Anthropic()
15
16    async def analyze(
17        self,
18        goal: str,
19        current_state: dict,
20        constraints: list[str] = None
21    ) -> dict:
22        """
23        Analyze goal and generate subgoal hierarchy.
24        """
25
26        prompt = f"""Perform means-ends analysis on this goal.
27
28Goal: {goal}
29
30Current State:
31{json.dumps(current_state, indent=2)}
32
33Constraints: {constraints or 'None specified'}
34
35Instructions:
361. Describe the goal state (what must be true when goal is achieved)
372. Identify differences between current and goal state
383. For each difference, create a subgoal to eliminate it
394. Order subgoals by dependencies
405. For complex subgoals, apply recursive decomposition
41
42Return JSON:
43{{
44    "goal_state": {{"key": "value", ...}},
45    "differences": [
46        {{"gap": "description", "subgoal": "what to achieve", "priority": 1-5}}
47    ],
48    "subgoal_hierarchy": {{
49        "main_goal": "...",
50        "subgoals": [
51            {{
52                "id": "sg1",
53                "description": "...",
54                "parent": null,
55                "dependencies": [],
56                "subgoals": [...]  // Nested subgoals
57            }}
58        ]
59    }}
60}}"""
61
62        response = self.client.messages.create(
63            model="claude-sonnet-4-20250514",
64            max_tokens=4096,
65            messages=[{"role": "user", "content": prompt}]
66        )
67
68        return json.loads(response.content[0].text)

Layered Goal Structure

🐍python
1@dataclass
2class Goal:
3    """A goal in the hierarchy."""
4    id: str
5    description: str
6    parent_id: Optional[str] = None
7    subgoals: list['Goal'] = field(default_factory=list)
8    achieved: bool = False
9    priority: int = 1
10
11    def add_subgoal(self, subgoal: 'Goal') -> None:
12        """Add a subgoal."""
13        subgoal.parent_id = self.id
14        self.subgoals.append(subgoal)
15
16    def is_achievable(self) -> bool:
17        """Check if all subgoals are achieved."""
18        if not self.subgoals:
19            return True  # Leaf goal
20        return all(sg.achieved for sg in self.subgoals)
21
22    def get_next_focus(self) -> Optional['Goal']:
23        """Get next subgoal to work on (DFS)."""
24        if self.achieved:
25            return None
26
27        if not self.subgoals:
28            return self  # This is a leaf goal
29
30        for subgoal in sorted(self.subgoals, key=lambda g: -g.priority):
31            if not subgoal.achieved:
32                focus = subgoal.get_next_focus()
33                if focus:
34                    return focus
35
36        return None  # All subgoals achieved
37
38    def to_tree_string(self, indent: int = 0) -> str:
39        """Visualize goal hierarchy."""
40        icon = "✅" if self.achieved else "○"
41        result = "  " * indent + f"{icon} {self.description}\n"
42        for subgoal in self.subgoals:
43            result += subgoal.to_tree_string(indent + 1)
44        return result
45
46
47class GoalStack:
48    """
49    Manages a stack of active goals.
50
51    Goals are pushed when work begins and popped
52    when achieved or abandoned.
53    """
54
55    def __init__(self):
56        self.stack: list[Goal] = []
57
58    def push(self, goal: Goal) -> None:
59        """Push a new goal onto the stack."""
60        self.stack.append(goal)
61
62    def pop(self) -> Optional[Goal]:
63        """Pop the top goal."""
64        return self.stack.pop() if self.stack else None
65
66    def current(self) -> Optional[Goal]:
67        """Get current focus goal."""
68        return self.stack[-1] if self.stack else None
69
70    def achieve_current(self) -> None:
71        """Mark current goal as achieved and pop."""
72        if self.stack:
73            self.stack[-1].achieved = True
74            self.pop()
75
76            # Check if parent goal is now achievable
77            while self.stack and self.stack[-1].is_achievable():
78                self.stack[-1].achieved = True
79                self.pop()
Goal stacks are useful for agents that need to interrupt current work to handle new priorities, then resume where they left off.

Building Hierarchical Planners

Let's build a complete hierarchical planner that combines LLM intelligence with structured execution:

🐍python
1from anthropic import Anthropic
2from dataclasses import dataclass, field
3from typing import Any, Optional, Callable
4from enum import Enum
5import json
6import asyncio
7
8class PlannerConfig:
9    """Configuration for hierarchical planner."""
10    max_depth: int = 4
11    min_task_complexity: str = "simple"
12    allow_parallel: bool = True
13    replan_on_failure: bool = True
14
15class HierarchicalPlanner:
16    """
17    LLM-powered hierarchical planner.
18
19    Decomposes goals into multi-level task hierarchies
20    and manages execution with dynamic replanning.
21    """
22
23    def __init__(
24        self,
25        config: PlannerConfig = None,
26        model: str = "claude-sonnet-4-20250514"
27    ):
28        self.config = config or PlannerConfig()
29        self.client = Anthropic()
30        self.model = model
31        self.plan: Optional[HierarchicalPlan] = None
32
33    async def create_plan(
34        self,
35        goal: str,
36        context: dict[str, Any] = None
37    ) -> HierarchicalPlan:
38        """Create a hierarchical plan for the goal."""
39
40        self.plan = HierarchicalPlan(goal=goal)
41
42        # Create strategic level
43        strategic = await self._decompose_level(
44            goal,
45            AbstractionLevel.STRATEGIC,
46            context
47        )
48
49        for task in strategic:
50            self.plan.add_task(task)
51
52            # Decompose to tactical
53            tactical = await self._decompose_level(
54                task.description,
55                AbstractionLevel.TACTICAL,
56                context,
57                parent_id=task.id
58            )
59
60            for tac_task in tactical:
61                self.plan.add_task(tac_task)
62
63                # Decompose to operational
64                operational = await self._decompose_level(
65                    tac_task.description,
66                    AbstractionLevel.OPERATIONAL,
67                    context,
68                    parent_id=tac_task.id
69                )
70
71                for op_task in operational:
72                    self.plan.add_task(op_task)
73
74                    # Decompose to atomic if needed
75                    if self._needs_decomposition(op_task):
76                        atomic = await self._decompose_level(
77                            op_task.description,
78                            AbstractionLevel.ATOMIC,
79                            context,
80                            parent_id=op_task.id
81                        )
82                        for atom in atomic:
83                            self.plan.add_task(atom)
84
85        return self.plan
86
87    async def _decompose_level(
88        self,
89        task_description: str,
90        target_level: AbstractionLevel,
91        context: dict = None,
92        parent_id: str = None
93    ) -> list[HierarchicalTask]:
94        """Decompose a task to a specific abstraction level."""
95
96        level_guidance = {
97            AbstractionLevel.STRATEGIC: "high-level phases or major workstreams",
98            AbstractionLevel.TACTICAL: "key milestones or deliverables",
99            AbstractionLevel.OPERATIONAL: "specific tasks that can be done in a day",
100            AbstractionLevel.ATOMIC: "single actions taking minutes to hours",
101        }
102
103        prompt = f"""Decompose this task into {level_guidance[target_level]}.
104
105Task: {task_description}
106Target Level: {target_level.value}
107Context: {json.dumps(context) if context else 'None'}
108
109Return JSON array of subtasks:
110[
111    {{
112        "id": "unique_id",
113        "description": "clear task description",
114        "estimated_complexity": "simple|moderate|complex"
115    }}
116]
117
118Guidelines:
119- Create 2-5 subtasks at this level
120- Each subtask should be clearly scoped
121- Subtasks should be mutually exclusive and collectively exhaustive
122- Use action verbs (implement, design, test, deploy)"""
123
124        response = self.client.messages.create(
125            model=self.model,
126            max_tokens=2048,
127            messages=[{"role": "user", "content": prompt}]
128        )
129
130        subtasks_data = json.loads(response.content[0].text)
131
132        tasks = []
133        for i, data in enumerate(subtasks_data):
134            task = HierarchicalTask(
135                id=f"{parent_id}_{i}" if parent_id else f"t_{i}",
136                description=data["description"],
137                level=target_level,
138                parent_id=parent_id
139            )
140            tasks.append(task)
141
142        return tasks
143
144    def _needs_decomposition(self, task: HierarchicalTask) -> bool:
145        """Check if task needs further decomposition."""
146        # Could use LLM to assess, or use simple heuristics
147        word_count = len(task.description.split())
148        return word_count > 10  # Simple heuristic
149
150    def visualize(self) -> str:
151        """Generate text visualization of plan."""
152        if not self.plan:
153            return "No plan created"
154
155        lines = [f"Goal: {self.plan.goal}", "=" * 50]
156
157        def print_task(task: HierarchicalTask, indent: int) -> None:
158            status_icon = {
159                "pending": "○",
160                "in_progress": "◐",
161                "completed": "●",
162                "failed": "✗"
163            }.get(task.status, "?")
164
165            level_color = {
166                AbstractionLevel.STRATEGIC: "🔵",
167                AbstractionLevel.TACTICAL: "🟢",
168                AbstractionLevel.OPERATIONAL: "🟡",
169                AbstractionLevel.ATOMIC: "⚪",
170            }.get(task.level, "")
171
172            lines.append(
173                "  " * indent +
174                f"{status_icon} {level_color} {task.description}"
175            )
176
177            for child_id in task.children:
178                if child_id in self.plan.tasks:
179                    print_task(self.plan.tasks[child_id], indent + 1)
180
181        # Print from strategic level
182        for task in self.plan.get_level(AbstractionLevel.STRATEGIC):
183            print_task(task, 0)
184
185        return "\n".join(lines)

Dynamic Replanning

Plans rarely survive contact with reality. Hierarchical planners must adapt when circumstances change:

When to Replan

  • Task failure: A required task cannot be completed
  • New information: Discovery of constraints or opportunities
  • Goal change: Priorities shift or scope changes
  • Resource change: Time, budget, or capabilities change
🐍python
1class AdaptivePlanner(HierarchicalPlanner):
2    """Planner with dynamic replanning capabilities."""
3
4    async def handle_failure(
5        self,
6        failed_task: HierarchicalTask,
7        error: str
8    ) -> bool:
9        """Handle task failure with replanning."""
10
11        # Try local repair first
12        repaired = await self._local_repair(failed_task, error)
13        if repaired:
14            return True
15
16        # Escalate to parent level
17        if failed_task.parent_id:
18            parent = self.plan.tasks.get(failed_task.parent_id)
19            if parent:
20                # Replan the parent's subtasks
21                await self._replan_subtasks(parent, error)
22                return True
23
24        # Full replan if local repair fails
25        await self._full_replan(error)
26        return True
27
28    async def _local_repair(
29        self,
30        task: HierarchicalTask,
31        error: str
32    ) -> bool:
33        """Attempt to fix the task locally."""
34
35        prompt = f"""A task failed. Can it be fixed with a simple modification?
36
37Failed Task: {task.description}
38Error: {error}
39
40If fixable, return JSON:
41{{"fixable": true, "modified_task": "new description", "reason": "why"}}
42
43If not fixable locally, return:
44{{"fixable": false, "reason": "why not"}}"""
45
46        response = self.client.messages.create(
47            model=self.model,
48            max_tokens=1024,
49            messages=[{"role": "user", "content": prompt}]
50        )
51
52        result = json.loads(response.content[0].text)
53
54        if result["fixable"]:
55            task.description = result["modified_task"]
56            task.status = "pending"
57            return True
58
59        return False
60
61    async def _replan_subtasks(
62        self,
63        parent: HierarchicalTask,
64        failure_context: str
65    ) -> None:
66        """Replan subtasks of a parent task."""
67
68        # Get completed and pending subtasks
69        completed = []
70        pending = []
71        for child_id in parent.children:
72            child = self.plan.tasks.get(child_id)
73            if child:
74                if child.status == "completed":
75                    completed.append(child.description)
76                else:
77                    pending.append(child.description)
78
79        prompt = f"""Replan subtasks after a failure.
80
81Parent Task: {parent.description}
82Completed: {completed}
83Failed/Pending: {pending}
84Failure Context: {failure_context}
85
86Create new subtasks that:
871. Build on what's already completed
882. Avoid the failure that occurred
893. Still achieve the parent task goal
90
91Return JSON array of new subtasks."""
92
93        response = self.client.messages.create(
94            model=self.model,
95            max_tokens=2048,
96            messages=[{"role": "user", "content": prompt}]
97        )
98
99        new_subtasks_data = json.loads(response.content[0].text)
100
101        # Remove old pending subtasks
102        for child_id in parent.children:
103            child = self.plan.tasks.get(child_id)
104            if child and child.status != "completed":
105                del self.plan.tasks[child_id]
106
107        parent.children = [
108            cid for cid in parent.children
109            if cid in self.plan.tasks
110        ]
111
112        # Add new subtasks
113        for i, data in enumerate(new_subtasks_data):
114            new_task = HierarchicalTask(
115                id=f"{parent.id}_replanned_{i}",
116                description=data["description"],
117                level=AbstractionLevel(parent.level.value),  # Same level as siblings
118                parent_id=parent.id
119            )
120            self.plan.add_task(new_task)
121
122    async def _full_replan(self, context: str) -> None:
123        """Complete replanning from scratch."""
124
125        # Preserve completed work
126        completed = [
127            t for t in self.plan.tasks.values()
128            if t.status == "completed"
129        ]
130
131        prompt = f"""Create a new plan, preserving completed work.
132
133Original Goal: {self.plan.goal}
134Completed Tasks: {[t.description for t in completed]}
135Reason for Replan: {context}
136
137Create a new plan that achieves the goal while:
1381. Not repeating completed work
1392. Addressing the issues that caused replanning
1403. Finding alternative paths if needed"""
141
142        # Create new plan
143        new_plan = await self.create_plan(
144            self.plan.goal,
145            {"completed": [t.description for t in completed]}
146        )
147
148        self.plan = new_plan
Replanning is computationally expensive. Use local repairs when possible and only escalate to full replanning when necessary.

Summary

Hierarchical planning enables agents to tackle complex, long-horizon tasks by organizing work at multiple abstraction levels. We covered:

  • Abstraction levels: Strategic, tactical, operational, and atomic layers for different planning horizons
  • Hierarchical Task Networks: Classical AI approach using methods to decompose compound tasks
  • Goal-subgoal decomposition: Means-ends analysis and goal stacks for flexible planning
  • LLM-powered planners: Combining language model reasoning with structured hierarchies
  • Dynamic replanning: Local repair and full replanning strategies for adaptation

In the next section, we'll explore self-reflection and correction—how agents can evaluate their own plans and outputs to improve quality.