Chapter 10

Task Decomposition

Planning and Reasoning

Introduction

When faced with complex requests like "Plan a week-long trip to Japan including flights, hotels, and activities," humans naturally break this into smaller tasks: research destinations, find flights, book hotels, and plan daily activities. This cognitive process—task decomposition—is fundamental to how we solve complex problems.

For AI agents, task decomposition is equally essential. Without it, agents struggle with ambiguity, lose track of progress, and produce incomplete solutions. In this section, we'll explore how agents can break down complex goals into manageable, executable subtasks.

Core Insight: Task decomposition transforms overwhelming problems into structured action plans. It's not just organization—it's the foundation of intelligent planning.

Why Task Decomposition Matters

Consider an agent asked to "build a web application." Without decomposition, it might try to generate an entire application in one step—almost certainly failing. With proper decomposition, it breaks this into: define requirements, set up project structure, create database schema, build backend APIs, implement frontend, write tests, and deploy.

Benefits of Decomposition

| Benefit | Description | Impact |
| --- | --- | --- |
| Reduced Complexity | Each subtask is simpler than the whole | Higher success rate per step |
| Progress Tracking | Clear milestones for monitoring | Better user experience |
| Error Isolation | Failures contained to subtasks | Easier debugging and recovery |
| Parallelization | Independent tasks can run concurrently | Faster overall execution |
| Resource Estimation | Predict time and cost per subtask | Accurate project planning |

The Decomposition Paradox

There's a tension in task decomposition: decompose too little and subtasks remain complex; decompose too much and you drown in overhead. Finding the right granularity is key.

🐍python
# Too coarse - still complex
tasks = ["Build the entire application"]

# Too fine - excessive overhead
tasks = [
    "Open IDE",
    "Create folder",
    "Create file",
    "Type 'import'",
    "Type ' '",
    "Type 'flask'",
    # ... hundreds more
]

# Just right - actionable units
tasks = [
    "Set up Flask project structure",
    "Create database models",
    "Implement authentication API",
    "Build user management endpoints",
    "Create frontend components",
    "Write integration tests"
]

Decomposition Strategies

Different problems call for different decomposition approaches. Here are the primary strategies agents can employ:

1. Functional Decomposition

Break tasks by function or capability required. Each subtask uses a distinct skill or tool.

🐍python
def functional_decomposition(goal: str) -> list[dict]:
    """Decompose by required capabilities."""
    # The plan is hard-coded for the trip-planning example; `goal` is not used
    return [
        {
            "task": "Research destination options",
            "capability": "web_search",
            "tools": ["search_api", "summarizer"]
        },
        {
            "task": "Find flight options",
            "capability": "flight_booking",
            "tools": ["flight_api", "price_comparison"]
        },
        {
            "task": "Book accommodation",
            "capability": "hotel_booking",
            "tools": ["hotel_api", "review_analyzer"]
        },
        {
            "task": "Create daily itinerary",
            "capability": "planning",
            "tools": ["calendar", "map_api", "activity_search"]
        }
    ]

2. Sequential Decomposition

Break tasks into ordered steps where each step depends on the previous. This is natural for workflows with clear prerequisites.

🐍python
def sequential_decomposition(goal: str) -> list[dict]:
    """Decompose into ordered steps."""
    # "prerequisite" holds the step number that must finish first
    return [
        {
            "step": 1,
            "task": "Gather requirements",
            "prerequisite": None
        },
        {
            "step": 2,
            "task": "Design system architecture",
            "prerequisite": 1
        },
        {
            "step": 3,
            "task": "Implement core modules",
            "prerequisite": 2
        },
        {
            "step": 4,
            "task": "Write tests",
            "prerequisite": 3
        },
        {
            "step": 5,
            "task": "Deploy and monitor",
            "prerequisite": 4
        }
    ]

3. Recursive Decomposition

Apply decomposition recursively until subtasks reach an executable granularity. This is powerful for deeply complex problems.

🐍python
from dataclasses import dataclass, field
from typing import Optional

@dataclass
class Task:
    """Recursively decomposable task."""
    description: str
    subtasks: list['Task'] = field(default_factory=list)
    is_atomic: bool = False
    completed: bool = False

    def decompose(self, decomposer, max_depth: int = 5) -> None:
        """Recursively decompose until atomic or until max_depth is reached."""
        if self.is_atomic:
            return

        # Depth guard (an added safety limit): a decomposer that never
        # marks a task atomic would otherwise recurse forever.
        if max_depth <= 0:
            self.is_atomic = True
            return

        # Get subtasks from decomposer. It is expected to return Task
        # objects and to set is_atomic on the ones that can be executed.
        self.subtasks = decomposer.decompose(self.description)

        # Nothing left to split: treat this task as executable
        if not self.subtasks:
            self.is_atomic = True
            return

        # Recursively decompose each subtask
        for subtask in self.subtasks:
            subtask.decompose(decomposer, max_depth - 1)

    def get_next_task(self) -> Optional['Task']:
        """Get next atomic task to execute (depth-first, in order)."""
        if self.is_atomic and not self.completed:
            return self

        for subtask in self.subtasks:
            next_task = subtask.get_next_task()
            if next_task:
                return next_task

        return None
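
To see the recursion in action, the class can be driven by a toy decomposer. The sketch below is only an illustration: `TableDecomposer` and its lookup table are hypothetical, and the `Task` class is a condensed copy of the one above so that the snippet runs on its own.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class Task:
    """Condensed copy of the recursive Task class."""
    description: str
    subtasks: list["Task"] = field(default_factory=list)
    is_atomic: bool = False
    completed: bool = False

    def decompose(self, decomposer) -> None:
        if self.is_atomic:
            return
        self.subtasks = decomposer.decompose(self.description)
        for subtask in self.subtasks:
            subtask.decompose(decomposer)

    def get_next_task(self) -> Optional["Task"]:
        if self.is_atomic and not self.completed:
            return self
        for subtask in self.subtasks:
            next_task = subtask.get_next_task()
            if next_task:
                return next_task
        return None


class TableDecomposer:
    """Hypothetical decomposer backed by a fixed lookup table."""

    PLAN = {
        "Plan a trip": ["Book travel", "Plan activities"],
        "Book travel": ["Find flights", "Reserve hotel"],
    }

    def decompose(self, description: str) -> list[Task]:
        children = self.PLAN.get(description, [])
        # A child with no table entry cannot be split further, so it is atomic
        return [Task(c, is_atomic=c not in self.PLAN) for c in children]


root = Task("Plan a trip")
root.decompose(TableDecomposer())

# Pull atomic tasks one at a time and mark each one done
executed = []
while (task := root.get_next_task()) is not None:
    executed.append(task.description)
    task.completed = True

print(executed)
```

In a real agent the lookup table would be replaced by an LLM call, which is where a depth limit becomes important.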

4. Goal-Oriented Decomposition

Work backward from the desired outcome. Each subtask addresses a subgoal necessary for the final goal.

🐍python
def goal_oriented_decomposition(final_goal: str) -> list[dict]:
    """Decompose by working backward from goal."""

    # Start from final state
    goal_tree = {
        "goal": "Have a successful product launch",
        "requires": [
            {
                "subgoal": "Product is fully tested",
                "requires": [
                    {"subgoal": "Unit tests passing"},
                    {"subgoal": "Integration tests passing"},
                    {"subgoal": "User acceptance testing complete"}
                ]
            },
            {
                "subgoal": "Marketing materials ready",
                "requires": [
                    {"subgoal": "Landing page live"},
                    {"subgoal": "Press release written"},
                    {"subgoal": "Social media scheduled"}
                ]
            },
            {
                "subgoal": "Infrastructure prepared",
                "requires": [
                    {"subgoal": "Production servers scaled"},
                    {"subgoal": "Monitoring in place"},
                    {"subgoal": "Rollback plan documented"}
                ]
            }
        ]
    }

    # flatten_goal_tree is a helper that is not defined in this section
    return flatten_goal_tree(goal_tree)

Tip: Choose your decomposition strategy based on the problem type: functional for capability-driven tasks, sequential for workflows, recursive for deep complexity, and goal-oriented for outcome-focused planning.
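
The goal-oriented example calls `flatten_goal_tree`, which this section does not define. One plausible reading is a post-order walk that lists every prerequisite before the goal that needs it. The sketch below assumes that reading; the output shape (`task` and `requires` keys) is an assumption, not something the source specifies.

```python
def flatten_goal_tree(node: dict) -> list[dict]:
    """Post-order walk: prerequisites appear before the goal that needs them."""
    name = node.get("goal") or node.get("subgoal")
    children = node.get("requires", [])

    flat = []
    for child in children:
        flat.extend(flatten_goal_tree(child))

    # Record the node itself after all of its prerequisites
    flat.append({
        "task": name,
        "requires": [c.get("goal") or c.get("subgoal") for c in children],
    })
    return flat


tree = {
    "goal": "Have a successful product launch",
    "requires": [
        {
            "subgoal": "Product is fully tested",
            "requires": [
                {"subgoal": "Unit tests passing"},
                {"subgoal": "Integration tests passing"},
            ],
        },
        {
            "subgoal": "Marketing materials ready",
            "requires": [{"subgoal": "Landing page live"}],
        },
    ],
}

plan = flatten_goal_tree(tree)
for step in plan:
    print(step["task"], "<-", step["requires"])
```

Because leaves come first, the flattened list can be executed front to back without violating any prerequisite.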

LLM-Based Decomposition

Large language models excel at task decomposition. They understand context, can reason about dependencies, and generate human-readable plans. Here's how to leverage LLMs effectively:

Structured Decomposition Prompting

🐍python
from anthropic import Anthropic
from pydantic import BaseModel
import json

class SubTask(BaseModel):
    """A single decomposed subtask."""
    id: str
    description: str
    estimated_complexity: str  # "low", "medium", "high"
    dependencies: list[str]  # IDs of prerequisite tasks
    tools_needed: list[str]
    success_criteria: str

class DecompositionResult(BaseModel):
    """Complete task decomposition."""
    original_goal: str
    subtasks: list[SubTask]
    execution_order: list[str]  # Topologically sorted task IDs

def decompose_with_llm(goal: str, context: str = "") -> DecompositionResult:
    """Use LLM to decompose a complex goal."""

    client = Anthropic()

    prompt = f"""Decompose the following goal into executable subtasks.

Goal: {goal}

Context: {context}

For each subtask, provide:
1. A unique ID (e.g., "task_1", "task_2")
2. Clear description of what to do
3. Estimated complexity (low/medium/high)
4. Dependencies (IDs of tasks that must complete first)
5. Tools or capabilities needed
6. Clear success criteria

Return a JSON object with:
- original_goal: the input goal
- subtasks: array of subtask objects
- execution_order: array of task IDs in dependency order

Ensure subtasks are:
- Atomic enough to be executed in a single action
- Not so granular as to be trivial
- Complete (covering all aspects of the goal)
- Well-ordered (dependencies respected)"""

    response = client.messages.create(
        model="claude-sonnet-4-20250514",
        max_tokens=4096,
        messages=[{"role": "user", "content": prompt}]
    )

    # Parse JSON from response
    result_json = json.loads(response.content[0].text)
    return DecompositionResult(**result_json)
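
Calling `json.loads` on the raw reply only works when the model returns a bare JSON object. Replies often wrap the JSON in a code fence or add a sentence before it, so a tolerant parser can help. The helper below is a sketch under that assumption; `extract_json` is a hypothetical name, not part of any SDK.

```python
import json
import re

FENCE = "`" * 3  # a Markdown code fence, built here to keep this snippet readable
FENCED_BLOCK = re.compile(FENCE + r"(?:json)?\s*(.*?)" + FENCE, re.DOTALL)


def extract_json(text: str) -> dict:
    """Parse a JSON object from a model reply, tolerating fences and prose."""
    # Prefer the contents of a fenced block if the reply contains one
    fenced = FENCED_BLOCK.search(text)
    candidate = fenced.group(1) if fenced else text

    # Then keep only the span between the outermost braces
    start, end = candidate.find("{"), candidate.rfind("}")
    if start == -1 or end < start:
        raise ValueError("No JSON object found in model reply")
    return json.loads(candidate[start:end + 1])


reply = (
    "Here is the plan:\n"
    + FENCE + "json\n"
    + '{"original_goal": "demo", "subtasks": [], "execution_order": []}\n'
    + FENCE
)
parsed = extract_json(reply)
print(parsed["original_goal"])
```

A malformed object still raises `json.JSONDecodeError`, which a caller could catch in order to retry the request.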

Few-Shot Decomposition

Including a few worked examples in the prompt usually makes the output more consistent in granularity and format:

🐍python
DECOMPOSITION_EXAMPLES = """
Example 1:
Goal: "Write a blog post about machine learning"
Subtasks:
1. Research current ML trends and topics
2. Create outline with main points
3. Write introduction hook
4. Write body sections with examples
5. Write conclusion with call-to-action
6. Add relevant images and diagrams
7. Proofread and edit for clarity

Example 2:
Goal: "Set up a monitoring system"
Subtasks:
1. Define key metrics to track
2. Choose monitoring tool (Prometheus, Datadog, etc.)
3. Install and configure monitoring agent
4. Set up dashboards for visualization
5. Configure alerting rules
6. Test alerts with synthetic issues
7. Document runbook for each alert
"""

def decompose_with_examples(goal: str) -> DecompositionResult:
    """Decompose using few-shot learning."""

    prompt = f"""{DECOMPOSITION_EXAMPLES}

Now decompose this goal:
Goal: "{goal}"

Return subtasks in the same structured format."""

    # Call LLM with examples...
    # Send `prompt` to the model as in decompose_with_llm, then convert
    # the numbered list in the reply into a DecompositionResult.
    raise NotImplementedError("LLM call and parsing are omitted in this sketch")
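
The few-shot examples use numbered lists rather than JSON, so the reply needs a small parsing step before it can feed a structured plan. The sketch below shows one way to do that. `parse_numbered_subtasks` is a hypothetical helper, and treating each step as dependent on the previous one is an assumption that only suits linear plans.

```python
import re


def parse_numbered_subtasks(reply: str) -> list[dict]:
    """Turn lines like '3. Write introduction hook' into subtask dicts."""
    subtasks = []
    for line in reply.splitlines():
        match = re.match(r"\s*(\d+)[.)]\s+(.+)", line)
        if not match:
            continue  # skip headings, blank lines, and commentary

        number, description = match.groups()
        # Assumption: a plain numbered list means each step follows the last
        dependencies = [subtasks[-1]["id"]] if subtasks else []
        subtasks.append({
            "id": f"task_{number}",
            "description": description.strip(),
            "dependencies": dependencies,
        })
    return subtasks


reply = """Subtasks:
1. Define key metrics to track
2. Choose monitoring tool
3. Install and configure monitoring agent
"""

parsed = parse_numbered_subtasks(reply)
for task in parsed:
    print(task["id"], task["description"], task["dependencies"])
```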

Iterative Refinement

Complex tasks may need multiple decomposition passes:

🐍python
import asyncio

# Rank complexity labels so they can be compared against the threshold
COMPLEXITY_RANK = {"low": 0, "medium": 1, "high": 2}

async def iterative_decomposition(
    goal: str,
    max_iterations: int = 3,
    complexity_threshold: str = "medium"
) -> DecompositionResult:
    """Iteratively refine decomposition until all tasks are simple enough."""

    # decompose_with_llm is synchronous, so run it in a worker thread
    # instead of awaiting it directly
    result = await asyncio.to_thread(decompose_with_llm, goal)
    limit = COMPLEXITY_RANK[complexity_threshold]

    for iteration in range(max_iterations):
        # Find tasks that are still above the threshold
        # (unknown labels are treated as "high")
        complex_tasks = [
            task for task in result.subtasks
            if COMPLEXITY_RANK.get(task.estimated_complexity, 2) > limit
        ]

        if not complex_tasks:
            break

        # Further decompose complex tasks
        for task in complex_tasks:
            sub_result = await asyncio.to_thread(
                decompose_with_llm,
                task.description,
                f"This is part of: {goal}"
            )

            # Replace complex task with its subtasks
            # (merge_decomposition is a helper not defined in this section)
            result = merge_decomposition(result, task.id, sub_result)

    return result
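
The refinement loop relies on `merge_decomposition`, which this section leaves undefined. The sketch below is one possible implementation, not the author's. It uses dataclass stand-ins that mirror the fields of `SubTask` and `DecompositionResult` so that it runs without pydantic, and it prefixes nested IDs with the parent ID (an assumption) to avoid collisions such as two different tasks both named `task_1`.

```python
from dataclasses import dataclass, field


@dataclass
class SubTask:
    """Stand-in mirroring the SubTask model's fields."""
    id: str
    description: str
    estimated_complexity: str = "low"
    dependencies: list[str] = field(default_factory=list)
    tools_needed: list[str] = field(default_factory=list)
    success_criteria: str = ""


@dataclass
class DecompositionResult:
    """Stand-in mirroring the DecompositionResult model's fields."""
    original_goal: str
    subtasks: list[SubTask]
    execution_order: list[str]


def merge_decomposition(result, task_id, sub_result):
    """Replace task `task_id` in `result` with the subtasks of `sub_result`."""
    parent = next(t for t in result.subtasks if t.id == task_id)
    prefix = f"{task_id}."  # keeps nested IDs unique

    children = [
        SubTask(
            id=prefix + sub.id,
            description=sub.description,
            estimated_complexity=sub.estimated_complexity,
            # Keep the internal ordering and inherit the parent's prerequisites
            dependencies=[prefix + d for d in sub.dependencies] + list(parent.dependencies),
            tools_needed=list(sub.tools_needed),
            success_criteria=sub.success_criteria,
        )
        for sub in sub_result.subtasks
    ]
    child_ids = [c.id for c in children]

    merged = []
    for task in result.subtasks:
        if task.id == task_id:
            merged.extend(children)
            continue
        if task_id in task.dependencies:
            # Anything that waited on the parent now waits on all children
            task.dependencies = [d for d in task.dependencies if d != task_id] + child_ids
        merged.append(task)

    order = []
    for tid in result.execution_order:
        if tid == task_id:
            order.extend(prefix + s for s in sub_result.execution_order)
        else:
            order.append(tid)

    return DecompositionResult(
        original_goal=result.original_goal,
        subtasks=merged,
        execution_order=order,
    )


plan = DecompositionResult(
    original_goal="Ship API",
    subtasks=[
        SubTask(id="task_1", description="Design schema"),
        SubTask(id="task_2", description="Build backend",
                estimated_complexity="high", dependencies=["task_1"]),
        SubTask(id="task_3", description="Deploy", dependencies=["task_2"]),
    ],
    execution_order=["task_1", "task_2", "task_3"],
)
detail = DecompositionResult(
    original_goal="Build backend",
    subtasks=[
        SubTask(id="task_1", description="Define routes"),
        SubTask(id="task_2", description="Write handlers", dependencies=["task_1"]),
    ],
    execution_order=["task_1", "task_2"],
)

merged = merge_decomposition(plan, "task_2", detail)
print(merged.execution_order)
```

The function only uses keyword construction and attribute access, so the same body should work with the pydantic models if the stand-ins are removed.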

Warning: LLM decomposition can hallucinate unrealistic subtasks. Always validate that the tools it mentions actually exist and that the dependencies are logical.
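
A validation pass of this kind can be small. The sketch below checks a plan, given as plain dicts, for unknown tools, dangling dependency IDs, duplicate IDs, and cycles. `validate_plan` and the tool names are illustrative assumptions; cycle detection uses the standard library's `graphlib`.

```python
from graphlib import CycleError, TopologicalSorter


def validate_plan(subtasks: list[dict], available_tools: set[str]) -> list[str]:
    """Return a list of problems; an empty list means the plan passed."""
    problems = []
    ids = [t["id"] for t in subtasks]
    known = set(ids)

    if len(ids) != len(known):
        problems.append("duplicate task IDs")

    for task in subtasks:
        # Tools the model mentioned that the agent does not actually have
        for tool in task.get("tools_needed", []):
            if tool not in available_tools:
                problems.append(f"{task['id']}: unknown tool '{tool}'")
        # Dependencies that point at tasks that do not exist
        for dep in task.get("dependencies", []):
            if dep not in known:
                problems.append(f"{task['id']}: unknown dependency '{dep}'")

    # Cycle check over the dependencies that do resolve
    graph = {
        t["id"]: [d for d in t.get("dependencies", []) if d in known]
        for t in subtasks
    }
    try:
        TopologicalSorter(graph).prepare()
    except CycleError:
        problems.append("dependencies contain a cycle")

    return problems


plan = [
    {"id": "task_1", "tools_needed": ["search_api"], "dependencies": []},
    {"id": "task_2", "tools_needed": ["teleporter"], "dependencies": ["task_1", "task_9"]},
]
problems = validate_plan(plan, available_tools={"search_api", "flight_api"})
for problem in problems:
    print(problem)
```

Feeding the problem list back to the model as context for a retry is one way to use the result.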

Dependency Graphs

Real-world tasks have complex interdependencies. Representing these as directed graphs enables sophisticated scheduling and parallel execution.

Building Task Graphs

🐍python
import networkx as nx
from typing import Any, Optional

class TaskGraph:
    """Directed acyclic graph of tasks."""

    def __init__(self):
        self.graph = nx.DiGraph()

    def add_task(
        self,
        task_id: str,
        description: str,
        metadata: Optional[dict[str, Any]] = None
    ) -> None:
        """Add a task node."""
        # Metadata keys become node attributes next to description and status
        self.graph.add_node(
            task_id,
            description=description,
            status="pending",
            **(metadata or {})
        )

    def add_dependency(self, task_id: str, depends_on: str) -> None:
        """Add dependency edge: depends_on -> task_id."""
        if depends_on not in self.graph:
            raise ValueError(f"Task {depends_on} not found")
        if task_id not in self.graph:
            raise ValueError(f"Task {task_id} not found")

        self.graph.add_edge(depends_on, task_id)

        # Validate no cycles; undo the edge if it introduced one
        if not nx.is_directed_acyclic_graph(self.graph):
            self.graph.remove_edge(depends_on, task_id)
            raise ValueError(
                f"Adding dependency would create cycle: "
                f"{depends_on} -> {task_id}"
            )

    def get_ready_tasks(self) -> list[str]:
        """Get tasks whose dependencies are all completed."""
        ready = []
        for node in self.graph.nodes():
            if self.graph.nodes[node]["status"] != "pending":
                continue

            # Check all predecessors are completed
            predecessors = list(self.graph.predecessors(node))
            all_done = all(
                self.graph.nodes[p]["status"] == "completed"
                for p in predecessors
            )

            if all_done:
                ready.append(node)

        return ready

    def complete_task(self, task_id: str) -> None:
        """Mark task as completed."""
        self.graph.nodes[task_id]["status"] = "completed"

    def get_execution_order(self) -> list[str]:
        """Get topologically sorted execution order."""
        return list(nx.topological_sort(self.graph))

    def get_critical_path(self) -> list[str]:
        """Find the longest path (critical path for timing)."""
        # No edge weights are set, so every edge counts as 1 and this is
        # the chain with the most tasks, not the longest in duration.
        return nx.dag_longest_path(self.graph)
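
For timing purposes the critical path should weigh tasks by duration rather than count edges. The sketch below works that out with the standard library only, as an alternative to weighted edges in networkx. `critical_path` is a hypothetical helper and the durations are made-up numbers for illustration.

```python
from graphlib import TopologicalSorter
from typing import Optional


def critical_path(
    durations: dict[str, float],
    depends_on: dict[str, list[str]],
) -> tuple[float, list[str]]:
    """Longest duration-weighted path through a task DAG."""
    finish: dict[str, float] = {}        # earliest finish time per task
    via: dict[str, Optional[str]] = {}   # predecessor on the longest path

    graph = {task: depends_on.get(task, []) for task in durations}
    # static_order yields every task after all of its prerequisites
    for task in TopologicalSorter(graph).static_order():
        preds = depends_on.get(task, [])
        latest = max(preds, key=lambda p: finish[p], default=None)
        start = finish[latest] if latest is not None else 0.0
        finish[task] = start + durations[task]
        via[task] = latest

    # Walk back from the task that finishes last
    node: Optional[str] = max(finish, key=finish.get)
    total = finish[node]
    path = []
    while node is not None:
        path.append(node)
        node = via[node]
    return total, path[::-1]


# Illustrative durations in hours
durations = {"task_1": 2, "task_2": 3, "task_3": 5, "task_4": 2, "task_5": 1}
depends_on = {
    "task_2": ["task_1"],
    "task_3": ["task_2"],
    "task_4": ["task_2"],
    "task_5": ["task_3", "task_4"],
}

total, path = critical_path(durations, depends_on)
print(total, path)
```

Tasks off the critical path (here `task_4`) have slack, so speeding them up does not shorten the overall plan.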

Parallel Execution Planning

🐍python
import asyncio
from dataclasses import dataclass
from typing import Any, Awaitable, Callable

# TaskGraph is the class defined in the previous block.

@dataclass
class ExecutionPlan:
    """Plan for parallel task execution."""
    waves: list[list[str]]  # Each wave can run in parallel

class ParallelScheduler:
    """Schedule tasks for parallel execution."""

    def __init__(self, task_graph: TaskGraph):
        self.graph = task_graph

    def create_execution_waves(self) -> ExecutionPlan:
        """Group tasks into parallelizable waves."""
        waves = []
        completed = set()
        remaining = set(self.graph.graph.nodes())

        while remaining:
            # Find all tasks ready to execute
            # (sorted so the wave order is stable between runs)
            wave = []
            for task_id in sorted(remaining):
                predecessors = set(self.graph.graph.predecessors(task_id))
                if predecessors.issubset(completed):
                    wave.append(task_id)

            if not wave:
                raise ValueError("Circular dependency detected")

            waves.append(wave)
            completed.update(wave)
            remaining -= set(wave)

        return ExecutionPlan(waves=waves)

    async def execute_plan(
        self,
        plan: ExecutionPlan,
        executor: Callable[[str, dict], Awaitable[Any]]
    ) -> dict[str, Any]:
        """Execute plan with parallel waves."""
        results = {}

        for wave_num, wave in enumerate(plan.waves):
            print(f"Executing wave {wave_num + 1}: {wave}")

            # Run all tasks in wave concurrently
            tasks = [
                executor(task_id, self.graph.graph.nodes[task_id])
                for task_id in wave
            ]

            # Without return_exceptions, the first failure propagates
            # and stops the plan
            wave_results = await asyncio.gather(*tasks)

            for task_id, result in zip(wave, wave_results):
                results[task_id] = result
                self.graph.complete_task(task_id)

        return results
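
The same wave-by-wave idea can be expressed without networkx. The sketch below swaps in the standard library's `graphlib.TopologicalSorter`, whose `get_ready()` and `done()` methods track readiness for you. `run_in_waves` and `fake_task` are hypothetical names, and the dependency map reuses the five-task example from this section.

```python
import asyncio
from graphlib import TopologicalSorter


async def run_in_waves(depends_on: dict[str, list[str]], run_task):
    """Run tasks wave by wave; tasks within a wave run concurrently."""
    sorter = TopologicalSorter(depends_on)
    sorter.prepare()  # raises graphlib.CycleError on circular dependencies

    waves, results = [], {}
    while sorter.is_active():
        # Everything whose prerequisites are all marked done
        wave = sorted(sorter.get_ready())
        waves.append(wave)

        outputs = await asyncio.gather(*(run_task(t) for t in wave))
        for task_id, output in zip(wave, outputs):
            results[task_id] = output
            sorter.done(task_id)  # unlocks dependents for the next wave
    return waves, results


async def fake_task(task_id: str) -> str:
    """Placeholder for real work."""
    await asyncio.sleep(0)
    return f"done:{task_id}"


depends_on = {
    "task_1": [],
    "task_2": ["task_1"],
    "task_3": ["task_2"],
    "task_4": ["task_2"],
    "task_5": ["task_3", "task_4"],
}

waves, results = asyncio.run(run_in_waves(depends_on, fake_task))
print(waves)
```

Waves are simple but conservative: a task waits for its whole predecessor wave even when its own prerequisites finished early. Calling `done()` as each task completes, rather than per wave, would remove that wait at the cost of more bookkeeping.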

Visualizing Task Dependencies

🐍python
def visualize_task_graph(graph: TaskGraph) -> str:
    """Generate Mermaid diagram for task graph."""
    lines = ["graph TD"]

    for node in graph.graph.nodes():
        data = graph.graph.nodes[node]
        status_icon = "✅" if data["status"] == "completed" else "⏳"
        # Double quotes would end the Mermaid label early, so swap them out
        label = data["description"].replace('"', "'")
        lines.append(f'    {node}["{status_icon} {label}"]')

    for source, target in graph.graph.edges():
        lines.append(f"    {source} --> {target}")

    return "\n".join(lines)

# Example output:
# graph TD
#     task_1["⏳ Research requirements"]
#     task_2["⏳ Design architecture"]
#     task_3["⏳ Implement backend"]
#     task_4["⏳ Implement frontend"]
#     task_5["⏳ Integration testing"]
#     task_1 --> task_2
#     task_2 --> task_3
#     task_2 --> task_4
#     task_3 --> task_5
#     task_4 --> task_5
Building a Task Decomposer

Let's build a complete task decomposition system that combines LLM intelligence with structured execution:

🐍python
from anthropic import AsyncAnthropic
from dataclasses import dataclass, field
from typing import Any, Awaitable, Callable, Optional
from enum import Enum
import json
import asyncio

# TaskGraph and ParallelScheduler are the classes defined earlier in this section.

class TaskStatus(Enum):
    PENDING = "pending"
    IN_PROGRESS = "in_progress"
    COMPLETED = "completed"
    FAILED = "failed"
    BLOCKED = "blocked"

@dataclass
class Task:
    """A decomposed task unit."""
    id: str
    description: str
    dependencies: list[str] = field(default_factory=list)
    status: TaskStatus = TaskStatus.PENDING
    result: Any = None
    error: Optional[str] = None
    metadata: dict[str, Any] = field(default_factory=dict)

class TaskDecomposer:
    """
    Intelligent task decomposition system.

    Combines LLM-based decomposition with structured
    execution planning and dependency management.
    """

    def __init__(self, model: str = "claude-sonnet-4-20250514"):
        # Async client, so decompose() does not block the event loop
        self.client = AsyncAnthropic()
        self.model = model
        self.tasks: dict[str, Task] = {}
        self.graph = TaskGraph()

    async def decompose(
        self,
        goal: str,
        context: str = "",
        max_depth: int = 2,
        id_prefix: str = ""
    ) -> list[Task]:
        """Decompose a goal into executable tasks.

        id_prefix namespaces the IDs of nested decompositions so they
        cannot collide with IDs returned for the parent level.
        """

        prompt = f"""Decompose this goal into concrete, executable subtasks.

Goal: {goal}
Context: {context}

Requirements:
1. Each subtask should be completable in a single focused action
2. Include all necessary steps (don't skip obvious ones)
3. Identify dependencies between tasks
4. Estimate complexity: simple, moderate, complex

Return JSON:
{{
    "tasks": [
        {{
            "id": "unique_id",
            "description": "what to do",
            "dependencies": ["id of prerequisite tasks"],
            "complexity": "simple|moderate|complex",
            "tools_hint": ["suggested tools or approaches"]
        }}
    ]
}}"""

        response = await self.client.messages.create(
            model=self.model,
            max_tokens=4096,
            messages=[{"role": "user", "content": prompt}]
        )

        # Assumes the reply is a bare JSON object
        result = json.loads(response.content[0].text)

        tasks = []
        for task_data in result["tasks"]:
            task = Task(
                id=id_prefix + task_data["id"],
                description=task_data["description"],
                dependencies=[
                    id_prefix + dep
                    for dep in task_data.get("dependencies", [])
                ],
                metadata={
                    "complexity": task_data.get("complexity", "moderate"),
                    "tools_hint": task_data.get("tools_hint", [])
                }
            )
            tasks.append(task)
            self.tasks[task.id] = task

            # Add to graph
            self.graph.add_task(
                task.id,
                task.description,
                task.metadata
            )

        # Add dependencies to graph (unknown IDs are skipped)
        for task in tasks:
            for dep_id in task.dependencies:
                if dep_id in self.tasks:
                    self.graph.add_dependency(task.id, dep_id)

        # Recursively decompose complex tasks
        if max_depth > 0:
            complex_tasks = [
                t for t in tasks
                if t.metadata.get("complexity") == "complex"
            ]

            for task in complex_tasks:
                subtasks = await self.decompose(
                    task.description,
                    context=f"Part of larger goal: {goal}",
                    max_depth=max_depth - 1,
                    id_prefix=f"{task.id}."
                )
                if not subtasks:
                    continue  # nothing came back, so keep the original task

                # Tasks that were waiting on the complex task
                dependents = list(self.graph.graph.successors(task.id))

                # Subtasks inherit the complex task's prerequisites
                for subtask in subtasks:
                    for dep_id in task.dependencies:
                        if dep_id in self.tasks and dep_id not in subtask.dependencies:
                            subtask.dependencies.append(dep_id)
                            self.graph.add_dependency(subtask.id, dep_id)

                # Dependents now wait on every subtask instead
                for dependent_id in dependents:
                    dependent = self.tasks[dependent_id]
                    dependent.dependencies = [
                        d for d in dependent.dependencies if d != task.id
                    ]
                    for subtask in subtasks:
                        dependent.dependencies.append(subtask.id)
                        self.graph.add_dependency(dependent_id, subtask.id)

                # Remove the original complex task from both the
                # task table and the graph so they stay in sync
                self.tasks.pop(task.id)
                self.graph.graph.remove_node(task.id)
                tasks.remove(task)
                tasks.extend(subtasks)

        return tasks

    def get_ready_tasks(self) -> list[Task]:
        """Get tasks ready for execution."""
        ready_ids = self.graph.get_ready_tasks()
        return [self.tasks[tid] for tid in ready_ids]

    def get_execution_plan(self) -> list[list[Task]]:
        """Get parallel execution waves."""
        scheduler = ParallelScheduler(self.graph)
        plan = scheduler.create_execution_waves()

        return [
            [self.tasks[tid] for tid in wave]
            for wave in plan.waves
        ]

    def _blocked_by(self, task: Task) -> Optional[str]:
        """Return the ID of a prerequisite that did not complete, if any."""
        for dep_id in task.dependencies:
            dep = self.tasks.get(dep_id)
            if dep is not None and dep.status is not TaskStatus.COMPLETED:
                return dep_id
        return None

    async def _run_task(
        self,
        task: Task,
        executor: Callable[[Task], Awaitable[Any]]
    ) -> Any:
        """Run one task, record the outcome, and return result or error."""
        blocker = self._blocked_by(task)
        if blocker is not None:
            # A failed prerequisite blocks everything downstream
            task.status = TaskStatus.BLOCKED
            task.error = f"Prerequisite {blocker} did not complete"
            return task.error

        task.status = TaskStatus.IN_PROGRESS
        try:
            task.result = await executor(task)
        except Exception as e:
            task.status = TaskStatus.FAILED
            task.error = str(e)
            return task.error

        task.status = TaskStatus.COMPLETED
        self.graph.complete_task(task.id)
        return task.result

    async def execute(
        self,
        executor: Callable[[Task], Awaitable[Any]],
        parallel: bool = True
    ) -> dict[str, Any]:
        """Execute all tasks using provided executor."""

        results = {}

        if parallel:
            for wave in self.get_execution_plan():
                # Execute wave in parallel; _run_task catches failures,
                # so one bad task does not cancel its siblings
                outcomes = await asyncio.gather(
                    *(self._run_task(task, executor) for task in wave)
                )
                for task, outcome in zip(wave, outcomes):
                    results[task.id] = outcome
        else:
            # Sequential execution in topological order
            for task_id in self.graph.get_execution_order():
                task = self.tasks[task_id]
                results[task_id] = await self._run_task(task, executor)

        return results

    def get_progress(self) -> dict[str, int]:
        """Get execution progress summary."""
        statuses = {}
        for task in self.tasks.values():
            status = task.status.value
            statuses[status] = statuses.get(status, 0) + 1
        return statuses
Usage Example

🐍python
async def example_executor(task: Task) -> str:
    """Example task executor that simulates work."""
    print(f"Executing: {task.description}")
    await asyncio.sleep(1)  # Simulate work
    return f"Completed: {task.description}"

async def main():
    decomposer = TaskDecomposer()

    # Decompose a complex goal
    goal = "Build a REST API for a todo application"
    tasks = await decomposer.decompose(goal)

    print("\nDecomposed Tasks:")
    for task in tasks:
        deps = ", ".join(task.dependencies) or "none"
        print(f"  [{task.id}] {task.description}")
        print(f"      Dependencies: {deps}")
        print(f"      Complexity: {task.metadata['complexity']}")

    # Show execution plan
    print("\nExecution Plan (parallel waves):")
    plan = decomposer.get_execution_plan()
    for i, wave in enumerate(plan):
        wave_desc = ", ".join(t.id for t in wave)
        print(f"  Wave {i + 1}: {wave_desc}")

    # Execute
    print("\nExecuting...")
    results = await decomposer.execute(example_executor)

    print("\nResults:")
    for task_id, result in results.items():
        print(f"  {task_id}: {result}")

    print("\nProgress:")
    print(decomposer.get_progress())

# Run
if __name__ == "__main__":
    asyncio.run(main())

Note: This decomposer integrates with the agent patterns from Chapter 6. The executor function can use ReAct loops, tool calls, or any other execution strategy.
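
As one illustration of a tool-calling executor, the sketch below routes each task to a tool named in its `tools_hint` metadata and retries on failure. Everything here is an assumption made for the example: the registry, the tool functions, and the retry policy are placeholders, and the `Task` class is a minimal stand-in for the dataclass above.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Any, Awaitable, Callable


@dataclass
class Task:
    """Minimal stand-in for the Task dataclass."""
    id: str
    description: str
    metadata: dict[str, Any] = field(default_factory=dict)


# Hypothetical tools: real ones would call a search API, a sandbox, etc.
async def web_search(task: Task) -> str:
    return f"search results for: {task.description}"


async def run_code(task: Task) -> str:
    return f"ran code for: {task.description}"


TOOLS: dict[str, Callable[[Task], Awaitable[str]]] = {
    "web_search": web_search,
    "run_code": run_code,
}


async def tool_executor(task: Task, retries: int = 2) -> str:
    """Run the first registered tool named in tools_hint, retrying on failure."""
    hints = task.metadata.get("tools_hint", [])
    tool = next((TOOLS[h] for h in hints if h in TOOLS), None)
    if tool is None:
        raise LookupError(f"No registered tool for task {task.id}: {hints}")

    for attempt in range(retries + 1):
        try:
            return await tool(task)
        except Exception:
            if attempt == retries:
                raise  # out of attempts: let the caller mark the task failed
            await asyncio.sleep(0.1 * 2 ** attempt)  # brief exponential backoff
    raise RuntimeError("unreachable")


task = Task("t1", "Find Flask docs", {"tools_hint": ["browser", "web_search"]})
output = asyncio.run(tool_executor(task))
print(output)
```

An executor like this has the right shape to pass to `execute`, since it takes a task and returns an awaitable result.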

Summary

Task decomposition is the first step in intelligent planning. We covered:

  • Why decomposition matters: Reduces complexity, enables progress tracking, isolates errors, and allows parallelization
  • Decomposition strategies: Functional, sequential, recursive, and goal-oriented approaches for different problem types
  • LLM-based decomposition: Structured prompting, few-shot examples, and iterative refinement for quality plans
  • Dependency graphs: Using DAGs to model task relationships and enable parallel execution
  • Complete implementation: A TaskDecomposer class combining LLM intelligence with structured execution

In the next section, we'll explore hierarchical planning—how agents organize subtasks into multi-level abstractions for even more complex goals.