docs/RFCS/20171120_scrub_index_and_physical_implementation.md
This RFC presents a method of running a check on a secondary index in an efficient manner, as well as other physical checks that are closely related in implementation.
This check can be run through the SQL statement SCRUB, using the
options to specifically check indexes or check the physical data of the
database.
See the SCRUB command RFC for motivation.
In addition to the motivations for the check, this work may also benefit schema changes. The last unresolved question of the online schema change RFC mentions a method of distributing backfill work. While backfilling is currently done through distSQL, it is currently unable to backfill an index at a specific timestamp, which may be the source of issues such as #18533.
The methods for collecting index data introduced in this RFC are similar to that which is needed to create an alternative backfilling mechanism.
After implementing the methods proposed by this RFC, the SCRUB command
will have the means to check for physical data errors and for index
errors. In particular, the errors we are checking for are:
- missing_secondary_index
- dangling_secondary_index
- invalid_encoding
- invalid_composite_encoding
- noncanonical_encoding

The first two are index errors. Missing secondary index errors occur when an index entry is not correctly created or updated alongside the primary k/v. Conversely, dangling index entries occur when the primary key referenced by the index k/v is not valid. This happens when the primary k/v is incorrectly deleted while the index k/v remains, in addition to the causes of missing index entries.
To detect missing index entries, we need to do a full scan of the primary k/v. Then for each primary k/v, we expect to find a corresponding index entry. If no index entry is found, we have a missing index entry. This does not capture any dangling index entries though, as we are only looking up index entries which have a valid primary key reference. In order to find dangling index entries, we need to do the same process in reverse: a full scan of the secondary index k/v.

As a result, a full outer join is a natural solution for this problem. Taking this approach limits us to two scans, as opposed to scanning both the primary k/v and the secondary index k/v and then doing expensive point-lookups against the other set, twice.
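To make the detection concrete, the following toy Go sketch (not CockroachDB code; the row type and data are invented for illustration) mimics the effect of the full outer join over two simplified row sets. A row present only on the primary side is a missing index entry, one present only on the index side is dangling, and a stale entry (pkey 3 below) surfaces as one error of each kind.

```go
package main

import "fmt"

// row is a simplified stand-in for the columns shared by the primary
// k/v and a secondary index entry.
type row struct {
	pkey int
	name string
}

func main() {
	// Toy stand-ins for the two k/v sets. Real index entries encode
	// (indexed columns, primary key); here each side is just a set of rows.
	primary := map[row]bool{{1, "a"}: true, {2, "b"}: true, {3, "c"}: true}
	secondary := map[row]bool{{1, "a"}: true, {3, "stale"}: true, {4, "d"}: true}

	// A full outer join pairs rows whose columns all match; a row with no
	// partner on the other side corresponds to the all-NULL side of the
	// join in the SQL formulation in section 1.
	for r := range primary {
		if !secondary[r] {
			fmt.Printf("missing_secondary_index: %+v\n", r) // no index entry for this row
		}
	}
	for r := range secondary {
		if !primary[r] {
			fmt.Printf("dangling_secondary_index: %+v\n", r) // index entry without a valid row
		}
	}
}
```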
Lastly, we want to propagate the errors as the SQL results of
SCRUB, including the information identifying each error (as specified in the SCRUB interface RFC).
The following changes are introduced in order to do the entirety of the index check:
Add logical plan creation (scrubNode[0] initialization) to construct a
logical plan for checking an index, then plan and run it through the
distSQL execution engine. This plan will accomplish the full scan and
join in one go.

The solution constructs multiple logical plans upon execution of our original plan, and proceeds to run them in sequence.
The latter three errors relate to encoding and arise from errors in the row data or in our functions that manipulate row data. These can be tested just by doing a full scan of the data being checked (primary k/v and index entry k/v). As above, we will also want the error results to be propagated up as the SQL results.
The following changes are introduced for running physical checks:
- Transform the scrubNode[0] into a logical plan that fully encapsulates
  the logic to scan the relevant tables. This plan will run through distSQL,
  similar to any regular plan.
- Add an IsCheck enum to scanNode and plumb it through the
  TableReader for RowFetcher initialization during distSQL planning.
  This enum is only meaningfully set during planning of SCRUB
  when doing a physical check.
- Modify the RowFetcher to conditionally run check code in existing
  row fetching code paths, using the IsCheck indicator. For example,
  we want to return an error instead of panicking when a NULL is encountered
  in a non-NULL column.
- Add a new iterator function to RowFetcher, NextRowWithErrors, for
  just scanning for errors. Here we can execute additional checking
  code on a separate code path from the critical fetching one. This lets
  us consume check errors instead of row data.
- Add logic to TableReader to consume NextRowWithErrors and handle the
  difference between wanted rows and returned rows (check error row
  type). This may require a new TableChecker. With these changes, we are
  able to retrieve the desired error results (see the sketch after the
  footnote below).
[0]: scrubNode is the root planning node when using the SCRUB
command.
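A rough, self-contained sketch of how these pieces could fit together; the types here are hypothetical simplifications of the real RowFetcher and scrub error types, and only the IsCheck indicator and NextRowWithErrors names come from the list above.

```go
package sketch

import "context"

// Hypothetical simplifications: EncDatumRow and Error stand in for the
// real sqlbase.EncDatumRow and scrub error types.
type EncDatumRow []interface{}

type Error struct {
	Code  string
	Cause error
}

func (e *Error) Error() string { return e.Code + ": " + e.Cause.Error() }

// RowFetcher is reduced to the parts relevant to this sketch.
type RowFetcher struct {
	// isCheck is set only when planning SCRUB physical checks (reduced
	// to a bool here; the RFC proposes an enum).
	isCheck bool
	// next stands in for the existing row-fetching logic, which in check
	// mode returns *Error values instead of panicking on bad data.
	next func(ctx context.Context) (EncDatumRow, error)
}

// NextRowWithErrors consumes rows internally and yields only check
// errors, keeping the extra checking off the critical NextRow path.
func (rf *RowFetcher) NextRowWithErrors(ctx context.Context) (*Error, error) {
	for {
		row, err := rf.next(ctx)
		if row == nil && err == nil {
			return nil, nil // scan complete
		}
		if checkErr, ok := err.(*Error); ok {
			return checkErr, nil // a check error becomes a result
		}
		if err != nil {
			return nil, err // a genuine failure aborts the check
		}
		// The row decoded cleanly; additional checks (composite and
		// non-canonical encodings, NOT NULL, ...) would run here.
	}
}
```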
The details of implementation are broken down into a few parts. NB: the optimizations section is just ideation; its goal is to discuss the feasibility and time-effort of the possibilities.
1. Index checks and how they will be implemented.
2. Physical checks and how they will be implemented.
3. Generating and executing the logical plan in the distSQL execution engine.
4. Optimizations:
   a. Multiple simultaneous index checks.
   b. Executing index and physical checks simultaneously.
Checking the index can be expressed through a rather simple logical SQL plan. The final result will return all erroneous index entries -- both dangling references and missing entries. In short, this can be broken down into a few stages of processing:

1. Scan through the primary index and the secondary index being checked, fetching the relevant index data[1] from each.
2. Order both streams by the index columns.
3. Do a full outer join of the two streams, with a predicate that joins rows whose index columns are all equal, where NULL column values are also treated as equal, then filter for rows where either side of the join is entirely NULL.

For example, if we have the following schema:
```sql
CREATE TABLE t1 (
    pkey int primary key,
    name int,
    INDEX name_idx (name ASC)
)
```
Then checking the indexes can be done through the logical plan represented by:
```sql
SELECT p.pkey, p.name, s.pkey, s.name
FROM
  (SELECT pkey, name FROM t1@{FORCE_INDEX=primary,NO_INDEX_JOIN} ORDER BY pkey, name) AS p
FULL OUTER JOIN
  (SELECT pkey, name FROM t1@{FORCE_INDEX=name_idx,NO_INDEX_JOIN} ORDER BY pkey, name) AS s
ON
  (p.pkey = s.pkey OR (p.pkey IS NULL AND s.pkey IS NULL)) AND
  (p.name = s.name OR (p.name IS NULL AND s.name IS NULL))
WHERE
  (p.pkey IS NULL AND p.name IS NULL) OR
  (s.pkey IS NULL AND s.name IS NULL)
```
In short, this query scans both the primary index and the secondary index, orders each stream, joins them on all of the index columns (treating NULLs as equal), and keeps only the rows where one side of the join is entirely NULL.

Because this is an anti-join, the results are as follows:

- If a row is found only in the primary index (the secondary columns are NULL), the secondary index is missing that entry (a missing_secondary_index error).
- If a row is found only in the secondary index (the primary columns are NULL), that index entry is dangling (a dangling_secondary_index error).
Now, a problem comes up because of the join predicate. The query above
will be run as a hash join with only one bucket, which will be very
slow. As a workaround, the p.col = s.col OR (p.col IS NULL AND s.col IS NULL)
predicates are replaced with just p.col = s.col so that the query can
be run using a merge joiner, hence the explicit orderings and step 2 in the
query. The distSQL plan generated from this query has
MergeJoinerSpec.NullEquality set to true, making NULL = NULL evaluate
to true; this makes the two predicates equivalent while allowing for a far
more efficient execution (see the sketch after the footnote below).
[1]: Relevant index data includes the columns, extra columns, and store columns of the index.
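As a sketch, the joiner configuration would look roughly as follows, using a simplified stand-in for the proto-generated spec (NullEquality is the only field named by this RFC; the rest are illustrative):

```go
package sketch

// MergeJoinerSpec mirrors, in simplified form, the parts of the distSQL
// merge joiner spec that matter here; Type and the orderings are
// stand-ins for the real proto types.
type MergeJoinerSpec struct {
	Type          string   // join type; "FULL OUTER" for the index check
	LeftOrdering  []string // ordering of the primary index stream
	RightOrdering []string // ordering of the secondary index stream
	// NullEquality makes NULL = NULL evaluate to true on the equality
	// columns, so the simplified predicate p.col = s.col is equivalent
	// to p.col = s.col OR (p.col IS NULL AND s.col IS NULL).
	NullEquality bool
}

// indexCheckJoinSpec builds the spec for the t1/name_idx example above.
func indexCheckJoinSpec() MergeJoinerSpec {
	return MergeJoinerSpec{
		Type:          "FULL OUTER",
		LeftOrdering:  []string{"pkey", "name"},
		RightOrdering: []string{"pkey", "name"},
		NullEquality:  true,
	}
}
```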
The physical table is the underlying k/v behind tables. This is where we find primary and secondary indexes (as distinct keys), groupings of certain data (column families), how index data is organized (index storing conditions), and the encoding of the data.
Most of the checks that involve the physical table concern encoding and
related data validity. This data is already partially checked during
scanning in RowFetcher. Because of this, adding checks is a matter of
a few additive changes, plus handling the errors so that they are
returned as row results instead.
The checks we can implement in RowFetcher include:

- Invalid encodings
- Invalid composite encodings
- Non-canonical encodings
- NOT NULL constraint violations (not part of the physical table, but checked at this layer)

The underlying scanning is in the RowFetcher. Inside the current
RowFetcher, the method processKv is responsible for the encoding-specific
work as it iterates over the k/v pairs, and this is where the checks
will be added.
We can also throttle the speed of the check by modifying the limitHint
provided to KVFetcher, and sleeping between batches.
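A minimal sketch of such throttling, with a hypothetical fetcher interface standing in for the real KVFetcher:

```go
package sketch

import (
	"context"
	"time"
)

// kvFetcher is a hypothetical stand-in for the KVFetcher: nextBatch
// fetches up to limitHint k/v pairs and reports whether the scan is done.
type kvFetcher interface {
	nextBatch(ctx context.Context, limitHint int64) (done bool, err error)
}

// throttledScan caps each batch via limitHint and sleeps between batches
// so the check yields to foreground traffic. The batch size and pause
// are illustrative, not tuned values.
func throttledScan(ctx context.Context, f kvFetcher) error {
	const limitHint = 1000
	const pause = 100 * time.Millisecond
	for {
		done, err := f.nextBatch(ctx, limitHint)
		if err != nil || done {
			return err
		}
		select {
		case <-time.After(pause):
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}
```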
Thus, to run the check we just need to do a scan over the whole table,
and the returned values will be the CheckError as found in the
SCRUB interface RFC. Ideally, this is executed through distSQL, as
there will be very few results relative to the data scanned.
Invalid data encoding (and others) already return an error while
scanning. We want to capture errors in RowFetcher that come from
any external function calls and wrap them in a scrub.Error struct with
an error code annotated.
Those error structs will be captured further upstream inside the TableReader and translated into the row results. In the existing scanning code path, this same error struct will simply be unwrapped.
For example, consider this excerpt from RowFetcher.processValueSingle:
```go
value, err := UnmarshalColumnValue(rf.alloc, typ, kv.Value)
if err != nil {
    return "", "", err
}
```

becomes:

```go
value, err := UnmarshalColumnValue(rf.alloc, typ, kv.Value)
if err != nil {
    return "", "", scrub.NewError(
        scrub.InvalidEncodingError,
        err,
    )
}
```
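For reference, a sketch of what the scrub.Error wrapper itself could look like; only the NewError call shape and the InvalidEncodingError code name come from the excerpt above, the rest is illustrative:

```go
package scrub

// Code identifies which SCRUB error a wrapped error corresponds to.
type Code string

const InvalidEncodingError Code = "invalid_encoding"

// Error annotates an underlying error with a scrub error code. The
// TableReader can translate it into a check-error result row, while the
// regular scanning code path simply unwraps and returns the cause.
type Error struct {
	Code  Code
	cause error
}

func (e *Error) Error() string { return string(e.Code) + ": " + e.cause.Error() }

// Cause returns the wrapped error for the non-check code path.
func (e *Error) Cause() error { return e.cause }

func NewError(code Code, cause error) *Error {
	return &Error{Code: code, cause: cause}
}
```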
Another chunk of the work is plumbing the check error results out, as the schema of the table we are scanning will differ from that of the result rows representing errors. To get around this we must manually construct the distSQL physical plan. This will also take some extra code to wrap the call from TableReader to RowFetcher, where any caught errors will be transformed.
During local engine execution, we will generate the necessary check plans, then execute the plans in sequence on the distSQL execution engine. These plans will scan the data in a distributed fashion and the only data leaving the nodes are any error results. This strategy is similar to schema change index backfilling.
The advantage of this over the alternative method [3], creating the full logical plan during query planning, is that we can easily run the checks sequentially. We can block waiting for the distSQL results of each check before running the next check. This also pulls the work of aggregating results from the distSQL execution engine to simpler Go code that accumulates the results.
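A sketch of that sequential execution, with hypothetical stand-ins for the generated check plans and the distSQL runner:

```go
package sketch

import "context"

// checkPlan and runPlan are hypothetical stand-ins: a checkPlan is one
// generated logical plan (e.g. one index check), and runPlan executes it
// through the distSQL execution engine, returning the error rows found.
type checkPlan struct{ name string }

type runPlan func(ctx context.Context, p checkPlan) (errorRows []string, err error)

// runChecks executes the generated plans one at a time, blocking on the
// distSQL results of each check before starting the next, and
// accumulates the results in plain Go code rather than in a distSQL
// aggregator.
func runChecks(ctx context.Context, plans []checkPlan, run runPlan) ([]string, error) {
	var results []string
	for _, p := range plans {
		rows, err := run(ctx, p)
		if err != nil {
			return nil, err
		}
		results = append(results, rows...)
	}
	return results, nil
}
```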
Note that this section largely contains the ideas behind optimizations, and none of them are required to be resolved. The goal of this section is to gauge feasibility, and whether or not it's worth going down those routes.
A challenge here is producing both check errors and the necessary data to run index checks simultaneously. It is not currently known if multiple different schemas can be streamed out of one table reader.
In the RowFetcher, this may look like a NextRowOrCheckError iterator
that may return either row data or check error data, and we emit those
to the corresponding outputs.
A workaround may be to have an intermediary processor for each mirrored stream which filters out only the columns desired for each consumer -- either a specific index check (a joiner), or the final result errors (in final aggregation).
Alternatively, a more radical idea is to modify the RowFetcher or TableReader to produce two separate schemas, branching off the idea of the RowFetcher adjustments made for interleaved tables.
Checking multiple indexes sequentially is discussed above, but a
problem is that a naive solution involves doing N scans of the primary
k/v to check N indexes. The following approaches are both
non-trivial changes, so this idea is largely suggested as an
optimization for future work.

The extra scans can be avoided if we scan all the necessary data for all indexes being checked in one table reader, then use a mirror router to mirror the data to multiple join consumers. Unfortunately, this would require making one of the following changes:
- Manual surgery of a distSQL physical plan to independently plan each index check, then combine them by:
  - Slicing out the primary k/v scan in each, replacing it with a single mirrored scan that fetches all the data.
  - Removing each plan's final aggregator, and having them all stream to a single new final aggregator.
- Or, adding the mechanisms to be able to do a UNION of all the results and then elide redundant table readers based on processors' column requirements. This is the same final aggregation UNION mentioned in section 3.a., but we also need to solve the problem of scanning the same k/v data multiple times.
JOIN READER aggregator for index checking: instead of using a JOINER aggregator, we could use a JOIN READER. How this would work is that instead of the 3 stages introduced in 1.a, the process for checking would involve 2 steps:

1. Scan the primary k/v.
2. Use a JOIN READER aggregator that does point-lookups against the secondary index.

This has two major problems:

- Point-lookups are far more expensive than the streaming scans used by the merge join.
- Looking up from the primary k/v into the secondary index only detects missing index entries; finding dangling index entries would require doing the same work again in the other direction.
Doing the checks directly inside the RowFetcher scan: this is by far the easiest method to do as it only involves:

1. Checking the physical data while scanning it, in the RowFetcher.
2. Doing point-lookups of the expected secondary index entries, also in the RowFetcher.

Step 1 is ideally done in the RowFetcher as this is also where we can
avoid passing decoded data further upstream, when we only need check
errors.

Step 2 can be optimized by batching the k/v lookups, which would be a
trivial mechanism to add inside RowFetcher. In this case, having an
alternative to NextRow called NextRowWithErrors makes more sense, so as
not to add this code to the critical code path.
A proof of concept has been done where this does not batch lookups or propagate errors to the client. Errors encountered are just logged.
This is very similar to the alternative proposal [1]. This approach has the same problems.
As an alternative to section 3, during query planning we can transform the scrub planNode into a plan that would execute all of the check logic. The goal of this would be to use existing planNodes to express the check logic, as some of the checks such as index checks can be expressed completely through SQL as seen in section 1. This was not picked as the strategy to implement because there were more challenges with this approach.
For this option, during the planning phase, we will expand a Scrub
planning node to include plans that encompass all of the checking work.
Because of this, the plan will not need to run additional plan and
execution phases.
While this may be easier to implement, as we don't need to be concerned
with creating a new distSQLPlanner, it will be limited in a few ways. If any
part of the SCRUB involves plan nodes that cannot be executed in the
distSQL execution engine, we have no choice but to implement those
features or come up with a workaround.
One example of that problem is the need to aggregate the error results
from multiple index checks and to execute those checks sequentially;
for this we will need to add some form of UNION or a special-case
workaround.
A special case where we have new plan nodes specifically to union the errors might look like:
```go
type ScrubCheckNode struct {
    // Sub-plans, each of which checks a particular aspect.
    Checks []ScrubCheck
}

type ScrubCheck struct {
    Check CheckType
    // Plan for running the check.
    plan *plan
}
```
Then during distSQL physical plan creation, we will plan each
ScrubCheck.plan and route their streams into a final aggregator. This
may be a problem though, as this will run all checks in parallel and not
in sequence, which will adversely affect foreground traffic.
Lastly, we may need an alternative to the TableReader, call it
TableChecker, which calls the iterator NextRowWithErrors instead of
NextRow to produce all physical errors.