hudi-notebooks/notebooks/06_hudi_trino_example.ipynb
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
This guide demonstrates a cross-engine lakehouse workflow: writing an Apache Hudi table with Apache Spark, syncing it to the Hive Metastore, and querying it with Trino for fast, interactive analytics.
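Trino discovers the table through its Hudi connector, which reads table metadata from the same Hive Metastore that Spark syncs to below. For context, a minimal sketch of the catalog file behind the `hudi` catalog used later in this notebook (illustrative; storage settings for the MinIO/S3 warehouse are environment-specific and omitted):

# etc/catalog/hudi.properties
connector.name=hudi
hive.metastore.uri=thrift://hive-metastore:9083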
import pandas as pd
import trino.dbapi
db_name = "trino_db"
table_name = "hudi_trips_table"
# Build the table's storage location as an S3 URI (plain string formatting is
# safer here than os.path.join, which is meant for local filesystem paths).
s3_base_path = "s3a://warehouse"
base_path = f"{s3_base_path}/{db_name}/{table_name}"
%run utils.py
spark = get_spark_session(app_name="Hudi Trino Example")
columns = ["ts", "uuid", "rider", "driver", "fare", "city"]
data = [
("2025-08-10 08:15:30", "uuid-001", "rider-A", "driver-X", 18.50, "new_york"),
("2025-08-10 09:22:10", "uuid-002", "rider-B", "driver-Y", 22.75, "san_francisco"),
("2025-08-10 10:05:45", "uuid-003", "rider-C", "driver-Z", 14.60, "chicago"),
("2025-08-10 11:40:00", "uuid-004", "rider-D", "driver-W", 31.90, "new_york"),
("2025-08-10 12:55:15", "uuid-005", "rider-E", "driver-V", 25.10, "san_francisco"),
("2025-08-10 13:20:35", "uuid-006", "rider-F", "driver-U", 19.80, "chicago"),
("2025-08-10 14:10:05", "uuid-007", "rider-G", "driver-T", 28.45, "san_francisco"),
("2025-08-10 15:00:20", "uuid-008", "rider-H", "driver-S", 16.25, "new_york"),
("2025-08-10 15:45:50", "uuid-009", "rider-I", "driver-R", 24.35, "chicago"),
("2025-08-10 16:30:00", "uuid-010", "rider-J", "driver-Q", 20.00, "new_york")
]
input_df = spark.createDataFrame(data).toDF(*columns)
display(input_df, 5)
hudi_write_options = {
    # Table identity and key layout
    "hoodie.table.name": table_name,
    "hoodie.datasource.write.recordkey.field": "uuid",      # unique record key
    "hoodie.datasource.write.precombine.field": "ts",       # latest ts wins on key collisions
    "hoodie.datasource.write.partitionpath.field": "city",  # one partition per city
    "hoodie.metadata.enable": "true",
    "hoodie.datasource.write.hive_style_partitioning": "true",  # city=<value> directories
    # Sync the table to the Hive Metastore so Trino can discover it
    "hoodie.datasource.meta.sync.enable": "true",
    "hoodie.datasource.hive_sync.partition_fields": "city",
    "hoodie.datasource.hive_sync.mode": "hms",
    "hoodie.datasource.hive_sync.metastore.uris": "thrift://hive-metastore:9083",
    "hoodie.datasource.hive_sync.database": db_name,
    "hoodie.datasource.hive_sync.table": table_name
}
# Write the data to Hudi
input_df.write.format("hudi") \
    .options(**hudi_write_options) \
    .mode("overwrite") \
    .save(base_path)
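As a quick sanity check (a minimal sketch), the meta sync configured above should have registered the database and table in the Hive Metastore, making them visible to Spark SQL as well:

# Verify that hive sync registered the table (Hudi auto-creates the database by default)
spark.sql(f"SHOW TABLES IN {db_name}").show()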
# Read the table back through its metastore-synced name via Spark SQL
df = spark.sql(f"SELECT * FROM {db_name}.{table_name}")
display(df, 5)
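The same data can also be read directly from storage with the Hudi datasource, bypassing the metastore entirely; a minimal sketch:

# Snapshot query straight from the table's base path
snapshot_df = spark.read.format("hudi").load(base_path)
snapshot_df.select("uuid", "rider", "fare", "city").show(5)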
# Establish Trino Connection
TRINO_HOST = 'trino'
TRINO_PORT = 8080
TRINO_CATALOG = 'hudi'
TRINO_SCHEMA = db_name
conn = trino.dbapi.connect(
    host=TRINO_HOST,
    port=TRINO_PORT,
    user='trino',
    catalog=TRINO_CATALOG,
    schema=TRINO_SCHEMA
)
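Before querying the table itself, it can help to confirm that the connection works and that the Hive-synced schema is visible through the catalog; a minimal sketch:

# List schemas in the hudi catalog; trino_db should appear after the sync
check_cur = conn.cursor()
check_cur.execute("SHOW SCHEMAS")
print(check_cur.fetchall())
check_cur.close()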
# Query the data from Trino. Create the cursor before the try block so the
# cleanup in finally cannot raise a NameError if execution fails early.
cur = conn.cursor()
try:
    query = f"SELECT * FROM {table_name} LIMIT 100"
    cur.execute(query)
    rows = cur.fetchall()
    colnames = [desc[0] for desc in cur.description]
    pandas_df = pd.DataFrame(rows, columns=colnames)
finally:
    cur.close()
# Convert the Trino result back to a Spark DataFrame so the shared display helper can render it
df = spark.createDataFrame(pandas_df)
display(df, 5)
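Because the table is keyed on `uuid` and deduplicated on `ts`, appending a row with an existing key upserts it, and re-running the Trino query above would return the updated fare. A minimal sketch with a hypothetical fare correction:

# Hypothetical update: a new fare for uuid-001 with a later ts (Hudi's default
# write operation on append is upsert, keyed by the recordkey field)
update_df = spark.createDataFrame(
    [("2025-08-10 17:00:00", "uuid-001", "rider-A", "driver-X", 21.00, "new_york")]
).toDF(*columns)
update_df.write.format("hudi") \
    .options(**hudi_write_options) \
    .mode("append") \
    .save(base_path)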
# Close the Trino Connection
conn.close()
# Stop the Spark Session
stop_spark_session()