Skip to content

Commit b01f4d4

Browse files
committed
Fix a bug where tokens aren't replenished if there is no activity at all
1 parent dddf2cc commit b01f4d4

File tree

2 files changed

+67
-24
lines changed

2 files changed

+67
-24
lines changed

spec/aimd-bucket.spec.ts

Lines changed: 27 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -513,45 +513,48 @@ describe("AIMDBucket", () => {
513513
bucket = new AIMDBucket({ initialRate: 10 });
514514
});
515515

516-
it("should process pending requests when tokens are completed with sufficient time gaps", async () => {
517-
// This test verifies the fix for the lockup bug where pending requests get stuck
516+
it("should process pending requests when no events trigger refill (lockup bug fix)", async () => {
518517
bucket = new AIMDBucket({
519-
initialRate: 4, // Start with low rate like user's case
520-
tokenReturnTimeoutMs: 10000, // Long timeout so tokens don't auto-expire
518+
initialRate: 3, // Low rate
519+
tokenReturnTimeoutMs: 30000, // Long timeout so no auto-timeouts interfere
521520
});
522521

523-
// Acquire tokens that will be resolved immediately (up to initial capacity)
524-
const immediateTokens = await Promise.all([bucket.acquire(), bucket.acquire(), bucket.acquire(), bucket.acquire()]);
525-
expect(immediateTokens).toHaveLength(4);
522+
// Get initial 3 tokens immediately (up to initial capacity)
523+
const token1 = await bucket.acquire();
524+
const token2 = await bucket.acquire();
525+
const token3 = await bucket.acquire();
526526

527-
// These should become pending since we've exhausted the bucket
528-
const pendingTokenPromises = [bucket.acquire(), bucket.acquire(), bucket.acquire(), bucket.acquire(), bucket.acquire()];
527+
// Advance time to get 1 more token through refill
528+
await vi.advanceTimersByTimeAsync(334); // ~1/3 second = 1 token at rate 3
529+
const token4 = await bucket.acquire();
529530

530-
// Advance time slightly to see initial state
531-
await vi.advanceTimersByTimeAsync(100);
532531
let stats = bucket.getStatistics();
533532
expect(stats.tokensIssued).toBe(4);
534-
expect(stats.pendingCount).toBe(5);
533+
expect(stats.currentRate).toBe(3);
535534

536-
// Complete the immediate tokens successfully to trigger rate increase
537-
// Add some time between completions to allow bucket to refill
538-
for (let i = 0; i < immediateTokens.length; i++) {
539-
immediateTokens[i].success();
540-
await vi.advanceTimersByTimeAsync(500); // Allow some refill time
541-
}
535+
token1.success();
536+
token2.success();
537+
token3.success();
538+
token4.success();
542539

543-
// At this point, some pending requests should have been processed during token completions
540+
const pendingPromises = [bucket.acquire(), bucket.acquire(), bucket.acquire(), bucket.acquire(), bucket.acquire()];
541+
542+
await vi.advanceTimersByTimeAsync(50);
544543
stats = bucket.getStatistics();
544+
expect(stats.currentRate).toBe(3);
545+
expect(stats.tokensIssued).toBe(4);
545546
expect(stats.successCount).toBe(4);
547+
expect(stats.pendingCount).toBe(5);
546548

547-
// All pending requests should have been processed due to the fix
549+
// Now advance time significantly, but make NO new acquire() calls and don't complete any more tokens. This a lockup scenario. Time passes, bucket should refill, but pending requests never get processed because no events trigger _refill()
550+
await vi.advanceTimersByTimeAsync(5000); // 5 seconds = 15 tokens should be available
551+
552+
stats = bucket.getStatistics();
548553
expect(stats.pendingCount).toBe(0);
549-
expect(stats.tokensIssued).toBe(9); // 4 immediate + 5 pending
554+
expect(stats.tokensIssued).toBe(9);
550555

551-
// Verify that all pending promises were resolved
552-
const resolvedTokens = await Promise.all(pendingTokenPromises);
556+
const resolvedTokens = await Promise.all(pendingPromises);
553557
expect(resolvedTokens).toHaveLength(5);
554-
resolvedTokens.forEach((token) => expect(token).toBeInstanceOf(AIMDBucketToken));
555558
});
556559

557560
it("should process pending requests when tokens timeout automatically", async () => {

src/index.ts

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,7 @@ export class AIMDBucket {
153153
private tokensIssued = 0; // Only track total issued for reporting
154154
private pending: { resolve: (token: AIMDBucketToken) => void; reject: (error: Error) => void; timestamp: number }[] = [];
155155
private isShutdown = false;
156+
private pendingTimer?: NodeJS.Timeout;
156157

157158
private config: Required<AIMDBucketConfig>;
158159

@@ -215,6 +216,9 @@ export class AIMDBucket {
215216
},
216217
timestamp: Date.now(),
217218
});
219+
220+
// Set a one-shot timer to process pending requests if no events occur
221+
this._schedulePendingCheck();
218222
});
219223
}
220224
}
@@ -260,6 +264,12 @@ export class AIMDBucket {
260264
async shutdown(): Promise<void> {
261265
this.isShutdown = true;
262266

267+
// Clear pending timer
268+
if (this.pendingTimer) {
269+
clearTimeout(this.pendingTimer);
270+
this.pendingTimer = undefined;
271+
}
272+
263273
// Reject all pending acquisitions
264274
const pendingRequests = [...this.pending];
265275
this.pending = [];
@@ -318,6 +328,36 @@ export class AIMDBucket {
318328
this.tokensIssued++;
319329
request.resolve(new AIMDBucketToken(this, this.config.tokenReturnTimeoutMs));
320330
}
331+
332+
// Clear pending timer if no more pending requests
333+
if (this.pending.length === 0 && this.pendingTimer) {
334+
clearTimeout(this.pendingTimer);
335+
this.pendingTimer = undefined;
336+
}
337+
}
338+
339+
/**
340+
* Schedule a one-shot timer to process pending requests if no events occur
341+
*/
342+
private _schedulePendingCheck(): void {
343+
// Don't schedule if already scheduled or shutting down
344+
if (this.pendingTimer || this.isShutdown) {
345+
return;
346+
}
347+
348+
// Calculate next expected refill time based on current rate
349+
// At minimum, check every 100ms, but ideally check when next token should be available
350+
const nextTokenTime = Math.max(100, 1000 / this.rate);
351+
352+
this.pendingTimer = setTimeout(() => {
353+
this.pendingTimer = undefined;
354+
this._refill();
355+
356+
// If there are still pending requests after refill, schedule another check
357+
if (this.pending.length > 0 && !this.isShutdown) {
358+
this._schedulePendingCheck();
359+
}
360+
}, nextTokenTime).unref(); // Use unref() so this doesn't keep the process alive
321361
}
322362

323363
private _adjustRate(): void {

0 commit comments

Comments
 (0)