
import ChangeLog from '../changelog/connector-cdc-tidb.md';

TiDB CDC

TiDB CDC source connector

Support Those Engines

SeaTunnel Zeta

Flink

Key features

Description

The TiDB CDC connector reads both snapshot data and incremental data from a TiDB database. This document describes how to set up the TiDB CDC connector to take a snapshot of existing data and then capture streaming change events from a TiDB database.

Supported DataSource Info

| Datasource | Supported versions | Driver | Url | Maven |
|---|---|---|---|---|
| MySQL | <li> MySQL: 5.5, 5.6, 5.7, 8.0.x </li><li> RDS MySQL: 5.6, 5.7, 8.0.x </li> | com.mysql.cj.jdbc.Driver | jdbc:mysql://localhost:3306/test | https://mvnrepository.com/artifact/mysql/mysql-connector-java/8.0.28 |
| tikv-client-java | 3.2.0 | - | - | https://mvnrepository.com/artifact/org.tikv/tikv-client-java/3.2.0 |

Using Dependency

Install Jdbc Driver

For Flink Engine

  1. Ensure that the JDBC driver jar package and the tikv-client-java jar package have been placed in the directory ${SEATUNNEL_HOME}/plugins/.

For SeaTunnel Zeta Engine

  1. Ensure that the JDBC driver jar package and the tikv-client-java jar package have been placed in the directory ${SEATUNNEL_HOME}/lib/.

Download the MySQL JDBC driver and the tikv-client-java jar, then place both in the ${SEATUNNEL_HOME}/lib/ directory. For example: cp mysql-connector-java-xxx.jar $SEATUNNEL_HOME/lib/

Data Type Mapping

| MySQL Data Type | SeaTunnel Data Type |
|---|---|
| BIT(1)<br/>TINYINT(1) | BOOLEAN |
| TINYINT | TINYINT |
| TINYINT UNSIGNED<br/>SMALLINT | SMALLINT |
| SMALLINT UNSIGNED<br/>MEDIUMINT<br/>MEDIUMINT UNSIGNED<br/>INT<br/>INTEGER<br/>YEAR | INT |
| INT UNSIGNED<br/>INTEGER UNSIGNED<br/>BIGINT | BIGINT |
| BIGINT UNSIGNED | DECIMAL(20,0) |
| DECIMAL(p, s)<br/>DECIMAL(p, s) UNSIGNED<br/>NUMERIC(p, s)<br/>NUMERIC(p, s) UNSIGNED | DECIMAL(p,s) |
| FLOAT<br/>FLOAT UNSIGNED | FLOAT |
| DOUBLE<br/>DOUBLE UNSIGNED<br/>REAL<br/>REAL UNSIGNED | DOUBLE |
| CHAR<br/>VARCHAR<br/>TINYTEXT<br/>MEDIUMTEXT<br/>TEXT<br/>LONGTEXT<br/>ENUM<br/>JSON | STRING |
| DATE | DATE |
| TIME(s) | TIME(s) |
| DATETIME<br/>TIMESTAMP(s) | TIMESTAMP(s) |
| BINARY<br/>VARBINARY<br/>BIT(p)<br/>TINYBLOB<br/>MEDIUMBLOB<br/>BLOB<br/>LONGBLOB<br/>GEOMETRY | BYTES |

Source Options

| Name | Type | Required | Default | Description |
|---|---|---|---|---|
| url | String | Yes | - | The URL of the JDBC connection. For example: jdbc:mysql://tidb0:4000/inventory. |
| username | String | Yes | - | Name of the database user to use when connecting to the database server. |
| password | String | Yes | - | Password to use when connecting to the database server. |
| pd-addresses | String | Yes | - | The PD address of the TiKV cluster. |
| database-name | String | Yes | - | Name of the database to monitor. |
| table-name | String | Yes | - | Name of the table to monitor. The table name needs to include the database name. |
| startup.mode | Enum | No | INITIAL | Optional startup mode for the TiDB CDC consumer. Valid enumerations are initial, earliest, latest and specific.<br/>initial: Synchronize historical data at startup, and then synchronize incremental data.<br/>earliest: Start from the earliest offset possible.<br/>latest: Start from the latest offset.<br/>specific: Start from user-supplied specific offsets. |
| batch-size-per-scan | Int | No | 1000 | Number of rows read per scan. |
| tikv.grpc.timeout_in_ms | Long | No | - | TiKV gRPC timeout in ms. |
| tikv.grpc.scan_timeout_in_ms | Long | No | - | TiKV gRPC scan timeout in ms. |
| tikv.batch_get_concurrency | Integer | No | - | TiKV gRPC batch get concurrency. |
| tikv.batch_scan_concurrency | Integer | No | - | TiKV gRPC batch scan concurrency. |
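
To illustrate how the optional settings combine with the required ones, the fragment below is a minimal sketch of a source block that sets a startup mode, the scan batch size, and the TiKV client timeouts. The host names, credentials, and numeric values are placeholders rather than tuning recommendations. The lowercase spelling of the startup.mode value follows the enumeration list in the options table (the default is documented as INITIAL), so treat it as an assumption and check the spelling your SeaTunnel version accepts.

```hocon
source {
  TiDB-CDC {
    # Required options (placeholder values for a local test cluster)
    url = "jdbc:mysql://tidb0:4000/inventory"
    pd-addresses = "pd0:2379"
    username = "root"
    password = ""
    database-name = "inventory"
    table-name = "products"

    # Optional: start from the latest offset instead of the default INITIAL mode
    # (value spelling is an assumption, see the note above)
    startup.mode = "latest"

    # Optional: scan size; 1000 is the documented default
    batch-size-per-scan = 1000

    # Optional: TiKV gRPC timeouts in milliseconds (illustrative values)
    tikv.grpc.timeout_in_ms = 20000
    tikv.grpc.scan_timeout_in_ms = 20000
  }
}
```

Options left out of the block keep their defaults, so a job that only needs the initial snapshot followed by incremental changes can omit startup.mode entirely.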

Task Example

Simple

env {
  parallelism = 1
  job.mode = "STREAMING"
  checkpoint.interval = 5000
}

source {
  # This is an example source plugin, **only for testing and demonstrating the source plugin feature**
  TiDB-CDC {
    plugin_output = "products_tidb_cdc"
    url = "jdbc:mysql://tidb0:4000/inventory"
    driver = "com.mysql.cj.jdbc.Driver"
    tikv.grpc.timeout_in_ms = 20000
    pd-addresses = "pd0:2379"
    username = "root"
    password = ""
    database-name = "inventory"
    table-name = "products"
  }
}

transform {
}

sink {
  jdbc {
    plugin_input = "products_tidb_cdc"
    url = "jdbc:mysql://tidb0:4000/inventory"
    driver = "com.mysql.cj.jdbc.Driver"
    user = "root"
    password = ""
    database = "inventory"
    table = "products_sink"
    generate_sink_sql = true
    primary_keys = ["id"]
  }
}
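
When first wiring up the connector, it can be useful to print the captured rows instead of writing them to a database. The sketch below reuses the source settings from the Simple example and swaps the JDBC sink for SeaTunnel's Console sink. It assumes the Console sink plugin is available in your installation and is meant for verification only, not as a production setup.

```hocon
env {
  parallelism = 1
  job.mode = "STREAMING"
  checkpoint.interval = 5000
}

source {
  TiDB-CDC {
    plugin_output = "products_tidb_cdc"
    url = "jdbc:mysql://tidb0:4000/inventory"
    driver = "com.mysql.cj.jdbc.Driver"
    pd-addresses = "pd0:2379"
    username = "root"
    password = ""
    database-name = "inventory"
    table-name = "products"
  }
}

sink {
  # Assumption: the Console sink is installed; it prints each received row to the job log
  Console {
    plugin_input = "products_tidb_cdc"
  }
}
```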

Changelog

<ChangeLog />