Back to Airflow

Java SDK

airflow-core/docs/authoring-and-scheduling/language-sdks/java.rst

3.3.0b220.9 KB
Original Source

.. Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

.. http://www.apache.org/licenses/LICENSE-2.0

.. Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

.. _java-sdk:

Java SDK

|experimental|

The Java SDK lets you implement Airflow task logic in Java, Kotlin, or any other JVM language. The Dag and its scheduling remain in Python; individual tasks delegate to a JVM subprocess that is spawned by :class:~airflow.sdk.coordinators.java.JavaCoordinator for each task instance.

.. contents:: Contents :local: :depth: 2

Prerequisites

  • JRE 17 or later must be available on the Airflow worker nodes.
  • The compiled task JAR(s) and JVM dependencies must be accessible from the worker.
  • The apache-airflow-task-sdk package (installed with Airflow) provides the coordinator; no additional Python packages are needed.

Quick start

The following example shows the minimal moving parts: a Python Dag with two stub tasks, and a Java implementation of those tasks.

Python Dag (the scheduling side)


.. code-block:: python

    from airflow.sdk import dag, task


    @dag
    def sales_pipeline():
        @task.stub(queue="java")
        def extract(): ...

        @task.stub(queue="java")
        def transform(extracted): ...

        @task()
        def load(transformed):
            print(f"Loaded: {transformed}")

        load(transform(extract()))


    sales_pipeline()

Java implementation
~~~~~~~~~~~~~~~~~~~

.. code-block:: java

    import org.apache.airflow.sdk.*;

    @Builder.Dag(id = "sales_pipeline")
    public class SalesPipeline {

      @Builder.Task(id = "extract")
      public long extract(Client client) {
        var conn = client.getConnection("sales_db");
        // ... fetch data using conn.host, conn.login, conn.password ...
        return recordCount;
      }

      @Builder.Task(id = "transform")
      public long transform(
        Client client,
        @Builder.XCom(task = "extract") long recordCount
      ) {
        var threshold = (String) client.getVariable("transform_threshold");
        // ... process data ...
        return transformedCount;
      }
    }

.. note::

  See how both ``transform`` in Python and Java need to have an argument to accept upstream XCom. The
  Python one is needed to declare dependency, and the Java one is needed to actually retrieve the value.

Java entry point
~~~~~~~~~~~~~~~~

.. code-block:: java

    public class Main implements BundleBuilder {
      @Override
      public Iterable<Dag> getDags() {
        return List.of(SalesPipelineBuilder.build());  // SalesPipelineBuilder generated at compile time
      }

      public static void main(String[] args) {
        Server.create(args).serve(new Main().build());
      }
    }

Coordinator configuration
~~~~~~~~~~~~~~~~~~~~~~~~~

.. code-block:: ini

    [sdk]
    coordinators = {
      "java-jdk17": {
        "classpath": "airflow.sdk.coordinators.java.JavaCoordinator",
        "kwargs": {"jars_root": ["/opt/airflow/jars"]}
      }
    }
    queue_to_coordinator = {"java": "java-jdk17"}

See :ref:`java-sdk/coordinator-config` for the full list of accepted ``kwargs``.

Writing tasks
-------------

The Java SDK offers two APIs for implementing tasks. Both produce the same runtime behavior; the choice is a
matter of style.

.. _java-sdk/annotation-api:

Annotation-based API
~~~~~~~~~~~~~~~~~~~~

Annotate a plain Java class and let the SDK generate the boilerplate at compile time.

.. list-table::
   :header-rows: 1
   :widths: 30 70

   * - Annotation
     - Purpose
   * - ``@Builder.Dag(id = "...")``
     - Marks the class as a task container.  The ``id`` must match the ``dag_id`` in the Python Dag.
   * - ``@Builder.Task(id = "...")``
     - Marks a method as a task implementation.  The ``id`` must match the ``@task.stub`` function
       name in the Python Dag.  If ``id`` is omitted the method name is used.
   * - ``@Builder.XCom(task = "...")``
     - Injects the ``return_value`` XCom from the named upstream task as a method parameter.
       The parameter type must be compatible with the stored value (see :ref:`java-sdk/types`).

The annotation processor generates a ``<ClassName>Builder`` class that wires up the task
registry and handles XCom injection automatically.

.. code-block:: java

    @Builder.Dag(id = "my_dag")
    public class MyDag {

      @Builder.Task(id = "fetch")
      public String fetch(Client client) throws Exception {
        var conn = client.getConnection("my_api");
        // implement task logic
        return result;
      }

      @Builder.Task(id = "process")
      public long process(
        Client client,
        @Builder.XCom(task = "fetch") String fetched
      ) {
        var threshold = (String) client.getVariable("process_threshold");
        // implement task logic
        return count;
      }
    }

A task method may declare ``throws Exception``; any uncaught exception causes the task instance to be marked
as failed in Airflow (triggering retries if configured on the stub).

.. _java-sdk/interface-api:

Interface-based API
~~~~~~~~~~~~~~~~~~~

Implement the ``Task`` interface directly for full control over how tasks are registered and how XComs are
read.

.. code-block:: java

    import org.apache.airflow.sdk.*;

    public class FetchTask implements Task {
      @Override
      public void execute(Context context, Client client) throws Exception {
        var conn = client.getConnection("my_api");
        // implement task logic
        client.setXCom(result);
      }
    }

Register tasks manually in a ``BundleBuilder``:

.. code-block:: java

    public class MyBundle implements BundleBuilder {
      @Override
      public Iterable<Dag> getDags() {
        var dag = new Dag("my_dag");
        dag.addTask("fetch", FetchTask.class);
        dag.addTask("process", ProcessTask.class);
        return List.of(dag);
      }
    }

See the Java SDK's published JavaDoc for more details.

.. TODO: (AIP-108) Put a link here once we publish the JavaDoc.

.. _java-sdk/logging:

Logging
-------

Task code can emit log records through any common Java logging framework. The SDK ships optional
integration libraries that forward those records to Airflow's task log store, where they appear
alongside the standard task output in the Airflow UI.

Declare a logger as a static field on the task class, using the class's own type as the name. This
is the conventional pattern regardless of which logging framework you choose:

.. code-block:: java

    private static final System.Logger log =
        System.getLogger(SalesPipeline.class.getName());

    @Builder.Task(id = "extract")
    public long extract(Client client) {
        log.log(System.Logger.Level.INFO, "Starting extraction");
        return recordCount;
    }

The Gradle snippets below show the dependency declarations; all Airflow artifact versions are managed
by ``airflow-sdk-bom``. Maven users apply the same artifact IDs following the pattern in
:ref:`java-sdk/build/maven`.

.. _java-sdk/logging/jpl:

``System.Logger`` (Java Platform Logging)

Java 9's new logging facade java.lang.System.Logger (JEP 264), commonly abbreviated JPL, can be used by libraries without pulling in any third-party API. The airflow-sdk-jpl artifact registers an AirflowSystemLoggerFinder via ServiceLoader, which routes all System.Logger calls directly to Airflow's task log store.

.. code-block:: groovy

implementation("org.apache.airflow:airflow-sdk-jpl:${version}")

No configuration file or startup call is required. The ServiceLoader mechanism discovers the provider automatically as long as the JAR is on the classpath.

.. note::

Do not add a second ``System.LoggerFinder`` implementation alongside
``airflow-sdk-jpl``. The JVM selects one finder via ``ServiceLoader``; having
multiple providers on the classpath leads to unpredictable behaviour.

.. _java-sdk/logging/slf4j:

SLF4J 2.x


The SLF4J binding is discovered automatically via ``ServiceLoader``; no configuration file or
startup call is required.

.. code-block:: groovy

    implementation("org.apache.airflow:airflow-sdk-slf4j:${version}")

The above automatically pulls in the SLF4J API, so you don't need to add ``slf4j-api`` yourself.

.. note::

    Do not add a second SLF4J binding (such as ``logback-classic`` or ``slf4j-simple``) alongside
    ``airflow-sdk-slf4j``. SLF4J 2.x warns about multiple bindings and selects one unpredictably.

.. _java-sdk/logging/log4j2:

Log4j 2
~~~~~~~

``airflow-sdk-log4j2`` declares ``log4j-api`` as a transitive dependency, so you do not need to add the latter
separately. You must also place ``log4j-core`` on the runtime classpath to host the plugin loader that
discovers the custom ``AirflowAppender`` supplied by ``airflow-sdk-log4j2`` at startup:

.. code-block:: groovy

    implementation("org.apache.airflow:airflow-sdk-log4j2:${version}")
    runtimeOnly("org.apache.logging.log4j:log4j-core:${log4jVersion}")

Declare ``AirflowAppender`` in your ``log4j2.xml``:

.. code-block:: xml

    <?xml version="1.0" encoding="UTF-8"?>
    <Configuration>
      <Appenders>
        <AirflowAppender name="Airflow"/>
      </Appenders>
      <Loggers>
        <Root level="info">
          <AppenderRef ref="Airflow"/>
        </Root>
      </Loggers>
    </Configuration>

.. _java-sdk/logging/jul:

``java.util.logging``

Add the artifact:

.. code-block:: groovy

implementation("org.apache.airflow:airflow-sdk-jul:${version}")

and call AirflowJulHandler.install() on startup to attach the handler to the JUL root logger before any task runs:

.. code-block:: java

public static void main(String[] args) {
    AirflowJulHandler.install();
    Server.create(args).serve(new MyBundle());
}

Alternatively, declare the handler in a logging.properties file and point JUL at it with the java.util.logging.config.file system property (set via jvm_args in the coordinator configuration):

.. code-block:: properties

handlers = org.apache.airflow.sdk.jul.AirflowJulHandler

.. code-block:: ini

[sdk]
coordinators = {
  "java-jdk17": {
    "classpath": "airflow.sdk.coordinators.java.JavaCoordinator",
    "kwargs": {
      "jars_root": ["/opt/airflow/jars"],
      "jvm_args": ["-Djava.util.logging.config.file=/opt/airflow/logging.properties"]
    }
  }
}

.. _java-sdk/logging/other:

Other frameworks


Several commonly used logging APIs are covered without a dedicated Airflow artifact:

* **Logback** is itself an SLF4J binding. Replace ``logback-classic`` with ``airflow-sdk-slf4j``
  and no changes are needed in your task code.
* **Apache Commons Logging (JCL)** can be bridged to SLF4J via ``org.slf4j:jcl-over-slf4j`` or
  to Log4j 2 via ``org.apache.logging.log4j:log4j-jcl``.

.. _java-sdk/types:

XCom type mapping
-----------------

XCom values are stored as JSON in Airflow's metadata database.  The table below shows how JSON types are
represented as Java objects when read back via ``getXCom``.

.. list-table::
   :header-rows: 1
   :widths: 30 35 35

   * - Python type
     - JSON
     - Java type (from ``getXCom``)
   * - ``int``
     - number (integer)
     - ``Long`` (for values that fit; ``BigInteger`` otherwise)
   * - ``float``
     - number (decimal)
     - ``Double``
   * - ``str``
     - string
     - ``String``
   * - ``bool``
     - boolean
     - ``Boolean``
   * - ``None``
     - null
     - ``null``
   * - ``list``
     - array
     - ``List<Object>``
   * - ``dict``
     - object
     - ``Map<String, Object>``

.. _java-sdk/build:

Building and packaging
-----------------------

The Java SDK is distributed as a JAR. The sections below show how to build a bundle with Gradle or Maven.

.. _java-sdk/build/gradle:

Gradle
~~~~~~

Apply the Airflow SDK Gradle plugin in your ``build.gradle``:

.. code-block:: groovy

    plugins {
        id("org.apache.airflow.sdk") version "${version}"
    }

    dependencies {
        annotationProcessor("org.apache.airflow:airflow-sdk-processor:${version}")
        implementation("org.apache.airflow:airflow-sdk:${version}")
    }

    airflowBundle {
        mainClass = "com.example.Main"  // Point to your main class instead.
    }

Then run:

.. code-block:: bash

    ./gradlew bundle

The ``build/bundle/`` directory contains all required JAR(s). Copy or mount it into the directory pointed to
by ``jars_root`` in the coordinator configuration. :class:`~airflow.sdk.coordinators.java.JavaCoordinator`
scans ``jars_root`` recursively and builds the classpath automatically.

.. note::

  You only need the ``annotationProcessor`` entry if you use the annotation-based API. It is not needed for
  the interface-based API.

.. note::

  The plugin generates a fat JAR with the `Shadow <https://gradleup.com/shadow/>`__ plugin by default. This is
  generally a good idea since you only deploy one JAR file to avoid dependency issues between projects. If this
  does not suit you, set ``fatJar = false`` in ``airflowBundle`` to produce thin JARs instead. The rest of the
  process stays the same, but you will need to put all dependency JARs somewhere Airflow can find with
  ``jars_root``.

.. _java-sdk/build/maven:

Maven
~~~~~

Import the ``airflow-sdk-bom`` Bill of Materials so that artifact versions and the
``${airflow.supervisor.schema.version}`` property are managed in one place:

.. code-block:: xml

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.apache.airflow</groupId>
                <artifactId>airflow-sdk-bom</artifactId>
                <version>${version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

Add the SDK as a dependency (version is managed by the BOM):

.. code-block:: xml

    <dependencies>
        <dependency>
            <groupId>org.apache.airflow</groupId>
            <artifactId>airflow-sdk</artifactId>
        </dependency>
    </dependencies>

Wire the annotation processor through ``maven-compiler-plugin`` so it stays off the runtime classpath:

.. code-block:: xml

    <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <configuration>
            <annotationProcessorPaths>
                <path>
                    <groupId>org.apache.airflow</groupId>
                    <artifactId>airflow-sdk-processor</artifactId>
                    <version>${version}</version>
                </path>
            </annotationProcessorPaths>
        </configuration>
    </plugin>

**Option 1 (recommended): fat JAR**

Use ``maven-shade-plugin`` to bundle your code and all dependencies into a single JAR. This is the
simplest deployment: one file, no dependency management at runtime.

.. code-block:: xml

    <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>3.6.0</version>
        <executions>
            <execution>
                <phase>package</phase>
                <goals><goal>shade</goal></goals>
                <configuration>
                    <transformers>
                        <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                            <!-- Replace with your BundleBuilder implementation. -->
                            <mainClass>com.example.Main</mainClass>
                            <manifestEntries>
                                <!-- Resolved from the BOM; do not hard-code this value. -->
                                <Airflow-Supervisor-Schema-Version>${airflow.supervisor.schema.version}</Airflow-Supervisor-Schema-Version>
                            </manifestEntries>
                        </transformer>
                    </transformers>
                </configuration>
            </execution>
        </executions>
    </plugin>

Then run:

.. code-block:: bash

    mvn package

The fat JAR is written to ``target/<artifactId>-<version>.jar``. Copy it to the directory configured as
``jars_root`` in your coordinator.

**Option 2: thin JAR with separate dependencies**

If a fat JAR does not suit your project, use ``maven-jar-plugin`` to set ``Main-Class`` on the regular
JAR and ``maven-dependency-plugin`` to collect all runtime dependencies alongside it. Note that
``Airflow-Supervisor-Schema-Version`` does not need to be set here since Airflow reads it directly from the
``airflow-sdk`` JAR on the classpath.

.. code-block:: xml

    <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-jar-plugin</artifactId>
        <configuration>
            <archive>
                <manifestEntries>
                    <Main-Class>com.example.Main</Main-Class>
                </manifestEntries>
            </archive>
        </configuration>
    </plugin>

    <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-dependency-plugin</artifactId>
        <executions>
            <execution>
                <id>copy-dependencies</id>
                <phase>package</phase>
                <goals><goal>copy-dependencies</goal></goals>
                <configuration>
                    <outputDirectory>${project.build.directory}/bundle</outputDirectory>
                    <includeScope>runtime</includeScope>
                </configuration>
            </execution>
            <execution>
                <id>copy-artifact</id>
                <phase>package</phase>
                <goals><goal>copy</goal></goals>
                <configuration>
                    <artifactItems>
                        <artifactItem>
                            <groupId>${project.groupId}</groupId>
                            <artifactId>${project.artifactId}</artifactId>
                            <version>${project.version}</version>
                            <outputDirectory>${project.build.directory}/bundle</outputDirectory>
                        </artifactItem>
                    </artifactItems>
                </configuration>
            </execution>
        </executions>
    </plugin>

Then run:

.. code-block:: bash

    mvn package

``target/bundle/`` will contain the thin JAR and all runtime dependency JARs. Point ``jars_root`` at
this directory.

.. note::

  You only need the ``annotationProcessorPaths`` entry if you use the annotation-based API.

.. note::

  Unlike the Gradle plugin, Maven has no equivalent of the ``verifyBundleMainClass`` validation step.
  A wrong ``<mainClass>`` value will not be caught until runtime.

.. _java-sdk/coordinator-config:

:class:`~airflow.sdk.coordinators.java.JavaCoordinator` configuration
-------------------------------------------------------------------------------

All ``kwargs`` in the ``coordinators`` config entry are passed to the
:class:`~airflow.sdk.coordinators.java.JavaCoordinator` constructor:

.. list-table::
   :header-rows: 1
   :widths: 30 15 55

   * - Parameter
     - Default
     - Description
   * - ``jars_root``
     - *(required)*
     - One or more directories scanned recursively for ``.jar`` files. Accepts a string,
       a path, or a list of strings/paths.
   * - ``java_executable``
     - ``"java"``
     - Path to the ``java`` binary.  Defaults to ``java`` on ``$PATH``.
   * - ``jvm_args``
     - ``[]``
     - Extra JVM arguments such as ``["-Xmx1g", "-Dsome.property=value"]``.
   * - ``main_class``
     - *(auto-detect)*
     - Explicit entry-point class. If omitted,
       :class:`~airflow.sdk.coordinators.java.JavaCoordinator` scans ``jars_root`` for a
       JAR whose manifest sets ``Main-Class``. If multiple executable JARs are found the
       result is non-deterministic; set ``main_class`` explicitly in that case.
   * - ``task_startup_timeout``
     - ``10.0``
     - Seconds to wait for the JVM subprocess to connect after launch.  Increase this if your
       JVM startup is slow (e.g. on constrained hardware or with a large classpath).

.. _java-sdk/limitations:

Limitations
-----------

* **One JVM subprocess per task instance.**  Each task instance spawns a fresh JVM. Tasks that need to share
  in-process state between instances should use XCom or an external store instead.
* **Limited support for assets, deferral, and other Airflow features.** They may be implemented in the future
  based on user feedback and demand.