diff --git a/.changeset/spicy-nails-design.md b/.changeset/spicy-nails-design.md new file mode 100644 index 0000000000000..0388b121763d1 --- /dev/null +++ b/.changeset/spicy-nails-design.md @@ -0,0 +1,10 @@ +--- +"@rocket.chat/meteor": patch +"@rocket.chat/core-services": patch +"@rocket.chat/model-typings": patch +"@rocket.chat/models": patch +"@rocket.chat/ddp-streamer": patch +"@rocket.chat/presence": patch +--- + +Fixes user status inaccuracy by refreshing active connections and filtering out the stale ones. diff --git a/apps/meteor/definition/externals/meteor/ddp-common.d.ts b/apps/meteor/definition/externals/meteor/ddp-common.d.ts index 3a7f1538ab407..cbe96a9722968 100644 --- a/apps/meteor/definition/externals/meteor/ddp-common.d.ts +++ b/apps/meteor/definition/externals/meteor/ddp-common.d.ts @@ -14,5 +14,69 @@ declare module 'meteor/ddp-common' { userId?: string; }); } + + /** + * Heartbeat options + */ + type HeartbeatOptions = { + /** + * interval to send pings, in milliseconds + */ + heartbeatInterval: number; + /** + * timeout to close the connection if a reply isn't received, in milliseconds. + */ + heartbeatTimeout: number; + /** + * function to call to send a ping on the connection. + */ + sendPing: () => void; + /** + * function to call to close the connection + */ + onTimeout: () => void; + }; + + class Heartbeat { + heartbeatInterval: number; + + heartbeatTimeout: number; + + _sendPing: () => void; + + _onTimeout: () => void; + + _seenPacket: boolean; + + _heartbeatIntervalHandle: ReturnType | null; + + _heartbeatTimeoutHandle: ReturnType | null; + + constructor(options: HeartbeatOptions); + + stop(): void; + + start(): void; + + _startHeartbeatIntervalTimer(): void; + + _startHeartbeatTimeoutTimer(): void; + + _clearHeartbeatIntervalTimer(): void; + + _clearHeartbeatTimeoutTimer(): void; + + /** + * The heartbeat interval timer is fired when we should send a ping. + */ + _heartbeatIntervalFired(): void; + + /** + * The heartbeat timeout timer is fired when we sent a ping, but we timed out waiting for the pong. + */ + _heartbeatTimeoutFired(): void; + + messageReceived(): void; + } } } diff --git a/apps/meteor/definition/externals/meteor/meteor.d.ts b/apps/meteor/definition/externals/meteor/meteor.d.ts index cab110392c12c..e93445a891a36 100644 --- a/apps/meteor/definition/externals/meteor/meteor.d.ts +++ b/apps/meteor/definition/externals/meteor/meteor.d.ts @@ -1,6 +1,6 @@ import 'meteor/meteor'; import type { ServerMethods } from '@rocket.chat/ddp-client'; -import type { IStreamerConstructor, IStreamer } from 'meteor/rocketchat:streamer'; +import type { DDPCommon, IStreamerConstructor, IStreamer } from 'meteor/ddp-common'; type StringifyBuffers = { [P in keyof T]: T[P] extends Buffer ? string : T[P]; @@ -39,7 +39,12 @@ declare module 'meteor/meteor' { isDesktop: () => boolean; } - const server: any; + const server: { + sessions: Map; + publish_handlers: { + meteor_autoupdate_clientVersions(): void; + }; + }; const runAsUser: (userId: string, scope: () => T) => T; diff --git a/apps/meteor/ee/server/startup/presence.ts b/apps/meteor/ee/server/startup/presence.ts index e0756e4b4c59d..6bf587b500ba6 100644 --- a/apps/meteor/ee/server/startup/presence.ts +++ b/apps/meteor/ee/server/startup/presence.ts @@ -40,6 +40,16 @@ Meteor.startup(() => { return; } + const _messageReceived = session.heartbeat.messageReceived.bind(session.heartbeat); + session.heartbeat.messageReceived = function messageReceived() { + if (this._seenPacket === false) { + void Presence.updateConnection(login.user._id, login.connection.id).catch((err) => { + console.error('Error updating connection presence on heartbeat:', err); + }); + } + return _messageReceived(); + }; + void (async function () { await Presence.newConnection(login.user._id, login.connection.id, nodeId); updateConns(); diff --git a/ee/apps/ddp-streamer/src/Client.ts b/ee/apps/ddp-streamer/src/Client.ts index fb75af64b3da0..0ce8762948726 100644 --- a/ee/apps/ddp-streamer/src/Client.ts +++ b/ee/apps/ddp-streamer/src/Client.ts @@ -1,6 +1,7 @@ import { EventEmitter } from 'events'; import type { IncomingMessage } from 'http'; +import { Presence } from '@rocket.chat/core-services'; import type { ISocketConnection } from '@rocket.chat/core-typings'; import { v1 as uuidv1 } from 'uuid'; import type WebSocket from 'ws'; @@ -73,6 +74,8 @@ export class Client extends EventEmitter { public userToken?: string; + private _seenPacket = true; + constructor( public ws: WebSocket, public meteorClient = false, @@ -179,6 +182,18 @@ export class Client extends EventEmitter { this.ws.close(WS_ERRORS.TIMEOUT, WS_ERRORS_MESSAGES.TIMEOUT); }; + private messageReceived = (): void => { + if (this._seenPacket || !this.userId) { + this._seenPacket = true; + return; + } + + this._seenPacket = true; + void Presence.updateConnection(this.userId, this.connection.id).catch((err) => { + console.error('Error updating connection presence after heartbeat:', err); + }); + }; + ping(id?: string): void { this.send(server.serialize({ [DDP_EVENTS.MSG]: DDP_EVENTS.PING, ...(id && { [DDP_EVENTS.ID]: id }) })); } @@ -188,6 +203,9 @@ export class Client extends EventEmitter { } handleIdle = (): void => { + if (this.userId) { + this._seenPacket = false; + } this.ping(); this.timeout = setTimeout(this.closeTimeout, TIMEOUT); }; @@ -200,6 +218,7 @@ export class Client extends EventEmitter { handler = async (payload: WebSocket.Data, isBinary: boolean): Promise => { try { const packet = server.parse(payload, isBinary); + this.messageReceived(); this.emit('message', packet); if (this.wait) { return new Promise((resolve) => this.once(DDP_EVENTS.LOGGED, () => resolve(this.process(packet.msg, packet)))); diff --git a/ee/packages/presence/src/Presence.ts b/ee/packages/presence/src/Presence.ts index 8c90eb5ff7c67..484f07a0c05f0 100755 --- a/ee/packages/presence/src/Presence.ts +++ b/ee/packages/presence/src/Presence.ts @@ -4,6 +4,7 @@ import type { IUser } from '@rocket.chat/core-typings'; import { UserStatus } from '@rocket.chat/core-typings'; import { Settings, Users, UsersSessions } from '@rocket.chat/models'; +import { PresenceReaper } from './lib/PresenceReaper'; import { processPresenceAndStatus } from './lib/processConnectionStatus'; const MAX_CONNECTIONS = 200; @@ -25,9 +26,18 @@ export class Presence extends ServiceClass implements IPresence { private peakConnections = 0; + private reaper: PresenceReaper; + constructor() { super(); + this.reaper = new PresenceReaper({ + usersSessions: UsersSessions, + batchSize: 500, + staleThresholdMs: 5 * 60 * 1000, // 5 minutes + onUpdate: (userIds) => this.handleReaperUpdates(userIds), + }); + this.onEvent('watch.instanceStatus', async ({ clientAction, id, diff }): Promise => { if (clientAction === 'removed') { this.connsPerInstance.delete(id); @@ -73,6 +83,7 @@ export class Presence extends ServiceClass implements IPresence { } override async started(): Promise { + this.reaper.start(); this.lostConTimeout = setTimeout(async () => { const affectedUsers = await this.removeLostConnections(); return affectedUsers.forEach((uid) => this.updateUserPresence(uid)); @@ -89,7 +100,13 @@ export class Presence extends ServiceClass implements IPresence { } } + private async handleReaperUpdates(userIds: string[]): Promise { + console.log(`[PresenceReaper] Updating presence for ${userIds.length} users due to stale connections.`); + await Promise.all(userIds.map((uid) => this.updateUserPresence(uid))); + } + override async stopped(): Promise { + this.reaper.stop(); if (!this.lostConTimeout) { return; } @@ -137,6 +154,30 @@ export class Presence extends ServiceClass implements IPresence { }; } + async updateConnection(uid: string, connectionId: string): Promise<{ uid: string; connectionId: string } | undefined> { + console.debug(`Updating connection for user ${uid} and connection ${connectionId}`); + + const query = { + '_id': uid, + 'connections.id': connectionId, + }; + + const update = { + $set: { + 'connections.$._updatedAt': new Date(), + }, + }; + + const result = await UsersSessions.updateOne(query, update); + if (result.modifiedCount === 0) { + return; + } + + await this.updateUserPresence(uid); + + return { uid, connectionId }; + } + async removeConnection(uid: string | undefined, session: string | undefined): Promise<{ uid: string; session: string } | undefined> { if (!uid || !session) { return; diff --git a/ee/packages/presence/src/lib/PresenceReaper.spec.ts b/ee/packages/presence/src/lib/PresenceReaper.spec.ts new file mode 100644 index 0000000000000..edc92648b8e85 --- /dev/null +++ b/ee/packages/presence/src/lib/PresenceReaper.spec.ts @@ -0,0 +1,171 @@ +import type { IUserSession } from '@rocket.chat/core-typings'; +import type { IUsersSessionsModel } from '@rocket.chat/model-typings'; +import type { Collection, FindCursor, WithId } from 'mongodb'; + +import { PresenceReaper } from './PresenceReaper'; + +// Define a simplified interface for our mock docs +type MockSession = { + _id: string; + connections: { id: string; _updatedAt: Date }[]; +}; + +describe('PresenceReaper', () => { + let reaper: PresenceReaper; + let mockSessionCollection: Omit, 'col'> & { + col: jest.Mocked>; + }; + let mockOnUpdate: jest.Mock; + + beforeEach(() => { + // 1. Mock the Collections + mockSessionCollection = { + find: jest.fn(), + col: { + bulkWrite: jest.fn().mockResolvedValue({ modifiedCount: 1 }), + }, + } as any; + + // 2. Mock the onUpdate callback + mockOnUpdate = jest.fn(); + + // 3. Instantiate Reaper + reaper = new PresenceReaper({ + usersSessions: mockSessionCollection, + onUpdate: mockOnUpdate, + staleThresholdMs: 5 * 60 * 1000, // 5 minutes + batchSize: 2, // small batch size for testing + }); + }); + + describe('processDocument (Business Logic)', () => { + it('should identify stale connections by "id" and preserve valid ones', () => { + const now = new Date(); + const staleTime = new Date(now.getTime() - 10 * 60 * 1000); // 10 mins ago + const activeTime = new Date(now.getTime() - 1 * 60 * 1000); // 1 min ago + const cutoff = new Date(now.getTime() - 5 * 60 * 1000); // 5 mins ago + + const doc: MockSession = { + _id: 'user-123', + connections: [ + { id: 'conn-stale', _updatedAt: staleTime }, // Should be removed + { id: 'conn-active', _updatedAt: activeTime }, // Should stay + ], + }; + + const changeMap = new Map(); + + // @ts-expect-error - testing private method + reaper.processDocument(doc, cutoff, changeMap); + + const result = changeMap.get('user-123'); + + // Assertions + expect(result).toBeDefined(); + expect(result?.removeIds).toContain('conn-stale'); // Found the stale ID + expect(result?.removeIds).not.toContain('conn-active'); // Ignored the active ID + expect(result?.shouldMarkOffline).toBe(false); // User still has 1 active connection + }); + + it('should mark user offline only if ALL connections are stale', () => { + const now = new Date(); + const staleTime = new Date(now.getTime() - 10000); + const cutoff = new Date(now); // Cutoff is now, so everything before is stale + + const doc: MockSession = { + _id: 'user-456', + connections: [ + { id: 'conn-1', _updatedAt: staleTime }, + { id: 'conn-2', _updatedAt: staleTime }, + ], + }; + + const changeMap = new Map(); + // @ts-expect-error - testing private method + reaper.processDocument(doc, cutoff, changeMap); + + const result = changeMap.get('user-456'); + + expect(result).toBeDefined(); + expect(result?.removeIds).toHaveLength(2); + expect(result?.shouldMarkOffline).toBe(true); // No valid connections left + }); + }); + + describe('run (Integration Flow)', () => { + it('should handle empty collections without errors', async () => { + // Mock empty cursor + const mockCursor = { + async *[Symbol.asyncIterator]() { + // No documents + }, + } as FindCursor>; + mockSessionCollection.find.mockReturnValue(mockCursor); + + // Execute Run + await reaper.run(); + + // Verify no updates were made + expect(mockOnUpdate).not.toHaveBeenCalled(); + }); + + it('should generate correct bulkWrite operations', async () => { + const now = new Date(); + const staleTime = new Date(now.getTime() - 6 * 60 * 1000); // 6 mins ago (Stale) + + // Mock Data from DB Cursor + const mockCursor = { + async *[Symbol.asyncIterator]() { + yield { + _id: 'user-789', + connections: [{ id: 'zombie-conn', _updatedAt: staleTime }], + }; + }, + } as FindCursor>; + mockSessionCollection.find.mockReturnValue(mockCursor); + + // Execute Run + await reaper.run(); + + // Verify 'users' Update (Status Offline) + expect(mockOnUpdate).toHaveBeenCalledTimes(1); + expect(mockOnUpdate).toHaveBeenCalledWith(['user-789']); + }); + }); + + describe('end-to-end Presence Reaping', () => { + it('should process multiple users and batch updates correctly', async () => { + const now = new Date(); + const staleTime = new Date(now.getTime() - 10 * 60 * 1000); // 10 mins ago + + // Mock Data from DB Cursor + const mockCursor = { + async *[Symbol.asyncIterator]() { + yield { + _id: 'user-1', + + connections: [{ id: 'conn-1', _updatedAt: staleTime }], + }; + yield { + _id: 'user-2', + connections: [{ id: 'conn-2', _updatedAt: staleTime }], + }; + yield { + _id: 'user-3', + + connections: [{ id: 'conn-3', _updatedAt: staleTime }], + }; + }, + }; + mockSessionCollection.find.mockReturnValue(mockCursor as FindCursor>); + + // Execute Run + await reaper.run(); + + // Verify 'users' Update called twice due to batch size of 2 + expect(mockOnUpdate).toHaveBeenCalledTimes(2); + expect(mockOnUpdate).toHaveBeenNthCalledWith(1, ['user-1', 'user-2']); + expect(mockOnUpdate).toHaveBeenNthCalledWith(2, ['user-3']); + }); + }); +}); diff --git a/ee/packages/presence/src/lib/PresenceReaper.ts b/ee/packages/presence/src/lib/PresenceReaper.ts new file mode 100644 index 0000000000000..40c1779f0108b --- /dev/null +++ b/ee/packages/presence/src/lib/PresenceReaper.ts @@ -0,0 +1,156 @@ +import { setInterval } from 'node:timers'; + +import type { IUserSession } from '@rocket.chat/core-typings'; +import type { IUsersSessionsModel } from '@rocket.chat/model-typings'; + +type ReaperPlan = { + userId: string; + removeIds: string[]; + shouldMarkOffline: boolean; + cutoffDate: Date; +}; + +type NonEmptyArray = [T, ...T[]]; + +const isNonEmptyArray = (arr: T[]): arr is NonEmptyArray => arr.length > 0; + +type ReaperCallback = (userIds: NonEmptyArray) => void; + +type ReaperOptions = { + usersSessions: IUsersSessionsModel; + onUpdate: ReaperCallback; + staleThresholdMs: number; + batchSize: number; +}; + +export class PresenceReaper { + private usersSessions: IUsersSessionsModel; + + private staleThresholdMs: number; + + private batchSize: number; + + private running: boolean; + + private onUpdate: ReaperCallback; + + private intervalId?: NodeJS.Timeout; + + constructor(options: ReaperOptions) { + this.usersSessions = options.usersSessions; + this.onUpdate = options.onUpdate; + this.staleThresholdMs = options.staleThresholdMs; + this.batchSize = options.batchSize; + this.running = false; + } + + public start() { + if (this.running) return; + this.running = true; + + // Run every 1 minute + this.intervalId = setInterval(() => { + this.run().catch((err) => console.error('[PresenceReaper] Error:', err)); + }, 60 * 1000); + + console.log('[PresenceReaper] Service started.'); + } + + public stop() { + if (!this.running) return; + this.running = false; + + if (this.intervalId) { + clearInterval(this.intervalId); + this.intervalId = undefined; + } + + console.log('[PresenceReaper] Service stopped.'); + } + + public async run(): Promise { + console.log('[PresenceReaper] Running presence reaper job...'); + const cutoffDate = new Date(Date.now() - this.staleThresholdMs); + + // 1. Find users with potentially stale connections + const cursor = this.usersSessions.find( + { 'connections._updatedAt': { $lte: cutoffDate } }, + { + projection: { _id: 1, connections: 1 }, + }, + ); + + const userChangeSet = new Map(); + + for await (const sessionDoc of cursor) { + this.processDocument(sessionDoc, cutoffDate, userChangeSet); + + if (userChangeSet.size >= this.batchSize) { + await this.flushBatch(userChangeSet); + userChangeSet.clear(); + } + } + + if (userChangeSet.size > 0) { + await this.flushBatch(userChangeSet); + } + console.log('[PresenceReaper] Presence reaper job completed.'); + } + + private processDocument(sessionDoc: IUserSession, cutoffDate: Date, changeMap: Map): void { + const userId = sessionDoc._id; + const allConnections = sessionDoc.connections || []; + + // Filter connections based on the cutoff + const staleConnections = allConnections.filter((c) => c._updatedAt <= cutoffDate); + const validConnections = allConnections.filter((c) => c._updatedAt > cutoffDate); + + if (staleConnections.length === 0) return; + + changeMap.set(userId, { + userId, + removeIds: staleConnections.map((c) => c.id), + cutoffDate, // Keep reference for race-condition check + shouldMarkOffline: validConnections.length === 0, + }); + } + + private async flushBatch(changeMap: Map): Promise { + const sessionOps = []; + const usersToUpdate: string[] = []; + + for (const plan of changeMap.values()) { + // 1. Prepare DB Cleanup + if (plan.removeIds.length > 0) { + sessionOps.push({ + updateOne: { + filter: { _id: plan.userId }, + update: { + $pull: { + connections: { + id: { $in: plan.removeIds }, + _updatedAt: { $lte: plan.cutoffDate }, + }, + }, + }, + }, + }); + } + + // 2. Identify potential offline users + if (plan.shouldMarkOffline) { + usersToUpdate.push(plan.userId); + } + } + + // Step A: Clean the Database + if (sessionOps.length > 0) { + await this.usersSessions.col.bulkWrite(sessionOps); + } + + // Step B: Notify Presence Service + if (isNonEmptyArray(usersToUpdate)) { + this.onUpdate(usersToUpdate); + } + } +} diff --git a/ee/packages/presence/src/lib/processConnectionStatus.ts b/ee/packages/presence/src/lib/processConnectionStatus.ts index a14aa45219e8a..61790ce332927 100644 --- a/ee/packages/presence/src/lib/processConnectionStatus.ts +++ b/ee/packages/presence/src/lib/processConnectionStatus.ts @@ -1,5 +1,4 @@ -import type { IUserSessionConnection } from '@rocket.chat/core-typings'; -import { UserStatus } from '@rocket.chat/core-typings'; +import { UserStatus, type IUserSessionConnection } from '@rocket.chat/core-typings'; /** * Defines new connection status compared to a previous connection status diff --git a/packages/core-services/src/types/IPresence.ts b/packages/core-services/src/types/IPresence.ts index 5f7c57d679955..268fed8cd1e66 100644 --- a/packages/core-services/src/types/IPresence.ts +++ b/packages/core-services/src/types/IPresence.ts @@ -13,6 +13,7 @@ export interface IPresence extends IServiceClass { session: string | undefined, nodeId: string, ): Promise<{ uid: string; session: string } | undefined>; + updateConnection(uid: string, connectionId: string): Promise<{ uid: string; connectionId: string } | undefined>; removeLostConnections(nodeID: string): Promise; setStatus(uid: string, status: UserStatus, statusText?: string): Promise; setConnectionStatus(uid: string, status: UserStatus, session: string): Promise;