docs/examples/ingestion/parallel_execution_ingestion_pipeline.ipynb
<a href="https://colab.research.google.com/github/run-llama/llama_index/blob/main/docs/examples/ingestion/parallel_execution_ingestion_pipeline.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>
In this notebook, we demonstrate how to execute ingestion pipelines using parallel processes. Both sync and async versions of batched parallel execution are supported by IngestionPipeline.
%pip install llama-index-embeddings-openai
import nest_asyncio
nest_asyncio.apply()
import cProfile, pstats
from pstats import SortKey
from llama_index.core import SimpleDirectoryReader
documents = SimpleDirectoryReader(input_dir="./data/source_files").load_data()
from llama_index.core import Document
from llama_index.embeddings.openai import OpenAIEmbedding
from llama_index.core.node_parser import SentenceSplitter
from llama_index.core.extractors import TitleExtractor
from llama_index.core.ingestion import IngestionPipeline
# create the pipeline with transformations
pipeline = IngestionPipeline(
transformations=[
SentenceSplitter(chunk_size=1024, chunk_overlap=20),
TitleExtractor(),
OpenAIEmbedding(),
]
)
# since we'll be measuring performance with %timeit and cProfile,
# we disable the cache so every run does the full work
pipeline.disable_cache = True
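Disabling the cache matters here because a cache hit skips the transformation work entirely, which would make repeated benchmark runs meaningless. A toy illustration of that effect (this is not LlamaIndex's cache implementation, just a sketch of the idea):

```python
import hashlib
import time

cache = {}

def cached_transform(text):
    # key by content hash, loosely analogous to how an ingestion cache
    # keys on the input nodes plus the transformation
    key = hashlib.sha256(text.encode()).hexdigest()
    if key in cache:
        return cache[key]  # cache hit: the expensive work is skipped entirely
    time.sleep(0.05)       # stand-in for an expensive transformation
    cache[key] = text.upper()
    return cache[key]

start = time.perf_counter()
cached_transform("hello world")   # cold run pays the full cost
cold = time.perf_counter() - start

start = time.perf_counter()
cached_transform("hello world")   # warm run returns almost instantly
warm = time.perf_counter() - start

print(f"cold={cold:.3f}s warm={warm:.3f}s")
```

With caching enabled, every timed run after the first would report the warm number, not the cost of the pipeline itself.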
First, a single run. Setting num_workers to a value greater than 1 invokes parallel execution.
nodes = pipeline.run(documents=documents, num_workers=4)
len(nodes)
%timeit pipeline.run(documents=documents, num_workers=4)
cProfile.run(
"pipeline.run(documents=documents, num_workers=4)",
"newstats",
)
p = pstats.Stats("newstats")
p.strip_dirs().sort_stats(SortKey.CUMULATIVE).print_stats(15)
Here, the ProcessPoolExecutor from concurrent.futures is used to execute processes asynchronously. The tasks being processed are blocking, but they are dispatched to the individual processes asynchronously.
nodes = await pipeline.arun(documents=documents, num_workers=4)
len(nodes)
import asyncio
loop = asyncio.get_event_loop()
%timeit loop.run_until_complete(pipeline.arun(documents=documents, num_workers=4))
loop = asyncio.get_event_loop()
cProfile.run(
"loop.run_until_complete(pipeline.arun(documents=documents, num_workers=4))",
"async-newstats",
)
p = pstats.Stats("async-newstats")
p.strip_dirs().sort_stats(SortKey.CUMULATIVE).print_stats(15)
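The pattern described above, blocking batch work handed off to worker processes while the event loop awaits the results, can be sketched in a few lines. The names below (transform_batch, arun_parallel) are illustrative, not LlamaIndex internals:

```python
import asyncio
from concurrent.futures import ProcessPoolExecutor

def transform_batch(batch):
    # stand-in for the blocking transformation work done on one batch of nodes
    return [text.upper() for text in batch]

async def arun_parallel(batches, num_workers):
    loop = asyncio.get_running_loop()
    with ProcessPoolExecutor(max_workers=num_workers) as pool:
        # each blocking batch job runs in its own process; awaiting the
        # futures keeps the event loop free in the meantime
        futures = [loop.run_in_executor(pool, transform_batch, b) for b in batches]
        results = await asyncio.gather(*futures)
    # flatten the per-batch results back into a single list
    return [item for batch in results for item in batch]

if __name__ == "__main__":
    batches = [["a", "b"], ["c", "d"]]
    print(asyncio.run(arun_parallel(batches, num_workers=2)))
```

Note that asyncio.gather preserves input order, so the flattened results come back in the same order the batches were submitted.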
By default, num_workers is set to None, which invokes sequential execution.
nodes = pipeline.run(documents=documents)
len(nodes)
%timeit pipeline.run(documents=documents)
cProfile.run("pipeline.run(documents=documents)", "oldstats")
p = pstats.Stats("oldstats")
p.strip_dirs().sort_stats(SortKey.CUMULATIVE).print_stats(15)
As in the sync case, num_workers defaults to None, which leads to single-batch execution of the async tasks.
nodes = await pipeline.arun(documents=documents)
len(nodes)
%timeit loop.run_until_complete(pipeline.arun(documents=documents))
cProfile.run(
"loop.run_until_complete(pipeline.arun(documents=documents))",
"async-oldstats",
)
p = pstats.Stats("async-oldstats")
p.strip_dirs().sort_stats(SortKey.CUMULATIVE).print_stats(15)
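Single-batch async execution amounts to awaiting all of the transformation coroutines concurrently in one batch, with no worker processes involved. A minimal sketch of that shape (atransform and arun_single_batch are illustrative names, not LlamaIndex internals):

```python
import asyncio

async def atransform(text):
    # stand-in for an async transformation, e.g. an embedding API call
    await asyncio.sleep(0.01)
    return text.upper()

async def arun_single_batch(documents):
    # all documents are awaited concurrently as one batch;
    # concurrency comes from the event loop, not from extra processes
    return await asyncio.gather(*(atransform(d) for d in documents))

print(asyncio.run(arun_single_batch(["a", "b", "c"])))  # ['A', 'B', 'C']
```

Because the work here is I/O-bound (API calls), the event loop alone already overlaps most of the waiting, which helps explain why adding processes on top yields little extra speedup below.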
The results from the above experiments are summarized below, with each strategy listed from fastest to slowest on this example dataset and pipeline.
We can see that both strategies that use parallel processing outperform sync, sequential execution (i.e., .run(num_workers=None)). We also see that, at least in this case, there is little gain in adding parallel processing to the async tasks. Perhaps for larger workloads and IngestionPipelines, combining async execution with parallel processing would yield larger gains.
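To reproduce a comparison like this on your own dataset without the notebook magics, a small timing helper can stand in for %timeit. This is an illustrative sketch; time_strategy is not part of LlamaIndex:

```python
import time

def time_strategy(label, fn, repeats=3):
    # run fn several times and keep the best wall-clock time,
    # similar in spirit to what %timeit reports
    best = float("inf")
    for _ in range(repeats):
        start = time.perf_counter()
        fn()
        best = min(best, time.perf_counter() - start)
    print(f"{label}: {best:.2f}s")
    return best

# usage against the pipeline above, e.g.:
# time_strategy("sync, parallel", lambda: pipeline.run(documents=documents, num_workers=4))
# time_strategy("sync, sequential", lambda: pipeline.run(documents=documents))
```

Taking the best of several runs rather than the mean reduces noise from one-off slowdowns (GC pauses, network hiccups during embedding calls).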