The InfluxDB to Iceberg Plugin enables data transfer from {{% product-name %}} to Apache Iceberg tables. Transfer time series data to Iceberg for long-term storage, analytics, or integration with data lake architectures. The plugin supports both scheduled batch transfers of historical data and on-demand transfers via HTTP API.
Plugin parameters may be specified as key-value pairs in the --trigger-arguments flag (CLI) or in the trigger_arguments field (API) when creating a trigger. Some plugins support TOML configuration files, which can be specified using the plugin's config_file_path parameter.
If a plugin supports multiple trigger specifications, some parameters may depend on the trigger specification that you use.
This plugin includes a JSON metadata schema in its docstring that defines supported trigger types and configuration parameters. This metadata enables the InfluxDB 3 Explorer UI to display and configure the plugin.
| Parameter | Type | Default | Description |
|---|---|---|---|
| measurement | string | required | Source measurement containing data to transfer |
| window | string | required | Time window for data transfer. Format: `<number><unit>` (for example, "1h", "30d") |
| catalog_configs | string | required | Base64-encoded JSON string containing the Iceberg catalog configuration |
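The catalog_configs value must be valid JSON before encoding. The following sketch shows one way to produce the value in Python (the Nessie URI is a hypothetical example):

```python
import base64
import json

# Hypothetical catalog configuration; any PyIceberg catalog options can go here
catalog = {"uri": "http://nessie:9000"}

# Encode the JSON as base64 for use as the catalog_configs trigger argument
encoded = base64.b64encode(json.dumps(catalog).encode()).decode()
print(encoded)  # eyJ1cmkiOiAiaHR0cDovL25lc3NpZTo5MDAwIn0=
```

The following parameters are optional: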
| Parameter | Type | Default | Description |
|---|---|---|---|
| included_fields | string | all fields/tags | Dot-separated list of fields and tags to include (for example, "usage_user.host") |
| excluded_fields | string | none | Dot-separated list of fields and tags to exclude |
| namespace | string | "default" | Iceberg namespace for the target table |
| table_name | string | measurement name | Iceberg table name |
| auto_update_schema | string | false | Automatically update the Iceberg table schema when incoming data doesn't match the existing schema |
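Note that the include and exclude lists are dot-separated, not comma-separated. For example, to transfer only the hypothetical usage_user and usage_system fields plus the host tag:

```bash
influxdb3 create trigger \
  --database mydb \
  --path "gh:influxdata/influxdb_to_iceberg/influxdb_to_iceberg.py" \
  --trigger-spec "every:1h" \
  --trigger-arguments 'measurement=cpu,window=1h,catalog_configs="eyJ1cmkiOiAiaHR0cDovL25lc3NpZTo5MDAwIn0=",included_fields=usage_user.usage_system.host' \
  cpu_filtered_transfer
```

When a TOML configuration file provides the parameters above, only the following parameter is passed at trigger creation: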
| Parameter | Type | Default | Description |
|---|---|---|---|
| config_file_path | string | none | TOML config file path, relative to PLUGIN_DIR (required for TOML configuration) |
To use a TOML configuration file, set the PLUGIN_DIR environment variable and specify the config_file_path in the trigger arguments. This is in addition to the --plugin-dir flag when starting {{% product-name %}}.
influxdb_to_iceberg_config_scheduler.toml
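A minimal sketch of what such a file might contain, assuming the keys mirror the scheduled-trigger parameters above (all values are hypothetical; see the repository file for the authoritative format):

```toml
# Hypothetical example configuration for a scheduled transfer
measurement = "cpu"
window = "1h"
catalog_configs = "eyJ1cmkiOiAiaHR0cDovL25lc3NpZTo5MDAwIn0="
namespace = "monitoring"
table_name = "cpu_metrics"
```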
For more information on using TOML configuration files, see the Using TOML Configuration Files section in the influxdb3_plugins/README.md.
| Parameter | Type | Required | Description |
|---|---|---|---|
| measurement | string | Yes | Source measurement containing data to transfer |
| catalog_configs | object | Yes | Iceberg catalog configuration dictionary. See the PyIceberg catalog documentation |
| included_fields | array | No | List of field and tag names to include in replication |
| excluded_fields | array | No | List of field and tag names to exclude from replication |
| namespace | string | No | Target Iceberg namespace (default: "default") |
| table_name | string | No | Target Iceberg table name (default: measurement name) |
| batch_size | string | No | Batch size duration for processing (default: "1d"). Format: `<number><unit>` |
| backfill_start | string | No | ISO 8601 datetime with timezone for the backfill start |
| backfill_end | string | No | ISO 8601 datetime with timezone for the backfill end |
| auto_update_schema | boolean | No | Automatically update the Iceberg table schema when incoming data doesn't match the existing schema (default: false) |
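backfill_start and backfill_end bound the historical range, which the plugin processes in batch_size chunks. This small Python illustration (not the plugin's actual code) shows how a six-day range splits into 12-hour batches:

```python
from datetime import datetime, timedelta

# Illustration of batch chunking: a six-day backfill with batch_size "12h"
start = datetime.fromisoformat("2024-01-01T00:00:00+00:00")
end = datetime.fromisoformat("2024-01-07T00:00:00+00:00")
batch = timedelta(hours=12)

t = start
while t < end:
    print(t.isoformat(), "to", min(t + batch, end).isoformat())
    t += batch  # 12 batches in total
```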
The plugin maps pandas data types to Iceberg types as follows:

- int64 → IntegerType
- float64 → FloatType
- datetime64[us] → TimestampType
- object → StringType

The time column is required and is converted to datetime64[us] for Iceberg compatibility. Tables are created as `<namespace>.<table_name>`. When auto_update_schema=true, new columns detected in the data are added to the existing table schema as optional columns.
The plugin requires the following Python packages:

- pandas (for data manipulation)
- pyarrow (for Parquet support)
- pyiceberg[catalog-options] (for Iceberg integration)

Start {{% product-name %}} with the Processing Engine enabled (--plugin-dir /path/to/plugins):
influxdb3 serve \
--node-id node0 \
--object-store file \
--data-dir ~/.influxdb3 \
--plugin-dir ~/.plugins
Install required Python packages:
influxdb3 install package pandas
influxdb3 install package pyarrow
influxdb3 install package "pyiceberg[s3fs,hive,sql-sqlite]"
Note: Include the appropriate PyIceberg extras based on your catalog type:
- [s3fs] for S3 storage
- [hive] for Hive metastore
- [sql-sqlite] for SQL catalog with SQLite

The plugin assumes that the table schema is already defined in the database, as it relies on this schema to retrieve the field and tag names required for processing.
Periodically transfer data from {{% product-name %}} to Iceberg:
influxdb3 create trigger \
--database mydb \
--path "gh:influxdata/influxdb_to_iceberg/influxdb_to_iceberg.py" \
--trigger-spec "every:1h" \
--trigger-arguments 'measurement=cpu,window=1h,catalog_configs="eyJ1cmkiOiAiaHR0cDovL25lc3NpZTo5MDAwIn0=",namespace=monitoring,table_name=cpu_metrics' \
hourly_iceberg_transfer
Create an on-demand transfer endpoint:
influxdb3 create trigger \
--database mydb \
--path "gh:influxdata/influxdb_to_iceberg/influxdb_to_iceberg.py" \
--trigger-spec "request:replicate" \
iceberg_http_transfer
Enable the trigger:
influxdb3 enable trigger --database mydb iceberg_http_transfer
The endpoint is registered at /api/v3/engine/replicate.
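With only the required parameters, a minimal request looks like this (assuming a server at localhost:8181 and a hypothetical REST catalog URI):

```bash
curl -X POST http://localhost:8181/api/v3/engine/replicate \
  -H "Authorization: Bearer YOUR_TOKEN" \
  -d '{
    "measurement": "cpu",
    "catalog_configs": {"uri": "http://nessie:9000"}
  }'
```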
Transfer CPU metrics to Iceberg every hour:
# Create trigger with base64-encoded catalog config
# Original JSON: {"uri": "http://nessie:9000"}
# Base64: eyJ1cmkiOiAiaHR0cDovL25lc3NpZTo5MDAwIn0=
influxdb3 create trigger \
--database metrics \
--path "gh:influxdata/influxdb_to_iceberg/influxdb_to_iceberg.py" \
--trigger-spec "every:1h" \
--trigger-arguments 'measurement=cpu,window=24h,catalog_configs="eyJ1cmkiOiAiaHR0cDovL25lc3NpZTo5MDAwIn0="' \
cpu_to_iceberg
# Write test data
influxdb3 write \
--database metrics \
"cpu,host=server1 usage_user=45.2,usage_system=12.1"
# After trigger runs, data is available in Iceberg table "default.cpu"
Expected output

- The Iceberg table default.cpu is created with a schema matching the measurement

Backfill specific fields from historical data:
# Create and enable HTTP trigger
influxdb3 create trigger \
--database metrics \
--path "gh:influxdata/influxdb_to_iceberg/influxdb_to_iceberg.py" \
--trigger-spec "request:replicate" \
iceberg_backfill
influxdb3 enable trigger --database metrics iceberg_backfill
# Request backfill via HTTP
curl -X POST http://localhost:8181/api/v3/engine/replicate \
-H "Authorization: Bearer YOUR_TOKEN" \
-d '{
"measurement": "temperature",
"catalog_configs": {
"type": "sql",
"uri": "sqlite:///path/to/catalog.db"
},
"included_fields": ["temp_celsius", "humidity", "sensor_id"],
"namespace": "weather",
"table_name": "temperature_history",
"batch_size": "12h",
"backfill_start": "2024-01-01T00:00:00+00:00",
"backfill_end": "2024-01-07T00:00:00+00:00"
}'
Expected output

- Historical data is transferred to the Iceberg table weather.temperature_history in 12-hour batches (twelve batches for this six-day range)
- Only the fields listed in included_fields are transferred (temp_celsius and humidity, plus the sensor_id tag)

Transfer data to Iceberg tables stored in S3:
# Create catalog config JSON
cat > catalog_config.json << EOF
{
"type": "sql",
"uri": "sqlite:///iceberg/catalog.db",
"warehouse": "s3://my-bucket/iceberg-warehouse/",
"s3.endpoint": "http://minio:9000",
"s3.access-key-id": "minioadmin",
"s3.secret-access-key": "minioadmin",
"s3.path-style-access": true
}
EOF
# Encode to base64
CATALOG_CONFIG=$(base64 < catalog_config.json)
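# Note: GNU base64 wraps output at 76 characters, which would split the
# trigger argument across lines. If $CATALOG_CONFIG contains newlines,
# disable wrapping (GNU coreutils only):
#   CATALOG_CONFIG=$(base64 -w 0 < catalog_config.json)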
# Create trigger
influxdb3 create trigger \
--database metrics \
--path "gh:influxdata/influxdb_to_iceberg/influxdb_to_iceberg.py" \
--trigger-spec "every:30m" \
--trigger-arguments "measurement=sensor_data,window=1h,catalog_configs=\"$CATALOG_CONFIG\",namespace=iot,table_name=sensors" \
s3_iceberg_transfer
The plugin repository includes the following files:

- influxdb_to_iceberg.py: The main plugin code containing handlers for scheduled and HTTP triggers
- influxdb_to_iceberg_config_scheduler.toml: Example TOML configuration file for scheduled triggers

Logs are stored in the trigger's database in the system.processing_engine_logs table. To view logs:
influxdb3 query --database YOUR_DATABASE "SELECT * FROM system.processing_engine_logs WHERE trigger_name = 'your_trigger_name'"
Log columns:

- event_time: Timestamp of the log event
- trigger_name: Name of the trigger that generated the log
- log_level: Severity level (INFO, WARN, ERROR)
- log_text: Message describing the event
process_scheduled_call(influxdb3_local, call_time, args)

Handles scheduled data transfers. Queries data within the specified window and appends it to Iceberg tables.
Key operations: queries the source measurement over the configured window, maps the results to Iceberg types, and appends the rows to `<namespace>.<table_name>`, creating the table if it doesn't exist.
process_http_request(influxdb3_local, request_body, args)

Handles on-demand data transfers via HTTP. Supports backfill operations with configurable batch sizes.
Key operations: parses and validates the JSON request body, splits the backfill range into batch_size chunks, and writes each batch to the target Iceberg table.
If the plugin fails with a catalog configuration error, ensure the catalog configuration is properly base64-encoded:
# Create JSON file
echo '{"uri": "http://nessie:9000"}' > config.json
# Encode to base64
base64 config.json
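To confirm the value round-trips cleanly, decode it again and compare against the original JSON (on some systems the decode flag is -D instead of -d):

```bash
# Re-encode and immediately decode; output should match config.json
base64 config.json | base64 -d
```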
If PyIceberg cannot connect to your catalog or storage backend, install pyiceberg with the extras your catalog type requires:

influxdb3 install package "pyiceberg[s3fs]"

If no data appears in the Iceberg table:

1. Verify that data exists in the source measurement:

   influxdb3 query --database mydb "SELECT COUNT(*) FROM measurement"

2. Check the time range of the data:

   influxdb3 query --database mydb "SELECT MIN(time), MAX(time) FROM measurement"

3. Check the processing engine logs for errors:

   influxdb3 query --database YOUR_DATABASE "SELECT * FROM system.processing_engine_logs WHERE log_level = 'ERROR'"

If a schema update fails: this occurs when trying to add a required (non-nullable) column to an existing table. With auto_update_schema=true, new columns are automatically added as optional. If you encounter this error:
- Set auto_update_schema=true in your configuration so that new columns are added as optional

To debug further, verify catalog connectivity:

from pyiceberg.catalog import load_catalog
catalog = load_catalog("my_catalog", **catalog_configs)
print(catalog.list_namespaces())
Verify field names:
influxdb3 query --database mydb "SHOW FIELD KEYS FROM measurement"
Use smaller windows for initial testing:
--trigger-arguments 'window=5m,...'
- Adjust batch_size based on available memory
- Use included_fields to reduce data volume when only specific fields and tags are needed

For plugin issues, see the Plugins repository issues page.
The InfluxDB Discord server is the best place to find support for InfluxDB 3 Core and InfluxDB 3 Enterprise. For other InfluxDB versions, see the Support and feedback options.