docs/en/connectors/sink/Hbase.md

import ChangeLog from '../changelog/connector-hbase.md';

Hbase

Hbase sink connector

Description

Output data to Hbase

Key features

Options

| name | type | required | default value |
|------|------|----------|---------------|
| zookeeper_quorum | string | yes | - |
| table | string | yes | - |
| rowkey_column | list | yes | - |
| family_name | config | yes | - |
| rowkey_delimiter | string | no | "" |
| version_column | string | no | - |
| null_mode | string | no | skip |
| wal_write | boolean | yes | false |
| write_buffer_size | int | no | 8 * 1024 * 1024 |
| encoding | string | no | utf8 |
| hbase_extra_config | config | no | - |
| common-options | | no | - |
| ttl | long | no | - |

zookeeper_quorum [string]

The ZooKeeper quorum of the HBase cluster, for example: "hadoop001:2181,hadoop002:2181,hadoop003:2181"

table [string]

The table name you want to write to, for example: "seatunnel". If your table is under a custom namespace, use namespace:table (for example, ns1:seatunnel_test); if the namespace is omitted, SeaTunnel writes to HBase's default namespace (default).
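As a sketch, a sink writing to a table in a custom namespace might look like this (the namespace `ns1` and table name are illustrative, and the namespace must already exist in HBase):

```hocon
Hbase {
  zookeeper_quorum = "hadoop001:2181,hadoop002:2181,hadoop003:2181"
  # "ns1" is a custom namespace; create it first, e.g. create_namespace 'ns1' in the hbase shell
  table = "ns1:seatunnel_test"
  rowkey_column = ["id"]
  family_name {
    all_columns = "info"
  }
}
```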

rowkey_column [list]

The column name list of row keys, example: ["id", "uuid"]

family_name [config]

The family name mapping of fields. For example, given an upstream row like the following:

| id | name | age |
|----|------|-----|
| 1 | tyrantlucifer | 27 |

with id as the row key and the other fields written to different column families, you can assign:

```hocon
family_name {
  name = "info1"
  age = "info2"
}
```

This means that name will be written to the family info1 and age will be written to the family info2.

If you want all other fields written to the same family, you can assign:

```hocon
family_name {
  all_columns = "info"
}
```

This means that all fields will be written to the family info.

rowkey_delimiter [string]

The delimiter used to join multiple row key columns, default ""
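A minimal sketch combining rowkey_column and rowkey_delimiter to build a composite row key (the column names here are illustrative):

```hocon
Hbase {
  zookeeper_quorum = "hadoop001:2181,hadoop002:2181,hadoop003:2181"
  table = "seatunnel_test"
  # The row key becomes "<id>_<uuid>", e.g. "1_a1b2c3"
  rowkey_column = ["id", "uuid"]
  rowkey_delimiter = "_"
  family_name {
    all_columns = "info"
  }
}
```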

version_column [string]

The version column name. You can use it to assign a custom timestamp to each HBase record.
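For example, if the upstream rows carry an event-time column (here assumed to be a BIGINT field named `time` holding epoch milliseconds), it can be used as the cell timestamp:

```hocon
Hbase {
  zookeeper_quorum = "hadoop001:2181,hadoop002:2181,hadoop003:2181"
  table = "seatunnel_test"
  rowkey_column = ["name"]
  # Use the upstream "time" field (epoch milliseconds) as the HBase cell timestamp
  version_column = "time"
  family_name {
    all_columns = "info"
  }
}
```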

null_mode [string]

The mode of writing null value, support [skip, empty], default skip

  • skip: When the field is null, the connector will not write this field to hbase
  • empty: When the field is null, the connector will write an empty value for this field
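A minimal sketch setting null_mode so that null fields are written as empty values instead of being skipped:

```hocon
Hbase {
  zookeeper_quorum = "hadoop001:2181,hadoop002:2181,hadoop003:2181"
  table = "seatunnel_test"
  rowkey_column = ["id"]
  # Write empty values for null fields instead of omitting the cell
  null_mode = "empty"
  family_name {
    all_columns = "info"
  }
}
```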

wal_write [boolean]

The WAL (write-ahead log) write flag, default false

write_buffer_size [int]

The write buffer size of the hbase client, in bytes; default 8 * 1024 * 1024

encoding [string]

The encoding used for STRING/DECIMAL/DATE/TIME/TIMESTAMP/ARRAY fields; supported values are [utf8, gbk], default utf8

Data types

Hbase stores bytes. The connector supports:

  • TINYINT/SMALLINT/INT/BIGINT/FLOAT/DOUBLE/BOOLEAN/BYTES
  • STRING/DECIMAL/DATE/TIME/TIMESTAMP/ARRAY (serialized as strings using encoding)

hbase_extra_config [config]

Extra configuration passed through to the HBase client

ttl [long]

The TTL for the data written to HBase, in milliseconds. By default, the TTL configured on the table is used.
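A sketch overriding the table-level TTL so records written by this sink expire after one day (86400000 milliseconds):

```hocon
Hbase {
  zookeeper_quorum = "hadoop001:2181,hadoop002:2181,hadoop003:2181"
  table = "seatunnel_test"
  rowkey_column = ["id"]
  # Expire written cells after 1 day (in milliseconds), overriding the table TTL
  ttl = 86400000
  family_name {
    all_columns = "info"
  }
}
```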

common options

Sink plugin common parameters; please refer to Sink Common Options for details

Example

```hocon
Hbase {
  zookeeper_quorum = "hadoop001:2181,hadoop002:2181,hadoop003:2181"
  table = "seatunnel_test"
  rowkey_column = ["name"]
  family_name {
    all_columns = "seatunnel"
  }
}
```

Kerberos Example

Note:

  • connector-hbase does not parse krb5_path, kerberos_principal, or kerberos_keytab_path.
  • Prepare Kerberos credentials and krb5.conf in the runtime environment (for example, kinit -kt ... or JVM -Djava.security.krb5.conf=...), and put HBase/Hadoop security settings into hbase_extra_config.
```hocon
sink {
  Hbase {
    zookeeper_quorum = "zk1:2181,zk2:2181,zk3:2181"
    table = "target_table"
    rowkey_column = ["rowkey"]
    family_name {
      all_columns = "info"
    }

    # HBase security config
    hbase_extra_config = {
      "hbase.security.authentication" = "kerberos"
      "hadoop.security.authentication" = "kerberos"
      "hbase.master.kerberos.principal" = "hbase/_HOST@REALM"
      "hbase.regionserver.kerberos.principal" = "hbase/_HOST@REALM"
      "hbase.rpc.protection" = "authentication"
      "hbase.zookeeper.useSasl" = "false"
    }
  }
}
```

Multiple Table

```hocon
env {
  # You can set engine configuration here
  execution.parallelism = 1
  job.mode = "BATCH"
}

source {
  FakeSource {
    tables_configs = [
      {
        schema = {
          table = "hbase_sink_1"
          fields {
            name = STRING
            c_string = STRING
            c_double = DOUBLE
            c_bigint = BIGINT
            c_float = FLOAT
            c_int = INT
            c_smallint = SMALLINT
            c_boolean = BOOLEAN
            time = BIGINT
          }
        }
        rows = [
          {
            kind = INSERT
            fields = ["label_1", "sink_1", 4.3, 200, 2.5, 2, 5, true, 1627529632356]
          }
        ]
      },
      {
        schema = {
          table = "hbase_sink_2"
          fields {
            name = STRING
            c_string = STRING
            c_double = DOUBLE
            c_bigint = BIGINT
            c_float = FLOAT
            c_int = INT
            c_smallint = SMALLINT
            c_boolean = BOOLEAN
            time = BIGINT
          }
        }
        rows = [
          {
            kind = INSERT
            fields = ["label_2", "sink_2", 4.3, 200, 2.5, 2, 5, true, 1627529632357]
          }
        ]
      }
    ]
  }
}

sink {
  Hbase {
    zookeeper_quorum = "hadoop001:2181,hadoop002:2181,hadoop003:2181"
    table = "${table_name}"
    rowkey_column = ["name"]
    family_name {
      all_columns = "info"
    }
  }
}
```

Writes To The Specified Column Family

```hocon
Hbase {
  zookeeper_quorum = "hbase_e2e:2181"
  table = "assign_cf_table"
  rowkey_column = ["id"]
  family_name {
    c_double = "cf1"
    c_bigint = "cf2"
  }
}
```

Changelog

<ChangeLog />