Chapter 19

Building an Observability Dashboard

Observability and Debugging

Introduction

In this capstone section, we bring together everything we've learned about observability to build a comprehensive monitoring dashboard for AI agents. The dashboard integrates logging, tracing, performance metrics, and error analysis into a unified view that lets operators understand, debug, and optimize agent behavior in real time.

What You'll Build: A complete observability dashboard with real-time metrics, trace visualization, performance charts, error tracking, and intelligent alerting—everything you need to monitor production AI agents.

A well-designed observability dashboard transforms raw telemetry data into actionable insights. Rather than drowning in logs and metrics, operators can quickly identify issues, understand their root causes, and take corrective action.


Dashboard Architecture

Our observability dashboard follows a layered architecture that separates data collection, processing, storage, and visualization concerns:

```python
"""
Observability Dashboard Architecture

This module defines the core architecture for the agent observability
dashboard, integrating all telemetry sources into a unified view.
"""

from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from enum import Enum
from typing import Any, Dict, List, Optional
import asyncio


class DataSource(Enum):
    """Types of telemetry data sources."""
    LOGS = "logs"
    TRACES = "traces"
    METRICS = "metrics"
    ERRORS = "errors"
    ALERTS = "alerts"


@dataclass
class TimeRange:
    """Time range for queries."""
    start: datetime
    end: datetime

    @classmethod
    def last_minutes(cls, minutes: int) -> "TimeRange":
        end = datetime.utcnow()
        start = end - timedelta(minutes=minutes)
        return cls(start=start, end=end)

    @classmethod
    def last_hours(cls, hours: int) -> "TimeRange":
        end = datetime.utcnow()
        start = end - timedelta(hours=hours)
        return cls(start=start, end=end)


@dataclass
class DashboardConfig:
    """Configuration for the observability dashboard."""
    refresh_interval_seconds: int = 30
    default_time_range_minutes: int = 60
    max_data_points: int = 1000
    enable_real_time: bool = True
    alert_polling_interval: int = 10

    # Data retention
    log_retention_hours: int = 24
    trace_retention_hours: int = 72
    metric_retention_hours: int = 168  # 7 days

    # Performance settings
    aggregation_interval_seconds: int = 60
    batch_size: int = 100


class TelemetryCollector(ABC):
    """Abstract base class for telemetry collectors."""

    @abstractmethod
    async def collect(self, time_range: TimeRange) -> List[Dict[str, Any]]:
        """Collect telemetry data within the time range."""
        pass

    @abstractmethod
    def get_source_type(self) -> DataSource:
        """Return the type of data this collector handles."""
        pass


class DataAggregator:
    """Aggregates telemetry data from multiple sources."""

    def __init__(self, config: DashboardConfig):
        self.config = config
        self.collectors: Dict[DataSource, TelemetryCollector] = {}
        self.cache: Dict[str, Any] = {}
        self.cache_ttl: Dict[str, datetime] = {}

    def register_collector(self, collector: TelemetryCollector):
        """Register a telemetry collector."""
        self.collectors[collector.get_source_type()] = collector

    async def fetch_all(
        self,
        time_range: TimeRange,
        sources: Optional[List[DataSource]] = None
    ) -> Dict[DataSource, List[Dict[str, Any]]]:
        """Fetch data from all or specified sources."""

        if sources is None:
            sources = list(self.collectors.keys())

        # Keep only sources with a registered collector, and remember the
        # filtered list so results can be zipped back to the right source.
        active_sources = [s for s in sources if s in self.collectors]
        tasks = [self._fetch_source(s, time_range) for s in active_sources]

        gathered = await asyncio.gather(*tasks, return_exceptions=True)

        results: Dict[DataSource, List[Dict[str, Any]]] = {}
        for source, data in zip(active_sources, gathered):
            # A failing collector degrades to an empty result instead of
            # breaking the whole dashboard refresh.
            results[source] = [] if isinstance(data, Exception) else data

        return results

    async def _fetch_source(
        self,
        source: DataSource,
        time_range: TimeRange
    ) -> List[Dict[str, Any]]:
        """Fetch data from a single source with caching."""

        cache_key = f"{source.value}:{time_range.start}:{time_range.end}"

        # Check cache
        if cache_key in self.cache:
            if datetime.utcnow() < self.cache_ttl.get(cache_key, datetime.min):
                return self.cache[cache_key]

        # Fetch fresh data
        collector = self.collectors[source]
        data = await collector.collect(time_range)

        # Update cache
        self.cache[cache_key] = data
        self.cache_ttl[cache_key] = datetime.utcnow() + timedelta(
            seconds=self.config.refresh_interval_seconds
        )

        return data


@dataclass
class DashboardPanel:
    """Represents a single panel in the dashboard."""
    id: str
    title: str
    panel_type: str  # "chart", "table", "stat", "log"
    data_source: DataSource
    query: Dict[str, Any]
    width: int = 6  # Grid units (1-12)
    height: int = 4  # Grid units
    refresh_interval: Optional[int] = None


@dataclass
class DashboardLayout:
    """Defines the layout of dashboard panels."""
    panels: List[DashboardPanel] = field(default_factory=list)
    columns: int = 12

    def add_panel(self, panel: DashboardPanel):
        """Add a panel to the layout."""
        self.panels.append(panel)

    def to_grid(self) -> List[List[DashboardPanel]]:
        """Convert panels to a grid layout by greedy row packing."""
        grid = []
        current_row = []
        current_width = 0

        for panel in self.panels:
            if current_width + panel.width > self.columns:
                grid.append(current_row)
                current_row = []
                current_width = 0

            current_row.append(panel)
            current_width += panel.width

        if current_row:
            grid.append(current_row)

        return grid
```
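The row-packing behavior of `to_grid` is easy to check in isolation. Here is a condensed, self-contained version of the same greedy loop, with illustrative panel ids and widths:

```python
from dataclasses import dataclass
from typing import List


@dataclass
class Panel:
    id: str
    width: int  # grid units out of 12


def pack_rows(panels: List[Panel], columns: int = 12) -> List[List[Panel]]:
    """Greedy row packing: start a new row when a panel would overflow."""
    grid: List[List[Panel]] = []
    row: List[Panel] = []
    used = 0
    for p in panels:
        if used + p.width > columns:
            grid.append(row)
            row, used = [], 0
        row.append(p)
        used += p.width
    if row:
        grid.append(row)
    return grid


rows = pack_rows([Panel("health", 4), Panel("latency", 6), Panel("errors", 6)])
print([[p.id for p in r] for r in rows])  # [['health', 'latency'], ['errors']]
```

The first two panels fill 10 of 12 units; the third would overflow, so it wraps onto a new row. A production layout engine would also account for panel heights, but width-only packing keeps the algorithm simple.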

Core Components

| Component | Responsibility | Key Features |
| --- | --- | --- |
| TelemetryCollector | Gather raw data from sources | Async collection, source abstraction |
| DataAggregator | Combine and cache data | Multi-source aggregation, caching |
| DashboardLayout | Organize visual panels | Grid system, responsive layout |
| AlertManager | Handle alerting logic | Threshold monitoring, notifications |
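The fault isolation in `DataAggregator` comes from `asyncio.gather(..., return_exceptions=True)`: one failing collector degrades to an empty panel rather than failing the whole refresh. A minimal, self-contained sketch of that pattern (the stub collectors and source names are illustrative):

```python
import asyncio
from typing import Any, Dict, List


async def collect_logs() -> List[Dict[str, Any]]:
    return [{"level": "INFO", "message": "agent started"}]


async def collect_traces() -> List[Dict[str, Any]]:
    # Simulate an unavailable backend.
    raise ConnectionError("trace store unreachable")


async def fetch_all() -> Dict[str, List[Dict[str, Any]]]:
    sources = {"logs": collect_logs(), "traces": collect_traces()}
    # return_exceptions=True turns raised errors into return values,
    # so one bad source cannot cancel the others.
    gathered = await asyncio.gather(*sources.values(), return_exceptions=True)
    return {
        name: [] if isinstance(data, Exception) else data
        for name, data in zip(sources.keys(), gathered)
    }


results = asyncio.run(fetch_all())
print(results["traces"])  # the failed source degrades to []
```

Without `return_exceptions=True`, the first raised exception would propagate out of `gather` and abort the entire dashboard refresh.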

Data Collection Layer

The data collection layer implements collectors for each telemetry source. These collectors transform raw data into a normalized format suitable for dashboard visualization:

```python
"""
Telemetry collectors for the observability dashboard.

Each collector fetches data from its respective source and
normalizes it for dashboard consumption.

TelemetryCollector, DataSource, and TimeRange come from the
architecture module defined above.
"""

from typing import Any, Dict, List


class LogCollector(TelemetryCollector):
    """Collects log entries from the logging system."""

    def __init__(self, log_store):
        self.log_store = log_store

    def get_source_type(self) -> DataSource:
        return DataSource.LOGS

    async def collect(self, time_range: TimeRange) -> List[Dict[str, Any]]:
        """Collect logs within the time range."""

        logs = await self.log_store.query(
            start_time=time_range.start,
            end_time=time_range.end,
            limit=1000
        )

        return [
            {
                "timestamp": log.timestamp.isoformat(),
                "level": log.level,
                "message": log.message,
                "agent_id": log.extra.get("agent_id"),
                "trace_id": log.extra.get("trace_id"),
                "attributes": log.extra
            }
            for log in logs
        ]


class TraceCollector(TelemetryCollector):
    """Collects distributed traces from the tracing system."""

    def __init__(self, trace_store):
        self.trace_store = trace_store

    def get_source_type(self) -> DataSource:
        return DataSource.TRACES

    async def collect(self, time_range: TimeRange) -> List[Dict[str, Any]]:
        """Collect traces within the time range."""

        traces = await self.trace_store.query_traces(
            start_time=time_range.start,
            end_time=time_range.end
        )

        return [
            {
                "trace_id": trace.trace_id,
                "root_span": trace.root_span.name,
                "start_time": trace.start_time.isoformat(),
                "duration_ms": trace.duration_ms,
                "span_count": len(trace.spans),
                "status": trace.status,
                "agent_id": trace.attributes.get("agent_id"),
                "spans": [
                    {
                        "span_id": span.span_id,
                        "name": span.name,
                        "duration_ms": span.duration_ms,
                        "status": span.status
                    }
                    for span in trace.spans
                ]
            }
            for trace in traces
        ]


class MetricsCollector(TelemetryCollector):
    """Collects metrics from the metrics system."""

    def __init__(self, metrics_store):
        self.metrics_store = metrics_store

    def get_source_type(self) -> DataSource:
        return DataSource.METRICS

    async def collect(self, time_range: TimeRange) -> List[Dict[str, Any]]:
        """Collect metrics within the time range."""

        # Fetch various metric types
        metrics = {}

        # Request latency
        metrics["latency"] = await self.metrics_store.query(
            metric_name="agent.request.latency",
            start_time=time_range.start,
            end_time=time_range.end,
            aggregation="avg",
            interval="1m"
        )

        # Token usage
        metrics["tokens"] = await self.metrics_store.query(
            metric_name="agent.tokens.total",
            start_time=time_range.start,
            end_time=time_range.end,
            aggregation="sum",
            interval="1m"
        )

        # Error count
        metrics["errors"] = await self.metrics_store.query(
            metric_name="agent.errors.count",
            start_time=time_range.start,
            end_time=time_range.end,
            aggregation="sum",
            interval="1m"
        )

        # Request count
        metrics["requests"] = await self.metrics_store.query(
            metric_name="agent.requests.count",
            start_time=time_range.start,
            end_time=time_range.end,
            aggregation="sum",
            interval="1m"
        )

        return [
            {
                "metric_type": metric_type,
                "data_points": [
                    {
                        "timestamp": dp.timestamp.isoformat(),
                        "value": dp.value
                    }
                    for dp in data_points
                ]
            }
            for metric_type, data_points in metrics.items()
        ]


class ErrorCollector(TelemetryCollector):
    """Collects error events from the error tracking system."""

    def __init__(self, error_store):
        self.error_store = error_store

    def get_source_type(self) -> DataSource:
        return DataSource.ERRORS

    async def collect(self, time_range: TimeRange) -> List[Dict[str, Any]]:
        """Collect errors within the time range."""

        errors = await self.error_store.query(
            start_time=time_range.start,
            end_time=time_range.end
        )

        # Group errors by type and message prefix
        grouped = {}
        for error in errors:
            key = f"{error.error_type}:{error.message[:50]}"
            if key not in grouped:
                grouped[key] = {
                    "error_type": error.error_type,
                    "message": error.message,
                    "count": 0,
                    "first_seen": error.timestamp,
                    "last_seen": error.timestamp,
                    "sample_trace_id": error.trace_id,
                    "severity": error.severity
                }

            grouped[key]["count"] += 1
            grouped[key]["last_seen"] = max(
                grouped[key]["last_seen"],
                error.timestamp
            )

        return [
            {
                **error_group,
                "first_seen": error_group["first_seen"].isoformat(),
                "last_seen": error_group["last_seen"].isoformat()
            }
            for error_group in sorted(
                grouped.values(),
                key=lambda x: x["count"],
                reverse=True
            )
        ]


class AlertCollector(TelemetryCollector):
    """Collects active and recent alerts."""

    def __init__(self, alert_manager):
        self.alert_manager = alert_manager

    def get_source_type(self) -> DataSource:
        return DataSource.ALERTS

    async def collect(self, time_range: TimeRange) -> List[Dict[str, Any]]:
        """Collect alerts within the time range."""

        # Get active alerts
        active = await self.alert_manager.get_active_alerts()

        # Get recently resolved alerts
        resolved = await self.alert_manager.get_resolved_alerts(
            since=time_range.start
        )

        return [
            {
                "alert_id": alert.alert_id,
                "name": alert.name,
                "severity": alert.severity.value,
                "status": alert.status.value,
                "triggered_at": alert.triggered_at.isoformat(),
                "resolved_at": (
                    alert.resolved_at.isoformat()
                    if alert.resolved_at else None
                ),
                "message": alert.message,
                "labels": alert.labels
            }
            for alert in active + resolved
        ]
```

Normalization Matters: Collectors transform source-specific data formats into a consistent structure. This allows dashboard components to work with any data source without knowing implementation details.
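The grouping step in `ErrorCollector` (key on error type plus a message prefix, then count occurrences and track the latest timestamp) can be exercised on its own. A condensed sketch with synthetic error records:

```python
from datetime import datetime
from typing import Any, Dict, List


def group_errors(errors: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Collapse raw error events into per-type groups, most frequent first."""
    grouped: Dict[str, Dict[str, Any]] = {}
    for err in errors:
        # The 50-character prefix merges near-identical messages
        # (e.g. ones that differ only in an embedded request id).
        key = f"{err['error_type']}:{err['message'][:50]}"
        group = grouped.setdefault(key, {
            "error_type": err["error_type"],
            "message": err["message"],
            "count": 0,
            "last_seen": err["timestamp"],
        })
        group["count"] += 1
        group["last_seen"] = max(group["last_seen"], err["timestamp"])
    return sorted(grouped.values(), key=lambda g: g["count"], reverse=True)


events = [
    {"error_type": "Timeout", "message": "LLM call timed out",
     "timestamp": datetime(2024, 1, 1, 12, 0)},
    {"error_type": "Timeout", "message": "LLM call timed out",
     "timestamp": datetime(2024, 1, 1, 12, 5)},
    {"error_type": "RateLimit", "message": "429 from provider",
     "timestamp": datetime(2024, 1, 1, 12, 1)},
]

top = group_errors(events)[0]
print(top["error_type"], top["count"])  # Timeout 2
```

Sorting by count puts the most frequent error group first, which is what an operator scanning an error panel usually wants.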

Metrics Aggregation

Raw telemetry data must be aggregated into meaningful summaries. The aggregation layer computes statistics, calculates trends, and prepares data for visualization:

```python
"""
Metrics aggregation for dashboard visualization.

This module transforms raw telemetry data into aggregated
statistics suitable for charts and summary displays.

TimeRange and DashboardConfig come from the architecture
module defined above.
"""

from collections import defaultdict
from dataclasses import dataclass
from datetime import datetime
from enum import Enum
from typing import Any, Dict, List, Optional
import statistics


class AggregationType(Enum):
    """Types of aggregation operations."""
    SUM = "sum"
    AVG = "avg"
    MIN = "min"
    MAX = "max"
    COUNT = "count"
    P50 = "p50"
    P90 = "p90"
    P99 = "p99"
    RATE = "rate"


@dataclass
class AggregatedMetric:
    """Result of metric aggregation."""
    name: str
    value: float
    unit: str
    trend: Optional[float] = None  # Percentage change
    trend_direction: Optional[str] = None  # "up", "down", "stable"
    time_range: Optional[TimeRange] = None


class MetricsAggregator:
    """Aggregates raw metrics into dashboard-ready summaries."""

    def __init__(self, config: DashboardConfig):
        self.config = config

    def aggregate_time_series(
        self,
        data_points: List[Dict[str, Any]],
        aggregation: AggregationType,
        interval_seconds: int = 60
    ) -> List[Dict[str, Any]]:
        """Aggregate time series data into fixed intervals."""

        if not data_points:
            return []

        # Bucket on epoch seconds so any interval length works,
        # including intervals that do not divide evenly into an hour.
        buckets = defaultdict(list)
        for dp in data_points:
            ts = datetime.fromisoformat(dp["timestamp"])
            epoch = int(ts.timestamp())
            bucket_ts = datetime.fromtimestamp(epoch - epoch % interval_seconds)
            buckets[bucket_ts].append(dp["value"])

        # Aggregate each bucket
        result = []
        for ts, values in sorted(buckets.items()):
            agg_value = self._apply_aggregation(values, aggregation)
            result.append({
                "timestamp": ts.isoformat(),
                "value": agg_value
            })

        return result

    def _apply_aggregation(
        self,
        values: List[float],
        aggregation: AggregationType
    ) -> float:
        """Apply aggregation function to values."""

        if not values:
            return 0.0

        if aggregation == AggregationType.SUM:
            return sum(values)
        elif aggregation == AggregationType.AVG:
            return statistics.mean(values)
        elif aggregation == AggregationType.MIN:
            return min(values)
        elif aggregation == AggregationType.MAX:
            return max(values)
        elif aggregation == AggregationType.COUNT:
            return len(values)
        elif aggregation == AggregationType.P50:
            return self._percentile(values, 50)
        elif aggregation == AggregationType.P90:
            return self._percentile(values, 90)
        elif aggregation == AggregationType.P99:
            return self._percentile(values, 99)
        elif aggregation == AggregationType.RATE:
            return len(values) / 60  # events per second, assuming 60s buckets

        return 0.0

    def _percentile(self, values: List[float], percentile: int) -> float:
        """Calculate percentile of values (nearest-rank)."""
        sorted_values = sorted(values)
        index = int(len(sorted_values) * percentile / 100)
        return sorted_values[min(index, len(sorted_values) - 1)]

    def calculate_summary_stats(
        self,
        metrics_data: List[Dict[str, Any]],
        previous_data: Optional[List[Dict[str, Any]]] = None
    ) -> Dict[str, AggregatedMetric]:
        """Calculate summary statistics with trend analysis."""

        summaries = {}

        for metric in metrics_data:
            metric_type = metric["metric_type"]
            data_points = metric["data_points"]

            if not data_points:
                continue

            values = [dp["value"] for dp in data_points]

            # Calculate current stats
            current_avg = statistics.mean(values) if values else 0
            current_sum = sum(values)

            # Calculate trend if previous data available
            trend = None
            trend_direction = None

            if previous_data:
                prev_metric = next(
                    (m for m in previous_data if m["metric_type"] == metric_type),
                    None
                )
                if prev_metric:
                    prev_values = [dp["value"] for dp in prev_metric["data_points"]]
                    if prev_values:
                        prev_avg = statistics.mean(prev_values)
                        if prev_avg > 0:
                            trend = ((current_avg - prev_avg) / prev_avg) * 100
                            if trend > 5:
                                trend_direction = "up"
                            elif trend < -5:
                                trend_direction = "down"
                            else:
                                trend_direction = "stable"

            summaries[metric_type] = AggregatedMetric(
                name=metric_type,
                value=current_avg if metric_type == "latency" else current_sum,
                unit="ms" if metric_type == "latency" else "count",
                trend=trend,
                trend_direction=trend_direction
            )

        return summaries

    def calculate_error_rate(
        self,
        error_count: int,
        request_count: int
    ) -> AggregatedMetric:
        """Calculate error rate as a percentage."""

        rate = (error_count / request_count * 100) if request_count > 0 else 0

        return AggregatedMetric(
            name="error_rate",
            value=round(rate, 2),
            unit="%"
        )

    def calculate_throughput(
        self,
        request_count: int,
        time_range: TimeRange
    ) -> AggregatedMetric:
        """Calculate requests per second."""

        duration_seconds = (time_range.end - time_range.start).total_seconds()
        rps = request_count / duration_seconds if duration_seconds > 0 else 0

        return AggregatedMetric(
            name="throughput",
            value=round(rps, 2),
            unit="req/s",
            time_range=time_range
        )


class HealthCalculator:
    """Calculates overall system health from metrics."""

    def __init__(self):
        self.thresholds = {
            "error_rate": {"warning": 1.0, "critical": 5.0},
            "latency_p99": {"warning": 1000, "critical": 5000},
            "active_alerts": {"warning": 1, "critical": 3}
        }

    def calculate_health_score(
        self,
        error_rate: float,
        latency_p99: float,
        active_alerts: int
    ) -> Dict[str, Any]:
        """Calculate overall health score (0-100)."""

        scores = []
        issues = []

        # Error rate contribution (40%)
        error_score = self._score_metric(
            error_rate,
            self.thresholds["error_rate"]
        )
        scores.append(error_score * 0.4)
        if error_score < 70:
            issues.append(f"High error rate: {error_rate:.2f}%")

        # Latency contribution (40%)
        latency_score = self._score_metric(
            latency_p99,
            self.thresholds["latency_p99"]
        )
        scores.append(latency_score * 0.4)
        if latency_score < 70:
            issues.append(f"High latency: {latency_p99:.0f}ms (p99)")

        # Alert contribution (20%)
        alert_score = self._score_metric(
            active_alerts,
            self.thresholds["active_alerts"]
        )
        scores.append(alert_score * 0.2)
        if alert_score < 70:
            issues.append(f"Active alerts: {active_alerts}")

        total_score = sum(scores)

        # Determine status
        if total_score >= 90:
            status = "healthy"
        elif total_score >= 70:
            status = "degraded"
        else:
            status = "critical"

        return {
            "score": round(total_score),
            "status": status,
            "issues": issues,
            "components": {
                "error_rate": round(error_score),
                "latency": round(latency_score),
                "alerts": round(alert_score)
            }
        }

    def _score_metric(
        self,
        value: float,
        thresholds: Dict[str, float]
    ) -> float:
        """Score a metric based on thresholds (0-100)."""

        if value <= thresholds["warning"]:
            return 100
        elif value <= thresholds["critical"]:
            # Linear interpolation between warning and critical
            range_size = thresholds["critical"] - thresholds["warning"]
            excess = value - thresholds["warning"]
            return 100 - (excess / range_size * 50)  # 100 -> 50
        else:
            # Beyond critical, score decreases more rapidly
            excess = value - thresholds["critical"]
            return max(0, 50 - (excess / thresholds["critical"] * 50))
```
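To make the weighting concrete, here is the threshold-interpolation scoring applied to one sample reading, using the same thresholds as `HealthCalculator` above (the reading itself is invented for illustration):

```python
def score_metric(value: float, warning: float, critical: float) -> float:
    """100 down to 50 between warning and critical, then toward 0 beyond."""
    if value <= warning:
        return 100.0
    if value <= critical:
        return 100.0 - (value - warning) / (critical - warning) * 50.0
    return max(0.0, 50.0 - (value - critical) / critical * 50.0)


# Sample reading: 3% error rate, 800 ms p99 latency, no active alerts.
error_score = score_metric(3.0, warning=1.0, critical=5.0)      # 75.0
latency_score = score_metric(800, warning=1000, critical=5000)  # 100.0
alert_score = score_metric(0, warning=1, critical=3)            # 100.0

# Weighted blend: errors 40%, latency 40%, alerts 20%.
total = error_score * 0.4 + latency_score * 0.4 + alert_score * 0.2
print(round(total, 1))  # 90.0 -> "healthy", right at the threshold
```

Note how a 3% error rate alone drags a perfectly fast, alert-free system to the edge of "degraded"; the 40% weight on errors encodes the judgment that correctness problems matter more than slowness.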

Key Aggregation Patterns

  • Time bucketing: Group data points into fixed intervals
  • Trend analysis: Compare current metrics to previous periods
  • Health scoring: Combine multiple metrics into overall status
  • Percentile calculation: Understand latency distribution
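The first and last patterns can be sketched together: bucket raw samples into 60-second windows, then take nearest-rank percentiles of the bucketed latencies. The sample values below are synthetic:

```python
from collections import defaultdict
from datetime import datetime
from typing import Dict, List, Tuple


def bucket_60s(samples: List[Tuple[datetime, float]]) -> Dict[int, List[float]]:
    """Group (timestamp, value) samples into 60-second windows."""
    buckets: Dict[int, List[float]] = defaultdict(list)
    for ts, value in samples:
        epoch = int(ts.timestamp())
        buckets[epoch - epoch % 60].append(value)
    return dict(buckets)


def percentile(values: List[float], pct: int) -> float:
    """Nearest-rank percentile over a sorted copy of the values."""
    ordered = sorted(values)
    index = min(int(len(ordered) * pct / 100), len(ordered) - 1)
    return ordered[index]


base = datetime(2024, 1, 1, 12, 0, 0)
samples = [
    (base.replace(second=5), 120.0),   # falls in the 12:00 bucket
    (base.replace(second=40), 180.0),  # same bucket
    (base.replace(minute=1, second=10), 950.0),  # next bucket
]

buckets = bucket_60s(samples)
latencies = [v for vals in buckets.values() for v in vals]
print(len(buckets), percentile(latencies, 99))
```

The p99 here (950.0 ms) is dominated by the single slow sample even though the average is far lower, which is exactly why dashboards chart percentiles rather than means for latency.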

Visualization Components

Visualization components render aggregated data as charts, tables, and status indicators. These components are designed to be reusable and responsive:

```python
"""
Visualization components for the observability dashboard.

These components generate the data structures needed for
frontend chart libraries (e.g., Chart.js, Recharts).
"""

from dataclasses import dataclass
from typing import Any, Dict, List, Optional
from enum import Enum


class ChartType(Enum):
    """Types of charts available."""
    LINE = "line"
    BAR = "bar"
    AREA = "area"
    PIE = "pie"
    SCATTER = "scatter"
    HEATMAP = "heatmap"


@dataclass
class ChartSeries:
    """A single data series for a chart."""
    name: str
    data: List[Dict[str, Any]]
    color: Optional[str] = None
    type: ChartType = ChartType.LINE


@dataclass
class ChartConfig:
    """Configuration for a chart visualization."""
    title: str
    series: List[ChartSeries]
    x_axis_label: str = "Time"
    y_axis_label: str = "Value"
    show_legend: bool = True
    show_grid: bool = True
    height: int = 300

    def to_chartjs_config(self) -> Dict[str, Any]:
        """Convert to Chart.js configuration."""

        datasets = []
        labels = set()

        for series in self.series:
            for point in series.data:
                labels.add(point["timestamp"])

            datasets.append({
                "label": series.name,
                # Assumes all series share the same timestamps, so point
                # order lines up with the sorted label axis.
                "data": [point["value"] for point in series.data],
                "borderColor": series.color or self._default_color(len(datasets)),
                "fill": series.type == ChartType.AREA,
                "tension": 0.4
            })

        # Chart.js has no "area" type: an area chart is a line chart
        # with fill enabled.
        base_type = self.series[0].type if self.series else ChartType.LINE
        chart_type = "line" if base_type == ChartType.AREA else base_type.value

        return {
            "type": chart_type,
            "data": {
                "labels": sorted(labels),
                "datasets": datasets
            },
            "options": {
                "responsive": True,
                "plugins": {
                    "title": {
                        "display": True,
                        "text": self.title
                    },
                    "legend": {
                        "display": self.show_legend
                    }
                },
                "scales": {
                    "x": {
                        "title": {
                            "display": True,
                            "text": self.x_axis_label
                        }
                    },
                    "y": {
                        "title": {
                            "display": True,
                            "text": self.y_axis_label
                        }
                    }
                }
            }
        }

    def _default_color(self, index: int) -> str:
        """Get default color for series."""
        colors = [
            "#3b82f6",  # Blue
            "#10b981",  # Green
            "#f59e0b",  # Amber
            "#ef4444",  # Red
            "#8b5cf6",  # Purple
            "#06b6d4",  # Cyan
        ]
        return colors[index % len(colors)]


class VisualizationFactory:
    """Factory for creating visualization configurations."""

    @staticmethod
    def create_latency_chart(
        metrics_data: List[Dict[str, Any]],
        title: str = "Request Latency"
    ) -> ChartConfig:
        """Create a latency time series chart."""

        latency_data = next(
            (m for m in metrics_data if m["metric_type"] == "latency"),
            {"data_points": []}
        )

        return ChartConfig(
            title=title,
            series=[
                ChartSeries(
                    name="Average Latency",
                    data=latency_data["data_points"],
                    color="#3b82f6",
                    type=ChartType.LINE
                )
            ],
            y_axis_label="Latency (ms)"
        )

    @staticmethod
    def create_throughput_chart(
        metrics_data: List[Dict[str, Any]],
        title: str = "Request Throughput"
    ) -> ChartConfig:
        """Create a throughput chart."""

        request_data = next(
            (m for m in metrics_data if m["metric_type"] == "requests"),
            {"data_points": []}
        )

        return ChartConfig(
            title=title,
            series=[
                ChartSeries(
                    name="Requests",
                    data=request_data["data_points"],
                    color="#10b981",
                    type=ChartType.AREA
                )
            ],
            y_axis_label="Requests"
        )

    @staticmethod
    def create_error_chart(
        metrics_data: List[Dict[str, Any]],
        title: str = "Errors Over Time"
    ) -> ChartConfig:
        """Create an error count chart."""

        error_data = next(
            (m for m in metrics_data if m["metric_type"] == "errors"),
            {"data_points": []}
        )

        return ChartConfig(
            title=title,
            series=[
                ChartSeries(
                    name="Errors",
                    data=error_data["data_points"],
                    color="#ef4444",
                    type=ChartType.BAR
                )
            ],
            y_axis_label="Error Count"
        )

    @staticmethod
    def create_token_usage_chart(
        metrics_data: List[Dict[str, Any]],
        title: str = "Token Usage"
    ) -> ChartConfig:
        """Create a token usage chart."""

        token_data = next(
            (m for m in metrics_data if m["metric_type"] == "tokens"),
            {"data_points": []}
        )

        return ChartConfig(
            title=title,
            series=[
                ChartSeries(
                    name="Tokens",
                    data=token_data["data_points"],
                    color="#8b5cf6",
                    type=ChartType.AREA
                )
            ],
            y_axis_label="Token Count"
        )


@dataclass
class StatCard:
    """Configuration for a statistics card."""
    title: str
    value: str
    unit: str
    trend: Optional[float] = None
    trend_direction: Optional[str] = None
    status: str = "normal"  # normal, warning, critical
    icon: Optional[str] = None

    def to_dict(self) -> Dict[str, Any]:
        """Convert to dictionary for frontend."""
        return {
            "title": self.title,
            "value": self.value,
            "unit": self.unit,
            "trend": self.trend,
            "trendDirection": self.trend_direction,
            "status": self.status,
            "icon": self.icon
        }


class StatCardFactory:
    """Factory for creating stat cards."""

    @staticmethod
    def create_latency_card(metric: AggregatedMetric) -> StatCard:
        """Create a latency stat card."""

        status = "normal"
        if metric.value > 1000:
            status = "critical"
        elif metric.value > 500:
            status = "warning"

        return StatCard(
            title="Avg Latency",
            value=f"{metric.value:.0f}",
            unit="ms",
            trend=metric.trend,
            trend_direction=metric.trend_direction,
            status=status,
            icon="clock"
        )

    @staticmethod
    def create_error_rate_card(metric: AggregatedMetric) -> StatCard:
        """Create an error rate stat card."""

        status = "normal"
        if metric.value > 5:
            status = "critical"
        elif metric.value > 1:
            status = "warning"

        return StatCard(
            title="Error Rate",
            value=f"{metric.value:.2f}",
            unit="%",
            trend=metric.trend,
            trend_direction=metric.trend_direction,
            status=status
        )
```
272            value=f"{metric.value:.2f}",
273            unit="%",
274            trend=metric.trend,
275            trend_direction=metric.trend_direction,
276            status=status,
277            icon="alert-circle"
278        )
279
280    @staticmethod
281    def create_throughput_card(metric: AggregatedMetric) -> StatCard:
282        """Create a throughput stat card."""
283
284        return StatCard(
285            title="Throughput",
286            value=f"{metric.value:.1f}",
287            unit="req/s",
288            trend=metric.trend,
289            trend_direction=metric.trend_direction,
290            status="normal",
291            icon="activity"
292        )
293
294    @staticmethod
295    def create_health_card(health: Dict[str, Any]) -> StatCard:
296        """Create a health score stat card."""
297
298        status_map = {
299            "healthy": "normal",
300            "degraded": "warning",
301            "critical": "critical"
302        }
303
304        return StatCard(
305            title="System Health",
306            value=str(health["score"]),
307            unit="/ 100",
308            status=status_map.get(health["status"], "normal"),
309            icon="heart"
310        )
Frontend Integration: The visualization components generate configuration objects that can be directly consumed by frontend charting libraries like Chart.js, Recharts, or D3.js.
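The factories above emit `ChartConfig` objects, and the chapter's `to_chartjs_config` method (used later by `get_dashboard_data`) translates them for the browser. As a hedged sketch of that translation, with field names assumed from `ChartSeries` and the exact output shape an assumption:

```python
"""Hedged sketch: mapping a ChartConfig-style object to a Chart.js config.

Series mirrors the dashboard's ChartSeries; the real to_chartjs_config
output shape is an assumption.
"""
from dataclasses import dataclass
from typing import Any, Dict, List


@dataclass
class Series:
    name: str
    data: List[Dict[str, Any]]  # [{"x": <timestamp>, "y": <value>}, ...]
    color: str
    type: str  # "line", "area", or "bar"


def to_chartjs(title: str, series: List[Series], y_label: str) -> Dict[str, Any]:
    # Chart.js has no native "area" type: it is a line chart with fill on
    chart_type = "line" if series[0].type in ("line", "area") else series[0].type
    return {
        "type": chart_type,
        "data": {
            "datasets": [
                {
                    "label": s.name,
                    "data": s.data,
                    "borderColor": s.color,
                    "fill": s.type == "area",
                }
                for s in series
            ]
        },
        "options": {
            "plugins": {"title": {"display": True, "text": title}},
            "scales": {"y": {"title": {"display": True, "text": y_label}}},
        },
    }


cfg = to_chartjs(
    "Request Throughput",
    [Series("Requests", [{"x": 0, "y": 12}], "#10b981", "area")],
    "Requests",
)
print(cfg["type"], cfg["data"]["datasets"][0]["fill"])  # line True
```

Because the backend ships plain configuration dictionaries, the frontend stays a thin rendering layer and can swap charting libraries without backend changes.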

Alerting Integration

The dashboard integrates with the alerting system to display active alerts and enable operators to acknowledge or silence them:

🐍python
1"""
2Alerting integration for the observability dashboard.
3
4This module connects the dashboard to the alerting system,
5enabling real-time alert display and management.
6"""
7
8from dataclasses import dataclass, field
9from typing import Any, Callable, Dict, List, Optional
10from datetime import datetime, timedelta
11from enum import Enum
12import asyncio
13
14
15class AlertSeverity(Enum):
16    """Alert severity levels."""
17    INFO = "info"
18    WARNING = "warning"
19    ERROR = "error"
20    CRITICAL = "critical"
21
22
23class AlertStatus(Enum):
24    """Alert status values."""
25    FIRING = "firing"
26    ACKNOWLEDGED = "acknowledged"
27    SILENCED = "silenced"
28    RESOLVED = "resolved"
29
30
31@dataclass
32class Alert:
33    """Represents an alert."""
34    alert_id: str
35    name: str
36    severity: AlertSeverity
37    status: AlertStatus
38    message: str
39    triggered_at: datetime
40    resolved_at: Optional[datetime] = None
41    acknowledged_by: Optional[str] = None
42    labels: Dict[str, str] = field(default_factory=dict)
43
44
45@dataclass
46class AlertRule:
47    """Defines an alert rule."""
48    name: str
49    condition: str  # Expression to evaluate
50    severity: AlertSeverity
51    message_template: str
52    cooldown_minutes: int = 5
53    labels: Dict[str, str] = field(default_factory=dict)
54
55
56class DashboardAlertManager:
57    """Manages alerts for the dashboard."""
58
59    def __init__(self, alert_store, notification_service):
60        self.alert_store = alert_store
61        self.notification_service = notification_service
62        self.alert_rules: List[AlertRule] = []
63        self.subscribers: List[Callable[[Alert], None]] = []
64
65    def register_rule(self, rule: AlertRule):
66        """Register an alert rule."""
67        self.alert_rules.append(rule)
68
69    def subscribe(self, callback: Callable[[Alert], None]):
70        """Subscribe to alert updates."""
71        self.subscribers.append(callback)
72
73    async def get_active_alerts(self) -> List[Alert]:
74        """Get all active (firing or acknowledged) alerts."""
75
76        alerts = await self.alert_store.query(
77            status=[AlertStatus.FIRING, AlertStatus.ACKNOWLEDGED]
78        )
79
80        return sorted(
81            alerts,
82            key=lambda a: (
83                -self._severity_priority(a.severity),
84                a.triggered_at
85            )
86        )
87
88    async def get_alert_summary(self) -> Dict[str, Any]:
89        """Get summary of alert counts by severity."""
90
91        alerts = await self.get_active_alerts()
92
93        summary = {
94            "total": len(alerts),
95            "by_severity": {
96                "critical": 0,
97                "error": 0,
98                "warning": 0,
99                "info": 0
100            },
101            "by_status": {
102                "firing": 0,
103                "acknowledged": 0
104            }
105        }
106
107        for alert in alerts:
108            summary["by_severity"][alert.severity.value] += 1
109            summary["by_status"][alert.status.value] += 1
110
111        return summary
112
113    async def acknowledge_alert(
114        self,
115        alert_id: str,
116        user: str,
117        comment: Optional[str] = None
118    ):
119        """Acknowledge an alert."""
120
121        await self.alert_store.update(
122            alert_id=alert_id,
123            status=AlertStatus.ACKNOWLEDGED,
124            acknowledged_by=user,
125            acknowledged_at=datetime.utcnow(),
126            comment=comment
127        )
128
129        # Notify subscribers
130        alert = await self.alert_store.get(alert_id)
131        for subscriber in self.subscribers:
132            subscriber(alert)
133
134    async def silence_alert(
135        self,
136        alert_id: str,
137        duration_minutes: int,
138        user: str
139    ):
140        """Silence an alert for a duration."""
141
142        silence_until = datetime.utcnow() + timedelta(minutes=duration_minutes)
143
144        await self.alert_store.update(
145            alert_id=alert_id,
146            status=AlertStatus.SILENCED,
147            silenced_until=silence_until,
148            silenced_by=user
149        )
150
151    async def evaluate_rules(
152        self,
153        metrics: Dict[str, AggregatedMetric]
154    ) -> List[Alert]:
155        """Evaluate alert rules against current metrics."""
156
157        new_alerts = []
158
159        for rule in self.alert_rules:
160            should_fire = self._evaluate_condition(rule.condition, metrics)
161
162            if should_fire:
163                # Check if already firing
164                existing = await self.alert_store.query(
165                    name=rule.name,
166                    status=[AlertStatus.FIRING, AlertStatus.ACKNOWLEDGED]
167                )
168
169                if not existing:
170                    # Check cooldown
171                    recent = await self.alert_store.query(
172                        name=rule.name,
173                        since=datetime.utcnow() - timedelta(
174                            minutes=rule.cooldown_minutes
175                        )
176                    )
177
178                    if not recent:
179                        alert = await self._create_alert(rule, metrics)
180                        new_alerts.append(alert)
181
182        return new_alerts
183
184    def _evaluate_condition(
185        self,
186        condition: str,
187        metrics: Dict[str, AggregatedMetric]
188    ) -> bool:
189        """Evaluate an alert condition expression."""
190
191        # Build context for evaluation
192        context = {
193            name: metric.value
194            for name, metric in metrics.items()
195        }
196
197        try:
198            # Safe evaluation of simple conditions
199            # In production, use a proper expression parser
200            return eval(condition, {"__builtins__": {}}, context)
201        except Exception:
202            return False
203
204    async def _create_alert(
205        self,
206        rule: AlertRule,
207        metrics: Dict[str, AggregatedMetric]
208    ) -> Alert:
209        """Create and store a new alert."""
210
211        # Format message with metric values
212        message = rule.message_template.format(
213            **{name: f"{m.value:.2f}" for name, m in metrics.items()}
214        )
215
216        alert = Alert(
217            alert_id=self._generate_id(),
218            name=rule.name,
219            severity=rule.severity,
220            status=AlertStatus.FIRING,
221            message=message,
222            triggered_at=datetime.utcnow(),
223            labels=rule.labels
224        )
225
226        await self.alert_store.save(alert)
227
228        # Send notifications
229        await self.notification_service.send_alert(alert)
230
231        # Notify subscribers
232        for subscriber in self.subscribers:
233            subscriber(alert)
234
235        return alert
236
237    def _severity_priority(self, severity: AlertSeverity) -> int:
238        """Get priority number for severity (higher = more severe)."""
239        priorities = {
240            AlertSeverity.INFO: 1,
241            AlertSeverity.WARNING: 2,
242            AlertSeverity.ERROR: 3,
243            AlertSeverity.CRITICAL: 4
244        }
245        return priorities.get(severity, 0)
246
247    def _generate_id(self) -> str:
248        """Generate unique alert ID."""
249        import uuid
250        return str(uuid.uuid4())[:8]  # 8 hex chars: readable, but collision-prone at very high alert volume
251
252
253# Example alert rules for agent monitoring
254DEFAULT_ALERT_RULES = [
255    AlertRule(
256        name="high_error_rate",
257        condition="error_rate > 5",
258        severity=AlertSeverity.CRITICAL,
259        message_template="Error rate is {error_rate}% (threshold: 5%)",
260        labels={"component": "agent", "type": "reliability"}
261    ),
262    AlertRule(
263        name="high_latency",
264        condition="latency > 2000",
265        severity=AlertSeverity.WARNING,
266        message_template="Latency is {latency}ms (threshold: 2000ms)",
267        labels={"component": "agent", "type": "performance"}
268    ),
269    AlertRule(
270        name="low_throughput",
271        condition="throughput < 0.5",
272        severity=AlertSeverity.WARNING,
273        message_template="Throughput is {throughput} req/s (threshold: 0.5)",
274        labels={"component": "agent", "type": "capacity"}
275    ),
276    AlertRule(
277        name="high_token_usage",
278        condition="tokens > 100000",
279        severity=AlertSeverity.INFO,
280        message_template="Token usage is {tokens} (threshold: 100000)",
281        labels={"component": "agent", "type": "cost"}
282    )
283]
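To see how these default rules behave, here is a standalone sketch of the condition evaluation and message templating the manager performs, using a minimal, hypothetical stand-in for `AggregatedMetric`:

```python
"""Standalone sketch of rule evaluation and message templating.

Metric is a minimal stand-in for the dashboard's AggregatedMetric.
"""
from dataclasses import dataclass


@dataclass
class Metric:
    name: str
    value: float


def evaluate(condition: str, metrics: dict) -> bool:
    # Same pattern as DashboardAlertManager._evaluate_condition:
    # expose metric values by name, strip builtins, fail closed on errors.
    context = {name: m.value for name, m in metrics.items()}
    try:
        return bool(eval(condition, {"__builtins__": {}}, context))
    except Exception:
        return False


metrics = {
    "error_rate": Metric("error_rate", 7.2),
    "latency": Metric("latency", 850.0),
}

print(evaluate("error_rate > 5", metrics))   # True  -> high_error_rate fires
print(evaluate("latency > 2000", metrics))   # False -> high_latency does not

# Message formatting mirrors _create_alert
template = "Error rate is {error_rate}% (threshold: 5%)"
message = template.format(**{n: f"{m.value:.2f}" for n, m in metrics.items()})
print(message)  # Error rate is 7.20% (threshold: 5%)
```

Note that evaluation fails closed: a malformed condition simply never fires. That is safe but silent, which is one more reason production systems should prefer a real expression parser that surfaces validation errors.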

Alert Display Components

These components format active alerts for display: capping the list, optionally hiding acknowledged alerts, grouping by severity, and attaching human-readable durations and per-alert actions:

🐍python
1"""
2Alert display components for the dashboard.
3"""
4from collections import defaultdict
5@dataclass
6class AlertListConfig:
7    """Configuration for alert list display."""
8    max_alerts: int = 10
9    show_acknowledged: bool = True
10    group_by_severity: bool = True
11
12    def format_alerts(
13        self,
14        alerts: List[Alert]
15    ) -> List[Dict[str, Any]]:
16        """Format alerts for display."""
17
18        displayed = alerts[:self.max_alerts]
19
20        if not self.show_acknowledged:
21            displayed = [
22                a for a in displayed
23                if a.status != AlertStatus.ACKNOWLEDGED
24            ]
25
26        if self.group_by_severity:
27            # Group and sort by severity
28            groups = defaultdict(list)
29            for alert in displayed:
30                groups[alert.severity.value].append(alert)
31
32            result = []
33            for severity in ["critical", "error", "warning", "info"]:
34                for alert in groups.get(severity, []):
35                    result.append(self._format_alert(alert))
36            return result
37
38        return [self._format_alert(a) for a in displayed]
39
40    def _format_alert(self, alert: Alert) -> Dict[str, Any]:
41        """Format a single alert."""
42
43        return {
44            "id": alert.alert_id,
45            "name": alert.name,
46            "severity": alert.severity.value,
47            "status": alert.status.value,
48            "message": alert.message,
49            "triggeredAt": alert.triggered_at.isoformat(),
50            "duration": self._format_duration(
51                datetime.utcnow() - alert.triggered_at
52            ),
53            "acknowledgedBy": alert.acknowledged_by,
54            "labels": alert.labels,
55            "actions": self._get_available_actions(alert)
56        }
57
58    def _format_duration(self, delta: timedelta) -> str:
59        """Format duration as human-readable string."""
60
61        total_seconds = int(delta.total_seconds())
62
63        if total_seconds < 60:
64            return f"{total_seconds}s"
65        elif total_seconds < 3600:
66            return f"{total_seconds // 60}m"
67        elif total_seconds < 86400:
68            return f"{total_seconds // 3600}h"
69        else:
70            return f"{total_seconds // 86400}d"
71
72    def _get_available_actions(self, alert: Alert) -> List[str]:
73        """Get available actions for an alert."""
74
75        actions = []
76
77        if alert.status == AlertStatus.FIRING:
78            actions.append("acknowledge")
79            actions.append("silence")
80        elif alert.status == AlertStatus.ACKNOWLEDGED:
81            actions.append("resolve")
82            actions.append("silence")
83
84        actions.append("view_details")
85
86        return actions
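Note how `_format_duration` truncates toward the largest whole unit, so a 90-minute-old alert displays as `1h` rather than `1h 30m`. The same logic, extracted for standalone use:

```python
"""Standalone copy of AlertListConfig._format_duration's bucketing."""
from datetime import timedelta


def format_duration(delta: timedelta) -> str:
    # Truncate toward the largest whole unit: seconds, minutes, hours, days
    total = int(delta.total_seconds())
    if total < 60:
        return f"{total}s"
    elif total < 3600:
        return f"{total // 60}m"
    elif total < 86400:
        return f"{total // 3600}h"
    return f"{total // 86400}d"


print(format_duration(timedelta(seconds=45)))   # 45s
print(format_duration(timedelta(minutes=90)))   # 1h
print(format_duration(timedelta(hours=30)))     # 1d
```

Coarse durations are a deliberate trade-off: operators scanning an alert list need relative age at a glance, not second-level precision.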

Complete Dashboard Implementation

Now we bring all components together into a complete dashboard implementation:

🐍python
1"""
2Complete observability dashboard implementation.
3
4This module provides the main Dashboard class that orchestrates
5all components into a unified monitoring interface.
6"""
7
8from dataclasses import dataclass, field
9from typing import Any, Callable, Dict, List, Optional
10from datetime import datetime
11import asyncio
12
13
14@dataclass
15class DashboardState:
16    """Current state of the dashboard."""
17    time_range: TimeRange
18    data: Dict[DataSource, List[Dict[str, Any]]] = field(default_factory=dict)
19    summaries: Dict[str, AggregatedMetric] = field(default_factory=dict)
20    health: Dict[str, Any] = field(default_factory=dict)
21    charts: Dict[str, ChartConfig] = field(default_factory=dict)
22    stat_cards: List[StatCard] = field(default_factory=list)
23    alerts: List[Dict[str, Any]] = field(default_factory=list)
24    last_updated: Optional[datetime] = None
25
26
27class ObservabilityDashboard:
28    """Main observability dashboard for AI agents."""
29
30    def __init__(
31        self,
32        config: DashboardConfig,
33        log_store,
34        trace_store,
35        metrics_store,
36        error_store,
37        alert_manager
38    ):
39        self.config = config
40        self.state = DashboardState(
41            time_range=TimeRange.last_minutes(config.default_time_range_minutes)
42        )
43
44        # Initialize aggregator
45        self.aggregator = DataAggregator(config)
46
47        # Register collectors
48        self.aggregator.register_collector(LogCollector(log_store))
49        self.aggregator.register_collector(TraceCollector(trace_store))
50        self.aggregator.register_collector(MetricsCollector(metrics_store))
51        self.aggregator.register_collector(ErrorCollector(error_store))
52        self.aggregator.register_collector(AlertCollector(alert_manager))
53
54        # Initialize processors
55        self.metrics_aggregator = MetricsAggregator(config)
56        self.health_calculator = HealthCalculator()
57        self.alert_manager = DashboardAlertManager(
58            alert_manager.store,
59            alert_manager.notification_service
60        )
61
62        # Register default alert rules
63        for rule in DEFAULT_ALERT_RULES:
64            self.alert_manager.register_rule(rule)
65
66        # State change subscribers
67        self.subscribers: List[Callable[[DashboardState], None]] = []
68
69        # Background tasks
70        self._refresh_task: Optional[asyncio.Task] = None
71
72    def subscribe(self, callback: Callable[[DashboardState], None]):
73        """Subscribe to dashboard state changes."""
74        self.subscribers.append(callback)
75
76    async def start(self):
77        """Start the dashboard with automatic refresh."""
78
79        # Initial load
80        await self.refresh()
81
82        # Start background refresh
83        if self.config.enable_real_time:
84            self._refresh_task = asyncio.create_task(
85                self._refresh_loop()
86            )
87
88    async def stop(self):
89        """Stop the dashboard."""
90        if self._refresh_task:
91            self._refresh_task.cancel()
92            try:
93                await self._refresh_task
94            except asyncio.CancelledError:
95                pass
96
97    async def refresh(self):
98        """Refresh dashboard data."""
99
100        # Fetch all data
101        self.state.data = await self.aggregator.fetch_all(
102            self.state.time_range
103        )
104
105        # Process metrics
106        await self._process_metrics()
107
108        # Calculate health
109        await self._calculate_health()
110
111        # Generate visualizations
112        await self._generate_visualizations()
113
114        # Fetch alerts
115        await self._fetch_alerts()
116
117        # Update timestamp
118        self.state.last_updated = datetime.utcnow()
119
120        # Notify subscribers
121        self._notify_subscribers()
122
123    async def set_time_range(self, time_range: TimeRange):
124        """Change the dashboard time range."""
125        self.state.time_range = time_range
126        await self.refresh()
127
128    async def _process_metrics(self):
129        """Process raw metrics into summaries."""
130
131        metrics_data = self.state.data.get(DataSource.METRICS, [])
132
133        # Get previous period for trend calculation
134        duration = (
135            self.state.time_range.end - self.state.time_range.start
136        )
137        previous_range = TimeRange(
138            start=self.state.time_range.start - duration,
139            end=self.state.time_range.start
140        )
141
142        previous_data = await self.aggregator.fetch_all(
143            previous_range,
144            sources=[DataSource.METRICS]
145        )
146
147        self.state.summaries = self.metrics_aggregator.calculate_summary_stats(
148            metrics_data,
149            previous_data.get(DataSource.METRICS, [])
150        )
151
152        # Calculate error rate
153        error_count = sum(
154            dp["value"]
155            for m in metrics_data if m["metric_type"] == "errors"
156            for dp in m["data_points"]
157        )
158        request_count = sum(
159            dp["value"]
160            for m in metrics_data if m["metric_type"] == "requests"
161            for dp in m["data_points"]
162        )
163
164        self.state.summaries["error_rate"] = (
165            self.metrics_aggregator.calculate_error_rate(
166                error_count,
167                request_count
168            )
169        )
170
171        # Calculate throughput
172        self.state.summaries["throughput"] = (
173            self.metrics_aggregator.calculate_throughput(
174                request_count,
175                self.state.time_range
176            )
177        )
178
179    async def _calculate_health(self):
180        """Calculate system health score."""
181
182        error_rate = self.state.summaries.get(
183            "error_rate",
184            AggregatedMetric("error_rate", 0, "%")
185        ).value
186
187        latency = self.state.summaries.get(
188            "latency",
189            AggregatedMetric("latency", 0, "ms")
190        ).value
191
192        alert_summary = await self.alert_manager.get_alert_summary()
193        active_alerts = alert_summary["by_severity"]["critical"] + (
194            alert_summary["by_severity"]["error"]
195        )
196
197        self.state.health = self.health_calculator.calculate_health_score(
198            error_rate=error_rate,
199            latency_p99=latency,
200            active_alerts=active_alerts
201        )
202
203    async def _generate_visualizations(self):
204        """Generate chart configurations."""
205
206        metrics_data = self.state.data.get(DataSource.METRICS, [])
207
208        # Create charts
209        self.state.charts = {
210            "latency": VisualizationFactory.create_latency_chart(metrics_data),
211            "throughput": VisualizationFactory.create_throughput_chart(metrics_data),
212            "errors": VisualizationFactory.create_error_chart(metrics_data),
213            "tokens": VisualizationFactory.create_token_usage_chart(metrics_data)
214        }
215
216        # Create stat cards
217        self.state.stat_cards = [
218            StatCardFactory.create_health_card(self.state.health),
219            StatCardFactory.create_latency_card(
220                self.state.summaries.get(
221                    "latency",
222                    AggregatedMetric("latency", 0, "ms")
223                )
224            ),
225            StatCardFactory.create_error_rate_card(
226                self.state.summaries.get(
227                    "error_rate",
228                    AggregatedMetric("error_rate", 0, "%")
229                )
230            ),
231            StatCardFactory.create_throughput_card(
232                self.state.summaries.get(
233                    "throughput",
234                    AggregatedMetric("throughput", 0, "req/s")
235                )
236            )
237        ]
238
239    async def _fetch_alerts(self):
240        """Fetch and format alerts."""
241
242        alerts = await self.alert_manager.get_active_alerts()
243
244        alert_config = AlertListConfig()
245        self.state.alerts = alert_config.format_alerts(alerts)
246
247    async def _refresh_loop(self):
248        """Background refresh loop."""
249
250        while True:
251            await asyncio.sleep(self.config.refresh_interval_seconds)
252            await self.refresh()
253
254    def _notify_subscribers(self):
255        """Notify all subscribers of state change."""
256        for subscriber in self.subscribers:
257            try:
258                subscriber(self.state)
259            except Exception:
260                pass  # Don't let subscriber errors break the dashboard
261
262    def get_dashboard_data(self) -> Dict[str, Any]:
263        """Get complete dashboard data for API response."""
264
265        return {
266            "timeRange": {
267                "start": self.state.time_range.start.isoformat(),
268                "end": self.state.time_range.end.isoformat()
269            },
270            "lastUpdated": (
271                self.state.last_updated.isoformat()
272                if self.state.last_updated else None
273            ),
274            "health": self.state.health,
275            "statCards": [card.to_dict() for card in self.state.stat_cards],
276            "charts": {
277                name: config.to_chartjs_config()
278                for name, config in self.state.charts.items()
279            },
280            "alerts": self.state.alerts,
281            "logs": self.state.data.get(DataSource.LOGS, [])[:50],
282            "traces": self.state.data.get(DataSource.TRACES, [])[:20],
283            "errors": self.state.data.get(DataSource.ERRORS, [])[:10]
284        }
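The `_notify_subscribers` method deliberately swallows subscriber exceptions so one misbehaving callback (say, a dropped WebSocket connection) cannot stall the refresh loop. That isolation pattern, reduced to its essentials (`DashboardState` here is a minimal, hypothetical stand-in carrying only health data):

```python
"""Sketch of the dashboard's subscribe/notify pattern in isolation."""
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, List


@dataclass
class DashboardState:
    health: Dict[str, Any] = field(default_factory=dict)


class Notifier:
    def __init__(self) -> None:
        self.subscribers: List[Callable[[DashboardState], None]] = []

    def subscribe(self, callback: Callable[[DashboardState], None]) -> None:
        self.subscribers.append(callback)

    def notify(self, state: DashboardState) -> None:
        for subscriber in self.subscribers:
            try:
                subscriber(state)
            except Exception:
                pass  # A failing subscriber must not break the others


received: List[int] = []
notifier = Notifier()
notifier.subscribe(lambda s: received.append(s.health["score"]))
notifier.subscribe(lambda s: 1 / 0)  # misbehaving subscriber, isolated
notifier.notify(DashboardState(health={"score": 92}))
print(received)  # [92]
```

In production you would at least log the swallowed exception; silently dropping it, as the sketch (and the dashboard) does, trades debuggability for simplicity.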

Dashboard API Endpoints

The dashboard exposes its data through a small REST API, letting the frontend (or any HTTP client) fetch the unified view, drill into individual metrics and traces, and acknowledge or silence alerts:

🐍python
1"""
2FastAPI endpoints for the observability dashboard.
3"""
4
5from fastapi import FastAPI, HTTPException, Query
6from fastapi.responses import JSONResponse
7from datetime import datetime
8from typing import Optional
9
10
11app = FastAPI(title="Agent Observability Dashboard API")
12
13# Initialize dashboard (in production, use dependency injection)
14dashboard: Optional[ObservabilityDashboard] = None
15
16
17@app.on_event("startup")
18async def startup():
19    """Initialize dashboard on startup."""
20    global dashboard
21
22    config = DashboardConfig()
23
24    # Initialize stores (use your actual implementations)
25    log_store = LogStore()
26    trace_store = TraceStore()
27    metrics_store = MetricsStore()
28    error_store = ErrorStore()
29    alert_manager = AlertManager()
30
31    dashboard = ObservabilityDashboard(
32        config=config,
33        log_store=log_store,
34        trace_store=trace_store,
35        metrics_store=metrics_store,
36        error_store=error_store,
37        alert_manager=alert_manager
38    )
39
40    await dashboard.start()
41
42
43@app.on_event("shutdown")
44async def shutdown():
45    """Cleanup on shutdown."""
46    if dashboard:
47        await dashboard.stop()
48
49
50@app.get("/api/dashboard")
51async def get_dashboard(
52    minutes: int = Query(60, ge=1, le=1440)
53):
54    """Get complete dashboard data."""
55
56    if not dashboard:
57        raise HTTPException(status_code=503, detail="Dashboard not initialized")
58
59    # Refresh only when the requested window length differs from the
60    # current one (comparing TimeRange objects directly always fails,
61    # since each call to last_minutes() produces a fresh end timestamp)
62    current = dashboard.state.time_range
63    if (current.end - current.start).total_seconds() != minutes * 60:
64        await dashboard.set_time_range(TimeRange.last_minutes(minutes))
63
64    return dashboard.get_dashboard_data()
65
66
67@app.get("/api/dashboard/health")
68async def get_health():
69    """Get system health status."""
70
71    if not dashboard:
72        raise HTTPException(status_code=503, detail="Dashboard not initialized")
73
74    return dashboard.state.health
75
76
77@app.get("/api/dashboard/metrics/{metric_type}")
78async def get_metric(
79    metric_type: str,
80    minutes: int = Query(60, ge=1, le=1440),
81    aggregation: str = Query("avg")
82):
83    """Get specific metric data."""
84
85    if not dashboard:
86        raise HTTPException(status_code=503, detail="Dashboard not initialized")
87
88    metrics_data = dashboard.state.data.get(DataSource.METRICS, [])
89
90    metric = next(
91        (m for m in metrics_data if m["metric_type"] == metric_type),
92        None
93    )
94
95    if not metric:
96        raise HTTPException(status_code=404, detail=f"Metric {metric_type} not found")
97
98    # Validate the aggregation type first (bad input -> 400, not 500)
99    try:
100        agg_type = AggregationType(aggregation)
101    except ValueError:
102        raise HTTPException(status_code=400, detail=f"Unknown aggregation: {aggregation}")
103
104    aggregated = dashboard.metrics_aggregator.aggregate_time_series(
105        metric["data_points"], agg_type, interval_seconds=60
106    )
104
105    return {
106        "metric_type": metric_type,
107        "aggregation": aggregation,
108        "data_points": aggregated
109    }
110
111
112@app.get("/api/dashboard/alerts")
113async def get_alerts():
114    """Get active alerts."""
115
116    if not dashboard:
117        raise HTTPException(status_code=503, detail="Dashboard not initialized")
118
119    return {
120        "alerts": dashboard.state.alerts,
121        "summary": await dashboard.alert_manager.get_alert_summary()
122    }
123
124
125@app.post("/api/dashboard/alerts/{alert_id}/acknowledge")
126async def acknowledge_alert(
127    alert_id: str,
128    user: str = Query(...),
129    comment: Optional[str] = None
130):
131    """Acknowledge an alert."""
132
133    if not dashboard:
134        raise HTTPException(status_code=503, detail="Dashboard not initialized")
135
136    await dashboard.alert_manager.acknowledge_alert(
137        alert_id=alert_id,
138        user=user,
139        comment=comment
140    )
141
142    return {"status": "acknowledged"}
143
144
145@app.post("/api/dashboard/alerts/{alert_id}/silence")
146async def silence_alert(
147    alert_id: str,
148    duration_minutes: int = Query(..., ge=5, le=1440),
149    user: str = Query(...)
150):
151    """Silence an alert for a duration."""
152
153    if not dashboard:
154        raise HTTPException(status_code=503, detail="Dashboard not initialized")
155
156    await dashboard.alert_manager.silence_alert(
157        alert_id=alert_id,
158        duration_minutes=duration_minutes,
159        user=user
160    )
161
162    return {"status": "silenced", "duration_minutes": duration_minutes}
163
164
165@app.get("/api/dashboard/logs")
166async def get_logs(
167    level: Optional[str] = None,
168    agent_id: Optional[str] = None,
169    limit: int = Query(100, ge=1, le=1000)
170):
171    """Get log entries."""
172
173    if not dashboard:
174        raise HTTPException(status_code=503, detail="Dashboard not initialized")
175
176    logs = dashboard.state.data.get(DataSource.LOGS, [])
177
178    # Filter logs
179    if level:
180        logs = [l for l in logs if l["level"] == level]
181    if agent_id:
182        logs = [l for l in logs if l["agent_id"] == agent_id]
183
184    return {"logs": logs[:limit]}
185
186
187@app.get("/api/dashboard/traces")
188async def get_traces(
189    status: Optional[str] = None,
190    min_duration_ms: Optional[int] = None,
191    limit: int = Query(50, ge=1, le=200)
192):
193    """Get trace data."""
194
195    if not dashboard:
196        raise HTTPException(status_code=503, detail="Dashboard not initialized")
197
198    traces = dashboard.state.data.get(DataSource.TRACES, [])
199
200    # Filter traces
201    if status:
202        traces = [t for t in traces if t["status"] == status]
203    if min_duration_ms:
204        traces = [t for t in traces if t["duration_ms"] >= min_duration_ms]
205
206    return {"traces": traces[:limit]}
207
208
209@app.get("/api/dashboard/traces/{trace_id}")
210async def get_trace_detail(trace_id: str):
211    """Get detailed trace information."""
212
213    if not dashboard:
214        raise HTTPException(status_code=503, detail="Dashboard not initialized")
215
216    traces = dashboard.state.data.get(DataSource.TRACES, [])
217
218    trace = next((t for t in traces if t["trace_id"] == trace_id), None)
219
220    if not trace:
221        raise HTTPException(status_code=404, detail=f"Trace {trace_id} not found")
222
223    return trace

Dashboard Deployment

Deploy the dashboard as a containerized service with proper configuration for production use:

📄yaml
# docker-compose.yml for the observability dashboard

version: "3.8"

services:
  dashboard-api:
    build:
      context: .
      dockerfile: Dockerfile
    ports:
      - "8080:8080"
    environment:
      - LOG_STORE_URL=http://loki:3100
      - TRACE_STORE_URL=http://jaeger:14268
      - METRICS_STORE_URL=http://prometheus:9090
      - REDIS_URL=redis://redis:6379
      - DASHBOARD_REFRESH_INTERVAL=30
      - DASHBOARD_DEFAULT_TIME_RANGE=60
    depends_on:
      - redis
      - loki
      - prometheus
      - jaeger
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:8080/health"]
      interval: 30s
      timeout: 10s
      retries: 3

  dashboard-frontend:
    build:
      context: ./frontend
      dockerfile: Dockerfile
    ports:
      - "3000:3000"
    environment:
      - API_URL=http://dashboard-api:8080
    depends_on:
      - dashboard-api

  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"
    volumes:
      - redis-data:/data

  loki:
    image: grafana/loki:2.9.0
    ports:
      - "3100:3100"
    volumes:
      - loki-data:/loki
    command: -config.file=/etc/loki/local-config.yaml

  prometheus:
    image: prom/prometheus:v2.47.0
    ports:
      - "9090:9090"
    volumes:
      - ./prometheus.yml:/etc/prometheus/prometheus.yml
      - prometheus-data:/prometheus

  jaeger:
    image: jaegertracing/all-in-one:1.50
    ports:
      - "14268:14268"
      - "16686:16686"
    environment:
      - COLLECTOR_OTLP_ENABLED=true

volumes:
  redis-data:
  loki-data:
  prometheus-data:
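The compose file mounts a `./prometheus.yml` that isn't shown above. A minimal scrape configuration might look like the following; the job name, metrics path, and target are assumptions about how the dashboard API exposes its metrics, so adjust them to match your setup:

```yaml
# prometheus.yml — minimal sketch; job name and target are assumed
global:
  scrape_interval: 15s

scrape_configs:
  - job_name: "dashboard-api"
    metrics_path: /metrics
    static_configs:
      - targets: ["dashboard-api:8080"]
```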

Production Configuration

🐍python
"""
Production configuration for the observability dashboard.
"""

import os
from dataclasses import dataclass


@dataclass
class ProductionConfig(DashboardConfig):
    """Production-optimized dashboard configuration."""

    # Performance
    refresh_interval_seconds: int = 30
    max_data_points: int = 500
    aggregation_interval_seconds: int = 60
    batch_size: int = 100

    # Caching
    cache_ttl_seconds: int = 30
    enable_query_cache: bool = True

    # Data retention
    log_retention_hours: int = 24
    trace_retention_hours: int = 72
    metric_retention_hours: int = 168

    # Rate limiting
    max_requests_per_minute: int = 120

    @classmethod
    def from_environment(cls) -> "ProductionConfig":
        """Load configuration from environment variables."""
        return cls(
            refresh_interval_seconds=int(
                os.getenv("DASHBOARD_REFRESH_INTERVAL", "30")
            ),
            default_time_range_minutes=int(
                os.getenv("DASHBOARD_DEFAULT_TIME_RANGE", "60")
            ),
            max_data_points=int(
                os.getenv("DASHBOARD_MAX_DATA_POINTS", "500")
            ),
            enable_real_time=os.getenv(
                "DASHBOARD_ENABLE_REALTIME", "true"
            ).lower() == "true"
        )


# Health check endpoint
@app.get("/health")
async def health_check():
    """Health check endpoint for load balancers."""

    if not dashboard:
        return JSONResponse(
            status_code=503,
            content={"status": "unhealthy", "reason": "Dashboard not initialized"}
        )

    # Check component health
    checks = {
        "dashboard": "healthy",
        "last_refresh": (
            dashboard.state.last_updated.isoformat()
            if dashboard.state.last_updated else None
        ),
        "data_sources": {
            source.value: len(data) > 0
            for source, data in dashboard.state.data.items()
        }
    }

    all_healthy = all(checks["data_sources"].values())

    return JSONResponse(
        status_code=200 if all_healthy else 503,
        content={
            "status": "healthy" if all_healthy else "degraded",
            "checks": checks
        }
    )
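The `max_requests_per_minute` setting above only declares a budget; enforcing it takes middleware. A minimal in-process sliding-window limiter might look like the sketch below. `RateLimiter` is a hypothetical helper, and a real deployment would typically back the counters with Redis so the limit holds across replicas:

```python
import time
from collections import defaultdict, deque
from typing import Deque, Dict, Optional


class RateLimiter:
    """Sliding-window limiter: at most `max_per_minute` requests per client key."""

    def __init__(self, max_per_minute: int):
        self.max_per_minute = max_per_minute
        self._hits: Dict[str, Deque[float]] = defaultdict(deque)

    def allow(self, client_key: str, now: Optional[float] = None) -> bool:
        """Record a request for `client_key`; return False if over budget."""
        now = time.monotonic() if now is None else now
        window = self._hits[client_key]
        # Drop timestamps that have aged out of the 60-second window
        while window and now - window[0] >= 60.0:
            window.popleft()
        if len(window) >= self.max_per_minute:
            return False
        window.append(now)
        return True
```

Wired into FastAPI, a middleware (or dependency) would call `allow()` with the client's address and return a 429 response when it comes back `False`.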
Production Best Practices: Use environment variables for configuration, implement health checks for load balancers, and configure appropriate data retention policies to balance observability with storage costs.
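Retention settings are only useful if something prunes expired data on each refresh cycle. A minimal sketch, assuming each stored entry carries a `datetime`-valued `timestamp` field (`prune_expired` is a hypothetical name):

```python
from datetime import datetime, timedelta
from typing import Any, Dict, List


def prune_expired(
    entries: List[Dict[str, Any]],
    retention_hours: int,
    now: datetime,
) -> List[Dict[str, Any]]:
    """Keep only entries whose timestamp falls inside the retention window."""
    cutoff = now - timedelta(hours=retention_hours)
    return [entry for entry in entries if entry["timestamp"] >= cutoff]
```

Each data source would be pruned with its own setting from `ProductionConfig`: `log_retention_hours` for logs, `trace_retention_hours` for traces, and so on.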

Summary

In this capstone section, we built a comprehensive observability dashboard that integrates all the monitoring capabilities covered in this chapter:

| Component | Purpose | Key Implementation |
| --- | --- | --- |
| Data Collection | Gather telemetry from sources | TelemetryCollector abstraction |
| Aggregation | Compute statistics and trends | MetricsAggregator with time bucketing |
| Visualization | Render charts and stats | ChartConfig and StatCard factories |
| Alerting | Monitor and notify | DashboardAlertManager with rules |
| API | Expose dashboard data | FastAPI endpoints with filtering |
What You've Learned: How to design a layered dashboard architecture, implement collectors for multiple data sources, aggregate metrics with trend analysis, create visualization configurations, integrate alerting, and deploy the dashboard for production use.

The observability dashboard provides operators with a unified view of agent behavior, enabling them to quickly identify issues, understand their root causes, and take corrective action. By combining logs, traces, metrics, and alerts into a single interface, the dashboard transforms raw telemetry into actionable insights.

In the next chapter, we'll explore evaluation and benchmarking techniques to measure and improve agent performance systematically.