pond-ts API Reference (core)
    Preparing search index...

    Class LiveReduce<S, Out>

    "Reduce over the whole current buffer." LiveReduce is the streaming counterpart to batch series.reduce(mapping) — same mapping shape, but reactive: every push to the source updates the reducer state via add, every retention eviction updates via remove. The snapshot at any moment is the reduction over "what's currently retained."

    Why not sugar over LiveFusedRolling? The fused-rolling primitive requires a time-based window and maintains its own deque. LiveReduce's "window" is whatever's in the source's buffer right now — driven by retention, not a duration. That works for any retention shape (maxAge, maxEvents, both, neither) without forcing a sentinel resolution.

    Output stream-shape: a LiveSource<Out>. One emitted event per trigger fire, keyed at the latest source event's key. The output schema is [time, ...mappingColumns] — same shape as LiveRollingAggregation for consistency. Composes with the rest of the live operator surface.

    Trigger semantics match LiveRollingAggregation's. The trigger fires on each source 'event' ingest (default Trigger.event()), per-N-events (Trigger.count(n)), or per data-clock boundary crossing (Trigger.every(...)). Source eviction is independent — it drives reducer-state removes but does not itself fire the trigger.

    Post-retention emission via deferred microtask. A push that triggers retention fires 'event' per row, then runs applyRetention(), then fires 'batch' and 'evict'. To emit a snapshot reflecting the post-retention buffer state, LiveReduce defers the trigger fire to a queueMicrotask scheduled in the 'event' handler. By the time the microtask runs, all synchronous 'event' / 'evict' callbacks for the push have completed and reducer state is consistent with the source's current buffer.

    One implication: Trigger.event() semantics differ from LiveRollingAggregation's. A pushMany(rows) of K rows fires ONE deferred emission, not K. (Each individual push() is one row → one emission, identical.) For most users this is the more useful semantic — the snapshot represents "state-after-this-push," not "state-after-each-row-mid-push." Users wanting per-row emissions should reach for LiveRollingAggregation over a buffer-sized window instead.

    Construction-time replay. Sources with existing buffer content at construction replay through #ingest. With Trigger.event() this fires ONE deferred emission after replay completes (not N events under the new microtask defer). Test pin: it('replay emits one deferred event ...').

    Caveat — ordering: 'reorder' source mode. LiveReduce processes events in arrival order, not sorted-by-timestamp order. For a source with ordering: 'reorder', late events are inserted into the source buffer at their sorted position but reach LiveReduce as new arrivals. Order-sensitive reducers (first, last, samples, top${N}, custom functions) compute over arrival order, not buffer order. If you need order-sensitive reductions on a reorder-mode source, snapshot to a TimeSeries first via live.toTimeSeries().reduce(...).

    Caveat — reorder + retention specifically. The windowed reducers min / max / first / last / samples maintain forward-sliding-window state (a monotone deque, or head-removal ordered entries) that assumes eviction removes the OLDEST-arrived event first. That holds for strict / drop (append-only) and the chunked backing, but NOT for reorder + retention, where the source evicts the sorted-prefix — which may be a later arrival. On that combination those five reducers can report stale or undefined snapshots. The value-based reducers (avg, count, sum, stdev, median, percentile, unique) remove by value and stay correct. This is a long-standing limitation (it predates the chunked backing); for reliable windowed extrema on a reorder source, snapshot to a TimeSeries and reduce there. Tracked in PLAN.md "Deferred".

    Source contract — EMITS_EVICT is load-bearing. This class's reducer state stays in sync with the source's current buffer because it removes events as the source evicts them. The 'evict' subscription is gated on the EMITS_EVICT symbol marker. Sources that evict internally but do NOT emit 'evict' would cause LiveReduce's state to grow without bound (no removes ever fire). Today every pond LiveSource that evicts also marks itself with EMITS_EVICT (LiveSeries, LiveView with eviction); future LiveSource implementations must preserve this contract.

    Public API: constructed via live.reduce(mapping, opts?) on LiveSeries / LiveView. User code doesn't import this class directly.

    Type Parameters

    Implements

    Index

    Constructors

    Properties

    Accessors

    Methods

    Constructors

    • Type Parameters

      Parameters

      • source: LiveSource<S>
      • mapping:
            | Readonly<
                Record<
                    string,
                    | AggregateReducer
                    | Readonly<
                        {
                            from: ValueColumnsForSchema<S>[number]["name"];
                            kind?: ScalarKind | undefined;
                            using: AggregateFunctionsForKind<
                                Extract<
                                    ValueColumnsForSchema<(...)>[number],
                                    ColumnDef<(...)[(...)], ScalarKind>,
                                >["kind"],
                            >;
                        },
                    >,
                >,
            >
            | Readonly<
                Record<
                    string,
                    | AggregateReducer
                    | Readonly<
                        {
                            from: ValueColumnsForSchema<S>[number]["name"];
                            kind?: ScalarKind | undefined;
                            using: AggregateFunctionsForKind<
                                Extract<
                                    ValueColumnsForSchema<(...)>[number],
                                    ColumnDef<(...)[(...)], ScalarKind>,
                                >["kind"],
                            >;
                        },
                    >,
                >,
            >
      • options: LiveRollingOptions = {}

      Returns LiveReduce<S, Out>

    Properties

    name: string
    schema: Out

    Accessors

    Methods

    • Pipeline stats snapshot — cumulative counters since construction plus current reducer-state size. Cheap O(1).

      • eventsObserved: total source events ingested into reducer state. Includes events replayed at construction from a non-empty source — phrased uniformly with sibling classes' stats() docs. Never decreases.
      • evictions: total events removed from reducer state via the source's 'evict' channel. Events that predated this LiveReduce (already past the FIFO evict cursor) don't count. Never decreases.
      • emissions: total output events fired. Never decreases. For Trigger.event, a single pushMany(K) fires ONE deferred emission (see class JSDoc) — emissions may be strictly less than eventsObserved.
      • bufferSize: current count of events in reducer state (= eventsObserved - evictions). Tracks the source's current retained buffer that this reduce sees.

      bufferSize is only meaningful when the source emits 'evict'. This class subscribes to 'evict' only when the source is marked with EMITS_EVICT (see class JSDoc). If a future LiveSource impl evicts internally without emitting 'evict', evictions stays at 0 and bufferSize grows monotonically — silently wrong. Today every pond source that evicts also marks EMITS_EVICT, so this is theoretical, but the contract is load-bearing.

      Returns {
          bufferSize: number;
          emissions: number;
          eventsObserved: number;
          evictions: number;
      }

    • Read the current reducer snapshot — every output column's current value, computed over the source's current buffer. Cheap O(reducers) — each reducer's snapshot() is O(1) for built-ins.

      Returns Record<string, ColumnValue | undefined>