java-sdk/adr/0004-dag-parsing.md
Status: Proposed
Note: This ADR describes language-native DAG file parsing, which was removed from the scope of AIP-108. Per AIP-108, Java tasks are declared as @task.stub in ordinary Python DAG files; the Java SDK and coordinator are responsible only for task execution, not for DAG parsing. This document is retained for future reference if language-native DAG parsing is revisited in a follow-up AIP (likely after AIP-85 stabilises).
Airflow's standard DAG file processor only understands Python files. To support DAGs defined in other languages (Java, Go, Rust, etc.), the pipeline would need an extension point where a language-specific processor can intercept the parsing request, delegate to an external runtime, and return a result in the same format the Airflow scheduler expects.
This ADR details the DAG parsing side of the coordinator architecture described in ADR-0001. It starts with the generic model — the abstract contracts and expected behavior that any language must implement — then walks through Java as a concrete example.
BaseCoordinator:

A single abstract base class, BaseCoordinator, handles both DAG parsing and task execution. Concrete subclasses ship as standalone distributions (apache-airflow-coordinators-<lang>) under the shared airflow.sdk.coordinators namespace package; they are not Airflow providers and are not registered through provider.yaml. For DAG parsing, a subclass must implement two methods:
| Method | Signature | Responsibility |
|---|---|---|
| can_handle_dag_file | (bundle_name, path) -> bool | Return True if this coordinator should handle the given file. Default returns False; subclasses add language-specific checks (e.g., "is this a JAR with a Main-Class?"). |
| dag_parsing_cmd | (dag_file_path, bundle_name, bundle_path, comm_addr, logs_addr) -> list[str] | Return the full command to launch the language runtime. comm_addr and logs_addr are host:port strings the process must connect to. |
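The sketch below models only the DAG-parsing half of BaseCoordinator as a Python abstract base class. It is a simplification, not the real class. The task-execution methods and the concrete run_dag_parsing() are omitted. EchoCoordinator and its .echo extension are hypothetical examples.

```python
from abc import ABC, abstractmethod


class BaseCoordinator(ABC):
    """Simplified sketch of the DAG-parsing contract (task execution omitted)."""

    def __init__(self, *, name: str) -> None:
        # Instance name as declared in [sdk] coordinators.
        self.name = name

    def can_handle_dag_file(self, bundle_name: str, path: str) -> bool:
        # Default: claim nothing, so the Python parser stays in charge.
        return False

    @abstractmethod
    def dag_parsing_cmd(
        self,
        *,
        dag_file_path: str,
        bundle_name: str,
        bundle_path: str,
        comm_addr: str,
        logs_addr: str,
    ) -> list[str]:
        """Return the argv that launches the language runtime."""


class EchoCoordinator(BaseCoordinator):
    """Hypothetical coordinator that claims *.echo files."""

    def can_handle_dag_file(self, bundle_name: str, path: str) -> bool:
        return path.endswith(".echo")

    def dag_parsing_cmd(self, *, dag_file_path, bundle_name, bundle_path, comm_addr, logs_addr):
        # The runtime must connect back to both host:port addresses.
        return ["echo-runtime", dag_file_path, f"--comm={comm_addr}", f"--logs={logs_addr}"]
```

Because dag_parsing_cmd() is abstract, a subclass that forgets it fails at instantiation rather than at parse time.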
Coordinators are configured in airflow.cfg. See
ADR-0001 — Java Coordinator Configuration
for a configuration example, and ADR-0005 for the full
schema. A single instance entry covers both DAG parsing and task execution — there are no
separate registries for the two roles.
Per-host opt-in. A coordinator becomes active on a given DAG processor host only when
its module is installed there and its instance appears in [sdk] coordinators. A deployment
can run a Python-only DAG processor pool and a separate Java-capable pool by omitting the
coordinator config on the Python-only hosts. There is no requirement that every parser carry
a JDK.
_resolve_processor_target():

When DagFileProcessorProcess.start() needs to parse a file:
```python
def _resolve_processor_target(path, bundle_name, bundle_path):
    # conf.get() returns a string; getjson() parses the option into a list of entries
    for entry in conf.getjson("sdk", "coordinators", fallback=[]):
        coordinator_cls = import_string(entry["classpath"])
        coordinator = coordinator_cls(name=entry["name"], **entry.get("kwargs", {}))
        if coordinator.can_handle_dag_file(bundle_name, path):
            return functools.partial(coordinator.run_dag_parsing, path=..., bundle_name=..., bundle_path=...)
    return None  # fall back to default Python parser
```
The first coordinator instance whose can_handle_dag_file() returns True wins. If none match, the default Python _parse_file_entrypoint runs. Instances are constructed lazily from [sdk] coordinators and cached for the lifetime of the host process.
A natural reviewer question is "why a custom-looking framed-msgpack protocol over 127.0.0.1:<random>, and not Unix sockets / gRPC / HTTP REST?" Two clarifications are important:
task-sdk/src/airflow/sdk/execution_time/supervisor.py and comms.py). The coordinator bridge wires the language-runtime sockets onto that same byte stream; it does not define a new wire format. Migrating it would be a separate, pan-SDK change.

| Option | Why not (today) |
|---|---|
| Unix domain sockets instead of TCP loopback | Avoids the IPv6/dual-stack concern with 127.0.0.1, and matches conventions like Docker's /var/run/docker.sock. Worth revisiting once a formal IPC AIP lands; not adopted now because it would diverge from the existing Python supervisor transport, which is also TCP loopback. |
| gRPC / Protocol Buffers | Would require defining an intermediate IDL for DagFileParseRequest, StartupDetails, etc. The internal serialization that the language runtime returns (DagSerialization v3) is not expressible as a flat ProtoBuf without losing information — see "Cross-SDK serialization compatibility" below. gRPC would replace one custom-looking layer with two: ProtoBuf for transport plus a separate JSON-shaped DAG payload nested inside it. |
| HTTP REST | Adds an HTTP server in every language runtime and an HTTP client in the supervisor for a strictly local, single-peer connection. None of HTTP's value (intermediaries, caching, content negotiation) applies. The Java SDK's Supervisor.kt already does HTTP for the Execution API (Edge-worker path); the comm channel between supervisor and language runtime is intentionally lower-level. |
| Keep msgpack-over-TCP (chosen) | Reuses the existing supervisor transport unchanged; the bridge is a pure byte forwarder. New language SDKs only need a length-prefixed-msgpack codec, which exists in every target language. |
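A new SDK only has to supply a length-prefixed msgpack codec. The sketch below covers the framing half and takes payloads that a msgpack library has already encoded. It assumes a 4-byte unsigned big-endian length prefix, which should be confirmed against comms.py. FrameDecoder is a hypothetical helper. It exists because TCP can split one frame across several reads, or merge several frames into one read.

```python
import struct

# Assumption: 4-byte unsigned big-endian length prefix; verify against comms.py.
_HEADER = struct.Struct(">I")


def encode_frame(payload: bytes) -> bytes:
    """Prefix an already msgpack-encoded payload with its length."""
    return _HEADER.pack(len(payload)) + payload


class FrameDecoder:
    """Reassemble complete frames from arbitrarily chunked bytes."""

    def __init__(self) -> None:
        self._buf = bytearray()

    def feed(self, chunk: bytes) -> list[bytes]:
        """Buffer a chunk and return every frame it completes (possibly none)."""
        self._buf += chunk
        frames = []
        while len(self._buf) >= _HEADER.size:
            (length,) = _HEADER.unpack_from(self._buf)
            end = _HEADER.size + length
            if len(self._buf) < end:
                break  # wait for the rest of this frame
            frames.append(bytes(self._buf[_HEADER.size:end]))
            del self._buf[:end]
        return frames
```

The bridge never needs this code, because it forwards bytes without decoding them. Only the two endpoints parse frames.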
A formal AIP for the supervisor-to-runtime comm protocol is expected as a follow-up once two or more language SDKs (Java, Go) are in tree; that AIP is the natural place to revisit transport and framing.
Cross-SDK serialization compatibility:

The DagFileParsingResult payload that a language runtime returns is the Airflow internal serialized DAG format, not an SDK-defined schema. The authoritative reference is airflow-core/src/airflow/serialization/schema.json, which describes LazyDeserializedDAG (see airflow-core/src/airflow/dag_processing/processor.py and airflow-core/src/airflow/serialization/serialized_objects.py). The scheduler reads this format directly into its internal model; any divergence is a parsing failure.
Why a per-language reimplementation rather than codegen? The first attempt was to generate POJOs from schema.json (similar to how Pydantic models are generated from OpenAPI specs). That approach was abandoned because the generated types miss the wrapping/unwrapping rules that distinguish "decorated" fields (kept as {"__type", "__var"}) from "non-decorated" fields (unwrapped to the bare value), as well as the timetable/task encoding rules listed below. Wiring an extra translation layer on top of generated types added more code than implementing the serializer directly per language.
Compatibility strategy. Each language SDK ships its own serializer plus a cross-SDK validator:
- test_dags.yaml defines logical fixtures.
- serialized_python.json is produced via DagSerialization.serialize_dag().
- serialized_<lang>.json is produced by each language SDK's own serializer.
- compare.py does a field-by-field comparison and fails on divergence.

This validator is planned to run as a CI gate (PR #65959). A complementary direction (suggested by reviewers, deferred) is to publish JSON schemas for the IPC envelope types themselves (DagFileParsingResult, StartupDetails, TaskInstance). These are currently undocumented because they were Python-to-Python only. That work is out of scope for the Java SDK PR but is a sensible next step once a second language SDK is in tree.
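The comparison that compare.py performs can be sketched as a recursive diff over the decoded JSON. This is an illustration, not the actual compare.py. It reports each path that differs and treats 1 and 1.0 as different. An int where Python emits a float is exactly the kind of divergence the gate is meant to catch.

```python
from typing import Any


def diff(expected: Any, actual: Any, path: str = "$") -> list[str]:
    """Return human-readable differences between two decoded JSON values."""
    if isinstance(expected, dict) and isinstance(actual, dict):
        problems = []
        for key in sorted(expected.keys() | actual.keys()):
            if key not in actual:
                problems.append(f"{path}.{key}: missing in actual")
            elif key not in expected:
                problems.append(f"{path}.{key}: unexpected in actual")
            else:
                problems += diff(expected[key], actual[key], f"{path}.{key}")
        return problems
    if isinstance(expected, list) and isinstance(actual, list):
        if len(expected) != len(actual):
            return [f"{path}: length {len(expected)} != {len(actual)}"]
        problems = []
        for i, (e, a) in enumerate(zip(expected, actual)):
            problems += diff(e, a, f"{path}[{i}]")
        return problems
    # Strict type check: int vs float or bool vs int counts as a divergence.
    if type(expected) is not type(actual) or expected != actual:
        return [f"{path}: {expected!r} != {actual!r}"]
    return []
```

In a CI gate, load serialized_python.json and serialized_<lang>.json with json.load and call diff(). Exit non-zero if the returned list is non-empty.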
The matched coordinator's run_dag_parsing() (a concrete method on BaseCoordinator) delegates to _runtime_subprocess_entrypoint(), which handles all the TCP/process plumbing:
- Creates TCP servers on 127.0.0.1 with random ports (comm + logs)
- Calls dag_parsing_cmd() to get the command
- Launches the runtime with stdin=DEVNULL (it does NOT inherit fd 0)
- Wraps fd 0 as supervisor_comm via os.dup(0)
- Runs _bridge(), a raw byte forwarder between fd 0 and the TCP comm socket

```
Airflow Dag-Processor
│
▼
DagFileProcessorProcess.start(path, bundle_name, bundle_path)
│
├─ _resolve_processor_target()
│ └─ iterates instances from [sdk] coordinators (airflow.cfg)
│ └─ first can_handle_dag_file() == True wins
│
▼
WatchedSubprocess.start(target=coordinator.run_dag_parsing)
│
[fork: child process gets fd 0 as Unix domain socket to supervisor]
│
▼ (in child)
<Lang>Coordinator.run_dag_parsing(path, bundle_name, bundle_path)
│
▼
BaseCoordinator._runtime_subprocess_entrypoint(DagParsingInfo)
│
├─ 1. Create TCP comm_server + logs_server on 127.0.0.1:random
├─ 2. Create stderr socketpair
├─ 3. Call dag_parsing_cmd() → get launch command
├─ 4. Popen(cmd, stdin=DEVNULL, stderr=child_stderr)
├─ 5. Accept TCP connections from the language runtime
├─ 6. supervisor_comm = socket(fileno=os.dup(0))
└─ 7. _bridge(): raw byte forwarding until process exits
```
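Step 7 can be sketched with the standard selectors module. The bridge function below is a hypothetical simplification of _bridge(), not its actual code. It copies bytes verbatim in both directions and stops once the runtime closes its socket. After the process is reported as exited, it allows at most drain_timeout seconds for in-flight bytes. For brevity it uses blocking sendall(), and it leaves closing the sockets to the caller.

```python
import selectors
import socket
import time
from typing import Callable


def bridge(
    supervisor: socket.socket,
    runtime: socket.socket,
    runtime_exited: Callable[[], bool],
    drain_timeout: float = 5.0,
) -> None:
    """Copy bytes both ways, never decoding or re-encoding frames."""
    peer = {supervisor: runtime, runtime: supervisor}
    sel = selectors.DefaultSelector()
    for sock in peer:
        sel.register(sock, selectors.EVENT_READ)
    deadline = None
    try:
        # Once the runtime has sent EOF there is nothing left to forward to the supervisor.
        while runtime in sel.get_map():
            if deadline is None and runtime_exited():
                # Process is gone: give in-flight bytes a bounded time to arrive.
                deadline = time.monotonic() + drain_timeout
            if deadline is not None and time.monotonic() >= deadline:
                break
            for key, _ in sel.select(timeout=0.1):
                src = key.fileobj
                data = src.recv(65536)
                if not data:  # EOF: this side closed its end
                    sel.unregister(src)
                    continue
                peer[src].sendall(data)
    finally:
        sel.close()
```

With a real subprocess, runtime_exited could be lambda: proc.poll() is not None.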
Once the bridge is running, the Airflow supervisor and the language runtime communicate directly through the bridge (raw bytes, no re-encoding):
```
Airflow Supervisor Bridge Language Runtime
│ │ │
├── DagFileParseRequest ──────────┼──────────────────────►│
│ [4-byte len][msgpack frame] │ raw byte forward │
│ │ │
│ │ ├── parse DAGs from
│ │ │ bundle/file
│ │ │
│◄── DagFileParsingResult ────────┼───────────────────────┤
│ [4-byte len][msgpack frame] │ raw byte forward │
│ │ │
│ │ └── exit(0)
│ │
│ └── drain remaining bytes (5s deadline)
│ close all sockets
```
The language runtime must produce a DagFileParsingResult that matches Python Airflow's DagSerialization format exactly. The Airflow scheduler deserializes this into its internal model — any divergence causes parsing failures.
Envelope:

```
{
  "type": "DagFileParsingResult",
  "fileloc": "<source file path>",
  "serialized_dags": [
    {
      "data": {
        "__version": 3,
        "dag": { <serialized DAG> }
      }
    },
    ...
  ]
}
```
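Once each DAG has been serialized, building the envelope is mechanical. The Python sketch below uses a hypothetical helper name, build_parsing_result. On the wire this dict is msgpack-encoded into a frame, not sent as JSON.

```python
from typing import Any


def build_parsing_result(fileloc: str, dags: list[dict[str, Any]]) -> dict[str, Any]:
    """Wrap already-serialized v3 DAG dicts in the DagFileParsingResult envelope."""
    return {
        "type": "DagFileParsingResult",
        "fileloc": fileloc,
        # Each DAG gets its own {"data": {"__version": 3, "dag": ...}} wrapper.
        "serialized_dags": [{"data": {"__version": 3, "dag": dag}} for dag in dags],
    }
```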
Serialized DAG structure (version 3):

| Field | Type | Required | Description |
|---|---|---|---|
| dag_id | string | yes | Unique identifier |
| fileloc | string | yes | Source file path (can be empty) |
| relative_fileloc | string | yes | Relative source path (can be empty) |
| timezone | string | yes | Always "UTC" |
| timetable | {__type, __var} | yes | Schedule timetable (see below) |
| tasks | list | yes | Serialized task list |
| dag_dependencies | list | yes | Empty list for non-Python DAGs |
| task_group | map | yes | Flat root task group |
| edge_info | map | yes | Empty map |
| params | list | yes | DAG-level parameters |
| description | string | if set | |
| start_date | float (epoch) | if set | Unwrapped from __type/__var |
| end_date | float (epoch) | if set | Unwrapped from __type/__var |
| tags | list | if non-empty | Unwrapped from __type/__var |
| catchup | bool | if true | |
| max_active_tasks | int | if non-default | |
| max_active_runs | int | if non-default | |
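The "Required" column separates fields that are always emitted from fields emitted only when they carry information. The sketch below assembles that dict in Python under these assumptions:

- serialize_dag_fields is a hypothetical name.
- fileloc and relative_fileloc are left empty.
- task_group and params are passed through unchanged.
- Both concurrency limits default to 16, taken from stock Airflow's [core] settings.

```python
from datetime import datetime
from typing import Any, Iterable, Optional

# Assumption: both limits default to 16, as in stock [core] max_active_tasks_per_dag
# and max_active_runs_per_dag. A real serializer should read the configured defaults.
DEFAULT_MAX_ACTIVE_TASKS = 16
DEFAULT_MAX_ACTIVE_RUNS = 16


def serialize_dag_fields(
    dag_id: str,
    timetable: dict[str, Any],
    tasks: list[dict[str, Any]],
    task_group: dict[str, Any],
    params: list[Any],
    *,
    description: Optional[str] = None,
    start_date: Optional[datetime] = None,
    end_date: Optional[datetime] = None,
    tags: Iterable[str] = (),
    catchup: bool = False,
    max_active_tasks: int = DEFAULT_MAX_ACTIVE_TASKS,
    max_active_runs: int = DEFAULT_MAX_ACTIVE_RUNS,
) -> dict[str, Any]:
    """Required keys always; optional keys only when set or non-default."""
    dag: dict[str, Any] = {
        "dag_id": dag_id,
        "fileloc": "",
        "relative_fileloc": "",
        "timezone": "UTC",
        "timetable": timetable,
        "tasks": tasks,
        "dag_dependencies": [],
        "task_group": task_group,
        "edge_info": {},
        "params": params,
    }
    if description is not None:
        dag["description"] = description
    # Dates are emitted unwrapped as epoch seconds; pass timezone-aware datetimes.
    if start_date is not None:
        dag["start_date"] = start_date.timestamp()
    if end_date is not None:
        dag["end_date"] = end_date.timestamp()
    tag_list = sorted(tags)
    if tag_list:
        dag["tags"] = tag_list
    if catchup:
        dag["catchup"] = True
    if max_active_tasks != DEFAULT_MAX_ACTIVE_TASKS:
        dag["max_active_tasks"] = max_active_tasks
    if max_active_runs != DEFAULT_MAX_ACTIVE_RUNS:
        dag["max_active_runs"] = max_active_runs
    return dag
```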
Timetable encoding:

| Schedule | __type | __var |
|---|---|---|
| null | airflow.timetables.simple.NullTimetable | {} |
| @once | airflow.timetables.simple.OnceTimetable | {} |
| @continuous | airflow.timetables.simple.ContinuousTimetable | {} |
| cron expr | airflow.timetables.trigger.CronTriggerTimetable | {expression, timezone, interval, run_immediately} |
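Timetable selection reduces to a lookup for the three fixed schedules plus a cron fallback. In the hypothetical encode_timetable sketch below, the timezone is pinned to "UTC" to match the DAG-level field. The interval and run_immediately defaults are placeholders; take their real encoding from CronTriggerTimetable.serialize().

```python
from typing import Any, Optional

SIMPLE_TIMETABLES = {
    None: "airflow.timetables.simple.NullTimetable",
    "@once": "airflow.timetables.simple.OnceTimetable",
    "@continuous": "airflow.timetables.simple.ContinuousTimetable",
}


def encode_timetable(
    schedule: Optional[str],
    *,
    interval: Any = 0.0,           # placeholder: check CronTriggerTimetable.serialize()
    run_immediately: Any = False,  # placeholder: check CronTriggerTimetable.serialize()
) -> dict[str, Any]:
    if schedule in SIMPLE_TIMETABLES:
        return {"__type": SIMPLE_TIMETABLES[schedule], "__var": {}}
    # Any other string is passed through as the cron expression.
    return {
        "__type": "airflow.timetables.trigger.CronTriggerTimetable",
        "__var": {
            "expression": schedule,
            "timezone": "UTC",
            "interval": interval,
            "run_immediately": run_immediately,
        },
    }
```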
Task encoding:

```
{
  "__type": "operator",
  "__var": {
    "task_id": "<id>",
    "task_type": "<class simple name>",
    "_task_module": "<fully qualified package>",
    "downstream_task_ids": ["<sorted dependent ids>"]  // only if non-empty
  }
}
```
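A task entry takes only a few lines of Python to produce. The encode_task helper below is hypothetical. It shows the two rules most likely to cause divergence: downstream_task_ids must be sorted, and the key must be omitted when a task has no children.

```python
from typing import Any


def encode_task(task_id: str, class_name: str, module: str, downstream: set[str]) -> dict[str, Any]:
    """Encode one task in the {"__type": "operator", "__var": {...}} shape."""
    var: dict[str, Any] = {
        "task_id": task_id,
        "task_type": class_name,  # class simple name
        "_task_module": module,   # fully qualified package
    }
    if downstream:  # omit the key entirely for leaf tasks
        var["downstream_task_ids"] = sorted(downstream)
    return {"__type": "operator", "__var": var}
```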
Value type encoding (for complex fields):
| Type | Encoding |
|---|---|
| datetime | {"__type": "datetime", "__var": <epoch_seconds_float>} |
| timedelta | {"__type": "timedelta", "__var": <total_seconds_float>} |
| dict | {"__type": "dict", "__var": {k: serialize(v), ...}} |
| set | {"__type": "set", "__var": [sorted_items]} |
| list | [serialize(item), ...] (no wrapper) |
| primitives | pass through unchanged |
Non-decorated vs decorated fields: Some fields (like start_date, end_date, tags) are "non-decorated" — they are serialized with __type/__var wrapping but then unwrapped to just the __var value before inclusion in the DAG dict. Other fields (like default_args, access_control) are "decorated" — they keep the __type/__var wrapper. This matches Python's serialize_to_json behavior.
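The value-encoding table and the decorated/non-decorated rule combine into a recursive encoder plus a per-field unwrap step. In this sketch, serialize_value and serialize_dag_field are hypothetical names. NON_DECORATED lists only the fields named in the text, not the complete set Python uses.

```python
from datetime import datetime, timedelta
from typing import Any

# Only the examples named in the text; the full list lives in Python's serializer.
NON_DECORATED = {"start_date", "end_date", "tags"}


def serialize_value(value: Any) -> Any:
    """Encode a value per the value type encoding table (wrapped form)."""
    if isinstance(value, datetime):
        return {"__type": "datetime", "__var": value.timestamp()}
    if isinstance(value, timedelta):
        return {"__type": "timedelta", "__var": value.total_seconds()}
    if isinstance(value, dict):
        return {"__type": "dict", "__var": {k: serialize_value(v) for k, v in value.items()}}
    if isinstance(value, (set, frozenset)):
        return {"__type": "set", "__var": sorted(serialize_value(v) for v in value)}
    if isinstance(value, list):
        return [serialize_value(v) for v in value]  # lists carry no wrapper
    return value  # primitives pass through unchanged


def serialize_dag_field(name: str, value: Any) -> Any:
    encoded = serialize_value(value)
    if name in NON_DECORATED and isinstance(encoded, dict) and "__type" in encoded:
        return encoded["__var"]  # non-decorated: unwrap to the bare value
    return encoded  # decorated: keep the {"__type", "__var"} wrapper
```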
For DAG parsing, a new language integration needs:

- A BaseCoordinator subclass with:
  - can_handle_dag_file(): language-specific file detection (e.g., "is this a JAR?", "is this a .go file?")
  - dag_parsing_cmd(): returns the command to launch the runtime
- A runtime process that:
  - accepts --comm=host:port and --logs=host:port CLI arguments
  - reads a DagFileParseRequest msgpack frame from the comm channel
  - responds with a DagFileParsingResult msgpack frame
- Registration as an entry in [sdk] coordinators in airflow.cfg, pointing classpath at the importable subclass under airflow.sdk.coordinators.<lang>
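The runtime-side contract is small enough to sketch end to end in Python. The sketch rests on these assumptions:

- parse_cli and serve_one_request are hypothetical names.
- The codec is injected, so any msgpack library can be plugged in.
- The 4-byte big-endian length prefix must be verified against the Python supervisor.

```python
import argparse
import socket
import struct
from typing import Any, Callable

_HEADER = struct.Struct(">I")  # assumed 4-byte big-endian length prefix


def parse_addr(value: str) -> tuple[str, int]:
    """Split "host:port" into (host, port)."""
    host, _, port = value.rpartition(":")
    return host, int(port)


def parse_cli(argv: list[str]) -> argparse.Namespace:
    parser = argparse.ArgumentParser(description="Hypothetical language-runtime entrypoint")
    parser.add_argument("--comm", type=parse_addr, required=True, help="host:port of the comm server")
    parser.add_argument("--logs", type=parse_addr, required=True, help="host:port of the logs server")
    return parser.parse_args(argv)


def recv_exact(sock: socket.socket, n: int) -> bytes:
    """Read exactly n bytes or fail if the peer closes early."""
    buf = bytearray()
    while len(buf) < n:
        chunk = sock.recv(n - len(buf))
        if not chunk:
            raise ConnectionError("peer closed mid-frame")
        buf += chunk
    return bytes(buf)


def serve_one_request(
    comm: socket.socket,
    decode: Callable[[bytes], Any],
    encode: Callable[[Any], bytes],
    handle: Callable[[Any], Any],
) -> None:
    """Read one request frame, write one response frame, then return (one-shot)."""
    (length,) = _HEADER.unpack(recv_exact(comm, _HEADER.size))
    request = decode(recv_exact(comm, length))
    payload = encode(handle(request))
    comm.sendall(_HEADER.pack(len(payload)) + payload)
```

A real entrypoint would call socket.create_connection(args.comm), which accepts the (host, port) tuple directly. It would then serve one request and exit, mirroring the one-shot behavior of the Java bundle process.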
JavaCoordinator:
The Java SDK implements all DAG-parsing contracts in a single BaseCoordinator subclass shipped as apache-airflow-coordinators-java:
```python
# Distribution: apache-airflow-coordinators-java
# Module: airflow.sdk.coordinators.java.coordinator
import contextlib
from pathlib import Path

# BaseCoordinator and find_main_class() are provided elsewhere in the
# coordinator packages; their import paths are not shown in this ADR.


class JavaCoordinator(BaseCoordinator):
    def __init__(self, *, name, java_executable="java", jvm_args=None, jdk_home=None):
        self.name = name
        self.java_executable = java_executable
        self.jvm_args = list(jvm_args or [])
        self.jdk_home = jdk_home

    def can_handle_dag_file(self, bundle_name, path) -> bool:
        # Returns True when path is a JAR with a Main-Class manifest entry
        with contextlib.suppress(FileNotFoundError):
            return find_main_class(Path(path)) is not None
        return False

    def dag_parsing_cmd(self, *, dag_file_path, bundle_name, bundle_path, comm_addr, logs_addr):
        main_class = find_main_class(Path(dag_file_path))
        return [
            self.java_executable,
            *self.jvm_args,
            "-classpath",
            f"{bundle_path}/*",  # the JVM expands this wildcard to every JAR in the directory
            main_class,
            f"--comm={comm_addr}",
            f"--logs={logs_addr}",
        ]
```
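This ADR uses find_main_class() without defining it. One plausible implementation reads META-INF/MANIFEST.MF with the standard zipfile module. It joins continuation lines, where a line starting with a single space continues the previous one, and matches the attribute name case-insensitively. This is a hypothetical sketch. It returns None for files that are not zip archives, so can_handle_dag_file() declines plain .py files instead of raising.

```python
import zipfile
from pathlib import Path
from typing import Optional


def find_main_class(jar: Path) -> Optional[str]:
    """Return the Main-Class attribute from a JAR manifest, or None."""
    try:
        with zipfile.ZipFile(jar) as zf:
            raw = zf.read("META-INF/MANIFEST.MF").decode("utf-8")
    except (zipfile.BadZipFile, KeyError):
        return None  # not a zip/JAR, or the archive has no manifest
    # Manifest lines wrap at 72 bytes; a continuation line starts with one space.
    logical: list[str] = []
    for line in raw.splitlines():
        if line.startswith(" ") and logical:
            logical[-1] += line[1:]
        else:
            logical.append(line)
    for line in logical:
        name, sep, value = line.partition(":")
        if sep and name.strip().lower() == "main-class":
            return value.strip() or None
    return None
```

FileNotFoundError is deliberately not caught here. That leaves can_handle_dag_file() with the contextlib.suppress(FileNotFoundError) behavior it already has.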
can_handle_dag_file() checks that the file is a JAR with a Main-Class in its manifest. This ensures the coordinator only claims files it can actually handle.
The classpath is <bundle_path>/* — a wildcard that includes all JARs in the directory (the application JAR plus its dependencies). The java_executable and jvm_args come from the per-instance kwargs declared in [sdk] coordinators, so multiple instances (e.g., jdk-11, jdk-17) can launch different JVMs with different flags from the same class.
No separate JavaDagFileProcessor class is needed — BaseCoordinator consolidates file detection, DAG parsing, and task execution into a single extension point.
Java SDK Bundle Process:
The Java bundle process (Server.kt) starts, connects to both TCP servers, and enters CoordinatorComm.startProcessing(). When it receives a DagFileParseRequest:
```
CoordinatorComm.handleIncoming(frame)
│
├── frame.body is DagFileParseRequest
│ file: String ← the path from the request
│
▼
DagParser(request.file).parse(bundle)
│
├── Returns DagParsingResult(fileloc=file, dags=bundle.dags)
│ The DAGs were already loaded into the Bundle at startup
│ via BundleBuilder.getDags()
│
▼
sendMessage(frame.id, result)
│
├── CoordinatorComm.encode(OutgoingFrame(id, result))
│ ├── detects DagParsingResult type
│ └── calls result.serialize() ← Serde.kt
│
├── DagParsingResult.serialize()
│ ├── Wraps each DAG: {"data": {"__version": 3, "dag": dag.serialize(id)}}
│ ├── Dag.serialize() produces the full v3 format:
│ │ timetable, tasks, task_group, params, optional fields...
│ ├── Task.serialize() wraps as {"__type": "operator", "__var": {...}}
│ └── serializeValue() handles datetime/timedelta/dict/set encoding
│
├── TaskSdkFrames.encodeRequest(id, serializedMap)
│ ├── Converts map to msgpack: [id, body]
│ └── Returns byte array
│
└── Writes [4-byte length prefix][msgpack payload] to comm channel
shutDownRequested = true ← one-shot, process will exit
```
Java SDK BundleBuilder Interface:
Bundle authors implement BundleBuilder to define their DAGs:
```java
public class ExampleBundleBuilder implements BundleBuilder {
    @Override
    public List<Dag> getDags() {
        var dag = new Dag("java_example", null, "@daily");
        dag.addTask("extract", Extract.class, List.of());
        dag.addTask("transform", Transform.class, List.of("extract"));
        dag.addTask("load", Load.class, List.of("transform"));
        return List.of(dag);
    }

    public static void main(String[] args) {
        var bundle = new ExampleBundleBuilder().build();
        Server.create(args).serve(bundle);
    }
}
```
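The dependsOn lists passed to addTask() record upstream edges, but the serialized form stores the inverse, as downstream_task_ids. Below is a Python sketch of that inversion, using the example bundle's extract → transform → load chain. downstream_ids is a hypothetical helper, not the Kotlin code.

```python
from collections import defaultdict


def downstream_ids(tasks: list[tuple[str, list[str]]]) -> dict[str, list[str]]:
    """Invert (task_id, depends_on) pairs into parent -> sorted children."""
    dependants: dict[str, set[str]] = defaultdict(set)
    for task_id, depends_on in tasks:
        for parent in depends_on:
            dependants[parent].add(task_id)
    # Leaf tasks never appear as keys, matching "only if non-empty".
    return {parent: sorted(children) for parent, children in dependants.items()}
```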
The Dag class provides a fluent API:

- dagId, description, schedule (cron or preset), startDate, endDate, and all standard Airflow DAG parameters
- addTask(id, taskClass, dependsOn): registers a task and its upstream dependencies
- a dependants map (parent → set of children), serialized as downstream_task_ids

Java SDK Serialization Compatibility:
The serialization in Serde.kt is validated against Python's output:
```shell
# 1. Java generates serialized output
./gradlew sdk:test
# → writes validation/serialization/serialized_java.json

# 2. Python generates the same DAGs
uv run validation/serialization/serialize_python.py \
  validation/serialization/test_dags.yaml \
  validation/serialization/serialized_python.json

# 3. Field-by-field comparison
uv run validation/serialization/compare.py \
  validation/serialization/serialized_python.json \
  validation/serialization/serialized_java.json
```
Both sides use the test cases defined in test_dags.yaml, so compare.py can check that the Java SDK's output matches Python's DagSerialization.serialize_dag() field by field for the same inputs.
A BaseCoordinator subclass distributed as apache-airflow-coordinators-<lang>, plus an entry in [sdk] coordinators, is needed.

test_dags.yaml + compare.py).