import ChangeLog from '../changelog/connector-jdbc.md';
# DuckDB

> JDBC DuckDB Source Connector
## Description

Read external data source data through JDBC.
## Support Those Engines

> Spark<br/>
> Flink<br/>
> SeaTunnel Zeta<br/>
## Using Dependency

### For Spark/Flink Engine

- You need to ensure that the jdbc driver jar package has been placed in directory `${SEATUNNEL_HOME}/plugins/`.

### For SeaTunnel Zeta Engine

- You need to ensure that the jdbc driver jar package has been placed in directory `${SEATUNNEL_HOME}/lib/`.
## Key Features

Supports query SQL and can achieve a projection effect.
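For example, listing only the needed columns in `query` yields a projection; a minimal sketch, assuming a hypothetical table `main.users` in `/tmp/test.db`:

```hocon
source {
  Jdbc {
    url    = "jdbc:duckdb:/tmp/test.db"
    driver = "org.duckdb.DuckDBDriver"
    # Only the two projected columns appear in the output rows
    query  = "select id, name from main.users"
  }
}
```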
## Supported DataSource Info

| Datasource | Supported versions                                            | Driver                  | Url                              | Maven                                                                  |
|------------|---------------------------------------------------------------|-------------------------|----------------------------------|------------------------------------------------------------------------|
| DuckDB     | Different dependency versions have different driver classes. | org.duckdb.DuckDBDriver | jdbc:duckdb:/path/to/database.db | [Download](https://mvnrepository.com/artifact/org.duckdb/duckdb_jdbc) |
## Data Type Mapping

| DuckDB Data Type                                                   | SeaTunnel Data Type |
|--------------------------------------------------------------------|---------------------|
| BOOLEAN                                                            | BOOLEAN             |
| TINYINT                                                            | TINYINT             |
| UTINYINT<br/>SMALLINT                                              | SMALLINT            |
| USMALLINT<br/>INTEGER                                              | INT                 |
| UINTEGER<br/>BIGINT                                                | BIGINT              |
| UBIGINT                                                            | DECIMAL(20,0)       |
| HUGEINT                                                            | DECIMAL(38,0)       |
| FLOAT                                                              | FLOAT               |
| DOUBLE                                                             | DOUBLE              |
| DECIMAL(x,y) (the designated column's specified column size < 38) | DECIMAL(x,y)        |
| DECIMAL(x,y) (the designated column's specified column size > 38) | DECIMAL(38,18)      |
| VARCHAR<br/>CHAR<br/>TEXT<br/>JSON<br/>UUID<br/>INTERVAL           | STRING              |
| DATE                                                               | DATE                |
| TIME                                                               | TIME                |
| TIMESTAMP<br/>TIMESTAMP WITH TIME ZONE                             | TIMESTAMP           |
| BLOB<br/>ARRAY<br/>STRUCT<br/>MAP                                  | BYTES               |
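Because complex types such as ARRAY, STRUCT and MAP arrive as BYTES, a common workaround is to cast such a column to VARCHAR in the query so that it maps to STRING instead; a minimal sketch, assuming a hypothetical table `main.events` with a MAP column `attributes`:

```hocon
source {
  Jdbc {
    url    = "jdbc:duckdb:/tmp/test.db"
    driver = "org.duckdb.DuckDBDriver"
    # Casting the MAP column to VARCHAR makes it arrive as STRING rather than BYTES
    query  = "select id, cast(attributes as varchar) as attributes from main.events"
  }
}
```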
## Source Options

| Name                         | Type       | Required | Default         | Description                                                                                                                                                                                                                                                         |
|------------------------------|------------|----------|-----------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| url                          | String     | Yes      | -               | The URL of the JDBC connection. Example: `jdbc:duckdb:/path/to/database.db`                                                                                                                                                                                         |
| driver                       | String     | Yes      | -               | The jdbc class name used to connect to the remote data source; for DuckDB the value is `org.duckdb.DuckDBDriver`.                                                                                                                                                   |
| username                     | String     | No       | -               | Connection instance user name                                                                                                                                                                                                                                       |
| password                     | String     | No       | -               | Connection instance password                                                                                                                                                                                                                                        |
| query                        | String     | Yes      | -               | Query statement                                                                                                                                                                                                                                                     |
| connection_check_timeout_sec | Int        | No       | 30              | The time in seconds to wait for the database operation used to validate the connection to complete                                                                                                                                                                 |
| partition_column             | String     | No       | -               | The column name for parallelism's partition; only numeric-type primary keys are supported, and only one column can be configured.                                                                                                                                  |
| partition_lower_bound        | BigDecimal | No       | -               | The partition_column min value for scan; if not set, SeaTunnel will query the database for the min value.                                                                                                                                                           |
| partition_upper_bound        | BigDecimal | No       | -               | The partition_column max value for scan; if not set, SeaTunnel will query the database for the max value.                                                                                                                                                           |
| partition_num                | Int        | No       | job parallelism | The number of partitions; only positive integers are supported. The default value is the job parallelism.                                                                                                                                                           |
| fetch_size                   | Int        | No       | 0               | For queries that return a large number of objects, you can configure the row fetch size used in the query to improve performance by reducing the number of database hits required to satisfy the selection criteria. Zero means use the jdbc default value.       |
| properties                   | Map        | No       | -               | Additional connection configuration parameters. When properties and the URL contain the same parameter, the priority is determined by the specific implementation of the driver; for example, in DuckDB, properties take precedence over the URL.                  |
| table_path                   | String     | No       | -               | The full path of the table; you can use this configuration instead of `query`. Example: `main.table1`                                                                                                                                                               |
| table_list                   | Array      | No       | -               | The list of tables to be read; you can use this configuration instead of `table_path`. Example: `[{ table_path = "main.table1" }, { table_path = "main.table2", query = "select id, name from main.table2" }]`                                                      |
| where_condition              | String     | No       | -               | Common row filter conditions for all tables/queries; must start with `where`. For example, `where id > 100`                                                                                                                                                         |
| split.size                   | Int        | No       | 8096            | The split size (number of rows) of a table; captured tables are split into multiple splits when read.                                                                                                                                                               |
| common-options               |            | No       | -               | Source plugin common parameters, please refer to Source Common Options for details                                                                                                                                                                                 |
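To illustrate how `fetch_size` and `properties` fit together, here is a minimal sketch; the database path and table name (`/tmp/test.db`, `main.events`) are placeholders:

```hocon
source {
  Jdbc {
    url    = "jdbc:duckdb:/tmp/test.db"
    driver = "org.duckdb.DuckDBDriver"
    query  = "select * from main.events"
    # Fetch result rows in batches of 1024 instead of the jdbc default (0)
    fetch_size = 1024
    # Driver-level options; for DuckDB these take precedence over URL parameters
    properties {
      threads      = 4
      memory_limit = "4GB"
    }
  }
}
```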
## Parallel Reader

The JDBC Source connector supports parallel reading of data from tables. SeaTunnel will use certain rules to split the data in the table, which will be handed over to readers for reading. The number of readers is determined by the `parallelism` option.
**Split Key Rules:**

1. If `partition_column` is not null, it will be used to calculate the splits. The column must be of a supported split data type.
2. If `partition_column` is null, SeaTunnel will read the schema from the table and get the Primary Key and Unique Index. If there is more than one column in the Primary Key and Unique Index, the first column that is of a supported split data type will be used to split the data. For example, given a table with Primary Key (`guid`, `name varchar`), if `guid` is not of a supported split data type, the column `name` will be used to split the data.

**Supported split data types:**

* String
* Number (int, bigint, decimal, ...)
* Date
### Options Related To Split

#### split.size

How many rows are in one split; captured tables are split into multiple splits when read.

#### partition_column

The column name used for splitting the data.

#### partition_upper_bound

The partition_column max value for scan; if not set, SeaTunnel will query the database for the max value.

#### partition_lower_bound

The partition_column min value for scan; if not set, SeaTunnel will query the database for the min value.

#### partition_num

> Not recommended for use. The correct approach is to control the number of splits through `split.size`.

How many splits the data should be divided into; only positive integers are supported. The default value is the job parallelism.

### Tips

> If the table can not be split (for example, the table has no Primary Key or Unique Index, and `partition_column` is not set), it will run in single concurrency.
>
> Use `table_path` to replace `query` for single-table reading. If you need to read multiple tables, use `table_list`.
## Task Example

### Simple:

> This example queries 16 rows from the 'user_events' table in your test database and reads all of its fields. You can also specify which fields to query, for final output to the console.
```hocon
# Defining the runtime environment
env {
  parallelism = 4
  job.mode = "BATCH"
}

source {
  Jdbc {
    url = "jdbc:duckdb:/tmp/test.db"
    driver = "org.duckdb.DuckDBDriver"
    connection_check_timeout_sec = 100
    username = "duckdb"
    password = ""
    query = "select * from user_events limit 16"
  }
}

transform {
  # If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
  # please go to https://seatunnel.apache.org/docs/transform-v2/sql
}

sink {
  Console {}
}
```
### Parallel:

> Read your query table in parallel with the shard field you configured and the shard data.

```hocon
env {
  parallelism = 4
  job.mode = "BATCH"
}

source {
  Jdbc {
    url = "jdbc:duckdb:/tmp/test.db"
    driver = "org.duckdb.DuckDBDriver"
    connection_check_timeout_sec = 100
    username = "duckdb"
    password = ""
    query = "select * from user_events"
    partition_column = "id"
    split.size = 10000
    # Read start boundary
    #partition_lower_bound = ...
    # Read end boundary
    #partition_upper_bound = ...
  }
}

sink {
  Console {}
}
```
### Parallel by Primary Key or Unique Index:

> Configuring `table_path` will turn on auto split; you can configure `split.*` to adjust the split strategy.
```hocon
env {
  parallelism = 4
  job.mode = "BATCH"
}

source {
  Jdbc {
    url = "jdbc:duckdb:/tmp/test.db"
    driver = "org.duckdb.DuckDBDriver"
    connection_check_timeout_sec = 100
    username = ""
    password = ""
    table_path = "main.user_events"
    query = "select * from main.user_events"
    split.size = 10000
  }
}

sink {
  Console {}
}
```
### Parallel Boundary:

> It is more efficient to specify the data within the upper and lower bounds of the query, i.e. to read your data source according to the upper and lower boundaries you configured.
```hocon
source {
  Jdbc {
    url = "jdbc:duckdb:/tmp/test.db"
    driver = "org.duckdb.DuckDBDriver"
    connection_check_timeout_sec = 100
    username = "duckdb"
    password = ""
    # Define query logic as required
    query = "select * from user_events"
    partition_column = "id"
    # Read start boundary
    partition_lower_bound = 1
    # Read end boundary
    partition_upper_bound = 500
    partition_num = 10
    properties {
      threads = 4
      memory_limit = "4GB"
    }
  }
}
```
### Multiple table read:

> Configuring `table_list` will turn on auto split; you can configure `split.*` to adjust the split strategy.
```hocon
env {
  job.mode = "BATCH"
  parallelism = 4
}

source {
  Jdbc {
    url = "jdbc:duckdb:/tmp/test.db"
    driver = "org.duckdb.DuckDBDriver"
    connection_check_timeout_sec = 100
    username = "duckdb"
    password = ""

    table_list = [
      {
        table_path = "main.table1"
      },
      {
        table_path = "main.table2"
        # Use query to filter rows & columns
        query = "select id, name from main.table2 where id > 100"
      }
    ]
    #where_condition = "where id > 100"
    #split.size = 8096
  }
}

sink {
  Console {}
}
```
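## Changelog

<ChangeLog/>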