# Error-Rate Dashboard
An end-to-end walkthrough that uses the full array-column stack: filter
on tags, aggregate with unique and top(n), derive a scalar sidecar
with arrayAggregate, fan out per host with arrayExplode, and repeat
the whole thing against a LiveSeries for a streaming view.
The complete runnable version lives at
packages/core/test/error-rate-dashboard.test.ts
— every assertion here is backed by that test, so this page can't drift.
## Scenario
A request stream from a fleet of API servers. Each event carries the
request's host, path, a list of tags attached at request time ('slow',
'5xx', 'retry', …), and its latency.
```ts
import { Sequence, TimeRange, TimeSeries, top } from 'pond-ts';

const schema = [
  { name: 'time', kind: 'time' },
  { name: 'host', kind: 'string' },
  { name: 'path', kind: 'string' },
  { name: 'tags', kind: 'array' }, // list of scalars per request
  { name: 'latency_ms', kind: 'number' },
] as const;
```
Fifteen hand-picked events span three minutes:
- Minute 0 — one isolated `/checkout` 5xx on `api-1`.
- Minute 1 — broad outage: 5 failing requests across
  `api-1`/`api-2`/`api-3`/`api-4`, hitting `/checkout` and `/cart` with
  `5xx`/`timeout`/`retry` tags.
- Minute 2 — recovery. No errors.
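The exact fixture lives in the test file; a plain-data sketch of its shape could look like the following. The individual timestamps and latencies here are illustrative stand-ins, chosen only to match the counts described above (15 events, 6 errors, 4 affected hosts in minute 1):

```ts
// Hypothetical fixture data matching the scenario's shape. Only the
// counts are load-bearing; the real fixture lives in the test file.
type Req = { time: number; host: string; path: string; tags: string[]; latency_ms: number };

const MIN = 60_000;
const events: Req[] = [
  // Minute 0: mostly clean, one isolated 5xx on api-1.
  { time: 1_000, host: 'api-1', path: '/checkout', tags: ['5xx'], latency_ms: 80 },
  { time: 5_000, host: 'api-2', path: '/cart', tags: [], latency_ms: 40 },
  { time: 20_000, host: 'api-3', path: '/checkout', tags: [], latency_ms: 55 },
  { time: 40_000, host: 'api-4', path: '/cart', tags: ['slow'], latency_ms: 900 },
  // Minute 1: broad outage, five failing requests across four hosts.
  { time: MIN + 2_000, host: 'api-1', path: '/checkout', tags: ['5xx'], latency_ms: 2_500 },
  { time: MIN + 10_000, host: 'api-2', path: '/checkout', tags: ['timeout'], latency_ms: 2_580 },
  { time: MIN + 15_000, host: 'api-3', path: '/cart', tags: ['retry'], latency_ms: 1_900 },
  { time: MIN + 20_000, host: 'api-4', path: '/cart', tags: ['5xx', 'retry'], latency_ms: 2_100 },
  { time: MIN + 30_000, host: 'api-1', path: '/checkout', tags: ['5xx'], latency_ms: 2_400 },
  { time: MIN + 45_000, host: 'api-2', path: '/cart', tags: [], latency_ms: 60 },
  // Minute 2: recovery, no errors.
  { time: 2 * MIN + 5_000, host: 'api-1', path: '/checkout', tags: [], latency_ms: 50 },
  { time: 2 * MIN + 10_000, host: 'api-2', path: '/cart', tags: [], latency_ms: 45 },
  { time: 2 * MIN + 20_000, host: 'api-3', path: '/checkout', tags: ['slow'], latency_ms: 700 },
  { time: 2 * MIN + 30_000, host: 'api-4', path: '/cart', tags: [], latency_ms: 40 },
  { time: 2 * MIN + 50_000, host: 'api-1', path: '/checkout', tags: [], latency_ms: 35 },
];

const ERROR_TAGS = ['5xx', 'timeout', 'retry'];
const errorCount = events.filter((e) => e.tags.some((t) => ERROR_TAGS.includes(t))).length;
// 6 errors total: 1 in minute 0, 5 in minute 1, 0 in minute 2
```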
## Step 1 — Filter to error events
arrayContainsAny keeps events whose tag column overlaps with any of the
given values. Pure predicate, same output schema as the source:
```ts
const ERROR_TAGS = ['5xx', 'timeout', 'retry'];

const errors = series.arrayContainsAny('tags', ERROR_TAGS);
// errors.length === 6
```
If you wanted "only events tagged both 5xx AND slow," swap in
arrayContainsAll; for a single value, arrayContains.
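The three predicates differ only in how the event's tag array is matched against the given values. In plain-TypeScript terms, a sketch of the matching semantics (not the library's internals):

```ts
// Matching semantics of the three predicates, sketched standalone.
const containsAny = (tags: readonly string[], wanted: readonly string[]) =>
  wanted.some((w) => tags.includes(w)); // any overlap
const containsAll = (tags: readonly string[], wanted: readonly string[]) =>
  wanted.every((w) => tags.includes(w)); // superset
const contains = (tags: readonly string[], value: string) =>
  tags.includes(value); // single membership

containsAny(['5xx', 'slow'], ['5xx', 'timeout']); // true: overlap on '5xx'
containsAll(['5xx', 'slow'], ['5xx', 'slow']); // true: both present
containsAll(['5xx'], ['5xx', 'slow']); // false: 'slow' missing
```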
## Step 2 — Per-minute rollup
The heart of the dashboard: one aggregate() call produces tail latency,
distinct hosts, top offending paths, and most common failure modes for
every minute.
```ts
const DASHBOARD_RANGE = new TimeRange({ start: 0, end: 180_000 });

const perMinute = errors.aggregate(
  Sequence.every('1m'),
  {
    latency_ms: 'p95', // number: tail latency
    host: 'unique', // array: which hosts saw errors this minute
    path: top(3), // array: top 3 offending paths by frequency
    tags: top(5), // array: most common failure modes
  },
  { range: DASHBOARD_RANGE },
);
```
The output schema is:
```ts
// perMinute.schema
[
  { name: 'interval', kind: 'interval' },
  { name: 'latency_ms', kind: 'number' },
  { name: 'host', kind: 'array' },
  { name: 'path', kind: 'array' },
  { name: 'tags', kind: 'array' },
];
```
And the three buckets look like:
| Minute | p95 latency | hosts | paths | top tags |
|---|---|---|---|---|
| 0 | 80 | ['api-1'] | ['/checkout'] | ['5xx'] |
| 1 | 2580 | ['api-1','api-2','api-3','api-4'] | ['/checkout','/cart'] | ['5xx','retry','timeout'] |
| 2 | undefined | [] | [] | [] |
Two things worth noticing:
- `top(n)` flattens array-kind source columns. The `tags` column is already
  an array per event; `top(5)` counts elements across every array in the
  bucket, so "most common failure modes this minute" reads the natural way.
  `unique` behaves identically — set-union across all arrays.
- The `{ range }` option materializes empty buckets. Without it,
  `aggregate()` only emits buckets whose begin-point falls inside the
  filtered series' range, and minute 2 would silently drop out. With it,
  you get a uniform three-row grid for the chart, and reducers on empty
  buckets bottom out naturally (`undefined`, `[]`).
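To make the flattening concrete, here is a plain-TypeScript sketch of what a `top(n)` reducer does to a bucket whose cells may be scalars or arrays. This is an illustration of the semantics described above, not library code:

```ts
// top(n) over one bucket: scalar cells count once, array cells contribute
// every element; ties keep first-seen order (JS sort is stable).
function topN(cells: ReadonlyArray<string | readonly string[]>, n: number): string[] {
  const counts = new Map<string, number>();
  for (const cell of cells) {
    for (const v of typeof cell === 'string' ? [cell] : cell) {
      counts.set(v, (counts.get(v) ?? 0) + 1);
    }
  }
  return [...counts.entries()]
    .sort((a, b) => b[1] - a[1])
    .slice(0, n)
    .map(([v]) => v);
}

// Minute-1 style tags cells: '5xx' x3, 'retry' x2, 'timeout' x1.
topN([['5xx'], ['timeout'], ['retry'], ['5xx', 'retry'], ['5xx']], 5);
// → ['5xx', 'retry', 'timeout']
```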
## Step 3 — Sidecar: distinct-host count without losing the list
arrayAggregate is the bridge between array columns and scalar
columns: feed each row's array to a reducer. With { as: 'name' } it
appends a new column and preserves the source array, so you can keep
the host list for tooltips and chart detail while computing a
hostCount for alert logic.
```ts
const withCount = perMinute.arrayAggregate('host', 'count', {
  as: 'hostCount',
});
// withCount.schema: ...host (array), hostCount (number)
// minute 0: host=['api-1'] hostCount=1
// minute 1: host=['api-1','api-2','api-3','api-4'] hostCount=4
// minute 2: host=[] hostCount=0
```
The same call with a different reducer gives you `sum`, `avg`, `min`, `max`,
`median`, `stdev`, `unique`, any `p${number}`, any `top${number}`, or a custom
`(values) => result` function. Everything the reducer registry knows
how to do on a bucket, `arrayAggregate` does on a single cell.
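The per-cell contract is easy to sketch in plain TypeScript: a reducer is just a function from the cell's values to one result. The `longestName` helper below is a made-up custom reducer for illustration, not part of the registry:

```ts
// A reducer takes one cell's values and returns one scalar result.
type Reducer<T, R> = (values: readonly T[]) => R;

const count: Reducer<unknown, number> = (values) => values.length;

// A custom reducer is just any function with that shape (illustrative):
const longestName: Reducer<string, string | undefined> = (values) =>
  values.reduce<string | undefined>(
    (best, v) => (best === undefined || v.length > best.length ? v : best),
    undefined,
  );

count(['api-1', 'api-2', 'api-3', 'api-4']); // 4
count([]); // 0, so empty cells bottom out naturally
longestName(['api-1', 'api-10']); // 'api-10'
```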
## Step 4 — Alert view
Now hostCount is a plain number column, so any of the existing
scalar operators apply. A one-line alert filter:
```ts
const broadOutages = withCount.filter((e) => {
  const n = e.get('hostCount'); // number | undefined from schema
  return n !== undefined && n >= 3;
});
// exactly 1 event: minute 1, hostCount = 4
```
## Step 5 — Per-host fan-out for a small-multiples chart
To render "one mini-chart per host," the bucket-per-host shape is what
you want. arrayExplode emits one output event per array element:
```ts
const perMinutePerHost = perMinute.arrayExplode('host');
// minute 0 -> 1 row  (api-1)
// minute 1 -> 4 rows (api-1..api-4, same interval key)
// minute 2 -> 0 rows (empty array drops)

// Downstream: .groupBy('host') for per-host series.
```
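The fan-out itself is a flatMap over the array cell. A standalone sketch of the cardinality rules, with an illustrative row type rather than the library's row representation:

```ts
// One output row per array element; an empty array produces no rows.
type Row = { interval: number; host: string[] | string };

function explodeHost(rows: readonly Row[]): Row[] {
  return rows.flatMap((row) =>
    Array.isArray(row.host) ? row.host.map((h) => ({ ...row, host: h })) : [],
  );
}

explodeHost([
  { interval: 0, host: ['api-1'] },
  { interval: 1, host: ['api-1', 'api-2', 'api-3', 'api-4'] },
  { interval: 2, host: [] },
]);
// 5 rows: 1 + 4 + 0, each copy keeping its source interval key
```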
If you'd rather keep the full host list attached to each fanned-out row
— for example to draw the "4 hosts affected" badge on every per-host
bar — pass { as }:
```ts
const fanned = perMinute.arrayExplode('host', { as: 'focusHost' });
// row carries both: host      = ['api-1','api-2','api-3','api-4']
//                   focusHost = 'api-2' (one per row)
```
## Live variant
The same composition works on a LiveSeries. Filter with a plain
.filter() (live variants of the array-* predicates are still on the
plan), then .aggregate() — the return value is a LiveAggregation
that fires 'close' every time a bucket is finalized.
```ts
import { LiveSeries } from 'pond-ts';

const live = new LiveSeries({ name: 'requests', schema });

const errorStream = live
  .filter((event) => {
    const tags = event.get('tags') as ReadonlyArray<string> | undefined;
    if (!Array.isArray(tags)) return false;
    return ERROR_TAGS.some((needle) => tags.includes(needle));
  })
  .aggregate(Sequence.every('1m'), {
    host: 'unique',
    path: top(3),
    tags: top(5),
  });

errorStream.on('close', (bucket) => {
  // append a bar, fire an alert, push a notification, …
  console.log({
    start: bucket.begin(),
    hosts: bucket.get('host'),
    tags: bucket.get('tags'),
  });
});
```
Four semantics to remember in the live layer:
- Buckets only materialize if an event lands in them. An all-clean
  minute 2 will not emit a closed bucket — use the batch path
  (`toTimeSeries()` + explicit `{ range }`) when you need a uniform grid.
- `'close'` fires when the watermark crosses a bucket's end. The next
  event past `120_000` ms closes minute 1; until then the minute-1 bucket
  stays open and visible via `snapshot()` as a provisional bar.
- `snapshot()` vs `closed()`. `closed()` returns only finalized buckets
  (safe for alert pipelines). `snapshot()` includes the currently-open
  bucket so the current minute can render incrementally in the UI.
- The open bucket isn't reachable via `.at()` or `.length`.
  `LiveAggregation` implements `LiveSource`, but those accessors only see
  closed buckets — right after a few pushes and before the first bucket
  closes, they'll report an empty series. To read the current partial
  value, reach for `agg.snapshot().last()` or the `'bucket'` event (see
  the Live Transforms guide → Reading the current value). If you want
  every bucket populated immediately, snapshot the source and run batch
  aggregate: `live.toTimeSeries().aggregate(seq, mapping, { range })`.
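The open/closed distinction is easiest to see in a miniature model of the watermark logic. This is a sketch of the semantics just described, not `LiveAggregation` itself; the class and its counters are invented for illustration:

```ts
// Miniature watermark model: a bucket closes once an event's timestamp
// reaches or passes that bucket's end; until then it is only visible
// via snapshot().
class MiniBuckets {
  private open = new Map<number, number>(); // bucket start -> event count
  private done: Array<{ start: number; count: number }> = [];

  constructor(private readonly widthMs: number) {}

  push(t: number): void {
    // The watermark (latest timestamp) finalizes every bucket it passed.
    for (const [start, count] of [...this.open]) {
      if (start + this.widthMs <= t) {
        this.done.push({ start, count });
        this.open.delete(start);
      }
    }
    const start = Math.floor(t / this.widthMs) * this.widthMs;
    this.open.set(start, (this.open.get(start) ?? 0) + 1);
  }

  // Finalized buckets only: the safe view for alert pipelines.
  closed() {
    return [...this.done];
  }

  // Closed plus currently-open buckets: the incremental-rendering view.
  snapshot() {
    return [...this.done, ...[...this.open].map(([start, count]) => ({ start, count }))];
  }
}

const agg = new MiniBuckets(60_000);
[10_000, 70_000, 70_500, 125_000].forEach((t) => agg.push(t));
// closed(): minutes 0 and 1; snapshot(): those plus the open minute 2
```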
## Putting it all together
```ts
import { LiveSeries, Sequence, TimeRange, top } from 'pond-ts';

const live = new LiveSeries({ name: 'requests', schema });

// Batch view, driven by toTimeSeries() on a render tick ------------------
function renderDashboard() {
  const series = live.toTimeSeries();
  const errors = series.arrayContainsAny('tags', ERROR_TAGS);

  const perMinute = errors
    .aggregate(
      Sequence.every('1m'),
      {
        latency_ms: 'p95',
        host: 'unique',
        path: top(3),
        tags: top(5),
      },
      { range: dashboardRange() },
    )
    .arrayAggregate('host', 'count', { as: 'hostCount' });

  renderBarChart(perMinute);
  renderAlerts(
    perMinute.filter((e) => {
      const n = e.get('hostCount');
      return n !== undefined && n >= 3;
    }),
  );
  renderSmallMultiples(perMinute.arrayExplode('host').groupBy('host'));
}

// Live alert pipeline, driven by events ---------------------------------
live
  .filter(isErrorEvent)
  .aggregate(Sequence.every('1m'), { host: 'unique', tags: top(5) })
  .on('close', (bucket) => {
    const hosts = (bucket.get('host') ?? []) as ReadonlyArray<string>;
    if (hosts.length >= 3) fireAlert(bucket);
  });
```
The same primitives carry from the batch snapshot to the live pipeline to the alert handler. No custom reducer plumbing, no ad-hoc tag flattening — the array-column operators plus the existing reducer registry cover it.
## The "right now" stat card
For the top-of-dashboard summary — "what's happening this moment" — a
bucketed aggregate is overkill. Pair tail with reduce:
```ts
const summary = live
  .toTimeSeries()
  .arrayContainsAny('tags', ERROR_TAGS)
  .tail('30s')
  .reduce({
    latency_ms: 'p95',
    host: 'unique',
    tags: top(5),
  });
// summary:
// { latency_ms: number | undefined,
//   host: ArrayValue | undefined,
//   tags: ArrayValue | undefined }
```
In React this is a single hook call via `useCurrent`:

```ts
const { latency_ms, host, tags } = useCurrent(
  live.filter(isErrorEvent),
  { latency_ms: 'p95', host: 'unique', tags: top(5) },
  { tail: '30s', throttle: 500 },
);
```
useCurrent returns a stable-shape record even when the source is
empty, so the stat card can destructure directly without null-checks.
See the
Live Transforms → Reading the current value
guide for when to reach for useCurrent vs LiveAggregation vs batch
aggregate({ range }).
## What gets tested
The E2E test at
packages/core/test/error-rate-dashboard.test.ts
exercises every step above against the same fixture used on this page:
- `arrayContainsAny` filter count
- output schema kinds through `aggregate(..., { tags: top(5), host: 'unique' })`
- p95 values across buckets (including `undefined` on the empty minute)
- `arrayAggregate('host', 'count', { as })` schema + values
- `filter` on `hostCount >= 3` for the broad-outage alert
- `arrayExplode` cardinality + per-host fan-out
- `arrayExplode({ as })` that preserves the source array alongside the scalar sibling
- the full filter → aggregate pipeline on a `LiveSeries` producing the same
  closed-bucket values as the batch path