Introduction
Complex goals must be broken down into manageable subtasks for autonomous agents to execute effectively. Goal decomposition is how agents translate high-level objectives into concrete, actionable steps.
Section Overview: We'll explore decomposition strategies, task tree structures, dynamic replanning, and priority management for autonomous agents.
Decomposition Strategies
Top-Down Decomposition
🐍python
1from langchain_openai import ChatOpenAI
2from langchain_core.messages import SystemMessage, HumanMessage
3from pydantic import BaseModel, Field
4from typing import List, Optional
5import json
6
7
class Task(BaseModel):
    """One node of a top-down decomposition tree.

    ``subtasks`` nests child ``Task`` objects; ``dependencies`` stores
    whatever strings the decomposer assigns to it.
    """

    id: str
    description: str
    # Pydantic copies these list defaults per instance, so a literal [] is safe.
    subtasks: list["Task"] = []
    dependencies: list[str] = []
    status: str = "pending"
    priority: int = 1
16
17
class TopDownDecomposer:
    """Decompose a goal into a ``Task`` tree using a top-down strategy.

    Starting from the goal, the LLM is asked whether each task is atomic.
    If it is not, the LLM proposes 2-5 subtasks, and each of those is
    decomposed in turn until it is atomic or ``max_depth`` is reached.
    """

    def __init__(self, max_depth: int = 3):
        """Create the decomposer.

        Args:
            max_depth: Depth at which recursion stops. The root is depth 0,
                so tasks at this depth are never decomposed further.
        """
        self.llm = ChatOpenAI(model="gpt-4o", temperature=0.2)
        self.max_depth = max_depth

    def decompose(self, goal: str) -> Task:
        """Decompose *goal* and return the root ``Task`` (id ``"root"``)."""
        root = Task(id="root", description=goal, priority=10)
        self._decompose_recursive(root, depth=0)
        return root

    def _decompose_recursive(self, task: Task, depth: int):
        """Append subtasks to ``task.subtasks`` in place and recurse into them."""
        if depth >= self.max_depth:
            return

        # Atomic tasks are leaves: no further decomposition.
        if self._is_atomic(task):
            return

        subtasks = self._generate_subtasks(task)
        for i, subtask in enumerate(subtasks):
            child = Task(
                id=f"{task.id}_{i}",
                description=subtask["description"],
                # NOTE(review): these are the strings the LLM returned (the
                # prompt example uses sibling descriptions), not task ids.
                dependencies=subtask.get("dependencies", []),
                priority=len(subtasks) - i,  # Higher priority for earlier tasks
            )
            task.subtasks.append(child)
            self._decompose_recursive(child, depth + 1)

    def _is_atomic(self, task: Task) -> bool:
        """Ask the LLM whether *task* can be done in a single action.

        Returns True only when the reply starts with "yes". Case,
        surrounding whitespace and leading markdown/quote characters
        are ignored.
        """
        prompt = f"""
Is this task atomic (can be done in a single action)?
Task: {task.description}

Answer 'yes' or 'no'.
"""
        messages = [
            SystemMessage(content="Determine if a task is atomic."),
            HumanMessage(content=prompt),
        ]

        response = self.llm.invoke(messages)
        # A substring test ("yes" in reply) also matched replies such as
        # "No, ... yes" or words like "eyes"; only accept a leading "yes".
        answer = response.content.strip().lstrip("*_`\"' ").lower()
        return answer.startswith("yes")

    def _generate_subtasks(self, task: Task) -> List[dict]:
        """Ask the LLM to split *task* into 2-5 subtasks.

        Returns:
            A list of dicts, each with a string ``"description"`` and a
            list-of-strings ``"dependencies"``. Returns ``[]`` when the reply
            is not a JSON list. Malformed items are dropped.
        """
        prompt = f"""
Break down this task into 2-5 subtasks:
Task: {task.description}

Output as JSON list:
[
  {{"description": "subtask 1", "dependencies": []}},
  {{"description": "subtask 2", "dependencies": ["subtask 1"]}},
  ...
]
"""
        messages = [
            SystemMessage(content="Decompose tasks into subtasks."),
            HumanMessage(content=prompt),
        ]

        response = self.llm.invoke(messages)
        parsed = self._parse_json(response.content)
        if not isinstance(parsed, list):
            return []

        # Normalise items so callers can index ["description"] safely.
        subtasks = []
        for item in parsed:
            if isinstance(item, str):
                item = {"description": item}
            if not isinstance(item, dict) or not item.get("description"):
                continue
            deps = item.get("dependencies")
            subtasks.append({
                "description": str(item["description"]),
                "dependencies": (
                    [str(d) for d in deps] if isinstance(deps, list) else []
                ),
            })
        return subtasks

    @staticmethod
    def _parse_json(content: str):
        """Parse an LLM reply as JSON, tolerating a surrounding ``` fence.

        Returns the decoded value, or None if the text is not valid JSON.
        """
        text = content.strip()
        if text.startswith("```"):
            # Drop the opening fence line (``` or ```json) and the closing one.
            text = text.partition("\n")[2].strip()
            if text.endswith("```"):
                text = text[:-3]
        try:
            return json.loads(text)
        except json.JSONDecodeError:
            return None
97
98
99# Usage
100decomposer = TopDownDecomposer(max_depth=3)
101task_tree = decomposer.decompose("Build a market research report on AI agents")
102print(json.dumps(task_tree.model_dump(), indent=2))Bottom-Up Decomposition
🐍python
1class BottomUpDecomposer:
2 """Decomposes by identifying atomic actions first."""
3
4 def __init__(self):
5 self.llm = ChatOpenAI(model="gpt-4o", temperature=0.2)
6
7 def decompose(self, goal: str) -> List[Task]:
8 """Decompose goal by identifying atomic actions."""
9
10 # Step 1: Identify all atomic actions needed
11 atomic_actions = self._identify_atomic_actions(goal)
12
13 # Step 2: Group related actions
14 groups = self._group_actions(atomic_actions)
15
16 # Step 3: Build task hierarchy
17 tasks = self._build_hierarchy(groups, goal)
18
19 return tasks
20
21 def _identify_atomic_actions(self, goal: str) -> List[str]:
22 """Identify all atomic actions needed for the goal."""
23 prompt = f"""
24List all the atomic (single-step) actions needed to achieve this goal:
25Goal: {goal}
26
27Output as JSON list of action descriptions.
28"""
29 messages = [
30 SystemMessage(content="Identify atomic actions for a goal."),
31 HumanMessage(content=prompt)
32 ]
33
34 response = self.llm.invoke(messages)
35 try:
36 return json.loads(response.content)
37 except json.JSONDecodeError:
38 return []
39
40 def _group_actions(self, actions: List[str]) -> dict:
41 """Group related actions together."""
42 prompt = f"""
43Group these actions into logical categories:
44Actions: {json.dumps(actions)}
45
46Output as JSON object with category names as keys and action lists as values.
47"""
48 messages = [
49 SystemMessage(content="Group related actions."),
50 HumanMessage(content=prompt)
51 ]
52
53 response = self.llm.invoke(messages)
54 try:
55 return json.loads(response.content)
56 except json.JSONDecodeError:
57 return {"default": actions}
58
59 def _build_hierarchy(self, groups: dict, goal: str) -> List[Task]:
60 """Build task hierarchy from groups."""
61 tasks = []
62 task_id = 0
63
64 for category, actions in groups.items():
65 parent = Task(
66 id=f"task_{task_id}",
67 description=category,
68 priority=len(groups) - len(tasks)
69 )
70 task_id += 1
71
72 for action in actions:
73 child = Task(
74 id=f"task_{task_id}",
75 description=action,
76 priority=len(actions) - len(parent.subtasks)
77 )
78 parent.subtasks.append(child)
79 task_id += 1
80
81 tasks.append(parent)
82
83 return tasksTask Trees
Task Tree Structure
🐍python
1from dataclasses import dataclass, field
2from typing import List, Optional, Callable
3from enum import Enum
4
5
class TaskStatus(Enum):
    """Lifecycle state of a ``TaskNode``; each value is the lowercase name."""

    PENDING = "pending"          # not yet done; also the state a retried task returns to
    IN_PROGRESS = "in_progress"  # not assigned anywhere in this snippet
    COMPLETED = "completed"      # finished; parents are completed via their children
    FAILED = "failed"            # set once a task runs out of attempts
    BLOCKED = "blocked"          # not assigned anywhere in this snippet
12
13
@dataclass
class TaskNode:
    """A single task stored in a ``TaskTree``.

    ``children`` holds the child nodes themselves. ``parent_id`` and
    ``dependencies`` refer to other nodes by their ``id``.
    """

    id: str
    description: str
    parent_id: Optional[str] = None  # None for the tree root
    children: list["TaskNode"] = field(default_factory=list)
    dependencies: list[str] = field(default_factory=list)  # ids of prerequisite nodes
    status: TaskStatus = TaskStatus.PENDING
    priority: int = 1  # larger numbers are scheduled first by TaskTree
    result: Optional[str] = None
    attempts: int = 0  # failures recorded so far
    max_attempts: int = 3
27
28
29class TaskTree:
30 """Hierarchical task management."""
31
32 def __init__(self, root_goal: str):
33 self.root = TaskNode(
34 id="root",
35 description=root_goal,
36 priority=10
37 )
38 self.nodes: dict[str, TaskNode] = {"root": self.root}
39
40 def add_subtask(
41 self,
42 parent_id: str,
43 description: str,
44 dependencies: List[str] = None
45 ) -> TaskNode:
46 """Add a subtask to a parent."""
47 parent = self.nodes.get(parent_id)
48 if not parent:
49 raise ValueError(f"Parent {parent_id} not found")
50
51 task_id = f"{parent_id}_{len(parent.children)}"
52 task = TaskNode(
53 id=task_id,
54 description=description,
55 parent_id=parent_id,
56 dependencies=dependencies or [],
57 priority=len(parent.children) + 1
58 )
59
60 parent.children.append(task)
61 self.nodes[task_id] = task
62 return task
63
64 def get_next_task(self) -> Optional[TaskNode]:
65 """Get the next executable task."""
66 candidates = []
67
68 for node in self.nodes.values():
69 if node.status != TaskStatus.PENDING:
70 continue
71
72 # Check if dependencies are met
73 if self._dependencies_met(node):
74 candidates.append(node)
75
76 if not candidates:
77 return None
78
79 # Return highest priority task
80 return max(candidates, key=lambda t: t.priority)
81
82 def _dependencies_met(self, task: TaskNode) -> bool:
83 """Check if all dependencies are completed."""
84 for dep_id in task.dependencies:
85 dep = self.nodes.get(dep_id)
86 if not dep or dep.status != TaskStatus.COMPLETED:
87 return False
88 return True
89
90 def complete_task(self, task_id: str, result: str):
91 """Mark a task as complete."""
92 task = self.nodes.get(task_id)
93 if task:
94 task.status = TaskStatus.COMPLETED
95 task.result = result
96
97 # Check if parent can be completed
98 parent = self.nodes.get(task.parent_id) if task.parent_id else None
99 if parent and all(
100 c.status == TaskStatus.COMPLETED for c in parent.children
101 ):
102 parent.status = TaskStatus.COMPLETED
103
104 def fail_task(self, task_id: str, error: str):
105 """Mark a task as failed."""
106 task = self.nodes.get(task_id)
107 if task:
108 task.attempts += 1
109 if task.attempts >= task.max_attempts:
110 task.status = TaskStatus.FAILED
111 task.result = f"Failed: {error}"
112 else:
113 task.status = TaskStatus.PENDING # Can retry
114
115 def get_progress(self) -> dict:
116 """Get overall progress."""
117 total = len(self.nodes)
118 completed = sum(
119 1 for n in self.nodes.values()
120 if n.status == TaskStatus.COMPLETED
121 )
122 failed = sum(
123 1 for n in self.nodes.values()
124 if n.status == TaskStatus.FAILED
125 )
126
127 return {
128 "total": total,
129 "completed": completed,
130 "failed": failed,
131 "progress": completed / total if total > 0 else 0
132 }Dynamic Replanning
Adaptive Task Management
🐍python
1class DynamicPlanner:
2 """Dynamically adjust plans based on execution results."""
3
4 def __init__(self, task_tree: TaskTree):
5 self.tree = task_tree
6 self.llm = ChatOpenAI(model="gpt-4o", temperature=0.2)
7
8 def replan_on_failure(self, failed_task: TaskNode, error: str):
9 """Generate alternative approach after failure."""
10 prompt = f"""
11A task has failed:
12Task: {failed_task.description}
13Error: {error}
14Attempts: {failed_task.attempts}/{failed_task.max_attempts}
15
16Overall goal: {self.tree.root.description}
17
18Generate an alternative approach. Output as JSON:
19{{
20 "should_retry": true/false,
21 "modified_approach": "new approach description",
22 "alternative_subtasks": ["task1", "task2"]
23}}
24"""
25 messages = [
26 SystemMessage(content="Replan after task failure."),
27 HumanMessage(content=prompt)
28 ]
29
30 response = self.llm.invoke(messages)
31 plan = json.loads(response.content)
32
33 if plan["should_retry"]:
34 # Modify the failed task
35 failed_task.description = plan["modified_approach"]
36 failed_task.status = TaskStatus.PENDING
37 else:
38 # Add alternative subtasks
39 for subtask_desc in plan.get("alternative_subtasks", []):
40 self.tree.add_subtask(
41 failed_task.parent_id or "root",
42 subtask_desc
43 )
44 failed_task.status = TaskStatus.FAILED
45
46 def adapt_to_discovery(self, discovery: str, current_task: TaskNode):
47 """Adapt plan based on new information."""
48 prompt = f"""
49New information discovered during execution:
50Discovery: {discovery}
51
52Current task: {current_task.description}
53Overall goal: {self.tree.root.description}
54
55Should the plan be adjusted? Output as JSON:
56{{
57 "needs_adjustment": true/false,
58 "new_tasks": ["description1", "description2"],
59 "tasks_to_skip": ["task_id1"],
60 "priority_changes": {{"task_id": new_priority}}
61}}
62"""
63 messages = [
64 SystemMessage(content="Adapt plan to new information."),
65 HumanMessage(content=prompt)
66 ]
67
68 response = self.llm.invoke(messages)
69 adjustment = json.loads(response.content)
70
71 if adjustment["needs_adjustment"]:
72 # Add new tasks
73 for task_desc in adjustment.get("new_tasks", []):
74 self.tree.add_subtask(
75 current_task.parent_id or "root",
76 task_desc
77 )
78
79 # Update priorities
80 for task_id, priority in adjustment.get("priority_changes", {}).items():
81 if task_id in self.tree.nodes:
82 self.tree.nodes[task_id].priority = priority
83
84 def optimize_remaining_plan(self):
85 """Optimize the remaining tasks based on progress."""
86 pending_tasks = [
87 n for n in self.tree.nodes.values()
88 if n.status == TaskStatus.PENDING
89 ]
90
91 completed_results = [
92 n.result for n in self.tree.nodes.values()
93 if n.status == TaskStatus.COMPLETED and n.result
94 ]
95
96 prompt = f"""
97Optimize the remaining tasks based on what's been learned.
98
99Remaining tasks:
100{json.dumps([t.description for t in pending_tasks])}
101
102Results so far:
103{json.dumps(completed_results[:5])}
104
105Output optimizations as JSON:
106{{
107 "can_skip": ["task descriptions that are no longer needed"],
108 "can_merge": [["task1", "task2"]],
109 "new_order": ["task1", "task2", "task3"]
110}}
111"""
112 messages = [
113 SystemMessage(content="Optimize remaining plan."),
114 HumanMessage(content=prompt)
115 ]
116
117 response = self.llm.invoke(messages)
118 return json.loads(response.content)Priority Management
Priority Queue System
🐍python
1import heapq
2from dataclasses import dataclass
3from typing import Any
4
5
@dataclass(order=True)
class PrioritizedTask:
    """Heap entry pairing a sort key with an arbitrary task payload.

    Ordering and equality look only at ``priority``; ``task`` is excluded
    from comparison, so the payload never needs to be orderable. Entries
    with equal priority compare equal. ``heapq`` pops the smallest
    ``priority`` first.
    """

    priority: int  # smaller sorts first (heapq is a min-heap)
    task: Any = field(compare=False)  # payload, ignored by ==, <, etc.
11
12
class TaskPriorityQueue:
    """Max-priority queue of ``TaskNode`` objects that supports re-prioritising.

    The queue is built on ``heapq`` with lazy deletion. Changing a task's
    priority pushes a new entry and leaves the old one in the heap.
    ``task_map`` holds the single live entry per task id, and any heap
    entry that is not that live entry is discarded when it reaches the top.
    """

    def __init__(self):
        self.heap: List[PrioritizedTask] = []
        # task id -> the current (live) heap entry for that task
        self.task_map: dict[str, PrioritizedTask] = {}

    def add(self, task: TaskNode):
        """Push *task*; re-adding an id supersedes its previous entry."""
        # Negate priority for max-heap behavior (heapq is a min-heap)
        entry = PrioritizedTask(-task.priority, task)
        heapq.heappush(self.heap, entry)
        self.task_map[task.id] = entry

    def _discard_stale(self):
        """Pop superseded or already-removed entries off the top of the heap."""
        # Compare by identity: an old entry for the same task id is stale
        # even though its id is still present in task_map.
        while self.heap and self.task_map.get(self.heap[0].task.id) is not self.heap[0]:
            heapq.heappop(self.heap)

    def pop(self) -> Optional[TaskNode]:
        """Remove and return the highest-priority task, or None if empty."""
        self._discard_stale()
        if not self.heap:
            return None
        entry = heapq.heappop(self.heap)
        del self.task_map[entry.task.id]
        return entry.task

    def update_priority(self, task_id: str, new_priority: int):
        """Set the priority of a queued task; unknown ids are ignored."""
        entry = self.task_map.get(task_id)
        if entry is None:
            return
        task = entry.task
        task.priority = new_priority
        # The old entry stays in the heap and is skipped once it is stale.
        self.add(task)

    def peek(self) -> Optional[TaskNode]:
        """Return the highest-priority task without removing it, or None."""
        self._discard_stale()
        return self.heap[0].task if self.heap else None
50
51
52class PriorityManager:
53 """Manage task priorities dynamically."""
54
55 def __init__(self):
56 self.llm = ChatOpenAI(model="gpt-4o", temperature=0)
57
58 def calculate_priority(
59 self,
60 task: TaskNode,
61 context: dict
62 ) -> int:
63 """Calculate priority based on multiple factors."""
64
65 factors = {
66 "goal_alignment": self._score_goal_alignment(task, context),
67 "dependency_urgency": self._score_dependency_urgency(task, context),
68 "effort_estimate": self._score_effort(task),
69 "risk_level": self._score_risk(task)
70 }
71
72 # Weighted combination
73 weights = {
74 "goal_alignment": 0.4,
75 "dependency_urgency": 0.3,
76 "effort_estimate": 0.2,
77 "risk_level": 0.1
78 }
79
80 score = sum(
81 factors[k] * weights[k]
82 for k in factors
83 )
84
85 return int(score * 10) # Scale to 0-10
86
87 def _score_goal_alignment(self, task: TaskNode, context: dict) -> float:
88 """Score how well task aligns with main goal."""
89 # Could use LLM for semantic similarity
90 return 0.8 # Placeholder
91
92 def _score_dependency_urgency(self, task: TaskNode, context: dict) -> float:
93 """Score based on how many tasks depend on this one."""
94 dependents = context.get("dependents", {}).get(task.id, [])
95 return min(len(dependents) / 5.0, 1.0)
96
97 def _score_effort(self, task: TaskNode) -> float:
98 """Score based on estimated effort (lower effort = higher score)."""
99 return 0.6 # Placeholder
100
101 def _score_risk(self, task: TaskNode) -> float:
102 """Score based on risk (lower risk = higher score)."""
103 if task.attempts > 0:
104 return 0.4 # Previously failed, higher risk
105 return 0.8Key Takeaways
- Top-down decomposition breaks goals into subtasks recursively until each task is atomic or a maximum depth is reached.
- Bottom-up decomposition identifies atomic actions first and groups them into a hierarchy.
- Task trees track dependencies between tasks, select the next executable task, and propagate completion from subtasks to their parents.
- Dynamic replanning adapts to failures and new discoveries during execution.
- Priority management ensures the most important tasks are executed first.
Next Section Preview: We'll explore iterative refinement - how agents improve their outputs through self-critique and revision.