docs/integrations/prefect-databricks/api-ref/prefect_databricks-flows.mdx
prefect_databricks.flows
Module containing flows for interacting with Databricks.
jobs_runs_submit_and_wait_for_completion <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/integrations/prefect-databricks/prefect_databricks/flows.py#L63" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>
jobs_runs_submit_and_wait_for_completion(databricks_credentials: DatabricksCredentials, tasks: Optional[List[RunSubmitTaskSettings]] = None, run_name: Optional[str] = None, max_wait_seconds: int = 900, poll_frequency_seconds: int = 10, git_source: Optional[GitSource] = None, timeout_seconds: Optional[int] = None, idempotency_token: Optional[str] = None, access_control_list: Optional[List[AccessControlRequest]] = None, return_metadata: bool = False, job_submission_handler: Optional[Callable] = None, **jobs_runs_submit_kwargs: Dict[str, Any]) -> Union[NotebookOutput, Tuple[NotebookOutput, JobMetadata], None]
Flow that triggers a job run and waits for the triggered run to complete.
Args:
    databricks_credentials: Credentials to use for authentication with Databricks.
    tasks: A list of task specifications (RunSubmitTaskSettings) to run. Each
        task defines its key, the cluster it runs on, and the work to do
        (a notebook, JAR, Python, SQL, or dbt task).
    run_name: An optional name for the run. Defaults to Untitled.
    git_source: An optional remote Git repository (GitSource) containing the
        notebooks used by the run's notebook tasks. This functionality is in
        Public Preview.
    timeout_seconds: An optional timeout, in seconds, applied to the run. The
        default is no timeout.
    idempotency_token: An optional token (at most 64 characters) guaranteeing
        the idempotency of the request. If a run with the token already exists,
        the existing run's ID is returned instead of creating a new run. See
        the Databricks docs on job idempotency for details.
    access_control_list: A list of permissions (AccessControlRequest) to set on
        the run.
    max_wait_seconds: The maximum number of seconds to wait for the entire flow
        to complete.
    poll_frequency_seconds: The number of seconds to wait between checks for
        run completion.
    return_metadata: If True, return a tuple of the notebook output and the run
        metadata. By default, only the notebook output is returned.
    job_submission_handler: An optional callable to intercept job submission.
    **jobs_runs_submit_kwargs: Additional keyword arguments to pass to
        jobs_runs_submit.
Returns:
    The notebook output alone by default, or a tuple of the notebook output
    and the run metadata when return_metadata=True.
Examples:
Submit jobs runs and wait.
from prefect import flow
from prefect_databricks import DatabricksCredentials
from prefect_databricks.flows import jobs_runs_submit_and_wait_for_completion
from prefect_databricks.models.jobs import (
    AutoScale,
    AwsAttributes,
    JobTaskSettings,
    NotebookTask,
    NewCluster,
)


@flow
async def jobs_runs_submit_and_wait_for_completion_flow(notebook_path, **base_parameters):
    databricks_credentials = await DatabricksCredentials.load("BLOCK_NAME")

    # specify new cluster settings
    aws_attributes = AwsAttributes(
        availability="SPOT",
        zone_id="us-west-2a",
        ebs_volume_type="GENERAL_PURPOSE_SSD",
        ebs_volume_count=3,
        ebs_volume_size=100,
    )
    auto_scale = AutoScale(min_workers=1, max_workers=2)
    new_cluster = NewCluster(
        aws_attributes=aws_attributes,
        autoscale=auto_scale,
        node_type_id="m4.large",
        spark_version="10.4.x-scala2.12",
        spark_conf={"spark.speculation": True},
    )

    # specify notebook to use and parameters to pass
    notebook_task = NotebookTask(
        notebook_path=notebook_path,
        base_parameters=base_parameters,
    )

    # compile job task settings
    job_task_settings = JobTaskSettings(
        new_cluster=new_cluster,
        notebook_task=notebook_task,
        task_key="prefect-task",
    )

    multi_task_runs = await jobs_runs_submit_and_wait_for_completion(
        databricks_credentials=databricks_credentials,
        run_name="prefect-job",
        tasks=[job_task_settings],
    )
    return multi_task_runs
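The idempotency_token argument accepts at most 64 characters. One way to obtain a stable token is to hash the values that identify a logical run. The sketch below does this with SHA-256, whose hex digest is exactly 64 characters long. The helper name make_idempotency_token and its two inputs are assumptions made for illustration; they are not part of prefect_databricks.

```python
import hashlib


def make_idempotency_token(run_name: str, logical_date: str) -> str:
    """Derive a deterministic token of at most 64 characters (hypothetical helper)."""
    # Hash the fields that identify one logical run. The same inputs always
    # produce the same token, so a retried submission would reuse it.
    digest = hashlib.sha256(f"{run_name}:{logical_date}".encode("utf-8"))
    # A SHA-256 hex digest is 64 characters long, which fits the stated limit.
    return digest.hexdigest()


token = make_idempotency_token("prefect-job", "2024-01-01")
```

The resulting string could then be passed as idempotency_token=token when calling the flow.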
jobs_runs_wait_for_completion <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/integrations/prefect-databricks/prefect_databricks/flows.py#L331" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>
jobs_runs_wait_for_completion(multi_task_jobs_runs_id: int, databricks_credentials: DatabricksCredentials, run_name: Optional[str] = None, max_wait_seconds: int = 900, poll_frequency_seconds: int = 10)
Flow that waits for an existing jobs run to complete.
Args:
    run_name: The name of the jobs runs task.
    multi_task_jobs_runs_id: The ID of the jobs runs task to watch.
    databricks_credentials: Credentials to use for authentication with Databricks.
    max_wait_seconds: Maximum number of seconds to wait for the entire flow to
        complete.
    poll_frequency_seconds: Number of seconds to wait between checks for run
        completion.
Returns:
Examples:
Waits for completion on jobs runs.
from prefect import flow
from prefect_databricks import DatabricksCredentials
from prefect_databricks.flows import jobs_runs_wait_for_completion


@flow
async def jobs_runs_wait_for_completion_flow():
    databricks_credentials = await DatabricksCredentials.load("BLOCK_NAME")
    return await jobs_runs_wait_for_completion(
        multi_task_jobs_runs_id=45429,
        databricks_credentials=databricks_credentials,
        run_name="my_run_name",
        max_wait_seconds=1800,  # 30 minutes
        poll_frequency_seconds=120,  # 2 minutes
    )
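To illustrate how max_wait_seconds and poll_frequency_seconds interact, here is a minimal, library-independent polling sketch. It is not the flow's actual implementation: the function wait_until_done, the is_done callback, and the injectable sleep parameter are all hypothetical, and the sketch returns False on timeout rather than raising an exception.

```python
import time
from typing import Callable


def wait_until_done(
    is_done: Callable[[], bool],
    max_wait_seconds: float = 900,
    poll_frequency_seconds: float = 10,
    sleep: Callable[[float], None] = time.sleep,
) -> bool:
    """Poll is_done until it returns True or the wait budget is used up."""
    waited = 0.0
    while waited <= max_wait_seconds:
        # Check the run first so a finished run returns without sleeping.
        if is_done():
            return True
        # Not finished yet: pause, then count the pause against the budget.
        sleep(poll_frequency_seconds)
        waited += poll_frequency_seconds
    # The budget ran out before is_done reported completion.
    return False
```

A longer poll_frequency_seconds means fewer status checks within the same max_wait_seconds budget, at the cost of noticing completion later.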
jobs_runs_submit_by_id_and_wait_for_completion <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/integrations/prefect-databricks/prefect_databricks/flows.py#L420" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>
jobs_runs_submit_by_id_and_wait_for_completion(databricks_credentials: DatabricksCredentials, job_id: int, idempotency_token: Optional[str] = None, jar_params: Optional[List[str]] = None, max_wait_seconds: int = 900, poll_frequency_seconds: int = 10, notebook_params: Optional[Dict] = None, python_params: Optional[List[str]] = None, spark_submit_params: Optional[List[str]] = None, python_named_params: Optional[Dict] = None, pipeline_params: Optional[str] = None, sql_params: Optional[Dict] = None, dbt_commands: Optional[List] = None, job_submission_handler: Optional[Callable] = None, **jobs_runs_submit_kwargs: Dict[str, Any]) -> Dict
Flow that triggers an existing job and waits for it to complete.
Args:
    databricks_credentials: Credentials to use for authentication with Databricks.
    job_id: The ID of the Databricks job to run.
    idempotency_token: An optional token (at most 64 characters) guaranteeing
        the idempotency of the request. If a run with the token already exists,
        the existing run's ID is returned instead of creating a new run. See
        the Databricks docs on job idempotency for details.
    jar_params: A list of command-line parameters for Spark JAR tasks, used to
        invoke the main class. Cannot be combined with notebook_params, and its
        JSON representation cannot exceed 10,000 bytes.
    max_wait_seconds: The maximum number of seconds to wait for the entire flow
        to complete.
    poll_frequency_seconds: The number of seconds to wait between checks for
        run completion.
    notebook_params: A map of key-value parameters for notebook tasks,
        accessible through dbutils.widgets.get. Cannot be combined with
        jar_params, and its JSON representation cannot exceed 10,000 bytes.
    python_params: A list of command-line parameters for Python tasks. ASCII
        characters only; its JSON representation cannot exceed 10,000 bytes.
    spark_submit_params: A list of parameters passed to the spark-submit
        script. ASCII characters only; its JSON representation cannot exceed
        10,000 bytes.
    python_named_params: A map of named parameters for Python wheel tasks.
    pipeline_params: Parameters for Delta Live Tables pipeline tasks, such as
        whether to trigger a full refresh.
    sql_params: A map of key-value parameters for SQL tasks. SQL alert tasks do
        not support custom parameters.
    dbt_commands: A list of dbt commands to run for dbt tasks, for example
        ["dbt deps", "dbt seed", "dbt run"].
    job_submission_handler: An optional callable to intercept job submission.
Raises:
    DatabricksJobTerminated: Raised when the Databricks job run is terminated
        with a non-successful result state.
    DatabricksJobSkipped: Raised when the Databricks job run is skipped.
    DatabricksJobInternalError: Raised when the Databricks job run encounters
        an internal error.
Returns:
Examples:
import asyncio

from prefect import flow
from prefect_databricks import DatabricksCredentials
from prefect_databricks.flows import (
    jobs_runs_submit_by_id_and_wait_for_completion,
)


@flow
async def submit_existing_job(block_name: str, job_id: int):
    databricks_credentials = await DatabricksCredentials.load(block_name)
    run = await jobs_runs_submit_by_id_and_wait_for_completion(
        databricks_credentials=databricks_credentials, job_id=job_id
    )
    return run


asyncio.run(submit_existing_job(block_name="db-creds", job_id=1234))
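The argument descriptions above state two constraints that are easy to check before submitting: jar_params and notebook_params cannot be combined, and each has a 10,000-byte limit on its JSON representation. The sketch below shows one possible pre-flight check. The helper check_run_params is hypothetical, and measuring the size with json.dumps plus UTF-8 encoding is an assumption about how the limit is counted, so treat the result as an approximation.

```python
import json
from typing import Dict, List, Optional

MAX_PARAMS_BYTES = 10_000  # limit quoted in the argument descriptions


def check_run_params(
    jar_params: Optional[List[str]] = None,
    notebook_params: Optional[Dict[str, str]] = None,
) -> None:
    """Raise ValueError if the parameters break the documented constraints."""
    # The two parameter styles are documented as mutually exclusive.
    if jar_params is not None and notebook_params is not None:
        raise ValueError("jar_params cannot be combined with notebook_params")
    for name, value in (("jar_params", jar_params), ("notebook_params", notebook_params)):
        if value is None:
            continue
        # Assumption: size is measured on the UTF-8 encoded JSON text.
        size = len(json.dumps(value).encode("utf-8"))
        if size > MAX_PARAMS_BYTES:
            raise ValueError(
                f"{name} is {size} bytes as JSON; the limit is {MAX_PARAMS_BYTES}"
            )
```

Calling check_run_params(notebook_params={"date": "2024-01-01"}) returns None, while passing both arguments, or an oversized list, raises ValueError before any request is made.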
DatabricksJobTerminated <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/integrations/prefect-databricks/prefect_databricks/flows.py#L31" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>
Raised when a Databricks jobs run terminates with a non-successful result state.
DatabricksJobSkipped <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/integrations/prefect-databricks/prefect_databricks/flows.py#L35" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>
Raised when a Databricks jobs run is skipped.
DatabricksJobInternalError <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/integrations/prefect-databricks/prefect_databricks/flows.py#L39" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>
Raised when a Databricks jobs run encounters an internal error.
DatabricksJobRunTimedOut <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/integrations/prefect-databricks/prefect_databricks/flows.py#L43" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>
Raised when a Databricks jobs run does not complete within the configured max_wait_seconds.
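A caller may want to tell these four exceptions apart, for example to label a failed run in logs. The sketch below shows one way to do that. So that it runs without prefect_databricks installed, it defines local stand-in classes with the same names; in real code the names would be imported from prefect_databricks.flows instead. The classify_failure helper and its labels are hypothetical.

```python
# Stand-ins for illustration only. In real code, import these four names
# from prefect_databricks.flows rather than redefining them.
class DatabricksJobTerminated(Exception):
    pass


class DatabricksJobSkipped(Exception):
    pass


class DatabricksJobInternalError(Exception):
    pass


class DatabricksJobRunTimedOut(Exception):
    pass


# Hypothetical labels chosen for this sketch.
FAILURE_LABELS = {
    DatabricksJobTerminated: "terminated",
    DatabricksJobSkipped: "skipped",
    DatabricksJobInternalError: "internal-error",
    DatabricksJobRunTimedOut: "timed-out",
}


def classify_failure(exc: Exception) -> str:
    """Map an exception to a short label, or 'unexpected' if unrecognized."""
    for exc_type, label in FAILURE_LABELS.items():
        if isinstance(exc, exc_type):
            return label
    return "unexpected"
```

In a flow, this helper could be called from an except block wrapped around one of the flows documented above.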