
import ChangeLog from '../changelog/connector-hive.md';

Hive

Hive sink connector

Description

Write data to Hive.

:::tip

In order to use this connector, you must ensure that your Spark/Flink cluster is already integrated with Hive. The tested Hive versions are 2.3.9 and 3.1.3.

If you use SeaTunnel Engine, you need to put seatunnel-hadoop3-3.1.4-uber.jar, hive-exec-3.1.3.jar and libfb303-0.9.3.jar in the $SEATUNNEL_HOME/lib/ dir.

:::

Key features

By default, we use 2PC commit to ensure exactly-once semantics.

  • file format
    • text
    • csv
    • parquet
    • orc
    • json
  • compress codec
    • lzo

Options

| name | type | required | default value |
|------|------|----------|---------------|
| table_name | string | yes | - |
| metastore_uri | string | yes | - |
| compress_codec | string | no | none |
| hdfs_site_path | string | no | - |
| hive_site_path | string | no | - |
| hive.hadoop.conf | Map | no | - |
| hive.hadoop.conf-path | string | no | - |
| krb5_path | string | no | /etc/krb5.conf |
| kerberos_principal | string | no | - |
| kerberos_keytab_path | string | no | - |
| abort_drop_partition_metadata | boolean | no | true |
| parquet_avro_write_timestamp_as_int96 | boolean | no | false |
| overwrite | boolean | no | false |
| data_save_mode | enum | no | APPEND_DATA |
| schema_save_mode | enum | no | CREATE_SCHEMA_WHEN_NOT_EXIST |
| save_mode_create_template | string | no | - |
| common-options | | no | - |

table_name [string]

Target Hive table name, e.g. db1.table1. If the source is in multiple-table mode, you can use ${database_name}.${table_name} to generate the table name; the ${database_name} and ${table_name} placeholders will be replaced with the values from the CatalogTable generated by the source.
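
For instance, in multiple-table mode a single sink block can route every upstream table to a Hive table with the same name. A minimal sketch (the metastore host below is a placeholder):

hocon
sink {
  Hive {
    # ${database_name} and ${table_name} are replaced per upstream CatalogTable
    table_name = "${database_name}.${table_name}"
    metastore_uri = "thrift://metastore:9083"
  }
}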

metastore_uri [string]

Hive metastore URI. Supports comma-separated multiple URIs for HA/failover (whitespace is ignored). SeaTunnel passes this value to Hive's hive.metastore.uris and uses Hive RetryingMetaStoreClient (if available) to retry/failover between URIs. This is client-side endpoint failover; make sure your metastores share/replicate the same backend to keep metadata consistent.

hdfs_site_path [string]

The path of hdfs-site.xml, used to load the HA configuration of NameNodes.

hive_site_path [string]

The path of hive-site.xml

hive.hadoop.conf [map]

Properties in Hadoop conf ('core-site.xml', 'hdfs-site.xml', 'hive-site.xml').

hive.hadoop.conf-path [string]

The specified loading path for the 'core-site.xml', 'hdfs-site.xml', 'hive-site.xml' files
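
As an illustration, the two options can be combined: load the base *-site.xml files from a directory and override individual properties inline. This is only a sketch; the bucket and credentials provider are placeholder values borrowed from the S3 example later on this page:

hocon
sink {
  Hive {
    table_name = "test_hive.test_hive_sink_on_s3"
    metastore_uri = "thrift://metastore:9083"
    # directory containing core-site.xml / hdfs-site.xml / hive-site.xml
    hive.hadoop.conf-path = "/home/ec2-user/hadoop-conf"
    # inline overrides merged on top of the files loaded above
    hive.hadoop.conf = {
      bucket = "s3a://mybucket"
      fs.s3a.aws.credentials.provider = "com.amazonaws.auth.InstanceProfileCredentialsProvider"
    }
  }
}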

krb5_path [string]

The path of krb5.conf, used for Kerberos authentication.

The path of hive-site.xml, used to authenticate against the Hive metastore.

kerberos_principal [string]

The Kerberos principal.

kerberos_keytab_path [string]

The Kerberos keytab file path.

abort_drop_partition_metadata [boolean]

Flag to decide whether to drop partition metadata from the Hive Metastore during an abort operation. Note: this only affects the metadata in the metastore; the data in the partition (i.e. the data generated during the synchronization process) is always deleted.

parquet_avro_write_timestamp_as_int96 [boolean]

Support writing Parquet INT96 from a timestamp; only valid for Parquet files.

overwrite [boolean]

Flag to decide whether to use overwrite mode when inserting data into Hive. If set to true, for non-partitioned tables, the existing data in the table will be deleted before inserting new data. For partitioned tables, the data in the relevant partition will be deleted before inserting new data.

  • Batch mode (BATCH): Delete existing data in the target path before commit (for non-partitioned tables, delete the table directory; for partitioned tables, delete the related partition directories), then write new data.
  • Streaming mode (STREAMING): In streaming jobs with checkpointing enabled, commit() is invoked after each completed checkpoint. To avoid deleting on every checkpoint (which would wipe previously committed files), SeaTunnel deletes each target directory (table directory / partition directory) at most once (empty commits will skip deletion). On recovery, the delete step is best-effort and may be skipped to avoid deleting already committed data, so streaming overwrite is not a strict snapshot overwrite.
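
A minimal batch sink sketch with overwrite enabled (source block omitted; the table and host names are placeholders). Existing data in the target table or in the touched partitions is removed before the new data is committed:

hocon
env {
  parallelism = 1
  job.mode = "BATCH"
}

sink {
  Hive {
    table_name = "test_hive.test_hive_sink_overwrite"
    metastore_uri = "thrift://metastore:9083"
    # delete existing table/partition data before commit
    overwrite = true
  }
}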

data_save_mode [enum]

Select how to handle existing data on the target before writing new data.

  • APPEND_DATA (default): Keep existing data and append new records.
  • DROP_DATA: Behaves the same as overwrite=true. Before commit, delete the existing data in the target path (for non-partitioned tables, delete the table directory; for partitioned tables, delete the related partition directories), then write new data.
  • CUSTOM_PROCESSING / ERROR_WHEN_DATA_EXISTS: Currently not recommended for Hive sink unless you have specific requirements.

Note: overwrite=true and data_save_mode=DROP_DATA are equivalent. Use either one; do not set both.

schema_save_mode [enum]

Before the synchronization task starts, choose how to handle an existing table structure on the target side.

Default value: CREATE_SCHEMA_WHEN_NOT_EXIST

Option values:

  • RECREATE_SCHEMA: Will create when the table does not exist, delete and rebuild when the table exists
  • CREATE_SCHEMA_WHEN_NOT_EXIST: Will create when the table does not exist, skip when the table exists
  • ERROR_WHEN_SCHEMA_NOT_EXIST: Error will be reported when the table does not exist
  • IGNORE: Ignore the table and do not process it
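
For example, a hypothetical sink that drops and recreates the target table on every run (use RECREATE_SCHEMA with care, since the existing table is deleted):

hocon
sink {
  Hive {
    table_name = "warehouse.daily_snapshot"
    metastore_uri = "thrift://metastore:9083"
    # drop and rebuild the table if it already exists
    schema_save_mode = "RECREATE_SCHEMA"
  }
}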

save_mode_create_template [string]

We use templates to automatically create Hive tables. The connector generates the CREATE TABLE statement from the upstream data types and schema, and the default template can be modified as needed. Available template variables: ${database}, ${table}, ${rowtype_fields}, ${rowtype_partition_fields}, ${table_location}.

Default value: When not specified, uses a default PARQUET non-partitioned table template:

sql
CREATE TABLE IF NOT EXISTS `${database}`.`${table}` (
  ${rowtype_fields}
)
STORED AS PARQUET
LOCATION '${table_location}'

common options

Sink plugin common parameters, please refer to Sink Common Options for details

Example

hocon

  Hive {
    table_name = "default.seatunnel_orc"
    metastore_uri = "thrift://namenode001:9083"
  }

Metastore URI failover example (multiple URIs):

hocon
  Hive {
    table_name = "default.seatunnel_orc"
    metastore_uri = "thrift://metastore-1:9083,thrift://metastore-2:9083"
  }

Example 1

We have a source table like this:

sql
create table test_hive_source(
     test_tinyint                          TINYINT,
     test_smallint                       SMALLINT,
     test_int                                INT,
     test_bigint                           BIGINT,
     test_boolean                       BOOLEAN,
     test_float                             FLOAT,
     test_double                         DOUBLE,
     test_string                           STRING,
     test_binary                          BINARY,
     test_timestamp                  TIMESTAMP,
     test_decimal                       DECIMAL(8,2),
     test_char                             CHAR(64),
     test_varchar                        VARCHAR(64),
     test_date                             DATE,
     test_array                            ARRAY<INT>,
     test_map                              MAP<STRING, FLOAT>,
     test_struct                           STRUCT<street:STRING, city:STRING, state:STRING, zip:INT>
     )
PARTITIONED BY (test_par1 STRING, test_par2 STRING);

We need to read data from the source table and write it to another table:

sql
create table test_hive_sink_text_simple(
     test_tinyint                          TINYINT,
     test_smallint                       SMALLINT,
     test_int                                INT,
     test_bigint                           BIGINT,
     test_boolean                       BOOLEAN,
     test_float                             FLOAT,
     test_double                         DOUBLE,
     test_string                           STRING,
     test_binary                          BINARY,
     test_timestamp                  TIMESTAMP,
     test_decimal                       DECIMAL(8,2),
     test_char                             CHAR(64),
     test_varchar                        VARCHAR(64),
     test_date                             DATE
     )
PARTITIONED BY (test_par1 STRING, test_par2 STRING);

The job config file can be like this:

hocon
env {
  parallelism = 3
  job.name="test_hive_source_to_hive"
}

source {
  Hive {
    table_name = "test_hive.test_hive_source"
    metastore_uri = "thrift://ctyun7:9083"
  }
}

sink {
  Hive {
    table_name = "test_hive.test_hive_sink_text_simple"
    metastore_uri = "thrift://ctyun7:9083"
    hive.hadoop.conf = {
      bucket = "s3a://mybucket"
      fs.s3a.aws.credentials.provider="com.amazonaws.auth.InstanceProfileCredentialsProvider"
    }
  }
}

Example 2: Kerberos

hocon
sink {
  Hive {
    table_name = "default.test_hive_sink_on_hdfs_with_kerberos"
    metastore_uri = "thrift://metastore:9083"
    hive_site_path = "/tmp/hive-site.xml"
    kerberos_principal = "hive/[email protected]"
    kerberos_keytab_path = "/tmp/hive.keytab"
    krb5_path = "/tmp/krb5.conf"
  }
}

Description:

  • hive_site_path: The path to the hive-site.xml file.
  • kerberos_principal: The principal for Kerberos authentication.
  • kerberos_keytab_path: The keytab file path for Kerberos authentication.
  • krb5_path: The path to the krb5.conf file used for Kerberos authentication.

Run the case:

hocon
env {
  parallelism = 1
  job.mode = "BATCH"
}

source {
  FakeSource {
    schema = {
      fields {
        pk_id = bigint
        name = string
        score = int
      }
      primaryKey {
        name = "pk_id"
        columnNames = [pk_id]
      }
    }
    rows = [
      {
        kind = INSERT
        fields = [1, "A", 100]
      },
      {
        kind = INSERT
        fields = [2, "B", 100]
      },
      {
        kind = INSERT
        fields = [3, "C", 100]
      }
    ]
  }
}

sink {
  Hive {
    table_name = "default.test_hive_sink_on_hdfs_with_kerberos"
    metastore_uri = "thrift://metastore:9083"
    hive_site_path = "/tmp/hive-site.xml"
    kerberos_principal = "hive/[email protected]"
    kerberos_keytab_path = "/tmp/hive.keytab"
    krb5_path = "/tmp/krb5.conf"
  }
}

Hive on S3

Step 1

Create the lib dir for Hive on EMR.

shell
mkdir -p ${SEATUNNEL_HOME}/plugins/Hive/lib

Step 2

Download the jars from Maven Central into the lib dir.

shell
cd ${SEATUNNEL_HOME}/plugins/Hive/lib
wget https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/2.6.5/hadoop-aws-2.6.5.jar
wget https://repo1.maven.org/maven2/org/apache/hive/hive-exec/2.3.9/hive-exec-2.3.9.jar

Step 3

Copy the jars from your EMR environment into the lib dir.

shell
cp /usr/share/aws/emr/emrfs/lib/emrfs-hadoop-assembly-2.60.0.jar ${SEATUNNEL_HOME}/plugins/Hive/lib
cp /usr/share/aws/emr/hadoop-state-pusher/lib/hadoop-common-3.3.6-amzn-1.jar ${SEATUNNEL_HOME}/plugins/Hive/lib
cp /usr/share/aws/emr/hadoop-state-pusher/lib/javax.inject-1.jar ${SEATUNNEL_HOME}/plugins/Hive/lib
cp /usr/share/aws/emr/hadoop-state-pusher/lib/aopalliance-1.0.jar ${SEATUNNEL_HOME}/plugins/Hive/lib

Step 4

Run the case.

hocon
env {
  parallelism = 1
  job.mode = "BATCH"
}

source {
  FakeSource {
    schema = {
      fields {
        pk_id = bigint
        name = string
        score = int
      }
      primaryKey {
        name = "pk_id"
        columnNames = [pk_id]
      }
    }
    rows = [
      {
        kind = INSERT
        fields = [1, "A", 100]
      },
      {
        kind = INSERT
        fields = [2, "B", 100]
      },
      {
        kind = INSERT
        fields = [3, "C", 100]
      }
    ]
  }
}

sink {
  Hive {
    table_name = "test_hive.test_hive_sink_on_s3"
    metastore_uri = "thrift://ip-192-168-0-202.cn-north-1.compute.internal:9083"
    hive.hadoop.conf-path = "/home/ec2-user/hadoop-conf"
    hive.hadoop.conf = {
       bucket="s3://ws-package"
       fs.s3a.aws.credentials.provider="com.amazonaws.auth.InstanceProfileCredentialsProvider"
    }
  }
}

Hive on OSS

Step 1

Create the lib dir for Hive on EMR.

shell
mkdir -p ${SEATUNNEL_HOME}/plugins/Hive/lib

Step 2

Download the jars from Maven Central into the lib dir.

shell
cd ${SEATUNNEL_HOME}/plugins/Hive/lib
wget https://repo1.maven.org/maven2/org/apache/hive/hive-exec/2.3.9/hive-exec-2.3.9.jar

Step 3

Copy the jars from your EMR environment into the lib dir and delete the conflicting jar.

shell
cp -r /opt/apps/JINDOSDK/jindosdk-current/lib/jindo-*.jar ${SEATUNNEL_HOME}/plugins/Hive/lib
rm -f ${SEATUNNEL_HOME}/lib/hadoop-aliyun-*.jar

Step 4

Run the case.

hocon
env {
  parallelism = 1
  job.mode = "BATCH"
}

source {
  FakeSource {
    schema = {
      fields {
        pk_id = bigint
        name = string
        score = int
      }
      primaryKey {
        name = "pk_id"
        columnNames = [pk_id]
      }
    }
    rows = [
      {
        kind = INSERT
        fields = [1, "A", 100]
      },
      {
        kind = INSERT
        fields = [2, "B", 100]
      },
      {
        kind = INSERT
        fields = [3, "C", 100]
      }
    ]
  }
}

sink {
  Hive {
    table_name = "test_hive.test_hive_sink_on_oss"
    metastore_uri = "thrift://master-1-1.c-1009b01725b501f2.cn-wulanchabu.emr.aliyuncs.com:9083"
    hive.hadoop.conf-path = "/tmp/hadoop"
    hive.hadoop.conf = {
        bucket="oss://emr-osshdfs.cn-wulanchabu.oss-dls.aliyuncs.com"
    }
  }
}

Example 3: Multiple Tables

We have multiple source tables like this:

sql
create table test_1(
)
PARTITIONED BY (xx);

create table test_2(
)
PARTITIONED BY (xx);
...

We need to read data from these source tables and write it to other tables:

The job config file can be like this:

hocon
env {
  # You can set flink configuration here
  parallelism = 3
  job.name="test_hive_source_to_hive"
}

source {
  Hive {
    tables_configs = [
      {
        table_name = "test_hive.test_1"
        metastore_uri = "thrift://ctyun6:9083"
      },
      {
        table_name = "test_hive.test_2"
        metastore_uri = "thrift://ctyun7:9083"
      }
    ]
  }
}

sink {
  Hive {
    table_name = "${database_name}.${table_name}"
    metastore_uri = "thrift://ctyun7:9083"
  }
}

Auto Table Creation Examples

Example 1: Basic Auto Table Creation

hocon
env {
  parallelism = 1
  job.mode = "BATCH"
}

source {
  FakeSource {
    schema = {
      fields {
        id = bigint
        name = string
        department = string
        salary = decimal(10,2)
        hire_date = date
      }
    }
    rows = [
      {
        kind = INSERT
        fields = [1, "John Doe", "Engineering", 75000.50, "2022-01-15"]
      }
    ]
  }
}

sink {
  Hive {
    table_name = "warehouse.employees"
    metastore_uri = "thrift://metastore:9083"
    schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST"
    save_mode_create_template = """
      CREATE TABLE IF NOT EXISTS `${database}`.`${table}` (
        ${rowtype_fields}
      )
      PARTITIONED BY (
        department string COMMENT 'Department partition'
      )
      STORED AS PARQUET
      LOCATION '${table_location}'
      TBLPROPERTIES (
        'seatunnel.creation.mode' = 'template'
      )
    """
  }
}

Changelog

<ChangeLog />