diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPing.java b/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPing.java index 27d292f8007..b3ed0a9ba05 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPing.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPing.java @@ -25,7 +25,6 @@ import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.common.collect.Lists; -import org.elasticsearch.common.collect.Sets; import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; @@ -33,6 +32,9 @@ import org.elasticsearch.common.io.stream.Streamable; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.concurrent.ConcurrentCollections; +import org.elasticsearch.common.util.concurrent.DynamicExecutors; +import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.util.concurrent.jsr166y.LinkedTransferQueue; import org.elasticsearch.discovery.zen.DiscoveryNodesProvider; import org.elasticsearch.discovery.zen.ping.ZenPing; @@ -55,8 +57,10 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; @@ -79,6 +83,7 @@ public class UnicastZenPing extends AbstractLifecycleComponent implemen private final ClusterName clusterName; + private final int concurrentConnects; private final DiscoveryNode[] nodes; @@ -103,13 +108,14 @@ public class UnicastZenPing extends AbstractLifecycleComponent implemen this.transportService = transportService; this.clusterName = clusterName; + this.concurrentConnects = componentSettings.getAsInt("concurrent_connects", 10); String[] hostArr = componentSettings.getAsArray("hosts"); // trim the hosts for (int i = 0; i < hostArr.length; i++) { hostArr[i] = hostArr[i].trim(); } List hosts = Lists.newArrayList(hostArr); - logger.debug("using initial hosts {}", hosts); + logger.debug("using initial hosts {}, with concurrent_connects [{}]", hosts, concurrentConnects); List nodes = Lists.newArrayList(); int idCounter = 0; @@ -169,27 +175,61 @@ public class UnicastZenPing extends AbstractLifecycleComponent implemen } @Override public void ping(final PingListener listener, final TimeValue timeout) throws ElasticSearchException { - final AtomicBoolean done = new AtomicBoolean(); - final int id = pingIdGenerator.incrementAndGet(); - receivedResponses.put(id, new ConcurrentHashMap()); - final Set nodesToDisconnect1 = sendPings(id, timeout, false, done); + final SendPingsHandler sendPingsHandler = new SendPingsHandler(pingIdGenerator.incrementAndGet()); + receivedResponses.put(sendPingsHandler.id(), new ConcurrentHashMap()); + sendPings(timeout, false, sendPingsHandler); threadPool.schedule(timeout, ThreadPool.Names.CACHED, new Runnable() { @Override public void run() { - final Set nodesToDisconnect = Sets.newHashSet(nodesToDisconnect1); - nodesToDisconnect.addAll(sendPings(id, timeout, true, done)); - done.set(true); - for (DiscoveryNode node : nodesToDisconnect) { + sendPings(timeout, true, sendPingsHandler); + for (DiscoveryNode node : sendPingsHandler.nodeToDisconnect) { transportService.disconnectFromNode(node); } - ConcurrentMap responses = receivedResponses.remove(id); + ConcurrentMap responses = receivedResponses.remove(sendPingsHandler.id()); + sendPingsHandler.close(); listener.onPing(responses.values().toArray(new PingResponse[responses.size()])); } }); } - Set sendPings(final int id, final TimeValue timeout, boolean wait, final AtomicBoolean done) { + class SendPingsHandler { + private final int id; + private volatile ExecutorService executor; + private final Set nodeToDisconnect = ConcurrentCollections.newConcurrentSet(); + private volatile boolean closed; + + SendPingsHandler(int id) { + this.id = id; + } + + public int id() { + return this.id; + } + + public boolean isClosed() { + return this.closed; + } + + public Executor executor() { + if (executor == null) { + ThreadFactory threadFactory = EsExecutors.daemonThreadFactory(settings, "[unicast_connect]"); + executor = DynamicExecutors.newScalingThreadPool(1, concurrentConnects, 60000, threadFactory); + } + return executor; + } + + public void close() { + closed = true; + if (executor != null) { + executor.shutdownNow(); + executor = null; + } + nodeToDisconnect.clear(); + } + } + + void sendPings(final TimeValue timeout, boolean wait, final SendPingsHandler sendPingsHandler) { final UnicastPingRequest pingRequest = new UnicastPingRequest(); - pingRequest.id = id; + pingRequest.id = sendPingsHandler.id(); pingRequest.timeout = timeout; DiscoveryNodes discoNodes = nodesProvider.nodes(); pingRequest.pingResponse = new PingResponse(discoNodes.localNode(), discoNodes.masterNode(), clusterName); @@ -199,30 +239,28 @@ public class UnicastZenPing extends AbstractLifecycleComponent implemen nodesToPing.addAll(provider.buildDynamicNodes()); } - Set nodesToDisconnect = Sets.newHashSet(); - final CountDownLatch latch = new CountDownLatch(nodesToPing.size()); for (final DiscoveryNode node : nodesToPing) { // make sure we are connected boolean nodeFoundByAddressX; DiscoveryNode nodeToSendX = discoNodes.findByAddress(node.address()); if (nodeToSendX != null) { - nodeFoundByAddressX = false; + nodeFoundByAddressX = true; } else { nodeToSendX = node; - nodeFoundByAddressX = true; + nodeFoundByAddressX = false; } final DiscoveryNode nodeToSend = nodeToSendX; final boolean nodeFoundByAddress = nodeFoundByAddressX; if (!transportService.nodeConnected(nodeToSend)) { - nodesToDisconnect.add(nodeToSend); + if (sendPingsHandler.isClosed()) { + return; + } + sendPingsHandler.nodeToDisconnect.add(nodeToSend); // fork the connection to another thread - threadPool.executor(ThreadPool.Names.MANAGEMENT).execute(new Runnable() { + sendPingsHandler.executor().execute(new Runnable() { @Override public void run() { - if (done.get()) { - return; - } try { // connect to the node, see if we manage to do it, if not, bail if (!nodeFoundByAddress) { @@ -231,16 +269,16 @@ public class UnicastZenPing extends AbstractLifecycleComponent implemen transportService.connectToNode(nodeToSend); } // we are connected, send the ping request - sendPingRequestToNode(id, timeout, pingRequest, latch, node, nodeToSend); + sendPingRequestToNode(sendPingsHandler.id(), timeout, pingRequest, latch, node, nodeToSend); } catch (ConnectTransportException e) { // can't connect to the node - logger.trace("[{}] failed to connect to {}", e, id, nodeToSend); + logger.trace("[{}] failed to connect to {}", e, sendPingsHandler.id(), nodeToSend); latch.countDown(); } } }); } else { - sendPingRequestToNode(id, timeout, pingRequest, latch, node, nodeToSend); + sendPingRequestToNode(sendPingsHandler.id(), timeout, pingRequest, latch, node, nodeToSend); } } if (wait) { @@ -250,8 +288,6 @@ public class UnicastZenPing extends AbstractLifecycleComponent implemen // ignore } } - - return nodesToDisconnect; } private void sendPingRequestToNode(final int id, TimeValue timeout, UnicastPingRequest pingRequest, final CountDownLatch latch, final DiscoveryNode node, final DiscoveryNode nodeToSend) {