Skip to content
Open
Show file tree
Hide file tree
Changes from 39 commits
Commits
Show all changes
43 commits
Select commit Hold shift + click to select a range
222943a
fix: d
cardoso Nov 18, 2025
b2818bd
fix: renew with ddp over rest
cardoso Nov 18, 2025
2c79e92
chore: improve types
cardoso Nov 18, 2025
cb85c50
refactor: take userId in renewConnection
cardoso Nov 19, 2025
818db84
fix: remove expired connections on renew
cardoso Nov 19, 2025
bcda88d
refactor: simplify logic & separate tests
cardoso Nov 21, 2025
926642b
chore: add changeset
cardoso Nov 24, 2025
d75b0cf
Merge branch 'develop' into fix/presence-dangling
cardoso Nov 27, 2025
b03787e
Merge branch 'develop' into fix/presence-dangling
cardoso Nov 28, 2025
a7f2be2
Merge branch 'develop' into fix/presence-dangling
cardoso Nov 28, 2025
6bde2cf
refactor: remove UserPresence:ping
cardoso Nov 28, 2025
d04d556
Merge branch 'develop' into fix/presence-dangling
cardoso Dec 1, 2025
d842b11
fix: meteor types
cardoso Dec 1, 2025
992a691
chore: updateConnectionStatus throttle
cardoso Dec 1, 2025
e542ec5
Merge branch 'develop' into fix/presence-dangling
cardoso Dec 1, 2025
7f7a552
Merge branch 'develop' into fix/presence-dangling
cardoso Dec 1, 2025
1e7a950
Merge branch 'develop' into fix/presence-dangling
cardoso Dec 2, 2025
62d2466
Merge branch 'develop' into fix/presence-dangling
cardoso Dec 2, 2025
e1a7a4f
Merge branch 'develop' into fix/presence-dangling
cardoso Dec 2, 2025
d56714a
Merge branch 'develop' into fix/presence-dangling
cardoso Dec 2, 2025
7d3d039
chore: normalize monolith & microservice behavior
cardoso Dec 2, 2025
2e3592d
Merge branch 'develop' into fix/presence-dangling
cardoso Dec 3, 2025
628b508
fix: clean up throttle tracking for connections
cardoso Dec 3, 2025
1688b9c
Merge branch 'develop' into fix/presence-dangling
cardoso Dec 3, 2025
049ef81
Merge branch 'develop' into fix/presence-dangling
cardoso Dec 3, 2025
bcf2d04
Merge branch 'develop' into fix/presence-dangling
cardoso Dec 3, 2025
a65f79b
fix: periodically remove stale connections
cardoso Dec 3, 2025
682230a
Merge branch 'develop' into fix/presence-dangling
cardoso Dec 4, 2025
efff1b3
Merge branch 'develop' into fix/presence-dangling
cardoso Dec 4, 2025
60f6657
Merge branch 'develop' into fix/presence-dangling
cardoso Dec 4, 2025
3f106f3
refactor: move throttling logic to presence service
cardoso Dec 4, 2025
8db5df2
refactor: separate cleaning logic into its own class
cardoso Dec 4, 2025
5112f8a
fix: only update connection if no packet was seen
cardoso Dec 4, 2025
e7dbb88
Merge branch 'develop' into fix/presence-dangling
cardoso Dec 4, 2025
0ab1249
Merge branch 'develop' into fix/presence-dangling
cardoso Dec 5, 2025
ff983de
chore: remove comments & unused properties
cardoso Dec 5, 2025
1675085
Merge branch 'develop' into fix/presence-dangling
cardoso Dec 5, 2025
96b25ce
improve: presence reaper robustness and connection updates
cardoso Dec 5, 2025
3b0c9d0
fix(ddp-streamer): only update connection on heartbeat
cardoso Dec 5, 2025
964f024
Merge branch 'develop' into fix/presence-dangling
cardoso Dec 5, 2025
2926cfe
Merge branch 'develop' into fix/presence-dangling
cardoso Dec 6, 2025
5633225
chore: remove test code
cardoso Dec 6, 2025
d3b27fb
Merge branch 'develop' into fix/presence-dangling
cardoso Dec 8, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions .changeset/spicy-nails-design.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
---
"@rocket.chat/meteor": patch
"@rocket.chat/core-services": patch
"@rocket.chat/model-typings": patch
"@rocket.chat/models": patch
"@rocket.chat/ddp-streamer": patch
"@rocket.chat/presence": patch
---

Fixes user status inaccuracy by refreshing active connections and filtering out the stale ones.
64 changes: 64 additions & 0 deletions apps/meteor/definition/externals/meteor/ddp-common.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,5 +14,69 @@ declare module 'meteor/ddp-common' {
userId?: string;
});
}

/**
* Heartbeat options
*/
type HeartbeatOptions = {
/**
* interval to send pings, in milliseconds
*/
heartbeatInterval: number;
/**
* timeout to close the connection if a reply isn't received, in milliseconds.
*/
heartbeatTimeout: number;
/**
* function to call to send a ping on the connection.
*/
sendPing: () => void;
/**
* function to call to close the connection
*/
onTimeout: () => void;
};

class Heartbeat {
heartbeatInterval: number;

heartbeatTimeout: number;

_sendPing: () => void;

_onTimeout: () => void;

_seenPacket: boolean;

_heartbeatIntervalHandle: ReturnType<typeof setTimeout> | null;

_heartbeatTimeoutHandle: ReturnType<typeof setTimeout> | null;

constructor(options: HeartbeatOptions);

stop(): void;

start(): void;

_startHeartbeatIntervalTimer(): void;

_startHeartbeatTimeoutTimer(): void;

_clearHeartbeatIntervalTimer(): void;

_clearHeartbeatTimeoutTimer(): void;

/**
* The heartbeat interval timer is fired when we should send a ping.
*/
_heartbeatIntervalFired(): void;

/**
* The heartbeat timeout timer is fired when we sent a ping, but we timed out waiting for the pong.
*/
_heartbeatTimeoutFired(): void;

messageReceived(): void;
}
}
}
9 changes: 7 additions & 2 deletions apps/meteor/definition/externals/meteor/meteor.d.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import 'meteor/meteor';
import type { ServerMethods } from '@rocket.chat/ddp-client';
import type { IStreamerConstructor, IStreamer } from 'meteor/rocketchat:streamer';
import type { DDPCommon, IStreamerConstructor, IStreamer } from 'meteor/ddp-common';

type StringifyBuffers<T extends unknown[]> = {
[P in keyof T]: T[P] extends Buffer ? string : T[P];
Expand Down Expand Up @@ -39,7 +39,12 @@ declare module 'meteor/meteor' {
isDesktop: () => boolean;
}

const server: any;
const server: {
sessions: Map<string, { userId: string; heartbeat: DDPCommon.Heartbeat }>;
publish_handlers: {
meteor_autoupdate_clientVersions(): void;
};
};

const runAsUser: <T>(userId: string, scope: () => T) => T;

Expand Down
10 changes: 10 additions & 0 deletions apps/meteor/ee/server/startup/presence.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,16 @@ Meteor.startup(() => {
return;
}

const _messageReceived = session.heartbeat.messageReceived.bind(session.heartbeat);
session.heartbeat.messageReceived = function messageReceived() {
if (this._seenPacket === false) {
void Presence.updateConnection(login.user._id, login.connection.id).catch((err) => {
console.error('Error updating connection presence on heartbeat:', err);
});
}
return _messageReceived();
};

void (async function () {
await Presence.newConnection(login.user._id, login.connection.id, nodeId);
updateConns();
Expand Down
19 changes: 19 additions & 0 deletions ee/apps/ddp-streamer/src/Client.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { EventEmitter } from 'events';
import type { IncomingMessage } from 'http';

import { Presence } from '@rocket.chat/core-services';
import type { ISocketConnection } from '@rocket.chat/core-typings';
import { v1 as uuidv1 } from 'uuid';
import type WebSocket from 'ws';
Expand Down Expand Up @@ -73,6 +74,8 @@ export class Client extends EventEmitter {

public userToken?: string;

private _seenPacket = true;

constructor(
public ws: WebSocket,
public meteorClient = false,
Expand Down Expand Up @@ -179,6 +182,18 @@ export class Client extends EventEmitter {
this.ws.close(WS_ERRORS.TIMEOUT, WS_ERRORS_MESSAGES.TIMEOUT);
};

private messageReceived = (): void => {
if (this._seenPacket || !this.userId) {
this._seenPacket = true;
return;
}

this._seenPacket = true;
void Presence.updateConnection(this.userId, this.connection.id).catch((err) => {
console.error('Error updating connection presence after heartbeat:', err);
});
};

ping(id?: string): void {
this.send(server.serialize({ [DDP_EVENTS.MSG]: DDP_EVENTS.PING, ...(id && { [DDP_EVENTS.ID]: id }) }));
}
Expand All @@ -188,6 +203,9 @@ export class Client extends EventEmitter {
}

handleIdle = (): void => {
if (this.userId) {
this._seenPacket = false;
}
this.ping();
this.timeout = setTimeout(this.closeTimeout, TIMEOUT);
};
Expand All @@ -200,6 +218,7 @@ export class Client extends EventEmitter {
handler = async (payload: WebSocket.Data, isBinary: boolean): Promise<void> => {
try {
const packet = server.parse(payload, isBinary);
this.messageReceived();
this.emit('message', packet);
if (this.wait) {
return new Promise((resolve) => this.once(DDP_EVENTS.LOGGED, () => resolve(this.process(packet.msg, packet))));
Expand Down
45 changes: 45 additions & 0 deletions ee/packages/presence/src/Presence.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import type { IUser } from '@rocket.chat/core-typings';
import { UserStatus } from '@rocket.chat/core-typings';
import { Settings, Users, UsersSessions } from '@rocket.chat/models';

import { PresenceReaper } from './lib/PresenceReaper';
import { processPresenceAndStatus } from './lib/processConnectionStatus';

const MAX_CONNECTIONS = 200;
Expand All @@ -25,9 +26,18 @@ export class Presence extends ServiceClass implements IPresence {

private peakConnections = 0;

private reaper: PresenceReaper;

constructor() {
super();

this.reaper = new PresenceReaper({
usersSessions: UsersSessions,
batchSize: 500,
staleThresholdMs: 5 * 60 * 1000, // 5 minutes
onUpdate: (userIds) => this.handleReaperUpdates(userIds),
});

this.onEvent('watch.instanceStatus', async ({ clientAction, id, diff }): Promise<void> => {
if (clientAction === 'removed') {
this.connsPerInstance.delete(id);
Expand Down Expand Up @@ -73,6 +83,7 @@ export class Presence extends ServiceClass implements IPresence {
}

async started(): Promise<void> {
this.reaper.start();
this.lostConTimeout = setTimeout(async () => {
const affectedUsers = await this.removeLostConnections();
return affectedUsers.forEach((uid) => this.updateUserPresence(uid));
Expand All @@ -89,7 +100,13 @@ export class Presence extends ServiceClass implements IPresence {
}
}

private async handleReaperUpdates(userIds: string[]): Promise<void> {
console.log(`[PresenceReaper] Updating presence for ${userIds.length} users due to stale connections.`);
await Promise.all(userIds.map((uid) => this.updateUserPresence(uid)));
}

async stopped(): Promise<void> {
this.reaper.stop();
if (!this.lostConTimeout) {
return;
}
Expand Down Expand Up @@ -137,7 +154,35 @@ export class Presence extends ServiceClass implements IPresence {
};
}

async updateConnection(uid: string, connectionId: string): Promise<{ uid: string; connectionId: string } | undefined> {
console.debug(`Updating connection for user ${uid} and connection ${connectionId}`);

const query = {
'_id': uid,
'connections.id': connectionId,
};

const update = {
$set: {
'connections.$._updatedAt': new Date(),
},
};

const result = await UsersSessions.updateOne(query, update);
if (result.modifiedCount === 0) {
return;
}

await this.updateUserPresence(uid);

return { uid, connectionId };
}

async removeConnection(uid: string | undefined, session: string | undefined): Promise<{ uid: string; session: string } | undefined> {
if (uid === 'rocketchat.internal.admin.test') {
console.log('Admin detected, skipping removal of connection for testing purposes.');
return;
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Test-specific logic in production code path.

The hardcoded check for 'rocketchat.internal.admin.test' is a test-specific workaround in the production code path. This violates separation of concerns and could cause confusion or unintended behavior if this user ID appears in production.

Consider one of these alternatives:

  • Use dependency injection to provide a test-specific implementation of the removal logic
  • Configure excluded user IDs via environment variables or configuration
  • Mock the removeConnection method entirely in tests rather than relying on special-case logic in the implementation
 async removeConnection(uid: string | undefined, session: string | undefined): Promise<{ uid: string; session: string } | undefined> {
-		if (uid === 'rocketchat.internal.admin.test') {
-			console.log('Admin detected, skipping removal of connection for testing purposes.');
-			return;
-		}
 		if (!uid || !session) {
 			return;
 		}
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
if (uid === 'rocketchat.internal.admin.test') {
console.log('Admin detected, skipping removal of connection for testing purposes.');
return;
}
async removeConnection(uid: string | undefined, session: string | undefined): Promise<{ uid: string; session: string } | undefined> {
if (!uid || !session) {
return;
}
🤖 Prompt for AI Agents
In ee/packages/presence/src/Presence.ts around lines 182 to 185, remove the
hardcoded test-specific check for 'rocketchat.internal.admin.test' and replace
it with a testable, configurable approach: accept an injected predicate or
configuration (e.g., an excludedUserIds set or a shouldSkipRemoval(userId)
function) on the Presence instance, defaulting to none in production; update
tests to inject a predicate or mock the removeConnection method instead of
relying on the special-case; ensure the new configuration is optional and
documented so production behavior remains unchanged when not provided.

if (!uid || !session) {
return;
}
Expand Down
Loading
Loading