docs/custom-code/gpu.md
For GPU work, always reach for `@daft.cls`: it initializes your model once in `__init__` and reuses it across rows, so the expensive model load is amortized over the whole query. Request a GPU with the `gpus` parameter on the class decorator.
```python
import daft
from daft import DataType, Series


@daft.cls(gpus=1)
class Embedder:
    def __init__(self, model_name: str):
        from sentence_transformers import SentenceTransformer

        self.model = SentenceTransformer(model_name).cuda()

    @daft.method.batch(return_dtype=DataType.list(DataType.float32()), batch_size=64)
    def encode(self, text: Series) -> list[list[float]]:
        return self.model.encode(text.to_pylist()).tolist()


embedder = Embedder("all-MiniLM-L6-v2")
df = daft.from_pydict({"text": ["hello", "world", "daft"]})
df = df.select(embedder.encode(df["text"]))
```
The class is initialized once per Python worker; the model stays resident on the GPU for the whole query.
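That amortization is the whole point of the class-based API. A toy, Daft-free sketch of the difference (the `LOADS` counter and the stand-in "model" are hypothetical, for illustration only):

```python
# Toy stand-ins to show why init-once matters; no Daft or GPU involved.
LOADS = 0


def load_model():
    # Stand-in for an expensive model load (downloading weights, CUDA init).
    global LOADS
    LOADS += 1
    return len  # toy "model": maps text to its length


rows = ["hello", "world", "daft"]

# Anti-pattern: a plain function that reloads the model on every row.
results = [load_model()(row) for row in rows]
assert LOADS == len(rows)  # one load per row

# @daft.cls-style: load once up front, reuse across every row.
LOADS = 0
model = load_model()
results = [model(row) for row in rows]
assert LOADS == 1  # a single load, amortized over the whole query
```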
`gpus` accepts fractional values up to 1.0. Use this when a single invocation cannot saturate the GPU, for example a small model with meaningful preprocessing overhead.
```python
@daft.cls(gpus=0.5, max_concurrency=2)
class SmallModel:
    def __init__(self, name: str):
        import torch

        self.torch = torch
        self.model = torch.load(name).cuda()

    @daft.method.batch(return_dtype=DataType.float64(), batch_size=16)
    def infer(self, x: Series):
        # Convert the Arrow-backed batch to a tensor before calling the model.
        batch = self.torch.from_numpy(x.to_arrow().to_numpy()).cuda()
        with self.torch.no_grad():
            return self.model(batch).cpu().numpy()
```
With `gpus=0.5` and `max_concurrency=2`, two instances share one physical GPU. Daft isolates `CUDA_VISIBLE_DEVICES` per process, so device ordinals stay consistent inside each instance.
Values above 1.0 must be integers (e.g. `gpus=2` for model parallelism across two GPUs).
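The packing arithmetic behind fractional and integer requests can be sketched in plain Python. This is illustrative arithmetic only, not Daft's actual scheduler; `plan_instances` is a hypothetical helper:

```python
import math


def plan_instances(gpus_per_instance: float, physical_gpus: int) -> list[list[int]]:
    """Map GPU requests to the device ordinals each instance sees (illustrative)."""
    if gpus_per_instance <= 1.0:
        # Fractional (or exactly 1.0): each device hosts floor(1 / request) instances.
        per_device = math.floor(1 / gpus_per_instance)
        return [[dev] for dev in range(physical_gpus) for _ in range(per_device)]
    if gpus_per_instance != int(gpus_per_instance):
        raise ValueError("requests above 1.0 must be whole GPUs")
    # Integer requests span whole devices: one instance per contiguous span.
    span = int(gpus_per_instance)
    return [list(range(s, s + span)) for s in range(0, physical_gpus - span + 1, span)]


# gpus=0.5 on one physical GPU: two instances share device 0.
print(plan_instances(0.5, 1))  # [[0], [0]]
# gpus=2 on four physical GPUs: two instances, each spanning two devices.
print(plan_instances(2, 4))    # [[0, 1], [2, 3]]
```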
```python
@daft.cls(gpus=1)
class SentenceEmbedder:
    def __init__(self, model_name: str = "BAAI/bge-large-en-v1.5"):
        from sentence_transformers import SentenceTransformer

        self.model = SentenceTransformer(model_name, device="cuda")

    @daft.method.batch(return_dtype=DataType.list(DataType.float32()), batch_size=128)
    def embed(self, text: Series) -> list[list[float]]:
        return self.model.encode(text.to_pylist(), batch_size=128).tolist()


@daft.cls(gpus=1)
class CLIPScorer:
    def __init__(self, model_name: str = "openai/clip-vit-base-patch32"):
        import torch
        from transformers import CLIPModel, CLIPProcessor

        self.model = CLIPModel.from_pretrained(model_name).to("cuda").eval()
        self.processor = CLIPProcessor.from_pretrained(model_name)
        self.torch = torch

    @daft.method.batch(return_dtype=DataType.list(DataType.float32()), batch_size=32)
    def embed_images(self, image_bytes: Series) -> list[list[float]]:
        from io import BytesIO

        from PIL import Image

        images = [Image.open(BytesIO(b)).convert("RGB") for b in image_bytes.to_pylist()]
        inputs = self.processor(images=images, return_tensors="pt").to("cuda")
        with self.torch.no_grad():
            features = self.model.get_image_features(**inputs)
        return features.cpu().tolist()
```
For production-grade LLM inference, Daft ships a built-in vLLM integration — see AI Functions › Prompt. For smaller custom generation tasks:
```python
@daft.cls(gpus=1)
class Generator:
    def __init__(self, model_name: str = "Qwen/Qwen2.5-1.5B-Instruct"):
        import torch
        from transformers import AutoModelForCausalLM, AutoTokenizer

        self.tokenizer = AutoTokenizer.from_pretrained(model_name)
        self.model = AutoModelForCausalLM.from_pretrained(
            model_name, torch_dtype=torch.bfloat16, device_map="cuda"
        ).eval()
        self.torch = torch

    @daft.method(return_dtype=DataType.string())
    def generate(self, prompt: str) -> str:
        inputs = self.tokenizer(prompt, return_tensors="pt").to("cuda")
        with self.torch.no_grad():
            output = self.model.generate(**inputs, max_new_tokens=128)
        return self.tokenizer.decode(output[0], skip_special_tokens=True)
```
```python
@daft.cls(gpus=1)
class Whisper:
    def __init__(self, model_name: str = "openai/whisper-large-v3"):
        import torch
        from transformers import pipeline

        self.pipe = pipeline(
            "automatic-speech-recognition",
            model=model_name,
            torch_dtype=torch.float16,
            device="cuda",
        )

    @daft.method(return_dtype=DataType.string())
    def transcribe(self, audio_path: str) -> str:
        return self.pipe(audio_path)["text"]
```
GPU throughput is batch-size sensitive. A few rules of thumb:

- Larger batches amortize per-call overhead and keep the GPU busy, but risk out-of-memory errors; start small and increase until memory becomes the constraint.
- Variable-length inputs (text, audio) can spike memory at large batch sizes, so leave headroom below the maximum that fits.
- Where the model batches internally (as `SentenceTransformer.encode` does), align `batch_size` on `@daft.method.batch` with the model's own batch size so each call maps cleanly onto forward passes.
See Classes & Methods › Batch Sizing for additional considerations.
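One practical way to pick a batch size is a doubling search against out-of-memory failures. A minimal sketch, where `find_max_batch_size` and the `try_batch` callback are hypothetical helpers (the `fake_forward` stand-in simulates an OOM above batch size 100):

```python
def find_max_batch_size(try_batch, start=1, limit=4096):
    """Doubling search for the largest batch size that succeeds.

    try_batch(n) should run one forward pass at batch size n and raise
    MemoryError on out-of-memory. Returns the last size that worked,
    or None if even `start` fails. Illustrative helper, not a Daft API.
    """
    n = start
    best = None
    while n <= limit:
        try:
            try_batch(n)
            best = n
            n *= 2
        except MemoryError:
            break
    return best


def fake_forward(n):
    # Toy stand-in for a GPU forward pass: "OOMs" above batch size 100.
    if n > 100:
        raise MemoryError


print(find_max_batch_size(fake_forward))  # 64
```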
GPU inference can fail for reasons unrelated to your code: OOM, transient driver errors, preempted instances. Use `max_retries` and `on_error` to keep long queries going:

```python
@daft.cls(gpus=1, max_retries=2, on_error="log")
class Model:
    ...
```
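The retry-then-log semantics can be sketched as a plain wrapper. `call_with_retries` is a hypothetical helper for illustration, not Daft internals:

```python
import logging


def call_with_retries(fn, *args, max_retries=2, on_error="log"):
    """Retry transient failures up to max_retries times; on final failure,
    either log and return None (on_error="log") or re-raise (on_error="raise").
    Illustrative sketch of the semantics, not Daft's implementation."""
    for attempt in range(max_retries + 1):
        try:
            return fn(*args)
        except Exception as exc:
            if attempt < max_retries:
                continue  # transient failure: try again
            if on_error == "log":
                logging.warning("giving up after %d retries: %s", max_retries, exc)
                return None
            raise


# A call that fails twice, then succeeds, completes within max_retries=2.
calls = {"n": 0}


def flaky():
    calls["n"] += 1
    if calls["n"] < 3:
        raise RuntimeError("transient driver error")
    return "ok"


print(call_with_retries(flaky, max_retries=2))  # ok
```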