Emit flow and task metadata to the DataHub REST API with prefect-datahub
The prefect-datahub collection allows you to easily integrate DataHub's metadata ingestion capabilities into your Prefect workflows. With this collection, you can emit metadata about your flows, tasks, and workspace to DataHub's metadata service.
Install prefect-datahub using pip:

```bash
pip install prefect-datahub
```
We recommend using a Python virtual environment manager such as pipenv, conda, or virtualenv.
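To confirm the install succeeded, you can read the package version from the installed metadata (a quick check using only the standard library):

```python
from importlib.metadata import version

# Prints the installed version of prefect-datahub from the package metadata.
print(version("prefect-datahub"))
```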
Before using prefect-datahub, you need to deploy an instance of DataHub. Follow the instructions on the DataHub Quickstart page to set up DataHub.
After successful deployment, the DataHub GMS service should be running on http://localhost:8080 if deployed locally.
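Before saving any configuration, you can sanity-check that GMS is reachable. Here is a minimal sketch using the REST emitter from acryl-datahub (installed as a dependency of prefect-datahub), assuming the default unauthenticated local deployment:

```python
from datahub.emitter.rest_emitter import DatahubRestEmitter

# Point at the GMS endpoint started by the quickstart deployment.
emitter = DatahubRestEmitter(gms_server="http://localhost:8080")

# Raises an exception if GMS is unreachable; pass token=... if your endpoint is secured.
emitter.test_connection()
```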
Save your DataHub configuration as a Prefect block:
```python
from prefect_datahub.datahub_emitter import DatahubEmitter

datahub_emitter = DatahubEmitter(
    datahub_rest_url="http://localhost:8080",
    env="DEV",
    platform_instance="local_prefect",
    token=None,  # provide an auth token generated in DataHub if the GMS endpoint is secured
)
datahub_emitter.save("datahub-emitter-test")
```
Configuration options:
| Config | Type | Default | Description |
|---|---|---|---|
| `datahub_rest_url` | `str` | `http://localhost:8080` | URL of the DataHub GMS REST endpoint |
| `env` | `str` | `PROD` | Environment assigned to emitted assets (see FabricType for valid values) |
| `platform_instance` | `str` | `None` | Platform instance assigned to emitted assets (see Platform Instances) |
| `token` | `str` | `None` | DataHub access token, required if the GMS endpoint is secured |
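The `env` and `platform_instance` settings end up inside the URNs of the assets the emitter creates. A small illustration using a helper from acryl-datahub (the helper and its output format are standard DataHub, not specific to this plugin):

```python
from datahub.emitter.mce_builder import make_dataset_urn

# env becomes the third component of the dataset URN.
print(make_dataset_urn(platform="snowflake", name="mydb.schema.tableX", env="DEV"))
# urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableX,DEV)
```

A related helper, `make_dataset_urn_with_platform_instance`, folds the platform instance into the URN as well.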
Here's an example of how to use the DataHub Emitter in a Prefect workflow:
```python
from prefect import flow, task

from prefect_datahub.datahub_emitter import DatahubEmitter
from prefect_datahub.entities import Dataset

# Load the emitter block saved earlier.
datahub_emitter_block = DatahubEmitter.load("datahub-emitter-test")


@task(name="Extract", description="Extract the data")
def extract():
    return "This is data"


@task(name="Transform", description="Transform the data")
def transform(data, datahub_emitter):
    transformed_data = data.split(" ")
    # Register this task's dataset lineage; it is buffered until emit_flow() runs.
    datahub_emitter.add_task(
        inputs=[Dataset("snowflake", "mydb.schema.tableX")],
        outputs=[Dataset("snowflake", "mydb.schema.tableY")],
    )
    return transformed_data


@flow(name="ETL", description="Extract transform load flow")
def etl():
    datahub_emitter = datahub_emitter_block
    data = extract()
    transformed_data = transform(data, datahub_emitter)
    # Emit all buffered flow and task metadata to DataHub.
    datahub_emitter.emit_flow()


if __name__ == "__main__":
    etl()
```
**Note:** To emit task metadata, you must call `emit_flow()` at the end of your flow; otherwise, no metadata will be emitted.
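Because task metadata is buffered until `emit_flow()` is called, a task failure can mean nothing gets emitted. One defensive pattern (a sketch using plain Python control flow, not a documented prefect-datahub feature) is to make the call in a `finally` block:

```python
@flow(name="ETL", description="Extract transform load flow")
def etl():
    datahub_emitter = DatahubEmitter.load("datahub-emitter-test")
    try:
        data = extract()
        transformed_data = transform(data, datahub_emitter)
    finally:
        # Runs even if a task raises, so already-registered metadata is still emitted.
        datahub_emitter.emit_flow()
```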
For more advanced usage and configuration options, please refer to the prefect-datahub documentation.
We welcome contributions to prefect-datahub! Please refer to our Contributing Guidelines for more information on how to get started.
If you encounter any issues or have questions, you can reach out on the DataHub Slack (https://datahubspace.slack.com) or open an issue in the DataHub GitHub repository.
prefect-datahub is released under the Apache 2.0 license. See the LICENSE file for details.