# Distributed Query Planning
> #### Aside: Sharding
>
> Sharding is the process of distributing data across multiple servers (shards) to horizontally scale the database. Each shard owns a subset of the data, enabling parallel processing and improved performance by spreading the workload across multiple machines. For more information on sharding, refer to the Sharding Architecture Guide.
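>
> A minimal sketch of how a collection becomes sharded (the namespace and shard key below are illustrative):
>
> ```js
> // Enable sharding on the database, then shard the collection on "location".
> // The shard key determines how documents are partitioned into chunks.
> sh.enableSharding("test");
> sh.shardCollection("test.stores", {location: 1});
> ```
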
Queries on distributed data are processed in a distributed fashion. This is generally a two-step process:

1. A router plans the query and determines which shard(s) own the relevant data.
2. The targeted shards execute their portion of the query in parallel, and their results are merged into the final result.

Both `mongos` and `mongod` acting as a router can do distributed query planning.
> #### Aside: mongos
>
> For a sharded cluster, the `mongos` instances provide the interface between the client applications and the sharded cluster, routing queries and write operations to the appropriate shard(s) based on the shard key.

> #### Aside: mongod
>
> `mongod` is the primary daemon process for the MongoDB system. It handles data requests, manages data access, and performs background management operations. In a sharded cluster, `mongod` instances are deployed as shard servers, each responsible for storing a subset of the data in a cluster. They handle queries targeting their owned data ranges (chunks), determined by the shard key. In some cases, a `mongod` may also act as a router and perform shard targeting.

For queries that must run on more than one shard (i.e. the query requires data that is spread across shards), there is (1) a shards part of the query and (2) a merge part of the query. The shards part includes components that can be executed in parallel by all the targeted shards, such as filters. The merge part performs the remaining query operations globally on a single node, gathering the results from the shards part and then executing the rest of the query over the aggregated results. These commands are processed through the following steps:
1. **Determine Shards Part and Merge Part**: The query is analyzed and split into the shards part, which can be run in parallel across shards, and the merge part, which must be executed globally on a merge host.
2. **Shard Targeting**: The routing table is consulted to determine which shard ids own data required by the shards part. This is usually the latest routing table, although a historical routing table may be retrieved for transactions. For details on how the shard key is extracted from the query and how the set of shard ids is produced, refer to [Shard and Host Targeting](#shard-and-host-targeting).

> #### Aside: Routing table
>
> The routing table maps chunks of data to shard ids, and it is consulted by `mongos` for shard targeting. A chunk is a contiguous range over the shard key. The routing table must be updated as data moves between shards, such as when the balancer is turned on or a range migration commits. For more information on the shard versioning protocol it uses to ensure a consistent view of data across a request, refer to the Distributed CRUDs README. For information about routing query operations safely with the `RoutingContext`, refer to the RoutingContext README.
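>
> For illustration, the chunk-to-shard mapping that backs the routing table is persisted in the config database (in recent versions, chunks are keyed by collection UUID):
>
> ```js
> // Peek at a few chunk ranges and the shards that own them.
> db.getSiblingDB("config").chunks.find({}, {min: 1, max: 1, shard: 1}).limit(3);
> ```
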
3. **Establish Cursors**: The shards part is dispatched, establishing cursors with the attached `shardVersion` across the selected shards. These cursors are transferred to a `ClusterClientCursor`, which returns the first batch of results.

> #### Aside: Cursor
> A cursor is a pointer to the result set of a query. The client can iterate over large result sets without loading all the data into memory at once. Queries return cursor ids, as well as a number of documents within the specified `batchSize`. If there are more documents remaining in the result set, the client will construct subsequent `getMore` requests with the same cursor id. If all documents have been returned in the first batch, the returned cursor id is 0.
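>
> A minimal sketch of this protocol via `runCommand` (the collection name and `batchSize` are illustrative, and we assume the first batch does not exhaust the cursor):
>
> ```js
> // First batch: up to 2 documents plus a cursor id.
> let res = db.runCommand({find: "coll", filter: {}, batchSize: 2});
> printjson(res.cursor.firstBatch);
>
> // Fetch the next batch with the same cursor id. A returned cursor id
> // of 0 means the result set is exhausted.
> res = db.runCommand({getMore: res.cursor.id, collection: "coll", batchSize: 2});
> printjson(res.cursor.nextBatch);
> ```
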
4. **Merging**: The merge part of the query is executed on a merge host, either the `mongos` or a merging shard. It collects data from shard cursors and performs a global aggregation or processing to produce the final result.

```mermaid
flowchart TD
%% Definitions
A[Client Request]
A1[Query]
B@{ shape: lin-cyl, label: "Router" }
C[Split Query]
D@{ shape: das, label: Shards Part }
E@{ shape: das, label: Merge Part }
F[Shard Targeting]
G@{ shape: cyl, label: "Shard 0" }
H@{ shape: cyl, label: "Shard 1" }
S@{ shape: cyl, label: "Merging
Shard" }
T((mongos))
I0@{ shape: docs, label: "Partial Results"}
I1@{ shape: docs, label: "Partial Results"}
J0((Cursor 0))
J1((Cursor 1))
L@{ shape: docs, label: "Final Aggregated Results"}
M@{ shape: diam, label: "Can run on
router?"}
%% Workflow
A --> |Extract Query| A1
A1 --> B
B --> C
subgraph Query_Processing [Query Processing]
C --> D
C --> E
D --> F
F --> G
F --> H
G --> I0
H --> I1
I0 --> J0
I1 --> J1
J0 --o E
J1 --o E
E --> M
M --> |Yes| T
M --> |No| S
S --> L
T --> L
end
L --> A
```
There are some high-level differences between the different distributed commands. For example, find commands use the `AsyncResultsMerger` for the merge part, whereas aggregate commands use a merging pipeline with a `$mergeCursors` stage.
Find commands have fairly straightforward logic for what goes into the shards part and what goes into the merge part. The filter is always processed by the shards. If the query targets multiple shards, the `skip`, `limit`, and `sort` are applied on the router. If it only targets a single shard, they are applied on the `mongod`.
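For example (an illustrative query; the collection and fields are placeholders), a multi-shard find pushes only the filter down to the shards, while the sort and limit are applied on the router:

```js
// The {inventory: {$gt: 10}} filter is processed by each targeted shard
// (the shards part). Because more than one shard is targeted, the sort
// and limit are applied on the router while merging the shard results
// (the merge part).
db.coll.find({inventory: {$gt: 10}}).sort({inventory: -1}).limit(5);
```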
Agg commands are more expressive and thus have stage-specific logic that determines where the "pipeline splitting" happens. Generally, it is desirable for the split to happen as late as possible so that the bulk of the pipeline can be executed in parallel in the shards part. For more details, refer to the Distributed Aggregations README.
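One way to see where a pipeline was split (a sketch; the collection and stages are illustrative) is to run explain against a sharded collection and inspect the reported split:

```js
// On a sharded collection, the explain output includes a "splitPipeline"
// object showing the "shardsPart" (here, $match plus a partial $group)
// and the "mergerPart" ($mergeCursors followed by the merging $group).
db.coll.explain().aggregate([
    {$match: {location: "Tribeca"}},
    {$group: {_id: "$brand", total: {$sum: "$inventory"}}}
]);
```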
> #### Aside: Exchange Operator
>
> In certain cases, an additional check is performed to see if the `mergePipeline` is eligible for the `$exchange` operator. This is useful for queries that send results to different remote hosts, such as `$out` to a sharded collection. The goal is to shuffle documents with the exchange such that the merging can be done on multiple shards, rather than selecting a single merger. There will be an exchange producer on each shard generating results, as well as an exchange consumer on each output shard. This increases parallelism during the merging stage.
>
> For example, the pipeline
>
> ```js
> db.coll.aggregate([
>     {$addFields: {value: <expression>}},
>     {$group: {_id: "$value"}},
>     {$project: {shardKey: "$_id"}},
>     {$out: {to: "sharded", mode: "replaceDocuments"}}
> ]);
> ```
>
> would normally perform the merge part (the final `$group` + `$project` + `$out`) remotely on a single merging shard, which then sends the data owned by each output shard back to that shard. But because the pipeline is shard key-preserving, we could exchange documents such that the results of each partial group are sent to their owning shards. The optimized split pipeline would look like:
>
> ```js
> {
>     shards: [
>         {$addFields: {value: <expression>}},
>         {$group: {_id: "$value"}},
>         {exchange: {...}}
>     ],
>     merger: [
>         {$mergeCursors: {...}},
>         {$group: {_id: "$shardKey"}},
>         {$project: {shardKey: "$_id"}},
>         {$out: {...}}
>     ]
> }
> ```
>
> There will be multiple mergers, one per shard owning chunks of the output collection. This increases the chance that each `$out` write is done locally, rather than remotely.
>
> The data is partitioned based on the specific redistribution policy:
>
> - `kBroadcast` - each produced document is sent to all consumers.
> - `kRoundRobin` - each produced document is sent to one consumer in a round-robin fashion.
> - `kKeyRange` - the data is routed to consumers based on ranges of values. This can be beneficial for merge pipelines that preserve the shard key.
>
> For more information about `$exchange`, refer to `exchange_spec.idl` or `document_source_exchange.h`.

## Top-Level Entrypoints
**Find**: `cluster_find_cmd::run()`

- Builds a `CanonicalQuery` from the command request and sends it to `cluster_find::runQuery()`.
- `runQuery()` targets the shards and dispatches the query according to the `readPreference`, returning the first batch of results and a cursor id for subsequent `getMore` requests on success.
**Aggregate**: `cluster_aggregate::runAggregate()`

- Invoked from `ClusterPipelineCommandBase::_runAggCommand()`.
- `MongosProcessInterface::preparePipelineForExecution()` prepares the pipeline for execution on the `mongos` router.
- `ShardServerProcessInterface::preparePipelineForExecution()` prepares the pipeline for execution on a `mongod` node that acts as a shard server, e.g. for the sub-pipelines of stages such as `$lookup` and `$unionWith`.

Let's see how this process is applied to the following sample collection and pipeline:
**Sample Data**

```js
[
    {_id: 0, brand: "Anthropologie", inventory: 15, type: "dresses", location: "5th Avenue"},
    {_id: 1, brand: "Anthropologie", inventory: 7, type: "accessories", location: "5th Avenue"},
    {_id: 2, brand: "Anthropologie", inventory: 3, type: "pants", location: "Tribeca"},
    {_id: 3, brand: "Adidas", inventory: 18, type: "pants", location: "5th Avenue"},
    {_id: 4, brand: "Adidas", inventory: 12, type: "shoes", location: "Tribeca"},
    {_id: 5, brand: "Adidas", inventory: 18, type: "pants", location: "Tribeca"},
    {_id: 6, brand: "Nike", inventory: 25, type: "shoes", location: "Soho"},
    {_id: 7, brand: "Zara", inventory: 20, type: "dresses", location: "Soho"}
]
```

**Pipeline**

```js
[
    {$match: {location: {$in: ["5th Avenue", "Tribeca"]}}},
    {$group: {_id: "$brand", totalInventory: {$sum: "$inventory"}}}
]
```
Imagine the collection is sharded on `location`, with chunks distributed as follows:

- shard0: `{location: "5th Avenue"}`
- shard1: `{location: "Tribeca"}`
- shard2: `{location: "Soho"}`

The `$match` stage can be pushed down to the shards, as can part of the work in the `$group` stage. During shard targeting, shard0 and shard1 are identified. The `$match` and `$group` are executed independently on each shard, producing intermediate results:
```js
// shard0
[
    {_id: "Anthropologie", totalInventory: 22},
    {_id: "Adidas", totalInventory: 18}
]

// shard1
[
    {_id: "Anthropologie", totalInventory: 3},
    {_id: "Adidas", totalInventory: 30}
]
```
The merge host then collects these intermediate results from the shards and performs a final aggregation to combine the results:
```js
[
    {_id: "Anthropologie", totalInventory: 25},
    {_id: "Adidas", totalInventory: 48}
]
```
This reduces the amount of data transferred over the network between the shards and the merge host, since each shard filters out documents that don't match the `location` predicate locally and returns only the requested fields.
## Shard and Host Targeting

As part of optimization on the router, query commands (find and agg alike) build a `CanonicalQuery` containing all the information needed to figure out which nodes should run the shards part of the query.
Given a filter from a `CanonicalQuery` and a `ChunkManager`, which is a wrapper around a routing table at a specific point in time, we can calculate the relevant shard ids (`shard_key_pattern_query_util::getShardIdsAndChunksForCanonicalQuery()`). The process varies depending on how much shard key information is present in the filter.
> #### Aside: Shard Key Pattern
>
> The field(s) used to partition data across shards. A query must include the full shard key pattern or a prefix of a compound shard key pattern to be targeted to specific shards. If no shard key fields are present in the query, the query may need to be broadcast to all shards.
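>
> For instance, with a collection sharded on the compound key `{a: 1, b: 1}` (illustrative queries):
>
> ```js
> db.coll.find({a: 5, b: 7}); // full shard key: can be targeted precisely
> db.coll.find({a: 5});       // prefix of the compound key: still targetable
> db.coll.find({b: 7});       // no shard key prefix: may be broadcast to all shards
> ```
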
### Case 1: Full Shard Key Extraction (Equality Match)

For queries involving simple equality matches covering the whole shard key pattern, we can directly extract the corresponding shard key values. This can usually be targeted to a single shard, barring special cases such as a non-simple collation, so we take the fast path and return the shard id immediately.

```js
// Shard key pattern:
{a: 1, b: 1}

// Filter over the full shard key:
{a: {$eq: 3}, b: {$eq: "hi"}}

// Shard key extracted:
{a: 3, b: "hi"}
```
> #### Aside: Collation and Shard Targeting
>
> A collation is a set of rules that defines how strings are compared and stored. Rules can be related to case sensitivity (whether `A` and `a` are considered equivalent or distinctly ordered) and locale awareness (whether `a` sorts before `z` can depend on the locale/language). Shard keys are created with the default collation, so when a query specifies a different collation, the query may not be targeted correctly.
>
> For example, the equality predicate `{location: 'Nice'}` with a case-insensitive collation is expected to match documents where `location` is `'Nice'` or `'nice'`. However, because shard keys use the default case-sensitive collation, the query may only target the shard containing the exact match for `Nice`. This can lead to incomplete or incorrect query results, so the query is ineligible for shard targeting unless we explicitly allow forcing targeting with a simple collation.
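>
> As a sketch of the example above (the collection name and locale are illustrative; `strength: 2` requests case-insensitive comparison):
>
> ```js
> // Shard key ranges are ordered with the default (simple) collation, so
> // this predicate cannot safely be used for targeting, and the query may
> // need to be broadcast.
> db.coll.find({location: "Nice"}).collation({locale: "en", strength: 2});
> ```
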
### Case 2: Non-Equality Matches on Shard Key

When the filter contains non-equality predicates (ranges), or the fields in the query filter don't match the full shard key pattern, the query is transformed into index bounds for each shard key field, leveraging query planning as a subroutine. These bounds are then transformed into full shard key ranges.

```js
// Shard key pattern:
{a: 1, b: 1}

// Filter:
{a: {$gte: 1, $lt: 2}, b: {$gte: 3, $lt: 4}}

// Bounds:
{a: [1, 2), b: [3, 4)}

// Ranges:
{a: 1, b: 3} => {a: 2, b: 4}
```

The chunk manager returns the shard ids that map to the provided shard key ranges.
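When only a prefix of the shard key is constrained, the unconstrained suffix fields are widened to the whole key space using the `MinKey`/`MaxKey` BSON bounds. A sketch with the same `{a: 1, b: 1}` shard key pattern:

```js
// Filter constraining only the prefix field:
{a: {$eq: 3}}

// Ranges: the suffix field is left unbounded.
{a: 3, b: MinKey} => {a: 3, b: MaxKey}
```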
```mermaid
---
config:
  themeVariables:
    fontSize: 32px
---
flowchart LR
%% Definitions
A1@{shape: "lean-r", label: "CanonicalQuery Filter"}
A2@{shape: "lean-r", label: "Shard key pattern"}
B@{ shape: div-rect, label: "ChunkManager
(Routing Table)" }
C["Shard Key"]
D@{shape: "diamond", label: "Full equality match
on shard key pattern?"}
E["Shard Key Index Bounds"]
G["Shard Key Ranges"]
H@{ shape: lean-r, label: "Shard Id(s)" }
%% Workflow
A1 --o D
A2 --o D
D --> |No|E
E --> G
G --> B
B --> H
D --> |Yes| C
C --> B
```
> #### Aside: `ShardTargetingPolicy`
>
> The `ShardTargetingPolicy` defines policies that determine whether and how a query or operation can target shards.
>
> - `kNotAllowed` - operations that can only perform local reads on the node, disallowed from communicating with other shards. For instance, a `$lookup` that's run in a transaction doesn't support a sharded `from` collection by default, so its subpipeline would have a shard targeting policy of `kNotAllowed`.
> - `kAllowed` - operations can be targeted to specific shards. This is the default in most cases, and applies to queries that perform equality and range lookups over a shard key.
> - `kForceTargetingWithSimpleCollation` - this forces shard targeting with the `simple` collation, ignoring a potentially different collation specified by the query.