Distributed Area Internals

The Distributed Area contains indexing and coordination systems.

The index path stretches from the user REST command through shard routing down to each individual shard's translog and storage engine. Reindexing is effectively reading from a source index and writing to a destination index (perhaps on different nodes). The coordination side includes cluster coordination, shard allocation, cluster autoscaling stats, task management, and cross-cluster replication. Less obvious coordination systems include networking, the discovery plugin system, the snapshot/restore logic, and shard recovery.

A guide to the general Elasticsearch components can be found here.

Networking

Every Elasticsearch node maintains various networking clients and servers, protocols, and synchronous/asynchronous handling. Our public docs cover user-facing settings and some internal aspects: see Network Settings.

HTTP Server

The HTTP Server is a single entry point for all external clients (excluding cross-cluster communication). Management, ingestion, search, and all other external operations pass through the HTTP server.

Elasticsearch works over HTTP/1.1 and supports features such as TLS, chunked transfer encoding, content compression, and pipelining. While it aims to be HTTP spec compliant, Elasticsearch is not a web server. ES supports GET requests with a payload (though some old proxies may drop the content) and POST for clients unable to send a GET with a body. Requests cannot be cached by middle boxes.

There is no connection limit, but there is a limit on payload size. The default maximum payload is 100MB after compression. This is a very large limit, and clients should almost never come close to it. See the HttpTransportSettings class.

Security features, including basic security (authentication (authc) and authorization (authz)) and Transport Layer Security (TLS), are available in the free tier and implemented in separate x-pack modules.

The HTTP server provides two options for content processing: full aggregation and incremental processing. Aggregated content is preferable for small messages in formats that cannot be parsed incrementally (e.g., JSON). Aggregation has drawbacks: it requires more memory, which is reserved until all bytes are received, and concurrent incomplete requests can lead to unbounded memory growth and potential OOMs. Large delimited content, such as bulk indexing, can instead be processed in byte chunks, which provides better control over memory usage but is more complicated for application code.

Incremental bulk indexing includes a back-pressure feature. See org.elasticsearch.index.IndexingPressure. When memory pressure grows high (LOW_WATERMARK), reading bytes from TCP sockets is paused for some connections, allowing only a few to proceed until the pressure is resolved. When memory grows too high (HIGH_WATERMARK), bulk items are rejected with a 429 status code. This mechanism protects against unbounded memory usage and OutOfMemory errors (OOMs).
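The watermark mechanism can be pictured as a counter of in-flight bytes checked on every incoming chunk. The following is a minimal sketch of that idea only, not the real implementation (which lives in org.elasticsearch.index.IndexingPressure and the HTTP layer); the class name and threshold values here are made up:

java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch of watermark-based back-pressure.
class BackPressureSketch {
    static final long LOW_WATERMARK = 8 * 1024 * 1024;   // assumed value
    static final long HIGH_WATERMARK = 16 * 1024 * 1024; // assumed value
    final AtomicLong inFlightBytes = new AtomicLong();

    enum Decision { PROCEED, PAUSE_READS, REJECT_429 }

    /** Called for each incoming chunk of a bulk request. */
    Decision onChunk(int chunkSizeInBytes) {
        long total = inFlightBytes.addAndGet(chunkSizeInBytes);
        if (total >= HIGH_WATERMARK) {
            return Decision.REJECT_429;   // shed load: reject bulk items with 429
        }
        if (total >= LOW_WATERMARK) {
            return Decision.PAUSE_READS;  // stop reading from some TCP sockets
        }
        return Decision.PROCEED;
    }

    /** Called once a chunk has been processed and its memory released. */
    void onChunkReleased(int chunkSizeInBytes) {
        inFlightBytes.addAndGet(-chunkSizeInBytes);
    }
}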

ES supports multiple Content-Types for the payload. These are implementations of the MediaType interface. A common implementation is XContentType, covering CBOR, JSON, SMILE, YAML, and their versioned variants. X-pack extensions include PLAIN_TEXT, CSV, etc. Classes that implement ToXContent and friends can be serialized and sent over HTTP.

HTTP routing is based on a combination of Method and URI. For example, the RestCreateIndexAction handler uses ("PUT", "/{index}"), where curly braces indicate path variables. RestBulkAction specifies a list of routes:

java
@Override
public List<Route> routes() {
    return List.of(
        new Route(POST, "/_bulk"),
        new Route(PUT, "/_bulk"),
        new Route(POST, "/{index}/_bulk"),
        new Route(PUT, "/{index}/_bulk")
    );
}

Every REST handler must be declared in the ActionModule class in the initRestHandlers method. Plugins implementing ActionPlugin can extend the list of handlers via the getRestHandlers override. Every REST handler should extend BaseRestHandler.

The REST handler’s job is to parse and validate the HTTP request and construct a typed version of the request, often a Transport request. When security is enabled, the HTTP layer handles authentication (based on headers), and the Transport layer handles authorization.
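As an illustration, a handler typically looks like the following sketch. It follows the BaseRestHandler contract described above, but imports are omitted and DemoRequest, DemoAction, and the route are hypothetical:

java
public class RestDemoAction extends BaseRestHandler {

    @Override
    public String getName() {
        return "demo_action";
    }

    @Override
    public List<Route> routes() {
        return List.of(new Route(GET, "/_demo/{name}"));
    }

    @Override
    protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) {
        // Parse and validate the HTTP request into a typed Transport request...
        DemoRequest demoRequest = new DemoRequest(request.param("name"));
        // ...then hand it off to the Transport layer and stream the response back.
        return channel -> client.execute(DemoAction.INSTANCE, demoRequest,
            new RestToXContentListener<>(channel));
    }
}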

Request handling flow from Java classes view goes as:

(if security enabled) `Security.getHttpServerTransportWithHeadersValidator`
-> `Netty4HttpServerTransport`
-> `AbstractHttpServerTransport`
-> `RestController`
-> `BaseRestHandler`
-> `Rest{Some}Action`

Netty4HttpServerTransport is the sole implementation of AbstractHttpServerTransport, found in the transport-netty4 module. The x-pack/security module injects TLS and a headers validator.

Transport

Transport is the term for node-to-node communication, utilizing a TCP-based custom binary protocol. Every node acts as both a client and a server. Node-to-node communication almost never uses HTTP transport (except for reindex-from-remote).

Netty4Transport is the sole implementation of TCP transport, initializing both the Transport client and server. The x-pack/security plugin provides a secure version: SecurityNetty4Transport (with TLS and authentication).

A Connection between nodes is a pool of Channels, where each channel is a non-blocking TCP connection (Java NIO terminology). Once a cluster is discovered, a Connection (pool of Channels) is opened to every other node, and every other node opens a Connection back. This results in two Connections between any two nodes (A→B and B→A). A node sends requests only on the Connection it opens (acting as a client). The default pool is around 13 Channels, divided into sub-pools for different purposes (e.g., ping, node-state, bulks). The pool structure is defined in the ConnectionProfile class.

ES never behaves incorrectly (e.g. loses data) in the face of network outages, but it may become unavailable if the network is not stable. Network stability between nodes is assumed, though connectivity issues remain a constant challenge.

Request timeouts are discouraged, as Transport requests are guaranteed to eventually receive a response, even without a timeout. SO_KEEPALIVE helps detect and tear down dead connections. When a connection closes with an error, the entire pool is closed, outstanding requests fail, and the pool is reconnected.

There are no retries on the Transport layer itself. The application layer decides when and how to retry (e.g., via RetryableAction or TransportMasterNodeAction). In the future Transport framework might support retries #95100.

Transport can multiplex requests and responses in a single Channel, but cannot multiplex parts of messages. Each transport message must be fully dispatched before the next can be sent. Proper application-layer sizing/chunking of messages is recommended to ensure fairness of delivery across multiple senders. A Transport message cannot be larger than 30% of the heap (org.elasticsearch.transport.TcpTransport#THIRTY_PER_HEAP_SIZE) or 2GB (due to org.elasticsearch.transport.Header#networkMessageSize being an int).

The TransportMessage family tree includes various types (node-to-node, broadcast, master node acknowledged) to ensure correct dispatch and response handling, for example when a message must be accepted on all nodes.

Other networking stacks

Snapshotting to remote repositories involves different networking clients and SDKs. For example, the AWS SDK ships with an Apache or Netty HTTP client, the Azure SDK uses Netty-based Project Reactor, and GCP uses the default Java HTTP client. Underlying clients may be reused between repositories, with varying levels of control over networking settings.

Other features also use HTTP clients, such as SAML/JWT metadata reloading, the Watcher HTTP action, reindex, and ML features such as inference.

Sync/Async IO and threading

ES handles a mix of I/O operations (disk, HTTP server, Transport client/server, repositories), resulting in a combination of synchronous and asynchronous styles. Asynchronous IO utilizes a small set of threads running small tasks, minimizing context switches. Synchronous IO uses many threads and relies on the OS scheduler. ES typically runs with 100+ threads, where async and sync threads compete for resources.

Netty

Netty is a networking framework/toolkit used extensively for HTTP and Transport networks, providing foundational building blocks for networking applications.

Event-Loop (Transport-Thread)

Netty is an async IO framework: it runs with a few threads. An event-loop is a thread that processes events for one or many Channels (TCP connections). Every Channel has exactly one, unchanging event-loop, eliminating the need to synchronize events within that Channel. A single, CPU-bound Transport ThreadPool (e.g., 4 threads for 4 cores) serves all HTTP and Transport servers and clients, handling potentially hundreds or thousands of connections.

Because event-loop threads each serve many connections, it is critical not to block them for long. Fork any blocking operation or heavy computation to another thread pool. Forking, however, comes with overhead: do not fork simple requests that can be served from memory and do not require heavy computation (milliseconds).
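A sketch of this dispatch decision, assuming Elasticsearch's ActionListener, ThreadPool, and ActionRunnable utilities; DemoRequest/DemoResponse and the helper methods are hypothetical:

java
// Runs on the event-loop (transport) thread.
void handle(DemoRequest request, ActionListener<DemoResponse> listener, ThreadPool threadPool) {
    if (request.isCheap()) {
        // Served from memory in well under a millisecond: answer inline,
        // avoiding the overhead of a fork and context switch.
        listener.onResponse(cheapResponse(request));
    } else {
        // Blocking I/O or heavy computation: fork to another pool so the
        // event loop stays responsive for its other connections.
        threadPool.generic().execute(ActionRunnable.supply(listener, () -> expensiveResponse(request)));
    }
}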

Transport threads are monitored by ThreadWatchdog. A warning log appears if a single task runs longer than 5 seconds. Slowness can be caused by blocking, GC pauses, or CPU starvation from other thread pools.

ByteBuf - byte buffers and reference counting

Netty's controlled memory allocation provides a performance edge by managing and reusing byte buffer pools (e.g., pools of 1MiB chunks sliced into 16KiB pages). Some pages may sit unused while still taking up heap space, and they show up in heap dumps.

Netty reads socket bytes into direct buffers, and ES copies them into pooled byte-buffers (CopyBytesSocketChannel). The application is responsible for retaining (increasing ref-count) and releasing (decreasing ref-count) for pooled buffers.

Reference counting introduces two primary problems:

  1. Use after release (free): Accessing a buffer after it has been explicitly released.
  2. Never release (leak): Failing to release a buffer, leading to memory leaks.

The compiler does not help detect these issues. They require careful testing using Netty's ResourceLeakDetector at its paranoid level, which is enabled by default in all tests.
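The required discipline looks like the following sketch (process() is a placeholder): retain before handing a buffer off, release in a finally block, and never touch the buffer afterwards:

java
import io.netty.buffer.ByteBuf;
import io.netty.util.ReferenceCountUtil;

void dispatch(ByteBuf buffer) {
    buffer.retain(); // ref-count +1: this component now co-owns the buffer
    try {
        process(buffer);
    } finally {
        ReferenceCountUtil.release(buffer); // ref-count -1; pooled memory is reclaimed at zero
    }
    // Using `buffer` past this point would be a use-after-release bug.
}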

Async methods return futures

Every asynchronous operation in Netty returns a future. It is easy to forget to check the result, because the call itself always appears to succeed:

java
ctx.write(message)

Check the result of an async operation:

java
ctx.write(message).addListener(future -> {
    if (future.isSuccess() == false) {
        // handle the write failure
    }
});

ThreadPool

(We have many thread pools, what and why)

ActionListener

See the Javadocs for ActionListener

(TODO: add useful starter references and explanations for a range of Listener classes. Reference the Netty section.)

Chunk Encoding

XContent

Performance

(long-running actions should be forked off the Netty thread; operations kept on it should be short to avoid forking costs)

Work Queues

RestClient

The RestClient is primarily used in testing, to send requests to cluster nodes in the same format users would. There are some uses of RestClient, via RestClientBuilder, in production code. For example, remote reindex uses the RestClient internally as the REST client to the remote Elasticsearch cluster, taking advantage of the compatibility of RestClient requests with much older Elasticsearch versions. The RestClient is also used externally by the Java API Client to communicate with Elasticsearch.

Cluster Coordination

Node Roles

Every Elasticsearch node in a cluster is assigned a set of roles that define the kind of responsibilities it can own. Roles are represented internally by the DiscoveryNodeRole class. Possible values are master, data, data_content, data_hot, data_warm, data_cold, data_frozen, ingest, voting_only, remote_cluster_client, ml, transform, index and search.

Roles are configured via the node.roles config field in elasticsearch.yml. If node.roles is not explicitly set, the node starts with all default-enabled roles. If node.roles is explicitly set, only the listed roles are enabled.

A node with the master role is master-eligible: it can participate in master elections and can be elected as the cluster's master node.

A node with the data role (relevant for stateful ES only) can host shards and perform data-related operations such as CRUD, search, and aggregations.

Data-tier roles (relevant for stateful ES only) allow users to move indices to specific hardware specs as data grows older. This is managed by index lifecycle management policies. The available data tier roles are:

  • data_content for long-lived indices that can be queried and updated frequently, no matter how old they are.
  • data_hot, which holds the most recent, and frequently queried, time-series data.
  • data_warm, which typically holds less recent time-series data, that will not be read as often as the hot tier and rarely needs to be updated.
  • data_cold, for infrequently accessed time-series data, sometimes in the form of searchable snapshots.
  • data_frozen, which holds rarely accessed data stored entirely as searchable snapshots.

A node with the ingest role can execute ingest pipelines.

A node with the ml role can run machine learning jobs and trained model inference. Typically, those are memory and CPU-intensive workloads so having dedicated nodes with this role avoids contention with indexing and searching.

A node with the remote_cluster_client role can establish outbound connections to remote clusters for cross-cluster search (CCS) and cross-cluster replication (CCR).

A node with the transform role can execute transform jobs.

A node with the voting_only role participates in master elections and cluster state publication quorum, but can never be elected as master itself. The voting_only role must always be combined with the master role. Voting-only nodes are useful for providing an additional vote for quorum without requiring the node to handle the full master workload (tiebreaker nodes).

A node configured with an empty role list (node.roles: []) has no specific role and acts as a coordinating-only node. Such nodes can receive client requests, route them to the appropriate data or master nodes, and aggregate results. They do not hold data, run ingest pipelines, or participate in master elections.

Stateless Roles

In stateless deployments, data is persisted in shared object storage, and nodes' disks are used only as a cache layer. The data (and data tier) roles are disabled in favor of two new roles:

  • index: indexing nodes, which host primary shards and handle all write operations.
  • search: search nodes, which host search-only replica shards and handle read operations.

Master Node Role

Master-eligible (master role) nodes form the cluster's voting configuration and participate in master elections.

The elected master node is responsible for cluster-wide coordination: it processes all cluster state updates and publishes them to the rest of the cluster. The cluster maintains (conceptually, see term section) at most a single master at all times; if the master fails, a new one is elected from the remaining master-eligible nodes (excluding voting_only nodes, which can vote but cannot be elected).

The next few sections explore the cluster state and the master election process in more detail.

Cluster State

The ClusterState is the portion of the cluster-wide state that is held in memory by every node. To ensure correctness, updates to the ClusterState require strong consistency (linearizability). Without strong consistency, nodes could observe conflicting cluster states (e.g. two nodes could believe they both hold the primary for the same shard, or apply different mappings to the same index), leading to data loss or corruption.

The ClusterState is (conceptually) immutable: every update produces a new instance of the ClusterState class. The elected master is responsible for coordinating all cluster state updates and publishing the latest to the other nodes in the cluster. Note that ClusterState updates are expensive, often taking hundreds of milliseconds or more, so they should only be executed when absolutely necessary.

The Metadata part of the ClusterState is persisted to disk via the PersistedClusterStateService and will survive restarts. The rest of the ClusterState components are in-memory only, and need to be rebuilt every time there is a full cluster restart.

Persisted State

The Metadata of a ClusterState is persisted on disk and comprises information from two categories:

  1. Cluster-scoped information

A few standouts are:

  • clusterUUID: the cluster unique id.
  • coordinationMetadata: the current term, voting configurations ... etc. (see Master Elections section).
  • persistentSettings: cluster-level settings applied via the cluster settings API.
  • customs: cluster-level custom metadata: NodesShutdownMetadata, RepositoriesMetadata .. etc.
  2. Project-scoped information (located in the ProjectMetadata)

Notable components of the ProjectMetadata include:

  • id: the project unique id.
  • indices: map of index name to IndexMetadata (settings, mappings, aliases, number of shards/replicas, etc.). Contains all indices in this project.
  • aliasedIndices and templates. See aliases and templates for more details.
  • customs: project-level custom metadata. Includes data stream definitions (via DataStreamMetadata project custom), index lifecycle metadata (IndexLifecycleMetadata) and others.

Note that some concepts are applicable to both cluster and project scopes, e.g. persistent tasks.

Ephemeral State

The rest of the ClusterState is ephemeral, it is not persisted to disk and is rebuilt from scratch if the cluster restarts. Some key components are:

  • nodes (DiscoveryNodes)

The set of all nodes currently in the cluster. Tracks the masterNodeId, the localNodeId and categorizes nodes by role (master-eligible, data and ingest). Also records the minimum and maximum node versions and supported index versions across the cluster.

  • routingTable

Maps each project to its RoutingTable, which itself maps each index to an IndexRoutingTable containing an IndexShardRoutingTable per shard. Each IndexShardRoutingTable lists the ShardRouting entries for a shard, detailing which node owns it, its state (UNASSIGNED, INITIALIZING, STARTED or RELOCATING), and whether it is a primary or replica.

  • blocks (ClusterBlocks)

Cluster-level, project-level and index-level blocks that restrict certain operations for the relevant scope. For example, an index can be blocked for writes during a close operation, or the entire cluster can be set to read-only. Blocks operate at five levels: READ, WRITE, METADATA_READ, METADATA_WRITE, and REFRESH.

  • customs

Additional custom ephemeral state. Examples include SnapshotsInProgress, RestoreInProgress, SnapshotDeletionsInProgress, and HealthMetadata.

  • compatibility versions

Per-node and cluster-wide minimum TransportVersion and system index mapping versions, used to figure out serialization compatibility across the cluster.

  • cluster features

Info on what features are present throughout the cluster.

  • Cluster state version fields

The version field is a monotonically increasing long that uniquely identifies committed states. It is technically persisted to disk by PersistedClusterStateService, which stores it as last_accepted_version in the Lucene commit user data alongside the cluster metadata. On node startup, this version is loaded back and used to initialize the in-memory cluster state, so the monotonically-increasing property is preserved across restarts. The stateUUID field uniquely identifies every state, including uncommitted ones. It is not persisted.

Master Service

The MasterService is the single-threaded coordinator for all cluster state updates on the master node. It organizes pending cluster state update tasks into Priority-based queues: IMMEDIATE, URGENT, HIGH, NORMAL, LOW, and LANGUID. It processes them in that order, highest priority first.

The MasterService uses a batching framework that groups multiple cluster state update tasks together, processing them as a single batch and publishing only one resulting ClusterState update. This avoids triggering a distinct publication for each individual task, which would be very costly.

Producers of cluster state update tasks, such as SnapshotsService, can then define their own task queues, priority and batch executors (ClusterStateTaskExecutor), which the MasterService uses to group and process related tasks together.

A queue processor, backed by a single-threaded java.util.concurrent.ExecutorService, processes batches one at a time. The executeAndPublishBatch method takes the next batch, invokes the batch's ClusterStateTaskExecutor to compute a new ClusterState, and publishes the result.

If the new state is identical to the previous one (by reference), no publication takes place. Otherwise, the master assigns a new version and stateUUID to the state and proceeds to publication.
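The resulting control flow can be sketched as follows. This is a simplification of the org.elasticsearch.cluster.ClusterStateTaskExecutor contract, which also carries per-task listeners and failure handling:

java
import java.util.List;

interface Task {
    ClusterState apply(ClusterState current); // each task builds a new immutable state
}

static ClusterState executeBatch(ClusterState current, List<Task> batch) {
    ClusterState state = current;
    for (Task task : batch) {
        state = task.apply(state);
    }
    // If no task changed anything, `state == current` and publication is skipped;
    // otherwise the master assigns a new version/stateUUID and publishes once
    // for the whole batch.
    return state;
}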

Cluster State Publication

mermaid
sequenceDiagram
    participant M as Master Node
    participant F as Follower Node

    rect rgb(255, 248, 240)
    Note over M,F: Phase 1 — Publish
    M->>F: PublishRequest (diff or full state)
    M->>M: Local PublishRequest/Response
    Note right of F: PublicationTransportHandler<br/>::handleIncomingPublishRequest<br/>Deserializes state
    Note right of F: CoordinationState<br/>::handlePublishRequest<br/>Validates term/version,<br/>persists as lastAcceptedState
    F->>M: PublishResponse (term, version)
    Note left of M: CoordinationState<br/>::handlePublishResponse<br/>Collects responses until quorum
    end

    rect rgb(255, 220, 220)
    alt Quorum not reached
        Note over M: FailedToCommitClusterStateException<br/>Master steps down, new election begins
    end
    end

    rect rgb(255, 248, 240)
    Note over M,F: Phase 2 — Commit
    M->>F: ApplyCommitRequest
    M->>M: Local ApplyCommitRequest/Response
    Note right of F: Coordinator::handleApplyCommit<br/>Marks last accepted state as committed
    Note right of F: ClusterApplierService<br/>::onNewClusterState<br/>Applies new ClusterState
    F->>M: ACK (after application completes)
    Note left of M: ClusterApplierService<br/>::onNewClusterState<br/>Master applies its own state
    end

Once the MasterService has computed a new ClusterState, it passes it to the Coordinator, which is responsible for sharing it with all nodes in the cluster. This publication process follows a two-step commit protocol. The progress of the publication for each follower node is tracked via a PublicationTarget object, which maintains a PublicationTargetState.

The Coordinator creates a PublicationContext via the PublicationTransportHandler, which pre-serializes the cluster state into shared, reference-counted byte buffers. A ClusterStateDiff against the previous state is prepared. A full serialization is also prepared when the previous state's blocks had the disable-state-persistence flag set, or when a node present in the new state was absent from the previous one (since that node will not have a previous state to apply a diff against). It then builds a PublishRequest and creates a CoordinatorPublication to coordinate the protocol.

  1. Publish

The master then sends the serialized cluster state to every node in the cluster. As an optimization, the diff is sent to nodes that were already part of the cluster, while new nodes receive the full state directly. If a node responds with IncompatibleClusterStateVersionException the PublicationTransportHandler then retries with the full state.

The recipient nodes verify the received state is valid, compare it against their local state via CoordinationState.handlePublishRequest and send back a PublishResponse containing the new cluster state term and version. At that point, the recipient nodes have persisted the new state but have not yet applied it.

  2. Commit

Once the master has collected enough PublishResponses from master-eligible nodes to satisfy quorum (see Quorum section), it creates an ApplyCommitRequest. The master then sends this commit message to all nodes that responded to the PublishRequest.

When receiving this request, each node marks this last accepted state as committed and proceeds to apply the new state (see Cluster State Application section). The node only ACKs the ApplyCommitRequest back to the master after application completes (i.e. after ClusterApplierService.onNewClusterState finishes).

If the master cannot achieve a PublishResponse quorum (e.g. too many nodes are faulty), the publication fails with a FailedToCommitClusterStateException. The master will then step down (it can no longer be master if it is not able to publish cluster states) and a new election begins.

Once the ApplyCommitRequests have been sent, the master will try waiting for all nodes to apply the new state before applying its own state and moving on. No quorum of responses is needed at that point. If follower nodes time out, the master will still move on to applying the new cluster state locally.

Cluster State Application

When a new ClusterState is committed, the follower nodes need to apply the committed state locally. This is the responsibility of the ClusterApplierService class, which runs on a single dedicated thread.

When receiving an ApplyCommitRequest from the master, the Coordinator will hand over the committed state to the ClusterApplierService, which will submit an UpdateTask to the executor. If the new state differs from the currently applied state (by reference), the service applies the changes through the following sequence of steps:

  1. A ClusterChangedEvent is created, holding the new state, the previous state, and their deltas.

  2. The node opens network connections to newly added nodes (blocking call).

  3. If needed, the updated cluster settings are applied.

  4. ClusterStateAppliers (which update the node's internal state) are invoked in priority order. Example high-priority appliers are IndicesClusterStateService, which creates, deletes and updates local shard copies, and RepositoriesService, which manages snapshot repository lifecycle.

  5. Transport connections to nodes that are no longer in the cluster are closed (non-blocking call).

  6. ClusterApplierService::state is updated to the new state, making it visible to the rest of the classes on the node.

  7. ClusterStateListeners are notified. Unlike appliers, listeners run after the state is visible. Examples include the PersistentTasksNodeService which reacts to changes in persistent tasks, and the SnapshotShardsService which watches for snapshot-related shard assignments.

Once application completes, the Coordinator's callback onClusterStateApplied closes the election scheduler (if active) and stops peer finding, since the node is now part of a cluster with a functioning master.

Persistence

As mentioned in previous sections, only the Metadata portion of the ClusterState is persisted to disk (see Persisted State). The rest of the ClusterState (routing table, node membership, in-progress snapshots, etc.) is ephemeral and rebuilt from scratch after each full cluster restart.

The PersistedClusterStateService is responsible for storing the cluster metadata in a simple Lucene index in each of the node's data paths, under the _state directory. The metadata is split across several documents and is written incrementally where possible.

The PersistedClusterStateService uses 3 types of documents, each identified by a type string field: global, index and mapping. Each document type stores its content in the data field as compressed SMILE-encoded XContent. All documents are paged (default 1MB per page).

The global document serializes almost all of the Metadata object: coordination metadata, persistent settings, customs, and all project-scoped data, except individual index metadata and mappings. The index documents (one document per index, keyed by index_uuid) store all the IndexMetadata: settings, aliases, number of shards, etc., except for the index mapping. The mapping documents (one document per mapping, keyed by mapping_hash) store index mappings. Storing mappings separately is a deduplication optimization: multiple indices that share the same mapping (a common occurrence, e.g. in data streams) all reference the same mapping hash, so only one copy is stored.
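The deduplication can be illustrated with a simple hash-keyed map. This is a self-contained sketch with hypothetical names; the real documents live in the Lucene-based persisted state described above:

java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.util.HashMap;
import java.util.Map;

class MappingDedupSketch {
    private final Map<String, String> mappingsByHash = new HashMap<>();

    /** Stores the mapping if it is new and returns the hash an index document records. */
    String store(String mappingJson) throws Exception {
        String hash = sha256Hex(mappingJson);
        mappingsByHash.putIfAbsent(hash, mappingJson); // shared mappings are stored once
        return hash; // index documents reference the mapping only by this hash
    }

    private static String sha256Hex(String s) throws Exception {
        byte[] digest = MessageDigest.getInstance("SHA-256").digest(s.getBytes(StandardCharsets.UTF_8));
        StringBuilder sb = new StringBuilder();
        for (byte b : digest) {
            sb.append(String.format("%02x", b));
        }
        return sb.toString();
    }
}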

During incremental writes, the PersistedClusterStateService will first add new mappings and delete obsolete ones. It will then compare the index metadata for each index UUID between the old and new state. If the version changed, the old document is deleted and a fresh one is written. Unchanged indices are skipped entirely. The global document update is not incremental. If the new state's global fields differ from the old ones, the global document is deleted and rewritten entirely.

Each Lucene commit also records the current term, the last accepted cluster state version, the node ID, the node version, the oldest index version (used for compatibility checks on node startup) and the cluster UUID in the commit user data.

The coordination layer interacts with the recorded state on disk through the PersistedState interface, which the CoordinationState calls at three key points during the publication protocol:

  • when a new term is set (setCurrentTerm), e.g. upon handling a StartJoinRequest;
  • when a new cluster state is accepted (setLastAcceptedState), upon handling a PublishRequest;
  • when the accepted state is committed (markLastAcceptedStateAsCommitted), upon handling an ApplyCommitRequest.

On master-eligible nodes, the PersistedState implementation is LucenePersistedState, which writes synchronously on the cluster coordination thread. On non-master-eligible nodes, the AsyncPersistedState wrapper is used instead. It applies updates to an in-memory state immediately and queues the disk write to a background thread. This avoids blocking the coordination thread on disk I/O for nodes that do not participate in master elections.

On startup, the node reads the Lucene index, selects the persisted state with the highest accepted term, and reconstructs the Metadata from the stored documents and commit user data. This metadata is then used to build the initial ClusterState that the node starts with.

Master Elections

The master's core responsibility is updating the ClusterState and propagating the changes to other nodes.

The cluster maintains (conceptually at least, see term section) at most a single master at all times. If no master is present, master-eligible nodes will attempt to become the master by starting an election. The cluster will not be able to commit any ClusterState changes until a new master is elected.

To elect a master, Elasticsearch uses a consensus algorithm derived from Paxos (see also Paxos Made Simple and Raft for more approachable resources). This algorithm is formally defined in a TLA+ specification referenced from the CoordinationState class. The CoordinationState class is responsible for maintaining the safety property of the algorithm, whereas the Coordinator ensures liveness by guaranteeing progress in cases of timeouts, failures and/or conflicts. The Coordinator also manages a node's transition between CANDIDATE, LEADER, and FOLLOWER modes, and constructs the main data structures used for cluster state publication.

The CoordinationMetadata instance, stored in the ClusterState Metadata, tracks the coordination-related state needed for subsequent master elections.

Quorum

A quorum is defined as a strict majority (votedNodesCount * 2 > nodeIds.size()) of the nodes listed in a VotingConfiguration. CoordinationMetadata maintains two such configurations: lastCommittedConfiguration and lastAcceptedConfiguration. Most of the time, these two configurations are equal. They only diverge during voting configuration changes. The Reconfigurator handles the logic for computing the voting configuration changes during the transition.

To guarantee safety during voting configuration updates, an election will only succeed if quorum is achieved for both configurations.
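In code, the rule amounts to the following sketch; the real checks are VotingConfiguration#hasQuorum and ElectionStrategy#isElectionQuorum:

java
import java.util.Set;

static boolean hasQuorum(Set<String> votes, Set<String> configuredNodeIds) {
    long votedNodesCount = votes.stream().filter(configuredNodeIds::contains).count();
    return votedNodesCount * 2 > configuredNodeIds.size(); // strict majority
}

// During a reconfiguration, an election must win a quorum in both configurations.
static boolean isElectionQuorum(Set<String> votes,
                                Set<String> lastCommittedConfiguration,
                                Set<String> lastAcceptedConfiguration) {
    return hasQuorum(votes, lastCommittedConfiguration) && hasQuorum(votes, lastAcceptedConfiguration);
}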

Terms

Every election takes place in a numbered term. Terms are monotonically increasing longs that act as logical clocks for the coordination layer, allowing the cluster to distinguish between master mandates coming from distinct elections. They are persisted via CoordinationState.PersistedState. A term can have at most one master. A node will never vote in the same term twice.

Note that nodes may not necessarily be aware of the same latest term at the same physical time. The practical consequence of this is that two nodes may temporarily both believe they are the master at the same physical time. But because there is only ever a single master per term, terms are always monotonically increasing and cluster state updates require a quorum of votes, at most one of them will be able to update the cluster state.

Election Flow

Below is an overview of the election process, including the main code components for each step. Subsequent sections provide more detailed explanations of the mechanics and rationale behind each step.

  1. Leader failure detected.

A follower detects current master failure.

LeaderChecker
Coordinator.onLeaderFailure()
  2. Node becomes CANDIDATE.

The node transitions to CANDIDATE mode and triggers the discovery process.

Coordinator.becomeCandidate()
Mode.CANDIDATE
PeerFinder.activate(...)
  3. Peer discovery

The candidate probes master-eligible nodes from the last accepted state and the configured hosts resolver. It asks if there is a current master and what other master-eligible nodes are in the cluster.

PeerFinder.handleWakeUp()
ConfiguredHostsResolver 
PeerFinder.startProbe(...) 
PeersRequest
PeersResponse
  4. Candidate schedules election.

If no current master is discovered and the node has enough master-eligible peers to form a quorum, the candidate schedules an election (with randomized backoff to minimize conflicts with other potential candidates).

CoordinatorPeerFinder.onFoundPeersUpdated()
Coordinator.startElectionScheduler()
ElectionScheduler.scheduleNextElection()
  5. Pre-vote round.

The election starts. The candidate double checks it is healthy and eligible to be master. The candidate sends pre-vote requests to master-eligible peers.

NodeEligibility
PreVoteCollector.start(...)
PreVoteRequest
PreVoteResponse
  6. Pre-vote quorum reached

If enough PreVoteResponses are received to satisfy quorum, the candidate proceeds to the real election.

ElectionStrategy.isElectionQuorum(...)
Coordinator.startElection()
  7. Candidate bumps term and broadcasts StartJoinRequest.

The candidate computes the new term: max(current, seen) + 1 and sends StartJoinRequest with the new term to all nodes discovered in previous step.

Coordinator.getTermForNewElection()
Coordinator.broadcastStartJoinRequest(...)
StartJoinRequest
  8. Other nodes start the join process

Master-eligible nodes ack the StartJoinRequest, persist the latest term, verify they are healthy enough to cast a vote, and send a JoinRequest to the candidate.

CoordinationState.handleStartJoin(...)
CoordinationState.setCurrentTerm(...)
JoinHelper.sendJoinRequest(...)
  9. Candidate becomes LEADER

If enough JoinRequests are received to satisfy quorum, the candidate becomes LEADER.

Coordinator.processJoinRequest(...)
CoordinationState.handleJoin(...)
ElectionStrategy.isElectionQuorum(...)
Coordinator.becomeLeader()
Mode.LEADER
  10. Leader completes election

The leader publishes the cluster state, cleans up discovery connections, and starts heartbeating.

CandidateJoinAccumulator.close(...)
JoinTask
LeaderHeartbeatService.start(...)

Failure Detection

The cluster uses two complementary checks to detect node failures: the LeaderChecker (run by followers) and the FollowersChecker (run by the leader).

Peer-to-peer communication in Elasticsearch typically involves two distinct TCP connections, so having both checks is necessary to ensure full connectivity. They also serve distinct conceptual purposes: when the leader becomes unreachable, followers must detect it and start an election, whereas when a follower becomes unreachable, the leader will remove it from the cluster so its shards can be reallocated.

Each follower runs a LeaderChecker that periodically sends a LeaderCheckRequest to the current master (every 1 second by default). Upon receiving the request, the master will verify that its local node is healthy, that it is still the elected master, and that the requesting node is part of the current cluster state. If all conditions are met, it will then ack the request.

If the check fails more times than the configured limit (3 by default), if the master reported itself as unhealthy or if the master disconnected, the follower will conclude that the master has failed. The follower will stop the periodic check and notify the Coordinator, which will transition the node to CANDIDATE mode and start the election process.

On the master side, the FollowersChecker class maintains a map of nodes in the current cluster state to FollowerChecker objects.

Each FollowerChecker periodically (by default every 1s) sends a FollowerCheckRequest containing the current term and the identity of the master node. When receiving this request, each follower will first verify that its local node is healthy (and throw if not), and then check whether the request term matches its current term. If it does, the follower will ack the request right away. Otherwise, if the request term is higher than the node's current term, the request is handed off to the Coordinator so that the node can update its local state and become a follower of the master in the new term.

If the follower disconnects or the check fails too many times (3 by default), the master will conclude that the node has failed, add it to its set of faultyNodes, and stop tracking its corresponding FollowerChecker object. The Coordinator will be notified and eventually remove the node from the set of discovered nodes in the ClusterState (which will cause its shards to be reallocated to other healthy nodes).
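Both checkers share the same counting pattern, sketched below with illustrative names: a success resets the counter, while a disconnect or an unhealthy peer short-circuits the retry limit:

java
class FailureCounterSketch {
    static final int MAX_CONSECUTIVE_FAILURES = 3; // default retry limit
    private int consecutiveFailures;

    void onCheckResult(boolean success, boolean disconnected, boolean peerUnhealthy) {
        if (success) {
            consecutiveFailures = 0; // any successful check resets the count
            return;
        }
        if (disconnected || peerUnhealthy || ++consecutiveFailures >= MAX_CONSECUTIVE_FAILURES) {
            onNodeFailed(); // follower: become candidate; master: remove the faulty node
        }
    }

    void onNodeFailed() {
        // notify the Coordinator
    }
}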

Discovery

Discovery is a fast "gossip-like" protocol by which a node in CANDIDATE mode locates master-eligible nodes in the cluster and/or the master itself. A node enters CANDIDATE mode either on startup or after detecting a master failure. The PeerFinder class contains the core logic for the discovery process.

Discovery starts with both the nodes from the last accepted cluster state and a set of resolved seed addresses, which are obtained from the different SeedHostsProvider implementations:

  • SettingsBasedSeedHostsProvider: reads addresses from the discovery.seed_hosts config. This config is read on startup and non-refreshable.
  • FileBasedSeedHostsProvider: reads addresses from the runtime-refreshable unicast_hosts.txt file in the config directory. Note that because it's dynamic, using this file is usually preferred to the discovery.seed_hosts config.
  • Additional seed providers registered by users via the DiscoveryPlugin. One example of such plugin is the AwsEc2SeedHostsProvider, which uses the AWS EC2 client to find the relevant instances that could host elasticsearch nodes.

SeedHostsResolver is in charge of converting the seed hosts strings to transport addresses using DNS resolution.

When discovery is active, PeerFinder will run handleWakeUp every 1s (by default). Each iteration will clean up disconnected peers, send a PeersRequest to known master-eligible nodes (from the last accepted state and the resolved seed addresses), and schedule the next handleWakeUp iteration.

When receiving a PeersResponse, PeerFinder will reach out to all peers specified in the response, including a potential master. If the peer node reports itself as the master and its term is greater or equal to the local one, then the node will update its local term and try to join the master.

Because the node will eagerly reach out to every peer it learns about (both via the content of a PeersResponse or even when receiving a new PeersRequest), the discovery protocol will find every other master-eligible node in at most roughly ⌈log₂(D)+1⌉ steps, where D is the diameter of the graph of seed host configurations.

Note that discovery connections are managed separately from the regular cluster transport connections: they are held by the PeerFinder and used only during discovery. Once a master is found or elected, all discovery connections are released.

Pre-Vote

The pre-vote phase takes place before a candidate bumps the term. It is meant to prevent a partitioned node from disrupting a healthy term by forcing a re-election when it rejoins the cluster. It ensures that a candidate can only proceed to broadcast a higher term once a quorum of peers agrees that there is no active leader.

When receiving a PreVoteRequest from a candidate node, the receiver will check whether it currently knows of an active leader. If not, and if the receiver is healthy, it will reply with a PreVoteResponse containing its currentTerm, lastAcceptedTerm, and lastAcceptedVersion. If it still considers a different node to be the leader, it will reject the request.

Pre-vote rounds are triggered by the ElectionSchedulerFactory, which schedules each attempt after a random delay that grows linearly with the number of failed attempts (up to a configured maximum). This randomized backoff reduces the chances that two candidates will run pre-vote rounds simultaneously, avoiding conflicts and giving each attempt enough time to complete before the next one begins.

For additional details on the theory behind the pre-voting phase, see the Pre-voting in distributed consensus article. This blog post also describes a real-world example of a liveness failure caused by this check being omitted.

Join

The Join process is the mechanism by which a node is added to the ClusterState by the master. Most of the join-related logic is located in the Coordinator class, which still delegates specific components to its JoinHelper instance.

Within a master term, a node that starts up will send a JoinRequest to the discovered master (see Discovery section for more details) so it can get added (back) to the cluster. The master will handle the incoming join request by first validating that it can connect back to the joining node, that the joining node is able to deserialize the current ClusterState, and that the request's term and version match its local ones. Then, the master will trigger voting reconfiguration if needed, and submit a JoinTask to the master queue to add the new node to the ClusterState.

During a master election, JoinRequests from peer nodes also act as votes (via the optional Join field) for the candidate that started the election. The join requests are then processed slightly differently by the candidate node. Rather than being submitted immediately to the master queue, incoming joins are buffered by the joinAccumulator. Each Join vote is recorded in CoordinationState. Once quorum is reached, the candidate will become leader, and flush all accumulated joins into a single JoinTask for the master queue to process.

Serverless Elections

Stateful and Serverless ES share the same Coordinator core logic, but they differ in how they check leader liveness and how they define quorum.

In Serverless, the elected master periodically writes a Heartbeat (containing the current term and timestamp) to an external blob store, the HeartbeatStore, via the StoreHeartbeatService. Other nodes check whether the leader is alive by reading the latest heartbeat from the store.

In Serverless, the master uses a single node voting configuration with only itself (defined by the SingleNodeReconfigurator). Committing and publishing a new cluster state therefore only requires the master's own acknowledgment. Similarly, during elections, the pre-vote phase does not contact any peers. Instead, the AtomicRegisterPreVoteCollector solely checks the latest heartbeat from the blob store and starts the election if it's older than the configured threshold (default 30 seconds). To become master, the candidate node will atomically override the current term in the blob store via a CAS operation.

New Cluster Formation

When a cluster starts from scratch, no persisted cluster state exists on disk. The PersistedClusterStateService returns an empty on-disk state with a CoordinationMetadata containing VotingConfiguration.EMPTY_CONFIG for both the last-committed and last-accepted configurations. The cluster first needs to bootstrap and establish its initial voting configuration before an election can take place. The ClusterBootstrapService is responsible for this one-time bootstrapping process and runs on master-eligible nodes.

For production clusters, the operator has to provide the list of master-eligible nodes that should form the initial cluster via the cluster.initial_master_nodes setting. ClusterBootstrapService uses this setting to construct its bootstrapRequirements.

On startup, a master-eligible node will first become candidate, which will also start the discovery process (see Discovery section). Each time the PeerFinder reports newly discovered peers, ClusterBootstrapService will check whether the discovered nodes satisfy the bootstrap requirements, i.e. when a strict majority of the nodes specified in cluster.initial_master_nodes are found.

When this condition is met, ClusterBootstrapService will construct a VotingConfiguration from the discovered node IDs and set the initial cluster state. The node will then schedule an election (see Master Elections for more details). Any requirements that could not be matched to a discovered node are added as placeholder entries. These placeholders occupy slots in the voting configuration's nodeIds set and affect the quorum size, but they cannot cast a vote until the real nodes come online and replace them.

When cluster.initial_master_nodes is not present, and no discovery config is provided either, ClusterBootstrapService will schedule a best-effort bootstrap after the discovery.unconfigured_bootstrap_timeout (default 3 seconds). This simply uses all master-eligible nodes discovered so far. This is inherently unsafe and is only intended for development and testing.

Single-Node Discovery

If discovery.type: single-node is set, ClusterBootstrapService uses the local node name as the sole bootstrap requirement. The node bootstraps itself immediately.

Disaster Recovery

If more than half of the master-eligible nodes are permanently lost and no snapshot is available, the last-resort UnsafeBootstrapMasterCommand (elasticsearch-node unsafe-bootstrap) can force a single surviving master-eligible node to become the new cluster leader. It works by setting both voting configurations to a single-node configuration containing only the local node and regenerating the cluster UUID.

The remaining data nodes cannot join the newly bootstrapped master because their persisted state still references the old cluster. The DetachClusterCommand (elasticsearch-node detach-cluster) sets both their voting configurations to VotingConfiguration.MUST_JOIN_ELECTED_MASTER, containing the single node ID _must_join_elected_master_, which prevents nodes from starting their own elections and forces them to discover and join the already-elected master. It then resets the persisted term to 0 and marks clusterUUIDCommitted as false.

Master Transport Actions

Many cluster operations (creating indices, managing snapshots, etc.) run on the elected master node because they result in ClusterState updates. The TransportMasterNodeAction class is the base class for such operations. It provides a common framework that handles routing requests to the current master, retrying when the master changes, and checking for cluster blocks.

See TransportMasterNodeAction Javadoc for a detailed description of the execution flow and retry mechanism.

Replication

(More Topics: ReplicationTracker concepts / highlights.)

What is a Shard

Primary Shard Selection

(How a primary shard is chosen)

Versioning

(terms and such)

How Data Replicates

(How an index write replicates across shards -- TransportReplicationAction?)

Consistency Guarantees

(What guarantees do we give the user about persistence and readability?)

Locking

(rarely use locks)

ShardLock

Translog / Engine Locking

Lucene Locking

Engine & Store

IndexShard

The IndexShard class is the single entry point for all shard-level operations: indexing, deletion, real-time GET, refresh, flush, recovery, and snapshot. There is exactly one IndexShard object per allocated shard.

An IndexShard holds references to:

  • The shard's Store, which wraps a Lucene Directory and provides access to the shard's Lucene index files on disk.
  • The shard's Engine, which manages all indexing and search operations for this shard, writing to both the Translog and the Lucene files managed by the Store.

The lifecycle of these objects (creation, recovery, and teardown) is controlled by the IndicesClusterStateService, which reacts to cluster state changes and updates local state accordingly (see the IndicesClusterStateService section).

Store

The Store is the lowest-level Elasticsearch persistence abstraction for a shard. Each shard has a single dedicated Store that wraps a Lucene Directory, which is Lucene's own file-system abstraction used to read and write index files on disk. Lucene's Directory is a pure I/O abstraction: callers open an IndexInput to read a named file and create an IndexOutput to write one. The Store builds on the Lucene Directory capabilities by adding reference counting and corruption detection, exposing committed file metadata and enforcing integrity invariants.

Reference Counting and Lifecycle

The Store implements RefCounted. Callers call store.incRef() before using it and store.decRef() in a finally block when done. Once the reference count drops to zero the store is closed and the underlying Lucene directory is cleaned up. The Store also receives a ShardLock at construction time and only releases it once closed, allowing other threads waiting to acquire the lock for this shard to proceed.
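The usage pattern is the classic try/finally around a reference, as in this sketch (readFiles() is a placeholder):

java
store.incRef(); // fails if the store has already been closed
try {
    readFiles(store); // safe: the store and its directory stay open
} finally {
    store.decRef(); // the final decRef closes the directory and releases the ShardLock
}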

Backing Directory

The Lucene Directory used by a Store is created by an IndexStorePlugin.DirectoryFactory. The default built-in implementation is FsDirectoryFactory, which supports several store types selectable via the index.store.type setting:

  • hybridfs (default): a HybridDirectory that delegates to a MMapDirectory for performance-sensitive file types (postings, term vectors index, norms, vectors, etc.) and NIOFSDirectory for files where sequential access is preferred (e.g. stored fields). Direct I/O is also enabled for vector index files when supported by the OS. The HybridDirectory selects between mmap and NIO on a per-file basis by checking the file's Lucene extension and I/O context.
  • mmapfs: uses MMapDirectory for all files.
  • niofs: uses NIOFSDirectory for all files.
  • fs: automatically selects the best type for the underlying file system.

MetadataSnapshot

The Store exposes MetadataSnapshots for each index commit, read and constructed from the Lucene segments_N files. A MetadataSnapshot is a point-in-time map from filename to StoreFileMetadata for the committed files belonging to a Lucene index commit, produced by an Elasticsearch flush. Each StoreFileMetadata includes the file's name, on-disk length, CRC32 checksum (from the Lucene file footer), the Lucene version that wrote it, and a writerUuid that uniquely identifies the writer.

MetadataSnapshots are leveraged by several Elasticsearch workflows, including peer recovery and replica shard allocation (via TransportNodesListShardStoreMetadata). They are used to compare the on-disk state of two distinct shards and calculate how much data needs to be transferred to bring them into sync.
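The comparison can be sketched as follows. The types and accessors are illustrative; the real logic is Store.MetadataSnapshot#recoveryDiff, which is more involved:

java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;

record FileMeta(String name, long length, String checksum) {}

// Files the target is missing, or holds with a different length/checksum, must be copied.
static List<FileMeta> filesToRecover(Map<String, FileMeta> source, Map<String, FileMeta> target) {
    List<FileMeta> toCopy = new ArrayList<>();
    for (FileMeta sourceFile : source.values()) {
        FileMeta targetFile = target.get(sourceFile.name());
        if (targetFile == null
            || targetFile.length() != sourceFile.length()
            || Objects.equals(targetFile.checksum(), sourceFile.checksum()) == false) {
            toCopy.add(sourceFile);
        }
    }
    return toCopy;
}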

Concurrency

Access to a shard's on-disk data is protected by three layers of locking, each scoped to a different level.

The ShardLock is a node-wide, coarse-grained lock managed by NodeEnvironment. It is backed by a Semaphore and guarantees that at most one owner at a time has write access to a given shard directory within a JVM process. The Store is given a ShardLock at creation time. It holds this lock for its entire lifetime, ensuring that write operations (e.g. creating an IndexWriter, deleting shard files, or recovering from another shard) have exclusive access to the shard directory. Callers that need to access the directory without a live Store (e.g. TransportNodesListShardStoreMetadata reading metadata for allocation decisions) acquire a temporary ShardLock for the duration of the read.

The metadataLock is an in-process ReentrantReadWriteLock inside the Store. It guards access to the directory's file listing and segment metadata. Operations that only read metadata (e.g. getMetadata with an existing commit) take the read lock, allowing concurrent readers. Operations that structurally modify the directory (e.g. renaming temporary recovery files via renameTempFilesSafe, cleaning up stale files via cleanupAndVerify, or running CheckIndex) take the write lock, which excludes all readers and other writers until the operation completes.

Lucene's IndexWriter write lock is a native file lock (write.lock) that Lucene places in the index directory. It prevents two IndexWriter instances from ever opening the same directory simultaneously, even across processes. When the Store needs to read metadata from a directory that has no active engine (e.g. during getMetadata(commit=null, lockDirectory=true)), it acquires the Lucene write lock together with the metadataLock write lock, ensuring no writer can interfere. In normal operation, the running InternalEngine already holds this lock through its IndexWriter.

To summarize, ShardLock is a JVM-level lock enforced across the entire node, metadataLock coordinates in-process readers and writers within the Store, and the Lucene write lock guards the raw directory at the file-system level.

Corruption

The Store maintains corruption markers as special files prefixed with corrupted_ written into the Lucene directory. When an unrecoverable I/O error or checksum mismatch is detected, a marker file is written containing the serialized exception. Subsequent calls to failIfCorrupted() scan for these marker files and throw a CorruptIndexException if any are found, preventing any further operations on a known-bad shard. Corruption markers are removed only after the shard has been successfully recovered from another source (e.g. a primary or a snapshot).

Engine

The Engine abstract class is the Elasticsearch abstraction that manages and coordinates operations on the running shard index. Where the Store manages files on disk, the Engine owns the write path (indexing, deletion, no-ops), the read path (searcher acquisition, real-time GET), and Lucene lifecycle operations (refresh, flush, merge). It also controls the translog, ensuring that every acknowledged write is durably recorded before a response is sent.

The main implementation is InternalEngine, used for all read-write shards (primary and replica). Other implementations serve more specialized roles. For example, the ReadOnlyEngine gives read-only access to a frozen shard, and the NoOpEngine acts as a placeholder for shards belonging to a closed index. It exists to allow shards of closed indices to be correctly replicated in case of a node failure.

Segments, Refresh, and Flush

Lucene organizes index data on disk into immutable files called segments. They are created whenever the in-memory write buffer is flushed or when merges combine existing segments into larger ones. The IndexWriter accumulates writes in memory and periodically flushes new segments. The InternalEngine wraps an IndexWriter (for writes) and a pair of internal and external reader managers that wrap Lucene DirectoryReaders and map Elasticsearch concepts onto Lucene's:

  • An Elasticsearch refresh triggers a Lucene NRT reader reopen (via DirectoryReader.openIfChanged()), making recently indexed documents searchable without writing a full commit to disk. Refreshes do not call IndexWriter.commit() and do not persist data durably.
  • An Elasticsearch flush calls IndexWriter.commit(), writing a durable commit point to disk. A flush is what allows the translog to be safely truncated up to that commit.

This distinction is a core piece of Elasticsearch's durability model. Documents are searchable after a refresh but only durably stored after a flush. In between, the translog bridges the gap. Every write is appended to the translog on disk before being acknowledged, so that it can be replayed in the event of a crash.
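Stripped of the engine plumbing, the two operations map onto the following Lucene calls, where `writer` and `reader` are assumed to be the engine's IndexWriter and its current DirectoryReader:

java
// Refresh: reopen a near-real-time reader so new documents become searchable.
// Nothing is committed, so durability still depends on the translog.
DirectoryReader refreshed = DirectoryReader.openIfChanged(reader, writer);
if (refreshed != null) {
    reader = refreshed; // swap in the fresher point-in-time view
}

// Flush: write a durable Lucene commit point to disk. The translog can now be
// trimmed up to this commit.
writer.commit();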

The detailed mechanics of how a write flows through the engine (including translog interaction, sequence number assignment, and version map updates) are covered in the Translog and Indexing / CRUD sections.

IndicesClusterStateService

The IndicesClusterStateService is a high-priority ClusterStateApplier of the ClusterApplierService (see Cluster State Application). Any time a new cluster state is published, IndicesClusterStateService checks whether the state of indices and shards has changed and updates the local node's shards to match accordingly.

Its doApplyClusterState method goes through the following operations in order each time a new cluster state is applied:

  1. Delete indices that no longer exist in the new cluster state: closes each IndexShard (which closes its Engine, flushing first), releases the Store reference, and deletes the on-disk shard directory.
  2. Remove shards that are no longer assigned to this node: closes the Engine (without flushing; unflushed data is already in the translog and safe), releases the Store reference, but leaves the on-disk files intact for later cleanup by IndicesStore.
  3. Update index metadata: if index settings changed, propagates them in-memory to each IndexShard and then to its Engine (e.g. merge scheduler config, GC deletes policy, soft-delete retention). If mappings changed, updates the MapperService. No disk writes happen here.
  4. Create or update shards: for each ShardRouting targeting this node in INITIALIZING state, creates a new Store and IndexShard and kicks off recovery. For already-active shards, updates their routing metadata in-place.

The diagram below illustrates the end-to-end flow from a master publishing a new cluster state containing a newly assigned shard down to the Engine becoming active.

```mermaid
sequenceDiagram
    participant M as Master Node
    participant CA as ClusterApplierService
    participant ICSS as IndicesClusterStateService
    participant IS as IndexShard
    participant S as Store
    participant E as InternalEngine

    rect rgb(255, 248, 240)
    Note over M,CA: Cluster State Committed
    M->>CA: ApplyCommitRequest (new ClusterState)
    CA->>ICSS: applyClusterState(ClusterChangedEvent)
    end

    rect rgb(240, 248, 255)
    Note over ICSS,S: Shard Creation
    ICSS->>ICSS: doApplyClusterState()
    Note over ICSS: deleteIndices()<br/>removeIndicesAndShards()<br/>updateIndices()
    ICSS->>S: new Store(shardDirectory)
    ICSS->>IS: new IndexShard(store, shardRouting, ...)
    ICSS->>IS: startRecovery(recoverySource)
    end

    rect rgb(240, 255, 240)
    Note over IS,E: Recovery and Engine Start
    IS->>S: validate / prepare directory
    IS->>E: new InternalEngine(engineConfig)
    E->>S: IndexWriter.open(store.directory)
    Note right of E: Engine open, shard STARTED
    end

    rect rgb(255, 240, 240)
    Note over IS,M: Report Back to Master
    IS->>M: ShardStateAction.shardStarted()
    Note left of M: Master updates ClusterState<br/>marks shard as STARTED
    end
```

For a newly assigned shard, createShard first acquires a ShardLock (preventing concurrent access to the same shard directory) or throws a ShardLockObtainFailedException if it fails to do so. It then creates a Store referencing the shard's on-disk directory and instantiates an IndexShard. Recovery is then triggered based on the RecoverySource in the ShardRouting entry.

The details of recovery, including how the Engine is created and started, are covered in the Peer Recovery, Snapshot Recovery, and Local Shards Recovery sections.

Translog

It is important to first understand the basic write model for documents: documents are written to Lucene in-memory buffers, then "refreshed" to searchable segments which may not be persisted on disk, and finally "flushed" to a durable Lucene commit on disk. If this were the only way we stored the data, we would have to delay the response to every write request until after the data had been flushed to disk, which could take many seconds or longer. If we didn't, we would lose newly ingested data if there was an outage between sending the response and flushing the data to disk.

For this reason, newly ingested data is also written to a shard's Translog, whose main purpose is to persist uncommitted operations (e.g., document insertions or deletions) so that they can be replayed, by just reading them sequentially from the translog during recovery, in the event of ephemeral failures such as a crash or power loss. The translog can persist operations quicker than a Lucene commit, because it just stores raw operations / documents without the analysis and indexing that Lucene does. The translog is always persisted and fsync'ed on disk before acknowledging writes back to the user. This can be seen in InternalEngine, which calls the add() method of the translog to append operations; e.g., its index() method at some point adds a document insertion operation to the translog. The translog ultimately truncates operations once they have been flushed to disk by a Lucene commit; indeed, in some sense the point of a "flush" is to clear out the translog.

Main usages of the translog are:

  • During recovery, an index shard can be recovered up to at least the last acknowledged operation by replaying the translog onto the last flushed commit of the shard.
  • Facilitating real-time (m)GETs of documents without requiring a refresh.

Translog Truncation

Translog files are automatically truncated when they are no longer needed, specifically after all their operations have been persisted by Lucene commits on disk. Lucene commits are initiated by flushes (e.g., with the index Flush API).

Flushes may also be automatically initiated by Elasticsearch, e.g., if the translog exceeds a configurable size (INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING) or age (INDEX_TRANSLOG_FLUSH_THRESHOLD_AGE_SETTING), which ultimately truncates the translog as well.
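
As a hedged example, these thresholds can be set per index. The setting keys below are inferred from the constant names above and shown for illustration only:

```java
import org.elasticsearch.common.settings.Settings;

final class TranslogFlushSettingsExample {
    // Keys inferred from the setting constants named above.
    static Settings translogFlushSettings() {
        return Settings.builder()
            .put("index.translog.flush_threshold_size", "512mb") // size threshold
            .put("index.translog.flush_threshold_age", "30m")    // age threshold
            .build();
    }
}
```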

Acknowledging writes

A bulk request ultimately results in repeated calls to Engine methods such as index() or delete(), which add operations to the Translog. Finally, the AfterWrite action of the TransportWriteAction calls indexShard.syncAfterWrite(), which puts the last written translog Location of the bulk request into an AsyncIOProcessor that is responsible for gradually fsync'ing the Translog and notifying any waiters. Once the translog has fsync'ed past the requested location, the bulk request is notified and can be acknowledged. Multiple writes typically reach the translog between consecutive fsync() calls; this amortizes the cost of the translog's fsync() operations across all of those writes.
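
A simplified sketch of this batching pattern follows. It is not the real AsyncIOProcessor (which also handles electing a single draining thread and propagating failures), just an illustration of how one fsync can cover many waiters:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;

final class BatchedFsyncSketch {
    record Waiter(long location, Runnable onDurable) {}

    private final ConcurrentLinkedQueue<Waiter> queue = new ConcurrentLinkedQueue<>();

    /** Called by each bulk request after appending to the translog. */
    void syncAfterWrite(long location, Runnable onDurable) {
        queue.add(new Waiter(location, onDurable));
        drain(); // in the real code, only one thread drains at a time
    }

    private synchronized void drain() {
        List<Waiter> batch = new ArrayList<>();
        Waiter w;
        while ((w = queue.poll()) != null) {
            batch.add(w);
        }
        if (batch.isEmpty()) return;
        fsyncTranslog(); // one fsync() covers every location in the batch
        batch.forEach(x -> x.onDurable().run());
    }

    private void fsyncTranslog() { /* FileChannel.force(false) in reality */ }
}
```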

Translog internals

Each translog is a sequence of files, each identified by a translog generation ID and containing a sequence of operations, with the last file open for writes. The last file has a part which has been fsync'ed to disk, and a part which has been written but not necessarily fsync'ed yet. Each operation is identified by a sequence number (seqno), which is monotonically increased by the engine's ingestion functionality. Typically the entries in the translog are in increasing order of their sequence number, but not necessarily.

A Checkpoint file is also maintained, written on each fsync operation of the translog. It records important metadata and statistics about the translog, such as the current translog generation ID, its last fsync'ed operation and location (i.e., we should read only up to this location during recovery), the minimum translog generation ID, and the minimum and maximum sequence numbers of the operations contained across the translog generations, all of which are used to identify the translog operations that need to be replayed upon recovery. When the translog rolls over, e.g., upon the translog file exceeding a configurable size, a new file in the sequence is created for writes, and the last one becomes read-only. A new commit flushed to disk will also induce a translog rollover, since the operations in the translog so far become eligible for truncation.
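
The checkpoint's contents can be pictured roughly as the following record. The field set mirrors the description above, though the real Checkpoint class is a compact binary format with additional invariants:

```java
// Conceptual shape only; not the actual Checkpoint class.
record CheckpointSketch(
    long offset,                // last fsync'ed location in the current generation
    int numOps,                 // operations in the current generation
    long generation,            // current translog generation ID
    long minSeqNo,              // min seqno across retained generations
    long maxSeqNo,              // max seqno across retained generations
    long globalCheckpoint,      // last known global checkpoint
    long minTranslogGeneration, // oldest generation still needed for recovery
    long trimmedAboveSeqNo      // operations above this seqno are ignored
) {}
```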

A few more words on terminology and classes used around the translog Java package. A Location of an operation is defined by the translog generation file it is contained in, the offset of the operation in that file, and the number of bytes that encode that operation. An Operation can be a document indexed, a document deletion, or a no-op operation. A Snapshot iterator can be created to iterate over a range of requested operation sequence numbers read from the translog files. The sync() method fsyncs the current translog generation file to disk and updates the checkpoint file with the last fsync'ed operation and location. The rollGeneration() method rolls the translog, creating a new translog generation; it is called, e.g., during an index flush. The createEmptyTranslog() method creates a new translog, e.g., for a new empty index shard. Each translog file starts with a TranslogHeader that is followed by translog operations.

Some internal classes are used for reading from and writing to the translog. A TranslogReader can be used to read operation bytes from a translog file. A TranslogSnapshot can be used to iterate operations from a translog reader. A MultiSnapshot can be used to iterate operations over multiple TranslogSnapshots. A TranslogWriter can be used to write operations to the translog.

Real-time GETs from the translog

The Get API (and by extension, the multi-get API) supports a real-time mode, which can query documents by ID, even recently ingested documents that have not yet been refreshed and are therefore not yet searchable. This capability is facilitated by another data structure, the LiveVersionMap, which maps recently ingested documents by their ID to the translog location that encodes their indexing operation. That way, we can return the document by reading the translog operation.

The tracking in the version map is not enabled by default. The first real-time GET induces a refresh of the index shard and a search to get the document, but also enables the tracking in the version map for newly ingested documents. Thus, subsequent real-time GETs are serviced by first going through the version map to query the translog, and, if the document is not found there, by searching the (refreshed) data without requiring another refresh of the index shard.

On a refresh, the code safely swaps the old map with a new empty map. That is because after a refresh, any documents in the old map are now searchable in Lucene, and thus we do not need them in the version map anymore.
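
A minimal sketch of this swap follows, assuming a simplified map from document ID to translog location. The real LiveVersionMap tracks considerably more state, including both a current and an old map so that lookups racing with a refresh still succeed:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class IdToTranslogLocationMapSketch {
    private volatile Map<String, Long> current = new ConcurrentHashMap<>();
    private volatile Map<String, Long> old = Map.of();

    void putOnIndex(String id, long translogLocation) {
        current.put(id, translogLocation);
    }

    /** Look in the newest map first, then the map from before the refresh. */
    Long lookup(String id) {
        Long loc = current.get(id);
        return loc != null ? loc : old.get(id);
    }

    /** On refresh start: keep the old map visible while Lucene reopens. */
    synchronized void beforeRefresh() {
        old = current;
        current = new ConcurrentHashMap<>();
    }

    /** After refresh: entries in the old map are now searchable in Lucene. */
    synchronized void afterRefresh() {
        old = Map.of();
    }
}
```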

Index Version

The IndexVersion tracks the on-disk format of an index. It is conceptually similar to TransportVersion (which controls wire-protocol compatibility between nodes) but instead targets how index data and metadata are serialized to Lucene files. Every time the serialization format of mappings, postings, doc values, or any other persisted index structure changes, a new IndexVersion constant must be added.

The IndexVersion class contains two fields: an integer id and the Lucene Version that the index was written with. The stored Lucene version is used for Lucene API calls that depend on the version, such as reading segment metadata.

The IndexVersion class was introduced in 8.8.0. Before that, the node release Version was used for both purposes. Prior to 8.9.0 the id field was the same as the release version, for backwards compatibility. In 8.9.0 it changed to an incrementing number, and disconnected from the release version.

All known versions are declared as constants in IndexVersions (e.g. UPGRADE_TO_LUCENE_10_4_0, SEMANTIC_TEXT_FIELD_TYPE, or GENERIC_DENSE_VECTOR_FORMAT). This list serves as the source of truth for version-to-Lucene mappings and merge-conflict checks.

The IndexVersion is stamped on every index at creation time via the IndexMetadata index.version.created setting. This version is immutable for the lifetime of the index and determines which code paths are used when reading its data (see PostRecoveryMerger for an example of such a version-gated optimization).

A separate index.version.compatibility setting (defaulting to index.version.created) can be set to a newer version to opt in to newer behavior while retaining backward-compatible defaults. For example, when restoring a snapshot of an index with an index.version.created older than MINIMUM_READONLY_COMPATIBLE, RestoreService.convertLegacyIndex() sets index.version.compatibility to the minimum index version supported by the cluster's nodes and adds a write block, so that the index can be opened as a read-only archive. The subsequent paragraphs contain more details about index version compatibility.

Elasticsearch supports a two-major-version compatibility window for index data. Indices created in the current major version (N) or the previous major version (N-1) are fully supported for reading and writing. The minimum index version for this full support is defined by IndexVersions.MINIMUM_COMPATIBLE. Note that Lucene enforces similar constraints.

Indices created in the penultimate major version (N-2), i.e. older than MINIMUM_COMPATIBLE but at or above IndexVersions.MINIMUM_READONLY_COMPATIBLE, can only be opened in read-only mode. They must have been marked as read-only (with a write block) on the previous major version before upgrading. Their mappings and field types are still fully understood by the current version. Only writes are blocked.

Indices older than MINIMUM_READONLY_COMPATIBLE are classified as legacy. Their on-disk format is too old for the current version to fully understand their mappings or field types, so they are automatically converted to degraded read-only archives via RestoreService.convertLegacyIndex() when restored from a snapshot. Both MINIMUM_COMPATIBLE and MINIMUM_READONLY_COMPATIBLE are bumped with each new major release to maintain this window.
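
A hedged sketch of this classification, assuming IndexVersion's onOrAfter() comparison method (this is illustrative, not actual Elasticsearch code):

```java
import org.elasticsearch.index.IndexVersion;
import org.elasticsearch.index.IndexVersions;

final class IndexCompatibilitySketch {
    static String classify(IndexVersion created) {
        if (created.onOrAfter(IndexVersions.MINIMUM_COMPATIBLE)) {
            return "fully supported (read + write)";     // major N or N-1
        }
        if (created.onOrAfter(IndexVersions.MINIMUM_READONLY_COMPATIBLE)) {
            return "read-only (requires a write block)"; // major N-2
        }
        return "legacy archive (converted on snapshot restore)";
    }
}
```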

For more details on how index format compatibility interacts with upgrades, see the Index Format Backwards Compatibility section in the General Architecture Guide.

Lucene

Apache Lucene is the search and indexing library at the core of every Elasticsearch shard. Each shard is a Lucene index. Instead of treating Lucene as an opaque storage engine at the bottom of the stack, Elasticsearch deeply integrates with it. It customizes file I/O, codecs, merge behavior, and reader lifecycle.

For more details on how Elasticsearch wraps Lucene's Directory for file I/O, see the Store section. For how the IndexWriter and DirectoryReader are managed, including the distinction between "refresh" (NRT reopen) and "flush" (durable Lucene commit), see Segments, Refresh, and Flush.

Lucene File Layout

A Lucene index on disk is a collection of segment files plus a segments_N info file that records which segments belong to the latest commit. Each segment is a self-contained, immutable mini-index composed of several file types, each responsible for a different data structure:

| Extension | Content |
| --- | --- |
| .si | Segment metadata (doc count, unique id, diagnostics, Lucene version). |
| .fnm | Field infos: field names, types (string, numeric, etc.), and index options (stored, indexed, doc values). |
| .fdm, .fdt | Stored fields metadata and data (original JSON _source, stored field values). |
| .tim, .tip, .doc, .pos, .pay | The inverted index: term dictionary, postings lists, positions, and payloads. |
| .dvd, .dvm | Doc values data and metadata (columnar storage for sorting, aggregations, scripting). |
| .tvd, .tvx | Term vectors data and index. |
| .nvm, .nvd | Norms (per-field length normalization factors used in scoring). |
| .liv | Live documents bitset (tracks which docs have been deleted within the segment). |
| .vec, .vex, .vem | KNN vector data and graph (HNSW or other ANN structure). |
| .kdm, .kdi, .kdd | Points / BKD tree (numeric range queries, geo). |

Check out the Lucene documentation for more details on each of those specific files. Elasticsearch also maintains a registry of all known Lucene file extensions in LuceneFilesExtensions.

The segments_N file and the write.lock file live at the directory root. A commit atomically publishes a new segments_N+1 as the active commit point, making the new set of segments visible. The old segment files are removed once they are no longer referenced by any open IndexReader or any retained commits. Elasticsearch's CombinedDeletionPolicy, which implements Lucene's IndexDeletionPolicy, manages which commits are retained. All commits more recent than the safe commit (the most recent commit whose max sequence number is at most the global checkpoint, used as the starting point for peer recovery) are preserved. Older commits can also be pinned by external consumers (e.g., snapshot operations). CombinedDeletionPolicy also communicates the safe commit checkpoint information with the translog deletion policy.
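
A simplified sketch of such a retention policy follows. It is not CombinedDeletionPolicy itself, just an illustration of a Lucene IndexDeletionPolicy that keeps the safe commit and everything after it, assuming the max_seq_no commit user-data key:

```java
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.IndexDeletionPolicy;
import java.io.IOException;
import java.util.List;

final class KeepSinceSafeCommitPolicySketch extends IndexDeletionPolicy {
    private final long globalCheckpoint;

    KeepSinceSafeCommitPolicySketch(long globalCheckpoint) {
        this.globalCheckpoint = globalCheckpoint;
    }

    @Override
    public void onInit(List<? extends IndexCommit> commits) throws IOException {
        onCommit(commits);
    }

    @Override
    public void onCommit(List<? extends IndexCommit> commits) throws IOException {
        // Commits are ordered oldest-first; find the most recent commit whose
        // max sequence number is covered by the global checkpoint.
        int safeIndex = 0;
        for (int i = commits.size() - 1; i >= 0; i--) {
            long maxSeqNo = Long.parseLong(
                commits.get(i).getUserData().getOrDefault("max_seq_no", "-1"));
            if (maxSeqNo <= globalCheckpoint) {
                safeIndex = i;
                break;
            }
        }
        for (int i = 0; i < safeIndex; i++) {
            commits.get(i).delete(); // older than the safe commit: releasable
        }
    }
}
```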

Codecs

Lucene's Codec abstraction lets each field type choose its own on-disk format. A codec is a bundle of format implementations: one for postings, one for stored fields, one for doc values, one for vectors, etc.

Elasticsearch ships its own codec stack, managed by the CodecService class. It extends the current Lucene codec (e.g. Lucene104Codec) with Elasticsearch-specific customizations:

  • The PerFieldMapperCodec delegates postings, doc values, and KNN vector format selection to each field's Mapper, allowing field types to choose specialized formats (See the PerFieldFormatSupplier for specific examples).
  • The DeduplicateFieldInfosCodec wraps codecs registered in CodecService to deduplicate FieldInfos metadata objects loaded from each segment's .fnm file. Field names, attribute keys, and attribute maps that are identical across segments are interned so they share a single object in memory, reducing heap usage on indices with many segments.
  • For time-series indices with synthetic IDs enabled (index.mapping.synthetic_id), the AbstractTSDBSyntheticIdCodec avoids writing the _id inverted index and stored field to disk. The _id field is still declared as indexed, but SyntheticIdField produces an empty token stream so no postings are actually written on flush. At read time, the TSDBSyntheticIdPostingsFormat reconstructs postings from _tsid, @timestamp, and _ts_routing_hash doc values, and the TSDBSyntheticIdStoredFieldsReader synthesizes _id stored-field values on the fly from the same doc values. An ES94BloomFilterDocValuesFormat bloom filter is also built for the _id field so that existence checks during indexing can avoid _tsid and @timestamp doc-values lookups per document. The synthetic ID feature reduces the storage overhead for high-cardinality time-series indices.

As the IndexVersion advances with each Lucene upgrade, new codec implementations are introduced (e.g. Elasticsearch92Lucene103Codec). Older segments written by previous codecs remain readable because Lucene's SPI mechanism loads the codec that originally wrote each segment.

Segment Merges

Because Lucene segments are immutable, updates and deletes cannot modify documents in place. Over time, small segments accumulate from successive flushes. Merges combine multiple segments into fewer, larger ones. Merges both reclaim space from obsolete documents and improve query performance by reducing the number of segments a search must visit.

MergePolicy implementations decide which segments to merge and when. Elasticsearch configures this through MergePolicyConfig, which uses Lucene's TieredMergePolicy as the default base policy and LogByteSizeMergePolicy for time-series indices. The InternalEngine then wraps this base policy with several layers:

  • SoftDeletesRetentionMergePolicy: retains soft-deleted documents (more details about soft deletes in subsequent paragraphs).
  • PrunePostingsMergePolicy: drops the _id field postings for deleted documents during segment merges, maintaining consistent update performance even when a large number of soft deleted/updated documents are retained.
  • PruningMergePolicy: prunes _recovery_source stored fields and _seq_no-related fields for documents that no longer need them for replication.
  • ShuffleForcedMergePolicy: randomizes segment ordering during force merges so that recently-indexed documents are not always co-located at the end, improving the efficiency of time-based queries.

MergeScheduler implementations decide how merges are executed. The default implementation is ThreadPoolMergeScheduler, which runs merges on the ThreadPoolMergeExecutorService shared node-level thread pool with disk-aware I/O throttling.

Elasticsearch never hard-deletes documents from Lucene. Instead, both updates and deletes use Lucene's soft-delete mechanism. The InternalEngine calls IndexWriter.softUpdateDocument, which atomically marks the previous version of a document as soft-deleted (by setting a __soft_deletes numeric doc-values field to 1) and writes the new version or a delete tombstone into the current segment. The old document is also marked as deleted in the .liv bitset, just like a hard delete. The soft-delete update triggers a new generational .liv file for the segment that contained the previous version of the document. The difference between hard and soft deletes is that hard-deleted documents are always discarded during merges, whereas documents carrying the __soft_deletes marker can be selectively retained. Lucene's SoftDeletesRetentionMergePolicy decides which soft-deleted documents to keep by evaluating a retention query at merge time. Elasticsearch provides this query through its SoftDeletesPolicy, which evaluates whether a document can be discarded based on the minimum sequence number to retain (evaluated from the global checkpoint), the index.soft_deletes.retention.operations setting, and any active retention leases held by replicas or CCR followers.
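
A toy example of Lucene's soft-delete mechanism in isolation (plain Lucene, not Elasticsearch code): the config names the soft-deletes field, and an update marks the previous document version with that field instead of hard-deleting it.

```java
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.NumericDocValuesField;
import org.apache.lucene.document.StringField;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.Term;
import org.apache.lucene.store.ByteBuffersDirectory;

public class SoftDeletesDemo {
    public static void main(String[] args) throws Exception {
        IndexWriterConfig config = new IndexWriterConfig()
            .setSoftDeletesField("__soft_deletes");
        try (IndexWriter writer = new IndexWriter(new ByteBuffersDirectory(), config)) {
            Document v1 = new Document();
            v1.add(new StringField("_id", "1", Field.Store.YES));
            writer.addDocument(v1);

            // Update: the old version of _id=1 is marked soft-deleted
            // (its __soft_deletes doc value is set) rather than discarded.
            Document v2 = new Document();
            v2.add(new StringField("_id", "1", Field.Store.YES));
            writer.softUpdateDocument(new Term("_id", "1"), v2,
                new NumericDocValuesField("__soft_deletes", 1));
            writer.commit();
        }
    }
}
```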

Soft deletes are an essential feature for CCR. CCR followers replay operations from their leader Lucene index to stay up to date. Without soft deletes, the leader Lucene index could discard deleted documents during merges, leaving no trace that the delete operation ever occurred. CCR followers that fall behind would no longer be able to catch up via operation replay. Soft-deleted tombstones prevent this by preserving delete operations in the Lucene index until all followers have processed them.

Recovery

When a shard is created on a node, it starts out empty. Recovery is the process of loading the shard's data from some data source into the newly created IndexShard in order to make it available for index or search requests. When the shard allocation process has chosen a node for a shard, it records its choice by writing an updated ShardRouting record into the cluster state's IndexRoutingTable. The ShardRouting entry includes RecoverySource metadata that describes where the shard's data can be found based on the shard's previous allocation. For example, a shard for a newly created index will have its recoverySource set to EMPTY_STORE to indicate that recovery should bring up the shard without loading any existing data, while a recoverySource of EXISTING_STORE would tell recovery to load the shard from files already present on disk, likely because the node was restarted and had hosted the shard until it shut down.

The IndicesClusterStateService on each node listens for updates to the IndexRoutingTable and when it finds that a shard in state INITIALIZING has been assigned to its node, it creates a fresh IndexShard for the assigned shard and kicks off a recovery process for that shard, using the RecoverySource in the ShardRouting entry to determine the parameters of the recovery process. The full list of recovery types is defined in RecoverySource.Type. The various modes are discussed below, roughly in order of complexity. Some modes build on others; for example, snapshot recovery sets up a local data store by copying files from a snapshot source and then uses EXISTING_STORE recovery. Similarly, if there is any local data, then peer recovery starts by using EXISTING_STORE recovery to bring the local shard as close to up to date as it can, and then finishes synchronizing the shard through RPCs (Remote Procedure Calls) to an active source shard.

At the end of the recovery process, recovery finalization marks the shard as STARTED in cluster state, which makes it available to handle index and search requests.

Create a New Shard

The simplest form of recovery is EMPTY_STORE, which is just what it sounds like. This recovery type invokes StoreRecovery, which creates an empty shard directory on disk (deleting any existing files that may be present), bootstraps an empty translog, and then tells the Engine to use that directory. That's pretty much the whole process.

Restore from an Existing Directory

Only slightly more complex is EXISTING_STORE recovery, which is used when the node is expected to have an up-to-date copy of the shard's data already present on disk. Once again StoreRecovery manages the recovery process, but in this case it expects to find Lucene files and a translog already present. It validates the Lucene files, potentially dropping any incomplete commits that may not have been fsynced to disk before shutdown, tells the engine to open the directory as its backing store, and then replays the transaction log to recover any operations that may have been acknowledged to the client but not yet written into a durable Lucene commit. At that point the shard is ready to serve requests.

Snapshot Recovery

In snapshot recovery, the data source is a snapshot of the index shard stored on a remote repository. Snapshot recovery, also managed in StoreRecovery but invoked through the recoverFromRepository method, downloads and unpacks the snapshot from the remote repository into the local shard directory and then invokes the same logic as EXISTING_STORE to bring the shard online, with some small differences. The key one is that the snapshot is taken from a Lucene commit, so it does not need to store the shard translog when the snapshot is taken, or restore it during recovery. Instead it creates a new empty translog before bringing the shard online.

Local Shards Recovery

Local shards recovery is a type of recovery that reuses existing data from other shard(s) allocated on the current node (hence local shards). It is used exclusively to implement index Shrink/Split/Clone APIs.

This recovery type uses HardlinkCopyDirectoryWrapper to hard link or copy data from the source shard(s) directory. Copy is used if the runtime environment does not support hard links (e.g., on Windows). Source shard(s) directories are added using the IndexWriter#addIndexes API. Once an IndexWriter is correctly set up with source shard(s), the necessary data modifications are performed (like deleting excess documents during split) and a new commit is created for the recovering shard. After that recovery proceeds using standard store recovery logic utilizing the commit that was just created.
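
A toy illustration of the addIndexes step in isolation (plain Lucene; the real code wraps the source directories in HardlinkCopyDirectoryWrapper and performs the recovery bookkeeping around it):

```java
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.store.Directory;

public class AddIndexesDemo {
    /** Combine source shard directories into the target shard's index. */
    static void combine(Directory target, Directory... sourceShards) throws Exception {
        try (IndexWriter writer = new IndexWriter(target, new IndexWriterConfig())) {
            writer.addIndexes(sourceShards); // link/copy segments from sources
            writer.commit();                 // the commit used for recovery
        }
    }
}
```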

Peer Recovery

Whereas the other recovery modes are used to bring up a primary shard, peer recovery is used to add replicas of an already active primary shard. Peer recovery is also used for primary shard relocation, but relocation goes through the standard peer recovery process to bring a replica in sync, before handing off the primary role to the recovered replica during finalization.

Unlike StoreRecovery, peer recovery is managed through a separate service on the node recovering the shard, the PeerRecoveryTargetService. When the IndexShard sees that its recovery source is of type PEER, it hands over the recovery process to PeerRecoveryTargetService by invoking its startRecovery method. This service begins by creating an in-memory record of the recovery process to track its progress, and then runs EXISTING_STORE recovery in case the recovering replica previously held a copy of the shard that has since gone out of sync (e.g., because the node holding the replica restarted). Because the shard is a replica, it only recovers up to the latest known global checkpoint for the shard and discards any operations in the local store that are ahead of that point (see the Translog section for details). Once the local shard has been brought close to current, the service then sends a request to a corresponding service on the source node, PeerRecoverySourceService, to complete synchronization.

Synchronization begins by discovering any differences between the source and target shards and transmitting any missing files to the target shard. The source for the files can be the source shard itself, but if a snapshot of the shard is available that has some subset of the files to be transmitted, then recovery will fetch them from the snapshot in order to reduce load on the source shard.

The next step is to transfer any operations from the source translog. Since the source shard is active, it may be receiving index operations while recovery is in process. So, to ensure that the target shard doesn't miss any new operations, the source shard adds the target to the shard's replication group (see the Replication docs) before completing the operation transfer phase. Because of this ordering, any operations accepted on the shard between the time it reads and sends the latest operation in the translog and the time the replica completes recovery are sent through the request replication process and will not be lost. Once the target has been added to the replication group, the source reads the latest sequence number from its transaction log, knowing that any updates past that point will be handled by replication, and replays the translog to the target up to that point.

At this point the target is ready to be started as an in-sync replica. However, peer recovery is also used to perform primary relocation. If the target shard is being recovered in order to take over as primary, then the finalization stage will call IndexShard.relocate to complete the handoff of primary responsibilities. This method blocks operations on the source shard and sends an RPC to the target shard with the ReplicationTracker.PrimaryContext needed to promote the target to primary. Once the target acknowledges the handoff, the source shard moves itself into replica mode.

Data Tiers

(Frozen, warm, hot, etc.)

Allocation

Indexes and Shards

Each index consists of a fixed number of primary shards. The number of primary shards cannot be changed for the lifetime of the index. Each primary shard can have zero-to-many replicas used for data redundancy. The number of replicas per shard can be changed dynamically.

The allocation assignment status of each shard copy is tracked by its ShardRoutingState. The RoutingTable and RoutingNodes objects are responsible for tracking the data nodes to which each shard in the cluster is allocated: see the routing package javadoc for more details about these structures.

Core Components

The DesiredBalanceShardsAllocator is what runs shard allocation decisions. It leverages the DesiredBalanceComputer to produce DesiredBalance instances for the cluster based on the latest cluster changes (add/remove nodes, create/remove indices, load, etc.). Then the DesiredBalanceReconciler is invoked to choose the next steps to take to move the cluster from the current shard allocation to the latest computed DesiredBalance shard allocation. The DesiredBalanceReconciler will apply changes to a copy of the RoutingNodes, which is then published in a cluster state update that will reach the data nodes to start the individual shard recovery/deletion/move work.

The DesiredBalanceReconciler is throttled by cluster settings, like the max number of concurrent shard moves and recoveries per cluster and node: this is why the DesiredBalanceReconciler will make, and publish via cluster state updates, incremental changes to the cluster shard allocation. The DesiredBalanceShardsAllocator is the endpoint for reroute requests, which may trigger immediate runs of the DesiredBalanceReconciler but only asynchronous requests to the DesiredBalanceComputer via the ContinuousComputation component. Cluster state changes that affect shard balancing (for example index deletion) all call some reroute method interface that reaches the DesiredBalanceShardsAllocator to run reconciliation and queue a request for the DesiredBalanceComputer, leading to desired balance computation and reconciliation actions. Asynchronous completion of a new DesiredBalance will also invoke a reconciliation action, as will cluster state updates completing shard moves/recoveries (unthrottling the next shard move/recovery).

The ContinuousComputation saves the latest desired balance computation request, which holds the cluster information at the time of that request, and a thread that runs the DesiredBalanceComputer. The ContinuousComputation thread takes the latest request, with the associated cluster information, feeds it into the DesiredBalanceComputer and publishes a DesiredBalance back to the DesiredBalanceShardsAllocator to use for reconciliation actions. Sometimes the ContinuousComputation thread's desired balance computation will be signalled to exit early and publish the initial DesiredBalance improvements it has made, when newer rebalancing requests (due to cluster state changes) have arrived, or in order to begin recovery of unassigned shards as quickly as possible.

Rebalancing Process

There are different priorities in shard allocation, reflected in which moves the DesiredBalanceReconciler selects to do first, given that it can only move, recover, or remove a limited number of shards at once. The first priority is assigning unassigned shards, primaries being more important than replicas. The second is to move shards that violate any rule (such as node resource limits) as defined by an AllocationDecider. The AllocationDeciders class holds a group of AllocationDecider implementations that place hard constraints on shard allocation. For example, DiskThresholdDecider manages disk usage thresholds, such that further shards may not be allowed assignment to a node, or shards may be required to move off a node because they grew to exceed the available disk space; FilterAllocationDecider excludes a configurable list of indices from certain nodes; and MaxRetryAllocationDecider will not attempt to recover a shard on a certain node after too many failed retries. The third priority is to rebalance shards to even out the relative weight of shards on each node: the intention is to avoid, or ease, future hot-spotting on data nodes due to too many shards being placed on the same data node. Node shard weight is based on a sum of factors: disk usage, projected shard write load, total number of shards, and an incentive to distribute shards within the same index across different nodes. See the WeightFunction and NodeAllocationStatsAndWeightsCalculator classes for more details on the weight calculations that support the DesiredBalanceComputer decisions.
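
A hedged, illustrative sketch of the weighted-sum idea follows; the record, the factor names, and the coefficients are invented for illustration and do not reflect the real tuning in WeightFunction:

```java
final class NodeWeightSketch {
    record NodeShardStats(int totalShards, int shardsOfIndex, double writeLoad, double diskUsage) {}

    static double weight(NodeShardStats node, NodeShardStats clusterAverage) {
        double shardBalance = node.totalShards() - clusterAverage.totalShards();
        double indexBalance = node.shardsOfIndex() - clusterAverage.shardsOfIndex();
        double writeLoadBalance = node.writeLoad() - clusterAverage.writeLoad();
        double diskUsageBalance = node.diskUsage() - clusterAverage.diskUsage();
        // Higher weight => node is more loaded relative to the cluster
        // average, so the balancer prefers to move shards elsewhere.
        return 0.45 * shardBalance + 0.55 * indexBalance
             + 0.25 * writeLoadBalance + 1e-10 * diskUsageBalance;
    }
}
```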

Inter-Node Communication

The elected master node creates a shard allocation plan with the DesiredBalanceShardsAllocator and then selects incremental shard movements towards the target allocation plan with the DesiredBalanceReconciler. The result of the DesiredBalanceReconciler is an updated RoutingTable. The RoutingTable is part of the cluster state, so the master node updates the cluster state with the new (incremental) desired shard allocation information. The updated cluster state is then published to the data nodes. Each data node will observe any change in shard allocation related to itself and take action to achieve the new shard allocation by: initiating creation of a new empty shard; starting recovery (copying) of an existing shard from another data node; or removing a shard. When the data node finishes a shard change, a request is sent to the master node to update the shard as having finished recovery/removal in the cluster state. The cluster state is used by allocation as a fancy work queue: the master node conveys new work to the data nodes, which pick up the work and report back when done.

  • See DesiredBalanceShardsAllocator#submitReconcileTask for the master node's cluster state update post-reconciliation.
  • See IndicesClusterStateService#doApplyClusterState for the data node hook to observe shard changes in the cluster state.
  • See ShardStateAction#sendShardAction for the data node request to the master node on completion of a shard state change.

Autoscaling

The Autoscaling API in ES (Elasticsearch) uses cluster and node level statistics to provide a recommendation for a cluster size to support the current cluster data and active workloads. ES Autoscaling is paired with an ES Cloud service that periodically polls the ES elected master node for suggested cluster changes. The cloud service will add more resources to the cluster based on Elasticsearch's recommendation. Elasticsearch by itself cannot automatically scale.

Autoscaling recommendations are tailored for the user based on user defined policies, composed of data roles (hot, frozen, etc.) and deciders. There's a public webinar on autoscaling, as well as the public Autoscaling APIs docs.

Autoscaling's current implementation is based primarily on storage requirements, as well as memory capacity for ML and the frozen tier. It does not yet support scaling related to search load. Paired with ES Cloud, autoscaling only scales upward, not downward, except for ML nodes, which do get scaled up and down.

Plugin REST and TransportAction entrypoints

Autoscaling is a plugin. All the REST APIs can be found in autoscaling/rest/. GetAutoscalingCapacityAction is the capacity calculation operation REST endpoint, as opposed to the other REST commands that get/set/delete the policies guiding the capacity calculation. The Transport Actions can be found in autoscaling/action/, where TransportGetAutoscalingCapacityAction is the entrypoint on the master node for calculating the optimal cluster resources based on the autoscaling policies.

How cluster capacity is determined

AutoscalingMetadata implements Metadata.ClusterCustom in order to persist autoscaling policies. Each Decider is an implementation of AutoscalingDeciderService. The AutoscalingCalculateCapacityService is responsible for running the calculation.

TransportGetAutoscalingCapacityAction.computeCapacity is the entry point to AutoscalingCalculateCapacityService.calculate, which creates an AutoscalingDeciderResults for each autoscaling policy. AutoscalingDeciderResults.toXContent then determines the maximum required capacity to return to the caller. AutoscalingCapacity is the base unit of a cluster resources recommendation.

The TransportGetAutoscalingCapacityAction response is cached to prevent concurrent callers overloading the system: the operation is expensive. TransportGetAutoscalingCapacityAction contains a CapacityResponseCache. TransportGetAutoscalingCapacityAction.masterOperation calls through the CapacityResponseCache, into the AutoscalingCalculateCapacityService, to handle concurrent callers.

Where the data comes from

The Deciders each pull data from different sources as needed to inform their decisions. The DiskThresholdMonitor is one such data source. The Monitor runs on the master node and maintains lists of nodes that exceed various disk size thresholds. DiskThresholdSettings contains the threshold settings with which the DiskThresholdMonitor runs.

Deciders

The ReactiveStorageDeciderService tracks information that demonstrates storage limitations are causing problems in the cluster. It uses an algorithm defined here. Some examples are:

  • information from the DiskThresholdMonitor to find out whether nodes are exceeding their storage capacity
  • number of unassigned shards that failed allocation because of insufficient storage
  • the max shard size and minimum node size, and whether these can be satisfied with the existing infrastructure

The ProactiveStorageDeciderService maintains a forecast window that defaults to 30 minutes. It only runs on data streams (ILM, rollover, etc.), not regular indexes. It looks at past index changes that took place within the forecast window to predict resources that will be needed shortly.

There are several more Decider Services, implementing the AutoscalingDeciderService interface.

Snapshot / Restore

Snapshot copies index data files from node local storage to a remote repository. These files can later be restored from the repository back to local storage to re-create the index. In addition to indices, it can also back up and restore the cluster state Metadata so that settings, templates, pipelines and other configurations can be preserved.

Snapshots are deduplicated: a data file is not copied if it has already been copied in a previous snapshot. Instead, a reference to the existing file is added to the metadata stored in the repository, effectively a ref-tracking system for the data files. This also means we can freely delete any snapshot without worrying about affecting other snapshots.

This snapshots Java package documentation provides a good explanation on how snapshot operations work.

Restoring a snapshot is a process which largely relies on index recoveries. The restore service initializes the process by preparing the shards of the indices being restored as unassigned, with snapshot as their recovery source (SnapshotRecoverySource), in the cluster state. These shards go through the regular allocation process to be allocated. They then recover on the target nodes by copying data files from the snapshot repository to local storage.

Both snapshot and restore are coordinated by the master node, while index data transfer is done by the data nodes. The communications from the master node to data nodes are always cluster state updates. Data nodes send transport requests to the master node to update the status. These requests, in the end, also trigger cluster state updates which can be further reacted upon by the data nodes until the processes are complete.

Snapshot Repository

A Repository must be created before any snapshot operation can take place. There are different types of repositories. The most common ones are file system based (FS) and cloud storage based (S3, GCS, Azure), which all extend the BlobStoreRepository class. A repository must be registered as writeable with at most a single cluster, while it may be registered as readable with multiple clusters. NOTE: registering a repository as writeable with multiple clusters can lead to data corruption. We try our best to detect such situations, but the detection is not completely foolproof.

The content structure of a repository is similar to the local index storage structure, with an indices folder holding index data separated by UUID and shard ID. Here is a simple example of a repository structure:

my-repository/
├── index-2       <-- The root blob file of the repository, also called repository generation file
├── index.latest
├── indices
│   └── 8K9JNuqjTnygrjKY8qmsiA   <-- UUID of the snapshotted index. Not the same index UUID in the cluster
│       ├── 0                    <-- shard 0
│       │   ├── __I0e0reaMQzuXv8fY1GYD2w           <-- data file
│       │   ├── __XqE3EVhOREWBnCHASLALtw           <-- another data file
│       │   ├── index-pPXvvdFWSmajXZcfrIwggA       <-- shard level generation file
│       │   └── snap-3kXOuTRmTFm6VMcEqPkKNQ.dat    <-- shard level snapshot info
│       └── meta-tdTzfI8BkmhGlJ2SvOok.dat          <-- index metadata
├── meta-3kXOuTRmTFm6VMcEqPkKNQ.dat    <-- cluster metadata
└── snap-3kXOuTRmTFm6VMcEqPkKNQ.dat    <-- snapshot information

See also the blobstore Java package documentation for more details on the repository structure.

The most important file in the repository is the index-N file, where N is a numeric generation number starting from 0. Its corresponding Java class is RepositoryData. This file holds the global state of the repository, including all valid snapshots and their corresponding indices, shards and index metadata. Every mutable operation on the repository, such as creating or deleting a snapshot, results in a new index-N file being created with an incremented generation number. The index.latest file stores the latest repository generation and is effectively a pointer to the latest index-N file.

The repository cannot be salvaged if the repository generation file is corrupted. This is the reason that we are very careful when updating this file, leveraging cluster consensus to ensure that only the latest master node updates it, to a generation that is accepted by the rest of the cluster members. The updating process also attempts to detect concurrent writes to avoid multiple clusters writing to the same repository. This is done by comparing the expected repository generation to the actual generation files in the repository. If this file is corrupted, as reported occasionally on SDHs, it is almost certain that some other external process has modified it.

If other files in the repository are corrupted, we can usually delete the broken snapshots and retain the good ones. The broken snapshots usually lead to exceptions being thrown when accessed by APIs such as the GetSnapshot API. Since v8.16.0, there is also the VerifyRepositoryIntegrity API that can be used to actively scan the repository and identify any corrupted snapshots.

The state of a repository must always transition through fully valid states in that there are no dangling references to non-existent blobs. This is an important property that guarantees the repository integrity as long as there is no external interference. We add new blobs before they become reachable from the root, then update the root blob, and only if the root-blob update succeeds do we delete any now-unreachable blobs. See also Creation of a Snapshot and Deletion of a Snapshot for more details.

It is worth noting that the repository's compatibility guarantee is more permissive than Elasticsearch's general version compatibility policy. A repository may contain snapshots from a version as old as 5.0.0 and, if it does, the repository layout must remain compatible (for reads and writes) with the oldest version in the repository so that these snapshots remain restorable in the corresponding versions. Once older snapshots are deleted, the repository will start using the new format when possible (see also the static IndexVersion constants in SnapshotsService).

Repository Management

A snapshot repository has two components: (1) a RepositoryMetadata containing the configuration stored as a Metadata.ProjectCustom in the cluster state; (2) the actual repository object created on the master node and each data node. A series of APIs are available to create, update, get and delete repositories. Each of these APIs is backed by a TransportMasterNodeAction which publishes a cluster state for the RepositoryMetadata change so that relevant nodes can update their local repository objects accordingly. The API call will only return after the repository object has been updated on all relevant nodes. For creating a repository, extra verification steps are performed which (1) attempt to create a temporary repository directly on the master node, as well as write and read a test file, before proceeding with the cluster state update, and (2) perform another write and read test after repository objects are created on all relevant nodes. These verification steps are enabled by default and can be disabled per request.

The core service class is RepositoriesService which all APIs eventually delegate to. It also implements ClusterStateApplier which performs the actual repository object creation, update and deletion.

Besides the APIs, reserved repositories are managed via file based settings. These repositories are managed by Elasticsearch service providers, such as the Elastic Cloud. File based settings is effectively a way to publish cluster state based on file contents. Hence, they also go through the same code path as the APIs under the hood. Reserved repositories cannot be modified via APIs.

While a new repository can be created at any time, deleting a repository has some restrictions. In general, a repository cannot be deleted if it is in use by either ongoing snapshots or restores, or if it is hosting mounted searchable snapshots. Since v9.4.0, the default snapshot repository cannot be deleted either. A default repository is meant to be the repository used by ILM and SLM when no repository is explicitly specified. Updating a repository usually involves closing the existing repository first and creating a new one. Therefore, updates are often subject to the same restrictions as deletion.

Cloud storage backed (S3, GCS, Azure) repositories require network access to the storage services. Hence, they have a concept of clients, which manage the network requests. At least the default client must be configured for the cluster; it is used by a repository when no client is explicitly specified. Multiple clients can be added, via elasticsearch.yml, to the same cluster and used by different repositories to spread snapshots to different locations if so desired.

We allow different implementations of the same cloud storage type to be used as long as they are compatible in both APIs and performance characteristics. For example, many storage services claim S3 compatibility, but they may fall short under load or even just return outright incorrect responses in some corner cases. The RepositoryAnalyze API can be used to proactively test the compatibility, which we suggest on SDHs from time to time.

Creation of a Snapshot

Creating a snapshot is the most involved snapshot operation because it requires work to be done on both master and data nodes. In contrast, cloning snapshot, deleting snapshot and cleaning repository all happen entirely on the master node.

The following Java classes are mostly responsible for the snapshot creation process:

  1. SnapshotsService - runs on the master node and coordinates the overall snapshot process.
  2. SnapshotsServiceUtils - utility class separated from SnapshotsService to reduce file length.
  3. SnapshotShardsService - runs on data node and manages the actual snapshot of individual shards.
  4. BlobStoreRepository - used by both master and data nodes to read and write snapshot data and metadata.

As discussed earlier, a snapshot operation always starts with a cluster state update triggered by a transport request. Such is the case for snapshot creation. When SnapshotsService receives the request, it computes all indices and their shards required for the snapshot, creates an object representing this snapshot, and stores it in the cluster state. The overarching object representing snapshot creation in the cluster state is SnapshotsInProgress, which is essentially a map keyed by repository name with values being a list of ongoing snapshots (SnapshotsInProgress.Entry) for that repository. It's a list because the order is important: snapshots within each repository operate as a queue, such that each shard's snapshots run in order. Different snapshots may complete in different orders if they target different shards. Each ongoing snapshot further tracks its overall state (SnapshotsInProgress#State) as well as the states (SnapshotsInProgress#ShardState) of each shard required for the snapshot.

When the snapshot creation entry (SnapshotsInProgress.Entry) is first added to the cluster state, its shards can be in different ShardStates depending on each shard's status and existing snapshot activity (an illustrative sketch of this bookkeeping follows the list below):

  • If the shard is started and has no other active snapshot activity, its state is set to INIT indicating it is ready to be snapshotted by the data node hosting it. Shards in this state cannot relocate due to SnapshotInProgressAllocationDecider.
  • If the shard is started but is still running another snapshot, its state is set to QUEUED indicating it will be snapshotted later when the ongoing snapshots and any other snapshots queued before it are finished.
  • If the shard is relocating or initializing, its state is set to WAITING which will be changed to a new state once the shard finishes relocation or initialization.
  • If the index no longer exists or the shard is unassigned, the shard state is set to MISSING. This state is final. A snapshot creation fails on a shard with this state if it is issued with partial=false. Note snapshot creation is always issued with partial=true in Elastic Cloud so that the snapshot does not fail entirely due to temporary shard unavailability.
  • If the node hosting the shard is being shutdown, the shard state is set to PAUSED_FOR_NODE_REMOVAL. This state will be updated when the node finishes the shutdown process or the shard state changes.
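
To make this bookkeeping concrete, here is an illustrative sketch of its shape, using simplified stand-ins for the real classes (the real SnapshotsInProgress.Entry and its states carry much more information, such as generations, failure reasons, and node IDs):

```java
import java.util.List;
import java.util.Map;

// Illustrative stand-ins only.
enum ShardStateSketch { INIT, QUEUED, WAITING, MISSING, PAUSED_FOR_NODE_REMOVAL, SUCCESS, FAILED, ABORTED }
enum StateSketch { STARTED, SUCCESS, PARTIAL, FAILED, ABORTED }

record ShardIdSketch(String index, int shard) {}

record EntrySketch(String snapshotName, StateSketch state,
                   Map<ShardIdSketch, ShardStateSketch> shards) {}

// Keyed by repository name; the list order matters: snapshots within a
// repository form a queue, and each shard's snapshots run in order.
record SnapshotsInProgressSketch(Map<String, List<EntrySketch>> byRepository) {}
```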

When a data node (SnapshotShardsService) receives the updated cluster state with a new snapshot entry, it takes the shards with the INIT state that are hosted on itself and creates a shard snapshot task for each of them. The shard state is computed for all shards involved in the snapshot at once when the snapshot entry is created, so a large snapshot can easily have thousands of shards in the INIT state, i.e., ready to be snapshotted. To avoid overwhelming the data nodes, a dedicated snapshot thread pool as well as a ThrottledTaskRunner are used to keep the number of concurrently running shard snapshots under control. Priority is given to snapshots that started earlier. We also order by shard to limit the number of incomplete shard snapshots (see also ShardSnapshotTaskRunner and ShardSnapshotTaskRunner#COMPARATOR).

The lifecycle of each shard snapshot is also tracked in-memory on the data node with IndexShardSnapshotStatus. The status is indicated by IndexShardSnapshotStatus#Stage, which is updated at various points during the process. When a shard snapshot task runs, it first acquires an index commit of the shard so that the files to be copied remain available throughout the shard snapshot process without being deleted by ongoing indexing activities. It then writes a new shard level generation file (index-<UUID>.data, Java class BlobStoreIndexShardSnapshots). This is basically a shard level catalog file pointing to all the valid shard snapshots. Each snapshot creates a new one; the UUID is used to avoid name collisions. Previous shard generation files are not deleted because they may still be needed if the current shard snapshot fails. Following that, shard level data files are copied to the repository with BlobStoreRepository#doSnapshotShard, BlobStoreRepository#snapshotFile etc. After all data files are uploaded, it writes a shard level snapshot file (snap-<UUID>.dat, Java class BlobStoreIndexShardSnapshot) indicating which data files are included in this shard snapshot. Note the data file's physical name is replaced with a double underscore (__) followed by a UUID to avoid name collisions. The actual name is mapped and stored in the shard level snapshot file.

It is worth noting that the shard level generation file (index-<UUID>.data) can be reconstructed from all shard level snapshot files (snap-<UUID>.dat), so it is technically redundant. However, listing and reading all shard snapshot files can be rather inefficient since there could be hundreds or thousands of these files. Hence, having the shard level generation file helps with performance for deletion operations, which need to read only a single file to decide what can be deleted at the shard level.

Once the shard snapshot is completed successfully, the data node releases the previously acquired index commit and sends a transport request (UpdateIndexShardSnapshotStatusRequest) with the new shard generation (ShardGeneration) to the master node to update its shard status (SnapshotsInProgress#ShardState) in the cluster state. If there is any QUEUED shard snapshot for the same shard, the master node (SnapshotsService) updates the next one's status to INIT so that it can run. The master responds to the data node only after the cluster state is published.

When all shards in a snapshot are completed, the master node performs a finalization step (SnapshotsService#SnapshotFinalization and BlobStoreRepository#finalizeSnapshot) which does the following:

  1. Create a SnapshotInfo object representing the completed snapshot for serialization.
  2. Collect and write the latest Metadata and IndexMetadata relevant to this snapshot.
  3. Write the snapshot metadata file snap-<UUID>.dat (Java class SnapshotInfo).
  4. Create a new root blob (repository generation file, index-N) with incremented generation number and updated content including the new snapshot and publish a cluster state to accept this new generation as the current/safe (BlobStoreRepository#writeIndexGen).

Step 4 is the most critical one. The root blob is intentionally written at the very last step so that any prior failure only leaves some redundant files in the repository, which will be cleaned up in due time. A snapshot is not completed until the root blob is successfully updated. To ensure consistency, updating the root blob is a three-step process leveraging cluster consensus:

  1. Picks a new pending repository generation number which is greater than the current pending generation and publishes a cluster state update for it. Both repository generation number and pending generation numbers are part of RepositoryMetadata.
  2. If the previous step succeeds, write the new root blob with the pending generation number.
  3. Publish another cluster state to set the current/safe repository generation to the new pending generation.

If the master fails over during snapshot creation, the above steps ensure that only the new master can successfully update the root blob, avoiding data corruption.
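
The following standalone sketch models the three-step protocol with in-memory stand-ins (the fields and blob map here are hypothetical simplifications; the real logic lives in BlobStoreRepository#writeIndexGen and the cluster state update tasks it submits):

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, in-memory model of the three-step root blob update.
class RootBlobUpdateSketch {
    // Stand-ins for the generations tracked in RepositoryMetadata.
    static long safeGeneration = 7;
    static long pendingGeneration = 7;
    // Stand-in for the blob store: blob name -> content.
    static final Map<String, String> blobs = new HashMap<>();

    static void writeIndexGen(String newRepositoryData) {
        // Step 1: publish a cluster state picking a new pending generation.
        long newGen = pendingGeneration + 1;
        pendingGeneration = newGen; // cluster consensus happens here in reality

        // Step 2: write the new root blob; fail if it already exists, since
        // that would indicate another writer (e.g. a superseded master).
        String blobName = "index-" + newGen;
        if (blobs.putIfAbsent(blobName, newRepositoryData) != null) {
            throw new IllegalStateException("duplicate root blob " + blobName);
        }

        // Step 3: publish another cluster state marking the new
        // generation as current/safe.
        safeGeneration = newGen;
    }

    public static void main(String[] args) {
        writeIndexGen("{\"snapshots\": [\"snap-1\"]}");
        System.out.println("safe generation: " + safeGeneration);
        System.out.println("root blob: index-" + safeGeneration);
    }
}
```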

Multiple snapshots can run concurrently in the same repository, but the process is sequential at the shard level, i.e. only one shard snapshot of a given shard can be in the INIT state at any time. Snapshot deletions and creations are mutually exclusive. See also Deletion of a Snapshot.

Shard snapshot pausing

When a node is shutting down, it must vacate all shards via relocation. Since shards being snapshotted (shard snapshot status INIT) cannot relocate, we need a way to move these shards out of the INIT state so they do not stall the shutdown process. This is where the shard snapshot pausing mechanism comes into play.

When a node shutdown is initiated, SnapshotsService reacts to the new shutdown metadata by updating SnapshotsInProgress#nodesIdsForRemoval, which tracks the IDs of shutting-down nodes. When this change is published and observed by the shutting-down data node (SnapshotShardsService), it pauses its shard snapshots: it first sets the shard snapshot status to PAUSING, which is checked regularly by the file uploading process (BlobStoreRepository#snapshotFile) and causes a PausedSnapshotException to be thrown to abort the shard snapshot. The data node then notifies the master node about the status change with the same status update request (UpdateIndexShardSnapshotStatusRequest) used in the happy path. The master node updates the shard state to PAUSED_FOR_NODE_REMOVAL upon receiving the notification. From this point on, the shard can relocate as normal. When the shard is started on the target node, SnapshotsService observes the shard state change and transitions the shard snapshot status back to INIT.
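
The cooperative pause check can be sketched as follows (a standalone simplification; the stage enum and uploader stand in for IndexShardSnapshotStatus and BlobStoreRepository#snapshotFile):

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical model of a shard snapshot upload loop that honours PAUSING.
class ShardSnapshotPauseSketch {
    enum Stage { INIT, PAUSING, DONE }

    static class PausedSnapshotException extends RuntimeException {}

    static final AtomicReference<Stage> stage = new AtomicReference<>(Stage.INIT);

    static void uploadFile(int chunks) {
        for (int chunk = 0; chunk < chunks; chunk++) {
            // The uploader checks the in-memory status between chunks and
            // aborts promptly instead of finishing a potentially long upload.
            if (stage.get() == Stage.PAUSING) {
                throw new PausedSnapshotException();
            }
            // ... upload one chunk to the repository ...
        }
    }

    public static void main(String[] args) {
        stage.set(Stage.PAUSING); // node shutdown observed
        try {
            uploadFile(10);
        } catch (PausedSnapshotException e) {
            System.out.println("shard snapshot paused; shard may now relocate");
        }
    }
}
```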

If a node is already shutting down when a new snapshot creation request arrives, the relevant shard snapshot is created with PAUSED_FOR_NODE_REMOVAL as its initial state, unless there is already an ongoing shard snapshot in PAUSED_FOR_NODE_REMOVAL for the same shard, in which case the new shard snapshot starts out as QUEUED.

When the node shutdown completes and its associated shutdown metadata is removed from the cluster state, SnapshotsService will also remove the node ID from SnapshotsInProgress#nodesIdsForRemoval.

Deletion of a Snapshot

Both completed and ongoing snapshots can be deleted. Deletion must run exclusively in a repository, unless the snapshot being deleted has not started yet (e.g. all its shards are in the QUEUED state), in which case it can be removed from the cluster state right away without touching the repository content.

If the deletion requires any file removal in the repository, SnapshotsService creates/updates the SnapshotDeletionsInProgress in the cluster state to track the new deletion. If the snapshot is currently running, it also updates any incomplete shard snapshots to ShardState.ABORTED for the data node (SnapshotShardsService) to react once the cluster state is published. The data node goes through a similar process to the shard snapshot pausing but with a different exception (AbortedSnapshotException) to interrupt the shard snapshot process and sends a request back to the master node to update the corresponding status (ShardState.FAILED) tracked in the cluster state. Once all shard snapshots stop, deletion will proceed to remove relevant files from the repository as well as create a new root blob (index-N) with the same mechanism described in the snapshot creation section. File deletions (BlobStoreRepository#SnapshotsDeletion) happen entirely on the master node.

Clone of a Snapshot

TODO: Clone is not used in Elastic Cloud Serverless.

Cleaning a Repository

Repository cleanup is cluster-wide exclusive and must run by itself. It does not actually clean up anything more than a regular snapshot deletion does. It was useful in the early days when we had some long-since-fixed leaks that needed cleaning up in ECH. It is not used in Elastic Cloud Serverless.

Restoring a Snapshot

RestoreService is responsible for handling the initial restore request and preparing the unassigned shards with the snapshot as their recovery source in the cluster state. Once a shard is allocated on a data node, the recovery process kicks in and eventually calls into IndexShard#restoreFromSnapshot, which delegates to BlobStoreRepository#restoreShard to copy data files from the repository to local storage.

Detecting Multiple Writers to a Single Repository

This is a best effort attempt to prevent repository corruption due to concurrent writes from multiple clusters. When writing the root blob (index-N), we cross compare the cached and expected repository generation (see BlobStoreRepository#latestKnownRepoGen and BlobStoreRepository#latestKnownRepositoryData) to the generation physically found in the repository. The write also fails if the blob already exists, since this indicates that some other cluster attempted to write to the repository. We do that in all repositories that support such a check, which includes FS/GCS/Azure since forever and S3 since 9.2 (but not HDFS). A RepositoryException is thrown on any mismatch or conflict. The repository is marked as corrupted by setting its generation number to RepositoryData.CORRUPTED_REPO_GEN to block further write operations.

Task Management / Tracking

The tasks infrastructure is used to track currently executing operations in the Elasticsearch cluster. The Task management API provides an interface for querying, cancelling, and monitoring the status of tasks.

Each individual task is local to a node, but can be related to other tasks, on the same node or other nodes, via a parent-child relationship.

[!NOTE] The Task management API is experimental/beta; its status and outstanding issues can be tracked here.

Task tracking and registration

Tasks are tracked in-memory on each node in the node's TaskManager; new tasks are registered via one of the TaskManager#register methods. Registration of a task creates a Task instance with a unique-for-the-node numeric identifier, populates it with some metadata and stores it in the TaskManager.

The register methods will return the registered Task instance, which can be used to interact with the task. The Task class is often sub-classed to include task-specific data and operations. Specific Task subclasses are created by overriding the createTask method on the TaskAwareRequest passed to the TaskManager#register methods.

When a task is completed, it must be unregistered via TaskManager#unregister.
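
A minimal registration sketch, assuming an existing TaskManager and TaskAwareRequest (the action name below is made up; real actions pass their own constants):

```java
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskAwareRequest;
import org.elasticsearch.tasks.TaskManager;

class TaskRegistrationSketch {
    // Sketch: register a task for a unit of work, then always unregister it,
    // even if the work fails, so the task does not leak.
    static void runTracked(TaskManager taskManager, TaskAwareRequest request) {
        Task task = taskManager.register("transport", "indices:data/read/example", request);
        try {
            // ... perform the work associated with the task ...
        } finally {
            taskManager.unregister(task);
        }
    }
}
```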

A note about task IDs

The IDs given to a task are numeric, supplied by a counter that starts at zero and increments over the life of the node process. They are therefore unique within the individual node process, but can collide with IDs allocated after the node restarts, or with IDs allocated on other nodes.

To better identify a task in the cluster scope, a tuple of persistent node ID and task ID is used. This is represented in code using the TaskId class and serialized as the string {node-ID}:{local-task-ID} (e.g. oTUltX4IQMOUUVeiohTt8A:124). While TaskId is safe to use to uniquely identify tasks currently running in a cluster, it should be used with caution as it can collide with tasks that have run in the cluster in the past (i.e. tasks that ran prior to a cluster node restart).
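
A standalone sketch of that string format (an illustrative simplification of org.elasticsearch.tasks.TaskId):

```java
// Hypothetical simplification of the {node-ID}:{local-task-ID} format.
record TaskIdSketch(String nodeId, long localTaskId) {
    static TaskIdSketch parse(String s) {
        int sep = s.lastIndexOf(':');
        return new TaskIdSketch(s.substring(0, sep), Long.parseLong(s.substring(sep + 1)));
    }

    @Override
    public String toString() {
        return nodeId + ":" + localTaskId;
    }

    public static void main(String[] args) {
        TaskIdSketch id = parse("oTUltX4IQMOUUVeiohTt8A:124");
        System.out.println(id.nodeId() + " / " + id.localTaskId());
    }
}
```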

What Tasks Are Tracked

The purpose of tasks is to provide management and visibility of the cluster workload. There is some overhead involved in tracking a task, so they are best suited to tracking non-trivial and/or long-running operations. For smaller, more trivial operations, visibility is probably better implemented using telemetry APIs.

Some examples of operations that are tracked using tasks include:

Tracking a Task Across Threads and Nodes

ThreadContext

All ThreadPool threads have an associated ThreadContext. The ThreadContext contains a map of headers which carry information relevant to the operation currently being executed. For example, a thread spawned to handle a REST request will include the HTTP headers received in that request.

When threads submit work to an ExecutorService from the ThreadPool, those spawned threads will inherit the ThreadContext of the thread that submitted them. When TransportRequests are dispatched, the headers from the sending ThreadContext are included and then loaded into the ThreadContext of the thread handling the request. In these ways, ThreadContext is preserved across threads involved in an operation, both locally and on remote nodes.
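
The mechanism can be modelled with a plain ThreadLocal and a wrapping executor (a standalone sketch; the real implementation is ThreadContext together with its context-preserving executors):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Standalone model of context propagation across thread pools.
class ContextPropagationSketch {
    static final ThreadLocal<Map<String, String>> CONTEXT =
        ThreadLocal.withInitial(HashMap::new);

    // Capture the submitter's headers and restore them in the worker thread.
    static Runnable preserving(Runnable task) {
        Map<String, String> captured = new HashMap<>(CONTEXT.get());
        return () -> {
            Map<String, String> previous = CONTEXT.get();
            CONTEXT.set(captured);
            try {
                task.run();
            } finally {
                CONTEXT.set(previous);
            }
        };
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        CONTEXT.get().put("X-Opaque-Id", "my-request");
        executor.submit(preserving(() ->
            System.out.println("worker sees: " + CONTEXT.get().get("X-Opaque-Id"))));
        executor.shutdown();
    }
}
```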

Headers

When a task is registered by a thread, a subset (defined by Task#HEADERS_TO_COPY and any ActionPlugins loaded on the node) of the headers from the ThreadContext are copied into the Task's set of headers.

One such header is X-Opaque-Id. This is a string that can be submitted on REST requests, and it will be associated with all tasks created on all nodes in the course of handling that request.

Parent/child relationships

Another way to track the operations of a task is by following the parent/child relationships. When registering a task it can be optionally associated with a parent task. Generally if an executing task initiates sub-tasks, the ID of the executing task will be set as the parent of any spawned tasks (see ParentTaskAssigningClient, TransportService#sendChildRequest and TaskAwareRequest#setParentTask for how this is implemented for TransportActions).

Kill / Cancel A Task

Some long-running tasks are implemented to be cancel-able. Cancellation of a task and its descendants can be done via the Cancel Task REST API or programmatically using TaskManager#cancelTaskAndDescendants. Perhaps the most common use of cancellation you will see is cancellation of TransportActions dispatched from the REST layer when the client disconnects; to facilitate this we use the RestCancellableNodeClient.

In order to support cancellation, the Task instance associated with the task must extend CancellableTask. It is the job of any workload tracked by a CancellableTask to periodically check whether it has been cancelled and, if so, finish early. We generally wait for the result of a cancelled task, so tasks can decide how they complete upon being cancelled; typically they complete exceptionally with TaskCancelledException.
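
A minimal cooperative-cancellation loop might look like the following sketch (CancellableTask#isCancelled is real; the batch structure and exception message are illustrative):

```java
import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.tasks.TaskCancelledException;

class CancellationCheckSketch {
    // Sketch: a long-running workload that checks for cancellation
    // between batches and finishes early when cancelled.
    static void processBatches(CancellableTask task, int totalBatches) {
        for (int batch = 0; batch < totalBatches; batch++) {
            if (task.isCancelled()) {
                throw new TaskCancelledException("task cancelled after " + batch + " batches");
            }
            // ... process one batch of work ...
        }
    }
}
```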

When a Task extends CancellableTask the TaskManager keeps track of it and any child tasks that it spawns. When the task is cancelled, requests are sent to any nodes that have had child tasks submitted to them to ban the starting of any further children of that task, and any cancellable child tasks already running are themselves cancelled (see BanParentRequestHandler).

When a cancellable task dispatches child requests through the TransportService, it registers a proxy response handler that will instruct the remote node to cancel that child and any lingering descendants in the event that it completes exceptionally (see UnregisterChildTransportResponseHandler). A typical use case is a timeout: when no response is received within the timeout, the sending node cancels the remote action and completes with a timeout exception.

Publishing Task Results

A list of tasks currently running in a cluster can be requested via the Task management API, or the cat task management API. The former returns each task represented using TaskResult; the latter returns a more compact CAT representation.

Some ActionRequests allow the results of the actions they spawn to be stored upon completion for later retrieval. If ActionRequest#getShouldStoreResult returns true, a TaskResultStoringActionListener will be inserted into the chain of response listeners. TaskResultStoringActionListener serializes the TaskResult of the TransportAction and persists it in the .tasks index using the TaskResultsService.

The Task management API also exposes an endpoint where a task ID can be specified; this form of the API returns currently running tasks, or completed tasks whose results were persisted. Note that although we use TaskResult to return task information from all the JSON APIs, the error or response fields will only ever be populated for stored tasks that are already completed.

Persistent Tasks

Up until now we have discussed only ephemeral tasks. If we want a task to survive node failures, it needs to be registered as a persistent task at the cluster level.

Plugins can register persistent task definitions by implementing PersistentTaskPlugin and returning one or more PersistentTasksExecutor instances. These are collated into a PersistentTasksExecutorRegistry which is provided to the PersistentTasksNodeService active on each node in the cluster, and a PersistentTasksClusterService active on the master. A PersistentTasksExecutor can declare either project or cluster scope, but not both. A project-scoped task cannot access data in a different project.

The PersistentTasksClusterService runs on the master to manage the set of running persistent tasks. It periodically checks that all persistent tasks are assigned to live nodes and handles the creation, completion, removal and updates-to-the-state of persistent task instances in the cluster state (see PersistentTasksCustomMetadata and ClusterPersistentTasksCustomMetadata).

The PersistentTasksNodeService monitors the cluster state to:

If a node leaves the cluster while it has a persistent task allocated to it, the master will re-allocate that task to a surviving node. To do this, it creates a new PersistentTasksCustomMetadata.PersistentTask entry with a higher #allocationId. The allocation ID is included any time the PersistentTasksNodeService communicates with the PersistentTasksClusterService about the task; it allows the PersistentTasksClusterService to ignore persistent task messages originating from stale allocations.

Some examples of the use of persistent tasks include:

Integration with APM

Tasks are integrated with the Elasticsearch APM infrastructure. They implement the Traceable interface, and spans are published to represent the execution of each task.

Cross Cluster Replication (CCR)

(Brief explanation of the use case for CCR)

(Explain how this works at a high level, and details of any significant components / ideas.)

Indexing / CRUD

The Distributed team owns Elasticsearch's write path. A typical write begins as an HTTP/REST request, is routed to the appropriate primary shard, is applied to the shard's engine and translog, and is finally replicated across replicas before an ack is sent back to the client.

Note that in serverless Elasticsearch, there is no replication process needed. The same reliability and fault tolerance are instead achieved by uploading the translog to a blob store and relying on the store's durability and fault tolerance guarantees. See this blog post for more details.

The Distributed team also owns select parts of the read path (e.g. real-time GET requests targeting the translog), but broader search capabilities like query execution, scoring, and aggregations fall under the Search team.

This section follows a bulk index request end to end in stateful Elasticsearch, using RestBulkAction as the starting point.

For a higher-level overview of the read and write paths, see Reading and writing documents.

The Write Path

Coordinator: REST to Transport

A user sends a POST to /_bulk with a newline-delimited body of index, update, or delete actions (see documentation). The HTTP stack (described in HTTP Server) hands the request to RestBulkAction, which parses the body into a BulkRequest containing one IndexRequest (or UpdateRequest / DeleteRequest) per requested operation. It then executes a TransportBulkAction on the receiving node (see the Transport section for more details).

Single-document requests (PUT or POST to /{index}/_doc/{id}) follow the same path: RestIndexAction builds an IndexRequest that TransportIndexAction wraps in a one-item BulkRequest before delegating to TransportBulkAction.

The node that received the request is the coordinating node for this request. On that node, TransportBulkAction coordinates the rest of the write path.

Before routing to shards, TransportAbstractBulkAction (the parent class of TransportBulkAction) resolves the ingest pipeline for each item by checking the request's pipeline parameter, the index's default_pipeline, and the final_pipeline from the matching index template. If any item has a pipeline, the coordinating node either executes the pipelines locally (if it is an ingest node, see the Node Roles section) or forwards the entire request to an ingest node via IngestActionForwarder. Once ingest finishes, each processed request has its pipeline and finalPipeline fields reset to "_none" and the request is re-submitted through TransportAbstractBulkAction#applyPipelinesAndDoInternalExecute. On this second pass hasPipeline is false for every item and the ingest path is skipped.

TransportBulkAction also checks whether the target index or data stream exists, auto-creating missing indices via AutoCreateAction and rolling over data streams marked for lazy rollover before proceeding.

It then eventually runs a BulkOperation on the coordinator. BulkOperation is the class that resolves each item to a concrete index and shard before sending work to primaries.

If the client did not supply an _id, one is generated before routing. The latest applied ClusterState supplies routing for the chosen index (routingTable, ShardRouting). A routing key is chosen: by default the document _id, though the request may set routing explicitly. The routing key is then hashed to a shard number by IndexRouting.hashToShardId. For time series indices the key is derived from the document's dimension fields. For indices with a routing_path configured, it is derived from the configured value.

The coordinator now knows which shard holds the document. It still needs the current primary node for that shard.

Note that for a bulk request (multiple documents in one HTTP call), BulkOperation groups items by shard before dispatching each shard’s work to the appropriate primaries.
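
The routing step can be sketched as follows (a standalone simplification; the real IndexRouting implementation uses a Murmur3 hash plus a routing factor so shard assignment stays stable across shrink and split):

```java
// Standalone simplification of document-to-shard routing.
class ShardRoutingSketch {
    // Stand-in hash; Elasticsearch actually uses Murmur3 over the routing key.
    static int hash(String routingKey) {
        return routingKey.hashCode();
    }

    static int shardId(String id, String routing, int numberOfShards) {
        // The routing key defaults to the document _id unless routing is set.
        String key = routing != null ? routing : id;
        // floorMod keeps the result non-negative for negative hashes.
        return Math.floorMod(hash(key), numberOfShards);
    }

    public static void main(String[] args) {
        System.out.println(shardId("my-doc-id", null, 5));
        System.out.println(shardId("my-doc-id", "user-42", 5));
    }
}
```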

Primary Routing & Execution

Primary Routing

Elasticsearch uses primary–backup replication. The primary shard copy defines the ordered write history and replicas apply the same operations.

After matching each document to a shard, BulkOperation will hand over the rest of the execution to a TransportShardBulkAction. This is a subclass of TransportReplicationAction, which holds the core replication framework logic for document writes and deletes.

The coordinator uses the cluster state routing table (Cluster State) to send the shard request to the node that currently holds the primary.

In practice, the request can take more than one transport hop before it executes on the primary. The receiving node re-samples cluster state, and if the active primary for that shard is assigned elsewhere, TransportReplicationAction forwards the replication request to that node. That “chase the primary” step can repeat while routing metadata converges. To avoid redirect loops between nodes whose cluster state versions differ, the request carries a routedBasedOnClusterVersion field. Each forward updates it to the forwarding node’s cluster state version so the next receiver is on at least that version before it redirects again. Retries and awaiting cluster state updates are bounded by the request’s timeout. On each retry attempt the node checks whether the timeout has expired, and if so fails the request.

Once the request reaches the node that actually hosts the primary, it will get wrapped in a ConcreteShardRequest that includes the shard’s primary term and target allocation id. That lets the primary and replicas refuse operations that were built for a superseded primary generation.

Primary Execution

On the primary node, TransportShardBulkAction applies the shard's bulk items through IndexShard, the single entry point for shard-level work (see IndexShard in Engine & Store sections for more details).

The primary will first try to acquire an operation permit via IndexShardOperationPermits. Note that recovery and relocation are operations that can intentionally grab all available permits with the goal of blocking in-flight writes. In that case, the incoming operation is queued and will resume once the block lifts. If the permit is successfully acquired, the primary will then validate and prepare the operation: create the mapping (if not already existing), parse the source, etc. IndexShard will then hand over the request to the engine (typically InternalEngine, see Engine section for more details) that generates the sequence number for the operation, updates Lucene via an IndexWriter, and then appends the operation to the Translog.

Note that the translog write happens after the Lucene update. The translog is primarily a durability and recovery log for acknowledged operations, not a write-ahead log in the classic database sense. The translog entry type also depends on the Lucene outcome. A successful Lucene apply writes the full operation, while a failure with an already-assigned seq_no writes a no-op to preserve the sequence number history.
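
That ordering can be sketched as follows (hypothetical Lucene and translog stand-ins; the real flow, including richer failure handling, lives in InternalEngine):

```java
import java.io.IOException;

// Hypothetical simplification of "apply to Lucene, then log to the translog".
class TranslogOrderingSketch {
    interface Lucene { void addOrUpdate(String doc, long seqNo) throws IOException; }
    interface Translog {
        void appendIndexOp(String doc, long seqNo);
        void appendNoOp(long seqNo, String reason);
    }

    static void applyIndexOp(Lucene lucene, Translog translog, String doc, long seqNo) {
        try {
            lucene.addOrUpdate(doc, seqNo);
            // Success: log the full operation for durability and recovery.
            translog.appendIndexOp(doc, seqNo);
        } catch (IOException e) {
            // The seq_no was already assigned: log a no-op so the
            // sequence number history stays gap-free.
            translog.appendNoOp(seqNo, e.getMessage());
        }
    }
}
```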

Note that updates and deletes use soft deletes in Lucene (tombstones and retention policies), not physical deletion, and are therefore quite similar to writes. See Engine & Store and Segment Merges for more details.

Replication

After the primary applies the operation, a ReplicationOperation wrapper object will coordinate the rest. It loads the replication group after the primary write (to avoid missing new recovery targets), captures the global checkpoint and related metadata from the primary, then forwards the request to each replica in that group. Each replica applies the same operation (using the same seq_no and primary term) to its engine and returns checkpoint information.

Allocation ids that are still in the in-sync set but no longer have a live shard in routing are marked stale on the primary. The master will update the cluster state to drop those allocation ids from the index’s in-sync set before the write is acknowledged back to the client. Similarly, if a replica fails to execute the replicated operation, it will also get dropped from the in-sync set before the write is acknowledged back. The primary will not acknowledge the replication operation as “done” while the cluster state still reports an in-sync copy that has failed the operation. A ref-counting listener also waits for the primary shard application, each replica response, and post-replication hooks. The client only gets an ack after that coordinated completion, together with wait_for_active_shards and translog / syncAfterWrite behaviour (see Translog and TransportWriteAction).

```mermaid
sequenceDiagram
    participant C as Client
    participant CN as Coordinating node
    participant P as Primary shard copy
    participant R as Replica shard copy
    participant M as Elected master

    C->>CN: REST index (RestIndexAction → IndexRequest)
    CN->>CN: Bulk coordinator: route to shard
    CN->>P: Shard bulk on primary (TransportShardBulkAction)
    P->>P: IndexShard / engine (Translog + Lucene)
    Note over P: ReplicationOperation: stale in-sync ids, then replica fan-out
    P->>R: Replica request (seq_no, primary_term, checkpoints)
    alt replica applies successfully
        R->>R: Engine apply (Translog + Lucene)
        R-->>P: ReplicaResponse (checkpoints)
    else replica fails replication
        R-->>P: failure
        P->>M: remoteShardFailed (shard-failed -> update in-sync / routing)
        M->>M: Cluster state task (drop failed copy from in-sync set, etc.)
        M-->>P: shard-failed applied (listener completes)
    end
    P-->>CN: Primary result + shard info (after all pending replica / master / post steps)
    CN-->>C: HTTP response (may include shard failures in body)
```

When replication and the ref-counted coordination above have finished, the primary completes the shard-level request and returns the result to the node that issued that shard-level transport request. For a normal REST bulk request, that issuer is the HTTP request coordinating node, which then completes the write action and sends the HTTP response to the client. The same shard-level transport requests can also be generated internally, for example via TriggeredWatchStore#putAll in Watcher. The issuer is then whichever node runs that internal client, and the primary returns the result to that transport caller.

Primary Terms & Sequence Numbers

Every successful index, update, or delete operation on a shard is tagged with a sequence number (seq_no) and a primary term. Together they identify that logical change for replication, recovery, and optimistic concurrency. Sentinel values and helpers are defined on SequenceNumbers (for example UNASSIGNED_SEQ_NO on the way into the primary, and UNASSIGNED_PRIMARY_TERM before a shard is operational).

The primary term is a monotonically increasing counter per shard, recorded in cluster state metadata (see IndexMetadata#primaryTerm). It advances when a new primary must take over following a primary failure, e.g. when a replica is promoted to primary because the current primary crashed or became unavailable. Note that the primary term is not increased during graceful primary relocation. The shard stamps all operations with the current term.

The sequence number is another monotonically increasing counter, identifying each mutation in the shard's history. It is assigned by the primary's InternalEngine before it applies an operation locally. Replicas apply the same seq_no supplied by the primary; their engine records that sequence number as "seen" before indexing. After the operation, the local checkpoint also gets updated (see the Checkpoints subsection below).

Both values are stored in Lucene for each live document. SeqNoFieldMapper indexes _seq_no as a numeric field (for search and sort) and stores the primary term in doc values. If two copies ever share the same _seq_no (for example after a primary promotion), the copy with the higher primary term wins. Clients can also supply if_seq_no / if_primary_term on write requests (see IndexRequest#setIfSeqNo), in which case the engine will use optimistic concurrency control against the last known sequence number.
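
The conditional-write check can be sketched as a plain compare (an illustrative simplification; the engine performs the real check against its live version map and Lucene):

```java
// Hypothetical simplification of optimistic concurrency control on writes.
class VersionConflictSketch {
    record DocVersion(long seqNo, long primaryTerm) {}

    static void checkConditionalWrite(DocVersion current, long ifSeqNo, long ifPrimaryTerm) {
        // The write proceeds only if the client saw the latest change.
        if (current.seqNo() != ifSeqNo || current.primaryTerm() != ifPrimaryTerm) {
            throw new IllegalStateException(
                "version conflict: current is [" + current.seqNo() + ", " + current.primaryTerm()
                    + "], required [" + ifSeqNo + ", " + ifPrimaryTerm + "]");
        }
    }

    public static void main(String[] args) {
        checkConditionalWrite(new DocVersion(5, 1), 5, 1);     // ok
        try {
            checkConditionalWrite(new DocVersion(6, 1), 5, 1); // stale client
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```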

```mermaid
flowchart LR
    subgraph prim["Primary shard"]
        MD[IndexMetadata.primaryTerm in cluster state]
        TR[ReplicationTracker operation term]
        LCT[LocalCheckpointTracker next seq_no]
        ENG[InternalEngine assigns seq_no + term]
        MD --> TR
        TR --> ENG
        LCT --> ENG
    end
    ENG --> TX[ConcreteShardRequest / replica transport]
    TX --> REP[Replica engine applies fixed seq_no + term]
```

Checkpoints & Gaps

Each shard copy tracks how far it has applied the shared history of seq_no values using a local checkpoint, which is the highest sequence number for which that copy has processed every earlier seq_no (inclusive). The InternalEngine holds a LocalCheckpointTracker field that maintains that marker and a separate persisted checkpoint for what is durably on disk.

The primary also tracks a global checkpoint, which is the sequence number up to which all in-sync copies have processed every earlier operation. The ReplicationTracker computes it as the minimum of those per-copy local checkpoints among in-sync shard copies. The primary updates the global checkpoint whenever an in-sync copy’s local checkpoint advances, when the primary activates or takes over after relocation, when a copy becomes in-sync, or when cluster state drops tracked copies. During operation replication, replica responses carry local and global checkpoint fields so the primary can derive peer progress. After a successful write, TransportReplicationAction will also trigger a GlobalCheckpointSyncAction when there are no more operations in-flight (maxSeqNo == globalCheckpoint). Indeed, the global checkpoint is piggybacked on every replication operation, but once the last operation completes, the checkpoint may advance further with no subsequent operation to carry it to the replicas.
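
In essence the computation is a minimum over in-sync copies, as in this standalone sketch (the real ReplicationTracker also handles in-sync membership changes and relocation handoff):

```java
import java.util.Map;

// Standalone sketch: the global checkpoint is the minimum local
// checkpoint across the in-sync shard copies.
class GlobalCheckpointSketch {
    static long globalCheckpoint(Map<String, Long> inSyncLocalCheckpoints) {
        return inSyncLocalCheckpoints.values().stream()
            .mapToLong(Long::longValue)
            .min()
            .orElseThrow();
    }

    public static void main(String[] args) {
        // allocation id -> local checkpoint; prints 40
        System.out.println(globalCheckpoint(
            Map.of("primary", 42L, "replica-1", 40L, "replica-2", 41L)));
    }
}
```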

The global checkpoint is the replication-group safety line: it is the highest seq_no known to be processed on every in-sync copy, so it underpins retention leases, translog truncation, soft-delete retention, and peer recovery coordination.

Because replication fan-out is concurrent, a replica may receive operations with out-of-order seq_no values. The engine still applies each operation once its prerequisites are satisfied. The LocalCheckpointTracker records which sequence numbers have been processed (including pending gaps) so the local checkpoint only advances when the contiguous prefix is complete. To save memory, LocalCheckpointTracker tracks which seq_no values are still pending using sparse bit sets in chunk-keyed maps.
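
A standalone model of the gap tracking (the real LocalCheckpointTracker shards its bit sets by seq_no range for memory efficiency; a single BitSet suffices to show the idea):

```java
import java.util.BitSet;

// Standalone model: the local checkpoint only advances over the
// contiguous prefix of processed sequence numbers.
class LocalCheckpointSketch {
    private final BitSet processed = new BitSet();
    private long checkpoint = -1; // highest seq_no with no earlier gaps

    synchronized void markSeqNoAsProcessed(long seqNo) {
        processed.set((int) seqNo);
        // Advance the checkpoint while the next seq_no is already processed.
        while (processed.get((int) (checkpoint + 1))) {
            checkpoint++;
        }
    }

    synchronized long getCheckpoint() {
        return checkpoint;
    }

    public static void main(String[] args) {
        LocalCheckpointSketch tracker = new LocalCheckpointSketch();
        tracker.markSeqNoAsProcessed(0);
        tracker.markSeqNoAsProcessed(2); // out of order: 1 is still pending
        System.out.println(tracker.getCheckpoint()); // 0
        tracker.markSeqNoAsProcessed(1); // gap filled
        System.out.println(tracker.getCheckpoint()); // 2
    }
}
```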

Replicas cache the global checkpoint value the primary advertises and refresh it from ongoing replication traffic (for example, each ConcreteReplicaRequest carries the primary’s global checkpoint for the replica permit path, and each ReplicaResponse returns the replica’s local checkpoint and last synced global checkpoint).

Note that the InternalEngine also tracks the maxSeqNoOfUpdatesOrDeletes (MSU) for write path optimization purposes. The Engine javadoc goes into details about the LCP < MSU versus MSU <= LCP cases.

The engine also applies an append-only optimization for auto-generated _ids. When a document has an auto-generated ID and the operation is not flagged as a retry, the engine checks that no concurrent retry with an equal or higher timestamp has landed on the shard, so it can skip the version map lookup entirely and call IndexWriter.addDocument instead of updateDocument. If the check fails, the engine falls back to updateDocument to guard against creating a duplicate.
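
The decision can be sketched as follows (hypothetical stand-ins; the engine tracks the retry high-water mark as maxUnsafeAutoIdTimestamp):

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical simplification of the append-only optimization for
// auto-generated ids.
class AppendOnlySketch {
    interface Writer {
        void addDocument(String doc);    // plain append, no dedup lookup
        void updateDocument(String doc); // lookup + replace, more expensive
    }

    // Highest auto-id timestamp seen on a retried operation.
    private final AtomicLong maxUnsafeAutoIdTimestamp = new AtomicLong(-1);

    void index(Writer writer, String doc, long autoIdTimestamp, boolean isRetry) {
        if (isRetry) {
            // Remember that a retry with this timestamp exists.
            maxUnsafeAutoIdTimestamp.accumulateAndGet(autoIdTimestamp, Math::max);
            writer.updateDocument(doc);
        } else if (autoIdTimestamp > maxUnsafeAutoIdTimestamp.get()) {
            // No retry with an equal or higher timestamp seen: safe to append.
            writer.addDocument(doc);
        } else {
            // A retry may have landed already: fall back to update.
            writer.updateDocument(doc);
        }
    }
}
```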

Server Startup

Server Shutdown

Closing a Shard

(this can also happen during shard reallocation, right? This might be a standalone topic, or need another section about it in allocation?...)

Shrink/Split/Clone index APIs

These APIs are used to create a new index that contains a copy of data from a provided index and differs in number of shards and/or index settings. They can only be executed against source indices that are marked read-only.

The shrink API (/{index}/_shrink/{target}) creates an index that has fewer shards than the original index.

The split API (/{index}/_split/{target}) creates an index that has more shards than the original index.

The clone API (/{index}/_clone/{target}) creates an index that has the same number of shards as the original index but may have different index settings.

The main implementation logic is centralized in TransportResizeAction; however, it only creates a new index in the cluster state using special recoverFrom and resizeType parameters. The entire workflow involves multiple components.

The high level structure is the following:

  1. TransportResizeAction creates index metadata for the new index that contains information about the resize performed. It also creates a routing table for the new index, which assigns a LocalShardsRecoverySource.INSTANCE recovery source to primary shards based on the resize information in the index metadata.
  2. Allocation logic allocates shards of the new index so that they are on the same node as the corresponding shards of the source index. See ResizeAllocationDecider. Note that this doesn't work for the shrink case, since during a shrink multiple source shards are "merged" together and may already be on different nodes. Shard movement in this case needs to be performed manually.
  3. Primary shards perform local shards recovery using index metadata to know what type of resize operation is performed.

Health API

The Health API (GET /_health_report) provides a structured health report for the cluster. It is indicator-based: each health indicator evaluates a specific aspect of cluster health (for example shard allocation or disk) and returns a status (GREEN, YELLOW, RED, or UNKNOWN) plus structured details, impacts, and diagnoses. The top-level status is derived from the worst indicator status.

Health node and data flow

A health node is selected by the master node via a persistent task. The health node maintains a cache of per-node health information. Each node periodically publishes its local health info to the health node cache. That cached health info is then used as input to health indicators when evaluating the overall cluster health. If the health node fails or leaves the cluster, the master node selects a new health node.

  • Publish: LocalHealthMonitor (local node) -> UpdateHealthInfoCacheAction -> HealthInfoCache (health node)
  • Retrieve: Health API (coordinating node) -> FetchHealthInfoCacheAction -> HealthInfoCache (health node)

Relevant classes:

Health indicators

There are two kinds of health indicators: preflight and regular. Preflight indicators run first; if any preflight indicator is not GREEN, regular indicators return UNKNOWN to avoid misleading results on an unstable cluster. Currently, the only preflight indicator is StableMasterHealthIndicatorService, which checks whether the cluster has a stable master node.

Relevant classes:

Health periodic logger

Cluster health status can be logged periodically by HealthPeriodicLogger, which runs on the health node and calls HealthService at a configured interval. It enables alerting on the health status.

Health metadata

Some health configuration is stored in the cluster state HealthMetadata. It stores user-configurable health settings, such as disk watermark thresholds. The current master node publishes its settings to the cluster state so the same settings are used across nodes in the cluster.

Relevant classes: HealthMetadataService, which runs on the master node and publishes health metadata to the cluster state.

Watcher

Watcher lets you set a schedule to run a query, and if a condition is met it executes an action. As an example, the following performs a search every 10 minutes. If the number of hits found is greater than 0 then it logs an error message.

```console
PUT _watcher/watch/log_error_watch
{
  "trigger" : { "schedule" : { "interval" : "10m" }},
  "input" : {
    "search" : {
      "request" : {
        "indices" : [ "logs" ],
        "body" : {
          "query" : {
            "match" : { "message": "error" }
          }
        }
      }
    }
  },
  "condition" : {
    "compare" : { "ctx.payload.hits.total" : { "gt" : 0 }}
  },
  "actions" : {
    "log_error" : {
      "logging" : {
        "text" : "Found {{ctx.payload.hits.total}} errors in the logs"
      }
    }
  }
}
```

How Watcher Works

  • We have an API to define a “watch”, which includes the schedule, the query, the condition, and the action
  • Watch definitions are kept in the .watches index
  • Information about currently running watches is in the .triggered_watches index
  • History is written to the .watcher_history index
  • Watcher (WatcherLifeCycleService) runs on all nodes, but only executes watches on a node that has a copy of the .watches shard that the particular watch is in (see WatcherService)
    • Uses a hash to choose the node if there is more than one shard
  • Example common use cases:
    • Periodically send data to a 3rd party system
    • Email users with alerts if certain conditions appear in log files
    • Periodically generate a report using Kibana, and email that report as an attachment. This is supported by declaring a ReportingAttachment on the EmailAction in the watch definition.

Relevant classes:

Debugging

  • The most useful debugging information is in the Elasticsearch logs and the .watcher_history index
  • It is often useful to get the contents of the .watches index
  • Frequent sources of problems:
    • There is no guarantee that an interval schedule watch will run at exactly the requested interval after the last run
    • In older versions (before 8.17), the counter for the interval schedule restarts if the shard moves. For example, if the interval is once every 12 hours, and the shard moves 10 hours into that interval, it will be at least 12 more hours until it runs.
    • Calls to remote systems (EmailAction and WebhookAction) are a frequent source of failures. Watcher sends the request but doesn't know what happens after that. If you see that the call was successful in .watcher_history, the best way to continue the investigation is in the logs of the remote system.
    • Even if watcher fails during a call to a remote system, the error is likely to be outside of watcher (e.g. network problems). Check the error message in .watcher_history.