tutorials/talks_and_demos/pydata_global_2023.ipynb
CI = False
# Skip this notebook execution in CI because it hits non-public buckets
if CI:
    import sys
    sys.exit()
import daft
df = daft.read_parquet(
    "s3://eventual-dev-benchmarking-fixtures/uncompressed-smaller-rg/tpch-dbgen/100_0/32/parquet/lineitem/"
)
%%time
df.collect()
df = daft.read_parquet(
    "s3://eventual-dev-benchmarking-fixtures/uncompressed-smaller-rg/tpch-dbgen/100_0/32/parquet/lineitem/"
)
# Selecting a single column lets Daft read only L_ORDERKEY from the Parquet files
df = df.select("L_ORDERKEY")
%%time
df.collect()
df = daft.read_parquet(
    "s3://eventual-dev-benchmarking-fixtures/uncompressed-smaller-rg/tpch-dbgen/100_0/32/parquet/lineitem/"
)
# The filter is pushed down into the scan, so Daft can skip non-matching data
df = df.where(df["L_ORDERKEY"] < 100)
%%time
df.collect()
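To verify that these projections and filters are actually pushed into the Parquet scan rather than applied after the fact, we can print the query plan. A quick sketch; assumes `explain(show_all=True)` (which also prints the optimized and physical plans) matches the Daft version in use.
df.explain(show_all=True)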
Let's compare naive listing using the Python boto3 library against Daft's S3 listing capabilities.
Listing many small files (100k+) is a notoriously expensive operation in S3, but Daft provides an extremely efficient solution for the common case of hierarchical file namespaces.
%%time
import boto3

client = boto3.client("s3")
kwargs = {"Bucket": "daft-public-datasets", "Prefix": "tpch-lineitem/10k-1mb-csv-files"}

# Each list_objects_v2 call returns at most 1,000 keys, so we must page
# through the results using the continuation token.
response = client.list_objects_v2(**kwargs)
data = response["Contents"]
token = response.get("NextContinuationToken")
while token is not None:
    kwargs["ContinuationToken"] = token
    response = client.list_objects_v2(**kwargs)
    data.extend(response["Contents"])
    token = response.get("NextContinuationToken")

print(f"Retrieved {len(data)} results.")
%%time
df = daft.from_glob_path("s3://daft-public-datasets/tpch-lineitem/10k-1mb-csv-files/**/*.csv")
df.collect()
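`from_glob_path` returns a DataFrame of file metadata, one row per matched file, with columns such as the path and size. A quick peek, assuming the listing above succeeded:
df.show(3)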
df = daft.read_csv("s3://daft-public-datasets/tpch-lineitem/10k-1mb-csv-files/**/*.csv")
%%time
df.collect()
In many unstructured/complex data workloads, you will often have URLs in your table pointing to external data. Daft is extremely fast at downloading this data, much faster than anything I've ever managed to build using boto3 and Python.
df = daft.from_glob_path("s3://daft-public-data/open-images/validation-images/**.jpg")
# Raise the S3 connection limit so downloads run with more parallelism
IO_CONFIG = daft.io.IOConfig(s3=daft.io.S3Config(max_connections=64))
df = df.with_column("data", df["path"].url.download(io_config=IO_CONFIG))
%%time
df.collect()
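If we wanted to work with the images themselves rather than raw bytes, we could decode them in the same pipeline. A minimal sketch, assuming the downloaded files are all valid images:
# Decode the downloaded bytes into images (assumes valid image files)
df = df.with_column("image", df["data"].image.decode())
df.show(3)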
Let's see what this profiling looks like when running a TPC-H query on scale factor 1000 Parquet data.
import os

import daft

PARQUET_FOLDER = "s3://eventual-dev-benchmarking-fixtures/uncompressed-smaller-rg/tpch-dbgen/1000_0/512/parquet/"

def get_df(table_name: str) -> daft.DataFrame:
    # Lazily read every Parquet file for the given TPC-H table
    return daft.read_parquet(os.path.join(PARQUET_FOLDER, table_name, "*.parquet"))
import datetime

from daft import col

lineitem = get_df("lineitem")

# TPC-H Q1: pricing summary report over lineitem
discounted_price = col("L_EXTENDEDPRICE") * (1 - col("L_DISCOUNT"))
taxed_discounted_price = discounted_price * (1 + col("L_TAX"))
df = (
    lineitem.where(col("L_SHIPDATE") <= datetime.date(1998, 9, 2))
    .groupby(col("L_RETURNFLAG"), col("L_LINESTATUS"))
    .agg(
        col("L_QUANTITY").alias("sum_qty").sum(),
        col("L_EXTENDEDPRICE").alias("sum_base_price").sum(),
        discounted_price.alias("sum_disc_price").sum(),
        taxed_discounted_price.alias("sum_charge").sum(),
        col("L_QUANTITY").alias("avg_qty").mean(),
        col("L_EXTENDEDPRICE").alias("avg_price").mean(),
        col("L_DISCOUNT").alias("avg_disc").mean(),
        col("L_QUANTITY").alias("count_order").count(),
    )
    .sort(["L_RETURNFLAG", "L_LINESTATUS"])
)
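As in the earlier examples, materializing the DataFrame executes the full query, so we can time it the same way:
%%time
df.collect()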