Introduction
Deploying AI agent systems to production requires careful consideration of reliability, scalability, and maintainability. Unlike traditional applications, agent systems pose unique deployment challenges, including model versioning, long-running processes, and variable resource requirements. This section covers deployment strategies that address these challenges while enabling rapid, safe releases.
Deployment Philosophy: The goal of deployment automation is to make releases boring—predictable, reversible, and routine. Every deployment should be as safe as a configuration change.
Container-Based Deployment
Containerization provides consistency between development and production environments, making it essential for AI agent deployments.
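The Dockerfile below declares an HTTP health check, which assumes the agent service actually exposes a `/health` endpoint. A minimal sketch of such an endpoint using only the standard library (the dependency check and version string are illustrative placeholders):

```python
"""Minimal /health endpoint the container HEALTHCHECK can probe."""
import json
from http.server import BaseHTTPRequestHandler, HTTPServer

APP_VERSION = "1.2.3"  # illustrative; read from package metadata in practice


def health_payload(dependencies_ok: bool) -> tuple[int, dict]:
    """Build the health response: 200 when dependencies are reachable."""
    status = 200 if dependencies_ok else 503
    body = {"status": "ok" if dependencies_ok else "degraded", "version": APP_VERSION}
    return status, body


class HealthHandler(BaseHTTPRequestHandler):
    def do_GET(self):
        if self.path != "/health":
            self.send_error(404)
            return
        # A real service would ping the database/Redis here; assume healthy.
        status, body = health_payload(dependencies_ok=True)
        payload = json.dumps(body).encode()
        self.send_response(status)
        self.send_header("Content-Type", "application/json")
        self.end_headers()
        self.wfile.write(payload)


def serve(port: int = 8000) -> None:
    """Blocking entry point; in a container this runs under the CMD process."""
    HTTPServer(("0.0.0.0", port), HealthHandler).serve_forever()
```

Returning 503 rather than crashing lets the orchestrator distinguish "degraded" from "dead" and make its own restart decision.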
Multi-Stage Dockerfile for Agents
```dockerfile
# Build stage
FROM python:3.11-slim AS builder

WORKDIR /app

# Install build dependencies
RUN apt-get update && apt-get install -y --no-install-recommends \
    build-essential \
    && rm -rf /var/lib/apt/lists/*

# Install Python dependencies
COPY requirements.txt .
RUN pip wheel --no-cache-dir --no-deps --wheel-dir /app/wheels -r requirements.txt

# Production stage
FROM python:3.11-slim AS production

WORKDIR /app

# Create non-root user for security
RUN groupadd -r agent && useradd -r -g agent agent

# Install runtime dependencies only
RUN apt-get update && apt-get install -y --no-install-recommends \
    curl \
    && rm -rf /var/lib/apt/lists/*

# Copy wheels from the build stage and install
COPY --from=builder /app/wheels /wheels
RUN pip install --no-cache /wheels/*

# Copy application code
COPY src/ ./src/
COPY config/ ./config/

# Set environment variables
ENV PYTHONUNBUFFERED=1
ENV PYTHONDONTWRITEBYTECODE=1
ENV APP_ENV=production

# Health check
HEALTHCHECK --interval=30s --timeout=5s --start-period=10s --retries=3 \
    CMD curl -f http://localhost:8000/health || exit 1

# Switch to non-root user
USER agent

# Start application
CMD ["python", "-m", "src.main"]
```

Docker Compose for Local Development
```yaml
# docker-compose.yml
version: "3.9"

services:
  agent-api:
    build:
      context: .
      dockerfile: Dockerfile
      target: production
    ports:
      - "8000:8000"
    environment:
      - DATABASE_URL=postgresql://postgres:postgres@db:5432/agents
      - REDIS_URL=redis://redis:6379/0
      - OPENAI_API_KEY=${OPENAI_API_KEY}
      - LOG_LEVEL=INFO
    depends_on:
      db:
        condition: service_healthy
      redis:
        condition: service_healthy
    deploy:
      resources:
        limits:
          cpus: "2"
          memory: 4G
        reservations:
          cpus: "1"
          memory: 2G

  agent-worker:
    build:
      context: .
      dockerfile: Dockerfile.worker
    environment:
      - DATABASE_URL=postgresql://postgres:postgres@db:5432/agents
      - REDIS_URL=redis://redis:6379/0
      - OPENAI_API_KEY=${OPENAI_API_KEY}
    depends_on:
      - agent-api
      - redis
    deploy:
      replicas: 3
      resources:
        limits:
          cpus: "4"
          memory: 8G

  db:
    image: postgres:15-alpine
    environment:
      POSTGRES_DB: agents
      POSTGRES_USER: postgres
      POSTGRES_PASSWORD: postgres
    volumes:
      - postgres_data:/var/lib/postgresql/data
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U postgres"]
      interval: 5s
      timeout: 5s
      retries: 5

  redis:
    image: redis:7-alpine
    command: redis-server --appendonly yes
    volumes:
      - redis_data:/data
    healthcheck:
      test: ["CMD", "redis-cli", "ping"]
      interval: 5s
      timeout: 5s
      retries: 5

volumes:
  postgres_data:
  redis_data:
```

Kubernetes Orchestration
Kubernetes provides robust orchestration for agent systems, enabling automatic scaling, self-healing, and declarative configuration.
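During a rolling update, Kubernetes sends SIGTERM to each pod (after the `preStop` hook in the manifest below) and waits for the termination grace period before killing it. Long-running agent processes should treat SIGTERM as "stop taking new work, drain what is in flight." A minimal sketch of that pattern (the polling and task-execution callbacks are placeholders):

```python
"""Graceful SIGTERM handling for draining in-flight agent work."""
import signal
import threading

shutdown_requested = threading.Event()


def _handle_sigterm(signum, frame):
    # Kubernetes sends SIGTERM on pod termination; stop accepting new work.
    shutdown_requested.set()


# Signal handlers can only be installed from the main thread.
if threading.current_thread() is threading.main_thread():
    signal.signal(signal.SIGTERM, _handle_sigterm)


def worker_loop(poll_task, run_task):
    """Process tasks until shutdown is requested, then drain and exit."""
    while not shutdown_requested.is_set():
        task = poll_task()
        if task is not None:
            # Finish the current task even if shutdown arrives mid-task;
            # the grace period bounds how long this drain can take.
            run_task(task)
```

The `sleep 10` preStop hook buys time for load balancers to deregister the pod before SIGTERM arrives, so the drain starts with no new traffic inbound.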
Agent Deployment Manifest
```yaml
# agent-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: agent-api
  labels:
    app: agent-api
    version: v1
spec:
  replicas: 3
  strategy:
    type: RollingUpdate
    rollingUpdate:
      maxSurge: 1
      maxUnavailable: 0
  selector:
    matchLabels:
      app: agent-api
  template:
    metadata:
      labels:
        app: agent-api
        version: v1
      annotations:
        prometheus.io/scrape: "true"
        prometheus.io/port: "8000"
        prometheus.io/path: "/metrics"
    spec:
      serviceAccountName: agent-api
      securityContext:
        runAsNonRoot: true
        runAsUser: 1000
        fsGroup: 1000

      containers:
        - name: agent-api
          image: registry.example.com/agent-api:v1.2.3
          imagePullPolicy: Always

          ports:
            - containerPort: 8000
              name: http

          env:
            - name: DATABASE_URL
              valueFrom:
                secretKeyRef:
                  name: agent-secrets
                  key: database-url
            - name: OPENAI_API_KEY
              valueFrom:
                secretKeyRef:
                  name: agent-secrets
                  key: openai-api-key
            - name: POD_NAME
              valueFrom:
                fieldRef:
                  fieldPath: metadata.name

          resources:
            requests:
              cpu: "500m"
              memory: "1Gi"
            limits:
              cpu: "2000m"
              memory: "4Gi"

          livenessProbe:
            httpGet:
              path: /health/live
              port: http
            initialDelaySeconds: 10
            periodSeconds: 15
            timeoutSeconds: 5
            failureThreshold: 3

          readinessProbe:
            httpGet:
              path: /health/ready
              port: http
            initialDelaySeconds: 5
            periodSeconds: 5
            timeoutSeconds: 3
            failureThreshold: 3

          lifecycle:
            preStop:
              exec:
                command: ["/bin/sh", "-c", "sleep 10"]

      affinity:
        podAntiAffinity:
          preferredDuringSchedulingIgnoredDuringExecution:
            - weight: 100
              podAffinityTerm:
                labelSelector:
                  matchLabels:
                    app: agent-api
                topologyKey: kubernetes.io/hostname

      topologySpreadConstraints:
        - maxSkew: 1
          topologyKey: topology.kubernetes.io/zone
          whenUnsatisfiable: ScheduleAnyway
          labelSelector:
            matchLabels:
              app: agent-api
```

Horizontal Pod Autoscaler
```yaml
# agent-hpa.yaml
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: agent-api-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: agent-api
  minReplicas: 3
  maxReplicas: 20
  metrics:
    - type: Resource
      resource:
        name: cpu
        target:
          type: Utilization
          averageUtilization: 70
    - type: Resource
      resource:
        name: memory
        target:
          type: Utilization
          averageUtilization: 80
    - type: Pods
      pods:
        metric:
          name: agent_requests_per_second
        target:
          type: AverageValue
          averageValue: "100"
  behavior:
    scaleUp:
      stabilizationWindowSeconds: 60
      policies:
        - type: Pods
          value: 4
          periodSeconds: 60
        - type: Percent
          value: 100
          periodSeconds: 60
      selectPolicy: Max
    scaleDown:
      stabilizationWindowSeconds: 300
      policies:
        - type: Pods
          value: 1
          periodSeconds: 120
```

Deployment Patterns
Different deployment patterns offer various trade-offs between risk, speed, and resource usage.
| Pattern | Description | Best For | Rollback Time |
|---|---|---|---|
| Rolling Update | Gradual replacement of pods | Standard deployments | Minutes |
| Blue-Green | Full environment switch | Zero-downtime releases | Seconds |
| Canary | Gradual traffic shift | Risk-sensitive releases | Seconds |
| A/B Testing | User-based routing | Feature experiments | Seconds |
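Canary and A/B routing both need a stable way to assign a request to the new version: hashing a stable key (user or session ID) keeps each user on the same side while the traffic weight changes. A sketch of that assignment (function and parameter names are illustrative):

```python
"""Deterministic traffic splitting for canary and A/B routing."""
import hashlib


def routes_to_canary(user_id: str, canary_percent: int, salt: str = "v1.2.3") -> bool:
    """Route a stable canary_percent slice of users to the new version.

    The salt (e.g. the release ID) reshuffles assignments between rollouts,
    so the same users are not always the first to see new code.
    """
    digest = hashlib.sha256(f"{salt}:{user_id}".encode()).hexdigest()
    bucket = int(digest, 16) % 100  # stable bucket in [0, 100)
    return bucket < canary_percent
```

Because a user's bucket is fixed for a given salt, advancing the rollout from 5% to 25% to 50% only adds users to the canary; nobody flaps between versions mid-session.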
Blue-Green Deployment Implementation
1"""Blue-green deployment controller."""
2from dataclasses import dataclass
3from enum import Enum
4from typing import Optional
5import httpx
6
7
8class Environment(Enum):
9 BLUE = "blue"
10 GREEN = "green"
11
12
13@dataclass
14class DeploymentStatus:
15 active: Environment
16 standby: Environment
17 blue_version: str
18 green_version: str
19 blue_healthy: bool
20 green_healthy: bool
21
22
23class BlueGreenController:
24 """Manages blue-green deployments for agent services."""
25
26 def __init__(
27 self,
28 load_balancer_api: str,
29 blue_endpoint: str,
30 green_endpoint: str,
31 ):
32 self.load_balancer_api = load_balancer_api
33 self.endpoints = {
34 Environment.BLUE: blue_endpoint,
35 Environment.GREEN: green_endpoint,
36 }
37 self.client = httpx.AsyncClient(timeout=30.0)
38
39 async def get_status(self) -> DeploymentStatus:
40 """Get current deployment status."""
41 # Check which environment is active
42 response = await self.client.get(
43 f"{self.load_balancer_api}/active"
44 )
45 active_env = Environment(response.json()["environment"])
46 standby_env = (
47 Environment.GREEN
48 if active_env == Environment.BLUE
49 else Environment.BLUE
50 )
51
52 # Get versions and health
53 blue_info = await self._get_env_info(Environment.BLUE)
54 green_info = await self._get_env_info(Environment.GREEN)
55
56 return DeploymentStatus(
57 active=active_env,
58 standby=standby_env,
59 blue_version=blue_info["version"],
60 green_version=green_info["version"],
61 blue_healthy=blue_info["healthy"],
62 green_healthy=green_info["healthy"],
63 )
64
65 async def _get_env_info(self, env: Environment) -> dict:
66 """Get environment info."""
67 try:
68 response = await self.client.get(
69 f"{self.endpoints[env]}/health"
70 )
71 data = response.json()
72 return {
73 "version": data.get("version", "unknown"),
74 "healthy": response.status_code == 200,
75 }
76 except Exception:
77 return {"version": "unknown", "healthy": False}
78
79 async def deploy_to_standby(self, version: str) -> bool:
80 """Deploy new version to standby environment."""
81 status = await self.get_status()
82 standby = status.standby
83
84 print(f"Deploying {version} to {standby.value} environment")
85
86 # Trigger deployment to standby
87 response = await self.client.post(
88 f"{self.endpoints[standby]}/deploy",
89 json={"version": version},
90 )
91
92 if response.status_code != 200:
93 print(f"Deployment failed: {response.text}")
94 return False
95
96 # Wait for deployment to complete
97 healthy = await self._wait_for_healthy(standby)
98
99 if not healthy:
100 print(f"Deployment unhealthy, rolling back")
101 await self._rollback_standby(standby)
102 return False
103
104 return True
105
106 async def _wait_for_healthy(
107 self,
108 env: Environment,
109 max_attempts: int = 30,
110 ) -> bool:
111 """Wait for environment to become healthy."""
112 import asyncio
113
114 for attempt in range(max_attempts):
115 info = await self._get_env_info(env)
116 if info["healthy"]:
117 return True
118 await asyncio.sleep(10)
119
120 return False
121
122 async def switch_traffic(self) -> bool:
123 """Switch traffic to standby environment."""
124 status = await self.get_status()
125
126 if not status.green_healthy or not status.blue_healthy:
127 print("Cannot switch: environments not healthy")
128 return False
129
130 # Switch load balancer
131 response = await self.client.post(
132 f"{self.load_balancer_api}/switch",
133 json={"target": status.standby.value},
134 )
135
136 if response.status_code != 200:
137 print(f"Switch failed: {response.text}")
138 return False
139
140 print(f"Traffic switched to {status.standby.value}")
141 return True
142
143 async def rollback(self) -> bool:
144 """Rollback to previous environment."""
145 # Simply switch back
146 return await self.switch_traffic()
147
148 async def _rollback_standby(self, env: Environment) -> None:
149 """Rollback standby deployment."""
150 await self.client.post(
151 f"{self.endpoints[env]}/rollback"
152 )Canary Deployment with Traffic Splitting
1"""Canary deployment with progressive traffic shifting."""
2from dataclasses import dataclass, field
3from datetime import datetime, timedelta
4from typing import Optional, Callable, Awaitable
5import asyncio
6
7
8@dataclass
9class CanaryMetrics:
10 """Metrics for canary analysis."""
11 error_rate: float
12 latency_p50: float
13 latency_p99: float
14 success_rate: float
15 request_count: int
16
17
18@dataclass
19class CanaryStage:
20 """A stage in the canary rollout."""
21 traffic_percent: int
22 duration_minutes: int
23 success_criteria: dict
24
25
26@dataclass
27class CanaryConfig:
28 """Canary deployment configuration."""
29 stages: list[CanaryStage] = field(default_factory=list)
30 rollback_on_failure: bool = True
31 analysis_interval_seconds: int = 30
32
33
34class CanaryDeployment:
35 """Manages canary deployments with automated analysis."""
36
37 DEFAULT_STAGES = [
38 CanaryStage(
39 traffic_percent=5,
40 duration_minutes=5,
41 success_criteria={
42 "max_error_rate": 0.01,
43 "max_latency_p99": 1000,
44 },
45 ),
46 CanaryStage(
47 traffic_percent=25,
48 duration_minutes=10,
49 success_criteria={
50 "max_error_rate": 0.01,
51 "max_latency_p99": 1000,
52 },
53 ),
54 CanaryStage(
55 traffic_percent=50,
56 duration_minutes=15,
57 success_criteria={
58 "max_error_rate": 0.005,
59 "max_latency_p99": 800,
60 },
61 ),
62 CanaryStage(
63 traffic_percent=100,
64 duration_minutes=0,
65 success_criteria={},
66 ),
67 ]
68
69 def __init__(
70 self,
71 traffic_manager: "TrafficManager",
72 metrics_collector: "MetricsCollector",
73 config: Optional[CanaryConfig] = None,
74 ):
75 self.traffic_manager = traffic_manager
76 self.metrics_collector = metrics_collector
77 self.config = config or CanaryConfig(stages=self.DEFAULT_STAGES)
78 self._current_stage = 0
79 self._rollback_triggered = False
80
81 async def deploy(
82 self,
83 version: str,
84 on_progress: Optional[Callable[[int, str], Awaitable[None]]] = None,
85 ) -> bool:
86 """Execute canary deployment."""
87 self._current_stage = 0
88 self._rollback_triggered = False
89
90 for i, stage in enumerate(self.config.stages):
91 self._current_stage = i
92
93 # Update traffic split
94 await self.traffic_manager.set_canary_weight(
95 stage.traffic_percent
96 )
97
98 if on_progress:
99 await on_progress(
100 stage.traffic_percent,
101 f"Stage {i + 1}: {stage.traffic_percent}% traffic"
102 )
103
104 # Run analysis for stage duration
105 if stage.duration_minutes > 0:
106 success = await self._analyze_stage(stage)
107
108 if not success:
109 if self.config.rollback_on_failure:
110 await self._rollback()
111 return False
112
113 print(f"Canary deployment of {version} completed successfully")
114 return True
115
116 async def _analyze_stage(self, stage: CanaryStage) -> bool:
117 """Analyze canary metrics during a stage."""
118 end_time = datetime.now() + timedelta(minutes=stage.duration_minutes)
119
120 while datetime.now() < end_time:
121 # Collect metrics
122 baseline_metrics = await self.metrics_collector.get_metrics(
123 target="baseline"
124 )
125 canary_metrics = await self.metrics_collector.get_metrics(
126 target="canary"
127 )
128
129 # Compare against criteria
130 if not self._check_criteria(canary_metrics, stage.success_criteria):
131 print(f"Canary failed criteria check")
132 return False
133
134 # Compare against baseline
135 if not self._compare_to_baseline(canary_metrics, baseline_metrics):
136 print(f"Canary performing worse than baseline")
137 return False
138
139 await asyncio.sleep(self.config.analysis_interval_seconds)
140
141 return True
142
143 def _check_criteria(
144 self,
145 metrics: CanaryMetrics,
146 criteria: dict,
147 ) -> bool:
148 """Check if metrics meet success criteria."""
149 if "max_error_rate" in criteria:
150 if metrics.error_rate > criteria["max_error_rate"]:
151 return False
152
153 if "max_latency_p99" in criteria:
154 if metrics.latency_p99 > criteria["max_latency_p99"]:
155 return False
156
157 return True
158
159 def _compare_to_baseline(
160 self,
161 canary: CanaryMetrics,
162 baseline: CanaryMetrics,
163 ) -> bool:
164 """Compare canary metrics to baseline."""
165 # Allow 10% degradation
166 if canary.error_rate > baseline.error_rate * 1.1:
167 return False
168
169 if canary.latency_p99 > baseline.latency_p99 * 1.1:
170 return False
171
172 return True
173
174 async def _rollback(self) -> None:
175 """Rollback canary deployment."""
176 self._rollback_triggered = True
177 await self.traffic_manager.set_canary_weight(0)
178 print("Canary rollback completed")CI/CD Pipelines
Continuous integration and deployment pipelines automate testing and release processes for agent systems.
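The workflow below gates promotion on a `scripts/smoke_tests.py` helper. A minimal sketch of what such a script might contain (the endpoint list is illustrative), with the pass/fail logic factored out of the network calls so it can be unit-tested:

```python
"""Post-deploy smoke tests: fail the pipeline if basic endpoints misbehave."""
import argparse
import urllib.request

# (path, expected status) pairs to probe after a deploy; illustrative set.
CHECKS = [("/health", 200), ("/health/ready", 200)]


def evaluate(results: list[tuple[str, int, int]]) -> list[str]:
    """Return failure messages for checks whose status differs from expected."""
    return [
        f"{path}: expected {expected}, got {actual}"
        for path, expected, actual in results
        if actual != expected
    ]


def run(base_url: str) -> int:
    """Probe each endpoint; return a nonzero exit code on any failure."""
    results = []
    for path, expected in CHECKS:
        try:
            with urllib.request.urlopen(base_url + path, timeout=10) as resp:
                results.append((path, expected, resp.status))
        except Exception:
            results.append((path, expected, 0))  # unreachable counts as failure

    failures = evaluate(results)
    for failure in failures:
        print(f"SMOKE FAIL {failure}")
    return 1 if failures else 0


def main() -> int:
    # CLI entry point, e.g.: python scripts/smoke_tests.py --url https://staging.example.com
    parser = argparse.ArgumentParser()
    parser.add_argument("--url", required=True)
    return run(parser.parse_args().url)
```

Returning a nonzero exit code is what lets the CI step fail and block the production job.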
GitHub Actions Workflow
```yaml
# .github/workflows/deploy.yml
name: Deploy Agent System

on:
  push:
    branches: [main]
  pull_request:
    branches: [main]

env:
  REGISTRY: ghcr.io
  IMAGE_NAME: ${{ github.repository }}

jobs:
  test:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4

      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: "3.11"
          cache: "pip"

      - name: Install dependencies
        run: |
          pip install -r requirements.txt
          pip install -r requirements-dev.txt

      - name: Run linting
        run: |
          ruff check src/
          mypy src/

      - name: Run unit tests
        run: pytest tests/unit -v --cov=src --cov-report=xml

      - name: Run integration tests
        env:
          OPENAI_API_KEY: ${{ secrets.OPENAI_API_KEY }}
        run: pytest tests/integration -v

      - name: Upload coverage
        uses: codecov/codecov-action@v4
        with:
          files: ./coverage.xml

  build:
    needs: test
    runs-on: ubuntu-latest
    outputs:
      image_tag: ${{ steps.meta.outputs.tags }}

    steps:
      - uses: actions/checkout@v4

      - name: Set up Docker Buildx
        uses: docker/setup-buildx-action@v3

      - name: Log in to registry
        uses: docker/login-action@v3
        with:
          registry: ${{ env.REGISTRY }}
          username: ${{ github.actor }}
          password: ${{ secrets.GITHUB_TOKEN }}

      - name: Extract metadata
        id: meta
        uses: docker/metadata-action@v5
        with:
          images: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}
          tags: |
            type=sha,prefix=
            type=ref,event=branch
            type=semver,pattern={{version}}

      - name: Build and push
        uses: docker/build-push-action@v5
        with:
          context: .
          push: ${{ github.event_name != 'pull_request' }}
          tags: ${{ steps.meta.outputs.tags }}
          labels: ${{ steps.meta.outputs.labels }}
          cache-from: type=gha
          cache-to: type=gha,mode=max

  deploy-staging:
    needs: build
    if: github.ref == 'refs/heads/main'
    runs-on: ubuntu-latest
    environment: staging

    steps:
      - uses: actions/checkout@v4

      - name: Set up kubectl
        uses: azure/setup-kubectl@v3

      - name: Configure kubectl
        run: |
          echo "${{ secrets.KUBE_CONFIG_STAGING }}" | base64 -d > kubeconfig
          # Persist KUBECONFIG for subsequent steps (a plain export would not survive)
          echo "KUBECONFIG=$PWD/kubeconfig" >> "$GITHUB_ENV"

      - name: Deploy to staging
        run: |
          kubectl set image deployment/agent-api \
            agent-api=${{ needs.build.outputs.image_tag }} \
            --namespace=staging
          kubectl rollout status deployment/agent-api \
            --namespace=staging --timeout=300s

      - name: Run smoke tests
        env:
          STAGING_URL: ${{ vars.STAGING_URL }}
        run: |
          python scripts/smoke_tests.py --url $STAGING_URL

  deploy-production:
    needs: [build, deploy-staging]
    if: github.ref == 'refs/heads/main'
    runs-on: ubuntu-latest
    environment: production

    steps:
      - uses: actions/checkout@v4

      - name: Set up kubectl
        uses: azure/setup-kubectl@v3

      - name: Configure kubectl
        run: |
          echo "${{ secrets.KUBE_CONFIG_PROD }}" | base64 -d > kubeconfig
          echo "KUBECONFIG=$PWD/kubeconfig" >> "$GITHUB_ENV"

      - name: Deploy canary
        run: |
          python scripts/canary_deploy.py \
            --image ${{ needs.build.outputs.image_tag }} \
            --namespace production \
            --stages "5,25,50,100" \
            --stage-duration 300

      - name: Notify deployment
        uses: slackapi/slack-github-action@v1
        with:
          payload: |
            {
              "text": "Deployed ${{ needs.build.outputs.image_tag }} to production"
            }
        env:
          SLACK_WEBHOOK_URL: ${{ secrets.SLACK_WEBHOOK }}
```

Environment Management
Managing multiple environments requires consistent configuration and secret management across development, staging, and production.
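A common cross-environment failure mode is a secret that is missing in exactly one environment and only surfaces on first use; validating required variables at startup fails fast instead. A minimal sketch (the variable names per environment are illustrative):

```python
"""Fail-fast validation of required environment variables at startup."""
import os

# Variables each environment must provide; illustrative lists.
REQUIRED_VARS = {
    "development": ["DATABASE_URL"],
    "staging": ["STAGING_DATABASE_URL", "STAGING_REDIS_URL", "OPENAI_API_KEY"],
    "production": ["PROD_DATABASE_URL", "PROD_REDIS_URL", "OPENAI_API_KEY"],
}


def missing_vars(env_name: str, environ: dict[str, str]) -> list[str]:
    """Return required variables that are absent or empty for env_name."""
    return [
        name
        for name in REQUIRED_VARS.get(env_name, [])
        if not environ.get(name)
    ]


def validate_environment(env_name: str) -> None:
    """Raise at process startup if the configuration is incomplete."""
    missing = missing_vars(env_name, dict(os.environ))
    if missing:
        raise RuntimeError(
            f"Missing environment variables for {env_name}: {', '.join(missing)}"
        )
```

Taking the environment mapping as an argument in `missing_vars` keeps the check testable without touching the real process environment.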
Environment Configuration Manager
1"""Environment configuration management."""
2from dataclasses import dataclass
3from enum import Enum
4from typing import Any, Optional
5from pathlib import Path
6import os
7import json
8
9
10class EnvironmentType(Enum):
11 DEVELOPMENT = "development"
12 STAGING = "staging"
13 PRODUCTION = "production"
14
15
16@dataclass
17class EnvironmentConfig:
18 """Configuration for a specific environment."""
19 env_type: EnvironmentType
20 api_url: str
21 database_url: str
22 redis_url: str
23 log_level: str
24 debug: bool
25 feature_flags: dict[str, bool]
26 rate_limits: dict[str, int]
27 llm_config: dict[str, Any]
28
29
30class ConfigurationManager:
31 """Manages environment-specific configurations."""
32
33 def __init__(self):
34 self._configs: dict[EnvironmentType, EnvironmentConfig] = {}
35 self._secrets: dict[str, str] = {}
36 self._current_env: Optional[EnvironmentType] = None
37
38 def load_from_file(self, config_path: Path) -> None:
39 """Load configuration from file."""
40 with open(config_path) as f:
41 data = json.load(f)
42
43 for env_name, env_config in data.get("environments", {}).items():
44 env_type = EnvironmentType(env_name)
45 self._configs[env_type] = EnvironmentConfig(
46 env_type=env_type,
47 api_url=env_config["api_url"],
48 database_url=self._resolve_secret(
49 env_config["database_url"]
50 ),
51 redis_url=self._resolve_secret(
52 env_config["redis_url"]
53 ),
54 log_level=env_config.get("log_level", "INFO"),
55 debug=env_config.get("debug", False),
56 feature_flags=env_config.get("feature_flags", {}),
57 rate_limits=env_config.get("rate_limits", {}),
58 llm_config=env_config.get("llm_config", {}),
59 )
60
61 def _resolve_secret(self, value: str) -> str:
62 """Resolve secret references in config values."""
63 if value.startswith("$"):
64 env_var = value[2:-1] if value.startswith("${") else value[1:]
65 return os.environ.get(env_var, value)
66 return value
67
68 def get_config(
69 self,
70 env_type: Optional[EnvironmentType] = None,
71 ) -> EnvironmentConfig:
72 """Get configuration for specified or current environment."""
73 env = env_type or self._current_env
74 if env is None:
75 env = self._detect_environment()
76
77 if env not in self._configs:
78 raise ValueError(f"No configuration for {env}")
79
80 return self._configs[env]
81
82 def _detect_environment(self) -> EnvironmentType:
83 """Detect current environment from environment variables."""
84 env_name = os.environ.get("APP_ENV", "development")
85 return EnvironmentType(env_name)
86
87 def set_environment(self, env_type: EnvironmentType) -> None:
88 """Set the current environment."""
89 self._current_env = env_type
90
91
92# Example configuration file
93EXAMPLE_CONFIG = """
94{
95 "environments": {
96 "development": {
97 "api_url": "http://localhost:8000",
98 "database_url": "${DATABASE_URL}",
99 "redis_url": "redis://localhost:6379/0",
100 "log_level": "DEBUG",
101 "debug": true,
102 "feature_flags": {
103 "enable_new_agent": true,
104 "enable_streaming": true
105 },
106 "rate_limits": {
107 "requests_per_minute": 1000
108 },
109 "llm_config": {
110 "model": "gpt-4o-mini",
111 "temperature": 0.7,
112 "max_tokens": 4096
113 }
114 },
115 "staging": {
116 "api_url": "https://staging-api.example.com",
117 "database_url": "${STAGING_DATABASE_URL}",
118 "redis_url": "${STAGING_REDIS_URL}",
119 "log_level": "INFO",
120 "debug": false,
121 "feature_flags": {
122 "enable_new_agent": true,
123 "enable_streaming": true
124 },
125 "rate_limits": {
126 "requests_per_minute": 500
127 },
128 "llm_config": {
129 "model": "gpt-4o",
130 "temperature": 0.5,
131 "max_tokens": 8192
132 }
133 },
134 "production": {
135 "api_url": "https://api.example.com",
136 "database_url": "${PROD_DATABASE_URL}",
137 "redis_url": "${PROD_REDIS_URL}",
138 "log_level": "WARNING",
139 "debug": false,
140 "feature_flags": {
141 "enable_new_agent": false,
142 "enable_streaming": true
143 },
144 "rate_limits": {
145 "requests_per_minute": 100
146 },
147 "llm_config": {
148 "model": "gpt-4o",
149 "temperature": 0.3,
150 "max_tokens": 8192
151 }
152 }
153 }
154}
155"""Feature Flags for Agents
Feature flags enable gradual rollout of new agent capabilities and quick disabling of problematic features without deployment.
Agent Feature Flag System
1"""Feature flag system for agent capabilities."""
2from dataclasses import dataclass, field
3from datetime import datetime
4from enum import Enum
5from typing import Any, Optional, Callable
6import hashlib
7import json
8
9
10class RolloutStrategy(Enum):
11 ALL = "all"
12 NONE = "none"
13 PERCENTAGE = "percentage"
14 USER_LIST = "user_list"
15 GRADUAL = "gradual"
16
17
18@dataclass
19class FeatureFlag:
20 """Definition of a feature flag."""
21 name: str
22 description: str
23 enabled: bool
24 rollout_strategy: RolloutStrategy
25 rollout_percentage: int = 0
26 allowed_users: list[str] = field(default_factory=list)
27 metadata: dict[str, Any] = field(default_factory=dict)
28 created_at: datetime = field(default_factory=datetime.now)
29 updated_at: datetime = field(default_factory=datetime.now)
30
31
32class FeatureFlagService:
33 """Manages feature flags for agent system."""
34
35 def __init__(self, config_source: Optional[str] = None):
36 self._flags: dict[str, FeatureFlag] = {}
37 self._overrides: dict[str, dict[str, bool]] = {}
38 self._listeners: list[Callable[[str, bool], None]] = []
39
40 if config_source:
41 self._load_flags(config_source)
42
43 def _load_flags(self, source: str) -> None:
44 """Load flags from configuration source."""
45 with open(source) as f:
46 data = json.load(f)
47
48 for flag_data in data.get("flags", []):
49 flag = FeatureFlag(
50 name=flag_data["name"],
51 description=flag_data.get("description", ""),
52 enabled=flag_data.get("enabled", False),
53 rollout_strategy=RolloutStrategy(
54 flag_data.get("rollout_strategy", "none")
55 ),
56 rollout_percentage=flag_data.get("rollout_percentage", 0),
57 allowed_users=flag_data.get("allowed_users", []),
58 metadata=flag_data.get("metadata", {}),
59 )
60 self._flags[flag.name] = flag
61
62 def is_enabled(
63 self,
64 flag_name: str,
65 user_id: Optional[str] = None,
66 context: Optional[dict[str, Any]] = None,
67 ) -> bool:
68 """Check if a feature flag is enabled."""
69 # Check for user-specific override
70 if user_id and user_id in self._overrides:
71 if flag_name in self._overrides[user_id]:
72 return self._overrides[user_id][flag_name]
73
74 flag = self._flags.get(flag_name)
75 if not flag:
76 return False
77
78 if not flag.enabled:
79 return False
80
81 return self._evaluate_rollout(flag, user_id, context)
82
83 def _evaluate_rollout(
84 self,
85 flag: FeatureFlag,
86 user_id: Optional[str],
87 context: Optional[dict[str, Any]],
88 ) -> bool:
89 """Evaluate rollout strategy."""
90 strategy = flag.rollout_strategy
91
92 if strategy == RolloutStrategy.ALL:
93 return True
94
95 if strategy == RolloutStrategy.NONE:
96 return False
97
98 if strategy == RolloutStrategy.USER_LIST:
99 return user_id in flag.allowed_users if user_id else False
100
101 if strategy == RolloutStrategy.PERCENTAGE:
102 if not user_id:
103 return False
104 # Consistent hashing for user
105 hash_input = f"{flag.name}:{user_id}"
106 hash_value = int(
107 hashlib.md5(hash_input.encode()).hexdigest(), 16
108 )
109 return (hash_value % 100) < flag.rollout_percentage
110
111 if strategy == RolloutStrategy.GRADUAL:
112 # Time-based gradual rollout
113 if not flag.metadata.get("rollout_start"):
114 return False
115
116 start = datetime.fromisoformat(flag.metadata["rollout_start"])
117 end = datetime.fromisoformat(flag.metadata["rollout_end"])
118 now = datetime.now()
119
120 if now < start:
121 return False
122 if now > end:
123 return True
124
125 # Calculate current percentage
126 total_duration = (end - start).total_seconds()
127 elapsed = (now - start).total_seconds()
128 current_pct = int((elapsed / total_duration) * 100)
129
130 if not user_id:
131 return False
132
133 hash_input = f"{flag.name}:{user_id}"
134 hash_value = int(
135 hashlib.md5(hash_input.encode()).hexdigest(), 16
136 )
137 return (hash_value % 100) < current_pct
138
139 return False
140
141 def set_override(
142 self,
143 user_id: str,
144 flag_name: str,
145 enabled: bool,
146 ) -> None:
147 """Set a user-specific override for a flag."""
148 if user_id not in self._overrides:
149 self._overrides[user_id] = {}
150 self._overrides[user_id][flag_name] = enabled
151
152 def update_flag(
153 self,
154 flag_name: str,
155 enabled: Optional[bool] = None,
156 rollout_percentage: Optional[int] = None,
157 ) -> None:
158 """Update a feature flag."""
159 if flag_name not in self._flags:
160 raise ValueError(f"Unknown flag: {flag_name}")
161
162 flag = self._flags[flag_name]
163
164 if enabled is not None:
165 flag.enabled = enabled
166
167 if rollout_percentage is not None:
168 flag.rollout_percentage = rollout_percentage
169
170 flag.updated_at = datetime.now()
171
172 # Notify listeners
173 for listener in self._listeners:
174 listener(flag_name, flag.enabled)
175
176
177# Usage with agent capabilities
178class AgentWithFeatureFlags:
179 """Agent that uses feature flags for capabilities."""
180
181 def __init__(self, feature_flags: FeatureFlagService):
182 self.flags = feature_flags
183
184 async def process(
185 self,
186 request: dict,
187 user_id: str,
188 ) -> dict:
189 """Process request with feature-flagged capabilities."""
190 response = {"result": None, "features_used": []}
191
192 # Check for streaming capability
193 if self.flags.is_enabled("enable_streaming", user_id):
194 response["streaming"] = True
195 response["features_used"].append("streaming")
196
197 # Check for new model
198 if self.flags.is_enabled("use_gpt4_turbo", user_id):
199 model = "gpt-4-turbo"
200 response["features_used"].append("gpt4_turbo")
201 else:
202 model = "gpt-4"
203
204 # Check for tool use
205 if self.flags.is_enabled("enable_tool_use", user_id):
206 tools = self._get_available_tools()
207 response["features_used"].append("tool_use")
208 else:
209 tools = []
210
211 # Process with selected features
212 result = await self._execute(request, model, tools)
213 response["result"] = result
214
215 return responseRollback Strategies
Having robust rollback strategies is essential for maintaining system reliability when deployments encounter issues.
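The rollback manager below polls an error rate over a recent window through a `MetricsCollector`. A sketch of how such a metric can be tracked in-process (a stand-in for that collector; class and method names are assumptions):

```python
"""Sliding-window error-rate tracking to feed rollback decisions."""
import time
from collections import deque
from typing import Optional


class ErrorRateWindow:
    """Tracks request outcomes and reports the error rate over a time window."""

    def __init__(self, window_seconds: int = 60):
        self.window_seconds = window_seconds
        self._events = deque()  # (timestamp, is_error) pairs, oldest first

    def record(self, is_error: bool, now: Optional[float] = None) -> None:
        """Record one request outcome; `now` is injectable for testing."""
        self._events.append((time.monotonic() if now is None else now, is_error))

    def error_rate(self, now: Optional[float] = None) -> float:
        """Fraction of requests in the window that were errors (0.0 if none)."""
        now = time.monotonic() if now is None else now
        cutoff = now - self.window_seconds
        while self._events and self._events[0][0] < cutoff:
            self._events.popleft()  # drop events older than the window
        if not self._events:
            return 0.0
        errors = sum(1 for _, is_error in self._events if is_error)
        return errors / len(self._events)
```

Returning 0.0 on an empty window is deliberate: a freshly deployed service with no traffic yet should not trip the error-rate threshold.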
Automated Rollback System
1"""Automated rollback system for agent deployments."""
2from dataclasses import dataclass, field
3from datetime import datetime, timedelta
4from enum import Enum
5from typing import Optional, Callable, Awaitable
6import asyncio
7
8
9class RollbackReason(Enum):
10 ERROR_RATE_HIGH = "error_rate_high"
11 LATENCY_DEGRADED = "latency_degraded"
    HEALTH_CHECK_FAILED = "health_check_failed"
    MANUAL = "manual"
    CIRCUIT_BREAKER = "circuit_breaker"


@dataclass
class RollbackEvent:
    """Record of a rollback event."""
    timestamp: datetime
    from_version: str
    to_version: str
    reason: RollbackReason
    details: str
    duration_seconds: float


@dataclass
class RollbackConfig:
    """Configuration for automated rollback."""
    # Error thresholds
    max_error_rate: float = 0.05
    error_rate_window_seconds: int = 60

    # Latency thresholds
    max_latency_p99_ms: float = 2000
    latency_window_seconds: int = 60

    # Health check settings
    health_check_interval_seconds: int = 10
    health_check_failures_threshold: int = 3

    # Rollback behavior
    cooldown_seconds: int = 300
    max_rollbacks_per_hour: int = 3


class RollbackManager:
    """Manages automated rollbacks for deployments."""

    def __init__(
        self,
        config: RollbackConfig,
        deployment_manager: "DeploymentManager",
        metrics_collector: "MetricsCollector",
    ):
        self.config = config
        self.deployment_manager = deployment_manager
        self.metrics_collector = metrics_collector
        self._rollback_history: list[RollbackEvent] = []
        self._last_rollback: Optional[datetime] = None
        self._health_failures = 0
        self._monitoring = False

    async def start_monitoring(self) -> None:
        """Start monitoring for rollback conditions."""
        self._monitoring = True

        while self._monitoring:
            try:
                await self._check_rollback_conditions()
            except Exception as e:
                print(f"Error checking rollback conditions: {e}")

            await asyncio.sleep(self.config.health_check_interval_seconds)

    def stop_monitoring(self) -> None:
        """Stop monitoring."""
        self._monitoring = False

    async def _check_rollback_conditions(self) -> None:
        """Check if rollback is needed."""
        # Check error rate
        error_rate = await self.metrics_collector.get_error_rate(
            window_seconds=self.config.error_rate_window_seconds
        )

        if error_rate > self.config.max_error_rate:
            await self._trigger_rollback(
                RollbackReason.ERROR_RATE_HIGH,
                f"Error rate {error_rate:.2%} exceeds threshold "
                f"{self.config.max_error_rate:.2%}",
            )
            return

        # Check latency
        latency_p99 = await self.metrics_collector.get_latency_percentile(
            percentile=99,
            window_seconds=self.config.latency_window_seconds,
        )

        if latency_p99 > self.config.max_latency_p99_ms:
            await self._trigger_rollback(
                RollbackReason.LATENCY_DEGRADED,
                f"P99 latency {latency_p99}ms exceeds threshold "
                f"{self.config.max_latency_p99_ms}ms",
            )
            return

        # Check health
        healthy = await self.deployment_manager.health_check()

        if not healthy:
            self._health_failures += 1

            if self._health_failures >= self.config.health_check_failures_threshold:
                await self._trigger_rollback(
                    RollbackReason.HEALTH_CHECK_FAILED,
                    f"Health check failed {self._health_failures} times",
                )
                self._health_failures = 0
        else:
            self._health_failures = 0

    async def _trigger_rollback(
        self,
        reason: RollbackReason,
        details: str,
        target_version: Optional[str] = None,
    ) -> bool:
        """Trigger a rollback (to the previous version unless a target is given)."""
        # Check cooldown
        if self._last_rollback:
            cooldown_end = self._last_rollback + timedelta(
                seconds=self.config.cooldown_seconds
            )
            if datetime.now() < cooldown_end:
                print(f"Rollback blocked: in cooldown until {cooldown_end}")
                return False

        # Check rate limit
        recent_rollbacks = [
            r for r in self._rollback_history
            if r.timestamp > datetime.now() - timedelta(hours=1)
        ]

        if len(recent_rollbacks) >= self.config.max_rollbacks_per_hour:
            print(
                f"Rollback blocked: {len(recent_rollbacks)} rollbacks "
                f"in last hour (max: {self.config.max_rollbacks_per_hour})"
            )
            return False

        # Execute rollback
        print(f"Triggering rollback: {reason.value} - {details}")

        start_time = datetime.now()
        current_version = await self.deployment_manager.get_current_version()
        if target_version is None:
            target_version = await self.deployment_manager.get_previous_version()

        success = await self.deployment_manager.rollback_to_version(target_version)

        duration = (datetime.now() - start_time).total_seconds()

        if success:
            event = RollbackEvent(
                timestamp=datetime.now(),
                from_version=current_version,
                to_version=target_version,
                reason=reason,
                details=details,
                duration_seconds=duration,
            )
            self._rollback_history.append(event)
            self._last_rollback = datetime.now()

            # Send notifications
            await self._notify_rollback(event)

            print(f"Rollback completed in {duration:.2f}s")
            return True

        print("Rollback failed!")
        return False

    async def manual_rollback(
        self,
        target_version: Optional[str] = None,
        reason: str = "Manual rollback",
    ) -> bool:
        """Perform a manual rollback, optionally to a specific version."""
        return await self._trigger_rollback(
            RollbackReason.MANUAL,
            reason,
            target_version=target_version,
        )

    async def _notify_rollback(self, event: RollbackEvent) -> None:
        """Send rollback notifications."""
        # This would integrate with alerting systems
        print(f"ROLLBACK NOTIFICATION: {event.reason.value}")
        print(f"  From: {event.from_version}")
        print(f"  To: {event.to_version}")
        print(f"  Details: {event.details}")

    def get_rollback_history(
        self,
        since: Optional[datetime] = None,
    ) -> list[RollbackEvent]:
        """Get rollback history."""
        if since:
            return [r for r in self._rollback_history if r.timestamp >= since]
        return self._rollback_history.copy()

Database Migration Rollback
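Rollback support for the database starts with the migrations themselves: every `up_sql` should ship with a `down_sql` that exactly reverses it. As a sketch (the table and column names here are illustrative, and the dataclass mirrors the `Migration` record defined in the listing below):

```python
from dataclasses import dataclass


@dataclass
class Migration:
    """Paired forward/reverse migration (same fields as the record below)."""
    version: str
    name: str
    up_sql: str
    down_sql: str


# Hypothetical migration: the down_sql exactly reverses the up_sql,
# so rolling back restores the prior schema.
add_retries = Migration(
    version="20240115_002",
    name="add_retries_column",
    up_sql="ALTER TABLE agent_runs ADD COLUMN retries INT DEFAULT 0",
    down_sql="ALTER TABLE agent_runs DROP COLUMN retries",
)
```

Note that destructive reversals like `DROP COLUMN` lose any data written after the migration; for data-bearing columns, teams often archive the data in the `down_sql` or mark the migration irreversible.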
"""Database migration rollback support."""
from dataclasses import dataclass
from datetime import datetime
from typing import Optional

import asyncpg


@dataclass
class Migration:
    """Database migration record."""
    version: str
    name: str
    up_sql: str
    down_sql: str
    applied_at: Optional[datetime] = None


class MigrationManager:
    """Manages database migrations with rollback support."""

    def __init__(self, database_url: str):
        self.database_url = database_url
        self._pool: Optional[asyncpg.Pool] = None

    async def connect(self) -> None:
        """Connect to database."""
        self._pool = await asyncpg.create_pool(self.database_url)
        await self._ensure_migrations_table()

    async def _ensure_migrations_table(self) -> None:
        """Create migrations tracking table."""
        async with self._pool.acquire() as conn:
            await conn.execute("""
                CREATE TABLE IF NOT EXISTS _migrations (
                    version VARCHAR(255) PRIMARY KEY,
                    name VARCHAR(255) NOT NULL,
                    applied_at TIMESTAMP DEFAULT NOW(),
                    down_sql TEXT
                )
            """)

    async def apply_migration(self, migration: Migration) -> bool:
        """Apply a migration."""
        async with self._pool.acquire() as conn:
            async with conn.transaction():
                try:
                    # Execute up migration
                    await conn.execute(migration.up_sql)

                    # Record migration, keeping down_sql for later rollback
                    await conn.execute(
                        """
                        INSERT INTO _migrations (version, name, down_sql)
                        VALUES ($1, $2, $3)
                        """,
                        migration.version,
                        migration.name,
                        migration.down_sql,
                    )

                    print(f"Applied migration: {migration.version}")
                    return True

                except Exception as e:
                    print(f"Migration failed: {e}")
                    raise

    async def rollback_migration(self, version: str) -> bool:
        """Rollback a specific migration."""
        async with self._pool.acquire() as conn:
            # Get migration info
            row = await conn.fetchrow(
                "SELECT down_sql FROM _migrations WHERE version = $1",
                version,
            )

            if not row:
                print(f"Migration {version} not found")
                return False

            async with conn.transaction():
                try:
                    # Execute down migration
                    if row["down_sql"]:
                        await conn.execute(row["down_sql"])

                    # Remove migration record
                    await conn.execute(
                        "DELETE FROM _migrations WHERE version = $1",
                        version,
                    )

                    print(f"Rolled back migration: {version}")
                    return True

                except Exception as e:
                    print(f"Rollback failed: {e}")
                    raise

    async def rollback_to_version(self, target_version: str) -> bool:
        """Rollback all migrations applied after the target version."""
        # Fetch the versions to roll back (newest first), then release the
        # connection before rolling back so rollback_migration can acquire
        # its own connection from the pool.
        async with self._pool.acquire() as conn:
            rows = await conn.fetch(
                """
                SELECT version FROM _migrations
                WHERE version > $1
                ORDER BY version DESC
                """,
                target_version,
            )

        for row in rows:
            success = await self.rollback_migration(row["version"])
            if not success:
                return False

        return True

    async def get_current_version(self) -> Optional[str]:
        """Get current migration version."""
        async with self._pool.acquire() as conn:
            row = await conn.fetchrow(
                "SELECT version FROM _migrations ORDER BY version DESC LIMIT 1"
            )
            return row["version"] if row else None

Summary
This section covered comprehensive deployment strategies for AI agent systems:
- Container-Based Deployment: Multi-stage Dockerfiles for optimized agent images with security best practices
- Kubernetes Orchestration: Deployment manifests, autoscaling, and resource management for agent workloads
- Deployment Patterns: Blue-green and canary strategies for safe, gradual rollouts with automated analysis
- CI/CD Pipelines: Automated testing and deployment workflows using GitHub Actions
- Environment Management: Configuration management across development, staging, and production
- Feature Flags: Gradual rollout of agent capabilities with user targeting and percentage-based strategies
- Rollback Strategies: Automated rollback with health monitoring and database migration support
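
As a closing illustration of the last point: the cooldown and rate-limit guards in `RollbackManager._trigger_rollback` reduce to two pure checks, which makes them easy to unit-test in isolation. A self-contained sketch (the function name and defaults are illustrative, mirroring `RollbackConfig`; this is not part of the `RollbackManager` API):

```python
from datetime import datetime, timedelta
from typing import Optional


def rollback_allowed(
    last_rollback: Optional[datetime],
    history: list[datetime],
    now: datetime,
    cooldown_seconds: int = 300,
    max_per_hour: int = 3,
) -> bool:
    """Mirror of the two guards: block a rollback during the cooldown
    window, or once the hourly rollback budget is spent."""
    if last_rollback and now < last_rollback + timedelta(seconds=cooldown_seconds):
        return False  # still cooling down from the previous rollback
    recent = [t for t in history if t > now - timedelta(hours=1)]
    return len(recent) < max_per_hour


now = datetime(2024, 1, 1, 12, 0)
print(rollback_allowed(None, [], now))                         # True
print(rollback_allowed(now - timedelta(seconds=60), [], now))  # False (cooldown)
print(rollback_allowed(None, [now - timedelta(minutes=m) for m in (5, 15, 25)], now))
# False (three rollbacks already in the trailing hour)
```

Keeping guards like these free of I/O means the rollback policy can be verified in milliseconds, without a deployment environment.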
Production Readiness: A well-designed deployment pipeline enables confidence in releases. When deployments are automated, tested, and reversible, teams can ship faster and more safely.
This concludes Chapter 21 on Production Deployment. The combination of containerization, orchestration, progressive deployment patterns, and robust rollback mechanisms creates a foundation for reliably operating AI agent systems at scale.