Distributed Key-Value Store (Dynamo-style) System Design Interview Question

By Rahul Kumar · Senior Software Engineer · Updated · 8 components · 3 operations · Source: Alex Xu, System Design Interview Vol 1, Chapter 6 (pages 87–109); Dynamo paper (DeCandia et al. 2007); Cassandra & BigTable architectures.

Problem: Design an always-writable, horizontally scaled key-value store with tunable consistency: N replicas, configurable W/R quorums, vector-clock conflict resolution, gossip membership, and anti-entropy repair.

Overview

A distributed key-value store is the workhorse primitive behind shopping carts, session stores, feature flags, leaderboards, and configuration services: any workload that reads and writes single values by primary key at massive scale. The canonical design is Amazon's 2007 Dynamo paper, which inspired Apache Cassandra, Riak, and Voldemort, and later DynamoDB and ScyllaDB. Netflix relies heavily on Cassandra, Discord stores its messages in ScyllaDB, and Apple operates one of the world's largest Cassandra fleets. Interview versions demand an always-writable (AP in CAP terms) store with tunable consistency. That means N replicas per key and configurable W and R quorums, where W + R > N forces every read quorum to overlap every write quorum so reads see the latest acknowledged write. It also means vector clocks to detect concurrent writes, gossip-based membership for decentralized failure detection, sloppy quorum plus hinted handoff to stay live during partitions, and Merkle-tree anti-entropy to repair long-term drift. Candidates who breeze through the high-level boxes usually stumble on the subtleties: why vector clocks matter, what a sibling is, when W=1/R=1 is the right choice, and how the commit log + memtable + SSTable write path actually hits disk.
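The quorum trade-offs above reduce to a few lines of arithmetic. The sketch below is illustrative only. `QuorumMath` and its methods are hypothetical names. The failure estimate also assumes each replica is independently up with probability p, an idealization that correlated failures (a shared rack, zone, or bad deploy) violate.

```java
// Illustrative only: what a given (N, W, R) choice buys, assuming each replica is
// independently available with probability p (real failures are often correlated).
final class QuorumMath {

    // Every read quorum shares at least one replica with every write quorum iff W + R > N.
    static boolean quorumsOverlap(int n, int w, int r) {
        return w + r > n;
    }

    // Writes still succeed with up to N - W replicas down; reads with up to N - R down.
    static int writeFaultTolerance(int n, int w) { return n - w; }
    static int readFaultTolerance(int n, int r) { return n - r; }

    // Probability that fewer than q of n replicas are up, i.e. a quorum of size q cannot form.
    // Sums the binomial terms P(exactly k replicas up) for k = 0 .. q-1.
    static double quorumFailureProbability(int n, int q, double p) {
        double total = 0.0;
        for (int k = 0; k < q; k++) {
            total += binomial(n, k) * Math.pow(p, k) * Math.pow(1 - p, n - k);
        }
        return total;
    }

    // n choose k, computed incrementally to stay in double range.
    private static double binomial(int n, int k) {
        double c = 1.0;
        for (int i = 1; i <= k; i++) c = c * (n - k + i) / i;
        return c;
    }

    public static void main(String[] args) {
        int[][] configs = { {3, 2, 2}, {3, 1, 1}, {3, 3, 1}, {3, 1, 3} };
        for (int[] c : configs) {
            System.out.printf("N=%d W=%d R=%d overlap=%b writeTolerates=%d readTolerates=%d%n",
                    c[0], c[1], c[2], quorumsOverlap(c[0], c[1], c[2]),
                    writeFaultTolerance(c[0], c[1]), readFaultTolerance(c[0], c[2]));
        }
        System.out.printf("P(W=2 of N=3 unavailable, p=0.999) = %.2e%n",
                quorumFailureProbability(3, 2, 0.999));
    }
}
```

With N=3, W=R=2 overlaps and tolerates one replica down on either path. W=R=1 does not overlap, but it keeps both paths live with two replicas down, which is why it can suit data that tolerates stale reads. For W=2 of N=3 at p=0.999 the sum comes out near 3 × 10^-6.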

Summary

A peer-to-peer KV cluster in the Dynamo lineage, and the book's recommended AP design (Ch 6). Keys are routed via a consistent-hash ring to N replicas. Writes succeed once W replicas ack, and reads wait for R responses. Per the book, W + R > N guarantees strong consistency because every read quorum shares at least one replica with every write quorum. That overlap holds only while writes land on the key's true preference list; sloppy-quorum fallbacks can break it. Concurrent writes are detected via vector clocks, and the store returns siblings for client-side resolution. Gossip handles decentralized failure detection; sloppy quorum + hinted handoff keep writes available during temporary failures; Merkle-tree anti-entropy repairs permanent divergence. Per-replica storage follows the Cassandra write/read path from the book: commit log → memtable → SSTable on disk, with a Bloom filter to skip SSTables on reads. The dominant stance is AP: availability during partitions is non-negotiable, in contrast to a CP store such as a bank's ledger.
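Routing a key to its N replicas can be sketched with a sorted map of ring tokens. This is a minimal illustration, not the `ConsistentHashRing` class that `Coordinator.java` imports (its code is not shown). `HashRing`, the vnode count, and MD5 as the token hash are assumptions made for the example. The detail that matters is that the preference list skips tokens owned by an already-chosen physical node, so N replicas always land on N distinct machines.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.*;

// Hypothetical ring: each physical node owns `vnodes` tokens on a 64-bit hash circle.
final class HashRing {
    private final TreeMap<Long, String> tokens = new TreeMap<>();
    private final int vnodes;

    HashRing(int vnodes) { this.vnodes = vnodes; }

    void addNode(String node) {
        for (int i = 0; i < vnodes; i++) tokens.put(hash(node + "#" + i), node);
    }

    void removeNode(String node) {
        tokens.values().removeIf(node::equals);
    }

    // Preference list: walk clockwise from hash(key), skipping tokens whose physical node
    // is already chosen, until n distinct nodes are collected or the ring is exhausted.
    List<String> preferenceList(String key, int n) {
        LinkedHashSet<String> chosen = new LinkedHashSet<>();
        long h = hash(key);
        for (String node : tokens.tailMap(h, true).values()) {
            if (chosen.size() == n) break;
            chosen.add(node);
        }
        for (String node : tokens.headMap(h, false).values()) { // wrap past the top of the ring
            if (chosen.size() == n) break;
            chosen.add(node);
        }
        return new ArrayList<>(chosen);
    }

    // First 8 bytes of an MD5 digest as a long; any well-mixed hash would do here.
    private static long hash(String s) {
        try {
            byte[] d = MessageDigest.getInstance("MD5").digest(s.getBytes(StandardCharsets.UTF_8));
            long h = 0;
            for (int i = 0; i < 8; i++) h = (h << 8) | (d[i] & 0xFF);
            return h;
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

The walk stops as soon as N distinct nodes are found. So removing a node that is not in a key's preference list leaves that list unchanged. Only keys whose walk passed through the departed node's tokens move.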

High-level architecture

A client SDK sends PUT and GET to any coordinator over gRPC. The coordinator hashes the key onto a consistent-hash ring (N=3, 150 vnodes per node) to pick the preference list: the first N distinct physical nodes clockwise from the key. For a PUT it fans out to all N replicas and returns success once W of them ack. If a replica is down, the write goes instead to a live fallback node together with a hint naming the intended owner. The fallback replays it when the owner recovers (sloppy quorum + hinted handoff).

On each replica the write path mirrors Cassandra: append to the commit log on disk, insert into the in-memory memtable (a skip list or other sorted map), and ack. When the memtable exceeds 64 MB it is flushed as an immutable SSTable, and leveled compaction merges SSTables in the background. Each SSTable keeps a Bloom filter in RAM, so reads skip SSTables that definitely don't hold the key.

Reads run the fan-out in reverse: the coordinator queries all N replicas, waits for R responses, and compares their vector clocks. If the clocks agree, it returns the value. If one response is stale (its clock is dominated by another), it returns the newer value and issues a background read-repair to the stale replica. If the clocks are concurrent, it returns siblings to the client.

Gossip runs one round per second and needs O(log M) rounds to converge across M nodes, propagating membership and health. A background Merkle-tree anti-entropy process walks pairs of replicas nightly. It compares 2^15 leaf hashes per key range to find divergent keys and streams only the deltas. Vector clocks add ~30 bytes of overhead per key at steady state, ballooning to ~200 B under churn. Siblings are capped client-side at 10, after which the client falls back to last-write-wins (LWW).

At 30K peak ops/sec across 10 coordinators, each coordinator handles ~3K ops/sec. Fanning each op out to N=3 replicas, it issues ~9K outbound RPCs/sec, well within a single box. With W=R=2 and 99.9% per-replica availability, a quorum is lost only when at least 2 of the 3 replicas are down. That probability is 3 × (0.001)^2 × 0.999 + (0.001)^3 ≈ 3 × 10^-6 under independent failures.
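The Merkle-tree anti-entropy comparison described above can be sketched as follows. All of these assumptions are hypothetical:

- keys are bucketed into 2^depth leaves by `String.hashCode()`;
- a leaf hash is SHA-256 over the bucket's sorted `key=value` lines;
- both replicas build trees of the same depth over the same key range.

The diff descends only into subtrees whose hashes disagree. Two nearly identical replicas therefore exchange a handful of hashes instead of every key.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.*;

// Hypothetical Merkle tree over one key range, stored as a complete binary tree in an array:
// node i has children 2i+1 and 2i+2, and the last 2^depth slots are the leaf buckets.
final class MerkleTree {
    private final byte[][] nodes;
    private final int leaves;

    MerkleTree(SortedMap<String, String> data, int depth) {
        leaves = 1 << depth;
        nodes = new byte[2 * leaves - 1][];
        StringBuilder[] buckets = new StringBuilder[leaves];
        for (int i = 0; i < leaves; i++) buckets[i] = new StringBuilder();
        // Iterating a SortedMap keeps each bucket's contents in key order, so equal data
        // always hashes to equal leaves on both replicas.
        for (Map.Entry<String, String> e : data.entrySet()) {
            buckets[bucketOf(e.getKey(), leaves)]
                    .append(e.getKey()).append('=').append(e.getValue()).append('\n');
        }
        int firstLeaf = leaves - 1;
        for (int i = 0; i < leaves; i++) {
            nodes[firstLeaf + i] = sha256(buckets[i].toString().getBytes(StandardCharsets.UTF_8));
        }
        for (int i = firstLeaf - 1; i >= 0; i--) {       // parent = hash(left || right)
            nodes[i] = sha256(concat(nodes[2 * i + 1], nodes[2 * i + 2]));
        }
    }

    static int bucketOf(String key, int leaves) {
        return Math.floorMod(key.hashCode(), leaves);
    }

    // Leaf buckets whose contents differ; identical subtrees are pruned at their root.
    List<Integer> diff(MerkleTree other) {
        List<Integer> out = new ArrayList<>();
        walk(0, other, out);
        return out;
    }

    private void walk(int i, MerkleTree other, List<Integer> out) {
        if (Arrays.equals(nodes[i], other.nodes[i])) return;
        if (i >= leaves - 1) { out.add(i - (leaves - 1)); return; }
        walk(2 * i + 1, other, out);
        walk(2 * i + 2, other, out);
    }

    private static byte[] concat(byte[] a, byte[] b) {
        byte[] c = Arrays.copyOf(a, a.length + b.length);
        System.arraycopy(b, 0, c, a.length, b.length);
        return c;
    }

    private static byte[] sha256(byte[] in) {
        try {
            return MessageDigest.getInstance("SHA-256").digest(in);
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

Setting `depth = 15` would give the 2^15 leaves per range quoted above. The returned bucket indices tell the two replicas which keys to stream to each other.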

Implementation

Coordinator.java
package com.example.kv.coordinator;

import com.example.kv.model.*;
import com.example.ring.ConsistentHashRing;

import java.util.*;
import java.util.concurrent.*;

/**
 * Dynamo-style coordinator. Routes a key to N replicas via consistent hashing,
 * waits for W acks on writes and R responses on reads, and surfaces siblings
 * when vector clocks are concurrent.
 */
public class Coordinator {

    private final ConsistentHashRing ring;
    private final ReplicaClient rpc;
    private final int n;
    private final int w;
    private final int r;
    private final ExecutorService pool = Executors.newFixedThreadPool(64);
    private final HintedHandoffLog hints;

    public Coordinator(ConsistentHashRing ring, ReplicaClient rpc,
                       int n, int w, int r, HintedHandoffLog hints) {
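        // W and R are tunable: W + R > N makes read and write quorums overlap, while
        // W = R = 1 trades consistency for latency. Reject only impossible settings.
        if (n < 1 || w < 1 || r < 1 || w > n || r > n) {
            throw new IllegalArgumentException("need 1 <= W <= N and 1 <= R <= N");
        }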
        this.ring = ring; this.rpc = rpc; this.n = n; this.w = w; this.r = r; this.hints = hints;
    }

    public PutResult put(String key, byte[] value, VectorClock context) throws InterruptedException {
        VectorClock next = context.incrementForCoordinator();
        List<String> replicas = ring.getReplicas(key, n);
        CountDownLatch acked = new CountDownLatch(w);
        List<Throwable> errors = Collections.synchronizedList(new ArrayList<>());
        for (String node : replicas) {
            pool.submit(() -> {
                try {
                    rpc.put(node, key, value, next);
                    acked.countDown();
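                } catch (Throwable t) {
                    errors.add(t);
                    // Hinted handoff: remember the missed write so it can be replayed when the
                    // replica recovers. A hint is not a replica ack, so it must NOT count toward
                    // W; counting it would let a write "succeed" with zero durable copies.
                    hints.record(node, key, value, next);
                }
            });
        }
        if (!acked.await(200, TimeUnit.MILLISECONDS)) {
            throw new QuorumTimeoutException("only " + (w - acked.getCount()) + " of " + w
                    + " acks (" + errors.size() + " replica errors)");
        }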
        return new PutResult(next);
    }

    public GetResult get(String key) throws InterruptedException {
        List<String> replicas = ring.getReplicas(key, n);
        BlockingQueue<VersionedValue> responses = new ArrayBlockingQueue<>(n);
        for (String node : replicas) {
            pool.submit(() -> {
                try { responses.offer(rpc.get(node, key)); } catch (Throwable ignored) {}
            });
        }
        List<VersionedValue> gathered = new ArrayList<>(r);
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(200);
        while (gathered.size() < r) {
            long remain = deadline - System.nanoTime();
            if (remain <= 0) throw new QuorumTimeoutException("read quorum not met");
            VersionedValue v = responses.poll(remain, TimeUnit.NANOSECONDS);
            if (v != null) gathered.add(v);
        }
        List<VersionedValue> reconciled = VectorClock.reconcile(gathered);
        if (reconciled.size() > 1) return GetResult.siblings(reconciled);
        return GetResult.value(reconciled.get(0));
    }
}
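`Coordinator.put` records missed writes through `hints.record(...)`, but the `HintedHandoffLog` implementation is not shown. A minimal in-memory sketch of what such a log might do follows. `HintStore`, `Sender`, and the `isAlive` predicate (standing in for the gossip view) are hypothetical, and a production log would persist hints to disk and expire old ones. A coordinator-side log like this only replays missed writes. It does not by itself provide the fallback-node writes of a full sloppy quorum.

```java
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Predicate;

// Hypothetical hint store, keyed by the replica that missed the write.
// The clock type is generic so the sketch does not depend on the VectorClock class.
final class HintStore<C> {

    /** One missed write, destined for the replica it is filed under. */
    record Hint<T>(String key, byte[] value, T clock) {}

    /** Delivers a hint to its intended replica; throws if that replica is still unreachable. */
    interface Sender<T> { void send(String node, Hint<T> hint) throws Exception; }

    private final ConcurrentMap<String, Queue<Hint<C>>> pending = new ConcurrentHashMap<>();

    void record(String node, String key, byte[] value, C clock) {
        pending.computeIfAbsent(node, n -> new ConcurrentLinkedQueue<>())
               .add(new Hint<>(key, value, clock));
    }

    int pendingFor(String node) {
        Queue<Hint<C>> q = pending.get(node);
        return q == null ? 0 : q.size();
    }

    // Replays queued hints, in arrival order, to every node the membership view reports alive.
    // A hint is dropped only after a successful send, so a failed replay is retried next call.
    // Assumes a single replay thread.
    int replay(Predicate<String> isAlive, Sender<C> sender) {
        int delivered = 0;
        for (Map.Entry<String, Queue<Hint<C>>> e : pending.entrySet()) {
            if (!isAlive.test(e.getKey())) continue;
            Queue<Hint<C>> q = e.getValue();
            Hint<C> h;
            while ((h = q.peek()) != null) {
                try {
                    sender.send(e.getKey(), h);
                } catch (Exception ex) {
                    break; // node flapped; keep the remaining hints for the next replay
                }
                q.poll();
                delivered++;
            }
        }
        return delivered;
    }
}
```

Replaying a hint is safe even if the replica later receives the same write again. The write carries its vector clock, so the duplicate is simply EQUAL to, or dominated by, what the replica already holds.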
VectorClock.java
package com.example.kv.model;

import java.util.*;

/**
 * Vector clock: map from nodeId to a monotonically increasing counter.
 * Used to detect concurrent writes — if neither clock dominates the other,
 * both values survive as siblings.
 */
public final class VectorClock {

    private final Map<String, Long> counters;
    private final String coordinatorId;

    public VectorClock(String coordinatorId) {
        this(coordinatorId, new HashMap<>());
    }

    private VectorClock(String coordinatorId, Map<String, Long> counters) {
        this.coordinatorId = coordinatorId;
        this.counters = counters;
    }

    public VectorClock incrementForCoordinator() {
        Map<String, Long> next = new HashMap<>(counters);
        next.merge(coordinatorId, 1L, Long::sum);
        return new VectorClock(coordinatorId, next);
    }

    public enum Relation { EQUAL, BEFORE, AFTER, CONCURRENT }

    public static Relation compare(VectorClock a, VectorClock b) {
        boolean aLess = false, aGreater = false;
        Set<String> all = new HashSet<>();
        all.addAll(a.counters.keySet());
        all.addAll(b.counters.keySet());
        for (String id : all) {
            long av = a.counters.getOrDefault(id, 0L);
            long bv = b.counters.getOrDefault(id, 0L);
            if (av < bv) aLess = true;
            if (av > bv) aGreater = true;
        }
        if (!aLess && !aGreater) return Relation.EQUAL;
        if (aLess && !aGreater) return Relation.BEFORE;
        if (!aLess && aGreater) return Relation.AFTER;
        return Relation.CONCURRENT;
    }

    /**
     * Given the responses from a read quorum, return the set of values that are
     * NOT dominated by any other — these are the siblings the client will see.
     */
    public static List<VersionedValue> reconcile(List<VersionedValue> versions) {
        List<VersionedValue> winners = new ArrayList<>();
        for (VersionedValue candidate : versions) {
            boolean dominated = false;
            for (VersionedValue other : versions) {
                if (candidate == other) continue;
                if (compare(candidate.clock(), other.clock()) == Relation.BEFORE) {
                    dominated = true;
                    break;
                }
            }
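            // Replicas that agree return EQUAL clocks; keep one copy so agreement is not
            // reported as a conflict (VersionedValue equality may compare byte[] by reference).
            if (!dominated && winners.stream().noneMatch(
                    wv -> compare(wv.clock(), candidate.clock()) == Relation.EQUAL)) {
                winners.add(candidate);
            }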
        }
        return winners;
    }

    public Map<String, Long> counters() { return Collections.unmodifiableMap(counters); }
}
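`reconcile` hands concurrent versions to the client as siblings, and the client then writes back one resolved value. For that write to replace every sibling, its clock must descend from all of them. Taking the element-wise maximum and then incrementing the coordinator's entry guarantees this. The sketch uses plain `Map<String, Long>` clocks and hypothetical helper names rather than the `VectorClock` class above. Merging the values themselves (for example, a shopping-cart union) is left to the application.

```java
import java.util.*;

// Hypothetical helpers for client-side sibling resolution over Map<String, Long> clocks.
final class SiblingResolution {

    // Element-wise max: the smallest clock that descends from every input clock.
    static Map<String, Long> merge(Collection<Map<String, Long>> clocks) {
        Map<String, Long> out = new TreeMap<>();
        for (Map<String, Long> c : clocks) c.forEach((node, n) -> out.merge(node, n, Long::max));
        return out;
    }

    // a descends from b if a >= b on every entry (a missing entry counts as 0).
    static boolean descends(Map<String, Long> a, Map<String, Long> b) {
        for (Map.Entry<String, Long> e : b.entrySet()) {
            if (a.getOrDefault(e.getKey(), 0L) < e.getValue()) return false;
        }
        return true;
    }

    // Clock for the resolved write: merge all siblings, then bump the coordinating node,
    // so the new version strictly dominates each sibling and replaces them on the replicas.
    static Map<String, Long> resolvedClock(Collection<Map<String, Long>> siblings, String coordinator) {
        Map<String, Long> next = merge(siblings);
        next.merge(coordinator, 1L, Long::sum);
        return next;
    }
}
```

Take siblings D3 = {Sx:2, Sy:1} and D4 = {Sx:2, Sz:1}. Neither descends from the other, and resolving through Sx yields {Sx:3, Sy:1, Sz:1}, which descends from both.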
StorageEngine.java
package com.example.kv.storage;

import com.example.kv.model.VersionedValue;
import com.example.kv.model.VectorClock;

import java.io.IOException;
import java.util.*;
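import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicLong;

/**
 * Per-replica storage: commit log -> memtable -> SSTable, with a bloom filter per SSTable.
 * Write path is Cassandra-style. Reads check memtable first, then SSTables newest->oldest,
 * skipping any whose bloom filter rejects the key. Writers are serialized, but readers take
 * no lock, so the memtable reference is volatile and the SSTable list is copy-on-write.
 */
public class StorageEngine {

    private static final long MEMTABLE_FLUSH_BYTES = 64L * 1024 * 1024;

    private final CommitLog commitLog;
    // Unlocked readers must safely see flushes: flush() adds the SSTable first, then
    // swaps the volatile memtable, so a reader never misses a just-flushed key.
    private final List<SSTable> sstables = new CopyOnWriteArrayList<>();
    private volatile ConcurrentSkipListMap<String, VersionedValue> memtable = new ConcurrentSkipListMap<>();
    private final AtomicLong memtableBytes = new AtomicLong();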
    private final SSTableWriter writer;

    public StorageEngine(CommitLog commitLog, SSTableWriter writer) {
        this.commitLog = commitLog;
        this.writer = writer;
    }

    public synchronized void put(String key, byte[] value, VectorClock clock) throws IOException {
        commitLog.append(key, value, clock);
        VersionedValue vv = new VersionedValue(key, value, clock);
        memtable.put(key, vv);
        if (memtableBytes.addAndGet(key.length() + value.length()) >= MEMTABLE_FLUSH_BYTES) {
            flush();
        }
    }

    public Optional<VersionedValue> get(String key) throws IOException {
        VersionedValue hit = memtable.get(key);
        if (hit != null) return Optional.of(hit);
        // Snapshot the list so a concurrent flush cannot shift indexes mid-scan.
        List<SSTable> tables = new ArrayList<>(sstables);
        // Walk SSTables newest -> oldest; a bloom filter rejects an absent key with k hash probes.
        for (int i = tables.size() - 1; i >= 0; i--) {
            SSTable s = tables.get(i);
            if (!s.bloomFilter().mightContain(key)) continue;
            Optional<VersionedValue> v = s.read(key);
            if (v.isPresent()) return v;
        }
        return Optional.empty();
    }

    private synchronized void flush() throws IOException {
        if (memtable.isEmpty()) return;
        SSTable flushed = writer.write(memtable);
        sstables.add(flushed);
        memtable = new ConcurrentSkipListMap<>();
        memtableBytes.set(0);
        commitLog.rotate();
    }
}
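`StorageEngine.get` relies on `SSTable.bloomFilter().mightContain`, which is not shown. A self-contained Bloom filter sketch follows. The class name, the FNV-1a hash with a MurmurHash3 finalizer, and the double-hashing probe scheme (index = h1 + i·h2 mod m) are illustrative choices, not Cassandra's implementation. Sizing uses the standard formulas: m = −n·ln p / (ln 2)² bits and k = (m/n)·ln 2 probes.

```java
import java.nio.charset.StandardCharsets;
import java.util.BitSet;

// Illustrative per-SSTable Bloom filter sized for `expectedKeys` at a target false-positive rate.
final class BloomFilter {
    private final BitSet bits;
    private final int m; // number of bits
    private final int k; // number of probes per key

    BloomFilter(int expectedKeys, double falsePositiveRate) {
        double ln2 = Math.log(2);
        this.m = Math.max(1, (int) Math.ceil(-expectedKeys * Math.log(falsePositiveRate) / (ln2 * ln2)));
        this.k = Math.max(1, (int) Math.round((double) m / expectedKeys * ln2));
        this.bits = new BitSet(m);
    }

    void add(String key) {
        long[] h = hashes(key);
        for (int i = 0; i < k; i++) bits.set(index(h, i));
    }

    // false => key is definitely absent (skip the SSTable); true => maybe present (read it).
    boolean mightContain(String key) {
        long[] h = hashes(key);
        for (int i = 0; i < k; i++) {
            if (!bits.get(index(h, i))) return false;
        }
        return true;
    }

    // Double hashing: probe i lands at (h1 + i * h2) mod m.
    private int index(long[] h, int i) {
        return (int) Math.floorMod(h[0] + i * h[1], (long) m);
    }

    private static long[] hashes(String key) {
        long h = 0xcbf29ce484222325L;                 // FNV-1a 64-bit offset basis
        for (byte b : key.getBytes(StandardCharsets.UTF_8)) {
            h ^= (b & 0xff);
            h *= 0x100000001b3L;                      // FNV-1a 64-bit prime
        }
        long h1 = fmix64(h);
        long h2 = fmix64(h1 ^ 0x9e3779b97f4a7c15L);  // derive a second, decorrelated hash
        return new long[] { h1, h2 };
    }

    // MurmurHash3's 64-bit finalizer: spreads every input bit across the output.
    private static long fmix64(long x) {
        x ^= x >>> 33;
        x *= 0xff51afd7ed558ccdL;
        x ^= x >>> 33;
        x *= 0xc4ceb9fe1a85ec53L;
        x ^= x >>> 33;
        return x;
    }
}
```

For 10,000 keys at a 1% target, the formulas give about 95,851 bits (~12 KB) and 7 probes. A `false` answer is always correct, which is what lets the read path skip that SSTable without touching disk.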
GossipMembership.java
package com.example.kv.gossip;

import java.time.Instant;
import java.util.*;
import java.util.concurrent.*;

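/**
 * Decentralized heartbeat-gossip membership (heartbeat counters, as described in the book;
 * not full SWIM, which adds direct and indirect ping probes). Every second each node bumps
 * its own heartbeat and sends its membership view to one random peer. A peer is marked
 * SUSPECT after suspectAfterMs, and DOWN after downAfterMs, without a heartbeat increase.
 */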
public class GossipMembership {

    public enum State { ALIVE, SUSPECT, DOWN }

    public static final class Member {
        public final String nodeId;
        public volatile State state;
        public volatile long heartbeat;
        public volatile Instant lastSeen;

        public Member(String nodeId) {
            this.nodeId = nodeId;
            this.state = State.ALIVE;
            this.heartbeat = 0L;
            this.lastSeen = Instant.now();
        }
    }

    private final String selfId;
    private final ConcurrentMap<String, Member> members = new ConcurrentHashMap<>();
    private final ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
    private final GossipTransport transport;
    private final long suspectAfterMs;
    private final long downAfterMs;

    public GossipMembership(String selfId, GossipTransport transport,
                            long suspectAfterMs, long downAfterMs) {
        this.selfId = selfId;
        this.transport = transport;
        this.suspectAfterMs = suspectAfterMs;
        this.downAfterMs = downAfterMs;
        members.put(selfId, new Member(selfId));
    }

    public void start() {
        timer.scheduleAtFixedRate(this::tick, 1, 1, TimeUnit.SECONDS);
    }

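    private void tick() {
        // scheduleAtFixedRate suppresses all later runs if a task throws, so a transport
        // error must not escape: a lost round is harmless, a dead gossiper is not.
        try {
            Member self = members.get(selfId);
            self.heartbeat++;
            self.lastSeen = Instant.now();
            reevaluateStates(); // refresh states even when there is no live peer to gossip with
            List<Member> peers = new ArrayList<>(members.values());
            peers.removeIf(m -> m.nodeId.equals(selfId) || m.state == State.DOWN);
            if (peers.isEmpty()) return;
            Member target = peers.get(ThreadLocalRandom.current().nextInt(peers.size()));
            transport.sendDigest(target.nodeId, snapshot());
        } catch (RuntimeException e) {
            // Skip this round; the next tick retries. A real node would log the failure.
        }
    }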

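    public synchronized void onDigest(Collection<Member> remote) {
        Instant now = Instant.now();
        for (Member m : remote) {
            if (m.nodeId.equals(selfId)) continue; // only this node advances its own heartbeat
            Member local = members.get(m.nodeId);
            if (local == null) {
                // Copy rather than adopt the sender's object: lastSeen must record when THIS
                // node saw the heartbeat advance, not the sender's timestamp or state.
                Member fresh = new Member(m.nodeId);
                fresh.heartbeat = m.heartbeat;
                members.put(m.nodeId, fresh);
            } else if (m.heartbeat > local.heartbeat) {
                local.heartbeat = m.heartbeat;
                local.lastSeen = now;
            }
        }
    }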

    private void reevaluateStates() {
        Instant now = Instant.now();
        for (Member m : members.values()) {
            if (m.nodeId.equals(selfId)) continue;
            long silentMs = now.toEpochMilli() - m.lastSeen.toEpochMilli();
            if (silentMs > downAfterMs) m.state = State.DOWN;
            else if (silentMs > suspectAfterMs) m.state = State.SUSPECT;
            else m.state = State.ALIVE;
        }
    }

    public Collection<Member> snapshot() {
        return new ArrayList<>(members.values());
    }
}
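The claim that gossip converges in O(log M) rounds can be checked with a toy simulation. This is push-only rumor spreading with a seeded `Random`, not the `GossipMembership` class above, which exchanges full membership digests. One node starts with a new fact, and each round every informed node tells one uniformly random peer. `GossipConvergence` is a hypothetical name.

```java
import java.util.*;

// Toy push-gossip simulation: rounds until all m nodes have heard one new fact.
final class GossipConvergence {

    static int roundsToConverge(int m, long seed) {
        Random rnd = new Random(seed);
        boolean[] informed = new boolean[m];
        informed[0] = true;
        int count = 1, rounds = 0;
        while (count < m) {
            rounds++;
            boolean[] next = informed.clone(); // only nodes informed at round start push
            for (int i = 0; i < m; i++) {
                if (!informed[i]) continue;
                int peer = rnd.nextInt(m - 1);
                if (peer >= i) peer++;           // uniform over the m - 1 other nodes
                if (!next[peer]) { next[peer] = true; count++; }
            }
            informed = next;
        }
        return rounds;
    }

    public static void main(String[] args) {
        for (int m : new int[] {10, 100, 1_000, 10_000}) {
            System.out.printf("M=%5d  rounds=%d  log2(M)=%.1f%n",
                    m, roundsToConverge(m, 42), Math.log(m) / Math.log(2));
        }
    }
}
```

The informed set can at most double per round, so at least ⌈log2 M⌉ rounds are always needed. Random push typically finishes within a small constant multiple of that. At one round per second, a membership change therefore reaches even thousands of nodes in tens of seconds.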
