Jan 7, 2026

Group Chats

How group chats with channels work in the system

Group chats are conversations between multiple users, organized into channels for categorized discussions. This article explains how group chats work in the system, focusing on the channel-based routing mechanism that enables scalable multi-user messaging.

Database Schema for Group Chats

Schema diagram for group chats

Channel-Based Routing Architecture

Unlike personal chats where messages route directly to user IDs, group chats use channel-based routing. When a user sends a message to a group channel, the message is published to the channel ID in RabbitMQ, not to individual user IDs. All servers that have a binding for that channel receive the message and emit it to their Socket.IO rooms.

How Channel Bindings Work

When a user connects to a socket server, the server discovers which groups and channels the user belongs to. For each channel the user has access to, the server creates a binding from the channel ID to its server queue in the OUTGOING_EXCHANGE.

Channel Binding Setup

We do not need to maintain any user-to-server mapping table because the RabbitMQ direct exchange automatically routes channel messages to all servers that have a binding for that channel. Only servers with connected users in the channel receive the message.
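To make the routing behaviour concrete, here is a minimal in-memory model of a direct exchange. It is only an illustration of the matching rule, not the RabbitMQ client API: the class name `DirectExchangeModel`, the helper `channelKey`, and the queue names `server-a` and `server-b` are all hypothetical.

```typescript
// In-memory model of a direct exchange: routing key -> set of bound queues.
// Illustration only; real bindings live inside RabbitMQ.
class DirectExchangeModel {
  private bindings = new Map<string, Set<string>>();

  bind(queue: string, routingKey: string): void {
    const queues = this.bindings.get(routingKey) ?? new Set<string>();
    queues.add(queue);
    this.bindings.set(routingKey, queues);
  }

  unbind(queue: string, routingKey: string): void {
    const queues = this.bindings.get(routingKey);
    if (!queues) return;
    queues.delete(queue);
    if (queues.size === 0) this.bindings.delete(routingKey);
  }

  // Returns the queues that would each receive one copy of the message.
  // An empty array models an unroutable message that is discarded.
  route(routingKey: string): string[] {
    const queues = this.bindings.get(routingKey);
    return queues ? Array.from(queues).sort() : [];
  }
}

// Routing key format taken from the article: channel:{channelId}
const channelKey = (channelId: number): string => `channel:${channelId}`;

const exchange = new DirectExchangeModel();
exchange.bind("server-a", channelKey(2)); // server-a has a user in channel 2
exchange.bind("server-b", channelKey(2)); // so does server-b
exchange.bind("server-b", channelKey(3)); // only server-b has a user in channel 3

const deliveredForChannel2 = exchange.route(channelKey(2)); // both servers
const deliveredForChannel9 = exchange.route(channelKey(9)); // nobody online
```

In this model a publish to channel 2 reaches both server queues, while a publish to channel 9 reaches none, which is the case where no member of the channel is connected anywhere.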

Subscription Flow

When a user connects, the socket server needs to discover their groups and channels. Because the socket server has no direct access to the database, this is handled through a subscription mechanism.

When a user first connects to a socket server:

  1. The server extracts the user ID from the connection handshake (from the JWT)
  2. The server publishes a subscription message to RabbitMQ
  3. A worker fetches all groups and channels the user belongs to and returns them to the socket server through RabbitMQ
  4. The server binds each channel ID to its server queue (using channel:{channelId} as routing key)
  5. The server joins the user in Socket.IO rooms for each channel
  6. The server increments the reference count for channels and groups

The server maintains a reference count for each channel to track how many of its connected users are subscribed to it. When a user disconnects, the count for each of their channels is decremented, and when a channel's count reaches zero, its binding is removed from the server queue.

await rabbitmqService.consumeFromSubscriptionQueue(async (message, ack) => {
  try {
    const { userId, groupIds, channelIds } = message

    const socketId = chatsStore.getClient(userId)
    if (socketId) {
      const socket = io.sockets.sockets.get(socketId)
      if (socket) {
        // Bind each channel to this server's queue
        const bindChannelPromises = []
        for (const channelId of channelIds) {
          bindChannelPromises.push(rabbitmqService.bindChannelToQueue(channelId))
        }
        await Promise.all(bindChannelPromises)

        // Join Socket.IO rooms for each channel
        socket.join(channelIds.map(id => id.toString()))

        // Store channels and groups in chats store with reference counting
        chatsStore.addUserChannels(userId, channelIds)
        chatsStore.addUserGroups(userId, groupIds)

        // ...
      }
    }

    ack()
  } catch (error) {
    // ...
  }
})
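The internals of `chatsStore` are not shown, so the following is only a sketch of how per-channel reference counting could work. The class `ChannelRefCounter` and its method names are hypothetical. Unlike the handler above, which binds every channel on each subscription, this version reports only the channels that actually need a bind or unbind.

```typescript
// Hypothetical per-server reference counter for channel subscriptions.
class ChannelRefCounter {
  private counts = new Map<number, number>();

  // Returns the channels whose count went from 0 to 1,
  // i.e. the ones that need a new binding on this server's queue.
  addUser(channelIds: number[]): number[] {
    const toBind: number[] = [];
    for (const id of channelIds) {
      const next = (this.counts.get(id) ?? 0) + 1;
      this.counts.set(id, next);
      if (next === 1) toBind.push(id);
    }
    return toBind;
  }

  // Returns the channels whose count reached 0,
  // i.e. the ones whose binding can be removed.
  removeUser(channelIds: number[]): number[] {
    const toUnbind: number[] = [];
    for (const id of channelIds) {
      const current = this.counts.get(id);
      if (current === undefined) continue; // never counted, nothing to do
      if (current <= 1) {
        this.counts.delete(id);
        toUnbind.push(id);
      } else {
        this.counts.set(id, current - 1);
      }
    }
    return toUnbind;
  }

  count(channelId: number): number {
    return this.counts.get(channelId) ?? 0;
  }
}

const counter = new ChannelRefCounter();
const aliceBinds = counter.addUser([2, 3]);      // channels 2 and 3 are new here
const bobBinds = counter.addUser([2]);           // channel 2 is already bound
const aliceUnbinds = counter.removeUser([2, 3]); // only channel 3 drops to zero
const bobUnbinds = counter.removeUser([2]);      // now channel 2 drops to zero
```

Binding the same queue to the same routing key twice is harmless in RabbitMQ, so skipping the repeat binds is an optimization rather than a correctness requirement.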

Group Message Flow

When a user sends a message to a group channel, here's the complete flow:

Group Message Flow

Implementation and Breakdown

The user sends a message to a channel. The client creates a temporary message in memory immediately for optimistic UI updates, then calls the HTTP API endpoint.

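const channelId = 2
const content = 'Hello everyone!'
const hash = generateTempHash()

// Show the message immediately; the server confirms it later by hash
const tempMessage = {
  hash,
  channelId,
  content,
  status: MessageStatus.SENDING,
}
addGroupMessage(channelId, tempMessage)

// groupId is the ID of the group that owns this channel
await sendGroupMessage(groupId, channelId, content, hash)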

The backend API receives the POST request, validates it (including the sender's access to the group), and immediately publishes the message to RabbitMQ's INCOMING_EXCHANGE.

func (s *GroupService) SendGroupMessage(ctx context.Context, userID int64, dto *SendGroupMessageDTO) error {
  // Verify user has access to this group
  if err := s.verifyGroupAccess(ctx, dto.GroupID, userID); err != nil {
    return err
  }

  // Publish to incoming exchange for worker to process
  message := RabbitMQMessage{
    Type: "MESSAGE_SEND",
    Payload: map[string]any{
      "senderId":   userID,
      "groupId":    dto.GroupID,
      "channelId":  dto.ChannelID,
      "content":    dto.Content,
      "hash":       dto.Hash,
    },
  }

  return s.rabbitmqService.PublishToIncoming("group.message", message)
}

The chat worker consumes from a durable queue and handles the database operations. It:

  • Creates the message record
  • Creates message_recipients entries for all group members (except sender)
  • Publishes the result back to RabbitMQ

async processGroupMessage(payload: SocketEventPayloads.Group.EmitMessage) {
  const { message } = await AppDataSource.manager.transaction(async (txn) => {
    // Get all members of the group except the sender
    const userGroups = await txn.find(UserGroup, {
      where: { group: { id: payload.groupId }, user: { id: Not(payload.senderId) } },
      relations: ['user']
    });

    // Save the main message record
    const message = await txn.save(Message, {
      sender: { id: payload.senderId },
      channel: { id: payload.channelId },
      content: payload.content,
    });

    // Bulk create recipient records for tracking message status
    const recipients = userGroups.map(ug => ({ message, receiver: ug.user, status: 'SENT' }));
    await txn.insert(MessageRecipient, recipients);

    return { message };
  });

  const messageData = {
    messageId: message.id,
    content: payload.content,
    senderId: payload.senderId,
    channelId: payload.channelId,
    hash: payload.hash,
  };

  // Publish confirmation only to the sender
  await this.rabbitmq.publish(payload.senderId.toString(), {
    event: 'STATUS_SENT',
    ...messageData
  });

  // Publish once to the channel's routing key. RabbitMQ copies the event to
  // every server queue that has a binding for this channelId
  await this.rabbitmq.publish(`channel:${payload.channelId}`, {
    event: 'MESSAGE_RECEIVE',
    ...messageData
  });
}

Because the worker publishes a group message once to the channel's routing key, the exchange copies it only to the server queues bound to that channel. Publishing to individual users would instead require N separate publishes for N receivers.
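The saving can be shown with a small calculation. This is a sketch under the assumption that each online member is connected to exactly one socket server; the type `OnlineMember` and the function `publishCounts` are hypothetical names used only for this example.

```typescript
// Hypothetical description of an online channel member.
interface OnlineMember {
  userId: number;
  server: string; // the socket server holding this user's connection
}

// Compares the number of publishes needed for per-user routing with
// channel-based routing, and counts the copies the exchange makes.
function publishCounts(members: OnlineMember[]) {
  const servers = new Set(members.map((m) => m.server));
  return {
    perUserPublishes: members.length, // one publish per receiver
    perChannelPublishes: 1,           // a single publish to channel:{channelId}
    queueDeliveries: servers.size,    // one copy per bound server queue
  };
}

const counts = publishCounts([
  { userId: 1, server: "server-a" },
  { userId: 2, server: "server-a" },
  { userId: 3, server: "server-b" },
  { userId: 4, server: "server-b" },
  { userId: 5, server: "server-b" },
]);
```

With five online receivers spread over two servers, per-user routing needs five publishes, while channel routing needs one publish that the exchange copies into two server queues.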

RabbitMQ Routes Based on Channel Bindings

RabbitMQ's OUTGOING_EXCHANGE is a direct exchange, and each binding key has the form channel:{channelId}. When the worker publishes a message with routing key channel:{channelId}:

  • It's delivered to all server queues that have a binding for channel:channelId
  • Servers where no user is connected to that channel don't receive it
  • If no queue is bound for that key, RabbitMQ discards the message, which is its default handling of unroutable messages

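If none of a channel's members are online, no binding exists for that channel, so nothing is delivered in real time. The message itself has already been saved by the worker at this point.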

RabbitMQ Channel Routing

Socket Servers Consume and Emit

Each socket server consumes messages from its server queue and emits them to the appropriate Socket.IO rooms.

await rabbitmqService.consumeFromServerQueue((msg, ack) => {
  try {
    const { event, userId, channelId, data } = msg;

    if (userId) {
      // Direct delivery to users (for personal chats)
      const socketId = chatsStore.getClient(userId);
      if (socketId) io.to(socketId).emit(event, data);
    }
    else if (channelId) {
      // Broadcast to all local sockets in the room (for group channels)
      io.to(channelId.toString()).emit(event, data);
    }

    ack()
  } catch (error) {
    console.error('Error processing message from RabbitMQ server queue:', error)
    // Acknowledge anyway so the failed message is not redelivered
    ack()
  }
});

Clients Receive and Update UI

The clients receive the message through their Socket.IO connection and update the UI:

  • Sender receives a SENT confirmation and replaces the temporary message with the actual persisted message
  • Receivers receive the message and display it in the channel

socket.on('GROUP_MESSAGE_SENT', (data) => {
  // Replace temp message with actual message
  messages.value = messages.value.map(msg =>
    msg.hash === data.hash ? { ...msg, id: data.messageId, status: MessageStatus.SENT } : msg
  )
})

socket.on('GROUP_MESSAGE_RECEIVE', (data) => {
  messages.value.push({
    id: data.messageId,
    senderId: data.senderId,
    content: data.content,
    createdAt: data.createdAt,
    status: MessageStatus.SENT,
  })
})
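The two handlers can also be written as pure functions, which makes the reconciliation logic easier to test. This is a sketch, not the original client code: the types `ChatMessage` and `ServerEvent` and the functions `applySent` and `applyReceive` are hypothetical. It also assumes that the sender's own socket may receive the channel broadcast (it is joined to the same room), so the receive path skips messages it already knows by ID or hash.

```typescript
type Status = "SENDING" | "SENT";

interface ChatMessage {
  id?: number;   // set once the server has persisted the message
  hash?: string; // client-generated temporary identifier
  senderId: number;
  content: string;
  status: Status;
}

interface ServerEvent {
  messageId: number;
  hash: string;
  senderId: number;
  content: string;
}

// Sender side: swap the temporary message for the persisted one.
function applySent(messages: ChatMessage[], e: ServerEvent): ChatMessage[] {
  return messages.map((m) =>
    m.hash === e.hash ? { ...m, id: e.messageId, status: "SENT" as const } : m
  );
}

// Receiver side: append the message unless it is already in the list.
function applyReceive(messages: ChatMessage[], e: ServerEvent): ChatMessage[] {
  const alreadyKnown = messages.some(
    (m) => m.id === e.messageId || m.hash === e.hash
  );
  if (alreadyKnown) return messages;
  const incoming: ChatMessage = {
    id: e.messageId,
    hash: e.hash,
    senderId: e.senderId,
    content: e.content,
    status: "SENT",
  };
  return [...messages, incoming];
}

const serverEvent: ServerEvent = { messageId: 10, hash: "tmp-1", senderId: 7, content: "Hello everyone!" };
const pending: ChatMessage[] = [{ hash: "tmp-1", senderId: 7, content: "Hello everyone!", status: "SENDING" }];

const senderView = applySent(pending, serverEvent);            // temp message confirmed
const senderAfterEcho = applyReceive(senderView, serverEvent); // broadcast echo ignored
const receiverView = applyReceive([], serverEvent);            // new message appended
```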