
JoinScan

pg_search/src/postgres/customscan/joinscan/README.md

Version 0.23.3


JoinScan intercepts PostgreSQL join planning and replaces the standard executor with a DataFusion-based pipeline that operates entirely on Tantivy's columnar fast fields. The core strategy is late materialization: execute the join using only index data, apply sorting and limits, then access the PostgreSQL heap only for the final K result rows.

Physical Plan

For a typical SELECT ... FROM files JOIN documents ... ORDER BY title LIMIT K:

```txt
ProjectionExec
  TantivyLookupExec                   ← materializes deferred strings for final K rows only
    SegmentedTopKExec                 ← global threshold pruning + final sort + LIMIT K
      HashJoinExec                    ← join on fast fields
        PgSearchScan (documents)      ← BM25 search
        PgSearchScan (files)          ← lazy scan, deferred columns, receives dynamic filters
```

SegmentedTopKExec publishes dynamic filter thresholds that are pushed down through the join to the probe-side scan, pruning rows at the scanner level. It also performs the final materialized sort and LIMIT, so TantivyLookupExec only decodes K rows (not K×segments).
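The per-segment heaps and the global threshold can be sketched in plain Rust. This is a simplified model, not the code in segmented_topk_exec.rs: it assumes a single ascending string sort key, models each segment's term dictionary as a sorted `Vec<&str>`, and `SegmentHeap` and `global_threshold` are hypothetical names. Ordinals are only comparable within one segment, so each segment keeps its own K best ordinals, and only those survivors are decoded to compute the cross-segment threshold.

```rust
use std::collections::BinaryHeap;

/// Hypothetical per-segment Top K over term ordinals (ascending sort).
/// Ordinals are only comparable within one segment, so each segment keeps
/// its own max-heap whose root is the worst of its current K entries.
pub struct SegmentHeap {
    k: usize,
    heap: BinaryHeap<u64>,
}

impl SegmentHeap {
    pub fn new(k: usize) -> Self {
        SegmentHeap { k, heap: BinaryHeap::new() }
    }

    /// Offers an ordinal; it is kept only if it belongs in this segment's top K.
    pub fn push(&mut self, ord: u64) {
        if self.heap.len() < self.k {
            self.heap.push(ord);
        } else if let Some(&worst) = self.heap.peek() {
            if ord < worst {
                self.heap.pop();
                self.heap.push(ord);
            }
        }
    }

    pub fn ordinals(&self) -> Vec<u64> {
        self.heap.iter().copied().collect()
    }
}

/// Global threshold for `ORDER BY key ASC LIMIT k`: decode only each segment's
/// heap survivors, then take the k-th smallest string. Any row whose key sorts
/// after this value cannot reach the final result and may be pruned.
pub fn global_threshold(segments: &[(SegmentHeap, Vec<&str>)], k: usize) -> Option<String> {
    let mut values: Vec<&str> = segments
        .iter()
        .flat_map(|(heap, dict)| heap.ordinals().into_iter().map(move |o| dict[o as usize]))
        .collect();
    if k == 0 || values.len() < k {
        return None; // not enough rows seen yet to prune safely
    }
    values.sort_unstable();
    Some(values[k - 1].to_string())
}
```

The union of per-segment top-K sets always contains the global top K, which is why decoding at most K values per segment is enough to find the threshold.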

How It Works

1. Activation

JoinScan fires only when all of these conditions are met: the query has a LIMIT, the join has equi-join keys, all columns are fast fields, every table has a BM25 index, and there is at least one @@@ predicate. See create_custom_path() for the full checklist.
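As a hypothetical illustration of this checklist (the `JoinCandidate` struct and `can_activate` function are invented names, and the real create_custom_path() checks more than this), the activation decision is a conjunction. Reporting the first unmet condition is a design choice of this sketch that makes fallbacks easier to diagnose.

```rust
/// Hypothetical summary of what the planner has learned about a join.
/// Field names are illustrative only.
pub struct JoinCandidate {
    pub has_limit: bool,
    pub has_equi_join_keys: bool,
    pub all_columns_fast_fields: bool,
    pub all_tables_have_bm25_index: bool,
    pub search_predicates: usize, // number of @@@ predicates
}

/// Ok(()) if JoinScan may fire; otherwise the first unmet condition.
pub fn can_activate(c: &JoinCandidate) -> Result<(), &'static str> {
    if !c.has_limit {
        return Err("query has no LIMIT");
    }
    if !c.has_equi_join_keys {
        return Err("join has no equi-join keys");
    }
    if !c.all_columns_fast_fields {
        return Err("a column is not a fast field");
    }
    if !c.all_tables_have_bm25_index {
        return Err("a table has no BM25 index");
    }
    if c.search_predicates == 0 {
        return Err("no @@@ predicate");
    }
    Ok(())
}
```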

2. Planning

The planner hook builds a JoinCSClause — a serializable IR capturing the RelNode join tree, predicates, ORDER BY, and LIMIT. This is stored in CustomScan.custom_private and deserialized at execution time.
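A minimal sketch of the plan-time/execution-time round trip, assuming the join tree is reduced to scans and equi-joins. The `RelNode` enum below is a hypothetical simplification of the real one, and the flat pre-order string encoding is an assumption chosen for illustration, not the format JoinCSClause actually uses. The point is the shape of the contract: flatten the tree into plain data the plan can carry, then rebuild it when the executor starts.

```rust
/// Hypothetical, heavily simplified join tree.
#[derive(Debug, Clone, PartialEq)]
pub enum RelNode {
    Scan { table: String },
    Join {
        left_key: String,
        right_key: String,
        left: Box<RelNode>,
        right: Box<RelNode>,
    },
}

impl RelNode {
    /// Pre-order flattening into plain strings (plan time).
    pub fn serialize(&self, out: &mut Vec<String>) {
        match self {
            RelNode::Scan { table } => {
                out.push("scan".to_string());
                out.push(table.clone());
            }
            RelNode::Join { left_key, right_key, left, right } => {
                out.push("join".to_string());
                out.push(left_key.clone());
                out.push(right_key.clone());
                left.serialize(out);
                right.serialize(out);
            }
        }
    }

    /// Rebuilds the tree (execution time); None on malformed input.
    pub fn deserialize<'a, I: Iterator<Item = &'a String>>(tokens: &mut I) -> Option<RelNode> {
        match tokens.next()?.as_str() {
            "scan" => Some(RelNode::Scan { table: tokens.next()?.clone() }),
            "join" => {
                let left_key = tokens.next()?.clone();
                let right_key = tokens.next()?.clone();
                let left = Box::new(RelNode::deserialize(tokens)?);
                let right = Box::new(RelNode::deserialize(tokens)?);
                Some(RelNode::Join { left_key, right_key, left, right })
            }
            _ => None,
        }
    }
}
```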

3. Physical Plan Construction

scan_state.rs builds a DataFusion logical plan from the JoinCSClause, then runs physical optimization:

  1. SortMergeJoinEnforcer — converts HashJoin to SortMergeJoin when inputs are pre-sorted
  2. FilterPushdown (Post) — pushes dynamic filters through the join
  3. LateMaterializationRule — injects TantivyLookupExec to defer string materialization
  4. SegmentedTopKRule — injects SegmentedTopKExec for Top K on deferred columns, removes the now-redundant SortExec(TopK), wraps blocking nodes with FilterPassthroughExec
  5. FilterPushdown (Post) — second pass — pushes SegmentedTopKExec's DynamicFilterPhysicalExpr down to the scan
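The ordering of these passes matters: a FilterPushdown pass can only wire up filters from operators that already exist, and SegmentedTopKExec does not exist until SegmentedTopKRule runs. The toy model below (`Plan`, `Rule`, and both rule functions are hypothetical, and it does not use DataFusion's `PhysicalOptimizerRule` trait) shows why the second FilterPushdown pass is needed to connect SegmentedTopKExec's filter to the scan.

```rust
/// Toy physical plan: operator names from the root down to the probe-side
/// scan, plus the dynamic-filter producers the scan has been wired to.
#[derive(Debug, Clone, Default, PartialEq)]
pub struct Plan {
    pub ops: Vec<String>,
    pub scan_filters: Vec<String>,
}

/// Hypothetical rule type; not DataFusion's PhysicalOptimizerRule trait.
pub type Rule = fn(Plan) -> Plan;

/// Toy FilterPushdown: wires every filter-producing operator that currently
/// exists in the plan to the scan.
pub fn filter_pushdown(mut p: Plan) -> Plan {
    for op in &p.ops {
        let produces_filter = op == "HashJoinExec" || op == "SegmentedTopKExec";
        if produces_filter && !p.scan_filters.contains(op) {
            p.scan_filters.push(op.clone());
        }
    }
    p
}

/// Toy SegmentedTopKRule: replaces the generic TopK sort.
pub fn segmented_topk_rule(mut p: Plan) -> Plan {
    if let Some(i) = p.ops.iter().position(|op| op == "SortExec(TopK)") {
        p.ops[i] = "SegmentedTopKExec".to_string();
    }
    p
}

/// Applies rules in order, each seeing the previous rule's output.
pub fn optimize(plan: Plan, rules: &[Rule]) -> Plan {
    rules.iter().fold(plan, |p, rule| rule(p))
}
```

With only one pushdown pass, the scan is wired to the HashJoin filter but never sees the Top K threshold.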

4. Deferred Columns

String columns are emitted as a 3-way UnionArray (doc_address | term_ordinal | materialized) so intermediate nodes work with cheap integer ordinals instead of decoded strings. The decision to defer is made in configure_deferred_outputs().
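The three variants can be modelled with a Rust enum. This is a sketch under assumptions: `DeferredString`, `cmp_cheap`, and `materialize` are hypothetical names, and the real representation is an Arrow UnionArray built in deferred_encode.rs. It relies on the fact that a segment's term dictionary is sorted, so two ordinals from the same segment compare in the same order as their strings.

```rust
use std::cmp::Ordering;

/// Hypothetical enum mirroring the three arms of the deferred UnionArray.
#[derive(Debug, Clone, PartialEq)]
pub enum DeferredString {
    /// Only the document's location is known; nothing has been read yet.
    DocAddress { segment: u32, doc: u32 },
    /// Position of the value in the segment's sorted term dictionary.
    TermOrdinal { segment: u32, ord: u64 },
    /// The decoded string.
    Materialized(String),
}

/// Cheap comparison where one is possible: ordinals from the same segment
/// compare like their strings. Anything else (different segments, doc
/// addresses, mixed arms) returns None and would need decoding first.
pub fn cmp_cheap(a: &DeferredString, b: &DeferredString) -> Option<Ordering> {
    match (a, b) {
        (
            DeferredString::TermOrdinal { segment: sa, ord: oa },
            DeferredString::TermOrdinal { segment: sb, ord: ob },
        ) if sa == sb => Some(oa.cmp(ob)),
        (DeferredString::Materialized(x), DeferredString::Materialized(y)) => Some(x.cmp(y)),
        _ => None,
    }
}

/// Late decode of an ordinal using per-segment dictionaries. Doc addresses
/// would first need a fast-field read, which this sketch does not model.
pub fn materialize(v: DeferredString, dicts: &[Vec<String>]) -> DeferredString {
    match v {
        DeferredString::TermOrdinal { segment, ord } => {
            DeferredString::Materialized(dicts[segment as usize][ord as usize].clone())
        }
        other => other,
    }
}
```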

5. Pruning Path

There are two primary pruning mechanisms for dynamic filters that are pushed down to the scan:

  1. Query-Time Pushdown (Inverted Index): Filters that are static and known when the scan starts (such as InList predicates generated from a HashJoin build side) are intercepted during the first poll_next of the scan stream. They are converted into native Tantivy queries (e.g., TermSetQuery) and ANDed into the main search query via try_dynamic_filter_pushdown. Tantivy then uses its inverted index to exclude non-matching documents while executing the search, rather than evaluating the filter row by row afterwards, which makes this the more effective of the two pruning paths. Finally, the DataFusion expressions are rewritten to lit(true) so they are not evaluated again.

  2. Pre-Filter Pushdown (Fast Fields): Evolving thresholds, such as the global threshold from SegmentedTopKExec, reach the scan through filter pushdown. This works because SegmentedTopKExec and PgSearchScan share an Arc<DynamicFilterPhysicalExpr>: the scanner reads current() on every batch and applies the filter after the search but before Arrow column materialization. For string columns, it translates literals to per-segment ordinal bounds via try_rewrite_binary and filters directly against the fetched term ordinals.
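The query-time path can be sketched as follows, under simplifying assumptions: `Expr`, `TermSet`, and `pushdown` are hypothetical stand-ins for DataFusion expressions, Tantivy's TermSetQuery, and try_dynamic_filter_pushdown respectively. Translatable InList filters become term sets to AND into the search, and are replaced by a literal `true` so the row-level filter becomes a no-op.

```rust
use std::collections::BTreeSet;

/// Hypothetical stand-in for the dynamic filter expressions a scan receives.
#[derive(Debug, Clone, PartialEq)]
pub enum Expr {
    Literal(bool),
    /// e.g. built from the HashJoin build side: `column IN (values...)`
    InList { column: String, values: Vec<String> },
    /// Anything this sketch cannot translate, such as an evolving threshold.
    Other(String),
}

/// Hypothetical stand-in for a Tantivy term-set query on one field.
#[derive(Debug, PartialEq)]
pub struct TermSet {
    pub field: String,
    pub terms: BTreeSet<String>,
}

/// Translates InList filters into term sets to be ANDed into the search query
/// and replaces them with `true`, so nothing is re-evaluated per row.
/// Untranslatable filters are returned unchanged.
pub fn pushdown(filters: Vec<Expr>) -> (Vec<TermSet>, Vec<Expr>) {
    let mut queries = Vec::new();
    let remaining: Vec<Expr> = filters
        .into_iter()
        .map(|f| match f {
            Expr::InList { column, values } => {
                queries.push(TermSet { field: column, terms: values.into_iter().collect() });
                Expr::Literal(true)
            }
            other => other,
        })
        .collect();
    (queries, remaining)
}
```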
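The threshold path can be sketched in a similar spirit. `SharedThreshold` is a hypothetical stand-in for the shared Arc<DynamicFilterPhysicalExpr> (here an `Arc<RwLock<...>>`), and `ordinal_upper_bound` and `prefilter` are hypothetical simplifications of try_rewrite_binary and the scanner's pre-filter, assuming an ascending sort and a `value <= threshold` predicate. Because a segment's terms are sorted, every term `<=` the literal lies in a prefix of the ordinal range, so one binary search yields a single integer bound.

```rust
use std::sync::{Arc, RwLock};

/// Hypothetical stand-in for the shared dynamic filter: the Top K operator
/// tightens the threshold, the scan reads `current()` per batch.
#[derive(Clone, Default)]
pub struct SharedThreshold(Arc<RwLock<Option<String>>>);

impl SharedThreshold {
    pub fn update(&self, v: &str) {
        *self.0.write().unwrap() = Some(v.to_string());
    }
    pub fn current(&self) -> Option<String> {
        self.0.read().unwrap().clone()
    }
}

/// Rewrites `value <= literal` into an exclusive ordinal bound for one segment.
pub fn ordinal_upper_bound(sorted_terms: &[&str], literal: &str) -> u64 {
    sorted_terms.partition_point(|t| *t <= literal) as u64
}

/// Returns indices of rows in a batch that can still reach the Top K,
/// comparing integer ordinals only (no string decoding).
pub fn prefilter(batch_ords: &[u64], sorted_terms: &[&str], threshold: &SharedThreshold) -> Vec<usize> {
    match threshold.current() {
        None => (0..batch_ords.len()).collect(), // no threshold yet: keep everything
        Some(lit) => {
            let bound = ordinal_upper_bound(sorted_terms, &lit);
            batch_ords
                .iter()
                .enumerate()
                .filter(|&(_, &o)| o < bound)
                .map(|(i, _)| i)
                .collect()
        }
    }
}
```

Cloning `SharedThreshold` shares the same lock, so an update made on the Top K side is visible on the scan's next batch.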

6. Execution Result

After all input is consumed, SegmentedTopKExec materializes sort column values, performs the final sort, and emits exactly K rows. TantivyLookupExec decodes deferred strings for those K rows only. JoinScanState extracts CTIDs and fetches heap tuples — the only point where the PostgreSQL heap is accessed.
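End to end, the final stage can be sketched like this. `Candidate` and `finish` are hypothetical names, CTIDs are modelled as (block, offset) pairs, and dictionaries are plain `Vec<&str>`; this illustrates the late-materialization order rather than reproducing SegmentedTopKExec or TantivyLookupExec. Sort keys are decoded for the per-segment survivors, but the remaining deferred column is decoded only for the final k rows, and the returned CTIDs are what a heap fetch would consume.

```rust
/// Hypothetical candidate row that survived a per-segment heap. The sort key
/// and one other output column are both still term ordinals.
#[derive(Debug, Clone, Copy)]
pub struct Candidate {
    pub segment: usize,
    pub sort_ord: usize,
    pub other_ord: usize,
    pub ctid: (u32, u16), // (block, offset), modelled on a PostgreSQL item pointer
}

/// Returns (sort value, other value, ctid) for the final k rows, plus the
/// number of non-sort strings decoded (at most k).
pub fn finish(
    cands: &[Candidate],
    sort_dicts: &[Vec<&str>],
    other_dicts: &[Vec<&str>],
    k: usize,
) -> (Vec<(String, String, (u32, u16))>, usize) {
    // SegmentedTopKExec-like step: decode sort keys of the survivors,
    // sort globally, keep exactly k.
    let mut keyed: Vec<(&str, Candidate)> = cands
        .iter()
        .map(|c| (sort_dicts[c.segment][c.sort_ord], *c))
        .collect();
    keyed.sort_by(|a, b| a.0.cmp(b.0));
    keyed.truncate(k);

    // TantivyLookupExec-like step: decode the other deferred column for the
    // final k rows only.
    let mut decoded = 0;
    let rows: Vec<(String, String, (u32, u16))> = keyed
        .into_iter()
        .map(|(key, c)| {
            decoded += 1;
            (key.to_string(), other_dicts[c.segment][c.other_ord].to_string(), c.ctid)
        })
        .collect();
    (rows, decoded)
}
```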

Key Files

| File | Purpose |
|---|---|
| mod.rs | Lifecycle, activation checks, parallel support |
| build.rs | RelNode, JoinCSClause, JoinSource |
| scan_state.rs | DataFusion plan building, optimizer registration, result streaming |
| planner.rs | SortMergeJoinEnforcer, FilterPassthroughExec usage |
| planning.rs | Cost estimation, field validation, ORDER BY extraction |
| predicate.rs | Postgres expression → JoinLevelExpr |
| translator.rs | Postgres ↔ DataFusion expression mapping |
| explain.rs | EXPLAIN output formatting |

Execution-layer files under pg_search/src/scan/:

| File | Purpose |
|---|---|
| segmented_topk_exec.rs | SegmentedTopKExec: per-segment heaps, global heap, build_global_filter_expression |
| segmented_topk_rule.rs | Optimizer rule, wrap_blocking_nodes |
| tantivy_lookup_exec.rs | Dictionary decode + filter passthrough |
| filter_passthrough_exec.rs | Transparent wrapper enabling filter pushdown through blocking nodes |
| batch_scanner.rs | Scanner::next(): batch iteration, pre-filter, visibility |
| execution_plan.rs | PgSearchScanPlan: dynamic filter integration |
| pre_filter.rs | try_rewrite_binary, collect_filters |
| deferred_encode.rs | 3-way UnionArray construction and unpacking |

GUCs

| GUC | Default | Effect |
|---|---|---|
| paradedb.enable_join_custom_scan | on | Master switch |
| paradedb.enable_segmented_topk | true | SegmentedTopKExec injection |
| paradedb.enable_columnar_sort | true | Enables SortMergeJoin path |