LangGraph Reference

LangGraph is an open-source framework by LangChain for building multi-agent systems using state machines and directed graphs.


Overview

LangGraph provides:

  • Fine-grained control over agent workflows
  • Persistent state management
  • Conditional routing and branching
  • Human-in-the-loop checkpoints
  • Streaming and async support

Core Concepts

State

A typed dictionary representing the current application snapshot:

from typing import TypedDict, Annotated
from langgraph.graph.message import add_messages

class AgentState(TypedDict):
    # Message history with automatic aggregation
    messages: Annotated[list, add_messages]

    # Request context
    request_id: str
    user_id: str
    intent: str

    # Intermediate results
    cloud_resources: dict
    deployment_status: str

    # Control flow
    next_agent: str
    approval_pending: bool
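The `Annotated` reducer controls how a node's partial update is merged into a state channel. Conceptually it works like the plain-Python sketch below (hypothetical `append_reducer` and `apply_update` helpers; the real `add_messages` additionally handles message IDs and deduplication):

```python
def append_reducer(existing: list, update: list) -> list:
    """Toy reducer: concatenate new messages onto the history."""
    return existing + update

def apply_update(state: dict, update: dict, reducers: dict) -> dict:
    """Merge a node's partial update into state, honoring per-key reducers."""
    merged = dict(state)
    for key, value in update.items():
        if key in reducers:
            merged[key] = reducers[key](merged.get(key, []), value)
        else:
            merged[key] = value  # un-annotated fields are simply overwritten
    return merged

state = {"messages": ["hi"], "intent": ""}
state = apply_update(
    state,
    {"messages": ["hello!"], "intent": "greet"},
    {"messages": append_reducer},
)
# messages accumulated to ["hi", "hello!"]; intent was overwritten
```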

Nodes

Python functions that process state:

from langchain_core.messages import AIMessage

def cloud_agent_node(state: AgentState) -> dict:
    """Node that handles cloud provisioning"""
    # Process the request
    result = provision_infrastructure(state["messages"])

    # Return state updates
    return {
        "cloud_resources": result,
        "messages": [AIMessage(content=f"Provisioned: {result}")]
    }

Edges

Functions determining the next node:

def route_to_next(state: AgentState) -> str:
    """Determine next node based on state"""
    if state.get("approval_pending"):
        return "human_approval"
    elif state.get("cloud_resources"):
        return "cicd_agent"
    else:
        return END
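Under the hood, a compiled graph alternates between running the current node and asking the routing function where to go next. A toy driver loop sketches the idea (hypothetical `provision`/`deploy` nodes, with a string sentinel standing in for `langgraph.graph.END`):

```python
END = "__end__"  # stand-in for langgraph.graph.END

def provision(state: dict) -> dict:
    return {**state, "cloud_resources": {"vpc": "vpc-1"}}

def deploy(state: dict) -> dict:
    return {**state, "deployment_status": "deployed", "done": True}

def route(state: dict) -> str:
    if state.get("done"):
        return END
    if state.get("cloud_resources"):
        return "cicd_agent"
    return "cloud_agent"

nodes = {"cloud_agent": provision, "cicd_agent": deploy}
state: dict = {}
step = route(state)
while step != END:
    state = nodes[step](state)
    step = route(state)
# executed cloud_agent, then cicd_agent, then stopped at END
```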

Subgraphs

Reusable, composable agent groups:

# Create a subgraph for cloud operations
cloud_subgraph = StateGraph(AgentState)
cloud_subgraph.add_node("provision_vpc", provision_vpc_node)
cloud_subgraph.add_node("provision_cluster", provision_cluster_node)
cloud_subgraph.add_edge("provision_vpc", "provision_cluster")
cloud_subgraph.add_edge("provision_cluster", END)

# Add subgraph to main graph
main_graph.add_node("cloud_operations", cloud_subgraph.compile())

Building a Graph

Basic Structure

from langgraph.graph import StateGraph, END

# 1. Define the graph
graph = StateGraph(AgentState)

# 2. Add nodes
graph.add_node("supervisor", supervisor_node)
graph.add_node("cloud_agent", cloud_agent_node)
graph.add_node("cicd_agent", cicd_agent_node)

# 3. Add edges
graph.add_edge("supervisor", "cloud_agent")
graph.add_conditional_edges(
    "cloud_agent",
    route_to_next,
    {
        "cicd_agent": "cicd_agent",
        END: END  # route_to_next returns the END constant, not the string "end"
    }
)

# 4. Set entry point
graph.set_entry_point("supervisor")

# 5. Compile
app = graph.compile()

Conditional Routing

def supervisor_router(state: AgentState) -> str:
    """Route based on intent analysis"""
    intent = state.get("intent", "")

    if "cloud" in intent or "infrastructure" in intent:
        return "cloud_agent"
    elif "deploy" in intent or "ci/cd" in intent:
        return "cicd_agent"
    elif "monitor" in intent:
        return "monitoring_agent"
    else:
        return END

graph.add_conditional_edges(
    "supervisor",
    supervisor_router,
    {
        "cloud_agent": "cloud_agent",
        "cicd_agent": "cicd_agent",
        "monitoring_agent": "monitoring_agent",
        END: END
    }
)
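Because routers are pure functions of state, every branch (including the fallthrough) can be unit-tested without compiling a graph. A self-contained sketch, with a string sentinel in place of the END constant:

```python
END = "__end__"  # stand-in for langgraph.graph.END

def supervisor_router(state: dict) -> str:
    """Route based on intent analysis"""
    intent = state.get("intent", "")
    if "cloud" in intent or "infrastructure" in intent:
        return "cloud_agent"
    elif "deploy" in intent or "ci/cd" in intent:
        return "cicd_agent"
    elif "monitor" in intent:
        return "monitoring_agent"
    return END

# Exercise every branch, including the default
assert supervisor_router({"intent": "provision cloud vms"}) == "cloud_agent"
assert supervisor_router({"intent": "deploy the api"}) == "cicd_agent"
assert supervisor_router({"intent": "monitor latency"}) == "monitoring_agent"
assert supervisor_router({"intent": "write a poem"}) == END
assert supervisor_router({}) == END
```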

Multi-Agent Patterns

Pattern 1: Supervisor Architecture

Central supervisor decides which agent to call:

from langchain_core.messages import SystemMessage
from langgraph.graph import END
from langgraph.types import Command

def supervisor_node(state: AgentState):
    """Supervisor analyzes and routes"""
    response = llm.invoke([
        SystemMessage(content="Analyze the request and decide which agent to use"),
        *state["messages"]
    ])

    # Extract routing decision
    if "cloud" in response.content.lower():
        return Command(goto="cloud_agent")
    elif "deploy" in response.content.lower():
        return Command(goto="cicd_agent")
    else:
        return Command(goto=END)

Pattern 2: Hierarchical Teams

Multiple team supervisors under a main supervisor:

# Team 1: Infrastructure
infra_graph = StateGraph(AgentState)
infra_graph.add_node("aws_agent", aws_node)
infra_graph.add_node("gcp_agent", gcp_node)
infra_graph.add_node("infra_supervisor", infra_supervisor_node)
infra_team = infra_graph.compile()

# Team 2: DevOps
devops_graph = StateGraph(AgentState)
devops_graph.add_node("cicd_agent", cicd_node)
devops_graph.add_node("monitoring_agent", monitoring_node)
devops_graph.add_node("devops_supervisor", devops_supervisor_node)
devops_team = devops_graph.compile()

# Main graph with team subgraphs
main_graph = StateGraph(AgentState)
main_graph.add_node("main_supervisor", main_supervisor_node)
main_graph.add_node("infra_team", infra_team)
main_graph.add_node("devops_team", devops_team)

Pattern 3: Tool-Calling (ReAct)

Agents exposed as tools:

from langchain_core.tools import tool
from langgraph.prebuilt import create_react_agent

# Define agents as tools
@tool
def cloud_agent(request: str) -> str:
    """Handles cloud infrastructure requests"""
    return process_cloud_request(request)

@tool
def cicd_agent(request: str) -> str:
    """Handles CI/CD pipeline requests"""
    return process_cicd_request(request)

# Create ReAct agent with tools
tools = [cloud_agent, cicd_agent]
react_agent = create_react_agent(llm, tools)
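When the model emits a tool call, the ReAct loop looks the tool up by name and invokes it with the generated arguments. A minimal dispatch sketch (hypothetical stub implementations standing in for the real agents):

```python
def cloud_agent(request: str) -> str:
    """Stub for the cloud tool above."""
    return f"provisioned: {request}"

def cicd_agent(request: str) -> str:
    """Stub for the CI/CD tool above."""
    return f"pipeline created: {request}"

TOOLS = {"cloud_agent": cloud_agent, "cicd_agent": cicd_agent}

def dispatch(tool_call: dict) -> str:
    """Route a model-emitted tool call to the matching function."""
    return TOOLS[tool_call["name"]](tool_call["args"]["request"])

result = dispatch({"name": "cloud_agent", "args": {"request": "new VPC"}})
# result == "provisioned: new VPC"
```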

Checkpointing

Persist state for resumability:

from langgraph.checkpoint.memory import MemorySaver
from langgraph.checkpoint.postgres import PostgresSaver

# In-memory (development)
memory = MemorySaver()
app = graph.compile(checkpointer=memory)

# PostgreSQL (production)
postgres = PostgresSaver.from_conn_string("postgresql://...")
app = graph.compile(checkpointer=postgres)

# Invoke with thread ID for persistence
config = {"configurable": {"thread_id": "user-123-session-456"}}
result = app.invoke(initial_state, config=config)

# Resume later with same thread ID
result = app.invoke(new_input, config=config)
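The essential contract of a checkpointer is small: before each run, load the last saved state for the thread; after it, save the new one. A toy in-memory sketch (hypothetical `ToyCheckpointer`, with a trivial dict-merge standing in for graph execution):

```python
class ToyCheckpointer:
    """Minimal stand-in: keeps the latest state per thread_id."""
    def __init__(self):
        self._store: dict[str, dict] = {}

    def save(self, thread_id: str, state: dict) -> None:
        self._store[thread_id] = dict(state)

    def load(self, thread_id: str) -> dict:
        return dict(self._store.get(thread_id, {}))

ckpt = ToyCheckpointer()

def invoke(thread_id: str, update: dict) -> dict:
    state = ckpt.load(thread_id)  # resume prior state, if any
    state.update(update)          # "run the graph" (trivially, here)
    ckpt.save(thread_id, state)   # persist for the next call
    return state

invoke("user-123-session-456", {"step": 1})
resumed = invoke("user-123-session-456", {"status": "done"})
# resumed == {"step": 1, "status": "done"} — the first call's state survived
```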

Human-in-the-Loop

Interrupt for human approval:

from langgraph.types import interrupt

def approval_node(state: AgentState):
    """Pause for human approval"""
    plan = state.get("deployment_plan")

    # Interrupt and wait for human input
    # (dynamic interrupts require compiling the graph with a checkpointer)
    human_response = interrupt({
        "type": "approval_request",
        "plan": plan,
        "options": ["approve", "reject", "modify"]
    })

    if human_response["decision"] == "approve":
        return {"approval_pending": False, "approved": True}
    else:
        return {"approval_pending": False, "approved": False}

# Alternatively, configure static interrupt points at compile time
app = graph.compile(interrupt_before=["approval_node"])

Streaming

Stream events as they happen:

# Stream all events
async for event in app.astream(input_state, config=config):
    print(event)

# Stream specific event types
async for event in app.astream_events(input_state, config=config):
    if event["event"] == "on_chat_model_stream":
        print(event["data"]["chunk"].content, end="")
    elif event["event"] == "on_tool_end":
        print(f"Tool result: {event['data']['output']}")
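The consumer pattern above amounts to filtering a stream of event dicts by their `event` key. A synchronous sketch with a hand-written event sequence (in the real API, `chunk` is a message object with a `.content` attribute; plain strings are used here for brevity):

```python
def fake_event_stream():
    """Hypothetical event sequence shaped like astream_events output."""
    yield {"event": "on_chat_model_stream", "data": {"chunk": "Provisioning"}}
    yield {"event": "on_chat_model_stream", "data": {"chunk": " VPC"}}
    yield {"event": "on_tool_end", "data": {"output": "vpc-1 created"}}

tokens, tool_outputs = [], []
for event in fake_event_stream():
    if event["event"] == "on_chat_model_stream":
        tokens.append(event["data"]["chunk"])
    elif event["event"] == "on_tool_end":
        tool_outputs.append(event["data"]["output"])

text = "".join(tokens)
# text == "Provisioning VPC"; tool_outputs == ["vpc-1 created"]
```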

Error Handling

def safe_agent_node(state: AgentState):
    """Node with error handling"""
    try:
        result = perform_operation(state)
        return {"result": result, "error": None}
    except RetryableError:
        # Will be retried
        raise
    except FatalError as e:
        # Store error and route to error handler
        return {
            "error": str(e),
            "next_agent": "error_handler"
        }

def error_handler_node(state: AgentState):
    """Handle errors gracefully"""
    error = state.get("error")
    return {
        "messages": [AIMessage(content=f"Error occurred: {error}. Please try again.")]
    }
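Re-raising a `RetryableError` only helps if something above the node actually retries it. One possible shape is a small wrapper around the node function (a sketch with hypothetical names, not LangGraph's built-in retry mechanism):

```python
class RetryableError(Exception):
    """Transient failure worth retrying."""

def with_retries(node, max_attempts: int = 3):
    """Wrap a node so RetryableError triggers re-execution."""
    def wrapped(state: dict) -> dict:
        for attempt in range(1, max_attempts + 1):
            try:
                return node(state)
            except RetryableError:
                if attempt == max_attempts:
                    # Out of attempts: hand off to the error handler
                    return {"error": "retries exhausted",
                            "next_agent": "error_handler"}
    return wrapped

calls = {"n": 0}

def flaky_node(state: dict) -> dict:
    calls["n"] += 1
    if calls["n"] < 3:
        raise RetryableError("transient")
    return {"result": "ok", "error": None}

result = with_retries(flaky_node)({})
# succeeded on the third attempt
```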

State Schema Best Practices

from typing import Annotated, Optional, TypedDict
from langgraph.graph.message import add_messages

class ProductionState(TypedDict):
    # Use Annotated for complex aggregations
    messages: Annotated[list, add_messages]

    # Keep state flat (avoid deep nesting)
    request_id: str
    user_id: str

    # Use Optional for nullable fields
    cloud_result: Optional[dict]
    cicd_result: Optional[dict]

    # Track execution for debugging
    execution_path: list[str]

    # Error tracking
    errors: list[str]

    # Control flow (explicit)
    current_step: str
    is_complete: bool

Visualization

# Generate Mermaid diagram
print(app.get_graph().draw_mermaid())

# Save as PNG (requires graphviz)
app.get_graph().draw_png("workflow.png")

Best Practices

  • ✅ Keep state schemas simple and flat
  • ✅ Use StateGraph for explicit control flow
  • ✅ Implement checkpoints for long workflows
  • ✅ Define clear termination conditions
  • ✅ Test all conditional branches
  • ✅ Monitor state transitions for bottlenecks
  • ✅ Use subgraphs for modularity
  • ✅ Handle errors at each node

External Resources