Workflows cookbook: walking through all features of Workflows

First, we install our dependencies. Core contains most of what we need; OpenAI is to handle LLM access and utils-workflow provides the visualization capabilities we'll use later on.

python
!pip install --upgrade llama-index-core llama-index-llms-openai llama-index-utils-workflow

Then we import the dependencies we just installed.

python
from llama_index.core.workflow import (
    Event,
    StartEvent,
    StopEvent,
    Workflow,
    step,
    Context,
)
import random
from llama_index.utils.workflow import draw_all_possible_flows
from llama_index.utils.workflow import draw_most_recent_execution
from llama_index.llms.openai import OpenAI

Set up our OpenAI key, so we can do actual LLM things.

python
import os

os.environ["OPENAI_API_KEY"] = "sk-proj-..."
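Hard-coding a key in a notebook makes it easy to leak when the notebook is shared. As a minimal sketch, assuming the key has already been exported in your shell, a small helper can fail early with a clear message instead. The name `require_env` is hypothetical and not part of LlamaIndex or the OpenAI client.

```python
import os
from typing import Mapping


def require_env(name: str, env: Mapping[str, str] = os.environ) -> str:
    """Return the value of an environment variable, or raise if it is unset or blank."""
    value = env.get(name, "").strip()
    if not value:
        raise RuntimeError(f"{name} is not set; export it before running this notebook")
    return value


# Example with an explicit mapping so the sketch runs without a real key.
print(require_env("OPENAI_API_KEY", {"OPENAI_API_KEY": "dummy-key"}))
```

Calling `require_env("OPENAI_API_KEY")` with no second argument checks the real environment.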

Workflow basics

Let's start with the most basic possible workflow: it just starts, does one thing, and stops. There's no reason to use a real workflow if your task is this simple, but we're just demonstrating how they work.

python
from llama_index.llms.openai import OpenAI


class OpenAIGenerator(Workflow):
    @step
    async def generate(self, ev: StartEvent) -> StopEvent:
        llm = OpenAI(model="gpt-4o")
        response = await llm.acomplete(ev.query)
        return StopEvent(result=str(response))


w = OpenAIGenerator(timeout=10, verbose=False)
result = await w.run(query="What's LlamaIndex?")
print(result)

One of the neat things about Workflows is that we can use pyvis to visualize them. Let's see what that looks like for this very simple flow.

python
draw_all_possible_flows(OpenAIGenerator, filename="trivial_workflow.html")

Not a lot to see here, yet! The start event goes to generate() and then straight to StopEvent.
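To make the routing idea concrete, here is a toy model in plain Python of how an event can be delivered to whichever step declares it in its type annotation. This is only an illustrative sketch under simplifying assumptions (a single queue, no concurrency, no timeout); it is not how LlamaIndex implements Workflows, and every name in it (`run_toy`, `run_toy_sync`, `accepted_types`, the stand-in event classes) is made up for the example.

```python
import asyncio
import typing


class Event:
    """Toy event: keyword arguments become attributes."""

    def __init__(self, **fields):
        self.__dict__.update(fields)


class StartEvent(Event):
    pass


class StopEvent(Event):
    pass


def accepted_types(fn) -> tuple:
    """Event classes named in the annotation of the step's `ev` parameter."""
    hint = typing.get_type_hints(fn)["ev"]
    # A union such as `A | B` yields (A, B); a plain class yields ().
    return typing.get_args(hint) or (hint,)


async def run_toy(steps, **kwargs):
    """Deliver events to matching steps until a StopEvent appears."""
    queue = [StartEvent(**kwargs)]
    while queue:
        ev = queue.pop(0)
        if isinstance(ev, StopEvent):
            return ev.result
        for fn in steps:
            if isinstance(ev, accepted_types(fn)):
                out = await fn(ev)
                if out is not None:
                    queue.append(out)
    return None


def run_toy_sync(steps, **kwargs):
    """Convenience wrapper for plain scripts, where no event loop is running."""
    return asyncio.run(run_toy(steps, **kwargs))


async def generate(ev: StartEvent) -> StopEvent:
    # Stand-in for the LLM call in OpenAIGenerator.generate().
    return StopEvent(result=f"echo: {ev.query}")
```

In a script, `run_toy_sync([generate], query="What's LlamaIndex?")` returns the echoed query. In a notebook, use `await run_toy(...)` instead, since an event loop is already running.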

Loops and branches

Let's go to a more interesting example, demonstrating our ability to loop:

python
class FailedEvent(Event):
    error: str


class QueryEvent(Event):
    query: str


class LoopExampleFlow(Workflow):
    @step
    async def answer_query(
        self, ev: StartEvent | QueryEvent
    ) -> FailedEvent | StopEvent:
        query = ev.query
        # try to answer the query
        random_number = random.randint(0, 1)
        if random_number == 0:
            return FailedEvent(error="Failed to answer the query.")
        else:
            return StopEvent(result="The answer to your query")

    @step
    async def improve_query(self, ev: FailedEvent) -> QueryEvent | StopEvent:
        # improve the query or decide it can't be fixed
        random_number = random.randint(0, 1)
        if random_number == 0:
            return QueryEvent(query="Here's a better query.")
        else:
            return StopEvent(result="Your query can't be fixed.")

We're using random numbers to simulate LLM actions here so that we can get reliably interesting behavior.

answer_query() accepts either a StartEvent or a QueryEvent. It can then do 2 things:

  • it can answer the query and emit a StopEvent, which returns the result
  • it can decide the query was bad and emit a FailedEvent

improve_query() accepts a FailedEvent. It can also do 2 things:

  • it can decide the query can't be improved and emit a StopEvent, which returns failure
  • it can present a better query and emit a QueryEvent, which creates a loop back to answer_query()
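The branching described above can be replayed as an ordinary loop, which makes it easier to reason about how often the flow repeats. The sketch below is an assumption-laden stand-in, not workflow code: `simulate_loop` is a hypothetical name, and `max_rounds` is a safety cap added here, whereas the workflow itself relies on its timeout.

```python
import random


def simulate_loop(rng: random.Random, max_rounds: int = 50) -> tuple[str, int]:
    """Replay LoopExampleFlow's branching as a plain loop.

    Returns the final result and the number of times answer_query() ran.
    """
    for attempt in range(1, max_rounds + 1):
        # answer_query(): 0 means failure, 1 means success
        if rng.randint(0, 1) == 1:
            return "The answer to your query", attempt
        # improve_query(): 0 means retry with a better query, 1 means give up
        if rng.randint(0, 1) == 1:
            return "Your query can't be fixed.", attempt
    raise RuntimeError("no terminal event within max_rounds")


# Seeded generators make each simulated run repeatable.
for seed in range(3):
    print(simulate_loop(random.Random(seed)))
```

Each pass through the loop ends the run with probability 3/4 (success, or failure followed by giving up), so on average answer_query() runs 4/3 times per workflow run.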

We can also visualize this more complicated workflow:

python
draw_all_possible_flows(LoopExampleFlow, filename="loop_workflow.html")

Now let's run it. We set verbose=True so we can see exactly which events are triggered. Because the branches are chosen at random, a run may answer immediately, or loop one or more times before answering or giving up.

python
l = LoopExampleFlow(timeout=10, verbose=True)
result = await l.run(query="What's LlamaIndex?")
print(result)

Maintaining state between events

Workflows have a global state, accessed through the Context object's store, which allows you to keep arbitrary data or functions around for use by all event handlers.

python
class GlobalExampleFlow(Workflow):
    @step
    async def setup(self, ctx: Context, ev: StartEvent) -> QueryEvent:
        # load our data here
        await ctx.store.set("some_database", ["value1", "value2", "value3"])

        return QueryEvent(query=ev.query)

    @step
    async def query(self, ctx: Context, ev: QueryEvent) -> StopEvent:
        # use our data with our query
        data = await ctx.store.get("some_database")

        result = f"The answer to your query is {data[1]}"
        return StopEvent(result=result)

python
g = GlobalExampleFlow(timeout=10, verbose=True)
result = await g.run(query="What's LlamaIndex?")
print(result)
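Conceptually, the store behaves like an async key-value map shared by every step in a run. The following toy model shows that idea in isolation, including what a lookup with a default looks like when nothing has been stored yet. It is a sketch only: `ToyStore` and `demo` are hypothetical names, and the real store offers more than this.

```python
import asyncio
from typing import Any

_MISSING = object()  # sentinel so that None can be a legitimate default


class ToyStore:
    """Toy async key-value store shared by all steps of one run."""

    def __init__(self) -> None:
        self._data: dict[str, Any] = {}

    async def set(self, key: str, value: Any) -> None:
        self._data[key] = value

    async def get(self, key: str, default: Any = _MISSING) -> Any:
        if key in self._data:
            return self._data[key]
        if default is _MISSING:
            raise KeyError(key)
        return default


async def demo() -> str:
    store = ToyStore()
    # The "setup" step writes shared data...
    await store.set("some_database", ["value1", "value2", "value3"])
    # ...and the "query" step reads it back later.
    data = await store.get("some_database")
    missing = await store.get("not_there", default=None)
    return f"The answer to your query is {data[1]}; missing={missing}"


# In a script: asyncio.run(demo()). In a notebook: await demo().
```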

Of course, this flow is still essentially linear. A more realistic case is one where the start event could be either a query or a data-population event, and a query has to wait until data is available. Let's set that up to see what it looks like:

python
class WaitExampleFlow(Workflow):
    @step
    async def setup(self, ctx: Context, ev: StartEvent) -> StopEvent:
        if hasattr(ev, "data"):
            await ctx.store.set("data", ev.data)

        return StopEvent(result=None)

    @step
    async def query(self, ctx: Context, ev: StartEvent) -> StopEvent | None:
        if hasattr(ev, "query"):
            # do we have any data? (None means nothing has been stored yet)
            data = await ctx.store.get("data", default=None)
            if data is not None:
                return StopEvent(result=f"Got the data {data}")
            else:
                # there's no data yet
                return None
        else:
            # this isn't a query
            return None

python
w = WaitExampleFlow(verbose=True)
result = await w.run(query="Can I kick it?")
if result is None:
    print("No you can't")
print("---")
result = await w.run(data="Yes you can")
print("---")
result = await w.run(query="Can I kick it?")
print(result)

Let's visualize how this flow works:

python
draw_all_possible_flows(WaitExampleFlow, filename="wait_workflow.html")

Waiting for one or more events

Because waiting for events is such a common pattern, the context object has a convenience function, collect_events(). It captures incoming events and buffers them, returning None until one event of every required type has been collected. It then returns those events as a list, in the order in which the types were specified. Let's see this in action:

python
class InputEvent(Event):
    input: str


class SetupEvent(Event):
    error: bool


class QueryEvent(Event):
    query: str


class CollectExampleFlow(Workflow):
    @step
    async def setup(self, ctx: Context, ev: StartEvent) -> SetupEvent:
        # generically start everything up
        # (the flag is named _is_setup so it does not shadow the setup() step itself)
        if not getattr(self, "_is_setup", False):
            self._is_setup = True
            print("I got set up")
        return SetupEvent(error=False)

    @step
    async def collect_input(self, ev: StartEvent) -> InputEvent | None:
        if hasattr(ev, "input"):
            # perhaps validate the input
            print("I got some input")
            return InputEvent(input=ev.input)

    @step
    async def parse_query(self, ev: StartEvent) -> QueryEvent | None:
        if hasattr(ev, "query"):
            # parse the query in some way
            print("I got a query")
            return QueryEvent(query=ev.query)

    @step
    async def run_query(
        self, ctx: Context, ev: InputEvent | SetupEvent | QueryEvent
    ) -> StopEvent | None:
        ready = ctx.collect_events(ev, [QueryEvent, InputEvent, SetupEvent])
        if ready is None:
            print("Not enough events yet")
            return None

        # run the query
        print("Now I have all the events")
        print(ready)

        result = f"Ran query '{ready[0].query}' on input '{ready[1].input}'"
        return StopEvent(result=result)

python
c = CollectExampleFlow()
result = await c.run(input="Here's some input", query="Here's my question")
print(result)

You can see each of the events getting triggered, as well as collect_events() repeatedly returning None until enough events have arrived. Let's see what this looks like in a flow diagram:

python
draw_all_possible_flows(CollectExampleFlow, filename="collect_workflow.html")
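The buffering behaviour of collect_events() described earlier can be modelled in a few lines of plain Python. This is a toy sketch of the idea (buffer events, return None until one of each expected type is present, then return them in the requested order), not the library's implementation. `ToyCollector` and the dataclass stand-ins for the events are hypothetical names used only here.

```python
from dataclasses import dataclass
from typing import Any, Optional


@dataclass
class QueryEvent:
    query: str


@dataclass
class InputEvent:
    input: str


@dataclass
class SetupEvent:
    error: bool


class ToyCollector:
    """Buffers events until one of each expected type has arrived."""

    def __init__(self) -> None:
        self._buffer: list[Any] = []

    def collect_events(self, ev: Any, expected: list[type]) -> Optional[list[Any]]:
        self._buffer.append(ev)
        picked = []
        remaining = list(self._buffer)
        for event_type in expected:
            match = next((e for e in remaining if type(e) is event_type), None)
            if match is None:
                return None  # still waiting for at least one type
            picked.append(match)
            remaining.remove(match)
        self._buffer = remaining  # collected events leave the buffer
        return picked


collector = ToyCollector()
order = [QueryEvent, InputEvent, SetupEvent]
# Events arrive in a different order from the one requested.
first = collector.collect_events(SetupEvent(error=False), order)
second = collector.collect_events(InputEvent(input="some input"), order)
third = collector.collect_events(QueryEvent(query="my question"), order)
print(first, second)                      # None None
print([type(e).__name__ for e in third])  # ['QueryEvent', 'InputEvent', 'SetupEvent']
```

The returned list follows the order of the requested types rather than arrival order, which is why the workflow above can safely index into ready[0] and ready[1].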

This concludes our tour of creating, running and visualizing workflows! Check out the docs and examples to learn more.