LangGraph Reference
LangGraph is an open-source framework by LangChain for building multi-agent systems using state machines and directed graphs.
Overview
LangGraph provides:
- Fine-grained control over agent workflows
- Persistent state management
- Conditional routing and branching
- Human-in-the-loop checkpoints
- Streaming and async support
Core Concepts
State
A typed dictionary representing the current application snapshot:
from typing import TypedDict, Annotated
from langgraph.graph.message import add_messages
class AgentState(TypedDict):
    """Shared state snapshot passed between every node in the graph.

    `messages` is annotated with the `add_messages` reducer, so partial
    updates returned by nodes are appended/merged into the history
    rather than overwriting it; all other fields are replaced on update.
    """

    # Message history with automatic aggregation (add_messages reducer)
    messages: Annotated[list, add_messages]
    # Request context: identifies the request, the user, and the parsed intent
    request_id: str
    user_id: str
    intent: str
    # Intermediate results produced by the worker agents
    cloud_resources: dict
    deployment_status: str
    # Control flow: routing target and human-approval gate
    next_agent: str
    approval_pending: bool
Nodes
Python functions that process state:
def cloud_agent_node(state: AgentState) -> dict:
    """Provision cloud infrastructure for the current request.

    Reads the accumulated message history from *state*, runs the
    provisioning step, and returns a partial state update containing
    the provisioning result plus an AI message summarising it.
    """
    history = state["messages"]
    provisioned = provision_infrastructure(history)
    summary = AIMessage(content=f"Provisioned: {provisioned}")
    return {"cloud_resources": provisioned, "messages": [summary]}
Edges
Functions determining the next node:
def route_to_next(state: AgentState) -> str:
    """Pick the next node to run from the current state.

    Pending approvals take priority over everything else; once cloud
    resources exist the flow moves on to CI/CD; otherwise the run ends.
    """
    if state.get("approval_pending"):
        return "human_approval"
    if state.get("cloud_resources"):
        return "cicd_agent"
    return END
Subgraphs
Reusable, composable agent groups:
# Create a subgraph for cloud operations
cloud_subgraph = StateGraph(AgentState)
cloud_subgraph.add_node("provision_vpc", provision_vpc_node)
cloud_subgraph.add_node("provision_cluster", provision_cluster_node)
# Linear flow: the cluster is provisioned only after the VPC exists
cloud_subgraph.add_edge("provision_vpc", "provision_cluster")
cloud_subgraph.add_edge("provision_cluster", END)
# Add subgraph to main graph: the compiled subgraph behaves as one node
main_graph.add_node("cloud_operations", cloud_subgraph.compile())
Building a Graph
Basic Structure
from langgraph.graph import StateGraph, END

# 1. Define the graph over the shared state schema
graph = StateGraph(AgentState)
# 2. Add nodes (name -> callable returning partial state updates)
graph.add_node("supervisor", supervisor_node)
graph.add_node("cloud_agent", cloud_agent_node)
graph.add_node("cicd_agent", cicd_agent_node)
# 3. Add edges
graph.add_edge("supervisor", "cloud_agent")
# The mapping keys must match route_to_next's return values exactly:
# it returns "cicd_agent" or the END sentinel — never the string "end".
# NOTE: route_to_next can also return "human_approval"; add that node
# and a matching mapping entry before routing to it.
graph.add_conditional_edges(
    "cloud_agent",
    route_to_next,
    {
        "cicd_agent": "cicd_agent",
        END: END
    }
)
# 4. Set entry point
graph.set_entry_point("supervisor")
# 5. Compile into a runnable app
app = graph.compile()
Conditional Routing
def supervisor_router(state: AgentState) -> str:
    """Map the request intent onto the agent that should handle it.

    Keyword groups are checked in priority order and the first match
    wins; an unrecognised intent terminates the run.
    """
    intent = state.get("intent", "")
    routes = [
        (("cloud", "infrastructure"), "cloud_agent"),
        (("deploy", "ci/cd"), "cicd_agent"),
        (("monitor",), "monitoring_agent"),
    ]
    for keywords, agent in routes:
        if any(keyword in intent for keyword in keywords):
            return agent
    return END
# Wire the supervisor's routing decision into the graph.  Every value
# supervisor_router can return must appear as a key in this mapping,
# including the END sentinel for the termination case.
graph.add_conditional_edges(
    "supervisor",
    supervisor_router,
    {
        "cloud_agent": "cloud_agent",
        "cicd_agent": "cicd_agent",
        "monitoring_agent": "monitoring_agent",
        END: END
    }
)
Multi-Agent Patterns
Pattern 1: Supervisor Architecture
Central supervisor decides which agent to call:
from langgraph.prebuilt import create_react_agent
def supervisor_node(state: AgentState):
    """Ask the LLM where to route the request and return a Command.

    The system prompt plus the full message history is sent to the
    model, and the reply text is scanned for routing keywords.  Order
    matters: "cloud" wins over "deploy"; anything else ends the run.
    """
    prompt = SystemMessage(
        content="Analyze the request and decide which agent to use"
    )
    reply = llm.invoke([prompt, *state["messages"]])
    decision = reply.content.lower()
    if "cloud" in decision:
        destination = "cloud_agent"
    elif "deploy" in decision:
        destination = "cicd_agent"
    else:
        destination = END
    return Command(goto=destination)
Pattern 2: Hierarchical Teams
Multiple team supervisors under a main supervisor:
# Team 1: Infrastructure — provider agents behind their own supervisor
infra_graph = StateGraph(AgentState)
infra_graph.add_node("aws_agent", aws_node)
infra_graph.add_node("gcp_agent", gcp_node)
infra_graph.add_node("infra_supervisor", infra_supervisor_node)
infra_team = infra_graph.compile()
# Team 2: DevOps — pipeline and monitoring agents behind a second supervisor
devops_graph = StateGraph(AgentState)
devops_graph.add_node("cicd_agent", cicd_node)
devops_graph.add_node("monitoring_agent", monitoring_node)
devops_graph.add_node("devops_supervisor", devops_supervisor_node)
devops_team = devops_graph.compile()
# Main graph with team subgraphs: each compiled team is a single node,
# so the main supervisor routes between teams, not individual agents
main_graph = StateGraph(AgentState)
main_graph.add_node("main_supervisor", main_supervisor_node)
main_graph.add_node("infra_team", infra_team)
main_graph.add_node("devops_team", devops_team)
Pattern 3: Tool-Calling (ReAct)
Agents exposed as tools:
from langgraph.prebuilt import create_react_agent
# Define agents as tools: @tool turns each function's signature and
# docstring into the schema the LLM sees when deciding what to call
@tool
def cloud_agent(request: str) -> str:
    """Handles cloud infrastructure requests"""
    return process_cloud_request(request)
@tool
def cicd_agent(request: str) -> str:
    """Handles CI/CD pipeline requests"""
    return process_cicd_request(request)
# Create ReAct agent with tools: the model picks a tool, observes its
# output, and iterates until it can answer
tools = [cloud_agent, cicd_agent]
react_agent = create_react_agent(llm, tools)
Checkpointing
Persist state for resumability:
from langgraph.checkpoint.memory import MemorySaver
from langgraph.checkpoint.postgres import PostgresSaver
# In-memory (development): checkpoints are lost when the process exits
memory = MemorySaver()
app = graph.compile(checkpointer=memory)
# PostgreSQL (production): checkpoints survive restarts
# NOTE(review): newer langgraph releases return a context manager from
# from_conn_string — confirm against the installed version
postgres = PostgresSaver.from_conn_string("postgresql://...")
app = graph.compile(checkpointer=postgres)
# Invoke with thread ID for persistence; the checkpointer keys saved
# state on this ID
config = {"configurable": {"thread_id": "user-123-session-456"}}
result = app.invoke(initial_state, config=config)
# Resume later with same thread ID: saved state is restored first
result = app.invoke(new_input, config=config)
Human-in-the-Loop
Interrupt for human approval:
from langgraph.types import interrupt
def approval_node(state: AgentState):
    """Pause execution until a human reviews the deployment plan.

    `interrupt` suspends the graph run and surfaces the payload to the
    caller; when the run is resumed, execution continues here with the
    human's response.  The pending flag is cleared either way.
    """
    payload = {
        "type": "approval_request",
        "plan": state.get("deployment_plan"),
        "options": ["approve", "reject", "modify"]
    }
    human_response = interrupt(payload)
    approved = human_response["decision"] == "approve"
    return {"approval_pending": False, "approved": approved}
# Configure interrupt points: pause every time before "approval_node"
# is about to run (static alternative to calling interrupt() inside it)
app = graph.compile(interrupt_before=["approval_node"])
Streaming
Stream events as they happen:
# NOTE: `async for` must run inside an async function; it is shown at
# top level here only for brevity
# Stream all events
async for event in app.astream(input_state, config=config):
    print(event)
# Stream specific event types via the events API
async for event in app.astream_events(input_state, config=config):
    if event["event"] == "on_chat_model_stream":
        # Token-by-token model output; suppress newlines between chunks
        print(event["data"]["chunk"].content, end="")
    elif event["event"] == "on_tool_end":
        print(f"Tool result: {event['data']['output']}")
Error Handling
def safe_agent_node(state: AgentState):
    """Run the agent's operation with explicit error handling.

    Returns a partial state update.  On success the result is stored
    and the error field is cleared.  Retryable failures are re-raised
    so the graph's retry policy can run the node again; fatal failures
    are recorded in state and routed to the error handler instead of
    crashing the run.
    """
    try:
        result = perform_operation(state)
    except RetryableError:
        # Propagate unchanged so the retry policy re-executes this node
        # (no `as e` binding needed — the exception is not inspected)
        raise
    except FatalError as e:
        # Store error and route to error handler
        return {
            "error": str(e),
            "next_agent": "error_handler"
        }
    return {"result": result, "error": None}
def error_handler_node(state: AgentState):
    """Turn a recorded error into a user-facing message.

    Reads the error a failing node stored in state and appends an AI
    message describing it, so the conversation surfaces the failure
    instead of silently dropping it.
    """
    error = state.get("error")
    notice = f"Error occurred: {error}. Please try again."
    return {"messages": [AIMessage(content=notice)]}
State Schema Best Practices
class ProductionState(TypedDict):
    """Example production-grade state schema: flat fields, nullable
    results, execution/error tracking, and explicit control flow.
    """

    # Use Annotated for complex aggregations (add_messages reducer)
    messages: Annotated[list, add_messages]
    # Keep state flat (avoid deep nesting)
    request_id: str
    user_id: str
    # Use Optional for nullable fields
    # (requires `from typing import Optional` — not shown above)
    cloud_result: Optional[dict]
    cicd_result: Optional[dict]
    # Track execution for debugging: sequence of visited nodes
    execution_path: list[str]
    # Error tracking: accumulated error messages
    errors: list[str]
    # Control flow (explicit): current node name and completion flag
    current_step: str
    is_complete: bool
Visualization
# Generate Mermaid diagram text for the compiled graph (paste into any
# Mermaid renderer or Markdown that supports it)
print(app.get_graph().draw_mermaid())
# Save as PNG (requires graphviz / pygraphviz to be installed)
app.get_graph().draw_png("workflow.png")
Best Practices
- ✅ Keep state schemas simple and flat
- ✅ Use StateGraph for explicit control flow
- ✅ Implement checkpoints for long workflows
- ✅ Define clear termination conditions
- ✅ Test all conditional branches
- ✅ Monitor state transitions for bottlenecks
- ✅ Use subgraphs for modularity
- ✅ Handle errors at each node