x-pack/plugin/esql/AGGREGATE_PUSHDOWN_PLAN.md
Replace AggregateExec → ExternalSourceExec with LocalSourceExec on each data node.
The coordinator's FINAL AggregateExec is never touched — it merges intermediate values
from all data nodes regardless of whether each data node pushed down or scanned.
Each data node decides independently based on statistics availability:
On each data node, the local physical optimizer replaces the aggregate subtree with
a LocalSourceExec whose LocalSupplier reads file footers at execution time.
Files WITH statistics get their values from the footer. If ANY file lacks
statistics, the supplier signals failure and the whole subtree falls back to the
normal scan path (the original AggregateExec → ExternalSourceExec plan is kept
for all splits).
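The subtree replacement can be sketched with toy plan nodes. This is a hedged illustration only: `PhysicalPlan`, `AggregateExec`, `ExternalSourceExec`, and `LocalSourceExec` here are minimal stand-ins for the real ESQL classes, which carry modes, attributes, and suppliers; the `formatSupportsPushdown` flag stands in for the capability probe described later.

```java
import java.util.List;

public class PushdownRewriteSketch {
    // Simplified stand-ins for the ESQL physical plan nodes.
    public sealed interface PhysicalPlan permits AggregateExec, ExternalSourceExec, LocalSourceExec {}
    public record ExternalSourceExec(List<String> splits) implements PhysicalPlan {}
    public record AggregateExec(PhysicalPlan child) implements PhysicalPlan {}
    // supplierKind stands in for a LocalSupplier that reads footers at execution time.
    public record LocalSourceExec(String supplierKind) implements PhysicalPlan {}

    // Replace AggregateExec -> ExternalSourceExec with a LocalSourceExec;
    // any other plan shape is left untouched.
    public static PhysicalPlan rewrite(PhysicalPlan plan, boolean formatSupportsPushdown) {
        if (formatSupportsPushdown
            && plan instanceof AggregateExec agg
            && agg.child() instanceof ExternalSourceExec) {
            // The supplier itself falls back to the scan path if any split
            // turns out to lack statistics at execution time.
            return new LocalSourceExec("footer-reading");
        }
        return plan; // unsupported shape: keep the original scan plan
    }
}
```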
FileSplit gets a @Nullable SourceStatistics statistics field.
In Phase 1, this is always null — the supplier reads footers on the data node.
In Phase 2 (union-by-name), the coordinator populates this from schema merging.
The optimizer rule checks split statistics first and skips footer reads when present.
Add @Nullable SourceStatistics statistics to FileSplit.
Phase 1: always null. Phase 2: populated by coordinator.
Update serialization to handle nullable statistics (write a boolean flag + data).
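The flag-plus-data wire format can be sketched as below. This is a simplified model using plain `DataOutput`/`DataInput` streams and a hypothetical one-field `SourceStatistics`; the real implementation would go through Elasticsearch's `StreamOutput`/`StreamInput`, which provide `writeOptionalWriteable`/`readOptionalWriteable` for exactly this pattern.

```java
import java.io.*;

public class NullableStatsSerialization {
    // Hypothetical, trimmed-down SourceStatistics (row count only).
    public record SourceStatistics(long rowCount) {}

    // Presence flag first, payload only when present.
    public static byte[] write(SourceStatistics stats) {
        try {
            ByteArrayOutputStream buf = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(buf);
            out.writeBoolean(stats != null);      // flag: Phase 1 always writes false
            if (stats != null) {
                out.writeLong(stats.rowCount());  // data: Phase 2 populates this
            }
            return buf.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static SourceStatistics read(byte[] bytes) {
        try {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes));
            return in.readBoolean() ? new SourceStatistics(in.readLong()) : null;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```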
Files:
- FileSplit.java — add field, constructor, accessor, serialization
- FileSplitProvider.java — pass null for statistics in Phase 1

Replace the current approach (eliminate AggregateExec, set a hint on ExternalSourceExec) with the LocalSourceExec approach.
The rule:
AggregateExec(any mode) → ExternalSourceExec

Output attributes:
Files:
- PushAggregatesToExternalSource.java — rewrite rule logic

A LocalSupplier that reads file footers at execution time.
For each assigned split:
Merge strategy across splits:
If ALL splits yield statistics, produce a single intermediate format page:
All-or-nothing: if ANY split lacks statistics, the supplier signals failure and the optimizer keeps the original plan (normal scan for all splits). Parquet and ORC always write statistics, so this fallback is rare.
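The all-or-nothing merge can be sketched as a fold over per-split statistics. The `Stats` record is a hypothetical shape for one example query (`COUNT(*)`, `MIN(salary)`, `MAX(age)`), not the real `SourceStatistics` type:

```java
import java.util.List;
import java.util.Optional;

public class AllOrNothingMerge {
    // Hypothetical per-split statistics for COUNT(*), MIN(salary), MAX(age).
    public record Stats(long count, long minSalary, int maxAge) {}

    // Merge statistics across all assigned splits. If any split lacks
    // statistics, return empty so the caller keeps the original scan plan.
    public static Optional<Stats> merge(List<Optional<Stats>> perSplit) {
        long count = 0;
        long minSalary = Long.MAX_VALUE;
        int maxAge = Integer.MIN_VALUE;
        for (Optional<Stats> s : perSplit) {
            if (s.isEmpty()) {
                return Optional.empty(); // all-or-nothing: fall back to scanning
            }
            count += s.get().count();
            minSalary = Math.min(minSalary, s.get().minSalary());
            maxAge = Math.max(maxAge, s.get().maxAge());
        }
        return Optional.of(new Stats(count, minSalary, maxAge));
    }
}
```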
The supplier needs:
Files:
- FileStatsSupplier.java (new) — implements LocalSupplier

Remove all hint-through-execution-chain infrastructure, since the optimizer handles everything via LocalSourceExec.
Remove:
- ExternalSourceExec.PushedAggregate record and related fields
- FormatReader.withPushedAggregate() default method
- SourceOperatorContext.pushedAggregate field and builder method
- FileSourceFactory aggregate hint plumbing
- AsyncExternalSourceOperatorFactory aggregate merge logic
- AggregatePageMerger class
- ParquetFormatReader.createAggregateResultIterator() and createStatBlock()
- OrcFormatReader.createAggregateResultIterator() and createStatBlock()
- LocalExecutionPlanner projectedColumns aggregate check

Keep:
- AggregateSpec record in SPI (used by optimizer rule and supplier)
- AggregatePushdownSupport interface (format capability declaration)
- FormatReader.aggregatePushdownSupport() (optimizer probes capability)
- FormatReaderRegistry.findByName() (nullable lookup for optimizer)
- ParquetAggregatePushdownSupport and OrcAggregatePushdownSupport

Move statistics extraction logic from format readers into reusable utilities that FileStatsSupplier can call.
- Parquet: read footer → iterate row groups → extract min/max/count
- ORC: open reader → get file-level statistics → extract min/max/count
These are the same operations as the current createAggregateResultIterator methods but returning SourceStatistics instead of Page.
Files:
- ParquetFileStatistics.java (new) — extracts SourceStatistics from Parquet footer
- OrcFileStatistics.java (new) — extracts SourceStatistics from ORC file stats

Build blocks in the correct format based on AggregatorMode.
SINGLE mode output (final values):
- Block[0] = count (LONG)
- Block[1] = min_salary (LONG)
- Block[2] = max_age (INT)

INITIAL mode output (intermediate channels):
- Block[0] = count (LONG)
- Block[1] = seen (BOOLEAN = true)
- Block[2] = min_salary (LONG)
- Block[3] = seen (BOOLEAN = true)
- Block[4] = max_age (INT)
- Block[5] = seen (BOOLEAN = true)
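The two layouts differ only in the interleaved seen flags. A toy builder for the same example query, assuming a single-row page and using a plain `Object` list in place of real typed Blocks:

```java
import java.util.ArrayList;
import java.util.List;

public class StatsPageLayout {
    public enum AggregatorMode { SINGLE, INITIAL }

    // Build the single-row page for COUNT(*), MIN(salary), MAX(age) from
    // merged statistics. INITIAL mode pairs each value with a seen flag.
    public static List<Object> buildBlocks(AggregatorMode mode, long count, long minSalary, int maxAge) {
        List<Object> blocks = new ArrayList<>();
        blocks.add(count);                                    // count (LONG)
        if (mode == AggregatorMode.INITIAL) blocks.add(true); // seen
        blocks.add(minSalary);                                // min_salary (LONG)
        if (mode == AggregatorMode.INITIAL) blocks.add(true); // seen
        blocks.add(maxAge);                                   // max_age (INT)
        if (mode == AggregatorMode.INITIAL) blocks.add(true); // seen
        return blocks;
    }
}
```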
The intermediate attributes come from AggregateFunction.intermediateAttributes() for each aggregate in the plan.
When union-by-name / schema merging lands, the coordinator reads ALL file metadata to merge schemas. At that point, collecting per-file statistics is free — the footer is already in memory.
FileSplitProvider: When creating splits, populate statistics field
from the metadata that was already read during schema merging.
ExternalSourceResolver: During multi-file resolution, collect SourceStatistics for each file alongside the schema.
Optimizer rule (no changes needed): The existing rule checks
split.statistics() != null first. Phase 2 pre-populates this,
so the rule takes the fast path (no file I/O on data node).
Coordinator-level pushdown (optional optimization): For SINGLE mode queries where the coordinator has all statistics, the coordinator's local optimizer can replace the entire plan with LocalSourceExec without sending anything to data nodes. This is an optimization over Phase 1 where data nodes do the work.
Phase 1 (data node footer reads):
Phase 2 (pre-populated from schema merge):
When implementing union-by-name schema merging:
The FormatReader.metadata(StorageObject) method already returns
SourceMetadata which contains Optional<SourceStatistics>.
Store this per-file alongside the schema.
When creating FileSplits in FileSplitProvider, pass the
SourceStatistics from the metadata into the split constructor.
The nullable field is already there from Phase 1.
The optimizer rule PushAggregatesToExternalSource already checks
split.statistics() != null as its first code path. When statistics
are pre-populated, this path fires and the FileStatsSupplier is
never created — no file I/O on the data node.
For coordinator-level pushdown in SINGLE mode: add a check in the rule for when ALL splits have statistics AND mode is SINGLE. In that case, build the final-values page directly in the rule (no LocalSupplier needed) and return LocalSourceExec immediately. This avoids sending splits to data nodes entirely.
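The coordinator-side guard can be sketched as below. This is a hypothetical shape, trimmed to `COUNT(*)` only: the method returns the final-values row when the fast path applies, and empty to mean "send splits to the data nodes as in Phase 1".

```java
import java.util.List;
import java.util.Optional;

public class CoordinatorPushdown {
    public enum AggregatorMode { SINGLE, INITIAL }
    public record Stats(long count) {} // trimmed to COUNT(*) for brevity

    // Fast path: mode is SINGLE and every split already carries statistics,
    // so the final page is built on the coordinator and no work is shipped.
    public static Optional<List<Object>> tryBuildFinalPage(AggregatorMode mode, List<Optional<Stats>> splits) {
        if (mode != AggregatorMode.SINGLE) {
            return Optional.empty(); // intermediate output must come from data nodes
        }
        long count = 0;
        for (Optional<Stats> s : splits) {
            if (s.isEmpty()) {
                return Optional.empty(); // some split needs a data-node scan
            }
            count += s.get().count();
        }
        return Optional.of(List.of(count));
    }
}
```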