Chapter 15
15 min read
Section 96 of 175

Persistence and Checkpointing

LangGraph Deep Dive

Introduction

Persistence and checkpointing are crucial for building production-grade LangGraph applications. They enable workflow recovery, debugging, audit trails, and human-in-the-loop patterns that span multiple sessions.

Section Overview: We'll explore checkpointing mechanisms, different storage backends, state history navigation, and production-ready persistence patterns.

Checkpointing Basics

What Checkpointing Captures

🐍python
1from langgraph.graph import StateGraph, START, END
2from langgraph.checkpoint.memory import MemorySaver
3from typing import TypedDict, Annotated
4import operator
5
6
class WorkflowState(TypedDict):
    """Workflow state snapshot persisted by the checkpointer.

    ``messages`` is annotated with ``operator.add`` as its reducer, so
    node updates are appended to the list rather than replacing it.
    """
    # Append-only log of node outputs (merged via operator.add).
    messages: Annotated[list, operator.add]
    # Index of the most recently completed step.
    step: int
    # Arbitrary per-run payload (e.g. user info).
    data: dict
12
13
def build_checkpointed_graph():
    """Compile the three-step demo graph with an in-memory checkpointer.

    Returns a compiled app whose state is snapshotted after every node.
    """

    def step_one(state: WorkflowState) -> dict:
        return {"step": 1, "messages": ["Step 1 complete"]}

    def step_two(state: WorkflowState) -> dict:
        return {"step": 2, "messages": ["Step 2 complete"]}

    def step_three(state: WorkflowState) -> dict:
        return {"step": 3, "messages": ["Step 3 complete"]}

    builder = StateGraph(WorkflowState)
    for name, handler in (
        ("step1", step_one),
        ("step2", step_two),
        ("step3", step_three),
    ):
        builder.add_node(name, handler)

    # Linear topology: START -> step1 -> step2 -> step3 -> END.
    route = (START, "step1", "step2", "step3", END)
    for src, dst in zip(route, route[1:]):
        builder.add_edge(src, dst)

    # MemorySaver keeps checkpoints in process memory (dev/testing only).
    return builder.compile(checkpointer=MemorySaver())
40
41
def demonstrate_checkpointing():
    """Run the demo graph once and inspect its checkpointed state."""
    app = build_checkpointed_graph()

    # The thread_id scopes all checkpoints for this one execution.
    config = {"configurable": {"thread_id": "workflow-123"}}

    initial_state = {
        "messages": [],
        "step": 0,
        "data": {"user": "alice"},
    }
    result = app.invoke(initial_state, config)
    print("Final result:", result)

    # A snapshot is written after every node; fetch the latest one.
    snapshot = app.get_state(config)
    print("Checkpointed state:", snapshot.values)
    print("Config:", snapshot.config)

Thread Management

🐍python
def manage_threads():
    """Managing multiple execution threads.

    Each thread_id owns an independent checkpoint history, so separate
    runs never see each other's state.
    """
    app = build_checkpointed_graph()

    # Each thread is independent.
    config_a = {"configurable": {"thread_id": "thread-a"}}
    config_b = {"configurable": {"thread_id": "thread-b"}}

    # Run thread A.
    result_a = app.invoke({
        "messages": [],
        "step": 0,
        "data": {"user": "alice"}
    }, config_a)

    # Run thread B with different data.
    result_b = app.invoke({
        "messages": [],
        "step": 0,
        "data": {"user": "bob"}
    }, config_b)

    # States are separate.
    state_a = app.get_state(config_a)
    state_b = app.get_state(config_b)

    # Fixed: these f-strings used [] instead of {}, which printed the
    # placeholder text literally instead of interpolating the values.
    print(f"Thread A user: {state_a.values['data']['user']}")  # alice
    print(f"Thread B user: {state_b.values['data']['user']}")  # bob
29
30
def resume_from_checkpoint():
    """Resume a workflow from its most recent checkpoint."""
    app = build_checkpointed_graph()
    config = {"configurable": {"thread_id": "resumable-workflow"}}

    # First run: executes the graph, checkpointing after each node.
    app.invoke({"messages": [], "step": 0, "data": {}}, config)

    # Invoking with None as the input continues from the last checkpoint
    # instead of starting over.
    result = app.invoke(None, config)
    print("Resumed result:", result)

Checkpoint Backends

In-Memory (Development)

🐍python
1from langgraph.checkpoint.memory import MemorySaver
2
3
def memory_backend():
    """In-memory checkpointing for development.

    Zero setup and fast, but all checkpoints vanish when the process
    exits — do not use beyond local testing.
    """
    saver = MemorySaver()

    graph = StateGraph(WorkflowState)
    # ... add nodes and edges ...

    return graph.compile(checkpointer=saver)

SQLite (Local Persistence)

🐍python
1from langgraph.checkpoint.sqlite import SqliteSaver
2import sqlite3
3
4
def sqlite_backend():
    """SQLite checkpointing for local persistence.

    Checkpoints survive restarts; suitable for single-machine
    deployments.
    """
    # check_same_thread=False lets the saver use this connection from
    # worker threads, not only the one that created it.
    connection = sqlite3.connect("checkpoints.db", check_same_thread=False)
    saver = SqliteSaver(connection)

    graph = StateGraph(WorkflowState)
    # ... add nodes and edges ...

    return graph.compile(checkpointer=saver)
22
23
24# Async SQLite
# Async SQLite
async def async_sqlite_backend():
    """Async SQLite checkpointing for async applications.

    Bug fix: the original opened the connection with ``async with``,
    which closed it as soon as the function returned — leaving the
    compiled app holding a checkpointer bound to a closed connection.
    The connection is now opened without a context manager; the caller
    owns it and should close it when the app is no longer needed.
    """
    from langgraph.checkpoint.sqlite.aio import AsyncSqliteSaver
    import aiosqlite

    # Intentionally NOT a context manager: the app checkpoints through
    # this connection for its entire lifetime.
    conn = await aiosqlite.connect("checkpoints.db")
    checkpointer = AsyncSqliteSaver(conn)

    graph = StateGraph(WorkflowState)
    # ... add nodes and edges ...

    app = graph.compile(checkpointer=checkpointer)
    return app

PostgreSQL (Production)

🐍python
1from langgraph.checkpoint.postgres import PostgresSaver
2from psycopg_pool import ConnectionPool
3
4
def postgres_backend():
    """PostgreSQL checkpointing for production deployments."""
    # Pooled connections keep frequent checkpoint writes cheap.
    pool = ConnectionPool(
        conninfo="postgresql://user:pass@localhost:5432/langgraph",
        min_size=5,
        max_size=20,
    )

    saver = PostgresSaver(pool)
    saver.setup()  # one-time table creation (run once; safe to repeat)

    graph = StateGraph(WorkflowState)
    # ... add nodes and edges ...

    return graph.compile(checkpointer=saver)
26
27
28# Async PostgreSQL
# Async PostgreSQL
async def async_postgres_backend():
    """Async PostgreSQL checkpointing for high-concurrency services.

    Fix: psycopg_pool deprecates opening an async pool implicitly from
    its constructor (the event loop may not be running yet); create the
    pool closed and open it explicitly with ``await pool.open()``.
    """
    from langgraph.checkpoint.postgres.aio import AsyncPostgresSaver
    from psycopg_pool import AsyncConnectionPool

    pool = AsyncConnectionPool(
        conninfo="postgresql://user:pass@localhost:5432/langgraph",
        min_size=5,
        max_size=20,
        open=False,  # defer opening until we are inside the event loop
    )
    await pool.open()

    checkpointer = AsyncPostgresSaver(pool)
    await checkpointer.setup()  # idempotent table creation

    graph = StateGraph(WorkflowState)
    # ... add nodes and edges ...

    app = graph.compile(checkpointer=checkpointer)
    return app
| Backend       | Use Case                   | Persistence | Scalability    |
|---------------|----------------------------|-------------|----------------|
| MemorySaver   | Development/testing        | None        | Single process |
| SqliteSaver   | Local apps, single-machine | Disk        | Single machine |
| PostgresSaver | Production, distributed    | Database    | Horizontal     |

State History and Time Travel

🐍python
def explore_state_history():
    """Navigate through checkpoint history.

    ``get_state_history`` yields one snapshot per checkpoint,
    newest-first.
    """
    app = build_checkpointed_graph()
    config = {"configurable": {"thread_id": "history-demo"}}

    # Execute workflow.
    app.invoke({
        "messages": [],
        "step": 0,
        "data": {}
    }, config)

    # Get full history.
    history = list(app.get_state_history(config))

    # Fixed: the f-strings below used [] instead of {}, which printed
    # the placeholder text literally instead of the values.
    print(f"Total checkpoints: {len(history)}")

    for i, snapshot in enumerate(history):
        print(f"\nCheckpoint {i}:")
        print(f"  Step: {snapshot.values.get('step')}")
        print(f"  Messages: {snapshot.values.get('messages')}")
        print(f"  Created: {snapshot.created_at}")
        print(f"  Parent: {snapshot.parent_config}")
25
def time_travel_to_checkpoint():
    """Resume execution from a specific point in history."""
    app = build_checkpointed_graph()
    config = {"configurable": {"thread_id": "time-travel"}}

    # Run the workflow to completion first.
    app.invoke({"messages": [], "step": 0, "data": {}}, config)

    # Locate the snapshot taken right after step 1.
    step1_checkpoint = next(
        (
            snapshot
            for snapshot in app.get_state_history(config)
            if snapshot.values.get("step") == 1
        ),
        None,
    )

    if step1_checkpoint is not None:
        # Invoking with None plus a snapshot's config replays from that
        # checkpoint onward.
        result = app.invoke(None, step1_checkpoint.config)
        print("Resumed from step 1:", result)
54
def branch_from_checkpoint():
    """Fork a new execution branch from an earlier checkpoint."""
    app = build_checkpointed_graph()
    config = {"configurable": {"thread_id": "main-branch"}}

    # Run the main line to completion.
    app.invoke(
        {"messages": [], "step": 0, "data": {"branch": "main"}},
        config,
    )

    # Find the checkpoint taken right after step 1.
    step1_checkpoint = next(
        (
            snapshot
            for snapshot in app.get_state_history(config)
            if snapshot.values.get("step") == 1
        ),
        None,
    )
    if step1_checkpoint is None:
        return

    # Pointing a fresh thread_id at that checkpoint_id forks history.
    checkpoint_id = step1_checkpoint.config["configurable"]["checkpoint_id"]
    new_config = {
        "configurable": {
            "thread_id": "feature-branch",
            "checkpoint_id": checkpoint_id,
        }
    }

    # Diverge the branch's state before continuing execution.
    app.update_state(new_config, {"data": {"branch": "feature"}})

    result = app.invoke(None, new_config)
    print("Feature branch result:", result)

Production Persistence

🐍python
1from dataclasses import dataclass
2from typing import Optional
3import json
4import logging
5
6
@dataclass
class PersistenceConfig:
    """Tunable settings for the production persistence layer."""
    backend: str = "postgres"        # "postgres" | "sqlite"; anything else -> in-memory
    connection_string: str = ""      # DSN (postgres) or file path (sqlite)
    pool_min_size: int = 5           # minimum pooled connections (postgres only)
    pool_max_size: int = 20          # maximum pooled connections (postgres only)
    checkpoint_ttl_days: int = 30    # retention window for old checkpoints
    enable_compression: bool = True  # compress checkpoint payloads if supported
16
17
def create_production_checkpointer(config: PersistenceConfig):
    """Instantiate the checkpointer selected by ``config.backend``.

    Any unrecognized backend name falls back to an in-memory saver.
    """
    backend = config.backend

    if backend == "postgres":
        from langgraph.checkpoint.postgres import PostgresSaver
        from psycopg_pool import ConnectionPool

        pool = ConnectionPool(
            conninfo=config.connection_string,
            min_size=config.pool_min_size,
            max_size=config.pool_max_size,
        )
        saver = PostgresSaver(pool)
        saver.setup()  # one-time (idempotent) table creation
        return saver

    if backend == "sqlite":
        import sqlite3
        from langgraph.checkpoint.sqlite import SqliteSaver

        # check_same_thread=False allows cross-thread use of the connection.
        connection = sqlite3.connect(
            config.connection_string,
            check_same_thread=False,
        )
        return SqliteSaver(connection)

    from langgraph.checkpoint.memory import MemorySaver
    return MemorySaver()
49
50
class ProductionWorkflow:
    """Production-ready workflow with durable persistence.

    Wraps a compiled graph with the configured checkpointer so runs can
    be executed, resumed, and inspected by ``thread_id``.
    """

    def __init__(self, config: PersistenceConfig):
        self.config = config
        # Logger first, so any failure during checkpointer/graph
        # construction can still be reported by callers.
        self.logger = logging.getLogger(__name__)
        self.checkpointer = create_production_checkpointer(config)
        self.app = self._build_graph()

    def _build_graph(self):
        """Build and compile the workflow graph."""
        graph = StateGraph(WorkflowState)
        # ... add nodes and edges ...
        return graph.compile(checkpointer=self.checkpointer)

    def execute(self, thread_id: str, initial_state: dict) -> dict:
        """Execute the workflow for ``thread_id``.

        Raises:
            WorkflowError: wraps any failure together with the state
                checkpointed up to the failing node.
        """
        config = {"configurable": {"thread_id": thread_id}}

        try:
            result = self.app.invoke(initial_state, config)
        except Exception as e:
            # Fixed: original f-strings used [] instead of {}, logging
            # literal placeholder text. Lazy %-style args also avoid
            # formatting when the log level is disabled.
            self.logger.error("Workflow %s failed: %s", thread_id, e)
            # State is still checkpointed up to the failure point.
            state = self.app.get_state(config)
            raise WorkflowError(thread_id, state, e) from e

        self.logger.info("Workflow %s completed successfully", thread_id)
        return result

    def resume(self, thread_id: str, updates: Optional[dict] = None) -> dict:
        """Resume a workflow from its last checkpoint.

        Args:
            updates: optional state patch applied before resuming.
        """
        config = {"configurable": {"thread_id": thread_id}}

        if updates:
            self.app.update_state(config, updates)

        # None as input means "continue from the last checkpoint".
        return self.app.invoke(None, config)

    def get_status(self, thread_id: str) -> dict:
        """Return a summary of the thread's current checkpointed state."""
        config = {"configurable": {"thread_id": thread_id}}
        state = self.app.get_state(config)

        return {
            "thread_id": thread_id,
            "values": state.values,
            "next_nodes": state.next,
            "created_at": state.created_at,
        }
101
102
class WorkflowError(Exception):
    """Workflow failure carrying the last checkpointed state.

    Attributes:
        thread_id: the thread whose run failed.
        state: the state snapshot checkpointed up to the failure point.
        original_error: the underlying exception.
    """

    def __init__(self, thread_id: str, state, original_error: Exception):
        self.thread_id = thread_id
        self.state = state
        self.original_error = original_error
        # Fixed: the original f-string used [] instead of {}, producing a
        # message with literal "[thread_id]" text instead of the values.
        super().__init__(
            f"Workflow {thread_id} failed at state: {state.values}"
        )

Key Takeaways

  • Checkpointing captures state after each node, enabling recovery and human-in-the-loop patterns.
  • Thread IDs isolate executions - each thread has independent state and history.
  • Choose backend by use case - MemorySaver for dev, SQLite for local, PostgreSQL for production.
  • State history enables time travel - navigate, branch, and resume from any checkpoint.
  • Production deployments need connection pooling, error handling, and cleanup strategies.
Next Section Preview: We'll build a complete multi-agent system with LangGraph, combining everything we've learned.