Jan 2, 2026

Personal Chats

How personal (one-to-one) chats work in the system

Personal chats are direct conversations between two users. This article covers the database schema behind them, the path a message takes from the sender's client to the recipient's, and the code involved at each step.

Database Schema for Personal Chats

Schema diagram for personal chats
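As a rough textual stand-in for the diagram, the row shapes below are inferred from the worker code later in this article: a message row with a nullable channel, plus one recipient row carrying the status. The column names (`senderId`, `channelId`, and so on), the string values of the statuses, and the helper `buildPersonalMessageRows` are assumptions made for illustration, not the actual schema.

```typescript
// Hypothetical row shapes inferred from the worker code in this article.
// Column names and status values are assumptions, not the real schema.
type MessageStatus = 'SENT' | 'DELIVERED' | 'READ'

interface MessageRow {
  id: number
  content: string
  senderId: number
  channelId: number | null // null for personal chats
  createdAt: Date
}

interface MessageRecipientRow {
  messageId: number
  receiverId: number
  status: MessageStatus
}

// Builds the pair of rows that a single personal message would produce.
function buildPersonalMessageRows(
  id: number,
  senderId: number,
  receiverId: number,
  content: string,
  createdAt: Date = new Date(),
): { message: MessageRow; recipient: MessageRecipientRow } {
  return {
    message: { id, content, senderId, channelId: null, createdAt },
    recipient: { messageId: id, receiverId, status: 'SENT' },
  }
}

const rows = buildPersonalMessageRows(1, 10, 20, 'hello')
console.log(rows.message.channelId, rows.recipient.status)
```

Keeping the status on the recipient row rather than on the message row matches how the worker saves the two records separately.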

Personal Chat Flow

Personal chat message flow

The flow above ends once the message is stored and handed to both clients with the Sent status. The later Delivered and Read statuses are handled by a separate event-driven flow, which is explained in detail in the Read Receipts section.

Implementation

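The API server exposes an HTTP endpoint for sending messages. It does not do the database work itself: it publishes the message to RabbitMQ right away, so the request is never blocked on persistence.

Messages travel through the incoming and outgoing RabbitMQ exchanges described in Scalable WebSockets with RabbitMQ.

On the client, sending a message is a single POST request.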

const sendMessage = async (receiverId: number, content: string, hash: string) => {
  await $api('/api/messages/send/personal', {
    method: 'POST',
    body: {
      receiverId,
      content,
      hash,
    },
  })
}

The backend API wraps the request in a MESSAGE_SEND envelope and publishes it to the incoming exchange.

func (s *MessageService) SendPersonalMessage(ctx context.Context, userID int64, dto *SendPersonalMessageDTO) error {
  // Publish to RabbitMQ for worker to process
  message := RabbitMQMessage{
    Type: "MESSAGE_SEND",
    Payload: map[string]any{
      "senderId":   userID,
      "receiverId": dto.ReceiverID,
      "content":    dto.Content,
      "hash":       dto.Hash,
    },
  }

  return s.rabbitmqService.PublishToIncoming("personal.message", message)
}
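Because the payload crosses a queue as untyped data, the consuming side may want to check its shape before touching the database. The guard below is a minimal sketch of that idea; the fields mirror the payload published above, while the names `PersonalMessagePayload` and `isPersonalMessagePayload` are hypothetical and not part of the original code.

```typescript
// Fields mirror the payload published by the API; the type name is an assumption.
interface PersonalMessagePayload {
  senderId: number
  receiverId: number
  content: string
  hash: string
}

// Hypothetical guard: true only when every field is present with the expected type.
function isPersonalMessagePayload(value: unknown): value is PersonalMessagePayload {
  if (typeof value !== 'object' || value === null) return false
  const { senderId, receiverId, content, hash } = value as Record<string, unknown>
  return (
    Number.isInteger(senderId) &&
    Number.isInteger(receiverId) &&
    typeof content === 'string' &&
    typeof hash === 'string'
  )
}

console.log(isPersonalMessagePayload({ senderId: 1, receiverId: 2, content: 'hi', hash: 'abc' }))
console.log(isPersonalMessagePayload({ senderId: 1, receiverId: 2, content: 'hi' }))
```

A worker could call such a guard on `event.payload` and skip or log anything that fails, instead of letting a malformed message reach the repositories.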

The chat worker consumes the message and performs the "heavy" database operations. Once the message is persisted, it publishes two events to the outgoing exchange: one for the sender and one for the recipient.

consumer.on('personal.incoming.message', async event => {
  const { hash, content, senderId, receiverId } = event.payload

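  // Persist the message itself; personal messages are stored with no channel
  const message = await messageRepository.save({
    content,
    sender: { id: senderId },
    channel: null,
  })

  // Record the recipient with the initial SENT status
  await messageRecipientRepository.save({
    message: { id: message.id },
    receiver: { id: receiverId },
    status: MessageStatus.SENT,
  })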

  // Publish event for socket server to deliver to sender
  await rabbitmq.publishToOutgoing(senderId.toString(), {
    event: 'PERSONAL_MESSAGE_SENT',
    data: {
      hash,
      messageId: message.id,
      status: MessageStatus.SENT,
      createdAt: message.createdAt,
    },
  })

  // Publish event for socket server to deliver to recipient
  await rabbitmq.publishToOutgoing(receiverId.toString(), {
    event: 'PERSONAL_MESSAGE_RECEIVE',
    data: {
      messageId: message.id,
      content,
      senderId,
      createdAt: message.createdAt,
    },
  })
})
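The article does not spell out how the sender uses the `hash` that comes back in PERSONAL_MESSAGE_SENT. One plausible reading is that it is a client-generated key that lets the sender match the confirmation to a message it is already displaying. The sketch below illustrates that reading only; `PendingMessages`, `LocalMessage`, the `PENDING` status, and the assumption that `createdAt` arrives as a string are all hypothetical.

```typescript
// Hypothetical client-side state; all names here are illustrative assumptions.
interface LocalMessage {
  hash: string
  content: string
  messageId: number | null // unknown until the server confirms
  status: 'PENDING' | 'SENT'
  createdAt: string | null
}

// Shape of the PERSONAL_MESSAGE_SENT data, assuming JSON serialization.
interface SentEvent {
  hash: string
  messageId: number
  status: 'SENT'
  createdAt: string
}

class PendingMessages {
  private byHash = new Map<string, LocalMessage>()

  // Called right before the HTTP request is sent.
  add(hash: string, content: string): LocalMessage {
    const local: LocalMessage = { hash, content, messageId: null, status: 'PENDING', createdAt: null }
    this.byHash.set(hash, local)
    return local
  }

  // Called when PERSONAL_MESSAGE_SENT arrives.
  // Returns the confirmed message, or undefined if the hash is unknown.
  confirm(event: SentEvent): LocalMessage | undefined {
    const local = this.byHash.get(event.hash)
    if (!local) return undefined
    this.byHash.delete(event.hash)
    return { ...local, messageId: event.messageId, status: event.status, createdAt: event.createdAt }
  }

  get size(): number {
    return this.byHash.size
  }
}

const pending = new PendingMessages()
pending.add('abc', 'hello')
console.log(pending.confirm({ hash: 'abc', messageId: 42, status: 'SENT', createdAt: '2026-01-02T00:00:00Z' }))
```

Under this reading, the sender can render the message immediately and fill in the real `messageId` and timestamp when the confirmation arrives.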

The socket servers keep consuming events from their dedicated queues (discussed in Scalable WebSockets with RabbitMQ). When the "sent" (for sender) and "receive" (for recipient) events arrive from OUTGOING_EXCHANGE, each server forwards them to the matching client connected to it. This completes the send flow for both the sender and the receiver.

// Setup RabbitMQ consumer for server queue (Socket.IO server in Node.js)
consumeFromServerQueue((message, ack) => {
  try {
    const { event, userId, data } = message

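    // Find socket ID for the user; if the user has no socket on this server,
    // the event is skipped
    const socketId = chatsStore.getClient(userId)
    if (socketId) {
      io.to(socketId).emit(event, data)
    }

    ack()
  } catch (error) {
    console.error('Error processing message from RabbitMQ:', error)
    // The message is acknowledged even on failure, so it is dropped
    // rather than redelivered
    ack()
  }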
})
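The consumer above relies on `chatsStore.getClient(userId)` to map a user to a socket. A minimal in-memory sketch of such a store could look like the following. Only `getClient` appears in the original code; `addClient`, `removeClient`, the one-socket-per-user model, and the string-normalized keys are assumptions made for illustration.

```typescript
// Hypothetical in-memory registry; the real chatsStore may differ.
class ChatsStore {
  // Keys are normalized to strings, since the worker publishes with senderId.toString().
  private clients = new Map<string, string>()

  // Register the socket when a user connects (assumes one socket per user).
  addClient(userId: number | string, socketId: string): void {
    this.clients.set(String(userId), socketId)
  }

  // Remove the mapping on disconnect, but only if it still points at this socket,
  // so a stale disconnect does not drop a newer connection.
  removeClient(userId: number | string, socketId: string): void {
    const key = String(userId)
    if (this.clients.get(key) === socketId) {
      this.clients.delete(key)
    }
  }

  // Returns the socket ID for the user, or undefined if not connected here.
  getClient(userId: number | string): string | undefined {
    return this.clients.get(String(userId))
  }
}

const chatsStore = new ChatsStore()
chatsStore.addClient(10, 'socket-a')
console.log(chatsStore.getClient('10'))
```

A store like this only knows about users connected to the current server, which is consistent with the consumer skipping events for users it cannot find.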