docs/v3/how-to-guides/workflows/cache-workflow-steps.mdx
The simplest way to cache the results of tasks within a flow is to set `persist_result=True` on a task definition.
from prefect import task, flow

@task(persist_result=True)
def add_one(x: int):
    return x + 1

@flow
def my_flow():
    add_one(1)  # will not be cached
    add_one(1)  # will be cached
    add_one(2)  # will not be cached

if __name__ == "__main__":
    my_flow()
16:28:51.230 | INFO | Flow run 'outgoing-cicada' - Beginning flow run 'outgoing-cicada' for flow 'my-flow'
16:28:51.257 | INFO | Task run 'add_one-d85' - Finished in state Completed()
16:28:51.276 | INFO | Task run 'add_one-eff' - Finished in state Cached(type=COMPLETED)
16:28:51.294 | INFO | Task run 'add_one-d16' - Finished in state Completed()
16:28:51.311 | INFO | Flow run 'outgoing-cicada' - Finished in state Completed()
This will implicitly use the DEFAULT cache policy, which is a composite cache policy defined as:
DEFAULT = INPUTS + TASK_SOURCE + RUN_ID
This means subsequent calls of a task with identical inputs from within the same parent run will return cached results without executing the body of the function.
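One way to picture a composite policy is as a function that folds several pieces of context into a single cache key. The sketch below is a plain-Python mental model, not Prefect's implementation: `toy_cache_key` and its parameters are hypothetical names, and Prefect's real key computation differs in detail.

```python
import hashlib
import json


def toy_cache_key(inputs: dict, task_source: str, run_id: str) -> str:
    """Hypothetical stand-in for INPUTS + TASK_SOURCE + RUN_ID."""
    # Serialize every component deterministically, then hash them together.
    payload = json.dumps(
        {"inputs": inputs, "source": task_source, "run_id": run_id},
        sort_keys=True,
    )
    return hashlib.sha256(payload.encode()).hexdigest()


source = "def add_one(x): return x + 1"

# Same inputs, same source, same parent run: the keys match, so the
# second call could reuse the first call's result.
same_run_a = toy_cache_key({"x": 1}, source, run_id="run-1")
same_run_b = toy_cache_key({"x": 1}, source, run_id="run-1")

# A different input or a different parent run produces a different key.
other_input = toy_cache_key({"x": 2}, source, run_id="run-1")
other_run = toy_cache_key({"x": 1}, source, run_id="run-2")

print(same_run_a == same_run_b)   # True
print(same_run_a == other_input)  # False
print(same_run_a == other_run)    # False
```

In this model, removing `run_id` from the payload would let results be shared across runs, which is roughly what switching from `DEFAULT` to a narrower policy does.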
<Note>
The `TASK_SOURCE` component of the `DEFAULT` cache policy helps avoid naming collisions between similar tasks that should not share a cache.
</Note>

To cache the result of a task based only on task inputs, set `cache_policy=INPUTS` in the task decorator:
from prefect import task
from prefect.cache_policies import INPUTS
import time

@task(cache_policy=INPUTS)
def my_stateful_task(x: int):
    print('sleeping')
    time.sleep(10)
    return x + 1

my_stateful_task(x=1)  # sleeps
my_stateful_task(x=1)  # does not sleep
my_stateful_task(x=2)  # sleeps
The above task will sleep the first time it is called with x=1, but will not sleep for any subsequent calls with the same input.
Prefect ships with several cache policies that can be used to customize caching behavior.
To cache based on a subset of inputs, you can subtract kwargs from the INPUTS cache policy.
from prefect import task
from prefect.cache_policies import INPUTS
import time

@task(cache_policy=INPUTS - 'debug')
def my_stateful_task(x: int, debug: bool = False):
    print('sleeping')
    time.sleep(10)
    return x + 1

my_stateful_task(x=1)  # sleeps
my_stateful_task(x=1, debug=True)  # does not sleep
my_stateful_task(x=1, debug=False)  # does not sleep
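To see why the `debug` flag stops mattering, it can help to model the subtraction as dropping a name from the inputs before they are hashed. This is an illustrative sketch only: `toy_inputs_key` and its `exclude` parameter are hypothetical and are not part of Prefect's API.

```python
import hashlib
import json


def toy_inputs_key(inputs: dict, exclude: tuple = ()) -> str:
    """Hypothetical model of the INPUTS policy minus some keyword arguments."""
    # Drop the excluded names first so they cannot influence the key.
    kept = {name: value for name, value in inputs.items() if name not in exclude}
    return hashlib.sha256(json.dumps(kept, sort_keys=True).encode()).hexdigest()


key_debug_on = toy_inputs_key({"x": 1, "debug": True}, exclude=("debug",))
key_debug_off = toy_inputs_key({"x": 1, "debug": False}, exclude=("debug",))
key_other_x = toy_inputs_key({"x": 2, "debug": False}, exclude=("debug",))

print(key_debug_on == key_debug_off)  # True
print(key_debug_on == key_other_x)    # False
```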
To cache with an expiration, set the cache_expiration parameter on the task decorator.
from prefect import task
from prefect.cache_policies import INPUTS
import time
from datetime import timedelta

@task(cache_policy=INPUTS, cache_expiration=timedelta(seconds=10))
def my_stateful_task(x: int):
    print('sleeping')
    time.sleep(10)
    return x + 1

my_stateful_task(x=1)  # sleeps
my_stateful_task(x=1)  # does not sleep
# ... 10 seconds pass ...
my_stateful_task(x=1)  # sleeps again
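The expiration behavior can be pictured as a record that carries its own deadline. The following is a minimal in-memory sketch under that assumption. `ToyExpiringCache` is a hypothetical class written for illustration and does not reflect how Prefect stores or checks cache records.

```python
from datetime import datetime, timedelta


class ToyExpiringCache:
    """Hypothetical in-memory cache whose records expire after a fixed duration."""

    def __init__(self, expiration: timedelta):
        self.expiration = expiration
        self._records = {}  # key -> (value, expires_at)

    def get(self, key, now: datetime):
        record = self._records.get(key)
        if record is None:
            return None
        value, expires_at = record
        # An expired record is treated as a miss, so the caller recomputes.
        return value if now < expires_at else None

    def put(self, key, value, now: datetime):
        self._records[key] = (value, now + self.expiration)


cache = ToyExpiringCache(expiration=timedelta(seconds=10))
start = datetime(2024, 1, 1, 12, 0, 0)

cache.put("x=1", 2, now=start)
hit = cache.get("x=1", now=start + timedelta(seconds=5))
miss = cache.get("x=1", now=start + timedelta(seconds=11))
print(hit, miss)  # 2 None
```

Passing `now` explicitly keeps the sketch deterministic. A real cache would read the clock itself.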
To ignore the cache regardless of the cache policy, set the refresh_cache parameter on the task decorator.
import random
from prefect import task
from prefect.cache_policies import TASK_SOURCE

@task(cache_policy=TASK_SOURCE, refresh_cache=True)
def never_caching_task():
    return random.random()
To refresh the cache for all tasks, use the PREFECT_TASKS_REFRESH_CACHE setting.
Setting PREFECT_TASKS_REFRESH_CACHE=true changes the default behavior of all tasks to refresh.
If you have tasks that should not refresh when this setting is enabled, set `refresh_cache` to `False`. These tasks will never refresh the cache. If a cache key exists, it will be read, not updated.
If a cache key does not exist yet, these tasks can still write to the cache.
import random
from prefect import task
from prefect.cache_policies import TASK_SOURCE

@task(cache_policy=TASK_SOURCE, refresh_cache=False)
def caching_task():
    return random.random()
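The read and write rules for refreshing can be summarized as a small decision function. This is a sketch of the semantics described above, not Prefect code: `run_with_cache`, `store`, and `compute` are hypothetical names, and the dictionary stands in for whatever result storage is configured.

```python
def run_with_cache(store: dict, key: str, compute, refresh: bool):
    """Hypothetical model of the refresh decision for a single task call."""
    if not refresh and key in store:
        # A record exists and no refresh was requested: read it, do not update it.
        return store[key]
    # Either a refresh was requested or no record exists yet:
    # execute the function and write (or overwrite) the record.
    store[key] = compute()
    return store[key]


store = {}
calls = []


def compute():
    # Record each execution so the number of real runs is visible.
    calls.append(1)
    return len(calls)


first = run_with_cache(store, "k", compute, refresh=False)   # miss: computes and writes
second = run_with_cache(store, "k", compute, refresh=False)  # hit: reads the record
third = run_with_cache(store, "k", compute, refresh=True)    # refresh: recomputes
print(first, second, third)  # 1 1 2
```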
Cache policies can be combined using the + operator.
from prefect import task
from prefect.cache_policies import INPUTS, TASK_SOURCE

@task(cache_policy=INPUTS + TASK_SOURCE)
def my_task(x: int):
    return x + 1
The above task will use a cached result as long as the same inputs and task source are used.
By default, Prefect stores results locally in `~/.prefect/storage/`. To share the cache across tasks running on different machines, provide a storage block to the `result_storage` parameter on the task decorator.
Here's an example of a task that uses an S3 bucket to store cache records:
from prefect import task
from prefect.cache_policies import INPUTS
from prefect_aws import AwsCredentials, S3Bucket

s3_bucket = S3Bucket(
    credentials=AwsCredentials(
        aws_access_key_id="my-access-key-id",
        aws_secret_access_key="my-secret-access-key",
    ),
    bucket_name="my-bucket",
)
# save the block to ensure it is available across machines
s3_bucket.save("my-cache-bucket")

@task(cache_policy=INPUTS, result_storage=s3_bucket)
def my_cached_task(x: int):
    return x + 42
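<Note>
Using a storage block from a Prefect integration package requires that package to be installed in the environment where the task runs.
For example, the `prefect_aws` package must be installed to use the `S3Bucket` storage block.
</Note>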
For more advanced caching examples, see the advanced caching how-to guide.
For more information on Prefect's caching system, see the caching concepts page.