rfcs/2021-07-19-8216-multiple-pipelines.md
Large Vector users often require complex Vector topologies to facilitate the collection and processing of data from many different upstream sources. Currently, this results in large Vector configuration files that are hard to manage, especially across different teams. This RFC lays out the concept of pipelines, a structured way to organize configuration that makes Vector a better candidate for use cases involving widespread collaboration on configuration.
This change will introduce the concept of pipelines to users. A pipeline is a set of transforms grouped under a unique id.

Pipelines will be loaded from a `pipelines` sub-directory relative to the Vector configuration directory (e.g., `/etc/vector/pipelines`). Therefore, if a user changes the location of the Vector configuration directory, they will also change the pipelines directory path. They are coupled.
The pipelines directory will contain all pipelines represented as individual files. For simplicity, and to ensure users do not overcomplicate pipeline management, sub-directories/nesting are not allowed. This is inspired by Terraform's single-level directory nesting, which has been a net positive for simple management of large Terraform projects.
Each pipeline file is a processing subset of a larger Vector configuration file. Therefore, it follows the same syntax as Vector's configuration (TOML, YAML, and JSON). Each pipeline will have an id derived from the name of the file without the extension. For example, the pipeline defined in `load-balancer.yml` will have `load-balancer` as its id.
Pipelines have access to any components defined in the root configuration directory. For example, if the transform `foo` is defined in `/etc/vector/bar.toml`, it will be accessible by the pipeline `/etc/vector/pipelines/pipeline.toml`; but if a transform `bar` is defined in `/etc/vector/pipelines/another-pipeline.toml`, it will not be accessible by other pipelines.
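To illustrate the scoping rule above, here is a minimal sketch of a root-config transform referenced from a pipeline. The component names (`some-source`, `uses-root`) are hypothetical and only for illustration:

```toml
# /etc/vector/bar.toml (root configuration)
[transforms.foo]
type = "remap"
inputs = ["some-source"]
# ...
```

```toml
# /etc/vector/pipelines/pipeline.toml
[transforms.uses-root]
type = "remap"
inputs = ["foo"] # allowed: `foo` is defined in the root configuration
# ...
```

The reverse is not allowed: a transform defined inside `pipeline.toml` cannot be referenced from another pipeline file.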
If no pipeline is defined, Vector behaves as if the feature didn't exist. This way, a configuration from a version without the pipeline feature will keep working. If a pipeline file is left empty, Vector behaves as if it doesn't exist.
If any of the following constraints are violated, Vector will error on boot:
- Two pipeline files must not share the same id (e.g., `load-balancer.yml` and `load-balancer.json`).

If the violation occurs during a reload, an error will be triggered and handled in the same fashion as other reload errors.
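The id-conflict check could be sketched as follows. This is a minimal illustration, not Vector's actual implementation; the function names are hypothetical:

```rust
use std::collections::HashMap;
use std::path::Path;

// Derive a pipeline id from a file name by stripping the extension,
// as described above (load-balancer.yml -> load-balancer).
fn pipeline_id(file_name: &str) -> Option<String> {
    Path::new(file_name)
        .file_stem()
        .map(|stem| stem.to_string_lossy().into_owned())
}

// Detect id conflicts across the files in the pipelines directory:
// two files that differ only by extension share the same id.
fn find_conflicts(files: &[&str]) -> Vec<String> {
    let mut seen: HashMap<String, usize> = HashMap::new();
    for file in files {
        if let Some(id) = pipeline_id(file) {
            *seen.entry(id).or_insert(0) += 1;
        }
    }
    seen.into_iter()
        .filter(|(_, count)| *count > 1)
        .map(|(id, _)| id)
        .collect()
}

fn main() {
    assert_eq!(pipeline_id("load-balancer.yml").as_deref(), Some("load-balancer"));
    // load-balancer.yml and load-balancer.json collide on the id "load-balancer".
    let conflicts = find_conflicts(&["load-balancer.yml", "load-balancer.json", "other.toml"]);
    assert_eq!(conflicts, vec!["load-balancer".to_string()]);
}
```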
As mentioned in the previous section, a pipeline is just a set of transforms.
To forward the events going through a pipeline to a sink, we'll add a new `outputs` option on the pipeline's transforms that specifies where the transform's events are redirected.

The `outputs` option is used solely to build the topology and represents an interface between the transform and the external sinks.
A pipeline will have the following internal representation before building the topology:

```rust
struct PipelineTransform {
    inner: TransformOuter,
    outputs: Vec<String>,
}

struct Pipeline {
    id: String,
    transforms: Map<String, PipelineTransform>,
}
```
This corresponds to the following configuration file:

```toml
# /etc/vector/pipelines/pipeline.toml
[transforms.foo]
type = "remap"
inputs = ["from-root"]
outputs = ["dc1", "dc2"]
# ...

[transforms.bar]
type = "remap"
inputs = ["foo"]
outputs = ["dc-us", "dc-eu"]
# ...
```
The `outputs` option forwards events from inside the pipeline to an external component.
Looking deeper at the configuration building process, the configuration compiler will require the pipelines in order to build the configuration.

To do so, we'll implement the `Pipeline` structure from the previous section and update the `compile` function to build a `Config` containing the required pipeline components. The compiler will load the pipelines' transforms and add their `outputs` to the corresponding sinks.

The components coming from a pipeline will be cloned into the final `Config`, in the `IndexMap` containing the transforms, and the `outputs` of the pipeline components will be added to the `inputs` field of the components they refer to.
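The merge step described above can be sketched as follows. The types here are simplified stand-ins (a plain `BTreeMap` instead of Vector's actual `Config` and `IndexMap` types), and `merge_pipeline` is a hypothetical name:

```rust
use std::collections::BTreeMap;

// Simplified stand-ins for Vector's configuration types, for illustration only.
#[derive(Clone)]
struct Transform {
    inputs: Vec<String>,
    outputs: Vec<String>,
}

#[derive(Default)]
struct Config {
    transforms: BTreeMap<String, Transform>,
    sink_inputs: BTreeMap<String, Vec<String>>,
}

// Merge a pipeline into the final config: clone its transforms under a
// namespaced id ("<pipeline>#<name>") and append each transform's
// `outputs` to the `inputs` of the sinks it refers to.
fn merge_pipeline(config: &mut Config, pipeline_id: &str, transforms: &BTreeMap<String, Transform>) {
    for (name, transform) in transforms {
        let namespaced = format!("{}#{}", pipeline_id, name);
        for sink in &transform.outputs {
            config
                .sink_inputs
                .entry(sink.clone())
                .or_default()
                .push(namespaced.clone());
        }
        config.transforms.insert(namespaced, transform.clone());
    }
}

fn main() {
    let mut config = Config::default();
    let mut pipeline = BTreeMap::new();
    pipeline.insert(
        "bar".to_string(),
        Transform { inputs: vec!["in".into()], outputs: vec!["out".into()] },
    );

    merge_pipeline(&mut config, "foo", &pipeline);
    // The pipeline transform is cloned under its namespaced id, and the
    // sink's inputs now reference it.
    assert!(config.transforms.contains_key("foo#bar"));
    assert_eq!(config.sink_inputs["out"], vec!["foo#bar".to_string()]);
}
```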
For example, here are a configuration and a pipeline, and their equivalent once built:
```toml
# /etc/vector/vector.toml
[sources.in]
# ...

[sinks.out]
# ...
```

```toml
# /etc/vector/pipelines/foo.toml
[transforms.bar]
inputs = ["in"]
outputs = ["out"]
# ...
```

```toml
# equivalent once compiled
[sources.in]
# ...

[transforms.foo#bar]
inputs = ["in"]

[sinks.out]
# the # notation is just a representation of the pipeline namespace
inputs = ["foo#bar"]
# ...
```
In order to avoid internal conflicts between pipeline component ids, the internal representation of component ids will be changed to the following struct:

```rust
struct ComponentId {
    name: String,
    scope: ComponentScope,
}

enum ComponentScope {
    Global,
    Pipeline(String),
}
```
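A minimal sketch of how this scoping avoids conflicts, assuming a `Display` implementation that renders the `pipeline#name` notation used in the examples above (the derives and `Display` impl are assumptions, not Vector's actual code):

```rust
use std::fmt;

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum ComponentScope {
    Global,
    Pipeline(String),
}

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct ComponentId {
    name: String,
    scope: ComponentScope,
}

// Render the id using the "pipeline#name" notation from the compiled example.
impl fmt::Display for ComponentId {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match &self.scope {
            ComponentScope::Global => write!(f, "{}", self.name),
            ComponentScope::Pipeline(id) => write!(f, "{}#{}", id, self.name),
        }
    }
}

fn main() {
    let in_bar = ComponentId { name: "foo".into(), scope: ComponentScope::Pipeline("bar".into()) };
    let in_baz = ComponentId { name: "foo".into(), scope: ComponentScope::Pipeline("baz".into()) };
    // The same transform name in two different pipelines no longer conflicts.
    assert_ne!(in_bar, in_baz);
    assert_eq!(in_bar.to_string(), "bar#foo");
}
```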
That way, if a transform `foo` is defined in the pipeline `bar` and in the pipeline `baz`, they will not conflict.
Users should be able to observe and monitor individual pipelines.
This means relevant metrics coming from the `internal_metrics` source must contain a `pipeline_id` tag referring to the pipeline's id.
This approach extends RFC 2064 by adding `pipeline_id` to the context.
In Vector, once the topology is built from the configuration, every component is encapsulated in a Task that intercepts an incoming event and processes it accordingly. This task also keeps track of its internal metrics and finally emits internal_metrics events.
To add the pipeline information to the task, we need to change the `name` parameter to `id: ComponentId` in the `Task::new` method.
```rust
pub struct Task {
    #[pin]
    inner: BoxFuture<'static, Result<TaskOutput, ()>>,
    id: ComponentId,
    typetag: String,
}

impl Task {
    pub fn new<S, Fut>(id: ComponentId, typetag: S, inner: Fut) -> Self
    where
        S: Into<String>,
        Fut: Future<Output = Result<TaskOutput, ()>> + Send + 'static,
    {
        Self {
            inner: inner.boxed(),
            id,
            typetag: typetag.into(),
        }
    }
}
```
That way, when Vector spawns a new transform task, it will be able to add the optional pipeline information to the span:
```rust
let span = error_span!(
    "transform",
    component_kind = "transform",
    component_name = %task.name(),
    component_type = %task.typetag(),
    pipeline_id = %task.pipeline_id(),
);
```
This way, each internal event the task emits will be populated with the optional `pipeline_id`.
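The accessors used in the span above could derive the optional pipeline id from the task's `ComponentId` scope. A minimal sketch, assuming simplified types; the `pipeline_id` accessor returning an `Option` is an assumption about how the API might look:

```rust
#[derive(Clone)]
enum ComponentScope {
    Global,
    Pipeline(String),
}

#[derive(Clone)]
struct ComponentId {
    name: String,
    scope: ComponentScope,
}

struct Task {
    id: ComponentId,
    typetag: String,
}

impl Task {
    // `name` keeps its existing meaning; `pipeline_id` is only present
    // for components defined inside a pipeline.
    fn name(&self) -> &str {
        &self.id.name
    }

    fn pipeline_id(&self) -> Option<&str> {
        match &self.id.scope {
            ComponentScope::Pipeline(id) => Some(id.as_str()),
            ComponentScope::Global => None,
        }
    }
}

fn main() {
    let task = Task {
        id: ComponentId { name: "bar".into(), scope: ComponentScope::Pipeline("foo".into()) },
        typetag: "remap".into(),
    };
    assert_eq!(task.name(), "bar");
    assert_eq!(task.pipeline_id(), Some("foo"));
}
```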
Why is this change worth it?

- … sources, sinks and transforms, and expose them to be used by the devs.
- … `internal_metrics` source.

What is the impact of not doing this?
How does this position us for success in the future?
This would imply some duplication if a transform is used in multiple configuration files.
Anybody who has write access to Vector's configuration folder could add a sink or source.
Adding a different folder would allow separating concerns between a root config and a pipeline.
- … `internal_metrics`.

Writing a different tool would increase the difficulty of using this feature.
Doesn't add access control regarding who can edit the root config.
This doesn't allow adding internal metrics to specific transforms and monitoring them, other than by adding a dummy filter that we could monitor.
Doesn't add access control regarding who can edit the root config.
Adds a lot of complexity and would add some constraints regarding resources that can only be used once. Doesn't prevent creating other sources/sinks.