Introduction
Understanding how nodes, edges, and state interact is fundamental to building effective LangGraph applications. This section explores advanced patterns for each of these core components and how to combine them for sophisticated workflows.
Section Overview: We'll explore state schema design, node implementation patterns, edge routing strategies, and custom state reducers for complex state management.
State Design Patterns
State design is crucial for effective LangGraph applications. The state schema defines what information flows through your graph.
Basic State Patterns
🐍python
1from typing import TypedDict, Annotated, Optional, Literal
2from langgraph.graph import MessagesState
3import operator
4
5
6# Pattern 1: Simple flat state
class SimpleState(TypedDict):
    """Flat state schema for simple, linear workflows."""
    input: str   # raw user input
    output: str  # produced result
    status: str  # coarse progress marker
12
13
14# Pattern 2: Message-based state (common for chat agents)
class ChatState(MessagesState):
    """Chat-oriented state; inherits the `messages` list from MessagesState."""
    # Extra identity fields on top of the inherited message history.
    user_id: str
    session_id: str
20
21
22# Pattern 3: Accumulating state with reducers
class AccumulatingState(TypedDict):
    """State whose findings and sources grow as nodes contribute."""
    query: str
    # operator.add as the reducer: each node's returned list is appended
    # to the existing value instead of replacing it.
    findings: Annotated[list[str], operator.add]
    sources: Annotated[list[dict], operator.add]
    final_answer: str
30
31
32# Pattern 4: Nested state for complex data
33class NestedState(TypedDict):
34 """Hierarchical state structure."""
35 config: dict
36 context: dict
37 results: dict
38 metadata: dictState Validation
🐍python
1from pydantic import BaseModel, Field, field_validator
2from typing import List, Optional
3
4
class ValidatedState(BaseModel):
    """Pydantic model mirroring the workflow state, used for validation."""

    query: str = Field(..., min_length=1, description="User query")
    max_iterations: int = Field(default=5, ge=1, le=20)
    results: List[str] = Field(default_factory=list)
    confidence: Optional[float] = Field(default=None, ge=0.0, le=1.0)
    status: str = Field(default="pending")

    @field_validator("status")
    @classmethod  # pydantic v2 validators receive the class, not an instance
    def validate_status(cls, v):
        allowed = ["pending", "processing", "complete", "error"]
        if v not in allowed:
            # Fixed: use {braces} to interpolate — the original "[allowed]"
            # would appear literally in the error message.
            raise ValueError(f"Status must be one of {allowed}")
        return v
20
21
22# Convert to TypedDict for LangGraph
class WorkflowState(TypedDict):
    """TypedDict twin of ValidatedState, used as the graph schema."""
    query: str
    max_iterations: int
    # Appended across nodes rather than overwritten.
    results: Annotated[list[str], operator.add]
    confidence: Optional[float]
    status: str
29
30
31def validate_state(state: WorkflowState) -> WorkflowState:
32 """Validate state using Pydantic."""
33 validated = ValidatedState(**state)
34 return validated.model_dump()Node Implementation Patterns
Nodes can be implemented in various ways depending on your needs:
Function Nodes
🐍python
from langchain_core.messages import HumanMessage, AIMessage
from langchain_openai import ChatOpenAI


# Shared chat-model client reused by every example node below.
llm = ChatOpenAI(model="gpt-4o")
6
7
8# Pattern 1: Simple function node
def process_node(state: WorkflowState) -> dict:
    """Simple processing node: upper-case the query and record it.

    Fixed the result f-string — the original wrote "[processed]" with
    literal square brackets, so the value was never interpolated.
    """
    query = state["query"]
    processed = query.upper()  # simple stand-in transformation
    return {"results": [f"Processed: {processed}"]}
14
15
16# Pattern 2: LLM-powered node
def llm_node(state: WorkflowState) -> dict:
    """Node that asks the shared LLM to answer the query."""
    prompt = [HumanMessage(content=state["query"])]
    reply = llm.invoke(prompt)
    return {"results": [reply.content]}
22
23
24# Pattern 3: Async node
async def async_node(state: WorkflowState) -> dict:
    """Async node for concurrent operations.

    Fixed the f-string — the original "[query]" used literal brackets,
    so the query text was never interpolated into the result.
    """
    import asyncio

    async def fetch_data(query: str) -> str:
        await asyncio.sleep(0.1)  # simulate an async I/O call
        return f"Data for: {query}"

    result = await fetch_data(state["query"])
    return {"results": [result]}
35
36
37# Pattern 4: Node with error handling
38def robust_node(state: WorkflowState) -> dict:
39 """Node with built-in error handling."""
40 try:
41 # Attempt operation
42 messages = [HumanMessage(content=state["query"])]
43 response = llm.invoke(messages)
44 return {
45 "results": [response.content],
46 "status": "complete",
47 "confidence": 0.95
48 }
49 except Exception as e:
50 return {
51 "results": [f"Error: [str(e)]"],
52 "status": "error",
53 "confidence": 0.0
54 }Class-Based Nodes
🐍python
1from abc import ABC, abstractmethod
2from dataclasses import dataclass
3
4
class BaseNode(ABC):
    """Interface every callable graph node must satisfy."""

    @abstractmethod
    def __call__(self, state: WorkflowState) -> dict:
        """Process `state` and return a partial state update."""
11
12
@dataclass
class ConfigurableNode(BaseNode):
    """Node whose LLM client is built from dataclass configuration."""

    model: str = "gpt-4o"
    temperature: float = 0.0
    max_tokens: int = 1000

    def __post_init__(self):
        # Build the client once from the configured fields.
        self.llm = ChatOpenAI(
            model=self.model,
            temperature=self.temperature,
            max_tokens=self.max_tokens,
        )

    def __call__(self, state: WorkflowState) -> dict:
        prompt = [HumanMessage(content=state["query"])]
        return {"results": [self.llm.invoke(prompt).content]}
32
33
34# Usage
35creative_node = ConfigurableNode(temperature=0.8)
36precise_node = ConfigurableNode(temperature=0.0)Edge Types and Routing
LangGraph supports several types of edges for different routing needs:
Edge Types
🐍python
1from langgraph.graph import StateGraph, START, END
2from typing import Literal
3
4
graph = StateGraph(WorkflowState)

# 1. Unconditional edge: always follow this path
graph.add_edge("node_a", "node_b")

# 2. Entry edge: execution starts here
graph.add_edge(START, "first_node")

# 3. Exit edge: execution ends here
graph.add_edge("final_node", END)
15
16
17# 4. Conditional Edge: Route based on function result
def route_by_status(state: WorkflowState) -> Literal["success", "error", "retry"]:
    """Pick the outgoing edge from the recorded status and confidence."""
    if state.get("status", "pending") == "error":
        return "error"
    if state.get("confidence", 0.0) < 0.5:
        return "retry"
    return "success"
29
30
31graph.add_conditional_edges(
32 "process",
33 route_by_status,
34 {
35 "success": "finalize",
36 "error": "error_handler",
37 "retry": "process" # Loop back
38 }
39)
40
41
42# 5. Multiple Source Edges: Fan-out
43graph.add_edge("start_parallel", "worker_1")
44graph.add_edge("start_parallel", "worker_2")
45graph.add_edge("start_parallel", "worker_3")
46
47
48# 6. Convergence: Multiple nodes to one
49graph.add_edge("worker_1", "aggregator")
50graph.add_edge("worker_2", "aggregator")
51graph.add_edge("worker_3", "aggregator")Advanced Routing Patterns
🐍python
1from typing import Sequence
2
3
4# Pattern: Multi-condition routing
def complex_router(state: WorkflowState) -> str:
    """Route on several conditions, checked highest-priority first."""
    if state.get("status", "pending") == "error":
        return "error_handler"
    completed = len(state.get("results", []))
    if completed >= state.get("max_iterations", 5):
        return "force_complete"
    score = state.get("confidence", 0.0)
    if score >= 0.9:
        return "high_confidence_path"
    if score >= 0.5:
        return "medium_confidence_path"
    return "low_confidence_path"
22
23
24# Pattern: Dynamic routing based on content
def content_router(state: WorkflowState) -> str:
    """Dispatch to a specialist based on keywords in the query."""
    text = state["query"].lower()
    # Checked in order; first keyword hit wins.
    routes = (
        (("code", "programming"), "code_specialist"),
        (("research", "study"), "research_specialist"),
        (("write", "draft"), "writing_specialist"),
    )
    for keywords, destination in routes:
        if any(word in text for word in keywords):
            return destination
    return "general_handler"
37
38
39# Pattern: Fallback routing
def robust_router(state: WorkflowState) -> str:
    """Router that falls back to a safe default if primary routing fails."""
    try:
        return primary_routing_logic(state)
    except Exception:
        # Any failure in the primary logic routes to the fallback handler.
        return "fallback_handler"
49
50
51def primary_routing_logic(state: WorkflowState) -> str:
52 return "success_path"State Reducers
Reducers control how node outputs are merged with existing state:
🐍python
1from typing import Annotated, Callable
2import operator
3
4
5# Built-in reducers
6
7# 1. Add (append to list)
class ListState(TypedDict):
    # operator.add as the reducer: a node's returned list is appended to
    # the existing value instead of replacing it.
    items: Annotated[list[str], operator.add]
10
11# Node returning {"items": ["new"]} appends to existing list
12
13
14# 2. Override (default behavior)
class OverrideState(TypedDict):
    # No reducer annotation: a node's returned value replaces the old one.
    value: str
17
18# Node returning {"value": "new"} replaces old value
19
20
21# Custom reducers
22
def merge_dicts(existing: dict, new: dict) -> dict:
    """Reducer: shallow-merge `new` over `existing` (new keys win)."""
    merged = dict(existing) if existing else {}
    merged.update(new)
    return merged
28
29
def keep_best(existing: str, new: str) -> str:
    """Reducer: keep whichever string is 'better' (here: longer).

    Ties keep the existing value; empty/falsy values lose automatically.
    """
    if not existing:
        return new
    if not new:
        return existing
    # max returns the first maximal argument, so ties favor `existing`.
    return max(existing, new, key=len)
38
39
def accumulate_with_limit(
    existing: list,
    new: list,
    limit: int = 10
) -> list:
    """Reducer: append `new` to `existing`, keeping only the last `limit` items."""
    pool = list(existing or [])
    pool.extend(new or [])
    return pool[-limit:]
48
49
50# Using custom reducers
51class CustomReducerState(TypedDict):
52 metadata: Annotated[dict, merge_dicts]
53 best_answer: Annotated[str, keep_best]
54 recent_items: Annotated[list, lambda e, n: accumulate_with_limit(e, n, 5)]Reducer Factories
🐍python
1from typing import TypeVar, Generic
2from dataclasses import dataclass
3
4
5T = TypeVar('T')
6
7
def make_limit_reducer(limit: int):
    """Return a reducer that accumulates lists but keeps only the last `limit`."""
    def reducer(existing: list, new: list) -> list:
        merged = list(existing or [])
        merged.extend(new or [])
        return merged[-limit:]
    return reducer
14
15
def make_priority_reducer(priority_fn: Callable):
    """Return a reducer keeping whichever value scores higher under `priority_fn`.

    Ties keep the existing value; a None side always loses to a non-None side.
    """
    def reducer(existing, new):
        if existing is None or new is None:
            return existing if new is None else new
        return new if priority_fn(new) > priority_fn(existing) else existing
    return reducer
25
26
27# Usage
28class FactoryState(TypedDict):
29 last_5_results: Annotated[list, make_limit_reducer(5)]
30 highest_score: Annotated[dict, make_priority_reducer(lambda x: x.get("score", 0))]Key Takeaways
- State design is foundational - Choose between flat, nested, or message-based state based on your use case.
- Annotated reducers control merging - Use operator.add for lists, custom functions for complex logic.
- Nodes can be functions or classes - Class-based nodes offer configuration and reusability.
- Conditional edges enable dynamic flow - Route based on state, content, or LLM decisions.
- Error handling belongs in nodes - Make nodes robust with try/except and status tracking.
Next Section Preview: We'll explore building cyclic graphs that can loop and iterate until conditions are met.