# LiveSeries
LiveSeries is a mutable, append-optimized buffer that shares the same
schema type system as TimeSeries.
Use it when data arrives incrementally — from WebSockets, message queues,
polling loops, or any streaming source — and you need to maintain a
rolling window of recent events.
LiveSeries is a bounded in-order buffer with an opt-in grace
window for moderate reordering at ingest. It's not a full streaming
engine like Apache Beam or Apache Flink: no distributed execution,
engine-managed watermarks, processing-time timers, checkpointed
state, late panes, retractions, or exactly-once/replay guarantees.
pond-ts instead uses a simpler "data is the clock, ordered ingest
with bounded reorder tolerance" model. For the specifics of what
that does and doesn't cover once data leaves the buffer, see
Live Transforms → Late-event scope.
If you're coming from pandas: there's no direct equivalent — pandas
works on already-materialized DataFrame / Series objects rather
than a live source with subscribers. The closest analogue is a
bounded deque you periodically .toTimeSeries() on and run batch
methods against: aggregate() for .resample().agg(), align() for
.asfreq() / fill / interpolation, rolling() for .rolling().agg(),
and groupBy() for .groupby().
If you're coming from PondJS: there is no one-class equivalent to
LiveSeries. PondJS split live work across Stream for unbounded
input, Pipeline for streaming or batch processing, Aggregator for
windowed/grouped aggregation inside a pipeline, and
Collection / TimeSeries for event containers. LiveSeries is the
pond-ts in-process push target and retained live source.
| PondJS | pond-ts |
|---|---|
| Stream + Pipeline().from(stream) | LiveSeries.push(...) and live subscriptions |
| Pipeline().windowBy(...).aggregate(...) | live.aggregate(...) → LiveAggregation |
| pipeline processors and output callbacks | method chain on LiveSeries / LiveView |
| Collection / TimeSeries event containers | TimeSeries; live.toTimeSeries() snapshots |
| source- or pipeline-managed history | retention: { maxEvents / maxAge } on source |
## Creating a LiveSeries

```typescript
import { LiveSeries } from 'pond-ts';

const schema = [
  { name: 'time', kind: 'time' },
  { name: 'cpu', kind: 'number' },
  { name: 'host', kind: 'string' },
] as const;

const live = new LiveSeries({
  name: 'cpu-metrics',
  schema,
});
```
The schema is identical to what you'd pass to TimeSeries. All the same column
kinds and required: false options work here.
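For instance, a schema with an optional column might look like the sketch below. The required: false option is the one described above; the column names here are illustrative:

```typescript
// Same schema shape as TimeSeries; 'region' is optional per-row.
const liveSchema = [
  { name: 'time', kind: 'time' },
  { name: 'cpu', kind: 'number' },
  { name: 'region', kind: 'string', required: false }, // may be missing on some rows
] as const;
```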
## Pushing events

```typescript
live.push([Date.now(), 0.42, 'api-1']);
live.push([Date.now(), 0.51, 'api-1'], [Date.now(), 0.38, 'api-2']);
```
push() accepts one or more rows. Each row is validated against the schema on
arrival — invalid rows throw a ValidationError. By default, events must arrive
in timestamp order (see Ordering modes below to change this).
## Reading events

```typescript
live.length;  // number of events in the buffer
live.first(); // oldest event, or undefined
live.last();  // newest event, or undefined
live.at(0);   // event by index
live.at(-1);  // negative indices count from the end
```
Events are the same immutable Event objects used by TimeSeries.
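The negative-index behavior of at() can be sketched with plain array arithmetic (illustrative only, not pond-ts internals):

```typescript
// Resolves an index the way Array.prototype.at does: negative values
// count back from the end, out-of-range indices yield undefined.
function resolveIndex(length: number, i: number): number | undefined {
  const idx = i < 0 ? length + i : i;
  return idx >= 0 && idx < length ? idx : undefined;
}

resolveIndex(5, -1); // 4  (the last element)
resolveIndex(5, 7);  // undefined
```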
## Snapshots

Convert the current buffer contents into an immutable TimeSeries for batch
analytics:

```typescript
const ts = live.toTimeSeries();

// All TimeSeries operations work on the snapshot
const smoothed = ts.rolling('5m', { cpu: 'avg' });
const groups = ts.groupBy('host');
```
The snapshot is independent — future pushes to the LiveSeries don't affect
it. You can pass a custom name: live.toTimeSeries('snapshot-12:05').
## Retention policies

Control how much data the buffer holds. Retention is checked after every
push() — no background timers, no implicit scheduling.

```typescript
const live = new LiveSeries({
  name: 'bounded',
  schema,
  retention: {
    maxEvents: 1000, // keep at most 1000 events
    maxAge: '30m',   // drop events older than 30 minutes
  },
});
```
Both policies can be combined. When both apply, the one that evicts the most events wins. Eviction always removes the oldest events first.
maxAge is relative to the latest event's timestamp, not wall-clock time.
This follows the "data is the clock" principle — if no events arrive, nothing
is evicted.
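As a standalone sketch of these rules (not the library's implementation), applying both policies to a sorted list of timestamps might look like:

```typescript
// Sketch: evict oldest-first until both maxEvents and maxAge hold.
// maxAge is measured against the newest timestamp, not Date.now().
function applyRetention(
  timestamps: number[], // sorted ascending
  opts: { maxEvents?: number; maxAgeMs?: number },
): number[] {
  let kept = timestamps;
  if (opts.maxEvents !== undefined && kept.length > opts.maxEvents) {
    kept = kept.slice(kept.length - opts.maxEvents); // drop oldest
  }
  if (opts.maxAgeMs !== undefined && kept.length > 0) {
    const latest = kept[kept.length - 1];
    kept = kept.filter((t) => latest - t <= opts.maxAgeMs!);
  }
  return kept;
}

applyRetention([0, 10_000, 60_000, 61_000], { maxAgeMs: 30_000 }); // [60_000, 61_000]
```

Applying both filters in sequence naturally yields the "whichever evicts more wins" behavior described above.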
Duration strings support ms, s, m, h, d suffixes, or plain
millisecond numbers.
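A sketch of the accepted duration forms (an illustrative parser, not pond-ts's own):

```typescript
// Parses the duration forms described above: a plain millisecond
// number, or a number with an ms / s / m / h / d suffix.
function parseDuration(input: string | number): number {
  if (typeof input === 'number') return input; // plain milliseconds
  const match = /^(\d+(?:\.\d+)?)(ms|s|m|h|d)$/.exec(input);
  if (!match) throw new Error(`Unrecognized duration: ${input}`);
  const units: Record<string, number> = {
    ms: 1, s: 1_000, m: 60_000, h: 3_600_000, d: 86_400_000,
  };
  return Number(match[1]) * units[match[2]];
}

parseDuration('30m'); // 1_800_000
parseDuration(250);   // 250
```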
## Ordering modes

By default, LiveSeries expects events in timestamp order and throws on
violations. Two alternative modes handle out-of-order data.

### strict (default)

```typescript
const live = new LiveSeries({ name: 'strict', schema });

live.push([1000, 0.5, 'a']);
live.push([500, 0.3, 'a']); // throws ValidationError
```
### drop

Silently discard late events:

```typescript
const live = new LiveSeries({
  name: 'drop-late',
  schema,
  ordering: 'drop',
});

live.push([1000, 0.5, 'a']);
live.push([500, 0.3, 'a']); // silently dropped
live.length; // 1
```
### reorder

Insert late events in sorted position:

```typescript
const live = new LiveSeries({
  name: 'reorder',
  schema,
  ordering: 'reorder',
});

live.push([2000, 0.6, 'a']);
live.push([1000, 0.5, 'a']); // inserted before the 2000 event
live.at(0)?.begin(); // 1000
```
Add a graceWindow to bound how late an event can be:

```typescript
const live = new LiveSeries({
  name: 'bounded-reorder',
  schema,
  ordering: 'reorder',
  graceWindow: '5s',
});

live.push([10000, 0.5, 'a']);
live.push([8000, 0.4, 'a']); // ok, within 5s
live.push([1000, 0.1, 'a']); // throws, outside grace window
```
Without graceWindow, reorder accepts any timestamp.
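The reorder semantics can be sketched as sorted insertion with an optional lateness bound (illustrative only; assumes a bare number-timestamp buffer rather than real events):

```typescript
// Insert t in sorted position; with graceMs set, reject events more
// than graceMs behind the newest timestamp already in the buffer.
function insertOrdered(buffer: number[], t: number, graceMs?: number): number[] {
  if (graceMs !== undefined && buffer.length > 0) {
    const latest = buffer[buffer.length - 1];
    if (latest - t > graceMs) throw new Error(`event at ${t} outside grace window`);
  }
  const i = buffer.findIndex((x) => x > t);
  return i === -1 ? [...buffer, t] : [...buffer.slice(0, i), t, ...buffer.slice(i)];
}

insertOrdered([1000, 2000], 1500);      // [1000, 1500, 2000]
insertOrdered([10_000], 8000, 5000);    // ok: [8000, 10_000]
// insertOrdered([10_000], 1000, 5000); // throws: 9s late, window is 5s
```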
## Subscriptions

Subscribe to lifecycle events. All callbacks fire synchronously, inline with
push().

```typescript
// Per-event: fires once for each event added
const unsub = live.on('event', (event) => {
  console.log('new:', event.get('cpu'));
});

// Batch: fires once per push() call with all added events
live.on('batch', (events) => {
  console.log(`received ${events.length} events`);
});

// Evict: fires when retention removes events
live.on('evict', (events) => {
  console.log(`evicted ${events.length} old events`);
});
```
Each on() call returns an unsubscribe function. Call it to stop receiving
notifications:

```typescript
const unsub = live.on('event', handler);
// later...
unsub();
```
### Callback ordering

Within a single push() call:

1. event — fires per event, inline with insertion
2. Retention runs (may evict old events)
3. batch — fires once with all events that were added
4. evict — fires if retention removed events

clear() fires only the evict subscription.
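A tiny standalone simulation of that sequencing (not the real LiveSeries, just the order described above):

```typescript
// Simulates one push() against a maxEvents-bounded buffer and records
// the callback order: per-event, retention, batch, then evict.
function simulatePush(
  buffer: number[], rows: number[], maxEvents: number, log: string[],
): number[] {
  for (const r of rows) {
    buffer = [...buffer, r];
    log.push(`event:${r}`);          // 1. per-event, inline with insertion
  }
  const evicted = buffer.slice(0, Math.max(0, buffer.length - maxEvents));
  buffer = buffer.slice(-maxEvents); // 2. retention runs
  log.push(`batch:${rows.length}`);  // 3. once per push
  if (evicted.length > 0) log.push(`evict:${evicted.length}`); // 4. only if needed
  return buffer;
}

const log: string[] = [];
simulatePush([1, 2], [3, 4], 3, log);
// log: ['event:3', 'event:4', 'batch:2', 'evict:1']
```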
## Clearing

```typescript
live.clear();
```
Removes all events and fires the evict subscription with the removed events.
After clearing, the buffer is empty and ready for new pushes.
## Typical patterns

### WebSocket ingestion with rolling window

```typescript
const live = new LiveSeries({
  name: 'ws-metrics',
  schema,
  retention: { maxAge: '1h' },
});

ws.on('message', (msg) => {
  const { timestamp, cpu, host } = JSON.parse(msg);
  live.push([timestamp, cpu, host]);
});

// Periodic snapshot for rendering
setInterval(() => {
  const ts = live.toTimeSeries();
  renderChart(ts.rolling('5m', { cpu: 'avg' }));
}, 1000);
```
### Monitoring with eviction alerts

```typescript
const live = new LiveSeries({
  name: 'monitor',
  schema,
  retention: { maxEvents: 500 },
});

live.on('evict', (evicted) => {
  metrics.increment('events.evicted', evicted.length);
});

live.on('batch', (events) => {
  for (const e of events) {
    const cpu = e.get('cpu'); // number | undefined, narrowed from schema
    if (cpu !== undefined && cpu > 0.9) {
      alert(`High CPU on ${e.get('host')}`);
    }
  }
});
```
### Tolerating out-of-order delivery

```typescript
const live = new LiveSeries({
  name: 'multi-source',
  schema,
  ordering: 'reorder',
  graceWindow: '10s',
});

// Events from multiple sources may arrive slightly out of order
sourceA.on('data', (row) => live.push(row));
sourceB.on('data', (row) => live.push(row));
```