Generator functions (i.e., functions that use the yield keyword and behave like iterators) can be used in an LCEL pipeline.
The signature of such a generator should be Iterator[Input] -> Iterator[Output]. For an asynchronous generator, it is AsyncIterator[Input] -> AsyncIterator[Output].
This is useful for:
Implementing a custom output parser
Preserving streaming while modifying the output of a previous step
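To illustrate the signature above before the LLM example, here is a minimal sketch of a streaming transform that consumes one iterator and yields another without buffering (a toy upper-casing transform, not part of the original example):

```python
from typing import Iterator

def to_upper(chunks: Iterator[str]) -> Iterator[str]:
    # Consume an iterator of string chunks and yield each one upper-cased,
    # so streaming is preserved: nothing is accumulated.
    for chunk in chunks:
        yield chunk.upper()

# Any iterator of strings can be piped through the transform.
print(list(to_upper(iter(["hel", "lo", " world"]))))  # → ['HEL', 'LO', ' WORLD']
```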
In the example below, we will implement a custom output parser for comma-separated lists.
%pip install -qU langchain langchain-openai
from typing import Iterator, List
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_openai import ChatOpenAI
prompt = ChatPromptTemplate.from_template(
# List five companies similar to the given company, separated by commas.
"Write a comma-separated list of 5 companies similar to: {company}"
)
# Initialize the ChatOpenAI model by setting the temperature to 0.0.
model = ChatOpenAI(temperature=0.0, model="gpt-4-turbo-preview")
# Create a chain by connecting prompts and models and applying a string output parser.
str_chain = prompt | model | StrOutputParser()
Run the chain below with stream() and print the result. The output is generated in real time.

# Stream the data.
for chunk in str_chain.stream({"company": "Google"}):
    # Print each chunk without a newline, flushing the buffer immediately.
    print(chunk, end="", flush=True)

Check the result of a one-shot call with invoke() as well; instead of chunks, it returns the complete output string.

The split_into_list function takes an iterator of LLM tokens as input and returns an iterator of lists of comma-separated strings. The final chunk is returned by the yield after the loop (which is what makes the function a generator).

# A custom parser that takes an iterator of LLM tokens as input
# and splits it into comma-separated lists of strings.
def split_into_list(input: Iterator[str]) -> Iterator[List[str]]:
    # Holds partial input until a comma is encountered.
    buffer = ""
    for chunk in input:
        # Append the current chunk to the buffer.
        buffer += chunk
        # Repeat while there are commas in the buffer.
        while "," in buffer:
            # Find the position of the next comma.
            comma_index = buffer.index(",")
            # Yield everything before the comma as a one-element list.
            yield [buffer[:comma_index].strip()]
            # Keep the rest for the next iteration.
            buffer = buffer[comma_index + 1 :]
    # Yield the last chunk left in the buffer.
    yield [buffer.strip()]

Pipe the string output of str_chain into the split_into_list function using the | operator. split_into_list splits the streamed string into lists, and the resulting chain is assigned to the list_chain variable.

list_chain = str_chain | split_into_list  # Split the chain's string output into lists.

Call the stream method on the list_chain object to generate output for the input {"company": "Google"}. The stream method returns the output in chunks, and each chunk is processed in the loop: print outputs it immediately, and flush=True makes the output appear right away without buffering.

# Verify that the generated list_chain streams without any problems.
for chunk in list_chain.stream({"company": "Google"}):
    print(chunk, flush=True)  # Print each chunk, flushing the buffer immediately.

This time, pass the data to list_chain with invoke() and check that it works without problems; invoke() concatenates the streamed chunks and returns the complete list.

Asynchronous

The asplit_into_list function converts an asynchronous generator of strings (AsyncIterator[str]) into an asynchronous generator of string lists (AsyncIterator[List[str]]).

from typing import AsyncIterator

# Define an asynchronous generator function.
async def asplit_into_list(input: AsyncIterator[str]) -> AsyncIterator[List[str]]:
    buffer = ""
    # `input` is an `async_generator` object, so use `async for`.
    async for chunk in input:
        buffer += chunk
        while "," in buffer:
            comma_index = buffer.index(",")
            # Split at the comma and yield the result as a list.
            yield [buffer[:comma_index].strip()]
            buffer = buffer[comma_index + 1 :]
    # Yield the remaining buffer contents as a list.
    yield [buffer.strip()]

# Pipe str_chain into asplit_into_list to build alist_chain.
alist_chain = str_chain | asplit_into_list

Stream using the asynchronous chain.

# Stream the data with an async for loop.
async for chunk in alist_chain.astream({"company": "Google"}):
    # Print each chunk, flushing the buffer immediately.
    print(chunk, flush=True)

Finally, invoke the asynchronous chain with ainvoke() to make sure it works without problems.
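Because asplit_into_list is an ordinary async generator, its splitting behavior can also be checked without calling the model by driving it with a hand-made token stream. A minimal self-contained sketch (the token chunks below are made-up stand-ins for LLM output, not real model responses):

```python
import asyncio
from typing import AsyncIterator, List

async def asplit_into_list(input: AsyncIterator[str]) -> AsyncIterator[List[str]]:
    # Same comma-splitting logic as above: buffer chunks until a comma appears.
    buffer = ""
    async for chunk in input:
        buffer += chunk
        while "," in buffer:
            comma_index = buffer.index(",")
            yield [buffer[:comma_index].strip()]
            buffer = buffer[comma_index + 1 :]
    yield [buffer.strip()]

async def fake_token_stream() -> AsyncIterator[str]:
    # Hypothetical stand-in for the LLM token stream; note the items
    # are split mid-word, as real token streams often are.
    for token in ["Apple, Micro", "soft, Amaz", "on"]:
        yield token

async def main() -> List[List[str]]:
    # Collect every list the parser yields.
    return [item async for item in asplit_into_list(fake_token_stream())]

print(asyncio.run(main()))  # → [['Apple'], ['Microsoft'], ['Amazon']]
```

This confirms the parser emits each complete item as soon as its trailing comma arrives, regardless of how the stream is chunked.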