Skip to content

close method of the AMQConnection class #1082

@tlglhlchat

Description

@tlglhlchat

Describe the bug

Regarding the close method of the AMQConnection class in RabbitMQ 5.9.x-stable version,
when the value of sync is false, the channels managed by the ChannelManager class are not destroyed.
When using RabbitMQ in nio mode, calling the close method will cause the event loop thread to block.
This is the method details:

public void close(int closeCode,
                      String closeMessage,
                      boolean initiatedByApplication,
                      Throwable cause,
                      int timeout,
                      boolean abort)
        throws IOException
    {
        boolean sync = !(Thread.currentThread() == mainLoopThread);
        try {
            AMQP.Connection.Close reason =
                new AMQP.Connection.Close.Builder()
                    .replyCode(closeCode)
                    .replyText(closeMessage)
                .build();
            final ShutdownSignalException sse = startShutdown(reason, initiatedByApplication, cause, true);
            if(sync){
                BlockingRpcContinuation<AMQCommand> k = new BlockingRpcContinuation<AMQCommand>(){
                    @Override
                    public AMQCommand transformReply(AMQCommand command) {
                        AMQConnection.this.finishShutdown(sse);
                        return command;
                    }};

              _channel0.quiescingRpc(reason, k);
              k.getReply(timeout);
            } else {
              _channel0.quiescingTransmit(reason);
            }
        } catch (TimeoutException tte) {
            if (!abort) {
                ShutdownSignalException sse = new ShutdownSignalException(true, true, null, this);
                sse.initCause(cause);
                throw sse;
            }
        } catch (ShutdownSignalException sse) {
            if (!abort)
                throw sse;
        } catch (IOException ioe) {
            if (!abort)
                throw ioe;
        } finally {
            if(sync) {
                _frameHandler.close();
            }
        }
    }

Reproduction steps

1.In the "processControlCommand" method of the "AMQConnection" class, when the "AMQP.Connection.Blocked" command is received, the "handleBlocked" method of the "BlockedListener" class is called. If an exception occurs at this point, it will trigger a call to the "close" method.
This is the method details:

public boolean processControlCommand(Command c) throws IOException
    {
        // Similar trick to ChannelN.processAsync used here, except
        // we're interested in whole-connection quiescing.

        // See the detailed comments in ChannelN.processAsync.

        Method method = c.getMethod();

        if (isOpen()) {
            if (method instanceof AMQP.Connection.Close) {
                handleConnectionClose(c);
                return true;
            } else if (method instanceof AMQP.Connection.Blocked) {
                AMQP.Connection.Blocked blocked = (AMQP.Connection.Blocked) method;
                try {
                    for (BlockedListener l : this.blockedListeners) {
                        l.handleBlocked(blocked.getReason());
                    }
                } catch (Throwable ex) {
                    getExceptionHandler().handleBlockedListenerException(this, ex);
                }
                return true;
            } else if (method instanceof AMQP.Connection.Unblocked) {
                try {
                    for (BlockedListener l : this.blockedListeners) {
                        l.handleUnblocked();
                    }
                } catch (Throwable ex) {
                    getExceptionHandler().handleBlockedListenerException(this, ex);
                }
                return true;
            } else {
                return false;
            }
        } else {
            if (method instanceof AMQP.Connection.Close) {
                // Already shutting down, so just send back a CloseOk.
                try {
                    _channel0.quiescingTransmit(new AMQP.Connection.CloseOk.Builder().build());
                } catch (IOException ignored) { } // ignore
                return true;
            } else if (method instanceof AMQP.Connection.CloseOk) {
                // It's our final "RPC". Time to shut down.
                _running = false;
                // If Close was sent from within the MainLoop we
                // will not have a continuation to return to, so
                // we treat this as processed in that case.
                return !_channel0.isOutstandingRpc();
            } else { // Ignore all others.
                return true;
            }
        }
    }

Expected behavior

Non-blocking event loop thread.

Additional context

No response

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions