SQL Sharding Strategies

By Rahul Kumar · Senior Software Engineer · Category: System Design Primer · Unique Topics

Federation / horizontal / denormalized — fan-out cost per query.

Overview

Sharding a SQL database is the moment a system stops being a single-box problem and starts being a distributed-systems problem. A shard is a subset of rows that lives on its own primary, with its own replicas and its own connection pool. The router, usually an application-layer library or a middleware proxy such as Vitess, maps each query's shard key to a shard, either by hashing the key or by looking it up in a range table. Sharding buys you two things: capacity (N shards hold roughly N times the data) and write parallelism (N shards absorb roughly N times the writes, provided the load is spread evenly). It costs you cross-shard queries, distributed transactions, rebalancing operations, and a whole category of debugging you did not have before. The two dominant strategies are hash sharding (even load, but range scans must touch every shard) and range sharding (efficient range scans, but prone to hot shards). Real systems choose based on their access pattern, and the best ones choose the strategy per table rather than per cluster.

How it works

Hash sharding computes a hash of the shard key and maps it to a shard, using a simple mod-N for static clusters or consistent hashing when shards are added and removed. A query for user_id=42 hits exactly one shard, determined by hash(42) % N. Writes and point reads touch one shard no matter how many shards exist. Range queries (all users between 40 and 50) are expensive: hashing destroys key locality, so the router must scatter the query to every shard and gather the results.

Range sharding keeps rows contiguous: shard 1 holds keys 0-999, shard 2 holds 1000-1999, and so on. Range scans hit one or two shards, and point reads still hit one. The cost is hotspots: if traffic is skewed toward recent inserts (monotonic timestamp keys), the newest shard drowns while the others idle.

Cross-shard queries are unavoidable for analytics and for any query that does not include the shard key. The router fans out, each shard returns a partial result, and the router merges. CompletableFuture or an equivalent async framework fits this well: scatter in parallel, block on the gather, and guard it with a timeout.

Rebalancing happens when shards fill up. Consistent hashing minimizes row movement when a shard is added or removed, whereas plain mod-N remaps most keys whenever N changes. Range sharding requires a split operation that moves half of one shard to a new one. Vitess, YugabyteDB, and CockroachDB automate this; DIY sharding makes you write a migration tool and test it in production.
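
The difference between mod-N and consistent hashing during rebalancing can be sketched as follows. This is a minimal illustration under stated assumptions, not how Vitess or any other system implements it: the class name ConsistentHashRing, the shard names, the 100 virtual nodes per shard, and the MD5-based hash are all hypothetical choices made for the example.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Map;
import java.util.TreeMap;

/** Illustrative consistent-hash ring (hypothetical helper, not part of ShardRouter). */
final class ConsistentHashRing {
    // Ring position -> shard name. Each shard owns several positions (virtual nodes).
    private final TreeMap<Long, String> ring = new TreeMap<>();
    private final int virtualNodes;

    ConsistentHashRing(int virtualNodes) { this.virtualNodes = virtualNodes; }

    void addShard(String shard) {
        for (int v = 0; v < virtualNodes; v++) ring.put(hash(shard + "#" + v), shard);
    }

    void removeShard(String shard) {
        for (int v = 0; v < virtualNodes; v++) ring.remove(hash(shard + "#" + v));
    }

    /** A key belongs to the first ring position at or after its hash, wrapping around. */
    String shardFor(long key) {
        if (ring.isEmpty()) throw new IllegalStateException("no shards");
        Map.Entry<Long, String> e = ring.ceilingEntry(hash(Long.toString(key)));
        return (e != null ? e : ring.firstEntry()).getValue();
    }

    /** First 8 bytes of MD5 as a long; an assumed hash, chosen only for good mixing. */
    static long hash(String s) {
        try {
            byte[] d = MessageDigest.getInstance("MD5")
                .digest(s.getBytes(StandardCharsets.UTF_8));
            long h = 0;
            for (int i = 0; i < 8; i++) h = (h << 8) | (d[i] & 0xFF);
            return h;
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        int n = 10_000;
        ConsistentHashRing ring = new ConsistentHashRing(100);
        for (int i = 0; i < 4; i++) ring.addShard("shard-" + i);
        String[] before = new String[n];
        for (int k = 0; k < n; k++) before[k] = ring.shardFor(k);

        ring.addShard("shard-4"); // grow the cluster from 4 to 5 shards

        int movedRing = 0, movedMod = 0;
        for (int k = 0; k < n; k++) {
            if (!ring.shardFor(k).equals(before[k])) movedRing++;
            if (Math.floorMod(Long.hashCode(k), 4) != Math.floorMod(Long.hashCode(k), 5)) movedMod++;
        }
        System.out.printf("ring moved %d of %d keys, mod-N moved %d%n", movedRing, n, movedMod);
    }
}
```

With mod-N, going from 4 to 5 shards leaves a key in place only when key % 4 equals key % 5, which holds for 4 of every 20 consecutive keys, so about 80% of rows move. On the ring, only keys that land on the new shard's arcs move, which is about one fifth on average, and they all move to the new shard.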

Implementation

ShardRouter: hash + range routing with cross-shard fan-out
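import java.time.Duration;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;

public class ShardRouter {
    private final List<Shard> shards;
    private final List<long[]> ranges; // parallel to shards: half-open [lo, hi)
    // Bounded pool for fan-out queries; 16 threads is an arbitrary example size.
    private final ExecutorService pool = Executors.newFixedThreadPool(16);

    public ShardRouter(List<Shard> shards, List<long[]> ranges) {
        this.shards = shards;
        this.ranges = ranges;
    }

    /** Hash shard: hash(key) % N. floorMod keeps the index non-negative. */
    public Shard hashShard(long key) {
        int idx = Math.floorMod(Long.hashCode(key), shards.size());
        return shards.get(idx);
    }

    /** Range shard: find the shard whose half-open range [lo, hi) contains the key. */
    public Shard rangeShard(long key) {
        for (int i = 0; i < ranges.size(); i++) {
            long[] r = ranges.get(i);
            if (key >= r[0] && key < r[1]) return shards.get(i);
        }
        throw new IllegalArgumentException("No shard for key " + key);
    }

    /** Fan-out across every shard; merge partial results. Fails if any shard fails or times out. */
    public <T> List<T> fanout(Function<Shard, List<T>> q, Duration timeout) {
        // Parallel scatter: one async task per shard.
        List<CompletableFuture<List<T>>> fs = shards.stream()
            .map(s -> CompletableFuture.supplyAsync(() -> q.apply(s), pool))
            .toList();
        try {
            // Blocking gather: wait for every shard, but never longer than the timeout.
            CompletableFuture.allOf(fs.toArray(new CompletableFuture[0]))
                .get(timeout.toMillis(), TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            // Marks the futures cancelled; it does not interrupt queries already running.
            fs.forEach(f -> f.cancel(true));
            throw new RuntimeException("Fan-out timed out after " + timeout, e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the caller's interrupt status
            throw new RuntimeException(e);
        } catch (ExecutionException e) {
            throw new RuntimeException("Shard query failed", e.getCause());
        }
        // Every future has completed normally here, so join() does not block.
        return fs.stream().flatMap(f -> f.join().stream()).toList();
    }
}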

interface Shard {
    List<Row> query(String sql, Object... params);
}
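
The linear scan in rangeShard is fine for a handful of shards. For larger range tables, and to illustrate the split operation described earlier, one possible sketch keeps the range lower bounds in a sorted map. The class RangeShardMap, its method names, and the use of shard names instead of Shard objects are assumptions made for this example. It only updates routing metadata; copying the rows to the new shard is a separate step not shown here.

```java
import java.util.Map;
import java.util.TreeMap;

/** Illustrative range table: inclusive lower bound -> shard name (hypothetical helper). */
final class RangeShardMap {
    private final TreeMap<Long, String> lowerBounds = new TreeMap<>();

    /** Registers a shard that owns keys from lowerBound up to the next registered bound. */
    void addRange(long lowerBound, String shard) {
        lowerBounds.put(lowerBound, shard);
    }

    /** O(log N) lookup: the owner is the range with the greatest lower bound <= key. */
    String shardFor(long key) {
        Map.Entry<Long, String> e = lowerBounds.floorEntry(key);
        if (e == null) throw new IllegalArgumentException("No shard for key " + key);
        return e.getValue();
    }

    /**
     * Splits the range that contains splitKey. Keys from splitKey up to that
     * range's old upper bound route to newShard afterwards; smaller keys stay put.
     */
    void split(long splitKey, String newShard) {
        if (lowerBounds.floorEntry(splitKey) == null) {
            throw new IllegalArgumentException("No range contains " + splitKey);
        }
        if (lowerBounds.containsKey(splitKey)) {
            throw new IllegalArgumentException(splitKey + " is already a range boundary");
        }
        lowerBounds.put(splitKey, newShard);
    }

    public static void main(String[] args) {
        RangeShardMap map = new RangeShardMap();
        map.addRange(0, "shard-1");
        map.addRange(1000, "shard-2");
        map.split(1500, "shard-3"); // shard-2 keeps 1000-1499, shard-3 takes 1500 and up
        System.out.println(map.shardFor(1499) + " " + map.shardFor(1500));
    }
}
```

A split touches only the range being divided, so the routing for every other shard is unchanged. In a real migration the routing entry would typically be switched only after the rows have been copied and verified.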

UserRepository using the router
import java.time.Duration;
import java.util.List;

public class UserRepository {
    private final ShardRouter router;

    public UserRepository(ShardRouter router) { this.router = router; }

    /** Point read: single shard by user_id hash. */
    public User findById(long userId) {
        Shard s = router.hashShard(userId);
        List<Row> rows = s.query("SELECT * FROM users WHERE id = ?", userId);
        return rows.isEmpty() ? null : User.from(rows.get(0));
    }

    /** Cross-shard search: must fan out because email is not the shard key. */
    public List<User> searchByEmailPrefix(String prefix) {
        return router.fanout(
            shard -> shard.query("SELECT * FROM users WHERE email LIKE ?", prefix + "%")
                          .stream().map(User::from).toList(),
            Duration.ofSeconds(2)
        );
    }
}
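
The fanout method above simply concatenates partial results. When a cross-shard query carries an ORDER BY and a LIMIT, the router usually has to merge instead: each shard returns its own sorted rows, and the router takes the smallest heads until the limit is reached. The following is a minimal sketch of that merge step, assuming each partial result is already sorted by the same comparator; SortedMerge and mergeTopK are hypothetical names, not part of the listings above.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

/** Illustrative k-way merge of per-shard sorted results (hypothetical helper). */
final class SortedMerge {
    /** Position inside one shard's sorted partial result. */
    private record Cursor<T>(List<T> rows, int pos) {
        T head() { return rows.get(pos); }
    }

    /**
     * Returns the first `limit` rows of the merged order.
     * Assumes every list in `partials` is already sorted by `order`.
     */
    static <T> List<T> mergeTopK(List<List<T>> partials, Comparator<T> order, int limit) {
        Comparator<Cursor<T>> byHead = (a, b) -> order.compare(a.head(), b.head());
        PriorityQueue<Cursor<T>> heap = new PriorityQueue<>(byHead);
        for (List<T> p : partials) {
            if (!p.isEmpty()) heap.add(new Cursor<>(p, 0));
        }
        List<T> out = new ArrayList<>();
        while (out.size() < limit && !heap.isEmpty()) {
            Cursor<T> c = heap.poll();          // shard with the smallest next row
            out.add(c.head());
            if (c.pos() + 1 < c.rows().size()) { // advance that shard's cursor
                heap.add(new Cursor<>(c.rows(), c.pos() + 1));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<List<Integer>> partials = List.of(List.of(1, 4, 9), List.of(2, 3), List.of());
        System.out.println(mergeTopK(partials, Comparator.<Integer>naturalOrder(), 4));
    }
}
```

For this to be correct, each shard must itself apply the same ORDER BY and at least the same LIMIT, since any single shard could hold all of the top rows.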
