diff --git a/core/src/main/java/org/elasticsearch/cluster/NodeConnectionsService.java b/core/src/main/java/org/elasticsearch/cluster/NodeConnectionsService.java index 94333c10dde..bda1481130c 100644 --- a/core/src/main/java/org/elasticsearch/cluster/NodeConnectionsService.java +++ b/core/src/main/java/org/elasticsearch/cluster/NodeConnectionsService.java @@ -36,7 +36,9 @@ import org.elasticsearch.discovery.zen.NodesFaultDetection; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; +import java.util.HashSet; import java.util.List; +import java.util.Set; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ScheduledFuture; @@ -76,20 +78,26 @@ public class NodeConnectionsService extends AbstractLifecycleComponent { this.reconnectInterval = NodeConnectionsService.CLUSTER_NODE_RECONNECT_INTERVAL_SETTING.get(settings); } - public void connectToNodes(List addedNodes) { + public void connectToNodes(Iterable discoveryNodes) { // TODO: do this in parallel (and wait) - for (final DiscoveryNode node : addedNodes) { + for (final DiscoveryNode node : discoveryNodes) { try (Releasable ignored = nodeLocks.acquire(node)) { - Integer current = nodes.put(node, 0); - assert current == null : "node " + node + " was added in event but already in internal nodes"; + nodes.putIfAbsent(node, 0); validateNodeConnected(node); } } } - public void disconnectFromNodes(List removedNodes) { - for (final DiscoveryNode node : removedNodes) { + /** + * Disconnects from all nodes except the ones provided as parameter + */ + public void disconnectFromNodesExcept(Iterable nodesToKeep) { + Set currentNodes = new HashSet<>(nodes.keySet()); + for (DiscoveryNode node : nodesToKeep) { + currentNodes.remove(node); + } + for (final DiscoveryNode node : currentNodes) { try (Releasable ignored = nodeLocks.acquire(node)) { Integer current = nodes.remove(node); assert current != null : "node " + node + " was removed in event but not in internal nodes"; diff --git a/core/src/main/java/org/elasticsearch/cluster/service/ClusterService.java b/core/src/main/java/org/elasticsearch/cluster/service/ClusterService.java index d4fff64530e..378fa924627 100644 --- a/core/src/main/java/org/elasticsearch/cluster/service/ClusterService.java +++ b/core/src/main/java/org/elasticsearch/cluster/service/ClusterService.java @@ -772,7 +772,7 @@ public class ClusterService extends AbstractLifecycleComponent { taskOutputs.createAckListener(threadPool, newClusterState) : null; - nodeConnectionsService.connectToNodes(clusterChangedEvent.nodesDelta().addedNodes()); + nodeConnectionsService.connectToNodes(newClusterState.nodes()); // if we are the master, publish the new state to all nodes // we publish here before we send a notification to all the listeners, since if it fails @@ -788,7 +788,8 @@ public class ClusterService extends AbstractLifecycleComponent { "failing [{}]: failed to commit cluster state version [{}]", taskInputs.summary, version), t); // ensure that list of connected nodes in NodeConnectionsService is in-sync with the nodes of the current cluster state - nodeConnectionsService.disconnectFromNodes(clusterChangedEvent.nodesDelta().addedNodes()); + nodeConnectionsService.connectToNodes(previousClusterState.nodes()); + nodeConnectionsService.disconnectFromNodesExcept(previousClusterState.nodes()); taskOutputs.publishingFailed(t); return; } @@ -808,7 +809,7 @@ public class ClusterService extends AbstractLifecycleComponent { logger.debug("set local cluster state to version {}", newClusterState.version()); callClusterStateAppliers(newClusterState, clusterChangedEvent); - nodeConnectionsService.disconnectFromNodes(clusterChangedEvent.nodesDelta().removedNodes()); + nodeConnectionsService.disconnectFromNodesExcept(newClusterState.nodes()); updateState(css -> newClusterState); diff --git a/core/src/test/java/org/elasticsearch/cluster/MinimumMasterNodesIT.java b/core/src/test/java/org/elasticsearch/cluster/MinimumMasterNodesIT.java index ee6d719f55b..d63203eda25 100644 --- a/core/src/test/java/org/elasticsearch/cluster/MinimumMasterNodesIT.java +++ b/core/src/test/java/org/elasticsearch/cluster/MinimumMasterNodesIT.java @@ -375,7 +375,7 @@ public class MinimumMasterNodesIT extends ESIntegTestCase { otherNodes.remove(master); NetworkDisruption partition = new NetworkDisruption( new TwoPartitions(Collections.singleton(master), otherNodes), - new NetworkDelay(TimeValue.timeValueMinutes(1))); + new NetworkDisruption.NetworkDisconnect()); internalCluster().setDisruptionScheme(partition); final CountDownLatch latch = new CountDownLatch(1); diff --git a/core/src/test/java/org/elasticsearch/cluster/NodeConnectionsServiceTests.java b/core/src/test/java/org/elasticsearch/cluster/NodeConnectionsServiceTests.java index 2b8333700a3..1d7a65e3224 100644 --- a/core/src/test/java/org/elasticsearch/cluster/NodeConnectionsServiceTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/NodeConnectionsServiceTests.java @@ -87,19 +87,19 @@ public class NodeConnectionsServiceTests extends ESTestCase { ClusterState current = clusterStateFromNodes(Collections.emptyList()); ClusterChangedEvent event = new ClusterChangedEvent("test", clusterStateFromNodes(randomSubsetOf(nodes)), current); - service.connectToNodes(event.nodesDelta().addedNodes()); - assertConnected(event.nodesDelta().addedNodes()); + service.connectToNodes(event.state().nodes()); + assertConnected(event.state().nodes()); - service.disconnectFromNodes(event.nodesDelta().removedNodes()); + service.disconnectFromNodesExcept(event.state().nodes()); assertConnectedExactlyToNodes(event.state()); current = event.state(); event = new ClusterChangedEvent("test", clusterStateFromNodes(randomSubsetOf(nodes)), current); - service.connectToNodes(event.nodesDelta().addedNodes()); - assertConnected(event.nodesDelta().addedNodes()); + service.connectToNodes(event.state().nodes()); + assertConnected(event.state().nodes()); - service.disconnectFromNodes(event.nodesDelta().removedNodes()); + service.disconnectFromNodesExcept(event.state().nodes()); assertConnectedExactlyToNodes(event.state()); } diff --git a/core/src/test/java/org/elasticsearch/cluster/service/ClusterServiceTests.java b/core/src/test/java/org/elasticsearch/cluster/service/ClusterServiceTests.java index cc76fdf9dc7..e2162e6f90d 100644 --- a/core/src/test/java/org/elasticsearch/cluster/service/ClusterServiceTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/service/ClusterServiceTests.java @@ -18,6 +18,7 @@ */ package org.elasticsearch.cluster.service; +import com.google.common.collect.Iterables; import org.apache.logging.log4j.Level; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; @@ -25,12 +26,12 @@ import org.apache.logging.log4j.util.Supplier; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.Version; import org.elasticsearch.cluster.ClusterChangedEvent; -import org.elasticsearch.cluster.LocalClusterUpdateTask; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateTaskConfig; import org.elasticsearch.cluster.ClusterStateTaskExecutor; import org.elasticsearch.cluster.ClusterStateTaskListener; import org.elasticsearch.cluster.ClusterStateUpdateTask; +import org.elasticsearch.cluster.LocalClusterUpdateTask; import org.elasticsearch.cluster.LocalNodeMasterListener; import org.elasticsearch.cluster.NodeConnectionsService; import org.elasticsearch.cluster.block.ClusterBlocks; @@ -127,12 +128,12 @@ public class ClusterServiceTests extends ESTestCase { emptySet(), Version.CURRENT)); timedClusterService.setNodeConnectionsService(new NodeConnectionsService(Settings.EMPTY, null, null) { @Override - public void connectToNodes(List addedNodes) { + public void connectToNodes(Iterable discoveryNodes) { // skip } @Override - public void disconnectFromNodes(List removedNodes) { + public void disconnectFromNodesExcept(Iterable nodesToKeep) { // skip } }); @@ -1058,17 +1059,16 @@ public class ClusterServiceTests extends ESTestCase { threadPool); timedClusterService.setLocalNode(new DiscoveryNode("node1", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT)); - Set currentNodes = Collections.synchronizedSet(new HashSet<>()); - currentNodes.add(timedClusterService.localNode()); + Set currentNodes = new HashSet<>(); timedClusterService.setNodeConnectionsService(new NodeConnectionsService(Settings.EMPTY, null, null) { @Override - public void connectToNodes(List addedNodes) { - currentNodes.addAll(addedNodes); + public void connectToNodes(Iterable discoveryNodes) { + discoveryNodes.forEach(currentNodes::add); } @Override - public void disconnectFromNodes(List removedNodes) { - currentNodes.removeAll(removedNodes); + public void disconnectFromNodesExcept(Iterable nodesToKeep) { + currentNodes.removeIf(node -> Iterables.contains(nodesToKeep, node) == false); } }); AtomicBoolean failToCommit = new AtomicBoolean(); diff --git a/test/framework/src/main/java/org/elasticsearch/test/ClusterServiceUtils.java b/test/framework/src/main/java/org/elasticsearch/test/ClusterServiceUtils.java index 933fd83ad5c..c4c1a45d6c3 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/ClusterServiceUtils.java +++ b/test/framework/src/main/java/org/elasticsearch/test/ClusterServiceUtils.java @@ -54,12 +54,12 @@ public class ClusterServiceUtils { clusterService.setLocalNode(localNode); clusterService.setNodeConnectionsService(new NodeConnectionsService(Settings.EMPTY, null, null) { @Override - public void connectToNodes(List addedNodes) { + public void connectToNodes(Iterable discoveryNodes) { // skip } @Override - public void disconnectFromNodes(List removedNodes) { + public void disconnectFromNodesExcept(Iterable nodesToKeep) { // skip } }); diff --git a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java index 38c4f6003be..5dbfe93d0d3 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java +++ b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java @@ -132,6 +132,7 @@ import static org.apache.lucene.util.LuceneTestCase.TEST_NIGHTLY; import static org.apache.lucene.util.LuceneTestCase.rarely; import static org.elasticsearch.discovery.zen.ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING; import static org.elasticsearch.test.ESTestCase.assertBusy; +import static org.elasticsearch.test.ESTestCase.awaitBusy; import static org.elasticsearch.test.ESTestCase.randomFrom; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.hamcrest.Matchers.equalTo; @@ -1052,21 +1053,38 @@ public final class InternalTestCluster extends TestCluster { logger.debug("Cluster is consistent again - nodes: [{}] nextNodeId: [{}] numSharedNodes: [{}]", nodes.keySet(), nextNodeId.get(), newSize); } - /** ensure a cluster is form with {@link #nodes}.size() nodes. */ + /** ensure a cluster is formed with all published nodes. */ private void validateClusterFormed() { String name = randomFrom(random, getNodeNames()); validateClusterFormed(name); } - /** ensure a cluster is form with {@link #nodes}.size() nodes, but do so by using the client of the specified node */ + /** ensure a cluster is formed with all published nodes, but do so by using the client of the specified node */ private void validateClusterFormed(String viaNode) { - final int size = nodes.size(); - logger.trace("validating cluster formed via [{}], expecting [{}]", viaNode, size); + Set expectedNodes = new HashSet<>(); + for (NodeAndClient nodeAndClient : nodes.values()) { + expectedNodes.add(getInstanceFromNode(ClusterService.class, nodeAndClient.node()).localNode()); + } + logger.trace("validating cluster formed via [{}], expecting {}", viaNode, expectedNodes); final Client client = client(viaNode); - ClusterHealthResponse response = client.admin().cluster().prepareHealth().setWaitForNodes(Integer.toString(size)).get(); - if (response.isTimedOut()) { - logger.warn("failed to wait for a cluster of size [{}], got [{}]", size, response); - throw new IllegalStateException("cluster failed to reach the expected size of [" + size + "]"); + try { + if (awaitBusy(() -> { + DiscoveryNodes discoveryNodes = client.admin().cluster().prepareState().get().getState().nodes(); + if (discoveryNodes.getSize() != expectedNodes.size()) { + return false; + } + for (DiscoveryNode expectedNode : expectedNodes) { + if (discoveryNodes.nodeExists(expectedNode) == false) { + return false; + } + } + return true; + }, 30, TimeUnit.SECONDS) == false) { + throw new IllegalStateException("cluster failed to from with expected nodes " + expectedNodes + " and actual nodes " + + client.admin().cluster().prepareState().get().getState().nodes()); + } + } catch (InterruptedException e) { + throw new IllegalStateException(e); } }