Ad Click Event Aggregation (Real-time) System Design Interview Question
Problem: Design a real-time ad click event aggregation service that powers advertiser reporting and near-real-time budgeting.
Overview
Ad click aggregation is the quiet backbone of the internet advertising business: every ad impression and click on every page has to land in a counter that an advertiser trusts enough to pay against. The volume is enormous — roughly a billion clicks a day at peak-hour rates near 50,000 per second — and the correctness bar is non-trivial, because advertisers read these numbers to decide when to pause campaigns, trigger budget caps, and reconcile invoices. The design that has emerged across Google, Meta, and their peers is a lambda-flavoured pipeline: a fast streaming path gives near-real-time counters for dashboards and budget enforcement, and a nightly batch path re-aggregates the raw event log from cold storage to produce authoritative numbers that absorb late events, bot filtering, and schema fixes. This page walks the streaming half in detail — Kafka ingest, Flink windowing, and idempotent rollup writes — because that is where the interesting distributed-systems work lives.
Summary
A streaming aggregation pipeline: ad clients fire impression/click beacons into an edge ingest tier, events are queued on Kafka with idempotency keys, Flink maintains per-(ad_id, minute) counters with event-time tumbling windows, and aggregates land in a columnar OLAP store (ClickHouse) for advertiser dashboards. A nightly batch corrector re-aggregates raw events from cold storage (S3) to fix late-arriving clicks and bot-filtered rows — the streaming result is 'fast but approximate', the batch result is 'authoritative'. The dominant design choice is a lambda-style stream + batch reconciliation over a pure streaming system: exactly-once aggregation at the click volume here (1B/day, peak ~50K clicks/sec) is expensive, and advertisers will accept a stream that's eventually corrected overnight. The main tradeoff is dual-path complexity — two codebases must produce the same schema.
Requirements
Functional
- Ingest click and impression beacons from millions of browsers and apps with an idempotency key
- Produce per-(ad_id, minute) aggregates available to advertisers within about a minute
- Support per-campaign and per-advertiser rollups over hour, day, and lifetime windows
- Enforce campaign budget caps in near real time by reading the streaming counters
- Tolerate out-of-order and late-arriving events up to a bounded watermark
- Expose a query API for advertiser dashboards with sub-second latency on recent data
Non-functional
- Sustain 50,000 clicks/sec at peak (about 4x the ~11.6K/sec daily average), with headroom for 5x traffic spikes
- Streaming aggregates visible within 60 seconds of the originating click
- No data loss: every accepted event must appear in either the streaming or batch rollup
- Nightly batch correction must reconcile with streaming to within 1% before publishing
- Horizontal scale on both ingest and aggregation — no single broker or worker is a bottleneck
- Cost per billion events must fall year-on-year as the fleet grows
Capacity Assumptions
- 1B click events/day (100:1 impressions:clicks → 100B impressions/day — not all stored)
- Peak 3x sustained for major events (Black Friday, Super Bowl)
- Event size ≈ 400 bytes (ad_id, user_id, ts, geo, device, creative_id, publisher)
- Advertiser queries: 'impressions & clicks for campaign X last 7 days, grouped by creative'
- SLA: streaming aggregates visible within 1 min; batch reconciliation by T+24h
Back-of-Envelope Estimates
- Clicks: 1B / 86400 ≈ 11.6K/sec sustained, ~35K/sec at the 3x peak multiplier (below the ~50K/sec peak the design is sized for)
- Impressions (if stored): 100B / 86400 ≈ 1.16M/sec sustained — dominates capacity
- Daily raw volume: 1B * 400B = 400 GB/day clicks; plus ~40 TB/day impressions
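- Aggregated rows: ~10M active ads * 1440 minutes = 14.4B cells/day into ClickHouse as an upper bound (an ad with no events in a given minute writes no row), and heavily compressible (~95% compression ratio on counters)
- Query QPS: advertiser dashboards ≈ 500 concurrent, 2 queries each per load = 1,000 queries per reload cycle → ~50 QPS on OLAP, which implies each dashboard reloads roughly every 20 seconds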
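The arithmetic behind these estimates can be checked with a few lines of Java. This is a minimal sketch: the class and method names are illustrative, and every input constant is copied from the capacity assumptions listed above.

```java
// Back-of-envelope capacity figures. All inputs come from the capacity assumptions above;
// the class and method names are illustrative only.
final class CapacityEstimate {
    static final long CLICKS_PER_DAY = 1_000_000_000L;
    static final long IMPRESSIONS_PER_CLICK = 100L;
    static final long EVENT_BYTES = 400L;
    static final long SECONDS_PER_DAY = 86_400L;
    static final long PEAK_MULTIPLIER = 3L;
    static final long ACTIVE_ADS = 10_000_000L;
    static final long MINUTES_PER_DAY = 1_440L;

    static double sustainedClicksPerSec() {
        return (double) CLICKS_PER_DAY / SECONDS_PER_DAY;
    }

    static double peakClicksPerSec() {
        return sustainedClicksPerSec() * PEAK_MULTIPLIER;
    }

    static double sustainedImpressionsPerSec() {
        return sustainedClicksPerSec() * IMPRESSIONS_PER_CLICK;
    }

    static long clickBytesPerDay() {
        return CLICKS_PER_DAY * EVENT_BYTES;
    }

    static long impressionBytesPerDay() {
        return CLICKS_PER_DAY * IMPRESSIONS_PER_CLICK * EVENT_BYTES;
    }

    // Upper bound: assumes every active ad receives at least one event in every minute.
    static long maxRollupCellsPerDay() {
        return ACTIVE_ADS * MINUTES_PER_DAY;
    }

    public static void main(String[] args) {
        System.out.printf("clicks/sec sustained:      %.0f%n", sustainedClicksPerSec());
        System.out.printf("clicks/sec at 3x peak:     %.0f%n", peakClicksPerSec());
        System.out.printf("impressions/sec sustained: %.0f%n", sustainedImpressionsPerSec());
        System.out.printf("click GB/day:              %d%n", clickBytesPerDay() / 1_000_000_000L);
        System.out.printf("impression TB/day:         %d%n", impressionBytesPerDay() / 1_000_000_000_000L);
        System.out.printf("rollup cells/day (max):    %d%n", maxRollupCellsPerDay());
    }
}
```

Keeping the inputs as named constants makes it easy to rerun the numbers when an interviewer changes one assumption, for example the impressions-to-clicks ratio.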
High-level architecture
Clients fire click beacons at a thin edge ingest service that validates the payload, stamps a server-side timestamp and idempotency key, and appends the event to a Kafka topic partitioned by ad_id. Kafka is the source of truth, and two consumers read the raw topic directly: the Flink aggregator and the cold-storage sink.
The Flink job reads the firehose and maintains per-(ad_id, minute) counters using event-time tumbling windows, with a watermark that trails the highest event timestamp seen so far by a few seconds. When the watermark passes the end of a window, the aggregator emits an (ad_id, minute, count) record to a rollup topic. A rollup writer consumes the rollup topic and upserts into an OLAP store (ClickHouse or Pinot) keyed on (ad_id, minute), so replays and retries are idempotent. The cold-storage sink archives the raw Kafka log to S3 or GCS, partitioned by hour.
A nightly Spark or Flink batch job re-reads the archive, applies bot filters and schema fixes, and overwrites the authoritative daily rollup; the streaming numbers are marked 'preliminary' until the batch settles. Advertiser dashboards query the OLAP store directly; budget enforcement services subscribe to the rollup topic and pause campaigns when thresholds are crossed. Deduplication happens at two layers, idempotency keys at ingest and upserts at the sink, because exactly-once across this many hops is cheaper to approximate than to guarantee.
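To make the windowing behaviour concrete, here is a minimal single-threaded sketch in plain Java of event-time tumbling windows with a bounded out-of-orderness watermark. It is not Flink code: MinuteWindower and its methods are hypothetical names, and the in-memory late list only stands in for the dead letter queue. The rules are the ones described above: the watermark trails the highest timestamp seen, a window closes once the watermark reaches its end, and an event for an already closed window is treated as late.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Illustrative model of per-(ad_id, minute) counting in event time. Not thread-safe.
final class MinuteWindower {
    static final long WINDOW_MS = 60_000L;

    private final long maxOutOfOrdernessMs;
    private long maxSeenTs = Long.MIN_VALUE;
    // windowStart -> (adId -> count); sorted so closed windows drain oldest-first.
    private final TreeMap<Long, Map<Long, Long>> open = new TreeMap<>();
    private final List<long[]> emitted = new ArrayList<>(); // {adId, windowStart, count}
    private final List<long[]> late = new ArrayList<>();    // {adId, eventTs}

    MinuteWindower(long maxOutOfOrdernessMs) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
    }

    // The watermark trails the highest event timestamp seen so far, not the wall clock.
    long watermark() {
        return maxSeenTs == Long.MIN_VALUE ? Long.MIN_VALUE : maxSeenTs - maxOutOfOrdernessMs;
    }

    void onEvent(long adId, long eventTs) {
        long windowStart = Math.floorDiv(eventTs, WINDOW_MS) * WINDOW_MS;
        if (windowStart + WINDOW_MS <= watermark()) {
            late.add(new long[] {adId, eventTs}); // window already closed: route to the DLQ
            return;
        }
        open.computeIfAbsent(windowStart, k -> new HashMap<>()).merge(adId, 1L, Long::sum);
        maxSeenTs = Math.max(maxSeenTs, eventTs);
        closeFinishedWindows();
    }

    // Emit every window whose end is at or before the current watermark.
    private void closeFinishedWindows() {
        long wm = watermark();
        while (!open.isEmpty() && open.firstKey() + WINDOW_MS <= wm) {
            Map.Entry<Long, Map<Long, Long>> window = open.pollFirstEntry();
            for (Map.Entry<Long, Long> cell : window.getValue().entrySet()) {
                emitted.add(new long[] {cell.getKey(), window.getKey(), cell.getValue()});
            }
        }
    }

    List<long[]> emitted() { return emitted; }

    List<long[]> late() { return late; }
}
```

With a 10-second bound, an event stamped 58 s that arrives after one stamped 65 s is still counted in the first minute, while an event stamped 30 s that arrives after one stamped 71 s is late, because the watermark (61 s) has already passed the end of that window.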
Architecture Components (10)
- Ad Client (SDK / Pixel) (client) — JavaScript pixel or mobile SDK embedded on publisher pages that emits impression and click beacons.
- Edge Ingest Service (api) — Thin global edge POP that accepts click/impression beacons, server-stamps timestamps, and publishes to Kafka.
- Click Kafka (stream) — Durable partitioned log of impression + click events feeding all downstream consumers.
- Stream Aggregator (Flink) (stream-processor) — Consumes click events and maintains per-(ad_id, minute) counters using event-time tumbling windows.
- OLAP Store (ClickHouse) (nosql) — Columnar OLAP database holding per-(ad_id, minute) aggregates for sub-second advertiser queries.
- Dead Letter Queue (queue) — Holds events too late for the streaming window or rejected by schema validation.
- Raw Event Lake (S3) (blob) — Immutable Parquet files of every raw event, partitioned by date and hour for the batch corrector.
- Batch Corrector (worker) — Nightly Spark job that re-aggregates the full day's raw events and overwrites streaming aggregates with 'batch' rows.
- Report API (api) — Thin read API in front of ClickHouse for advertiser dashboards and billing.
- Advertiser Dashboard (client) — Web UI where advertisers see campaign performance.
Operations Walked Through (3)
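- ingest-click — A user clicks an ad; the SDK beacons the edge; the edge stamps the event and publishes it to Kafka; the raw event is mirrored to S3 by Kafka Connect.
- stream-aggregate — Flink consumes clicks, dedupes by event_id, and when each one-minute window closes emits aggregates that are upserted into ClickHouse. Delivery is at-least-once; the idempotent upsert is what makes a replay harmless, giving effectively-once results rather than true exactly-once.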
- batch-correct — At 02:00 UTC a Spark job re-aggregates the full prior day from S3 + DLQ with bot filtering and dedupe, and overwrites the 'stream' rows with 'batch' rows in ClickHouse.
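The requirement that the batch result reconcile with streaming to within 1% before publishing can be expressed as a small gate that runs ahead of the overwrite. The sketch below is hypothetical: the Reconciler class, the per-ad granularity, and the choice of the batch count as the denominator are all assumptions, since the source states only the 1% threshold.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;

// Illustrative reconciliation gate between the streaming and batch rollups.
final class Reconciler {

    // Relative difference measured against the batch count (an assumed convention).
    static double relativeDrift(long streamCount, long batchCount) {
        if (batchCount == 0L) {
            return streamCount == 0L ? 0.0 : Double.POSITIVE_INFINITY;
        }
        return Math.abs(streamCount - batchCount) / (double) batchCount;
    }

    // Returns, in ascending order, every ad id whose drift exceeds the tolerance.
    // An ad missing from one side is treated as having a count of zero on that side.
    static List<Long> adsOutOfTolerance(Map<Long, Long> stream, Map<Long, Long> batch, double tolerance) {
        TreeSet<Long> adIds = new TreeSet<>(stream.keySet());
        adIds.addAll(batch.keySet());
        List<Long> flagged = new ArrayList<>();
        for (Long adId : adIds) {
            long s = stream.getOrDefault(adId, 0L);
            long b = batch.getOrDefault(adId, 0L);
            if (relativeDrift(s, b) > tolerance) {
                flagged.add(adId);
            }
        }
        return flagged;
    }
}
```

A publishing job might call adsOutOfTolerance(streamTotals, batchTotals, 0.01) and hold the overwrite for manual review whenever the returned list is non-empty. Low-volume ads will trip a purely relative threshold easily, so a real gate would probably add an absolute floor as well.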
Implementation
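package com.example.ads.ingest;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;

import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// ClickEvent and ClickHandler are application types defined elsewhere in the project.
public class ClickConsumer implements Runnable {
    private final KafkaConsumer<String, ClickEvent> consumer;
    private final ClickHandler handler;
    private volatile boolean running = true;

    public ClickConsumer(Map<String, Object> cfg, ClickHandler handler) {
        // Copy so the caller's map is not mutated. cfg must already carry bootstrap.servers,
        // group.id, and the key/value deserializers.
        Map<String, Object> props = new HashMap<>(cfg);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 2000);
        this.consumer = new KafkaConsumer<>(props);
        this.handler = handler;
    }

    @Override
    public void run() {
        try {
            consumer.subscribe(List.of("clicks.v1"));
            while (running) {
                ConsumerRecords<String, ClickEvent> batch = consumer.poll(Duration.ofMillis(500));
                if (batch.isEmpty()) continue;
                for (ConsumerRecord<String, ClickEvent> r : batch) {
                    handler.accept(r.key(), r.value(), r.timestamp()); // dedupe by event id inside
                }
                // Commit only after every record in the batch has been handled: at-least-once
                // delivery, made safe by the idempotent sink. This assumes handler.accept
                // persists synchronously before it returns.
                consumer.commitSync();
            }
        } catch (WakeupException e) {
            if (running) throw e; // a wakeup that did not come from stop() is unexpected
        } finally {
            consumer.close(); // always release the group membership, even on failure
        }
    }

    // Safe to call from another thread: wakeup() is the thread-safe KafkaConsumer method
    // and it interrupts a blocked poll() so shutdown does not wait for the poll timeout.
    public void stop() {
        running = false;
        consumer.wakeup();
    }
}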
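package com.example.ads.agg;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.time.Duration;

public class ClickAggregator {
    // Takes an unkeyed stream of ClickEvent, keys it by ad_id, and emits (ad_id, minuteStart, count).
    // WatermarkStrategy (Flink 1.11+) replaces the deprecated BoundedOutOfOrdernessTimestampExtractor.
    // Time.minutes(1) is the Flink 1.x form; newer releases accept Duration.ofMinutes(1) instead.
    public static DataStream<AdMinuteCount> aggregate(DataStream<ClickEvent> clicks) {
        return clicks
            .assignTimestampsAndWatermarks(
                WatermarkStrategy.<ClickEvent>forBoundedOutOfOrderness(Duration.ofSeconds(10))
                    .withTimestampAssigner((e, recordTs) -> e.serverTs))
            .keyBy(e -> e.adId)
            .window(TumblingEventTimeWindows.of(Time.minutes(1)))
            .aggregate(new CountAgg(), new EmitWithKey());
    }

    // Incremental count: only one Long per (ad_id, window) is held in state.
    public static class CountAgg implements AggregateFunction<ClickEvent, Long, Long> {
        @Override public Long createAccumulator() { return 0L; }
        @Override public Long add(ClickEvent e, Long acc) { return acc + 1L; }
        @Override public Long getResult(Long acc) { return acc; }
        @Override public Long merge(Long a, Long b) { return a + b; }
    }

    // Wraps (adId, windowStart, count) into AdMinuteCount. Assumes ClickEvent.adId is a long
    // and AdMinuteCount has a (long adId, long minuteStart, long count) constructor.
    public static class EmitWithKey extends ProcessWindowFunction<Long, AdMinuteCount, Long, TimeWindow> {
        @Override
        public void process(Long adId, Context ctx, Iterable<Long> counts, Collector<AdMinuteCount> out) {
            out.collect(new AdMinuteCount(adId, ctx.window().getStart(), counts.iterator().next()));
        }
    }
}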
package com.example.ads.rollup;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Statement;

// The SQL below is PostgreSQL dialect (ON CONFLICT, GREATEST). ClickHouse has no ON CONFLICT
// clause, so a ClickHouse sink needs a different idempotency mechanism, for example a
// ReplacingMergeTree table.
public class RollupStore {
    private final Connection conn;

    public RollupStore(Connection conn) throws SQLException {
        this.conn = conn;
        // Primary key (ad_id, minute_start) guarantees idempotent replay.
        try (Statement s = conn.createStatement()) {
            s.execute("""
                CREATE TABLE IF NOT EXISTS ad_minute_rollup (
                    ad_id        BIGINT NOT NULL,
                    minute_start BIGINT NOT NULL,
                    click_count  BIGINT NOT NULL,
                    updated_at   BIGINT NOT NULL,
                    PRIMARY KEY (ad_id, minute_start)
                )
                """);
        }
    }

    // Upsert with a monotonic guard: GREATEST keeps the larger count, so a replayed or stale
    // partial write can never shrink a row. Caveat: a batch correction that lowers a count
    // (for example after bot filtering) cannot pass this guard and needs its own overwrite path.
    public void upsert(long adId, long minuteStart, long count, long updatedAt) throws SQLException {
        String sql = """
            INSERT INTO ad_minute_rollup(ad_id, minute_start, click_count, updated_at)
            VALUES (?, ?, ?, ?)
            ON CONFLICT (ad_id, minute_start)
            DO UPDATE SET
                click_count = GREATEST(ad_minute_rollup.click_count, EXCLUDED.click_count),
                updated_at  = GREATEST(ad_minute_rollup.updated_at, EXCLUDED.updated_at)
            """;
        try (PreparedStatement ps = conn.prepareStatement(sql)) {
            ps.setLong(1, adId);
            ps.setLong(2, minuteStart);
            ps.setLong(3, count);
            ps.setLong(4, updatedAt);
            ps.executeUpdate();
        }
    }
}
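One subtlety in RollupStore.upsert is that GREATEST always keeps the larger count, while the nightly batch often produces a smaller one once bot clicks are removed. A possible way to let 'batch' rows supersede 'stream' rows is to tag each row with its source and resolve conflicts on the tag first. The following in-memory sketch shows that rule; RollupTable, Source, and resolve are hypothetical names, and the source does not specify how the overwrite is actually implemented.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative conflict resolution between streaming and batch writes to the same cell.
final class RollupTable {
    enum Source { STREAM, BATCH }

    static final class Row {
        final long count;
        final Source source;

        Row(long count, Source source) {
            this.count = count;
            this.source = source;
        }
    }

    private final Map<String, Row> rows = new HashMap<>();

    void upsert(long adId, long minuteStart, long count, Source source) {
        String key = adId + ":" + minuteStart;
        rows.merge(key, new Row(count, source), RollupTable::resolve);
    }

    static Row resolve(Row existing, Row incoming) {
        if (incoming.source == Source.BATCH) {
            return incoming; // batch is authoritative, even when its count is lower
        }
        if (existing.source == Source.BATCH) {
            return existing; // a late stream replay never overrides a settled row
        }
        // Stream versus stream: keep the larger count, as the GREATEST guard does.
        return incoming.count >= existing.count ? incoming : existing;
    }

    Row get(long adId, long minuteStart) {
        return rows.get(adId + ":" + minuteStart);
    }
}
```

In a SQL store the same idea could be carried by a source column on the rollup row, which is also a natural way to tell dashboards which numbers are still preliminary.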
Key design decisions & trade-offs
- Processing model — Chosen: Lambda (stream + nightly batch) rather than pure streaming. True exactly-once at this volume is expensive and fragile; a cheaper stream that is overwritten by a trustworthy batch gives you both freshness and correctness, at the cost of two codebases.
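- Windowing clock — Chosen: Event time with a bounded watermark. Processing-time windows are simpler but attribute each click to the minute in which it was processed, so a replay after recovery inflates the current minute and a lag spike deflates the minutes that fell behind; event time with a 10-second out-of-orderness bound on the watermark matches how advertisers actually think about 'clicks per minute'.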
- Deduplication strategy — Chosen: Idempotency key at ingest plus idempotent upsert at the sink. Kafka exactly-once across job restarts is achievable but brittle. Two cheap guards — a hashed event id and a PK-based upsert — cover every realistic replay path.
- Sink database — Chosen: Columnar OLAP (ClickHouse/Pinot) over a row-oriented OLTP database. Advertiser queries are wide scans over time ranges with group-bys on ad_id and campaign_id; columnar storage with compression is 10-100x faster for those shapes and cheaper to store.
- Raw event retention — Chosen: Archive every event to cold storage for at least 30 days. The batch corrector, compliance audits, and ML training pipelines all require the raw log. Kafka retention is too short and too expensive to serve that role directly.
- Partitioning key — Chosen: Partition Kafka by ad_id. Hot ads concentrate traffic, but aggregation is per-ad so co-locating events per partition keeps state local to a single Flink task and avoids cross-partition shuffles.
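The first of the two deduplication guards, a hashed event id, can be sketched as follows. Everything specific here is an assumption made for illustration: the fields that feed the hash (including the nonce), the choice of SHA-256, and the bounded in-memory seen-set. A production job would more likely keep this state in Flink keyed state with a TTL.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative first-seen filter keyed by a hashed idempotency key. Not thread-safe.
final class EventDeduper {
    private final Map<String, Boolean> seen;

    EventDeduper(final int capacity) {
        // Insertion-ordered map that evicts its oldest key once capacity is exceeded,
        // so memory stays bounded at the cost of forgetting very old keys.
        this.seen = new LinkedHashMap<String, Boolean>(16, 0.75f, false) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, Boolean> eldest) {
                return size() > capacity;
            }
        };
    }

    // Hypothetical key derivation: hex-encoded SHA-256 over fields that identify one click.
    static String idempotencyKey(long adId, String userId, long clientTsMs, String nonce) {
        try {
            MessageDigest md = MessageDigest.getInstance("SHA-256");
            String raw = adId + "|" + userId + "|" + clientTsMs + "|" + nonce;
            byte[] digest = md.digest(raw.getBytes(StandardCharsets.UTF_8));
            StringBuilder hex = new StringBuilder(digest.length * 2);
            for (byte b : digest) {
                hex.append(String.format("%02x", b));
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("SHA-256 is unavailable on this JVM", e);
        }
    }

    // Returns true the first time a key is offered and false for a duplicate still in memory.
    boolean firstTime(String key) {
        return seen.putIfAbsent(key, Boolean.TRUE) == null;
    }
}
```

Because the seen-set is bounded, a duplicate that arrives after its key has been evicted slips through this layer. That gap is the reason the design pairs it with the idempotent upsert at the sink.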
Interview follow-ups
- How do you detect and quarantine bot traffic before it inflates the streaming counters?
- How do you handle a single 'celebrity' ad whose traffic exceeds one Kafka partition?
- How do you reconcile when the nightly batch disagrees with the streaming number by more than 1%?
- How do you serve the advertiser dashboard sub-second while the OLAP store is absorbing millions of writes per minute?
- How does budget enforcement avoid over-spending when it reads a counter that lags by 60 seconds?
- How do you extend this pipeline to support per-user frequency capping without exploding the state size?