Pipeline stats snapshot — cumulative counters since construction plus current reducer-state size. Cheap O(1).
eventsObserved: total source events ingested into reducer
state. Includes events replayed at construction from a
non-empty source — phrased uniformly with sibling classes'
stats() docs. Never decreases.
evictions: total events removed from reducer state via
the source's 'evict' channel. Events that predated this
LiveReduce (already past the FIFO evict cursor) don't count.
Never decreases.
emissions: total output events fired. Never decreases.
For Trigger.event, a single pushMany(K) fires ONE
deferred emission (see class JSDoc) — emissions may be
strictly less than eventsObserved.
bufferSize: current count of events in reducer state
(= eventsObserved - evictions). Tracks the source's
current retained buffer that this reduce sees.
bufferSize is only meaningful when the source emits
'evict'. This class subscribes to 'evict' only when
the source is marked with EMITS_EVICT (see class JSDoc).
If a future LiveSource impl evicts internally without
emitting 'evict', evictions stays at 0 and bufferSize
grows monotonically — silently wrong. Today every pond
source that evicts also marks EMITS_EVICT, so this is
theoretical, but the contract is load-bearing.
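The counter relationships and the gating pitfall described above can be sketched with toy stand-ins. This is a minimal illustration, not the library's implementation: ToySource, ToyStats, the boolean `marked` flag (standing in for the EMITS_EVICT symbol marker), and the `announces` switch are all hypothetical names introduced here.

```typescript
type Channel = "event" | "evict";
type Handler = () => void;

// Toy source that retains at most `maxEvents` items and evicts the oldest.
// `marked` stands in for the EMITS_EVICT marker; `announces` controls whether
// an eviction is actually reported on the 'evict' channel.
class ToySource {
  readonly marked: boolean;
  private announces: boolean;
  private maxEvents: number;
  private size = 0;
  private handlers: Record<Channel, Handler[]> = { event: [], evict: [] };

  constructor(maxEvents: number, marked: boolean, announces: boolean) {
    this.maxEvents = maxEvents;
    this.marked = marked;
    this.announces = announces;
  }

  on(channel: Channel, handler: Handler): void {
    this.handlers[channel].push(handler);
  }

  push(): void {
    this.size += 1;
    for (const h of this.handlers.event) h();
    if (this.size > this.maxEvents) {
      this.size -= 1; // evict the oldest item
      if (this.announces) for (const h of this.handlers.evict) h();
    }
  }
}

// Counter bookkeeping only; the reducers themselves are omitted.
class ToyStats {
  private eventsObserved = 0;
  private evictions = 0;

  constructor(source: ToySource) {
    source.on("event", () => { this.eventsObserved += 1; });
    // Subscribe to 'evict' only when the source carries the marker.
    if (source.marked) source.on("evict", () => { this.evictions += 1; });
  }

  stats() {
    return {
      eventsObserved: this.eventsObserved,
      evictions: this.evictions,
      bufferSize: this.eventsObserved - this.evictions,
    };
  }
}

function run(marked: boolean, announces: boolean) {
  const source = new ToySource(3, marked, announces);
  const stats = new ToyStats(source);
  for (let i = 0; i < 10; i++) source.push();
  return stats.stats();
}

const healthy = run(true, true);  // marked and announcing: bufferSize tracks retention
const silent = run(false, false); // evicts without 'evict': bufferSize only grows
```

In the healthy run, bufferSize settles at the source's retention limit. In the silent run, evictions stays at 0 and bufferSize equals eventsObserved, which is the silently wrong case the note warns about.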
Read the current reducer snapshot — every output column's
current value, computed over the source's current buffer. Cheap
O(reducers) — each reducer's snapshot() is O(1) for built-ins.
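The cost claim above follows from reducers keeping running state. A minimal sketch, assuming a hypothetical add/remove/snapshot reducer shape (ToyReducer, SumReducer, AvgReducer, and snapshotAll are illustrative names, not the library's built-ins):

```typescript
// Hypothetical reducer shape: add/remove maintain running state,
// so snapshot() never has to rescan the buffer.
interface ToyReducer {
  add(value: number): void;
  remove(value: number): void;
  snapshot(): number | undefined;
}

class SumReducer implements ToyReducer {
  private total = 0;
  add(value: number): void { this.total += value; }
  remove(value: number): void { this.total -= value; }
  snapshot(): number { return this.total; }
}

class AvgReducer implements ToyReducer {
  private total = 0;
  private n = 0;
  add(value: number): void { this.total += value; this.n += 1; }
  remove(value: number): void { this.total -= value; this.n -= 1; }
  snapshot(): number | undefined {
    return this.n === 0 ? undefined : this.total / this.n;
  }
}

type Column = { name: string; reducer: ToyReducer };

// One reducer per output column: reading every column is O(reducers).
function snapshotAll(columns: Column[]): Record<string, number | undefined> {
  const out: Record<string, number | undefined> = {};
  for (const { name, reducer } of columns) out[name] = reducer.snapshot();
  return out;
}

const columns: Column[] = [
  { name: "total", reducer: new SumReducer() },
  { name: "mean", reducer: new AvgReducer() },
];

// Three pushes arrive, then retention evicts the oldest value (2).
for (const v of [2, 4, 6]) for (const c of columns) c.reducer.add(v);
for (const c of columns) c.reducer.remove(2);

const row = snapshotAll(columns); // reduction over the retained values 4 and 6
```

Reducers that cannot be maintained with a constant-size running state (order-sensitive ones, for example) need more bookkeeping than this sketch shows.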
"Reduce over the whole current buffer."
LiveReduce is the streaming counterpart to batch series.reduce(mapping): same mapping shape, but reactive. Every push to the source updates the reducer state via add, and every retention eviction updates it via remove. The snapshot at any moment is the reduction over "what's currently retained."

Why not sugar over
LiveFusedRolling? The fused-rolling primitive requires a time-based window and maintains its own deque. LiveReduce's "window" is whatever is in the source's buffer right now, driven by retention rather than a duration. That works for any retention shape (maxAge, maxEvents, both, neither) without forcing a sentinel resolution.

Output stream-shape: a
LiveSource<Out>. One emitted event per trigger fire, keyed at the latest source event's key. The output schema is [time, ...mappingColumns], the same shape as LiveRollingAggregation for consistency. Composes with the rest of the live operator surface.

Trigger semantics match
LiveRollingAggregation's, with one exception for Trigger.event() under pushMany (described below). The trigger fires on each source 'event' ingest (default Trigger.event()), per N events (Trigger.count(n)), or per data-clock boundary crossing (Trigger.every(...)). Source eviction is independent: it drives reducer-state removes but does not itself fire the trigger.

Post-retention emission via deferred microtask. A push that triggers retention fires
'event' per row, then runs applyRetention(), then fires 'batch' and 'evict'. To emit a snapshot reflecting the post-retention buffer state, LiveReduce defers the trigger fire to a queueMicrotask scheduled in the 'event' handler. By the time the microtask runs, all synchronous 'event'/'evict' callbacks for the push have completed and reducer state is consistent with the source's current buffer.

One implication:
Trigger.event() semantics differ from LiveRollingAggregation's. A pushMany(rows) of K rows fires ONE deferred emission, not K. (Each individual push() is one row → one emission, identical.) For most users this is the more useful semantic: the snapshot represents "state-after-this-push," not "state-after-each-row-mid-push." Users wanting per-row emissions should reach for LiveRollingAggregation over a buffer-sized window instead.

Construction-time replay. Sources with existing buffer content at construction replay through
#ingest. With Trigger.event() this fires ONE deferred emission after replay completes (under the microtask defer, not one emission per replayed event). Test pin: it('replay emits one deferred event ...').

Caveat:
ordering: 'reorder' source mode. LiveReduce processes events in arrival order, not sorted-by-timestamp order. For a source with ordering: 'reorder', late events are inserted into the source buffer at their sorted position but reach LiveReduce as new arrivals. Order-sensitive reducers (first, last, samples, top${N}, custom functions) compute over arrival order, not buffer order. If you need order-sensitive reductions on a reorder-mode source, snapshot to a TimeSeries first via live.toTimeSeries().reduce(...).

Caveat:
reorder + retention specifically. The windowed reducers min/max/first/last/samples maintain forward-sliding-window state (a monotone deque, or head-removal ordered entries) that assumes eviction removes the OLDEST-arrived event first. That holds for strict/drop (append-only) and the chunked backing, but NOT for reorder + retention, where the source evicts the sorted prefix, which may be a later arrival. On that combination those five reducers can report stale or undefined snapshots. The value-based reducers (avg, count, sum, stdev, median, percentile, unique) remove by value and stay correct. This is a long-standing limitation (it predates the chunked backing); for reliable windowed extrema on a reorder source, snapshot to a TimeSeries and reduce there. Tracked in PLAN.md "Deferred".

Source contract:
EMITS_EVICT is load-bearing. This class's reducer state stays in sync with the source's current buffer because it removes events as the source evicts them. The 'evict' subscription is gated on the EMITS_EVICT symbol marker. Sources that evict internally but do NOT emit 'evict' would cause LiveReduce's state to grow without bound (no removes ever fire). Today every pond LiveSource that evicts also marks itself with EMITS_EVICT (LiveSeries, LiveView with eviction); future LiveSource implementations must preserve this contract.

Public API: constructed via
live.reduce(mapping, opts?) on LiveSeries/LiveView. User code doesn't import this class directly.
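
The deferred-emission behavior described earlier (one snapshot per pushMany, taken after retention has run) can be sketched as follows. This is a toy model under stated assumptions, not the class's actual code: ToyDeferredReduce merges a stand-in source buffer and a single sum reducer into one class, and the injectable `schedule` parameter is a hypothetical addition that lets the example be stepped without waiting on the real microtask queue. It defaults to queueMicrotask.

```typescript
type Schedule = (task: () => void) => void;

class ToyDeferredReduce {
  readonly emissions: number[] = []; // one snapshot per trigger fire
  private buffer: number[] = [];     // stand-in for the source's retained buffer
  private sum = 0;                   // reducer state
  private pending = false;           // true while an emission is queued
  private maxEvents: number;
  private schedule: Schedule;

  constructor(
    maxEvents: number,
    schedule: Schedule = (task) => queueMicrotask(task),
  ) {
    this.maxEvents = maxEvents;
    this.schedule = schedule;
  }

  pushMany(rows: number[]): void {
    for (const row of rows) {
      this.buffer.push(row);
      this.onEvent(row); // 'event' fires once per row
    }
    // Retention runs after all rows were announced, then reports evictions.
    while (this.buffer.length > this.maxEvents) {
      this.onEvict(this.buffer.shift() as number);
    }
  }

  private onEvent(row: number): void {
    this.sum += row;
    if (this.pending) return; // an emission is already queued for this push
    this.pending = true;
    this.schedule(() => {
      this.pending = false;
      this.emissions.push(this.sum); // reads post-retention state
    });
  }

  private onEvict(row: number): void {
    this.sum -= row;
  }
}

// Manual scheduler so the example runs deterministically.
const queued: Array<() => void> = [];
const reduce = new ToyDeferredReduce(2, (task) => { queued.push(task); });

reduce.pushMany([1, 2, 3, 4]);
const queuedAfterPush = queued.length;                // a single deferred task
const emissionsBeforeFlush = reduce.emissions.length; // nothing emitted synchronously

for (const task of queued.splice(0)) task();
// reduce.emissions now holds one snapshot: the sum over the retained rows 3 and 4.
```

Four rows produce one queued task, and the emitted value reflects the buffer after rows 1 and 2 were evicted. With the default scheduler the same emission would appear once the current synchronous work finishes.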