tutorials/talks_and_demos/pydata_global_2023.ipynb

python
CI = False
python
# Skip this notebook execution in CI because it hits non-public buckets
if CI:
    import sys

    sys.exit()
python
import daft

Reading Parquet Files

Read all data in Parquet files

python
df = daft.read_parquet(
    "s3://eventual-dev-benchmarking-fixtures/uncompressed-smaller-rg/tpch-dbgen/100_0/32/parquet/lineitem/"
)
python
%%time

df.collect()

Read one column only in Parquet files

python
df = daft.read_parquet(
    "s3://eventual-dev-benchmarking-fixtures/uncompressed-smaller-rg/tpch-dbgen/100_0/32/parquet/lineitem/"
)
df = df.select("L_ORDERKEY")
python
%%time

df.collect()

Read filtered data from Parquet files

python
df = daft.read_parquet(
    "s3://eventual-dev-benchmarking-fixtures/uncompressed-smaller-rg/tpch-dbgen/100_0/32/parquet/lineitem/"
)
df = df.where(df["L_ORDERKEY"] < 100)
python
%%time

df.collect()
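
The column selection and filter in the examples above are pushed down into the Parquet scan, so Daft only reads the columns and row groups it needs. A quick way to verify this is to inspect the query plan; below is a minimal sketch, assuming a recent Daft version where DataFrame.explain is available.

python
# Print the logical plans; the optimized plan should show the projection and
# the predicate pushed down into the Parquet scan.
df.explain(show_all=True)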

Reading many small files

Listing Files

Let's compare naive listing using the Python boto3 library against Daft's S3 listing capabilities.

Listing many small files (100k+) is a notoriously expensive operation in S3, but Daft provides an extremely efficient solution for the common case of hierarchical file namespaces.

python
%%time

import boto3

client = boto3.client("s3")
kwargs = {"Bucket": "daft-public-datasets", "Prefix": "tpch-lineitem/10k-1mb-csv-files"}
response = client.list_objects_v2(**kwargs)
data = response["Contents"]
token = response.get("NextContinuationToken")

while token is not None:
    kwargs["ContinuationToken"] = token
    response = client.list_objects_v2(**kwargs)
    data.extend(response["Contents"])
    token = response.get("NextContinuationToken")

print(f"Retrieved {len(data)} results.")
python
%%time

df = daft.from_glob_path("s3://daft-public-datasets/tpch-lineitem/10k-1mb-csv-files/**/*.csv")
df.collect()
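
The DataFrame returned by from_glob_path describes the listed files (in current Daft versions it has columns such as path and size), so the listing can be inspected or filtered like any other DataFrame. A small usage sketch:

python
# Peek at a few of the listed files; the "path" column holds the full S3 URL.
df.show(3)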

Reading 10K small 1MB CSV files

python
df = daft.read_csv("s3://daft-public-datasets/tpch-lineitem/10k-1mb-csv-files/**/*.csv")
python
%%time

df.collect()
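
If you are running this without AWS credentials, public buckets can typically be read anonymously by passing an IOConfig. This is a sketch under the assumption that your Daft version's daft.io.S3Config exposes an anonymous flag:

python
# Assumption: anonymous S3 access for public buckets via S3Config(anonymous=True).
ANON_IO_CONFIG = daft.io.IOConfig(s3=daft.io.S3Config(anonymous=True))
df = daft.read_csv(
    "s3://daft-public-datasets/tpch-lineitem/10k-1mb-csv-files/**/*.csv",
    io_config=ANON_IO_CONFIG,
)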

Downloading data from URLs

In many unstructured/complex data workloads, your table will often contain URLs pointing to external data. Daft is extremely fast at downloading this data, much faster than anything I've ever managed to build using boto3 and Python.

python
df = daft.from_glob_path("s3://daft-public-data/open-images/validation-images/**.jpg")

# Allow up to 64 concurrent S3 connections for the downloads
IO_CONFIG = daft.io.IOConfig(s3=daft.io.S3Config(max_connections=64))
# Download the bytes behind each URL into a new binary column
df = df.with_column("data", df["path"].download(io_config=IO_CONFIG))
python
%%time

df.collect()
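
For comparison, a hand-rolled downloader might look something like the sketch below. It is purely illustrative (the thread count and the download_one helper are assumptions, not part of the original notebook), and it is the kind of approach that Daft's download expression typically outperforms.

python
# Purely illustrative: a naive boto3 + thread pool downloader for the same files.
from concurrent.futures import ThreadPoolExecutor
from urllib.parse import urlparse

import boto3

s3 = boto3.client("s3")


def download_one(url: str) -> bytes:
    # Split "s3://bucket/key" into bucket and key, then fetch the object bytes.
    parsed = urlparse(url)
    return s3.get_object(Bucket=parsed.netloc, Key=parsed.path.lstrip("/"))["Body"].read()


paths = daft.from_glob_path("s3://daft-public-data/open-images/validation-images/**.jpg").to_pydict()["path"]
with ThreadPoolExecutor(max_workers=64) as pool:
    blobs = list(pool.map(download_one, paths))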

Running a full TPC-H Query

Let's see what this looks like when running a full TPC-H query (Query 1, the pricing summary report) on a larger Parquet dataset on S3.

python
import os

import daft

PARQUET_FOLDER = "s3://eventual-dev-benchmarking-fixtures/uncompressed-smaller-rg/tpch-dbgen/1000_0/512/parquet/"


def get_df(table_name: str) -> daft.DataFrame:
    return daft.read_parquet(os.path.join(PARQUET_FOLDER, table_name, "*.parquet"))
python
import datetime

from daft import col

lineitem = get_df("lineitem")

discounted_price = col("L_EXTENDEDPRICE") * (1 - col("L_DISCOUNT"))
taxed_discounted_price = discounted_price * (1 + col("L_TAX"))
df = (
    lineitem.where(col("L_SHIPDATE") <= datetime.date(1998, 9, 2))
    .groupby(col("L_RETURNFLAG"), col("L_LINESTATUS"))
    .agg(
        col("L_QUANTITY").alias("sum_qty").sum(),
        col("L_EXTENDEDPRICE").alias("sum_base_price").sum(),
        discounted_price.alias("sum_disc_price").sum(),
        taxed_discounted_price.alias("sum_charge").sum(),
        col("L_QUANTITY").alias("avg_qty").mean(),
        col("L_EXTENDEDPRICE").alias("avg_price").mean(),
        col("L_DISCOUNT").alias("avg_disc").mean(),
        col("L_QUANTITY").alias("count_order").count(),
    )
    .sort(["L_RETURNFLAG", "L_LINESTATUS"])
)
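
As in the earlier examples, the code above only defines the query; collecting it executes the full TPC-H Query 1 pipeline.

python
%%time

df.collect()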