# Fault-tolerant execution
By default, if a Trino node lacks the resources to execute a task or otherwise fails during query execution, the query fails and must be run again manually. The longer the runtime of a query, the more likely it is to be susceptible to such failures.
Fault-tolerant execution is a mechanism in Trino that enables a cluster to mitigate query failures by retrying queries or their component tasks in the event of failure. With fault-tolerant execution enabled, intermediate exchange data is spooled and can be re-used by another worker in the event of a worker outage or other fault during query execution.
:::{note}
Fault tolerance does not apply to broken queries or other user errors. For
example, Trino does not spend resources retrying a query that fails because its
SQL cannot be parsed.

For a step-by-step guide explaining how to configure a Trino cluster with
fault-tolerant execution to improve query processing resilience, read
{doc}`/installation/query-resiliency`.
:::
## Configuration

Fault-tolerant execution is turned off by default. To enable the feature, set
the `retry-policy` configuration property to either `QUERY` or `TASK` depending
on the desired {ref}`retry policy <fte-retry-policy>`:

```properties
retry-policy=QUERY
```
:::{warning}
Setting `retry-policy` may cause queries to fail with connectors that do not
explicitly support fault-tolerant execution, resulting in a "This connector
does not support query retries" error message.
:::

Support for fault-tolerant execution of SQL statements varies on a
per-connector basis, with more details in the documentation for each connector.
The following connectors support fault-tolerant execution:

- {ref}`BigQuery connector <bigquery-fte-support>`
- {ref}`Delta Lake connector <delta-lake-fte-support>`
- {ref}`Hive connector <hive-fte-support>`
- {ref}`Iceberg connector <iceberg-fte-support>`
- {ref}`MariaDB connector <mariadb-fte-support>`
- {ref}`MongoDB connector <mongodb-fte-support>`
- {ref}`MySQL connector <mysql-fte-support>`
- {ref}`Oracle connector <oracle-fte-support>`
- {ref}`PostgreSQL connector <postgresql-fte-support>`
- {ref}`Redshift connector <redshift-fte-support>`
- {ref}`SQL Server connector <sqlserver-fte-support>`

The following configuration properties control the behavior of fault-tolerant
execution on a Trino cluster:
:::{list-table} Fault-tolerant execution configuration properties
:widths: 30, 50, 20
:header-rows: 1

* - Property name
  - Description
  - Default value
* - `retry-policy`
  - Configures what is retried in the event of failure, either `QUERY` to retry
    the whole query, or `TASK` to retry tasks individually if they fail. See
    {ref}`retry policy <fte-retry-policy>` for more information. Use the
    equivalent session property `retry_policy` only on clusters configured for
    fault-tolerant execution, and typically only to deactivate with `NONE`,
    since switching between modes on a cluster is not tested.
  - `NONE`
* - `retry-policy.allowed`
  - Comma-separated list of retry policies that are allowed to be set with the
    `retry_policy` session property.
  - `NONE, QUERY, TASK`
* - `exchange.deduplication-buffer-size`
  - Size of the in-memory buffer on the coordinator used to store and
    deduplicate a query's output for retries.
  - `32MB`
* - `fault-tolerant-execution.exchange-encryption-enabled`
  - Enable {ref}`encryption of spooled data <fte-encryption>`.
  - `true`
:::

Find further related properties in {doc}`/admin/properties`, specifically in
{doc}`/admin/properties-resource-management` and
{doc}`/admin/properties-query-management`.
(fte-retry-policy)=
## Retry policy

The `retry-policy` configuration property, or the `retry_policy` session
property, designates whether Trino retries entire queries or a query's
individual tasks in the event of failure.

### QUERY retry policy

A `QUERY` retry policy instructs Trino to automatically retry a query in the
event of an error occurring on a worker node. A `QUERY` retry policy is
recommended when the majority of the Trino cluster's workload consists of many
small queries.
By default, Trino does not implement fault tolerance for queries whose result
set exceeds 32MB in size, such as {doc}`/sql/select` statements that return a
very large data set to the user. This limit can be increased by setting the
`exchange.deduplication-buffer-size` configuration property to a value greater
than the default of 32MB, but higher values result in increased memory usage on
the coordinator.
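For example, assuming the coordinator has memory headroom, the limit can be
raised in `config.properties`; the `64MB` value below is illustrative, not a
recommendation:

```properties
# Enable query retries and allow result sets up to 64MB to be buffered
# on the coordinator for deduplication (illustrative value)
retry-policy=QUERY
exchange.deduplication-buffer-size=64MB
```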
To enable fault-tolerant execution on queries with a larger result set, it is
strongly recommended to configure an
{ref}`exchange manager <fte-exchange-manager>` that utilizes external storage
for spooled data and therefore allows for storage of spooled data beyond the
in-memory buffer size.
### TASK retry policy

A `TASK` retry policy instructs Trino to retry individual query
{ref}`tasks <trino-concept-task>` in the event of failure. You must configure
an {ref}`exchange manager <fte-exchange-manager>` to use the task retry policy.
This policy is recommended when executing large batch queries, as the cluster
can more efficiently retry smaller tasks within the query rather than retry the
whole query.
When a cluster is configured with a `TASK` retry policy, some relevant
configuration properties have their default values changed to follow best
practices for a fault-tolerant cluster. However, this automatic change does not
affect clusters that have these properties manually configured in
`config.properties`. On a cluster with a `TASK` retry policy, it is strongly
recommended to set the `task.low-memory-killer.policy`
{doc}`query management property </admin/properties-query-management>` to
`total-reservation-on-blocked-nodes`, or queries may need to be manually killed
if the cluster runs out of memory.
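Combining these recommendations, a minimal coordinator configuration sketch for
a fault-tolerant batch cluster might look like the following:

```properties
# TASK retries additionally require an exchange manager to be configured
retry-policy=TASK
# Free memory on blocked nodes instead of requiring manual query kills
task.low-memory-killer.policy=total-reservation-on-blocked-nodes
```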
:::{note}
A `TASK` retry policy is best suited for large batch queries, but this policy
can result in higher latency for short-running queries executed in high volume.
As a best practice, it is recommended to run a dedicated cluster with a `TASK`
retry policy for large batch queries, separate from another cluster that
handles short queries.
:::
(fte-encryption)=
## Encryption

Trino encrypts data before spooling it to storage. This prevents access to
query data by anyone besides the Trino cluster that wrote it, including
administrators of the storage system. A new encryption key is randomly
generated for every query, and the key is discarded once a query is completed.
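Spool encryption is controlled by the
`fault-tolerant-execution.exchange-encryption-enabled` property, which defaults
to `true`. Disabling it, as in this sketch, is only reasonable for debugging in
isolated test environments:

```properties
# Not recommended outside testing: spooled data is written unencrypted
fault-tolerant-execution.exchange-encryption-enabled=false
```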
## Advanced configuration

You can further configure fault-tolerant execution with the following
configuration properties. The default values for these properties should work
for most deployments, but you can change these values for testing or
troubleshooting purposes.

### Retry limits

The following configuration properties control the thresholds at which queries
and tasks are no longer retried in the event of repeated failures:
:::{list-table} Fault tolerance retry limit configuration properties
:widths: 30, 50, 20, 30
:header-rows: 1

* - Property name
  - Description
  - Default value
  - Retry policy
* - `query-retry-attempts`
  - Maximum number of times Trino may attempt to retry a query before declaring
    it failed.
  - `4`
  - Only `QUERY`
* - `task-retry-attempts-per-task`
  - Maximum number of times Trino may attempt to retry a single task before
    declaring the query failed.
  - `4`
  - Only `TASK`
* - `retry-initial-delay`
  - Minimum time that a failed query or task must wait before it is retried.
    May be overridden with the `retry_initial_delay` session property.
  - `10s`
  - `QUERY` and `TASK`
* - `retry-max-delay`
  - Maximum time that a failed query or task must wait before it is retried.
    May be overridden with the `retry_max_delay` session property.
  - `1m`
  - `QUERY` and `TASK`
* - `retry-delay-scale-factor`
  - Factor by which the retry delay is increased on each failure. May be
    overridden with the `retry_delay_scale_factor` session property.
  - `2.0`
  - `QUERY` and `TASK`
:::

### Task sizing

With a `TASK` retry policy, it is important to manage the amount of data
processed in each task. If tasks are too small, the management of task
coordination can take more processing time and resources than executing the
task itself. If tasks are too large, then a single task may require more
resources than are available on any one node and therefore prevent the query
from completing.
Trino supports limited automatic task sizing. If issues occur during
fault-tolerant task execution, you can set the following configuration
properties to control task sizing manually. These properties only apply to a
`TASK` retry policy.
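For instance, if a catalog produces many lightweight splits, a sketch like the
following raises the per-task split budget; both values are illustrative, not
recommendations:

```properties
# Illustrative task-sizing overrides for a cluster with retry-policy=TASK
fault-tolerant-execution-standard-split-size=128MB
fault-tolerant-execution-max-task-split-count=4096
```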
:::{list-table} Task sizing configuration properties
:widths: 30, 50, 20
:header-rows: 1

* - Property name
  - Description
  - Default value
* - `fault-tolerant-execution-standard-split-size`
  - Standard split data size processed by tasks that read data from source
    tables. Value is interpreted with split weight taken into account. If the
    weight of splits produced by a catalog denotes that they are lighter or
    heavier than a "standard" split, then the number of splits processed by a
    single task is adjusted accordingly.

    May be overridden for the current session with the
    `fault_tolerant_execution_standard_split_size` session property.
  - `64MB`
* - `fault-tolerant-execution-max-task-split-count`
  - Maximum number of splits processed by a single task. This value is not
    split weight-adjusted and serves as protection against situations where
    catalogs report an incorrect split weight.

    May be overridden for the current session with the
    `fault_tolerant_execution_max_task_split_count` session property.
  - `2048`
* - `fault-tolerant-execution-arbitrary-distribution-compute-task-target-size-growth-period`
  - The number of tasks created before the target input size of tasks in
    arbitrary-distribution compute stages is increased by the growth factor.
  - `64`
* - `fault-tolerant-execution-arbitrary-distribution-compute-task-target-size-growth-factor`
  - Growth factor applied to the target input size of tasks in
    arbitrary-distribution compute stages after each growth period.
  - `1.26`
* - `fault-tolerant-execution-arbitrary-distribution-compute-task-target-size-min`
  - Initial target input size of tasks in arbitrary-distribution compute
    stages.
  - `512MB`
* - `fault-tolerant-execution-arbitrary-distribution-compute-task-target-size-max`
  - Maximum target input size of tasks in arbitrary-distribution compute
    stages.
  - `50GB`
* - `fault-tolerant-execution-arbitrary-distribution-write-task-target-size-growth-period`
  - The number of tasks created before the target input size of tasks in
    arbitrary-distribution write stages is increased by the growth factor.
  - `64`
* - `fault-tolerant-execution-arbitrary-distribution-write-task-target-size-growth-factor`
  - Growth factor applied to the target input size of tasks in
    arbitrary-distribution write stages after each growth period.
  - `1.26`
* - `fault-tolerant-execution-arbitrary-distribution-write-task-target-size-min`
  - Initial target input size of tasks in arbitrary-distribution write stages.
  - `4GB`
* - `fault-tolerant-execution-arbitrary-distribution-write-task-target-size-max`
  - Maximum target input size of tasks in arbitrary-distribution write stages.
  - `50GB`
* - `fault-tolerant-execution-hash-distribution-compute-task-target-size`
  - Target input size of tasks in hash-distribution compute stages.
  - `512MB`
* - `fault-tolerant-execution-hash-distribution-write-task-target-size`
  - Target input size of tasks in hash-distribution write stages.
  - `4GB`
* - `fault-tolerant-execution-hash-distribution-write-task-target-max-count`
  - Soft upper bound on the number of tasks in hash-distribution write stages.
  - `2000`
:::

### Node allocation

With a `TASK` retry policy, nodes are allocated to tasks based on available
memory and estimated memory usage. If task failure occurs due to exceeding
available memory on a node, the task is restarted with a request to allocate
the full node for its execution.

The initial task memory-requirements estimation is static and configured with
the `fault-tolerant-execution-task-memory` configuration property. This
property only applies to a `TASK` retry policy.
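If tasks routinely exceed the initial estimate and restart with full-node
allocations, raising the estimate can reduce retries; the value below is
illustrative:

```properties
# Illustrative: start each task with a larger static memory estimate
fault-tolerant-execution-task-memory=8GB
```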
:::{list-table} Node allocation configuration properties
:widths: 30, 50, 20
:header-rows: 1

* - Property name
  - Description
  - Default value
* - `fault-tolerant-execution-task-memory`
  - Initial estimate of the amount of memory required by a single task. May be
    overridden for the current session with the
    `fault_tolerant_execution_task_memory` session property.
  - `5GB`
:::

### Other tunables

The following additional configuration properties can be used to manage
fault-tolerant execution:
:::{list-table} Other fault-tolerant execution configuration properties
:widths: 30, 50, 20, 30
:header-rows: 1

* - Property name
  - Description
  - Default value
  - Retry policy
* - `fault-tolerant-execution-task-descriptor-storage-max-memory`
  - Maximum amount of memory to be used on the coordinator to store task
    descriptors for fault-tolerant queries.
  - (JVM heap size * 0.15)
  - Only `TASK`
* - `fault-tolerant-execution-max-partition-count`
  - Maximum number of partitions to use for distributed joins and aggregations,
    similar in function to the `query.max-hash-partition-count`
    {doc}`query management property </admin/properties-query-management>`. It
    is not recommended to increase this property value higher than the default
    of 50, which may result in instability and poor performance. May be
    overridden for the current session with the
    `fault_tolerant_execution_max_partition_count` session property.
  - `50`
  - Only `TASK`
* - `fault-tolerant-execution-min-partition-count`
  - Minimum number of partitions to use for distributed joins and aggregations,
    similar in function to the `query.min-hash-partition-count`
    {doc}`query management property </admin/properties-query-management>`. May
    be overridden for the current session with the
    `fault_tolerant_execution_min_partition_count` session property.
  - `4`
  - Only `TASK`
* - `fault-tolerant-execution-min-partition-count-for-write`
  - Minimum number of partitions to use for distributed joins and aggregations
    in write queries, similar in function to the
    `query.min-hash-partition-count-for-write`
    {doc}`query management property </admin/properties-query-management>`. May
    be overridden for the current session with the
    `fault_tolerant_execution_min_partition_count_for_write` session property.
  - `50`
  - Only `TASK`
* - `max-tasks-waiting-for-node-per-query`
  - Maximum number of tasks per query that are allowed to wait for node
    allocation before query scheduling is paused.
  - `50`
  - Only `TASK`
:::

(fte-exchange-manager)=
## Exchange manager

Exchange spooling is responsible for storing and managing spooled data for
fault-tolerant execution. You can configure a filesystem-based exchange manager
that stores spooled data in a specified location, such as
{ref}`AWS S3 <fte-exchange-aws-s3>` and S3-compatible systems,
{ref}`Azure Blob Storage <fte-exchange-azure-blob>`,
{ref}`Google Cloud Storage <fte-exchange-gcs>`,
{ref}`Alluxio <fte-exchange-alluxio>`, or {ref}`HDFS <fte-exchange-hdfs>`.
To configure an exchange manager, create a new
`etc/exchange-manager.properties` configuration file on the coordinator and all
worker nodes. In this file, set the `exchange-manager.name` configuration
property to `filesystem` or `hdfs`, and set additional configuration properties
as needed for your storage solution.

You can also specify the location of the exchange manager configuration file in
`config.properties` with the `exchange-manager.config-file` property. When this
property is set, Trino loads the exchange manager configuration from the
specified path instead of the default `etc/exchange-manager.properties`.
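For example, the following `config.properties` entry points Trino at an
alternate location; the path shown is hypothetical:

```properties
# Load exchange manager settings from a custom path (hypothetical example)
exchange-manager.config-file=/etc/trino/exchange-manager.properties
```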
The following table lists the available configuration properties for
`exchange-manager.properties`, their default values, and the file systems for
which each property may be configured:
:::{list-table} Exchange manager configuration properties
:widths: 30, 50, 20, 30
:header-rows: 1

* - Property name
  - Description
  - Default value
  - Supported filesystem
* - `exchange.base-directories`
  - Comma-separated list of URI locations that the exchange manager uses to
    store spooling data.
  -
  - Any
* - `exchange.max-page-storage-size`
  - Maximum data size of a page written to a sink.
  - `16MB`
  - Any
* - `exchange.sink-buffer-pool-min-size`
  - Minimum size of the buffer pool for data written to a sink.
  - `10`
  - Any
* - `exchange.sink-buffers-per-partition`
  - Number of buffers per partition in the buffer pool for data written to a
    sink.
  - `2`
  - Any
* - `exchange.sink-max-file-size`
  - Maximum data size of files written by exchange sinks.
  - `1GB`
  - Any
* - `exchange.source-concurrent-readers`
  - Number of concurrent readers to read from spooling storage.
  - `4`
  - Any
* - `exchange.s3.aws-access-key`
  - AWS access key to use.
  -
  - AWS S3, GCS
* - `exchange.s3.aws-secret-key`
  - AWS secret key to use.
  -
  - AWS S3, GCS
* - `exchange.s3.iam-role`
  - IAM role to assume.
  -
  - AWS S3
* - `exchange.s3.external-id`
  - External ID for the IAM role trust policy.
  -
  - AWS S3
* - `exchange.s3.region`
  - Region of the S3 bucket.
  -
  - AWS S3
* - `exchange.s3.endpoint`
  - S3 storage endpoint server. Defaults to
    `https://s3.us-east-1.amazonaws.com`. Note that TLS is redundant due to
    {ref}`automatic encryption <fte-encryption>`. If using GCS, set it to
    `https://storage.googleapis.com`.
  -
  - AWS S3, GCS
* - `exchange.s3.max-error-retries`
  - Maximum number of times the exchange manager retries a failed S3 request.
  - `10`
  - AWS S3, GCS
* - `exchange.s3.path-style-access`
  - Use path-style access for all requests to S3.
  - `false`
  - AWS S3
* - `exchange.s3.upload.part-size`
  - Part data size for S3 multi-part upload.
  - `5MB`
  - AWS S3, GCS
* - `exchange.gcs.json-key-file-path`
  - Path to the JSON file that contains your Google Cloud Platform service
    account key. Not to be set together with `exchange.gcs.json-key`.
  -
  - GCS
* - `exchange.gcs.json-key`
  - Your Google Cloud Platform service account key in JSON format. Not to be
    set together with `exchange.gcs.json-key-file-path`.
  -
  - GCS
* - `exchange.azure.endpoint`
  - Azure blob endpoint. Not to be set together with
    `exchange.azure.connection-string`.
  -
  - Azure Blob Storage
* - `exchange.azure.connection-string`
  - Connection string used to access the spooling container. Not to be set
    together with `exchange.azure.endpoint`.
  -
  - Azure Blob Storage
* - `exchange.azure.block-size`
  - Block data size for Azure block blob parallel upload.
  - `4MB`
  - Azure Blob Storage
* - `exchange.azure.max-error-retries`
  - Maximum number of times the exchange manager retries a failed Azure blob
    request.
  - `10`
  - Azure Blob Storage
* - `exchange.alluxio.block-size`
  - Block data size for Alluxio parallel upload.
  - `4MB`
  - Alluxio
* - `exchange.alluxio.site-file-path`
  - Path to the Alluxio site properties file, such as
    `/etc/alluxio-site.properties`. The file must exist on all nodes in the
    Trino cluster. Follow the Alluxio client configuration documentation for
    more details.
  -
  - Alluxio
* - `exchange.hdfs.block-size`
  - Block data size for HDFS storage.
  - `4MB`
  - HDFS
* - `exchange.hdfs.skip-directory-scheme-validation`
  - Skip validation that `exchange.base-directories` uses the `hdfs` scheme,
    for use with Hadoop-compatible file systems.
  - `false`
  - HDFS
* - `hdfs.config.resources`
  - Comma-separated list of paths to HDFS configuration files, such as
    `/etc/hdfs-site.xml`. The files must exist on all nodes in the Trino
    cluster.
  -
  - HDFS
:::

To reduce the exchange manager's overall I/O load, the
`exchange.compression-codec` configuration property defaults to `LZ4`. In
addition, {ref}`encryption of spooled data <fte-encryption>` is automatically
performed, and some details can be configured.
It is also recommended to configure a bucket lifecycle rule to automatically expire abandoned objects in the event of a node crash.
(fte-exchange-aws-s3)=
### AWS S3

The following example `exchange-manager.properties` configuration specifies an
AWS S3 bucket as the spooling storage destination. Note that the destination
does not have to be in AWS, but can be any S3-compatible storage system. While
the exchange manager is designed to support S3-compatible storage systems, only
AWS S3 and MinIO are tested for compatibility. For other storage systems,
perform your own testing and consult your vendor for more information.

```properties
exchange-manager.name=filesystem
exchange.base-directories=s3://exchange-spooling-bucket
exchange.s3.region=us-west-1
exchange.s3.aws-access-key=example-access-key
exchange.s3.aws-secret-key=example-secret-key
```
You can configure multiple S3 buckets for the exchange manager to distribute
spooled data across buckets, reducing the I/O load on any one bucket. If a
query fails with the error message
`software.amazon.awssdk.services.s3.model.S3Exception: Please reduce your
request rate`, this indicates that the workload is I/O intensive, and you
should specify multiple S3 buckets in `exchange.base-directories` to balance
the load:

```properties
exchange.base-directories=s3://exchange-spooling-bucket-1,s3://exchange-spooling-bucket-2
```
(fte-exchange-azure-blob)=
### Azure Blob Storage

The following example `exchange-manager.properties` configuration specifies an
Azure Blob Storage container as the spooling storage destination. You must use
Azure Blob Storage, not Azure Data Lake Storage or any other hierarchical
storage option in Azure.

```properties
exchange-manager.name=filesystem
exchange.base-directories=abfs://container_name@account_name.dfs.core.windows.net
exchange.azure.connection-string=connection-string
```
(fte-exchange-gcs)=
### Google Cloud Storage

To enable exchange spooling on GCS in Trino, change the request endpoint to the
`https://storage.googleapis.com` Google storage URI, and configure your AWS
access/secret keys to use the GCS HMAC keys. If you deploy Trino on GCP, you
must either create a service account with access to your spooling bucket or
configure the key path to your GCS credential file.

For more information on GCS's S3 compatibility, refer to the Google Cloud
documentation on S3 migration.

The following example `exchange-manager.properties` configuration specifies a
GCS bucket as the spooling storage destination:

```properties
exchange-manager.name=filesystem
exchange.base-directories=gs://exchange-spooling-bucket
exchange.s3.region=us-west-1
exchange.s3.aws-access-key=example-access-key
exchange.s3.aws-secret-key=example-secret-key
exchange.s3.endpoint=https://storage.googleapis.com
exchange.gcs.json-key-file-path=/path/to/gcs_keyfile.json
```
(fte-exchange-alluxio)=
### Alluxio

The following `exchange-manager.properties` configuration example specifies
Alluxio as the spooling storage destination:

```properties
exchange-manager.name=filesystem
exchange.base-directories=alluxio://alluxio-master:19998/exchange-spooling-directory
exchange.alluxio.site-file-path=/path/to/alluxio-site.properties
```
(fte-exchange-hdfs)=
### HDFS

The following `exchange-manager.properties` configuration example specifies
HDFS as the spooling storage destination:

```properties
exchange-manager.name=hdfs
exchange.base-directories=hadoop-master:9000/exchange-spooling-directory
hdfs.config.resources=/usr/lib/hadoop/etc/hadoop/core-site.xml
```

If you want to use a Hadoop-compatible file system as the spooling storage
location, enable `exchange.hdfs.skip-directory-scheme-validation` in
`exchange-manager.properties` when configuring `exchange.base-directories`
with a scheme other than `hdfs`. The following steps may also be necessary:

- Configure the `AbstractFileSystem` implementation for the scheme in
  `core-site.xml`.
- Place the file system implementation's JARs in
  `${Trino_HOME}/plugin/exchange-hdfs` on all Trino cluster nodes.

(fte-exchange-local-filesystem)=
### Local filesystem storage
The following example `exchange-manager.properties` configuration specifies a
local directory, `/tmp/trino-exchange-manager`, as the spooling storage
destination.

:::{note}
It is only recommended to use a local filesystem for exchange in standalone,
non-production clusters. A local directory can only be used for exchange in a
distributed cluster if the exchange directory is shared and accessible from
all nodes.
:::

```properties
exchange-manager.name=filesystem
exchange.base-directories=/tmp/trino-exchange-manager
```
## Adaptive plan optimizations

Fault-tolerant execution mode offers several adaptive plan optimizations that
adjust query execution plans dynamically based on runtime statistics. For more
information, see the documentation on adaptive plan optimizations.