From 36ff6c9b8b80de1202701454f3e4f7f3ca3915b4 Mon Sep 17 00:00:00 2001 From: Shay Banon Date: Fri, 24 Aug 2012 02:23:43 +0200 Subject: [PATCH] add a lock not allowing connecting to nodes while shutting down --- .../transport/netty/NettyTransport.java | 90 +++++++++++-------- 1 file changed, 54 insertions(+), 36 deletions(-) diff --git a/src/main/java/org/elasticsearch/transport/netty/NettyTransport.java b/src/main/java/org/elasticsearch/transport/netty/NettyTransport.java index 7f5f185a2ef..b7a107bf7ad 100644 --- a/src/main/java/org/elasticsearch/transport/netty/NettyTransport.java +++ b/src/main/java/org/elasticsearch/transport/netty/NettyTransport.java @@ -65,6 +65,8 @@ import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; import static org.elasticsearch.common.network.NetworkService.TcpSettings.*; import static org.elasticsearch.common.settings.ImmutableSettings.Builder.EMPTY_SETTINGS; @@ -138,6 +140,9 @@ public class NettyTransport extends AbstractLifecycleComponent implem private volatile BoundTransportAddress boundAddress; private final Object[] connectMutex; + // this lock is here to make sure we close this transport and disconnect all the client nodes + // connections while no connect operation is going on... (this might help with 100% CPU when stopping the transport?) 
+ private final ReadWriteLock globalLock = new ReentrantReadWriteLock(); public NettyTransport(ThreadPool threadPool) { this(EMPTY_SETTINGS, threadPool, new NetworkService(EMPTY_SETTINGS)); @@ -367,6 +372,7 @@ public class NettyTransport extends AbstractLifecycleComponent implem threadPool.generic().execute(new Runnable() { @Override public void run() { + globalLock.writeLock().lock(); try { for (Iterator it = connectedNodes.values().iterator(); it.hasNext(); ) { NodeChannels nodeChannels = it.next(); @@ -403,6 +409,7 @@ public class NettyTransport extends AbstractLifecycleComponent implem clientBootstrap = null; } } finally { + globalLock.writeLock().unlock(); latch.countDown(); } } @@ -535,46 +542,57 @@ public class NettyTransport extends AbstractLifecycleComponent implem public void connectToNode(DiscoveryNode node, boolean light) { if (!lifecycle.started()) { - throw new ElasticSearchIllegalStateException("Can't add nodes to a stopped transport"); + throw new ElasticSearchIllegalStateException("can't add nodes to a stopped transport"); } if (node == null) { - throw new ConnectTransportException(null, "Can't connect to a null node"); + throw new ConnectTransportException(null, "can't connect to a null node"); } - synchronized (connectLock(node.id())) { - try { - NodeChannels nodeChannels = connectedNodes.get(node); - if (nodeChannels != null) { - return; - } - - if (light) { - nodeChannels = connectToChannelsLight(node); - } else { - nodeChannels = new NodeChannels(new Channel[connectionsPerNodeLow], new Channel[connectionsPerNodeMed], new Channel[connectionsPerNodeHigh]); - try { - connectToChannels(nodeChannels, node); - } catch (Exception e) { - nodeChannels.close(); - throw e; - } - } - - NodeChannels existing = connectedNodes.putIfAbsent(node, nodeChannels); - if (existing != null) { - // we are already connected to a node, close this ones - nodeChannels.close(); - } else { - if (logger.isDebugEnabled()) { - logger.debug("connected to node [{}]", node); - } 
- transportServiceAdapter.raiseNodeConnected(node); - } - - } catch (ConnectTransportException e) { - throw e; - } catch (Exception e) { - throw new ConnectTransportException(node, "General node connection failure", e); + globalLock.readLock().lock(); + try { + if (!lifecycle.started()) { + throw new ElasticSearchIllegalStateException("can't add nodes to a stopped transport"); } + synchronized (connectLock(node.id())) { + if (!lifecycle.started()) { + throw new ElasticSearchIllegalStateException("can't add nodes to a stopped transport"); + } + try { + NodeChannels nodeChannels = connectedNodes.get(node); + if (nodeChannels != null) { + return; + } + + if (light) { + nodeChannels = connectToChannelsLight(node); + } else { + nodeChannels = new NodeChannels(new Channel[connectionsPerNodeLow], new Channel[connectionsPerNodeMed], new Channel[connectionsPerNodeHigh]); + try { + connectToChannels(nodeChannels, node); + } catch (Exception e) { + nodeChannels.close(); + throw e; + } + } + + NodeChannels existing = connectedNodes.putIfAbsent(node, nodeChannels); + if (existing != null) { + // we are already connected to a node, close this ones + nodeChannels.close(); + } else { + if (logger.isDebugEnabled()) { + logger.debug("connected to node [{}]", node); + } + transportServiceAdapter.raiseNodeConnected(node); + } + + } catch (ConnectTransportException e) { + throw e; + } catch (Exception e) { + throw new ConnectTransportException(node, "General node connection failure", e); + } + } + } finally { + globalLock.readLock().unlock(); } }