hudi-notebooks/notebooks/05-mastering-sql-procedures.ipynb

python
#  Licensed to the Apache Software Foundation (ASF) under one
#  or more contributor license agreements.  See the NOTICE file
#  distributed with this work for additional information
#  regarding copyright ownership.  The ASF licenses this file
#  to you under the Apache License, Version 2.0 (the
#  "License"); you may not use this file except in compliance
#  with the License.  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
# limitations under the License.

A Hands-on Guide to Hudi SQL Procedures

This notebook is a comprehensive guide to using Hudi's powerful SQL procedures directly from Spark SQL. These procedures, invoked using the CALL keyword, allow you to perform advanced table maintenance, auditing, and data management tasks using familiar SQL commands.
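Hudi procedures accept both positional and named arguments in the `CALL` statement. As a quick reference, here is a small helper (our own, not part of Hudi) that builds a `CALL` statement using the named-argument syntax used throughout this notebook:

```python
# A small helper (not part of Hudi) that builds a CALL statement
# using the named-argument syntax Hudi procedures accept.
def call_procedure(name, **kwargs):
    """Build a Spark SQL CALL statement with named arguments."""
    args = ", ".join(f"{key} => '{value}'" for key, value in kwargs.items())
    return f"CALL {name}({args})"

# Hudi accepts both positional and named arguments:
#   CALL show_commits('trips_cow_hudi_sql')           -- positional
#   CALL show_commits(table => 'trips_cow_hudi_sql')  -- named
stmt = call_procedure("show_commits", table="trips_cow_hudi_sql")
print(stmt)  # CALL show_commits(table => 'trips_cow_hudi_sql')
```

The resulting string can be passed to `spark.sql(...)` once a Hudi-enabled session is available.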

python
%run utils.py
python
spark = get_spark_session("Hudi SQL Procedures")

First, let's set up a sample Hudi table we'll use throughout this guide.

python
spark.sql(f"DROP TABLE IF EXISTS trips_cow_hudi_sql")

spark.sql(f"""CREATE TABLE trips_cow_hudi_sql (
    ts STRING,
    uuid STRING,
    rider STRING,
    driver STRING,
    fare DOUBLE,
    city STRING
) USING hudi
TBLPROPERTIES (
    'hoodie.table.type' = 'COPY_ON_WRITE',
    'hoodie.datasource.write.recordkey.field' = 'uuid',
    'hoodie.datasource.write.precombine.field' = 'ts'
)
LOCATION 's3a://warehouse/hudi-sql-procedure/trips_cow_hudi_sql'
""")
python
display(spark.sql(f"""DESCRIBE EXTENDED trips_cow_hudi_sql"""))

Next, we'll insert some sample data into our new table using a standard SQL INSERT INTO statement.

python
spark.sql(f"""
  INSERT INTO trips_cow_hudi_sql VALUES
    ('2025-08-10 08:15:30', 'uuid-001', 'rider-A', 'driver-X', 18.50, 'new_york'),
    ('2025-08-10 09:22:10', 'uuid-002', 'rider-B', 'driver-Y', 22.75, 'san_francisco'),
    ('2025-08-10 10:05:45', 'uuid-003', 'rider-C', 'driver-Z', 14.60, 'chicago'),
    ('2025-08-10 11:40:00', 'uuid-004', 'rider-D', 'driver-W', 31.90, 'new_york'),
    ('2025-08-10 12:55:15', 'uuid-005', 'rider-E', 'driver-V', 25.10, 'san_francisco');
""")
python
display(spark.sql(f"""SELECT * from trips_cow_hudi_sql"""))

Types of SQL Procedures in Apache Hudi

Apache Hudi provides a comprehensive set of SQL procedures categorized by their functionality.

Hudi SQL Procedures: A Detailed Look

Hudi's SQL procedures, invoked using the CALL keyword, are a powerful way to interact with a table's metadata and services. Let's explore some of the most common ones.

Help Procedure

Use the help procedure to inspect any stored procedure's parameters and usage.

python
spark.sql(f"""CALL help(cmd => 'show_commits')""").show(truncate=False)

1. Commit Management

show_commits

This procedure shows the history of all completed transactions on the table. It's a great way to audit changes and understand the evolution of your data.

python
display(spark.sql(f"""CALL show_commits('trips_cow_hudi_sql')"""))

show_commits_metadata

This procedure provides a detailed breakdown of a specific commit. It shows the commit time, the number of files and records affected, and other key metrics. This is invaluable for understanding the impact of a particular write operation.

python
display(spark.sql(f"""CALL show_commits_metadata(table => 'trips_cow_hudi_sql', limit => 3)"""))

show_commit_extra_metadata

This procedure shows extra metadata about a specific commit.

python
# First, let's get a list of all commit times from the table.
all_commits = [row[0] for row in spark.sql(f"CALL show_commits('trips_cow_hudi_sql')").collect()]

# We'll use the latest commit time for this example (show_commits lists the newest commit first).
latest_commit_time = all_commits[0]

# Now, we'll call the procedure to get the metadata for that commit.
display(spark.sql(f"""CALL show_commit_extra_metadata(table => 'trips_cow_hudi_sql', instant_time => '{latest_commit_time}')"""))

Let's add some more records into the table to get another commit.

python
spark.sql(f"""
  INSERT INTO trips_cow_hudi_sql VALUES
    ('2025-08-11 08:15:30', 'uuid-006', 'rider-A', 'driver-V', 38.75, 'new_york'),
    ('2025-08-11 12:55:15', 'uuid-007', 'rider-E', 'driver-X', 85.10, 'san_francisco');
""")

Now, check the commits again on the table.

python
display(spark.sql(f"""CALL show_commits('trips_cow_hudi_sql')"""))

From the above output, we can clearly see that we now have two commits.

show_commit_files

This procedure shows the list of files that were part of a specific commit. It's useful for debugging and understanding the physical changes on the file system.

python
# First, let's get a list of all commit times from the table.
all_commits = [row[0] for row in spark.sql("CALL show_commits('trips_cow_hudi_sql')").collect()]

# We'll use the latest commit time for this example.
latest_commit_time = all_commits[0]
python
# We'll use the latest commit time from our previous step.
display(spark.sql(f"""CALL show_commit_files(table => 'trips_cow_hudi_sql', instant_time => '{latest_commit_time}')"""))

commits_compare

This procedure compares the commit timeline of the table with the timeline at another table path, showing how far each is ahead of or behind the other.

python
display(spark.sql(f"""CALL commits_compare(table => 'trips_cow_hudi_sql', path => 's3a://warehouse/hudi-sql-procedure/trips_cow_hudi_sql/')"""))

Savepoints and Rollbacks

Savepoints are a way to create a stable, named checkpoint on your Hudi timeline. They protect your data from being cleaned or archived, allowing you to roll back to a known-good state if something goes wrong.
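The savepoint workflow demonstrated step by step in the next sections boils down to three procedure calls. As a sketch for reference (the instant time below is a placeholder; substitute a real commit time obtained from show_commits):

```python
# Savepoint lifecycle sketch. The instant time is a placeholder;
# substitute a real commit time obtained from show_commits.
placeholder_instant = "20250810120000000"

create_stmt = f"CALL create_savepoint(table => 'trips_cow_hudi_sql', commit_time => '{placeholder_instant}')"
rollback_stmt = f"CALL rollback_to_savepoint(table => 'trips_cow_hudi_sql', instant_time => '{placeholder_instant}')"
delete_stmt = f"CALL delete_savepoint(table => 'trips_cow_hudi_sql', instant_time => '{placeholder_instant}')"

# Run each against the active session when needed, e.g.:
# display(spark.sql(create_stmt))
```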

create_savepoint

This procedure creates a named savepoint at the current state of the table. You can add a comment to describe the reason for the savepoint.

python
# We'll get the latest commit time to create a savepoint at that instant.
latest_commit_time = [row[0] for row in spark.sql("CALL show_commits('trips_cow_hudi_sql')").collect()][0]
python
display(spark.sql(f"""CALL create_savepoint(table => 'trips_cow_hudi_sql', commit_time => '{latest_commit_time}')"""))

show_savepoints

This procedure lists all the savepoints that have been created on the table.

python
display(spark.sql(f"""CALL show_savepoints(table => 'trips_cow_hudi_sql')"""))

Now, let's query the table to see the records it currently holds.

python
display(spark.sql(f"SELECT * FROM trips_cow_hudi_sql"))

rollback_to_savepoint

This procedure allows you to roll back the table to a previously created savepoint. This is useful for recovering from bad data writes or other issues.

To demonstrate, we'll first make an update and then roll back. Here we are changing the fare amount to 25 for uuid-002.

python
spark.sql(f"UPDATE trips_cow_hudi_sql SET fare = 25.0 WHERE uuid = 'uuid-002'")

Let's verify the change. We can see that the fare amount has been updated to 25.

python
display(spark.sql(f"SELECT * FROM trips_cow_hudi_sql where uuid = 'uuid-002'"))

Now, we'll roll back to the savepoint we just created.

python
display(spark.sql(f"CALL rollback_to_savepoint(table => 'trips_cow_hudi_sql', instant_time => '{latest_commit_time}')"))

Let's check the table. The update should be gone.

python
display(spark.sql(f"SELECT * FROM trips_cow_hudi_sql where uuid = 'uuid-002'"))

delete_savepoint

After a savepoint is no longer needed, you can delete it to allow Hudi to clean and archive the associated data.

python
display(spark.sql(f"""
    CALL delete_savepoint(
        table => 'trips_cow_hudi_sql',
        instant_time => '{latest_commit_time}'
    )
"""))
python
display(spark.sql(f"""CALL show_savepoints(table => 'trips_cow_hudi_sql')"""))

rollback_to_instant

This procedure is a direct way to roll back a specific commit without needing a named savepoint. It's useful for quickly undoing the last transaction.

First, let's make a simple update. Let's update the fare amount to 100 where uuid is uuid-001

python
spark.sql(f"UPDATE trips_cow_hudi_sql SET fare = 100.00 WHERE uuid = 'uuid-001'")
python
# verify the change
display(spark.sql("SELECT uuid, fare FROM trips_cow_hudi_sql WHERE uuid = 'uuid-001'"))
python
# Get the commit time of the update we just made (the latest commit).
last_commit_time = [row[0] for row in spark.sql("CALL show_commits('trips_cow_hudi_sql')").collect()][0]
print(last_commit_time)
python
display(spark.sql(f"CALL show_commits('trips_cow_hudi_sql')"))

Now, we'll roll back that commit, undoing the update.

python
display(spark.sql(f"CALL rollback_to_instant('trips_cow_hudi_sql', '{last_commit_time}')"))
python
# The update to fare amount should now be gone.
display(spark.sql("SELECT uuid, fare FROM trips_cow_hudi_sql WHERE uuid = 'uuid-001'"))

show_rollbacks

This procedure shows a list of all rollback actions performed on the table. It provides a history of your recovery operations.

python
display(spark.sql(f"""CALL show_rollbacks('trips_cow_hudi_sql')"""))

show_rollback_detail

For a deeper dive, this procedure gives you a detailed breakdown of a specific rollback, including which files and records were affected.

python
show_rollback_instant = [row[0] for row in spark.sql("CALL show_rollbacks('trips_cow_hudi_sql')").collect()][0]

display(spark.sql(f"CALL show_rollback_detail(table => 'trips_cow_hudi_sql', instant_time => '{show_rollback_instant}')"))

2. Metadata Table Management

The following procedures are used to manage and inspect Hudi's internal Metadata Table. This table acts as a highly efficient index for all the files and partitions in your Hudi table, significantly speeding up operations like file listing.

Let's create a new table for this exercise.

python
spark.sql(f"DROP TABLE IF EXISTS trips_metadata")

spark.sql(f"""CREATE TABLE trips_metadata (
    ts STRING,
    uuid STRING,
    rider STRING,
    driver STRING,
    fare DOUBLE,
    city STRING
) USING hudi
TBLPROPERTIES (
    'hoodie.table.type' = 'COPY_ON_WRITE',
    'hoodie.datasource.write.recordkey.field' = 'uuid',
    'hoodie.datasource.write.precombine.field' = 'ts',
    'hoodie.datasource.write.partitionpath.field' = 'city'
)
LOCATION 's3a://warehouse/hudi-sql-procedure/trips_metadata'
""")

create_metadata_table

This procedure creates the Hudi Metadata Table for an existing Hudi table. It is crucial for enabling performance optimizations and is a one-time operation.

First, let's create the Metadata Table for our main Hudi table.

python
display(spark.sql(f"CALL create_metadata_table(table => 'trips_metadata')"))

Next, we'll insert some sample data into our new table using a standard SQL INSERT INTO statement.

python
spark.sql(f"""
  INSERT INTO trips_metadata VALUES
    ('2025-08-10 08:15:30', 'uuid-001', 'rider-A', 'driver-X', 18.50, 'new_york'),
    ('2025-08-10 09:22:10', 'uuid-002', 'rider-B', 'driver-Y', 22.75, 'san_francisco'),
    ('2025-08-10 10:05:45', 'uuid-003', 'rider-C', 'driver-Z', 14.60, 'chicago'),
    ('2025-08-10 11:40:00', 'uuid-004', 'rider-D', 'driver-W', 31.90, 'new_york'),
    ('2025-08-10 12:55:15', 'uuid-005', 'rider-E', 'driver-V', 25.10, 'san_francisco');
""")

init_metadata_table

This procedure is used to initialize or repair the metadata for a Hudi table. It is particularly useful if the metadata has become corrupted or needs to be rebuilt.

Now, we initialize the metadata table to populate it with file information.

python
display(spark.sql(f"CALL init_metadata_table(table => 'trips_metadata')"))

show_metadata_table_partitions

This procedure shows the partitions that are actively tracked by the Metadata Table. You can inspect this to confirm that Hudi is correctly managing your table's partitions.

python
display(spark.sql(f"CALL show_metadata_table_partitions(table => 'trips_metadata')"))

show_metadata_table_files

This procedure provides a detailed list of all the files and their sizes within a specific partition, as tracked by the Metadata Table.

This should show the files tracked by the metadata table for the chicago partition.

python
display(spark.sql(f"""CALL show_metadata_table_files(table => 'trips_metadata', partition => 'city=chicago')"""))

delete_metadata_table

This procedure deletes the Metadata Table. You would typically use this if you want to rebuild it from scratch or if you no longer need the metadata table.

python
display(spark.sql(f"CALL delete_metadata_table(table => 'trips_metadata')"))

3. Table Information

The following procedures allow you to inspect the properties and file structure of your Hudi table.

show_table_properties

This procedure is a simple but powerful way to view all the configurations and metadata associated with your Hudi table. It's especially useful for verifying your settings and understanding how Hudi is configured.

python
# Call the procedure to show all properties of our 'trips_metadata' table.
display(spark.sql("CALL show_table_properties('trips_metadata')"))

show_fsview_all

This procedure provides a complete view of all file groups and file slices within your table, including information about both base and log files. This gives you a detailed look at the physical layout of your data on the file system.

python
# Call the procedure to show the full file system view for the 'trips_metadata' table.
display(spark.sql("CALL show_fsview_all('trips_metadata')"))

4. Table Services

In this exercise, we will explore compaction-related SQL procedures.

Compaction is a crucial process for MOR tables. It merges the small .log files (which contain your updates and inserts) into larger, more efficient .parquet base files. This is important for maintaining optimal query performance and storage efficiency.

First, let's create a MOR table and insert some data. Notice that we are setting the table type to MERGE_ON_READ.

python
spark.sql(f"DROP TABLE IF EXISTS trips_mor_compaction")

spark.sql(f"""CREATE TABLE trips_mor_compaction (
    ts STRING,
    uuid STRING,
    rider STRING,
    driver STRING,
    fare DOUBLE,
    city STRING
) USING hudi
TBLPROPERTIES (
    'hoodie.table.type' = 'MERGE_ON_READ',
    'hoodie.datasource.write.recordkey.field' = 'uuid',
    'hoodie.datasource.write.precombine.field' = 'ts',
    'hoodie.compact.inline.max.delta.commits' = '4'
)
LOCATION 's3a://warehouse/hudi-sql-procedure/trips_mor_compaction'
""")

Let's insert some sample data into the newly created table.

python
spark.sql(f"""
  INSERT INTO trips_mor_compaction VALUES
    ('2025-08-10 08:15:30', 'uuid-001', 'rider-A', 'driver-X', 18.50, 'new_york'),
    ('2025-08-10 09:22:10', 'uuid-002', 'rider-B', 'driver-Y', 22.75, 'san_francisco');
""")

Let's check the files in the table path. We will see a .parquet file containing the records.

python
ls(f"s3a://warehouse/hudi-sql-procedure/trips_mor_compaction")

Now, let's perform two updates. This will generate two separate .log files in the table.

python
# First update
spark.sql(f"""
INSERT INTO trips_mor_compaction VALUES 
('2025-08-10 08:20:00', 'uuid-001', 'rider-A', 'driver-A', 19.50, 'new_york');
""")

# Second update
spark.sql(f"""
INSERT INTO trips_mor_compaction VALUES 
('2025-08-10 08:25:00', 'uuid-001', 'rider-A', 'driver-B', 20.50, 'new_york');
""")

Let's inspect the files in the table. You should now see one .parquet base file and two .log files, each corresponding to an update.

python
ls(f"s3a://warehouse/hudi-sql-procedure/trips_mor_compaction")

By setting the configuration hoodie.compact.inline.max.delta.commits to 4, we are instructing Hudi to automatically trigger a compaction after every 4 delta commits. This means that once the threshold is reached, Hudi will merge the accumulated .log files into a new base .parquet file, optimizing the data layout for faster reads.

Let's check the number of commits we have done till now on our MOR table.

python
display(spark.sql("CALL show_commits(table => 'trips_mor_compaction')"))

From the above output, we can see three commits so far; the next commit will trigger compaction. Let's perform another update. This will add another .log file to the table and trigger the compaction, which will also generate a new .parquet file.

python
# Third update
spark.sql(f"""
INSERT INTO trips_mor_compaction VALUES 
('2025-08-10 09:22:10', 'uuid-002', 'rider-B', 'driver-Y', 25.50, 'san_francisco');
""")

Now, let's check the commits again. You can see four delta commits plus one commit produced by the compaction.

python
display(spark.sql("CALL show_commits(table => 'trips_mor_compaction')"))

show_compaction

This procedure shows you the status and plan of any pending or completed compaction jobs. The output will show that a compaction job has been scheduled for our table.

python
# Check Compaction History
display(spark.sql(f"CALL show_compaction(table => 'trips_mor_compaction')"))
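Inline compaction fired automatically here because of the delta-commit threshold. Hudi also exposes a run_compaction procedure for manual control; the statements below are a sketch based on its documented 'schedule', 'run', and 'scheduleandexecute' operations, to be executed against the active SparkSession:

```python
# Manual compaction control (sketch). Pass each statement to spark.sql().
schedule_stmt = "CALL run_compaction(op => 'schedule', table => 'trips_mor_compaction')"
run_stmt = "CALL run_compaction(op => 'run', table => 'trips_mor_compaction')"
# Or schedule and execute in one shot:
one_shot_stmt = "CALL run_compaction(op => 'scheduleandexecute', table => 'trips_mor_compaction')"

# e.g. display(spark.sql(one_shot_stmt))
```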

After the compaction job completes, let's inspect the files in the table again. You will see that a new .parquet file has been created, and the old .log files have been merged into it. The previous .parquet file may still be present but will be marked for eventual cleaning.

python
ls(f"s3a://warehouse/hudi-sql-procedure/trips_mor_compaction")

Finally, let's query the table to confirm that the changes are present in the new base file. The output shows the latest version of the record for uuid-001 and uuid-002.

python
spark.sql(f"REFRESH TABLE trips_mor_compaction")

display(spark.sql(f"SELECT uuid, driver, fare, ts FROM trips_mor_compaction"))

run_clean

The run_clean procedure is Hudi's garbage collection service. It identifies and deletes old, obsolete versions of data files that are no longer needed for time travel or rollback, based on your configured retention policy (e.g., keeping only the last 10 commits).

python
# This command schedules and runs the cleaning service immediately. It deletes old file versions,
# keeping only the number specified in 'file_versions_retained' policy.
spark.sql(f"""
CALL run_clean(
  table => 'trips_mor_compaction',
  trigger_max_commits => 2,
  clean_policy => 'KEEP_LATEST_FILE_VERSIONS',
  file_versions_retained => 1
)
""")

Let's check the filesystem; we should see only the newly created base parquet file.

python
ls(f"s3a://warehouse/hudi-sql-procedure/trips_mor_compaction")

If you are interested in diving deeper into Hudi SQL Procedures, be sure to check out the official Hudi documentation. It provides detailed guidance on how these procedures can simplify common data lakehouse operations, making your workflows easier to manage.

python
stop_spark_session()