flowchart TD
START((Start)) --> B[classify_intent]
B -->|question/feature| C[search_documentation]
B -->|bug| D[track_bug]
B -->|other| E[draft_response]
C --> E
D --> E
E --> F{High Urgency<br/>or Complex?}
F -->|Yes| G[human_review]
F -->|No| H((End / Send))
G --> H
Thinking in LangGraph
Imagine you need to automate a customer support email inbox. The system should:
- Read incoming emails
- Classify them by urgency and topic
- Search relevant documentation to answer questions
- Draft appropriate responses
- Escalate complex or urgent issues to a human for review before sending
Here is the complete implementation of this agent using the LangGraph Functional API.
The Agent Implementation
import uuid
from typing import Literal, TypedDict
from langchain_openai import ChatOpenAI
from langgraph.func import entrypoint, task
from langgraph.types import interrupt
from langgraph.checkpoint.memory import InMemorySaver
llm = ChatOpenAI(model="gpt-4o-mini")
# 1. Data Structures
class EmailClassification(TypedDict):
    intent: Literal["question", "bug", "billing", "feature", "complex"]
    urgency: Literal["low", "medium", "high", "critical"]
    topic: str
    summary: str
# 2. Define discrete tasks
@task
def classify_intent(email_content: str, sender_email: str) -> EmailClassification:
    """Use an LLM to categorize urgency and topic."""
    structured_llm = llm.with_structured_output(EmailClassification)
    prompt = f"""
    Analyze this customer email and classify it:
    Email: {email_content}
    From: {sender_email}
    """
    return structured_llm.invoke(prompt)
@task
def search_documentation(classification: EmailClassification) -> list[str]:
    """Search knowledge base for relevant information."""
    query = f"{classification.get('intent')} {classification.get('topic')}"
    # Dummy search results for demonstration
    return ["Reset password via Settings > Security > Change Password"]
@task
def track_bug(classification: EmailClassification) -> str:
    """Create a ticket in the bug tracking system."""
    ticket_id = "BUG-12345"
    return f"Bug ticket {ticket_id} created"
@task
def draft_response(email_content: str, classification: EmailClassification, context: list[str]) -> str:
    """Generate response using context."""
    formatted_context = "\n".join(f"- {c}" for c in context)
    prompt = f"""
    Draft a response to this customer email:
    {email_content}
    Email intent: {classification.get('intent')}
    Urgency level: {classification.get('urgency')}
    Context:
    {formatted_context}
    Guidelines:
    - Be professional and helpful
    - Address their specific concern
    """
    return llm.invoke(prompt).content
@task
def human_review(draft: str, classification: EmailClassification):
    """Pause for human review and approval."""
    decision = interrupt({
        "action": "Please review and approve/edit this response",
        "draft_response": draft,
        "urgency": classification.get('urgency'),
        "intent": classification.get('intent'),
    })
    return decision
# 3. Wire it all together with an entrypoint
checkpointer = InMemorySaver()
@entrypoint(checkpointer=checkpointer)
def email_workflow(inputs: dict) -> dict:
    """Main workflow routing logic."""
    email_content = inputs["email_content"]
    sender_email = inputs["sender_email"]
    # Classify the email
    classification = classify_intent(email_content, sender_email).result()
    # Gather context based on classification
    context = []
    if classification['intent'] in ['question', 'feature']:
        context = search_documentation(classification).result()
    elif classification['intent'] == 'bug':
        context = [track_bug(classification).result()]
    # Draft the response
    draft = draft_response(email_content, classification, context).result()
    # Determine if human review is needed
    needs_review = classification['urgency'] in ['high', 'critical'] or classification['intent'] == 'complex'
    if needs_review:
        decision = human_review(draft, classification).result()
        if decision.get("approved"):
            draft = decision.get("edited_response", draft)
        else:
            return {"status": "escalated_to_human", "final_response": None}
    # Send email (mocked)
    print(f"Sending reply: {draft[:50]}...")
    return {"status": "sent", "final_response": draft}

How It Works
This implementation demonstrates several core concepts of the LangGraph Functional API. The flowchart at the top of this article shows the visual flow of how the code executes.
Breaking work into discrete steps (@task)
We wrapped distinct actions (like classify_intent and search_documentation) with the @task decorator.
Why? Tasks are independently observable and resilient. Decorating a function with @task means LangGraph automatically tracks its execution, allowing you to add retry policies for transient network errors or cache expensive LLM calls. If part of the workflow fails or is interrupted, LangGraph uses the checkpointer to remember the results of previously completed tasks so they are not re-run.
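To build intuition for what the decorator buys you, here is a rough plain-Python analogy, not LangGraph's actual implementation: a wrapper that retries on failure and caches completed results, so finished work is never repeated. The names `toy_task` and `flaky_classify` are invented for illustration; only the retry-plus-memoization idea mirrors the real behavior.

```python
import functools

def toy_task(max_attempts: int = 3):
    """Toy stand-in for @task: retry on failure, cache completed results."""
    def decorator(fn):
        cache = {}  # completed results, keyed by arguments (like a checkpoint)

        @functools.wraps(fn)
        def wrapper(*args):
            if args in cache:           # already completed: don't re-run
                return cache[args]
            last_error = None
            for _ in range(max_attempts):
                try:
                    result = fn(*args)
                    cache[args] = result
                    return result
                except Exception as e:  # transient failure: try again
                    last_error = e
            raise last_error
        return wrapper
    return decorator

attempts = 0

@toy_task(max_attempts=3)
def flaky_classify(email: str) -> str:
    """Fails twice, then succeeds -- simulating a flaky network call."""
    global attempts
    attempts += 1
    if attempts < 3:
        raise ConnectionError("transient network error")
    return "question"

print(flaky_classify("How do I reset my password?"))  # succeeds on the 3rd try
print(attempts)
```

A second call with the same arguments returns the cached result without re-running the function, which is the same property that lets a resumed LangGraph run skip already-completed tasks.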
Defining the control flow (@entrypoint)
The @entrypoint acts as the orchestrator of your tasks.
Why? Instead of manually defining graph edges and nodes (as in older state-machine APIs), the Functional API lets you write standard Python control logic (if/elif/else). This makes routing decisions explicit and intuitive. The checkpointer=InMemorySaver() argument automatically persists the state of the entrypoint and its tasks.
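One practical consequence: because the routing is ordinary Python, you can factor the branching out and unit-test it without running any model calls. A sketch that mirrors the conditions in email_workflow above (`route` and `needs_review` are helper names introduced here for illustration):

```python
def route(classification: dict) -> str:
    """Mirror of the entrypoint's if/elif branching for the context step."""
    if classification["intent"] in ("question", "feature"):
        return "search_documentation"
    elif classification["intent"] == "bug":
        return "track_bug"
    return "draft_response"  # everything else goes straight to drafting

def needs_review(classification: dict) -> bool:
    """Mirror of the human-review condition."""
    return (classification["urgency"] in ("high", "critical")
            or classification["intent"] == "complex")

print(route({"intent": "bug", "urgency": "low"}))                   # track_bug
print(needs_review({"intent": "billing", "urgency": "critical"}))   # True
```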
Wait for results before proceeding (.result())
Inside the entrypoint, we call .result() on the handle that each task call returns (e.g., classify_intent(...).result()).
Why? Tasks run asynchronously by default to support parallel execution (e.g., running multiple searches at once). Calling .result() blocks the workflow until that specific task is complete, ensuring the downstream functions have the necessary data to proceed.
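The behavior is close to Python's standard concurrent.futures: submitting work returns a handle immediately, and .result() blocks until the value is ready. As a plain-Python analogy (no LangGraph involved; `search` is a made-up slow call):

```python
from concurrent.futures import ThreadPoolExecutor
import time

def search(source: str) -> str:
    time.sleep(0.1)  # simulate a slow I/O call
    return f"results from {source}"

with ThreadPoolExecutor() as pool:
    # Each submit() returns a future at once, so the two searches overlap
    docs_future = pool.submit(search, "docs")
    faq_future = pool.submit(search, "faq")
    # .result() blocks until that specific task finishes
    context = [docs_future.result(), faq_future.result()]

print(context)  # ['results from docs', 'results from faq']
```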
Keeping humans in the loop (interrupt)
The human_review task uses the interrupt() function to pause the workflow.
Why? When the code hits interrupt(), the entrypoint completely halts execution and saves everything to the checkpointer. It will pause indefinitely until a human provides the needed input. Once the input is supplied via a resume command, the workflow picks up exactly where it left off, avoiding the need to re-classify the email or regenerate the draft.
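As a loose analogy only (LangGraph's actual mechanism is checkpoint-based, not generator-based), the pause/resume shape resembles a Python generator stopping at yield and later continuing with a value passed in via send():

```python
def review_workflow(draft: str):
    # Pause: hand the draft to a human, like interrupt({...})
    decision = yield {"action": "review", "draft_response": draft}
    # Resume: execution continues right here with the human's decision
    if decision["approved"]:
        return {"status": "sent",
                "final_response": decision.get("edited_response", draft)}
    return {"status": "escalated_to_human", "final_response": None}

wf = review_workflow("We apologize for the double charge...")
payload = next(wf)            # runs until the yield; payload holds the draft
try:
    wf.send({"approved": True, "edited_response": "Refund issued."})
except StopIteration as done:
    result = done.value       # a generator's return value rides on StopIteration

print(payload["action"])         # review
print(result["final_response"])  # Refund issued.
```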
Running the Workflow
Let’s see the agent in action with an urgent scenario:
# 1. Initialize the run
initial_state = {
    "email_content": "I was charged twice for my subscription! This is urgent!",
    "sender_email": "customer@example.com"
}
# 2. Run with a thread_id for persistence
config = {"configurable": {"thread_id": "customer_123"}}
# 3. The graph will stop at the interrupt()
result = email_workflow.invoke(initial_state, config)
print(f"Paused for human review: {result}")
# 4. Provide the human's decision using Command
from langgraph.types import Command
human_response = Command(
    resume={
        "approved": True,
        "edited_response": "We sincerely apologize for the double charge. I've initiated an immediate refund."
    }
)
# 5. Resume execution with the same config
final_result = email_workflow.invoke(human_response, config)
print(f"Workflow completed: {final_result}")

Notice how we reuse the same thread_id in the config.
Why? The thread_id links the separate invocations together. The checkpointer uses it to look up the paused execution state. Without it, the workflow wouldn’t know which paused execution to resume.
Handle errors appropriately
Different errors need different handling strategies:
| Error Type | Who Fixes It | Strategy | When to Use |
|---|---|---|---|
| Transient errors (network issues, rate limits) | System (automatic) | RetryPolicy() | Temporary failures that usually resolve on retry |
| LLM-recoverable errors (tool failures, parsing issues) | LLM | Store error in state and loop back | LLM can see the error and adjust its approach |
| User-fixable errors (missing information, unclear instructions) | Human | Pause with interrupt() | Need user input to proceed |
| Unexpected errors | Developer | Let them bubble up | Unknown issues that need debugging |
Transient errors
Add a retry policy to automatically retry network issues and rate limits. The @task decorator makes this straightforward to configure in the Functional API:
from langgraph.func import task
from langgraph.types import RetryPolicy
@task(retry=RetryPolicy(max_attempts=3, initial_interval=1.0))
def search_documentation(classification: dict) -> list[str]:
    # Network calls to knowledge base
    return search_api(classification)

LLM-recoverable
Store the error in state and loop back so the LLM can see what went wrong and try again. The Functional API lets you handle this using standard Python try/except blocks inside your execution loop:
from langgraph.func import entrypoint
@entrypoint()
def agent_workflow(messages: list) -> list:
    for _ in range(5):
        response = llm.invoke(messages)
        messages.append(response)
        if not response.tool_calls:
            return messages
        try:
            # Attempt to execute the tool
            result = run_tool(response.tool_calls[0])
            messages.append({"role": "tool", "content": result})
        except ToolError as e:
            # Let the LLM see what went wrong and try again
            messages.append({"role": "tool", "content": f"Tool error: {str(e)}"})
    return messages

User-fixable
Pause and collect information from the user when needed (like account IDs, order numbers, or clarifications). Because the Functional API pauses execution in-place, you don’t need any complex state management to resume:
from langgraph.func import task
from langgraph.types import interrupt
@task
def lookup_customer_history(customer_id: str | None) -> dict:
    if not customer_id:
        user_input = interrupt({
            "message": "Customer ID needed",
            "request": "Please provide the customer's account ID to look up their subscription history"
        })
        # The workflow resumes right here when the user provides input
        customer_id = user_input['customer_id']
    # Now proceed with the lookup
    return fetch_customer_history(customer_id)

Unexpected
Let them bubble up for debugging. Don’t catch what you can’t handle:
from langgraph.func import task
@task
def send_reply(draft_response: str) -> None:
    try:
        email_service.send(draft_response)
    except Exception:
        raise  # Surface unexpected errors