docs/v3/advanced/experimental-plugins.mdx
<Warning>
The plugin system is an experimental feature under active development. The API is subject to change without notice, and features may be modified or removed in future releases.
</Warning>
The Prefect plugin system allows third-party packages to run hooks when Prefect is imported. This enables plugins to configure environment variables, authenticate with external services, or perform other initialization tasks automatically - whether you're running CLI commands, Python scripts, workers, or agents.
The plugin system is designed for scenarios where you need to:
The plugin system is opt-in and disabled by default. Enable it by setting the environment variable:
export PREFECT_EXPERIMENTS_PLUGINS_ENABLED=1
Once enabled, plugins will automatically run whenever Prefect is imported - this includes:
- Python scripts (import prefect)
- CLI commands (prefect deploy, prefect server start, etc.)

This ensures your environment is properly configured before any Prefect code executes.
Here's a minimal example plugin that sets environment variables:
# my_plugin/__init__.py
from prefect._experimental.plugins import register_hook, HookContext, SetupResult

PREFECT_PLUGIN_API_REQUIRES = ">=0.1,<1"

@register_hook
def setup_environment(*, ctx: HookContext) -> SetupResult:
    """Configure environment before Prefect starts."""
    logger = ctx.logger_factory("my-plugin")
    # Perform authentication or configuration
    credentials = fetch_credentials()
    logger.info("Configured credentials")
    return SetupResult(
        env={
            "MY_SERVICE_TOKEN": credentials.token,
            "MY_SERVICE_URL": "https://api.example.com",
        },
        note="Configured my-service credentials",
        required=True,  # Abort if this plugin fails in strict mode
    )

def fetch_credentials():
    # Your authentication logic here
    pass
Register the plugin in your pyproject.toml:
[project]
name = "my-plugin"
version = "0.1.0"
dependencies = ["prefect>=3.4"]
[project.entry-points."prefect.plugins"]
my_plugin = "my_plugin"
Install the plugin:
pip install -e .
Configure the plugin system with these environment variables:
| Variable | Description | Default |
|---|---|---|
| PREFECT_EXPERIMENTS_PLUGINS_ENABLED | Enable/disable the plugin system | 0 (disabled) |
| PREFECT_EXPERIMENTS_PLUGINS_ALLOW | Comma-separated list of allowed plugin names | None (all allowed) |
| PREFECT_EXPERIMENTS_PLUGINS_DENY | Comma-separated list of denied plugin names | None (none denied) |
| PREFECT_EXPERIMENTS_PLUGINS_SETUP_TIMEOUT_SECONDS | Maximum time, in seconds, for all plugins to complete | 20 |
| PREFECT_EXPERIMENTS_PLUGINS_STRICT | Exit if a required plugin fails | 0 (disabled) |
| PREFECT_EXPERIMENTS_PLUGINS_SAFE_MODE | Load plugins without executing hooks | 0 (disabled) |
Allow only specific plugins:
export PREFECT_EXPERIMENTS_PLUGINS_ALLOW="aws-plugin,gcp-plugin"
Deny problematic plugins:
export PREFECT_EXPERIMENTS_PLUGINS_DENY="legacy-plugin"
Enable strict mode for production:
export PREFECT_EXPERIMENTS_PLUGINS_STRICT=1
Debug plugins without execution:
export PREFECT_EXPERIMENTS_PLUGINS_SAFE_MODE=1
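As a rough illustration of how comma-separated allow and deny lists can be interpreted, the sketch below parses both variables and decides whether a plugin name passes. The helpers `parse_name_list` and `is_plugin_permitted` are hypothetical names used only here, and the precedence rule (a denied name is rejected even if it is also allowed) is an assumption, not documented Prefect behavior.

```python
from __future__ import annotations

import os


def parse_name_list(raw: str | None) -> set[str] | None:
    """Split a comma-separated value into a set of names; None means 'not set'."""
    if raw is None or not raw.strip():
        return None
    return {name.strip() for name in raw.split(",") if name.strip()}


def is_plugin_permitted(
    name: str, allow: set[str] | None, deny: set[str] | None
) -> bool:
    """Assumed semantics: deny wins; an unset allow list admits every name."""
    if deny is not None and name in deny:
        return False
    if allow is not None:
        return name in allow
    return True


allow = parse_name_list(os.environ.get("PREFECT_EXPERIMENTS_PLUGINS_ALLOW"))
deny = parse_name_list(os.environ.get("PREFECT_EXPERIMENTS_PLUGINS_DENY"))
print(is_plugin_permitted("aws-plugin", allow, deny))
```

With neither variable set, every name passes, which matches the defaults in the table above.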
Plugins implement the setup_environment hook to configure the environment or run code on startup. The simplest approach is to use a decorated function:
from prefect._experimental.plugins import register_hook, HookContext, SetupResult

PREFECT_PLUGIN_API_REQUIRES = ">=0.1,<1"

@register_hook
def setup_environment(*, ctx: HookContext) -> SetupResult | None:
    """
    Prepare process environment for Prefect.

    Args:
        ctx: Context with Prefect version, API URL, and logger factory

    Returns:
        SetupResult with environment variables, or None for no changes
    """
    logger = ctx.logger_factory("my-plugin")
    logger.info(f"Running with Prefect {ctx.prefect_version}")
    # Return None if no setup needed
    if not should_configure():
        return None
    # Return SetupResult with configuration
    return SetupResult(
        env={"KEY": "value"},
        note="Short description",
        required=False,  # Optional
    )
<Note>
Entry Point: When using the function-based pattern, your entry point should reference the module (e.g., "my_plugin") rather than a specific class or instance.
</Note>
Alternative: Class-Based Plugins
For plugins that need to maintain state, you can also use a class-based approach:
from prefect._experimental.plugins import register_hook, HookContext, SetupResult

class MyPlugin:
    def __init__(self):
        self.state = {}

    @register_hook
    def setup_environment(self, *, ctx: HookContext) -> SetupResult | None:
        # Access instance state
        return SetupResult(env={"KEY": "value"})

# Create instance at module level
Plugin = MyPlugin()
Entry point: "my_plugin:Plugin"
The context object passed to plugins:
from dataclasses import dataclass
from typing import Callable
import logging

@dataclass
class HookContext:
    prefect_version: str  # e.g., "3.0.0"
    api_url: str | None  # Configured Prefect API URL
    logger_factory: Callable[[str], logging.Logger]  # Create loggers
The result returned by plugins:
from dataclasses import dataclass
from typing import Mapping
from datetime import datetime

@dataclass
class SetupResult:
    env: Mapping[str, str]  # Environment variables to set
    note: str | None = None  # Human-readable description
    required: bool = False  # Abort in strict mode if fails
Here's a complete example of a plugin that assumes an AWS IAM role and provides temporary credentials:
# prefect_aws_setup/__init__.py
from __future__ import annotations

import os
from datetime import timezone

import botocore.session
from prefect._experimental.plugins import register_hook, HookContext, SetupResult

PREFECT_PLUGIN_API_REQUIRES = ">=0.1,<1"

@register_hook
def setup_environment(*, ctx: HookContext) -> SetupResult | None:
    """Assume AWS IAM role and provide temporary credentials."""
    logger = ctx.logger_factory("prefect-aws-setup")
    role_arn = os.getenv("PREFECT_AWS_SETUP_ROLE_ARN")
    if not role_arn:
        logger.debug("PREFECT_AWS_SETUP_ROLE_ARN not set, skipping")
        return None
    profile = os.getenv("PREFECT_AWS_SETUP_PROFILE")
    region = os.getenv("AWS_REGION", "us-east-1")
    duration = int(os.getenv("PREFECT_AWS_SETUP_DURATION", "3600"))
    try:
        # Create AWS session and assume role
        session = botocore.session.Session(profile=profile)
        sts = session.create_client("sts", region_name=region)
        response = sts.assume_role(
            RoleArn=role_arn,
            RoleSessionName="prefect-plugin",
            DurationSeconds=duration,
        )
        credentials = response["Credentials"]
        return SetupResult(
            env={
                "AWS_ACCESS_KEY_ID": credentials["AccessKeyId"],
                "AWS_SECRET_ACCESS_KEY": credentials["SecretAccessKey"],
                "AWS_SESSION_TOKEN": credentials["SessionToken"],
                "AWS_REGION": region,
            },
            note=f"Assumed role {role_arn.split('/')[-1]}",
            required=bool(os.getenv("PREFECT_AWS_SETUP_REQUIRED")),
        )
    except Exception:
        logger.exception("Failed to assume AWS role")
        if os.getenv("PREFECT_AWS_SETUP_REQUIRED"):
            raise
        return None
Package configuration:
# pyproject.toml
[project]
name = "prefect-aws-setup"
version = "0.1.0"
dependencies = ["prefect>=3.4", "botocore>=1.34"]
[project.entry-points."prefect.plugins"]
aws_setup = "prefect_aws_setup"
Installation:
pip install -e .
Usage:
# Configure the plugin
export PREFECT_AWS_SETUP_ROLE_ARN="arn:aws:iam::123456789012:role/PrefectRole"
export PREFECT_AWS_SETUP_PROFILE="my-aws-profile"
export AWS_REGION="us-west-2"
export PREFECT_AWS_SETUP_REQUIRED=1
# Enable plugins
export PREFECT_EXPERIMENTS_PLUGINS_ENABLED=1
# Run Prefect commands - credentials are automatically configured
prefect deploy --all
Use the diagnostic command to troubleshoot plugin issues:
prefect experimental plugins diagnose
Example output:
Prefect Experimental Plugin System Diagnostics
Enabled: True
Timeout: 20.0s
Strict mode: False
Safe mode: False
Allow list: None
Deny list: None
Discoverable Plugins (entry point group: prefect.plugins)
• aws-setup: active
Module: prefect_aws_setup:Plugin
API requirement: >=0.1,<1
Running Startup Hooks
• aws-setup: success
Environment variables: 4
AWS_ACCESS_KEY_ID=••••••
AWS_SECRET_ACCESS_KEY=••••••
AWS_SESSION_TOKEN=••••••
AWS_REGION=us-west-2
Note: Assumed role PrefectRole
Expires: 2024-01-15 18:30:00+00:00
- Use PREFECT_EXPERIMENTS_PLUGINS_DENY to quarantine known-bad plugins

The plugin system automatically redacts sensitive environment variables in logs and diagnostics. Variables containing these keywords are redacted:
- SECRET
- TOKEN
- PASSWORD
- KEY

Example:
# This environment variable will be redacted
{"AWS_SECRET_ACCESS_KEY": "••••••"}
# This will not
{"AWS_REGION": "us-east-1"}
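The keyword rule above can be sketched in a few lines. This is an illustration of keyword-based masking, not Prefect's actual redaction code: the function name `redact_env` is hypothetical, and matching the keywords case-insensitively as substrings is an assumption.

```python
from __future__ import annotations

# Keywords listed in this section; a name containing any of them is masked.
SENSITIVE_KEYWORDS = ("SECRET", "TOKEN", "PASSWORD", "KEY")
MASK = "••••••"


def redact_env(env: dict[str, str]) -> dict[str, str]:
    """Return a copy of env with values masked for sensitive-looking names."""
    return {
        name: MASK
        if any(word in name.upper() for word in SENSITIVE_KEYWORDS)
        else value
        for name, value in env.items()
    }


print(redact_env({"AWS_SECRET_ACCESS_KEY": "abc123", "AWS_REGION": "us-east-1"}))
```

Under this rule a name such as AWS_ACCESS_KEY_ID is also masked, because it contains KEY, which is consistent with the diagnostic output shown earlier.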
Plugins execute with the same permissions as the Prefect process. Be cautious about:
Plugins run automatically when Prefect is imported, before any other Prefect code executes. This means:
If plugin initialization fails (and strict mode is disabled), Prefect will log the error and continue loading.
- Plugins are discovered through the prefect.plugins entry point group
- In strict mode with required=True, plugin failures abort startup

Plugins can be either synchronous or asynchronous:
from prefect._experimental.plugins import register_hook, HookContext, SetupResult

# Synchronous plugin
@register_hook
def setup_environment(*, ctx: HookContext) -> SetupResult:
    # Synchronous implementation
    return SetupResult(env={"KEY": "value"})

# Asynchronous plugin
@register_hook
async def setup_environment(*, ctx: HookContext) -> SetupResult:
    # Asynchronous implementation
    data = await fetch_data()
    return SetupResult(env={"KEY": data})
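To show how a loader could treat both kinds of hook uniformly, here is a minimal standard-library sketch. It does not reproduce Prefect's loader: `call_hook` and the two stand-in hooks are hypothetical, and applying the timeout to each call is a simplification (the configuration table describes the timeout as covering all plugins together).

```python
import asyncio
import inspect


def sync_hook():
    """Stand-in for a synchronous setup hook."""
    return {"KEY": "sync"}


async def async_hook():
    """Stand-in for an asynchronous setup hook."""
    await asyncio.sleep(0)  # placeholder for real async I/O
    return {"KEY": "async"}


async def call_hook(hook, timeout: float = 20.0):
    """Await coroutine hooks directly; run plain functions in a worker thread."""
    if inspect.iscoroutinefunction(hook):
        return await asyncio.wait_for(hook(), timeout)
    loop = asyncio.get_running_loop()
    return await asyncio.wait_for(loop.run_in_executor(None, hook), timeout)


print(asyncio.run(call_hook(sync_hook)))
print(asyncio.run(call_hook(async_hook)))
```

Running the synchronous hook in an executor keeps a slow blocking call from stalling the event loop while the timeout is enforced.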
Plugins should be idempotent - multiple invocations should produce the same result. Prefect may call plugins multiple times during initialization or in different processes.
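One way to keep a hook idempotent is to derive the full desired state from its inputs on every call and to cache any expensive lookup, rather than appending to or mutating existing values. The sketch below assumes a hypothetical `MY_SERVICE_TOKEN_SOURCE` variable and hypothetical helpers `load_token` and `build_env`. It uses plain dictionaries so that it runs without Prefect installed.

```python
import functools
import os


@functools.lru_cache(maxsize=None)
def load_token() -> str:
    """Hypothetical expensive lookup, cached so repeat calls in a process are free."""
    return os.environ.get("MY_SERVICE_TOKEN_SOURCE", "demo-token")


def build_env() -> dict:
    """Compute the complete mapping each time; never depend on a previous call."""
    return {
        "MY_SERVICE_TOKEN": load_token(),
        "MY_SERVICE_URL": "https://api.example.com",
    }


# Calling the function twice yields the same mapping.
assert build_env() == build_env()
```

A real hook would wrap the mapping in a SetupResult. The cache only helps within one process, so the lookup itself should also be safe to repeat across processes.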
Plugins should declare their API version compatibility using the PREFECT_PLUGIN_API_REQUIRES attribute:
from prefect._experimental.plugins import register_hook, HookContext

# At module level in your plugin
PREFECT_PLUGIN_API_REQUIRES = ">=0.1,<1"  # Supports API versions from 0.1 up to, but not including, 1.0

class Plugin:
    @register_hook
    def setup_environment(self, *, ctx: HookContext):
        # Your plugin implementation
        pass
The version specifier follows PEP 440 syntax:

- ">=0.1,<1" - Compatible with 0.x releases starting at 0.1, incompatible with 1.0+
- ">=0.1" - Compatible with 0.1 and all future versions
- "==0.1" - Only compatible with exactly version 0.1

The current plugin API version is 0.1. This will be incremented when breaking changes are made to the plugin interface.
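When Prefect loads plugins, it automatically validates version compatibility:

- The requirement defaults to ">=0.1,<1" if not specified
- Plugins whose requirement excludes the current API version are skipped with a warning
- Invalid specifiers are logged at debug level and the version check is ignored

Example logs: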
# Incompatible version
WARNING: Skipping plugin my-plugin: requires API version >=1.0, current version is 0.1
# Invalid specifier
DEBUG: Plugin my-plugin has invalid version specifier 'not-valid', ignoring version check
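To make the specifier semantics concrete, the following sketch checks a version against a comma-separated requirement. It is a deliberately simplified stand-in that handles only plain dotted integers and the operators >=, <=, ==, > and <. It is not a full PEP 440 implementation and is not how Prefect performs the check, and `satisfies` is a hypothetical name.

```python
import operator

_OPS = {
    ">=": operator.ge,
    "<=": operator.le,
    "==": operator.eq,
    ">": operator.gt,
    "<": operator.lt,
}


def _parse(version: str) -> tuple:
    """Turn '0.1' into (0, 1); only plain dotted integers are supported."""
    return tuple(int(part) for part in version.split("."))


def _pad(a: tuple, b: tuple) -> tuple:
    """Right-pad the shorter tuple with zeros so '1' compares like '1.0'."""
    width = max(len(a), len(b))
    return a + (0,) * (width - len(a)), b + (0,) * (width - len(b))


def satisfies(version: str, specifier: str) -> bool:
    """Return True if version meets every clause of the specifier."""
    for clause in specifier.split(","):
        clause = clause.strip()
        # Two-character operators are tried before one-character ones.
        for symbol in (">=", "<=", "==", ">", "<"):
            if clause.startswith(symbol):
                left, right = _pad(_parse(version), _parse(clause[len(symbol):].strip()))
                if not _OPS[symbol](left, right):
                    return False
                break
        else:
            raise ValueError(f"unsupported clause: {clause!r}")
    return True


print(satisfies("0.1", ">=0.1,<1"))  # current API version against the default
print(satisfies("0.1", ">=1.0"))     # the incompatible case from the log above
```

The first call prints True and the second prints False. Production code would more likely rely on an established PEP 440 parser than on a hand-rolled comparison like this one.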
Use the diagnostics command to see plugin version requirements:
prefect experimental plugins diagnose
Output shows API requirements for each plugin:
• my-plugin: active
Module: my_plugin:Plugin
API requirement: >=0.1,<1
Verify plugins are enabled:
echo $PREFECT_EXPERIMENTS_PLUGINS_ENABLED
Check plugin is discoverable:
prefect experimental plugins diagnose
Verify entry point is registered:
pip show your-plugin-package
Enable safe mode to test loading without execution:
PREFECT_EXPERIMENTS_PLUGINS_SAFE_MODE=1 prefect experimental plugins diagnose
Check plugin logs:
PREFECT_LOGGING_LEVEL=DEBUG prefect --version
Test plugin in isolation:
PREFECT_EXPERIMENTS_PLUGINS_ALLOW="your-plugin" prefect experimental plugins diagnose
If a plugin isn't loading due to version mismatch:
Check the plugin's API requirement:
prefect experimental plugins diagnose
Look for warnings about incompatible API versions
Update the plugin to a version compatible with your Prefect installation, or update Prefect to match the plugin's requirements
If you're developing a plugin, adjust PREFECT_PLUGIN_API_REQUIRES to match the current API version:
PREFECT_PLUGIN_API_REQUIRES = ">=0.1,<1" # Current API version is 0.1
Enable debug logging to see detailed version validation:
PREFECT_LOGGING_LEVEL=DEBUG prefect --version
Increase the timeout for slow operations:
export PREFECT_EXPERIMENTS_PLUGINS_SETUP_TIMEOUT_SECONDS=60
Or optimize plugin startup time by deferring expensive operations.