quickwit/quickwit-indexing/src/source/queue_sources/design.md
Besides the usual failures that can happen during indexing, most queues are also subject to duplicates. To ensure that all object files are indexed exactly once, we track the progress of their indexing using the shard table:
To keep messages invisible to other pipelines while they are being processed, each received message spawns a visibility extension task. This task is responsible for extending the visibility timeout each time the visibility deadline approaches. When the last batch is read for the message and sent to the indexing pipeline:
Garbage collection is owned by the queue-based sources. Each pipeline with a queue source spawns a garbage collection task. To avoid increasing the load on the metastore as the number of pipelines scales, garbage collection calls are debounced by the control plane.
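To illustrate what debouncing garbage collection calls could look like, here is a minimal sketch. It is not the control plane's actual code: the `Debouncer` type, its `cooldown` parameter, and the `should_forward` method are hypothetical names, and the strategy shown (forward the first call, then drop calls until a cooldown has elapsed) is only one simple way to rate-limit calls, arguably closer to throttling than to strict debouncing.

```rust
use std::time::{Duration, Instant};

/// Hypothetical helper, not part of Quickwit: forwards a call only if the
/// previously forwarded call is at least `cooldown` old.
struct Debouncer {
    cooldown: Duration,
    last_forwarded: Option<Instant>,
}

impl Debouncer {
    fn new(cooldown: Duration) -> Self {
        Self { cooldown, last_forwarded: None }
    }

    /// Returns true if a garbage collection call arriving at `now` should be
    /// forwarded to the metastore, false if it should be dropped.
    fn should_forward(&mut self, now: Instant) -> bool {
        match self.last_forwarded {
            // A call was forwarded recently: drop this one.
            Some(last) if now.saturating_duration_since(last) < self.cooldown => false,
            // First call, or the cooldown has elapsed: forward and remember when.
            _ => {
                self.last_forwarded = Some(now);
                true
            }
        }
    }
}
```

With a 60 second cooldown, any number of pipelines requesting garbage collection within the same minute would result in a single forwarded call in this sketch, which is the property that keeps the metastore load independent of the number of pipelines.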
This module is meant to be generic enough to:
Note that because every single message is tracked by the metastore, this design will not behave well with high message rates. For instance, it is not meant to be efficient with a data stream where every message contains a single event. As a rule of thumb, to protect the metastore, it is discouraged to try processing more than 50 messages per second with this design. This means that high throughput can only be achieved with larger contents for each message (e.g. larger files when using the file source with queue notifications).
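The relationship between the message rate cap and the achievable throughput is simple arithmetic, sketched below. The function names and the message sizes used with them are illustrative assumptions, not figures from the design; only the 50 messages per second rule of thumb comes from the text above.

```rust
/// Upper bound on throughput (bytes per second) when the message rate is
/// capped and every message carries `bytes_per_message` bytes of content.
fn max_throughput_bytes_per_sec(max_messages_per_sec: u64, bytes_per_message: u64) -> u64 {
    max_messages_per_sec * bytes_per_message
}

/// Smallest message content size (bytes) needed to reach a target throughput
/// without exceeding the message rate cap. Rounds up.
/// Panics if `max_messages_per_sec` is zero.
fn min_bytes_per_message(target_bytes_per_sec: u64, max_messages_per_sec: u64) -> u64 {
    (target_bytes_per_sec + max_messages_per_sec - 1) / max_messages_per_sec
}
```

For example, at 50 messages per second, messages that each reference a 10 MB file allow up to 500 MB/s, whereas 1 KB messages would cap throughput at 50 KB/s. Conversely, sustaining 100 MB/s under the same cap requires files of at least 2 MB each.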
The QueueCoordinator is a concrete implementation of the machinery necessary to consume data from a queue, from the message reception to its acknowledgment after indexing. The QueueCoordinator interacts with 3 main components.
Queue

The Queue is an abstract interface that can represent any queue implementation (AWS SQS, Google Pub/Sub...). It is sufficient that the queue guarantees at-least-once delivery of its messages. The abstraction reduces the actual queue's API surface to 3 main functions:
QueueLocalState

The local state is an in-memory data structure that keeps track of the knowledge that the current source has of recently received messages. It manages the transitions of messages between 4 states:
QueueSharedState

The shared state is a client of the Shard API, a metastore API that was mainly designed to serve ingest V2. The Shard API improves on the previous checkpoint API, which was stored as a blob in one of the fields of the index model. The flow is as follows:
The queue source opens a shard, using an ID that uniquely identifies the content of the message as the shard ID. For the file source, the shard ID is the file URI. Each source has a unique publish token that is provided in the OpenShards metastore request. The response to the OpenShards request returns the token of the caller that called the API first. Either:
AcquireShards call to update the shard's token in the metastore with the local one. This signals to subsequent attempts to process the shard that this pipeline now owns it. Note, though, that this is subject to a race condition: 2 pipelines might acquire the shard concurrently. In that case, both pipelines will assume that they own the shard, and one of them will fail at commit time.

The QueueSharedState also owns the background task that periodically calls PruneShards to garbage collect old shards.
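The publish token exchange described above can be sketched with a small in-memory stand-in for the shard table. Everything in it is an assumption made for illustration: `MockShardTable`, its method names and signatures, and the last-writer-wins behavior on concurrent acquisition are hypothetical and do not reflect the real metastore API. The conditions under which a pipeline decides to take over a shard are not modeled.

```rust
use std::collections::HashMap;

/// Hypothetical in-memory stand-in for the shard table (not the real Shard API).
#[derive(Default)]
struct MockShardTable {
    // Shard ID (for the file source, the file URI) -> publish token of the owner.
    owners: HashMap<String, String>,
}

impl MockShardTable {
    /// Mimics OpenShards: creates the shard if it does not exist and returns
    /// the token of whichever caller opened it first.
    fn open_shard(&mut self, shard_id: &str, publish_token: &str) -> String {
        self.owners
            .entry(shard_id.to_string())
            .or_insert_with(|| publish_token.to_string())
            .clone()
    }

    /// Mimics AcquireShards: replaces the stored token with the caller's token.
    fn acquire_shard(&mut self, shard_id: &str, publish_token: &str) {
        self.owners
            .insert(shard_id.to_string(), publish_token.to_string());
    }

    /// Mimics the ownership check at commit time: only the pipeline whose
    /// token is currently stored is allowed to commit.
    fn can_commit(&self, shard_id: &str, publish_token: &str) -> bool {
        self.owners.get(shard_id).map(String::as_str) == Some(publish_token)
    }
}
```

In this sketch, a second pipeline opening the same file URI gets the first pipeline's token back, which tells it that the shard was opened by someone else. If two pipelines then both call `acquire_shard`, each believes it owns the shard, but only the one whose token ended up stored passes `can_commit`, matching the failure at commit time mentioned above.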