Back to Seatunnel

GraphQL

docs/en/connectors/sink/GraphQL.md

2.3.136.3 KB
Original Source

import ChangeLog from '../changelog/connector-graphql.md';

GraphQL

GraphQL sink connector

Support Those Engines

Spark

Flink

SeaTunnel Zeta

Key Features

Description

Used to launch web hooks using data.

For example, if the data from upstream is [label: {"__name__": "test1"}, value: 1.2.3,time:2024-08-15T17:00:00], the body content is the following: {"label":{"__name__": "test1"}, "value":"1.23","time":"2024-08-15T17:00:00"}

Tips: GraphQL sink only support post json webhook and the data from source will be treated as body content in web hook.And does not support passing past data

Supported DataSource Info

In order to use the GraphQL connector, the following dependencies are required. They can be downloaded via install-plugin.sh or from the Maven central repository.

DatasourceSupported VersionsDependency
HttpuniversalDownload

Sink Options

NameTypeRequiredDefaultDescription
urlStringYes-Http request url
queryStringYes-GraphQL query
variablesStringNo-GraphQL variables
valueCoverBooleanNo-Whether the data overwrites the variable value
headersMapNo-Http headers
retryIntNo-The max retry times if request http return to IOException
retry_backoff_multiplier_msIntNo100The retry-backoff times(millis) multiplier if request http failed
retry_backoff_max_msIntNo10000The maximum retry-backoff times(millis) if request http failed
connect_timeout_msIntNo12000Connection timeout setting, default 12s.
socket_timeout_msIntNo60000Socket timeout setting, default 60s.
common-optionsNo-Sink plugin common parameters, please refer to Sink Common Options for details

Example

simple:

hocon
env {
  parallelism = 1
  job.mode = "BATCH"
}

source {
  FakeSource {
    tables_configs = [
       {
        schema = {
          table = "graphql_sink_1"
         fields {
                id = int
                val_bool = boolean
                val_int8 = tinyint
                val_int16 = smallint
                val_int32 = int
                val_int64 = bigint
                val_float = float
                val_double = double
                val_decimal = "decimal(16, 1)"
                val_string = string
                val_unixtime_micros = timestamp
      }
        }
            rows = [
              {
                kind = INSERT
                fields = [1, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW", "2020-02-02T02:02:02"]
              }
              ]
       },
       {
       schema = {
         table = "graphql_sink_2"
              fields {
                        id = int
                        val_bool = boolean
                        val_int8 = tinyint
                        val_int16 = smallint
                        val_int32 = int
                        val_int64 = bigint
                        val_float = float
                        val_double = double
                        val_decimal = "decimal(16, 1)"
                        val_string = string
                        val_unixtime_micros = timestamp
              }
       }
           rows = [
             {
               kind = INSERT
               fields = [2, true, 1, 2, 3, 4, 4.3,5.3,6.3, "NEW", "2020-02-02T02:02:02"]
             }
             ]
      }
    ]
  }
}

sink {
   GraphQL {
        url = "http://192.168.1.103:9081/v1/graphql"
        query = """
         mutation MyMutation(
           $id: Int!
           $val_bool: Boolean!
           $val_int8: smallint!
           $val_int16: smallint!
           $val_int32: Int!
           $val_int64: bigint!
           $val_float: Float!
           $val_double: Float!
           $val_decimal: numeric!
           $val_string: String!
           $val_unixtime_micros: timestamp!
         ) {
           insert_sink(objects: {
             id: $id,
             val_bool: $val_bool,
             val_int8: $val_int8,
             val_int16: $val_int16,
             val_int32: $val_int32,
             val_int64: $val_int64,
             val_float: $val_float,
             val_double: $val_double,
             val_decimal: $val_decimal,
             val_string: $val_string,
             val_unixtime_micros: $val_unixtime_micros
           }) {
             affected_rows
             returning {
               id
               val_bool
               val_decimal
               val_double
               val_float
               val_int16
               val_int32
               val_int64
               val_int8
               val_string
               val_unixtime_micros
             }
           }
         }
        """
        variables = {
            "val_bool": True
        }
    }
}

Changelog

<ChangeLog />