Projects
Dec 28, 2025

Scalable WebSockets with RabbitMQ

Distributed event broadcasting across multiple Socket.IO servers

This article explains how the messaging system achieves horizontal scalability for real-time features using RabbitMQ as a message queue for distributed event routing.

Single-Server Limitation

Without a distributed approach, Socket.IO servers operate in isolation. Users connected to Socket Server 1 cannot exchange real-time events with users connected to Socket Server 2, because each server only knows about its own sockets.

Solution: RabbitMQ as the Message Broker

RabbitMQ acts as a central message hub that receives events from all servers and broadcasts them back:

RabbitMQ Distributed Routing

While this implementation uses RabbitMQ, it is worth noting that Redis Pub/Sub is another common solution to the same problem.

Separation of Incoming and Outgoing Exchanges

To scale horizontally, we separate the "reception" of a message from the "delivery" of a message. This is handled by two distinct RabbitMQ Direct Exchanges.

const channel = await rabbitmq.connection.createChannel()

await channel.assertExchange(INCOMING_EXCHANGE, 'direct', {
  durable: true,
})
await channel.assertExchange(OUTGOING_EXCHANGE, 'direct', {
  durable: true,
})

The Incoming Exchange

The Incoming Exchange acts as the front door. Every Socket Server instance in the cluster publishes raw events here. When a user sends a message through a socket, the server doesn't try to find the recipient. It simply wraps the data and pushes it into this exchange.

This decouples the WebSocket logic from the business logic, allowing background workers to handle the heavy lifting, like persisting the message into a database, without slowing down the user's connection.

The Outgoing Exchange

The Outgoing Exchange is responsible for routing processed messages back to the correct server. Instead of broadcasting every message to every server, this exchange uses Direct Routing with user-based bindings. Each server queue is bound to the exchange using userId as the routing key, allowing messages to be routed directly to the server where the user is connected without requiring a separate lookup mechanism.

Server-Specific Queues and User Bindings

To make this routing work, every Socket Server instance creates its own dedicated queue upon startup. When a server process starts, it generates a unique serverId and declares a Server Queue (e.g., queue_server_01).

await channel.assertQueue(serverId, { exclusive: true })

By declaring it as exclusive: true, we ensure that the queue is deleted when its declaring connection is closed.
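Putting the startup steps together, a minimal sketch might look like this (the crypto-based naming scheme and the rabbitmq connection wrapper are illustrative assumptions):

import crypto from 'node:crypto'

// Each server instance gets a unique id that doubles as its queue name
const serverId = `queue_server_${crypto.randomUUID()}`

const channel = await rabbitmq.connection.createChannel()

// exclusive: the queue is removed automatically when this connection closes,
// so a crashed or restarted server leaves no stale queues or bindings behind
await channel.assertQueue(serverId, { exclusive: true })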

The user-to-server mapping happens during the Binding phase. When a user connects to a socket server, the server creates a binding between the user's ID and its queue. This binding uses the userId as the Routing Key to the Outgoing Exchange.

// When user connects
const routingKey = userId.toString()
await channel.bindQueue(serverId, OUTGOING_EXCHANGE, routingKey)

// When user disconnects
const routingKey = userId.toString()
await channel.unbindQueue(serverId, OUTGOING_EXCHANGE, routingKey)

Instead of maintaining a separate cache to track which server a user is connected to, RabbitMQ bindings serve as the source of truth. When the worker needs to send a message to "User B", it publishes to the Outgoing Exchange with the routing key userB. RabbitMQ automatically routes the message to the server queue that has a binding for userB. If the user is not connected (no binding exists), the message is simply dropped, eliminating the need for external state management.

If we were using Redis, we would typically have to maintain an external Mapping Table (e.g., a Redis Hash) to track which userId is connected to which serverId.

With Redis, the flow would require an extra step:

  1. Query Redis to find the serverId for userId.
  2. Publish the message to a specific Redis channel for that server.

In contrast, RabbitMQ Bindings eliminate this extra management layer.
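For contrast, a rough sketch of that Redis-based flow (the ioredis client and the socket:servers hash key are illustrative assumptions, not part of this implementation):

import Redis from 'ioredis'

const redis = new Redis()

// On connect: record which server currently holds the user's socket
await redis.hset('socket:servers', userId, serverId)

// To deliver a message: look up the target server first, then publish to its channel
const targetServerId = await redis.hget('socket:servers', receiverId)
if (targetServerId) {
  await redis.publish(targetServerId, JSON.stringify({ event: 'MESSAGE_RECEIVE', data: message }))
}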

Lifecycle of a Message

Socket Server as the Producer

When a user sends a message, the receiving Socket Server acts as a Producer, offloading the event to RabbitMQ rather than attempting to handle cross-server delivery or database writes itself.

The Socket Server receives the message and publishes it to the INCOMING_EXCHANGE. By using { persistent: true }, we ensure the message survives even if the broker restarts.

// Socket Server 1 - User A sends message
socket.on('sendMessage', async payload => {
  const message = {
    content: payload.content,
    sender_id: userId,
    receiver_id: payload.receiverId,
    channel_id: channelId,
    created_at: new Date(),
  }

  // Reuse the AMQP channel created at startup instead of opening one per message
  channel.publish(
    INCOMING_EXCHANGE,
    'personal.message', // matches the chat worker's binding
    Buffer.from(JSON.stringify({ type: 'MESSAGE_SEND', payload: message })),
    { persistent: true }
  )
})

Async Worker Processing

A dedicated Chat Worker consumes from a separate, durable queue. This decouples the database heavy-lifting from the real-time WebSocket traffic, preventing slow DB queries from lagging the chat UI.

After persisting the message, the worker publishes to the OUTGOING_EXCHANGE using the recipient's userId as the routing key. RabbitMQ automatically routes the message to the correct server queue based on the user bindings.

// Chat Worker - Database Persistence
await channel.assertQueue('chat-worker-queue', { durable: true })
await channel.bindQueue(
  'chat-worker-queue',
  INCOMING_EXCHANGE,
  'personal.message'
)
await channel.bindQueue('chat-worker-queue', INCOMING_EXCHANGE, 'group.message')

await channel.consume('chat-worker-queue', async msg => {
  if (!msg) return // the consumer was cancelled by the broker

  try {
    const { type, payload } = JSON.parse(msg.content.toString())

    if (type === 'MESSAGE_SEND') {
      // Persist to database
      const message = await messageRepository.save(payload)

      // Publish to outgoing exchange using the recipient's userId as routing key
      channel.publish(
        OUTGOING_EXCHANGE,
        payload.receiver_id.toString(), // userId routing key
        Buffer.from(
          JSON.stringify({
            event: 'MESSAGE_RECEIVE',
            userId: payload.receiver_id,
            data: message,
          })
        ),
        { persistent: true }
      )
    }

    channel.ack(msg)
  } catch (error) {
    logger.error('Persistence failed:', error)
    channel.nack(msg, false, false) // Reject without requeue; routed to a dead-letter exchange if one is configured
  }
})

Socket Server as the Consumer

Whenever a background worker routes a message through the OUTGOING_EXCHANGE to the socket server's dedicated queue, the server consumes it and sends it back to the user. Each socket server maintains a local user-to-socket mapping, which it uses to emit events to the right connections.

// Consume messages from server queue
await this.channel.consume(serverId, async msg => {
  const { event, userId, data } = JSON.parse(msg.content.toString())

  // Find local socket and emit
  const socketId = this.chatsStore.getClient(userId)
  if (socketId) {
    io.to(socketId).emit(event, data)
  }

  this.channel.ack(msg)
})
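The chatsStore used above is assumed to be a simple in-memory map from user IDs to socket IDs, kept in sync by the Socket.IO connection handlers; a minimal sketch (class and method names are illustrative, and it assumes the authenticated userId is stored on socket.data during the handshake) could look like this:

// In-memory user -> socket mapping, local to each server instance
class ChatsStore {
  constructor() {
    this.clients = new Map() // userId -> socketId
  }

  addClient(userId, socketId) {
    this.clients.set(userId.toString(), socketId)
  }

  removeClient(userId) {
    this.clients.delete(userId.toString())
  }

  getClient(userId) {
    return this.clients.get(userId.toString())
  }
}

// Populated on connect, cleaned up on disconnect
io.on('connection', socket => {
  chatsStore.addClient(socket.data.userId, socket.id)
  socket.on('disconnect', () => chatsStore.removeClient(socket.data.userId))
})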

Message Flow Diagram

Complete Message Journey

Channel-Based Routing for Group Messages

While personal chats use user ID as the routing key, group messages use channel IDs instead. This extends the same bindings concept to support efficient multi-user messaging:

  • Routing Key Format: channel:{channelId} (to distinguish from user ID bindings)
  • When a user connects, the server binds each of their channel IDs to the server queue
  • When a message is sent to a channel, it's published with the channel:{channelId} routing key
  • All servers with users in that channel receive the message
  • Servers emit to Socket.IO rooms using the channel ID
  • Reference Counting: Each channel maintains a count of users subscribed on that server (a sketch follows the code example below)
    • When the count reaches 0, the server removes the binding
    • This optimizes RabbitMQ load by not maintaining unnecessary bindings

This approach eliminates the need to publish N times (for N group members) and instead publishes once per channel, letting RabbitMQ's routing handle distribution to all relevant servers.

// Example: Publishing group message to channel
await rabbitmq.publishToOutgoing(`channel:${channelId}`, {
  event: 'GROUP_MESSAGE_RECEIVE',
  channelId,
  data: { /* message data */ }
})

// All servers with this binding receive and emit to their Socket.IO rooms
io.to(channelId.toString()).emit('GROUP_MESSAGE_RECEIVE', data)
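A rough sketch of the reference-counting logic mentioned in the list above, assuming a per-server Map that counts locally connected subscribers per channel (function and variable names are illustrative):

// Per-server reference counts: channelId -> number of locally connected subscribers
const channelRefCounts = new Map()

async function subscribeToChannel(channelId) {
  const count = channelRefCounts.get(channelId) ?? 0
  channelRefCounts.set(channelId, count + 1)

  // Only the first local subscriber creates the binding
  if (count === 0) {
    await channel.bindQueue(serverId, OUTGOING_EXCHANGE, `channel:${channelId}`)
  }
}

async function unsubscribeFromChannel(channelId) {
  const count = (channelRefCounts.get(channelId) ?? 1) - 1

  if (count <= 0) {
    channelRefCounts.delete(channelId)
    // Last local subscriber left: drop the binding so RabbitMQ stops routing here
    await channel.unbindQueue(serverId, OUTGOING_EXCHANGE, `channel:${channelId}`)
  } else {
    channelRefCounts.set(channelId, count)
  }
}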

For detailed information about group chat implementation, see Group Chats.