How group chats with channels work in the system
Group chats are conversations between multiple users, organized into channels for categorized discussions. This article explains how group chats work in the system, focusing on the channel-based routing mechanism that enables scalable multi-user messaging.
Schema diagram for group chats
Unlike personal chats where messages route directly to user IDs, group chats use channel-based routing. When a user sends a message to a group channel, the message is published to the channel ID in RabbitMQ, not to individual user IDs. All servers that have a binding for that channel receive the message and emit it to their Socket.IO rooms.
When a user connects to a socket server, the server discovers which groups and channels the user belongs to. For each channel the user has access to, the server creates a binding from the channel ID to its server queue in the OUTGOING_EXCHANGE.
Channel Binding Setup
We do not need to maintain any user-to-server mapping table because the RabbitMQ direct exchange automatically routes channel messages to all servers that have a binding for that channel. Only servers with connected users in the channel receive the message.
When a user connects, the socket server needs to discover their groups and channels. The socket server does not have direct access to the database. So this is handled through a subscription mechanism.
When a user first connects to a socket server:
channel:{channelId} as routing key)The server maintains a reference count for each channel to track how many users on that server are subscribed to it. When a user disconnects, their channels are decremented, and if a channel reaches zero subscribers, its binding is automatically removed.
await rabbitmqService.consumeFromSubscriptionQueue(async (message, ack) => {
try {
const { userId, groupIds, channelIds } = message
const socketId = chatsStore.getClient(userId)
if (socketId) {
const socket = io.sockets.sockets.get(socketId)
if (socket) {
// Bind each channel to this server's queue
const bindChannelPromises = []
for (const channelId of channelIds) {
bindChannelPromises.push(rabbitmqService.bindChannelToQueue(channelId))
}
await Promise.all(bindChannelPromises)
// Join Socket.IO rooms for each channel
socket.join(channelIds.map(id => id.toString()))
// Store channels and groups in chats store with reference counting
chatsStore.addUserChannels(userId, channelIds)
chatsStore.addUserGroups(userId, groupIds)
// ...
}
}
ack()
} catch (error) {
// ...
}
})
When a user sends a message to a group channel, here's the complete flow:
Group Message Flow
The user sends a message to a channel. The client creates a temporary message in memory immediately for optimistic UI updates, then calls the HTTP API endpoint.
const tempMessage = {
hash: generateTempHash(),
channelId: 2,
content: 'Hello everyone!',
status: MessageStatus.SENDING,
}
addGroupMessage(channelId, tempMessage)
await sendGroupMessage(groupId, channelId, 'Hello everyone!', hash)
The Backend API receives the POST request, validates the input, and immediately publishes it to RabbitMQ's INCOMING_EXCHANGE.
func (s *GroupService) SendGroupMessage(ctx context.Context, userID int64, dto *SendGroupMessageDTO) error {
// Verify user has access to this group
if err := s.verifyGroupAccess(ctx, dto.GroupID, userID); err != nil {
return err
}
// Publish to incoming exchange for worker to process
message := RabbitMQMessage{
Type: "MESSAGE_SEND",
Payload: map[string]any{
"senderId": userID,
"groupId": dto.GroupID,
"channelId": dto.ChannelID,
"content": dto.Content,
"hash": dto.Hash,
},
}
return s.rabbitmqService.PublishToIncoming("group.message", message)
}
The chat worker consumes from a durable queue and handles the database operations. It:
func (mp *MessageProcessor) ProcessGroupMessage(payload *broker.GroupMessagePayload) error {
return mp.db.Transaction(func(tx *gorm.DB) error {
// Fetch sender and channel
var sender domain.UserProfile
if err := tx.First(&sender, payload.SenderId).Error; err != nil {
return fmt.Errorf("sender not found: %w", err)
}
var channel domain.Channel
if err := tx.First(&channel, payload.ChannelId).Error; err != nil {
return fmt.Errorf("channel not found: %w", err)
}
// Get all members of the group except the sender
var userGroups []domain.UserGroup
if err := tx.Preload("User").Where("group_id = ? AND user_id != ?", payload.GroupId, payload.SenderId).
Find(&userGroups).Error; err != nil {
return fmt.Errorf("failed to fetch group members: %w", err)
}
// Save the main message record
message := &domain.Message{
SenderID: payload.SenderId,
Sender: &sender,
ChannelID: &payload.ChannelId,
Channel: &channel,
Content: payload.Content,
}
if err := tx.Create(message).Error; err != nil {
return fmt.Errorf("failed to create message: %w", err)
}
// Create message recipients for all group members
for _, ug := range userGroups {
recipient := &domain.MessageRecipient{
MessageID: message.ID,
Message: message,
ReceiverID: ug.UserID,
Receiver: ug.User,
Status: domain.MessageStatusSent,
}
if err := tx.Create(recipient).Error; err != nil {
log.Printf("failed to create recipient for user %d: %v", ug.UserID, err)
}
}
// Publish to the channel. RabbitMQ pushes the event to all queues which have binding for this channelId
routingKey := "channel:" + strconv.FormatInt(payload.ChannelId, 10)
if err := mp.broker.PublishToOutgoing(routingKey, map[string]any{
"event": "group:receive-message",
"channelId": payload.ChannelId,
"data": map[string]any{
"channelId": payload.ChannelId,
"groupId": payload.GroupId,
"messageId": message.ID,
"content": payload.Content,
"senderId": payload.SenderId,
"createdAt": message.CreatedAt,
"status": domain.MessageStatusSent,
},
}); err != nil {
log.Printf("failed to publish message to channel %d: %v", payload.ChannelId, err)
}
return nil
})
}
By pushing group messages to channels instead of individual users, we only publish once per server binding, and not for N receivers, which would have duplicated the event N times.
RabbitMQ's OUTGOING_EXCHANGE uses Direct Routing with channel IDs as routing keys. When the worker publishes a message with routing key channel:channelId:
channel:channelIdNo channel bindings will exist if none of the users from that channel are online.
RabbitMQ Channel Routing
Each socket server consumes messages from its server queue and emits them to the appropriate Socket.IO rooms.
await rabbitmqService.consumeFromServerQueue((msg, ack) => {
try {
const { event, userId, channelId, data } = msg;
if (userId) {
// Direct delivery to users (for personal chats)
const socketId = chatsStore.getClient(userId);
if (socketId) io.to(socketId).emit(event, data);
}
else if (channelId) {
// Broadcast to all local sockets in the room (for group channels)
io.to(channelId.toString()).emit(event, data);
}
ack()
} catch (error) {
console.error('Error processing message from RabbitMQ server queue:', error)
ack()
}
});
The clients receive the message through their Socket.IO connection and update the UI:
socket.on('GROUP_MESSAGE_SENT', (data) => {
// Replace temp message with actual message
messages.value = messages.value.map(msg =>
msg.hash === data.hash ? { ...msg, id: data.messageId, status: MessageStatus.SENT } : msg
)
})
socket.on('GROUP_MESSAGE_RECEIVE', (data) => {
messages.value.push({
id: data.messageId,
senderId: data.senderId,
content: data.content,
createdAt: data.createdAt,
status: MessageStatus.SENT,
})
})