Aggregation
The deep dive on aggregate and its siblings. Already familiar with
the conceptual split between aggregate and align? Skip the
Sampling overview and start here.
This page covers three shapes of "collapse many events into fewer":
- aggregate(seq, mapping): one row per bucket on a Sequence grid.
- reduce(mapping): collapse a whole series into a single record. "Bucket size: all of it."
- arrayAggregate(col, reducer): reduce one array cell per event.
Plus the live-side counterpart: LiveAggregation (incremental
bucketed aggregation over a LiveSeries).
For partition-by-column (groupBy) and long-to-wide reshape
(pivotByGroup), see Reshaping — those are
shape-changing ops, not value collapses.
From pondjs: Aggregator → aggregate() for batch, LiveAggregation for streaming. Pipeline + Processor → a method chain on TimeSeries; a custom reducer function fills the Processor role. Index-keyed buckets → Interval-keyed events. The full rename table is on Concepts.
aggregate

import { Sequence } from 'pond-ts';
// cpu: an existing TimeSeries with cpu, requests, and host columns.
const fiveMin = cpu.aggregate(Sequence.every('5m'), {
cpu: 'avg',
requests: 'sum',
host: 'last',
});
// fiveMin.schema[0].kind === 'interval'
// fiveMin.at(0)?.get('cpu') -> number | undefined
Mapping shapes
Two forms; mix them in one call:
cpu.aggregate(Sequence.every('5s'), {
// shorthand: "avg of cpu, output column same name"
cpu: 'avg',
// explicit { from, using, kind? } — rename the output column
cpu_p95: { from: 'cpu', using: 'p95' },
// multiple reducers over the same source column
requests_total: { from: 'requests', using: 'sum' },
requests_peak: { from: 'requests', using: 'max' },
});
Use the explicit form when you want to rename, run multiple reducers over one source column, or use a custom reducer whose output kind differs from the source.
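To make the relationship between the two forms concrete, here is a minimal sketch that normalizes a mixed mapping into the explicit form. A shorthand key becomes { from: key, using: reducer }, so the output column shares the source column's name. The types and the normalizeMapping helper are hypothetical illustrations, not pond-ts's internal code.

```typescript
// Hypothetical local types modeling the two mapping forms; not pond-ts's own definitions.
type ReducerFn = (values: ReadonlyArray<unknown>) => unknown;
type Using = string | ReducerFn; // a reducer name like 'avg', or a custom function
type ExplicitSpec = { from: string; using: Using; kind?: string };
type MappingEntry = Using | ExplicitSpec;

// Rewrite every entry into the explicit form, keyed by output column name.
function normalizeMapping(
  mapping: Record<string, MappingEntry>,
): Record<string, ExplicitSpec> {
  const out: Record<string, ExplicitSpec> = {};
  for (const [outputName, entry] of Object.entries(mapping)) {
    out[outputName] =
      typeof entry === 'object'
        ? entry // already explicit: { from, using, kind? }
        : { from: outputName, using: entry }; // shorthand: source column = output column
  }
  return out;
}

const normalized = normalizeMapping({
  cpu: 'avg',
  cpu_p95: { from: 'cpu', using: 'p95' },
  requests_peak: { from: 'requests', using: 'max' },
});
// normalized.cpu     -> { from: 'cpu', using: 'avg' }
// normalized.cpu_p95 -> { from: 'cpu', using: 'p95' }
```

Seen this way, multiple reducers over one source column are just several output keys that share the same from.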
Custom reducers
A custom reducer has the signature
(values: ReadonlyArray<ColumnValue | undefined>) => ColumnValue | undefined.
It receives the raw bucket with undefined values included, so filter them out yourself if your logic needs that.
Two shapes:
// Same column in, same column out — shorthand key.
cpu.aggregate(Sequence.every('5s'), {
cpu: (values) => {
const nums = values.filter((v): v is number => typeof v === 'number');
if (nums.length === 0) return undefined;
const sorted = [...nums].sort((a, b) => a - b);
return sorted[Math.floor(sorted.length * 0.999)];
},
});
// Renamed output — wrap the function in the explicit spec.
cpu.aggregate(Sequence.every('5s'), {
cpu_p99_9: {
from: 'cpu',
using: (values) => {
const nums = values.filter((v): v is number => typeof v === 'number');
if (nums.length === 0) return undefined;
const sorted = [...nums].sort((a, b) => a - b);
return sorted[Math.floor(sorted.length * 0.999)];
},
kind: 'number',
},
});
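Both examples above inline the same filter, sort, and index body. A small factory keeps that logic in one place. This is a sketch that assumes any function matching the reducer signature is accepted wherever a reducer name is. ColumnValue here is a local stand-in for pond-ts's type, and quantile is a hypothetical helper name.

```typescript
// Local stand-in for pond-ts's ColumnValue; the library's real union may differ.
type ColumnValue = number | string | boolean | ReadonlyArray<unknown>;
type Reducer = (
  values: ReadonlyArray<ColumnValue | undefined>,
) => ColumnValue | undefined;

// Hypothetical helper: builds a reducer returning the q-th quantile of the
// numeric values in a bucket, using the same index rule as the examples above.
function quantile(q: number): Reducer {
  // q must stay below 1, otherwise floor(length * q) can index past the end.
  if (!(q >= 0 && q < 1)) throw new RangeError(`q must be in [0, 1), got ${q}`);
  return (values) => {
    // The bucket is raw: drop undefined and non-numeric cells first.
    const nums = values.filter((v): v is number => typeof v === 'number');
    if (nums.length === 0) return undefined;
    const sorted = [...nums].sort((a, b) => a - b);
    return sorted[Math.floor(sorted.length * q)];
  };
}

const p999 = quantile(0.999);
```

With that helper, the renamed example would read cpu_p99_9: { from: 'cpu', using: quantile(0.999), kind: 'number' }.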
Bucket membership
Buckets are half-open: [begin, end). An event at exactly the
boundary lands in the later bucket. For TimeRange- or
Interval-keyed source events, the event contributes to every bucket
it overlaps under half-open overlap rules.
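On a fixed-width grid, the half-open rule reduces to simple index arithmetic. The sketch below assumes a grid anchored at time 0 with a step in milliseconds. How Sequence.every actually anchors its grid is not covered here, and the function names are hypothetical.

```typescript
// Assumption: buckets are [k * stepMs, (k + 1) * stepMs), anchored at time 0.

// Point events: the bucket whose half-open span contains t.
// Math.floor sends a timestamp exactly on a boundary to the later bucket.
function bucketOf(t: number, stepMs: number): number {
  return Math.floor(t / stepMs);
}

// Interval events [begin, end): every bucket k with k*step < end and begin < (k+1)*step.
function bucketsOverlapping(begin: number, end: number, stepMs: number): number[] {
  if (!(end > begin)) throw new RangeError('expected end > begin');
  const first = Math.floor(begin / stepMs);
  // An end that sits exactly on a boundary does not reach the next bucket.
  const last = Math.ceil(end / stepMs) - 1;
  const out: number[] = [];
  for (let k = first; k <= last; k++) out.push(k);
  return out;
}

bucketOf(4999, 5000); // 0
bucketOf(5000, 5000); // 1
bucketsOverlapping(0, 5000, 5000); // [0]
bucketsOverlapping(4000, 6000, 5000); // [0, 1]
```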
range
Default range is the source's own time extent. Pass options.range
to pin the output range explicitly — useful when you want a uniform
grid over a specific window even if the source doesn't fill it:
import { Sequence, TimeRange } from 'pond-ts';
cpu.aggregate(
Sequence.every('5s'),
{ cpu: 'avg' },
{ range: new TimeRange({ start: 0, end: 60_000 }) },
);
Empty buckets in the padded range emit 0 for count and sum,
undefined for everything else. See
Reducer reference → Empty-bucket behavior.
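Here is a minimal sketch of the padding behavior. It assumes point events with a single numeric column and a grid that starts at range.start. The count here counts defined values, which is an assumption; the Reducer reference has pond-ts's exact rule. All names are hypothetical.

```typescript
// Hypothetical shapes for a point-event series with one numeric column.
type Point = { t: number; value: number | undefined };
type BucketRow = {
  begin: number;
  end: number;
  count: number;
  sum: number;
  avg: number | undefined;
};

// Walk a fixed grid across the requested range (not just the data's extent),
// so buckets with no events still produce a row.
function aggregateOverRange(
  points: readonly Point[],
  stepMs: number,
  range: { start: number; end: number },
): BucketRow[] {
  const rows: BucketRow[] = [];
  for (let begin = range.start; begin < range.end; begin += stepMs) {
    const end = begin + stepMs;
    const nums = points
      .filter((p) => p.t >= begin && p.t < end) // half-open [begin, end)
      .map((p) => p.value)
      .filter((v): v is number => v !== undefined);
    const sum = nums.reduce((a, b) => a + b, 0); // 0 for an empty bucket
    rows.push({
      begin,
      end,
      count: nums.length, // 0 for an empty bucket
      sum,
      avg: nums.length > 0 ? sum / nums.length : undefined, // undefined when empty
    });
  }
  return rows;
}

const grid = aggregateOverRange(
  [{ t: 0, value: 2 }, { t: 1000, value: 4 }, { t: 7000, value: 6 }],
  5000,
  { start: 0, end: 15_000 },
);
// grid[2] -> { begin: 10000, end: 15000, count: 0, sum: 0, avg: undefined }
```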
reduce
Whole-series → scalar or record. Same reducer vocabulary as
aggregate, same empty-bucket behavior.
// Single-column form — wide return type.
const avgRaw = series.reduce('cpu', 'avg'); // ColumnValue | undefined
The single-column form returns the wide ColumnValue | undefined —
the type system can't see through a string reducer name. Use the
mapping form whenever you want narrowed types at the call site.
Mapping form returns a record with per-field narrowing:
const summary = series.reduce({
cpu: 'avg', // number | undefined
requests: 'sum', // number | undefined
host: 'unique', // ReadonlyArray<string> | undefined
});
Named outputs work too:
const quantiles = series.reduce({
p50: { from: 'latency', using: 'p50' },
p95: { from: 'latency', using: 'p95' },
p99: { from: 'latency', using: 'p99' },
});
See Reducer reference → Type narrowing on reduce for the full narrowing story.
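One way a mapping form can narrow per field: TypeScript infers the mapping object with a literal reducer name for each key, and a mapped type can translate those names into output types. Below is a simplified sketch of that idea over plain rows. The reducer-to-type table, the Row type, and reduceRows are hypothetical and are not pond-ts's real signatures.

```typescript
// Hypothetical reducer names and the output type each one produces.
type ReducerName = 'avg' | 'sum' | 'count' | 'min' | 'max' | 'unique';
type OutputOf<R extends ReducerName> = R extends 'unique'
  ? ReadonlyArray<string> | undefined
  : number | undefined;

type Row = Record<string, number | string | undefined>;
type Mapping = Record<string, ReducerName>;
// Mapped type: each key's output type follows from its literal reducer name.
type Reduced<M extends Mapping> = { [K in keyof M]: OutputOf<M[K]> };

function applyReducer(
  name: ReducerName,
  values: ReadonlyArray<number | string | undefined>,
): unknown {
  const defined = values.filter((v): v is number | string => v !== undefined);
  const nums = defined.filter((v): v is number => typeof v === 'number');
  const sum = nums.reduce((a, b) => a + b, 0);
  switch (name) {
    case 'count': return defined.length; // 0 on an empty series
    case 'sum': return sum; // 0 on an empty series
    case 'avg': return nums.length > 0 ? sum / nums.length : undefined;
    case 'min': return nums.length > 0 ? Math.min(...nums) : undefined;
    case 'max': return nums.length > 0 ? Math.max(...nums) : undefined;
    case 'unique': return defined.length > 0 ? [...new Set(defined.map(String))] : undefined;
  }
}

// The whole series is one bucket: collect each source column, then reduce it.
function reduceRows<M extends Mapping>(rows: readonly Row[], mapping: M): Reduced<M> {
  const m: Mapping = mapping;
  const out: Record<string, unknown> = {};
  for (const key of Object.keys(m)) {
    out[key] = applyReducer(m[key], rows.map((row) => row[key]));
  }
  // The loop can't prove the mapped type, so assert it.
  return out as unknown as Reduced<M>;
}

const summary = reduceRows(
  [
    { cpu: 2, requests: 10, host: 'a' },
    { cpu: 4, requests: 5, host: 'b' },
    { cpu: undefined, requests: 1, host: 'a' },
  ],
  { cpu: 'avg', requests: 'sum', host: 'unique' },
);
// summary.cpu: number | undefined; summary.host: ReadonlyArray<string> | undefined
```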
arrayAggregate
Reduce one array cell per event — feeds each event's array to a reducer as if it were a bucket. Per-event in / per-event out.
// "Number of tags per event" — replace the array column with a count.
events.arrayAggregate('tags', 'count');
// Same, but keep the original array column too — append a new
// `tagCount` number column alongside.
events.arrayAggregate('tags', 'count', { as: 'tagCount' });
// Deduplicate each event's array; 'unique' returns an array, so the column stays array-kind.
events.arrayAggregate('tags', 'unique');
Full treatment, including custom reducers and the flatten-one-level semantics on array-kind sources, lives on Array columns.
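The per-event behavior described above treats each event's array as its own bucket. Here is a minimal sketch over plain row objects. It assumes that a missing or non-array cell reduces like an empty bucket, and that as names a new column while omitting it overwrites the source column. arrayAggregateRows and the two reducers are hypothetical. The sketch ignores the flatten-one-level behavior covered on Array columns.

```typescript
type Row = Record<string, unknown>;
type ArrayReducer = (values: ReadonlyArray<unknown>) => unknown;

// Hypothetical stand-ins for the built-in 'count' and 'unique' reducers,
// following the empty-bucket rule: count gives 0, unique gives undefined.
const count: ArrayReducer = (values) => values.filter((v) => v !== undefined).length;
const unique: ArrayReducer = (values) => {
  const defined = values.filter((v) => v !== undefined);
  return defined.length > 0 ? [...new Set(defined)] : undefined;
};

// Reduce one array cell per row. Without `as`, the result replaces the source
// column; with `as`, it lands in a new column and the array is kept.
function arrayAggregateRows(
  rows: readonly Row[],
  col: string,
  reducer: ArrayReducer,
  options: { as?: string } = {},
): Row[] {
  return rows.map((row) => {
    const cell = row[col];
    // Assumption: a missing or non-array cell reduces as an empty bucket.
    const values: ReadonlyArray<unknown> = Array.isArray(cell) ? cell : [];
    return { ...row, [options.as ?? col]: reducer(values) };
  });
}

const events: Row[] = [{ tags: ['a', 'b', 'a'] }, { tags: [] }];
const counted = arrayAggregateRows(events, 'tags', count);
// counted -> [{ tags: 3 }, { tags: 0 }]
const withCount = arrayAggregateRows(events, 'tags', count, { as: 'tagCount' });
// withCount[0] -> { tags: ['a', 'b', 'a'], tagCount: 3 }
const deduped = arrayAggregateRows(events, 'tags', unique);
// deduped[0] -> { tags: ['a', 'b'] }
```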
LiveAggregation
The live counterpart to aggregate. Buckets close incrementally as
the watermark advances:
import { LiveAggregation, LiveSeries, Sequence } from 'pond-ts';
const live = new LiveSeries({
name: 'cpu',
schema,
retention: { maxAge: '1h' },
});
const agg = new LiveAggregation(live, Sequence.every('1m'), {
cpu: 'avg',
host: 'last',
});
// Closed buckets stream out.
agg.on('close', (event) => storeMetric(event));
// Provisional in-progress bucket on every contributing push.
agg.on('bucket', (event) => updateLiveTile(event));
// Snapshot at any time — includes the open bucket as the last row.
const snap = agg.snapshot();
LiveAggregation inherits the source's graceWindow for late-event
tolerance at bucket closure. See Live transforms →
Late-event scope for what
that does and doesn't propagate further down the pipeline.
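For intuition about what closing incrementally means, here is a minimal single-column model. It keeps running sums per open bucket, takes the watermark as the latest event time seen, and uses a grace window to delay closure. Every name is hypothetical. The closure rule is an assumption for illustration, not pond-ts's documented algorithm: a bucket closes once the watermark minus grace reaches its end, and events for already-closed buckets are dropped. The model skips provisional per-push bucket events and does not emit empty buckets.

```typescript
type ClosedBucket = { begin: number; end: number; count: number; avg: number };

class IncrementalAvg {
  // Open buckets: bucket index -> running sum and count.
  private readonly open = new Map<number, { sum: number; count: number }>();
  // Assumption: the watermark is the largest event time pushed so far.
  private watermark = -Infinity;
  private readonly stepMs: number;
  private readonly graceMs: number;
  private readonly onClose: (bucket: ClosedBucket) => void;

  constructor(stepMs: number, graceMs: number, onClose: (bucket: ClosedBucket) => void) {
    this.stepMs = stepMs;
    this.graceMs = graceMs;
    this.onClose = onClose;
  }

  push(t: number, value: number): void {
    const k = Math.floor(t / this.stepMs); // half-open bucket [k*step, (k+1)*step)
    if (this.isClosed(k)) return; // too late even with grace: drop it
    const acc = this.open.get(k) ?? { sum: 0, count: 0 };
    acc.sum += value;
    acc.count += 1;
    this.open.set(k, acc);
    this.watermark = Math.max(this.watermark, t);
    this.closeReady();
  }

  private isClosed(k: number): boolean {
    return (k + 1) * this.stepMs <= this.watermark - this.graceMs;
  }

  // Emit every open bucket whose end is now behind watermark - grace, oldest first.
  private closeReady(): void {
    const ready = [...this.open.keys()]
      .filter((k) => this.isClosed(k))
      .sort((a, b) => a - b);
    for (const k of ready) {
      const acc = this.open.get(k)!;
      this.open.delete(k);
      this.onClose({
        begin: k * this.stepMs,
        end: (k + 1) * this.stepMs,
        count: acc.count,
        avg: acc.sum / acc.count,
      });
    }
  }
}

const closed: ClosedBucket[] = [];
const model = new IncrementalAvg(1000, 0, (b) => closed.push(b));
model.push(100, 1);
model.push(900, 3); // both land in bucket [0, 1000)
model.push(1500, 10); // watermark 1500 passes 1000: bucket [0, 1000) closes
model.push(200, 5); // bucket [0, 1000) already closed: dropped
model.push(2000, 4); // watermark 2000 reaches 2000: bucket [1000, 2000) closes
```

A larger grace window keeps a bucket open longer, so a late push that would otherwise be dropped still counts.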
Method comparison
| Method | Returns |
|---|---|
| aggregate(seq, mapping) | TimeSeries<S> (one row per bucket) |
| reduce(col, reducer) | Wide ColumnValue \| undefined |
| reduce(mapping) | Record<key, narrowed> |
| arrayAggregate(col, reducer) | TimeSeries<S> (one cell reduced per event) |
| new LiveAggregation(src, seq, mapping) | Stream of closed buckets + provisional bucket events |
For groupBy and pivotByGroup see Reshaping — those
are partition / long-to-wide ops, not collapses.