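prefect-databricks lets a Prefect flow drive Databricks Jobs: authenticate with a credentials block, trigger an existing job or submit a one-time notebook or JAR run, wait for the run to finish, and collect its output. The most common workflows are triggering an existing job and waiting for it to finish, running a notebook as a one-time job on a new cluster, and calling individual Jobs API endpoints as tasks.

Before you start, you need:

- The hostname of your Databricks workspace instance (for example, dbc-abc12345-6789.cloud.databricks.com), without the https:// scheme.
- A personal access token, or service principal credentials (client_id / client_secret), with permission to run the jobs you target.

The following installs a version of prefect-databricks compatible with your installed version of prefect. If you don't already have prefect installed, it installs the newest version of prefect as well.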
```bash
# With pip
pip install "prefect[databricks]"

# Or with uv
uv add "prefect[databricks]"
```
Every workflow below loads a DatabricksCredentials block by name, so create one first. Construct it and call .save() to persist it to your Prefect API:
```python
from prefect_databricks import DatabricksCredentials

# Personal access token (PAT)
DatabricksCredentials(
    databricks_instance="YOUR_INSTANCE.cloud.databricks.com",
    token="YOUR_TOKEN",
).save("databricks", overwrite=True)
```
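The databricks_instance field expects the bare workspace hostname, so a URL copied from the browser needs its scheme and path removed first. A minimal sketch of that cleanup, using only the standard library; normalize_databricks_instance is a hypothetical helper name and is not part of prefect-databricks:

```python
from urllib.parse import urlsplit


def normalize_databricks_instance(value: str) -> str:
    """Return the bare hostname from a workspace URL or hostname string."""
    value = value.strip()
    # urlsplit only fills in the host when a scheme or a leading "//" is present,
    # so prefix bare hostnames with "//" before parsing.
    parts = urlsplit(value if "://" in value else f"//{value}")
    if not parts.hostname:
        raise ValueError(f"no hostname found in {value!r}")
    return parts.hostname


if __name__ == "__main__":
    print(normalize_databricks_instance("https://dbc-abc12345-6789.cloud.databricks.com/"))
```

The returned string can then be passed as databricks_instance when constructing the block.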
To authenticate as a service principal (OAuth) instead of a PAT, provide client_id and client_secret (and tenant_id for Azure Databricks):
```python
from prefect_databricks import DatabricksCredentials

DatabricksCredentials(
    databricks_instance="dbc-abc12345-6789.cloud.databricks.com",
    client_id="my-client-id",
    client_secret="my-client-secret",
).save("databricks", overwrite=True)
```
You can also register the block type so it appears in the UI:
```bash
prefect block register -m prefect_databricks
```
Triggering an existing job is the most common workflow: kick off a job you've already defined in Databricks (identified by its job_id) and block until it finishes, polling its state along the way. jobs_runs_submit_by_id_and_wait_for_completion is an async flow, so run it with asyncio.run.
```python
import asyncio

from prefect_databricks import DatabricksCredentials
from prefect_databricks.flows import jobs_runs_submit_by_id_and_wait_for_completion


async def trigger_databricks_job():
    databricks_credentials = await DatabricksCredentials.load("databricks")
    run = await jobs_runs_submit_by_id_and_wait_for_completion(
        databricks_credentials=databricks_credentials,
        job_id=11223344,
        max_wait_seconds=1800,  # give up after 30 minutes
        poll_frequency_seconds=30,
    )
    return run


if __name__ == "__main__":
    asyncio.run(trigger_databricks_job())
```
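The two timing parameters are easier to reason about with the waiting loop written out. The following is a simplified sketch of poll-until-terminal logic, not the library's actual implementation: get_state is a hypothetical callable standing in for a Jobs API lookup, and the terminal states are the Databricks run life cycle states TERMINATED, SKIPPED, and INTERNAL_ERROR.

```python
import asyncio
import time
from typing import Awaitable, Callable

# Databricks run life cycle states after which a run will not change again.
TERMINAL_STATES = {"TERMINATED", "SKIPPED", "INTERNAL_ERROR"}


async def wait_for_terminal_state(
    get_state: Callable[[], Awaitable[str]],  # hypothetical state lookup
    max_wait_seconds: float = 1800,
    poll_frequency_seconds: float = 30,
) -> str:
    """Poll get_state until it reports a terminal state or time runs out."""
    deadline = time.monotonic() + max_wait_seconds
    while True:
        state = await get_state()
        if state in TERMINAL_STATES:
            return state
        if time.monotonic() >= deadline:
            raise TimeoutError(f"run still {state} after {max_wait_seconds} seconds")
        await asyncio.sleep(poll_frequency_seconds)


async def demo() -> str:
    # Simulated run that is pending, then running, then finished.
    states = iter(["PENDING", "RUNNING", "TERMINATED"])

    async def fake_get_state() -> str:
        return next(states)

    return await wait_for_terminal_state(
        fake_get_state, max_wait_seconds=5, poll_frequency_seconds=0
    )


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Under this model, max_wait_seconds=1800 with poll_frequency_seconds=30 means the run is checked roughly 60 times before the wait is abandoned.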
To run a notebook without first defining a job, submit a one-time run on a new cluster. jobs_runs_submit_and_wait_for_completion waits for completion and returns the notebook outputs keyed by task.
Given a notebook at /Users/[email protected]/example that reads a name widget:
```python
name = dbutils.widgets.get("name")
print(f"Welcome to prefect-databricks, {name}!")
```
The flow that launches a cluster, runs the notebook, and waits for its output:
```python
import asyncio

from prefect_databricks import DatabricksCredentials
from prefect_databricks.flows import jobs_runs_submit_and_wait_for_completion
from prefect_databricks.models.jobs import (
    AutoScale,
    JobTaskSettings,
    NewCluster,
    NotebookTask,
)


async def run_notebook():
    databricks_credentials = await DatabricksCredentials.load("databricks")

    # Cluster that is created for this run only
    new_cluster = NewCluster(
        autoscale=AutoScale(min_workers=1, max_workers=2),
        node_type_id="m4.large",
        spark_version="10.4.x-scala2.12",
    )

    # Notebook to run, with the value for its "name" widget
    notebook_task = NotebookTask(
        notebook_path="/Users/[email protected]/example",
        base_parameters={"name": "Marvin"},
    )

    task = JobTaskSettings(
        task_key="prefect-task",
        new_cluster=new_cluster,
        notebook_task=notebook_task,
    )

    notebook_outputs = await jobs_runs_submit_and_wait_for_completion(
        databricks_credentials=databricks_credentials,
        run_name="prefect-job",
        tasks=[task],
    )
    return notebook_outputs


if __name__ == "__main__":
    asyncio.run(run_notebook())
```
Instead of the typed models, you can pass equivalent JSON. For example, AutoScale(min_workers=1, max_workers=2) is the same as {"min_workers": 1, "max_workers": 2}.
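As a sketch of what that looks like for a whole task, the dictionary below mirrors the typed JobTaskSettings example field for field. It assumes the flow accepts such dictionaries in tasks in place of model instances, as the sentence above suggests; the values are the same placeholders used in the typed example.

```python
import json

# Plain-dict equivalent of the JobTaskSettings built from typed models above.
task = {
    "task_key": "prefect-task",
    "new_cluster": {
        "autoscale": {"min_workers": 1, "max_workers": 2},
        "node_type_id": "m4.large",
        "spark_version": "10.4.x-scala2.12",
    },
    "notebook_task": {
        "notebook_path": "/Users/[email protected]/example",
        "base_parameters": {"name": "Marvin"},
    },
}

# Round-trip through JSON to confirm the structure is serializable as-is.
assert json.loads(json.dumps(task)) == task

if __name__ == "__main__":
    print(json.dumps(task, indent=2))
```

Under that assumption, the dictionary would be passed as tasks=[task] to jobs_runs_submit_and_wait_for_completion instead of the JobTaskSettings instance.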
For finer-grained control, prefect_databricks.jobs wraps individual Databricks Jobs REST endpoints as async tasks (jobs_list, jobs_get, jobs_runs_get, and more). Call them from within a flow:
```python
import asyncio

from prefect import flow
from prefect_databricks import DatabricksCredentials
from prefect_databricks.jobs import jobs_list


@flow
async def list_databricks_jobs():
    databricks_credentials = await DatabricksCredentials.load("databricks")
    return await jobs_list(databricks_credentials, limit=5)


if __name__ == "__main__":
    asyncio.run(list_databricks_jobs())
```
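A listing like this is a convenient way to find the job_id that the trigger-and-wait flow needs. The helper below is a sketch that assumes the task returns the Jobs API response as a dictionary with a jobs list whose entries carry job_id and settings.name; job_ids_by_name is a hypothetical name, and the sample response is made up for illustration.

```python
from typing import Any, Dict


def job_ids_by_name(response: Dict[str, Any]) -> Dict[str, int]:
    """Map each job name in a jobs-list response to its job_id."""
    ids: Dict[str, int] = {}
    for job in response.get("jobs", []):
        name = job.get("settings", {}).get("name")
        if name is not None:
            ids[name] = job["job_id"]
    return ids


# Made-up response in the assumed shape, for illustration only.
sample_response = {
    "jobs": [
        {"job_id": 11223344, "settings": {"name": "nightly-etl"}},
        {"job_id": 55667788, "settings": {"name": "weekly-report"}},
    ],
    "has_more": False,
}

if __name__ == "__main__":
    print(job_ids_by_name(sample_response))
```

If two jobs share a name, the later entry overwrites the earlier one, so a lookup like this is only reliable when job names are unique in the workspace.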
For assistance using Databricks, consult the Databricks documentation.
Refer to the prefect-databricks SDK reference for the full list of credentials options, flows, and job tasks.