IoTDB

import ChangeLog from '../changelog/connector-iotdb.md';

IoTDB sink connector

Supported Engines

Spark

Flink

SeaTunnel Zeta

Description

Used to write data to IoTDB.

Key Features

  • exactly-once

    IoTDB supports exactly-once semantics through idempotent writes: if multiple records share the same key and timestamp, the most recent one overwrites the earlier one.

Supported DataSource Info

| Datasource | Supported Versions | Url |
| --- | --- | --- |
| IoTDB | 0.13.0 <= version <= 1.3.X | localhost:6667 |

Data Type Mapping

| IoTDB Data Type | SeaTunnel Data Type |
| --- | --- |
| BOOLEAN | BOOLEAN |
| INT32 | TINYINT |
| INT32 | SMALLINT |
| INT32 | INT |
| INT64 | BIGINT |
| FLOAT | FLOAT |
| DOUBLE | DOUBLE |
| TEXT | STRING |
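
As a rough illustration of the mapping above, the schema fragment below reuses the `c_*` field names from the FakeSource example on this page and annotates each SeaTunnel type with the IoTDB type it would be written as. The comments only restate the table; they are not output from a real job.

```hocon
schema = {
  fields {
    c_boolean  = "boolean"   # written as BOOLEAN
    c_tinyint  = "tinyint"   # written as INT32
    c_smallint = "smallint"  # written as INT32
    c_int      = "int"       # written as INT32
    c_bigint   = "bigint"    # written as INT64
    c_float    = "float"     # written as FLOAT
    c_double   = "double"    # written as DOUBLE
    c_string   = "string"    # written as TEXT
  }
}
```

Per the table, the three narrower SeaTunnel integer types all map to INT32, so the original width of a tinyint or smallint column is not preserved on the IoTDB side.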

Sink Options

| Name | Type | Required | Default | Description |
| --- | --- | --- | --- | --- |
| node_urls | Array | Yes | - | IoTDB cluster address, in the format ["host1:port"] or ["host1:port","host2:port"] |
| username | String | Yes | - | IoTDB username |
| password | String | Yes | - | IoTDB user password |
| key_device | String | Yes | - | Name of the SeaTunnelRow field that holds the IoTDB deviceId |
| key_timestamp | String | No | processing time | Name of the SeaTunnelRow field that holds the IoTDB timestamp. If not specified, processing time is used as the timestamp |
| key_measurement_fields | Array | No | exclude device & timestamp | Names of the SeaTunnelRow fields to write as IoTDB measurements. If not specified, all fields except device and timestamp are included |
| storage_group | Array | No | - | Device storage group (path prefix)<br/>example: deviceId = ${storage_group} + "." + ${key_device} |
| batch_size | Integer | No | 1024 | For batch writing: when the number of buffered records reaches batch_size or the elapsed time reaches batch_interval_ms, the data is flushed to IoTDB |
| max_retries | Integer | No | - | Number of retries when a flush fails |
| retry_backoff_multiplier_ms | Integer | No | - | Multiplier used to generate the next backoff delay |
| max_retry_backoff_ms | Integer | No | - | The amount of time to wait before attempting to retry a request to IoTDB |
| default_thrift_buffer_size | Integer | No | - | Thrift initial buffer size in the IoTDB client |
| max_thrift_frame_size | Integer | No | - | Thrift max frame size in the IoTDB client |
| zone_id | String | No | - | java.time.ZoneId used by the IoTDB client |
| enable_rpc_compression | Boolean | No | - | Enable RPC compression in the IoTDB client |
| connection_timeout_in_ms | Integer | No | - | The maximum time (in ms) to wait when connecting to IoTDB |
| common-options | | No | - | Sink plugin common parameters; refer to Sink Common Options for details |
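
To show how the optional settings combine, here is a minimal sketch of a sink block that targets a two-node cluster and sets the batching, retry, and client options explicitly. The host names `iotdb-1` and `iotdb-2` are hypothetical, and every numeric value is an illustrative assumption rather than a recommended or default setting.

```hocon
sink {
  IoTDB {
    # Hypothetical two-node cluster; replace with your own hosts.
    node_urls = ["iotdb-1:6667", "iotdb-2:6667"]
    username = "root"
    password = "root"
    key_device = "device_name"
    key_timestamp = "event_ts"

    # Batching: flush once this many records are buffered (illustrative value).
    batch_size = 512

    # Retry behaviour for failed flushes (illustrative values).
    max_retries = 3
    retry_backoff_multiplier_ms = 100
    max_retry_backoff_ms = 5000

    # IoTDB client settings (illustrative values).
    connection_timeout_in_ms = 10000
    enable_rpc_compression = true
    zone_id = "UTC"
  }
}
```

Only `node_urls`, `username`, `password`, and `key_device` are required; the remaining keys can be dropped individually to fall back to the connector's own behaviour.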

Examples

```hocon
env {
  parallelism = 2
  job.mode = "BATCH"
}

source {
  FakeSource {
    row.num = 16
    bigint.template = [1664035200001]
    schema = {
      fields {
        device_name = "string"
        temperature = "float"
        moisture = "int"
        event_ts = "bigint"
        c_string = "string"
        c_boolean = "boolean"
        c_tinyint = "tinyint"
        c_smallint = "smallint"
        c_int = "int"
        c_bigint = "bigint"
        c_float = "float"
        c_double = "double"
      }
    }
  }
}
```

The data format from upstream SeaTunnelRow is as follows:

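| device_name | temperature | moisture | event_ts | c_string | c_boolean | c_tinyint | c_smallint | c_int | c_bigint | c_float | c_double |
| --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- |
| root.test_group.device_a | 36.1 | 100 | 1664035200001 | abc1 | true | 1 | 1 | 1 | 2147483648 | 1.0 | 1.0 |
| root.test_group.device_b | 36.2 | 101 | 1664035200001 | abc2 | false | 2 | 2 | 2 | 2147483649 | 2.0 | 2.0 |
| root.test_group.device_c | 36.3 | 102 | 1664035200001 | abc3 | false | 3 | 3 | 3 | 2147483649 | 3.0 | 3.0 |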

Case1

Only required options used:

  • use current processing time as timestamp
  • measurement fields include all fields excluding key_device

```hocon
sink {
  IoTDB {
    node_urls = ["localhost:6667"]
    username = "root"
    password = "root"
    key_device = "device_name" # use the device_name field as the `deviceId`
  }
}
```

The data format of IoTDB output is as follows:

```shell
IoTDB> SELECT * FROM root.test_group.* align by device;
+------------------------+------------------------+--------------+-----------+--------------+---------+----------+----------+-----------+------+-----------+--------+---------+
|                    Time|                  Device|   temperature|   moisture|      event_ts| c_string| c_boolean| c_tinyint| c_smallint| c_int|   c_bigint| c_float| c_double|
+------------------------+------------------------+--------------+-----------+--------------+---------+----------+----------+-----------+------+-----------+--------+---------+
|2023-09-01T00:00:00.001Z|root.test_group.device_a|          36.1|        100| 1664035200001|     abc1|      true|         1|          1|     1| 2147483648|     1.0|      1.0|
|2023-09-01T00:00:00.001Z|root.test_group.device_b|          36.2|        101| 1664035200001|     abc2|     false|         2|          2|     2| 2147483649|     2.0|      2.0|
|2023-09-01T00:00:00.001Z|root.test_group.device_c|          36.3|        102| 1664035200001|     abc3|     false|         3|          3|     3| 2147483649|     3.0|      3.0|
+------------------------+------------------------+--------------+-----------+--------------+---------+----------+----------+-----------+------+-----------+--------+---------+
```

Case2

Use source event's time:

  • use key_timestamp as timestamp
  • measurement fields include all fields excluding key_device & key_timestamp

```hocon
sink {
  IoTDB {
    node_urls = ["localhost:6667"]
    username = "root"
    password = "root"
    key_device = "device_name" # use the device_name field as the `deviceId`
    key_timestamp = "event_ts" # use the event_ts field as the `timestamp`
  }
}
```

The data format of IoTDB output is as follows:

```shell
IoTDB> SELECT * FROM root.test_group.* align by device;
+------------------------+------------------------+--------------+-----------+---------+----------+----------+-----------+------+-----------+--------+---------+
|                    Time|                  Device|   temperature|   moisture| c_string| c_boolean| c_tinyint| c_smallint| c_int|   c_bigint| c_float| c_double|
+------------------------+------------------------+--------------+-----------+---------+----------+----------+-----------+------+-----------+--------+---------+
|2022-09-25T00:00:00.001Z|root.test_group.device_a|          36.1|        100|     abc1|      true|         1|          1|     1| 2147483648|     1.0|      1.0|
|2022-09-25T00:00:00.001Z|root.test_group.device_b|          36.2|        101|     abc2|     false|         2|          2|     2| 2147483649|     2.0|      2.0|
|2022-09-25T00:00:00.001Z|root.test_group.device_c|          36.3|        102|     abc3|     false|         3|          3|     3| 2147483649|     3.0|      3.0|
+------------------------+------------------------+--------------+-----------+---------+----------+----------+-----------+------+-----------+--------+---------+
```

Case3

Use source event's time and limit measurement fields:

  • use key_timestamp as timestamp
  • measurement fields include only fields specified in key_measurement_fields

```hocon
sink {
  IoTDB {
    node_urls = ["localhost:6667"]
    username = "root"
    password = "root"
    key_device = "device_name"
    key_timestamp = "event_ts"
    key_measurement_fields = ["temperature", "moisture"]
  }
}
```

The data format of IoTDB output is as follows:

```shell
IoTDB> SELECT * FROM root.test_group.* align by device;
+------------------------+------------------------+--------------+-----------+
|                    Time|                  Device|   temperature|   moisture|
+------------------------+------------------------+--------------+-----------+
|2022-09-25T00:00:00.001Z|root.test_group.device_a|          36.1|        100|
|2022-09-25T00:00:00.001Z|root.test_group.device_b|          36.2|        101|
|2022-09-25T00:00:00.001Z|root.test_group.device_c|          36.3|        102|
+------------------------+------------------------+--------------+-----------+
```
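
The `storage_group` option is not exercised by the cases above, so here is a hedged sketch of it. It rests on two assumptions that this page does not confirm: that upstream rows carry bare device names such as `device_a` (the sample data above already carries full paths), and that the option accepts a single path-prefix string, which is how the formula `deviceId = ${storage_group} + "." + ${key_device}` reads even though the options table lists the type as Array.

```hocon
sink {
  IoTDB {
    node_urls = ["localhost:6667"]
    username = "root"
    password = "root"

    # Assumption: device_name holds a bare name such as "device_a".
    key_device = "device_name"
    key_timestamp = "event_ts"

    # Assumption: a single path prefix, given as a string.
    # By the formula in the options table, the resulting deviceId
    # would be "root.test_group" + "." + "device_a".
    storage_group = "root.test_group"
  }
}
```

Under those assumptions the devices would land on the same `root.test_group.*` paths queried in the cases above, without the upstream data having to embed the prefix in every row.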

Changelog

<ChangeLog />