docs/proposals/shuffle-sharding-on-the-read-path.md
Cortex currently supports sharding of tenants to a subset of the ingesters on the write path.
This feature is called “subring”, because it computes a subset of nodes registered to the hash ring. The aim of this feature is to improve isolation between tenants and reduce the number of tenants impacted by an outage.
This approach is similar to the techniques described in Amazon’s Shuffle Sharding article, but it currently suffers from a non-random selection of nodes (a solution is proposed below).
Cortex can be configured with a default subring size, which can then be customized on a per-tenant basis. The per-tenant configuration is live-reloaded at runtime and applied without restarting the Cortex process.
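For illustration, a per-tenant override in the runtime configuration file could look like the following snippet; the option name mirrors the current write-path limit, and the values are made up:

```yaml
# Runtime configuration file, live-reloaded without restarting Cortex.
# A default shard size applies cluster-wide; individual tenants can be
# overridden.
overrides:
  tenant-a:
    ingestion_tenant_shard_size: 3
  tenant-b:
    ingestion_tenant_shard_size: 6
```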
Subring sharding currently supports only the write path; the read path is not shuffle-sharding aware. For example, with RF=3 an outage of more than one ingester will affect all tenants, and a tenant issuing particularly heavy queries can affect all other tenants.
The Cortex read path should support shuffle sharding to isolate the impact of an outage in the cluster. The shard size must be dynamically configurable on a per-tenant basis during runtime.
This deliverable involves introducing shuffle sharding in:

- the query-frontend and queriers
- the store-gateways
- the ingesters, on the read path
- the rulers
The solution is implemented in https://github.com/cortexproject/cortex/pull/3090.
The subring is a subset of nodes that should be used for a specific tenant.
The current subring implementation doesn’t shuffle tenants across nodes. Given a tenant ID, it finds the first node owning the hash(tenant ID) token and then it picks N distinct consecutive nodes walking the ring clockwise.
For example, in a cluster with 6 nodes (numbered 1-6) and a replication factor of 3, three tenants (A, B, C) could have the following shards:
| Tenant ID | Node 1 | Node 2 | Node 3 | Node 4 | Node 5 | Node 6 |
|---|---|---|---|---|---|---|
| A | x | x | x | | | |
| B | x | x | x | | | |
| C | x | x | x | | | |
We propose to build the subring by picking N distinct and random nodes registered in the ring, using repeated hashing to drive the selection; a sketch is shown below.
The hash() function is still to be decided. The required property is that it is strong enough not to generate loops when repeatedly hashing the previous hash value.
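As an illustration, here is a minimal, self-contained Go sketch of such a selection. The concrete hash function (FNV-1a here), the single token per node, the iteration bound, and all helper names are assumptions made for the example, not part of the proposal:

```go
package main

import (
	"fmt"
	"hash/fnv"
	"sort"
)

// node is a simplified ring entry: one token per node for illustration
// (real rings assign many tokens per node).
type node struct {
	id    string
	token uint32
}

// hashValue stands in for the hash() function the proposal leaves
// undecided; FNV-1a is used purely for illustration.
func hashValue(b []byte) uint32 {
	h := fnv.New32a()
	h.Write(b)
	return h.Sum32()
}

// ownerOf returns the node owning the first token >= t, wrapping around.
// The ring slice must be sorted by token.
func ownerOf(ring []node, t uint32) node {
	i := sort.Search(len(ring), func(i int) bool { return ring[i].token >= t })
	if i == len(ring) {
		i = 0
	}
	return ring[i]
}

// subring picks up to shardSize distinct nodes by repeatedly hashing the
// previous hash value and looking up the node owning each resulting token.
func subring(tenantID string, ring []node, shardSize int) []node {
	sort.Slice(ring, func(i, j int) bool { return ring[i].token < ring[j].token })

	var selected []node
	picked := map[string]bool{}
	h := hashValue([]byte(tenantID))
	// The iteration bound guards against a weak hash() looping before
	// shardSize distinct nodes are found (the property called out above).
	for attempts := 0; len(selected) < shardSize && attempts < 10*len(ring); attempts++ {
		n := ownerOf(ring, h)
		if !picked[n.id] {
			picked[n.id] = true
			selected = append(selected, n)
		}
		h = hashValue([]byte{byte(h), byte(h >> 8), byte(h >> 16), byte(h >> 24)})
	}
	return selected
}

func main() {
	ring := []node{
		{"node-1", 1 << 29}, {"node-2", 2 << 29}, {"node-3", 3 << 29},
		{"node-4", 4 << 29}, {"node-5", 5 << 29}, {"node-6", 6 << 29},
	}
	fmt.Println(subring("tenant-a", ring, 3))
}
```

With random rather than consecutive selection, tenants whose tokens land close to each other no longer end up with the same shard.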
Implemented in https://github.com/cortexproject/cortex/pull/3113.
Today each querier connects to each query-frontend instance, and calls a single “Process” method via gRPC.
“Process” is a bi-directional streaming gRPC method: the server-to-client stream carries requests from the query-frontend to the querier, and the client-to-server stream carries results back from the querier to the query-frontend. Note that this is the opposite of what might be considered normal.

The query-frontend scans all its queues of pending query requests and picks a query to execute based on a fair schedule between tenants. The query request is then sent to an idle querier worker over the stream opened in the Process method, and the query-frontend waits for a response from the querier. This loop repeats until the querier disconnects.
To support shuffle sharding, Query-Frontends will keep a list of connected Queriers, and randomly (but consistently between query-frontends) choose N of them to distribute requests to. When Query-Frontend looks for the next request to send to a given querier, it will only consider tenants that “belong” to the Querier.
To choose N Queriers for a tenant, we propose to use a simple algorithm:
1. Sort the connected Queriers by their ID.
2. SID = tenant ID
3. index = FNV-1a(SID) % number of Queriers
4. Select the Querier at index, unless it has already been selected, and set SID = FNV-1a(SID).
5. Loop to (3) until we’ve found N distinct Queriers (where N is the shard size), and stop early if there aren’t enough Queriers.
The hash() function is still to be decided; the required property is that it is strong enough not to generate loops when repeatedly hashing the previous hash value. A sketch of these steps follows.
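Below is a minimal, self-contained Go sketch of the steps above. The byte encoding of SID between iterations, the iteration bound, and the helper names are illustrative assumptions:

```go
package main

import (
	"encoding/binary"
	"fmt"
	"hash/fnv"
	"sort"
)

// fnv1a is the FNV-1a hash named in the steps above.
func fnv1a(b []byte) uint64 {
	h := fnv.New64a()
	h.Write(b)
	return h.Sum64()
}

// queriersForTenant returns the set of querier IDs a tenant is sharded to.
// Sorting the queriers first makes the result consistent across
// query-frontends seeing the same set of connected queriers.
func queriersForTenant(tenantID string, queriers []string, shardSize int) map[string]struct{} {
	sorted := append([]string(nil), queriers...)
	sort.Strings(sorted)

	shard := make(map[string]struct{}, shardSize)
	sid := fnv1a([]byte(tenantID))
	// The iteration bound guards against a hash sequence that loops before
	// producing enough distinct queriers (the property called out above).
	for i := 0; len(shard) < shardSize && len(shard) < len(sorted) && i < 10*len(sorted); i++ {
		shard[sorted[sid%uint64(len(sorted))]] = struct{}{}
		var buf [8]byte
		binary.LittleEndian.PutUint64(buf[:], sid)
		sid = fnv1a(buf[:]) // hash the previous hash to get the next index
	}
	return shard
}

func main() {
	queriers := []string{"querier-3", "querier-1", "querier-2", "querier-4"}
	fmt.Println(queriersForTenant("tenant-a", queriers, 2)) // same result on every query-frontend
}
```

When scanning its per-tenant queues on behalf of a given querier, the query-frontend would then skip any tenant whose shard, computed as above, doesn’t contain that querier.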
An alternative option would be using the subring. This implies having queriers register to the hash ring, and query-frontend instances using the ring client to find each tenant’s querier subring.
This solution looks like it adds more complexity without any actual benefit.
A completely different approach would be to introduce a central registry where starting queriers would register (e.g. DNS-based service discovery), and to let query-frontends discover queriers through it.
A possible benefit would be that queriers wouldn’t need to initiate connections to all query-frontends; query-frontends would connect only to the queriers for which they have pending requests. However, this would be a significant redesign of how the query-frontend / querier communication works.
Implemented in https://github.com/cortexproject/cortex/pull/3069.
As of today, the store-gateway supports blocks sharding with a customizable replication factor (defaults to 3). The blocks of a single tenant are sharded across all store-gateway instances, so to execute a query the querier may need to reach any store-gateway in the cluster.
The current sharding implementation is based on a hash ring formed by store-gateway instances.
The proposed solution to add shuffle sharding support to the store-gateway is to leverage the existing hash ring to build a per-tenant subring, which is then used by both the querier and the store-gateway to determine which store-gateway each block belongs to.
When shuffle sharding is enabled:
- `syncUsersBlocks()` will build the tenant’s subring for each tenant found while scanning the bucket, and will skip any tenant not belonging to its own shard.
- `ShardingMetadataFilter` will first build the tenant’s subring and then use the existing logic to filter out blocks not belonging to the store-gateway instance itself. The tenant ID can be read from the block’s `meta.json` (a sketch of this filtering follows below).
- `blocksStoreReplicationSet.GetClientsFor()` will first build the tenant’s subring and then use the existing logic to find out which store-gateway instance each requested block belongs to.

Given the store-gateways already form a ring, and building the shuffle sharding on top of it (like on the write path) doesn’t introduce extra operational complexity, we haven’t discussed alternatives.
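To illustrate the filtering idea, here is a minimal, self-contained Go sketch; all types and helpers are hypothetical stand-ins for the real ring and metadata-filter code, and the subring selection itself is elided:

```go
package main

import (
	"fmt"
	"hash/fnv"
	"sort"
)

// Minimal stand-ins for the real types; everything here is hypothetical.
type instance struct {
	id    string
	token uint32
}

type ring []instance

// subring returns the tenant's shuffle-sharded subset of instances. The
// selection logic is elided here (see the subring sketch earlier in this
// document); this placeholder just returns the first shardSize instances.
func (r ring) subring(tenantID string, shardSize int) ring {
	if shardSize > len(r) {
		shardSize = len(r)
	}
	return r[:shardSize]
}

// owner returns the instance owning the first token >= t, wrapping around.
func (r ring) owner(t uint32) instance {
	sorted := append(ring(nil), r...)
	sort.Slice(sorted, func(i, j int) bool { return sorted[i].token < sorted[j].token })
	i := sort.Search(len(sorted), func(i int) bool { return sorted[i].token >= t })
	if i == len(sorted) {
		i = 0
	}
	return sorted[i]
}

func hash32(s string) uint32 {
	h := fnv.New32a()
	h.Write([]byte(s))
	return h.Sum32()
}

// filterOwnedBlocks keeps only the block IDs this store-gateway owns: the
// tenant's subring is built first, and the existing hash-based ownership
// check then runs within that subring only.
func filterOwnedBlocks(blockIDs []string, tenantID, instanceID string, full ring, shardSize int) []string {
	sub := full.subring(tenantID, shardSize)
	var owned []string
	for _, id := range blockIDs {
		if sub.owner(hash32(id)).id == instanceID {
			owned = append(owned, id)
		}
	}
	return owned
}

func main() {
	full := ring{{"sg-1", 1 << 30}, {"sg-2", 2 << 30}, {"sg-3", 3 << 30}, {"sg-4", 1 << 29}}
	fmt.Println(filterOwnedBlocks([]string{"block-a", "block-b"}, "tenant-a", "sg-1", full, 2))
}
```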
We’re currently discussing/evaluating different options.
Cortex must guarantee query correctness: transiently incorrect results may be cached and returned forever. The main problem to solve when introducing ingester shuffle sharding on the read path is to make sure that a querier fetches data from all ingesters holding at least one sample for a given tenant.
The problem to solve is: how can a querier efficiently find which ingesters have data for a given tenant? Each option must account for changes to the set of ingesters and to each tenant’s subring size.
This section describes an alternative approach. Discussion is still ongoing.
The idea is for the queriers to be able to deduce which ingesters could possibly hold data for a given tenant by consulting only the ring (and the per-tenant subring sizes). We posit that this is possible with a single piece of extra information: a timestamp per ingester recording when the ingester first joined the ring.
When a new ingester is added to the ring, there will be a set of tenant subrings that see a change: an ingester being removed, and a new one being added. We need to guarantee that, for some time period (the block flush interval), the removed ingester is also consulted for queries.
To do this, during the subring selection, if we encounter an ingester added within the time period, we add it to the subring but continue node selection as before, in effect selecting an extra ingester:
```go
var (
	subringSize   = shardSize // the tenant's configured shard size
	selectedNodes []Node
	deadline      = time.Now().Add(-flushWindow)
)
for len(selectedNodes) < subringSize {
	token := random.Next() // next value in the tenant's deterministic hash sequence
	node := getNodeByToken(token)
	for {
		if contains(selectedNodes, node) {
			// Walk the ring clockwise past already-selected nodes.
			node = node.Next()
			continue
		}
		if node.Added.After(deadline) {
			// The node joined the ring within the flush window: select it,
			// but also grow the subring so an extra (older) node is selected.
			subringSize++
			selectedNodes = append(selectedNodes, node)
			node = node.Next()
			continue
		}
		selectedNodes = append(selectedNodes, node)
		break
	}
}
```
When an ingester is permanently removed from the ring, it will flush its data to the object store, and the subrings containing the removed ingester will gain a “new” ingester. Queries consult the store and merge the results with those from the ingesters, so no data will be missed.
Queriers and store-gateways will discover newly flushed blocks on the next sync (-blocks-storage.bucket-store.sync-interval, default 5 minutes).
Multiple ingesters should not be scaled down within this interval.
To improve read performance, queriers and rulers are usually configured with a non-zero value of the -querier.query-store-after option.
This option makes queriers and rulers consult only the ingesters when running queries within the specified time window (e.g. 12h).
During a scale-down this value needs to be lowered, to let queriers and rulers use the flushed blocks from the storage.
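For example (values illustrative), the option could be temporarily lowered during the scale-down:

```
# Normal operation: queries on recent data (last 12h) hit only the ingesters.
-querier.query-store-after=12h

# During the ingester scale-down: also consult recently flushed blocks.
-querier.query-store-after=0s
```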
Node selection for subrings is stable: increasing the size of a subring is guaranteed to only add new nodes to it (and not remove any). Hence, if a tenant’s subring is increased in size, the queriers will notice the config change and start consulting the new ingesters.
If a tenant’s subring decreases in size, there is currently no way for the queriers to know how big the subring was previously, so they could miss an ingester holding data for that tenant.
This is deemed an infrequent operation; we considered banning it, but we have a proposal for how we might make it possible:
The proposal is to have separate read subring and write subring sizes in the config. The read subring will not be allowed to be smaller than the write subring. When reducing the size of a tenant’s subring, operators must first reduce the write subring, and then, two hours later when the blocks have been flushed, the read subring. In the majority of cases the read subring will not need to be specified, as it will default to the write subring size.
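A hypothetical per-tenant configuration for this scheme could look as follows; the read option name is purely illustrative, since only the write-path limit exists today:

```yaml
overrides:
  tenant-a:
    ingestion_tenant_shard_size: 6    # write subring: reduce this first
    read_tenant_shard_size: 9         # read subring: reduce ~2h later,
                                      # once the blocks have been flushed
```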
A possible solution could be keeping in the querier an in-memory data structure that maps each ingester to the list of tenants for which it has some data. This data structure would be built at querier startup and then periodically updated, combining two pieces of information:
When a querier starts up, and before it becomes ready:
Then the querier watches the ingester ring and rebuilds the in-memory map whenever the ring topology changes.
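The following is a minimal, self-contained Go sketch of such a data structure; all names are hypothetical, and the subring resolution is passed in as a function to keep the example small:

```go
package main

import (
	"fmt"
	"sync"
)

// tenantsPerIngester is a sketch of the querier's in-memory map: each
// ingester address maps to the set of tenants it may hold data for. A
// read-write mutex guards concurrent reads from query execution.
type tenantsPerIngester struct {
	mtx     sync.RWMutex
	tenants map[string]map[string]struct{}
}

// rebuild recomputes the whole map, e.g. on a ring topology change or a
// periodic limits reload. subringFor stands in for the logic resolving a
// tenant's current subring from the ring and the per-tenant shard size.
func (t *tenantsPerIngester) rebuild(allTenants []string, subringFor func(tenant string) []string) {
	m := map[string]map[string]struct{}{}
	for _, tenant := range allTenants {
		for _, ing := range subringFor(tenant) {
			if m[ing] == nil {
				m[ing] = map[string]struct{}{}
			}
			m[ing][tenant] = struct{}{}
		}
	}

	t.mtx.Lock()
	t.tenants = m
	t.mtx.Unlock()
}

// hasTenant reports whether the querier should query the given ingester
// for the given tenant.
func (t *tenantsPerIngester) hasTenant(ingester, tenant string) bool {
	t.mtx.RLock()
	defer t.mtx.RUnlock()
	_, ok := t.tenants[ingester][tenant]
	return ok
}

func main() {
	var m tenantsPerIngester
	m.rebuild([]string{"tenant-a"}, func(string) []string { return []string{"ingester-1", "ingester-2"} })
	fmt.Println(m.hasTenant("ingester-1", "tenant-a")) // true
}
```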
A new tenant starts remote writing to the cluster. The querier doesn’t have it in its in-memory map yet, so it adds the tenant to the map on the fly, looking only at the current state of the ring.
When a new ingester is added to or removed from the ring, the ring topology changes and queriers update the in-memory map.
Queriers periodically (every 1m) reload the limits config file. When a tenant shard size change is detected, the querier updates the in-memory map for the affected tenant.
Issue: some time series data may be missing from queries for up to one minute.
Consider the following scenario:
This section describes an alternative approach.
As of today, queriers discover ingesters via the ring:
The proposal is to expose a new gRPC endpoint on ingesters, which allows queriers to receive a stream of real time updates from ingesters about the tenants for which an ingester currently has time series data.
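A hypothetical shape for this endpoint, sketched as Go interfaces rather than an actual protobuf definition (none of these names are an agreed API), could be:

```go
package main

import "context"

// ActiveTenantsUpdate is a hypothetical incremental update pushed by an
// ingester about the tenants it holds data for.
type ActiveTenantsUpdate struct {
	AddedTenantIDs   []string // tenants for which the ingester now holds series data
	RemovedTenantIDs []string // tenants whose local data is gone
}

type ActiveTenantsStream interface {
	// Recv blocks until the ingester pushes the next incremental update.
	Recv() (*ActiveTenantsUpdate, error)
}

type IngesterClient interface {
	// ActiveTenants opens a long-lived stream of real-time updates about
	// the tenants for which this ingester currently has time series data.
	ActiveTenants(ctx context.Context) (ActiveTenantsStream, error)
}

func main() {}
```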
From the querier side:
Pros:
Cons:
The ruler currently supports sharding rule groups across a pool of rulers. When sharding is enabled, rulers form a hash ring, and each ruler uses the ring to check whether it should evaluate a specific rule group.
At a polling interval (defaults to 1 minute), the ruler:
We propose to introduce shuffle sharding in the ruler as well, leveraging the hash ring already used by the current sharding implementation.
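As an illustration, a minimal, self-contained Go sketch of the ownership check under shuffle sharding could look as follows; all names are hypothetical, and a simple modulo assignment stands in for the real token-based ring lookup:

```go
package main

import (
	"fmt"
	"hash/fnv"
	"sort"
)

func hash64(s string) uint64 {
	h := fnv.New64a()
	h.Write([]byte(s))
	return h.Sum64()
}

// ownsRuleGroup reports whether this ruler should evaluate the rule group:
// the tenant's subring is built first (resolved elsewhere), and the rule
// group is then assigned to exactly one ruler within it.
func ownsRuleGroup(subring []string, self, ruleGroup string) bool {
	if len(subring) == 0 {
		return false
	}
	sorted := append([]string(nil), subring...)
	sort.Strings(sorted) // same order on every ruler
	return sorted[hash64(ruleGroup)%uint64(len(sorted))] == self
}

func main() {
	subring := []string{"ruler-2", "ruler-1", "ruler-3"} // the tenant's subring
	fmt.Println(ownsRuleGroup(subring, "ruler-1", "tenant-a/alerts.yaml;group-1"))
}
```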
The configuration will be extended to allow configuring:
When shuffle sharding is enabled:
The ruler re-syncs the rule groups from the bucket whenever one of the following conditions happens: