examples/rag/milvus-quickstart.ipynb
In this tutorial, we'll use Feast to inject documents and structured data (i.e., features) into the context of an LLM (Large Language Model) to power a RAG Application (Retrieval Augmented Generation) with Milvus as the online vector database.
Feast solves several common issues in this flow, such as managing feature and document definitions as code, loading data into a low-latency online/vector store, and retrieving the right features and documents at inference time.
We will:

- Inspect a pre-built feature repository configured with a Parquet file offline store and Milvus as the online (vector) store
- Register the entity and feature view definitions with feast apply
- Materialize the Wikipedia city summaries and their embeddings into Milvus
- Embed a user question, retrieve the most relevant documents with vector similarity search, and pass them as context to an LLM
%%sh
pip install 'feast[milvus]' openai -U -q
echo "Please restart your runtime now (Runtime -> Restart runtime). This ensures that the correct dependencies are loaded."
Reminder: Please restart your runtime after installing Feast (Runtime -> Restart runtime). This ensures that the correct dependencies are loaded.
A feature repository is a directory that contains the configuration of the feature store and individual features. This configuration is written as code (Python/YAML) and it's highly recommended that teams track it centrally using git. See Feature Repository for a detailed explanation of feature repositories.
The easiest way to create a new feature repository is to use the feast init command in your terminal. For this RAG demo with Milvus, you do not need to initialize a feast repo. We have already provided a complete feature repository for you in the current directory (check feature_repo) with all the necessary Milvus configurations set up and ready to use.
Let's take a look at the demo repo itself. It breaks down into:

- data/ contains raw demo parquet data
- example_repo.py contains demo feature definitions
- feature_store.yaml contains a demo setup configuring where data sources are, including Milvus settings
- test_workflow.py showcases how to run all key Feast commands, including defining, retrieving, and pushing features (you can run it with python test_workflow.py)

%cd feature_repo/
!ls -R
Let's inspect the setup of the project in feature_store.yaml.
The key line defining the overall architecture of the feature store is the provider.
The provider value sets default offline and online stores.
In this demo, we use the local provider with Parquet files for offline storage and Milvus for online storage.
Our Milvus configuration includes the path of the local Milvus database file used as the online store, the embedding dimension of our vectors, and the index and similarity-metric settings used for vector search.
This setup enables efficient vector similarity search for our RAG application while maintaining Feast's feature management capabilities.
Valid values for provider in feature_store.yaml are local, gcp, and aws.
Note that there are many other offline / online stores Feast works with, including Azure, Hive, Trino, and PostgreSQL via community plugins. See https://docs.feast.dev/roadmap for all supported connectors.
A custom setup can also be made by following Customizing Feast
!pygmentize feature_store.yaml
The raw feature data we have in this demo is stored in a local parquet file. The dataset contains Wikipedia summaries of different cities.
import pandas as pd
df = pd.read_parquet("./data/city_wikipedia_summaries_with_embeddings.parquet")
df['vector'] = df['vector'].apply(lambda x: x.tolist())
embedding_length = len(df['vector'][0])
print(f'embedding length = {embedding_length}')
from IPython.display import display
display(df.head())
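The query embeddings we compute later come from sentence-transformers/all-MiniLM-L6-v2, which produces 384-dimensional vectors, so the stored document vectors must have the same length for similarity search to work. A quick sanity check (this assumes the parquet embeddings were generated with that same 384-dimensional model):
# all-MiniLM-L6-v2 outputs 384-dimensional sentence embeddings;
# the document vectors must match the query dimensionality used later.
assert embedding_length == 384, f"unexpected embedding length: {embedding_length}"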
The feast apply command scans Python files in the current directory for feature/entity definitions and deploys infrastructure according to feature_store.yaml.
Let's inspect what example_repo.py looks like:
from datetime import timedelta
from feast import (
FeatureView,
Field,
FileSource,
)
from feast.data_format import ParquetFormat
from feast.types import Float32, Array, String, ValueType
from feast import Entity
item = Entity(
name="item_id",
description="Item ID",
value_type=ValueType.INT64,
)
parquet_file_path = "./data/city_wikipedia_summaries_with_embeddings.parquet"
source = FileSource(
file_format=ParquetFormat(),
path=parquet_file_path,
timestamp_field="event_timestamp",
)
city_embeddings_feature_view = FeatureView(
name="city_embeddings",
entities=[item],
schema=[
Field(
name="vector",
dtype=Array(Float32),
vector_index=True,
vector_search_metric="COSINE",
),
Field(name="state", dtype=String),
Field(name="sentence_chunks", dtype=String),
Field(name="wiki_summary", dtype=String),
],
source=source,
ttl=timedelta(hours=2),
)
Now we run feast apply to register the feature views and entities defined in example_repo.py and to set up the Milvus online store tables. Note that we previously configured Milvus as the online store in feature_store.yaml (together with the local provider).
! feast apply
from datetime import datetime
from feast import FeatureStore
store = FeatureStore(repo_path=".")
We now serialize the latest values of features since the beginning of time to prepare for serving, using materialize_incremental. Note that materialize_incremental serializes all new features since the last materialize call, or since the time provided minus the ttl timedelta. In this case, that is CURRENT_TIME - 2 hours (the ttl we set on the FeatureView in example_repo.py).
CURRENT_TIME=$(date -u +"%Y-%m-%dT%H:%M:%S")
feast materialize-incremental $CURRENT_TIME
An alternative to using the CLI command is to use Python:
store.write_to_online_store(feature_view_name='city_embeddings', df=df)
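If you would rather mirror the CLI behavior from Python, the FeatureStore object also exposes materialize_incremental (a minimal sketch, reusing the store created above):
from datetime import datetime

# Materialize all new feature values up to the current time,
# equivalent to `feast materialize-incremental $CURRENT_TIME`.
store.materialize_incremental(end_date=datetime.utcnow())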
Note that there are now online_store.db and registry.db files, which store the materialized features and the registry (schema) information, respectively.
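As a quick sanity check, you can list what feast apply registered (a small sketch using the FeatureStore registry APIs):
# Confirm the entity and feature view were written to registry.db
print([entity.name for entity in store.list_entities()])
print([fv.name for fv in store.list_feature_views()])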
pymilvus_client = store._provider._online_store._connect(store.config)
COLLECTION_NAME = pymilvus_client.list_collections()[0]
milvus_query_result = pymilvus_client.query(
collection_name=COLLECTION_NAME,
filter="item_id == '0'",
)
pd.DataFrame(milvus_query_result[0]).head()
Note from the above command that the online store indexes by entity_key.
Entity keys include a list of all entities needed (e.g., all relevant primary keys) to generate the feature vector. In this case, this is a serialized version of the item_id, which lets us look up all of the features for a given item at serving time.
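For example, here is a minimal sketch of a standard key-based lookup for a single item_id (assuming the features above were materialized to the online store):
# Fetch the stored features for item_id 0 from the online store
online_features = store.get_online_features(
    features=[
        "city_embeddings:state",
        "city_embeddings:wiki_summary",
    ],
    entity_rows=[{"item_id": 0}],
).to_dict()
print(online_features.keys())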
During inference (e.g., when a user submits a chat message) we need to embed the input text. This can be thought of as a feature transformation of the input data. In this example, we'll do this with a small Sentence Transformer from Hugging Face.
import torch
import torch.nn.functional as F
from feast import FeatureStore
from pymilvus import MilvusClient, DataType, FieldSchema
from transformers import AutoTokenizer, AutoModel
from example_repo import city_embeddings_feature_view, item
TOKENIZER = "sentence-transformers/all-MiniLM-L6-v2"
MODEL = "sentence-transformers/all-MiniLM-L6-v2"
def mean_pooling(model_output, attention_mask):
    # First element of model_output contains all token embeddings
    token_embeddings = model_output[0]
    input_mask_expanded = (
        attention_mask.unsqueeze(-1).expand(token_embeddings.size()).float()
    )
    return torch.sum(token_embeddings * input_mask_expanded, 1) / torch.clamp(
        input_mask_expanded.sum(1), min=1e-9
    )
def run_model(sentences, tokenizer, model):
    encoded_input = tokenizer(
        sentences, padding=True, truncation=True, return_tensors="pt"
    )
    # Compute token embeddings
    with torch.no_grad():
        model_output = model(**encoded_input)
    sentence_embeddings = mean_pooling(model_output, encoded_input["attention_mask"])
    sentence_embeddings = F.normalize(sentence_embeddings, p=2, dim=1)
    return sentence_embeddings
At inference time, we embed the incoming query and run a vector similarity search over the document embeddings in the online feature store using retrieve_online_documents_v2(). The retrieved feature vectors can then be fed into the context of the LLM.
question = "Which city has the largest population in New York?"
tokenizer = AutoTokenizer.from_pretrained(TOKENIZER)
model = AutoModel.from_pretrained(MODEL)
query_embedding = run_model(question, tokenizer, model)
query = query_embedding.detach().cpu().numpy().tolist()[0]
from IPython.display import display
# Retrieve top k documents
context_data = store.retrieve_online_documents_v2(
features=[
"city_embeddings:vector",
"city_embeddings:item_id",
"city_embeddings:state",
"city_embeddings:sentence_chunks",
"city_embeddings:wiki_summary",
],
query=query,
top_k=3,
distance_metric='COSINE',
).to_df()
display(context_data)
def format_documents(context_df):
    output_context = ""
    unique_documents = context_df.drop_duplicates().apply(
        lambda x: "City & State = {" + x['state'] + "}\nSummary = {" + x['wiki_summary'].strip() + "}",
        axis=1,
    )
    for i, document_text in enumerate(unique_documents):
        # End each document with a newline so consecutive documents don't run together
        output_context += f"****START DOCUMENT {i}****\n{document_text.strip()}\n****END DOCUMENT {i}****\n"
    return output_context
RAG_CONTEXT = format_documents(context_data[['state', 'wiki_summary']])
print(RAG_CONTEXT)
FULL_PROMPT = f"""
You are an assistant for answering questions about states. You will be provided documentation from Wikipedia. Provide a conversational answer.
If you don't know the answer, just say "I do not know." Don't make up an answer.
Here are document(s) you should use when answering the user's question:
{RAG_CONTEXT}
"""
import os
from openai import OpenAI
client = OpenAI(
api_key=os.environ.get("OPENAI_API_KEY"),
)
response = client.chat.completions.create(
model="gpt-4o-mini",
messages=[
{"role": "system", "content": FULL_PROMPT},
{"role": "user", "content": question}
],
)
print('\n'.join([c.message.content for c in response.choices]))
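To tie the pieces together, here is a small sketch of a single helper that embeds a question, retrieves context from Feast/Milvus, and calls the LLM. It simply reuses the objects defined above (store, tokenizer, model, client, format_documents); answer_question itself is a hypothetical convenience wrapper, not part of the demo repo:
def answer_question(question: str) -> str:
    # Embed the question with the same model used above
    query = run_model(question, tokenizer, model).detach().cpu().numpy().tolist()[0]
    # Retrieve the top-k most similar documents from the online store
    context_df = store.retrieve_online_documents_v2(
        features=[
            "city_embeddings:vector",
            "city_embeddings:state",
            "city_embeddings:sentence_chunks",
            "city_embeddings:wiki_summary",
        ],
        query=query,
        top_k=3,
        distance_metric="COSINE",
    ).to_df()
    rag_context = format_documents(context_df[['state', 'wiki_summary']])
    system_prompt = f"""
You are an assistant for answering questions about states. You will be provided documentation from Wikipedia. Provide a conversational answer.
If you don't know the answer, just say "I do not know." Don't make up an answer.
Here are document(s) you should use when answering the user's question:
{rag_context}
"""
    # Generate the answer with the retrieved context
    response = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": question},
        ],
    )
    return response.choices[0].message.content


print(answer_question("Which city has the largest population in New York?"))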