pip/pip-352.md
Pulsar Topic Compaction provides a key-based data retention mechanism that keeps only the most recent message for each key, reducing storage space and improving system efficiency.
Another internal use case in Pulsar, the topic compaction used by the new load balancer, changed the compaction strategy: it keeps only the first value for each key. For more details, see PIP-215.
There is also a pluggable topic compaction service. For more details, see PIP-278.
More topic compaction details can be found in Pulsar Topic Compaction.
Currently, there are two types of compactors
available: TwoPhaseCompactor and StrategicTwoPhaseCompactor. The latter
is specifically utilized for internal load balancing purposes and is not
employed for regular compaction of Pulsar topics. The former, by contrast,
can be configured via CompactionServiceFactory in
broker.conf.
I believe it could be advantageous to introduce another type of topic compactor that operates based on event time. Such a compactor would retain, for each key, the message with the latest event time, preserving the order expected by external applications. Although applications may send messages stamped with the current event time, variations in network conditions or redeliveries can cause messages to be stored in the Pulsar topic in a different order than intended. Event time-based checks during compaction could mitigate this problem.
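To make the problem concrete, here is a minimal sketch of publish-order compaction applied to messages that were stored out of order. The `Msg` record and `compactByPublishOrder` method are hypothetical simplifications for illustration only; they are not Pulsar API types and do not reflect how the broker stores messages.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class PublishOrderCompactionDemo {
    // Hypothetical, simplified message: not a Pulsar API type.
    record Msg(String key, long eventTime, String value) {}

    // Publish-order compaction: the last message stored for a key wins,
    // regardless of its event time.
    static Map<String, Msg> compactByPublishOrder(List<Msg> stored) {
        Map<String, Msg> latest = new LinkedHashMap<>();
        for (Msg m : stored) {
            latest.put(m.key(), m);
        }
        return latest;
    }

    public static void main(String[] args) {
        // The update with event time 200 reaches the topic first; the older
        // update (event time 100) is redelivered and stored after it.
        List<Msg> stored = List.of(
                new Msg("sensor-1", 200L, "new"),
                new Msg("sensor-1", 100L, "old"));
        System.out.println(compactByPublishOrder(stored).get("sensor-1"));
    }
}
```

In this sketch the retained message for `sensor-1` is the one with value `old`, even though its event time is lower. This is the situation an event time-based compactor is meant to avoid.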
Abstract TwoPhaseCompactor
Migrate the current implementation to a new abstraction
Introduce new compactor based on event time
Make existing tests compatible with the new implementations.
To change the way a topic is compacted, we need to create EventTimeCompactionServiceFactory. This service provides a new
compactor, EventTimeOrderCompactor, whose logic is similar to the existing TwoPhaseCompactor, with a slight change in the algorithm responsible for
deciding which message is outdated.
The new compaction service factory can be enabled via compactionServiceFactoryClassName.
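The split between shared compaction logic and the "which message is outdated" decision could look roughly like the sketch below. It is an illustration under stated assumptions, not the proposed implementation: `AbstractCompactorSketch`, `isOutdated`, `selectLatest`, and the fields of `MessageCompactionData` are hypothetical, and the real compactors work on ledger entries in two phases rather than on an in-memory list. The tie-breaking rule (on equal event time, the later-published message wins) is also an assumption.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class CompactorSketch {
    // Hypothetical holder for compaction-related data; the field set is an assumption.
    record MessageCompactionData(String key, long position, long eventTime) {}

    // Shared selection logic; subclasses only decide whether a message is outdated.
    abstract static class AbstractCompactorSketch {
        protected abstract boolean isOutdated(MessageCompactionData candidate,
                                              MessageCompactionData retained);

        // Messages are visited in publish order; one message is retained per key.
        Map<String, MessageCompactionData> selectLatest(List<MessageCompactionData> messages) {
            Map<String, MessageCompactionData> latestForKey = new HashMap<>();
            for (MessageCompactionData m : messages) {
                MessageCompactionData retained = latestForKey.get(m.key());
                if (retained == null || !isOutdated(m, retained)) {
                    latestForKey.put(m.key(), m);
                }
            }
            return latestForKey;
        }
    }

    // Current behavior: a later-published message always replaces the retained one.
    static class PublishingOrderCompactor extends AbstractCompactorSketch {
        @Override
        protected boolean isOutdated(MessageCompactionData candidate,
                                     MessageCompactionData retained) {
            return false;
        }
    }

    // Event-time behavior: a message with a lower event time than the retained one is dropped.
    static class EventTimeOrderCompactor extends AbstractCompactorSketch {
        @Override
        protected boolean isOutdated(MessageCompactionData candidate,
                                     MessageCompactionData retained) {
            return candidate.eventTime() < retained.eventTime();
        }
    }

    public static void main(String[] args) {
        // Position 0 carries the newer event time; position 1 arrived later but is older.
        List<MessageCompactionData> messages = List.of(
                new MessageCompactionData("k", 0L, 200L),
                new MessageCompactionData("k", 1L, 100L));
        System.out.println(new PublishingOrderCompactor().selectLatest(messages).get("k"));
        System.out.println(new EventTimeOrderCompactor().selectLatest(messages).get("k"));
    }
}
```

With this input, the publishing-order variant retains the message at position 1, while the event-time variant retains the message at position 0. Keeping the decision in a single overridable method means the rest of the compaction flow stays identical between the two compactors.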
Abstract TwoPhaseCompactor and move current logic to new PublishingOrderCompactor
Implement EventTimeCompactionServiceFactory and EventTimeOrderCompactor
Create MessageCompactionData as a holder for compaction related data
Example implementation can be found here