013 - Resolving Sparse Events Issue with System-Time Watermarks

docs/design/jet/013-system-time-watermarks.md

Since: 4.3

The Sparse Events Issue

When using event time, time progresses only through the ingestion of new events. If the events are sparse, time will effectively stop until a newer event arrives. This causes high latency for time-sensitive operations such as window aggregation. In addition, Jet tracks event time for every source partition separately, and if just one partition has sparse events, time progress in the whole job is hindered.
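The effect of a single sparse partition can be sketched with a small model. This is not Jet code: the class and method names are hypothetical, and it assumes the job-wide event time is the minimum of the per-partition event times, which is the usual way to combine separately tracked partitions.

```java
import java.util.Arrays;

// Hypothetical model, not part of the Jet API.
final class SparsePartitionDemo {

    // Assumption: the job can only advance to the event time reached by
    // ALL partitions, i.e. the minimum of the per-partition times.
    static long coalesce(long[] partitionTimes) {
        return Arrays.stream(partitionTimes).min().orElse(Long.MIN_VALUE);
    }

    public static void main(String[] args) {
        // All three partitions start at event time 1000.
        long[] times = {1_000, 1_000, 1_000};

        // Partitions 0 and 1 keep receiving events; partition 2 is sparse
        // and has seen nothing new.
        times[0] = 9_000;
        times[1] = 9_500;

        // The job as a whole is still stuck at the sparse partition's time.
        System.out.println(coalesce(times)); // prints 1000
    }
}
```

In this model, any window ending after time 1000 cannot be emitted until partition 2 receives a newer event, no matter how far the other partitions have advanced.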

Ingestion Time

When you use withIngestionTimestamps() on the pipeline, Jet doesn't extract the timestamp from the event but sets it according to the local system time on the server. This way of assigning timestamps is less accurate because the event is associated not with the time at which it occurred but with the time at which Jet received it, which skews the calculations. A particularly serious problem arises when the job restarts after a crash: Jet must receive again some of the events it already received and processed before the crash. These events now get different timestamps and therefore affect a different piece of the aggregation state.

On the other hand, this timestamping approach provides the best latency: the job never waits for delayed items, and there's no need to allow for any event lag, as there is with event-time processing.
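The restart problem described above can be illustrated with a small model. The sketch below is not Jet code; it assumes a tumbling window of a hypothetical size and shows that the same event, stamped with the system time on first receipt and again on replay, falls into two different windows.

```java
// Hypothetical model, not part of the Jet API.
final class IngestionTimeReplayDemo {

    // Start of the tumbling window that contains the given timestamp.
    static long windowStart(long timestamp, long windowSize) {
        return Math.floorDiv(timestamp, windowSize) * windowSize;
    }

    public static void main(String[] args) {
        long windowSize = 1_000;      // assumed window size in milliseconds

        long firstArrival = 4_900;    // system time when the event was first received
        long replayArrival = 7_200;   // system time when it was received again after the restart

        // The same event contributes to two different windows.
        System.out.println(windowStart(firstArrival, windowSize));  // prints 4000
        System.out.println(windowStart(replayArrival, windowSize)); // prints 7000
    }
}
```

With event-time processing both arrivals would carry the same timestamp embedded in the event, so the replayed event would land in the same window as before.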

The Improvement

We first implemented WatermarkPolicy.limitingRealTimeLag(). This policy emits a watermark that lags behind the system clock by a fixed amount. It doesn't depend on the events processed; it depends solely on the system clock. Currently, this policy is available only in the Core API.
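The behavior of such a policy can be sketched as follows. This is a stand-alone model under stated assumptions, not the Jet implementation: the class name, the method names and the injectable clock are all hypothetical and chosen only for illustration.

```java
import java.util.function.LongSupplier;

// Hypothetical model of a watermark policy driven only by the system clock.
final class RealTimeLagPolicy {
    private final LongSupplier clock; // injectable so the example is deterministic
    private final long lagMillis;

    RealTimeLagPolicy(LongSupplier clock, long lagMillis) {
        this.clock = clock;
        this.lagMillis = lagMillis;
    }

    // Observed events are deliberately ignored: they have no effect on the watermark.
    void reportEvent(long timestamp) {
    }

    // The watermark trails the clock by a fixed amount.
    long currentWatermark() {
        return clock.getAsLong() - lagMillis;
    }

    public static void main(String[] args) {
        long[] now = {10_000};
        RealTimeLagPolicy policy = new RealTimeLagPolicy(() -> now[0], 2_000);

        System.out.println(policy.currentWatermark()); // prints 8000

        policy.reportEvent(50_000);                    // an event far in the future
        System.out.println(policy.currentWatermark()); // still prints 8000

        now[0] = 11_000;                               // only the clock moves the watermark
        System.out.println(policy.currentWatermark()); // prints 9000
    }
}
```

In production code the clock would typically be System::currentTimeMillis; passing it in as a parameter is only a convenience for testing.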

Second, we made StreamSourceStage.withIngestionTimestamps() use this policy with zero lag. We can do this because the events are assigned the system time as their timestamp, so we're sure that there will never be an event with an older timestamp.
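The reasoning behind the zero lag can be checked with a small model. The sketch below is hypothetical and not Jet code; it assumes the system clock never goes backwards, stamps each event with the clock value at arrival, and counts the events that end up behind a previously emitted zero-lag watermark.

```java
// Hypothetical model, not part of the Jet API.
final class ZeroLagIngestionDemo {

    // arrivalTimes: system clock readings at which events arrive,
    // assumed to be non-decreasing (the clock does not go backwards).
    static int countLate(long[] arrivalTimes) {
        long lastWatermark = Long.MIN_VALUE;
        int late = 0;
        for (long now : arrivalTimes) {
            long timestamp = now;          // ingestion timestamp = system time at arrival
            if (timestamp < lastWatermark) {
                late++;                    // event is older than an emitted watermark
            }
            lastWatermark = now;           // zero-lag watermark = system time
        }
        return late;
    }

    public static void main(String[] args) {
        long[] arrivals = {100, 100, 250, 9_000};
        System.out.println(countLate(arrivals)); // prints 0
    }
}
```

Because both the timestamp and the watermark come from the same clock, no event can be older than a watermark emitted before it, so there is nothing to gain from a non-zero lag.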

Further work is to allow the use of real-time watermarks in the Pipeline API even for event-time processing. This will solve the sparse events issue and give a fixed latency relative to real time, but if the job is down for longer than the real-time lag, some events will be dropped as late.
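The trade-off can be made concrete with a small model. The sketch below is hypothetical and not Jet code; it assumes an event counts as late when its event-time timestamp is behind the real-time watermark (system time minus the lag) at the moment it is processed.

```java
// Hypothetical model, not part of the Jet API.
final class RealTimeWatermarkLateDemo {

    // Assumption: an event is late if its event time is behind the
    // real-time watermark at the moment it is processed.
    static boolean isLate(long eventTime, long processedAt, long lagMillis) {
        long watermark = processedAt - lagMillis;
        return eventTime < watermark;
    }

    public static void main(String[] args) {
        long lag = 5_000;         // assumed real-time lag in milliseconds
        long eventTime = 10_000;  // when the event occurred

        // Normal operation: processed 200 ms after it occurred.
        System.out.println(isLate(eventTime, 10_200, lag)); // prints false

        // The job was down for 8 seconds, longer than the lag, before
        // it could process the same event.
        System.out.println(isLate(eventTime, 18_200, lag)); // prints true
    }
}
```

A larger lag tolerates longer outages but adds the same amount to the latency of every result, which is the fixed latency mentioned above.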