Closed
Description
Describe the bug
In the RabbitMQ Java client (5.9.x-stable), the close method of the AMQConnection class does not destroy the channels managed by the ChannelManager class when sync is false, that is, when close is called from the main loop thread.
When the client is used in NIO mode, calling close in this situation also causes the event loop thread to block.
The method in question:
public void close(int closeCode,
                  String closeMessage,
                  boolean initiatedByApplication,
                  Throwable cause,
                  int timeout,
                  boolean abort)
    throws IOException
{
    boolean sync = !(Thread.currentThread() == mainLoopThread);
    try {
        AMQP.Connection.Close reason =
            new AMQP.Connection.Close.Builder()
                .replyCode(closeCode)
                .replyText(closeMessage)
                .build();
        final ShutdownSignalException sse = startShutdown(reason, initiatedByApplication, cause, true);
        if (sync) {
            BlockingRpcContinuation<AMQCommand> k = new BlockingRpcContinuation<AMQCommand>() {
                @Override
                public AMQCommand transformReply(AMQCommand command) {
                    AMQConnection.this.finishShutdown(sse);
                    return command;
                }};
            _channel0.quiescingRpc(reason, k);
            k.getReply(timeout);
        } else {
            _channel0.quiescingTransmit(reason);
        }
    } catch (TimeoutException tte) {
        if (!abort) {
            ShutdownSignalException sse = new ShutdownSignalException(true, true, null, this);
            sse.initCause(cause);
            throw sse;
        }
    } catch (ShutdownSignalException sse) {
        if (!abort)
            throw sse;
    } catch (IOException ioe) {
        if (!abort)
            throw ioe;
    } finally {
        if (sync) {
            _frameHandler.close();
        }
    }
}
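To make the two paths easier to see, here is a minimal, self-contained sketch of the same thread check. It is a toy model and not RabbitMQ client code: the class ClosePathSketch, its fields, and its helper methods are all hypothetical names, and the flags only stand in for finishShutdown(sse) and _frameHandler.close(), which the method above reaches only when sync is true.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;

/** Toy model of the two close paths; hypothetical, not RabbitMQ client code. */
class ClosePathSketch {
    // Stand-in for AMQConnection.mainLoopThread (the thread that reads frames).
    private volatile Thread mainLoopThread;
    // Stand-in for the effect of finishShutdown(sse) on the managed channels.
    final AtomicBoolean channelsShutDown = new AtomicBoolean(false);
    // Stand-in for _frameHandler.close() in the finally block.
    final AtomicBoolean frameHandlerClosed = new AtomicBoolean(false);

    void close() {
        // Same check as in the method above: are we on the loop thread?
        boolean sync = Thread.currentThread() != mainLoopThread;
        if (sync) {
            // Models the RPC continuation that calls finishShutdown(sse),
            // followed by the frame handler being closed.
            channelsShutDown.set(true);
            frameHandlerClosed.set(true);
        }
        // When sync is false, the real method only transmits the Close frame;
        // this model therefore performs no cleanup on that path.
    }

    /** Calls close() from a thread other than the (modelled) loop thread. */
    static ClosePathSketch closeFromApplicationThread() {
        ClosePathSketch c = new ClosePathSketch();
        c.mainLoopThread = new Thread(() -> { }); // a different thread, never started
        c.close();
        return c;
    }

    /** Calls close() from the (modelled) loop thread itself. */
    static ClosePathSketch closeFromLoopThread() throws Exception {
        ClosePathSketch c = new ClosePathSketch();
        ExecutorService loop = Executors.newSingleThreadExecutor();
        try {
            loop.submit(() -> {
                c.mainLoopThread = Thread.currentThread();
                c.close();
            }).get();
        } finally {
            loop.shutdown();
        }
        return c;
    }

    public static void main(String[] args) throws Exception {
        System.out.println("application thread: channelsShutDown="
                + closeFromApplicationThread().channelsShutDown.get());
        System.out.println("loop thread:        channelsShutDown="
                + closeFromLoopThread().channelsShutDown.get());
    }
}
```

In this model the cleanup flags are set only when close() is called from outside the loop thread, which mirrors the behaviour reported here for sync == false.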
Reproduction steps
1. In the "processControlCommand" method of the "AMQConnection" class, when an "AMQP.Connection.Blocked" command is received, the "handleBlocked" method of each registered "BlockedListener" is called. If a listener throws an exception at this point, it is passed to "handleBlockedListenerException" on the exception handler, which triggers a call to the "close" method.
The method in question:
public boolean processControlCommand(Command c) throws IOException
{
    // Similar trick to ChannelN.processAsync used here, except
    // we're interested in whole-connection quiescing.
    // See the detailed comments in ChannelN.processAsync.
    Method method = c.getMethod();
    if (isOpen()) {
        if (method instanceof AMQP.Connection.Close) {
            handleConnectionClose(c);
            return true;
        } else if (method instanceof AMQP.Connection.Blocked) {
            AMQP.Connection.Blocked blocked = (AMQP.Connection.Blocked) method;
            try {
                for (BlockedListener l : this.blockedListeners) {
                    l.handleBlocked(blocked.getReason());
                }
            } catch (Throwable ex) {
                getExceptionHandler().handleBlockedListenerException(this, ex);
            }
            return true;
        } else if (method instanceof AMQP.Connection.Unblocked) {
            try {
                for (BlockedListener l : this.blockedListeners) {
                    l.handleUnblocked();
                }
            } catch (Throwable ex) {
                getExceptionHandler().handleBlockedListenerException(this, ex);
            }
            return true;
        } else {
            return false;
        }
    } else {
        if (method instanceof AMQP.Connection.Close) {
            // Already shutting down, so just send back a CloseOk.
            try {
                _channel0.quiescingTransmit(new AMQP.Connection.CloseOk.Builder().build());
            } catch (IOException ignored) { } // ignore
            return true;
        } else if (method instanceof AMQP.Connection.CloseOk) {
            // It's our final "RPC". Time to shut down.
            _running = false;
            // If Close was sent from within the MainLoop we
            // will not have a continuation to return to, so
            // we treat this as processed in that case.
            return !_channel0.isOutstandingRpc();
        } else { // Ignore all others.
            return true;
        }
    }
}
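The dispatch path in this step can be sketched in isolation as follows. This is a hypothetical model, not RabbitMQ client code: BlockedDispatchSketch, its Listener interface, and handleListenerException are invented stand-ins, and the sketch assumes (as described in this report) that the exception handler requests the close synchronously on the thread that caught the exception.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;

/** Toy model of listener dispatch on the loop thread; hypothetical, not RabbitMQ client code. */
class BlockedDispatchSketch {
    /** Hypothetical stand-in for BlockedListener.handleBlocked. */
    interface Listener {
        void handleBlocked(String reason) throws Exception;
    }

    // Records which thread asked for the connection to be closed.
    private final AtomicReference<Thread> closeCaller = new AtomicReference<>();

    // Stand-in for the exception handler that ends up calling close
    // (assumed here to happen synchronously on the catching thread).
    private void handleListenerException(Throwable ex) {
        closeCaller.set(Thread.currentThread());
    }

    // Mirrors the try/catch around the listener loop in processControlCommand.
    private void dispatchBlocked(List<Listener> listeners, String reason) {
        try {
            for (Listener l : listeners) {
                l.handleBlocked(reason);
            }
        } catch (Throwable ex) {
            handleListenerException(ex);
        }
    }

    /** Returns true if the close was requested from the loop thread itself. */
    static boolean closeRequestedOnLoopThread() throws Exception {
        BlockedDispatchSketch conn = new BlockedDispatchSketch();
        Listener failing = reason -> {
            throw new IllegalStateException("listener failed: " + reason);
        };
        ExecutorService loop = Executors.newSingleThreadExecutor();
        try {
            // The single executor thread plays the role of the event loop thread.
            Thread loopThread = loop.submit(() -> {
                conn.dispatchBlocked(Arrays.asList(failing), "low on memory");
                return Thread.currentThread();
            }).get();
            return conn.closeCaller.get() == loopThread;
        } finally {
            loop.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("close requested on loop thread: " + closeRequestedOnLoopThread());
    }
}
```

Under these assumptions the close request originates on the loop thread, which is the case where sync evaluates to false in the close method.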
Expected behavior
The event loop thread should not block when the close method is called.
Additional context
No response