Chapter 19
15 min read
Section 120 of 175

Performance Profiling

Observability and Debugging

Introduction

Agent performance directly impacts user experience and operational costs. Understanding where time and resources are spent enables targeted optimizations that can dramatically improve efficiency without sacrificing capability.

Section Overview: We'll cover timing analysis, token usage profiling, bottleneck detection, and optimization strategies for agent systems.

Timing Analysis

Where Time Goes

| Component           | Typical Range | Optimization Potential   |
|---------------------|---------------|--------------------------|
| LLM API call        | 500ms - 5s    | Model choice, streaming  |
| Tool execution      | 10ms - 30s    | Caching, parallelization |
| Memory retrieval    | 10ms - 500ms  | Index optimization       |
| Network I/O         | 50ms - 2s     | Connection pooling       |
| Prompt construction | 1ms - 50ms    | Template caching         |
🐍python
1"""
2Timing Analysis for Agent Performance
3
4Measure and analyze where time is spent.
5"""
6
7import time
8from contextlib import contextmanager
9from dataclasses import dataclass, field
10from datetime import datetime
11from typing import Generator
12
13
14@dataclass
15class TimingRecord:
16    """Record of a timed operation."""
17    name: str
18    start_time: float
19    end_time: float | None = None
20    parent: str | None = None
21    metadata: dict = field(default_factory=dict)
22
23    @property
24    def duration_ms(self) -> float:
25        if self.end_time:
26            return (self.end_time - self.start_time) * 1000
27        return 0
28
29
30class PerformanceProfiler:
31    """Profile agent performance."""
32
33    def __init__(self):
34        self.records: list[TimingRecord] = []
35        self.active_timers: dict[str, TimingRecord] = {}
36        self.current_parent: str | None = None
37
38    @contextmanager
39    def timer(self, name: str, **metadata) -> Generator[TimingRecord, None, None]:
40        """Time a block of code."""
41        record = TimingRecord(
42            name=name,
43            start_time=time.perf_counter(),
44            parent=self.current_parent,
45            metadata=metadata
46        )
47
48        old_parent = self.current_parent
49        self.current_parent = name
50        self.active_timers[name] = record
51
52        try:
53            yield record
54        finally:
55            record.end_time = time.perf_counter()
56            self.records.append(record)
57            del self.active_timers[name]
58            self.current_parent = old_parent
59
60    def get_summary(self) -> dict:
61        """Get timing summary by category."""
62        by_category: dict[str, list[float]] = {}
63
64        for record in self.records:
65            category = record.name.split(":")[0]
66            if category not in by_category:
67                by_category[category] = []
68            by_category[category].append(record.duration_ms)
69
70        summary = {}
71        for category, times in by_category.items():
72            summary[category] = {
73                "count": len(times),
74                "total_ms": sum(times),
75                "avg_ms": sum(times) / len(times) if times else 0,
76                "min_ms": min(times) if times else 0,
77                "max_ms": max(times) if times else 0,
78            }
79
80        return summary
81
82    def get_critical_path(self) -> list[TimingRecord]:
83        """Find the critical path (longest sequential chain)."""
84        # Build parent-child relationships
85        children: dict[str | None, list[TimingRecord]] = {}
86        for record in self.records:
87            parent = record.parent
88            if parent not in children:
89                children[parent] = []
90            children[parent].append(record)
91
92        def find_longest_path(parent: str | None) -> list[TimingRecord]:
93            if parent not in children:
94                return []
95
96            longest = []
97            for child in children[parent]:
98                path = [child] + find_longest_path(child.name)
99                if sum(r.duration_ms for r in path) > sum(r.duration_ms for r in longest):
100                    longest = path
101
102            return longest
103
104        return find_longest_path(None)
105
106    def get_waterfall(self) -> str:
107        """Generate ASCII waterfall chart."""
108        if not self.records:
109            return "No records"
110
111        min_start = min(r.start_time for r in self.records)
112        max_end = max(r.end_time or r.start_time for r in self.records)
113        total_duration = max_end - min_start
114
115        width = 50
116        lines = []
117
118        for record in sorted(self.records, key=lambda r: r.start_time):
119            start_pos = int(((record.start_time - min_start) / total_duration) * width)
120            duration_pos = int((record.duration_ms / (total_duration * 1000)) * width)
121            duration_pos = max(1, duration_pos)
122
123            bar = " " * start_pos + "=" * duration_pos
124            lines.append(f"{record.name:30s} {record.duration_ms:8.1f}ms |{bar}")
125
126        return "\n".join(lines)
127
128
# Usage
profiler = PerformanceProfiler()

async def profiled_agent_run(profiler: PerformanceProfiler, task: str):
    """Run an agent task with nested timers around every phase."""
    with profiler.timer("agent_run", task=task[:50]):

        # Pre-processing of the raw task text.
        with profiler.timer("input_processing"):
            processed = await process_input(task)

        # Planning LLM call ("llm" category in the summary).
        with profiler.timer("llm:planning"):
            plan = await generate_plan(processed)

        # Each plan step gets its own timer hierarchy.
        for step_index, plan_step in enumerate(plan):
            with profiler.timer(f"step_{step_index}:execution"):

                with profiler.timer(f"step_{step_index}:llm_call"):
                    action = await decide_action(plan_step)

                with profiler.timer(f"step_{step_index}:tool_call", tool=action.get("type")):
                    result = await execute_action(action)

        with profiler.timer("response_generation"):
            response = await generate_response()

    return response

Token Usage Profiling

Tracking Token Consumption

🐍python
1"""
2Token Usage Profiling
3
4Track and optimize token consumption.
5"""
6
7from dataclasses import dataclass, field
8
9
@dataclass
class TokenUsage:
    """Token usage for a single LLM operation."""
    operation: str
    prompt_tokens: int
    completion_tokens: int
    model: str
    timestamp: str
    metadata: dict = field(default_factory=dict)

    # Approximate USD rates per 1K tokens, keyed by model name.
    # Unannotated class attributes are not dataclass fields; hoisting the
    # table here avoids rebuilding it on every estimated_cost access.
    _COSTS_PER_1K = {
        "gpt-4": {"prompt": 0.03, "completion": 0.06},
        "gpt-4o": {"prompt": 0.005, "completion": 0.015},
        "gpt-3.5-turbo": {"prompt": 0.0015, "completion": 0.002},
        "claude-3-opus": {"prompt": 0.015, "completion": 0.075},
        "claude-3-sonnet": {"prompt": 0.003, "completion": 0.015},
    }
    # Fallback rate for models missing from the table.
    _DEFAULT_RATE = {"prompt": 0.01, "completion": 0.03}

    @property
    def total_tokens(self) -> int:
        """Prompt plus completion tokens."""
        return self.prompt_tokens + self.completion_tokens

    @property
    def estimated_cost(self) -> float:
        """Approximate USD cost; unknown models use a conservative default rate."""
        rates = self._COSTS_PER_1K.get(self.model, self._DEFAULT_RATE)
        return (
            (self.prompt_tokens / 1000) * rates["prompt"] +
            (self.completion_tokens / 1000) * rates["completion"]
        )
41
class TokenProfiler:
    """Profile token usage across agent operations.

    Collects TokenUsage records and provides budget checks, aggregate
    summaries, and heuristic optimization suggestions.
    """

    def __init__(self):
        self.usage_records: list[TokenUsage] = []
        self.budgets: dict[str, int] = {}  # category -> max total tokens

    def record(
        self,
        operation: str,
        prompt_tokens: int,
        completion_tokens: int,
        model: str,
        **metadata
    ):
        """Record token usage for a single operation."""
        # Local import keeps this listing self-contained.
        from datetime import datetime, timezone

        self.usage_records.append(TokenUsage(
            operation=operation,
            prompt_tokens=prompt_tokens,
            completion_tokens=completion_tokens,
            model=model,
            # datetime.utcnow() is deprecated (Python 3.12+) and returns a
            # naive datetime; use an explicit UTC-aware timestamp instead.
            timestamp=datetime.now(timezone.utc).isoformat(),
            metadata=metadata
        ))

    def set_budget(self, category: str, max_tokens: int):
        """Set a token budget for a category (matched by operation prefix)."""
        self.budgets[category] = max_tokens

    def check_budget(self, category: str) -> tuple[bool, int]:
        """Check whether a category is within its budget.

        Returns:
            (within_budget, remaining_tokens). Categories with no budget
            configured always report (True, 0).
        """
        if category not in self.budgets:
            return True, 0

        used = sum(
            r.total_tokens for r in self.usage_records
            if r.operation.startswith(category)
        )
        budget = self.budgets[category]

        return used < budget, budget - used

    @staticmethod
    def _accumulate(bucket: dict, key: str, record) -> None:
        """Fold one record's counts and cost into bucket[key], creating it if needed."""
        stats = bucket.setdefault(key, {
            "count": 0,
            "prompt_tokens": 0,
            "completion_tokens": 0,
            "cost": 0
        })
        stats["count"] += 1
        stats["prompt_tokens"] += record.prompt_tokens
        stats["completion_tokens"] += record.completion_tokens
        stats["cost"] += record.estimated_cost

    def get_summary(self) -> dict:
        """Get a comprehensive usage summary grouped by operation and by model."""
        by_operation: dict[str, dict] = {}
        by_model: dict[str, dict] = {}

        for record in self.usage_records:
            self._accumulate(by_operation, record.operation, record)
            self._accumulate(by_model, record.model, record)

        return {
            "total_tokens": sum(r.total_tokens for r in self.usage_records),
            "total_cost": sum(r.estimated_cost for r in self.usage_records),
            "by_operation": by_operation,
            "by_model": by_model,
            "record_count": len(self.usage_records)
        }

    def get_optimization_suggestions(self) -> list[dict]:
        """Generate optimization suggestions based on usage patterns."""
        suggestions = []
        summary = self.get_summary()

        for op, stats in summary["by_operation"].items():
            # Flag operations costing more than 10 cents in total.
            if stats["cost"] > 0.10:
                suggestions.append({
                    "type": "expensive_operation",
                    "operation": op,
                    "cost": stats["cost"],
                    "suggestion": f"Consider caching or reducing frequency of '{op}'"
                })

            # Flag operations dominated (>80%) by prompt tokens.
            total = stats["prompt_tokens"] + stats["completion_tokens"]
            if total > 0 and stats["prompt_tokens"] / total > 0.8:
                suggestions.append({
                    "type": "prompt_heavy",
                    "operation": op,
                    "ratio": stats["prompt_tokens"] / total,
                    "suggestion": f"'{op}' uses {stats['prompt_tokens']/total:.0%} prompt tokens - consider summarizing context"
                })

        return suggestions

Bottleneck Detection

Finding Performance Bottlenecks

🐍python
1"""
2Bottleneck Detection
3
4Automatically identify performance bottlenecks.
5"""
6
7from dataclasses import dataclass
8from typing import Any
9
10
@dataclass
class Bottleneck:
    """Identified performance bottleneck.

    Produced by BottleneckDetector.analyze(); results are sorted so that
    higher-impact entries come first.
    """
    component: str  # timing category or token operation that triggered this
    impact: str  # high, medium, low
    metric: str  # measurement name, e.g. "avg_latency_ms" or "cost_usd"
    value: float  # observed value of the metric
    threshold: float  # limit the value was compared against
    suggestion: str  # human-readable remediation hint
20
21
class BottleneckDetector:
    """Detect performance bottlenecks in agent execution.

    Combines timing data from a PerformanceProfiler with token usage from
    a TokenProfiler and flags components exceeding configurable
    thresholds. Sibling types in signatures are written as forward
    references (strings) so the class is importable on its own.
    """

    def __init__(
        self,
        timing_profiler: "PerformanceProfiler",
        token_profiler: "TokenProfiler"
    ):
        self.timing = timing_profiler
        self.tokens = token_profiler

        # Tunable detection thresholds.
        self.llm_latency_threshold_ms = 3000
        self.tool_latency_threshold_ms = 1000
        self.token_cost_threshold = 0.05
        self.iteration_count_threshold = 10  # NOTE(review): currently unused by the analyses below
        self.call_count_threshold = 20  # named instead of the former magic 20

    def analyze(self) -> "list[Bottleneck]":
        """Run all analyses and return bottlenecks, highest impact first."""
        bottlenecks = []

        bottlenecks.extend(self._analyze_timing())
        bottlenecks.extend(self._analyze_tokens())
        bottlenecks.extend(self._analyze_patterns())

        return sorted(bottlenecks, key=lambda b: self._impact_order(b.impact))

    def _analyze_timing(self) -> "list[Bottleneck]":
        """Flag slow LLM calls, slow tools, and excessive call counts."""
        bottlenecks = []
        summary = self.timing.get_summary()

        for category, stats in summary.items():
            # Slow LLM calls
            if "llm" in category.lower() and stats["avg_ms"] > self.llm_latency_threshold_ms:
                bottlenecks.append(Bottleneck(
                    component=category,
                    impact="high",
                    metric="avg_latency_ms",
                    value=stats["avg_ms"],
                    threshold=self.llm_latency_threshold_ms,
                    suggestion="Consider using a faster model or implementing streaming"
                ))

            # Slow tools
            if "tool" in category.lower() and stats["avg_ms"] > self.tool_latency_threshold_ms:
                bottlenecks.append(Bottleneck(
                    component=category,
                    impact="medium",
                    metric="avg_latency_ms",
                    value=stats["avg_ms"],
                    threshold=self.tool_latency_threshold_ms,
                    suggestion="Consider caching tool results or parallelizing calls"
                ))

            # Too many calls
            if stats["count"] > self.call_count_threshold:
                bottlenecks.append(Bottleneck(
                    component=category,
                    impact="medium",
                    metric="call_count",
                    value=stats["count"],
                    threshold=self.call_count_threshold,
                    suggestion=f"High call count for {category} - consider batching"
                ))

        return bottlenecks

    def _analyze_tokens(self) -> "list[Bottleneck]":
        """Flag operations whose total token cost exceeds the cost threshold."""
        bottlenecks = []
        summary = self.tokens.get_summary()

        for op, stats in summary["by_operation"].items():
            if stats["cost"] > self.token_cost_threshold:
                bottlenecks.append(Bottleneck(
                    component=op,
                    impact="high",
                    metric="cost_usd",
                    value=stats["cost"],
                    threshold=self.token_cost_threshold,
                    suggestion="High token cost - consider prompt compression or cheaper model"
                ))

        return bottlenecks

    def _analyze_patterns(self) -> "list[Bottleneck]":
        """Flag runs of sequential sibling operations that could be parallelized."""
        bottlenecks = []

        sequential_groups = self._find_sequential_groups(self.timing.records)

        for group in sequential_groups:
            if len(group) >= 3:
                total_time = sum(r.duration_ms for r in group)
                bottlenecks.append(Bottleneck(
                    component="sequential_execution",
                    impact="medium",
                    metric="parallelizable_time_ms",
                    value=total_time,
                    threshold=0,
                    suggestion=f"{len(group)} operations could potentially run in parallel"
                ))

        return bottlenecks

    def _find_sequential_groups(self, records: list) -> list[list]:
        """Find groups of sibling operations that ran back-to-back.

        Simplified heuristic: consecutive records (ordered by start time)
        that share the same parent form a group. Production code would do
        proper dependency analysis.
        """
        groups: list[list] = []
        current_group: list = []

        for record in sorted(records, key=lambda r: r.start_time):
            if not current_group or record.parent == current_group[-1].parent:
                current_group.append(record)
            else:
                if len(current_group) > 1:
                    groups.append(current_group)
                current_group = [record]

        # Bug fix: flush the final run; it was previously dropped because
        # it was never appended after the loop ended.
        if len(current_group) > 1:
            groups.append(current_group)

        return groups

    def _impact_order(self, impact: str) -> int:
        """Sort key: high (0) before medium (1) before low (2); unknown last."""
        return {"high": 0, "medium": 1, "low": 2}.get(impact, 3)

Optimization Strategies

Common Optimizations

🐍python
1"""
2Performance Optimization Strategies
3
4Common patterns for improving agent performance.
5"""
6
7from functools import lru_cache
8from typing import Any, Callable
9import hashlib
10import json
11
12
13class PromptCache:
14    """Cache LLM responses for identical prompts."""
15
16    def __init__(self, max_size: int = 1000):
17        self.cache: dict[str, dict] = {}
18        self.max_size = max_size
19        self.hits = 0
20        self.misses = 0
21
22    def _hash_prompt(self, messages: list[dict], model: str) -> str:
23        """Create cache key from prompt."""
24        content = json.dumps({"messages": messages, "model": model}, sort_keys=True)
25        return hashlib.sha256(content.encode()).hexdigest()
26
27    def get(self, messages: list[dict], model: str) -> dict | None:
28        """Get cached response."""
29        key = self._hash_prompt(messages, model)
30        if key in self.cache:
31            self.hits += 1
32            return self.cache[key]
33        self.misses += 1
34        return None
35
36    def set(self, messages: list[dict], model: str, response: dict):
37        """Cache a response."""
38        if len(self.cache) >= self.max_size:
39            # Remove oldest entry (simple LRU)
40            oldest_key = next(iter(self.cache))
41            del self.cache[oldest_key]
42
43        key = self._hash_prompt(messages, model)
44        self.cache[key] = response
45
46    @property
47    def hit_rate(self) -> float:
48        total = self.hits + self.misses
49        return self.hits / total if total > 0 else 0
50
51
class ParallelExecutor:
    """Run independent operations concurrently."""

    async def execute_parallel(
        self,
        operations: list[tuple[Callable, dict]]
    ) -> list[Any]:
        """Run all (func, kwargs) pairs concurrently and return their results.

        Coroutine functions are awaited directly; synchronous callables are
        offloaded to a worker thread. Exceptions are returned in place of
        results rather than raised (``gather(..., return_exceptions=True)``).
        """
        import asyncio

        def as_awaitable(func: Callable, kwargs: dict):
            # Dispatch on coroutine-ness: sync callables go to a thread.
            if asyncio.iscoroutinefunction(func):
                return func(**kwargs)
            return asyncio.to_thread(func, **kwargs)

        pending = [as_awaitable(func, kwargs) for func, kwargs in operations]
        return await asyncio.gather(*pending, return_exceptions=True)

    async def execute_with_batching(
        self,
        items: list[Any],
        processor: Callable,
        batch_size: int = 5
    ) -> list[Any]:
        """Process items via ``processor(item=...)`` in fixed-size batches."""
        collected: list[Any] = []

        for start in range(0, len(items), batch_size):
            chunk = items[start:start + batch_size]
            collected.extend(await self.execute_parallel(
                [(processor, {"item": it}) for it in chunk]
            ))

        return collected
89
90
class StreamingOptimizer:
    """Reduce perceived latency by streaming LLM output."""

    async def stream_llm_response(
        self,
        client,
        messages: list[dict],
        on_chunk: Callable[[str], None]
    ) -> str:
        """Consume ``client.stream(messages)`` chunk by chunk.

        Each chunk's "content" text (empty string when absent) is forwarded
        to ``on_chunk`` as soon as it arrives; the concatenation of all
        chunks is returned once the stream ends.
        """
        pieces: list[str] = []

        async for chunk in client.stream(messages):
            text = chunk.get("content", "")
            pieces.append(text)
            on_chunk(text)

        return "".join(pieces)
109
110
111class ModelSelector:
112    """Dynamically select model based on task complexity."""
113
114    def __init__(self):
115        self.models = {
116            "simple": {"name": "gpt-3.5-turbo", "cost_factor": 0.1},
117            "medium": {"name": "gpt-4o", "cost_factor": 0.5},
118            "complex": {"name": "gpt-4", "cost_factor": 1.0},
119        }
120
121    def select_model(self, task_complexity: str, max_cost: float = None) -> str:
122        """Select appropriate model for task."""
123        if max_cost:
124            # Find cheapest model that can handle complexity
125            for level in ["simple", "medium", "complex"]:
126                if self.models[level]["cost_factor"] <= max_cost:
127                    if self._can_handle(level, task_complexity):
128                        return self.models[level]["name"]
129
130        # Default to matching complexity
131        return self.models.get(task_complexity, self.models["medium"])["name"]
132
133    def _can_handle(self, model_level: str, task_complexity: str) -> bool:
134        """Check if model level can handle task complexity."""
135        levels = ["simple", "medium", "complex"]
136        return levels.index(model_level) >= levels.index(task_complexity)
137
138
# Usage example
async def optimized_agent_execution():
    """Demonstrate combining the optimization strategies end to end.

    NOTE(review): `messages` and `client` are assumed to be provided by
    the surrounding application context — this listing is illustrative.
    """
    cache = PromptCache()            # response caching for repeated prompts
    executor = ParallelExecutor()    # parallel fan-out of independent tools
    streamer = StreamingOptimizer()  # faster time-to-first-token
    selector = ModelSelector()       # cost-aware model choice

    # Pick a model that matches the task's difficulty.
    task_complexity = "simple"
    model = selector.select_model(task_complexity)

    # Serve from cache when an identical prompt was answered before.
    cached = cache.get(messages, model)
    if cached:
        return cached

    # Stream tokens to the console as they arrive.
    response = await streamer.stream_llm_response(
        client,
        messages,
        on_chunk=lambda c: print(c, end="", flush=True)
    )

    # Remember the answer for next time.
    cache.set(messages, model, {"content": response})

    return response

Key Takeaways

  • Time everything - use profilers to understand where time is actually spent.
  • Track token usage to understand costs and identify optimization opportunities.
  • Automated bottleneck detection helps identify the highest-impact optimization targets.
  • Use caching, parallelization, and streaming as primary optimization strategies.
  • Match model complexity to task for cost-effective performance.
Next Section Preview: We'll explore error analysis and recovery strategies for resilient agent systems.