Fix NettyTransport

This commit is contained in:
Jason Tedor 2015-08-31 14:29:00 -04:00
parent e39a3bae2c
commit b0af7a1426
1 changed files with 48 additions and 15 deletions

View File

@ -22,7 +22,6 @@ package org.elasticsearch.transport.netty;
import com.google.common.base.Charsets;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.node.DiscoveryNode;
@ -57,13 +56,31 @@ import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.KeyedLock;
import org.elasticsearch.monitor.jvm.JvmInfo;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.*;
import org.elasticsearch.transport.BindTransportException;
import org.elasticsearch.transport.BytesTransportRequest;
import org.elasticsearch.transport.ConnectTransportException;
import org.elasticsearch.transport.NodeNotConnectedException;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportServiceAdapter;
import org.elasticsearch.transport.support.TransportStatus;
import org.jboss.netty.bootstrap.ClientBootstrap;
import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.*;
import org.jboss.netty.channel.AdaptiveReceiveBufferSizePredictorFactory;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelFutureListener;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.FixedReceiveBufferSizePredictorFactory;
import org.jboss.netty.channel.ReceiveBufferSizePredictorFactory;
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
import org.jboss.netty.channel.socket.nio.NioWorkerPool;
@ -77,8 +94,20 @@ import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.UnknownHostException;
import java.nio.channels.CancelledKeyException;
import java.util.*;
import java.util.concurrent.*;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReadWriteLock;
@ -946,8 +975,13 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
}
} catch (RuntimeException e) {
// clean the futures
for (ChannelFuture[] futures : Arrays.asList(connectRecovery, connectBulk, connectReg, connectState, connectPing)) {
for (ChannelFuture future : futures) {
List<ChannelFuture> futures = new ArrayList<>();
futures.addAll(Arrays.asList(connectRecovery));
futures.addAll(Arrays.asList(connectBulk));
futures.addAll(Arrays.asList(connectReg));
futures.addAll(Arrays.asList(connectState));
futures.addAll(Arrays.asList(connectPing));
for (ChannelFuture future : Collections.unmodifiableList(futures)) {
future.cancel();
if (future.getChannel() != null && future.getChannel().isOpen()) {
try {
@ -957,7 +991,6 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
}
}
}
}
throw e;
}
}
@ -1158,7 +1191,7 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
newAllChannels.addAll(Arrays.asList(reg));
newAllChannels.addAll(Arrays.asList(state));
newAllChannels.addAll(Arrays.asList(ping));
this.allChannels = Collections.unmodifiableList(allChannels );
this.allChannels = Collections.unmodifiableList(newAllChannels);
}
public boolean hasChannel(Channel channel) {