In a distributed system, state is akin to original sin - we can't help having it somewhere, and it causes all sorts of problems because it's there. This is especially true for distributed state - that is, context that needs to be shared across multiple nodes in a system. In this post, we dive into how we manage state at Nimbus by leveraging Vector, a high-performance tool for building observability pipelines, to optimize log aggregation while ensuring availability and scalability.
Context
Nimbus helps customers optimize their log volume by aggregating similar logs into a single aggregate log. During this process, Nimbus reduces log volume by 80% and log size by 30% on average. When aggregating logs, a subset of aggregations are stateful - that is to say, it matters which node an aggregation is routed to.
Take the simplified example of logs with a common trace id:
{trace_id:1, msg: start checkout, timestamp: 1}
{trace_id:1, msg: apply discount, timestamp: 2}
{trace_id:1, msg: verify payment, timestamp: 3}
{trace_id:1, msg: complete transaction, timestamp: 4}
The same logs after stateful aggregation (aka all logs being processed by the same node):
{trace_id:1, msg: [start checkout, apply discount, verify payment, complete transaction], timestamp: 1, timestamp_end: 4, size: 4}
Note that during the aggregation, we drop the individual timestamps and keep just the start and end timestamps of the group. If aggregation were not stateful, the logs could end up split across multiple aggregations.
An example of the same logs without stateful aggregation:
{trace_id:1, msg: [apply discount, verify payment], timestamp: 2, timestamp_end: 3, size: 2}
{trace_id:1, msg: [start checkout, complete transaction], timestamp: 1, timestamp_end: 4, size: 2}
Not only is the latter result less efficient in terms of size and volume reduction, it also causes operational issues. Because logs are now interleaved between groups, it is hard to see the proper sequence of events, and it becomes difficult to build log-based monitors and dashboards.
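To make the stateful result concrete, here is a minimal JavaScript sketch of the reduce-style merge a single node can perform once every log for a trace has reached it. Field names follow the example above; this is an illustration, not Nimbus's actual implementation.
// Collapse all logs for one trace into a single aggregate log, keeping only
// the first and last timestamps of the group.
function aggregate(logs) {
  const sorted = [...logs].sort((a, b) => a.timestamp - b.timestamp);
  return {
    trace_id: sorted[0].trace_id,
    msg: sorted.map((log) => log.msg),
    timestamp: sorted[0].timestamp,
    timestamp_end: sorted[sorted.length - 1].timestamp,
    size: sorted.length,
  };
}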
One way of getting around this problem is to run a single node that does all aggregations. This can get you quite far, but it comes with scalability, availability, and durability issues due to having a single point of failure. Another solution is to accept sub-optimal log aggregations and let any node manage any aggregation. But if neither of these options is acceptable, then the system itself needs to coordinate traffic so that logs that belong to a common aggregation are processed by the same node. This is the system we'll be describing in this post.
Nimbus Architecture
At a high level, the Nimbus ingestion pipeline consists of the following components:
--------- Vector Fleet ----------
Load Balancer -> Router Fleet -> | Buffer Fleet -> Aggregator Fleet |
Traffic is fronted by an application load balancer and forwarded to our router fleet, which is responsible for directing customer traffic to the correct Vector fleet - each customer has a separate fleet of buffers and aggregators.
Buffers are responsible for ingesting data and writing (buffering) it to disk before forwarding it to the aggregators. Aggregators are responsible for the aggregations themselves. All services are deployed across multiple availability zones and are orchestrated with Kubernetes via EKS.
The buffer fleet is a set of Vector instances that holds both the aggregation rules and the aggregator endpoints for a given customer.
Aggregations
Aggregations are defined in YAML and are built on top of Vector's reduce transform.
An example of an aggregation rule:
id: cd0763c8-c4b5-4a61-aa14-51bcdee62db2
process_when: <list of predicates that if true, qualify a log for aggregation>
group_by: <optional list of keys that aggregation will be grouped by>
merge_strategies: <optional key specific strategies for aggregating keys>
...
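To make the schema concrete, here is a hypothetical rule for the trace_id example from earlier, written as a JavaScript object purely for illustration. Real rules are authored in YAML, and the process_when predicate and merge_strategies value shown here are assumptions, not Nimbus's actual syntax; only the id and group_by key come from the examples in this post.
// Hypothetical rule for the checkout-trace example.
const checkoutRule = {
  id: "cd0763c8-c4b5-4a61-aa14-51bcdee62db2",
  process_when: ["exists(.trace_id)"], // assumed predicate: qualify logs that carry a trace id
  group_by: ["trace_id"],              // one aggregation group per trace
  merge_strategies: { msg: "array" },  // assumed strategy: collect messages into an array
};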
To implement stateful aggregation, we need to ensure that logs belonging to an aggregation group always get aggregated by the same aggregator. To identify logs that belong to the same aggregation group, we create a hash of their id and group_by values, which uniquely identifies an aggregation. We then rely on consistent hashing to route traffic to the correct aggregator. The set of aggregators is initially hardcoded during service bootstrapping but can change due to scaling events. This means we also need a mechanism for service discovery so that buffers can route traffic correctly after scaling.
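As a rough sketch of the idea in JavaScript (the helper name and the choice of SHA-256 are ours for illustration; in production this hashing happens inside VRL, as shown below), deriving a stable numeric key for an aggregation group might look like:
const crypto = require("crypto");

// Hash the rule id together with the log's group_by values into a stable
// integer. Any hash with a stable numeric output works; SHA-256 is used here
// only because it ships with Node.
function groupPoint(ruleId, groupByValues) {
  const digest = crypto
    .createHash("sha256")
    .update([ruleId, ...groupByValues].join(":"))
    .digest();
  // Interpret the first 4 bytes as an unsigned integer for ring placement.
  return digest.readUInt32BE(0);
}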
Neither service discovery nor consistent hashing is a built-in feature of Vector, so to make this possible, we (ab)use the Vector Remap Language (VRL) to carry out the logic.
Consistent Hashing
Consistent hashing is a specialized hashing technique that minimizes the amount of re-mapping required when hash slots are added or removed. It works by mapping hash slots as points on a circle. When a new value is added, its hash is mapped onto the circle and the value is assigned to the nearest preceding slot.
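Before looking at the generated VRL, here is a minimal JavaScript sketch of that ring mapping. The three slots mirror the example below; everything else is illustrative.
// Slots are points on a 0-359 circle. A hashed value is placed on the circle
// and assigned to the nearest preceding slot.
const slots = [
  { point: 0, name: "agg0" },
  { point: 120, name: "agg1" },
  { point: 240, name: "agg2" },
];

function assignSlot(hash) {
  const point = Math.abs(hash % 360);
  const preceding = slots.filter((slot) => slot.point <= point);
  return preceding[preceding.length - 1].name;
}

// e.g. assignSlot(groupPoint(ruleId, [traceId])) => "agg0", "agg1", or "agg2"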
The following is what consistent hashing might look like for an aggregation with a single group_by key, trace_id.
transforms:
  route/l/checkout_pre:
    type: remap
    source: |
      rule = "cd0763c8-c4b5-4a61-aa14-51bcdee62db2"
      point = abs(mod(seahash(join!([rule, to_string!(.trace_id)])), 360))
      if (point >= 0 && point < 120) {
        .__nimroute = "agg0"
      } else if (point >= 120 && point < 240) {
        .__nimroute = "agg1"
      } else {
        .__nimroute = "agg2"
      }
  out/l/router:
    type: route
    route:
      agg0: .__nimroute == "agg0"
      agg1: .__nimroute == "agg1"
      agg2: .__nimroute == "agg2"
    inputs:
      - route/l/checkout_pre
First, the join! function concatenates the rule id and the trace_id value into a single string, which is then hashed into a number using seahash. We then take the result modulo 360 and apply abs to get a point on the circle. The series of if/else statements afterwards "maps" that point to the appropriate "slot" - in our case, by assigning a custom property to the log called __nimroute. Finally, the __nimroute property is used by the route transform to create different routes, which in turn are used to send traffic to a specific aggregator.
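This VRL is generated by the Nimbus configuration service rather than written by hand. As a rough JavaScript sketch (the function name and output shape are assumptions for illustration), the slot boundaries for an arbitrary number of aggregators could be derived by dividing the circle evenly:
// Divide the 360-degree circle evenly among the aggregators; each returned
// range corresponds to one branch of the generated if/else chain above.
function slotRanges(replicas) {
  const arc = 360 / replicas;
  return Array.from({ length: replicas }, (_, i) => ({
    name: `agg${i}`,
    from: Math.round(i * arc),
    to: Math.round((i + 1) * arc),
  }));
}

// slotRanges(3) => ranges [0,120), [120,240), [240,360) for agg0, agg1, agg2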
Service Discovery
The Nimbus configuration service is responsible for generating the Vector configuration with the consistent hashing logic as well as the aggregator endpoints. Because aggregators run as Kubernetes StatefulSets, they have stable network IDs. This means that endpoint information can be constructed from the aggregator naming scheme $CUSTOMER_ID-agg-$ORDINAL.vector.svc.cluster.local and the number of replicas in the StatefulSet.
JavaScript code for generating the service endpoints:
// Build one endpoint per aggregator replica using the stable StatefulSet naming scheme.
const services = [];
for (let i = 0; i < REPLICAS; i++) {
  services.push(`${CUSTOMER_ID}-agg-${i}.vector.svc.cluster.local`);
}
The following are the sinks generated for our current example with three replicas:
sinks:
  sink/a/agg0:
    type: vector
    address: $CUSTOMER_ID-agg-0.vector.svc.cluster.local
    inputs:
      - out/l/router.agg0
  sink/a/agg1:
    type: vector
    address: $CUSTOMER_ID-agg-1.vector.svc.cluster.local
    inputs:
      - out/l/router.agg1
  sink/a/agg2:
    type: vector
    address: $CUSTOMER_ID-agg-2.vector.svc.cluster.local
    inputs:
      - out/l/router.agg2
And so on for the rest of the generated configuration.
Note that in this system, aggregators still represent a single point of failure for the set of logs they are responsible for. To mitigate this, we rely on aggregators buffering data to disk (synced across multiple availability zones using EFS) and on buffers retrying sends until aggregators come back online. This works for temporary restarts but can be problematic for longer outages due to the fixed buffer sizes on the buffers. For the latter case, we have a secondary mechanism that also buffers data into S3, along with a replay mechanism to re-send the data. More details on this in a future post.
Final Thoughts
In distributed systems, managing state is often a necessary evil to deliver useful results. By combining dynamic config generation with the versatility of VRL, it is possible to implement well-known techniques such as consistent hashing and service discovery within Vector without the need for external services. Even if state can't be entirely avoided, we can design systems that minimize the complexity of handling it.