apps/docs/content/guides/storage/analytics/examples/pyiceberg.mdx
Expect rapid changes, limited features, and possible breaking updates. Share feedback as we refine the experience and expand access.

</Admonition>

PyIceberg is a Python client for Apache Iceberg that enables programmatic interaction with Iceberg tables. Use it to create, read, update, and delete data in your analytics buckets.
pip install pyiceberg pyarrow pandas
Here's a complete example showing how to connect to your Supabase analytics bucket and perform operations:
from pyiceberg.catalog import load_catalog
import pyarrow as pa
import datetime
# Supabase project settings - replace every placeholder with your own values
PROJECT_REF = "your-project-ref"
WAREHOUSE = "your-analytics-bucket-name"
SERVICE_KEY = "your-service-key"

# S3 access keys, found under Project Settings > Storage
S3_ACCESS_KEY = "your-access-key"
S3_SECRET_KEY = "your-secret-key"
S3_REGION = "us-east-1"

# Endpoints derived from the project ref
S3_ENDPOINT = f"https://{PROJECT_REF}.supabase.co/storage/v1/s3"
CATALOG_URI = f"https://{PROJECT_REF}.supabase.co/storage/v1/iceberg"

# Everything the catalog needs: the REST catalog connection settings
# followed by the S3 options used for file IO.
catalog_properties = {
    "type": "rest",
    "warehouse": WAREHOUSE,
    "uri": CATALOG_URI,
    "token": SERVICE_KEY,
    "py-io-impl": "pyiceberg.io.pyarrow.PyArrowFileIO",
    "s3.endpoint": S3_ENDPOINT,
    "s3.access-key-id": S3_ACCESS_KEY,
    "s3.secret-access-key": S3_SECRET_KEY,
    "s3.region": S3_REGION,
    "s3.force-virtual-addressing": False,
}

# Load the Iceberg REST catalog under the name "supabase-analytics"
catalog = load_catalog("supabase-analytics", **catalog_properties)
print("✓ Successfully connected to Iceberg catalog")
# Make sure the "analytics" namespace exists before putting tables in it
catalog.create_namespace_if_not_exists("analytics")

# Columns of the events table as (name, type) pairs
event_columns = [
    ("event_id", pa.int64()),
    ("user_id", pa.int64()),
    ("event_name", pa.string()),
    ("event_timestamp", pa.timestamp("ms")),
    ("properties", pa.string()),
]
schema = pa.schema(event_columns)

# Create analytics.events with that schema (skipped if it is already there)
table = catalog.create_table_if_not_exists(("analytics", "events"), schema=schema)
print("✓ Created table: analytics.events")
import datetime
# All sample events share a single capture timestamp
current_time = datetime.datetime.now()

# Sample rows as (event_id, user_id, event_name, properties-as-JSON-text)
sample_events = [
    (1, 101, "login", '{"browser":"chrome"}'),
    (2, 102, "view_product", '{"product_id":"123"}'),
    (3, 101, "logout", '{}'),
    (4, 103, "purchase", '{"amount":99.99}'),
    (5, 102, "login", '{"browser":"firefox"}'),
]

# Transpose the rows into one list per column
event_ids, user_ids, event_names, event_properties = (
    list(column) for column in zip(*sample_events)
)

data = pa.table({
    "event_id": event_ids,
    "user_id": user_ids,
    "event_name": event_names,
    "event_timestamp": [current_time] * len(sample_events),
    "properties": event_properties,
})

# Write the rows to the Iceberg table
table.append(data)
print("✓ Appended 5 rows to analytics.events")
# Scan the entire table into a pandas DataFrame (requires pandas)
scan_result = table.scan().to_pandas()
print(f"Total rows: {len(scan_result)}")
print(scan_result.head())

# Query with filters. Table.scan() takes the predicate as `row_filter`;
# it has no `filter` keyword, so passing `filter=` raises TypeError.
filtered = table.scan(
    row_filter="event_name = 'login'",
).to_pandas()
print(f"Login events: {len(filtered)}")

# Select specific columns so only those fields are returned
selected = table.scan(
    selected_fields=("user_id", "event_name", "event_timestamp"),
).to_pandas()
print(selected.head())
# List all namespaces
namespaces = catalog.list_namespaces()
print("Namespaces:", namespaces)

# List tables in a namespace
tables = catalog.list_tables("analytics")
print("Tables in analytics:", tables)

# Get table metadata
table_metadata = catalog.load_table(("analytics", "events"))
print("Schema:", table_metadata.schema())
# A PyIceberg Table has no partitions() method; spec() returns the table's
# current partition spec.
print("Partitions:", table_metadata.spec())
from pyiceberg.exceptions import NoSuchTableError

try:
    # Attempt to load a table that does not exist
    table = catalog.load_table(("analytics", "nonexistent"))
except NoSuchTableError as e:
    # Handle only the missing-table case; other failures (authentication,
    # network) propagate instead of being reported as a load error.
    print(f"Error loading table: {e}")
from pyiceberg.exceptions import NoSuchTableError

# Check if table exists before creating
namespace = "analytics"
table_name = "events"
try:
    existing_table = catalog.load_table((namespace, table_name))
except NoSuchTableError:
    # Create the table only when the catalog reports it as missing; any other
    # failure (authentication, network) is raised rather than mistaken for
    # "table does not exist".
    print("Table does not exist, creating...")
    table = catalog.create_table((namespace, table_name), schema=schema)
else:
    print("Table already exists")
from pyiceberg.catalog import load_catalog
import pyarrow as pa
import pandas as pd
# Setup (see Basic Setup section above); replace `...` with your catalog settings
catalog = load_catalog(...)

# Step 1: Create the warehouse namespace
catalog.create_namespace_if_not_exists("warehouse")

# Step 2: Define table schema
schema = pa.schema([
    pa.field("id", pa.int64()),
    pa.field("name", pa.string()),
    pa.field("created_at", pa.timestamp("ms")),
])

# Step 3: Create table
table = catalog.create_table_if_not_exists(
    ("warehouse", "products"),
    schema=schema,
)

# Step 4: Load data from CSV or database
# parse_dates makes created_at a datetime column instead of plain text.
df = pd.read_csv("products.csv", parse_dates=["created_at"])
# Convert with the table's schema so the Arrow column types match the table
# definition, and leave the DataFrame index out of the data.
data = pa.Table.from_pandas(df, schema=schema, preserve_index=False)

# Step 5: Write to analytics bucket
table.append(data)
print(f"✓ Loaded {len(data)} products to warehouse.products")

# Step 6: Verify
result = table.scan().to_pandas()
print(result.describe())