-
Notifications
You must be signed in to change notification settings - Fork 12.6k
fix(presence): update connections on heartbeat and remove them when stale #37551
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: develop
Are you sure you want to change the base?
Changes from 31 commits
222943a
b2818bd
2c79e92
cb85c50
818db84
bcda88d
926642b
d75b0cf
b03787e
a7f2be2
6bde2cf
d04d556
d842b11
992a691
e542ec5
7f7a552
1e7a950
62d2466
e1a7a4f
d56714a
7d3d039
2e3592d
628b508
1688b9c
049ef81
bcf2d04
a65f79b
682230a
efff1b3
60f6657
3f106f3
8db5df2
5112f8a
e7dbb88
0ab1249
ff983de
1675085
96b25ce
3b0c9d0
964f024
2926cfe
5633225
d3b27fb
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,10 @@ | ||
| --- | ||
| "@rocket.chat/meteor": patch | ||
| "@rocket.chat/core-services": patch | ||
| "@rocket.chat/model-typings": patch | ||
| "@rocket.chat/models": patch | ||
| "@rocket.chat/ddp-streamer": patch | ||
| "@rocket.chat/presence": patch | ||
| --- | ||
|
|
||
| Fixes user status inaccuracy by refreshing active connections and filtering out the stale ones. |
| Original file line number | Diff line number | Diff line change | ||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| @@ -1,13 +1,13 @@ | ||||||||||||||||||
| import type { IPresence, IBrokerNode } from '@rocket.chat/core-services'; | ||||||||||||||||||
| import { License, ServiceClass } from '@rocket.chat/core-services'; | ||||||||||||||||||
| import type { IUser } from '@rocket.chat/core-typings'; | ||||||||||||||||||
| import type { IUser, IUserSession } from '@rocket.chat/core-typings'; | ||||||||||||||||||
| import { UserStatus } from '@rocket.chat/core-typings'; | ||||||||||||||||||
| import { Settings, Users, UsersSessions } from '@rocket.chat/models'; | ||||||||||||||||||
| import type { AnyBulkWriteOperation } from 'mongodb'; | ||||||||||||||||||
|
|
||||||||||||||||||
| import { UPDATE_INTERVAL, STALE_THRESHOLD, MAX_CONNECTIONS } from './lib/constants'; | ||||||||||||||||||
| import { processPresenceAndStatus } from './lib/processConnectionStatus'; | ||||||||||||||||||
|
|
||||||||||||||||||
| const MAX_CONNECTIONS = 200; | ||||||||||||||||||
|
|
||||||||||||||||||
| export class Presence extends ServiceClass implements IPresence { | ||||||||||||||||||
| protected name = 'presence'; | ||||||||||||||||||
|
|
||||||||||||||||||
|
|
@@ -21,10 +21,27 @@ export class Presence extends ServiceClass implements IPresence { | |||||||||||||||||
|
|
||||||||||||||||||
| private lostConTimeout?: NodeJS.Timeout; | ||||||||||||||||||
|
|
||||||||||||||||||
| private staleConInterval?: NodeJS.Timeout; | ||||||||||||||||||
|
|
||||||||||||||||||
| private connsPerInstance = new Map<string, number>(); | ||||||||||||||||||
|
|
||||||||||||||||||
| private peakConnections = 0; | ||||||||||||||||||
|
|
||||||||||||||||||
| private lastUpdate = new Map<string, number>(); | ||||||||||||||||||
|
|
||||||||||||||||||
| shouldUpdateConnectionStatus(session: string): boolean { | ||||||||||||||||||
| const lastUpdated = this.lastUpdate.get(session); | ||||||||||||||||||
| if (!lastUpdated) { | ||||||||||||||||||
| this.lastUpdate.set(session, Date.now()); | ||||||||||||||||||
| return true; | ||||||||||||||||||
| } | ||||||||||||||||||
| if (Date.now() - lastUpdated > UPDATE_INTERVAL) { | ||||||||||||||||||
| this.lastUpdate.set(session, Date.now()); | ||||||||||||||||||
| return true; | ||||||||||||||||||
| } | ||||||||||||||||||
| return false; | ||||||||||||||||||
| } | ||||||||||||||||||
|
|
||||||||||||||||||
| constructor() { | ||||||||||||||||||
| super(); | ||||||||||||||||||
|
|
||||||||||||||||||
|
|
@@ -78,6 +95,11 @@ export class Presence extends ServiceClass implements IPresence { | |||||||||||||||||
| return affectedUsers.forEach((uid) => this.updateUserPresence(uid)); | ||||||||||||||||||
| }, 10000); | ||||||||||||||||||
|
|
||||||||||||||||||
| this.staleConInterval = setInterval(async () => { | ||||||||||||||||||
| const affectedUsers = await this.removeStaleConnections(); | ||||||||||||||||||
| return affectedUsers.forEach((uid) => this.updateUserPresence(uid)); | ||||||||||||||||||
| }, UPDATE_INTERVAL); | ||||||||||||||||||
|
|
||||||||||||||||||
| try { | ||||||||||||||||||
| await Settings.updateValueById('Presence_broadcast_disabled', false); | ||||||||||||||||||
|
|
||||||||||||||||||
|
|
@@ -90,10 +112,8 @@ export class Presence extends ServiceClass implements IPresence { | |||||||||||||||||
| } | ||||||||||||||||||
|
|
||||||||||||||||||
| async stopped(): Promise<void> { | ||||||||||||||||||
| if (!this.lostConTimeout) { | ||||||||||||||||||
| return; | ||||||||||||||||||
| } | ||||||||||||||||||
| clearTimeout(this.lostConTimeout); | ||||||||||||||||||
| clearInterval(this.staleConInterval); | ||||||||||||||||||
| } | ||||||||||||||||||
|
|
||||||||||||||||||
| async toggleBroadcast(enabled: boolean): Promise<void> { | ||||||||||||||||||
|
|
@@ -137,7 +157,96 @@ export class Presence extends ServiceClass implements IPresence { | |||||||||||||||||
| }; | ||||||||||||||||||
| } | ||||||||||||||||||
|
|
||||||||||||||||||
| async updateConnection(uid: string, session: string): Promise<{ uid: string; session: string } | undefined> { | ||||||||||||||||||
| if (!this.shouldUpdateConnectionStatus(session)) { | ||||||||||||||||||
| return; | ||||||||||||||||||
| } | ||||||||||||||||||
| console.debug(`Updating connection for user ${uid} and session ${session}`); | ||||||||||||||||||
| const result = await UsersSessions.updateConnectionById(uid, session); | ||||||||||||||||||
| if (result.modifiedCount === 0) { | ||||||||||||||||||
| return; | ||||||||||||||||||
| } | ||||||||||||||||||
|
|
||||||||||||||||||
| await this.updateUserPresence(uid); | ||||||||||||||||||
|
|
||||||||||||||||||
| return { uid, session }; | ||||||||||||||||||
| } | ||||||||||||||||||
|
|
||||||||||||||||||
| /** | ||||||||||||||||||
| * Runs the cleanup job to remove stale connections and sync user status. | ||||||||||||||||||
| */ | ||||||||||||||||||
| async removeStaleConnections() { | ||||||||||||||||||
| console.debug('[Cleanup] Starting stale connections cleanup job.'); | ||||||||||||||||||
| const cutoffDate = new Date(Date.now() - STALE_THRESHOLD); | ||||||||||||||||||
|
|
||||||||||||||||||
| // STEP 1: Find users who have AT LEAST one stale connection | ||||||||||||||||||
| // We project the whole connections array because we need to inspect it in memory | ||||||||||||||||||
| const cursor = UsersSessions.find({ 'connections._updatedAt': { $lte: cutoffDate } }, { projection: { _id: 1, connections: 1 } }); | ||||||||||||||||||
|
|
||||||||||||||||||
| const bulkSessionOps: AnyBulkWriteOperation<IUserSession>[] = []; | ||||||||||||||||||
| const bulkUserOps: AnyBulkWriteOperation<IUser>[] = []; | ||||||||||||||||||
| const processedUserIds = []; | ||||||||||||||||||
|
|
||||||||||||||||||
| // STEP 2: Iterate and Calculate | ||||||||||||||||||
| for await (const sessionDoc of cursor) { | ||||||||||||||||||
| const userId = sessionDoc._id; | ||||||||||||||||||
| const allConnections = sessionDoc.connections || []; | ||||||||||||||||||
|
|
||||||||||||||||||
| // Separate valid vs stale based on the cutoff | ||||||||||||||||||
| const staleConnections = allConnections.filter((c) => c._updatedAt <= cutoffDate); | ||||||||||||||||||
| const validConnections = allConnections.filter((c) => c._updatedAt > cutoffDate); | ||||||||||||||||||
|
|
||||||||||||||||||
| if (staleConnections.length === 0) continue; // Should not happen due to query, but safe to check | ||||||||||||||||||
|
|
||||||||||||||||||
| // Collect the IDs of the connections we want to remove | ||||||||||||||||||
| const staleConnectionIds = staleConnections.map((c) => c.id); | ||||||||||||||||||
|
|
||||||||||||||||||
| // OPERATION A: Remove specific connections from usersSessions | ||||||||||||||||||
| // We use the unique IDs to be surgically precise | ||||||||||||||||||
| bulkSessionOps.push({ | ||||||||||||||||||
| updateOne: { | ||||||||||||||||||
| filter: { _id: userId }, | ||||||||||||||||||
| update: { | ||||||||||||||||||
| $pull: { | ||||||||||||||||||
| connections: { id: { $in: staleConnectionIds } }, | ||||||||||||||||||
| }, | ||||||||||||||||||
| }, | ||||||||||||||||||
| }, | ||||||||||||||||||
| }); | ||||||||||||||||||
|
|
||||||||||||||||||
| // OPERATION B: Update user status if they will have NO connections left | ||||||||||||||||||
| if (validConnections.length === 0) { | ||||||||||||||||||
| bulkUserOps.push({ | ||||||||||||||||||
| updateOne: { | ||||||||||||||||||
| filter: { _id: userId }, | ||||||||||||||||||
| update: { $set: { status: UserStatus.OFFLINE } }, | ||||||||||||||||||
| }, | ||||||||||||||||||
| }); | ||||||||||||||||||
| processedUserIds.push(userId); | ||||||||||||||||||
| } | ||||||||||||||||||
| } | ||||||||||||||||||
|
|
||||||||||||||||||
| // STEP 3: Execute the operations | ||||||||||||||||||
| if (bulkSessionOps.length > 0) { | ||||||||||||||||||
| await UsersSessions.col.bulkWrite(bulkSessionOps); | ||||||||||||||||||
| console.log(`[Cleanup] Removed stale connections for ${bulkSessionOps.length} users.`); | ||||||||||||||||||
| } | ||||||||||||||||||
|
|
||||||||||||||||||
| if (bulkUserOps.length > 0) { | ||||||||||||||||||
| await Users.col.bulkWrite(bulkUserOps); | ||||||||||||||||||
| console.log(`[Cleanup] Marked ${bulkUserOps.length} users as OFFLINE.`); | ||||||||||||||||||
| } | ||||||||||||||||||
|
|
||||||||||||||||||
| console.debug(`[Cleanup] Finished stale connections cleanup job.`); | ||||||||||||||||||
|
|
||||||||||||||||||
| return processedUserIds; | ||||||||||||||||||
| } | ||||||||||||||||||
cardoso marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||||||||||||||||||
|
|
||||||||||||||||||
| async removeConnection(uid: string | undefined, session: string | undefined): Promise<{ uid: string; session: string } | undefined> { | ||||||||||||||||||
| if (uid === 'rocketchat.internal.admin.test') { | ||||||||||||||||||
| console.log('Admin detected, skipping removal of connection for testing purposes.'); | ||||||||||||||||||
| return; | ||||||||||||||||||
| } | ||||||||||||||||||
|
||||||||||||||||||
| if (uid === 'rocketchat.internal.admin.test') { | |
| console.log('Admin detected, skipping removal of connection for testing purposes.'); | |
| return; | |
| } | |
| async removeConnection(uid: string | undefined, session: string | undefined): Promise<{ uid: string; session: string } | undefined> { | |
| if (!uid || !session) { | |
| return; | |
| } |
🤖 Prompt for AI Agents
In ee/packages/presence/src/Presence.ts around lines 182 to 185, remove the
hardcoded test-specific check for 'rocketchat.internal.admin.test' and replace
it with a testable, configurable approach: accept an injected predicate or
configuration (e.g., an excludedUserIds set or a shouldSkipRemoval(userId)
function) on the Presence instance, defaulting to none in production; update
tests to inject a predicate or mock the removeConnection method instead of
relying on the special-case; ensure the new configuration is optional and
documented so production behavior remains unchanged when not provided.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,14 @@ | ||
| /** | ||
| * Maximum number of connections allowed. | ||
| */ | ||
| export const MAX_CONNECTIONS = 200; | ||
|
|
||
| /** | ||
| * Interval to update connection status in milliseconds. | ||
| */ | ||
| export const UPDATE_INTERVAL = 60_000; | ||
|
|
||
| /** | ||
| * Threshold to consider a connection as stale in milliseconds. | ||
| */ | ||
| export const STALE_THRESHOLD = 300_000; |
Uh oh!
There was an error while loading. Please reload this page.