hudi-notebooks/notebooks/02-query-types.ipynb

python
#  Licensed to the Apache Software Foundation (ASF) under one
#  or more contributor license agreements.  See the NOTICE file
#  distributed with this work for additional information
#  regarding copyright ownership.  The ASF licenses this file
#  to you under the Apache License, Version 2.0 (the
#  "License"); you may not use this file except in compliance
#  with the License.  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.

Deep Dive into Hudi Table & Query Types: Snapshot, RO, Incremental, Time Travel, CDC

This notebook is your guide to mastering Hudi's advanced query capabilities. We'll explore hands-on examples of the different read modes like Snapshot, Read-Optimized, Incremental, Time Travel, and Change Data Capture to help you understand when to use each for building efficient data pipelines.

Setting up the Environment

We begin by loading the utils.py file, which contains the necessary imports and functions to start a SparkSession.

python
%run utils.py
python
spark = get_spark_session("Hudi Query Types")

Before we can start querying, we need to create our Hudi tables. Hudi offers two primary table types to choose from:

  • Copy-on-Write (COW)
  • Merge-on-Read (MOR)

For this deep dive, we'll create one table for each of Hudi's main storage types:

  • trips_table_cow: Our Copy-on-Write (COW) table, which we'll use to demonstrate how Hudi rewrites files on updates.
  • trips_table_mor: Our Merge-on-Read (MOR) table, which will help us understand how Hudi uses log files for faster updates and different read views.
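Before we create the real tables, here is a toy, pure-Python sketch of the trade-off between the two table types (illustrative only, not Hudi internals): COW rewrites the affected base file on every update, while MOR appends updates to a log file and merges them with the base file at read time.

```python
# Toy model of the two Hudi write paths (illustration only, not Hudi code).

def cow_update(base_file, update):
    """Copy-on-Write: produce a brand-new base file with the update applied."""
    return [update if rec["uuid"] == update["uuid"] else rec for rec in base_file]

def mor_update(log_file, update):
    """Merge-on-Read: just append the update to a log file (cheap write)."""
    return log_file + [update]

def mor_snapshot_read(base_file, log_file):
    """At read time, log records override base records with the same key."""
    merged = {rec["uuid"]: rec for rec in base_file}
    for rec in log_file:
        merged[rec["uuid"]] = rec
    return list(merged.values())

base = [{"uuid": "uuid-001", "fare": 18.50}, {"uuid": "uuid-002", "fare": 22.75}]
update = {"uuid": "uuid-002", "fare": 50.00}

new_base = cow_update(base, update)     # COW: whole file rewritten on update
log = mor_update([], update)            # MOR: base untouched, log grows
snapshot = mor_snapshot_read(base, log) # MOR: merged view produced at read time
```

The sketch shows why MOR writes are cheaper (append-only) and why MOR reads do extra merge work, which is exactly the behavior we will observe with real queries below.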

After creating both tables, we'll be ready to explore all the query types.

This is the sample ride data we will use to create our Hudi table. It includes details like the timestamp, a unique ID, rider, driver, fare, and city.

python
columns = ["ts", "uuid", "rider", "driver", "fare", "city"]
data = [
    ("2025-08-10 08:15:30", "uuid-001", "rider-A", "driver-X", 18.50, "new_york"),
    ("2025-08-10 09:22:10", "uuid-002", "rider-B", "driver-Y", 22.75, "san_francisco"),
    ("2025-08-10 10:05:45", "uuid-003", "rider-C", "driver-Z", 14.60, "chicago"),
    ("2025-08-10 11:40:00", "uuid-004", "rider-D", "driver-W", 31.90, "new_york"),
    ("2025-08-10 12:55:15", "uuid-005", "rider-E", "driver-V", 25.10, "san_francisco"),
    ("2025-08-10 13:20:35", "uuid-006", "rider-F", "driver-U", 19.80, "chicago"),
    ("2025-08-10 14:10:05", "uuid-007", "rider-G", "driver-T", 28.45, "san_francisco"),
    ("2025-08-10 15:00:20", "uuid-008", "rider-H", "driver-S", 16.25, "new_york"),
    ("2025-08-10 15:45:50", "uuid-009", "rider-I", "driver-R", 24.35, "chicago"),
    ("2025-08-10 16:30:00", "uuid-010", "rider-J", "driver-Q", 20.00, "new_york"),
]
python
inputDF = spark.createDataFrame(data).toDF(*columns)
display(inputDF)

Hudi Configuration for a COW Table

python
table_name_cow = "trips_table_cow"
base_path = "s3a://warehouse/hudi-query-types"

cow_hudi_conf = {
    "hoodie.table.name": table_name_cow, # The name of our Hudi table.
    "hoodie.datasource.write.recordkey.field": "uuid", # The column that acts as the unique identifier for each record.
    "hoodie.datasource.write.table.type": "COPY_ON_WRITE", # Hudi uses Copy-on-Write as the default table type, but we are being explicit here.
    "hoodie.datasource.write.partitionpath.field": "city", # The column Hudi uses to partition the data on storage.
    "hoodie.datasource.write.precombine.field": "ts", # The field used to deduplicate records when a conflict occurs.
    "hoodie.write.markers.type": "DIRECT",
    "hoodie.table.cdc.enabled": "true",
    "hoodie.datasource.write.hive_style_partitioning": "true" # This ensures partition directories are named like `city=new_york`.
}

Inserting data in a COW Table

python
# Write the DataFrame to a Hudi COW table
# The default operation is "upsert" if this is not specified.
inputDF.write \
    .format("hudi") \
    .option("hoodie.datasource.write.operation", "insert") \
    .options(**cow_hudi_conf) \
    .mode("overwrite") \
    .save(f"{base_path}/{table_name_cow}")

Now that the COW table is set up, we can begin our deep dive into Hudi's powerful query types. In this section, we will discuss:

  • Snapshot Query: The default read mode for viewing the latest state of the table.
  • Incremental Query: A way to get only the new data added since a specific point in time.
  • Time Travel Query: How to view the table as it existed at a past moment.
  • Change Data Capture (CDC) Query: How to retrieve a detailed stream of changes (updates, inserts, and deletes).

Querying COW Tables

1. Snapshot Query

This is the default query type when reading Hudi tables. Its goal is to give you a complete, up-to-the-minute view of your data.

Let's do a quick snapshot query to see the current state of our tables.

python
cowSnapshotQueryDF = spark.read.format("hudi").load(f"{base_path}/{table_name_cow}")

display(cowSnapshotQueryDF.select("_hoodie_commit_time", "uuid", "rider", "driver", "fare", "city", "ts"))

Now update one record in the COW table.

python
from pyspark.sql.functions import col

updatesDF = spark.read.format("hudi").load(f"{base_path}/{table_name_cow}").filter(col("rider") == "rider-G").withColumn("fare", col("fare") * 10)

display(updatesDF.select("uuid", "rider", "driver", "fare", "city", "ts"))
python
updatesDF.write \
    .format("hudi") \
    .option("hoodie.datasource.write.operation", "upsert") \
    .options(**cow_hudi_conf) \
    .mode("append") \
    .save(f"{base_path}/{table_name_cow}")

Let's run the snapshot query again to confirm that it returns the latest view of the table. Indeed, it now fetches the updated fare.

python
cowSnapshotQueryDF = spark.read.format("hudi").load(f"{base_path}/{table_name_cow}")

display(cowSnapshotQueryDF.select("_hoodie_commit_time", "uuid", "rider", "driver", "fare", "city", "ts").filter(col("rider") == "rider-G"))

2. Incremental Reads

Hudi's incremental query feature lets us efficiently process only the data that has changed since a specific point in time. We'll start by listing all the commit times in our table.

Now, let's configure an incremental read to grab only the data from our update operation onward. First, we fetch the latest commit time from the table.

python
# Get distinct commit times ordered
all_commits_df = spark.read.format("hudi").load(f"{base_path}/{table_name_cow}") \
    .select("_hoodie_commit_time") \
    .distinct() \
    .orderBy("_hoodie_commit_time")

# Collect up to the first 50 commit times as a list
all_commits = [row['_hoodie_commit_time'] for row in all_commits_df.take(50)]

incrementalTime = all_commits[-1]  # Commit time we are interested in
display(all_commits_df)

print(f"Incremental commit time: {incrementalTime}")
python
incremental_read_options = {
  'hoodie.datasource.query.type': 'incremental',
  'hoodie.datasource.read.begin.instanttime': incrementalTime,
}

incrementalQueryDF = spark.read.format("hudi") \
  .options(**incremental_read_options) \
  .load(f"{base_path}/{table_name_cow}")

incrementalQueryDF.createOrReplaceTempView("trips_incremental")

When we query our temporary incremental view, we can see that it returns only the single record that was updated by our last write operation.

python
display(spark.sql("select _hoodie_commit_time, uuid, rider, driver, fare, city, ts from trips_incremental"))
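Conceptually, an incremental read is just a filter on commit time. The pure-Python sketch below mimics that. It is illustrative only: the instants are made up, and we assume the begin instant is inclusive here, matching the behavior we just observed (exact boundary semantics have varied across Hudi versions).

```python
# Pure-Python model of an incremental read (illustration only, not Hudi code).
# Each entry is (commit_instant, record); the instants are invented for the sketch.
commits = [
    ("20250810081500", {"uuid": "uuid-001", "fare": 18.50}),
    ("20250810082200", {"uuid": "uuid-002", "fare": 22.75}),
    ("20250811100000", {"uuid": "uuid-007", "fare": 284.50}),  # the fare update
]

def incremental_read(rows, begin_instant):
    # Return only records committed at or after the begin instant.
    return [rec for ts, rec in rows if ts >= begin_instant]

latest = max(ts for ts, _ in commits)      # analogous to all_commits[-1]
changed = incremental_read(commits, latest)  # only the updated record survives
```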

3. Time Travel Query

Hudi also allows for time travel, which means we can query the state of our table at a specific point in the past. By specifying the commit time from our initial data insertion, we can view the table's contents before we performed the update.

python
beginTime = all_commits[-2]  # Commit time we are interested in
print(f"Begin/Initial commit time: {beginTime}")
python
spark.read.format("hudi") \
  .option("as.of.instant", beginTime) \
  .load(f"{base_path}/{table_name_cow}").createOrReplaceTempView("trips_time_travel")
python
display(spark.sql("select _hoodie_commit_time, uuid, rider, driver, fare, city, ts from trips_time_travel"))

As you can see, querying the historical view shows the original fare for 'rider-G' before we updated it. This is a great way to audit or restore data from the past.
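The "as of" semantics can be sketched in plain Python: the table state at instant T is the newest version of each record key committed at or before T. This is illustrative only, not Hudi internals; the instants below are invented.

```python
# Minimal sketch of time travel semantics (illustration only, not Hudi code).
versions = [
    ("20250810140000", "uuid-007", 28.45),   # original insert
    ("20250811090000", "uuid-007", 284.50),  # later update
]

def as_of(rows, instant):
    """Replay commits oldest-first, keeping only those at or before `instant`."""
    state = {}
    for ts, key, fare in sorted(rows):   # oldest first; newer commits overwrite
        if ts <= instant:
            state[key] = fare
    return state

before_update = as_of(versions, "20250810150000")  # sees the original fare
after_update = as_of(versions, "20250811120000")   # sees the updated fare
```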

4. Change Data Capture (CDC)

Hudi's Change Data Capture (CDC) feature lets you read a stream of all the changes (inserts, updates, and deletes) that have been applied to your table. This is perfect for downstream systems that need to react to data modifications in real-time. We'll start by adding some new data and updating an existing record to generate some changes.

python
from pyspark.sql.functions import lit
from pyspark.sql import Row

# Define a DataFrame with one new record and one updated record
cdc_data = [
    ("2025-08-11 10:00:00", "uuid-011", "rider-K", "driver-P", 10.50, "chicago"), # new record
    ("2025-08-10 09:22:10", "uuid-002", "rider-B", "driver-Y", 50.00, "san_francisco") # updated record
]

cdc_columns = ["ts", "uuid", "rider", "driver", "fare", "city"]
cdcDF = spark.createDataFrame(cdc_data).toDF(*cdc_columns)

Now, we'll perform an upsert with our new data. This will create a new commit with one insert and one update.

python
# Perform the upsert to introduce the new changes
cdcDF.write \
    .format("hudi") \
    .option("hoodie.datasource.write.operation", "upsert") \
    .options(**cow_hudi_conf) \
    .mode("append") \
    .save(f"{base_path}/{table_name_cow}")

To see the changes from this specific transaction, we'll first get its commit time. We'll then use this as our starting point for the CDC query to capture all the changes from that moment forward.

python
# Read the Hudi table to get a DataFrame of all records.
hudi_df = spark.read.format("hudi").load(f"{base_path}/{table_name_cow}")

# Get all distinct commit times and sort them in chronological order.
# We then collect this list to the driver.
all_commits = [row[0] for row in hudi_df.select("_hoodie_commit_time").distinct().orderBy("_hoodie_commit_time").collect()]

if len(all_commits) >= 2:
    # Use index -2 to get the second-to-last commit time from the sorted list.
    previous_commit_time = all_commits[-2]
    print(f"Second-to-last commit time: {previous_commit_time}")
else:
    print("There are not enough commits to determine a previous commit time.")
    previous_commit_time = "000"  # earlier than any real instant, so the query captures everything

Now we can perform a CDC query using a special incremental format. We'll set the query type to "incremental" and specify "hoodie.datasource.query.incremental.format": "cdc". By using the previous_commit_time we just fetched, we can capture all the changes from that commit onward. The output will include the op column, which tells us whether a record was inserted, updated, or deleted.

python
cdc_read_options = {
  'hoodie.datasource.query.type': 'incremental',
  'hoodie.datasource.read.begin.instanttime': previous_commit_time,
  'hoodie.datasource.query.incremental.format': 'cdc'
}

cdcQueryDF = spark.read.format("hudi") \
  .options(**cdc_read_options) \
  .load(f"{base_path}/{table_name_cow}")

display(cdcQueryDF)

Let's look at the above output to see what happened:

Update: We have a record where op is u. This corresponds to the update we made to uuid-002. The before column shows the original fare of 22.75, and the after column shows the new fare of 50.0.

Insert: We also have a record where op is i. This is the new record for uuid-011. The before column is null because it didn't exist before this commit, while the after column contains all the new record's data.
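To see why this shape is useful downstream, here is a hedged sketch of how a consumer might apply such a change stream to its own keyed store. This is plain Python, not a Hudi API; the row shape (op / before / after) simply mirrors the columns shown in the output above.

```python
# Sketch of a downstream consumer applying a CDC stream (illustration only).

def apply_cdc(state, changes):
    """Fold op/before/after change rows into a key -> record store."""
    for change in changes:
        op, before, after = change["op"], change["before"], change["after"]
        if op in ("i", "u"):                 # insert or update: take the 'after' image
            state[after["uuid"]] = after
        elif op == "d":                      # delete: drop the key from the 'before' image
            state.pop(before["uuid"], None)
    return state

store = {"uuid-002": {"uuid": "uuid-002", "fare": 22.75}}
stream = [
    {"op": "u", "before": {"uuid": "uuid-002", "fare": 22.75},
                "after":  {"uuid": "uuid-002", "fare": 50.00}},
    {"op": "i", "before": None,
                "after":  {"uuid": "uuid-011", "fare": 10.50}},
]
store = apply_cdc(store, stream)  # store now reflects both changes
```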

Hudi Configuration for a MOR Table

python
table_name_mor = "trips_table_mor"
base_path = "s3a://warehouse/hudi-query-types"

mor_hudi_conf = {
    "hoodie.table.name": table_name_mor,
    "hoodie.datasource.write.recordkey.field": "uuid",
    "hoodie.datasource.write.table.type": "MERGE_ON_READ",
    "hoodie.datasource.write.partitionpath.field": "city",
    "hoodie.datasource.write.precombine.field": "ts",
    "hoodie.write.markers.type": "DIRECT",
    "hoodie.datasource.write.hive_style_partitioning": "true"
}

Inserting Data into a MOR Table

python
# Write the DataFrame to a Hudi MOR table
inputDF.write \
    .format("hudi") \
    .option("hoodie.datasource.write.operation", "insert") \
    .options(**mor_hudi_conf) \
    .mode("overwrite") \
    .save(f"{base_path}/{table_name_mor}")

Now that the MOR table is ready, we can begin our deep dive into its query types. In this section, we will discuss:

  • Snapshot Query: The default read mode for viewing the latest state of the table.
  • Read-Optimized (RO) Query: A specialized mode for faster reads on MOR tables.

Querying MOR Tables

Just like with the COW table, let's query the MOR table. Since we have not made any updates to it yet, the table contains only base data files. We can confirm this by inspecting the filesystem.

python
ls(f"{base_path}/{table_name_mor}")
python
ls(f"{base_path}/{table_name_mor}/city=new_york")

1. Snapshot Query

When you run this query on a Merge-on-Read (MOR) table, Hudi merges the recent changes from the log files with the base data files to present the latest records, which can affect performance.

python
morSnapshotQueryDF = spark.read.format("hudi").load(f"{base_path}/{table_name_mor}")

display(morSnapshotQueryDF.select("_hoodie_commit_time", "uuid", "rider", "driver", "fare", "city", "ts"))

2. Reading in Read-Optimized Mode

Now, let's read the same table in read-optimized mode. This mode is faster because it only reads the base files, but it won't show any recent updates that are still in the log files.

python
mor_ro_df = spark.read.format("hudi") \
    .option("hoodie.datasource.query.type", "read_optimized") \
    .load(f"{base_path}/{table_name_mor}")

display(mor_ro_df.select("_hoodie_commit_time", "uuid", "rider", "driver", "fare", "city", "ts"))

Updating a Record in the MOR table

Let's update a record in our MOR table to see how it affects our read modes. We'll find the record for 'driver-W' and double its fare.

python
from pyspark.sql.functions import col

updatesDF = spark.read.format("hudi").load(f"{base_path}/{table_name_mor}").filter(col("driver") == "driver-W").withColumn("fare", col("fare") * 2)

display(updatesDF.select("uuid", "rider", "driver", "fare", "city", "ts"))

Now we perform the upsert. In a MOR table, this update will be written to a log file, separate from the main Parquet data files.

python
updatesDF.write \
    .format("hudi") \
    .option("hoodie.datasource.write.operation", "upsert") \
    .options(**mor_hudi_conf) \
    .mode("append") \
    .save(f"{base_path}/{table_name_mor}")
python
ls(f"{base_path}/{table_name_mor}/city=new_york")

After the update, a snapshot query correctly shows the new fare for 'driver-W'. This is because the log files containing our update were merged with the base files during this read operation.

python
morSnapshotQueryDF = spark.read.format("hudi").load(f"{base_path}/{table_name_mor}")

display(morSnapshotQueryDF.select("_hoodie_commit_time", "uuid", "rider", "driver", "fare", "city", "ts").filter(col("driver") == "driver-W"))

Finally, a read-optimized query of the same table still shows the old fare for 'driver-W'. This is because the read-optimized query only looks at the base data files and ignores the unmerged update in the log file.

python
mor_ro_df = spark.read.format("hudi") \
    .option("hoodie.datasource.query.type", "read_optimized") \
    .load(f"{base_path}/{table_name_mor}")

display(mor_ro_df.select("_hoodie_commit_time", "uuid", "rider", "driver", "fare", "city", "ts").filter(col("driver") == "driver-W"))
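To recap the two MOR read views, here is a toy, pure-Python model (illustrative only, not Hudi internals): the snapshot view merges log records over base files, while the read-optimized view reads base files alone and therefore still shows the stale fare.

```python
# Toy recap of the two MOR read views (illustration only, not Hudi code).
base_files = {"uuid-004": {"driver": "driver-W", "fare": 31.90}}
log_files = [{"uuid": "uuid-004", "driver": "driver-W", "fare": 63.80}]  # the upsert

def read_optimized(base):
    """Read-optimized: base files only, so unmerged log updates are invisible."""
    return dict(base)

def snapshot(base, log):
    """Snapshot: log records win over base records for matching keys."""
    view = dict(base)
    for rec in log:
        view[rec["uuid"]] = {"driver": rec["driver"], "fare": rec["fare"]}
    return view

ro_fare = read_optimized(base_files)["uuid-004"]["fare"]    # stale fare
snap_fare = snapshot(base_files, log_files)["uuid-004"]["fare"]  # updated fare
```

Once compaction merges the log files into new base files, the two views converge again.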
python
stop_spark_session()