diff --git a/src/main/java/org/elasticsearch/transport/netty/NettyTransport.java b/src/main/java/org/elasticsearch/transport/netty/NettyTransport.java index 8579aab0cd7..a347547d168 100644 --- a/src/main/java/org/elasticsearch/transport/netty/NettyTransport.java +++ b/src/main/java/org/elasticsearch/transport/netty/NettyTransport.java @@ -470,6 +470,7 @@ public class NettyTransport extends AbstractLifecycleComponent implem void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception { if (!lifecycle.started()) { // ignore + return; } if (isCloseConnectionException(e.getCause())) { logger.trace("close connection exception caught on transport layer [{}], disconnecting from relevant node", e.getCause(), ctx.getChannel()); @@ -794,14 +795,20 @@ public class NettyTransport extends AbstractLifecycleComponent implem /** * Disconnects from a node if a channel is found as part of that nodes channels. */ - private void disconnectFromNodeChannel(Channel channel, Throwable failure) { - for (DiscoveryNode node : connectedNodes.keySet()) { - if (disconnectFromNode(node, channel, ExceptionsHelper.detailedMessage(failure))) { - // if we managed to find this channel and disconnect from it, then break, no need to check on - // the rest of the nodes - break; + private void disconnectFromNodeChannel(final Channel channel, final Throwable failure) { + threadPool().generic().execute(new Runnable() { + + @Override + public void run() { + for (DiscoveryNode node : connectedNodes.keySet()) { + if (disconnectFromNode(node, channel, ExceptionsHelper.detailedMessage(failure))) { + // if we managed to find this channel and disconnect from it, then break, no need to check on + // the rest of the nodes + break; + } + } } - } + }); } private Channel nodeChannel(DiscoveryNode node, TransportRequestOptions options) throws ConnectTransportException { @@ -885,8 +892,16 @@ public class NettyTransport extends AbstractLifecycleComponent implem } @Override - public void operationComplete(ChannelFuture future) throws Exception { - disconnectFromNode(node, future.getChannel(), "channel closed event"); + public void operationComplete(final ChannelFuture future) throws Exception { + NodeChannels nodeChannels = connectedNodes.get(node); + if (nodeChannels != null && nodeChannels.hasChannel(future.getChannel())) { + threadPool().generic().execute(new Runnable() { + @Override + public void run() { + disconnectFromNode(node, future.getChannel(), "channel closed event"); + } + }); + } } }