docs/design/sql/04-parallel-execution.md
The Hazelcast Mustang engine executes queries in parallel. This document describes the design of the parallel execution environment of the engine.
The document doesn't discuss operator-level parallelism, which is a different topic.
The rest of this document is organized as follows. In section 1 we discuss the existing threading infrastructure of Hazelcast IMDG and Hazelcast Jet. In section 2 we analyze why the existing infrastructure is inappropriate for query execution and then describe the design of the Hazelcast Mustang execution environment. In section 3 we discuss possible alternative approaches that were rejected.
Hazelcast IMDG uses a staged event-driven architecture (SEDA) for message processing. During execution, a message passes through several thread pools (stages), each optimized for a specific type of workload. We now describe the stages that exist in Hazelcast.
Hazelcast uses a dedicated thread pool for message send and receive, which we refer to as the IO pool in this document. Each thread from the IO pool maintains a subset of connections to remote members. Consider a sender member (S) and a receiver member (R). The typical execution flow is organized as follows:
Snippet 1: Message execution flow
Stage(S)           IO(S)           IO(R)           Stage(R)
   |                 |               |                 |
   |-enqueue/notify->|               |                 |
   |                 |-----send----->|                 |
   |                 |               |-enqueue/notify->|
We now discuss the organization of different execution stages.
A message may have a logical partition, which is a positive integer. Messages with a defined partition are routed to a special thread pool, which we refer to as the partition pool. The pool has several threads, each with a dedicated task queue. The partition of the message determines the exact thread that will process it: threadIndex = partition % threadCount.
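As a sketch, the routing rule can be expressed in code as follows (the class name, queue type, and capacity are illustrative, not the actual Hazelcast internals):

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical sketch of partition-based routing: one dedicated queue
// per worker thread, selected by the message's logical partition.
public class PartitionRouter {
    private final BlockingQueue<Runnable>[] queues;

    @SuppressWarnings("unchecked")
    public PartitionRouter(int threadCount) {
        queues = new BlockingQueue[threadCount];
        for (int i = 0; i < threadCount; i++) {
            queues[i] = new ArrayBlockingQueue<>(1024);
        }
    }

    // Messages with the same partition always land on the same thread,
    // which yields single-threaded, ordered processing per partition.
    public static int threadIndex(int partition, int threadCount) {
        return partition % threadCount;
    }

    public void submit(int partition, Runnable task) throws InterruptedException {
        queues[threadIndex(partition, queues.length)].put(task);
    }
}
```

Because the mapping is a pure function of the partition, no coordination between submitters is needed to preserve per-partition order.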
The partition pool has two important advantages: messages with the same partition are processed in submission order, and the dedicated per-thread queues avoid contention on a shared queue. However, there is no load balancing in the partition pool: a single long-running task may delay other tasks from the same partition indefinitely, and an imbalance between partitions may cause low resource utilization.
The partition pool is thus suitable for small tasks that operate on independent physical resources and that are distributed equally between logical partitions. An example is IMap operations, such as GET and PUT, which operate on separate physical partitions.
Since the partition is a logical notion, it is possible to multiplex tasks from different components to a single partition pool. For example, CP Subsystem schedules tasks, all with the same partition, to the partition pool to ensure total processing order.
If a message doesn't have a logical partition, it is submitted to the generic pool. This is a conventional thread pool with a shared blocking queue. It has inherent balancing capabilities. At the same time, this pool may demonstrate less than optimal throughput when a lot of small tasks are submitted due to contention on the queue.
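Such a shared-queue pool can be constructed directly from the JDK; a minimal sketch (the class name and pool size are arbitrary illustrations):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Illustrative sketch of a "generic pool": a fixed number of threads
// draining one shared blocking queue.
public class GenericPoolSketch {
    public static ExecutorService newGenericPool(int threads) {
        // Any idle thread takes the next task, which balances load
        // automatically but creates contention on the shared queue
        // when many small tasks are submitted.
        return new ThreadPoolExecutor(
                threads, threads,
                0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>());
    }
}
```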
Hazelcast Jet uses its own cooperative thread pool to execute Jet jobs. A job is split into "tasklets": each vertex in the DAG is backed by one or more tasklets on each cluster member. Every tasklet is assigned to one cooperative thread, and the thread then executes its tasklets in a loop. There is no balancing: once a tasklet is submitted to a specific thread, it is always executed in that thread. Non-cooperative tasklets (those handling a blocking API such as JDBC) run on dedicated threads.
The IO pool doesn't notify the Jet pool about a new data batch ("push"). Instead, the message is simply enqueued, and the Jet thread checks the queue periodically ("poll").
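The cooperative loop can be sketched roughly as follows; this is a strong simplification (interface and class names are hypothetical), and the real Jet execution engine is considerably more involved:

```java
import java.util.List;

// Simplified sketch of a cooperative worker thread running its
// assigned tasklets round-robin until all of them complete.
public class CooperativeWorkerSketch implements Runnable {
    // A tasklet performs one small, non-blocking unit of work per call
    // and reports whether it has finished.
    public interface Tasklet {
        boolean call(); // returns true when the tasklet is done
    }

    private final List<Tasklet> tasklets;

    public CooperativeWorkerSketch(List<Tasklet> tasklets) {
        this.tasklets = tasklets; // must be a mutable list
    }

    @Override
    public void run() {
        // No work stealing: a tasklet always runs on the thread it was
        // assigned to. Input is discovered by polling inside call(),
        // not by notifications from the IO threads.
        while (!tasklets.isEmpty()) {
            tasklets.removeIf(Tasklet::call);
        }
    }
}
```

A tasklet that never yields would stall every other tasklet on the same thread, which is why blocking work is moved to dedicated non-cooperative threads.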
We now define the requirements for the Hazelcast Mustang threading model, analyze them against the existing infrastructure, and present the design.
The requirements are thread safety, load balancing, and ordered processing.
First, the infrastructure must guarantee that operator execution is thread-safe. That is, a stateful operator should not be executed by multiple threads simultaneously. This simplifies operator implementations and makes them more performant. Hazelcast Jet follows this principle, as only one thread may execute a particular tasklet. However, the Hazelcast Jet pool doesn't satisfy the load-balancing requirement discussed below.
Second, the execution environment must support load balancing. Query execution may take a long time to complete. If several query fragments have been assigned to a single execution thread, it should be possible to reassign them to idle threads dynamically. Neither partition pool nor Hazelcast Jet pool designs are applicable to Hazelcast Mustang because they lack balancing capabilities.
Third, it should be possible to execute some messages in order. For example, for an ordered stream, the N-th batch should be processed before the (N+1)-th batch, as described in [1] (p. 1.3).
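One common way to obtain per-stream ordering on top of an otherwise unordered pool is a serializing wrapper. The sketch below is a generic pattern (adapted from the well-known serial-executor example in the java.util.concurrent.Executor documentation), not Mustang's actual mechanism:

```java
import java.util.ArrayDeque;
import java.util.concurrent.Executor;

// Runs submitted tasks one at a time, in submission order, on top of an
// arbitrary delegate pool. One instance per ordered stream would keep
// batch N ahead of batch N+1.
public class SerialExecutor implements Executor {
    private final ArrayDeque<Runnable> queue = new ArrayDeque<>();
    private final Executor delegate;
    private Runnable active;

    public SerialExecutor(Executor delegate) {
        this.delegate = delegate;
    }

    @Override
    public synchronized void execute(Runnable task) {
        queue.add(() -> {
            try {
                task.run();
            } finally {
                next(); // schedule the following task only after this one finishes
            }
        });
        if (active == null) {
            next();
        }
    }

    private synchronized void next() {
        active = queue.poll();
        if (active != null) {
            delegate.execute(active);
        }
    }
}
```

At most one task per stream is in flight at any moment, so ordering holds even when the delegate pool is multi-threaded.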
We define the taxonomy of tasks related to query execution.
First, the engine must execute query fragments, i.e., advance Volcano-style operators, as explained in [2] (p. 3). Fragment execution is initiated in response to query start or data messages. Fragment execution may take significant time to complete.
Second, the engine must execute system operations, such as query cancel and query check. These operations take a short time to complete and must be executed as soon as possible. Moreover, a query start operation may trigger the parallel execution of several fragments.
Given the different nature of fragment and system operations, we split execution into two independent pools:
The fragment pool is responsible for the execution of individual query fragments.
Fragment execution may take an arbitrary amount of time depending on the query structure. It is, therefore, important to guarantee high throughput for short fragments while still providing load balancing for long fragments. The ideal candidate is a thread pool with dedicated per-thread queues and work stealing. For this reason, we choose the JDK's ForkJoinPool as the backbone of the fragment pool. We create a separate instance of ForkJoinPool rather than using ForkJoinPool.commonPool() to avoid interference with user workloads.
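A dedicated instance could be created roughly as follows (the thread naming and asyncMode choice are illustrative assumptions, not the engine's actual configuration):

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinWorkerThread;

// Sketch of a dedicated fragment pool built on the JDK's ForkJoinPool,
// which provides per-thread queues with work stealing out of the box.
public class FragmentPoolSketch {
    public static ForkJoinPool newFragmentPool(int parallelism) {
        // A separate instance rather than ForkJoinPool.commonPool(),
        // so query fragments don't compete with user workloads that
        // share the common pool.
        return new ForkJoinPool(
                parallelism,
                pool -> {
                    ForkJoinWorkerThread t =
                            ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(pool);
                    t.setName("query-fragment-" + t.getPoolIndex());
                    return t;
                },
                null,   // default uncaught-exception handling
                true);  // asyncMode: FIFO order suits event-style tasks
    }
}
```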
We may decide to introduce multiple fragment pools for better resource management in the future. This will help limit the maximum number of CPU cores dedicated to a particular workload. An example of this approach is Resource Pools in MemSQL [3].
The system pool is dedicated to the execution of cancel and check operations. With a separate pool, even if the fragment pool is fully occupied with long-running queries, cancel and check operations are still executed in a timely manner.
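The split could look like the following sketch (pool types and sizes are arbitrary illustrations, not the engine's actual configuration):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Illustrative sketch of the two-pool split: long-running fragment work
// and short system operations never compete for the same threads.
public class QueryPoolsSketch {
    // Work-stealing pool for fragment execution.
    final ExecutorService fragmentPool = Executors.newWorkStealingPool();

    // Small separate pool for cancel/check, so these stay responsive
    // even when every fragment thread is busy with a long query.
    final ExecutorService systemPool = Executors.newFixedThreadPool(2);
}
```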
Several other approaches were considered but rejected. Reusing the existing partition pool (which serves IMap.GET/PUT operations) was rejected because it lacks load balancing, and IScheduledExecutorService was rejected because it is prone to starvation.