docs/custom-code/func.md
# @daft.func

When Daft's built-in functions aren't sufficient for your needs, the `@daft.func` and `@daft.cls` decorators let you run your own Python code over each row of data. Simply decorate a Python function or class, and it becomes usable in Daft DataFrame operations.
```python
import daft

@daft.func
def add_and_format(a: int, b: int) -> str:
    return f"Sum: {a + b}"

df = daft.from_pydict({"x": [1, 2, 3], "y": [4, 5, 6]})
df = df.select(add_and_format(df["x"], df["y"]))
df.show()
```
```
╭───────────╮
│ x         │
│ ---       │
│ Utf8      │
╞═══════════╡
│ Sum: 5    │
├╌╌╌╌╌╌╌╌╌╌╌┤
│ Sum: 7    │
├╌╌╌╌╌╌╌╌╌╌╌┤
│ Sum: 9    │
╰───────────╯
```
Daft supports multiple function variants to optimize for different use cases:

- `@daft.func` (row-wise): process one row at a time and return one value per row (the default)
- `@daft.func` on an `async def`: run invocations concurrently across rows
- `@daft.func` on a generator: yield multiple output rows per input row
- `@daft.func.batch`: process entire batches of data with `daft.Series` for high performance

Daft automatically detects which variant to use for regular functions based on your function signature. For batch functions, you must use the `@daft.func.batch` decorator.
## Row-wise functions

Row-wise functions are the default variant. They process one row at a time and return one value per row.
```python
import daft

@daft.func
def multiply(a: int, b: int) -> int:
    return a * b

df = daft.from_pydict({"x": [1, 2, 3], "y": [10, 20, 30]})
df = df.select(multiply(df["x"], df["y"]))
df.show()
```
```
╭───────╮
│ x     │
│ ---   │
│ Int64 │
╞═══════╡
│ 10    │
├╌╌╌╌╌╌╌┤
│ 40    │
├╌╌╌╌╌╌╌┤
│ 90    │
╰───────╯
```
Daft automatically infers the return type from your function's type hint:
```python
@daft.func
def tokenize(text: str) -> list[int]:
    vocab = {char: i for i, char in enumerate(set(text))}
    return [vocab[char] for char in text]

df = daft.from_pydict({"text": ["hello", "world"]})
df = df.select(tokenize(df["text"]))

# The return type is automatically inferred as List[Int64]
print(df.schema())
```
If you need to override the inferred type, use the return_dtype parameter:
```python
@daft.func(return_dtype=daft.DataType.int32())
def add(a: int, b: int) -> int:
    return a + b
```
You can mix DataFrame expressions with literal values:
```python
@daft.func
def add_constant(value: int, constant: int) -> int:
    return value + constant

df = daft.from_pydict({"x": [1, 2, 3]})
df = df.select(add_constant(df["x"], 100))  # constant is a literal
df.show()
```
```
╭───────╮
│ x     │
│ ---   │
│ Int64 │
╞═══════╡
│ 101   │
├╌╌╌╌╌╌╌┤
│ 102   │
├╌╌╌╌╌╌╌┤
│ 103   │
╰───────╯
```
Functions with default arguments work as expected:
```python
@daft.func
def format_number(value: int, prefix: str = "$", suffix: str = "") -> str:
    return f"{prefix}{value}{suffix}"

df = daft.from_pydict({"amount": [10, 20, 30]})

# Use defaults
df.select(format_number(df["amount"])).show()

# Override with literals
df.select(format_number(df["amount"], prefix="€", suffix=" EUR")).show()

# Override with expressions
df.select(format_number(df["amount"], suffix=df["amount"].cast(daft.DataType.string()))).show()
```
When called without any expressions, functions execute immediately:
```python
@daft.func
def add(a: int, b: int) -> int:
    return a + b

# This executes immediately and returns 8
result = add(3, 5)
print(result)  # 8

# This returns a Daft Expression
expr = add(df["x"], df["y"])
```
## Async functions

Decorate async functions to enable concurrent execution across rows:
```python
import daft
import asyncio
import aiohttp

@daft.func
async def fetch_url(url: str) -> str:
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            return await response.text()

df = daft.from_pydict({
    "urls": [
        "https://api.example.com/1",
        "https://api.example.com/2",
        "https://api.example.com/3",
    ]
})

# Requests are made concurrently
df = df.select(fetch_url(df["urls"]))
```
Use max_concurrency to limit the number of concurrent coroutines, for example to rate-limit API calls:
```python
@daft.func(max_concurrency=10)
async def fetch_url(url: str) -> str:
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            return await response.text()
```
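Conceptually, `max_concurrency` acts like a semaphore around each invocation: at most N coroutines are in flight at once. The following plain-asyncio sketch illustrates that behavior; it is not Daft's implementation, and `main`, `task`, and the counters are hypothetical names for illustration only.

```python
import asyncio

async def main(limit: int, n_tasks: int) -> int:
    # A semaphore caps how many tasks run concurrently, like max_concurrency
    sem = asyncio.Semaphore(limit)
    running = 0
    peak = 0

    async def task():
        nonlocal running, peak
        async with sem:
            running += 1
            peak = max(peak, running)
            await asyncio.sleep(0)  # stand-in for real async work
            running -= 1

    await asyncio.gather(*(task() for _ in range(n_tasks)))
    return peak

# 20 tasks, but never more than 3 running at once
peak = asyncio.run(main(limit=3, n_tasks=20))
```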
## Generator functions

Generator functions use `yield` to produce multiple output rows per input row. Other columns in the DataFrame are automatically broadcast to match the number of generated values. You may only use one generator function per DataFrame operation.
```python
import daft
from typing import Iterator

@daft.func
def repeat_value(value: str, count: int) -> Iterator[str]:
    for _ in range(count):
        yield value

df = daft.from_pydict({
    "id": [1, 2, 3],
    "word": ["hello", "world", "daft"],
    "times": [2, 3, 1],
})

df = df.select("id", repeat_value(df["word"], df["times"]))
df.show()
```
```
╭───────┬───────╮
│ id    ┆ word  │
│ ---   ┆ ---   │
│ Int64 ┆ Utf8  │
╞═══════╪═══════╡
│ 1     ┆ hello │
├╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌┤
│ 1     ┆ hello │
├╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌┤
│ 2     ┆ world │
├╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌┤
│ 2     ┆ world │
├╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌┤
│ 2     ┆ world │
├╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌┤
│ 3     ┆ daft  │
╰───────┴───────╯
```
Notice how the id column values are repeated to match the number of generated values.
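The broadcasting can be sketched in plain Python. This is illustrative only, not Daft's implementation; the loop mimics what happens to the `id` column as each input row expands:

```python
def repeat_value(value, count):
    for _ in range(count):
        yield value

ids = [1, 2, 3]
words = ["hello", "world", "daft"]
times = [2, 3, 1]

out_ids, out_words = [], []
for i, w, t in zip(ids, words, times):
    generated = list(repeat_value(w, t))
    out_words.extend(generated)
    # Broadcast the "id" value to match the number of generated rows
    out_ids.extend([i] * len(generated))
```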
Use Iterator[T] or Generator[T, None, None] type hints to indicate the yielded type:
```python
from typing import Iterator

@daft.func
def split_text(text: str) -> Iterator[str]:
    for word in text.split():
        yield word
```
Alternatively, specify the return type explicitly:
```python
@daft.func(return_dtype=daft.DataType.string())
def split_text(text: str):
    for word in text.split():
        yield word
```
If a generator yields no values for a particular input row, a null value is inserted:
```python
@daft.func
def yield_if_positive(value: int) -> Iterator[int]:
    if value > 0:
        yield value

df = daft.from_pydict({"x": [-1, 0, 5, 10]})
df = df.select(yield_if_positive(df["x"]))
df.show()
```
```
╭───────╮
│ x     │
│ ---   │
│ Int64 │
╞═══════╡
│ None  │
├╌╌╌╌╌╌╌┤
│ None  │
├╌╌╌╌╌╌╌┤
│ 5     │
├╌╌╌╌╌╌╌┤
│ 10    │
╰───────╯
```
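A plain-Python sketch of this null-filling rule (illustrative only, not Daft internals):

```python
def yield_if_positive(value):
    if value > 0:
        yield value

out = []
for x in [-1, 0, 5, 10]:
    generated = list(yield_if_positive(x))
    # An empty generator still contributes one output row, filled with null
    out.extend(generated if generated else [None])
```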
## @daft.func.batch

For performance-critical operations, batch UDFs process entire batches of data at once using `daft.Series` objects instead of individual values. This allows you to leverage optimized libraries like PyArrow or NumPy for efficient vectorized operations.

Use batch UDFs when your computation can be vectorized and the per-row overhead of calling a Python function would dominate the work itself.

Batch UDFs receive `daft.Series` objects as arguments and return a `daft.Series`, `list`, `numpy.ndarray`, or `pyarrow.Array`:
```python
import daft
from daft import DataType, Series

@daft.func.batch(return_dtype=DataType.int64())
def add_series(a: Series, b: Series) -> Series:
    import pyarrow.compute as pc

    # Convert to PyArrow for efficient computation
    a_arrow = a.to_arrow()
    b_arrow = b.to_arrow()
    return pc.add(a_arrow, b_arrow)

df = daft.from_pydict({"x": [1, 2, 3], "y": [4, 5, 6]})
df = df.select(add_series(df["x"], df["y"]))
df.show()
```
```
╭───────╮
│ x     │
│ ---   │
│ Int64 │
╞═══════╡
│ 5     │
├╌╌╌╌╌╌╌┤
│ 7     │
├╌╌╌╌╌╌╌┤
│ 9     │
╰───────╯
```
Batch UDFs can accept both Series and scalar arguments. Scalar arguments are passed through without modification:
```python
@daft.func.batch(return_dtype=DataType.int64())
def add_constant(a: Series, constant: int) -> Series:
    import pyarrow.compute as pc

    return pc.add(a.to_arrow(), constant)

df = daft.from_pydict({"x": [1, 2, 3]})
df = df.select(add_constant(df["x"], 100))
df.show()
```
```
╭───────╮
│ x     │
│ ---   │
│ Int64 │
╞═══════╡
│ 101   │
├╌╌╌╌╌╌╌┤
│ 102   │
├╌╌╌╌╌╌╌┤
│ 103   │
╰───────╯
```
Like regular functions, batch UDFs execute immediately when called with scalars:
```python
@daft.func.batch(return_dtype=DataType.int64())
def multiply(a: Series, b: Series) -> Series:
    import pyarrow.compute as pc

    return pc.multiply(a.to_arrow(), b.to_arrow())

# Lazy execution - returns an Expression
expr = multiply(df["x"], df["y"])

# Eager execution - computes immediately
result = multiply(5, 10)  # Returns 50
```
## Common keyword arguments

`@daft.func` and `@daft.func.batch` accept a common set of keyword arguments for concurrency control, scheduling, and error handling:
```python
@daft.func(
    cpus=2,            # Each invocation needs 2 CPUs
    gpus=1,            # Each invocation needs 1 GPU
    use_process=True,  # Run the function in a subprocess instead of a thread
    max_retries=3,     # Retry a failing invocation up to 3 times with backoff
    on_error="log",    # On final failure, log the exception and emit None
)
def call_flaky_api(url: str) -> str:
    import requests

    return requests.get(url).text
```
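The retry backoff schedule described under `max_retries` below (100 ms start, doubling each attempt, capped at 60 s, with ±25% jitter) can be sketched like this. `base_delay` is a hypothetical helper for illustration, not a Daft API:

```python
def base_delay(attempt: int, start: float = 0.1, cap: float = 60.0) -> float:
    # Exponential backoff: start * 2^attempt, capped (jitter applied separately)
    return min(start * (2 ** attempt), cap)

delays = [base_delay(n) for n in range(12)]
# With +/-25% jitter, the actual sleep for a base delay d is drawn
# from the interval [0.75 * d, 1.25 * d].
```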
### cpus and gpus

`cpus` and `gpus` are used for concurrency control and scheduling, not placement. Daft uses them to decide how many invocations of your function can run in parallel on a given machine.
If you annotate a function with cpus=2 and it runs on a machine with 4 CPUs, Daft runs at most 2 invocations in parallel on that machine. gpus works the same way: @daft.func(gpus=1) on a machine with 2 GPUs means at most 2 concurrent invocations.
Both accept fractional values (e.g. cpus=0.5, gpus=0.5). gpus values above 1.0 must be integers.
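The scheduling rule above boils down to simple division. A sketch, where `max_parallel` is a hypothetical helper and not a Daft API:

```python
import math

def max_parallel(machine_resources: float, per_invocation: float) -> int:
    # How many invocations fit on one machine, given each one's request
    return math.floor(machine_resources / per_invocation)

two_cpu_fn = max_parallel(4, 2)     # cpus=2 on a 4-CPU machine -> 2 in parallel
one_gpu_fn = max_parallel(2, 1)     # gpus=1 on a 2-GPU machine -> 2 in parallel
half_cpu_fn = max_parallel(4, 0.5)  # fractional cpus=0.5 on 4 CPUs -> 8 in parallel
```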
### max_concurrency

`max_concurrency` controls the maximum number of concurrent invocations of an async `@daft.func` or `@daft.func.batch`. It does not apply to synchronous (non-async) UDFs; setting it on a sync function raises an error.
```python
@daft.func(max_concurrency=10)
async def fetch_url(url: str) -> str:
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            return await response.text()
```
### use_process

Runs the function in a subprocess instead of a thread on the main process. Use this when your function is not thread-safe or holds the GIL heavily. The default (`None`) lets Daft pick at runtime based on observed performance.
### max_retries and on_error

`max_retries` and `on_error` control what happens when a function invocation raises an exception.

`max_retries=N` retries failing invocations up to N times with exponential backoff starting at 100 ms, doubling each attempt, capped at 60 s, with ±25% jitter. If the raised exception is a `daft.ai.utils.RetryAfterError`, the specified retry-after delay is honored instead of the default backoff.

`on_error` decides what to do after retries are exhausted:
- `"raise"` (default): fail the query.
- `"log"`: log the exception and emit None for that invocation.
- `"ignore"`: silently emit None for that invocation.

## Unnesting struct results

When your function returns a struct (dictionary), you can use `unnest=True` to automatically expand the struct fields into separate columns:
```python
import daft

@daft.func(
    return_dtype=daft.DataType.struct({
        "first": daft.DataType.string(),
        "last": daft.DataType.string(),
        "age": daft.DataType.int64(),
    }),
    unnest=True,
)
def parse_person(full_name: str, age: int):
    parts = full_name.split()
    return {"first": parts[0], "last": parts[1], "age": age}

df = daft.from_pydict({
    "name": ["Alice Smith", "Bob Jones"],
    "age": [30, 25],
})

df = df.select(parse_person(df["name"], df["age"]))
df.show()
```
```
╭───────┬───────┬───────╮
│ first ┆ last  ┆ age   │
│ ---   ┆ ---   ┆ ---   │
│ Utf8  ┆ Utf8  ┆ Int64 │
╞═══════╪═══════╪═══════╡
│ Alice ┆ Smith ┆ 30    │
├╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌┤
│ Bob   ┆ Jones ┆ 25    │
╰───────┴───────┴───────╯
```
Without unnest=True, you would get a single column containing struct values.
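What `unnest=True` does can be pictured as turning a column of dictionaries into one column per key. An illustrative plain-Python sketch (not Daft internals):

```python
# A struct column as a list of dicts, as parse_person would produce it
struct_column = [
    {"first": "Alice", "last": "Smith", "age": 30},
    {"first": "Bob", "last": "Jones", "age": 25},
]

# Expand each struct field into its own column
columns = {key: [row[key] for row in struct_column] for key in struct_column[0]}
```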
You can combine generator functions with unnest=True to yield multiple structs that get expanded into columns:
```python
from typing import Iterator

@daft.func(
    return_dtype=daft.DataType.struct({
        "index": daft.DataType.int64(),
        "char": daft.DataType.string(),
    }),
    unnest=True,
)
def enumerate_chars(text: str) -> Iterator[dict]:
    for i, char in enumerate(text):
        yield {"index": i, "char": char}

df = daft.from_pydict({"word": ["hi", "bye"]})
df = df.select(enumerate_chars(df["word"]))
df.show()
```
```
╭───────┬──────╮
│ index ┆ char │
│ ---   ┆ ---  │
│ Int64 ┆ Utf8 │
╞═══════╪══════╡
│ 0     ┆ h    │
├╌╌╌╌╌╌╌┼╌╌╌╌╌╌┤
│ 1     ┆ i    │
├╌╌╌╌╌╌╌┼╌╌╌╌╌╌┤
│ 0     ┆ b    │
├╌╌╌╌╌╌╌┼╌╌╌╌╌╌┤
│ 1     ┆ y    │
├╌╌╌╌╌╌╌┼╌╌╌╌╌╌┤
│ 2     ┆ e    │
╰───────┴──────╯
```
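Combining the two features amounts to expanding rows first, then splitting struct fields into columns. A plain-Python sketch of the example above (illustrative only, not Daft's implementation):

```python
def enumerate_chars(text):
    for i, char in enumerate(text):
        yield {"index": i, "char": char}

# Expand: one output row per yielded struct, across all input words
rows = [struct for word in ["hi", "bye"] for struct in enumerate_chars(word)]

# Unnest: one column per struct field
index_col = [r["index"] for r in rows]
char_col = [r["char"] for r in rows]
```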