docs/change_streams.md
Disclaimer: unless explicitly stated otherwise, the following documentation is based on how change streams are implemented on the current master branch. Implementation details for previous versions may vary slightly.
Change streams are a convenient way for an application to monitor changes made to the data in a deployment. The events produced by change streams are called "change events". The event data is produced from the oplog(s) of the deployment. The events emitted by change streams include DML events (e.g. for inserts, updates, and deletes) and, depending on the configuration, DDL events.
Which exact event types are emitted by a change stream depends on the change stream configuration and the deployment type.
Change streams are mainly used by customer applications and tools to keep track of changes to the data in a deployment, in order to relay these updates to external systems. Some of MongoDB's own tools and components are also based on change streams, e.g. mongosync (C2C), Atlas Search, Atlas Stream Processing, and the resharding process. The component that opens a change stream and pulls events from it is called the "consumer".
Change streams provide various guarantees:

- Ordering: change events are returned in a well-defined order, even across shards.
- Resumability: every change event carries a resume token (inside its _id field), which acts as a bookmark. This allows the consumer to continue processing changes from the last known position without missing events.

Change streams can be opened on different levels that control which events are emitted:

- Collection level: emits events for a single collection (e.g. testDB.testCollection).
- Database level: emits events for all collections of a single database (e.g. testDB).
- All-cluster level: emits events for all databases and collections of the deployment.

Consumers can apply additional filters to a change stream by adding $match stages to the change stream pipeline as needed.
Change streams cannot be opened on views. This includes time-series collections if they are backed by views.
Change streams respect the read permissions of the consumer. A consumer can only open change streams on collections or databases that they have access to.
Change streams can be opened against replica set and sharded cluster deployments. They cannot be opened against standalone mongod instances, as there is no oplog to generate the events from in standalone mode.
In replica set deployments, the change stream can be opened directly on any replica set member of the deployment. In sharded cluster deployments, the change stream must be opened against one of the deployment's mongos processes.
A change stream is opened by executing an aggregate command with a pipeline that contains at least
the $changeStream pipeline stage.
mongosh, the "jstest shell" (mongo) and many drivers provide simpler "watch" wrappers for this.
To open a collection-level change stream on a specific collection (e.g., testDB.testCollection),
the following mongosh command can be used:
```js
db.getSiblingDB("testDB").runCommand({
  aggregate: "testCollection",
  pipeline: [
    {
      $changeStream: {},
    },
  ],
  cursor: {},
});
```
To open a database-level change stream on a specific database (e.g., testDB), use the following
command in mongosh:
```js
db.getSiblingDB("testDB").runCommand({
  aggregate: 1,
  pipeline: [
    {
      $changeStream: {},
    },
  ],
  cursor: {},
});
```
The aggregate parameter must be set to 1 for database-level change streams, and the command must
be executed inside the desired database.
The internal namespace that is used by database-level change streams is <dbName>.$cmd.aggregate
(where <dbName> is the actual name of the database).
All-cluster change streams can only be opened on the admin database and also need the
allChangesForCluster flag to be set to true in order to work. The following mongosh command
can be used to open an all-cluster change stream:
```js
db.adminCommand({
  aggregate: 1,
  pipeline: [
    {
      $changeStream: {
        allChangesForCluster: true,
      },
    },
  ],
  cursor: {},
});
```
The internal namespace that is used by all-cluster change streams is always admin.$cmd.aggregate.
As $changeStream is a pipeline stage, additional pipeline stages can be added after it for filtering and transforming results, e.g.

- $addFields
- $match
- $project
- $replaceRoot
- $replaceWith
- $redact
- $set
- $unset

There is also a change streams-specific stage $changeStreamSplitLargeEvent to split large events into smaller fragments, in order to avoid running into BSONObjectTooLarge errors.
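For example, a minimal sketch (plain JavaScript; the filter itself is an arbitrary illustration) of a change stream pipeline with an additional $match stage that keeps only "insert" and "delete" events:

```javascript
// A $changeStream pipeline with an extra $match stage appended. The stages
// after $changeStream operate on the emitted change events.
const pipeline = [
  { $changeStream: {} },
  { $match: { operationType: { $in: ["insert", "delete"] } } },
];

// In mongosh, this pipeline would be passed to the aggregate command:
// db.getSiblingDB("testDB").runCommand({
//   aggregate: "testCollection",
//   pipeline: pipeline,
//   cursor: {},
// });
```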
When opening a change stream without specifying an explicit point in time, the change stream will be opened using the current time, and will report only change events that happened after that point in time. The current time here is the current cluster time of the deployment.
To open a change stream at a specific point in time instead of using the current time, the parameter
startAtOperationTime can be set in the initial change stream request. The startAtOperationTime
parameter is specified as a logical timestamp.
Change streams allow the consumer to resume a change stream after an error has occurred.
To support resumability, change streams report a "resume token" inside the _id field of every
emitted event.
To resume a change stream after an error, the resume token of a previously consumed event
can be passed in one of the parameters resumeAfter or startAfter when opening the change stream.
The resumeAfter parameter cannot be used with resume tokens that were emitted by an "invalidate"
event. The startAfter parameter can be used even with invalidate events.
When specifying an explicit start point for a change stream, only one of the parameters
resumeAfter, startAfter and startAtOperationTime can be used. Using more than one of them when
opening a change stream will return an error.
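As a sketch (plain JavaScript; the token value is a placeholder), resuming means passing the _id of the last processed event back into the $changeStream stage:

```javascript
// Build the aggregate command used to resume a collection-level change
// stream. startAfter also works across "invalidate" events; resumeAfter
// does not. Only one of resumeAfter / startAfter / startAtOperationTime
// may be present in a single request.
function buildResumeCommand(collection, resumeToken) {
  return {
    aggregate: collection,
    pipeline: [{ $changeStream: { startAfter: resumeToken } }],
    cursor: {},
  };
}

// In mongosh: db.getSiblingDB("testDB").runCommand(buildResumeCommand(...))
const cmd = buildResumeCommand("testCollection", { _data: "8269..." });
```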
Resume tokens are not "portable" in the sense that they can only be used to resume a change stream that is opened with the same settings and pipeline stages as the change stream that produced the original resume token.
For example, changing the $match expression of a change stream when resuming from a change stream
with a different $match expression may lead to different events being returned, which may lead to
the event with the original resume token not being found in the new change stream.
The resume tokens that are emitted by change streams are string values that contain a hexadecimal encoding of the internal resume token data. The internal resume token data contains, among other fields, the cluster time, the resume token version, the transaction operation index, the token type, and an invalidate flag (see the decoded example below).
Resume tokens are versioned. Currently only version 2 is supported.
Future versions may introduce new resume token versions. Client applications should treat resume tokens as opaque identifiers and should not make any assumptions about the format or internals of resume tokens, nor should they rely on the internal implementation details of resume tokens.
Resume tokens are serialized and deserialized by the ResumeToken class. The resume token internal data is stored in ResumeTokenData.
There are two types of resume tokens: regular event tokens and high watermark tokens.
The former stem from actual change events. High watermark tokens are a special kind of change stream resume token that represents a logical position in the global change stream, ordered only by cluster time, and not tied to a specific event.
High watermark tokens sort strictly before any real event token at the same cluster time. That is, a high watermark token for time T sorts ahead of all events whose cluster time >= T.
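This ordering can be sketched with a small comparator (illustrative only; the isHighWatermark field is invented for this sketch, and the server actually compares the KeyString-encoded token data):

```javascript
// Order resume tokens by cluster time first; at the same cluster time, a
// high watermark token sorts before any real event token.
function compareTokens(a, b) {
  if (a.clusterTime !== b.clusterTime) return a.clusterTime - b.clusterTime;
  const rank = (token) => (token.isHighWatermark ? 0 : 1);
  return rank(a) - rank(b);
}

const highWatermark = { clusterTime: 100, isHighWatermark: true };
const eventToken = { clusterTime: 100, isHighWatermark: false };
```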
An example encoded resume token looks as follows:

```js
{ "_data" : "8269B03187000000022B0429296E1404" }
```
To destructure a resume token into its internal constituent parts, there is the function
decodeResumeToken() available in the mongo shell.
Invoking it on the example resume token above will produce:

```js
{
  clusterTime: Timestamp(1773154695, 2),
  tokenData: 0,
  version: 2,
  txnOpIndex: NumberLong(0),
  tokenType: 0,
  fromInvalidate: false
}
```
This can be very helpful to extract the cluster time (i.e. the resume timepoint) from a resume token.
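For instance, the wall-clock resume timepoint can be derived directly from the decoded cluster time (a sketch in plain JavaScript):

```javascript
// The first component of Timestamp(seconds, increment) is seconds since
// the Unix epoch, so it converts straight to a Date.
const clusterTimeSeconds = 1773154695; // from the decoded example token above
const resumeDate = new Date(clusterTimeSeconds * 1000);
```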
When opening a change stream on a replica set, a cursor will be established on the targeted replica set node. The change stream cursor is "tailable" and will remain open until it is explicitly closed by the consumer or the change stream runs into an error. Also, unused cursors are eventually garbage-collected after a period of inactivity.
When opening a change stream on a sharded cluster, the targeted mongos instance will open the
required cursors on the relevant shards of the cluster and also the config server. Here, the mongos
instance will also automatically open additional cursors in case new shards are added to the
cluster. All this is abstracted from the consumer of the change stream. The consumer of the change
stream will only see a single cursor and interact with mongos, which handles the complexity of
managing the underlying shard cursors.
If a change stream cursor can be successfully established, the cursor id is returned to the
consumer. The consumer can then use the cursor id to pull change events from the change stream by
issuing follow-up getMore commands to this cursor.
If a change stream cursor cannot be successfully opened, the initial aggregate command will
return an error, and the returned cursor id will be 0. In this case, no events can be consumed
from the change stream, and the consumer needs to resolve the error.
When a change stream is opened at a specific point in time, it is validated that the oplogs of all
participating nodes actually contain data for this point in time.
If the oplog does not contain any data at or before the exact point in time, the requested data may
already have fallen off the oplog.
In case no oplog entry can be found that is at least as old as the specified timestamp, opening the
change stream will fail with error code OplogQueryMinTsMissing.
This validation happens for all change streams, regardless of whether the start timestamp is
specified via the resumeAfter, startAfter or startAtOperationTime parameters, or implied from
the current time.
An exception, in which opening a change stream at a later point in time than the timestamp of the
first present oplog entry is permitted, exists for new shard primaries.
New shard primaries can be added to an existing cluster at any point in time. When a new shard
primary is added, its first oplog entry will be a no-op entry with msg == "initiating set" (on ASC)
or msg == "new primary" (on DSC).
The code for this can be found here.
Another common error is ChangeStreamHistoryLost. This error is raised when a change stream is
opened with a resume token that cannot be found (anymore) in any of the participating nodes' oplogs.
This can happen either when the resume event has actually fallen off the oplog, or when a
change stream is resumed with the resume token from another change stream with a different $match
expression. In the latter case, the new change stream may filter out the resume event due to the
different $match expression, so it cannot be found anymore.
Resuming a change stream using a resume token from a change stream with a different $match
expression is thus not guaranteed to work.
To fetch events from a previously opened change stream, the consumer can send a getMore request
using the cursor id that was established by the initial aggregate command, e.g.
```js
// For a collection-level change stream on "testDB.testCollection":
db.getSiblingDB("testDB").runCommand({
  getMore: cursorId,
  collection: "testCollection",
});

// For a database-level change stream on "testDB":
db.getSiblingDB("testDB").runCommand({
  getMore: cursorId,
  collection: "$cmd.aggregate",
});

// For an all-cluster change stream:
db.adminCommand({
  getMore: cursorId,
  collection: "$cmd.aggregate",
});
```
Responses can be further controlled by using the following optional parameters in the getMore
request:
- batchSize: the maximum number of change events to return in the response.
- maxTimeMS: the maximum server-side waiting time for producing events.

The getMore command will fill the response with up to batchSize results if that many events are
available. A response can also contain fewer events than the specified batchSize.
Regardless of the specified batch size, the maximum response size limit of 16MB will be honored, in
order to prevent responses from getting too large.
A change stream response is returned to the consumer when

- batchSize events have been accumulated in the response, or
- the maximum server-side waiting time (maxTimeMS) has expired.

In case the change stream cursor has reached the end of the oplog and there are currently no events
to return, the response will be returned immediately if it already contains at least one event.
If the response is empty, the change stream will wait for at most maxTimeMS for new oplog entries
to arrive.
If no new oplog entries arrive within maxTimeMS, an empty response will be returned. If new oplog
entries arrive within maxTimeMS and at least one of them matches the change stream's filter, the
matching event will be returned immediately. If oplog entries arrive but do not match the change
stream's filter, the change stream will wait for matching oplog entries until maxTimeMS is fully
expired.
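The waiting behavior described above can be sketched as a small decision function (illustrative only; the function and parameter names are invented, and the real logic lives in the server's getMore machinery):

```javascript
// Decide what a getMore on a change stream cursor does at a given moment.
// Returns "respond" when a response should be sent, "wait" otherwise.
function getMoreAction(accumulated, batchSize, elapsedMs, maxTimeMS, atOplogEnd) {
  if (accumulated >= batchSize) return "respond";      // batch is full
  if (atOplogEnd && accumulated > 0) return "respond"; // partial batch, nothing more to read
  if (elapsedMs >= maxTimeMS) return "respond";        // timeout: possibly empty response
  return "wait";                                       // keep waiting for matching entries
}
```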
In general, the returned change stream events have the following fields:

- _id: resume token for the event. This is not the same as the document id. The resume token can be used to open a new change stream starting at the very same event.
- operationType: type of the change stream event.
- clusterTime: logical timestamp of when the event originally occurred.
- wallTime: wall-clock date/time of when the event originally occurred.
- ns: namespace inside which the event occurred.

The following generic fields are added for change streams that were opened with the
showExpandedEvents flag:

- collectionUUID: UUID of the collection for which the event occurred, if applicable.
- operationDescription: populated for DDL events.

Most other fields are event type-specific, so they are only present for specific events. A few such fields include:

- documentKey: the _id value of the affected document, populated for DML events. May contain the shard key values for sharded collections.
- fullDocument: the full document for "insert" and "replace" events. Will also be populated for "update" events if the change stream is opened with the fullDocument parameter set to any other value than default.
- updateDescription / rawUpdateDescription: contains details for "update" events.

The majority of change stream event fields are emitted by the ChangeStreamDefaultEventTransformation
object here. This object is called by the ChangeStreamEventTransform stage here.
A custom $project stage in the change stream pipeline can be used to suppress certain fields.
Emitted change events can get large, especially if they contain pre- or post-images. In this case
the events can exceed the maximum BSON object size of 16MB, which can lead to BSONObjectTooLarge
errors when trying to process these change stream events.
To split large change stream events into multiple smaller chunks, change stream consumers can add
a $changeStreamSplitLargeEvent stage as the last step of their change stream pipeline, e.g.
```js
db.getSiblingDB("testDB").runCommand({
  aggregate: "testCollection",
  pipeline: [
    {
      $changeStream: {},
    },
    {
      $changeStreamSplitLargeEvent: {},
    },
  ],
  cursor: {},
});
```
The splitting is performed by the ChangeStreamSplitLargeEventStage stage here,
using this helper function.
The change stream consumer is responsible for assembling the split event fragments into a single
event later.
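A sketch of such a reassembly step (plain JavaScript; it assumes the fragments arrive in order and carry a splitEvent: { fragment, of } marker with disjoint subsets of the original event's fields — treat the exact shape as illustrative):

```javascript
// Merge split change event fragments back into a single event document.
function reassembleSplitEvent(fragments) {
  const event = {};
  for (const fragment of fragments) {
    for (const [key, value] of Object.entries(fragment)) {
      if (key !== "splitEvent") event[key] = value; // drop the split marker
    }
  }
  return event;
}

const merged = reassembleSplitEvent([
  { _id: { _data: "82..." }, operationType: "update", splitEvent: { fragment: 1, of: 2 } },
  { fullDocument: { a: 1 }, splitEvent: { fragment: 2, of: 2 } },
]);
```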
Collection-level and database-level change streams can return so-called "invalidate" events that close the change stream cursor in specific situations, e.g. when the watched collection is dropped or renamed, or when the watched database is dropped.
Issuing of change stream invalidate events is implemented in the ChangeStreamCheckInvalidateStage
here.
The behavior of change streams can be controlled via various parameters that can be passed with the
initial aggregate command used to open the change stream.
The parameters are defined in an IDL file.
The parameters that are provided when opening the change stream are automatically validated using mechanisms provided by the IDL framework. Additional validation of the change stream parameters is performed here. Invalid change stream parameters are immediately rejected with appropriate errors.
### fullDocument

The fullDocument change stream parameter controls what value should be returned inside the
fullDocument field for change stream DML "update" events.
The following values are possible:

- default: the fullDocument field will only be populated for "insert" and "replace" events.
- updateLookup: the fullDocument field will be populated with the current version of the document, identified by the document's _id value. Note that the current version of the document may not be the same version of the document that was present when the "update" change event was originally recorded. If no document can be found by the lookup, the fullDocument field will contain null.
- whenAvailable: the fullDocument field will be populated with the post-image for the event. The post-image is generated on the fly by taking a stored pre-image and applying the delta update from the event on top of it. If no post-image is available, the fullDocument field will contain null.
- required: populates the fullDocument field with the post-image for the event. Post-images are generated in the same way as in whenAvailable. If no post-image can be generated, this will abort the change stream with a NoMatchingDocument error.

The latter two options rely on pre-images being enabled for the target collection(s). When pre-images are enabled, they are written synchronously with the regular "update" oplog entry, and change stream events aren’t returned until both have been majority-committed.
Post-images for "update" events are added to change events by the ChangeStreamAddPostImage stage
here.
### fullDocumentBeforeChange

The fullDocumentBeforeChange change stream parameter controls what value should be returned inside
the fullDocumentBeforeChange field for change stream DML events ("update", "replace", "delete").
The following values are possible:

- off (default): the fullDocumentBeforeChange field will always be omitted.
- whenAvailable: the field will be populated with the pre-image of the document modified by the current change event, if available. If no pre-image is available, the fullDocumentBeforeChange field will contain null.
- required: populates the fullDocumentBeforeChange field with the stored pre-image for the event if it exists. If no pre-image is available, aborts the change stream with a NoMatchingDocument error.
Pre-images are added to change events by the ChangeStreamAddPreImage stage
here.
There are also numerous flags that control the behavior of change streams. The most important flag parameters are:
### showExpandedEvents (public)

The showExpandedEvents flag can be used to make a change stream return both additional event types
and additional fields.
The flag defaults to false. In this mode, change streams will only return DML events and no DDL
events.
When setting showExpandedEvents to true, change streams will also emit events for various DDL
operations.
In addition, setting showExpandedEvents will make change streams return the additional fields
collectionUUID (for various change stream event types) and updateDescription.disambiguatedPaths
(for update events).
### matchCollectionUUIDForUpdateLookup (public)

The matchCollectionUUIDForUpdateLookup field can be used to ensure that "updateLookup" operations
are performed on the correct collection in case multiple collections with the same name have existed
over time.
This is relevant, because change streams can be opened retroactively on collections that were already
dropped and may have been recreated with the same name but different contents afterwards.
The flag defaults to false. In this case, "updateLookup" operations will not verify that the
looked-up document is actually from the same collection "generation" as the change event the
document was looked up for.
If set to true, "updateLookup" operations will compare the collection UUID of the change event
with the UUID of the collection. If there is a UUID mismatch, the returned fullDocument field of
the event will be set to null.
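Conceptually, the check can be sketched as follows (plain JavaScript; the function and parameter names are invented for illustration):

```javascript
// The looked-up document only survives if the collection generation that
// produced the change event still matches the current collection's UUID.
function lookedUpFullDocument(eventCollectionUUID, currentCollectionUUID, lookedUpDoc) {
  return eventCollectionUUID === currentCollectionUUID ? lookedUpDoc : null;
}
```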
### allChangesForCluster (public)

This flag must be set when opening an all-cluster change stream. It will normally be set
automatically by drivers when opening an all-cluster change stream.
### showSystemEvents (internal)

The showSystemEvents flag can be used to make change streams return events for collections inside
the system namespace. These are not emitted by default. Setting showSystemEvents to true will
also include events related to system collections in the change stream.
The flag defaults to false and is internal.
### showMigrationEvents (internal)

The showMigrationEvents flag can be used to make change streams return DML events that are
happening during chunk migrations. If set to true, insert and delete events related to chunk
migrations will be reported as if they were regular events.
The flag defaults to false and is internal.
### showCommitTimestamp (internal)

The showCommitTimestamp flag can be used to include the transaction commit timestamp inside DML
events that were part of a prepared transaction.
The flag defaults to true and is internal. It is used by the resharding process.
### showRawUpdateDescription (internal)

The showRawUpdateDescription flag can be used to make change streams emit the raw, internal format
used for "update" oplog entries.
If set to true, emitted change stream "update" events will contain a rawUpdateDescription field.
The default is false. In this case, emitted change stream "update" events will contain the regular
updateDescription field.
### allowToRunOnConfigDB (internal)

The allowToRunOnConfigDB flag is an internal flag that can be used to open a change stream on the
config server in a sharded cluster. It is used internally by mongos to open a cursor on the config
server to keep track of shard additions and removals in the deployment.
### $_passthroughToShard (internal)

In sharded cluster deployments, all change streams are supposed to be opened on mongos. mongos
will open the required cursors to the data shards and the config server on the consumer's behalf.
If the consumer only wants to target a specific shard of the cluster, they can use the $_passthroughToShard
aggregation parameter to limit the change stream to a single shard.
For example, to open a collection-level change stream targeting only one of the cluster's shards
(identified by the value in shardId), the following example code can be used:
```js
db.getSiblingDB("testDB").runCommand({
  aggregate: "testCollection",
  pipeline: [
    {
      $changeStream: {},
    },
  ],
  $_passthroughToShard: { shard: shardId },
  cursor: {},
});
```
Using $_passthroughToShard will bypass the regular cluster shard targeting for change streams
and open a replica set change stream pipeline (only) on the targeted shard. The change events that
mongos retrieves from the single shard will be returned as is, without using a merge pipeline on
mongos.
When a change stream is opened against a replica set, the consumer opens the change stream directly on a replica set node, which can then return matching events immediately from the node's own oplog. The events are already correctly ordered, and the latency is defined by the node's replication lag and how close the change stream has advanced towards the end of the node's oplog.
Opening a change stream on a sharded cluster works differently. Here, the consumer opens the change stream against a mongos instance. The mongos instance will then use the cluster's topology information to open the cursors on the config server and the data shards on behalf of the consumer. Because of the ordering guarantee provided by change streams, mongos must wait until all cursors have either responded with events or run into a timeout and reported that no more events are currently available for them. The latter is why change streams in a sharded cluster can have higher latency than change streams in replica sets.
For sharded cluster change streams, the merging of the multiple streams of change events from the
different cursors is performed by the AsyncResultsMerger.
A change stream pipeline issued by a consumer contains the $changeStream meta stage.
This stage is expanded internally into multiple DocumentSources here.
The change stream DocumentSources are located in the src/mongo/db/pipeline directory here, among other DocumentSources that
are not related to change streams.
The DocumentSources are only used for pipeline building and optimization, but they are converted
into execution Stages later when the change stream is executed.
These Stages are located in the src/mongo/db/exec/agg directory here.
On a replica set, the $changeStream stage is expanded into the following internal stages:
- $_internalChangeStreamOplogMatch
- $_internalChangeStreamUnwindTransaction
- $_internalChangeStreamTransform
- $_internalChangeStreamCheckInvalidate (only present for collection-level and database-level change streams)
- $_internalChangeStreamCheckResumability
- $_internalChangeStreamAddPreImage (only present if fullDocumentBeforeChange is not set to off)
- $_internalChangeStreamAddPostImage (only present if fullDocument is not set to default)
- $_internalChangeStreamEnsureResumeTokenPresent (only present if the change stream resume token is not a high water mark token)
- $match expression (only present if the user's change stream pipeline contains a $match stage)
- $project expression (only present if the user's change stream pipeline contains a $project stage)
- $_internalChangeStreamSplitLargeEvent (only present if the change stream is opened with the $changeStreamSplitLargeEvent pipeline step)

The change stream pipeline on replica sets will also contain a $match stage to filter out all non-DML
change events in case showExpandedEvents is not set.
For sharded cluster change streams, mongos will first expand the $changeStream stage into the
following internal stages:
- $_internalChangeStreamOplogMatch
- $_internalChangeStreamUnwindTransaction
- $_internalChangeStreamTransform
- $_internalChangeStreamCheckInvalidate (only present for collection-level and database-level change streams)
- $_internalChangeStreamCheckResumability
- $_internalChangeStreamAddPreImage (only present if fullDocumentBeforeChange is not set to off)
- $_internalChangeStreamAddPostImage (only present if fullDocument is not set to default)
- $match expression (only present if the user's change stream pipeline contains a $match stage)
- $project expression (only present if the user's change stream pipeline contains a $project stage)
- $_internalChangeStreamSplitLargeEvent (only present if the change stream is opened with the $changeStreamSplitLargeEvent pipeline step)
- $_internalChangeStreamHandleTopologyChange
- $_internalChangeStreamEnsureResumeTokenPresent (only present if the change stream resume token is not a high water mark token)

Additionally, the change stream pipeline on a sharded cluster will contain a $match stage to
filter out all non-DML change events in case showExpandedEvents is not set.
After building the initial pipeline stages, mongos will split the pipeline into two parts: a shard
pipeline that is executed on each targeted shard (and the config server), and a merge pipeline that
is executed on mongos itself.
The pipeline split point is above the $_internalChangeStreamHandleTopologyChange stage.
mongos will also add a $mergeCursors stage that aggregates the responses from different shards
and the config server into a single, sorted stream.
The shard pipeline will look like this:
- $_internalChangeStreamOplogMatch
- $_internalChangeStreamUnwindTransaction
- $_internalChangeStreamTransform
- $_internalChangeStreamCheckInvalidate (only present for collection-level and database-level change streams)
- $_internalChangeStreamCheckResumability
- $_internalChangeStreamAddPreImage (only present if fullDocumentBeforeChange is not set to off)
- $_internalChangeStreamAddPostImage (only present if fullDocument is not set to default)
- $match expression (only present if the user's change stream pipeline contains a $match stage)
- $project expression (only present if the change stream pipeline contains a $project stage)
- $_internalChangeStreamSplitLargeEvent (only present if the change stream is opened with the $changeStreamSplitLargeEvent pipeline step)

The merge pipeline on mongos will look like this:

- $mergeCursors
- $_internalChangeStreamHandleTopologyChange
- $_internalChangeStreamEnsureResumeTokenPresent (only present if the change stream resume token is not a high water mark token)

### $_internalChangeStreamOplogMatch

This stage is responsible for reading data from the oplog and filtering out irrelevant events.
The DocumentSourceChangeStreamOplogMatch code is here.
The oplog filter for the stage is built here.
There is no Stage equivalent for DocumentSourceChangeStreamOplogMatch, as it will be turned into
a $cursor stage for execution.
### $_internalChangeStreamUnwindTransaction

This stage is responsible for "unwinding" (expanding) multiple operations that are contained in an
"applyOps" oplog entry into individual events.
The DocumentSourceChangeStreamUnwindTransaction code is here.
The ChangeStreamUnwindTransactionStage code is here.
### $_internalChangeStreamTransform

This stage is responsible for converting oplog entries into change events. It will build a change
event document for every oplog entry that enters this stage.
Event fields are added based on the change stream configuration.
The DocumentSourceChangeStreamTransform code is here.
The ChangeStreamTransformStage code is here.
The actual event transformation happens inside ChangeStreamDefaultEventTransformation here.
### $_internalChangeStreamCheckInvalidate

This stage is responsible for creating change stream "invalidate" events and is only added for
collection-level and database-level change streams.
The DocumentSourceChangeStreamCheckInvalidate code is here.
The ChangeStreamCheckInvalidate code is here.
When an invalidate event is encountered, the stage will first emit an "invalidate" event, and then
throw a ChangeStreamInvalidated exception on the next call. The ChangeStreamInvalidatedInfo
exception type contains the error code ChangeStreamInvalidated.
### $_internalChangeStreamCheckResumability

This stage checks if the oplog has enough history to resume the change stream, and consumes all
events up to the given resume point. If no data for the resume point can be found in the oplog
anymore, it will throw a ChangeStreamHistoryLost error.
The DocumentSourceChangeStreamCheckResumability code is here.
The ChangeStreamCheckResumabilityStage code is here.
### $_internalChangeStreamAddPreImage

This stage is responsible for adding pre-image data to "update", "replace" and "delete" events. It
is only added to change stream pipelines if the fullDocumentBeforeChange parameter is not set to
off.
If enabled, the stage relies on the pre-images stored in the system's pre-image system collection.
The DocumentSourceChangeStreamAddPreImage code is here.
The ChangeStreamAddPreImageStage code is here.
### $_internalChangeStreamAddPostImage

This stage is responsible for adding post-image data to "update" events. It is only added to change
stream pipelines if the fullDocument parameter is not set to default.
If fullDocument is set to updateLookup, the stage will perform a lookup for the current version
of a document that was updated by an "update" event, and store it in the fullDocument field of
the "update" event if present. The lookup is performed using the _id value of the document from
the change event. As the lookup is executed at a different point in time than when the change event
was recorded, it is possible that the lookup finds a different version of the document than the one
that was active when the change event was recorded. This can happen if the document was updated
again between the change event and the lookup. The lookup may also find no document at all if the
document was deleted after the "update" event, but before the lookup.
In case the lookup cannot find a document with the requested _id, it will populate the
fullDocument field with a value of null.
If fullDocument is set to whenAvailable or required, the stage will make use of the stored
pre-image of the document in the system's pre-image system collection. It will fetch the pre-image
and then apply the delta that is stored in the "update" change event on top of it, and store the
result in the fullDocument field.
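A sketch of that derivation (plain JavaScript; the field names follow the public updateDescription shape, while the server internally applies the oplog's delta format):

```javascript
// Derive a post-image by applying an update delta to a stored pre-image:
// updatedFields are set or overwritten, removedFields are deleted.
function applyDelta(preImage, { updatedFields = {}, removedFields = [] }) {
  const postImage = { ...preImage, ...updatedFields };
  for (const field of removedFields) delete postImage[field];
  return postImage;
}

const post = applyDelta(
  { _id: 1, a: 1, b: 2 },
  { updatedFields: { a: 10, c: 3 }, removedFields: ["b"] }
);
```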
The DocumentSourceChangeStreamAddPostImage code is here.
The ChangeStreamAddPostImageStage code is here.
### $_internalChangeStreamEnsureResumeTokenPresent

This stage is used by change streams to ensure that the resume token that was specified as part of
the change stream parameters is actually in the stream. The stage is only present if the change
stream resume token is not a high water mark token. If the resume token cannot be found in the
stream, it will throw a ChangeStreamFatalError.
The DocumentSourceChangeStreamEnsureResumeTokenPresent code is here.
The ChangeStreamEnsureResumeTokenPresent code is here.
### $_internalChangeStreamHandleTopologyChange

This stage is only present in sharded cluster change streams and is always part of the mongos
merge pipeline. The stage is responsible for opening additional cursors to shards that have been
added to the cluster. It will handle "insert" events into the config.shards collection that
were observed from the config server.
The DocumentSourceChangeStreamHandleTopologyChange code can be found here.
The ChangeStreamHandleTopologyChangeStage code can be found here.