docs/content/docs/connectors/pipeline-connectors/elasticsearch.md
The Elasticsearch Pipeline connector can be used as the Data Sink of a pipeline to write data to Elasticsearch. This document describes how to set up the Elasticsearch Pipeline connector.
A pipeline that reads data from MySQL and writes it to Elasticsearch can be defined as follows:
source:
  type: mysql
  name: MySQL Source
  hostname: 127.0.0.1
  port: 3306
  username: admin
  password: pass
  tables: adb.\.*, bdb.user_table_[0-9]+, [app|web].order_\.*
  server-id: 5401-5404
sink:
  type: elasticsearch
  name: Elasticsearch Sink
  hosts: http://127.0.0.1:9092,http://127.0.0.1:9093
route:
  - source-table: adb.\.*
    sink-table: default_index
    description: sync adb.\.* table to default_index
pipeline:
  name: MySQL to Elasticsearch Pipeline
  parallelism: 2
By default, the Elasticsearch index that data is written to is named after the TableId string, namespace.schemaName.tableName. This can be changed with the route feature of the pipeline.
Automatic creation of Elasticsearch indices is not supported.
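The two notes above can be illustrated with a small Python sketch. It assumes that the index name is built by joining the non-empty parts of the TableId with dots (how the connector handles missing parts is an assumption here), and that the target index is prepared ahead of time through Elasticsearch's create-index REST API (`PUT /<index>`). The helper names `default_index_name` and `build_create_index_request` are hypothetical and not part of Flink CDC; the host is taken from the example pipeline.

```python
import json
import urllib.request


def default_index_name(namespace, schema_name, table_name):
    """Join the non-empty TableId parts with dots (assumed behaviour)."""
    parts = [p for p in (namespace, schema_name, table_name) if p]
    return ".".join(parts)


def build_create_index_request(host, index, mappings=None):
    """Build (but do not send) a PUT /<index> request for Elasticsearch."""
    body = {"mappings": mappings} if mappings else {}
    return urllib.request.Request(
        url=f"{host.rstrip('/')}/{index}",
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="PUT",
    )


if __name__ == "__main__":
    # A MySQL table usually has no namespace, only schema and table name.
    index = default_index_name(None, "adb", "orders")
    request = build_create_index_request("http://127.0.0.1:9092", index)
    print(request.get_method(), request.full_url)
    # To actually create the index, send the request to a running cluster:
    # urllib.request.urlopen(request)
```

When a route rule rewrites the sink table (as `default_index` does in the pipeline above), the index to create in advance is the routed name rather than the TableId-derived one.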
Elasticsearch stores each document as a JSON string, so the data type mapping is between Flink CDC data types and JSON data types.
<div class="wy-table-responsive">
<table class="colwidths-auto docutils">
    <thead>
      <tr>
        <th class="text-left">CDC type</th>
        <th class="text-left">JSON type</th>
        <th class="text-left" style="width:60%;">NOTE</th>
      </tr>
    </thead>
    <tbody>
    <tr>
      <td>TINYINT</td>
      <td>NUMBER</td>
      <td></td>
    </tr>
    <tr>
      <td>SMALLINT</td>
      <td>NUMBER</td>
      <td></td>
    </tr>
    <tr>
      <td>INT</td>
      <td>NUMBER</td>
      <td></td>
    </tr>
    <tr>
      <td>BIGINT</td>
      <td>NUMBER</td>
      <td></td>
    </tr>
    <tr>
      <td>FLOAT</td>
      <td>NUMBER</td>
      <td></td>
    </tr>
    <tr>
      <td>DOUBLE</td>
      <td>NUMBER</td>
      <td></td>
    </tr>
    <tr>
      <td>DECIMAL(p, s)</td>
      <td>STRING</td>
      <td></td>
    </tr>
    <tr>
      <td>BOOLEAN</td>
      <td>BOOLEAN</td>
      <td></td>
    </tr>
    <tr>
      <td>DATE</td>
      <td>STRING</td>
      <td>with format: date (yyyy-MM-dd), example: 2024-10-21</td>
    </tr>
    <tr>
      <td>TIMESTAMP</td>
      <td>STRING</td>
      <td>with format: date-time (yyyy-MM-dd HH:mm:ss.SSSSSS, with UTC time zone), example: 2024-10-21 14:10:56.000000</td>
    </tr>
    <tr>
      <td>TIMESTAMP_LTZ</td>
      <td>STRING</td>
      <td>with format: date-time (yyyy-MM-dd HH:mm:ss.SSSSSS, with UTC time zone), example: 2024-10-21 14:10:56.000000</td>
    </tr>
    <tr>
      <td>CHAR(n)</td>
      <td>STRING</td>
      <td></td>
    </tr>
    <tr>
      <td>VARCHAR(n)</td>
      <td>STRING</td>
      <td></td>
    </tr>
    <tr>
      <td>ARRAY</td>
      <td>ARRAY</td>
      <td></td>
    </tr>
    <tr>
      <td>MAP</td>
      <td>STRING</td>
      <td></td>
    </tr>
    <tr>
      <td>ROW</td>
      <td>STRING</td>
      <td></td>
    </tr>
    </tbody>
</table>
</div>
{{< top >}}
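As a rough illustration of the mapping table, the following Python sketch converts Python stand-ins for CDC values into their JSON-side form. It is not the connector's implementation: the function name `to_json_value` is hypothetical, Python types merely stand in for CDC types (`Decimal` for DECIMAL, `date` for DATE, `datetime` for TIMESTAMP and TIMESTAMP_LTZ, `list` for ARRAY, `dict` for MAP and ROW), and serializing MAP and ROW as JSON text is an assumption, since the table only states that they become STRING. No time zone conversion is performed; timestamps are assumed to already be in UTC.

```python
import json
from datetime import date, datetime
from decimal import Decimal


def to_json_value(value):
    """Convert one Python stand-in for a CDC value into its JSON-side form."""
    if value is None or isinstance(value, (bool, int, float)):
        return value  # BOOLEAN and the numeric types map directly
    if isinstance(value, Decimal):
        return str(value)  # DECIMAL(p, s) -> STRING
    if isinstance(value, datetime):
        # Checked before date, because datetime is a subclass of date.
        return value.strftime("%Y-%m-%d %H:%M:%S.%f")
    if isinstance(value, date):
        return value.strftime("%Y-%m-%d")  # DATE -> STRING
    if isinstance(value, str):
        return value  # CHAR(n) / VARCHAR(n) -> STRING
    if isinstance(value, list):
        return [to_json_value(v) for v in value]  # ARRAY -> ARRAY
    if isinstance(value, dict):
        # MAP / ROW -> STRING; using JSON text here is an assumption.
        return json.dumps({k: to_json_value(v) for k, v in value.items()})
    raise TypeError(f"unsupported value type: {type(value).__name__}")


if __name__ == "__main__":
    row = {
        "id": 1,
        "price": Decimal("12.50"),
        "created": datetime(2024, 10, 21, 14, 10, 56),
        "tags": ["a", "b"],
    }
    print(json.dumps({k: to_json_value(v) for k, v in row.items()}))
```

Keeping DECIMAL as a string avoids the precision loss that a JSON number parsed as a double could introduce.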