This commit is contained in:
Justin Bertram 2018-05-04 14:27:43 -05:00
commit 950698960a
1 changed files with 7 additions and 24 deletions

View File

@ -30,7 +30,6 @@ import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelPromise;
import io.netty.channel.EventLoop;
import io.netty.handler.ssl.SslHandler;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
@ -238,12 +237,10 @@ public class NettyConnection implements Connection {
boolean inEventLoop = eventLoop.inEventLoop();
//if we are in an event loop we need to close the channel after the writes have finished
if (!inEventLoop) {
final SslHandler sslHandler = (SslHandler) channel.pipeline().get("ssl");
closeSSLAndChannel(sslHandler, channel, false);
closeChannel(channel, false);
} else {
eventLoop.execute(() -> {
final SslHandler sslHandler = (SslHandler) channel.pipeline().get("ssl");
closeSSLAndChannel(sslHandler, channel, true);
closeChannel(channel, true);
});
}
@ -555,26 +552,12 @@ public class NettyConnection implements Connection {
return super.toString() + "[ID=" + getID() + ", local= " + channel.localAddress() + ", remote=" + channel.remoteAddress() + "]";
}
private void closeSSLAndChannel(SslHandler sslHandler, final Channel channel, boolean inEventLoop) {
private void closeChannel(final Channel channel, boolean inEventLoop) {
checkFlushBatchBuffer();
if (sslHandler != null) {
try {
ChannelFuture sslCloseFuture = sslHandler.close();
sslCloseFuture.addListener(future -> channel.close());
if (!inEventLoop && !sslCloseFuture.awaitUninterruptibly(DEFAULT_WAIT_MILLIS)) {
ActiveMQClientLogger.LOGGER.timeoutClosingSSL();
}
} catch (Throwable t) {
// ignore
if (ActiveMQClientLogger.LOGGER.isTraceEnabled()) {
ActiveMQClientLogger.LOGGER.trace(t.getMessage(), t);
}
}
} else {
ChannelFuture closeFuture = channel.close();
if (!inEventLoop && !closeFuture.awaitUninterruptibly(DEFAULT_WAIT_MILLIS)) {
ActiveMQClientLogger.LOGGER.timeoutClosingNettyChannel();
}
// closing the channel results in closing any sslHandler first; SslHandler#close() was deprecated by netty
ChannelFuture closeFuture = channel.close();
if (!inEventLoop && !closeFuture.awaitUninterruptibly(DEFAULT_WAIT_MILLIS)) {
ActiveMQClientLogger.LOGGER.timeoutClosingNettyChannel();
}
}