Example: Streaming Dashboard

This example shows how to build a streaming dashboard with two common widgets:

  1. Bar chart — 5-minute average latency, with bars appearing as buckets close
  2. Live value — the last minute's 95th-percentile latency, updated on every event

Both consume the same LiveSeries and run concurrently. No timers, no polling — data is the clock.

Setup

import { LiveSeries, Sequence } from 'pond-ts';

const schema = [
  { name: 'time', kind: 'time' },
  { name: 'latency', kind: 'number' },
  { name: 'host', kind: 'string' },
] as const;

const live = new LiveSeries({
  name: 'requests',
  schema,
  retention: { maxAge: '30m' },
});

Bar chart: 5-minute average

.aggregate() returns a LiveAggregation that accumulates events into buckets. A bucket closes when an event arrives past its boundary.

const barChart = live.aggregate(Sequence.every('5m'), { latency: 'avg' });

// Fires each time a 5m bucket closes — append to your chart
barChart.on('close', (event) => {
  const start = event.begin();
  const avg = event.get('latency');
  appendBar({ start, avg });
});

// snapshot() includes the open (in-progress) bucket for real-time preview
function getBars() {
  return barChart.snapshot();
}

closed() returns only finalized buckets (suitable for export). snapshot() includes the current open bucket as a provisional bar.
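The closed-versus-snapshot distinction hinges on buckets being closed by data, not by timers. Here is a minimal self-contained sketch of that behavior, not the pond-ts internals: `BucketAvg` is a hypothetical name, and a bucket is finalized only when an event lands past its boundary.

```typescript
type Bucket = { start: number; sum: number; count: number };

// Illustrative sketch: data-driven bucket closing for a time-bucketed average.
class BucketAvg {
  private closedBuckets: Bucket[] = [];
  private open: Bucket | null = null;

  constructor(private widthMs: number) {}

  push(time: number, value: number): void {
    const start = Math.floor(time / this.widthMs) * this.widthMs;
    if (this.open && this.open.start !== start) {
      // The new event lies past the open bucket's boundary: close it.
      this.closedBuckets.push(this.open);
      this.open = null;
    }
    if (!this.open) this.open = { start, sum: 0, count: 0 };
    this.open.sum += value;
    this.open.count += 1;
  }

  // Finalized buckets only — stable, suitable for export.
  closed(): Bucket[] {
    return this.closedBuckets;
  }

  // Closed buckets plus the provisional open bucket, if any.
  snapshot(): Bucket[] {
    return this.open ? [...this.closedBuckets, this.open] : this.closedBuckets;
  }
}
```

Note the consequence: if events stop arriving, the open bucket never closes, which is exactly why `snapshot()` exists for real-time previews.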

Live value: 1-minute p95

.rolling() returns a LiveRollingAggregation that maintains a sliding window. The p95 reducer keeps events sorted for O(1) percentile snapshots.

const p95 = live.rolling('1m', { latency: 'p95' });

// Fires on every source event with the current rolling value
p95.on('update', (val) => {
  renderLiveValue(val.latency);
});

// Or poll the current value at any time
function getCurrentP95() {
  return p95.value().latency;
}

As time advances, events older than 1 minute are evicted from the window automatically. If a latency spike scrolls out of the window, the p95 drops back to normal.
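The eviction behavior can be sketched in a few lines. This is illustrative only, with hypothetical names (`RollingP95`, `Sample`), and deliberately naive: it sorts on every read and evicts with `shift()`, whereas a real reducer would maintain a sorted structure for cheap percentile reads, as described above.

```typescript
type Sample = { time: number; latency: number };

// Illustrative sketch: a sliding window keyed off the newest event's time.
class RollingP95 {
  private window: Sample[] = [];

  constructor(private spanMs: number) {}

  push(sample: Sample): void {
    this.window.push(sample);
    // Evict everything older than the span, measured from the newest event.
    const cutoff = sample.time - this.spanMs;
    while (this.window.length && this.window[0].time <= cutoff) {
      this.window.shift();
    }
  }

  p95(): number {
    const sorted = this.window.map((s) => s.latency).sort((a, b) => a - b);
    if (sorted.length === 0) return NaN;
    const idx = Math.min(sorted.length - 1, Math.ceil(0.95 * sorted.length) - 1);
    return sorted[idx];
  }
}
```

A spike pushed at t=10s dominates the p95 until an event arrives more than a minute later, at which point the spike is evicted and the p95 falls back.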

Filtering before aggregation

Use .filter() to narrow the stream before it reaches the aggregator. The filter produces a LiveView that supports .aggregate() and .rolling():

const api1 = live.filter((e) => e.get('host') === 'api-1');

const api1Bars = api1.aggregate(Sequence.every('5m'), { latency: 'avg' });
const api1P95 = api1.rolling('1m', { latency: 'p95' });

Multiple consumers can fan out from the same LiveSeries without duplicating the underlying data.
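One way to picture zero-copy fan-out: each filtered view holds a predicate and a subscription to the shared source, never its own copy of the events. The sketch below is a toy model with hypothetical names (`Source`, `View`, `Row`), not the pond-ts implementation.

```typescript
type Row = { time: number; latency: number; host: string };

// Illustrative sketch: one buffer, many predicate-based views.
class Source {
  private buffer: Row[] = [];
  private listeners: Array<(e: Row) => void> = [];

  push(e: Row): void {
    this.buffer.push(e); // single shared copy of the data
    for (const fn of this.listeners) fn(e); // synchronous fan-out
  }

  filter(pred: (e: Row) => boolean): View {
    return new View(this, pred);
  }

  subscribe(fn: (e: Row) => void): void {
    this.listeners.push(fn);
  }
}

class View {
  constructor(
    private source: Source,
    private pred: (e: Row) => boolean,
  ) {}

  onEvent(fn: (e: Row) => void): void {
    // Events failing the predicate never reach this view's consumers.
    this.source.subscribe((e) => {
      if (this.pred(e)) fn(e);
    });
  }
}
```

However many views exist, each pushed event is stored once and tested against each predicate as it arrives.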

Adding trend detection

Chain .diff() on the bar chart to compute the change between consecutive 5-minute buckets:

const trend = barChart.diff('latency');

trend.on('event', (event) => {
  const delta = event.get('latency');
  if (delta === undefined) return; // first bucket, no previous
  if (delta > 10) alert('Latency rising');
});

Putting it together

import { LiveSeries, Sequence } from 'pond-ts';

const schema = [
  { name: 'time', kind: 'time' },
  { name: 'latency', kind: 'number' },
  { name: 'host', kind: 'string' },
] as const;

const live = new LiveSeries({
  name: 'requests',
  schema,
  retention: { maxAge: '30m' },
});

// Bar chart: 5m average latency
const barChart = live.aggregate(Sequence.every('5m'), { latency: 'avg' });

barChart.on('close', (event) => {
  appendBar({ start: event.begin(), avg: event.get('latency') });
});

// Live value: 1m rolling p95
const p95 = live.rolling('1m', { latency: 'p95' });

p95.on('update', (val) => {
  renderLiveValue(val.latency);
});

// Trend: diff between consecutive 5m buckets
const trend = barChart.diff('latency');

trend.on('event', (event) => {
  const delta = event.get('latency');
  if (delta !== undefined && delta > 10) {
    showTrendAlert('rising');
  }
});

// Feed from WebSocket, server-sent events, or any push source
websocket.on('message', (msg) => {
  const { timestamp, latency, host } = JSON.parse(msg);
  live.push([timestamp, latency, host]);
});

Every push to live flows through the entire pipeline synchronously:

  1. The event enters LiveSeries
  2. barChart accumulates it into the current 5m bucket
  3. p95 adds it to the 1-minute sliding window and recomputes
  4. trend checks if a new bucket just closed and computes the diff

No background timers, no polling intervals, no batched flushes. The data drives everything.
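The synchronous claim is easy to verify with a toy push source (`PushSource` is a hypothetical name, not part of pond-ts): every registered handler runs inline, in registration order, before `push()` returns.

```typescript
type Handler = (value: number) => void;

// Illustrative sketch: handlers run inline during push(), with no timers.
class PushSource {
  private handlers: Handler[] = [];

  on(fn: Handler): void {
    this.handlers.push(fn);
  }

  push(value: number): void {
    for (const fn of this.handlers) fn(value); // inline, in order
  }
}

const log: string[] = [];
const src = new PushSource();
src.on((v) => log.push(`aggregate:${v}`));
src.on((v) => log.push(`rolling:${v}`));

src.push(42);
log.push('push returned');
// Both handler entries land in the log before 'push returned'.
```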

Cleanup

When the dashboard unmounts or the connection closes:

trend.dispose();
barChart.dispose();
p95.dispose();
live.clear();

dispose() unsubscribes the transform from its source. clear() drops all events from the buffer.