Skip to main content

The Anatomy of a Distributed Message Broker

Yash Sachdeva
Author
Yash Sachdeva
Software Engineer | Turning Complex Problems into Simple Solutions
Software engineer by trade, problem solver by nature. I write about the systems I build in my free time and the experiences that shape them.

Introduction
#

Distributed message brokers are the backbone of modern microservices architectures, enabling asynchronous communication, decoupling services, and providing a reliable way to handle data streams at scale.

Requirements
#

The broker is intented to support high throughput, low latency messaging with durability and horizontal scalability, similar to Kafka’s real time streaming usecases and its role as a microservice communication backbone.

Functional Requirements
#

  1. Producer publishes messages to named topics.
  2. Consumers subscribe to topics and read messages in order (atleast within a partition).
  3. Multiple producers and consumers operate concurrently.
  4. Consumer groups provide load balancing - each message is delivered to exactly one consumer within a group, while the same topic can be consumed by multiple consumer groups independently.

Non Functional Requirements
#

  1. High Availability - The broker should be available even if some brokers fail.
  2. Durability - Messages should not be lost even if brokers fail.
  3. Scalability - The broker should be able to handle increasing load by adding more brokers.
  4. Low Latency - Messages should be delivered with low latency.

Approaches
#

1. Single Node In Memory Broker
#

Let’s start with a single broker system and then move to a distributed system. The first version will be a single broker system with an in-memory queue. Producers enqueue messages to the queue and consumers dequeue messages from the queue.

public class Message {
    private final String key;
    private final byte[] value;
    
    public Message(String key, byte[] value) {
        this.key = key;
        this.value = value;
    }

    public String getKey() {
        return key;
    }

    public byte[] getValue() {
        return value;
    }
}
// In Memory Broker 
import java.util.Map;
import java.util.concurrent.*;

public class InMemoryBroker {
    private final Map<String, BlockingQueue<Message>> topics = new ConcurrentHashMap<>();
    
    public void createTopic(String topic) {
        topics.putIfAbsent(topic, new LinkedBlockingQueue<>());
    }
    
    public void produce(String topic, Message message) {
        BlockingQueue<Message> queue = topics.get(topic);
        if (queue == null) {
            throw new IllegalArgumentException("Topic " + topic + " does not exist");
        }
        queue.add(message);
    }
    
    public Message consume(String topic, long timeout) throws InterruptedException {
        BlockingQueue<Message> queue = topics.get(topic);
        if (queue == null) {
            throw new IllegalArgumentException("Topic " + topic + " does not exist");
        }
        return queue.poll(timeout, TimeUnit.MILLISECONDS);
    }
}

Usage
#

public class InMemoryBrokerTest {
	
	@Test
    public void testProducerConsumerForInMemoryBroker() throws InterruptedException {
        InMemoryBroker broker = new InMemoryBroker();
        broker.createTopic("test");
        
        Thread producer = new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                try {
                    broker.produce("test", new Message("key" + i, ("value" + i).getBytes()));
                    System.out.println("Produced: " + i);
                } catch (Exception e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        });

        Thread consumer = new Thread(() -> {
            while(true){
                try {
                    Message message = broker.consume("test", 1000);
                    if(message == null){
                        break;
                    }
                    System.out.println("Consumed: " + message.getKey());
                } catch (Exception e) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        });

        producer.start();
        consumer.start();

        producer.join();
        consumer.join();
    }
}

Output:

Produced: 0
Consumed: key0
Consumed: key1
Produced: 1
Produced: 2
Consumed: key2
Consumed: key3
Produced: 3
Produced: 4
Consumed: key4
Consumed: key5
Produced: 5
Produced: 6
Produced: 7
Produced: 8
Produced: 9
Consumed: key6
Consumed: key7
Consumed: key8
Consumed: key9

Notice that the print statement appears after the produce operation, so even if the producer produces first, its log message may still appear after the consumer has already consumed the message. This is due to the nature of concurrent execution.

Currently the broker is single threaded and single process. It lacks durability and scalability.

Let’s add persistence to the broker. We can use a file to store the messages.

2. Single Node Append Only Log With Replay
#

Let’s make the broker log based. Each topic is a commit log on the disk: append-only, ordered and addressable by offset. Producers append to the end, consumers read from a specific offset and track their own position. This gives us durability and replay similar to Kafka’s commit log.

Kafka does exactly this: each topic is split into partitions, and each partition is a persistent ordered log with monotonically increasing offsets.

Log Partition Abstraction

public interface LogPartition{
    long append(Message message) throws Exception;
    
    // Read message starting from a given offset
    ReadResult read(long offset) throws Exception; 
    
    long endOffset() throws Exception; //lastOffset + 1
}
import java.io.*;
import java.nio.file.*;
import java.util.*;

public class FileBackedLogPartition implements LogPartition, Closeable {
    private final RandomAccessFile raf;

    public FileBackedLogPartition(Path path) throws IOException {
        Files.createDirectories(path.getParent());
        this.raf = new RandomAccessFile(path.toFile(), "rw");
    }

    @Override
    public synchronized long append(Message message) throws IOException {
        long offset = raf.length();
        raf.seek(offset);
        raf.writeInt(message.getKey().length());
        raf.write(message.getKey().getBytes());
        raf.writeInt(message.getValue().length);
        raf.write(message.getValue());
        return offset;
    }

    @Override
    public ReadResult read(long offset) throws IOException {
    	if (offset >= raf.length()) {
            return null;
        }
        raf.seek(offset);

        int keyLen = raf.readInt();
        byte[] keyBytes = new byte[keyLen];
        raf.readFully(keyBytes);

        int valueLen = raf.readInt();
        byte[] valueBytes = new byte[valueLen];
        raf.readFully(valueBytes);

        long nextOffset = raf.getFilePointer();  // position after this record

        Message msg = new Message(new String(keyBytes), valueBytes);
        return new ReadResult(msg, nextOffset);
    }

    @Override
    public long endOffset() throws IOException {
        return raf.length();
    }

    @Override
    public void close() throws IOException {
        raf.close();
    }
}
// Broker with per group offsets
import java.nio.file.*;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class FileBroker {
    private final Path logDir;
    private final Map<String, LogPartition> topicsToPartitionMap = new ConcurrentHashMap<>();
    // consumerGroup -> topic -> offset
    private final Map<String,Map<String,Long>> consumerOffsets = new ConcurrentHashMap<>();

    public FileBroker(Path logDir){
        this.logDir = logDir;
    }

    public synchronized void createTopic(String name) throws Exception{
        topicsToPartitionMap.computeIfAbsent(name, t -> {
            try {
                return new FileBackedLogPartition(logDir.resolve(name+".log"));
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        });
    }

    public long produce(String topic, Message message) throws Exception{
        LogPartition partition = topicsToPartitionMap.get(topic);
        if(partition == null){
            throw new IllegalArgumentException("Topic " + topic + " does not exist");
        }
        return partition.append(message);
    }

    public Message consume(String topic, String consumerGroup) throws Exception{
        LogPartition partition = topicsToPartitionMap.get(topic);
        if(partition == null){
            throw new IllegalArgumentException("Topic " + topic + " does not exist");
        }
        long offset = consumerOffsets.computeIfAbsent(consumerGroup, g -> new ConcurrentHashMap<>()).getOrDefault(topic, 0L);

        if(offset >= partition.endOffset()){
            return null;
        }

       ReadResult result = partition.read(offset);
        long next = result.getNextOffset();
        if(result != null){
            consumerOffsets.get(consumerGroup).put(topic, next);
        }
        return result.getMessage();
    }
}

Let’s verify durability and replay now

import java.nio.file.*;

public class FileBrokerTest {
	
	@Test
    public void testFileBasedBroker() throws Exception {
        Path logDir = Files.createTempDirectory("broker");

        FileBroker broker = new FileBroker(logDir);
        broker.createTopic("test");

        for(int i=0;i<3;i++){
            broker.produce("test", new Message("key"+i, ("value"+i).getBytes()));
        }
        // simulate broker restart
        FileBroker broker2 = new FileBroker(logDir);
        broker2.createTopic("test");
        
        // consumer A reads the messages
        Message m;
        while((m = broker2.consume("test", "groupA")) != null){
            System.out.println("Consumed: " + m.getKey());
        }

        // consumer B can also read the same messages
        Message m2;
        while((m2 = broker2.consume("test", "groupB")) != null){
            System.out.println("Consumed: " + m2.getKey());
        }
    }
}

Output:

Consumed: key0
Consumed: key1
Consumed: key2
Consumed: key0
Consumed: key1
Consumed: key2

Improvements over In Memory Broker:

  1. Durability: Messages are stored on disk and can be recovered after a restart.
  2. Replay: Messages can be replayed from the beginning.
  3. Multiple consumers can consume the same message.

The above system still has limitations:

  1. No partitioning yet, so write throughput is limited by single log per topic.
  2. Offsets are stored in memory, so if the broker restarts, the offsets will be lost.
  3. No replication yet, so if the disk crashes, the data will be lost.
  4. No sharding yet, so the log size is limited by the disk size of the broker.

This leads naturally to a distributed, partitioned design with consumer groups and replication, similar to Kafka’s architecture.

3. Partitioned, Distributed Broker Cluster
#

The next step is to distribute data and load across multiple brokers using topics, partitions, and consumer groups, in line with Kafka’s artchitecture.

  1. A cluster consists of multiple brokers.
  2. Each topic is split into multiple partitions, each a separate log.
  3. For a given topic, partitions are spread across brokers to provide horizontal scalability.
  4. A consumer group shares the partitions of a topic such that each partition is consumed by atmost one consumer in that group.

Partitioning Strategy:

import java.util.Random;

public class Partitioner {
    private final Random random = new Random();

    public int choosePartition(String key, int numPartitions){
        if(key == null){
            return random.nextInt(numPartitions);
        }
        return Math.abs(key.hashCode()) % numPartitions;
    }
}

This strategy is analogous to Kafka’s use of hashing the key to select a partition, preserving key-based ordering within a partition.

Now let’s come up with a group-level partition assignment -

import java.util.*;
import java.util.concurrent.ConcurrentHashMap;

public class ConsumerGroupAssignment {

    public static class TopicPartition{
        public final String topic;
        public final int partition;
        public TopicPartition(String topic, int partition){
            this.topic = topic;
            this.partition = partition;
        }
    }

    // groupId -> memberId -> partitions
    private final Map<String, Map<String, List<TopicPartition>>> assignments = new ConcurrentHashMap<>();

    public void assign(String groupId, List<String> members, String topic, int numPartitions){
        Map<String, List<TopicPartition>> group = new ConcurrentHashMap<>();
        int numMembers = members.size();
        for(int i=0;i<numPartitions;i++){
            String memberId = members.get(i % numMembers);
            group.computeIfAbsent(memberId, k -> new ArrayList<>()).add(new TopicPartition(topic, i));
        }
        assignments.put(groupId, group);
    }

    public List<TopicPartition> ownedPartitions(String groupId, String memberId){
        return assignments.getOrDefault(groupId, Collections.emptyMap()).getOrDefault(memberId, Collections.emptyList());
    }
}

Kafka’s real implementation uses a group coordinator and more sophisticated assigners like round-robin, sticky, and cooperative partitioning strategies, however the above implementation gives a good idea of how partition assignment works in a distributed system.

Now let’s also add replication to the above model.

Replication with Leaders and Followers
#

Leader-follower Model

  1. Each partition has one leader and multiple followers.
  2. Producers and consumers interact only with the leader for that partition.
  3. Followers asynchronously replicate the leader’s log and can be promoted to leader upon failure.

Leader Partition:

public interface ReplicaClient {
    void replicate(String topic, int partitionId, long offset, Message msg) throws Exception;
}

public class ReplicatedPartitionLeader{
    private final String topic;
    private final int partitionId;
    private final LogPartition localLog;
    private final List<ReplicaClient> followers;

    public ReplicatedPartitionLeader(String topic, int partitionId, LogPartition localLog, List<ReplicaClient> followers){
        this.topic = topic;
        this.partitionId = partitionId;
        this.localLog = localLog;
        this.followers = followers;
    }

    public LogPartition getLocalLog(){
        return localLog;
    }

    public synchronized long append(Message msg) throws Exception {
        long offset = localLog.append(msg);
        for(ReplicaClient follower : followers){
            follower.replicate(topic, partitionId, offset, msg);
        }
        return offset;
    }
}

Broker node hosting leaders and followers:


import java.util.*;
import java.util.concurrent.ConcurrentHashMap;

public class BrokerNode {
    private final int brokerId;
    private final Map<String, Map<Integer, ReplicatedPartitionLeader>> leaders = new ConcurrentHashMap<>();
    private final Map<String, Map<Integer, LogPartition>> followers = new ConcurrentHashMap<>();

    public BrokerNode(int brokerId){
        this.brokerId = brokerId;
    }

    public void addLeaderPartition(String topic, int partitionId, LogPartition log, List<ReplicaClient> followers){
        leaders.computeIfAbsent(topic, k -> new ConcurrentHashMap<>()).put(partitionId, new ReplicatedPartitionLeader(topic, partitionId, log, followers));
    }

    public void addFollowerPartition(String topic, int partitionId, LogPartition log){
        followers.computeIfAbsent(topic, k -> new ConcurrentHashMap<>()).put(partitionId, log);
    }

    public long handleProduce(String topic, int partitionId, Message msg) throws Exception {
        ReplicatedPartitionLeader leader = leaders.getOrDefault(topic, Collections.emptyMap()).get(partitionId);
        if(leader == null){
            throw new Exception("Leader not found for topic " + topic + " and partition " + partitionId);
        }
        return leader.append(msg);
    }

    public Message handleConsume(String topic, int partitionId, long offset) throws Exception {
        ReplicatedPartitionLeader leader = leaders.getOrDefault(topic, Collections.emptyMap()).get(partitionId);
        if(leader == null){
            throw new Exception("Leader not found for topic " + topic + " and partition " + partitionId);
        }
        return leader.getLocalLog().read(offset);
    }

    public void handleReplicate(String topic, int partitionId, long offset, Message msg) throws Exception {
        LogPartition log = followers.getOrDefault(topic, Collections.emptyMap()).get(partitionId);
        if(log == null){
            throw new Exception("Follower not found for topic " + topic + " and partition " + partitionId);
        }
        log.append(msg);
    }   
}

Now let’s verify the above code

public class DistributedBrokerTest {
	
	@Test
    public void testDistributedBroker() throws Exception {
        BrokerNode b1 = new BrokerNode(1);
        BrokerNode b2 = new BrokerNode(2);
        BrokerNode b3 = new BrokerNode(3);

        Path base = Paths.get("/tmp/broker-v3");
        FileBackedLogPartition p0b1 = new FileBackedLogPartition(base.resolve("orders-0-b1.log"));
        FileBackedLogPartition p0b2 = new FileBackedLogPartition(base.resolve("orders-0-b2.log"));
        FileBackedLogPartition p1b2 = new FileBackedLogPartition(base.resolve("orders-1-b2.log"));
        FileBackedLogPartition p1b3 = new FileBackedLogPartition(base.resolve("orders-1-b3.log"));

        ReplicaClient b2Follower = (t, p, off, msg) -> b2.handleReplicate(t, p, off, msg);
        ReplicaClient b3Follower = (t, p, off, msg) -> b3.handleReplicate(t, p, off, msg);

        b1.addLeaderPartition("orders", 0, p0b1, List.of(b2Follower));
        b2.addFollowerPartition("orders", 0, p0b2);

        b2.addLeaderPartition("orders", 1, p1b2, List.of(b3Follower));
        b3.addFollowerPartition("orders", 1, p1b3);

        Partitioner partitioner = new Partitioner();

        for (int i = 0; i < 4; i++) {
            String key = "order-" + i;
            int partition = partitioner.choosePartition(key, 2);
            BrokerNode leader = (partition == 0) ? b1 : b2;
            long offset = leader.handleProduce("orders", partition,
                new Message(key, ("payload-" + i).getBytes()));
            System.out.printf("Produced %s -> partition %d, offset %d%n",
                key, partition, offset);
        }

        // Consume from partition 0 leader (b1)
        long offset0 = 0L;
        while (true) {
            ReadResult rr = b1.handleConsume("orders", 0, offset0);
            if (rr == null) {
                break; // no more data
            }
            Message m = rr.getMessage();
            System.out.println("Consumed from p0: " + m.getKey());
            offset0 = rr.getNextOffset();  // advance to just after this record
        }

        // Consume from partition 1 leader (b2)
        long offset1 = 0L;
        while (true) {
            ReadResult rr = b2.handleConsume("orders", 1, offset1);
            if (rr == null) {
                break;
            }
            Message m = rr.getMessage();
            System.out.println("Consumed from p1: " + m.getKey());
            offset1 = rr.getNextOffset();
        }
    }

Output:

Produced order-0 -> partition 1, offset 0
Produced order-1 -> partition 0, offset 0
Produced order-2 -> partition 1, offset 24
Produced order-3 -> partition 0, offset 24
Consumed from p0: order-1
Consumed from p0: order-3
Consumed from p1: order-0
Consumed from p1: order-2

Scalability and Fault Tolerance
#

With partitioning and replication, we can achieve horizontal scalability and fault tolerance.

  1. Adding partitions increases consumer parallelism within groups.
  2. Adding brokers increases storage and throughput capacity.
  3. Replication ensures data durability and availability.

Conclusion
#

We went from a single node in memory message broker to a distributed, partitioned, replicated message broker cluster. This is a simplified version of Kafka, but it captures the core concepts of partitioning, replication, and consumer groups.

Related

Rate Limiters

·1921 words·10 mins
Life Without a Rate Limiter # Imagine a public web API that allows clients to fetch user data without any rate limiting. Under normal conditions this might work, but during traffic spikes or abuse (e.g., bots or scrapers) the backend can be overwhelmed, leading to resource exhaustion, cascading failures, and poor availability for legitimate users. Without any form of control, a single noisy neighbor can starve others, increase infrastructure costs, and make it difficult to meet SLAs.