
import ChangeLog from '../changelog/connector-jdbc.md';

DuckDB

JDBC DuckDB Source Connector

Description

Read external data source data through JDBC.

Support DuckDB Version

  • 0.8.x/0.9.x/0.10.x/1.x

Support Those Engines

Spark

Flink

SeaTunnel Zeta

Using Dependency

  1. You need to ensure that the JDBC driver jar package has been placed in the ${SEATUNNEL_HOME}/plugins/ directory.

For SeaTunnel Zeta Engine

  1. You need to ensure that the JDBC driver jar package has been placed in the ${SEATUNNEL_HOME}/lib/ directory.

Key Features

Supports query SQL and can achieve a projection effect by selecting only the columns you need in the query, as shown in the sketch below.
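
For illustration, a minimal sketch of projection through `query` (the database path, table, and column names here are made-up examples, not prescribed values):

```hocon
source {
    Jdbc {
        url = "jdbc:duckdb:/path/to/database.db"
        driver = "org.duckdb.DuckDBDriver"
        # Only the id and name columns are read and emitted downstream
        query = "select id, name from main.table1"
    }
}
```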

Supported DataSource Info

| Datasource | Supported versions | Driver | Url | Maven |
|------------|--------------------|--------|-----|-------|
| DuckDB | Different dependency versions have different driver classes. | org.duckdb.DuckDBDriver | jdbc:duckdb:/path/to/database.db | Download |

Data Type Mapping

| DuckDB Data Type | SeaTunnel Data Type |
|------------------|---------------------|
| BOOLEAN | BOOLEAN |
| TINYINT | TINYINT |
| UTINYINT<br/>SMALLINT | SMALLINT |
| USMALLINT<br/>INTEGER | INT |
| UINTEGER<br/>BIGINT | BIGINT |
| UBIGINT | DECIMAL(20,0) |
| HUGEINT | DECIMAL(38,0) |
| FLOAT | FLOAT |
| DOUBLE | DOUBLE |
| DECIMAL(x,y) (the designated column's precision < 38) | DECIMAL(x,y) |
| DECIMAL(x,y) (the designated column's precision > 38) | DECIMAL(38,18) |
| VARCHAR<br/>CHAR<br/>TEXT<br/>JSON<br/>UUID<br/>INTERVAL | STRING |
| DATE | DATE |
| TIME | TIME |
| TIMESTAMP<br/>TIMESTAMP WITH TIME ZONE | TIMESTAMP |
| BLOB<br/>ARRAY<br/>STRUCT<br/>MAP | BYTES |

Source Options

| Name | Type | Required | Default | Description |
|------|------|----------|---------|-------------|
| url | String | Yes | - | The URL of the JDBC connection. Example: jdbc:duckdb:/path/to/database.db |
| driver | String | Yes | - | The jdbc class name used to connect to the remote data source; for DuckDB the value is org.duckdb.DuckDBDriver. |
| username | String | No | - | Connection instance user name |
| password | String | No | - | Connection instance password |
| query | String | Yes | - | Query statement |
| connection_check_timeout_sec | Int | No | 30 | The time in seconds to wait for the database operation used to validate the connection to complete |
| partition_column | String | No | - | The column name for parallel partitioning; only numeric-type primary keys are supported, and only one column can be configured. |
| partition_lower_bound | BigDecimal | No | - | The partition_column min value for the scan; if not set, SeaTunnel will query the database to get the min value. |
| partition_upper_bound | BigDecimal | No | - | The partition_column max value for the scan; if not set, SeaTunnel will query the database to get the max value. |
| partition_num | Int | No | job parallelism | The number of partitions; only positive integers are supported. The default value is the job parallelism. |
| fetch_size | Int | No | 0 | For queries that return a large number of objects, you can configure the row fetch size used in the query to improve performance by reducing the number of database hits required to satisfy the selection criteria. Zero means use the JDBC default value. |
| properties | Map | No | - | Additional connection configuration parameters. When properties and the URL contain the same parameter, the priority is determined by the specific implementation of the driver; in DuckDB, properties take precedence over the URL. |
| table_path | String | No | - | The full path of the table; you can use this configuration instead of query. Example: "main.table1" |
| table_list | Array | No | - | The list of tables to be read; you can use this configuration instead of table_path. Example: [{ table_path = "main.table1" }, { table_path = "main.table2", query = "select id, name from main.table2" }] |
| where_condition | String | No | - | Common row filter condition for all tables/queries; must start with "where". For example: where id > 100 |
| split.size | Int | No | 8096 | The split size (number of rows) of a table; captured tables are split into multiple splits when read. |
| common-options | | No | - | Source plugin common parameters, please refer to Source Common Options for details |
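
As a usage sketch, `fetch_size` and `properties` can be combined with a plain `query` like this (the database path, table name, and property values are illustrative assumptions, not prescribed settings):

```hocon
source {
    Jdbc {
        url = "jdbc:duckdb:/path/to/database.db"
        driver = "org.duckdb.DuckDBDriver"
        query = "select id, name from main.table1"
        # Fetch 1000 rows per round trip instead of the JDBC driver default
        fetch_size = 1000
        # Driver-level settings; in DuckDB, properties take precedence over URL parameters
        properties {
            threads = 4
            memory_limit = "4GB"
        }
    }
}
```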

Parallel Reader

The JDBC Source connector supports parallel reading of data from tables. SeaTunnel uses certain rules to split the data in the table into multiple splits, which are handed over to readers. The number of readers is determined by the parallelism option.

Split Key Rules:

  1. If partition_column is not null, it will be used to calculate the splits. The column must be of a supported split data type.
  2. If partition_column is null, SeaTunnel will read the schema from the table and get the Primary Key and Unique Index. If there is more than one column in the Primary Key and Unique Index, the first column of a supported split data type will be used to split the data. For example, if a table has Primary Key(nn guid, name varchar), because guid is not of a supported split data type, the column name will be used to split the data.

Supported split data type:

  • String
  • Number(int, bigint, decimal, ...)
  • Date

split.size

The number of rows in one split; captured tables are split into multiple splits when read. For example, a table with 1,000,000 rows and split.size = 10000 is read as about 100 splits.

partition_column [string]

The column name for split data.

partition_upper_bound [BigDecimal]

The partition_column max value for the scan; if not set, SeaTunnel will query the database to get the max value.

partition_lower_bound [BigDecimal]

The partition_column min value for the scan; if not set, SeaTunnel will query the database to get the min value.

partition_num [int]

Not recommended for use; the correct approach is to control the number of splits through split.size.

How many splits to create; only positive integers are supported. The default value is the job parallelism.

tips

If the table cannot be split (for example, the table has no Primary Key or Unique Index, and partition_column is not set), it will run in single concurrency.

Use table_path to replace query for single table reading. If you need to read multiple tables, use table_list.

Task Example

Simple

This example queries the user_events table in your test database with single parallelism and reads all of its fields. You can also specify which fields to query for final output to the console.

```hocon
# Defining the runtime environment
env {
  parallelism = 4
  job.mode = "BATCH"
}
source {
    Jdbc {
        url = "jdbc:duckdb:/tmp/test.db"
        driver = "org.duckdb.DuckDBDriver"
        connection_check_timeout_sec = 100
        username = "duckdb"
        password = ""
        query = "select * from user_events limit 16"
    }
}

transform {
    # If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
    # please go to https://seatunnel.apache.org/docs/transform-v2/sql
}

sink {
    Console {}
}
```

parallel by partition_column

```hocon
env {
  parallelism = 4
  job.mode = "BATCH"
}
source {
    Jdbc {
        url = "jdbc:duckdb:/tmp/test.db"
        driver = "org.duckdb.DuckDBDriver"
        connection_check_timeout_sec = 100
        username = "duckdb"
        password = ""
        query = "select * from user_events"
        partition_column = "id"
        split.size = 10000
        # Read start boundary
        #partition_lower_bound = ...
        # Read end boundary
        #partition_upper_bound = ...
    }
}

sink {
  Console {}
}
```

parallel by Primary Key or Unique Index

Configuring table_path will turn on auto split; you can configure split.* to adjust the split strategy.

```hocon
env {
  parallelism = 4
  job.mode = "BATCH"
}
source {
    Jdbc {
        url = "jdbc:duckdb:/tmp/test.db"
        driver = "org.duckdb.DuckDBDriver"
        connection_check_timeout_sec = 100
        username = ""
        password = ""
        table_path = "main.user_events"
        query = "select * from main.user_events"
        split.size = 10000
    }
}

sink {
  Console {}
}
```

Parallel Boundary

It is more efficient to read your data source within the upper and lower boundaries you configure, because SeaTunnel scans only the specified range instead of first querying the database for the boundary values. In the example below, ids 1 through 500 are read in 10 partitions of about 50 rows each.

```hocon
source {
    Jdbc {
        url = "jdbc:duckdb:/tmp/test.db"
        driver = "org.duckdb.DuckDBDriver"
        connection_check_timeout_sec = 100
        username = "duckdb"
        password = ""
        # Define query logic as required
        query = "select * from user_events"
        partition_column = "id"
        # Read start boundary
        partition_lower_bound = 1
        # Read end boundary
        partition_upper_bound = 500
        partition_num = 10
        properties {
         threads=4
         memory_limit="4GB"
        }
    }
}
```

Multiple table read

Configuring table_list will turn on auto split; you can configure split.* to adjust the split strategy.

```hocon
env {
  job.mode = "BATCH"
  parallelism = 4
}
source {
  Jdbc {
    url = "jdbc:duckdb:/tmp/test.db"
    driver = "org.duckdb.DuckDBDriver"
    connection_check_timeout_sec = 100
    username = "duckdb"
    password = ""

    table_list = [
      {
        table_path = "main.table1"
      },
      {
        table_path = "main.table2"
        # Use query to filter rows & columns
        query = "select id, name from main.table2 where id > 100"
      }
    ]
    #where_condition= "where id > 100"
    #split.size = 8096
  }
}

sink {
  Console {}
}
```

Change Log

<ChangeLog />