# Classic Cluster Sharding

@@include[includes.md](includes.md) { #actor-api }

For the full documentation of this feature and for new projects see @ref:[Cluster Sharding](typed/cluster-sharding.md).
@@@ note

The Akka dependencies are available from Akka's secure library repository. To access them you need to use a secure, tokenized URL as specified at https://account.akka.io/token.

@@@
## Dependency

To use Cluster Sharding, you must add the following dependency in your project:
@@dependency[sbt,Maven,Gradle] {
  bomGroup=com.typesafe.akka bomArtifact=akka-bom_$scala.binary.version$ bomVersionSymbols=AkkaVersion
  symbol1=AkkaVersion
  value1="$akka.version$"
  group=com.typesafe.akka
  artifact=akka-cluster-sharding_$scala.binary.version$
  version=AkkaVersion
}
@@project-info{ projectId="akka-cluster-sharding" }
For an introduction to Sharding concepts see @ref:[Cluster Sharding](typed/cluster-sharding.md).
## An Example

This is what an entity actor may look like:
Scala : @@snip ClusterShardingSpec.scala { #counter-actor }
Java : @@snip ClusterShardingTest.java { #counter-actor }
The above actor uses Event Sourcing and the support provided in @scala[`PersistentActor`] @java[`AbstractPersistentActor`] to store its state.
It does not have to be a persistent actor, but in case of failure or migration of entities between nodes it must be able to recover
its state if that state is valuable.

Note how the `persistenceId` is defined. The name of the actor is the entity identifier (utf-8 URL-encoded).
You may define it another way, but it must be unique.
When using the sharding extension you first, typically at system startup on each node
in the cluster, register the supported entity types with the `ClusterSharding.start`
method. `ClusterSharding.start` gives you the reference which you can pass along.
Please note that `ClusterSharding.start` will start a `ShardRegion` in proxy only mode
when there is no match between the roles of the current cluster node and the role specified in
`ClusterShardingSettings`.
Scala : @@snip ClusterShardingSpec.scala { #counter-start }
Java : @@snip ClusterShardingTest.java { #counter-start }
The @scala[`extractEntityId` and `extractShardId` are two] @java[`messageExtractor` defines] application specific @scala[functions] @java[methods] to extract the entity
identifier and the shard identifier from incoming messages.
Scala : @@snip ClusterShardingSpec.scala { #counter-extractor }
Java : @@snip ClusterShardingTest.java { #counter-extractor }
This example illustrates two different ways to define the entity identifier in the messages:

 * The `Get` message includes the identifier itself.
 * The `EntityEnvelope` holds the identifier, and the actual message that is sent to the entity actor is wrapped in the envelope.

Note how these two message types are handled in the @scala[`extractEntityId` function] @java[`entityId` and `entityMessage` methods] shown above.
The message sent to the entity actor is @scala[the second part of the tuple returned by the `extractEntityId`] @java[what `entityMessage` returns] and that makes it possible to unwrap envelopes
if needed.
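Since the snippet bodies are not inlined on this page, here is a rough plain-Java sketch of such an extractor pair, with no Akka dependency. The `Get` and `EntityEnvelope` types are hypothetical stand-ins modeled on the example, and the method names follow the Java `entityId`/`entityMessage`/`shardId` extractor methods described above:

```java
// Plain-Java sketch (not Akka code) of the message extractor logic.
// Get and EntityEnvelope are hypothetical stand-ins for the example's messages.
public class ExtractorSketch {
  static final int NUMBER_OF_SHARDS = 100;

  record Get(long counterId) {}
  record EntityEnvelope(long id, Object payload) {}

  // entityId: maps an incoming message to the entity identifier
  static String entityId(Object message) {
    if (message instanceof EntityEnvelope e) return String.valueOf(e.id());
    if (message instanceof Get g) return String.valueOf(g.counterId());
    return null;
  }

  // entityMessage: unwraps envelopes, passes other messages through as-is
  static Object entityMessage(Object message) {
    if (message instanceof EntityEnvelope e) return e.payload();
    return message;
  }

  // shardId: must always map the same entity id to the same shard id
  static String shardId(Object message) {
    if (message instanceof EntityEnvelope e) return String.valueOf(e.id() % NUMBER_OF_SHARDS);
    if (message instanceof Get g) return String.valueOf(g.counterId() % NUMBER_OF_SHARDS);
    return null;
  }
}
```

Note how `entityMessage` is where the envelope is unwrapped before the payload reaches the entity, while a plain `Get` is forwarded untouched.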
A shard is a group of entities that will be managed together. The grouping is defined by the
extractShardId function shown above. For a specific entity identifier the shard identifier must always
be the same. Otherwise the entity actor might accidentally be started in several places at the same time.
Creating a good sharding algorithm is an interesting challenge in itself. Try to produce a uniform distribution, i.e. the same number of entities in each shard. As a rule of thumb, the number of shards should be a factor ten greater than the planned maximum number of cluster nodes. With fewer shards than nodes, some nodes will not host any shard. Too many shards will result in less efficient management of the shards, e.g. rebalancing overhead, and increased latency because the coordinator is involved in the routing of the first message for each shard. The sharding algorithm must be the same on all nodes in a running cluster. It can be changed after stopping all nodes in the cluster.
A simple sharding algorithm that works fine in most cases is to take the absolute value of the `hashCode` of
the entity identifier modulo number of shards. As a convenience this is provided by the
`ShardRegion.HashCodeMessageExtractor`.
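As a sketch of that rule (the class and method here are illustrative, not Akka's actual implementation):

```java
// Sketch of the "abs(hashCode) modulo number of shards" allocation rule.
// Taking the modulo before Math.abs avoids the Integer.MIN_VALUE edge case,
// because (hashCode % n) always has magnitude strictly less than n.
public class HashShardId {
  static String shardId(String entityId, int numberOfShards) {
    return String.valueOf(Math.abs(entityId.hashCode() % numberOfShards));
  }
}
```

The result is deterministic, which satisfies the requirement that a given entity identifier always maps to the same shard identifier.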
Messages to the entities are always sent via the local `ShardRegion`. The `ShardRegion` actor reference for a
named entity type is returned by `ClusterSharding.start` and it can also be retrieved with `ClusterSharding.shardRegion`.
The `ShardRegion` will look up the location of the shard for the entity if it does not already know its location. It will
delegate the message to the right node and it will create the entity actor on demand, i.e. when the
first message for a specific entity is delivered.
Scala : @@snip ClusterShardingSpec.scala { #counter-usage }
Java : @@snip ClusterShardingTest.java { #counter-usage }
See @ref:[Cluster Sharding concepts](typed/cluster-sharding-concepts.md) in the documentation of the new APIs.
<a id="cluster-sharding-mode"></a>
## Sharding State Store Mode

There are two cluster sharding states managed:

1. Shard locations
1. The entities in each `Shard` (remembered entities), which is optional, and disabled by default

For these, there are currently two modes which define how these states are stored:
@@include[cluster.md](includes/cluster.md) { #sharding-persistence-mode-deprecated }
Changing the mode requires a full cluster restart.
The state of the `ShardCoordinator` is replicated across the cluster but is not durable, not stored to disk.
The `ShardCoordinator` state replication is handled by @ref:[Distributed Data](distributed-data.md) with `WriteMajority`/`ReadMajority` consistency.
When all nodes in the cluster have been stopped, the state is no longer needed and dropped.
See @ref:[Distributed Data mode](typed/cluster-sharding.md#distributed-data-mode) in the documentation of the new APIs.

See @ref:[Persistence Mode](typed/cluster-sharding.md#persistence-mode) in the documentation of the new APIs.
## Proxy Only Mode

The `ShardRegion` actor can also be started in proxy only mode, i.e. it will not
host any entities itself, but knows how to delegate messages to the right location.
A `ShardRegion` is started in proxy only mode with the `ClusterSharding.startProxy` method.
Also a `ShardRegion` is started in proxy only mode when there is no match between the
roles of the current cluster node and the role specified in `ClusterShardingSettings`
passed to the `ClusterSharding.start` method.
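The role rule above can be sketched as a plain predicate (the names here are illustrative, not the actual internal API):

```java
import java.util.Optional;
import java.util.Set;

// Illustrative-only sketch: a ShardRegion runs in proxy only mode when the
// settings specify a role that the current cluster node does not have.
public class ProxyModeCheck {
  static boolean startsInProxyOnlyMode(Set<String> nodeRoles, Optional<String> settingsRole) {
    return settingsRole.map(role -> !nodeRoles.contains(role)).orElse(false);
  }
}
```

With no role configured in the settings, every node hosts entities; with a role configured, only nodes carrying that role do, and the rest become proxies.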
## Passivation

If the state of the entities is persistent you may stop entities that are not used to
reduce memory consumption. This is done by the application specific implementation of
the entity actors, for example by defining a receive timeout (`context.setReceiveTimeout`).
If a message is already enqueued to the entity when it stops itself the enqueued message
in the mailbox will be dropped. To support graceful passivation without losing such
messages the entity actor can send `ShardRegion.Passivate` to its parent `Shard`.
The specified wrapped message in `Passivate` will be sent back to the entity, which is
then supposed to stop itself. Incoming messages will be buffered by the `Shard`
between reception of `Passivate` and termination of the entity. Such buffered messages
are thereafter delivered to a new incarnation of the entity.
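The buffering behavior described above can be modeled with a small toy class (not Akka code; the class and method names are invented for illustration):

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Toy model of the Passivate handshake: messages that arrive between
// Passivate and the entity's termination are buffered by the shard and
// redelivered to the next incarnation of the entity, so none are lost.
public class PassivationBufferModel {
  private boolean passivating = false;
  private final Queue<Object> buffer = new ArrayDeque<>();
  private final List<Object> delivered = new ArrayList<>();

  void receive(Object msg) {
    if (passivating) buffer.add(msg);   // entity is stopping: hold the message
    else delivered.add(msg);            // normal delivery to the entity
  }

  void passivate() { passivating = true; }

  // Entity terminated: a new incarnation starts and the buffer is flushed to it.
  void entityTerminated() {
    passivating = false;
    while (!buffer.isEmpty()) delivered.add(buffer.poll());
  }

  List<Object> deliveredMessages() { return delivered; }
}
```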
See @ref:[Automatic Passivation](typed/cluster-sharding.md#automatic-passivation) in the documentation of the new APIs.
<a id="cluster-sharding-remembering"></a>
## Remembering Entities

See @ref:[Remembering Entities](typed/cluster-sharding.md#remembering-entities) in the documentation of the new APIs, including behavior when enabled and disabled.

Note that the state of the entities themselves will not be restored unless they have been made persistent, for example with @ref:[Event Sourcing](persistence.md).
To make the list of entities in each `Shard` persistent (durable), set
the `rememberEntities` flag to true in `ClusterShardingSettings` when calling
`ClusterSharding.start` and make sure the `shardIdExtractor` handles
`Shard.StartEntity(EntityId)`, which implies that a `ShardId` must be possible to
extract from the `EntityId`.
Scala : @@snip ClusterShardingSpec.scala { #extractShardId-StartEntity }
Java : @@snip ClusterShardingTest.java { #extractShardId-StartEntity }
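As an illustration, assuming numeric entity ids as in the counter example (the types below are hypothetical stand-ins for the `StartEntity` message and the example's envelope, not Akka's classes):

```java
// Plain-Java sketch of an extractShardId that handles StartEntity.
// StartEntity carries only the entity id, so the shard id must be
// computable from the entity id alone.
public class RememberEntitiesExtractor {
  static final int NUMBER_OF_SHARDS = 100;

  record StartEntity(String entityId) {}
  record EntityEnvelope(long id, Object payload) {}

  static String shardId(Object message) {
    if (message instanceof EntityEnvelope e)
      return String.valueOf(e.id() % NUMBER_OF_SHARDS);
    // With rememberEntities, restarted entities are announced via StartEntity:
    // derive the shard id directly from the remembered entity id.
    if (message instanceof StartEntity s)
      return String.valueOf(Long.parseLong(s.entityId()) % NUMBER_OF_SHARDS);
    return null;
  }
}
```

Both branches must agree: the shard id derived from a `StartEntity` has to equal the one derived from a normal message for the same entity.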
## Supervision

If you need to use another `supervisorStrategy` for the entity actors than the default (restarting) strategy
you need to create an intermediate parent actor that defines the `supervisorStrategy` for the
child entity actor.
Scala : @@snip ClusterShardingSpec.scala { #supervisor }
Java : @@snip ClusterShardingTest.java { #supervisor }
You start such a supervisor in the same way as if it was the entity actor.
Scala : @@snip ClusterShardingSpec.scala { #counter-supervisor-start }
Java : @@snip ClusterShardingTest.java { #counter-supervisor-start }
Note that stopped entities will be started again when a new message is targeted to the entity.

If the 'on stop' backoff supervision strategy is used, a final termination message must be set and used for passivation; see the documentation on the backoff supervisor and sharding.
## Graceful Shutdown

You can send the @scala[`ShardRegion.GracefulShutdown`] @java[`ShardRegion.gracefulShutdownInstance`] message
to the `ShardRegion` actor to hand off all shards that are hosted by that `ShardRegion` and then the
`ShardRegion` actor will be stopped. You can watch the `ShardRegion` actor to know when it is completed.
During this period other regions will buffer messages for those shards in the same way as when a rebalance is
triggered by the coordinator. When the shards have been stopped the coordinator will allocate these shards elsewhere.

This is performed automatically by the @ref:[Coordinated Shutdown](coordinated-shutdown.md) and is therefore part of the graceful leaving process of a cluster member.
<a id="removeinternalclustershardingdata"></a>
See @ref:[Removal of Internal Cluster Sharding Data](typed/cluster-sharding.md#removal-of-internal-cluster-sharding-data) in the documentation of the new APIs.
## Inspecting cluster sharding state

Two requests to inspect the cluster state are available:

 * @scala[`ShardRegion.GetShardRegionState`] @java[`ShardRegion.getShardRegionStateInstance`], which will return
a @scala[`ShardRegion.CurrentShardRegionState`] @java[`ShardRegion.ShardRegionState`] that contains
the identifiers of the shards running in a Region and what entities are alive for each of them.
 * `ShardRegion.GetClusterShardingStats`, which will query all the regions in the cluster and return
a `ShardRegion.ClusterShardingStats` containing the identifiers of the shards running in each region and a count
of entities that are alive in each shard.
If any shard queries failed, for example due to timeout if a shard was too busy to reply within the configured `akka.cluster.sharding.shard-region-query-timeout`,
`ShardRegion.CurrentShardRegionState` and `ShardRegion.ClusterShardingStats` will also include the set of shard identifiers by region that failed.
The type names of all started shards can be acquired via @scala[`ClusterSharding.shardTypeNames`] @java[`ClusterSharding.getShardTypeNames`].

The purpose of these messages is testing and monitoring; they are not provided to give access to directly sending messages to the individual entities.
## Lease

A lease can be used as an additional safety measure to ensure a shard does not run on two nodes. See @ref:[Lease](typed/cluster-sharding.md#lease) in the documentation of the new APIs.
## Configuration

`ClusterShardingSettings` is a parameter to the `start` method of
the `ClusterSharding` extension, i.e. each entity type can be configured with different settings
if needed.

See @ref:[configuration](typed/cluster-sharding.md#configuration) for more information.