LangGraph integration

Temporal's integration with LangGraph runs your LangGraph nodes and tasks as Temporal Activities, giving your AI agent workflows durable execution, automatic retries, and timeouts.

The plugin supports both the LangGraph Graph API (StateGraph with nodes and edges) and the Functional API (@entrypoint / @task decorators). Each graph node and task must specify whether it runs as a Temporal Activity or directly inside the Workflow — Activity nodes get configurable timeouts and retry policies, while Workflow nodes run inline and must be deterministic.

info

The Temporal Python SDK integration with LangGraph is currently at an experimental release stage. The API may change in future versions.

Code snippets in this guide are taken from the LangGraph plugin samples. Refer to the samples for the complete code.

Prerequisites

Install the plugin

Install the Temporal Python SDK with LangGraph support:

uv add "temporalio[langgraph]"

or with pip:

pip install "temporalio[langgraph]"

Graph API

The Graph API uses StateGraph to define nodes and edges declaratively.

Define a graph and Workflow

Build a StateGraph, then retrieve it inside your Workflow with the graph() helper:

from datetime import timedelta

from langgraph.graph import START, StateGraph
from temporalio import workflow
from temporalio.contrib.langgraph import graph


async def process_query(query: str) -> str:
    """Process a query and return a response."""
    return f"Processed: {query}"


def build_graph() -> StateGraph:
    """Construct a single-node graph."""
    g = StateGraph(str)
    g.add_node(
        "process_query",
        process_query,
        metadata={
            "execute_in": "activity",
            "start_to_close_timeout": timedelta(seconds=10),
        },
    )
    g.add_edge(START, "process_query")
    return g


@workflow.defn
class HelloWorldWorkflow:
    @workflow.run
    async def run(self, query: str) -> str:
        return await graph("hello-world").compile().ainvoke(query)

Configure the Worker

Create a LangGraphPlugin with your graphs and pass it to the Worker:

import asyncio

from temporalio.client import Client
from temporalio.contrib.langgraph import LangGraphPlugin
from temporalio.worker import Worker


async def main() -> None:
    client = await Client.connect("localhost:7233")
    plugin = LangGraphPlugin(graphs={"hello-world": build_graph()})

    worker = Worker(
        client,
        task_queue="langgraph-hello-world",
        workflows=[HelloWorldWorkflow],
        plugins=[plugin],
    )
    await worker.run()


if __name__ == "__main__":
    asyncio.run(main())

Set Activity options

Pass Activity options as node metadata when calling add_node. Every node must include "execute_in" set to either "activity" or "workflow"; the plugin raises an error if it's missing.

from datetime import timedelta

from langgraph.graph import StateGraph
from temporalio.common import RetryPolicy

# my_node is your node function, defined elsewhere.
g = StateGraph(str)
g.add_node(
    "my_node",
    my_node,
    metadata={
        "execute_in": "activity",
        "start_to_close_timeout": timedelta(seconds=30),
        "retry_policy": RetryPolicy(maximum_attempts=3),
    },
)

danger

Don't pass a LangGraph retry_policy= to add_node (or @task(retry_policy=...)) — the plugin raises an error if you do. Use Temporal's RetryPolicy via the node's metadata (Graph API) or activity_options (Functional API) instead.
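
To get a feel for what a Temporal retry policy means for a failing Activity node, the sketch below computes the wait before each retry. It assumes Temporal's documented defaults (an initial interval of 1 second, a backoff coefficient of 2.0, and a maximum interval of 100 times the initial interval) and a maximum_attempts of at least 1. retry_delays is a hypothetical helper written for illustration, not part of the SDK.

```python
from datetime import timedelta
from typing import List, Optional


def retry_delays(
    maximum_attempts: int,
    initial_interval: timedelta = timedelta(seconds=1),
    backoff_coefficient: float = 2.0,
    maximum_interval: Optional[timedelta] = None,
) -> List[timedelta]:
    """Return the wait before each retry (hypothetical helper, not an SDK call)."""
    if maximum_interval is None:
        # Assumed default: the cap is 100x the initial interval.
        maximum_interval = initial_interval * 100
    delays = []
    # The first attempt runs immediately, so N attempts means N - 1 retries.
    for retry in range(maximum_attempts - 1):
        delay = initial_interval * (backoff_coefficient ** retry)
        delays.append(min(delay, maximum_interval))
    return delays


# RetryPolicy(maximum_attempts=3) allows two retries after the first attempt.
delays = retry_delays(maximum_attempts=3)
print([d.total_seconds() for d in delays])
```

Under these assumptions, a node with maximum_attempts=3 and a 30-second start_to_close_timeout can occupy roughly three timeouts plus the two waits before the Activity fails for good.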

Functional API

The Functional API uses @entrypoint and @task decorators, which let you express agent loops with native Python control flow (while, if/else, for).

Define tasks and a Workflow

from datetime import timedelta

from langgraph.func import entrypoint as lg_entrypoint
from langgraph.func import task
from temporalio import workflow
from temporalio.contrib.langgraph import entrypoint


@task
def agent_think(query: str, history: list[str]) -> dict:
    """Decide the next action based on query and tool history."""
    tool_results = [h for h in history if h.startswith("[Tool]")]
    if len(tool_results) < 2:
        return {"action": "tool", "tool_name": "search", "tool_input": query}
    return {"action": "final", "answer": f"Found: {'; '.join(tool_results)}"}


@task
def execute_tool(tool_name: str, tool_input: str) -> str:
    """Execute a tool by name."""
    return f"[Tool] Result for {tool_name}({tool_input})"


@lg_entrypoint()
async def react_agent(query: str) -> dict:
    """ReAct agent loop: think -> act -> observe -> repeat."""
    history: list[str] = []
    while True:
        decision = await agent_think(query, history)
        if decision["action"] == "final":
            return {"answer": decision["answer"], "steps": len(history)}
        result = await execute_tool(decision["tool_name"], decision["tool_input"])
        history.append(result)


all_tasks = [agent_think, execute_tool]

activity_options = {
    t.func.__name__: {
        "execute_in": "activity",
        "start_to_close_timeout": timedelta(seconds=30),
    }
    for t in all_tasks
}


@workflow.defn
class ReactAgentWorkflow:
    @workflow.run
    async def run(self, query: str) -> dict:
        return await entrypoint("react-agent").ainvoke(query)
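
When tasks need different modes, the same dictionary shape can be built from a per-task mode plus shared Activity settings. The sketch below uses plain functions as stand-ins for @task objects, so it reads __name__ directly rather than t.func.__name__. build_activity_options and format_answer are hypothetical names introduced for illustration, not plugin APIs.

```python
from datetime import timedelta
from typing import Any, Callable, Dict, List, Tuple


def agent_think(query: str) -> str:
    """Stand-in for a task that would call an LLM."""
    return query


def format_answer(text: str) -> str:
    """Stand-in for a pure, deterministic formatting task."""
    return text.strip()


def build_activity_options(
    task_modes: List[Tuple[Callable[..., Any], str]],
    shared_activity_settings: Dict[str, Any],
) -> Dict[str, Dict[str, Any]]:
    """Build a name-keyed options dict (hypothetical helper)."""
    options: Dict[str, Dict[str, Any]] = {}
    for fn, execute_in in task_modes:
        # execute_in is always written per task, never inherited.
        entry: Dict[str, Any] = {"execute_in": execute_in}
        if execute_in == "activity":
            # Timeouts and similar settings are added for Activity tasks only.
            entry.update(shared_activity_settings)
        options[fn.__name__] = entry
    return options


activity_options = build_activity_options(
    [(agent_think, "activity"), (format_answer, "workflow")],
    {"start_to_close_timeout": timedelta(seconds=30)},
)
print(sorted(activity_options))
```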

Configure the Worker with the Functional API

from temporalio.contrib.langgraph import LangGraphPlugin
from temporalio.worker import Worker

plugin = LangGraphPlugin(
    entrypoints={"react-agent": react_agent},
    tasks=all_tasks,
    activity_options=activity_options,
)

# client is a connected temporalio.client.Client, created as in the
# Graph API Worker example.
worker = Worker(
    client,
    task_queue="langgraph-react-agent",
    workflows=[ReactAgentWorkflow],
    plugins=[plugin],
)

Checkpointer

If your LangGraph code requires a checkpointer (for example, if you're using interrupts), use InMemorySaver. Temporal handles durability, so third-party checkpointers (like PostgreSQL or Redis) are not needed.

import langgraph.checkpoint.memory

g = graph("my-graph").compile(
    checkpointer=langgraph.checkpoint.memory.InMemorySaver(),
)

Runtime context

LangGraph's run-scoped context (context_schema) is reconstructed on the Activity side, so nodes and tasks can read from runtime.context:

from langgraph.runtime import Runtime
from typing_extensions import TypedDict

from temporalio.contrib.langgraph import graph


class Context(TypedDict):
    user_id: str


# State is the graph's state type, defined elsewhere in your code.
async def my_node(state: State, runtime: Runtime[Context]) -> dict:
    return {"user": runtime.context["user_id"]}


# In the Workflow:
g = graph("my-graph").compile()
await g.ainvoke({...}, context=Context(user_id="alice"))

Your context object must be serializable by the configured Temporal payload converter, since it crosses the Activity boundary.
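
One way to catch a non-serializable context early is a round-trip check in a unit test. The sketch below uses the standard json module as a conservative approximation of the default payload converter, which encodes plain dictionaries as JSON. assert_json_round_trip is a hypothetical helper, and a custom converter would need its own check.

```python
import json
from typing import Any, Dict, TypedDict


class Context(TypedDict):
    user_id: str


def assert_json_round_trip(value: Dict[str, Any]) -> Dict[str, Any]:
    """Encode and decode a value as JSON, failing if anything is lost.

    Hypothetical test helper: json.dumps raises TypeError for values it
    cannot encode, such as open connections or arbitrary objects.
    """
    decoded = json.loads(json.dumps(value))
    if decoded != value:
        raise ValueError(f"context changed during round trip: {decoded!r}")
    return decoded


# A TypedDict instance is a plain dict at runtime, so it encodes cleanly.
good = assert_json_round_trip(Context(user_id="alice"))

# A live object cannot cross the Activity boundary as JSON.
try:
    assert_json_round_trip({"client": object()})
    bad_rejected = False
except TypeError:
    bad_rejected = True

print(good, bad_rejected)
```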

Stores are not supported

LangGraph's Store (for example, InMemoryStore passed via graph.compile(store=...) or @entrypoint(store=...)) isn't accessible inside Activity-wrapped nodes: the Store holds live state that can't cross the Activity boundary, and Activities may run on a different worker than the Workflow. If you pass a store, the plugin logs a warning on first use and runtime.store is None inside nodes.

Use Workflow state for per-run memory, or an external database (Postgres, Redis, etc.) configured on each worker if you need shared memory across runs.

Activity vs Workflow execution

Every graph node and @task must specify execute_in — set it to "activity" to run as a Temporal Activity, or "workflow" to run directly inside the Workflow. The plugin raises an error if you forget to set it.

execute_in must be set per node or task; it cannot be set in default_activity_options.
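
The rule is easy to check in your own tests before a graph reaches a Worker. The sketch below is a hypothetical validator that mirrors the requirement described above. It is not the plugin's implementation, and the error messages are invented for illustration.

```python
from typing import Any, Dict, Optional

VALID_MODES = {"activity", "workflow"}


def validate_execute_in(node_name: str, metadata: Optional[Dict[str, Any]]) -> str:
    """Return the node's execution mode or raise if it is missing or invalid."""
    mode = (metadata or {}).get("execute_in")
    if mode is None:
        raise ValueError(f"node {node_name!r} is missing 'execute_in'")
    if mode not in VALID_MODES:
        raise ValueError(
            f"node {node_name!r} has execute_in={mode!r}; "
            f"expected one of {sorted(VALID_MODES)}"
        )
    return mode


print(validate_execute_in("process_query", {"execute_in": "activity"}))

try:
    validate_execute_in("router", {})
except ValueError as err:
    print(err)
```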

Understanding when to use each mode is important for correctness and durability.

When to use an Activity

Use execute_in: "activity" when a node does any of the following:

  • Makes network calls — LLM calls, HTTP requests, database queries, or any I/O. Activities can do I/O; Workflows cannot.
  • Has non-deterministic behavior — anything that can return different results on re-execution (random numbers, current time, external data). Workflows must be deterministic.
  • Is long-running or may fail — Activities get configurable timeouts, automatic retries, and heartbeating. If an LLM call times out or a service is unavailable, Temporal retries the Activity without re-running the entire Workflow.
  • Calls interrupt() — LangGraph's interrupt() is supported in Activity nodes. The plugin serializes the interrupt and propagates it back to the Workflow for human-in-the-loop patterns.

When to run in the Workflow

Use execute_in: "workflow" when a node:

  • Orchestrates other graphs — a node that calls graph("child").compile().ainvoke(state) to dispatch to a subgraph. The subgraph's own nodes still run as Activities, but the orchestration logic runs in the Workflow.
  • Performs pure state transformations — deterministic data reshaping, merging, or filtering with no I/O.
  • Is a lightweight routing step — when a node's only job is to decide what happens next and you want to avoid the overhead of an Activity round-trip.

danger

Workflow code must be deterministic. A node running in the Workflow must not make network calls, use random, read the system clock, or do file I/O. Violating this causes non-determinism errors on replay.
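
As an illustration, here is the kind of node that is safe to run in the Workflow: a pure state transformation whose output depends only on its input. The state shape and the merge_results name are assumptions made for this sketch.

```python
import asyncio
from typing import Any, Dict


async def merge_results(state: Dict[str, Any]) -> Dict[str, Any]:
    """Deduplicate and order results with no I/O, clock, or randomness."""
    # sorted() fixes the order, so the output does not depend on how the
    # set happens to iterate.
    unique = sorted(set(state["results"]))
    return {"results": unique, "count": len(unique)}


state = {"results": ["b", "a", "b"]}

# Running the node twice on the same input stands in for a replay:
# a deterministic node must produce the same result both times.
first = asyncio.run(merge_results(state))
second = asyncio.run(merge_results(state))
print(first == second, first)
```

A node that called time.time() or random.choice() in the same place would return different values on the second run, which is exactly what replay cannot tolerate.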

Decision tree: should this node be an Activity?

Does this node make network calls (LLM, HTTP, DB)?
├── Yes → execute_in: "activity"
└── No
    Does this node have non-deterministic behavior?
    ├── Yes → execute_in: "activity"
    └── No
        Does this node call interrupt()?
        ├── Yes → execute_in: "activity"
        └── No
            Is this node orchestrating a subgraph or doing a pure state transform?
            ├── Yes → execute_in: "workflow"
            └── No → execute_in: "activity" (the safe choice)

When in doubt, choose "activity". The Activity overhead is small, and it gives you retries, timeouts, and correctness guarantees.
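
The decision tree can also be written as a small function, which is handy for code review checklists or tests. choose_execute_in and its parameter names are hypothetical. The logic follows the tree above question by question.

```python
def choose_execute_in(
    *,
    makes_network_calls: bool,
    non_deterministic: bool,
    calls_interrupt: bool,
    orchestrates_or_pure_transform: bool,
) -> str:
    """Return "activity" or "workflow" following the decision tree."""
    if makes_network_calls or non_deterministic or calls_interrupt:
        return "activity"
    if orchestrates_or_pure_transform:
        return "workflow"
    # Nothing matched: fall back to the safe choice.
    return "activity"


llm_node = choose_execute_in(
    makes_network_calls=True,
    non_deterministic=True,
    calls_interrupt=False,
    orchestrates_or_pure_transform=False,
)
router_node = choose_execute_in(
    makes_network_calls=False,
    non_deterministic=False,
    calls_interrupt=False,
    orchestrates_or_pure_transform=True,
)
print(llm_node, router_node)
```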

Where LangGraph primitives run

Not all LangGraph primitives are node functions. Some run in the Workflow context regardless of the execute_in setting:

| Primitive | Runs in | Notes |
| --- | --- | --- |
| Node functions | Activity or Workflow | Controlled by execute_in in node metadata (required) |
| @task functions | Activity or Workflow | Controlled by execute_in in activity_options (required) |
| Conditional edge functions (add_conditional_edges) | Workflow | Always runs in the Workflow. Must be deterministic and async (sync functions trigger run_in_executor, which is not allowed in the Temporal sandbox). |
| interrupt() | Activity | Call interrupt() inside Activity nodes. The plugin serializes the interrupt and propagates it to the Workflow. |
| Command(resume=...) | Workflow | Used from Workflow code to resume after an interrupt. |
| InMemorySaver checkpointer | Workflow | Runs in-process. Temporal handles durability, so external checkpointers are not needed. |

tip

Conditional edge functions like should_continue must be async def, not plain def. Synchronous functions cause LangGraph to use run_in_executor, which is not supported inside Temporal's Workflow sandbox.

# ✅ Correct: async conditional edge function
async def should_continue(state: AgentState) -> str:
    if state["messages"][-1].startswith("[Agent]") and "Calling" in state["messages"][-1]:
        return "tools"
    return END

g.add_conditional_edges("agent", should_continue)

Syntax

# Graph API
g.add_node("my_node", my_node, metadata={"execute_in": "workflow"})

# Functional API
plugin = LangGraphPlugin(
    tasks=[my_task],
    activity_options={"my_task": {"execute_in": "workflow"}},
)

Example: subgraph orchestration

A common pattern is a parent node that runs in the Workflow and dispatches to a child graph whose nodes run as Activities:

async def parent_node(state: State) -> dict[str, str]:
    return await graph("child").compile().ainvoke(state)

parent = StateGraph(State)
parent.add_node("parent_node", parent_node, metadata={"execute_in": "workflow"})
parent.add_edge(START, "parent_node")

plugin = LangGraphPlugin(graphs={"parent": parent, "child": child})

Human-in-the-loop

LangGraph's interrupt() works with Temporal signals and queries to support human-in-the-loop patterns:

  1. A graph node calls interrupt(draft), pausing execution.
  2. The Workflow exposes the pending draft via a Temporal query.
  3. An external process (UI, CLI) queries the draft and sends approval via a Temporal signal.
  4. The graph resumes — interrupt() returns the signal value and the node completes.

See the human-in-the-loop samples for complete working examples using both Graph and Functional APIs.
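
To make the four steps concrete without a Temporal server, the sketch below simulates the same control flow in plain asyncio. It is not Temporal or LangGraph code: ReviewSession is a hypothetical stand-in for the Workflow, pending_draft plays the role of the query, and approve plays the role of the signal.

```python
import asyncio
from typing import Optional, Tuple


class ReviewSession:
    """Plain-asyncio stand-in for a Workflow (illustration only)."""

    def __init__(self) -> None:
        self._pending_draft: Optional[str] = None
        self._decision: Optional[asyncio.Future] = None

    def pending_draft(self) -> Optional[str]:
        """Step 2: expose the draft, as a query handler would."""
        return self._pending_draft

    def approve(self, decision: str) -> None:
        """Step 3: deliver the reviewer's decision, as a signal would."""
        if self._decision is not None and not self._decision.done():
            self._decision.set_result(decision)

    async def run(self, draft: str) -> str:
        # Step 1: pause with a draft waiting for review.
        self._pending_draft = draft
        self._decision = asyncio.get_running_loop().create_future()
        decision = await self._decision
        # Step 4: resume with the delivered value and finish.
        self._pending_draft = None
        return f"{draft} [{decision}]"


async def demo() -> Tuple[Optional[str], str]:
    session = ReviewSession()
    run_task = asyncio.create_task(session.run("Draft reply"))
    await asyncio.sleep(0)  # Let run() reach its pause point.
    seen_draft = session.pending_draft()
    session.approve("approved")
    return seen_draft, await run_task


seen_draft, result = asyncio.run(demo())
print(seen_draft, "->", result)
```

In the real integration the pause survives Worker restarts because Temporal persists the Workflow's history. The in-memory future here only models the ordering of the steps.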

Static analysis for Workflow nodes

The Temporal sandbox catches non-deterministic code at runtime, but you can catch issues earlier with the lint_langgraph_workflows.py static checker.

The linter parses your Python source files, finds LangGraph nodes configured with execute_in="workflow", and flags non-deterministic code inside them — mirroring the Temporal sandbox restrictions at lint time rather than runtime.

python lint_langgraph_workflows.py my_workflow.py

Example output:

my_workflow.py:17:8: in 'routing_node' — datetime.datetime.now() — use workflow.now() instead
my_workflow.py:18:11: in 'routing_node' — random.choice() — use workflow.random() instead
my_workflow.py:19:4: in 'routing_node' — print() — use workflow.logger instead

3 warning(s) found in workflow nodes.

The linter checks for:

  • Time/date calls: datetime.now(), time.time(), time.sleep()
  • Randomness: random.randint(), uuid.uuid4(), secrets.*
  • Network I/O: requests, httpx, aiohttp, urllib, http.client
  • File I/O: open(), pathlib.Path.read_text(), shutil, glob
  • OS/subprocess: os.environ, subprocess.run(), os.system()
  • Other: print(), multiprocessing, socket

Activity nodes are not checked — they can safely use all of these.
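
For intuition about how a checker like this can work, here is a minimal sketch built on Python's ast module. It is not the source of lint_langgraph_workflows.py: the rule table is a small hypothetical subset, and the caller names the Workflow node functions explicitly instead of having them discovered from execute_in metadata.

```python
import ast
from typing import List, Set

# Hypothetical rule table: dotted call name -> suggested replacement.
BANNED_CALLS = {
    "datetime.datetime.now": "use workflow.now() instead",
    "time.time": "use workflow.now() instead",
    "random.choice": "use workflow.random() instead",
    "print": "use workflow.logger instead",
}


def dotted_name(node: ast.AST) -> str:
    """Turn a Name/Attribute chain into 'a.b.c'; return '' otherwise."""
    parts = []
    while isinstance(node, ast.Attribute):
        parts.append(node.attr)
        node = node.value
    if isinstance(node, ast.Name):
        parts.append(node.id)
        return ".".join(reversed(parts))
    return ""


def lint_functions(source: str, function_names: Set[str]) -> List[str]:
    """Flag banned calls inside the named functions only."""
    findings = []
    for func in ast.walk(ast.parse(source)):
        if not isinstance(func, (ast.FunctionDef, ast.AsyncFunctionDef)):
            continue
        if func.name not in function_names:
            continue  # Activity nodes and helpers are left unchecked.
        for node in ast.walk(func):
            if isinstance(node, ast.Call):
                name = dotted_name(node.func)
                if name in BANNED_CALLS:
                    findings.append((node.lineno, node.col_offset, func.name, name))
    return [
        f"{line}:{col}: in '{fn}' - {name}() - {BANNED_CALLS[name]}"
        for line, col, fn, name in sorted(findings)
    ]


SOURCE = '''
import datetime
import random

async def routing_node(state):
    now = datetime.datetime.now()
    pick = random.choice(["a", "b"])
    print(now, pick)
    return {"route": pick}

async def fetch_node(state):
    print("runs as an Activity, so it is not checked")
    return state
'''

warnings = lint_functions(SOURCE, {"routing_node"})
for warning in warnings:
    print(warning)
```

A syntax-level check like this only sees calls spelled out in the function body. It misses aliased imports and calls made indirectly through helpers, which is one reason the runtime sandbox remains the final safeguard.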

Samples

The LangGraph plugin samples demonstrate all supported patterns across both APIs:

| Sample | Graph API | Functional API | Description |
| --- | --- | --- | --- |
| Hello World | Yes | Yes | Simplest possible single-node graph |
| Human-in-the-loop | Yes | Yes | interrupt() with Temporal signals and queries |
| ReAct Agent | Yes | Yes | Tool-calling agent loop |
| Control Flow | | Yes | Parallel execution, loops, and branching |

To run any sample:

# Terminal 1: Start Temporal
temporal server start-dev

# Terminal 2: Start the worker
uv run langgraph_plugin/<api>/<sample>/run_worker.py

# Terminal 3: Run the workflow
uv run langgraph_plugin/<api>/<sample>/run_workflow.py