diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/client/transport/TransportClientNodesService.java b/modules/elasticsearch/src/main/java/org/elasticsearch/client/transport/TransportClientNodesService.java index a5f308b60e5..7cdfc2d7810 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/client/transport/TransportClientNodesService.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/client/transport/TransportClientNodesService.java @@ -29,6 +29,7 @@ import org.elasticsearch.client.Requests; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.collect.ImmutableList; +import org.elasticsearch.common.collect.Maps; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; @@ -43,6 +44,7 @@ import org.elasticsearch.transport.TransportService; import java.util.HashSet; import java.util.Iterator; +import java.util.Map; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ScheduledFuture; @@ -288,11 +290,21 @@ public class TransportClientNodesService extends AbstractComponent { if (closed) { return; } - ImmutableList listedNodes = TransportClientNodesService.this.listedNodes; - final CountDownLatch latch = new CountDownLatch(listedNodes.size()); + + // the nodes we are going to ping include the core listed nodes that were added + // and the last round of discovered nodes + Map nodesToPing = Maps.newHashMap(); + for (DiscoveryNode node : listedNodes) { + nodesToPing.put(node.address(), node); + } + for (DiscoveryNode node : nodes) { + nodesToPing.put(node.address(), node); + } + + final CountDownLatch latch = new CountDownLatch(nodesToPing.size()); final CopyOnWriteArrayList nodesInfoResponses = new CopyOnWriteArrayList(); - for (final DiscoveryNode listedNode : listedNodes) { - threadPool.cached().execute(new Runnable() { + for (final DiscoveryNode listedNode : nodesToPing.values()) { + threadPool.executor(ThreadPool.Names.MANAGEMENT).execute(new Runnable() { @Override public void run() { try { transportService.connectToNode(listedNode); // make sure we are connected to it