Distributed Message Queue (Kafka-style) System Design Interview Question
Problem: Design a distributed, durable, high-throughput message queue like Apache Kafka.
Overview
A distributed message queue like Apache Kafka is the backbone of modern event-driven platforms: it decouples producers from consumers, smooths traffic spikes, and preserves an immutable log of everything that happened in the business. The hard part isn't accepting messages; it's accepting a million per second without losing any when a broker dies, while still giving each consumer group its own replayable, ordered view of the stream. Get it wrong and you either drop payments on a leader failover, or you serialize the entire firehose through one node and cap out at a few tens of thousands of messages per second. The Kafka-style answer, partitioned append-only logs replicated across brokers with a leader-per-partition model, is now the default for analytics pipelines, CDC, microservice choreography, and real-time fraud detection. Understanding why partitions, ISR replication, offsets, and consumer groups exist in this exact shape is the single most leveraged piece of system-design knowledge for any backend engineer touching data in motion.
Summary
A partitioned, replicated, append-only log that decouples producers from consumers and guarantees ordering within a partition. The dominant design choice is a partition-leader / in-sync-replica (ISR) replication model: one broker owns each partition for writes, writes are replicated to followers, and a separate controller handles metadata and failover. The main trade-offs are that ordering holds only within a partition, never across partitions, and that leader failover causes a brief availability dip (on the order of seconds) for the affected partition. Sized for ~1M msgs/sec ingest, ~3x fanout to consumers, and 7-day retention.
Requirements
Functional
- Producers publish records to named topics; each record lands on exactly one partition
- Consumers subscribe in groups; each partition is consumed by exactly one member per group
- Strict FIFO ordering within a partition; no global cross-partition ordering guarantee
- At-least-once delivery by default, with opt-in idempotent and transactional producers
- Configurable retention by time (e.g. 7 days) or size; consumers can replay from any offset
- Online topic creation, partition count increase, and broker membership changes without downtime
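To make "exactly one partition" and per-partition FIFO concrete: for keyed records, Kafka's default partitioner hashes the serialized key with murmur2, masks the result to a non-negative int, and takes it modulo the partition count, so every record with the same key lands on the same partition and stays ordered relative to its siblings. The sketch below reproduces that mapping with the JDK only. `KeyPartitionerSketch` is a hypothetical name, the murmur2 body is transcribed from our reading of the Kafka client (treat exact hash compatibility as an assumption to verify), and keyless records, which the real client spreads differently, are out of scope.

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical helper: maps a record key to a partition the way Kafka's default partitioner does for keyed records. */
final class KeyPartitionerSketch {
    private KeyPartitionerSketch() { }

    /** 32-bit murmur2 (seed 0x9747b28c), the variant we understand the Kafka client to use. */
    static int murmur2(byte[] data) {
        final int length = data.length;
        final int m = 0x5bd1e995;
        final int r = 24;
        int h = 0x9747b28c ^ length;
        final int length4 = length / 4;
        for (int i = 0; i < length4; i++) {
            final int i4 = i * 4;
            int k = (data[i4] & 0xff) + ((data[i4 + 1] & 0xff) << 8)
                    + ((data[i4 + 2] & 0xff) << 16) + ((data[i4 + 3] & 0xff) << 24);
            k *= m;
            k ^= k >>> r;
            k *= m;
            h *= m;
            h ^= k;
        }
        // Mix in the 1-3 trailing bytes; fall-through between the cases is intentional.
        switch (length % 4) {
            case 3: h ^= (data[(length & ~3) + 2] & 0xff) << 16;
            case 2: h ^= (data[(length & ~3) + 1] & 0xff) << 8;
            case 1: h ^= data[length & ~3] & 0xff;
                    h *= m;
            default: break;
        }
        h ^= h >>> 13;
        h *= m;
        h ^= h >>> 15;
        return h;
    }

    /** Same key and same partition count always give the same partition: the basis of per-key ordering. */
    static int partitionFor(String key, int numPartitions) {
        if (numPartitions <= 0) throw new IllegalArgumentException("numPartitions must be > 0");
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        return (murmur2(keyBytes) & 0x7fffffff) % numPartitions; // mask instead of Math.abs to avoid MIN_VALUE
    }
}
```

Because the result depends on `numPartitions`, increasing a topic's partition count (which the requirements allow online) can send new records for an existing key to a different partition while old records stay where they were, so ordering for that key is not guaranteed across the change.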
Non-functional
- Sustain 1M msgs/sec ingest and 3M peak with sub-10ms p99 produce latency
- No data loss on single-broker failure (replication factor = 3, min.insync.replicas = 2)
- Horizontal scale by adding brokers; no single write bottleneck
- Durable on disk with a tunable fsync policy; survives full cluster restart
- Leader failover completes in seconds, not minutes
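The two replication settings above imply two separate failure budgets. Assuming clean leader election (only ISR members may become leader) and a partition that starts fully in sync, acks=all writes keep succeeding while at least min.insync.replicas replicas remain in the ISR, and every acknowledged record already sits on at least min.insync.replicas brokers. The hypothetical `ReplicationBudget` record below only encodes that arithmetic.

```java
/** Hypothetical helper: failure budgets implied by replication.factor and min.insync.replicas (clean election assumed). */
record ReplicationBudget(int replicationFactor, int minInsyncReplicas) {
    ReplicationBudget {
        if (minInsyncReplicas < 1 || minInsyncReplicas > replicationFactor) {
            throw new IllegalArgumentException("need 1 <= min.insync.replicas <= replication.factor");
        }
    }

    /** Broker losses after which acks=all produces still succeed: the ISR may shrink down to minISR. */
    int writeAvailabilityTolerance() {
        return replicationFactor - minInsyncReplicas;
    }

    /** Broker losses an acknowledged record survives: it was written to at least minISR replicas. */
    int ackedDataLossTolerance() {
        return minInsyncReplicas - 1;
    }
}
```

For RF = 3 and min.insync.replicas = 2 both budgets are one broker, which is exactly the "no data loss on single-broker failure" requirement; raising min.insync.replicas to 3 would protect acknowledged data against two failures but make the partition unwritable as soon as one replica drops out of the ISR.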
Capacity Assumptions
- 1M messages/sec steady, 3M peak
- Average message size = 1 KB
- Retention 7 days; replication factor 3
- 100 topics, 1000 partitions total, ~50 broker nodes
- Consumer groups: 3x fanout (one message read by 3 independent consumer groups on average)
Back-of-Envelope Estimates
- Ingest throughput: 1M * 1 KB = 1 GB/s, peak 3 GB/s
- Raw storage/day: 1 GB/s * 86400 ≈ 86 TB; x3 replication → 260 TB/day on disk → 1.8 PB for 7-day retention
- Read throughput (3x fanout): 3 GB/s steady, 9 GB/s peak
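- Per broker (load spread evenly over 50 brokers): ~20 MB/s producer ingest (1 GB/s / 50), ~60 MB/s disk writes including replicas (260 TB/day / 50 ≈ 5.2 TB/day), ~100 MB/s egress ((3 GB/s consumer reads + 2 GB/s follower fetches) / 50), and ~36 TB of retained data (1.8 PB / 50); all rates roughly triple at peak
- Controller metadata: 1,000 partitions × 3 replicas ≈ 3,000 replica assignments plus topic and broker records, which fits comfortably in a single Raft quorum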
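The estimates above can be reproduced in a few lines, which makes them easy to re-run when an assumption changes. This is a sketch under stated simplifications: decimal units (1 KB = 1,000 bytes), no compression, load spread evenly across brokers, and follower replication fetches counted as broker egress. `CapacityEstimate` is a hypothetical name.

```java
/** Hypothetical back-of-envelope calculator (decimal units, no compression, even spread across brokers). */
record CapacityEstimate(double msgsPerSec, double msgBytes, int replicationFactor,
                        int retentionDays, double consumerFanout, int brokers) {

    double ingestBytesPerSec() { return msgsPerSec * msgBytes; }

    /** Bytes landing on disk per day across all replicas. */
    double diskBytesPerDay() { return ingestBytesPerSec() * 86_400 * replicationFactor; }

    double retainedBytes() { return diskBytesPerDay() * retentionDays; }

    double consumerReadBytesPerSec() { return ingestBytesPerSec() * consumerFanout; }

    /** Per-broker egress: consumer reads plus follower fetches (RF - 1 copies of ingest). */
    double egressPerBrokerBytesPerSec() {
        double replicationOut = ingestBytesPerSec() * (replicationFactor - 1);
        return (consumerReadBytesPerSec() + replicationOut) / brokers;
    }

    /** Per-broker disk write rate including follower copies. */
    double diskWritePerBrokerBytesPerSec() {
        return ingestBytesPerSec() * replicationFactor / brokers;
    }
}
```

With the stated assumptions, `new CapacityEstimate(1_000_000, 1_000, 3, 7, 3, 50)` yields 1 GB/s ingest, 259.2 TB/day on disk, about 1.81 PB retained, 100 MB/s egress and 60 MB/s disk writes per broker.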
High-level architecture
The request path starts in the producer client, a smart library embedded in the application. It fetches topic metadata once from any bootstrap broker, caches a partition-to-leader map, and from then on talks directly to the leader broker for each partition. Records are buffered in a per-partition accumulator, compressed (lz4 or zstd), and sent as a batch when either batch.size fills up or linger.ms expires, turning a million tiny writes into ten thousand fat ones. The leader broker appends the batch to the active segment file of the partition's append-only log (written through the OS page cache; only the offset and time indexes are memory-mapped), and follower replicas fetch the new data from the leader. With acks=all, the leader acknowledges the producer only after every replica currently in the ISR has the batch, and min.insync.replicas=2 makes it reject acks=all writes outright when fewer than two replicas are in sync; together they guarantee that an acknowledged record exists on at least two brokers, which is what gives you the no-data-loss guarantee under single-node failure. A separate controller (backed by ZooKeeper in older releases, by a KRaft quorum in newer ones) watches broker liveness and, when a leader dies, promotes an in-sync replica to be the new leader and propagates the new metadata. Consumers pull from leaders with long-polled fetch requests, track their position via committed offsets stored in the internal __consumer_offsets topic, and rebalance partitions across group members when membership changes. The whole design leans into sequential disk I/O and the page cache: a partition log is an append-only file, and the OS serves most consumer reads of recent data from RAM without touching disk.
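The batching step can be sketched as a per-partition accumulator that releases a batch when it reaches batch.size bytes or when it has waited linger.ms since it was opened, whichever comes first. This is a simplified, single-threaded model with hypothetical names (`AccumulatorSketch`, `append`, `drainReady`); the real client also caps total memory with buffer.memory, compresses batches, and sends them from a background I/O thread.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical, single-threaded model of per-partition producer batching (linger.ms / batch.size). */
final class AccumulatorSketch {
    /** One batch of records for a single partition. */
    static final class Batch {
        final int partition;
        final long createdAtMs;
        final List<byte[]> records = new ArrayList<>();
        int sizeBytes;

        Batch(int partition, long createdAtMs) {
            this.partition = partition;
            this.createdAtMs = createdAtMs;
        }
    }

    private final int batchSizeBytes;
    private final long lingerMs;
    private final Map<Integer, Deque<Batch>> queues = new HashMap<>();

    AccumulatorSketch(int batchSizeBytes, long lingerMs) {
        this.batchSizeBytes = batchSizeBytes;
        this.lingerMs = lingerMs;
    }

    /** Adds a record to the partition's newest batch, opening a new batch if it would overflow. */
    void append(int partition, byte[] record, long nowMs) {
        Deque<Batch> q = queues.computeIfAbsent(partition, p -> new ArrayDeque<>());
        Batch last = q.peekLast();
        if (last == null || last.sizeBytes + record.length > batchSizeBytes) {
            last = new Batch(partition, nowMs); // an oversized record simply gets a batch of its own
            q.addLast(last);
        }
        last.records.add(record);
        last.sizeBytes += record.length;
    }

    /** Removes and returns batches that are full (or sealed by a newer batch) or have lingered long enough. */
    List<Batch> drainReady(long nowMs) {
        List<Batch> ready = new ArrayList<>();
        for (Deque<Batch> q : queues.values()) {
            while (!q.isEmpty()) {
                Batch head = q.peekFirst();
                boolean full = q.size() > 1 || head.sizeBytes >= batchSizeBytes;
                boolean lingered = nowMs - head.createdAtMs >= lingerMs;
                if (!full && !lingered) break;
                ready.add(q.pollFirst());
            }
        }
        return ready;
    }
}
```

The trade-off is visible in the two conditions: a larger batch.size improves compression and cuts per-request overhead, while linger.ms bounds how much latency a lightly loaded partition pays waiting for a batch to fill.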
Architecture Components (9)
- Producer Client (client) — Application-side library that batches records, selects a partition, and produces to the partition leader.
- Bootstrap Load Balancer (lb) — L4 LB fronting the broker pool — used only for the initial bootstrap/metadata fetch. Subsequent data traffic goes directly to partition leaders.
- Broker (Partition Leader) (queue) — Stateful Kafka broker process; leads a set of partitions, serves PRODUCE and FETCH, and replicates to followers.
- Partition Log (on-disk) (blob) — Append-only segmented log files on local SSD. The physical representation of a partition.
- Replica Broker (Follower) (queue) — Follower brokers that pull records from the leader and maintain an in-sync replica (ISR) of each partition.
- Controller / Metadata Quorum (Raft / ZooKeeper) (coordinator) — Single active controller that owns cluster metadata and performs leader elections on failover.
- Consumer Client (client) — Application-side library that joins a consumer group and pulls records from assigned partitions.
- Group Coordinator (coordinator) — One broker per consumer group elected as coordinator; manages membership, heartbeats, and partition assignment.
- Offset Store (__consumer_offsets topic) (kv) — Kafka's internal compacted topic storing (group, topic, partition) → committed_offset.
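For the Group Coordinator and Offset Store entries: in Kafka, a group's coordinator is not picked by a separate election; it is whichever broker currently leads the __consumer_offsets partition that the group ID hashes to (the topic has offsets.topic.num.partitions partitions, 50 by default). Because that topic is compacted, only the newest committed offset per (group, topic, partition) key has to survive. The in-memory sketch below models both ideas; `CoordinatorSketch` and its methods are hypothetical, and the hash (`abs(groupId.hashCode()) % n`, with Integer.MIN_VALUE mapped to 0) reflects our understanding of the broker's mapping rather than a quoted API.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical in-memory model of coordinator lookup and a compacted offset store. */
final class CoordinatorSketch {
    /** Key of the compacted offsets topic: one entry per (group, topic, partition). */
    record OffsetKey(String group, String topic, int partition) { }

    private final int offsetsTopicPartitions;
    private final Map<OffsetKey, Long> compacted = new HashMap<>();

    CoordinatorSketch(int offsetsTopicPartitions) {
        this.offsetsTopicPartitions = offsetsTopicPartitions;
    }

    /** The __consumer_offsets partition (and hence the leader broker) that owns this group. */
    int offsetsPartitionFor(String groupId) {
        int h = groupId.hashCode();
        int abs = (h == Integer.MIN_VALUE) ? 0 : Math.abs(h);
        return abs % offsetsTopicPartitions;
    }

    /** A commit is an append to the offsets topic; compaction keeping the newest value is modeled by put(). */
    void commit(String group, String topic, int partition, long nextOffsetToRead) {
        compacted.put(new OffsetKey(group, topic, partition), nextOffsetToRead);
    }

    /** Returns the committed position, or -1 if the group never committed for this partition. */
    long committed(String group, String topic, int partition) {
        return compacted.getOrDefault(new OffsetKey(group, topic, partition), -1L);
    }
}
```

Tying coordination to an offsets-topic partition means coordinator failover needs no extra machinery: when that partition's leader moves, the new leader replays the compacted log and takes over the group.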
Operations Walked Through (3)
- produce — Producer sends a record to the partition leader; leader appends to its log, waits for ISR replication, then acks.
- consume — Consumer polls the leader for new records, processes them, and commits the offset back to the coordinator.
- leader-failover — Broker-12 (leader of orders-7) crashes. Controller detects the heartbeat miss, removes it from ISR, picks a new leader from the remaining ISR, and propagates the change.
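The produce and leader-failover operations both revolve around the ISR. The sketch below is a deliberately simplified single-partition model with hypothetical names: a produce is rejected when the ISR is smaller than min.insync.replicas, a record is committed (and an acks=all produce can return) once every ISR member has it, which is the high watermark, and failover promotes a surviving ISR member. It ignores leader epochs, log truncation on the new leader, and follower lag timeouts, all of which a real broker needs.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

/** Hypothetical single-partition model of ISR, high watermark, and clean leader failover. */
final class IsrPartitionSketch {
    private final int minInsyncReplicas;
    /** Log end offset (next offset to write) of each in-sync replica, keyed by broker id. */
    private final Map<Integer, Long> isrLogEnd = new LinkedHashMap<>();
    private int leader;

    IsrPartitionSketch(Set<Integer> replicas, int leader, int minInsyncReplicas) {
        if (!replicas.contains(leader)) throw new IllegalArgumentException("leader must be a replica");
        for (int r : replicas) isrLogEnd.put(r, 0L);
        this.leader = leader;
        this.minInsyncReplicas = minInsyncReplicas;
    }

    /** Leader append; refuses like NOT_ENOUGH_REPLICAS when the ISR is too small. Returns the new offset. */
    long produce() {
        if (isrLogEnd.size() < minInsyncReplicas) {
            throw new IllegalStateException("not enough in-sync replicas: " + isrLogEnd.size());
        }
        long offset = isrLogEnd.get(leader);
        isrLogEnd.put(leader, offset + 1);
        return offset;
    }

    /** A follower fetch that catches the follower up to the leader's log end. */
    void followerFetch(int follower) {
        if (isrLogEnd.containsKey(follower)) isrLogEnd.put(follower, isrLogEnd.get(leader));
    }

    /** Offsets below this are on every ISR member; acks=all waits until it passes the record. */
    long highWatermark() {
        return isrLogEnd.values().stream().mapToLong(Long::longValue).min().orElse(0L);
    }

    /** Controller reaction to a dead broker: drop it from the ISR and, if it led, promote a survivor. */
    void brokerFailed(int broker) {
        isrLogEnd.remove(broker);
        if (broker == leader) {
            leader = isrLogEnd.keySet().stream().findFirst()
                    .orElseThrow(() -> new IllegalStateException("no ISR member left; only unclean election remains"));
        }
    }

    int leader() { return leader; }
}
```

In the walkthrough's terms, Broker-12 crashing as leader of orders-7 corresponds to `brokerFailed(12)`: it leaves the ISR, another ISR member becomes leader, and because every offset below the high watermark is on that member, no acknowledged record is lost.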
Implementation
```java
package com.hld.mq.client;

import java.util.Properties;
import java.util.concurrent.Future;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

@Component
public class KafkaProducerClient implements AutoCloseable {
    private static final Logger log = LoggerFactory.getLogger(KafkaProducerClient.class);
    private final Producer<String, byte[]> producer;

    public KafkaProducerClient(@Value("${kafka.bootstrap}") String bootstrap) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
        // Durability: wait for the full ISR; idempotence dedups retries (requires <= 5 in-flight requests).
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5);
        // Throughput: wait up to 10 ms to fill 256 KiB batches, and compress each batch with zstd.
        props.put(ProducerConfig.LINGER_MS_CONFIG, 10);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 262144);
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "zstd");
        // Total memory for unsent batches; send() blocks once it is exhausted.
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 128L * 1024 * 1024);
        this.producer = new KafkaProducer<>(props);
    }

    /** Asynchronous send; the key selects the partition, so records sharing a key stay ordered. */
    public Future<RecordMetadata> publish(String topic, String key, byte[] payload) {
        ProducerRecord<String, byte[]> record = new ProducerRecord<>(topic, key, payload);
        return producer.send(record, (metadata, error) -> {
            if (error != null) {
                log.error("publish failed topic={} key={}", topic, key, error);
                return;
            }
            log.debug("ack topic={} partition={} offset={}", metadata.topic(), metadata.partition(), metadata.offset());
        });
    }

    @Override
    public void close() {
        producer.flush(); // push out any lingering batches before shutdown
        producer.close();
    }
}
```
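The ENABLE_IDEMPOTENCE_CONFIG setting above relies on broker-side deduplication: each producer gets a producer ID, numbers its batches per partition with a sequence number, and the partition leader discards a retried batch it has already appended instead of writing it twice. The sketch below models that check for one partition with hypothetical names and single-record batches. The real broker also tracks producer epochs and keeps metadata for only the last few batches per producer, which is tied to the max.in.flight.requests.per.connection ≤ 5 limit; none of that is modeled here.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical per-partition model of idempotent-producer deduplication on the leader. */
final class IdempotentPartitionSketch {
    enum Result { APPENDED, DUPLICATE, OUT_OF_ORDER }

    /** Last sequence number appended for each producer id. */
    private final Map<Long, Integer> lastSeqByProducer = new HashMap<>();
    private long logEndOffset;

    /** Appends a single-record batch unless it repeats or skips the producer's sequence. */
    Result append(long producerId, int sequence) {
        int last = lastSeqByProducer.getOrDefault(producerId, -1);
        if (sequence <= last) return Result.DUPLICATE;        // a retry of something already written
        if (sequence != last + 1) return Result.OUT_OF_ORDER; // a gap: an earlier batch never arrived
        lastSeqByProducer.put(producerId, sequence);
        logEndOffset++;
        return Result.APPENDED;
    }

    long logEndOffset() { return logEndOffset; }
}
```

This is why acks=all plus retries stops producing duplicates: a batch whose ack was lost in transit is resent with the same sequence number and is recognized instead of being appended again.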
```java
package com.hld.mq.broker;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public final class Topic {
    private final String name;
    private final int replicationFactor;
    private final ConcurrentMap<Integer, Partition> partitions = new ConcurrentHashMap<>();

    public Topic(String name, int partitionCount, int replicationFactor, List<BrokerId> brokers) {
        this.name = Objects.requireNonNull(name);
        if (replicationFactor < 1 || replicationFactor > brokers.size()) {
            // Refuse rather than silently creating fewer replicas than requested.
            throw new IllegalArgumentException("replicationFactor " + replicationFactor
                    + " needs at least that many brokers, have " + brokers.size());
        }
        this.replicationFactor = replicationFactor;
        for (int i = 0; i < partitionCount; i++) {
            List<BrokerId> replicas = pickReplicas(brokers, replicationFactor, i);
            partitions.put(i, new Partition(name, i, replicas));
        }
    }

    public Partition partitionFor(int id) {
        return partitions.get(id);
    }

    public int partitionCount() {
        return partitions.size();
    }

    public int replicationFactor() {
        return replicationFactor;
    }

    public String name() {
        return name;
    }

    /** Round-robin placement: partition i starts at broker i, so leaders (replica 0) spread evenly. */
    private static List<BrokerId> pickReplicas(List<BrokerId> brokers, int rf, int partitionId) {
        List<BrokerId> rotated = new ArrayList<>(brokers);
        Collections.rotate(rotated, -partitionId);
        return List.copyOf(rotated.subList(0, rf));
    }

    public static final class Partition {
        private final String topic;
        private final int id;
        private final List<BrokerId> replicas;
        private volatile BrokerId leader; // written by the controller on failover, read by request handlers

        Partition(String topic, int id, List<BrokerId> replicas) {
            this.topic = topic;
            this.id = id;
            this.replicas = replicas;
            this.leader = replicas.get(0); // the first assigned replica is the preferred leader
        }

        public BrokerId leader() { return leader; }
        public List<BrokerId> replicas() { return replicas; }
        public int id() { return id; }
        public String topic() { return topic; }

        public void electLeader(BrokerId newLeader) {
            if (!replicas.contains(newLeader)) {
                throw new IllegalArgumentException(newLeader + " is not a replica of " + topic + "-" + id);
            }
            this.leader = newLeader;
        }
    }

    public record BrokerId(int id, String host, int port) { }
}
```
```java
package com.hld.mq.broker;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.List;
import java.util.zip.CRC32;

public final class PartitionLogAppender implements AutoCloseable {
    private static final int HEADER_BYTES = 20; // offset(8) + len(4) + crc(4) + ts(4)
    private final FileChannel segment;
    private long nextOffset; // guarded by this

    public PartitionLogAppender(Path segmentPath, long startingOffset) throws IOException {
        this.segment = FileChannel.open(segmentPath,
                StandardOpenOption.CREATE, StandardOpenOption.READ, StandardOpenOption.WRITE);
        this.segment.position(this.segment.size()); // resume appending after any existing data
        this.nextOffset = startingOffset;
    }

    /**
     * Appends a batch and fsyncs it before returning the first assigned offset. Offsets are only
     * published after the write and fsync succeed, so a failed append does not consume offsets.
     * (Stricter than Kafka's default, which leaves fsync to the OS and relies on replication.)
     */
    public synchronized long append(List<byte[]> records) throws IOException {
        long firstOffset = nextOffset;
        int totalBytes = HEADER_BYTES * records.size();
        for (byte[] r : records) totalBytes += r.length;
        ByteBuffer buf = ByteBuffer.allocate(totalBytes);
        CRC32 crc = new CRC32();
        int ts = (int) (System.currentTimeMillis() / 1000L); // epoch seconds; overflows in 2038
        long off = firstOffset;
        for (byte[] r : records) {
            crc.reset();
            crc.update(r, 0, r.length);
            buf.putLong(off++);
            buf.putInt(r.length);
            buf.putInt((int) crc.getValue());
            buf.putInt(ts);
            buf.put(r);
        }
        buf.flip();
        long startPos = segment.position();
        try {
            while (buf.hasRemaining()) segment.write(buf);
            segment.force(false); // flush file content to storage before acknowledging
        } catch (IOException e) {
            // Drop any torn tail so the next append starts on a clean record boundary.
            try {
                segment.truncate(startPos);
            } catch (IOException t) {
                e.addSuppressed(t);
            }
            throw e;
        }
        nextOffset = off;
        return firstOffset;
    }

    public synchronized long endOffset() { return nextOffset; }

    @Override
    public void close() throws IOException {
        segment.close();
    }
}
```
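PartitionLogAppender only writes. A reader for the same layout (8-byte offset, 4-byte length, 4-byte CRC32 of the payload, 4-byte timestamp in seconds, then the payload) can walk the bytes and verify each checksum, which is how corruption is caught before a record reaches a consumer. The sketch below uses hypothetical names, works on an in-memory ByteBuffer instead of a FileChannel, and includes a matching encoder so it can run without touching disk; Kafka's real on-disk format (record batches with their own header and a CRC32C) is different.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.zip.CRC32;

/** Hypothetical reader/writer pair for the [offset|len|crc|ts|payload] layout used by PartitionLogAppender. */
final class SegmentCodecSketch {
    static final int HEADER_BYTES = 20; // offset(8) + len(4) + crc(4) + ts(4)

    record Entry(long offset, int timestampSec, byte[] payload) { }

    static int crcOf(byte[] payload) {
        CRC32 crc = new CRC32();
        crc.update(payload, 0, payload.length);
        return (int) crc.getValue();
    }

    /** Encodes records with the appender's layout, assigning offsets from firstOffset upward. */
    static ByteBuffer encode(long firstOffset, int timestampSec, List<byte[]> records) {
        int total = 0;
        for (byte[] r : records) total += HEADER_BYTES + r.length;
        ByteBuffer buf = ByteBuffer.allocate(total);
        long off = firstOffset;
        for (byte[] r : records) {
            buf.putLong(off++).putInt(r.length).putInt(crcOf(r)).putInt(timestampSec).put(r);
        }
        return buf.flip();
    }

    /** Decodes entries in order; stops at a truncated tail and fails loudly on a CRC mismatch. */
    static List<Entry> decode(ByteBuffer buf) {
        List<Entry> out = new ArrayList<>();
        while (buf.remaining() >= HEADER_BYTES) {
            long offset = buf.getLong();
            int len = buf.getInt();
            int crc = buf.getInt();
            int ts = buf.getInt();
            if (len < 0 || len > buf.remaining()) break; // partial write at the tail: ignore it
            byte[] payload = new byte[len];
            buf.get(payload);
            if (crcOf(payload) != crc) {
                throw new IllegalStateException("CRC mismatch at offset " + offset);
            }
            out.add(new Entry(offset, ts, payload));
        }
        return out;
    }
}
```

Because every record carries its own offset, a reader can also start mid-segment once an index tells it the byte position of a target offset, which is the job Kafka's offset index files perform.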
Key design decisions & trade-offs
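- Ordering guarantee — Chosen: Per-partition FIFO only, no global order. Global ordering would force a single writer and cap throughput; per-partition order is enough for most workloads when keys are chosen well (e.g. keying by order ID keeps every event for one order in sequence).
- Replication protocol — Chosen: Leader-based with ISR (in-sync replicas). Cheaper than a Paxos/Raft majority quorum on the data path: a write commits once every current ISR member has it, so f+1 replicas can survive f failures where a majority quorum needs 2f+1. acks=all + min.insync.replicas=2 gives no data loss on single-node failure, at the cost of every produce waiting for follower fetches.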
- Consumer position tracking — Chosen: Client-committed offsets stored back in Kafka. Brokers stay stateless per consumer and replay is cheap; the cost is that at-least-once is the default and apps must handle duplicates.
- Storage layout — Chosen: Append-only log with page cache, not a B-tree. Sequential disk I/O is far faster than random I/O (by orders of magnitude on spinning disks), and the page cache plus zero-copy transfer let most consumer reads of recent data skip disk entirely; the trade is no in-place update or per-key lookup.
- Metadata coordination — Chosen: Dedicated controller (KRaft/ZK) separate from data brokers. Isolates Raft quorum traffic from the firehose data path; failover decisions survive data-broker overload.
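The at-least-once cost of client-committed offsets shows up after a crash or rebalance: a consumer resumes from the last committed offset and sees again every record it processed but had not yet committed. A common mitigation, sketched below with hypothetical names for a single partition, is to process first, commit the next offset to read (last processed offset + 1, the convention Kafka uses), and skip offsets already applied. The sketch assumes the last applied offset is persisted atomically with the side effect, for example in the same database transaction.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical single-partition consumer: at-least-once delivery made effectively-once by offset dedup. */
final class DedupingConsumerSketch {
    private long committedOffset;        // next offset to read, as a commit would record it
    private long lastAppliedOffset = -1; // assumed to be persisted together with the side effect
    final List<String> applied = new ArrayList<>(); // stands in for the side effect (e.g. DB rows)

    /** Processes records from the committed position; commits only after the whole poll is handled. */
    void pollAndProcess(List<String> log, int maxRecords, boolean crashBeforeCommit) {
        long end = Math.min(log.size(), committedOffset + maxRecords);
        for (long off = committedOffset; off < end; off++) {
            if (off <= lastAppliedOffset) continue; // redelivered duplicate: already applied
            applied.add(log.get((int) off));
            lastAppliedOffset = off;
        }
        if (!crashBeforeCommit) committedOffset = end; // commit = last processed + 1
    }

    long committedOffset() { return committedOffset; }
}
```

Committing before processing would flip the guarantee to at-most-once (a crash after the commit skips records), which is why the default order is process-then-commit plus an idempotent handler.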
Interview follow-ups
- Exactly-once semantics across consume-transform-produce cycles via the transactional producer
- Tiered storage: offload cold segments to S3 so brokers only hold the hot window
- Cross-datacenter replication with MirrorMaker 2 and offset translation
- Auto-rebalancing partitions (Cruise Control) when brokers become hot
- Schema evolution via a registry (Avro/Protobuf) with forward/backward compatibility checks