RFC 8621 - 2021-10-29 - Framing and Codecs - Sinks

rfcs/2021-10-29-8621-framing-and-codecs-sinks.md

This RFC discusses changes to apply framing and encoding across sinks in a consistent way. At a high level, this feature aims to be the symmetric counterpart to the concepts outlined in Framing and Codecs - Sources.

Context

In the context of sinks, we refer to encoding as serializing an event to bytes, and to framing as the process of wrapping one or more serialized events into a bounded message that can be used as the payload for a request sent by a sink.
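To make the distinction concrete, here is a minimal, dependency-free sketch (not Vector code) of two of the framing methods this RFC mentions, applied to the same pair of already-serialized events. The function names are illustrative; octet counting follows the "length, space, payload" shape used by RFC 6587.

```rust
// Illustration only: two framing methods applied to pre-serialized events.
fn newline_delimited(messages: &[&str]) -> String {
    messages.join("\n") + "\n"
}

// Octet counting prefixes each message with its byte length and a space,
// as in RFC 6587 syslog transport.
fn octet_counting(messages: &[&str]) -> String {
    messages
        .iter()
        .map(|m| format!("{} {}", m.len(), m))
        .collect::<Vec<_>>()
        .concat()
}

fn main() {
    let events = [r#"{"message":"foo"}"#, r#"{"message":"bar"}"#];
    assert_eq!(
        newline_delimited(&events),
        "{\"message\":\"foo\"}\n{\"message\":\"bar\"}\n"
    );
    // Each frame is `<len> <payload>`, e.g. `17 {"message":"foo"}`.
    assert_eq!(
        octet_counting(&events),
        "17 {\"message\":\"foo\"}17 {\"message\":\"bar\"}"
    );
}
```

Note that encoding (producing the JSON strings) happened before either framer ran; the framers only decide how multiple serialized events are delimited within a payload.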

Currently, most sinks include the common EncodingConfig<T> in their config. It takes a generic argument, an enum that specifies which encodings the sink supports. However, the actual encoding logic is reimplemented by each sink individually, rather than drawing on a shared inventory of codec implementations. This leads to unnecessary drift in feature set and behavior across sinks.

Furthermore, the shared EncodingConfig as it is implemented today is concerned with three tasks: Reshaping an event (including/excluding fields), serializing an event to a byte message, and framing/batching events. In accordance with the functionality that the decoding side provides, we want to keep these concepts separate for a simpler mental model. In fact, we want to separate reshaping from codecs entirely and move it to the responsibilities of the schema work as outlined in the next section.

Cross-cutting concerns

The codec work is closely interwoven with the ongoing schema work. The latter introduces the concept of "mappers", which understand how to transform an event at runtime based on the schemas of its source and destination within Vector's topology.

We want to hand over the reshaping responsibilities to the schema, and provide a transition layer to support existing reshaping capabilities of sinks until schema mappers are implemented.

Another related change is StandardEncodings which was introduced in #9215. It provides a consistent implementation of common codecs, and therefore aligns with our goals. However, it does not separate the reshaping responsibilities.

Scope

In scope

  • Common support structures for encoding/framing
  • Set of commonly used encoding (text, JSON, ...) and framing (newline-delimited, octet-counting, ...) methods
  • Strategy for integrating encoding/framing with sinks

Out of scope

  • Integration with the schema system (automatic transformation of event fields depending on source and destination schemas)

Pain

Internally, we don't share implementations for very commonly used encoding strategies (e.g. JSON, newline-delimited). This leads to duplicated effort and a lack of guidance for handling encoding in sinks, since no canonical implementation exists.

Externally, users are restricted to the custom set of encodings implemented by each sink. There is no consistent mental model for encoding/framing options for event data that applies to all sinks.

Proposal

User Experience

Users should be able to set encoding and framing options on sinks, analogously to the decoding and framing options on sources. These options uniformly control how the event payload is encoded. This distinction is important, as encoding for the sink-specific protocol and encoding of the event payload are separate concerns. The payload should still be encoded according to the sink's protocol, and the sink should provide additional options if there are multiple protocols to choose from, e.g. under a protocol key. Whether any encoding options for the payload are needed at all can be decided on a per-sink basis.
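As a sketch of what this could look like for users, the following config fragment mirrors the source-side decoding/framing options. The exact key names (encoding.codec, framing.method) are assumptions for illustration, not final syntax:

```toml
# Hypothetical sink configuration; key names mirror the source-side
# decoding/framing options and are assumptions, not final syntax.
[sinks.out]
type = "socket"
inputs = ["in"]
mode = "tcp"
address = "127.0.0.1:9000"

  [sinks.out.encoding]
  codec = "json"

  [sinks.out.framing]
  method = "newline_delimited"
```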

The event-transformation fields in the current encoding options (only_fields, except_fields, timestamp_format) should continue to work with the new encoding options without breaking backwards compatibility. In the future, they may be moved to a dedicated key, e.g. transform, or replaced by a mechanism provided by the upcoming schema support.

Implementation

The following common config structures will be provided, analogously to the decoding/framing configs:

```rust
pub trait SerializerConfig: Debug + DynClone + Send + Sync {
    /// Builds a serializer from this configuration.
    ///
    /// Fails if the configuration is invalid.
    fn build(&self) -> crate::Result<BoxedSerializer>;
}
```
```rust
pub trait FramingConfig: Debug + DynClone + Send + Sync {
    /// Builds a framer from this configuration.
    ///
    /// Fails if the configuration is invalid.
    fn build(&self) -> crate::Result<BoxedFramer>;
}
```

These can be built and combined to form an Encoder:

```rust
/// An encoder that can encode structured events to byte messages.
pub struct Encoder {
    serializer: BoxedSerializer,
    framer: BoxedFramer,
}
```

Encoder implements tokio_util::codec::Encoder<Event>. Internally, events first go through the Serializer, which implements tokio_util::codec::Encoder<Event>, and are then handed over to the Framer, which implements tokio_util::codec::Encoder<()>, so that serialized events can be framed in place without additional allocations.
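The composition described above can be sketched with minimal, dependency-free stand-ins. The trait below mirrors the shape of tokio_util::codec::Encoder but is not Vector's actual type, and the JSON writing is hand-rolled to stay self-contained; real code would use serde. The key point shown is that the framer is an encoder of (): it only appends to bytes the serializer already wrote into the shared buffer.

```rust
use std::collections::BTreeMap;

// Stand-in event type for illustration.
type Event = BTreeMap<String, String>;

// Minimal analogue of tokio_util::codec::Encoder<Item>.
trait Encode<T> {
    fn encode(&mut self, item: T, buf: &mut Vec<u8>);
}

struct JsonSerializer;

impl Encode<Event> for JsonSerializer {
    fn encode(&mut self, event: Event, buf: &mut Vec<u8>) {
        // Hand-rolled JSON to avoid dependencies; real code would use serde.
        buf.push(b'{');
        for (i, (k, v)) in event.iter().enumerate() {
            if i > 0 {
                buf.push(b',');
            }
            buf.extend_from_slice(format!("\"{}\":\"{}\"", k, v).as_bytes());
        }
        buf.push(b'}');
    }
}

struct NewlineFramer;

// The framer encodes `()`: it only appends the delimiter to the
// already-serialized bytes, framing them in place.
impl Encode<()> for NewlineFramer {
    fn encode(&mut self, _: (), buf: &mut Vec<u8>) {
        buf.push(b'\n');
    }
}

struct Encoder {
    serializer: JsonSerializer,
    framer: NewlineFramer,
}

impl Encode<Event> for Encoder {
    fn encode(&mut self, event: Event, buf: &mut Vec<u8>) {
        self.serializer.encode(event, buf);
        self.framer.encode((), buf);
    }
}

fn main() {
    let mut event = Event::new();
    event.insert("message".into(), "foo".into());
    let mut buf = Vec::new();
    Encoder { serializer: JsonSerializer, framer: NewlineFramer }
        .encode(event, &mut buf);
    assert_eq!(buf, b"{\"message\":\"foo\"}\n".to_vec());
}
```

Because both stages write into the same buffer, no intermediate allocation per event is needed, which is the design motivation stated above.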

Sinks which don't need the framing capabilities of the Encoder, e.g. Kafka, where the protocol is already message-based, may add only a SerializerConfig to their config. The Serializer that can be built from that config implements tokio_util::codec::Encoder<Event> and therefore conforms to the same trait as Encoder.

Plan Of Attack

Incremental steps to execute this change. These will be converted to issues after the RFC is approved:

  • Implementation of support structures (Encoder, SerializerConfig, FramingConfig)
  • Implementation of selected encoders/framers (e.g. JSON and newline_delimited)
  • Implementation of compatibility layer for legacy reshaping via EncodingConfiguration::apply_rules
  • Example integration with a first sink, e.g. socket or http, which would benefit most from generic encoding/framing options
  • Subsequent PRs for each integration to a sink
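The compatibility-layer step above must preserve the legacy reshaping behavior. The sketch below is an illustrative stand-in for that behavior, not the actual EncodingConfiguration::apply_rules API (which additionally handles timestamp_format); the Rules type and field names are assumptions:

```rust
use std::collections::BTreeMap;

// Stand-in event type for illustration.
type Event = BTreeMap<String, String>;

// Illustrative stand-in for the legacy reshaping rules; the real
// `EncodingConfiguration::apply_rules` also handles `timestamp_format`.
struct Rules {
    only_fields: Option<Vec<String>>,
    except_fields: Option<Vec<String>>,
}

impl Rules {
    fn apply(&self, event: &mut Event) {
        // Keep only the allow-listed fields, if an allow-list is set.
        if let Some(only) = &self.only_fields {
            event.retain(|k, _| only.contains(k));
        }
        // Then drop any deny-listed fields.
        if let Some(except) = &self.except_fields {
            event.retain(|k, _| !except.contains(k));
        }
    }
}

fn main() {
    let mut event: Event = [
        ("message".to_string(), "hello".to_string()),
        ("host".to_string(), "a".to_string()),
        ("secret".to_string(), "x".to_string()),
    ]
    .into_iter()
    .collect();

    Rules {
        only_fields: None,
        except_fields: Some(vec!["secret".into()]),
    }
    .apply(&mut event);

    assert!(!event.contains_key("secret"));
    assert_eq!(event.len(), 2);
}
```

Keeping this reshaping in a transition layer, separate from the serializers and framers, is what allows it to be handed over to schema mappers later without touching the codec implementations.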

Surveyed Sinks

Overview of the current state of sinks regarding encoding:

| sink | encoding config | apply_rules | notes |
|------|-----------------|-------------|-------|
| aws_cloudwatch_logs | EncodingConfig<Encoding { Text, Json }> | ✔︎ | Enveloped in rusoto_logs::InputLogEvent. Text reads message_key() |
| aws_kinesis_firehose | EncodingConfig<Encoding { Text, Json }> | ✔︎ | Enveloped in rusoto_firehose::Record that serializes to base64. Text reads message_key() |
| aws_kinesis_streams | EncodingConfig<Encoding { Text, Json }> | ✔︎ | Enveloped in rusoto_kinesis::PutRecordsRequestEntry. Text reads message_key() |
| aws_s3 | EncodingConfig<StandardEncodings { Text, Json, Ndjson }> | ✔︎ | Uses util::{RequestBuilder, Encoder, Compressor}. Text reads message_key() |
| aws_sqs | EncodingConfig<Encoding { Text, Json }> | ✔︎ | Enveloped in EncodedEvent<SendMessageEntry>. Text reads message_key() |
| azure_blob | EncodingConfig<Encoding { Ndjson, Text }> | ✔︎ | Enveloped in EncodedEvent<PartitionInnerBuffer>. Text reads message_key() |
| azure_monitor_logs | EncodingConfigWithDefault<Encoding { Default }> | ✔︎ | Serializes to JSON. Enveloped in HTTP request |
| blackhole | - | - | - |
| clickhouse | EncodingConfigWithDefault<Encoding { Default }> | ✔︎ | Serializes to JSON. Enveloped in HTTP request |
| console | EncodingConfig<Encoding { Text, Json }> | ✔︎ | Text reads message_key() |
| datadog_logs | EncodingConfigFixed<DatadogLogsJsonEncoding> | ✔︎ | Doesn't provide options to encode the event payload separately from the protocol |
| datadog_events | - | - | |
| datadog_archives | - | | Uses custom DatadogArchivesEncoding, which has a field inner: StandardEncodings which is not user-configurable |
| elasticsearch | EncodingConfigFixed<ElasticsearchEncoder> | ✔︎ | Reshapes event internally and implements custom ElasticsearchEncoder to serialize for the Elasticsearch protocol |
| file | EncodingConfig<Encoding { Text, Ndjson }> | ✔︎ | Text reads message_key() |
| gcp | EncodingConfig<StandardEncodings> | ✔︎ | Enveloped in HTTP request via sinks::util::request_builder::RequestBuilder. Sets HTTP request header depending on encoding config |
| honeycomb | - | | Embeds event as JSON under a data key. Enveloped in HTTP request |
| http | EncodingConfig<Encoding { Text, Ndjson, Json }> | ✔︎ | Enveloped in HTTP request. Request-level compression. Sets HTTP request header depending on encoding config |
| humio | EncodingConfig<Encoding { Json, Text }> | ✔︎ | Wrapper, see splunk_hec for more information |
| influxdb | EncodingConfigWithDefault<Encoding { Default }> | ✔︎ | Encoding (for the protocol envelope) is done by routing event fields into either a "tags" or a "fields" map that is passed to an internal influx_line_protocol function |
| kafka | EncodingConfig<StandardEncodings { Text, Json, Ndjson }> | ✔︎ | Uses Encoder<Event> for StandardEncodings in encode_input, enveloped in KafkaRequest |
| logdna | EncodingConfigWithDefault<Encoding { Default }> | ✔︎ | Builds a message by manually picking fields from the event. Enveloped in HTTP request |
| loki | EncodingConfig<Encoding { Json, Text, Logfmt }> | ✔︎ | Uses reshaping. Text reads message_key(), Logfmt builds a key-value string. Sink has config to preprocess event by adding/removing label fields and timestamp. Enveloped in HTTP request |
| nats | EncodingConfig<Encoding { Text, Json }> | ✔︎ | Text reads message_key() |
| new_relic_logs | EncodingConfigWithDefault<Encoding { Json }> | ✔︎ | Defers to HTTP sink, uses encoding config to reshape only and convert to JSON |
| papertrail | EncodingConfig<Encoding { Text, Json }> | ✔︎ | Text reads message_key(). Serializes event using syslog and sends buffer via TCP |
| pulsar | EncodingConfig<Encoding { Text, Json, Avro }> | ✔︎ | Text reads message_key(), Avro expects another dedicated key for the serialization schema. Serialized buffer is sent to Pulsar producer |
| redis | EncodingConfig<Encoding { Text, Json }> | ✔︎ | Text reads message_key(). Encoded message is serialized to buffer |
| sematext | EncodingConfigFixed<ElasticsearchEncoder> | ✔︎ | Wrapper, see elasticsearch for more information |
| socket | EncodingConfig<Encoding { Text, Json }> | ✔︎ | Text reads message_key() |
| splunk_hec | EncodingConfig<HecLogsEncoder { Json, Text }> | ✔︎ | Encoding is used to create a message according to the Splunk HEC protocol. There is no separate control over encoding the payload itself |
| vector | - | - | - |