Distributed event broadcasting across multiple Socket.IO servers
This article explains how the messaging system achieves horizontal scalability for real-time features using RabbitMQ as a message queue for distributed event routing.
Without a distributed approach, Socket.IO servers operate in isolation. The users connected through Socket Server 1 are disconnected from the users connected through Socket Server 2.
RabbitMQ acts as a central message hub that receives events from all servers and broadcasts them back:
RabbitMQ Distributed Routing
While this implementation uses RabbitMQ, it is worth noting that Redis Pub/Sub is another common solution to this specific problem.
To scale horizontally, we separate the "reception" of a message from the "delivery" of a message. This is handled by two distinct RabbitMQ Direct Exchanges.
const channel = await rabbitmq.connection.createChannel()
// The incoming exchange receives raw events from every socket server
await channel.assertExchange(INCOMING_EXCHANGE, 'direct', {
  durable: true,
})
// The outgoing exchange routes processed events back to server queues
await channel.assertExchange(OUTGOING_EXCHANGE, 'direct', {
  durable: true,
})
The Incoming Exchange acts as the front door. Every Socket Server instance in the cluster publishes raw events here. When a user sends a message through a socket, the server doesn't try to find the recipient. It simply wraps the data and pushes it into this exchange.
This decouples the WebSocket logic from the business logic, allowing background workers to handle the heavy lifting, like persisting the message into a database, without slowing down the user's connection.
The Outgoing Exchange is responsible for routing processed messages back to the correct server. Instead of broadcasting every message to every server, this exchange uses Direct Routing with user-based bindings. Each server queue is bound to the exchange using userId as the routing key, allowing messages to be routed directly to the server where the user is connected without requiring a separate lookup mechanism.
To make this routing work, every Socket Server instance creates its own dedicated queue upon startup. When a server process starts, it generates a unique serverId and declares a Server Queue (e.g., queue_server_01).
await channel.assertQueue(serverId, { exclusive: true })
By declaring it as exclusive: true, we ensure that the queue is deleted when its declaring connection is closed.
The user-to-server mapping happens during the Binding phase. When a user connects to a socket server, the server creates a binding between the user's ID and its queue. This binding uses the userId as the Routing Key to the Outgoing Exchange.
// When user connects
const routingKey = userId.toString()
await channel.bindQueue(serverId, OUTGOING_EXCHANGE, routingKey)
// When user disconnects
const routingKey = userId.toString()
await channel.unbindQueue(serverId, OUTGOING_EXCHANGE, routingKey)
Instead of maintaining a separate cache to track which server a user is connected to, RabbitMQ bindings serve as the source of truth. When the worker needs to send a message to "User B", it publishes to the Outgoing Exchange with the routing key userB. RabbitMQ automatically routes the message to the server queue that has a binding for userB. If the user is not connected (no binding exists), the message is simply dropped, eliminating the need for external state management.
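To make this behavior concrete, here is a minimal in-memory model of a direct exchange with user-based bindings (illustration only; in production this routing happens inside RabbitMQ, not in application code). Note how a publish with no matching binding is silently dropped:

```javascript
// In-memory model of RabbitMQ's direct-exchange routing (illustration only;
// the real routing happens inside the broker, not in application code).
class DirectExchange {
  constructor() {
    this.bindings = new Map() // routingKey -> Set of queue names
    this.queues = new Map() // queue name -> delivered messages
  }
  bindQueue(queue, routingKey) {
    if (!this.queues.has(queue)) this.queues.set(queue, [])
    if (!this.bindings.has(routingKey)) this.bindings.set(routingKey, new Set())
    this.bindings.get(routingKey).add(queue)
  }
  unbindQueue(queue, routingKey) {
    this.bindings.get(routingKey)?.delete(queue)
  }
  publish(routingKey, message) {
    const bound = this.bindings.get(routingKey)
    if (!bound || bound.size === 0) return false // no binding: message is dropped
    for (const queue of bound) this.queues.get(queue).push(message)
    return true
  }
}

const outgoing = new DirectExchange()
outgoing.bindQueue('queue_server_01', 'userB') // User B connects via server 1
outgoing.publish('userB', { event: 'MESSAGE_RECEIVE' }) // routed to server 1
outgoing.publish('userC', { event: 'MESSAGE_RECEIVE' }) // dropped, user offline
```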
If we were using Redis, we would typically have to maintain an external Mapping Table (e.g., a Redis Hash) to track which userId is connected to which serverId.
With Redis, the flow would require an extra step:
- Query Redis to find the serverId for userId.
- Publish the message to a specific Redis channel for that server.
In contrast, RabbitMQ Bindings eliminate this extra management layer.
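For comparison, here is a sketch of the Redis-style flow, with a plain Map standing in for the Redis hash so the example runs standalone; the channel-name convention is invented for illustration:

```javascript
// Sketch of the Redis-style alternative. A Map stands in for a Redis hash
// so the example is self-contained; real code would use a Redis client and
// publish on the returned channel name.
const userToServer = new Map() // the extra mapping table RabbitMQ bindings avoid

function onConnect(userId, serverId) {
  userToServer.set(userId, serverId)
}

function resolveChannel(userId) {
  const serverId = userToServer.get(userId) // step 1: look up the server
  if (!serverId) return null // user is offline
  return `socket-events:${serverId}` // step 2: publish to that server's channel
}

onConnect('userB', 'server_01')
resolveChannel('userB') // 'socket-events:server_01'
resolveChannel('userC') // null - user not connected
```

The failure mode this introduces is stale mappings: if a server crashes without cleaning up, the table still points users at a dead server. That is exactly the bookkeeping the exclusive-queue-plus-binding approach sidesteps, since bindings vanish with the connection.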
When a user sends a message, the receiving Socket Server acts as a Producer, offloading the event to RabbitMQ rather than attempting to handle cross-server delivery or database writes itself.
The Socket Server receives the message and publishes it to the INCOMING_EXCHANGE. By using { persistent: true }, we ensure the message survives a broker restart, provided the queue it is routed to is durable.
// Socket Server 1 - User A sends message
// (the channel is created once at startup and reused; opening a new
// channel per message is an anti-pattern)
socket.on('sendMessage', async payload => {
  const message = {
    content: payload.content,
    sender_id: userId,
    receiverId: payload.receiverId, // the worker routes on this field
    channel_id: channelId,
    created_at: new Date(),
  }
  channel.publish(
    INCOMING_EXCHANGE,
    'personal.message', // must match the worker's binding key
    Buffer.from(JSON.stringify({ type: 'MESSAGE_SEND', payload: message })),
    { persistent: true }
  )
})
A dedicated Chat Worker consumes from a separate, durable queue. This decouples the database heavy-lifting from the real-time WebSocket traffic, preventing slow DB queries from lagging the chat UI.
After persisting the message, the worker publishes to the OUTGOING_EXCHANGE using the recipient's userId as the routing key. RabbitMQ automatically routes the message to the correct server queue based on the user bindings.
// Chat Worker - Database Persistence
await channel.assertQueue('chat-worker-queue', { durable: true })
await channel.bindQueue(
  'chat-worker-queue',
  INCOMING_EXCHANGE,
  'personal.message'
)
await channel.bindQueue('chat-worker-queue', INCOMING_EXCHANGE, 'group.message')

await channel.consume('chat-worker-queue', async msg => {
  if (!msg) return // consumer was cancelled by the broker
  try {
    const { type, payload } = JSON.parse(msg.content.toString())
    if (type === 'MESSAGE_SEND') {
      // Persist to database
      const message = await messageRepository.save(payload)
      // Publish to outgoing exchange using userId as routing key
      channel.publish(
        OUTGOING_EXCHANGE,
        payload.receiverId.toString(), // userId routing key
        Buffer.from(
          JSON.stringify({
            event: 'MESSAGE_RECEIVE',
            userId: payload.receiverId,
            data: message,
          })
        ),
        { persistent: true }
      )
    }
    channel.ack(msg)
  } catch (error) {
    logger.error('Persistence failed:', error)
    // Reject without requeue; goes to a dead-letter exchange if one is configured
    channel.nack(msg, false, false)
  }
})
Whenever a background worker pushes a message through the OUTGOING_EXCHANGE to a socket server's dedicated queue, that server consumes it and delivers it to the user. Each socket server maintains a local user-to-socket mapping, which it uses to emit events back to the connected users.
// Consume messages from server queue
await this.channel.consume(serverId, async msg => {
  if (!msg) return // consumer was cancelled by the broker
  const { event, userId, data } = JSON.parse(msg.content.toString())
  // Find local socket and emit
  const socketId = this.chatsStore.getClient(userId)
  if (socketId) {
    io.to(socketId).emit(event, data)
  }
  this.channel.ack(msg)
})
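The chatsStore referenced above is not shown in the article; a minimal Map-based sketch of such a local user-to-socket store might look like this (the getClient name mirrors the call above, the rest is assumed):

```javascript
// Minimal sketch of a local user-to-socket store (the article's chatsStore
// is not shown; this shape is an assumption). Lives only in each server's
// memory - cross-server state is handled entirely by RabbitMQ bindings.
class ChatsStore {
  constructor() {
    this.clients = new Map() // userId -> socketId
  }
  addClient(userId, socketId) {
    this.clients.set(userId, socketId)
  }
  removeClient(userId) {
    this.clients.delete(userId)
  }
  getClient(userId) {
    return this.clients.get(userId) // undefined if not connected locally
  }
}

const chatsStore = new ChatsStore()
chatsStore.addClient('userB', 'socket-abc123')
chatsStore.getClient('userB') // 'socket-abc123'
```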
Complete Message Journey
While personal chats use user ID as the routing key, group messages use channel IDs instead. This extends the same bindings concept to support efficient multi-user messaging:
- Servers bind their queue with the routing key channel:{channelId} (to distinguish from user ID bindings)
- Workers publish each group message once, using the channel:{channelId} routing key

This approach eliminates the need to publish N times (for N group members) and instead publishes once per channel, letting RabbitMQ's routing handle distribution to all relevant servers.
// Example: Publishing group message to channel
await rabbitmq.publishToOutgoing(`channel:${channelId}`, {
  event: 'GROUP_MESSAGE_RECEIVE',
  channelId,
  data: { /* message data */ },
})

// All servers with this binding receive and emit to their Socket.IO rooms
io.to(channelId.toString()).emit('GROUP_MESSAGE_RECEIVE', data)
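The fan-out benefit can be seen in a tiny self-contained sketch: with per-channel bindings, one publish reaches every server hosting members of that channel (queue names are illustrative):

```javascript
// Self-contained sketch: one publish with a channel:{channelId} routing key
// reaches every server that has members of that channel connected.
const bindings = new Map() // routingKey -> Set of server queue names

function bind(serverQueue, routingKey) {
  if (!bindings.has(routingKey)) bindings.set(routingKey, new Set())
  bindings.get(routingKey).add(serverQueue)
}

function publish(routingKey) {
  return [...(bindings.get(routingKey) ?? [])] // queues that receive a copy
}

// Users of channel 42 are spread across two servers
bind('queue_server_01', 'channel:42')
bind('queue_server_02', 'channel:42')

publish('channel:42') // one publish, both server queues receive a copy
```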
For detailed information about group chat implementation, see Group Chats.