diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/transport/netty/NettyTransport.java b/modules/elasticsearch/src/main/java/org/elasticsearch/transport/netty/NettyTransport.java index c697a74335e..83fbc756d28 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/transport/netty/NettyTransport.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/transport/netty/NettyTransport.java @@ -57,10 +57,7 @@ import java.io.IOException; import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.SocketAddress; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Set; +import java.util.*; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicInteger; @@ -324,7 +321,7 @@ public class NettyTransport extends AbstractLifecycleComponent implem for (Iterator it = connectedNodes.values().iterator(); it.hasNext();) { NodeChannels nodeChannels = it.next(); it.remove(); - nodeChannels.closeAndWait(); + nodeChannels.close(); } if (clientBootstrap != null) { @@ -598,35 +595,21 @@ public class NettyTransport extends AbstractLifecycleComponent implem } } - public void close() { - closeChannels(low); - closeChannels(med); - closeChannels(high); - } - - private void closeChannels(Channel[] channels) { - for (Channel channel : channels) { - try { - if (channel != null && channel.isOpen()) { - channel.close(); - } - } catch (Exception e) { - //ignore - } + public synchronized void close() { + List futures = new ArrayList(); + closeChannelsAndWait(low, futures); + closeChannelsAndWait(med, futures); + closeChannelsAndWait(high, futures); + for (ChannelFuture future : futures) { + future.awaitUninterruptibly(); } } - public void closeAndWait() { - closeChannelsAndWait(low); - closeChannelsAndWait(med); - closeChannelsAndWait(high); - } - - private void closeChannelsAndWait(Channel[] channels) { + private void closeChannelsAndWait(Channel[] channels, List futures) { for (Channel channel : channels) { try { if (channel != null && channel.isOpen()) { - channel.close().awaitUninterruptibly(); + futures.add(channel.close()); } } catch (Exception e) { //ignore