Example: Streaming Dashboard
This example shows how to build a streaming dashboard with two common widgets:
- Bar chart — 5-minute average latency, with bars appearing as buckets close
- Live value — the last minute's 95th-percentile latency, updated on every event
Both consume the same LiveSeries and run concurrently. No timers, no polling —
data is the clock.
Setup
import { LiveSeries, Sequence } from 'pond-ts';
const schema = [
{ name: 'time', kind: 'time' },
{ name: 'latency', kind: 'number' },
{ name: 'host', kind: 'string' },
] as const;
const live = new LiveSeries({
name: 'requests',
schema,
retention: { maxAge: '30m' },
});
Bar chart: 5-minute average
.aggregate() returns a LiveAggregation that accumulates events into buckets.
A bucket closes when an event arrives past its boundary.
const barChart = live.aggregate(Sequence.every('5m'), { latency: 'avg' });
// Fires each time a 5m bucket closes — append to your chart
barChart.on('close', (event) => {
const start = event.begin();
const avg = event.get('latency');
appendBar({ start, avg });
});
// snapshot() includes the open (in-progress) bucket for real-time preview
function getBars() {
return barChart.snapshot();
}
closed() returns only finalized buckets (suitable for export).
snapshot() includes the current open bucket as a provisional bar.
Live value: 1-minute p95
.rolling() returns a LiveRollingAggregation that maintains a sliding window.
The p95 reducer keeps events sorted for O(1) percentile snapshots.
const p95 = live.rolling('1m', { latency: 'p95' });
// Fires on every source event with the current rolling value
p95.on('update', (val) => {
renderLiveValue(val.latency);
});
// Or poll the current value at any time
function getCurrentP95() {
return p95.value().latency;
}
As time advances, events older than 1 minute are evicted from the window automatically. If a latency spike scrolls out of the window, the p95 drops back to normal.
Filtering before aggregation
Use .filter() to narrow the stream before it reaches the aggregator. The
filter produces a LiveView that supports .aggregate() and .rolling():
const api1 = live.filter((e) => e.get('host') === 'api-1');
const api1Bars = api1.aggregate(Sequence.every('5m'), { latency: 'avg' });
const api1P95 = api1.rolling('1m', { latency: 'p95' });
Multiple consumers can fan out from the same LiveSeries without duplicating
the underlying data.
Adding trend detection
Chain .diff() on the bar chart to compute the change between consecutive
5-minute buckets:
const trend = barChart.diff('latency');
trend.on('event', (event) => {
const delta = event.get('latency');
if (delta === undefined) return; // first bucket, no previous
if (delta > 10) alert('Latency rising');
});
Putting it together
import { LiveSeries, Sequence } from 'pond-ts';
const schema = [
{ name: 'time', kind: 'time' },
{ name: 'latency', kind: 'number' },
{ name: 'host', kind: 'string' },
] as const;
const live = new LiveSeries({
name: 'requests',
schema,
retention: { maxAge: '30m' },
});
// Bar chart: 5m average latency
const barChart = live.aggregate(Sequence.every('5m'), { latency: 'avg' });
barChart.on('close', (event) => {
appendBar({ start: event.begin(), avg: event.get('latency') });
});
// Live value: 1m rolling p95
const p95 = live.rolling('1m', { latency: 'p95' });
p95.on('update', (val) => {
renderLiveValue(val.latency);
});
// Trend: diff between consecutive 5m buckets
const trend = barChart.diff('latency');
trend.on('event', (event) => {
const delta = event.get('latency');
if (delta !== undefined && delta > 10) {
showTrendAlert('rising');
}
});
// Feed from WebSocket, server-sent events, or any push source
websocket.on('message', (msg) => {
const { timestamp, latency, host } = JSON.parse(msg);
live.push([timestamp, latency, host]);
});
Every push to live flows through the entire pipeline synchronously:
- The event enters
LiveSeries barChartaccumulates it into the current 5m bucketp95adds it to the 1-minute sliding window and recomputestrendchecks if a new bucket just closed and computes the diff
No background timers, no polling intervals, no batched flushes. The data drives everything.
Cleanup
When the dashboard unmounts or the connection closes:
barChart.dispose();
p95.dispose();
live.clear();
dispose() unsubscribes the transform from its source. clear() drops all
events from the buffer.