pond-ts API Reference (core)

    Class LivePartitionedSeries<S, K, ByCol>

    Live counterpart to PartitionedTimeSeries. Routes events from a source LiveSource<S> into per-partition LiveSeries<S> sub-buffers, each with its own retention, grace window, and stateful operator pipeline.

    Per-partition semantics (settled in the v0.11 design pass):

    • Retention applies to each partition independently. A chatty host can't squeeze a quiet one out of the buffer.
    • Grace windows apply per partition. A late event for host-A does not perturb host-B's emission. Caveat: per-partition grace is bounded by the source's grace window. If the source rejects an event (because it's older than the source's grace), it never reaches the partitioned view. Setting partitionBy('host', { graceWindow: '10m' }) on a source with graceWindow: '1m' silently uses the smaller window.
    • Aggregation timing is per-partition. host-A's rolling avg fires when host-A has enough data, regardless of host-B.
    • Auto-spawn on new partition values: the first time a value not seen before arrives, allocate a sub-buffer. Optional { groups } upfront declares the expected set (mirrors the batch typed-groups pattern); when set, unknown partition values throw on ingest.
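The routing rules above can be illustrated with a small self-contained model. This is a sketch under stated assumptions, not the pond-ts implementation: MiniPartitioner, HostEvent, and effectiveGraceMs are hypothetical names, and retention is modelled as a simple max-events cap per partition.

```typescript
// Hypothetical stand-ins; none of these names are part of pond-ts.
type HostEvent = { time: number; host: string; cpu: number };

// Per-partition grace is bounded by the source's grace window.
function effectiveGraceMs(sourceGraceMs: number, partitionGraceMs: number): number {
  return Math.min(sourceGraceMs, partitionGraceMs);
}

class MiniPartitioner {
  private readonly buffers = new Map<string, HostEvent[]>();
  private readonly maxEventsPerPartition: number;
  private readonly groups: readonly string[] | undefined;

  constructor(maxEventsPerPartition: number, groups?: readonly string[]) {
    this.maxEventsPerPartition = maxEventsPerPartition;
    this.groups = groups;
  }

  ingest(e: HostEvent): void {
    let buf = this.buffers.get(e.host);
    if (buf === undefined) {
      // With declared groups, unknown partition values throw on ingest.
      if (this.groups !== undefined && this.groups.indexOf(e.host) === -1) {
        throw new Error(`unknown partition value: ${e.host}`);
      }
      // Auto-spawn: allocate a sub-buffer the first time a value is seen.
      buf = [];
      this.buffers.set(e.host, buf);
    }
    buf.push(e);
    // Retention is applied to this partition only.
    while (buf.length > this.maxEventsPerPartition) buf.shift();
  }

  sizeOf(host: string): number {
    const buf = this.buffers.get(host);
    return buf === undefined ? 0 : buf.length;
  }
}

const router = new MiniPartitioner(2);
for (let i = 0; i < 5; i++) router.ingest({ time: i, host: "chatty", cpu: i });
router.ingest({ time: 5, host: "quiet", cpu: 1 });
// router.sizeOf("chatty") is 2 and router.sizeOf("quiet") is 1:
// the chatty host was trimmed without touching the quiet one.
```

In this model, effectiveGraceMs(60000, 600000) returns 60000, mirroring the caveat that a '10m' partition grace on a '1m' source behaves like '1m'.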

    v0.11 PR 1 scope — foundation only. Compose operators per partition via apply((sub) => sub.fill(...).rolling(...)). Typed chainable sugar methods (fill(...).rolling(...).collect()) arrive in v0.11 PR 2.

    const live = new LiveSeries({ ... });

    // Per-host event lookup — direct subscription per partition.
    const byHost = live.partitionBy('host').toMap();
    byHost.get('api-1')?.on('event', (e) => { ... });

    // Apply a chain of live operators per partition; collect into a
    // unified LiveSeries.
    const cpuSmoothed = live.partitionBy('host').apply((sub) =>
      sub.fill({ cpu: 'hold' }).rolling('1m', { cpu: 'avg' }),
    );

    Type Parameters

    Constructors

    Properties

    by: ByCol
    groups?: readonly K[]
    name: string
    schema: S

    Methods

    • Apply the factory to each partition and fan the outputs into a single unified LiveSeries<R>. The factory is called once per partition (current and future); each call receives the partition's LiveSource<S> and should return a LiveSource<R> derived from it (typically by composing LiveSeries-style operators like sub.fill(...).rolling(...)).

      The unified series subscribes to every factory output and pushes events as they arrive. Auto-spawn propagates: a new partition value triggers a fresh factory invocation and the resulting LiveSource is subscribed to.

      Append-only semantics. Same as collect() — this is a fan-in sink. Per-partition output evictions (e.g. from a window operator inside the factory) are NOT propagated to the unified buffer. Use the options argument to set the unified buffer's own retention.

      History replay. When apply() is called on a partitioned view that already has events distributed across multiple partitions, existing factory-output events are gathered from every output, sorted globally by time, and pushed into the unified buffer in time order. This preserves strict ordering for the unified buffer.
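The history replay step can be sketched as a gather followed by a global sort. This is a minimal illustration, not the library's code: replayHistory and the Timed shape are hypothetical, and plain arrays stand in for per-partition factory outputs.

```typescript
// Hypothetical event shape; only `time` matters for the merge.
type Timed = { time: number; host: string };

// Gather existing events from every per-partition output, then sort globally
// by time so the unified buffer receives them in time order.
function replayHistory(outputs: ReadonlyArray<ReadonlyArray<Timed>>): Timed[] {
  const all: Timed[] = [];
  outputs.forEach((events) => events.forEach((e) => all.push(e)));
  // Array.prototype.sort is stable, so events with equal times keep gather order.
  return all.sort((a, b) => a.time - b.time);
}

const replayed = replayHistory([
  [{ time: 1, host: "api-1" }, { time: 4, host: "api-1" }],
  [{ time: 2, host: "api-2" }, { time: 3, host: "api-2" }],
]);
// replayed times, in order: 1, 2, 3, 4
```

Pushing the partitions one after another instead would hand the unified buffer the times 1, 4, 2, 3, which a strictly ordered buffer could not accept.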

      Factory contract. The factory must be pure and re-runnable: side-effect-free, no closure-captured state that mutates across calls, no external subscriptions on the input or output. The implementation invokes the factory once upfront on a stub LiveSeries<S> (to capture the output schema synchronously) and again once per partition (current and future). Factories that don't satisfy the contract may leak state across the stub call and the real per-partition calls.
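To see why the contract matters, the call pattern described above (one stub call, then one call per partition) can be modelled in a few lines. Everything here is hypothetical: applyModel, Sub, and Out only imitate the order of factory invocations, not the real types.

```typescript
// Hypothetical model of the call pattern: one stub call to capture the
// output shape, then one call per partition.
type Sub = { name: string };
type Out = { label: string };

function applyModel(partitions: string[], factory: (sub: Sub) => Out): Out[] {
  factory({ name: "stub" }); // schema-capture call; its output is discarded
  return partitions.map((name) => factory({ name }));
}

// Pure: the output depends only on the input.
const pureFactory = (sub: Sub): Out => ({ label: sub.name });

// Impure: a closure-captured counter mutates across calls, so the stub call
// shifts the numbering seen by every real partition.
let calls = 0;
const impureFactory = (sub: Sub): Out => ({ label: `${sub.name}#${calls++}` });

const pureLabels = applyModel(["a", "b"], pureFactory).map((o) => o.label);
const impureLabels = applyModel(["a", "b"], impureFactory).map((o) => o.label);
// pureLabels   is ["a", "b"]
// impureLabels is ["a#1", "b#2"] because the stub call consumed #0
```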

      Ordering (v0.17.1+). Same shape as collect() — the unified LiveSeries<R> inherits ordering and graceWindow from this partitioned series by default; explicit options.ordering / options.graceWindow override. Retention stays caller-explicit per the append-only fan-in semantics.

      Type Parameters

      Parameters

      Returns LiveSeries<R>

    • Fan in events from every partition into a single unified LiveSeries<S>. Subscribes to per-partition output 'event' streams and pushes each event into the unified buffer.

      Append-only semantics. This is a fan-in sink, not a mirrored materialization. When per-partition retention or grace evicts events from a sub-buffer, those evictions are NOT propagated to the unified buffer. The unified buffer keeps every event it ever received until evicted by its own retention. To control its size, pass a retention option to collect. To inspect the current per-partition state, use toMap() and snapshot each partition independently.
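The append-only behaviour can be demonstrated with a toy fan-in. This is a sketch, assuming a max-events retention for brevity; SubBuffer and collectModel are hypothetical names and do not reflect the library's internals.

```typescript
type Ev = { time: number; host: string };

// Hypothetical sub-buffer with max-events retention and an event listener list.
class SubBuffer {
  readonly events: Ev[] = [];
  private readonly listeners: Array<(e: Ev) => void> = [];
  private readonly maxEvents: number;

  constructor(maxEvents: number) {
    this.maxEvents = maxEvents;
  }

  on(listener: (e: Ev) => void): void {
    this.listeners.push(listener);
  }

  push(e: Ev): void {
    this.events.push(e);
    // Evictions here are local; listeners are never told about them.
    while (this.events.length > this.maxEvents) this.events.shift();
    this.listeners.forEach((l) => l(e));
  }
}

// Fan-in sink: appends every event it hears about and trims only by its
// own retention, if one is given.
function collectModel(subs: SubBuffer[], retention?: number): Ev[] {
  const unified: Ev[] = [];
  subs.forEach((sub) =>
    sub.on((e) => {
      unified.push(e);
      if (retention !== undefined) {
        while (unified.length > retention) unified.shift();
      }
    }),
  );
  return unified;
}

const sub = new SubBuffer(2);
const unbounded = collectModel([sub]);
const capped = collectModel([sub], 3);
for (let i = 0; i < 5; i++) sub.push({ time: i, host: "api-1" });
// sub.events.length is 2, unbounded.length is 5, capped.length is 3
```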

      Ordering (v0.17.1+). The unified LiveSeries defaults to inheriting ordering and graceWindow from this partitioned series (which itself inherits from the source LiveSeries via partitionBy). Before v0.17.1 it defaulted to 'strict' regardless of the source, so under 'reorder' sources partition fan-in could deliver events out of order to a strict unified buffer and throw. Inheritance closes that gap. Explicit ordering and graceWindow on collect(...) override inheritance.

      Retention does NOT inherit — the append-only fan-in semantics above are deliberate. Pass retention explicitly to cap the unified buffer.
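The inheritance rules can be summarised as a small option resolver. This is an illustrative sketch only: resolveFanInOptions and its types are hypothetical, and the string-valued graceWindow and retention fields are an assumed option shape.

```typescript
type Ordering = "strict" | "reorder";

// Hypothetical option shapes; the real option types may differ.
type FanInOptions = { ordering?: Ordering; graceWindow?: string; retention?: string };
type ParentSettings = { ordering: Ordering; graceWindow?: string; retention?: string };
type Resolved = {
  ordering: Ordering;
  graceWindow: string | undefined;
  retention: string | undefined;
};

function resolveFanInOptions(parent: ParentSettings, options: FanInOptions = {}): Resolved {
  return {
    // Inherited by default, overridden when passed explicitly.
    ordering: options.ordering ?? parent.ordering,
    graceWindow: options.graceWindow ?? parent.graceWindow,
    // Never inherited: the unified buffer is capped only when the caller asks.
    retention: options.retention,
  };
}

const parentSettings: ParentSettings = { ordering: "reorder", graceWindow: "1m", retention: "10m" };
const inherited = resolveFanInOptions(parentSettings);
const overridden = resolveFanInOptions(parentSettings, { ordering: "strict", retention: "5m" });
// inherited  is { ordering: "reorder", graceWindow: "1m", retention: undefined }
// overridden is { ordering: "strict",  graceWindow: "1m", retention: "5m" }
```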

      Parameters

      Returns LiveSeries<S>

    • Dispose of the partitioned view: unsubscribe from the source, disconnect every per-partition pipeline subscriber (created by collect() and apply()), and drop spawn listeners. Safe to call multiple times.

      Note: this does not clear the per-partition LiveSeries sub-buffers themselves. Their event arrays linger until the LivePartitionedSeries instance becomes unreferenced and is garbage-collected. If you want to free the sub-buffer memory eagerly, drop your reference to the LivePartitionedSeries after dispose().
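An idempotent dispose of this kind is commonly built around a guard flag and a list of unsubscribe callbacks. The sketch below assumes that pattern; DisposableView and track are hypothetical names, not the library's internals.

```typescript
// Hypothetical holder for unsubscribe callbacks.
class DisposableView {
  private unsubscribers: Array<() => void> = [];
  private disposed = false;

  track(unsubscribe: () => void): void {
    this.unsubscribers.push(unsubscribe);
  }

  dispose(): void {
    if (this.disposed) return; // a second call is a no-op
    this.disposed = true;
    this.unsubscribers.forEach((unsubscribe) => unsubscribe());
    this.unsubscribers = [];
  }
}

let unsubscribeCount = 0;
const view = new DisposableView();
view.track(() => { unsubscribeCount++; });
view.dispose();
view.dispose();
// unsubscribeCount is 1: the callback ran once despite two dispose() calls
```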

      Returns void

    • Per-partition rolling. See LiveSeries.rolling.

      Two emission modes, chosen by the trigger option:

      Default (no trigger / Trigger.event()): per-partition rolling — each partition has its own LiveRollingAggregation emitting per source event. Returns a chainable LivePartitionedView.

      In this mode the partition column drops by default: rolling's output schema only retains columns named in mapping. Without including the partition column, the unified output of the chain loses the partition tag (e.g. host becomes undefined). To keep the partition column visible, include it with a passthrough reducer:

      partitioned.rolling('5m', { cpu: 'avg', host: 'last' })
      //                                      ^^^^^^^^^^^^

      Trigger.clock(seq): synchronised partitioned rolling — all partitions share one bucket index and emit together at each boundary crossing. Returns a flat LiveSource<RowSchema> whose schema is [time, <partitionColumn>, ...mappingColumns].

      In this mode the partition column is auto-injected from the routing key — do NOT include it in mapping. A collision between the partition column name and any reducer-output column is rejected at construction with a clear error.

      partitioned.rolling(
        '5m',
        { cpu: 'avg' }, // host is auto-injected
        { trigger: Trigger.every('200ms') },
      );
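The synchronised mode can be pictured with a toy model in which all partitions share one bucket index and emit together when an incoming event crosses a boundary. This is a rough sketch under assumptions: SyncRollingModel is hypothetical, boundaries are detected from event time, and the only reducer is an average over cpu.

```typescript
type Sample = { time: number; host: string; cpu: number };
type Row = { time: number; host: string; cpu: number };

class SyncRollingModel {
  readonly rows: Row[] = [];
  private readonly windows = new Map<string, Sample[]>();
  private bucket: number | undefined = undefined;
  private readonly windowMs: number;
  private readonly everyMs: number;

  constructor(windowMs: number, everyMs: number) {
    this.windowMs = windowMs;
    this.everyMs = everyMs;
  }

  ingest(s: Sample): void {
    const idx = Math.floor(s.time / this.everyMs);
    if (this.bucket !== undefined && idx > this.bucket) {
      // Boundary crossing: every partition emits at the same boundary time.
      this.emitAll(idx * this.everyMs);
    }
    if (this.bucket === undefined || idx > this.bucket) this.bucket = idx;
    let w = this.windows.get(s.host);
    if (w === undefined) {
      w = [];
      this.windows.set(s.host, w);
    }
    w.push(s);
  }

  private emitAll(boundary: number): void {
    this.windows.forEach((w, host) => {
      // Drop samples that fell out of the window ending at the boundary.
      while (w.length > 0 && w[0].time <= boundary - this.windowMs) w.shift();
      if (w.length === 0) return;
      const avg = w.reduce((sum, x) => sum + x.cpu, 0) / w.length;
      // The partition column comes from the routing key, not from a reducer.
      this.rows.push({ time: boundary, host, cpu: avg });
    });
  }
}

const sync = new SyncRollingModel(1000, 200);
sync.ingest({ time: 0, host: "a", cpu: 10 });
sync.ingest({ time: 50, host: "b", cpu: 20 });
sync.ingest({ time: 100, host: "a", cpu: 30 });
sync.ingest({ time: 210, host: "b", cpu: 40 }); // crosses the 200 ms boundary
// sync.rows is [{ time: 200, host: "a", cpu: 20 }, { time: 200, host: "b", cpu: 20 }]
```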

      Type Parameters

      • const M extends
            | Readonly<
                Record<
                    string,
                    | AggregateReducer
                    | Readonly<
                        {
                            from: ValueColumnsForSchema<S>[number]["name"];
                            kind?: ScalarKind | undefined;
                            using: AggregateFunctionsForKind<
                                Extract<
                                    ValueColumnsForSchema<(...)>[number],
                                    ColumnDef<(...)[(...)], ScalarKind>,
                                >["kind"],
                            >;
                        },
                    >,
                >,
            >
            | {
                readonly [K in string
                | number
                | symbol]: K extends ValueColumnsForSchema<S>[number]["name"]
                    ?
                        | Readonly<
                            {
                                from: ValueColumnsForSchema<S>[number]["name"];
                                kind?: ScalarKind;
                                using: AggregateFunctionsForKind<
                                    Extract<
                                        ValueColumnsForSchema<(...)>[number],
                                        ColumnDef<(...)[(...)], ScalarKind>,
                                    >["kind"],
                                >;
                            },
                        >
                        | AggregateFunctionsForKind<
                            ColumnKindByName<ValueColumnsForSchema<S>, K & string>,
                        >
                    : string extends K
                        ? | AggregateReducer
                        | Readonly<
                            {
                                from: ValueColumnsForSchema<S>[number]["name"];
                                kind?: ScalarKind | undefined;
                                using: AggregateFunctionsForKind<
                                    Extract<(...)[(...)], ColumnDef<(...), (...)>>["kind"],
                                >;
                            },
                        >
                        : Readonly<
                            {
                                from: ValueColumnsForSchema<S>[number]["name"];
                                kind?: ScalarKind;
                                using: AggregateFunctionsForKind<
                                    Extract<
                                        ValueColumnsForSchema<(...)>[number],
                                        ColumnDef<(...)[(...)], ScalarKind>,
                                    >["kind"],
                                >;
                            },
                        >
            }

      Parameters

      Returns LivePartitionedView<
          S,
          readonly [S[0], AggregateColumns<ValueColumnsForSchema<S>, M>],
          K,
          ByCol,
      >

    • Per-partition rolling. See LiveSeries.rolling.

      Returns LivePartitionedSyncRolling<S, K, SeriesSchema>

    • Per-partition rolling. See LiveSeries.rolling.

      Returns
          | LivePartitionedSyncRolling<S, K, SeriesSchema>
          | LivePartitionedView<
              S,
              readonly [S[0], AggregateColumns<ValueColumnsForSchema<S>, M>],
              K,
              ByCol,
          >

    • Keyed-form fused multi-window partitioned rolling. Maintains N windows per partition in a single ingest pass over a single shared deque per partition; emits one merged event per partition per trigger boundary.

      Clock trigger required. The fused form on partitioned series is synced-cross-partition by design — single trigger, single boundary detection, single fan-out per boundary. Event/count triggers don't make sense for cross-partition synced emission and are not accepted.

      Output schema is [time, <byColumn>, ...mergedColumns]: the partition column is auto-injected once at the front, never per-window. Duplicate output column names across windows are rejected at construction.
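The idea of several windows over one shared deque can be sketched for a single partition as follows. This is not the design in PLAN.md: FusedWindowsModel and WindowSpec are hypothetical, the only reducer is an average over cpu, and for brevity each window is recomputed at emit time instead of being maintained incrementally during ingest.

```typescript
type Point = { time: number; cpu: number };
type WindowSpec = { column: string; windowMs: number };

class FusedWindowsModel {
  private readonly deque: Point[] = [];
  private readonly specs: WindowSpec[];
  private readonly maxWindowMs: number;

  constructor(specs: WindowSpec[]) {
    // Duplicate output column names are rejected at construction.
    const seen = new Set<string>();
    specs.forEach((s) => {
      if (seen.has(s.column)) throw new Error(`duplicate output column: ${s.column}`);
      seen.add(s.column);
    });
    this.specs = specs;
    this.maxWindowMs = Math.max(...specs.map((s) => s.windowMs));
  }

  push(p: Point): void {
    this.deque.push(p);
    // One shared deque, trimmed to the longest window.
    while (this.deque.length > 0 && this.deque[0].time <= p.time - this.maxWindowMs) {
      this.deque.shift();
    }
  }

  // One merged record covering every window, evaluated at `now`.
  emit(now: number): Record<string, number> {
    const out: Record<string, number> = {};
    this.specs.forEach((s) => {
      const inWindow = this.deque.filter((p) => p.time > now - s.windowMs && p.time <= now);
      const sum = inWindow.reduce((acc, p) => acc + p.cpu, 0);
      out[s.column] = inWindow.length === 0 ? NaN : sum / inWindow.length;
    });
    return out;
  }
}

const fused = new FusedWindowsModel([
  { column: "cpu_1s", windowMs: 1000 },
  { column: "cpu_5s", windowMs: 5000 },
]);
fused.push({ time: 0, cpu: 10 });
fused.push({ time: 3000, cpu: 20 });
fused.push({ time: 4500, cpu: 30 });
const merged = fused.emit(4500);
// merged is { cpu_1s: 30, cpu_5s: 20 }
```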

      See PLAN.md "Fused multi-window rolling" for the full design.

      Type Parameters

      Parameters

      Returns LivePartitionedFusedRolling<
          S,
          K,
          readonly [
              ColumnDef<"time", "time">,
              ColumnDef<ByCol, FromColumnKind<S, ByCol> & string> & { required: false },
              FusedRollingColumns<S, FM>,
          ],
      >

    • Per-partition stream sampling. Each partition gets its own stride counter (closure-captured inside its LiveView). Safe by construction: chaining after partitionBy thins each partition's stream independently — no multi-entity bias.

      v0.17.0 ships stride only on the live side; see SampleStrategy for why reservoir is deferred. The buffer-as-window persona's typical shape:

      live.partitionBy('host').sample({ stride: 10 }).rolling('5m', m);
      

      Each host's stream is thinned 1-in-10 before flowing into a per-host 5m rolling window — decoupling baseline length from event rate.
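Per-partition stride counters can be modelled with one closure per partition value. This is a sketch rather than the library's code: makeStride and StrideByPartition are hypothetical, and keeping the first event of every stride-sized run is an assumption about which event in the run survives.

```typescript
type Reading = { host: string; cpu: number };

// Returns a filter with its own closure-captured counter.
function makeStride(stride: number): (e: Reading) => boolean {
  let seen = 0;
  return () => seen++ % stride === 0;
}

class StrideByPartition {
  private readonly filters = new Map<string, (e: Reading) => boolean>();
  private readonly stride: number;

  constructor(stride: number) {
    this.stride = stride;
  }

  accept(e: Reading): boolean {
    let filter = this.filters.get(e.host);
    if (filter === undefined) {
      // Each partition gets an independent counter on first sight.
      filter = makeStride(this.stride);
      this.filters.set(e.host, filter);
    }
    return filter(e);
  }
}

const sampler = new StrideByPartition(10);
let chattyKept = 0;
let quietKept = 0;
for (let i = 0; i < 100; i++) {
  if (sampler.accept({ host: "chatty", cpu: i })) chattyKept++;
}
for (let i = 0; i < 10; i++) {
  if (sampler.accept({ host: "quiet", cpu: i })) quietKept++;
}
// chattyKept is 10 and quietKept is 1: each host is thinned 1-in-10 on its own
```

A single shared counter would instead let the chatty host decide which of the quiet host's events survive, which is the multi-entity bias the per-partition form avoids.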

      Parameters

      Returns LivePartitionedView<S, S, K, ByCol>

    • Pipeline stats snapshot — current partition count plus cumulative routing counter. Cheap O(1).

      • partitions: current number of partitions (declared groups plus auto-spawned ones). With { groups }, equal to groups.length once any of those values appear; without it, grows on each new partition value.
      • eventsRouted: total source events successfully routed to a partition. Events that throw on ingest (an unknown partition value under typed groups) never make it through LivePartitionedSeries.#routeEvent successfully, so they are not counted.

      Note: per-partition counters (per-partition eventsRouted, per-partition retention state, etc.) are intentionally NOT exposed by this method. Use toMap() and call LiveSeries.stats on each partition's sub-buffer for per-partition observability — that scales O(partitions) only when you actually need it.

      Returns { eventsRouted: number; partitions: number }