docs/en/connectors/sink/Hive.md
import ChangeLog from '../changelog/connector-hive.md';
Hive sink connector
Write data to Hive.
:::tip
In order to use this connector, you must ensure your Spark/Flink cluster is already integrated with Hive. The tested Hive versions are 2.3.9 and 3.1.3.
If you use the SeaTunnel Engine, you need to put seatunnel-hadoop3-3.1.4-uber.jar, hive-exec-3.1.3.jar, and libfb303-0.9.3.jar in the $SEATUNNEL_HOME/lib/ directory.
:::
By default, we use 2PC (two-phase commit) to ensure exactly-once semantics.
| name | type | required | default value |
|---|---|---|---|
| table_name | string | yes | - |
| metastore_uri | string | yes | - |
| compress_codec | string | no | none |
| hdfs_site_path | string | no | - |
| hive_site_path | string | no | - |
| hive.hadoop.conf | Map | no | - |
| hive.hadoop.conf-path | string | no | - |
| krb5_path | string | no | /etc/krb5.conf |
| kerberos_principal | string | no | - |
| kerberos_keytab_path | string | no | - |
| abort_drop_partition_metadata | boolean | no | true |
| parquet_avro_write_timestamp_as_int96 | boolean | no | false |
| overwrite | boolean | no | false |
| data_save_mode | enum | no | APPEND_DATA |
| schema_save_mode | enum | no | CREATE_SCHEMA_WHEN_NOT_EXIST |
| save_mode_create_template | string | no | - |
| common-options | | no | - |
Target Hive table name, e.g. db1.table1. If the source is in multi-table mode, you can use ${database_name}.${table_name} to generate the table name; the ${database_name} and ${table_name} placeholders are replaced with the values from the CatalogTable generated by the source.
Hive metastore URI. Supports comma-separated multiple URIs for HA/failover (whitespace is ignored). SeaTunnel passes this value to Hive's hive.metastore.uris and uses Hive's RetryingMetaStoreClient (if available) to retry and fail over between URIs. This is client-side endpoint failover; make sure your metastores share or replicate the same backend to keep metadata consistent.
The path of hdfs-site.xml, used to load the HA configuration of the NameNodes.
The path of hive-site.xml
Properties in hadoop conf('core-site.xml', 'hdfs-site.xml', 'hive-site.xml')
The specified loading path for the 'core-site.xml', 'hdfs-site.xml', 'hive-site.xml' files
The path of krb5.conf, used for Kerberos authentication.
The path of hive-site.xml, used to authenticate with the Hive metastore.
The Kerberos principal.
The Kerberos keytab file path.
Flag to decide whether to drop partition metadata from the Hive Metastore during an abort operation. Note: this only affects the metadata in the metastore; the data in the partition (generated during the synchronization process) will always be deleted.
Support writing timestamps as Parquet INT96; only valid for Parquet files.
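For example, if downstream readers (such as older Spark or Impala versions) expect the legacy INT96 timestamp encoding, the flag can be enabled on a Parquet-backed table. A minimal sketch; the table name and metastore address are hypothetical:

```hocon
sink {
  Hive {
    table_name = "default.parquet_ts_table"    # hypothetical table
    metastore_uri = "thrift://metastore:9083"  # hypothetical metastore
    # write TIMESTAMP columns as Parquet INT96 for legacy readers
    parquet_avro_write_timestamp_as_int96 = true
  }
}
```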
Flag to decide whether to use overwrite mode when inserting data into Hive. If set to true, for non-partitioned tables, the existing data in the table will be deleted before inserting new data. For partitioned tables, the data in the relevant partition will be deleted before inserting new data.
commit() is invoked after each completed checkpoint. To avoid deleting on every checkpoint (which would wipe previously committed files), SeaTunnel deletes each target directory (table directory / partition directory) at most once; empty commits skip deletion. On recovery, the delete step is best-effort and may be skipped to avoid deleting already committed data, so streaming overwrite is not a strict snapshot overwrite.
Select how to handle existing data on the target before writing new data.
Note: overwrite=true and data_save_mode=DROP_DATA are equivalent. Use either one; do not set both.
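For instance, a batch job that replaces existing data before each load could enable overwrite mode. A minimal sketch, with hypothetical table and metastore names:

```hocon
sink {
  Hive {
    table_name = "default.daily_snapshot"      # hypothetical table
    metastore_uri = "thrift://metastore:9083"  # hypothetical metastore
    # equivalent to data_save_mode = "DROP_DATA"; set only one of the two
    overwrite = true
  }
}
```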
Before starting the synchronization task, choose how to handle an existing table structure on the target side.
Default value: CREATE_SCHEMA_WHEN_NOT_EXIST
Option values:
RECREATE_SCHEMA: Create the table when it does not exist; drop and recreate it when it exists.
CREATE_SCHEMA_WHEN_NOT_EXIST: Create the table when it does not exist; skip when it exists.
ERROR_WHEN_SCHEMA_NOT_EXIST: Report an error when the table does not exist.
IGNORE: Ignore the table.
We use templates to automatically create Hive tables. A corresponding CREATE TABLE statement is generated based on the type and schema of the upstream data, and the default template can be modified as needed. Available template variables: ${database}, ${table}, ${rowtype_fields}, ${rowtype_partition_fields}, ${table_location}.
Default value: when not specified, a default PARQUET non-partitioned table template is used:
CREATE TABLE IF NOT EXISTS `${database}`.`${table}` (
${rowtype_fields}
)
STORED AS PARQUET
LOCATION '${table_location}'
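To illustrate, for a hypothetical upstream table warehouse.my_table with columns id (BIGINT) and name (STRING), the default template above would render roughly to the following DDL (the exact type mapping and table location depend on your catalog configuration):

```sql
CREATE TABLE IF NOT EXISTS `warehouse`.`my_table` (
  `id` BIGINT,
  `name` STRING
)
STORED AS PARQUET
LOCATION '/user/hive/warehouse/my_table'
```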
Sink plugin common parameters; please refer to Sink Common Options for details.
Hive {
table_name = "default.seatunnel_orc"
metastore_uri = "thrift://namenode001:9083"
}
Metastore URI failover example (multiple URIs):
Hive {
table_name = "default.seatunnel_orc"
metastore_uri = "thrift://metastore-1:9083,thrift://metastore-2:9083"
}
We have a source table like this:
create table test_hive_source(
test_tinyint TINYINT,
test_smallint SMALLINT,
test_int INT,
test_bigint BIGINT,
test_boolean BOOLEAN,
test_float FLOAT,
test_double DOUBLE,
test_string STRING,
test_binary BINARY,
test_timestamp TIMESTAMP,
test_decimal DECIMAL(8,2),
test_char CHAR(64),
test_varchar VARCHAR(64),
test_date DATE,
test_array ARRAY<INT>,
test_map MAP<STRING, FLOAT>,
test_struct STRUCT<street:STRING, city:STRING, state:STRING, zip:INT>
)
PARTITIONED BY (test_par1 STRING, test_par2 STRING);
We need to read data from the source table and write it to another table:
create table test_hive_sink_text_simple(
test_tinyint TINYINT,
test_smallint SMALLINT,
test_int INT,
test_bigint BIGINT,
test_boolean BOOLEAN,
test_float FLOAT,
test_double DOUBLE,
test_string STRING,
test_binary BINARY,
test_timestamp TIMESTAMP,
test_decimal DECIMAL(8,2),
test_char CHAR(64),
test_varchar VARCHAR(64),
test_date DATE
)
PARTITIONED BY (test_par1 STRING, test_par2 STRING);
The job config file can be like this:
env {
parallelism = 3
job.name="test_hive_source_to_hive"
}
source {
Hive {
table_name = "test_hive.test_hive_source"
metastore_uri = "thrift://ctyun7:9083"
}
}
sink {
# write data to the target Hive table
Hive {
table_name = "test_hive.test_hive_sink_text_simple"
metastore_uri = "thrift://ctyun7:9083"
hive.hadoop.conf = {
bucket = "s3a://mybucket"
fs.s3a.aws.credentials.provider="com.amazonaws.auth.InstanceProfileCredentialsProvider"
}
}
}
sink {
Hive {
table_name = "default.test_hive_sink_on_hdfs_with_kerberos"
metastore_uri = "thrift://metastore:9083"
hive_site_path = "/tmp/hive-site.xml"
kerberos_principal = "hive/[email protected]"
kerberos_keytab_path = "/tmp/hive.keytab"
krb5_path = "/tmp/krb5.conf"
}
}
Description:
hive_site_path: The path to the hive-site.xml file.
kerberos_principal: The principal for Kerberos authentication.
kerberos_keytab_path: The keytab file path for Kerberos authentication.
krb5_path: The path to the krb5.conf file used for Kerberos authentication.
Run the case:
env {
parallelism = 1
job.mode = "BATCH"
}
source {
FakeSource {
schema = {
fields {
pk_id = bigint
name = string
score = int
}
primaryKey {
name = "pk_id"
columnNames = [pk_id]
}
}
rows = [
{
kind = INSERT
fields = [1, "A", 100]
},
{
kind = INSERT
fields = [2, "B", 100]
},
{
kind = INSERT
fields = [3, "C", 100]
}
]
}
}
sink {
Hive {
table_name = "default.test_hive_sink_on_hdfs_with_kerberos"
metastore_uri = "thrift://metastore:9083"
hive_site_path = "/tmp/hive-site.xml"
kerberos_principal = "hive/[email protected]"
kerberos_keytab_path = "/tmp/hive.keytab"
krb5_path = "/tmp/krb5.conf"
}
}
Create the lib dir for Hive on EMR.
mkdir -p ${SEATUNNEL_HOME}/plugins/Hive/lib
Download the jars from Maven Central into the lib dir.
cd ${SEATUNNEL_HOME}/plugins/Hive/lib
wget https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/2.6.5/hadoop-aws-2.6.5.jar
wget https://repo1.maven.org/maven2/org/apache/hive/hive-exec/2.3.9/hive-exec-2.3.9.jar
Copy the jars from your environment on EMR to the lib dir.
cp /usr/share/aws/emr/emrfs/lib/emrfs-hadoop-assembly-2.60.0.jar ${SEATUNNEL_HOME}/plugins/Hive/lib
cp /usr/share/aws/emr/hadoop-state-pusher/lib/hadoop-common-3.3.6-amzn-1.jar ${SEATUNNEL_HOME}/plugins/Hive/lib
cp /usr/share/aws/emr/hadoop-state-pusher/lib/javax.inject-1.jar ${SEATUNNEL_HOME}/plugins/Hive/lib
cp /usr/share/aws/emr/hadoop-state-pusher/lib/aopalliance-1.0.jar ${SEATUNNEL_HOME}/plugins/Hive/lib
Run the case.
env {
parallelism = 1
job.mode = "BATCH"
}
source {
FakeSource {
schema = {
fields {
pk_id = bigint
name = string
score = int
}
primaryKey {
name = "pk_id"
columnNames = [pk_id]
}
}
rows = [
{
kind = INSERT
fields = [1, "A", 100]
},
{
kind = INSERT
fields = [2, "B", 100]
},
{
kind = INSERT
fields = [3, "C", 100]
}
]
}
}
sink {
Hive {
table_name = "test_hive.test_hive_sink_on_s3"
metastore_uri = "thrift://ip-192-168-0-202.cn-north-1.compute.internal:9083"
hive.hadoop.conf-path = "/home/ec2-user/hadoop-conf"
hive.hadoop.conf = {
bucket="s3://ws-package"
fs.s3a.aws.credentials.provider="com.amazonaws.auth.InstanceProfileCredentialsProvider"
}
}
}
Create the lib dir for Hive on EMR.
mkdir -p ${SEATUNNEL_HOME}/plugins/Hive/lib
Download the jars from Maven Central into the lib dir.
cd ${SEATUNNEL_HOME}/plugins/Hive/lib
wget https://repo1.maven.org/maven2/org/apache/hive/hive-exec/2.3.9/hive-exec-2.3.9.jar
Copy the jars from your environment on EMR to the lib dir and delete the conflicting jar.
cp -r /opt/apps/JINDOSDK/jindosdk-current/lib/jindo-*.jar ${SEATUNNEL_HOME}/plugins/Hive/lib
rm -f ${SEATUNNEL_HOME}/lib/hadoop-aliyun-*.jar
Run the case.
env {
parallelism = 1
job.mode = "BATCH"
}
source {
FakeSource {
schema = {
fields {
pk_id = bigint
name = string
score = int
}
primaryKey {
name = "pk_id"
columnNames = [pk_id]
}
}
rows = [
{
kind = INSERT
fields = [1, "A", 100]
},
{
kind = INSERT
fields = [2, "B", 100]
},
{
kind = INSERT
fields = [3, "C", 100]
}
]
}
}
sink {
Hive {
table_name = "test_hive.test_hive_sink_on_oss"
metastore_uri = "thrift://master-1-1.c-1009b01725b501f2.cn-wulanchabu.emr.aliyuncs.com:9083"
hive.hadoop.conf-path = "/tmp/hadoop"
hive.hadoop.conf = {
bucket="oss://emr-osshdfs.cn-wulanchabu.oss-dls.aliyuncs.com"
}
}
}
We have multiple source tables like this:
create table test_1(
)
PARTITIONED BY (xx);
create table test_2(
)
PARTITIONED BY (xx);
...
We need to read data from these source tables and write them to other tables:
The job config file can be like this:
env {
# You can set flink configuration here
parallelism = 3
job.name="test_hive_source_to_hive"
}
source {
Hive {
tables_configs = [
{
table_name = "test_hive.test_1"
metastore_uri = "thrift://ctyun6:9083"
},
{
table_name = "test_hive.test_2"
metastore_uri = "thrift://ctyun7:9083"
}
]
}
}
sink {
# write data to the target Hive tables
Hive {
table_name = "${database_name}.${table_name}"
metastore_uri = "thrift://ctyun7:9083"
}
}
env {
parallelism = 1
job.mode = "BATCH"
}
source {
FakeSource {
schema = {
fields {
id = bigint
name = string
department = string
salary = decimal(10,2)
hire_date = date
}
}
rows = [
{
kind = INSERT
fields = [1, "John Doe", "Engineering", 75000.50, "2022-01-15"]
}
]
}
}
sink {
Hive {
table_name = "warehouse.employees"
metastore_uri = "thrift://metastore:9083"
schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST"
save_mode_create_template = """
CREATE TABLE IF NOT EXISTS `${database}`.`${table}` (
${rowtype_fields}
)
PARTITIONED BY (
department string COMMENT 'Department partition'
)
STORED AS PARQUET
LOCATION '${table_location}'
TBLPROPERTIES (
'seatunnel.creation.mode' = 'template'
)
"""
}
}