
Ingesting messy data

I was recently handed a CSV of per-host metrics — CPU, memory, request count, latency — sampled every minute across four hosts for 12 hours. Like a lot of real-world time-series data, it arrived broken: timestamps in four different formats, four different spellings of "missing," exact-duplicate rows from retried scrapes, multiple hosts interleaved on the same wall clock, and gaps where a collector missed a window. This guide walks through what I did to clean it into a typed TimeSeries ready for downstream processing.

The patterns generalize. Substitute "device," "tenant," "sensor," or whatever entity dimension your data carries for "host" — the same pipeline shape applies.

If you know Pandas

TimeSeries<S> is roughly a DataFrame with a DatetimeIndex and a typed schema. Two differences worth knowing upfront: pond events are immutable, and operators that read neighbors (fill, rolling, etc.) need an explicit per-entity scope to avoid cross-entity contamination. We'll hit both.

1. Declare the schema

My CSV had 11 columns. Three of them turned out to be ephemeral pipeline metadata — two Prometheus-style scrape labels and a __scrape_meta tag — that I didn't want propagating downstream. Real-world ingest often arrives with this kind of pipeline-residue mixed in alongside the signal; the first decision is what to keep.

Once you've decided which columns are signal and which are noise, declare a SeriesSchema as a literal tuple:

import { TimeSeries } from 'pond-ts';
import type { SeriesSchema } from 'pond-ts';

const schema = [
  { name: 'time', kind: 'time' },
  { name: 'cpu', kind: 'number', required: false },
  { name: 'memory', kind: 'number', required: false },
  { name: 'requests', kind: 'number', required: false },
  { name: 'latency_ms', kind: 'number', required: false },
  { name: 'host', kind: 'string' },
  { name: 'deployment_id', kind: 'string' },
  { name: 'region', kind: 'string' },
] as const satisfies SeriesSchema;

type Schema = typeof schema;

Three things to notice:

  • First column is the temporal key. kind: 'time' makes this a point-in-time-keyed series. ('timeRange' and 'interval' are also options for span-based events.)
  • required: false admits null/undefined cells. Numeric metrics that can plausibly be missing get this; categorical dimensions that should always be populated don't.
  • as const satisfies SeriesSchema locks in every column name and kind as a literal type. Downstream event.get('cpu') narrows to number | undefined without a cast.

Anything you don't include in the schema is ignored on ingest. In my case that meant the two label columns and the __scrape_meta tag disappeared at this step — no extra filter pass needed.
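To see why the as const satisfies idiom matters, here is a minimal self-contained sketch of the typing trick. The SeriesSchema type below is a simplified stand-in for illustration, not pond-ts's real type:

```typescript
// Simplified stand-in for pond-ts's SeriesSchema, to show the typing mechanics.
type ColumnKind = 'time' | 'number' | 'string';
type SeriesSchema = readonly { name: string; kind: ColumnKind; required?: boolean }[];

const schema = [
  { name: 'time', kind: 'time' },
  { name: 'cpu', kind: 'number', required: false },
  { name: 'host', kind: 'string' },
] as const satisfies SeriesSchema;

// Literal names survive: ColumnName is 'time' | 'cpu' | 'host', not string.
type ColumnName = (typeof schema)[number]['name'];

// A lookup like event.get('cpu') can use the literal name to recover the
// column's kind — only 'number' typechecks on the next line.
type KindOf<N extends ColumnName> = Extract<(typeof schema)[number], { name: N }>['kind'];
const cpuKind: KindOf<'cpu'> = 'number';
```

With a plain `const schema = [...]` (no as const), every name would widen to string and the per-column lookup would collapse; satisfies alone keeps the annotation honest without widening.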

2. Normalize at the boundary

Pond's wire format is JSON. Anything that isn't already JSON-clean needs to be coerced before fromJSON. Three things to handle.

Timestamps

My CSV had timestamps in four different formats — 2025-01-15T00:00:00.000Z, 2025-01-15T01:05:00+01:00, 2025-01-15T00:01:00 (no offset), and 1736899260000 (epoch ms) — sometimes within the same column. In real-world data, mixed formats are common, especially if the file came out of a multi-collector pipeline. A robust parser should handle at least:

import { Time } from 'pond-ts';

function parseTimestamp(raw: string): number {
  const s = raw.trim();
  // Epoch milliseconds.
  if (/^-?\d+$/.test(s)) return Number(s);
  // ISO with explicit offset: 2025-01-15T01:05:00+01:00 or ...Z
  if (/Z$|[+-]\d{2}:?\d{2}$/.test(s)) return Date.parse(s);
  // Naked ISO: declare a zone explicitly.
  return Time.parse(s, { timeZone: 'UTC' }).timestampMs();
}

Time.parse(s, { timeZone }) is pond's strict ISO parser. It throws if you pass it an ambiguous wall-clock string with no zone — that's a feature. Silent zone-drift is one of the easier ways to ship subtly wrong analysis. Pick a zone explicitly per source.

If you know Pandas

There's no equivalent of pd.to_datetime's "best-effort, infer everything" mode. The strictness is intentional — wall-clock strings are ambiguous and pond makes you say so.
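The hazard is easy to demonstrate with plain JavaScript dates, no pond-ts required. Date.parse interprets a wall-clock string with no offset in the local zone of whatever machine runs the code, so the same file parses differently per deployment:

```typescript
// Offset-carrying strings pin down an instant; naked ones do not.
const explicit = Date.parse('2025-01-15T00:01:00Z');        // 1736899260000, on every machine
const withOffset = Date.parse('2025-01-15T01:01:00+01:00'); // the same instant, on every machine
const naked = Date.parse('2025-01-15T00:01:00');            // machine-dependent epoch!

// explicit === withOffset holds everywhere; naked === explicit holds only when
// the machine happens to run in UTC. Time.parse's throw forces that choice upfront.
```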

Missing-value markers

My CSV used four different spellings for missing values across columns and rows: '', 'null', '-', and 'NA'. In general, real-world data spells missing in many ways — 'NaN' is also common, and any given file may use several at once. Collapse all of them to JSON null upstream:

const MISSING = new Set(['', 'null', '-', 'NA', 'na', 'NaN']);

function maybeNumber(raw: string): number | null {
  const s = raw.trim();
  if (MISSING.has(s)) return null;
  const n = Number(s);
  return Number.isFinite(n) ? n : null;
}

fromJSON converts JSON null to undefined for any column declared required: false. Required columns reject null at ingest time with a ValidationError.

Whitespace and quoting

Use a real CSV parser that handles quoted commas — my data had a deployment_id column where some values like "v2.7.89,beta" carried embedded commas, which a naive split-on-comma pass would mangle. Papaparse is the obvious choice.

Trim string fields that may carry stray whitespace from upstream. A few of my rows came through with host as ' api-1 ' from somewhere in the pipeline. Pond won't trim for you, and ' api-1 ' and 'api-1' will partition into different buckets at step 5.

3. Build the typed series

With per-row objects in hand, hand them to fromJSON:

const ingested = TimeSeries.fromJSON<Schema>({
  name: 'host-metrics',
  schema,
  rows, // ObjectRow[] from your normalizer
});

rows can be either array-form ([time, ...values]) or object-form (keyed by column name). Object-form is more readable when you're constructing rows by hand; array-form is more compact for serialized payloads.
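For concreteness, here is the same event in both shapes, using this guide's columns (the values are made up for illustration):

```typescript
// Object-form: keyed by column name. Readable, order-independent.
const objectRow = {
  time: 1736899260000,
  cpu: 42.5,
  memory: 71.2,
  requests: 118,
  latency_ms: 23.4,
  host: 'api-1',
  deployment_id: 'v2.7.89',
  region: 'us-east',
};

// Array-form: positional, matching schema order. Compact on the wire.
const arrayRow = [1736899260000, 42.5, 71.2, 118, 23.4, 'api-1', 'v2.7.89', 'us-east'];
```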

fromJSON validates that every row matches the schema — required cells aren't null, kinds line up, the time column parses. Invalid rows throw a ValidationError you can catch and report at ingest time, before any downstream code sees the data.

If you know Pandas

Roughly equivalent to pd.DataFrame.from_records(rows).astype(dtypes).set_index('time'), with the schema-typed validator built in.

4. Dedupe duplicates and same-key revisions

My CSV had ~60 byte-identical doubled rows that looked like flaky-retry re-emits, plus a handful of (time, host) pairs that appeared twice with diverging payload values — likely a collector that re-scraped and got a different reading. Real ingest pipelines re-emit rows for both reasons, and dedupe collapses both:

const deduped = series.dedupe({ keep: 'last' });

The default key is the full event key (the timestamp). For time-keyed series, two events at the same begin() are duplicates regardless of payload differences.

keep policies:

  • 'first': keep the first occurrence at each key.
  • 'last': keep the last occurrence (the default; matches "newer wins" replay semantics).
  • 'error': throw on any duplicate. Useful for ingest paths that want hard failure on shape violations.
  • 'drop': discard every duplicated event entirely. Conservative, for when "1.5 events at this timestamp" can't be defended.
  • { min: col } / { max: col }: keep the event with the smallest / largest value in the named numeric column.
  • (events) => event: custom resolver. Receives all events sharing the key, returns one.
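The keep: 'last' semantics can be sketched in a few lines over plain rows; this is illustrative only, not pond's internals, which key on the full event key rather than a raw number:

```typescript
type Row = { time: number; cpu: number };

// Later writes to the same key overwrite earlier ones, and Map preserves
// first-insertion order — exactly the "newer wins" replay behavior.
function dedupeKeepLast(rows: Row[]): Row[] {
  const byKey = new Map<number, Row>();
  for (const row of rows) byKey.set(row.time, row);
  return [...byKey.values()];
}

const rows = [
  { time: 1000, cpu: 10 },
  { time: 1000, cpu: 10 }, // byte-identical retry re-emit
  { time: 2000, cpu: 20 },
  { time: 2000, cpu: 25 }, // same-key revision: the re-scrape wins
];
dedupeKeepLast(rows); // → [{ time: 1000, cpu: 10 }, { time: 2000, cpu: 25 }]
```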

The custom resolver is the escape hatch when you need to combine duplicates rather than pick one:

const merged = series.dedupe({
  keep: (events) => {
    // Take the latest cpu/memory but the maximum requests count.
    const last = events[events.length - 1];
    const maxRequests = Math.max(...events.map((e) => e.get('requests') ?? 0));
    return last.set('requests', maxRequests);
  },
});

For multi-entity data, dedupe inside a partitionBy chain so the entity columns become part of the duplicate key (next section).

If you know Pandas

df.drop_duplicates(keep='last') plus the { min/max: col } and resolver forms that Pandas would need a manual groupby().apply() for.

5. Scope per-entity work with partitionBy

This is the most important section of this guide.

My CSV had four hosts (api-1, api-2, api-3, web-1) interleaved on the same per-minute grid: 4 rows per timestamp, one per host. This is the canonical shape for multi-entity time series, and it's where the most subtle ingest bugs live.

When your series carries multiple entities (hosts, tenants, sensors) interleaved on the same time grid, every operator that reads neighboring events — fill, rolling, smooth, baseline, aggregate, dedupe, materialize — has a silent contamination hazard. A naive global .fill('linear') for api-1 would interpolate using api-2's value as a "neighbor" because api-2's event happens to be the next event in the series. That's almost never what you want.
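Here is the hazard made concrete with a hand-rolled neighbor search over plain rows (a sketch of the failure mode, not pond's implementation). api-1 is missing a reading at t=2000, and the interleaved api-2 rows sit right next to the gap:

```typescript
type Sample = { time: number; host: string; cpu: number | undefined };

const rows: Sample[] = [
  { time: 1000, host: 'api-1', cpu: 10 },
  { time: 1000, host: 'api-2', cpu: 90 },
  { time: 2000, host: 'api-1', cpu: undefined }, // the gap to fill
  { time: 2000, host: 'api-2', cpu: 90 },
  { time: 3000, host: 'api-1', cpu: 12 },
];

// Naive global search: neighbors are adjacent *events*, regardless of host.
// For the gap, both bracketing values come from api-2 — the wrong host's data.
function naiveNeighbors(rows: Sample[], i: number): [number?, number?] {
  const prev = rows.slice(0, i).reverse().find((r) => r.cpu !== undefined)?.cpu;
  const next = rows.slice(i + 1).find((r) => r.cpu !== undefined)?.cpu;
  return [prev, next];
}

// Scoped search: restrict to the gap's own host first. The real neighbors
// are api-1's 10 and 12, so a linear fill would land at 11, not 90.
function scopedNeighbors(rows: Sample[], i: number): [number?, number?] {
  const mine = rows.filter((r) => r.host === rows[i].host);
  return naiveNeighbors(mine, mine.indexOf(rows[i]));
}
```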

partitionBy scopes those ops to within each entity's events:

const cleaned = series
  .partitionBy('host')           // enter partition view
  .dedupe({ keep: 'last' })      // per-host: same time AND same host is the duplicate key
  .fill({ cpu: 'linear' })       // per-host: neighbors come from the same host only
  .rolling('5m', { cpu: 'avg' }) // per-host rolling average
  .collect();                    // exit, back to TimeSeries<S>

Three design choices to know:

The view is persistent across chains. Each sugar method on PartitionedTimeSeries returns another PartitionedTimeSeries carrying the same partition columns, so multi-step pipelines compose without re-declaring the partition.

Every neighbor-reading op has a partitioned variant. fill, dedupe, align, materialize, rolling, smooth, baseline, outliers, diff, rate, pctChange, cumulative, shift, aggregate. If you're calling one of these on multi-entity data, partition first.

Three terminals exit the view:

// Materialize back to a single TimeSeries<S>:
const collected = series.partitionBy('host').fill({ cpu: 'linear' }).collect();

// Get a Map<host, TimeSeries<S>> for per-entity downstream code:
const byHost = series.partitionBy('host').toMap();

// Run an arbitrary transform per partition (terminal — exits the view):
const custom = series
  .partitionBy('host')
  .apply((g) => g.fill({ cpu: 'linear' }).rolling('5m', { cpu: 'avg' }));

Composite partitions

Pass an array for entity boundaries that span multiple columns:

const filled = series
  .partitionBy(['tenant', 'host'])
  .fill({ cpu: 'linear' })
  .collect();

Declared partition sets

If the entity set is known and stable, pass groups to make it explicit:

const HOSTS = ['api-1', 'api-2', 'api-3', 'web-1'] as const;

const filled = series
  .partitionBy('host', { groups: HOSTS })
  .fill({ cpu: 'linear' })
  .collect();

Declared groups give you three things: deterministic iteration order in toMap, empty-but-stable partitions (a host with no events still appears as an empty TimeSeries), and runtime rejection of unknown entity values at construction time.
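Those three guarantees can be sketched with a plain partition-map builder (illustrative only; pond's partition view carries typed series, not raw arrays):

```typescript
type HostRow = { time: number; host: string };

function partitionWithGroups(
  rows: HostRow[],
  groups: readonly string[],
): Map<string, HostRow[]> {
  // Seed every declared group up front: empty-but-stable partitions, and
  // deterministic iteration order matching the declaration.
  const parts = new Map<string, HostRow[]>(groups.map((g): [string, HostRow[]] => [g, []]));
  for (const row of rows) {
    const bucket = parts.get(row.host);
    if (!bucket) throw new Error(`unknown host: ${row.host}`); // reject at construction time
    bucket.push(row);
  }
  return parts;
}

const HOSTS = ['api-1', 'api-2', 'web-1'] as const;
const parts = partitionWithGroups([{ time: 1000, host: 'api-1' }], HOSTS);
// parts.get('web-1') is an empty array rather than missing entirely.
```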

If you know Pandas

partitionBy(col).fill(...).collect() is the typed analog of df.groupby('host').apply(lambda g: g.interpolate()). The persistent-view shape is what Pandas's .groupby doesn't have natively — every chained op stays scoped without an inner apply.

6. Fill gaps without making things up

fill replaces undefined cells using a per-column policy:

const filled = series
  .partitionBy('host')
  .fill(
    {
      cpu: 'linear', // time-interpolate between bracketing values
      memory: 'linear',
      requests: 'linear',
      latency_ms: 'linear',
      deployment_id: 'hold', // forward-fill (slowly-changing dimension)
      region: 'hold',
    },
    { maxGap: '2m' }, // don't fill across gaps wider than 2 minutes
  )
  .collect();

Strategies:

  • 'hold': forward-fill from the last known value.
  • 'bfill': backward-fill from the next known value.
  • 'linear': time-interpolate between bracketing values (numeric only).
  • 'zero': constant 0 (numeric only).
  • Literal value (e.g. host: 'unknown'): constant fill. Anything that isn't a strategy name is treated as a literal.

Cap the fill scope

Two options bound how aggressively fill reaches across gaps:

  • { limit: N } — at most N consecutive null cells per column.
  • { maxGap: '2m' } — bracketing real values must be no more than 2 minutes apart.

Prefer maxGap. It's cadence-independent: an isolated single-cell drop-out gets filled, a multi-minute outage stays null. limit is a count, which only makes sense if your sampling cadence is regular and known. My data was at 1-minute cadence, so I went with maxGap: '2m' — that fills an isolated missed scrape (anchors 2 minutes apart) but leaves anything wider as null, which felt like the right place to draw the "this signals a real outage" line for this dataset. Tune the threshold to your data's cadence and your tolerance for filled-in values.
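The 'linear' + maxGap semantics over one column can be sketched like this (illustrative only; pond's fill runs on typed events, not raw arrays):

```typescript
type Cell = { time: number; value: number | undefined };

function linearFill(cells: Cell[], maxGapMs: number): Cell[] {
  return cells.map((cell, i) => {
    if (cell.value !== undefined) return cell;
    const prev = cells.slice(0, i).reverse().find((c) => c.value !== undefined);
    const next = cells.slice(i + 1).find((c) => c.value !== undefined);
    if (!prev || !next) return cell;                   // leading/trailing run: no anchor on one side
    if (next.time - prev.time > maxGapMs) return cell; // bracketing anchors too far apart
    const t = (cell.time - prev.time) / (next.time - prev.time);
    return { ...cell, value: prev.value! + t * (next.value! - prev.value!) };
  });
}

const minute = 60_000;
const cells: Cell[] = [
  { time: 0 * minute, value: 10 },
  { time: 1 * minute, value: undefined }, // isolated drop-out: anchors 2m apart → filled with 12
  { time: 2 * minute, value: 14 },
  { time: 3 * minute, value: undefined }, // start of a wider outage...
  { time: 4 * minute, value: undefined },
  { time: 5 * minute, value: 20 },        // ...anchors 3m apart → stays undefined
];
linearFill(cells, 2 * minute);
```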

Leading and trailing null runs are never linearly filled (no neighbor on one side). If your downstream tolerates it, follow up with a bfill or hold pass to catch them:

series
  .partitionBy('host')
  .fill({ cpu: 'linear' }, { maxGap: '2m' })
  .fill({ cpu: 'hold' }) // catch trailing gaps
  .collect();

If you know Pandas

fill is the typed analog of df.interpolate(limit_area='inside', limit=...), plus the time-distance cap (maxGap) that Pandas needs explicit logic for.

7. Regularize to a fixed grid (optional)

fill operates on existing events — it doesn't synthesize missing rows. In my CSV, when a collector missed a scrape window, the row simply wasn't in the file; fill had nothing to fill because the event itself was absent. I didn't need a regular grid for my downstream, so I left the missing rows missing. If you do need rows at every grid point — e.g., for a wide-table export or a chart that wants explicit nulls per sample — use materialize:

import { Sequence } from 'pond-ts';

const regular = series
  .partitionBy('host')
  .materialize(Sequence.every('1m')) // emit one row per minute, undefined for empty buckets
  .fill({ cpu: 'linear' }, { maxGap: '3m' })
  .collect();

materialize produces a time-keyed series with one event per sequence sample. Empty buckets emit rows with undefined cells; the partitioned variant auto-populates the partition columns (host, here) on those rows so downstream code doesn't have to fill them separately.

The materialize → fill order matters: fill only operates on rows that already exist, so synthesize the rows first, then fill.
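The row-synthesis step itself is simple to sketch for one host over a fixed grid (illustrative only, not pond's API — pond derives the grid from the Sequence you pass):

```typescript
type GridRow = { time: number; host: string; cpu: number | undefined };

function materializeGrid(rows: GridRow[], host: string, stepMs: number): GridRow[] {
  const byTime = new Map(rows.map((r) => [r.time, r] as const));
  const start = Math.min(...rows.map((r) => r.time));
  const end = Math.max(...rows.map((r) => r.time));
  const out: GridRow[] = [];
  for (let t = start; t <= end; t += stepMs) {
    // Missing grid points become explicit rows with undefined cells; the
    // partition column (host) is carried onto the synthesized rows.
    out.push(byTime.get(t) ?? { time: t, host, cpu: undefined });
  }
  return out;
}

const minute = 60_000;
const sparse: GridRow[] = [
  { time: 0, host: 'api-1', cpu: 10 },
  { time: 2 * minute, host: 'api-1', cpu: 14 }, // the 1-minute row is absent from the file
];
materializeGrid(sparse, 'api-1', minute);
// Three rows now exist; a subsequent fill pass has a row to interpolate into.
```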

If you know Pandas

df.resample('1min').asfreq() creates the empty rows without filling them; materialize is the typed equivalent.

8. Export

Two primary export shapes:

// JSON wire format — round-trips through fromJSON.
const wire = cleaned.toJSON({ rowFormat: 'object' });

// Wide-row points for chart libraries (Recharts, Observable Plot, visx).
const points = cleaned.toPoints();
// [{ ts: number, cpu: number | undefined, memory: ..., host: ... }, ...]

toJSON preserves the pond schema and round-trips losslessly. toPoints is the flat-row shape every chart library accepts directly: ts plus one key per value column.

For per-entity rendering, partition then export:

const linesByHost = cleaned.partitionBy('host').toMap((g) => g.toPoints());
// Map<host, ChartRow[]>

End-to-end

Putting it together:

import { readFileSync } from 'node:fs';
import Papa from 'papaparse';
import { TimeSeries, Time } from 'pond-ts';
import type {
  JsonObjectRowForSchema,
  SeriesSchema,
  TimeSeriesJsonInput,
} from 'pond-ts';

const schema = [
  { name: 'time', kind: 'time' },
  { name: 'cpu', kind: 'number', required: false },
  { name: 'memory', kind: 'number', required: false },
  { name: 'requests', kind: 'number', required: false },
  { name: 'latency_ms', kind: 'number', required: false },
  { name: 'host', kind: 'string' },
  { name: 'deployment_id', kind: 'string' },
  { name: 'region', kind: 'string' },
] as const satisfies SeriesSchema;

type Schema = typeof schema;
type Row = JsonObjectRowForSchema<Schema>;

const MISSING = new Set(['', 'null', '-', 'NA', 'na', 'NaN']);

function parseTimestamp(raw: string): number {
  const s = raw.trim();
  if (/^-?\d+$/.test(s)) return Number(s);
  if (/Z$|[+-]\d{2}:?\d{2}$/.test(s)) return Date.parse(s);
  return Time.parse(s, { timeZone: 'UTC' }).timestampMs();
}

function maybeNumber(raw: string): number | null {
  const s = raw.trim();
  if (MISSING.has(s)) return null;
  const n = Number(s);
  return Number.isFinite(n) ? n : null;
}

const csv = readFileSync('messy.csv', 'utf8');
const parsed = Papa.parse<Record<string, string>>(csv, {
  header: true,
  skipEmptyLines: true,
});

const rows: Row[] = [];
for (const raw of parsed.data) {
  if (!raw.time || !raw.host || !raw.deployment_id || !raw.region) continue;
  rows.push({
    time: parseTimestamp(raw.time),
    cpu: maybeNumber(raw.cpu ?? ''),
    memory: maybeNumber(raw.memory ?? ''),
    requests: maybeNumber(raw.requests ?? ''),
    latency_ms: maybeNumber(raw.latency_ms ?? ''),
    host: raw.host.trim(),
    deployment_id: raw.deployment_id.trim(),
    region: raw.region.trim(),
  });
}

rows.sort((a, b) => Number(a.time) - Number(b.time));

const cleaned = TimeSeries.fromJSON<Schema>({
  name: 'host-metrics',
  schema,
  rows,
})
  .partitionBy('host')
  .dedupe({ keep: 'last' })
  .fill(
    {
      cpu: 'linear',
      memory: 'linear',
      requests: 'linear',
      latency_ms: 'linear',
      deployment_id: 'hold',
      region: 'hold',
    },
    { maxGap: '2m' },
  )
  .collect();

const out = cleaned.toJSON({
  rowFormat: 'object',
}) as TimeSeriesJsonInput<Schema>;
console.log(JSON.stringify(out));

Three layers of responsibility:

  1. Boundary (CSV-side): parse, normalize types, collapse missing markers, trim whitespace, drop unparseable rows.
  2. Ingest: fromJSON with a typed schema. Get a typed series with validation.
  3. Cleaning (pond-side): one chain. partitionBy for entity scope, dedupe for duplicates, fill with maxGap for cell-level gaps, collect to exit. Optional materialize if you need missing-row insertion.

Once the data is inside pond, every cleaning decision is a chained method on a typed series. The bugs you'd otherwise hand-roll — silent cross-entity interpolation, ambiguous duplicate resolution, wrongly-timezoned timestamps, gap-fills that paper over outages — are either prevented by the type system or made into explicit choices in the chain.

See also

  • Cleaning data — the operator-by-operator reference for the cleaning primitives this guide chains together (fill, dedupe, materialize, partitionBy).
  • Ingest — the JSON ingest reference: fromJSON shapes, parse options, and the null → undefined boundary.
  • API reference (core) — generated full-width reference for every method and type used here.