Introduction
Agent performance directly impacts user experience and operational costs. Understanding where time and resources are spent enables targeted optimizations that can dramatically improve efficiency without sacrificing capability.
Section Overview: We'll cover timing analysis, token usage profiling, bottleneck detection, and optimization strategies for agent systems.
Timing Analysis
Where Time Goes
| Component | Typical Range | Optimization Potential |
|---|---|---|
| LLM API call | 500ms - 5s | Model choice, streaming |
| Tool execution | 10ms - 30s | Caching, parallelization |
| Memory retrieval | 10ms - 500ms | Index optimization |
| Network I/O | 50ms - 2s | Connection pooling |
| Prompt construction | 1ms - 50ms | Template caching |
🐍python
1"""
2Timing Analysis for Agent Performance
3
4Measure and analyze where time is spent.
5"""
6
7import time
8from contextlib import contextmanager
9from dataclasses import dataclass, field
10from datetime import datetime
11from typing import Generator
12
13
14@dataclass
15class TimingRecord:
16 """Record of a timed operation."""
17 name: str
18 start_time: float
19 end_time: float | None = None
20 parent: str | None = None
21 metadata: dict = field(default_factory=dict)
22
23 @property
24 def duration_ms(self) -> float:
25 if self.end_time:
26 return (self.end_time - self.start_time) * 1000
27 return 0
28
29
30class PerformanceProfiler:
31 """Profile agent performance."""
32
33 def __init__(self):
34 self.records: list[TimingRecord] = []
35 self.active_timers: dict[str, TimingRecord] = {}
36 self.current_parent: str | None = None
37
38 @contextmanager
39 def timer(self, name: str, **metadata) -> Generator[TimingRecord, None, None]:
40 """Time a block of code."""
41 record = TimingRecord(
42 name=name,
43 start_time=time.perf_counter(),
44 parent=self.current_parent,
45 metadata=metadata
46 )
47
48 old_parent = self.current_parent
49 self.current_parent = name
50 self.active_timers[name] = record
51
52 try:
53 yield record
54 finally:
55 record.end_time = time.perf_counter()
56 self.records.append(record)
57 del self.active_timers[name]
58 self.current_parent = old_parent
59
60 def get_summary(self) -> dict:
61 """Get timing summary by category."""
62 by_category: dict[str, list[float]] = {}
63
64 for record in self.records:
65 category = record.name.split(":")[0]
66 if category not in by_category:
67 by_category[category] = []
68 by_category[category].append(record.duration_ms)
69
70 summary = {}
71 for category, times in by_category.items():
72 summary[category] = {
73 "count": len(times),
74 "total_ms": sum(times),
75 "avg_ms": sum(times) / len(times) if times else 0,
76 "min_ms": min(times) if times else 0,
77 "max_ms": max(times) if times else 0,
78 }
79
80 return summary
81
82 def get_critical_path(self) -> list[TimingRecord]:
83 """Find the critical path (longest sequential chain)."""
84 # Build parent-child relationships
85 children: dict[str | None, list[TimingRecord]] = {}
86 for record in self.records:
87 parent = record.parent
88 if parent not in children:
89 children[parent] = []
90 children[parent].append(record)
91
92 def find_longest_path(parent: str | None) -> list[TimingRecord]:
93 if parent not in children:
94 return []
95
96 longest = []
97 for child in children[parent]:
98 path = [child] + find_longest_path(child.name)
99 if sum(r.duration_ms for r in path) > sum(r.duration_ms for r in longest):
100 longest = path
101
102 return longest
103
104 return find_longest_path(None)
105
106 def get_waterfall(self) -> str:
107 """Generate ASCII waterfall chart."""
108 if not self.records:
109 return "No records"
110
111 min_start = min(r.start_time for r in self.records)
112 max_end = max(r.end_time or r.start_time for r in self.records)
113 total_duration = max_end - min_start
114
115 width = 50
116 lines = []
117
118 for record in sorted(self.records, key=lambda r: r.start_time):
119 start_pos = int(((record.start_time - min_start) / total_duration) * width)
120 duration_pos = int((record.duration_ms / (total_duration * 1000)) * width)
121 duration_pos = max(1, duration_pos)
122
123 bar = " " * start_pos + "=" * duration_pos
124 lines.append(f"{record.name:30s} {record.duration_ms:8.1f}ms |{bar}")
125
126 return "\n".join(lines)
127
128
# Usage
profiler = PerformanceProfiler()

async def profiled_agent_run(profiler: PerformanceProfiler, task: str):
    """Run the agent end-to-end with a timer wrapped around every phase."""
    # Top-level timer: everything below is recorded as its children.
    with profiler.timer("agent_run", task=task[:50]):

        with profiler.timer("input_processing"):
            processed = await process_input(task)

        with profiler.timer("llm:planning"):
            plan = await generate_plan(processed)

        # One pair of timers per plan step: the LLM decision, then the tool call.
        for i, step in enumerate(plan):
            with profiler.timer(f"step_{i}:execution"):

                with profiler.timer(f"step_{i}:llm_call"):
                    action = await decide_action(step)

                with profiler.timer(f"step_{i}:tool_call", tool=action.get("type")):
                    result = await execute_action(action)

        with profiler.timer("response_generation"):
            response = await generate_response()
    return response
Token Usage Profiling
Tracking Token Consumption
🐍python
1"""
2Token Usage Profiling
3
4Track and optimize token consumption.
5"""
6
7from dataclasses import dataclass, field
8
9
# Approximate USD costs per 1K tokens, by model.  Module-level constant so the
# table is built once instead of on every estimated_cost access.
_MODEL_COSTS_PER_1K = {
    "gpt-4": {"prompt": 0.03, "completion": 0.06},
    "gpt-4o": {"prompt": 0.005, "completion": 0.015},
    "gpt-3.5-turbo": {"prompt": 0.0015, "completion": 0.002},
    "claude-3-opus": {"prompt": 0.015, "completion": 0.075},
    "claude-3-sonnet": {"prompt": 0.003, "completion": 0.015},
}
# Fallback prices for models not in the table.
_DEFAULT_COSTS_PER_1K = {"prompt": 0.01, "completion": 0.03}


@dataclass
class TokenUsage:
    """Token usage for a single operation."""
    operation: str      # logical operation name, used for grouping
    prompt_tokens: int
    completion_tokens: int
    model: str          # model identifier, used for cost lookup
    timestamp: str      # ISO-8601 creation time
    metadata: dict = field(default_factory=dict)

    @property
    def total_tokens(self) -> int:
        """Prompt plus completion tokens."""
        return self.prompt_tokens + self.completion_tokens

    @property
    def estimated_cost(self) -> float:
        """Estimated USD cost using approximate per-1K-token prices."""
        model_costs = _MODEL_COSTS_PER_1K.get(self.model, _DEFAULT_COSTS_PER_1K)
        return (
            (self.prompt_tokens / 1000) * model_costs["prompt"] +
            (self.completion_tokens / 1000) * model_costs["completion"]
        )


class TokenProfiler:
    """Profile token usage across agent operations."""

    def __init__(self):
        self.usage_records: list[TokenUsage] = []
        self.budgets: dict[str, int] = {}  # category -> max total tokens

    def record(
        self,
        operation: str,
        prompt_tokens: int,
        completion_tokens: int,
        model: str,
        **metadata
    ):
        """Record token usage for one operation."""
        # Local import keeps the snippet self-contained; timezone-aware now()
        # replaces the deprecated datetime.utcnow().
        from datetime import datetime, timezone

        record = TokenUsage(
            operation=operation,
            prompt_tokens=prompt_tokens,
            completion_tokens=completion_tokens,
            model=model,
            timestamp=datetime.now(timezone.utc).isoformat(),
            metadata=metadata
        )
        self.usage_records.append(record)

    def set_budget(self, category: str, max_tokens: int):
        """Set token budget for a category."""
        self.budgets[category] = max_tokens

    def check_budget(self, category: str) -> tuple[bool, int]:
        """Check if within budget.

        Returns:
            (within_budget, remaining_tokens); (True, 0) when no budget is set.
        """
        if category not in self.budgets:
            return True, 0

        # Budget matching is by operation-name prefix.
        used = sum(
            r.total_tokens for r in self.usage_records
            if r.operation.startswith(category)
        )
        budget = self.budgets[category]

        return used < budget, budget - used

    @staticmethod
    def _empty_stats() -> dict:
        """Fresh accumulator for one operation or model."""
        return {"count": 0, "prompt_tokens": 0, "completion_tokens": 0, "cost": 0}

    def get_summary(self) -> dict:
        """Get comprehensive usage summary (totals, by operation, by model)."""
        by_operation: dict[str, dict] = {}
        by_model: dict[str, dict] = {}

        for record in self.usage_records:
            # Fold each record into both groupings with identical bookkeeping.
            for stats in (
                by_operation.setdefault(record.operation, self._empty_stats()),
                by_model.setdefault(record.model, self._empty_stats()),
            ):
                stats["count"] += 1
                stats["prompt_tokens"] += record.prompt_tokens
                stats["completion_tokens"] += record.completion_tokens
                stats["cost"] += record.estimated_cost

        return {
            "total_tokens": sum(r.total_tokens for r in self.usage_records),
            "total_cost": sum(r.estimated_cost for r in self.usage_records),
            "by_operation": by_operation,
            "by_model": by_model,
            "record_count": len(self.usage_records)
        }

    def get_optimization_suggestions(self) -> list[dict]:
        """Generate optimization suggestions based on usage patterns."""
        suggestions = []
        summary = self.get_summary()

        for op, stats in summary["by_operation"].items():
            # Flag operations costing more than 10 cents.
            if stats["cost"] > 0.10:
                suggestions.append({
                    "type": "expensive_operation",
                    "operation": op,
                    "cost": stats["cost"],
                    "suggestion": f"Consider caching or reducing frequency of '{op}'"
                })

            # Flag prompt-heavy operations (>80% of tokens in the prompt).
            total = stats["prompt_tokens"] + stats["completion_tokens"]
            if total > 0 and stats["prompt_tokens"] / total > 0.8:
                suggestions.append({
                    "type": "prompt_heavy",
                    "operation": op,
                    "ratio": stats["prompt_tokens"] / total,
                    "suggestion": f"'{op}' uses {stats['prompt_tokens']/total:.0%} prompt tokens - consider summarizing context"
                })
        return suggestions
Bottleneck Detection
Finding Performance Bottlenecks
🐍python
1"""
2Bottleneck Detection
3
4Automatically identify performance bottlenecks.
5"""
6
7from dataclasses import dataclass
8from typing import Any
9
10
@dataclass
class Bottleneck:
    """Identified performance bottleneck."""
    component: str
    impact: str      # high, medium, low
    metric: str      # name of the metric that triggered detection
    value: float     # observed value of the metric
    threshold: float # threshold the value exceeded
    suggestion: str  # human-readable remediation hint


class BottleneckDetector:
    """Detect performance bottlenecks in agent execution."""

    def __init__(
        self,
        # String annotations: these classes are defined in other modules of
        # this chapter, so evaluating the names here would fail standalone.
        timing_profiler: "PerformanceProfiler",
        token_profiler: "TokenProfiler"
    ):
        self.timing = timing_profiler
        self.tokens = token_profiler

        # Detection thresholds.
        self.llm_latency_threshold_ms = 3000
        self.tool_latency_threshold_ms = 1000
        self.token_cost_threshold = 0.05
        self.iteration_count_threshold = 10

    def analyze(self) -> list[Bottleneck]:
        """Analyze for bottlenecks, sorted highest impact first."""
        bottlenecks = []

        bottlenecks.extend(self._analyze_timing())
        bottlenecks.extend(self._analyze_tokens())
        bottlenecks.extend(self._analyze_patterns())

        return sorted(bottlenecks, key=lambda b: self._impact_order(b.impact))

    def _analyze_timing(self) -> list[Bottleneck]:
        """Analyze timing summary for slow or chatty categories."""
        bottlenecks = []
        summary = self.timing.get_summary()

        for category, stats in summary.items():
            # Slow LLM calls
            if "llm" in category.lower() and stats["avg_ms"] > self.llm_latency_threshold_ms:
                bottlenecks.append(Bottleneck(
                    component=category,
                    impact="high",
                    metric="avg_latency_ms",
                    value=stats["avg_ms"],
                    threshold=self.llm_latency_threshold_ms,
                    suggestion="Consider using a faster model or implementing streaming"
                ))

            # Slow tools
            if "tool" in category.lower() and stats["avg_ms"] > self.tool_latency_threshold_ms:
                bottlenecks.append(Bottleneck(
                    component=category,
                    impact="medium",
                    metric="avg_latency_ms",
                    value=stats["avg_ms"],
                    threshold=self.tool_latency_threshold_ms,
                    suggestion="Consider caching tool results or parallelizing calls"
                ))

            # Too many calls
            if stats["count"] > 20:
                bottlenecks.append(Bottleneck(
                    component=category,
                    impact="medium",
                    metric="call_count",
                    value=stats["count"],
                    threshold=20,
                    suggestion=f"High call count for {category} - consider batching"
                ))

        return bottlenecks

    def _analyze_tokens(self) -> list[Bottleneck]:
        """Analyze token usage for expensive operations."""
        bottlenecks = []
        summary = self.tokens.get_summary()

        for op, stats in summary["by_operation"].items():
            # Expensive operations
            if stats["cost"] > self.token_cost_threshold:
                bottlenecks.append(Bottleneck(
                    component=op,
                    impact="high",
                    metric="cost_usd",
                    value=stats["cost"],
                    threshold=self.token_cost_threshold,
                    suggestion="High token cost - consider prompt compression or cheaper model"
                ))

        return bottlenecks

    def _analyze_patterns(self) -> list[Bottleneck]:
        """Analyze execution patterns for parallelization opportunities."""
        bottlenecks = []

        # Check for sequential operations that could be parallel
        records = self.timing.records
        sequential_groups = self._find_sequential_groups(records)

        for group in sequential_groups:
            if len(group) >= 3:
                total_time = sum(r.duration_ms for r in group)
                bottlenecks.append(Bottleneck(
                    component="sequential_execution",
                    impact="medium",
                    metric="parallelizable_time_ms",
                    value=total_time,
                    threshold=0,
                    suggestion=f"{len(group)} operations could potentially run in parallel"
                ))

        return bottlenecks

    def _find_sequential_groups(self, records: list) -> list[list]:
        """Find groups of operations that run sequentially (same parent)."""
        # Simplified - in production would do proper dependency analysis
        groups = []
        current_group = []

        for record in sorted(records, key=lambda r: r.start_time):
            if not current_group:
                current_group.append(record)
            elif record.parent == current_group[-1].parent:
                current_group.append(record)
            else:
                if len(current_group) > 1:
                    groups.append(current_group)
                current_group = [record]

        # Bug fix: the final group was previously dropped because groups were
        # only flushed when a new one started.
        if len(current_group) > 1:
            groups.append(current_group)

        return groups
    def _impact_order(self, impact: str) -> int:
        return {"high": 0, "medium": 1, "low": 2}.get(impact, 3)
Optimization Strategies
Common Optimizations
🐍python
1"""
2Performance Optimization Strategies
3
4Common patterns for improving agent performance.
5"""
6
7from functools import lru_cache
8from typing import Any, Callable
9import hashlib
10import json
11
12
13class PromptCache:
14 """Cache LLM responses for identical prompts."""
15
16 def __init__(self, max_size: int = 1000):
17 self.cache: dict[str, dict] = {}
18 self.max_size = max_size
19 self.hits = 0
20 self.misses = 0
21
22 def _hash_prompt(self, messages: list[dict], model: str) -> str:
23 """Create cache key from prompt."""
24 content = json.dumps({"messages": messages, "model": model}, sort_keys=True)
25 return hashlib.sha256(content.encode()).hexdigest()
26
27 def get(self, messages: list[dict], model: str) -> dict | None:
28 """Get cached response."""
29 key = self._hash_prompt(messages, model)
30 if key in self.cache:
31 self.hits += 1
32 return self.cache[key]
33 self.misses += 1
34 return None
35
36 def set(self, messages: list[dict], model: str, response: dict):
37 """Cache a response."""
38 if len(self.cache) >= self.max_size:
39 # Remove oldest entry (simple LRU)
40 oldest_key = next(iter(self.cache))
41 del self.cache[oldest_key]
42
43 key = self._hash_prompt(messages, model)
44 self.cache[key] = response
45
46 @property
47 def hit_rate(self) -> float:
48 total = self.hits + self.misses
49 return self.hits / total if total > 0 else 0
50
51
class ParallelExecutor:
    """Execute independent operations in parallel."""

    async def execute_parallel(
        self,
        operations: list[tuple[Callable, dict]]
    ) -> list[Any]:
        """Run every (callable, kwargs) pair concurrently and gather results.

        Plain callables are pushed onto worker threads so they do not block
        the event loop; exceptions come back in-place rather than raising.
        """
        import asyncio

        def as_awaitable(func: Callable, kwargs: dict):
            # Coroutine functions are awaited directly; sync callables run
            # in a thread via asyncio.to_thread.
            if asyncio.iscoroutinefunction(func):
                return func(**kwargs)
            return asyncio.to_thread(func, **kwargs)

        awaitables = [as_awaitable(func, kwargs) for func, kwargs in operations]
        return await asyncio.gather(*awaitables, return_exceptions=True)

    async def execute_with_batching(
        self,
        items: list[Any],
        processor: Callable,
        batch_size: int = 5
    ) -> list[Any]:
        """Process items through execute_parallel in fixed-size batches."""
        results: list[Any] = []

        start = 0
        while start < len(items):
            batch = items[start:start + batch_size]
            results.extend(await self.execute_parallel(
                [(processor, {"item": item}) for item in batch]
            ))
            start += batch_size

        return results
89
90
class StreamingOptimizer:
    """Optimize perceived latency with streaming responses."""

    async def stream_llm_response(
        self,
        client,
        messages: list[dict],
        on_chunk: Callable[[str], None]
    ) -> str:
        """Stream an LLM response for faster time-to-first-token.

        Invokes on_chunk with each piece of content as it arrives and
        returns the fully accumulated response text.
        """
        pieces: list[str] = []

        # NOTE(review): assumes client.stream yields dicts with a "content"
        # key — confirm against the client implementation.
        async for chunk in client.stream(messages):
            content = chunk.get("content", "")
            pieces.append(content)
            on_chunk(content)

        return "".join(pieces)
109
110
111class ModelSelector:
112 """Dynamically select model based on task complexity."""
113
114 def __init__(self):
115 self.models = {
116 "simple": {"name": "gpt-3.5-turbo", "cost_factor": 0.1},
117 "medium": {"name": "gpt-4o", "cost_factor": 0.5},
118 "complex": {"name": "gpt-4", "cost_factor": 1.0},
119 }
120
121 def select_model(self, task_complexity: str, max_cost: float = None) -> str:
122 """Select appropriate model for task."""
123 if max_cost:
124 # Find cheapest model that can handle complexity
125 for level in ["simple", "medium", "complex"]:
126 if self.models[level]["cost_factor"] <= max_cost:
127 if self._can_handle(level, task_complexity):
128 return self.models[level]["name"]
129
130 # Default to matching complexity
131 return self.models.get(task_complexity, self.models["medium"])["name"]
132
133 def _can_handle(self, model_level: str, task_complexity: str) -> bool:
134 """Check if model level can handle task complexity."""
135 levels = ["simple", "medium", "complex"]
136 return levels.index(model_level) >= levels.index(task_complexity)
137
138
# Usage example
async def optimized_agent_execution():
    """Example flow combining caching, parallelism, streaming, and model selection."""
    # Cache responses for identical prompts.
    cache = PromptCache()

    # Run independent tools concurrently.
    executor = ParallelExecutor()

    # Stream tokens for faster first feedback.
    streamer = StreamingOptimizer()

    # Pick the cheapest model that fits the task.
    selector = ModelSelector()

    # Example flow
    task_complexity = "simple"
    model = selector.select_model(task_complexity)

    # Serve from cache when this exact prompt was seen before.
    cached = cache.get(messages, model)
    if cached:
        return cached

    # Otherwise call the LLM with streaming output.
    response = await streamer.stream_llm_response(
        client,
        messages,
        on_chunk=lambda c: print(c, end="", flush=True)
    )

    # Cache the result for next time.
    cache.set(messages, model, {"content": response})
    return response
Key Takeaways
- Time everything - use profilers to understand where time is actually spent.
- Track token usage to understand costs and identify optimization opportunities.
- Automated bottleneck detection helps identify the highest-impact optimization targets.
- Use caching, parallelization, and streaming as primary optimization strategies.
- Match model complexity to task for cost-effective performance.
Next Section Preview: We'll explore error analysis and recovery strategies for resilient agent systems.