# Elasticsearch

import ChangeLog from '../changelog/connector-elasticsearch.md';

## Description

Output data to Elasticsearch.
:::tip

Engine Supported

* Supported Elasticsearch versions: >= 2.x and <= 8.x

:::

## Options
| name | type | required | default value |
|---|---|---|---|
| hosts | array | yes | - |
| index | string | yes | - |
| schema_save_mode | string | yes | CREATE_SCHEMA_WHEN_NOT_EXIST |
| data_save_mode | string | yes | APPEND_DATA |
| index_type | string | no | |
| primary_keys | list | no | |
| key_delimiter | string | no | _ |
| auth_type | string | no | basic |
| username | string | no | |
| password | string | no | |
| auth.api_key_id | string | no | - |
| auth.api_key | string | no | - |
| auth.api_key_encoded | string | no | - |
| max_retry_count | int | no | 3 |
| max_batch_size | int | no | 10 |
| tls_verify_certificate | boolean | no | true |
| tls_verify_hostname | boolean | no | true |
| tls_keystore_path | string | no | - |
| tls_keystore_password | string | no | - |
| tls_truststore_path | string | no | - |
| tls_truststore_password | string | no | - |
| common-options | | no | - |
| vectorization_fields | array | no | - |
| vector_dimensions | int | no | - |
### hosts [array]

Elasticsearch cluster HTTP address(es), in `host:port` format. Multiple hosts may be specified, e.g. `["host1:9200", "host2:9200"]`.
### index [string]

Elasticsearch index name. The index name may contain field-name variables, such as `seatunnel_${age}` (this requires `schema_save_mode = "IGNORE"`), and the referenced field must exist in the SeaTunnel row. If the name contains no variable, it is treated as a plain index name.
### index_type [string]

Elasticsearch index type. It is recommended not to set this option on Elasticsearch 6 and above.
### primary_keys [list]

Primary key fields used to generate the document `_id`. This option is required for CDC.
### key_delimiter [string]

Delimiter for composite keys (`_` by default). For example, `$` would produce a document `_id` such as `KEY1$KEY2$KEY3`.
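As an illustrative sketch (not the connector's actual implementation), the composite `_id` described above can be thought of as joining the primary key values, in order, with the delimiter:

```python
def build_doc_id(row: dict, primary_keys: list, key_delimiter: str = "_") -> str:
    # Join the values of the primary key fields, in order, with the delimiter.
    return key_delimiter.join(str(row[key]) for key in primary_keys)

# Hypothetical row and key fields for illustration.
row = {"id": 100, "name": "widget"}
print(build_doc_id(row, ["id", "name"], key_delimiter="$"))  # 100$widget
```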
### Authentication

The Elasticsearch connector supports multiple authentication methods for connecting to secured Elasticsearch clusters. Choose the method that matches your cluster's security configuration.
#### auth_type [string]

Specifies the authentication method to use. Supported values:

* `basic` (default): HTTP Basic Authentication using `username` and `password`
* `api_key`: Elasticsearch API key authentication using a separate ID and key
* `api_key_encoded`: Elasticsearch API key authentication using a Base64-encoded key

If not specified, defaults to `basic` for backward compatibility.
#### Basic authentication

Basic authentication uses HTTP Basic Authentication with username and password credentials.

* `username`: username for basic authentication (X-Pack username)
* `password`: password for basic authentication (X-Pack password)
`vectorization_fields`: field names that require vector conversion (supported by Elasticsearch 7.3 and later).

`vector_dimensions`: dimension of the converted vector fields (supported by Elasticsearch 7.3 and later).
Example (basic authentication):
```hocon
sink {
  Elasticsearch {
    hosts = ["https://localhost:9200"]
    auth_type = "basic"
    username = "elastic"
    password = "your_password"
    index = "my_index"
  }
}
```
#### API key authentication

API key authentication provides a more secure way to authenticate with Elasticsearch using API keys.

* `auth.api_key_id`: the API key ID generated by Elasticsearch
* `auth.api_key`: the API key secret generated by Elasticsearch
* `auth.api_key_encoded`: Base64-encoded API key in the format `base64(id:api_key)`; an alternative to specifying `auth.api_key_id` and `auth.api_key` separately

Note: you can use either `auth.api_key_id` + `auth.api_key` or `auth.api_key_encoded`, but not both.
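The encoded form is simply `base64(id:api_key)`. As a quick sketch of producing it (the ID and secret below are placeholder values):

```python
import base64

# Placeholder credentials for illustration only; use the ID and secret
# returned by Elasticsearch when the API key was created.
api_key_id = "your_api_key_id"
api_key_secret = "your_api_key_secret"

# auth.api_key_encoded is base64("id:api_key").
encoded = base64.b64encode(f"{api_key_id}:{api_key_secret}".encode()).decode()
print(encoded)  # eW91cl9hcGlfa2V5X2lkOnlvdXJfYXBpX2tleV9zZWNyZXQ=
```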
Example with separate ID and key:
```hocon
sink {
  Elasticsearch {
    hosts = ["https://localhost:9200"]
    auth_type = "api_key"
    auth.api_key_id = "your_api_key_id"
    auth.api_key = "your_api_key_secret"
    index = "my_index"
  }
}
```
Example with encoded key:
```hocon
sink {
  Elasticsearch {
    hosts = ["https://localhost:9200"]
    auth_type = "api_key_encoded"
    auth.api_key_encoded = "eW91cl9hcGlfa2V5X2lkOnlvdXJfYXBpX2tleV9zZWNyZXQ="
    index = "my_index"
  }
}
```
`max_retry_count`: maximum number of retries for one bulk request.

`max_batch_size`: maximum number of documents in one bulk request.
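A rough sketch of how `max_retry_count` applies when a bulk request is flushed (illustrative only; `send` is a placeholder for the actual bulk-request call, not part of the connector's API):

```python
import time

def flush_with_retry(bulk_docs, send, max_retry_count=3):
    """Try one bulk request up to max_retry_count times before failing."""
    for attempt in range(1, max_retry_count + 1):
        try:
            return send(bulk_docs)
        except IOError:
            if attempt == max_retry_count:
                raise  # retries exhausted, surface the failure
            time.sleep(0.01 * attempt)  # simple backoff between attempts

# A send that fails twice, then succeeds:
attempts = []
def flaky_send(docs):
    attempts.append(docs)
    if len(attempts) < 3:
        raise IOError("bulk request failed")
    return "ok"

print(flush_with_retry(["doc1", "doc2"], flaky_send))  # ok
```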
`tls_verify_certificate`: enable certificate validation for HTTPS endpoints.

`tls_verify_hostname`: enable hostname validation for HTTPS endpoints.

`tls_keystore_path`: path to the PEM or JKS key store. This file must be readable by the operating system user running SeaTunnel.

`tls_keystore_password`: key password for the specified key store.

`tls_truststore_path`: path to the PEM or JKS trust store. This file must be readable by the operating system user running SeaTunnel.

`tls_truststore_password`: key password for the specified trust store.

`common-options`: sink plugin common parameters; please refer to Sink Common Options for details.
### schema_save_mode [string]

Before the synchronization task starts, choose how to handle an existing index structure on the target side.

Option introduction:

* `RECREATE_SCHEMA`: create the index when it does not exist; delete and recreate it when it already exists
* `CREATE_SCHEMA_WHEN_NOT_EXIST`: create the index when it does not exist; skip creation when it already exists
* `ERROR_WHEN_SCHEMA_NOT_EXIST`: report an error when the index does not exist
* `IGNORE`: ignore handling of the index
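The four modes can be summarized as a small decision function (an illustrative sketch, not the connector's code):

```python
def handle_schema(schema_save_mode: str, index_exists: bool) -> str:
    # Illustrative decision table for schema_save_mode; not the connector's code.
    if schema_save_mode == "RECREATE_SCHEMA":
        return "recreate" if index_exists else "create"
    if schema_save_mode == "CREATE_SCHEMA_WHEN_NOT_EXIST":
        return "skip" if index_exists else "create"
    if schema_save_mode == "ERROR_WHEN_SCHEMA_NOT_EXIST":
        if not index_exists:
            raise RuntimeError("index does not exist")
        return "skip"
    if schema_save_mode == "IGNORE":
        return "skip"
    raise ValueError(f"unknown schema_save_mode: {schema_save_mode}")

print(handle_schema("CREATE_SCHEMA_WHEN_NOT_EXIST", index_exists=False))  # create
```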
### data_save_mode [string]

Before the synchronization task starts, choose how to handle existing data on the target side.

Option introduction:

* `DROP_DATA`: preserve the index structure and delete the data
* `APPEND_DATA`: preserve the index structure and the existing data
* `ERROR_WHEN_DATA_EXISTS`: report an error when data already exists
## Examples

### Simple

```hocon
sink {
  Elasticsearch {
    hosts = ["localhost:9200"]
    index = "seatunnel-${age}"
    schema_save_mode = "IGNORE"
  }
}
```
### Multi-table writing

```hocon
sink {
  Elasticsearch {
    hosts = ["localhost:9200"]
    index = "${table_name}"
    schema_save_mode = "IGNORE"
  }
}
```
### Vector-field writing

```hocon
sink {
  Elasticsearch {
    hosts = ["localhost:9200"]
    index = "${table_name}"
    schema_save_mode = "IGNORE"
    vectorization_fields = ["review_embedding"]
    vector_dimensions = 1024
  }
}
```
### CDC (Change Data Capture) event

```hocon
sink {
  Elasticsearch {
    hosts = ["localhost:9200"]
    index = "seatunnel-${age}"
    schema_save_mode = "IGNORE"

    # CDC required options
    primary_keys = ["key1", "key2", ...]
  }
}
```
### CDC (Change Data Capture) event multi-table writing

```hocon
sink {
  Elasticsearch {
    hosts = ["localhost:9200"]
    index = "${table_name}"
    schema_save_mode = "IGNORE"
    primary_keys = ["${primary_key}"]
  }
}
```
### SSL (disable certificate validation)

```hocon
sink {
  Elasticsearch {
    hosts = ["https://localhost:9200"]
    username = "elastic"
    password = "elasticsearch"
    tls_verify_certificate = false
  }
}
```
### SSL (disable hostname validation)

```hocon
sink {
  Elasticsearch {
    hosts = ["https://localhost:9200"]
    username = "elastic"
    password = "elasticsearch"
    tls_verify_hostname = false
  }
}
```
### SSL (enable certificate validation)

```hocon
sink {
  Elasticsearch {
    hosts = ["https://localhost:9200"]
    username = "elastic"
    password = "elasticsearch"
    tls_keystore_path = "${your elasticsearch home}/config/certs/http.p12"
    tls_keystore_password = "${your password}"
  }
}
```
### SAVE_MODE

```hocon
sink {
  Elasticsearch {
    hosts = ["https://localhost:9200"]
    username = "elastic"
    password = "elasticsearch"
    schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST"
    data_save_mode = "APPEND_DATA"
  }
}
```
### Schema changes

CDC collection supports a limited set of schema changes. The following example enables schema change propagation (`schema-changes.enabled = true`) from a MySQL-CDC source to Elasticsearch:
```hocon
env {
  # You can set engine configuration here
  parallelism = 5
  job.mode = "STREAMING"
  checkpoint.interval = 5000
  read_limit.bytes_per_second = 7000000
  read_limit.rows_per_second = 400
}

source {
  MySQL-CDC {
    server-id = 5652-5657
    username = "st_user_source"
    password = "mysqlpw"
    table-names = ["shop.products"]
    url = "jdbc:mysql://mysql_cdc_e2e:3306/shop"
    schema-changes.enabled = true
  }
}

sink {
  Elasticsearch {
    hosts = ["https://elasticsearch:9200"]
    username = "elastic"
    password = "elasticsearch"
    tls_verify_certificate = false
    tls_verify_hostname = false
    index = "schema_change_index"
    index_type = "_doc"
    schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST"
    data_save_mode = "APPEND_DATA"
  }
}
```

## Changelog

<ChangeLog />