Introduction
Task decomposition gives us subtasks, but how do we organize them into coherent plans? Hierarchical planning structures tasks at multiple levels of abstraction—from high-level goals to concrete actions. This mirrors how humans naturally think: "I want to go on vacation" decomposes into "book travel" which further breaks into "search flights" and "compare prices."
In this section, we'll explore how to build agents that plan at multiple abstraction levels, enabling them to handle complex, long-horizon tasks while maintaining flexibility to adapt when things change.
Core Insight: Hierarchical planning separates "what to achieve" from "how to achieve it," allowing agents to reason at the right level of detail for each decision.
Levels of Abstraction
Effective hierarchical plans operate at distinct levels, each serving a different purpose:
| Level | Focus | Example | Timeframe |
|---|---|---|---|
| Strategic | Overall goals and approach | Build a SaaS product | Months |
| Tactical | Major milestones and phases | Launch MVP to beta users | Weeks |
| Operational | Specific tasks and workflows | Implement user authentication | Days |
| Atomic | Individual executable actions | Write login endpoint | Hours |
Why Multiple Levels Matter
- Cognitive load management: Each level deals with appropriate complexity
- Flexible adaptation: High-level plans remain stable while details change
- Progress visibility: Stakeholders track at their relevant level
- Error recovery: Failures at low levels don't invalidate the overall strategy
1from dataclasses import dataclass, field
2from enum import Enum
3from typing import Optional
4
5class AbstractionLevel(Enum):
6 STRATEGIC = "strategic" # Why are we doing this?
7 TACTICAL = "tactical" # What milestones do we need?
8 OPERATIONAL = "operational" # What tasks must we complete?
9 ATOMIC = "atomic" # What single actions do we take?
10
11@dataclass
12class HierarchicalTask:
13 """A task at any abstraction level."""
14 id: str
15 description: str
16 level: AbstractionLevel
17 parent_id: Optional[str] = None
18 children: list[str] = field(default_factory=list)
19 status: str = "pending"
20
21 def is_leaf(self) -> bool:
22 """Leaf tasks are directly executable."""
23 return len(self.children) == 0 or self.level == AbstractionLevel.ATOMIC
24
25@dataclass
26class HierarchicalPlan:
27 """Multi-level plan structure."""
28 goal: str
29 tasks: dict[str, HierarchicalTask] = field(default_factory=dict)
30
31 def add_task(self, task: HierarchicalTask) -> None:
32 """Add task and update parent's children."""
33 self.tasks[task.id] = task
34 if task.parent_id and task.parent_id in self.tasks:
35 parent = self.tasks[task.parent_id]
36 if task.id not in parent.children:
37 parent.children.append(task.id)
38
39 def get_level(self, level: AbstractionLevel) -> list[HierarchicalTask]:
40 """Get all tasks at a specific abstraction level."""
41 return [t for t in self.tasks.values() if t.level == level]
42
43 def get_executable_tasks(self) -> list[HierarchicalTask]:
44 """Get leaf tasks ready for execution."""
45 ready = []
46 for task in self.tasks.values():
47 if not task.is_leaf():
48 continue
49 if task.status != "pending":
50 continue
51
52 # Check if parent allows execution
53 if task.parent_id:
54 parent = self.tasks.get(task.parent_id)
55 if parent and parent.status not in ["pending", "in_progress"]:
56 continue
57
58 ready.append(task)
59 return readyHierarchical Task Networks
Hierarchical Task Networks (HTN) are a classical AI planning formalism. They decompose compound tasks into subtasks using predefined methods until reaching primitive actions.
HTN Components
- Tasks: Can be primitive (directly executable) or compound (needs decomposition)
- Methods: Rules for decomposing compound tasks into subtask sequences
- Preconditions: State conditions required for method application
- Effects: State changes after task completion
1from dataclasses import dataclass
2from typing import Callable, Any
3
4@dataclass
5class Method:
6 """A method for decomposing a compound task."""
7 name: str
8 task_type: str # What task this decomposes
9 preconditions: Callable[[dict], bool] # State -> applicable?
10 subtasks: list[str] # Ordered subtask types
11 ordering: str = "sequential" # or "parallel"
12
13@dataclass
14class PrimitiveTask:
15 """Directly executable action."""
16 name: str
17 preconditions: Callable[[dict], bool]
18 effects: Callable[[dict], dict] # State transformation
19
20class HTNPlanner:
21 """Hierarchical Task Network planner."""
22
23 def __init__(self):
24 self.methods: dict[str, list[Method]] = {}
25 self.primitives: dict[str, PrimitiveTask] = {}
26
27 def register_method(self, method: Method) -> None:
28 """Register a decomposition method."""
29 if method.task_type not in self.methods:
30 self.methods[method.task_type] = []
31 self.methods[method.task_type].append(method)
32
33 def register_primitive(self, primitive: PrimitiveTask) -> None:
34 """Register a primitive action."""
35 self.primitives[primitive.name] = primitive
36
37 def decompose(
38 self,
39 task_type: str,
40 state: dict[str, Any]
41 ) -> list[str]:
42 """Decompose a task given current state."""
43
44 # Check if primitive
45 if task_type in self.primitives:
46 primitive = self.primitives[task_type]
47 if primitive.preconditions(state):
48 return [task_type]
49 else:
50 return [] # Not applicable
51
52 # Find applicable method
53 methods = self.methods.get(task_type, [])
54 for method in methods:
55 if method.preconditions(state):
56 # Recursively decompose subtasks
57 plan = []
58 current_state = state.copy()
59
60 for subtask in method.subtasks:
61 subplan = self.decompose(subtask, current_state)
62 if not subplan:
63 break # Method failed
64
65 plan.extend(subplan)
66
67 # Update state for sequential subtasks
68 if method.ordering == "sequential":
69 for action in subplan:
70 if action in self.primitives:
71 current_state = self.primitives[action].effects(
72 current_state
73 )
74 else:
75 return plan # Success
76
77 return [] # No applicable method foundHTN Example: Travel Planning
1# Define primitive actions
2primitives = [
3 PrimitiveTask(
4 name="search_flights",
5 preconditions=lambda s: "destination" in s,
6 effects=lambda s: {**s, "flights_found": True}
7 ),
8 PrimitiveTask(
9 name="book_flight",
10 preconditions=lambda s: s.get("flights_found"),
11 effects=lambda s: {**s, "flight_booked": True}
12 ),
13 PrimitiveTask(
14 name="search_hotels",
15 preconditions=lambda s: "destination" in s,
16 effects=lambda s: {**s, "hotels_found": True}
17 ),
18 PrimitiveTask(
19 name="book_hotel",
20 preconditions=lambda s: s.get("hotels_found"),
21 effects=lambda s: {**s, "hotel_booked": True}
22 ),
23 PrimitiveTask(
24 name="create_itinerary",
25 preconditions=lambda s: s.get("flight_booked") and s.get("hotel_booked"),
26 effects=lambda s: {**s, "itinerary_ready": True}
27 ),
28]
29
30# Define decomposition methods
31methods = [
32 Method(
33 name="plan_trip_standard",
34 task_type="plan_trip",
35 preconditions=lambda s: "destination" in s and "budget" in s,
36 subtasks=["arrange_transport", "arrange_accommodation", "create_itinerary"]
37 ),
38 Method(
39 name="arrange_transport_by_air",
40 task_type="arrange_transport",
41 preconditions=lambda s: s.get("budget", 0) > 500,
42 subtasks=["search_flights", "book_flight"]
43 ),
44 Method(
45 name="arrange_accommodation_hotel",
46 task_type="arrange_accommodation",
47 preconditions=lambda s: True,
48 subtasks=["search_hotels", "book_hotel"]
49 ),
50]
51
52# Create and run planner
53planner = HTNPlanner()
54for p in primitives:
55 planner.register_primitive(p)
56for m in methods:
57 planner.register_method(m)
58
59initial_state = {
60 "destination": "Tokyo",
61 "budget": 2000
62}
63
64plan = planner.decompose("plan_trip", initial_state)
65print("Plan:", plan)
66# Output: ['search_flights', 'book_flight', 'search_hotels', 'book_hotel', 'create_itinerary']Goal-Subgoal Decomposition
While HTN uses predefined methods, LLM-based agents can dynamically generate goal-subgoal hierarchies. This is more flexible but requires careful prompting.
Means-Ends Analysis
Work backward from the goal, identifying obstacles and creating subgoals to address them:
1from anthropic import Anthropic
2import json
3
4class MeansEndsAnalyzer:
5 """
6 Performs means-ends analysis to decompose goals.
7
8 Works backward from goal, identifying gaps between
9 current state and goal state, then creating subgoals
10 to close those gaps.
11 """
12
13 def __init__(self):
14 self.client = Anthropic()
15
16 async def analyze(
17 self,
18 goal: str,
19 current_state: dict,
20 constraints: list[str] = None
21 ) -> dict:
22 """
23 Analyze goal and generate subgoal hierarchy.
24 """
25
26 prompt = f"""Perform means-ends analysis on this goal.
27
28Goal: {goal}
29
30Current State:
31{json.dumps(current_state, indent=2)}
32
33Constraints: {constraints or 'None specified'}
34
35Instructions:
361. Describe the goal state (what must be true when goal is achieved)
372. Identify differences between current and goal state
383. For each difference, create a subgoal to eliminate it
394. Order subgoals by dependencies
405. For complex subgoals, apply recursive decomposition
41
42Return JSON:
43{{
44 "goal_state": {{"key": "value", ...}},
45 "differences": [
46 {{"gap": "description", "subgoal": "what to achieve", "priority": 1-5}}
47 ],
48 "subgoal_hierarchy": {{
49 "main_goal": "...",
50 "subgoals": [
51 {{
52 "id": "sg1",
53 "description": "...",
54 "parent": null,
55 "dependencies": [],
56 "subgoals": [...] // Nested subgoals
57 }}
58 ]
59 }}
60}}"""
61
62 response = self.client.messages.create(
63 model="claude-sonnet-4-20250514",
64 max_tokens=4096,
65 messages=[{"role": "user", "content": prompt}]
66 )
67
68 return json.loads(response.content[0].text)Layered Goal Structure
1@dataclass
2class Goal:
3 """A goal in the hierarchy."""
4 id: str
5 description: str
6 parent_id: Optional[str] = None
7 subgoals: list['Goal'] = field(default_factory=list)
8 achieved: bool = False
9 priority: int = 1
10
11 def add_subgoal(self, subgoal: 'Goal') -> None:
12 """Add a subgoal."""
13 subgoal.parent_id = self.id
14 self.subgoals.append(subgoal)
15
16 def is_achievable(self) -> bool:
17 """Check if all subgoals are achieved."""
18 if not self.subgoals:
19 return True # Leaf goal
20 return all(sg.achieved for sg in self.subgoals)
21
22 def get_next_focus(self) -> Optional['Goal']:
23 """Get next subgoal to work on (DFS)."""
24 if self.achieved:
25 return None
26
27 if not self.subgoals:
28 return self # This is a leaf goal
29
30 for subgoal in sorted(self.subgoals, key=lambda g: -g.priority):
31 if not subgoal.achieved:
32 focus = subgoal.get_next_focus()
33 if focus:
34 return focus
35
36 return None # All subgoals achieved
37
38 def to_tree_string(self, indent: int = 0) -> str:
39 """Visualize goal hierarchy."""
40 icon = "✅" if self.achieved else "○"
41 result = " " * indent + f"{icon} {self.description}\n"
42 for subgoal in self.subgoals:
43 result += subgoal.to_tree_string(indent + 1)
44 return result
45
46
47class GoalStack:
48 """
49 Manages a stack of active goals.
50
51 Goals are pushed when work begins and popped
52 when achieved or abandoned.
53 """
54
55 def __init__(self):
56 self.stack: list[Goal] = []
57
58 def push(self, goal: Goal) -> None:
59 """Push a new goal onto the stack."""
60 self.stack.append(goal)
61
62 def pop(self) -> Optional[Goal]:
63 """Pop the top goal."""
64 return self.stack.pop() if self.stack else None
65
66 def current(self) -> Optional[Goal]:
67 """Get current focus goal."""
68 return self.stack[-1] if self.stack else None
69
70 def achieve_current(self) -> None:
71 """Mark current goal as achieved and pop."""
72 if self.stack:
73 self.stack[-1].achieved = True
74 self.pop()
75
76 # Check if parent goal is now achievable
77 while self.stack and self.stack[-1].is_achievable():
78 self.stack[-1].achieved = True
79 self.pop()Building Hierarchical Planners
Let's build a complete hierarchical planner that combines LLM intelligence with structured execution:
1from anthropic import Anthropic
2from dataclasses import dataclass, field
3from typing import Any, Optional, Callable
4from enum import Enum
5import json
6import asyncio
7
8class PlannerConfig:
9 """Configuration for hierarchical planner."""
10 max_depth: int = 4
11 min_task_complexity: str = "simple"
12 allow_parallel: bool = True
13 replan_on_failure: bool = True
14
15class HierarchicalPlanner:
16 """
17 LLM-powered hierarchical planner.
18
19 Decomposes goals into multi-level task hierarchies
20 and manages execution with dynamic replanning.
21 """
22
23 def __init__(
24 self,
25 config: PlannerConfig = None,
26 model: str = "claude-sonnet-4-20250514"
27 ):
28 self.config = config or PlannerConfig()
29 self.client = Anthropic()
30 self.model = model
31 self.plan: Optional[HierarchicalPlan] = None
32
33 async def create_plan(
34 self,
35 goal: str,
36 context: dict[str, Any] = None
37 ) -> HierarchicalPlan:
38 """Create a hierarchical plan for the goal."""
39
40 self.plan = HierarchicalPlan(goal=goal)
41
42 # Create strategic level
43 strategic = await self._decompose_level(
44 goal,
45 AbstractionLevel.STRATEGIC,
46 context
47 )
48
49 for task in strategic:
50 self.plan.add_task(task)
51
52 # Decompose to tactical
53 tactical = await self._decompose_level(
54 task.description,
55 AbstractionLevel.TACTICAL,
56 context,
57 parent_id=task.id
58 )
59
60 for tac_task in tactical:
61 self.plan.add_task(tac_task)
62
63 # Decompose to operational
64 operational = await self._decompose_level(
65 tac_task.description,
66 AbstractionLevel.OPERATIONAL,
67 context,
68 parent_id=tac_task.id
69 )
70
71 for op_task in operational:
72 self.plan.add_task(op_task)
73
74 # Decompose to atomic if needed
75 if self._needs_decomposition(op_task):
76 atomic = await self._decompose_level(
77 op_task.description,
78 AbstractionLevel.ATOMIC,
79 context,
80 parent_id=op_task.id
81 )
82 for atom in atomic:
83 self.plan.add_task(atom)
84
85 return self.plan
86
87 async def _decompose_level(
88 self,
89 task_description: str,
90 target_level: AbstractionLevel,
91 context: dict = None,
92 parent_id: str = None
93 ) -> list[HierarchicalTask]:
94 """Decompose a task to a specific abstraction level."""
95
96 level_guidance = {
97 AbstractionLevel.STRATEGIC: "high-level phases or major workstreams",
98 AbstractionLevel.TACTICAL: "key milestones or deliverables",
99 AbstractionLevel.OPERATIONAL: "specific tasks that can be done in a day",
100 AbstractionLevel.ATOMIC: "single actions taking minutes to hours",
101 }
102
103 prompt = f"""Decompose this task into {level_guidance[target_level]}.
104
105Task: {task_description}
106Target Level: {target_level.value}
107Context: {json.dumps(context) if context else 'None'}
108
109Return JSON array of subtasks:
110[
111 {{
112 "id": "unique_id",
113 "description": "clear task description",
114 "estimated_complexity": "simple|moderate|complex"
115 }}
116]
117
118Guidelines:
119- Create 2-5 subtasks at this level
120- Each subtask should be clearly scoped
121- Subtasks should be mutually exclusive and collectively exhaustive
122- Use action verbs (implement, design, test, deploy)"""
123
124 response = self.client.messages.create(
125 model=self.model,
126 max_tokens=2048,
127 messages=[{"role": "user", "content": prompt}]
128 )
129
130 subtasks_data = json.loads(response.content[0].text)
131
132 tasks = []
133 for i, data in enumerate(subtasks_data):
134 task = HierarchicalTask(
135 id=f"{parent_id}_{i}" if parent_id else f"t_{i}",
136 description=data["description"],
137 level=target_level,
138 parent_id=parent_id
139 )
140 tasks.append(task)
141
142 return tasks
143
144 def _needs_decomposition(self, task: HierarchicalTask) -> bool:
145 """Check if task needs further decomposition."""
146 # Could use LLM to assess, or use simple heuristics
147 word_count = len(task.description.split())
148 return word_count > 10 # Simple heuristic
149
150 def visualize(self) -> str:
151 """Generate text visualization of plan."""
152 if not self.plan:
153 return "No plan created"
154
155 lines = [f"Goal: {self.plan.goal}", "=" * 50]
156
157 def print_task(task: HierarchicalTask, indent: int) -> None:
158 status_icon = {
159 "pending": "○",
160 "in_progress": "◐",
161 "completed": "●",
162 "failed": "✗"
163 }.get(task.status, "?")
164
165 level_color = {
166 AbstractionLevel.STRATEGIC: "🔵",
167 AbstractionLevel.TACTICAL: "🟢",
168 AbstractionLevel.OPERATIONAL: "🟡",
169 AbstractionLevel.ATOMIC: "⚪",
170 }.get(task.level, "")
171
172 lines.append(
173 " " * indent +
174 f"{status_icon} {level_color} {task.description}"
175 )
176
177 for child_id in task.children:
178 if child_id in self.plan.tasks:
179 print_task(self.plan.tasks[child_id], indent + 1)
180
181 # Print from strategic level
182 for task in self.plan.get_level(AbstractionLevel.STRATEGIC):
183 print_task(task, 0)
184
185 return "\n".join(lines)Dynamic Replanning
Plans rarely survive contact with reality. Hierarchical planners must adapt when circumstances change:
When to Replan
- Task failure: A required task cannot be completed
- New information: Discovery of constraints or opportunities
- Goal change: Priorities shift or scope changes
- Resource change: Time, budget, or capabilities change
1class AdaptivePlanner(HierarchicalPlanner):
2 """Planner with dynamic replanning capabilities."""
3
4 async def handle_failure(
5 self,
6 failed_task: HierarchicalTask,
7 error: str
8 ) -> bool:
9 """Handle task failure with replanning."""
10
11 # Try local repair first
12 repaired = await self._local_repair(failed_task, error)
13 if repaired:
14 return True
15
16 # Escalate to parent level
17 if failed_task.parent_id:
18 parent = self.plan.tasks.get(failed_task.parent_id)
19 if parent:
20 # Replan the parent's subtasks
21 await self._replan_subtasks(parent, error)
22 return True
23
24 # Full replan if local repair fails
25 await self._full_replan(error)
26 return True
27
28 async def _local_repair(
29 self,
30 task: HierarchicalTask,
31 error: str
32 ) -> bool:
33 """Attempt to fix the task locally."""
34
35 prompt = f"""A task failed. Can it be fixed with a simple modification?
36
37Failed Task: {task.description}
38Error: {error}
39
40If fixable, return JSON:
41{{"fixable": true, "modified_task": "new description", "reason": "why"}}
42
43If not fixable locally, return:
44{{"fixable": false, "reason": "why not"}}"""
45
46 response = self.client.messages.create(
47 model=self.model,
48 max_tokens=1024,
49 messages=[{"role": "user", "content": prompt}]
50 )
51
52 result = json.loads(response.content[0].text)
53
54 if result["fixable"]:
55 task.description = result["modified_task"]
56 task.status = "pending"
57 return True
58
59 return False
60
61 async def _replan_subtasks(
62 self,
63 parent: HierarchicalTask,
64 failure_context: str
65 ) -> None:
66 """Replan subtasks of a parent task."""
67
68 # Get completed and pending subtasks
69 completed = []
70 pending = []
71 for child_id in parent.children:
72 child = self.plan.tasks.get(child_id)
73 if child:
74 if child.status == "completed":
75 completed.append(child.description)
76 else:
77 pending.append(child.description)
78
79 prompt = f"""Replan subtasks after a failure.
80
81Parent Task: {parent.description}
82Completed: {completed}
83Failed/Pending: {pending}
84Failure Context: {failure_context}
85
86Create new subtasks that:
871. Build on what's already completed
882. Avoid the failure that occurred
893. Still achieve the parent task goal
90
91Return JSON array of new subtasks."""
92
93 response = self.client.messages.create(
94 model=self.model,
95 max_tokens=2048,
96 messages=[{"role": "user", "content": prompt}]
97 )
98
99 new_subtasks_data = json.loads(response.content[0].text)
100
101 # Remove old pending subtasks
102 for child_id in parent.children:
103 child = self.plan.tasks.get(child_id)
104 if child and child.status != "completed":
105 del self.plan.tasks[child_id]
106
107 parent.children = [
108 cid for cid in parent.children
109 if cid in self.plan.tasks
110 ]
111
112 # Add new subtasks
113 for i, data in enumerate(new_subtasks_data):
114 new_task = HierarchicalTask(
115 id=f"{parent.id}_replanned_{i}",
116 description=data["description"],
117 level=AbstractionLevel(parent.level.value), # Same level as siblings
118 parent_id=parent.id
119 )
120 self.plan.add_task(new_task)
121
122 async def _full_replan(self, context: str) -> None:
123 """Complete replanning from scratch."""
124
125 # Preserve completed work
126 completed = [
127 t for t in self.plan.tasks.values()
128 if t.status == "completed"
129 ]
130
131 prompt = f"""Create a new plan, preserving completed work.
132
133Original Goal: {self.plan.goal}
134Completed Tasks: {[t.description for t in completed]}
135Reason for Replan: {context}
136
137Create a new plan that achieves the goal while:
1381. Not repeating completed work
1392. Addressing the issues that caused replanning
1403. Finding alternative paths if needed"""
141
142 # Create new plan
143 new_plan = await self.create_plan(
144 self.plan.goal,
145 {"completed": [t.description for t in completed]}
146 )
147
148 self.plan = new_planSummary
Hierarchical planning enables agents to tackle complex, long-horizon tasks by organizing work at multiple abstraction levels. We covered:
- Abstraction levels: Strategic, tactical, operational, and atomic layers for different planning horizons
- Hierarchical Task Networks: Classical AI approach using methods to decompose compound tasks
- Goal-subgoal decomposition: Means-ends analysis and goal stacks for flexible planning
- LLM-powered planners: Combining language model reasoning with structured hierarchies
- Dynamic replanning: Local repair and full replanning strategies for adaptation
In the next section, we'll explore self-reflection and correction—how agents can evaluate their own plans and outputs to improve quality.