Apply a factory to each partition and fan in the outputs into a
single unified LiveSeries<R>. The factory is called once per
partition (current and future); each call receives the
partition's LiveSource<S> and should return a LiveSource<R>
derived from it (typically by composing LiveSeries-style
operators like sub.fill(...).rolling(...)).
The unified series subscribes to every factory output and
pushes events as they arrive. Auto-spawn propagates: a new
partition value triggers a fresh factory invocation and the
resulting LiveSource is subscribed to.
Append-only semantics. Same as collect() — this is a
fan-in sink. Per-partition output evictions (e.g. from a
window operator inside the factory) are NOT propagated to
the unified buffer. Use the options argument to set the
unified buffer's own retention.
History replay. When apply() is called on a partitioned
view that already has events distributed across multiple
partitions, existing factory-output events are gathered from
every output, sorted globally by time, and pushed into the
unified buffer in time order. This preserves strict ordering
for the unified buffer.
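The replay step described above can be pictured as a gather-then-sort over the per-partition outputs. This is a minimal sketch, assuming events carry a numeric time; Ev and replayInTimeOrder are hypothetical names for illustration, not part of the library.

```typescript
// Toy model only: Ev and replayInTimeOrder are hypothetical, not library API.
type Ev = { time: number; key: string };

// Gather existing events from every per-partition output, then order by time.
function replayInTimeOrder(outputs: Ev[][]): Ev[] {
  const all: Ev[] = [];
  for (const out of outputs) {
    for (const e of out) all.push(e);
  }
  // Array.prototype.sort is stable, so equal timestamps keep gather order.
  return all.sort((x, y) => x.time - y.time);
}

const hostA: Ev[] = [{ time: 10, key: 'a' }, { time: 40, key: 'a' }];
const hostB: Ev[] = [{ time: 20, key: 'b' }, { time: 30, key: 'b' }];

// Interleaves the two partitions: times 10, 20, 30, 40.
const replayed = replayInTimeOrder([hostA, hostB]);
```

Pushing the partitions one after another instead (10, 40, 20, 30) would hand a strictly ordered buffer an out-of-order event, which is what the global sort avoids.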
Factory contract. The factory must be pure and
re-runnable: side-effect-free, no closure-captured state
that mutates across calls, no external subscriptions on the
input or output. The implementation invokes the factory once
upfront on a stub LiveSeries<S> (to capture the output
schema synchronously) and again once per partition (current
and future). Factories that don't satisfy the contract may
leak state across the stub call and the real per-partition
calls.
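To make the contract concrete, here is a small sketch of how state can leak from the stub call. toyApply is a hypothetical stand-in that only mimics the call pattern described above (one stub call, then one call per partition); it is not the library's apply().

```typescript
// Hypothetical stand-in for the call pattern: one stub call, then one per partition.
function toyApply<R>(keys: string[], factory: (key: string) => R): R[] {
  factory('__stub__'); // schema probe; the result is discarded
  return keys.map((k) => factory(k));
}

// Impure factory: mutates closure-captured state on every call.
let calls = 0;
const impure = (key: string) => ({ key, index: calls++ });

// Pure factory: the output depends only on the input.
const pure = (key: string) => ({ key, label: `series:${key}` });

const impureOut = toyApply(['host-a', 'host-b'], impure);
const pureOut = toyApply(['host-a', 'host-b'], pure);
```

In this sketch the impure factory hands host-a index 1 rather than 0, because the stub call already consumed index 0. The pure factory is unaffected by the extra invocation.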
Ordering (v0.17.1+). Same shape as collect() — the unified
LiveSeries<R> inherits ordering and graceWindow from this
partitioned series by default; explicit options.ordering /
options.graceWindow override. Retention stays caller-explicit
per the append-only fan-in semantics.
Optional options: Partial<LiveSeriesOptions<R>>
Fan in events from every partition into a single unified
LiveSeries<S>. Subscribes to per-partition output 'event'
streams and pushes each event into the unified buffer.
Append-only semantics. This is a fan-in sink, not a
mirrored materialization. When per-partition retention or
grace evicts events from a sub-buffer, those evictions are
NOT propagated to the unified buffer. The unified buffer
keeps every event it ever received until evicted by its own
retention. To control its size, pass a retention option to
collect. To inspect the current per-partition state, use
toMap() and snapshot each partition independently.
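The append-only behaviour can be illustrated with a toy model. ToyBuffer below is a hypothetical class with a count-based retention cap, written only to show that sub-buffer evictions do not reach the fan-in sink; it does not reflect the library's internals.

```typescript
// Toy model only: ToyBuffer is hypothetical, not one of the library's classes.
type Ev = { time: number; host: string; cpu: number };

class ToyBuffer {
  readonly events: Ev[] = [];
  private readonly maxEvents: number;
  private readonly listeners: Array<(e: Ev) => void> = [];

  constructor(maxEvents: number) {
    this.maxEvents = maxEvents;
  }

  on(listener: (e: Ev) => void): void {
    this.listeners.push(listener);
  }

  push(e: Ev): void {
    this.events.push(e);
    // Retention: drop the oldest events beyond the cap. Nothing is emitted for evictions.
    while (this.events.length > this.maxEvents) this.events.shift();
    for (const l of this.listeners) l(e);
  }
}

// Two per-partition sub-buffers that each keep only their latest 2 events.
const hostA = new ToyBuffer(2);
const hostB = new ToyBuffer(2);

// Fan-in sink with its own, separately configured retention (5 events).
const unified = new ToyBuffer(5);
hostA.on((e) => unified.push(e));
hostB.on((e) => unified.push(e));

for (let t = 1; t <= 3; t++) {
  hostA.push({ time: t * 10, host: 'a', cpu: t });
  hostB.push({ time: t * 10 + 1, host: 'b', cpu: t });
}
```

After the loop each sub-buffer holds 2 events, while the unified buffer holds 5 (times 11, 20, 21, 30, 31). The event at time 11 has already been evicted from hostB but is still present in the sink, which only trims according to its own cap.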
Ordering (v0.17.1+). The unified LiveSeries defaults to
inheriting ordering and graceWindow from this partitioned
series (which itself inherits from the source LiveSeries via
partitionBy). Before v0.17.1 it defaulted to 'strict' regardless of
the source, so under 'reorder' sources partition fan-in could
deliver events out of order to a strict unified buffer and
throw. Inheritance closes that gap. Explicit ordering and
graceWindow on collect(...) override inheritance.
Retention does NOT inherit — the append-only fan-in
semantics above are deliberate. Pass retention explicitly to
cap the unified buffer.
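The inheritance rule can be sketched as a small option-resolution function. ToyOptions, the retention shape, and resolveUnifiedOptions are assumptions made for this illustration; the real LiveSeriesOptions type may look different.

```typescript
// Hypothetical option shapes for illustration; not the library's LiveSeriesOptions.
type Ordering = 'strict' | 'reorder';

interface ToyOptions {
  ordering: Ordering;
  graceWindow: string | undefined;
  retention: { maxEvents: number } | undefined;
}

function resolveUnifiedOptions(
  partitioned: ToyOptions,
  explicit: Partial<ToyOptions> = {},
): ToyOptions {
  return {
    // ordering and graceWindow fall back to the partitioned series' values.
    ordering: explicit.ordering ?? partitioned.ordering,
    graceWindow: explicit.graceWindow ?? partitioned.graceWindow,
    // retention is never inherited: only an explicit value is used.
    retention: explicit.retention,
  };
}

const partitioned: ToyOptions = {
  ordering: 'reorder',
  graceWindow: '1m',
  retention: { maxEvents: 100 },
};

const inherited = resolveUnifiedOptions(partitioned);
const capped = resolveUnifiedOptions(partitioned, { retention: { maxEvents: 1000 } });
const strict = resolveUnifiedOptions(partitioned, { ordering: 'strict' });
```

Here inherited picks up 'reorder' and '1m' but has no retention, capped adds an explicit cap, and strict shows an explicit ordering taking precedence over the inherited one.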
Optional options: Partial<LiveSeriesOptions<S>>
Per-partition cumulative. See LiveSeries.cumulative.
Per-partition diff. See LiveSeries.diff.
Dispose of the partitioned view: unsubscribe from the source,
disconnect every per-partition pipeline subscriber (created
by collect() and apply()), and drop spawn listeners. Safe
to call multiple times.
Note: this does not clear the per-partition LiveSeries
sub-buffers themselves. Their event arrays linger until the
LivePartitionedSeries instance becomes unreferenced and is
garbage-collected. If you want to free the sub-buffer memory
eagerly, drop your reference to the LivePartitionedSeries
after dispose().
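A common way to get "safe to call multiple times" behaviour is a disposed flag guarding a list of unsubscribe callbacks. The sketch below shows that general pattern with a hypothetical ToyDisposable class; it is not taken from the library's source.

```typescript
// Hypothetical class showing an idempotent dispose pattern.
class ToyDisposable {
  private unsubscribers: Array<() => void> = [];
  private disposed = false;

  // Register a callback that tears down one subscription.
  track(unsubscribe: () => void): void {
    this.unsubscribers.push(unsubscribe);
  }

  dispose(): void {
    if (this.disposed) return; // repeat calls are no-ops
    this.disposed = true;
    for (const u of this.unsubscribers) u();
    this.unsubscribers = [];
  }
}

let released = 0;
const view = new ToyDisposable();
view.track(() => { released += 1; });
view.track(() => { released += 1; });

view.dispose();
view.dispose(); // second call does nothing
```

Each tracked callback runs exactly once, so released ends at 2 even though dispose() was called twice.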
Per-partition fill. See LiveSeries.fill.
Optional options: { limit?: number }
Per-partition pctChange. See LiveSeries.pctChange.
Per-partition rate. See LiveSeries.rate.
Per-partition rolling. See LiveSeries.rolling.
Two emission modes, chosen by the trigger option:
Default (no trigger / Trigger.event()): per-partition
rolling — each partition has its own LiveRollingAggregation
emitting per source event. Returns a chainable
LivePartitionedView.
In this mode the partition column drops by default —
rolling's output schema only retains columns named in
mapping. Without including the partition column, the unified
output of the chain loses the partition tag (e.g. host becomes
undefined). To keep the partition column visible, include it
with a passthrough reducer:
partitioned.rolling('5m', { cpu: 'avg', host: 'last' }) // host: 'last' is the passthrough reducer
Trigger.clock(seq): synchronised partitioned rolling — all
partitions share one bucket index and emit together at each
boundary crossing. Returns a flat LiveSource<RowSchema> whose
schema is [time, <partitionColumn>, ...mappingColumns].
In this mode the partition column is auto-injected from the
routing key — do NOT include it in mapping. A collision
between the partition column name and any reducer-output column
is rejected at construction with a clear error.
partitioned.rolling(
'5m',
{ cpu: 'avg' }, // host is auto-injected
{ trigger: Trigger.every('200ms') },
);
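The synchronised mode can be pictured with a deliberately simplified model: every partition shares one bucket index, and all partitions flush together when an event crosses into a later bucket. The sketch assumes in-order events with numeric millisecond times and collapses the rolling window to the bucket itself; syncedAverages and its types are hypothetical and do not describe the library's implementation.

```typescript
// Simplified toy: per-host averages over fixed buckets, flushed together.
type Ev = { time: number; host: string; cpu: number };
type Row = { time: number; host: string; cpu: number };

function syncedAverages(events: Ev[], bucketMs: number): Row[] {
  const rows: Row[] = [];
  const sums = new Map<string, { total: number; n: number }>();
  let current: number | undefined;

  // Emit one row per host for the finished bucket, partition column included.
  const flush = (bucket: number): void => {
    sums.forEach((s, host) => {
      rows.push({ time: (bucket + 1) * bucketMs, host, cpu: s.total / s.n });
    });
    sums.clear();
  };

  for (const e of events) {
    const bucket = Math.floor(e.time / bucketMs);
    // Boundary crossing: every partition emits at the same moment.
    if (current !== undefined && bucket > current) flush(current);
    current = bucket;
    const s = sums.get(e.host) ?? { total: 0, n: 0 };
    s.total += e.cpu;
    s.n += 1;
    sums.set(e.host, s);
  }
  return rows;
}

const rows = syncedAverages(
  [
    { time: 0, host: 'a', cpu: 2 },
    { time: 50, host: 'b', cpu: 4 },
    { time: 90, host: 'a', cpu: 4 },
    { time: 120, host: 'a', cpu: 10 }, // crosses into bucket 1, triggers the flush
  ],
  100,
);
```

The single boundary crossing produces two rows stamped with the same time (100), one for host a with cpu 3 and one for host b with cpu 4. Note that host appears as an output column without being named in any reducer mapping, mirroring the auto-injection described above.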
Optional options: LiveRollingOptions & { trigger?: { kind: "event" | "count" } }
Keyed-form fused multi-window partitioned rolling. Maintains N windows per partition in a single ingest pass over a single shared deque per partition; emits one merged event per partition per trigger boundary.
Clock trigger required. The fused form on partitioned series is synced-cross-partition by design — single trigger, single boundary detection, single fan-out per boundary. Event/count triggers don't make sense for cross-partition synced emission and are not accepted.
Output schema is [time, <byColumn>, ...mergedColumns] —
partition column auto-injected once at the front, never per-
window. Duplicate output column names across windows are
rejected at construction.
See PLAN.md "Fused multi-window rolling" for the full design.
Per-partition stream sampling. Each partition gets its own
stride counter (closure-captured inside its LiveView). Safe
by construction: chaining after partitionBy thins each
partition's stream independently — no multi-entity bias.
v0.17.0 ships stride only on the live side; see SampleStrategy for why reservoir is deferred.
The buffer-as-window persona's typical shape:
live.partitionBy('host').sample({ stride: 10 }).rolling('5m', m);
Each host's stream is thinned 1-in-10 before flowing into a per-host 5m rolling window — decoupling baseline length from event rate.
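The per-partition stride counter can be sketched as one counter per key. makeStrideSampler is a hypothetical helper, and keeping the first event of every stride (rather than the last) is an assumption made for this example.

```typescript
// Hypothetical helper: one stride counter per partition key.
function makeStrideSampler(stride: number): (key: string) => boolean {
  const counters = new Map<string, number>();
  return (key: string): boolean => {
    const n = counters.get(key) ?? 0;
    counters.set(key, n + 1);
    // Assumption: keep the 1st, (stride + 1)th, ... event seen for each key.
    return n % stride === 0;
  };
}

const keep = makeStrideSampler(3);

// Host 'a' is much chattier than host 'b'.
const stream = ['a', 'a', 'a', 'b', 'a', 'a', 'b', 'a'];
const keptIndexes: number[] = [];
stream.forEach((host, i) => {
  if (keep(host)) keptIndexes.push(i);
});
```

With a stride of 3 the kept positions are 0, 3 and 4: the first and fourth events of host a, and the first event of host b. Because each host counts independently, the quieter host is thinned at its own rate instead of being crowded out by the busier one.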
Pipeline stats snapshot — current partition count plus cumulative routing counter. Cheap O(1).
partitions: current number of partitions (declared groups
plus auto-spawned ones). With { groups }, equal to
groups.length once any of those values appears; without it,
grows on each new partition value.
eventsRouted: total source events successfully routed to
a partition. Events that throw (unknown partition value
under typed-groups) are not counted, because they never get
through LivePartitionedSeries.#routeEvent successfully.
Note: per-partition counters (per-partition eventsRouted,
per-partition retention state, etc.) are intentionally NOT
exposed by this method. Use toMap() and call
LiveSeries.stats on each partition's sub-buffer for
per-partition observability; that scales O(partitions) only
when you actually need it.
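The two counters can be modelled with a toy router. ToyRouter is hypothetical and spawns partitions lazily on first use, which is a simplification that may differ from how declared groups are handled in the library.

```typescript
// Toy router: hypothetical class, lazily spawning one array per partition key.
class ToyRouter {
  private readonly partitions = new Map<string, number[]>();
  private readonly groups: readonly string[] | undefined;
  private eventsRouted = 0;

  constructor(groups?: readonly string[]) {
    this.groups = groups;
  }

  route(key: string, value: number): void {
    // With declared groups, unknown partition values throw before being counted.
    if (this.groups !== undefined && this.groups.indexOf(key) === -1) {
      throw new Error(`unknown partition value: ${key}`);
    }
    let buf = this.partitions.get(key);
    if (buf === undefined) {
      buf = [];
      this.partitions.set(key, buf);
    }
    buf.push(value);
    this.eventsRouted += 1;
  }

  // O(1): reads a map size and a running counter, no per-partition walk.
  stats(): { partitions: number; eventsRouted: number } {
    return { partitions: this.partitions.size, eventsRouted: this.eventsRouted };
  }
}

const router = new ToyRouter(['a', 'b']);
router.route('a', 1);
router.route('b', 2);
router.route('a', 3);

let rejected = false;
try {
  router.route('c', 4); // not a declared group
} catch (err) {
  rejected = true;
}

const snapshot = router.stats();
```

The rejected event leaves both counters untouched, so the snapshot reports 2 partitions and 3 routed events.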
Materialize the partitioned view as a Map<key, LiveSource<S>>,
one entry per spawned partition. Map iteration order matches
spawn order (declared order if groups was set, insertion
order otherwise).
Live counterpart to PartitionedTimeSeries. Routes events from a source
LiveSource<S> into per-partition LiveSeries<S> sub-buffers, each with its own retention, grace window, and stateful operator pipeline.
Per-partition semantics (settled in the v0.11 design pass):
host-A does not perturb host-B's emission. Caveat: per-partition grace is bounded by the source's grace window. If the source rejects an event (because it's older than the source's grace), it never reaches the partitioned view. Setting partitionBy('host', { graceWindow: '10m' }) on a source with graceWindow: '1m' silently uses the smaller window.
host-A's rolling avg fires when host-A has enough data, regardless of host-B.
{ groups } upfront declares the expected set (mirrors the batch typed-groups pattern); when set, unknown partition values throw on ingest.
v0.11 PR 1 scope: foundation only. Compose operators per partition via
apply((sub) => sub.fill(...).rolling(...)). Typed chainable sugar methods (fill(...).rolling(...).collect()) arrive in v0.11 PR 2.
Example