System Design: Designing a Distributed Message Queue (Kafka)

Master the internals of a high-throughput, log-structured distributed message queue. Learn about partitioning, replication topologies, zero-copy page cache optimization, consumer group rebalances, and ISR durability models.

Key Takeaways

  • **Log-Structured Append-Only Disk Storage:** Kafka gains its throughput by appending sequentially to partition log files, replacing slow random disk I/O with sequential operations that are orders of magnitude faster and are largely absorbed by the OS page cache.
  • **Zero-Copy Kernel Optimization:** Using the `sendfile` system call, Kafka transfers data directly from the OS Page Cache to the NIC Buffer, avoiding copies through user space and the extra context switches they require.
  • **ISR (In-Sync Replicas) Coordination:** Fault tolerance is managed through strict leader-follower partitions coordinated by a consensus controller (ZooKeeper or KRaft), enforcing configurable durability levels via producer ACKs.

Recommended Prerequisites

  • System Design Module 1: Scalability

1. Core Requirements & Scale Constraints

A distributed message queue is the central nervous system of modern microservice architectures, responsible for decoupling system components, buffering spikes, and streaming real-time events.

Designing a platform like Apache Kafka requires a shift from a standard transactional database to a log-structured distributed commit log optimized for sequential file-system access.

Functional Requirements

  • Publish Message (Produce): Producers can append messages (key-value payloads) to a specific topic.
  • Consume Message (Fetch): Consumers can read messages from a topic partition in offset order, starting from a specific offset.
  • Topic Partitioning: Topics are divided into multiple partitions distributed across a cluster of brokers to support horizontal scaling and high concurrency.
  • Consumer Groups: Multiple consumer instances can group together to consume a topic in parallel, where each partition is assigned to exactly one consumer inside the group.
  • Message Retention: Messages are persisted on disk and can be replayed based on time-based or size-based retention policies.

Non-Functional Constraints (SLAs)

  • Ultra-High Throughput: Must process up to 10 Terabytes of event writes per day (avg 115 MB/sec write throughput).
  • Low Write Latency: Producer publish latency must be $\le 10\text{ms}$ at $p99$.
  • High Durability: Zero message loss guarantees for fully committed writes.
  • High Availability: $99.999\%$ uptime SLA under broker node crashes or network partitions.

Back-of-the-Envelope Estimates

  • Write Scale:
    • Assume average message size is 1 KB.
    • $10\text{ TB/day} \div 1\text{ KB/msg} \approx 10\text{ Billion messages per day}$.
    • Average Write Rate: $10\text{B} \div 86400\text{s} \approx 115,740\text{ writes per second}$.
    • Peak Write Rate (3x multiplier): $\approx 350,000\text{ writes per second}$.
  • Storage Scaling (Retention):
    • Storing 10 TB/day with a 7-day retention policy: $10\text{ TB} \times 7 = 70\text{ TB}$ raw storage.
    • With a Replication Factor of 3 for fault tolerance: $70\text{ TB} \times 3 = 210\text{ TB}$ of storage across the cluster.
  • Network Bandwidth:
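    • Ingress (Writes): Producers send $115\text{ MB/sec}$ into the cluster. With a replication factor of 3, every byte is also copied to two followers, so total write traffic across the brokers is $115\text{ MB/sec} \times 3 = 345\text{ MB/sec}$ (115 from producers plus 230 of inter-broker replication).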
    • Egress (Reads): Assuming 5 different consumer groups reading the same topic: $115\text{ MB/sec} \times 5 = 575\text{ MB/sec}$ outbound read traffic.
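
The arithmetic above can be re-derived with a short script. This is a minimal sketch that assumes decimal units (1 TB = 10^12 bytes, 1 KB = 10^3 bytes); the function name `estimate` and its parameter names are illustrative, not part of any Kafka tooling.

```python
def estimate(daily_tb=10, msg_kb=1, retention_days=7,
             replication=3, consumer_groups=5, peak_multiplier=3):
    """Back-of-the-envelope capacity model using decimal units."""
    seconds_per_day = 86_400
    daily_bytes = daily_tb * 10**12
    msgs_per_day = daily_bytes // (msg_kb * 10**3)
    avg_writes_per_sec = msgs_per_day / seconds_per_day
    avg_mb_per_sec = daily_bytes / seconds_per_day / 10**6
    return {
        "msgs_per_day": msgs_per_day,
        "avg_writes_per_sec": avg_writes_per_sec,
        "peak_writes_per_sec": avg_writes_per_sec * peak_multiplier,
        "avg_mb_per_sec": avg_mb_per_sec,
        # Retained data, counted once per replica.
        "storage_tb": daily_tb * retention_days * replication,
        # Producer traffic plus follower replication traffic.
        "cluster_write_mb_per_sec": avg_mb_per_sec * replication,
        # Each consumer group reads the full stream once.
        "egress_mb_per_sec": avg_mb_per_sec * consumer_groups,
    }


if __name__ == "__main__":
    for name, value in estimate().items():
        print(f"{name}: {value:,.0f}")
```

The script yields slightly higher bandwidth figures than the bullets because the text rounds 115.74 MB/sec down to 115 before multiplying.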

2. API Design & Core Contracts

Kafka operates over a custom TCP binary protocol rather than HTTP/REST to minimize serialization and header overhead. Below are logical representations of the core RPC APIs.

Produce API (Publish Message)

// Logical representation of the binary Produce Request
message ProduceRequest {
  string client_id = 1;
  int32 required_acks = 2; // -1 = ALL, 0 = NONE, 1 = LEADER
  int32 timeout_ms = 3;
  
  message TopicData {
    string topic_name = 1;
    message PartitionData {
      int32 partition_id = 1;
      bytes record_set = 2; // Batch of binary serialized messages
    }
    repeated PartitionData partitions = 2;
  }
  repeated TopicData topics = 4;
}

// Sample JSON representation of the same request sent through an HTTP REST gateway (proxy)
POST /v1/topics/order-events/produce
Content-Type: application/json

{
  "records": [
    {
      "key": "user_897123",
      "value": {
        "order_id": "ord_9082348",
        "amount": 128.50,
        "items": ["sku_1", "sku_2"]
      }
    }
  ]
}

Fetch API (Read Messages)

message FetchRequest {
  int32 replica_id = 1; // -1 for standard consumers
  int32 max_wait_ms = 2;
  int32 min_bytes = 3;
  
  message TopicFetch {
    string topic_name = 1;
    message PartitionFetch {
      int32 partition_id = 1;
      int64 fetch_offset = 2; // Starting offset
      int32 max_bytes = 3;
    }
    repeated PartitionFetch partitions = 2;
  }
  repeated TopicFetch topics = 4;
}

3. High-Level Design (HLD)

The core architecture leverages a clustered topology coordinated by a metadata consensus service (KRaft or ZooKeeper). Topics are split into physical partition logs that are distributed across a fleet of active brokers.

flowchart TD
    Producers[Producers Client Fleet] -->|TCP Write Batch to partition leader| Broker1[Broker 1 - Leader Part 0]
    Producers --> Broker2[Broker 2 - Leader Part 1]
    Producers --> Broker3[Broker 3 - Leader Part 2]
    
    subgraph "Kafka Broker Cluster"
        subgraph "Broker Node 1"
            B1L[Partition 0 - Leader]
            B1F[Partition 1 - Follower]
        end
        subgraph "Broker Node 2"
            B2L[Partition 1 - Leader]
            B2F[Partition 2 - Follower]
        end
        subgraph "Broker Node 3"
            B3L[Partition 2 - Leader]
            B3F[Partition 0 - Follower]
        end
    end
    
    %% Replication path
    B1L -->|Replicate ISR| B3F
    B2L -->|Replicate ISR| B1F
    B3L -->|Replicate ISR| B2F
    
    %% Coordination
    KRaft[(KRaft Metadata Cluster)] <-->|Leader Election / Metadata Sync| Broker1
    KRaft <--> Broker2
    KRaft <--> Broker3
    
    %% Consumers
    Consumers[Consumer Group] -->|TCP Fetch Request| Broker1
    Consumers --> Broker2
    Consumers --> Broker3

4. Low-Level Design (LLD) & Data Models

Log Segment Storage Internals

A partition is not a single giant file. It is broken down into Log Segments (typically 1 GB files) saved in the broker's local file system. Each partition segment consists of three physical files:

  1. 00000000000000000000.log: The actual binary messages appended sequentially.
  2. 00000000000000000000.index: A sparse index matching offsets to physical byte positions in the .log file.
  3. 00000000000000000000.timeindex: A sparse index matching timestamps to offsets for fast time-based message seeking.

/var/lib/kafka/data/
  └── order-events-0/               (Topic: order-events, Partition: 0)
      ├── 00000000000000000000.log  (Sequential message file)
      ├── 00000000000000000000.index (Sparse index)
      └── 00000000000000000000.timeindex
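
Segment files are named after the offset of the first message they contain (the base offset), zero-padded to 20 digits, which is why the first segment is all zeros. The sketch below shows how a broker could map an offset to its segment with a binary search over the base offsets; `segment_filename` and `find_segment` are hypothetical helper names for illustration.

```python
from bisect import bisect_right


def segment_filename(base_offset: int, suffix: str = "log") -> str:
    """Format a segment file name: 20-digit zero-padded base offset."""
    return f"{base_offset:020d}.{suffix}"


def find_segment(base_offsets: list[int], target_offset: int) -> int:
    """Return the base offset of the segment that holds target_offset.

    base_offsets must be sorted ascending, one entry per segment.
    """
    i = bisect_right(base_offsets, target_offset) - 1
    if i < 0:
        raise ValueError("offset is older than the earliest retained segment")
    return base_offsets[i]


if __name__ == "__main__":
    segments = [0, 1000, 2000]  # three segments of a partition
    base = find_segment(segments, 1500)
    print(segment_filename(base))           # segment holding offset 1500
    print(segment_filename(base, "index"))  # its sparse index file
```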

Sparse Index Layout

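Instead of indexing every single message, which would make the index nearly as large as the log itself, Kafka adds one index entry roughly every $N$ bytes of log data (e.g., every 4 KB, the default of log.index.interval.bytes).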

Index File (.index)                  Log File (.log)
[Offset 0  -> Position 0]            | Offset 0: Key="A" | Payload="..." (4 KB) |
[Offset 12 -> Position 4096]   --->  | Offset 12: Key="B" | Payload="..." (3 KB) |
[Offset 24 -> Position 7168]         | Offset 24: Key="C" | Payload="..." (5 KB) |

Lookup Flow: If a consumer fetches offset 15, the broker performs a binary search in the index file to locate the largest index offset $\le 15$ (which is Offset 12 at Position 4096). The broker then seeks directly to position 4096 in the .log file and scans sequentially until it finds offset 15. Because the index is sparse, it stays small enough to be memory-mapped and served entirely from RAM.
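
The lookup flow can be sketched in a few lines. This is a simplified model under stated assumptions: the log is an in-memory list of `(offset, byte_position)` pairs rather than a file, records are a fixed 300 bytes, and the names `build_sparse_index` and `lookup` are illustrative.

```python
from bisect import bisect_left, bisect_right

INDEX_INTERVAL_BYTES = 4096  # add an index entry about every 4 KB


def build_sparse_index(records):
    """records: list of (offset, byte_position) in file order."""
    index, last_indexed_pos = [], None
    for offset, position in records:
        if last_indexed_pos is None or position - last_indexed_pos >= INDEX_INTERVAL_BYTES:
            index.append((offset, position))
            last_indexed_pos = position
    return index


def lookup(index, records, target_offset):
    """Binary-search the sparse index, then scan the log forward."""
    # Largest index entry whose offset is <= target_offset.
    i = bisect_right(index, (target_offset, float("inf"))) - 1
    if i < 0:
        raise KeyError(target_offset)
    _, start_pos = index[i]
    # "Seek" to start_pos, then scan sequentially.
    positions = [pos for _, pos in records]
    start = bisect_left(positions, start_pos)
    for offset, position in records[start:]:
        if offset == target_offset:
            return position
        if offset > target_offset:
            break
    raise KeyError(target_offset)


if __name__ == "__main__":
    log = [(o, o * 300) for o in range(40)]  # 40 records, 300 bytes each
    idx = build_sparse_index(log)
    print(idx)                   # only a handful of entries for 40 records
    print(lookup(idx, log, 15))  # byte position of offset 15
```

The scan after the seek touches at most one index interval of data, which is the cost paid for keeping the index small.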

The Zero-Copy Data Transfer Mechanic

In typical systems, reading data from disk and sending it over the network requires 4 context switches and 4 memory copies:

Disk -> Kernel Buffer -> User-space Application -> Socket Buffer -> NIC Buffer

Kafka bypasses the application space using the Linux sendfile() system call (Zero-Copy):

Disk -> OS Page Cache -> NIC Buffer (using DMA - Direct Memory Access)

sequenceDiagram
    autonumber
    participant Disk as Local Disk
    participant PageCache as OS Page Cache (Kernel)
    participant NIC as Network Interface Card (NIC)
    
    Note over Disk, NIC: ZERO-COPY SEND_FILE FLOW
    Disk->>PageCache: 1. Read block sequentially (DMA Copy)
    Note over PageCache: Data sits in OS Page Cache
    PageCache->>NIC: 2. Sendfile() syscall copies pointer & data directly (DMA Transfer)
    Note over NIC: Bypasses App Heap and Socket Buffer!
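
The same mechanism is reachable from application code. The sketch below uses Python's `socket.sendfile()`, which delegates to `os.sendfile()` where the platform supports it and otherwise falls back to an ordinary read/send loop. The helper names `send_segment` and `demo` are illustrative, and a local socket pair stands in for a broker-to-consumer TCP connection. Kafka's own broker is written in Java and reaches the same syscall through `FileChannel.transferTo`.

```python
import os
import socket
import tempfile


def send_segment(sock: socket.socket, path: str, offset: int, count: int) -> int:
    """Send `count` bytes of a file, starting at `offset`, to a socket.

    With os.sendfile() available, the bytes are not copied through a
    user-space buffer; the kernel moves them to the socket itself.
    """
    with open(path, "rb") as f:
        return sock.sendfile(f, offset=offset, count=count)


def demo() -> bytes:
    # Fake log segment: 4096 bytes of padding, then one small "message".
    with tempfile.NamedTemporaryFile(delete=False) as tmp:
        tmp.write(b"A" * 4096 + b"hello-kafka")
        path = tmp.name
    broker_side, consumer_side = socket.socketpair()
    try:
        sent = send_segment(broker_side, path, offset=4096, count=11)
        return consumer_side.recv(sent)
    finally:
        broker_side.close()
        consumer_side.close()
        os.unlink(path)


if __name__ == "__main__":
    print(demo())
```

Note that the application never inspects the payload, which is why this only works when the on-disk format and the wire format are identical.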

5. Scaling Challenges & System Bottlenecks

The Rebalance Storm Problem

When a consumer joins or leaves a consumer group, or when the number of topic partitions increases, the group coordinator triggers a rebalance. Under the eager rebalance protocol, every consumer in the group stops consuming and gives up its partitions (a Stop-The-World phase) until the new assignment is handed out.

  • The Bottleneck: During a rebalance under a heavy workload, consumers might fail to heartbeat in time, triggering another rebalance in a cascading loop. This loop is the Rebalance Storm.
  • Mitigation:
    • Static Membership: Assign a unique group.instance.id to each consumer. If a consumer restarts and rejoins within its session timeout, it gets its previous partitions back without triggering a full rebalance.
    • Cooperative Sticky Assignor: Revokes only the partitions that actually move to another consumer rather than wiping all active mappings, so unaffected consumers keep processing during the rebalance.
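
To make the difference concrete, the toy model below counts how many partitions are interrupted when a third consumer joins a group of two. It is a deliberately simplified sketch, not Kafka's actual assignor logic: `eager_rebalance` revokes everything and redistributes round-robin, while `cooperative_rebalance` keeps current owners and moves only the surplus.

```python
def eager_rebalance(assignment, members):
    """Revoke every partition, then redistribute round-robin."""
    partitions = sorted(p for owned in assignment.values() for p in owned)
    new = {m: [] for m in members}
    for i, p in enumerate(partitions):
        new[members[i % len(members)]].append(p)
    return new, len(partitions)  # every partition was paused


def cooperative_rebalance(assignment, members):
    """Keep existing owners; move only what is needed to rebalance."""
    new = {m: list(assignment.get(m, [])) for m in members}
    # Partitions whose owner left the group go to the least-loaded member.
    orphans = [p for m, owned in assignment.items() if m not in new for p in owned]
    for p in orphans:
        min(new.values(), key=len).append(p)
    revoked = 0
    while True:
        most = max(new.values(), key=len)
        least = min(new.values(), key=len)
        if len(most) - len(least) <= 1:
            break
        least.append(most.pop())  # one partition changes hands
        revoked += 1
    return new, revoked


if __name__ == "__main__":
    current = {"c1": [0, 1, 2], "c2": [3, 4, 5]}
    members = ["c1", "c2", "c3"]  # c3 just joined
    print(eager_rebalance(current, members))
    print(cooperative_rebalance(current, members))
```

In this model the eager path pauses all six partitions, while the cooperative path moves two and leaves the other four untouched.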

Hot Partitions & Key Distribution

Producers route keyed messages using a partitioner: Hash(key) % total_partitions. If a single partition key (e.g., a very active tenant ID) receives $90\%$ of all messages, the broker leading that partition becomes the bottleneck for CPU, disk, and network, while other brokers sit mostly idle.

  • Mitigation:
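    • Salting Keys: Append a random suffix (e.g., tenant_ID + "_" + random(1..5)) to distribute the load across multiple partitions. The trade-off is that ordering is now guaranteed only within each salted sub-key, and consumers must aggregate across the suffixes.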
    • Custom Partitioner: Implement a fall-through routing system that switches to round-robin mapping if write thresholds on a single partition key are exceeded.
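
A minimal sketch of hash partitioning and key salting follows. It assumes CRC32 as the hash purely for illustration (Kafka's default partitioner uses a murmur2 hash), and `partition_for` and `salted_key` are hypothetical helper names.

```python
import random
import zlib


def partition_for(key: str, num_partitions: int) -> int:
    """Deterministic key -> partition mapping (CRC32 stands in for murmur2)."""
    return zlib.crc32(key.encode("utf-8")) % num_partitions


def salted_key(key: str, buckets: int, rng: random.Random) -> str:
    """Append a random suffix in 1..buckets to spread one hot key."""
    return f"{key}_{rng.randint(1, buckets)}"


if __name__ == "__main__":
    rng = random.Random(7)
    hot = "tenant_42"
    unsalted = {partition_for(hot, 12) for _ in range(1000)}
    salted = {partition_for(salted_key(hot, 5, rng), 12) for _ in range(1000)}
    print("unsalted partitions:", unsalted)  # always a single partition
    print("salted partitions:", salted)      # at most 5 partitions
```

The number of buckets caps how far the hot key can spread, so it should be chosen relative to the partition count and the consumer-side cost of merging the sub-streams.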

6. Operational Trade-offs & CAP Theorem Realities

Durability vs. Write Latency Trade-offs

Every producer trades durability against write latency. The acks parameter sets how many replicas must hold a message before the broker acknowledges the write:

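| acks Setting | Consistency Spectrum | Write Latency | Durability Guarantee |
|---|---|---|---|
| acks = 0 | Highly Available (AP) | Ultra-Low (< 1ms) | Extremely Low: Producer fires and forgets without waiting for any acknowledgment. The message is lost if the send fails or the broker crashes before persisting it. |
| acks = 1 | Balanced | Low (2 - 5ms) | Medium: Leader appends to its local log (page cache) and acknowledges. The message is lost if the leader crashes before followers replicate it. |
| acks = all (-1) | Highly Consistent (CP) | High (8 - 15ms) | Maximal: Leader waits for all In-Sync Replicas (ISR) to replicate before acknowledging. Acknowledged messages survive as long as one in-sync replica survives; pair with min.insync.replicas of at least 2 so the ISR cannot shrink to the leader alone. |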
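
The acknowledgment rules can be expressed as a small decision function. This is a simplified model, not broker code: `produce_outcome` is a hypothetical name, `replicas_with_record` counts ISR members (leader included) that have appended the record, and the "rejected" branch models the broker refusing acks=all writes when the ISR is smaller than min.insync.replicas.

```python
def produce_outcome(acks: int, isr_size: int, replicas_with_record: int,
                    min_insync_replicas: int = 2) -> str:
    """Return what the producer observes for one record."""
    if acks == 0:
        # Fire and forget: the producer never waits for a response.
        return "not-awaited"
    if acks == 1:
        # Only the leader's local append matters.
        return "acked" if replicas_with_record >= 1 else "pending"
    if acks == -1:
        if isr_size < min_insync_replicas:
            # Too few in-sync replicas to honor the durability contract.
            return "rejected"
        return "acked" if replicas_with_record >= isr_size else "pending"
    raise ValueError(f"unsupported acks value: {acks}")


if __name__ == "__main__":
    print(produce_outcome(1, isr_size=3, replicas_with_record=1))   # leader only
    print(produce_outcome(-1, isr_size=3, replicas_with_record=2))  # one follower lagging
    print(produce_outcome(-1, isr_size=1, replicas_with_record=1))  # ISR too small
```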

7. Failure Scenarios & High-Availability Resilience

A. Leader Broker Crash (ISR Re-Election)

What happens if the broker hosting the Leader Partition for order-events-0 dies?

flowchart TD
    BrokerCrash[Broker 1 Leader Crashes] -->|Heartbeat Timeout| Controller[KRaft / Active Controller]
    Controller -->|Promote Follower| SelectReplica[Select healthiest Broker in ISR pool]
    SelectReplica -->|Elect Broker 2| NewLeader[Broker 2 promoted to Partition 0 Leader]
    NewLeader -->|Sync Epoch| UpdateMetadata[Update metadata epoch & broadcast to clients]

Mitigation:

  1. Unclean Leader Election Control: By setting unclean.leader.election.enable = false, we block out-of-sync followers from being promoted to leader. This chooses CP over AP: the partition stays offline until an in-sync replica returns, rather than silently losing committed messages.
  2. Leader Epoch Tracking: Each replica records the Leader Epoch number alongside its log. If a partition leader fails and a new one is elected, messages written to the failed leader that were never replicated are truncated based on the new epoch boundaries when it rejoins as a follower. Consumers also carry the epoch so they can detect such truncation.
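
The election rule and the effect of the unclean flag can be sketched as follows. This is a simplified model that assumes the controller prefers the first live, in-sync broker in the partition's replica list; `elect_leader` is an illustrative name, not a Kafka API.

```python
def elect_leader(replicas, isr, live, unclean=False):
    """Pick a new partition leader, or None if the partition must stay offline.

    replicas: ordered replica assignment for the partition
    isr:      set of broker ids currently in sync
    live:     set of broker ids currently alive
    unclean:  models unclean.leader.election.enable
    """
    for broker in replicas:
        if broker in live and broker in isr:
            return broker  # clean election: no committed data is lost
    if unclean:
        for broker in replicas:
            if broker in live:
                return broker  # available again, but may have lost messages
    return None  # stay offline until an in-sync replica comes back


if __name__ == "__main__":
    # Broker 1 (the old leader) has crashed; brokers 2 and 3 are alive.
    print(elect_leader([1, 2, 3], isr={1, 2}, live={2, 3}))              # in-sync follower
    print(elect_leader([1, 2, 3], isr={1}, live={2, 3}))                 # no candidate
    print(elect_leader([1, 2, 3], isr={1}, live={2, 3}, unclean=True))   # lagging follower
```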

B. Controller Split-Brain (Consensus Failure)

In an active cluster, one broker acts as the active Controller (electing leaders, managing metadata). If a network partition cuts off the controller, the cluster might elect a second controller, creating a "split-brain" state where two nodes issue conflicting leader promotions.

Mitigation:

  1. KRaft Consensus Quorum: Modern Kafka clusters utilize KRaft (Raft-based metadata replication). Metadata actions require a strict majority quorum ($> N/2$) of controller nodes.
  2. Epoch Monotonic Tokens: Every controller action must include the monotonically increasing Epoch Token. If a partition leader receives an instruction from an older controller epoch, it rejects the command instantly, resolving the partition clash safely.
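
Both mitigations reduce to simple checks, sketched below as a toy model. The class `Broker`, its method `handle_leader_change`, and the function `has_quorum` are hypothetical names for illustration and do not mirror Kafka's internal classes.

```python
def has_quorum(votes: int, controller_count: int) -> bool:
    """A metadata action needs a strict majority of controller nodes."""
    return votes > controller_count // 2


class Broker:
    """Tracks the highest controller epoch seen and fences older ones."""

    def __init__(self):
        self.controller_epoch = 0
        self.leaders = {}  # partition -> leader broker id

    def handle_leader_change(self, controller_epoch, partition, leader) -> bool:
        if controller_epoch < self.controller_epoch:
            return False  # stale controller: reject the command
        self.controller_epoch = controller_epoch
        self.leaders[partition] = leader
        return True


if __name__ == "__main__":
    broker = Broker()
    print(broker.handle_leader_change(5, "order-events-0", leader=2))  # accepted
    # A partitioned-off controller still believes it is in epoch 4.
    print(broker.handle_leader_change(4, "order-events-0", leader=3))  # rejected
    print(broker.leaders)
    print(has_quorum(votes=1, controller_count=3))  # minority side cannot act
```

A controller isolated on the minority side of a partition fails the quorum check, and any command it sent before noticing is fenced by the epoch comparison.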

8. Candidate Verbal Script (Interview Guide)

Interviewer: "You mentioned that Kafka persists all data to disk. How does a database that writes every message to mechanical hard drives or standard block storage achieve millions of writes per second without blocking thread execution?"

Candidate: "That is a fundamental design secret of Kafka's storage engine. Writing to disk is only slow when performing Random I/O, which involves physical disk seeks or index tree updates. Kafka entirely avoids random write patterns by structuring its partitions as immutable, append-only logs.

Sequential disk writes are fast because they avoid seeks and because the OS kernel performs aggressive read-ahead caching and write-behind buffering. Sequential throughput reaches hundreds of megabytes per second even on SATA mechanical drives, and gigabytes per second on modern NVMe drives, which is orders of magnitude faster than random I/O on the same hardware.

Furthermore, Kafka doesn't block the write path on physical disk flushes. When a producer publishes a message, it is written to the OS Page Cache (RAM). The kernel's writeback threads then flush the dirty pages to the physical disk asynchronously (governed by settings such as vm.dirty_background_ratio). Until that flush happens, durability comes from replication across brokers rather than from a synchronous fsync.

Because reads fetch data directly from the OS Page Cache as well, the physical disk is frequently bypassed completely during active streaming. This combination of sequential append-only logs, OS Page Cache utilization, and Zero-Copy network streaming is what allows Kafka to handle massive ingestion volumes with low, single-digit-millisecond latencies."
