apps/docs/content/guides/storage/analytics/examples/apache-spark.mdx
Expect rapid changes, limited features, and possible breaking updates. Share feedback as we refine the experience and expand access.
</Admonition>

Apache Spark enables distributed analytical processing of large datasets stored in your analytics buckets. Use it for complex transformations, aggregations, and machine learning workflows.
First, ensure you have Spark installed. For Python-based workflows:
```bash
pip install pyspark
```
For detailed Spark setup instructions, see the [Apache Spark documentation](https://spark.apache.org/docs/latest/).
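To confirm the installation, you can print the PySpark version from Python. This is an optional sanity check, not part of the Supabase setup; the Iceberg runtime used below targets Spark 3.5 and Scala 2.12, so a matching PySpark 3.5.x release is assumed.

```python
# Optional: verify PySpark is importable and which version is installed
import pyspark

print(pyspark.__version__)  # a 3.5.x release matches the iceberg-spark-runtime-3.5_2.12 package used below
```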
Here's a complete example showing how to configure Spark with your Supabase analytics bucket:
```python
from pyspark.sql import SparkSession

# Configuration - Update with your Supabase credentials
PROJECT_REF = "your-project-ref"
WAREHOUSE = "your-analytics-bucket-name"
SERVICE_KEY = "your-service-key"

# S3 credentials from Project Settings > Storage
S3_ACCESS_KEY = "your-access-key"
S3_SECRET_KEY = "your-secret-key"
S3_REGION = "us-east-1"

# Construct Supabase endpoints
S3_ENDPOINT = f"https://{PROJECT_REF}.supabase.co/storage/v1/s3"
CATALOG_URI = f"https://{PROJECT_REF}.supabase.co/storage/v1/iceberg"

# Initialize Spark session with Iceberg configuration
spark = SparkSession.builder \
    .master("local[*]") \
    .appName("SupabaseIceberg") \
    .config("spark.driver.host", "127.0.0.1") \
    .config("spark.driver.bindAddress", "127.0.0.1") \
    .config(
        "spark.jars.packages",
        "org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.6.1,org.apache.iceberg:iceberg-aws-bundle:1.6.1"
    ) \
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
    .config("spark.sql.catalog.supabase", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.supabase.type", "rest") \
    .config("spark.sql.catalog.supabase.uri", CATALOG_URI) \
    .config("spark.sql.catalog.supabase.warehouse", WAREHOUSE) \
    .config("spark.sql.catalog.supabase.token", SERVICE_KEY) \
    .config("spark.sql.catalog.supabase.s3.endpoint", S3_ENDPOINT) \
    .config("spark.sql.catalog.supabase.s3.path-style-access", "true") \
    .config("spark.sql.catalog.supabase.s3.access-key-id", S3_ACCESS_KEY) \
    .config("spark.sql.catalog.supabase.s3.secret-access-key", S3_SECRET_KEY) \
    .config("spark.sql.catalog.supabase.s3.remote-signing-enabled", "false") \
    .config("spark.sql.defaultCatalog", "supabase") \
    .getOrCreate()

print("✓ Spark session initialized with Iceberg")
```
```python
# Create a namespace for organization
spark.sql("CREATE NAMESPACE IF NOT EXISTS analytics")

# Create a new Iceberg table
spark.sql("""
    CREATE TABLE IF NOT EXISTS analytics.events (
        event_id BIGINT,
        user_id BIGINT,
        event_name STRING,
        event_timestamp TIMESTAMP,
        properties STRING
    )
    USING iceberg
""")

print("✓ Created table: analytics.events")
```
```python
# Insert data into the table
spark.sql("""
    INSERT INTO analytics.events (event_id, user_id, event_name, event_timestamp, properties)
    VALUES
        (1, 101, 'login', TIMESTAMP '2024-01-15 10:30:00', '{"browser":"chrome"}'),
        (2, 102, 'view_product', TIMESTAMP '2024-01-15 10:35:00', '{"product_id":"123"}'),
        (3, 101, 'logout', TIMESTAMP '2024-01-15 10:40:00', '{}'),
        (4, 103, 'purchase', TIMESTAMP '2024-01-15 10:45:00', '{"amount":99.99}')
""")

print("✓ Inserted 4 rows into analytics.events")
```
```python
# Read the entire table
result_df = spark.sql("SELECT * FROM analytics.events")
result_df.show(truncate=False)

# Apply filters
filtered_df = spark.sql("""
    SELECT event_id, user_id, event_name
    FROM analytics.events
    WHERE event_name = 'login'
""")
filtered_df.show()

# Aggregations
summary_df = spark.sql("""
    SELECT
        event_name,
        COUNT(*) as event_count,
        COUNT(DISTINCT user_id) as unique_users
    FROM analytics.events
    GROUP BY event_name
    ORDER BY event_count DESC
""")
summary_df.show()
```
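Because the table is Iceberg-backed, its metadata tables are queryable too. For example, to see the commit history (a sketch using Iceberg's `<table>.snapshots` metadata table):

```python
# Each row is one snapshot/commit made to the table
spark.sql("""
    SELECT snapshot_id, committed_at, operation
    FROM analytics.events.snapshots
""").show(truncate=False)
```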
```python
# Read as DataFrame
events_df = spark.read.format("iceberg").load("analytics.events")

# Apply Spark transformations
from pyspark.sql.functions import count, col, year, month

# Monthly event counts
monthly_events = events_df \
    .withColumn("month", month(col("event_timestamp"))) \
    .withColumn("year", year(col("event_timestamp"))) \
    .groupBy("year", "month", "event_name") \
    .agg(count("event_id").alias("count")) \
    .orderBy("year", "month")

monthly_events.show()
```
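To persist a derived DataFrame back to the catalog, the v2 writer API can create an Iceberg table directly; the target table name below is illustrative:

```python
# Save the aggregated DataFrame as a new Iceberg table in the same catalog
monthly_events.writeTo("analytics.monthly_event_counts").createOrReplace()
```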
```python
# Create another table
spark.sql("""
    CREATE TABLE IF NOT EXISTS analytics.users (
        user_id BIGINT,
        username STRING,
        email STRING
    )
    USING iceberg
""")

spark.sql("""
    INSERT INTO analytics.users VALUES
        (101, 'alice', '[email protected]'),
        (102, 'bob', '[email protected]'),
        (103, 'charlie', '[email protected]')
""")

# Join events with users
joined_df = spark.sql("""
    SELECT
        e.event_id,
        e.event_name,
        u.username,
        u.email,
        e.event_timestamp
    FROM analytics.events e
    JOIN analytics.users u ON e.user_id = u.user_id
    ORDER BY e.event_timestamp
""")
joined_df.show(truncate=False)
```
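The same join can be expressed with the DataFrame API if you prefer working with DataFrames end to end (equivalent to the SQL above, reusing `events_df` from earlier):

```python
# Read the users table and join it to the events DataFrame
users_df = spark.read.format("iceberg").load("analytics.users")

joined_api_df = (
    events_df.join(users_df, on="user_id", how="inner")
    .select("event_id", "event_name", "username", "email", "event_timestamp")
    .orderBy("event_timestamp")
)
joined_api_df.show(truncate=False)
```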
```python
# Export to Parquet
spark.sql("""
    SELECT event_name, COUNT(*) as count
    FROM analytics.events
    GROUP BY event_name
""").write \
    .mode("overwrite") \
    .parquet("/tmp/event_summary.parquet")

# Export to CSV
spark.sql("""
    SELECT *
    FROM analytics.events
    WHERE event_timestamp > TIMESTAMP '2024-01-15 10:30:00'
""").write \
    .mode("overwrite") \
    .option("header", "true") \
    .csv("/tmp/recent_events.csv")

print("✓ Results exported")
```
For tables accessed multiple times in a session, you can cache them with `spark.catalog.cacheTable()` to avoid re-reading from object storage, as shown in the sketch below.
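A minimal sketch, assuming the `analytics.events` table created above:

```python
# Keep the table in memory across repeated queries, then release it when done
spark.catalog.cacheTable("analytics.events")
spark.sql("SELECT COUNT(*) AS total FROM analytics.events").show()
spark.catalog.uncacheTable("analytics.events")
```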
Putting it all together, the following pipeline reads the raw events, aggregates them by month, and saves the result back to the catalog as a new Iceberg table:

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, year, month, count

# Setup (reuse the full Iceberg and Supabase catalog configuration from the setup example above)
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("SupabaseAnalytics")
    # ... add the catalog config shown in the setup example ...
    .config("spark.sql.defaultCatalog", "supabase")
    .getOrCreate()
)

# Step 1: Read raw events
raw_events = spark.sql("SELECT * FROM analytics.events")

# Step 2: Transform and aggregate
monthly_summary = raw_events \
    .withColumn("month", month(col("event_timestamp"))) \
    .withColumn("year", year(col("event_timestamp"))) \
    .groupBy("year", "month", "event_name") \
    .agg(count("event_id").alias("total_events"))

# Step 3: Save results as an Iceberg table in the catalog
monthly_summary.write \
    .mode("overwrite") \
    .saveAsTable("analytics.monthly_summary")

print("✓ Pipeline completed")
monthly_summary.show()
```
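When the pipeline finishes, it is good practice to stop the session and release local resources (not shown in the original example):

```python
# Shut down the Spark session cleanly
spark.stop()
```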