Projects
Jan 2, 2026

Personal Chats

How personal (one-to-one) chats work in the system

Personal chats are direct conversations between two users. This article explains how personal (one-to-one) chats work in the system.

Database Schema for Personal Chats

Schema diagram for personal chats

Personal Chat Flow

Personal chat message flow

The lifecycle of a message beyond the initial delivery (Delivered and Read statuses) involves a separate event-driven flow, which is explained in detail in the Read Receipts section.

Implementation

The API server provides HTTP endpoints for message sending, and immediately offloads the task to RabbitMQ, so that it remains unblocked.

We are using the incoming and outgoing exchanges of RabbitMQ described in Scalable WebSockets with RabbitMQ.

const sendMessage = async (receiverId: number, content: string, hash: string) => {
  await $api('/api/messages/send/personal', {
    method: 'POST',
    body: {
      receiverId,
      content,
      hash,
    },
  })
}

The backend API publishes the message to RabbitMQ.

func (s *MessageService) SendPersonalMessage(ctx context.Context, userID int64, dto *SendPersonalMessageDTO) error {
  // Publish to RabbitMQ for worker to process
  message := RabbitMQMessage{
    Type: "MESSAGE_SEND",
    Payload: map[string]any{
      "senderId":   userID,
      "receiverId": dto.ReceiverID,
      "content":    dto.Content,
      "hash":       dto.Hash,
    },
  }

  return s.rabbitmqService.PublishToIncoming("personal.message", message)
}

The chat worker handles the "heavy" database operations and notifies the system when the message is persisted.

func (mp *MessageProcessor) ProcessPersonalMessage(payload *broker.PersonalMessagePayload) error {
    return mp.db.Transaction(func(tx *gorm.DB) error {
        message := &domain.Message{
            SenderID:  payload.SenderId,
            Content:   payload.Content,
            CreatedAt: time.Now(),
        }
        if err := tx.Create(message).Error; err != nil {
            return fmt.Errorf("failed to create message: %w", err)
        }

        // Create message recipient with SENT status
        recipient := &domain.MessageRecipient{
            MessageID:  message.ID,
            ReceiverID: payload.ReceiverId,
            Status:     domain.MessageStatusSent,
      // ...
        }
        if err := tx.Create(recipient).Error; err != nil {
            return fmt.Errorf("failed to create recipient: %w", err)
        }

        // Publish SENT event to sender
        if err := mp.broker.PublishToOutgoing(strconv.FormatInt(payload.SenderId, 10), map[string]any{
            "event":  "personal:sent",
            "userId": payload.SenderId,
            "data": map[string]any{
                "hash":       payload.Hash,
                "messageId":  message.ID,
                "createdAt":  message.CreatedAt,
                "receiverId": payload.ReceiverId,
                "status":     domain.MessageStatusSent,
            },
        }); err != nil {
            log.Printf("failed to publish SENT event: %v", err)
        }

        // Publish message to receiver
        if err := mp.broker.PublishToOutgoing(strconv.FormatInt(payload.ReceiverId, 10), map[string]any{
            "event":  "personal:receive-message",
            "userId": payload.ReceiverId,
            "data": map[string]any{
                "messageId": message.ID,
                "content":   payload.Content,
                "senderId":  payload.SenderId,
                "createdAt": message.CreatedAt,
                "status":    domain.MessageStatusSent,
            },
        }); err != nil {
            log.Printf("failed to publish MESSAGE_RECEIVE event: %v", err)
        }

        return nil
    })
}

The socket servers keep consuming events from their dedicated queues (discussed in Scalable WebSockets with RabbitMQ). When the "sent" (for sender) and "receive" (for recipient) events arrive from OUTGOING_EXCHANGE, the server forwards them to their respective clients. This finalizes the lifecycle for both the sender and the receiver.

// Setup RabbitMQ consumer for server queue (Socket.IO server in Node.js)
consumeFromServerQueue((message, ack) => {
  try {
    const { event, userId, data } = message

    // Find socket ID for the user
    const socketId = chatsStore.getClient(userId)
    if (socketId) {
      io.to(socketId).emit(event, data)
    }

    ack()
  } catch (error) {
    console.error('Error processing message from RabbitMQ:', error)
    ack()
  }
})