Introduction
Persistence and checkpointing are crucial for building production-grade LangGraph applications. They enable workflow recovery, debugging, audit trails, and human-in-the-loop patterns that span multiple sessions.
Section Overview: We'll explore checkpointing mechanisms, different storage backends, state history navigation, and production-ready persistence patterns.
Checkpointing Basics
What Checkpointing Captures
🐍python
1from langgraph.graph import StateGraph, START, END
2from langgraph.checkpoint.memory import MemorySaver
3from typing import TypedDict, Annotated
4import operator
5
6
class WorkflowState(TypedDict):
    """Shape of the graph state that the checkpointer persists.

    ``messages`` is annotated with ``operator.add``; LangGraph uses the
    second ``Annotated`` argument as the reducer for that key, so lists
    returned by nodes are appended rather than replacing the old value.
    ``step`` and ``data`` carry no reducer annotation.
    """

    # Accumulating log of node outputs (reducer: operator.add).
    messages: Annotated[list, operator.add]
    # Number written by the most recently executed step node.
    step: int
    # Free-form payload supplied by the caller.
    data: dict
12
13
def build_checkpointed_graph():
    """Compile a linear three-node graph backed by an in-memory checkpointer.

    The graph runs START -> step1 -> step2 -> step3 -> END. Each node sets
    ``step`` to its own number and contributes one ``"Step N complete"``
    entry to ``messages``.

    Returns:
        The compiled graph, with a fresh ``MemorySaver`` attached.
    """

    def make_step(number: int):
        # Factory so that each node closes over its own step number.
        def run_step(state: WorkflowState) -> dict:
            return {"step": number, "messages": [f"Step {number} complete"]}

        return run_step

    node_names = ["step1", "step2", "step3"]

    builder = StateGraph(WorkflowState)
    for number, node_name in enumerate(node_names, start=1):
        builder.add_node(node_name, make_step(number))

    # Chain the nodes in order, bracketed by the START and END sentinels.
    route = [START, *node_names, END]
    for source, target in zip(route, route[1:]):
        builder.add_edge(source, target)

    # Attaching a checkpointer at compile time is what enables checkpointing.
    return builder.compile(checkpointer=MemorySaver())
40
41
def demonstrate_checkpointing():
    """Run the demo graph once and print what the checkpointer stored."""
    workflow = build_checkpointed_graph()

    # The thread_id names this particular execution.
    run_config = {"configurable": {"thread_id": "workflow-123"}}
    initial_state = {
        "messages": [],
        "step": 0,
        "data": {"user": "alice"},
    }

    final_values = workflow.invoke(initial_state, run_config)
    print("Final result:", final_values)

    # The latest checkpoint for the thread can be read back using the same config.
    snapshot = workflow.get_state(run_config)
    print("Checkpointed state:", snapshot.values)
    print("Config:", snapshot.config)


# Thread Management
🐍python
def manage_threads():
    """Run the demo graph on two threads and print each thread's user.

    Each invocation uses its own ``thread_id``, and the state read back
    for one thread is the state that was written under that thread.
    """
    app = build_checkpointed_graph()

    # Each thread is independent.
    config_a = {"configurable": {"thread_id": "thread-a"}}
    config_b = {"configurable": {"thread_id": "thread-b"}}

    # Run thread A, then thread B with different data. The return values
    # are not needed here: the state is read back from the checkpointer.
    app.invoke({"messages": [], "step": 0, "data": {"user": "alice"}}, config_a)
    app.invoke({"messages": [], "step": 0, "data": {"user": "bob"}}, config_b)

    state_a = app.get_state(config_a)
    state_b = app.get_state(config_b)

    # Curly braces are required for interpolation; square brackets would
    # print the expression text literally.
    print(f"Thread A user: {state_a.values['data']['user']}")  # alice
    print(f"Thread B user: {state_b.values['data']['user']}")  # bob
29
30
def resume_from_checkpoint():
    """Invoke a thread a second time with ``None`` input to continue it."""
    workflow = build_checkpointed_graph()
    thread_config = {"configurable": {"thread_id": "resumable-workflow"}}

    # First run: start the thread from an explicit initial state.
    starting_state = {"messages": [], "step": 0, "data": {}}
    workflow.invoke(starting_state, thread_config)

    # Second run: None as input means "no new input, continue from the
    # thread's last checkpoint".
    # NOTE(review): the demo graph has no interrupts, so the first run
    # already reaches END; presumably nothing is left to execute here.
    resumed = workflow.invoke(None, thread_config)
    print("Resumed result:", resumed)


# Checkpoint Backends
In-Memory (Development)
🐍python
1from langgraph.checkpoint.memory import MemorySaver
2
3
def memory_backend():
    """Compile the workflow graph against an in-process ``MemorySaver``.

    Intended for development: fast, but checkpoints do not persist
    across restarts.
    """
    saver = MemorySaver()

    builder = StateGraph(WorkflowState)
    # ... add nodes and edges ...

    compiled = builder.compile(checkpointer=saver)
    return compiled


# SQLite (Local Persistence)
🐍python
1from langgraph.checkpoint.sqlite import SqliteSaver
2import sqlite3
3
4
def sqlite_backend():
    """Compile the workflow graph with checkpoints stored in ``checkpoints.db``.

    State persists across restarts; a good fit for single-machine
    deployments.
    """
    # check_same_thread=False lifts sqlite3's restriction that a
    # connection may only be used from the thread that created it.
    db = sqlite3.connect("checkpoints.db", check_same_thread=False)
    saver = SqliteSaver(db)

    builder = StateGraph(WorkflowState)
    # ... add nodes and edges ...

    return builder.compile(checkpointer=saver)
22
23
24# Async SQLite
async def async_sqlite_backend():
    """Compile the workflow graph with an async SQLite checkpointer.

    The connection is opened without ``async with`` on purpose: a context
    manager would close it as soon as this function returned, handing the
    caller a compiled graph whose checkpointer points at a closed database.

    Returns:
        The compiled graph. The aiosqlite connection stays open; the
        caller is responsible for closing it when the graph is no longer
        needed.
    """
    from langgraph.checkpoint.sqlite.aio import AsyncSqliteSaver
    import aiosqlite

    # Awaiting connect() opens the connection and leaves it open.
    conn = await aiosqlite.connect("checkpoints.db")
    try:
        checkpointer = AsyncSqliteSaver(conn)

        graph = StateGraph(WorkflowState)
        # ... add nodes and edges ...

        return graph.compile(checkpointer=checkpointer)
    except Exception:
        # Do not leak the connection if building the graph fails.
        await conn.close()
        raise


# PostgreSQL (Production)
🐍python
1from langgraph.checkpoint.postgres import PostgresSaver
2from psycopg_pool import ConnectionPool
3
4
def postgres_backend():
    """Compile the workflow graph with a PostgreSQL-backed checkpointer.

    Returns:
        The compiled graph. The connection pool is left open because the
        returned graph's checkpointer keeps using it.
    """
    # NOTE: the DSN below contains placeholder credentials; load the real
    # connection string from configuration in a deployed application.
    pool = ConnectionPool(
        conninfo="postgresql://user:pass@localhost:5432/langgraph",
        min_size=5,
        max_size=20,
        # PostgresSaver expects autocommit connections; without this the
        # tables created by setup() and later checkpoint writes are not
        # committed.
        kwargs={"autocommit": True, "prepare_threshold": 0},
    )

    checkpointer = PostgresSaver(pool)

    # Initialize tables (run once)
    checkpointer.setup()

    graph = StateGraph(WorkflowState)
    # ... add nodes and edges ...

    return graph.compile(checkpointer=checkpointer)
26
27
28# Async PostgreSQL
async def async_postgres_backend():
    """Compile the workflow graph with an async PostgreSQL checkpointer.

    Returns:
        The compiled graph. The async connection pool is left open because
        the returned graph's checkpointer keeps using it.
    """
    from langgraph.checkpoint.postgres.aio import AsyncPostgresSaver
    from psycopg_pool import AsyncConnectionPool

    # NOTE: the DSN below contains placeholder credentials.
    pool = AsyncConnectionPool(
        conninfo="postgresql://user:pass@localhost:5432/langgraph",
        min_size=5,
        max_size=20,
        # The saver expects autocommit connections; without this the
        # tables created by setup() are not committed.
        kwargs={"autocommit": True, "prepare_threshold": 0},
        # Opening an async pool from its constructor is deprecated, so it
        # is created closed and opened explicitly.
        open=False,
    )
    await pool.open()

    checkpointer = AsyncPostgresSaver(pool)
    await checkpointer.setup()

    graph = StateGraph(WorkflowState)
    # ... add nodes and edges ...

    return graph.compile(checkpointer=checkpointer)


# | Backend | Use Case | Persistence | Scalability |
# |---|---|---|---|
# | MemorySaver | Development/testing | None | Single process |
# | SqliteSaver | Local apps, single-machine | Disk | Single machine |
# | PostgresSaver | Production, distributed | Database | Horizontal |
State History and Time Travel
🐍python
def explore_state_history():
    """Run the demo graph, then print every checkpoint recorded for the thread.

    For each snapshot the step, messages, creation time and parent config
    are printed, in the order ``get_state_history`` yields them.
    """
    app = build_checkpointed_graph()
    config = {"configurable": {"thread_id": "history-demo"}}

    # Execute the workflow so that there is history to inspect.
    app.invoke({"messages": [], "step": 0, "data": {}}, config)

    # Materialize the iterator so the snapshots can be counted up front.
    history = list(app.get_state_history(config))

    # Curly braces are required for interpolation; square brackets would
    # print the expression text literally.
    print(f"Total checkpoints: {len(history)}")

    for i, snapshot in enumerate(history):
        print(f"\nCheckpoint {i}:")
        print(f" Step: {snapshot.values.get('step')}")
        print(f" Messages: {snapshot.values.get('messages')}")
        print(f" Created: {snapshot.created_at}")
        print(f" Parent: {snapshot.parent_config}")
24
25
def time_travel_to_checkpoint():
    """Re-run the demo graph starting from the checkpoint where ``step`` is 1."""
    workflow = build_checkpointed_graph()
    thread_config = {"configurable": {"thread_id": "time-travel"}}

    # Produce some history to travel through.
    workflow.invoke({"messages": [], "step": 0, "data": {}}, thread_config)

    snapshots = list(workflow.get_state_history(thread_config))

    # First snapshot, in iteration order, whose state reports step == 1.
    target = next(
        (snap for snap in snapshots if snap.values.get("step") == 1),
        None,
    )
    if not target:
        return

    # Invoking with the snapshot's own config and no new input continues
    # from that checkpoint.
    replayed = workflow.invoke(None, target.config)
    print("Resumed from step 1:", replayed)
53
54
def branch_from_checkpoint():
    """Fork the workflow from the step-1 checkpoint with modified state.

    Checkpoints are looked up by thread, so a ``checkpoint_id`` taken from
    one thread cannot be combined with a different ``thread_id``. The
    branch is therefore created by applying the state change on top of the
    snapshot's own config; ``update_state`` returns the config of the new
    checkpoint, and execution continues from there.
    """
    app = build_checkpointed_graph()
    config = {"configurable": {"thread_id": "main-branch"}}

    # Execute the main workflow.
    app.invoke({"messages": [], "step": 0, "data": {"branch": "main"}}, config)

    # Locate the checkpoint written after step 1.
    history = list(app.get_state_history(config))
    step1_checkpoint = next(
        (s for s in history if s.values.get("step") == 1),
        None,
    )
    if not step1_checkpoint:
        return

    # Write the modified state as a new checkpoint whose parent is the
    # step-1 checkpoint; the original history is left untouched.
    branch_config = app.update_state(
        step1_checkpoint.config,
        {"data": {"branch": "feature"}},
    )

    # Continue on the new branch.
    result = app.invoke(None, branch_config)
    print("Feature branch result:", result)


# Production Persistence
🐍python
1from dataclasses import dataclass
2from typing import Optional
3import json
4import logging
5
6
@dataclass
class PersistenceConfig:
    """Settings that select and tune the checkpoint storage backend."""

    # Backend selector; defaults to PostgreSQL.
    backend: str = "postgres"
    # Connection target handed to the chosen backend.
    connection_string: str = ""
    # Lower and upper bounds for the connection pool size.
    pool_min_size: int = 5
    pool_max_size: int = 20
    # Retention window in days (declared here; not read by the code in this section).
    checkpoint_ttl_days: int = 30
    # Compression toggle (declared here; not read by the code in this section).
    enable_compression: bool = True
16
17
def create_production_checkpointer(config: PersistenceConfig):
    """Build the checkpointer selected by ``config.backend``.

    Args:
        config: Persistence settings. ``backend`` selects the
            implementation, ``connection_string`` is the PostgreSQL
            conninfo or the SQLite database path, and the pool sizes
            apply to PostgreSQL only.

    Returns:
        A ``PostgresSaver`` with its tables initialized for
        ``"postgres"``, a ``SqliteSaver`` for ``"sqlite"``, and a
        ``MemorySaver`` for any other value.
    """
    if config.backend == "postgres":
        from langgraph.checkpoint.postgres import PostgresSaver
        from psycopg_pool import ConnectionPool

        pool = ConnectionPool(
            conninfo=config.connection_string,
            min_size=config.pool_min_size,
            max_size=config.pool_max_size,
            # PostgresSaver expects autocommit connections; without this
            # the tables created by setup() are not committed.
            kwargs={"autocommit": True, "prepare_threshold": 0},
        )

        checkpointer = PostgresSaver(pool)
        checkpointer.setup()
        return checkpointer

    if config.backend == "sqlite":
        from langgraph.checkpoint.sqlite import SqliteSaver
        import sqlite3

        conn = sqlite3.connect(
            config.connection_string,
            check_same_thread=False,
        )
        return SqliteSaver(conn)

    from langgraph.checkpoint.memory import MemorySaver

    if config.backend != "memory":
        # The fallback is kept for backward compatibility, but a mistyped
        # backend name should not silently lose persistence.
        logging.getLogger(__name__).warning(
            "Unknown persistence backend %r; falling back to in-memory checkpoints",
            config.backend,
        )
    return MemorySaver()
49
50
class ProductionWorkflow:
    """Workflow wrapper that runs a compiled graph with persistent checkpoints.

    Every public method addresses a run by ``thread_id`` and builds the
    matching ``{"configurable": {"thread_id": ...}}`` config itself.
    """

    def __init__(self, config: "PersistenceConfig"):
        """Create the checkpointer described by *config* and compile the graph."""
        self.config = config
        self.logger = logging.getLogger(__name__)
        self.checkpointer = create_production_checkpointer(config)
        self.app = self._build_graph()

    def _build_graph(self):
        """Build the workflow graph and compile it with the checkpointer."""
        graph = StateGraph(WorkflowState)
        # ... add nodes and edges ...
        return graph.compile(checkpointer=self.checkpointer)

    def execute(self, thread_id: str, initial_state: dict) -> dict:
        """Run the workflow for *thread_id* starting from *initial_state*.

        Returns:
            The value returned by the compiled graph's ``invoke``.

        Raises:
            WorkflowError: If the run fails. The error carries the state
                read back for the thread and is chained to the original
                exception.
        """
        config = {"configurable": {"thread_id": thread_id}}

        try:
            result = self.app.invoke(initial_state, config)
        except Exception as e:
            self.logger.exception("Workflow %s failed: %s", thread_id, e)
            # Read back whatever was checkpointed before the failure.
            state = self.app.get_state(config)
            raise WorkflowError(thread_id, state, e) from e

        self.logger.info("Workflow %s completed successfully", thread_id)
        return result

    def resume(self, thread_id: str, updates: Optional[dict] = None) -> dict:
        """Resume the workflow for *thread_id*, optionally applying *updates* first.

        A falsy *updates* (``None`` or an empty dict) skips the state update.
        """
        config = {"configurable": {"thread_id": thread_id}}

        if updates:
            self.app.update_state(config, updates)

        # None as input continues from the thread's last checkpoint.
        return self.app.invoke(None, config)

    def get_status(self, thread_id: str) -> dict:
        """Return the thread's current values, next nodes and checkpoint time."""
        config = {"configurable": {"thread_id": thread_id}}
        state = self.app.get_state(config)

        return {
            "thread_id": thread_id,
            "values": state.values,
            "next_nodes": state.next,
            "created_at": state.created_at,
        }
101
102
class WorkflowError(Exception):
    """Raised when a workflow run fails; carries the last known state.

    Attributes:
        thread_id: Identifier of the thread whose run failed.
        state: State snapshot read back after the failure (may be ``None``).
        original_error: The exception that caused the failure.
    """

    def __init__(self, thread_id: str, state, original_error: Exception):
        self.thread_id = thread_id
        self.state = state
        self.original_error = original_error
        # Tolerate a missing snapshot so that constructing the error can
        # never raise and mask the original failure.
        values = getattr(state, "values", None)
        super().__init__(f"Workflow {thread_id} failed at state: {values}")


# Key Takeaways
- Checkpointing captures state after each execution step (in a linear graph like the examples above, after every node), enabling recovery and human-in-the-loop patterns.
- Thread IDs isolate executions - each thread has independent state and history.
- Choose backend by use case - MemorySaver for dev, SQLite for local, PostgreSQL for production.
- State history enables time travel - navigate, branch, and resume from any checkpoint.
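- Production deployments need connection pooling, error handling, and cleanup strategies such as expiring old checkpoints (the `checkpoint_ttl_days` setting above is declared, but the example code does not enforce it).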
Next Section Preview: We'll build a complete multi-agent system with LangGraph, combining everything we've learned.