Chapter 17
15 min read
Section 106 of 175

Goal Decomposition

AutoGPT and Autonomous Agents

Introduction

Complex goals must be broken down into manageable subtasks for autonomous agents to execute effectively. Goal decomposition is how agents translate high-level objectives into concrete, actionable steps.

Section Overview: We'll explore decomposition strategies, task tree structures, dynamic replanning, and priority management for autonomous agents.

Decomposition Strategies

Top-Down Decomposition

🐍python
1from langchain_openai import ChatOpenAI
2from langchain_core.messages import SystemMessage, HumanMessage
3from pydantic import BaseModel, Field
4from typing import List, Optional
5import json
6
7
8class Task(BaseModel):
9    """A task in the decomposition tree."""
10    id: str
11    description: str
12    subtasks: List["Task"] = []
13    dependencies: List[str] = []
14    status: str = "pending"
15    priority: int = 1
16
17
18class TopDownDecomposer:
19    """Decomposes goals using top-down strategy."""
20
21    def __init__(self, max_depth: int = 3):
22        self.llm = ChatOpenAI(model="gpt-4o", temperature=0.2)
23        self.max_depth = max_depth
24
25    def decompose(self, goal: str) -> Task:
26        """Decompose a goal into a task tree."""
27        root = Task(
28            id="root",
29            description=goal,
30            priority=10
31        )
32
33        self._decompose_recursive(root, depth=0)
34        return root
35
36    def _decompose_recursive(self, task: Task, depth: int):
37        """Recursively decompose a task."""
38        if depth >= self.max_depth:
39            return
40
41        # Check if task is already atomic
42        if self._is_atomic(task):
43            return
44
45        # Decompose into subtasks
46        subtasks = self._generate_subtasks(task)
47
48        for i, subtask in enumerate(subtasks):
49            child = Task(
50                id=f"{task.id}_{i}",
51                description=subtask["description"],
52                dependencies=subtask.get("dependencies", []),
53                priority=len(subtasks) - i  # Higher priority for earlier tasks
54            )
55            task.subtasks.append(child)
56            self._decompose_recursive(child, depth + 1)
57
58    def _is_atomic(self, task: Task) -> bool:
59        """Check if task is atomic (can't be further decomposed)."""
60        prompt = f"""
61Is this task atomic (can be done in a single action)?
62Task: {task.description}
63
64Answer 'yes' or 'no'.
65"""
66        messages = [
67            SystemMessage(content="Determine if a task is atomic."),
68            HumanMessage(content=prompt)
69        ]
70
71        response = self.llm.invoke(messages)
72        return "yes" in response.content.lower()
73
74    def _generate_subtasks(self, task: Task) -> List[dict]:
75        """Generate subtasks for a task."""
76        prompt = f"""
77Break down this task into 2-5 subtasks:
78Task: {task.description}
79
80Output as JSON list:
81[
82    {{"description": "subtask 1", "dependencies": []}},
83    {{"description": "subtask 2", "dependencies": ["subtask 1"]}},
84    ...
85]
86"""
87        messages = [
88            SystemMessage(content="Decompose tasks into subtasks."),
89            HumanMessage(content=prompt)
90        ]
91
92        response = self.llm.invoke(messages)
93        try:
94            return json.loads(response.content)
95        except json.JSONDecodeError:
96            return []
97
98
99# Usage
100decomposer = TopDownDecomposer(max_depth=3)
101task_tree = decomposer.decompose("Build a market research report on AI agents")
102print(json.dumps(task_tree.model_dump(), indent=2))

Bottom-Up Decomposition

🐍python
1class BottomUpDecomposer:
2    """Decomposes by identifying atomic actions first."""
3
4    def __init__(self):
5        self.llm = ChatOpenAI(model="gpt-4o", temperature=0.2)
6
7    def decompose(self, goal: str) -> List[Task]:
8        """Decompose goal by identifying atomic actions."""
9
10        # Step 1: Identify all atomic actions needed
11        atomic_actions = self._identify_atomic_actions(goal)
12
13        # Step 2: Group related actions
14        groups = self._group_actions(atomic_actions)
15
16        # Step 3: Build task hierarchy
17        tasks = self._build_hierarchy(groups, goal)
18
19        return tasks
20
21    def _identify_atomic_actions(self, goal: str) -> List[str]:
22        """Identify all atomic actions needed for the goal."""
23        prompt = f"""
24List all the atomic (single-step) actions needed to achieve this goal:
25Goal: {goal}
26
27Output as JSON list of action descriptions.
28"""
29        messages = [
30            SystemMessage(content="Identify atomic actions for a goal."),
31            HumanMessage(content=prompt)
32        ]
33
34        response = self.llm.invoke(messages)
35        try:
36            return json.loads(response.content)
37        except json.JSONDecodeError:
38            return []
39
40    def _group_actions(self, actions: List[str]) -> dict:
41        """Group related actions together."""
42        prompt = f"""
43Group these actions into logical categories:
44Actions: {json.dumps(actions)}
45
46Output as JSON object with category names as keys and action lists as values.
47"""
48        messages = [
49            SystemMessage(content="Group related actions."),
50            HumanMessage(content=prompt)
51        ]
52
53        response = self.llm.invoke(messages)
54        try:
55            return json.loads(response.content)
56        except json.JSONDecodeError:
57            return {"default": actions}
58
59    def _build_hierarchy(self, groups: dict, goal: str) -> List[Task]:
60        """Build task hierarchy from groups."""
61        tasks = []
62        task_id = 0
63
64        for category, actions in groups.items():
65            parent = Task(
66                id=f"task_{task_id}",
67                description=category,
68                priority=len(groups) - len(tasks)
69            )
70            task_id += 1
71
72            for action in actions:
73                child = Task(
74                    id=f"task_{task_id}",
75                    description=action,
76                    priority=len(actions) - len(parent.subtasks)
77                )
78                parent.subtasks.append(child)
79                task_id += 1
80
81            tasks.append(parent)
82
83        return tasks

Task Trees

Task Tree Structure

🐍python
1from dataclasses import dataclass, field
2from typing import List, Optional, Callable
3from enum import Enum
4
5
6class TaskStatus(Enum):
7    PENDING = "pending"
8    IN_PROGRESS = "in_progress"
9    COMPLETED = "completed"
10    FAILED = "failed"
11    BLOCKED = "blocked"
12
13
14@dataclass
15class TaskNode:
16    """Node in a task tree."""
17    id: str
18    description: str
19    parent_id: Optional[str] = None
20    children: List["TaskNode"] = field(default_factory=list)
21    dependencies: List[str] = field(default_factory=list)
22    status: TaskStatus = TaskStatus.PENDING
23    priority: int = 1
24    result: Optional[str] = None
25    attempts: int = 0
26    max_attempts: int = 3
27
28
29class TaskTree:
30    """Hierarchical task management."""
31
32    def __init__(self, root_goal: str):
33        self.root = TaskNode(
34            id="root",
35            description=root_goal,
36            priority=10
37        )
38        self.nodes: dict[str, TaskNode] = {"root": self.root}
39
40    def add_subtask(
41        self,
42        parent_id: str,
43        description: str,
44        dependencies: List[str] = None
45    ) -> TaskNode:
46        """Add a subtask to a parent."""
47        parent = self.nodes.get(parent_id)
48        if not parent:
49            raise ValueError(f"Parent {parent_id} not found")
50
51        task_id = f"{parent_id}_{len(parent.children)}"
52        task = TaskNode(
53            id=task_id,
54            description=description,
55            parent_id=parent_id,
56            dependencies=dependencies or [],
57            priority=len(parent.children) + 1
58        )
59
60        parent.children.append(task)
61        self.nodes[task_id] = task
62        return task
63
64    def get_next_task(self) -> Optional[TaskNode]:
65        """Get the next executable task."""
66        candidates = []
67
68        for node in self.nodes.values():
69            if node.status != TaskStatus.PENDING:
70                continue
71
72            # Check if dependencies are met
73            if self._dependencies_met(node):
74                candidates.append(node)
75
76        if not candidates:
77            return None
78
79        # Return highest priority task
80        return max(candidates, key=lambda t: t.priority)
81
82    def _dependencies_met(self, task: TaskNode) -> bool:
83        """Check if all dependencies are completed."""
84        for dep_id in task.dependencies:
85            dep = self.nodes.get(dep_id)
86            if not dep or dep.status != TaskStatus.COMPLETED:
87                return False
88        return True
89
90    def complete_task(self, task_id: str, result: str):
91        """Mark a task as complete."""
92        task = self.nodes.get(task_id)
93        if task:
94            task.status = TaskStatus.COMPLETED
95            task.result = result
96
97            # Check if parent can be completed
98            parent = self.nodes.get(task.parent_id) if task.parent_id else None
99            if parent and all(
100                c.status == TaskStatus.COMPLETED for c in parent.children
101            ):
102                parent.status = TaskStatus.COMPLETED
103
104    def fail_task(self, task_id: str, error: str):
105        """Mark a task as failed."""
106        task = self.nodes.get(task_id)
107        if task:
108            task.attempts += 1
109            if task.attempts >= task.max_attempts:
110                task.status = TaskStatus.FAILED
111                task.result = f"Failed: {error}"
112            else:
113                task.status = TaskStatus.PENDING  # Can retry
114
115    def get_progress(self) -> dict:
116        """Get overall progress."""
117        total = len(self.nodes)
118        completed = sum(
119            1 for n in self.nodes.values()
120            if n.status == TaskStatus.COMPLETED
121        )
122        failed = sum(
123            1 for n in self.nodes.values()
124            if n.status == TaskStatus.FAILED
125        )
126
127        return {
128            "total": total,
129            "completed": completed,
130            "failed": failed,
131            "progress": completed / total if total > 0 else 0
132        }

Dynamic Replanning

Adaptive Task Management

🐍python
1class DynamicPlanner:
2    """Dynamically adjust plans based on execution results."""
3
4    def __init__(self, task_tree: TaskTree):
5        self.tree = task_tree
6        self.llm = ChatOpenAI(model="gpt-4o", temperature=0.2)
7
8    def replan_on_failure(self, failed_task: TaskNode, error: str):
9        """Generate alternative approach after failure."""
10        prompt = f"""
11A task has failed:
12Task: {failed_task.description}
13Error: {error}
14Attempts: {failed_task.attempts}/{failed_task.max_attempts}
15
16Overall goal: {self.tree.root.description}
17
18Generate an alternative approach. Output as JSON:
19{{
20    "should_retry": true/false,
21    "modified_approach": "new approach description",
22    "alternative_subtasks": ["task1", "task2"]
23}}
24"""
25        messages = [
26            SystemMessage(content="Replan after task failure."),
27            HumanMessage(content=prompt)
28        ]
29
30        response = self.llm.invoke(messages)
31        plan = json.loads(response.content)
32
33        if plan["should_retry"]:
34            # Modify the failed task
35            failed_task.description = plan["modified_approach"]
36            failed_task.status = TaskStatus.PENDING
37        else:
38            # Add alternative subtasks
39            for subtask_desc in plan.get("alternative_subtasks", []):
40                self.tree.add_subtask(
41                    failed_task.parent_id or "root",
42                    subtask_desc
43                )
44            failed_task.status = TaskStatus.FAILED
45
46    def adapt_to_discovery(self, discovery: str, current_task: TaskNode):
47        """Adapt plan based on new information."""
48        prompt = f"""
49New information discovered during execution:
50Discovery: {discovery}
51
52Current task: {current_task.description}
53Overall goal: {self.tree.root.description}
54
55Should the plan be adjusted? Output as JSON:
56{{
57    "needs_adjustment": true/false,
58    "new_tasks": ["description1", "description2"],
59    "tasks_to_skip": ["task_id1"],
60    "priority_changes": {{"task_id": new_priority}}
61}}
62"""
63        messages = [
64            SystemMessage(content="Adapt plan to new information."),
65            HumanMessage(content=prompt)
66        ]
67
68        response = self.llm.invoke(messages)
69        adjustment = json.loads(response.content)
70
71        if adjustment["needs_adjustment"]:
72            # Add new tasks
73            for task_desc in adjustment.get("new_tasks", []):
74                self.tree.add_subtask(
75                    current_task.parent_id or "root",
76                    task_desc
77                )
78
79            # Update priorities
80            for task_id, priority in adjustment.get("priority_changes", {}).items():
81                if task_id in self.tree.nodes:
82                    self.tree.nodes[task_id].priority = priority
83
84    def optimize_remaining_plan(self):
85        """Optimize the remaining tasks based on progress."""
86        pending_tasks = [
87            n for n in self.tree.nodes.values()
88            if n.status == TaskStatus.PENDING
89        ]
90
91        completed_results = [
92            n.result for n in self.tree.nodes.values()
93            if n.status == TaskStatus.COMPLETED and n.result
94        ]
95
96        prompt = f"""
97Optimize the remaining tasks based on what's been learned.
98
99Remaining tasks:
100{json.dumps([t.description for t in pending_tasks])}
101
102Results so far:
103{json.dumps(completed_results[:5])}
104
105Output optimizations as JSON:
106{{
107    "can_skip": ["task descriptions that are no longer needed"],
108    "can_merge": [["task1", "task2"]],
109    "new_order": ["task1", "task2", "task3"]
110}}
111"""
112        messages = [
113            SystemMessage(content="Optimize remaining plan."),
114            HumanMessage(content=prompt)
115        ]
116
117        response = self.llm.invoke(messages)
118        return json.loads(response.content)

Priority Management

Priority Queue System

🐍python
1import heapq
2from dataclasses import dataclass
3from typing import Any
4
5
6@dataclass(order=True)
7class PrioritizedTask:
8    """Task with priority for queue management."""
9    priority: int  # Lower number = higher priority
10    task: Any = field(compare=False)
11
12
13class TaskPriorityQueue:
14    """Priority queue for task management."""
15
16    def __init__(self):
17        self.heap: List[PrioritizedTask] = []
18        self.task_map: dict[str, PrioritizedTask] = {}
19
20    def add(self, task: TaskNode):
21        """Add task to queue."""
22        # Negate priority for max-heap behavior
23        entry = PrioritizedTask(-task.priority, task)
24        heapq.heappush(self.heap, entry)
25        self.task_map[task.id] = entry
26
27    def pop(self) -> Optional[TaskNode]:
28        """Get highest priority task."""
29        while self.heap:
30            entry = heapq.heappop(self.heap)
31            if entry.task.id in self.task_map:
32                del self.task_map[entry.task.id]
33                return entry.task
34        return None
35
36    def update_priority(self, task_id: str, new_priority: int):
37        """Update a task's priority."""
38        if task_id in self.task_map:
39            old_entry = self.task_map[task_id]
40            task = old_entry.task
41            task.priority = new_priority
42            del self.task_map[task_id]
43            self.add(task)
44
45    def peek(self) -> Optional[TaskNode]:
46        """See highest priority task without removing."""
47        if self.heap:
48            return self.heap[0].task
49        return None
50
51
52class PriorityManager:
53    """Manage task priorities dynamically."""
54
55    def __init__(self):
56        self.llm = ChatOpenAI(model="gpt-4o", temperature=0)
57
58    def calculate_priority(
59        self,
60        task: TaskNode,
61        context: dict
62    ) -> int:
63        """Calculate priority based on multiple factors."""
64
65        factors = {
66            "goal_alignment": self._score_goal_alignment(task, context),
67            "dependency_urgency": self._score_dependency_urgency(task, context),
68            "effort_estimate": self._score_effort(task),
69            "risk_level": self._score_risk(task)
70        }
71
72        # Weighted combination
73        weights = {
74            "goal_alignment": 0.4,
75            "dependency_urgency": 0.3,
76            "effort_estimate": 0.2,
77            "risk_level": 0.1
78        }
79
80        score = sum(
81            factors[k] * weights[k]
82            for k in factors
83        )
84
85        return int(score * 10)  # Scale to 0-10
86
87    def _score_goal_alignment(self, task: TaskNode, context: dict) -> float:
88        """Score how well task aligns with main goal."""
89        # Could use LLM for semantic similarity
90        return 0.8  # Placeholder
91
92    def _score_dependency_urgency(self, task: TaskNode, context: dict) -> float:
93        """Score based on how many tasks depend on this one."""
94        dependents = context.get("dependents", {}).get(task.id, [])
95        return min(len(dependents) / 5.0, 1.0)
96
97    def _score_effort(self, task: TaskNode) -> float:
98        """Score based on estimated effort (lower effort = higher score)."""
99        return 0.6  # Placeholder
100
101    def _score_risk(self, task: TaskNode) -> float:
102        """Score based on risk (lower risk = higher score)."""
103        if task.attempts > 0:
104            return 0.4  # Previously failed, higher risk
105        return 0.8

Key Takeaways

  • Top-down decomposition breaks goals into subtasks recursively until atomic actions are reached.
  • Bottom-up decomposition identifies atomic actions first and groups them into a hierarchy.
  • Task trees manage hierarchical dependencies and track completion status.
  • Dynamic replanning adapts to failures and new discoveries during execution.
  • Priority management ensures the most important tasks are executed first.
Next Section Preview: We'll explore iterative refinement - how agents improve their outputs through self-critique and revision.