Skip to main content

LiveSeries

LiveSeries is a mutable, append-optimized buffer that shares the same schema type system as TimeSeries. Use it when data arrives incrementally — from WebSockets, message queues, polling loops, or any streaming source — and you need to maintain a rolling window of recent events.

LiveSeries is a bounded in-order buffer with an opt-in grace window for moderate reordering at ingest. It's not a full streaming engine like Apache Beam or Apache Flink: no distributed execution, engine-managed watermarks, processing-time timers, checkpointed state, late panes, retractions, or exactly-once/replay guarantees. pond-ts instead uses a simpler "data is the clock, ordered ingest with bounded reorder tolerance" model. For the specifics of what that does and doesn't cover once data leaves the buffer, see Live Transforms → Late-event scope.

If you're coming from pandas: there's no direct equivalent — pandas works on already-materialized DataFrame / Series objects rather than a live source with subscribers. The closest analogue is a bounded deque you periodically .toTimeSeries() on and run batch methods against: aggregate() for .resample().agg(), align() for .asfreq() / fill / interpolation, rolling() for .rolling().agg(), and groupBy() for .groupby().

If you're coming from PondJS: there is no one-class equivalent to LiveSeries. PondJS split live work across Stream for unbounded input, Pipeline for streaming or batch processing, Aggregator for windowed/grouped aggregation inside a pipeline, and Collection / TimeSeries for event containers. LiveSeries is the pond-ts in-process push target and retained live source.

PondJSpond-ts
Stream + Pipeline().from(stream)LiveSeries.push(...) and live subscriptions
Pipeline().windowBy(...).aggregate(...)live.aggregate(...)LiveAggregation
pipeline processors and output callbacksmethod chain on LiveSeries / LiveView
Collection / TimeSeries event containersTimeSeries; live.toTimeSeries() snapshots
source- or pipeline-managed historyretention: { maxEvents / maxAge } on source

Creating a LiveSeries

import { LiveSeries } from 'pond-ts';

const schema = [
{ name: 'time', kind: 'time' },
{ name: 'cpu', kind: 'number' },
{ name: 'host', kind: 'string' },
] as const;

const live = new LiveSeries({
name: 'cpu-metrics',
schema,
});

The schema is identical to what you'd pass to TimeSeries. All the same column kinds and required: false options work here.

Pushing events

live.push([Date.now(), 0.42, 'api-1']);
live.push([Date.now(), 0.51, 'api-1'], [Date.now(), 0.38, 'api-2']);

push() accepts one or more rows. Each row is validated against the schema on arrival — invalid rows throw a ValidationError. Events must arrive in timestamp order by default (see ordering modes to change this).

Reading events

live.length; // number of events in the buffer
live.first(); // oldest event, or undefined
live.last(); // newest event, or undefined
live.at(0); // event by index
live.at(-1); // negative indices count from the end

Events are the same immutable Event objects used by TimeSeries.

Snapshots

Convert the current buffer contents into an immutable TimeSeries for batch analytics:

const ts = live.toTimeSeries();

// All TimeSeries operations work on the snapshot
const smoothed = ts.rolling('5m', { cpu: 'avg' });
const groups = ts.groupBy('host');

The snapshot is independent — future pushes to the LiveSeries don't affect it. You can pass a custom name: live.toTimeSeries('snapshot-12:05').

Retention policies

Control how much data the buffer holds. Retention is checked after every push() — no background timers, no implicit scheduling.

const live = new LiveSeries({
name: 'bounded',
schema,
retention: {
maxEvents: 1000, // keep at most 1000 events
maxAge: '30m', // drop events older than 30 minutes
},
});

Both policies can be combined. When both apply, the one that evicts the most events wins. Eviction always removes the oldest events first.

maxAge is relative to the latest event's timestamp, not wall-clock time. This follows the "data is the clock" principle — if no events arrive, nothing is evicted.

Duration strings support ms, s, m, h, d suffixes, or plain millisecond numbers.

Ordering modes

By default, LiveSeries expects events in timestamp order and throws on violations. Two alternative modes handle out-of-order data:

strict (default)

const live = new LiveSeries({ name: 'strict', schema });
live.push([1000, 0.5, 'a']);
live.push([500, 0.3, 'a']); // throws ValidationError

drop

Silently discard late events:

const live = new LiveSeries({
name: 'drop-late',
schema,
ordering: 'drop',
});
live.push([1000, 0.5, 'a']);
live.push([500, 0.3, 'a']); // silently dropped
live.length; // 1

reorder

Insert late events in sorted position:

const live = new LiveSeries({
name: 'reorder',
schema,
ordering: 'reorder',
});
live.push([2000, 0.6, 'a']);
live.push([1000, 0.5, 'a']); // inserted before the 2000 event
live.at(0)?.begin(); // 1000

Add a graceWindow to bound how late an event can be:

const live = new LiveSeries({
name: 'bounded-reorder',
schema,
ordering: 'reorder',
graceWindow: '5s',
});
live.push([10000, 0.5, 'a']);
live.push([8000, 0.4, 'a']); // ok, within 5s
live.push([1000, 0.1, 'a']); // throws, outside grace window

Without graceWindow, reorder accepts any timestamp.

Subscriptions

Subscribe to lifecycle events. All callbacks fire synchronously, inline with push().

// Per-event: fires once for each event added
const unsub = live.on('event', (event) => {
console.log('new:', event.get('cpu'));
});

// Batch: fires once per push() call with all added events
live.on('batch', (events) => {
console.log(`received ${events.length} events`);
});

// Evict: fires when retention removes events
live.on('evict', (events) => {
console.log(`evicted ${events.length} old events`);
});

Each on() call returns an unsubscribe function. Call it to stop receiving notifications:

const unsub = live.on('event', handler);
// later...
unsub();

Callback ordering

Within a single push() call:

  1. event — fires per event, inline with insertion
  2. Retention runs (may evict old events)
  3. batch — fires once with all events that were added
  4. evict — fires if retention removed events

clear() fires only the evict subscription.

Clearing

live.clear();

Removes all events and fires the evict subscription with the removed events. After clearing, the buffer is empty and ready for new pushes.

Typical patterns

WebSocket ingestion with rolling window

const live = new LiveSeries({
name: 'ws-metrics',
schema,
retention: { maxAge: '1h' },
});

ws.on('message', (msg) => {
const { timestamp, cpu, host } = JSON.parse(msg);
live.push([timestamp, cpu, host]);
});

// Periodic snapshot for rendering
setInterval(() => {
const ts = live.toTimeSeries();
renderChart(ts.rolling('5m', { cpu: 'avg' }));
}, 1000);

Monitoring with eviction alerts

const live = new LiveSeries({
name: 'monitor',
schema,
retention: { maxEvents: 500 },
});

live.on('evict', (evicted) => {
metrics.increment('events.evicted', evicted.length);
});

live.on('batch', (events) => {
for (const e of events) {
const cpu = e.get('cpu'); // number | undefined, narrowed from schema
if (cpu !== undefined && cpu > 0.9) {
alert(`High CPU on ${e.get('host')}`);
}
}
});

Tolerating out-of-order delivery

const live = new LiveSeries({
name: 'multi-source',
schema,
ordering: 'reorder',
graceWindow: '10s',
});

// Events from multiple sources may arrive slightly out of order
sourceA.on('data', (row) => live.push(row));
sourceB.on('data', (row) => live.push(row));