Aggregation

The deep dive on aggregate and its siblings. Already familiar with the conceptual split between aggregate and align? Skip the Sampling overview and start here.

This page covers three shapes of "collapse many events into fewer":

  • aggregate: many events per time bucket collapse to one output event per bucket.
  • reduce: the whole series collapses to a single scalar or record.
  • arrayAggregate: each event's array cell collapses to a single value, per event.

Plus the live-side counterpart: LiveAggregation (incremental bucketed aggregation over a LiveSeries).

For partition-by-column (groupBy) and long-to-wide reshape (pivotByGroup), see Reshaping — those are shape-changing ops, not value collapses.

Coming from pondjs?
  • pondjs Aggregator → aggregate() for batch, LiveAggregation for streaming.
  • Pipeline + Processor → method chain on TimeSeries; a custom reducer function fills the Processor role (sketched just below).
  
  • Index-keyed buckets → Interval-keyed events. The full rename table is on Concepts.
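
A rough before/after for the second bullet; the pondjs side is paraphrased rather than exact API, and the pond-ts side uses only calls shown on this page:

// pondjs (paraphrased): Pipeline().from(series).windowBy('5m')
//   .aggregate({ ... }), with custom logic living in a Processor.
// pond-ts: a custom reducer slots straight into the mapping.
const peaks = cpu.aggregate(Sequence.every('5m'), {
  cpu: (values) => {
    const nums = values.filter((v): v is number => typeof v === 'number');
    return nums.length > 0 ? Math.max(...nums) : undefined;
  },
});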

aggregate

aggregate over a 5-minute grid with a sum reducer: irregularly spaced request events land in three sequence buckets; each bucket emits one output event whose value is the reducer applied to every source event inside it.

import { Sequence, TimeRange, TimeSeries } from 'pond-ts';

const fiveMin = cpu.aggregate(Sequence.every('5m'), {
  cpu: 'avg',
  requests: 'sum',
  host: 'last',
});
// fiveMin.schema[0].kind === 'interval'
// fiveMin.at(0)?.get('cpu') -> number | undefined

Mapping shapes

Two forms; mix them in one call:

cpu.aggregate(Sequence.every('5s'), {
  // shorthand: "avg of cpu, output column same name"
  cpu: 'avg',

  // explicit { from, using, kind? } — rename the output column
  cpu_p95: { from: 'cpu', using: 'p95' },

  // multiple reducers over the same source column
  requests_total: { from: 'requests', using: 'sum' },
  requests_peak: { from: 'requests', using: 'max' },
});

Use the explicit form when you want to rename, run multiple reducers over one source column, or use a custom reducer whose output kind differs from the source.

Custom reducers

Reducers are (values: ReadonlyArray<ColumnValue | undefined>) => ColumnValue | undefined. Filter the undefineds yourself if needed; the input is the raw bucket.

Two shapes:

// Same column in, same column out — shorthand key.
cpu.aggregate(Sequence.every('5s'), {
  cpu: (values) => {
    const nums = values.filter((v): v is number => typeof v === 'number');
    if (nums.length === 0) return undefined;
    const sorted = [...nums].sort((a, b) => a - b);
    return sorted[Math.floor(sorted.length * 0.999)];
  },
});

// Renamed output — wrap the function in the explicit spec.
cpu.aggregate(Sequence.every('5s'), {
  cpu_p99_9: {
    from: 'cpu',
    using: (values) => {
      const nums = values.filter((v): v is number => typeof v === 'number');
      if (nums.length === 0) return undefined;
      const sorted = [...nums].sort((a, b) => a - b);
      return sorted[Math.floor(sorted.length * 0.999)];
    },
    kind: 'number',
  },
});
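
Both examples above keep the numeric kind. A minimal sketch of a reducer whose output kind differs from its numeric source; 'string' as a kind value is an assumption (only 'number' appears on this page):

cpu.aggregate(Sequence.every('5s'), {
  cpu_level: {
    from: 'cpu',
    using: (values) => {
      const nums = values.filter((v): v is number => typeof v === 'number');
      if (nums.length === 0) return undefined;
      const avg = nums.reduce((a, b) => a + b, 0) / nums.length;
      return avg > 80 ? 'hot' : 'ok'; // numeric source, string output
    },
    kind: 'string', // assumption: kind names mirror 'number'
  },
});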

Bucket membership

Buckets are half-open: [begin, end). An event at exactly the boundary lands in the later bucket. For TimeRange- or Interval-keyed source events, the event contributes to every bucket it overlaps under half-open overlap rules.
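
A minimal sketch of the boundary rule, over a hypothetical requests series with point events at exactly t = 299_999 ms and t = 300_000 ms:

// Half-open buckets keep the two events apart: the first falls in
// [0, 300_000), the second in [300_000, 600_000).
const counts = requests.aggregate(Sequence.every('5m'), { requests: 'count' });
// counts.at(0)?.get('requests') -> 1   (bucket [0, 300_000))
// counts.at(1)?.get('requests') -> 1   (bucket [300_000, 600_000))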

range

Default range is the source's own time extent. Pass options.range to pin the output range explicitly — useful when you want a uniform grid over a specific window even if the source doesn't fill it:

cpu.aggregate(
  Sequence.every('5s'),
  { cpu: 'avg' },
  { range: new TimeRange({ start: 0, end: 60_000 }) },
);

Empty buckets in the padded range emit 0 for count and sum, undefined for everything else. See Reducer reference → Empty-bucket behavior.
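
For instance, padding the one-minute window above with both an avg and a count column (the samples output name is illustrative):

const padded = cpu.aggregate(
  Sequence.every('5s'),
  { cpu: 'avg', samples: { from: 'cpu', using: 'count' } },
  { range: new TimeRange({ start: 0, end: 60_000 }) },
);
// For a bucket the source never touches:
// padded.at(0)?.get('cpu')     -> undefined  (avg of an empty bucket)
// padded.at(0)?.get('samples') -> 0          (count pads empty buckets with 0)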

reduce

Whole-series → scalar or record. Same reducer vocabulary as aggregate, same empty-bucket behavior.

// Single-column form — wide return type.
const avgRaw = series.reduce('cpu', 'avg'); // ColumnValue | undefined

Type narrowing

The single-column form returns the wide ColumnValue | undefined — the type system can't see through a string reducer name. Use the mapping form whenever you want narrowed types at the call site.

Mapping form returns a record with per-field narrowing:

const summary = series.reduce({
  cpu: 'avg', // number | undefined
  requests: 'sum', // number | undefined
  host: 'unique', // ReadonlyArray<string> | undefined
});

Named outputs work too:

const quantiles = series.reduce({
  p50: { from: 'latency', using: 'p50' },
  p95: { from: 'latency', using: 'p95' },
  p99: { from: 'latency', using: 'p99' },
});

See Reducer reference → Type narrowing on reduce for the full narrowing story.

arrayAggregate

Reduce one array cell per event — feeds each event's array to a reducer as if it were a bucket. Per-event in / per-event out.

// "Number of tags per event" — replace the array column with a count.
events.arrayAggregate('tags', 'count');

// Same, but keep the original array column too — append a new
// `tagCount` number column alongside.
events.arrayAggregate('tags', 'count', { as: 'tagCount' });

// Distinct values per event, deduped (keeps the array column).
events.arrayAggregate('tags', 'unique');
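
Custom reducers plug in with the same function signature as on aggregate; a sketch, assuming the function form is accepted here too:

// Longest tag per event, replacing the array column in place.
events.arrayAggregate('tags', (values) => {
  const strs = values.filter((v): v is string => typeof v === 'string');
  if (strs.length === 0) return undefined;
  return strs.reduce((a, b) => (b.length > a.length ? b : a));
});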

Full treatment, including custom reducers and the flatten-one-level semantics on array-kind sources, lives on Array columns.

LiveAggregation

The live counterpart to aggregate. Buckets close incrementally as the watermark advances:

import { LiveAggregation, LiveSeries, Sequence } from 'pond-ts';

const live = new LiveSeries({
  name: 'cpu',
  schema,
  retention: { maxAge: '1h' },
});

const agg = new LiveAggregation(live, Sequence.every('1m'), {
  cpu: 'avg',
  host: 'last',
});

// Closed buckets stream out.
agg.on('close', (event) => storeMetric(event));

// Provisional in-progress bucket on every contributing push.
agg.on('bucket', (event) => updateLiveTile(event));

// Snapshot at any time — includes the open bucket as the last row.
const snap = agg.snapshot();

LiveAggregation inherits the source's graceWindow for late-event tolerance at bucket closure. See Live transforms → Late-event scope for what that does and doesn't propagate further down the pipeline.
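
A sketch of where that tolerance comes from, assuming graceWindow is a LiveSeries option alongside retention (the exact option shape is documented on Live transforms):

// Assumption: graceWindow configured at the source.
const tolerant = new LiveSeries({
  name: 'cpu',
  schema,
  retention: { maxAge: '1h' },
  graceWindow: '30s',
});
// A LiveAggregation over `tolerant` tolerates events up to 30s late
// at bucket closure; later stragglers miss their bucket.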

Method comparison

Method                          Returns
aggregate(seq, mapping)         TimeSeries<S> — one row per bucket
reduce(col, reducer)            Wide ColumnValue | undefined
reduce(mapping)                 Record<key, narrowed>
arrayAggregate(col, reducer)    TimeSeries<S> — one cell reduced per event
LiveAggregation(src, seq, m)    Stream of closed buckets + provisional bucket events

For groupBy and pivotByGroup see Reshaping — those are partition / long-to-wide ops, not collapses.