docs/integrations/prefect-databricks/api-ref/prefect_databricks-flows.mdx
# prefect_databricks.flows

Module containing flows for interacting with Databricks.
## jobs_runs_submit_and_wait_for_completion <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/integrations/prefect-databricks/prefect_databricks/flows.py#L63" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python
jobs_runs_submit_and_wait_for_completion(
    databricks_credentials: DatabricksCredentials,
    tasks: Optional[List[RunSubmitTaskSettings]] = None,
    run_name: Optional[str] = None,
    max_wait_seconds: int = 900,
    poll_frequency_seconds: int = 10,
    git_source: Optional[GitSource] = None,
    timeout_seconds: Optional[int] = None,
    idempotency_token: Optional[str] = None,
    access_control_list: Optional[List[AccessControlRequest]] = None,
    return_metadata: bool = False,
    job_submission_handler: Optional[Callable] = None,
    **jobs_runs_submit_kwargs: Dict[str, Any],
) -> Union[NotebookOutput, Tuple[NotebookOutput, JobMetadata], None]
```
Flow that triggers a job run and waits for the triggered run to complete.
**Args:**

- `databricks_credentials`: Credentials to use for authentication with Databricks.
- `tasks`: Tasks to run, e.g.:

```python
[
    {
        "task_key": "Sessionize",
        "description": "Extracts session data from events",
        "depends_on": [],
        "existing_cluster_id": "0923-164208-meows279",
        "spark_jar_task": {
            "main_class_name": "com.databricks.Sessionize",
            "parameters": ["--data", "dbfs:/path/to/data.json"],
        },
        "libraries": [{"jar": "dbfs:/mnt/databricks/Sessionize.jar"}],
        "timeout_seconds": 86400,
    },
    {
        "task_key": "Orders_Ingest",
        "description": "Ingests order data",
        "depends_on": [],
        "existing_cluster_id": "0923-164208-meows279",
        "spark_jar_task": {
            "main_class_name": "com.databricks.OrdersIngest",
            "parameters": ["--data", "dbfs:/path/to/order-data.json"],
        },
        "libraries": [{"jar": "dbfs:/mnt/databricks/OrderIngest.jar"}],
        "timeout_seconds": 86400,
    },
    {
        "task_key": "Match",
        "description": "Matches orders with user sessions",
        "depends_on": [
            {"task_key": "Orders_Ingest"},
            {"task_key": "Sessionize"},
        ],
        "new_cluster": {
            "spark_version": "7.3.x-scala2.12",
            "node_type_id": "i3.xlarge",
            "spark_conf": {"spark.speculation": True},
            "aws_attributes": {
                "availability": "SPOT",
                "zone_id": "us-west-2a",
            },
            "autoscale": {"min_workers": 2, "max_workers": 16},
        },
        "notebook_task": {
            "notebook_path": "/Users/user.name@databricks.com/Match",
            "base_parameters": {"name": "John Doe", "age": "35"},
        },
        "timeout_seconds": 86400,
    },
]
```
- `run_name`: An optional name for the run, e.g. `A multitask job run`. The default value is `Untitled`.
- `max_wait_seconds`: Maximum number of seconds to wait for the entire flow to complete.
- `poll_frequency_seconds`: Number of seconds to wait in between checks for run completion.
- `git_source`: This functionality is in Public Preview. An optional specification for a remote repository containing the notebooks used by this job's notebook tasks. Key-values include `git_url` (e.g. `https://github.com/databricks/databricks-cli`), `git_provider` (e.g. `github`), `git_branch` (e.g. `main`), `git_tag` (e.g. `release-1.0.0`), and `git_commit` (e.g. `e0056d01`).
- `timeout_seconds`: An optional timeout applied to each run of this job, e.g. `86400`. The default behavior is to have no timeout.
- `idempotency_token`: An optional token that can be used to guarantee the idempotency of job run requests, e.g. `8f018174-4792-40d5-bcbc-3e6a527352c8`. If a run with the provided token already exists, the request does not create a new run but returns the ID of the existing run instead. If a run with the provided token is deleted, an error is returned. If you specify the idempotency token, upon failure you can retry until the request succeeds. Databricks guarantees that exactly one run is launched with that idempotency token. This token must have at most 64 characters. For more information, see How to ensure idempotency for jobs.
- `access_control_list`: List of permissions to set on the job.
- `return_metadata`: When True, the flow returns a tuple of the notebook output and the job run metadata; by default, only the notebook output is returned.
- `job_submission_handler`: An optional callable to intercept job submission.
- `**jobs_runs_submit_kwargs`: Additional keyword arguments to pass to `jobs_runs_submit`.

**Returns:**

The notebook output of the job run, or a tuple of the notebook output and the job run metadata when `return_metadata=True`.

**Examples:**
Submit jobs runs and wait.
```python
from prefect import flow
from prefect_databricks import DatabricksCredentials
from prefect_databricks.flows import jobs_runs_submit_and_wait_for_completion
from prefect_databricks.models.jobs import (
    AutoScale,
    AwsAttributes,
    JobTaskSettings,
    NewCluster,
    NotebookTask,
)


@flow
async def jobs_runs_submit_and_wait_for_completion_flow(notebook_path, **base_parameters):
    databricks_credentials = await DatabricksCredentials.load("BLOCK_NAME")

    # specify new cluster settings
    aws_attributes = AwsAttributes(
        availability="SPOT",
        zone_id="us-west-2a",
        ebs_volume_type="GENERAL_PURPOSE_SSD",
        ebs_volume_count=3,
        ebs_volume_size=100,
    )
    auto_scale = AutoScale(min_workers=1, max_workers=2)
    new_cluster = NewCluster(
        aws_attributes=aws_attributes,
        autoscale=auto_scale,
        node_type_id="m4.large",
        spark_version="10.4.x-scala2.12",
        spark_conf={"spark.speculation": True},
    )

    # specify notebook to use and parameters to pass
    notebook_task = NotebookTask(
        notebook_path=notebook_path,
        base_parameters=base_parameters,
    )

    # compile job task settings
    job_task_settings = JobTaskSettings(
        new_cluster=new_cluster,
        notebook_task=notebook_task,
        task_key="prefect-task",
    )

    multi_task_runs = await jobs_runs_submit_and_wait_for_completion(
        databricks_credentials=databricks_credentials,
        run_name="prefect-job",
        tasks=[job_task_settings],
    )

    return multi_task_runs
```
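When `return_metadata=True`, the flow returns the job run metadata alongside the notebook output. A minimal sketch (the block name, flow name, and `tasks` value are placeholders):

```python
from prefect import flow
from prefect_databricks import DatabricksCredentials
from prefect_databricks.flows import jobs_runs_submit_and_wait_for_completion


@flow
async def submit_with_metadata_flow(tasks):
    databricks_credentials = await DatabricksCredentials.load("BLOCK_NAME")

    # with return_metadata=True the flow returns a
    # (notebook output, job run metadata) tuple instead of just the output
    notebook_output, run_metadata = await jobs_runs_submit_and_wait_for_completion(
        databricks_credentials=databricks_credentials,
        run_name="prefect-job",
        tasks=tasks,
        return_metadata=True,
    )
    return notebook_output, run_metadata
```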
## jobs_runs_wait_for_completion <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/integrations/prefect-databricks/prefect_databricks/flows.py#L409" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python
jobs_runs_wait_for_completion(
    multi_task_jobs_runs_id: int,
    databricks_credentials: DatabricksCredentials,
    run_name: Optional[str] = None,
    max_wait_seconds: int = 900,
    poll_frequency_seconds: int = 10,
)
```
Flow that waits for a previously triggered job run to complete.
**Args:**

- `run_name`: The name of the jobs runs task.
- `multi_task_jobs_runs_id`: The ID of the jobs runs task to watch.
- `databricks_credentials`: Credentials to use for authentication with Databricks.
- `max_wait_seconds`: Maximum number of seconds to wait for the entire flow to complete.
- `poll_frequency_seconds`: Number of seconds to wait in between checks for run completion.

**Returns:**
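The docstring provides no example for this flow; a minimal sketch, assuming a jobs run was already submitted elsewhere and its run ID is known (the block name and flow name are placeholders):

```python
from prefect import flow
from prefect_databricks import DatabricksCredentials
from prefect_databricks.flows import jobs_runs_wait_for_completion


@flow
async def wait_for_existing_run_flow(multi_task_jobs_runs_id: int):
    databricks_credentials = await DatabricksCredentials.load("BLOCK_NAME")

    # poll the existing run every 30 seconds, for up to 30 minutes
    return await jobs_runs_wait_for_completion(
        multi_task_jobs_runs_id=multi_task_jobs_runs_id,
        databricks_credentials=databricks_credentials,
        run_name="prefect-job",
        max_wait_seconds=1800,
        poll_frequency_seconds=30,
    )
```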
## jobs_runs_submit_by_id_and_wait_for_completion <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/integrations/prefect-databricks/prefect_databricks/flows.py#L498" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python
jobs_runs_submit_by_id_and_wait_for_completion(
    databricks_credentials: DatabricksCredentials,
    job_id: int,
    idempotency_token: Optional[str] = None,
    jar_params: Optional[List[str]] = None,
    max_wait_seconds: int = 900,
    poll_frequency_seconds: int = 10,
    notebook_params: Optional[Dict] = None,
    python_params: Optional[List[str]] = None,
    spark_submit_params: Optional[List[str]] = None,
    python_named_params: Optional[Dict] = None,
    pipeline_params: Optional[str] = None,
    sql_params: Optional[Dict] = None,
    dbt_commands: Optional[List] = None,
    job_submission_handler: Optional[Callable] = None,
    **jobs_runs_submit_kwargs: Dict[str, Any],
) -> Dict
```
Flow that triggers an existing job and waits for its completion.
**Args:**

- `databricks_credentials`: Credentials to use for authentication with Databricks.
- `job_id`: The ID of the Databricks job.
- `idempotency_token`: An optional token that can be used to guarantee the idempotency of job run requests, e.g. `8f018174-4792-40d5-bcbc-3e6a527352c8`. If a run with the provided token already exists, the request does not create a new run but returns the ID of the existing run instead. If a run with the provided token is deleted, an error is returned. If you specify the idempotency token, upon failure you can retry until the request succeeds. Databricks guarantees that exactly one run is launched with that idempotency token. This token must have at most 64 characters. For more information, see How to ensure idempotency for jobs.
- `jar_params`: A list of parameters for jobs with Spark JAR tasks, for example `"jar_params": ["john doe", "35"]`. The parameters are used to invoke the main function of the main class specified in the Spark JAR task. If not specified upon run-now, it defaults to an empty list. `jar_params` cannot be specified in conjunction with `notebook_params`. The JSON representation of this field (for example `{"jar_params": ["john doe", "35"]}`) cannot exceed 10,000 bytes.
- `max_wait_seconds`: Maximum number of seconds to wait for the entire flow to complete.
- `poll_frequency_seconds`: Number of seconds to wait in between checks for run completion.
- `notebook_params`: A map from keys to values for jobs with a notebook task, for example `"notebook_params": {"name": "john doe", "age": "35"}`. The map is passed to the notebook and is accessible through the `dbutils.widgets.get` function. If not specified upon run-now, the triggered run uses the job's base parameters. `notebook_params` cannot be specified in conjunction with `jar_params`. Use Task parameter variables to set parameters containing information about job runs. The JSON representation of this field (for example `{"notebook_params": {"name": "john doe", "age": "35"}}`) cannot exceed 10,000 bytes.
- `python_params`: A list of parameters for jobs with Python tasks, for example `"python_params": ["john doe", "35"]`. The parameters are passed to the Python file as command-line parameters. If specified upon run-now, they overwrite the parameters specified in the job setting. The JSON representation of this field (for example `{"python_params": ["john doe", "35"]}`) cannot exceed 10,000 bytes. Use Task parameter variables to set parameters containing information about job runs. These parameters accept only Latin characters (ASCII character set). Using non-ASCII characters returns an error. Examples of invalid, non-ASCII characters are Chinese, Japanese kanjis, and emojis.
- `spark_submit_params`: A list of parameters for jobs with a spark submit task, for example `"spark_submit_params": ["--class", "org.apache.spark.examples.SparkPi"]`. The parameters are passed to the spark-submit script as command-line parameters. If specified upon run-now, they overwrite the parameters specified in the job setting. The JSON representation of this field (for example `{"spark_submit_params": ["--class", "org.apache.spark.examples.SparkPi"]}`) cannot exceed 10,000 bytes. Use Task parameter variables to set parameters containing information about job runs. These parameters accept only Latin characters (ASCII character set). Using non-ASCII characters returns an error. Examples of invalid, non-ASCII characters are Chinese, Japanese kanjis, and emojis.
- `python_named_params`: A map from keys to values for jobs with a Python wheel task, for example `"python_named_params": {"name": "task", "data": "dbfs:/path/to/data.json"}`.
- `pipeline_params`: If `full_refresh` is set to true, trigger a full refresh on the Delta Live Table, e.g. `"pipeline_params": {"full_refresh": true}`.
- `sql_params`: A map from keys to values for SQL tasks, for example `"sql_params": {"name": "john doe", "age": "35"}`. The SQL alert task does not support custom parameters.
- `dbt_commands`: An array of commands to execute for jobs with the dbt task, for example `"dbt_commands": ["dbt deps", "dbt seed", "dbt run"]`.
- `job_submission_handler`: An optional callable to intercept job submission.

**Raises:**

- `DatabricksJobTerminated`: Raised when the Databricks job run is terminated with a non-successful result state.
- `DatabricksJobSkipped`: Raised when the Databricks job run is skipped.
- `DatabricksJobInternalError`: Raised when the Databricks job run encounters an internal error.

**Returns:**
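A minimal sketch of triggering an existing job by ID with notebook parameters and an idempotency token (the block name, flow name, and parameter values are placeholders):

```python
from prefect import flow
from prefect_databricks import DatabricksCredentials
from prefect_databricks.flows import jobs_runs_submit_by_id_and_wait_for_completion


@flow
async def trigger_existing_job_flow(job_id: int):
    databricks_credentials = await DatabricksCredentials.load("BLOCK_NAME")

    return await jobs_runs_submit_by_id_and_wait_for_completion(
        databricks_credentials=databricks_credentials,
        job_id=job_id,
        # passed to the job's notebook task; readable via dbutils.widgets.get
        notebook_params={"name": "john doe", "age": "35"},
        # retries with the same token will not launch a duplicate run
        idempotency_token="8f018174-4792-40d5-bcbc-3e6a527352c8",
    )
```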
## DatabricksJobTerminated <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/integrations/prefect-databricks/prefect_databricks/flows.py#L31" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

Raised when a Databricks job run terminates with a non-successful result state.

## DatabricksJobSkipped <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/integrations/prefect-databricks/prefect_databricks/flows.py#L35" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

Raised when a Databricks job run is skipped.

## DatabricksJobInternalError <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/integrations/prefect-databricks/prefect_databricks/flows.py#L39" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

Raised when a Databricks job run encounters an internal error.

## DatabricksJobRunTimedOut <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/integrations/prefect-databricks/prefect_databricks/flows.py#L43" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

Raised when a Databricks job run does not complete within the configured maximum wait time (`max_wait_seconds`).
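These exceptions are importable from `prefect_databricks.flows`, so a caller can catch them around any of the flows above. A minimal sketch (the block name, flow name, and handling are placeholders; whether the timeout exception applies here is an assumption based on the `max_wait_seconds` parameter):

```python
from prefect import flow
from prefect_databricks import DatabricksCredentials
from prefect_databricks.flows import (
    DatabricksJobRunTimedOut,
    DatabricksJobTerminated,
    jobs_runs_submit_by_id_and_wait_for_completion,
)


@flow
async def run_job_with_handling_flow(job_id: int):
    databricks_credentials = await DatabricksCredentials.load("BLOCK_NAME")
    try:
        return await jobs_runs_submit_by_id_and_wait_for_completion(
            databricks_credentials=databricks_credentials,
            job_id=job_id,
            max_wait_seconds=1800,  # assumed: times out after 30 minutes
        )
    except DatabricksJobTerminated:
        # the run finished with a non-successful result state; handle, then re-raise
        raise
    except DatabricksJobRunTimedOut:
        # the run did not finish within max_wait_seconds
        raise
```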