Commit 24cded1

Fix hanging graceful channel shutdown by cleaning up state. (#122)
Corrects two scenarios where stale load balancer state could prevent a graceful channel shutdown. Previously, multiple endpoint updates or a change of load balancer could leave undiscarded subchannels or old load balancers in the channel's state, blocking a clean shutdown. This change ensures that previous subchannels and load balancers are fully removed from that state when they shut down. It also corrects a related test, which had been passing incorrectly because an explicit cancellation masked the issue.
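To make the channel-side issue concrete, here is a minimal sketch of the bookkeeping involved; the ChannelState type and its fields are invented for illustration and are not the real GRPCChannel state machine. The idea is that graceful shutdown can only finish once no load balancers remain, so a balancer that has shut down but is never removed from the past map keeps the channel alive.

import Foundation

// Hypothetical model of the channel's load balancer bookkeeping;
// names and shapes are illustrative, not the real GRPCChannel state.
enum ConnectivityState { case idle, connecting, ready, transientFailure, shutdown }

struct ChannelState {
  var currentLoadBalancerID: UUID?
  var nextLoadBalancerID: UUID?
  var past: [UUID: String] = [:]  // load balancers that have been replaced

  // Graceful shutdown can only complete once nothing is left running.
  var canFinishGracefulShutdown: Bool {
    currentLoadBalancerID == nil && nextLoadBalancerID == nil && past.isEmpty
  }

  // Before the fix, a balancer that was neither current nor next was
  // ignored here, so its entry in `past` survived and blocked shutdown.
  mutating func loadBalancerStateChanged(id: UUID, to state: ConnectivityState) {
    guard id != currentLoadBalancerID, id != nextLoadBalancerID else { return }
    switch state {
    case .shutdown:
      past.removeValue(forKey: id)
    case .idle, .connecting, .ready, .transientFailure:
      ()
    }
  }
}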
1 parent 4f30eed commit 24cded1

4 files changed: +29 −11 lines
Sources/GRPCNIOTransportCore/Client/Connection/GRPCChannel.swift

Lines changed: 8 additions & 0 deletions
@@ -834,6 +834,14 @@ extension GRPCChannel.StateMachine {
         case .idle, .connecting, .transientFailure, .shutdown:
           ()
         }
+      } else {
+        // In this case the LB is neither current nor next.
+        switch connectivityState {
+        case .shutdown:
+          state.past.removeValue(forKey: id)
+        case .idle, .connecting, .ready, .transientFailure:
+          ()
+        }
       }

       self.state = .running(state)

Sources/GRPCNIOTransportCore/Client/Connection/LoadBalancers/PickFirstLoadBalancer.swift

Lines changed: 3 additions & 5 deletions
@@ -395,15 +395,13 @@ extension PickFirstLoadBalancer.State.Active {
         } else {
           onUpdate = .publishStateChange(connectivityState)
         }
-
-        self.current = next
-        self.isCurrentGoingAway = false
       } else {
         // No state change to publish, just roll over.
         onUpdate = self.current.map { .close($0) } ?? .none
-        self.current = next
-        self.isCurrentGoingAway = false
       }
+      self.current = next
+      self.next = nil
+      self.isCurrentGoingAway = false

     case .idle, .connecting, .transientFailure, .shutdown:
       onUpdate = .none
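For context on the pick-first change above: both branches used to perform their own partial rollover and neither cleared next, so a replaced subchannel reference could linger as stale state. The new code rolls over once, after the branch, and clears next. A compressed, hypothetical illustration follows; only the three field names come from the diff, the type itself is invented:

// Illustrative only: a stripped-down stand-in for the pick-first state.
struct RolloverState<Subchannel> {
  var current: Subchannel?
  var next: Subchannel?
  var isCurrentGoingAway = false

  // Promote `next` to `current` in one place, and clear `next` so the
  // replaced reference cannot survive as stale state.
  mutating func rollOver() {
    current = next
    next = nil
    isCurrentGoingAway = false
  }
}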

Tests/GRPCNIOTransportCoreTests/Client/Connection/GRPCChannelTests.swift

Lines changed: 4 additions & 3 deletions
@@ -619,13 +619,14 @@ final class GRPCChannelTests: XCTestCase {

           channel.beginGracefulShutdown()

-        case .shutdown:
-          group.cancelAll()
-
         default:
           ()
         }
       }
+
+      let result = await group.nextResult()!
+      group.cancelAll()
+      return try result.get()
     }
   }
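The test above previously called group.cancelAll() as soon as it observed .shutdown, which tore everything down even if graceful shutdown had hung, so the hang never surfaced. The updated flow waits for a child task to finish on its own before cancelling. A rough sketch of that shape, with placeholder task bodies rather than the test's real harness:

// Illustrative shape of the await-then-cancel pattern used by the test.
func runUntilAChildFinishes() async throws {
  try await withThrowingTaskGroup(of: Void.self) { group in
    group.addTask {
      // Placeholder: run the channel / connection machinery.
    }
    group.addTask {
      // Placeholder: drive events and trigger beginGracefulShutdown().
    }

    // If graceful shutdown hangs, this await never returns and the test
    // visibly hangs or times out instead of being cancelled past the bug.
    let result = await group.nextResult()!
    group.cancelAll()
    return try result.get()
  }
}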

Tests/GRPCNIOTransportCoreTests/Client/Connection/LoadBalancers/PickFirstLoadBalancerTests.swift

Lines changed: 14 additions & 3 deletions
@@ -96,7 +96,7 @@ final class PickFirstLoadBalancerTests: XCTestCase {
   }

   func testEndpointUpdateHandledGracefully() async throws {
-    try await LoadBalancerTest.pickFirst(servers: 2, connector: .posix()) { context, event in
+    try await LoadBalancerTest.pickFirst(servers: 3, connector: .posix()) { context, event in
       switch event {
       case .connectivityStateChanged(.idle):
         let endpoint = Endpoint(addresses: [context.servers[0].address])
@@ -109,14 +109,25 @@ final class PickFirstLoadBalancerTests: XCTestCase {
         }

         // Update the endpoint so that it contains server-1.
-        let endpoint = Endpoint(addresses: [context.servers[1].address])
-        context.pickFirst!.updateEndpoint(endpoint)
+        let endpoint1 = Endpoint(addresses: [context.servers[1].address])
+        context.pickFirst!.updateEndpoint(endpoint1)

         // Should remain in the ready state
         try await XCTPoll(every: .milliseconds(10)) {
           context.servers[0].server.clients.isEmpty && context.servers[1].server.clients.count == 1
         }

+        // Update the endpoint so that it contains server-2.
+        let endpoint2 = Endpoint(addresses: [context.servers[2].address])
+        context.pickFirst!.updateEndpoint(endpoint2)
+
+        // Should remain in the ready state
+        try await XCTPoll(every: .milliseconds(10)) {
+          context.servers[0].server.clients.isEmpty
+            && context.servers[1].server.clients.isEmpty
+            && context.servers[2].server.clients.count == 1
+        }
+
         context.loadBalancer.close()

       default:
