flink-filesystems/flink-s3-fs-native/README.md
This module provides a native S3 filesystem implementation for Apache Flink using AWS SDK v2.
The Native S3 FileSystem is a direct implementation of Flink's FileSystem interface using AWS SDK v2, without Hadoop dependencies. It provides exactly-once semantics for checkpointing and file sinks through S3 multipart uploads.
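As a rough sketch of why multipart uploads make exactly-once sinks possible: in-progress data is uploaded in parts (each at least S3's 5 MB minimum, except the last), and the multipart upload is only completed on commit. The helper below is illustrative only, not the module's actual writer; it shows how a byte count splits into valid part sizes:

```java
import java.util.ArrayList;
import java.util.List;

public class MultipartSketch {
    // S3 requires every part except the last to be at least 5 MB.
    static final long MIN_PART_SIZE = 5L * 1024 * 1024;

    // Split totalBytes into part sizes: full-size parts, with any
    // remainder folded into the final (possibly smaller) part.
    static List<Long> partSizes(long totalBytes) {
        List<Long> parts = new ArrayList<>();
        long remaining = totalBytes;
        while (remaining > MIN_PART_SIZE) {
            parts.add(MIN_PART_SIZE);
            remaining -= MIN_PART_SIZE;
        }
        if (remaining > 0) {
            parts.add(remaining); // last part may be smaller than 5 MB
        }
        return parts;
    }

    public static void main(String[] args) {
        System.out.println(partSizes(12L * 1024 * 1024)); // 5 MB, 5 MB, 2 MB
    }
}
```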
This module supports both s3:// and s3a:// URI schemes:
| Scheme | Description |
|---|---|
| s3:// | Primary scheme for the native S3 filesystem |
| s3a:// | Hadoop S3A compatibility scheme - allows drop-in replacement for existing Hadoop-based configurations |
Both schemes use the same native AWS SDK v2 implementation and share identical configuration options.
Example usage with either scheme:
```java
// Using s3:// scheme
env.getCheckpointConfig().setCheckpointStorage("s3://my-bucket/checkpoints");

// Using s3a:// scheme (for Hadoop compatibility)
env.getCheckpointConfig().setCheckpointStorage("s3a://my-bucket/checkpoints");
```
Add this module to Flink's plugins directory:
```bash
mkdir -p $FLINK_HOME/plugins/s3-fs-native
cp flink-s3-fs-native-*.jar $FLINK_HOME/plugins/s3-fs-native/
```
Configure S3 credentials in conf/config.yaml:
```yaml
s3.access-key: YOUR_ACCESS_KEY
s3.secret-key: YOUR_SECRET_KEY
s3.endpoint: https://s3.amazonaws.com  # Optional, defaults to AWS
```
Use S3 paths in your Flink application:
```java
env.getCheckpointConfig().setCheckpointStorage("s3://my-bucket/checkpoints");

DataStream<String> input = env.readTextFile("s3://my-bucket/input");

input.sinkTo(FileSink.forRowFormat(new Path("s3://my-bucket/output"),
        new SimpleStringEncoder<>()).build());
```
| Key | Default | Description |
|---|---|---|
| s3.access-key | (none) | AWS access key |
| s3.secret-key | (none) | AWS secret key |
| s3.region | (auto-detect) | AWS region (auto-detected via AWS_REGION, ~/.aws/config, EC2 metadata) |
| s3.endpoint | (none) | Custom S3 endpoint (for MinIO, LocalStack, etc.) |
| s3.path-style-access | false | Use path-style access (auto-enabled for custom endpoints) |
| s3.upload.min.part.size | 5242880 | Minimum part size for multipart uploads (5MB) |
| s3.upload.max.concurrent.uploads | CPU cores | Maximum concurrent uploads per stream |
| s3.entropy.key | (none) | Key for entropy injection in paths |
| s3.entropy.length | 4 | Length of entropy string |
| s3.bulk-copy.enabled | true | Enable bulk copy operations |
| s3.async.enabled | true | Enable async read/write with TransferManager |
| s3.read.buffer.size | 262144 (256KB) | Read buffer size per stream (64KB - 4MB) |
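For reference, `s3.path-style-access` changes how object URLs are formed. A minimal sketch of the two addressing styles (illustrative helper, not part of the module):

```java
import java.net.URI;

public class S3UrlStyle {
    // Illustrative only: shows how the two S3 addressing styles differ.
    static String objectUrl(String endpoint, String bucket, String key, boolean pathStyle) {
        if (pathStyle) {
            // Path-style: bucket in the path (required by MinIO/LocalStack).
            return endpoint + "/" + bucket + "/" + key;
        }
        // Virtual-hosted style: bucket as a subdomain (AWS default).
        URI base = URI.create(endpoint);
        return base.getScheme() + "://" + bucket + "." + base.getAuthority() + "/" + key;
    }

    public static void main(String[] args) {
        System.out.println(objectUrl("https://s3.amazonaws.com", "my-bucket", "data/part-0", false));
        System.out.println(objectUrl("http://localhost:9000", "my-bucket", "data/part-0", true));
    }
}
```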
| Key | Default | Description |
|---|---|---|
| s3.sse.type | none | Encryption type: none, sse-s3 (AES256), sse-kms (AWS KMS) |
| s3.sse.kms.key-id | (none) | KMS key ID/ARN/alias for SSE-KMS (uses default aws/s3 key if not specified) |
| Key | Default | Description |
|---|---|---|
| s3.assume-role.arn | (none) | ARN of the IAM role to assume |
| s3.assume-role.external-id | (none) | External ID for cross-account access |
| s3.assume-role.session-name | flink-s3-session | Session name for the assumed role |
| s3.assume-role.session-duration | 3600 | Session duration in seconds (900-43200) |
The filesystem supports server-side encryption for data at rest:
Amazon S3 manages the encryption keys. Simplest option with no additional configuration.
```yaml
s3.sse.type: sse-s3
```
All objects will be encrypted with AES-256 using keys managed by S3.
Use AWS Key Management Service for encryption key management. Provides additional security features like key rotation, audit trails, and fine-grained access control.
Using the default aws/s3 key:
```yaml
s3.sse.type: sse-kms
```
Using a custom KMS key:
```yaml
s3.sse.type: sse-kms
s3.sse.kms.key-id: arn:aws:kms:us-east-1:123456789012:key/12345678-1234-1234-1234-123456789abc
# Or use an alias:
# s3.sse.kms.key-id: alias/my-s3-encryption-key
```
Using encryption context (for fine-grained IAM access control):
Encryption context allows you to add additional authenticated data (AAD) to KMS encrypt/decrypt operations. This enables IAM policies to restrict access based on context values:
```yaml
s3.sse.type: sse-kms
s3.sse.kms.key-id: alias/my-key
# Configure encryption context as key-value pairs
s3.sse.kms.encryption-context.department: finance
s3.sse.kms.encryption-context.project: budget-reports
```
With encryption context, you can create IAM policies like:
```json
{
  "Effect": "Allow",
  "Action": ["kms:Encrypt", "kms:GenerateDataKey"],
  "Resource": "arn:aws:kms:region:account:key/key-id",
  "Condition": {
    "StringEquals": {
      "kms:EncryptionContext:department": "finance"
    }
  }
}
```
See AWS KMS Encryption Context for more details.
Note: Ensure the IAM role/user has kms:Encrypt and kms:GenerateDataKey permissions on the KMS key.
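Under the hood, S3 expects the encryption context as base64-encoded JSON in the `x-amz-server-side-encryption-context` request header. A minimal sketch of that encoding (the class name is hypothetical, and values are assumed to need no JSON escaping):

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

public class EncryptionContext {
    // Encode encryption-context pairs as the base64 JSON that S3 expects
    // in the x-amz-server-side-encryption-context header.
    // Naive JSON building: assumes keys/values need no escaping.
    static String encode(Map<String, String> ctx) {
        String json = new TreeMap<>(ctx).entrySet().stream()
                .map(e -> "\"" + e.getKey() + "\":\"" + e.getValue() + "\"")
                .collect(Collectors.joining(",", "{", "}"));
        return Base64.getEncoder().encodeToString(json.getBytes(StandardCharsets.UTF_8));
    }

    public static void main(String[] args) {
        System.out.println(encode(Map.of("department", "finance", "project", "budget-reports")));
    }
}
```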
For cross-account access or temporary elevated permissions, configure an IAM role to assume:
```yaml
s3.assume-role.arn: arn:aws:iam::123456789012:role/S3AccessRole
```
For enhanced security when granting access to third parties:
```yaml
s3.assume-role.arn: arn:aws:iam::123456789012:role/CrossAccountS3Role
s3.assume-role.external-id: your-secret-external-id
s3.assume-role.session-name: flink-cross-account-session
s3.assume-role.session-duration: 3600  # 1 hour
```
IAM Policy Example for the Assumed Role:
```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "s3:GetObject",
        "s3:PutObject",
        "s3:DeleteObject",
        "s3:ListBucket"
      ],
      "Resource": [
        "arn:aws:s3:::my-bucket",
        "arn:aws:s3:::my-bucket/*"
      ]
    }
  ]
}
```
Trust Policy for Cross-Account Access:
```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "AWS": "arn:aws:iam::SOURCE_ACCOUNT_ID:root"
      },
      "Action": "sts:AssumeRole",
      "Condition": {
        "StringEquals": {
          "sts:ExternalId": "your-secret-external-id"
        }
      }
    }
  ]
}
```
The filesystem auto-detects custom endpoints and configures appropriate settings:
```yaml
s3.access-key: minioadmin
s3.secret-key: minioadmin
s3.endpoint: http://localhost:9000
s3.path-style-access: true  # Auto-enabled for custom endpoints
```
MinIO-specific optimizations are applied automatically.
The filesystem is optimized to handle large files without OOM errors:
```yaml
# Read buffer: smaller = less memory, larger = better throughput
s3.read.buffer.size: 262144  # 256KB (default)
# For memory-constrained environments: 65536 (64KB)
# For high-throughput: 1048576 (1MB)
```
Memory Calculation Per Parallel Read:

```
s3.read.buffer.size × parallelism = peak filesystem buffer memory
```

Important Note on Memory Types:

- `taskmanager.memory.managed.size` controls RocksDB native memory, NOT filesystem buffers
- Increase the task heap (`taskmanager.memory.task.heap.size`) if you see OOM errors during S3 operations

The filesystem uses the AWS SDK's TransferManager for high-performance async read/write operations.
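The buffer-memory calculation can be sanity-checked with a one-liner (illustrative helper, not part of the module):

```java
public class ReadBufferMemory {
    // Worst-case filesystem read-buffer memory: one buffer per parallel stream.
    static long peakBytes(long bufferSize, int parallelism) {
        return bufferSize * parallelism;
    }

    public static void main(String[] args) {
        // 256 KB default buffer at parallelism 32 -> 8 MB of heap
        System.out.println(peakBytes(262144, 32));
    }
}
```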
Configuration:

```yaml
s3.async.enabled: true  # Default: enabled
```

When enabled, file uploads automatically use the TransferManager for parallel multipart transfers.
Configure checkpoint storage in conf/config.yaml:
```yaml
state.checkpoints.dir: s3://my-bucket/checkpoints
execution.checkpointing.interval: 10s
execution.checkpointing.mode: EXACTLY_ONCE
```
Or programmatically:
```java
env.enableCheckpointing(10000);
env.getCheckpointConfig().setCheckpointStorage("s3://my-bucket/checkpoints");
```
For high-throughput checkpointing to avoid S3 hot partitions:
```yaml
s3.entropy.key: _entropy_
s3.entropy.length: 4
```
Paths like `s3://bucket/_entropy_/checkpoints` will be expanded to `s3://bucket/af7e/checkpoints`, with the marker replaced by random characters.
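The expansion can be sketched roughly as follows (a simplified illustration; the class and method names are hypothetical, not the module's actual entropy injector):

```java
import java.security.SecureRandom;

public class EntropyInjector {
    private static final char[] HEX = "0123456789abcdef".toCharArray();

    // Replace the configured entropy marker with `length` random hex chars.
    // Paths without the marker are returned unchanged.
    static String inject(String path, String entropyKey, int length) {
        if (!path.contains(entropyKey)) {
            return path;
        }
        SecureRandom rnd = new SecureRandom();
        StringBuilder sb = new StringBuilder(length);
        for (int i = 0; i < length; i++) {
            sb.append(HEX[rnd.nextInt(16)]);
        }
        return path.replace(entropyKey, sb.toString());
    }

    public static void main(String[] args) {
        // e.g. s3://bucket/af7e/checkpoints (random each run)
        System.out.println(inject("s3://bucket/_entropy_/checkpoints", "_entropy_", 4));
    }
}
```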
Key classes:

- `NativeS3FileSystem` - Main FileSystem implementation
- `NativeS3RecoverableWriter` - Exactly-once writer using multipart uploads
- `S3ClientProvider` - Manages S3 client lifecycle
- `NativeS3AccessHelper` - Low-level S3 operations

Build the module with:

```bash
mvn clean package
```
```bash
# Start MinIO
docker run -d -p 9000:9000 -p 9001:9001 \
  -e "MINIO_ROOT_USER=minioadmin" \
  -e "MINIO_ROOT_PASSWORD=minioadmin" \
  minio/minio server /data --console-address ":9001"

# Create bucket
mc alias set local http://localhost:9000 minioadmin minioadmin
mc mb local/test-bucket

# Run Flink with MinIO (append the S3 settings to the existing config)
export FLINK_HOME=/path/to/flink
cat >> $FLINK_HOME/conf/config.yaml <<EOF
s3.endpoint: http://localhost:9000
s3.access-key: minioadmin
s3.secret-key: minioadmin
s3.path-style-access: true
EOF

$FLINK_HOME/bin/flink run YourJob.jar
```
The filesystem supports delegation tokens for secure multi-tenant deployments. The delegation token service name is s3-native to avoid conflicts with other S3 filesystem implementations.
Configure delegation tokens:
```yaml
security.delegation.token.provider.s3-native.enabled: true
security.delegation.token.provider.s3-native.access-key: YOUR_KEY
security.delegation.token.provider.s3-native.secret-key: YOUR_SECRET
security.delegation.token.provider.s3-native.region: us-east-1
```