Introduction
A robust tool execution system is the foundation of any agent. It handles tool registration, validation, execution, and result formatting. Let's build a production-ready implementation.
Key Principle: Tool execution should be reliable, secure, and observable. Every tool call should be logged, validated, executed safely, and return structured results.
Building a Tool Registry
The tool registry manages all available tools and their metadata:
🐍tool_registry.py
1from dataclasses import dataclass, field
2from typing import Callable, Any
3from pydantic import BaseModel
4import json
5
@dataclass
class Tool:
    """A tool that can be executed by the agent.

    Attributes:
        name: unique tool identifier used by the LLM to invoke it.
        description: natural-language description shown to the LLM.
        parameters: JSON Schema describing the accepted arguments.
        function: the callable (sync or async) that implements the tool.
        requires_confirmation: if True, execution needs explicit approval.
        timeout_seconds: wall-clock limit for a single execution.
        tags: free-form labels used for filtering.
    """

    name: str
    description: str
    parameters: dict  # JSON Schema
    function: Callable[..., Any]
    requires_confirmation: bool = False
    timeout_seconds: float = 30.0
    tags: list[str] = field(default_factory=list)

    def to_api_format(self, provider: str = "anthropic") -> dict:
        """Return this tool's schema in the given provider's wire format.

        Raises:
            ValueError: if *provider* is neither "anthropic" nor "openai".
        """
        if provider == "openai":
            return {
                "type": "function",
                "function": {
                    "name": self.name,
                    "description": self.description,
                    "parameters": self.parameters,
                },
            }
        if provider == "anthropic":
            return {
                "name": self.name,
                "description": self.description,
                "input_schema": self.parameters,
            }
        raise ValueError(f"Unknown provider: {provider}")
36
37
class ToolRegistry:
    """Registry for managing agent tools.

    Keeps every registered Tool by name, plus an optional Pydantic model
    per tool for argument validation.
    """

    def __init__(self):
        self._tools: dict[str, Tool] = {}
        self._validators: dict[str, type[BaseModel]] = {}

    def register(
        self,
        name: str | None = None,
        description: str | None = None,
        parameters: dict | type[BaseModel] | None = None,
        requires_confirmation: bool = False,
        timeout: float = 30.0,
        tags: list[str] | None = None
    ):
        """Decorator that registers the wrapped function as a tool.

        *name* and *description* default to the function's __name__ and
        docstring. *parameters* may be a raw JSON Schema dict or a
        Pydantic model class; a model is also kept as the validator.
        """
        def decorator(func: Callable) -> Callable:
            resolved_name = name or func.__name__
            resolved_desc = (description or func.__doc__ or "").strip()

            is_model = isinstance(parameters, type) and issubclass(parameters, BaseModel)
            if is_model:
                # Flatten the model's JSON schema into the plain
                # {type, properties, required} shape the APIs expect.
                self._validators[resolved_name] = parameters
                model_schema = parameters.model_json_schema()
                resolved_params = {
                    "type": "object",
                    "properties": model_schema.get("properties", {}),
                    "required": model_schema.get("required", []),
                }
            else:
                resolved_params = parameters or {"type": "object", "properties": {}}

            self._tools[resolved_name] = Tool(
                name=resolved_name,
                description=resolved_desc,
                parameters=resolved_params,
                function=func,
                requires_confirmation=requires_confirmation,
                timeout_seconds=timeout,
                tags=tags or [],
            )
            return func

        return decorator

    def get(self, name: str) -> Tool | None:
        """Look up a tool by name; None if not registered."""
        return self._tools.get(name)

    def list_tools(self, tag: str | None = None) -> list[Tool]:
        """Return all tools, optionally restricted to those carrying *tag*."""
        everything = self._tools.values()
        if not tag:
            return list(everything)
        return [tool for tool in everything if tag in tool.tags]

    def get_validator(self, name: str) -> type[BaseModel] | None:
        """Return the Pydantic validator registered for *name*, if any."""
        return self._validators.get(name)

    def to_api_format(self, provider: str = "anthropic") -> list[dict]:
        """Convert every registered tool to the provider's wire format."""
        return [tool.to_api_format(provider) for tool in self._tools.values()]
107
108
109# Usage example
110registry = ToolRegistry()
111
112@registry.register(
113 description="Read the contents of a file",
114 parameters={
115 "type": "object",
116 "properties": {
117 "path": {
118 "type": "string",
119 "description": "Path to the file"
120 },
121 "encoding": {
122 "type": "string",
123 "default": "utf-8"
124 }
125 },
126 "required": ["path"]
127 },
128 tags=["filesystem"]
129)
130async def read_file(path: str, encoding: str = "utf-8") -> str:
131 """Read file contents."""
132 with open(path, encoding=encoding) as f:
133 return f.read()The Execution Engine
The execution engine handles the actual invocation of tools:
🐍execution_engine.py
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Callable
import asyncio
import json
import time
import traceback
7
8class ExecutionStatus(Enum):
9 SUCCESS = "success"
10 ERROR = "error"
11 TIMEOUT = "timeout"
12 VALIDATION_ERROR = "validation_error"
13 PERMISSION_DENIED = "permission_denied"
14
15@dataclass
16class ExecutionResult:
17 """Result of tool execution."""
18 status: ExecutionStatus
19 result: Any | None = None
20 error: str | None = None
21 duration_ms: float = 0.0
22 tool_name: str = ""
23 parameters: dict = field(default_factory=dict)
24
25 def to_observation(self) -> str:
26 """Format as observation for LLM."""
27 if self.status == ExecutionStatus.SUCCESS:
28 if isinstance(self.result, str):
29 return self.result
30 return json.dumps(self.result, indent=2)
31
32 return f"Error ({self.status.value}): {self.error}"
33
34
35class ToolExecutor:
36 """Execute tools with validation and error handling."""
37
38 def __init__(self, registry: ToolRegistry):
39 self.registry = registry
40 self.execution_hooks: list[Callable] = []
41
42 def add_hook(self, hook: Callable):
43 """Add execution hook for logging/monitoring."""
44 self.execution_hooks.append(hook)
45
46 async def execute(
47 self,
48 tool_name: str,
49 parameters: dict,
50 context: dict | None = None
51 ) -> ExecutionResult:
52 """Execute a tool with full error handling."""
53
54 start_time = time.time()
55
56 # Get tool
57 tool = self.registry.get(tool_name)
58 if not tool:
59 return ExecutionResult(
60 status=ExecutionStatus.ERROR,
61 error=f"Unknown tool: {tool_name}",
62 tool_name=tool_name,
63 parameters=parameters
64 )
65
66 # Validate parameters
67 validation_result = self._validate(tool_name, parameters)
68 if validation_result:
69 return validation_result
70
71 # Check confirmation if required
72 if tool.requires_confirmation:
73 if not context or not context.get("confirmed"):
74 return ExecutionResult(
75 status=ExecutionStatus.PERMISSION_DENIED,
76 error=f"Tool '{tool_name}' requires confirmation",
77 tool_name=tool_name,
78 parameters=parameters
79 )
80
81 # Execute with timeout
82 try:
83 result = await asyncio.wait_for(
84 self._invoke(tool, parameters),
85 timeout=tool.timeout_seconds
86 )
87
88 execution_result = ExecutionResult(
89 status=ExecutionStatus.SUCCESS,
90 result=result,
91 duration_ms=(time.time() - start_time) * 1000,
92 tool_name=tool_name,
93 parameters=parameters
94 )
95
96 except asyncio.TimeoutError:
97 execution_result = ExecutionResult(
98 status=ExecutionStatus.TIMEOUT,
99 error=f"Tool execution timed out after {tool.timeout_seconds}s",
100 duration_ms=(time.time() - start_time) * 1000,
101 tool_name=tool_name,
102 parameters=parameters
103 )
104
105 except Exception as e:
106 execution_result = ExecutionResult(
107 status=ExecutionStatus.ERROR,
108 error=f"{type(e).__name__}: {str(e)}",
109 duration_ms=(time.time() - start_time) * 1000,
110 tool_name=tool_name,
111 parameters=parameters
112 )
113
114 # Run hooks
115 for hook in self.execution_hooks:
116 try:
117 await hook(execution_result)
118 except Exception:
119 pass # Don't let hooks break execution
120
121 return execution_result
122
123 def _validate(self, tool_name: str, params: dict) -> ExecutionResult | None:
124 """Validate parameters, return error result if invalid."""
125 validator = self.registry.get_validator(tool_name)
126
127 if validator:
128 try:
129 validator(**params)
130 except Exception as e:
131 return ExecutionResult(
132 status=ExecutionStatus.VALIDATION_ERROR,
133 error=str(e),
134 tool_name=tool_name,
135 parameters=params
136 )
137
138 return None
139
140 async def _invoke(self, tool: Tool, params: dict) -> Any:
141 """Invoke the tool function."""
142 if asyncio.iscoroutinefunction(tool.function):
143 return await tool.function(**params)
144 else:
145 # Run sync function in thread pool
146 loop = asyncio.get_event_loop()
147 return await loop.run_in_executor(
148 None,
149 lambda: tool.function(**params)
150 )Async Tool Execution
Handle parallel tool calls efficiently:
🐍async_execution.py
1from dataclasses import dataclass
2import asyncio
3
@dataclass
class ToolCall:
    """A single tool invocation requested by the LLM."""

    id: str          # provider-assigned call id, echoed back with the result
    name: str        # name of the tool to invoke
    arguments: dict  # parsed argument payload
10
11class AsyncToolExecutor:
12 """Execute multiple tools concurrently."""
13
14 def __init__(self, executor: ToolExecutor):
15 self.executor = executor
16 self.max_concurrent = 10
17 self._semaphore = asyncio.Semaphore(self.max_concurrent)
18
19 async def execute_calls(
20 self,
21 calls: list[ToolCall],
22 context: dict | None = None
23 ) -> dict[str, ExecutionResult]:
24 """Execute multiple tool calls, possibly in parallel."""
25
26 # Separate calls that need confirmation
27 needs_confirmation = []
28 can_execute = []
29
30 for call in calls:
31 tool = self.executor.registry.get(call.name)
32 if tool and tool.requires_confirmation:
33 needs_confirmation.append(call)
34 else:
35 can_execute.append(call)
36
37 results = {}
38
39 # Execute safe tools in parallel
40 if can_execute:
41 parallel_results = await self._execute_parallel(
42 can_execute, context
43 )
44 results.update(parallel_results)
45
46 # Execute confirmation-required tools sequentially
47 for call in needs_confirmation:
48 result = await self.executor.execute(
49 call.name,
50 call.arguments,
51 context
52 )
53 results[call.id] = result
54
55 return results
56
57 async def _execute_parallel(
58 self,
59 calls: list[ToolCall],
60 context: dict | None
61 ) -> dict[str, ExecutionResult]:
62 """Execute calls in parallel with semaphore."""
63
64 async def execute_with_semaphore(call: ToolCall):
65 async with self._semaphore:
66 result = await self.executor.execute(
67 call.name,
68 call.arguments,
69 context
70 )
71 return (call.id, result)
72
73 tasks = [execute_with_semaphore(call) for call in calls]
74 completed = await asyncio.gather(*tasks)
75
76 return dict(completed)
77
78
# Example: Streaming execution with progress
class StreamingToolExecutor:
    """Execute tools with streaming progress updates."""

    def __init__(self, executor: ToolExecutor):
        self.executor = executor

    async def execute_with_streaming(
        self,
        calls: list[ToolCall],
        on_start: Callable[[ToolCall], None] | None = None,
        on_complete: Callable[[ToolCall, ExecutionResult], None] | None = None,
    ) -> dict[str, ExecutionResult]:
        """Execute calls one at a time, firing progress callbacks.

        Args:
            calls: tool calls to run sequentially (use AsyncToolExecutor
                for concurrency).
            on_start: invoked with each call just before it begins.
            on_complete: invoked with each call and its result when done.

        Returns:
            Mapping of call id to ExecutionResult.
        """
        collected: dict[str, ExecutionResult] = {}

        for call in calls:
            if on_start is not None:
                on_start(call)

            outcome = await self.executor.execute(call.name, call.arguments)
            collected[call.id] = outcome

            if on_complete is not None:
                on_complete(call, outcome)

        return collected
110
111
112# Usage
113async def main():
114 registry = ToolRegistry()
115 executor = ToolExecutor(registry)
116 async_executor = AsyncToolExecutor(executor)
117
118 calls = [
119 ToolCall(id="1", name="read_file", arguments={"path": "a.py"}),
120 ToolCall(id="2", name="read_file", arguments={"path": "b.py"}),
121 ToolCall(id="3", name="read_file", arguments={"path": "c.py"}),
122 ]
123
124 # Execute all in parallel
125 results = await async_executor.execute_calls(calls)
126
127 for call_id, result in results.items():
128 print(f"Call {call_id}: {result.status}")Sandboxed Execution
For code execution tools, sandboxing is essential:
🐍sandboxed_execution.py
1import subprocess
2import tempfile
3import os
4import resource
5from dataclasses import dataclass
6from typing import Any
7
8@dataclass
9class SandboxConfig:
10 """Configuration for sandboxed execution."""
11 timeout_seconds: float = 30.0
12 max_memory_mb: int = 256
13 max_output_bytes: int = 100_000
14 allow_network: bool = False
15 working_dir: str | None = None
16
class PythonSandbox:
    """Sandboxed Python code execution via a subprocess.

    NOTE(review): this is process-level isolation only — the child runs
    with the parent's privileges and full filesystem access. Use
    DockerSandbox for real isolation.
    """

    def __init__(self, config: SandboxConfig | None = None):
        self.config = config or SandboxConfig()

    def execute(self, code: str) -> dict[str, Any]:
        """Execute Python code in the sandbox.

        Returns:
            Dict with keys "success", "stdout", "stderr", "return_code".
            Never raises: all failures are reported in the dict.
        """
        import sys  # local import keeps this snippet self-contained

        # Create temporary file with code (delete=False so the child can
        # open it; removed in the finally block below).
        with tempfile.NamedTemporaryFile(
            mode='w',
            suffix='.py',
            delete=False
        ) as f:
            f.write(code)
            script_path = f.name

        try:
            # Fix: run the current interpreter rather than whatever
            # "python" resolves to on PATH (may be missing, or a
            # different Python than the host process).
            cmd = [sys.executable, script_path]

            # Set environment
            env = os.environ.copy()
            if not self.config.allow_network:
                # NOTE(review): clearing proxy variables does NOT disable
                # networking — direct connections still work. Real network
                # isolation needs namespaces/containers (see DockerSandbox).
                env['HTTP_PROXY'] = ''
                env['HTTPS_PROXY'] = ''

            # Execute with limits
            result = subprocess.run(
                cmd,
                capture_output=True,
                text=True,
                timeout=self.config.timeout_seconds,
                cwd=self.config.working_dir,
                env=env
            )

            # Truncate output if needed
            limit = self.config.max_output_bytes
            return {
                "success": result.returncode == 0,
                "stdout": result.stdout[:limit],
                "stderr": result.stderr[:limit],
                "return_code": result.returncode
            }

        except subprocess.TimeoutExpired:
            return {
                "success": False,
                "stdout": "",
                "stderr": f"Execution timed out after {self.config.timeout_seconds}s",
                "return_code": -1
            }
        except Exception as e:
            return {
                "success": False,
                "stdout": "",
                "stderr": str(e),
                "return_code": -1
            }
        finally:
            # Cleanup the temporary script
            os.unlink(script_path)
84
85
class DockerSandbox:
    """Docker-based sandbox for stronger isolation."""

    def __init__(
        self,
        image: str = "python:3.11-slim",
        config: SandboxConfig | None = None
    ):
        self.image = image
        self.config = config or SandboxConfig()

    def execute(self, code: str) -> dict[str, Any]:
        """Execute code in a Docker container.

        Returns:
            Dict with keys "success", "stdout", "stderr", "return_code".
            Never raises: all failures are reported in the dict.
        """

        # Build Docker command with restrictions
        docker_cmd = [
            "docker", "run",
            "--rm",  # Remove after execution
            "--network", "none" if not self.config.allow_network else "bridge",
            "--memory", f"{self.config.max_memory_mb}m",
            "--cpus", "1",
            "--read-only",  # Read-only filesystem
            "--tmpfs", "/tmp:size=100m",  # Writable /tmp
            self.image,
            "python", "-c", code
        ]

        try:
            result = subprocess.run(
                docker_cmd,
                capture_output=True,
                text=True,
                timeout=self.config.timeout_seconds + 5  # Extra for container overhead
            )

            limit = self.config.max_output_bytes
            return {
                "success": result.returncode == 0,
                "stdout": result.stdout[:limit],
                "stderr": result.stderr[:limit],
                "return_code": result.returncode
            }

        except subprocess.TimeoutExpired:
            return {
                "success": False,
                "stdout": "",
                "stderr": "Container execution timed out",
                "return_code": -1
            }
        except Exception as e:
            # Fix: an absent or broken docker binary (FileNotFoundError,
            # PermissionError, ...) previously escaped as an unhandled
            # exception, unlike PythonSandbox which reports every failure
            # through the result dict. Keep the two backends consistent.
            return {
                "success": False,
                "stdout": "",
                "stderr": str(e),
                "return_code": -1
            }
135
136
137# Register sandbox as a tool
138@registry.register(
139 name="execute_python",
140 description="""
141Execute Python code in a secure sandbox.
142
143CAPABILITIES:
144- Standard library access
145- Basic computation and data processing
146- File I/O in /tmp only
147
148LIMITATIONS:
149- No network access
150- 30 second timeout
151- 256MB memory limit
152- No persistent storage
153
154Use for calculations, data processing, and testing code.
155""",
156 timeout=35.0,
157 tags=["execution"]
158)
159async def execute_python(code: str) -> dict:
160 """Execute Python code in sandbox."""
161 sandbox = DockerSandbox()
162 return sandbox.execute(code)Complete Tool System
Here's a complete tool system bringing everything together:
🐍complete_tool_system.py
1from dataclasses import dataclass, field
2from typing import Any, Callable
3from pydantic import BaseModel, Field
4import asyncio
5import json
6import logging
7
8# Configure logging
9logging.basicConfig(level=logging.INFO)
10logger = logging.getLogger(__name__)
11
12
class ToolSystem:
    """Complete tool management system.

    Bundles a ToolRegistry, a ToolExecutor (with a logging hook
    pre-installed), and an AsyncToolExecutor behind one facade.
    """

    def __init__(self):
        self.registry = ToolRegistry()
        self.executor = ToolExecutor(self.registry)
        self.async_executor = AsyncToolExecutor(self.executor)

        # Add logging hook
        self.executor.add_hook(self._log_execution)

    async def _log_execution(self, result: ExecutionResult):
        """Log every tool execution (installed as an executor hook)."""
        # Lazy %-style args: the message is only formatted if the logger
        # is actually enabled for the level.
        logger.info(
            "Tool: %s | Status: %s | Duration: %.2fms",
            result.tool_name,
            result.status.value,
            result.duration_ms,
        )
        if result.status != ExecutionStatus.SUCCESS:
            logger.error("Error: %s", result.error)

    def register(self, **kwargs):
        """Decorator to register tools (delegates to ToolRegistry.register)."""
        return self.registry.register(**kwargs)

    async def execute(
        self,
        tool_name: str,
        arguments: dict,
        context: dict | None = None
    ) -> ExecutionResult:
        """Execute a single tool."""
        return await self.executor.execute(tool_name, arguments, context)

    async def execute_calls(
        self,
        calls: list[ToolCall],
        context: dict | None = None
    ) -> dict[str, ExecutionResult]:
        """Execute multiple tool calls (parallel where safe)."""
        return await self.async_executor.execute_calls(calls, context)

    def get_tools_for_api(self, provider: str = "anthropic") -> list[dict]:
        """Get tools formatted for the given provider's API."""
        return self.registry.to_api_format(provider)

    def format_results_for_llm(
        self,
        results: dict[str, ExecutionResult]
    ) -> list[dict]:
        """Format results as tool_result content blocks.

        Fix: each block now carries "type": "tool_result" — the Anthropic
        Messages API requires the type field on tool results sent back in
        the user turn; without it the request is rejected.
        """
        return [
            {
                "type": "tool_result",
                "tool_use_id": call_id,
                "content": result.to_observation(),
            }
            for call_id, result in results.items()
        ]
73
74
75# Create system
76tools = ToolSystem()
77
78
# Register tools with Pydantic models
class ReadFileParams(BaseModel):
    path: str = Field(description="File path to read")
    encoding: str = Field(default="utf-8", description="File encoding")

@tools.register(
    name="read_file",
    description="Read the contents of a file",
    parameters=ReadFileParams,
    tags=["filesystem"]
)
async def read_file(path: str, encoding: str = "utf-8") -> str:
    """Read file contents without blocking the event loop.

    Fix: the original called aiofiles.open(), but aiofiles is never
    imported in this module (NameError at call time) and is a third-party
    dependency. asyncio.to_thread + the builtin open() gives the same
    non-blocking read using only the stdlib.
    """
    def _read() -> str:
        with open(path, encoding=encoding) as f:
            return f.read()

    return await asyncio.to_thread(_read)
94
95
class WriteFileParams(BaseModel):
    path: str = Field(description="File path to write")
    content: str = Field(description="Content to write")
    mode: str = Field(default="w", description="Write mode (w or a)")

@tools.register(
    name="write_file",
    description="Write content to a file",
    parameters=WriteFileParams,
    requires_confirmation=True,  # destructive: requires explicit approval
    tags=["filesystem"]
)
async def write_file(path: str, content: str, mode: str = "w") -> str:
    """Write to a file without blocking the event loop.

    Fix: the original called aiofiles.open(), but aiofiles is never
    imported in this module (NameError at call time) and is a third-party
    dependency; use asyncio.to_thread with the builtin open() instead.
    """
    def _write() -> None:
        with open(path, mode) as f:
            f.write(content)

    await asyncio.to_thread(_write)
    return f"Successfully wrote {len(content)} bytes to {path}"
113
114
class SearchParams(BaseModel):
    query: str = Field(description="Search query")
    max_results: int = Field(default=10, ge=1, le=50)

@tools.register(
    name="search_web",
    description="Search the web for information",
    parameters=SearchParams,
    timeout=15.0,
    tags=["search"]
)
async def search_web(query: str, max_results: int = 10) -> list[dict]:
    """Search the web.

    Stub: a production implementation would call a real search API and
    honor max_results; this returns a single canned result.
    """
    placeholder = {"title": "Result", "url": "https://...", "snippet": "..."}
    return [placeholder]
130
131
# Full agent integration
async def run_agent_step(
    messages: list[dict],
    tools_system: ToolSystem
) -> tuple[str, list[dict]]:
    """Run one step of the agent with tool handling.

    Returns:
        (text, messages): text is non-empty only when the model produced
        no tool calls; otherwise messages is extended with the assistant
        turn and the tool results, ready for the next iteration.
    """

    import anthropic
    # NOTE(review): this is the synchronous client inside an async
    # function — messages.create blocks the event loop for the whole API
    # round trip. Prefer anthropic.AsyncAnthropic in production.
    client = anthropic.Anthropic()

    # Get tools for API
    api_tools = tools_system.get_tools_for_api("anthropic")

    # Call LLM
    response = client.messages.create(
        model="claude-sonnet-4-20250514",
        max_tokens=4096,
        tools=api_tools,
        messages=messages
    )

    # Collect tool calls and text from the response content blocks
    tool_calls = []
    text_parts = []

    for block in response.content:
        if block.type == "tool_use":
            tool_calls.append(ToolCall(
                id=block.id,
                name=block.name,
                arguments=block.input
            ))
        elif block.type == "text":
            # Fix: accumulate every text block — previously only the last
            # one survived, silently dropping earlier text when the model
            # emitted multiple text blocks.
            text_parts.append(block.text)

    # If tool calls, execute them and queue results for the next turn
    if tool_calls:
        results = await tools_system.execute_calls(tool_calls)
        formatted = tools_system.format_results_for_llm(results)

        # Add to messages for next turn
        new_messages = messages + [
            {"role": "assistant", "content": response.content},
            {"role": "user", "content": formatted}
        ]

        return "", new_messages

    return "".join(text_parts), messages
181
182
183# Example usage
184async def main():
185 messages = [
186 {"role": "user", "content": "Read the file config.yaml"}
187 ]
188
189 while True:
190 response, messages = await run_agent_step(messages, tools)
191
192 if response:
193 print(f"Agent: {response}")
194 break
195
196
197if __name__ == "__main__":
198 asyncio.run(main())Production Considerations
In production, add rate limiting, cost tracking, audit logging, and metrics collection to your tool system.
Summary
Implementing tool execution:
- Tool Registry: Central management of tools with metadata
- Execution Engine: Handles validation, execution, error handling
- Async Execution: Parallel execution for multiple tool calls
- Sandboxing: Secure execution for code/dangerous operations
- Complete System: Integrates all components for agent use
- Observability: Logging, metrics, and hooks for monitoring
Next: Let's explore error handling and retry strategies for robust tool execution.