docs/integrations/prefect-aws/index.mdx
prefect-aws offers significant advantages over integrating with boto3 directly, providing pre-built blocks, tasks, and worker infrastructure for common AWS services.
The following command installs a version of prefect-aws compatible with your installed version of prefect. If you don't already have prefect installed, it installs the newest version of prefect as well.

```bash
pip install "prefect[aws]"
```

Upgrade to the latest versions of prefect and prefect-aws:

```bash
pip install -U "prefect[aws]"
```
Most AWS services require an authenticated session. Prefect makes it simple to provide credentials via `AwsCredentials` blocks.
Create an `AwsCredentials` block in the Prefect UI or with a Python script like the one below.

```python
from prefect_aws import AwsCredentials

AwsCredentials(
    aws_access_key_id="PLACEHOLDER",
    aws_secret_access_key="PLACEHOLDER",
    aws_session_token=None,  # replace this with token if necessary
    region_name="us-east-2"
).save("BLOCK-NAME-PLACEHOLDER")
```
Prefect uses the Boto3 library under the hood. It creates the session object from the values stored in the block; any values not provided to the block are sourced at runtime following the credential resolution order described in the Boto3 docs.
`AwsCredentials` supports assuming IAM roles for cross-account access or enhanced security. When `assume_role_arn` is provided, `get_boto3_session()` automatically assumes the role and returns a session with temporary credentials.
```python
from prefect_aws import AwsCredentials

AwsCredentials(
    aws_access_key_id="PLACEHOLDER",
    aws_secret_access_key="PLACEHOLDER",
    region_name="us-east-2",
    assume_role_arn="arn:aws:iam::123456789012:role/MyRole",
    assume_role_kwargs={
        "RoleSessionName": "my-session",
        "DurationSeconds": 3600,
        "ExternalId": "unique-external-id"
    }
).save("BLOCK-NAME-PLACEHOLDER")
```
Available `assume_role_kwargs` parameters include:

- `RoleSessionName`: Session name for the assumed role
- `DurationSeconds`: Session duration (900-43200 seconds)
- `ExternalId`: Unique identifier for third-party access
- `Policy`: Inline session policy (JSON string)
- `PolicyArns`: List of managed policy ARNs
- `Tags`: Session tags for attribution
- `SerialNumber` and `TokenCode`: For MFA authentication

Create a block for reading and writing files to S3.
```python
from prefect_aws import AwsCredentials
from prefect_aws.s3 import S3Bucket

aws_credentials = AwsCredentials.load("BLOCK-NAME-PLACEHOLDER")

S3Bucket(
    bucket_name="BUCKET-NAME-PLACEHOLDER",
    credentials=aws_credentials
).save("S3-BLOCK-NAME-PLACEHOLDER")
```
Create a block to invoke AWS Lambda functions, synchronously or asynchronously.

```python
from prefect_aws.lambda_function import LambdaFunction
from prefect_aws.credentials import AwsCredentials

credentials = AwsCredentials.load("BLOCK-NAME-PLACEHOLDER")

LambdaFunction(
    function_name="test_lambda_function",
    aws_credentials=credentials,
).save("LAMBDA-BLOCK-NAME-PLACEHOLDER")
```
Create a block to read, write, and delete AWS Secrets Manager secrets.

```python
from prefect_aws import AwsCredentials
from prefect_aws.secrets_manager import AwsSecret

credentials = AwsCredentials.load("BLOCK-NAME-PLACEHOLDER")

AwsSecret(
    secret_name="test_secret_name",
    aws_credentials=credentials,
).save("AWS-SECRET-BLOCK-NAME-PLACEHOLDER")
```
prefect-aws includes a plugin to automatically handle authentication for AWS RDS PostgreSQL databases using IAM tokens.
Before using RDS IAM authentication, you need to configure AWS:
Enable IAM authentication on your RDS instance: In the AWS Console, modify your RDS instance and enable "IAM database authentication".
Create an IAM policy with the `rds-db:connect` permission:

```json
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": "rds-db:connect",
            "Resource": "arn:aws:rds-db:<region>:<account-id>:dbuser:<db-resource-id>/<db-username>"
        }
    ]
}
```
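The placeholders in the `Resource` ARN can be easy to mix up. As a sketch (the region, account ID, and resource ID below are made-up values), the ARN is assembled from the region, account ID, the instance's `DbiResourceId` (not its instance name), and the database username:

```python
# Hypothetical values -- substitute your own region, account, and IDs
region = "us-west-2"
account_id = "123456789012"
db_resource_id = "db-ABCDEFGHIJKLMNOPQRSTUVWXY"  # the DbiResourceId, not the instance name
db_username = "iam_user"

resource_arn = (
    f"arn:aws:rds-db:{region}:{account_id}:dbuser:{db_resource_id}/{db_username}"
)
print(resource_arn)
# arn:aws:rds-db:us-west-2:123456789012:dbuser:db-ABCDEFGHIJKLMNOPQRSTUVWXY/iam_user
```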
Create a database user that uses IAM authentication:

```sql
CREATE USER iam_user WITH LOGIN;
GRANT rds_iam TO iam_user;
```
Enable the experimental plugin system:

```bash
export PREFECT_EXPERIMENTS_PLUGINS_ENABLED=true
```

Enable RDS IAM authentication:

```bash
export PREFECT_INTEGRATIONS_AWS_RDS_IAM_ENABLED=true
```

(Optional) Set the AWS region:

```bash
export PREFECT_INTEGRATIONS_AWS_RDS_IAM_REGION_NAME=us-west-2
```

Configure your database connection URL:

```bash
export PREFECT_API_DATABASE_CONNECTION_URL="postgresql+asyncpg://iam_user@your-rds-host:5432/prefect"
```
The plugin will automatically generate and inject an IAM authentication token as the password when connecting to the database.
prefect-aws provides comprehensive integrations for key AWS services:
| Service | Integration Type | Use Cases |
|---|---|---|
| S3 | S3Bucket block | File storage, data lake operations, deployment storage |
| Secrets Manager | AwsSecret block | Secure credential storage, API key management |
| Lambda | LambdaFunction block | Serverless function execution, event-driven processing |
| Glue | GlueJobBlock block | ETL operations, data transformation pipelines |
| ECS | ECSWorker infrastructure | Container orchestration, scalable compute workloads |
| Batch | batch_submit task | High-throughput computing, batch job processing |
Integration types:

- Blocks (e.g. `S3Bucket`, `AwsSecret`, `LambdaFunction`, `GlueJobBlock`) for configuring and reusing service connections
- Workers (e.g. `ECSWorker`) for executing flow runs on AWS infrastructure
- Tasks decorated with `@task` for direct use in flows

Deploy and scale your Prefect workflows on AWS ECS for production workloads; prefect-aws provides the `ECSWorker` infrastructure for this.
See the ECS worker deployment guide for a step-by-step walkthrough of deploying production-ready workers to your ECS cluster.
Pre-built Docker images with prefect-aws are available for simplified deployment:
```bash
docker pull prefecthq/prefect-aws:latest
```
Image tags have the following format:
- `prefecthq/prefect-aws:latest` - Latest stable release with Python 3.12
- `prefecthq/prefect-aws:latest-python3.11` - Latest stable with Python 3.11
- `prefecthq/prefect-aws:0.5.9-python3.12` - Specific prefect-aws version with Python 3.12
- `prefecthq/prefect-aws:0.5.9-python3.12-prefect3.4.9` - Full version specification

Running an ECS worker:
```bash
docker run -d \
  --name prefect-ecs-worker \
  -e PREFECT_API_URL=https://api.prefect.cloud/api/accounts/your-account/workspaces/your-workspace \
  -e PREFECT_API_KEY=your-api-key \
  prefecthq/prefect-aws:latest \
  prefect worker start --pool ecs-pool
```
Local development:
```bash
docker run -it --rm \
  -v $(pwd):/opt/prefect \
  prefecthq/prefect-aws:latest \
  python your_flow.py
```
Upload a file to an AWS S3 bucket and download the same file under a different filename. The following code assumes that the bucket already exists:
```python
from pathlib import Path

from prefect import flow
from prefect_aws import AwsCredentials, S3Bucket


@flow
def s3_flow():
    # create a dummy file to upload
    file_path = Path("test-example.txt")
    file_path.write_text("Hello, Prefect!")

    aws_credentials = AwsCredentials.load("BLOCK-NAME-PLACEHOLDER")
    s3_bucket = S3Bucket(
        bucket_name="BUCKET-NAME-PLACEHOLDER",
        credentials=aws_credentials
    )

    s3_bucket_path = s3_bucket.upload_from_path(file_path)
    downloaded_file_path = s3_bucket.download_object_to_path(
        s3_bucket_path, "downloaded-test-example.txt"
    )
    return downloaded_file_path.read_text()


if __name__ == "__main__":
    s3_flow()
```
Write a secret to AWS Secrets Manager, read the secret data, delete the secret, and return the secret data.
```python
from prefect import flow
from prefect_aws import AwsCredentials, AwsSecret


@flow
def secrets_manager_flow():
    aws_credentials = AwsCredentials.load("BLOCK-NAME-PLACEHOLDER")
    aws_secret = AwsSecret(secret_name="test-example", aws_credentials=aws_credentials)

    aws_secret.write_secret(secret_data=b"Hello, Prefect!")
    secret_data = aws_secret.read_secret()
    aws_secret.delete_secret()
    return secret_data


if __name__ == "__main__":
    secrets_manager_flow()
```
Invoke a Lambda function and read its response:

```python
from prefect_aws.lambda_function import LambdaFunction
from prefect_aws.credentials import AwsCredentials

credentials = AwsCredentials()
lambda_function = LambdaFunction(
    function_name="test_lambda_function",
    aws_credentials=credentials,
)
response = lambda_function.invoke(
    payload={"foo": "bar"},
    invocation_type="RequestResponse",
)
response["Payload"].read()
```
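The `Payload` in an invoke response is a streaming body rather than plain bytes. A stdlib sketch of decoding it as JSON (the bytes below simulate a Lambda response; a real `response["Payload"]` is a botocore `StreamingBody` with the same `.read()` interface):

```python
import io
import json

# Simulated response payload standing in for response["Payload"]
payload = io.BytesIO(b'{"statusCode": 200, "body": "Hello from Lambda"}')

result = json.loads(payload.read().decode("utf-8"))
print(result["statusCode"])  # 200
```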
Trigger an AWS Glue job and wait for its completion:

```python
from prefect import flow
from prefect_aws import AwsCredentials
from prefect_aws.glue_job import GlueJobBlock


@flow
def example_run_glue_job():
    aws_credentials = AwsCredentials(
        aws_access_key_id="your_access_key_id",
        aws_secret_access_key="your_secret_access_key"
    )
    glue_job_run = GlueJobBlock(
        job_name="your_glue_job_name",
        arguments={"--YOUR_EXTRA_ARGUMENT": "YOUR_EXTRA_ARGUMENT_VALUE"},
    ).trigger()

    return glue_job_run.wait_for_completion()


if __name__ == "__main__":
    example_run_glue_job()
```
Submit an AWS Batch job and return its job ID:

```python
from prefect import flow
from prefect_aws import AwsCredentials
from prefect_aws.batch import batch_submit


@flow
def example_batch_submit_flow():
    aws_credentials = AwsCredentials(
        aws_access_key_id="access_key_id",
        aws_secret_access_key="secret_access_key"
    )
    job_id = batch_submit(
        "job_name",
        "job_queue",
        "job_definition",
        aws_credentials
    )
    return job_id


if __name__ == "__main__":
    example_batch_submit_flow()
```