Introduction
At the heart of every agent is a loop: receive input, think, act, observe, repeat. This core loop is deceptively simple in concept but requires careful implementation to be robust, efficient, and maintainable.
In this section, we'll build the agent core loop from scratch, understanding each component and the design decisions that make it production-ready.
Core Insight: The agent loop is like a heartbeat — it must be reliable, consistent, and handle exceptions gracefully. A well-designed loop makes everything else easier.
Anatomy of the Agent Loop
The agent loop consists of four key phases that repeat until the task is complete:
┌──────────────────────────────────────────────────────────────┐
│                         AGENT LOOP                           │
│                                                              │
│  ┌─────────┐    ┌─────────┐    ┌─────────┐    ┌─────────┐    │
│  │  THINK  │───▶│   ACT   │───▶│ OBSERVE │───▶│ UPDATE  │    │
│  │         │    │         │    │         │    │         │    │
│  │ Analyze │    │ Execute │    │ Process │    │ State   │    │
│  │ Decide  │    │ Tool    │    │ Results │    │ Memory  │    │
│  └─────────┘    └─────────┘    └─────────┘    └─────────┘    │
│       ▲                                            │         │
│       └────────────────────────────────────────────┘         │
│                   (Continue until done)                      │
└──────────────────────────────────────────────────────────────┘

Loop Phases
| Phase | Purpose | Key Activities |
|---|---|---|
| Think | Analyze situation, decide action | Process context, reason about next step |
| Act | Execute the chosen action | Call tool, make API request, compute |
| Observe | Process action results | Parse response, handle errors |
| Update | Update state for next iteration | Add to history, update memory |
Basic Loop Structure
1from dataclasses import dataclass, field
2from typing import Any, Optional
3from enum import Enum
4
class LoopStatus(Enum):
    """Status of an agent loop run."""

    RUNNING = "running"              # loop is still iterating
    COMPLETED = "completed"          # agent produced a final answer
    FAILED = "failed"                # an unhandled error stopped the loop
    MAX_STEPS = "max_steps_reached"  # step budget exhausted before completion
@dataclass
class LoopState:
    """Tracks the current state of the agent loop."""

    step: int = 0                    # count of iterations started so far
    status: LoopStatus = LoopStatus.RUNNING
    # One entry per completed think/act/observe cycle.
    history: list[dict] = field(default_factory=list)
    final_result: Optional[str] = None  # answer text once status is COMPLETED
    error: Optional[str] = None         # error message once status is FAILED
class AgentLoop:
    """The core agent execution loop.

    Subclasses implement ``think`` (choose the next action) and ``act``
    (execute it); ``run`` drives the think/act/observe/update cycle until
    the task completes, fails, or the step budget is exhausted.
    """

    def __init__(self, max_steps: int = 20):
        # Hard ceiling on iterations so a confused agent cannot loop forever.
        self.max_steps = max_steps

    async def run(self, task: str) -> "LoopState":
        """Execute the agent loop until completion and return the final state."""
        state = LoopState()

        while state.status == LoopStatus.RUNNING:
            # Enforce the budget *before* starting a step, so the reported
            # step count never exceeds max_steps.
            if state.step >= self.max_steps:
                state.status = LoopStatus.MAX_STEPS
                break
            state.step += 1

            try:
                # THINK: ask the model for the next action.
                action = await self.think(task, state.history)

                # A final answer ends the loop successfully.
                if action.get("final_answer"):
                    state.final_result = action["final_answer"]
                    state.status = LoopStatus.COMPLETED
                    break

                # ACT: execute the chosen action.
                observation = await self.act(action)

                # OBSERVE & UPDATE: append this cycle to the history that
                # the next think() call will see.
                state.history.append({
                    "step": state.step,
                    "thought": action.get("thought"),
                    "action": action.get("action"),
                    "action_input": action.get("action_input"),
                    "observation": observation
                })

            except Exception as e:
                # Any unhandled error terminates the loop; the message is
                # preserved on the state for the caller to inspect.
                state.error = str(e)
                state.status = LoopStatus.FAILED

        return state

    async def think(self, task: str, history: list) -> dict:
        """Determine the next action. Subclasses must override."""
        raise NotImplementedError

    async def act(self, action: dict) -> str:
        """Execute an action. Subclasses must override."""
        raise NotImplementedError
Message and State Management

The agent must maintain conversation state and format messages correctly for the LLM:
Message Types
1from dataclasses import dataclass
2from typing import Literal, Union
3
@dataclass
class SystemMessage:
    """System instructions for the agent."""
    content: str
    role: Literal["system"] = "system"

@dataclass
class UserMessage:
    """User input or task."""
    content: str
    role: Literal["user"] = "user"

@dataclass
class AssistantMessage:
    """Agent's thoughts and actions."""
    content: str
    role: Literal["assistant"] = "assistant"

@dataclass
class ToolResultMessage:
    """Result from tool execution.

    Carries role "user" because tool results are sent back to the model
    in a user turn (see ConversationBuilder.add_tool_result).
    """
    tool_use_id: str  # id of the tool_use block this result answers
    content: str
    role: Literal["user"] = "user"
    # Distinguishes tool results from genuine user input.
    is_tool_result: bool = True
Message = Union[SystemMessage, UserMessage, AssistantMessage, ToolResultMessage]

Conversation Builder
class ConversationBuilder:
    """Builds and manages the conversation history sent to the LLM."""

    # Rough heuristic: English text averages ~4 characters per token.
    _CHARS_PER_TOKEN = 4

    def __init__(self, system_prompt: str):
        self.system_prompt = system_prompt
        self.messages: list[dict] = []

    def add_user_message(self, content: str) -> None:
        """Add a user message."""
        self.messages.append({
            "role": "user",
            "content": content
        })

    def add_assistant_message(self, content: str) -> None:
        """Add an assistant message."""
        self.messages.append({
            "role": "assistant",
            "content": content
        })

    def add_tool_use(self, tool_use_id: str, tool_name: str, tool_input: dict) -> None:
        """Add an assistant turn containing a tool_use content block."""
        self.messages.append({
            "role": "assistant",
            "content": [{
                "type": "tool_use",
                "id": tool_use_id,
                "name": tool_name,
                "input": tool_input
            }]
        })

    def add_tool_result(self, tool_use_id: str, result: str) -> None:
        """Add a user turn containing the tool_result for a prior tool_use."""
        self.messages.append({
            "role": "user",
            "content": [{
                "type": "tool_result",
                "tool_use_id": tool_use_id,
                "content": result
            }]
        })

    def get_messages(self) -> list[dict]:
        """Get all messages for the API call."""
        return self.messages

    def get_system(self) -> str:
        """Get the system prompt."""
        return self.system_prompt

    def clear_history(self) -> None:
        """Clear conversation history."""
        self.messages = []

    def truncate_to_token_limit(self, max_tokens: int) -> None:
        """Drop the oldest messages until the history fits ``max_tokens``.

        Uses a ~4 chars/token estimate on message content. The first
        message (the original task) and the most recent message are
        always kept. A more sophisticated strategy would summarize old
        messages instead of dropping them.
        """
        budget = max_tokens * self._CHARS_PER_TOKEN

        def size(message: dict) -> int:
            return len(str(message.get("content", "")))

        total = sum(size(m) for m in self.messages)
        # Evict from position 1 (oldest after the task) until within budget.
        while total > budget and len(self.messages) > 2:
            total -= size(self.messages.pop(1))
State Management

from dataclasses import dataclass, field
from typing import Any, Optional
from datetime import datetime
4
@dataclass
class AgentState:
    """Complete agent state for a single task run."""

    # Task context
    task: str
    started_at: datetime = field(default_factory=datetime.now)

    # Loop state
    current_step: int = 0
    max_steps: int = 20
    status: str = "running"  # "running" | "completed" | "failed" | "max_steps"

    # Conversation (created lazily in __post_init__ when not supplied).
    # String annotation: ConversationBuilder is a forward reference here.
    conversation: "Optional[ConversationBuilder]" = None

    # Results
    tool_results: dict[str, Any] = field(default_factory=dict)
    final_answer: Optional[str] = None
    error: Optional[str] = None

    # Metrics
    total_tokens_used: int = 0
    total_api_calls: int = 0

    def __post_init__(self):
        if self.conversation is None:
            self.conversation = ConversationBuilder("")

    def is_complete(self) -> bool:
        """Check if the agent has reached a terminal status."""
        return self.status in ["completed", "failed", "max_steps"]

    def add_step(self) -> bool:
        """Increment the step counter; return False once the limit is exceeded."""
        self.current_step += 1
        if self.current_step > self.max_steps:
            self.status = "max_steps"
            return False
        return True

    def complete(self, answer: str) -> None:
        """Mark the run as completed with the final answer."""
        self.final_answer = answer
        self.status = "completed"

    def fail(self, error: str) -> None:
        """Mark the run as failed with an error message."""
        self.error = error
        self.status = "failed"
LLM Integration

The LLM is the "brain" of the agent. Here's how to integrate it properly:
Anthropic Client Wrapper
1from anthropic import Anthropic
from typing import Any, Optional
3import json
4
class LLMClient:
    """Thin wrapper around the Anthropic Messages API."""

    def __init__(
        self,
        model: str = "claude-sonnet-4-20250514",
        max_tokens: int = 4096
    ):
        self.client = Anthropic()
        self.model = model
        self.max_tokens = max_tokens

    def create_completion(
        self,
        system: str,
        messages: list[dict],
        tools: Optional[list[dict]] = None
    ) -> dict:
        """Make a completion request and return a plain-dict summary.

        Args:
            system: System prompt text.
            messages: Conversation messages in API format.
            tools: Optional tool definitions; the tools parameter is
                omitted from the request entirely when not provided.
        """
        kwargs = {
            "model": self.model,
            "max_tokens": self.max_tokens,
            "system": system,
            "messages": messages
        }

        # Only include the tools parameter when there are tools to send.
        if tools:
            kwargs["tools"] = tools

        response = self.client.messages.create(**kwargs)

        return {
            "content": response.content,
            "stop_reason": response.stop_reason,
            "usage": {
                "input_tokens": response.usage.input_tokens,
                "output_tokens": response.usage.output_tokens
            }
        }

    def parse_response(self, response: dict) -> dict:
        """Parse an LLM response into {text, tool_use, stop_reason}.

        If multiple blocks of the same type are present, the last one wins.
        """
        result = {
            "text": None,
            "tool_use": None,
            "stop_reason": response["stop_reason"]
        }

        for block in response["content"]:
            if block.type == "text":
                result["text"] = block.text
            elif block.type == "tool_use":
                result["tool_use"] = {
                    "id": block.id,
                    "name": block.name,
                    "input": block.input
                }

        return result
        return result

System Prompt Design
def create_system_prompt(
    agent_name: str,
    tools: list[dict],
    instructions: str = ""
) -> str:
    """Create a well-structured system prompt.

    Args:
        agent_name: Name the agent refers to itself by.
        tools: Tool definitions; each needs "name" and "description" keys.
        instructions: Optional extra task-specific guidance.
    """
    # One bullet per tool so the model sees its capabilities at a glance.
    tool_descriptions = "\n".join(
        f"- {t['name']}: {t['description']}"
        for t in tools
    )

    return f"""You are {agent_name}, an AI assistant that can use tools to help users.

## Your Capabilities
You have access to the following tools:
{tool_descriptions}

## How to Respond

When you need to use a tool, use the tool_use format.
When you have enough information to answer, provide your final response.

## Guidelines
1. Think step by step before acting
2. Use tools when needed, don't guess at information you can look up
3. If a tool fails, try an alternative approach
4. Be concise but thorough in your final answers

{instructions}

Remember: You are helpful, accurate, and efficient."""
Remember: You are helpful, accurate, and efficient."""

Handling Tool Use
class ToolUseHandler:
    """Handle tool use blocks in parsed LLM responses."""

    def __init__(self, tools: dict[str, callable]):
        # Map of tool name -> async callable.
        self.tools = tools

    async def process_response(
        self,
        response: dict,
        conversation: "ConversationBuilder"
    ) -> tuple[bool, Optional[str]]:
        """Process a parsed LLM response.

        Executes at most one tool call and records the exchange on the
        conversation so the next LLM call sees it.

        Returns:
            (is_complete, final_answer)
        """
        parsed = response

        # Tool use takes priority: execute it and keep the loop going.
        if parsed.get("tool_use"):
            tool_call = parsed["tool_use"]

            result = await self.execute_tool(
                tool_call["name"],
                tool_call["input"]
            )

            # Record both the request and its result.
            conversation.add_tool_use(
                tool_call["id"],
                tool_call["name"],
                tool_call["input"]
            )
            conversation.add_tool_result(tool_call["id"], result)

            return False, None

        # No tool use: a normal end of turn with text is the final answer.
        if parsed.get("text") and response["stop_reason"] == "end_turn":
            return True, parsed["text"]

        return False, None

    async def execute_tool(self, name: str, input_data: dict) -> str:
        """Execute a tool by name; errors are returned as strings, not raised."""
        if name not in self.tools:
            return f"Error: Tool '{name}' not found"

        try:
            tool_fn = self.tools[name]
            result = await tool_fn(**input_data)
            return str(result)
        except Exception as e:
            return f"Error executing {name}: {str(e)}"
Action Parsing and Dispatch

When using the native tool use API, parsing is handled automatically. But understanding the flow is still important:
Action Dispatcher
1from dataclasses import dataclass
from typing import Any, Callable, Awaitable, Optional
3
@dataclass
class Action:
    """Parsed action from the LLM."""

    type: str  # "tool_use" or "final_answer"
    name: Optional[str] = None     # tool name when type == "tool_use"
    input: Optional[dict] = None   # tool arguments when type == "tool_use"
    answer: Optional[str] = None   # answer text when type == "final_answer"
class ActionDispatcher:
    """Dispatch parsed actions to registered tools, with middleware hooks."""

    def __init__(self):
        self.tools: dict[str, Callable[..., Awaitable[Any]]] = {}
        # Middleware objects exposing pre_dispatch/post_dispatch coroutines.
        self.middleware: list[Any] = []

    def register_tool(self, name: str, fn: Callable) -> None:
        """Register an async tool function under ``name``."""
        self.tools[name] = fn

    def add_middleware(self, fn: Any) -> None:
        """Add a middleware object for pre/post processing."""
        self.middleware.append(fn)

    async def dispatch(self, action: "Action") -> str:
        """Dispatch an action and return its result as a string."""
        if action.type == "final_answer":
            return action.answer

        if action.type == "tool_use":
            # Middleware may validate, rewrite, or rate-limit the action.
            for mw in self.middleware:
                action = await mw.pre_dispatch(action)

            if action.name not in self.tools:
                return f"Unknown tool: {action.name}"

            try:
                tool_fn = self.tools[action.name]
                result = await tool_fn(**(action.input or {}))

                # Middleware may transform the raw result.
                for mw in self.middleware:
                    result = await mw.post_dispatch(action, result)

                return str(result)

            except Exception as e:
                return f"Tool error: {str(e)}"

        return f"Unknown action type: {action.type}"
        return f"Unknown action type: {action.type}"

Validation Middleware
class ValidationMiddleware:
    """Validate tool actions against their schemas before execution."""

    def __init__(self, tool_schemas: dict[str, dict]):
        # Map of tool name -> JSON-schema-like dict with a "required" list.
        self.schemas = tool_schemas

    async def pre_dispatch(self, action: "Action") -> "Action":
        """Validate an action before dispatch.

        Raises:
            ValueError: if a required parameter is missing.
        """
        if action.type != "tool_use":
            return action

        schema = self.schemas.get(action.name)
        if not schema:
            # Unknown tools are passed through; the dispatcher reports them.
            return action

        # Check that every required parameter is present.
        required = schema.get("required", [])
        for param in required:
            if param not in (action.input or {}):
                raise ValueError(f"Missing required parameter: {param}")

        # Type validation could go here
        # ...

        return action

    async def post_dispatch(self, action: "Action", result: Any) -> Any:
        """Pass results through unchanged."""
        return result
class RateLimitMiddleware:
    """Rate limit tool calls to a fixed number per sliding minute."""

    def __init__(self, calls_per_minute: int = 60):
        self.calls_per_minute = calls_per_minute
        # Timestamps of calls made within the last minute.
        self.call_times: list[float] = []

    async def pre_dispatch(self, action: "Action") -> "Action":
        """Check the rate limit; raise if the per-minute budget is spent."""
        import time  # local import keeps the listing self-contained

        now = time.time()
        # Keep only calls from the sliding one-minute window.
        self.call_times = [t for t in self.call_times if now - t < 60]

        if len(self.call_times) >= self.calls_per_minute:
            wait_time = 60 - (now - self.call_times[0])
            raise Exception(f"Rate limited. Wait {wait_time:.1f}s")

        self.call_times.append(now)
        return action

    async def post_dispatch(self, action: "Action", result: Any) -> Any:
        """Pass results through unchanged."""
        return result
Complete Implementation

Here's the complete agent loop implementation:
1from anthropic import Anthropic
2from dataclasses import dataclass, field
3from typing import Any, Callable, Optional
4from datetime import datetime
5import asyncio
6
@dataclass
class AgentConfig:
    """Configuration for the agent."""

    name: str = "Assistant"                  # used in the system prompt
    model: str = "claude-sonnet-4-20250514"  # Anthropic model id
    max_steps: int = 20                      # loop iteration budget
    max_tokens: int = 4096                   # per-response token cap
    system_instructions: str = ""            # extra task-specific guidance
class Agent:
    """
    Complete agent implementation with the core loop.

    Usage: construct, register tools, then ``await agent.run(task)``.
    """

    def __init__(
        self,
        config: "Optional[AgentConfig]" = None,
        tools: Optional[list[dict]] = None
    ):
        self.config = config or AgentConfig()
        self.client = Anthropic()
        self.tools_list = tools or []         # API-format tool definitions
        self.tools: dict[str, Callable] = {}  # tool name -> implementation

    def register_tool(self, name: str, fn: Callable, schema: dict) -> None:
        """Register a tool implementation together with its API schema."""
        self.tools[name] = fn
        self.tools_list.append({
            "name": name,
            **schema
        })

    async def run(self, task: str) -> dict:
        """
        Run the agent on a task.

        Returns:
            dict with result, steps, and metadata
        """
        # Initialize state
        state = AgentState(
            task=task,
            max_steps=self.config.max_steps
        )

        # Build system prompt
        system = create_system_prompt(
            self.config.name,
            self.tools_list,
            self.config.system_instructions
        )

        # Initialize conversation
        state.conversation = ConversationBuilder(system)
        state.conversation.add_user_message(task)

        # Main loop: one LLM call per step until a terminal status.
        while not state.is_complete():
            if not state.add_step():
                break

            try:
                # Only pass the tools parameter when tools exist; the API
                # does not accept an explicit tools=None.
                kwargs = {
                    "model": self.config.model,
                    "max_tokens": self.config.max_tokens,
                    "system": system,
                    "messages": state.conversation.get_messages(),
                }
                if self.tools_list:
                    kwargs["tools"] = self.tools_list

                response = self.client.messages.create(**kwargs)

                # Track usage for the final report.
                state.total_tokens_used += (
                    response.usage.input_tokens +
                    response.usage.output_tokens
                )
                state.total_api_calls += 1

                # Process response (tool call or final answer).
                await self._process_response(response, state)

            except Exception as e:
                state.fail(str(e))

        return self._create_result(state)

    async def _process_response(
        self,
        response,
        state: "AgentState"
    ) -> None:
        """Execute a tool use, or capture the final answer, from a response."""
        tool_use_block = None
        text_block = None

        for block in response.content:
            if block.type == "tool_use":
                tool_use_block = block
            elif block.type == "text":
                text_block = block

        if tool_use_block:
            # Execute tool
            result = await self._execute_tool(
                tool_use_block.name,
                tool_use_block.input
            )

            # Record the request/result pair so the next call sees it.
            state.conversation.add_tool_use(
                tool_use_block.id,
                tool_use_block.name,
                tool_use_block.input
            )
            state.conversation.add_tool_result(
                tool_use_block.id,
                result
            )

            # Store result
            state.tool_results[tool_use_block.id] = result

        elif response.stop_reason == "end_turn" and text_block:
            # No tool requested and the turn ended: that's the final answer.
            state.complete(text_block.text)

    async def _execute_tool(self, name: str, input_data: dict) -> str:
        """Execute a registered tool (sync or async); errors become strings."""
        if name not in self.tools:
            return f"Error: Tool '{name}' not found"

        try:
            fn = self.tools[name]
            # Support both async and plain functions.
            if asyncio.iscoroutinefunction(fn):
                result = await fn(**input_data)
            else:
                result = fn(**input_data)
            return str(result)
        except Exception as e:
            return f"Error: {str(e)}"

    def _create_result(self, state: "AgentState") -> dict:
        """Summarize a finished run as a plain dict."""
        return {
            "success": state.status == "completed",
            "answer": state.final_answer,
            "error": state.error,
            "steps": state.current_step,
            "status": state.status,
            "tool_results": state.tool_results,
            "tokens_used": state.total_tokens_used,
            "api_calls": state.total_api_calls,
            "duration_seconds": (
                datetime.now() - state.started_at
            ).total_seconds()
        }
Usage Example

import asyncio
2
# Define tools
def calculate(expression: str) -> float:
    """Safely evaluate a math expression."""
    # In production, use a proper math parser: eval() on untrusted input
    # is dangerous even behind a character allowlist.
    allowed = set("0123456789+-*/(). ")
    if all(c in allowed for c in expression):
        return eval(expression)
    raise ValueError("Invalid expression")

def get_weather(city: str) -> str:
    """Get weather for a city (mock)."""
    return f"Weather in {city}: Sunny, 72°F"

# Create agent
agent = Agent(
    config=AgentConfig(
        name="Helper",
        max_steps=10
    )
)

# Register tools
agent.register_tool(
    "calculate",
    calculate,
    {
        "description": "Evaluate a mathematical expression",
        "input_schema": {
            "type": "object",
            "properties": {
                "expression": {
                    "type": "string",
                    "description": "Math expression like '2 + 2' or '(10 * 5) / 2'"
                }
            },
            "required": ["expression"]
        }
    }
)

agent.register_tool(
    "get_weather",
    get_weather,
    {
        "description": "Get current weather for a city",
        "input_schema": {
            "type": "object",
            "properties": {
                "city": {
                    "type": "string",
                    "description": "City name"
                }
            },
            "required": ["city"]
        }
    }
)

# Run
async def main():
    result = await agent.run(
        "What is 15% of 200, and what's the weather in Paris?"
    )

    print(f"Success: {result['success']}")
    print(f"Answer: {result['answer']}")
    print(f"Steps: {result['steps']}")
    print(f"Tokens: {result['tokens_used']}")

asyncio.run(main())
Summary

The agent core loop is the foundation of every agent. We covered:
- Loop anatomy: Think → Act → Observe → Update cycle
- Message management: Building and maintaining conversation state
- LLM integration: Proper API usage with tool support
- Action dispatch: Parsing responses and executing tools with middleware
- Complete implementation: A working Agent class ready for extension
In the next section, we'll focus on adding tools — giving our agent the capabilities to interact with the world.