flink/README.md
The Delta Flink Connector enables Apache Flink streaming jobs to write data into Delta Lake tables.
Note: this is currently a private build and no product support is provided.
Suggestions and feedback are welcome.
The project is built using sbt:

```bash
sbt flink/assembly
```

After a successful build, the assembly JAR is available under `flink/target/`.

To access S3 or S3-compatible storage, an AWS SDK bundle (e.g. `bundle-2.23.x`) must be provided separately.

To run the unit tests of the project, execute:

```bash
sbt flink/test
```
This repository includes a local Flink environment for quick testing via Docker Compose.
1. Build the connector:

   ```bash
   sbt flink/assembly
   ```

2. Copy the assembly JAR into the docker folder. Two Flink versions are provided under `docker`:

   ```bash
   cp flink/target/delta-flink-<flink_version>-*.jar flink/docker/<flink_version>/usrlib
   ```

3. (Only the first time) Download additional JARs:

   ```bash
   cd docker/<flink_version>/usrlib
   chmod +x init.sh
   wget https://repo1.maven.org/maven2/software/amazon/awssdk/bundle/2.23.19/bundle-2.23.19.jar
   wget https://repo1.maven.org/maven2/com/google/guava/guava/33.5.0-jre/guava-33.5.0-jre.jar
   ```

4. Start the local Flink cluster:

   ```bash
   cd docker/<flink_version>
   docker compose up -d
   ```
This starts the local Flink cluster. The setup is intended for local testing and development workflows.
The Delta sink can be used with both path-based (Hadoop) tables and Unity Catalog tables, via the Java builder API or Flink SQL.
Builder API with a path-based (Hadoop) table:

```java
import java.util.Map;

import org.apache.flink.table.data.RowData;

DeltaSink sink =
    DeltaSink.builder()
        .withFlinkSchema(flinkSchema)
        .withConfigurations(
            Map.of(
                "type", "hadoop",
                "hadoop.table_path", "file:///table-path"
            )
        )
        .build();
```
Builder API with a Unity Catalog table:

```java
import java.util.Map;

import org.apache.flink.table.data.RowData;

DeltaSink sink =
    DeltaSink.builder()
        .withFlinkSchema(flinkSchema)
        .withConfigurations(
            Map.of(
                "type", "unitycatalog",
                "unitycatalog.name", "ab",
                "unitycatalog.table_name", "ab.cd.ef",
                "unitycatalog.endpoint", "http://localhost:8080/",
                "unitycatalog.token", "wow"
            )
        )
        .build();
```
Flink SQL with a path-based table:

```sql
CREATE TEMPORARY TABLE sink (
  id BIGINT,
  dt STRING
) WITH (
  'connector' = 'delta',
  'table_path' = '<path>',
  'partitions' = 'dt',
  'uid' = 'someuid'
);
```
Flink SQL with a Unity Catalog table:

```sql
CREATE TEMPORARY TABLE sink (
  id BIGINT,
  dt STRING
) WITH (
  'connector' = 'delta',
  'table_name' = 'name',
  'unitycatalog.name' = '<catalog-name>',
  'unitycatalog.endpoint' = '<endpoint>',
  'unitycatalog.token' = '<token>',
  'partitions' = 'dt',
  'uid' = 'someuid'
);
```
Configurations are divided into global configurations (loaded from `delta-flink.properties`) and per-table configurations.

Global configurations are loaded from `delta-flink.properties`, which must be available on the classpath. Example:
```properties
# Retry settings
sink.retry.max_attempt=10
sink.retry.delay_ms=100
sink.retry.max_delay_ms=30000

# Memory / concurrency control
sink.writer.num_concurrent_file=1000
table.thread_pool_size=8

# Table metadata cache
table.cache.enable=true
table.cache.size=200
table.cache.expire_ms=300000

# UC credential refresh
credentials.refresh.thread_pool_size=10
# refresh 3 minutes before expiration
credentials.refresh.ahead_ms=180000
```
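As a concrete sketch of the retry schedule these `sink.retry.*` settings imply (a hypothetical helper, not the connector's internal code), the delay grows exponentially with each attempt and is capped at the configured maximum:

```java
// Hypothetical illustration of the sink.retry.* settings; not the
// connector's actual implementation.
public class RetryBackoff {

    // Delay before retry i: delay_ms * 2^i, capped at max_delay_ms.
    public static long delayForAttempt(int attempt, long delayMs, long maxDelayMs) {
        long delay = delayMs * (1L << attempt);
        return Math.min(delay, maxDelayMs);
    }

    public static void main(String[] args) {
        // With delay_ms=100 and max_delay_ms=30000 (the defaults above),
        // the schedule is 100, 200, 400, ... until it hits the 30s cap.
        for (int i = 0; i < 10; i++) {
            System.out.println("attempt " + i + " -> "
                + delayForAttempt(i, 100, 30_000) + " ms");
        }
    }
}
```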
Retries use exponential backoff: the delay before retry `i` is `delay_ms * (2 ^ i)`, capped at `sink.retry.max_delay_ms`. `sink.writer.num_concurrent_file` limits the number of concurrently open files (OOM protection).

Per-table configurations can be provided via `withConfigurations(...)` in the builder API or the `WITH (...)` clause in SQL. There are two categories of per-table config: keys prefixed with `delta.` are passed to Delta Kernel and stored in the table; the remaining keys configure the connector itself.

| Key | Type | Default | Description |
|---|---|---|---|
| checkpoint.frequency | Double | 0.0 | Probability [0.0–1.0] to create Delta checkpoint on commit. 0.0 disables checkpoints, 1.0 checkpoints every commit |
| checksum.enable | Boolean | true | Generate checksum files on commit |
| file_rolling.strategy | String | size | `size` or `count` |
| file_rolling.size | Integer | 104857600 | Number of bytes per file |
| file_rolling.count | Integer | | Max records per file |
| schema_evolution.mode | String | no | `no` = strict (fail on schema change), `newcolumn` = allow adding new columns |
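To illustrate the probabilistic `checkpoint.frequency` setting, here is a minimal sketch (a hypothetical helper, not connector code) of a per-commit checkpoint decision:

```java
import java.util.Random;

// Hypothetical illustration of checkpoint.frequency: with frequency f,
// roughly a fraction f of commits trigger a Delta checkpoint.
public class CheckpointDecision {

    public static boolean shouldCheckpoint(double frequency, Random rng) {
        if (frequency <= 0.0) return false;   // 0.0 disables checkpoints
        if (frequency >= 1.0) return true;    // 1.0 checkpoints every commit
        return rng.nextDouble() < frequency;  // probabilistic in between
    }

    public static void main(String[] args) {
        Random rng = new Random(42);
        int checkpoints = 0;
        for (int commit = 0; commit < 10_000; commit++) {
            if (shouldCheckpoint(0.1, rng)) checkpoints++;
        }
        // With frequency 0.1, expect roughly 1000 of 10000 commits to checkpoint.
        System.out.println("checkpoints: " + checkpoints);
    }
}
```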
The sink sets defaults for certain Delta properties, which can be overridden by user configs:
```properties
delta.feature.v2Checkpoint = supported
```
The Delta sink does not automatically evolve the table schema.
Instead, behavior is controlled by `schema_evolution.mode`:

- `NO` (default): any schema change is rejected.
- `NEW_COLUMN`: new columns may be added.
If an unsupported schema change is detected, the sink will fail the job.
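As an illustration of the `NEW_COLUMN` policy, a minimal compatibility check might look like the following (a sketch over column names only; hypothetical, not the connector's actual implementation):

```java
import java.util.List;

// Hypothetical sketch of a NEW_COLUMN compatibility check: the incoming
// schema must retain every existing column; extra columns are treated as
// additions. Not the connector's actual implementation.
public class SchemaCheck {

    public static boolean isAllowedUnderNewColumnMode(
            List<String> existingColumns, List<String> incomingColumns) {
        // Dropping or renaming an existing column is an unsupported change.
        return incomingColumns.containsAll(existingColumns);
    }

    public static void main(String[] args) {
        // Adding a column is allowed:
        System.out.println(isAllowedUnderNewColumnMode(
            List.of("id", "dt"), List.of("id", "dt", "extra"))); // true
        // Dropping a column is not:
        System.out.println(isAllowedUnderNewColumnMode(
            List.of("id", "dt"), List.of("id")));                // false
    }
}
```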
When using Unity Catalog, the connector authenticates against the catalog endpoint; typical config keys are `unitycatalog.endpoint` and `unitycatalog.token`. Credentials are refreshed ahead of expiration (see the `credentials.refresh.*` global settings).

When using path-based access without UC support, storage credentials are configured in `/opt/flink/conf/core-site.xml`.
Example:
```xml
<configuration>
  <property>
    <name>fs.s3a.access.key</name>
    <value>YOUR_ACCESS_KEY</value>
  </property>
  <property>
    <name>fs.s3a.secret.key</name>
    <value>YOUR_SECRET_KEY</value>
  </property>
  <!-- Optional for some environments -->
  <property>
    <name>fs.s3a.endpoint</name>
    <value>https://s3.amazonaws.com</value>
  </property>
</configuration>
```
This section provides practical defaults for large, highly partitioned Delta tables where the sink may need to write to many partitions concurrently.
The sink uses an internal limit on the number of concurrently opened output files to prevent excessive memory usage when the table has many partitions.
Default behavior: the connector uses a conservative default of 1000 concurrently opened files.

Recommendation: keep the concurrent file limit near 1000 for high-partition tables unless you have validated that your TaskManagers have sufficient memory headroom.
Configuration key: `sink.writer.num_concurrent_file` (global config in `delta-flink.properties`).
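The effect of an open-file cap can be sketched with a semaphore (illustrative only; `OpenFileLimiter` is a hypothetical class, not part of the connector):

```java
import java.util.concurrent.Semaphore;

// Hypothetical sketch of how a cap on concurrently open files protects
// memory: a writer must acquire a permit before opening an output file
// and releases it on close. Not the connector's actual implementation.
public class OpenFileLimiter {

    private final Semaphore permits;

    public OpenFileLimiter(int maxOpenFiles) {
        this.permits = new Semaphore(maxOpenFiles);
    }

    // Returns false (caller must wait or flush) once the cap is reached.
    public boolean tryOpenFile() {
        return permits.tryAcquire();
    }

    public void closeFile() {
        permits.release();
    }

    public static void main(String[] args) {
        OpenFileLimiter limiter = new OpenFileLimiter(2);
        System.out.println(limiter.tryOpenFile()); // true
        System.out.println(limiter.tryOpenFile()); // true
        System.out.println(limiter.tryOpenFile()); // false: cap reached
        limiter.closeFile();
        System.out.println(limiter.tryOpenFile()); // true again
    }
}
```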
To reduce small files while keeping file sizes manageable, we recommend enabling file rolling based on file size.
Recommended rolling setup: use the `size` strategy with a moderate per-file size.

Configuration keys (per-table config): `file_rolling.strategy`, `file_rolling.size`.

Example (SQL):
```sql
WITH (
  'file_rolling.strategy' = 'size',
  'file_rolling.size' = '52428800'  -- 50 MB, in bytes per the table above
)
```
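The two rolling strategies above can be sketched as a simple decision function (hypothetical, not the connector's code): under `size`, a file is closed once its byte count reaches `file_rolling.size`; under `count`, once its record count reaches `file_rolling.count`.

```java
// Hypothetical sketch of the rolling decision implied by the
// file_rolling.* settings; not the connector's actual implementation.
public class FileRolling {

    public static boolean shouldRoll(String strategy, long bytesWritten,
                                     long recordsWritten, long sizeLimitBytes,
                                     long countLimit) {
        switch (strategy) {
            case "size":  return bytesWritten >= sizeLimitBytes;
            case "count": return recordsWritten >= countLimit;
            default:      return false;
        }
    }

    public static void main(String[] args) {
        long fiftyMb = 50L * 1024 * 1024; // matches the example above
        System.out.println(shouldRoll("size", 10_000_000, 0, fiftyMb, 0)); // false
        System.out.println(shouldRoll("size", fiftyMb, 0, fiftyMb, 0));    // true: roll to a new file
    }
}
```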