DataHub supports integration with Prefect, allowing you to ingest Prefect flow and task metadata along with flow run and task run information. This integration lets you track and monitor your Prefect workflows within DataHub, providing a comprehensive view of your data pipeline activities.
Blocks in Prefect are primitives that enable the storage of configuration and provide an interface for interacting with external systems. The `prefect-datahub` block uses the DataHub REST emitter to send metadata events while running Prefect flows.
- Use either Prefect Cloud (recommended) or a self-hosted Prefect server.
- For Prefect Cloud setup, refer to the Cloud Quickstart guide.
- For self-hosted Prefect server setup, refer to the Host Prefect Server guide.
Ensure the Prefect API URL is set correctly. Verify using:

```shell
prefect profile inspect
```
The API URL format is:

- Prefect Cloud: `https://api.prefect.cloud/api/accounts/<account_id>/workspaces/<workspace_id>`
- Self-hosted: `http://<host>:<port>/api`

Install `prefect-datahub` using pip:

```shell
pip install 'prefect-datahub'
```

Note: Requires Python 3.10+
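The two URL shapes above are easy to get subtly wrong (e.g. forgetting the trailing `/api` on a self-hosted server). A minimal sketch of a sanity check, using a hypothetical helper that is not part of `prefect-datahub`:

```python
import re

# Hypothetical helper (not part of prefect-datahub): sanity-check that a
# configured PREFECT_API_URL matches one of the two documented shapes.
CLOUD_PATTERN = re.compile(
    r"^https://api\.prefect\.cloud/api/accounts/[^/]+/workspaces/[^/]+$"
)
SELF_HOSTED_PATTERN = re.compile(r"^https?://[^/]+:\d+/api$")


def looks_like_prefect_api_url(url: str) -> bool:
    """Return True if the URL matches the Cloud or self-hosted format."""
    return bool(CLOUD_PATTERN.match(url) or SELF_HOSTED_PATTERN.match(url))


print(looks_like_prefect_api_url("http://127.0.0.1:4200/api"))  # True
print(looks_like_prefect_api_url("http://127.0.0.1:4200"))      # False: missing /api
```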
Save your configuration to the Prefect block document store:

```python
from prefect_datahub.datahub_emitter import DatahubEmitter

DatahubEmitter(
    datahub_rest_url="http://localhost:8080",
    env="PROD",
    platform_instance="local_prefect",
).save("MY-DATAHUB-BLOCK")
```
Configuration options:
| Config | Type | Default | Description |
|---|---|---|---|
| `datahub_rest_url` | `str` | `http://localhost:8080` | DataHub GMS REST URL |
| `env` | `str` | `PROD` | Environment for assets (see FabricType) |
| `platform_instance` | `str` | `None` | Platform instance for assets (see Platform Instances) |
Load and use the saved block in your Prefect workflows:

```python
from prefect import flow, task

from prefect_datahub.dataset import Dataset
from prefect_datahub.datahub_emitter import DatahubEmitter

datahub_emitter = DatahubEmitter.load("MY-DATAHUB-BLOCK")


@task(name="Transform", description="Transform the data")
def transform(data):
    data = data.split(" ")
    datahub_emitter.add_task(
        inputs=[Dataset("snowflake", "mydb.schema.tableA")],
        outputs=[Dataset("snowflake", "mydb.schema.tableC")],
    )
    return data


@flow(name="ETL flow", description="Extract transform load flow")
def etl():
    data = transform("This is data")
    datahub_emitter.emit_flow()
```
Note: To emit task metadata, you must call `emit_flow()` at the end of the flow. Otherwise, no metadata will be emitted.
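The note above describes a buffer-and-flush pattern: `add_task` only records lineage locally, and nothing reaches DataHub until `emit_flow()` flushes it. A toy sketch of that pattern, purely illustrative and not the actual `DatahubEmitter` implementation:

```python
class BufferingEmitter:
    """Toy illustration of buffer-and-flush: add_task() only queues
    metadata; emit_flow() is what actually sends it."""

    def __init__(self):
        self._buffer = []
        self.emitted = []  # stands in for "metadata sent to DataHub"

    def add_task(self, inputs=None, outputs=None):
        # Queue the task's lineage; nothing is sent yet.
        self._buffer.append({"inputs": inputs or [], "outputs": outputs or []})

    def emit_flow(self):
        # Flush: only now does the queued metadata leave the process.
        self.emitted.extend(self._buffer)
        self._buffer.clear()


emitter = BufferingEmitter()
emitter.add_task(inputs=["tableA"], outputs=["tableC"])
print(len(emitter.emitted))  # 0 -- nothing emitted yet
emitter.emit_flow()
print(len(emitter.emitted))  # 1 -- emitted on flush
```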
The integration maps Prefect concepts to DataHub concepts as follows:

| Prefect Concept | DataHub Concept |
|---|---|
| Flow | DataFlow |
| Flow Run | DataProcessInstance |
| Task | DataJob |
| Task Run | DataProcessInstance |
| Task Tag | Tag |
Check the Prefect UI's Blocks menu for the DataHub emitter.
Run a Prefect workflow and look for DataHub-related log messages:
```
Emitting flow to datahub...
Emitting tasks to datahub...
```
If the Prefect API URL is incorrect, set it manually:
```shell
prefect config set PREFECT_API_URL='http://127.0.0.1:4200/api'
```
If you encounter `ConnectionError: HTTPConnectionPool(host='localhost', port=8080)`, ensure that your DataHub GMS service is running.
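A quick way to confirm whether anything is listening on the GMS port before re-running your flow. This is an illustrative stdlib helper, not part of DataHub or Prefect, and it only checks TCP reachability, not that the service behind the port is healthy:

```python
import socket


def gms_reachable(host: str, port: int, timeout: float = 2.0) -> bool:
    """Return True if something is accepting TCP connections on host:port.
    Illustrative helper, not part of DataHub."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


if __name__ == "__main__":
    # If this prints False, start DataHub GMS before running your flow.
    print(gms_reachable("localhost", 8080))
```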
For more information or support, please refer to the official Prefect and DataHub documentation or reach out to their respective communities.