Overview

In this tutorial, we'll use Feast to inject documents and structured data (i.e., features) into the context of an LLM (Large Language Model) to power a RAG Application (Retrieval Augmented Generation) with Milvus as the online vector database.

Feast solves several common issues in this flow:

  1. Online retrieval: At inference time, LLMs often need access to data that isn't readily available and needs to be precomputed from other data sources.
    • Feast manages deployment to a variety of online stores (e.g. Milvus, DynamoDB, Redis, Google Cloud Datastore) and ensures necessary features are consistently available and freshly computed at inference time.
  2. Vector Search: Feast has built-in support for vector similarity search that is easily configured declaratively, so users can focus on their application. Milvus provides powerful and efficient vector similarity search capabilities.
  3. Richer structured data: Along with vector search, users can query standard structured fields to inject into the LLM context for better user experiences.
  4. Feature/context versioning: Different teams within an organization are often unable to reuse data across projects and services, resulting in duplicated application logic. Models have data dependencies that need to be versioned, for example when running A/B tests on model/prompt versions.
    • Feast enables discovery of and collaboration on previously used documents and features, and enables versioning of sets of data.

We will:

  1. Deploy a local feature store with a Parquet file offline store and Milvus online store.
  2. Write/materialize the data (i.e., feature values) from the offline store (a parquet file) into the online store (Milvus).
  3. Serve the features using the Feast SDK with Milvus's vector search capabilities.
  4. Inject the retrieved documents into the LLM's context to answer questions.

Step 1: Install Feast

python
%%sh
pip install feast -U -q
echo "Please restart your runtime now (Runtime -> Restart runtime). This ensures that the correct dependencies are loaded."

Reminder: Please restart your runtime after installing Feast (Runtime -> Restart runtime). This ensures that the correct dependencies are loaded.

Step 2: Create a feature repository

A feature repository is a directory that contains the configuration of the feature store and individual features. This configuration is written as code (Python/YAML) and it's highly recommended that teams track it centrally using git. See Feature Repository for a detailed explanation of feature repositories.

The easiest way to create a new feature repository is to use the feast init command in your terminal. For this RAG demo with Milvus, you do not need to initialize a Feast repo: a complete feature repository is already provided in the current directory (check feature_repo) with all the necessary Milvus configuration set up and ready to use.
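
For reference, a fresh repository could be scaffolded as shown below (my_feature_repo is just a placeholder name; you can skip this step for this demo):

python
# Not needed for this demo -- feature_repo/ is already provided.
# feast init scaffolds a new feature repository with example definitions.
!feast init my_feature_repo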

Demo data scenario

  • We're using Wikipedia summaries about cities that have been transformed into vector embeddings using a sentence transformer model (a minimal sketch of how such embeddings can be produced follows this list). These embeddings enable semantic search in our RAG application.
  • Our goal is to retrieve the most relevant city information based on user queries, combining the power of Milvus' vector similarity search with Feast's feature management to provide accurate, contextual responses through an LLM.
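
The snippet below is only an illustrative sketch of how such embeddings could be produced. It is not the script used to build the provided parquet file, and it assumes the sentence-transformers package is installed and uses a tiny hand-written summaries DataFrame:

python
# Hypothetical preprocessing sketch -- the demo parquet file is already pre-built.
import pandas as pd
from sentence_transformers import SentenceTransformer

model = SentenceTransformer("sentence-transformers/all-MiniLM-L6-v2")  # 384-dim embeddings

summaries = pd.DataFrame(
    {
        "item_id": [0, 1],
        "state": ["New York, New York", "Los Angeles, California"],
        "wiki_summary": [
            "New York, often called New York City, is the most populous city ...",
            "Los Angeles is the most populous city in California ...",
        ],
    }
)
# The real dataset also includes a sentence_chunks column, omitted here for brevity.
summaries["vector"] = model.encode(summaries["wiki_summary"].tolist()).tolist()
summaries["event_timestamp"] = pd.Timestamp.now(tz="UTC")
# summaries.to_parquet("./data/city_wikipedia_summaries_with_embeddings.parquet")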

Step 2a: Inspecting the feature repository

Let's take a look at the demo repo itself. It breaks down into:

  • data/ contains raw demo parquet data
  • example_repo.py contains demo feature definitions
  • feature_store.yaml contains a demo setup configuring where data sources are, including Milvus settings
  • test_workflow.py showcases how to run all key Feast commands, including defining, retrieving, and pushing features.
    • You can run this with python test_workflow.py.
python
%cd feature_repo/
!ls -R

Step 2b: Inspecting the project configuration

Let's inspect the setup of the project in feature_store.yaml.

The key line defining the overall architecture of the feature store is the provider.

The provider value sets default offline and online stores.

  • The offline store provides the compute layer to process historical data (for generating training data & feature values for serving).
  • The online store is a low latency store of the latest feature values (for powering real-time inference).

In this demo, we use the local provider with Parquet files for offline storage and Milvus for online storage.

Our Milvus configuration includes:

  • Vector search enabled with 384-dimensional embeddings
  • FLAT index type with COSINE similarity metric
  • Local storage path at data/online_store.db

This setup enables efficient vector similarity search for our RAG application while maintaining Feast's feature management capabilities.
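
As a rough sketch, the Milvus-related portion of feature_store.yaml looks roughly like the following. The exact key names here are assumptions based on the description above; the actual file is printed with pygmentize further below:

yaml
project: rag_demo              # placeholder project name
provider: local
registry: data/registry.db
online_store:
  type: milvus
  path: data/online_store.db   # Milvus Lite local storage
  vector_enabled: true         # enable vector similarity search
  embedding_dim: 384           # must match the embedding model's output size
  index_type: FLAT
  metric_type: COSINE
offline_store:
  type: file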

Valid values for provider in feature_store.yaml are:

  • local: use file source with Milvus Lite
  • gcp: use BigQuery/Snowflake with Google Cloud Datastore/Redis
  • aws: use Redshift/Snowflake with DynamoDB/Redis

Note that there are many other offline / online stores Feast works with, including Azure, Hive, Trino, and PostgreSQL via community plugins. See https://docs.feast.dev/roadmap for all supported connectors.

A custom setup can also be made by following Customizing Feast.

python
!pygmentize feature_store.yaml

Inspecting the raw data

The raw feature data we have in this demo is stored in a local parquet file. The dataset contains Wikipedia summaries of different cities.

python
import pandas as pd 

df = pd.read_parquet("./data/city_wikipedia_summaries_with_embeddings.parquet")
df['vector'] = df['vector'].apply(lambda x: x.tolist())
embedding_length = len(df['vector'][0])
print(f'embedding length = {embedding_length}')
python
from IPython.display import display

display(df.head())

Step 3: Register feature definitions and deploy your feature store

feast apply scans Python files in the current directory for feature/entity definitions and deploys infrastructure according to feature_store.yaml.

Step 3a: Inspecting feature definitions

Let's inspect what example_repo.py looks like:

python
from datetime import timedelta

from feast import (
    FeatureView,
    Field,
    FileSource,
)
from feast.data_format import ParquetFormat
from feast.types import Float32, Array, String, ValueType
from feast import Entity

item = Entity(
    name="item_id",
    description="Item ID",
    value_type=ValueType.INT64,
)

parquet_file_path = "./data/city_wikipedia_summaries_with_embeddings.parquet"

source = FileSource(
    file_format=ParquetFormat(),
    path=parquet_file_path,
    timestamp_field="event_timestamp",
)

city_embeddings_feature_view = FeatureView(
    name="city_embeddings",
    entities=[item],
    schema=[
        Field(
            name="vector",
            dtype=Array(Float32),
            vector_index=True,
            vector_search_metric="COSINE",
        ),
        Field(name="state", dtype=String),
        Field(name="sentence_chunks", dtype=String),
        Field(name="wiki_summary", dtype=String),
    ],
    source=source,
    ttl=timedelta(hours=2),
)

Step 3b: Applying feature definitions

Now we run feast apply to register the feature views and entities defined in example_repo.py and to set up the Milvus online store tables. Note that we previously specified Milvus as the online store in feature_store.yaml by specifying the local provider.

python
! feast apply

Step 5: Load features into your online store

python
from datetime import datetime
from feast import FeatureStore

store = FeatureStore(repo_path=".")

Step 5a: Using materialize_incremental

We now serialize the latest feature values since the beginning of time to prepare for serving. Note that materialize_incremental serializes all new features since the last materialize call, or since the time provided minus the ttl timedelta. In this case, this will be CURRENT_TIME - 2 hours (the ttl was set on the FeatureView in example_repo.py).

bash
CURRENT_TIME=$(date -u +"%Y-%m-%dT%H:%M:%S")
feast materialize-incremental $CURRENT_TIME

An alternative to using the CLI command is to use Python:

python
store.write_to_online_store(feature_view_name='city_embeddings', df=df)

Step 5b: Inspect materialized features

Note that now there are online_store.db and registry.db, which store the materialized features and schema information, respectively.

python
# Access the underlying Milvus client that Feast manages (internal API)
pymilvus_client = store._provider._online_store._connect(store.config)
COLLECTION_NAME = pymilvus_client.list_collections()[0]

# Query the Milvus collection directly for a single entity key
milvus_query_result = pymilvus_client.query(
    collection_name=COLLECTION_NAME,
    filter="item_id == '0'",
)
pd.DataFrame(milvus_query_result[0]).head()

Quick note on entity keys

Note from the above command that the online store indexes by entity_key.

Entity keys include a list of all entities needed (e.g. all relevant primary keys) to generate the feature vector. In this case, this is a serialized version of item_id. We use this later to fetch all features for a given item at inference time.
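
As a quick, minimal sketch (assuming the materialization step above has run), the standard non-vector features for a single item_id can be fetched by entity key with get_online_features:

python
# Fetch the latest structured features for one entity key (item_id = 0)
online_features = store.get_online_features(
    features=[
        "city_embeddings:state",
        "city_embeddings:wiki_summary",
    ],
    entity_rows=[{"item_id": 0}],
).to_dict()

print(online_features["state"])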

Step 6: Embedding a query using PyTorch and Sentence Transformers

At inference time (e.g., when a user submits a chat message), we need to embed the input text. This can be thought of as a feature transformation of the input data. In this example, we'll do this with a small Sentence Transformer model from Hugging Face.

python
import torch
import torch.nn.functional as F
from feast import FeatureStore
from pymilvus import MilvusClient, DataType, FieldSchema
from transformers import AutoTokenizer, AutoModel
from example_repo import city_embeddings_feature_view, item

TOKENIZER = "sentence-transformers/all-MiniLM-L6-v2"
MODEL = "sentence-transformers/all-MiniLM-L6-v2"

def mean_pooling(model_output, attention_mask):
    token_embeddings = model_output[
        0
    ]  # First element of model_output contains all token embeddings
    input_mask_expanded = (
        attention_mask.unsqueeze(-1).expand(token_embeddings.size()).float()
    )
    return torch.sum(token_embeddings * input_mask_expanded, 1) / torch.clamp(
        input_mask_expanded.sum(1), min=1e-9
    )

def run_model(sentences, tokenizer, model):
    encoded_input = tokenizer(
        sentences, padding=True, truncation=True, return_tensors="pt"
    )
    # Compute token embeddings
    with torch.no_grad():
        model_output = model(**encoded_input)

    sentence_embeddings = mean_pooling(model_output, encoded_input["attention_mask"])
    sentence_embeddings = F.normalize(sentence_embeddings, p=2, dim=1)
    return sentence_embeddings

Step 7: Fetching real-time vectors and data for online inference

At inference time, we run a vector similarity search over the document embeddings in the online feature store using retrieve_online_documents_v2(), passing in the embedded query. The retrieved feature vectors and fields can then be fed into the LLM's context.

python
question = "Which city has the largest population in New York?"

tokenizer = AutoTokenizer.from_pretrained(TOKENIZER)
model = AutoModel.from_pretrained(MODEL)
query_embedding = run_model(question, tokenizer, model)
query = query_embedding.detach().cpu().numpy().tolist()[0]
python
from IPython.display import display

# Retrieve top k documents
context_data = store.retrieve_online_documents_v2(
    features=[
        "city_embeddings:vector",
        "city_embeddings:item_id",
        "city_embeddings:state",
        "city_embeddings:sentence_chunks",
        "city_embeddings:wiki_summary",
    ],
    query=query,
    top_k=3,
    distance_metric='COSINE',
).to_df()
display(context_data)
python
# Format retrieved rows into a delimited context string for the LLM prompt
def format_documents(context_df):
    output_context = ""
    unique_documents = context_df.drop_duplicates().apply(
        lambda x: "City & State = {" + x["state"] + "}\nSummary = {" + x["wiki_summary"].strip() + "}",
        axis=1,
    )
    for i, document_text in enumerate(unique_documents):
        # Wrap each document in START/END markers, separated by newlines
        output_context += f"****START DOCUMENT {i}****\n{document_text.strip()}\n****END DOCUMENT {i}****\n"
    return output_context
python
RAG_CONTEXT = format_documents(context_data[['state', 'wiki_summary']])
python
print(RAG_CONTEXT)
python
FULL_PROMPT = f"""
You are an assistant for answering questions about states. You will be provided documentation from Wikipedia. Provide a conversational answer.
If you don't know the answer, just say "I do not know." Don't make up an answer.

Here are the document(s) you should use when answering the user's question:
{RAG_CONTEXT}
"""
python
import os
from openai import OpenAI

client = OpenAI(
    api_key=os.environ.get("OPENAI_API_KEY"),  # requires the OPENAI_API_KEY environment variable
)
python
response = client.chat.completions.create(
    model="gpt-4o-mini",
    messages=[
        {"role": "system", "content": FULL_PROMPT},
        {"role": "user", "content": question}
    ],
)
python
print('\n'.join([c.message.content for c in response.choices]))

End