Introduction
In this capstone section, we bring together everything we've learned about observability to build a comprehensive monitoring dashboard for AI agents. This dashboard integrates logging, tracing, performance metrics, and error analysis into a unified view that enables operators to understand, debug, and optimize agent behavior in real-time.
What You'll Build: A complete observability dashboard with real-time metrics, trace visualization, performance charts, error tracking, and intelligent alerting—everything you need to monitor production AI agents.
A well-designed observability dashboard transforms raw telemetry data into actionable insights. Rather than drowning in logs and metrics, operators can quickly identify issues, understand their root causes, and take corrective action.
Dashboard Architecture
Our observability dashboard follows a layered architecture that separates data collection, processing, storage, and visualization concerns:
1"""
2Observability Dashboard Architecture
3
4This module defines the core architecture for the agent observability
5dashboard, integrating all telemetry sources into a unified view.
6"""
7
8from abc import ABC, abstractmethod
9from dataclasses import dataclass, field
10from datetime import datetime, timedelta
11from enum import Enum
12from typing import Any, Dict, List, Optional, Callable
13from collections import defaultdict
14import asyncio
15import json
16
17
class DataSource(Enum):
    """Enumerates the kinds of telemetry the dashboard can ingest."""

    LOGS = "logs"
    TRACES = "traces"
    METRICS = "metrics"
    ERRORS = "errors"
    ALERTS = "alerts"
25
26
@dataclass
class TimeRange:
    """Inclusive time window for telemetry queries.

    Attributes:
        start: beginning of the window (naive UTC, per file convention).
        end: end of the window (naive UTC).
    """

    start: datetime
    end: datetime

    @classmethod
    def _ending_now(cls, delta: timedelta) -> "TimeRange":
        # Shared constructor for the two convenience factories below.
        # NOTE(review): datetime.utcnow() yields naive datetimes and is
        # deprecated in 3.12; kept for consistency with the rest of the file.
        end = datetime.utcnow()
        return cls(start=end - delta, end=end)

    @classmethod
    def last_minutes(cls, minutes: int) -> "TimeRange":
        """Return a window covering the last `minutes` minutes."""
        return cls._ending_now(timedelta(minutes=minutes))

    @classmethod
    def last_hours(cls, hours: int) -> "TimeRange":
        """Return a window covering the last `hours` hours."""
        return cls._ending_now(timedelta(hours=hours))
44
45
@dataclass
class DashboardConfig:
    """Tunable settings for the observability dashboard.

    Groups refresh/query behaviour, per-signal retention windows, and
    aggregation performance knobs in a single configuration object.
    """

    # Refresh / query behaviour
    refresh_interval_seconds: int = 30
    default_time_range_minutes: int = 60
    max_data_points: int = 1000
    enable_real_time: bool = True
    alert_polling_interval: int = 10

    # How long each telemetry type is kept
    log_retention_hours: int = 24
    trace_retention_hours: int = 72
    metric_retention_hours: int = 168  # one week

    # Aggregation performance
    aggregation_interval_seconds: int = 60
    batch_size: int = 100
63
64
class TelemetryCollector(ABC):
    """Interface implemented by every telemetry source.

    Concrete collectors fetch raw records from their backing store and
    normalize them into plain dictionaries for dashboard consumption.
    """

    @abstractmethod
    async def collect(self, time_range: TimeRange) -> List[Dict[str, Any]]:
        """Return normalized records that fall inside `time_range`."""

    @abstractmethod
    def get_source_type(self) -> DataSource:
        """Identify which DataSource this collector produces."""
77
78
class DataAggregator:
    """Fetches and caches telemetry from all registered collectors.

    Results are cached per (source, time range) for
    `config.refresh_interval_seconds` so repeated dashboard refreshes do
    not hammer the underlying stores.
    """

    def __init__(self, config: DashboardConfig):
        self.config = config
        self.collectors: Dict[DataSource, TelemetryCollector] = {}
        self.cache: Dict[str, Any] = {}
        self.cache_ttl: Dict[str, datetime] = {}

    def register_collector(self, collector: TelemetryCollector):
        """Register a collector under its declared source type."""
        self.collectors[collector.get_source_type()] = collector

    async def fetch_all(
        self,
        time_range: TimeRange,
        sources: Optional[List[DataSource]] = None
    ) -> Dict[DataSource, List[Dict[str, Any]]]:
        """Fetch data for `sources` (default: all registered) concurrently.

        Sources with no registered collector, and sources whose fetch
        raises, map to an empty list rather than propagating the error.
        """
        if sources is None:
            sources = list(self.collectors.keys())

        # BUGFIX: tasks are only created for sources that have a registered
        # collector, so results must be zipped against that same filtered
        # list. Zipping against `sources` shifted data onto the wrong source
        # (and silently dropped a registered source) whenever an
        # unregistered source appeared earlier in the list.
        available = [s for s in sources if s in self.collectors]
        tasks = [self._fetch_source(s, time_range) for s in available]

        gathered = await asyncio.gather(*tasks, return_exceptions=True)

        # Every requested source gets an entry; unregistered ones stay empty.
        results: Dict[DataSource, List[Dict[str, Any]]] = {s: [] for s in sources}
        for source, data in zip(available, gathered):
            results[source] = [] if isinstance(data, Exception) else data

        return results

    async def _fetch_source(
        self,
        source: DataSource,
        time_range: TimeRange
    ) -> List[Dict[str, Any]]:
        """Fetch one source, serving from cache while the entry is fresh."""
        cache_key = f"{source.value}:{time_range.start}:{time_range.end}"

        # Serve cached data until its TTL expires.
        if cache_key in self.cache:
            if datetime.utcnow() < self.cache_ttl.get(cache_key, datetime.min):
                return self.cache[cache_key]

        collector = self.collectors[source]
        data = await collector.collect(time_range)

        self.cache[cache_key] = data
        self.cache_ttl[cache_key] = datetime.utcnow() + timedelta(
            seconds=self.config.refresh_interval_seconds
        )

        return data
144
145
@dataclass
class DashboardPanel:
    """One widget placed on the dashboard grid."""

    id: str
    title: str
    panel_type: str  # one of: "chart", "table", "stat", "log"
    data_source: DataSource
    query: Dict[str, Any]
    width: int = 6   # width in grid units (grid is 12 wide)
    height: int = 4  # height in grid units
    refresh_interval: Optional[int] = None  # per-panel override, seconds
157
158
159@dataclass
160class DashboardLayout:
161 """Defines the layout of dashboard panels."""
162 panels: List[DashboardPanel] = field(default_factory=list)
163 columns: int = 12
164
165 def add_panel(self, panel: DashboardPanel):
166 """Add a panel to the layout."""
167 self.panels.append(panel)
168
169 def to_grid(self) -> List[List[Optional[DashboardPanel]]]:
170 """Convert panels to a grid layout."""
171 # Simple grid layout algorithm
172 grid = []
173 current_row = []
174 current_width = 0
175
176 for panel in self.panels:
177 if current_width + panel.width > self.columns:
178 grid.append(current_row)
179 current_row = []
180 current_width = 0
181
182 current_row.append(panel)
183 current_width += panel.width
184
185 if current_row:
186 grid.append(current_row)
187
188 return gridCore Components
| Component | Responsibility | Key Features |
|---|---|---|
| TelemetryCollector | Gather raw data from sources | Async collection, source abstraction |
| DataAggregator | Combine and cache data | Multi-source aggregation, caching |
| DashboardLayout | Organize visual panels | Grid system, responsive layout |
| AlertManager | Handle alerting logic | Threshold monitoring, notifications |
Data Collection Layer
The data collection layer implements collectors for each telemetry source. These collectors transform raw data into a normalized format suitable for dashboard visualization:
1"""
2Telemetry collectors for the observability dashboard.
3
4Each collector fetches data from its respective source and
5normalizes it for dashboard consumption.
6"""
7
8from typing import Any, Dict, List
9import asyncio
10
11
class LogCollector(TelemetryCollector):
    """Normalizes log records from the log store for the dashboard."""

    def __init__(self, log_store):
        self.log_store = log_store

    def get_source_type(self) -> DataSource:
        return DataSource.LOGS

    async def collect(self, time_range: TimeRange) -> List[Dict[str, Any]]:
        """Query up to 1000 log entries inside `time_range`."""
        entries = await self.log_store.query(
            start_time=time_range.start,
            end_time=time_range.end,
            limit=1000
        )

        normalized = []
        for entry in entries:
            normalized.append({
                "timestamp": entry.timestamp.isoformat(),
                "level": entry.level,
                "message": entry.message,
                "agent_id": entry.extra.get("agent_id"),
                "trace_id": entry.extra.get("trace_id"),
                "attributes": entry.extra,
            })
        return normalized
41
42
class TraceCollector(TelemetryCollector):
    """Normalizes distributed traces for the dashboard."""

    def __init__(self, trace_store):
        self.trace_store = trace_store

    def get_source_type(self) -> DataSource:
        return DataSource.TRACES

    async def collect(self, time_range: TimeRange) -> List[Dict[str, Any]]:
        """Query traces inside `time_range`, including per-span summaries."""
        traces = await self.trace_store.query_traces(
            start_time=time_range.start,
            end_time=time_range.end
        )
        return [self._normalize(trace) for trace in traces]

    @staticmethod
    def _normalize(trace) -> Dict[str, Any]:
        # Flatten one trace object into the dashboard's dictionary shape.
        spans = [
            {
                "span_id": span.span_id,
                "name": span.name,
                "duration_ms": span.duration_ms,
                "status": span.status,
            }
            for span in trace.spans
        ]
        return {
            "trace_id": trace.trace_id,
            "root_span": trace.root_span.name,
            "start_time": trace.start_time.isoformat(),
            "duration_ms": trace.duration_ms,
            "span_count": len(trace.spans),
            "status": trace.status,
            "agent_id": trace.attributes.get("agent_id"),
            "spans": spans,
        }
81
82
class MetricsCollector(TelemetryCollector):
    """Collects the dashboard's core metric series from the metrics store."""

    # (result key, stored metric name, aggregation) — one query per entry.
    # Table-driven to replace four copy-pasted query calls; order matters
    # because it determines the order of the returned metric entries.
    _SERIES = (
        ("latency", "agent.request.latency", "avg"),
        ("tokens", "agent.tokens.total", "sum"),
        ("errors", "agent.errors.count", "sum"),
        ("requests", "agent.requests.count", "sum"),
    )

    def __init__(self, metrics_store):
        self.metrics_store = metrics_store

    def get_source_type(self) -> DataSource:
        return DataSource.METRICS

    async def collect(self, time_range: TimeRange) -> List[Dict[str, Any]]:
        """Query each configured metric series at 1-minute resolution.

        Returns one entry per metric type with its normalized data points.
        """
        metrics: Dict[str, Any] = {}
        for key, metric_name, aggregation in self._SERIES:
            metrics[key] = await self.metrics_store.query(
                metric_name=metric_name,
                start_time=time_range.start,
                end_time=time_range.end,
                aggregation=aggregation,
                interval="1m"
            )

        return [
            {
                "metric_type": metric_type,
                "data_points": [
                    {"timestamp": dp.timestamp.isoformat(), "value": dp.value}
                    for dp in data_points
                ],
            }
            for metric_type, data_points in metrics.items()
        ]
147
148
class ErrorCollector(TelemetryCollector):
    """Groups raw error events into deduplicated summaries."""

    def __init__(self, error_store):
        self.error_store = error_store

    def get_source_type(self) -> DataSource:
        return DataSource.ERRORS

    async def collect(self, time_range: TimeRange) -> List[Dict[str, Any]]:
        """Collect errors in `time_range`, grouped by type + message prefix.

        Groups are sorted by occurrence count, most frequent first.
        """
        errors = await self.error_store.query(
            start_time=time_range.start,
            end_time=time_range.end
        )

        grouped: Dict[str, Dict[str, Any]] = {}
        for error in errors:
            # Same error type + first 50 chars of message == same group.
            key = f"{error.error_type}:{error.message[:50]}"
            group = grouped.get(key)
            if group is None:
                group = grouped[key] = {
                    "error_type": error.error_type,
                    "message": error.message,
                    "count": 0,
                    "first_seen": error.timestamp,
                    "last_seen": error.timestamp,
                    "sample_trace_id": error.trace_id,
                    "severity": error.severity
                }

            group["count"] += 1
            # BUGFIX: track both ends of the window — first_seen was never
            # updated, which is wrong when the store returns events out of
            # chronological order.
            group["first_seen"] = min(group["first_seen"], error.timestamp)
            group["last_seen"] = max(group["last_seen"], error.timestamp)

        return [
            {
                **group,
                "first_seen": group["first_seen"].isoformat(),
                "last_seen": group["last_seen"].isoformat()
            }
            for group in sorted(
                grouped.values(),
                key=lambda g: g["count"],
                reverse=True
            )
        ]
199
200
201class AlertCollector(TelemetryCollector):
202 """Collects active and recent alerts."""
203
204 def __init__(self, alert_manager):
205 self.alert_manager = alert_manager
206
207 def get_source_type(self) -> DataSource:
208 return DataSource.ALERTS
209
210 async def collect(self, time_range: TimeRange) -> List[Dict[str, Any]]:
211 """Collect alerts within the time range."""
212
213 # Get active alerts
214 active = await self.alert_manager.get_active_alerts()
215
216 # Get recently resolved alerts
217 resolved = await self.alert_manager.get_resolved_alerts(
218 since=time_range.start
219 )
220
221 return [
222 {
223 "alert_id": alert.alert_id,
224 "name": alert.name,
225 "severity": alert.severity.value,
226 "status": alert.status.value,
227 "triggered_at": alert.triggered_at.isoformat(),
228 "resolved_at": (
229 alert.resolved_at.isoformat()
230 if alert.resolved_at else None
231 ),
232 "message": alert.message,
233 "labels": alert.labels
234 }
235 for alert in active + resolved
236 ]Normalization Matters: Collectors transform source-specific data formats into a consistent structure. This allows dashboard components to work with any data source without knowing implementation details.
Metrics Aggregation
Raw telemetry data must be aggregated into meaningful summaries. The aggregation layer computes statistics, calculates trends, and prepares data for visualization:
1"""
2Metrics aggregation for dashboard visualization.
3
4This module transforms raw telemetry data into aggregated
5statistics suitable for charts and summary displays.
6"""
7
8from dataclasses import dataclass
9from typing import Any, Dict, List, Optional, Tuple
10from datetime import datetime, timedelta
11from enum import Enum
12import statistics
13
14
class AggregationType(Enum):
    """Supported reduction operations for time-series buckets."""

    SUM = "sum"
    AVG = "avg"
    MIN = "min"
    MAX = "max"
    COUNT = "count"
    P50 = "p50"
    P90 = "p90"
    P99 = "p99"
    RATE = "rate"
26
27
@dataclass
class AggregatedMetric:
    """A single aggregated metric value, optionally with trend context."""

    name: str
    value: float
    unit: str
    trend: Optional[float] = None          # percent change vs. previous period
    trend_direction: Optional[str] = None  # "up" | "down" | "stable"
    time_range: Optional[TimeRange] = None
37
38
class MetricsAggregator:
    """Turns raw metric points into dashboard-ready series and summaries."""

    def __init__(self, config: DashboardConfig):
        self.config = config

    def aggregate_time_series(
        self,
        data_points: List[Dict[str, Any]],
        aggregation: AggregationType,
        interval_seconds: int = 60
    ) -> List[Dict[str, Any]]:
        """Bucket `data_points` into fixed intervals and reduce each bucket.

        BUGFIX: buckets are now computed by flooring the timestamp to a
        multiple of `interval_seconds` since an epoch. The previous
        minute-based arithmetic divided by `interval_seconds // 60`, which
        raised ZeroDivisionError for sub-minute intervals and mis-bucketed
        intervals that don't evenly divide an hour.
        """
        if not data_points:
            return []

        # Local import: this module's header lacks `defaultdict`.
        from collections import defaultdict

        epoch = datetime(1970, 1, 1)
        buckets: Dict[datetime, List[float]] = defaultdict(list)
        for dp in data_points:
            ts = datetime.fromisoformat(dp["timestamp"])
            elapsed = (ts - epoch).total_seconds()
            bucket_ts = epoch + timedelta(seconds=elapsed - elapsed % interval_seconds)
            buckets[bucket_ts].append(dp["value"])

        return [
            {
                "timestamp": ts.isoformat(),
                "value": self._apply_aggregation(values, aggregation),
            }
            for ts, values in sorted(buckets.items())
        ]

    def _apply_aggregation(
        self,
        values: List[float],
        aggregation: AggregationType
    ) -> float:
        """Reduce `values` with the requested aggregation.

        Accepts an AggregationType member or its raw string value.
        Returns 0.0 for empty input or an unknown aggregation.
        """
        if not values:
            return 0.0

        op = getattr(aggregation, "value", aggregation)
        if op == "sum":
            return sum(values)
        if op == "avg":
            return statistics.mean(values)
        if op == "min":
            return min(values)
        if op == "max":
            return max(values)
        if op == "count":
            return len(values)
        if op == "p50":
            return self._percentile(values, 50)
        if op == "p90":
            return self._percentile(values, 90)
        if op == "p99":
            return self._percentile(values, 99)
        if op == "rate":
            return len(values) / 60  # events per second, assuming 60 s buckets
        return 0.0

    def _percentile(self, values: List[float], percentile: int) -> float:
        """Nearest-rank percentile, clamped to the last element."""
        ordered = sorted(values)
        index = int(len(ordered) * percentile / 100)
        return ordered[min(index, len(ordered) - 1)]

    def calculate_summary_stats(
        self,
        metrics_data: List[Dict[str, Any]],
        previous_data: Optional[List[Dict[str, Any]]] = None
    ) -> Dict[str, AggregatedMetric]:
        """Summarize each metric series, attaching a trend vs. `previous_data`.

        Latency is summarized as an average (ms); every other metric type
        as a sum (count). Trends beyond ±5% are labeled "up"/"down".
        """
        summaries: Dict[str, AggregatedMetric] = {}

        for metric in metrics_data:
            metric_type = metric["metric_type"]
            data_points = metric["data_points"]
            if not data_points:
                continue

            values = [dp["value"] for dp in data_points]
            current_avg = statistics.mean(values) if values else 0
            current_sum = sum(values)

            trend, trend_direction = self._trend_vs_previous(
                metric_type, current_avg, previous_data
            )

            summaries[metric_type] = AggregatedMetric(
                name=metric_type,
                value=current_avg if metric_type == "latency" else current_sum,
                unit="ms" if metric_type == "latency" else "count",
                trend=trend,
                trend_direction=trend_direction
            )

        return summaries

    def _trend_vs_previous(self, metric_type, current_avg, previous_data):
        """Percent change of the average vs. the matching previous series.

        Returns (None, None) when no comparable previous data exists or the
        previous average is non-positive.
        """
        if not previous_data:
            return None, None
        prev_metric = next(
            (m for m in previous_data if m["metric_type"] == metric_type),
            None
        )
        if not prev_metric:
            return None, None
        prev_values = [dp["value"] for dp in prev_metric["data_points"]]
        if not prev_values:
            return None, None
        prev_avg = statistics.mean(prev_values)
        if prev_avg <= 0:
            return None, None

        trend = ((current_avg - prev_avg) / prev_avg) * 100
        if trend > 5:
            direction = "up"
        elif trend < -5:
            direction = "down"
        else:
            direction = "stable"
        return trend, direction

    def calculate_error_rate(
        self,
        error_count: int,
        request_count: int
    ) -> AggregatedMetric:
        """Error rate as a percentage of requests (0 when no requests)."""
        rate = (error_count / request_count * 100) if request_count > 0 else 0
        return AggregatedMetric(
            name="error_rate",
            value=round(rate, 2),
            unit="%"
        )

    def calculate_throughput(
        self,
        request_count: int,
        time_range: TimeRange
    ) -> AggregatedMetric:
        """Requests per second over `time_range` (0 for empty windows)."""
        duration_seconds = (time_range.end - time_range.start).total_seconds()
        rps = request_count / duration_seconds if duration_seconds > 0 else 0
        return AggregatedMetric(
            name="throughput",
            value=round(rps, 2),
            unit="req/s",
            time_range=time_range
        )
202
203
204class HealthCalculator:
205 """Calculates overall system health from metrics."""
206
207 def __init__(self):
208 self.thresholds = {
209 "error_rate": {"warning": 1.0, "critical": 5.0},
210 "latency_p99": {"warning": 1000, "critical": 5000},
211 "active_alerts": {"warning": 1, "critical": 3}
212 }
213
214 def calculate_health_score(
215 self,
216 error_rate: float,
217 latency_p99: float,
218 active_alerts: int
219 ) -> Dict[str, Any]:
220 """Calculate overall health score (0-100)."""
221
222 scores = []
223 issues = []
224
225 # Error rate contribution (40%)
226 error_score = self._score_metric(
227 error_rate,
228 self.thresholds["error_rate"]
229 )
230 scores.append(error_score * 0.4)
231 if error_score < 70:
232 issues.append(f"High error rate: {error_rate:.2f}%")
233
234 # Latency contribution (40%)
235 latency_score = self._score_metric(
236 latency_p99,
237 self.thresholds["latency_p99"]
238 )
239 scores.append(latency_score * 0.4)
240 if latency_score < 70:
241 issues.append(f"High latency: {latency_p99:.0f}ms (p99)")
242
243 # Alert contribution (20%)
244 alert_score = self._score_metric(
245 active_alerts,
246 self.thresholds["active_alerts"]
247 )
248 scores.append(alert_score * 0.2)
249 if alert_score < 70:
250 issues.append(f"Active alerts: {active_alerts}")
251
252 total_score = sum(scores)
253
254 # Determine status
255 if total_score >= 90:
256 status = "healthy"
257 elif total_score >= 70:
258 status = "degraded"
259 else:
260 status = "critical"
261
262 return {
263 "score": round(total_score),
264 "status": status,
265 "issues": issues,
266 "components": {
267 "error_rate": round(error_score),
268 "latency": round(latency_score),
269 "alerts": round(alert_score)
270 }
271 }
272
273 def _score_metric(
274 self,
275 value: float,
276 thresholds: Dict[str, float]
277 ) -> float:
278 """Score a metric based on thresholds (0-100)."""
279
280 if value <= thresholds["warning"]:
281 return 100
282 elif value <= thresholds["critical"]:
283 # Linear interpolation between warning and critical
284 range_size = thresholds["critical"] - thresholds["warning"]
285 excess = value - thresholds["warning"]
286 return 100 - (excess / range_size * 50) # 100 -> 50
287 else:
288 # Beyond critical, score decreases more rapidly
289 excess = value - thresholds["critical"]
290 return max(0, 50 - (excess / thresholds["critical"] * 50))Key Aggregation Patterns
- Time bucketing: Group data points into fixed intervals
- Trend analysis: Compare current metrics to previous periods
- Health scoring: Combine multiple metrics into overall status
- Percentile calculation: Understand latency distribution
Visualization Components
Visualization components render aggregated data as charts, tables, and status indicators. These components are designed to be reusable and responsive:
1"""
2Visualization components for the observability dashboard.
3
4These components generate the data structures needed for
5frontend chart libraries (e.g., Chart.js, Recharts).
6"""
7
8from dataclasses import dataclass, field
9from typing import Any, Dict, List, Optional
10from enum import Enum
11from datetime import datetime
12
13
class ChartType(Enum):
    """Chart styles supported by the visualization layer."""

    LINE = "line"
    BAR = "bar"
    AREA = "area"
    PIE = "pie"
    SCATTER = "scatter"
    HEATMAP = "heatmap"
22
23
@dataclass
class ChartSeries:
    """One named series of points to draw on a chart."""

    name: str
    data: List[Dict[str, Any]]   # points shaped {"timestamp": ..., "value": ...}
    color: Optional[str] = None  # hex color; palette-assigned when omitted
    type: ChartType = ChartType.LINE
31
32
@dataclass
class ChartConfig:
    """Declarative chart definition convertible to a Chart.js config."""

    title: str
    series: List[ChartSeries]
    x_axis_label: str = "Time"
    y_axis_label: str = "Value"
    show_legend: bool = True
    show_grid: bool = True
    height: int = 300

    def to_chartjs_config(self) -> Dict[str, Any]:
        """Build a Chart.js configuration dict from this chart definition.

        BUGFIX: each dataset's values are aligned to the sorted union of
        timestamps (missing timestamps become None). Previously datasets
        emitted their raw value lists, which drifted out of sync with the
        shared label axis whenever series had differing timestamps.
        """
        labels = set()
        for series in self.series:
            for point in series.data:
                labels.add(point["timestamp"])
        sorted_labels = sorted(labels)

        datasets = []
        for series in self.series:
            by_ts = {p["timestamp"]: p["value"] for p in series.data}
            datasets.append({
                "label": series.name,
                "data": [by_ts.get(ts) for ts in sorted_labels],
                "borderColor": series.color or self._default_color(len(datasets)),
                # Area charts are filled line charts in Chart.js.
                "fill": getattr(series.type, "value", None) == "area",
                "tension": 0.4
            })

        return {
            "type": self.series[0].type.value if self.series else "line",
            "data": {
                "labels": sorted_labels,
                "datasets": datasets
            },
            "options": {
                "responsive": True,
                "plugins": {
                    "title": {"display": True, "text": self.title},
                    "legend": {"display": self.show_legend}
                },
                "scales": {
                    "x": {"title": {"display": True, "text": self.x_axis_label}},
                    "y": {"title": {"display": True, "text": self.y_axis_label}}
                }
            }
        }

    def _default_color(self, index: int) -> str:
        """Cycle through a fixed palette for series without explicit colors."""
        palette = (
            "#3b82f6",  # Blue
            "#10b981",  # Green
            "#f59e0b",  # Amber
            "#ef4444",  # Red
            "#8b5cf6",  # Purple
            "#06b6d4",  # Cyan
        )
        return palette[index % len(palette)]
107
108
class VisualizationFactory:
    """Builds standard ChartConfig objects from collected metrics data."""

    @staticmethod
    def _metric_chart(
        metrics_data: List[Dict[str, Any]],
        metric_type: str,
        title: str,
        series_name: str,
        color: str,
        chart_type: ChartType,
        y_axis_label: str
    ) -> ChartConfig:
        """Shared builder: single-series chart for one metric type.

        Missing metric types fall back to an empty series rather than
        failing, matching the four public factories' previous behavior.
        """
        matched = next(
            (m for m in metrics_data if m["metric_type"] == metric_type),
            {"data_points": []}
        )
        return ChartConfig(
            title=title,
            series=[
                ChartSeries(
                    name=series_name,
                    data=matched["data_points"],
                    color=color,
                    type=chart_type
                )
            ],
            y_axis_label=y_axis_label
        )

    @staticmethod
    def create_latency_chart(
        metrics_data: List[Dict[str, Any]],
        title: str = "Request Latency"
    ) -> ChartConfig:
        """Average request latency over time (line chart)."""
        return VisualizationFactory._metric_chart(
            metrics_data, "latency", title, "Average Latency",
            "#3b82f6", ChartType.LINE, "Latency (ms)"
        )

    @staticmethod
    def create_throughput_chart(
        metrics_data: List[Dict[str, Any]],
        title: str = "Request Throughput"
    ) -> ChartConfig:
        """Request volume over time (area chart)."""
        return VisualizationFactory._metric_chart(
            metrics_data, "requests", title, "Requests",
            "#10b981", ChartType.AREA, "Requests"
        )

    @staticmethod
    def create_error_chart(
        metrics_data: List[Dict[str, Any]],
        title: str = "Errors Over Time"
    ) -> ChartConfig:
        """Error counts over time (bar chart)."""
        return VisualizationFactory._metric_chart(
            metrics_data, "errors", title, "Errors",
            "#ef4444", ChartType.BAR, "Error Count"
        )

    @staticmethod
    def create_token_usage_chart(
        metrics_data: List[Dict[str, Any]],
        title: str = "Token Usage"
    ) -> ChartConfig:
        """Token consumption over time (area chart)."""
        return VisualizationFactory._metric_chart(
            metrics_data, "tokens", title, "Tokens",
            "#8b5cf6", ChartType.AREA, "Token Count"
        )
211
212
@dataclass
class StatCard:
    """A single headline statistic shown on the dashboard."""

    title: str
    value: str
    unit: str
    trend: Optional[float] = None
    trend_direction: Optional[str] = None
    status: str = "normal"  # "normal" | "warning" | "critical"
    icon: Optional[str] = None

    def to_dict(self) -> Dict[str, Any]:
        """Serialize for the frontend (camelCase where it expects it)."""
        return {
            "title": self.title,
            "value": self.value,
            "unit": self.unit,
            "trend": self.trend,
            "trendDirection": self.trend_direction,
            "status": self.status,
            "icon": self.icon
        }
235
236
237class StatCardFactory:
238 """Factory for creating stat cards."""
239
240 @staticmethod
241 def create_latency_card(metric: AggregatedMetric) -> StatCard:
242 """Create a latency stat card."""
243
244 status = "normal"
245 if metric.value > 1000:
246 status = "critical"
247 elif metric.value > 500:
248 status = "warning"
249
250 return StatCard(
251 title="Avg Latency",
252 value=f"{metric.value:.0f}",
253 unit="ms",
254 trend=metric.trend,
255 trend_direction=metric.trend_direction,
256 status=status,
257 icon="clock"
258 )
259
260 @staticmethod
261 def create_error_rate_card(metric: AggregatedMetric) -> StatCard:
262 """Create an error rate stat card."""
263
264 status = "normal"
265 if metric.value > 5:
266 status = "critical"
267 elif metric.value > 1:
268 status = "warning"
269
270 return StatCard(
271 title="Error Rate",
272 value=f"{metric.value:.2f}",
273 unit="%",
274 trend=metric.trend,
275 trend_direction=metric.trend_direction,
276 status=status,
277 icon="alert-circle"
278 )
279
280 @staticmethod
281 def create_throughput_card(metric: AggregatedMetric) -> StatCard:
282 """Create a throughput stat card."""
283
284 return StatCard(
285 title="Throughput",
286 value=f"{metric.value:.1f}",
287 unit="req/s",
288 trend=metric.trend,
289 trend_direction=metric.trend_direction,
290 status="normal",
291 icon="activity"
292 )
293
294 @staticmethod
295 def create_health_card(health: Dict[str, Any]) -> StatCard:
296 """Create a health score stat card."""
297
298 status_map = {
299 "healthy": "normal",
300 "degraded": "warning",
301 "critical": "critical"
302 }
303
304 return StatCard(
305 title="System Health",
306 value=str(health["score"]),
307 unit="/ 100",
308 status=status_map.get(health["status"], "normal"),
309 icon="heart"
310 )Frontend Integration: The visualization components generate configuration objects that can be directly consumed by frontend charting libraries like Chart.js, Recharts, or D3.js.
Alerting Integration
The dashboard integrates with the alerting system to display active alerts and enable operators to acknowledge or silence them:
1"""
2Alerting integration for the observability dashboard.
3
4This module connects the dashboard to the alerting system,
5enabling real-time alert display and management.
6"""
7
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from enum import Enum
from typing import Any, Callable, Dict, List, Optional
import asyncio
13
14
class AlertSeverity(Enum):
    """How serious an alert is, from informational to critical."""

    INFO = "info"
    WARNING = "warning"
    ERROR = "error"
    CRITICAL = "critical"
21
22
class AlertStatus(Enum):
    """Lifecycle states an alert moves through."""

    FIRING = "firing"
    ACKNOWLEDGED = "acknowledged"
    SILENCED = "silenced"
    RESOLVED = "resolved"
29
30
@dataclass
class Alert:
    """A single alert instance, from trigger through resolution."""

    alert_id: str
    name: str
    severity: AlertSeverity
    status: AlertStatus
    message: str
    triggered_at: datetime
    resolved_at: Optional[datetime] = None  # set once the alert clears
    acknowledged_by: Optional[str] = None   # operator who acked, if any
    labels: Dict[str, str] = field(default_factory=dict)
43
44
@dataclass
class AlertRule:
    """Declarative rule describing when an alert should fire."""

    name: str
    condition: str  # boolean expression over metric names, e.g. "error_rate > 5"
    severity: AlertSeverity
    message_template: str  # .format()-ed with current metric values
    cooldown_minutes: int = 5  # minimum gap between re-fires of this rule
    labels: Dict[str, str] = field(default_factory=dict)
54
55
56class DashboardAlertManager:
57 """Manages alerts for the dashboard."""
58
59 def __init__(self, alert_store, notification_service):
60 self.alert_store = alert_store
61 self.notification_service = notification_service
62 self.alert_rules: List[AlertRule] = []
63 self.subscribers: List[Callable[[Alert], None]] = []
64
65 def register_rule(self, rule: AlertRule):
66 """Register an alert rule."""
67 self.alert_rules.append(rule)
68
69 def subscribe(self, callback: Callable[[Alert], None]):
70 """Subscribe to alert updates."""
71 self.subscribers.append(callback)
72
73 async def get_active_alerts(self) -> List[Alert]:
74 """Get all active (firing or acknowledged) alerts."""
75
76 alerts = await self.alert_store.query(
77 status=[AlertStatus.FIRING, AlertStatus.ACKNOWLEDGED]
78 )
79
80 return sorted(
81 alerts,
82 key=lambda a: (
83 -self._severity_priority(a.severity),
84 a.triggered_at
85 )
86 )
87
88 async def get_alert_summary(self) -> Dict[str, Any]:
89 """Get summary of alert counts by severity."""
90
91 alerts = await self.get_active_alerts()
92
93 summary = {
94 "total": len(alerts),
95 "by_severity": {
96 "critical": 0,
97 "error": 0,
98 "warning": 0,
99 "info": 0
100 },
101 "by_status": {
102 "firing": 0,
103 "acknowledged": 0
104 }
105 }
106
107 for alert in alerts:
108 summary["by_severity"][alert.severity.value] += 1
109 summary["by_status"][alert.status.value] += 1
110
111 return summary
112
113 async def acknowledge_alert(
114 self,
115 alert_id: str,
116 user: str,
117 comment: Optional[str] = None
118 ):
119 """Acknowledge an alert."""
120
121 await self.alert_store.update(
122 alert_id=alert_id,
123 status=AlertStatus.ACKNOWLEDGED,
124 acknowledged_by=user,
125 acknowledged_at=datetime.utcnow(),
126 comment=comment
127 )
128
129 # Notify subscribers
130 alert = await self.alert_store.get(alert_id)
131 for subscriber in self.subscribers:
132 subscriber(alert)
133
134 async def silence_alert(
135 self,
136 alert_id: str,
137 duration_minutes: int,
138 user: str
139 ):
140 """Silence an alert for a duration."""
141
142 silence_until = datetime.utcnow() + timedelta(minutes=duration_minutes)
143
144 await self.alert_store.update(
145 alert_id=alert_id,
146 status=AlertStatus.SILENCED,
147 silenced_until=silence_until,
148 silenced_by=user
149 )
150
151 async def evaluate_rules(
152 self,
153 metrics: Dict[str, AggregatedMetric]
154 ) -> List[Alert]:
155 """Evaluate alert rules against current metrics."""
156
157 new_alerts = []
158
159 for rule in self.alert_rules:
160 should_fire = self._evaluate_condition(rule.condition, metrics)
161
162 if should_fire:
163 # Check if already firing
164 existing = await self.alert_store.query(
165 name=rule.name,
166 status=[AlertStatus.FIRING, AlertStatus.ACKNOWLEDGED]
167 )
168
169 if not existing:
170 # Check cooldown
171 recent = await self.alert_store.query(
172 name=rule.name,
173 since=datetime.utcnow() - timedelta(
174 minutes=rule.cooldown_minutes
175 )
176 )
177
178 if not recent:
179 alert = await self._create_alert(rule, metrics)
180 new_alerts.append(alert)
181
182 return new_alerts
183
184 def _evaluate_condition(
185 self,
186 condition: str,
187 metrics: Dict[str, AggregatedMetric]
188 ) -> bool:
189 """Evaluate an alert condition expression."""
190
191 # Build context for evaluation
192 context = {
193 name: metric.value
194 for name, metric in metrics.items()
195 }
196
197 try:
198 # Safe evaluation of simple conditions
199 # In production, use a proper expression parser
200 return eval(condition, {"__builtins__": {}}, context)
201 except Exception:
202 return False
203
204 async def _create_alert(
205 self,
206 rule: AlertRule,
207 metrics: Dict[str, AggregatedMetric]
208 ) -> Alert:
209 """Create and store a new alert."""
210
211 # Format message with metric values
212 message = rule.message_template.format(
213 **{name: f"{m.value:.2f}" for name, m in metrics.items()}
214 )
215
216 alert = Alert(
217 alert_id=self._generate_id(),
218 name=rule.name,
219 severity=rule.severity,
220 status=AlertStatus.FIRING,
221 message=message,
222 triggered_at=datetime.utcnow(),
223 labels=rule.labels
224 )
225
226 await self.alert_store.save(alert)
227
228 # Send notifications
229 await self.notification_service.send_alert(alert)
230
231 # Notify subscribers
232 for subscriber in self.subscribers:
233 subscriber(alert)
234
235 return alert
236
237 def _severity_priority(self, severity: AlertSeverity) -> int:
238 """Get priority number for severity (higher = more severe)."""
239 priorities = {
240 AlertSeverity.INFO: 1,
241 AlertSeverity.WARNING: 2,
242 AlertSeverity.ERROR: 3,
243 AlertSeverity.CRITICAL: 4
244 }
245 return priorities.get(severity, 0)
246
247 def _generate_id(self) -> str:
248 """Generate unique alert ID."""
249 import uuid
250 return str(uuid.uuid4())[:8]
251
252
# Example alert rules for agent monitoring.
# Names in each `condition` refer to the metric names passed to
# evaluate_rules(); `message_template` placeholders are filled with the
# same metric values when the rule fires.
DEFAULT_ALERT_RULES = [
    # Reliability: sustained error rate above 5%.
    AlertRule(
        name="high_error_rate",
        condition="error_rate > 5",
        severity=AlertSeverity.CRITICAL,
        message_template="Error rate is {error_rate}% (threshold: 5%)",
        labels={"component": "agent", "type": "reliability"}
    ),
    # Performance: latency above 2 seconds.
    AlertRule(
        name="high_latency",
        condition="latency > 2000",
        severity=AlertSeverity.WARNING,
        message_template="Latency is {latency}ms (threshold: 2000ms)",
        labels={"component": "agent", "type": "performance"}
    ),
    # Capacity: request throughput below 0.5 req/s.
    AlertRule(
        name="low_throughput",
        condition="throughput < 0.5",
        severity=AlertSeverity.WARNING,
        message_template="Throughput is {throughput} req/s (threshold: 0.5)",
        labels={"component": "agent", "type": "capacity"}
    ),
    # Cost: token usage above 100k in the window.
    AlertRule(
        name="high_token_usage",
        condition="tokens > 100000",
        severity=AlertSeverity.INFO,
        message_template="Token usage is {tokens} (threshold: 100000)",
        labels={"component": "agent", "type": "cost"}
    )
]
Alert Display Components
1"""
2Alert display components for the dashboard.
3"""
4
5@dataclass
6class AlertListConfig:
7 """Configuration for alert list display."""
8 max_alerts: int = 10
9 show_acknowledged: bool = True
10 group_by_severity: bool = True
11
12 def format_alerts(
13 self,
14 alerts: List[Alert]
15 ) -> List[Dict[str, Any]]:
16 """Format alerts for display."""
17
18 displayed = alerts[:self.max_alerts]
19
20 if not self.show_acknowledged:
21 displayed = [
22 a for a in displayed
23 if a.status != AlertStatus.ACKNOWLEDGED
24 ]
25
26 if self.group_by_severity:
27 # Group and sort by severity
28 groups = defaultdict(list)
29 for alert in displayed:
30 groups[alert.severity.value].append(alert)
31
32 result = []
33 for severity in ["critical", "error", "warning", "info"]:
34 for alert in groups.get(severity, []):
35 result.append(self._format_alert(alert))
36 return result
37
38 return [self._format_alert(a) for a in displayed]
39
40 def _format_alert(self, alert: Alert) -> Dict[str, Any]:
41 """Format a single alert."""
42
43 return {
44 "id": alert.alert_id,
45 "name": alert.name,
46 "severity": alert.severity.value,
47 "status": alert.status.value,
48 "message": alert.message,
49 "triggeredAt": alert.triggered_at.isoformat(),
50 "duration": self._format_duration(
51 datetime.utcnow() - alert.triggered_at
52 ),
53 "acknowledgedBy": alert.acknowledged_by,
54 "labels": alert.labels,
55 "actions": self._get_available_actions(alert)
56 }
57
58 def _format_duration(self, delta: timedelta) -> str:
59 """Format duration as human-readable string."""
60
61 total_seconds = int(delta.total_seconds())
62
63 if total_seconds < 60:
64 return f"{total_seconds}s"
65 elif total_seconds < 3600:
66 return f"{total_seconds // 60}m"
67 elif total_seconds < 86400:
68 return f"{total_seconds // 3600}h"
69 else:
70 return f"{total_seconds // 86400}d"
71
72 def _get_available_actions(self, alert: Alert) -> List[str]:
73 """Get available actions for an alert."""
74
75 actions = []
76
77 if alert.status == AlertStatus.FIRING:
78 actions.append("acknowledge")
79 actions.append("silence")
80 elif alert.status == AlertStatus.ACKNOWLEDGED:
81 actions.append("resolve")
82 actions.append("silence")
83
84 actions.append("view_details")
85
86 return actionsComplete Dashboard Implementation
Now we bring all components together into a complete dashboard implementation:
1"""
2Complete observability dashboard implementation.
3
4This module provides the main Dashboard class that orchestrates
5all components into a unified monitoring interface.
6"""
7
8from dataclasses import dataclass, field
9from typing import Any, Callable, Dict, List, Optional
10from datetime import datetime
11import asyncio
12
13
@dataclass
class DashboardState:
    """Current state of the dashboard.

    A single mutable snapshot: ObservabilityDashboard.refresh()
    overwrites these fields in place and then notifies subscribers.
    """
    # Time window the displayed data covers.
    time_range: TimeRange
    # Raw telemetry per source (logs, traces, metrics, errors, alerts).
    data: Dict[DataSource, List[Dict[str, Any]]] = field(default_factory=dict)
    # Aggregated summary metrics keyed by name (e.g. "error_rate", "throughput").
    summaries: Dict[str, AggregatedMetric] = field(default_factory=dict)
    # Health score payload produced by the health calculator.
    health: Dict[str, Any] = field(default_factory=dict)
    # Chart configurations keyed by chart name ("latency", "errors", ...).
    charts: Dict[str, ChartConfig] = field(default_factory=dict)
    # Headline stat cards shown at the top of the dashboard.
    stat_cards: List[StatCard] = field(default_factory=list)
    # Display-formatted active alerts.
    alerts: List[Dict[str, Any]] = field(default_factory=list)
    # UTC time of the last completed refresh; None before the first one.
    last_updated: Optional[datetime] = None
25
26
27class ObservabilityDashboard:
28 """Main observability dashboard for AI agents."""
29
30 def __init__(
31 self,
32 config: DashboardConfig,
33 log_store,
34 trace_store,
35 metrics_store,
36 error_store,
37 alert_manager
38 ):
39 self.config = config
40 self.state = DashboardState(
41 time_range=TimeRange.last_minutes(config.default_time_range_minutes)
42 )
43
44 # Initialize aggregator
45 self.aggregator = DataAggregator(config)
46
47 # Register collectors
48 self.aggregator.register_collector(LogCollector(log_store))
49 self.aggregator.register_collector(TraceCollector(trace_store))
50 self.aggregator.register_collector(MetricsCollector(metrics_store))
51 self.aggregator.register_collector(ErrorCollector(error_store))
52 self.aggregator.register_collector(AlertCollector(alert_manager))
53
54 # Initialize processors
55 self.metrics_aggregator = MetricsAggregator(config)
56 self.health_calculator = HealthCalculator()
57 self.alert_manager = DashboardAlertManager(
58 alert_manager.store,
59 alert_manager.notification_service
60 )
61
62 # Register default alert rules
63 for rule in DEFAULT_ALERT_RULES:
64 self.alert_manager.register_rule(rule)
65
66 # State change subscribers
67 self.subscribers: List[Callable[[DashboardState], None]] = []
68
69 # Background tasks
70 self._refresh_task: Optional[asyncio.Task] = None
71
72 def subscribe(self, callback: Callable[[DashboardState], None]):
73 """Subscribe to dashboard state changes."""
74 self.subscribers.append(callback)
75
76 async def start(self):
77 """Start the dashboard with automatic refresh."""
78
79 # Initial load
80 await self.refresh()
81
82 # Start background refresh
83 if self.config.enable_real_time:
84 self._refresh_task = asyncio.create_task(
85 self._refresh_loop()
86 )
87
88 async def stop(self):
89 """Stop the dashboard."""
90 if self._refresh_task:
91 self._refresh_task.cancel()
92 try:
93 await self._refresh_task
94 except asyncio.CancelledError:
95 pass
96
97 async def refresh(self):
98 """Refresh dashboard data."""
99
100 # Fetch all data
101 self.state.data = await self.aggregator.fetch_all(
102 self.state.time_range
103 )
104
105 # Process metrics
106 await self._process_metrics()
107
108 # Calculate health
109 await self._calculate_health()
110
111 # Generate visualizations
112 await self._generate_visualizations()
113
114 # Fetch alerts
115 await self._fetch_alerts()
116
117 # Update timestamp
118 self.state.last_updated = datetime.utcnow()
119
120 # Notify subscribers
121 self._notify_subscribers()
122
123 async def set_time_range(self, time_range: TimeRange):
124 """Change the dashboard time range."""
125 self.state.time_range = time_range
126 await self.refresh()
127
128 async def _process_metrics(self):
129 """Process raw metrics into summaries."""
130
131 metrics_data = self.state.data.get(DataSource.METRICS, [])
132
133 # Get previous period for trend calculation
134 duration = (
135 self.state.time_range.end - self.state.time_range.start
136 )
137 previous_range = TimeRange(
138 start=self.state.time_range.start - duration,
139 end=self.state.time_range.start
140 )
141
142 previous_data = await self.aggregator.fetch_all(
143 previous_range,
144 sources=[DataSource.METRICS]
145 )
146
147 self.state.summaries = self.metrics_aggregator.calculate_summary_stats(
148 metrics_data,
149 previous_data.get(DataSource.METRICS, [])
150 )
151
152 # Calculate error rate
153 error_count = sum(
154 dp["value"]
155 for m in metrics_data if m["metric_type"] == "errors"
156 for dp in m["data_points"]
157 )
158 request_count = sum(
159 dp["value"]
160 for m in metrics_data if m["metric_type"] == "requests"
161 for dp in m["data_points"]
162 )
163
164 self.state.summaries["error_rate"] = (
165 self.metrics_aggregator.calculate_error_rate(
166 error_count,
167 request_count
168 )
169 )
170
171 # Calculate throughput
172 self.state.summaries["throughput"] = (
173 self.metrics_aggregator.calculate_throughput(
174 request_count,
175 self.state.time_range
176 )
177 )
178
179 async def _calculate_health(self):
180 """Calculate system health score."""
181
182 error_rate = self.state.summaries.get(
183 "error_rate",
184 AggregatedMetric("error_rate", 0, "%")
185 ).value
186
187 latency = self.state.summaries.get(
188 "latency",
189 AggregatedMetric("latency", 0, "ms")
190 ).value
191
192 alert_summary = await self.alert_manager.get_alert_summary()
193 active_alerts = alert_summary["by_severity"]["critical"] + (
194 alert_summary["by_severity"]["error"]
195 )
196
197 self.state.health = self.health_calculator.calculate_health_score(
198 error_rate=error_rate,
199 latency_p99=latency,
200 active_alerts=active_alerts
201 )
202
203 async def _generate_visualizations(self):
204 """Generate chart configurations."""
205
206 metrics_data = self.state.data.get(DataSource.METRICS, [])
207
208 # Create charts
209 self.state.charts = {
210 "latency": VisualizationFactory.create_latency_chart(metrics_data),
211 "throughput": VisualizationFactory.create_throughput_chart(metrics_data),
212 "errors": VisualizationFactory.create_error_chart(metrics_data),
213 "tokens": VisualizationFactory.create_token_usage_chart(metrics_data)
214 }
215
216 # Create stat cards
217 self.state.stat_cards = [
218 StatCardFactory.create_health_card(self.state.health),
219 StatCardFactory.create_latency_card(
220 self.state.summaries.get(
221 "latency",
222 AggregatedMetric("latency", 0, "ms")
223 )
224 ),
225 StatCardFactory.create_error_rate_card(
226 self.state.summaries.get(
227 "error_rate",
228 AggregatedMetric("error_rate", 0, "%")
229 )
230 ),
231 StatCardFactory.create_throughput_card(
232 self.state.summaries.get(
233 "throughput",
234 AggregatedMetric("throughput", 0, "req/s")
235 )
236 )
237 ]
238
239 async def _fetch_alerts(self):
240 """Fetch and format alerts."""
241
242 alerts = await self.alert_manager.get_active_alerts()
243
244 alert_config = AlertListConfig()
245 self.state.alerts = alert_config.format_alerts(alerts)
246
247 async def _refresh_loop(self):
248 """Background refresh loop."""
249
250 while True:
251 await asyncio.sleep(self.config.refresh_interval_seconds)
252 await self.refresh()
253
254 def _notify_subscribers(self):
255 """Notify all subscribers of state change."""
256 for subscriber in self.subscribers:
257 try:
258 subscriber(self.state)
259 except Exception:
260 pass # Don't let subscriber errors break the dashboard
261
262 def get_dashboard_data(self) -> Dict[str, Any]:
263 """Get complete dashboard data for API response."""
264
265 return {
266 "timeRange": {
267 "start": self.state.time_range.start.isoformat(),
268 "end": self.state.time_range.end.isoformat()
269 },
270 "lastUpdated": (
271 self.state.last_updated.isoformat()
272 if self.state.last_updated else None
273 ),
274 "health": self.state.health,
275 "statCards": [card.to_dict() for card in self.state.stat_cards],
276 "charts": {
277 name: config.to_chartjs_config()
278 for name, config in self.state.charts.items()
279 },
280 "alerts": self.state.alerts,
281 "logs": self.state.data.get(DataSource.LOGS, [])[:50],
282 "traces": self.state.data.get(DataSource.TRACES, [])[:20],
283 "errors": self.state.data.get(DataSource.ERRORS, [])[:10]
284 }Dashboard API Endpoints
1"""
2FastAPI endpoints for the observability dashboard.
3"""
4
5from fastapi import FastAPI, HTTPException, Query
6from fastapi.responses import JSONResponse
7from datetime import datetime
8from typing import Optional
9
10
app = FastAPI(title="Agent Observability Dashboard API")

# Module-level dashboard singleton, created by the startup handler.
# Initialize dashboard (in production, use dependency injection)
dashboard: Optional[ObservabilityDashboard] = None
15
16
@app.on_event("startup")
async def startup():
    """Build the dashboard singleton and start background refresh.

    NOTE(review): ``@app.on_event`` is deprecated in recent FastAPI
    versions in favor of lifespan handlers — consider migrating.
    """
    global dashboard

    config = DashboardConfig()

    # Initialize stores (use your actual implementations)
    log_store = LogStore()
    trace_store = TraceStore()
    metrics_store = MetricsStore()
    error_store = ErrorStore()
    alert_manager = AlertManager()

    dashboard = ObservabilityDashboard(
        config=config,
        log_store=log_store,
        trace_store=trace_store,
        metrics_store=metrics_store,
        error_store=error_store,
        alert_manager=alert_manager
    )

    # start() performs the initial data load before returning.
    await dashboard.start()
41
42
@app.on_event("shutdown")
async def shutdown():
    """Stop the dashboard's background refresh task on shutdown."""
    if dashboard:
        await dashboard.stop()
48
49
@app.get("/api/dashboard")
async def get_dashboard(
    minutes: int = Query(60, ge=1, le=1440)
):
    """Get complete dashboard data for the requested time window.

    Args:
        minutes: Width of the window ending now (1 minute to 1 day).

    Raises:
        HTTPException: 503 when the dashboard is not initialized.
    """
    if not dashboard:
        raise HTTPException(status_code=503, detail="Dashboard not initialized")

    # Only trigger a (costly) full refresh when the requested window
    # WIDTH differs from the current one. Comparing a freshly built
    # TimeRange against the stored one (the previous behavior) never
    # matched, because each call captures a new utcnow() endpoint —
    # forcing a full refresh on every request.
    current = dashboard.state.time_range
    if (current.end - current.start).total_seconds() != minutes * 60:
        await dashboard.set_time_range(TimeRange.last_minutes(minutes))

    return dashboard.get_dashboard_data()
65
66
@app.get("/api/dashboard/health")
async def get_health():
    """Return the current system health score."""
    if dashboard is None:
        raise HTTPException(status_code=503, detail="Dashboard not initialized")

    return dashboard.state.health
75
76
@app.get("/api/dashboard/metrics/{metric_type}")
async def get_metric(
    metric_type: str,
    minutes: int = Query(60, ge=1, le=1440),
    aggregation: str = Query("avg")
):
    """Get one metric's time series with the requested aggregation.

    Raises:
        HTTPException: 503 before initialization, 400 for an unknown
            aggregation, 404 for an unknown metric type.
    """
    if not dashboard:
        raise HTTPException(status_code=503, detail="Dashboard not initialized")

    # Validate the aggregation up front: an unknown value previously
    # raised ValueError from the enum constructor and surfaced as a 500.
    try:
        agg_type = AggregationType(aggregation)
    except ValueError:
        raise HTTPException(
            status_code=400,
            detail=f"Unknown aggregation '{aggregation}'"
        )

    metrics_data = dashboard.state.data.get(DataSource.METRICS, [])

    metric = next(
        (m for m in metrics_data if m["metric_type"] == metric_type),
        None
    )

    if not metric:
        raise HTTPException(status_code=404, detail=f"Metric {metric_type} not found")

    aggregated = dashboard.metrics_aggregator.aggregate_time_series(
        metric["data_points"],
        agg_type,
        interval_seconds=60
    )

    return {
        "metric_type": metric_type,
        "aggregation": aggregation,
        "data_points": aggregated
    }
110
111
@app.get("/api/dashboard/alerts")
async def get_alerts():
    """Return active alerts plus counts by severity and status."""
    if dashboard is None:
        raise HTTPException(status_code=503, detail="Dashboard not initialized")

    summary = await dashboard.alert_manager.get_alert_summary()
    return {"alerts": dashboard.state.alerts, "summary": summary}
123
124
@app.post("/api/dashboard/alerts/{alert_id}/acknowledge")
async def acknowledge_alert(
    alert_id: str,
    user: str = Query(...),
    comment: Optional[str] = None
):
    """Acknowledge a firing alert on behalf of ``user``."""
    if dashboard is None:
        raise HTTPException(status_code=503, detail="Dashboard not initialized")

    await dashboard.alert_manager.acknowledge_alert(
        alert_id=alert_id,
        user=user,
        comment=comment,
    )
    return {"status": "acknowledged"}
143
144
@app.post("/api/dashboard/alerts/{alert_id}/silence")
async def silence_alert(
    alert_id: str,
    duration_minutes: int = Query(..., ge=5, le=1440),
    user: str = Query(...)
):
    """Silence an alert for ``duration_minutes`` (5 minutes to 1 day)."""
    if dashboard is None:
        raise HTTPException(status_code=503, detail="Dashboard not initialized")

    await dashboard.alert_manager.silence_alert(
        alert_id=alert_id,
        duration_minutes=duration_minutes,
        user=user,
    )
    return {"status": "silenced", "duration_minutes": duration_minutes}
163
164
@app.get("/api/dashboard/logs")
async def get_logs(
    level: Optional[str] = None,
    agent_id: Optional[str] = None,
    limit: int = Query(100, ge=1, le=1000)
):
    """Return recent log entries, optionally filtered by level and agent."""
    if dashboard is None:
        raise HTTPException(status_code=503, detail="Dashboard not initialized")

    entries = dashboard.state.data.get(DataSource.LOGS, [])

    if level:
        entries = [entry for entry in entries if entry["level"] == level]
    if agent_id:
        entries = [entry for entry in entries if entry["agent_id"] == agent_id]

    return {"logs": entries[:limit]}
185
186
@app.get("/api/dashboard/traces")
async def get_traces(
    status: Optional[str] = None,
    min_duration_ms: Optional[int] = None,
    limit: int = Query(50, ge=1, le=200)
):
    """Get trace data, optionally filtered by status and minimum duration."""
    if not dashboard:
        raise HTTPException(status_code=503, detail="Dashboard not initialized")

    traces = dashboard.state.data.get(DataSource.TRACES, [])

    if status:
        traces = [t for t in traces if t["status"] == status]
    # Explicit None check: a bare truthiness test would silently treat
    # an explicit min_duration_ms=0 as "not provided".
    if min_duration_ms is not None:
        traces = [t for t in traces if t["duration_ms"] >= min_duration_ms]

    return {"traces": traces[:limit]}
207
208
209@app.get("/api/dashboard/traces/{trace_id}")
210async def get_trace_detail(trace_id: str):
211 """Get detailed trace information."""
212
213 if not dashboard:
214 raise HTTPException(status_code=503, detail="Dashboard not initialized")
215
216 traces = dashboard.state.data.get(DataSource.TRACES, [])
217
218 trace = next((t for t in traces if t["trace_id"] == trace_id), None)
219
220 if not trace:
221 raise HTTPException(status_code=404, detail=f"Trace {trace_id} not found")
222
223 return traceDashboard Deployment
Deploy the dashboard as a containerized service with proper configuration for production use:
# docker-compose.yml for the observability dashboard

version: "3.8"

services:
  # REST API backing the dashboard (the FastAPI service).
  dashboard-api:
    build:
      context: .
      dockerfile: Dockerfile
    ports:
      - "8080:8080"
    environment:
      # Telemetry backends the collectors read from.
      - LOG_STORE_URL=http://loki:3100
      - TRACE_STORE_URL=http://jaeger:14268
      - METRICS_STORE_URL=http://prometheus:9090
      - REDIS_URL=redis://redis:6379
      # Dashboard tuning, read by ProductionConfig.from_environment().
      - DASHBOARD_REFRESH_INTERVAL=30
      - DASHBOARD_DEFAULT_TIME_RANGE=60
    depends_on:
      - redis
      - loki
      - prometheus
      - jaeger
    healthcheck:
      # Probes the /health endpoint exposed by the API service.
      test: ["CMD", "curl", "-f", "http://localhost:8080/health"]
      interval: 30s
      timeout: 10s
      retries: 3

  # Web UI; reaches the API over the compose network.
  dashboard-frontend:
    build:
      context: ./frontend
      dockerfile: Dockerfile
    ports:
      - "3000:3000"
    environment:
      - API_URL=http://dashboard-api:8080
    depends_on:
      - dashboard-api

  # Cache backend.
  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"
    volumes:
      - redis-data:/data

  # Log aggregation backend.
  loki:
    image: grafana/loki:2.9.0
    ports:
      - "3100:3100"
    volumes:
      - loki-data:/loki
    command: -config.file=/etc/loki/local-config.yaml

  # Metrics backend.
  prometheus:
    image: prom/prometheus:v2.47.0
    ports:
      - "9090:9090"
    volumes:
      - ./prometheus.yml:/etc/prometheus/prometheus.yml
      - prometheus-data:/prometheus

  # Distributed tracing backend (collector on 14268, UI on 16686).
  jaeger:
    image: jaegertracing/all-in-one:1.50
    ports:
      - "14268:14268"
      - "16686:16686"
    environment:
      - COLLECTOR_OTLP_ENABLED=true

volumes:
  redis-data:
  loki-data:
  prometheus-data:
Production Configuration
1"""
2Production configuration for the observability dashboard.
3"""
4
5import os
6from dataclasses import dataclass
7
8
@dataclass
class ProductionConfig(DashboardConfig):
    """Production-optimized dashboard configuration."""

    # Performance
    refresh_interval_seconds: int = 30
    max_data_points: int = 500
    aggregation_interval_seconds: int = 60
    batch_size: int = 100

    # Caching
    cache_ttl_seconds: int = 30
    enable_query_cache: bool = True

    # Data retention
    log_retention_hours: int = 24
    trace_retention_hours: int = 72
    metric_retention_hours: int = 168  # 7 days

    # Rate limiting
    max_requests_per_minute: int = 120

    @classmethod
    def from_environment(cls) -> "ProductionConfig":
        """Load configuration from environment variables.

        Previously only three fields were read from the environment;
        caching and rate-limit knobs are now configurable too. Every
        variable falls back to the field default, so a missing variable
        never fails startup.
        """
        return cls(
            refresh_interval_seconds=int(
                os.getenv("DASHBOARD_REFRESH_INTERVAL", "30")
            ),
            default_time_range_minutes=int(
                os.getenv("DASHBOARD_DEFAULT_TIME_RANGE", "60")
            ),
            max_data_points=int(
                os.getenv("DASHBOARD_MAX_DATA_POINTS", "500")
            ),
            cache_ttl_seconds=int(
                os.getenv("DASHBOARD_CACHE_TTL_SECONDS", "30")
            ),
            max_requests_per_minute=int(
                os.getenv("DASHBOARD_MAX_REQUESTS_PER_MINUTE", "120")
            ),
            enable_real_time=os.getenv(
                "DASHBOARD_ENABLE_REALTIME", "true"
            ).lower() == "true"
        )
48
49
50# Health check endpoint
51@app.get("/health")
52async def health_check():
53 """Health check endpoint for load balancers."""
54
55 if not dashboard:
56 return JSONResponse(
57 status_code=503,
58 content={"status": "unhealthy", "reason": "Dashboard not initialized"}
59 )
60
61 # Check component health
62 checks = {
63 "dashboard": "healthy",
64 "last_refresh": (
65 dashboard.state.last_updated.isoformat()
66 if dashboard.state.last_updated else None
67 ),
68 "data_sources": {
69 source.value: len(data) > 0
70 for source, data in dashboard.state.data.items()
71 }
72 }
73
74 all_healthy = all(checks["data_sources"].values())
75
76 return JSONResponse(
77 status_code=200 if all_healthy else 503,
78 content={
79 "status": "healthy" if all_healthy else "degraded",
80 "checks": checks
81 }
82 )Production Best Practices: Use environment variables for configuration, implement health checks for load balancers, and configure appropriate data retention policies to balance observability with storage costs.
Summary
In this capstone section, we built a comprehensive observability dashboard that integrates all the monitoring capabilities covered in this chapter:
| Component | Purpose | Key Implementation |
|---|---|---|
| Data Collection | Gather telemetry from sources | TelemetryCollector abstraction |
| Aggregation | Compute statistics and trends | MetricsAggregator with time bucketing |
| Visualization | Render charts and stats | ChartConfig and StatCard factories |
| Alerting | Monitor and notify | DashboardAlertManager with rules |
| API | Expose dashboard data | FastAPI endpoints with filtering |
What You've Learned: How to design a layered dashboard architecture, implement collectors for multiple data sources, aggregate metrics with trend analysis, create visualization configurations, integrate alerting, and deploy the dashboard for production use.
The observability dashboard provides operators with a unified view of agent behavior, enabling them to quickly identify issues, understand their root causes, and take corrective action. By combining logs, traces, metrics, and alerts into a single interface, the dashboard transforms raw telemetry into actionable insights.
In the next chapter, we'll explore evaluation and benchmarking techniques to measure and improve agent performance systematically.