rfcs/2021-09-21-9292-throttle-transform.md
Throttle transform

This RFC proposes adding a new transform that gives users the ability to control the throughput of specific event streams.
The throttle transform can be used to rate limit specific subsets of your event
stream, either to limit load on downstream services or to enforce quotas on users.
You can enforce rate limits on the number of events, and you can exclude events that
match a VRL condition so that critical logs are never dropped. Rate limits
can be applied globally across all logs, or per key, where the key groups events into
buckets that are each rate limited separately.
The initial implementation sheds load by dropping any events above the configured threshold.
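To make the keyed-bucket, exclusion, and load-shedding semantics concrete, here is a minimal sketch that uses a simple fixed-window counter per key. This is a swapped-in technique for illustration only: `FixedWindowThrottle`, `admit`, and the millisecond timestamps are hypothetical, and the `excluded` flag stands in for the result of evaluating the VRL `exclude` condition. The proposed transform itself relies on `governor`, which smooths the rate instead of resetting counts at window boundaries.

```rust
use std::collections::HashMap;

/// Hypothetical keyed throttle: at most `threshold` events per key in each
/// fixed window of `window_ms` milliseconds. Not governor's algorithm.
struct FixedWindowThrottle {
    threshold: u32,
    window_ms: u64,
    // key -> (index of the current window, events admitted in that window).
    // `None` is the bucket for events that have no key.
    counts: HashMap<Option<String>, (u64, u32)>,
}

impl FixedWindowThrottle {
    fn new(threshold: u32, window_ms: u64) -> Self {
        assert!(window_ms > 0, "window must be non-zero");
        Self { threshold, window_ms, counts: HashMap::new() }
    }

    /// Returns true if the event is forwarded, false if it is dropped.
    /// `excluded` models the `exclude` condition matching the event.
    fn admit(&mut self, key: Option<&str>, excluded: bool, now_ms: u64) -> bool {
        if excluded {
            return true; // excluded events bypass the limit entirely
        }
        let window = now_ms / self.window_ms;
        let entry = self
            .counts
            .entry(key.map(str::to_owned))
            .or_insert((window, 0));
        if entry.0 != window {
            *entry = (window, 0); // a new window has started for this key
        }
        if entry.1 < self.threshold {
            entry.1 += 1;
            true
        } else {
            false // over the threshold: shed load by dropping the event
        }
    }
}
```

For example, with `FixedWindowThrottle::new(2, 1_000)`, a third `"alice"` event within the same second is dropped while a `"bob"` event in that second is still admitted. A fixed window can admit up to twice `threshold` events across a window boundary, which is one reason a smoother algorithm is preferable for the real transform.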
The throttle transform will leverage the existing `governor` crate.
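`governor` implements the generic cell rate algorithm (GCRA). The std-only sketch below shows the core of that algorithm in its virtual-scheduling form, to illustrate how a burst allowance and a steady replenish rate interact. It is not `governor`'s code; `Gcra`, `check`, and the explicit `now` argument (time since the limiter started) are hypothetical names chosen for this example.

```rust
use std::time::Duration;

/// Minimal GCRA limiter (virtual-scheduling form), for illustration only.
struct Gcra {
    emission_interval: Duration, // T: time to replenish capacity for one event
    tolerance: Duration,         // (burst - 1) * T: how far ahead TAT may run
    tat: Duration,               // theoretical arrival time of the next event
}

impl Gcra {
    /// Allows `burst` events back to back, refilling one every `window / burst`.
    fn new(burst: u32, window: Duration) -> Self {
        assert!(burst > 0, "burst must be non-zero");
        let emission_interval = window / burst;
        Gcra {
            emission_interval,
            tolerance: emission_interval * (burst - 1),
            tat: Duration::ZERO,
        }
    }

    /// Returns true if an event arriving at `now` conforms to the rate.
    fn check(&mut self, now: Duration) -> bool {
        if self.tat > now + self.tolerance {
            return false; // too many events in flight: reject without updating state
        }
        // Conforming: push the theoretical arrival time forward by one interval.
        self.tat = self.tat.max(now) + self.emission_interval;
        true
    }
}
```

With `Gcra::new(3, Duration::from_millis(30))`, three events at time zero are admitted and a fourth is rejected; 10 ms later exactly one more event fits. A single timestamp per limiter is all the state GCRA needs, which keeps a keyed limiter cheap per key.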
Config:
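```rust
pub struct ThrottleConfig {
    threshold: u32,                // Maximum events per window (throttles only on number of events)
    window: f64,                   // Window length, in seconds
    key: Option<String>,           // Template string used to bucket events; `None` means one global bucket
    exclude: Option<AnyCondition>, // Events matching this condition are never throttled
}
```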
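The config expresses a limit as `threshold` events per `window` seconds, while a `governor` `Quota` is built from a replenish period and a maximum burst. One plausible mapping, sketched below with a hypothetical `QuotaParams` type and `quota_params` helper, replenishes capacity for one event every `window / threshold` seconds and allows bursts of up to `threshold` events. Inputs that cannot form a valid quota (a zero threshold, or a non-positive or non-finite window) are rejected rather than causing a panic.

```rust
use std::time::Duration;

/// Hypothetical parameters for a governor-style quota.
#[derive(Debug, PartialEq)]
struct QuotaParams {
    replenish_period: Duration, // capacity for one event returns every period
    max_burst: u32,             // events allowed back to back after idling
}

/// Maps `threshold` events per `window` seconds onto quota parameters.
fn quota_params(threshold: u32, window: f64) -> Option<QuotaParams> {
    if threshold == 0 {
        return None;
    }
    // `try_from_secs_f64` fails on negative, NaN, infinite, or overflowing input.
    let period = Duration::try_from_secs_f64(window / f64::from(threshold)).ok()?;
    if period.is_zero() {
        return None; // a zero period cannot describe a finite rate
    }
    Some(QuotaParams { replenish_period: period, max_burst: threshold })
}
```

For example, `quota_params(4, 2.0)` yields a 500 ms replenish period with a burst of 4, so a fully idle bucket admits four events at once and then one more every half second.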
TaskTransform:
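```rust
// Assumed imports. `Event`, `TaskTransform`, `emit!`, and `EventRateLimited`
// are Vector-internal items. Assumes `Throttle` carries the config's
// `threshold`, `window`, and `exclude`, plus `key_field: Option<Template>`
// parsed from the config's `key` template string.
use std::{num::NonZeroU32, pin::Pin, time::Duration};

use async_stream::stream;
use futures::{Stream, StreamExt};
use governor::{Quota, RateLimiter};

impl TaskTransform for Throttle {
    fn transform(
        self: Box<Self>,
        mut input_rx: Pin<Box<dyn Stream<Item = Event> + Send>>,
    ) -> Pin<Box<dyn Stream<Item = Event> + Send>>
    where
        Self: 'static,
    {
        // Allow `threshold` events per `window` seconds: capacity for one event
        // replenishes every `window / threshold` seconds, bursting up to `threshold`.
        let quota = Quota::with_period(Duration::from_secs_f64(
            self.window / self.threshold as f64,
        ))
        .expect("replenish period must be non-zero")
        .allow_burst(NonZeroU32::new(self.threshold).expect("threshold must be non-zero"));
        // One rate-limit bucket per distinct rendered key.
        let lim = RateLimiter::keyed(quota);

        // Periodically drop per-key state that has fully replenished.
        let mut flush_keys = tokio::time::interval(Duration::from_secs_f64(self.window * 2.0));
        // Periodic wake-up; it emits nothing on its own.
        let mut flush_stream = tokio::time::interval(Duration::from_millis(1000));

        Box::pin(
            stream! {
                loop {
                    let mut output = Vec::new();
                    let done = tokio::select! {
                        _ = flush_stream.tick() => false,
                        _ = flush_keys.tick() => {
                            lim.retain_recent();
                            false
                        }
                        maybe_event = input_rx.next() => {
                            match maybe_event {
                                // Input closed: stop after this iteration.
                                None => true,
                                Some(event) => {
                                    // Events matching `exclude` bypass the limiter.
                                    let excluded = self
                                        .exclude
                                        .as_ref()
                                        .map_or(false, |condition| condition.check(&event));
                                    if excluded {
                                        output.push(event);
                                    } else {
                                        // Events with no key, or whose key fails to
                                        // render, share the `None` bucket.
                                        let key = self
                                            .key_field
                                            .as_ref()
                                            .and_then(|template| template.render_string(&event).ok());
                                        match lim.check_key(&key) {
                                            Ok(()) => output.push(event),
                                            // Over the threshold: drop the event.
                                            Err(_) => {
                                                emit!(EventRateLimited);
                                            }
                                        }
                                    }
                                    false
                                }
                            }
                        }
                    };
                    yield futures::stream::iter(output);
                    if done { break }
                }
            }
            .flatten(),
        )
    }
}
```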
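The `flush_keys` interval calls `retain_recent()` so that per-key state does not grow without bound when keys have high cardinality. The std-only sketch below, using a hypothetical `KeyedGcra` type, shows why such pruning is safe: a key whose theoretical arrival time (TAT) is already in the past behaves exactly like a key that has never been seen, so forgetting it changes no throttling decision. This mirrors the intent of `governor`'s `retain_recent` but is not its implementation.

```rust
use std::collections::HashMap;
use std::time::Duration;

/// Hypothetical keyed GCRA limiter storing only each key's TAT.
struct KeyedGcra {
    emission_interval: Duration,
    tolerance: Duration,
    tats: HashMap<Option<String>, Duration>,
}

impl KeyedGcra {
    fn new(burst: u32, window: Duration) -> Self {
        assert!(burst > 0, "burst must be non-zero");
        let emission_interval = window / burst;
        Self {
            emission_interval,
            tolerance: emission_interval * (burst - 1),
            tats: HashMap::new(),
        }
    }

    /// Returns true if an event for `key` at `now` is within the rate.
    /// A new key starts at TAT zero, which always conforms.
    fn check_key(&mut self, key: Option<&str>, now: Duration) -> bool {
        let tat = self
            .tats
            .entry(key.map(str::to_owned))
            .or_insert(Duration::ZERO);
        if *tat > now + self.tolerance {
            return false;
        }
        *tat = (*tat).max(now) + self.emission_interval;
        true
    }

    /// Forgets keys whose TAT is not in the future; their state is
    /// indistinguishable from a fresh key's.
    fn retain_recent(&mut self, now: Duration) {
        self.tats.retain(|_, tat| *tat > now);
    }

    fn len(&self) -> usize {
        self.tats.len()
    }
}
```

Pruning on an interval of twice the window, as the transform proposes, is a trade-off: a shorter interval frees memory sooner but scans the key map more often.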
…bytes in this transform, it will be the unserialized form of the event, which will differ from what downstream sinks actually receive.

…sample transform to allow for a window configuration.

…sink feature, rather than a separate transform.

…sink concern and implemented as a composable part of our sink pattern. This could give more "accurate" serialized sizes and possibly be easier to manage for administrators (depending on needs). If rate limiting is also a sink concern, should it only be implemented there or also be available as a transform?