Distributed Snapshots: Chandy-Lamport Algorithm

How to capture the state of an entire distributed system without stopping traffic. Deep dive into Chandy-Lamport.

Key Takeaways

  • checkpointing long-running stream processors
  • debugging incident-time global state
  • recovery after node failures

How do you capture a single, consistent snapshot of the state of an entire distributed system while it is running under heavy traffic, without freezing operations? In a single-machine system, we can easily capture state by pausing executing threads or using OS-level copy-on-write mechanisms (fork).

However, in a distributed system, we face fundamental barriers: there is no global shared memory, no synchronized physical clocks, and message transmission delays are completely unpredictable. If you simply ask every node to save its state at exactly 12:00 PM, network lag and clock drift will guarantee that your snapshot is inconsistent, recording messages that have been received but never sent, or vice versa.

K. Mani Chandy and Leslie Lamport solved this problem in 1985 with the Chandy-Lamport algorithm. This decentralized, marker-passing algorithm captures a causally consistent global state (a consistent cut) without pausing the active stream of transaction traffic. This guide walks through the core mathematics, execution mechanics, high-level architecture, a low-level Java model, and the real-world production scaling bottlenecks of distributed snapshots.


System Requirements and Goals

Designing a distributed snapshot orchestration framework for enterprise stream processors (like Apache Flink or Spark Streaming) requires aligning functional goals with rigorous physical constraints.

1. Functional Requirements

  • Decentralized Initiation: Any arbitrary process in the distributed mesh must be able to initiate a global snapshot without relying on a single central master node.
  • Complete State Capture: Capture both the internal, in-memory states of all active processes AND the in-flight states of the communication channels (messages that are sent but have not yet been processed).
  • Historical Registry: Register and catalog snapshots in a persistent metadata repository, enabling users to rollback or audit the global state at specific transaction epochs.

2. Non-Functional Constraints

  • Non-Blocking Execution (Zero-Pause): The snapshot must run asynchronously in the background. It must never freeze normal transaction execution or increase process latency significantly.
  • Causal Consistency (Consistent Cut): The captured global state must represent a causally consistent cut. If an event $B$ is included in the snapshot and an event $A$ causally precedes it ($A \rightarrow B$), then $A$ must also be included.
  • Bounded Resource Overhead: Minimally impact CPU cycles and channel bandwidth. Ensure that local recording buffers do not exhaust node heap RAM.

3. Sizing and Capacity Math

Let's model a distributed stream processing cluster under high load:

  • Cluster Size: $100\text{ nodes}$ (processes).
  • Communication Channels: Fully connected mesh topology, where each node has $99$ uni-directional input/output channels. $$\text{Total Channels} = N \times (N - 1) = 100 \times 99 = 9,900 \text{ channels}$$
  • Ingestion Throughput: $50,000$ messages/second per channel.
  • Average Message Size: $200\text{ Bytes}$.
  • In-Flight Recording Buffer Memory Cost: If a network hiccup delays marker propagation by $2\text{ seconds}$, a process must buffer the incoming messages on its input channels:
    $$\text{Buffered Messages/Channel} = 50,000 \text{ messages/s} \times 2 \text{ seconds} = 100,000 \text{ messages}$$
    $$\text{RAM Cost/Channel} = 100,000 \times 200 \text{ Bytes} = 20 \text{ MB}$$
    $$\text{Total Node Memory Overhead} = 99 \text{ input channels} \times 20 \text{ MB} \approx 1.98 \text{ GB of RAM}$$
    This math shows that without a strict bound on channel recording windows, or backpressure-based flow control, a process can exceed its heap limit and fail with a JVM OutOfMemoryError (OOM) during snapshotting.
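
The sizing arithmetic above can be reproduced with a small calculator. This is a minimal sketch: the class and method names are hypothetical, and the inputs are the example's own assumptions (full mesh, uniform per-channel rate, decimal megabytes).

```java
// Sketch only: reproduces the capacity estimate from the example above.
final class RecordingBufferSizing {

    /** Bytes recorded on one input channel while a marker is delayed. */
    static long bytesPerChannel(long messagesPerSecond, long delaySeconds, long messageBytes) {
        return messagesPerSecond * delaySeconds * messageBytes;
    }

    /** Bytes recorded on one node across all of its input channels. */
    static long bytesPerNode(int nodes, long messagesPerSecond, long delaySeconds, long messageBytes) {
        int inputChannels = nodes - 1; // full mesh: one input channel per peer
        return inputChannels * bytesPerChannel(messagesPerSecond, delaySeconds, messageBytes);
    }

    public static void main(String[] args) {
        long perChannel = bytesPerChannel(50_000, 2, 200);
        long perNode = bytesPerNode(100, 50_000, 2, 200);
        System.out.println("Per channel (bytes): " + perChannel);
        System.out.println("Per node (bytes): " + perNode);
    }
}
```

Changing the delay argument makes the linear relationship explicit: every extra second of marker delay adds roughly 1 GB per node at these rates.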

API Design and Interface Contracts

The snapshot coordinator and processing nodes communicate using a structured API. Below are the REST endpoints exposed by the controller and the gRPC protocol defining process interactions.

1. Trigger Global Snapshot

POST /v1/snapshots/trigger

Request Payload:

{
  "initiated_by_node_id": "worker-node-42",
  "snapshot_type": "incremental",
  "target_storage_uri": "s3://company-checkpoints/production-stream-01/"
}

Response Payload (202 Accepted):

{
  "snapshot_id": "snap-9874-abcde",
  "status": "INITIATED",
  "initiated_at": "2026-05-23T08:12:00Z",
  "estimated_completion_ms": 1500
}

2. Fetch Snapshot Metadata

GET /v1/snapshots/snap-9874-abcde

Response Payload (200 OK):

{
  "snapshot_id": "snap-9874-abcde",
  "status": "COMPLETED",
  "duration_ms": 1120,
  "total_size_bytes": 10737418240,
  "nodes_completed": 100,
  "nodes_failed": 0,
  "checkpoint_manifest_uri": "s3://company-checkpoints/production-stream-01/snap-9874-abcde.json"
}
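
As an illustration of how a client might call the two endpoints above, here is a hedged sketch that only builds the requests with the JDK's `java.net.http` API (Java 11+). The base URL and the `SnapshotApiRequests` class are hypothetical; the lesson does not name the coordinator's host, and a real client would use a JSON library rather than string concatenation.

```java
import java.net.URI;
import java.net.http.HttpRequest;

final class SnapshotApiRequests {
    // Hypothetical host: an assumption for illustration only.
    static final String BASE_URL = "http://snapshot-coordinator.example.internal";

    /** Builds the POST /v1/snapshots/trigger request. Values are not JSON-escaped in this sketch. */
    static HttpRequest triggerSnapshot(String nodeId, String snapshotType, String storageUri) {
        String body = "{"
            + "\"initiated_by_node_id\":\"" + nodeId + "\","
            + "\"snapshot_type\":\"" + snapshotType + "\","
            + "\"target_storage_uri\":\"" + storageUri + "\""
            + "}";
        return HttpRequest.newBuilder(URI.create(BASE_URL + "/v1/snapshots/trigger"))
            .header("Content-Type", "application/json")
            .POST(HttpRequest.BodyPublishers.ofString(body))
            .build();
    }

    /** Builds the GET /v1/snapshots/{id} request. */
    static HttpRequest fetchSnapshot(String snapshotId) {
        return HttpRequest.newBuilder(URI.create(BASE_URL + "/v1/snapshots/" + snapshotId))
            .GET()
            .build();
    }
}
```

Because the trigger endpoint answers 202 Accepted, a caller would typically poll the metadata endpoint until `status` leaves `INITIATED`.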

3. Internal gRPC Protocol

Processes use these gRPC service contracts for state synchronization:

syntax = "proto3";

package distributed.snapshot;

service SnapshotService {
  rpc SendMarker(MarkerRequest) returns (MarkerResponse);
  rpc FlushChannelState(FlushRequest) returns (FlushResponse);
}

message MarkerRequest {
  string snapshot_id = 1;
  string sender_node_id = 2;
  int64 timestamp_ns = 3;
}

message MarkerResponse {
  bool accepted = 1;
}

message FlushRequest {
  string snapshot_id = 1;
  string node_id = 2;
  string channel_id = 3;
  bytes state_data = 4;
}

message FlushResponse {
  bool status = 1;
}

High-Level Design Architecture

Distributed snapshotting decouples process computation from state serialization. The architecture relies on uni-directional, reliable FIFO channels between the processing nodes.

1. Consistent vs. Inconsistent Cuts

In distributed systems, we visualize state using space-time diagrams. A cut is a slice across the system's execution timeline.

  • A Consistent Cut represents a valid state: no message received in the cut is shown as sent from outside the cut (no effects without causes).
  • An Inconsistent Cut contains a message received by a process, but the sending process has no record of sending it, violating physical causality.
graph TD
    %% Consistent Cut Visualisation
    subgraph "Consistent Cut (Valid)"
        P1_c1[P1 State Saved] -->|Send Msg 1| P2_c1[P2 State Saved]
        P1_c1 -->|Send Msg 2| Channel1{In-flight Buffer}
        Channel1 -->|Deliver Msg 2| P2_c2[P2 Post-Snapshot]
    end
    
    %% Inconsistent Cut Visualisation
    subgraph "Inconsistent Cut (Invalid)"
        P3_c2[P3 Post-Snapshot] -->|Send Msg 3| P4_c1[P4 State Saved]
    end
    
    style P1_c1 fill:#1e3a8a,stroke:#3b82f6,stroke-width:2px,color:#fff
    style P2_c1 fill:#1e3a8a,stroke:#3b82f6,stroke-width:2px,color:#fff
    style P4_c1 fill:#ef4444,stroke:#ef4444,stroke-width:2px,color:#fff
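
The consistent-cut rule can also be expressed as a small check. The sketch below assumes a simplified model that is not part of the lesson: each process numbers its local events from 1, a cut is the count of events included per process, and each message records the event index at which it was sent and received. `CutChecker` and `Message` are hypothetical names.

```java
import java.util.List;
import java.util.Map;

final class CutChecker {

    /** One message, identified by the local event index of its send and of its receive. */
    static final class Message {
        final String sender;
        final int sendEvent;
        final String receiver;
        final int receiveEvent;

        Message(String sender, int sendEvent, String receiver, int receiveEvent) {
            this.sender = sender;
            this.sendEvent = sendEvent;
            this.receiver = receiver;
            this.receiveEvent = receiveEvent;
        }
    }

    /**
     * cut maps each process to how many of its local events lie inside the cut.
     * The cut is consistent if no message is received inside it but sent outside it.
     */
    static boolean isConsistent(Map<String, Integer> cut, List<Message> messages) {
        for (Message m : messages) {
            boolean received = m.receiveEvent <= cut.getOrDefault(m.receiver, 0);
            boolean sent = m.sendEvent <= cut.getOrDefault(m.sender, 0);
            if (received && !sent) {
                return false; // effect without cause
            }
        }
        return true;
    }
}
```

A message that is sent inside the cut but not yet received is allowed; it is exactly the in-flight channel state that the snapshot has to record.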

2. Marker Propagation Sequence

The Chandy-Lamport algorithm uses special control messages called Markers. When process $P_1$ decides to initiate a snapshot, it:

  1. Records its own local state.
  2. Immediately broadcasts a Marker to all outbound channels before sending any more data messages.
  3. Begins recording all incoming messages on its input channels.

When process $P_2$ receives the Marker along channel $C_{1\rightarrow2}$:

  • If it has not yet saved its local state, it saves its state, forwards the Marker to all of its own outbound channels, sets $C_{1\rightarrow2}$ state as empty, and begins recording all other incoming channels.
  • If it has already saved its local state, it stops recording channel $C_{1\rightarrow2}$. The state of channel $C_{1\rightarrow2}$ is recorded as all the messages received along it since $P_2$ originally saved its state.
sequenceDiagram
    autonumber
    participant P1 as Process 1 (Initiator)
    participant Channel as FIFO Channel (P1 -> P2)
    participant P2 as Process 2 (Receiver)
    
    P1->>P1: Save Local State (L1)
    P1->>Channel: Send Marker (Snap-1)
    P1->>Channel: Send Normal Message (Msg A)
    Note over Channel: In-flight Queue: [Marker, Msg A]
    
    Channel->>P2: Deliver Marker (Snap-1)
    activate P2
    P2->>P2: Save Local State (L2)
    P2->>P2: Set Channel (P1->P2) state to EMPTY
    deactivate P2
    
    Channel->>P2: Deliver Normal Message (Msg A)
    Note over P2: Msg A processed after snapshot. Not recorded.

Low-Level Design & Component Mechanics

To build an executable simulation of the Chandy-Lamport algorithm, we design a Java model detailing process states, input/output channels, and state transition loops.

1. Java Process & Channel Model

package com.codesprintpro.snapshot;

import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

public class DistributedProcess {
    private final String id;
    private final Map<String, List<String>> incomingBuffers = new ConcurrentHashMap<>();
    private final Map<String, Boolean> isRecording = new ConcurrentHashMap<>();
    private final Map<String, List<String>> channelStates = new ConcurrentHashMap<>();
    private final List<String> outboundChannels = new CopyOnWriteArrayList<>();
    private String localState = "INITIAL_STATE";
    private boolean hasSavedState = false;
    private String activeSnapshotId = null;

    public DistributedProcess(String id) {
        this.id = id;
    }

    public synchronized void registerOutboundChannel(String targetProcessId) {
        outboundChannels.add(targetProcessId);
    }

    public synchronized void registerIncomingChannel(String sourceProcessId) {
        incomingBuffers.put(sourceProcessId, new ArrayList<>());
        isRecording.put(sourceProcessId, false);
        channelStates.put(sourceProcessId, new ArrayList<>());
    }

    // 1. Process Initiator Rule
    public synchronized void initiateSnapshot(String snapshotId) {
        this.activeSnapshotId = snapshotId;
        this.localState = "STATE_AT_" + System.currentTimeMillis();
        this.hasSavedState = true;
        System.out.println("Process " + id + " initiated snapshot " + snapshotId + " with local state: " + localState);

        // Turn on recording on all incoming channels
        for (String sourceId : incomingBuffers.keySet()) {
            isRecording.put(sourceId, true);
            channelStates.put(sourceId, new ArrayList<>());
        }

        // Send marker to all outbound channels
        for (String targetId : outboundChannels) {
            sendMarkerMessage(targetId, snapshotId);
        }
    }

    // 2. Marker Receiving Rule
    public synchronized void receiveMarker(String sourceId, String snapshotId) {
        if (!hasSavedState) {
            // First time seeing the marker
            this.activeSnapshotId = snapshotId;
            this.localState = "STATE_AT_" + System.currentTimeMillis();
            this.hasSavedState = true;
            System.out.println("Process " + id + " received marker first time from " + sourceId + ". Saved state: " + localState);

            // Channel on which marker arrived is recorded as EMPTY
            channelStates.put(sourceId, new ArrayList<>());
            isRecording.put(sourceId, false);

            // Turn on recording for all other incoming channels
            for (String otherSourceId : incomingBuffers.keySet()) {
                if (!otherSourceId.equals(sourceId)) {
                    isRecording.put(otherSourceId, true);
                    channelStates.put(otherSourceId, new ArrayList<>());
                }
            }

            // Forward marker to all outbound channels
            for (String targetId : outboundChannels) {
                sendMarkerMessage(targetId, snapshotId);
            }
        } else {
            // Already saved local state. Stop recording the channel on which marker arrived.
            isRecording.put(sourceId, false);
            System.out.println("Process " + id + " already saved state. Stopped recording channel: " + sourceId + ". Channel state contains: " + channelStates.get(sourceId));
        }
    }

    // Process normal data messages. A message arriving on a channel that is
    // being recorded is added to that channel's state AND still processed
    // normally; recording never blocks or skips application processing.
    public synchronized void receiveMessage(String sourceId, String message) {
        if (isRecording.getOrDefault(sourceId, false)) {
            channelStates.get(sourceId).add(message);
            System.out.println("Process " + id + " recorded message on channel " + sourceId + ": " + message);
        }
        System.out.println("Process " + id + " processed message from " + sourceId + ": " + message);
    }

    private void sendMarkerMessage(String targetId, String snapshotId) {
        System.out.println("Process " + id + " sending MARKER (" + snapshotId + ") to " + targetId);
    }
}

2. Algorithmic Mathematical Invariant

The correctness of the Chandy-Lamport snapshot relies on the causal precedence of state transitions. For any run of the algorithm:

  • The recorded state of process $P_i$ represents its state immediately before it sends out its markers.
  • The recorded state of channel $C_{i\rightarrow j}$ is the sequence of messages sent by $P_i$ before $P_i$ sent its marker, but not received by $P_j$ before $P_j$ received its marker.
  • The algorithm terminates in finite time, provided the channel graph is strongly connected and channels are reliable: markers propagate along every finite network path, so every process eventually receives a marker on each of its incoming channels.
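
The invariants above can be checked on a toy example. The sketch below is an assumption-laden illustration, not the lesson's model: two hypothetical accounts exchange transfers over in-memory FIFO queues in one fixed, single-threaded interleaving, and the recorded process states plus recorded channel states should add up to the money in the system.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

final class TwoAccountSnapshot {
    static final Object MARKER = new Object();

    static final class Account {
        int balance;
        Integer recordedBalance;      // null until the local state is recorded
        boolean recordingInbound;     // true while the single inbound channel is recorded
        final List<Integer> recordedInFlight = new ArrayList<>();
        final Deque<Object> outbound; // FIFO channel to the peer

        Account(int balance, Deque<Object> outbound) {
            this.balance = balance;
            this.outbound = outbound;
        }

        void send(int amount) {
            balance -= amount;
            outbound.addLast(amount);
        }

        void startSnapshot() {
            recordedBalance = balance; // record local state
            outbound.addLast(MARKER);  // marker precedes any later data message
            recordingInbound = true;   // record inbound until the peer's marker arrives
        }

        void deliver(Object item) {
            if (item == MARKER) {
                if (recordedBalance == null) {
                    recordedBalance = balance; // first marker: channel state stays empty
                    outbound.addLast(MARKER);
                }
                recordingInbound = false;
            } else {
                int amount = (Integer) item;
                if (recordingInbound) {
                    recordedInFlight.add(amount);
                }
                balance += amount; // recorded messages are still applied normally
            }
        }

        int snapshotValue() {
            int total = recordedBalance;
            for (int amount : recordedInFlight) total += amount;
            return total;
        }
    }

    /** Runs one fixed interleaving and returns the total captured by the snapshot. */
    static int snapshotTotal() {
        Deque<Object> pToQ = new ArrayDeque<>();
        Deque<Object> qToP = new ArrayDeque<>();
        Account p = new Account(100, pToQ);
        Account q = new Account(50, qToP);

        q.send(20);        // in flight on Q->P when the snapshot starts
        p.startSnapshot(); // P records 100 and emits its marker
        p.send(30);        // sent after the marker, so outside the snapshot

        while (!pToQ.isEmpty() || !qToP.isEmpty()) {
            if (!pToQ.isEmpty()) q.deliver(pToQ.pollFirst());
            if (!qToP.isEmpty()) p.deliver(qToP.pollFirst());
        }
        return p.snapshotValue() + q.snapshotValue();
    }

    public static void main(String[] args) {
        System.out.println("Snapshot total: " + snapshotTotal()); // prints "Snapshot total: 150"
    }
}
```

P records 100, Q records 30, and the transfer of 20 is captured as the state of channel Q->P, so the snapshot totals 150 even though no single instant of the run had those exact balances.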

Scaling Challenges & Production Bottlenecks

Moving from abstract mathematical theorems to large-scale production clusters reveals several severe implementation bottlenecks:

1. High Channel Ingestion Rate and Buffering Outages

Under a peak workload of 500,000 events/second, forcing a process to buffer all incoming messages while it waits for a Marker creates a massive memory bubble. If a slow upstream network link delays a Marker's arrival by just 10 seconds, the node will accumulate millions of buffered events in heap space, triggering severe JVM Garbage Collection pauses.

Mitigation (Asynchronous Barrier Snapshotting & Backpressure): Modern streaming platforms like Apache Flink avoid recording channel state in their default (aligned) checkpointing mode. Instead, they use a modified variant called Asynchronous Barrier Snapshotting (ABS).

  1. Barrier Alignment: When a Flink task receives a checkpoint barrier (equivalent to a Marker) from an input channel, it pauses reading from that channel and continues processing messages from other input channels.
  2. Alignment Backpressure: This pause exerts natural backpressure on the upstream node of the paused channel. The upstream node slows down its transmission, keeping the in-flight channel buffers bounded in size. Once barriers from all input channels have arrived, the process saves its local state (for example, through the RocksDB state backend), forwards the barrier downstream, and resumes processing on all channels, which removes the need to record in-flight messages.
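
The two alignment steps can be sketched as a small state machine. This is a simplified illustration, not Flink's implementation: `BarrierAligner` is a hypothetical class, it tracks a single checkpoint at a time, and the snapshot itself is reduced to a counter.

```java
import java.util.HashSet;
import java.util.Set;

final class BarrierAligner {
    private final Set<String> inputChannels;
    private final Set<String> blocked = new HashSet<>();
    private int snapshotsTaken = 0;

    BarrierAligner(Set<String> inputChannels) {
        this.inputChannels = new HashSet<>(inputChannels);
    }

    /** True when the task may keep reading data from this channel. */
    boolean canRead(String channel) {
        return !blocked.contains(channel);
    }

    /** Called when a barrier arrives; returns true if it completed the alignment. */
    boolean onBarrier(String channel) {
        blocked.add(channel); // stop reading this channel until all barriers arrive
        if (!blocked.containsAll(inputChannels)) {
            return false;
        }
        snapshotsTaken++;     // stand-in for saving local state and forwarding the barrier
        blocked.clear();      // resume reading from every channel
        return true;
    }

    int snapshotsTaken() {
        return snapshotsTaken;
    }
}
```

While a channel is blocked the task simply does not poll it, which is what pushes backpressure to the upstream sender.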

2. Dynamic Network Topology during Snapshotting

In highly dynamic cloud environments, Kubernetes pods auto-scale, terminate, or migrate while a snapshot is in-flight. If a process crash changes the network topology, the markers will fail to traverse all paths, causing the snapshot to hang indefinitely.

Mitigation (Epoch-Based Snapshot Expirations & Gossip Consensus):

  • Timeouts: Every snapshot is initiated with a strict time-to-live (TTL). If a node fails to receive markers from all input channels within the TTL (e.g. 5 seconds), it aborts the snapshot locally, garbage-collects its temporary buffers, and notifies the coordinator.
  • Gossip Topology Sync: Processes use a decentralized gossip protocol (like HashiCorp Consul/Serf) to maintain cluster membership. If a node migrates, the coordinator cancels the active checkpoint epoch and triggers a fresh snapshot cycle.
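
The node-local TTL rule could look roughly like the following. It is a sketch under stated assumptions: `SnapshotAttempt` is a hypothetical class, the clock is passed in explicitly so the behavior is deterministic, and releasing buffers or notifying the coordinator is reduced to a state change.

```java
import java.util.HashSet;
import java.util.Set;

final class SnapshotAttempt {
    enum Status { IN_PROGRESS, COMPLETED, ABORTED }

    private final Set<String> awaitingMarkers;
    private final long deadlineMillis;
    private Status status = Status.IN_PROGRESS;

    SnapshotAttempt(Set<String> inputChannels, long startMillis, long ttlMillis) {
        this.awaitingMarkers = new HashSet<>(inputChannels);
        this.deadlineMillis = startMillis + ttlMillis;
    }

    /** Records a marker arrival, unless the attempt has already finished or expired. */
    Status onMarker(String channel, long nowMillis) {
        if (checkTimeout(nowMillis) != Status.IN_PROGRESS) {
            return status;
        }
        awaitingMarkers.remove(channel);
        if (awaitingMarkers.isEmpty()) {
            status = Status.COMPLETED;
        }
        return status;
    }

    /** Aborts the attempt locally once the TTL has elapsed. */
    Status checkTimeout(long nowMillis) {
        if (status == Status.IN_PROGRESS && nowMillis > deadlineMillis) {
            status = Status.ABORTED;
            awaitingMarkers.clear(); // stand-in for releasing recording buffers
        }
        return status;
    }
}
```

Late markers for an aborted attempt are ignored, so a slow channel cannot revive a snapshot that the node has already given up on.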

3. FIFO Channel Constraints on Non-FIFO Networks

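Chandy-Lamport requires FIFO (First-In-First-Out) communication channels. If the network reorders a data message so that it arrives after a Marker that was sent after it, the snapshot becomes incorrect: the message is missing from both the receiver's recorded state and the recorded channel state. In modern microservice meshes, FIFO delivery between two nodes is not guaranteed by default: UDP provides no ordering, and HTTP/2 or gRPC traffic is only ordered within a single stream, so separate calls, multiplexed streams, or multiple connections can be delivered out of order.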

Mitigation:

  • Sequence Numbers and TCP Enforcement: All inter-node communication channels must be encapsulated in TCP connections (which internally enforce FIFO order via packet sequence numbers).
  • Kafka Offset Tracking: In event-driven streaming, processes track Kafka partition offsets instead of raw networking buffers. The "channel state" is recorded simply as the high-water offset of the partition, bypassing the need to capture network-level transport packages.
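
Where a single ordered connection per channel is not available, the sequence-number idea can be applied at the application layer. The sketch below is one possible approach, not something the lesson prescribes: `FifoResequencer` is a hypothetical class that assumes the sender stamps every message on a channel (data and markers alike) with consecutive sequence numbers starting at 0.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class FifoResequencer {
    private final Map<Long, String> pending = new HashMap<>();
    private long nextExpected = 0;

    /** Accepts a possibly out-of-order message and returns everything now deliverable in order. */
    List<String> accept(long sequenceNumber, String payload) {
        List<String> deliverable = new ArrayList<>();
        if (sequenceNumber < nextExpected) {
            return deliverable; // duplicate of something already delivered
        }
        pending.put(sequenceNumber, payload);
        while (pending.containsKey(nextExpected)) {
            deliverable.add(pending.remove(nextExpected));
            nextExpected++;
        }
        return deliverable;
    }
}
```

A marker that overtakes a data message is held back until the earlier message arrives, which restores the ordering the algorithm depends on. The pending map is unbounded in this sketch and would need a cap in practice.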

Technical Trade-offs & Strategic Compromises

Engineers must select snapshot algorithms based on cluster structure and business SLAs.

| Checkpoint Method | Latency Impact | Memory Overhead | Network Overhead | Network Topology Constraint | Recovery Complexity |
| --- | --- | --- | --- | --- | --- |
| Synchronous Stop-The-World | Catastrophic (pauses cluster) | Zero | Low | None | Low |
| Standard Chandy-Lamport | Low (zero-pause) | High (buffers in-flight channels) | Medium | Strongly connected graph, FIFO channels | Medium |
| Asynchronous Barrier (Flink ABS) | Low (zero-pause) | Minimal (uses backpressure) | Low | Directed Acyclic Graph (DAG) | Medium |
| Redis RDB (Copy-on-Write) | Minimal (uses OS fork) | Medium (increases during writes) | Low | Single node | Low |

Asynchronous Barrier (ABS) vs. Standard Chandy-Lamport

Standard Chandy-Lamport is designed for any arbitrary directed graph topology (including cycles) and requires recording the exact state of channels. This is ideal for arbitrary actor-model networks but has a high memory cost.

Flink's Asynchronous Barrier Snapshotting (ABS) takes advantage of the fact that dataflow architectures are structured as Directed Acyclic Graphs (DAGs). By blocking the input channel upon receiving a barrier, it aligns checkpoints at process boundaries, completely eliminating the need to capture and store channel message streams. This is the optimal trade-off for data ingestion pipelines where minimizing storage volume is a top priority.


Failure Scenarios and Fault Tolerance

Distributed snapshots are used to recover from failures, but the snapshot mechanism itself must be resilient.

1. Process Crash Mid-Snapshot

If a process crashes while a snapshot is being recorded, that snapshot can never complete and must be discarded.

Recovery Protocol:

  • The global coordinator tracks snapshot completion status.
  • If a node heartbeat times out, the coordinator transitions the snapshot status to FAILED.
  • The coordinator issues a cleanup request to all remaining nodes to release their snapshot buffers.
  • The system rolls back and restores all active processing states from the last known successful checkpoint manifest in S3, restarting Flink tasks from their saved Kafka offsets.
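
The coordinator side of this protocol can be sketched as a small tracker. `CheckpointCoordinator` here is a hypothetical, in-memory stand-in: cleanup requests and the restore from the last manifest are outside its scope, and it only decides whether a checkpoint may be treated as committed.

```java
import java.util.HashSet;
import java.util.Set;

final class CheckpointCoordinator {
    enum Status { IN_PROGRESS, COMPLETED, FAILED }

    private final Set<String> pendingWorkers;
    private Status status = Status.IN_PROGRESS;

    CheckpointCoordinator(Set<String> workers) {
        this.pendingWorkers = new HashSet<>(workers);
    }

    /** A worker reports that its state upload finished. */
    Status acknowledge(String workerId) {
        if (status == Status.IN_PROGRESS) {
            pendingWorkers.remove(workerId);
            if (pendingWorkers.isEmpty()) {
                status = Status.COMPLETED; // every worker confirmed: safe to publish the manifest
            }
        }
        return status;
    }

    /** Any heartbeat timeout while the checkpoint is in progress fails the whole checkpoint. */
    Status heartbeatTimedOut(String workerId) {
        if (status == Status.IN_PROGRESS) {
            status = Status.FAILED;
        }
        return status;
    }
}
```

Once the status is FAILED, later acknowledgements are ignored, so a partially uploaded checkpoint is never promoted to the last known good one.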

2. Checkpoint Storage Write Saturation

When 100 Flink worker nodes attempt to upload their local RocksDB states (potentially gigabytes of data) to AWS S3 at the same instant, the S3 endpoint may return HTTP 503 Slow Down rate-limiting responses, causing the snapshot to fail due to timeouts.

Mitigation:

  • Incremental Checkpointing: Processes only upload SST (Sorted String Table) files that have changed since the last checkpoint, reducing S3 write volume by up to 90%.
  • Jittered Upload Flush: Nodes introduce micro-jitters (random delays between 0-500ms) before initiating uploads, smoothing the peak bandwidth pressure.
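
Both mitigations reduce to a few lines of logic. The sketch below uses hypothetical names (`CheckpointUploadPlan`, `filesToUpload`, `jitterMillis`) and assumes SST files are immutable once written, so a file name that was already uploaded never needs to be uploaded again.

```java
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ThreadLocalRandom;

final class CheckpointUploadPlan {

    /** Incremental checkpointing: upload only files the previous checkpoint did not already store. */
    static Set<String> filesToUpload(Set<String> currentSstFiles, Set<String> alreadyUploaded) {
        Set<String> delta = new TreeSet<>(currentSstFiles);
        delta.removeAll(alreadyUploaded);
        return delta;
    }

    /** Jittered flush: a random delay in [0, maxJitterMillis] before the upload starts. */
    static long jitterMillis(long maxJitterMillis) {
        return ThreadLocalRandom.current().nextLong(maxJitterMillis + 1);
    }
}
```

A worker would sleep for `jitterMillis(500)` and then upload `filesToUpload(...)`, so the 100 nodes start at slightly different times and each sends less data.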


Verbal Script & Mock Interview

Mock Interview Dialogue

Interviewer: "We operate a high-throughput transaction ledger streaming system that processes billions of updates daily. We need to take global, consistent state backups periodically for audit compliance. However, we cannot tolerate any service downtime or processing pauses. How would you design this?"

Candidate: *"To capture a consistent global state without stopping traffic, I would implement the Chandy-Lamport distributed snapshot algorithm, tailored for streaming pipelines.

In this architecture, any processing node can initiate a snapshot. Upon initiation, the node saves its local, in-memory state, and immediately broadcasts a special control message called a Marker on all outbound communication channels.

When neighbor processes receive this Marker, they perform a conditional evaluation. If they have not yet saved their state, they immediately record their local state, mark the channel on which the Marker arrived as empty, forward the Marker on all of their outbound channels, and begin recording the messages arriving on all other input channels. If they have already saved their state, they stop recording the channel on which the Marker arrived. The state of that channel is saved as the sequence of all messages received on it since the initial state capture.

By propagating these markers through FIFO TCP channels, we guarantee a causally consistent cut of the system. The entire process runs asynchronously in the background without freezing operations."*

Interviewer: "That works beautifully in theory, but under high load, recording in-flight channel messages consumes excessive memory. How would you optimize this for a production Flink cluster?"

Candidate: *"Under high production scale, capturing channel states is indeed a bottleneck. To mitigate this, I would adopt Asynchronous Barrier Snapshotting (ABS), which is the system design primitive behind Apache Flink.

Instead of buffering messages on input channels, when a processing node receives a checkpoint barrier from one input channel, it temporarily halts ingestion on that specific channel while continuing to ingest from the other unsynchronized channels. This is called Barrier Alignment.

This alignment pause propagates natural backpressure upstream, bounding the number of in-flight messages. Once barriers have arrived from all incoming channels, the process saves its local state (writing incrementally to RocksDB), forwards the barrier downstream, and immediately resumes reading from all input channels. By aligning barriers at the node boundary, we completely eliminate the need to store channel states, reducing our snapshot memory overhead to almost zero."*

Interviewer: "Excellent. What if a node crashes mid-checkpoint? How do you ensure we don't end up with corrupted, partially completed snapshots in S3?"

Candidate: *"To guarantee transactional integrity, the system uses a two-phase checkpoint coordinator pattern.

When a process completes its local snapshot, it uploads its state data to durable object storage like S3 and sends a metadata confirmation to a central coordinator (such as the Flink JobManager). The JobManager tracks in memory which workers have completed.

If any worker node fails or its heartbeat times out during a checkpoint, the coordinator marks that checkpoint ID as FAILED and broadcasts an abort command to all workers to purge their local temporary buffers. A checkpoint is only considered valid once all active nodes have successfully uploaded their states. If a crash occurs, we restore the cluster by rolling back every node to the last fully committed checkpoint manifest stored in S3, replaying the Kafka logs from the recorded offset boundaries. This guarantees exactly-once state processing."*

Interviewer: "Outstanding. You have a very clear grasp of the physical network constraints, memory mechanics, and failure modes of distributed state systems."

