Skip to content

Commit dddf2cc

Browse files
committed
Fix bug where returning tokens didn't always dispatch new ones
1 parent f4ada15 commit dddf2cc

File tree

2 files changed

+218
-6
lines changed

2 files changed

+218
-6
lines changed

spec/aimd-bucket.spec.ts

Lines changed: 212 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -513,6 +513,47 @@ describe("AIMDBucket", () => {
513513
bucket = new AIMDBucket({ initialRate: 10 });
514514
});
515515

516+
it("should process pending requests when tokens are completed with sufficient time gaps", async () => {
517+
// This test verifies the fix for the lockup bug where pending requests get stuck
518+
bucket = new AIMDBucket({
519+
initialRate: 4, // Start with low rate like user's case
520+
tokenReturnTimeoutMs: 10000, // Long timeout so tokens don't auto-expire
521+
});
522+
523+
// Acquire tokens that will be resolved immediately (up to initial capacity)
524+
const immediateTokens = await Promise.all([bucket.acquire(), bucket.acquire(), bucket.acquire(), bucket.acquire()]);
525+
expect(immediateTokens).toHaveLength(4);
526+
527+
// These should become pending since we've exhausted the bucket
528+
const pendingTokenPromises = [bucket.acquire(), bucket.acquire(), bucket.acquire(), bucket.acquire(), bucket.acquire()];
529+
530+
// Advance time slightly to see initial state
531+
await vi.advanceTimersByTimeAsync(100);
532+
let stats = bucket.getStatistics();
533+
expect(stats.tokensIssued).toBe(4);
534+
expect(stats.pendingCount).toBe(5);
535+
536+
// Complete the immediate tokens successfully to trigger rate increase
537+
// Add some time between completions to allow bucket to refill
538+
for (let i = 0; i < immediateTokens.length; i++) {
539+
immediateTokens[i].success();
540+
await vi.advanceTimersByTimeAsync(500); // Allow some refill time
541+
}
542+
543+
// At this point, some pending requests should have been processed during token completions
544+
stats = bucket.getStatistics();
545+
expect(stats.successCount).toBe(4);
546+
547+
// All pending requests should have been processed due to the fix
548+
expect(stats.pendingCount).toBe(0);
549+
expect(stats.tokensIssued).toBe(9); // 4 immediate + 5 pending
550+
551+
// Verify that all pending promises were resolved
552+
const resolvedTokens = await Promise.all(pendingTokenPromises);
553+
expect(resolvedTokens).toHaveLength(5);
554+
resolvedTokens.forEach((token) => expect(token).toBeInstanceOf(AIMDBucketToken));
555+
});
556+
516557
it("should process pending requests when tokens timeout automatically", async () => {
517558
// This test reproduces the lockup bug
518559
bucket = new AIMDBucket({
@@ -628,6 +669,177 @@ describe("AIMDBucket", () => {
628669
const finalStats = bucket.getStatistics();
629670
expect(finalStats.tokensIssued).toBe(initialStats.tokensIssued + 13);
630671
});
672+
673+
it("should process pending requests when new acquire call triggers processing", async () => {
674+
bucket = new AIMDBucket({
675+
initialRate: 2,
676+
tokenReturnTimeoutMs: 5000,
677+
});
678+
679+
// Exhaust the bucket completely
680+
const immediateTokens = await Promise.all([bucket.acquire(), bucket.acquire()]);
681+
682+
// Create pending requests
683+
const pendingPromises = [bucket.acquire(), bucket.acquire(), bucket.acquire()];
684+
685+
// Advance time slightly to ensure requests are pending
686+
await vi.advanceTimersByTimeAsync(100);
687+
let stats = bucket.getStatistics();
688+
expect(stats.pendingCount).toBe(3);
689+
690+
// Complete immediate tokens with time gaps to trigger processing
691+
for (let i = 0; i < immediateTokens.length; i++) {
692+
immediateTokens[i].success();
693+
await vi.advanceTimersByTimeAsync(1000); // Allow refill time
694+
}
695+
696+
// Now make a new acquire call which should trigger processing of old pending requests
697+
const newToken = await bucket.acquire();
698+
expect(newToken).toBeInstanceOf(AIMDBucketToken);
699+
700+
// Verify pending requests were processed
701+
const resolvedTokens = await Promise.all(pendingPromises);
702+
expect(resolvedTokens).toHaveLength(3);
703+
704+
stats = bucket.getStatistics();
705+
expect(stats.pendingCount).toBe(0);
706+
});
707+
708+
it("should handle mixed fast and slow token completion patterns", async () => {
709+
bucket = new AIMDBucket({
710+
initialRate: 3,
711+
tokenReturnTimeoutMs: 10000,
712+
});
713+
714+
// Get initial tokens
715+
const tokens = await Promise.all([bucket.acquire(), bucket.acquire(), bucket.acquire()]);
716+
717+
// Create pending requests
718+
const pendingPromises = [bucket.acquire(), bucket.acquire(), bucket.acquire(), bucket.acquire()];
719+
720+
// Complete tokens with mixed timing
721+
tokens[0].success(); // Fast completion
722+
await vi.advanceTimersByTimeAsync(500);
723+
tokens[1].success(); // Medium completion
724+
await vi.advanceTimersByTimeAsync(1500);
725+
tokens[2].success(); // Slow completion
726+
727+
// All pending should be processed due to refill over time
728+
const resolvedTokens = await Promise.all(pendingPromises);
729+
expect(resolvedTokens).toHaveLength(4);
730+
731+
const stats = bucket.getStatistics();
732+
expect(stats.pendingCount).toBe(0);
733+
expect(stats.tokensIssued).toBe(7);
734+
});
735+
736+
it("should handle low rates with proper token completion timing", async () => {
737+
bucket = new AIMDBucket({
738+
initialRate: 2,
739+
tokenReturnTimeoutMs: 5000,
740+
});
741+
742+
// Should get first tokens immediately
743+
const firstTokens = await Promise.all([bucket.acquire(), bucket.acquire()]);
744+
expect(firstTokens).toHaveLength(2);
745+
746+
// These should be pending
747+
const pendingPromises = [bucket.acquire(), bucket.acquire()];
748+
749+
await vi.advanceTimersByTimeAsync(100);
750+
let stats = bucket.getStatistics();
751+
expect(stats.pendingCount).toBe(2);
752+
753+
// Complete first tokens with time gaps to allow refill and processing
754+
for (let i = 0; i < firstTokens.length; i++) {
755+
firstTokens[i].success();
756+
await vi.advanceTimersByTimeAsync(1000); // Allow time for refill
757+
}
758+
759+
// Should have processed the pending requests
760+
const resolvedTokens = await Promise.all(pendingPromises);
761+
expect(resolvedTokens).toHaveLength(2);
762+
763+
stats = bucket.getStatistics();
764+
expect(stats.pendingCount).toBe(0);
765+
});
766+
767+
it("should handle burst of requests with new acquire triggering processing", async () => {
768+
bucket = new AIMDBucket({
769+
initialRate: 3,
770+
tokenReturnTimeoutMs: 8000,
771+
});
772+
773+
// Create a burst of requests
774+
const burstPromises = Array.from({ length: 6 }, () => bucket.acquire());
775+
776+
await vi.advanceTimersByTimeAsync(100);
777+
let stats = bucket.getStatistics();
778+
expect(stats.tokensIssued).toBe(3); // Only initial capacity issued
779+
expect(stats.pendingCount).toBe(3); // Rest are pending
780+
781+
// Complete some initial tokens with time gaps for refill
782+
const immediateTokens = await Promise.all(burstPromises.slice(0, 3));
783+
for (let i = 0; i < immediateTokens.length; i++) {
784+
immediateTokens[i].success();
785+
await vi.advanceTimersByTimeAsync(500);
786+
}
787+
788+
// Now make a single new request which should trigger processing of remaining pending
789+
const newToken = await bucket.acquire();
790+
expect(newToken).toBeInstanceOf(AIMDBucketToken);
791+
792+
// All burst requests should be processed
793+
const resolvedTokens = await Promise.all(burstPromises);
794+
expect(resolvedTokens).toHaveLength(6);
795+
796+
stats = bucket.getStatistics();
797+
expect(stats.pendingCount).toBe(0);
798+
});
799+
800+
it("should handle token timeouts gracefully", async () => {
801+
bucket = new AIMDBucket({
802+
initialRate: 2,
803+
tokenReturnTimeoutMs: 1000, // Short timeout
804+
});
805+
806+
// Get initial tokens but don't complete them (let them timeout)
807+
const initialTokens = await Promise.all([bucket.acquire(), bucket.acquire()]);
808+
809+
// Advance past token timeout
810+
await vi.advanceTimersByTimeAsync(1200);
811+
812+
const stats = bucket.getStatistics();
813+
expect(stats.timeoutCount).toBe(2); // Initial tokens timed out
814+
expect(stats.tokensIssued).toBe(2); // Only initial tokens were issued
815+
});
816+
817+
it("should process all pending requests when bucket refills", async () => {
818+
bucket = new AIMDBucket({
819+
initialRate: 2,
820+
tokenReturnTimeoutMs: 5000,
821+
});
822+
823+
// Get first tokens
824+
const firstTokens = await Promise.all([bucket.acquire(), bucket.acquire()]);
825+
826+
// Create pending requests
827+
const pendingPromises = [bucket.acquire(), bucket.acquire()];
828+
829+
// Complete first tokens with time to trigger processing
830+
for (let i = 0; i < firstTokens.length; i++) {
831+
firstTokens[i].success();
832+
await vi.advanceTimersByTimeAsync(1000);
833+
}
834+
835+
// All pending requests should be processed and resolved
836+
const resolvedTokens = await Promise.all(pendingPromises);
837+
expect(resolvedTokens).toHaveLength(2);
838+
resolvedTokens.forEach((token) => expect(token).toBeInstanceOf(AIMDBucketToken));
839+
840+
const stats = bucket.getStatistics();
841+
expect(stats.pendingCount).toBe(0);
842+
});
631843
});
632844
});
633845

src/index.ts

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -184,6 +184,7 @@ export class AIMDBucket {
184184
throw new Error("Bucket has been shut down");
185185
}
186186

187+
// Update token count and process any pending requests that can now be fulfilled
187188
this._refill();
188189

189190
if (this.tokens >= 1) {
@@ -275,7 +276,7 @@ export class AIMDBucket {
275276
this.recentOutcomes.push({ timestamp: Date.now(), outcome });
276277
this._adjustRate();
277278
// Process pending requests in case rate adjustment or time passage made tokens available
278-
this._processPending();
279+
this._refill();
279280
}
280281

281282
/**
@@ -285,7 +286,7 @@ export class AIMDBucket {
285286
this.recentOutcomes.push({ timestamp: Date.now(), outcome: "timeout" });
286287
this._adjustRate();
287288
// Process pending requests in case rate adjustment or time passage made tokens available
288-
this._processPending();
289+
this._refill();
289290
}
290291

291292
private _validate(): void {
@@ -302,15 +303,14 @@ export class AIMDBucket {
302303
}
303304
}
304305

306+
/**
307+
* Update token count based on elapsed time and process any pending requests
308+
*/
305309
private _refill(): void {
306310
const now = Date.now();
307311
const elapsed = (now - this.lastRefill) / 1000;
308312
this.tokens = Math.min(this.rate, this.tokens + elapsed * this.rate);
309313
this.lastRefill = now;
310-
}
311-
312-
private _processPending(): void {
313-
this._refill();
314314

315315
while (this.pending.length > 0 && this.tokens >= 1) {
316316
const request = this.pending.shift()!;

0 commit comments

Comments
 (0)