Distributed Message Queue (Kafka-style) System Design Interview Question

By Rahul Kumar · Senior Software Engineer · Updated · 9 components · 3 operations · Source: Alex Xu, System Design Interview Vol 2, Chapter 4

Problem: Design a distributed, durable, high-throughput message queue like Apache Kafka.

Overview

A distributed message queue like Apache Kafka is the backbone of modern event-driven platforms: it decouples producers from consumers, absorbs traffic spikes, and keeps an immutable log of everything that happened in the business. The hard part isn't accepting messages; it's accepting a million per second without losing any when a broker dies, while still giving each consumer group its own replayable view of the stream, ordered within each partition. Get it wrong and you either drop payments during a leader failover, or you funnel the entire firehose through one node and cap out at a few tens of thousands of messages per second. The Kafka-style answer (partitioned append-only logs, replicated across brokers, with one leader per partition) is now the default for analytics pipelines, CDC, microservice choreography, and real-time fraud detection. Understanding why partitions, ISR replication, offsets, and consumer groups take exactly this shape is one of the highest-leverage pieces of system-design knowledge for any backend engineer working with data in motion.

Summary

A partitioned, replicated, append-only log that decouples producers from consumers and guarantees ordering within a partition. The dominant design choice is a partition-leader / in-sync-replica (ISR) replication model: one broker owns writes for each partition, follower brokers replicate its log, and a separate controller handles metadata and failover. The main tradeoffs are that ordering holds only within a partition, never across partitions, and that leader failover makes the affected partition briefly unavailable (typically a few seconds). Sized for ~1M msgs/sec ingest, ~3x fanout to consumers (~3M msgs/sec of reads), and 7-day retention.
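A quick back-of-envelope check of those sizing targets. The summary gives message rate, fanout, and retention but not message size or replication factor, so the 1 KB average message and replication factor of 3 in this sketch are assumptions, and compression is ignored.

```java
// Back-of-envelope sizing. ASSUMPTIONS (not from the source): 1 KB average message,
// replication factor 3, no compression. Rate, fanout, and retention come from the summary.
final class CapacityEstimate {
    static final long MSGS_PER_SEC = 1_000_000L;           // ~1M msgs/sec ingest
    static final long AVG_MSG_BYTES = 1_000L;              // assumed
    static final int CONSUMER_FANOUT = 3;                  // ~3x fanout
    static final int REPLICATION_FACTOR = 3;               // assumed
    static final long RETENTION_SECONDS = 7L * 24 * 3600;  // 7 days

    static long ingestBytesPerSec() {
        return MSGS_PER_SEC * AVG_MSG_BYTES;
    }

    // Each follower copies every byte the leader receives: (RF - 1) x ingest.
    static long replicationBytesPerSec() {
        return (REPLICATION_FACTOR - 1) * ingestBytesPerSec();
    }

    // Each consumer group reads the full stream once.
    static long consumerEgressBytesPerSec() {
        return CONSUMER_FANOUT * ingestBytesPerSec();
    }

    // Raw disk to hold the whole retention window on every replica.
    static long retainedBytes() {
        return ingestBytesPerSec() * RETENTION_SECONDS * REPLICATION_FACTOR;
    }

    public static void main(String[] args) {
        System.out.printf("ingest:      %.1f GB/s%n", ingestBytesPerSec() / 1e9);
        System.out.printf("replication: %.1f GB/s%n", replicationBytesPerSec() / 1e9);
        System.out.printf("egress:      %.1f GB/s%n", consumerEgressBytesPerSec() / 1e9);
        System.out.printf("storage:     %.2f PB%n", retainedBytes() / 1e15);
    }
}
```

Under those assumptions the cluster handles about 1 GB/s of ingest, 2 GB/s of follower replication traffic, 3 GB/s of consumer egress, and roughly 1.8 PB of raw log storage for the 7-day window; compression would shrink all four figures roughly in proportion to the compression ratio.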

Requirements

Functional

Non-functional

Capacity Assumptions

Back-of-Envelope Estimates

High-level architecture

The request path starts in the producer client, a smart library embedded in the application. It fetches topic metadata from any broker in its bootstrap list, caches a partition-to-leader map (refreshing it periodically and whenever a broker reports that it is no longer the leader), and from then on sends each record directly to the leader broker for its partition. Records are buffered in a per-partition accumulator, compressed (lz4 or zstd), and sent as a batch when either linger.ms elapses or the batch reaches batch.size; at about 100 records per batch, a million small writes per second become about ten thousand large ones.

The leader broker appends each batch to the partition's active segment file, a plain append-only log (Kafka memory-maps only the offset and time indexes, not the log itself). Followers pull new data from the leader with fetch requests, the same protocol consumers use. With acks=all, the leader acknowledges the producer only after every replica currently in the in-sync replica set (ISR) has the batch, and min.insync.replicas=2 makes it reject the write outright if the ISR has shrunk below two. With a replication factor of 3 (the usual pairing with min.insync.replicas=2), that combination is what guarantees no acknowledged write is lost when a single broker fails. A controller (originally a broker elected through ZooKeeper, now a KRaft controller quorum) tracks broker liveness; when a leader dies, it promotes an in-sync replica to leader and propagates the new metadata.

Consumers pull from leaders with long-polled fetch requests, track their position via committed offsets stored in the internal __consumer_offsets topic, and rebalance partitions across group members when membership changes. The whole design leans on sequential disk I/O and the page cache: because a partition log is an append-only file, consumers that keep up with the head of the log are usually served from RAM (and, without TLS, copied to the socket with zero-copy sendfile) without touching disk.
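The acks=all / min.insync.replicas interaction can be illustrated with a small in-memory model of one partition's leader. This is a hedged sketch, not Kafka's broker code: `PartitionReplicationModel` and its methods are hypothetical, followers are tracked only by the offset they last fetched, and ISR expansion is not modeled. What it does capture is the commit rule: the high watermark is the smallest log end offset across the ISR, an acks=all write is acknowledged only once the high watermark passes it, and a write is refused outright while the ISR is smaller than min.insync.replicas.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical single-partition model of leader-side ISR bookkeeping; not Kafka's broker code.
final class PartitionReplicationModel {
    private final String leaderId;
    private final int minInsyncReplicas;
    // Log end offset (next offset to write) of each replica in the ISR, leader included.
    private final Map<String, Long> isrLogEndOffsets = new HashMap<>();
    private long leaderLogEndOffset = 0;

    PartitionReplicationModel(String leaderId, int minInsyncReplicas, String... followerIds) {
        this.leaderId = leaderId;
        this.minInsyncReplicas = minInsyncReplicas;
        isrLogEndOffsets.put(leaderId, 0L);
        for (String f : followerIds) isrLogEndOffsets.put(f, 0L);
    }

    /** acks=all append; refused up front when the ISR is below min.insync.replicas. */
    long append(int recordCount) {
        if (isrLogEndOffsets.size() < minInsyncReplicas) {
            throw new IllegalStateException("ISR size " + isrLogEndOffsets.size()
                    + " < min.insync.replicas " + minInsyncReplicas);
        }
        long firstOffset = leaderLogEndOffset;
        leaderLogEndOffset += recordCount;
        isrLogEndOffsets.put(leaderId, leaderLogEndOffset);
        return firstOffset;
    }

    /** A follower fetching at fetchOffset proves it holds every record below that offset. */
    void followerFetched(String followerId, long fetchOffset) {
        if (isrLogEndOffsets.containsKey(followerId)) {
            isrLogEndOffsets.put(followerId, Math.min(fetchOffset, leaderLogEndOffset));
        }
    }

    /** Drops a lagging or dead follower (Kafka does this after replica.lag.time.max.ms). */
    void shrinkIsr(String followerId) {
        if (!followerId.equals(leaderId)) isrLogEndOffsets.remove(followerId);
    }

    /** Committed prefix: every ISR member holds all offsets below the high watermark. */
    long highWatermark() {
        return isrLogEndOffsets.values().stream().mapToLong(Long::longValue).min().orElse(0L);
    }

    /** True once an acks=all write of [firstOffset, firstOffset + count) can be acknowledged. */
    boolean isAcknowledged(long firstOffset, int count) {
        // If the ISR shrank below the minimum after the append, Kafka fails the request instead.
        return isrLogEndOffsets.size() >= minInsyncReplicas && highWatermark() >= firstOffset + count;
    }
}
```

With one leader, two followers, and min.insync.replicas=2, a batch stays unacknowledged while any ISR member lags; once the leader drops a dead follower from the ISR, the high watermark can advance on the two remaining replicas, and losing a second follower makes new acks=all writes fail rather than silently run on a single copy.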

Architecture Components (9)

Operations Walked Through (3)

Implementation

KafkaProducerClient.java
package com.hld.mq.client;

import java.util.Properties;
import java.util.concurrent.Future;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

@Component
public class KafkaProducerClient implements AutoCloseable {
    private static final Logger log = LoggerFactory.getLogger(KafkaProducerClient.class);

    private final Producer<String, byte[]> producer;

    public KafkaProducerClient(@Value("${kafka.bootstrap}") String bootstrap) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
        // Durability: wait for the full ISR; idempotence prevents duplicates from producer retries.
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        // With idempotence enabled, up to 5 in-flight requests still preserve per-partition order.
        props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5);
        // Throughput: wait up to 10 ms to fill 256 KB batches, compressing each batch with zstd.
        props.put(ProducerConfig.LINGER_MS_CONFIG, 10);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 262144);
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "zstd");
        // Memory for unsent records; send() blocks (up to max.block.ms) when it is exhausted.
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 128L * 1024 * 1024);
        this.producer = new KafkaProducer<>(props);
    }

    public Future<RecordMetadata> publish(String topic, String key, byte[] payload) {
        ProducerRecord<String, byte[]> record = new ProducerRecord<>(topic, key, payload);
        return producer.send(record, (metadata, error) -> {
            if (error != null) {
                log.error("publish failed topic={} key={}", topic, key, error);
                return;
            }
            log.debug("ack topic={} partition={} offset={}", metadata.topic(), metadata.partition(), metadata.offset());
        });
    }

    @Override
    public void close() {
        producer.flush();
        producer.close();
    }
}
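To make `linger.ms` and `batch.size` concrete, here is a minimal sketch of a per-partition accumulator. It is not the Kafka client's internal accumulator: `BatchingSketch` and its methods are hypothetical, batches are sized by raw payload bytes, and keys are mapped to partitions with `String.hashCode()` rather than the murmur2 hash the real default partitioner uses. It does show the two properties the configuration above relies on: the same key always lands in the same partition (so its records stay ordered), and a batch becomes sendable as soon as it is full or has waited `lingerMs`, whichever comes first.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Hypothetical, single-threaded model of producer-side batching; not Kafka's internal accumulator.
final class BatchingSketch {
    static final class Batch {
        final int partition;
        final long createdMs;
        final List<byte[]> records = new ArrayList<>();
        int bytes;

        Batch(int partition, long createdMs) {
            this.partition = partition;
            this.createdMs = createdMs;
        }
    }

    private final int partitionCount;
    private final int batchSizeBytes; // plays the role of batch.size
    private final long lingerMs;      // plays the role of linger.ms
    private final Map<Integer, Batch> open = new HashMap<>(); // one open batch per partition
    private final List<Batch> ready = new ArrayList<>();      // closed batches waiting to be sent

    BatchingSketch(int partitionCount, int batchSizeBytes, long lingerMs) {
        this.partitionCount = partitionCount;
        this.batchSizeBytes = batchSizeBytes;
        this.lingerMs = lingerMs;
    }

    /** Same key, same partition. Kafka's default partitioner uses murmur2; hashCode is a stand-in. */
    int partitionFor(String key) {
        return Math.floorMod(key.hashCode(), partitionCount);
    }

    void append(String key, byte[] value, long nowMs) {
        int p = partitionFor(key);
        Batch b = open.get(p);
        if (b != null && b.bytes + value.length > batchSizeBytes) {
            ready.add(open.remove(p)); // record does not fit: close the batch
            b = null;
        }
        if (b == null) {
            b = new Batch(p, nowMs);
            open.put(p, b);
        }
        b.records.add(value);
        b.bytes += value.length;
    }

    /** Hands over every closed batch plus any open batch that has waited at least lingerMs. */
    List<Batch> drain(long nowMs) {
        List<Batch> out = new ArrayList<>(ready);
        ready.clear();
        Iterator<Batch> it = open.values().iterator();
        while (it.hasNext()) {
            Batch b = it.next();
            if (nowMs - b.createdMs >= lingerMs) {
                out.add(b);
                it.remove();
            }
        }
        return out;
    }
}
```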
Topic.java
package com.hld.mq.broker;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public final class Topic {
    private final String name;
    private final int replicationFactor;
    private final ConcurrentMap<Integer, Partition> partitions = new ConcurrentHashMap<>();

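    public Topic(String name, int partitionCount, int replicationFactor, List<BrokerId> brokers) {
        this.name = Objects.requireNonNull(name);
        // Refuse to silently under-replicate: each partition needs replicationFactor distinct brokers.
        if (partitionCount <= 0 || replicationFactor <= 0 || brokers.size() < replicationFactor) {
            throw new IllegalArgumentException("invalid topic config: partitions=" + partitionCount
                    + ", rf=" + replicationFactor + ", brokers=" + brokers.size());
        }
        this.replicationFactor = replicationFactor;
        for (int i = 0; i < partitionCount; i++) {
            // Round-robin placement: partition i's first (preferred-leader) replica is broker i mod N.
            List<BrokerId> replicas = pickReplicas(brokers, replicationFactor, i);
            partitions.put(i, new Partition(name, i, replicas));
        }
    }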

    public Partition partitionFor(int id) {
        return partitions.get(id);
    }

    public int partitionCount() {
        return partitions.size();
    }

    public String name() {
        return name;
    }

    public int replicationFactor() {
        return replicationFactor;
    }

    private static List<BrokerId> pickReplicas(List<BrokerId> brokers, int rf, int partitionId) {
        List<BrokerId> rotated = new ArrayList<>(brokers);
        Collections.rotate(rotated, -partitionId);
        return List.copyOf(rotated.subList(0, Math.min(rf, rotated.size())));
    }

    public static final class Partition {
        private final String topic;
        private final int id;
        private final List<BrokerId> replicas;
        private volatile BrokerId leader;

        Partition(String topic, int id, List<BrokerId> replicas) {
            this.topic = topic;
            this.id = id;
            this.replicas = replicas;
            this.leader = replicas.get(0);
        }

        public BrokerId leader() { return leader; }
        public List<BrokerId> replicas() { return replicas; }
        public int id() { return id; }
        public String topic() { return topic; }
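        /** Called by the controller on failover; the new leader must be one of this partition's replicas. */
        public void electLeader(BrokerId newLeader) {
            if (!replicas.contains(newLeader)) {
                throw new IllegalArgumentException(newLeader + " is not a replica of " + topic + "-" + id);
            }
            this.leader = newLeader;
        }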
    }

    public record BrokerId(int id, String host, int port) { }
}
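Choosing which broker to hand to `electLeader` is the controller's job, and the choice is constrained. With unclean leader election disabled (`unclean.leader.election.enable=false`, the default in current Kafka releases), only a live replica that is still in the ISR may take over, and Kafka picks the first such replica in the partition's assignment order. The sketch below models that rule with plain integer broker ids; `LeaderElectionSketch` is a hypothetical helper, not part of the `Topic` model above or of Kafka's API.

```java
import java.util.List;
import java.util.Optional;
import java.util.Set;

// Hypothetical model of the controller's leader choice for a partition whose leader failed.
final class LeaderElectionSketch {
    /**
     * @param assignment     replica broker ids in assignment (preference) order
     * @param isr            replicas known to be fully caught up with the old leader
     * @param liveBrokers    brokers currently registered with the controller
     * @param uncleanAllowed mirrors unclean.leader.election.enable; if true, an out-of-sync
     *                       live replica may win, losing records it never received
     * @return the new leader, or empty if the partition must stay offline
     */
    static Optional<Integer> electLeader(List<Integer> assignment, Set<Integer> isr,
                                         Set<Integer> liveBrokers, boolean uncleanAllowed) {
        Optional<Integer> clean = assignment.stream()
                .filter(liveBrokers::contains)
                .filter(isr::contains)
                .findFirst();
        if (clean.isPresent() || !uncleanAllowed) {
            return clean;
        }
        // Unclean fallback: first live replica, in sync or not.
        return assignment.stream().filter(liveBrokers::contains).findFirst();
    }
}
```

An empty result means the partition stays offline until an in-sync replica comes back; enabling unclean election trades that unavailability for possible loss of acknowledged records.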
PartitionLogAppender.java
package com.hld.mq.broker;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.zip.CRC32;

public final class PartitionLogAppender implements AutoCloseable {
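    // offset(8) + len(4) + crc(4) + ts(4). ts is epoch seconds in a signed 4-byte field, which
    // overflows in 2038; Kafka's own record format uses 64-bit millisecond timestamps.
    private static final int HEADER_BYTES = 20;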
    private final FileChannel segment;
    private final AtomicLong nextOffset;

    public PartitionLogAppender(Path segmentPath, long startingOffset) throws IOException {
        this.segment = FileChannel.open(segmentPath,
                StandardOpenOption.CREATE, StandardOpenOption.READ, StandardOpenOption.WRITE);
        this.segment.position(this.segment.size());
        this.nextOffset = new AtomicLong(startingOffset);
    }

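    /**
     * Appends a batch, forces it to disk, and returns the first assigned offset. Forcing every
     * batch is this sketch's durability choice; Kafka itself does not fsync per batch by default
     * and relies on replication (acks=all) for durability.
     */
    public synchronized long append(List<byte[]> records) throws IOException {
        long firstOffset = nextOffset.get();
        long totalBytes = (long) HEADER_BYTES * records.size();
        for (byte[] r : records) totalBytes += r.length;
        if (totalBytes > Integer.MAX_VALUE) {
            throw new IllegalArgumentException("batch too large: " + totalBytes + " bytes");
        }
        ByteBuffer buf = ByteBuffer.allocate((int) totalBytes);

        CRC32 crc = new CRC32();
        int ts = (int) (System.currentTimeMillis() / 1000L);
        long off = firstOffset;
        for (byte[] r : records) {
            crc.reset();
            crc.update(r, 0, r.length);
            buf.putLong(off++);
            buf.putInt(r.length);
            buf.putInt((int) crc.getValue());
            buf.putInt(ts);
            buf.put(r);
        }
        buf.flip();

        long startPosition = segment.position();
        try {
            while (buf.hasRemaining()) segment.write(buf);
            segment.force(false);
        } catch (IOException e) {
            // Cut off any partially written bytes so the segment never ends in a torn record.
            try {
                segment.truncate(startPosition);
            } catch (IOException suppressed) {
                e.addSuppressed(suppressed);
            }
            throw e;
        }
        // Advance the offset counter only after the batch is on disk, so a failed append leaves no gap.
        nextOffset.set(off);
        return firstOffset;
    }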

    public long endOffset() { return nextOffset.get(); }

    @Override
    public void close() throws IOException {
        segment.close();
    }
}
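The read path has to undo that framing. Below is a minimal sketch of a reader for the same layout (offset 8 bytes, length 4, CRC32 of the payload 4, timestamp 4, then the payload); `SegmentReaderSketch` is hypothetical, not part of the original code. It verifies each record's checksum and treats an incomplete record at the end of the buffer as a torn write from a crash, stopping there instead of failing, which is roughly what log recovery does when it truncates a segment after its last valid entry.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.zip.CRC32;

// Hypothetical reader for the appender's record framing.
final class SegmentReaderSketch {
    static final int HEADER_BYTES = 20; // offset(8) + len(4) + crc(4) + ts(4)

    record Entry(long offset, int timestampSeconds, byte[] payload) { }

    /** Decodes records until the buffer ends or the next record is incomplete (torn tail). */
    static List<Entry> readAll(ByteBuffer buf) {
        List<Entry> out = new ArrayList<>();
        CRC32 crc = new CRC32();
        while (buf.remaining() >= HEADER_BYTES) {
            int start = buf.position();
            long offset = buf.getLong();
            int len = buf.getInt();
            int storedCrc = buf.getInt();
            int ts = buf.getInt();
            if (len < 0 || buf.remaining() < len) {
                buf.position(start); // torn tail: leave it unread so recovery can truncate here
                break;
            }
            byte[] payload = new byte[len];
            buf.get(payload);
            crc.reset();
            crc.update(payload, 0, len);
            if ((int) crc.getValue() != storedCrc) {
                throw new IllegalStateException("CRC mismatch at offset " + offset);
            }
            out.add(new Entry(offset, ts, payload));
        }
        return out;
    }
}
```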

Key design decisions & trade-offs

Interview follow-ups
