docs/content/product/apis-integrations/orchestration-api/airflow.mdx
Apache Airflow is a popular open-source workflow scheduler commonly used for data orchestration. Astro is a fully managed service for Airflow by Astronomer.
This guide demonstrates how to set up Cube and Airflow to work together so that Airflow can push changes from upstream data sources to Cube via the Orchestration API.
In Airflow, pipelines are represented by directed acyclic graphs (DAGs): Python
functions decorated with the `@dag` decorator. DAGs include calls to tasks,
implemented as instances of the `Operator` class. Operators can perform various
kinds of work: poll for some precondition, perform extract-transform-load (ETL)
steps, or trigger external systems like Cube.
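For example, a minimal DAG built with the TaskFlow API looks roughly like this (a sketch for illustration only; the DAG and task names are made up):

```python
from pendulum import datetime

from airflow.decorators import dag, task


# A toy pipeline: the DAG and task names here are made up for illustration
@dag(start_date=datetime(2023, 6, 1), schedule=None, tags=["example"])
def minimal_workflow():
    @task()
    def extract():
        # Pretend to pull a value from an upstream system
        return 42

    @task()
    def report(value):
        print(f"Extracted value: {value}")

    # Passing one task's output to another defines the dependency: extract -> report
    report(extract())


minimal_workflow()
```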
Integration between Cube and Airflow is enabled by the
`airflow-provider-cube` package, which provides the following operators.
`CubeQueryOperator` is used to query Cube via the
`/v1/load` endpoint of the REST API.
It supports the following options:
| Option | Type | Default | Description |
|---|---|---|---|
| `cube_conn_id` | string | `cube_default` | Airflow connection name. |
| `headers` | dict | | HTTP headers to be added to the request. |
| `query` | dict | | Cube query object. |
| `timeout` | int | `30` | Response wait timeout in seconds. |
| `wait` | int | `10` | Interval between API calls in seconds. |
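For reference, a task that sets most of these options explicitly might look like this (a sketch; the `task_id`, `headers` value, and query are placeholders):

```python
from cube_provider.operators.cube import CubeQueryOperator

# Illustrative only: the task id, header, and query values are placeholders
query_orders = CubeQueryOperator(
    task_id="query_orders",
    cube_conn_id="cube_default",
    query={"measures": ["Orders.count"], "dimensions": ["Orders.status"]},
    headers={"X-Request-Source": "airflow"},  # hypothetical extra header
    timeout=30,  # give up if there's no response within 30 seconds
    wait=10,     # poll the API every 10 seconds
)
```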
`CubeBuildOperator` is used to trigger pre-aggregation builds and check their
status via the `/v1/pre-aggregations/jobs` endpoint of
the Orchestration API.
It supports the following options:
| Option | Type | Default | Description |
|---|---|---|---|
| `cube_conn_id` | string | `cube_default` | Airflow connection name. |
| `headers` | dict | | HTTP headers to be added to the request. |
| `selector` | dict | | `/v1/pre-aggregations/jobs` selector. |
| `complete` | bool | `False` | Whether a task should wait for builds to complete or not. |
| `wait` | int | `10` | Interval between API calls in seconds. |
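For reference, a task that triggers builds for a single security context and blocks until they finish might look like this (a sketch; the `task_id` and selector values are placeholders):

```python
from cube_provider.operators.cube import CubeBuildOperator

# Illustrative only: the task id and selector values are placeholders
build_preaggs = CubeBuildOperator(
    task_id="build_preaggs",
    cube_conn_id="cube_default",
    selector={
        "contexts": [{"securityContext": {}}],
        "timezones": ["UTC"],
    },
    complete=True,  # wait for the builds to finish before succeeding
    wait=10,        # poll the jobs endpoint every 10 seconds
)
```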
Make sure you have the Astro CLI installed.
Create a new directory and initialize a new Astro project:
```bash
mkdir cube-astro
cd cube-astro
astro dev init
```
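The generated project includes, among other files, a `dags` subdirectory for your pipelines, a `requirements.txt` file for Python dependencies, and an `airflow_settings.yaml` file for local configuration such as connections; the next steps use all three.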
Add the integration package to `requirements.txt`:

```bash
echo "airflow-provider-cube" >> ./requirements.txt
```
Create an Airflow connection via the web console or by adding the following
contents to the `airflow_settings.yaml` file:

```yaml
airflow:
  connections:
    - conn_id: cube_default
      conn_type: generic
      conn_host: https://awesome-ecom.gcp-us-central1.cubecloudapp.dev
      conn_schema:
      conn_login:
      conn_password: SECRET
      conn_port:
      conn_extra:
        security_context: {}
```
Let's break the options down:
- `cube_default` is used as the Airflow connection name.
- The connection has the `generic` type.
- `conn_host` should be set to the URL of your Cube deployment.
- `conn_password` should be set to the value of the <EnvVar>CUBEJS_API_SECRET</EnvVar> environment variable.
- `conn_extra` should contain a security context (as `security_context`) that will be sent with API requests.

Create a new DAG named `cube_query.py` in the `dags` subdirectory with the
following contents. As you can see, the `CubeQueryOperator` accepts a Cube query
via the `query` option.
```python
from typing import Any
from pendulum import datetime
from airflow.decorators import dag, task
from cube_provider.operators.cube import CubeQueryOperator


@dag(
    start_date=datetime(2023, 6, 1),
    schedule='*/1 * * * *',
    max_active_runs=1,
    concurrency=1,
    default_args={"retries": 1, "cube_conn_id": "cube_default"},
    tags=["cube"],
)
def cube_query_workflow():
    query_op = CubeQueryOperator(
        task_id="query_op",
        query={
            "measures": ["Orders.count"],
            "dimensions": ["Orders.status"]
        }
    )

    @task()
    def print_op(data: Any):
        print(f"Result: {data}")

    print_op(query_op.output)


cube_query_workflow()
```
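Note that `query_op.output` is an XComArg reference: passing it to `print_op` both forwards the query result through XCom and ensures `print_op` runs only after `query_op` succeeds.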
Create a new DAG named `cube_build.py` in the `dags` subdirectory with the
following contents. As you can see, the `CubeBuildOperator` accepts a
pre-aggregation selector via the `selector` option.
```python
from typing import Any
from pendulum import datetime
from airflow.decorators import dag, task
from cube_provider.operators.cube import CubeBuildOperator


@dag(
    start_date=datetime(2023, 6, 1),
    schedule='*/1 * * * *',
    max_active_runs=1,
    concurrency=1,
    default_args={"retries": 1, "cube_conn_id": "cube_default"},
    tags=["cube"],
)
def cube_build_workflow():
    build_op = CubeBuildOperator(
        task_id="build_op",
        selector={
            "contexts": [
                {"securityContext": {}}
            ],
            "timezones": ["UTC"]
        },
        complete=True,
        wait=10,
    )

    @task()
    def print_op(data: Any):
        print(f"Result: {data}")

    print_op(build_op.output)


cube_build_workflow()
```
Pay attention to the `complete` option. When it's set to `True`, the operator
will wait for pre-aggregation builds to complete before allowing downstream
tasks to run.
Now, you can run these DAGs:
```bash
astro run cube_query_workflow
astro run cube_build_workflow
```
Alternatively, you can run Airflow and navigate to the web console at
localhost:8080 (use admin/admin to authenticate):
```bash
astro dev start
```
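`astro dev start` runs a local Airflow environment (webserver, scheduler, triggerer, and a Postgres metadata database) in Docker containers; once it's up, both DAGs will appear in the UI under the `cube` tag.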