hudi-notebooks/notebooks/01-crud-operations.ipynb
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
This notebook is a practical guide to performing CRUD (Create, Read, Update, Delete) operations on an Apache Hudi table using PySpark. We'll use MinIO as our S3-compatible storage backend, a common pattern in modern data lake architectures. We will work with both Copy-On-Write (COW) and Merge-On-Read (MOR) tables.
We begin by loading the utils.py file, which contains the necessary imports and functions to start a SparkSession.
%run utils.py
spark = get_spark_session("Hudi CRUD Operations")
This is our initial dataset for our Hudi table. It's a list of ride records with columns for timestamp, a unique ID, rider and driver names, the fare, and the city.
columns = ["ts", "uuid", "rider", "driver", "fare", "city"]
data = [
("2025-08-10 08:15:30", "uuid-001", "rider-A", "driver-X", 18.50, "new_york"),
("2025-08-10 09:22:10", "uuid-002", "rider-B", "driver-Y", 22.75, "san_francisco"),
("2025-08-10 10:05:45", "uuid-003", "rider-C", "driver-Z", 14.60, "chicago"),
("2025-08-10 11:40:00", "uuid-004", "rider-D", "driver-W", 31.90, "new_york"),
("2025-08-10 12:55:15", "uuid-005", "rider-E", "driver-V", 25.10, "san_francisco"),
("2025-08-10 13:20:35", "uuid-006", "rider-F", "driver-U", 19.80, "chicago"),
("2025-08-10 14:10:05", "uuid-007", "rider-G", "driver-T", 28.45, "san_francisco"),
("2025-08-10 15:00:20", "uuid-008", "rider-H", "driver-S", 16.25, "new_york"),
("2025-08-10 15:45:50", "uuid-009", "rider-I", "driver-R", 24.35, "chicago"),
("2025-08-10 16:30:00", "uuid-010", "rider-J", "driver-Q", 20.00, "new_york"),
]
First, we are creating a PySpark DataFrame from our sample data. Let's take a quick look at the data to see what we're starting with.
inputDF = spark.createDataFrame(data).toDF(*columns)
display(inputDF)
Hudi offers two primary table types. The choice between them depends on your specific needs, balancing write performance against read performance.
Copy-on-Write (COW) Tables: In a COW table, each time you update or delete a record, Hudi rewrites the entire data file that contains that record. This approach makes reads very fast because the data sits in clean, columnar base files (such as Parquet). However, it can be more resource-intensive for frequent updates because of all the file rewriting.
Merge-on-Read (MOR) Tables: With a MOR table, updates are handled differently. Instead of rewriting the entire data file, Hudi writes the new or updated records to a smaller, row-based log file. When you query the table, Hudi merges these log files with the main data files on the fly. This makes write operations much faster, but it can make reads slightly slower as Hudi has to perform the merge during the query.
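To make the trade-off concrete, here is a toy model in plain Python (not Hudi code, and the numbers are illustrative): in this simplified view, a COW update rewrites every record in the touched file, while a MOR update only appends the changed record to a log file.

```python
# Toy model of the COW vs MOR write-path trade-off. This is a deliberate
# simplification: real Hudi writes are more nuanced (file sizing, compaction,
# indexing), but the asymmetry below is the core idea.

def cow_records_written(file_records: int, updates: int) -> int:
    # COW: each update rewrites the whole data file containing the record.
    return file_records * updates

def mor_records_written(file_records: int, updates: int) -> int:
    # MOR: each update appends a single record to a row-based log file.
    return updates

# 10 single-record updates against a 1,000,000-record file:
print(cow_records_written(1_000_000, 10))  # heavy write amplification
print(mor_records_written(1_000_000, 10))  # minimal write cost
```

The read side mirrors this: COW readers scan clean Parquet files directly, while MOR readers pay a merge cost to combine base and log files at query time.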
We will now explore the Copy-on-Write (COW) storage type. As discussed earlier, in a COW table each time data is updated or deleted in a file, Hudi rewrites the entire file with the new data. This is a simpler and more traditional approach. Copy-on-Write (COW) is the default table type in Apache Hudi.
Next, we will set up the specific configuration for our Hudi table. We'll use uuid as our unique record key and partition the data by city to keep it organized. The ts (timestamp) field is our precombine key, which helps Hudi decide which record to keep if it finds duplicates.
table_name_cow = "trips_table_cow"
base_path = "s3a://warehouse/hudi-crud-cow"
cow_hudi_conf = {
"hoodie.table.name": table_name_cow, # The name of our Hudi table.
"hoodie.datasource.write.recordkey.field": "uuid", # The column that acts as the unique identifier for each record.
"hoodie.datasource.write.table.type": "COPY_ON_WRITE", # Hudi uses Copy-on-Write as the default table type, but we are being explicit here.
"hoodie.datasource.write.partitionpath.field": "city", # The column Hudi uses to partition the data on storage.
"hoodie.datasource.write.precombine.field": "ts", # The field used to deduplicate records when a conflict occurs.
"hoodie.table.cdc.enabled": "true",
"hoodie.datasource.write.hive_style_partitioning": "true" # This ensures partition directories are named like `city=new_york`.
}
This is the "Create" part of our CRUD operations. We are writing our initial DataFrame to MinIO as a Hudi table. Using mode("overwrite") ensures that we start with a fresh table every time we run to ensure a clean start.
# Write the DataFrame to a Hudi COW table
# The default operation is "upsert" if this is not specified.
inputDF.write \
.format("hudi") \
.option("hoodie.datasource.write.operation", "insert") \
.options(**cow_hudi_conf) \
.mode("overwrite") \
.save(f"{base_path}/{table_name_cow}")
Listing the table's base path shows two main types of content: the partition directories that hold the data files, and the .hoodie directory that holds the table's metadata.
ls(f"{base_path}/{table_name_cow}")
Hudi manages a table's metadata by storing it in a special directory within the base path. This metadata helps ensure that all tools reading and writing to the table follow the same rules. One of the key files is hoodie.properties, which acts like a configuration file for the table, holding details like table name and version.
ls(f"{base_path}/{table_name_cow}/.hoodie")
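Since hoodie.properties is a plain Java-properties file (key=value lines, # comments), its contents can be inspected with a few lines of Python. This is an illustrative sketch, not a Hudi API, and the sample contents below are assumed values for demonstration:

```python
# Hypothetical hoodie.properties contents; the keys are real Hudi table
# properties, but the values here are illustrative.
sample = """#Table properties, updated at 2025-08-10
hoodie.table.name=trips_table_cow
hoodie.table.type=COPY_ON_WRITE
hoodie.table.recordkey.fields=uuid
"""

props = {}
for line in sample.splitlines():
    line = line.strip()
    if line and not line.startswith("#"):     # skip blanks and comments
        key, _, value = line.partition("=")   # split on the first '='
        props[key] = value

print(props["hoodie.table.name"])
print(props["hoodie.table.type"])
```

In practice you would read the file from object storage rather than a string literal, but the parsing is the same.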
Another crucial part of this metadata is the Hudi Timeline, which consists of small files that log every change to the table. These meta-files follow the naming pattern below:
[action timestamp].[action type].[action state]
ls(f"{base_path}/{table_name_cow}/.hoodie/timeline")
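As a rough illustration of the naming pattern above, here is a tiny plain-Python parser (not a Hudi API) that splits a timeline filename into its parts. It assumes, as in the listings above, that completed actions omit the state suffix (e.g. a bare `.commit` file) while pending ones carry it (e.g. `.commit.requested`):

```python
def parse_timeline_file(name: str):
    """Illustrative parser for [action timestamp].[action type].[action state].

    Assumption: a two-part name (timestamp.action) denotes a completed
    action; a three-part name carries an explicit state such as
    'requested' or 'inflight'.
    """
    parts = name.split(".")
    if len(parts) == 2:            # completed action: [timestamp].[action]
        ts, action = parts
        return ts, action, "completed"
    ts, action, state = parts      # pending action: [timestamp].[action].[state]
    return ts, action, state

print(parse_timeline_file("20250810121530000.commit"))
print(parse_timeline_file("20250810121530000.commit.requested"))
```

Scanning these filenames in timestamp order is essentially how readers reconstruct the table's history of commits.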
Hudi organizes its physical data into two core types of files: Base Files and Log Files. You can think of a Base File as a traditional, organized table of data (often in a format like Parquet), while Log Files are where Hudi stores new changes and updates to that data.
A Base File contains the main stored records of a Hudi table and is optimized for reads. A Log File contains changes made on top of its associated Base File and is optimized for writes. Within a partition path of a Hudi table, a single Base File and its associated Log Files (in the case of a MOR table) are grouped together as a File Slice.
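The grouping idea can be sketched in plain Python (not a Hudi API). The filenames below are simplified stand-ins for Hudi's real naming scheme, where each file name starts with its file group ID and log files are hidden dot-files:

```python
from collections import defaultdict

# Simplified stand-in filenames: "fg-1" and "fg-2" play the role of file
# group IDs; real Hudi file IDs are UUIDs and the naming carries more parts.
files = [
    "fg-1_0-1-0_20250810100000.parquet",   # base file of file group fg-1
    ".fg-1_20250810100000.log.1",          # log file with updates for fg-1
    "fg-2_0-1-0_20250810100000.parquet",   # base file of file group fg-2
]

slices = defaultdict(lambda: {"base": None, "logs": []})
for f in files:
    if f.startswith("."):                  # hidden dot-files are log files
        file_id = f[1:].split("_")[0]
        slices[file_id]["logs"].append(f)
    else:                                  # everything else is a base file
        file_id = f.split("_")[0]
        slices[file_id]["base"] = f

for file_id, s in sorted(slices.items()):
    print(file_id, s["base"], len(s["logs"]))
```

A snapshot read of a MOR table conceptually does this grouping, then merges each base file with its log files on the fly.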
ls(f"{base_path}/{table_name_cow}/city=san_francisco")
To easily query our newly created Hudi table, we first need to register it as a temporary SQL view. After that, we'll run a quick command to list all tables and confirm that it's ready to use.
spark.read.format("hudi").load(f"{base_path}/{table_name_cow}").createOrReplaceTempView(f"{table_name_cow}")
display(spark.sql("show tables"))
Now that we have the temporary view, let's query the COW table to get a snapshot of all records and display the result.
trips_cow_df = spark.sql("select _hoodie_commit_time, _hoodie_file_name, ts, uuid, rider, driver, fare, city from trips_table_cow")
display(trips_cow_df)
Hudi's upsert is a powerful feature that allows us to insert new records or update existing ones. We are going to update the fare for 'rider-G' by multiplying it by 10. Here, we are showing the updated record before we apply the change to the table.
from pyspark.sql.functions import col
updatesDF = spark.read.format("hudi").load(f"{base_path}/{table_name_cow}").filter(col("rider") == "rider-G").withColumn("fare", col("fare") * 10)
display(updatesDF.select("uuid", "rider", "driver", "fare", "city", "ts"))
Now, we'll perform the upsert. Because uuid is our record key, Hudi knows to find the original record and replace it with our new, updated one.
updatesDF.write \
.format("hudi") \
.option("hoodie.datasource.write.operation", "upsert") \
.options(**cow_hudi_conf) \
.mode("append") \
.save(f"{base_path}/{table_name_cow}")
Let's check the files in the san_francisco partition to see what happened after the upsert. Since this is a Copy-on-Write table, Hudi didn't just modify the existing record in place. Instead, it created a brand new Parquet file containing the updated record and all the other records for that partition.
ls(f"{base_path}/{table_name_cow}/city=san_francisco")
Let's also query the Hudi table to confirm that the record was updated. Compare the fare value and the commit time as well.
spark.sql(f"REFRESH TABLE {table_name_cow}")
trips_cow_read = spark.sql(f"select _hoodie_commit_time, _hoodie_file_name, uuid, rider, driver, fare, city, ts from {table_name_cow} where city = 'san_francisco'")
display(trips_cow_read)
If you look closely at the file list, you'll see a new Parquet file in the san_francisco partition. This is because Hudi's Copy-on-Write mechanism rewrites the entire file when a record is updated. You can also see that the _hoodie_commit_time has changed specifically for uuid-007, confirming that only this record was part of our latest update (fare = 284.50).
A key feature of Hudi is its ability to handle duplicate records automatically. Hudi uses the precombine.field to decide which record to keep when it encounters two or more records with the same recordkey in a single write operation. In our configuration, the precombine.field is set to ts, so Hudi will keep the record with the latest timestamp.
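The precombine rule can be captured in a few lines of plain Python (a minimal sketch, not Hudi's actual implementation): among records sharing a record key, keep the one with the largest precombine field, here `ts`.

```python
# Records mirroring the duplicate scenario: two rows share uuid-001 with
# different timestamps, plus one fresh record.
records = [
    {"ts": "2025-08-10 08:15:30", "uuid": "uuid-001", "fare": 25.50},
    {"ts": "2025-08-10 17:00:00", "uuid": "uuid-001", "fare": 30.00},
    {"ts": "2025-08-11 07:45:00", "uuid": "uuid-012", "fare": 12.25},
]

deduped = {}
for rec in records:
    key = rec["uuid"]
    # Lexicographic string comparison is safe here because the timestamps
    # all share the fixed "YYYY-MM-DD HH:MM:SS" format.
    if key not in deduped or rec["ts"] > deduped[key]["ts"]:
        deduped[key] = rec

print(sorted((r["uuid"], r["fare"]) for r in deduped.values()))
```

Only the 17:00:00 row survives for uuid-001, which is exactly what the upsert below will show at the table level.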
Let's create a new DataFrame with some duplicate data to see this in action. We'll add two records with the same uuid (uuid-001) but with different ts values. The second record has a later timestamp and a higher fare.
from pyspark.sql.functions import lit
duplicate_data = [
("2025-08-10 08:15:30", "uuid-001", "rider-A", "driver-Z", 25.50, "new_york"), # A duplicate record with an old timestamp
("2025-08-10 17:00:00", "uuid-001", "rider-A", "driver-A", 30.00, "new_york"), # Another duplicate record with a new timestamp
("2025-08-11 07:45:00", "uuid-012", "rider-L", "driver-T", 12.25, "chicago") # A new record
]
duplicate_columns = ["ts", "uuid", "rider", "driver", "fare", "city"]
duplicatesDF = spark.createDataFrame(duplicate_data).toDF(*duplicate_columns)
display(duplicatesDF)
Now, let's upsert this data into our COW table. Hudi will process the duplicate records for uuid-001 and, based on our precombine.field (ts), it will only keep the record with the later timestamp.
duplicatesDF.write \
.format("hudi") \
.option("hoodie.datasource.write.operation", "upsert") \
.options(**cow_hudi_conf) \
.mode("append") \
.save(f"{base_path}/{table_name_cow}")
Finally, we'll query the table to see the result. As you can see in the output, only one record for uuid-001 exists, and it's the one with the latest timestamp (2025-08-10 17:00:00). The record with the older timestamp was discarded, and the new record for uuid-012 was successfully inserted.
spark.sql(f"REFRESH TABLE {table_name_cow}")
trips_cow_df_dedup = spark.sql(f"select _hoodie_commit_time, _hoodie_file_name, uuid, rider, driver, fare, city, ts from {table_name_cow} where uuid = 'uuid-001' or uuid = 'uuid-012'")
display(trips_cow_df_dedup)
Finally, let's demonstrate how to delete a record. We'll create a DataFrame containing the record we want to delete: the one for rider-G (uuid-007).
# Create a DataFrame with the record to be deleted
delete_data = [
("2025-08-10 14:10:05", "uuid-007", "rider-G", "driver-T", 28.45, "san_francisco")
]
delete_columns = ["ts", "uuid", "rider", "driver", "fare", "city"]
deleteDF = spark.createDataFrame(delete_data).toDF(*delete_columns)
deleteDF.write \
.format("hudi") \
.option("hoodie.datasource.write.operation", "delete") \
.options(**cow_hudi_conf) \
.mode("append") \
.save(f"{base_path}/{table_name_cow}")
Running a snapshot query now confirms that the record for 'rider-G' (uuid-007) is no longer present in our table.
spark.sql(f"REFRESH TABLE {table_name_cow}")
trips_cow_df = spark.sql(f"select _hoodie_commit_time, _hoodie_file_name, uuid, rider, driver, fare, city, ts from {table_name_cow}")
display(trips_cow_df)
For comparison, let's explore the Merge-on-Read (MOR) table type. In a MOR table, updates are written to a separate log file, which is then merged with the base data files when you read the table.
Here's the configuration for our MOR table.
table_name_mor = "trips_table_mor"
base_path = "s3a://warehouse/hudi-crud-mor"
mor_hudi_conf = {
"hoodie.table.name": table_name_mor,
"hoodie.datasource.write.recordkey.field": "uuid",
"hoodie.datasource.write.table.type": "MERGE_ON_READ",
"hoodie.datasource.write.partitionpath.field": "city",
"hoodie.datasource.write.precombine.field": "ts",
"hoodie.datasource.write.hive_style_partitioning": "true"
}
First, let's insert our initial dataset into our new Merge-on-Read (MOR) table. Just like with the COW table, this initial insert creates the table and its partitions.
# Write the DataFrame to a Hudi MOR table
inputDF.write \
.format("hudi") \
.option("hoodie.datasource.write.operation", "insert") \
.options(**mor_hudi_conf) \
.mode("overwrite") \
.save(f"{base_path}/{table_name_mor}")
Let's inspect the file system after this fresh insert. Since there are no updates yet, Hudi has created a clean Parquet base file in each partition, just like it did for our COW table.
ls(f"{base_path}/{table_name_mor}")
ls(f"{base_path}/{table_name_mor}/city=new_york")
After the fresh insert, let's query our new MOR table to confirm that all records were written correctly. Hudi's default read mode automatically gives us a full snapshot of the table.
# First, make sure the table is registered as a temporary view
spark.read.format("hudi").load(f"{base_path}/{table_name_mor}").createOrReplaceTempView("trips_table_mor")
# Now, query the table using SQL to see the inserted records
trips_mor_df = spark.sql("SELECT _hoodie_commit_time, _hoodie_file_name, uuid, rider, driver, fare, city, ts FROM trips_table_mor")
display(trips_mor_df)
Let's update a record in our MOR table to see how it affects our read modes. We'll find the record for 'driver-W' and double its fare.
from pyspark.sql.functions import col
updatesDF = spark.read.format("hudi").load(f"{base_path}/{table_name_mor}").filter(col("driver") == "driver-W").withColumn("fare", col("fare") * 2)
display(updatesDF.select("uuid", "rider", "driver", "fare", "city", "ts"))
Now, we'll perform the upsert. This is where the magic of Merge-on-Read happens! Instead of rewriting the entire Parquet file, Hudi will write a new log file containing just our updated record.
updatesDF.write \
.format("hudi") \
.option("hoodie.datasource.write.operation", "upsert") \
.options(**mor_hudi_conf) \
.mode("append") \
.save(f"{base_path}/{table_name_mor}")
Let's inspect the file system after this update operation.
ls(f"{base_path}/{table_name_mor}/city=new_york")
Now query the MOR table to confirm that the record has been updated correctly.
spark.sql(f"REFRESH TABLE {table_name_mor}")
trips_mor_read = spark.sql(f"select _hoodie_commit_time, _hoodie_file_name, uuid, rider, driver, fare, city, ts from {table_name_mor} where city = 'new_york'")
display(trips_mor_read)
trips_mor_df = spark.read.format("hudi").load(f"{base_path}/{table_name_mor}").select("uuid", "rider", "driver", "fare", "city", "ts").filter(col("driver") == "driver-W")
display(trips_mor_df)
Just like with the upsert, deleting a record in a Merge-on-Read table is handled by writing a new log file, not by rewriting the entire base file. Hudi records the delete action in the log, and the record will appear to be gone in a snapshot/real-time query.
First, let's create a DataFrame that contains the record we want to delete. We'll delete the record for rider-E (uuid-005).
# Create a DataFrame with the record to be deleted
delete_data = [
("2025-08-10 12:55:15", "uuid-005", "rider-E", "driver-V", 25.10, "san_francisco")
]
delete_columns = ["ts", "uuid", "rider", "driver", "fare", "city"]
deleteDF = spark.createDataFrame(delete_data).toDF(*delete_columns)
Now, we perform the delete operation. This will generate a new .log file in the city=san_francisco partition, which marks the record for deletion.
deleteDF.write \
.format("hudi") \
.option("hoodie.datasource.write.operation", "delete") \
.options(**mor_hudi_conf) \
.mode("append") \
.save(f"{base_path}/{table_name_mor}")
Let's quickly check the file system. You should see a new .log file in the city=san_francisco partition alongside the existing Parquet and other log files.
ls(f"{base_path}/{table_name_mor}/city=san_francisco")
Finally, we'll perform a snapshot query. As you can see, the record for rider-E is no longer visible in the table, demonstrating that the delete was successful.
trips_mor_df = spark.read.format("hudi").load(f"{base_path}/{table_name_mor}").select("uuid", "rider", "driver", "fare", "city", "ts")
display(trips_mor_df)
stop_spark_session()