.. Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
.. http://www.apache.org/licenses/LICENSE-2.0
.. Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
.. versionadded:: 2.8
Welcome to the final tutorial in our Airflow series! By now, you've built Dags with Python and the TaskFlow API, passed data with XComs, and chained tasks together into clear, reusable workflows.
In this tutorial we'll take it a step further by introducing the Object Storage API. This API makes it easier to read from and write to cloud storage -- like Amazon S3, Google Cloud Storage (GCS), or Azure Blob Storage -- without having to worry about provider-specific SDKs or low-level credentials management.
We'll walk you through a real-world use case: fetching data, saving it to cloud storage as Parquet, and analyzing it with SQL in DuckDB.
Along the way, we'll highlight the new ObjectStoragePath abstraction, explain how Airflow handles cloud credentials via
connections, and show how this enables portable, cloud-agnostic pipelines.
Many data workflows depend on files -- whether it's raw CSVs, intermediate Parquet files, or model artifacts.
Traditionally, you'd need to write S3-specific or GCS-specific code for this. Now, with ObjectStoragePath, you can
write generic code that works across providers, as long as you've configured the right Airflow connection.
Let's get started!
Before diving in, make sure you have the following:

- DuckDB: ``pip install duckdb``
- The Amazon provider with ``s3fs`` support: ``pip install apache-airflow-providers-amazon[s3fs]``
  (You can substitute your preferred provider by changing the storage URL protocol and installing the relevant provider.)
- pandas: ``pip install pandas``

At the heart of this tutorial is ``ObjectStoragePath``, a new abstraction for handling paths on cloud object stores.
Think of it like pathlib.Path, but for buckets instead of filesystems.

.. exampleinclude:: /../src/airflow/example_dags/tutorial_objectstorage.py
    :language: python
    :start-after: [START create_object_storage_path]
    :end-before: [END create_object_storage_path]

|
The URL syntax is simple: protocol://bucket/path/to/file

- The protocol (like ``s3``, ``gs`` or ``azure``) determines the backend
- The URL can carry a ``conn_id`` in its user part (``protocol://conn_id@bucket/path``), telling Airflow how to authenticate
- If the ``conn_id`` is omitted, Airflow will fall back to the default connection for that backend

You can also provide the ``conn_id`` as a keyword argument for clarity:

.. code-block:: python

    ObjectStoragePath("s3://airflow-tutorial-data/", conn_id="aws_default")

This is especially handy when reusing a path defined elsewhere (like in an Asset), or when the connection isn't baked into the URL. The keyword argument always takes precedence.
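
To make the URL anatomy concrete, here is a minimal sketch that uses only Python's standard library. It is not how Airflow parses paths internally: the helper ``describe_storage_url``, the object key, and the ``my_conn`` connection name are hypothetical, and the precedence rule simply mirrors the behavior described above.

.. code-block:: python

    from urllib.parse import urlsplit


    def describe_storage_url(url, conn_id=None):
        """Split a storage URL into its parts (illustrative helper, not an Airflow API)."""
        parts = urlsplit(url)
        return {
            "protocol": parts.scheme,
            # An explicit keyword argument wins over a conn_id embedded in the URL.
            "conn_id": conn_id or parts.username,
            "bucket": parts.hostname,
            "key": parts.path.lstrip("/"),
        }


    # conn_id embedded in the user part of the URL
    embedded = describe_storage_url("s3://aws_default@airflow-tutorial-data/raw/data.parquet")

    # The keyword argument overrides the embedded conn_id
    overridden = describe_storage_url("s3://aws_default@airflow-tutorial-data/", conn_id="my_conn")

    # No conn_id at all: Airflow would fall back to the backend's default connection
    bare = describe_storage_url("s3://airflow-tutorial-data/")

In this sketch a missing ``conn_id`` is reported as ``None``; in Airflow that case means the default connection for the backend is used.
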

.. tip:: You can safely create an ``ObjectStoragePath`` in your global Dag scope. Connections are resolved only when the
   path is used, not when it's created.

Let's fetch some data and save it to the cloud.

.. exampleinclude:: /../src/airflow/example_dags/tutorial_objectstorage.py
    :language: python
    :start-after: [START get_air_quality_data]
    :end-before: [END get_air_quality_data]

|
Here's what's happening:

- Using ``ObjectStoragePath``, we write the data directly to cloud storage as Parquet

This is a classic TaskFlow pattern. The object key changes each day, allowing us to run this daily and build a dataset over time. We return the final object path to be used in the next task.

Why this is cool: No boto3, no GCS client setup, no credentials juggling. Just simple file semantics that work across storage backends.
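
The daily object key can be sketched with ``pathlib.PurePosixPath`` as a local stand-in for ``ObjectStoragePath``, since both support the ``/`` operator for joining path segments. The base location, the file-name pattern, and the date below are assumptions chosen for illustration.

.. code-block:: python

    from datetime import date
    from pathlib import PurePosixPath

    # Stand-in for an ObjectStoragePath base; the bucket name is hypothetical.
    base = PurePosixPath("airflow-tutorial-data")


    def daily_key(run_date):
        """Build one object key per day so each run writes a new file."""
        return base / f"air_quality_{run_date:%Y%m%d}.parquet"


    key = daily_key(date(2024, 1, 15))
    next_key = daily_key(date(2024, 1, 16))

Because the date is part of the key, consecutive runs never overwrite each other, which is what lets the dataset grow over time.
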
Now let's analyze that data using SQL with DuckDB.

.. exampleinclude:: /../src/airflow/example_dags/tutorial_objectstorage.py
    :language: python
    :start-after: [START analyze]
    :end-before: [END analyze]

|
A few key things to note:

- Both DuckDB and ``ObjectStoragePath`` work with ``fsspec``, which makes it easy to register the object storage backend
- We use ``path.fs`` to grab the right filesystem object and register it with DuckDB

Notice that the function doesn't recreate the path manually -- it gets the full path from the previous task using XCom. This makes the task portable and decoupled from earlier logic.
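
The registration step can be tried in isolation, without any cloud account. The sketch below assumes ``duckdb`` and ``fsspec`` are installed and uses an in-memory ``fsspec`` filesystem as a hypothetical stand-in for ``path.fs``; the file name and sample rows are made up for illustration.

.. code-block:: python

    import os
    import tempfile

    import duckdb
    import fsspec

    # Stand-in for ``path.fs``: an in-memory fsspec filesystem (assumption for this sketch).
    fs = fsspec.filesystem("memory")

    conn = duckdb.connect(database=":memory:")

    # Write a tiny Parquet file locally, then copy its bytes into the fsspec filesystem.
    with tempfile.TemporaryDirectory() as tmp:
        local_file = os.path.join(tmp, "sample.parquet")
        conn.execute(
            "COPY (SELECT * FROM (VALUES (1, 10.5), (2, 20.5)) AS t(station, reading)) "
            f"TO '{local_file}' (FORMAT PARQUET)"
        )
        with open(local_file, "rb") as f:
            fs.pipe("/sample.parquet", f.read())

    # Register the filesystem so DuckDB can resolve ``memory://`` URLs through fsspec.
    conn.register_filesystem(fs)

    row_count = conn.execute(
        "SELECT count(*) FROM read_parquet('memory://sample.parquet')"
    ).fetchone()[0]

With a real ``ObjectStoragePath``, the same two calls apply: register ``path.fs``, then pass the path's URL to ``read_parquet``.
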
Here's the full Dag that ties everything together:

.. exampleinclude:: /../src/airflow/example_dags/tutorial_objectstorage.py
    :language: python
    :start-after: [START tutorial]
    :end-before: [END tutorial]

|
You can trigger this Dag and view it in the Graph View in the Airflow UI. Each task logs its inputs and outputs clearly, and you can inspect returned paths in the XCom tab.
Here are some ways to take this further:

- Use sensors (like ``S3KeySensor``) to wait for files uploaded by external systems

See Also
--------

- :doc:`Managing Connections guide <../authoring-and-scheduling/connections>`
- :doc:`Event-Driven Scheduling framework <../authoring-and-scheduling/event-scheduling>`
- :doc:`TaskFlow API guide <../core-concepts/taskflow>`