hudi-notebooks/notebooks/01-crud-operations.ipynb

python
#  Licensed to the Apache Software Foundation (ASF) under one
#  or more contributor license agreements.  See the NOTICE file
#  distributed with this work for additional information
#  regarding copyright ownership.  The ASF licenses this file
#  to you under the Apache License, Version 2.0 (the
#  "License"); you may not use this file except in compliance
#  with the License.  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.

Getting Started with Hudi: A Hands-on Guide to CRUD Operations

This notebook is a practical guide to performing CRUD (Create, Read, Update, Delete) operations on an Apache Hudi table using PySpark. We'll use MinIO as our S3-compatible storage backend, demonstrating a modern data lake architecture, and we'll work with both Copy-On-Write (COW) and Merge-On-Read (MOR) tables.

Setting up the Environment

We begin by loading the utils.py file, which contains the necessary imports and functions to start a SparkSession.

python
%run utils.py
python
spark = get_spark_session("Hudi CRUD Operations")

This is the initial dataset for our Hudi table. It's a list of ride records with columns for the timestamp, a unique ID, rider and driver names, the fare, and the city.

python
columns = ["ts", "uuid", "rider", "driver", "fare", "city"]

data = [
    ("2025-08-10 08:15:30", "uuid-001", "rider-A", "driver-X", 18.50, "new_york"),
    ("2025-08-10 09:22:10", "uuid-002", "rider-B", "driver-Y", 22.75, "san_francisco"),
    ("2025-08-10 10:05:45", "uuid-003", "rider-C", "driver-Z", 14.60, "chicago"),
    ("2025-08-10 11:40:00", "uuid-004", "rider-D", "driver-W", 31.90, "new_york"),
    ("2025-08-10 12:55:15", "uuid-005", "rider-E", "driver-V", 25.10, "san_francisco"),
    ("2025-08-10 13:20:35", "uuid-006", "rider-F", "driver-U", 19.80, "chicago"),
    ("2025-08-10 14:10:05", "uuid-007", "rider-G", "driver-T", 28.45, "san_francisco"),
    ("2025-08-10 15:00:20", "uuid-008", "rider-H", "driver-S", 16.25, "new_york"),
    ("2025-08-10 15:45:50", "uuid-009", "rider-I", "driver-R", 24.35, "chicago"),
    ("2025-08-10 16:30:00", "uuid-010", "rider-J", "driver-Q", 20.00, "new_york"),
]

Next, we create a PySpark DataFrame from the sample data and take a quick look at what we're starting with.

python
inputDF = spark.createDataFrame(data).toDF(*columns)
display(inputDF)

Hudi offers two primary table types to choose from:

  • Copy-on-Write (COW)
  • Merge-on-Read (MOR)

The choice between them depends on your specific needs, balancing write performance against read performance.

Copy-on-Write (COW) Tables: In a COW table, each time you update or delete a record, Hudi rewrites the entire data file that contains that record. This approach makes reads very fast because all the data is in a single, clean file format like Parquet. However, it can be more resource-intensive for frequent updates due to all the file rewriting.

Merge-on-Read (MOR) Tables: With a MOR table, updates are handled differently. Instead of rewriting the entire data file, Hudi writes the new or updated records to a smaller, row-based log file. When you query the table, Hudi merges these log files with the main data files on the fly. This makes write operations much faster, but it can make reads slightly slower as Hudi has to perform the merge during the query.
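
This trade-off is easy to see in miniature. The sketch below is plain Python, not Hudi code: a MOR-style reader merges a base snapshot with a log of later changes keyed by record key, so writes only append while reads pay for the merge.

```python
# A toy illustration of the Merge-on-Read idea (not Hudi internals):
# the base file holds a snapshot, and later writes only append to a log.
base_file = {
    "uuid-001": {"fare": 18.50},
    "uuid-002": {"fare": 22.75},
}
log_file = [
    ("uuid-001", {"fare": 25.00}),  # an update, appended cheaply at write time
    ("uuid-003", {"fare": 14.60}),  # a brand-new record
]

def snapshot_read(base, log):
    """Merge log entries over the base records, as a MOR snapshot reader would."""
    merged = dict(base)
    for key, record in log:
        merged[key] = record  # last write wins
    return merged

print(snapshot_read(base_file, log_file))
```

Appending to `log_file` is cheap (fast writes); the cost is the merge loop inside `snapshot_read` (slightly slower reads), which is exactly the MOR trade-off described above.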

Copy-on-Write (COW) Tables

We will now explore the Copy-on-Write (COW) storage type. As discussed earlier, in a COW table each time data is updated or deleted in a file, Hudi rewrites the entire file with the new data. This is a simpler and more traditional approach. Copy-on-Write (COW) is the default table type in Apache Hudi.

Hudi Configuration for a COW Table

Next, we will set up the specific configuration for our Hudi table. We'll use uuid as our unique record key and partition the data by city to keep it organized. The ts (timestamp) field is our precombine key, which helps Hudi decide which record to keep if it finds duplicates.

python
table_name_cow = "trips_table_cow"
base_path = "s3a://warehouse/hudi-crud-cow"

cow_hudi_conf = {
    "hoodie.table.name": table_name_cow, # The name of our Hudi table.
    "hoodie.datasource.write.recordkey.field": "uuid", # The column that acts as the unique identifier for each record.
    "hoodie.datasource.write.table.type": "COPY_ON_WRITE", # Hudi uses Copy-on-Write as the default table type, but we are being explicit here.
    "hoodie.datasource.write.partitionpath.field": "city", # The column Hudi uses to partition the data on storage.
    "hoodie.datasource.write.precombine.field": "ts", # The field used to deduplicate records when a conflict occurs.
    "hoodie.table.cdc.enabled": "true", # Enables change data capture (CDC) for this table.
    "hoodie.datasource.write.hive_style_partitioning": "true" # This ensures partition directories are named like `city=new_york`.
}

Inserting data in a COW Table

This is the "Create" part of our CRUD operations. We write our initial DataFrame to MinIO as a Hudi table; using mode("overwrite") ensures we start with a fresh table on every run.

python
# Write the DataFrame to a Hudi COW table
# The default operation is "upsert" if this is not specified.
inputDF.write \
    .format("hudi") \
    .option("hoodie.datasource.write.operation", "insert") \
    .options(**cow_hudi_conf) \
    .mode("overwrite") \
    .save(f"{base_path}/{table_name_cow}")

Hudi Table File Structure

There are two main types of files:

  • Metadata files located in /.hoodie/
  • Data files stored within partition paths for partitioned tables, or under the base path for non-partitioned tables
python
ls(f"{base_path}/{table_name_cow}")

Hudi manages a table's metadata by storing it in a special directory within the base path. This metadata helps ensure that all tools reading and writing to the table follow the same rules. One of the key files is hoodie.properties, which acts like a configuration file for the table, holding details like table name and version.

python
ls(f"{base_path}/{table_name_cow}/.hoodie")

Another crucial part of this metadata is the Hudi Timeline, which consists of small files that log every change to the table. These meta-files follow the naming pattern below:

[action timestamp].[action type].[action state]

python
ls(f"{base_path}/{table_name_cow}/.hoodie/timeline")
  • An action timestamp is a unique, chronological identifier for each event, marking when it was scheduled.
  • An action type describes the operation that took place. Examples include commit or deltacommit for data changes, compaction or clean for maintenance, and savepoint or restore for recovery.
  • An action state shows the current status of the action. It can be requested (waiting to start), inflight (in progress), or completed (finished; for completed actions the file name typically ends with just the action type, e.g. 20250810141005123.commit).
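
To make the pattern concrete, here is a small illustrative helper (plain Python, not part of the original notebook) that splits a timeline file name into its three parts; the example names are hypothetical:

```python
def parse_timeline_file(name):
    """Split '[action timestamp].[action type].[action state]' into its parts.

    Completed actions typically omit the state suffix (e.g.
    '20250810141005123.commit'), so we report their state as 'completed'.
    """
    parts = name.split(".")
    timestamp, action = parts[0], parts[1]
    state = parts[2] if len(parts) > 2 else "completed"
    return timestamp, action, state

print(parse_timeline_file("20250810141005123.commit.requested"))
print(parse_timeline_file("20250810141005123.deltacommit.inflight"))
print(parse_timeline_file("20250810141005123.commit"))
```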

Hudi organizes its physical data into two core types of files: Base Files and Log Files. You can think of a Base File as a traditional, organized table of data (often in a format like Parquet), while Log Files are where Hudi stores new changes and updates to that data.

A Base File contains the main stored records of a Hudi table and is optimized for reads. A Log File contains changes to the records in its associated Base File and is optimized for writes. Within a partition path, a single Base File and its associated Log Files (in the case of a MOR table) are grouped together as a File Slice.

python
ls(f"{base_path}/{table_name_cow}/city=san_francisco")
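
File slices can be reconstructed from names alone. The sketch below is plain Python with patterns simplified from Hudi's actual naming scheme (in which base files look like `[fileId]_[writeToken]_[instantTime].parquet` and log files start with a dot and share the file ID); the listing is hypothetical:

```python
import re
from collections import defaultdict

# Simplified name patterns: base files are '<fileId>_<writeToken>_<instant>.parquet',
# log files are '.<fileId>_<baseInstant>.log.<version>_<writeToken>'.
BASE_RE = re.compile(r"^(?P<file_id>[^_]+)_\S+\.parquet$")
LOG_RE = re.compile(r"^\.(?P<file_id>[^_]+)_\S*\.log\.\S+$")

def group_file_slices(names):
    """Group a partition's files into {file_id: {'base': ..., 'logs': [...]}}."""
    slices = defaultdict(lambda: {"base": None, "logs": []})
    for name in names:
        if m := BASE_RE.match(name):
            slices[m["file_id"]]["base"] = name
        elif m := LOG_RE.match(name):
            slices[m["file_id"]]["logs"].append(name)
    return dict(slices)

listing = [
    "f1a2-0001_0-7-21_20250810141005123.parquet",
    ".f1a2-0001_20250810141005123.log.1_0-10-30",
]
print(group_file_slices(listing))
```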

To easily query our newly created Hudi table, we first need to register it as a temporary SQL view. After that, we'll run a quick command to list all tables and confirm that it's ready to use.

python
spark.read.format("hudi").load(f"{base_path}/{table_name_cow}").createOrReplaceTempView(f"{table_name_cow}")
display(spark.sql("show tables"))

Now that we have the temporary view, let's query the COW table to get a snapshot of all records and display the result.

python
trips_cow_df = spark.sql("select _hoodie_commit_time, _hoodie_file_name, ts, uuid, rider, driver, fare, city from trips_table_cow")
display(trips_cow_df)

Upserting Records (Update)

Hudi's upsert is a powerful feature that allows us to insert new records or update existing ones. We are going to update the fare for 'rider-G' by multiplying it by 10. Here, we show the updated record before applying the change to the table.

python
from pyspark.sql.functions import col

updatesDF = spark.read.format("hudi").load(f"{base_path}/{table_name_cow}").filter(col("rider") == "rider-G").withColumn("fare", col("fare") * 10)

display(updatesDF.select("uuid", "rider", "driver", "fare", "city", "ts"))

Upserting the modified record

Now, we'll perform the upsert. Because uuid is our record key, Hudi knows to find the original record and replace it with our new, updated one.

python
updatesDF.write \
    .format("hudi") \
    .option("hoodie.datasource.write.operation", "upsert") \
    .options(**cow_hudi_conf) \
    .mode("append") \
    .save(f"{base_path}/{table_name_cow}")

Let's check the files in the san_francisco partition to see what happened after the upsert. Since this is a Copy-on-Write table, Hudi didn't just modify the existing record in place. Instead, it created a brand new Parquet file containing the updated record and all the other records for that partition.

python
ls(f"{base_path}/{table_name_cow}/city=san_francisco")

Let's also query the Hudi table to confirm that the record was updated. Compare the fare value and the _hoodie_commit_time as well.

python
spark.sql(f"REFRESH TABLE {table_name_cow}")
trips_cow_read = spark.sql(f"select _hoodie_commit_time, _hoodie_file_name, uuid, rider, driver, fare, city, ts from {table_name_cow} where city = 'san_francisco'")
display(trips_cow_read)

If you look closely at the file list, you'll see a new Parquet file in the san_francisco partition. This is because Hudi's Copy-on-Write mechanism rewrites the entire file when a record is updated. You can also see that the _hoodie_commit_time has changed specifically for uuid-007, confirming that only this record was part of our latest update (fare = 284.50).
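
The rewrite can be mimicked in plain Python (a toy sketch, not Hudi internals): updating one record produces an entirely new copy of the file's records, and the old version is left in place for earlier readers.

```python
# Toy COW update: updating one record rewrites the whole "file" (list of
# records) into a new version; the old version remains untouched.
file_v1 = [
    {"uuid": "uuid-002", "fare": 22.75},
    {"uuid": "uuid-007", "fare": 28.45},
]

def cow_update(file_records, key, new_fare):
    """Rewrite the full file, replacing only the matching record."""
    return [
        {**r, "fare": new_fare} if r["uuid"] == key else dict(r)
        for r in file_records
    ]

file_v2 = cow_update(file_v1, "uuid-007", 284.50)
print(file_v2)             # every record copied, one changed
print(file_v1[1]["fare"])  # the old version is untouched: 28.45
```

This also shows why frequent small updates are expensive on COW: every record of the affected file is copied even when only one row changed.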

Deduplication with Precombine Field

A key feature of Hudi is its ability to handle duplicate records automatically. Hudi uses the precombine.field to decide which record to keep when it encounters two or more records with the same recordkey in a single write operation. In our configuration, the precombine.field is set to ts, so Hudi will keep the record with the latest timestamp.
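
The precombine rule itself is simple enough to sketch in plain Python (illustrative, not Hudi's implementation): among incoming records that share a record key, keep the one with the greatest precombine value.

```python
def precombine(records, key_field="uuid", precombine_field="ts"):
    """Keep one record per key: the one with the largest precombine value."""
    winners = {}
    for record in records:
        key = record[key_field]
        # 'YYYY-MM-DD HH:MM:SS' strings compare correctly lexicographically.
        if key not in winners or record[precombine_field] > winners[key][precombine_field]:
            winners[key] = record
    return list(winners.values())

incoming = [
    {"uuid": "uuid-001", "ts": "2025-08-10 08:15:30", "fare": 25.50},
    {"uuid": "uuid-001", "ts": "2025-08-10 17:00:00", "fare": 30.00},
]
print(precombine(incoming))  # only the 17:00:00 record survives
```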

Let's create a new DataFrame with some duplicate data to see this in action. We'll add two records with the same uuid (uuid-001) but with different ts values. The second record has a later timestamp and a higher fare.

python
duplicate_data = [
    ("2025-08-10 08:15:30", "uuid-001", "rider-A", "driver-Z", 25.50, "new_york"), # A duplicate record with an old timestamp
    ("2025-08-10 17:00:00", "uuid-001", "rider-A", "driver-A", 30.00, "new_york"), # Another duplicate record with a new timestamp
    ("2025-08-11 07:45:00", "uuid-012", "rider-L", "driver-T", 12.25, "chicago")   # A new record
]
duplicate_columns = ["ts", "uuid", "rider", "driver", "fare", "city"]
duplicatesDF = spark.createDataFrame(duplicate_data).toDF(*duplicate_columns)

display(duplicatesDF)

Now, let's upsert this data into our COW table. Hudi will process the duplicate records for uuid-001 and, based on our precombine.field (ts), it will only keep the record with the later timestamp.

python
duplicatesDF.write \
    .format("hudi") \
    .option("hoodie.datasource.write.operation", "upsert") \
    .options(**cow_hudi_conf) \
    .mode("append") \
    .save(f"{base_path}/{table_name_cow}")

Finally, we'll query the table to see the result. As you can see in the output, only one record for uuid-001 exists, and it's the one with the latest timestamp (2025-08-10 17:00:00). The record with the older timestamp was discarded, and the new record for uuid-012 was successfully inserted.

python
spark.sql(f"REFRESH TABLE {table_name_cow}")
trips_cow_df_dedup = spark.sql(f"select _hoodie_commit_time, _hoodie_file_name, uuid, rider, driver, fare, city, ts from {table_name_cow} where uuid = 'uuid-001' or uuid = 'uuid-012'")
display(trips_cow_df_dedup)

Deleting a Record in a COW Table

Finally, let's demonstrate how to delete a record. We'll create a DataFrame containing the record we want to delete: the one for rider-G (uuid-007).

python
# Create a DataFrame with the record to be deleted
delete_data = [
    ("2025-08-10 14:10:05", "uuid-007", "rider-G", "driver-T", 28.45, "san_francisco")
]
delete_columns = ["ts", "uuid", "rider", "driver", "fare", "city"]
deleteDF = spark.createDataFrame(delete_data).toDF(*delete_columns)
python
deleteDF.write \
    .format("hudi") \
    .option("hoodie.datasource.write.operation", "delete") \
    .options(**cow_hudi_conf) \
    .mode("append") \
    .save(f"{base_path}/{table_name_cow}")

Running a snapshot query now confirms that the record for 'rider-G' (uuid-007) is no longer present in our table.

python
spark.sql(f"REFRESH TABLE {table_name_cow}")
trips_cow_df = spark.sql(f"select _hoodie_commit_time, _hoodie_file_name, uuid, rider, driver, fare, city, ts from {table_name_cow}")
display(trips_cow_df)

Merge-on-Read (MOR) Tables

For comparison, let's explore the Merge-on-Read (MOR) table type. In a MOR table, updates are written to a separate log file, which is then merged with the base data files when you read the table.

Here's the configuration for our MOR table.

python
table_name_mor = "trips_table_mor"
base_path = "s3a://warehouse/hudi-crud-mor"

mor_hudi_conf = {
    "hoodie.table.name": table_name_mor,
    "hoodie.datasource.write.recordkey.field": "uuid",
    "hoodie.datasource.write.table.type": "MERGE_ON_READ",
    "hoodie.datasource.write.partitionpath.field": "city",
    "hoodie.datasource.write.precombine.field": "ts",
    "hoodie.datasource.write.hive_style_partitioning": "true"
}

Inserting Data into a MOR Table

First, let's insert our initial dataset into our new Merge-on-Read (MOR) table. Just like with the COW table, this initial insert creates the table and its partitions.

python
# Write the DataFrame to a Hudi MOR table
inputDF.write \
    .format("hudi") \
    .option("hoodie.datasource.write.operation", "insert") \
    .options(**mor_hudi_conf) \
    .mode("overwrite") \
    .save(f"{base_path}/{table_name_mor}")

Let's inspect the file system after this fresh insert. Since there are no updates yet, Hudi has created a clean Parquet base file in each partition, just like it did for our COW table.

python
ls(f"{base_path}/{table_name_mor}")
python
ls(f"{base_path}/{table_name_mor}/city=new_york")

After the fresh insert, let's query our new MOR table to confirm that all records were written correctly. Hudi's default read mode automatically gives us a full snapshot of the table.

python
# First, make sure the table is registered as a temporary view
spark.read.format("hudi").load(f"{base_path}/{table_name_mor}").createOrReplaceTempView("trips_table_mor")

# Now, query the table using SQL to see the inserted records
trips_mor_df = spark.sql("SELECT _hoodie_commit_time, _hoodie_file_name, uuid, rider, driver, fare, city, ts FROM trips_table_mor")
display(trips_mor_df)

Updating a Record in the MOR Table

Let's update a record in our MOR table to see how it affects our read modes. We'll find the record for 'driver-W' and double its fare.

python
from pyspark.sql.functions import col

updatesDF = spark.read.format("hudi").load(f"{base_path}/{table_name_mor}").filter(col("driver") == "driver-W").withColumn("fare", col("fare") * 2)

display(updatesDF.select("uuid", "rider", "driver", "fare", "city", "ts"))

Now, we'll perform the upsert. This is where the magic of Merge-on-Read happens! Instead of rewriting the entire Parquet file, Hudi will write a new log file containing just our updated record.

python
updatesDF.write \
    .format("hudi") \
    .option("hoodie.datasource.write.operation", "upsert") \
    .options(**mor_hudi_conf) \
    .mode("append") \
    .save(f"{base_path}/{table_name_mor}")

Let's inspect the file system after this update operation.

python
ls(f"{base_path}/{table_name_mor}/city=new_york")

Now query the MOR table to confirm that the record has been updated correctly.

python
spark.sql(f"REFRESH TABLE {table_name_mor}")
trips_mor_read = spark.sql(f"select _hoodie_commit_time, _hoodie_file_name, uuid, rider, driver, fare, city, ts from {table_name_mor} where city = 'new_york'")
display(trips_mor_read)
python
trips_mor_df = spark.read.format("hudi").load(f"{base_path}/{table_name_mor}").select("uuid", "rider", "driver", "fare", "city", "ts").filter(col("driver") == "driver-W")
display(trips_mor_df)
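
Beyond the default snapshot read, MOR tables also support a read-optimized query that scans only the base files and skips the log-file merge, trading data freshness for read speed. The sketch below is not part of the original notebook and assumes the `hoodie.datasource.query.type` option available in recent Hudi releases; records still sitting in uncompacted log files will not appear.

```python
# Read-optimized query: reads only the base files and skips the log merge,
# so it may lag behind the snapshot view until compaction runs.
ro_df = spark.read.format("hudi") \
    .option("hoodie.datasource.query.type", "read_optimized") \
    .load(f"{base_path}/{table_name_mor}")
display(ro_df.select("uuid", "rider", "driver", "fare", "city", "ts"))
```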

Deleting a Record in a MOR Table

Just like with the upsert, deleting a record in a Merge-on-Read table is handled by writing a new log file, not by rewriting the entire base file. Hudi records the delete action in the log, and the record will appear to be gone in a snapshot/real-time query.

First, let's create a DataFrame that contains the record we want to delete. We'll delete the record for rider-E (uuid-005).

python
# Create a DataFrame with the record to be deleted
delete_data = [
    ("2025-08-10 12:55:15", "uuid-005", "rider-E", "driver-V", 25.10, "san_francisco")
]
delete_columns = ["ts", "uuid", "rider", "driver", "fare", "city"]
deleteDF = spark.createDataFrame(delete_data).toDF(*delete_columns)

Now, we perform the delete operation. This will generate a new .log file in the city=san_francisco partition, which marks the record for deletion.

python
deleteDF.write \
    .format("hudi") \
    .option("hoodie.datasource.write.operation", "delete") \
    .options(**mor_hudi_conf) \
    .mode("append") \
    .save(f"{base_path}/{table_name_mor}")

Let's quickly check the file system. You should see a new .log file in the city=san_francisco partition alongside the existing Parquet base file.

python
ls(f"{base_path}/{table_name_mor}/city=san_francisco")

Finally, we'll perform a snapshot query. As you can see, the record for rider-E is no longer visible in the table, demonstrating that the delete was successful.

python
trips_mor_df = spark.read.format("hudi").load(f"{base_path}/{table_name_mor}").select("uuid", "rider", "driver", "fare", "city", "ts")
display(trips_mor_df)
python
stop_spark_session()