How personal (one-to-one) chats work in the system
Personal chats are direct conversations between two users. This article follows a message from the sender's client through the API server, RabbitMQ, and the chat worker to the socket servers that deliver it.
Schema diagram for personal chats
Personal chat message flow
The lifecycle of a message beyond the initial delivery (Delivered and Read statuses) involves a separate event-driven flow, which is explained in detail in the Read Receipts section.
The API server exposes HTTP endpoints for sending messages and immediately hands each message off to RabbitMQ, so the request handler is never blocked on database work.
The flow uses the RabbitMQ incoming and outgoing exchanges described in Scalable WebSockets with RabbitMQ.
The client starts the flow by posting the message to the API endpoint:
const sendMessage = async (receiverId: number, content: string, hash: string) => {
  await $api('/api/messages/send/personal', {
    method: 'POST',
    body: {
      receiverId,
      content,
      hash,
    },
  })
}
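The `hash` sent here comes back to the sender in the `PERSONAL_MESSAGE_SENT` event that the chat worker publishes, together with the server-side `messageId`. That suggests it acts as a client-generated correlation ID. The following is a minimal sketch of how a client might use it to track optimistic messages. `PendingMessages`, `LocalMessage`, and `SentConfirmation` are hypothetical names, and the `number` and `string` types for `messageId` and `createdAt` are assumptions about the serialized payload.

```typescript
// Hypothetical client-side bookkeeping; none of these names come from the article's codebase.
type LocalStatus = 'PENDING' | 'SENT'

interface LocalMessage {
  hash: string
  receiverId: number
  content: string
  status: LocalStatus
  messageId?: number
  createdAt?: string
}

// Assumed shape of the PERSONAL_MESSAGE_SENT payload, minus the status field
// (its runtime value depends on the MessageStatus enum, which is not shown).
interface SentConfirmation {
  hash: string
  messageId: number
  createdAt: string
}

class PendingMessages {
  private readonly byHash = new Map<string, LocalMessage>()

  // Call right before sendMessage(receiverId, content, hash).
  add(receiverId: number, content: string, hash: string): LocalMessage {
    const local: LocalMessage = { hash, receiverId, content, status: 'PENDING' }
    this.byHash.set(hash, local)
    return local
  }

  // Call from the PERSONAL_MESSAGE_SENT socket handler.
  confirm(event: SentConfirmation): LocalMessage | undefined {
    const local = this.byHash.get(event.hash)
    if (!local) return undefined // unknown hash, e.g. sent from another session
    local.status = 'SENT'
    local.messageId = event.messageId
    local.createdAt = event.createdAt
    this.byHash.delete(event.hash)
    return local
  }

  get size(): number {
    return this.byHash.size
  }
}
```

With this in place, the UI can render the message immediately as pending and switch it to sent once `confirm` returns the matching entry.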
The backend API publishes the message to RabbitMQ.
func (s *MessageService) SendPersonalMessage(ctx context.Context, userID int64, dto *SendPersonalMessageDTO) error {
	// Publish to RabbitMQ for worker to process
	message := RabbitMQMessage{
		Type: "MESSAGE_SEND",
		Payload: map[string]any{
			"senderId":   userID,
			"receiverId": dto.ReceiverID,
			"content":    dto.Content,
			"hash":       dto.Hash,
		},
	}
	return s.rabbitmqService.PublishToIncoming("personal.message", message)
}
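Because the payload is built as an untyped map on the Go side, a consumer written in TypeScript may want to validate it before touching the database. The sketch below shows one way to do that with a type guard. It assumes the envelope is serialized as JSON with lowercase `type` and `payload` keys (the struct tags are not shown in the article), and `MessageSendEnvelope` and `isMessageSendEnvelope` are illustrative names.

```typescript
// Assumed wire format of the message published by SendPersonalMessage.
interface PersonalMessagePayload {
  senderId: number
  receiverId: number
  content: string
  hash: string
}

interface MessageSendEnvelope {
  type: 'MESSAGE_SEND'
  payload: PersonalMessagePayload
}

// Runtime check that an unknown value (e.g. the result of JSON.parse)
// has the expected envelope shape.
function isMessageSendEnvelope(value: unknown): value is MessageSendEnvelope {
  if (typeof value !== 'object' || value === null) return false
  const envelope = value as { type?: unknown; payload?: unknown }
  if (envelope.type !== 'MESSAGE_SEND') return false
  if (typeof envelope.payload !== 'object' || envelope.payload === null) return false
  const payload = envelope.payload as Record<string, unknown>
  return (
    Number.isInteger(payload.senderId) &&
    Number.isInteger(payload.receiverId) &&
    typeof payload.content === 'string' &&
    typeof payload.hash === 'string'
  )
}
```

A consumer could call the guard first and skip or dead-letter anything that fails it, rather than letting a malformed payload reach the repository layer.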
The chat worker handles the "heavy" database operations and notifies the system when the message is persisted.
consumer.on('personal.incoming.message', async event => {
  const { hash, content, senderId, receiverId } = event.payload

  // Persist the message itself; personal messages have no channel
  const message = await messageRepository.save({
    content,
    sender: { id: senderId },
    channel: null,
  })

  // Persist the per-recipient delivery record with the initial SENT status
  await messageRecipientRepository.save({
    message: { id: message.id },
    receiver: { id: receiverId },
    status: MessageStatus.SENT,
  })

  // Publish event for socket server to deliver to sender
  await rabbitmq.publishToOutgoing(senderId.toString(), {
    event: 'PERSONAL_MESSAGE_SENT',
    data: {
      hash,
      messageId: message.id,
      status: MessageStatus.SENT,
      createdAt: message.createdAt,
    },
  })

  // Publish event for socket server to deliver to recipient
  await rabbitmq.publishToOutgoing(receiverId.toString(), {
    event: 'PERSONAL_MESSAGE_RECEIVE',
    data: {
      messageId: message.id,
      content,
      senderId,
      createdAt: message.createdAt,
    },
  })
})
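The two outgoing events are deliberately asymmetric: the sender gets back the `hash` and the new `messageId` but not the content it already has, while the recipient gets the content and `senderId` but no `hash`. One way to make that contract explicit and testable without a broker is to build both events in a pure function, as in this sketch. `buildOutgoingEvents` and the interfaces are hypothetical, and the status value is passed in because the `MessageStatus` enum is not shown.

```typescript
// Illustrative types; the real entity and enum definitions are not shown in the article.
interface SavedMessage {
  id: number
  createdAt: Date
}

interface IncomingPayload {
  hash: string
  content: string
  senderId: number
  receiverId: number
}

interface OutgoingEvent {
  routingKey: string // user ID as a string, as passed to publishToOutgoing
  event: string
  data: Record<string, unknown>
}

// Builds the confirmation for the sender and the delivery for the recipient.
function buildOutgoingEvents(
  saved: SavedMessage,
  input: IncomingPayload,
  sentStatus: string,
): [OutgoingEvent, OutgoingEvent] {
  const toSender: OutgoingEvent = {
    routingKey: String(input.senderId),
    event: 'PERSONAL_MESSAGE_SENT',
    data: {
      hash: input.hash,
      messageId: saved.id,
      status: sentStatus,
      createdAt: saved.createdAt,
    },
  }
  const toRecipient: OutgoingEvent = {
    routingKey: String(input.receiverId),
    event: 'PERSONAL_MESSAGE_RECEIVE',
    data: {
      messageId: saved.id,
      content: input.content,
      senderId: input.senderId,
      createdAt: saved.createdAt,
    },
  }
  return [toSender, toRecipient]
}
```

The worker would then loop over the returned pair and publish each entry under its `routingKey`.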
The socket servers continuously consume events from their dedicated queues (discussed in Scalable WebSockets with RabbitMQ). When the "sent" (for the sender) and "receive" (for the recipient) events arrive from OUTGOING_EXCHANGE, each server forwards them to the matching connected client. This completes the initial delivery for both the sender and the receiver.
// Setup RabbitMQ consumer for server queue (Socket.IO server in Node.js)
consumeFromServerQueue((message, ack) => {
  try {
    const { event, userId, data } = message

    // Find socket ID for the user
    const socketId = chatsStore.getClient(userId)
    if (socketId) {
      io.to(socketId).emit(event, data)
    }
    // No socket means the user is not connected to this server; nothing to emit
    ack()
  } catch (error) {
    console.error('Error processing message from RabbitMQ:', error)
    // Acknowledge anyway: the failed event is dropped rather than redelivered
    ack()
  }
})
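The consumer relies on `chatsStore.getClient(userId)` to map a user to a socket ID, but the store itself is not shown. A minimal in-memory sketch of such a registry might look like the following. Only `getClient` appears in the article; `ClientRegistry`, `setClient`, and `removeClient` are hypothetical, and the sketch assumes one socket per user per server (multiple devices would need a set of socket IDs per user).

```typescript
// Hypothetical in-memory registry of connected users on one socket server.
class ClientRegistry {
  private readonly socketsByUser = new Map<string, string>()

  // Call when a user's socket connects and is authenticated.
  setClient(userId: string | number, socketId: string): void {
    this.socketsByUser.set(String(userId), socketId)
  }

  // Returns the socket ID for the user, or undefined if not connected here.
  getClient(userId: string | number): string | undefined {
    return this.socketsByUser.get(String(userId))
  }

  // Remove only if the stored socket is the one disconnecting, so a late
  // disconnect from an old socket cannot evict a newer connection.
  removeClient(userId: string | number, socketId: string): boolean {
    const key = String(userId)
    if (this.socketsByUser.get(key) !== socketId) return false
    return this.socketsByUser.delete(key)
  }
}
```

Keys are normalized with `String()` because the worker publishes with `senderId.toString()` and `receiverId.toString()`, so lookups work whether the consumer receives the user ID as a number or as a string.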