11. How to generate branches for parallel node execution
Parallel execution of nodes is essential for speeding up overall graph operation. LangGraph supports parallel node execution by default, which can significantly improve the performance of graph-based workflows.
This parallelization is implemented through fan-out and fan-in mechanisms, using either standard edges or conditional_edges.
Setup
# Configuration file for managing API keys as environment variables
from dotenv import load_dotenv
# Load API key information
load_dotenv()
True
# Set up LangSmith tracking. https://smith.langchain.com
# !pip install -qU langchain-teddynote
from langchain_teddynote import logging
# Enter a project name.
logging.langsmith("CH17-LangGraph-Modules")
Parallel nodes fan-out and fan-in
fan-out / fan-in
In parallel processing, fan-out and fan-in describe how work is distributed and then gathered back together.
Fan-out (distribute): Split a large task into multiple smaller tasks. For example, when making a pizza, you can prepare the dough, sauce, and cheese separately. Splitting the work into parts like this and processing them simultaneously is fan-out.
Fan-in (gather): Combine the divided small tasks back into one. Like bringing together all the prepared ingredients to make the finished pizza, collecting the results of several tasks to complete the final work is fan-in.
In other words, fan-out distributes the work, and fan-in combines the results to produce the final output.
This example shows node A fanning out to B and C, then fanning in to D.
The State specifies a reducer (add) operator. Instead of simply overwriting the existing value of a specific key in the State, the reducer combines or accumulates values; for a list, this means concatenating the new list onto the existing one.
LangGraph uses the Annotated type to attach a reducer function to a specific key in the State. This keeps the original type (list) for type checking while attaching the reducer function (add) without changing the type itself.
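As a minimal sketch of how a reducer behaves, using operator.add on plain lists outside of any graph (the apply_update helper is hypothetical, shown only to illustrate what the framework does with the annotation):

```python
import operator
from typing import Annotated, TypedDict

class State(TypedDict):
    # The type stays list; operator.add tells LangGraph to concatenate
    # each node's update onto the existing value instead of overwriting it.
    aggregate: Annotated[list, operator.add]

# Hypothetical helper: the framework applies the reducer roughly like this
def apply_update(current: list, update: list) -> list:
    return operator.add(current, update)
```

With a plain `list` (no reducer), a second node's update would replace the first node's value; with the reducer, both values survive in the accumulated list.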
Visualize the graph.
You can check how the values added by each node are accumulated through the reducer.
Handling exceptions during parallel processing
LangGraph executes nodes within a "super-step" (a full process step in which multiple nodes are handled). This means that even if parallel branches run simultaneously, the entire super-step is processed as a transaction.
Therefore, if any of these branches raises an exception, none of the updates are applied to the state (the entire super-step errors out).
If you have an error-prone task (e.g. handling an unstable API call), LangGraph provides two ways to address it:
You can catch and handle exceptions by writing ordinary Python code inside the node.
You can set retry_policy so the graph retries a node when a certain type of exception occurs. Only the failed branch is retried, so you don't have to worry about redoing unnecessary work.
These features give you full control over parallel execution and exception handling.
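A minimal sketch of the first approach, with exception handling written as ordinary Python inside the node (the node, its unstable call, and the fallback value are all hypothetical):

```python
from typing import Any

def fetch_node(state: dict) -> Any:
    # Hypothetical unstable operation; stands in for a real API call
    def unstable_api_call() -> str:
        raise TimeoutError("simulated failure")

    try:
        result = unstable_api_call()
    except TimeoutError:
        # Catching the exception here keeps the super-step from failing,
        # so the other parallel branches' updates are still applied.
        result = "fallback value"

    return {"aggregate": [result]}
```

For the second approach, recent LangGraph versions accept a retry policy when registering the node (roughly `builder.add_node("fetch", fetch_node, retry=RetryPolicy(...))`); check your installed version's API reference for the exact import path and parameters.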
Fan-out and fan-in of parallel nodes with additional steps
The example above showed fan-out and fan-in where each path was a single step. But what if a path contains multiple steps?
Visualize the graph.
Conditional branching
If the fan-out is not deterministic, you can use add_conditional_edges directly.
When there is a known "sink" node to connect to after the conditional branch, you can provide then="name of the node to run" when creating the conditional edge.
Below is reference code. When using the then syntax, providing then="e" means you don't have to add the edge connections to "e" yourself.
Visualize the graph.
Sorting fan-out values by reliability
Nodes that fan out in parallel run within a single "super-step". The updates from each super-step are applied to the state sequentially after that super-step completes.
If you need a consistent, predefined update order within a parallel super-step, record each output in a separate state field with an identifying key, then combine them in the "sink" node by adding a regular edge from each fanned-out node to the gathering point.
For example, consider when you want to sort the output of a parallel step according to "reliability".
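A minimal sketch of that idea, assuming each fanned-out node records a {"value", "reliability"} entry in a separate fanout_values field (the field name, scores, and helper functions here are hypothetical, not LangGraph APIs):

```python
import operator
from typing import Annotated, Any, TypedDict

class State(TypedDict):
    # Each parallel node appends one entry here via the operator.add reducer
    fanout_values: Annotated[list, operator.add]
    aggregate: list

def make_parallel_node(value: str, reliability: float):
    # Each fanned-out node records its output with an identifying score
    def node(state: State) -> Any:
        return {"fanout_values": [{"value": value, "reliability": reliability}]}
    return node

def sink(state: State) -> Any:
    # The gathering node sorts the accumulated outputs by reliability
    # (highest first) so the final order is deterministic, regardless of
    # the order in which the parallel updates arrived.
    ranked = sorted(
        state["fanout_values"], key=lambda d: d["reliability"], reverse=True
    )
    return {"aggregate": [d["value"] for d in ranked]}
```

Connecting each parallel node to the sink with a regular edge guarantees the sink runs only after the whole super-step has finished, at which point it can impose the order it wants.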
Visualize the graph.
Sort results by reliability when running nodes in parallel.
from typing import Annotated, Any
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages

# Define the state (using the add_messages reducer)
class State(TypedDict):
    aggregate: Annotated[list, add_messages]

# Class that returns a node's value
class ReturnNodeValue:
    # Initialize
    def __init__(self, node_secret: str):
        self._value = node_secret

    # Update the state on call
    def __call__(self, state: State) -> Any:
        print(f"Adding {self._value} to {state['aggregate']}")
        return {"aggregate": [self._value]}

# Initialize the state graph
builder = StateGraph(State)

# Create nodes A through D and assign their values
builder.add_node("a", ReturnNodeValue("I'm A"))
builder.add_edge(START, "a")
builder.add_node("b", ReturnNodeValue("I'm B"))
builder.add_node("c", ReturnNodeValue("I'm C"))
builder.add_node("d", ReturnNodeValue("I'm D"))

# Connect the nodes
builder.add_edge("a", "b")
builder.add_edge("a", "c")
builder.add_edge("b", "d")
builder.add_edge("c", "d")
builder.add_edge("d", END)

# Compile the graph
graph = builder.compile()

from langchain_teddynote.graphs import visualize_graph
visualize_graph(graph)

# Run the graph
graph.invoke({"aggregate": []}, {"configurable": {"thread_id": "foo"}})
Adding I'm A to []
Adding I'm B to [HumanMessage(content="I'm A", additional_kwargs={}, response_metadata={}, id='93e79740-...')]
Adding I'm C to [HumanMessage(content="I'm A", additional_kwargs={}, response_metadata={}, id='93e79740-...')]
Adding I'm D to [HumanMessage(content="I'm A", ...), ...]
from typing import Annotated, Any
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages

# Define the state (using the add_messages reducer)
class State(TypedDict):
    aggregate: Annotated[list, add_messages]

# Class that returns a node's value
class ReturnNodeValue:
    # Initialize
    def __init__(self, node_secret: str):
        self._value = node_secret

    # Update the state on call
    def __call__(self, state: State) -> Any:
        print(f"Adding {self._value} to {state['aggregate']}")
        return {"aggregate": [self._value]}

# Initialize the state graph
builder = StateGraph(State)

# Create and connect the nodes
builder.add_node("a", ReturnNodeValue("I'm A"))
builder.add_edge(START, "a")
builder.add_node("b1", ReturnNodeValue("I'm B1"))
builder.add_node("b2", ReturnNodeValue("I'm B2"))
builder.add_node("c", ReturnNodeValue("I'm C"))
builder.add_node("d", ReturnNodeValue("I'm D"))
builder.add_edge("a", "b1")
builder.add_edge("a", "c")
builder.add_edge("b1", "b2")
builder.add_edge(["b2", "c"], "d")
builder.add_edge("d", END)

# Compile the graph
graph = builder.compile()

from langchain_teddynote.graphs import visualize_graph
visualize_graph(graph)

# Run the graph, starting aggregation from an empty list
graph.invoke({"aggregate": []})
Adding I'm A to []
Adding I'm B1 to [HumanMessage(content="I'm A", additional_kwargs={}, response_metadata={}, id='bbae6f5c-...')]
Adding I'm C to [HumanMessage(content="I'm A", additional_kwargs={}, response_metadata={}, id='bbae6f5c-...')]
Adding I'm B2 to [HumanMessage(content="I'm A", ...), ...]
Adding I'm D to [HumanMessage(content="I'm A", ...), HumanMessage(content="I'm B2", ...), ...]
from typing import Annotated, Any, Sequence
from typing_extensions import TypedDict
from langgraph.graph import END, START, StateGraph
from langgraph.graph.message import add_messages

# Define the state (using the add_messages reducer)
class State(TypedDict):
    aggregate: Annotated[list, add_messages]
    which: str

# Class that returns a unique value for each node
class ReturnNodeValue:
    def __init__(self, node_secret: str):
        self._value = node_secret

    def __call__(self, state: State) -> Any:
        print(f"Adding {self._value} to {state['aggregate']}")
        return {"aggregate": [self._value]}

# Initialize the state graph
builder = StateGraph(State)
builder.add_node("a", ReturnNodeValue("I'm A"))
builder.add_edge(START, "a")
builder.add_node("b", ReturnNodeValue("I'm B"))
builder.add_node("c", ReturnNodeValue("I'm C"))
builder.add_node("d", ReturnNodeValue("I'm D"))
builder.add_node("e", ReturnNodeValue("I'm E"))

# Routing function that decides the path based on the state's 'which' value
def route_bc_or_cd(state: State) -> Sequence[str]:
    if state["which"] == "cd":
        return ["c", "d"]
    return ["b", "c"]

# All nodes that may run in parallel
intermediates = ["b", "c", "d"]

builder.add_conditional_edges(
    "a",
    route_bc_or_cd,
    intermediates,
)

for node in intermediates:
    builder.add_edge(node, "e")

# Connect the final node and compile the graph
builder.add_edge("e", END)
graph = builder.compile()

## When using the then syntax:
# builder.add_conditional_edges(
#     "a",
#     route_bc_or_cd,
#     intermediates,
#     then="e",
# )

from langchain_teddynote.graphs import visualize_graph
visualize_graph(graph)

# Run the graph (with which: "bc")
graph.invoke({"aggregate": [], "which": "bc"})
Adding I'm A to []
Adding I'm C to [HumanMessage(content="I'm A", additional_kwargs={}, response_metadata={}, id='d6f6e77f-...')]
Adding I'm B to [HumanMessage(content="I'm A", additional_kwargs={}, response_metadata={}, id='d6f6e77f-...')]
Adding I'm E to [HumanMessage(content="I'm A", ...), ...]
# Run the graph (specified as which: cd)
graph.invoke({"aggregate": [], "which": "cd"})
Adding I'm A to []
Adding I'm C to [HumanMessage(content="I'm A", additional_kwargs={}, response_metadata={}, id='f5f810cc-...')]
Adding I'm D to [HumanMessage(content="I'm A", additional_kwargs={}, response_metadata={}, id='f5f810cc-...')]
Adding I'm E to [HumanMessage(content="I'm A", ...), ...]