Chapter 21

Deployment Strategies

Production Deployment

Introduction

Deploying AI agent systems to production requires careful consideration of reliability, scalability, and maintainability. Unlike traditional applications, agent systems have unique deployment challenges including model versioning, long-running processes, and variable resource requirements. This section covers deployment strategies that address these challenges while enabling rapid, safe releases.

Deployment Philosophy: The goal of deployment automation is to make releases boring—predictable, reversible, and routine. Every deployment should be as safe as a configuration change.

Container-Based Deployment

Containerization provides consistency between development and production environments, making it essential for AI agent deployments.

Multi-Stage Dockerfile for Agents

```dockerfile
# Build stage
FROM python:3.11-slim AS builder

WORKDIR /app

# Install build dependencies
RUN apt-get update && apt-get install -y --no-install-recommends \
    build-essential \
    && rm -rf /var/lib/apt/lists/*

# Install Python dependencies
COPY requirements.txt .
RUN pip wheel --no-cache-dir --no-deps --wheel-dir /app/wheels -r requirements.txt

# Production stage
FROM python:3.11-slim AS production

WORKDIR /app

# Create non-root user for security
RUN groupadd -r agent && useradd -r -g agent agent

# Install runtime dependencies only
RUN apt-get update && apt-get install -y --no-install-recommends \
    curl \
    && rm -rf /var/lib/apt/lists/*

# Copy wheels and install
COPY --from=builder /app/wheels /wheels
RUN pip install --no-cache /wheels/*

# Copy application code
COPY --chown=agent:agent src/ ./src/
COPY --chown=agent:agent config/ ./config/

# Set environment variables
ENV PYTHONUNBUFFERED=1
ENV PYTHONDONTWRITEBYTECODE=1
ENV APP_ENV=production

# Health check
HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \
    CMD curl -f http://localhost:8000/health || exit 1

# Switch to non-root user
USER agent

# Start application
CMD ["python", "-m", "src.main"]
```

Docker Compose for Local Development

```yaml
# docker-compose.yml
version: "3.9"

services:
  agent-api:
    build:
      context: .
      dockerfile: Dockerfile
      target: production
    ports:
      - "8000:8000"
    environment:
      - DATABASE_URL=postgresql://postgres:postgres@db:5432/agents
      - REDIS_URL=redis://redis:6379/0
      - OPENAI_API_KEY=${OPENAI_API_KEY}
      - LOG_LEVEL=INFO
    depends_on:
      db:
        condition: service_healthy
      redis:
        condition: service_healthy
    deploy:
      resources:
        limits:
          cpus: "2"
          memory: 4G
        reservations:
          cpus: "1"
          memory: 2G

  agent-worker:
    build:
      context: .
      dockerfile: Dockerfile.worker
    environment:
      - DATABASE_URL=postgresql://postgres:postgres@db:5432/agents
      - REDIS_URL=redis://redis:6379/0
      - OPENAI_API_KEY=${OPENAI_API_KEY}
    depends_on:
      - agent-api
      - redis
    deploy:
      replicas: 3
      resources:
        limits:
          cpus: "4"
          memory: 8G

  db:
    image: postgres:15-alpine
    environment:
      POSTGRES_DB: agents
      POSTGRES_USER: postgres
      POSTGRES_PASSWORD: postgres
    volumes:
      - postgres_data:/var/lib/postgresql/data
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U postgres"]
      interval: 5s
      timeout: 5s
      retries: 5

  redis:
    image: redis:7-alpine
    command: redis-server --appendonly yes
    volumes:
      - redis_data:/data
    healthcheck:
      test: ["CMD", "redis-cli", "ping"]
      interval: 5s
      timeout: 5s
      retries: 5

volumes:
  postgres_data:
  redis_data:
```

Kubernetes Orchestration

Kubernetes provides robust orchestration for agent systems, enabling automatic scaling, self-healing, and declarative configuration.

Agent Deployment Manifest

```yaml
# agent-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: agent-api
  labels:
    app: agent-api
    version: v1
spec:
  replicas: 3
  strategy:
    type: RollingUpdate
    rollingUpdate:
      maxSurge: 1
      maxUnavailable: 0
  selector:
    matchLabels:
      app: agent-api
  template:
    metadata:
      labels:
        app: agent-api
        version: v1
      annotations:
        prometheus.io/scrape: "true"
        prometheus.io/port: "8000"
        prometheus.io/path: "/metrics"
    spec:
      serviceAccountName: agent-api
      securityContext:
        runAsNonRoot: true
        runAsUser: 1000
        fsGroup: 1000

      containers:
        - name: agent-api
          image: registry.example.com/agent-api:v1.2.3
          imagePullPolicy: Always

          ports:
            - containerPort: 8000
              name: http

          env:
            - name: DATABASE_URL
              valueFrom:
                secretKeyRef:
                  name: agent-secrets
                  key: database-url
            - name: OPENAI_API_KEY
              valueFrom:
                secretKeyRef:
                  name: agent-secrets
                  key: openai-api-key
            - name: POD_NAME
              valueFrom:
                fieldRef:
                  fieldPath: metadata.name

          resources:
            requests:
              cpu: "500m"
              memory: "1Gi"
            limits:
              cpu: "2000m"
              memory: "4Gi"

          livenessProbe:
            httpGet:
              path: /health/live
              port: http
            initialDelaySeconds: 10
            periodSeconds: 15
            timeoutSeconds: 5
            failureThreshold: 3

          readinessProbe:
            httpGet:
              path: /health/ready
              port: http
            initialDelaySeconds: 5
            periodSeconds: 5
            timeoutSeconds: 3
            failureThreshold: 3

          lifecycle:
            preStop:
              exec:
                command: ["/bin/sh", "-c", "sleep 10"]

      affinity:
        podAntiAffinity:
          preferredDuringSchedulingIgnoredDuringExecution:
            - weight: 100
              podAffinityTerm:
                labelSelector:
                  matchLabels:
                    app: agent-api
                topologyKey: kubernetes.io/hostname

      topologySpreadConstraints:
        - maxSkew: 1
          topologyKey: topology.kubernetes.io/zone
          whenUnsatisfiable: ScheduleAnyway
          labelSelector:
            matchLabels:
              app: agent-api
```
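
The manifest distinguishes `/health/live` from `/health/ready` for a reason: failing a liveness probe restarts the pod, while failing readiness only removes it from the Service's endpoints. A liveness check should therefore verify only the process itself; readiness should also confirm dependencies. A minimal, framework-agnostic sketch of that split (the dependency-checker callables here are stand-ins, not the book's actual handlers):

```python
"""Liveness vs. readiness checks, sketched as plain functions."""
from typing import Callable


def liveness() -> tuple[int, dict]:
    """Liveness: the process is up and able to respond — nothing else.
    Failing this restarts the pod, so keep it cheap and dependency-free."""
    return 200, {"status": "alive"}


def readiness(dependency_checks: dict[str, Callable[[], bool]]) -> tuple[int, dict]:
    """Readiness: can this replica serve traffic right now?
    Failing this only removes the pod from load balancing."""
    results = {name: check() for name, check in dependency_checks.items()}
    status = 200 if all(results.values()) else 503
    body = {"status": "ready" if status == 200 else "not_ready", "checks": results}
    return status, body


# Stubbed checks; real ones would ping the database and Redis
checks = {"database": lambda: True, "redis": lambda: False}
code, body = readiness(checks)
# A Redis outage makes this replica not-ready (503) without restarting it
```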

Horizontal Pod Autoscaler

```yaml
# agent-hpa.yaml
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: agent-api-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: agent-api
  minReplicas: 3
  maxReplicas: 20
  metrics:
    - type: Resource
      resource:
        name: cpu
        target:
          type: Utilization
          averageUtilization: 70
    - type: Resource
      resource:
        name: memory
        target:
          type: Utilization
          averageUtilization: 80
    - type: Pods
      pods:
        metric:
          name: agent_requests_per_second
        target:
          type: AverageValue
          averageValue: "100"
  behavior:
    scaleUp:
      stabilizationWindowSeconds: 60
      policies:
        - type: Pods
          value: 4
          periodSeconds: 60
        - type: Percent
          value: 100
          periodSeconds: 60
      selectPolicy: Max
    scaleDown:
      stabilizationWindowSeconds: 300
      policies:
        - type: Pods
          value: 1
          periodSeconds: 120
```
Deployment Patterns

Different deployment patterns offer various trade-offs between risk, speed, and resource usage.

| Pattern | Description | Best For | Rollback Time |
| --- | --- | --- | --- |
| Rolling Update | Gradual replacement of pods | Standard deployments | Minutes |
| Blue-Green | Full environment switch | Zero-downtime releases | Seconds |
| Canary | Gradual traffic shift | Risk-sensitive releases | Seconds |
| A/B Testing | User-based routing | Feature experiments | Seconds |

Blue-Green Deployment Implementation

```python
"""Blue-green deployment controller."""
import asyncio
from dataclasses import dataclass
from enum import Enum

import httpx


class Environment(Enum):
    BLUE = "blue"
    GREEN = "green"


@dataclass
class DeploymentStatus:
    active: Environment
    standby: Environment
    blue_version: str
    green_version: str
    blue_healthy: bool
    green_healthy: bool

    def is_healthy(self, env: Environment) -> bool:
        return self.blue_healthy if env == Environment.BLUE else self.green_healthy


class BlueGreenController:
    """Manages blue-green deployments for agent services."""

    def __init__(
        self,
        load_balancer_api: str,
        blue_endpoint: str,
        green_endpoint: str,
    ):
        self.load_balancer_api = load_balancer_api
        self.endpoints = {
            Environment.BLUE: blue_endpoint,
            Environment.GREEN: green_endpoint,
        }
        self.client = httpx.AsyncClient(timeout=30.0)

    async def get_status(self) -> DeploymentStatus:
        """Get current deployment status."""
        # Check which environment is active
        response = await self.client.get(f"{self.load_balancer_api}/active")
        active_env = Environment(response.json()["environment"])
        standby_env = (
            Environment.GREEN
            if active_env == Environment.BLUE
            else Environment.BLUE
        )

        # Get versions and health
        blue_info = await self._get_env_info(Environment.BLUE)
        green_info = await self._get_env_info(Environment.GREEN)

        return DeploymentStatus(
            active=active_env,
            standby=standby_env,
            blue_version=blue_info["version"],
            green_version=green_info["version"],
            blue_healthy=blue_info["healthy"],
            green_healthy=green_info["healthy"],
        )

    async def _get_env_info(self, env: Environment) -> dict:
        """Get version and health info for an environment."""
        try:
            response = await self.client.get(f"{self.endpoints[env]}/health")
            data = response.json()
            return {
                "version": data.get("version", "unknown"),
                "healthy": response.status_code == 200,
            }
        except Exception:
            # Any failure to reach or parse the endpoint counts as unhealthy
            return {"version": "unknown", "healthy": False}

    async def deploy_to_standby(self, version: str) -> bool:
        """Deploy a new version to the standby environment."""
        status = await self.get_status()
        standby = status.standby

        print(f"Deploying {version} to {standby.value} environment")

        # Trigger deployment to standby
        response = await self.client.post(
            f"{self.endpoints[standby]}/deploy",
            json={"version": version},
        )

        if response.status_code != 200:
            print(f"Deployment failed: {response.text}")
            return False

        # Wait for deployment to complete
        if not await self._wait_for_healthy(standby):
            print("Deployment unhealthy, rolling back")
            await self._rollback_standby(standby)
            return False

        return True

    async def _wait_for_healthy(
        self,
        env: Environment,
        max_attempts: int = 30,
        poll_interval: float = 10.0,
    ) -> bool:
        """Poll until the environment becomes healthy or attempts run out."""
        for _ in range(max_attempts):
            info = await self._get_env_info(env)
            if info["healthy"]:
                return True
            await asyncio.sleep(poll_interval)

        return False

    async def switch_traffic(self) -> bool:
        """Switch traffic to the standby environment."""
        status = await self.get_status()

        # Only the environment we are switching TO must be healthy;
        # requiring both would also block rollbacks away from a failing active
        if not status.is_healthy(status.standby):
            print(f"Cannot switch: {status.standby.value} is not healthy")
            return False

        # Switch load balancer
        response = await self.client.post(
            f"{self.load_balancer_api}/switch",
            json={"target": status.standby.value},
        )

        if response.status_code != 200:
            print(f"Switch failed: {response.text}")
            return False

        print(f"Traffic switched to {status.standby.value}")
        return True

    async def rollback(self) -> bool:
        """Roll back by switching traffic to the previous environment."""
        return await self.switch_traffic()

    async def _rollback_standby(self, env: Environment) -> None:
        """Roll back a failed standby deployment."""
        await self.client.post(f"{self.endpoints[env]}/rollback")
```
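
A release then chains the controller's operations: deploy to standby, verify, switch, and keep the old environment warm for instant rollback. A sketch of that flow, driven against an in-memory fake (the real controller talks to a load-balancer API, which would not run here):

```python
"""Blue-green release flow, driven against a stubbed controller."""
import asyncio


class FakeController:
    """In-memory stand-in for BlueGreenController (no network calls)."""

    def __init__(self):
        self.active = "blue"
        self.standby_version = None

    async def deploy_to_standby(self, version: str) -> bool:
        self.standby_version = version
        return True  # pretend the standby came up healthy

    async def switch_traffic(self) -> bool:
        self.active = "green" if self.active == "blue" else "blue"
        return True


async def release(controller, version: str) -> bool:
    """Deploy to standby first; only switch traffic if that succeeds."""
    if not await controller.deploy_to_standby(version):
        return False  # standby never went healthy; active env untouched
    return await controller.switch_traffic()


controller = FakeController()
ok = asyncio.run(release(controller, "v1.2.4"))
print(ok, controller.active)  # True green
```

The key property is that a failed deploy leaves the active environment untouched: traffic only moves after the standby is verified.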

Canary Deployment with Traffic Splitting

```python
"""Canary deployment with progressive traffic shifting."""
import asyncio
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from typing import Awaitable, Callable, Optional


@dataclass
class CanaryMetrics:
    """Metrics for canary analysis."""
    error_rate: float
    latency_p50: float
    latency_p99: float
    success_rate: float
    request_count: int


@dataclass
class CanaryStage:
    """A stage in the canary rollout."""
    traffic_percent: int
    duration_minutes: int
    success_criteria: dict


@dataclass
class CanaryConfig:
    """Canary deployment configuration."""
    stages: list[CanaryStage] = field(default_factory=list)
    rollback_on_failure: bool = True
    analysis_interval_seconds: int = 30


class CanaryDeployment:
    """Manages canary deployments with automated analysis."""

    DEFAULT_STAGES = [
        CanaryStage(
            traffic_percent=5,
            duration_minutes=5,
            success_criteria={
                "max_error_rate": 0.01,
                "max_latency_p99": 1000,
            },
        ),
        CanaryStage(
            traffic_percent=25,
            duration_minutes=10,
            success_criteria={
                "max_error_rate": 0.01,
                "max_latency_p99": 1000,
            },
        ),
        CanaryStage(
            traffic_percent=50,
            duration_minutes=15,
            success_criteria={
                "max_error_rate": 0.005,
                "max_latency_p99": 800,
            },
        ),
        CanaryStage(
            traffic_percent=100,
            duration_minutes=0,
            success_criteria={},
        ),
    ]

    def __init__(
        self,
        traffic_manager: "TrafficManager",
        metrics_collector: "MetricsCollector",
        config: Optional[CanaryConfig] = None,
    ):
        self.traffic_manager = traffic_manager
        self.metrics_collector = metrics_collector
        self.config = config or CanaryConfig(stages=self.DEFAULT_STAGES)
        self._current_stage = 0
        self._rollback_triggered = False

    async def deploy(
        self,
        version: str,
        on_progress: Optional[Callable[[int, str], Awaitable[None]]] = None,
    ) -> bool:
        """Execute canary deployment."""
        self._current_stage = 0
        self._rollback_triggered = False

        for i, stage in enumerate(self.config.stages):
            self._current_stage = i

            # Update traffic split
            await self.traffic_manager.set_canary_weight(stage.traffic_percent)

            if on_progress:
                await on_progress(
                    stage.traffic_percent,
                    f"Stage {i + 1}: {stage.traffic_percent}% traffic",
                )

            # Run analysis for stage duration
            if stage.duration_minutes > 0:
                success = await self._analyze_stage(stage)

                if not success:
                    if self.config.rollback_on_failure:
                        await self._rollback()
                    return False

        print(f"Canary deployment of {version} completed successfully")
        return True

    async def _analyze_stage(self, stage: CanaryStage) -> bool:
        """Analyze canary metrics during a stage."""
        end_time = datetime.now() + timedelta(minutes=stage.duration_minutes)

        while datetime.now() < end_time:
            # Collect metrics
            baseline_metrics = await self.metrics_collector.get_metrics(
                target="baseline"
            )
            canary_metrics = await self.metrics_collector.get_metrics(
                target="canary"
            )

            # Compare against criteria
            if not self._check_criteria(canary_metrics, stage.success_criteria):
                print("Canary failed criteria check")
                return False

            # Compare against baseline
            if not self._compare_to_baseline(canary_metrics, baseline_metrics):
                print("Canary performing worse than baseline")
                return False

            await asyncio.sleep(self.config.analysis_interval_seconds)

        return True

    def _check_criteria(
        self,
        metrics: CanaryMetrics,
        criteria: dict,
    ) -> bool:
        """Check if metrics meet success criteria."""
        if "max_error_rate" in criteria:
            if metrics.error_rate > criteria["max_error_rate"]:
                return False

        if "max_latency_p99" in criteria:
            if metrics.latency_p99 > criteria["max_latency_p99"]:
                return False

        return True

    def _compare_to_baseline(
        self,
        canary: CanaryMetrics,
        baseline: CanaryMetrics,
    ) -> bool:
        """Compare canary metrics to baseline."""
        # Allow 10% degradation relative to baseline
        if canary.error_rate > baseline.error_rate * 1.1:
            return False

        if canary.latency_p99 > baseline.latency_p99 * 1.1:
            return False

        return True

    async def _rollback(self) -> None:
        """Roll back the canary deployment."""
        self._rollback_triggered = True
        await self.traffic_manager.set_canary_weight(0)
        print("Canary rollback completed")
```
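
The `TrafficManager` above is assumed to exist elsewhere (for example, a wrapper that updates an Istio VirtualService weight or an NGINX upstream). Its core mechanic, splitting a request stream by canary weight, is worth seeing in isolation. With stable hashing, a given session always lands on the same side, and sessions on the canary at a low weight stay there as the weight grows:

```python
"""Weighted traffic split with stable per-session routing."""
import hashlib


def route(session_id: str, canary_percent: int) -> str:
    """Hash the session into a 0-99 bucket; buckets below the weight go to canary.
    Stable hashing keeps a session pinned to one side as the weight increases."""
    bucket = int(hashlib.sha256(session_id.encode()).hexdigest(), 16) % 100
    return "canary" if bucket < canary_percent else "baseline"


# At 25% the canary should see roughly a quarter of sessions
sessions = [f"session-{i}" for i in range(10_000)]
canary_share = sum(route(s, 25) == "canary" for s in sessions) / len(sessions)
print(round(canary_share, 2))  # close to 0.25

# A session routed to canary at 5% is still on canary at every higher weight
assert all(route(s, 50) == "canary" for s in sessions if route(s, 5) == "canary")
```

This stickiness matters for agent systems: mid-conversation sessions should not bounce between model versions as the rollout progresses.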

CI/CD Pipelines

Continuous integration and deployment pipelines automate testing and release processes for agent systems.

GitHub Actions Workflow

```yaml
# .github/workflows/deploy.yml
name: Deploy Agent System

on:
  push:
    branches: [main]
  pull_request:
    branches: [main]

env:
  REGISTRY: ghcr.io
  IMAGE_NAME: ${{ github.repository }}

jobs:
  test:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4

      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: "3.11"
          cache: "pip"

      - name: Install dependencies
        run: |
          pip install -r requirements.txt
          pip install -r requirements-dev.txt

      - name: Run linting
        run: |
          ruff check src/
          mypy src/

      - name: Run unit tests
        run: pytest tests/unit -v --cov=src --cov-report=xml

      - name: Run integration tests
        env:
          OPENAI_API_KEY: ${{ secrets.OPENAI_API_KEY }}
        run: pytest tests/integration -v

      - name: Upload coverage
        uses: codecov/codecov-action@v4
        with:
          files: ./coverage.xml

  build:
    needs: test
    runs-on: ubuntu-latest
    outputs:
      image_tag: ${{ steps.tag.outputs.tag }}

    steps:
      - uses: actions/checkout@v4

      - name: Set up Docker Buildx
        uses: docker/setup-buildx-action@v3

      - name: Log in to registry
        uses: docker/login-action@v3
        with:
          registry: ${{ env.REGISTRY }}
          username: ${{ github.actor }}
          password: ${{ secrets.GITHUB_TOKEN }}

      - name: Extract metadata
        id: meta
        uses: docker/metadata-action@v5
        with:
          images: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}
          tags: |
            type=sha,format=long,prefix=
            type=ref,event=branch
            type=semver,pattern={{version}}

      - name: Compute deploy tag
        id: tag
        # steps.meta.outputs.tags is a newline-separated list, which cannot be
        # passed to `kubectl set image`; expose the single immutable sha tag
        # instead (lowercased, since registries reject uppercase image names)
        run: |
          image="${REGISTRY}/$(echo "${IMAGE_NAME}" | tr '[:upper:]' '[:lower:]')"
          echo "tag=${image}:${GITHUB_SHA}" >> "$GITHUB_OUTPUT"

      - name: Build and push
        uses: docker/build-push-action@v5
        with:
          context: .
          push: ${{ github.event_name != 'pull_request' }}
          tags: ${{ steps.meta.outputs.tags }}
          labels: ${{ steps.meta.outputs.labels }}
          cache-from: type=gha
          cache-to: type=gha,mode=max

  deploy-staging:
    needs: build
    if: github.ref == 'refs/heads/main'
    runs-on: ubuntu-latest
    environment: staging

    steps:
      - uses: actions/checkout@v4

      - name: Set up kubectl
        uses: azure/setup-kubectl@v3

      - name: Configure kubectl
        # Persist KUBECONFIG via GITHUB_ENV; a plain `export` would not
        # survive past the end of this step
        run: |
          echo "${{ secrets.KUBE_CONFIG_STAGING }}" | base64 -d > kubeconfig
          echo "KUBECONFIG=$PWD/kubeconfig" >> "$GITHUB_ENV"

      - name: Deploy to staging
        run: |
          kubectl set image deployment/agent-api \
            agent-api=${{ needs.build.outputs.image_tag }} \
            --namespace=staging
          kubectl rollout status deployment/agent-api \
            --namespace=staging --timeout=300s

      - name: Run smoke tests
        env:
          STAGING_URL: ${{ vars.STAGING_URL }}
        run: |
          python scripts/smoke_tests.py --url $STAGING_URL

  deploy-production:
    needs: [build, deploy-staging]
    if: github.ref == 'refs/heads/main'
    runs-on: ubuntu-latest
    environment: production

    steps:
      - uses: actions/checkout@v4

      - name: Set up kubectl
        uses: azure/setup-kubectl@v3

      - name: Configure kubectl
        run: |
          echo "${{ secrets.KUBE_CONFIG_PROD }}" | base64 -d > kubeconfig
          echo "KUBECONFIG=$PWD/kubeconfig" >> "$GITHUB_ENV"

      - name: Deploy canary
        run: |
          python scripts/canary_deploy.py \
            --image ${{ needs.build.outputs.image_tag }} \
            --namespace production \
            --stages "5,25,50,100" \
            --stage-duration 300

      - name: Notify deployment
        uses: slackapi/slack-github-action@v1
        with:
          payload: |
            {
              "text": "Deployed ${{ needs.build.outputs.image_tag }} to production"
            }
        env:
          SLACK_WEBHOOK_URL: ${{ secrets.SLACK_WEBHOOK }}
```
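
The workflow invokes `scripts/smoke_tests.py`, which is not shown. A minimal version only needs a handful of request-level checks against the freshly deployed service. One possible shape, a sketch rather than the actual script, with the HTTP fetch injected as a callable so the checks can be exercised without a live deployment (the `/version` endpoint here is an assumption):

```python
"""One possible shape for a post-deploy smoke test (a sketch, not the real script)."""
from typing import Callable

# A fetcher maps a path to (status_code, json_body); a real script would wrap
# httpx or requests against the --url argument.
Fetcher = Callable[[str], tuple[int, dict]]


def run_smoke_tests(fetch: Fetcher) -> list[str]:
    """Return a list of failure messages; empty means the deployment looks sane."""
    failures = []

    status, _ = fetch("/health/live")
    if status != 200:
        failures.append(f"/health/live returned {status}")

    status, _ = fetch("/health/ready")
    if status != 200:
        failures.append(f"/health/ready returned {status}")

    status, body = fetch("/version")
    if status != 200 or "version" not in body:
        failures.append("/version missing or malformed")

    return failures


# Stubbed fetcher standing in for real HTTP calls
def fake_fetch(path: str) -> tuple[int, dict]:
    responses = {
        "/health/live": (200, {"status": "alive"}),
        "/health/ready": (200, {"status": "ready"}),
        "/version": (200, {"version": "v1.2.3"}),
    }
    return responses.get(path, (404, {}))


print(run_smoke_tests(fake_fetch))  # [] -> all checks passed
```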

Environment Management

Managing multiple environments requires consistent configuration and secret management across development, staging, and production.

Environment Configuration Manager

```python
"""Environment configuration management."""
from dataclasses import dataclass
from enum import Enum
from typing import Any, Optional
from pathlib import Path
import os
import json


class EnvironmentType(Enum):
    DEVELOPMENT = "development"
    STAGING = "staging"
    PRODUCTION = "production"


@dataclass
class EnvironmentConfig:
    """Configuration for a specific environment."""
    env_type: EnvironmentType
    api_url: str
    database_url: str
    redis_url: str
    log_level: str
    debug: bool
    feature_flags: dict[str, bool]
    rate_limits: dict[str, int]
    llm_config: dict[str, Any]


class ConfigurationManager:
    """Manages environment-specific configurations."""

    def __init__(self):
        self._configs: dict[EnvironmentType, EnvironmentConfig] = {}
        self._secrets: dict[str, str] = {}
        self._current_env: Optional[EnvironmentType] = None

    def load_from_file(self, config_path: Path) -> None:
        """Load configuration from file."""
        with open(config_path) as f:
            data = json.load(f)

        for env_name, env_config in data.get("environments", {}).items():
            env_type = EnvironmentType(env_name)
            self._configs[env_type] = EnvironmentConfig(
                env_type=env_type,
                api_url=env_config["api_url"],
                database_url=self._resolve_secret(env_config["database_url"]),
                redis_url=self._resolve_secret(env_config["redis_url"]),
                log_level=env_config.get("log_level", "INFO"),
                debug=env_config.get("debug", False),
                feature_flags=env_config.get("feature_flags", {}),
                rate_limits=env_config.get("rate_limits", {}),
                llm_config=env_config.get("llm_config", {}),
            )

    def _resolve_secret(self, value: str) -> str:
        """Resolve secret references ($VAR or ${VAR}) in config values."""
        if value.startswith("$"):
            env_var = value[2:-1] if value.startswith("${") else value[1:]
            return os.environ.get(env_var, value)
        return value

    def get_config(
        self,
        env_type: Optional[EnvironmentType] = None,
    ) -> EnvironmentConfig:
        """Get configuration for the specified or current environment."""
        env = env_type or self._current_env
        if env is None:
            env = self._detect_environment()

        if env not in self._configs:
            raise ValueError(f"No configuration for {env}")

        return self._configs[env]

    def _detect_environment(self) -> EnvironmentType:
        """Detect current environment from environment variables."""
        env_name = os.environ.get("APP_ENV", "development")
        return EnvironmentType(env_name)

    def set_environment(self, env_type: EnvironmentType) -> None:
        """Set the current environment."""
        self._current_env = env_type


# Example configuration file
EXAMPLE_CONFIG = """
{
  "environments": {
    "development": {
      "api_url": "http://localhost:8000",
      "database_url": "${DATABASE_URL}",
      "redis_url": "redis://localhost:6379/0",
      "log_level": "DEBUG",
      "debug": true,
      "feature_flags": {
        "enable_new_agent": true,
        "enable_streaming": true
      },
      "rate_limits": {
        "requests_per_minute": 1000
      },
      "llm_config": {
        "model": "gpt-4o-mini",
        "temperature": 0.7,
        "max_tokens": 4096
      }
    },
    "staging": {
      "api_url": "https://staging-api.example.com",
      "database_url": "${STAGING_DATABASE_URL}",
      "redis_url": "${STAGING_REDIS_URL}",
      "log_level": "INFO",
      "debug": false,
      "feature_flags": {
        "enable_new_agent": true,
        "enable_streaming": true
      },
      "rate_limits": {
        "requests_per_minute": 500
      },
      "llm_config": {
        "model": "gpt-4o",
        "temperature": 0.5,
        "max_tokens": 8192
      }
    },
    "production": {
      "api_url": "https://api.example.com",
      "database_url": "${PROD_DATABASE_URL}",
      "redis_url": "${PROD_REDIS_URL}",
      "log_level": "WARNING",
      "debug": false,
      "feature_flags": {
        "enable_new_agent": false,
        "enable_streaming": true
      },
      "rate_limits": {
        "requests_per_minute": 100
      },
      "llm_config": {
        "model": "gpt-4o",
        "temperature": 0.3,
        "max_tokens": 8192
      }
    }
  }
}
"""
```
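
The `_resolve_secret` pattern keeps credentials out of config files: the file stores a `${VAR}` reference and the process environment supplies the value at load time, so the same file can be committed for every environment. The resolution rule in isolation:

```python
"""Secret-reference resolution: config stores ${VAR}, the environment supplies values."""
import os


def resolve_secret(value: str) -> str:
    """Resolve $VAR or ${VAR} from the environment; pass other values through.
    Unset variables fall back to the raw reference so the gap stays visible."""
    if value.startswith("${") and value.endswith("}"):
        name = value[2:-1]
    elif value.startswith("$"):
        name = value[1:]
    else:
        return value
    return os.environ.get(name, value)


os.environ["STAGING_DATABASE_URL"] = "postgresql://app:s3cret@staging-db:5432/agents"
print(resolve_secret("${STAGING_DATABASE_URL}"))   # real URL, from the environment
print(resolve_secret("redis://localhost:6379/0"))  # literal values pass through
print(resolve_secret("${MISSING_VAR}"))            # unresolved reference, left as-is
```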

Feature Flags for Agents

Feature flags enable gradual rollout of new agent capabilities and quick disabling of problematic features without deployment.

Agent Feature Flag System

🐍python
1"""Feature flag system for agent capabilities."""
2from dataclasses import dataclass, field
3from datetime import datetime
4from enum import Enum
5from typing import Any, Optional, Callable
6import hashlib
7import json
8
9
10class RolloutStrategy(Enum):
11    ALL = "all"
12    NONE = "none"
13    PERCENTAGE = "percentage"
14    USER_LIST = "user_list"
15    GRADUAL = "gradual"
16
17
18@dataclass
19class FeatureFlag:
20    """Definition of a feature flag."""
21    name: str
22    description: str
23    enabled: bool
24    rollout_strategy: RolloutStrategy
25    rollout_percentage: int = 0
26    allowed_users: list[str] = field(default_factory=list)
27    metadata: dict[str, Any] = field(default_factory=dict)
28    created_at: datetime = field(default_factory=datetime.now)
29    updated_at: datetime = field(default_factory=datetime.now)
30
31
32class FeatureFlagService:
33    """Manages feature flags for agent system."""
34
35    def __init__(self, config_source: Optional[str] = None):
36        self._flags: dict[str, FeatureFlag] = {}
37        self._overrides: dict[str, dict[str, bool]] = {}
38        self._listeners: list[Callable[[str, bool], None]] = []
39
40        if config_source:
41            self._load_flags(config_source)
42
43    def _load_flags(self, source: str) -> None:
44        """Load flags from configuration source."""
45        with open(source) as f:
46            data = json.load(f)
47
48        for flag_data in data.get("flags", []):
49            flag = FeatureFlag(
50                name=flag_data["name"],
51                description=flag_data.get("description", ""),
52                enabled=flag_data.get("enabled", False),
53                rollout_strategy=RolloutStrategy(
54                    flag_data.get("rollout_strategy", "none")
55                ),
56                rollout_percentage=flag_data.get("rollout_percentage", 0),
57                allowed_users=flag_data.get("allowed_users", []),
58                metadata=flag_data.get("metadata", {}),
59            )
60            self._flags[flag.name] = flag
61
62    def is_enabled(
63        self,
64        flag_name: str,
65        user_id: Optional[str] = None,
66        context: Optional[dict[str, Any]] = None,
67    ) -> bool:
68        """Check if a feature flag is enabled."""
69        # Check for user-specific override
70        if user_id and user_id in self._overrides:
71            if flag_name in self._overrides[user_id]:
72                return self._overrides[user_id][flag_name]
73
74        flag = self._flags.get(flag_name)
75        if not flag:
76            return False
77
78        if not flag.enabled:
79            return False
80
81        return self._evaluate_rollout(flag, user_id, context)
82
83    def _evaluate_rollout(
84        self,
85        flag: FeatureFlag,
86        user_id: Optional[str],
87        context: Optional[dict[str, Any]],
88    ) -> bool:
89        """Evaluate rollout strategy."""
90        strategy = flag.rollout_strategy
91
92        if strategy == RolloutStrategy.ALL:
93            return True
94
95        if strategy == RolloutStrategy.NONE:
96            return False
97
98        if strategy == RolloutStrategy.USER_LIST:
99            return user_id in flag.allowed_users if user_id else False
100
101        if strategy == RolloutStrategy.PERCENTAGE:
102            if not user_id:
103                return False
104            # Consistent hashing for user
105            hash_input = f"{flag.name}:{user_id}"
106            hash_value = int(
107                hashlib.md5(hash_input.encode()).hexdigest(), 16
108            )
109            return (hash_value % 100) < flag.rollout_percentage
110
111        if strategy == RolloutStrategy.GRADUAL:
112            # Time-based gradual rollout
113            if not (flag.metadata.get("rollout_start") and flag.metadata.get("rollout_end")):
114                return False
115
116            start = datetime.fromisoformat(flag.metadata["rollout_start"])
117            end = datetime.fromisoformat(flag.metadata["rollout_end"])
118            now = datetime.now()
119
120            if now < start:
121                return False
122            if now > end:
123                return True
124
125            # Calculate current percentage
126            total_duration = (end - start).total_seconds()
127            elapsed = (now - start).total_seconds()
128            current_pct = int((elapsed / total_duration) * 100)
129
130            if not user_id:
131                return False
132
133            hash_input = f"{flag.name}:{user_id}"
134            hash_value = int(
135                hashlib.md5(hash_input.encode()).hexdigest(), 16
136            )
137            return (hash_value % 100) < current_pct
138
139        return False
140
141    def set_override(
142        self,
143        user_id: str,
144        flag_name: str,
145        enabled: bool,
146    ) -> None:
147        """Set a user-specific override for a flag."""
148        if user_id not in self._overrides:
149            self._overrides[user_id] = {}
150        self._overrides[user_id][flag_name] = enabled
151
152    def update_flag(
153        self,
154        flag_name: str,
155        enabled: Optional[bool] = None,
156        rollout_percentage: Optional[int] = None,
157    ) -> None:
158        """Update a feature flag."""
159        if flag_name not in self._flags:
160            raise ValueError(f"Unknown flag: {flag_name}")
161
162        flag = self._flags[flag_name]
163
164        if enabled is not None:
165            flag.enabled = enabled
166
167        if rollout_percentage is not None:
168            flag.rollout_percentage = rollout_percentage
169
170        flag.updated_at = datetime.now()
171
172        # Notify listeners
173        for listener in self._listeners:
174            listener(flag_name, flag.enabled)
175
176
177# Usage with agent capabilities
178class AgentWithFeatureFlags:
179    """Agent that uses feature flags for capabilities."""
180
181    def __init__(self, feature_flags: FeatureFlagService):
182        self.flags = feature_flags
183
184    async def process(
185        self,
186        request: dict,
187        user_id: str,
188    ) -> dict:
189        """Process request with feature-flagged capabilities."""
190        response = {"result": None, "features_used": []}
191
192        # Check for streaming capability
193        if self.flags.is_enabled("enable_streaming", user_id):
194            response["streaming"] = True
195            response["features_used"].append("streaming")
196
197        # Check for new model
198        if self.flags.is_enabled("use_gpt4_turbo", user_id):
199            model = "gpt-4-turbo"
200            response["features_used"].append("gpt4_turbo")
201        else:
202            model = "gpt-4"
203
204        # Check for tool use
205        if self.flags.is_enabled("enable_tool_use", user_id):
206            tools = self._get_available_tools()
207            response["features_used"].append("tool_use")
208        else:
209            tools = []
210
211        # Process with selected features
212        result = await self._execute(request, model, tools)
213        response["result"] = result
214
215        return response
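The percentage and gradual strategies above hinge on consistent hashing: each user is deterministically mapped to a bucket in [0, 100), so raising the rollout percentage only ever adds users and never flips anyone back off. A minimal standalone sketch of that bucketing (the function name is illustrative):

```python
import hashlib


def in_rollout(flag_name: str, user_id: str, percentage: int) -> bool:
    """Deterministically map a (flag, user) pair to a bucket in [0, 100)."""
    hash_input = f"{flag_name}:{user_id}"
    # MD5 is fine here: it is used for bucketing, not for security.
    hash_value = int(hashlib.md5(hash_input.encode()).hexdigest(), 16)
    return (hash_value % 100) < percentage


# A user's bucket is fixed, so growing the percentage strictly grows
# the enabled set -- no user ever sees the feature toggle back off.
users = [f"user-{i}" for i in range(100)]
at_30 = {u for u in users if in_rollout("enable_streaming", u, 30)}
at_60 = {u for u in users if in_rollout("enable_streaming", u, 60)}
assert at_30 <= at_60
```

The gradual strategy is the same mechanism with the percentage derived from elapsed rollout time instead of a fixed value.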

Rollback Strategies

Robust rollback strategies are essential for maintaining system reliability when a deployment goes wrong.

Automated Rollback System

🐍python
1"""Automated rollback system for agent deployments."""
2from dataclasses import dataclass, field
3from datetime import datetime, timedelta
4from enum import Enum
5from typing import Optional
6import asyncio
7
8
9class RollbackReason(Enum):
10    ERROR_RATE_HIGH = "error_rate_high"
11    LATENCY_DEGRADED = "latency_degraded"
12    HEALTH_CHECK_FAILED = "health_check_failed"
13    MANUAL = "manual"
14    CIRCUIT_BREAKER = "circuit_breaker"
15
16
17@dataclass
18class RollbackEvent:
19    """Record of a rollback event."""
20    timestamp: datetime
21    from_version: str
22    to_version: str
23    reason: RollbackReason
24    details: str
25    duration_seconds: float
26
27
28@dataclass
29class RollbackConfig:
30    """Configuration for automated rollback."""
31    # Error thresholds
32    max_error_rate: float = 0.05
33    error_rate_window_seconds: int = 60
34
35    # Latency thresholds
36    max_latency_p99_ms: float = 2000
37    latency_window_seconds: int = 60
38
39    # Health check settings
40    health_check_interval_seconds: int = 10
41    health_check_failures_threshold: int = 3
42
43    # Rollback behavior
44    cooldown_seconds: int = 300
45    max_rollbacks_per_hour: int = 3
46
47
48class RollbackManager:
49    """Manages automated rollbacks for deployments."""
50
51    def __init__(
52        self,
53        config: RollbackConfig,
54        deployment_manager: "DeploymentManager",
55        metrics_collector: "MetricsCollector",
56    ):
57        self.config = config
58        self.deployment_manager = deployment_manager
59        self.metrics_collector = metrics_collector
60        self._rollback_history: list[RollbackEvent] = []
61        self._last_rollback: Optional[datetime] = None
62        self._health_failures = 0
63        self._monitoring = False
64
65    async def start_monitoring(self) -> None:
66        """Start monitoring for rollback conditions."""
67        self._monitoring = True
68
69        while self._monitoring:
70            try:
71                await self._check_rollback_conditions()
72            except Exception as e:
73                print(f"Error checking rollback conditions: {e}")
74
75            await asyncio.sleep(
76                self.config.health_check_interval_seconds
77            )
78
79    def stop_monitoring(self) -> None:
80        """Stop monitoring."""
81        self._monitoring = False
82
83    async def _check_rollback_conditions(self) -> None:
84        """Check if rollback is needed."""
85        # Check error rate
86        error_rate = await self.metrics_collector.get_error_rate(
87            window_seconds=self.config.error_rate_window_seconds
88        )
89
90        if error_rate > self.config.max_error_rate:
91            await self._trigger_rollback(
92                RollbackReason.ERROR_RATE_HIGH,
93                f"Error rate {error_rate:.2%} exceeds threshold "
94                f"{self.config.max_error_rate:.2%}",
95            )
96            return
97
98        # Check latency
99        latency_p99 = await self.metrics_collector.get_latency_percentile(
100            percentile=99,
101            window_seconds=self.config.latency_window_seconds,
102        )
103
104        if latency_p99 > self.config.max_latency_p99_ms:
105            await self._trigger_rollback(
106                RollbackReason.LATENCY_DEGRADED,
107                f"P99 latency {latency_p99}ms exceeds threshold "
108                f"{self.config.max_latency_p99_ms}ms",
109            )
110            return
111
112        # Check health
113        healthy = await self.deployment_manager.health_check()
114
115        if not healthy:
116            self._health_failures += 1
117
118            if self._health_failures >= self.config.health_check_failures_threshold:
119                await self._trigger_rollback(
120                    RollbackReason.HEALTH_CHECK_FAILED,
121                    f"Health check failed {self._health_failures} times",
122                )
123                self._health_failures = 0
124        else:
125            self._health_failures = 0
126
127    async def _trigger_rollback(
128        self,
129        reason: RollbackReason,
130        details: str,
131        target_version: Optional[str] = None,
132    ) -> bool:
133        """Trigger a rollback, defaulting to the previous version."""
134        # Check cooldown
135        if self._last_rollback:
136            cooldown_end = self._last_rollback + timedelta(
137                seconds=self.config.cooldown_seconds
138            )
139            if datetime.now() < cooldown_end:
140                print(f"Rollback blocked: in cooldown until {cooldown_end}")
141                return False
142
143        # Check rate limit
144        recent_rollbacks = [
145            r for r in self._rollback_history
146            if r.timestamp > datetime.now() - timedelta(hours=1)
147        ]
148
149        if len(recent_rollbacks) >= self.config.max_rollbacks_per_hour:
150            print(
151                f"Rollback blocked: {len(recent_rollbacks)} rollbacks "
152                f"in last hour (max: {self.config.max_rollbacks_per_hour})"
153            )
154            return False
155
156        # Execute rollback
157        print(f"Triggering rollback: {reason.value} - {details}")
158
159        start_time = datetime.now()
160        current_version = await self.deployment_manager.get_current_version()
161        if target_version is None:
162            target_version = await self.deployment_manager.get_previous_version()
163
164        success = await self.deployment_manager.rollback_to_version(
165            target_version
166        )
167
168        duration = (datetime.now() - start_time).total_seconds()
169
170        if success:
171            event = RollbackEvent(
172                timestamp=datetime.now(),
173                from_version=current_version,
174                to_version=target_version,
175                reason=reason,
176                details=details,
177                duration_seconds=duration,
178            )
179            self._rollback_history.append(event)
180            self._last_rollback = datetime.now()
181
182            # Send notifications
183            await self._notify_rollback(event)
184
185            print(f"Rollback completed in {duration:.2f}s")
186            return True
187
188        print("Rollback failed!")
189        return False
190
191    async def manual_rollback(
192        self,
193        target_version: Optional[str] = None,
194        reason: str = "Manual rollback",
195    ) -> bool:
196        """Perform a manual rollback, optionally to a specific version."""
197        return await self._trigger_rollback(
198            RollbackReason.MANUAL,
199            reason,
200            target_version=target_version,
201        )
202
203    async def _notify_rollback(self, event: RollbackEvent) -> None:
204        """Send rollback notifications."""
205        # This would integrate with alerting systems
206        print(f"ROLLBACK NOTIFICATION: {event.reason.value}")
207        print(f"  From: {event.from_version}")
208        print(f"  To: {event.to_version}")
209        print(f"  Details: {event.details}")
210
211    def get_rollback_history(
212        self,
213        since: Optional[datetime] = None,
214    ) -> list[RollbackEvent]:
215        """Get rollback history."""
216        if since:
217            return [r for r in self._rollback_history if r.timestamp >= since]
218        return self._rollback_history.copy()
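The cooldown and rate-limit guards at the top of `_trigger_rollback` are what keep a flapping deployment from rollback-looping. They reduce to a small pure function, sketched here (the function and parameter names are illustrative) to show how easily the guards can be unit-tested outside the async path:

```python
from datetime import datetime, timedelta
from typing import Optional


def rollback_allowed(
    history: list[datetime],
    last_rollback: Optional[datetime],
    cooldown_seconds: int,
    max_per_hour: int,
    now: Optional[datetime] = None,
) -> bool:
    """Mirror the cooldown and rate-limit checks as a pure function."""
    now = now or datetime.now()
    # Cooldown: no rollback until cooldown_seconds after the last one.
    if last_rollback and now < last_rollback + timedelta(seconds=cooldown_seconds):
        return False
    # Rate limit: at most max_per_hour rollbacks in any trailing hour.
    recent = [t for t in history if t > now - timedelta(hours=1)]
    return len(recent) < max_per_hour
```

Passing `now` explicitly makes the time-dependent logic deterministic under test, a pattern worth copying into the real manager.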

Database Migration Rollback

🐍python
1"""Database migration rollback support."""
2from dataclasses import dataclass
3from datetime import datetime
4from typing import Optional
5import asyncpg
6
7
8@dataclass
9class Migration:
10    """Database migration record."""
11    version: str
12    name: str
13    up_sql: str
14    down_sql: str
15    applied_at: Optional[datetime] = None
16
17
18class MigrationManager:
19    """Manages database migrations with rollback support."""
20
21    def __init__(self, database_url: str):
22        self.database_url = database_url
23        self._pool: Optional[asyncpg.Pool] = None
24
25    async def connect(self) -> None:
26        """Connect to database."""
27        self._pool = await asyncpg.create_pool(self.database_url)
28        await self._ensure_migrations_table()
29
30    async def _ensure_migrations_table(self) -> None:
31        """Create migrations tracking table."""
32        async with self._pool.acquire() as conn:
33            await conn.execute("""
34                CREATE TABLE IF NOT EXISTS _migrations (
35                    version VARCHAR(255) PRIMARY KEY,
36                    name VARCHAR(255) NOT NULL,
37                    applied_at TIMESTAMP DEFAULT NOW(),
38                    down_sql TEXT
39                )
40            """)
41
42    async def apply_migration(self, migration: Migration) -> bool:
43        """Apply a migration."""
44        async with self._pool.acquire() as conn:
45            async with conn.transaction():
46                try:
47                    # Execute up migration
48                    await conn.execute(migration.up_sql)
49
50                    # Record migration
51                    await conn.execute(
52                        """
53                        INSERT INTO _migrations (version, name, down_sql)
54                        VALUES ($1, $2, $3)
55                        """,
56                        migration.version,
57                        migration.name,
58                        migration.down_sql,
59                    )
60
61                    print(f"Applied migration: {migration.version}")
62                    return True
63
64                except Exception as e:
65                    print(f"Migration failed: {e}")
66                    raise
67
68    async def rollback_migration(self, version: str) -> bool:
69        """Rollback a specific migration."""
70        async with self._pool.acquire() as conn:
71            # Get migration info
72            row = await conn.fetchrow(
73                "SELECT down_sql FROM _migrations WHERE version = $1",
74                version,
75            )
76
77            if not row:
78                print(f"Migration {version} not found")
79                return False
80
81            async with conn.transaction():
82                try:
83                    # Execute down migration
84                    if row["down_sql"]:
85                        await conn.execute(row["down_sql"])
86
87                    # Remove migration record
88                    await conn.execute(
89                        "DELETE FROM _migrations WHERE version = $1",
90                        version,
91                    )
92
93                    print(f"Rolled back migration: {version}")
94                    return True
95
96                except Exception as e:
97                    print(f"Rollback failed: {e}")
98                    raise
99
100    async def rollback_to_version(self, target_version: str) -> bool:
101        """Rollback all migrations after target version."""
102        async with self._pool.acquire() as conn:
103            # Migrations after target, newest first; assumes versions
104            # sort lexicographically (e.g. zero-padded or timestamped)
104            rows = await conn.fetch(
105                """
106                SELECT version, down_sql FROM _migrations
107                WHERE version > $1
108                ORDER BY version DESC
109                """,
110                target_version,
111            )
112
113            for row in rows:
114                success = await self.rollback_migration(row["version"])
115                if not success:
116                    return False
117
118            return True
119
120    async def get_current_version(self) -> Optional[str]:
121        """Get current migration version."""
122        async with self._pool.acquire() as conn:
123            row = await conn.fetchrow(
124                "SELECT version FROM _migrations ORDER BY version DESC LIMIT 1"
125            )
126            return row["version"] if row else None
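One subtlety: `rollback_to_version` and `get_current_version` both order migrations by comparing version strings in SQL, so version identifiers must sort lexicographically. Timestamp-style or zero-padded versions do; bare integers do not, as this quick check shows:

```python
# Timestamp-style versions sort correctly as strings.
good = ["20240101_001", "20240102_001", "20240110_003"]
assert sorted(good) == good

# Bare integer versions do not: "10" sorts before "2" as a string.
bad = ["1", "2", "10"]
assert sorted(bad) == ["1", "10", "2"]
assert max(bad) == "2"  # string max, not numeric max
```

Picking a sortable version scheme up front (e.g. `YYYYMMDD_NNN`) avoids having to renumber migrations later.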

Summary

This section covered comprehensive deployment strategies for AI agent systems:

  • Container-Based Deployment: Multi-stage Dockerfiles for optimized agent images with security best practices
  • Kubernetes Orchestration: Deployment manifests, autoscaling, and resource management for agent workloads
  • Deployment Patterns: Blue-green and canary strategies for safe, gradual rollouts with automated analysis
  • CI/CD Pipelines: Automated testing and deployment workflows using GitHub Actions
  • Environment Management: Configuration management across development, staging, and production
  • Feature Flags: Gradual rollout of agent capabilities with user targeting and percentage-based strategies
  • Rollback Strategies: Automated rollback with health monitoring and database migration support

Production Readiness: A well-designed deployment pipeline enables confidence in releases. When deployments are automated, tested, and reversible, teams can ship faster and more safely.

This concludes Chapter 21 on Production Deployment. The combination of containerization, orchestration, progressive deployment patterns, and robust rollback mechanisms creates a foundation for reliably operating AI agent systems at scale.