Database Sharding Part 7: Case Study - Scaling Discord to Billions

How Discord moved from MongoDB to ScyllaDB and implemented Bucketized Sharding to handle massive channel growth.

Key Takeaways

  • **Partition Key**: `(channel_id, bucket_id)`
  • **Clustering Key**: `message_id` (TimeUUID)
  • The multi-terabyte channel is instantly broken into hundreds of small, manageable partitions.

Designing a messaging storage system capable of handling trillions of messages with sub-millisecond query latency is one of the most demanding challenges in modern data engineering. In a chat application like Discord, the database is subjected to append-heavy write patterns, irregular read bursts (e.g., users logging on and scrolling through historical logs), and extreme data skew.

In the early days, Discord successfully operated on MongoDB. However, as their scale exploded, the indices required for full-text search and historical message retrieval grew larger than the physical RAM of their instances. This triggered severe disk paging and cascading latency spikes.

To survive this growth, Discord executed a major architectural migration to Apache Cassandra, and eventually, to ScyllaDB. In this master-tier case study, we will deconstruct the low-level data models, sharding techniques, and C++ engine mechanics that power Discord's messaging infrastructure.


System Requirements and Goals

To scale a chat platform to trillions of messages, the database tier must be designed around strict operational parameters.

1. Functional Requirements

  • Real-time Ingestion: Asynchronously ingest and persist messages sent by users in private channels (DMs) and massive public servers.
  • Chronological Retrieval: Query historical messages sequentially by time, supporting infinite backward scrolling (pagination).
  • High-Concurrency Reads: Support sudden spikes in read volume when a channel is mentioned or a celebrity posts.
  • Message Lifecycle (CRUD): Support instant message editing and deletion with immediate consistency on the active channel feed.

2. Non-Functional Requirements

  • Low Ingestion Latency: P99 write ingestion latency must be strictly less than $15\text{ ms}$.
  • Neutralize Extreme Data Skew: The database must prevent noisy neighbors or spiky channels (e.g., public communities with millions of active members) from exhausting the hardware resources of individual storage nodes.
  • Fault-Tolerant, Masterless Architecture: No Single Point of Failure (SPOF). The storage cluster must withstand the loss of physical nodes or entire availability zones without dropping writes.
  • Eliminate Latency Spikes (Stop-the-World Pauses): Avoid stop-the-world garbage collection pauses in the storage tier, which surface as unacceptable latency spikes in the stateless backend services that call it.

3. Sizing and Capacity Math

Let's conduct a capacity planning estimation for a large-scale messaging system:

  • Average Write Ingestion Rate: $120,000$ messages per second (mps).
  • Peak Write Ingestion Rate: $400,000$ mps.
  • Average Message Size: $200\text{ Bytes}$ (uncompressed text + metadata).
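  • Daily Ingestion Storage Volume (conservative upper bound that assumes the peak rate is sustained all day; at the average rate of $120,000$ mps the same formula gives about $2.07\text{ TB/day}$): $$\text{Daily Raw Storage} = 400,000 \text{ mps} \times 200 \text{ Bytes} \times 86,400 \text{ seconds} \approx 6.91 \text{ TB/day}$$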
  • High Availability Overhead (Replication Factor = 3): $$\text{Daily Replicated Storage} = 6.91 \text{ TB/day} \times 3 = 20.73 \text{ TB/day}$$
  • LSM-Tree Space Amplification & Index Overhead (1.4x factor, covering obsolete rows awaiting compaction plus index files): $$\text{Total Storage Ingestion Rate} = 20.73 \text{ TB/day} \times 1.4 \approx 29 \text{ TB/day}$$
  • Retention Policy: Indefinite storage retention. Over 5 years of operations, this cluster will accumulate: $$\text{5-Year Storage Size} = 29 \text{ TB/day} \times 365 \text{ days} \times 5 \text{ years} \approx 52.92 \text{ Petabytes}$$
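
The estimate above can be reproduced with a few lines of arithmetic. This is a minimal sketch: the function and field names are illustrative, not part of any library, and the inputs are the figures from the list. Without intermediate rounding the five-year total comes out near 52.98 PB rather than 52.92 PB, because the list rounds 29.03 TB/day down to 29 before multiplying.

```typescript
// Hypothetical helper names; the numeric inputs come from the sizing list above.
interface CapacityInputs {
  writesPerSecond: number;
  avgMessageBytes: number;
  replicationFactor: number;
  overheadFactor: number;
  retentionYears: number;
}

interface CapacityEstimate {
  rawTbPerDay: number;
  replicatedTbPerDay: number;
  totalTbPerDay: number;
  retainedPb: number;
}

const SECONDS_PER_DAY = 86400;
const BYTES_PER_TB = 1e12; // decimal terabytes, matching the figures in the text

function estimateCapacity(input: CapacityInputs): CapacityEstimate {
  const rawTbPerDay =
    (input.writesPerSecond * input.avgMessageBytes * SECONDS_PER_DAY) / BYTES_PER_TB;
  const replicatedTbPerDay = rawTbPerDay * input.replicationFactor;
  const totalTbPerDay = replicatedTbPerDay * input.overheadFactor;
  const retainedPb = (totalTbPerDay * 365 * input.retentionYears) / 1000;
  return { rawTbPerDay, replicatedTbPerDay, totalTbPerDay, retainedPb };
}

// Worst case: the peak rate sustained around the clock.
const peakEstimate = estimateCapacity({
  writesPerSecond: 400000,
  avgMessageBytes: 200,
  replicationFactor: 3,
  overheadFactor: 1.4,
  retentionYears: 5
});

// Typical case: the average rate.
const averageEstimate = estimateCapacity({
  writesPerSecond: 120000,
  avgMessageBytes: 200,
  replicationFactor: 3,
  overheadFactor: 1.4,
  retentionYears: 5
});

console.log('peak TB/day:', peakEstimate.totalTbPerDay.toFixed(2));
console.log('average TB/day:', averageEstimate.totalTbPerDay.toFixed(2));
```

Running both cases side by side shows how much headroom the peak-rate assumption builds into the plan.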

API Design and Interface Contracts

Messaging interactions are driven by stateless API servers that communicate with ScyllaDB using low-level client drivers.

1. Send Message Endpoint

POST /v1/channels/{channel_id}/messages

Request Payload:

{
  "content": "Distributed databases are fascinating!",
  "nonce": "1234567890abcdef"
}

Response Payload (201 Created):

{
  "id": "119852357400018944",
  "channel_id": "8877665544332211",
  "author_id": "4433",
  "content": "Distributed databases are fascinating!",
  "timestamp": "2026-05-23T08:06:14.123Z"
}

2. Retrieve Historical Messages

GET /v1/channels/{channel_id}/messages?limit=50&before=119852357400018944

Response Payload (200 OK):

{
  "messages": [
    {
      "id": "119852356800014321",
      "author_id": "5566",
      "content": "Yes, sharding is key at scale.",
      "timestamp": "2026-05-23T08:06:12.100Z"
    }
  ],
  "has_more": true,
  "next_cursor": "119852356800014321"
}
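
One way to produce the `has_more` and `next_cursor` fields is to fetch one row more than the client asked for. The sketch below assumes that approach; `buildHistoryPage` and the interface names are hypothetical, and the lesson does not say how the real service computes these fields.

```typescript
// Shapes mirror the response payload above; the names are illustrative.
interface ApiMessage {
  id: string;
  author_id: string;
  content: string;
  timestamp: string;
}

interface HistoryPage {
  messages: ApiMessage[];
  has_more: boolean;
  next_cursor: string | null;
}

// rowsNewestFirst is assumed to hold up to limit + 1 rows, newest first.
// The extra row only signals that older messages exist; it is never returned.
function buildHistoryPage(rowsNewestFirst: ApiMessage[], limit: number): HistoryPage {
  const messages = rowsNewestFirst.slice(0, limit);
  const hasMore = rowsNewestFirst.length > limit;
  const last = messages.length > 0 ? messages[messages.length - 1] : null;
  return {
    messages: messages,
    has_more: hasMore,
    // The client passes next_cursor back as the `before` query parameter.
    next_cursor: hasMore && last !== null ? last.id : null
  };
}
```

The client then repeats the GET request with `before` set to `next_cursor` until `has_more` is false.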

High-Level Design Architecture

Centralized messaging requires decoupling client websocket connections, API request routing, and physical data storage nodes.

1. End-to-End Message Delivery Pipeline

graph TD
    Client[User Client] -->|Publish Message| Gateway[Websocket Gateway Fleet]
    Gateway -->|Forward Write Request| IngestService[Stateless Ingestion API]
    
    subgraph "Durable Storage Tier"
        IngestService -->|1. Calculate Bucket ID| ClientDriver[ScyllaDB Token-Aware Driver]
        ClientDriver -->|2. Hash Route Write| ScyllaCluster[ScyllaDB Masterless Ring]
        
        ScyllaCluster -->|Hot Node SSD| MemTable[Active MemTable]
        MemTable -->|Async Flush| SSTable[SSTable disk files]
    end

    subgraph "Read & Telemetry Pipeline"
        IngestService -->|3. Publish Event| Kafka[Apache Kafka Broker]
        Kafka -->|4. Push Real-time update| Gateway
        Gateway -->|5. Delivery| TargetClient[Subscribed Channel Clients]
    end

    %% Styles
    style ScyllaCluster fill:#1a1c23,stroke:#10b981,stroke-width:2px,color:#fff
    style Kafka fill:#1a1c23,stroke:#f59e0b,stroke-width:2px,color:#fff

2. Hash Ring Partitioning: Static vs. Bucketized Sharding

If we shard messages purely by channel_id (Static sharding), a massive public server like the Fortnite channel will route its entire multi-terabyte message partition to a single physical node on the consistent hashing ring, causing immediate CPU and I/O starvation.

By adopting Bucketized Sharding, the composite partition key (channel_id, bucket_id) splits a single channel's messages into hundreds of distinct partition buckets. The client driver hashes these composite keys, distributing the buckets randomly across distinct nodes in the cluster.

graph LR
    subgraph "Static Sharding (Naive)"
        ChanFortnite[Channel: Fortnite] -->|100% of data| Node1[(Scylla Node 1: Hotspots & Crash)]
        ChanSmall[Channel: Small Chat] --> Node2[(Scylla Node 2)]
    end

    subgraph "Bucketized Sharding (Scylla Hashing)"
        ChanFortniteBucket1[Fortnite, Bucket 1] -->|Hash key| NodeA[(Node A)]
        ChanFortniteBucket2[Fortnite, Bucket 2] -->|Hash key| NodeB[(Node B)]
        ChanFortniteBucket3[Fortnite, Bucket 3] -->|Hash key| NodeC[(Node C)]
    end

    style Node1 fill:#991b1b,stroke:#f87171,stroke-width:2px,color:#fff
    style NodeA fill:#1e3a8a,stroke:#3b82f6,stroke-width:2px,color:#fff
    style NodeB fill:#1e3a8a,stroke:#3b82f6,stroke-width:2px,color:#fff
    style NodeC fill:#1e3a8a,stroke:#3b82f6,stroke-width:2px,color:#fff
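
The difference between the two layouts can be sketched with a toy placement function. Everything here is a simplification: `toyHash` stands in for the Murmur3 partitioner, and the modulo placement stands in for a real token ring with virtual nodes. The point is only that a key containing the bucket lands on different nodes, while a key without it always lands on the same one.

```typescript
const RING_NODES = ['node-a', 'node-b', 'node-c'];

// Toy string hash (not Murmur3): enough to show key-dependent placement.
function toyHash(key: string): number {
  let h = 0;
  for (let i = 0; i < key.length; i++) {
    h = (h * 31 + key.charCodeAt(i)) >>> 0;
  }
  return h;
}

// Simplified placement: a real ring maps token ranges to nodes instead of using modulo.
function nodeFor(partitionKey: string): string {
  return RING_NODES[toyHash(partitionKey) % RING_NODES.length];
}

// Static sharding: the partition key is the channel alone.
function staticPlacement(channelId: string): string {
  return nodeFor(channelId);
}

// Bucketized sharding: the partition key is (channel_id, bucket_id).
function bucketedPlacement(channelId: string, bucketIds: number[]): string[] {
  return bucketIds.map(function (bucketId) {
    return nodeFor(channelId + ':' + bucketId);
  });
}

console.log(staticPlacement('42'));
console.log(bucketedPlacement('42', [0, 1, 2]));
```

With static sharding every message of channel 42 resolves to one node, whereas the three buckets resolve to three different nodes in this toy ring.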

Low-Level Design & Component Mechanics

To guarantee low latency and prevent unbounded partition growth, we configure custom CQL structures and implement client-side bucket parsing logic.

1. Database Schema DDL (ScyllaDB / Cassandra CQL)

The messaging keyspace uses masterless replication across three availability zones. Tables enforce strict compound primary keys.

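-- Each availability zone is modeled here as its own datacenter holding one replica,
-- which yields the total replication factor of 3 used in the capacity math.
-- A common alternative is a single datacenter with RF 3 and one rack per zone.
CREATE KEYSPACE discord_messaging WITH replication = {
    'class': 'NetworkTopologyStrategy',
    'us-east-1a': 1,
    'us-east-1b': 1,
    'us-east-1c': 1
};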

USE discord_messaging;

-- Messages Table with Bucketized Sharding
CREATE TABLE messages (
    channel_id bigint,
    bucket_id int,
    message_id timeuuid,
    author_id bigint,
    content text,
    attachments list<text>,
    PRIMARY KEY ((channel_id, bucket_id), message_id)
) WITH CLUSTERING ORDER BY (message_id DESC)
AND compaction = {
    'class': 'SizeTieredCompactionStrategy',
    'max_threshold': 32,
    'min_threshold': 4
};

2. Client-Side Bucket Calculation & Sequential Query Code (TypeScript)

This TypeScript script demonstrates how to compute the bucket_id dynamically and retrieve historical messages across bucket transitions sequentially.

import { Client, types } from 'cassandra-driver';

const scyllaClient = new Client({
  contactPoints: ['scylla-node1.internal', 'scylla-node2.internal'],
  localDataCenter: 'us-east-1a',
  keyspace: 'discord_messaging'
});

// Assumed Snowflake layout: the upper 42 bits hold milliseconds since a custom epoch.
const SNOWFLAKE_EPOCH_MS = 1420070400000n; // 2015-01-01T00:00:00Z, Discord's published epoch
const SNOWFLAKE_TIMESTAMP_SHIFT = 22n;

// A bucket is a fixed time window, so its ID can be derived from a Snowflake alone.
// The window length is an assumption: size it so the busiest channels stay near
// the design target of roughly 100,000 messages per bucket.
const BUCKET_WINDOW_MS = 10n * 24n * 60n * 60n * 1000n; // 10 days

export function calculateBucketId(snowflakeId: bigint): number {
  // Dividing the raw ID would not work: the low 22 bits carry worker and sequence
  // numbers, so each millisecond spans about 4.2 million ID values.
  const msSinceEpoch = snowflakeId >> SNOWFLAKE_TIMESTAMP_SHIFT;
  return Number(msSinceEpoch / BUCKET_WINDOW_MS);
}

function snowflakeToDate(snowflakeId: bigint): Date {
  const unixMs = (snowflakeId >> SNOWFLAKE_TIMESTAMP_SHIFT) + SNOWFLAKE_EPOCH_MS;
  return new Date(Number(unixMs));
}

interface MessageRow {
  channel_id: string;
  bucket_id: number;
  message_id: string;
  author_id: string;
  content: string;
}

// Sequentially read historical messages across bucket boundaries.
// Assumes the write path stamps each timeuuid with the same millisecond as the Snowflake.
export async function queryChannelHistory(
  channelId: bigint,
  beforeMessageId: bigint,
  limit: number = 50
): Promise<MessageRow[]> {
  const resultMessages: MessageRow[] = [];
  const beforeDate = snowflakeToDate(beforeMessageId);
  let activeBucket = calculateBucketId(beforeMessageId);
  // Assumes channel IDs are Snowflakes too, so no message predates the channel's own bucket.
  const oldestBucket = calculateBucketId(channelId);

  // minTimeuuid(t) is the smallest timeuuid at time t, so `<` excludes the cursor message
  // itself (and any other message written in that same millisecond).
  const query = `
    SELECT channel_id, bucket_id, message_id, author_id, content
    FROM messages
    WHERE channel_id = ? AND bucket_id = ? AND message_id < minTimeuuid(?)
    LIMIT ?
  `;

  while (resultMessages.length < limit && activeBucket >= oldestBucket) {
    // Compute the remaining count before appending rows from this bucket.
    const remaining = limit - resultMessages.length;
    const params = [types.Long.fromString(channelId.toString()), activeBucket, beforeDate, remaining];
    const rs = await scyllaClient.execute(query, params, { prepare: true });

    for (const row of rs.rows) {
      resultMessages.push({
        channel_id: row.get('channel_id').toString(),
        bucket_id: row.get('bucket_id'),
        message_id: row.get('message_id').toString(),
        author_id: row.get('author_id').toString(),
        content: row.get('content')
      });
    }

    // Either the page is full (the loop condition ends it) or this bucket is exhausted.
    // Older buckets only hold rows older than the cursor, so the same bound stays valid.
    activeBucket--;
  }

  return resultMessages;
}
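
The bucket-walking logic can be exercised without a cluster. The sketch below is a stand-in under stated assumptions: an in-memory object replaces the `messages` table, plain numbers replace message IDs, and `readHistory` is a hypothetical name. It shows the two details that matter: the remaining count is computed before each bucket is read, and the walk stops at the oldest known bucket.

```typescript
// bucket_id -> message timestamps stored newest first, like CLUSTERING ORDER BY ... DESC
type BucketStore = Record<number, number[]>;

function readHistory(
  store: BucketStore,
  startBucket: number,
  oldestBucket: number,
  beforeTimestamp: number,
  limit: number
): number[] {
  const results: number[] = [];
  let bucket = startBucket;

  while (results.length < limit && bucket >= oldestBucket) {
    // How many rows are still needed, measured before reading this bucket.
    const remaining = limit - results.length;
    const rows = (store[bucket] || [])
      .filter(function (ts) { return ts < beforeTimestamp; })
      .slice(0, remaining);
    for (const ts of rows) {
      results.push(ts);
    }
    // Step to the next older bucket; the loop condition decides whether to read it.
    bucket--;
  }
  return results;
}

const demoStore: BucketStore = {
  2: [25, 22, 21],
  1: [18, 15, 11],
  0: [9, 5]
};

// Asks for 4 messages older than timestamp 23: two come from bucket 2, two from bucket 1.
console.log(readHistory(demoStore, 2, 0, 23, 4));
```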

Scaling Challenges & Production Bottlenecks

Operating a masterless wide-column storage ring at a multi-terabyte daily scale exposes several core physical bottlenecks:

1. LSM Compaction Storms & Write Amplification

ScyllaDB and Cassandra utilize Log-Structured Merge (LSM) Trees. Under heavy write ingestion, Memtables in RAM are constantly flushed to disk as read-only SSTables (Sorted String Tables). Because SSTables are immutable, updating or deleting a row creates redundant entries across multiple files.

To clean up old data, background compaction threads continuously merge SSTables, sorting and compacting them into larger sequential files.

graph TD
    subgraph "LSM Storage Engine Compaction Loop"
        Mem[Active MemTable in RAM] -->|1. Flush to disk| SST1[SSTable File 1]
        Mem -->|2. Flush to disk| SST2[SSTable File 2]
        
        SST1 & SST2 -->|3. Read and Merge| Compactor[Scylla Compaction Thread]
        Compactor -->|4. Write Compacted File| SSTCompacted[Consolidated SSTable 3]
        Compactor -->|5. Evict obsolete pages| DiskSpace[Freed Disk Capacity]
    end
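
The merge step in the diagram can be sketched in a few lines. This is a deliberately simplified model: real compaction streams a k-way merge over sorted files instead of building an in-memory map, and tombstones may only be purged once a grace period has passed. The `Entry` shape and the `compact` function are illustrative names.

```typescript
// value === null marks a tombstone (a delete marker).
interface Entry {
  key: string;
  value: string | null;
  writtenAt: number;
}

// Merges several immutable SSTables into one sorted run, keeping the newest write per key.
function compact(sstables: Entry[][], purgeTombstones: boolean): Entry[] {
  const latest: Record<string, Entry> = {};
  for (const table of sstables) {
    for (const entry of table) {
      const current = latest[entry.key];
      if (!current || entry.writtenAt > current.writtenAt) {
        latest[entry.key] = entry;
      }
    }
  }
  return Object.keys(latest)
    .sort()
    .map(function (key) { return latest[key]; })
    .filter(function (entry) { return !(purgeTombstones && entry.value === null); });
}

const olderTable: Entry[] = [
  { key: 'm1', value: 'hello', writtenAt: 1 },
  { key: 'm2', value: 'draft', writtenAt: 2 }
];
const newerTable: Entry[] = [
  { key: 'm1', value: null, writtenAt: 6 },     // m1 was deleted later
  { key: 'm2', value: 'edited', writtenAt: 5 }, // m2 was edited later
  { key: 'm3', value: 'new', writtenAt: 7 }
];

console.log(compact([olderTable, newerTable], true));
```

The two input tables hold five entries between them, while the compacted output holds two live rows; that gap is the disk space compaction reclaims.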

The Bottleneck: If a partition grows to several gigabytes (due to a missing bucket key), compacting this single file consumes massive amounts of CPU and disk I/O. During this time, the storage node's I/O queue saturates, dropping write throughput and causing P99 latencies to spike from $5\text{ ms}$ to over $5,000\text{ ms}$.

Mitigation (Size-Tiered Compaction & Bucketing):

  • We configure Size-Tiered Compaction Strategy (STCS) and keep partitions below $100\text{ MB}$ through the bucket key. Compaction still merges whole SSTables, but no single partition can dominate a merge, so memory use and I/O per partition stay bounded and compaction interferes far less with foreground writes.
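
A quick check shows how much room the $100\text{ MB}$ ceiling leaves. This is a rough sketch using the 200-byte average message and the 100,000-message bucket from this lesson; it ignores per-row storage overhead, and the function names are illustrative.

```typescript
const MB = 1e6; // decimal megabytes

// Approximate payload size of one bucket partition, ignoring per-row overhead.
function estimatePartitionBytes(messagesPerBucket: number, avgMessageBytes: number): number {
  return messagesPerBucket * avgMessageBytes;
}

// Largest bucket (in messages) that still fits under a partition size ceiling.
function maxMessagesPerBucket(partitionLimitBytes: number, avgMessageBytes: number): number {
  return Math.floor(partitionLimitBytes / avgMessageBytes);
}

const bucketBytes = estimatePartitionBytes(100000, 200);
const bucketCeiling = maxMessagesPerBucket(100 * MB, 200);

console.log('bucket size in MB:', bucketBytes / MB);
console.log('messages that fit under 100 MB:', bucketCeiling);
```

A 100,000-message bucket is about 20 MB of payload, roughly a fifth of the ceiling, which leaves headroom for attachments and unusually long messages.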

2. The Stop-the-World GC Pause (Why C++ Beats Java)

Apache Cassandra is written in Java. Under heavy write loads, millions of transient object allocations saturate the Java Virtual Machine (JVM) heap. When the Garbage Collector runs out of space, it triggers a Stop-The-World (STW) pause, freezing all active database execution threads to clean up garbage.

The Bottleneck: During a 500ms GC pause, stateless backend microservices experience connection timeouts. They respond by retrying their writes, which further saturates the recovery queue, leading to a cascading service collapse.

Mitigation (The C++ Thread-per-Core Model): ScyllaDB solves this JVM limitation by rewriting the entire engine in C++ using the Seastar framework.

  • Thread-per-core Architecture: ScyllaDB pins one execution thread to each logical CPU core and schedules work cooperatively in user space, which avoids most kernel context switches and cross-core locking on the hot path.
  • Per-Core Memory and Direct I/O: Memory is pre-allocated and divided evenly across cores, and disk access uses asynchronous direct I/O (DMA) rather than the kernel page cache. With no shared heap there is no lock contention between cores, and because C++ has no garbage collector there are no stop-the-world GC pauses. P99 write latency stays far flatter under load, although compaction and repair can still cause tail-latency spikes if they are not throttled.
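
The shared-nothing idea can be illustrated with a toy model in which every core owns a private memtable and each key is routed to exactly one core. This is only a conceptual sketch: the hash and the modulo routing are stand-ins, not ScyllaDB's actual token-to-shard algorithm, and a single-threaded script cannot show real parallelism.

```typescript
// One shard per core; no shard ever touches another shard's memtable.
interface Shard {
  coreId: number;
  memtable: Record<string, string>;
}

function createShards(coreCount: number): Shard[] {
  const shards: Shard[] = [];
  for (let i = 0; i < coreCount; i++) {
    shards.push({ coreId: i, memtable: {} });
  }
  return shards;
}

// Toy hash used only for routing in this sketch.
function routeHash(key: string): number {
  let h = 0;
  for (let i = 0; i < key.length; i++) {
    h = (h * 31 + key.charCodeAt(i)) >>> 0;
  }
  return h;
}

function shardFor(key: string, coreCount: number): number {
  return routeHash(key) % coreCount;
}

// Every write for a given key lands on the same core, so no locks are needed.
function put(shards: Shard[], key: string, value: string): number {
  const shard = shards[shardFor(key, shards.length)];
  shard.memtable[key] = value;
  return shard.coreId;
}
```

Because ownership is decided by the key alone, two cores never contend for the same row, which is the property the bullets above describe.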

Technical Trade-offs & Strategic Compromises

Selecting a distributed storage pattern for messaging requires balancing transactional guarantees, query complexity, and operations cost.

| Storage Architecture | Read Latency (P99) | Write Throughput | Schema Evolution | Transactional Guarantees | Operational Cost |
| --- | --- | --- | --- | --- | --- |
| Relational Sharding (MySQL/Postgres) | Low | Medium (Lock contention) | Rigid / DDL migration | Strong ACID (CP) | High (Requires database proxies) |
| Masterless Wide-Column (ScyllaDB) | Ultra-Low (<5ms) | Max (LSM Appends) | Extremely Flexible | Eventual Consistency (AP) | Low (Flat storage utilization) |
| NewSQL (Google Spanner / CockroachDB) | Medium | Low-Medium (Distributed 2PC locks) | Rigid | Global Serializability (CP) | Extremely High (Network and compute overhead) |

The AP Posture Choice

For a chat application, we explicitly choose Availability & Partition Tolerance (AP) over Consistency (CP). If a network partition occurs between availability zones, replicas on both sides can keep accepting message writes at a relaxed consistency level such as ONE; at QUORUM, only the side that still holds a majority of replicas can. Divergent replicas converge afterwards through read repair, hinted handoff, and scheduled anti-entropy repair. We compromise on strict real-time consistency to maximize the chance that users can always send messages.
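
The consistency trade-off comes down to simple arithmetic on replica counts: a read is guaranteed to see the latest acknowledged write only when the read and write replica sets must overlap, that is, when R + W > N. A minimal sketch, with illustrative function names:

```typescript
// Smallest majority of a replica set.
function quorum(replicationFactor: number): number {
  return Math.floor(replicationFactor / 2) + 1;
}

// True when every read set must intersect every write set (R + W > N).
function readsSeeLatestWrite(
  replicationFactor: number,
  writeAcks: number,
  readAcks: number
): boolean {
  return readAcks + writeAcks > replicationFactor;
}

const rf = 3;
console.log('quorum for RF 3:', quorum(rf));
console.log('QUORUM write + QUORUM read overlap:', readsSeeLatestWrite(rf, quorum(rf), quorum(rf)));
console.log('ONE write + ONE read overlap:', readsSeeLatestWrite(rf, 1, 1));
```

With a replication factor of 3, QUORUM reads and writes overlap, while ONE and ONE do not, which is the availability-for-consistency exchange described above.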


Failure Scenarios and Fault Tolerance

Masterless ring storage is designed to survive hardware failures.

1. Partition Tombstone Saturation

In wide-column stores, deleting a message does not instantly erase it from disk. Instead, the database writes a marker called a Tombstone over the old record.

When a user scrolls through a channel's history, ScyllaDB must scan the index, reading through these tombstones. If a bot writes and deletes 100,000 messages in a channel, a query for "the last 50 messages" will force the database node to read 100,000 tombstones in memory.

Fault Tolerance Strategy:

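  • We configure a strict threshold: tombstone_failure_threshold = 100000. If a single query scans more than 100,000 tombstones, the node aborts it rather than letting it exhaust memory. This setting comes from Apache Cassandra's cassandra.yaml, where 100,000 is the default; confirm how your ScyllaDB version treats it, since ScyllaDB has no JVM and its tombstone guardrails may differ.
  • Compaction purges tombstones only after gc_grace_seconds has elapsed, so we keep repairs running within that window and let compaction reclaim the space. A TTL expiry itself produces a tombstone; it is not what allows one to be purged.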
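
The effect of the threshold can be sketched as a read loop that counts tombstones while it looks for live rows. This is a conceptual model, not the storage engine's actual read path; `readLive` and the `Cell` shape are hypothetical.

```typescript
interface Cell {
  id: number;
  deleted: boolean; // true marks a tombstone
}

// Scans a partition in clustering order and returns up to `limit` live cells.
// Aborts once the number of tombstones scanned exceeds the failure threshold.
function readLive(partition: Cell[], limit: number, failureThreshold: number): Cell[] {
  const live: Cell[] = [];
  let tombstonesScanned = 0;

  for (const cell of partition) {
    if (cell.deleted) {
      tombstonesScanned++;
      if (tombstonesScanned > failureThreshold) {
        throw new Error('Query aborted: scanned more than ' + failureThreshold + ' tombstones');
      }
      continue;
    }
    live.push(cell);
    if (live.length === limit) {
      break;
    }
  }
  return live;
}
```

A query for a handful of rows can therefore fail outright when a long run of deleted messages sits in front of the live ones, which is why bulk deletes need special handling.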

2. Node Failures and Active Anti-Entropy

If one of our ScyllaDB nodes crashes during peak write ingestion, our Masterless write path remains operational.

sequenceDiagram
    participant Client as API Client Driver
    participant NodeA as Storage Node A (Coordinator)
    participant NodeB as Storage Node B (Replica)
    participant NodeC as Storage Node C (Replica - Crashed!)

    Client->>NodeA: 1. Write Message (Quorum)
    NodeA->>NodeB: 2. Replicate Message
    Note over NodeA,NodeC: Node C is offline!
    NodeA--xNodeC: 2. Replicate Message (Fails)
    NodeA->>NodeA: 3. Write Hinted Handoff to local disk
    NodeA-->>Client: 4. Write Confirmed (Quorum Met 2/3)
    
    Note over NodeC: Node C boots back online
    NodeA->>NodeC: 5. Playback Hinted Handoff
    NodeC->>NodeC: 6. Apply missed writes

Fault Tolerance Strategy:

  • Hinted Handoffs: When a replica node is offline, the writing node stores the write payload locally on disk as a "Hint." Once the crashed node boots back online, the writing node plays back these hints, restoring consistency.
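  • Anti-Entropy Repair (Merkle Trees): A scheduled repair compares replicas and streams only the ranges that differ. Apache Cassandra does this by exchanging Merkle trees (hash trees over partition ranges), while ScyllaDB's row-level repair compares hashes at a finer granularity. Either way, divergent data is synchronized without transferring full datasets over the network.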
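
The write path in the sequence diagram can be modeled with plain objects. This is a simplified sketch under stated assumptions: replicas are in-memory arrays, hints are kept in a list instead of on disk, and the function names are illustrative.

```typescript
interface Replica {
  name: string;
  up: boolean;
  data: string[];
}

interface Hint {
  target: string;
  payload: string;
}

// The coordinator writes to every live replica and records a hint for each one that is down.
// Returns true when enough replicas acknowledged the write.
function quorumWrite(
  replicas: Replica[],
  hints: Hint[],
  payload: string,
  requiredAcks: number
): boolean {
  let acks = 0;
  for (const replica of replicas) {
    if (replica.up) {
      replica.data.push(payload);
      acks++;
    } else {
      hints.push({ target: replica.name, payload: payload });
    }
  }
  return acks >= requiredAcks;
}

// Replays stored hints to replicas that are back online; returns the hints still pending.
function replayHints(replicas: Replica[], hints: Hint[]): Hint[] {
  const pending: Hint[] = [];
  for (const hint of hints) {
    const target = replicas.filter(function (r) { return r.name === hint.target; })[0];
    if (target && target.up) {
      target.data.push(hint.payload);
    } else {
      pending.push(hint);
    }
  }
  return pending;
}
```

Hints only cover short outages; a replica that stays down for long still needs a full repair to catch up.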

Verbal Script & Mock Interview

Mock Interview Dialogue

Interviewer: "Welcome! Let's jump into a classic distributed storage problem: How would you design the storage architecture for a messaging platform like Discord that handles trillions of messages? Specifically, how do you handle data skew caused by massive public channels without degrading query latencies?"

Candidate: *"To scale a messaging platform to trillions of messages while neutralizing the data skew of public channels, I would adopt a masterless, wide-column database architecture like ScyllaDB.

If we use a naive data model where the partition key is simply channel_id, a high-traffic community like the Fortnite channel will force its entire message history—potentially terabytes of data—onto a single physical storage node. This triggers severe write hotspots and catastrophic LSM compaction storms that exhaust host disk I/O.

To prevent this, I would implement Bucketized Sharding. I would use a composite partition key, (channel_id, bucket_id), with message_id as the clustering key, which gives the primary key ((channel_id, bucket_id), message_id). The bucket_id identifies a sequential block of messages derived from their Snowflake timestamp. For high-volume channels, a bucket could hold about 100,000 messages.

When a bucket fills up, the application increments the bucket_id and starts writing to a new partition. Because the partition token is a hash of both channel_id and bucket_id, consistent hashing scatters these buckets pseudo-randomly across the nodes in our ScyllaDB ring. This caps the size of any single partition and spreads a massive channel's history across the cluster instead of pinning it to one node, although each channel's newest bucket still absorbs its live writes."*

Interviewer: "Excellent. But by scattering a single channel's messages across multiple physical nodes, you've compromised on read latency. When a user opens a channel, how do you fetch their chat history without conducting a slow, expensive scatter-gather query across all nodes?"

Candidate: *"We explicitly avoid scatter-gather queries by keeping our query routing token-aware on the client-side. When a user opens a channel, our application service knows the current timestamp and Snowflake ID sequence. It calculates the active bucket_id for that channel and queries exactly that partition.

If the active partition contains fewer messages than the user requested (e.g., the user wants 50 messages, but the current bucket only has 10), the application client driver uses non-blocking asynchronous calls to query the previous sequential bucket_id partition (bucket_id - 1).

Because we calculate the exact bucket keys on the client side, each read targets a single partition on known replica nodes instead of fanning out across the ring, which keeps read latency in the low milliseconds."*

Interviewer: "That is a very clean read-path optimization. How do you handle message deletions in this wide-column setup? What operational issues can arise from high delete rates?"

Candidate: *"In LSM-Tree engines, deletions do not physically erase records immediately; they write a marker called a Tombstone. When the read path scans a partition, it must load these tombstones into memory to determine whether a message was deleted.

If a channel experiences a high volume of deletions, for example from spam-cleanup bots, a query for the latest messages may scan thousands of tombstones, consuming node memory and causing read timeouts.

To prevent this, I would establish two safeguards. First, we set a tombstone failure threshold (tombstone_failure_threshold in Cassandra-style configuration) of 100,000, aborting queries that scan beyond this limit to protect the node's memory. Second, we organize our delete workflows around time-bounded buckets: once every message in a bucket is dead, we delete the whole partition, which writes a single partition-level tombstone instead of one tombstone per message and leaves far less for reads and compaction to process."*

Interviewer: "Perfect. That shows an outstanding grasp of LSM engine mechanics and real-world system resilience. Let's proceed to the next phase!"

