hudi-examples/hudi-examples-dbt/README.md
hudi_examples_dbtThis dbt project transforms demonstrates hudi integration with dbt, it has a few models to demonstrate the different ways in which you can create hudi datasets using dbt.
This directory serves as a self-contained playground dbt project, useful for testing out scripts, and communicating some of the core dbt concepts.
Switch working directory and have python3 installed.
cd hudi-examples/hudi-examples-dbt
Create python virtual environment (Reference).
python3 -m venv dbt-env
source dbt-env/bin/activate
We are using thrift as the connection method (Reference).
python3 -m pip install "dbt-spark[PyHive]"
Set up a profile called spark to connect to a spark cluster via thrift server (Reference).
spark:
target: dev
outputs:
dev:
type: spark
method: thrift
schema: hudi_examples_dbt
host: localhost
port: 10000
server_side_parameters:
"spark.driver.memory": "3g"
If you have access to a data warehouse, you can use those credentials – we recommend setting your target schema to be a new schema (dbt will create the schema for you, as long as you have the right privileges). If you don't have access to an existing data warehouse, you can also setup a local postgres database and connect to it in your profile.
NOTE Using these versions
- Spark 3.2.3 (with Derby 10.14.2.0)
- Hudi 0.14.0
Start a local Derby server
export DERBY_VERSION=10.14.2.0
wget https://archive.apache.org/dist/db/derby/db-derby-$DERBY_VERSION/db-derby-$DERBY_VERSION-bin.tar.gz -P /opt/
tar -xf /opt/db-derby-$DERBY_VERSION-bin.tar.gz -C /opt/
export DERBY_HOME=/opt/db-derby-$DERBY_VERSION-bin
$DERBY_HOME/bin/startNetworkServer -h 0.0.0.0
Start a local Thrift server for Spark
export SPARK_VERSION=3.2.3
wget https://archive.apache.org/dist/spark/spark-$SPARK_VERSION/spark-$SPARK_VERSION-bin-hadoop2.7.tgz -P /opt/
tar -xf /opt/spark-$SPARK_VERSION-bin-hadoop2.7.tgz -C /opt/
export SPARK_HOME=/opt/spark-$SPARK_VERSION-bin-hadoop2.7
# install dependencies
cp $DERBY_HOME/lib/{derby,derbyclient}.jar $SPARK_HOME/jars/
wget https://repository.apache.org/content/repositories/releases/org/apache/hudi/hudi-spark3.2-bundle_2.12/0.14.0/hudi-spark3.2-bundle_2.12-0.14.0.jar -P $SPARK_HOME/jars/
# start Thrift server connecting to Derby as HMS backend
$SPARK_HOME/sbin/start-thriftserver.sh \
--conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
--conf spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension \
--conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog \
--conf spark.sql.warehouse.dir=/tmp/hudi/hive/warehouse \
--hiveconf hive.metastore.warehouse.dir=/tmp/hudi/hive/warehouse \
--hiveconf hive.metastore.schema.verification=false \
--hiveconf datanucleus.schema.autoCreateAll=true \
--hiveconf javax.jdo.option.ConnectionDriverName=org.apache.derby.jdbc.ClientDriver \
--hiveconf 'javax.jdo.option.ConnectionURL=jdbc:derby://localhost:1527/default;create=true'
dbt debug
Output of the above command should show this text at the end of the output:
All checks passed!
exampledbt run -m example
05:47:28 Running with dbt=1.0.0
05:47:28 Found 5 models, 10 tests, 0 snapshots, 0 analyses, 0 macros, 0 operations, 0 seed files, 0 sources, 0 exposures, 0 metrics
05:47:28
05:47:29 Concurrency: 1 threads (target='local')
05:47:29
05:47:29 1 of 5 START incremental model analytics.hudi_insert_table...................... [RUN]
05:47:31 1 of 5 OK created incremental model analytics.hudi_insert_table................. [OK in 2.61s]
05:47:31 2 of 5 START incremental model analytics.hudi_insert_overwrite_table............ [RUN]
05:47:34 2 of 5 OK created incremental model analytics.hudi_insert_overwrite_table....... [OK in 3.19s]
05:47:34 3 of 5 START incremental model analytics.hudi_upsert_table...................... [RUN]
05:47:37 3 of 5 OK created incremental model analytics.hudi_upsert_table................. [OK in 2.68s]
05:47:37 4 of 5 START incremental model analytics.hudi_upsert_partitioned_cow_table...... [RUN]
05:47:40 4 of 5 OK created incremental model analytics.hudi_upsert_partitioned_cow_table. [OK in 2.60s]
05:47:40 5 of 5 START incremental model analytics.hudi_upsert_partitioned_mor_table...... [RUN]
05:47:42 5 of 5 OK created incremental model analytics.hudi_upsert_partitioned_mor_table. [OK in 2.53s]
05:47:42
05:47:42 Finished running 5 incremental models in 14.70s.
05:47:42
05:47:42 Completed successfully
exampledbt test -m example
05:48:17 Running with dbt=1.0.0
05:48:17 Found 5 models, 10 tests, 0 snapshots, 0 analyses, 0 macros, 0 operations, 0 seed files, 0 sources, 0 exposures, 0 metrics
05:48:17
05:48:19 Concurrency: 1 threads (target='local')
05:48:19
05:48:19 1 of 10 START test not_null_hudi_insert_overwrite_table_id...................... [RUN]
05:48:19 1 of 10 PASS not_null_hudi_insert_overwrite_table_id............................ [PASS in 0.50s]
05:48:19 2 of 10 START test not_null_hudi_insert_overwrite_table_name.................... [RUN]
05:48:20 2 of 10 PASS not_null_hudi_insert_overwrite_table_name.......................... [PASS in 0.45s]
05:48:20 3 of 10 START test not_null_hudi_insert_overwrite_table_ts...................... [RUN]
05:48:20 3 of 10 PASS not_null_hudi_insert_overwrite_table_ts............................ [PASS in 0.47s]
05:48:20 4 of 10 START test not_null_hudi_insert_table_id................................ [RUN]
05:48:20 4 of 10 PASS not_null_hudi_insert_table_id...................................... [PASS in 0.44s]
05:48:20 5 of 10 START test not_null_hudi_upsert_table_id................................ [RUN]
05:48:21 5 of 10 PASS not_null_hudi_upsert_table_id...................................... [PASS in 0.38s]
05:48:21 6 of 10 START test not_null_hudi_upsert_table_name.............................. [RUN]
05:48:21 6 of 10 PASS not_null_hudi_upsert_table_name.................................... [PASS in 0.40s]
05:48:21 7 of 10 START test not_null_hudi_upsert_table_ts................................ [RUN]
05:48:22 7 of 10 PASS not_null_hudi_upsert_table_ts...................................... [PASS in 0.38s]
05:48:22 8 of 10 START test unique_hudi_insert_overwrite_table_id........................ [RUN]
05:48:23 8 of 10 PASS unique_hudi_insert_overwrite_table_id.............................. [PASS in 1.32s]
05:48:23 9 of 10 START test unique_hudi_insert_table_id.................................. [RUN]
05:48:24 9 of 10 PASS unique_hudi_insert_table_id........................................ [PASS in 1.26s]
05:48:24 10 of 10 START test unique_hudi_upsert_table_id................................. [RUN]
05:48:25 10 of 10 PASS unique_hudi_upsert_table_id....................................... [PASS in 1.29s]
05:48:26
05:48:26 Finished running 10 tests in 8.23s.
05:48:26
05:48:26 Completed successfully
05:48:26
05:48:26 Done. PASS=10 WARN=0 ERROR=0 SKIP=0 TOTAL=10
example_cdcBootstrap the raw table raw_updates and profiles.
dbt run -m example_cdc.raw_updates -m example_cdc.profiles
Launch a spark-sql shell to interact with the tables created by example_cdc.
spark-sql \
--packages org.apache.hudi:hudi-spark3.2-bundle_2.12:0.14.0 \
--conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
--conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog \
--conf spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension \
--conf spark.sql.warehouse.dir=/tmp/hudi/hive/warehouse \
--conf spark.hadoop.hive.metastore.warehouse.dir=/tmp/hudi/hive/warehouse \
--conf spark.hadoop.hive.metastore.schema.verification=false \
--conf spark.hadoop.datanucleus.schema.autoCreateAll=true \
--conf spark.hadoop.javax.jdo.option.ConnectionDriverName=org.apache.derby.jdbc.ClientDriver \
--conf 'spark.hadoop.javax.jdo.option.ConnectionURL=jdbc:derby://localhost:1527/default;create=true' \
--conf 'spark.hadoop.hive.cli.print.header=true'
Insert sample records.
use hudi_examples_dbt;
insert into raw_updates values ('101', 'D', UNIX_TIMESTAMP());
insert into raw_updates values ('102', 'E', UNIX_TIMESTAMP());
insert into raw_updates values ('103', 'F', UNIX_TIMESTAMP());
Process the updates and write new date to profiles.
dbt run -m example_cdc.profiles
spark-sql> refresh table profiles;
spark-sql> select _hoodie_commit_time, user_id, city, updated_at from profiles order by updated_at;
_hoodie_commit_time user_id city updated_at
20231128013722030 101 D 1701157027
20231128013722030 102 E 1701157031
20231128013722030 103 F 1701157035
Time taken: 0.219 seconds, Fetched 3 row(s)
Extract changed data from profiles to profile_changes.
dbt run -m example_cdc.profile_changes
spark-sql> refresh table profile_changes;
spark-sql> select user_id, old_city, new_city from profile_changes order by process_ts;
user_id old_city new_city
101 Nil A
102 Nil B
103 Nil C
101 A D
102 B E
103 C F
Time taken: 0.129 seconds, Fetched 6 row(s)
dbt docs generate
dbt docs serve
# then visit http://127.0.0.1:8080/#!/overview
For more information on dbt: