
Parallelizing Ingestion Pipeline

docs/examples/ingestion/parallel_execution_ingestion_pipeline.ipynb


<a href="https://colab.research.google.com/github/run-llama/llama_index/blob/main/docs/examples/ingestion/parallel_execution_ingestion_pipeline.ipynb" target="_parent">Open In Colab</a>

In this notebook, we demonstrate how to execute ingestion pipelines using parallel processes. Both sync and async versions of batched parallel execution are possible with IngestionPipeline.
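Since the pipeline below uses OpenAIEmbedding, an OpenAI API key must be available before running it. One common way to provide it is via an environment variable (the variable name is the standard one read by the OpenAI client; the key value shown is a placeholder):

```python
import os

# set your OpenAI API key (placeholder value shown)
os.environ["OPENAI_API_KEY"] = "sk-..."
```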

```python
%pip install llama-index-embeddings-openai
```

```python
import nest_asyncio

nest_asyncio.apply()
```

```python
import cProfile, pstats
from pstats import SortKey
```

Load data

```python
from llama_index.core import SimpleDirectoryReader

documents = SimpleDirectoryReader(input_dir="./data/source_files").load_data()
```

Define our IngestionPipeline

```python
from llama_index.core import Document
from llama_index.embeddings.openai import OpenAIEmbedding
from llama_index.core.node_parser import SentenceSplitter
from llama_index.core.extractors import TitleExtractor
from llama_index.core.ingestion import IngestionPipeline

# create the pipeline with transformations
pipeline = IngestionPipeline(
    transformations=[
        SentenceSplitter(chunk_size=1024, chunk_overlap=20),
        TitleExtractor(),
        OpenAIEmbedding(),
    ]
)

# since we'll be testing performance with %timeit and cProfile,
# we disable the cache
pipeline.disable_cache = True
```

Parallel Execution

First, a single run. Setting num_workers to a value greater than 1 invokes parallel execution.

```python
nodes = pipeline.run(documents=documents, num_workers=4)
```

```python
len(nodes)
```

```python
%timeit pipeline.run(documents=documents, num_workers=4)
```

```python
cProfile.run(
    "pipeline.run(documents=documents, num_workers=4)",
    "newstats",
)
p = pstats.Stats("newstats")
p.strip_dirs().sort_stats(SortKey.CUMULATIVE).print_stats(15)
```
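Conceptually, parallel execution splits the documents into batches and hands each batch to a pool of worker processes. A minimal sketch of that batching pattern (an illustration of the idea, not llama-index's actual implementation; `process_batch` is a hypothetical stand-in for the pipeline's transformations):

```python
from multiprocessing import Pool


def process_batch(batch):
    # stand-in for running the pipeline's transformations on one batch
    return [item * 2 for item in batch]


def run_parallel(items, num_workers=4, batch_size=2):
    # partition the input into fixed-size batches
    batches = [items[i : i + batch_size] for i in range(0, len(items), batch_size)]
    # each batch is processed by a separate worker process
    with Pool(processes=num_workers) as pool:
        results = pool.map(process_batch, batches)
    # flatten the per-batch results back into one list
    return [x for batch in results for x in batch]


if __name__ == "__main__":
    print(run_parallel([1, 2, 3, 4, 5]))
```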

Async Parallel Execution

Here the ProcessPoolExecutor from concurrent.futures is used to execute processes asynchronously. The tasks being processed are blocking, but they are performed concurrently across the individual processes.

```python
nodes = await pipeline.arun(documents=documents, num_workers=4)
```

```python
len(nodes)
```

```python
import asyncio

loop = asyncio.get_event_loop()
%timeit loop.run_until_complete(pipeline.arun(documents=documents, num_workers=4))
```

```python
loop = asyncio.get_event_loop()
cProfile.run(
    "loop.run_until_complete(pipeline.arun(documents=documents, num_workers=4))",
    "async-newstats",
)
p = pstats.Stats("async-newstats")
p.strip_dirs().sort_stats(SortKey.CUMULATIVE).print_stats(15)
```
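The pattern described above (blocking work submitted to a ProcessPoolExecutor and awaited on the event loop) can be sketched in isolation. This is a simplified assumption about the mechanism, not the library's code; `transform_batch` is a hypothetical stand-in for a blocking transformation:

```python
import asyncio
from concurrent.futures import ProcessPoolExecutor


def transform_batch(batch):
    # stand-in for a blocking transformation (splitting, embedding, ...)
    return [text.upper() for text in batch]


async def arun_batches(batches, num_workers=4):
    loop = asyncio.get_running_loop()
    with ProcessPoolExecutor(max_workers=num_workers) as pool:
        # each blocking batch runs in its own process; the event loop
        # awaits all of them concurrently
        futures = [
            loop.run_in_executor(pool, transform_batch, batch) for batch in batches
        ]
        results = await asyncio.gather(*futures)
    # flatten the per-batch results back into one list
    return [item for batch in results for item in batch]


if __name__ == "__main__":
    print(asyncio.run(arun_batches([["doc a", "doc b"], ["doc c"]], num_workers=2)))
```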

Sequential Execution

By default, num_workers is set to None, which invokes sequential execution.

```python
nodes = pipeline.run(documents=documents)
```

```python
len(nodes)
```

```python
%timeit pipeline.run(documents=documents)
```

```python
cProfile.run("pipeline.run(documents=documents)", "oldstats")
p = pstats.Stats("oldstats")
p.strip_dirs().sort_stats(SortKey.CUMULATIVE).print_stats(15)
```

Async on Main Processor

As in the sync case, num_workers defaults to None, which leads to single-batch execution of the async tasks.

```python
nodes = await pipeline.arun(documents=documents)
```

```python
len(nodes)
```

```python
%timeit loop.run_until_complete(pipeline.arun(documents=documents))
```

```python
cProfile.run(
    "loop.run_until_complete(pipeline.arun(documents=documents))",
    "async-oldstats",
)
p = pstats.Stats("async-oldstats")
p.strip_dirs().sort_stats(SortKey.CUMULATIVE).print_stats(15)
```

In Summary

The results from the above experiments are summarized below, with each strategy listed from fastest to slowest on this example dataset and pipeline.

  1. (Async, Parallel Processing): 20.3s
  2. (Async, No Parallel Processing): 20.5s
  3. (Sync, Parallel Processing): 29s
  4. (Sync, No Parallel Processing): 1min 11s

We can see that both strategies that use parallel processing outperform sync execution without parallel processing (i.e., .run(num_workers=None)). Also, at least in this case, there is little gain from adding parallel processing to async execution. Perhaps for larger workloads and IngestionPipelines, combining async with parallel processing can lead to larger gains.
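To reproduce a comparison like this on your own data, a small timing helper can be used. This is a generic sketch (`best_of` is a hypothetical helper, not part of llama-index); substitute the pipeline calls from this notebook for the callable:

```python
import time


def best_of(fn, repeat=3):
    """Return the best wall-clock time, in seconds, over `repeat` calls to fn."""
    timings = []
    for _ in range(repeat):
        start = time.perf_counter()
        fn()
        timings.append(time.perf_counter() - start)
    return min(timings)


# e.g. best_of(lambda: pipeline.run(documents=documents, num_workers=4))
```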