Distributed Key-Value Store (Dynamo-style) System Design Interview Question
Problem: Design an always-writable, horizontally scaled key-value store with tunable consistency: N replicas, configurable W/R quorums, vector-clock conflict resolution, gossip membership, and anti-entropy repair.
Overview
A distributed key-value store is the workhorse primitive behind shopping carts, session stores, feature flags, leaderboards, and configuration services: any workload that reads and writes single values by a primary key at massive scale. The canonical design is Amazon's 2007 Dynamo paper, which inspired Apache Cassandra, Riak, Voldemort, and later DynamoDB and ScyllaDB. Netflix runs Cassandra at very large scale, Discord stores its messages in ScyllaDB, and Apple operates one of the world's largest Cassandra fleets. Interview versions demand an always-writable (AP in CAP terms) store with tunable consistency: N replicas per key, configurable W and R quorums where W + R > N makes every read quorum overlap every write quorum, vector clocks to detect concurrent writes, gossip-based membership for decentralized failure detection, sloppy quorum plus hinted handoff to stay live during partitions, and Merkle-tree anti-entropy to repair long-term drift. Candidates who breeze through this design usually stumble on the subtleties: why vector clocks matter, what a sibling is, when W=1/R=1 is the right choice, and how the commit log + memtable + SSTable write path actually hits disk.
Summary
A peer-to-peer KV cluster in the Dynamo lineage and the book's recommended AP design (Ch 6). Keys are routed via a consistent-hash ring to N replicas. Writes succeed once W replicas ack, and reads wait for R. W+R>N is the book's strong-consistency condition: every read quorum overlaps every write quorum, although sloppy quorums and concurrent writes weaken that guarantee in practice. Concurrent writes are detected via vector clocks, and the store returns siblings for client-side resolution. Gossip handles decentralized failure detection; sloppy quorum + hinted handoff keep writes available during temporary failures; Merkle-tree anti-entropy repairs permanent divergence. Per-replica storage follows the Cassandra write/read path from the book: commit log → memtable → SSTable on disk, with a Bloom filter to skip SSTables on reads. The dominant stance is AP: availability during partitions is non-negotiable, unlike a CP store such as a bank ledger.
Requirements
Functional
- PUT(key, value, context) writes a value; context carries the vector clock observed on the last GET
- GET(key) returns one value, or multiple sibling values when concurrent writes happened
- DELETE(key) emits a tombstone that is GC'd after gc_grace_seconds
- Tunable consistency per operation: caller picks W (write quorum) and R (read quorum) against replication factor N
- Replication factor N: each key is placed on the first N distinct physical nodes found walking clockwise from its token on a consistent-hash ring (further vnodes of an already-chosen node are skipped)
- Automatic sibling resolution via a user-supplied merge function (OR-Set for carts, G-Counter for counters, LWW fallback)
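Sibling resolution is safest when the value is a CRDT whose merge is commutative, associative, and idempotent, so it does not matter how many siblings arrive or in what order. A minimal G-Counter sketch, assuming the store hands the application the list of sibling values to fold before the next PUT; `GCounter`, `increment`, `merge`, and `mergeSiblings` are illustrative names, not part of any library.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Grow-only counter CRDT: one monotonically increasing slot per node (hypothetical sketch). */
final class GCounter {
    private final Map<String, Long> slots = new HashMap<>();

    /** Record a local increment made through node {@code nodeId}. */
    void increment(String nodeId, long delta) {
        if (delta < 0) throw new IllegalArgumentException("a G-Counter only grows");
        slots.merge(nodeId, delta, Long::sum);
    }

    /** The counter's value is the sum of all per-node slots. */
    long value() {
        long total = 0;
        for (long v : slots.values()) total += v;
        return total;
    }

    /** Element-wise max: safe to apply in any order and any number of times. */
    GCounter merge(GCounter other) {
        GCounter out = new GCounter();
        out.slots.putAll(this.slots);
        other.slots.forEach((node, v) -> out.slots.merge(node, v, Long::max));
        return out;
    }

    /** Fold every sibling returned by a GET into a single value. */
    static GCounter mergeSiblings(List<GCounter> siblings) {
        GCounter acc = new GCounter();
        for (GCounter s : siblings) acc = acc.merge(s);
        return acc;
    }
}
```

Element-wise max rather than sum is what makes the merge idempotent: merging the same sibling twice does not double-count. Counters that also decrement would need a PN-Counter (two G-Counters), and carts would use an OR-Set in the same spirit.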
Non-functional
- Peak 30K ops/sec with 80/20 read/write mix and p99 under 10ms at W=2/R=2
- 99.99% availability — writes must succeed during single-AZ failure via sloppy quorum + hinted handoff
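- Durable: an acknowledged write survives up to W - 1 node crashes (it is lost only if all W nodes holding it fail before asynchronous replication reaches the other N - W replicas); writes stay available with up to N - W replicas down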
- Horizontally scalable to petabytes by adding nodes to the ring without downtime
- Eventual convergence within seconds of a partition healing via read-repair + anti-entropy
- Storage amplification under 2x with leveled compaction; bloom filters keep read amplification under 1.5 SSTables/read
Capacity Assumptions
- Replication factor N=3, tunable W+R (default W=2, R=2 → W+R>N, overlapping quorums; W=1, R=1 → AP-maximal)
- Coordinator fan-out per op: 3 RPCs (writes) or up to 3 (reads)
- Dataset ~500M keys at ~1 KB each (~500 GB logical, ~1.5 TB with RF=3) across 6 storage nodes (~250 GB per node)
- Peak 30K ops/sec, 80/20 read/write
- Vector clocks: ~30 bytes overhead per key at steady state (3-replica cluster), up to 200B during churn
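The quorum arithmetic behind these defaults can be checked mechanically. This sketch (a hypothetical `QuorumMath` helper) brute-forces every W-subset and R-subset of N replicas to confirm that they always intersect exactly when W + R > N, and reports how many replica failures each setting tolerates. It assumes strict quorums over the N-node preference list; sloppy quorums that substitute fallback nodes break the intersection argument.

```java
import java.util.ArrayList;
import java.util.List;

/** Strict-quorum arithmetic for N replicas (illustrative helper, not a library API). */
final class QuorumMath {
    /** All subsets of {0..n-1} with exactly k members, encoded as bitmasks. */
    static List<Integer> subsetsOfSize(int n, int k) {
        List<Integer> out = new ArrayList<>();
        for (int mask = 0; mask < (1 << n); mask++) {
            if (Integer.bitCount(mask) == k) out.add(mask);
        }
        return out;
    }

    /** True if every possible write quorum shares at least one replica with every read quorum. */
    static boolean quorumsAlwaysOverlap(int n, int w, int r) {
        for (int wq : subsetsOfSize(n, w))
            for (int rq : subsetsOfSize(n, r))
                if ((wq & rq) == 0) return false;
        return true;
    }

    /** Replicas that can be down while writes still collect W acks. */
    static int writeFailuresTolerated(int n, int w) { return n - w; }

    /** Replicas that can be down while reads still gather R responses. */
    static int readFailuresTolerated(int n, int r) { return n - r; }

    /** Crashes an acknowledged write survives before async replication: one of the W copies must remain. */
    static int durableAgainst(int w) { return w - 1; }

    public static void main(String[] args) {
        for (int[] cfg : new int[][] {{3, 2, 2}, {3, 1, 1}, {3, 3, 1}}) {
            int n = cfg[0], w = cfg[1], r = cfg[2];
            System.out.printf("N=%d W=%d R=%d overlap=%b writeDown=%d readDown=%d durable=%d%n",
                n, w, r, quorumsAlwaysOverlap(n, w, r),
                writeFailuresTolerated(n, w), readFailuresTolerated(n, r), durableAgainst(w));
        }
    }
}
```

W=3/R=1 also overlaps, but any single replica outage blocks writes; W=2/R=2 is the symmetric middle ground that tolerates one failure on both paths.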
Back-of-Envelope Estimates
- Per-coordinator CPU: 3 RPCs × 30K ops/sec / 10 coordinators ≈ 9K outbound RPCs/sec/coord — well within a single box.
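- Anti-entropy scan: Merkle tree with 2^15 = 32,768 leaves per replica range → at most 32K leaf hashes to compare per replica pair; at 32 B per hash (e.g. SHA-256) that is ~1 MB of Merkle data per pair, seconds to compare, and a top-down walk exchanges far less when the trees mostly agree. Full repair of divergent leaves is bounded by disk throughput (~100 MB/s).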
- Gossip: O(log n) rounds to propagate a membership change across n nodes; with 1s rounds and 6 nodes (log2 6 ≈ 2.6), convergence takes ~3s.
- Quorum failure probability at W=2/R=2 with 99.9% per-replica availability (p = 0.001 down): a quorum is lost only when at least 2 of 3 replicas are down, so P = 3p^2(1 - p) + p^3 ≈ 3 × 10^-6, still negligible under independent failures.
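- Storage per full replica copy: ~500 GB data + ~625 MB Bloom filters (500M keys × ~10 bits/key, roughly 1% false positives) + ~15 GB vector-clock metadata (500M keys × ~30 B). Spread over 6 nodes at RF=3, that is ~250 GB of data and ~7.5 GB of clocks per node.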
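These estimates are easy to recompute. A sketch with hypothetical helper names, assuming independent replica failures and the standard sizing formula for an optimally tuned Bloom filter, m/n = -ln(p) / (ln 2)^2 bits per key.

```java
/** Back-of-envelope helpers for the estimates above (illustrative, not from the original design). */
final class Estimates {
    /** Binomial coefficient C(n, k) as a double; fine for the small n used here. */
    static double choose(int n, int k) {
        double c = 1;
        for (int i = 1; i <= k; i++) c = c * (n - k + i) / i;
        return c;
    }

    /** P(fewer than q of n replicas are up) when each replica is independently down with probability p. */
    static double quorumUnavailable(int n, int q, double p) {
        double prob = 0;
        // The quorum is lost when more than n - q replicas are down.
        for (int down = n - q + 1; down <= n; down++) {
            prob += choose(n, down) * Math.pow(p, down) * Math.pow(1 - p, n - down);
        }
        return prob;
    }

    /** Bits per key for an optimally tuned Bloom filter with false-positive rate fpr. */
    static double bloomBitsPerKey(double fpr) {
        return -Math.log(fpr) / (Math.log(2) * Math.log(2));
    }

    public static void main(String[] args) {
        long keys = 500_000_000L;
        System.out.printf("P(W=2 of N=3 unavailable) = %.2e%n", quorumUnavailable(3, 2, 0.001));
        double bits = bloomBitsPerKey(0.01);
        System.out.printf("Bloom: %.1f bits/key -> %.0f MB per full copy%n", bits, keys * bits / 8 / 1e6);
        System.out.printf("Vector clocks at 30 B/key: %.0f GB per full copy%n", keys * 30 / 1e9);
    }
}
```

The quorum term is dominated by the 3p^2 case (exactly two of three replicas down), which is why the answer is about three times p^2 rather than p^2.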
High-level architecture
A client SDK sends PUT and GET to any coordinator over gRPC. The coordinator hashes the key onto a consistent-hash ring (N=3, 150 vnodes per node) to pick the preference list: the first N distinct physical nodes clockwise from the key's token. For a PUT it fans out to all N replicas and returns success once W acks arrive. If a replica is down, the write goes to a live fallback node together with a hint naming the intended owner, and the fallback replays it when the owner recovers (sloppy quorum + hinted handoff).
On each replica the write path follows Cassandra: append to the commit log on disk, insert into the in-memory memtable (a skip list or sorted map), and ack. When the memtable exceeds 64 MB it is flushed as an immutable SSTable, and leveled compaction merges SSTables in the background. Each SSTable keeps a Bloom filter in RAM, so reads skip SSTables that definitely don't hold the key.
Reads mirror writes: the coordinator fans out to all N replicas, waits for R responses, and compares their vector clocks. If all clocks agree it returns the value; if one replica is stale it returns the newest version and issues a background read-repair; if clocks are concurrent it returns siblings to the client.
Gossip (one round per second, O(log n) rounds to converge across n nodes) propagates membership and health. A background Merkle-tree anti-entropy process walks pairs of replicas nightly, comparing 2^15 leaf hashes per range to find divergent keys and streaming the deltas. Vector clocks add ~30 B of overhead per key at steady state and grow to ~200 B under churn; sibling sets are capped client-side at 10 before falling back to LWW. At 30K peak ops/sec with 10 coordinators, each coordinator issues ~9K outbound RPCs/sec, well within a single box. With 99.9% per-replica availability, a W=2 or R=2 quorum is lost only when at least 2 of the 3 replicas are down, a probability of ~3 × 10^-6 under independent failures.
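Placement on the ring can be sketched with a sorted map from token to physical node. This is a minimal illustration, assuming MD5 truncated to an unsigned 32-bit token and vnode labels of the form `node#i`; `HashRing` and `preferenceList` are hypothetical names, and real stores use their own partitioners.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

/** Consistent-hash ring with virtual nodes (illustrative sketch). */
final class HashRing {
    private final TreeMap<Long, String> tokens = new TreeMap<>(); // token -> physical node
    private final int vnodesPerNode;

    HashRing(int vnodesPerNode) { this.vnodesPerNode = vnodesPerNode; }

    /** Unsigned 32-bit token from the first four bytes of MD5(s). */
    static long token(String s) {
        try {
            byte[] d = MessageDigest.getInstance("MD5").digest(s.getBytes(StandardCharsets.UTF_8));
            return ((d[0] & 0xFFL) << 24) | ((d[1] & 0xFFL) << 16) | ((d[2] & 0xFFL) << 8) | (d[3] & 0xFFL);
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("MD5 is required on every Java platform", e);
        }
    }

    /** Scatter the node's vnodes around the ring (a rare token collision simply overwrites). */
    void addNode(String node) {
        for (int i = 0; i < vnodesPerNode; i++) tokens.put(token(node + "#" + i), node);
    }

    void removeNode(String node) {
        tokens.values().removeIf(node::equals);
    }

    /** Preference list: the first n DISTINCT physical nodes clockwise from the key's token. */
    List<String> preferenceList(String key, int n) {
        Set<String> picked = new LinkedHashSet<>();
        long t = token(key);
        // Walk from the key's token to the end of the ring, then wrap around to the start.
        for (Map<Long, String> part : List.of(tokens.tailMap(t, true), tokens.headMap(t, false))) {
            for (String node : part.values()) {
                picked.add(node); // the set ignores further vnodes of an already-chosen node
                if (picked.size() == n) return new ArrayList<>(picked);
            }
        }
        return new ArrayList<>(picked); // fewer than n physical nodes in the cluster
    }
}
```

Removing a node only changes preference lists that contained it: if a node was not among the first n distinct nodes on a key's walk, none of its vnodes were passed before the walk stopped, so that key keeps exactly the same replicas.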
Architecture Components (8)
- Client SDK (client) — Thin library that issues PUT/GET with a vector-clock context and handles sibling values on read.
- Coordinator (coordinator) — Stateless router: hashes the key, looks up the preference list on the ring, fans out to replicas, collects quorums, returns to the client.
- Consistent-Hash Ring (coordinator) — Sorted map from 32-bit token → vnode → replica. Owns the placement function used by every read and write.
- Replica 1 (Storage Node) (kv) — LSM-backed storage node — writes hit WAL + memtable, reads hit memtable/SST, local Merkle tree kept in sync for anti-entropy.
- Replica 2 (Storage Node) (kv) — Identical in role to replica1; different position on the hash ring.
- Replica 3 (Storage Node) (kv) — Identical in role to replica1 and replica2; a third peer so we can tolerate one failure under N=3 / W=2 / R=2.
- Gossip Membership (coordinator) — Background protocol that propagates membership (alive/suspect/dead) and ring version across all nodes in O(log N) rounds.
- Anti-Entropy Worker (worker) — Periodic background job that compares Merkle trees between replica pairs and repairs divergent keys.
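The Anti-Entropy Worker's comparison step can be sketched as a fixed-depth Merkle tree stored in heap layout. This is an illustration under stated assumptions: keys are bucketed into leaves by `String.hashCode` for brevity (a real worker would bucket by ring token range), leaves hash `key=value;` records with SHA-256, and `MerkleTree`, `diff`, and `bucketOf` are hypothetical names.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;

/** Fixed-depth Merkle tree over a replica's key range (illustrative anti-entropy sketch). */
final class MerkleTree {
    private final byte[][] nodes; // nodes[1] is the root; leaves live at [leaves, 2 * leaves)
    private final int leaves;

    /** @param leaves must be a power of two (the design above uses 2^15). */
    MerkleTree(SortedMap<String, String> data, int leaves) {
        if (leaves < 1 || Integer.bitCount(leaves) != 1) throw new IllegalArgumentException("leaves must be a power of two");
        this.leaves = leaves;
        this.nodes = new byte[2 * leaves][];
        MessageDigest[] leafDigests = new MessageDigest[leaves];
        for (int i = 0; i < leaves; i++) leafDigests[i] = sha256();
        // A sorted map makes per-leaf input order identical on both replicas.
        for (Map.Entry<String, String> e : data.entrySet()) {
            String record = e.getKey() + "=" + e.getValue() + ";";
            leafDigests[bucketOf(e.getKey())].update(record.getBytes(StandardCharsets.UTF_8));
        }
        for (int i = 0; i < leaves; i++) nodes[leaves + i] = leafDigests[i].digest();
        // Build interior hashes bottom-up: parent = H(left child || right child).
        for (int i = leaves - 1; i >= 1; i--) {
            MessageDigest md = sha256();
            md.update(nodes[2 * i]);
            md.update(nodes[2 * i + 1]);
            nodes[i] = md.digest();
        }
    }

    private static MessageDigest sha256() {
        try { return MessageDigest.getInstance("SHA-256"); }
        catch (NoSuchAlgorithmException e) { throw new IllegalStateException(e); }
    }

    /** Leaf bucket a key falls into, so the worker knows which keys to stream for a differing leaf. */
    int bucketOf(String key) { return Math.floorMod(key.hashCode(), leaves); }

    /** Indices of leaf buckets that differ; identical subtrees are pruned without descending. */
    List<Integer> diff(MerkleTree other) {
        if (other.leaves != leaves) throw new IllegalArgumentException("trees must have the same shape");
        List<Integer> out = new ArrayList<>();
        walk(other, 1, out);
        return out;
    }

    private void walk(MerkleTree other, int i, List<Integer> out) {
        if (Arrays.equals(nodes[i], other.nodes[i])) return; // whole subtree agrees
        if (i >= leaves) { out.add(i - leaves); return; }    // a differing leaf bucket
        walk(other, 2 * i, out);
        walk(other, 2 * i + 1, out);
    }
}
```

With 2^15 leaves and 32-byte hashes, the leaf level is the ~1 MB per replica pair quoted in the estimates; the top-down walk only visits subtrees whose hashes disagree, so mostly-synced replicas exchange far less.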
Operations Walked Through (3)
- put-w2 — Client writes `cart:user42 → [item A]` with a vector-clock context. Coordinator fans out to all 3 replicas; once W=2 acks arrive it replies to the client. The 3rd ack finishes asynchronously.
- get-r2 — Client reads `cart:user42`. Coordinator fans out to all 3 replicas, waits for R=2 to respond, compares vector clocks: if one dominates the other, return it; if equal, return it; if concurrent, return siblings.
- conflict-resolution — Two clients write the same key in parallel from the same context, so their vector clocks are concurrent: neither dominates the other. On read, the coordinator returns BOTH values as siblings so the client can merge them application-side.
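The conflict-resolution walkthrough can be replayed with a small immutable vector clock. Unlike `incrementForCoordinator()` in the implementation below, this sketch takes the id of the node that coordinates the write explicitly; `VClock`, `increment`, `merge`, and `compare` are illustrative names, not a library API.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Immutable vector clock keyed by node id (illustrative sketch). */
final class VClock {
    enum Order { EQUAL, BEFORE, AFTER, CONCURRENT }

    private final Map<String, Long> counters;

    VClock() { this(Map.of()); }

    private VClock(Map<String, Long> counters) { this.counters = Map.copyOf(counters); }

    /** New clock with {@code nodeId}'s counter bumped; called by the node that accepts the write. */
    VClock increment(String nodeId) {
        Map<String, Long> next = new HashMap<>(counters);
        next.merge(nodeId, 1L, Long::sum);
        return new VClock(next);
    }

    /** Pointwise max: the context a client sends back after merging siblings. */
    VClock merge(VClock other) {
        Map<String, Long> next = new HashMap<>(counters);
        other.counters.forEach((id, c) -> next.merge(id, c, Long::max));
        return new VClock(next);
    }

    /** How {@code this} relates to {@code other} in the happens-before partial order. */
    Order compare(VClock other) {
        boolean less = false, greater = false;
        Set<String> ids = new HashSet<>(counters.keySet());
        ids.addAll(other.counters.keySet());
        for (String id : ids) {
            long a = counters.getOrDefault(id, 0L);
            long b = other.counters.getOrDefault(id, 0L);
            if (a < b) less = true;
            if (a > b) greater = true;
        }
        if (less && greater) return Order.CONCURRENT;
        if (less) return Order.BEFORE;
        if (greater) return Order.AFTER;
        return Order.EQUAL;
    }

    @Override public String toString() { return counters.toString(); }
}
```

Starting from the clock of the first PUT via n1, client A writes through n1 and client B writes through n2 with the same context; the two resulting clocks compare CONCURRENT, so both values survive as siblings. The client's merged write (pointwise max, then one more increment) dominates both and collapses them.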
Implementation
package com.example.kv.coordinator;
import com.example.kv.model.*;
import com.example.ring.ConsistentHashRing;
import java.util.*;
import java.util.concurrent.*;
/**
* Dynamo-style coordinator. Routes a key to N replicas via consistent hashing,
* waits for W acks on writes and R responses on reads, and surfaces siblings
* when vector clocks are concurrent.
*/
public class Coordinator {
private final ConsistentHashRing ring;
private final ReplicaClient rpc;
private final int n;
private final int w;
private final int r;
private final ExecutorService pool = Executors.newFixedThreadPool(64);
private final HintedHandoffLog hints;
public Coordinator(ConsistentHashRing ring, ReplicaClient rpc,
int n, int w, int r, HintedHandoffLog hints) {
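// Any 1 <= W, R <= N is legal: W + R > N makes read and write quorums overlap, while
// W=1/R=1 (AP-maximal) is a deliberate trade of consistency for availability.
if (w < 1 || w > n || r < 1 || r > n) throw new IllegalArgumentException("require 1 <= W <= N and 1 <= R <= N");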
this.ring = ring; this.rpc = rpc; this.n = n; this.w = w; this.r = r; this.hints = hints;
}
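public PutResult put(String key, byte[] value, VectorClock context) throws InterruptedException {
// Assumes the context is bound to this coordinator's node id; bumping another node's entry
// would let concurrent writes through different coordinators end up with equal clocks.
VectorClock next = context.incrementForCoordinator();
List<String> replicas = ring.getReplicas(key, n);
CountDownLatch acked = new CountDownLatch(w);
List<Throwable> errors = Collections.synchronizedList(new ArrayList<>());
for (String node : replicas) {
pool.submit(() -> {
try {
rpc.put(node, key, value, next);
acked.countDown(); // only a real replica ack counts toward W
} catch (Throwable t) {
errors.add(t);
// Record a hint so the write is replayed when the node recovers. The hint does NOT
// count toward W: a copy in the coordinator's local log is not a replica ack. A full
// sloppy quorum would write to a fallback node past the preference list and count its ack.
hints.record(node, key, value, next);
}
});
}
if (!acked.await(200, TimeUnit.MILLISECONDS)) {
throw new QuorumTimeoutException("only " + (w - acked.getCount()) + " of " + w + " acks; "
+ errors.size() + " replica errors");
}
return new PutResult(next);
}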
public GetResult get(String key) throws InterruptedException {
List<String> replicas = ring.getReplicas(key, n);
// Optional.empty() marks a replica that answered but holds no version of the key
// (assumes rpc.get returns null in that case), so "not found" still counts toward R.
BlockingQueue<Optional<VersionedValue>> responses = new ArrayBlockingQueue<>(n);
for (String node : replicas) {
pool.submit(() -> {
try { responses.offer(Optional.ofNullable(rpc.get(node, key))); } catch (Throwable ignored) {}
});
}
List<VersionedValue> gathered = new ArrayList<>(r);
int answered = 0;
long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(200);
while (answered < r) {
long remain = deadline - System.nanoTime();
if (remain <= 0) throw new QuorumTimeoutException("read quorum not met");
Optional<VersionedValue> v = responses.poll(remain, TimeUnit.NANOSECONDS);
if (v == null) continue; // poll timed out; the loop re-checks the deadline
answered++;
v.ifPresent(gathered::add);
}
List<VersionedValue> reconciled = VectorClock.reconcile(gathered);
if (reconciled.isEmpty()) return GetResult.notFound(); // assumed factory alongside value()/siblings()
if (reconciled.size() > 1) return GetResult.siblings(reconciled);
return GetResult.value(reconciled.get(0));
}
}
package com.example.kv.model;
import java.util.*;
/**
* Vector clock: map from nodeId to a monotonically increasing counter.
* Used to detect concurrent writes — if neither clock dominates the other,
* both values survive as siblings.
*/
public final class VectorClock {
private final Map<String, Long> counters;
private final String coordinatorId;
public VectorClock(String coordinatorId) {
this(coordinatorId, new HashMap<>());
}
private VectorClock(String coordinatorId, Map<String, Long> counters) {
this.coordinatorId = coordinatorId;
this.counters = counters;
}
public VectorClock incrementForCoordinator() {
Map<String, Long> next = new HashMap<>(counters);
next.merge(coordinatorId, 1L, Long::sum);
return new VectorClock(coordinatorId, next);
}
public enum Relation { EQUAL, BEFORE, AFTER, CONCURRENT }
public static Relation compare(VectorClock a, VectorClock b) {
boolean aLess = false, aGreater = false;
Set<String> all = new HashSet<>();
all.addAll(a.counters.keySet());
all.addAll(b.counters.keySet());
for (String id : all) {
long av = a.counters.getOrDefault(id, 0L);
long bv = b.counters.getOrDefault(id, 0L);
if (av < bv) aLess = true;
if (av > bv) aGreater = true;
}
if (!aLess && !aGreater) return Relation.EQUAL;
if (aLess && !aGreater) return Relation.BEFORE;
if (!aLess && aGreater) return Relation.AFTER;
return Relation.CONCURRENT;
}
/**
* Given the responses from a read quorum, return the set of values that are
* NOT dominated by any other — these are the siblings the client will see.
*/
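public static List<VersionedValue> reconcile(List<VersionedValue> versions) {
List<VersionedValue> winners = new ArrayList<>();
for (VersionedValue candidate : versions) {
boolean dominated = false;
for (VersionedValue other : versions) {
if (candidate == other) continue;
if (compare(candidate.clock(), other.clock()) == Relation.BEFORE) {
dominated = true;
break;
}
}
if (dominated) continue;
// Replicas that agree return the same version: keep one copy per clock instead of
// reporting identical responses as siblings.
boolean duplicate = false;
for (VersionedValue winner : winners) {
if (compare(candidate.clock(), winner.clock()) == Relation.EQUAL) {
duplicate = true;
break;
}
}
if (!duplicate) winners.add(candidate);
}
return winners;
}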
public Map<String, Long> counters() { return Collections.unmodifiableMap(counters); }
}
package com.example.kv.storage;
import com.example.kv.model.VersionedValue;
import com.example.kv.model.VectorClock;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicLong;
/**
* Per-replica storage: commit log -> memtable -> SSTable, with a bloom filter per SSTable.
* Write path is Cassandra-style. Reads check memtable first, then SSTables newest->oldest,
* skipping any whose bloom filter rejects the key.
*/
public class StorageEngine {
private static final long MEMTABLE_FLUSH_BYTES = 64L * 1024 * 1024;
private final CommitLog commitLog;
// Reads are unsynchronized: a copy-on-write list and a volatile memtable reference keep
// flush() safely visible to concurrent get() calls.
private final List<SSTable> sstables = new CopyOnWriteArrayList<>();
private volatile ConcurrentSkipListMap<String, VersionedValue> memtable = new ConcurrentSkipListMap<>();
private final AtomicLong memtableBytes = new AtomicLong();
private final SSTableWriter writer;
public StorageEngine(CommitLog commitLog, SSTableWriter writer) {
this.commitLog = commitLog;
this.writer = writer;
}
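public synchronized void put(String key, byte[] value, VectorClock clock) throws IOException {
// Read-before-write: drop deliveries this replica has already seen or superseded, e.g. a
// replayed hint or a late retry that is older than the version already stored.
Optional<VersionedValue> current = get(key);
if (current.isPresent()) {
VectorClock.Relation rel = VectorClock.compare(clock, current.get().clock());
if (rel == VectorClock.Relation.BEFORE || rel == VectorClock.Relation.EQUAL) return;
// CONCURRENT versions should be stored as siblings; this simplified engine keeps one
// version per key, so the later arrival wins.
}
commitLog.append(key, value, clock);
VersionedValue vv = new VersionedValue(key, value, clock);
memtable.put(key, vv);
// Approximate size accounting: key chars + value bytes, ignoring clocks and overwrites.
if (memtableBytes.addAndGet(key.length() + value.length) >= MEMTABLE_FLUSH_BYTES) {
flush();
}
}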
public Optional<VersionedValue> get(String key) throws IOException {
VersionedValue hit = memtable.get(key);
if (hit != null) return Optional.of(hit);
// Walk SSTables newest -> oldest; bloom filter rejects most of them O(1).
for (int i = sstables.size() - 1; i >= 0; i--) {
SSTable s = sstables.get(i);
if (!s.bloomFilter().mightContain(key)) continue;
Optional<VersionedValue> v = s.read(key);
if (v.isPresent()) return v;
}
return Optional.empty();
}
private synchronized void flush() throws IOException {
if (memtable.isEmpty()) return;
SSTable flushed = writer.write(memtable);
sstables.add(flushed);
memtable = new ConcurrentSkipListMap<>();
memtableBytes.set(0);
commitLog.rotate();
}
}
package com.example.kv.gossip;
import java.time.Instant;
import java.util.*;
import java.util.concurrent.*;
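/**
* Decentralized heartbeat-style gossip membership (not full SWIM, which also probes peers
* directly and indirectly). Each node bumps its own heartbeat and pushes its membership view
* to one random peer every 1s. Failure detection is local: a peer becomes SUSPECT after
* suspectAfterMs without a heartbeat increase and DOWN after downAfterMs.
*/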
public class GossipMembership {
public enum State { ALIVE, SUSPECT, DOWN }
public static final class Member {
public final String nodeId;
public volatile State state;
public volatile long heartbeat;
public volatile Instant lastSeen;
public Member(String nodeId) {
this.nodeId = nodeId;
this.state = State.ALIVE;
this.heartbeat = 0L;
this.lastSeen = Instant.now();
}
}
private final String selfId;
private final ConcurrentMap<String, Member> members = new ConcurrentHashMap<>();
private final ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
private final GossipTransport transport;
private final long suspectAfterMs;
private final long downAfterMs;
public GossipMembership(String selfId, GossipTransport transport,
long suspectAfterMs, long downAfterMs) {
this.selfId = selfId;
this.transport = transport;
this.suspectAfterMs = suspectAfterMs;
this.downAfterMs = downAfterMs;
members.put(selfId, new Member(selfId));
}
public void start() {
timer.scheduleAtFixedRate(this::tick, 1, 1, TimeUnit.SECONDS);
}
private void tick() {
// An exception escaping a scheduleAtFixedRate task suppresses all later runs, so catch it.
try {
Member self = members.get(selfId);
self.heartbeat++;
self.lastSeen = Instant.now();
List<Member> peers = new ArrayList<>(members.values());
peers.removeIf(m -> m.nodeId.equals(selfId) || m.state == State.DOWN);
if (!peers.isEmpty()) {
Member target = peers.get(ThreadLocalRandom.current().nextInt(peers.size()));
transport.sendDigest(target.nodeId, snapshot());
}
reevaluateStates();
} catch (RuntimeException e) {
// Log and carry on; the next tick retries with a fresh random peer.
}
}
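public void onDigest(Collection<Member> remote) {
for (Member m : remote) {
if (m.nodeId.equals(selfId)) continue; // only this node advances its own heartbeat
members.compute(m.nodeId, (id, existing) -> {
if (existing == null) {
Member learned = new Member(id); // newly discovered peer, lastSeen = local now
learned.heartbeat = m.heartbeat;
return learned;
}
if (m.heartbeat > existing.heartbeat) {
// A higher heartbeat is fresh evidence of life: stamp it with LOCAL time so the
// silence timers in reevaluateStates never compare against a remote clock.
existing.heartbeat = m.heartbeat;
existing.lastSeen = Instant.now();
existing.state = State.ALIVE;
}
return existing;
});
}
}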
private void reevaluateStates() {
Instant now = Instant.now();
for (Member m : members.values()) {
if (m.nodeId.equals(selfId)) continue;
long silentMs = now.toEpochMilli() - m.lastSeen.toEpochMilli();
if (silentMs > downAfterMs) m.state = State.DOWN;
else if (silentMs > suspectAfterMs) m.state = State.SUSPECT;
else m.state = State.ALIVE;
}
}
public Collection<Member> snapshot() {
return new ArrayList<>(members.values());
}
}
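`StorageEngine` calls `bloomFilter().mightContain(key)`, but the filter itself is not shown. A minimal per-SSTable filter could look like the following; the class name, the double-hashing scheme (two 64-bit base hashes cut from one SHA-256 digest, probed as h1 + i·h2), and the sizing formulas are assumptions of this sketch, not Cassandra's implementation.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.BitSet;

/** Minimal Bloom filter: false positives are possible, false negatives are not (sketch). */
final class SimpleBloomFilter {
    private final BitSet bits;
    private final int m; // number of bits
    private final int k; // number of probes per key

    /** Size for {@code expectedKeys} at roughly {@code fpr} false-positive rate. */
    SimpleBloomFilter(int expectedKeys, double fpr) {
        if (expectedKeys < 1 || fpr <= 0 || fpr >= 1) throw new IllegalArgumentException("bad sizing");
        double ln2 = Math.log(2);
        this.m = Math.max(64, (int) Math.ceil(-expectedKeys * Math.log(fpr) / (ln2 * ln2)));
        this.k = Math.max(1, (int) Math.round((double) m / expectedKeys * ln2));
        this.bits = new BitSet(m);
    }

    /** Two 64-bit base hashes taken from one SHA-256 digest of the key. */
    private static long[] baseHashes(String key) {
        try {
            byte[] d = MessageDigest.getInstance("SHA-256").digest(key.getBytes(StandardCharsets.UTF_8));
            long h1 = 0, h2 = 0;
            for (int i = 0; i < 8; i++) {
                h1 = (h1 << 8) | (d[i] & 0xFF);
                h2 = (h2 << 8) | (d[8 + i] & 0xFF);
            }
            return new long[] {h1, h2 | 1}; // odd, hence non-zero, h2 so probes don't all land on h1
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
    }

    /** Bit index of the i-th probe: (h1 + i * h2) mod m, kept non-negative by floorMod. */
    private int probe(long[] h, int i) {
        return (int) Math.floorMod(h[0] + i * h[1], (long) m);
    }

    void add(String key) {
        long[] h = baseHashes(key);
        for (int i = 0; i < k; i++) bits.set(probe(h, i));
    }

    /** false means "definitely absent", so the SSTable can be skipped without a disk read. */
    boolean mightContain(String key) {
        long[] h = baseHashes(key);
        for (int i = 0; i < k; i++) {
            if (!bits.get(probe(h, i))) return false;
        }
        return true;
    }

    int bitCount() { return m; }

    int hashCount() { return k; }
}
```

For 1,000 keys at 1% this sizes to roughly 9.6 bits per key and 7 probes, matching the back-of-envelope figure; the filter is built once when an SSTable is flushed and then only read.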
Key design decisions & trade-offs
- CP vs AP design (CAP theorem) — Chosen: AP with tunable W/R. Shopping carts and session stores must stay writable during partitions; callers who need strong reads opt into W + R > N per operation.
- Vector clocks vs last-write-wins (LWW) — Chosen: Vector clocks with LWW fallback. LWW silently loses concurrent updates; vector clocks surface siblings so the application merges correctly, with LWW as the cap when siblings explode.
- Synchronous vs asynchronous replication — Chosen: Synchronous to W, asynchronous to the remaining N - W. W acks give the durability guarantee the client needs (the write survives W - 1 crashes); the other replicas receive the same write asynchronously, and read-repair and anti-entropy catch up any replica that missed it.
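- Commit log + memtable + SSTable (LSM) vs B-tree (update-in-place) — Chosen: LSM. LSMs turn random writes into sequential log appends, keeping write latency low under bursty ingest, and Bloom filters keep this 80/20 read-heavy mix near the 1.5 SSTables/read target. B-trees tend to win when reads heavily dominate and low, predictable read amplification matters more than write throughput.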
- Gossip membership vs ZooKeeper/etcd-backed membership — Chosen: Gossip. Gossip removes the strongly-consistent dependency that would become a single point of failure; convergence in O(log N) seconds is fast enough.
- Strict quorum vs sloppy quorum + hinted handoff — Chosen: Sloppy quorum. During single-node outages, accepting the write on a neighbour with a hint preserves availability without violating durability once the hint is replayed.
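The sloppy-quorum choice can be sketched as a target-selection step on the coordinator. This assumes the coordinator already has the clockwise walk of distinct physical nodes (the preference list followed by the nodes after it) and the set of nodes gossip currently marks as down; `SloppyQuorum` and `Target` are hypothetical names. A stand-in's ack counts toward W, which is exactly why W + R > N no longer guarantees overlap with the original owners until the hint is replayed.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Set;

/** Sloppy-quorum target selection with hinted handoff (illustrative sketch). */
final class SloppyQuorum {
    /** A write destination; hintFor names the intended owner when node is a stand-in. */
    static final class Target {
        final String node;
        final String hintFor; // null when node is a real owner

        Target(String node, String hintFor) { this.node = node; this.hintFor = hintFor; }

        boolean isHinted() { return hintFor != null; }

        @Override public String toString() { return isHinted() ? node + "(hint for " + hintFor + ")" : node; }
    }

    /**
     * @param ringWalk distinct physical nodes clockwise from the key's token, preference list first
     * @param n        replication factor
     * @param down     nodes the failure detector currently reports as unreachable
     */
    static List<Target> targets(List<String> ringWalk, int n, Set<String> down) {
        List<String> owners = ringWalk.subList(0, Math.min(n, ringWalk.size()));
        // Fallback candidates: healthy nodes past the first N, in ring order.
        Iterator<String> fallbacks = ringWalk.subList(owners.size(), ringWalk.size()).stream()
                .filter(node -> !down.contains(node)).iterator();
        List<Target> out = new ArrayList<>();
        for (String owner : owners) {
            if (!down.contains(owner)) {
                out.add(new Target(owner, null));
            } else if (fallbacks.hasNext()) {
                out.add(new Target(fallbacks.next(), owner)); // stand-in stores the hint and replays it later
            }
            // No healthy fallback left: this copy is skipped, and W may be unreachable.
        }
        return out;
    }
}
```

With owners A, B, C and B down, the write goes to A, C, and D carrying a hint for B; once gossip reports B alive again, D replays the hinted write to B and can then drop its copy.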
Interview follow-ups
- Your cluster runs with W=1/R=1 for maximum availability. Walk me through a scenario where a user sees their own write disappear, and how to detect it.
- A node crashes while holding 50 GB of un-replayed hinted-handoff data. How do you bound the impact and avoid double-writing when it recovers?
- Anti-entropy is running continuously and saturating network between two AZs. How would you rate-limit it without letting divergence grow unbounded?
- Design the read-repair path in detail — when does it run synchronously on the read, when is it async, and how do you avoid write amplification?
- A hot key is receiving 50K writes/sec on a 3-replica cluster. What breaks first — memtable, gossip, commit log, bloom filters — and how do you respond?
- Compare this design to a Raft-based KV store like etcd. Which real-world workloads would you switch to etcd for, and why?