
Ad Click Event Aggregation (Real-time) System Design Interview Question

By Rahul Kumar · Senior Software Engineer · Updated · 10 components · 3 operations · Source: Alex Xu, System Design Interview Vol 2, Chapter 6; Flink docs; ClickHouse docs

Problem: Design a real-time ad click event aggregation service that powers advertiser reporting and near-real-time budgeting.

Overview

Ad click aggregation is the quiet backbone of the internet advertising business: every ad impression and click on every page has to land in a counter that an advertiser trusts enough to pay against. The volume is enormous, roughly a billion clicks a day with peak-hour rates near 50,000 per second, and the correctness bar is high, because advertisers use these numbers to pause campaigns, trigger budget caps, and reconcile invoices. The design that has emerged across Google, Meta, and their peers is a lambda-flavoured pipeline: a fast streaming path provides near-real-time counters for dashboards and budget enforcement, and a nightly batch path re-aggregates the raw event log from cold storage to produce authoritative numbers that absorb late events, bot filtering, and schema fixes. This page walks through the streaming half in detail (Kafka ingest, Flink windowing, and idempotent rollup writes), because that is where the interesting distributed-systems work lives.


Summary

A streaming aggregation pipeline: ad clients fire impression/click beacons into an edge ingest tier, events are queued on Kafka with idempotency keys, Flink maintains per-(ad_id, minute) counters with event-time tumbling windows, and aggregates land in a columnar OLAP store (ClickHouse) for advertiser dashboards. A nightly batch corrector re-aggregates raw events from cold storage (S3) to fix late-arriving clicks and bot-filtered rows — the streaming result is 'fast but approximate', the batch result is 'authoritative'. The dominant design choice is a lambda-style stream + batch reconciliation over a pure streaming system: exactly-once aggregation at the click volume here (1B/day, peak ~50K clicks/sec) is expensive, and advertisers will accept a stream that's eventually corrected overnight. The main tradeoff is dual-path complexity — two codebases must produce the same schema.
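The "fast but approximate" versus "authoritative" split can be pictured as a tiny state machine per (ad_id, minute) cell. The sketch below is a hypothetical in-memory model (the class, enum, and method names are illustrative, not from any library): streaming writes land as PRELIMINARY and keep being refreshed, while a batch write marks the cell FINAL, after which later stream writes are ignored.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of lambda-style reconciliation for one rollup table.
// Streaming writes are PRELIMINARY; the nightly batch writes FINAL and always wins.
class ReconciledRollup {
    enum Status { PRELIMINARY, FINAL }

    record Key(long adId, long minuteStart) {}
    record Cell(long count, Status status) {}

    private final Map<Key, Cell> cells = new HashMap<>();

    // Streaming path: take the latest window result unless the batch has already settled it.
    void applyStream(long adId, long minuteStart, long count) {
        Key k = new Key(adId, minuteStart);
        Cell cur = cells.get(k);
        if (cur != null && cur.status() == Status.FINAL) return; // batch result is authoritative
        cells.put(k, new Cell(count, Status.PRELIMINARY));
    }

    // Batch path: overwrite unconditionally, whether the corrected count is lower
    // (bot clicks removed) or higher (late clicks recovered) than the stream's estimate.
    void applyBatch(long adId, long minuteStart, long count) {
        cells.put(new Key(adId, minuteStart), new Cell(count, Status.FINAL));
    }

    Cell get(long adId, long minuteStart) {
        return cells.get(new Key(adId, minuteStart));
    }
}
```

The key property is that the batch may move a count in either direction, so the guard is on status rather than on the size of the number.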

Requirements

Functional

Non-functional

Capacity Assumptions

Back-of-Envelope Estimates
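Working only from the two figures stated in the summary (about 1B clicks per day, peak near 50K clicks/s), the headline rates follow directly. Here $\bar{r}$ is the daily-average click rate, $r_{\text{peak}}$ the quoted peak rate, and $N_{\text{min}}$ the number of clicks landing in one 1-minute window across all ads at peak:

```latex
\begin{aligned}
\bar{r} &= \frac{10^{9}\ \text{clicks/day}}{86{,}400\ \text{s/day}} \approx 11{,}574\ \text{clicks/s} \\
\frac{r_{\text{peak}}}{\bar{r}} &\approx \frac{50{,}000}{11{,}574} \approx 4.3 \\
N_{\text{min}} &= 50{,}000\ \text{clicks/s} \times 60\ \text{s} = 3{,}000{,}000\ \text{clicks per 1-minute window}
\end{aligned}
```

The roughly 4x gap between average and peak is the reason to size the ingest tier and Kafka partition count for the peak rate rather than the daily average.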

High-level architecture

Clients fire click beacons at a thin edge ingest service that validates the payload, stamps a server-side timestamp and idempotency key, and appends the event to a Kafka topic partitioned by ad_id. Kafka is the source of truth, and three downstream stages hang off it.

First, a Flink job reads the raw click topic and maintains per-(ad_id, minute) counters using event-time tumbling windows, with a watermark that trails the latest observed event time by a few seconds; when a window closes, the aggregator emits an (ad_id, minute, count) record to a rollup topic. Second, a rollup writer consumes that rollup topic and upserts into an OLAP store (ClickHouse or Pinot) keyed on (ad_id, minute), so replays and retries overwrite rows rather than duplicate them. Third, a cold-storage sink reads the raw click topic and archives it to S3 or GCS, partitioned by hour.

A nightly Spark or Flink batch job re-reads the archive, applies bot filters and schema fixes, and overwrites the authoritative daily rollup; the streaming numbers are marked 'preliminary' until the batch settles. Advertiser dashboards query the OLAP store directly, and budget enforcement services subscribe to the rollup topic and pause campaigns when thresholds are crossed. Deduplication happens at two layers, idempotency keys at ingest and upserts at the sink, because exactly-once delivery across this many hops is cheaper to approximate than to guarantee.
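The ingest step described above (validate, stamp a server-side timestamp and idempotency key, partition by ad_id) can be sketched in plain Java. This is a hypothetical illustration, not a real service: the record names, the client-supplied click id, and the choice of a name-based UUID as the idempotency key are all assumptions made for the example.

```java
import java.nio.charset.StandardCharsets;
import java.util.UUID;

// Hypothetical edge-ingest step: validate a beacon, stamp a server-side timestamp,
// derive a deterministic idempotency key, and choose a partition from ad_id.
class EdgeIngest {
    record RawBeacon(long adId, String clientClickId) {}
    record StampedClick(long adId, String idempotencyKey, long serverTs, int partition) {}

    private final int numPartitions;

    EdgeIngest(int numPartitions) {
        if (numPartitions <= 0) throw new IllegalArgumentException("numPartitions must be > 0");
        this.numPartitions = numPartitions;
    }

    StampedClick stamp(RawBeacon b, long serverNowMillis) {
        if (b.adId() <= 0) throw new IllegalArgumentException("missing ad_id");
        if (b.clientClickId() == null || b.clientClickId().isBlank())
            throw new IllegalArgumentException("missing client click id");
        // Same (adId, clientClickId) gives the same key, so a client retry of one click
        // is recognisable downstream. Name-based (type 3) UUID over both fields.
        String key = UUID.nameUUIDFromBytes(
                (b.adId() + ":" + b.clientClickId()).getBytes(StandardCharsets.UTF_8)).toString();
        // Illustrative only: a real producer would set the record key to ad_id and let
        // Kafka's partitioner choose; either way, one ad maps to one partition.
        int partition = Math.floorMod(Long.hashCode(b.adId()), numPartitions);
        // The server timestamp, not any client clock, drives event-time windowing downstream.
        return new StampedClick(b.adId(), key, serverNowMillis, partition);
    }
}
```

Deriving the key from the click rather than generating a fresh random UUID per request is what lets a retried beacon be recognised as a duplicate.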

Implementation

Kafka consumer for the click firehose
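package com.example.ads.ingest;

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.errors.WakeupException;
import java.time.Duration;
import java.util.*;

// ClickEvent and ClickHandler are application types not shown here. The commit below
// assumes the handler has deduplicated and flushed its output before the loop commits.
public class ClickConsumer implements Runnable {
    private final KafkaConsumer<String, ClickEvent> consumer;
    private final ClickHandler handler;
    private volatile boolean running = true;

    public ClickConsumer(Map<String, Object> cfg, ClickHandler handler) {
        Map<String, Object> props = new HashMap<>(cfg);                      // don't mutate the caller's map
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);          // commit manually, after processing
        props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");  // skip records from aborted transactions
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 2000);
        this.consumer = new KafkaConsumer<>(props);
        this.handler = handler;
    }

    @Override
    public void run() {
        try {
            consumer.subscribe(List.of("clicks.v1"));
            while (running) {
                ConsumerRecords<String, ClickEvent> batch = consumer.poll(Duration.ofMillis(500));
                if (batch.isEmpty()) continue;
                for (ConsumerRecord<String, ClickEvent> r : batch) {
                    handler.accept(r.key(), r.value(), r.timestamp()); // dedupe by event id inside
                }
                // Commit only after the handler has flushed: at-least-once delivery,
                // made effectively-once by the idempotent sink.
                consumer.commitSync();
            }
        } catch (WakeupException e) {
            if (running) throw e; // a wakeup we did not request is a real error
        } finally {
            consumer.close(); // always release the group membership and sockets
        }
    }

    // Safe to call from another thread: wakeup() makes a blocked poll() throw WakeupException.
    public void stop() {
        running = false;
        consumer.wakeup();
    }
}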
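The consumer above delegates deduplication to ClickHandler ("dedupe by event id inside"). One common way to do that in-process is a bounded LRU set of recently seen idempotency keys. The class below is a hypothetical sketch (its name and the capacity parameter are assumptions); it only catches duplicates that arrive while the key is still retained, which is why the idempotent upsert at the sink is still needed.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical dedupe layer for a click handler: remembers the most recent idempotency
// keys in a bounded LRU map and reports whether a key is being seen for the first time.
class RecentKeyDeduper {
    private final Map<String, Boolean> seen;

    RecentKeyDeduper(int capacity) {
        // accessOrder = true makes LinkedHashMap an LRU; removeEldestEntry bounds its size.
        this.seen = new LinkedHashMap<>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, Boolean> eldest) {
                return size() > capacity;
            }
        };
    }

    // True the first time a key is seen (process the event), false for a duplicate (skip it).
    boolean firstSighting(String idempotencyKey) {
        return seen.put(idempotencyKey, Boolean.TRUE) == null;
    }
}
```

A key that has been evicted is treated as new again, so the capacity sets how far apart a retry and its original can be while still being caught here.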
Flink-style 1-minute tumbling window aggregation
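package com.example.ads.agg;

import java.time.Duration;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

// Written against the Flink 1.x DataStream API (WatermarkStrategy, 1.11+). ClickEvent (long
// fields adId and serverTs) and AdMinuteCount(adId, minuteStart, count) are assumed app types.
public class ClickAggregator {

    // Takes an unkeyed stream of ClickEvent, keys it by ad_id, emits (ad_id, minuteStart, count).
    public static DataStream<AdMinuteCount> aggregate(DataStream<ClickEvent> clicks) {
        return clicks
            .assignTimestampsAndWatermarks(
                WatermarkStrategy.<ClickEvent>forBoundedOutOfOrderness(Duration.ofSeconds(10))
                    .withTimestampAssigner((e, recordTs) -> e.serverTs))
            .keyBy(e -> e.adId)
            .window(TumblingEventTimeWindows.of(Time.minutes(1)))
            .aggregate(new CountAgg(), new EmitWithKey());
    }

    // Incremental count: one Long accumulator per (ad_id, window); events are not buffered.
    public static class CountAgg implements AggregateFunction<ClickEvent, Long, Long> {
        @Override public Long createAccumulator()         { return 0L; }
        @Override public Long add(ClickEvent e, Long acc) { return acc + 1L; }
        @Override public Long getResult(Long acc)         { return acc; }
        @Override public Long merge(Long a, Long b)       { return a + b; }
    }

    // Receives the single pre-aggregated count and attaches the key and window start.
    public static class EmitWithKey
            extends ProcessWindowFunction<Long, AdMinuteCount, Long, TimeWindow> {
        @Override
        public void process(Long adId, Context ctx, Iterable<Long> counts,
                            Collector<AdMinuteCount> out) {
            out.collect(new AdMinuteCount(adId, ctx.window().getStart(), counts.iterator().next()));
        }
    }
}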
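What the Flink job does with its two timing settings (1-minute tumbling windows, 10-second bounded out-of-orderness) can be modelled without Flink. The class below is a hypothetical, simplified single-threaded model written for illustration, not Flink's implementation: the watermark trails the largest server timestamp seen by the out-of-orderness bound, a window is emitted once the watermark reaches its end, and a click for an already-emitted window is counted as dropped (the behaviour with zero allowed lateness).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Hypothetical model of event-time tumbling windows with a bounded-out-of-orderness
// watermark. Not Flink code; it only mirrors when windows close and when clicks are late.
class TumblingWindowModel {
    record Emitted(long adId, long windowStart, long count) {}
    private record Key(long adId, long windowStart) {}

    private final long windowMs;
    private final long maxOutOfOrdernessMs;
    private final Map<Key, Long> open = new HashMap<>();   // windows still accepting clicks
    private final List<Emitted> emitted = new ArrayList<>();
    private long maxSeenTs = Long.MIN_VALUE;
    private long droppedLate = 0;

    TumblingWindowModel(long windowMs, long maxOutOfOrdernessMs) {
        this.windowMs = windowMs;
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
    }

    // The watermark trails the largest server timestamp seen so far by a fixed bound.
    long watermark() {
        return maxSeenTs == Long.MIN_VALUE ? Long.MIN_VALUE : maxSeenTs - maxOutOfOrdernessMs;
    }

    void onClick(long adId, long serverTs) {
        long start = serverTs - Math.floorMod(serverTs, windowMs); // window is [start, start + windowMs)
        if (start + windowMs <= watermark()) {                     // window already emitted: too late
            droppedLate++;
            return;
        }
        open.merge(new Key(adId, start), 1L, Long::sum);
        maxSeenTs = Math.max(maxSeenTs, serverTs);
        // Emit and forget every window whose end the watermark has now reached.
        for (Iterator<Map.Entry<Key, Long>> it = open.entrySet().iterator(); it.hasNext(); ) {
            Map.Entry<Key, Long> e = it.next();
            if (e.getKey().windowStart() + windowMs <= watermark()) {
                emitted.add(new Emitted(e.getKey().adId(), e.getKey().windowStart(), e.getValue()));
                it.remove();
            }
        }
    }

    List<Emitted> emitted() { return emitted; }
    long droppedLate() { return droppedLate; }
}
```

Feeding one ad clicks stamped at 5 s, 59 s, 65 s, 58 s and 71 s emits a single count of 3 for the first minute once the 71 s click pushes the watermark to 61 s; a further click stamped 30 s is then dropped. Clicks dropped this way are the ones the nightly batch path recovers from the raw archive.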
Idempotent upsert into the rollup store
package com.example.ads.rollup;

import java.sql.*;

// PostgreSQL dialect (ON CONFLICT ... DO UPDATE ... WHERE). ClickHouse has no ON CONFLICT;
// the usual equivalent there is a ReplacingMergeTree(updated_at) table ordered by
// (ad_id, minute_start), which keeps the highest updated_at per key during merges.
public class RollupStore {
    private final Connection conn;

    public RollupStore(Connection conn) throws SQLException {
        this.conn = conn;
        // Primary key (ad_id, minute_start) gives each window exactly one row,
        // so a replay updates that row instead of adding a duplicate.
        try (Statement s = conn.createStatement()) {
            s.execute("""
                CREATE TABLE IF NOT EXISTS ad_minute_rollup (
                  ad_id        BIGINT NOT NULL,
                  minute_start BIGINT NOT NULL,
                  click_count  BIGINT NOT NULL,
                  updated_at   BIGINT NOT NULL,
                  PRIMARY KEY (ad_id, minute_start)
                )
                """);
        }
    }

    // Last-writer-wins on updated_at, the version carried in the record (not the write time).
    // A replayed write carries its original updated_at, so it either rewrites identical values
    // or is skipped; it can never clobber a newer batch correction, including one that lowered
    // the count after bot filtering (a GREATEST(click_count) guard would block that correction).
    public void upsert(long adId, long minuteStart, long count, long updatedAt) throws SQLException {
        String sql = """
            INSERT INTO ad_minute_rollup(ad_id, minute_start, click_count, updated_at)
            VALUES (?, ?, ?, ?)
            ON CONFLICT (ad_id, minute_start)
            DO UPDATE SET
              click_count = EXCLUDED.click_count,
              updated_at  = EXCLUDED.updated_at
            WHERE EXCLUDED.updated_at >= ad_minute_rollup.updated_at
            """;
        try (PreparedStatement ps = conn.prepareStatement(sql)) {
            ps.setLong(1, adId);
            ps.setLong(2, minuteStart);
            ps.setLong(3, count);
            ps.setLong(4, updatedAt);
            ps.executeUpdate();
        }
    }
}
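The architecture overview says budget enforcement services subscribe to the rollup topic and pause campaigns when thresholds are crossed. A hypothetical sketch of that consumer's core decision follows; the per-ad cap map, the flat cost per click, micro-currency units, and pausing at the ad level are all illustrative assumptions. Storing the latest count per (ad_id, minute) instead of adding each record means a replayed rollup record cannot double-count spend.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical budget check driven by rollup records (ad_id, minute_start, count).
// Caps and the flat cost per click are assumptions, expressed in micro-units of currency.
class BudgetEnforcer {
    private final Map<Long, Long> capMicrosByAd;
    private final long costPerClickMicros;
    // Latest count per (ad, minute): a replayed record overwrites instead of adding.
    private final Map<Long, Map<Long, Long>> countsByAd = new HashMap<>();
    private final Set<Long> paused = new HashSet<>();

    BudgetEnforcer(Map<Long, Long> capMicrosByAd, long costPerClickMicros) {
        this.capMicrosByAd = Map.copyOf(capMicrosByAd);
        this.costPerClickMicros = costPerClickMicros;
    }

    // Returns true only on the record that first pushes the ad's spend to its cap.
    boolean onRollup(long adId, long minuteStart, long count) {
        Map<Long, Long> perMinute = countsByAd.computeIfAbsent(adId, k -> new HashMap<>());
        perMinute.put(minuteStart, count);
        if (paused.contains(adId)) return false;
        long clicks = perMinute.values().stream().mapToLong(Long::longValue).sum();
        long cap = capMicrosByAd.getOrDefault(adId, Long.MAX_VALUE);
        if (Math.multiplyExact(clicks, costPerClickMicros) >= cap) {
            paused.add(adId);
            return true;
        }
        return false;
    }
}
```

This sums every minute seen so far; a production version would scope the sum to the budget period (for example, the current day) and act on preliminary streaming counts knowing the batch may later revise them.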

Key design decisions & trade-offs

Interview follow-ups
