Transport: Improve concurrency when connecting to several nodes, closes #1007.

kimchy 2011-06-08 18:47:37 +03:00
parent 597f3b7a8f
commit fff78d6a38
2 changed files with 43 additions and 21 deletions
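The change is easier to see outside the diff. Instead of funnelling every connection attempt through one synchronized block on the transport, the new code opens channels optimistically and lets ConcurrentMap.putIfAbsent decide whose result is kept, so connections to different nodes can be established in parallel. The sketch below is a minimal, self-contained illustration of that pattern; the names (ConnectOncePattern, Node, Channels, openChannels) are placeholders, not the actual NettyTransport API.

import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Placeholder types: Node, Channels and openChannels() stand in for
// DiscoveryNode, NodeChannels and connectToChannels() in the real transport.
public class ConnectOncePattern {

    static class Node {
        final String id;
        Node(String id) { this.id = id; }
    }

    static class Channels implements Closeable {
        @Override public void close() { /* release the underlying sockets */ }
    }

    private final ConcurrentMap<String, Channels> connectedNodes = new ConcurrentHashMap<String, Channels>();

    public void connectToNode(Node node) throws IOException {
        // fast path: someone already connected us to this node
        if (connectedNodes.containsKey(node.id)) {
            return;
        }
        // optimistic path: open channels without holding a global lock,
        // so connects to different nodes proceed concurrently
        Channels channels = openChannels(node);
        Channels existing = connectedNodes.putIfAbsent(node.id, channels);
        if (existing != null) {
            // another thread won the race for this node; drop our duplicate
            channels.close();
        }
    }

    private Channels openChannels(Node node) throws IOException {
        return new Channels(); // placeholder for the real socket setup
    }
}

The price of dropping the lock is that two threads racing on the same node may both open channels; the loser simply closes its duplicate, which is what the NettyTransport hunk below does with NodeChannels.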

ThreadPool.java

@@ -35,7 +35,14 @@ import org.elasticsearch.common.util.concurrent.MoreExecutors;
 import org.elasticsearch.common.util.concurrent.jsr166y.LinkedTransferQueue;
 
 import java.util.Map;
-import java.util.concurrent.*;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 
 import static org.elasticsearch.common.settings.ImmutableSettings.*;
 import static org.elasticsearch.common.unit.TimeValue.*;
@@ -72,7 +79,7 @@ public class ThreadPool extends AbstractComponent {
         Map<String, Settings> groupSettings = settings.getGroups("threadpool");
 
         Map<String, Executor> executors = Maps.newHashMap();
-        executors.put(Names.CACHED, build(Names.CACHED, "cached", groupSettings.get(Names.CACHED), settingsBuilder().put("keep_alive", "5m").build()));
+        executors.put(Names.CACHED, build(Names.CACHED, "cached", groupSettings.get(Names.CACHED), settingsBuilder().put("keep_alive", "30s").build()));
         executors.put(Names.INDEX, build(Names.INDEX, "cached", groupSettings.get(Names.INDEX), ImmutableSettings.Builder.EMPTY_SETTINGS));
         executors.put(Names.SEARCH, build(Names.SEARCH, "cached", groupSettings.get(Names.SEARCH), ImmutableSettings.Builder.EMPTY_SETTINGS));
         executors.put(Names.PERCOLATE, build(Names.PERCOLATE, "cached", groupSettings.get(Names.PERCOLATE), ImmutableSettings.Builder.EMPTY_SETTINGS));
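For the ThreadPool hunk above: build() receives the per-pool group from settings.getGroups("threadpool") together with a defaults object, so the new "30s" value is the built-in default keep-alive for the cached pool rather than a fixed limit. Below is a rough sketch of how such a grouped setting would reach the constructor; the setting key threadpool.cached.keep_alive is inferred from the "threadpool" group prefix and the "cached" pool name, and the class name ThreadPoolSettingsExample is illustrative, neither is taken from this commit.

import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;

import java.util.Map;

public class ThreadPoolSettingsExample {
    public static void main(String[] args) {
        // node-level settings a user might supply, e.g. in the node configuration
        Settings settings = ImmutableSettings.settingsBuilder()
                .put("threadpool.cached.keep_alive", "1m")
                .build();

        // the same lookup the ThreadPool constructor performs
        Map<String, Settings> groups = settings.getGroups("threadpool");
        Settings cached = groups.get("cached");

        // falls back to the built-in default ("30s" after this commit) when absent
        String keepAlive = cached == null ? "30s" : cached.get("keep_alive", "30s");
        System.out.println("cached keep_alive = " + keepAlive);
    }
}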

NettyTransport.java

@@ -34,7 +34,14 @@ import org.elasticsearch.common.netty.bootstrap.ClientBootstrap;
 import org.elasticsearch.common.netty.bootstrap.ServerBootstrap;
 import org.elasticsearch.common.netty.buffer.ChannelBuffer;
 import org.elasticsearch.common.netty.buffer.ChannelBuffers;
-import org.elasticsearch.common.netty.channel.*;
+import org.elasticsearch.common.netty.channel.Channel;
+import org.elasticsearch.common.netty.channel.ChannelFuture;
+import org.elasticsearch.common.netty.channel.ChannelFutureListener;
+import org.elasticsearch.common.netty.channel.ChannelHandlerContext;
+import org.elasticsearch.common.netty.channel.ChannelPipeline;
+import org.elasticsearch.common.netty.channel.ChannelPipelineFactory;
+import org.elasticsearch.common.netty.channel.Channels;
+import org.elasticsearch.common.netty.channel.ExceptionEvent;
 import org.elasticsearch.common.netty.channel.socket.nio.NioClientSocketChannelFactory;
 import org.elasticsearch.common.netty.channel.socket.nio.NioServerSocketChannelFactory;
 import org.elasticsearch.common.netty.channel.socket.oio.OioClientSocketChannelFactory;
@@ -51,14 +58,24 @@ import org.elasticsearch.common.transport.TransportAddress;
 import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.threadpool.ThreadPool;
-import org.elasticsearch.transport.*;
+import org.elasticsearch.transport.BindTransportException;
+import org.elasticsearch.transport.ConnectTransportException;
+import org.elasticsearch.transport.NodeNotConnectedException;
+import org.elasticsearch.transport.Transport;
+import org.elasticsearch.transport.TransportException;
+import org.elasticsearch.transport.TransportRequestOptions;
+import org.elasticsearch.transport.TransportServiceAdapter;
 import org.elasticsearch.transport.support.TransportStreams;
 
 import java.io.IOException;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executors;
@@ -463,28 +480,26 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
             if (nodeChannels != null) {
                 return;
             }
-            synchronized (this) {
-                // recheck here, within the sync block (we cache connections, so we don't care about this single sync block)
-                nodeChannels = connectedNodes.get(node);
-                if (nodeChannels != null) {
-                    return;
-                }
-                nodeChannels = new NodeChannels(new Channel[connectionsPerNodeLow], new Channel[connectionsPerNodeMed], new Channel[connectionsPerNodeHigh]);
-                try {
-                    connectToChannels(nodeChannels, node);
-                } catch (Exception e) {
-                    nodeChannels.close();
-                    throw e;
-                }
-                connectedNodes.put(node, nodeChannels);
+            nodeChannels = new NodeChannels(new Channel[connectionsPerNodeLow], new Channel[connectionsPerNodeMed], new Channel[connectionsPerNodeHigh]);
+            try {
+                connectToChannels(nodeChannels, node);
+            } catch (Exception e) {
+                nodeChannels.close();
+                throw e;
+            }
+            NodeChannels existing = connectedNodes.putIfAbsent(node, nodeChannels);
+            if (existing != null) {
+                // we are already connected to a node, close this ones
+                nodeChannels.close();
+            } else {
                 if (logger.isDebugEnabled()) {
                     logger.debug("Connected to node [{}]", node);
                 }
+                transportServiceAdapter.raiseNodeConnected(node);
             }
-            transportServiceAdapter.raiseNodeConnected(node);
         } catch (ConnectTransportException e) {
             throw e;
         } catch (Exception e) {
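The rewritten connect path leans on the ConcurrentMap.putIfAbsent contract: for a given node exactly one caller gets null back (and goes on to log and raise the node-connected notification), while any racing caller gets the already-registered NodeChannels and simply closes its own. A tiny stand-alone check of that contract, with plain strings standing in for NodeChannels:

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class PutIfAbsentDemo {
    public static void main(String[] args) {
        ConcurrentMap<String, String> connected = new ConcurrentHashMap<String, String>();

        // the first registration wins and sees null
        System.out.println(connected.putIfAbsent("node-1", "channels-A")); // null

        // a later (or racing) registration loses: the map is left untouched and
        // the existing value comes back, telling the caller to close its duplicate
        System.out.println(connected.putIfAbsent("node-1", "channels-B")); // channels-A
        System.out.println(connected.get("node-1"));                       // channels-A
    }
}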