LangGraph integration
Temporal's integration with LangGraph runs your LangGraph nodes and tasks as Temporal Activities, giving your AI agent workflows durable execution, automatic retries, and timeouts.
The plugin supports both the LangGraph Graph API (StateGraph with nodes and edges) and the Functional API
(@entrypoint / @task decorators). Each graph node and task must specify whether it runs as a Temporal Activity or
directly inside the Workflow — Activity nodes get configurable timeouts and retry policies, while Workflow nodes run
inline and must be deterministic.
The Temporal Python SDK's LangGraph integration is currently experimental. The API may change in future versions.
Code snippets in this guide are taken from the LangGraph plugin samples. Refer to the samples for the complete code.
Prerequisites
- This guide assumes you are already familiar with LangGraph. If you aren't, refer to the LangGraph documentation for more details.
- If you are new to Temporal, we recommend reading Understanding Temporal or taking the Temporal 101 course.
- Ensure you have set up your local development environment by following the Set up your local development environment guide. When you're done, leave the Temporal Development Server running if you want to test your code locally.
Install the plugin
Install the Temporal Python SDK with LangGraph support:
uv add "temporalio[langgraph]"
or with pip:
pip install "temporalio[langgraph]"
Graph API
The Graph API uses StateGraph to define nodes and edges declaratively.
Define a graph and Workflow
Build a StateGraph, then retrieve it inside your Workflow with the graph() helper:
from datetime import timedelta
from langgraph.graph import START, StateGraph
from temporalio import workflow
from temporalio.contrib.langgraph import graph
async def process_query(query: str) -> str:
    """Process a query and return a response."""
    return f"Processed: {query}"
def build_graph() -> StateGraph:
    """Construct a single-node graph."""
    g = StateGraph(str)
    g.add_node(
        "process_query",
        process_query,
        metadata={
            "execute_in": "activity",
            "start_to_close_timeout": timedelta(seconds=10),
        },
    )
    g.add_edge(START, "process_query")
    return g
@workflow.defn
class HelloWorldWorkflow:
    @workflow.run
    async def run(self, query: str) -> str:
        return await graph("hello-world").compile().ainvoke(query)
Configure the Worker
Create a LangGraphPlugin with your graphs and pass it to the Worker:
import asyncio
from temporalio.client import Client
from temporalio.contrib.langgraph import LangGraphPlugin
from temporalio.worker import Worker
async def main() -> None:
    client = await Client.connect("localhost:7233")
    plugin = LangGraphPlugin(graphs={"hello-world": build_graph()})
    worker = Worker(
        client,
        task_queue="langgraph-hello-world",
        workflows=[HelloWorldWorkflow],
        plugins=[plugin],
    )
    await worker.run()

if __name__ == "__main__":
    asyncio.run(main())
Set Activity options
Pass Activity options as node metadata when calling add_node. Every node must include "execute_in" set to either
"activity" or "workflow"; the plugin raises an error if it's missing.
from datetime import timedelta
from temporalio.common import RetryPolicy
g = StateGraph(str)
g.add_node(
    "my_node",
    my_node,
    metadata={
        "execute_in": "activity",
        "start_to_close_timeout": timedelta(seconds=30),
        "retry_policy": RetryPolicy(maximum_attempts=3),
    },
)
Don't pass a LangGraph retry_policy= to add_node (or @task(retry_policy=...)) — the plugin raises an error if you
do. Use Temporal's RetryPolicy via the node's metadata (Graph API) or activity_options (Functional API) instead.
Functional API
The Functional API uses @entrypoint and @task decorators, which let you express agent loops with native Python
control flow (while, if/else, for).
Define tasks and a Workflow
from datetime import timedelta
from langgraph.func import entrypoint as lg_entrypoint
from langgraph.func import task
from temporalio import workflow
from temporalio.contrib.langgraph import entrypoint
@task
def agent_think(query: str, history: list[str]) -> dict:
    """Decide the next action based on query and tool history."""
    tool_results = [h for h in history if h.startswith("[Tool]")]
    if len(tool_results) < 2:
        return {"action": "tool", "tool_name": "search", "tool_input": query}
    return {"action": "final", "answer": f"Found: {'; '.join(tool_results)}"}

@task
def execute_tool(tool_name: str, tool_input: str) -> str:
    """Execute a tool by name."""
    return f"[Tool] Result for {tool_name}({tool_input})"

@lg_entrypoint()
async def react_agent(query: str) -> dict:
    """ReAct agent loop: think -> act -> observe -> repeat."""
    history: list[str] = []
    while True:
        decision = await agent_think(query, history)
        if decision["action"] == "final":
            return {"answer": decision["answer"], "steps": len(history)}
        result = await execute_tool(decision["tool_name"], decision["tool_input"])
        history.append(result)

all_tasks = [agent_think, execute_tool]
activity_options = {
    t.func.__name__: {
        "execute_in": "activity",
        "start_to_close_timeout": timedelta(seconds=30),
    }
    for t in all_tasks
}

@workflow.defn
class ReactAgentWorkflow:
    @workflow.run
    async def run(self, query: str) -> dict:
        return await entrypoint("react-agent").ainvoke(query)
Configure the Worker with the Functional API
from temporalio.contrib.langgraph import LangGraphPlugin
plugin = LangGraphPlugin(
    entrypoints={"react-agent": react_agent},
    tasks=all_tasks,
    activity_options=activity_options,
)

worker = Worker(
    client,
    task_queue="langgraph-react-agent",
    workflows=[ReactAgentWorkflow],
    plugins=[plugin],
)
Checkpointer
If your LangGraph code requires a checkpointer (for example, if you're using interrupts), use InMemorySaver. Temporal
handles durability, so third-party checkpointers (like PostgreSQL or Redis) are not needed.
import langgraph.checkpoint.memory
g = graph("my-graph").compile(
    checkpointer=langgraph.checkpoint.memory.InMemorySaver(),
)
Runtime context
LangGraph's run-scoped context (context_schema) is reconstructed on the Activity side, so nodes and tasks can read
from runtime.context:
from langgraph.runtime import Runtime
from typing_extensions import TypedDict
from temporalio.contrib.langgraph import graph
class Context(TypedDict):
    user_id: str

class State(TypedDict):
    # Minimal state schema for this example.
    user: str

async def my_node(state: State, runtime: Runtime[Context]) -> dict:
    return {"user": runtime.context["user_id"]}

# In the Workflow:
g = graph("my-graph").compile()
await g.ainvoke({...}, context=Context(user_id="alice"))
Your context object must be serializable by the configured Temporal payload converter, since it crosses the Activity
boundary.
Stores are not supported
LangGraph's Store (for example, InMemoryStore passed via graph.compile(store=...) or @entrypoint(store=...))
isn't accessible inside Activity-wrapped nodes: the Store holds live state that can't cross the Activity boundary, and
Activities may run on a different worker than the Workflow. If you pass a store, the plugin logs a warning on first use
and runtime.store is None inside nodes.
Use Workflow state for per-run memory, or an external database (Postgres, Redis, etc.) configured on each worker if you need shared memory across runs.
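As a sketch of the per-run alternative, memory can live in a graph state channel with a reducer so each node's partial update accumulates across the run. The channel name `memory` and the state shape here are assumptions for illustration; Temporal's event history makes this state durable without any external checkpointer:

```python
from operator import add
from typing import Annotated, TypedDict


class AgentState(TypedDict):
    query: str
    # The "add" reducer concatenates each node's partial update onto
    # the existing list, giving per-run accumulating memory.
    memory: Annotated[list[str], add]


async def remember(state: AgentState) -> dict:
    # Returning a partial update appends to "memory" via the reducer.
    return {"memory": [f"saw: {state['query']}"]}
```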
Activity vs Workflow execution
Every graph node and @task must specify execute_in — set it to "activity" to run as a Temporal Activity, or
"workflow" to run directly inside the Workflow. The plugin raises an error if you forget to set it.
execute_in must be set per node or task; it cannot be set in default_activity_options.
Understanding when to use each mode is important for correctness and durability.
When to use an Activity
Use execute_in: "activity" when a node does any of the following:
- Makes network calls — LLM calls, HTTP requests, database queries, or any I/O. Activities can do I/O; Workflows cannot.
- Has non-deterministic behavior — anything that can return different results on re-execution (random numbers, current time, external data). Workflows must be deterministic.
- Is long-running or may fail — Activities get configurable timeouts, automatic retries, and heartbeating. If an LLM call times out or a service is unavailable, Temporal retries the Activity without re-running the entire Workflow.
- Calls interrupt() — LangGraph's interrupt() is supported in Activity nodes. The plugin serializes the interrupt and propagates it back to the Workflow for human-in-the-loop patterns.
When to run in the Workflow
Use execute_in: "workflow" when a node:
- Orchestrates other graphs — a node that calls graph("child").compile().ainvoke(state) to dispatch to a subgraph. The subgraph's own nodes still run as Activities, but the orchestration logic runs in the Workflow.
- Performs pure state transformations — deterministic data reshaping, merging, or filtering with no I/O.
- Is a lightweight routing step — when a node's only job is to decide what happens next and you want to avoid the overhead of an Activity round-trip.
Workflow code must be deterministic. A node running in
the Workflow must not make network calls, use random, read the system clock, or do file I/O. Violating this causes
non-determinism errors on replay.
Decision tree: should this node be an Activity?
Does this node make network calls (LLM, HTTP, DB)?
├── Yes → execute_in: "activity"
└── No
    Does this node have non-deterministic behavior?
    ├── Yes → execute_in: "activity"
    └── No
        Does this node call interrupt()?
        ├── Yes → execute_in: "activity"
        └── No
            Is this node orchestrating a subgraph or doing a pure state transform?
            ├── Yes → execute_in: "workflow"
            └── No → execute_in: "activity" (the safe choice)
When in doubt, choose "activity". The Activity overhead is small, and it gives you retries, timeouts, and
correctness guarantees.
Where LangGraph primitives run
Not all LangGraph primitives are node functions. Some run in the Workflow context regardless of the execute_in setting:
| Primitive | Runs in | Notes |
|---|---|---|
| Node functions | Activity or Workflow | Controlled by execute_in in node metadata (required) |
| @task functions | Activity or Workflow | Controlled by execute_in in activity_options (required) |
| Conditional edge functions (add_conditional_edges) | Workflow | Always run in the Workflow. Must be deterministic and async (sync functions trigger run_in_executor, which is not allowed in the Temporal sandbox). |
| interrupt() | Activity | Call interrupt() inside Activity nodes. The plugin serializes the interrupt and propagates it to the Workflow. |
| Command(resume=...) | Workflow | Used from Workflow code to resume after an interrupt. |
| InMemorySaver checkpointer | Workflow | Runs in-process. Temporal handles durability — external checkpointers are not needed. |
Conditional edge functions like should_continue must be async def, not plain def. Synchronous functions cause
LangGraph to use run_in_executor, which is not supported inside Temporal's Workflow sandbox.
# ✅ Correct: async conditional edge function
async def should_continue(state: AgentState) -> str:
    if state["messages"][-1].startswith("[Agent]") and "Calling" in state["messages"][-1]:
        return "tools"
    return END

g.add_conditional_edges("agent", should_continue)
Syntax
# Graph API
g.add_node("my_node", my_node, metadata={"execute_in": "workflow"})
# Functional API
plugin = LangGraphPlugin(
    tasks=[my_task],
    activity_options={"my_task": {"execute_in": "workflow"}},
)
Example: subgraph orchestration
A common pattern is a parent node that runs in the Workflow and dispatches to a child graph whose nodes run as Activities:
async def parent_node(state: State) -> dict[str, str]:
    return await graph("child").compile().ainvoke(state)
parent = StateGraph(State)
parent.add_node("parent_node", parent_node, metadata={"execute_in": "workflow"})
parent.add_edge(START, "parent_node")
plugin = LangGraphPlugin(graphs={"parent": parent, "child": child})
Human-in-the-loop
LangGraph's interrupt() works with Temporal signals and queries to support human-in-the-loop patterns:
- A graph node calls interrupt(draft), pausing execution.
- The Workflow exposes the pending draft via a Temporal query.
- An external process (UI, CLI) queries the draft and sends approval via a Temporal signal.
- The graph resumes — interrupt() returns the signal value and the node completes.
See the human-in-the-loop samples for complete working examples using both Graph and Functional APIs.
Static analysis for Workflow nodes
The Temporal sandbox catches non-deterministic code at runtime, but you can catch issues earlier with the
lint_langgraph_workflows.py
static checker.
The linter parses your Python source files, finds LangGraph nodes configured with execute_in="workflow", and flags
non-deterministic code inside them — mirroring the Temporal sandbox restrictions at lint time rather than runtime.
python lint_langgraph_workflows.py my_workflow.py
Example output:
my_workflow.py:17:8: in 'routing_node' — datetime.datetime.now() — use workflow.now() instead
my_workflow.py:18:11: in 'routing_node' — random.choice() — use workflow.random() instead
my_workflow.py:19:4: in 'routing_node' — print() — use workflow.logger instead
3 warning(s) found in workflow nodes.
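The warnings above would be produced by a node like the following — a hypothetical routing_node, shown only to illustrate what the linter flags when a node is configured with execute_in="workflow":

```python
import datetime
import random


async def routing_node(state: dict) -> dict:
    # Each of these lines is flagged in a Workflow-mode node: wall-clock
    # time, randomness, and stdout are all non-deterministic on replay.
    now = datetime.datetime.now()        # use workflow.now() instead
    branch = random.choice(["a", "b"])   # use workflow.random() instead
    print(f"routing at {now}")           # use workflow.logger instead
    return {"branch": branch}
```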
The linter checks for:
- Time/date calls — datetime.now(), time.time(), time.sleep()
- Randomness — random.randint(), uuid.uuid4(), secrets.*
- Network I/O — requests, httpx, aiohttp, urllib, http.client
- File I/O — open(), pathlib.Path.read_text(), shutil, glob
- OS/subprocess — os.environ, subprocess.run(), os.system()
- Other — print(), multiprocessing, socket
Activity nodes are not checked — they can safely use all of these.
Samples
The LangGraph plugin samples demonstrate all supported patterns across both APIs:
| Sample | Graph API | Functional API | Description |
|---|---|---|---|
| Hello World | Yes | Yes | Simplest possible single-node graph |
| Human-in-the-loop | Yes | Yes | interrupt() with Temporal signals and queries |
| ReAct Agent | Yes | Yes | Tool-calling agent loop |
| Control Flow | — | Yes | Parallel execution, loops, and branching |
To run any sample:
# Terminal 1: Start Temporal
temporal server start-dev
# Terminal 2: Start the worker
uv run langgraph_plugin/<api>/<sample>/run_worker.py
# Terminal 3: Run the workflow
uv run langgraph_plugin/<api>/<sample>/run_workflow.py