Skip to content

Commit aac4542

Browse files
committed
refactor: move throttling logic to presence service
1 parent 60f6657 commit aac4542

File tree

14 files changed

+530
-433
lines changed

14 files changed

+530
-433
lines changed

apps/meteor/ee/server/startup/presence.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ Meteor.startup(() => {
2929
await Presence.removeLostConnections(nodeId);
3030
});
3131

32-
Accounts.onLogin((login: { connection: { id: string }; user: { _id: string } }): void => {
32+
Accounts.onLogin((login: any): void => {
3333
if (!login.connection.id) {
3434
return;
3535
}
@@ -42,7 +42,7 @@ Meteor.startup(() => {
4242

4343
const _messageReceived = session.heartbeat.messageReceived.bind(session.heartbeat);
4444
session.heartbeat.messageReceived = function messageReceived() {
45-
void Presence.setConnectionStatus(login.user._id, login.connection.id);
45+
void Presence.updateConnection(login.user._id, login.connection.id);
4646
return _messageReceived();
4747
};
4848

ee/apps/ddp-streamer/src/DDPStreamer.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -221,7 +221,7 @@ export class DDPStreamer extends ServiceClass {
221221
if (!userId) {
222222
return;
223223
}
224-
void Presence.setConnectionStatus(userId, connection.id);
224+
void Presence.updateConnection(userId, connection.id);
225225
});
226226
}
227227

ee/apps/ddp-streamer/src/configureServer.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -121,14 +121,14 @@ server.methods({
121121
if (!userId) {
122122
return;
123123
}
124-
return Presence.setConnectionStatus(userId, session, UserStatus.ONLINE);
124+
return Presence.setConnectionStatus(userId, UserStatus.ONLINE, session);
125125
},
126126
'UserPresence:away'() {
127127
const { userId, session } = this;
128128
if (!userId) {
129129
return;
130130
}
131-
return Presence.setConnectionStatus(userId, session, UserStatus.AWAY);
131+
return Presence.setConnectionStatus(userId, UserStatus.AWAY, session);
132132
},
133133
'setUserStatus'(status, statusText) {
134134
const { userId } = this;

ee/packages/presence/src/Presence.ts

Lines changed: 114 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,12 @@
11
import type { IPresence, IBrokerNode } from '@rocket.chat/core-services';
22
import { License, ServiceClass } from '@rocket.chat/core-services';
3-
import type { IUser } from '@rocket.chat/core-typings';
3+
import type { IUser, IUserSession } from '@rocket.chat/core-typings';
44
import { UserStatus } from '@rocket.chat/core-typings';
55
import { Settings, Users, UsersSessions } from '@rocket.chat/models';
6+
import type { AnyBulkWriteOperation } from 'mongodb';
67

7-
import { processPresenceAndStatus } from './processPresenceAndStatus';
8-
9-
const MAX_CONNECTIONS = 200;
10-
const CONNECTION_STATUS_UPDATE_INTERVAL = 60000;
11-
const lastConnectionStatusUpdate = new Map<string, number>();
12-
13-
const shouldUpdateConnectionStatus = (connectionId: string): boolean => {
14-
const now = Date.now();
15-
const last = lastConnectionStatusUpdate.get(connectionId) ?? 0;
16-
if (now - last < CONNECTION_STATUS_UPDATE_INTERVAL) {
17-
return false;
18-
}
19-
lastConnectionStatusUpdate.set(connectionId, now);
20-
return true;
21-
};
8+
import { UPDATE_INTERVAL, STALE_THRESHOLD, MAX_CONNECTIONS } from './lib/constants';
9+
import { processPresenceAndStatus } from './lib/processConnectionStatus';
2210

2311
export class Presence extends ServiceClass implements IPresence {
2412
protected name = 'presence';
@@ -39,6 +27,21 @@ export class Presence extends ServiceClass implements IPresence {
3927

4028
private peakConnections = 0;
4129

30+
private lastUpdate = new Map<string, number>();
31+
32+
shouldUpdateConnectionStatus(session: string): boolean {
33+
const lastUpdated = this.lastUpdate.get(session);
34+
if (!lastUpdated) {
35+
this.lastUpdate.set(session, Date.now());
36+
return true;
37+
}
38+
if (Date.now() - lastUpdated > UPDATE_INTERVAL) {
39+
this.lastUpdate.set(session, Date.now());
40+
return true;
41+
}
42+
return false;
43+
}
44+
4245
constructor() {
4346
super();
4447

@@ -95,7 +98,7 @@ export class Presence extends ServiceClass implements IPresence {
9598
this.staleConInterval = setInterval(async () => {
9699
const affectedUsers = await this.removeStaleConnections();
97100
return affectedUsers.forEach((uid) => this.updateUserPresence(uid));
98-
}, 10000);
101+
}, UPDATE_INTERVAL);
99102

100103
try {
101104
await Settings.updateValueById('Presence_broadcast_disabled', false);
@@ -109,12 +112,8 @@ export class Presence extends ServiceClass implements IPresence {
109112
}
110113

111114
async stopped(): Promise<void> {
112-
if (this.lostConTimeout) {
113-
clearTimeout(this.lostConTimeout);
114-
}
115-
if (this.staleConInterval) {
116-
clearInterval(this.staleConInterval);
117-
}
115+
clearTimeout(this.lostConTimeout);
116+
clearInterval(this.staleConInterval);
118117
}
119118

120119
async toggleBroadcast(enabled: boolean): Promise<void> {
@@ -158,49 +157,109 @@ export class Presence extends ServiceClass implements IPresence {
158157
};
159158
}
160159

161-
async removeConnection(uid: string | undefined, session: string | undefined): Promise<{ uid: string; session: string } | undefined> {
162-
if (!uid || !session) {
160+
async updateConnection(uid: string, session: string): Promise<{ uid: string; session: string } | undefined> {
161+
if (!this.shouldUpdateConnectionStatus(session)) {
162+
return;
163+
}
164+
console.debug(`Updating connection for user ${uid} and session ${session}`);
165+
const result = await UsersSessions.updateConnectionById(uid, session);
166+
if (result.modifiedCount === 0) {
163167
return;
164168
}
165-
166-
lastConnectionStatusUpdate.delete(session);
167-
168-
await UsersSessions.removeConnectionByConnectionId(session);
169169

170170
await this.updateUserPresence(uid);
171171

172-
return {
173-
uid,
174-
session,
175-
};
172+
173+
174+
return { uid, session };
176175
}
177176

178-
async removeStaleConnections(): Promise<string[]> {
179-
const cutoff = new Date(Date.now() - CONNECTION_STATUS_UPDATE_INTERVAL);
180-
const users = UsersSessions.find({ 'connections._updatedAt': { $lt: cutoff } });
181-
const affectedUsers = new Set<string>();
182-
for await (const userSession of users) {
183-
const staleConnectionIds = userSession.connections.filter((conn) => conn._updatedAt < cutoff).map((conn) => conn.id);
184-
if (staleConnectionIds.length === 0) {
185-
continue;
186-
}
187-
const result = await UsersSessions.updateOne(
188-
{ _id: userSession._id },
189-
{
190-
$pull: {
191-
connections: { id: { $in: staleConnectionIds } },
177+
/**
178+
* Runs the cleanup job to remove stale connections and sync user status.
179+
*/
180+
async removeStaleConnections() {
181+
console.debug('[Cleanup] Starting stale connections cleanup job.');
182+
const cutoffDate = new Date(Date.now() - STALE_THRESHOLD);
183+
184+
// STEP 1: Find users who have AT LEAST one stale connection
185+
// We project the whole connections array because we need to inspect it in memory
186+
const cursor = UsersSessions.find({ 'connections._updatedAt': { $lte: cutoffDate } }, { projection: { _id: 1, connections: 1 } });
187+
188+
const bulkSessionOps: AnyBulkWriteOperation<IUserSession>[] = [];
189+
const bulkUserOps: AnyBulkWriteOperation<IUser>[] = [];
190+
const processedUserIds = [];
191+
192+
// STEP 2: Iterate and Calculate
193+
for await (const sessionDoc of cursor) {
194+
const userId = sessionDoc._id;
195+
const allConnections = sessionDoc.connections || [];
196+
197+
// Separate valid vs stale based on the cutoff
198+
const staleConnections = allConnections.filter((c) => c._updatedAt <= cutoffDate);
199+
const validConnections = allConnections.filter((c) => c._updatedAt > cutoffDate);
200+
201+
if (staleConnections.length === 0) continue; // Should not happen due to query, but safe to check
202+
203+
// Collect the IDs of the connections we want to remove
204+
const staleConnectionIds = staleConnections.map((c) => c.id);
205+
206+
// OPERATION A: Remove specific connections from usersSessions
207+
// We use the unique IDs to be surgically precise
208+
bulkSessionOps.push({
209+
updateOne: {
210+
filter: { _id: userId },
211+
update: {
212+
$pull: {
213+
connections: { id: { $in: staleConnectionIds } },
214+
},
192215
},
193216
},
194-
);
195-
if (result.modifiedCount > 0) {
196-
for (const id of staleConnectionIds) {
197-
lastConnectionStatusUpdate.delete(id);
198-
}
199-
affectedUsers.add(userSession._id);
217+
});
218+
219+
// OPERATION B: Update user status if they will have NO connections left
220+
if (validConnections.length === 0) {
221+
bulkUserOps.push({
222+
updateOne: {
223+
filter: { _id: userId },
224+
update: { $set: { status: UserStatus.OFFLINE } },
225+
},
226+
});
227+
processedUserIds.push(userId);
200228
}
201229
}
202230

203-
return Array.from(affectedUsers);
231+
// STEP 3: Execute the operations
232+
if (bulkSessionOps.length > 0) {
233+
await UsersSessions.col.bulkWrite(bulkSessionOps);
234+
console.log(`[Cleanup] Removed stale connections for ${bulkSessionOps.length} users.`);
235+
}
236+
237+
if (bulkUserOps.length > 0) {
238+
await Users.col.bulkWrite(bulkUserOps);
239+
console.log(`[Cleanup] Marked ${bulkUserOps.length} users as OFFLINE.`);
240+
}
241+
242+
console.debug(`[Cleanup] Finished stale connections cleanup job.`);
243+
244+
return processedUserIds;
245+
}
246+
247+
async removeConnection(uid: string | undefined, session: string | undefined): Promise<{ uid: string; session: string } | undefined> {
248+
if (uid === 'rocketchat.internal.admin.test') {
249+
console.log('Admin detected, skipping removal of connection for testing purposes.');
250+
return;
251+
}
252+
if (!uid || !session) {
253+
return;
254+
}
255+
await UsersSessions.removeConnectionByConnectionId(session);
256+
257+
await this.updateUserPresence(uid);
258+
259+
return {
260+
uid,
261+
session,
262+
};
204263
}
205264

206265
async removeLostConnections(nodeID?: string): Promise<string[]> {
@@ -255,10 +314,7 @@ export class Presence extends ServiceClass implements IPresence {
255314
return !!result.modifiedCount;
256315
}
257316

258-
async setConnectionStatus(uid: string, session: string, status?: UserStatus): Promise<boolean> {
259-
if (!status && !shouldUpdateConnectionStatus(session)) {
260-
return false;
261-
}
317+
async setConnectionStatus(uid: string, status: UserStatus, session: string): Promise<boolean> {
262318
const result = await UsersSessions.updateConnectionStatusById(uid, session, status);
263319

264320
await this.updateUserPresence(uid);
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
/**
2+
* Maximum number of connections allowed.
3+
*/
4+
export const MAX_CONNECTIONS = 200;
5+
6+
/**
7+
* Interval to update connection status in milliseconds.
8+
*/
9+
export const UPDATE_INTERVAL = 60_000;
10+
11+
/**
12+
* Threshold to consider a connection as stale in milliseconds.
13+
*/
14+
export const STALE_THRESHOLD = 300_000;

ee/packages/presence/src/lib/processConnectionStatus.ts

Lines changed: 43 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
1-
import { UserStatus } from '@rocket.chat/core-typings';
1+
import { UserStatus, type IUserSessionConnection } from '@rocket.chat/core-typings';
2+
3+
import { STALE_THRESHOLD } from './constants';
24

35
/**
46
* Defines new connection status compared to a previous connection status
@@ -12,3 +14,43 @@ export const processConnectionStatus = (current: UserStatus, status: UserStatus)
1214
}
1315
return current;
1416
};
17+
18+
/**
19+
* Defines user's status based on presence and connection status
20+
*/
21+
export const processStatus = (statusConnection: UserStatus, statusDefault: UserStatus): UserStatus => {
22+
if (statusConnection === UserStatus.OFFLINE) {
23+
return statusConnection;
24+
}
25+
26+
if (statusDefault === UserStatus.ONLINE) {
27+
return statusConnection;
28+
}
29+
30+
return statusDefault;
31+
};
32+
33+
const isFresh = (updatedAt: Date): boolean => {
34+
return Date.now() - updatedAt.getTime() <= STALE_THRESHOLD;
35+
}
36+
37+
/**
38+
* Defines user's status and connection status based on user's connections and default status
39+
*/
40+
export const processPresenceAndStatus = (
41+
userSessions: IUserSessionConnection[] = [],
42+
statusDefault = UserStatus.ONLINE,
43+
): { status: UserStatus; statusConnection: UserStatus } => {
44+
const statusConnection = userSessions
45+
.filter((c) => isFresh(c._updatedAt))
46+
.map((s) => s.status)
47+
.reduce(processConnectionStatus, UserStatus.OFFLINE);
48+
49+
const status = processStatus(statusConnection, statusDefault);
50+
51+
return {
52+
status,
53+
statusConnection,
54+
};
55+
};
56+

ee/packages/presence/src/lib/processStatus.ts

Lines changed: 0 additions & 16 deletions
This file was deleted.

ee/packages/presence/src/processPresenceAndStatus.ts

Lines changed: 0 additions & 31 deletions
This file was deleted.

0 commit comments

Comments
 (0)