ADR-0004: DAG Parsing — Language-Specific DAG File Processing

<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. -->

Status

Proposed

Note: This ADR describes language-native DAG file parsing, which was removed from the scope of AIP-108. Per AIP-108, Java tasks are declared as @task.stub in ordinary Python DAG files; the Java SDK and coordinator are responsible only for task execution, not for DAG parsing. This document is retained for future reference if language-native DAG parsing is revisited in a follow-up AIP (likely after AIP-85 stabilises).

Context

Airflow's standard DAG file processor only understands Python files. To support DAGs defined in other languages (Java, Go, Rust, etc.), the pipeline would need an extension point where a language-specific processor can intercept the parsing request, delegate to an external runtime, and return a result in the same format the Airflow scheduler expects.

This ADR details the DAG parsing side of the coordinator architecture described in ADR-0001. It starts with the generic model — the abstract contracts and expected behavior that any language must implement — then walks through Java as a concrete example.

Decision

Extension Point: BaseCoordinator

A single abstract base class — BaseCoordinator — handles both DAG parsing and task execution. Concrete subclasses ship as standalone distributions (apache-airflow-coordinators-<lang>) under the shared airflow.sdk.coordinators namespace package; they are not Airflow providers and are not registered through provider.yaml. For DAG parsing, a subclass must implement two methods:

| Method | Signature | Responsibility |
| --- | --- | --- |
| can_handle_dag_file | (bundle_name, path) -> bool | Return True if this coordinator should handle the given file. Default returns False; subclasses add language-specific checks (e.g., "is this a JAR with a Main-Class?"). |
| dag_parsing_cmd | (dag_file_path, bundle_name, bundle_path, comm_addr, logs_addr) -> list[str] | Return the full command to launch the language runtime. comm_addr and logs_addr are host:port strings the process must connect to. |
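
The two-method contract in the table can be pictured as a small class hierarchy. This is a minimal sketch, not the actual BaseCoordinator: SketchCoordinator, GoCoordinator, and the go-dag-runner command are hypothetical names used only for illustration.

```python
from __future__ import annotations


class SketchCoordinator:
    """Hypothetical stand-in for BaseCoordinator; only the DAG-parsing hooks are shown."""

    def __init__(self, *, name: str) -> None:
        self.name = name

    def can_handle_dag_file(self, bundle_name: str, path: str) -> bool:
        # Default: claim nothing, so the Python parser stays in charge.
        return False

    def dag_parsing_cmd(
        self, *, dag_file_path, bundle_name, bundle_path, comm_addr, logs_addr
    ) -> list[str]:
        raise NotImplementedError


class GoCoordinator(SketchCoordinator):
    """Hypothetical coordinator that claims .go files."""

    def can_handle_dag_file(self, bundle_name: str, path: str) -> bool:
        return str(path).endswith(".go")

    def dag_parsing_cmd(
        self, *, dag_file_path, bundle_name, bundle_path, comm_addr, logs_addr
    ) -> list[str]:
        # "go-dag-runner" is an invented executable name.
        return [
            "go-dag-runner",
            str(dag_file_path),
            f"--comm={comm_addr}",
            f"--logs={logs_addr}",
        ]
```

A subclass that overrides neither method never claims a file, which is what keeps Python-only deployments unaffected.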

Registration

Coordinators are configured in airflow.cfg. See ADR-0001 — Java Coordinator Configuration for a configuration example, and ADR-0005 for the full schema. A single instance entry covers both DAG parsing and task execution — there are no separate registries for the two roles.

Per-host opt-in. A coordinator becomes active on a given DAG processor host only when its module is installed there and its instance appears in [sdk] coordinators. A deployment can run a Python-only DAG processor pool and a separate Java-capable pool by omitting the coordinator config on the Python-only hosts. There is no requirement that every parser carry a JDK.

Discovery: _resolve_processor_target()

When DagFileProcessorProcess.start() needs to parse a file:

_resolve_processor_target(path, bundle_name, bundle_path)
  for entry in conf.get("sdk", "coordinators"):
    coordinator_cls = import_string(entry["classpath"])
    coordinator = coordinator_cls(name=entry["name"], **entry.get("kwargs", {}))
    if coordinator.can_handle_dag_file(bundle_name, path):
      return functools.partial(coordinator.run_dag_parsing, path=..., bundle_name=..., bundle_path=...)
  return None  # fall back to default Python parser

The first coordinator instance whose can_handle_dag_file() returns True wins. If none match, the default Python _parse_file_entrypoint runs. Instances are constructed lazily from [sdk] coordinators and cached for the lifetime of the host process.
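
The first-match rule and the instance cache can be sketched as follows. The sketch assumes config entries carry the name and kwargs keys used in the pseudocode above; the cls key, the JarCoordinator class, and the JDK paths are hypothetical and replace the import_string lookup so the example has nothing external to import.

```python
import functools


class JarCoordinator:
    """Hypothetical coordinator standing in for a real BaseCoordinator subclass."""

    def __init__(self, *, name, java_executable="java"):
        self.name = name
        self.java_executable = java_executable

    def can_handle_dag_file(self, bundle_name, path):
        return str(path).endswith(".jar")

    def run_dag_parsing(self, *, path, bundle_name, bundle_path):
        return f"{self.name} parses {path}"


# Invented entries; order matters because the first match wins.
ENTRIES = [
    {"name": "jdk-11", "cls": JarCoordinator, "kwargs": {"java_executable": "/opt/jdk11/bin/java"}},
    {"name": "jdk-17", "cls": JarCoordinator, "kwargs": {"java_executable": "/opt/jdk17/bin/java"}},
]


@functools.cache
def coordinators():
    """Build every configured instance once and reuse it for the process lifetime."""
    return tuple(e["cls"](name=e["name"], **e.get("kwargs", {})) for e in ENTRIES)


def resolve_processor_target(path, bundle_name, bundle_path):
    for coordinator in coordinators():
        if coordinator.can_handle_dag_file(bundle_name, path):
            return functools.partial(
                coordinator.run_dag_parsing,
                path=path,
                bundle_name=bundle_name,
                bundle_path=bundle_path,
            )
    return None  # caller falls back to the default Python parser
```

Both hypothetical instances accept any .jar file here, so the jdk-11 entry always wins; a real deployment with overlapping coordinators would depend on the order of its entries in the same way.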

Transport: Why msgpack over TCP Loopback

A natural reviewer question is "why a custom-looking framed-msgpack protocol over 127.0.0.1:<random>, and not Unix sockets / gRPC / HTTP REST?" Two clarifications are important:

  1. The protocol is not new for the Java SDK. Length-prefixed msgpack frames are the existing transport between the Airflow supervisor and the Python task runner (see task-sdk/src/airflow/sdk/execution_time/supervisor.py and comms.py). The coordinator bridge wires the language-runtime sockets onto that same byte stream — it does not define a new wire format. Migrating it would be a separate, pan-SDK change.
  2. Forward-compat for IPC messages is treated as a contract, not as a transport choice. The decoder rules that all SDKs must follow are stated in ADR-0002 — IPC Forward-Compatibility Contract.

Alternatives considered

| Option | Why not (today) |
| --- | --- |
| Unix domain sockets instead of TCP loopback | Avoids the IPv6/dual-stack concern with 127.0.0.1, and matches conventions like Docker's /var/run/docker.sock. Worth revisiting once a formal IPC AIP lands; not adopted now because it would diverge from the existing Python supervisor transport, which is also TCP loopback. |
| gRPC / Protocol Buffers | Would require defining an intermediate IDL for DagFileParseRequest, StartupDetails, etc. The internal serialization that the language runtime returns (DagSerialization v3) is not expressible as a flat ProtoBuf without losing information — see "Cross-SDK serialization compatibility" below. gRPC would replace one custom-looking layer with two: ProtoBuf for transport plus a separate JSON-shaped DAG payload nested inside it. |
| HTTP REST | Adds an HTTP server in every language runtime and an HTTP client in the supervisor for a strictly local, single-peer connection. None of HTTP's value (intermediaries, caching, content negotiation) applies. The Java SDK's Supervisor.kt already does HTTP for the Execution API (Edge-worker path); the comm channel between supervisor and language runtime is intentionally lower-level. |
| Keep msgpack-over-TCP (chosen) | Reuses the existing supervisor transport unchanged; the bridge is a pure byte forwarder. New language SDKs only need a length-prefixed-msgpack codec, which exists in every target language. |

A formal AIP for the supervisor-to-runtime comm protocol is expected as a follow-up once two or more language SDKs (Java, Go) are in tree; that AIP is the natural place to revisit transport and framing.

Cross-SDK Serialization Compatibility

The DagFileParsingResult payload that a language runtime returns is the Airflow internal serialized DAG format, not an SDK-defined schema. The authoritative reference is airflow-core/src/airflow/serialization/schema.json, which describes LazyDeserializedDAG (see airflow-core/src/airflow/dag_processing/processor.py and airflow-core/src/airflow/serialization/serialized_objects.py). The scheduler reads this format directly into its internal model — any divergence is a parsing failure.

Why a per-language reimplementation rather than codegen? The first attempt was to generate POJOs from schema.json (similar to how Pydantic models are generated from OpenAPI specs). That approach was abandoned because the generated types miss the wrapping/unwrapping rules that distinguish "decorated" fields (kept as {"__type", "__var"}) from "non-decorated" fields (unwrapped to the bare value), as well as the timetable/task encoding rules listed below. Wiring an extra translation layer on top of generated types added more code than implementing the serializer directly per language.

Compatibility strategy. Each language SDK ships its own serializer plus a cross-SDK validator:

  • A shared test_dags.yaml defines logical fixtures.
  • Python emits serialized_python.json via DagSerialization.serialize_dag().
  • Each language SDK emits serialized_<lang>.json via its own serializer.
  • compare.py does a field-by-field comparison and fails on divergence.

This validator is planned to run as a CI gate (PR #65959). A complementary direction (suggested by reviewers, deferred): publish JSON schemas for the IPC envelope types themselves (DagFileParsingResult, StartupDetails, TaskInstance), which are currently undocumented because they were Python-to-Python only. That work is out of scope for the Java SDK PR but is a sensible next step once a second language SDK is in tree.

What the Base Class Handles Automatically

The matched coordinator's run_dag_parsing() (a concrete method on BaseCoordinator) delegates to _runtime_subprocess_entrypoint(), which handles all the TCP/process plumbing:

  1. Creates two TCP servers on 127.0.0.1 with random ports (comm + logs)
  2. Creates a stderr socketpair
  3. Calls dag_parsing_cmd() to get the command
  4. Spawns the subprocess with stdin=DEVNULL (does NOT inherit fd 0)
  5. Accepts TCP connections from the subprocess
  6. Wraps fd 0 as supervisor_comm via os.dup(0)
  7. Runs _bridge() — a raw byte forwarder between fd 0 and the TCP comm socket
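
Steps 1 and 3 to 5 can be sketched with the standard library alone. This is an illustration under stated assumptions, not the code in _runtime_subprocess_entrypoint(): the child program, the helper names, and the 10-second timeouts are invented, and the stderr socketpair and the bridge itself are left out.

```python
import socket
import subprocess
import sys


def listen_on_loopback():
    """Bind a TCP server to 127.0.0.1 on an OS-assigned port; return (server, "host:port")."""
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.bind(("127.0.0.1", 0))  # port 0 asks the OS for a free port
    server.listen(1)
    host, port = server.getsockname()
    return server, f"{host}:{port}"


# Hypothetical child: connects to both addresses and sends one short label on each.
CHILD = r"""
import socket, sys
for label, addr in zip(("comm", "logs"), sys.argv[1:3]):
    host, port = addr.rsplit(":", 1)
    with socket.create_connection((host, int(port))) as s:
        s.sendall(label.encode())
"""


def launch_and_accept():
    comm_server, comm_addr = listen_on_loopback()
    logs_server, logs_addr = listen_on_loopback()
    # stdin=DEVNULL keeps the child from inheriting the parent's fd 0.
    proc = subprocess.Popen(
        [sys.executable, "-c", CHILD, comm_addr, logs_addr],
        stdin=subprocess.DEVNULL,
    )
    received = {}
    for name, server in (("comm", comm_server), ("logs", logs_server)):
        server.settimeout(10)
        conn, _ = server.accept()
        with conn:
            received[name] = conn.recv(16)
        server.close()
    proc.wait(timeout=10)
    return received, proc.returncode
```

Binding to port 0 is what makes the ports "random": the operating system picks a free one, and the address is handed to the child on its command line.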

Expected E2E Flow

Airflow Dag-Processor
  │
  ▼
DagFileProcessorProcess.start(path, bundle_name, bundle_path)
  │
  ├─ _resolve_processor_target()
  │   └─ iterates instances from [sdk] coordinators (airflow.cfg)
  │   └─ first can_handle_dag_file() == True wins
  │
  ▼
WatchedSubprocess.start(target=coordinator.run_dag_parsing)
  │
  [fork — child process gets fd 0 as Unix domain socket to supervisor]
  │
  ▼ (in child)
<Lang>Coordinator.run_dag_parsing(path, bundle_name, bundle_path)
  │
  ▼
BaseCoordinator._runtime_subprocess_entrypoint(DagParsingInfo)
  │
  ├─ 1. Create TCP comm_server + logs_server on 127.0.0.1:random
  ├─ 2. Create stderr socketpair
  ├─ 3. Call dag_parsing_cmd() → get launch command
  ├─ 4. Popen(cmd, stdin=DEVNULL, stderr=child_stderr)
  ├─ 5. Accept TCP connections from the language runtime
  ├─ 6. supervisor_comm = socket(fileno=os.dup(0))
  └─ 7. _bridge() — raw byte forwarding until process exits
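
The raw byte forwarding in step 7 can be approximated with a selector loop. This is a simplified sketch rather than the actual _bridge(): it stops at the first end-of-stream on either side and omits the drain deadline shown in the message sequence.

```python
import selectors
import socket


def bridge(left: socket.socket, right: socket.socket) -> None:
    """Copy bytes in both directions until either side closes; never decode them."""
    peers = {left: right, right: left}
    sel = selectors.DefaultSelector()
    for sock in peers:
        sel.register(sock, selectors.EVENT_READ)
    try:
        while True:
            for key, _ in sel.select():
                data = key.fileobj.recv(65536)
                if not data:  # end-of-stream on one side ends the bridge
                    return
                peers[key.fileobj].sendall(data)
    finally:
        sel.close()
```

Because the loop never inspects the payload, a new message type on either side passes through without any change to the bridge.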

Expected Message Sequence

Once the bridge is running, the Airflow supervisor and the language runtime communicate directly through the bridge (raw bytes, no re-encoding):

Airflow Supervisor                    Bridge              Language Runtime
      │                                 │                       │
      ├── DagFileParseRequest ──────────┼──────────────────────►│
      │   [4-byte len][msgpack frame]   │  raw byte forward     │
      │                                 │                       │
      │                                 │                       ├── parse DAGs from
      │                                 │                       │   bundle/file
      │                                 │                       │
      │◄── DagFileParsingResult ────────┼───────────────────────┤
      │   [4-byte len][msgpack frame]   │  raw byte forward     │
      │                                 │                       │
      │                                 │                       └── exit(0)
      │                                 │
      │                                 └── drain remaining bytes (5s deadline)
      │                                     close all sockets
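
The [4-byte len][msgpack frame] layout in the diagram can be handled with a few lines of framing code. This sketch treats the payload as opaque bytes (in the real protocol it would be msgpack) and assumes the length prefix is an unsigned big-endian integer, which this document does not state.

```python
import struct

# Assumption: the 4-byte length prefix is unsigned and big-endian.
HEADER = struct.Struct(">I")


def encode_frame(payload: bytes) -> bytes:
    """Prefix a payload with its length."""
    return HEADER.pack(len(payload)) + payload


def decode_frames(buffer: bytes):
    """Return (complete_payloads, leftover_bytes) from a receive buffer."""
    frames = []
    offset = 0
    while len(buffer) - offset >= HEADER.size:
        (length,) = HEADER.unpack_from(buffer, offset)
        end = offset + HEADER.size + length
        if end > len(buffer):
            break  # frame is incomplete; wait for more bytes
        frames.append(buffer[offset + HEADER.size:end])
        offset = end
    return frames, buffer[offset:]
```

Returning the leftover bytes lets a reader append the next chunk from the socket and call decode_frames() again, which is how a TCP stream with arbitrary chunk boundaries is usually handled.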

DagFileParsingResult Format

The language runtime must produce a DagFileParsingResult that matches Python Airflow's DagSerialization format exactly. The Airflow scheduler deserializes this into its internal model — any divergence causes parsing failures.

Envelope:

{
  "type": "DagFileParsingResult",
  "fileloc": "<source file path>",
  "serialized_dags": [
    {
      "data": {
        "__version": 3,
        "dag": { <serialized DAG> }
      }
    },
    ...
  ]
}
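
As a minimal sketch, the envelope can be assembled from already-serialized DAG dicts like this. The function name is hypothetical; the keys are the ones shown above.

```python
def wrap_parsing_result(fileloc, serialized_dags):
    """Wrap serialized DAG dicts in the DagFileParsingResult envelope."""
    return {
        "type": "DagFileParsingResult",
        "fileloc": fileloc,
        "serialized_dags": [
            {"data": {"__version": 3, "dag": dag}} for dag in serialized_dags
        ],
    }
```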

Serialized DAG structure (version 3):

| Field | Type | Required | Description |
| --- | --- | --- | --- |
| dag_id | string | yes | Unique identifier |
| fileloc | string | yes | Source file path (can be empty) |
| relative_fileloc | string | yes | Relative source path (can be empty) |
| timezone | string | yes | Always "UTC" |
| timetable | {__type, __var} | yes | Schedule timetable (see below) |
| tasks | list | yes | Serialized task list |
| dag_dependencies | list | yes | Empty list for non-Python DAGs |
| task_group | map | yes | Flat root task group |
| edge_info | map | yes | Empty map |
| params | list | yes | DAG-level parameters |
| description | string | if set | |
| start_date | float (epoch) | if set | Unwrapped from __type/__var |
| end_date | float (epoch) | if set | Unwrapped from __type/__var |
| tags | list | if non-empty | Unwrapped from __type/__var |
| catchup | bool | if true | |
| max_active_tasks | int | if non-default | |
| max_active_runs | int | if non-default | |
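
A serializer can guard against missing required fields before sending a result. The sketch below takes the required set from the table; the helper names are hypothetical, and the empty task_group map is only a placeholder because the shape of the root task group is not specified here.

```python
REQUIRED_FIELDS = (
    "dag_id", "fileloc", "relative_fileloc", "timezone", "timetable",
    "tasks", "dag_dependencies", "task_group", "edge_info", "params",
)


def missing_required(dag: dict) -> list:
    """Return the required field names that are absent, in table order."""
    return [name for name in REQUIRED_FIELDS if name not in dag]


def minimal_dag(dag_id: str) -> dict:
    """Smallest dict that carries every required field (placeholder values)."""
    return {
        "dag_id": dag_id,
        "fileloc": "",
        "relative_fileloc": "",
        "timezone": "UTC",
        "timetable": {"__type": "airflow.timetables.simple.NullTimetable", "__var": {}},
        "tasks": [],
        "dag_dependencies": [],
        "task_group": {},  # placeholder; the real root task group has more structure
        "edge_info": {},
        "params": [],
    }
```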

Timetable encoding:

| Schedule | __type | __var |
| --- | --- | --- |
| null | airflow.timetables.simple.NullTimetable | {} |
| @once | airflow.timetables.simple.OnceTimetable | {} |
| @continuous | airflow.timetables.simple.ContinuousTimetable | {} |
| cron expr | airflow.timetables.trigger.CronTriggerTimetable | {expression, timezone, interval, run_immediately} |
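
The table maps directly onto a small lookup. In this sketch the function name and the default values for timezone, interval, and run_immediately are assumptions; the table names the keys but not their defaults or value encodings. Anything that is not one of the three presets is passed through as a cron expression.

```python
PRESETS = {
    None: "airflow.timetables.simple.NullTimetable",
    "@once": "airflow.timetables.simple.OnceTimetable",
    "@continuous": "airflow.timetables.simple.ContinuousTimetable",
}


def encode_timetable(schedule, *, timezone="UTC", interval=0.0, run_immediately=False):
    """Encode a schedule as a {__type, __var} timetable dict."""
    if schedule in PRESETS:
        return {"__type": PRESETS[schedule], "__var": {}}
    return {
        "__type": "airflow.timetables.trigger.CronTriggerTimetable",
        "__var": {
            "expression": schedule,
            "timezone": timezone,
            "interval": interval,  # assumed default
            "run_immediately": run_immediately,  # assumed default
        },
    }
```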

Task encoding:

{
  "__type": "operator",
  "__var": {
    "task_id": "<id>",
    "task_type": "<class simple name>",
    "_task_module": "<fully qualified package>",
    "downstream_task_ids": ["<sorted dependent ids>"]  // only if non-empty
  }
}
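
A sketch of the task encoding, assuming the task class is identified by a dotted, fully qualified name such as com.example.Extract (a hypothetical class). The function name is illustrative.

```python
def encode_task(task_id, task_class_path, downstream=()):
    """Encode one task as {"__type": "operator", "__var": {...}}."""
    module, _, simple_name = task_class_path.rpartition(".")
    var = {
        "task_id": task_id,
        "task_type": simple_name,
        "_task_module": module,
    }
    if downstream:
        # The key is omitted entirely when there are no downstream tasks.
        var["downstream_task_ids"] = sorted(downstream)
    return {"__type": "operator", "__var": var}
```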

Value type encoding (for complex fields):

| Type | Encoding |
| --- | --- |
| datetime | {"__type": "datetime", "__var": <epoch_seconds_float>} |
| timedelta | {"__type": "timedelta", "__var": <total_seconds_float>} |
| dict | {"__type": "dict", "__var": {k: serialize(v), ...}} |
| set | {"__type": "set", "__var": [sorted_items]} |
| list | [serialize(item), ...] (no wrapper) |
| primitives | pass through unchanged |
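
The encoding rules in the table can be written as one recursive function. This is a sketch of the table only, not of Python's full serializer, which handles more types; it assumes timezone-aware datetimes and sets of mutually comparable primitives.

```python
from datetime import datetime, timedelta, timezone


def serialize(value):
    """Apply the value encoding rules from the table, recursively."""
    if isinstance(value, datetime):
        # Use timezone-aware datetimes; naive ones are read as local time.
        return {"__type": "datetime", "__var": value.timestamp()}
    if isinstance(value, timedelta):
        return {"__type": "timedelta", "__var": value.total_seconds()}
    if isinstance(value, dict):
        return {"__type": "dict", "__var": {k: serialize(v) for k, v in value.items()}}
    if isinstance(value, (set, frozenset)):
        return {"__type": "set", "__var": [serialize(v) for v in sorted(value)]}
    if isinstance(value, list):
        return [serialize(v) for v in value]  # lists carry no wrapper
    return value  # primitives pass through unchanged
```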

Non-decorated vs decorated fields: Some fields (like start_date, end_date, tags) are "non-decorated" — they are serialized with __type/__var wrapping but then unwrapped to just the __var value before inclusion in the DAG dict. Other fields (like default_args, access_control) are "decorated" — they keep the __type/__var wrapper. This matches Python's serialize_to_json behavior.
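
The unwrapping step can be sketched as below. The set of non-decorated names contains only the three examples named above and may be incomplete; the function name is hypothetical.

```python
# Only the examples named in the text; the real list may be longer.
NON_DECORATED = {"start_date", "end_date", "tags"}


def place_field(dag_dict: dict, name: str, encoded) -> None:
    """Store an encoded value, unwrapping it first if the field is non-decorated."""
    if name in NON_DECORATED and isinstance(encoded, dict) and "__var" in encoded:
        dag_dict[name] = encoded["__var"]  # keep only the bare value
    else:
        dag_dict[name] = encoded  # decorated fields keep their wrapper
```

For instance, a tags value encoded as a set ends up in the DAG dict as a plain sorted list, while default_args keeps its {"__type": "dict", ...} wrapper.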

What a Language Provider Must Implement

For DAG parsing, a new language provider (a coordinator distribution, not an Airflow provider package) needs:

  1. A BaseCoordinator subclass with:

    • can_handle_dag_file() — language-specific file detection (e.g., "is this a JAR?", "is this a .go file?")
    • dag_parsing_cmd() — returns the command to launch the runtime
  2. A runtime process that:

    • Accepts --comm=host:port and --logs=host:port CLI arguments
    • Connects to both TCP addresses
    • Reads a DagFileParseRequest msgpack frame from the comm channel
    • Parses the DAGs from the bundle
    • Serializes the result to DagSerialization v3 format
    • Sends back a DagFileParsingResult msgpack frame
    • Exits
  3. Registration as an entry in [sdk] coordinators in airflow.cfg, pointing classpath at the importable subclass under airflow.sdk.coordinators.<lang>
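
On the runtime side, the first two obligations (accept the two CLI arguments, then connect) start with argument parsing. The sketch below is written in Python for illustration, although a real runtime would be in the target language; the function names are hypothetical.

```python
import argparse


def parse_addr(value: str):
    """Split "host:port" into (host, port), rejecting malformed input."""
    host, _, port = value.rpartition(":")
    if not host or not port.isdigit():
        raise argparse.ArgumentTypeError(f"expected host:port, got {value!r}")
    return host, int(port)


def parse_runtime_args(argv):
    """Parse the --comm and --logs arguments passed by the coordinator."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--comm", type=parse_addr, required=True)
    parser.add_argument("--logs", type=parse_addr, required=True)
    return parser.parse_args(argv)
```

The resulting tuples can be passed straight to socket.create_connection() to open the comm and logs channels.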

Java as a Concrete Example

JavaCoordinator:

The Java SDK implements all DAG-parsing contracts in a single BaseCoordinator subclass shipped as apache-airflow-coordinators-java:

```python
# Distribution: apache-airflow-coordinators-java
# Module: airflow.sdk.coordinators.java.coordinator
import contextlib
from pathlib import Path

# BaseCoordinator and find_main_class are provided elsewhere in the SDK (imports omitted).


class JavaCoordinator(BaseCoordinator):
    def __init__(self, *, name, java_executable="java", jvm_args=None, jdk_home=None):
        self.name = name
        self.java_executable = java_executable
        self.jvm_args = list(jvm_args or [])
        self.jdk_home = jdk_home

    def can_handle_dag_file(self, bundle_name, path) -> bool:
        # Claim the file only when it is a JAR with a Main-Class manifest entry.
        # A missing file is swallowed and falls through to the final return.
        with contextlib.suppress(FileNotFoundError):
            return find_main_class(Path(path)) is not None
        return False

    def dag_parsing_cmd(self, *, dag_file_path, bundle_name, bundle_path, comm_addr, logs_addr):
        # Only called for files that can_handle_dag_file() has already accepted.
        main_class = find_main_class(Path(dag_file_path))
        return [
            self.java_executable,
            *self.jvm_args,
            "-classpath",
            f"{bundle_path}/*",  # wildcard: every JAR in the bundle directory
            main_class,
            f"--comm={comm_addr}",
            f"--logs={logs_addr}",
        ]
```

can_handle_dag_file() checks that the file is a JAR with a Main-Class in its manifest. This ensures the coordinator only claims files it can actually handle.
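
The manifest check can be sketched with the standard library. This is not the SDK's find_main_class, whose implementation is not shown in this document; the function name below is hypothetical, and a missing file is left to raise FileNotFoundError, which the coordinator above already suppresses.

```python
import zipfile
from pathlib import Path


def find_main_class_sketch(jar_path: Path):
    """Return the Main-Class value from a JAR manifest, or None if absent."""
    try:
        with zipfile.ZipFile(jar_path) as jar:
            manifest = jar.read("META-INF/MANIFEST.MF").decode("utf-8")
    except (zipfile.BadZipFile, KeyError):
        return None  # not a ZIP archive, or no manifest entry
    # Manifest continuation lines start with a single space; rejoin them first.
    unfolded = manifest.replace("\r\n", "\n").replace("\n ", "")
    for line in unfolded.split("\n"):
        name, sep, value = line.partition(":")
        if sep and name.strip().lower() == "main-class":
            return value.strip() or None
    return None
```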

The classpath is <bundle_path>/* — a wildcard that includes all JARs in the directory (the application JAR plus its dependencies). The java_executable and jvm_args come from the per-instance kwargs declared in [sdk] coordinators, so multiple instances (e.g., jdk-11, jdk-17) can launch different JVMs with different flags from the same class.

No separate JavaDagFileProcessor class is needed — BaseCoordinator consolidates file detection, DAG parsing, and task execution into a single extension point.

Java SDK Bundle Process:

The Java bundle process (Server.kt) starts, connects to both TCP servers, and enters CoordinatorComm.startProcessing(). When it receives a DagFileParseRequest:

CoordinatorComm.handleIncoming(frame)
  │
  ├── frame.body is DagFileParseRequest
  │     file: String  ← the path from the request
  │
  ▼
DagParser(request.file).parse(bundle)
  │
  ├── Returns DagParsingResult(fileloc=file, dags=bundle.dags)
  │   The DAGs were already loaded into the Bundle at startup
  │   via BundleBuilder.getDags()
  │
  ▼
sendMessage(frame.id, result)
  │
  ├── CoordinatorComm.encode(OutgoingFrame(id, result))
  │     ├── detects DagParsingResult type
  │     └── calls result.serialize()  ← Serde.kt
  │
  ├── DagParsingResult.serialize()
  │     ├── Wraps each DAG: {"data": {"__version": 3, "dag": dag.serialize(id)}}
  │     ├── Dag.serialize() produces the full v3 format:
  │     │     timetable, tasks, task_group, params, optional fields...
  │     ├── Task.serialize() wraps as {"__type": "operator", "__var": {...}}
  │     └── serializeValue() handles datetime/timedelta/dict/set encoding
  │
  ├── TaskSdkFrames.encodeRequest(id, serializedMap)
  │     ├── Converts map to msgpack: [id, body]
  │     └── Returns byte array
  │
  └── Writes [4-byte length prefix][msgpack payload] to comm channel

shutDownRequested = true  ← one-shot, process will exit

Java SDK BundleBuilder Interface:

Bundle authors implement BundleBuilder to define their DAGs:

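```java
import java.util.List;

// Dag, BundleBuilder, Server, and the task classes come from the Java SDK
// and the bundle itself (imports omitted).
public class ExampleBundleBuilder implements BundleBuilder {
    @Override
    public List<Dag> getDags() {
        var dag = new Dag("java_example", null, "@daily");
        // addTask(id, taskClass, dependsOn): the third argument lists upstream task ids.
        dag.addTask("extract", Extract.class, List.of());
        dag.addTask("transform", Transform.class, List.of("extract"));
        dag.addTask("load", Load.class, List.of("transform"));
        return List.of(dag);
    }

    public static void main(String[] args) {
        // args carries the --comm and --logs addresses passed by the coordinator.
        var bundle = new ExampleBundleBuilder().build();
        Server.create(args).serve(bundle);
    }
}
```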

The Dag class provides a fluent API:

  • dagId, description, schedule (cron or preset), startDate, endDate, and all standard Airflow DAG parameters
  • addTask(id, taskClass, dependsOn) — registers a task and its upstream dependencies
  • Dependencies are stored as a dependants map (parent → set of children), serialized as downstream_task_ids
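
The inversion from upstream declarations to a dependants map can be sketched as follows. The sketch is in Python for illustration only (the SDK does this in Kotlin/Java), and the function name is hypothetical.

```python
from collections import defaultdict


def build_dependants(tasks):
    """Invert (task_id, depends_on) pairs into {parent: sorted child ids}."""
    dependants = defaultdict(set)
    for task_id, depends_on in tasks:
        for parent in depends_on:
            dependants[parent].add(task_id)
    # Sorting gives the stable order expected in downstream_task_ids.
    return {parent: sorted(children) for parent, children in dependants.items()}
```

Applied to the extract, transform, load example, extract maps to ["transform"] and transform maps to ["load"]; load has no entry, so its downstream_task_ids key is omitted.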

Java SDK Serialization Compatibility:

The serialization in Serde.kt is validated against Python's output:

```bash
# 1. Java generates serialized output
./gradlew sdk:test
# → writes validation/serialization/serialized_java.json

# 2. Python generates the same DAGs
uv run validation/serialization/serialize_python.py \
    validation/serialization/test_dags.yaml \
    validation/serialization/serialized_python.json

# 3. Field-by-field comparison
uv run validation/serialization/compare.py \
    validation/serialization/serialized_python.json \
    validation/serialization/serialized_java.json
```

Both runs share the test cases defined in test_dags.yaml, so compare.py can check that the Java SDK's output matches Python's DagSerialization.serialize_dag() field for field for the same inputs.
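
A field-by-field comparison of two parsed JSON documents can be sketched as a recursive diff. This is not the contents of compare.py, which is not shown in this document; the function name and the path notation are hypothetical.

```python
def diff(expected, actual, path="$"):
    """Return a list of human-readable differences between two JSON values."""
    if type(expected) is not type(actual):
        return [f"{path}: type {type(expected).__name__} != {type(actual).__name__}"]
    if isinstance(expected, dict):
        problems = []
        for key in sorted(expected.keys() | actual.keys()):
            if key not in actual:
                problems.append(f"{path}.{key}: missing in actual")
            elif key not in expected:
                problems.append(f"{path}.{key}: unexpected in actual")
            else:
                problems.extend(diff(expected[key], actual[key], f"{path}.{key}"))
        return problems
    if isinstance(expected, list):
        if len(expected) != len(actual):
            return [f"{path}: length {len(expected)} != {len(actual)}"]
        return [
            problem
            for index, (e, a) in enumerate(zip(expected, actual))
            for problem in diff(e, a, f"{path}[{index}]")
        ]
    return [] if expected == actual else [f"{path}: {expected!r} != {actual!r}"]
```

The strict type check means an integer 1 and a float 1.0 are reported as different, which is the conservative choice when a downstream deserializer may care about the distinction.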

Consequences

  • The DAG file processor can be extended to any language without modifying Airflow Core — only a BaseCoordinator subclass distributed as apache-airflow-coordinators-<lang> plus an entry in [sdk] coordinators is needed.
  • The language runtime must produce exact DagSerialization v3 JSON, requiring cross-language validation infrastructure (e.g., test_dags.yaml + compare.py).
  • The base class absorbs all TCP/process plumbing, so language providers only implement two methods for DAG parsing.
  • The subprocess bridge adds latency and a process boundary; DAG parsing for non-Python files is inherently slower than in-process Python parsing.