docs/v3/how-to-guides/migrate/upgrade-to-prefect-3.mdx
Prefect 3.0 introduces a number of enhancements to the OSS product: a new events & automations backend for event-driven workflows and observability, improved runtime performance, autonomous task execution and a streamlined caching layer based on transactional semantics.
The majority of these enhancements maintain compatibility with most Prefect 2.0 workflows, but there are a few caveats that you may need to adjust for.
To learn more about the enhanced performance and new features, see What's new in Prefect 3.0.
For the majority of users, upgrading to Prefect 3.0 will be a seamless process that requires few or no code changes. This guide highlights key changes that you may need to consider when upgrading.
<Info>
**Prefect 2.0** refers to the 2.x lineage of the open source prefect package, and **Prefect 3.0** refers exclusively to the 3.x lineage of the prefect package. Neither version is strictly tied to any aspect of Prefect's commercial product, [Prefect Cloud](/v3/how-to-guides/cloud/connect-to-cloud).
</Info>

To upgrade to Prefect 3.0, run:
```bash
pip install -U prefect
```

If you self-host a Prefect server, run this command to update your database:

```bash
prefect server database upgrade
```

If you use a Prefect integration or extra, remember to upgrade it as well. For example:

```bash
pip install -U 'prefect[aws]'
```
Prefect 3.0 is built with Pydantic 2.0 for improved performance. All Prefect objects will automatically upgrade, but if you use custom Pydantic models for flow parameters or custom blocks, you'll need to ensure they are compatible with Pydantic 2.0. You can continue to use Pydantic 1.0 models in your own code if they do not interact directly with Prefect.
Refer to Pydantic's migration guide for detailed information on necessary changes.
<Warning>
We recommend pausing all deployment schedules prior to upgrading. Because of differences in Pydantic datetime handling that affect the scheduler's idempotency logic, there is a small risk of the scheduler duplicating runs in its first loop.
</Warning>

Some less commonly used modules have been renamed, reorganized, or removed for clarity. The old import paths will continue to be supported for 6 months, but emit deprecation warnings. You can look at the deprecation code to see a full list of affected paths.
In Prefect 2.0, it was possible to call native async tasks from synchronous flows, a pattern that is not normally supported in Python. Prefect 3.0.0 removes this behavior to reduce complexity and avoid potential issues and edge cases. If you relied on asynchronous tasks in synchronous flows, you must either make your flow asynchronous or use a task runner that supports asynchronous execution.
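The underlying Python behavior can be seen without Prefect. The following is a minimal standard-library sketch, not Prefect code: `fetch_value`, `sync_caller`, `async_caller`, and `sync_caller_with_loop` are hypothetical names chosen for illustration. Calling an `async def` function from synchronous code returns a coroutine object rather than a result, so the caller either has to become asynchronous or hand the coroutine to something that runs an event loop.

```python
import asyncio
import inspect


async def fetch_value() -> int:
    """Hypothetical stand-in for the body of a native async task."""
    await asyncio.sleep(0)
    return 42


def sync_caller() -> bool:
    """Calling an async function from sync code yields a coroutine, not 42."""
    coro = fetch_value()
    is_coroutine = inspect.iscoroutine(coro)
    coro.close()  # close it so Python does not warn that it was never awaited
    return is_coroutine


async def async_caller() -> int:
    """Option 1: make the caller asynchronous and await the result."""
    return await fetch_value()


def sync_caller_with_loop() -> int:
    """Option 2 (plain-Python analogy only): stay synchronous and let an
    event loop run the coroutine. This is not a Prefect task runner."""
    return asyncio.run(fetch_value())
```

Here `sync_caller()` reports that it received a coroutine, while `asyncio.run(async_caller())` and `sync_caller_with_loop()` both produce the integer result.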
In Prefect 2.0, the final state of a flow run was influenced by the states of its task runs; if any task run failed, the flow run was marked as failed.
In Prefect 3.0, the final state of a flow run is entirely determined by:
1. The return value of the flow function (same as in Prefect 2.0):
    - A `State` that is returned will be considered the final state of the flow run. If an iterable of `State` objects is returned, all must be `Completed` for the flow run to be considered `Completed`. If any are `Failed`, the flow run will be marked as `Failed`.
2. Whether the flow function allows an exception to raise:
    - An exception that is allowed to propagate out of the flow function results in a `Failed` state.
    - An exception suppressed with `raise_on_failure=False` will not affect the flow run state.

This change means that task failures within a flow do not automatically cause the flow run to fail unless they affect the flow's return value or raise an uncaught exception.
<Warning>
When migrating from Prefect 2.0 to Prefect 3.0, be aware that flows may now complete successfully even if they contain failed tasks, unless you explicitly handle task failures.
</Warning>

To ensure your flow fails when critical tasks fail, consider these approaches:
1. Allow exceptions to propagate by not using `raise_on_failure=False`.
2. Use `return_state=True` and explicitly check task states to conditionally raise the underlying exception or return a failed state.
3. Use try/except blocks to handle task failures and return an appropriate state.

<CodeGroup>
```python Allow exceptions to propagate
from prefect import flow, task

@task
def failing_task():
    raise ValueError("Task failed")

@flow
def my_flow():
    failing_task()  # Exception propagates, causing flow failure

try:
    my_flow()
except ValueError as e:
    print(f"Flow failed: {e}")  # Output: Flow failed: Task failed
```

```python Use return_state
from prefect import flow, task
from prefect.states import Failed

@task
def failing_task():
    raise ValueError("Task failed")

@flow
def my_flow():
    state = failing_task(return_state=True)
    if state.is_failed():
        raise ValueError(state.result())
    return "Flow completed successfully"

try:
    print(my_flow())
except ValueError as e:
    print(f"Flow failed: {e}")  # Output: Flow failed: Task failed
```

```python Use try/except
from prefect import flow, task
from prefect.states import Failed

@task
def failing_task():
    raise ValueError("Task failed")

@flow
def my_flow():
    try:
        failing_task()
    except ValueError:
        return Failed(message="Flow failed due to task failure")
    return "Flow completed successfully"

print(my_flow())  # Output: Failed(message='Flow failed due to task failure')
```
</CodeGroup>
Choose the strategy that best fits your specific use case and error handling requirements.
PrefectFutures now have a standard synchronous interface, with an asynchronous one planned soon.
Prefect 3.0 introduces a powerful idempotency engine. By default, tasks in a flow run are automatically cached if they are called more than once with the same inputs. If you rely on tasks with side effects, this may result in surprising behavior. To disable caching, pass cache_policy=None to your task.
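To see why input-keyed caching can surprise code that relies on side effects, here is a standard-library analogy. It is not Prefect's caching engine: `functools.cache` merely plays the role of a cache keyed on inputs, and `send_notification`, `send_notification_uncached`, and `sent_log` are hypothetical names.

```python
from functools import cache

# Records every time the side effect actually runs.
sent_log: list[str] = []


@cache
def send_notification(recipient: str) -> str:
    """Hypothetical side-effecting function, memoized on its input."""
    sent_log.append(recipient)
    return f"sent to {recipient}"


def send_notification_uncached(recipient: str) -> str:
    """The same function without memoization: the side effect always runs."""
    sent_log.append(recipient)
    return f"sent to {recipient}"
```

Calling `send_notification("a")` twice appends to `sent_log` only once, because the second call is served from the cache; the uncached variant appends on every call. The same reasoning applies to any task whose purpose is its side effect rather than its return value.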
In Prefect 2.0, when passing a mutable object (such as a list or dict) using unmapped() to a mapped task, each task instance appeared to receive an independent copy of the object. In Prefect 3.0, the unmapped object is shared by reference across all mapped task runs. If any task mutates the object, all other concurrently running tasks will see those mutations, potentially causing race conditions and unexpected behavior.
To avoid this issue, create a copy of the mutable object within each task before modifying it:
<CodeGroup>
```python Problematic (Prefect 3.0)
from prefect import flow, task, unmapped

@task
def process_item(item: str, shared_state: dict):
    # This mutation affects ALL tasks!
    shared_state["current_item"] = item
    # Race condition: another task may overwrite this before we read it
    print(f"Processing {shared_state['current_item']}")

@flow
def my_flow():
    shared_state = {"current_item": None}

    # All tasks share the same dict reference
    process_item.map(
        item=["A", "B", "C"],
        shared_state=unmapped(shared_state)
    )
```

```python Recommended (Prefect 3.0)
from prefect import flow, task, unmapped
from copy import deepcopy

@task
def process_item(item: str, shared_state: dict):
    # Create a copy for this task
    local_state = deepcopy(shared_state)
    local_state["current_item"] = item
    print(f"Processing {local_state['current_item']}")

@flow
def my_flow():
    shared_state = {"current_item": None}
    process_item.map(
        item=["A", "B", "C"],
        shared_state=unmapped(shared_state)
    )
```

```python Alternative: immutable values
from prefect import flow, task, unmapped

@task
def process_item(item: str, config_value: str):
    # Immutable strings are safe to share
    print(f"Processing {item} with config: {config_value}")

@flow
def my_flow():
    config = "production"
    process_item.map(
        item=["A", "B", "C"],
        config_value=unmapped(config)
    )
```
</CodeGroup>
If you need to share read-only configuration or settings across tasks, consider using immutable types (strings, tuples, frozen dataclasses) instead of mutable objects.
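As a minimal sketch of the frozen-dataclass option, using only the standard library: `RunConfig`, `try_to_mutate`, and `with_environment` are hypothetical names, not part of Prefect. A frozen dataclass rejects attribute assignment, so a task that tries to mutate the shared object fails loudly instead of silently affecting its siblings.

```python
from dataclasses import FrozenInstanceError, dataclass, replace


@dataclass(frozen=True)
class RunConfig:
    """Hypothetical read-only settings object shared across mapped tasks."""

    environment: str
    retries: int = 3


def try_to_mutate(config: RunConfig) -> bool:
    """Return True if the attempted mutation was blocked."""
    try:
        config.environment = "staging"  # raises on a frozen dataclass
    except FrozenInstanceError:
        return True
    return False


def with_environment(config: RunConfig, environment: str) -> RunConfig:
    """Derive a modified copy instead of mutating the shared instance."""
    return replace(config, environment=environment)
```

Note that `frozen=True` is shallow: a mutable field such as a list inside the dataclass can still be modified in place, so fields should themselves be immutable types.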
In Prefect 2.0, agents were deprecated in favor of next-generation workers. Workers are now standard in Prefect 3. For detailed information on upgrading from agents to workers, please refer to our upgrade guide.
`AttributeError: 'coroutine' object has no attribute <some attribute>`

When within an asynchronous task or flow context, if you do not await an asynchronous function or method, this error will be raised when you try to use the object.
To fix it, await the asynchronous function or method or use _sync=True.
For example, Block's load method is asynchronous in an async context:
<CodeGroup>
```python Incorrect
from prefect.blocks.system import Secret

async def my_async_function():
    my_secret = Secret.load("my-secret")
    print(my_secret.get())  # AttributeError: 'coroutine' object has no attribute 'get'
```

```python Correct
from prefect.blocks.system import Secret

async def my_async_function():
    my_secret = await Secret.load("my-secret")
    print(my_secret.get())  # This will work
```

```python Also correct
from prefect.blocks.system import Secret

async def my_async_function():
    my_secret = Secret.load("my-secret", _sync=True)
    print(my_secret.get())  # This will work
```
</CodeGroup>
Similarly, if you never use an un-awaited coroutine, you may see a warning like this:
```text
RuntimeWarning: coroutine 'some_async_callable' was never awaited
...
RuntimeWarning: Enable tracemalloc to get the object allocation traceback
```
This is the same problem as above: fix it by awaiting the coroutine or by passing _sync=True.
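The failure mode can be reproduced without Prefect. In this standard-library sketch, `FakeSecret` and `load_secret` are hypothetical stand-ins for a loaded block and an async load method; they are not Prefect APIs.

```python
import asyncio


class FakeSecret:
    """Hypothetical stand-in for a loaded block; not a Prefect class."""

    def get(self) -> str:
        return "s3cr3t"


async def load_secret() -> FakeSecret:
    """Hypothetical async loader standing in for an async load method."""
    await asyncio.sleep(0)
    return FakeSecret()


async def forgot_await() -> str:
    """Use the un-awaited coroutine as if it were the loaded object."""
    pending = load_secret()  # a coroutine object, not a FakeSecret
    try:
        return pending.get()  # coroutines have no .get attribute
    except AttributeError as exc:
        return str(exc)
    finally:
        pending.close()  # avoid the "never awaited" RuntimeWarning


async def with_await() -> str:
    """Await the coroutine first, then use the resulting object."""
    secret = await load_secret()
    return secret.get()
```

Running `asyncio.run(forgot_await())` returns the text of the `AttributeError`, while `asyncio.run(with_await())` returns the secret value.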
`TypeError: object <some Type> can't be used in 'await' expression`

This error occurs when using the await keyword before an object that is not a coroutine.
To fix it, remove the await.
For example, my_task.submit(...) is always synchronous in Prefect 3.x:
<CodeGroup>
```python Incorrect
from prefect import flow, task

@task
async def my_task():
    pass

@flow
async def my_flow():
    future = await my_task.submit()  # TypeError: object PrefectConcurrentFuture can't be used in 'await' expression
```

```python Correct
from prefect import flow, task

@task
async def my_task():
    pass

@flow
async def my_flow():
    future = my_task.submit()  # This will work
```
</CodeGroup>
See the Futures interface section for more information on this particular gotcha.
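The same class of error can be reproduced with the standard library alone. In this sketch, `concurrent.futures.Future` stands in for a synchronous future (it is an analogy, not Prefect's future type), and `submit_work`, `awaits_by_mistake`, and `uses_sync_interface` are hypothetical names.

```python
import asyncio
from concurrent.futures import Future


def submit_work() -> Future:
    """Hypothetical synchronous submit: returns an already-resolved
    concurrent.futures.Future, which is not awaitable."""
    future: Future = Future()
    future.set_result("done")
    return future


async def awaits_by_mistake() -> str:
    """Awaiting a non-awaitable object raises TypeError."""
    try:
        await submit_work()
    except TypeError as exc:
        return type(exc).__name__
    return "no error"


async def uses_sync_interface() -> str:
    """Drop the await and use the future's synchronous interface."""
    future = submit_work()
    return future.result()
```

The exact wording of the `TypeError` message varies between Python versions, so the sketch reports only the exception type.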
`TypeError: Flow.deploy() got an unexpected keyword argument 'schedule'`

In Prefect 3.0, the schedule argument has been removed in favor of the schedules argument.
This applies to both the Flow.serve and Flow.deploy methods.
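<CodeGroup>
```python Prefect 2.0
from datetime import timedelta
from prefect import flow
from prefect.client.schemas.schedules import IntervalSchedule

@flow
def my_flow():
    pass

my_flow.serve(
    name="my-flow",
    schedule=IntervalSchedule(interval=timedelta(minutes=1))
)
```

```python Prefect 3.0 {11}
from datetime import timedelta
from prefect import flow
from prefect.schedules import Interval

@flow
def my_flow():
    pass

my_flow.serve(
    name="my-flow",
    schedules=[Interval(timedelta(minutes=1))]
)
```
</CodeGroup>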