Introduction
When faced with complex requests like "Plan a week-long trip to Japan including flights, hotels, and activities," humans naturally break this into smaller tasks: research destinations, find flights, book hotels, and plan daily activities. This cognitive process—task decomposition—is fundamental to how we solve complex problems.
For AI agents, task decomposition is equally essential. Without it, agents struggle with ambiguity, lose track of progress, and produce incomplete solutions. In this chapter, we'll explore how agents can break down complex goals into manageable, executable subtasks.
Core Insight: Task decomposition transforms overwhelming problems into structured action plans. It's not just organization—it's the foundation of intelligent planning.
Why Task Decomposition Matters
Consider an agent asked to "build a web application." Without decomposition, it might try to generate an entire application in one step—almost certainly failing. With proper decomposition, it breaks this into: define requirements, set up project structure, create database schema, build backend APIs, implement frontend, write tests, and deploy.
Benefits of Decomposition
| Benefit | Description | Impact |
|---|---|---|
| Reduced Complexity | Each subtask is simpler than the whole | Higher success rate per step |
| Progress Tracking | Clear milestones for monitoring | Better user experience |
| Error Isolation | Failures contained to subtasks | Easier debugging and recovery |
| Parallelization | Independent tasks can run concurrently | Faster overall execution |
| Resource Estimation | Predict time and cost per subtask | Accurate project planning |
The Decomposition Paradox
There's a tension in task decomposition: decompose too little and subtasks remain complex; decompose too much and you drown in overhead. Finding the right granularity is key.
```python
# Too coarse - still complex
tasks = ["Build the entire application"]

# Too fine - excessive overhead
tasks = [
    "Open IDE",
    "Create folder",
    "Create file",
    "Type 'import'",
    "Type ' '",
    "Type 'flask'",
    # ... hundreds more
]

# Just right - actionable units
tasks = [
    "Set up Flask project structure",
    "Create database models",
    "Implement authentication API",
    "Build user management endpoints",
    "Create frontend components",
    "Write integration tests"
]
```

Decomposition Strategies
Different problems call for different decomposition approaches. Here are the primary strategies agents can employ:
1. Functional Decomposition
Break tasks by function or capability required. Each subtask uses a distinct skill or tool.
```python
def functional_decomposition(goal: str) -> list[dict]:
    """Decompose by required capabilities."""
    return [
        {
            "task": "Research destination options",
            "capability": "web_search",
            "tools": ["search_api", "summarizer"]
        },
        {
            "task": "Find flight options",
            "capability": "flight_booking",
            "tools": ["flight_api", "price_comparison"]
        },
        {
            "task": "Book accommodation",
            "capability": "hotel_booking",
            "tools": ["hotel_api", "review_analyzer"]
        },
        {
            "task": "Create daily itinerary",
            "capability": "planning",
            "tools": ["calendar", "map_api", "activity_search"]
        }
    ]
```

2. Sequential Decomposition
Break tasks into ordered steps where each step depends on the previous. This is natural for workflows with clear prerequisites.
```python
def sequential_decomposition(goal: str) -> list[dict]:
    """Decompose into ordered steps."""
    return [
        {"step": 1, "task": "Gather requirements", "prerequisite": None},
        {"step": 2, "task": "Design system architecture", "prerequisite": 1},
        {"step": 3, "task": "Implement core modules", "prerequisite": 2},
        {"step": 4, "task": "Write tests", "prerequisite": 3},
        {"step": 5, "task": "Deploy and monitor", "prerequisite": 4}
    ]
```

3. Recursive Decomposition
Apply decomposition recursively until subtasks reach an executable granularity. This is powerful for deeply complex problems.
```python
from dataclasses import dataclass, field
from typing import Optional

@dataclass
class Task:
    """Recursively decomposable task."""
    description: str
    subtasks: list['Task'] = field(default_factory=list)
    is_atomic: bool = False
    completed: bool = False

    def decompose(self, decomposer) -> None:
        """Recursively decompose until atomic."""
        if self.is_atomic:
            return

        # Get subtasks from decomposer
        self.subtasks = decomposer.decompose(self.description)

        # Recursively decompose each subtask
        for subtask in self.subtasks:
            subtask.decompose(decomposer)

    def get_next_task(self) -> Optional['Task']:
        """Get next atomic task to execute."""
        if self.is_atomic and not self.completed:
            return self

        for subtask in self.subtasks:
            next_task = subtask.get_next_task()
            if next_task:
                return next_task

        return None
```

4. Goal-Oriented Decomposition
Work backward from the desired outcome. Each subtask addresses a subgoal necessary for the final goal.
```python
def flatten_goal_tree(node: dict) -> list[dict]:
    """Depth-first flatten: prerequisites before the goals that need them.

    (One possible implementation of the helper assumed below.)
    """
    tasks = []
    for child in node.get("requires", []):
        tasks.extend(flatten_goal_tree(child))
    tasks.append({"task": node.get("goal") or node.get("subgoal")})
    return tasks

def goal_oriented_decomposition(final_goal: str) -> list[dict]:
    """Decompose by working backward from goal."""

    # Start from final state (hardcoded here for illustration)
    goal_tree = {
        "goal": "Have a successful product launch",
        "requires": [
            {
                "subgoal": "Product is fully tested",
                "requires": [
                    {"subgoal": "Unit tests passing"},
                    {"subgoal": "Integration tests passing"},
                    {"subgoal": "User acceptance testing complete"}
                ]
            },
            {
                "subgoal": "Marketing materials ready",
                "requires": [
                    {"subgoal": "Landing page live"},
                    {"subgoal": "Press release written"},
                    {"subgoal": "Social media scheduled"}
                ]
            },
            {
                "subgoal": "Infrastructure prepared",
                "requires": [
                    {"subgoal": "Production servers scaled"},
                    {"subgoal": "Monitoring in place"},
                    {"subgoal": "Rollback plan documented"}
                ]
            }
        ]
    }

    return flatten_goal_tree(goal_tree)
```

LLM-Based Decomposition
Large language models excel at task decomposition. They understand context, can reason about dependencies, and generate human-readable plans. Here's how to leverage LLMs effectively:
Structured Decomposition Prompting
```python
from anthropic import Anthropic
from pydantic import BaseModel
import json

class SubTask(BaseModel):
    """A single decomposed subtask."""
    id: str
    description: str
    estimated_complexity: str  # "low", "medium", "high"
    dependencies: list[str]  # IDs of prerequisite tasks
    tools_needed: list[str]
    success_criteria: str

class DecompositionResult(BaseModel):
    """Complete task decomposition."""
    original_goal: str
    subtasks: list[SubTask]
    execution_order: list[str]  # Topologically sorted task IDs

def decompose_with_llm(goal: str, context: str = "") -> DecompositionResult:
    """Use LLM to decompose a complex goal."""

    client = Anthropic()

    prompt = f"""Decompose the following goal into executable subtasks.

Goal: {goal}

Context: {context}

For each subtask, provide:
1. A unique ID (e.g., "task_1", "task_2")
2. Clear description of what to do
3. Estimated complexity (low/medium/high)
4. Dependencies (IDs of tasks that must complete first)
5. Tools or capabilities needed
6. Clear success criteria

Return a JSON object with:
- original_goal: the input goal
- subtasks: array of subtask objects
- execution_order: array of task IDs in dependency order

Ensure subtasks are:
- Atomic enough to be executed in a single action
- Not so granular as to be trivial
- Complete (covering all aspects of the goal)
- Well-ordered (dependencies respected)"""

    response = client.messages.create(
        model="claude-sonnet-4-20250514",
        max_tokens=4096,
        messages=[{"role": "user", "content": prompt}]
    )

    # Parse JSON from response (in production, guard against the model
    # wrapping the JSON in markdown fences or adding commentary)
    result_json = json.loads(response.content[0].text)
    return DecompositionResult(**result_json)
```

Few-Shot Decomposition
Providing examples improves decomposition quality significantly:
```python
DECOMPOSITION_EXAMPLES = """
Example 1:
Goal: "Write a blog post about machine learning"
Subtasks:
1. Research current ML trends and topics
2. Create outline with main points
3. Write introduction hook
4. Write body sections with examples
5. Write conclusion with call-to-action
6. Add relevant images and diagrams
7. Proofread and edit for clarity

Example 2:
Goal: "Set up a monitoring system"
Subtasks:
1. Define key metrics to track
2. Choose monitoring tool (Prometheus, Datadog, etc.)
3. Install and configure monitoring agent
4. Set up dashboards for visualization
5. Configure alerting rules
6. Test alerts with synthetic issues
7. Document runbook for each alert
"""

def decompose_with_examples(goal: str) -> DecompositionResult:
    """Decompose using few-shot learning."""

    prompt = f"""{DECOMPOSITION_EXAMPLES}

Now decompose this goal:
Goal: "{goal}"

Return subtasks in the same structured format."""

    # Call LLM with examples...
```

Iterative Refinement
Complex tasks may need multiple decomposition passes:
```python
async def iterative_decomposition(
    goal: str,
    max_iterations: int = 3,
    complexity_threshold: str = "medium"
) -> DecompositionResult:
    """Iteratively refine decomposition until all tasks are simple enough."""

    levels = ["low", "medium", "high"]
    result = decompose_with_llm(goal)

    for iteration in range(max_iterations):
        # Find tasks still above the complexity threshold
        complex_tasks = [
            task for task in result.subtasks
            if levels.index(task.estimated_complexity) > levels.index(complexity_threshold)
        ]

        if not complex_tasks:
            break

        # Further decompose complex tasks
        for task in complex_tasks:
            sub_result = decompose_with_llm(
                task.description,
                context=f"This is part of: {goal}"
            )

            # Replace complex task with its subtasks
            # (merge_decomposition, not shown, splices sub_result into result)
            result = merge_decomposition(result, task.id, sub_result)

    return result
```

Dependency Graphs
Real-world tasks have complex interdependencies. Representing these as directed graphs enables sophisticated scheduling and parallel execution.
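Before reaching for a full graph library, the core idea can be sketched with Python's built-in graphlib: represent dependencies as a mapping from each task to its prerequisites, and a topological sort yields a valid sequential order. (The task names below are illustrative, not from any particular decomposition.)

```python
from graphlib import TopologicalSorter

# Map each task to the set of tasks it depends on (illustrative names)
deps = {
    "design": {"requirements"},
    "backend": {"design"},
    "frontend": {"design"},
    "tests": {"backend", "frontend"},
}

# static_order() yields tasks so every prerequisite comes first
order = list(TopologicalSorter(deps).static_order())
print(order)  # "requirements" first, "tests" last
```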
Building Task Graphs
```python
import networkx as nx
from typing import Any

class TaskGraph:
    """Directed acyclic graph of tasks."""

    def __init__(self):
        self.graph = nx.DiGraph()

    def add_task(
        self,
        task_id: str,
        description: str,
        metadata: dict[str, Any] | None = None
    ) -> None:
        """Add a task node."""
        self.graph.add_node(
            task_id,
            description=description,
            status="pending",
            **(metadata or {})
        )

    def add_dependency(self, task_id: str, depends_on: str) -> None:
        """Add dependency edge: depends_on -> task_id."""
        if depends_on not in self.graph:
            raise ValueError(f"Task {depends_on} not found")
        if task_id not in self.graph:
            raise ValueError(f"Task {task_id} not found")

        self.graph.add_edge(depends_on, task_id)

        # Validate no cycles
        if not nx.is_directed_acyclic_graph(self.graph):
            self.graph.remove_edge(depends_on, task_id)
            raise ValueError(
                f"Adding dependency would create cycle: "
                f"{depends_on} -> {task_id}"
            )

    def get_ready_tasks(self) -> list[str]:
        """Get tasks whose dependencies are all completed."""
        ready = []
        for node in self.graph.nodes():
            if self.graph.nodes[node]["status"] != "pending":
                continue

            # Check all predecessors are completed
            predecessors = list(self.graph.predecessors(node))
            all_done = all(
                self.graph.nodes[p]["status"] == "completed"
                for p in predecessors
            )

            if all_done:
                ready.append(node)

        return ready

    def complete_task(self, task_id: str) -> None:
        """Mark task as completed."""
        self.graph.nodes[task_id]["status"] = "completed"

    def get_execution_order(self) -> list[str]:
        """Get topologically sorted execution order."""
        return list(nx.topological_sort(self.graph))

    def get_critical_path(self) -> list[str]:
        """Find the longest path (critical path for timing)."""
        return nx.dag_longest_path(self.graph)
```

Parallel Execution Planning
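The wave-grouping performed by the scheduler below can also be sketched with only the standard library's graphlib, whose prepare/get_ready/done protocol surfaces exactly the "all prerequisites completed" sets. A minimal sketch with illustrative task names:

```python
from graphlib import TopologicalSorter

# Map each task to the set of tasks it depends on (illustrative names)
deps = {
    "design": {"requirements"},
    "backend": {"design"},
    "frontend": {"design"},
    "tests": {"backend", "frontend"},
}

ts = TopologicalSorter(deps)
ts.prepare()
waves = []
while ts.is_active():
    ready = sorted(ts.get_ready())  # every task whose prerequisites are done
    waves.append(ready)
    ts.done(*ready)

print(waves)
# [['requirements'], ['design'], ['backend', 'frontend'], ['tests']]
```

Each inner list is a wave that can run concurrently; "backend" and "frontend" land in the same wave because both depend only on "design".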
```python
import asyncio
from dataclasses import dataclass
from typing import Any, Awaitable, Callable

@dataclass
class ExecutionPlan:
    """Plan for parallel task execution."""
    waves: list[list[str]]  # Each wave can run in parallel

class ParallelScheduler:
    """Schedule tasks for parallel execution."""

    def __init__(self, task_graph: TaskGraph):
        self.graph = task_graph

    def create_execution_waves(self) -> ExecutionPlan:
        """Group tasks into parallelizable waves."""
        waves = []
        completed = set()
        remaining = set(self.graph.graph.nodes())

        while remaining:
            # Find all tasks ready to execute
            wave = []
            for task_id in remaining:
                predecessors = set(self.graph.graph.predecessors(task_id))
                if predecessors.issubset(completed):
                    wave.append(task_id)

            if not wave:
                raise ValueError("Circular dependency detected")

            waves.append(wave)
            completed.update(wave)
            remaining -= set(wave)

        return ExecutionPlan(waves=waves)

    async def execute_plan(
        self,
        plan: ExecutionPlan,
        executor: Callable[[str, dict[str, Any]], Awaitable[Any]]
    ) -> dict[str, Any]:
        """Execute plan with parallel waves."""
        results = {}

        for wave_num, wave in enumerate(plan.waves):
            print(f"Executing wave {wave_num + 1}: {wave}")

            # Run all tasks in wave concurrently
            tasks = [
                executor(task_id, self.graph.graph.nodes[task_id])
                for task_id in wave
            ]

            wave_results = await asyncio.gather(*tasks)

            for task_id, result in zip(wave, wave_results):
                results[task_id] = result
                self.graph.complete_task(task_id)

        return results
```

Visualizing Task Dependencies
```python
def visualize_task_graph(graph: TaskGraph) -> str:
    """Generate Mermaid diagram for task graph."""
    lines = ["graph TD"]

    for node in graph.graph.nodes():
        data = graph.graph.nodes[node]
        status_icon = "✅" if data["status"] == "completed" else "⏳"
        lines.append(f'    {node}["{status_icon} {data["description"]}"]')

    for source, target in graph.graph.edges():
        lines.append(f"    {source} --> {target}")

    return "\n".join(lines)

# Example output:
# graph TD
#     task_1["⏳ Research requirements"]
#     task_2["⏳ Design architecture"]
#     task_3["⏳ Implement backend"]
#     task_4["⏳ Implement frontend"]
#     task_5["⏳ Integration testing"]
#     task_1 --> task_2
#     task_2 --> task_3
#     task_2 --> task_4
#     task_3 --> task_5
#     task_4 --> task_5
```

Building a Task Decomposer
Let's build a complete task decomposition system that combines LLM intelligence with structured execution:
```python
from anthropic import Anthropic
from dataclasses import dataclass, field
from typing import Any, Callable, Optional
from enum import Enum
import json
import asyncio

class TaskStatus(Enum):
    PENDING = "pending"
    IN_PROGRESS = "in_progress"
    COMPLETED = "completed"
    FAILED = "failed"
    BLOCKED = "blocked"

@dataclass
class Task:
    """A decomposed task unit."""
    id: str
    description: str
    dependencies: list[str] = field(default_factory=list)
    status: TaskStatus = TaskStatus.PENDING
    result: Any = None
    error: Optional[str] = None
    metadata: dict[str, Any] = field(default_factory=dict)

class TaskDecomposer:
    """
    Intelligent task decomposition system.

    Combines LLM-based decomposition with structured
    execution planning and dependency management.
    """

    def __init__(self, model: str = "claude-sonnet-4-20250514"):
        self.client = Anthropic()
        self.model = model
        self.tasks: dict[str, Task] = {}
        self.graph = TaskGraph()

    async def decompose(
        self,
        goal: str,
        context: str = "",
        max_depth: int = 2
    ) -> list[Task]:
        """Decompose a goal into executable tasks."""

        prompt = f"""Decompose this goal into concrete, executable subtasks.

Goal: {goal}
Context: {context}

Requirements:
1. Each subtask should be completable in a single focused action
2. Include all necessary steps (don't skip obvious ones)
3. Identify dependencies between tasks
4. Estimate complexity: simple, moderate, complex

Return JSON:
{{
  "tasks": [
    {{
      "id": "unique_id",
      "description": "what to do",
      "dependencies": ["id of prerequisite tasks"],
      "complexity": "simple|moderate|complex",
      "tools_hint": ["suggested tools or approaches"]
    }}
  ]
}}"""

        response = self.client.messages.create(
            model=self.model,
            max_tokens=4096,
            messages=[{"role": "user", "content": prompt}]
        )

        result = json.loads(response.content[0].text)

        tasks = []
        for task_data in result["tasks"]:
            task = Task(
                id=task_data["id"],
                description=task_data["description"],
                dependencies=task_data.get("dependencies", []),
                metadata={
                    "complexity": task_data.get("complexity", "moderate"),
                    "tools_hint": task_data.get("tools_hint", [])
                }
            )
            tasks.append(task)
            self.tasks[task.id] = task

            # Add to graph
            self.graph.add_task(
                task.id,
                task.description,
                task.metadata
            )

        # Add dependencies to graph
        for task in tasks:
            for dep_id in task.dependencies:
                if dep_id in self.tasks:
                    self.graph.add_dependency(task.id, dep_id)

        # Recursively decompose complex tasks
        if max_depth > 0:
            complex_tasks = [
                t for t in tasks
                if t.metadata.get("complexity") == "complex"
            ]

            for task in complex_tasks:
                subtasks = await self.decompose(
                    task.description,
                    context=f"Part of larger goal: {goal}",
                    max_depth=max_depth - 1
                )

                # Subtasks inherit the complex task's dependencies,
                # in both the task records and the graph
                for subtask in subtasks:
                    subtask.dependencies.extend(task.dependencies)
                    for dep_id in task.dependencies:
                        if dep_id in self.tasks:
                            self.graph.add_dependency(subtask.id, dep_id)

                # Remove the original complex task from bookkeeping and graph
                self.tasks.pop(task.id)
                if task.id in self.graph.graph:
                    self.graph.graph.remove_node(task.id)
                tasks.remove(task)
                tasks.extend(subtasks)

        return tasks

    def get_ready_tasks(self) -> list[Task]:
        """Get tasks ready for execution."""
        ready_ids = self.graph.get_ready_tasks()
        return [self.tasks[tid] for tid in ready_ids]

    def get_execution_plan(self) -> list[list[Task]]:
        """Get parallel execution waves."""
        scheduler = ParallelScheduler(self.graph)
        plan = scheduler.create_execution_waves()

        return [
            [self.tasks[tid] for tid in wave]
            for wave in plan.waves
        ]

    async def execute(
        self,
        executor: Callable[[Task], Any],
        parallel: bool = True
    ) -> dict[str, Any]:
        """Execute all tasks using provided executor."""

        results = {}

        if parallel:
            plan = self.get_execution_plan()

            for wave in plan:
                # Execute wave in parallel
                coros = [executor(task) for task in wave]
                wave_results = await asyncio.gather(
                    *coros,
                    return_exceptions=True
                )

                for task, result in zip(wave, wave_results):
                    if isinstance(result, Exception):
                        task.status = TaskStatus.FAILED
                        task.error = str(result)
                    else:
                        task.status = TaskStatus.COMPLETED
                        task.result = result
                        self.graph.complete_task(task.id)

                    results[task.id] = result
        else:
            # Sequential execution
            order = self.graph.get_execution_order()

            for task_id in order:
                task = self.tasks[task_id]
                task.status = TaskStatus.IN_PROGRESS

                try:
                    result = await executor(task)
                    task.status = TaskStatus.COMPLETED
                    task.result = result
                    self.graph.complete_task(task_id)
                except Exception as e:
                    task.status = TaskStatus.FAILED
                    task.error = str(e)

                results[task_id] = task.result or task.error

        return results

    def get_progress(self) -> dict[str, int]:
        """Get execution progress summary."""
        statuses = {}
        for task in self.tasks.values():
            status = task.status.value
            statuses[status] = statuses.get(status, 0) + 1
        return statuses
```

Usage Example
```python
async def example_executor(task: Task) -> str:
    """Example task executor that simulates work."""
    print(f"Executing: {task.description}")
    await asyncio.sleep(1)  # Simulate work
    return f"Completed: {task.description}"

async def main():
    decomposer = TaskDecomposer()

    # Decompose a complex goal
    goal = "Build a REST API for a todo application"
    tasks = await decomposer.decompose(goal)

    print("\nDecomposed Tasks:")
    for task in tasks:
        deps = ", ".join(task.dependencies) or "none"
        print(f"  [{task.id}] {task.description}")
        print(f"    Dependencies: {deps}")
        print(f"    Complexity: {task.metadata['complexity']}")

    # Show execution plan
    print("\nExecution Plan (parallel waves):")
    plan = decomposer.get_execution_plan()
    for i, wave in enumerate(plan):
        wave_desc = ", ".join(t.id for t in wave)
        print(f"  Wave {i + 1}: {wave_desc}")

    # Execute
    print("\nExecuting...")
    results = await decomposer.execute(example_executor)

    print("\nResults:")
    for task_id, result in results.items():
        print(f"  {task_id}: {result}")

    print("\nProgress:")
    print(decomposer.get_progress())

# Run
asyncio.run(main())
```

Summary
Task decomposition is the first step in intelligent planning. We covered:
- Why decomposition matters: Reduces complexity, enables progress tracking, isolates errors, and allows parallelization
- Decomposition strategies: Functional, sequential, recursive, and goal-oriented approaches for different problem types
- LLM-based decomposition: Structured prompting, few-shot examples, and iterative refinement for quality plans
- Dependency graphs: Using DAGs to model task relationships and enable parallel execution
- Complete implementation: A TaskDecomposer class combining LLM intelligence with structured execution
In the next section, we'll explore hierarchical planning—how agents organize subtasks into multi-level abstractions for even more complex goals.