docs/examples/workflow/workflows_cookbook.ipynb
First, we install our dependencies. Core contains most of what we need; the OpenAI package handles LLM access, and utils-workflow provides the visualization capabilities we'll use later on.
!pip install --upgrade llama-index-core llama-index-llms-openai llama-index-utils-workflow
Then we bring in the dependencies we just installed:
import random

from llama_index.core.workflow import (
    Event,
    StartEvent,
    StopEvent,
    Workflow,
    step,
    Context,
)
from llama_index.llms.openai import OpenAI
from llama_index.utils.workflow import (
    draw_all_possible_flows,
    draw_most_recent_execution,
)
Set up our OpenAI key, so we can do actual LLM things.
import os
os.environ["OPENAI_API_KEY"] = "sk-proj-..."
Let's start with the most basic workflow possible: it just starts, does one thing, and stops. There's no reason to have a real workflow if your task is this simple, but we're just demonstrating how they work.
from llama_index.llms.openai import OpenAI
class OpenAIGenerator(Workflow):
    @step
    async def generate(self, ev: StartEvent) -> StopEvent:
        llm = OpenAI(model="gpt-4o")
        response = await llm.acomplete(ev.query)
        return StopEvent(result=str(response))
w = OpenAIGenerator(timeout=10, verbose=False)
result = await w.run(query="What's LlamaIndex?")
print(result)
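One note on the bare `await` above: it works because notebooks support top-level `await`. In a plain Python script you'd wrap the call in `asyncio.run()` instead. A minimal sketch of that pattern (the workflow lines are commented out here since they need a live API key):

```python
import asyncio


async def main() -> str:
    # In a script, construct and run the workflow inside an async entry point:
    #   w = OpenAIGenerator(timeout=10, verbose=False)
    #   return await w.run(query="What's LlamaIndex?")
    return "placeholder result"


result = asyncio.run(main())
print(result)
```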
One of the neat things about Workflows is that we can use pyvis to visualize them. Let's see what that looks like for this very simple flow.
draw_all_possible_flows(OpenAIGenerator, filename="trivial_workflow.html")
Not a lot to see here, yet! The start event goes to generate() and then straight to StopEvent.
Let's go to a more interesting example, demonstrating our ability to loop:
class FailedEvent(Event):
    error: str


class QueryEvent(Event):
    query: str


class LoopExampleFlow(Workflow):
    @step
    async def answer_query(
        self, ev: StartEvent | QueryEvent
    ) -> FailedEvent | StopEvent:
        query = ev.query
        # try to answer the query
        random_number = random.randint(0, 1)
        if random_number == 0:
            return FailedEvent(error="Failed to answer the query.")
        else:
            return StopEvent(result="The answer to your query")

    @step
    async def improve_query(self, ev: FailedEvent) -> QueryEvent | StopEvent:
        # improve the query or decide it can't be fixed
        random_number = random.randint(0, 1)
        if random_number == 0:
            return QueryEvent(query="Here's a better query.")
        else:
            return StopEvent(result="Your query can't be fixed.")
We're using random numbers to simulate LLM actions here so that we can get reliably interesting behavior.
answer_query() accepts a start event or a query event. It can then do 2 things:
- it can answer the query and emit a StopEvent, which returns the result
- it can decide the query was bad and emit a FailedEvent

improve_query() accepts a FailedEvent. It can also do 2 things:
- it can present a better query and emit a QueryEvent, which loops back to answer_query()
- it can decide the query can't be improved and emit a StopEvent, which returns a failure
We can also visualize this more complicated workflow:
draw_all_possible_flows(LoopExampleFlow, filename="loop_workflow.html")
We've set verbose=True here so we can see exactly what events were triggered. You can see it conveniently demonstrates looping and then answering.
l = LoopExampleFlow(timeout=10, verbose=True)
result = await l.run(query="What's LlamaIndex?")
print(result)
There is a global state, accessed through the Context object, which allows you to keep arbitrary data around for use by all event handlers.
class GlobalExampleFlow(Workflow):
    @step
    async def setup(self, ctx: Context, ev: StartEvent) -> QueryEvent:
        # load our data here
        await ctx.store.set("some_database", ["value1", "value2", "value3"])
        return QueryEvent(query=ev.query)

    @step
    async def query(self, ctx: Context, ev: QueryEvent) -> StopEvent:
        # use our data with our query
        data = await ctx.store.get("some_database")
        result = f"The answer to your query is {data[1]}"
        return StopEvent(result=result)
g = GlobalExampleFlow(timeout=10, verbose=True)
result = await g.run(query="What's LlamaIndex?")
print(result)
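If it helps to picture what `ctx.store` is doing, here's a toy async key-value store in plain Python. This is just a sketch of the pattern, not LlamaIndex's actual Context implementation:

```python
import asyncio


class ToyStore:
    """A minimal async key-value store, similar in spirit to ctx.store."""

    def __init__(self):
        self._data = {}
        self._lock = asyncio.Lock()

    async def set(self, key, value):
        # the lock keeps concurrent steps from interleaving writes
        async with self._lock:
            self._data[key] = value

    async def get(self, key, default=None):
        async with self._lock:
            return self._data.get(key, default)


async def demo():
    store = ToyStore()
    await store.set("some_database", ["value1", "value2", "value3"])
    data = await store.get("some_database")
    return f"The answer to your query is {data[1]}"


print(asyncio.run(demo()))
```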
Of course, this flow is essentially still linear. A more realistic example would be if your start event could either be a query or a data population event, and you needed to wait. Let's set that up to see what it looks like:
class WaitExampleFlow(Workflow):
    @step
    async def setup(self, ctx: Context, ev: StartEvent) -> StopEvent:
        if hasattr(ev, "data"):
            await ctx.store.set("data", ev.data)

        return StopEvent(result=None)

    @step
    async def query(self, ctx: Context, ev: StartEvent) -> StopEvent | None:
        if hasattr(ev, "query"):
            # do we have any data yet?
            data = await ctx.store.get("data", default=None)
            if data is not None:
                return StopEvent(result=f"Got the data {data}")
            else:
                # there's no data yet
                return None
        else:
            # this isn't a query
            return None
w = WaitExampleFlow(verbose=True)
result = await w.run(query="Can I kick it?")
if result is None:
    print("No you can't")
print("---")
result = await w.run(data="Yes you can")
print("---")
result = await w.run(query="Can I kick it?")
print(result)
Let's visualize how this flow works:
draw_all_possible_flows(WaitExampleFlow, filename="wait_workflow.html")
Because waiting for events is such a common pattern, the context object has a convenience function, collect_events(). It will capture events and store them, returning None until all the events it requires have been collected. Those events will be attached to the output of collect_events in the order that they were specified. Let's see this in action:
class InputEvent(Event):
    input: str


class SetupEvent(Event):
    error: bool


class QueryEvent(Event):
    query: str


class CollectExampleFlow(Workflow):
    @step
    async def setup(self, ctx: Context, ev: StartEvent) -> SetupEvent:
        # generically start everything up, but only once per run
        if not await ctx.store.get("is_setup", default=False):
            await ctx.store.set("is_setup", True)
            print("I got set up")
        return SetupEvent(error=False)

    @step
    async def collect_input(self, ev: StartEvent) -> InputEvent | None:
        if hasattr(ev, "input"):
            # perhaps validate the input
            print("I got some input")
            return InputEvent(input=ev.input)
        return None

    @step
    async def parse_query(self, ev: StartEvent) -> QueryEvent | None:
        if hasattr(ev, "query"):
            # parse the query in some way
            print("I got a query")
            return QueryEvent(query=ev.query)
        return None

    @step
    async def run_query(
        self, ctx: Context, ev: InputEvent | SetupEvent | QueryEvent
    ) -> StopEvent | None:
        ready = ctx.collect_events(ev, [QueryEvent, InputEvent, SetupEvent])
        if ready is None:
            print("Not enough events yet")
            return None

        # run the query
        print("Now I have all the events")
        print(ready)

        result = f"Ran query '{ready[0].query}' on input '{ready[1].input}'"
        return StopEvent(result=result)
c = CollectExampleFlow()
result = await c.run(input="Here's some input", query="Here's my question")
print(result)
You can see each of the events getting triggered as well as the collection event repeatedly returning None until enough events have arrived. Let's see what this looks like in a flow diagram:
draw_all_possible_flows(CollectExampleFlow, "collect_workflow.html")
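To demystify the collector, here's roughly how such buffering can work in plain Python. This is an illustration of the pattern only, not LlamaIndex's actual implementation of collect_events():

```python
class ToyCollector:
    """Buffers events by type; returns None until every expected type has arrived."""

    def __init__(self):
        self._buffer = []

    def collect(self, ev, expected_types):
        self._buffer.append(ev)
        collected = []
        for etype in expected_types:
            # find the first buffered event of this type, if any
            match = next((e for e in self._buffer if isinstance(e, etype)), None)
            if match is None:
                return None  # still waiting on at least one event type
            collected.append(match)
        return collected  # in the order the types were specified


class Setup: pass
class Input: pass
class Query: pass


c = ToyCollector()
print(c.collect(Setup(), [Query, Input, Setup]))  # None: still waiting
print(c.collect(Input(), [Query, Input, Setup]))  # None: still waiting
ready = c.collect(Query(), [Query, Input, Setup])
print([type(e).__name__ for e in ready])  # ['Query', 'Input', 'Setup']
```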
This concludes our tour of creating, running and visualizing workflows! Check out the docs and examples to learn more.