Back to Llama Index

Async Ingestion Pipeline + Metadata Extraction

docs/examples/ingestion/async_ingestion_pipeline.ipynb

0.14.214.2 KB
Original Source

Async Ingestion Pipeline + Metadata Extraction

Recently, LlamaIndex has introduced async metadata extraction. Let's compare metadata extraction speeds in an ingestion pipeline using a newer and older version of LlamaIndex.

We will test a pipeline using the classic Paul Graham essay.

python
%pip install llama-index-embeddings-openai
%pip install llama-index-llms-openai
python
!mkdir -p 'data/paul_graham/'
!wget 'https://raw.githubusercontent.com/run-llama/llama_index/main/docs/examples/data/paul_graham/paul_graham_essay.txt' -O 'data/paul_graham/paul_graham_essay.txt'
python
import os

os.environ["OPENAI_API_KEY"] = "sk-..."

New LlamaIndex Ingestion

Using a version of LlamaIndex greater or equal to v0.9.7, we can take advantage of improved async metadata extraction within ingestion pipelines.

NOTE: Restart your notebook after installing a new version!

python
!pip install "llama_index>=0.9.7"

NOTE: The num_workers kwarg controls how many requests can be outgoing at a given time using an async semaphore. Setting it higher may increase speeds, but can also lead to timeouts or rate limits, so set it wisely.

python
from llama_index.embeddings.openai import OpenAIEmbedding
from llama_index.llms.openai import OpenAI
from llama_index.core.ingestion import IngestionPipeline
from llama_index.core.extractors import TitleExtractor, SummaryExtractor
from llama_index.core.node_parser import SentenceSplitter
from llama_index.core.schema import MetadataMode


def build_pipeline():
    llm = OpenAI(model="gpt-3.5-turbo-1106", temperature=0.1)

    transformations = [
        SentenceSplitter(chunk_size=1024, chunk_overlap=20),
        TitleExtractor(
            llm=llm, metadata_mode=MetadataMode.EMBED, num_workers=8
        ),
        SummaryExtractor(
            llm=llm, metadata_mode=MetadataMode.EMBED, num_workers=8
        ),
        OpenAIEmbedding(),
    ]

    return IngestionPipeline(transformations=transformations)
python
from llama_index.core import SimpleDirectoryReader

documents = SimpleDirectoryReader("./data/paul_graham").load_data()
python
import time

times = []
for _ in range(3):
    time.sleep(30)  # help prevent rate-limits/timeouts, keeps each run fair
    pipline = build_pipeline()
    start = time.time()
    nodes = await pipline.arun(documents=documents)
    end = time.time()
    times.append(end - start)

print(f"Average time: {sum(times) / len(times)}")

The current openai python client package is a tad unstable -- sometimes async jobs will timeout, skewing the average. You can see the last progress bar took 1 minute instead of the previous 6 or 7 seconds, skewing the average.

Old LlamaIndex Ingestion

Now, lets compare to an older version of LlamaIndex, which was using "fake" async for metadata extraction.

NOTE: Restart your notebook after installing the new version!

python
!pip install "llama_index<0.9.6"
python
import os

os.environ["OPENAI_API_KEY"] = "sk-..."
python
from llama_index.embeddings.openai import OpenAIEmbedding
from llama_index.llms.openai import OpenAI
from llama_index.core.ingestion import IngestionPipeline
from llama_index.core.extractors import TitleExtractor, SummaryExtractor
from llama_index.core.node_parser import SentenceSplitter
from llama_index.core.schema import MetadataMode


def build_pipeline():
    llm = OpenAI(model="gpt-3.5-turbo-1106", temperature=0.1)

    transformations = [
        SentenceSplitter(chunk_size=1024, chunk_overlap=20),
        TitleExtractor(llm=llm, metadata_mode=MetadataMode.EMBED),
        SummaryExtractor(llm=llm, metadata_mode=MetadataMode.EMBED),
        OpenAIEmbedding(),
    ]

    return IngestionPipeline(transformations=transformations)
python
from llama_index.core import SimpleDirectoryReader

documents = SimpleDirectoryReader("./data/paul_graham").load_data()
python
import time

times = []
for _ in range(3):
    time.sleep(30)  # help prevent rate-limits/timeouts, keeps each run fair
    pipline = build_pipeline()
    start = time.time()
    nodes = await pipline.arun(documents=documents)
    end = time.time()
    times.append(end - start)

print(f"Average time: {sum(times) / len(times)}")