docs/examples/text-embeddings.md
In this example, we demonstrate how to build a text embedding pipeline that processes large text datasets and stores the resulting embeddings in Turbopuffer.
What is Turbopuffer? Turbopuffer is a vector database that allows you to store and search through high-dimensional embeddings efficiently. It's designed for production workloads and provides fast similarity search capabilities.
What are embeddings? An embedding is a representation of data (text, images, audio, etc.), often a vector of numerical values, that encodes semantic information. Embeddings can then be used in many applications, such as semantic search, deduplication, and multilingual applications.
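To make this concrete, here is a minimal sketch of how embeddings capture semantic similarity. It uses the same sentence-transformers library we install below, but with a small model chosen purely for illustration:

```python
from sentence_transformers import SentenceTransformer, util

# Small model chosen only for illustration; the pipeline below uses Qwen/Qwen3-Embedding-0.6B
model = SentenceTransformer("sentence-transformers/all-MiniLM-L6-v2")

sentences = [
    "The cat sat on the mat.",
    "A kitten rested on the rug.",
    "Quarterly revenue grew by 8%.",
]
embeddings = model.encode(sentences, convert_to_tensor=True)

# Semantically similar sentences end up close together (higher cosine similarity)
print(util.cos_sim(embeddings[0], embeddings[1]))  # relatively high
print(util.cos_sim(embeddings[0], embeddings[2]))  # relatively low
```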
By the end, you should be able to run a text embedding pipeline on a cluster and achieve near 100% GPU utilization for your workloads.
Our pipeline will read a Parquet dataset from S3, chunk each document into sentences, compute embeddings for each sentence on GPUs, and write the results to Turbopuffer.
Before starting, install the required dependencies and download the spaCy model for text chunking:
```bash
pip install "daft[ray]" turbopuffer torch sentence-transformers spacy accelerate transformers
python -m spacy download en_core_web_sm
```
You will also need AWS access. Individual setups vary, but once configured you can log in via:
```bash
aws sso login
```
```python
import torch
import daft
from daft import col
NUM_GPU_NODES = 8 # The number of GPU nodes available
NLP_MODEL_NAME = "en_core_web_sm" # The spaCy model to use for chunking text
CHUNKING_PARALLELISM = 8 # The number of chunking UDFs to run in parallel per node
EMBEDDING_MODEL_NAME = "Qwen/Qwen3-Embedding-0.6B" # The text embedding model to use
ENCODING_DIM = 1024 # The output dimensions for embeddings
BATCH_SIZE = 512 # The number of records passed into our embeddings UDF
SENTENCE_TRANSFORMER_BATCH_SIZE = 16 # The number of records in each batch to use with SentenceTransformers
```
When creating embeddings, it's useful to split your text into meaningful chunks. Text is hierarchical and can be broken down at different levels: Document → Sections → Paragraphs → Sentences → Words → Characters. The chunking strategy to use depends on your use case.
We'll use sentence-level chunking in this example.
We'll also use spaCy, a natural language processing library whose robust sentence boundary detection handles edge cases better than simple punctuation-based splitting.
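To see what sentence-level chunking produces, here is a small standalone snippet (separate from the pipeline code):

```python
import spacy

nlp = spacy.load("en_core_web_sm")
doc = nlp("Daft is a distributed dataframe library. It scales from your laptop to a cluster!")

# spaCy's sentence boundary detection yields one chunk per sentence
print([sentence.text for sentence in doc.sents])
```

In the pipeline itself, this segmentation happens inside a UDF: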
```python
# Define the return type for chunked text
# Here we'll keep both the chunked text and the chunk ID which we'll later use for creating IDs for the sentences
chunked_type = daft.DataType.list(
daft.DataType.struct({
"text": daft.DataType.string(),
"chunk_id": daft.DataType.int32()
})
)
@daft.udf(
return_dtype=chunked_type,
concurrency=NUM_GPU_NODES * (CHUNKING_PARALLELISM + 1),
batch_size=BATCH_SIZE // CHUNKING_PARALLELISM // 2
)
class ChunkingUDF:
def __init__(self):
import spacy
self.nlp = spacy.load(NLP_MODEL_NAME)
def __call__(self, text_col):
results = []
for text in text_col:
doc = self.nlp(text)
sentence_texts = [
{"text": sentence.text, "chunk_id": i}
for i, sentence in enumerate(doc.sents)
]
results.append(sentence_texts)
        return results
```
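You can sanity-check the UDF on a small in-memory DataFrame before running it at scale. A minimal sketch:

```python
# Quick local check (not part of the pipeline): chunk one document and inspect the rows
df = daft.from_pydict({"text": ["Daft is a dataframe library. It runs UDFs in parallel."]})
df = df.with_column("sentences", ChunkingUDF(col("text"))).explode("sentences")
df.show()
```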
This User-Defined Function (UDF):

- Loads the spaCy model once per instance in `__init__` and reuses it across batches
- Processes a whole batch of text at a time (`text_col`) to minimize overhead
- Runs many instances in parallel across the cluster (concurrency is `NUM_GPU_NODES * (CHUNKING_PARALLELISM + 1)`, i.e. 72 instances with the constants above) for distributed processing

The quality of your embeddings depends heavily on the model you choose. Here are some key considerations:
**Model Performance**

**Some Popular Models**
With open models available on HuggingFace, you can easily swap models by changing the EMBEDDING_MODEL_NAME constant in the code below.
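For example, a sketch of a few possible swaps (embedding dimensions are taken from the respective model cards; double-check them for the model you pick and update `ENCODING_DIM` to match):

```python
# EMBEDDING_MODEL_NAME = "sentence-transformers/all-MiniLM-L6-v2"  # ENCODING_DIM = 384
# EMBEDDING_MODEL_NAME = "BAAI/bge-large-en-v1.5"                  # ENCODING_DIM = 1024
EMBEDDING_MODEL_NAME = "Qwen/Qwen3-Embedding-0.6B"                  # ENCODING_DIM = 1024
```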
We'll create a UDF to generate embeddings from the chunked text:
```python
# Define the return type for embeddings
embedding_type = daft.DataType.embedding(daft.DataType.float32(), ENCODING_DIM)
@daft.udf(
return_dtype=embedding_type,
concurrency=NUM_GPU_NODES,
num_gpus=1,
batch_size=BATCH_SIZE
)
class EncodingUDF:
def __init__(self):
from sentence_transformers import SentenceTransformer
device = 'cuda' if torch.cuda.is_available() else 'cpu'
self.model = SentenceTransformer(EMBEDDING_MODEL_NAME, device=device)
self.model.compile()
def __call__(self, text_col):
embeddings = self.model.encode(
text_col.to_pylist(),
batch_size=SENTENCE_TRANSFORMER_BATCH_SIZE,
convert_to_tensor=True,
torch_dtype=torch.bfloat16,
)
        return embeddings.cpu().numpy()
```
This UDF:

- Loads a SentenceTransformer model onto the GPU (when one is available) and compiles it for faster inference
- Encodes text with `bfloat16` precision to reduce memory usage
- Encodes in smaller sub-batches (`SENTENCE_TRANSFORMER_BATCH_SIZE`) for optimal GPU utilization

You can run this script locally, but if you're interested in running this pipeline on a cluster, check out our guide on scaling up. In this example, we ran on a Ray cluster with 8 g5.2xlarge workers (each comes with an A10G GPU). To configure our Daft script to use the Ray cluster, we added:
```python
# Configure Daft to use Ray to schedule work on different worker nodes
daft.set_runner_ray()
# Configure S3 access for reading data
daft.set_planning_config(
default_io_config=daft.io.IOConfig(
s3=daft.io.S3Config.from_env()
)
)
```
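If you submit the script from a machine outside the cluster, you may also need to point the Ray runner at the cluster's address. A sketch, assuming your Daft version's `set_runner_ray` accepts an `address` argument (the address below is a placeholder):

```python
# Hypothetical address shown for illustration; many setups instead rely on the RAY_ADDRESS
# environment variable or `ray job submit` and call daft.set_runner_ray() with no arguments.
daft.set_runner_ray(address="ray://<head-node-ip>:10001")
```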
Now we'll execute the complete data processing pipeline:
```python
(
daft.read_parquet("s3://desmond-demo/text-embedding-dataset.parquet")
.with_column("sentences", ChunkingUDF(col("text")))
.explode("sentences")
.with_column("text", col("sentences")["text"])
.with_column("chunk_id", col("sentences")["chunk_id"])
.exclude("sentences")
.with_column("embedding", EncodingUDF(col("text")))
.with_column(
"id",
col("url").right(50) + "-" + col("chunk_id").cast(daft.DataType.string())
)
.select("id", "url", "language", "source", "text", "embedding")
.write_turbopuffer(
namespace="desmond-scale-experiment6",
region="aws-us-west-2",
id_column="id",
vector_column="embedding",
distance_metric="cosine_distance"
)
)
```
Pipeline steps explained:

- Read the source dataset from Parquet files on S3
- Split each document into sentences with `ChunkingUDF`, then `explode` so that each sentence becomes its own row
- Pull the `text` and `chunk_id` fields out of the sentence struct and drop the intermediate `sentences` column
- Compute an embedding for every sentence with `EncodingUDF`
- Build a unique `id` for each row from the last 50 characters of the URL plus the chunk ID
- Select the columns to persist and write them to Turbopuffer with the [DataFrame.write_turbopuffer][daft.DataFrame.write_turbopuffer] method

If all works out well, when you run this script on your cluster, you should notice that network I/O, CPU work, and GPU work are pipelined to run in parallel, and you should see high GPU utilization :)
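Before launching the full job, it can help to dry-run the same transformations on a small sample and inspect the output instead of writing to Turbopuffer. A minimal sketch:

```python
(
    daft.read_parquet("s3://desmond-demo/text-embedding-dataset.parquet")
    .limit(32)  # small sample for a quick end-to-end check
    .with_column("sentences", ChunkingUDF(col("text")))
    .explode("sentences")
    .with_column("text", col("sentences")["text"])
    .with_column("chunk_id", col("sentences")["chunk_id"])
    .exclude("sentences")
    .with_column("embedding", EncodingUDF(col("text")))
    .show()
)
```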
Some knobs worth tuning:

- Increase `SENTENCE_TRANSFORMER_BATCH_SIZE` for better throughput, or decrease it for lower GPU memory usage
- Adjust `NUM_GPU_NODES` and `CHUNKING_PARALLELISM` based on your cluster size and the cores available per node
- Swap `EMBEDDING_MODEL_NAME` with other SentenceTransformer models
- Modify `ChunkingUDF` to use different text chunking strategies
- If you hit GPU out-of-memory errors, `SENTENCE_TRANSFORMER_BATCH_SIZE` may be too large
- Consider `bfloat16` or `float16` quantization for lower GPU memory utilization and higher throughput (see the sketch below)

This pipeline can efficiently process millions of text documents while automatically scaling across your available compute resources.
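As one example of the quantization tip, recent versions of sentence-transformers let you load the model weights in half precision via `model_kwargs`. A sketch, assuming your installed version supports this argument:

```python
import torch
from sentence_transformers import SentenceTransformer

EMBEDDING_MODEL_NAME = "Qwen/Qwen3-Embedding-0.6B"

# Load weights in float16 to roughly halve GPU memory usage
model = SentenceTransformer(
    EMBEDDING_MODEL_NAME,
    device="cuda",
    model_kwargs={"torch_dtype": torch.float16},
)
```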
Here's the complete script you can run:
```python
import torch
import daft
from daft import col
NUM_GPU_NODES = 8 # The number of GPU nodes available
NLP_MODEL_NAME = "en_core_web_sm" # The spaCy model to use for chunking text
CHUNKING_PARALLELISM = 8 # The number of chunking UDFs to run in parallel per node
EMBEDDING_MODEL_NAME = "Qwen/Qwen3-Embedding-0.6B" # The text embedding model to use
ENCODING_DIM = 1024 # The output dimensions for embeddings
BATCH_SIZE = 512 # The number of records passed into our embeddings UDF
SENTENCE_TRANSFORMER_BATCH_SIZE = 16 # The number of records in each batch to use with SentenceTransformers
# Define the return type for chunked text
chunked_type = daft.DataType.list(
daft.DataType.struct({
"text": daft.DataType.string(),
"chunk_id": daft.DataType.int32()
})
)
@daft.udf(
return_dtype=chunked_type,
concurrency=NUM_GPU_NODES * (CHUNKING_PARALLELISM + 1),
batch_size=BATCH_SIZE // CHUNKING_PARALLELISM // 2
)
class ChunkingUDF:
def __init__(self):
import spacy
self.nlp = spacy.load(NLP_MODEL_NAME)
def __call__(self, text_col):
results = []
for text in text_col:
doc = self.nlp(text)
sentence_texts = [
{"text": sentence.text, "chunk_id": i}
for i, sentence in enumerate(doc.sents)
]
results.append(sentence_texts)
return results
# Define the return type for embeddings
embedding_type = daft.DataType.embedding(daft.DataType.float32(), ENCODING_DIM)
@daft.udf(
return_dtype=embedding_type,
concurrency=NUM_GPU_NODES,
num_gpus=1,
batch_size=BATCH_SIZE
)
class EncodingUDF:
def __init__(self):
from sentence_transformers import SentenceTransformer
device = 'cuda' if torch.cuda.is_available() else 'cpu'
self.model = SentenceTransformer(EMBEDDING_MODEL_NAME, device=device)
self.model.compile()
def __call__(self, text_col):
embeddings = self.model.encode(
text_col.to_pylist(),
batch_size=SENTENCE_TRANSFORMER_BATCH_SIZE,
convert_to_tensor=True,
torch_dtype=torch.bfloat16,
)
return embeddings.cpu().numpy()
def main():
# Configure Daft to use Ray to schedule work on different worker nodes
daft.set_runner_ray()
# Configure S3 access for reading data
daft.set_planning_config(
default_io_config=daft.io.IOConfig(
s3=daft.io.S3Config.from_env()
)
)
(
daft.read_parquet("s3://desmond-demo/text-embedding-dataset.parquet")
.with_column("sentences", ChunkingUDF(col("text")))
.explode("sentences")
.with_column("text", col("sentences")["text"])
.with_column("chunk_id", col("sentences")["chunk_id"])
.exclude("sentences")
.with_column("embedding", EncodingUDF(col("text")))
.with_column(
"id",
col("url").right(50) + "-" + col("chunk_id").cast(daft.DataType.string())
)
.select("id", "url", "language", "source", "text", "embedding")
.write_turbopuffer(
namespace="desmond-scale-experiment6",
region="aws-us-west-2",
id_column="id",
vector_column="embedding",
distance_metric="cosine_distance"
)
)
if __name__ == '__main__':
    main()
```