Stream Windows

By Rahul Kumar · Senior Software Engineer · Updated · Category: Kleppmann · Designing Data-Intensive Applications

Tumbling, sliding, and session windows with event-time watermarks (Chapter 11).

Overview

A stream processor runs a never-ending query, and the only way to produce finite answers from an infinite input is to carve time into windows. Tumbling windows chop the timeline into non-overlapping, fixed-size buckets (09:00-09:01, 09:01-09:02) and emit one aggregate per bucket. Sliding windows overlap: every event falls into several windows at once, producing smoother rolling averages at a higher compute cost. Session windows are bounded by gaps of inactivity rather than by a fixed duration, so a user's click session might last 3 minutes or 30 minutes. The hard problem is not defining a window; it is deciding when to close one. Events arrive out of order because of network jitter, GC pauses, and upstream retries, so the window [09:00, 09:01) cannot be emitted at 09:01 without risking that some of its events have not arrived yet. Watermarks address this: a watermark is a monotonically increasing estimate of how far event time has progressed (an assertion that no earlier events are still expected), and the engine emits a window once the watermark has passed the window's end.
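
The Implementation section covers only tumbling and sliding windows, so here is a minimal sketch of session windows, assuming a single key, a fixed inactivity gap, and a sum aggregate. Each event opens a provisional session [t, t + gap), and every existing session it overlaps is merged into it, which lets an out-of-order event bridge two sessions that previously looked separate. The class, record, and method names (SessionWindows, Session, onEvent, advanceWatermark) are hypothetical, and the record requires Java 16+.

```java
import java.util.*;

/** Hypothetical sketch: gap-based session windows for a single key, summing long values. */
final class SessionWindows {
    /** A session covers [start, end), where end = last event-time + gap. */
    record Session(long start, long end, long sum) {}

    private final long gapMs;
    // Active sessions keyed by start; merging keeps them non-overlapping.
    private final TreeMap<Long, Session> sessions = new TreeMap<>();

    SessionWindows(long gapMs) {
        if (gapMs <= 0) throw new IllegalArgumentException("gap must be > 0");
        this.gapMs = gapMs;
    }

    /** Adds an event as session [t, t + gap) and merges every session it overlaps. */
    void onEvent(long eventTimeMs, long value) {
        long start = eventTimeMs, end = eventTimeMs + gapMs, sum = value;
        // Only the session starting at or before t can begin earlier and still cover t.
        Long from = sessions.floorKey(eventTimeMs);
        if (from == null) from = eventTimeMs;
        // Candidates: that session plus every session starting before t + gap.
        Iterator<Session> it = sessions.subMap(from, true, end, false).values().iterator();
        while (it.hasNext()) {
            Session s = it.next();
            if (s.end() <= eventTimeMs) continue; // went quiet for at least one gap before t
            start = Math.min(start, s.start());
            end = Math.max(end, s.end());
            sum += s.sum();
            it.remove();
        }
        sessions.put(start, new Session(start, end, sum));
    }

    /** Emits and removes sessions whose end the watermark has reached, in start order. */
    List<Session> advanceWatermark(long watermark) {
        List<Session> out = new ArrayList<>();
        Iterator<Session> it = sessions.values().iterator();
        while (it.hasNext()) {
            Session s = it.next();
            if (s.end() > watermark) break; // non-overlapping, so later sessions end later
            out.add(s);
            it.remove();
        }
        return out;
    }
}
```

This sketch applies no allowed lateness: an event that arrives after its session was emitted simply starts a new session.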

How it works

The window operator indexes incoming events by their event-time timestamp, not by wall-clock arrival time. For a tumbling window of size W, an event at time t belongs to the single window starting at floor(t / W) * W. For a sliding window of size W and slide S, the event belongs to every window whose start is a multiple of S in (t - W, t]; with W = 60s and S = 10s, that is W / S = 6 windows per event. Each window holds a partial aggregate (sum, count, sketch, top-k) in state, updated on every arriving event.

The key question is when to close a window and emit its result: too early and late events are lost; too late and downstream latency grows. A watermark is the engine's assertion that no more events with an event-time below it are expected; when an operator reads from several sources or partitions, its watermark is the minimum of theirs, so the slowest input holds it back. The engine can emit a window once the watermark passes the window's end. The TumblingWindow and SlidingWindow classes below take the simpler emit-once approach: they hold a window until the watermark reaches end + allowed_lateness (for a window ending at 09:01:00, that is 09:01:00 + allowed_lateness) and then emit it a single time. Events arriving after that point can be dropped, sent to a side output, or, if the engine supports retractions, trigger an updated result.

Good implementations combine watermarks with triggers (fire at the watermark, fire every N events, fire on explicit flush) so downstream consumers can choose between latency and completeness. State size is bounded by the number of active (unclosed) windows per key, multiplied by the number of keys and the average aggregate size; closing windows promptly is what keeps this design tractable at scale.
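
The watermark bookkeeping described above can be sketched as follows, assuming the common bounded-out-of-orderness heuristic (each source's watermark trails the largest event-time it has seen by a fixed maxDelayMs) and a fixed set of sources identified by index. The operator's watermark is the minimum across sources and never moves backwards. WatermarkTracker and its methods are hypothetical names, not any engine's API.

```java
import java.util.Arrays;

/** Hypothetical sketch: per-source bounded-out-of-orderness watermarks, combined by min. */
final class WatermarkTracker {
    private final long maxDelayMs;
    private final long[] maxEventTime;        // largest event-time seen per source
    private long combined = Long.MIN_VALUE;   // operator watermark; only moves forward

    WatermarkTracker(int sources, long maxDelayMs) {
        if (sources <= 0 || maxDelayMs < 0) throw new IllegalArgumentException("bad parameters");
        this.maxDelayMs = maxDelayMs;
        this.maxEventTime = new long[sources];
        Arrays.fill(maxEventTime, Long.MIN_VALUE);
    }

    /** Records an event from a source and returns the (possibly unchanged) operator watermark. */
    long onEvent(int source, long eventTimeMs) {
        maxEventTime[source] = Math.max(maxEventTime[source], eventTimeMs);
        long min = Long.MAX_VALUE;
        for (long t : maxEventTime) {
            // A source that has sent nothing yet holds the watermark at MIN_VALUE.
            long wm = (t == Long.MIN_VALUE) ? Long.MIN_VALUE : t - maxDelayMs;
            min = Math.min(min, wm);
        }
        combined = Math.max(combined, min);  // an out-of-order event never lowers it
        return combined;
    }
}
```

Because of the minimum, one silent source stalls every window downstream; production engines usually offer some form of idle-source handling, which this sketch omits.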

Implementation

TumblingWindow: non-overlapping fixed buckets with watermark emission
import java.util.*;

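/**
 * Tumbling event-time window that sums long values per window.
 * Emits (windowStart, sum) once the watermark reaches windowEnd + allowedLateness.
 */
public final class TumblingWindow {
    private final long windowSizeMs;
    private final long allowedLatenessMs;
    private final NavigableMap<Long, Long> windows = new TreeMap<>();  // start -> sum, ordered by start
    private long watermark = Long.MIN_VALUE;                           // no progress yet

    public TumblingWindow(long windowSizeMs, long allowedLatenessMs) {
        if (windowSizeMs <= 0) throw new IllegalArgumentException("windowSizeMs must be > 0");
        if (allowedLatenessMs < 0) throw new IllegalArgumentException("allowedLatenessMs must be >= 0");
        this.windowSizeMs = windowSizeMs;
        this.allowedLatenessMs = allowedLatenessMs;
    }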

    /** A single window start for any event-time. */
    private long windowStart(long ts) {
        return Math.floorDiv(ts, windowSizeMs) * windowSizeMs;
    }

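    /**
     * Adds an event to its window. Returns false if that window has already been emitted
     * (watermark past end + lateness); the caller may route such events to a side output.
     */
    public boolean onEvent(long eventTimeMs, long value) {
        long start = windowStart(eventTimeMs);
        if (start + windowSizeMs + allowedLatenessMs <= watermark) return false; // too late
        windows.merge(start, value, Long::sum);
        return true;
    }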

    /** Advances the watermark and returns finalized windows in start order. */
    public List<Map.Entry<Long, Long>> advanceWatermark(long newWatermark) {
        if (newWatermark <= watermark) return List.of();  // watermarks only move forward
        watermark = newWatermark;
        List<Map.Entry<Long, Long>> out = new ArrayList<>();
        Iterator<Map.Entry<Long, Long>> it = windows.entrySet().iterator();
        while (it.hasNext()) {
            var e = it.next();
            long end = e.getKey() + windowSizeMs;
            // Entries are sorted by start, so the first window still open ends the scan.
            if (end + allowedLatenessMs > watermark) break;
            out.add(Map.entry(e.getKey(), e.getValue()));
            it.remove();
        }
        return out;
    }

    /** Return all windows currently covering a given timestamp. */
    public List<Long> getWindowsForTime(long ts) {
        return List.of(windowStart(ts));
    }
}
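
TumblingWindow assigns window starts with Math.floorDiv rather than the / operator. Java's integer division truncates toward zero, so with / a negative timestamp (an event-time before the epoch, or a relative offset) lands in the bucket to its right. The small check below, using a hypothetical WindowStartCheck class, compares the two for a one-minute window:

```java
/** Hypothetical check: truncating vs floor division for tumbling-window assignment. */
final class WindowStartCheck {
    static long truncatingStart(long ts, long size) { return ts / size * size; }
    static long flooredStart(long ts, long size) { return Math.floorDiv(ts, size) * size; }

    public static void main(String[] args) {
        long size = 60_000;
        // ts = -1 ms belongs to [-60000, 0), not [0, 60000).
        System.out.println(truncatingStart(-1, size));   // prints 0 (wrong bucket)
        System.out.println(flooredStart(-1, size));      // prints -60000
        // For non-negative timestamps the two agree.
        System.out.println(flooredStart(125_000, size)); // prints 120000
    }
}
```
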
SlidingWindow: overlapping windows with watermark emission
import java.util.*;

/** Sliding window of (size, slide). Each event falls into size/slide windows. */
public final class SlidingWindow {
    private final long sizeMs;
    private final long slideMs;
    private final long allowedLatenessMs;
    private final NavigableMap<Long, Long> windows = new TreeMap<>();  // start -> sum
    private long watermark = Long.MIN_VALUE;

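    public SlidingWindow(long sizeMs, long slideMs, long allowedLatenessMs) {
        // Check positivity first: sizeMs % 0 would throw ArithmeticException.
        if (sizeMs <= 0 || slideMs <= 0) throw new IllegalArgumentException("size and slide must be > 0");
        if (sizeMs % slideMs != 0) throw new IllegalArgumentException("size % slide must be 0");
        if (allowedLatenessMs < 0) throw new IllegalArgumentException("allowedLatenessMs must be >= 0");
        this.sizeMs = sizeMs;
        this.slideMs = slideMs;
        this.allowedLatenessMs = allowedLatenessMs;
    }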

    /** Every window whose [start, start+size) contains ts, i.e. aligned starts in (ts - size, ts]. */
    public List<Long> getWindowsForTime(long ts) {
        // Smallest multiple of slideMs that is strictly greater than ts - sizeMs.
        long firstStart = Math.floorDiv(ts - sizeMs + slideMs, slideMs) * slideMs;
        List<Long> out = new ArrayList<>();
        for (long s = firstStart; s <= ts; s += slideMs) {
            out.add(s);  // s > ts - size, so [s, s + size) contains ts
        }
        return out;
    }

    /** Adds the event to every window that still accepts it; windows already emitted are skipped. */
    public void onEvent(long eventTimeMs, long value) {
        for (long start : getWindowsForTime(eventTimeMs)) {
            if (start + sizeMs + allowedLatenessMs <= watermark) continue;
            windows.merge(start, value, Long::sum);
        }
    }

    /** Emit any windows whose end + lateness has been reached by the watermark, in start order. */
    public List<Map.Entry<Long, Long>> advanceWatermark(long newWatermark) {
        if (newWatermark <= watermark) return List.of();  // watermarks only move forward
        watermark = newWatermark;
        List<Map.Entry<Long, Long>> emitted = new ArrayList<>();
        Iterator<Map.Entry<Long, Long>> it = windows.entrySet().iterator();
        while (it.hasNext()) {
            var e = it.next();
            long end = e.getKey() + sizeMs;
            // Windows are sorted by start (hence by end), so stop at the first one still open.
            if (end + allowedLatenessMs > watermark) break;
            emitted.add(Map.entry(e.getKey(), e.getValue()));
            it.remove();
        }
        return emitted;
    }
}
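
The triggers and allowed-lateness behavior described in How it works can be layered on the same tumbling-window state. This is a minimal sketch, assuming a Flink-like policy rather than the emit-once policy of TumblingWindow: an EARLY firing every earlyEveryN events, one ON_TIME firing when the watermark reaches the window end, a LATE firing for each event that arrives after that but before end + allowedLateness, and after that a drop recorded in a side output. TriggeredTumblingWindow, Kind, Firing, and sideOutput are hypothetical names; the record requires Java 16+.

```java
import java.util.*;

/** Hypothetical sketch: tumbling-window sums with early, on-time, and late firings. */
final class TriggeredTumblingWindow {
    enum Kind { EARLY, ON_TIME, LATE }
    record Firing(long windowStart, long sum, Kind kind) {}

    private final long sizeMs, allowedLatenessMs, earlyEveryN;
    private final TreeMap<Long, long[]> state = new TreeMap<>(); // start -> {sum, count}
    private long watermark = Long.MIN_VALUE;
    /** Event-times of events dropped because their window's lateness had expired. */
    final List<Long> sideOutput = new ArrayList<>();

    TriggeredTumblingWindow(long sizeMs, long allowedLatenessMs, long earlyEveryN) {
        if (sizeMs <= 0 || allowedLatenessMs < 0 || earlyEveryN < 0)
            throw new IllegalArgumentException("invalid window parameters");
        this.sizeMs = sizeMs;
        this.allowedLatenessMs = allowedLatenessMs;
        this.earlyEveryN = earlyEveryN; // 0 disables early firings
    }

    /** Adds an event; returns an EARLY or LATE firing if this event triggers one. */
    Optional<Firing> onEvent(long eventTimeMs, long value) {
        long start = Math.floorDiv(eventTimeMs, sizeMs) * sizeMs;
        long end = start + sizeMs;
        if (end + allowedLatenessMs <= watermark) {   // state already purged: side output
            sideOutput.add(eventTimeMs);
            return Optional.empty();
        }
        long[] acc = state.computeIfAbsent(start, k -> new long[2]);
        acc[0] += value;
        acc[1]++;
        if (end <= watermark) {                        // within allowed lateness: updated result
            return Optional.of(new Firing(start, acc[0], Kind.LATE));
        }
        if (earlyEveryN > 0 && acc[1] % earlyEveryN == 0) { // speculative partial result
            return Optional.of(new Firing(start, acc[0], Kind.EARLY));
        }
        return Optional.empty();
    }

    /** Advances the watermark; returns ON_TIME firings and purges expired window state. */
    List<Firing> advanceWatermark(long newWatermark) {
        List<Firing> out = new ArrayList<>();
        if (newWatermark <= watermark) return out;
        long previous = watermark;
        watermark = newWatermark;
        Iterator<Map.Entry<Long, long[]>> it = state.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, long[]> e = it.next();
            long end = e.getKey() + sizeMs;
            if (end > newWatermark) break;             // this and all later windows are still open
            if (end > previous) out.add(new Firing(e.getKey(), e.getValue()[0], Kind.ON_TIME));
            if (end + allowedLatenessMs <= newWatermark) it.remove(); // lateness expired
        }
        return out;
    }
}
```

Downstream consumers of such a stream must treat each firing for the same windowStart as a replacement for the previous one, not as an additional value.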
