This closes #480

This commit is contained in:
Martyn Taylor 2016-04-25 14:48:58 +01:00
commit 00740b141a
1 changed files with 17 additions and 9 deletions

View File

@ -29,6 +29,8 @@ import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelPromise;
import io.netty.channel.EventLoop;
import io.netty.handler.ssl.SslHandler;
import io.netty.util.concurrent.GenericFutureListener;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException;
@ -193,13 +195,13 @@ public class NettyConnection implements Connection {
boolean inEventLoop = eventLoop.inEventLoop();
//if we are in an event loop we need to close the channel after the writes have finished
if (!inEventLoop) {
closeSSLAndChannel(sslHandler, channel);
closeSSLAndChannel(sslHandler, channel, false);
}
else {
eventLoop.execute(new Runnable() {
@Override
public void run() {
closeSSLAndChannel(sslHandler, channel);
closeSSLAndChannel(sslHandler, channel, true);
}
});
}
@ -412,12 +414,17 @@ public class NettyConnection implements Connection {
// Private -------------------------------------------------------
private void closeSSLAndChannel(SslHandler sslHandler, Channel channel) {
private void closeSSLAndChannel(SslHandler sslHandler, final Channel channel, boolean inEventLoop) {
if (sslHandler != null) {
try {
ChannelFuture sslCloseFuture = sslHandler.close();
if (!sslCloseFuture.awaitUninterruptibly(10000)) {
sslCloseFuture.addListener(new GenericFutureListener<ChannelFuture>() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
channel.close();
}
});
if (!inEventLoop && !sslCloseFuture.awaitUninterruptibly(10000)) {
ActiveMQClientLogger.LOGGER.timeoutClosingSSL();
}
}
@ -425,12 +432,13 @@ public class NettyConnection implements Connection {
// ignore
}
}
else {
ChannelFuture closeFuture = channel.close();
if (!closeFuture.awaitUninterruptibly(10000)) {
if (!inEventLoop && !closeFuture.awaitUninterruptibly(10000)) {
ActiveMQClientLogger.LOGGER.timeoutClosingNettyChannel();
}
}
}
// Inner classes -------------------------------------------------
}