How personal (one-to-one) chats work in the system
Personal chats are direct conversations between two users. This article explains how personal (one-to-one) chats work in the system.
Schema diagram for personal chats
Personal chat message flow
The lifecycle of a message beyond the initial delivery (Delivered and Read statuses) involves a separate event-driven flow, which is explained in detail in the Read Receipts section.
The API server provides HTTP endpoints for message sending, and immediately offloads the task to RabbitMQ, so that it remains unblocked.
We are using the incoming and outgoing exchanges of RabbitMQ described in Scalable WebSockets with RabbitMQ.
const sendMessage = async (receiverId: number, content: string, hash: string) => {
await $api('/api/messages/send/personal', {
method: 'POST',
body: {
receiverId,
content,
hash,
},
})
}
The backend API publishes the message to RabbitMQ.
func (s *MessageService) SendPersonalMessage(ctx context.Context, userID int64, dto *SendPersonalMessageDTO) error {
// Publish to RabbitMQ for worker to process
message := RabbitMQMessage{
Type: "MESSAGE_SEND",
Payload: map[string]any{
"senderId": userID,
"receiverId": dto.ReceiverID,
"content": dto.Content,
"hash": dto.Hash,
},
}
return s.rabbitmqService.PublishToIncoming("personal.message", message)
}
The chat worker handles the "heavy" database operations and notifies the system when the message is persisted.
func (mp *MessageProcessor) ProcessPersonalMessage(payload *broker.PersonalMessagePayload) error {
return mp.db.Transaction(func(tx *gorm.DB) error {
message := &domain.Message{
SenderID: payload.SenderId,
Content: payload.Content,
CreatedAt: time.Now(),
}
if err := tx.Create(message).Error; err != nil {
return fmt.Errorf("failed to create message: %w", err)
}
// Create message recipient with SENT status
recipient := &domain.MessageRecipient{
MessageID: message.ID,
ReceiverID: payload.ReceiverId,
Status: domain.MessageStatusSent,
// ...
}
if err := tx.Create(recipient).Error; err != nil {
return fmt.Errorf("failed to create recipient: %w", err)
}
// Publish SENT event to sender
if err := mp.broker.PublishToOutgoing(strconv.FormatInt(payload.SenderId, 10), map[string]any{
"event": "personal:sent",
"userId": payload.SenderId,
"data": map[string]any{
"hash": payload.Hash,
"messageId": message.ID,
"createdAt": message.CreatedAt,
"receiverId": payload.ReceiverId,
"status": domain.MessageStatusSent,
},
}); err != nil {
log.Printf("failed to publish SENT event: %v", err)
}
// Publish message to receiver
if err := mp.broker.PublishToOutgoing(strconv.FormatInt(payload.ReceiverId, 10), map[string]any{
"event": "personal:receive-message",
"userId": payload.ReceiverId,
"data": map[string]any{
"messageId": message.ID,
"content": payload.Content,
"senderId": payload.SenderId,
"createdAt": message.CreatedAt,
"status": domain.MessageStatusSent,
},
}); err != nil {
log.Printf("failed to publish MESSAGE_RECEIVE event: %v", err)
}
return nil
})
}
The socket servers keep consuming events from their dedicated queues (discussed in Scalable WebSockets with RabbitMQ). When the "sent" (for sender) and "receive" (for recipient) events arrive from OUTGOING_EXCHANGE, the server forwards them to their respective clients. This finalizes the lifecycle for both the sender and the receiver.
// Setup RabbitMQ consumer for server queue (Socket.IO server in Node.js)
consumeFromServerQueue((message, ack) => {
try {
const { event, userId, data } = message
// Find socket ID for the user
const socketId = chatsStore.getClient(userId)
if (socketId) {
io.to(socketId).emit(event, data)
}
ack()
} catch (error) {
console.error('Error processing message from RabbitMQ:', error)
ack()
}
})