The Pyroscope ingestion pipeline is designed to gather data in memory as small segments, which are periodically flushed to object storage, while the corresponding metadata entries are added to the metastore index. Depending on the configuration and deployment scale, the number of segments created per second can increase significantly, reaching millions of objects per hour or day. This can degrade query performance due to the high read amplification caused by the large number of small segments. In addition, a high number of metadata entries can degrade performance across the entire cluster, impacting the write path as well.
The new background compaction process helps mitigate this by merging small segments into larger ones, aiming to reduce the number of objects a query needs to fetch from object storage.
The compaction service is responsible for planning compaction jobs, scheduling their execution, and updating the metastore index with the results. The compaction service resides within the metastore component, while the compaction worker is a separate service designed to scale horizontally.
The compaction service relies on the Raft protocol to guarantee consistency across the replicas. The diagram below illustrates the interaction between the compaction worker and the compaction service: workers poll the service on a regular basis to request new compaction jobs and report status updates.
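A status update is processed by the leader node in two steps, each of which is a Raft command committed to the log:

1. Preparation (`GetCompactionPlanUpdate`): the leader reads the FSM state in a critical section and prepares a compaction plan update. The update covers job status changes based on the worker's report, job assignments, and new jobs if workers have enough capacity.
2. Update (`UpdateCompactionPlan`): the prepared plan update is applied to the FSM state in a second critical section. This updates the schedule, the planner queue, and the metadata index on every replica.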
Critical sections are guaranteed to be executed serially in the context of the Raft state machine and by the same leader (within the same term), and atomically from the cluster's perspective. If the prepared compaction plan update is not accepted by the Raft log, the plan update is discarded, and the new leader will propose a new plan.
The two-step process ensures that all the replicas use the same compaction plan, regardless of their internal state, as long as the replicas can apply the `UpdateCompactionPlan` change. This holds even if the compaction algorithm (the `GetCompactionPlanUpdate` step) differs across replicas during an ongoing migration, such as a version upgrade or downgrade.
As of now, both steps are committed to the Raft log. As an optimization, the first step (preparation) could be implemented as a linearizable read through Read Index, which is already used in metadata queries. This would avoid unnecessary replication of a read-only operation. The metadata index cleaner already takes this approach: a leader read followed by a proposal. Unlike cleanup, though, compaction is a more complex operation, and the serialization guarantees provided by the Raft command execution flow help avoid many potential issues with concurrent read/write access.
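A toy model illustrates why the two-step process tolerates version skew. The read step produces a plain data value, and only that value is replicated and applied. The sketch below makes several assumptions:

- `FSM`, `PlanUpdate`, `prepare`, and `apply` are hypothetical names, not the Pyroscope types.
- Raft itself is elided, and the update is simply passed to each replica.

```go
package main

import "fmt"

// JobState is a hypothetical, simplified job record.
type JobState struct {
	Name   string
	Status string
}

// PlanUpdate is a hypothetical plan update: plain data describing state
// changes, not the logic that produced them.
type PlanUpdate struct {
	NewJobs       []JobState
	CompletedJobs []string
}

// FSM is a toy replica state.
type FSM struct {
	Jobs map[string]JobState
}

// prepare models the read step (GetCompactionPlanUpdate): it inspects the
// state and returns an update without mutating anything.
func prepare(f *FSM, completed, candidates []string) PlanUpdate {
	u := PlanUpdate{CompletedJobs: completed}
	for _, name := range candidates {
		if _, exists := f.Jobs[name]; !exists {
			u.NewJobs = append(u.NewJobs, JobState{Name: name, Status: "COMPACTION_STATUS_UNSPECIFIED"})
		}
	}
	return u
}

// apply models the update step (UpdateCompactionPlan): every replica applies
// the same committed update deterministically.
func apply(f *FSM, u PlanUpdate) {
	for _, name := range u.CompletedJobs {
		delete(f.Jobs, name)
	}
	for _, j := range u.NewJobs {
		f.Jobs[j.Name] = j
	}
}

func main() {
	leader := &FSM{Jobs: map[string]JobState{"job-1": {Name: "job-1"}}}
	follower := &FSM{Jobs: map[string]JobState{"job-1": {Name: "job-1"}}}

	// Only the leader runs prepare; the resulting update is what gets replicated.
	update := prepare(leader, []string{"job-1"}, []string{"job-2", "job-3"})

	// Every replica applies the same update.
	apply(leader, update)
	apply(follower, update)

	fmt.Println(len(leader.Jobs), len(follower.Jobs)) // 2 2
}
```

`apply` consumes only the committed `PlanUpdate`. A replica running a different version of `prepare` therefore still converges to the same state.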
```mermaid
sequenceDiagram
    participant W as Compaction Worker
    box Compaction Service
        participant H as Endpoint
        participant R as Raft
    end

    loop
        W ->>+H: PollCompactionJobsRequest

        critical
            critical FSM state read
                H ->>R: GetCompactionPlanUpdate
                create participant U as Plan Update
                R ->>U: 
                U ->>+S: Job status updates
                Note right of U: Job ownership is protected with<br>leases with fencing token
                S ->>-U: Job state changes
                U ->>+S: Assign jobs
                S ->>-U: Job state changes
                U ->>+P: Create jobs
                Note right of U: New jobs are created if<br>workers have enough capacity
                P ->>P: Dequeue blocks<br>and load tombstones
                P ->>-U: New jobs
                U ->>+S: Add jobs
                S ->>-U: Job state changes
                destroy U
                U ->>R: CompactionPlanUpdate
                R ->>H: CompactionPlanUpdate
            end

            critical FSM state update
                H ->>R: UpdateCompactionPlan
                R ->>S: Update schedule<br>(new, completed, assigned, reassigned jobs)
                R ->>P: Remove source blocks from the planner queue (new jobs)
                R ->>I: Replace source blocks in the index (completed jobs)<br>and create tombstones for deleted
                I ->>+C: Add new blocks
                C ->>C: Enqueue
                C ->>-I: 
                I ->>R: 
                R ->>H: CompactionPlanUpdate
            end
        end

        H ->> W: PollCompactionJobsResponse
    end

    box FSM
        participant C as Compactor
        participant P as Planner
        participant S as Scheduler
        participant I as Metadata Index
    end
```
The compactor is responsible for maintaining a queue of source blocks eligible for compaction. Currently, this queue is a simple doubly-linked FIFO structure, populated with new block batches as they are added to the index. In the current implementation, a new compaction job is created once a sufficient number of blocks has been enqueued. Compaction jobs are planned on demand when requests are received from the compaction service.
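A minimal sketch of such a queue follows. It assumes a fixed batch size acts as the threshold for creating a job. `blockQueue`, `batchSize`, and `nextJob` are hypothetical names, and the compactor's actual threshold logic may differ. The sketch uses Go's `container/list`, which is a doubly-linked list.

```go
package main

import (
	"container/list"
	"fmt"
)

// blockQueue is a hypothetical FIFO queue of source block IDs.
type blockQueue struct {
	blocks    *list.List // doubly-linked list, oldest block at the front
	batchSize int        // hypothetical number of blocks per compaction job
}

func newBlockQueue(batchSize int) *blockQueue {
	return &blockQueue{blocks: list.New(), batchSize: batchSize}
}

// push enqueues a batch of new blocks as they are added to the index.
func (q *blockQueue) push(ids ...string) {
	for _, id := range ids {
		q.blocks.PushBack(id)
	}
}

// nextJob is called on demand. It dequeues exactly batchSize blocks from the
// front of the queue, or returns nil if not enough blocks are enqueued yet.
func (q *blockQueue) nextJob() []string {
	if q.blocks.Len() < q.batchSize {
		return nil
	}
	job := make([]string, 0, q.batchSize)
	for i := 0; i < q.batchSize; i++ {
		e := q.blocks.Front()
		job = append(job, e.Value.(string))
		q.blocks.Remove(e)
	}
	return job
}

func main() {
	q := newBlockQueue(3)
	q.push("b1", "b2")
	fmt.Println(q.nextJob() == nil) // true: only 2 of 3 blocks enqueued
	q.push("b3", "b4")
	fmt.Println(q.nextJob())    // [b1 b2 b3]
	fmt.Println(q.blocks.Len()) // 1
}
```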
The queue is segmented by the Tenant, Shard, and Level attributes of the block metadata entries, meaning that compaction never crosses these boundaries. This segmentation helps avoid unnecessary compaction of unrelated blocks. The downside is that blocks are never compacted across different shards, which can lead to suboptimal compaction results. Due to dynamic data placement, a tenant may be placed on a shard for only a short period of time. As a result, the data in that shard may not be compacted with other data from the same tenant. Cross-shard compaction is planned as a future enhancement; the observed impact of the limitation is moderate.
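The segmentation can be sketched as a map keyed by the three attributes. `BlockMeta`, `queueKey`, and `segmentedQueue` are hypothetical illustrations, not the metastore's types. In this sketch each segment is simplified to a slice of block IDs rather than its own FIFO queue.

```go
package main

import "fmt"

// BlockMeta is a hypothetical subset of block metadata.
type BlockMeta struct {
	ID     string
	Tenant string
	Shard  uint32
	Level  uint32
}

// queueKey identifies a queue segment; blocks with different keys are never
// compacted together.
type queueKey struct {
	Tenant string
	Shard  uint32
	Level  uint32
}

// segmentedQueue routes each block to the segment matching its attributes.
type segmentedQueue struct {
	segments map[queueKey][]string
}

func (q *segmentedQueue) push(b BlockMeta) {
	if q.segments == nil {
		q.segments = make(map[queueKey][]string)
	}
	k := queueKey{Tenant: b.Tenant, Shard: b.Shard, Level: b.Level}
	q.segments[k] = append(q.segments[k], b.ID)
}

func main() {
	var q segmentedQueue
	q.push(BlockMeta{ID: "a", Tenant: "t1", Shard: 1, Level: 0})
	q.push(BlockMeta{ID: "b", Tenant: "t1", Shard: 1, Level: 0})
	q.push(BlockMeta{ID: "c", Tenant: "t1", Shard: 2, Level: 0}) // different shard
	fmt.Println(len(q.segments)) // 2
}
```

Blocks `a` and `b` share a segment. Block `c` belongs to the same tenant but sits on a different shard, so it is never compacted together with them.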
Profiling data from each service (identified by the service_name label) is stored as a separate dataset within a block.
The block layout is composed of a collection of non-overlapping, independent datasets, each containing distinct data. At compaction, matching datasets from different blocks are merged: their tsdb index, symbols, and profile tables are merged and rewritten to a new block, to optimize the data for efficient reading.
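The dataset matching step can be sketched as grouping the datasets of all source blocks by service name. This is an illustration only, with the following assumptions:

- `Block`, `Dataset`, and `groupDatasets` are hypothetical.
- The actual merge of the tsdb index, symbols, and profile tables is not shown.
- A profile count stands in for the dataset contents.

```go
package main

import (
	"fmt"
	"sort"
)

// Dataset is a hypothetical descriptor of a per-service dataset in a block.
type Dataset struct {
	ServiceName string // value of the service_name label
	Profiles    int    // stand-in for the dataset contents
}

// Block is a hypothetical descriptor of a block: independent datasets.
type Block struct {
	ID       string
	Datasets []Dataset
}

// groupDatasets collects matching datasets from all source blocks, keyed by
// service name. Each group would then be merged into one dataset of the
// output block (the merge itself is not shown).
func groupDatasets(blocks []Block) map[string][]Dataset {
	groups := make(map[string][]Dataset)
	for _, b := range blocks {
		for _, ds := range b.Datasets {
			groups[ds.ServiceName] = append(groups[ds.ServiceName], ds)
		}
	}
	return groups
}

func main() {
	blocks := []Block{
		{ID: "b1", Datasets: []Dataset{{"api", 10}, {"db", 5}}},
		{ID: "b2", Datasets: []Dataset{{"api", 7}}},
	}
	groups := groupDatasets(blocks)

	// Print groups in a stable order.
	names := make([]string, 0, len(groups))
	for name := range groups {
		names = append(names, name)
	}
	sort.Strings(names)
	for _, name := range names {
		total := 0
		for _, ds := range groups[name] {
			total += ds.Profiles
		}
		fmt.Printf("%s: %d datasets, %d profiles\n", name, len(groups[name]), total)
	}
	// api: 2 datasets, 17 profiles
	// db: 1 datasets, 5 profiles
}
```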
The scheduler implements the basic Small Job First strategy: blocks of lower levels are considered smaller than blocks of higher levels, and their compaction is prioritized. This is justifiable because the smaller blocks affect read amplification more than the larger blocks, and the compaction of smaller blocks is more efficient.
Compaction jobs are assigned to workers in the order of their priority.
Internally, the scheduler maintains a priority queue of jobs for each compaction level. Jobs of lower levels are assigned first, and the scheduler does not consider jobs of higher levels until all eligible jobs of lower levels are assigned.
The priority is determined by several factors:
- `COMPACTION_STATUS_UNSPECIFIED`: unassigned jobs.
- `COMPACTION_STATUS_IN_PROGRESS`: in-progress jobs. The first job that can't be reassigned is a sentinel: no more jobs are eligible for assignment at this level.

See Job Status Description for more details.
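The per-level priority queues can be sketched with Go's `container/heap`. The sketch rests on these assumptions:

- Within a level, unassigned jobs come first, followed by in-progress jobs ordered by earliest lease expiration. This ordering is an assumption made for the illustration.
- The failure counter is left out.
- `Job`, `jobHeap`, `newLevel`, and `next` are hypothetical names.

The sketch shows the Small Job First scan across levels and the sentinel check. Under this ordering, if the top in-progress job of a level cannot be reassigned, no other job at that level can be either, so the scan moves on to the next level.

```go
package main

import (
	"container/heap"
	"fmt"
)

// Status mirrors the two job statuses relevant to assignment.
type Status int

const (
	Unassigned Status = iota // COMPACTION_STATUS_UNSPECIFIED
	InProgress               // COMPACTION_STATUS_IN_PROGRESS
)

// Job is a hypothetical, simplified job record.
type Job struct {
	Name           string
	Status         Status
	LeaseExpiresAt int64 // Raft log entry timestamp
}

// jobHeap is a min-heap of the jobs of a single compaction level.
type jobHeap []*Job

func (h jobHeap) Len() int { return len(h) }
func (h jobHeap) Less(i, j int) bool {
	if h[i].Status != h[j].Status {
		return h[i].Status < h[j].Status // unassigned before in-progress
	}
	return h[i].LeaseExpiresAt < h[j].LeaseExpiresAt // earliest lease first
}
func (h jobHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
func (h *jobHeap) Push(x any)   { *h = append(*h, x.(*Job)) }
func (h *jobHeap) Pop() any {
	old := *h
	n := len(old)
	x := old[n-1]
	*h = old[:n-1]
	return x
}

// newLevel builds the priority queue of one compaction level.
func newLevel(jobs ...*Job) *jobHeap {
	h := jobHeap(jobs)
	heap.Init(&h)
	return &h
}

// next scans levels from lowest to highest and returns the first eligible
// job. A non-expired in-progress job at the top of a level acts as a
// sentinel, and the scan moves on to the next level.
func next(levels []*jobHeap, now int64) (*Job, bool) {
	for _, h := range levels {
		if h.Len() == 0 {
			continue
		}
		top := (*h)[0]
		if top.Status == Unassigned || now > top.LeaseExpiresAt {
			return top, true
		}
	}
	return nil, false
}

func main() {
	level0 := newLevel(
		&Job{Name: "A", Status: InProgress, LeaseExpiresAt: 100},
		&Job{Name: "B", Status: InProgress, LeaseExpiresAt: 50},
	)
	level1 := newLevel(&Job{Name: "C", Status: Unassigned})
	levels := []*jobHeap{level0, level1}

	if j, ok := next(levels, 60); ok {
		fmt.Println(j.Name) // B: its lease has expired
	}
	if j, ok := next(levels, 40); ok {
		fmt.Println(j.Name) // C: level 0 has no eligible jobs
	}
}
```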
The challenge is that we don't know the capacity of the worker fleet in advance, and we have no control over the workers: they can appear and disappear at any time. Another problem is that in some failure modes, such as unavailability or lack of compaction workers, or temporary unavailability of the metastore service, the number of blocks to be compacted may reach significant levels (millions).
Therefore, we use an adaptive approach to keep the scheduler's job queue short while ensuring the compaction workers are fully utilized. In every request, the worker specifies how many free slots it has available for new jobs. As the compaction procedure is a synchronous CPU-bound task, we use the number of logical CPU cores as the worker's max capacity and decrement it for each in-progress compaction job. When a new request arrives, it specifies the current worker's capacity, which serves as evidence that the entire worker fleet has enough resources to handle at least this number of jobs. Thus, for every request, we try to enqueue a number of jobs equal to the reported capacity.
Over time, this ensures a good balance between the number of jobs in the queue and the utilization of worker capacity, even if there are millions of blocks to compact.
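The capacity arithmetic can be sketched as follows. `workerCapacity` and `planJobs` are hypothetical helpers. The sketch assumes the number of jobs created per request is bounded by both the reported capacity and the number of jobs the queued blocks can form.

```go
package main

import (
	"fmt"
	"runtime"
)

// workerCapacity is a hypothetical helper: a worker's free slots are its
// logical CPU cores minus the compaction jobs it is currently running.
func workerCapacity(inProgress int) int {
	free := runtime.NumCPU() - inProgress
	if free < 0 {
		return 0
	}
	return free
}

// planJobs is a hypothetical planner step: a request reporting `capacity`
// free slots leads to at most that many new jobs, and never more than the
// queued blocks can form.
func planJobs(capacity, plannable int) int {
	if capacity < plannable {
		return capacity
	}
	return plannable
}

func main() {
	capacity := workerCapacity(2) // this worker already runs 2 jobs
	fmt.Println("free slots:", capacity)
	// Even with a huge backlog, a single request adds at most `capacity` jobs.
	fmt.Println("jobs created:", planJobs(capacity, 1_000_000))
}
```

Consider a backlog large enough for a million jobs. A worker reporting 6 free slots still causes at most 6 new jobs to be created, which keeps the scheduler queue short.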
The distributed locking implementation is inspired by *The Chubby lock service* and *Leases: An Efficient Fault-Tolerant Mechanism for Distributed File Cache Consistency*. The implementation is based on the Raft protocol.
Ownership of a compaction job is granted to a compaction worker for a specified period of time, called a lease:

> A lease is a contract that gives its holder specified rights over property for a limited period of time.
The real-time clock of the worker and the scheduler cannot be used; instead, the timestamp of the Raft log entry, assigned by the Raft leader when the entry is appended to the log, serves as the reference point in time.
The fact that leases are allocated by the current leader allows for spurious lease invalidation when the leader changes and the clock skew exceeds the lease duration. This is acceptable because jobs will be reassigned repeatedly, and the occurrence of the event should be very rare. However, the solution does not tolerate clock skews exceeding the job lease duration (which is 15 seconds by default).
The log entry index is used as the fencing token of protected resources (compaction jobs).
The Raft log entry index is a monotonically increasing integer, guaranteed to be unique for each command. Each time a job is assigned to a worker, the worker is provided with the current Raft log index as the fencing token, which is also assigned to the job. For subsequent requests, the worker must provide the fencing token it was given at assignment. The ownership of the job is confirmed if the provided token is greater than or equal to the job's token. The job's token may change if the job is reassigned to another worker, and the new token is derived from the current Raft log index, which is guaranteed to be greater.
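A minimal sketch of the fencing-token check, assuming the token is stored on the job and compared as described above. `Job`, `assign`, and `owns` are hypothetical names. The Raft log index is passed in explicitly instead of coming from a real Raft log.

```go
package main

import "fmt"

// Job is a hypothetical job record carrying its fencing token.
type Job struct {
	Name  string
	Owner string
	Token uint64 // Raft log index of the command that last assigned the job
}

// assign grants the job to a worker. raftIndex is the index of the Raft log
// entry executing the assignment; it becomes both the job's token and the
// token handed to the worker.
func assign(j *Job, worker string, raftIndex uint64) uint64 {
	j.Owner = worker
	j.Token = raftIndex
	return raftIndex
}

// owns reports whether a worker presenting token still owns the job.
func owns(j *Job, token uint64) bool {
	return token >= j.Token
}

func main() {
	job := &Job{Name: "job-1"}
	tokenA := assign(job, "worker-a", 100) // assigned at log index 100
	tokenB := assign(job, "worker-b", 180) // lease expired, reassigned at index 180
	fmt.Println(owns(job, tokenA))         // false: worker-a's token is stale
	fmt.Println(owns(job, tokenB))         // true
}
```

The Raft log index only grows. A reassignment therefore invalidates the previous holder's token without any coordination with that worker.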
Token authentication is not enforced in this design, as the system operates in a trusted environment with cooperative workers. However, a malicious worker could arbitrarily specify a token. In the future, we may consider implementing a basic authentication mechanism based on cryptographic signatures to further ensure the integrity of token usage.
This is an advisory locking mechanism, meaning resources are not automatically restricted from access when the lock is not acquired. Consequently, a client might choose to delete source blocks associated with a compaction job or continue processing the job even without holding the lease. This behavior, however, should be avoided in the worker implementation.
When a worker requests a new assignment, the scheduler must find the highest-priority job that is not assigned yet, and assign it to the worker. When a job is assigned, the worker is given a lease with a deadline. The worker should refresh the lease before it expires.
The worker must send a status update to the scheduler to refresh the lease. The scheduler must update the lease expiration time if the worker still owns the job.
The scheduler may revoke a job if the worker does not send the status update within the lease duration.
When a new assignment is requested by a worker, the scheduler inspects in-progress jobs and checks whether their leases have expired. If a lease has expired, the job is reassigned to the worker that requested the new assignment.
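If the timestamp of the current Raft log entry (command) exceeds the job's `lease_expires_at` timestamp, the scheduler must revoke the job and reassign it:

- The job status is set to `COMPACTION_STATUS_IN_PROGRESS`.
- A new lease is allocated, with the expiration time calculated from the current command timestamp.
- The fencing token is set to the current Raft log index, which is guaranteed to be greater than the job's previous token.

The worker instance that has lost the job is not notified immediately. The scheduler does not allocate a new lease in either of these cases, and the worker should stop processing:

- The worker reports an update for a job that it is not assigned to.
- The job is not found, for example because it has been completed by another worker.

This mechanism prevents workers from processing jobs unnecessarily.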
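The revocation step can be sketched with the Raft entry supplying both the clock and the token. `Entry`, `Job`, and `revokeIfExpired` are hypothetical. Timestamps are plain milliseconds for brevity, and the 15-second lease matches the default stated above.

```go
package main

import "fmt"

// leaseDurationMs is the job lease duration (15 seconds, the default).
const leaseDurationMs int64 = 15_000

// Entry is a hypothetical view of the Raft log entry executing a command.
type Entry struct {
	Index     uint64
	Timestamp int64 // milliseconds, assigned by the leader
}

// Job is a hypothetical, simplified job record.
type Job struct {
	Status         string
	Token          uint64
	LeaseExpiresAt int64
}

// revokeIfExpired reassigns the job within command e if e's timestamp
// exceeds the job's lease_expires_at. The new lease and fencing token are
// derived from e, never from a worker's or scheduler's local clock.
func revokeIfExpired(j *Job, e Entry) bool {
	if e.Timestamp <= j.LeaseExpiresAt {
		return false
	}
	j.Status = "COMPACTION_STATUS_IN_PROGRESS"
	j.LeaseExpiresAt = e.Timestamp + leaseDurationMs
	j.Token = e.Index // greater than any previously issued token
	return true
}

func main() {
	job := &Job{Status: "COMPACTION_STATUS_IN_PROGRESS", Token: 10, LeaseExpiresAt: 15_000}
	fmt.Println(revokeIfExpired(job, Entry{Index: 42, Timestamp: 9_000}))  // false: lease still valid
	fmt.Println(revokeIfExpired(job, Entry{Index: 57, Timestamp: 16_000})) // true
	fmt.Println(job.Token, job.LeaseExpiresAt)                             // 57 31000
}
```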
If the worker is not capable of executing the job, it may abandon the job without further notifications. The scheduler will eventually reassign the job to another worker. The lost job might be reassigned to the same worker instance if that instance detects the loss before others do: abandoned jobs are assigned to the first worker that requests new assignments when no unassigned jobs are available.
There is no explicit mechanism for reporting a failure from the worker. In fact, the scheduler must not rely on error reports from workers, as jobs that cause workers to crash would yield no reports at all.
To avoid infinite reassignment loops, the scheduler keeps track of reassignments (failures) for each job. If the number of failures exceeds a set threshold, the job is not reassigned and remains at the bottom of the queue. Once the cause of failure is resolved, the error limit can be temporarily increased to reprocess these jobs.
The scheduler queue has a size limit. Typically, the only scenario in which this limit is reached is when the compaction process is not functioning correctly (e.g., due to a bug in the compaction procedure), preventing blocks from being compacted and resulting in many jobs remaining in a failed state. Once the queue size limit is reached, failed jobs are evicted, meaning the corresponding blocks will never be compacted. This may cause read amplification of the data queries and bloat the metadata index. Therefore, the limit should be large enough. The recommended course of action is to roll back or fix the bug and restart the compaction process, temporarily increasing the error limit if necessary.
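The failure threshold and the queue size limit can be sketched together in a hypothetical model. `Schedule`, `Reassign`, and `Add` are not the actual scheduler API. The model works as follows:

- Each reassignment counts as a failure.
- Jobs whose failure count exceeds the threshold are no longer reassigned.
- When the queue is full, such faulty jobs are evicted to make room.
- A new job is rejected when no faulty job can be evicted. This rejection rule is an assumption of the sketch.

```go
package main

import "fmt"

// Job is a hypothetical job record that tracks reassignments (failures).
type Job struct {
	Name     string
	Failures int
}

// Schedule is a hypothetical bounded job queue.
type Schedule struct {
	Jobs         []*Job
	MaxSize      int // queue size limit
	FailureLimit int // reassignment (failure) threshold
}

// faulty reports whether the job's failure count exceeds the threshold.
func (s *Schedule) faulty(j *Job) bool { return j.Failures > s.FailureLimit }

// Reassign hands an abandoned job to a new worker, counting it as a failure.
// Faulty jobs are not reassigned and stay at the bottom of the queue.
func (s *Schedule) Reassign(j *Job) bool {
	if s.faulty(j) {
		return false
	}
	j.Failures++
	return true
}

// Add appends a new job. When the size limit is reached, faulty jobs are
// evicted first; their source blocks will then never be compacted.
// It returns false if there is still no room.
func (s *Schedule) Add(j *Job) bool {
	if len(s.Jobs) >= s.MaxSize {
		kept := s.Jobs[:0]
		for _, x := range s.Jobs {
			if !s.faulty(x) {
				kept = append(kept, x)
			}
		}
		s.Jobs = kept
	}
	if len(s.Jobs) >= s.MaxSize {
		return false
	}
	s.Jobs = append(s.Jobs, j)
	return true
}

func main() {
	s := &Schedule{MaxSize: 2, FailureLimit: 1}
	bad := &Job{Name: "bad"}
	s.Add(bad)
	s.Add(&Job{Name: "ok"})
	for s.Reassign(bad) {
	}
	fmt.Println(bad.Failures)                          // 2: reassigned twice, then excluded
	fmt.Println(s.Add(&Job{Name: "new"}), len(s.Jobs)) // true 2: "bad" was evicted
}
```

Temporarily raising `FailureLimit` makes excluded jobs eligible for reassignment again. This corresponds to the recovery procedure described above.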
When the worker reports a successful completion of the job, the scheduler must remove the job from the schedule and notify the planner about the completion.
The diagram below depicts the state machine of the job status.
```mermaid
stateDiagram-v2
    [*] --> Unassigned : Create Job
    Unassigned --> InProgress : Assign Job
    InProgress --> Success : Job Completed
    InProgress --> LeaseExpired: Job Lease Expires
    LeaseExpired: Abandoned Job
    LeaseExpired --> Excluded: Failure Threshold Exceeded
    Excluded: Faulty Job
    Success --> [*] : Remove Job from Schedule
    LeaseExpired --> InProgress : Reassign Job

    Unassigned : COMPACTION_STATUS_UNSPECIFIED
    InProgress : COMPACTION_STATUS_IN_PROGRESS
    Success : COMPACTION_STATUS_SUCCESS
    LeaseExpired : COMPACTION_STATUS_IN_PROGRESS
    Excluded: COMPACTION_STATUS_IN_PROGRESS
```
Statuses sent by the scheduler to the worker:

| Status | Description |
|---|---|
| `COMPACTION_STATUS_UNSPECIFIED` | Not allowed. |
| `COMPACTION_STATUS_IN_PROGRESS` | Job lease refresh. The worker should refresh the new lease before the new deadline. |
| `COMPACTION_STATUS_SUCCESS` | Not allowed. |
| (none) | No lease refresh from the scheduler. The worker should stop processing. |
Statuses reported by the worker to the scheduler:

| Status | Description |
|---|---|
| `COMPACTION_STATUS_UNSPECIFIED` | Not allowed. |
| `COMPACTION_STATUS_IN_PROGRESS` | Job lease refresh. The scheduler must extend the lease of the job, if the worker still owns it. |
| `COMPACTION_STATUS_SUCCESS` | The job has been successfully completed. The scheduler must remove the job from the schedule and communicate the update to the planner. |
- `COMPACTION_STATUS_UNSPECIFIED` is never sent over the wire between the scheduler and workers.
- A job in `COMPACTION_STATUS_IN_PROGRESS` cannot be reassigned if its failure counter exceeds the threshold.
- A job in `COMPACTION_STATUS_SUCCESS` is removed from the schedule immediately.
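The worker-to-scheduler table can be read as a small request handler. The sketch rests on these assumptions:

- `Scheduler` and `HandleUpdate` are illustrations, not the actual API.
- The `Completed` slice stands in for the planner notification.
- Ownership is checked with the fencing-token rule described earlier.
- `now` is the timestamp of the Raft entry processing the update.

```go
package main

import "fmt"

// Status mirrors the compaction job statuses exchanged with workers.
type Status int

const (
	StatusUnspecified Status = iota // COMPACTION_STATUS_UNSPECIFIED
	StatusInProgress                // COMPACTION_STATUS_IN_PROGRESS
	StatusSuccess                   // COMPACTION_STATUS_SUCCESS
)

// Job is a hypothetical, simplified job record.
type Job struct {
	Token          uint64
	LeaseExpiresAt int64
}

// Scheduler is a hypothetical, simplified scheduler state.
type Scheduler struct {
	Jobs          map[string]*Job
	LeaseDuration int64
	Completed     []string // stand-in for notifying the planner
}

// HandleUpdate applies a worker's status report. It returns the new lease
// deadline and true only when the lease was extended; false means no lease
// was granted.
func (s *Scheduler) HandleUpdate(name string, token uint64, st Status, now int64) (int64, bool) {
	j, ok := s.Jobs[name]
	if !ok || token < j.Token {
		return 0, false // job not found or owned by another worker: stop processing
	}
	switch st {
	case StatusInProgress:
		j.LeaseExpiresAt = now + s.LeaseDuration
		return j.LeaseExpiresAt, true
	case StatusSuccess:
		delete(s.Jobs, name)
		s.Completed = append(s.Completed, name)
		return 0, false
	default:
		return 0, false // COMPACTION_STATUS_UNSPECIFIED: not allowed
	}
}

func main() {
	s := &Scheduler{Jobs: map[string]*Job{"job-1": {Token: 7}}, LeaseDuration: 15_000}
	fmt.Println(s.HandleUpdate("job-1", 7, StatusInProgress, 1_000)) // 16000 true
	fmt.Println(s.HandleUpdate("job-1", 5, StatusInProgress, 2_000)) // 0 false: stale token
	s.HandleUpdate("job-1", 7, StatusSuccess, 3_000)
	fmt.Println(len(s.Jobs), s.Completed) // 0 [job-1]
}
```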