Chapter 7
25 min read
Section 43 of 175

Implementing Tool Execution

Tool Use and Function Calling

Introduction

A robust tool execution system is the foundation of any agent. It handles tool registration, validation, execution, and result formatting. Let's build a production-ready implementation.

Key Principle: Tool execution should be reliable, secure, and observable. Every tool call should be logged, validated, executed safely, and return structured results.

Building a Tool Registry

The tool registry manages all available tools and their metadata:

🐍tool_registry.py
1from dataclasses import dataclass, field
2from typing import Callable, Any
3from pydantic import BaseModel
4import json
5
@dataclass
class Tool:
    """A tool that can be executed by the agent."""

    # Identifier the LLM uses when requesting this tool.
    name: str
    # Natural-language description shown to the model.
    description: str
    # JSON Schema describing the accepted arguments.
    parameters: dict
    # Callable invoked when the tool runs.
    function: Callable[..., Any]
    # When True, execution requires explicit user confirmation.
    requires_confirmation: bool = False
    # Hard wall-clock limit for a single invocation.
    timeout_seconds: float = 30.0
    # Free-form labels used for filtering (e.g. "filesystem").
    tags: list[str] = field(default_factory=list)

    def to_api_format(self, provider: str = "anthropic") -> dict:
        """Serialize this tool into the schema expected by *provider*.

        Supports "anthropic" and "openai"; raises ValueError for anything else.
        """
        if provider == "anthropic":
            return {
                "name": self.name,
                "description": self.description,
                "input_schema": self.parameters,
            }
        if provider == "openai":
            return {
                "type": "function",
                "function": {
                    "name": self.name,
                    "description": self.description,
                    "parameters": self.parameters,
                },
            }
        raise ValueError(f"Unknown provider: {provider}")
36
37
class ToolRegistry:
    """Registry for managing agent tools."""

    def __init__(self):
        # name -> Tool metadata/callable
        self._tools: dict[str, Tool] = {}
        # name -> optional Pydantic model used for argument validation
        self._validators: dict[str, type[BaseModel]] = {}

    def register(
        self,
        name: str | None = None,
        description: str | None = None,
        parameters: dict | type[BaseModel] | None = None,
        requires_confirmation: bool = False,
        timeout: float = 30.0,
        tags: list[str] | None = None
    ):
        """Decorator to register a function as a tool.

        *parameters* may be a raw JSON Schema dict or a Pydantic model
        class; a model is also retained so arguments can be validated later.
        Name and description default to the function's name and docstring.
        """
        def decorator(func: Callable) -> Callable:
            resolved_name = name or func.__name__
            resolved_desc = (description or func.__doc__ or "").strip()

            if isinstance(parameters, type) and issubclass(parameters, BaseModel):
                # Derive a JSON Schema from the Pydantic model and keep the
                # model itself around for later validation.
                model_schema = parameters.model_json_schema()
                self._validators[resolved_name] = parameters
                schema = {
                    "type": "object",
                    "properties": model_schema.get("properties", {}),
                    "required": model_schema.get("required", []),
                }
            else:
                # Raw schema dict, or a permissive empty object schema.
                schema = parameters or {"type": "object", "properties": {}}

            self._tools[resolved_name] = Tool(
                name=resolved_name,
                description=resolved_desc,
                parameters=schema,
                function=func,
                requires_confirmation=requires_confirmation,
                timeout_seconds=timeout,
                tags=tags or [],
            )
            return func

        return decorator

    def get(self, name: str) -> Tool | None:
        """Look up a registered tool; None if unknown."""
        return self._tools.get(name)

    def list_tools(self, tag: str | None = None) -> list[Tool]:
        """Return all tools, restricted to *tag* when one is given."""
        if tag:
            return [t for t in self._tools.values() if tag in t.tags]
        return list(self._tools.values())

    def get_validator(self, name: str) -> type[BaseModel] | None:
        """Return the Pydantic validator registered for *name*, if any."""
        return self._validators.get(name)

    def to_api_format(self, provider: str = "anthropic") -> list[dict]:
        """Serialize every registered tool for the given provider."""
        return [tool.to_api_format(provider) for tool in self._tools.values()]
107
108
# Usage example
registry = ToolRegistry()

@registry.register(
    description="Read the contents of a file",
    parameters={
        "type": "object",
        "properties": {
            "path": {
                "type": "string",
                "description": "Path to the file"
            },
            "encoding": {
                "type": "string",
                "default": "utf-8"
            }
        },
        "required": ["path"]
    },
    tags=["filesystem"]
)
async def read_file(path: str, encoding: str = "utf-8") -> str:
    """Read file contents without blocking the event loop.

    BUGFIX: the original called blocking ``open()``/``read()`` directly
    inside a coroutine, stalling every other task while the disk was busy.
    The blocking work now runs in a worker thread via ``asyncio.to_thread``.
    """
    import asyncio  # local import: this module has no top-level asyncio import

    def _read() -> str:
        with open(path, encoding=encoding) as f:
            return f.read()

    return await asyncio.to_thread(_read)

The Execution Engine

The execution engine handles the actual invocation of tools:

🐍execution_engine.py
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Callable
import asyncio
import json
import time
import traceback
7
class ExecutionStatus(Enum):
    """Outcome categories for a single tool invocation."""

    SUCCESS = "success"
    ERROR = "error"
    TIMEOUT = "timeout"
    VALIDATION_ERROR = "validation_error"
    PERMISSION_DENIED = "permission_denied"
14
@dataclass
class ExecutionResult:
    """Result of tool execution.

    NOTE(review): this module's header must import ``field`` and ``json``
    (the original listing imported only ``dataclass``).
    """
    status: ExecutionStatus
    result: Any | None = None   # tool return value on success
    error: str | None = None    # human-readable failure description
    duration_ms: float = 0.0    # wall-clock execution time
    tool_name: str = ""
    parameters: dict = field(default_factory=dict)

    def to_observation(self) -> str:
        """Format this result as an observation string for the LLM."""
        if self.status is not ExecutionStatus.SUCCESS:
            return f"Error ({self.status.value}): {self.error}"
        if isinstance(self.result, str):
            return self.result
        # default=str keeps the observation usable even when the tool
        # returned something json cannot serialize natively.
        return json.dumps(self.result, indent=2, default=str)
33
34
class ToolExecutor:
    """Execute tools with validation, timeout and error handling.

    ``execute`` never raises for tool failures — every outcome is folded
    into a structured ExecutionResult.
    """

    def __init__(self, registry: ToolRegistry):
        self.registry = registry
        # Hooks receive the ExecutionResult after every call; both sync
        # and async hooks are supported.
        self.execution_hooks: list[Callable] = []

    def add_hook(self, hook: Callable):
        """Add execution hook for logging/monitoring (sync or async)."""
        self.execution_hooks.append(hook)

    async def execute(
        self,
        tool_name: str,
        parameters: dict,
        context: dict | None = None
    ) -> ExecutionResult:
        """Execute a tool with full error handling.

        Pipeline: lookup -> validate -> confirmation check -> timed
        invocation -> hooks. Returns an ExecutionResult in all cases.
        """
        # monotonic() is immune to wall-clock adjustments, so durations
        # can never come out negative.
        start_time = time.monotonic()

        tool = self.registry.get(tool_name)
        if not tool:
            return ExecutionResult(
                status=ExecutionStatus.ERROR,
                error=f"Unknown tool: {tool_name}",
                tool_name=tool_name,
                parameters=parameters
            )

        validation_result = self._validate(tool_name, parameters)
        if validation_result:
            return validation_result

        # Tools flagged as dangerous need an explicit confirmation token
        # in the execution context.
        if tool.requires_confirmation:
            if not context or not context.get("confirmed"):
                return ExecutionResult(
                    status=ExecutionStatus.PERMISSION_DENIED,
                    error=f"Tool '{tool_name}' requires confirmation",
                    tool_name=tool_name,
                    parameters=parameters
                )

        try:
            result = await asyncio.wait_for(
                self._invoke(tool, parameters),
                timeout=tool.timeout_seconds
            )
            execution_result = ExecutionResult(
                status=ExecutionStatus.SUCCESS,
                result=result,
                duration_ms=self._elapsed_ms(start_time),
                tool_name=tool_name,
                parameters=parameters
            )
        except asyncio.TimeoutError:
            execution_result = ExecutionResult(
                status=ExecutionStatus.TIMEOUT,
                error=f"Tool execution timed out after {tool.timeout_seconds}s",
                duration_ms=self._elapsed_ms(start_time),
                tool_name=tool_name,
                parameters=parameters
            )
        except Exception as e:
            execution_result = ExecutionResult(
                status=ExecutionStatus.ERROR,
                error=f"{type(e).__name__}: {str(e)}",
                duration_ms=self._elapsed_ms(start_time),
                tool_name=tool_name,
                parameters=parameters
            )

        # Hook failures are deliberately swallowed so observability code
        # can never break tool execution. BUGFIX: the original awaited
        # every hook unconditionally, crashing on plain (sync) hooks.
        for hook in self.execution_hooks:
            try:
                outcome = hook(execution_result)
                if asyncio.iscoroutine(outcome):
                    await outcome
            except Exception:
                pass

        return execution_result

    @staticmethod
    def _elapsed_ms(start_time: float) -> float:
        """Milliseconds elapsed since *start_time* (a monotonic stamp)."""
        return (time.monotonic() - start_time) * 1000

    def _validate(self, tool_name: str, params: dict) -> ExecutionResult | None:
        """Validate parameters; return an error result if invalid, else None."""
        validator = self.registry.get_validator(tool_name)
        if validator:
            try:
                validator(**params)
            except Exception as e:
                return ExecutionResult(
                    status=ExecutionStatus.VALIDATION_ERROR,
                    error=str(e),
                    tool_name=tool_name,
                    parameters=params
                )
        return None

    async def _invoke(self, tool: Tool, params: dict) -> Any:
        """Invoke the tool function, off-loading sync tools to a thread."""
        if asyncio.iscoroutinefunction(tool.function):
            return await tool.function(**params)
        # asyncio.to_thread replaces the deprecated
        # get_event_loop()/run_in_executor pattern.
        return await asyncio.to_thread(tool.function, **params)

Async Tool Execution

Handle parallel tool calls efficiently:

🐍async_execution.py
1from dataclasses import dataclass
2import asyncio
3
@dataclass
class ToolCall:
    """A tool call from the LLM."""

    id: str          # provider-assigned call id, echoed back with the result
    name: str        # registered tool name
    arguments: dict  # parsed arguments supplied by the model
10
class AsyncToolExecutor:
    """Execute multiple tools concurrently."""

    def __init__(self, executor: ToolExecutor, max_concurrent: int = 10):
        """*max_concurrent* caps how many tool calls run at once.

        Generalized from a hard-coded limit of 10; the default preserves
        the original behavior.
        """
        self.executor = executor
        self.max_concurrent = max_concurrent
        self._semaphore = asyncio.Semaphore(self.max_concurrent)

    async def execute_calls(
        self,
        calls: list[ToolCall],
        context: dict | None = None
    ) -> dict[str, ExecutionResult]:
        """Execute multiple tool calls, possibly in parallel.

        Calls whose tool requires confirmation run one at a time (so each
        can be vetted); everything else fans out concurrently. Returns a
        mapping of call id -> ExecutionResult.
        """
        needs_confirmation: list[ToolCall] = []
        can_execute: list[ToolCall] = []

        for call in calls:
            tool = self.executor.registry.get(call.name)
            if tool and tool.requires_confirmation:
                needs_confirmation.append(call)
            else:
                can_execute.append(call)

        results: dict[str, ExecutionResult] = {}

        # Safe tools: run concurrently under the semaphore.
        if can_execute:
            results.update(await self._execute_parallel(can_execute, context))

        # Confirmation-required tools: strictly sequential.
        for call in needs_confirmation:
            results[call.id] = await self.executor.execute(
                call.name, call.arguments, context
            )

        return results

    async def _execute_parallel(
        self,
        calls: list[ToolCall],
        context: dict | None
    ) -> dict[str, ExecutionResult]:
        """Execute calls in parallel, bounded by the semaphore."""

        async def run_one(call: ToolCall):
            async with self._semaphore:
                result = await self.executor.execute(
                    call.name, call.arguments, context
                )
                return (call.id, result)

        return dict(await asyncio.gather(*(run_one(c) for c in calls)))
77
78
# Example: Streaming execution with progress
class StreamingToolExecutor:
    """Execute tools sequentially with streaming progress updates."""

    def __init__(self, executor: ToolExecutor):
        self.executor = executor

    async def execute_with_streaming(
        self,
        calls: list[ToolCall],
        on_start: "Callable[[ToolCall], None] | None" = None,
        on_complete: "Callable[[ToolCall, ExecutionResult], None] | None" = None,
    ) -> dict[str, ExecutionResult]:
        """Execute *calls* one by one, firing progress callbacks.

        BUGFIX: ``Callable`` is never imported in this module, so the
        callback annotations are string literals — evaluating them eagerly
        raised NameError at class-definition time in the original.
        """
        results: dict[str, ExecutionResult] = {}

        for call in calls:
            if on_start:
                on_start(call)

            result = await self.executor.execute(call.name, call.arguments)
            results[call.id] = result

            if on_complete:
                on_complete(call, result)

        return results
110
111
# Usage
async def main():
    """Demonstrate fanning three tool calls out in parallel."""
    registry = ToolRegistry()
    executor = ToolExecutor(registry)
    async_executor = AsyncToolExecutor(executor)

    pending = [
        ToolCall(id=str(i), name="read_file", arguments={"path": p})
        for i, p in enumerate(("a.py", "b.py", "c.py"), start=1)
    ]

    # All three run concurrently (none requires confirmation).
    results = await async_executor.execute_calls(pending)

    for call_id, result in results.items():
        print(f"Call {call_id}: {result.status}")

Sandboxed Execution

For code execution tools, sandboxing is essential:

🐍sandboxed_execution.py
1import subprocess
2import tempfile
3import os
4import resource
5from dataclasses import dataclass
6from typing import Any
7
8@dataclass
9class SandboxConfig:
10    """Configuration for sandboxed execution."""
11    timeout_seconds: float = 30.0
12    max_memory_mb: int = 256
13    max_output_bytes: int = 100_000
14    allow_network: bool = False
15    working_dir: str | None = None
16
class PythonSandbox:
    """Sandboxed Python code execution via a subprocess.

    Enforces a timeout, output truncation and (on POSIX) the configured
    memory ceiling. This is NOT a strong security boundary — use
    DockerSandbox when real isolation is required.
    """

    def __init__(self, config: "SandboxConfig | None" = None):
        # String annotation: evaluated lazily, so any object exposing the
        # SandboxConfig attributes is accepted.
        self.config = config or SandboxConfig()

    def _limit_resources(self) -> None:
        """preexec_fn for the child: cap its address space (POSIX only).

        BUGFIX: the module imports ``resource`` but the original never
        enforced ``max_memory_mb``.
        """
        limit = self.config.max_memory_mb * 1024 * 1024
        resource.setrlimit(resource.RLIMIT_AS, (limit, limit))

    def execute(self, code: str) -> dict[str, Any]:
        """Execute Python code in the sandbox.

        Returns a dict with success/stdout/stderr/return_code; all
        failure modes (timeout, spawn errors) are reported in-band.
        """
        import sys  # local import: module header does not import sys

        # Write the code to a temp script; delete=False because the child
        # process must be able to open it after this context closes.
        with tempfile.NamedTemporaryFile(
            mode='w',
            suffix='.py',
            delete=False
        ) as f:
            f.write(code)
            script_path = f.name

        try:
            # sys.executable is more reliable than a bare "python" on PATH.
            cmd = [sys.executable, script_path]

            env = os.environ.copy()
            if not self.config.allow_network:
                # NOTE(review): clearing proxy vars only affects
                # proxy-aware clients; it does NOT actually block network
                # access. Real isolation needs namespaces/containers —
                # see DockerSandbox.
                env['HTTP_PROXY'] = ''
                env['HTTPS_PROXY'] = ''

            result = subprocess.run(
                cmd,
                capture_output=True,
                text=True,
                timeout=self.config.timeout_seconds,
                cwd=self.config.working_dir,
                env=env,
                # Apply the memory cap where rlimits exist (POSIX).
                preexec_fn=self._limit_resources if hasattr(resource, "setrlimit") else None,
            )

            # Truncate output so a chatty script can't blow up the context.
            return {
                "success": result.returncode == 0,
                "stdout": result.stdout[:self.config.max_output_bytes],
                "stderr": result.stderr[:self.config.max_output_bytes],
                "return_code": result.returncode
            }

        except subprocess.TimeoutExpired:
            return {
                "success": False,
                "stdout": "",
                "stderr": f"Execution timed out after {self.config.timeout_seconds}s",
                "return_code": -1
            }
        except Exception as e:
            return {
                "success": False,
                "stdout": "",
                "stderr": str(e),
                "return_code": -1
            }
        finally:
            # Always remove the temp script, even on failure.
            os.unlink(script_path)
84
85
class DockerSandbox:
    """Docker-based sandbox for stronger isolation."""

    def __init__(
        self,
        image: str = "python:3.11-slim",
        config: "SandboxConfig | None" = None
    ):
        self.image = image
        self.config = config or SandboxConfig()

    def execute(self, code: str) -> dict[str, Any]:
        """Execute code in a Docker container.

        Returns the same result-dict shape as PythonSandbox.execute; all
        failure modes are reported in-band rather than raised.
        """
        # Build Docker command with restrictions
        docker_cmd = [
            "docker", "run",
            "--rm",                                    # Remove after execution
            "--network", "none" if not self.config.allow_network else "bridge",
            "--memory", f"{self.config.max_memory_mb}m",
            "--cpus", "1",
            "--read-only",                             # Read-only filesystem
            "--tmpfs", "/tmp:size=100m",              # Writable /tmp
            self.image,
            "python", "-c", code
        ]

        try:
            result = subprocess.run(
                docker_cmd,
                capture_output=True,
                text=True,
                timeout=self.config.timeout_seconds + 5  # Extra for container overhead
            )

            return {
                "success": result.returncode == 0,
                "stdout": result.stdout[:self.config.max_output_bytes],
                "stderr": result.stderr[:self.config.max_output_bytes],
                "return_code": result.returncode
            }

        except subprocess.TimeoutExpired:
            return {
                "success": False,
                "stdout": "",
                "stderr": "Container execution timed out",
                "return_code": -1
            }
        except Exception as e:
            # BUGFIX (parity with PythonSandbox): e.g. FileNotFoundError
            # when the docker binary is absent must not crash the agent.
            return {
                "success": False,
                "stdout": "",
                "stderr": str(e),
                "return_code": -1
            }
135
136
# Register sandbox as a tool
@registry.register(
    name="execute_python",
    description="""
Execute Python code in a secure sandbox.

CAPABILITIES:
- Standard library access
- Basic computation and data processing
- File I/O in /tmp only

LIMITATIONS:
- No network access
- 30 second timeout
- 256MB memory limit
- No persistent storage

Use for calculations, data processing, and testing code.
""",
    timeout=35.0,
    tags=["execution"]
)
async def execute_python(code: str) -> dict:
    """Execute Python code in the Docker sandbox.

    BUGFIX: ``DockerSandbox.execute`` blocks on ``subprocess.run``; running
    it directly inside a coroutine froze the event loop for up to the full
    timeout. The call is now off-loaded to a worker thread.
    """
    import asyncio  # local import: this module has no top-level asyncio import

    sandbox = DockerSandbox()
    return await asyncio.to_thread(sandbox.execute, code)

Complete Tool System

Here's a complete tool system bringing everything together:

🐍complete_tool_system.py
1from dataclasses import dataclass, field
2from typing import Any, Callable
3from pydantic import BaseModel, Field
4import asyncio
5import json
6import logging
7
8# Configure logging
9logging.basicConfig(level=logging.INFO)
10logger = logging.getLogger(__name__)
11
12
13class ToolSystem:
14    """Complete tool management system."""
15
16    def __init__(self):
17        self.registry = ToolRegistry()
18        self.executor = ToolExecutor(self.registry)
19        self.async_executor = AsyncToolExecutor(self.executor)
20
21        # Add logging hook
22        self.executor.add_hook(self._log_execution)
23
24    async def _log_execution(self, result: ExecutionResult):
25        """Log tool executions."""
26        logger.info(
27            f"Tool: {result.tool_name} | "
28            f"Status: {result.status.value} | "
29            f"Duration: {result.duration_ms:.2f}ms"
30        )
31        if result.status != ExecutionStatus.SUCCESS:
32            logger.error(f"Error: {result.error}")
33
34    def register(self, **kwargs):
35        """Decorator to register tools."""
36        return self.registry.register(**kwargs)
37
38    async def execute(
39        self,
40        tool_name: str,
41        arguments: dict,
42        context: dict | None = None
43    ) -> ExecutionResult:
44        """Execute a single tool."""
45        return await self.executor.execute(tool_name, arguments, context)
46
47    async def execute_calls(
48        self,
49        calls: list[ToolCall],
50        context: dict | None = None
51    ) -> dict[str, ExecutionResult]:
52        """Execute multiple tool calls."""
53        return await self.async_executor.execute_calls(calls, context)
54
55    def get_tools_for_api(self, provider: str = "anthropic") -> list[dict]:
56        """Get tools formatted for API."""
57        return self.registry.to_api_format(provider)
58
59    def format_results_for_llm(
60        self,
61        results: dict[str, ExecutionResult]
62    ) -> list[dict]:
63        """Format results as tool_result messages."""
64        formatted = []
65
66        for call_id, result in results.items():
67            formatted.append({
68                "tool_use_id": call_id,
69                "content": result.to_observation()
70            })
71
72        return formatted
73
74
# Create the shared tool system used by the example tools below
tools = ToolSystem()
77
78
# Register tools with Pydantic models
class ReadFileParams(BaseModel):
    path: str = Field(description="File path to read")
    encoding: str = Field(default="utf-8", description="File encoding")

@tools.register(
    name="read_file",
    description="Read the contents of a file",
    parameters=ReadFileParams,
    tags=["filesystem"]
)
async def read_file(path: str, encoding: str = "utf-8") -> str:
    """Read file contents.

    BUGFIX: the original used ``aiofiles``, which is never imported in
    this module (NameError at call time). The stdlib ``asyncio.to_thread``
    off-loads the blocking read instead, with no extra dependency.
    """
    def _read() -> str:
        with open(path, encoding=encoding) as f:
            return f.read()

    return await asyncio.to_thread(_read)
94
95
class WriteFileParams(BaseModel):
    path: str = Field(description="File path to write")
    content: str = Field(description="Content to write")
    mode: str = Field(default="w", description="Write mode (w or a)")

@tools.register(
    name="write_file",
    description="Write content to a file",
    parameters=WriteFileParams,
    requires_confirmation=True,  # destructive: needs explicit user approval
    tags=["filesystem"]
)
async def write_file(path: str, content: str, mode: str = "w") -> str:
    """Write to file.

    BUGFIX: replaces the unimported ``aiofiles`` dependency with a
    blocking write off-loaded via stdlib ``asyncio.to_thread``.
    """
    def _write() -> None:
        with open(path, mode) as f:
            f.write(content)

    await asyncio.to_thread(_write)
    return f"Successfully wrote {len(content)} bytes to {path}"
113
114
class SearchParams(BaseModel):
    query: str = Field(description="Search query")
    max_results: int = Field(default=10, ge=1, le=50)

@tools.register(
    name="search_web",
    description="Search the web for information",
    parameters=SearchParams,
    timeout=15.0,
    tags=["search"]
)
async def search_web(query: str, max_results: int = 10) -> list[dict]:
    """Search the web.

    Placeholder: a real implementation would call a search API and map
    its hits into {title, url, snippet} dicts.
    """
    # Implementation would use actual search API
    return [{"title": "Result", "url": "https://...", "snippet": "..."}]
130
131
# Full agent integration
async def run_agent_step(
    messages: list[dict],
    tools_system: ToolSystem
) -> tuple[str, list[dict]]:
    """Run one step of the agent with tool handling.

    Returns (final_text, messages). final_text is non-empty only when the
    model produced a plain answer; otherwise the tool results are appended
    to the conversation for the next turn and final_text is "".
    """
    import anthropic
    client = anthropic.Anthropic()

    # Get tools for API
    api_tools = tools_system.get_tools_for_api("anthropic")

    # BUGFIX: the Anthropic SDK client here is synchronous; calling it
    # directly in a coroutine blocks the event loop for the whole request.
    response = await asyncio.to_thread(
        lambda: client.messages.create(
            model="claude-sonnet-4-20250514",
            max_tokens=4096,
            tools=api_tools,
            messages=messages
        )
    )

    tool_calls: list[ToolCall] = []
    text_parts: list[str] = []

    for block in response.content:
        if block.type == "tool_use":
            tool_calls.append(ToolCall(
                id=block.id,
                name=block.name,
                arguments=block.input
            ))
        elif block.type == "text":
            # BUGFIX: accumulate every text block; the original kept only
            # the last one.
            text_parts.append(block.text)

    # If the model requested tools, execute them and extend the transcript.
    if tool_calls:
        results = await tools_system.execute_calls(tool_calls)
        formatted = tools_system.format_results_for_llm(results)

        new_messages = messages + [
            {"role": "assistant", "content": response.content},
            {"role": "user", "content": formatted}
        ]
        return "", new_messages

    return "".join(text_parts), messages
181
182
# Example usage
async def main():
    """Drive the agent until it produces a text answer."""
    messages = [
        {"role": "user", "content": "Read the file config.yaml"}
    ]

    # BUGFIX: bound the loop — a model that keeps requesting tools would
    # otherwise spin (and bill) forever.
    for _ in range(10):
        response, messages = await run_agent_step(messages, tools)

        if response:
            print(f"Agent: {response}")
            break


if __name__ == "__main__":
    asyncio.run(main())

Production Considerations

In production, add rate limiting, cost tracking, audit logging, and metrics collection to your tool system.

Summary

Implementing tool execution:

  1. Tool Registry: Central management of tools with metadata
  2. Execution Engine: Handles validation, execution, error handling
  3. Async Execution: Parallel execution for multiple tool calls
  4. Sandboxing: Secure execution for code/dangerous operations
  5. Complete System: Integrates all components for agent use
  6. Observability: Logging, metrics, and hooks for monitoring

Next: Let's explore error handling and retry strategies for robust tool execution.