python
#  Licensed to the Apache Software Foundation (ASF) under one
#  or more contributor license agreements.  See the NOTICE file
#  distributed with this work for additional information
#  regarding copyright ownership.  The ASF licenses this file
#  to you under the Apache License, Version 2.0 (the
#  "License"); you may not use this file except in compliance
#  with the License.  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.

Querying Hudi Tables using Trino: A Step-by-Step Guide

This guide demonstrates a cross-engine workflow: writing an Apache Hudi table with Spark, syncing it to the Hive Metastore, and querying it with Trino for fast, interactive analytics.

1. Library Imports

python
import os
import trino.dbapi
import pandas as pd

2. Global Configuration

python
db_name = "trino_db"
table_name = "hudi_trips_table"
s3_base_path = "s3a://warehouse/"
base_path = os.path.join(s3_base_path, db_name, table_name)
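With these values, base_path resolves to s3a://warehouse/trino_db/hudi_trips_table (os.path.join tolerates the trailing slash on s3_base_path):

python
print(base_path)  # s3a://warehouse/trino_db/hudi_trips_table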

3. Spark Session Initialization

python
%run utils.py
python
# get_spark_session, display, and stop_spark_session come from utils.py
spark = get_spark_session(app_name="Hudi Trino Example")
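get_spark_session is defined in utils.py, so its exact options aren't shown here. As a rough sketch of what such a helper typically sets up for Hudi (the option values below are assumptions, not the notebook's actual configuration):

python
# Hypothetical sketch of a Hudi-ready session builder; the real
# implementation lives in utils.py and may differ.
from pyspark.sql import SparkSession

def get_spark_session_sketch(app_name):
    return (
        SparkSession.builder
        .appName(app_name)
        # Hudi's docs recommend Kryo serialization and the Hudi SQL extension
        .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        .config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension")
        .getOrCreate()
    )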

4. Sample Data Generation

python
columns = ["ts", "uuid", "rider", "driver", "fare", "city"]

data = [
    ("2025-08-10 08:15:30", "uuid-001", "rider-A", "driver-X", 18.50, "new_york"),
    ("2025-08-10 09:22:10", "uuid-002", "rider-B", "driver-Y", 22.75, "san_francisco"),
    ("2025-08-10 10:05:45", "uuid-003", "rider-C", "driver-Z", 14.60, "chicago"),
    ("2025-08-10 11:40:00", "uuid-004", "rider-D", "driver-W", 31.90, "new_york"),
    ("2025-08-10 12:55:15", "uuid-005", "rider-E", "driver-V", 25.10, "san_francisco"),
    ("2025-08-10 13:20:35", "uuid-006", "rider-F", "driver-U", 19.80, "chicago"),
    ("2025-08-10 14:10:05", "uuid-007", "rider-G", "driver-T", 28.45, "san_francisco"),
    ("2025-08-10 15:00:20", "uuid-008", "rider-H", "driver-S", 16.25, "new_york"),
    ("2025-08-10 15:45:50", "uuid-009", "rider-I", "driver-R", 24.35, "chicago"),
    ("2025-08-10 16:30:00", "uuid-010", "rider-J", "driver-Q", 20.00, "new_york")
]
python
input_df = spark.createDataFrame(data).toDF(*columns)
display(input_df, 5)  # display is a notebook helper loaded from utils.py

5. Hudi Write Configuration & Ingestion

python
hudi_write_options = {
    # Table identity, record key, precombine (dedup) field, and partitioning
    "hoodie.table.name": table_name,
    "hoodie.datasource.write.recordkey.field": "uuid",
    "hoodie.datasource.write.precombine.field": "ts",
    "hoodie.datasource.write.partitionpath.field": "city",
    "hoodie.metadata.enable": "true",
    "hoodie.datasource.write.hive_style_partitioning": "true",
    # Sync table metadata to the Hive Metastore so Trino can discover it
    "hoodie.datasource.meta.sync.enable": "true",
    "hoodie.datasource.hive_sync.partition_fields": "city",
    "hoodie.datasource.hive_sync.mode": "hms",
    "hoodie.datasource.hive_sync.metastore.uris": "thrift://hive-metastore:9083",
    "hoodie.datasource.hive_sync.database": db_name,
    "hoodie.datasource.hive_sync.table": table_name
}
python
# Write the data to Hudi
input_df.write.format("hudi") \
    .options(**hudi_write_options) \
    .mode("overwrite") \
    .save(base_path)
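Because the table declares uuid as the record key and ts as the precombine field, re-writing a key updates the row instead of duplicating it. A minimal sketch of such an upsert (the updated fare value is illustrative):

python
# Hypothetical upsert: same record key as uuid-001, newer ts, new fare.
# With mode("append") and the upsert operation, Hudi replaces the old row.
updates = [("2025-08-10 17:00:00", "uuid-001", "rider-A", "driver-X", 21.00, "new_york")]
update_df = spark.createDataFrame(updates).toDF(*columns)

update_df.write.format("hudi") \
    .options(**hudi_write_options) \
    .option("hoodie.datasource.write.operation", "upsert") \
    .mode("append") \
    .save(base_path)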
python
# Hive sync registered the table, so Spark SQL can query it by name
df = spark.sql(f"SELECT * FROM {db_name}.{table_name}")
display(df, 5)
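Hive sync makes the table queryable by name; the same data can also be read straight from storage, which is a quick way to verify the write independently of the metastore:

python
# Path-based read: loads the Hudi table directly from base_path,
# including Hudi's _hoodie_* metadata columns.
path_df = spark.read.format("hudi").load(base_path)
path_df.select("uuid", "rider", "fare", "city").show(5)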

6. Querying with Trino

python
# Establish the Trino connection
TRINO_HOST = 'trino'
TRINO_PORT = 8080
TRINO_CATALOG = 'hudi'
TRINO_SCHEMA = db_name

conn = trino.dbapi.connect(
    host=TRINO_HOST,
    port=TRINO_PORT,
    user='trino',
    catalog=TRINO_CATALOG,
    schema=TRINO_SCHEMA
)
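Before running the main query, a quick check confirms that Hive sync registered the table in the schema this connection points at (an optional verification step, not part of the original flow):

python
# Sanity check: list the tables Trino sees in the synced schema
cur = conn.cursor()
cur.execute("SHOW TABLES")
print(cur.fetchall())  # hudi_trips_table should appear here
cur.close()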
python
# Query the Data from Trino
# Create the cursor outside try so the finally block can always close it
query = f"SELECT * FROM {table_name} LIMIT 100"
cur = conn.cursor()
try:
    cur.execute(query)

    rows = cur.fetchall()
    colnames = [desc[0] for desc in cur.description]
    pandas_df = pd.DataFrame(rows, columns=colnames)
finally:
    cur.close()

df = spark.createDataFrame(pandas_df)
display(df, 5)
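Since city is both the Hudi partition path and a Hive-style partition column, grouped aggregates are a natural interactive query to push to Trino. A hypothetical follow-up on the same connection:

python
# Average fare per city, computed by Trino
cur = conn.cursor()
cur.execute(f"SELECT city, avg(fare) AS avg_fare FROM {table_name} GROUP BY city")
for city, avg_fare in cur.fetchall():
    print(city, round(avg_fare, 2))
cur.close()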
python
# Close the Trino Connection
conn.close()

7. Cleanup

python
# Stop the Spark Session
stop_spark_session()