:::note
This is our legacy Spark integration, which has been replaced by DataHub Cloud Spark Lineage.
:::
To integrate Spark with DataHub, we provide a lightweight Java agent that listens for Spark application and job events and pushes metadata to DataHub in real time. The agent listens to events such as application start/end and SQLExecution start/end to create pipelines (i.e. DataFlow) and tasks (i.e. DataJob) in DataHub, along with lineage to the datasets that are read from and written to. Read on to learn how to configure it for different Spark scenarios.
The Spark agent can be configured using a config file or while creating a Spark session. If you are using Spark on Databricks, refer to the Databricks configuration instructions below.
Versioning of the jar artifact follows the semantic versioning of the main DataHub repo, and release notes are published with the DataHub releases. Always check the Maven Central repository for the latest released version.
When running jobs using spark-submit, the agent needs to be configured in the config file.
```text
#Configuring DataHub spark agent jar
spark.jars.packages                          io.acryl:datahub-spark-lineage:0.8.23
spark.extraListeners                         datahub.spark.DatahubSparkListener
spark.datahub.rest.server                    http://localhost:8080
```
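If you would rather not edit the config file, `spark-submit` also accepts any Spark property on the command line as `--conf key=value`. The sketch below is a hypothetical helper (not shipped with the agent) that turns the same three properties into a `spark-submit` argument list; `my_job.py` is a placeholder for your own application.

```python
import shlex
from typing import Mapping

# Same properties as the config file above; 0.8.23 is the version used in this README.
DATAHUB_PROPS = {
    "spark.jars.packages": "io.acryl:datahub-spark-lineage:0.8.23",
    "spark.extraListeners": "datahub.spark.DatahubSparkListener",
    "spark.datahub.rest.server": "http://localhost:8080",
}


def spark_submit_args(app: str, props: Mapping[str, str]) -> list[str]:
    """Build a spark-submit argv that passes each property as --conf key=value."""
    args = ["spark-submit"]
    for key, value in props.items():
        args += ["--conf", f"{key}={value}"]
    # The application file comes after all spark-submit options.
    args.append(app)
    return args


if __name__ == "__main__":
    # Print a shell-safe command line; "my_job.py" is a placeholder.
    print(shlex.join(spark_submit_args("my_job.py", DATAHUB_PROPS)))
```

The returned list can be passed directly to `subprocess.run(...)`, which avoids shell-quoting issues with values such as URLs.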
Alternatively, set the following spark-defaults configuration properties:
```text
spark.jars.packages                          io.acryl:datahub-spark-lineage:0.8.23
spark.extraListeners                         datahub.spark.DatahubSparkListener
spark.datahub.rest.server                    https://your_datahub_host/gms
# If authentication is enabled, you also need to specify the DataHub access token
spark.datahub.rest.token                     yourtoken
```
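Keeping `spark.datahub.rest.token` verbatim in a config file that lives in source control exposes the token. One option is to generate the spark-defaults entries at deploy time and read the token from the environment. The sketch below assumes the token is provided in an environment variable named `DATAHUB_TOKEN` (a hypothetical name chosen for illustration) and simply omits the token line when it is not set.

```python
import os
from typing import Mapping, Optional

# Placeholder server URL, as in the example above.
BASE_PROPS = {
    "spark.jars.packages": "io.acryl:datahub-spark-lineage:0.8.23",
    "spark.extraListeners": "datahub.spark.DatahubSparkListener",
    "spark.datahub.rest.server": "https://your_datahub_host/gms",
}


def spark_defaults_lines(
    props: Mapping[str, str],
    token_env: str = "DATAHUB_TOKEN",  # hypothetical variable name
    environ: Optional[Mapping[str, str]] = None,
) -> list[str]:
    """Render whitespace-separated 'key value' lines, adding the token only if set."""
    environ = os.environ if environ is None else environ
    merged = dict(props)
    token = environ.get(token_env)
    if token:
        merged["spark.datahub.rest.token"] = token
    # Pad keys so the values line up, as in the spark-defaults examples above.
    width = max(len(key) for key in merged)
    return [f"{key.ljust(width)} {value}" for key, value in merged.items()]


if __name__ == "__main__":
    print("\n".join(spark_defaults_lines(BASE_PROPS)))
```

On Databricks, the secret-reference approach described in the Databricks instructions serves the same purpose.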
When running interactive jobs from a notebook, the listener can be configured while building the Spark Session.
```python
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .master("spark://spark-master:7077") \
    .appName("test-application") \
    .config("spark.jars.packages", "io.acryl:datahub-spark-lineage:0.8.23") \
    .config("spark.extraListeners", "datahub.spark.DatahubSparkListener") \
    .config("spark.datahub.rest.server", "http://localhost:8080") \
    .enableHiveSupport() \
    .getOrCreate()
```
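If several notebooks share the same DataHub settings, the properties can live in one dictionary and be applied in a loop, because `SparkSession.builder.config(key, value)` returns the builder. The helper `with_datahub_conf` is hypothetical; it assumes `pyspark` is installed when used with a real session, but it only relies on the builder's `config` method.

```python
from typing import Any, Mapping

DATAHUB_PROPS = {
    "spark.jars.packages": "io.acryl:datahub-spark-lineage:0.8.23",
    "spark.extraListeners": "datahub.spark.DatahubSparkListener",
    "spark.datahub.rest.server": "http://localhost:8080",
}


def with_datahub_conf(builder: Any, props: Mapping[str, str] = DATAHUB_PROPS) -> Any:
    """Apply each DataHub property to a SparkSession builder and return the builder.

    pyspark's Builder.config(key, value) returns the builder, so the result
    can be chained with further calls such as enableHiveSupport().
    """
    for key, value in props.items():
        builder = builder.config(key, value)
    return builder
```

In a notebook this would be used as `spark = with_datahub_conf(SparkSession.builder.appName("test-application")).enableHiveSupport().getOrCreate()`.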
The configuration for standalone Java apps is very similar.
```java
import org.apache.spark.sql.SparkSession;

SparkSession spark = SparkSession.builder()
        .appName("test-application")
        .config("spark.master", "spark://spark-master:7077")
        .config("spark.jars.packages", "io.acryl:datahub-spark-lineage:0.8.23")
        .config("spark.extraListeners", "datahub.spark.DatahubSparkListener")
        .config("spark.datahub.rest.server", "http://localhost:8080")
        .enableHiveSupport()
        .getOrCreate();
```
The Spark agent can be configured using Databricks Cluster Spark configuration and Init script.
Databricks Secrets can be leveraged to store sensitive information like tokens.
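- Download the `datahub-spark-lineage` jar from the Maven Central repository.
- Create `init.sh` with the following content:

  ```sh
  #!/bin/bash
  # Copy the DataHub lineage jar from DBFS into the cluster's jar directory
  cp /dbfs/datahub/datahub-spark-lineage*.jar /databricks/jars
  ```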
- Install and configure the Databricks CLI.
- Copy the jar and the init script to the Databricks File System (DBFS) using the Databricks CLI:

  ```sh
  databricks fs mkdirs dbfs:/datahub
  databricks fs cp --overwrite datahub-spark-lineage*.jar dbfs:/datahub
  databricks fs cp --overwrite init.sh dbfs:/datahub
  ```
- Open the Databricks cluster configuration page, click the Advanced Options toggle, then click the Spark tab. Add the following under Spark Config:

  ```text
  spark.extraListeners                datahub.spark.DatahubSparkListener
  spark.datahub.rest.server           http://localhost:8080
  spark.datahub.databricks.cluster    cluster-name<any preferred cluster identifier>
  ```

- Click the Init Scripts tab and set the cluster init script to `dbfs:/datahub/init.sh`.
- Configure the DataHub authentication token:

  - Add the following to the cluster Spark config:

    ```text
    spark.datahub.rest.token <token>
    ```

  - Alternatively, use Databricks secrets to keep the token secure. Create the secret with the Databricks CLI:

    ```sh
    databricks secrets create-scope --scope datahub --initial-manage-principal users
    # Opens an editor: enter the token value, then save and close
    databricks secrets put --scope datahub --key rest-token
    # Confirm that the rest-token key is listed
    databricks secrets list --scope datahub
    ```

    Then reference the secret in the Spark config:

    ```text
    spark.datahub.rest.token {{secrets/datahub/rest-token}}
    ```
| Field | Required | Default | Description |
|---|---|---|---|
| spark.jars.packages | ✅ | | Set to the latest or required version, e.g. `io.acryl:datahub-spark-lineage:0.8.23` |
| spark.extraListeners | ✅ | | `datahub.spark.DatahubSparkListener` |
| spark.datahub.rest.server | ✅ | | DataHub server URL, e.g. `http://localhost:8080` |
| spark.datahub.rest.token | | | Authentication token. |
| spark.datahub.rest.disable_ssl_verification | | false | Disable SSL certificate validation. Caution: only use this if you know what you are doing! |
| spark.datahub.metadata.pipeline.platformInstance | | | Pipeline-level platform instance. |
| spark.datahub.metadata.dataset.platformInstance | | | Dataset-level platform instance. |
| spark.datahub.metadata.dataset.env | | PROD | Environment for dataset URNs; must be one of DataHub's supported FabricType values. Any other value falls back to PROD. |
| spark.datahub.metadata.table.hive_platform_alias | | hive | By default, DataHub assigns Hive-like tables to the Hive platform. If you use Glue as your Hive metastore, set this to `glue`. |
| spark.datahub.metadata.include_scheme | | true | Include the scheme from the path URI (e.g. `hdfs://`, `s3://`) in the dataset URN. We recommend setting this to false; it defaults to true for backwards compatibility with previous versions. |
| spark.datahub.metadata.remove_partition_pattern | | | Remove a partition pattern (e.g. `/partition=\d+`) from dataset paths; for example, `database/table/partition=123` becomes `database/table`. |
| spark.datahub.coalesce_jobs | | false | Emit only one DataJob (task) for the Spark application, containing all of its input and output datasets. |
| spark.datahub.parent.datajob_urn | | | The specified URN is set as the upstream of the created DataJob. Effective only when `spark.datahub.coalesce_jobs` is true. |
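The table implies a few checks that are easy to get wrong by hand: the required keys, the defaults that apply when a key is omitted, and the dependency of `parent.datajob_urn` on `coalesce_jobs`. The sketch below is a hypothetical pre-flight check, not part of the agent; the agent's own parsing may differ, and `strip_partitions` only approximates what `remove_partition_pattern` does by deleting every regex match.

```python
import re
import warnings

# Required keys and defaults, copied from the configuration table above.
# On Databricks the jar is installed by the init script, so you may want to
# drop spark.jars.packages from REQUIRED there.
REQUIRED = ("spark.jars.packages", "spark.extraListeners", "spark.datahub.rest.server")
DEFAULTS = {
    "spark.datahub.rest.disable_ssl_verification": "false",
    "spark.datahub.metadata.dataset.env": "PROD",
    "spark.datahub.metadata.table.hive_platform_alias": "hive",
    "spark.datahub.metadata.include_scheme": "true",
    "spark.datahub.coalesce_jobs": "false",
}


def resolve_datahub_conf(conf: dict[str, str]) -> dict[str, str]:
    """Fail on missing required keys, then overlay user settings on the defaults."""
    missing = [key for key in REQUIRED if not conf.get(key)]
    if missing:
        raise ValueError(f"missing required DataHub properties: {missing}")
    resolved = {**DEFAULTS, **conf}
    # parent.datajob_urn is only honored when coalesce_jobs is true.
    if ("spark.datahub.parent.datajob_urn" in resolved
            and resolved["spark.datahub.coalesce_jobs"].lower() != "true"):
        warnings.warn("spark.datahub.parent.datajob_urn is ignored unless "
                      "spark.datahub.coalesce_jobs is true")
    return resolved


def strip_partitions(path: str, pattern: str) -> str:
    """Rough illustration of remove_partition_pattern: delete every regex match."""
    return re.sub(pattern, "", path)
```

For the table's own example, `strip_partitions("database/table/partition=123", r"/partition=\d+")` returns `"database/table"`.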
At the time of writing, the Spark agent produces metadata for the Spark job, its tasks, and lineage edges to datasets.
For Spark on Databricks,
The following custom properties in pipelines and tasks relate to the Spark UI:
For Spark on Databricks, pipeline start time is the cluster start time.
The primary version tested is Spark 2.4.8 with Scala 2.11. The library has also been tested with Spark versions 2.2.0 to 2.4.8 and Scala versions 2.10 to 2.12. For the Spark 3.x series, it has been tested with Spark 3.1.2 and 3.2.0 with Scala 2.12. Other combinations are not guaranteed to work. Support for other Spark versions is planned.
This initial release has been tested with the following environments:
Databricks Standard and High Concurrency clusters have not been tested yet.
Below is a list of Spark commands that are parsed currently:
Effectively, these support data sources/sinks corresponding to Hive, HDFS, JDBC, and Delta Lake.
The DataFrame.persist command is supported for the following LeafExecNodes:
On Spark context startup:

```text
YY/MM/DD HH:mm:ss INFO DatahubSparkListener: DatahubSparkListener initialised.
YY/MM/DD HH:mm:ss INFO SparkContext: Registered listener datahub.spark.DatahubSparkListener
```
On application start:

```text
YY/MM/DD HH:mm:ss INFO DatahubSparkListener: Application started: SparkListenerApplicationStart(AppName,Some(local-1644489736794),1644489735772,user,None,None)
YY/MM/DD HH:mm:ss INFO McpEmitter: REST Emitter Configuration: GMS url <rest.server>
YY/MM/DD HH:mm:ss INFO McpEmitter: REST Emitter Configuration: Token XXXXX
```
On pushing data to the server:

```text
YY/MM/DD HH:mm:ss INFO McpEmitter: MetadataWriteResponse(success=true, responseContent={"value":"<URN>"}, underlyingResponse=HTTP/1.1 200 OK [Date: day, DD month year HH:mm:ss GMT, Content-Type: application/json, X-RestLi-Protocol-Version: 2.0.0, Content-Length: 97, Server: Jetty(9.4.46.v20220331)] [Content-Length: 97,Chunked: false])
```
On application end:

```text
YY/MM/DD HH:mm:ss INFO DatahubSparkListener: Application ended : AppName AppID
```
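To check a run quickly, you can scan the driver log for the messages shown above: whether the listener registered, how many writes succeeded, and whether the application-end event was seen. The sketch below assumes the log is available as plain text lines (for example, a downloaded driver log) and that failed writes are logged with `success=false`; the function name and the summary fields are illustrative.

```python
import re
from typing import Iterable

REGISTERED = "Registered listener datahub.spark.DatahubSparkListener"
ENDED = "DatahubSparkListener: Application ended"
# Matches the start of the McpEmitter write-response line and captures the flag.
WRITE_RESULT = re.compile(r"MetadataWriteResponse\(success=(true|false)")


def summarize_datahub_log(lines: Iterable[str]) -> dict:
    """Count listener registration, write outcomes, and application end in log lines."""
    summary = {"registered": False, "writes_ok": 0, "writes_failed": 0, "ended": False}
    for line in lines:
        if REGISTERED in line:
            summary["registered"] = True
        match = WRITE_RESULT.search(line)
        if match:
            key = "writes_ok" if match.group(1) == "true" else "writes_failed"
            summary[key] += 1
        if ENDED in line:
            summary["ended"] = True
    return summary
```

Usage: `with open("driver.log", encoding="utf-8") as log: print(summarize_datahub_log(log))`, where `driver.log` is a placeholder path. A run with `registered` false usually means `spark.extraListeners` was not picked up.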
To enable debug logs, add the following to your log4j.properties file:

```properties
log4j.logger.datahub.spark=DEBUG
log4j.logger.datahub.client.rest=DEBUG
```