docs/internal/DistributedArchitectureGuide.md
The Distributed Area contains indexing and coordination systems.
The index path stretches from the user REST command through shard routing down to each individual shard's translog and storage engine. Reindexing is effectively reading from a source index and writing to a destination index (perhaps on different nodes). The coordination side includes cluster coordination, shard allocation, cluster autoscaling stats, task management, and cross cluster replication. Less obvious coordination systems include networking, the discovery plugin system, the snapshot/restore logic, and shard recovery.
A guide to the general Elasticsearch components can be found here.
Every Elasticsearch node maintains various networking clients and servers, protocols, and synchronous/asynchronous handling. Our public docs cover user-facing settings and some internal aspects; see Network Settings.
The HTTP Server is a single entry point for all external clients (excluding cross-cluster communication). Management, ingestion, search, and all other external operations pass through the HTTP server.
Elasticsearch works over HTTP/1.1 and supports features such as TLS, chunked
transfer encoding, content compression, and pipelining. While attempting to
be HTTP spec compliant, Elasticsearch is not a webserver. ES supports GET
requests with a payload (though some old proxies may drop the content) and
POST for clients unable to send a GET with a body. Requests cannot be cached
by middle boxes.
There is no connection limit, but there is a limit on payload size. The default
maximum payload is 100MB after compression. This is a very large value, and
clients should almost never approach it. See the HttpTransportSettings class.
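The limit corresponds to the `http.max_content_length` setting, which can be adjusted in `elasticsearch.yml`:

```yaml
# elasticsearch.yml -- maximum HTTP request payload size.
# 100mb is the default; clients should rarely approach it.
http.max_content_length: 100mb
```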
Security features, including basic security (authentication (authc) and authorization (authz)) and Transport Layer Security (TLS), are available in the free tier and are implemented in separate x-pack modules.
The HTTP server provides two options for content processing: full aggregation and incremental processing. Aggregated content is preferable for small messages in formats that are not amenable to incremental parsing (e.g., JSON). Aggregation has drawbacks: it requires more memory, which is reserved until all bytes are received, and concurrent incomplete requests can lead to unbounded memory growth and potential OOMs. Large delimited content, such as bulk indexing, can instead be processed in byte chunks; this provides better control over memory usage but is more complicated for application code.
Incremental bulk indexing includes a back-pressure feature; see
org.elasticsearch.index.IndexingPressure. When memory pressure grows high
(LOW_WATERMARK), reading bytes from TCP sockets is paused for some
connections, allowing only a few to proceed until the pressure is resolved.
When memory grows too high (HIGH_WATERMARK), bulk items are rejected with a
429 status code. This mechanism protects against unbounded memory usage and
OutOfMemory errors (OOMs).
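The two-watermark behaviour can be sketched as a simple admission decision. The thresholds and names below are illustrative, not the real `IndexingPressure` settings:

```java
// Sketch of the two-watermark back-pressure pattern: pause socket reads when
// memory pressure is high, reject bulk items with 429 when it is too high.
// Thresholds are illustrative percentages, not the real IndexingPressure code.
public class BackPressureSketch {
    static final long LOW_WATERMARK = 80;   // pause reading from most sockets
    static final long HIGH_WATERMARK = 95;  // reject new bulk items with 429

    public enum Decision { ACCEPT, PAUSE_READS, REJECT_429 }

    public static Decision admit(long usedPercent) {
        if (usedPercent >= HIGH_WATERMARK) {
            return Decision.REJECT_429;     // protect against OOM
        }
        if (usedPercent >= LOW_WATERMARK) {
            return Decision.PAUSE_READS;    // let in-flight work drain
        }
        return Decision.ACCEPT;
    }
}
```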
ES supports multiple Content-Types for the payload. These are
implementations of the MediaType interface. A common implementation is
XContentType, which includes CBOR, JSON, SMILE, YAML, and their versioned
variants. X-pack extensions include PLAIN_TEXT, CSV, etc. Classes that
implement ToXContent and friends can be serialized and sent over HTTP.
HTTP routing is based on a combination of Method and URI. For example,
RestCreateIndexAction handler uses ("PUT", "/{index}"), where curly braces
indicate path variables. RestBulkAction specifies a list of routes:

```java
@Override
public List<Route> routes() {
    return List.of(
        new Route(POST, "/_bulk"),
        new Route(PUT, "/_bulk"),
        new Route(POST, "/{index}/_bulk"),
        new Route(PUT, "/{index}/_bulk"));
}
```
Every REST handler must be declared in the ActionModule class in the
initRestHandlers method. Plugins implementing ActionPlugin can extend the
list of handlers via the getRestHandlers override. Every REST handler
should extend BaseRestHandler.
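The routing idea can be illustrated with a toy matcher where `{segments}` act as wildcards. `Route` here is a stand-in record, not the real `RestHandler.Route`, and this is not how `RestController` is actually implemented:

```java
import java.util.List;

// Toy illustration of method + URI-template routing with {path} variables.
public class RestRoutingSketch {
    public record Route(String method, String path) {}

    // Mirrors the RestBulkAction routes shown above.
    public static List<Route> bulkRoutes() {
        return List.of(
            new Route("POST", "/_bulk"),
            new Route("PUT", "/_bulk"),
            new Route("POST", "/{index}/_bulk"),
            new Route("PUT", "/{index}/_bulk"));
    }

    // Treats {segments} as wildcards, the way path variables behave.
    public static boolean matches(Route route, String method, String path) {
        if (!route.method().equals(method)) return false;
        String[] want = route.path().split("/");
        String[] got = path.split("/");
        if (want.length != got.length) return false;
        for (int i = 0; i < want.length; i++) {
            if (!want[i].startsWith("{") && !want[i].equals(got[i])) return false;
        }
        return true;
    }
}
```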
The REST handler’s job is to parse and validate the HTTP request and construct a typed version of the request, often a Transport request. When security is enabled, the HTTP layer handles authentication (based on headers), and the Transport layer handles authorization.
Seen from the Java classes involved, the request handling flow is:
(if security enabled) `Security.getHttpServerTransportWithHeadersValidator`
-> `Netty4HttpServerTransport`
-> `AbstractHttpServerTransport`
-> `RestController`
-> `BaseRestHandler`
-> `Rest{Some}Action`
Netty4HttpServerTransport is the sole implementation of
AbstractHttpServerTransport, from the transport-netty4
module. The x-pack/security module injects TLS and a headers validator.
Transport is the term for node-to-node communication, utilizing a TCP-based custom binary protocol. Every node acts as both a client and a server. Node-to-node communication almost never uses HTTP transport (except for reindex-from-remote).
Netty4Transport is the sole implementation of TCP transport, initializing
both the Transport client and server. The x-pack/security plugin provides
a secure version: SecurityNetty4Transport (with TLS and authentication).
A Connection between nodes is a pool of Channels, where each channel is a
non-blocking TCP connection (Java NIO terminology). Once a cluster is
discovered, a Connection (pool of Channels) is opened to every other node,
and every other node opens a Connection back. This results in two
Connections between any two nodes (A→B and B→A). A node sends requests only
on the Connection it opens (acting as a client). The default pool is around 13
Channels, divided into sub-pools for different purposes (e.g., ping,
node-state, bulks). The pool structure is defined in the ConnectionProfile
class.
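The sub-pool idea can be sketched as a purpose-to-channel-count map. The purposes and counts below are illustrative (chosen to sum to 13, like the default pool size mentioned above), not the real ConnectionProfile defaults:

```java
import java.util.Map;

// Sketch of a connection profile: one Connection is a pool of Channels,
// split into sub-pools dedicated to different kinds of traffic.
public class ConnectionProfileSketch {
    public record Profile(Map<String, Integer> channelsPerPurpose) {
        public int totalChannels() {
            return channelsPerPurpose.values().stream().mapToInt(Integer::intValue).sum();
        }
    }

    // Illustrative split; the real defaults live in the ConnectionProfile class.
    public static Profile defaultProfile() {
        return new Profile(Map.of(
            "ping", 1,      // cluster fault detection
            "state", 1,     // cluster state publication
            "recovery", 2,  // shard recovery traffic
            "bulk", 3,      // indexing traffic
            "reg", 6));     // regular requests
    }
}
```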
ES never behaves incorrectly (e.g. loses data) in the face of network outages, but it may become unavailable unless the network is stable. Network stability between nodes is assumed, though connectivity issues remain a constant challenge.
Request timeouts are discouraged, as Transport requests are guaranteed to
eventually receive a response, even without a timeout. SO_KEEPALIVE helps
detect and tear down dead connections. When a connection closes with an error,
the entire pool is closed, outstanding requests fail, and the pool is
reconnected.
There are no retries on the Transport layer itself. The application layer
decides when and how to retry (e.g., via RetryableAction or
TransportMasterNodeAction). In the future Transport framework might support
retries #95100.
Transport can multiplex requests and responses in a single Channel, but
cannot multiplex parts of messages. Each transport message must be fully
dispatched before the next can be sent. Proper application-layer sizing/chunking
of messages is recommended to ensure fairness of delivery across multiple
senders. A Transport message cannot be larger than 30% of heap (
org.elasticsearch.transport.TcpTransport#THIRTY_PER_HEAP_SIZE) or 2GB (due to
org.elasticsearch.transport.Header#networkMessageSize being an int).
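A back-of-the-envelope sketch of the effective cap, assuming the two limits quoted above (30% of heap, and roughly 2GB because the message size is carried in an int). This is illustrative arithmetic, not the actual TcpTransport code:

```java
// The effective maximum transport message size is whichever limit is hit
// first: 30% of the configured heap, or Integer.MAX_VALUE (~2GB).
public class MessageLimitSketch {
    public static long maxMessageSize(long heapBytes) {
        long thirtyPercentOfHeap = (long) (heapBytes * 0.3);
        return Math.min(thirtyPercentOfHeap, Integer.MAX_VALUE);
    }
}
```

For a 4GiB heap the 30% limit dominates; for a 32GiB heap the int-sized header caps the message at ~2GB first.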
The TransportMessage family tree includes various types (node-to-node,
broadcast, master-node acknowledged) to ensure correct dispatch and response
handling, for example when a message must be accepted on all nodes.
Snapshotting to remote repositories involves different networking clients and SDKs. For example, the AWS SDK comes with an Apache or Netty HTTP client, the Azure SDK uses Netty-based Project Reactor, and GCP uses the default Java HTTP client. Underlying clients may be reused between repositories, with varying levels of control over networking settings.
Other features also use HTTP clients, such as SAML/JWT metadata reloading, the Watcher HTTP action, reindex, and ML-related features such as inference.
ES handles a mix of I/O operations (disk, HTTP server, Transport client/server, repositories), resulting in a combination of synchronous and asynchronous styles. Asynchronous IO utilizes a small set of threads running small tasks, minimizing context switches. Synchronous IO uses many threads and relies on the OS scheduler. ES typically runs with 100+ threads, where async and sync threads compete for resources.
Netty is a networking framework/toolkit used extensively for HTTP and Transport networks, providing foundational building blocks for networking applications.
Netty is an Async IO framework, it runs with a few threads. An event-loop is
a thread that processes events for one or many Channels (TCP connections).
Every Channel has exactly one, unchanging event-loop, eliminating the need to
synchronize events within that Channel. A single, CPU-bound Transport ThreadPool
(e.g., 4 threads for 4 cores) serves all HTTP and Transport servers and
clients, handling potentially hundreds or thousands of connections.
Because event-loop threads each serve many connections, it is critical not to block them for long. Fork any blocking operation or heavy computation to another thread pool. Forking, however, comes with overhead: do not fork simple requests that can be served from memory and do not require heavy computation (milliseconds).
Transport threads are monitored by ThreadWatchdog. A warning log appears if a
single task runs longer than 5 seconds. Slowness can be caused by blocking, GC
pauses, or CPU starvation from other thread pools.
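The watchdog rule can be sketched as a simple check: warn when a still-running task started longer ago than the threshold. The real ThreadWatchdog samples threads periodically; this only shows the decision itself:

```java
// Sketch of the "warn if a single task has held a transport thread too long"
// rule. Illustrative only; not the actual ThreadWatchdog implementation.
public class WatchdogSketch {
    static final long WARN_THRESHOLD_MILLIS = 5_000; // the 5 second limit above

    public static boolean shouldWarn(long taskStartMillis, long nowMillis) {
        return nowMillis - taskStartMillis > WARN_THRESHOLD_MILLIS;
    }
}
```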
Netty's controlled memory allocation provides a performance edge by managing and reusing byte buffer pools (e.g., pools of 1MiB chunks sliced into 16KiB pages). Some pages might not be in use yet still take up heap space, and they show up in heap dumps.
Netty reads socket bytes into direct buffers, and ES copies them into pooled
byte-buffers (CopyBytesSocketChannel). The application is responsible for
retaining (increasing ref-count) and releasing (decreasing ref-count) for
pooled buffers.
Reference counting introduces two primary problems: buffer leaks (forgetting to release a buffer) and premature release (releasing a buffer that is still in use, or releasing it twice). The compiler does not help detect these issues; they require careful testing using Netty's LeakDetector at the Paranoid level, which is enabled by default in all tests.
Every asynchronous operation in Netty returns a future. It is easy to forget to check the result, as the call itself always returns without error:

```java
ctx.write(message);
```

Check the result of an async operation instead:

```java
ctx.write(message).addListener(f -> { if (f.isSuccess()) { /* ... */ } });
```
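The same pitfall exists with plain Java futures, sketched here with CompletableFuture (illustrative; not Netty's ChannelFuture API):

```java
import java.util.concurrent.CompletableFuture;

// Turns an async outcome into a value that must be inspected, instead of
// silently ignoring a failure the way an unchecked write would.
public class FutureCheckSketch {
    public static String outcome(CompletableFuture<Void> write) {
        return write
            .handle((v, e) -> e == null ? "ok" : "failed: " + e.getMessage())
            .join();
    }
}
```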
(We have many thread pools, what and why)
See the Javadocs for ActionListener
(TODO: add useful starter references and explanations for a range of Listener classes. Reference the Netty section.)
(long running actions should be forked off of the Netty thread. Keep short operations to avoid forking costs)
The RestClient is primarily used in testing, to send requests to cluster nodes in the same format users would. There
are some uses of RestClient, via RestClientBuilder, in production code. For example, remote reindex uses the
RestClient internally as the REST client to the remote Elasticsearch cluster, taking advantage of the compatibility of
RestClient requests with much older Elasticsearch versions. The RestClient is also used externally by the Java API
Client to communicate with Elasticsearch.
Every Elasticsearch node in a cluster is assigned a set of roles that define the kind of responsibilities it can own.
Roles are represented internally by the DiscoveryNodeRole class. Possible values are master, data, data_content,
data_hot, data_warm, data_cold, data_frozen, ingest, voting_only, remote_cluster_client, ml, transform, index
and search.
Roles are configured via the node.roles config field in elasticsearch.yml. If node.roles is not explicitly set,
the node starts with all default-enabled roles. If node.roles is explicitly set, only the listed roles are enabled.
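For example, a hypothetical elasticsearch.yml for a dedicated hot-data node:

```yaml
# Only the listed roles are enabled when node.roles is set explicitly.
node.roles: [ data_hot, data_content ]

# A coordinating-only node would use an explicitly empty list:
# node.roles: [ ]
```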
A node with the master role is master-eligible: it can participate in master elections and can be elected as the
cluster's master node.
A node with the data role (relevant for stateful ES only) can host shards and perform data related operations such as
CRUD, search, and aggregations.
Data-tier roles (relevant for stateful ES only) allow users to move indices to specific hardware specs as data grows older. This is managed by index lifecycle management policies. The available data tier roles are:

- data_content: for long-lived indices that can be queried and updated frequently, no matter how old they are.
- data_hot: holds the most recent, and frequently queried, time-series data.
- data_warm: typically holds less recent time-series data that will not be read as often as the hot tier and rarely needs to be updated.
- data_cold: for infrequently accessed time-series data, sometimes in the form of searchable snapshots.
- data_frozen: holds rarely accessed data stored entirely as searchable snapshots.

A node with the ingest role can execute ingest pipelines.
A node with the ml role can run machine learning jobs and trained model inference. Typically, those are memory and
CPU-intensive workloads so having dedicated nodes with this role avoids contention with indexing and searching.
A node with the remote_cluster_client role can establish outbound connections to remote clusters
for cross-cluster search (CCS)
and cross-cluster replication (CCR).
A node with the transform role can execute transform jobs.
A node with the voting_only role participates in master elections and the cluster state publication quorum, but can
never be elected as master itself. The voting_only role must always be combined with the master role. Voting-only
nodes are useful for providing an additional vote for quorum without requiring the node to handle the full master
workload (tiebreaker nodes).
A node configured with an empty role list (node.roles: []) has no specific role and acts as
a coordinating-only
node. Such nodes can receive client requests, route them to the appropriate data or master nodes, and aggregate results.
They do not hold data, run ingest pipelines, or participate in master elections.
In stateless deployments, data is persisted in shared object storage and nodes' disks are only used as a cache layer.
The data (and data tier) roles are disabled in favor of two new roles:

- index: indexing nodes, which host primary shards and handle all write operations.
- search: search nodes, which host search-only replica shards and handle read operations.

Master-eligible (master role) nodes form the cluster's voting configuration and participate in master elections.
The elected master node is responsible for cluster-wide coordination: it processes all cluster state
updates and publishes them to the rest of the cluster. The cluster maintains (conceptually, see term
section) at most a single master at all times; if the master fails, a new one is elected from the remaining
master-eligible nodes (excluding voting_only nodes, which can vote but cannot be elected).
The next few sections explore the cluster state and the master election process in more detail.
The ClusterState is the portion of the cluster state which is held in-memory by every node. To ensure
correctness, updates to the ClusterState require strong
consistency (linearizability). Without strong consistency, nodes could
observe conflicting cluster states (eg two nodes could believe they both hold the primary for the same shard, or apply
different mappings to the same index) leading to data loss or corruption.
The ClusterState is (conceptually) immutable: every update produces a new instance of the ClusterState class. The
elected master is responsible for coordinating all cluster state updates and publishing the latest to the other nodes in
the cluster. Note that ClusterState updates are expensive, often taking hundreds of milliseconds or more, so they
should only be executed when absolutely necessary.
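The immutable-state idea can be sketched as follows. Field names here are hypothetical, not the real ClusterState API; the point is that an "update" never mutates, it builds a new versioned instance:

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of "immutable state; every update produces a new instance".
public class ImmutableStateSketch {
    public record State(long version, Map<String, Integer> indices) {
        // Returns a brand-new State with a bumped version; the receiver
        // is left untouched, so readers never observe partial updates.
        public State withIndex(String name, int shards) {
            Map<String, Integer> copy = new HashMap<>(indices);
            copy.put(name, shards);
            return new State(version + 1, Map.copyOf(copy));
        }
    }
}
```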
The Metadata part of the ClusterState is persisted to disk via the PersistedClusterStateService and will survive
restarts. The rest of the ClusterState components are in-memory only, and need to be rebuilt every time there is a
full cluster restart.
The Metadata of a ClusterState is persisted on disk and comprises information from two categories. A few standouts are:

- clusterUUID: the cluster unique id.
- coordinationMetadata: the current term, voting configurations, etc. (see the Master Elections section).
- persistentSettings: cluster-level settings applied via the cluster settings API.
- customs: cluster-level custom metadata: NodesShutdownMetadata, RepositoriesMetadata, etc.

Notable components of the ProjectMetadata include:

- id: the project unique id.
- indices: map of index name to IndexMetadata (settings, mappings, aliases, number of shards/replicas, etc.). Contains all indices in this project.
- aliasedIndices and templates: see aliases and templates for more details.
- customs: project-level custom metadata. Includes data stream definitions (via the DataStreamMetadata project custom), index lifecycle metadata (IndexLifecycleMetadata) and others.

Note that some concepts are applicable to both cluster and project scopes, e.g. persistent tasks.
The rest of the ClusterState is ephemeral: it is not persisted to disk and is rebuilt from scratch if the cluster restarts. Some key components are:

- nodes (DiscoveryNodes): the set of all nodes currently in the cluster. Tracks the masterNodeId, the localNodeId and categorizes nodes by role (master-eligible, data and ingest). Also records the minimum and maximum node versions and supported index versions across the cluster.
- routingTable (GlobalRoutingTable): maps each project to its RoutingTable, which itself maps each index to an IndexRoutingTable containing an IndexShardRoutingTable per shard. Each IndexShardRoutingTable lists the ShardRouting entries for a shard, detailing which node owns it, its state (UNASSIGNED, INITIALIZING, STARTED or RELOCATING), and whether it is a primary or replica.
- blocks (ClusterBlocks): cluster-level, project-level and index-level blocks that restrict certain operations for the relevant scope. For example, an index can be blocked for writes during a close operation, or the entire cluster can be set to read-only. Blocks operate at five levels: READ, WRITE, METADATA_READ, METADATA_WRITE, and REFRESH.
- customs: additional custom ephemeral state. Examples include SnapshotsInProgress, RestoreInProgress, SnapshotDeletionsInProgress, and HealthMetadata.
- compatibilityVersions and minVersions (CompatibilityVersions): per-node and cluster-wide minimum TransportVersion and system index mapping versions, used to determine serialization compatibility across the cluster.
- clusterFeatures (ClusterFeatures): information on what features are present throughout the cluster.
The version field is
a monotonically increasing
long that uniquely identifies committed states. It is technically persisted to disk by PersistedClusterStateService,
which stores it as last_accepted_version in the Lucene commit user data alongside the cluster metadata. On node
startup, this version is loaded back and used to initialize the in-memory cluster state, so the monotonically-increasing
property is preserved across restarts.
The stateUUID field uniquely identifies every state, including uncommitted ones. It is not persisted.
The MasterService is the single-threaded coordinator for all cluster state updates on the master node. It organizes
pending cluster state update tasks into Priority-based queues: IMMEDIATE, URGENT, HIGH, NORMAL, LOW, and
LANGUID. It processes them in that order, highest priority first.
The MasterService uses a batching framework that groups multiple cluster state update tasks together, processing them
as a single batch and publishing only one resulting ClusterState update. This avoids triggering a distinct publication
for each individual task, which would be very costly.
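A minimal sketch of priority-ordered batching, assuming the priority order listed above (everything else, including the Task shape, is illustrative):

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.List;

// Sketch: pending tasks are drained highest priority first, then executed
// together as one batch producing a single new state publication.
public class BatchingSketch {
    // Declaration order mirrors the queue processing order described above.
    public enum Priority { IMMEDIATE, URGENT, HIGH, NORMAL, LOW, LANGUID }

    public record Task(Priority priority, String name) {}

    public static List<Task> drainInOrder(Collection<Task> pending) {
        List<Task> batch = new ArrayList<>(pending);
        batch.sort(Comparator.comparing(Task::priority)); // enum order = queue order
        return batch;
    }
}
```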
Producers of cluster state update tasks, such as SnapshotsService, can
then define
their own task queues, priority and batch executors (ClusterStateTaskExecutor), which the MasterService uses to
group and process related tasks together.
A queue processor,
backed by a single-threaded java.util.concurrent.ExecutorService, processes batches one at a time.
The executeAndPublishBatch
method takes the next batch, invokes the batch's ClusterStateTaskExecutor to compute a new ClusterState, and
publishes the result.
If the new state is identical to the previous one (by reference), no publication takes place. Otherwise, the master
assigns a new version and stateUUID to the state and proceeds to publication.
```mermaid
sequenceDiagram
    participant M as Master Node
    participant F as Follower Node
    rect rgb(255, 248, 240)
    Note over M,F: Phase 1 — Publish
    M->>F: PublishRequest (diff or full state)
    M->>M: Local PublishRequest/Response
    Note right of F: PublicationTransportHandler<br/>::handleIncomingPublishRequest<br/>Deserializes state
    Note right of F: CoordinationState<br/>::handlePublishRequest<br/>Validates term/version,<br/>persists as lastAcceptedState
    F->>M: PublishResponse (term, version)
    Note left of M: CoordinationState<br/>::handlePublishResponse<br/>Collects responses until quorum
    end
    rect rgb(255, 220, 220)
    alt Quorum not reached
    Note over M: FailedToCommitClusterStateException<br/>Master steps down, new election begins
    end
    end
    rect rgb(255, 248, 240)
    Note over M,F: Phase 2 — Commit
    M->>F: ApplyCommitRequest
    M->>M: Local ApplyCommitRequest/Response
    Note right of F: Coordinator::handleApplyCommit<br/>Marks last accepted state as committed
    Note right of F: ClusterApplierService<br/>::onNewClusterState<br/>Applies new ClusterState
    F->>M: ACK (after application completes)
    Note left of M: ClusterApplierService<br/>::onNewClusterState<br/>Master applies its own state
    end
```
Once the MasterService has computed a new ClusterState, it passes it to the Coordinator, which is responsible for sharing it with all nodes in the cluster. This publication process follows a two-step commit protocol. The progress of the publication for each follower node is tracked via a PublicationTarget object, which maintains a PublicationTargetState.
The Coordinator creates
a PublicationContext
via the PublicationTransportHandler, which pre-serializes the cluster state into shared, reference-counted byte
buffers. A ClusterStateDiff against the previous state is prepared. A full serialization is also prepared when
the previous state's blocks had its disable state persistence flag set, or when a node present in the new state was
absent from the previous one (since that node will not have a previous state to apply a diff against).
It then builds a PublishRequest and creates a CoordinatorPublication
to coordinate
the protocol.
The master
then sends
the serialized cluster state to every node in the cluster. As an optimization, the diff is sent to nodes that were
already part of the cluster, while new nodes receive the full state directly. If a node
responds
with IncompatibleClusterStateVersionException
the PublicationTransportHandler
then retries
with the full state.
The recipient nodes verify the received state is valid, compare it against their local state via CoordinationState.handlePublishRequest and send back a PublishResponse containing the new cluster state term and version. At that point, the recipient nodes have persisted the new state but have not yet applied it.
Once the master has collected enough PublishResponses from master-eligible nodes to satisfy quorum (
see Quorum section), it creates an ApplyCommitRequest. The master then sends this commit message to all
nodes that responded to the PublishRequest.
When receiving this request, each
node marks this last accepted state as committed
and proceeds to apply the new state (see Cluster State Application section). The node
only ACKs the ApplyCommitRequest back to the master after application completes (i.e. after
ClusterApplierService.onNewClusterState finishes).
If the master cannot achieve a PublishResponse quorum (e.g. too many nodes are faulty), the entire publication fails with a FailedToCommitClusterStateException. The master then steps down (it can no longer be master if it is unable to publish cluster states) and a new election begins.
Once the ApplyCommitRequests have been sent, the master will
try waiting
for all nodes to apply the new state
before applying its own state
and moving on. No quorum of responses is needed at that point. If follower nodes time out, the master will still
move on to applying the new cluster state locally.
When a new ClusterState is committed, the follower nodes need to apply the committed state locally. This is the responsibility of the ClusterApplierService class, which runs on a single dedicated thread.
When receiving
an ApplyCommitRequest from the master, the Coordinator will hand over the committed state to
the ClusterApplierService, which
will submit
an UpdateTask
to the executor. If the new state differs from the currently applied state (by reference), the
service applies
the changes through the following sequence of steps:
1. A ClusterChangedEvent is created, holding the new state, the previous state, and their deltas.
2. The node opens network connections to newly added nodes (blocking call).
3. If needed, the updated cluster settings are applied.
4. ClusterStateAppliers (which update the node's internal state) are invoked in priority order. Example high-priority appliers are IndicesClusterStateService, which creates, deletes and updates local shard copies, and RepositoriesService, which manages the snapshot repository lifecycle.
5. Transport connections to nodes that are no longer in the cluster are closed (non-blocking call).
6. ClusterApplierService::state is updated to the new state, making it visible to the rest of the classes on the node.
7. ClusterStateListeners are notified. Unlike appliers, listeners run after the state is visible. Examples include the PersistentTasksNodeService, which reacts to changes in persistent tasks, and the SnapshotShardsService, which watches for snapshot-related shard assignments.
Once application completes, the Coordinator's
callback onClusterStateApplied
closes the election scheduler (if active) and stops peer finding, since the node is now part of a cluster with a
functioning master.
As mentioned in previous sections, only the Metadata portion of the ClusterState is persisted to disk (
see Persisted State). The rest of the ClusterState (routing table, node membership, in-progress
snapshots ... etc.) is ephemeral and rebuilt from scratch after each full cluster restart.
The PersistedClusterStateService is responsible for storing the cluster metadata in a simple Lucene index in each of
the node's data paths, under the
_state directory.
The metadata is split across several documents and is written incrementally where possible.
The PersistedClusterStateService uses 3 types of documents, each identified by a type string field: global,
index and mapping. Each document type stores its content in the data field as compressed SMILE-encoded XContent.
All documents are paged (default 1MB per page).
The global document serializes almost all the Metadata object: coordination metadata, persistent settings, customs,
all project-scoped data, except individual index metadata and mappings. The index documents (one document per index,
keyed by index_uuid) store all the IndexMetadata: settings, aliases, number of shards, etc., except for
the index mapping. The mapping documents (one document per mapping, keyed by mapping_hash) store index mappings.
Storing mappings separately is a deduplication optimization: multiple indices that share the same mapping (a common
occurrence, e.g. in data streams) all reference the same mapping hash, so only one copy is stored.
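The deduplication idea can be sketched as keying mappings by hash. String.hashCode is used purely for illustration; the real code derives a proper content hash, and these types are not the actual PersistedClusterStateService API:

```java
import java.util.HashMap;
import java.util.Map;

// Sketch: indices reference a mapping by hash, and only one document per
// distinct hash is stored, however many indices share it.
public class MappingDedupSketch {
    public static Map<Integer, String> dedupe(Map<String, String> indexToMapping) {
        Map<Integer, String> byHash = new HashMap<>();
        for (String mapping : indexToMapping.values()) {
            // putIfAbsent keeps exactly one copy per distinct mapping.
            byHash.putIfAbsent(mapping.hashCode(), mapping);
        }
        return byHash;
    }
}
```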
During incremental writes,
the PersistedClusterStateService will first add new mappings and delete obsolete ones. It will then compare the index
metadata for each index UUID between the old and new state. If the version changed, the old document is deleted and a
fresh one is written. Unchanged indices are skipped entirely. The global document update is not incremental. If the
new state's global fields differ from the old ones, the global document
is deleted and rewritten entirely.
Each Lucene commit also records the current term, the last accepted cluster state version, the node ID, the node version, the oldest index version (used for compatibility checks on node startup) and the cluster UUID in the commit user data.
The coordination layer interacts with the recorded state on disk through the PersistedState interface, which the CoordinationState calls at key points during the publication protocol (for example, persisting the new term when handling a StartJoinRequest via handleStartJoin). On master-eligible nodes, the PersistedState implementation is LucenePersistedState, which writes synchronously on
the cluster coordination thread. On non-master-eligible nodes, the AsyncPersistedState wrapper is used instead. It
applies updates to an in-memory state immediately and queues the disk write to a background thread. This avoids blocking
the coordination thread on disk I/O for nodes that do not participate in master elections.
On startup, the
node reads
the Lucene index, selects the persisted state with the highest accepted term, and reconstructs the
Metadata from the stored documents and commit user data. This metadata is then used to build the initial
ClusterState that the node starts with.
The master's core responsibility is updating the ClusterState and propagating the changes to other nodes.
The cluster maintains (conceptually at least, see term section) at most a single master at all times. If no master is present, master-eligible nodes will attempt to become the master by starting an election. The cluster will not be able to commit any ClusterState changes until a new master is elected.
To elect a master, Elasticsearch uses a consensus algorithm derived
from Paxos (see
also Paxos Made Simple
and Raft for more approachable resources). This algorithm is formally defined in a TLA+
specification referenced from the CoordinationState class. The CoordinationState class is responsible for
maintaining the safety property of the algorithm,
whereas the Coordinator ensures liveness,
by guaranteeing progress in cases of timeouts, failures and/or conflicts. The Coordinator also manages a node's
transition between CANDIDATE,LEADER, and
FOLLOWER modes,
and constructs the main data structures used for cluster state publication.
The CoordinationMetadata instance, stored in ClusterState::Metadata, tracks the coordination-related state needed for subsequent master elections.
A quorum is defined as a strict majority (votedNodesCount * 2 > nodeIds.size()) of the nodes listed in
a VotingConfiguration. CoordinationMetadata maintains two such configurations: lastCommittedConfiguration and
lastAcceptedConfiguration. Most of the time, these two configurations are equal. They only diverge during
voting configuration changes.
The Reconfigurator handles the logic for computing the voting configuration changes during the transition.
To guarantee safety during voting configuration updates, an election will only succeed if quorum is achieved for both configurations.
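The quorum rule can be sketched directly from the formula above; node IDs are plain strings here for illustration:

```java
import java.util.Set;

// Sketch of the quorum rule: a strict majority of a voting configuration,
// and during reconfiguration BOTH configurations must independently agree.
public class QuorumSketch {
    public static boolean isQuorum(Set<String> votes, Set<String> config) {
        long counted = votes.stream().filter(config::contains).count();
        return counted * 2 > config.size(); // strict majority
    }

    public static boolean electionQuorum(Set<String> votes,
                                         Set<String> lastCommitted,
                                         Set<String> lastAccepted) {
        return isQuorum(votes, lastCommitted) && isQuorum(votes, lastAccepted);
    }
}
```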
Every election takes place in a
numbered term.
Terms are monotonically increasing longs that act as logical clocks for the coordination layer, allowing the cluster
to distinguish between master mandates coming from distinct elections. They are persisted
via CoordinationState.PersistedState.
A term can have at most one master. A node will never vote in the same term twice.
Note that nodes may not necessarily be aware of the same latest term at the same physical time. The practical consequence of this is that two nodes may temporarily both believe they are the master at the same physical time. But because there is only ever a single master per term, terms are always monotonically increasing and cluster state updates require a quorum of votes, at most one of them will be able to update the cluster state.
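The "at most one vote per term" rule can be sketched as follows. The method name echoes handleStartJoin, but this is an illustration, not the real CoordinationState (which also persists the term to disk):

```java
// Sketch: a node tracks the highest term it has seen and only grants a vote
// for a strictly newer term, so it can never vote twice in the same term.
public class TermVotingSketch {
    private long currentTerm = 0;

    // Returns true if the vote is granted; false if this node has already
    // voted at (or beyond) the proposed term.
    public synchronized boolean handleStartJoin(long proposedTerm) {
        if (proposedTerm <= currentTerm) {
            return false;
        }
        currentTerm = proposedTerm; // the real code persists this before voting
        return true;
    }

    public synchronized long currentTerm() {
        return currentTerm;
    }
}
```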
Below is an overview of the election process, including the main code components for each step. Subsequent sections provide more detailed explanations of the mechanics and rationale behind each step.
1. A follower detects failure of the current master (`LeaderChecker`, `Coordinator.onLeaderFailure()`).
2. The node transitions to CANDIDATE mode and triggers the discovery process (`Coordinator.becomeCandidate()`, `Mode.CANDIDATE`, `PeerFinder.activate(...)`).
3. The candidate probes master-eligible nodes from the last accepted state and the configured hosts resolver, asking whether there is a current master and what other master-eligible nodes are in the cluster (`PeerFinder.handleWakeUp()`, `ConfiguredHostsResolver`, `PeerFinder.startProbe(...)`, `PeersRequest`, `PeersResponse`).
4. If no current master is discovered and the node has enough master-eligible peers to form a quorum, the candidate schedules an election, with randomized backoff to minimize conflicts with other potential candidates (`CoordinatorPeerFinder.onFoundPeersUpdated()`, `Coordinator.startElectionScheduler()`, `ElectionScheduler.scheduleNextElection()`).
5. The election starts. The candidate double-checks that it is healthy and eligible to be master, then sends pre-vote requests to master-eligible peers (`NodeEligibility`, `PreVoteCollector.start(...)`, `PreVoteRequest`, `PreVoteResponse`).
6. If enough PreVoteResponses are received to satisfy quorum, the candidate proceeds to the real election (`ElectionStrategy.isElectionQuorum(...)`, `Coordinator.startElection()`).
7. The candidate computes the new term as max(current, seen) + 1 and sends a StartJoinRequest with the new term to all nodes discovered in the previous step (`Coordinator.getTermForNewElection()`, `Coordinator.broadcastStartJoinRequest(...)`, `StartJoinRequest`).
8. Master-eligible nodes ack the StartJoinRequest, persist the latest term, verify they are healthy enough to cast a vote, and send a JoinRequest to the candidate (`CoordinationState.handleStartJoin(...)`, `CoordinationState.setCurrentTerm(...)`, `JoinHelper.sendJoinRequest(...)`).
9. If enough JoinRequests are received to satisfy quorum, the candidate becomes LEADER (`Coordinator.processJoinRequest(...)`, `CoordinationState.handleJoin(...)`, `ElectionStrategy.isElectionQuorum(...)`, `Coordinator.becomeLeader()`, `Mode.LEADER`).
10. The leader publishes the cluster state, cleans up discovery connections, and starts heartbeating (`CandidateJoinAccumulator.close(...)`, `JoinTask`, `LeaderHeartbeatService.start(...)`).
The cluster uses two complementary checks to detect node failures: the LeaderChecker (run by followers) and the FollowersChecker (run by the leader).
Peer-to-peer communication in Elasticsearch typically involves two distinct TCP connections, so having both checks is necessary to ensure full connectivity. They also serve distinct conceptual purposes: when the leader becomes unreachable, followers must detect it and start an election, whereas when a follower becomes unreachable, the leader will remove it from the cluster so its shards can be reallocated.
Each follower runs a LeaderChecker that periodically sends a LeaderCheckRequest to the current master (every 1 second by default). Upon receiving the request, the master will verify that its local node is healthy, that it is still the elected master, and that the requesting node is part of the current cluster state. If all conditions are met, it will then ack the request.
If the check fails more times than the configured limit (3 by default), if the master reported itself as unhealthy, or if the master disconnected, the follower will conclude that the master has failed.
The follower will stop the periodic check and notify the Coordinator,
which will transition the node to CANDIDATE mode and start the election process.
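The failure-counting logic above can be sketched as follows. The names are ours, not the real LeaderChecker API; the master is considered failed after too many consecutive failed checks, or immediately on disconnect or an unhealthy-master response.

```java
// Illustrative sketch (our own names, not the real LeaderChecker API).
class LeaderFailureDetector {
    private final int retryCount;       // assumed default: 3
    private int consecutiveFailures;

    LeaderFailureDetector(int retryCount) {
        this.retryCount = retryCount;
    }

    /** Returns true once the master should be considered failed. */
    boolean onCheckFailure(boolean masterDisconnectedOrUnhealthy) {
        if (masterDisconnectedOrUnhealthy) {
            return true;                // fail fast, no retries
        }
        return ++consecutiveFailures > retryCount;
    }

    /** A successful check resets the failure counter. */
    void onCheckSuccess() {
        consecutiveFailures = 0;
    }
}
```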
On the master side, the FollowersChecker class maintains a map of nodes in the current cluster state to FollowerChecker objects.
Each FollowerChecker periodically (by default every 1s) sends a FollowerCheckRequest containing the current term and the identity of the master node. When receiving this request, each follower will first verify that its local node is healthy (and throw if not), and then check whether the request term matches its current term. If it does, the follower will ack the request right away. Otherwise, if the request term is higher than the node's current term, the request is handed off to the Coordinator so that the node can update its local state and become a follower of the master in the new term.
If the follower disconnects or the check fails too many times (3 by default), the master will conclude that the node has failed, add it to its set of faultyNodes, and untrack its corresponding FollowerChecker object. The Coordinator will get notified and eventually remove the node from the set of discovered nodes in the ClusterState (which will cause its shards to be reallocated to other healthy nodes).
Discovery is a fast "gossip-like" protocol by which a node in CANDIDATE mode locates master-eligible nodes in the
cluster and/or the master itself.
A node enters CANDIDATE mode either on startup or after detecting a master failure. The PeerFinder class contains
the core logic for the discovery process.
Discovery starts with both the nodes from the last accepted cluster state and a set of resolved seed addresses, which are obtained from the different SeedHostsProvider implementations:

- the discovery.seed_hosts config. This config is read on startup and is not refreshable.
- the unicast_hosts.txt file in the config directory. Because it is dynamic, using this file is usually preferred to the discovery.seed_hosts config.

SeedHostsResolver is in charge of converting the seed hosts strings to transport addresses using DNS resolution.
When discovery is active, PeerFinder will run handleWakeUp every 1s (by default). Each iteration will clean up disconnected peers, send a PeersRequest to known master-eligible nodes (from the last accepted state and the resolved seed addresses), and schedule the next handleWakeUp iteration.
When receiving a PeersResponse, PeerFinder will reach out to all peers specified in the response, including a potential master. If the peer node reports itself as the master and its term is greater than or equal to the local one, then the node will update its local term and try to join the master.
Because the node will eagerly reach out to every peer it learns about (whether via the content of a PeersResponse or when receiving a new PeersRequest), the discovery protocol will find every other master-eligible node in at most roughly ⌈log₂(D)+1⌉ steps, where D is the diameter of the graph of seed host configurations.
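The transitive nature of discovery can be illustrated with a toy simulation (our own sketch, not PeerFinder code). In this simplified model each node only ever reports its static seed list, so knowledge spreads one hop per round; the real protocol converges faster because peers report everything they have learned so far.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Toy simulation: each round, the candidate probes every peer it currently
// knows about and learns that peer's seed list, transitively discovering all
// nodes reachable through the seed-host graph.
class GossipSim {

    /** Rounds until {@code start} knows every node, given each node's static seed list. */
    static int roundsToDiscoverAll(Map<String, Set<String>> seeds, String start) {
        Set<String> known = new HashSet<>();
        known.add(start);
        known.addAll(seeds.get(start));
        int rounds = 0;
        while (known.size() < seeds.size()) {
            Set<String> learned = new HashSet<>();
            for (String peer : known) {
                learned.addAll(seeds.get(peer));
            }
            if (!known.addAll(learned)) {
                break;                  // no progress: some nodes are unreachable
            }
            rounds++;
        }
        return rounds;
    }
}
```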
Note that discovery connections are managed separately from the regular cluster transport connections: they are held by the PeerFinder and used only during discovery. Once a master is found or elected, all discovery connections are released.
The pre-vote phase takes place before a candidate bumps the term. It is meant to prevent a partitioned node from disrupting a healthy term by forcing a re-election when it rejoins the cluster. It ensures that a candidate can only proceed to broadcast a higher term once a quorum of peers agrees that there is no active leader.
When receiving a PreVoteRequest from a candidate node, the receiver
will check
whether it currently knows of an active leader. If not, and if the receiver is healthy, it will reply with
a PreVoteResponse containing its currentTerm, lastAcceptedTerm, and lastAcceptedVersion. If it still considers a
different node to be the leader, it will reject the request.
Pre-vote rounds are triggered by the ElectionSchedulerFactory, which schedules each attempt after a random delay that grows linearly with the number of failed attempts (up to a configured maximum). This randomized backoff reduces the chances that two candidates will run pre-vote rounds simultaneously, avoiding conflicts and giving each attempt enough time to complete before the next one begins.
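The scheduling rule can be sketched as below. The parameter names and exact formula are ours (ElectionSchedulerFactory differs in detail), but the shape is the same: a uniform random delay whose upper bound grows linearly with the number of failed attempts, up to a configured maximum.

```java
import java.util.Random;

// Sketch (our own formula) of randomized linear backoff between election
// attempts: the upper bound grows linearly with failed attempts, capped at a
// configured maximum, and the actual delay is drawn uniformly below it.
class ElectionBackoff {
    static long nextDelayMillis(Random random, int failedAttempts,
                                long initialTimeoutMs, long backoffPerAttemptMs, long maxTimeoutMs) {
        long upperBound = Math.min(initialTimeoutMs + failedAttempts * backoffPerAttemptMs, maxTimeoutMs);
        return (long) (random.nextDouble() * upperBound);   // uniform in [0, upperBound)
    }
}
```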
For additional details on the theory behind the pre-voting phase, see the Pre-voting in distributed consensus article. This blog post also describes a real-world example of a liveness failure caused by this check being omitted.
The Join process is the mechanism by which a node is added to the ClusterState by the master. Most of the join-related logic is located in the Coordinator class, which still delegates specific components to its JoinHelper instance.
Within a master term, a node that starts up will send a JoinRequest to the discovered master (see Discovery section for more details) so it can get added (back) to the cluster. The master will handle the incoming join request by first validating that it can connect back to the joining node, that the joining node is able to deserialize the current ClusterState, and that the request's term and version match its local ones. Then, the master will trigger voting reconfiguration if needed, and submit a JoinTask to the master queue to add the new node to the ClusterState.
During a master election, JoinRequests from peer nodes also act as votes (via the optional Join field) for the candidate that started the election. The join requests are then processed slightly differently by the candidate node. Rather than being submitted immediately to the master queue, incoming joins are buffered by the joinAccumulator. Each Join vote is recorded in CoordinationState. Once quorum is reached, the candidate will become leader, and flush all accumulated joins into a single JoinTask for the master queue to process.
Stateful and Serverless ES share the same Coordinator core logic, but they differ in how they check leader liveness and in how they define quorum.
In Serverless, the elected master periodically writes a Heartbeat (containing the current term and timestamp) to an external blob store, the HeartbeatStore, via the StoreHeartbeatService. Other nodes check whether the leader is alive by reading the latest heartbeat from the store.
In Serverless, the master uses a single node voting configuration with only itself (defined by the SingleNodeReconfigurator). Committing and publishing a new cluster state therefore only requires the master's own acknowledgment. Similarly, during elections, the pre-vote phase does not contact any peers. Instead, the AtomicRegisterPreVoteCollector solely checks the latest heartbeat from the blob store and starts the election if it's older than the configured threshold (default 30 seconds). To become master, the candidate node will atomically override the current term in the blob store via a CAS operation.
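The atomic term takeover can be sketched as follows, with an in-memory AtomicLong standing in for the blob-store term register (the real code performs the compare-and-swap against an external store): a candidate only becomes master if it atomically replaces the witnessed term with a strictly greater one.

```java
import java.util.concurrent.atomic.AtomicLong;

// Minimal sketch: AtomicLong stands in for the external blob-store register.
class TermRegister {
    private final AtomicLong currentTerm = new AtomicLong();

    /** Attempt to claim {@code proposedTerm}; returns true if this node won it. */
    boolean claimTerm(long proposedTerm) {
        long witnessed = currentTerm.get();
        return proposedTerm > witnessed && currentTerm.compareAndSet(witnessed, proposedTerm);
    }

    long currentTerm() {
        return currentTerm.get();
    }
}
```

Because the CAS only succeeds against the exact witnessed value, two concurrent candidates proposing the same term can never both win it.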
When a cluster starts from scratch, no persisted cluster state exists on disk. The PersistedClusterStateService returns an empty on-disk state with a CoordinationMetadata containing VotingConfiguration.EMPTY_CONFIG for both the last-committed and last-accepted configurations. The cluster first needs to bootstrap and establish its initial voting configuration before an election can take place.
The ClusterBootstrapService is responsible for this one-time bootstrapping process and runs on master-eligible nodes.
For production clusters, the operator has to provide the list of master-eligible nodes that should form the initial cluster via the cluster.initial_master_nodes setting. ClusterBootstrapService uses this setting to construct its bootstrapRequirements.
On startup, a master-eligible node will first become candidate, which also starts the discovery process (see the Discovery section). Each time the PeerFinder reports newly discovered peers, ClusterBootstrapService checks whether the discovered nodes satisfy the bootstrap requirements, i.e. whether a strict majority of the nodes specified in cluster.initial_master_nodes have been found. When this condition is met, ClusterBootstrapService constructs a VotingConfiguration from the discovered node IDs and sets the initial cluster state. The node will then schedule an election (see Master Elections for more details). Any requirements that could not be matched to a discovered node are added as placeholder entries. These placeholders occupy slots in the voting configuration's nodeIds set and affect the quorum size, but they cannot cast a vote until the real nodes come online and replace them.
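The matching step can be sketched as follows (our own names and a hypothetical placeholder format): each entry of cluster.initial_master_nodes is matched against a discovered node, and unmatched entries become placeholders that still occupy a quorum slot but cannot vote.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Illustrative sketch: build the initial voting configuration from bootstrap
// requirements, substituting placeholders for nodes not yet discovered.
class BootstrapVotingConfig {

    static Set<String> build(List<String> requirements, Map<String, String> discoveredIdsByName) {
        Set<String> nodeIds = new LinkedHashSet<>();
        for (String requirement : requirements) {
            String nodeId = discoveredIdsByName.get(requirement);
            // Hypothetical placeholder format; the real prefix differs.
            nodeIds.add(nodeId != null ? nodeId : "placeholder-" + requirement);
        }
        return nodeIds;
    }
}
```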
When cluster.initial_master_nodes is not present and no discovery config is provided either, ClusterBootstrapService will schedule a best-effort bootstrap after the discovery.unconfigured_bootstrap_timeout (default 3 seconds). This simply uses all master-eligible nodes discovered so far. This is inherently unsafe and is only intended for development and testing.
If discovery.type: single-node is set, ClusterBootstrapService uses the local node name as the sole bootstrap
requirement. The node bootstraps itself immediately.
If more than half of the master-eligible nodes are permanently lost and no snapshot is available, the last-resort
UnsafeBootstrapMasterCommand (elasticsearch-node unsafe-bootstrap) can force a single surviving master-eligible
node to become the new cluster leader. It works by setting both voting configurations
to a single-node configuration containing only the local node and regenerating the cluster UUID.
The remaining data nodes cannot join the newly bootstrapped master because their persisted state still references the
old cluster. The DetachClusterCommand (elasticsearch-node detach-cluster) sets both their voting configurations
to VotingConfiguration.MUST_JOIN_ELECTED_MASTER, containing the single node ID _must_join_elected_master_,
which prevents nodes from starting their own elections and forces them to discover and join the already-elected master.
It then resets
the persisted term to 0 and marks clusterUUIDCommitted as false.
Many cluster operations (creating indices, managing snapshots, etc.) run on the elected master node because they result in ClusterState updates. The TransportMasterNodeAction class is the base class for such operations. It provides a common framework that handles routing requests to the current master, retrying when the master changes, and checking for cluster blocks.
See TransportMasterNodeAction Javadoc for a detailed description of the execution flow and retry mechanism.
(More Topics: ReplicationTracker concepts / highlights.)
(How a primary shard is chosen)
(terms and such)
(How an index write replicates across shards -- TransportReplicationAction?)
(What guarantees do we give the user about persistence and readability?)
(rarely use locks)
The IndexShard class is the single entry point for all shard-level operations: indexing, deletion, real-time GET,
refresh, flush, recovery, and snapshot. There is exactly one IndexShard object per allocated shard.
An IndexShard holds references to:
- the Store, which wraps a Lucene Directory and provides access to the shard's Lucene index files on disk.

The lifecycle of these objects (creation, recovery, and teardown) is controlled by the IndicesClusterStateService, which reacts to cluster state changes and updates local state accordingly (see the IndicesClusterStateService section).
The Store is the lowest-level Elasticsearch persistence abstraction for a shard. Each shard has a single
dedicated Store that wraps a
Lucene Directory, which is Lucene's
own file-system abstraction used to read and write index files on disk.
Lucene's Directory is a pure I/O abstraction: callers open
an IndexInput to read a named file
and create an IndexOutput to
write one. The Store builds on the Lucene Directory capabilities by adding reference counting and corruption
detection, exposing committed file metadata and enforcing integrity invariants.
The Store implements RefCounted. Callers call store.incRef() before using it and store.decRef() in
a finally block when done. Once the reference count drops to zero the store is closed and the underlying Lucene
directory is cleaned up. The Store also receives a ShardLock at construction time and only releases it
once closed, allowing other threads waiting to acquire the lock for this shard to proceed.
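The usage pattern described above can be sketched with a minimal stand-in for the real RefCounted implementation (method names mirror the Store API; the bookkeeping is simplified):

```java
// Minimal stand-in for the Store's reference counting: the creator holds the
// initial reference, and the last decRef() closes the resource.
class RefCountedStore {
    private int refCount = 1;   // the creator holds the initial reference
    private boolean closed;

    synchronized void incRef() {
        if (refCount <= 0) throw new IllegalStateException("store is already closed");
        refCount++;
    }

    synchronized void decRef() {
        if (--refCount == 0) {
            closed = true;      // last reference released: close directory, release ShardLock
        }
    }

    synchronized boolean isClosed() {
        return closed;
    }
}
```

Callers follow the pattern `store.incRef(); try { ... } finally { store.decRef(); }` so the store can never be closed out from under them.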
The Lucene Directory used by a Store is created by
an IndexStorePlugin.DirectoryFactory.
The default built-in implementation is FsDirectoryFactory, which
supports several store types
selectable via the
index.store.type setting:
- hybridfs (default): a HybridDirectory that delegates to an MMapDirectory for performance-sensitive file types (postings, term vectors index, norms, vectors, etc.) and an NIOFSDirectory for files where sequential access is preferred (e.g. stored fields). Direct I/O is also enabled for vector index files when supported by the OS. The HybridDirectory selects between mmap and NIO on a per-file basis by checking the file's Lucene extension and I/O context.
- mmapfs: uses MMapDirectory for all files.
- niofs: uses NIOFSDirectory for all files.
- fs: automatically selects the best type for the underlying file system.

The Store exposes MetadataSnapshots for each index commit, read and constructed from the Lucene segments_N files. A MetadataSnapshot is a point-in-time map from filename to StoreFileMetadata for the committed files belonging to a Lucene index commit, produced by an Elasticsearch flush.
Each StoreFileMetadata includes the file's name, on-disk length, CRC32 checksum (from the Lucene file footer), the
Lucene version that wrote it, and a writerUuid that uniquely identifies the writer.
MetadataSnapshots are leveraged by several Elasticsearch workflows, including peer recovery and
replica shard allocation (via TransportNodesListShardStoreMetadata). They are used to compare the on-disk state of two
distinct shards and calculate how much data needs to be transferred to bring them into sync.
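The comparison can be sketched as follows (the record is a simplified stand-in for StoreFileMetadata): a file must be transferred if the target is missing it or holds a different version of it, as identified by its length and checksum.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Sketch of the snapshot diff: collect every source file the target is
// missing or holds in a different version.
class SnapshotDiff {

    record FileMeta(String name, long length, String checksum) {}

    static List<String> filesToTransfer(Map<String, FileMeta> source, Map<String, FileMeta> target) {
        List<String> missingOrDifferent = new ArrayList<>();
        for (FileMeta sourceFile : source.values()) {
            FileMeta targetFile = target.get(sourceFile.name());
            if (!sourceFile.equals(targetFile)) {
                missingOrDifferent.add(sourceFile.name());
            }
        }
        return missingOrDifferent;
    }
}
```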
Access to a shard's on-disk data is protected by three layers of locking, each scoped to a different level.
The ShardLock is a node-wide, coarse-grained lock managed by NodeEnvironment. It is backed by a
Semaphore and guarantees that at most one owner at a time has write access to a given shard directory
within a JVM process. The
Store is given
a ShardLock at creation time. It holds this lock for its entire lifetime, ensuring that write operations (e.g.
creating an IndexWriter, deleting shard files, or recovering from another shard) have exclusive access to the shard
directory.
Callers that need to access the directory without a live Store (e.g. TransportNodesListShardStoreMetadata reading
metadata for allocation
decisions) acquire a temporary
ShardLock for the duration of the read.
The metadataLock
is an in-process ReentrantReadWriteLock inside the Store. It guards access to the
directory's file listing and segment metadata. Operations that only read metadata (e.g. getMetadata with an
existing
commit) take the read lock,
allowing concurrent readers. Operations that structurally modify the
directory (e.g. renaming temporary recovery files via renameTempFilesSafe, cleaning up stale files
via cleanupAndVerify, or running CheckIndex) take the write lock, which excludes all readers and other
writers until the operation completes.
Lucene's IndexWriter write lock
is a native file lock (write.lock) that Lucene places in the index
directory. It prevents two IndexWriter instances from ever opening the same directory simultaneously, even
across processes. When the Store needs to read metadata from a directory that has no active engine (e.g.
during getMetadata(commit=null, lockDirectory=true)), it acquires the Lucene write lock together with the
metadataLock write lock, ensuring no writer can interfere. In normal operation, the running InternalEngine
already holds this
lock through its IndexWriter.
To summarize, ShardLock is a JVM-level lock enforced across the entire node, metadataLock
coordinates in-process readers and writers within the Store, and the Lucene write lock guards the raw
directory at the file-system level.
The Store maintains
corruption markers
as special files prefixed with corrupted_ written into the Lucene directory.
When an unrecoverable I/O error or checksum mismatch
is detected,
a marker file is written containing the serialized exception. Subsequent calls to failIfCorrupted() scan for these
marker files and throw a CorruptIndexException if any are found, preventing any further operations on a known-bad
shard. Corruption markers are removed only after the shard has been successfully recovered from another source (e.g. a
primary or a snapshot).
The Engine abstract class is the Elasticsearch abstraction that manages and coordinates operations on the running
shard index. Where the Store manages files on disk, the Engine owns the write
path (indexing, deletion, no-ops), the read path (searcher acquisition, real-time GET), and Lucene lifecycle
operations (refresh, flush, merge). It also controls the translog, ensuring that every acknowledged write is
durably recorded before a response is sent.
The main implementation is InternalEngine, used for all read-write shards (primary and replica). Other implementations serve more specialized roles. For example, the ReadOnlyEngine gives read-only access to a frozen shard, and the NoOpEngine acts as a placeholder for shards belonging to a closed index. It exists to allow shards of closed indices to be correctly replicated in case of a node failure.
Lucene organizes index data on disk into immutable files called segments. They are created whenever the in-memory
write buffer is flushed or when merges combine existing segments into larger ones.
The IndexWriter accumulates writes in memory and periodically flushes new segments. The InternalEngine wraps an IndexWriter (for writes) and a pair of internal and external reader managers that wrap Lucene DirectoryReaders and map Elasticsearch concepts onto Lucene's:

- refresh triggers a Lucene NRT reader reopen (via DirectoryReader.openIfChanged()), making recently indexed documents searchable without writing a full commit to disk. Refreshes do not call IndexWriter.commit() and do not persist data durably.
- flush calls IndexWriter.commit(), writing a durable commit point to disk. A flush is what allows the translog to be safely truncated up to that commit.

This distinction is a core piece of Elasticsearch's durability model. Documents are searchable after a refresh but only durably stored after a flush. In between, the translog bridges the gap: every write is appended to the translog on disk before being acknowledged, so that it can be replayed in the event of a crash.
The detailed mechanics of how a write flows through the engine (including translog interaction, sequence number assignment, and version map updates) are covered in the Translog and Indexing / CRUD sections.
The IndicesClusterStateService is a high-priority ClusterStateApplier of the ClusterApplierService (see Cluster State Application). Any time a new cluster state is published, IndicesClusterStateService checks whether the state of indices and shards has changed and updates the local node's shards to match accordingly.
Its doApplyClusterState method goes through the following operations, in order, each time a new cluster state is applied:

- deleteIndices(): closes the IndexShard (which closes its Engine, flushing first), releases the Store reference, and deletes the on-disk shard directory.
- removeIndicesAndShards(): closes the Engine (without flushing; unflushed data is already in the translog and safe), releases the Store reference, but leaves the on-disk files intact for later cleanup by IndicesStore.
- updateIndices(): applies updated settings to the IndexShard and then to its Engine (e.g. merge scheduler config, GC deletes policy, soft-delete retention). If mappings changed, updates the MapperService. No disk writes happen here.
- For shards in INITIALIZING state, creates a new Store and IndexShard and kicks off recovery. For already-active shards, updates their routing metadata in-place.

The diagram below illustrates the end-to-end flow from a master publishing a new cluster state containing a newly assigned shard down to the Engine becoming active.
```mermaid
sequenceDiagram
    participant M as Master Node
    participant CA as ClusterApplierService
    participant ICSS as IndicesClusterStateService
    participant IS as IndexShard
    participant S as Store
    participant E as InternalEngine
    rect rgb(255, 248, 240)
        Note over M,CA: Cluster State Committed
        M->>CA: ApplyCommitRequest (new ClusterState)
        CA->>ICSS: applyClusterState(ClusterChangedEvent)
    end
    rect rgb(240, 248, 255)
        Note over ICSS,S: Shard Creation
        ICSS->>ICSS: doApplyClusterState()
        Note over ICSS: deleteIndices()<br/>removeIndicesAndShards()<br/>updateIndices()
        ICSS->>S: new Store(shardDirectory)
        ICSS->>IS: new IndexShard(store, shardRouting, ...)
        ICSS->>IS: startRecovery(recoverySource)
    end
    rect rgb(240, 255, 240)
        Note over IS,E: Recovery and Engine Start
        IS->>S: validate / prepare directory
        IS->>E: new InternalEngine(engineConfig)
        E->>S: IndexWriter.open(store.directory)
        Note right of E: Engine open, shard STARTED
    end
    rect rgb(255, 240, 240)
        Note over IS,M: Report Back to Master
        IS->>M: ShardStateAction.shardStarted()
        Note left of M: Master updates ClusterState,<br/>marks shard as STARTED
    end
```
For a newly assigned shard, createShard first acquires a ShardLock (preventing concurrent access to the same shard directory) or throws a ShardLockObtainFailedException if it fails to do so. It then creates a Store referencing the shard's on-disk directory and instantiates an IndexShard. Recovery is then triggered based on the RecoverySource in the ShardRouting.
The details of recovery, including how the Engine is created and started, are covered in the
Peer Recovery, Snapshot Recovery, and Local Shards Recovery
sections.
It is important to first understand the basic write model of documents:
documents are written to Lucene in-memory buffers, then "refreshed" to searchable segments which may not be persisted on disk, and finally "flushed" to a durable Lucene commit on disk.
If this was the only way we stored the data, we would have to delay the response to every write request until after the data had been flushed to disk, which could take many seconds or longer. If we didn't, it would mean that we would lose newly ingested data if there was an outage between sending the response and flushing the data to disk.
For this reason, newly ingested data is also written to a shard's Translog, whose main purpose is to persist uncommitted operations (e.g., document insertions or deletions), so they can be replayed by just reading them sequentially from the translog during recovery in the event of ephemeral failures such as a crash or power loss.
The translog can persist operations quicker than a Lucene commit, because it just stores raw operations / documents without the analysis and indexing that Lucene does.
The translog is always persisted and fsync'ed on disk before acknowledging writes back to the user.
This can be seen in InternalEngine which calls the add() method of the translog to append operations, e.g., its index() method at some point adds a document insertion operation to the translog.
The translog ultimately truncates operations once they have been flushed to disk by a Lucene commit; indeed, in some sense the point of a "flush" is to clear out the translog.
Main usages of the translog are:
Translog files are automatically truncated when they are no longer needed, specifically after all their operations have been persisted by Lucene commits on disk. Lucene commits are initiated by flushes (e.g., with the index Flush API).
Flushes may also be automatically initiated by Elasticsearch, e.g., if the translog exceeds a configurable size INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING or age INDEX_TRANSLOG_FLUSH_THRESHOLD_AGE_SETTING, which ultimately truncates the translog as well.
A bulk request will ultimately make repeated calls to Engine methods such as index() or delete(), which add operations to the Translog.
Finally, the AfterWrite action of the TransportWriteAction will call indexShard.syncAfterWrite(), which puts the last written translog Location of the bulk request into an AsyncIOProcessor that is responsible for gradually fsync'ing the Translog and notifying any waiters.
Once the translog has fsync'ed past the requested location, the waiting bulk request is notified and can be acknowledged to the user.
This process involves multiple writes to the translog before the next fsync(), and this is done so that we amortize the cost of the translog's fsync() operations across all writes.
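The batching idea can be sketched with a minimal stand-in (our own class, not AsyncIOProcessor): many writers register the translog location they need persisted, and a single fsync acknowledges all of them at once, amortizing its cost.

```java
import java.util.ArrayList;
import java.util.List;

// Minimal stand-in for fsync batching: writers enqueue locations, one fsync
// covers every location written before it.
class FsyncBatcher {
    private final List<Long> pendingLocations = new ArrayList<>();
    private long syncedUpTo = -1;

    synchronized void awaitLocation(long translogLocation) {
        pendingLocations.add(translogLocation);
    }

    /** One fsync covers every pending writer; returns how many were acknowledged. */
    synchronized int fsync() {
        for (long location : pendingLocations) {
            syncedUpTo = Math.max(syncedUpTo, location);
        }
        int acknowledged = pendingLocations.size();
        pendingLocations.clear();
        return acknowledged;
    }

    synchronized boolean isSynced(long translogLocation) {
        return translogLocation <= syncedUpTo;
    }
}
```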
Each translog is a sequence of files, each identified by a translog generation ID, each containing a sequence of operations, with the last file open for writes.
The last file has a part which has been fsync'ed to disk, and a part which has been written but not necessarily fsync'ed yet to disk.
Each operation is identified by a sequence number (seqno), which is monotonically increased by the engine's ingestion functionality.
Typically the entries in the translog are in increasing order of their sequence number, but not necessarily.
A Checkpoint file is also maintained and rewritten on each fsync of the translog. It records important metadata and statistics about the translog: the current translog generation ID, the last fsync'ed operation and location (i.e., we should read only up to this location during recovery), the minimum translog generation ID, and the minimum and maximum sequence numbers of the operations contained in the sequence of translog generations. All of these are used to identify the translog operations that need to be replayed upon recovery.
When the translog rolls over, e.g., upon the translog file exceeding a configurable size, a new file in the sequence is created for writes, and the last one becomes read-only.
A new commit flushed to the disk will also induce a translog rollover, since the operations in the translog so far will become eligible for truncation.
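The checkpoint metadata can be sketched as a plain record. The field names are ours and simplified; the real Checkpoint class tracks a few more statistics.

```java
// Simplified sketch of the checkpoint metadata described above.
record TranslogCheckpoint(
        long generation,              // current translog generation ID
        long offset,                  // last fsync'ed location within that generation
        long minTranslogGeneration,   // oldest generation still needed
        long minSeqNo,                // lowest seqno across the retained generations
        long maxSeqNo) {              // highest seqno across the retained generations

    /** True if the retained generations may contain an operation with this seqno. */
    boolean mayContainSeqNo(long seqNo) {
        return seqNo >= minSeqNo && seqNo <= maxSeqNo;
    }
}
```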
A few more words on terminology and classes used around the translog Java package.
A Location of an operation is defined by the translog generation file it is contained in, the offset of the operation in that file, and the number of bytes that encode that operation.
An Operation can be a document indexed, a document deletion, or a no-op operation.
A Snapshot iterator can be created to iterate over a range of requested operation sequence numbers read from the translog files.
The sync() method is the one that fsync's the current translog generation file to disk, and updates the checkpoint file with the last fsync'ed operation and location.
The rollGeneration() method is the one that rolls the translog, creating a new translog generation, e.g., called during an index flush.
The createEmptyTranslog() method creates a new translog, e.g., for a new empty index shard.
Each translog file starts with a TranslogHeader that is followed by translog operations.
Some internal classes used for reading and writing from the translog are the following.
A TranslogReader can be used to read operation bytes from a translog file.
A TranslogSnapshot can be used to iterate operations from a translog reader.
A MultiSnapshot can be used to iterate operations over multiple TranslogSnapshots.
A TranslogWriter can be used to write operations to the translog.
The Get API (and by extension, the multi-get API) supports a real-time mode, which can query documents by ID, including recently ingested documents that have not yet been refreshed and are therefore not yet searchable.
This capability is facilitated by another data structure, the LiveVersionMap, which maps recently ingested documents by their ID to the translog location that encodes their indexing operation.
That way, we can return the document by reading the translog operation.
Tracking in the version map is not enabled by default. The first real-time GET triggers a refresh of the index shard and a search to find the document, and also enables tracking in the version map for newly ingested documents. Subsequent real-time GETs are serviced by first consulting the version map (to read the document from the translog) and, if the ID is not found there, by searching the already-refreshed data, without requiring a refresh of the index shard.
On a refresh, the code safely swaps the old map with a new empty map. That is because after a refresh, any documents in the old map are now searchable in Lucene, and thus we do not need them in the version map anymore.
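The map swap can be sketched as below. This is a simplified model (the real LiveVersionMap keeps the old map around until the refresh completes; we drop it immediately for brevity): lookups consult the map of recently indexed documents first, and a refresh replaces it with an empty map because those documents are now searchable in Lucene.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified version-map sketch: track recently indexed docs by ID, swap in a
// fresh empty map on refresh.
class VersionMapSketch {
    private Map<String, Long> current = new HashMap<>();   // doc ID -> translog location

    void trackIndexedDoc(String id, long translogLocation) {
        current.put(id, translogLocation);
    }

    /** Translog location for a recently indexed doc, or null to fall back to search. */
    Long lookup(String id) {
        return current.get(id);
    }

    void afterRefresh() {
        current = new HashMap<>();   // old entries are searchable in Lucene now
    }
}
```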
The IndexVersion tracks the on-disk format of an index. It is conceptually similar to TransportVersion
(which controls wire-protocol compatibility between nodes) but instead targets how index data and metadata are
serialized to Lucene files. Every time the serialization format of mappings, postings, doc values, or any other
persisted index structure changes, a new IndexVersion constant must be added.
The IndexVersion class contains two fields: an integer id and the
Lucene Version
that the index was written with. The stored Lucene version is used for Lucene API calls that depend on the version,
such as reading segment metadata.
The IndexVersion class was introduced in 8.8.0. Before that,
the node release Version was used for both purposes. Prior to 8.9.0 the id field was the same as the release version,
for backwards compatibility. In 8.9.0 it changed to an incrementing number, and disconnected from the release version.
All known versions are declared as constants in IndexVersions (e.g. UPGRADE_TO_LUCENE_10_4_0,
SEMANTIC_TEXT_FIELD_TYPE, or GENERIC_DENSE_VECTOR_FORMAT).
This list
serves as the source of truth for version-to-Lucene mappings and merge-conflict checks.
The IndexVersion
is stamped
on every index at creation time via
the IndexMetadata
index.version.created setting. This version is immutable for the lifetime of the index and determines which code
paths are used when reading its data
(PostRecoveryMerger optimization example).
A separate index.version.compatibility
setting
(defaulting to index.version.created)
can be set
to a newer version to opt in to newer behavior while retaining backward-compatible defaults. For example, when
restoring a snapshot of an index with an index.version.created older than MINIMUM_READONLY_COMPATIBLE,
RestoreService.convertLegacyIndex()
sets index.version.compatibility to the minimum index version supported by the cluster's nodes and adds
a write block, so that the index can be opened as a read-only archive. The subsequent paragraphs contain more details
about index version compatibility.
Elasticsearch supports a two-major-version compatibility window for index data. Indices created in the current major version (N) or the previous major version (N-1) are fully supported for reading and writing. The minimum index version for this full support is defined by IndexVersions.MINIMUM_COMPATIBLE. Note that Lucene enforces similar constraints.
Indices created in the penultimate major version (N-2), i.e. older than MINIMUM_COMPATIBLE but at or above
IndexVersions.MINIMUM_READONLY_COMPATIBLE,
can only be opened in read-only mode. They must have been marked as read-only (with a write block) on the previous
major version before upgrading. Their mappings and field types are still fully understood by the current version. Only
writes are blocked.
Indices older than MINIMUM_READONLY_COMPATIBLE are classified as legacy. Their on-disk format
is too old for the current version to fully understand their mappings or field types, so they are automatically converted
to degraded read-only archives via RestoreService.convertLegacyIndex()
when restored from a snapshot.
Both MINIMUM_COMPATIBLE and MINIMUM_READONLY_COMPATIBLE are bumped with each new major release to maintain this
window.
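The two-major-version window described above amounts to a simple classification by `index.version.created`. A sketch with made-up numeric versions (the real constants live in IndexVersions):

```python
# Illustrative version ids only; the real values are in IndexVersions.java.
MINIMUM_COMPATIBLE = 8_000_000           # oldest fully read-write version (N-1)
MINIMUM_READONLY_COMPATIBLE = 7_000_000  # oldest read-only version (N-2)

def classify(index_version_created: int) -> str:
    if index_version_created >= MINIMUM_COMPATIBLE:
        return "read-write"
    if index_version_created >= MINIMUM_READONLY_COMPATIBLE:
        return "read-only"        # must carry a write block from before upgrade
    return "legacy-archive"       # converted via RestoreService.convertLegacyIndex()

assert classify(8_500_000) == "read-write"
assert classify(7_200_000) == "read-only"
assert classify(6_000_000) == "legacy-archive"
```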
For more details on how index format compatibility interacts with upgrades, see the Index Format Backwards Compatibility section in the General Architecture Guide.
Apache Lucene is the search and indexing library at the core of every Elasticsearch shard. Each shard is a Lucene index. Instead of treating Lucene as an opaque storage engine at the bottom of the stack, Elasticsearch deeply integrates with it. It customizes file I/O, codecs, merge behavior, and reader lifecycle.
For more details on how Elasticsearch wraps Lucene's Directory for file I/O, see the Store section.
For how the IndexWriter and DirectoryReader are managed, including the distinction between "refresh"
(NRT reopen) and "flush" (durable Lucene commit), see Segments, Refresh, and Flush.
A Lucene index on disk is a collection of segment files plus a
segments_N info file that
records which segments belong to the latest commit. Each segment is a self-contained, immutable mini-index composed
of several file types, each responsible for a different data structure:
| Extension | Content |
|---|---|
| .si | Segment metadata (doc count, unique id, diagnostics, Lucene version). |
| .fnm | Field infos: field names, types (string, numeric, etc.), and index options (stored, indexed, doc values). |
| .fdm, .fdt | Stored fields metadata and data (original JSON _source, stored field values). |
| .tim, .tip, .doc, .pos, .pay | The inverted index: term dictionary, postings lists, positions, and payloads. |
| .dvd, .dvm | Doc values data and metadata (columnar storage for sorting, aggregations, scripting). |
| .tvd, .tvx | Term vectors data and index. |
| .nvm, .nvd | Norms (per-field length normalization factors used in scoring). |
| .liv | Live documents bitset (tracks which docs have been deleted within the segment). |
| .vec, .vex, .vem | KNN vector data and graph (HNSW or other ANN structure). |
| .kdm, .kdi, .kdd | Points / BKD tree (numeric range queries, geo). |
Check out the Lucene documentation for more details on each of those specific files. Elasticsearch also maintains a registry of all known Lucene file extensions in LuceneFilesExtensions.
The segments_N file and the write.lock file live at the directory root. A commit atomically publishes a new
segments_N+1 as the active commit point, making the new set of segments visible.
The old segment files are removed once they are no longer
referenced by any open
IndexReader or any retained commits. Elasticsearch's
CombinedDeletionPolicy, which implements Lucene's
IndexDeletionPolicy,
manages which commits are retained. All commits more recent than
the safe commit
(the most recent commit whose max sequence number is at most the global checkpoint, used as the starting point
for peer recovery) are preserved. Older commits can also
be pinned
by external consumers (e.g., snapshot operations). CombinedDeletionPolicy also
communicates the safe commit checkpoint information
with the translog deletion policy.
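The safe-commit rule can be stated concretely: keep the most recent commit whose max sequence number is at most the global checkpoint, plus every commit newer than it. A small sketch (illustrative, not the CombinedDeletionPolicy code):

```python
# Model of commit retention: commits are (generation, max_seq_no) pairs,
# ordered oldest-first. The safe commit is the newest one fully covered by
# the global checkpoint; it and everything after it are retained.

def commits_to_retain(commits, global_checkpoint):
    safe_index = 0
    for i, (_, max_seq_no) in enumerate(commits):
        if max_seq_no <= global_checkpoint:
            safe_index = i
    return commits[safe_index:]

commits = [(1, 10), (2, 25), (3, 40)]
# Global checkpoint 30: commit 2 is the safe commit; commit 1 may be deleted.
assert commits_to_retain(commits, 30) == [(2, 25), (3, 40)]
# Global checkpoint 50: commit 3 is safe, so only it must be kept.
assert commits_to_retain(commits, 50) == [(3, 40)]
```

Commits newer than the safe commit cannot be deleted because peer recovery may start from the safe commit and replay forward; pinned commits (snapshots) add further retention on top of this rule.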
Lucene's Codec abstraction lets each field type choose its own on-disk format. A codec is a bundle of format implementations: one for postings, one for stored fields, one for doc values, one for vectors, etc.
Elasticsearch ships its own codec stack, managed by the CodecService class. It extends the current Lucene codec (e.g. Lucene104Codec) with Elasticsearch-specific customizations:
- A FieldInfos deduplication layer is used by CodecService to deduplicate FieldInfos metadata objects loaded from each
  segment's .fnm file. Field names, attribute keys, and attribute maps that are identical across segments are interned
  so they share a single object in memory, reducing heap usage on indices with many segments.
- When synthetic _id is enabled for time-series indices (index.mapping.synthetic_id), the AbstractTSDBSyntheticIdCodec
  avoids writing the _id inverted index and stored field to disk. The _id field is still declared as indexed, but
  SyntheticIdField produces an empty token stream so no postings are actually written on flush.
  At read time, the TSDBSyntheticIdPostingsFormat reconstructs postings from _tsid, @timestamp, and
  _ts_routing_hash doc values, and the TSDBSyntheticIdStoredFieldsReader synthesizes _id stored-field values on
  the fly from the same doc values. An ES94BloomFilterDocValuesFormat bloom filter is also built for the _id field
  so that existence checks during indexing can avoid _tsid and @timestamp doc-values lookups per document.
  The synthetic ID feature reduces the storage overhead for high-cardinality time-series indices.

As the IndexVersion advances with each Lucene upgrade, new codec implementations are introduced (e.g.
Elasticsearch92Lucene103Codec). Older segments written by previous codecs remain readable because Lucene's SPI
mechanism loads the codec that originally wrote each segment.
Because Lucene segments are immutable, updates and deletes cannot modify documents in place. Over time, small segments accumulate from successive flushes. Merges combine multiple segments into fewer, larger ones. Merges both reclaim space from obsolete documents and improve query performance by reducing the number of segments a search must visit.
MergePolicys decide
which segments to merge and when. Elasticsearch configures this through MergePolicyConfig, which uses
Lucene's TieredMergePolicy
as the default base policy, and
LogByteSizeMergePolicy
for time-series indices. Then the InternalEngine
wraps
this base policy with several layers:
- A merge policy layer that prunes _id field postings for deleted documents during segment merges, maintaining
  consistent update performance even when a large number of soft deleted/updated documents are retained.
- A merge policy layer that prunes _recovery_source stored fields and _seq_no-related fields for documents
  that no longer need them for replication.

MergeSchedulers decide how merges are executed. The default implementation is ThreadPoolMergeScheduler, which runs merges on the ThreadPoolMergeExecutorService shared node-level thread pool with disk-aware I/O throttling.
Elasticsearch never hard-deletes documents from Lucene. Instead, both updates and deletes
use Lucene's soft-delete mechanism. The InternalEngine
calls
IndexWriter.softUpdateDocument,
which atomically marks the previous version of a document as soft-deleted
(by setting a __soft_deletes numeric doc-values field to 1) and writes the new version or a delete
tombstone into the current segment.
The old document is also marked as deleted in the .liv bitset, just like a hard delete. The soft-delete update
triggers a new generational .liv file for the segment that contained the previous version of the document.
The difference between hard and soft deletes is that hard-deleted documents are always discarded during merges,
whereas documents carrying the __soft_deletes marker can be selectively retained. Lucene's
SoftDeletesRetentionMergePolicy
decides which soft-deleted documents to keep by evaluating a retention query at merge time.
Elasticsearch provides this query through its SoftDeletesPolicy, which evaluates whether a document can be discarded
based on the minimum sequence number to retain (evaluated from the global checkpoint), the
index.soft_deletes.retention.operations setting, and any active retention leases held by replicas or CCR followers.
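The retention decision combines three inputs into a single minimum sequence number to retain. A sketch of that combination (the formula is illustrative; see SoftDeletesPolicy for the real computation):

```python
# A soft-deleted operation is retained while its seq_no is at or above the
# minimum seq_no to retain, which is bounded by the global checkpoint, the
# index.soft_deletes.retention.operations setting, and any retention leases.

def min_seq_no_to_retain(global_checkpoint, retention_ops, lease_retained_seq_nos):
    # Keep at least `retention_ops` operations below the global checkpoint...
    min_seq = global_checkpoint + 1 - retention_ops
    # ...and everything still needed by an active retention lease
    # (e.g. a lagging replica or CCR follower).
    for leased in lease_retained_seq_nos:
        min_seq = min(min_seq, leased)
    return max(min_seq, 0)

# 1000 ops acknowledged, keep the last 200, one CCR follower lagging at 600:
assert min_seq_no_to_retain(999, 200, [600]) == 600
# No leases: retain only the configured window.
assert min_seq_no_to_retain(999, 200, []) == 800
```

At merge time, Lucene's SoftDeletesRetentionMergePolicy keeps any soft-deleted document whose operation sequence number is at or above this bound and discards the rest.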
Soft deletes are an essential feature for CCR. CCR followers replay operations from their leader Lucene index to stay up to date. Without soft deletes, the leader Lucene index could discard deleted documents during merges, leaving no trace that the delete operation ever occurred. CCR followers that fall behind would no longer be able to catch up via operation replay. Soft-deleted tombstones prevent this by preserving delete operations in the Lucene index until all followers have processed them.
When a shard is created on a node, it starts out empty. Recovery is the process of loading the shard's data from
some data source into the newly created IndexShard in order to make it available for index or search requests.
When the shard allocation process has chosen a node for a shard, it records its choice by writing an updated ShardRouting
record into the cluster state's IndexRoutingTable. The ShardRouting entry includes RecoverySource metadata that
describes where the shard's data can be found based on the shard's previous allocation. For example, a shard for a newly created
index will have its recoverySource set to EMPTY_STORE to indicate that recovery should bring up the shard without loading
any existing data, while a recoverySource of EXISTING_STORE would tell recovery to load the shard from files already
present on disk, likely because the node was restarted and had hosted the shard until it shut down.
The IndicesClusterStateService on each node listens for updates to the IndexRoutingTable and when it finds that a
shard in state INITIALIZING has been assigned to its node, it creates a fresh IndexShard for the assigned shard and kicks off a recovery process
for that node, using the RecoverySource in the ShardRouting entry to determine the parameters of the recovery process.
The full list of recovery types is defined in RecoverySource.Type. The various modes are discussed below, roughly in
order of complexity. Some modes build on others; for example, snapshot recovery sets up a local data store by copying
files from a snapshot source and then uses EXISTING_STORE recovery. Similarly, if there is any local data, then
peer recovery starts by using EXISTING_STORE recovery to bring the local shard as close to up to date as it can, and then
finishes synchronizing the shard through RPCs (Remote Procedure Calls) to an active source shard.
At the end of the recovery process, recovery finalization marks the shard as STARTED in cluster state, which makes it
available to handle index and search requests.
The simplest form of recovery is EMPTY_STORE, which is just what it sounds like. This recovery type causes the recovery
process to invoke StoreRecovery for the recovery. StoreRecovery will create an empty shard directory on disk (deleting
any existing files that may be present) and bootstrap an empty translog and then tell the Engine to use that directory.
That's pretty much the whole process.
Only slightly more complex is EXISTING_STORE recovery, which is used when the node is expected to have an up-to-date
copy of the shard's data already present on disk. Once again StoreRecovery manages the recovery process, but in this
case it expects to find Lucene files and a translog already present. It validates the Lucene files, potentially dropping
any incomplete commits that may not have been fsynced to disk before shutdown, tells the engine to open the directory
as its backing store, and then replays the transaction log to replay any operations that may have been acknowledged to
the client but not written into a durable Lucene commit. At that point the shard is ready to serve requests.
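The translog replay step can be modeled simply: any operation with a sequence number beyond the last durable commit is re-applied. The operation shapes below are illustrative, not the actual translog format:

```python
# Sketch of the replay step in EXISTING_STORE recovery: operations in the
# translog beyond the last durable Lucene commit are re-applied so that
# acknowledged-but-uncommitted writes are not lost.

def recover_existing_store(committed_docs, committed_seq_no, translog_ops):
    docs = dict(committed_docs)
    for op in translog_ops:
        if op["seq_no"] <= committed_seq_no:
            continue  # already contained in the last Lucene commit
        if op["type"] == "index":
            docs[op["id"]] = op["doc"]
        elif op["type"] == "delete":
            docs.pop(op["id"], None)
    return docs

committed = {"a": {"v": 1}}
translog = [
    {"seq_no": 0, "type": "index", "id": "a", "doc": {"v": 1}},  # committed
    {"seq_no": 1, "type": "index", "id": "b", "doc": {"v": 1}},  # replayed
    {"seq_no": 2, "type": "delete", "id": "a"},                  # replayed
]
assert recover_existing_store(committed, 0, translog) == {"b": {"v": 1}}
```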
In snapshot recovery, the data source is a snapshot of the index shard stored on a remote repository. Snapshot recovery,
also managed in StoreRecovery but invoked through the recoverFromRepository method, downloads and unpacks the snapshot
from the remote repository into the local shard directory and then invokes the same logic as EXISTING_STORE to bring
the shard on line, with some small differences. The key one is that the snapshot is taken from a Lucene commit, and
so it does not need to store the shard translog when the snapshot is taken, or restore it during recovery. Instead
it creates a new empty translog before bringing the shard on line.
Local shards recovery is a type of recovery that reuses existing data from other shard(s) allocated on the current node (hence local shards). It is used exclusively to implement index Shrink/Split/Clone APIs.
This recovery type uses HardlinkCopyDirectoryWrapper to hard link or copy data from the source shard(s) directory. Copy is used if the runtime environment does not support hard links (e.g., on Windows). Source shard(s) directories are added using the IndexWriter#addIndexes API. Once an IndexWriter is correctly set up with source shard(s), the necessary data modifications are performed (like deleting excess documents during split) and a new commit is created for the recovering shard. After that recovery proceeds using standard store recovery logic utilizing the commit that was just created.
Whereas the other recovery modes are used to bring up a primary shard, peer recovery is used to add replicas of an already active primary shard. Peer recovery is also used for primary shard relocation, but relocation goes through the standard peer recovery process to bring a replica in sync, before handing off the primary role to the recovered replica during finalization.
Unlike StoreRecovery, peer recovery is managed through a separate service on the node recovering the shard, the
PeerRecoveryTargetService. When the IndexShard sees that its recovery source is of type PEER, it hands over the
recovery process to PeerRecoveryTargetService by invoking its startRecovery method. This service begins by creating
an in-memory record of the recovery process to track its progress, and then runs EXISTING_STORE recovery in case the
recovering replica previously held a copy of the shard that has since gone out of sync (e.g., because the node holding the
replica restarted). Because the shard is a replica, it only recovers up to the latest known global checkpoint for the shard
and discards any operations in the local store that are ahead of that point (see [Translog](#translog) for details).
Once the local shard has been brought close to current, the service then sends a request to a corresponding service on the source node, PeerRecoverySourceService,
to complete synchronization.
Synchronization begins by discovering any differences between the source and target shards and transmitting any missing files to the target shard. The source for the files can be the source shard itself, but if a snapshot of the shard is available that has some subset of the files to be transmitted, then recovery will fetch them from the snapshot in order to reduce load on the source shard.
The next step is to transfer any operations from the source translog. Since the source shard is active, it may be receiving index operations while recovery is in progress. So, to ensure that the target shard doesn't miss any new operations, the source shard adds the target to the shard's replication group (see the [replication](#replication) docs) before completing the operation transfer phase. Because of this ordering, any operations accepted on the shard between the time it reads and sends the latest operation in the translog and the time the replica completes recovery are sent through the regular replication process and will not be lost. Once the target has been added to the replication group, the source reads the latest sequence number from its transaction log, knowing that any updates past that point will be handled by replication, and replays the translog to the target up to that point.
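The ordering argument can be demonstrated with a toy model: because the target joins the replication group before the translog cutoff is captured, every operation reaches the target either by replay (at or below the cutoff) or by live replication (above it). Names here are illustrative:

```python
# Toy model of the no-lost-operations argument in peer recovery.

class PeerRecovery:
    def __init__(self, source_ops):
        self.source_ops = list(source_ops)   # seq_nos already on the source
        self.replication_group = []
        self.target_ops = set()

    def index(self, seq_no):
        # New writes reach the source and every member of the group.
        self.source_ops.append(seq_no)
        if "target" in self.replication_group:
            self.target_ops.add(seq_no)

    def recover(self, concurrent_writes):
        self.replication_group.append("target")  # step 1: join the group
        cutoff = max(self.source_ops)            # step 2: capture the cutoff
        for seq_no in concurrent_writes:         # writes racing with recovery
            self.index(seq_no)
        # step 3: replay the translog up to the captured cutoff
        self.target_ops.update(s for s in self.source_ops if s <= cutoff)
        return self.target_ops

rec = PeerRecovery(source_ops=[0, 1, 2])
# Writes 3 and 4 arrive mid-recovery yet are not lost:
assert rec.recover(concurrent_writes=[3, 4]) == {0, 1, 2, 3, 4}
```

Swapping steps 1 and 2 would open a window where a write lands after the cutoff is read but before the target joins the group, and that write would reach the target by neither path.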
At this point the target is ready to be started as an in sync replica. However, peer recovery is also used to perform
primary relocation. If the target shard is being recovered in order to take over as primary, then the finalization
stage will call IndexShard.relocate to complete the handoff of primary responsibilities. This method blocks operations
on the source shard and sends an RPC to the target shard with the ReplicationTracker.PrimaryContext needed
to promote the target to primary. Once the target acknowledges the handoff, the source shard moves itself into
replica mode.
(Frozen, warm, hot, etc.)
Each index consists of a fixed number of primary shards. The number of primary shards cannot be changed for the lifetime of the index. Each primary shard can have zero-to-many replicas used for data redundancy. The number of replicas per shard can be changed dynamically.
The allocation assignment status of each shard copy is tracked by its ShardRoutingState. The RoutingTable and RoutingNodes objects
are responsible for tracking the data nodes to which each shard in the cluster is allocated: see the routing package javadoc for more
details about these structures.
The DesiredBalanceShardsAllocator is the component that drives shard allocation decisions. It leverages the DesiredBalanceComputer to produce
DesiredBalance instances for the cluster based on the latest cluster changes (add/remove nodes, create/remove indices, load, etc.). Then
the DesiredBalanceReconciler is invoked to choose the next steps to take to move the cluster from the current shard allocation to the
latest computed DesiredBalance shard allocation. The DesiredBalanceReconciler will apply changes to a copy of the RoutingNodes, which
is then published in a cluster state update that will reach the data nodes to start the individual shard recovery/deletion/move work.
The DesiredBalanceReconciler is throttled by cluster settings, like the max number of concurrent shard moves and recoveries per cluster
and node: this is why the DesiredBalanceReconciler will make, and publish via cluster state updates, incremental changes to the cluster
shard allocation. The DesiredBalanceShardsAllocator is the endpoint for reroute requests, which may trigger immediate
invocations of the DesiredBalanceReconciler, but only asynchronous requests to the DesiredBalanceComputer via the
ContinuousComputation component. Cluster state changes that affect shard balancing (for example index deletion) all call
some reroute method that reaches the DesiredBalanceShardsAllocator to run reconciliation and queue a request for the
DesiredBalanceComputer, leading to desired balance computation and reconciliation actions. Asynchronous completion of a
new DesiredBalance will also invoke a reconciliation action, as will
cluster state updates completing shard moves/recoveries (unthrottling the next shard move/recovery).
The ContinuousComputation saves the latest desired balance computation request, which holds the cluster information at the time of that
request, and a thread that runs the DesiredBalanceComputer. The ContinuousComputation thread takes the latest request, with the
associated cluster information, feeds it into the DesiredBalanceComputer and publishes a DesiredBalance back to the
DesiredBalanceShardsAllocator to use for reconciliation actions. Sometimes the ContinuousComputation thread's desired balance
computation will be signalled to exit early and publish the initial DesiredBalance improvements it has made, when newer rebalancing
requests (due to cluster state changes) have arrived, or in order to begin recovery of unassigned shards as quickly as possible.
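The heart of ContinuousComputation is a latest-wins input slot: a newer request simply replaces any not-yet-processed one, so the computer never works through a backlog of stale cluster states. A single-threaded sketch of that pattern (the real class runs its computation on a dedicated thread):

```python
# Sketch of the latest-wins coalescing pattern used by ContinuousComputation.

class ContinuousComputation:
    def __init__(self, compute):
        self.compute = compute
        self.latest_input = None
        self.results = []

    def submit(self, cluster_info):
        # A newer request replaces any pending, not-yet-processed one.
        self.latest_input = cluster_info

    def run_once(self):
        if self.latest_input is not None:
            pending, self.latest_input = self.latest_input, None
            self.results.append(self.compute(pending))

cc = ContinuousComputation(compute=lambda info: f"balance-for-{info}")
cc.submit("state-1")
cc.submit("state-2")   # supersedes state-1 before the computer ran
cc.run_once()
assert cc.results == ["balance-for-state-2"]
```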
There are different priorities in shard allocation, reflected in which moves the DesiredBalanceReconciler selects to do first given that
it can only move, recover, or remove a limited number of shards at once. The first priority is assigning unassigned shards, primaries being
more important than replicas. The second is to move shards that violate any rule (such as node resource limits) as defined by an
AllocationDecider. The AllocationDeciders holds a group of AllocationDecider implementations that place hard constraints on shard
allocation. There is a decider, DiskThresholdDecider, that enforces disk usage thresholds, such that further shards may not be
allowed assignment to a node, or shards may be required to move off a node because its disk usage grew to exceed the threshold; another,
FilterAllocationDecider, excludes a configurable list of indices from certain nodes; and MaxRetryAllocationDecider will not
attempt to recover a shard on a certain node after a configured number of failed retries. The third priority is to rebalance shards to even out the
relative weight of shards on each node: the intention is to avoid, or ease, future hot-spotting on data nodes due to too many shards being
placed on the same data node. Node shard weight is based on a sum of factors: disk usage, projected shard write load, total number
of shards, and an incentive to distribute shards within the same index across different nodes. See the WeightFunction and
NodeAllocationStatsAndWeightsCalculator classes for more details on the weight calculations that support the DesiredBalanceComputer
decisions.
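The weight is a weighted sum of those factors, with a same-index concentration term that pushes shards of one index onto different nodes. A sketch with illustrative factor names and coefficients (see WeightFunction for the real calculation):

```python
# Illustrative node weight: lower weight attracts the next shard.

def node_weight(node, index, shard_factor=0.45, disk_factor=0.3,
                write_factor=0.25, index_factor=0.55):
    return (shard_factor * node["num_shards"]
            + disk_factor * node["disk_usage_fraction"]
            + write_factor * node["write_load"]
            # Penalize nodes already holding shards of the same index.
            + index_factor * node["shards_of_index"].get(index, 0))

node_a = {"num_shards": 10, "disk_usage_fraction": 0.5,
          "write_load": 2.0, "shards_of_index": {"logs": 3}}
node_b = {"num_shards": 10, "disk_usage_fraction": 0.5,
          "write_load": 2.0, "shards_of_index": {"logs": 0}}
# Equal on every factor except index concentration, so a new "logs" shard
# is steered toward node_b.
assert node_weight(node_b, "logs") < node_weight(node_a, "logs")
```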
The elected master node creates a shard allocation plan with the DesiredBalanceShardsAllocator and then selects incremental shard
movements towards the target allocation plan with the DesiredBalanceReconciler. The result of the DesiredBalanceReconciler is an
updated RoutingTable. The RoutingTable is part of the cluster state, so the master node updates the cluster state with the new
(incremental) desired shard allocation information. The updated cluster state is then published to the data nodes. Each data node will
observe any change in shard allocation related to itself and take action to achieve the new shard allocation by: initiating creation of a
new empty shard; starting recovery (copying) of an existing shard from another data node; or removing a shard. When the data node finishes
a shard change, a request is sent to the master node to update the shard as having finished recovery/removal in the cluster state. The
cluster state is used by allocation as a fancy work queue: the master node conveys new work to the data nodes, which pick up the work and
report back when done.
See also:
- DesiredBalanceShardsAllocator#submitReconcileTask for the master node's cluster state update post-reconciliation.
- IndicesClusterStateService#doApplyClusterState for the data node hook to observe shard changes in the cluster state.
- ShardStateAction#sendShardAction for the data node request to the master node on completion of a shard state change.

The Autoscaling API in ES (Elasticsearch) uses cluster and node level statistics to provide a recommendation for a cluster size to support the current cluster data and active workloads. ES Autoscaling is paired with an ES Cloud service that periodically polls the ES elected master node for suggested cluster changes. The cloud service will add more resources to the cluster based on Elasticsearch's recommendation. Elasticsearch by itself cannot automatically scale.
Autoscaling recommendations are tailored for the user based on user defined policies, composed of data roles (hot, frozen, etc.) and deciders. There's a public webinar on autoscaling, as well as the public Autoscaling APIs docs.
Autoscaling's current implementation is based primarily on storage requirements, as well as memory capacity for ML and the frozen tier. It does not yet support scaling related to search load. Paired with ES Cloud, autoscaling only scales upward, not downward, except for ML nodes, which are scaled both up and down.
Autoscaling is a plugin. All the REST APIs can be found in autoscaling/rest/.
GetAutoscalingCapacityAction is the capacity calculation operation REST endpoint, as opposed to the
other rest commands that get/set/delete the policies guiding the capacity calculation. The Transport
Actions can be found in autoscaling/action/, where TransportGetAutoscalingCapacityAction is the
entrypoint on the master node for calculating the optimal cluster resources based on the autoscaling
policies.
AutoscalingMetadata implements Metadata.ClusterCustom in order to persist autoscaling policies. Each Decider is an implementation of AutoscalingDeciderService. The AutoscalingCalculateCapacityService is responsible for running the calculation.
TransportGetAutoscalingCapacityAction.computeCapacity is the entry point to AutoscalingCalculateCapacityService.calculate, which creates a AutoscalingDeciderResults for each autoscaling policy. AutoscalingDeciderResults.toXContent then determines the maximum required capacity to return to the caller. AutoscalingCapacity is the base unit of a cluster resources recommendation.
The TransportGetAutoscalingCapacityAction response is cached to prevent concurrent callers
overloading the system: the operation is expensive. TransportGetAutoscalingCapacityAction contains
a CapacityResponseCache. TransportGetAutoscalingCapacityAction.masterOperation
calls through the CapacityResponseCache, into the AutoscalingCalculateCapacityService, to handle
concurrent callers.
The Deciders each pull data from different sources as needed to inform their decisions. The
DiskThresholdMonitor is one such data source. The Monitor runs on the master node and maintains
lists of nodes that exceed various disk size thresholds. DiskThresholdSettings contains the
threshold settings with which the DiskThresholdMonitor runs.
The ReactiveStorageDeciderService tracks information that demonstrates storage limitations are causing
problems in the cluster. It uses an algorithm defined here. Some examples are
- the DiskThresholdMonitor, to find out whether nodes are exceeding their storage capacity

The ProactiveStorageDeciderService maintains a forecast window that defaults to 30 minutes. It only
runs on data streams (ILM, rollover, etc.), not regular indices. It looks at past index changes that
took place within the forecast window to predict resources that will be needed shortly.
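The forecast idea can be sketched as extrapolating recent growth forward. Linear extrapolation is an illustrative choice here, not the exact production algorithm:

```python
# Extrapolate storage growth observed inside the forecast window forward
# by one more window. Purely illustrative of the proactive decider's idea.

def forecast_storage_bytes(samples, window_minutes=30):
    """samples: list of (minutes_ago, bytes_used); newest has minutes_ago=0."""
    in_window = [s for s in samples if s[0] <= window_minutes]
    oldest = max(in_window, key=lambda s: s[0])
    newest = min(in_window, key=lambda s: s[0])
    elapsed = oldest[0] - newest[0]
    growth = newest[1] - oldest[1]
    rate_per_minute = growth / elapsed if elapsed else 0
    # Project the same growth rate over the next window.
    return newest[1] + rate_per_minute * window_minutes

# 10 GB grew to 13 GB over the last 30 minutes -> expect ~16 GB soon.
samples = [(30, 10_000), (15, 11_500), (0, 13_000)]
assert forecast_storage_bytes(samples) == 16_000
```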
There are several more Decider Services, implementing the AutoscalingDeciderService interface.
Snapshot copies index data files from node local storage to a remote repository. These files can later be restored from the repository back to local storage to re-create the index. In addition to indices, it can also back up and restore the cluster state Metadata so that settings, templates, pipelines and other configurations can be preserved.
Snapshots are deduplicated: a data file is not copied if it has already been copied by a previous snapshot. Instead, a reference to the existing file is added to the metadata stored in the repository, effectively a ref-tracking system for the data files. This also means we can freely delete any snapshot without worrying about affecting other snapshots.
This snapshots Java package documentation provides a good explanation on how snapshot operations work.
Restoring a snapshot is a process which largely relies on index recoveries. The restore service initializes the process by preparing the shards of the indices being restored as unassigned, with snapshot as their recovery source (SnapshotRecoverySource), in the cluster state. These shards go through the regular allocation process to be allocated. They then recover on the target nodes by copying data files from the snapshot repository to local storage.
Both snapshot and restore are coordinated by the master node, while index data transfer is done by the data nodes. The communications from the master node to data nodes are always cluster state updates. Data nodes send transport requests to the master node to update the status. These requests, in turn, also trigger cluster state updates which can be further reacted upon by the data nodes until the processes are complete.
A Repository must be created before any snapshot operation can take place. There are different types of repositories. The most common ones are file system based (FS) and cloud storage based (S3, GCS, Azure), which all extend the BlobStoreRepository class. A repository must be registered as writeable with at most a single cluster, while it may be registered as readable with multiple clusters. NOTE: registering a repository as writeable with multiple clusters can lead to data corruption. We try our best to detect such situations, but it's not completely foolproof.
The content structure of a repository is similar to the local index storage structure, with an indices folder
holding indices' data separated by their UUID and shard ID. Here is a simple example of a repository structure:
my-repository/
├── index-2 <-- The root blob file of the repository, also called repository generation file
├── index.latest
├── indices
│ └── 8K9JNuqjTnygrjKY8qmsiA <-- UUID of the snapshotted index. Not the same index UUID in the cluster
│ ├── 0 <-- shard 0
│ │ ├── __I0e0reaMQzuXv8fY1GYD2w <-- data file
│ │ ├── __XqE3EVhOREWBnCHASLALtw <-- another data file
│ │ ├── index-pPXvvdFWSmajXZcfrIwggA <-- shard level generation file
│ │ └── snap-3kXOuTRmTFm6VMcEqPkKNQ.dat <-- shard level snapshot info
│ └── meta-tdTzfI8BkmhGlJ2SvOok.dat <-- index metadata
├── meta-3kXOuTRmTFm6VMcEqPkKNQ.dat <-- cluster metadata
└── snap-3kXOuTRmTFm6VMcEqPkKNQ.dat <-- snapshot information
See also the blobstore Java package documentation for more details on the repository structure.
The most important file in the repository is the index-N file, where N is a numeric generation number starting
from 0. Its corresponding Java class is RepositoryData.
This file holds the global state of the repository, including all valid snapshots and their corresponding
indices, shards and index metadata. Every mutable operation on the repository, such as creating or deleting a snapshot,
results in a new index-N file being created with an incremented generation number. The index.latest file stores
the latest repository generation and is effectively a pointer to the latest index-N file.
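The generation scheme can be modeled as a compare-and-set over an in-memory blob store. The blob names mirror the layout above; the update method and concurrency check are an illustrative sketch, not the real repository API:

```python
# Model of the index-N / index.latest scheme: every mutation writes a new
# index-N blob and advances the index.latest pointer, refusing stale writers.

class Repository:
    def __init__(self):
        self.blobs = {}
        self.generation = -1   # empty repository

    def read_repository_data(self):
        n = self.blobs.get("index.latest", self.generation)
        return self.blobs.get(f"index-{n}")

    def update(self, expected_generation, new_repository_data):
        # Concurrent-write detection: refuse to publish if another writer
        # already advanced the generation.
        if expected_generation != self.generation:
            raise RuntimeError("repository concurrently modified")
        self.generation += 1
        self.blobs[f"index-{self.generation}"] = new_repository_data
        self.blobs["index.latest"] = self.generation  # pointer to newest

repo = Repository()
repo.update(-1, {"snapshots": ["snap-1"]})
repo.update(0, {"snapshots": ["snap-1", "snap-2"]})
assert repo.read_repository_data() == {"snapshots": ["snap-1", "snap-2"]}
try:
    repo.update(0, {"snapshots": []})   # stale writer is rejected
    assert False
except RuntimeError:
    pass
```

In the real system the expected generation is agreed through cluster consensus, so only the current master can successfully publish the next index-N.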
The repository cannot be recovered if the repository generation file is corrupted. This is why we are very careful when updating this file: we leverage cluster consensus to ensure that only the latest master node updates it, and only to a generation that is accepted by the rest of the cluster members. The updating process also attempts to detect concurrent writes to avoid multiple clusters writing to the same repository. This is done by comparing the expected repository generation to the actual generation files in the repository. If this file is corrupted, as reported occasionally on SDHs, it is almost certain that some other external process has modified it.
If other files in the repository are corrupted, we can usually delete the broken snapshots and retain the good ones. The broken snapshots usually lead to an exception being thrown when they are accessed by APIs like the GetSnapshot API. Since v8.16.0, there is also the VerifyRepositoryIntegrity API which can be used to actively scan the repository and identify any corrupted snapshots.
The state of a repository must always transition through fully valid states in that there are no dangling references to non-existent blobs. This is an important property that guarantees the repository integrity as long as there is no external interference. We add new blobs before they become reachable from the root, then update the root blob, and only if the root-blob update succeeds do we delete any now-unreachable blobs. See also Creation of a Snapshot and Deletion of a Snapshot for more details.
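The ordering invariant above can be illustrated with a toy in-memory model. The names and data structures here are invented for illustration and bear no relation to the actual blob store API.

```java
import java.util.*;

// Toy model of the "no dangling references" update order: write new blobs,
// then update the root, and only then delete unreachable blobs.
class SafeRepoUpdate {
    final Map<String, byte[]> blobs = new HashMap<>();
    final Set<String> rootReferences = new HashSet<>();

    void addSnapshot(Map<String, byte[]> newBlobs) {
        // 1. Write the new blobs; nothing references them yet, so a crash
        //    here leaves only harmless garbage.
        blobs.putAll(newBlobs);
        // 2. Update the root blob so the new blobs become reachable.
        rootReferences.addAll(newBlobs.keySet());
    }

    void deleteSnapshot(Set<String> obsolete) {
        // 1. Update the root blob first so the blobs become unreachable.
        rootReferences.removeAll(obsolete);
        // 2. Only after the root update succeeds, delete the blobs; a crash
        //    between the two steps never leaves a dangling reference.
        blobs.keySet().removeAll(obsolete);
    }

    // The invariant: every blob referenced from the root must exist.
    boolean noDanglingReferences() {
        return blobs.keySet().containsAll(rootReferences);
    }
}
```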
It is worth noting that the repository's compatibility guarantee is more permissive than Elasticsearch's general version
compatibility policy. A repository may contain snapshots from a version as old as 5.0.0 and, if it does, the repository
layout must remain compatible (for reads and writes) with the oldest version in the repository so that these snapshots
remain restorable in the corresponding versions. Once the older snapshots are deleted, the repository will start using
the new format where possible (see also the static IndexVersion constants in SnapshotsService).
A snapshot repository has two components: (1) a RepositoryMetadata containing the configuration stored
as a Metadata.ProjectCustom in the cluster state; (2) the actual repository object created on the master
node and each data node. A series of APIs are available to create, update, get and delete repositories.
Each of these APIs is backed by a TransportMasterNodeAction which publishes a cluster state for the
RepositoryMetadata change so that relevant nodes can update their local repository objects accordingly.
The API call will only return after the repository object has been updated on all relevant nodes.
For creating a repository, extra verification steps are also performed: (1) before the cluster state update, the
master node attempts to create a temporary repository directly and to write and read a test file; (2) after
repository objects are created on all relevant nodes, another write and read test is performed. These
verification steps are enabled by default and can be disabled per request.
The core service class is RepositoriesService which all APIs eventually delegate to. It also implements
ClusterStateApplier which performs the actual repository object creation, update and deletion.
Besides the APIs, reserved repositories are managed via file based settings. These repositories are managed by Elasticsearch service providers, such as Elastic Cloud. File based settings are effectively a way to publish cluster state based on file contents; hence, they go through the same code path as the APIs under the hood. Reserved repositories cannot be modified via APIs.
While a new repository can be created at any time, deleting a repository has some restrictions. In general, a repository cannot be deleted if it is in use, i.e. it has ongoing snapshots or restores, or hosts mounted searchable snapshots. Since v9.4.0, the default snapshot repository cannot be deleted either. The default repository is meant to be the repository used by ILM and SLM when no repository is explicitly specified. Updating a repository usually involves closing the existing repository first and creating a new one; updates are therefore often subject to the same restrictions as deletion.
Cloud storage backed (S3, GCS, Azure) repositories require network access to the storage services. Hence,
they have a concept of clients which manage the network requests. At least the default client must be
configured for a cluster if a repository does not explicitly specify a client. Multiple
clients can be added to the same cluster, via elasticsearch.yml, and used by different repositories to
spread snapshots across different locations if so desired.
We allow different implementations of the same cloud storage type to be used as long as they are compatible in both APIs and performance characteristics. For example, many storage services claim S3 compatibility, but they may fall short under load or even return outright incorrect responses in some corner cases. The RepositoryAnalyze API can be used to proactively test the compatibility, which we suggest on SDHs from time to time.
Creating a snapshot is the most involved snapshot operation because it requires work to be done on both master and data nodes. In contrast, cloning snapshot, deleting snapshot and cleaning repository all happen entirely on the master node.
The following Java classes are mostly responsible for the snapshot creation process:
- SnapshotsService (master side coordination)
- SnapshotShardsService (data node side shard snapshots)
- BlobStoreRepository (repository reads and writes)

As discussed earlier, a snapshot operation always starts with a cluster state update triggered by a transport request.
Such is the case for snapshot creation. When SnapshotsService receives the request, it computes all indices
and their shards required for the snapshot, creates an object representing this snapshot, and stores it in the
cluster state. The overarching object representing snapshot creation in the cluster state is SnapshotsInProgress,
which is essentially a map keyed by repository name with values being a list of ongoing snapshots (SnapshotsInProgress.Entry)
for that repository. It's a list because the order is important: snapshots within each repository operate as a queue,
such that each shard's snapshots run in order. Different snapshots may complete in different orders if they target different shards.
Each ongoing snapshot further tracks its overall state (SnapshotsInProgress#State)
as well as the states (SnapshotsInProgress#ShardState) of each shard required for the snapshot.
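The shape of this bookkeeping can be sketched as follows. The types here are invented and heavily simplified; the real classes carry much more state.

```java
import java.util.*;

// Hedged sketch of the shape of SnapshotsInProgress: per-repository queues of
// ongoing snapshot entries, each tracking per-shard states.
class SnapshotsInProgressSketch {
    enum State { STARTED, SUCCESS, FAILED, ABORTED }
    enum ShardState { INIT, QUEUED, WAITING, MISSING, PAUSED_FOR_NODE_REMOVAL, SUCCESS, FAILED }

    // One ongoing snapshot: its overall state plus the state of each shard.
    record Entry(String snapshotName, State state, Map<String, ShardState> shards) {}

    // Keyed by repository name; list order matters because snapshots within
    // each repository form a queue at the shard level.
    final Map<String, List<Entry>> byRepository = new HashMap<>();
}
```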
When the snapshot creation entry (SnapshotsInProgress.Entry) is first added to the cluster state, its shards
can be in different ShardStates depending on each shard's status and snapshot activities:
- INIT: the shard is ready to be snapshotted by the data node hosting it. Shards in this state cannot relocate due to
  SnapshotInProgressAllocationDecider.
- QUEUED: the shard will be snapshotted later, when the ongoing snapshots and any other snapshots queued before it are finished.
- WAITING: the state will be changed to a new state once the shard finishes relocation or initialization.
- MISSING: this state is final. A snapshot creation fails on a shard with this state if it is issued with partial=false. Note
  that snapshot creation is always issued with partial=true in Elastic Cloud so that the snapshot does not fail
  entirely due to temporary shard unavailability.
- PAUSED_FOR_NODE_REMOVAL: the state will be updated when the node finishes the shutdown process or the shard state changes.

When a data node (SnapshotShardsService) receives the updated cluster state with a new snapshot entry,
it takes the shards that are in the INIT state and hosted on itself, and creates a shard snapshot task for each
of them. The shard state is computed for all shards involved in the snapshot at once, when the snapshot entry
is created, so a large snapshot can easily have thousands of shards in the INIT state, ready to be snapshotted.
To avoid overwhelming the data nodes, a dedicated snapshot thread pool as well as a ThrottledTaskRunner are
used to keep concurrently running shard snapshots under control. Priority is given to snapshots that started
earlier. We also order by shard to limit the number of incomplete shard snapshots (see also ShardSnapshotTaskRunner
and ShardSnapshotTaskRunner#COMPARATOR).
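The ordering idea can be sketched with a hypothetical comparator. The record and field names here are illustrative, not the actual ShardSnapshotTaskRunner code.

```java
import java.util.*;

class ShardSnapshotOrdering {
    // Illustrative stand-in for a queued shard snapshot task.
    record ShardSnapshotTask(long snapshotStartTimeMillis, int shardId) {}

    // Older snapshots first; within a snapshot, lower shard ids first, which
    // limits the number of simultaneously incomplete shard snapshots.
    static final Comparator<ShardSnapshotTask> COMPARATOR =
        Comparator.comparingLong(ShardSnapshotTask::snapshotStartTimeMillis)
            .thenComparingInt(ShardSnapshotTask::shardId);
}
```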
The lifecycle of each shard snapshot is also tracked in-memory on the data node with IndexShardSnapshotStatus.
The status is indicated by IndexShardSnapshotStatus#Stage which is updated at various points during the process.
When a shard snapshot task runs, it first acquires an index commit of the shard so that the files to be copied remain
available throughout the shard snapshot process without being deleted by ongoing indexing activities. It then
writes a new shard level generation file (index-<UUID>.data, Java class BlobStoreIndexShardSnapshots).
This is basically a shard level catalog file pointing to all the valid shard snapshots. Each snapshot creates a new one.
The UUID is used to avoid name collision.
Previous shard generation files are not deleted because they may still be needed if the current shard snapshot fails.
Following that, shard level data files are copied to the repository with BlobStoreRepository#doSnapshotShard,
BlobStoreRepository#snapshotFile etc.
After all data files are uploaded, it writes a shard level snapshot file (snap-<UUID>.dat, Java class
BlobStoreIndexShardSnapshot) indicating what data files
should be included in this shard snapshot. Note that the data file's physical name is replaced with a double underscore (__)
followed by a UUID to avoid name collisions. The actual name is mapped and stored in the shard level snapshot file.
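The renaming scheme can be sketched as follows. The helper names are invented; in the real code the physical-to-blob mapping is recorded in BlobStoreIndexShardSnapshot.

```java
import java.util.*;

class DataFileNaming {
    // Generated blob name: "__" followed by a UUID, so the same Lucene file
    // name from different snapshots never collides in the repository.
    static String blobNameFor(UUID uuid) {
        return "__" + uuid;
    }

    // The physical-to-blob mapping is what the shard level snapshot file
    // (snap-<UUID>.dat) records for each data file.
    static Map<String, String> mapPhysicalNames(List<String> physicalNames) {
        Map<String, String> mapping = new HashMap<>();
        for (String name : physicalNames) {
            mapping.put(name, blobNameFor(UUID.randomUUID()));
        }
        return mapping;
    }
}
```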
It is worth noting that the shard level generation file (index-<UUID>.data) can be reconstructed from all shard
level snapshot files (snap-<UUID>.dat), so it is technically redundant. However, listing and reading
all shard snapshot files can be rather inefficient since there could be hundreds or thousands of these files.
Hence, the shard level generation file helps with performance for deletion operations, which
need to read only a single file to decide what can be deleted at the shard level.
Once the shard snapshot is completed successfully, the data node releases the previously acquired index commit and
sends a transport request (UpdateIndexShardSnapshotStatusRequest) with the new shard generation (ShardGeneration)
to the master node to update its shard status (SnapshotsInProgress#ShardState) in the cluster state. If there is any
QUEUED shard snapshot for the same shard, the master node (SnapshotsService) updates the next one's
status to INIT so that it can run. The master responds to the data node only after the cluster state is published.
When all shards in a snapshot are completed, the master node performs a finalization step
(SnapshotsService#SnapshotFinalization and BlobStoreRepository#finalizeSnapshot) which does the following:
1. Builds the SnapshotInfo object representing the completed snapshot for serialization.
2. Writes the Metadata and IndexMetadata relevant to this snapshot.
3. Writes the snapshot info file snap-<UUID>.dat (Java class SnapshotInfo).
4. Writes a new root blob (index-N) with an incremented generation number
and updated content including the new snapshot, and publishes a cluster state to accept this new
generation as the current/safe one (BlobStoreRepository#writeIndexGen).

Step 4 is the most critical one. The root blob is intentionally written last so that any prior failure only leaves some redundant files in the repository, which will be cleaned up in due time. A snapshot is not complete until the root blob is successfully updated. To ensure consistency, updating the root blob is a three-step process leveraging cluster consensus (see BlobStoreRepository#writeIndexGen).
When the master fails over during snapshot creation, the above steps ensure that only the new master can successfully update the root blob, avoiding data corruption.
Multiple snapshots can run concurrently in the same repository. But the process is sequential at shard level,
i.e. only one shard snapshot for the same shard can be in the INIT state at any time.
Snapshot deletions and creations are mutually exclusive. See also Deletion of a Snapshot.
When a node is shutting down, it must vacate all shards via relocation. Since shards being snapshotted
(shard snapshot status INIT) cannot relocate, we need a way to transition these shards out of the
INIT state to avoid stalling the shutdown process. This is where the shard snapshot pausing mechanism
comes into play.
When a node shutdown is initiated, SnapshotsService reacts to the new shutdown metadata by updating
SnapshotsInProgress#nodesIdsForRemoval which tracks the IDs of the shutting down nodes. When
this change is published and observed by the shutting down data node (SnapshotShardsService), it pauses
its shard snapshots by first setting the shard snapshot status to PAUSING, which is checked regularly
by the file uploading process (BlobStoreRepository#snapshotFile) and causes a PausedSnapshotException
to be thrown to abort the shard snapshot. The data node then notifies the master node about the status
change with the same status update request (UpdateIndexShardSnapshotStatusRequest) as in the happy path.
The master node updates the shard state to PAUSED_FOR_NODE_REMOVAL upon receiving the notification.
From this point on, the shard can relocate as normal. When the shard is started on the target node,
SnapshotsService will observe the shard state changes and transition the shard snapshot status back to
INIT.
If a node is already shutting down when a new snapshot creation request arrives, the relevant
shard snapshot will be created with PAUSED_FOR_NODE_REMOVAL as its initial state. This assumes there
is no ongoing shard snapshot that is already PAUSED_FOR_NODE_REMOVAL; if there is, the new shard
snapshot will start out as QUEUED instead.
When the node shutdown completes and its associated shutdown metadata is removed from the cluster state,
SnapshotsService will also remove the node ID from SnapshotsInProgress#nodesIdsForRemoval.
Both completed snapshots and ongoing snapshots can be deleted. A snapshot that has not started yet,
e.g. all its shards are in the QUEUED state, can be deleted right away from the cluster state without
touching the repository content. Otherwise, deletion must run exclusively in a repository.
If the deletion requires any file removal in the repository, SnapshotsService creates/updates the
SnapshotDeletionsInProgress in the cluster state to track the new deletion. If the snapshot is
currently running, it also updates any incomplete shard snapshots to ShardState.ABORTED for the
data node (SnapshotShardsService) to react once the cluster state is published. The data node goes through
a similar process to the shard snapshot pausing but with a different exception (AbortedSnapshotException)
to interrupt the shard snapshot process and sends a request back to the master node to update the
corresponding status (ShardState.FAILED) tracked in the cluster state. Once all shard snapshots
stop, deletion will proceed to remove relevant files from the repository as well as create a new
root blob (index-N) with the same mechanism described in the snapshot creation section.
File deletions (BlobStoreRepository#SnapshotsDeletion) happen entirely on the master node.
TODO: Clone is not used in Elastic Cloud Serverless.
Repository cleanup is cluster-wide exclusive and must run by itself. It does not actually clean up anything more than a regular snapshot deletion does. It was useful in the early days when we had some long-since-fixed leaks that needed cleaning up in ECH. It is not used in Elastic Cloud Serverless.
RestoreService is responsible for handling the initial restore request and preparing the unassigned shards
with snapshot as their recovery source in the cluster state. Once a shard is allocated on a data node, the
recovery process kicks in and eventually calls into IndexShard#restoreFromSnapshot which delegates to
BlobStoreRepository#restoreShard to copy data files from the repository to local storage.
This is a best effort attempt to prevent repository corruption due to concurrent writes from multiple clusters.
When writing the root blob (index-N), we cross compare the cached and expected repository generation
(see BlobStoreRepository#latestKnownRepoGen and BlobStoreRepository#latestKnownRepositoryData)
to the generation physically found in the repository. The writing also fails if the blob already exists
since this indicates that some other cluster attempted to write to the repository. We do that in all repositories
that support such a check, which includes FS/GCS/Azure since forever and S3 since 9.2 (but not HDFS).
A RepositoryException is thrown on any mismatch or conflict. The repository is then marked as corrupted
by setting its generation number to RepositoryData.CORRUPTED_REPO_GEN to block further write operations.
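The generation cross-check amounts to a compare-before-write, sketched below with invented names and the cluster-state plumbing omitted.

```java
class GenerationCheck {
    // Returns the generation to write next, or throws when the expectation
    // does not match what is physically found in the repository.
    static long nextGeneration(long expectedGen, long actualGenInRepo) {
        if (expectedGen != actualGenInRepo) {
            // The real code throws RepositoryException and marks the
            // repository as corrupted (RepositoryData.CORRUPTED_REPO_GEN).
            throw new IllegalStateException("expected generation ["
                + expectedGen + "] but repository is at [" + actualGenInRepo + "]");
        }
        return expectedGen + 1;
    }
}
```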
The tasks infrastructure is used to track currently executing operations in the Elasticsearch cluster. The Task management API provides an interface for querying, cancelling, and monitoring the status of tasks.
Each individual task is local to a node, but can be related to other tasks, on the same node or other nodes, via a parent-child relationship.
> [!NOTE]
> The Task management API is experimental/beta; its status and outstanding issues can be tracked here.
Tasks are tracked in-memory on each node in the node's TaskManager; new tasks are registered via one of the TaskManager#register methods. Registration of a task creates a Task instance with a unique-for-the-node numeric identifier, populates it with some metadata, and stores it in the TaskManager.
The register methods will return the registered Task instance, which can be used to interact with the task. The Task class is often sub-classed to include task-specific data and operations. Specific Task subclasses are created by overriding the createTask method on the TaskAwareRequest passed to the TaskManager#register methods.
When a task is completed, it must be unregistered via TaskManager#unregister.
The IDs given to a task are numeric, supplied by a counter that starts at zero and increments over the life of the node process. So while they are unique in the individual node process, they would collide with IDs allocated after the node restarts, or IDs allocated on other nodes.
To better identify a task in the cluster scope, a tuple of persistent node ID and task ID is used. This is represented in code using the TaskId class and serialized as the string {node-ID}:{local-task-ID} (e.g. oTUltX4IQMOUUVeiohTt8A:124). While TaskId is safe to use to uniquely identify tasks currently running in a cluster, it should be used with caution as it can collide with tasks that have run in the cluster in the past (i.e. tasks that ran prior to a cluster node restart).
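A minimal sketch of that serialized format follows; the real TaskId class also handles transport serialization, which is omitted here.

```java
class TaskIdSketch {
    // Illustrative parser for the {node-ID}:{local-task-ID} wire format.
    record TaskId(String nodeId, long localId) {
        static TaskId parse(String s) {
            int sep = s.lastIndexOf(':');
            return new TaskId(s.substring(0, sep), Long.parseLong(s.substring(sep + 1)));
        }

        @Override
        public String toString() {
            return nodeId + ":" + localId;
        }
    }
}
```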
The purpose of tasks is to provide management and visibility of the cluster workload. There is some overhead involved in tracking a task, so they are best suited to tracking non-trivial and/or long-running operations. For smaller, more trivial operations, visibility is probably better implemented using telemetry APIs.
Some examples of operations that are tracked using tasks include:
All ThreadPool threads have an associated ThreadContext. The ThreadContext contains a map of headers which carry information relevant to the operation currently being executed. For example, a thread spawned to handle a REST request will include the HTTP headers received in that request.
When threads submit work to an ExecutorService from the ThreadPool, those spawned threads will inherit the ThreadContext of the thread that submitted them. When TransportRequests are dispatched, the headers from the sending ThreadContext are included and then loaded into the ThreadContext of the thread handling the request. In these ways, ThreadContext is preserved across threads involved in an operation, both locally and on remote nodes.
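The inheritance mechanism can be sketched with a plain ThreadLocal. The names are invented; the real ThreadContext additionally handles response headers, warnings, and transient state.

```java
import java.util.*;

class ContextPropagation {
    // Stand-in for ThreadContext: a per-thread map of request headers.
    static final ThreadLocal<Map<String, String>> CONTEXT =
        ThreadLocal.withInitial(HashMap::new);

    // Wraps a task so the worker thread inherits the submitter's headers,
    // restoring the worker's own context afterwards.
    static Runnable preserving(Runnable r) {
        Map<String, String> captured = new HashMap<>(CONTEXT.get());
        return () -> {
            Map<String, String> previous = CONTEXT.get();
            CONTEXT.set(captured); // worker sees the submitter's headers
            try {
                r.run();
            } finally {
                CONTEXT.set(previous); // restore on the way out
            }
        };
    }
}
```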
When a task is registered by a thread, a subset (defined by Task#HEADERS_TO_COPY and any ActionPlugins loaded on the node) of the headers from the ThreadContext are copied into the Task's set of headers.
One such header is X-Opaque-Id. This is a string that can be submitted on REST requests, and it will be associated with all tasks created on all nodes in the course of handling that request.
Another way to track the operations of a task is by following the parent/child relationships. When registering a task it can be optionally associated with a parent task. Generally if an executing task initiates sub-tasks, the ID of the executing task will be set as the parent of any spawned tasks (see ParentTaskAssigningClient, TransportService#sendChildRequest and TaskAwareRequest#setParentTask for how this is implemented for TransportActions).
Some long-running tasks are implemented to be cancel-able. Cancellation of a task and its descendants can be done via the Cancel Task REST API or programmatically using TaskManager#cancelTaskAndDescendants. Perhaps the most common use of cancellation you will see is cancellation of TransportActions dispatched from the REST layer when the client disconnects, to facilitate this we use the RestCancellableNodeClient.
In order to support cancellation, the Task instance associated with the task must extend CancellableTask. It is the job of any workload tracked by a CancellableTask to periodically check whether it has been cancelled and, if so, finish early. We generally wait for the result of a cancelled task, so tasks can decide how they complete upon being cancelled, typically it's exceptionally with TaskCancelledException.
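The cooperative-cancellation contract can be sketched as a polling loop. This is a simplification with invented names; real workloads check the CancellableTask itself and typically complete exceptionally with TaskCancelledException.

```java
import java.util.concurrent.atomic.AtomicBoolean;

class CancellableLoop {
    final AtomicBoolean cancelled = new AtomicBoolean();

    // Processes up to totalChunks units of work, polling the cancellation
    // flag between units; returns how many units actually ran.
    int process(int totalChunks) {
        int done = 0;
        for (int i = 0; i < totalChunks; i++) {
            if (cancelled.get()) {
                // A real task would typically complete exceptionally with
                // TaskCancelledException at this point.
                break;
            }
            done++;
        }
        return done;
    }
}
```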
When a Task extends CancellableTask the TaskManager keeps track of it and any child tasks that it spawns. When the task is cancelled, requests are sent to any nodes that have had child tasks submitted to them to ban the starting of any further children of that task, and any cancellable child tasks already running are themselves cancelled (see BanParentRequestHandler).
When a cancellable task dispatches child requests through the TransportService, it registers a proxy response handler that will instruct the remote node to cancel that child and any lingering descendants in the event that it completes exceptionally (see UnregisterChildTransportResponseHandler). A typical use-case for this is when no response is received within the time-out, the sending node will cancel the remote action and complete with a timeout exception.
A list of tasks currently running in a cluster can be requested via the Task management API, or the cat task management API. The former returns each task represented using TaskResult, the latter returning a more compact CAT representation.
Some ActionRequests allow the results of the actions they spawn to be stored upon completion for later retrieval. If ActionRequest#getShouldStoreResult returns true, a TaskResultStoringActionListener will be inserted into the chain of response listeners. TaskResultStoringActionListener serializes the TaskResult of the TransportAction and persists it in the .tasks index using the TaskResultsService.
The Task management API also exposes an endpoint where a task ID can be specified; this form of the API returns currently running tasks, or completed tasks whose results were persisted. Note that although we use TaskResult to return task information from all the JSON APIs, the error or response fields will only ever be populated for stored tasks that are already completed.
Up until now we have discussed only ephemeral tasks. If we want a task to survive node failures, it needs to be registered as a persistent task at the cluster level.
Plugins can register persistent tasks definitions by implementing PersistentTaskPlugin and returning one or more PersistentTasksExecutor instances. These are collated into a PersistentTasksExecutorRegistry which is provided to PersistentTasksNodeService active on each node in the cluster, and a PersistentTasksClusterService active on the master. A PersistentTasksExecutor can declare either project or cluster scope, but not both. A project scope task is not able to access data on a different project.
The PersistentTasksClusterService runs on the master to manage the set of running persistent tasks. It periodically checks that all persistent tasks are assigned to live nodes and handles the creation, completion, removal and updates-to-the-state of persistent task instances in the cluster state (see PersistentTasksCustomMetadata and ClusterPersistentTasksCustomMetadata).
The PersistentTasksNodeService monitors the cluster state to:
If a node leaves the cluster while it has a persistent task allocated to it, the master will re-allocate that task to a surviving node. To do this, it creates a new PersistentTasksCustomMetadata.PersistentTask entry with a higher #allocationId. The allocation ID is included any time the PersistentTasksNodeService communicates with the PersistentTasksClusterService about the task; it allows the PersistentTasksClusterService to ignore persistent task messages originating from stale allocations.
Some examples of the use of persistent tasks include:
Tasks are integrated with the Elasticsearch APM infrastructure. They implement the Traceable interface, and spans are published to represent the execution of each task.
(Brief explanation of the use case for CCR)
(Explain how this works at a high level, and details of any significant components / ideas.)
The Distributed team owns Elasticsearch's write path. A typical write begins as an HTTP/REST request, is routed to the appropriate primary shard, is applied to the shard's engine and translog, and is finally replicated across replicas before an ack is sent back to the client.
Note that in serverless Elasticsearch, there is no replication process needed. The same reliability and fault tolerance are instead achieved by uploading the translog to a blob store and relying on the store's durability and fault tolerance guarantees. See this blog post for more details.
The Distributed team also owns select parts of the read path (e.g. real-time GET requests targeting the
translog), but broader search capabilities like query execution, scoring, and aggregations fall under
the Search team.
This section follows a bulk index request end to end in stateful Elasticsearch, using RestBulkAction as the starting point.
For a higher-level overview of the read and write paths, see Reading and writing documents.
A user sends a POST to /_bulk with a newline-delimited body of index, update, or delete actions
(see documentation).
The HTTP stack (described in HTTP Server) hands the request to RestBulkAction, which
parses
the body into a BulkRequest containing one IndexRequest (or UpdateRequest / DeleteRequest) per requested
operation. It then executes
a TransportBulkAction on the receiving node (see the Transport section for more details).
Single-document requests (PUT or POST to /{index}/_doc/{id}) follow the same path: RestIndexAction
builds an IndexRequest that TransportIndexAction wraps in a one-item BulkRequest before delegating to
TransportBulkAction.
The node that received the request is the coordinating node for this request. On that node, TransportBulkAction coordinates the rest of the write path.
Before routing to shards, TransportAbstractBulkAction (the parent class of TransportBulkAction)
resolves
the ingest pipeline for each item by checking the request's pipeline parameter, the index's default_pipeline, and
the final_pipeline from the matching index template. If any item has a pipeline, the coordinating node either executes
the pipelines locally (if it is an ingest node, see the Node Roles section) or
forwards
the entire request to an ingest node via IngestActionForwarder. Once ingest finishes, each processed request has
its pipeline and finalPipeline fields
reset
to "_none" and the request is
re-submitted
through TransportAbstractBulkAction#applyPipelinesAndDoInternalExecute. On this second pass hasPipeline is false
for every item and the ingest path is skipped.
TransportBulkAction also
checks
whether the target index or data stream exists, auto-creating missing indices via AutoCreateAction and
rolling over
data streams marked for lazy rollover before proceeding.
It then eventually runs a BulkOperation on the coordinator. BulkOperation is the class that resolves each item to
a concrete index and shard before sending work to primaries.
If the client did not supply an _id, one is
generated
before routing. The latest applied ClusterState supplies routing for the chosen index (routingTable,
ShardRouting). A routing key is chosen: by default the document _id, though the request may
set routing explicitly.
The routing key is then hashed to a shard number by IndexRouting.hashToShardId. For time series indices the key is
derived from the document's dimension fields.
For indices with a routing_path configured, it is derived from the configured value.
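A toy stand-in for the hashing step is shown below. The real IndexRouting.hashToShardId uses Murmur3 and accounts for routing_partition_size and index splitting; String.hashCode here is only for illustration.

```java
class RoutingSketch {
    // Reduce a routing key (the _id by default) to a shard number.
    static int shardId(String routingKey, int numberOfShards) {
        // floorMod keeps the result in [0, numberOfShards) even for
        // negative hash values.
        return Math.floorMod(routingKey.hashCode(), numberOfShards);
    }
}
```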
The coordinator now knows which shard holds the document. It still needs the current primary node for that shard.
Note that for a bulk request (multiple documents in one HTTP call), BulkOperation groups items by shard before dispatching each shard’s work to the appropriate primaries.
Elasticsearch uses primary–backup replication. The primary shard copy defines the ordered write history and replicas apply the same operations.
After matching each document to a shard, BulkOperation will hand over the rest of the execution to a TransportShardBulkAction. This is a subclass of TransportReplicationAction, which holds the core replication framework logic for document writes and deletes.
The coordinator uses the cluster state routing table (Cluster State) to send the shard request to the node that currently holds the primary.
In practice, the request can take more than one transport hop before it executes on the primary. The receiving node re-samples cluster state, and if the active primary for that shard is assigned elsewhere, TransportReplicationAction forwards the replication request to that node. That “chase the primary” step can repeat while routing metadata converges. To avoid redirect loops between nodes whose cluster state versions differ, the request carries a routedBasedOnClusterVersion field. Each forward updates it to the forwarding node’s cluster state version so the next receiver is on at least that version before it redirects again. Retries and awaiting cluster state updates are bounded by the request’s timeout. On each retry attempt the node checks whether the timeout has expired, and if so fails the request.
Once the request reaches the node that actually hosts the primary, it will get wrapped in a ConcreteShardRequest that includes the shard’s primary term and target allocation id. That lets the primary and replicas refuse operations that were built for a superseded primary generation.
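The rejection logic amounts to an equality check on the primary term and allocation id, sketched here with invented names.

```java
class PrimaryTermCheck {
    // A shard refuses an operation built for a superseded primary
    // generation: both the primary term and the target allocation id
    // carried by the ConcreteShardRequest must match the shard's own.
    static boolean accept(long shardPrimaryTerm, String shardAllocationId,
                          long requestPrimaryTerm, String requestAllocationId) {
        return requestPrimaryTerm == shardPrimaryTerm
            && requestAllocationId.equals(shardAllocationId);
    }
}
```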
On the primary node, TransportShardBulkAction
applies
the shard's bulk items through IndexShard, the single entry point for shard-level work
(see IndexShard in Engine & Store sections for more details).
The primary will first try to
acquire
an operation permit via IndexShardOperationPermits. Note that recovery and relocation are operations that can
intentionally grab all available permits with the goal of blocking in-flight writes. In that case, the incoming
operation is queued
and will resume once the block lifts. If the permit is successfully acquired, the primary will then validate and
prepare
the operation: create the mapping (if not already existing), parse the source, etc. IndexShard will then hand over the
request to the engine (typically InternalEngine, see Engine section for more details) that
generates
the sequence number for the operation, updates
Lucene via an IndexWriter, and then
appends
the operation to the Translog.
Note that the translog write happens after the Lucene update. The translog is primarily a durability and recovery log for acknowledged operations, not a write-ahead log in the classic database sense. The translog entry type also depends on the Lucene outcome. A successful Lucene apply writes the full operation, while a failure with an already-assigned seq_no writes a no-op to preserve the sequence number history.
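The ordering and the no-op fallback can be sketched with a toy engine. The names are invented and the real InternalEngine logic is far more involved.

```java
import java.util.*;

// Toy engine: the sequence number is assigned and "Lucene" is applied first,
// then the outcome is recorded in the "translog"; a failure with an assigned
// seq_no still writes a no-op so the sequence number history has no gaps.
class EngineOrderingSketch {
    long nextSeqNo = 0;
    final List<String> translog = new ArrayList<>();

    void index(String doc, boolean luceneApplySucceeds) {
        long seqNo = nextSeqNo++; // sequence number assigned up front
        if (luceneApplySucceeds) {
            translog.add("op seq_no=" + seqNo + " doc=" + doc);
        } else {
            translog.add("no-op seq_no=" + seqNo);
        }
    }
}
```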
Note that updates and deletes use soft deletes in Lucene (tombstones and retention policies), not physical deletion, and are therefore quite similar to writes. See Engine & Store and Segment Merges for more details.
After the primary applies the operation, a ReplicationOperation
wrapper object
will coordinate the rest. It
loads
the replication group after the primary write (to avoid missing new recovery targets),
captures the global checkpoint and related metadata from the primary, then
forwards the request
to each replica in that group. Each replica
applies
the same operation (using the same seq_no and primary term) to its engine and returns checkpoint information.
Allocation ids that are still in the in-sync set but no longer have a live shard in routing are
marked stale
on the primary. The master will update the cluster state to drop those allocation ids from the index’s
in-sync set
before the write is acknowledged back to the client. Similarly, if a replica fails to execute the replicated operation,
it will also get dropped from the in-sync set before the write is acknowledged back. The primary will not
acknowledge the replication operation as “done” while the cluster state still reports an in-sync copy that has failed
the operation.
A ref-counting listener
also waits for the primary shard application, each replica response, and post-replication hooks. The client only gets an
ack after that coordinated completion, together with wait_for_active_shards and translog / syncAfterWrite behaviour (see
Translog and TransportWriteAction).
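The coordinated completion described above behaves like a ref-counting listener: the client-visible ack fires only after every participant (primary apply, each replica response, post-replication hooks) has released its reference. A minimal sketch of the idea, not Elasticsearch's actual listener classes:

```python
# Minimal ref-counting completion listener: the overall callback fires
# only once every acquired reference has been released. Illustrative
# model only, not Elasticsearch's RefCountingListener API.

class RefCountingListener:
    def __init__(self, on_done):
        self._refs = 1          # the coordinator holds the initial ref
        self._on_done = on_done

    def acquire(self):
        self._refs += 1
        return self.release     # each participant releases exactly once

    def release(self):
        self._refs -= 1
        if self._refs == 0:
            self._on_done()

done = []
listener = RefCountingListener(lambda: done.append(True))
primary_done = listener.acquire()
replica_done = [listener.acquire() for _ in range(2)]
listener.release()              # coordinator finished fanning out
primary_done()                  # primary apply completed
for release in replica_done:
    release()                   # last replica release triggers the ack
```

A failed replica still releases its reference, but only after the master has applied the corresponding shard-failed cluster state update, which is why the ack cannot race ahead of the in-sync set change.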
```mermaid
sequenceDiagram
    participant C as Client
    participant CN as Coordinating node
    participant P as Primary shard copy
    participant R as Replica shard copy
    participant M as Elected master
    C->>CN: REST index (RestIndexAction → IndexRequest)
    CN->>CN: Bulk coordinator: route to shard
    CN->>P: Shard bulk on primary (TransportShardBulkAction)
    P->>P: IndexShard / engine (Translog + Lucene)
    Note over P: ReplicationOperation: stale in-sync ids, then replica fan-out
    P->>R: Replica request (seq_no, primary_term, checkpoints)
    alt replica applies successfully
        R->>R: Engine apply (Translog + Lucene)
        R-->>P: ReplicaResponse (checkpoints)
    else replica fails replication
        R-->>P: failure
        P->>M: remoteShardFailed (shard-failed -> update in-sync / routing)
        M->>M: Cluster state task (drop failed copy from in-sync set, etc.)
        M-->>P: shard-failed applied (listener completes)
    end
    P-->>CN: Primary result + shard info (after all pending replica / master / post steps)
    CN-->>C: HTTP response (may include shard failures in body)
```
When replication and the ref-counted coordination above have finished, the primary completes the shard-level request and returns the result to the node that issued that shard-level transport request. For a normal REST bulk request, that issuer is the HTTP request coordinating node, which then completes the write action and sends the HTTP response to the client. The same shard-level transport requests can also be generated internally, for example via TriggeredWatchStore#putAll in Watcher. The issuer is then whichever node runs that internal client, and the primary returns the result to that transport caller.
Every successful index, update, or delete operation on a shard is tagged with a sequence number (seq_no) and a primary term.
Together they identify that logical change for replication, recovery, and optimistic concurrency. Sentinel values and
helpers are defined on SequenceNumbers (for example UNASSIGNED_SEQ_NO on the way into the primary, and
UNASSIGNED_PRIMARY_TERM before a shard is operational).
The primary term is a monotonically increasing counter per shard, recorded in cluster state metadata (see IndexMetadata#primaryTerm). It advances when a new primary must take over following a primary failure, e.g. when a replica is promoted to primary because the current primary crashed or became unavailable. Note that the primary term is not increased during graceful primary relocation. The shard stamps all operations with the current term.
The sequence number is another monotonically increasing id for mutations on the shard history. It is
computed
by the primary's InternalEngine before it applies an operation locally. Replicas apply the same seq_no supplied
by the primary. Their engine
records
that sequence number as "seen" before indexing. After the operation, the local checkpoint also gets updated (see the following Checkpoints subsection).
Both values are stored in Lucene for each live document. SeqNoFieldMapper indexes _seq_no as a numeric field (for
search and sort) and stores the primary term in doc values. If two copies
ever share the same _seq_no (for example after a primary promotion), the copy with the higher primary term wins.
Clients can also supply if_seq_no / if_primary_term on write requests (see
IndexRequest#setIfSeqNo),
in which case the engine will use optimistic concurrency control against the last known sequence number.
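The two rules above (conditional writes, and higher primary term winning at equal seq_no) can be sketched as small helpers. The `-2` sentinel matches SequenceNumbers.UNASSIGNED_SEQ_NO; the function names are hypothetical:

```python
# Sketch of the if_seq_no / if_primary_term check and the tie-break rule
# for copies sharing a seq_no. Illustrative only; function names invented.

UNASSIGNED_SEQ_NO = -2  # sentinel meaning "no condition supplied"

def check_version_conflict(current_seq_no, current_term,
                           if_seq_no, if_primary_term):
    """The write proceeds only when the document's current
    (seq_no, primary_term) matches what the client last observed."""
    if if_seq_no == UNASSIGNED_SEQ_NO:
        return True  # unconditional write
    return current_seq_no == if_seq_no and current_term == if_primary_term

def winning_copy(a, b):
    """Between two versions of a doc with the same _seq_no (e.g. after a
    primary promotion), the one with the higher primary term wins."""
    assert a["seq_no"] == b["seq_no"]
    return a if a["primary_term"] > b["primary_term"] else b
```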
```mermaid
flowchart LR
    subgraph prim["Primary shard"]
        MD[IndexMetadata.primaryTerm in cluster state]
        TR[ReplicationTracker operation term]
        LCT[LocalCheckpointTracker next seq_no]
        ENG[InternalEngine assigns seq_no + term]
        MD --> TR
        TR --> ENG
        LCT --> ENG
    end
    ENG --> TX[ConcreteShardRequest / replica transport]
    TX --> REP[Replica engine applies fixed seq_no + term]
```
Each shard copy tracks how far it has applied the shared history of seq_no values using a local checkpoint,
which is the highest sequence number for which that copy has processed every earlier seq_no (inclusive).
The InternalEngine holds a LocalCheckpointTracker
field
that maintains that marker and a separate persisted checkpoint for what is durably on disk.
The primary also tracks a
global checkpoint,
which is the sequence number up to which all in-sync copies have
processed every earlier operation. The ReplicationTracker computes it as the minimum of those per-copy local checkpoints among in-sync shard copies.
The primary updates
the global checkpoint whenever an in-sync copy’s local checkpoint advances, when the primary activates or takes over
after relocation, when a copy becomes in-sync, or when cluster state drops tracked copies.
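The min-of-local-checkpoints computation can be sketched directly (ReplicationTracker does this with far more bookkeeping; the names here are illustrative):

```python
# Sketch: the global checkpoint is the minimum of the local checkpoints
# of the in-sync copies only. Tracked-but-not-in-sync copies (e.g. a
# recovering target) do not hold it back. Names are illustrative.

def compute_global_checkpoint(local_checkpoints, in_sync_ids):
    """local_checkpoints: allocation_id -> local checkpoint."""
    return min(local_checkpoints[aid] for aid in in_sync_ids)

local = {"alloc-p": 42, "alloc-r1": 40, "alloc-r2": 41}
in_sync = {"alloc-p", "alloc-r1", "alloc-r2"}
assert compute_global_checkpoint(local, in_sync) == 40

# A recovering copy that is tracked but not yet in-sync is ignored:
local["alloc-new"] = 10
assert compute_global_checkpoint(local, in_sync) == 40
```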
During operation replication,
replica responses carry
local and global checkpoint fields so the primary can derive
peer progress. After a successful write, TransportReplicationAction will also
trigger
a GlobalCheckpointSyncAction when there are no more operations in-flight (maxSeqNo == globalCheckpoint).
The global checkpoint is piggybacked on every replication operation, but once the last in-flight operation completes,
the checkpoint may advance further with no subsequent operation to carry it to the replicas, so a dedicated sync is needed.
The global checkpoint is the replication-group safety line: it is the highest seq_no known to be processed
on every in-sync copy, so it underpins
retention leases,
translog truncation, soft-delete retention, and peer recovery
coordination.
Because replication fan-out is concurrent, a replica may receive operations with out-of-order seq_no values.
The engine still applies each operation once its prerequisites are satisfied. The LocalCheckpointTracker
records
which sequence numbers have been processed (including pending gaps) so the local checkpoint only advances when
the contiguous prefix is complete.
To save memory, LocalCheckpointTracker tracks which seq_no values are still pending
using sparse bit sets
in chunk-keyed maps.
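The gap-tracking behaviour can be sketched with a toy tracker. The real LocalCheckpointTracker uses chunked bit sets rather than a plain set, but the advancement rule is the same:

```python
# Sketch of local-checkpoint advancement under out-of-order delivery:
# the checkpoint only moves once the contiguous prefix of seq_nos is
# complete. Toy model of LocalCheckpointTracker, not the real class.

class ToyCheckpointTracker:
    def __init__(self):
        self.checkpoint = -1      # highest contiguous seq_no processed
        self._processed = set()   # processed seq_nos above the checkpoint

    def mark_processed(self, seq_no):
        self._processed.add(seq_no)
        # advance over any now-contiguous prefix
        while self.checkpoint + 1 in self._processed:
            self._processed.remove(self.checkpoint + 1)
            self.checkpoint += 1

t = ToyCheckpointTracker()
t.mark_processed(0)
t.mark_processed(2)               # gap at 1: checkpoint stays at 0
assert t.checkpoint == 0
t.mark_processed(1)               # gap filled: checkpoint jumps to 2
assert t.checkpoint == 2
```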
Replicas cache the global checkpoint value the primary advertises and refresh it from ongoing replication traffic (for example, each ConcreteReplicaRequest carries the primary’s global checkpoint for the replica permit path, and each ReplicaResponse returns the replica’s local checkpoint and last synced global checkpoint).
Note that the InternalEngine also tracks the
maxSeqNoOfUpdatesOrDeletes
(MSU) for write path optimization purposes. The Engine
javadoc
goes into details about the LCP < MSU versus MSU <= LCP cases.
The engine also applies an append-only
optimization
for auto-generated _ids. When a document has an auto-generated ID and the operation is not flagged as a retry,
the engine checks that no retry with an equal or higher auto-id timestamp has already landed on the shard. If so, it
can skip the version map lookup entirely and call IndexWriter.addDocument instead of updateDocument. If the check
fails, the engine falls back to updateDocument to guard against creating a duplicate.
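A sketch of that decision, with hypothetical names for the engine's internal state (the real check lives in InternalEngine's indexing plan logic):

```python
# Sketch of the append-only optimization for auto-generated ids: if no
# retry with an equal-or-higher timestamp has been seen, the document
# can be appended without a duplicate check. Names are illustrative.

def choose_index_plan(auto_generated_id, is_retry, op_timestamp,
                      max_unsafe_auto_id_timestamp):
    if auto_generated_id and not is_retry \
            and op_timestamp > max_unsafe_auto_id_timestamp:
        return "addDocument"      # plain append, no version-map lookup
    return "updateDocument"       # safe path, guards against duplicates

assert choose_index_plan(True, False, 100, 50) == "addDocument"
assert choose_index_plan(True, True, 100, 50) == "updateDocument"   # retry
assert choose_index_plan(True, False, 40, 50) == "updateDocument"   # unsafe
```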
These APIs are used to create a new index that contains a copy of data from a provided index and differs in number of shards and/or index settings. They can only be executed against source indices that are marked read-only.
The shrink API (/{index}/_shrink/{target}) creates an index that has fewer shards than the original index.
The split API (/{index}/_split/{target}) creates an index that has more shards than the original index.
The clone API (/{index}/_clone/{target}) creates an index that has the same number of shards as the original index but may have different index settings.
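The three APIs imply simple shard-count constraints relative to the source index: shrink requires a divisor of the source count, split a multiple, and clone the same count. A sketch of that validation, with a hypothetical helper name:

```python
# Sketch of the shard-count constraints for the resize APIs. The helper
# name is invented; the real validation lives in the resize machinery.

def validate_resize(resize_type, source_shards, target_shards):
    if resize_type == "shrink":
        return source_shards % target_shards == 0
    if resize_type == "split":
        return target_shards % source_shards == 0
    if resize_type == "clone":
        return target_shards == source_shards
    raise ValueError(f"unknown resize type: {resize_type}")

assert validate_resize("shrink", 8, 2) is True   # 2 divides 8
assert validate_resize("shrink", 8, 3) is False  # 3 does not divide 8
assert validate_resize("split", 3, 9) is True    # 9 is a multiple of 3
assert validate_resize("clone", 5, 5) is True
```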
The main implementation logic is centralized in TransportResizeAction; however, it only creates the new index in the cluster state, using special recoverFrom and resizeType parameters. The entire workflow involves multiple components.
The high-level structure is the following:
TransportResizeAction creates index metadata for the new index that contains information about the resize performed. It also creates a routing table for the new index which assigns a LocalShardsRecoverySource.INSTANCE recovery source to its primary shards, based on the resize information in the index metadata. The ResizeAllocationDecider then constrains where the new primaries can be allocated so that they can recover from local source shards. Note that this doesn't work for the shrink case, since during a shrink multiple source shards are "merged" together and those source shards may already be on different nodes. Shard movement in this case needs to be performed manually.

The Health API (GET /_health_report) provides a structured health report for the cluster. It is indicator-based: each health indicator evaluates a specific aspect of cluster health (for example shard allocation or disk) and returns a status (GREEN, YELLOW, RED, or UNKNOWN) plus structured details, impacts, and diagnoses. The top-level status is derived from the worst indicator status.
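The worst-indicator derivation can be sketched as a fold over indicator statuses. The severity ordering used here (GREEN < UNKNOWN < YELLOW < RED) is this sketch's assumption:

```python
# Sketch of "the top-level status is the worst indicator status".
# The severity ordering is an assumption of this toy model.

SEVERITY = {"GREEN": 0, "UNKNOWN": 1, "YELLOW": 2, "RED": 3}

def overall_status(indicator_statuses):
    """indicator_statuses: indicator name -> status string."""
    return max(indicator_statuses.values(), key=SEVERITY.__getitem__)

assert overall_status({"shards_availability": "YELLOW",
                       "disk": "GREEN",
                       "master_is_stable": "GREEN"}) == "YELLOW"
```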
A health node is selected by the master node via a persistent task. The health node maintains a cache of per-node health information. Each node periodically publishes its local health info to the health node cache. That cached health info is then used as input to health indicators when evaluating the overall cluster health. If the health node fails or leaves the cluster, the master node selects a new health node.
Publish: LocalHealthMonitor (local node) -> UpdateHealthInfoCacheAction -> HealthInfoCache (health node)
Retrieve: Health API (coordinating node) -> FetchHealthInfoCacheAction -> HealthInfoCache (health node)
Relevant classes:
There are two kinds of health indicators: preflight and regular. Preflight indicators run first; if any preflight indicator is not GREEN, regular indicators return UNKNOWN to avoid misleading results on an unstable cluster. Currently, the only preflight indicator is StableMasterHealthIndicatorService, which checks whether the cluster has a stable master node.
Relevant classes:
Cluster health status can be logged periodically by HealthPeriodicLogger, which runs on the health node and calls HealthService at a configured interval. It enables alerting on the health status.
Some health configuration is stored in the cluster state HealthMetadata. It stores user-configurable health settings, such as disk watermark thresholds. The current master node publishes its settings to the cluster state so the same settings are used across nodes in the cluster.
Relevant classes: HealthMetadataService (runs on the master node and publishes health metadata to the cluster state).
Watcher lets you set a schedule to run a query, and if a condition is met it executes an action. As an example, the following performs a search every 10 minutes. If the number of hits found is greater than 0 then it logs an error message.
```
PUT _watcher/watch/log_error_watch
{
  "trigger" : { "schedule" : { "interval" : "10m" }},
  "input" : {
    "search" : {
      "request" : {
        "indices" : [ "logs" ],
        "body" : {
          "query" : {
            "match" : { "message": "error" }
          }
        }
      }
    }
  },
  "condition" : {
    "compare" : { "ctx.payload.hits.total" : { "gt" : 0 }}
  },
  "actions" : {
    "log_error" : {
      "logging" : {
        "text" : "Found {{ctx.payload.hits.total}} errors in the logs"
      }
    }
  }
}
```
Watcher keeps its state in internal indices: watch definitions live in the .watches index, in-flight executions in the .triggered_watches index, and execution records in the .watcher_history index. A watch is executed on the node that holds the .watches shard that the particular watch is in (see WatcherService).
Execution results are written to .watcher_history. When an action targets a remote system, .watcher_history only records what Elasticsearch observed, so the best way to continue the investigation is in the logs of the remote system.