Aggregation

aggregate(seq, mapping) collapses events onto a Sequence grid — one Interval-keyed output row per bucket, computed by reducing every source event whose key falls inside the bucket.

This is the right operator when the question is "what's the rollup of every event in this bucket?" — per-minute averages, hourly counts, daily peaks. For "what was the value at this grid point?", reach for Alignment instead.

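This page covers four shapes of "collapse many events into fewer":

  • aggregate: bucket events onto a Sequence grid, one output row per bucket.
  • reduce: collapse a whole series into a scalar or a record.
  • byColumn: bucket by a numeric value column instead of the temporal key.
  • arrayAggregate: reduce one array cell per event.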

Plus the live-side counterpart: LiveAggregation (incremental bucketed aggregation over a LiveSeries).

For partition-by-column (groupBy) and long-to-wide reshape (pivotByGroup), see Reshaping — those are shape-changing ops, not value collapses.

Coming from pondjs?
  • pondjs Aggregator → aggregate() for batch, LiveAggregation for streaming.
  • Pipeline + Processor → method chain on TimeSeries; a custom reducer function fills the Processor role.
  • Index-keyed buckets → Interval-keyed events. The full rename table is on Concepts.

aggregate

Figure: aggregate over a 5-minute grid with a sum reducer. Irregularly spaced request events land in three sequence buckets; each bucket emits one output event whose value is the reducer applied to every source event inside it.

import { Sequence, TimeSeries } from 'pond-ts';

const fiveMin = cpu.aggregate(Sequence.every('5m'), {
  cpu: 'avg',
  requests: 'sum',
  host: 'last',
});
// fiveMin.schema[0].kind === 'interval'
// fiveMin.at(0)?.get('cpu') -> number | undefined

The output is interval-keyed. To feed it into a transform that requires a time-keyed series (e.g. pivotByGroup, which throws at runtime on interval keys), rekey first with .asTime({ at: 'begin' }).

Mapping shapes

Two forms; mix them in one call:

cpu.aggregate(Sequence.every('5s'), {
  // shorthand: "avg of cpu, output column same name"
  cpu: 'avg',

  // explicit { from, using, kind? }: rename the output column
  cpu_p95: { from: 'cpu', using: 'p95' },

  // multiple reducers over the same source column
  requests_total: { from: 'requests', using: 'sum' },
  requests_peak: { from: 'requests', using: 'max' },
});

Use the explicit form when you want to rename, run multiple reducers over one source column, or use a custom reducer whose output kind differs from the source.

Mixing the two forms keeps every output column in the result type, each narrowed per its reducer — cpu and requests_total come back as number | undefined, a { from: 'host', using: 'first' } spec as string | undefined, and so on. (Before v0.23.0 a mixed mapping silently dropped the { from, using }-keyed columns from the result type — they were still emitted at runtime, but event.get('cpu_p95') wouldn't type-check. The single unified mapping shape fixes this.)

Shorthand entries stay validated at compile time: a reducer that doesn't fit the source column's kind (host: 'avg' on a string column) or a bare reducer on a column that doesn't exist (ghost: 'avg' — a typo) is a type error. Spec keys are free output names — only their from must name a real column.
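
One way a mapping type can reject both mistakes at compile time is sketched below in plain TypeScript, with no pond-ts import. The Row schema, the reducer-name unions, and the Shorthand type are hypothetical stand-ins for illustration, not the library's actual types.

```typescript
// Hypothetical schema and reducer names, for this sketch only.
type Row = { cpu: number; requests: number; host: string };
type AnyKindReducer = 'first' | 'last' | 'count';
type NumericReducer = AnyKindReducer | 'avg' | 'sum' | 'min' | 'max';

// Shorthand keys must be real columns, and the reducer names allowed
// for a key depend on that column's kind.
type Shorthand<R> = {
  [K in keyof R]?: R[K] extends number ? NumericReducer : AnyKindReducer;
};

const ok: Shorthand<Row> = { cpu: 'avg', requests: 'sum', host: 'last' };

// @ts-expect-error 'avg' is not a valid reducer for the string column `host`.
const wrongKind: Shorthand<Row> = { host: 'avg' };

// @ts-expect-error `ghost` is not a column of Row (excess property check).
const typo: Shorthand<Row> = { ghost: 'avg' };

const acceptedColumns = Object.keys(ok);
```

The two @ts-expect-error lines only compile because the assignments beneath them are genuine type errors; remove either mistake and the compiler flags the now-unneeded directive instead.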

Custom reducers

Reducers are (values: ReadonlyArray<ColumnValue | undefined>) => ColumnValue | undefined. Filter the undefineds yourself if needed; the input is the raw bucket.

Two shapes:

// Same column in, same column out — shorthand key.
cpu.aggregate(Sequence.every('5s'), {
  cpu: (values) => {
    const nums = values.filter((v): v is number => typeof v === 'number');
    if (nums.length === 0) return undefined;
    const sorted = [...nums].sort((a, b) => a - b);
    return sorted[Math.floor(sorted.length * 0.999)];
  },
});

// Renamed output — wrap the function in the explicit spec.
cpu.aggregate(Sequence.every('5s'), {
  cpu_p99_9: {
    from: 'cpu',
    using: (values) => {
      const nums = values.filter((v): v is number => typeof v === 'number');
      if (nums.length === 0) return undefined;
      const sorted = [...nums].sort((a, b) => a - b);
      return sorted[Math.floor(sorted.length * 0.999)];
    },
    kind: 'number',
  },
});
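
Both shapes above repeat the same function body, so it can help to build the reducer once and reuse it. The sketch below is self-contained: ColumnValue is a local stand-in for the library type (assumed narrower than the real union), and quantileReducer is a hypothetical helper, not a pond-ts export.

```typescript
// Local stand-in for the library's ColumnValue (assumption: the real union
// is wider; number and string are enough for this sketch).
type ColumnValue = number | string;
type Reducer = (
  values: ReadonlyArray<ColumnValue | undefined>,
) => ColumnValue | undefined;

// Hypothetical factory: returns a reducer that picks the value at rank
// floor(n * q), the same index rule as the inline examples.
function quantileReducer(q: number): Reducer {
  return (values) => {
    // The input is the raw bucket, so drop undefined and non-numeric cells.
    const nums = values.filter((v): v is number => typeof v === 'number');
    if (nums.length === 0) return undefined;
    const sorted = [...nums].sort((a, b) => a - b);
    // Clamp so q = 1 still indexes the last element.
    const index = Math.min(sorted.length - 1, Math.floor(sorted.length * q));
    return sorted[index];
  };
}

const p999 = quantileReducer(0.999);
const median = quantileReducer(0.5);
```

A reducer built this way should slot into either shape: as the shorthand value (cpu: p999) or as the using field of an explicit spec.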

Bucket membership

Buckets are half-open: [begin, end). An event at exactly the boundary lands in the later bucket. For TimeRange- or Interval-keyed source events, the event contributes to every bucket it overlaps under half-open overlap rules.
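
The half-open rule can be sketched with plain arithmetic. The helpers below are hypothetical and assume a fixed-width grid anchored at epoch 0; they illustrate the membership rule only and are not how pond-ts computes it.

```typescript
// Hypothetical helper: the bucket [begin, end) that owns timestamp t.
function bucketFor(t: number, widthMs: number): { begin: number; end: number } {
  const begin = Math.floor(t / widthMs) * widthMs;
  return { begin, end: begin + widthMs };
}

// Hypothetical helper: begin times of every bucket that a ranged event
// [begin, end) overlaps under half-open rules. Assumes end > begin.
function bucketsOverlapping(begin: number, end: number, widthMs: number): number[] {
  const begins: number[] = [];
  for (let b = Math.floor(begin / widthMs) * widthMs; b < end; b += widthMs) {
    begins.push(b);
  }
  return begins;
}

const fiveSec = 5_000;
const onBoundary = bucketFor(5_000, fiveSec); // the later bucket: [5000, 10000)
const justBefore = bucketFor(4_999, fiveSec); // the earlier bucket: [0, 5000)

// A range ending exactly on a boundary does not reach the next bucket.
const endsOnBoundary = bucketsOverlapping(4_000, 5_000, fiveSec); // [0]
const spansThree = bucketsOverlapping(4_000, 11_000, fiveSec); // [0, 5000, 10000]
```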

range

Default range is the source's own time extent. Pass options.range to pin the output range explicitly — useful when you want a uniform grid over a specific window even if the source doesn't fill it:

cpu.aggregate(
  Sequence.every('5s'),
  { cpu: 'avg' },
  { range: new TimeRange({ start: 0, end: 60_000 }) },
);

Empty buckets in the padded range emit 0 for count and sum, undefined for everything else. See Reducer reference → Empty-bucket behavior.
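
To make the padded-range behavior concrete, here is a minimal sketch of a rollup over a pinned range. The rollup function and its row shape are hypothetical stand-ins written for this page, not the library's implementation; they only mirror the rule that count and sum fall back to 0 while avg falls back to undefined.

```typescript
type Point = { t: number; cpu: number };
type Row = {
  begin: number;
  end: number;
  count: number;
  sum: number;
  avg: number | undefined;
};

// Hypothetical stand-in for aggregate with a pinned range: one row per
// bucket in [rangeStart, rangeEnd), whether or not any event lands there.
function rollup(
  points: ReadonlyArray<Point>,
  widthMs: number,
  rangeStart: number,
  rangeEnd: number,
): Row[] {
  const rows: Row[] = [];
  for (let begin = rangeStart; begin < rangeEnd; begin += widthMs) {
    const end = begin + widthMs;
    const inside = points
      .filter((p) => p.t >= begin && p.t < end)
      .map((p) => p.cpu);
    const sum = inside.reduce((a, b) => a + b, 0);
    rows.push({
      begin,
      end,
      count: inside.length,
      sum,
      // No events means no average, so the bucket reports undefined.
      avg: inside.length === 0 ? undefined : sum / inside.length,
    });
  }
  return rows;
}

// Two events in the first 5 s bucket, then 55 s of silence.
const rows = rollup(
  [{ t: 1_000, cpu: 40 }, { t: 3_000, cpu: 60 }],
  5_000,
  0,
  60_000,
);
```

The grid still has twelve rows; only the first carries data, and the other eleven carry the empty values.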

reduce

Whole-series → scalar or record. Same reducer vocabulary as aggregate, same empty-bucket behavior.

// Single-column form — wide return type.
const avgRaw = series.reduce('cpu', 'avg'); // ColumnValue | undefined

Type narrowing

The single-column form returns the wide ColumnValue | undefined — the type system can't see through a string reducer name. Use the mapping form whenever you want narrowed types at the call site.

Mapping form returns a record with per-field narrowing:

const summary = series.reduce({
  cpu: 'avg', // number | undefined
  requests: 'sum', // number | undefined
  host: 'unique', // ReadonlyArray<string> | undefined
});

Named outputs work too:

const quantiles = series.reduce({
  p50: { from: 'latency', using: 'p50' },
  p95: { from: 'latency', using: 'p95' },
  p99: { from: 'latency', using: 'p99' },
});

See Reducer reference → Type narrowing on reduce for the full narrowing story.

byColumn

aggregate buckets the temporal key; byColumn buckets a numeric value column. Same reducer mapping, but each output row is one value-bin — per-kilometre splits over cumulative distance, a power histogram, time-in-zone over heart-rate edges.

// Per-kilometre elevation gain and average speed over a ride.
const splits = ride.byColumn(
  'cumDist',
  { width: 1000 },
  {
    gain: { from: 'ele', using: 'sum' },
    speed: { from: 'speed', using: 'avg' },
  },
);
// → [{ start: 0, end: 1000, gain: 12.4, speed: 7.1 },
//    { start: 1000, end: 2000, gain: 8.0, speed: 6.4 }, ...]

The result is a plain array of bin records, each { start, end, ...aggregates }, not a TimeSeries. Value-bins (distance, power) aren't time-indexed, so there's no key to build one from; this mirrors reduce's plain-record return. Reach for byColumn whenever the x-axis of your rollup is a value rather than time.

Even bins — { width, origin? }

Fixed-width bins (shifted by origin, default 0), emitted contiguously from the lowest to the highest occupied bin — interior empty bins included, so a histogram or profile has no gaps:

// Power distribution: seconds spent in each 25 W band.
ride.byColumn('watts', { width: 25 }, {
  secs: { from: 'watts', using: 'count' },
});
// → [{ start: 0, end: 25, secs: 3 }, { start: 25, end: 50, secs: 0 }, ...]

A monotonic source (cumulative distance, work) yields contiguous ranges — splits, an elevation-vs-distance profile. A non-monotonic source (instantaneous power) yields a histogram. Same call, both shapes.
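
The contiguous-emission rule is easy to see in a small re-implementation. The evenBins function below is a hypothetical sketch with a built-in count reducer, assuming one sample per second so the count reads as seconds; it is not the library's code.

```typescript
type Bin = { start: number; end: number; secs: number };

// Hypothetical even binning: fixed-width bins shifted by origin, emitted
// from the lowest to the highest occupied bin with empty ones included.
function evenBins(values: ReadonlyArray<number>, width: number, origin = 0): Bin[] {
  const finite = values.filter((v) => Number.isFinite(v));
  if (finite.length === 0) return [];

  const indices = finite.map((v) => Math.floor((v - origin) / width));
  const lo = Math.min(...indices);
  const hi = Math.max(...indices);

  // Pre-create every bin in [lo, hi] so interior gaps show up as zeros.
  const bins: Bin[] = [];
  for (let i = lo; i <= hi; i++) {
    bins.push({ start: origin + i * width, end: origin + (i + 1) * width, secs: 0 });
  }
  for (const i of indices) {
    bins[i - lo].secs += 1;
  }
  return bins;
}

// Three samples in [0, 25), none in [25, 50), one in [50, 75).
const histogram = evenBins([10, 12, 20, 60], 25);
```

The empty [25, 50) band still appears with secs: 0, which is what keeps a plotted histogram free of gaps.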

Explicit bins — { edges }

Ascending edges [e₀, …, eₙ] give n bins, bin i = [eᵢ, eᵢ₊₁) — one record per bin, in order. Use them for named zones:

// Time in each power zone (FTP-relative edges).
ride.byColumn('watts', { edges: [0, 150, 250, 350, 500] }, {
  secs: { from: 'watts', using: 'count' },
});
// → one record per [0,150), [150,250), [250,350), [350,500)

A value below the first edge or at/above the last is out of range, and that row is dropped.
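
A minimal sketch of the edge lookup follows. binIndex and timeInZone are hypothetical helpers written for illustration; they assume ascending edges and one sample per second, and they are not pond-ts functions.

```typescript
// Hypothetical lookup: index of the bin [edges[i], edges[i + 1]) holding v,
// or -1 when v is out of range or not finite.
function binIndex(edges: ReadonlyArray<number>, v: number): number {
  if (!Number.isFinite(v)) return -1;
  if (edges.length < 2 || v < edges[0] || v >= edges[edges.length - 1]) return -1;
  for (let i = 0; i < edges.length - 1; i++) {
    if (v < edges[i + 1]) return i;
  }
  return -1;
}

// Count samples per zone; rows that fall outside every bin are dropped.
function timeInZone(edges: ReadonlyArray<number>, watts: ReadonlyArray<number>): number[] {
  const secs: number[] = [];
  for (let i = 0; i < edges.length - 1; i++) secs.push(0);
  for (const w of watts) {
    const i = binIndex(edges, w);
    if (i >= 0) secs[i] += 1;
  }
  return secs;
}

const zones = [0, 150, 250, 350, 500];
// 500 sits at the last edge and -5 sits below the first, so both are dropped.
const perZone = timeInZone(zones, [100, 150, 249, 500, -5, 420]);
```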

Edge cases

  • A row whose bin value is missing or non-finite contributes to no bin. Within a bin, the reducer non-finite policy still applies to the source columns.
  • Empty bins emit each reducer's empty value (count → 0, avg / min / … → undefined), exactly like an empty aggregate bucket.
  • start and end are reserved output names (they carry the bin range), and a non-finite or wrong-kind reducer result throws — the same write-time guard aggregate enforces.

arrayAggregate

Reduce one array cell per event — feeds each event's array to a reducer as if it were a bucket. Per-event in / per-event out.

// "Number of tags per event" — replace the array column with a count.
events.arrayAggregate('tags', 'count');

// Same, but keep the original array column too — append a new
// `tagCount` number column alongside.
events.arrayAggregate('tags', 'count', { as: 'tagCount' });

// Distinct values per event, deduped (keeps the array column).
events.arrayAggregate('tags', 'unique');

Full treatment, including custom reducers and the flatten-one-level semantics on array-kind sources, lives on Array columns.
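
The per-event in, per-event out shape can be mimicked with plain arrays. The sketch below uses a hypothetical Tagged record type rather than pond-ts events, and it assumes that 'unique' keeps first-seen order; check Array columns for the library's actual ordering.

```typescript
// Hypothetical event shape for this sketch; not a pond-ts type.
type Tagged = { t: number; tags: string[] };

const events: Tagged[] = [
  { t: 0, tags: ['a', 'b', 'a'] },
  { t: 1_000, tags: [] },
];

// Like 'count' with { as: 'tagCount' }: keep the array, append a number.
const withCount = events.map((e) => ({ ...e, tagCount: e.tags.length }));

// Like 'unique': the array column stays an array, with duplicates removed.
// Assumption: first-seen order is preserved.
const deduped = events.map((e) => ({ ...e, tags: Array.from(new Set(e.tags)) }));
```

Either way the output has exactly as many rows as the input; only the cell changes.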

LiveAggregation

The live counterpart to aggregate. Buckets close incrementally as the watermark advances:

import { LiveSeries, Sequence } from 'pond-ts';

const live = new LiveSeries({
  name: 'cpu',
  schema,
  retention: { maxAge: '1h' },
});

const agg = live.aggregate(Sequence.every('1m'), {
  cpu: 'avg',
  host: 'last',
});

// Closed buckets stream out.
agg.on('close', (event) => storeMetric(event));

// Provisional in-progress bucket on every contributing push.
agg.on('bucket', (event) => updateLiveTile(event));

// Snapshot at any time — includes the open bucket as the last row.
const snap = agg.snapshot();

LiveAggregation inherits the source's graceWindow for late-event tolerance at bucket closure. See Live transforms → Late-event scope for what that does and doesn't propagate further down the pipeline.
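
The interplay of watermark and grace can be sketched in a few lines. MiniLiveAvg below is a hypothetical miniature, not the pond-ts implementation, and it rests on two assumptions: the watermark is the largest event time seen so far, and a bucket closes once the watermark reaches its end plus the grace window.

```typescript
type ClosedBucket = { begin: number; end: number; avg: number };

// Hypothetical miniature of live bucketed averaging.
class MiniLiveAvg {
  readonly closed: ClosedBucket[] = [];
  private readonly open = new Map<number, { sum: number; n: number }>();
  private watermark = -Infinity;
  private readonly widthMs: number;
  private readonly graceMs: number;

  constructor(widthMs: number, graceMs = 0) {
    this.widthMs = widthMs;
    this.graceMs = graceMs;
  }

  push(t: number, value: number): void {
    const begin = Math.floor(t / this.widthMs) * this.widthMs;
    // Too late: this bucket's close deadline has passed, so drop the event.
    if (begin + this.widthMs + this.graceMs <= this.watermark) return;

    const acc = this.open.get(begin) ?? { sum: 0, n: 0 };
    acc.sum += value;
    acc.n += 1;
    this.open.set(begin, acc);

    // Assumption: the watermark is the largest event time seen so far.
    this.watermark = Math.max(this.watermark, t);
    this.closeReady();
  }

  private closeReady(): void {
    const begins = Array.from(this.open.keys()).sort((a, b) => a - b);
    for (const begin of begins) {
      const end = begin + this.widthMs;
      if (end + this.graceMs > this.watermark) break;
      const acc = this.open.get(begin);
      if (acc === undefined) continue;
      this.closed.push({ begin, end, avg: acc.sum / acc.n });
      this.open.delete(begin);
    }
  }
}

const strict = new MiniLiveAvg(60_000);
strict.push(0, 10);
strict.push(30_000, 20);
strict.push(60_000, 30); // watermark reaches the first bucket's end: it closes

const lenient = new MiniLiveAvg(60_000, 5_000);
lenient.push(0, 10);
lenient.push(60_000, 30); // first bucket stays open for 5 s of grace
lenient.push(59_000, 20); // late, but within grace: still counted
lenient.push(65_000, 40); // watermark passes end + grace: first bucket closes
```

Both instances end up with one closed bucket averaging 15; the lenient one gets there only because the late 59 s event was still accepted.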

Method comparison

Method                        Returns
aggregate(seq, mapping)       TimeSeries<S>, one row per bucket
reduce(col, reducer)          Wide ColumnValue | undefined
reduce(mapping)               Record<key, narrowed>
byColumn(col, binning, m)     Array<{ start, end, ...aggregates }>, one per value-bin
arrayAggregate(col, reducer)  TimeSeries<S>, one cell reduced per event
LiveAggregation(src, seq, m)  Stream of closed buckets + provisional bucket events

For groupBy and pivotByGroup see Reshaping — those are partition / long-to-wide ops, not collapses.

Compared to align

Same parameter shape (sequence + range), same output key shape (Interval-keyed grid), different reduction.

Question                                            Operator
"What's the rollup of every event in this bucket?"  aggregate(seq, mapping)
"What was the value at this grid point?"            align(seq, { method, sample })

// aggregate: one row per minute, value reduced from sources inside.
cpu.aggregate(Sequence.every('1m'), { cpu: 'avg' });

// align — one row per minute, value sampled from the source.
cpu.align(Sequence.every('1m'), { method: 'hold' });

For "average CPU per minute," aggregate is right. For a chart at one-minute resolution where every grid point should have a value, align is right. The two are complementary.
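
The difference shows up as soon as a bucket has no events. The sketch below contrasts the two on plain arrays; avgInBucket and holdAt are hypothetical helpers, and the hold rule used here (most recent sample at or before the grid point) is an assumption about what method: 'hold' means, so check Alignment for the exact semantics.

```typescript
type Sample = { t: number; v: number };
const minute = 60_000;

// Rollup: average of the samples inside [begin, begin + minute).
function avgInBucket(samples: ReadonlyArray<Sample>, begin: number): number | undefined {
  const inside = samples.filter((s) => s.t >= begin && s.t < begin + minute);
  if (inside.length === 0) return undefined;
  return inside.reduce((acc, s) => acc + s.v, 0) / inside.length;
}

// Sampling: the most recent sample at or before the grid point.
// Assumes samples are sorted by time.
function holdAt(samples: ReadonlyArray<Sample>, at: number): number | undefined {
  let held: number | undefined;
  for (const s of samples) {
    if (s.t <= at) held = s.v;
  }
  return held;
}

const samples: Sample[] = [
  { t: 10_000, v: 1 },
  { t: 50_000, v: 3 },
  { t: 130_000, v: 5 },
];
const grid = [0, minute, 2 * minute];

const rolled = grid.map((g) => avgInBucket(samples, g)); // [2, undefined, 5]
const held = grid.map((g) => holdAt(samples, g)); // [undefined, 3, 3]
```

The rollup reports a gap for the silent second minute, while the held series carries the last known value across it.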

See Alignment for the sampling counterpart.