docs/content/product/apis-integrations/orchestration-api/dagster.mdx
Dagster is a popular open-source data pipeline orchestrator. Dagster Cloud is a fully managed service for Dagster.
This guide demonstrates how to set up Cube and Dagster to work together so that Dagster can push changes from upstream data sources to Cube via the Orchestration API.
In Dagster, each workflow is represented by jobs, Python functions decorated
with a @job decorator. Jobs include calls to ops, Python functions decorated
with an @op decorator. Ops represent distinct pieces of work executed within a
job. They can perform various tasks: poll for some precondition, perform
extract-transform-load (ETL), or trigger external systems like Cube.
Integration between Cube and Dagster is enabled by the
dagster_cube package.
<InfoBox>

The Cube and Dagster integration package was originally contributed by Olivier Dupuis, founder of discursus.io, for which we're very grateful.

</InfoBox>

The package provides the CubeResource class for interacting with:

- The /v1/load endpoint of the REST API.
- The /v1/pre-aggregations/jobs endpoint of the Orchestration API.

Please refer to the package documentation for details and options reference.
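To make the two endpoints more concrete, the sketch below builds (without sending) the kind of HTTP request such a call boils down to, using only the Python standard library. The `build_cube_request` helper is hypothetical and is not part of `dagster_cube`. The sketch assumes that the API token is passed in the `Authorization` header, and the URL and token shown are placeholders.

```python
import json
from urllib.request import Request


def build_cube_request(instance_url, api_key, endpoint, data):
    """Build (but do not send) a POST request to a Cube API endpoint.

    Hypothetical helper for illustration; not part of dagster_cube.
    """
    # Join the base URL and the endpoint with exactly one slash.
    url = instance_url.rstrip("/") + "/" + endpoint.lstrip("/")
    return Request(
        url,
        data=json.dumps(data).encode("utf-8"),
        headers={
            "Authorization": api_key,  # assumption: token sent as-is
            "Content-Type": "application/json",
        },
        method="POST",
    )


request = build_cube_request(
    instance_url="https://example.cubecloudapp.dev/cubejs-api/v1/",  # placeholder
    api_key="<YOUR_API_TOKEN>",  # placeholder
    endpoint="load",
    data={"query": {"measures": ["Orders.count"]}},
)
print(request.get_method(), request.full_url)
```

Swapping `endpoint="load"` for `endpoint="pre-aggregations/jobs"` would target the Orchestration API instead; only the path and the request body change.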
Install Dagster.
Create a new directory:
mkdir cube-dagster
cd cube-dagster
Install the integration package:
pip install dagster_cube
Create a new file named cube.py with the following contents:
from dagster import asset
from dagster_cube.cube_resource import CubeResource


@asset
def cube_query_workflow():
    my_cube_resource = CubeResource(
        instance_url="https://awesome-ecom.gcp-us-central1.cubecloudapp.dev/cubejs-api/v1/",
        api_key="eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpYXQiOjEwMDAwMDAwMDAsImV4cCI6NTAwMDAwMDAwMH0.OHZOpOBVKr-sCwn8sbZ5UFsqI3uCs6e4omT7P6WVMFw"
    )

    response = my_cube_resource.make_request(
        method="POST",
        endpoint="load",
        data={
            'query': {
                'measures': ['Orders.count'],
                'dimensions': ['Orders.status']
            }
        }
    )

    return response


@asset
def cube_build_workflow():
    my_cube_resource = CubeResource(
        instance_url="https://awesome-ecom.gcp-us-central1.cubecloudapp.dev/cubejs-api/v1/",
        api_key="eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpYXQiOjEwMDAwMDAwMDAsImV4cCI6NTAwMDAwMDAwMH0.OHZOpOBVKr-sCwn8sbZ5UFsqI3uCs6e4omT7P6WVMFw"
    )

    response = my_cube_resource.make_request(
        method="POST",
        endpoint="pre-aggregations/jobs",
        data={
            'action': 'post',
            'selector': {
                'timezones': ['UTC'],
                'contexts': [{'securityContext': {}}]
            }
        }
    )

    return response
As you can see, both assets call the make_request method with a data argument.
For the load endpoint, it carries a Cube query under the query key; for the
pre-aggregations/jobs endpoint, it carries a pre-aggregation selector under the
selector key.
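The two request bodies differ only in their top-level keys. A minimal sketch of helpers that assemble them is shown below; `load_payload` and `build_payload` are hypothetical names introduced for illustration and are not part of `dagster_cube`. The dictionaries they return have the same shape as the `data` arguments used above.

```python
def load_payload(measures, dimensions=()):
    """Body for the load endpoint: a Cube query under the `query` key."""
    return {
        "query": {
            "measures": list(measures),
            "dimensions": list(dimensions),
        }
    }


def build_payload(timezones=("UTC",), security_contexts=({},)):
    """Body for the pre-aggregations/jobs endpoint: a selector under `selector`."""
    return {
        "action": "post",
        "selector": {
            "timezones": list(timezones),
            # One entry per security context to build pre-aggregations for.
            "contexts": [{"securityContext": dict(ctx)} for ctx in security_contexts],
        },
    }


query_body = load_payload(["Orders.count"], ["Orders.status"])
build_body = build_payload()
```

Keeping payload construction in small functions like these makes it easier to reuse the same query or selector across several assets.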
Now, you can load these assets into Dagster:
dagster dev -f cube.py
Navigate to the Dagit UI at localhost:3000 and click <Btn>Materialize all</Btn> to materialize both assets:
<Screenshot src="https://ucarecdn.com/948e700f-92c5-4103-ad27-4c3db1bc9e49/" />