Error-Rate Dashboard

An end-to-end walkthrough that uses the full array-column stack: filter on tags, aggregate with unique and top(n), derive a scalar sidecar with arrayAggregate, fan out per host with arrayExplode, and repeat the whole thing against a LiveSeries for a streaming view.

The complete runnable version lives at packages/core/test/error-rate-dashboard.test.ts — every assertion here is backed by that test, so this page can't drift.

Scenario

A request stream from a fleet of API servers. Each event carries the request's host, path, a list of tags attached at request time ('slow', '5xx', 'retry', …), and its latency.

import { Sequence, TimeRange, TimeSeries, top } from 'pond-ts';

const schema = [
  { name: 'time', kind: 'time' },
  { name: 'host', kind: 'string' },
  { name: 'path', kind: 'string' },
  { name: 'tags', kind: 'array' }, // list of scalars per request
  { name: 'latency_ms', kind: 'number' },
] as const;

Fifteen hand-picked events span three minutes:

  • Minute 0 — one isolated /checkout 5xx on api-1.
  • Minute 1 — broad outage: 5 failing requests across api-1/api-2/api-3/api-4, hitting /checkout and /cart with 5xx / timeout / retry tags.
  • Minute 2 — recovery. No errors.

Step 1 — Filter to error events

arrayContainsAny keeps events whose tags column overlaps with any of the given values. It is a pure predicate, so the output schema is the same as the source:

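const ERROR_TAGS = ['5xx', 'timeout', 'retry'];
// `series` is the TimeSeries built from the fifteen fixture events.
const errors = series.arrayContainsAny('tags', ERROR_TAGS);
// errors.length === 6 (one in minute 0, five in minute 1)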

If you wanted "only events tagged both 5xx AND slow," swap in arrayContainsAll; for a single value, arrayContains.
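
To make the difference between the three predicates concrete, here is a minimal plain-TypeScript sketch of the matching rules described above. It does not call pond-ts; the helper names (containsAny, containsAll, contains) are hypothetical and only mirror the behavior the text describes for a single row's tag list.

```typescript
// Plain-TypeScript sketch of the three tag predicates (not pond-ts internals).
type Tags = ReadonlyArray<string>;

// True when `tags` shares at least one value with `needles`.
function containsAny(tags: Tags, needles: Tags): boolean {
  return needles.some((n) => tags.indexOf(n) !== -1);
}

// True when every value in `needles` appears in `tags`.
function containsAll(tags: Tags, needles: Tags): boolean {
  return needles.every((n) => tags.indexOf(n) !== -1);
}

// True when the single value `needle` appears in `tags`.
function contains(tags: Tags, needle: string): boolean {
  return tags.indexOf(needle) !== -1;
}

const requestTags: Tags = ['slow', '5xx'];
const anyError = containsAny(requestTags, ['5xx', 'timeout', 'retry']); // true
const slowAnd5xx = containsAll(requestTags, ['5xx', 'slow']); // true
const retryAnd5xx = containsAll(requestTags, ['5xx', 'retry']); // false
const isSlow = contains(requestTags, 'slow'); // true
```

A request tagged only 'slow' would fail all the error checks here, which is why it never reaches the dashboard's error series.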

Step 2 — Per-minute rollup

The heart of the dashboard: one aggregate() call produces tail latency, distinct hosts, top offending paths, and most common failure modes for every minute.

const DASHBOARD_RANGE = new TimeRange({ start: 0, end: 180_000 });

const perMinute = errors.aggregate(
  Sequence.every('1m'),
  {
    latency_ms: 'p95', // number: tail latency
    host: 'unique', // array: which hosts saw errors this minute
    path: top(3), // array: top 3 offending paths by frequency
    tags: top(5), // array: most common failure modes
  },
  { range: DASHBOARD_RANGE },
);

The output schema is:

// perMinute.schema
[
  { name: 'interval', kind: 'interval' },
  { name: 'latency_ms', kind: 'number' },
  { name: 'host', kind: 'array' },
  { name: 'path', kind: 'array' },
  { name: 'tags', kind: 'array' },
];

And the three buckets look like:

Minute | p95 latency (ms) | hosts | paths | top tags
0 | 80 | ['api-1'] | ['/checkout'] | ['5xx']
1 | 2580 | ['api-1','api-2','api-3','api-4'] | ['/checkout','/cart'] | ['5xx','retry','timeout']
2 | undefined | [] | [] | []

Two things worth noticing:

  • top(n) flattens array-kind source columns. The tags column is already an array per event; top(5) counts elements across every array in the bucket, so "most common failure modes this minute" reads the natural way. unique behaves identically — set-union across all arrays.
  • The { range } option materializes empty buckets. Without it, aggregate() only emits buckets whose begin-point falls inside the filtered series' range, and minute 2 would silently drop out. With it, you get a uniform three-row grid for the chart, and reducers on empty buckets bottom out naturally (undefined, []).
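
The two points above can be sketched in plain TypeScript. This is an illustration of the described semantics, not the pond-ts implementation: topN, uniqueFlat, and bucketStarts are hypothetical helpers, the sample tag lists are made up for the sketch rather than taken from the fixture, and the tie-breaking rule (first seen wins) is an assumption.

```typescript
// Sketch only: plain TypeScript mirroring the described reducer semantics.
type Cell = string | ReadonlyArray<string>;

// Flatten scalar and array cells into one list of elements.
function flatten(cells: ReadonlyArray<Cell>): string[] {
  const out: string[] = [];
  for (const cell of cells) {
    if (typeof cell === 'string') out.push(cell);
    else for (const item of cell) out.push(item);
  }
  return out;
}

// top(n): count every element across the bucket, keep the n most frequent.
// Assumption: ties keep first-seen order (relies on a stable sort).
function topN(cells: ReadonlyArray<Cell>, n: number): string[] {
  const counts = new Map<string, number>();
  for (const v of flatten(cells)) counts.set(v, (counts.get(v) ?? 0) + 1);
  return Array.from(counts.entries())
    .sort((a, b) => b[1] - a[1])
    .slice(0, n)
    .map(([value]) => value);
}

// unique: set-union of every element, in first-seen order.
function uniqueFlat(cells: ReadonlyArray<Cell>): string[] {
  return Array.from(new Set(flatten(cells)));
}

// { range }: one bucket start per step across the range,
// whether or not any event landed in that step.
function bucketStarts(start: number, end: number, stepMs: number): number[] {
  const starts: number[] = [];
  for (let t = start; t < end; t += stepMs) starts.push(t);
  return starts;
}

// Illustrative tag lists for one bucket (not the page's fixture).
const tagCells: Cell[] = [['5xx', 'retry'], ['5xx'], ['timeout', '5xx'], ['retry']];
const topTags = topN(tagCells, 2); // ['5xx', 'retry']
const allTags = uniqueFlat(tagCells); // ['5xx', 'retry', 'timeout']
const grid = bucketStarts(0, 180_000, 60_000); // [0, 60000, 120000]
```

The grid has three entries for the 0 to 180_000 ms range even though nothing in the sketch says which minutes had events, which is the same reason minute 2 shows up as an empty row in the table.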

Step 3 — Sidecar: distinct-host count without losing the list

arrayAggregate is the bridge between array columns and scalar columns: feed each row's array to a reducer. With { as: 'name' } it appends a new column and preserves the source array, so you can keep the host list for tooltips and chart detail while computing a hostCount for alert logic.

const withCount = perMinute.arrayAggregate('host', 'count', {
  as: 'hostCount',
});
// withCount.schema: ...host (array), hostCount (number)
// minute 0: host=['api-1']                         hostCount=1
// minute 1: host=['api-1','api-2','api-3','api-4'] hostCount=4
// minute 2: host=[]                                hostCount=0

The same call with a different reducer gives you sum, avg, min, max, median, stdev, unique, any p${number} percentile (for example 'p95'), any top${number} variant, or a custom (values) => result function. Everything the reducer registry knows how to do on a bucket, arrayAggregate does on a single cell.
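
As a rough mental model, the "reducer per cell, appended as a new column" behavior can be sketched with plain objects. The row shape, the withSidecar helper, and the fixed sidecar key are all hypothetical stand-ins for this illustration; pond-ts events are not plain objects, and the real call names the new column through { as }.

```typescript
// Hypothetical row shape for this sketch only.
type MinuteRow = { minute: number; host: ReadonlyArray<string> };
type CellReducer<R> = (values: ReadonlyArray<string>) => R;

// Run `reducer` on each row's host array and append the result,
// leaving the source array column untouched.
function withSidecar<R>(
  rows: ReadonlyArray<MinuteRow>,
  reducer: CellReducer<R>,
): Array<MinuteRow & { sidecar: R }> {
  return rows.map((row) => ({ ...row, sidecar: reducer(row.host) }));
}

// Host lists taken from the three buckets shown above.
const buckets: MinuteRow[] = [
  { minute: 0, host: ['api-1'] },
  { minute: 1, host: ['api-1', 'api-2', 'api-3', 'api-4'] },
  { minute: 2, host: [] },
];

// A 'count'-style reducer: one number per row.
const counted = withSidecar(buckets, (values) => values.length);
const hostCounts = counted.map((row) => row.sidecar); // [1, 4, 0]

// A custom reducer works the same way and may return any type.
const labelled = withSidecar(buckets, (values) => values.join('+'));
const labels = labelled.map((row) => row.sidecar);
```

Because the source array survives next to the derived value, the same row can feed both a tooltip (the list) and a threshold check (the number).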

Step 4 — Alert view

Now hostCount is a plain number column, so any of the existing scalar operators apply. A one-line alert filter:

const broadOutages = withCount.filter((e) => {
  const n = e.get('hostCount'); // number | undefined from schema
  return n !== undefined && n >= 3;
});
// exactly 1 event: minute 1, hostCount = 4

Step 5 — Per-host fan-out for a small-multiples chart

To render "one mini-chart per host," the bucket-per-host shape is what you want. arrayExplode emits one output event per array element:

const perMinutePerHost = perMinute.arrayExplode('host');
// minute 0 -> 1 row (api-1)
// minute 1 -> 4 rows (api-1..api-4, same interval key)
// minute 2 -> 0 rows (empty array drops)
// Downstream: .groupBy('host') for per-host series.

If you'd rather keep the full host list attached to each fanned-out row — for example to draw the "4 hosts affected" badge on every per-host bar — pass { as }:

const fanned = perMinute.arrayExplode('host', { as: 'focusHost' });
// row carries both: host = ['api-1','api-2','api-3','api-4']
// focusHost = 'api-2' (one per row)
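
Both fan-out shapes can be sketched with plain objects. This is an illustration of the row counts and column layout described above, not the library's code; the row type and the explodeHost / explodeHostAs helpers are hypothetical.

```typescript
// Hypothetical row shape for this sketch only.
type BucketRow = { minute: number; host: ReadonlyArray<string> };

// Default form: the array column is replaced by one scalar per output row.
function explodeHost(
  rows: ReadonlyArray<BucketRow>,
): Array<{ minute: number; host: string }> {
  const out: Array<{ minute: number; host: string }> = [];
  for (const row of rows) {
    for (const h of row.host) out.push({ minute: row.minute, host: h });
  }
  return out;
}

// `{ as }` form: keep the source array and add the scalar under a new name.
function explodeHostAs(
  rows: ReadonlyArray<BucketRow>,
): Array<BucketRow & { focusHost: string }> {
  const out: Array<BucketRow & { focusHost: string }> = [];
  for (const row of rows) {
    for (const h of row.host) out.push({ ...row, focusHost: h });
  }
  return out;
}

const perMinuteRows: BucketRow[] = [
  { minute: 0, host: ['api-1'] },
  { minute: 1, host: ['api-1', 'api-2', 'api-3', 'api-4'] },
  { minute: 2, host: [] }, // empty array: contributes no output rows
];

const exploded = explodeHost(perMinuteRows);
const explodedMinutes = exploded.map((r) => r.minute); // [0, 1, 1, 1, 1]
const fannedRows = explodeHostAs(perMinuteRows);
const badgeSizes = fannedRows.map((r) => r.host.length); // [1, 4, 4, 4, 4]
```

The second form is what makes a "4 hosts affected" badge cheap: every fanned-out row still carries the full list it came from.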

Live variant

The same composition works on a LiveSeries. Filter with a plain .filter() (live variants of the array-* predicates are still planned), then call .aggregate(). The return value is a LiveAggregation that fires 'close' every time a bucket is finalized.

import { LiveSeries } from 'pond-ts';

const live = new LiveSeries({ name: 'requests', schema });

const errorStream = live
  .filter((event) => {
    const tags = event.get('tags') as ReadonlyArray<string> | undefined;
    if (!Array.isArray(tags)) return false;
    for (const needle of ERROR_TAGS) {
      if (tags.includes(needle)) return true;
    }
    return false;
  })
  .aggregate(Sequence.every('1m'), {
    host: 'unique',
    path: top(3),
    tags: top(5),
  });

errorStream.on('close', (bucket) => {
  // append a bar, fire an alert, push a notification, …
  console.log({
    start: bucket.begin(),
    hosts: bucket.get('host'),
    tags: bucket.get('tags'),
  });
});

Four semantics to remember in the live layer:

  1. Buckets only materialize if an event lands in them. An all-clean minute 2 will not emit a closed bucket — use the batch path (toTimeSeries() + explicit { range }) when you need a uniform grid.
  2. 'close' fires when the watermark crosses a bucket's end. The next event past 120_000ms closes minute 1; until then the minute-1 bucket stays open and visible via snapshot() as a provisional bar.
  3. snapshot() vs closed(). closed() returns only finalized buckets (safe for alert pipelines). snapshot() includes the currently-open bucket so the current minute can render incrementally in the UI.
  4. The open bucket isn't reachable via .at() or .length. LiveAggregation implements LiveSource, but those accessors only see closed buckets — right after a few pushes and before the first bucket closes, they'll report an empty series. To read the current partial value, reach for agg.snapshot().last() or the 'bucket' event (see the Live Transforms guide → Reading the current value). If you want every bucket populated immediately, snapshot the source and run batch aggregate: live.toTimeSeries().aggregate(seq, mapping, { range }).
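
The first three points can be modeled with a toy aggregator. This is a hypothetical MiniLiveCount class written for illustration, not LiveAggregation itself: it only counts events per bucket, assumes events arrive in time order, and treats the latest event time as the watermark.

```typescript
// Toy model of watermark-driven bucket closing (not the pond-ts class).
type Bucket = { begin: number; count: number };

class MiniLiveCount {
  private readonly stepMs: number;
  private open: Bucket | undefined = undefined;
  private readonly done: Bucket[] = [];
  private readonly listeners: Array<(bucket: Bucket) => void> = [];

  constructor(stepMs: number) {
    this.stepMs = stepMs;
  }

  onClose(listener: (bucket: Bucket) => void): void {
    this.listeners.push(listener);
  }

  // Assumes in-order events; the event time acts as the watermark.
  push(time: number): void {
    const begin = Math.floor(time / this.stepMs) * this.stepMs;
    let current = this.open;
    if (current !== undefined && begin > current.begin) {
      // The watermark moved past the open bucket's end: finalize it.
      const finished = current;
      this.done.push(finished);
      for (const listener of this.listeners) listener(finished);
      current = undefined;
    }
    if (current === undefined) current = { begin, count: 0 };
    current.count += 1;
    this.open = current;
  }

  // Finalized buckets only.
  closed(): Bucket[] {
    return this.done.slice();
  }

  // Finalized buckets plus the currently open one, if any.
  snapshot(): Bucket[] {
    return this.open === undefined ? this.closed() : [...this.done, this.open];
  }
}

const agg = new MiniLiveCount(60_000);
const closedBegins: number[] = [];
agg.onClose((bucket) => closedBegins.push(bucket.begin));

agg.push(5_000); // minute 0 opens
agg.push(65_000); // minute 1 opens, minute 0 closes
agg.push(70_000); // still minute 1
const closedSoFar = agg.closed().length; // 1 (minute 0 only)
const visibleSoFar = agg.snapshot().length; // 2 (minute 0 + open minute 1)

agg.push(185_000); // minute 3 opens, minute 1 closes; minute 2 never exists
const allBegins = agg.snapshot().map((b) => b.begin); // [0, 60000, 180000]
```

The gap at 120000 in the final snapshot is the live-layer behavior from point 1: no event landed in minute 2, so no bucket was ever created for it.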

Putting it all together

import { LiveSeries, Sequence, TimeRange, top } from 'pond-ts';

// schema and ERROR_TAGS are defined in the earlier steps. dashboardRange,
// isErrorEvent, fireAlert, and the render* functions are app-specific
// helpers that are not shown on this page.
const live = new LiveSeries({ name: 'requests', schema });

// Batch view, driven by toTimeSeries() on a render tick ------------------
function renderDashboard() {
  const series = live.toTimeSeries();
  const errors = series.arrayContainsAny('tags', ERROR_TAGS);

  const perMinute = errors
    .aggregate(
      Sequence.every('1m'),
      {
        latency_ms: 'p95',
        host: 'unique',
        path: top(3),
        tags: top(5),
      },
      { range: dashboardRange() },
    )
    .arrayAggregate('host', 'count', { as: 'hostCount' });

  renderBarChart(perMinute);
  renderAlerts(
    perMinute.filter((e) => {
      const n = e.get('hostCount');
      return n !== undefined && n >= 3;
    }),
  );
  renderSmallMultiples(perMinute.arrayExplode('host').groupBy('host'));
}

// Live alert pipeline, driven by events ---------------------------------
live
  .filter(isErrorEvent)
  .aggregate(Sequence.every('1m'), { host: 'unique', tags: top(5) })
  .on('close', (bucket) => {
    const hosts = (bucket.get('host') ?? []) as ReadonlyArray<string>;
    if (hosts.length >= 3) fireAlert(bucket);
  });

The same primitives carry from the batch snapshot to the live pipeline to the alert handler. No custom reducer plumbing, no ad-hoc tag flattening — the array-column operators plus the existing reducer registry cover it.
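
The listing above calls isErrorEvent and dashboardRange() without defining them. One possible shape for those two helpers is sketched below, under stated assumptions: the TaggedEvent type is a minimal structural stand-in for a pond-ts event, dashboardBounds is a hypothetical name, and "the last three whole minutes" is only a guess at what a dashboard range might cover. The predicate body is the same logic as the inline live filter shown earlier.

```typescript
// Minimal structural type for this sketch; real events expose more.
type TaggedEvent = { get(name: string): unknown };

const ERROR_TAGS: ReadonlyArray<string> = ['5xx', 'timeout', 'retry'];

// Same logic as the inline live filter, pulled into a named predicate.
function isErrorEvent(event: TaggedEvent): boolean {
  const tags = event.get('tags');
  if (!Array.isArray(tags)) return false;
  return tags.some((tag) => ERROR_TAGS.indexOf(tag) !== -1);
}

// Hypothetical: bounds for the last `minutes` whole minutes, ending at the
// next minute boundary after `now`. A dashboardRange() helper could wrap
// this as: new TimeRange(dashboardBounds(Date.now()))
function dashboardBounds(
  now: number,
  minutes: number = 3,
): { start: number; end: number } {
  const stepMs = 60_000;
  const end = (Math.floor(now / stepMs) + 1) * stepMs;
  return { start: end - minutes * stepMs, end };
}

// Stand-in events built from plain objects, for illustration only.
const failing: TaggedEvent = { get: (name) => (name === 'tags' ? ['slow', '5xx'] : undefined) };
const healthy: TaggedEvent = { get: (name) => (name === 'tags' ? ['slow'] : undefined) };
const untagged: TaggedEvent = { get: () => undefined };

const flags = [failing, healthy, untagged].map(isErrorEvent); // [true, false, false]
const bounds = dashboardBounds(150_000); // { start: 0, end: 180000 }
```

With now at 150_000 ms the bounds line up with the fixed DASHBOARD_RANGE used in Step 2, so the batch view keeps its three-row grid as time advances.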

The "right now" stat card

For the top-of-dashboard summary — "what's happening this moment" — a bucketed aggregate is overkill. Pair tail with reduce:

const summary = live
  .toTimeSeries()
  .arrayContainsAny('tags', ERROR_TAGS)
  .tail('30s')
  .reduce({
    latency_ms: 'p95',
    host: 'unique',
    tags: top(5),
  });
// summary:
// { latency_ms: number | undefined,
//   host: ArrayValue | undefined,
//   tags: ArrayValue | undefined }

In React this is a single hook call via useCurrent:

const { latency_ms, host, tags } = useCurrent(
  live.filter(isErrorEvent),
  { latency_ms: 'p95', host: 'unique', tags: top(5) },
  { tail: '30s', throttle: 500 },
);

useCurrent returns a stable-shape record even when the source is empty, so the stat card can destructure directly without null-checks. See the Live Transforms → Reading the current value guide for when to reach for useCurrent vs LiveAggregation vs batch aggregate({ range }).
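
The "trailing window, then reduce" idea behind the stat card can be sketched in plain TypeScript. Two assumptions are baked in and may not match pond-ts exactly: the window is anchored at the newest event's timestamp rather than the wall clock, and the lower bound is exclusive. The tailWindow helper and the sample events are hypothetical.

```typescript
// Hypothetical plain-object events for this sketch only.
type ErrorEvent = { time: number; host: string };

// Keep events that fall within `windowMs` of the newest event.
// Assumptions: anchored at the latest event time, exclusive lower bound.
function tailWindow<T extends { time: number }>(
  events: ReadonlyArray<T>,
  windowMs: number,
): T[] {
  let latest = -Infinity;
  for (const e of events) latest = Math.max(latest, e.time);
  return events.filter((e) => e.time > latest - windowMs);
}

// Reduce the window to one stat: the distinct hosts, in first-seen order.
function recentHosts(events: ReadonlyArray<ErrorEvent>, windowMs: number): string[] {
  return Array.from(new Set(tailWindow(events, windowMs).map((e) => e.host)));
}

const sample: ErrorEvent[] = [
  { time: 10_000, host: 'api-1' },
  { time: 45_000, host: 'api-2' },
  { time: 60_000, host: 'api-1' },
  { time: 70_000, host: 'api-3' },
];

const last30s = recentHosts(sample, 30_000); // ['api-2', 'api-1', 'api-3']
const nothingYet = recentHosts([], 30_000); // []
```

The empty-input case returns an empty list rather than failing, which is the plain-object analogue of a stat card that can always destructure its result.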

What gets tested

The E2E test at packages/core/test/error-rate-dashboard.test.ts exercises every step above against the same fixture used on this page:

  • arrayContainsAny filter count
  • output schema kinds through aggregate(..., { tags: top(5), host: 'unique' })
  • p95 values across buckets (including undefined on the empty minute)
  • arrayAggregate('host', 'count', { as }) schema + values
  • filter on hostCount >= 3 for the broad-outage alert
  • arrayExplode cardinality + per-host fan-out
  • arrayExplode({ as }) that preserves the source array alongside the scalar sibling
  • the full filter → aggregate pipeline on a LiveSeries producing the same closed-bucket values as the batch path