An Opinionated Guide to Managing Observability Pipelines
A step-by-step guide to taking control of your observability data
Observability data, when left on its own, behaves like entropy - it increases without bound until you run out of usable energy (or, in this case, money).
This has become especially apparent over the last decade with the rise of Kubernetes and microservices, which emit orders of magnitude more telemetry than the humble monoliths that came before them. With the increase in data volume comes the need for new tools to manage it. Enter the observability pipeline.
Observability pipelines help organizations collect, transform, and send telemetry from any source to any destination. They are becoming increasingly popular in the enterprise because they allow organizations to control their data and limit volume before sending it upstream to vendors.
Most pipeline solutions rely on YAML for configuration. Some have a GUI that helps with generating the YAML. But regardless of how you do it, managing complex pipeline topologies with YAML quickly becomes error-prone and hard to scale.
This post walks through a step-by-step example of building a scalable pipeline configuration. We'll start with a basic example of sending and receiving data and build on it layer by layer until we have a pipeline that can handle any number of components and branching topologies.
Vector
For the purposes of this post, we will be writing pipeline configuration for Vector, a high-performance pipeline written in Rust. We'll also limit the examples to working with log data. The general principles in this post apply to other pipeline implementations and signals as well.
1 - Send and Receive
This is a basic pipeline configured to send and receive data. In this case, it's collecting logs from the Datadog Agent and sending them to the Datadog logs destination.
sources:
  source/a/dd_agent:
    type: datadog_agent
    address: 0.0.0.0:8020
    multiple_outputs: true
sinks:
  sink/l/dd:
    site: ${DD_SITE}
    type: datadog_logs
    inputs:
      - source/a/dd_agent.logs
    compression: gzip
    default_api_key: ${DD_API_KEY}
A few things to note about this configuration:
Vector lets you dynamically inject environment variables into the configuration. We use this to add secrets and configurable values such as the Datadog site and API key (see the example after this list).
We're using a deliberate naming scheme for pipeline components. The convention is $stage/$signal/$detailed_name.
$stage describes the pipeline stage: a source receives data, a sink sends data.
$signal denotes the type of data: a applies to all data, l is logs, m is metrics, t is traces.
$detailed_name gives further detail about the $stage or the function of the particular component.
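For instance, a minimal sketch of supplying those values at startup (the file name pipeline.yaml is an assumption made for this example):
DD_SITE=datadoghq.com DD_API_KEY=<your-api-key> vector --config pipeline.yaml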
2 - Basic Pipeline
The previous example was somewhat contrived since you could just send logs via an agent without using a pipeline. This example adds a transformation step to shape the data before sending it upstream.
sources:
  source/a/dd_agent:
    type: datadog_agent
    address: 0.0.0.0:8020
    multiple_outputs: true
transforms:
  in/l/remap:
    type: remap
    inputs:
      - source/a/dd_agent.logs
    source: |
      .message = parse_json(.message) ?? .message
sinks:
  sink/l/dd:
    site: ${DD_SITE}
    type: datadog_logs
    inputs:
      - in/l/remap
    compression: gzip
    default_api_key: ${DD_API_KEY}
This particular transformation will try to parse the message field as JSON. If it fails, it will leave the message field as is. Note that this transform applies to all incoming logs and therefore has the in/ $stage prefix.
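One way to sanity check a transform like this is Vector's built-in unit testing support. Below is a minimal sketch (the test name and sample message are made up) asserting that a JSON-encoded message gets expanded into an object; it can be appended to the config and run with vector test:
tests:
  - name: parse json message
    inputs:
      - insert_at: in/l/remap
        type: log
        log_fields:
          message: '{"level": "info", "msg": "checkout started"}'
    outputs:
      - extract_from: in/l/remap
        conditions:
          - type: vrl
            source: |
              assert!(is_object(.message))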
3 - Routing and Aggregation
In real-world deployments, you'll tend to have more than one transformation, and you won't want to apply every transformation to all data all the time. This example adds a second transformation that is conditionally applied via a router component.
sources:
  source/a/dd_agent:
    type: datadog_agent
    address: 0.0.0.0:8020
    multiple_outputs: true
transforms:
  in/l/remap:
    type: remap
    inputs:
      - source/a/dd_agent.logs
    source: |
      .message = parse_json(.message) ?? .message
  in/l/route:
    type: route
    route:
      itemrefresh: (.service == "itemrefresh")
    inputs:
      - in/l/remap
  route/l/itemrefresh:
    type: reduce
    inputs:
      - in/l/route.itemrefresh
    group_by:
      - host
    max_events: 300
sinks:
  sink/l/dd:
    site: ${DD_SITE}
    type: datadog_logs
    inputs:
      - in/l/route._unmatched
      - route/l/itemrefresh
    compression: gzip
    default_api_key: ${DD_API_KEY}
In this configuration, the router is forwarding logs where service == "itemrefresh" to the reduce component for aggregation. Components scoped to a route carry the route/ $stage prefix. Non-matching logs are sent to the _unmatched path. All log paths are then fed into the Datadog sink for sending to Datadog.
Vector provides different merge_strategies for aggregating individual event fields. In this case, we rely on the default behavior, which keeps the first value of each string field as well as the first and last timestamps in the aggregated output.
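If the defaults aren't what you want, strategies can be spelled out per field. A hypothetical variation of the reduce component above (still nested under transforms; the duration_ms field is made up for illustration):
route/l/itemrefresh:
  type: reduce
  inputs:
    - in/l/route.itemrefresh
  group_by:
    - host
  max_events: 300
  merge_strategies:
    message: concat_newline # join the individual messages with newlines
    duration_ms: sum # sum a numeric field across the reduced events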
4 - Multiple Routes
This expands on the previous example and adds a second route that forwards logs to a different reduce component.
sources:
  source/a/dd_agent:
    type: datadog_agent
    address: 0.0.0.0:8020
    multiple_outputs: true
transforms:
  in/l/remap:
    type: remap
    inputs:
      - source/a/dd_agent.logs
    source: |
      .message = parse_json(.message) ?? .message
  in/l/route:
    type: route
    route:
      itemrefresh: (.service == "itemrefresh")
      checkout: (.service == "checkout")
    inputs:
      - in/l/remap
  route/l/itemrefresh:
    type: reduce
    inputs:
      - in/l/route.itemrefresh
    group_by:
      - host
    max_events: 300
  route/l/checkout:
    type: reduce
    inputs:
      - in/l/route.checkout
    group_by:
      - customerId
      - transactionId
    max_events: 300
    merge_strategies:
      customerId: discard
      transactionId: discard
sinks:
  sink/l/dd:
    site: ${DD_SITE}
    type: datadog_logs
    inputs:
      - in/l/route._unmatched
      - route/l/*
    compression: gzip
    default_api_key: ${DD_API_KEY}
Vector accepts wildcards when specifying component inputs, which makes it possible for us to combine data from all routes into a single path, route/l/*. Because each component has the signal type as part of its identifier, it's easy to select components on a per-signal basis.
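To illustrate the per-signal selection, a hypothetical metrics branch could be drained by its own sink without touching the logs sink, simply by matching on the signal segment. The route/m/* components referenced here don't exist in the configs above and are only for illustration:
sinks:
  sink/m/dd:
    type: datadog_metrics
    site: ${DD_SITE}
    inputs:
      - route/m/* # hypothetical metric routes, selected by signal
    default_api_key: ${DD_API_KEY}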
5 - Complex Routes
In some cases, you might need to apply multiple transformations to some data. In this example, we pull some fields nested under message up to the top level. This is required because we use these fields in the group_by clause of the reduce component, and Vector does not support grouping by nested fields.
sources:
  source/a/dd_agent:
    type: datadog_agent
    address: 0.0.0.0:8020
    multiple_outputs: true
transforms:
  in/l/remap:
    type: remap
    inputs:
      - source/a/dd_agent.logs
    source: |
      .message = parse_json(.message) ?? .message
  in/l/route:
    type: route
    route:
      itemrefresh: (.service == "itemrefresh")
      checkout: (.service == "checkout")
    inputs:
      - in/l/remap
  route/l/itemrefresh_exec:
    type: reduce
    inputs:
      - in/l/route.itemrefresh
    group_by:
      - host
    max_events: 300
  route/l/checkout_pre:
    type: remap
    inputs:
      - in/l/route.checkout
    source: |-
      .transactionId = .message.transactionId
      .customerId = .message.customerId
      del(.message.transactionId)
      del(.message.customerId)
  route/l/checkout_exec:
    type: reduce
    inputs:
      - route/l/checkout_pre
    group_by:
      - customerId
      - transactionId
    max_events: 300
    merge_strategies:
      customerId: discard
      transactionId: discard
sinks:
  sink/l/dd:
    site: ${DD_SITE}
    type: datadog_logs
    inputs:
      - in/l/route._unmatched
      - route/l/*exec
    compression: gzip
    default_api_key: ${DD_API_KEY}
Note that we change the naming convention for components after the router. Pre-processing transformations get the _pre suffix and the main transformation gets the _exec suffix. This lets us continue to aggregate all the routes using a single path, route/l/*exec, in the sink.
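To see how this convention scales, here is a hypothetical third route that would slot in under transforms without any change to the sink. The payments service, its fields, and the matching payments entry that would be added to the in/l/route table are all made up for illustration:
route/l/payments_pre:
  type: remap
  inputs:
    - in/l/route.payments
  source: |-
    .orderId = .message.orderId
    del(.message.orderId)
route/l/payments_exec:
  type: reduce
  inputs:
    - route/l/payments_pre
  group_by:
    - orderId
  max_events: 300
Because the new exec component matches route/l/*exec, it is picked up by the existing sink input automatically.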
Review
At this point, we've built a pipeline configuration that is both flexible and easy to reason about.
Global transformations can be applied by adding them at the beginning of the pipeline
Scoped transformations can be applied by adding them after a router component
Additional transformations can be added using suffixes before and after other transformations
All inputs and outputs are well defined, with well-structured names that make them easy to wire up and diagnose
Paths from any stage can be aggregated using $stage/* and further filtered by signal with $stage/[l|m|t]/*
The well-defined naming scheme also makes diagnostics easier. Vector emits the component identifier as a label on its internal metrics, which means it is easy to graph related metrics in the same dashboard. The following are examples of some helpful queries written in PromQL syntax:
get metric from all sources:
$metric_name{component_id=~"source/.+"}
get metric from all components dealing with logs:
$metric_name{component_id=~".+/l/.+"}
get metric from all routing pre-processing components:
$metric_name{component_id=~".+/.+/.+_pre"}
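As a concrete example, assuming Vector's internal_metrics are being scraped and keep the default vector namespace (the metric name below is an assumption based on that setup), per-component log throughput could be graphed with:
rate(vector_component_sent_events_total{component_id=~".+/l/.+"}[5m])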
Lastly, Vector makes it easy to capture the inputs and outputs of different components and inspect data flows via vector tap $component_id. This command also accepts wildcards, which means that you can easily tap components by $stage and $signal.
An example of tapping all transformations that are scoped to a route:
vector tap --outputs-of "route/l*"
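Similarly, everything arriving at the log sinks can be inspected with (assuming the same naming scheme):
vector tap --inputs-of "sink/l/*"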
Additional Topics
For the sake of brevity, we left out everything from this post that doesn't directly relate to managing configuration.
Other important topics:
buffers, batches, and timeout configuration for optimizing data durability
the power and perils of custom transformations via scripting languages like VRL and OTTL
highly available pipeline deployment topologies
day-two pipeline operations - zero-downtime updates and migrations
Look out for future posts that will dive further into these topics.
Final Thoughts
Pipelines are a recent but powerful addition to the modern observability stack. They help organizations limit vendor lock-in and take full control over their observability data.
If you want to have the benefits of a pipeline without the toil of operations, consider Nimbus, a fully managed observability pipeline that analyzes your traffic and automatically optimizes your data. On average, Nimbus helps customers reduce log volume by 60% without dropping data.
At the end of the day, observability data should help you fight fires, not be the cause of internal fire fighting - if you find yourself on the wrong end of that equation, consider updating the plumbing and adopting a pipeline.