Stream Windows
Tumbling / sliding / session + event-time watermarks. Ch 11.
Overview
A stream processor is a never-ending query, and the only way to produce finite answers from an infinite input is to carve time into windows. Tumbling windows chop the timeline into non-overlapping fixed buckets (09:00-09:01, 09:01-09:02) and emit one aggregate per bucket. Sliding windows overlap: every new event sits in several windows at once, producing smoother rolling averages at higher compute cost. Session windows bound events by gaps of inactivity rather than by fixed clock intervals, so a user's click session might run for 3 minutes or 30 minutes. The hard problem is not defining the window; it is deciding when to close one. Events arrive out of order due to network jitter, GC pauses, and upstream retries, so a window for [09:00, 09:01) cannot be emitted the moment the clock reads 09:01 without risking the loss of events that are still in flight. Watermarks solve this: a watermark is a monotonically non-decreasing estimate of event-time progress, asserting that events with earlier timestamps are no longer expected, which lets the engine emit any window whose end the watermark has passed.
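Tumbling and sliding windows get full listings in the Implementation section, but session windows do not, so here is a minimal sketch of the gap rule. It assumes one key's event times are already sorted and available as an array, which a real streaming engine would not have; the class name SessionSketch, the method sessionize, and the choice that a gap strictly greater than gapMs starts a new session are all illustrative assumptions.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative only: splits one key's sorted event times into gap-separated sessions. */
final class SessionSketch {
    /**
     * Assumes eventTimesMs is sorted ascending. Each result is {startMs, endMs, count},
     * where endMs is the event time of the session's last event.
     */
    static List<long[]> sessionize(long[] eventTimesMs, long gapMs) {
        List<long[]> sessions = new ArrayList<>();
        if (eventTimesMs.length == 0) return sessions;
        long start = eventTimesMs[0];
        long last = start;
        long count = 1;
        for (int i = 1; i < eventTimesMs.length; i++) {
            long t = eventTimesMs[i];
            if (t - last > gapMs) {
                // Inactivity exceeded the gap: close the current session, open a new one.
                sessions.add(new long[] {start, last, count});
                start = t;
                count = 0;
            }
            last = t;
            count++;
        }
        sessions.add(new long[] {start, last, count});
        return sessions;
    }
}
```

With a 5-second gap, the times 0, 1000, 2000, 10000, 11000 split into two sessions: three events spanning 0 to 2000 and two events spanning 10000 to 11000. Note that the session length is decided by the data, not by configuration, which is why session state can grow without bound on a stream that never pauses.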
How it works
The window operator indexes incoming events by their event-time timestamp, not the wall-clock arrival time. For a tumbling window of size W, an event at time t belongs to the single window starting at floor(t / W) * W. For a sliding window of size W and slide S, the event belongs to every window whose start is a multiple of S in (t - W, t]; with W=60s and S=10s that is W/S = 6 windows per event. Each window holds a partial aggregate (sum, count, sketch, top-k) in state, updated on every arriving event.
The key question is when to close and emit: too early and late events are dropped; too late and downstream latency grows. The watermark tracks the minimum event-time progress across all sources, and in the basic case the engine emits a window once the watermark passes the window's end. With allowed lateness configured, as in the implementation below, a window with end time 09:01:00 is finalized only when the watermark reaches 09:01:00 + allowed_lateness. Events arriving after that can be dropped, sent to a side output, or trigger a late update if the engine supports retractions. Good implementations combine watermarks with triggers (fire at watermark, fire every N events, fire on explicit flush) so downstream consumers can choose latency vs completeness. State size is bounded by the number of active (unclosed) windows per key times the average aggregate size, which is what keeps this design tractable at scale.
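The "minimum across all sources" rule can be sketched in a few lines. This is an illustration under assumptions, not any engine's actual API: the class name GlobalWatermark, the string source ids, and the update method are hypothetical, and every source is assumed to be known up front.

```java
import java.util.HashMap;
import java.util.Map;

/** Illustrative only: combines per-source watermarks into one monotone global watermark. */
final class GlobalWatermark {
    private final Map<String, Long> perSource = new HashMap<>();
    private long global = Long.MIN_VALUE;

    GlobalWatermark(Iterable<String> sourceIds) {
        // A source that has reported nothing yet holds the global watermark at MIN_VALUE.
        for (String id : sourceIds) perSource.put(id, Long.MIN_VALUE);
    }

    /** Records one source's watermark and returns the (possibly unchanged) global watermark. */
    long update(String sourceId, long watermarkMs) {
        if (!perSource.containsKey(sourceId)) {
            throw new IllegalArgumentException("unknown source: " + sourceId);
        }
        perSource.merge(sourceId, watermarkMs, Math::max); // a source never regresses
        long min = Long.MAX_VALUE;
        for (long w : perSource.values()) min = Math.min(min, w);
        global = Math.max(global, min); // the global watermark never regresses either
        return global;
    }
}
```

Because the result is a minimum, a single silent source pins the global watermark and keeps every window open, so a production version would need some policy for marking sources idle.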
Implementation
import java.util.*;

/** Tumbling window over longs. Emits (windowStart, sum) once the watermark passes end + allowed lateness. */
public final class TumblingWindow {
    private final long windowSizeMs;
    private final long allowedLatenessMs;
    private final NavigableMap<Long, Long> windows = new TreeMap<>(); // start -> sum
    private long watermark = Long.MIN_VALUE;

    public TumblingWindow(long windowSizeMs, long allowedLatenessMs) {
        if (windowSizeMs <= 0) throw new IllegalArgumentException("window size must be positive");
        this.windowSizeMs = windowSizeMs;
        this.allowedLatenessMs = allowedLatenessMs;
    }

    /** A single window start for any event-time. floorDiv keeps negative timestamps aligned. */
    private long windowStart(long ts) {
        return Math.floorDiv(ts, windowSizeMs) * windowSizeMs;
    }

    /** Add an event; drop it if its window has already been finalized. */
    public void onEvent(long eventTimeMs, long value) {
        long start = windowStart(eventTimeMs);
        if (start + windowSizeMs + allowedLatenessMs <= watermark) return; // too late
        windows.merge(start, value, Long::sum);
    }

    /** Advance the watermark and flush finalized windows in start order. */
    public List<Map.Entry<Long, Long>> advanceWatermark(long newWatermark) {
        if (newWatermark <= watermark) return List.of(); // watermarks never move backwards
        watermark = newWatermark;
        List<Map.Entry<Long, Long>> out = new ArrayList<>();
        Iterator<Map.Entry<Long, Long>> it = windows.entrySet().iterator();
        while (it.hasNext()) {
            var e = it.next();
            long end = e.getKey() + windowSizeMs;
            // Entries are sorted by start, so the first unexpired window ends the scan.
            if (end + allowedLatenessMs > watermark) break;
            out.add(Map.entry(e.getKey(), e.getValue()));
            it.remove();
        }
        return out;
    }

    /** Return all windows currently covering a given timestamp (always exactly one). */
    public List<Long> getWindowsForTime(long ts) {
        return List.of(windowStart(ts));
    }
}
import java.util.*;

/** Sliding window of (size, slide). Each event falls into size/slide windows. */
public final class SlidingWindow {
    private final long sizeMs;
    private final long slideMs;
    private final long allowedLatenessMs;
    private final NavigableMap<Long, Long> windows = new TreeMap<>(); // start -> sum
    private long watermark = Long.MIN_VALUE;

    public SlidingWindow(long sizeMs, long slideMs, long allowedLatenessMs) {
        if (slideMs <= 0) throw new IllegalArgumentException("slide must be positive");
        if (sizeMs % slideMs != 0) throw new IllegalArgumentException("size % slide must be 0");
        this.sizeMs = sizeMs;
        this.slideMs = slideMs;
        this.allowedLatenessMs = allowedLatenessMs;
    }

    /** Every window whose [start, start+size) contains ts, in ascending start order. */
    public List<Long> getWindowsForTime(long ts) {
        // Earliest slide-aligned start whose window still covers ts.
        long firstStart = Math.floorDiv(ts - sizeMs + slideMs, slideMs) * slideMs;
        List<Long> out = new ArrayList<>();
        for (long s = firstStart; s <= ts; s += slideMs) {
            if (s + sizeMs > ts) out.add(s);
        }
        return out;
    }

    /** Add the event to every overlapping window that has not been finalized. */
    public void onEvent(long eventTimeMs, long value) {
        for (long start : getWindowsForTime(eventTimeMs)) {
            if (start + sizeMs + allowedLatenessMs <= watermark) continue; // too late for this window
            windows.merge(start, value, Long::sum);
        }
    }

    /** Emit any windows whose end + lateness has been passed by the watermark. */
    public List<Map.Entry<Long, Long>> advanceWatermark(long newWatermark) {
        if (newWatermark <= watermark) return List.of(); // watermarks never move backwards
        watermark = newWatermark;
        List<Map.Entry<Long, Long>> emitted = new ArrayList<>();
        Iterator<Map.Entry<Long, Long>> it = windows.entrySet().iterator();
        while (it.hasNext()) {
            var e = it.next();
            long end = e.getKey() + sizeMs;
            // All windows share one size, so ends are sorted too; stop at the first live one.
            if (end + allowedLatenessMs > watermark) break;
            emitted.add(Map.entry(e.getKey(), e.getValue()));
            it.remove();
        }
        return emitted;
    }
}
Complexity
- Tumbling onEvent: O(log W), where W is the active window count (TreeMap insert)
- Sliding onEvent: O((size/slide) * log W), one update per overlapping window
- Watermark advance / emit: O(E), where E is the expired-window count, provided the scan stops at the first unexpired window (the TreeMap is ordered by window start); a scan over every entry is O(W)
- State size: O(active_windows * aggregate_size) per key
- Late-arrival drop: O(1) check against watermark
Key design decisions & trade-offs
- Event time vs processing time. Chosen: event time for correctness, processing time for simplicity. Event time reflects when something actually happened in the source system; processing time reflects when your engine saw it. Event time gives stable, replayable, reproducible results; processing time is trivially simple but breaks on any upstream delay or reprocessing.
- Watermark conservatism. Chosen: bounded-out-of-orderness watermark (max event time seen - max out-of-orderness), with allowedLateness as extra slack before a window's state is discarded. A tighter bound emits windows sooner, improving latency but dropping more late events. A looser bound holds state longer, using more memory. Observing actual lag percentiles and sizing the bound (or allowedLateness) to cover p99 is the usual compromise.
- Handling late events. Chosen: side-output for auditing plus optional retraction updates. Dropping late events silently causes undercounts that are hard to debug. Side-outputting them preserves auditability. Full retraction (emitting a new window result with the corrected value) is only worth the complexity when downstream consumers can handle updates.
- Sliding window density. Chosen: slide as a divisor of size to keep emission counts uniform. A sliding window with size=60s and slide=10s produces 6 overlapping windows per event, which is manageable. Non-divisor slides produce irregular window alignments and harder-to-reason-about emission schedules.
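One common reading of a bounded-out-of-orderness watermark is "the largest event time seen so far, minus a fixed bound". The sketch below illustrates that reading only; the class name BoundedOutOfOrdernessWatermark and its two methods are hypothetical, and the bound is assumed to be chosen from observed lag (for example around p99).

```java
/** Illustrative only: watermark = max event time seen - maxOutOfOrdernessMs. */
final class BoundedOutOfOrdernessWatermark {
    private final long maxOutOfOrdernessMs;
    private long maxEventTimeMs = Long.MIN_VALUE;
    private boolean seenAny = false;

    BoundedOutOfOrdernessWatermark(long maxOutOfOrdernessMs) {
        if (maxOutOfOrdernessMs < 0) throw new IllegalArgumentException("bound must be >= 0");
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
    }

    /** Out-of-order events are fine: only the running maximum matters. */
    void onEvent(long eventTimeMs) {
        maxEventTimeMs = Math.max(maxEventTimeMs, eventTimeMs);
        seenAny = true;
    }

    /** Monotone by construction, because the running maximum never decreases. */
    long currentWatermark() {
        // Guarding on seenAny avoids underflow when subtracting from Long.MIN_VALUE.
        return seenAny ? maxEventTimeMs - maxOutOfOrdernessMs : Long.MIN_VALUE;
    }
}
```

With a 5-second bound, seeing event times 10000, 8000, 12000 yields watermarks 5000, 5000, 7000. Shrinking the bound moves the watermark closer to the newest event, which is the latency versus completeness dial described in the watermark bullet.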
Common pitfalls
- Stamping events with the processor's wall-clock time instead of carrying a timestamp from the source; NTP jumps or timezone bugs then shift whole windows
- Setting allowedLateness to a value smaller than observed p99 lag, silently dropping the tail of every window
- Holding windows open forever because the watermark never advances — common when one partition has no traffic and blocks the global minimum
- Implementing sliding windows as N independent tumbling windows and paying N times the state cost when incremental aggregates would share state
- Using session windows with a long gap on a bursty stream and discovering one session never closes because events never stop arriving
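The shared-state alternative mentioned in the sliding-window pitfall is often done with panes: keep one partial aggregate per slide-sized bucket and combine size/slide panes when a window is read. The following is a minimal sketch of that idea for sums; the class name PaneSlidingSum and its methods are hypothetical, watermark handling is omitted, and window starts are assumed to be multiples of the slide.

```java
import java.util.TreeMap;

/** Illustrative only: sliding sums built from slide-sized panes shared by all windows. */
final class PaneSlidingSum {
    private final long sizeMs;
    private final long slideMs;
    private final TreeMap<Long, Long> panes = new TreeMap<>(); // pane start -> partial sum

    PaneSlidingSum(long sizeMs, long slideMs) {
        if (slideMs <= 0 || sizeMs <= 0 || sizeMs % slideMs != 0) {
            throw new IllegalArgumentException("size must be a positive multiple of slide");
        }
        this.sizeMs = sizeMs;
        this.slideMs = slideMs;
    }

    /** One map update per event, no matter how many windows overlap it. */
    void onEvent(long eventTimeMs, long value) {
        long paneStart = Math.floorDiv(eventTimeMs, slideMs) * slideMs;
        panes.merge(paneStart, value, Long::sum);
    }

    /** Sum for window [windowStartMs, windowStartMs + sizeMs); start must be slide-aligned. */
    long windowSum(long windowStartMs) {
        long sum = 0;
        for (long v : panes.subMap(windowStartMs, true, windowStartMs + sizeMs, false).values()) {
            sum += v;
        }
        return sum;
    }

    /** Drops panes that no window starting at or after minWindowStartMs can still read. */
    void evictBefore(long minWindowStartMs) {
        panes.headMap(minWindowStartMs, false).clear();
    }
}
```

The trade is that per-event work drops from size/slide updates to one, while each emitted window now costs size/slide pane reads. This only works directly for aggregates that can be merged, such as sums, counts, and mergeable sketches.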
Interview follow-ups
- Implement session windows with dynamic gap-based expiry and merging of adjacent sessions
- Add a side-output for late events so they can be audited without blocking the main pipeline
- Handle multi-source watermarks where the global minimum is taken across partitions
- Combine windowed aggregates with stateful joins for enrichment patterns
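As a starting point for the late-event follow-up, here is a minimal sketch of a tumbling sum that routes too-late events to a side list instead of dropping them. It reuses the same lateness test as the TumblingWindow listing; the class name LateRoutingWindow, the drainLateEvents and sumFor methods, and the in-memory list standing in for a side output are all assumptions, and window emission is left out to keep the routing visible.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

/** Illustrative only: tumbling sums with a side output for events that arrive too late. */
final class LateRoutingWindow {
    private final long sizeMs;
    private final long allowedLatenessMs;
    private final TreeMap<Long, Long> sums = new TreeMap<>();   // window start -> sum
    private final List<long[]> lateEvents = new ArrayList<>();  // {eventTimeMs, value}
    private long watermark = Long.MIN_VALUE;

    LateRoutingWindow(long sizeMs, long allowedLatenessMs) {
        if (sizeMs <= 0) throw new IllegalArgumentException("size must be positive");
        this.sizeMs = sizeMs;
        this.allowedLatenessMs = allowedLatenessMs;
    }

    /** Returns true if the event was aggregated, false if it went to the side output. */
    boolean onEvent(long eventTimeMs, long value) {
        long start = Math.floorDiv(eventTimeMs, sizeMs) * sizeMs;
        if (start + sizeMs + allowedLatenessMs <= watermark) {
            lateEvents.add(new long[] {eventTimeMs, value}); // keep it for auditing
            return false;
        }
        sums.merge(start, value, Long::sum);
        return true;
    }

    /** Moves the watermark forward; earlier values are ignored. */
    void advanceWatermark(long newWatermark) {
        watermark = Math.max(watermark, newWatermark);
    }

    /** Hands the collected late events to the caller and clears the buffer. */
    List<long[]> drainLateEvents() {
        List<long[]> out = new ArrayList<>(lateEvents);
        lateEvents.clear();
        return out;
    }

    long sumFor(long windowStartMs) {
        return sums.getOrDefault(windowStartMs, 0L);
    }
}
```

Draining the side list on its own schedule keeps auditing off the main path; in a real pipeline the list would be replaced by a separate output stream or topic, and its growth would need a bound.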
Recommended reading
- Alex Petrov, Database Internals — storage engines and distributed systems internals.
- Martin Kleppmann, Designing Data-Intensive Applications (DDIA) — data models, replication, partitioning, consistency.
- The System Design Primer — high-level design building blocks.
- Foundational networking + web-security references (TCP/IP, TLS 1.3, OWASP Top 10).