
Graph Definitions

StateGraph

Bases: Graph

A graph whose nodes communicate by reading and writing to a shared state. The signature of each node is State -> Partial: each node receives the current state and returns a partial update to it.

Each state key can optionally be annotated with a reducer function that will be used to aggregate the values of that key received from multiple nodes. The signature of a reducer function is (Value, Value) -> Value.

Parameters:

• state_schema (Optional[type[Any]], default: None): The schema class that defines the state.
• config_schema (Optional[type[Any]], default: None): The schema class that defines the configuration. Use this to expose configurable parameters in your API.
Example
from langchain_core.runnables import RunnableConfig
from typing_extensions import Annotated, TypedDict
from langgraph.graph import StateGraph

def reducer(a: list, b: int | None) -> list:
    if b is not None:
        return a + [b]
    return a

class State(TypedDict):
    x: Annotated[list, reducer]

class ConfigSchema(TypedDict):
    r: float

graph = StateGraph(State, config_schema=ConfigSchema)

def node(state: State, config: RunnableConfig) -> dict:
    r = config["configurable"].get("r", 1.0)
    x = state["x"][-1]
    next_value = x * r * (1 - x)
    return {"x": next_value}

graph.add_node("A", node)
graph.set_entry_point("A")
graph.set_finish_point("A")
compiled = graph.compile()

print(compiled.config_specs)
# [ConfigurableFieldSpec(id='r', annotation=<class 'float'>, name=None, description=None, default=None, is_shared=False, dependencies=None)]

step1 = compiled.invoke({"x": 0.5}, {"configurable": {"r": 3.0}})
# {'x': [0.5, 0.75]}

Methods:

• add_node: Add a new node to the state graph.
• add_edge: Add a directed edge from the start node (or list of start nodes) to the end node.
• add_conditional_edges: Add a conditional edge from the starting node to any number of destination nodes.
• add_sequence: Add a sequence of nodes that will be executed in the provided order.
• compile: Compile the state graph into a CompiledStateGraph object.

add_node

add_node(
    node: Union[str, RunnableLike],
    action: Optional[RunnableLike] = None,
    *,
    defer: bool = False,
    metadata: Optional[dict[str, Any]] = None,
    input: Optional[type[Any]] = None,
    retry: Optional[
        Union[RetryPolicy, Sequence[RetryPolicy]]
    ] = None,
    cache_policy: Optional[CachePolicy] = None,
    destinations: Optional[
        Union[dict[str, str], tuple[str, ...]]
    ] = None
) -> Self

Add a new node to the state graph.

Parameters:

• node (Union[str, RunnableLike], required): The function or runnable this node will run. If a string is provided, it will be used as the node name, and action will be used as the function or runnable.
• action (Optional[RunnableLike], default: None): The action associated with the node. Used as the node's function or runnable if node is a string (the node name).
• metadata (Optional[dict[str, Any]], default: None): The metadata associated with the node.
• input (Optional[type[Any]], default: None): The input schema for the node. Defaults to the graph's input schema.
• retry (Optional[Union[RetryPolicy, Sequence[RetryPolicy]]], default: None): The policy for retrying the node. If a sequence is provided, the first matching policy will be applied.
• cache_policy (Optional[CachePolicy], default: None): The cache policy for the node.
• destinations (Optional[Union[dict[str, str], tuple[str, ...]]], default: None): Destinations that indicate where a node can route to. This is useful for edgeless graphs whose nodes return Command objects. If a dict is provided, the keys are used as the target node names and the values as the labels for the edges. If a tuple is provided, the values are used as the target node names. NOTE: this is only used for graph rendering and has no effect on graph execution.

Raises:

• ValueError: If the key is already being used as a state key.

Example
from langgraph.graph import START, StateGraph

def my_node(state, config):
    return {"x": state["x"] + 1}

builder = StateGraph(dict)
builder.add_node(my_node)  # node name will be 'my_node'
builder.add_edge(START, "my_node")
graph = builder.compile()
graph.invoke({"x": 1})
# {'x': 2}
Customize the name:
builder = StateGraph(dict)
builder.add_node("my_fair_node", my_node)
builder.add_edge(START, "my_fair_node")
graph = builder.compile()
graph.invoke({"x": 1})
# {'x': 2}

Returns:

• Self: The instance of the state graph, allowing for method chaining.

add_edge

add_edge(
    start_key: Union[str, list[str]], end_key: str
) -> Self

Add a directed edge from the start node (or list of start nodes) to the end node.

When a single start node is provided, the graph will wait for that node to complete before executing the end node. When multiple start nodes are provided, the graph will wait for ALL of the start nodes to complete before executing the end node.

Parameters:

• start_key (Union[str, list[str]], required): The key(s) of the start node(s) of the edge.
• end_key (str, required): The key of the end node of the edge.

Raises:

• ValueError: If the start key is 'END', or if the start key or end key is not present in the graph.

Returns:

• Self: The instance of the state graph, allowing for method chaining.
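To illustrate the fan-in behavior, here is a minimal sketch; the node names and state schema are illustrative, not part of the API:

import operator
from typing_extensions import Annotated, TypedDict
from langgraph.graph import StateGraph, START, END

class State(TypedDict):
    results: Annotated[list, operator.add]

builder = StateGraph(State)
builder.add_node("a", lambda _state: {"results": ["a"]})
builder.add_node("b", lambda _state: {"results": ["b"]})
builder.add_node("join", lambda _state: {"results": ["join"]})
builder.add_edge(START, "a")
builder.add_edge(START, "b")
builder.add_edge(["a", "b"], "join")  # "join" waits for BOTH "a" and "b"
builder.add_edge("join", END)
graph = builder.compile()
graph.invoke({"results": []})
# 'join' appears last; 'a' and 'b' were applied in the same superstep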

add_conditional_edges

add_conditional_edges(
    source: str,
    path: Union[
        Callable[..., Union[Hashable, list[Hashable]]],
        Callable[
            ..., Awaitable[Union[Hashable, list[Hashable]]]
        ],
        Runnable[Any, Union[Hashable, list[Hashable]]],
    ],
    path_map: Optional[
        Union[dict[Hashable, str], list[str]]
    ] = None,
    then: Optional[str] = None,
) -> Self

Add a conditional edge from the starting node to any number of destination nodes.

Parameters:

• source (str, required): The starting node. This conditional edge will run when exiting this node.
• path (Union[Callable[..., Union[Hashable, list[Hashable]]], Callable[..., Awaitable[Union[Hashable, list[Hashable]]]], Runnable[Any, Union[Hashable, list[Hashable]]]], required): The callable that determines the next node or nodes. If path_map is not specified, it should return one or more node names. If it returns END, the graph will stop execution.
• path_map (Optional[Union[dict[Hashable, str], list[str]]], default: None): Optional mapping of paths to node names. If omitted, the values returned by path should be node names.
• then (Optional[str], default: None): The name of a node to execute after the nodes selected by path.

Returns:

• Self: The instance of the graph, allowing for method chaining.

Note: Without type hints on the path function's return value (e.g., -> Literal["foo", "__end__"]) or a path_map, the graph visualization assumes the edge could transition to any node in the graph.
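For concreteness, a small sketch of a conditional edge with an explicit path_map; the node names and routing logic are illustrative:

from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END

class State(TypedDict):
    value: int

def router(state: State) -> str:
    # The returned key is looked up in path_map to pick the next node
    return "big" if state["value"] > 10 else "small"

builder = StateGraph(State)
builder.add_node("classify", lambda state: {})
builder.add_node("handle_big", lambda state: {"value": 0})
builder.add_node("handle_small", lambda state: {"value": 0})
builder.add_edge(START, "classify")
builder.add_conditional_edges(
    "classify",
    router,
    {"big": "handle_big", "small": "handle_small"},
)
builder.add_edge("handle_big", END)
builder.add_edge("handle_small", END)
graph = builder.compile()
graph.invoke({"value": 42})  # routes to "handle_big"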

add_sequence

add_sequence(
    nodes: Sequence[
        Union[RunnableLike, tuple[str, RunnableLike]]
    ],
) -> Self

Add a sequence of nodes that will be executed in the provided order.

Parameters:

• nodes (Sequence[Union[RunnableLike, tuple[str, RunnableLike]]], required): A sequence of RunnableLike objects (e.g. a LangChain Runnable or a callable) or (name, RunnableLike) tuples. If no names are provided, the name is inferred from the node object (e.g. a runnable or callable name). Each node will be executed in the order provided.

Raises:

• ValueError: If the sequence is empty.
• ValueError: If the sequence contains duplicate node names.

Returns:

• Self: The instance of the state graph, allowing for method chaining.
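As a sketch, the sequence below is equivalent to adding each node individually and wiring consecutive edges between them; the function names are illustrative:

from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START

class State(TypedDict):
    x: int

def step_one(state: State) -> dict:
    return {"x": state["x"] + 1}

def step_two(state: State) -> dict:
    return {"x": state["x"] * 2}

builder = StateGraph(State)
# Adds nodes "step_one" and "step_two" plus the edge step_one -> step_two
builder.add_sequence([step_one, step_two])
builder.add_edge(START, "step_one")
graph = builder.compile()
graph.invoke({"x": 1})
# {'x': 4}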

compile

compile(
    checkpointer: Checkpointer = None,
    *,
    cache: Optional[BaseCache] = None,
    store: Optional[BaseStore] = None,
    interrupt_before: Optional[
        Union[All, list[str]]
    ] = None,
    interrupt_after: Optional[Union[All, list[str]]] = None,
    debug: bool = False,
    name: Optional[str] = None
) -> CompiledStateGraph

Compiles the state graph into a CompiledStateGraph object.

The compiled graph implements the Runnable interface and can be invoked, streamed, batched, and run asynchronously.

Parameters:

• checkpointer (Checkpointer, default: None): A checkpoint saver object or flag. If provided, this Checkpointer serves as a fully versioned "short-term memory" for the graph, allowing it to be paused, resumed, and replayed from any point. If None, it may inherit the parent graph's checkpointer when used as a subgraph. If False, it will not use or inherit any checkpointer.
• interrupt_before (Optional[Union[All, list[str]]], default: None): An optional list of node names to interrupt before.
• interrupt_after (Optional[Union[All, list[str]]], default: None): An optional list of node names to interrupt after.
• debug (bool, default: False): A flag indicating whether to enable debug mode.
• name (Optional[str], default: None): The name to use for the compiled graph.

Returns:

• CompiledStateGraph: The compiled state graph.
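A sketch of compiling with an in-memory checkpointer and an interrupt; the node name and schema here are illustrative:

from typing_extensions import TypedDict
from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import StateGraph

class State(TypedDict):
    x: int

builder = StateGraph(State)
builder.add_node("human_review", lambda state: {"x": state["x"] + 1})
builder.set_entry_point("human_review")
builder.set_finish_point("human_review")

compiled = builder.compile(
    checkpointer=MemorySaver(),        # enables pause/resume and state history
    interrupt_before=["human_review"], # pause before this node runs
)
config = {"configurable": {"thread_id": "1"}}  # checkpointed runs need a thread_id
compiled.invoke({"x": 1}, config)  # stops before "human_review"
compiled.invoke(None, config)      # resumes from the saved checkpoint
# {'x': 2}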

CompiledStateGraph

Bases: CompiledGraph

Methods:

• stream: Stream graph steps for a single input.
• astream: Asynchronously stream graph steps for a single input.
• invoke: Run the graph with a single input and config.
• ainvoke: Asynchronously invoke the graph on a single input.
• get_state: Get the current state of the graph.
• aget_state: Asynchronously get the current state of the graph.
• get_state_history: Get the history of the state of the graph.
• aget_state_history: Asynchronously get the history of the state of the graph.
• update_state: Update the state of the graph with the given values, as if they came from node as_node.
• aupdate_state: Asynchronously update the state of the graph with the given values, as if they came from node as_node.
• bulk_update_state: Apply updates to the graph state in bulk. Requires a checkpointer to be set.
• abulk_update_state: Asynchronously apply updates to the graph state in bulk. Requires a checkpointer to be set.
• get_graph: Return a drawable representation of the computation graph.
• aget_graph: Asynchronously return a drawable representation of the computation graph.
• get_subgraphs: Get the subgraphs of the graph.
• aget_subgraphs: Asynchronously get the subgraphs of the graph.
• with_config: Create a copy of the Pregel object with an updated config.

stream

stream(
    input: dict[str, Any] | Any,
    config: RunnableConfig | None = None,
    *,
    stream_mode: (
        StreamMode | list[StreamMode] | None
    ) = None,
    output_keys: str | Sequence[str] | None = None,
    interrupt_before: All | Sequence[str] | None = None,
    interrupt_after: All | Sequence[str] | None = None,
    checkpoint_during: bool | None = None,
    debug: bool | None = None,
    subgraphs: bool = False
) -> Iterator[dict[str, Any] | Any]

Stream graph steps for a single input.

Parameters:

• input (dict[str, Any] | Any, required): The input to the graph.
• config (RunnableConfig | None, default: None): The configuration to use for the run.
• stream_mode (StreamMode | list[StreamMode] | None, default: None): The mode to stream output; defaults to self.stream_mode. Options are:
  • "values": Emit all values in the state after each step, including interrupts. When used with the functional API, values are emitted once at the end of the workflow.
  • "updates": Emit only the node or task names and the updates returned by the nodes or tasks after each step. If multiple updates are made in the same step (e.g. multiple nodes are run), those updates are emitted separately.
  • "custom": Emit custom data from inside nodes or tasks using StreamWriter.
  • "messages": Emit LLM messages token-by-token, together with metadata, for any LLM invocations inside nodes or tasks.
  • "debug": Emit debug events with as much information as possible for each step.
• output_keys (str | Sequence[str] | None, default: None): The keys to stream; defaults to all non-context channels.
• interrupt_before (All | Sequence[str] | None, default: None): Nodes to interrupt before; defaults to all nodes in the graph.
• interrupt_after (All | Sequence[str] | None, default: None): Nodes to interrupt after; defaults to all nodes in the graph.
• checkpoint_during (bool | None, default: None): Whether to checkpoint intermediate steps; defaults to True. If False, only the final checkpoint is saved.
• debug (bool | None, default: None): Whether to print debug information during execution; defaults to False.
• subgraphs (bool, default: False): Whether to stream subgraphs.

Yields:

• dict[str, Any] | Any: The output of each step in the graph. The output shape depends on the stream_mode.

Using stream_mode="values":
import operator
from typing_extensions import Annotated, TypedDict
from langgraph.graph import StateGraph, START

class State(TypedDict):
    alist: Annotated[list, operator.add]
    another_list: Annotated[list, operator.add]

builder = StateGraph(State)
builder.add_node("a", lambda _state: {"another_list": ["hi"]})
builder.add_node("b", lambda _state: {"alist": ["there"]})
builder.add_edge("a", "b")
builder.add_edge(START, "a")
graph = builder.compile()

for event in graph.stream({"alist": ['Ex for stream_mode="values"']}, stream_mode="values"):
    print(event)

# {'alist': ['Ex for stream_mode="values"'], 'another_list': []}
# {'alist': ['Ex for stream_mode="values"'], 'another_list': ['hi']}
# {'alist': ['Ex for stream_mode="values"', 'there'], 'another_list': ['hi']}
Using stream_mode="updates":
for event in graph.stream({"alist": ['Ex for stream_mode="updates"']}, stream_mode="updates"):
    print(event)

# {'a': {'another_list': ['hi']}}
# {'b': {'alist': ['there']}}
Using stream_mode="debug":
for event in graph.stream({"alist": ['Ex for stream_mode="debug"']}, stream_mode="debug"):
    print(event)

# {'type': 'task', 'timestamp': '2024-06-23T...+00:00', 'step': 1, 'payload': {'id': '...', 'name': 'a', 'input': {'alist': ['Ex for stream_mode="debug"'], 'another_list': []}, 'triggers': ['start:a']}}
# {'type': 'task_result', 'timestamp': '2024-06-23T...+00:00', 'step': 1, 'payload': {'id': '...', 'name': 'a', 'result': [('another_list', ['hi'])]}}
# {'type': 'task', 'timestamp': '2024-06-23T...+00:00', 'step': 2, 'payload': {'id': '...', 'name': 'b', 'input': {'alist': ['Ex for stream_mode="debug"'], 'another_list': ['hi']}, 'triggers': ['a']}}
# {'type': 'task_result', 'timestamp': '2024-06-23T...+00:00', 'step': 2, 'payload': {'id': '...', 'name': 'b', 'result': [('alist', ['there'])]}}
Using stream_mode="custom":
from langgraph.types import StreamWriter

def node_a(state: State, writer: StreamWriter):
    writer({"custom_data": "foo"})
    return {"alist": ["hi"]}

builder = StateGraph(State)
builder.add_node("a", node_a)
builder.add_edge(START, "a")
graph = builder.compile()

for event in graph.stream({"alist": ['Ex for stream_mode="custom"']}, stream_mode="custom"):
    print(event)

# {'custom_data': 'foo'}
Using stream_mode="messages":
from typing_extensions import Annotated, TypedDict
from langgraph.graph import StateGraph, START
from langchain_openai import ChatOpenAI

llm = ChatOpenAI(model="gpt-4o-mini")

class State(TypedDict):
    question: str
    answer: str

def node_a(state: State):
    response = llm.invoke(state["question"])
    return {"answer": response.content}

builder = StateGraph(State)
builder.add_node("a", node_a)
builder.add_edge(START, "a")
graph = builder.compile()

for event in graph.stream({"question": "What is the capital of France?"}, stream_mode="messages"):
    print(event)

# (AIMessageChunk(content='The', additional_kwargs={}, response_metadata={}, id='...'), {'langgraph_step': 1, 'langgraph_node': 'a', 'langgraph_triggers': ['start:a'], 'langgraph_path': ('__pregel_pull', 'a'), 'langgraph_checkpoint_ns': '...', 'checkpoint_ns': '...', 'ls_provider': 'openai', 'ls_model_name': 'gpt-4o-mini', 'ls_model_type': 'chat', 'ls_temperature': 0.7})
# (AIMessageChunk(content=' capital', additional_kwargs={}, response_metadata={}, id='...'), {'langgraph_step': 1, 'langgraph_node': 'a', 'langgraph_triggers': ['start:a'], ...})
# (AIMessageChunk(content=' of', additional_kwargs={}, response_metadata={}, id='...'), {...})
# (AIMessageChunk(content=' France', additional_kwargs={}, response_metadata={}, id='...'), {...})
# (AIMessageChunk(content=' is', additional_kwargs={}, response_metadata={}, id='...'), {...})
# (AIMessageChunk(content=' Paris', additional_kwargs={}, response_metadata={}, id='...'), {...})

astream async

astream(
    input: dict[str, Any] | Any,
    config: RunnableConfig | None = None,
    *,
    stream_mode: (
        StreamMode | list[StreamMode] | None
    ) = None,
    output_keys: str | Sequence[str] | None = None,
    interrupt_before: All | Sequence[str] | None = None,
    interrupt_after: All | Sequence[str] | None = None,
    checkpoint_during: bool | None = None,
    debug: bool | None = None,
    subgraphs: bool = False
) -> AsyncIterator[dict[str, Any] | Any]

Asynchronously stream graph steps for a single input.

Parameters:

• input (dict[str, Any] | Any, required): The input to the graph.
• config (RunnableConfig | None, default: None): The configuration to use for the run.
• stream_mode (StreamMode | list[StreamMode] | None, default: None): The mode to stream output; defaults to self.stream_mode. Options are:
  • "values": Emit all values in the state after each step, including interrupts. When used with the functional API, values are emitted once at the end of the workflow.
  • "updates": Emit only the node or task names and the updates returned by the nodes or tasks after each step. If multiple updates are made in the same step (e.g. multiple nodes are run), those updates are emitted separately.
  • "custom": Emit custom data from inside nodes or tasks using StreamWriter.
  • "messages": Emit LLM messages token-by-token, together with metadata, for any LLM invocations inside nodes or tasks.
  • "debug": Emit debug events with as much information as possible for each step.
• output_keys (str | Sequence[str] | None, default: None): The keys to stream; defaults to all non-context channels.
• interrupt_before (All | Sequence[str] | None, default: None): Nodes to interrupt before; defaults to all nodes in the graph.
• interrupt_after (All | Sequence[str] | None, default: None): Nodes to interrupt after; defaults to all nodes in the graph.
• checkpoint_during (bool | None, default: None): Whether to checkpoint intermediate steps; defaults to True. If False, only the final checkpoint is saved.
• debug (bool | None, default: None): Whether to print debug information during execution; defaults to False.
• subgraphs (bool, default: False): Whether to stream subgraphs.

Yields:

• AsyncIterator[dict[str, Any] | Any]: The output of each step in the graph. The output shape depends on the stream_mode.

Using stream_mode="values":
import operator
from typing_extensions import Annotated, TypedDict
from langgraph.graph import StateGraph, START

class State(TypedDict):
    alist: Annotated[list, operator.add]
    another_list: Annotated[list, operator.add]

builder = StateGraph(State)
builder.add_node("a", lambda _state: {"another_list": ["hi"]})
builder.add_node("b", lambda _state: {"alist": ["there"]})
builder.add_edge("a", "b")
builder.add_edge(START, "a")
graph = builder.compile()

async for event in graph.astream({"alist": ['Ex for stream_mode="values"']}, stream_mode="values"):
    print(event)

# {'alist': ['Ex for stream_mode="values"'], 'another_list': []}
# {'alist': ['Ex for stream_mode="values"'], 'another_list': ['hi']}
# {'alist': ['Ex for stream_mode="values"', 'there'], 'another_list': ['hi']}
Using stream_mode="updates":
async for event in graph.astream({"alist": ['Ex for stream_mode="updates"']}, stream_mode="updates"):
    print(event)

# {'a': {'another_list': ['hi']}}
# {'b': {'alist': ['there']}}
Using stream_mode="debug":
async for event in graph.astream({"alist": ['Ex for stream_mode="debug"']}, stream_mode="debug"):
    print(event)

# {'type': 'task', 'timestamp': '2024-06-23T...+00:00', 'step': 1, 'payload': {'id': '...', 'name': 'a', 'input': {'alist': ['Ex for stream_mode="debug"'], 'another_list': []}, 'triggers': ['start:a']}}
# {'type': 'task_result', 'timestamp': '2024-06-23T...+00:00', 'step': 1, 'payload': {'id': '...', 'name': 'a', 'result': [('another_list', ['hi'])]}}
# {'type': 'task', 'timestamp': '2024-06-23T...+00:00', 'step': 2, 'payload': {'id': '...', 'name': 'b', 'input': {'alist': ['Ex for stream_mode="debug"'], 'another_list': ['hi']}, 'triggers': ['a']}}
# {'type': 'task_result', 'timestamp': '2024-06-23T...+00:00', 'step': 2, 'payload': {'id': '...', 'name': 'b', 'result': [('alist', ['there'])]}}
Using stream_mode="custom":
from langgraph.types import StreamWriter

async def node_a(state: State, writer: StreamWriter):
    writer({"custom_data": "foo"})
    return {"alist": ["hi"]}

builder = StateGraph(State)
builder.add_node("a", node_a)
builder.add_edge(START, "a")
graph = builder.compile()

async for event in graph.astream({"alist": ['Ex for stream_mode="custom"']}, stream_mode="custom"):
    print(event)

# {'custom_data': 'foo'}
Using stream_mode="messages":
from typing_extensions import Annotated, TypedDict
from langgraph.graph import StateGraph, START
from langchain_openai import ChatOpenAI

llm = ChatOpenAI(model="gpt-4o-mini")

class State(TypedDict):
    question: str
    answer: str

async def node_a(state: State):
    response = await llm.ainvoke(state["question"])
    return {"answer": response.content}

builder = StateGraph(State)
builder.add_node("a", node_a)
builder.add_edge(START, "a")
graph = builder.compile()

async for event in graph.astream({"question": "What is the capital of France?"}, stream_mode="messages"):
    print(event)

# (AIMessageChunk(content='The', additional_kwargs={}, response_metadata={}, id='...'), {'langgraph_step': 1, 'langgraph_node': 'a', 'langgraph_triggers': ['start:a'], 'langgraph_path': ('__pregel_pull', 'a'), 'langgraph_checkpoint_ns': '...', 'checkpoint_ns': '...', 'ls_provider': 'openai', 'ls_model_name': 'gpt-4o-mini', 'ls_model_type': 'chat', 'ls_temperature': 0.7})
# (AIMessageChunk(content=' capital', additional_kwargs={}, response_metadata={}, id='...'), {'langgraph_step': 1, 'langgraph_node': 'a', 'langgraph_triggers': ['start:a'], ...})
# (AIMessageChunk(content=' of', additional_kwargs={}, response_metadata={}, id='...'), {...})
# (AIMessageChunk(content=' France', additional_kwargs={}, response_metadata={}, id='...'), {...})
# (AIMessageChunk(content=' is', additional_kwargs={}, response_metadata={}, id='...'), {...})
# (AIMessageChunk(content=' Paris', additional_kwargs={}, response_metadata={}, id='...'), {...})

invoke

invoke(
    input: dict[str, Any] | Any,
    config: RunnableConfig | None = None,
    *,
    stream_mode: StreamMode = "values",
    output_keys: str | Sequence[str] | None = None,
    interrupt_before: All | Sequence[str] | None = None,
    interrupt_after: All | Sequence[str] | None = None,
    checkpoint_during: bool | None = None,
    debug: bool | None = None,
    **kwargs: Any
) -> dict[str, Any] | Any

Run the graph with a single input and config.

Parameters:

• input (dict[str, Any] | Any, required): The input data for the graph. It can be a dictionary or any other type.
• config (RunnableConfig | None, default: None): The configuration for the graph run.
• stream_mode (StreamMode, default: "values"): The stream mode for the graph run.
• output_keys (str | Sequence[str] | None, default: None): The output keys to retrieve from the graph run.
• interrupt_before (All | Sequence[str] | None, default: None): The nodes to interrupt the graph run before.
• interrupt_after (All | Sequence[str] | None, default: None): The nodes to interrupt the graph run after.
• checkpoint_during (bool | None, default: None): Whether to checkpoint intermediate steps; defaults to True. If False, only the final checkpoint is saved.
• debug (bool | None, default: None): Enable debug mode for the graph run.
• **kwargs (Any): Additional keyword arguments to pass to the graph run.

Returns:

• dict[str, Any] | Any: The output of the graph run. If stream_mode is "values", it returns the latest output. If stream_mode is not "values", it returns a list of output chunks.

ainvoke async

ainvoke(
    input: dict[str, Any] | Any,
    config: RunnableConfig | None = None,
    *,
    stream_mode: StreamMode = "values",
    output_keys: str | Sequence[str] | None = None,
    interrupt_before: All | Sequence[str] | None = None,
    interrupt_after: All | Sequence[str] | None = None,
    checkpoint_during: bool | None = None,
    debug: bool | None = None,
    **kwargs: Any
) -> dict[str, Any] | Any

Asynchronously invoke the graph on a single input.

Parameters:

• input (dict[str, Any] | Any, required): The input data for the computation. It can be a dictionary or any other type.
• config (RunnableConfig | None, default: None): The configuration for the computation.
• stream_mode (StreamMode, default: "values"): The stream mode for the computation.
• output_keys (str | Sequence[str] | None, default: None): The output keys to include in the result.
• interrupt_before (All | Sequence[str] | None, default: None): The nodes to interrupt before.
• interrupt_after (All | Sequence[str] | None, default: None): The nodes to interrupt after.
• checkpoint_during (bool | None, default: None): Whether to checkpoint intermediate steps; defaults to True. If False, only the final checkpoint is saved.
• debug (bool | None, default: None): Whether to enable debug mode.
• **kwargs (Any): Additional keyword arguments.

Returns:

• dict[str, Any] | Any: The result of the computation. If stream_mode is "values", it returns the latest value. If stream_mode is not "values", it returns a list of output chunks.

get_state

get_state(
    config: RunnableConfig, *, subgraphs: bool = False
) -> StateSnapshot

Get the current state of the graph.
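A brief sketch of reading the state back, assuming the graph was compiled with a checkpointer and previously run with the thread below:

config = {"configurable": {"thread_id": "1"}}
snapshot = compiled.get_state(config)
snapshot.values  # current channel values, e.g. {'x': 2}
snapshot.next    # nodes scheduled to run next; () when the run is finished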

aget_state async

aget_state(
    config: RunnableConfig, *, subgraphs: bool = False
) -> StateSnapshot

Asynchronously get the current state of the graph.

get_state_history

get_state_history(
    config: RunnableConfig,
    *,
    filter: dict[str, Any] | None = None,
    before: RunnableConfig | None = None,
    limit: int | None = None
) -> Iterator[StateSnapshot]

Get the history of the state of the graph.
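For example, iterating over the most recent checkpoints of a thread (again assuming a checkpointer and an existing thread; the values are illustrative):

config = {"configurable": {"thread_id": "1"}}
for snapshot in compiled.get_state_history(config, limit=3):
    # Snapshots are yielded newest-first; each carries its own checkpoint config
    print(snapshot.config["configurable"]["checkpoint_id"], snapshot.values)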

aget_state_history async

aget_state_history(
    config: RunnableConfig,
    *,
    filter: dict[str, Any] | None = None,
    before: RunnableConfig | None = None,
    limit: int | None = None
) -> AsyncIterator[StateSnapshot]

Asynchronously get the history of the state of the graph.

update_state

update_state(
    config: RunnableConfig,
    values: dict[str, Any] | Any | None,
    as_node: str | None = None,
) -> RunnableConfig

Update the state of the graph with the given values, as if they came from node as_node. If as_node is not provided, it will be set to the last node that updated the state, if not ambiguous.
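A minimal sketch (thread, node name, and values illustrative): the update is passed through the state's reducers exactly as if node "A" had returned it, and the returned config points at the new checkpoint:

config = {"configurable": {"thread_id": "1"}}
new_config = compiled.update_state(config, {"x": 0.25}, as_node="A")
compiled.get_state(new_config).values  # reflects the patched state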

aupdate_state async

aupdate_state(
    config: RunnableConfig,
    values: dict[str, Any] | Any,
    as_node: str | None = None,
) -> RunnableConfig

Asynchronously update the state of the graph with the given values, as if they came from node as_node. If as_node is not provided, it will be set to the last node that updated the state, if not ambiguous.

bulk_update_state

bulk_update_state(
    config: RunnableConfig,
    supersteps: Sequence[Sequence[StateUpdate]],
) -> RunnableConfig

Apply updates to the graph state in bulk. Requires a checkpointer to be set.

Parameters:

• config (RunnableConfig, required): The config to apply the updates to.
• supersteps (Sequence[Sequence[StateUpdate]], required): A list of supersteps, each including a list of updates to apply sequentially to a graph state. Each update is a tuple of the form (values, as_node).

Raises:

• ValueError: If no checkpointer is set or no updates are provided.
• InvalidUpdateError: If an invalid update is provided.

Returns:

• RunnableConfig: The updated config.
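A sketch of the supersteps shape, using the (values, as_node) tuple form described above; the thread, values, and node names are illustrative:

config = {"configurable": {"thread_id": "1"}}
compiled.bulk_update_state(
    config,
    [
        # first superstep: two updates applied together
        [({"x": 1}, "A"), ({"x": 2}, "B")],
        # second superstep: one further update
        [({"x": 3}, "A")],
    ],
)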

abulk_update_state async

abulk_update_state(
    config: RunnableConfig,
    supersteps: Sequence[Sequence[StateUpdate]],
) -> RunnableConfig

Asynchronously apply updates to the graph state in bulk. Requires a checkpointer to be set.

Parameters:

• config (RunnableConfig, required): The config to apply the updates to.
• supersteps (Sequence[Sequence[StateUpdate]], required): A list of supersteps, each including a list of updates to apply sequentially to a graph state. Each update is a tuple of the form (values, as_node).

Raises:

• ValueError: If no checkpointer is set or no updates are provided.
• InvalidUpdateError: If an invalid update is provided.

Returns:

• RunnableConfig: The updated config.

get_graph

get_graph(
    config: RunnableConfig | None = None,
    *,
    xray: int | bool = False
) -> Graph

Return a drawable representation of the computation graph.
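For instance, rendering the drawable representation (draw_mermaid and draw_ascii are part of the returned Graph object's drawing API):

drawable = compiled.get_graph(xray=True)  # xray expands subgraphs in the drawing
print(drawable.draw_mermaid())            # Mermaid source for the diagram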

aget_graph async

aget_graph(
    config: RunnableConfig | None = None,
    *,
    xray: int | bool = False
) -> Graph

Asynchronously return a drawable representation of the computation graph.

get_subgraphs

get_subgraphs(
    *, namespace: str | None = None, recurse: bool = False
) -> Iterator[tuple[str, PregelProtocol]]

Get the subgraphs of the graph.

Parameters:

• namespace (str | None, default: None): The namespace to filter the subgraphs by.
• recurse (bool, default: False): Whether to recurse into the subgraphs. If False, only the immediate subgraphs will be returned.

Returns:

• Iterator[tuple[str, PregelProtocol]]: An iterator of (namespace, subgraph) pairs.

aget_subgraphs async

aget_subgraphs(
    *, namespace: str | None = None, recurse: bool = False
) -> AsyncIterator[tuple[str, PregelProtocol]]

Asynchronously get the subgraphs of the graph.

Parameters:

• namespace (str | None, default: None): The namespace to filter the subgraphs by.
• recurse (bool, default: False): Whether to recurse into the subgraphs. If False, only the immediate subgraphs will be returned.

Returns:

• AsyncIterator[tuple[str, PregelProtocol]]: An async iterator of (namespace, subgraph) pairs.

with_config

with_config(
    config: RunnableConfig | None = None, **kwargs: Any
) -> Self

Create a copy of the Pregel object with an updated config.
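A brief sketch: binding a default config so later calls don't have to pass it explicitly:

configured = compiled.with_config({"configurable": {"thread_id": "1"}})
configured.invoke({"x": 1})  # runs with the bound thread_id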

Graph

Methods:

• add_node: Add a new node to the graph.
• add_edge: Add a directed edge from the start node to the end node.
• add_conditional_edges: Add a conditional edge from the starting node to any number of destination nodes.
• compile: Compile the graph into a CompiledGraph object.

add_node

add_node(
    node: Union[str, RunnableLike],
    action: Optional[RunnableLike] = None,
    *,
    metadata: Optional[dict[str, Any]] = None
) -> Self

Add a new node to the graph.

Parameters:

• node (Union[str, RunnableLike], required): The function or runnable this node will run. If a string is provided, it will be used as the node name, and action will be used as the function or runnable.
• action (Optional[RunnableLike], default: None): The action associated with the node. Used as the node's function or runnable if node is a string (the node name).
• metadata (Optional[dict[str, Any]], default: None): The metadata associated with the node.

add_edge

add_edge(start_key: str, end_key: str) -> Self

Add a directed edge from the start node to the end node.

Parameters:

• start_key (str, required): The key of the start node of the edge.
• end_key (str, required): The key of the end node of the edge.

add_conditional_edges

add_conditional_edges(
    source: str,
    path: Union[
        Callable[..., Union[Hashable, list[Hashable]]],
        Callable[
            ..., Awaitable[Union[Hashable, list[Hashable]]]
        ],
        Runnable[Any, Union[Hashable, list[Hashable]]],
    ],
    path_map: Optional[
        Union[dict[Hashable, str], list[str]]
    ] = None,
    then: Optional[str] = None,
) -> Self

Add a conditional edge from the starting node to any number of destination nodes.

Parameters:

• source (str, required): The starting node. This conditional edge will run when exiting this node.
• path (Union[Callable[..., Union[Hashable, list[Hashable]]], Callable[..., Awaitable[Union[Hashable, list[Hashable]]]], Runnable[Any, Union[Hashable, list[Hashable]]]], required): The callable that determines the next node or nodes. If path_map is not specified, it should return one or more node names. If it returns END, the graph will stop execution.
• path_map (Optional[Union[dict[Hashable, str], list[str]]], default: None): Optional mapping of paths to node names. If omitted, the values returned by path should be node names.
• then (Optional[str], default: None): The name of a node to execute after the nodes selected by path.

Returns:

• Self: The instance of the graph, allowing for method chaining.

Note: Without type hints on the path function's return value (e.g., -> Literal["foo", "__end__"]) or a path_map, the graph visualization assumes the edge could transition to any node in the graph.

compile

compile(
    checkpointer: Checkpointer = None,
    interrupt_before: Optional[
        Union[All, list[str]]
    ] = None,
    interrupt_after: Optional[Union[All, list[str]]] = None,
    debug: bool = False,
    name: Optional[str] = None,
    *,
    cache: Optional[BaseCache] = None,
    store: Optional[BaseStore] = None
) -> CompiledGraph

Compiles the graph into a CompiledGraph object.

The compiled graph implements the Runnable interface and can be invoked, streamed, batched, and run asynchronously.

Parameters:

• checkpointer (Checkpointer, default: None): A checkpoint saver object or flag. If provided, this Checkpointer serves as a fully versioned "short-term memory" for the graph, allowing it to be paused, resumed, and replayed from any point. If None, it may inherit the parent graph's checkpointer when used as a subgraph. If False, it will not use or inherit any checkpointer.
• interrupt_before (Optional[Union[All, list[str]]], default: None): An optional list of node names to interrupt before.
• interrupt_after (Optional[Union[All, list[str]]], default: None): An optional list of node names to interrupt after.
• debug (bool, default: False): A flag indicating whether to enable debug mode.
• name (Optional[str], default: None): The name to use for the compiled graph.

Returns:

• CompiledGraph: The compiled graph.

CompiledGraph

Bases: Pregel

Methods:

• stream: Stream graph steps for a single input.
• astream: Asynchronously stream graph steps for a single input.
• invoke: Run the graph with a single input and config.
• ainvoke: Asynchronously invoke the graph on a single input.
• get_state: Get the current state of the graph.
• aget_state: Asynchronously get the current state of the graph.
• get_state_history: Get the history of the state of the graph.
• aget_state_history: Asynchronously get the history of the state of the graph.
• update_state: Update the state of the graph with the given values, as if they came from node as_node.
• aupdate_state: Asynchronously update the state of the graph with the given values, as if they came from node as_node.
• bulk_update_state: Apply updates to the graph state in bulk. Requires a checkpointer to be set.
• abulk_update_state: Asynchronously apply updates to the graph state in bulk. Requires a checkpointer to be set.
• get_graph: Return a drawable representation of the computation graph.
• aget_graph: Asynchronously return a drawable representation of the computation graph.
• get_subgraphs: Get the subgraphs of the graph.
• aget_subgraphs: Asynchronously get the subgraphs of the graph.
• with_config: Create a copy of the Pregel object with an updated config.

stream

stream(
    input: dict[str, Any] | Any,
    config: RunnableConfig | None = None,
    *,
    stream_mode: (
        StreamMode | list[StreamMode] | None
    ) = None,
    output_keys: str | Sequence[str] | None = None,
    interrupt_before: All | Sequence[str] | None = None,
    interrupt_after: All | Sequence[str] | None = None,
    checkpoint_during: bool | None = None,
    debug: bool | None = None,
    subgraphs: bool = False
) -> Iterator[dict[str, Any] | Any]

Stream graph steps for a single input.

Parameters:

• input (dict[str, Any] | Any, required): The input to the graph.
• config (RunnableConfig | None, default: None): The configuration to use for the run.
• stream_mode (StreamMode | list[StreamMode] | None, default: None): The mode to stream output; defaults to self.stream_mode. Options are:
  • "values": Emit all values in the state after each step, including interrupts. When used with the functional API, values are emitted once at the end of the workflow.
  • "updates": Emit only the node or task names and the updates returned by the nodes or tasks after each step. If multiple updates are made in the same step (e.g. multiple nodes are run), those updates are emitted separately.
  • "custom": Emit custom data from inside nodes or tasks using StreamWriter.
  • "messages": Emit LLM messages token-by-token, together with metadata, for any LLM invocations inside nodes or tasks.
  • "debug": Emit debug events with as much information as possible for each step.
• output_keys (str | Sequence[str] | None, default: None): The keys to stream; defaults to all non-context channels.
• interrupt_before (All | Sequence[str] | None, default: None): Nodes to interrupt before; defaults to all nodes in the graph.
• interrupt_after (All | Sequence[str] | None, default: None): Nodes to interrupt after; defaults to all nodes in the graph.
• checkpoint_during (bool | None, default: None): Whether to checkpoint intermediate steps; defaults to True. If False, only the final checkpoint is saved.
• debug (bool | None, default: None): Whether to print debug information during execution; defaults to False.
• subgraphs (bool, default: False): Whether to stream subgraphs.

Yields:

• dict[str, Any] | Any: The output of each step in the graph. The output shape depends on the stream_mode.

Using stream_mode="values":
import operator
from typing_extensions import Annotated, TypedDict
from langgraph.graph import StateGraph, START

class State(TypedDict):
    alist: Annotated[list, operator.add]
    another_list: Annotated[list, operator.add]

builder = StateGraph(State)
builder.add_node("a", lambda _state: {"another_list": ["hi"]})
builder.add_node("b", lambda _state: {"alist": ["there"]})
builder.add_edge("a", "b")
builder.add_edge(START, "a")
graph = builder.compile()

for event in graph.stream({"alist": ['Ex for stream_mode="values"']}, stream_mode="values"):
    print(event)

# {'alist': ['Ex for stream_mode="values"'], 'another_list': []}
# {'alist': ['Ex for stream_mode="values"'], 'another_list': ['hi']}
# {'alist': ['Ex for stream_mode="values"', 'there'], 'another_list': ['hi']}
Using stream_mode="updates":
for event in graph.stream({"alist": ['Ex for stream_mode="updates"']}, stream_mode="updates"):
    print(event)

# {'a': {'another_list': ['hi']}}
# {'b': {'alist': ['there']}}
Using stream_mode="debug":
for event in graph.stream({"alist": ['Ex for stream_mode="debug"']}, stream_mode="debug"):
    print(event)

# {'type': 'task', 'timestamp': '2024-06-23T...+00:00', 'step': 1, 'payload': {'id': '...', 'name': 'a', 'input': {'alist': ['Ex for stream_mode="debug"'], 'another_list': []}, 'triggers': ['start:a']}}
# {'type': 'task_result', 'timestamp': '2024-06-23T...+00:00', 'step': 1, 'payload': {'id': '...', 'name': 'a', 'result': [('another_list', ['hi'])]}}
# {'type': 'task', 'timestamp': '2024-06-23T...+00:00', 'step': 2, 'payload': {'id': '...', 'name': 'b', 'input': {'alist': ['Ex for stream_mode="debug"'], 'another_list': ['hi']}, 'triggers': ['a']}}
# {'type': 'task_result', 'timestamp': '2024-06-23T...+00:00', 'step': 2, 'payload': {'id': '...', 'name': 'b', 'result': [('alist', ['there'])]}}
Using stream_mode="custom":
from langgraph.types import StreamWriter

def node_a(state: State, writer: StreamWriter):
    writer({"custom_data": "foo"})
    return {"alist": ["hi"]}

builder = StateGraph(State)
builder.add_node("a", node_a)
builder.add_edge(START, "a")
graph = builder.compile()

for event in graph.stream({"alist": ['Ex for stream_mode="custom"']}, stream_mode="custom"):
    print(event)

# {'custom_data': 'foo'}
Using stream_mode="messages":
from typing_extensions import Annotated, TypedDict
from langgraph.graph import StateGraph, START
from langchain_openai import ChatOpenAI

llm = ChatOpenAI(model="gpt-4o-mini")

class State(TypedDict):
    question: str
    answer: str

def node_a(state: State):
    response = llm.invoke(state["question"])
    return {"answer": response.content}

builder = StateGraph(State)
builder.add_node("a", node_a)
builder.add_edge(START, "a")
graph = builder.compile()

for event in graph.stream({"question": "What is the capital of France?"}, stream_mode="messages"):
    print(event)

# (AIMessageChunk(content='The', additional_kwargs={}, response_metadata={}, id='...'), {'langgraph_step': 1, 'langgraph_node': 'a', 'langgraph_triggers': ['start:a'], 'langgraph_path': ('__pregel_pull', 'a'), 'langgraph_checkpoint_ns': '...', 'checkpoint_ns': '...', 'ls_provider': 'openai', 'ls_model_name': 'gpt-4o-mini', 'ls_model_type': 'chat', 'ls_temperature': 0.7})
# (AIMessageChunk(content=' capital', additional_kwargs={}, response_metadata={}, id='...'), {'langgraph_step': 1, 'langgraph_node': 'a', 'langgraph_triggers': ['start:a'], ...})
# (AIMessageChunk(content=' of', additional_kwargs={}, response_metadata={}, id='...'), {...})
# (AIMessageChunk(content=' France', additional_kwargs={}, response_metadata={}, id='...'), {...})
# (AIMessageChunk(content=' is', additional_kwargs={}, response_metadata={}, id='...'), {...})
# (AIMessageChunk(content=' Paris', additional_kwargs={}, response_metadata={}, id='...'), {...})

astream async

astream(
    input: dict[str, Any] | Any,
    config: RunnableConfig | None = None,
    *,
    stream_mode: (
        StreamMode | list[StreamMode] | None
    ) = None,
    output_keys: str | Sequence[str] | None = None,
    interrupt_before: All | Sequence[str] | None = None,
    interrupt_after: All | Sequence[str] | None = None,
    checkpoint_during: bool | None = None,
    debug: bool | None = None,
    subgraphs: bool = False
) -> AsyncIterator[dict[str, Any] | Any]

Asynchronously stream graph steps for a single input.

Parameters:

• input (dict[str, Any] | Any, required): The input to the graph.
• config (RunnableConfig | None, default: None): The configuration to use for the run.
• stream_mode (StreamMode | list[StreamMode] | None, default: None): The mode to stream output; defaults to self.stream_mode. Options are:
  • "values": Emit all values in the state after each step, including interrupts. When used with the functional API, values are emitted once at the end of the workflow.
  • "updates": Emit only the node or task names and the updates returned by the nodes or tasks after each step. If multiple updates are made in the same step (e.g. multiple nodes are run), those updates are emitted separately.
  • "custom": Emit custom data from inside nodes or tasks using StreamWriter.
  • "messages": Emit LLM messages token-by-token, together with metadata, for any LLM invocations inside nodes or tasks.
  • "debug": Emit debug events with as much information as possible for each step.
• output_keys (str | Sequence[str] | None, default: None): The keys to stream; defaults to all non-context channels.
• interrupt_before (All | Sequence[str] | None, default: None): Nodes to interrupt before; defaults to all nodes in the graph.
• interrupt_after (All | Sequence[str] | None, default: None): Nodes to interrupt after; defaults to all nodes in the graph.
• checkpoint_during (bool | None, default: None): Whether to checkpoint intermediate steps; defaults to True. If False, only the final checkpoint is saved.
• debug (bool | None, default: None): Whether to print debug information during execution; defaults to False.
• subgraphs (bool, default: False): Whether to stream subgraphs.

Yields:

• AsyncIterator[dict[str, Any] | Any]: The output of each step in the graph. The output shape depends on the stream_mode.

Using stream_mode="values":
import operator
from typing_extensions import Annotated, TypedDict
from langgraph.graph import StateGraph, START

class State(TypedDict):
    alist: Annotated[list, operator.add]
    another_list: Annotated[list, operator.add]

builder = StateGraph(State)
builder.add_node("a", lambda _state: {"another_list": ["hi"]})
builder.add_node("b", lambda _state: {"alist": ["there"]})
builder.add_edge("a", "b")
builder.add_edge(START, "a")
graph = builder.compile()

async for event in graph.astream({"alist": ['Ex for stream_mode="values"']}, stream_mode="values"):
    print(event)

# {'alist': ['Ex for stream_mode="values"'], 'another_list': []}
# {'alist': ['Ex for stream_mode="values"'], 'another_list': ['hi']}
# {'alist': ['Ex for stream_mode="values"', 'there'], 'another_list': ['hi']}
Using stream_mode="updates":
async for event in graph.astream({"alist": ['Ex for stream_mode="updates"']}, stream_mode="updates"):
    print(event)

# {'a': {'another_list': ['hi']}}
# {'b': {'alist': ['there']}}
Using stream_mode="debug":
async for event in graph.astream({"alist": ['Ex for stream_mode="debug"']}, stream_mode="debug"):
    print(event)

# {'type': 'task', 'timestamp': '2024-06-23T...+00:00', 'step': 1, 'payload': {'id': '...', 'name': 'a', 'input': {'alist': ['Ex for stream_mode="debug"'], 'another_list': []}, 'triggers': ['start:a']}}
# {'type': 'task_result', 'timestamp': '2024-06-23T...+00:00', 'step': 1, 'payload': {'id': '...', 'name': 'a', 'result': [('another_list', ['hi'])]}}
# {'type': 'task', 'timestamp': '2024-06-23T...+00:00', 'step': 2, 'payload': {'id': '...', 'name': 'b', 'input': {'alist': ['Ex for stream_mode="debug"'], 'another_list': ['hi']}, 'triggers': ['a']}}
# {'type': 'task_result', 'timestamp': '2024-06-23T...+00:00', 'step': 2, 'payload': {'id': '...', 'name': 'b', 'result': [('alist', ['there'])]}}
Using stream_mode="custom":
from langgraph.types import StreamWriter

async def node_a(state: State, writer: StreamWriter):
    writer({"custom_data": "foo"})
    return {"alist": ["hi"]}

builder = StateGraph(State)
builder.add_node("a", node_a)
builder.add_edge(START, "a")
graph = builder.compile()

async for event in graph.astream({"alist": ['Ex for stream_mode="custom"']}, stream_mode="custom"):
    print(event)

# {'custom_data': 'foo'}
Using stream_mode="messages":
from typing_extensions import Annotated, TypedDict
from langgraph.graph import StateGraph, START
from langchain_openai import ChatOpenAI

llm = ChatOpenAI(model="gpt-4o-mini")

class State(TypedDict):
    question: str
    answer: str

async def node_a(state: State):
    response = await llm.ainvoke(state["question"])
    return {"answer": response.content}

builder = StateGraph(State)
builder.add_node("a", node_a)
builder.add_edge(START, "a")
graph = builder.compile()

async for event in graph.astream({"question": "What is the capital of France?"}, stream_mode="messages"):
    print(event)

# (AIMessageChunk(content='The', additional_kwargs={}, response_metadata={}, id='...'), {'langgraph_step': 1, 'langgraph_node': 'a', 'langgraph_triggers': ['start:a'], 'langgraph_path': ('__pregel_pull', 'a'), 'langgraph_checkpoint_ns': '...', 'checkpoint_ns': '...', 'ls_provider': 'openai', 'ls_model_name': 'gpt-4o-mini', 'ls_model_type': 'chat', 'ls_temperature': 0.7})
# (AIMessageChunk(content=' capital', additional_kwargs={}, response_metadata={}, id='...'), {'langgraph_step': 1, 'langgraph_node': 'a', 'langgraph_triggers': ['start:a'], ...})
# (AIMessageChunk(content=' of', additional_kwargs={}, response_metadata={}, id='...'), {...})
# (AIMessageChunk(content=' France', additional_kwargs={}, response_metadata={}, id='...'), {...})
# (AIMessageChunk(content=' is', additional_kwargs={}, response_metadata={}, id='...'), {...})
# (AIMessageChunk(content=' Paris', additional_kwargs={}, response_metadata={}, id='...'), {...})

invoke

invoke(
    input: dict[str, Any] | Any,
    config: RunnableConfig | None = None,
    *,
    stream_mode: StreamMode = "values",
    output_keys: str | Sequence[str] | None = None,
    interrupt_before: All | Sequence[str] | None = None,
    interrupt_after: All | Sequence[str] | None = None,
    checkpoint_during: bool | None = None,
    debug: bool | None = None,
    **kwargs: Any
) -> dict[str, Any] | Any

Run the graph with a single input and config.

Parameters:

• input (dict[str, Any] | Any, required): The input data for the graph. It can be a dictionary or any other type.
• config (RunnableConfig | None, default: None): The configuration for the graph run.
• stream_mode (StreamMode, default: "values"): The stream mode for the graph run.
• output_keys (str | Sequence[str] | None, default: None): The output keys to retrieve from the graph run.
• interrupt_before (All | Sequence[str] | None, default: None): The nodes to interrupt the graph run before.
• interrupt_after (All | Sequence[str] | None, default: None): The nodes to interrupt the graph run after.
• checkpoint_during (bool | None, default: None): Whether to checkpoint intermediate steps; defaults to True. If False, only the final checkpoint is saved.
• debug (bool | None, default: None): Enable debug mode for the graph run.
• **kwargs (Any): Additional keyword arguments to pass to the graph run.

Returns:

• dict[str, Any] | Any: The output of the graph run. If stream_mode is "values", it returns the latest output. If stream_mode is not "values", it returns a list of output chunks.

ainvoke async

ainvoke(
    input: dict[str, Any] | Any,
    config: RunnableConfig | None = None,
    *,
    stream_mode: StreamMode = "values",
    output_keys: str | Sequence[str] | None = None,
    interrupt_before: All | Sequence[str] | None = None,
    interrupt_after: All | Sequence[str] | None = None,
    checkpoint_during: bool | None = None,
    debug: bool | None = None,
    **kwargs: Any
) -> dict[str, Any] | Any

Asynchronously invoke the graph on a single input.

Parameters:

• input (dict[str, Any] | Any, required): The input data for the computation. It can be a dictionary or any other type.
• config (RunnableConfig | None, default: None): The configuration for the computation.
• stream_mode (StreamMode, default: "values"): The stream mode for the computation.
• output_keys (str | Sequence[str] | None, default: None): The output keys to include in the result.
• interrupt_before (All | Sequence[str] | None, default: None): The nodes to interrupt before.
• interrupt_after (All | Sequence[str] | None, default: None): The nodes to interrupt after.
• checkpoint_during (bool | None, default: None): Whether to checkpoint intermediate steps; defaults to True. If False, only the final checkpoint is saved.
• debug (bool | None, default: None): Whether to enable debug mode.
• **kwargs (Any): Additional keyword arguments.

Returns:

• dict[str, Any] | Any: The result of the computation. If stream_mode is "values", it returns the latest value. If stream_mode is not "values", it returns a list of output chunks.

get_state

get_state(
    config: RunnableConfig, *, subgraphs: bool = False
) -> StateSnapshot

Get the current state of the graph.

aget_state async

aget_state(
    config: RunnableConfig, *, subgraphs: bool = False
) -> StateSnapshot

Asynchronously get the current state of the graph.

get_state_history

get_state_history(
    config: RunnableConfig,
    *,
    filter: dict[str, Any] | None = None,
    before: RunnableConfig | None = None,
    limit: int | None = None
) -> Iterator[StateSnapshot]

Get the history of the state of the graph.

aget_state_history async

aget_state_history(
    config: RunnableConfig,
    *,
    filter: dict[str, Any] | None = None,
    before: RunnableConfig | None = None,
    limit: int | None = None
) -> AsyncIterator[StateSnapshot]

Asynchronously get the history of the state of the graph.

update_state

update_state(
    config: RunnableConfig,
    values: dict[str, Any] | Any | None,
    as_node: str | None = None,
) -> RunnableConfig

Update the state of the graph with the given values, as if they came from node as_node. If as_node is not provided, it will be set to the last node that updated the state, if not ambiguous.

aupdate_state async

aupdate_state(
    config: RunnableConfig,
    values: dict[str, Any] | Any,
    as_node: str | None = None,
) -> RunnableConfig

Asynchronously update the state of the graph with the given values, as if they came from node as_node. If as_node is not provided, it will be set to the last node that updated the state, if not ambiguous.

bulk_update_state

bulk_update_state(
    config: RunnableConfig,
    supersteps: Sequence[Sequence[StateUpdate]],
) -> RunnableConfig

Apply updates to the graph state in bulk. Requires a checkpointer to be set.

Parameters:

• config (RunnableConfig, required): The config to apply the updates to.
• supersteps (Sequence[Sequence[StateUpdate]], required): A list of supersteps, each including a list of updates to apply sequentially to a graph state. Each update is a tuple of the form (values, as_node).

Raises:

• ValueError: If no checkpointer is set or no updates are provided.
• InvalidUpdateError: If an invalid update is provided.

Returns:

• RunnableConfig: The updated config.

abulk_update_state async

abulk_update_state(
    config: RunnableConfig,
    supersteps: Sequence[Sequence[StateUpdate]],
) -> RunnableConfig

Asynchronously apply updates to the graph state in bulk. Requires a checkpointer to be set.

Parameters:

• config (RunnableConfig, required): The config to apply the updates to.
• supersteps (Sequence[Sequence[StateUpdate]], required): A list of supersteps, each including a list of updates to apply sequentially to a graph state. Each update is a tuple of the form (values, as_node).

Raises:

• ValueError: If no checkpointer is set or no updates are provided.
• InvalidUpdateError: If an invalid update is provided.

Returns:

• RunnableConfig: The updated config.

get_graph

get_graph(
    config: RunnableConfig | None = None,
    *,
    xray: int | bool = False
) -> Graph

Return a drawable representation of the computation graph.

aget_graph async

aget_graph(
    config: RunnableConfig | None = None,
    *,
    xray: int | bool = False
) -> Graph

Asynchronously return a drawable representation of the computation graph.

get_subgraphs

get_subgraphs(
    *, namespace: str | None = None, recurse: bool = False
) -> Iterator[tuple[str, PregelProtocol]]

Get the subgraphs of the graph.

Parameters:

• namespace (str | None, default: None): The namespace to filter the subgraphs by.
• recurse (bool, default: False): Whether to recurse into the subgraphs. If False, only the immediate subgraphs will be returned.

Returns:

• Iterator[tuple[str, PregelProtocol]]: An iterator of (namespace, subgraph) pairs.

aget_subgraphs async

aget_subgraphs(
    *, namespace: str | None = None, recurse: bool = False
) -> AsyncIterator[tuple[str, PregelProtocol]]

Asynchronously get the subgraphs of the graph.

Parameters:

• namespace (str | None, default: None): The namespace to filter the subgraphs by.
• recurse (bool, default: False): Whether to recurse into the subgraphs. If False, only the immediate subgraphs will be returned.

Returns:

• AsyncIterator[tuple[str, PregelProtocol]]: An async iterator of (namespace, subgraph) pairs.

with_config

with_config(
    config: RunnableConfig | None = None, **kwargs: Any
) -> Self

Create a copy of the Pregel object with an updated config.

Functions:

• add_messages: Merges two lists of messages, updating existing messages by ID.

add_messages

add_messages(
    left: Messages,
    right: Messages,
    *,
    format: Optional[Literal["langchain-openai"]] = None
) -> Messages

Merges two lists of messages, updating existing messages by ID.

By default, this ensures the state is "append-only", unless the new message has the same ID as an existing message.

Parameters:

• left (Messages, required): The base list of messages.
• right (Messages, required): The list of messages (or single message) to merge into the base list.
• format (Optional[Literal["langchain-openai"]], default: None): The format to return messages in. If None, messages are returned as is. If "langchain-openai", messages are returned as BaseMessage objects with their contents formatted to match the OpenAI message format, meaning contents can be strings, 'text' blocks, or 'image_url' blocks, and tool responses are returned as their own ToolMessages. Requires langchain-core>=0.3.11 to use this feature.

Returns:

• Messages: A new list of messages with the messages from right merged into left. If a message in right has the same ID as a message in left, the message from right replaces the message from left.

Example
Basic usage
from langchain_core.messages import AIMessage, HumanMessage
msgs1 = [HumanMessage(content="Hello", id="1")]
msgs2 = [AIMessage(content="Hi there!", id="2")]
add_messages(msgs1, msgs2)
# [HumanMessage(content='Hello', id='1'), AIMessage(content='Hi there!', id='2')]
Overwrite existing message
msgs1 = [HumanMessage(content="Hello", id="1")]
msgs2 = [HumanMessage(content="Hello again", id="1")]
add_messages(msgs1, msgs2)
# [HumanMessage(content='Hello again', id='1')]
Use in a StateGraph
from typing import Annotated
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, add_messages

class State(TypedDict):
    messages: Annotated[list, add_messages]

builder = StateGraph(State)
builder.add_node("chatbot", lambda state: {"messages": [("assistant", "Hello")]})
builder.set_entry_point("chatbot")
builder.set_finish_point("chatbot")
graph = builder.compile()
graph.invoke({})
# {'messages': [AIMessage(content='Hello', id=...)]}
Use OpenAI message format
from typing import Annotated
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, add_messages

class State(TypedDict):
    messages: Annotated[list, add_messages(format='langchain-openai')]

def chatbot_node(state: State) -> dict:
    return {"messages": [
        {
            "role": "user",
            "content": [
                {
                    "type": "text",
                    "text": "Here's an image:",
                    "cache_control": {"type": "ephemeral"},
                },
                {
                    "type": "image",
                    "source": {
                        "type": "base64",
                        "media_type": "image/jpeg",
                        "data": "1234",
                    },
                },
            ]
        },
    ]}

builder = StateGraph(State)
builder.add_node("chatbot", chatbot_node)
builder.set_entry_point("chatbot")
builder.set_finish_point("chatbot")
graph = builder.compile()
graph.invoke({"messages": []})
# {
#     'messages': [
#         HumanMessage(
#             content=[
#                 {"type": "text", "text": "Here's an image:"},
#                 {
#                     "type": "image_url",
#                     "image_url": {"url": "data:image/jpeg;base64,1234"},
#                 },
#             ],
#         ),
#     ]
# }