
import ChangeLog from '../changelog/connector-elasticsearch.md';

Elasticsearch

Elasticsearch source connector

Description

Used to read data from Elasticsearch.

Supports Elasticsearch versions >= 2.x and <= 8.x.

Key features

Options

| name | type | required | default value |
|------|------|----------|---------------|
| hosts | array | yes | - |
| auth_type | string | no | basic |
| username | string | no | - |
| password | string | no | - |
| auth.api_key_id | string | no | - |
| auth.api_key | string | no | - |
| auth.api_key_encoded | string | no | - |
| index | string | no | - (required if index_list is not configured) |
| index_list | array | no | used to define a multiple table task |
| source | array | no | - |
| query | json | no | {"match_all": {}} |
| search_type | enum | no | DSL |
| search_api_type | enum | no | SCROLL |
| sql_query | json | no | - (required when search_type is SQL) |
| scroll_time | string | no | 1m |
| scroll_size | int | no | 100 |
| tls_verify_certificate | boolean | no | true |
| tls_verify_hostname | boolean | no | true |
| array_column | map | no | - |
| tls_keystore_path | string | no | - |
| tls_keystore_password | string | no | - |
| tls_truststore_path | string | no | - |
| tls_truststore_password | string | no | - |
| pit_keep_alive | long | no | 60000 (1 minute) |
| pit_batch_size | int | no | 100 |
| runtime_fields | array | no | - |
| common-options | | no | - |

hosts [array]

Elasticsearch cluster HTTP address in `host:port` format; multiple hosts may be specified, such as `["host1:9200", "host2:9200"]`.

Authentication

The Elasticsearch connector supports multiple authentication methods to connect to secured Elasticsearch clusters. You can choose the appropriate authentication method based on your Elasticsearch security configuration.

auth_type [enum]

Specifies the authentication method to use. Supported values:

  • basic (default): HTTP Basic Authentication using username and password
  • api_key: Elasticsearch API Key authentication using separate ID and key
  • api_key_encoded: Elasticsearch API Key authentication using encoded key

If not specified, defaults to basic for backward compatibility.

Basic Authentication

Basic authentication uses HTTP Basic Authentication with username and password credentials.

username [string]

Username for basic authentication (x-pack username).

password [string]

Password for basic authentication (x-pack password).

Example:

```hocon
source {
    Elasticsearch {
        hosts = ["https://localhost:9200"]
        auth_type = "basic"
        username = "elastic"
        password = "your_password"
        index = "my_index"
    }
}
```

API Key Authentication

API Key authentication provides a more secure way to authenticate with Elasticsearch using API keys.

auth.api_key_id [string]

The API key ID generated by Elasticsearch.

auth.api_key [string]

The API key secret generated by Elasticsearch.

auth.api_key_encoded [string]

Base64 encoded API key in the format base64(id:api_key). This is an alternative to specifying auth.api_key_id and auth.api_key separately.

Note: You can use either auth.api_key_id + auth.api_key OR auth.api_key_encoded, but not both.

Example with separate ID and key:

```hocon
source {
    Elasticsearch {
        hosts = ["https://localhost:9200"]
        auth_type = "api_key"
        auth.api_key_id = "your_api_key_id"
        auth.api_key = "your_api_key_secret"
        index = "my_index"
    }
}
```

Example with encoded key:

```hocon
source {
    Elasticsearch {
        hosts = ["https://localhost:9200"]
        auth_type = "api_key_encoded"
        auth.api_key_encoded = "eW91cl9hcGlfa2V5X2lkOnlvdXJfYXBpX2tleV9zZWNyZXQ="
        index = "my_index"
    }
}
```
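The encoded form is simply `base64(id:api_key)`. As a sketch, assuming a POSIX shell with the `base64` utility available, you could produce it like this:

```shell
# Encode "id:api_key" into the form expected by auth.api_key_encoded
printf '%s:%s' 'your_api_key_id' 'your_api_key_secret' | base64
# -> eW91cl9hcGlfa2V5X2lkOnlvdXJfYXBpX2tleV9zZWNyZXQ=
```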

index [string]

Elasticsearch index name; supports `*` wildcard matching.

source [array]

The fields of the index to read. You can get the document id by specifying the field `_id`. If you sink `_id` to another index, you need to specify an alias for `_id` due to an Elasticsearch limitation. If you don't configure `source`, it is automatically retrieved from the mapping of the index.

array_column [map]

The fields of array type. Since Elasticsearch mappings do not mark array fields, you need to declare the array type explicitly, for example `{c_array = "array<tinyint>"}`.

query [json]

Elasticsearch query DSL. Use it to control the range of data read.

scroll_time [string]

Amount of time Elasticsearch will keep the search context alive for scroll requests.

scroll_size [int]

Maximum number of hits to be returned with each Elasticsearch scroll request.
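As an illustration, a slow downstream consumer could be accommodated by keeping the scroll context alive longer and fetching more hits per request (the values below are illustrative, not recommendations):

```hocon
source {
    Elasticsearch {
        hosts = ["localhost:9200"]
        index = "my_index"
        # keep the search context alive for 5 minutes between scroll requests
        scroll_time = "5m"
        # return up to 500 hits per scroll request
        scroll_size = 500
    }
}
```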

index_list [array]

The index_list is used to define multi-index synchronization tasks. It is an array that contains the parameters required for single-table synchronization, such as query, source/schema, scroll_size, and scroll_time. It is recommended that index_list and query should not be configured at the same level simultaneously. Please refer to the upcoming multi-table synchronization example for more details.

tls_verify_certificate [boolean]

Enable certificate validation for HTTPS endpoints.

tls_verify_hostname [boolean]

Enable hostname validation for HTTPS endpoints.

tls_keystore_path [string]

The path to the PEM or JKS key store. This file must be readable by the operating system user running SeaTunnel.

tls_keystore_password [string]

The key password for the specified key store.

tls_truststore_path [string]

The path to the PEM or JKS trust store. This file must be readable by the operating system user running SeaTunnel.

tls_truststore_password [string]

The key password for the specified trust store.
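Analogously to the keystore example in Demo 5, a trust store can be configured as follows (the path is illustrative; point it at your own certificate location):

```hocon
source {
    Elasticsearch {
        hosts = ["https://localhost:9200"]
        username = "elastic"
        password = "elasticsearch"
        # illustrative path; use your own JKS or PEM trust store
        tls_truststore_path = "${your elasticsearch home}/config/certs/truststore.jks"
        tls_truststore_password = "${your password}"
    }
}
```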

search_type [enum]

Query type, available values:

  • DSL: Use Domain Specific Language query (default)
  • SQL: Use SQL query

search_api_type [enum]

Pagination API type, available values:

  • SCROLL: Use Scroll API for pagination (default)
  • PIT: Use Point in Time (PIT) API for pagination

pit_keep_alive [long]

The amount of time (in milliseconds) for which the PIT should be kept alive.

pit_batch_size [int]

Maximum number of hits to be returned with each PIT search request.

runtime_fields [array]

Runtime fields to be computed at query time (Elasticsearch 7.11+). Each runtime field should contain:

  • name: The name of the runtime field
  • type: The data type (boolean, date, double, geo_point, ip, keyword, long)
  • script: Painless script to compute the field value
  • script_lang (optional): Script language (default: painless)
  • script_params (optional): Script parameters

Example:

```hocon
runtime_fields = [
  {
    name = "day_of_week"
    type = "keyword"
    script = "emit(doc['timestamp'].value.dayOfWeekEnum.toString())"
  },
  {
    name = "total_price"
    type = "double"
    script = "emit(doc['quantity'].value * doc['price'].value)"
  }
]
```

Runtime Fields Use Cases:

  1. Date Extraction: Extract day of week, month, year from timestamps
  2. Calculations: Compute derived values like total price, tax amount
  3. String Operations: Concatenate fields, extract substrings
  4. Conditional Logic: Categorize data based on conditions
  5. Data Transformation: Convert units, format values on-the-fly

Performance Considerations:

  • Runtime fields are computed at query time, which may impact performance for large datasets
  • Best suited for ad-hoc analysis, prototyping, and infrequent queries
  • Keep scripts simple to minimize performance impact
  • Consider indexing frequently used computed fields

Limitations:

  • Requires Elasticsearch 7.11 or higher
  • Only Painless scripts are supported
  • May be slower than indexed fields for large-scale queries

common options

Source plugin common parameters; please refer to Source Common Options for details.

Examples

Demo 1

This case will read data from indices matching the seatunnel-* pattern based on a query. The query will only return documents containing the id, name, age, tags, and phones fields. In this example, the source field configuration is used to specify which fields should be read, and the array_column is used to indicate that tags and phones should be treated as arrays.

```hocon
Elasticsearch {
    hosts = ["localhost:9200"]
    index = "seatunnel-*"
    array_column = {tags = "array<string>",phones = "array<string>"}
    source = ["_id","name","age","tags","phones"]
    query = {"range":{"firstPacket":{"gte":1669225429990,"lte":1669225429990}}}
}
```

Demo 2: Multi-table synchronization

This example demonstrates how to read different data from `read_index1` and `read_index2` and write it separately to `read_index1_copy` and `read_index2_copy`. For `read_index1`, `source` is used to specify the fields to read, and `array_column` declares which fields are arrays.

```hocon
source {
  Elasticsearch {
    hosts = ["https://elasticsearch:9200"]
    username = "elastic"
    password = "elasticsearch"
    tls_verify_certificate = false
    tls_verify_hostname = false
    index_list = [
       {
           index = "read_index1"
           query = {"range": {"c_int": {"gte": 10, "lte": 20}}}
           source = [
             c_map,
             c_array,
             c_string,
             c_boolean,
             c_tinyint,
             c_smallint,
             c_bigint,
             c_float,
             c_double,
             c_decimal,
             c_bytes,
             c_int,
             c_date,
             c_timestamp
           ]
           array_column = {
             c_array = "array<tinyint>"
           }
       }
       {
           index = "read_index2"
           query = {"match_all": {}}
           source = [
             c_int2,
             c_date2,
             c_null
           ]
       }
    ]
  }
}

transform {
}

sink {
  Elasticsearch {
    hosts = ["https://elasticsearch:9200"]
    username = "elastic"
    password = "elasticsearch"
    tls_verify_certificate = false
    tls_verify_hostname = false

    index = "${table_name}_copy"
    index_type = "st"
    "schema_save_mode"="CREATE_SCHEMA_WHEN_NOT_EXIST"
    "data_save_mode"="APPEND_DATA"
  }
}
```

Demo 3: SSL (Disable certificate validation)

```hocon
source {
    Elasticsearch {
        hosts = ["https://localhost:9200"]
        username = "elastic"
        password = "elasticsearch"

        tls_verify_certificate = false
    }
}
```

Demo 4: SSL (Disable hostname validation)

```hocon
source {
    Elasticsearch {
        hosts = ["https://localhost:9200"]
        username = "elastic"
        password = "elasticsearch"

        tls_verify_hostname = false
    }
}
```

Demo 5: SSL (Enable certificate validation)

```hocon
source {
    Elasticsearch {
        hosts = ["https://localhost:9200"]
        username = "elastic"
        password = "elasticsearch"

        tls_keystore_path = "${your elasticsearch home}/config/certs/http.p12"
        tls_keystore_password = "${your password}"
    }
}
```

Demo 6: SQL query (note: SQL does not support map and array types)

```hocon
source {
  Elasticsearch {
    hosts = ["https://elasticsearch:9200"]
    username = "elastic"
    password = "elasticsearch"
    tls_verify_certificate = false
    tls_verify_hostname = false
    index = "st_index_sql"
    sql_query = "select * from st_index_sql where c_int>=10 and c_int<=20"
    search_type = "sql"
  }
}
```

Demo 7: PIT

```hocon
source {
  Elasticsearch {
    hosts = ["https://elasticsearch:9200"]
    username = "elastic"
    password = "elasticsearch"
    tls_verify_certificate = false
    tls_verify_hostname = false

    index = "st_index"
    query = {"range": {"c_int": {"gte": 10, "lte": 20}}}

    # Use DSL query with PIT API
    search_type = DSL
    search_api_type = PIT
    pit_keep_alive = 60000  # 1 minute in milliseconds
    pit_batch_size = 100
  }
}
```

Demo 8: Runtime Fields (Elasticsearch 7.11+)

This example demonstrates how to use runtime fields to compute values at query time without reindexing data.

```hocon
source {
  Elasticsearch {
    hosts = ["https://elasticsearch:9200"]
    username = "elastic"
    password = "elasticsearch"
    tls_verify_certificate = false
    tls_verify_hostname = false

    index = "sales_data"

    # Define runtime fields for dynamic computation
    runtime_fields = [
      {
        # Calculate total amount
        name = "total_amount"
        type = "double"
        script = "emit(doc['quantity'].value * doc['price'].value)"
      },
      {
        # Extract day of week from timestamp
        name = "day_of_week"
        type = "keyword"
        script = "emit(doc['order_date'].value.dayOfWeekEnum.getDisplayName(TextStyle.FULL, Locale.ROOT))"
      },
      {
        # Categorize orders
        name = "order_category"
        type = "keyword"
        script = """
          double amount = doc['quantity'].value * doc['price'].value;
          if (amount > 1000) {
            emit('high_value');
          } else if (amount > 100) {
            emit('medium_value');
          } else {
            emit('low_value');
          }
        """
      },
      {
        # Calculate with parameters
        name = "price_with_tax"
        type = "double"
        script = "emit(doc['price'].value * (1 + params.tax_rate))"
        script_params = {
          tax_rate = 0.13
        }
      }
    ]

    # Include runtime fields in the output
    source = [
      "product_id",
      "quantity",
      "price",
      "order_date",
      "total_amount",
      "day_of_week",
      "order_category",
      "price_with_tax"
    ]

    schema = {
      fields {
        product_id = string
        quantity = int
        price = double
        order_date = timestamp
        total_amount = double
        day_of_week = string
        order_category = string
        price_with_tax = double
      }
    }
  }
}

sink {
  Console {
  }
}
```

Changelog

<ChangeLog />