Introduction
Well-designed APIs are crucial for exposing agent capabilities to clients. Agent APIs have unique requirements including support for streaming responses, long-running operations, and complex input/output schemas. This section covers best practices for designing APIs that are intuitive, performant, and maintainable.
Learning Objectives: By the end of this section, you will understand how to design RESTful APIs for agent services, implement streaming for real-time token delivery, handle long-running async operations, manage API versioning, and create comprehensive documentation.
Agent APIs differ from traditional APIs in several ways. Responses may stream incrementally, operations can take minutes to complete, and clients often need progress updates. Good API design accommodates these requirements while remaining consistent with REST principles.
REST API Design
RESTful API design provides a consistent, predictable interface for agent operations. Following REST conventions makes APIs easier to learn and integrate with existing systems.
Agent API Structure
1from fastapi import FastAPI, HTTPException, Depends, Request
2from fastapi.responses import JSONResponse
3from pydantic import BaseModel, Field
4from typing import Any, Optional
5from datetime import datetime
6from enum import Enum
7import uuid
8
9
10# Models
11class TaskStatus(str, Enum):
12 """Status of an agent task."""
13 PENDING = "pending"
14 RUNNING = "running"
15 COMPLETED = "completed"
16 FAILED = "failed"
17 CANCELLED = "cancelled"
18
19
20class AgentRequest(BaseModel):
21 """Request to execute an agent task."""
22 task: str = Field(..., description="The task to perform")
23 context: Optional[dict[str, Any]] = Field(
24 default=None,
25 description="Additional context for the task"
26 )
27 session_id: Optional[str] = Field(
28 default=None,
29 description="Session ID for conversation continuity"
30 )
31 max_tokens: Optional[int] = Field(
32 default=4096,
33 description="Maximum tokens in response"
34 )
35 stream: bool = Field(
36 default=False,
37 description="Whether to stream the response"
38 )
39
40 class Config:
41 json_schema_extra = {
42 "example": {
43 "task": "Summarize the key points from this document",
44 "context": {"document_url": "https://example.com/doc.pdf"},
45 "session_id": "sess_123",
46 "max_tokens": 1000,
47 "stream": False
48 }
49 }
50
51
52class AgentResponse(BaseModel):
53 """Response from an agent task."""
54 task_id: str = Field(..., description="Unique task identifier")
55 status: TaskStatus = Field(..., description="Current task status")
56 result: Optional[str] = Field(
57 default=None,
58 description="Task result when completed"
59 )
60 metadata: dict[str, Any] = Field(
61 default_factory=dict,
62 description="Additional metadata"
63 )
64 created_at: datetime = Field(..., description="When task was created")
65 completed_at: Optional[datetime] = Field(
66 default=None,
67 description="When task completed"
68 )
69 usage: Optional[dict[str, int]] = Field(
70 default=None,
71 description="Token usage statistics"
72 )
73
74
75class ErrorResponse(BaseModel):
76 """Standard error response."""
77 error: str = Field(..., description="Error type")
78 message: str = Field(..., description="Human-readable error message")
79 details: Optional[dict[str, Any]] = Field(
80 default=None,
81 description="Additional error details"
82 )
83 request_id: str = Field(..., description="Request ID for debugging")
84
85
86# API Implementation
87app = FastAPI(
88 title="Agent API",
89 description="API for interacting with AI agents",
90 version="1.0.0"
91)
92
93
94@app.exception_handler(Exception)
95async def global_exception_handler(request: Request, exc: Exception):
96 """Handle all unhandled exceptions."""
97 request_id = request.headers.get("X-Request-ID", str(uuid.uuid4()))
98
99 return JSONResponse(
100 status_code=500,
101 content=ErrorResponse(
102 error="internal_error",
103 message="An unexpected error occurred",
104 request_id=request_id
105 ).model_dump()
106 )
107
108
109@app.post(
110 "/v1/agents/{agent_id}/tasks",
111 response_model=AgentResponse,
112 status_code=201,
113 responses={
114 201: {"description": "Task created successfully"},
115 400: {"model": ErrorResponse, "description": "Invalid request"},
116 404: {"model": ErrorResponse, "description": "Agent not found"},
117 429: {"model": ErrorResponse, "description": "Rate limit exceeded"}
118 }
119)
120async def create_task(
121 agent_id: str,
122 request: AgentRequest,
123 http_request: Request
124):
125 """Create a new agent task."""
126 request_id = http_request.headers.get("X-Request-ID", str(uuid.uuid4()))
127
128 # Validate agent exists
129 agent = await get_agent(agent_id)
130 if not agent:
131 raise HTTPException(
132 status_code=404,
133 detail=ErrorResponse(
134 error="not_found",
135 message=f"Agent {agent_id} not found",
136 request_id=request_id
137 ).model_dump()
138 )
139
140 # Create task
141 task_id = str(uuid.uuid4())
142 task = await create_agent_task(
143 agent_id=agent_id,
144 task_id=task_id,
145 request=request
146 )
147
148 return AgentResponse(
149 task_id=task_id,
150 status=TaskStatus.PENDING,
151 created_at=datetime.utcnow(),
152 metadata={"agent_id": agent_id, "request_id": request_id}
153 )
154
155
156@app.get(
157 "/v1/agents/{agent_id}/tasks/{task_id}",
158 response_model=AgentResponse
159)
160async def get_task(agent_id: str, task_id: str):
161 """Get task status and result."""
162 task = await get_agent_task(agent_id, task_id)
163
164 if not task:
165 raise HTTPException(status_code=404, detail="Task not found")
166
167 return task
168
169
170@app.delete("/v1/agents/{agent_id}/tasks/{task_id}")
171async def cancel_task(agent_id: str, task_id: str):
172 """Cancel a running task."""
173 success = await cancel_agent_task(agent_id, task_id)
174
175 if not success:
176 raise HTTPException(status_code=404, detail="Task not found")
177
178 return {"status": "cancelled", "task_id": task_id}
179
180
181@app.get("/v1/agents/{agent_id}/tasks")
182async def list_tasks(
183 agent_id: str,
184 status: Optional[TaskStatus] = None,
185 limit: int = 20,
186 offset: int = 0
187):
188 """List tasks for an agent."""
189 tasks = await list_agent_tasks(
190 agent_id=agent_id,
191 status=status,
192 limit=limit,
193 offset=offset
194 )
195
196 return {
197 "tasks": tasks,
198 "total": len(tasks),
199 "limit": limit,
200 "offset": offset
201 }
202
203
204# Placeholder implementations
205async def get_agent(agent_id: str):
206 return {"id": agent_id}
207
208async def create_agent_task(agent_id: str, task_id: str, request: AgentRequest):
209 return {"task_id": task_id}
210
211async def get_agent_task(agent_id: str, task_id: str):
212 return None
213
214async def cancel_agent_task(agent_id: str, task_id: str):
215 return True
216
217async def list_agent_tasks(agent_id: str, status, limit: int, offset: int):
218 return []The REST API follows standard conventions: resources are nouns, HTTP methods indicate actions, and responses use consistent schemas. Error responses include request IDs for debugging and follow a predictable structure.
| Endpoint | Method | Purpose |
|---|---|---|
| /v1/agents/{id}/tasks | POST | Create new task |
| /v1/agents/{id}/tasks/{task_id} | GET | Get task status |
| /v1/agents/{id}/tasks/{task_id} | DELETE | Cancel task |
| /v1/agents/{id}/tasks | GET | List tasks |
| /v1/agents/{id}/sessions | POST | Create session |
| /v1/agents/{id}/sessions/{id} | GET | Get session |
Streaming APIs
Streaming enables real-time token delivery as the agent generates responses. This improves perceived latency and enables progress monitoring for long operations.
Server-Sent Events Streaming
1from fastapi import FastAPI
2from fastapi.responses import StreamingResponse
3from pydantic import BaseModel
4from typing import AsyncIterator
5import json
6import asyncio
7
8
9class StreamEvent(BaseModel):
10 """Event in a streaming response."""
11 event: str
12 data: dict
13
14
15class StreamChunk(BaseModel):
16 """A chunk of streamed content."""
17 content: str
18 finish_reason: str | None = None
19 usage: dict[str, int] | None = None
20
21
22async def format_sse(event: StreamEvent) -> str:
23 """Format event for SSE."""
24 data = json.dumps(event.data)
25 return f"event: {event.event}\ndata: {data}\n\n"
26
27
28async def stream_agent_response(
29 agent_id: str,
30 task_id: str,
31 request: AgentRequest
32) -> AsyncIterator[str]:
33 """Stream agent response as SSE events."""
34
35 # Send start event
36 yield await format_sse(StreamEvent(
37 event="start",
38 data={"task_id": task_id, "agent_id": agent_id}
39 ))
40
41 # Stream content chunks
42 content_buffer = ""
43 async for chunk in generate_agent_response(agent_id, request):
44 content_buffer += chunk.content
45
46 yield await format_sse(StreamEvent(
47 event="content",
48 data={
49 "content": chunk.content,
50 "finish_reason": chunk.finish_reason
51 }
52 ))
53
54 if chunk.finish_reason:
55 break
56
57 # Send completion event
58 yield await format_sse(StreamEvent(
59 event="done",
60 data={
61 "task_id": task_id,
62 "content": content_buffer,
63 "usage": {"total_tokens": len(content_buffer.split())}
64 }
65 ))
66
67
68@app.post("/v1/agents/{agent_id}/tasks/stream")
69async def stream_task(
70 agent_id: str,
71 request: AgentRequest
72):
73 """Create streaming task."""
74 task_id = str(uuid.uuid4())
75
76 return StreamingResponse(
77 stream_agent_response(agent_id, task_id, request),
78 media_type="text/event-stream",
79 headers={
80 "Cache-Control": "no-cache",
81 "Connection": "keep-alive",
82 "X-Task-ID": task_id
83 }
84 )
85
86
87async def generate_agent_response(
88 agent_id: str,
89 request: AgentRequest
90) -> AsyncIterator[StreamChunk]:
91 """Generate agent response chunks."""
92 # Simulate streaming response
93 words = request.task.split()
94
95 for i, word in enumerate(words):
96 await asyncio.sleep(0.1) # Simulate generation delay
97
98 is_last = i == len(words) - 1
99 yield StreamChunk(
100 content=word + " ",
101 finish_reason="stop" if is_last else None
102 )
103
104
105# WebSocket streaming alternative
106from fastapi import WebSocket, WebSocketDisconnect
107
108
109@app.websocket("/v1/agents/{agent_id}/ws")
110async def websocket_stream(
111 websocket: WebSocket,
112 agent_id: str
113):
114 """WebSocket endpoint for bidirectional streaming."""
115 await websocket.accept()
116
117 try:
118 while True:
119 # Receive request
120 data = await websocket.receive_json()
121 request = AgentRequest(**data)
122
123 # Stream response
124 async for chunk in generate_agent_response(agent_id, request):
125 await websocket.send_json({
126 "type": "content",
127 "content": chunk.content,
128 "finish_reason": chunk.finish_reason
129 })
130
131 if chunk.finish_reason:
132 break
133
134 await websocket.send_json({"type": "done"})
135
136 except WebSocketDisconnect:
137 pass
138
139
140# Client-side usage example
141CLIENT_EXAMPLE = '''
142import httpx
143
144async def consume_stream():
145 async with httpx.AsyncClient() as client:
146 async with client.stream(
147 "POST",
148 "http://api.example.com/v1/agents/agent-1/tasks/stream",
149 json={"task": "Explain quantum computing"}
150 ) as response:
151 async for line in response.aiter_lines():
152 if line.startswith("data: "):
153 data = json.loads(line[6:])
154 if data.get("content"):
155 print(data["content"], end="", flush=True)
156'''Server-Sent Events (SSE) provide a simple streaming protocol over HTTP. WebSockets offer bidirectional communication for more complex interactions. Both approaches deliver tokens in real-time as they're generated.
Async Operations
Long-running agent tasks require async processing patterns. Clients submit requests and poll for results, or receive webhook notifications when tasks complete.
Async Task Processing
1from fastapi import FastAPI, BackgroundTasks, HTTPException
2from pydantic import BaseModel, HttpUrl
3from typing import Optional
4import asyncio
5import httpx
6
7
8class AsyncTaskRequest(BaseModel):
9 """Request for async task processing."""
10 task: str
11 callback_url: Optional[HttpUrl] = None
12 webhook_secret: Optional[str] = None
13 priority: int = 0
14
15
16class AsyncTaskResponse(BaseModel):
17 """Response when task is queued."""
18 task_id: str
19 status: str
20 poll_url: str
21 estimated_completion_seconds: Optional[int] = None
22
23
24class TaskResult(BaseModel):
25 """Result of completed task."""
26 task_id: str
27 status: str
28 result: Optional[str] = None
29 error: Optional[str] = None
30 duration_ms: int
31 usage: dict[str, int]
32
33
34class AsyncTaskManager:
35 """Manages async task lifecycle."""
36
37 def __init__(self):
38 self._tasks: dict[str, dict] = {}
39 self._results: dict[str, TaskResult] = {}
40
41 async def submit(
42 self,
43 task_id: str,
44 request: AsyncTaskRequest,
45 background_tasks: BackgroundTasks
46 ) -> AsyncTaskResponse:
47 """Submit task for async processing."""
48 self._tasks[task_id] = {
49 "request": request,
50 "status": "pending",
51 "created_at": asyncio.get_event_loop().time()
52 }
53
54 # Queue background processing
55 background_tasks.add_task(
56 self._process_task,
57 task_id,
58 request
59 )
60
61 return AsyncTaskResponse(
62 task_id=task_id,
63 status="pending",
64 poll_url=f"/v1/tasks/{task_id}",
65 estimated_completion_seconds=30
66 )
67
68 async def _process_task(
69 self,
70 task_id: str,
71 request: AsyncTaskRequest
72 ) -> None:
73 """Process task in background."""
74 import time
75
76 start_time = time.time()
77 self._tasks[task_id]["status"] = "running"
78
79 try:
80 # Execute agent task
81 result = await self._execute_agent_task(request)
82
83 duration_ms = int((time.time() - start_time) * 1000)
84
85 self._results[task_id] = TaskResult(
86 task_id=task_id,
87 status="completed",
88 result=result,
89 duration_ms=duration_ms,
90 usage={"total_tokens": len(result.split())}
91 )
92 self._tasks[task_id]["status"] = "completed"
93
94 # Send webhook if configured
95 if request.callback_url:
96 await self._send_webhook(
97 request.callback_url,
98 request.webhook_secret,
99 self._results[task_id]
100 )
101
102 except Exception as e:
103 duration_ms = int((time.time() - start_time) * 1000)
104
105 self._results[task_id] = TaskResult(
106 task_id=task_id,
107 status="failed",
108 error=str(e),
109 duration_ms=duration_ms,
110 usage={}
111 )
112 self._tasks[task_id]["status"] = "failed"
113
114 async def _execute_agent_task(self, request: AsyncTaskRequest) -> str:
115 """Execute the agent task."""
116 # Simulate processing
117 await asyncio.sleep(2)
118 return f"Completed task: {request.task}"
119
120 async def _send_webhook(
121 self,
122 url: str,
123 secret: Optional[str],
124 result: TaskResult
125 ) -> None:
126 """Send webhook notification."""
127 import hmac
128 import hashlib
129
130 payload = result.model_dump_json()
131 headers = {"Content-Type": "application/json"}
132
133 if secret:
134 signature = hmac.new(
135 secret.encode(),
136 payload.encode(),
137 hashlib.sha256
138 ).hexdigest()
139 headers["X-Webhook-Signature"] = signature
140
141 try:
142 async with httpx.AsyncClient() as client:
143 await client.post(
144 str(url),
145 content=payload,
146 headers=headers,
147 timeout=10.0
148 )
149 except Exception:
150 # Log webhook failure but dont fail task
151 pass
152
153 def get_status(self, task_id: str) -> Optional[dict]:
154 """Get task status."""
155 if task_id in self._results:
156 return self._results[task_id].model_dump()
157
158 if task_id in self._tasks:
159 return {
160 "task_id": task_id,
161 "status": self._tasks[task_id]["status"]
162 }
163
164 return None
165
166
167# API endpoints
168task_manager = AsyncTaskManager()
169
170
171@app.post("/v1/tasks/async", response_model=AsyncTaskResponse)
172async def create_async_task(
173 request: AsyncTaskRequest,
174 background_tasks: BackgroundTasks
175):
176 """Create an async task."""
177 task_id = str(uuid.uuid4())
178
179 return await task_manager.submit(
180 task_id,
181 request,
182 background_tasks
183 )
184
185
186@app.get("/v1/tasks/{task_id}")
187async def get_task_status(task_id: str):
188 """Poll for task status."""
189 status = task_manager.get_status(task_id)
190
191 if not status:
192 raise HTTPException(status_code=404, detail="Task not found")
193
194 # Include retry-after header if still processing
195 if status["status"] in ("pending", "running"):
196 return JSONResponse(
197 content=status,
198 headers={"Retry-After": "5"}
199 )
200
201 return status
202
203
204@app.post("/v1/tasks/{task_id}/wait")
205async def wait_for_task(
206 task_id: str,
207 timeout_seconds: int = 30
208):
209 """Long-poll for task completion."""
210 deadline = asyncio.get_event_loop().time() + timeout_seconds
211
212 while asyncio.get_event_loop().time() < deadline:
213 status = task_manager.get_status(task_id)
214
215 if not status:
216 raise HTTPException(status_code=404, detail="Task not found")
217
218 if status["status"] in ("completed", "failed"):
219 return status
220
221 await asyncio.sleep(1)
222
223 return {
224 "task_id": task_id,
225 "status": "timeout",
226 "message": "Task still processing"
227 }The async pattern supports both polling and webhooks. Long-polling provides a middle ground, allowing clients to wait for completion without constant polling. Webhooks enable push notifications for server-to-server integrations.
API Versioning
API versioning ensures backward compatibility as the API evolves. Multiple versioning strategies exist, each with trade-offs for maintenance and usability.
Version Management
1from fastapi import FastAPI, APIRouter, Request, Depends
2from fastapi.routing import APIRoute
3from typing import Callable, Optional
4from enum import Enum
5
6
7class APIVersion(str, Enum):
8 """Supported API versions."""
9 V1 = "v1"
10 V2 = "v2"
11
12
13class VersionedRoute(APIRoute):
14 """Custom route with version detection."""
15
16 def get_route_handler(self) -> Callable:
17 original_handler = super().get_route_handler()
18
19 async def versioned_handler(request: Request):
20 # Extract version from path or header
21 version = self._extract_version(request)
22 request.state.api_version = version
23 return await original_handler(request)
24
25 return versioned_handler
26
27 def _extract_version(self, request: Request) -> str:
28 # Check path prefix
29 path = request.url.path
30 for version in APIVersion:
31 if path.startswith(f"/{version.value}/"):
32 return version.value
33
34 # Check header
35 header_version = request.headers.get("X-API-Version", "v1")
36 return header_version
37
38
39def get_api_version(request: Request) -> str:
40 """Dependency to get current API version."""
41 return getattr(request.state, "api_version", "v1")
42
43
44# Version-specific routers
45v1_router = APIRouter(prefix="/v1", tags=["v1"])
46v2_router = APIRouter(prefix="/v2", tags=["v2"])
47
48
49# V1 endpoints
50@v1_router.post("/agents/{agent_id}/complete")
51async def v1_complete(agent_id: str, request: AgentRequest):
52 """V1 completion endpoint."""
53 return {"version": "v1", "result": "V1 response"}
54
55
56# V2 endpoints with breaking changes
57class V2AgentRequest(BaseModel):
58 """V2 request with different schema."""
59 messages: list[dict[str, str]]
60 model: str = "default"
61 max_tokens: int = 4096
62 temperature: float = 0.7
63
64
65@v2_router.post("/agents/{agent_id}/complete")
66async def v2_complete(agent_id: str, request: V2AgentRequest):
67 """V2 completion endpoint with messages format."""
68 return {"version": "v2", "result": "V2 response"}
69
70
71# Version adapter for backward compatibility
72class VersionAdapter:
73 """Adapts requests between API versions."""
74
75 @staticmethod
76 def v1_to_v2(v1_request: AgentRequest) -> V2AgentRequest:
77 """Convert V1 request to V2 format."""
78 return V2AgentRequest(
79 messages=[{"role": "user", "content": v1_request.task}],
80 max_tokens=v1_request.max_tokens or 4096
81 )
82
83 @staticmethod
84 def v2_to_v1_response(v2_response: dict) -> dict:
85 """Convert V2 response to V1 format."""
86 return {
87 "result": v2_response.get("choices", [{}])[0].get("message", {}).get("content", ""),
88 "usage": v2_response.get("usage", {})
89 }
90
91
92# Deprecation handling
93from fastapi import Response
94import warnings
95from datetime import date
96
97
98def deprecation_warning(
99 version: str,
100 sunset_date: date,
101 replacement: str
102):
103 """Add deprecation headers to response."""
104 def decorator(func):
105 async def wrapper(*args, **kwargs):
106 response = await func(*args, **kwargs)
107
108 if isinstance(response, Response):
109 response.headers["Deprecation"] = "true"
110 response.headers["Sunset"] = sunset_date.isoformat()
111 response.headers["Link"] = f'<{replacement}>; rel="successor-version"'
112 return response
113
114 return response
115
116 return wrapper
117 return decorator
118
119
120@v1_router.post("/agents/{agent_id}/chat")
121@deprecation_warning(
122 version="v1",
123 sunset_date=date(2025, 6, 1),
124 replacement="/v2/agents/{agent_id}/complete"
125)
126async def v1_chat_deprecated(agent_id: str, request: AgentRequest):
127 """Deprecated V1 chat endpoint."""
128 return {"version": "v1", "deprecated": True}
129
130
131# Mount routers
132app.include_router(v1_router)
133app.include_router(v2_router)URL path versioning provides clear version identification. Version adapters enable backward compatibility by translating between formats. Deprecation headers inform clients about upcoming changes.
| Strategy | Example | Pros | Cons |
|---|---|---|---|
| URL Path | /v1/agents | Clear, cacheable | URL changes |
| Header | X-API-Version: 2 | Clean URLs | Less visible |
| Query Param | ?version=2 | Easy to test | Clutters URLs |
| Content Type | application/vnd.api.v2+json | RESTful | Complex |
Documentation and SDKs
Comprehensive documentation and SDKs improve developer experience and reduce integration time. OpenAPI specifications enable automatic documentation and client generation.
OpenAPI Documentation
1from fastapi import FastAPI
2from fastapi.openapi.utils import get_openapi
3from pydantic import BaseModel, Field
4from typing import Any
5
6
7def custom_openapi():
8 """Generate custom OpenAPI schema."""
9 if app.openapi_schema:
10 return app.openapi_schema
11
12 openapi_schema = get_openapi(
13 title="Agent API",
14 version="1.0.0",
15 description="""
16## Agent API
17
18This API provides access to AI agent capabilities.
19
20### Authentication
21
22All requests require an API key in the Authorization header:
23
24```
25Authorization: Bearer your-api-key
26```
27
28### Rate Limits
29
30- 100 requests per minute for standard tier
31- 1000 requests per minute for enterprise tier
32
33### Streaming
34
35For real-time responses, use the streaming endpoints.
36Responses are delivered as Server-Sent Events.
37
38### Error Handling
39
40All errors follow a consistent format with error codes
41and human-readable messages.
42 """,
43 routes=app.routes,
44 )
45
46 # Add security schemes
47 openapi_schema["components"]["securitySchemes"] = {
48 "bearerAuth": {
49 "type": "http",
50 "scheme": "bearer",
51 "bearerFormat": "API Key"
52 }
53 }
54
55 # Add global security requirement
56 openapi_schema["security"] = [{"bearerAuth": []}]
57
58 # Add servers
59 openapi_schema["servers"] = [
60 {"url": "https://api.example.com", "description": "Production"},
61 {"url": "https://staging-api.example.com", "description": "Staging"}
62 ]
63
64 # Add examples
65 for path in openapi_schema["paths"].values():
66 for operation in path.values():
67 if isinstance(operation, dict) and "requestBody" in operation:
68 add_examples(operation)
69
70 app.openapi_schema = openapi_schema
71 return app.openapi_schema
72
73
74def add_examples(operation: dict) -> None:
75 """Add examples to operation."""
76 content = operation.get("requestBody", {}).get("content", {})
77
78 if "application/json" in content:
79 content["application/json"]["examples"] = {
80 "simple_task": {
81 "summary": "Simple task",
82 "value": {
83 "task": "What is the capital of France?",
84 "max_tokens": 100
85 }
86 },
87 "with_context": {
88 "summary": "Task with context",
89 "value": {
90 "task": "Summarize this document",
91 "context": {"document_id": "doc-123"},
92 "max_tokens": 500
93 }
94 }
95 }
96
97
98app.openapi = custom_openapi
99
100
101# SDK generation helper
102SDK_TEMPLATE = '''
103# Generated Python SDK
104
105import httpx
106from typing import Optional, AsyncIterator
107from dataclasses import dataclass
108
109
110@dataclass
111class AgentSDK:
112 """Python SDK for Agent API."""
113
114 base_url: str
115 api_key: str
116 timeout: float = 30.0
117
118 @property
119 def _headers(self) -> dict:
120 return {
121 "Authorization": f"Bearer {self.api_key}",
122 "Content-Type": "application/json"
123 }
124
125 async def complete(
126 self,
127 agent_id: str,
128 task: str,
129 max_tokens: int = 4096,
130 stream: bool = False
131 ) -> dict:
132 """Complete a task with an agent."""
133 async with httpx.AsyncClient() as client:
134 response = await client.post(
135 f"{self.base_url}/v1/agents/{agent_id}/tasks",
136 headers=self._headers,
137 json={
138 "task": task,
139 "max_tokens": max_tokens,
140 "stream": stream
141 },
142 timeout=self.timeout
143 )
144 response.raise_for_status()
145 return response.json()
146
147 async def stream_complete(
148 self,
149 agent_id: str,
150 task: str
151 ) -> AsyncIterator[str]:
152 """Stream completion from an agent."""
153 async with httpx.AsyncClient() as client:
154 async with client.stream(
155 "POST",
156 f"{self.base_url}/v1/agents/{agent_id}/tasks/stream",
157 headers=self._headers,
158 json={"task": task}
159 ) as response:
160 async for line in response.aiter_lines():
161 if line.startswith("data: "):
162 import json
163 data = json.loads(line[6:])
164 if content := data.get("content"):
165 yield content
166
167
168# Usage
169async def main():
170 sdk = AgentSDK(
171 base_url="https://api.example.com",
172 api_key="your-api-key"
173 )
174
175 # Synchronous completion
176 result = await sdk.complete("agent-1", "What is Python?")
177 print(result)
178
179 # Streaming completion
180 async for chunk in sdk.stream_complete("agent-1", "Explain AI"):
181 print(chunk, end="", flush=True)
182'''
183
184
185@app.get("/sdk/python")
186async def get_python_sdk():
187 """Download Python SDK."""
188 return {"sdk": SDK_TEMPLATE}OpenAPI documentation is automatically generated from route definitions and enhanced with custom descriptions, examples, and security schemes. SDKs can be generated from the OpenAPI spec for multiple languages.
Summary
Well-designed APIs make agent capabilities accessible and easy to integrate. Following REST conventions, supporting streaming, handling async operations, versioning properly, and providing good documentation creates a developer-friendly experience.
Key Takeaways
- REST Conventions - Use consistent resource naming, HTTP methods, and response formats
- Streaming Support - Implement SSE or WebSockets for real-time token delivery
- Async Operations - Support polling and webhooks for long-running tasks
- API Versioning - Use URL path versioning with deprecation headers
- Documentation - Provide OpenAPI specs with examples and generated SDKs
Next Steps: The final section covers deployment strategies for taking agent systems to production, including blue-green deployments, canary releases, and infrastructure as code.