rfcs/2020-06-12-2768-batch-and-buffer-rework.md
Most sinks in Vector batch up events to send in a buffer in order to improve efficiency and increase transmission rates. However, the current implementations of batching suffer from two significant problems:
For sinks that use the BatchSink framework (PartitionBatchSink is
similar), events move along the following path:
1. […] (with_flat_map on the batch sink, ref type Batch::Input).
2. […] (BatchSink::start_send).
3. […] (BatchSink::poll_complete).
4. […] (trait Service::call), which
   will do the final encoding and serialization.

There exist the following buffer types:
- Buffer offers (gzip) compression, but requires that events be
  serialized before insertion and that the serialization include a
  terminator. Events are stored and output as one large Vec<u8>, which
  allows for easy computation of both sizes (bytes and events).
- JsonArrayBuffer internally serializes events into "raw" JSON, which
  may then be added to another structure after output. Events are stored
  as serde_json::value::RawValue, a newtype over a boxed
  string, which allows for maintaining a running total of both sizes.
- MetricBuffer stores metrics in their structured form, without
  encoding. Since the target serialization is unknown, this buffer
  cannot know the total byte size.
- PartitionBuffer<T, K> is a meta-buffer that partitions events into
  multiple buffers of type T.
- Vec<T> can only count events, not bytes, as it is agnostic to the
  size of T.

We want to get to the point where as many batching sinks as possible can be configured with a byte limit, an event limit, or both. Additionally, the batch limits must be inviolable, so that no requests are ever sent that exceed size limits. To accomplish this, the following is necessary:
The split configuration types for bytes and events will be rolled into a single configuration combining both attributes. Batch sizes will be limited by whichever maximum is reached first.
The max_size element is retained for backwards compatibility, and each
sink will use it as a default for the maximum bytes or events depending
on the previous configuration mode. If both max_size and either of
max_bytes or max_events are set, Vector will produce a configuration
error indicating a conflict in the configuration.
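struct BatchConfig {
    // Maximum batch size in bytes.
    max_bytes: Option<usize>,
    // Maximum number of events in a batch.
    max_events: Option<usize>,
    // Retained for backwards compatibility; conflicts with the two fields above.
    max_size: Option<usize>,
    timeout_secs: Option<u64>,
}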
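One way the backwards-compatibility rule could be applied when a sink loads its configuration is sketched below. Only the `BatchConfig` fields come from this RFC; `LegacyMode`, `BatchLimits`, and `resolve` are hypothetical names introduced for illustration, and the error is a plain string rather than Vector's real error type.

```rust
// Sketch only: `LegacyMode`, `BatchLimits`, and `resolve` are assumptions,
// not existing Vector types.
#[derive(Debug, Default, Clone, PartialEq)]
struct BatchConfig {
    max_bytes: Option<usize>,
    max_events: Option<usize>,
    max_size: Option<usize>,
    timeout_secs: Option<u64>,
}

/// Which limit the sink's previous configuration mode applied `max_size` to.
#[derive(Debug, Clone, Copy)]
enum LegacyMode {
    Bytes,
    Events,
}

/// The limits a buffer would actually be created with.
#[derive(Debug, PartialEq)]
struct BatchLimits {
    max_bytes: Option<usize>,
    max_events: Option<usize>,
}

fn resolve(config: &BatchConfig, legacy: LegacyMode) -> Result<BatchLimits, String> {
    match config.max_size {
        // `max_size` combined with either new field is a configuration conflict.
        Some(_) if config.max_bytes.is_some() || config.max_events.is_some() => {
            Err("`max_size` conflicts with `max_bytes` / `max_events`".to_string())
        }
        // `max_size` alone feeds whichever limit the sink used previously.
        Some(size) => Ok(match legacy {
            LegacyMode::Bytes => BatchLimits { max_bytes: Some(size), max_events: None },
            LegacyMode::Events => BatchLimits { max_bytes: None, max_events: Some(size) },
        }),
        // Otherwise the new fields are used as given.
        None => Ok(BatchLimits {
            max_bytes: config.max_bytes,
            max_events: config.max_events,
        }),
    }
}
```

In this sketch each sink would pass its own `LegacyMode`, so the meaning of an existing `max_size` setting does not change when the unified type is introduced.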
The current batch buffers always allow events to be inserted, and then check separately if the buffer is "big enough" to be sent. This will be changed into a new signature that indicates if the insertion would push the buffer over its size limits.
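#[must_use]
enum PushResult<E> {
    // The event was added to the batch.
    Ok,
    // The event would push the batch over its limits and is handed back.
    Overflow(E),
}

trait Batch {
    // Existing associated type (other trait items omitted).
    type Input;

    fn push(&mut self, event: Self::Input) -> PushResult<Self::Input>;
}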
This will require all the buffers to know their maximum size. This will not be encoded into the trait directly, as each buffer will implement its own creation method to accommodate additional creation parameters, such as the compression mode or partition key.
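As a rough illustration of the new signature, the sketch below shows a buffer of pre-serialized events that enforces both limits inside `push` and takes them through its own constructor. `BytesBuffer` and its fields are hypothetical stand-ins for the real buffer types, and compression is left out.

```rust
#[must_use]
#[derive(Debug, PartialEq)]
enum PushResult<E> {
    Ok,
    Overflow(E),
}

trait Batch {
    type Input;

    fn push(&mut self, event: Self::Input) -> PushResult<Self::Input>;
}

/// Hypothetical buffer of pre-serialized events (not Vector's `Buffer`).
struct BytesBuffer {
    data: Vec<u8>,
    num_events: usize,
    max_bytes: usize,
    max_events: usize,
}

impl BytesBuffer {
    /// Each buffer has its own constructor; the limits are not part of the trait.
    fn new(max_bytes: usize, max_events: usize) -> Self {
        Self { data: Vec::new(), num_events: 0, max_bytes, max_events }
    }
}

impl Batch for BytesBuffer {
    type Input = Vec<u8>;

    fn push(&mut self, event: Vec<u8>) -> PushResult<Vec<u8>> {
        // Whichever limit would be exceeded first rejects the event.
        let too_many = self.num_events + 1 > self.max_events;
        let too_big = self.data.len() + event.len() > self.max_bytes;
        if too_many || too_big {
            // Hand the event back untouched so the caller can retry it later.
            return PushResult::Overflow(event);
        }
        self.data.extend_from_slice(&event);
        self.num_events += 1;
        PushResult::Ok
    }
}
```

Note that in this sketch a single event larger than `max_bytes` is rejected even by an empty buffer, so a caller would need a policy for such events.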
Does it make sense to move conversion of input events into the
intermediate type Input into the buffer types instead of using a map
on the sink? This would mean something like a TryInto trait,
implemented differently for each sink, or parameterizing the buffer with
something the way RetryLogic works for the sink trait.
Similarly, does it make sense to move serialization of output events
into the trait? Is this possible for MetricBuffer? If this could be
done for all buffers, it would guarantee availability of the byte size
limit across the board.
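To make the two questions above more concrete, here is one possible shape for a buffer that is parameterized with its own encoding step, so that the byte size is always known at insertion time. Everything in it (`Encoder`, `EncodingBuffer`, `LineEncoder`, and the tuple used as a stand-in for a metric) is hypothetical and only meant to aid discussion, not a proposal for the final API.

```rust
#[must_use]
#[derive(Debug, PartialEq)]
enum PushResult<E> {
    Ok,
    Overflow(E),
}

/// Hypothetical: turns a structured event into its serialized bytes.
trait Encoder<T> {
    fn encode(&self, event: &T) -> Vec<u8>;
}

/// Hypothetical buffer that serializes on insertion and tracks bytes.
struct EncodingBuffer<E> {
    encoder: E,
    data: Vec<u8>,
    max_bytes: usize,
}

impl<E> EncodingBuffer<E> {
    fn new(encoder: E, max_bytes: usize) -> Self {
        Self { encoder, data: Vec::new(), max_bytes }
    }

    fn push<T>(&mut self, event: T) -> PushResult<T>
    where
        E: Encoder<T>,
    {
        // Serialize first so the size check uses the real encoded length.
        let bytes = self.encoder.encode(&event);
        if self.data.len() + bytes.len() > self.max_bytes {
            // Return the original, unencoded event to the caller.
            return PushResult::Overflow(event);
        }
        self.data.extend_from_slice(&bytes);
        PushResult::Ok
    }
}

/// Toy encoder: one `name value` line per (name, value) pair.
struct LineEncoder;

impl Encoder<(String, f64)> for LineEncoder {
    fn encode(&self, event: &(String, f64)) -> Vec<u8> {
        format!("{} {}\n", event.0, event.1).into_bytes()
    }
}
```

A drawback visible even in this small sketch is that a rejected event is encoded again when it is retried, unless the encoded bytes are handed back alongside it.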
The desired goal can be accomplished incrementally as follows:
1. […] Vec<T> buffer with a new VecBuffer<T> type.
2. […] BatchSink into trait Batch.
3. […] Batch::push to return an indicator that the item would
   overflow the buffer.
4. […] impl Sink for BatchSink to use failure
   from the new push method to cause a new request to be generated,
   making start_send exit without consuming the new event.
5. […] BatchBytesConfig and BatchEventsConfig into a
   unified BatchConfig. Buffers that do not support byte size limits
   (like MetricBuffer) would error on creation if they were configured
   with one.
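The interaction between an overflowing `push` and request generation can be sketched with a synchronous stand-in for the sink. This is a simplification under stated assumptions: the fields and methods of `VecBuffer<T>` shown here are guesses at what such a type might look like, `batch_all` is a hypothetical helper, and the real `BatchSink` would return from `start_send` without consuming the event instead of retrying it in a loop.

```rust
#[must_use]
enum PushResult<E> {
    Ok,
    Overflow(E),
}

/// Hypothetical event-count-limited replacement for a bare `Vec<T>`.
struct VecBuffer<T> {
    events: Vec<T>,
    max_events: usize,
}

impl<T> VecBuffer<T> {
    fn new(max_events: usize) -> Self {
        Self { events: Vec::new(), max_events }
    }

    fn push(&mut self, event: T) -> PushResult<T> {
        if self.events.len() >= self.max_events {
            PushResult::Overflow(event)
        } else {
            self.events.push(event);
            PushResult::Ok
        }
    }

    /// Take the accumulated events, leaving the buffer empty.
    fn take(&mut self) -> Vec<T> {
        std::mem::take(&mut self.events)
    }
}

/// Synchronous stand-in for the sink: returns the requests it would send.
fn batch_all<T>(events: impl IntoIterator<Item = T>, max_events: usize) -> Vec<Vec<T>> {
    assert!(max_events > 0, "a batch must hold at least one event");
    let mut buffer = VecBuffer::new(max_events);
    let mut requests = Vec::new();
    for event in events {
        match buffer.push(event) {
            PushResult::Ok => {}
            PushResult::Overflow(event) => {
                // The batch is full: turn it into a request, then retry the event.
                requests.push(buffer.take());
                // An empty buffer accepts one event because max_events > 0.
                if let PushResult::Overflow(_) = buffer.push(event) {
                    unreachable!("empty buffer rejected an event");
                }
            }
        }
    }
    // Stand-in for the timeout flush of a partially filled batch.
    if !buffer.events.is_empty() {
        requests.push(buffer.take());
    }
    requests
}
```

Because the overflowing event is never added to the full batch, no request in this sketch can exceed `max_events`, which is the inviolable-limit property the plan is aiming for.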