To integrate Spark with DataHub, we provide a lightweight Java agent that listens for Spark application and job events and pushes metadata out to DataHub in real time. The agent listens to events such as application start/end and SQLExecution start/end to create pipelines (i.e. DataFlows) and tasks (i.e. DataJobs) in DataHub, along with lineage to the datasets being read from and written to. Read on to learn how to configure this for different Spark scenarios.
The Spark agent can be configured using a config file or while creating a Spark Session. If you are using Spark on Databricks, refer to Configuration Instructions for Databricks.
Versioning of the jar artifact follows the semantic versioning of the main DataHub repo, and release notes are available here. Always check the Maven central repository for the latest released version.
Note: Starting from version 0.2.18, we provide separate jars for different Scala versions:
- io.acryl:acryl-spark-lineage_2.12:0.2.18
- io.acryl:acryl-spark-lineage_2.13:0.2.18

When running jobs using spark-submit, the agent needs to be configured in the config file.
```text
#Configuring DataHub spark agent jar
spark.jars.packages io.acryl:acryl-spark-lineage_2.12:0.2.18
spark.extraListeners datahub.spark.DatahubSparkListener
spark.datahub.rest.server http://localhost:8080
```
For Scala 2.13:
```text
#Configuring DataHub spark agent jar for Scala 2.13
spark.jars.packages io.acryl:acryl-spark-lineage_2.13:0.2.18
spark.extraListeners datahub.spark.DatahubSparkListener
spark.datahub.rest.server http://localhost:8080
```
```sh
spark-submit --packages io.acryl:acryl-spark-lineage_2.12:0.2.18 --conf "spark.extraListeners=datahub.spark.DatahubSparkListener" my_spark_job_to_run.py
```
For Scala 2.13:
```sh
spark-submit --packages io.acryl:acryl-spark-lineage_2.13:0.2.18 --conf "spark.extraListeners=datahub.spark.DatahubSparkListener" my_spark_job_to_run.py
```
Set the following spark-defaults configuration properties as described here:
```text
spark.jars.packages io.acryl:acryl-spark-lineage_2.12:0.2.18
spark.extraListeners datahub.spark.DatahubSparkListener
spark.datahub.rest.server https://your_datahub_host/gms
#If you have authentication set up, you also need to specify the DataHub access token
spark.datahub.rest.token yourtoken
```
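If you would rather not keep the token in plain text in spark-defaults.conf, the same properties can be assembled in code before building the session. A minimal sketch, assuming the token is exposed in a `DATAHUB_GMS_TOKEN` environment variable (a name chosen for this example; the agent itself only reads the `spark.datahub.*` properties, not the environment):

```python
import os

# Sketch: assemble the agent settings in code instead of spark-defaults.conf.
# DATAHUB_GMS_TOKEN is a hypothetical variable name, not one the agent reads.
datahub_conf = {
    "spark.jars.packages": "io.acryl:acryl-spark-lineage_2.12:0.2.18",
    "spark.extraListeners": "datahub.spark.DatahubSparkListener",
    "spark.datahub.rest.server": "https://your_datahub_host/gms",
    "spark.datahub.rest.token": os.environ.get("DATAHUB_GMS_TOKEN", ""),
}

# These pairs can then be applied to a SparkSession builder:
# for key, value in datahub_conf.items():
#     builder = builder.config(key, value)
```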
When running interactive jobs from a notebook, the listener can be configured while building the Spark Session.
```python
spark = SparkSession.builder \
    .master("spark://spark-master:7077") \
    .appName("test-application") \
    .config("spark.jars.packages", "io.acryl:acryl-spark-lineage_2.12:0.2.18") \
    .config("spark.extraListeners", "datahub.spark.DatahubSparkListener") \
    .config("spark.datahub.rest.server", "http://localhost:8080") \
    .enableHiveSupport() \
    .getOrCreate()
```
The configuration for standalone Java apps is very similar.
```java
spark = SparkSession.builder()
    .appName("test-application")
    .config("spark.master", "spark://spark-master:7077")
    .config("spark.jars.packages", "io.acryl:acryl-spark-lineage_2.12:0.2.18")
    .config("spark.extraListeners", "datahub.spark.DatahubSparkListener")
    .config("spark.datahub.rest.server", "http://localhost:8080")
    .enableHiveSupport()
    .getOrCreate();
```
The Spark agent can be configured using Databricks Cluster Spark configuration and Init script.
Databricks Secrets can be leveraged to store sensitive information like tokens.
Download the datahub-spark-lineage jar from the Maven central repository.
Create init.sh with the content below:

```sh
#!/bin/bash
cp /dbfs/datahub/datahub-spark-lineage*.jar /databricks/jars
```
Install and configure Databricks CLI.
Copy the jar and the init script to the Databricks File System (DBFS) using the Databricks CLI.

```sh
databricks fs mkdirs dbfs:/datahub
databricks fs cp --overwrite datahub-spark-lineage*.jar dbfs:/datahub
databricks fs cp --overwrite init.sh dbfs:/datahub
```
Open the Databricks cluster configuration page. Click the Advanced Options toggle, then the Spark tab, and add the configurations below under Spark Config.

```text
spark.extraListeners datahub.spark.DatahubSparkListener
spark.datahub.rest.server http://localhost:8080
spark.datahub.stage_metadata_coalescing true
spark.datahub.databricks.cluster cluster-name <any preferred cluster identifier>
```
Click the Init Scripts tab. Set cluster init script as dbfs:/datahub/init.sh.
Configuring DataHub authentication token
Add the config below to the cluster Spark config.

```text
spark.datahub.rest.token <token>
```
Alternatively, Databricks secrets can be used to store the token securely. Create the secret using the Databricks CLI:

```sh
databricks secrets create-scope --scope datahub --initial-manage-principal users
databricks secrets put --scope datahub --key rest-token # edit the prompted file with the token value
databricks secrets list --scope datahub
```
Add in the Spark config:

```text
spark.datahub.rest.token {{secrets/datahub/rest-token}}
```
| Field | Required | Default | Description |
|---|---|---|---|
| spark.jars.packages | ✅ | | Set with the latest/required version io.acryl:acryl-spark-lineage_2.12:0.2.18 (or io.acryl:acryl-spark-lineage_2.13:0.2.18 for Scala 2.13) |
| spark.extraListeners | ✅ | | datahub.spark.DatahubSparkListener |
| spark.datahub.emitter | | rest | Specify the way to emit metadata. By default it sends to DataHub using the REST emitter. Valid options are rest, kafka or file |
| spark.datahub.rest.server | | http://localhost:8080 | DataHub server URL, e.g. http://localhost:8080 |
| spark.datahub.rest.token | | | Authentication token |
| spark.datahub.rest.disable_ssl_verification | | false | Disable SSL certificate validation. Caution: only use this if you know what you are doing! |
| spark.datahub.rest.disable_chunked_encoding | | false | Disable chunked transfer encoding. In some environments chunked encoding causes issues; this option disables it |
| spark.datahub.rest.max_retries | | 0 | Number of times a failed request is retried |
| spark.datahub.rest.retry_interval | | 10 | Number of seconds to wait between retries |
| spark.datahub.file.filename | | | The file where metadata will be written if the file emitter is set |
| spark.datahub.kafka.bootstrap | | | The Kafka bootstrap server URL to use if the Kafka emitter is set |
| spark.datahub.kafka.schema_registry_url | | | The schema registry URL to use if the Kafka emitter is set |
| spark.datahub.kafka.schema_registry_config. | | | Additional config to pass to the schema registry client |
| spark.datahub.kafka.producer_config. | | | Additional config to pass to the Kafka producer. For example: --conf "spark.datahub.kafka.producer_config.client.id=my_client_id" |
| spark.datahub.metadata.pipeline.platformInstance | | | Pipeline-level platform instance |
| spark.datahub.metadata.dataset.platformInstance | | | Dataset-level platform instance (useful when you need to match dataset URNs with those created by other ingestion sources) |
| spark.datahub.metadata.dataset.env | | PROD | Environment used in dataset URNs; unsupported values fall back to PROD |
| spark.datahub.metadata.dataset.hivePlatformAlias | | hive | By default, DataHub assigns Hive-like tables to the Hive platform. If you are using Glue as your Hive metastore, set this config flag to glue |
| spark.datahub.metadata.include_scheme | | true | Include the scheme from the path URI (e.g. hdfs://, s3://) in the dataset URN. We recommend setting this value to false; it defaults to true for backwards compatibility with previous versions |
| spark.datahub.metadata.remove_partition_pattern | | | Remove partition pattern (e.g. /partition=\d+). Changes database/table/partition=123 to database/table |
| spark.datahub.coalesce_jobs | | true | Only one DataJob (task) will be emitted, containing all input and output datasets for the Spark application |
| spark.datahub.parent.datajob_urn | | | The specified dataset will be set as the upstream dataset for the created DataJob. Effective only when spark.datahub.coalesce_jobs is set to true |
| spark.datahub.metadata.dataset.materialize | | false | Materialize datasets in DataHub |
| spark.datahub.platform.s3.path_spec_list | | | List of path specs per platform |
| spark.datahub.metadata.dataset.include_schema_metadata | | false | Emit dataset schema metadata based on the Spark execution. It is recommended to get schema information from platform-specific DataHub sources, as this is less reliable |
| spark.datahub.flow_name | | | If set, it will be used as the DataFlow name; otherwise the Spark app name is used |
| spark.datahub.file_partition_regexp | | | Strip the partition part from the path if the end of the path matches the specified regexp. Example: year=.*/month=.*/day=.* |
| spark.datahub.tags | | | Comma-separated list of tags to attach to the DataFlow |
| spark.datahub.domains | | | Comma-separated list of domain URNs to attach to the DataFlow |
| spark.datahub.stage_metadata_coalescing | | false | Normally metadata is coalesced and sent at the onApplicationEnd event, which is never called on Databricks or on Glue. Enable this on Databricks if you want coalesced runs |
| spark.datahub.patch.enabled | | false | Set this to true to send lineage as a patch, which appends rather than overwrites existing dataset lineage edges. Disabled by default |
| spark.datahub.metadata.dataset.lowerCaseUrns | | false | Set this to true to lowercase dataset URNs. Disabled by default |
| spark.datahub.disableSymlinkResolution | | false | Set this to true if you prefer using the S3 location instead of the Hive table. Disabled by default |
| spark.datahub.s3.bucket | | | The bucket where metadata will be written if the s3 emitter is set |
| spark.datahub.s3.prefix | | | The prefix for the metadata file on S3 if the s3 emitter is set |
| spark.datahub.s3.filename | | | The name of the metadata file on S3 if the s3 emitter is set; if not set, a random filename is used |
| spark.datahub.log.mcps | | true | Log MCPs to the application log. Enabled by default |
| spark.datahub.legacyLineageCleanup.enabled | | false | Set this to true to remove legacy lineage from older Spark plugin runs; it removes that lineage from the datasets it adds to the DataJob. Disabled by default |
| spark.datahub.captureColumnLevelLineage | | true | Set this to false to disable column-level lineage capture for improved performance on large datasets |
| spark.datahub.capture_spark_plan | | false | Set this to true to capture the Spark plan. Disabled by default |
| spark.datahub.metadata.dataset.enableEnhancedMergeIntoExtraction | | false | Set this to true to enable enhanced table name extraction for Delta Lake MERGE INTO commands, which improves lineage tracking by including the target table name in the job name. Disabled by default |
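For example, to inspect locally what the agent emits, you could combine a few of the options above and switch to the file emitter (the filename below is illustrative):

```text
spark.datahub.emitter file
spark.datahub.file.filename /tmp/datahub_lineage.json
spark.datahub.coalesce_jobs true
```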
At the time of writing, the Spark agent produces metadata about the Spark job, its tasks, and lineage edges to datasets.

The following custom properties in pipelines and tasks relate to the Spark UI. For Spark on Databricks, the pipeline start time is the cluster start time.
The Spark agent captures fine-grained lineage information, including column-level lineage with transformation types. When available, OpenLineage's transformation types are captured and mapped to DataHub's FinegrainedLineage TransformOption, providing detailed insights into how data transformations occur at the column level.
Supports Spark 3.x series.
This initial release has been tested with the following environments:
Testing with Databricks Standard and High-concurrency clusters has not yet been done.
Spark emits lineage between datasets and has its own logic for generating URNs. Python sources emit dataset metadata. To link the two, the URNs generated by both have to match. This section will help you match URNs with those of other ingestion sources.

By default, URNs are created using the template urn:li:dataset:(urn:li:dataPlatform:<platform>,<platformInstance>.<name>,<env>). We can configure these four components to generate the desired URN.
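As an illustration, the template can be sketched as a small helper (hypothetical code, not the agent's actual implementation):

```python
def dataset_urn(platform, name, env="PROD", platform_instance=None):
    """Build a dataset URN from the four configurable components.

    Mirrors the template above: when a platform instance is set,
    it is prefixed to the dataset name."""
    qualified = f"{platform_instance}.{name}" if platform_instance else name
    return f"urn:li:dataset:(urn:li:dataPlatform:{platform},{qualified},{env})"

print(dataset_urn("s3", "my-bucket/foo/tests"))
# -> urn:li:dataset:(urn:li:dataPlatform:s3,my-bucket/foo/tests,PROD)
```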
Platform: HDFS-based platforms supported explicitly:
Name:
By default, the name is the complete path. For HDFS-based datasets, tables can sit at a different level of the path than the actual file being read, for reasons such as partitioning and sharding; 'path_spec' is used to alter the name.

The {table} marker specifies the table level. Below are a few examples. You can specify multiple path_specs for different paths in the path_spec_list; each actual path is matched against all path_specs in the list, and the first one to match is used to generate the URN.
path_spec Examples

```text
spark.datahub.platform.s3.path_spec_list=s3://my-bucket/foo/{table}/year=*/month=*/day=*/*,s3://my-other-bucket/foo/{table}/year=*/month=*/day=*/*
```
| Absolute path | path_spec | Urn |
|---|---|---|
| s3://my-bucket/foo/tests/bar.avro | Not provided | urn:li:dataset:(urn:li:dataPlatform:s3,my-bucket/foo/tests/bar.avro,PROD) |
| s3://my-bucket/foo/tests/bar.avro | s3://my-bucket/foo/{table}/* | urn:li:dataset:(urn:li:dataPlatform:s3,my-bucket/foo/tests,PROD) |
| s3://my-bucket/foo/tests/bar.avro | s3://my-bucket/foo/tests/{table} | urn:li:dataset:(urn:li:dataPlatform:s3,my-bucket/foo/tests/bar.avro,PROD) |
| gs://my-bucket/foo/tests/bar.avro | gs://my-bucket/{table}/*/* | urn:li:dataset:(urn:li:dataPlatform:gcs,my-bucket/foo,PROD) |
| gs://my-bucket/foo/tests/bar.avro | gs://my-bucket/{table} | urn:li:dataset:(urn:li:dataPlatform:gcs,my-bucket/foo,PROD) |
| file:///my-bucket/foo/tests/bar.avro | file:///my-bucket/*/*/{table} | urn:li:dataset:(urn:li:dataPlatform:local,my-bucket/foo/tests/bar.avro,PROD) |
platform instance and env:

The default value for env is 'PROD' and the default platform instance is None. env and platform instance can be set for all datasets using the configurations 'spark.datahub.metadata.dataset.env' and 'spark.datahub.metadata.dataset.platformInstance'. If Spark is processing data that belongs to a different env or platform instance, 'path_alias' can be used to specify path_spec-specific values for them; a 'path_alias' groups a 'path_spec_list' together with its env and platform instance.
path_alias_list Example:
The example below covers the case where files from two buckets are processed in a single Spark application: files from my-bucket should have "instance-1" as the platform instance and "PROD" as the env, while files from bucket2 should have the env "DEV" in their dataset URNs.

```text
spark.datahub.platform.s3.path_alias_list: path1,path2
spark.datahub.platform.s3.path1.env: PROD
spark.datahub.platform.s3.path1.path_spec_list: s3://my-bucket/*/*/{table}
spark.datahub.platform.s3.path1.platform_instance: instance-1
spark.datahub.platform.s3.path2.env: DEV
spark.datahub.platform.s3.path2.path_spec_list: s3://bucket2/*/{table}
```
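The alias resolution described above can be sketched in Python. This is hypothetical code mirroring the example configuration, not the agent's implementation, and fnmatch's '*' (which can cross '/' boundaries) is looser than real path_spec matching:

```python
import fnmatch

# Mirrors the path_alias configuration above (path1/path2).
ALIAS_CONFIG = {
    "path1": {"env": "PROD", "platform_instance": "instance-1",
              "path_spec_list": ["s3://my-bucket/*/*/{table}"]},
    "path2": {"env": "DEV", "platform_instance": None,
              "path_spec_list": ["s3://bucket2/*/{table}"]},
}

def resolve_alias(path):
    """Return (env, platform_instance) for the first alias whose
    path_spec matches; fall back to the global defaults."""
    for cfg in ALIAS_CONFIG.values():
        for spec in cfg["path_spec_list"]:
            # Treat {table} as a wildcard segment for matching purposes.
            if fnmatch.fnmatch(path, spec.replace("{table}", "*")):
                return cfg["env"], cfg["platform_instance"]
    return "PROD", None  # global defaults: env PROD, no platform instance

print(resolve_alias("s3://bucket2/foo/tests"))
# -> ('DEV', None)
```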
When working with Delta Lake MERGE INTO commands, the default behavior creates generic job names based on the internal Spark task names. To improve lineage tracking, you can enable the enhanced table name extraction feature:
spark.datahub.metadata.dataset.enableEnhancedMergeIntoExtraction=true
When enabled, the agent appends the target table name to the job name. For example, a job named execute_merge_into_command_edge becomes execute_merge_into_command_edge.database_table_name, making it clear which table was being modified.
On Spark context startup:

```text
YY/MM/DD HH:mm:ss INFO DatahubSparkListener: DatahubSparkListener initialised.
YY/MM/DD HH:mm:ss INFO SparkContext: Registered listener datahub.spark.DatahubSparkListener
```
On application start:

```text
YY/MM/DD HH:mm:ss INFO DatahubSparkListener: Application started: SparkListenerApplicationStart(AppName,Some(local-1644489736794),1644489735772,user,None,None)
YY/MM/DD HH:mm:ss INFO McpEmitter: REST Emitter Configuration: GMS url <rest.server>
YY/MM/DD HH:mm:ss INFO McpEmitter: REST Emitter Configuration: Token XXXXX
```
On pushing data to the server:

```text
YY/MM/DD HH:mm:ss INFO McpEmitter: MetadataWriteResponse(success=true, responseContent={"value":"<URN>"}, underlyingResponse=HTTP/1.1 200 OK [Date: day, DD month year HH:mm:ss GMT, Content-Type: application/json, X-RestLi-Protocol-Version: 2.0.0, Content-Length: 97, Server: Jetty(9.4.46.v20220331)] [Content-Length: 97,Chunked: false])
```
On application end:

```text
YY/MM/DD HH:mm:ss INFO DatahubSparkListener: Application ended : AppName AppID
```
To enable debug logging, add the following to your log4j configuration:

```text
log4j.logger.datahub.spark=DEBUG
log4j.logger.datahub.client.rest=DEBUG
```
Use Java 8 to build the project; it uses Gradle as the build tool. To build, run:

```sh
./gradlew -PjavaClassVersionDefault=8 :metadata-integration:java:acryl-spark-lineage:shadowJar
```
- spark.datahub.capture_spark_plan option to capture the Spark plan. By default, it is disabled.
- io.acryl:acryl-spark-lineage_2.12:0.2.18
- io.acryl:acryl-spark-lineage_2.13:0.2.18
- TransformOption as per the OpenLineage column lineage specification

Major changes:

- spark.datahub.legacyLineageCleanup.enabled=true. Make sure you have the latest server if you enable this with patch support. (this was introduced since 0.2.17-rc5)

Changes:

- spark.datahub.rest.disable_chunked_encoding
- spark.datahub.kafka.mcp_topic
- spark.datahub.legacyLineageCleanup.enabled

Fixes:

- spark.datahub.platform.<platform_name>.env and spark.datahub.platform.<platform_name>.platform_instance config parameters
- when spark.datahub.metadata.dataset.platformInstance is set