docs/design/2024-05-11-hash-join-v2.md
Hash join is a widely used join algorithm. TiDB has supported hash join since version 1.0. However, the current implementation of hash join has some shortcomings:
`useOuterToBuild`, `outerIsRight`, `tryToMatchInners` and `tryToMatchOuters` were introduced to handle the extra complexity. This makes the current code hard to understand and error-prone when fixing bugs.

Taking the above factors into account, we decided to completely refactor hash join.
The basic idea of hash join is to divide the join into a build stage and a probe stage. In the build stage, a hash table is built on the join key; in the probe stage, the hash table is looked up with the join key, and the join result is generated from the lookup result according to the join type. The main problems in the build stage are the design of the hash table, the data organization of the build side, and building the hash table concurrently. The problems in the probe stage include memory control during probing (especially when there is a large number of duplicated join keys), the processing of hash table lookup results for the various join types, and the generation of the final join result.
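The two stages can be illustrated with a minimal in-memory inner join. This sketch is not TiDB's implementation: it uses a Go `map` from an `int64` key to a slice of rows instead of row-format data and a chained hash table, and the `Row` type is hypothetical. It only shows how the build stage indexes the build side and how the probe stage emits one result per match, which is also why a heavily duplicated key can blow up the output of a single probe row.

```go
package main

import "fmt"

// Row is a hypothetical, simplified row: an int64 join key plus a payload.
type Row struct {
	Key     int64
	Payload string
}

// buildHashTable is the build stage: it indexes every build-side row by its join key.
func buildHashTable(build []Row) map[int64][]Row {
	ht := make(map[int64][]Row, len(build))
	for _, r := range build {
		ht[r.Key] = append(ht[r.Key], r)
	}
	return ht
}

// probeInnerJoin is the probe stage of an inner join: every probe row is looked
// up by its join key, and one output pair is emitted per matching build row.
// A key with many duplicates on the build side multiplies the output.
func probeInnerJoin(ht map[int64][]Row, probe []Row) [][2]string {
	var out [][2]string
	for _, p := range probe {
		for _, b := range ht[p.Key] {
			out = append(out, [2]string{p.Payload, b.Payload})
		}
	}
	return out
}

func main() {
	ht := buildHashTable([]Row{{1, "b1"}, {2, "b2"}, {1, "b3"}})
	fmt.Println(probeInnerJoin(ht, []Row{{1, "p1"}, {3, "p3"}})) // [[p1 b1] [p1 b3]]
}
```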
The most obvious problem on the build side is how to organize the build-side data. The build-side input is in column storage, and during the build stage we convert it to row storage. Each row is laid out as follows:
```
|---------------------|-----------------|----------------------|-------------------------------|
           |                   |                   |                           |
           V                   V                   V                           V
     next_row_ptr          null_map    serialized_key/key_length           row_data
```
Since row_data is variable-length, the column data in row_data is designed to be accessed only sequentially. To accommodate this sequential-access restriction, the column order in row_data must be carefully designed rather than simply following the original column order.
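To see why row_data favors sequential access, consider a row whose columns are all variable-length. The sketch below assumes a hypothetical encoding (a 4-byte little-endian length prefix before each column's bytes), not TiDB's actual row format; `encodeRow` and `decodePrefix` are illustrative names. The offset of column k is only known after columns 0..k-1 have been read, so placing the columns that are needed earliest at the front lets a reader stop after a short prefix.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// encodeRow serializes variable-length columns in the given order using a
// hypothetical layout: a 4-byte little-endian length, then the raw bytes.
func encodeRow(cols [][]byte) []byte {
	var buf []byte
	var lenBuf [4]byte
	for _, c := range cols {
		binary.LittleEndian.PutUint32(lenBuf[:], uint32(len(c)))
		buf = append(buf, lenBuf[:]...)
		buf = append(buf, c...)
	}
	return buf
}

// decodePrefix reads only the first n columns of an encoded row. Each offset
// depends on the lengths of all earlier columns, so access is sequential and
// the bytes after column n-1 are never touched.
func decodePrefix(row []byte, n int) [][]byte {
	cols := make([][]byte, 0, n)
	off := 0
	for i := 0; i < n; i++ {
		l := int(binary.LittleEndian.Uint32(row[off:]))
		off += 4
		cols = append(cols, row[off:off+l])
		off += l
	}
	return cols
}

func main() {
	row := encodeRow([][]byte{[]byte("key"), []byte("cond"), []byte("rest")})
	for _, c := range decodePrefix(row, 2) {
		fmt.Println(string(c)) // prints "key", then "cond"
	}
}
```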
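The columns on the build side can be divided into 3 categories:

- join key columns
- non-equal condition columns (columns used by non-equal conditions)
- rest columns (all other columns)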
The column order in row_data is as follows:
| | has non-equal condition | no non-equal condition |
|---|---|---|
| join key is inlined | join key columns + non-equal condition columns + rest columns | join key columns + rest columns |
| join key is not inlined | non-equal condition columns + rest columns | all columns in their original order |
RowTable is mainly used to store data that has been converted to row storage. RowTable needs to store two parts of information: meta and data.
```go
type rowTable struct {
	meta     *tableMeta
	segments []*rowTableSegment
}
```
`tableMeta` records the meta information used during the build, including at least:
```go
type tableMeta struct {
	// whether the row has a fixed length
	isFixedLength bool
	// the row length if the row has a fixed length
	rowLength int
	// whether the join keys have a fixed length
	isJoinKeysFixedLength bool
	// the join keys length if it is fixed
	joinKeysLength int
	// whether the join key is inlined in the row data
	isJoinKeysInlined bool
	// the length of the null map; the null map includes a null bit for each column in the row and the used flag for right semi/outer join
	nullMapLength int
	// the column order in the row layout; as described above, the saved column order may differ from the column order in the build schema.
	// For example, if the build schema is [col1, col2, col3] and the column order in the row is [col2, col1, col3], then this array
	// is [1, 0, 2]
	rowColumnsOrder []int
	// the size of each column, -1 means a variable-length column; the order is the same as rowColumnsOrder
	columnsSize []int
	// the serialize mode for each key
	serializeModes []codec.SerializeMode
	// the first n columns in the row are used for other conditions; if a join has other conditions, we only need to extract
	// the first n columns from the RowTable to evaluate them
	columnCountNeededForOtherCondition int
	// the total number of columns in the build side chunk, used to construct the chunk if there are join other conditions
	totalColumnNumber int
	// the column index offset in the null map: 1 if there is a usedFlag, 0 otherwise
	colOffsetInNullMap int
	// keyMode is the key mode, it can be OneInt/FixedSerializedKey/VariableSerializedKey
	keyMode keyMode
	// offset to rowData, -1 for a variable-length, non-inlined key
	rowDataOffset int
	// the mask of usedFlag in the null map
	usedFlagMask uint32
}
```
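`colOffsetInNullMap` shifts the column null bits when the null map also carries a used flag. The following sketch assumes a hypothetical bit layout (bit `colIdx + colOffsetInNullMap`, least significant bit first within each byte) and computes only the minimum byte count; the real `nullMapLength` and `usedFlagMask` may use a different layout or add padding. `isColumnNull`, `setColumnNull` and `nullMapBytes` are illustrative names.

```go
package main

import "fmt"

// isColumnNull reports whether the column at position colIdx of the row layout
// is NULL, under the hypothetical layout described above.
func isColumnNull(nullMap []byte, colIdx, colOffsetInNullMap int) bool {
	bit := colIdx + colOffsetInNullMap
	return nullMap[bit/8]&(1<<(bit%8)) != 0
}

// setColumnNull marks the column at position colIdx as NULL.
func setColumnNull(nullMap []byte, colIdx, colOffsetInNullMap int) {
	bit := colIdx + colOffsetInNullMap
	nullMap[bit/8] |= 1 << (bit % 8)
}

// nullMapBytes is the minimum number of bytes for one null bit per column plus
// an optional used flag (no alignment padding).
func nullMapBytes(numCols int, hasUsedFlag bool) int {
	bits := numCols
	if hasUsedFlag {
		bits++
	}
	return (bits + 7) / 8
}

func main() {
	// 9 columns plus a used flag need 10 bits, i.e. 2 bytes; bit 0 is the used flag.
	nullMap := make([]byte, nullMapBytes(9, true))
	setColumnNull(nullMap, 8, 1)
	fmt.Println(len(nullMap), isColumnNull(nullMap, 8, 1), isColumnNull(nullMap, 0, 1)) // 2 true false
}
```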
`rowTableSegment` stores the row-format data, including at least:
```go
type rowTableSegment struct {
	rawData         []byte           // the chunk of memory that holds the row data
	hashValues      []uint64         // the hash value of each row
	rowLocations    []unsafe.Pointer // the start address of each row
	validJoinKeyPos []int            // the positions of the rows that need to be inserted into the hash table, used in hash table build
}
```
The hash table is a chained hash table. Its space is allocated in advance, before the hash table is built, and it is not resized during the entire build process. Since no rehashing ever happens, there is no need to store hash values in the hash table: each slot only needs to store a row address. The structure of the entire hash table is as follows:
Building the hash table is divided into two stages:
After the hash value of the join key is calculated, the hash table lookup is done by directly accessing the corresponding slot of the target hash table.
Since a chained hash table is used, the join key still needs to be compared after the hash table lookup. The build-side join key is already in the row, and the probe-side join key is serialized into a byte buffer, so TiDB uses memory compare for key comparison. For some simple cases, a more specific comparison can be used instead: for example, if the join key is a single int, TiDB can use int compare instead of memory compare.
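The two comparison paths might look like the sketch below. It assumes that a one-int key is serialized as 8 little-endian bytes on both sides; `equalOneInt` and `equalSerialized` are hypothetical names. The int path replaces a general memory compare with a single integer comparison.

```go
package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
)

// equalOneInt is the fast path for a join key that is a single int column:
// both keys are read as one 64-bit integer and compared directly.
func equalOneInt(buildKey, probeKey []byte) bool {
	return binary.LittleEndian.Uint64(buildKey) == binary.LittleEndian.Uint64(probeKey)
}

// equalSerialized is the general path: equality of the serialized keys is a
// plain memory comparison.
func equalSerialized(buildKey, probeKey []byte) bool {
	return bytes.Equal(buildKey, probeKey)
}

// intKey serializes an int64 key as 8 little-endian bytes (assumed layout).
func intKey(v int64) []byte {
	b := make([]byte, 8)
	binary.LittleEndian.PutUint64(b, uint64(v))
	return b
}

func main() {
	fmt.Println(equalOneInt(intKey(42), intKey(42)), equalOneInt(intKey(42), intKey(7))) // true false
	fmt.Println(equalSerialized([]byte("abc"), []byte("abc")))                             // true
}
```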
After the join key comparison, we generate a joined block by combining the probe columns and the build columns. For joins without non-equal conditions, the joined block is the final join result. For joins with non-equal conditions, the joined block is an intermediate result: we evaluate the non-equal conditions on the joined block and then generate the final join result. Generating an intermediate joined block lets us evaluate the non-equal conditions in vectorized mode, which is faster and makes complex non-equal conditions easier to support.
When generating the intermediate joined block, we try to construct only the minimum set of columns needed by the non-equal conditions:
The probe side is abstracted by the `JoinProbe` interface:

```go
type JoinProbe interface {
	// SetChunkForProbe does some preparation work before probing a chunk
	SetChunkForProbe(chunk *chunk.Chunk) error
	// Probe probes the current chunk; the result chunk is set in result.chk, and Probe needs to make sure result.chk.NumRows() <= result.chk.RequiredRows()
	Probe(joinResult *hashjoinWorkerResult, sqlKiller sqlkiller.SQLKiller) (ok bool, result *hashjoinWorkerResult)
	// IsCurrentChunkProbeDone returns true if the current probe chunk has been fully probed
	IsCurrentChunkProbeDone() bool
	// ScanRowTable is called after all the probe chunks are probed. It is used in some special joins, like left outer join with the left side to build: after all
	// the probe side chunks are handled, it needs to scan the row table to return the unmatched rows
	ScanRowTable(joinResult *hashjoinWorkerResult, sqlKiller sqlkiller.SQLKiller) (result *hashjoinWorkerResult)
	// IsScanRowTableDone returns true after the row table scan is done
	IsScanRowTableDone() bool
	// NeedScanRowTable returns true if the current join needs to scan the row table after all the probe side chunks are handled
	NeedScanRowTable() bool
	// InitForScanRowTable does some preparation work before ScanRowTable; it must be called before ScanRowTable
	InitForScanRowTable()
}
```