docs/design/sql/20-sql-partition-pruning.md
|||
|---|---|
| Related Jira | HZ-1605 |
| Related Github issues | GH issue list |
| Document Status / Completeness | IN PROGRESS |
| Requirement owner | Sandeep Akhouri |
| Developer(s) | Krzysztof Jamróz, Ivan Yaschishin, Sasha Syrotenko |
| Quality Engineer | Isaac Sumner |
| Support Engineer | Support Engineer |
| Technical Reviewers | Krzysztof Jamróz, Sasha Syrotenko |
| Simulator or Soak Test PR(s) | Link to Simulator or Soak test PR |
In practice, many queries over large data sets do not need to scan all the data.
Very often the data is partitioned and only a limited subset of partitions needs to be accessed during the query.
Other operations, such as joins or aggregations, can also benefit from knowledge about data partitioning.
Using such knowledge makes it possible to run a query faster while using fewer resources (network, IO).
This will make SQL queries eligible for partition pruning comparable in performance to
Predicate API queries using PartitionPredicate.
Knowing which partitions are needed also makes it possible to eliminate some members from query execution entirely: such members do not hold any data relevant to the query.
In particular, this covers:
- IMaps with AttributePartitioningStrategy which use attributes defined in it,
- queries filtering on __key which are not eligible to be converted to IMapSelectPlan and have to create a Jet job,
- construction of __key from constituent attributes, e.g. if __key has 2 attributes, we could convert `__key.a1=X AND __key.a2=Y` to `__key=(X,Y)`.

| Term | Definition |
|---|---|
| item, entry, row | Single element of data which is treated as whole |
| prunable | Eligible for partition pruning optimization |
Definition: Partitioning column
Partitioning column is a column/attribute that affects the partition to which a given item is assigned.
Definition: Partitioning key
Partitioning key is a minimal (in terms of inclusion) set of partitioning columns that is sufficient to determine to which partition a given item belongs.
Definition: Set of partitioning keys
Set of partitioning keys is defined as a set containing:
- the attributes defined by AttributePartitioningStrategy (if any),
- __key.

Clarification:
For IMap, partitioning columns may only be attributes of the IMap key; attributes of the IMap value cannot be used.
If an IMap uses AttributePartitioningStrategy, the partitioning columns are functionally dependent on the entire __key.
However, for the sake of simplicity, the SQL optimizer currently does not track functional dependencies
and assumes that both __key and the AttributePartitioningStrategy define a partitioning key.
Partition id calculation will take that into account and use the correct strategy for a given IMap.
Definition: Partition-complete expression
Expression E is partition-complete when there exists a partitioning key
PK = {partColumn1, ..., partColumnM}
for which the expression can be transformed to form:
E = partKeyExp1 OR partKeyExp2 OR ... OR partKeyExpN
where each partKeyExpI (for I in 1..N) is of the form:
partKeyExpI = partColumn1Expr AND partColumn2Expr AND ... AND partColumnMExpr AND residualFilter
(there must be a sub-expression for each of the partitioning columns making up PK)
and where each partColumnJExpr (for J in 1..M) is of the form:
partColumnJ <operator> <arguments>
where operator is one of the following operators:
- `=`
- `SEARCH` with `Sarg`
- `BETWEEN` for integer types (will not be supported in the first version; it makes sense only for a reasonably small range)

and operator arguments are:
- constants, dynamic parameters and any deterministic functions of them (in particular, references to columns are not allowed)

and residualFilter is any expression; in particular it can be TRUE. residualFilter may reference any columns,
including the partitioning columns.
Clarifications:
- Range filters (e.g. `order_date BETWEEN day1 AND day2`) are currently not supported; only hash/equality based partitioning is supported.
- Each OR branch in E might use a different partitioning key. Currently, such a case is not supported.
- `partCol1 = partCol2 AND partCol2 = constantX` is not partition-complete, but can be transformed to such form by propagating the constant: `partCol1 = constantX AND partCol2 = constantX`. Such transformations are out of scope for this TDD, but may be performed now or in the future by the SQL optimizer independently.

For IMap we support equality-based partitioning, which is in line with IMap being a hash table. Range partitioning (e.g. for date ranges) will not be supported.
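For the conjunctive equality filters targeted by the MVP, the partition-completeness check can be sketched as follows. This is a minimal illustration with hypothetical names, operating on already-extracted `column = constant` conjuncts rather than on Calcite RexNodes:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical sketch: a conjunctive equality filter is partition-complete
// for a given partitioning key when every partitioning column has an
// extracted constant; the remaining conjuncts form the residual filter.
public class PartitionComplete {

    /**
     * @param equalityConjuncts column name -> constant, one entry per "col = const" conjunct
     * @param partitioningKey   the columns making up the partitioning key PK
     * @return the key tuple (values in partitioning-key order) if the filter
     *         is partition-complete, empty otherwise
     */
    public static Optional<List<Object>> extractKeyTuple(
            Map<String, ?> equalityConjuncts, List<String> partitioningKey) {
        List<Object> tuple = new ArrayList<>();
        for (String column : partitioningKey) {
            Object constant = equalityConjuncts.get(column);
            if (constant == null) {
                // A partitioning column has no constant conjunct, so the
                // filter cannot be reduced to a finite set of partitions.
                return Optional.empty();
            }
            tuple.add(constant);
        }
        return Optional.of(tuple);
    }
}
```

Given such a tuple, the partition id can then be computed with the strategy appropriate for the IMap.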
The Calcite SQL optimizer needs to know how an IMap is partitioned.
The list of columns comprising the AttributePartitioningStrategy of the IMap will be available in PartitionedMapTable
(note that __key is a special case that should be handled with either an extension to AttributePartitioningStrategy
or in a separate specialized partitioning strategy that will explicitly ignore the case when the whole key object implements
the PartitionAware interface).
For some/all RelNodes we will be using information about how their input(s) are partitioned. Note that filters may contain dynamic parameters (RexDynamicParam) and may use special operators (SEARCH and Sarg).

RelNodes can be categorized as single-table and multi-table (input) ones.
For the purposes of this optimization, single input nodes include:
Multi-table:
Wrappers and Support rels:
General rules:
Rels Prunability summarized:
| Rel | Prunability |
|---|---|
| FullScanPhysicalRel | Prunable based on its filter (see Filter Analysis section) |
| IndexScanPhysicalRel | Not currently supported; planned in the future with similar semantics to FullScanPhysicalRel |
| JoinPhysicalRel | Prunable if both inputs are Prunable |
| UnionPhysicalRel | Prunable if all inputs are Prunable |
| CalcPhysicalRel | Prunable if the Scan input is Prunable |
| AggregatePhysicalRel | Prunable if the Scan input is Prunable |
Prunability of the single input rels is based on their Filter (described below in the Filter Analysis section in detail).
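The propagation rules summarized in the table can be sketched as a bottom-up check over a toy plan tree. This is a hypothetical illustration; the real implementation works on Calcite RelNodes:

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch of prunability propagation through a plan tree.
public class PrunabilityCheck {

    public abstract static class Rel {
        protected final List<Rel> inputs;
        protected Rel(Rel... inputs) { this.inputs = Arrays.asList(inputs); }
        public abstract boolean isPrunable();
    }

    /** FullScanPhysicalRel: prunable iff its filter is partition-complete. */
    public static class FullScan extends Rel {
        private final boolean filterPartitionComplete;
        public FullScan(boolean filterPartitionComplete) {
            this.filterPartitionComplete = filterPartitionComplete;
        }
        public boolean isPrunable() { return filterPartitionComplete; }
    }

    /** JoinPhysicalRel / UnionPhysicalRel: prunable iff every input is prunable. */
    public static class MultiInput extends Rel {
        public MultiInput(Rel... inputs) { super(inputs); }
        public boolean isPrunable() { return inputs.stream().allMatch(Rel::isPrunable); }
    }

    /** CalcPhysicalRel / AggregatePhysicalRel: prunability inherited from the input. */
    public static class SingleInput extends Rel {
        public SingleInput(Rel input) { super(input); }
        public boolean isPrunable() { return inputs.get(0).isPrunable(); }
    }
}
```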
The explain plan should contain two sets of predicates in scan operations:
select count(*), sum(amount), priority from orders WHERE customerId='C2' group by priority
Current plan:
CalcPhysicalRel(expr#0..2=[{inputs}], EXPR$0=[$t1], EXPR$1=[$t2], priority=[$t0])
AggregateCombineByKeyPhysicalRel(group=[{0}], EXPR$0=[COUNT()], EXPR$1=[SUM($1)])
AggregateAccumulateByKeyPhysicalRel(group=[{0}])
FullScanPhysicalRel(table=[[hazelcast, public, orders[projects=[$7, $4], filter==($1, _UTF-16LE'C2')]]], discriminator=[0])
Desired plan:
CalcPhysicalRel(expr#0..2=[{inputs}], EXPR$0=[$t1], EXPR$1=[$t2], priority=[$t0])
AggregateCombineByKeyPhysicalRel(group=[{0}], EXPR$0=[COUNT()], EXPR$1=[SUM($1)])
AggregateAccumulateByKeyPhysicalRel(group=[{0}])
FullScanPhysicalRel(table=[[hazelcast, public, orders[projects=[$7, $4], filter==($1, _UTF-16LE'C2')]]], partitioningKey=[$1], partitioningKeyValues=[_UTF-16LE'C2'], discriminator=[0])
Filter Analysis and Transformation refers to the process of transforming and analyzing the input filter to determine whether it inherently limits the query to a finite number of partitions; if so, parts of the filter can be extracted and transformed into a form that allows other SQL execution logic to produce concrete partition IDs.
This chapter refers to future functionality: the MVP for Member Pruning should only include support for
basic filters with AttributePartitioningStrategy, with no support for aggregations, joins and unions, and limited support
for conjunctive (a series of expressions in a single AND) filters. This chapter nevertheless describes the proposed design of the
full implementation of the Filter Analysis.
The main objective is to transform the input filter into partition-complete filter pairs. The first step is to normalize the filter into a series of disjunctions/conjunctions around key components (either __key or the components extracted from the attribute strategy config). For example, `a BETWEEN 1 AND 2 AND b BETWEEN 3 AND 4` should become a cartesian product of the inputs, i.e. (A=1,B=3), (A=1,B=4), (A=2,B=3), (A=2,B=4). Note that A and B are positional in the produced tuples, based on what is specified in the strategy. An alternative approach might be to choose number ranges as the basis and therefore use BETWEEN as the basis operator instead of EQUALS.
In addition to this base functionality, we could consider function unwrapping in the future; however, functions like floor, to_lower and ceiling have open-ended input sets that are hard to determine or impractical to iterate over, e.g. `floor(__key) = 1.0` has a virtually infinite number of possible concrete __key values. An additional step might be to reduce overlapping/negating expressions, e.g. `a IN (1,2,3,4,5) AND a > 2` should automatically eliminate 1 and 2 as possible variants of a.
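The cartesian expansion of per-column candidate values can be sketched as follows. This is a simplified model that assumes the candidate constants have already been extracted (e.g. from IN lists or small integer BETWEEN ranges); the names are hypothetical:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: given candidate values per partitioning column,
// produce the cartesian product of concrete key tuples. Column order is
// positional, following the attribute partitioning strategy configuration.
public class KeyTupleExpansion {

    public static List<List<Object>> cartesian(List<List<Object>> candidatesPerColumn) {
        List<List<Object>> tuples = new ArrayList<>();
        tuples.add(new ArrayList<>()); // start with a single empty tuple
        for (List<Object> columnValues : candidatesPerColumn) {
            List<List<Object>> next = new ArrayList<>();
            for (List<Object> prefix : tuples) {
                for (Object value : columnValues) {
                    // Extend every partial tuple with every candidate value
                    // of the current column.
                    List<Object> extended = new ArrayList<>(prefix);
                    extended.add(value);
                    next.add(extended);
                }
            }
            tuples = next;
        }
        return tuples;
    }
}
```

For `a BETWEEN 1 AND 2 AND b BETWEEN 3 AND 4` the candidate lists are `[[1, 2], [3, 4]]`, yielding the four tuples from the example above. Note that the result size is the product of the candidate list sizes, which is why BETWEEN only makes sense for reasonably small ranges.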
Data Types play a big role in partition-boundness:
Our goal is to have a precise partition set to scan for all prunable FullScans in the resulting plan.
To make this possible, and also to isolate the implementation for each specific connector, we would like to move
the computational process to SqlConnector. We extended the fullScanReader method in the SqlConnector interface
to accept all extracted partition pruning candidates as a parameter and compute the partition set in a connector-specific way:
@Override
@Nonnull
public Vertex fullScanReader(
@Nonnull DagBuildContext context,
@Nullable HazelcastRexNode predicate,
@Nonnull List<HazelcastRexNode> projection,
@Nullable List<Map<String, Expression<?>>> partitionPruningCandidates, // <-- new parameter
@Nullable FunctionEx<ExpressionEvalContext, EventTimePolicy<JetSqlRow>> eventTimePolicyProvider)
The new parameter partitionPruningCandidates is a list of maps where, for each column present in the filter,
the column name maps to the extracted comparison expression. The connector-specific implementation should
check whether the partitioning strategy is applicable at all and, if so, transform the input into
a list of inner lists of expressions: each inner list contains comparison expressions;
an inner list is single-element if the partitioning strategy key is simple and multi-element
if it is composite. If there is more than one prunable filter predicate,
the outer list will be multi-element. This list is then passed to the corresponding scan processor meta supplier,
which supports partition pruning.
Currently, this is implemented only for the IMap connector, where all expressions are supported.
To illustrate, we prepared an example below.
Let's assume we have an IMap map with a composite key {comp1, comp2, comp3} and an attribute partitioning strategy applied with
comp1 and comp2. Consider the following synthetic query, where the filter matches the partitioning strategy:
SELECT * FROM map WHERE __key.comp1 = 1 AND __key.comp2 = 2
The IMap-specific fullScanReader receives the following list of maps as a parameter (a single map, because there is a single prunable predicate):
[{"__key.comp1" = Expression(`__key.comp1 = 1`), "__key.comp2" = Expression(`__key.comp2 = 2`)}]
After the computation described above, the fullScanReader implementation should pass the following list of expression lists
to the corresponding scan processor meta supplier, which supports partition pruning:
[[Expression(`__key.comp1 = 1`), Expression(`__key.comp2 = 2`)]]
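The connector-side transformation can be sketched as follows. This is a simplified model in which expressions are represented as plain strings and the names are hypothetical; the real implementation works with Expression<?> instances and the connector's actual strategy metadata:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical sketch of connector-side handling of the
// partitionPruningCandidates parameter: for each candidate map (one per
// prunable predicate), order the extracted expressions by the attribute
// partitioning strategy's attribute list.
public class PruningCandidates {

    public static Optional<List<List<String>>> toOrderedExpressions(
            List<Map<String, String>> candidates, List<String> strategyAttributes) {
        List<List<String>> result = new ArrayList<>();
        for (Map<String, String> candidate : candidates) {
            List<String> ordered = new ArrayList<>();
            for (String attribute : strategyAttributes) {
                String expression = candidate.get("__key." + attribute);
                if (expression == null) {
                    // A strategy attribute is not covered by this candidate,
                    // so pruning is not applicable: fall back to a full scan.
                    return Optional.empty();
                }
                ordered.add(expression);
            }
            result.add(ordered);
        }
        return Optional.of(result);
    }
}
```

For the example above, `[{__key.comp1 -> "=1", __key.comp2 -> "=2"}]` with strategy attributes `[comp1, comp2]` yields `[["=1", "=2"]]`, matching the list passed to the scan processor meta supplier.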
Unit tests will be implemented ensuring that partition pruning generates expected information about members and partitions needed for query execution.
Performance will be compared for the same cluster topology (in particular with more than 1 member, ideally 3-5) and the same IMap with the same data and data layout (i.e. the same partitioning strategy). The same queries will be issued with and without the partition pruning optimization. Throughput and latency will be compared.
Soak tests for SQL queries should include some test cases with queries eligible for partition pruning to test their stability (e.g. in the presence of concurrent partition migrations).