tutorials/talks_and_demos/iceberg_summit_2024.ipynb
CI = False
# Skip this notebook execution in CI because it hits non-public buckets
if CI:
    import sys
    sys.exit()
#### SETUP - run to set up the notebook
import pathlib
import shutil
shutil.rmtree("/tmp/warehouse", ignore_errors=True)
pathlib.Path("/tmp/warehouse").mkdir(parents=True, exist_ok=True)
!pip install 'pyiceberg[sql]'
!pip install 'daft[ray]' polars pandas
!pip install ray==2.20.0
!pip install sqlalchemy ipywidgets boto3 mypy_boto3_glue
Let's run through a simple end-to-end example of reading from and writing to Iceberg.
Catalogs are records of where to find Iceberg tables. PyIceberg supports connecting to many catalogs, such as AWS Glue, Iceberg REST, and more.
For the purposes of this tutorial, let's use SQLite as a local catalog so we can run everything on our own machine!
from pyiceberg.catalog.sql import SqlCatalog
warehouse_path = "/tmp/warehouse"
catalog = SqlCatalog(
"default",
**{
"uri": f"sqlite:///{warehouse_path}/pyiceberg_catalog.db",
"warehouse": f"file://{warehouse_path}",
},
)
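If you're connecting to a hosted catalog instead, PyIceberg's `load_catalog` helper accepts the same kind of properties. Here's a minimal sketch, assuming a REST catalog running at a hypothetical local endpoint:
from pyiceberg.catalog import load_catalog

rest_catalog = load_catalog(
    "default",
    **{
        "type": "rest",
        "uri": "http://localhost:8181",  # hypothetical REST catalog endpoint
    },
)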
Namespaces are collections of tables under a catalog.
catalog.create_namespace("my_namespace")
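As a quick sanity check, we can list the namespaces the catalog now tracks:
catalog.list_namespaces()  # should include ('my_namespace',)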
Tables can be created in a catalog. Let's go ahead and create a table in our catalog; it won't contain any data yet.
from pyiceberg.schema import Schema
from pyiceberg.types import IntegerType, NestedField, StringType
SCHEMA = Schema(
NestedField(1, "name", StringType(), required=False),
NestedField(2, "age", IntegerType(), required=False),
)
table = catalog.create_table("my_namespace.my_friends", schema=SCHEMA)
table
# Show the files that Iceberg created under /tmp/warehouse
!find /tmp/warehouse -type f
import daft
df = daft.from_pydict(
{
"name": ["jay", "sammy", "brian"],
"age": [30, 31, 32],
}
)
df
df.write_iceberg(table, mode="overwrite")
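Every write to an Iceberg table commits a new snapshot. As a quick sketch of how to inspect them with PyIceberg (refreshing first so our table object picks up the write):
table.refresh()
for entry in table.history():
    print(entry.snapshot_id, entry.timestamp_ms)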
PyIceberg tables can be read with a variety of engines. Let's take a look at a few.
# Pandas: eager materialization
table.scan().to_pandas()
# Polars: lazy materialization
import polars as pl
pl.scan_iceberg(table)
# Daft: lazy materialization
daft.read_iceberg(table) # equivalent to: `table.to_daft()`
# Daft: explain the query plan (shows how the read will be executed)
daft.read_iceberg(table).explain(True)
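PyIceberg can also materialize a scan directly into Arrow, which is handy for handing data to other Arrow-native tools:
# PyArrow: eager materialization into an Arrow table
table.scan().to_arrow()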
When working with larger tables, we will want to run our workload in the cloud!
from pyiceberg.catalog.glue import GlueCatalog
catalog = GlueCatalog("my_glue_catalog")
table = catalog.load_table("tpch_iceberg_sf1000.lineitem")
table
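Note that the Glue catalog picks up AWS credentials and region from your environment. You can also pass them explicitly as catalog properties; a sketch (property keys per recent PyIceberg docs, so double-check against your version):
catalog = GlueCatalog(
    "my_glue_catalog",
    **{
        "glue.region": "us-west-2",  # assumption: replace with your AWS region
    },
)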
df = daft.read_iceberg(table)
df.show()
import datetime
df = df.where(df["L_SHIPDATE"] < datetime.date(1996, 1, 1))
aggregated_df = df.groupby("L_SHIPDATE").agg([daft.col("L_EXTENDEDPRICE").sum()])
aggregated_df
aggregated_df.explain(True)
NOTE:
The plan produced above by Daft indicates that the query will scan a large amount of data (tens of gigabytes!) across a large number of files (about 280).
Running this on our current machine would not be a good idea: the data is fairly large and would take a long time to download over office WiFi. Let's try a better option!
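To see why, note that Iceberg keeps enough metadata for PyIceberg to plan a scan without touching any data files. A minimal sketch that counts the files the predicate we used above would have to read (expressed here as a row-filter string):
scan = table.scan(row_filter="L_SHIPDATE < '1996-01-01'")
print(len(list(scan.plan_files())))  # number of data files the scan would touch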
import ray
ray.init(address="ray://localhost:10001")
ray.cluster_resources()
import daft
daft.set_runner_ray(address="ray://localhost:10001")
import datetime
from pyiceberg.catalog.glue import GlueCatalog
catalog = GlueCatalog("my_glue_catalog")
table = catalog.load_table("tpch_iceberg_sf1000.lineitem")
df = daft.read_iceberg(table)
df = df.where(df["L_SHIPDATE"] < datetime.date(1996, 1, 1))
aggregated_df = df.groupby("L_SHIPDATE").agg([daft.col("L_EXTENDEDPRICE").sum()])
%%time
aggregated_df.sort("L_SHIPDATE").collect()
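Once the aggregation has run on the cluster, the (now small) result can be pulled back locally, e.g. as a pandas DataFrame for plotting:
result = aggregated_df.sort("L_SHIPDATE").to_pandas()
result.head()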