docs/design/sql/14-keyed-watermark-support.md
| Related Jira | HZ-986 |
| Document Status / Completeness | DRAFT |
| Requirement owner | Sandeep Akhouri |
| Developer(s) | Sasha Syrotenko |
| Quality Engineer | Bartlomiej Poplawski |
| Technical Reviewers | Viliam Durina |
For stream-to-stream join we need to be able to support multiple watermarks (WMs). For an explanation of why this is needed, see 15-stream-to-stream-join.md.
This document describes the changes in the Jet engine needed to support multiple WMs.
Discussing the stream-to-stream join itself is a non-goal.
In Jet we used the term watermark for two things:
Which meaning is used is often clear from the context. We'll use the term watermark instance if we want to stress that a stream item carrying the value is meant. If we say there are two watermarks in the stream, it means there are two independent sets of watermarks, differentiated by a key.
A WM instance with value M means that any event with value less than M can
be considered late and ignored. It's very important in stream processing as it
allows the processors to reason about possible future events and to remove data
from the state.
We say that a stream is at WM value N when the value of the last received WM instance is N. Since each new WM instance is required to have a higher value, the WM value for the stream is ever-increasing.
So-called WM coalescing happens when there are two streams that are merged into one - the lowest current WM value of all the merged streams is used for the merged stream.
We show only WM instances on the streams, as the events (the payload) are
irrelevant to coalescing. in: wm(N, M) denotes an input WM instance from input
N with a value of M. out: wm(M) denotes an output WM, that is, a WM
instance that is forwarded to the output stream. There's only one output stream
from stream merging, therefore we don't specify the output number in the example:
in: wm(0, 10)
in: wm(1, 12)
out: wm(10)
in: wm(0, 11)
out: wm(11)
in: wm(1, 13)
## no new output now as the minimum is still 11
in: wm(0, 14)
out: wm(13)
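The coalescing rule traced above can be sketched in a few lines of Java. This is a minimal illustration, not Jet's actual coalescer: the class name `MinWatermarkCoalescer` and the method `observe` are hypothetical, and `Long.MIN_VALUE` is assumed to mean "no WM seen yet on this input".

```java
import java.util.Arrays;
import java.util.OptionalLong;

/** Hypothetical sketch of min-based WM coalescing; not Jet's implementation. */
final class MinWatermarkCoalescer {
    // Last WM value seen on each input; Long.MIN_VALUE means "none yet" (assumption).
    private final long[] inputWm;
    private long lastEmitted = Long.MIN_VALUE;

    MinWatermarkCoalescer(int inputCount) {
        inputWm = new long[inputCount];
        Arrays.fill(inputWm, Long.MIN_VALUE);
    }

    /** Records wm(input, value) and returns the output WM if the minimum advanced. */
    OptionalLong observe(int input, long value) {
        if (value <= inputWm[input]) {
            throw new IllegalArgumentException("WM must strictly increase on input " + input);
        }
        inputWm[input] = value;
        long min = Arrays.stream(inputWm).min().getAsLong();
        if (min > lastEmitted) {
            lastEmitted = min;
            return OptionalLong.of(min);
        }
        return OptionalLong.empty();
    }

    public static void main(String[] args) {
        MinWatermarkCoalescer coalescer = new MinWatermarkCoalescer(2);
        long[][] inputs = {{0, 10}, {1, 12}, {0, 11}, {1, 13}, {0, 14}};
        for (long[] wm : inputs) {
            OptionalLong out = coalescer.observe((int) wm[0], wm[1]);
            System.out.println("in: wm(" + wm[0] + ", " + wm[1] + ") -> " + out);
        }
    }
}
```

Feeding it the input sequence from the example yields outputs only after the second, third and fifth input, matching the trace.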
An input can contain a special IDLE_MESSAGE. When an IDLE_MESSAGE is received
from an input, that input is excluded from coalescing and the output WM is based
on the minimum of the remaining non-idle inputs. Any new WM or event from that
input clears the idle status and includes the input in coalescing again.
The IDLE_MESSAGE is implemented as a WM instance with a Long.MAX_VALUE
value, but it's not really a WM instance; we just piggyback on the WM
broadcasting mechanism.
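As a rough sketch of the idle handling described above, the coalescer can keep an idle flag per input and skip idle inputs when computing the minimum. The class and method names below are hypothetical; only the `Long.MAX_VALUE` encoding of the idle message comes from the text. Returning the idle message when every input is idle is a simplification of the forwarding behavior.

```java
import java.util.Arrays;
import java.util.OptionalLong;

/** Hypothetical sketch of idle-aware coalescing; not Jet's implementation. */
final class IdleAwareCoalescer {
    static final long IDLE_MESSAGE = Long.MAX_VALUE; // encoding taken from the text

    private final long[] inputWm;
    private final boolean[] idle;
    private long lastEmitted = Long.MIN_VALUE;

    IdleAwareCoalescer(int inputCount) {
        inputWm = new long[inputCount];
        idle = new boolean[inputCount];
        Arrays.fill(inputWm, Long.MIN_VALUE);
    }

    /** Handles a WM or an idle message arriving on the given input. */
    OptionalLong observeWm(int input, long value) {
        if (value == IDLE_MESSAGE) {
            idle[input] = true;          // exclude this input from coalescing
        } else {
            idle[input] = false;         // a real WM makes the input active again
            inputWm[input] = value;
        }
        return coalesce();
    }

    /** An event also clears the idle status of its input. */
    void observeEvent(int input) {
        idle[input] = false;
    }

    private OptionalLong coalesce() {
        boolean anyActive = false;
        long min = Long.MAX_VALUE;
        for (int i = 0; i < inputWm.length; i++) {
            if (!idle[i]) {
                anyActive = true;
                min = Math.min(min, inputWm[i]);
            }
        }
        if (!anyActive) {
            return OptionalLong.of(IDLE_MESSAGE); // all inputs idle: forward the idle message
        }
        if (min > lastEmitted) {                  // never emit a WM that goes back
            lastEmitted = min;
            return OptionalLong.of(min);
        }
        return OptionalLong.empty();
    }

    public static void main(String[] args) {
        IdleAwareCoalescer c = new IdleAwareCoalescer(2);
        System.out.println(c.observeWm(0, 10));
        System.out.println(c.observeWm(1, IDLE_MESSAGE));
        System.out.println(c.observeWm(0, 12));
    }
}
```

Note that an input that becomes active again with a WM lower than the last emitted one simply produces no output until it catches up, which keeps the output monotonic.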
Any processor can generate watermarks based on any logic. The processor can:
These are the typical rules:
- the source emits WMs based on lag after the newest item it observed
- if the source emits from multiple remote partitions, it coalesces the WM from each partition
- non-source processors forward input watermarks
- processors that delay input items also delay watermarks by the same amount (e.g. async map processor or window aggregation processor)
But WMs must follow certain rules:
1. Monotonicity (strict rule): a WM can never go back. For efficiency, we require strict monotonicity, because it's a waste of resources to emit a WM with the same value twice. A violation will cause a job failure.
2. No overtaking (conventional rule): an input item that is not late on input should not be late on the processor output.
3. No unnecessary latency (conventional rule): the processor should emit the WM as soon as it knows that no item that would be late will follow.
Only the 1st rule is a requirement. The others are conventions and processors
can break them, but there's currently no such processor in Jet.
AbstractProcessor implements the case for simple processors that forward WMs
immediately.
Watermark class

We'll add a new byte key field to the Watermark class. Two WM instances with
different keys are handled independently.
Currently, the Processor has this method:
boolean tryProcessWatermark(@Nonnull Watermark watermark);
No change in the method signature is needed as the argument includes the key.
However, the change is not backwards-compatible: the processor now might need to
look at the watermark's key. If it doesn't, it can observe a newer WM instance
with key=1, and later an older WM instance with key=2, which amounts to a
non-monotonic sequence if the key is ignored. No change is needed in
AbstractProcessor: this class directly emits each received WM and won't
suffer from this change.
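To illustrate why a key-unaware processor can see a non-monotonic sequence, the following sketch checks strict monotonicity either per key or with the key ignored. The class `WatermarkMonotonicityCheck` and its `keyAware` flag are hypothetical and exist only for this illustration.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch: strict monotonicity check, optionally ignoring the WM key. */
final class WatermarkMonotonicityCheck {
    private final boolean keyAware;
    private final Map<Byte, Long> lastValueByKey = new HashMap<>();

    WatermarkMonotonicityCheck(boolean keyAware) {
        this.keyAware = keyAware;
    }

    /** Returns true if the value is strictly higher than the previous one for the (effective) key. */
    boolean isMonotonic(byte key, long value) {
        byte effectiveKey = keyAware ? key : 0; // a key-unaware processor sees one sequence
        Long previous = lastValueByKey.put(effectiveKey, value);
        return previous == null || value > previous;
    }

    public static void main(String[] args) {
        WatermarkMonotonicityCheck aware = new WatermarkMonotonicityCheck(true);
        WatermarkMonotonicityCheck unaware = new WatermarkMonotonicityCheck(false);
        // A newer WM with key=1 followed by an older WM with key=2
        System.out.println(aware.isMonotonic((byte) 1, 20) && aware.isMonotonic((byte) 2, 10));
        System.out.println(unaware.isMonotonic((byte) 1, 20) && unaware.isMonotonic((byte) 2, 10));
    }
}
```

The key-aware check accepts the sequence wm(key=1, 20), wm(key=2, 10), while the key-unaware check rejects the second WM.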
However, the API isn't sufficient for a join processor, which receives different
WMs from the left and right input. For example, if a stream of orders and a
stream of deliveries are joined, each input will have a different WM key so that the
processor is able to track the progress for each stream independently. However,
the above method receives the watermark only after the same WM key is received
from both inputs, so in this case it will never receive any watermark instance.
To address this, we'll add a new method:
boolean tryProcessWatermark(int ordinal, @Nonnull Watermark watermark);
This method will receive watermarks coalesced from the multiple upstream
processor instances contributing to the input edge with the given ordinal, but
watermarks from different input edges are not coalesced with each other and are
delivered immediately.
The processor can choose which method to override, depending on its needs.
WMs with different keys are coalesced completely independently. A WM with key=K
is coalesced only with WMs with the same key from the other inputs, as if each
key were a separate stream.
There's no union processor in Jet; we use map(identity()) with multiple inputs
to do it. This setup assumes that all inputs have the same WM keys, and that the
same WM key relates to the same field in the items. If some WM key is present
only on one input, but not on the others, such a WM won't be forwarded at all,
because coalescing will wait forever for a WM with the same key from the other
inputs.
So, for correct functionality, the user has to choose WM keys properly: use the same key if it's the same field, and a different key if it's not.
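The per-key coalescing and the "key present on only one input" pitfall can be sketched as follows. This is an illustrative model with hypothetical names (`KeyedCoalescer`, `observe`), not the engine's code; it keeps one array of input WM values per key.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalLong;

/** Hypothetical sketch: each WM key is coalesced as if it were a separate stream. */
final class KeyedCoalescer {
    private final int inputCount;
    // For each key: last WM value per input, Long.MIN_VALUE meaning "none yet" (assumption)
    private final Map<Byte, long[]> wmByKey = new HashMap<>();
    private final Map<Byte, Long> lastEmittedByKey = new HashMap<>();

    KeyedCoalescer(int inputCount) {
        this.inputCount = inputCount;
    }

    /** Records a WM and returns the coalesced output WM for that key, if it advanced. */
    OptionalLong observe(int input, byte key, long value) {
        long[] perInput = wmByKey.computeIfAbsent(key, k -> {
            long[] values = new long[inputCount];
            Arrays.fill(values, Long.MIN_VALUE);
            return values;
        });
        perInput[input] = value;
        long min = Arrays.stream(perInput).min().getAsLong();
        long last = lastEmittedByKey.getOrDefault(key, Long.MIN_VALUE);
        if (min > last) {
            lastEmittedByKey.put(key, min);
            return OptionalLong.of(min);
        }
        return OptionalLong.empty();
    }

    public static void main(String[] args) {
        KeyedCoalescer c = new KeyedCoalescer(2);
        System.out.println(c.observe(0, (byte) 0, 10));
        System.out.println(c.observe(1, (byte) 0, 12));
        // key=1 appears only on input 0, so it is never forwarded
        System.out.println(c.observe(0, (byte) 1, 100));
    }
}
```

In this model a WM with a key that only one input ever produces stays blocked, because the other input's slot for that key never leaves its initial value.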
The processor currently uses a queue to which it adds both events and watermarks, as they are received. This preserves the original order of events w.r.t. watermarks, so no substantial change is needed. There's an optimization: if the last element in the queue is a watermark, the new watermark replaces it instead of being added. Here we need to check that it's a watermark with the same key.
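The key-aware version of the replace optimization might look like the sketch below. The queue class and the nested `Wm` type are hypothetical stand-ins (the real queue holds in-flight async results, simplified here to plain objects).

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical sketch of the queue optimization with a same-key check. */
final class WatermarkReplacingQueue {
    /** Stand-in for the Watermark class with the new key field. */
    static final class Wm {
        final byte key;
        final long value;

        Wm(byte key, long value) {
            this.key = key;
            this.value = value;
        }

        @Override
        public String toString() {
            return "wm(key=" + key + ", " + value + ")";
        }
    }

    private final Deque<Object> queue = new ArrayDeque<>();

    void addEvent(Object event) {
        queue.addLast(event);
    }

    void addWatermark(Wm wm) {
        Object last = queue.peekLast();
        // Replace the tail only if it is a watermark with the same key
        if (last instanceof Wm && ((Wm) last).key == wm.key) {
            queue.removeLast();
        }
        queue.addLast(wm);
    }

    int size() {
        return queue.size();
    }

    @Override
    public String toString() {
        return queue.toString();
    }

    public static void main(String[] args) {
        WatermarkReplacingQueue q = new WatermarkReplacingQueue();
        q.addEvent("e1");
        q.addWatermark(new Wm((byte) 0, 10));
        q.addWatermark(new Wm((byte) 0, 11)); // replaces wm(key=0, 10)
        q.addWatermark(new Wm((byte) 1, 5));  // different key: appended
        System.out.println(q);
    }
}
```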
This processor emits responses to async operations in the order the responses arrive. To handle watermarks, it tracks counts of events received between each watermark, and for each event with a pending response it stores after which WM it was received. When a response arrives, it decrements the count. When the count reaches zero, it checks the watermark counts from oldest to newest, and emits the watermark up to the first non-zero count.
This processor must be modified to take the WM key into account. Instead of
storing the counts for just the WM value, we need to store them for a tuple
{wmKey, wmValue}.
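One possible shape of the {wmKey, wmValue} bookkeeping is sketched below. It is an assumption-laden illustration, not the processor's code: the class and method names are hypothetical, the set of WM keys is assumed to be known up front, and each pending event carries a "token" recording the last WM value per key at the time it was received.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical sketch of pending-response counts tracked per {wmKey, wmValue}. */
final class KeyedPendingWatermarks {
    // For each WM key: WM value -> number of unanswered events received after that WM
    private final Map<Byte, TreeMap<Long, Integer>> counts = new HashMap<>();
    private final Map<Byte, Long> lastReceived = new HashMap<>();
    private final Map<Byte, Long> lastEmitted = new HashMap<>();

    KeyedPendingWatermarks(byte... wmKeys) { // keys known up front: an assumption
        for (byte key : wmKeys) {
            counts.put(key, new TreeMap<>());
            lastReceived.put(key, Long.MIN_VALUE);
            lastEmitted.put(key, Long.MIN_VALUE);
        }
    }

    /** Registers an event; the returned token must be passed to onResponse later. */
    Map<Byte, Long> onEvent() {
        Map<Byte, Long> token = new HashMap<>(lastReceived);
        token.forEach((key, wm) -> counts.get(key).merge(wm, 1, Integer::sum));
        return token;
    }

    /** Registers an input WM; returns the WMs (key -> value) that can be emitted now. */
    Map<Byte, Long> onWatermark(byte key, long value) {
        lastReceived.put(key, value);
        counts.get(key).putIfAbsent(value, 0);
        return emitReady();
    }

    /** Registers an async response; returns the WMs (key -> value) that can be emitted now. */
    Map<Byte, Long> onResponse(Map<Byte, Long> token) {
        token.forEach((key, wm) -> counts.get(key).merge(wm, -1, Integer::sum));
        return emitReady();
    }

    private Map<Byte, Long> emitReady() {
        Map<Byte, Long> out = new HashMap<>();
        for (Map.Entry<Byte, TreeMap<Long, Integer>> perKey : counts.entrySet()) {
            long candidate = Long.MIN_VALUE;
            Iterator<Map.Entry<Long, Integer>> it = perKey.getValue().entrySet().iterator();
            while (it.hasNext()) {            // oldest to newest
                Map.Entry<Long, Integer> entry = it.next();
                candidate = entry.getKey();   // all older entries have zero pending events
                if (entry.getValue() != 0) {
                    break;                    // events after this WM are still pending
                }
                it.remove();
            }
            if (candidate > lastEmitted.get(perKey.getKey())) {
                lastEmitted.put(perKey.getKey(), candidate);
                out.put(perKey.getKey(), candidate);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        KeyedPendingWatermarks p = new KeyedPendingWatermarks((byte) 0, (byte) 1);
        Map<Byte, Long> e1 = p.onEvent();
        System.out.println(p.onWatermark((byte) 0, 10)); // e1 still pending
        System.out.println(p.onResponse(e1));            // wm for key 0 can go out
    }
}
```

In this model a WM is released for a key once every event received before it has been answered, regardless of what is pending for other keys' later WMs.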
This processor forwards the input watermarks, but before doing so, it emits windows for which all the events were received.
The processor receives a list of timestamp functions, one for each input ordinal. It extracts the timestamp from the events using these functions, and uses them to assign windows.
For example, consider a stream with two watermarked timestamps: time1 and
time2. Then this query:
select
window_start window_start_inner,
window_end window_end_inner,
time2,
count(*)
from tumble(my_stream, descriptor(time1), 5 seconds)
group by 1, 2, 3
would be a valid one. It groups by windows calculated using time1, but also
groups by time2, which should be watermarked on output.
As we can see, the processor can group only by a single watermarked value. Another watermarked value can be a part of the grouping key. If it's not a part of the grouping key, then the value isn't on output and the watermark can be dropped.
To implement it, after emitting a window the processor would have to iterate all in-progress windows and their keys, find the minimum among them, and emit the lower of that minimum and the input WM for that key as the output WM. After items are removed from the set, the remaining items have to be iterated again to recalculate the minimum, so this would be a lot of extra work. For this reason, and because it's very rare to group by a watermarked column, we will not implement this. The sliding window processor will drop all input watermarks; the output will have watermarks only on the window bounds.
Idle messages were originally introduced to handle the case where some partition doesn't have any data, but others do. For example, there might be 5 partitions, but only 4 distinct keys. In this case, because of watermark coalescing, no watermark would ever be forwarded and a windowing aggregation could never produce any results. The above case is very often a user error, but it happens in real life, especially in toy projects, and can confuse users.
The timeout is configurable in Core API, but in Pipeline API it's hard-coded to 60 seconds, which leads to issue reports such as "Jet pipeline starts producing results only after 60 seconds after start".
Idle messages are handled in two places:
- Source processors generate them if they see no events for the configured timeout.
- Watermark coalescing forwards them if all coalesced inputs are idle.
So if we have a DAG like this:
scan1 -> map1 --\
                 +--> aggr
scan2 -> map2 --/
and if scan1 sees no data, after the idle timeout scan1 sends the idle
message, map1 forwards it (because all its inputs are idle), and the coalescer
on the input to aggr will exclude the input from map1 from watermark coalescing
and will forward WMs from the other input, so aggr can start producing
results.
Thanks to the change in the Watermark class, idle messages now also carry a
key. Currently, we mark a stream as idle after receiving an idle message from it
and mark it active after receiving any event or watermark. We track idle
status for each stream. This means that idle messages have a key, but the
inverse messages (the events) don't.
Currently, for each stream we track the idle status: a stream is either active, or idle. With keyed idle messages, the status could be "queue is active for key=0 and idle for key=1". Or in other words "queue is partially idle". This doesn't make sense: an event has all timestamps, so if there is an event, the stream is not idle or partially idle, but active.
Therefore, we should ignore the key in the idle messages. An idle message with any key will mark the stream as idle, and the stream will be excluded from coalescing for all keys.
In the implementation it might be useful to allow only idle messages with key=0, so that the above design is more apparent in the code.
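A minimal sketch of this decision, assuming a per-key coalescer with a single idle flag per input: an idle message marks the whole input as idle, whatever key it carries. All names are hypothetical, and forwarding of the idle message when every input is idle is left out for brevity.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical sketch: keyed coalescing where idleness is tracked per input, not per key. */
final class KeyedIdleCoalescer {
    static final long IDLE_MESSAGE = Long.MAX_VALUE;

    private final boolean[] idle;                       // one flag per input, for all keys
    private final Map<Byte, long[]> wmByKey = new HashMap<>();
    private final Map<Byte, Long> lastEmitted = new HashMap<>();

    KeyedIdleCoalescer(int inputCount) {
        idle = new boolean[inputCount];
    }

    /** Handles a WM or idle message; returns the output WMs (key -> value) that advanced. */
    Map<Byte, Long> observeWm(int input, byte key, long value) {
        if (value == IDLE_MESSAGE) {
            idle[input] = true;                         // the key of the idle message is ignored
        } else {
            idle[input] = false;
            long[] perInput = wmByKey.computeIfAbsent(key, k -> {
                long[] values = new long[idle.length];
                Arrays.fill(values, Long.MIN_VALUE);
                return values;
            });
            perInput[input] = value;
        }
        return coalesceAllKeys();
    }

    /** Events carry no key; any event makes the whole input active. */
    void observeEvent(int input) {
        idle[input] = false;
    }

    private Map<Byte, Long> coalesceAllKeys() {
        Map<Byte, Long> out = new TreeMap<>();
        for (Map.Entry<Byte, long[]> entry : wmByKey.entrySet()) {
            long min = Long.MAX_VALUE;
            for (int i = 0; i < idle.length; i++) {
                if (!idle[i]) {
                    min = Math.min(min, entry.getValue()[i]);
                }
            }
            long last = lastEmitted.getOrDefault(entry.getKey(), Long.MIN_VALUE);
            if (min != Long.MAX_VALUE && min > last) {  // MAX_VALUE here means "all inputs idle"
                lastEmitted.put(entry.getKey(), min);
                out.put(entry.getKey(), min);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        KeyedIdleCoalescer c = new KeyedIdleCoalescer(2);
        c.observeWm(0, (byte) 0, 10);
        c.observeWm(0, (byte) 1, 20);
        // One idle message on input 1 unblocks both keys
        System.out.println(c.observeWm(1, (byte) 0, IDLE_MESSAGE));
    }
}
```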
Let's inspect the behavior in a couple of common examples to confirm its correctness.
-aggr
--merge
---join
----scan1
----scan2
---scan3
If both scan1 and scan2 are idle, join will be marked as idle, even though
scan1 and scan2 have a different watermark key. The merge operator will
exclude the join from WM coalescing, and will directly forward the watermarks
from scan3. This will enable the aggr operator to use those watermarks and
produce output.
In this example, the idle messages help to make progress if some input of the
merge operator has no data.
Note that the merge operator is a prototype of merging homogeneous streams. Such
merging also happens when merging multiple streams contributing to a single DAG
edge, for example when the scan1 vertex is backed by multiple processors. It
shows that the algorithm also helps in that case.
-joinOuter
--joinInner
---scan1
---scan2
--scan3
joinInner will be idle if both its input scans are idle, even though each of
the scans uses a different WM key. joinInner will send the idle message with
key=0 (as the only possible key for an idle message).
joinOuter has two inputs: joinInner and scan3. joinInner will be marked
as idle and no event or watermark will come from it. joinOuter will be unable
to make any progress as it will be blocked by joinInner not producing any data
(it's a stream-to-stream join, the stream-to-batch or batch-to-batch joins don't
use watermarks). Moreover, joinOuter will buffer data from scan3 forever,
until joinInner is active again and sends some watermark.
This scenario seems problematic, but it's not caused by the idle mechanism. It's caused by the fact that one side of the join doesn't advance. It would be the same without any idle stream handling. It only shows that excluding idle streams from coalescing doesn't help here.