rfc/rfc-91/rfc-91.md
JIRA: HUDI-9122
Currently in Hudi, distributed locking relies on external systems like ZooKeeper, which add complexity and extra dependencies. This RFC introduces a storage-based implementation of the LockProvider interface that uses conditional writes in cloud storage platforms (such as GCS and AWS S3) to provide a native distributed locking mechanism for Hudi. By integrating lock management directly with cloud storage, this solution reduces operational overhead and ensures robust coordination during concurrent writes.
The existing FileSystemBasedLockProvider has a known limitation (https://github.com/apache/hudi/pull/7440/files#r1061068482), and conditional writes from the underlying file system / storage are required for a storage-based lock provider to operate correctly. Hence, we cannot leverage the existing implementation.
AWS S3 recently introduced conditional writes, and GCS and Azure Storage already support them. This RFC leverages these features to implement a distributed lock provider for Hudi using a leader election algorithm. In this approach, each process attempts an atomic conditional write to a lock file whose path is derived from the table base path. The first process to succeed is elected leader and takes charge of exclusive operations. This provides a straightforward, reliable locking mechanism without the need for an additional distributed system.
This design implements a leader election algorithm for Apache Hudi using a single lock file per table, stored in the `.hoodie` folder. Each table's lock is represented by a JSON file at a path derived from the table base path, for example:

`s3://bucket/table/.hoodie/.locks/table_lock.json`
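For illustration, here is a minimal sketch of a plausible lock-file payload; the field names are assumptions, but the design calls for a unique owner UUID and an expiration timestamp computed from System.currentTimeMillis():

```json
{
  "owner": "3f1c9a2e-7b4d-4e0a-9c1d-8a5b6c7d8e9f",
  "expiration": 1740000000000
}
```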
Each LockProvider must implement tryLock() and unlock(); however, we also need to handle our own lock renewal, so this implementation additionally exposes renewLock(). The implementation will load a service via reflection that writes to S3/GCS/Azure based on the configured lock location, which keeps the main conditional-write logic shared regardless of the underlying storage. The three methods behave as follows (a sketch follows the list):
- `tryLock()`: guarantees that only one process can acquire the lock, using a conditional write.
- `renewLock()`: periodically extends the lock's expiration (the heartbeat) to continue holding the lock if allowed.
- `unlock()`: safely releases the lock.
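A minimal sketch of how these methods could hang together; StorageBasedLockProvider and StorageLockClient are illustrative names, not the final API, and the release strategy shown (writing an already-expired payload) is one possible choice:

```java
import java.util.UUID;
import java.util.concurrent.TimeUnit;

// Illustrative names only; not the final API.
interface StorageLockClient {
  boolean createIfAbsent(String json);        // atomic create (If-None-Match / generationMatch(0))
  boolean overwriteIfUnchanged(String json);  // guarded overwrite (If-Match / generation number)
}

public class StorageBasedLockProvider {
  private final String ownerId = UUID.randomUUID().toString(); // unique owner UUID in the payload
  private final long validityMillis = TimeUnit.SECONDS.toMillis(300);
  private final StorageLockClient client; // storage-specific writer, loaded via reflection

  public StorageBasedLockProvider(StorageLockClient client) {
    this.client = client;
  }

  /** Only one process can win the conditional create of the lock file. */
  public boolean tryLock() {
    return client.createIfAbsent(payload(System.currentTimeMillis() + validityMillis));
  }

  /** Heartbeat: push the expiration forward, but only if we still own the latest file version. */
  public boolean renewLock() {
    return client.overwriteIfUnchanged(payload(System.currentTimeMillis() + validityMillis));
  }

  /** Release by writing an already-expired payload, guarded by the same precondition. */
  public void unlock() {
    client.overwriteIfUnchanged(payload(System.currentTimeMillis()));
  }

  private String payload(long expirationMillis) {
    return String.format("{\"owner\":\"%s\",\"expiration\":%d}", ownerId, expirationMillis);
  }
}
```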
Once a lock is acquired, a dedicated heartbeat task periodically calls renewLock() (typically every 30 seconds) to extend the expiration. This ensures the lock remains valid as long as the owning process (thread) is active. The heartbeat manager oversees this process, ensuring no other updates occur concurrently on the lock file. Each lock provider has one heartbeat manager with a single executor thread.
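A minimal sketch of such a heartbeat manager, assuming a hypothetical HeartbeatManager class; the single-threaded scheduler mirrors the one-executor-per-provider design:

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: one single-threaded scheduler per lock provider that
// calls renewLock() at a fixed interval while the lock is held.
public class HeartbeatManager implements AutoCloseable {
  private final ScheduledExecutorService scheduler =
      Executors.newSingleThreadScheduledExecutor(r -> {
        Thread t = new Thread(r, "hudi-lock-heartbeat");
        t.setDaemon(true); // never keep the JVM alive just for renewals
        return t;
      });

  /** Schedule periodic renewal, e.g. every 30 seconds per the poll config. */
  public void start(Runnable renewLock, long pollSecs) {
    scheduler.scheduleAtFixedRate(renewLock, pollSecs, pollSecs, TimeUnit.SECONDS);
  }

  @Override
  public void close() {
    scheduler.shutdownNow(); // stop the heartbeat once the lock is released
  }
}
```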
- `hoodie.write.lock.storage.heartbeat.poll.secs`: default 30 sec; how often to renew each lock.
- `hoodie.write.lock.storage.validity.timeout.secs`: default 300 sec (5 min); how long each lock is valid.
The provider also requires `hoodie.base.path`; if it is not set, initialization should fail. The heartbeat interval should always be at least a factor of 10 smaller than the validity timeout, so that enough renewal attempts can occur before the lock expires.
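A minimal sketch of these checks, using plain java.util.Properties for illustration (Hudi's own typed config classes would be used in practice):

```java
import java.util.Properties;

// Hypothetical sketch of the config constraints described above.
public final class LockConfigValidator {
  public static void validate(Properties props) {
    String basePath = props.getProperty("hoodie.base.path");
    if (basePath == null || basePath.isEmpty()) {
      throw new IllegalArgumentException(
          "hoodie.base.path must be set for the storage-based lock provider");
    }
    long pollSecs = Long.parseLong(
        props.getProperty("hoodie.write.lock.storage.heartbeat.poll.secs", "30"));
    long validitySecs = Long.parseLong(
        props.getProperty("hoodie.write.lock.storage.validity.timeout.secs", "300"));
    // The heartbeat must be at least a factor of 10 smaller than the validity timeout.
    if (pollSecs * 10 > validitySecs) {
      throw new IllegalArgumentException(
          "heartbeat poll interval must be at least 10x smaller than the validity timeout");
    }
  }
}
```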
We will make the conditional write implementation pluggable so that each cloud provider's conditional write logic can be added independently. For libraries like Hadoop and OpenDAL, conditional writes are close to being supported in Java but are not available at this time, so we will default to the cloud providers' client libraries.
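A minimal sketch of the pluggable loading, reusing the illustrative StorageLockClient interface from the earlier sketch; the factory and implementation class names are hypothetical:

```java
// Hypothetical sketch: pick a storage-specific conditional-write client by URI
// scheme and instantiate it via reflection, so the shared lock logic never links
// directly against any single cloud SDK.
public final class LockClientFactory {
  public static StorageLockClient create(String lockFileUri) throws Exception {
    String impl;
    if (lockFileUri.startsWith("s3://")) {
      impl = "org.example.S3ConditionalWriter";     // illustrative class names
    } else if (lockFileUri.startsWith("gs://")) {
      impl = "org.example.GcsConditionalWriter";
    } else {
      impl = "org.example.AzureConditionalWriter";
    }
    return (StorageLockClient) Class.forName(impl)
        .getDeclaredConstructor(String.class)
        .newInstance(lockFileUri);
  }
}
```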
When we create the new lock file in tryLock(), we will use the If-None-Match precondition, which tells S3 to accept the PUT only if no object already exists at that key.
ETags are unique hashes of the contents of the object. Since our payload contains a unique owner UUID, and the expiration (calculated from System.currentTimeMillis()) changes across requests from the same node, the ETag changes on every write; a conditional request carrying a stale ETag fails with 412 Precondition Failed instead of succeeding.
When we overwrite an existing lock file in any of the methods, we will use the If-Match precondition with the ETag from our last successful write, so the overwrite succeeds only if the file has not changed in the meantime.
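A minimal sketch of both preconditions using the AWS SDK for Java v2, assuming a recent SDK version that exposes ifNoneMatch/ifMatch on PutObjectRequest; the class and field names are illustrative:

```java
import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.services.s3.model.PutObjectResponse;
import software.amazon.awssdk.services.s3.model.S3Exception;

// Hypothetical sketch of the S3-backed conditional writer.
public class S3ConditionalWriter {
  private final S3Client s3;
  private final String bucket;
  private final String key;
  private String lastEtag; // ETag from our most recent successful write

  S3ConditionalWriter(S3Client s3, String bucket, String key) {
    this.s3 = s3;
    this.bucket = bucket;
    this.key = key;
  }

  /** Create the lock file only if no object exists at the key (If-None-Match: *). */
  boolean createIfAbsent(String json) {
    return put(PutObjectRequest.builder().bucket(bucket).key(key).ifNoneMatch("*").build(), json);
  }

  /** Overwrite only if the object still matches the ETag we last saw (If-Match). */
  boolean overwriteIfUnchanged(String json) {
    return put(PutObjectRequest.builder().bucket(bucket).key(key).ifMatch(lastEtag).build(), json);
  }

  private boolean put(PutObjectRequest request, String json) {
    try {
      PutObjectResponse resp = s3.putObject(request, RequestBody.fromString(json));
      lastEtag = resp.eTag();
      return true;
    } catch (S3Exception e) {
      if (e.statusCode() == 412) { // precondition failed: we lost the race or the file changed
        return false;
      }
      throw e;
    }
  }
}
```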
GCS has ETags, but it also has generation numbers, which are even stronger and cover more use cases. Our current implementation already uses them, so they do not need further validation.
When we create the new lock file in tryLock(), we will use a generationMatch(0) precondition, which tells GCS to accept the write only if the object does not yet exist.
We can apply the same precondition logic to overwrite operations by matching on the currently stored lock file's generation number.
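A minimal sketch using the google-cloud-storage Java client; the class name is illustrative, and doesNotExist() is the client-library equivalent of an ifGenerationMatch=0 precondition:

```java
import com.google.cloud.storage.Blob;
import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.BlobInfo;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageException;
import com.google.cloud.storage.StorageOptions;
import java.nio.charset.StandardCharsets;

// Hypothetical sketch of the GCS-backed conditional writer.
public class GcsConditionalWriter {
  private final Storage storage = StorageOptions.getDefaultInstance().getService();
  private final BlobId blobId;
  private long lastGeneration; // generation of our most recent successful write

  GcsConditionalWriter(String bucket, String object) {
    this.blobId = BlobId.of(bucket, object);
  }

  /** Create the lock file only if it does not already exist (ifGenerationMatch=0). */
  boolean createIfAbsent(String json) {
    try {
      Blob blob = storage.create(
          BlobInfo.newBuilder(blobId).build(),
          json.getBytes(StandardCharsets.UTF_8),
          Storage.BlobTargetOption.doesNotExist()); // equivalent to generationMatch(0)
      lastGeneration = blob.getGeneration();
      return true;
    } catch (StorageException e) {
      if (e.getCode() == 412) { // precondition failed: the lock file already exists
        return false;
      }
      throw e;
    }
  }

  /** Overwrite only if the object's generation has not changed since we wrote it. */
  boolean overwriteIfUnchanged(String json) {
    try {
      Blob blob = storage.create(
          BlobInfo.newBuilder(BlobId.of(blobId.getBucket(), blobId.getName(), lastGeneration)).build(),
          json.getBytes(StandardCharsets.UTF_8),
          Storage.BlobTargetOption.generationMatch()); // match the generation in the BlobId
      lastGeneration = blob.getGeneration();
      return true;
    } catch (StorageException e) {
      if (e.getCode() == 412) { // precondition failed: another writer changed the file
        return false;
      }
      throw e;
    }
  }
}
```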
We can write normal JUnit tests using Testcontainers with GCS and S3 to simulate edge cases and general contention. Further ad hoc testing will include the following scenarios:
We will add high-contention, high-usage unit tests that spawn hundreds of threads attempting to acquire locks simultaneously against the test containers, simulating load and contention. We can also use thread-unsafe structures like ArrayLists to verify that concurrent modifications do not occur.
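A minimal sketch of such a contention test, reusing the illustrative StorageBasedLockProvider from the earlier sketch; the abstract base would be subclassed once per storage backend, with the Testcontainers wiring omitted:

```java
import static org.junit.jupiter.api.Assertions.assertEquals;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.jupiter.api.Test;

// Hypothetical sketch: each backend-specific subclass supplies a provider
// wired to its Testcontainers-emulated bucket.
public abstract class LockContentionTestBase {

  /** Build a provider pointed at the container-backed bucket. */
  protected abstract StorageBasedLockProvider newProvider();

  @Test
  public void onlyOneThreadAcquiresTheLock() throws Exception {
    int threads = 200;
    ExecutorService pool = Executors.newFixedThreadPool(threads);
    CountDownLatch start = new CountDownLatch(1);
    AtomicInteger acquired = new AtomicInteger();
    List<Integer> winners = new ArrayList<>(); // deliberately not thread-safe

    for (int i = 0; i < threads; i++) {
      final int id = i;
      pool.submit(() -> {
        start.await(); // release every thread at once to maximize contention
        if (newProvider().tryLock()) {
          acquired.incrementAndGet();
          winners.add(id); // only the single winner should ever reach this line
        }
        return null;
      });
    }
    start.countDown();
    pool.shutdown();
    pool.awaitTermination(60, TimeUnit.SECONDS);

    assertEquals(1, acquired.get()); // exactly one thread may win the lock
    assertEquals(1, winners.size());
  }
}
```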
Run a long-running streaming ingestion process that continuously performs inserts, updates, and deletes. Ensure that frequent commits occur while table services like compaction and clustering operate concurrently. This test will help verify that the lock provider can handle overlapping operations without causing excessive delays or lock contention.
While the streaming ingestion is active, execute multiple Spark jobs and SQL operations (including inserts, updates, and deletes) against the same Hudi table. This scenario is designed to simulate a mixed workload and to confirm that the lock provider maintains a stable baseline commit latency, prevents deadlocks, and handles high levels of concurrency without impacting overall performance.
Initiate one or more continuous streaming processes that run for an extended period (a few days). Monitor these processes for issues such as connection leaks, resource exhaustion, or performance degradation over time. Periodic consistency checks during this test will ensure that the data remains intact and that commit operations continue to perform reliably.
After running the above tests, perform validation queries to verify that key fields and preCombine values remain consistent throughout the ingestion process. This step ensures that the lock provider does not introduce any data discrepancies, even under heavy commit loads and concurrent operations.
Throughout all tests, track key performance metrics such as commit latency, throughput, and lock wait times. Monitoring resource utilization (CPU, memory, and network usage) is also essential to determine if the lock provider introduces any significant overhead or bottlenecks.