Chapter 21

API Design for Agent Services

Production Deployment

Introduction

Well-designed APIs are crucial for exposing agent capabilities to clients. Agent APIs have unique requirements including support for streaming responses, long-running operations, and complex input/output schemas. This section covers best practices for designing APIs that are intuitive, performant, and maintainable.

Learning Objectives: By the end of this section, you will understand how to design RESTful APIs for agent services, implement streaming for real-time token delivery, handle long-running async operations, manage API versioning, and create comprehensive documentation.

Agent APIs differ from traditional APIs in several ways. Responses may stream incrementally, operations can take minutes to complete, and clients often need progress updates. Good API design accommodates these requirements while remaining consistent with REST principles.


REST API Design

RESTful API design provides a consistent, predictable interface for agent operations. Following REST conventions makes APIs easier to learn and integrate with existing systems.

Agent API Structure

🐍python
from datetime import datetime, timezone
from enum import Enum
from typing import Any, Optional
import uuid

from fastapi import FastAPI, HTTPException, Request
from fastapi.responses import JSONResponse
from pydantic import BaseModel, ConfigDict, Field


# Models
class TaskStatus(str, Enum):
    """Status of an agent task."""
    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
    CANCELLED = "cancelled"


class AgentRequest(BaseModel):
    """Request to execute an agent task."""
    task: str = Field(..., description="The task to perform")
    context: Optional[dict[str, Any]] = Field(
        default=None,
        description="Additional context for the task"
    )
    session_id: Optional[str] = Field(
        default=None,
        description="Session ID for conversation continuity"
    )
    max_tokens: Optional[int] = Field(
        default=4096,
        description="Maximum tokens in response"
    )
    stream: bool = Field(
        default=False,
        description="Whether to stream the response"
    )

    # Pydantic v2 style; replaces the v1-era inner `class Config`
    model_config = ConfigDict(
        json_schema_extra={
            "example": {
                "task": "Summarize the key points from this document",
                "context": {"document_url": "https://example.com/doc.pdf"},
                "session_id": "sess_123",
                "max_tokens": 1000,
                "stream": False
            }
        }
    )


class AgentResponse(BaseModel):
    """Response from an agent task."""
    task_id: str = Field(..., description="Unique task identifier")
    status: TaskStatus = Field(..., description="Current task status")
    result: Optional[str] = Field(
        default=None,
        description="Task result when completed"
    )
    metadata: dict[str, Any] = Field(
        default_factory=dict,
        description="Additional metadata"
    )
    created_at: datetime = Field(..., description="When task was created")
    completed_at: Optional[datetime] = Field(
        default=None,
        description="When task completed"
    )
    usage: Optional[dict[str, int]] = Field(
        default=None,
        description="Token usage statistics"
    )


class ErrorResponse(BaseModel):
    """Standard error response."""
    error: str = Field(..., description="Error type")
    message: str = Field(..., description="Human-readable error message")
    details: Optional[dict[str, Any]] = Field(
        default=None,
        description="Additional error details"
    )
    request_id: str = Field(..., description="Request ID for debugging")


# API Implementation
app = FastAPI(
    title="Agent API",
    description="API for interacting with AI agents",
    version="1.0.0"
)


@app.exception_handler(Exception)
async def global_exception_handler(request: Request, exc: Exception):
    """Handle all unhandled exceptions."""
    # Reuse the caller's request ID when present so logs can be correlated
    request_id = request.headers.get("X-Request-ID", str(uuid.uuid4()))

    return JSONResponse(
        status_code=500,
        content=ErrorResponse(
            error="internal_error",
            message="An unexpected error occurred",
            request_id=request_id
        ).model_dump()
    )


@app.post(
    "/v1/agents/{agent_id}/tasks",
    response_model=AgentResponse,
    status_code=201,
    responses={
        201: {"description": "Task created successfully"},
        400: {"model": ErrorResponse, "description": "Invalid request"},
        404: {"model": ErrorResponse, "description": "Agent not found"},
        429: {"model": ErrorResponse, "description": "Rate limit exceeded"}
    }
)
async def create_task(
    agent_id: str,
    request: AgentRequest,
    http_request: Request
):
    """Create a new agent task."""
    request_id = http_request.headers.get("X-Request-ID", str(uuid.uuid4()))

    # Validate agent exists
    agent = await get_agent(agent_id)
    if not agent:
        raise HTTPException(
            status_code=404,
            detail=ErrorResponse(
                error="not_found",
                message=f"Agent {agent_id} not found",
                request_id=request_id
            ).model_dump()
        )

    # Create task
    task_id = str(uuid.uuid4())
    await create_agent_task(
        agent_id=agent_id,
        task_id=task_id,
        request=request
    )

    return AgentResponse(
        task_id=task_id,
        status=TaskStatus.PENDING,
        # Timezone-aware UTC; datetime.utcnow() is deprecated since Python 3.12
        created_at=datetime.now(timezone.utc),
        metadata={"agent_id": agent_id, "request_id": request_id}
    )


@app.get(
    "/v1/agents/{agent_id}/tasks/{task_id}",
    response_model=AgentResponse
)
async def get_task(agent_id: str, task_id: str):
    """Get task status and result."""
    task = await get_agent_task(agent_id, task_id)

    if not task:
        raise HTTPException(status_code=404, detail="Task not found")

    return task


@app.delete("/v1/agents/{agent_id}/tasks/{task_id}")
async def cancel_task(agent_id: str, task_id: str):
    """Cancel a running task."""
    success = await cancel_agent_task(agent_id, task_id)

    if not success:
        raise HTTPException(status_code=404, detail="Task not found")

    return {"status": "cancelled", "task_id": task_id}


@app.get("/v1/agents/{agent_id}/tasks")
async def list_tasks(
    agent_id: str,
    status: Optional[TaskStatus] = None,
    limit: int = 20,
    offset: int = 0
):
    """List tasks for an agent."""
    tasks = await list_agent_tasks(
        agent_id=agent_id,
        status=status,
        limit=limit,
        offset=offset
    )

    return {
        "tasks": tasks,
        # Size of this page only; a real store would return the full match count
        "total": len(tasks),
        "limit": limit,
        "offset": offset
    }


# Placeholder implementations (stand-ins for a real agent and task store)
async def get_agent(agent_id: str):
    return {"id": agent_id}

async def create_agent_task(agent_id: str, task_id: str, request: AgentRequest):
    return {"task_id": task_id}

async def get_agent_task(agent_id: str, task_id: str):
    return None

async def cancel_agent_task(agent_id: str, task_id: str):
    return True

async def list_agent_tasks(agent_id: str, status, limit: int, offset: int):
    return []

The REST API follows standard conventions: resources are nouns, HTTP methods indicate actions, and responses use consistent schemas. Error responses include request IDs for debugging and follow a predictable structure.

| Endpoint | Method | Purpose |
|---|---|---|
| /v1/agents/{id}/tasks | POST | Create new task |
| /v1/agents/{id}/tasks/{task_id} | GET | Get task status |
| /v1/agents/{id}/tasks/{task_id} | DELETE | Cancel task |
| /v1/agents/{id}/tasks | GET | List tasks |
| /v1/agents/{id}/sessions | POST | Create session |
| /v1/agents/{id}/sessions/{session_id} | GET | Get session |
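
The list endpoint takes `limit` and `offset` parameters. As a minimal sketch of how a consistent page envelope could be built, the helper below slices an in-memory list and reports the size of the full collection separately from the page. The `paginate` function and the `next_offset` field are illustrative assumptions, not part of the API above.

```python
from typing import Any, Optional


def paginate(items: list[Any], limit: int = 20, offset: int = 0) -> dict[str, Any]:
    """Return one page of `items` plus the metadata a client needs to continue."""
    if limit < 1 or offset < 0:
        raise ValueError("limit must be >= 1 and offset must be >= 0")

    page = items[offset:offset + limit]
    end = offset + limit
    # Hypothetical convenience field: None once the last page has been served
    next_offset: Optional[int] = end if end < len(items) else None

    return {
        "tasks": page,
        "total": len(items),  # size of the full collection, not of this page
        "limit": limit,
        "offset": offset,
        "next_offset": next_offset,
    }


all_tasks = [f"task-{i}" for i in range(45)]
first = paginate(all_tasks, limit=20, offset=0)
last = paginate(all_tasks, limit=20, offset=40)
print(first["total"], len(first["tasks"]), first["next_offset"])  # 45 20 20
print(len(last["tasks"]), last["next_offset"])                    # 5 None
```

Reporting `total` for the whole collection lets a client compute the number of pages without walking them all.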

Streaming APIs

Streaming enables real-time token delivery as the agent generates responses. This reduces perceived latency and lets clients monitor progress during long operations.

Server-Sent Events Streaming

🐍python
import asyncio
import json
import uuid
from typing import AsyncIterator

from fastapi import WebSocket, WebSocketDisconnect
from fastapi.responses import StreamingResponse
from pydantic import BaseModel

# `app` and `AgentRequest` come from the REST listing above


class StreamEvent(BaseModel):
    """Event in a streaming response."""
    event: str
    data: dict


class StreamChunk(BaseModel):
    """A chunk of streamed content."""
    content: str
    finish_reason: str | None = None
    usage: dict[str, int] | None = None


async def format_sse(event: StreamEvent) -> str:
    """Format event for SSE."""
    data = json.dumps(event.data)
    # A blank line terminates each SSE event
    return f"event: {event.event}\ndata: {data}\n\n"


async def stream_agent_response(
    agent_id: str,
    task_id: str,
    request: AgentRequest
) -> AsyncIterator[str]:
    """Stream agent response as SSE events."""

    # Send start event
    yield await format_sse(StreamEvent(
        event="start",
        data={"task_id": task_id, "agent_id": agent_id}
    ))

    # Stream content chunks
    content_buffer = ""
    async for chunk in generate_agent_response(agent_id, request):
        content_buffer += chunk.content

        yield await format_sse(StreamEvent(
            event="content",
            data={
                "content": chunk.content,
                "finish_reason": chunk.finish_reason
            }
        ))

        if chunk.finish_reason:
            break

    # Send completion event
    yield await format_sse(StreamEvent(
        event="done",
        data={
            "task_id": task_id,
            "content": content_buffer,
            # Word count is only a rough stand-in for a real token count
            "usage": {"total_tokens": len(content_buffer.split())}
        }
    ))


@app.post("/v1/agents/{agent_id}/tasks/stream")
async def stream_task(
    agent_id: str,
    request: AgentRequest
):
    """Create streaming task."""
    task_id = str(uuid.uuid4())

    return StreamingResponse(
        stream_agent_response(agent_id, task_id, request),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Task-ID": task_id
        }
    )


async def generate_agent_response(
    agent_id: str,
    request: AgentRequest
) -> AsyncIterator[StreamChunk]:
    """Generate agent response chunks."""
    # Simulate streaming response
    words = request.task.split()

    for i, word in enumerate(words):
        await asyncio.sleep(0.1)  # Simulate generation delay

        is_last = i == len(words) - 1
        yield StreamChunk(
            content=word + " ",
            finish_reason="stop" if is_last else None
        )


# WebSocket streaming alternative
@app.websocket("/v1/agents/{agent_id}/ws")
async def websocket_stream(
    websocket: WebSocket,
    agent_id: str
):
    """WebSocket endpoint for bidirectional streaming."""
    await websocket.accept()

    try:
        while True:
            # Receive request
            data = await websocket.receive_json()
            request = AgentRequest(**data)

            # Stream response
            async for chunk in generate_agent_response(agent_id, request):
                await websocket.send_json({
                    "type": "content",
                    "content": chunk.content,
                    "finish_reason": chunk.finish_reason
                })

                if chunk.finish_reason:
                    break

            await websocket.send_json({"type": "done"})

    except WebSocketDisconnect:
        # Client closed the connection; nothing to clean up in this sketch
        pass


# Client-side usage example
CLIENT_EXAMPLE = '''
import json

import httpx

async def consume_stream():
    async with httpx.AsyncClient() as client:
        async with client.stream(
            "POST",
            "http://api.example.com/v1/agents/agent-1/tasks/stream",
            json={"task": "Explain quantum computing"}
        ) as response:
            event = None
            async for line in response.aiter_lines():
                if line.startswith("event: "):
                    event = line[7:]
                elif line.startswith("data: ") and event == "content":
                    # Skip "start" and "done"; "done" repeats the full text
                    data = json.loads(line[6:])
                    print(data["content"], end="", flush=True)
'''

Server-Sent Events (SSE) provide a simple, one-way streaming protocol over plain HTTP, from server to client. WebSockets offer bidirectional communication for more complex interactions, such as sending follow-up requests on the same connection. Both approaches deliver tokens in real time as they are generated.
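
Because the stream interleaves `start`, `content`, and `done` events, and the `done` event repeats the full text, a client needs to track the event name rather than read every `data:` line the same way. The following is a minimal, standard-library sketch of such a parser. The `parse_sse` name is an assumption for illustration, and it handles only the `event:` and `data:` fields used in this section.

```python
import json
from typing import Iterable, Iterator


def parse_sse(lines: Iterable[str]) -> Iterator[tuple[str, dict]]:
    """Yield (event_name, decoded_data) for each event in an SSE line stream."""
    event_name = "message"  # SSE default when no "event:" field is sent
    data_lines: list[str] = []

    for raw in lines:
        line = raw.rstrip("\r\n")
        if line == "":
            # A blank line terminates the current event
            if data_lines:
                yield event_name, json.loads("\n".join(data_lines))
            event_name, data_lines = "message", []
        elif line.startswith(":"):
            continue  # comment line, often used as a keep-alive
        elif line.startswith("event:"):
            event_name = line[len("event:"):].strip()
        elif line.startswith("data:"):
            value = line[len("data:"):]
            # The format allows one optional space after the colon
            data_lines.append(value[1:] if value.startswith(" ") else value)


wire = (
    'event: start\ndata: {"task_id": "t1"}\n\n'
    'event: content\ndata: {"content": "Hello "}\n\n'
    'event: content\ndata: {"content": "world"}\n\n'
    'event: done\ndata: {"content": "Hello world"}\n\n'
)
events = list(parse_sse(wire.splitlines()))
text = "".join(data["content"] for name, data in events if name == "content")
print(text)  # Hello world
```

Filtering on the event name keeps the final `done` payload from being printed as if it were another chunk.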


Async Operations

Long-running agent tasks require async processing patterns. Clients submit requests and poll for results, or receive webhook notifications when tasks complete.
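
On the client side, the polling half of this pattern might look like the sketch below. It is transport-agnostic: `fetch_status` is a hypothetical callable that returns the task's status dictionary (for example by calling a status endpoint), and the backoff values are illustrative assumptions rather than recommendations from this section.

```python
import time
from typing import Callable

TERMINAL_STATES = {"completed", "failed", "cancelled"}


def poll_until_done(
    fetch_status: Callable[[], dict],
    initial_delay: float = 1.0,
    max_delay: float = 30.0,
    timeout: float = 300.0,
    sleep: Callable[[float], None] = time.sleep,
) -> dict:
    """Call fetch_status() with exponential backoff until a terminal status."""
    delay = initial_delay
    waited = 0.0
    while True:
        status = fetch_status()
        if status["status"] in TERMINAL_STATES:
            return status
        if waited >= timeout:
            raise TimeoutError(f"task still {status['status']} after {waited:.0f}s")
        sleep(delay)
        waited += delay
        delay = min(delay * 2, max_delay)  # back off, but cap the interval


# Demo with a fake status source and a recording "sleep" so it runs instantly
responses = iter([
    {"status": "pending"},
    {"status": "running"},
    {"status": "completed", "result": "ok"},
])
delays: list[float] = []
final = poll_until_done(lambda: next(responses), sleep=delays.append)
print(final["status"], delays)  # completed [1.0, 2.0]
```

Injecting `sleep` keeps the helper easy to test, and the capped backoff avoids hammering the server while a slow task runs.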

Async Task Processing

🐍python
import asyncio
import hashlib
import hmac
import logging
import time
import uuid
from typing import Optional

import httpx
from fastapi import BackgroundTasks, HTTPException
from fastapi.responses import JSONResponse
from pydantic import BaseModel, HttpUrl

# `app` is the FastAPI instance from the REST listing above

logger = logging.getLogger(__name__)


class AsyncTaskRequest(BaseModel):
    """Request for async task processing."""
    task: str
    callback_url: Optional[HttpUrl] = None
    webhook_secret: Optional[str] = None
    priority: int = 0


class AsyncTaskResponse(BaseModel):
    """Response when task is queued."""
    task_id: str
    status: str
    poll_url: str
    estimated_completion_seconds: Optional[int] = None


class TaskResult(BaseModel):
    """Result of completed task."""
    task_id: str
    status: str
    result: Optional[str] = None
    error: Optional[str] = None
    duration_ms: int
    usage: dict[str, int]


class AsyncTaskManager:
    """Manages async task lifecycle."""

    def __init__(self):
        # In-memory state: lost on restart and not shared across workers
        self._tasks: dict[str, dict] = {}
        self._results: dict[str, TaskResult] = {}

    async def submit(
        self,
        task_id: str,
        request: AsyncTaskRequest,
        background_tasks: BackgroundTasks
    ) -> AsyncTaskResponse:
        """Submit task for async processing."""
        self._tasks[task_id] = {
            "request": request,
            "status": "pending",
            "created_at": time.time()
        }

        # Queue background processing
        background_tasks.add_task(
            self._process_task,
            task_id,
            request
        )

        return AsyncTaskResponse(
            task_id=task_id,
            status="pending",
            poll_url=f"/v1/tasks/{task_id}",
            estimated_completion_seconds=30
        )

    async def _process_task(
        self,
        task_id: str,
        request: AsyncTaskRequest
    ) -> None:
        """Process task in background."""
        # perf_counter is monotonic, so durations survive clock adjustments
        start_time = time.perf_counter()
        self._tasks[task_id]["status"] = "running"

        try:
            # Execute agent task
            result = await self._execute_agent_task(request)

            duration_ms = int((time.perf_counter() - start_time) * 1000)

            self._results[task_id] = TaskResult(
                task_id=task_id,
                status="completed",
                result=result,
                duration_ms=duration_ms,
                # Word count is only a rough stand-in for a real token count
                usage={"total_tokens": len(result.split())}
            )
            self._tasks[task_id]["status"] = "completed"

            # Send webhook if configured
            if request.callback_url:
                await self._send_webhook(
                    request.callback_url,
                    request.webhook_secret,
                    self._results[task_id]
                )

        except Exception as e:
            duration_ms = int((time.perf_counter() - start_time) * 1000)

            self._results[task_id] = TaskResult(
                task_id=task_id,
                status="failed",
                error=str(e),
                duration_ms=duration_ms,
                usage={}
            )
            self._tasks[task_id]["status"] = "failed"

    async def _execute_agent_task(self, request: AsyncTaskRequest) -> str:
        """Execute the agent task."""
        # Simulate processing
        await asyncio.sleep(2)
        return f"Completed task: {request.task}"

    async def _send_webhook(
        self,
        url: HttpUrl,
        secret: Optional[str],
        result: TaskResult
    ) -> None:
        """Send webhook notification."""
        payload = result.model_dump_json()
        headers = {"Content-Type": "application/json"}

        if secret:
            # HMAC-SHA256 over the exact bytes sent, hex-encoded
            signature = hmac.new(
                secret.encode(),
                payload.encode(),
                hashlib.sha256
            ).hexdigest()
            headers["X-Webhook-Signature"] = signature

        try:
            async with httpx.AsyncClient() as client:
                await client.post(
                    str(url),
                    content=payload,
                    headers=headers,
                    timeout=10.0
                )
        except Exception:
            # Log webhook failure but don't fail the task
            logger.warning("Webhook delivery to %s failed", url, exc_info=True)

    def get_status(self, task_id: str) -> Optional[dict]:
        """Get task status."""
        if task_id in self._results:
            return self._results[task_id].model_dump()

        if task_id in self._tasks:
            return {
                "task_id": task_id,
                "status": self._tasks[task_id]["status"]
            }

        return None


# API endpoints
task_manager = AsyncTaskManager()


@app.post("/v1/tasks/async", response_model=AsyncTaskResponse)
async def create_async_task(
    request: AsyncTaskRequest,
    background_tasks: BackgroundTasks
):
    """Create an async task."""
    task_id = str(uuid.uuid4())

    return await task_manager.submit(
        task_id,
        request,
        background_tasks
    )


@app.get("/v1/tasks/{task_id}")
async def get_task_status(task_id: str):
    """Poll for task status."""
    status = task_manager.get_status(task_id)

    if not status:
        raise HTTPException(status_code=404, detail="Task not found")

    # Include Retry-After header if still processing
    if status["status"] in ("pending", "running"):
        return JSONResponse(
            content=status,
            headers={"Retry-After": "5"}
        )

    return status


@app.post("/v1/tasks/{task_id}/wait")
async def wait_for_task(
    task_id: str,
    timeout_seconds: int = 30
):
    """Long-poll for task completion."""
    loop = asyncio.get_running_loop()
    deadline = loop.time() + timeout_seconds

    while loop.time() < deadline:
        status = task_manager.get_status(task_id)

        if not status:
            raise HTTPException(status_code=404, detail="Task not found")

        if status["status"] in ("completed", "failed"):
            return status

        await asyncio.sleep(1)

    return {
        "task_id": task_id,
        "status": "timeout",
        "message": "Task still processing"
    }

The async pattern supports both polling and webhooks. Long-polling provides a middle ground, allowing clients to wait for completion without constant polling. Webhooks enable push notifications for server-to-server integrations.
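
A webhook is only trustworthy if the receiver checks the signature. Assuming the sender computes a hex-encoded HMAC-SHA256 over the raw request body and places it in the `X-Webhook-Signature` header, as the listing above does, the receiving side could be sketched like this. The function names are illustrative.

```python
import hashlib
import hmac


def sign_payload(secret: str, payload: bytes) -> str:
    """Hex-encoded HMAC-SHA256 of the raw request body."""
    return hmac.new(secret.encode(), payload, hashlib.sha256).hexdigest()


def verify_webhook(secret: str, payload: bytes, signature_header: str) -> bool:
    """Return True only if the header matches the signature we compute."""
    expected = sign_payload(secret, payload)
    # compare_digest runs in constant time, which avoids leaking how many
    # leading characters of a forged signature were correct
    return hmac.compare_digest(expected.encode(), signature_header.encode())


body = b'{"task_id": "t1", "status": "completed"}'
signature = sign_payload("s3cret", body)

print(verify_webhook("s3cret", body, signature))         # True
print(verify_webhook("s3cret", body + b" ", signature))  # False
print(verify_webhook("other", body, signature))          # False
```

Verification must run on the exact bytes received, before any JSON parsing or re-serialization, because even whitespace changes alter the digest.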


API Versioning

API versioning ensures backward compatibility as the API evolves. Multiple versioning strategies exist, each with trade-offs for maintenance and usability.

Version Management

🐍python
import functools
from datetime import date, datetime, time, timezone
from email.utils import format_datetime
from enum import Enum
from typing import Callable

from fastapi import APIRouter, Request, Response
from fastapi.encoders import jsonable_encoder
from fastapi.responses import JSONResponse
from fastapi.routing import APIRoute
from pydantic import BaseModel

# `app` and `AgentRequest` come from the REST listing above


class APIVersion(str, Enum):
    """Supported API versions."""
    V1 = "v1"
    V2 = "v2"


class VersionedRoute(APIRoute):
    """Custom route with version detection."""

    def get_route_handler(self) -> Callable:
        original_handler = super().get_route_handler()

        async def versioned_handler(request: Request) -> Response:
            # Extract version from path or header
            version = self._extract_version(request)
            request.state.api_version = version
            return await original_handler(request)

        return versioned_handler

    def _extract_version(self, request: Request) -> str:
        # Check path prefix
        path = request.url.path
        for version in APIVersion:
            if path.startswith(f"/{version.value}/"):
                return version.value

        # Check header
        header_version = request.headers.get("X-API-Version", "v1")
        return header_version


def get_api_version(request: Request) -> str:
    """Dependency to get current API version."""
    return getattr(request.state, "api_version", "v1")


# Version-specific routers; route_class wires in the version detection above
v1_router = APIRouter(prefix="/v1", tags=["v1"], route_class=VersionedRoute)
v2_router = APIRouter(prefix="/v2", tags=["v2"], route_class=VersionedRoute)


# V1 endpoints
@v1_router.post("/agents/{agent_id}/complete")
async def v1_complete(agent_id: str, request: AgentRequest):
    """V1 completion endpoint."""
    return {"version": "v1", "result": "V1 response"}


# V2 endpoints with breaking changes
class V2AgentRequest(BaseModel):
    """V2 request with different schema."""
    messages: list[dict[str, str]]
    model: str = "default"
    max_tokens: int = 4096
    temperature: float = 0.7


@v2_router.post("/agents/{agent_id}/complete")
async def v2_complete(agent_id: str, request: V2AgentRequest):
    """V2 completion endpoint with messages format."""
    return {"version": "v2", "result": "V2 response"}


# Version adapter for backward compatibility
class VersionAdapter:
    """Adapts requests between API versions."""

    @staticmethod
    def v1_to_v2(v1_request: AgentRequest) -> V2AgentRequest:
        """Convert V1 request to V2 format."""
        return V2AgentRequest(
            messages=[{"role": "user", "content": v1_request.task}],
            max_tokens=v1_request.max_tokens or 4096
        )

    @staticmethod
    def v2_to_v1_response(v2_response: dict) -> dict:
        """Convert V2 response to V1 format."""
        # `or [{}]` also covers an empty "choices" list, not just a missing key
        first_choice = (v2_response.get("choices") or [{}])[0]
        return {
            "result": first_choice.get("message", {}).get("content", ""),
            "usage": v2_response.get("usage", {})
        }


# Deprecation handling
def deprecation_warning(
    version: str,
    sunset_date: date,
    replacement: str
):
    """Add deprecation headers to response."""
    # The Sunset header (RFC 8594) carries an HTTP-date, not an ISO 8601 date
    sunset_http_date = format_datetime(
        datetime.combine(sunset_date, time.min, tzinfo=timezone.utc),
        usegmt=True
    )

    def decorator(func):
        @functools.wraps(func)  # keep the signature FastAPI inspects
        async def wrapper(*args, **kwargs):
            response = await func(*args, **kwargs)

            # Endpoints usually return plain data; wrap it so headers can be set
            if not isinstance(response, Response):
                response = JSONResponse(content=jsonable_encoder(response))

            response.headers["Deprecation"] = "true"
            response.headers["Sunset"] = sunset_http_date
            response.headers["Link"] = f'<{replacement}>; rel="successor-version"'
            return response

        return wrapper
    return decorator


@v1_router.post("/agents/{agent_id}/chat")
@deprecation_warning(
    version="v1",
    sunset_date=date(2025, 6, 1),
    replacement="/v2/agents/{agent_id}/complete"
)
async def v1_chat_deprecated(agent_id: str, request: AgentRequest):
    """Deprecated V1 chat endpoint."""
    return {"version": "v1", "deprecated": True}


# Mount routers
app.include_router(v1_router)
app.include_router(v2_router)

URL path versioning provides clear version identification. Version adapters enable backward compatibility by translating between formats. Deprecation headers inform clients about upcoming changes.

| Strategy | Example | Pros | Cons |
|---|---|---|---|
| URL Path | /v1/agents | Clear, cacheable | URL changes |
| Header | X-API-Version: v2 | Clean URLs | Less visible |
| Query Param | ?version=2 | Easy to test | Clutters URLs |
| Content Type | application/vnd.api.v2+json | RESTful | Complex |
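
To make the four strategies concrete, the sketch below resolves a version from each of them using only the standard library. The precedence order (path, then header, then query parameter, then media type), the `resolve_version` name, and the supported version list are assumptions for illustration. A real service would normally pick one strategy and apply it consistently.

```python
import re
from typing import Mapping

SUPPORTED_VERSIONS = ("v1", "v2")
DEFAULT_VERSION = "v1"


def resolve_version(
    path: str,
    headers: Mapping[str, str],
    query: Mapping[str, str],
) -> str:
    """Pick the API version from path, header, query param, or media type."""
    # HTTP header names are case-insensitive, so normalize before lookup
    lowered = {name.lower(): value for name, value in headers.items()}

    path_match = re.match(r"^/(v\d+)(?:/|$)", path)
    media_match = re.search(
        r"application/vnd\.api\.(v\d+)\+json", lowered.get("accept", "")
    )

    candidates = [
        path_match.group(1) if path_match else None,    # /v2/agents
        lowered.get("x-api-version"),                   # X-API-Version: 2
        query.get("version"),                           # ?version=2
        media_match.group(1) if media_match else None,  # vnd.api.v2+json
    ]

    for raw in candidates:
        if raw is None:
            continue
        version = raw if raw.startswith("v") else f"v{raw}"
        if version not in SUPPORTED_VERSIONS:
            raise ValueError(f"unsupported API version: {raw}")
        return version

    return DEFAULT_VERSION


print(resolve_version("/v2/agents", {}, {}))                   # v2
print(resolve_version("/agents", {"X-API-Version": "2"}, {}))  # v2
print(resolve_version("/agents", {}, {}))                      # v1
```

Rejecting unknown versions explicitly, rather than silently falling back to the default, makes client misconfiguration visible early.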

Documentation and SDKs

Comprehensive documentation and SDKs improve developer experience and reduce integration time. OpenAPI specifications enable automatic documentation and client generation.

OpenAPI Documentation

🐍python
from fastapi.openapi.utils import get_openapi

# `app` is the FastAPI instance from the REST listing above


def custom_openapi():
    """Generate custom OpenAPI schema."""
    # Build once, then serve the cached schema
    if app.openapi_schema:
        return app.openapi_schema

    openapi_schema = get_openapi(
        title="Agent API",
        version="1.0.0",
        description="""
## Agent API

This API provides access to AI agent capabilities.

### Authentication

All requests require an API key in the Authorization header:

```
Authorization: Bearer your-api-key
```

### Rate Limits

- 100 requests per minute for standard tier
- 1000 requests per minute for enterprise tier

### Streaming

For real-time responses, use the streaming endpoints.
Responses are delivered as Server-Sent Events.

### Error Handling

All errors follow a consistent format with error codes
and human-readable messages.
        """,
        routes=app.routes,
    )

    # Add security schemes ("components" may be absent if no models are registered)
    openapi_schema.setdefault("components", {})["securitySchemes"] = {
        "bearerAuth": {
            "type": "http",
            "scheme": "bearer",
            "bearerFormat": "API Key"
        }
    }

    # Add global security requirement
    openapi_schema["security"] = [{"bearerAuth": []}]

    # Add servers
    openapi_schema["servers"] = [
        {"url": "https://api.example.com", "description": "Production"},
        {"url": "https://staging-api.example.com", "description": "Staging"}
    ]

    # Add examples
    for path in openapi_schema["paths"].values():
        for operation in path.values():
            if isinstance(operation, dict) and "requestBody" in operation:
                add_examples(operation)

    app.openapi_schema = openapi_schema
    return app.openapi_schema


def add_examples(operation: dict) -> None:
    """Add examples to operation."""
    # Note: the same task-style examples are attached to every JSON request body
    content = operation.get("requestBody", {}).get("content", {})

    if "application/json" in content:
        content["application/json"]["examples"] = {
            "simple_task": {
                "summary": "Simple task",
                "value": {
                    "task": "What is the capital of France?",
                    "max_tokens": 100
                }
            },
            "with_context": {
                "summary": "Task with context",
                "value": {
                    "task": "Summarize this document",
                    "context": {"document_id": "doc-123"},
                    "max_tokens": 500
                }
            }
        }


app.openapi = custom_openapi


# SDK generation helper
SDK_TEMPLATE = '''
# Generated Python SDK

import json
from dataclasses import dataclass
from typing import AsyncIterator

import httpx


@dataclass
class AgentSDK:
    """Python SDK for Agent API."""

    base_url: str
    api_key: str
    timeout: float = 30.0

    @property
    def _headers(self) -> dict:
        return {
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json"
        }

    async def complete(
        self,
        agent_id: str,
        task: str,
        max_tokens: int = 4096,
        stream: bool = False
    ) -> dict:
        """Complete a task with an agent."""
        async with httpx.AsyncClient() as client:
            response = await client.post(
                f"{self.base_url}/v1/agents/{agent_id}/tasks",
                headers=self._headers,
                json={
                    "task": task,
                    "max_tokens": max_tokens,
                    "stream": stream
                },
                timeout=self.timeout
            )
            response.raise_for_status()
            return response.json()

    async def stream_complete(
        self,
        agent_id: str,
        task: str
    ) -> AsyncIterator[str]:
        """Stream completion from an agent."""
        async with httpx.AsyncClient() as client:
            async with client.stream(
                "POST",
                f"{self.base_url}/v1/agents/{agent_id}/tasks/stream",
                headers=self._headers,
                json={"task": task}
            ) as response:
                event = None
                async for line in response.aiter_lines():
                    if line.startswith("event: "):
                        event = line[7:]
                    elif line.startswith("data: ") and event == "content":
                        # Only "content" events carry new text;
                        # "done" repeats the full response
                        data = json.loads(line[6:])
                        if content := data.get("content"):
                            yield content


# Usage
async def main():
    sdk = AgentSDK(
        base_url="https://api.example.com",
        api_key="your-api-key"
    )

    # Non-streaming completion
    result = await sdk.complete("agent-1", "What is Python?")
    print(result)

    # Streaming completion
    async for chunk in sdk.stream_complete("agent-1", "Explain AI"):
        print(chunk, end="", flush=True)
'''


@app.get("/sdk/python")
async def get_python_sdk():
    """Download Python SDK."""
    return {"sdk": SDK_TEMPLATE}

OpenAPI documentation is automatically generated from route definitions and enhanced with custom descriptions, examples, and security schemes. SDKs can be generated from the OpenAPI spec for multiple languages.
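
To show why a machine-readable spec makes SDK generation possible, the sketch below walks the `paths` object of an OpenAPI document and derives one method name per operation. It is a simplified illustration, not a real generator: the naming rule in `method_name` is an assumption, and the tiny `spec` dictionary is hand-written rather than produced by the application.

```python
import re

HTTP_METHODS = {"get", "post", "put", "patch", "delete"}


def method_name(http_method: str, path: str) -> str:
    """Fallback name built from the verb and the static path segments."""
    parts = [
        segment
        for segment in path.strip("/").split("/")
        # Drop path parameters ({agent_id}) and version prefixes (v1)
        if segment
        and not segment.startswith("{")
        and not re.fullmatch(r"v\d+", segment)
    ]
    return "_".join([http_method.lower(), *parts])


def list_operations(spec: dict) -> list[dict]:
    """Flatten an OpenAPI `paths` object into one record per operation."""
    operations = []
    for path, path_item in spec.get("paths", {}).items():
        for method, operation in path_item.items():
            if method not in HTTP_METHODS:
                continue  # skip non-operation keys such as "parameters"
            operations.append({
                "name": operation.get("operationId") or method_name(method, path),
                "method": method.upper(),
                "path": path,
            })
    return operations


spec = {
    "openapi": "3.1.0",
    "paths": {
        "/v1/agents/{agent_id}/tasks": {
            "post": {"operationId": "create_task"},
            "get": {},
        },
        "/v1/agents/{agent_id}/tasks/{task_id}": {"delete": {}},
    },
}
operations = list_operations(spec)
for op in operations:
    print(op["name"], op["method"], op["path"])
```

Setting an explicit `operationId` on each route gives generated clients stable, readable method names instead of ones inferred from the URL.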


Summary

Well-designed APIs make agent capabilities accessible and easy to integrate. Following REST conventions, supporting streaming, handling async operations, versioning properly, and providing good documentation creates a developer-friendly experience.

Key Takeaways

  • REST Conventions - Use consistent resource naming, HTTP methods, and response formats
  • Streaming Support - Implement SSE or WebSockets for real-time token delivery
  • Async Operations - Support polling and webhooks for long-running tasks
  • API Versioning - Use URL path versioning with deprecation headers
  • Documentation - Provide OpenAPI specs with examples and generated SDKs

Next Steps: The final section covers deployment strategies for taking agent systems to production, including blue-green deployments, canary releases, and infrastructure as code.