docs/examples/ingestion/advanced_ingestion_pipeline.ipynb
<a href="https://colab.research.google.com/github/run-llama/llama_index/blob/main/docs/examples/ingestion/advanced_ingestion_pipeline.ipynb" target="_parent">Open In Colab</a>
%pip install llama-index-vector-stores-weaviate
%pip install llama-index-embeddings-huggingface
!pip install llama-index
In this notebook, we implement an `IngestionPipeline` with the following features:

- caching of transformation outputs in Redis
- a custom transformation
- automatic insertion into a Weaviate vector store

First, we set up the cache. Every node + transformation combination has its output cached, which saves time on duplicate runs.
from llama_index.core.ingestion.cache import RedisCache
from llama_index.core.ingestion import IngestionCache
# this assumes a Redis server is running locally on the default port
ingest_cache = IngestionCache(
    cache=RedisCache.from_host_and_port(host="127.0.0.1", port=6379),
    collection="my_test_cache",
)
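If you don't have a Redis server available, `IngestionCache` falls back to an in-memory `SimpleCache` by default. A minimal sketch (the persist path is arbitrary, and the save/load calls are shown commented out since they're optional):
# alternative: default in-memory cache, no Redis required
local_cache = IngestionCache(collection="my_test_cache")
# optionally save/restore it between sessions:
# local_cache.persist("./llama_cache.json")
# local_cache = IngestionCache.from_persist_path("./llama_cache.json")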
For this example, we use Weaviate as the vector store. You will need a running Weaviate instance (for example, a Weaviate Cloud cluster) along with its URL and API key.
!pip install weaviate-client
import weaviate

# fill in your Weaviate cluster URL and API key
auth_config = weaviate.AuthApiKey(api_key="...")
client = weaviate.Client(url="https://...", auth_client_secret=auth_config)
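Alternatively, if you don't have a hosted cluster, the v3 `weaviate-client` can run an embedded local instance. This is a sketch, assuming embedded mode is supported on your platform:
from weaviate.embedded import EmbeddedOptions

# spins up a local, embedded Weaviate instance instead of connecting to a cluster
client = weaviate.Client(embedded_options=EmbeddedOptions())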
from llama_index.vector_stores.weaviate import WeaviateVectorStore
vector_store = WeaviateVectorStore(
    weaviate_client=client, index_name="CachingTest"
)
from llama_index.core.node_parser import TokenTextSplitter
from llama_index.embeddings.huggingface import HuggingFaceEmbedding
text_splitter = TokenTextSplitter(chunk_size=512)
embed_model = HuggingFaceEmbedding(model_name="BAAI/bge-small-en-v1.5")
import re
from llama_index.core.schema import TransformComponent
class TextCleaner(TransformComponent):
    """Strips every character except alphanumerics and spaces from node text."""

    def __call__(self, nodes, **kwargs):
        for node in nodes:
            node.text = re.sub(r"[^0-9A-Za-z ]", "", node.text)
        return nodes
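As a quick sanity check (this snippet is not part of the original notebook, and the sample text is made up), the cleaner can be applied to nodes directly:
from llama_index.core.schema import TextNode

sample_nodes = [TextNode(text="Hello, world! (2024)")]
cleaned = TextCleaner()(sample_nodes)
print(cleaned[0].text)  # -> "Hello world 2024"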
import os

from llama_index.core.extractors import TitleExtractor
from llama_index.core.ingestion import IngestionPipeline

# TitleExtractor uses an OpenAI LLM by default, so the API key must be set
# before the pipeline is constructed
os.environ["OPENAI_API_KEY"] = "sk-..."

pipeline = IngestionPipeline(
    transformations=[
        TextCleaner(),
        text_splitter,
        embed_model,
        TitleExtractor(),
    ],
    vector_store=vector_store,
    cache=ingest_cache,
)
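If the example data isn't already on disk, you can download the Paul Graham essay used throughout the LlamaIndex docs (the URL and target path below are assumptions based on the repository layout; adjust them as needed):
!mkdir -p '../data/paul_graham/'
!wget 'https://raw.githubusercontent.com/run-llama/llama_index/main/docs/docs/examples/data/paul_graham/paul_graham_essay.txt' -O '../data/paul_graham/paul_graham_essay.txt'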
from llama_index.core import SimpleDirectoryReader
documents = SimpleDirectoryReader("../data/paul_graham/").load_data()
nodes = pipeline.run(documents=documents)
The `OPENAI_API_KEY` set earlier is also used by the LLM in the query engine below.
from llama_index.core import VectorStoreIndex
index = VectorStoreIndex.from_vector_store(
    vector_store=vector_store,
    embed_model=embed_model,
)
query_engine = index.as_query_engine()
print(query_engine.query("What did the author do growing up?"))
The next code block will execute almost instantly due to caching.
pipeline = IngestionPipeline(
    transformations=[TextCleaner(), text_splitter, embed_model],
    cache=ingest_cache,
)
nodes = pipeline.run(documents=documents)
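To verify the speedup yourself, you can time a repeated run (the timing code below is an illustration, not part of the original notebook; exact numbers depend on your machine):
import time

start = time.time()
nodes = pipeline.run(documents=documents)
print(f"cached re-run took {time.time() - start:.2f}s")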
# clear the cache when you want to force every transformation to run from scratch
ingest_cache.clear()