LangGraph Architecture Decisions
name: langgraph-architecture
by anderskev · published 2026-04-01
$ claw add gh:anderskev/anderskev-langgraph-architecture---
name: langgraph-architecture
description: Guides architectural decisions for LangGraph applications. Use when deciding between LangGraph vs alternatives, choosing state management strategies, designing multi-agent systems, or selecting persistence and streaming approaches.
---
# LangGraph Architecture Decisions
When to Use LangGraph
Use LangGraph When You Need:
Consider Alternatives When:
| Scenario | Alternative | Why |
|----------|-------------|-----|
| Single LLM call | Direct API call | Overhead not justified |
| Linear pipeline | LangChain LCEL | Simpler abstraction |
| Stateless tool use | Function calling | No persistence needed |
| Simple RAG | LangChain retrievers | Built-in patterns |
| Batch processing | Async tasks | Different execution model |
State Schema Decisions
TypedDict vs Pydantic
| TypedDict | Pydantic |
|-----------|----------|
| Lightweight, faster | Runtime validation |
| Dict-like access | Attribute access |
| No validation overhead | Type coercion |
| Simpler serialization | Complex nested models |
**Recommendation**: Use TypedDict for most cases. Use Pydantic when you need validation or complex nested structures.
Reducer Selection
| Use Case | Reducer | Example |
|----------|---------|---------|
| Chat messages | `add_messages` | Handles IDs, RemoveMessage |
| Simple append | `operator.add` | `Annotated[list, operator.add]` |
| Keep latest | None (LastValue) | `field: str` |
| Custom merge | Lambda | `Annotated[list, lambda a, b: ...]` |
| Overwrite list | `Overwrite` | Bypass reducer |
State Size Considerations
# SMALL STATE (< 1MB) - Put in state
class State(TypedDict):
messages: Annotated[list, add_messages]
context: str
# LARGE DATA - Use Store
class State(TypedDict):
messages: Annotated[list, add_messages]
document_ref: str # Reference to store
def node(state, *, store: BaseStore):
doc = store.get(namespace, state["document_ref"])
# Process without bloating checkpointsGraph Structure Decisions
Single Graph vs Subgraphs
**Single Graph** when:
**Subgraphs** when:
Conditional Edges vs Command
| Conditional Edges | Command |
|------------------|---------|
| Routing based on state | Routing + state update |
| Separate router function | Decision in node |
| Clearer visualization | More flexible |
| Standard patterns | Dynamic destinations |
# Conditional Edge - when routing is the focus
def router(state) -> Literal["a", "b"]:
return "a" if condition else "b"
builder.add_conditional_edges("node", router)
# Command - when combining routing with updates
def node(state) -> Command:
return Command(goto="next", update={"step": state["step"] + 1})Static vs Dynamic Routing
**Static Edges** (`add_edge`):
**Dynamic Routing** (`add_conditional_edges`, `Command`, `Send`):
Persistence Strategy
Checkpointer Selection
| Checkpointer | Use Case | Characteristics |
|--------------|----------|-----------------|
| `InMemorySaver` | Testing only | Lost on restart |
| `SqliteSaver` | Development | Single file, local |
| `PostgresSaver` | Production | Scalable, concurrent |
| Custom | Special needs | Implement BaseCheckpointSaver |
Checkpointing Scope
# Full persistence (default)
graph = builder.compile(checkpointer=checkpointer)
# Subgraph options
subgraph = sub_builder.compile(
checkpointer=None, # Inherit from parent
checkpointer=True, # Independent checkpointing
checkpointer=False, # No checkpointing (runs atomically)
)When to Disable Checkpointing
Multi-Agent Architecture
Supervisor Pattern
Best for:
┌─────────────┐
│ Supervisor │
└──────┬──────┘
┌────────┬───┴───┬────────┐
▼ ▼ ▼ ▼
┌──────┐ ┌──────┐ ┌──────┐ ┌──────┐
│Agent1│ │Agent2│ │Agent3│ │Agent4│
└──────┘ └──────┘ └──────┘ └──────┘Peer-to-Peer Pattern
Best for:
┌──────┐ ┌──────┐
│Agent1│◄───►│Agent2│
└──┬───┘ └───┬──┘
│ │
▼ ▼
┌──────┐ ┌──────┐
│Agent3│◄───►│Agent4│
└──────┘ └──────┘Handoff Pattern
Best for:
┌────────┐ ┌────────┐ ┌────────┐
│Research│───►│Planning│───►│Execute │
└────────┘ └────────┘ └────────┘Streaming Strategy
Stream Mode Selection
| Mode | Use Case | Data |
|------|----------|------|
| `updates` | UI updates | Node outputs only |
| `values` | State inspection | Full state each step |
| `messages` | Chat UX | LLM tokens |
| `custom` | Progress/logs | Your data via StreamWriter |
| `debug` | Debugging | Tasks + checkpoints |
Subgraph Streaming
# Stream from subgraphs
async for chunk in graph.astream(
input,
stream_mode="updates",
subgraphs=True # Include subgraph events
):
namespace, data = chunk # namespace indicates depthHuman-in-the-Loop Design
Interrupt Placement
| Strategy | Use Case |
|----------|----------|
| `interrupt_before` | Approval before action |
| `interrupt_after` | Review after completion |
| `interrupt()` in node | Dynamic, contextual pauses |
Resume Patterns
# Simple resume (same thread)
graph.invoke(None, config)
# Resume with value
graph.invoke(Command(resume="approved"), config)
# Resume specific interrupt
graph.invoke(Command(resume={interrupt_id: value}), config)
# Modify state and resume
graph.update_state(config, {"field": "new_value"})
graph.invoke(None, config)Error Handling Strategy
Retry Configuration
# Per-node retry
RetryPolicy(
initial_interval=0.5,
backoff_factor=2.0,
max_interval=60.0,
max_attempts=3,
retry_on=lambda e: isinstance(e, (APIError, TimeoutError))
)
# Multiple policies (first match wins)
builder.add_node("node", fn, retry_policy=[
RetryPolicy(retry_on=RateLimitError, max_attempts=5),
RetryPolicy(retry_on=Exception, max_attempts=2),
])Fallback Patterns
def node_with_fallback(state):
try:
return primary_operation(state)
except PrimaryError:
return fallback_operation(state)
# Or use conditional edges for complex fallback routing
def route_on_error(state) -> Literal["retry", "fallback", "__end__"]:
if state.get("error") and state["attempts"] < 3:
return "retry"
elif state.get("error"):
return "fallback"
return ENDScaling Considerations
Horizontal Scaling
Performance Optimization
1. **Minimize state size** - Use references for large data
2. **Parallel nodes** - Fan out when possible
3. **Cache expensive operations** - Use CachePolicy
4. **Async everywhere** - Use ainvoke, astream
Resource Limits
# Set recursion limit
config = {"recursion_limit": 50}
graph.invoke(input, config)
# Track remaining steps in state
class State(TypedDict):
remaining_steps: RemainingSteps
def check_budget(state):
if state["remaining_steps"] < 5:
return "wrap_up"
return "continue"Decision Checklist
Before implementing:
1. [ ] Is LangGraph the right tool? (vs simpler alternatives)
2. [ ] State schema defined with appropriate reducers?
3. [ ] Persistence strategy chosen? (dev vs prod checkpointer)
4. [ ] Streaming needs identified?
5. [ ] Human-in-the-loop points defined?
6. [ ] Error handling and retry strategy?
7. [ ] Multi-agent coordination pattern? (if applicable)
8. [ ] Resource limits configured?
More tools from the same signal band
Order food/drinks (点餐) on an Android device paired as an OpenClaw node. Uses in-app menu and cart; add goods, view cart, submit order (demo, no real payment).
Sign plugins, rotate agent credentials without losing identity, and publicly attest to plugin behavior with verifiable claims and authenticated transfers.
The philosophical layer for AI agents. Maps behavior to Spinoza's 48 affects, calculates persistence scores, and generates geometric self-reports. Give your...