diff --git a/core/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java b/core/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java index c4fc4f15f40..f419da06e68 100644 --- a/core/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java +++ b/core/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java @@ -318,7 +318,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover if (!clusterChangedEvent.state().getNodes().isLocalNodeElectedMaster()) { throw new IllegalStateException("Shouldn't publish state when not master"); } - nodesFD.updateNodesAndPing(clusterChangedEvent.state()); + try { publishClusterState.publish(clusterChangedEvent, electMaster.minimumMasterNodes(), ackListener); } catch (FailedToCommitClusterStateException t) { @@ -338,6 +338,17 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover }); throw t; } + + // update the set of nodes to ping after the new cluster state has been published + nodesFD.updateNodesAndPing(clusterChangedEvent.state()); + } + + /** + * Gets the current set of nodes involved in the node fault detection. + * NB: for testing purposes + */ + public Set getFaultDetectionNodes() { + return nodesFD.getNodes(); } @Override diff --git a/core/src/main/java/org/elasticsearch/discovery/zen/fd/NodesFaultDetection.java b/core/src/main/java/org/elasticsearch/discovery/zen/fd/NodesFaultDetection.java index 40eb36cec1f..0ab5bde25cd 100644 --- a/core/src/main/java/org/elasticsearch/discovery/zen/fd/NodesFaultDetection.java +++ b/core/src/main/java/org/elasticsearch/discovery/zen/fd/NodesFaultDetection.java @@ -41,6 +41,8 @@ import org.elasticsearch.transport.TransportResponseHandler; import org.elasticsearch.transport.TransportService; import java.io.IOException; +import java.util.Collections; +import java.util.Set; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CopyOnWriteArrayList; @@ -91,6 +93,14 @@ public class NodesFaultDetection extends FaultDetection { listeners.remove(listener); } + /** + * Gets the current set of nodes involved in node fault detection. + * NB: For testing purposes. + */ + public Set getNodes() { + return Collections.unmodifiableSet(nodesFD.keySet()); + } + /** * make sure that nodes in clusterState are pinged. Any pinging to nodes which are not * part of the cluster will be stopped diff --git a/core/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryUnitTests.java b/core/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryUnitTests.java index ba4c14c2058..235df2d8a35 100644 --- a/core/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryUnitTests.java +++ b/core/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryUnitTests.java @@ -20,25 +20,44 @@ package org.elasticsearch.discovery.zen; import org.elasticsearch.Version; +import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNode.Role; import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.settings.ClusterSettings; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.LocalTransportAddress; +import org.elasticsearch.discovery.Discovery; +import org.elasticsearch.discovery.zen.elect.ElectMasterService; import org.elasticsearch.discovery.zen.ping.ZenPing; +import org.elasticsearch.discovery.zen.ping.ZenPingService; +import org.elasticsearch.discovery.zen.publish.PublishClusterStateActionTests.AssertingAckListener; +import org.elasticsearch.discovery.zen.publish.PublishClusterStateActionTests.MockNode; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.TestThreadPool; +import org.elasticsearch.threadpool.ThreadPool; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Set; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import static java.util.Collections.emptyMap; import static java.util.Collections.emptySet; import static org.elasticsearch.discovery.zen.ZenDiscovery.shouldIgnoreOrRejectNewClusterState; +import static org.elasticsearch.discovery.zen.elect.ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING; +import static org.elasticsearch.discovery.zen.publish.PublishClusterStateActionTests.createMockNode; +import static org.elasticsearch.test.ClusterServiceUtils.createClusterService; +import static org.elasticsearch.test.ClusterServiceUtils.setState; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; @@ -107,7 +126,7 @@ public class ZenDiscoveryUnitTests extends ESTestCase { ArrayList masterNodes = new ArrayList<>(); ArrayList allNodes = new ArrayList<>(); for (int i = randomIntBetween(10, 20); i >= 0; i--) { - Set roles = new HashSet<>(randomSubsetOf(Arrays.asList(DiscoveryNode.Role.values()))); + Set roles = new HashSet<>(randomSubsetOf(Arrays.asList(Role.values()))); DiscoveryNode node = new DiscoveryNode("node_" + i, "id_" + i, LocalTransportAddress.buildUnique(), Collections.emptyMap(), roles, Version.CURRENT); responses.add(new ZenPing.PingResponse(node, randomBoolean() ? null : node, new ClusterName("test"), randomBoolean())); @@ -127,4 +146,80 @@ public class ZenDiscoveryUnitTests extends ESTestCase { assertThat(filteredNodes, equalTo(allNodes)); } } + + public void testNodesUpdatedAfterClusterStatePublished() throws Exception { + ThreadPool threadPool = new TestThreadPool(getClass().getName()); + // randomly make minimum_master_nodes a value higher than we have nodes for, so it will force failure + int minMasterNodes = randomBoolean() ? 3 : 1; + Settings settings = Settings.builder() + .put(DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.getKey(), Integer.toString(minMasterNodes)).build(); + + Map nodes = new HashMap<>(); + ZenDiscovery zenDiscovery = null; + ClusterService clusterService = null; + try { + Set expectedFDNodes = null; + // create master node and its mocked up services + MockNode master = createMockNode("master", settings, null, threadPool, logger, nodes).setAsMaster(); + ClusterState state = master.clusterState; // initial cluster state + + // build the zen discovery and cluster service + clusterService = createClusterService(threadPool, master.discoveryNode); + setState(clusterService, state); + zenDiscovery = buildZenDiscovery(settings, master, clusterService, threadPool); + + // a new cluster state with a new discovery node (we will test if the cluster state + // was updated by the presence of this node in NodesFaultDetection) + MockNode newNode = createMockNode("new_node", settings, null, threadPool, logger, nodes); + ClusterState newState = ClusterState.builder(state).incrementVersion().nodes( + DiscoveryNodes.builder(state.nodes()).add(newNode.discoveryNode).masterNodeId(master.discoveryNode.getId()) + ).build(); + + try { + // publishing a new cluster state + ClusterChangedEvent clusterChangedEvent = new ClusterChangedEvent("testing", newState, state); + AssertingAckListener listener = new AssertingAckListener(newState.nodes().getSize() - 1); + expectedFDNodes = zenDiscovery.getFaultDetectionNodes(); + zenDiscovery.publish(clusterChangedEvent, listener); + listener.await(1, TimeUnit.HOURS); + // publish was a success, update expected FD nodes based on new cluster state + expectedFDNodes = fdNodesForState(newState, master.discoveryNode); + } catch (Discovery.FailedToCommitClusterStateException e) { + // not successful, so expectedFDNodes above should remain what it was originally assigned + assertEquals(3, minMasterNodes); // ensure min master nodes is the higher value, otherwise we shouldn't fail + } + + assertEquals(expectedFDNodes, zenDiscovery.getFaultDetectionNodes()); + } finally { + // clean close of transport service and publish action for each node + zenDiscovery.close(); + clusterService.close(); + for (MockNode curNode : nodes.values()) { + curNode.action.close(); + curNode.service.close(); + } + terminate(threadPool); + } + } + + private ZenDiscovery buildZenDiscovery(Settings settings, MockNode master, ClusterService clusterService, ThreadPool threadPool) { + ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + ZenPingService zenPingService = new ZenPingService(settings, Collections.emptySet()); + ElectMasterService electMasterService = new ElectMasterService(settings); + ZenDiscovery zenDiscovery = new ZenDiscovery(settings, threadPool, master.service, clusterService, + clusterSettings, zenPingService, electMasterService); + zenDiscovery.start(); + return zenDiscovery; + } + + private Set fdNodesForState(ClusterState clusterState, DiscoveryNode localNode) { + final Set discoveryNodes = new HashSet<>(); + clusterState.getNodes().getNodes().valuesIt().forEachRemaining(discoveryNode -> { + // the local node isn't part of the nodes that are pinged (don't ping ourselves) + if (discoveryNode.getId().equals(localNode.getId()) == false) { + discoveryNodes.add(discoveryNode); + } + }); + return discoveryNodes; + } } diff --git a/core/src/test/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateActionTests.java b/core/src/test/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateActionTests.java index 31c828ec30f..1b0d6f63fd5 100644 --- a/core/src/test/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateActionTests.java +++ b/core/src/test/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateActionTests.java @@ -145,21 +145,22 @@ public class PublishClusterStateActionTests extends ESTestCase { } public MockNode createMockNode(final String name) throws Exception { - return createMockNode(name, Settings.EMPTY); - } - - public MockNode createMockNode(String name, Settings settings) throws Exception { - return createMockNode(name, settings, null); + return createMockNode(name, Settings.EMPTY, null); } public MockNode createMockNode(String name, final Settings basSettings, @Nullable ClusterStateListener listener) throws Exception { + return createMockNode(name, basSettings, listener, threadPool, logger, nodes); + } + + public static MockNode createMockNode(String name, final Settings basSettings, @Nullable ClusterStateListener listener, + ThreadPool threadPool, Logger logger, Map nodes) throws Exception { final Settings settings = Settings.builder() .put("name", name) .put(TransportService.TRACE_LOG_INCLUDE_SETTING.getKey(), "", TransportService.TRACE_LOG_EXCLUDE_SETTING.getKey(), "NOTHING") .put(basSettings) .build(); - MockTransportService service = buildTransportService(settings); + MockTransportService service = buildTransportService(settings, threadPool); DiscoveryNode discoveryNode = DiscoveryNode.createLocal(settings, service.boundAddress().publishAddress(), NodeEnvironment.generateNodeId(settings)); MockNode node = new MockNode(discoveryNode, service, listener, logger); @@ -228,14 +229,14 @@ public class PublishClusterStateActionTests extends ESTestCase { terminate(threadPool); } - protected MockTransportService buildTransportService(Settings settings) { - MockTransportService transportService = MockTransportService.local(Settings.EMPTY, Version.CURRENT, threadPool); + private static MockTransportService buildTransportService(Settings settings, ThreadPool threadPool) { + MockTransportService transportService = MockTransportService.local(settings, Version.CURRENT, threadPool); transportService.start(); transportService.acceptIncomingRequests(); return transportService; } - protected MockPublishAction buildPublishClusterStateAction( + private static MockPublishAction buildPublishClusterStateAction( Settings settings, MockTransportService transportService, Supplier clusterStateSupplier, @@ -253,8 +254,8 @@ public class PublishClusterStateActionTests extends ESTestCase { } public void testSimpleClusterStatePublishing() throws Exception { - MockNode nodeA = createMockNode("nodeA", Settings.EMPTY).setAsMaster(); - MockNode nodeB = createMockNode("nodeB", Settings.EMPTY); + MockNode nodeA = createMockNode("nodeA").setAsMaster(); + MockNode nodeB = createMockNode("nodeB"); // Initial cluster state ClusterState clusterState = nodeA.clusterState; @@ -282,7 +283,7 @@ public class PublishClusterStateActionTests extends ESTestCase { // Adding new node - this node should get full cluster state while nodeB should still be getting diffs - MockNode nodeC = createMockNode("nodeC", Settings.EMPTY); + MockNode nodeC = createMockNode("nodeC"); // cluster state update 3 - register node C previousClusterState = clusterState; @@ -336,7 +337,7 @@ public class PublishClusterStateActionTests extends ESTestCase { fail("Shouldn't send cluster state to myself"); }).setAsMaster(); - MockNode nodeB = createMockNode("nodeB", Settings.EMPTY); + MockNode nodeB = createMockNode("nodeB"); // Initial cluster state with both states - the second node still shouldn't get diff even though it's present in the previous cluster state DiscoveryNodes discoveryNodes = DiscoveryNodes.builder(nodeA.nodes()).add(nodeB.discoveryNode).build(); @@ -444,7 +445,7 @@ public class PublishClusterStateActionTests extends ESTestCase { } }).setAsMaster(); - MockNode nodeB = createMockNode("nodeB", Settings.EMPTY); + MockNode nodeB = createMockNode("nodeB"); // Initial cluster state with both states - the second node still shouldn't get diff even though it's present in the previous cluster state DiscoveryNodes discoveryNodes = DiscoveryNodes.builder(nodeA.nodes()).add(nodeB.discoveryNode).build(); @@ -495,7 +496,7 @@ public class PublishClusterStateActionTests extends ESTestCase { final int dataNodes = randomIntBetween(0, 5); final Settings dataSettings = Settings.builder().put(Node.NODE_MASTER_SETTING.getKey(), false).build(); for (int i = 0; i < dataNodes; i++) { - discoveryNodesBuilder.add(createMockNode("data_" + i, dataSettings).discoveryNode); + discoveryNodesBuilder.add(createMockNode("data_" + i, dataSettings, null).discoveryNode); } discoveryNodesBuilder.localNodeId(master.discoveryNode.getId()).masterNodeId(master.discoveryNode.getId()); DiscoveryNodes discoveryNodes = discoveryNodesBuilder.build(); @@ -521,7 +522,7 @@ public class PublishClusterStateActionTests extends ESTestCase { settings.put(DiscoverySettings.COMMIT_TIMEOUT_SETTING.getKey(), expectingToCommit == false && timeOutNodes > 0 ? "100ms" : "1h") .put(DiscoverySettings.PUBLISH_TIMEOUT_SETTING.getKey(), "5ms"); // test is about committing - MockNode master = createMockNode("master", settings.build()); + MockNode master = createMockNode("master", settings.build(), null); // randomize things a bit int[] nodeTypes = new int[goodNodes + errorNodes + timeOutNodes]; @@ -551,7 +552,8 @@ public class PublishClusterStateActionTests extends ESTestCase { } final int dataNodes = randomIntBetween(0, 3); // data nodes don't matter for (int i = 0; i < dataNodes; i++) { - final MockNode mockNode = createMockNode("data_" + i, Settings.builder().put(Node.NODE_MASTER_SETTING.getKey(), false).build()); + final MockNode mockNode = createMockNode("data_" + i, + Settings.builder().put(Node.NODE_MASTER_SETTING.getKey(), false).build(), null); discoveryNodesBuilder.add(mockNode.discoveryNode); if (randomBoolean()) { // we really don't care - just chaos monkey @@ -726,8 +728,8 @@ public class PublishClusterStateActionTests extends ESTestCase { Settings settings = Settings.builder() .put(DiscoverySettings.COMMIT_TIMEOUT_SETTING.getKey(), "1ms").build(); // short but so we will sometime commit sometime timeout - MockNode master = createMockNode("master", settings); - MockNode node = createMockNode("node", settings); + MockNode master = createMockNode("master", settings, null); + MockNode node = createMockNode("node", settings, null); ClusterState state = ClusterState.builder(master.clusterState) .nodes(DiscoveryNodes.builder(master.clusterState.nodes()).add(node.discoveryNode).masterNodeId(master.discoveryNode.getId())).build(); @@ -843,7 +845,7 @@ public class PublishClusterStateActionTests extends ESTestCase { assertFalse(actual.wasReadFromDiff()); } - static class MockPublishAction extends PublishClusterStateAction { + public static class MockPublishAction extends PublishClusterStateAction { AtomicBoolean timeoutOnSend = new AtomicBoolean(); AtomicBoolean errorOnSend = new AtomicBoolean(); diff --git a/test/framework/src/main/java/org/elasticsearch/test/ClusterServiceUtils.java b/test/framework/src/main/java/org/elasticsearch/test/ClusterServiceUtils.java index a6d35930e6b..38682239b78 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/ClusterServiceUtils.java +++ b/test/framework/src/main/java/org/elasticsearch/test/ClusterServiceUtils.java @@ -42,11 +42,16 @@ import static junit.framework.TestCase.fail; public class ClusterServiceUtils { public static ClusterService createClusterService(ThreadPool threadPool) { + DiscoveryNode discoveryNode = new DiscoveryNode("node", LocalTransportAddress.buildUnique(), Collections.emptyMap(), + new HashSet<>(Arrays.asList(DiscoveryNode.Role.values())),Version.CURRENT); + return createClusterService(threadPool, discoveryNode); + } + + public static ClusterService createClusterService(ThreadPool threadPool, DiscoveryNode localNode) { ClusterService clusterService = new ClusterService(Settings.builder().put("cluster.name", "ClusterServiceTests").build(), new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), threadPool); - clusterService.setLocalNode(new DiscoveryNode("node", LocalTransportAddress.buildUnique(), Collections.emptyMap(), - new HashSet<>(Arrays.asList(DiscoveryNode.Role.values())),Version.CURRENT)); + clusterService.setLocalNode(localNode); clusterService.setNodeConnectionsService(new NodeConnectionsService(Settings.EMPTY, null, null) { @Override public void connectToAddedNodes(ClusterChangedEvent event) {