docs/RFCS/20171025_interleaved_table_joins.md
We currently permit users to specify if a child table should be interleaved on-disk under its specified parent table. This feature was adopted from Google Spanner's interleaved tables and was meant to be an optimization the user could opt in for tables that are often queried in a parent-child relationship (i.e. one-to-one, one-to-many). Refer to the RFC on interleaved tables for more details.
Beyond co-locating the child's key-value (kv) pairs with the parent's kv pairs in the underlying Key-Value (KV) storage (which provides the significant optimization of one-phase commits (1PC) for free), we do not currently take advantage of this layout to optimize queries.
One type of query we can optimize for interleaved tables is joins between the
parent and children tables. In the context of DistSQL, instead of spawning two
TableReaders, one scan for the parent and one scan for the child table for a
total of two scans, and a Joiner processor, we can do one scan over the
relevant interleaved rows, then perform a join on the table rows.
This optimization has significant performance upside: it potentially lowers both the number of scans (interleaved parent and child tables are scanned simultaneously instead of in two separate scans) and the volume of inter-node gRPC traffic for the join (rows no longer need to be hash-routed on their join columns across the network to their respective join processors).
This RFC identifies two implementation phases for interleaved table joins:
We currently highlight some of the benefits of interleaved tables, such as faster joins, yet we do not currently execute joins any differently for interleaved tables than for non-interleaved tables. In fact, since we do two separate table scans for the parent and child table in an interleaved configuration, there is a performance penalty from scanning disjoint KV ranges, as noted in the interleaved table docs themselves.
To illustrate the current behavior, let us interleave table child into
table parent:
```sql
CREATE DATABASE foo;
CREATE TABLE IF NOT EXISTS parent (id INT PRIMARY KEY);
CREATE TABLE IF NOT EXISTS child (id INT, parent_id INT, PRIMARY KEY (parent_id, id)) INTERLEAVE IN PARENT parent (parent_id);
EXPLAIN SELECT * FROM child JOIN parent ON child.parent_id = parent.id;
```
which results in the following logical plan
+-------+------+----------------+--------------------+
| Level | Type | Field | Description |
+-------+------+----------------+--------------------+
| 0 | join | | |
| 0 | | type | inner |
| 0 | | equality | (parent_id) = (id) |
| 0 | | mergeJoinOrder | +"(parent_id=id)" |
| 1 | scan | | |
| 1 | | table | child@primary |
| 1 | | spans | ALL |
| 1 | scan | | |
| 1 | | table | parent@primary |
| 1 | | spans | ALL |
+-------+------+----------------+--------------------+
Users expecting a performance boost for parent-child joins (even simple ones like the above) are instead experiencing a performance hit.
It is worth noting how SQL rows are mapped to KV pairs, specifically how data for a given table is represented as key-value pairs on the primary index (refer to this blog post).
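To make the on-disk layout concrete, here is a minimal Go sketch. It is illustrative only: real CockroachDB keys are binary-encoded (`/tableID/indexID/columns...`), whereas slash-separated strings stand in here, with `#` as the interleave sentinel. Sorting the keys shows that each child row is stored directly under its parent row.

```go
package main

import (
	"fmt"
	"sort"
)

// interleavedOrder sorts illustrative string keys the way the KV layer
// orders real binary keys, demonstrating that child rows interleave
// directly under their parent row.
func interleavedOrder(keys []string) []string {
	sorted := append([]string(nil), keys...)
	sort.Strings(sorted)
	return sorted
}

func main() {
	keys := []string{
		"/parent/1/2",             // parent row id=2
		"/parent/1/1",             // parent row id=1
		"/parent/1/1/#/child/1/5", // child row (parent_id=1, id=5)
		"/parent/1/2/#/child/1/7", // child row (parent_id=2, id=7)
		"/parent/1/1/#/child/1/3", // child row (parent_id=1, id=3)
	}
	for _, k := range interleavedOrder(keys) {
		fmt.Println(k)
	}
	// /parent/1/1
	// /parent/1/1/#/child/1/3
	// /parent/1/1/#/child/1/5
	// /parent/1/2
	// /parent/1/2/#/child/1/7
}
```

Note that in the real encoding the sentinel byte is chosen so that child rows sort after the parent row's own KV pairs and before the next parent row; the string stand-in happens to preserve the same ordering.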
Some terminology used throughout this RFC:
- Logical plan: the plan of query execution (composed of planNodes) in CockroachDB.
- Parent/child tables: if table C is interleaved into table P, then C is the child table of the parent table P. P and C have a parent-child relationship where C is a direct "descendant" of P (and P a direct "ancestor" of C).
- Interleaved table: a table interleaved via the CREATE TABLE ... INTERLEAVE IN PARENT rule. This is concretely an interleaved primary index (see next point).
- Interleaved index: an index interleaved via the CREATE INDEX ... INTERLEAVE IN PARENT rule.
- Interleave prefix: the columns of the parent table's primary key that prefix the child's primary key, e.g. (child.parent_id) or (parent.id). Note the prefix may or may not be a proper prefix (it is possible for the child table to have no additional primary key columns).

From a CockroachDB user perspective, there is no apparent feature change. Join performance will be improved for joins between interleaved tables and their parent tables. Since we do not employ any significant query rewriting or optimization, the implementation of how we recognize interleaved table joins in the Planning phase will heavily dictate how we advertise this performance improvement.
Specifically, the first iteration will permit more efficient joins between tables with a parent-child relationship. Any table that is often joined with a parent table (e.g. via a foreign key) should be interleaved in the parent table.
In general, tables that are consistently subject to hierarchical querying patterns (i.e. queried together via multi-way joins) can see improvements with interleaving. The current docs do a good job highlighting the benefits and tradeoffs of interleaved tables in the general case. See Drawbacks for caveats on advising users with respect to interleaved tables.
For CockroachDB contributors and developers, changes to the codebase are mapped to their respective phases. Firstly, it is important to understand how the interleaved indexes are mapped in storage. Refer to the RFC on interleaved tables.
As for how each phase of this feature affects the codebase:
Phase 1 (Processors): we introduce an InterleaveReaderJoiner processor that functionally combines two TableReaders
(corresponding to the parent and child tables) and a MergeJoiner
(since interleave prefixes are defined on the parent table's primary key,
there is an ordering guarantee we can take advantage of):
- InterleaveReaderJoiner will first scan the interleaved hierarchy
  (scoped to the relevant key span if there is a filter on the primary
  key). It can be configured with the TableDescriptors and
  IndexDescriptors of the parent and child table such that it can perform
  a single-pass scan of the two tables with RowFetcher. See [1]
  Scanning the interleaved hierarchy for discussion on why
  this is the case.
- InterleaveReaderJoiner must read full interleaves (and cannot partially
  read an interleaf), otherwise there may potentially be missing joined
  rows (see the Reference-level explanation section).
- The joining logic is identical to that of MergeJoiner. We can abstract
  the joining logic from MergeJoiner and incorporate it into
  InterleaveReaderJoiner.

Phase 2 (Planning): joinNodes will be annotated in the logical plan if the
joinNode is the root of a plan sub-tree, has two scanNode children/leaves,
and:
- the scanNodes correspond to a scan of the parent and child tables
- the joinNode is an equality join on the parent's primary key (the
  interleave prefix) (see [5] Theta joins for inequality joins)

In the general case, the table with the join columns could be a common
ancestor (see [3a] Common ancestor joins in
Alternatives) and/or the join columns could
be a prefix of the interleave prefix (see [3b] Prefix
joins). The physical planner spawns an InterleaveReaderJoiner on each
node (along with routers and streams) that has data for the relevant span
when a joinNode with an interleaved join algorithm is encountered.

Note every type of join (inner, left, right, full) can easily be supported since
InterleaveReaderJoiner employs the same joining logic as MergeJoiner.
For the query described above where we do a simple join between table parent
and table child, where child is INTERLEAVE IN PARENT parent,
SELECT * FROM child JOIN parent ON child.parent_id = parent.id
we previously had the logical plan
+-------+------+----------------+--------------------+
| Level | Type | Field | Description |
+-------+------+----------------+--------------------+
| 0 | join | | |
| 0 | | type | inner |
| 0 | | equality | (parent_id) = (id) |
| 0 | | mergeJoinOrder | +"(parent_id=id)" |
| 1 | scan | | |
| 1 | | table | child@primary |
| 1 | | spans | ALL |
| 1 | scan | | |
| 1 | | table | parent@primary |
| 1 | | spans | ALL |
+-------+------+----------------+--------------------+
which is a logical plan with a joinNode root and two scanNode leaves. This
could very well be a planNode sub-tree in a more complex query. Regardless,
one can annotate the joinNode to identify the interleaved join which will
produce a query plan resembling
+-------+------+------------------+--------------------+
| Level | Type | Field | Description |
+-------+------+------------------+--------------------+
| 0 | join | | |
| 0 | | type | inner |
| 0 | | equality | (parent_id) = (id) |
| 0 | | mergeJoinOrder | +"(parent_id=id)" |
| 0 | | interleavePrefix | parent_id, id |
| 0 | | algorithmHint | interleave |
| 1 | scan | | |
| 1 | | table | child@primary |
| 1 | | spans | ALL |
| 1 | scan | | |
| 1 | | table | parent@primary |
| 1 | | spans | ALL |
+-------+------+------------------+--------------------+
The local execution engine can recognize the additional interleavePrefix
field and algorithmHint = interleave once it implements interleaved join logic. The distributed
execution engine can produce a physical plan with an InterleaveReaderJoiner
processor for every such tree pattern.
This RFC will be scoped down to joins between two tables (a parent table and its direct child) in an interleaved hierarchy.
Furthermore, we will discuss the two implementation phases (Processors and Planning) separately since they can be orthogonally implemented.
For the first iteration, we only care about equality joins on the entire
interleave prefix. If you imagine the following interleaved hierarchy
in storage (where each row represents a table row), where the primary
key of parent is pk, the primary key of childN is (pk, ckN),
and the interleaved index/interleave prefix is (pk):
<pk>/.../parent_r1
<pk>/<ck1>/.../child1_r1
...
<pk>/<ck1>/.../child1_r2
<pk>/<ck2>/.../child2_r1
<pk>/<ck3>/.../child3_r1
...
<pk>/.../parent_r2
<pk>/<ck1>/.../child1_r3
<pk>/<ck2>/.../child2_r2
<pk>/<ck2>/.../child2_r3
<pk>/<ck2>/.../child2_r4
...
Let's suppose we want to join parent and child2
SELECT * FROM parent JOIN child2 ON parent.pk = child2.pk
The expected output is
parent_r1 U child2_r1
parent_r2 U child2_r2
parent_r2 U child2_r3
parent_r2 U child2_r4
...
We can scan top to bottom of the interleaved hierarchy and separate out
parent_rN rows and child2_rN rows into two streams.
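The scan-and-separate step can be sketched in Go. This is a simplified model, not CockroachDB's actual API: `row` stands in for an EncDatumRow together with the (TableDescriptor, IndexDescriptor) pair the fetcher reports it under.

```go
package main

import "fmt"

// row is a simplified stand-in for an EncDatumRow plus the
// (TableDescriptor, IndexDescriptor) pair it was fetched under.
type row struct {
	table string // "parent" or "child2"
	pk    int    // interleave prefix (the parent's primary key)
	data  string
}

// splitStreams models the single top-to-bottom scan: one pass over the
// interleaved hierarchy routes each row into a per-table stream,
// preserving scan order within each stream.
func splitStreams(scan []row) (parents, children []row) {
	for _, r := range scan {
		if r.table == "parent" {
			parents = append(parents, r)
		} else {
			children = append(children, r)
		}
	}
	return parents, children
}

func main() {
	scan := []row{
		{"parent", 1, "parent_r1"},
		{"child2", 1, "child2_r1"},
		{"parent", 2, "parent_r2"},
		{"child2", 2, "child2_r2"},
		{"child2", 2, "child2_r3"},
	}
	parents, children := splitStreams(scan)
	fmt.Println(len(parents), len(children)) // 2 3
}
```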
Looking through the implementation of RowFetcher, specifically the call chain
RowFetcher --> RowFetcher.NextKey --> RowFetcher.ReadIndexKey --> DecodeIndexKey (table.go)
it doesn't seem like RowFetcher is very permissive when it comes to allowing
a single pass through an interleaved hierarchy. Specifically, DecodeIndexKey in
table.go
will need to be refactored so that it returns true for KV pairs that
match either the target parent index or the child2 index. This
generalizes to a set membership problem with a set of N (tableID, indexID) tuples if we
emit multiple interleaved tables in one scan or support multi-table
joins.
The refactor necessary is as follows:
- Introduce a variant of RowFetcher that produces rows from any of the N tables
  (identified as unique TableDescriptor-IndexDescriptor pairs).
- NextRow will need to be able to return a RowResponse with the
  EncDatumRow as well as the row's TableDescriptor and IndexDescriptor.
- Each index's position in the interleaved hierarchy can be identified with an
  "equivalence signature" encoded as a []byte. For example, the equivalence signature
  for the primary index on child2 interleaved into the primary index of
  parent (as well as any rows that belong to this primary index of child2) is
  /parent/1/#/child2/1
- To avoid disrupting the existing RowFetcher and introducing additional
  overhead, it is prudent to implement this new RowFetcher separately. We
  can eventually merge the existing RowFetcher into it after it is
  determined the overhead is marginal for the 1-table case.

The outstanding PR for this new RowFetcher has the full implementation details.
The InterleaveReaderJoiner can branch on the
TableDescriptor-IndexDescriptor pair to distinguish between parent and
child2 rows. Each join batch is defined as rows that are joined for a given
interleave prefix. If the InterleaveReaderJoiner observes a parent row, it
can start a new join batch, since each interleave prefix is a primary key
on parent and is therefore unique. It memoizes this parent row. If a child2
row is retrieved, it is joined with the most recently memoized parent row via
joinerBase.render.
This implies that InterleaveReaderJoiner will need to nest joinerBase
similar to
mergeJoiner.
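The memoize-and-join batching described above can be sketched in Go. This is a simplified model under stated assumptions (an inner join on the full interleave prefix; `row` is a hypothetical stand-in for a fetched row plus its table of origin), not the real processor.

```go
package main

import "fmt"

// row stands in for a fetched row plus its table of origin.
type row struct {
	table string
	pk    int // interleave prefix (primary key of parent)
	data  string
}

// joinInterleaved models the batching above for an inner join on the
// full interleave prefix: a parent row starts a new join batch and is
// memoized; each following child2 row is rendered against the memoized
// parent (joinerBase.render in the real processor).
func joinInterleaved(scan []row) []string {
	var out []string
	var last *row
	for i := range scan {
		r := &scan[i]
		if r.table == "parent" {
			last = r // new join batch: each prefix has exactly one parent row
		} else if last != nil && last.pk == r.pk {
			out = append(out, last.data+" U "+r.data)
		}
	}
	return out
}

func main() {
	scan := []row{
		{"parent", 1, "parent_r1"},
		{"child2", 1, "child2_r1"},
		{"parent", 2, "parent_r2"},
		{"child2", 2, "child2_r2"},
		{"child2", 2, "child2_r3"},
	}
	fmt.Println(joinInterleaved(scan))
	// [parent_r1 U child2_r1 parent_r2 U child2_r2 parent_r2 U child2_r3]
}
```

Because there is at most one parent row per prefix, no child rows ever need to be buffered in this case; the joined rows stream out as the scan proceeds.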
The joining logic becomes more complicated on joins not on the full interleave prefix as detailed for prefix joins and subset joins.
There are three areas (that each vary in the amount of work required) that are relevant to implementing the planning of interleaved table joins:
Annotating joinNodes can be accomplished after a logical plan has been
created, during optimization (since one can consider identifying interleaved
table joins an optimization; by precedent, we did this in the optimizePlan
stage of the planner for merge
joins). We
introduce a general algorithmHint field on joinNodes to annotate.
Annotation can be accomplished when we expandPlan (within optimizePlan) and do type
switching on the nodes we encounter. Specifically, we'd have a helper function to
peek into the left and right sources to see if they satisfy the conditions
for an interleaved join. This is already being done for identifying merge
joins and merge join order: in fact this can be a cookie-cutter derivative
of the planning changes in the DistSQL merge join
PR.
The conditions for an interleaved join in this first iteration are:
1. The scanNodes correspond to a scan of the parent and child tables.
2. The joinNode is an equality join on the parent's primary key (the interleave prefix).

We first check if the planNodes attached to left and right are scanNodes.
We then corroborate their InterleaveDescriptors (nested in IndexDescriptor which
is nested in TableDescriptor). Note that InterleaveDescriptor does in
fact keep track of all ancestors, thus it's rather simple to do an interleaved hierarchy traversal
if we want to do common ancestor joins in the future.
If the first ancestor's (the parent) table ID of left or right
corresponds to the table ID of the other table, then we've satisfied condition #1.
Note that only a scan of the interleaved index of the child table can be
incorporated in an interleaved join. For example, if only the child table's primary index
is interleaved into the parent table but the scanNode is a scan over one of the child's
secondary indexes, then an interleaved join is not appropriate. If the secondary
index is interleaved then we should also optimize index joins.
We then check if the joinPredicate on the joinNode has equality indices
on the (exact) columns of the interleave prefix.
If both conditions are met, we set algorithmHint to "parent-child interleaved join".
The annotated plan tree is then sent off to the execution engine.
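Condition #1 can be sketched in Go. The descriptor types here are hypothetical, pared-down stand-ins: the real TableDescriptor and InterleaveDescriptor live in sqlbase, and the ancestors list records the (TableID, IndexID) of every interleave ancestor.

```go
package main

import "fmt"

// ancestor is a pared-down stand-in for an InterleaveDescriptor
// ancestor entry (the real one also carries an IndexID and
// SharedPrefixLen).
type ancestor struct{ tableID int }

// tableDesc is a hypothetical, simplified table descriptor.
type tableDesc struct {
	id        int
	ancestors []ancestor // ancestors of the table's interleaved primary index
}

// isParentChildJoin checks condition #1: the first interleave ancestor
// of one side must be the other side's table.
func isParentChildJoin(left, right tableDesc) bool {
	if len(left.ancestors) > 0 && left.ancestors[0].tableID == right.id {
		return true
	}
	if len(right.ancestors) > 0 && right.ancestors[0].tableID == left.id {
		return true
	}
	return false
}

func main() {
	parent := tableDesc{id: 53}
	child := tableDesc{id: 54, ancestors: []ancestor{{tableID: 53}}}
	fmt.Println(isParentChildJoin(child, parent))             // true
	fmt.Println(isParentChildJoin(parent, tableDesc{id: 55})) // false
}
```

In the real planner this check would also verify that the scanned index is the interleaved index itself, per the secondary-index caveat above.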
Thankfully, there is not much to do here for the first iteration of this RFC!
The idea of spawning the InterleaveReaderJoiner in the local execution
engine is out of this RFC's scope.
The bulk of the logic that will set up the interleave
join will be in
createPlanForJoin.
If the interleave join algorithm has been selected by the logical planner, it
will skip creating the individual
plans
for the left and right scanNodes and invoking MergePlans. Instead, the
descriptors on the scanNodes will be used to construct the
InterleaveReaderJoiner processor.
We pass down TableDescriptors, IndexDescriptors and any other arguments
our RowFetcher requires to perform a single-pass read for the two tables.
The final physical plan for a three-node cluster looks something like
First the union of the spans from the two scanNodes (by invoking
MergeSpans)
are passed into
PartitionSpans.
This will return a slice of SpanPartitions of length n, which corresponds
to the number of data nodes. We will eventually spawn n
InterleaveReaderJoiners, one for each node (n = 3 in the diagram above).
Side note: any scan over an interleaved index will always default to
the span of the root table in the interleaved
hierarchy so
it's not strictly necessary to take the union. For future-proofing's sake,
we do this anyway, since taking the union of a set of spans during planning has a trivial performance cost.
These spans will need to be "fixed" in order to prevent partial interleave
reads since range splits can happen in between interleaves.
If an interleaf is only partially read on a given node (for example, the parent
row is read but none of the children rows are read because they overflow into
the next span partition on a different node), then we will miss a few joined
rows. We need to first extract the parent's prefix key then call PrefixEnd().
To do this, we need to figure out the cumulative prefix length. That is, for a given child key
/<tableid>/<indexid>/<parent-interleave-prefix>/<interleave sentinel>/<rest-of-child-primary-key>/...
where the number of segments (/.../) for <parent-interleave-prefix> (i.e.
number of columns in the prefix) is shared_prefix_len in the child table's
InterleaveDescriptor.
For example, if we wanted to fix this span (generated from PartitionSpans)
(# is the <interleave sentinel>)
StartKey: /parent/1/2/#/child/2
EndKey: /parent/1/42/#/child/4
to
StartKey: /parent/1/2/#/child/2
EndKey: /parent/1/43
such that we know for certain we read all child rows with interleave prefix
42, then we need to first extract /parent/1/42 from
/parent/1/42/#/child/4. We can call encoding.PeekLength() 3 times (once
for each segment) to find the total prefix length then take the prefix of
/parent/1/42/#/child/4 of that length. More generally, for a descendant
EndKey
/<1st-tableid>/<1st-indexid>/<1st-index-columns>/#/<2nd-tableid>/<2nd-indexid>/2nd-index-columns/#/...
the number of times we call encoding.PeekLength() is
3 * count(interleave ancestors) + sum(shared_prefix_len) - 1
where the - 1 is to not include the last <interleave sentinel>.
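The formula and the truncation step can be sketched in Go. This is illustrative only: real keys are binary and each segment is consumed with encoding.PeekLength, whereas here one slash-separated token stands in for one segment and `#` for the interleave sentinel.

```go
package main

import (
	"fmt"
	"strings"
)

// segmentsToPeek applies the formula above for a descendant EndKey:
// tableID + indexID + sentinel per ancestor, plus the prefix columns,
// minus the final sentinel we do not include.
func segmentsToPeek(numAncestors, sumSharedPrefixLen int) int {
	return 3*numAncestors + sumSharedPrefixLen - 1
}

// fixEndKey truncates a descendant EndKey to the ancestor's prefix so
// that a span partition never ends mid-interleaf. (The real fix then
// calls PrefixEnd() on the result, e.g. /parent/1/42 -> /parent/1/43.)
func fixEndKey(key string, numAncestors, sumSharedPrefixLen int) string {
	segs := strings.Split(strings.TrimPrefix(key, "/"), "/")
	n := segmentsToPeek(numAncestors, sumSharedPrefixLen)
	return "/" + strings.Join(segs[:n], "/")
}

func main() {
	// One ancestor (parent), shared_prefix_len = 1.
	fmt.Println(segmentsToPeek(1, 1))                      // 3
	fmt.Println(fixEndKey("/parent/1/42/#/child/4", 1, 1)) // /parent/1/42
}
```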
If the above "fixing" happens too often, it begs the question of avoiding splits inside interleaves as much as possible. This was mentioned briefly in the initial RFC of interleaved tables. See Avoiding splits inside interleaves for more detail.
Everything discussed in this RFC is more efficient than the way we currently do joins, because we do one scan over the interleaved hierarchy instead of two separate scans, one for each of the two tables in the join.
In terms of future caveats, interleaved table joins are ill-advised if the majority of queries are joins on a small subset of the interleaved hierarchy. In the scope of this RFC (where we focus on joins between two tables in the interleaved hierarchy), one should only have a parent and child table in a given interleaved hierarchy.
Once multi-table interleaved joins are implemented (see [4] Multi-table joins), any scan queries (whether on individual tables or joins) should be on the majority of the tables in the interleaved hierarchy (to optimize the amount of relevant data read when doing the interleaved hierarchy scan).
All this being said, we should be very cautious prescribing usage patterns for interleaved tables. We currently do not support interleave schema changes (i.e. un-interleaving a table from its parent). It would be inconsistent of us if we prescribe that interleaved tables should only be used in parent-child relationships presently, but amend our prescription to include deeply-nested, hierarchical relationships once we implement multi-table joins and common ancestor joins.
After discussions with @jordanlewis who has been working with indexes of interleaved
parent and child tables, a couple of key points were brought up that will dictate
how we do the reading component of InterleaveReaderJoiner (and RowFetcher):
- A scan of parent with primary keys between 2 and 42 might look up the span parent@primary /2-/43. This scan at the KVFetcher level iterates through all child rows with interleave prefix also between 2 and 42 and decodes their index keys. It then decides whether or not a given row is a parent row via a comparison on its TableID and IndexID.
- Separate (non-interleaved) indexes over the child rows would need to be maintained on INSERT/UPDATEs. They are also located on separate ranges (and possibly separate machines), which would require sending rows over the wire to the joiners. This is potentially worse than doing an entire hierarchy scan and filtering for relevant rows.

For planning interleaved table joins, one can either introduce the branching
point (that is, the point at which an InterleavedReaderJoiner is introduced)
in the physical plan (processor-level in the distributed execution engine) or
in the logical plan (which affects both execution engines).
Currently the local execution engine only scans and propagates rows in the
computed spans on the data nodes to the gateway node. Note however that these
scans occur at the leaves of plan tree (scanNodes are always leaves). More
precisely, the InterleaveReaderJoiner logic described for the distributed
execution engine can theoretically be accomplished in the local execution
engine: the local implementation spawns and runs an InterleaveReaderJoiner
processor and returns the joined results. If we never quite fuse the local
and distributed backends into one entity, annotating a joinNode to perform an
interleaved join where possible will give us the flexibility to optimize both
pathways.
This decision to annotate logical plans to allow for more efficient joins has
precedent when we added ordering information for merge joins to
joinNodes for the
distributed execution engine.
As noted in the Guide-level explanation section,
one can identify the plan tree pattern that has a joinNode as the root and
two scanNode leaves that correspond to the parent and child tables. For the
simple case, one can verify that the join column(s) are on the primary key of
the parent table (and a prefix of the interleaved index of the child table, but
this is mandated by INTERLEAVE anyway), but this can of course be
generalized.
When we have a parent and direct child relationship and join, we really have a
join on a common ancestor's primary key (the parent) between the parent and the
child. If we imagine the interleave relationships for a given database as a
forest where every top-level parent is the root of a tree (and these trees can
be arbitrarily deep), then a common ancestor for two nodes A and B is
defined as a node whose subtree contains both A and B.
Imagine that we wanted to perform a join between two children tables on their
parent's primary key. For example, suppose we have table parent, child1, child2:
```sql
CREATE TABLE parent (id INT PRIMARY KEY);
CREATE TABLE child1 (id1 INT, pid INT, PRIMARY KEY (pid, id1)) INTERLEAVE IN PARENT parent (pid);
CREATE TABLE child2 (id2 INT, pid INT, PRIMARY KEY (pid, id2)) INTERLEAVE IN PARENT parent (pid);
SELECT * FROM child1 JOIN child2 ON child1.pid = child2.pid;
```
The forest representation (with one tree) looks like
parent
/ \
/ \
/ \
child1 child2
It is complete and sound to scan the interleaved parts of child1 and child2
and perform interleaved joins per each unique parent primary key. Reasoning:
all child1 rows with interleave prefix X are interleaved under the parent
row with primary key X by INTERLEAVE constraint. Similarly all child2
rows with X prefix are interleaved under the same parent row. Therefore,
all child1 and child2 rows with equivalent pids are nested under the same
interleave block and thus will be joined.
This concept can be arbitrarily extended to any common ancestor primary key
join so long as the join is exactly on the common ancestor's primary key. Since
the nesting depths of interleaved tables are typically no more than 1-4 (if
there is very deep nesting, then we'll have other issues to worry about), we
can simply traverse up the connected tree until we reach the root and check if
a common ancestor was encountered (traverse up instead of down from either
node since InterleaveDescriptor only keeps track of ancestors, not
descendants).
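The upward traversal can be sketched in Go. The shape is hypothetical: `ancestors` lists a table's interleave ancestor table IDs ordered from direct parent up to the root, mirroring the fact that InterleaveDescriptor only tracks ancestors, not descendants.

```go
package main

import "fmt"

// commonAncestor walks up both ancestry paths and returns the lowest
// common ancestor's table ID, if any. A table counts as its own
// "ancestor" so the plain parent-child case is covered too.
// aAnc/bAnc are ordered from direct parent up to the root.
func commonAncestor(aID, bID int, aAnc, bAnc []int) (int, bool) {
	onAPath := map[int]bool{aID: true}
	for _, t := range aAnc {
		onAPath[t] = true
	}
	if onAPath[bID] {
		return bID, true // b itself is an ancestor of a (or vice versa)
	}
	for _, t := range bAnc { // closest ancestor first
		if onAPath[t] {
			return t, true
		}
	}
	return 0, false
}

func main() {
	// parent=1; child1=2 and child2=3 are both interleaved under it.
	anc, ok := commonAncestor(2, 3, []int{1}, []int{1})
	fmt.Println(anc, ok) // 1 true
}
```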
The use cases for a join on an ancestor more than one level up in the hierarchical tree are limited, since it is intuitively rare for a grandchild to have a foreign key reference to its grandparent. Similarly, joining siblings on their parent's key is essentially a cross-product join per unique parent primary key, which is hard to imagine as a common query. A multi-table join between parent-child-grandchild is conceivable (see Multi-table joins).
Prefix joins involve joining on a proper prefix of the primary key of the parent (which is a prefix of the interleave prefix).
For example, suppose we have a table parent with
primary key (PK) (pk1, pk2, pk3) and a table child with primary key
(PK) (pk1, pk2, pk3, c_pk1).
The simple case we've been dealing with is a join on (exclusively) columns
pk1, pk2, and pk3.
It was proposed that we could possibly do a join on a prefix of the
interleave prefix. Suppose we wanted to join on pk1 and
pk2. For clarity's sake, let us imagine we have the parent key schema
/<pk1>/<pk2>/<pk3>/...
and the interleaved child key schema
/<pk1>/<pk2>/<pk3>/<interleaved sentinel>/<c_pk1>/...
where the interleaved sentinel is # (in practice, it is the first byte in
KeyEncDescending such that it is always sorted after the parent KV pairs).
Example KV pairs (each representing an entire table row) for the parent and
child tables will be stored as (where child rows are indented)
...
/1/1/2/...
/1/1/2/#/2/...
/1/1/2/#/4/...
/1/1/3/...
/1/1/3/#/1/...
/1/1/3/#/3/...
/1/1/3/#/7/...
/1/2/3/...
/1/2/3/#/5/...
/1/2/9/...
/1/2/9/#/5/...
/1/2/9/#/7/...
...
Joining on pk1 and pk2 would result in the following parent-child KV pair joins
---------------------------------
| parent (key) | child (key) |
---------------------------------
| /1/1/2/... | /1/1/2/#/2/... |
| /1/1/2/... | /1/1/2/#/4/... |
| /1/1/2/... | /1/1/3/#/1/... |
| /1/1/2/... | /1/1/3/#/3/... |
| /1/1/2/... | /1/1/3/#/7/... |
| /1/1/3/... | /1/1/2/#/2/... |
| /1/1/3/... | /1/1/2/#/4/... |
| /1/1/3/... | /1/1/3/#/1/... |
| /1/1/3/... | /1/1/3/#/3/... |
| /1/1/3/... | /1/1/3/#/7/... |
| ... | .... |
--------------------------------
and similarly for the joins where (pk1, pk2) = (1, 2).
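Counting the output of such a prefix join can be sketched in Go. The rows and the `pref` key type are hypothetical stand-ins; the point is that within each (pk1, pk2) group the join degenerates to a cross product, so the output size is the product of the group sizes.

```go
package main

import "fmt"

// pref is the join prefix (pk1, pk2); rows are grouped under it.
type pref struct{ pk1, pk2 int }

// prefixJoin counts the joined rows: within each (pk1, pk2) group,
// every parent row pairs with every child row (the cross product shown
// in the table above).
func prefixJoin(parents, children map[pref][]string) int {
	joined := 0
	for p, ps := range parents {
		joined += len(ps) * len(children[p])
	}
	return joined
}

func main() {
	parents := map[pref][]string{
		{1, 1}: {"/1/1/2/...", "/1/1/3/..."},
		{1, 2}: {"/1/2/3/...", "/1/2/9/..."},
	}
	children := map[pref][]string{
		{1, 1}: {"/1/1/2/#/2/...", "/1/1/2/#/4/...", "/1/1/3/#/1/...", "/1/1/3/#/3/...", "/1/1/3/#/7/..."},
		{1, 2}: {"/1/2/3/#/5/...", "/1/2/9/#/5/...", "/1/2/9/#/7/..."},
	}
	// 2 parents x 5 children for (1,1) plus 2 x 3 for (1,2).
	fmt.Println(prefixJoin(parents, children)) // 16
}
```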
Since the primary index for the parent table is ordered, two 2-pass approaches using a merge-join pattern emerge here:
TODO(richardwu): From my understanding, if one does not know the exact
/pk1/pk2 start and end keys (i.e. one can't tell RowFetcher/kvBatchFetcher to scan
the first, second, etc. /pk1/pk2/ range), then one would have to scan the
entire hierarchy. The bases for the following approaches revolve around the
fact that we can't do separate scans for each unique (pk1, pk2) prefix.
This approach uses
O(# of parent+child rows for a given (pk1, pk2) prefix)
memory and 1 span scan. For each (pk1, pk2) prefix:
1. Call NextRow() and store each parent/child row in its respective buffer until
   we see a new (pk1, pk2) prefix.
2. Join the two buffers (a cross product per prefix) and emit the joined rows.

If the memory requirements of approach 1 become too unwieldy, one can consider storing either the child rows or the parent rows in one buffer (deciding which will require table statistics or a general heuristic). This requires
min{
O(# of child rows for given (pk1, pk2) prefix),
O(# of parent rows for given (pk1, pk2) prefix),
}
memory. WLOG, let's store the parent rows in our buffer. This approach relies on the assumption that RocksDB can efficiently cache the requested span and subsequent overlapping scans of the initial span. @petermattis notes that RocksDB should be able to cache these large blocks of SSTables. For each
(pk1, pk2) prefix:
1. Scan and buffer the parent rows until a new (pk1, pk2) prefix is
   encountered, marking the current (pk1, pk2) prefix for the next scan.
2. Scan the marked (pk1, pk2) span again, joining each child row with the
   buffered parent rows.
3. Move on to the next (pk1, pk2) prefix (go to #1).

To illustrate, let's assume that there are three unique (pk1, pk2) prefixes. This
forms three blocks of our overall span: block A, B, and C.
For k unique prefixes, this requires O(2k) scans. Assuming every scan
is retrieved from the RocksDB cache, we only do one read from disk.
Subset joins are joins on a strict subset of the interleave prefix
columns that is not a prefix itself. For example, if (pk1, pk2, pk3) is the
interleave prefix, then
{(pk2), (pk3), (pk1, pk3), (pk2, pk3)}
is its corresponding set of subsets.
There is no optimized way specific to interleaved tables to perform subset
joins (that I can think of). If you refer to the Canonical prefix join
example, a join on (pk1, pk3) would involve
some disjoint cross joins which is hard to optimize in a general way. We could
of course use a hybrid merge-hash join for a join on (pk1, pk3): we'd perform
a hash-join (hashed on pk3) on all rows for each pk1 (see this
issue for more details).
For a given interleaved hierarchy, we might fold in multiple joins from more than two tables in the hierarchy. Suppose we have the hierarchy
customers
orders
items
We may want to retrieve all items that customers have purchased joined with
their customer information (stored in customers). The corresponding
query might look like
```sql
SELECT * FROM customers
JOIN (
  SELECT * FROM orders
  JOIN items ON orders.id = items.order_id
) AS order_items
ON customers.id = order_items.cus_id;
```
Ideally, this three-table join would be combined into one join with one scan of the
interleaved hierarchy. InterleaveReaderJoiner would read rows from N = 3 tables-indexes
and perform a merge join on all three tables (this is essentially a 3-way cross product,
which can be implemented with 3 loops nested in a hierarchy).
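The per-batch logic can be sketched in Go. This is a simplified model with illustrative names: within one batch (rows that fully match on the join keys inside an interleave block), the output is the cross product of the three row sets, produced by three nested loops. The real join would additionally match items to their specific orders before batching.

```go
package main

import "fmt"

// threeWayJoin models the per-batch cross product for an N = 3
// multi-table join: three nested loops over the rows in one batch.
func threeWayJoin(customers, orders, items []string) []string {
	var out []string
	for _, c := range customers {
		for _, o := range orders {
			for _, i := range items {
				out = append(out, c+"|"+o+"|"+i)
			}
		}
	}
	return out
}

func main() {
	got := threeWayJoin(
		[]string{"cust1"},
		[]string{"ord1", "ord2"},
		[]string{"item1"},
	)
	fmt.Println(got) // [cust1|ord1|item1 cust1|ord2|item1]
}
```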
Theta joins, or joins on columns with predicates that are not all equivalence predicates, are a little more complicated with interleaved table joins. I haven't fully grokked the implications of theta joins for interleaved tables, so I'll leave this open for now.
After some initial discussion in the RFC PR, we came to the realization that
the goal of this RFC is to avoid scanning the entire interleaved hierarchy twice
with two independent TableReader processors. An alternative solution proposed
was to have some InterleaveReader that emits both parent and child rows
in the simple case. The joining logic is identical to that of MergeJoiner and
the rows from InterleaveReader can be piped to MergeJoiner.
In fact, the prefix and subset joins
are general cases for MergeJoiner that are currently being tracked by this
issue.
This alternative architecture would look something like
Instead of specifying a hash router for each TableReader instance to their
respective MergeJoiners (hashed by the equality columns), the two output streams
of the InterleaveReader will be mapped directly to the inputs of the
MergeJoiner processor. Two OutputRouterSpecs for PASSTHROUGH routers are
specified for each InterleaveReader. They each correspond to the output streams
for parent and child rows. This is accomplished by setting the appropriate
fields (i.e. SourceProcessor, DestProcessor, DestInput) of
distsqlplan.Stream and adding it to the plan. Since distsqlplan.Stream
currently does not support multiple source outputs for a given SourceProcessor,
we will also need to introduce SourceOutput int (similar to how DestInput int
permits multiple input streams).
InterleaveReader also requires reading from RowFetcher and piping the
rows to each of its streams. Since InterleaveReader "owns" its N output
streams (rather than letting some Router message-broker direct the output), a
similar buffer logic will be required to prevent
deadlocks. The deadlock happens
when MergeJoiner (streamMerger to be precise) tries to batch all rows from both sides with the
current join column
values.
It will try to retrieve parent rows until it sees that the values for the join
columns increase (in order to form a batch of parent rows with the same join
column values). However, this would not be possible if the next row after the
parent row is a child row since the InterleaveReader reads top to bottom. Thus
we would need to buffer enough child rows until streamMerger can get the next
parent row which has a different primary key.
parent_r1 <- first row in parent batch, NextRow until parent row with a different primary key
child_r1 <- need to buffer since NextRow is called
child_r2 <- buffered
child_r3 <- buffered
parent_r2 <- parent row with a different primary key, finish parent batch
and move on to child batch (child_r1 - r3 is unblocked and sent)
In the case of a join on the entire interleave prefix, buffering is okay since there is at most 1 parent row for 0+ child rows. We would thus buffer at most 1 child row in order to retrieve the next parent row (that has a larger primary key, finishing off the batch). In the case where we do a prefix join, parent rows are separated by blocks of 0+ child rows, thus we will need to buffer the child rows in order to retrieve all the parent rows for a given batch.
This buffering logic needs to be abstracted from
routerBase
and imported into InterleaveReader.
The actual joining logic in MergeJoiner should be agnostic to whether the
rows came from two TableReaders or one InterleaveReader.
One of the drawbacks of delegating the joining logic to a generic MergeJoiner
is, in the simple case, having to buffer batches of child rows even though we
know there is only one parent row to join and we can stream these joined rows.
For example, the second batch for parent and child2 looks like

```
<parent batch>   <child batch>
parent_r2        child2_r2
                 child2_r3
                 child2_r4
```
Instead of buffering the entire batch, we know with certainty that we need only buffer the one parent row and stream-join the child rows.
One way to fix this is to refactor streamMerger to recognize
that if either the left or right batch has exactly one row, we can stream-join
the rows from the other side.
For the scope of the first iteration, we will defer this optimization.
If we wanted to perform multi-table joins efficiently,
we could consider an InterleaveJoiner (or MultiMergeJoiner) that can merge
N arbitrary batches.
There were a number of cons to this design compared to a single InterleaveReaderJoiner
processor, namely:

- Buffering logic in InterleaveReader, OR refactoring streamMerger to
  read rows from left and right concurrently to avoid deadlocking. Note that
  refactoring streamMerger does not solve the "prefix joins" or "subset" case:
  we still need to buffer/store rows on the InterleaveReader side.
- Unnecessary batching in streamMerger, unless we hint to MergeJoiner
  during planning that the parent side contains only 1 row per batch (for joins
  on the full interleave prefix). If one side does contain only 1 row per batch,
  we can skip batching and simply stream joined rows.
- Refactoring Streams to handle multiple inputs (from InterleaveReader).

Although we do not publicly advertise interleaving secondary indexes under primary indexes, this is possible with the syntax

```sql
CREATE INDEX secondary_idx ON foo (id, data) INTERLEAVE IN PARENT foo (id)
```
We can thus identify interleaved joins on indexJoinNodes (where the left and right
scanNodes of joinNodes become the index and table scanNodes of indexJoinNodes)
and perform efficient index joins. Index joins would employ the same processor (InterleaveReaderJoiner),
and the planning implementation will be symmetric to that of joinNodes.
We would like to refactor MVCCFindSplitKey
such that it avoids splitting inside interleaves (i.e. a split between two
children rows that are nested under the same parent row).
We have similar logic for avoiding splits in between
rows where there may be multiple family kv pairs. In short,
EnsureSafeSplitKey
simply truncates the split key obtained from the call to MVCCFindSplitKey in
libroach/db.cc to the row prefix. For example, if the given key is chosen as
the split key
```
/Table/50/1/<column id>/1
                        ^ family id
```

it simply reassigns the split key as

```
/Table/50/1/<column id>
```
which would split the range such that the first key in row <column id> would
be the first key in the right range (ranges are specified with an inclusive
start key and an exclusive end key).
To apply this to interleaves too, we can branch on whether the row belongs to
an interleaved table or not. If it does not, the GetRowPrefixLength
procedure takes place as before. If the row belongs to an interleaved table, we can invoke
GetTablePrefixLength, where instead of taking the prefix up to and including
the column IDs, it takes the prefix up to the last table's index
ID. The resulting split key from the above example is

```
/Table/50/1
```

We do this by decoding the key from left to right until the index of the last index ID
(that is, the last index ID after encountering an <interleave sentinel>).
To check whether the row belongs to an interleaved table, we can either check
for <interleave sentinel>s or plumb a flag down from above.
There is an outstanding
issue with ranges
failing to split, since EnsureSafeSplitKey naively defaults to the
beginning of the row (which may be the first key in the range) even when there may
be a better split point at the end of the row. The fix @a-robinson will
implement should work fine with interleave boundaries too.