flows

prefect_databricks.flows

Module containing flows for interacting with Databricks

Functions

jobs_runs_submit_and_wait_for_completion <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/integrations/prefect-databricks/prefect_databricks/flows.py#L63" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
jobs_runs_submit_and_wait_for_completion(databricks_credentials: DatabricksCredentials, tasks: Optional[List[RunSubmitTaskSettings]] = None, run_name: Optional[str] = None, max_wait_seconds: int = 900, poll_frequency_seconds: int = 10, git_source: Optional[GitSource] = None, timeout_seconds: Optional[int] = None, idempotency_token: Optional[str] = None, access_control_list: Optional[List[AccessControlRequest]] = None, return_metadata: bool = False, job_submission_handler: Optional[Callable] = None, **jobs_runs_submit_kwargs: Dict[str, Any]) -> Union[NotebookOutput, Tuple[NotebookOutput, JobMetadata], None]

Flow that triggers a job run and waits for the triggered run to complete.

Args:

  • databricks_credentials: Credentials to use for authentication with Databricks.
  • tasks: A list of task specifications (RunSubmitTaskSettings) to run. Each task defines its key, the cluster it runs on, and the work to do (a notebook, JAR, Python, SQL, or dbt task).
  • run_name: An optional name for the run. Defaults to Untitled.
  • git_source: An optional remote Git repository (GitSource) containing the notebooks used by the run's notebook tasks. This functionality is in Public Preview.
  • timeout_seconds: An optional timeout, in seconds, applied to the run. The default is no timeout.
  • idempotency_token: An optional token (at most 64 characters) guaranteeing the idempotency of the request. If a run with the token already exists, the existing run's ID is returned instead of creating a new run. See the Databricks docs on job idempotency for details.
  • access_control_list: A list of permissions (AccessControlRequest) to set on the run.
  • max_wait_seconds: The maximum number of seconds to wait for the entire flow to complete.
  • poll_frequency_seconds: The number of seconds to wait between checks for run completion.
  • return_metadata: If True, return a tuple of the notebook output and the run metadata. By default, only the notebook output is returned.
  • job_submission_handler: An optional callable to intercept job submission.
  • **jobs_runs_submit_kwargs: Additional keyword arguments to pass to jobs_runs_submit.
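
The idempotency_token argument is limited to 64 characters. One way to get a stable token is to hash the values that identify a logical run. The sketch below does this with SHA-256, whose hex digest is always 64 characters. The helper name make_idempotency_token and the choice of inputs are assumptions made for illustration; they are not part of prefect-databricks.

```python
import hashlib


def make_idempotency_token(*parts: str) -> str:
    """Derive a deterministic token from the values that identify a run.

    Hypothetical helper, not part of prefect-databricks.
    """
    # Join with a control-character separator so that ("a", "bc") and
    # ("ab", "c") do not collapse into the same input string.
    joined = "\x1f".join(parts)
    # A SHA-256 hex digest is always 64 characters, the documented maximum.
    return hashlib.sha256(joined.encode("utf-8")).hexdigest()


# Example inputs are placeholders: a run name and a logical date.
token = make_idempotency_token("prefect-job", "2024-01-01")
```

The resulting string can be passed as idempotency_token, so retrying the same logical run reuses the same token.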

Returns:

  • Either a dict or a tuple, depending on return_metadata, made up of:
    • task_notebook_outputs: a dictionary mapping each task key to its notebook output. This is the only object returned by default.
    • jobs_runs_metadata: a dictionary containing the IDs of the jobs runs tasks. Returned only if return_metadata=True.
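
Because the return shape depends on return_metadata, calling code may want to normalize it. The following is a minimal sketch that assumes only the two shapes described above. The helper name split_result and the sample values are hypothetical.

```python
from typing import Any, Dict, Optional, Tuple


def split_result(
    result: Any, return_metadata: bool
) -> Tuple[Any, Optional[Dict[str, Any]]]:
    """Return (task_notebook_outputs, jobs_runs_metadata or None).

    Hypothetical helper, not part of prefect-databricks.
    """
    if return_metadata:
        # With return_metadata=True the flow returns a 2-tuple.
        task_notebook_outputs, jobs_runs_metadata = result
        return task_notebook_outputs, jobs_runs_metadata
    # By default only the notebook outputs are returned.
    return result, None


# Placeholder values standing in for real flow results.
outputs_only = split_result({"prefect-task": "notebook output"}, False)
with_metadata = split_result(
    ({"prefect-task": "notebook output"}, {"prefect-task": 123}), True
)
```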

Examples:

Submit a jobs run and wait for it to complete.

python
from prefect import flow
from prefect_databricks import DatabricksCredentials
from prefect_databricks.flows import jobs_runs_submit_and_wait_for_completion
from prefect_databricks.models.jobs import (
    AutoScale,
    AwsAttributes,
    JobTaskSettings,
    NotebookTask,
    NewCluster,
)

@flow
async def jobs_runs_submit_and_wait_for_completion_flow(notebook_path, **base_parameters):
    databricks_credentials = await DatabricksCredentials.load("BLOCK_NAME")

    # specify new cluster settings
    aws_attributes = AwsAttributes(
        availability="SPOT",
        zone_id="us-west-2a",
        ebs_volume_type="GENERAL_PURPOSE_SSD",
        ebs_volume_count=3,
        ebs_volume_size=100,
    )
    auto_scale = AutoScale(min_workers=1, max_workers=2)
    new_cluster = NewCluster(
        aws_attributes=aws_attributes,
        autoscale=auto_scale,
        node_type_id="m4.large",
        spark_version="10.4.x-scala2.12",
        spark_conf={"spark.speculation": True},
    )

    # specify notebook to use and parameters to pass
    notebook_task = NotebookTask(
        notebook_path=notebook_path,
        base_parameters=base_parameters,
    )

    # compile job task settings
    job_task_settings = JobTaskSettings(
        new_cluster=new_cluster,
        notebook_task=notebook_task,
        task_key="prefect-task"
    )

    multi_task_runs = await jobs_runs_submit_and_wait_for_completion(
        databricks_credentials=databricks_credentials,
        run_name="prefect-job",
        tasks=[job_task_settings]
    )

    return multi_task_runs

jobs_runs_wait_for_completion <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/integrations/prefect-databricks/prefect_databricks/flows.py#L331" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
jobs_runs_wait_for_completion(multi_task_jobs_runs_id: int, databricks_credentials: DatabricksCredentials, run_name: Optional[str] = None, max_wait_seconds: int = 900, poll_frequency_seconds: int = 10)

Flow that waits for an existing jobs run to complete.

Args:

  • run_name: The name of the jobs runs task.
  • multi_task_jobs_runs_id: The ID of the jobs runs task to watch.
  • databricks_credentials: Credentials to use for authentication with Databricks.
  • max_wait_seconds: Maximum number of seconds to wait for the entire flow to complete.
  • poll_frequency_seconds: Number of seconds to wait in between checks for run completion.

Returns:

  • A dict containing the jobs runs life cycle state and message.
  • A dict containing IDs of the jobs runs tasks.

Examples:

Wait for a jobs run to complete.

python
from prefect import flow
from prefect_databricks import DatabricksCredentials
from prefect_databricks.flows import jobs_runs_wait_for_completion

@flow
async def jobs_runs_wait_for_completion_flow():
    databricks_credentials = await DatabricksCredentials.load("BLOCK_NAME")
    return await jobs_runs_wait_for_completion(
        multi_task_jobs_runs_id=45429,
        databricks_credentials=databricks_credentials,
        run_name="my_run_name",
        max_wait_seconds=1800,  # 30 minutes
        poll_frequency_seconds=120,  # 2 minutes
    )
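
To show how max_wait_seconds and poll_frequency_seconds interact, here is a generic polling loop using only the standard library. It is a sketch of the general pattern, not the library's implementation: wait_until_done, demo, and fake_check are hypothetical names. The sketch returns False on timeout, whereas this module documents a DatabricksJobRunTimedOut exception for that case.

```python
import asyncio
import time
from typing import Awaitable, Callable


async def wait_until_done(
    check_done: Callable[[], Awaitable[bool]],
    max_wait_seconds: float = 900,
    poll_frequency_seconds: float = 10,
) -> bool:
    """Poll check_done until it returns True or the time budget is spent."""
    deadline = time.monotonic() + max_wait_seconds
    while time.monotonic() < deadline:
        if await check_done():
            return True
        # Sleep between checks so the remote API is not hammered.
        await asyncio.sleep(poll_frequency_seconds)
    return False


async def demo(done_after_polls: int, max_wait_seconds: float) -> bool:
    polls = 0

    async def fake_check() -> bool:
        # Stand-in for a real status request; reports done after N polls.
        nonlocal polls
        polls += 1
        return polls >= done_after_polls

    return await wait_until_done(
        fake_check,
        max_wait_seconds=max_wait_seconds,
        poll_frequency_seconds=0.01,
    )


finished = asyncio.run(demo(done_after_polls=3, max_wait_seconds=5))
gave_up = asyncio.run(demo(done_after_polls=10**9, max_wait_seconds=0.05))
```

A larger poll_frequency_seconds means fewer status requests but a longer delay before completion is noticed. max_wait_seconds bounds the total time spent waiting.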

jobs_runs_submit_by_id_and_wait_for_completion <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/integrations/prefect-databricks/prefect_databricks/flows.py#L420" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

python
jobs_runs_submit_by_id_and_wait_for_completion(databricks_credentials: DatabricksCredentials, job_id: int, idempotency_token: Optional[str] = None, jar_params: Optional[List[str]] = None, max_wait_seconds: int = 900, poll_frequency_seconds: int = 10, notebook_params: Optional[Dict] = None, python_params: Optional[List[str]] = None, spark_submit_params: Optional[List[str]] = None, python_named_params: Optional[Dict] = None, pipeline_params: Optional[str] = None, sql_params: Optional[Dict] = None, dbt_commands: Optional[List] = None, job_submission_handler: Optional[Callable] = None, **jobs_runs_submit_kwargs: Dict[str, Any]) -> Dict

Flow that triggers an existing job and waits for its completion.

Args:

  • databricks_credentials: Credentials to use for authentication with Databricks.
  • job_id: The ID of the Databricks job to run.
  • idempotency_token: An optional token (at most 64 characters) guaranteeing the idempotency of the request. If a run with the token already exists, the existing run's ID is returned instead of creating a new run. See the Databricks docs on job idempotency for details.
  • jar_params: A list of command-line parameters for Spark JAR tasks, used to invoke the main class. Cannot be combined with notebook_params, and its JSON representation cannot exceed 10,000 bytes.
  • max_wait_seconds: The maximum number of seconds to wait for the entire flow to complete.
  • poll_frequency_seconds: The number of seconds to wait between checks for run completion.
  • notebook_params: A map of key-value parameters for notebook tasks, accessible through dbutils.widgets.get. Cannot be combined with jar_params, and its JSON representation cannot exceed 10,000 bytes.
  • python_params: A list of command-line parameters for Python tasks. ASCII characters only; its JSON representation cannot exceed 10,000 bytes.
  • spark_submit_params: A list of parameters passed to the spark-submit script. ASCII characters only; its JSON representation cannot exceed 10,000 bytes.
  • python_named_params: A map of named parameters for Python wheel tasks.
  • pipeline_params: Parameters for Delta Live Tables pipeline tasks, such as whether to trigger a full refresh.
  • sql_params: A map of key-value parameters for SQL tasks. SQL alert tasks do not support custom parameters.
  • dbt_commands: A list of dbt commands to run for dbt tasks, for example ["dbt deps", "dbt seed", "dbt run"].
  • job_submission_handler: An optional callable to intercept job submission.
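
Several of the parameters above carry constraints: a 10,000-byte limit on the JSON representation, ASCII-only values, and jar_params being incompatible with notebook_params. A caller could check these before submitting. The sketch below is one way to do that. The helper find_param_violations is hypothetical, and measuring size with json.dumps is an assumption about how the limit is counted.

```python
import json
from typing import Any, Dict, List, Optional

MAX_PARAM_JSON_BYTES = 10_000  # limit stated in the parameter descriptions


def find_param_violations(
    jar_params: Optional[List[str]] = None,
    notebook_params: Optional[Dict[str, Any]] = None,
    python_params: Optional[List[str]] = None,
    spark_submit_params: Optional[List[str]] = None,
) -> List[str]:
    """Return a message for every documented constraint that is broken.

    Hypothetical helper, not part of prefect-databricks.
    """
    violations: List[str] = []
    if jar_params is not None and notebook_params is not None:
        violations.append("jar_params cannot be combined with notebook_params")

    named = {
        "jar_params": jar_params,
        "notebook_params": notebook_params,
        "python_params": python_params,
        "spark_submit_params": spark_submit_params,
    }
    for name, value in named.items():
        if value is None:
            continue
        # Assumption: the limit applies to the UTF-8 encoded JSON text.
        size = len(json.dumps(value).encode("utf-8"))
        if size > MAX_PARAM_JSON_BYTES:
            violations.append(f"{name} is {size} bytes as JSON")

    for name in ("python_params", "spark_submit_params"):
        if any(not item.isascii() for item in named[name] or []):
            violations.append(f"{name} must contain ASCII characters only")
    return violations


ok = find_param_violations(python_params=["--date", "2024-01-01"])
conflict = find_param_violations(jar_params=["a"], notebook_params={"k": "v"})
```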

Raises:

  • DatabricksJobTerminated: Raised when the Databricks job run is terminated with a non-successful result state.
  • DatabricksJobSkipped: Raised when the Databricks job run is skipped.
  • DatabricksJobInternalError: Raised when the Databricks job run encounters an internal error.

Returns:

  • A dictionary containing information about the completed job run.

Examples:

python
import asyncio

from prefect import flow
from prefect_databricks import DatabricksCredentials
from prefect_databricks.flows import (
    jobs_runs_submit_by_id_and_wait_for_completion,
)


@flow
async def submit_existing_job(block_name: str, job_id: int):
    databricks_credentials = await DatabricksCredentials.load(block_name)

    run = await jobs_runs_submit_by_id_and_wait_for_completion(
        databricks_credentials=databricks_credentials, job_id=job_id
    )

    return run


asyncio.run(submit_existing_job(block_name="db-creds", job_id=1234))

Classes

DatabricksJobTerminated <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/integrations/prefect-databricks/prefect_databricks/flows.py#L31" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

Raised when a Databricks jobs run terminates with a non-successful result state.

DatabricksJobSkipped <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/integrations/prefect-databricks/prefect_databricks/flows.py#L35" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

Raised when a Databricks jobs run is skipped.

DatabricksJobInternalError <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/integrations/prefect-databricks/prefect_databricks/flows.py#L39" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

Raised when a Databricks jobs run encounters an internal error.

DatabricksJobRunTimedOut <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/integrations/prefect-databricks/prefect_databricks/flows.py#L43" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

Raised when a Databricks jobs run does not complete within the configured max_wait_seconds.
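
As a rough guide to when each exception applies, the sketch below maps a run state to the name of the exception described above. It assumes the state is a dict shaped like the Databricks Jobs API state object, with life_cycle_state and result_state keys. The function expected_exception_name is hypothetical, and the mapping is inferred from the descriptions on this page rather than taken from the library's code.

```python
from typing import Any, Dict, Optional


def expected_exception_name(state: Dict[str, Any]) -> Optional[str]:
    """Name the exception class that matches a run state, if any.

    Hypothetical helper; the mapping is an assumption based on the
    exception descriptions, not the library's implementation.
    """
    life_cycle_state = state.get("life_cycle_state")
    if life_cycle_state == "TERMINATED":
        # A terminated run is only a failure if it did not succeed.
        if state.get("result_state") == "SUCCESS":
            return None
        return "DatabricksJobTerminated"
    if life_cycle_state == "SKIPPED":
        return "DatabricksJobSkipped"
    if life_cycle_state == "INTERNAL_ERROR":
        return "DatabricksJobInternalError"
    # Any other state (for example PENDING or RUNNING) is not final yet.
    return None


failed = expected_exception_name(
    {"life_cycle_state": "TERMINATED", "result_state": "FAILED"}
)
```

DatabricksJobRunTimedOut is not tied to a run state: it concerns the wait exceeding max_wait_seconds, so it is left out of this mapping.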