import ChangeLog from '../changelog/connector-selectdb-cloud.md';
# SelectDB Cloud sink connector
## Supported Engines

- Spark
- Flink
- SeaTunnel Zeta
## Description

Used to send data to SelectDB Cloud. The connector supports both streaming and batch mode. Internally, the SelectDB Cloud sink connector caches data in batches, uploads each batch, and then commits a `COPY INTO` SQL statement to load the uploaded data into the table.
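The task examples on this page run in batch mode. As a minimal sketch, running the same job in streaming mode should only require changing `job.mode` in the `env` block; the `checkpoint.interval` value below is copied from the batch example and is illustrative, not a tuning recommendation.

```hocon
env {
  parallelism = 1
  # STREAMING instead of BATCH; source and sink blocks stay unchanged
  job.mode = "STREAMING"
  # illustrative value taken from the batch example (milliseconds)
  checkpoint.interval = 10000
}
```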
:::tip

Version Supported

* SelectDB Cloud version >= 2.2.x

:::

## Sink Options
| Name | Type | Required | Default | Description |
|---|---|---|---|---|
| load-url | String | Yes | - | HTTP address of the SelectDB Cloud warehouse, in the format `warehouse_ip:http_port` |
| jdbc-url | String | Yes | - | JDBC address of the SelectDB Cloud warehouse, in the format `warehouse_ip:mysql_port` |
| cluster-name | String | Yes | - | SelectDB Cloud cluster name |
| username | String | Yes | - | Username of the SelectDB Cloud user |
| password | String | Yes | - | Password of the SelectDB Cloud user |
| sink.enable-2pc | bool | No | true | Whether to enable two-phase commit (2pc) to guarantee Exactly-Once semantics. SelectDB loads data through cached files; when the data volume is large, the cached data may expire before it is loaded (the default expiration time is 1 hour). If a large amount of written data is lost, set `sink.enable-2pc` to `false`. |
| table.identifier | String | Yes | - | Name of the SelectDB Cloud table, in the format `database.table` |
| sink.enable-delete | bool | No | false | Whether to enable deletion. This option requires the batch delete feature to be enabled on the SelectDB Cloud table and only supports the Unique model. |
| sink.max-retries | int | No | 3 | Maximum number of retries when writing records to the database fails |
| sink.buffer-size | int | No | 10 * 1024 * 1024 (10MB) | Buffer size used to cache data for stream load |
| sink.buffer-count | int | No | 10000 | Number of records cached in the buffer for stream load |
| selectdb.config | map | Yes | - | Load properties used when automatically generating the SQL for insert, delete, and update operations, including the data format (for example `file.type`) |
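As a sketch, the sink block below spells out every optional option at the default listed in the table, which can be a starting point for tuning. Hosts, credentials, and the table name are the placeholders used in the task examples; the buffer size is assumed to be expressed in bytes (10 * 1024 * 1024).

```hocon
sink {
  SelectDBCloud {
    load-url = "warehouse_ip:http_port"
    jdbc-url = "warehouse_ip:mysql_port"
    cluster-name = "Cluster"
    table.identifier = "test.test"
    username = "admin"
    password = "******"
    # Optional options, shown at their documented defaults.
    # Set to false if large loads lose data because cached files expired.
    sink.enable-2pc = true
    sink.enable-delete = false
    sink.max-retries = 3
    # assumed to be bytes: 10 * 1024 * 1024
    sink.buffer-size = 10485760
    sink.buffer-count = 10000
    selectdb.config {
      file.type = "json"
    }
  }
}
```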
## Data Type Mapping

| SelectDB Cloud Data type | SeaTunnel Data type |
|---|---|
| BOOLEAN | BOOLEAN |
| TINYINT | TINYINT |
| SMALLINT, TINYINT | SMALLINT |
| INT, SMALLINT, TINYINT | INT |
| BIGINT, INT, SMALLINT, TINYINT | BIGINT |
| LARGEINT, INT, SMALLINT, TINYINT | BIGINT |
| FLOAT | FLOAT |
| DOUBLE, FLOAT | DOUBLE |
| DECIMAL, DOUBLE, FLOAT | DECIMAL |
| DATE | DATE |
| DATETIME | TIMESTAMP |
| CHAR | STRING |
| VARCHAR | STRING |
| STRING | STRING |
| ARRAY | ARRAY |
| MAP | MAP |
| JSON | STRING |
| HLL | Not supported yet |
| BITMAP | Not supported yet |
| QUANTILE_STATE | Not supported yet |
| STRUCT | Not supported yet |
## Supported import data formats

The supported formats are CSV and JSON.
## Task Example

### Simple

The following example writes multiple data types to SelectDB Cloud. The corresponding table must be created downstream before the job runs.
```hocon
env {
  parallelism = 1
  job.mode = "BATCH"
  checkpoint.interval = 10000
}

source {
  FakeSource {
    row.num = 10
    map.size = 10
    array.size = 10
    bytes.length = 10
    string.length = 10
    schema = {
      fields {
        c_map = "map<string, array<int>>"
        c_array = "array<int>"
        c_string = string
        c_boolean = boolean
        c_tinyint = tinyint
        c_smallint = smallint
        c_int = int
        c_bigint = bigint
        c_float = float
        c_double = double
        c_decimal = "decimal(16, 1)"
        c_null = "null"
        c_bytes = bytes
        c_date = date
        c_timestamp = timestamp
      }
    }
  }
}

sink {
  SelectDBCloud {
    load-url = "warehouse_ip:http_port"
    jdbc-url = "warehouse_ip:mysql_port"
    cluster-name = "Cluster"
    table.identifier = "test.test"
    username = "admin"
    password = "******"
    selectdb.config {
      file.type = "json"
    }
  }
}
```
### Use JSON format to import data

```hocon
sink {
  SelectDBCloud {
    load-url = "warehouse_ip:http_port"
    jdbc-url = "warehouse_ip:mysql_port"
    cluster-name = "Cluster"
    table.identifier = "test.test"
    username = "admin"
    password = "******"
    selectdb.config {
      file.type = "json"
    }
  }
}
```
### Use CSV format to import data

```hocon
sink {
  SelectDBCloud {
    load-url = "warehouse_ip:http_port"
    jdbc-url = "warehouse_ip:mysql_port"
    cluster-name = "Cluster"
    table.identifier = "test.test"
    username = "admin"
    password = "******"
    selectdb.config {
      file.type = "csv"
      file.column_separator = ","
      file.line_delimiter = "\n"
    }
  }
}
```
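As a hedged sketch, a sink that also applies deletes could look like the block below. It assumes `test.test` is a Unique model table with the batch delete feature enabled, which `sink.enable-delete` requires; the remaining values are the same placeholders used in the examples on this page.

```hocon
sink {
  SelectDBCloud {
    load-url = "warehouse_ip:http_port"
    jdbc-url = "warehouse_ip:mysql_port"
    cluster-name = "Cluster"
    # assumed: a Unique model table with batch delete enabled
    table.identifier = "test.test"
    username = "admin"
    password = "******"
    # off by default; only valid for Unique model tables with batch delete
    sink.enable-delete = true
    selectdb.config {
      file.type = "json"
    }
  }
}
```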