Ingesting messy data
I was recently handed a CSV of per-host metrics — CPU, memory, request
count, latency — sampled every minute across four hosts for 12 hours.
Like a lot of real-world time-series data, it arrived broken: timestamps
in four different formats, four different spellings of "missing,"
exact-duplicate rows from retried scrapes, multiple hosts interleaved on
the same wall clock, and gaps where a collector missed a window. This
guide walks through what I did to clean it into a typed TimeSeries
ready for downstream processing.
The patterns generalize. Replace "host" with "device," "tenant," "sensor," or whatever entity dimension your data carries — the same pipeline shape applies.
TimeSeries<S> is roughly a DataFrame with a DatetimeIndex and a
typed schema. Two differences worth knowing upfront: pond events are
immutable, and operators that read neighbors (fill, rolling, etc.)
need an explicit per-entity scope to avoid cross-entity contamination.
We'll hit both.
1. Declare the schema
My CSV had 11 columns. Three of them turned out to be ephemeral pipeline
metadata — two Prometheus-style scrape labels and a __scrape_meta tag
— that I didn't want propagating downstream. Real-world ingest often
arrives with this kind of pipeline-residue mixed in alongside the
signal; the first decision is what to keep.
Once you've decided which columns are signal and which are noise,
declare a SeriesSchema as a literal tuple:
import { TimeSeries } from 'pond-ts';
import type { SeriesSchema } from 'pond-ts';
const schema = [
{ name: 'time', kind: 'time' },
{ name: 'cpu', kind: 'number', required: false },
{ name: 'memory', kind: 'number', required: false },
{ name: 'requests', kind: 'number', required: false },
{ name: 'latency_ms', kind: 'number', required: false },
{ name: 'host', kind: 'string' },
{ name: 'deployment_id', kind: 'string' },
{ name: 'region', kind: 'string' },
] as const satisfies SeriesSchema;
type Schema = typeof schema;
Three things to notice:
- First column is the temporal key. kind: 'time' makes this a point-in-time-keyed series. ('timeRange' and 'interval' are also options for span-based events.)
- required: false admits null/undefined cells. Numeric metrics that can plausibly be missing get this; categorical dimensions that should always be populated don't.
- as const satisfies SeriesSchema locks in every column name and kind as a literal type. Downstream event.get('cpu') narrows to number | undefined without a cast.
Anything you don't include in the schema is ignored on ingest. In my
case that meant the two label columns and the __scrape_meta tag
disappeared at this step — no extra filter pass needed.
2. Normalize at the boundary
Pond's wire format is JSON. Anything that isn't already JSON-clean needs
to be coerced before fromJSON. Three things to handle.
Timestamps
My CSV had timestamps in four different formats — 2025-01-15T00:00:00.000Z,
2025-01-15T01:05:00+01:00, 2025-01-15T00:01:00 (no offset), and
1736899260000 (epoch ms) — sometimes within the same column. In real-
world data, mixed formats are common, especially if the file came out of
a multi-collector pipeline. A robust parser should handle at least:
import { Time } from 'pond-ts';
function parseTimestamp(raw: string): number {
const s = raw.trim();
// Epoch milliseconds.
if (/^-?\d+$/.test(s)) return Number(s);
// ISO with explicit offset: 2025-01-15T01:05:00+01:00 or ...Z
if (/Z$|[+-]\d{2}:?\d{2}$/.test(s)) return Date.parse(s);
// Naked ISO: declare a zone explicitly.
return Time.parse(s, { timeZone: 'UTC' }).timestampMs();
}
Time.parse(s, { timeZone }) is pond's strict ISO parser. It throws if
you pass it an ambiguous wall-clock string with no zone — that's a
feature. Silent zone-drift is one of the easier ways to ship subtly
wrong analysis. Pick a zone explicitly per source.
There's no equivalent of pd.to_datetime's "best-effort, infer
everything" mode. The strictness is intentional — wall-clock strings
are ambiguous and pond makes you say so.
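To see what's at stake, here is the same wall-clock string parsed under two different zone choices, a full hour apart. This is a sketch: the IANA zone name support and the exact offset are assumptions, not something the library documents above.
// Same wall-clock string, two zone choices, two different instants.
const utc = Time.parse('2025-01-15T00:01:00', { timeZone: 'UTC' }).timestampMs();
// 1736899260000, the epoch-ms example from the format list above
const berlin = Time.parse('2025-01-15T00:01:00', { timeZone: 'Europe/Berlin' }).timestampMs();
// one hour earlier; pick the wrong zone and every row in the file shifts silently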
Missing-value markers
My CSV used four different spellings for missing values across columns
and rows: '', 'null', '-', and 'NA'. In general, real-world data
spells missing in many ways — 'NaN' is also common, and any given
file may use several at once. Collapse all of them to JSON null
upstream:
const MISSING = new Set(['', 'null', '-', 'NA', 'na', 'NaN']);
function maybeNumber(raw: string): number | null {
const s = raw.trim();
if (MISSING.has(s)) return null;
const n = Number(s);
return Number.isFinite(n) ? n : null;
}
fromJSON converts JSON null to undefined for any column declared
required: false. Required columns reject null at ingest time with a
ValidationError.
Whitespace and quoting
Use a real CSV parser that handles quoted commas — my data had a
deployment_id column where some values like "v2.7.89,beta" carried
embedded commas, which a naive split-on-comma pass would mangle.
Papaparse is the obvious choice.
Trim string fields that may carry stray whitespace from upstream. A few
of my rows came through with host as ' api-1 ' from somewhere in
the pipeline. Pond won't trim for you, and ' api-1 ' and 'api-1'
will partition into different buckets at step 5.
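Both hazards are easy to check at the parser. The sample string below is made up to mirror the deployment_id and host cases from my file:
import Papa from 'papaparse';

const sample = 'time,host,deployment_id\n2025-01-15T00:00:00Z, api-1 ,"v2.7.89,beta"\n';
const { data } = Papa.parse<Record<string, string>>(sample, { header: true, skipEmptyLines: true });
// data[0].deployment_id === 'v2.7.89,beta'  (the quoted comma survives intact)
// data[0].host === ' api-1 '                (the stray whitespace survives too; trim it yourself)
const host = data[0].host.trim(); // 'api-1'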
3. Build the typed series
With per-row objects in hand, hand them to fromJSON:
const ingested = TimeSeries.fromJSON<Schema>({
name: 'host-metrics',
schema,
rows, // ObjectRow[] from your normalizer
});
rows can be either array-form ([time, ...values]) or object-form
(keyed by column name). Object-form is more readable when you're
constructing rows by hand; array-form is more compact for serialized
payloads.
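For example, the same event in both forms (the array-form ordering here is an assumption: cells are taken to follow the schema's declared column order):
// Object-form: keyed by column name.
const objectRow = {
  time: 1736899260000,
  cpu: 41.5,
  memory: 72.1,
  requests: 113,
  latency_ms: 48.2,
  host: 'api-1',
  deployment_id: 'v2.7.89',
  region: 'eu-west-1',
};
// Array-form: [time, ...values], assumed here to follow the schema's column order.
const arrayRow = [1736899260000, 41.5, 72.1, 113, 48.2, 'api-1', 'v2.7.89', 'eu-west-1'];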
fromJSON validates that every row matches the schema — required cells
aren't null, kinds line up, the time column parses. Invalid rows throw
a ValidationError you can catch and report at ingest time, before any
downstream code sees the data.
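A sketch of the catch-and-report pattern at the ingest boundary, assuming ValidationError is exported from 'pond-ts' alongside TimeSeries:
import { TimeSeries, ValidationError } from 'pond-ts'; // ValidationError export assumed

let ingested;
try {
  ingested = TimeSeries.fromJSON<Schema>({ name: 'host-metrics', schema, rows });
} catch (err) {
  if (err instanceof ValidationError) {
    console.error('ingest rejected:', err.message); // log, count, or dead-letter here before rethrowing
  }
  throw err;
}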
Roughly equivalent to
pd.DataFrame.from_records(rows).astype(dtypes).set_index('time'),
with the schema-typed validator built in.
4. Dedupe duplicates and same-key revisions
My CSV had ~60 byte-identical doubled rows that looked like flaky-retry
re-emits, plus a handful of (time, host) pairs that appeared twice
with diverging payload values — likely a collector that re-scraped and
got a different reading. Real ingest pipelines re-emit rows for both
reasons, and dedupe collapses both:
const deduped = series.dedupe({ keep: 'last' });
The default key is the full event key (the timestamp). For time-keyed
series, two events at the same begin() are duplicates regardless of
payload differences.
keep policies:
| Policy | Behavior |
|---|---|
| 'first' | Keep the first occurrence at each key |
| 'last' | Keep the last occurrence (default — matches "newer wins" replay semantics) |
| 'error' | Throw on any duplicate. Useful for ingest paths that want hard failure on shape violations |
| 'drop' | Discard every duplicated event entirely. Conservative — when "1.5 events at this timestamp" can't be defended |
| { min: col } / { max: col } | Keep the event with the smallest / largest value at the named numeric column |
| (events) => event | Custom resolver. Receives all events sharing the key, returns one |
The custom resolver is the escape hatch when you need to combine duplicates rather than pick one:
const merged = series.dedupe({
keep: (events) => {
// Take the latest cpu/memory but the maximum requests count.
const last = events[events.length - 1];
const maxRequests = Math.max(...events.map((e) => e.get('requests') ?? 0));
return last.set('requests', maxRequests);
},
});
For multi-entity data, dedupe inside a partitionBy chain so the entity
columns become part of the duplicate key (next section).
df.drop_duplicates(keep='last') plus the { min/max: col } and
resolver forms that Pandas would need a manual groupby().apply() for.
5. Scope per-entity work with partitionBy
This is the most important section of this guide.
My CSV had four hosts (api-1, api-2, api-3, web-1) interleaved
on the same per-minute grid: 4 rows per timestamp, one per host. This
is the canonical shape for multi-entity time series, and it's where the
most subtle ingest bugs live.
When your series carries multiple entities (hosts, tenants, sensors)
interleaved on the same time grid, every operator that reads
neighboring events — fill, rolling, smooth, baseline,
aggregate, dedupe, materialize — has a silent contamination
hazard. A naive global .fill('linear') for api-1 would interpolate
using api-2's value as a "neighbor" because api-2's event happens
to be the next event in the series. That's almost never what you want.
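Concretely, the call that looks right but isn't. A sketch, assuming the unpartitioned fill accepts the same per-column policy map as the partitioned variant shown below:
// With four hosts interleaved minute by minute, api-1's missing cpu cell gets
// interpolated against whichever host happens to sit next in the series.
const contaminated = series.fill({ cpu: 'linear' }); // don't do this on interleaved multi-entity data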
partitionBy scopes those ops to within each entity's events:
const cleaned = series
.partitionBy('host') // enter partition view
.dedupe({ keep: 'last' }) // per-host: same time AND same host is the duplicate key
.fill({ cpu: 'linear' }) // per-host: neighbors come from the same host only
.rolling('5m', { cpu: 'avg' }) // per-host rolling average
.collect(); // exit, back to TimeSeries<S>
Three design choices to know:
The view is persistent across chains. Each sugar method on
PartitionedTimeSeries returns another PartitionedTimeSeries carrying
the same partition columns, so multi-step pipelines compose without
re-declaring the partition.
Every neighbor-reading op has a partitioned variant: fill,
dedupe, align, materialize, rolling, smooth, baseline,
outliers, diff, rate, pctChange, cumulative, shift,
aggregate. If you're calling one of these on multi-entity data,
partition first.
Three terminals exit the view:
// Materialize back to a single TimeSeries<S>:
const collected = series.partitionBy('host').fill({ cpu: 'linear' }).collect();
// Get a Map<host, TimeSeries<S>> for per-entity downstream code:
const byHost = series.partitionBy('host').toMap();
// Run an arbitrary transform per partition (terminal — exits the view):
const custom = series
.partitionBy('host')
.apply((g) => g.fill({ cpu: 'linear' }).rolling('5m', { cpu: 'avg' }));
Composite partitions
Pass an array for entity boundaries that span multiple columns:
const filled = series
.partitionBy(['tenant', 'host'])
.fill({ cpu: 'linear' })
.collect();
Declared partition sets
If the entity set is known and stable, pass groups to make it explicit:
const HOSTS = ['api-1', 'api-2', 'api-3', 'web-1'] as const;
const filled = series
.partitionBy('host', { groups: HOSTS })
.fill({ cpu: 'linear' })
.collect();
Declared groups give you three things: deterministic iteration order in
toMap, empty-but-stable partitions (a host with no events still
appears as an empty TimeSeries), and runtime rejection of unknown
entity values at construction time.
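A quick sketch showing all three, with toPoints().length standing in as a makeshift size check:
const byHost = filled.partitionBy('host', { groups: HOSTS }).toMap();
// Iteration follows the declared order; a host with no events still appears, empty.
for (const [host, hostSeries] of byHost) {
  console.log(host, hostSeries.toPoints().length);
}
// Data carrying a host value outside HOSTS is rejected when the view is constructed.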
partitionBy(col).fill(...).collect() is the typed analog of
df.groupby('host').apply(lambda g: g.interpolate()). The persistent-
view shape is what Pandas's .groupby doesn't have natively — every
chained op stays scoped without an inner apply.
6. Fill gaps without making things up
fill replaces undefined cells using a per-column policy:
const filled = series
.partitionBy('host')
.fill(
{
cpu: 'linear', // time-interpolate between bracketing values
memory: 'linear',
requests: 'linear',
latency_ms: 'linear',
deployment_id: 'hold', // forward-fill (slowly-changing dimension)
region: 'hold',
},
{ maxGap: '2m' }, // don't fill across gaps wider than 2 minutes
)
.collect();
Strategies:
| Strategy | Behavior |
|---|---|
| 'hold' | Forward-fill from the last known value |
| 'bfill' | Backward-fill from the next known value |
| 'linear' | Time-interpolate between bracketing values (numeric only) |
| 'zero' | Constant 0 (numeric only) |
| Literal value (e.g. host: 'unknown') | Constant fill — anything not a strategy name is treated as a literal |
Cap the fill scope
Two options bound how aggressively fill reaches across gaps:
- { limit: N } — at most N consecutive null cells per column.
- { maxGap: '2m' } — bracketing real values must be no more than 2 minutes apart.
Prefer maxGap. It's cadence-independent: an isolated single-cell
drop-out gets filled, a multi-minute outage stays null. limit is a
count, which only makes sense if your sampling cadence is regular and
known. My data was at 1-minute cadence, so I went with maxGap: '2m' —
that fills an isolated missed scrape (anchors 2 minutes apart) but
leaves anything wider as null, which felt like the right place to draw
the "this signals a real outage" line for this dataset. Tune the
threshold to your data's cadence and your tolerance for filled-in
values.
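The two caps side by side, a sketch using the option names listed above:
// Count-based: fill at most one consecutive missing cell per column, whatever the cadence.
const byCount = series.partitionBy('host').fill({ cpu: 'linear' }, { limit: 1 }).collect();
// Time-based: fill only where the bracketing real values are no more than 2 minutes apart.
const byTime = series.partitionBy('host').fill({ cpu: 'linear' }, { maxGap: '2m' }).collect();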
Leading and trailing null runs are never linearly filled (no neighbor
on one side). If your downstream tolerates it, follow up with a bfill
or hold pass to catch them:
series
.partitionBy('host')
.fill({ cpu: 'linear' }, { maxGap: '2m' })
.fill({ cpu: 'hold' }) // catch trailing gaps
.collect();
fill is the typed analog of
df.interpolate(limit_area='inside', limit=...), plus the time-distance
cap (maxGap) that Pandas needs explicit logic for.
7. Regularize to a fixed grid (optional)
fill operates on existing events — it doesn't synthesize missing rows.
In my CSV, when a collector missed a scrape window, the row simply
wasn't in the file; fill had nothing to fill because the event itself
was absent. I didn't need a regular grid for my downstream, so I left
the missing rows missing. If you do need rows at every grid point —
e.g., for a wide-table export or a chart that wants explicit nulls per
sample — use materialize:
import { Sequence } from 'pond-ts';
const regular = series
.partitionBy('host')
.materialize(Sequence.every('1m')) // emit one row per minute, undefined for empty buckets
.fill({ cpu: 'linear' }, { maxGap: '3m' })
.collect();
materialize produces a time-keyed series with one event per sequence
sample. Empty buckets emit rows with undefined cells; the partitioned
variant auto-populates the partition columns (host, here) on those
rows so downstream code doesn't have to fill them separately.
The materialize → fill order matters: fill only operates on rows
that already exist, so synthesize the rows first, then fill.
df.resample('1min').asfreq() creates the empty rows; materialize is
the equivalent that doesn't implicitly fill.
8. Export
Two primary export shapes:
// JSON wire format — round-trips through fromJSON.
const wire = cleaned.toJSON({ rowFormat: 'object' });
// Wide-row points for chart libraries (Recharts, Observable Plot, visx).
const points = cleaned.toPoints();
// [{ ts: number, cpu: number | undefined, memory: ..., host: ... }, ...]
toJSON preserves the pond schema and round-trips losslessly.
toPoints is the flat-row shape every chart library accepts directly:
ts plus one key per value column.
For per-entity rendering, partition then export:
const linesByHost = cleaned.partitionBy('host').toMap((g) => g.toPoints());
// Map<host, ChartRow[]>
End-to-end
Putting it together:
import { readFileSync } from 'node:fs';
import Papa from 'papaparse';
import { TimeSeries, Time } from 'pond-ts';
import type {
JsonObjectRowForSchema,
SeriesSchema,
TimeSeriesJsonInput,
} from 'pond-ts';
const schema = [
{ name: 'time', kind: 'time' },
{ name: 'cpu', kind: 'number', required: false },
{ name: 'memory', kind: 'number', required: false },
{ name: 'requests', kind: 'number', required: false },
{ name: 'latency_ms', kind: 'number', required: false },
{ name: 'host', kind: 'string' },
{ name: 'deployment_id', kind: 'string' },
{ name: 'region', kind: 'string' },
] as const satisfies SeriesSchema;
type Schema = typeof schema;
type Row = JsonObjectRowForSchema<Schema>;
const MISSING = new Set(['', 'null', '-', 'NA', 'na', 'NaN']);
function parseTimestamp(raw: string): number {
const s = raw.trim();
if (/^-?\d+$/.test(s)) return Number(s);
if (/Z$|[+-]\d{2}:?\d{2}$/.test(s)) return Date.parse(s);
return Time.parse(s, { timeZone: 'UTC' }).timestampMs();
}
function maybeNumber(raw: string): number | null {
const s = raw.trim();
if (MISSING.has(s)) return null;
const n = Number(s);
return Number.isFinite(n) ? n : null;
}
const csv = readFileSync('messy.csv', 'utf8');
const parsed = Papa.parse<Record<string, string>>(csv, {
header: true,
skipEmptyLines: true,
});
const rows: Row[] = [];
for (const raw of parsed.data) {
if (!raw.time || !raw.host || !raw.deployment_id || !raw.region) continue;
rows.push({
time: parseTimestamp(raw.time),
cpu: maybeNumber(raw.cpu ?? ''),
memory: maybeNumber(raw.memory ?? ''),
requests: maybeNumber(raw.requests ?? ''),
latency_ms: maybeNumber(raw.latency_ms ?? ''),
host: raw.host.trim(),
deployment_id: raw.deployment_id.trim(),
region: raw.region.trim(),
});
}
rows.sort((a, b) => Number(a.time) - Number(b.time));
const cleaned = TimeSeries.fromJSON<Schema>({
name: 'host-metrics',
schema,
rows,
})
.partitionBy('host')
.dedupe({ keep: 'last' })
.fill(
{
cpu: 'linear',
memory: 'linear',
requests: 'linear',
latency_ms: 'linear',
deployment_id: 'hold',
region: 'hold',
},
{ maxGap: '2m' },
)
.collect();
const out = cleaned.toJSON({
rowFormat: 'object',
}) as TimeSeriesJsonInput<Schema>;
console.log(JSON.stringify(out));
Three layers of responsibility:
- Boundary (CSV-side): parse, normalize types, collapse missing markers, trim whitespace, drop unparseable rows.
- Ingest: fromJSON with a typed schema. Get a typed series with validation.
- Cleaning (pond-side): one chain. partitionBy for entity scope, dedupe for duplicates, fill with maxGap for cell-level gaps, collect to exit. Optional materialize if you need missing-row insertion.
Once the data is inside pond, every cleaning decision is a chained method on a typed series. The bugs you'd otherwise hand-roll — silent cross-entity interpolation, ambiguous duplicate resolution, wrongly-timezoned timestamps, gap-fills that paper over outages — are either prevented by the type system or made into explicit choices in the chain.
See also
- Cleaning data — the operator-by-operator reference for the cleaning primitives this guide chains together (fill, dedupe, materialize, partitionBy).
- Ingest — the JSON ingest reference: fromJSON shapes, parse options, and the null ↔ undefined boundary.
- API reference (core) — generated full-width reference for every method and type used here.