From b0af7a14266761d222516bae56c05ec42e3d85b7 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Mon, 31 Aug 2015 14:29:00 -0400 Subject: [PATCH] Fix NettyTransport --- .../transport/netty/NettyTransport.java | 63 ++++++++++++++----- 1 file changed, 48 insertions(+), 15 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/transport/netty/NettyTransport.java b/core/src/main/java/org/elasticsearch/transport/netty/NettyTransport.java index 5fb5a264829..83b6acfde18 100644 --- a/core/src/main/java/org/elasticsearch/transport/netty/NettyTransport.java +++ b/core/src/main/java/org/elasticsearch/transport/netty/NettyTransport.java @@ -22,7 +22,6 @@ package org.elasticsearch.transport.netty; import com.google.common.base.Charsets; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Maps; - import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.Version; import org.elasticsearch.cluster.node.DiscoveryNode; @@ -57,13 +56,31 @@ import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.util.concurrent.KeyedLock; import org.elasticsearch.monitor.jvm.JvmInfo; import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.transport.*; +import org.elasticsearch.transport.BindTransportException; +import org.elasticsearch.transport.BytesTransportRequest; +import org.elasticsearch.transport.ConnectTransportException; +import org.elasticsearch.transport.NodeNotConnectedException; +import org.elasticsearch.transport.Transport; +import org.elasticsearch.transport.TransportException; +import org.elasticsearch.transport.TransportRequest; +import org.elasticsearch.transport.TransportRequestOptions; +import org.elasticsearch.transport.TransportServiceAdapter; import org.elasticsearch.transport.support.TransportStatus; import org.jboss.netty.bootstrap.ClientBootstrap; import org.jboss.netty.bootstrap.ServerBootstrap; import org.jboss.netty.buffer.ChannelBuffer; import org.jboss.netty.buffer.ChannelBuffers; -import org.jboss.netty.channel.*; +import org.jboss.netty.channel.AdaptiveReceiveBufferSizePredictorFactory; +import org.jboss.netty.channel.Channel; +import org.jboss.netty.channel.ChannelFuture; +import org.jboss.netty.channel.ChannelFutureListener; +import org.jboss.netty.channel.ChannelHandlerContext; +import org.jboss.netty.channel.ChannelPipeline; +import org.jboss.netty.channel.ChannelPipelineFactory; +import org.jboss.netty.channel.Channels; +import org.jboss.netty.channel.ExceptionEvent; +import org.jboss.netty.channel.FixedReceiveBufferSizePredictorFactory; +import org.jboss.netty.channel.ReceiveBufferSizePredictorFactory; import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory; import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory; import org.jboss.netty.channel.socket.nio.NioWorkerPool; @@ -77,8 +94,20 @@ import java.net.InetSocketAddress; import java.net.SocketAddress; import java.net.UnknownHostException; import java.nio.channels.CancelledKeyException; -import java.util.*; -import java.util.concurrent.*; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.ReadWriteLock; @@ -946,15 +975,19 @@ public class NettyTransport extends AbstractLifecycleComponent implem } } catch (RuntimeException e) { // clean the futures - for (ChannelFuture[] futures : Arrays.asList(connectRecovery, connectBulk, connectReg, connectState, connectPing)) { - for (ChannelFuture future : futures) { - future.cancel(); - if (future.getChannel() != null && future.getChannel().isOpen()) { - try { - future.getChannel().close(); - } catch (Exception e1) { - // ignore - } + List futures = new ArrayList<>(); + futures.addAll(Arrays.asList(connectRecovery)); + futures.addAll(Arrays.asList(connectBulk)); + futures.addAll(Arrays.asList(connectReg)); + futures.addAll(Arrays.asList(connectState)); + futures.addAll(Arrays.asList(connectPing)); + for (ChannelFuture future : Collections.unmodifiableList(futures)) { + future.cancel(); + if (future.getChannel() != null && future.getChannel().isOpen()) { + try { + future.getChannel().close(); + } catch (Exception e1) { + // ignore } } } @@ -1158,7 +1191,7 @@ public class NettyTransport extends AbstractLifecycleComponent implem newAllChannels.addAll(Arrays.asList(reg)); newAllChannels.addAll(Arrays.asList(state)); newAllChannels.addAll(Arrays.asList(ping)); - this.allChannels = Collections.unmodifiableList(allChannels ); + this.allChannels = Collections.unmodifiableList(newAllChannels); } public boolean hasChannel(Channel channel) {