From f39f9b9760c20af0205d80b370888ec4376efbfd Mon Sep 17 00:00:00 2001 From: Ali Beyad Date: Mon, 12 Sep 2016 12:07:51 -0400 Subject: [PATCH] Update discovery nodes after cluster state is published (#20409) Before, when there was a new cluster state to publish, zen discovery would first update the set of nodes to ping based on the new cluster state, then publish the new cluster state. This is problematic because if the cluster state failed to publish, then the set of nodes to ping should not have been updated. This commit fixes the issue by updating the set of nodes to ping for fault detection only *after* the new cluster state has been published. --- .../discovery/zen/ZenDiscovery.java | 13 ++- .../discovery/zen/fd/NodesFaultDetection.java | 10 ++ .../discovery/zen/ZenDiscoveryUnitTests.java | 97 ++++++++++++++++++- .../PublishClusterStateActionTests.java | 42 ++++---- .../test/ClusterServiceUtils.java | 9 +- 5 files changed, 147 insertions(+), 24 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java b/core/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java index c4fc4f15f40..f419da06e68 100644 --- a/core/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java +++ b/core/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java @@ -318,7 +318,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover if (!clusterChangedEvent.state().getNodes().isLocalNodeElectedMaster()) { throw new IllegalStateException("Shouldn't publish state when not master"); } - nodesFD.updateNodesAndPing(clusterChangedEvent.state()); + try { publishClusterState.publish(clusterChangedEvent, electMaster.minimumMasterNodes(), ackListener); } catch (FailedToCommitClusterStateException t) { @@ -338,6 +338,17 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover }); throw t; } + + // update the set of nodes to ping after the new cluster state has been published + nodesFD.updateNodesAndPing(clusterChangedEvent.state()); + } + + /** + * Gets the current set of nodes involved in the node fault detection. + * NB: for testing purposes + */ + public Set getFaultDetectionNodes() { + return nodesFD.getNodes(); } @Override diff --git a/core/src/main/java/org/elasticsearch/discovery/zen/fd/NodesFaultDetection.java b/core/src/main/java/org/elasticsearch/discovery/zen/fd/NodesFaultDetection.java index 40eb36cec1f..0ab5bde25cd 100644 --- a/core/src/main/java/org/elasticsearch/discovery/zen/fd/NodesFaultDetection.java +++ b/core/src/main/java/org/elasticsearch/discovery/zen/fd/NodesFaultDetection.java @@ -41,6 +41,8 @@ import org.elasticsearch.transport.TransportResponseHandler; import org.elasticsearch.transport.TransportService; import java.io.IOException; +import java.util.Collections; +import java.util.Set; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CopyOnWriteArrayList; @@ -91,6 +93,14 @@ public class NodesFaultDetection extends FaultDetection { listeners.remove(listener); } + /** + * Gets the current set of nodes involved in node fault detection. + * NB: For testing purposes. + */ + public Set getNodes() { + return Collections.unmodifiableSet(nodesFD.keySet()); + } + /** * make sure that nodes in clusterState are pinged. Any pinging to nodes which are not * part of the cluster will be stopped diff --git a/core/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryUnitTests.java b/core/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryUnitTests.java index ba4c14c2058..235df2d8a35 100644 --- a/core/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryUnitTests.java +++ b/core/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryUnitTests.java @@ -20,25 +20,44 @@ package org.elasticsearch.discovery.zen; import org.elasticsearch.Version; +import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNode.Role; import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.settings.ClusterSettings; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.LocalTransportAddress; +import org.elasticsearch.discovery.Discovery; +import org.elasticsearch.discovery.zen.elect.ElectMasterService; import org.elasticsearch.discovery.zen.ping.ZenPing; +import org.elasticsearch.discovery.zen.ping.ZenPingService; +import org.elasticsearch.discovery.zen.publish.PublishClusterStateActionTests.AssertingAckListener; +import org.elasticsearch.discovery.zen.publish.PublishClusterStateActionTests.MockNode; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.TestThreadPool; +import org.elasticsearch.threadpool.ThreadPool; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Set; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import static java.util.Collections.emptyMap; import static java.util.Collections.emptySet; import static org.elasticsearch.discovery.zen.ZenDiscovery.shouldIgnoreOrRejectNewClusterState; +import static org.elasticsearch.discovery.zen.elect.ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING; +import static org.elasticsearch.discovery.zen.publish.PublishClusterStateActionTests.createMockNode; +import static org.elasticsearch.test.ClusterServiceUtils.createClusterService; +import static org.elasticsearch.test.ClusterServiceUtils.setState; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; @@ -107,7 +126,7 @@ public class ZenDiscoveryUnitTests extends ESTestCase { ArrayList masterNodes = new ArrayList<>(); ArrayList allNodes = new ArrayList<>(); for (int i = randomIntBetween(10, 20); i >= 0; i--) { - Set roles = new HashSet<>(randomSubsetOf(Arrays.asList(DiscoveryNode.Role.values()))); + Set roles = new HashSet<>(randomSubsetOf(Arrays.asList(Role.values()))); DiscoveryNode node = new DiscoveryNode("node_" + i, "id_" + i, LocalTransportAddress.buildUnique(), Collections.emptyMap(), roles, Version.CURRENT); responses.add(new ZenPing.PingResponse(node, randomBoolean() ? null : node, new ClusterName("test"), randomBoolean())); @@ -127,4 +146,80 @@ public class ZenDiscoveryUnitTests extends ESTestCase { assertThat(filteredNodes, equalTo(allNodes)); } } + + public void testNodesUpdatedAfterClusterStatePublished() throws Exception { + ThreadPool threadPool = new TestThreadPool(getClass().getName()); + // randomly make minimum_master_nodes a value higher than we have nodes for, so it will force failure + int minMasterNodes = randomBoolean() ? 3 : 1; + Settings settings = Settings.builder() + .put(DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.getKey(), Integer.toString(minMasterNodes)).build(); + + Map nodes = new HashMap<>(); + ZenDiscovery zenDiscovery = null; + ClusterService clusterService = null; + try { + Set expectedFDNodes = null; + // create master node and its mocked up services + MockNode master = createMockNode("master", settings, null, threadPool, logger, nodes).setAsMaster(); + ClusterState state = master.clusterState; // initial cluster state + + // build the zen discovery and cluster service + clusterService = createClusterService(threadPool, master.discoveryNode); + setState(clusterService, state); + zenDiscovery = buildZenDiscovery(settings, master, clusterService, threadPool); + + // a new cluster state with a new discovery node (we will test if the cluster state + // was updated by the presence of this node in NodesFaultDetection) + MockNode newNode = createMockNode("new_node", settings, null, threadPool, logger, nodes); + ClusterState newState = ClusterState.builder(state).incrementVersion().nodes( + DiscoveryNodes.builder(state.nodes()).add(newNode.discoveryNode).masterNodeId(master.discoveryNode.getId()) + ).build(); + + try { + // publishing a new cluster state + ClusterChangedEvent clusterChangedEvent = new ClusterChangedEvent("testing", newState, state); + AssertingAckListener listener = new AssertingAckListener(newState.nodes().getSize() - 1); + expectedFDNodes = zenDiscovery.getFaultDetectionNodes(); + zenDiscovery.publish(clusterChangedEvent, listener); + listener.await(1, TimeUnit.HOURS); + // publish was a success, update expected FD nodes based on new cluster state + expectedFDNodes = fdNodesForState(newState, master.discoveryNode); + } catch (Discovery.FailedToCommitClusterStateException e) { + // not successful, so expectedFDNodes above should remain what it was originally assigned + assertEquals(3, minMasterNodes); // ensure min master nodes is the higher value, otherwise we shouldn't fail + } + + assertEquals(expectedFDNodes, zenDiscovery.getFaultDetectionNodes()); + } finally { + // clean close of transport service and publish action for each node + zenDiscovery.close(); + clusterService.close(); + for (MockNode curNode : nodes.values()) { + curNode.action.close(); + curNode.service.close(); + } + terminate(threadPool); + } + } + + private ZenDiscovery buildZenDiscovery(Settings settings, MockNode master, ClusterService clusterService, ThreadPool threadPool) { + ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + ZenPingService zenPingService = new ZenPingService(settings, Collections.emptySet()); + ElectMasterService electMasterService = new ElectMasterService(settings); + ZenDiscovery zenDiscovery = new ZenDiscovery(settings, threadPool, master.service, clusterService, + clusterSettings, zenPingService, electMasterService); + zenDiscovery.start(); + return zenDiscovery; + } + + private Set fdNodesForState(ClusterState clusterState, DiscoveryNode localNode) { + final Set discoveryNodes = new HashSet<>(); + clusterState.getNodes().getNodes().valuesIt().forEachRemaining(discoveryNode -> { + // the local node isn't part of the nodes that are pinged (don't ping ourselves) + if (discoveryNode.getId().equals(localNode.getId()) == false) { + discoveryNodes.add(discoveryNode); + } + }); + return discoveryNodes; + } } diff --git a/core/src/test/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateActionTests.java b/core/src/test/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateActionTests.java index 31c828ec30f..1b0d6f63fd5 100644 --- a/core/src/test/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateActionTests.java +++ b/core/src/test/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateActionTests.java @@ -145,21 +145,22 @@ public class PublishClusterStateActionTests extends ESTestCase { } public MockNode createMockNode(final String name) throws Exception { - return createMockNode(name, Settings.EMPTY); - } - - public MockNode createMockNode(String name, Settings settings) throws Exception { - return createMockNode(name, settings, null); + return createMockNode(name, Settings.EMPTY, null); } public MockNode createMockNode(String name, final Settings basSettings, @Nullable ClusterStateListener listener) throws Exception { + return createMockNode(name, basSettings, listener, threadPool, logger, nodes); + } + + public static MockNode createMockNode(String name, final Settings basSettings, @Nullable ClusterStateListener listener, + ThreadPool threadPool, Logger logger, Map nodes) throws Exception { final Settings settings = Settings.builder() .put("name", name) .put(TransportService.TRACE_LOG_INCLUDE_SETTING.getKey(), "", TransportService.TRACE_LOG_EXCLUDE_SETTING.getKey(), "NOTHING") .put(basSettings) .build(); - MockTransportService service = buildTransportService(settings); + MockTransportService service = buildTransportService(settings, threadPool); DiscoveryNode discoveryNode = DiscoveryNode.createLocal(settings, service.boundAddress().publishAddress(), NodeEnvironment.generateNodeId(settings)); MockNode node = new MockNode(discoveryNode, service, listener, logger); @@ -228,14 +229,14 @@ public class PublishClusterStateActionTests extends ESTestCase { terminate(threadPool); } - protected MockTransportService buildTransportService(Settings settings) { - MockTransportService transportService = MockTransportService.local(Settings.EMPTY, Version.CURRENT, threadPool); + private static MockTransportService buildTransportService(Settings settings, ThreadPool threadPool) { + MockTransportService transportService = MockTransportService.local(settings, Version.CURRENT, threadPool); transportService.start(); transportService.acceptIncomingRequests(); return transportService; } - protected MockPublishAction buildPublishClusterStateAction( + private static MockPublishAction buildPublishClusterStateAction( Settings settings, MockTransportService transportService, Supplier clusterStateSupplier, @@ -253,8 +254,8 @@ public class PublishClusterStateActionTests extends ESTestCase { } public void testSimpleClusterStatePublishing() throws Exception { - MockNode nodeA = createMockNode("nodeA", Settings.EMPTY).setAsMaster(); - MockNode nodeB = createMockNode("nodeB", Settings.EMPTY); + MockNode nodeA = createMockNode("nodeA").setAsMaster(); + MockNode nodeB = createMockNode("nodeB"); // Initial cluster state ClusterState clusterState = nodeA.clusterState; @@ -282,7 +283,7 @@ public class PublishClusterStateActionTests extends ESTestCase { // Adding new node - this node should get full cluster state while nodeB should still be getting diffs - MockNode nodeC = createMockNode("nodeC", Settings.EMPTY); + MockNode nodeC = createMockNode("nodeC"); // cluster state update 3 - register node C previousClusterState = clusterState; @@ -336,7 +337,7 @@ public class PublishClusterStateActionTests extends ESTestCase { fail("Shouldn't send cluster state to myself"); }).setAsMaster(); - MockNode nodeB = createMockNode("nodeB", Settings.EMPTY); + MockNode nodeB = createMockNode("nodeB"); // Initial cluster state with both states - the second node still shouldn't get diff even though it's present in the previous cluster state DiscoveryNodes discoveryNodes = DiscoveryNodes.builder(nodeA.nodes()).add(nodeB.discoveryNode).build(); @@ -444,7 +445,7 @@ public class PublishClusterStateActionTests extends ESTestCase { } }).setAsMaster(); - MockNode nodeB = createMockNode("nodeB", Settings.EMPTY); + MockNode nodeB = createMockNode("nodeB"); // Initial cluster state with both states - the second node still shouldn't get diff even though it's present in the previous cluster state DiscoveryNodes discoveryNodes = DiscoveryNodes.builder(nodeA.nodes()).add(nodeB.discoveryNode).build(); @@ -495,7 +496,7 @@ public class PublishClusterStateActionTests extends ESTestCase { final int dataNodes = randomIntBetween(0, 5); final Settings dataSettings = Settings.builder().put(Node.NODE_MASTER_SETTING.getKey(), false).build(); for (int i = 0; i < dataNodes; i++) { - discoveryNodesBuilder.add(createMockNode("data_" + i, dataSettings).discoveryNode); + discoveryNodesBuilder.add(createMockNode("data_" + i, dataSettings, null).discoveryNode); } discoveryNodesBuilder.localNodeId(master.discoveryNode.getId()).masterNodeId(master.discoveryNode.getId()); DiscoveryNodes discoveryNodes = discoveryNodesBuilder.build(); @@ -521,7 +522,7 @@ public class PublishClusterStateActionTests extends ESTestCase { settings.put(DiscoverySettings.COMMIT_TIMEOUT_SETTING.getKey(), expectingToCommit == false && timeOutNodes > 0 ? "100ms" : "1h") .put(DiscoverySettings.PUBLISH_TIMEOUT_SETTING.getKey(), "5ms"); // test is about committing - MockNode master = createMockNode("master", settings.build()); + MockNode master = createMockNode("master", settings.build(), null); // randomize things a bit int[] nodeTypes = new int[goodNodes + errorNodes + timeOutNodes]; @@ -551,7 +552,8 @@ public class PublishClusterStateActionTests extends ESTestCase { } final int dataNodes = randomIntBetween(0, 3); // data nodes don't matter for (int i = 0; i < dataNodes; i++) { - final MockNode mockNode = createMockNode("data_" + i, Settings.builder().put(Node.NODE_MASTER_SETTING.getKey(), false).build()); + final MockNode mockNode = createMockNode("data_" + i, + Settings.builder().put(Node.NODE_MASTER_SETTING.getKey(), false).build(), null); discoveryNodesBuilder.add(mockNode.discoveryNode); if (randomBoolean()) { // we really don't care - just chaos monkey @@ -726,8 +728,8 @@ public class PublishClusterStateActionTests extends ESTestCase { Settings settings = Settings.builder() .put(DiscoverySettings.COMMIT_TIMEOUT_SETTING.getKey(), "1ms").build(); // short but so we will sometime commit sometime timeout - MockNode master = createMockNode("master", settings); - MockNode node = createMockNode("node", settings); + MockNode master = createMockNode("master", settings, null); + MockNode node = createMockNode("node", settings, null); ClusterState state = ClusterState.builder(master.clusterState) .nodes(DiscoveryNodes.builder(master.clusterState.nodes()).add(node.discoveryNode).masterNodeId(master.discoveryNode.getId())).build(); @@ -843,7 +845,7 @@ public class PublishClusterStateActionTests extends ESTestCase { assertFalse(actual.wasReadFromDiff()); } - static class MockPublishAction extends PublishClusterStateAction { + public static class MockPublishAction extends PublishClusterStateAction { AtomicBoolean timeoutOnSend = new AtomicBoolean(); AtomicBoolean errorOnSend = new AtomicBoolean(); diff --git a/test/framework/src/main/java/org/elasticsearch/test/ClusterServiceUtils.java b/test/framework/src/main/java/org/elasticsearch/test/ClusterServiceUtils.java index a6d35930e6b..38682239b78 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/ClusterServiceUtils.java +++ b/test/framework/src/main/java/org/elasticsearch/test/ClusterServiceUtils.java @@ -42,11 +42,16 @@ import static junit.framework.TestCase.fail; public class ClusterServiceUtils { public static ClusterService createClusterService(ThreadPool threadPool) { + DiscoveryNode discoveryNode = new DiscoveryNode("node", LocalTransportAddress.buildUnique(), Collections.emptyMap(), + new HashSet<>(Arrays.asList(DiscoveryNode.Role.values())),Version.CURRENT); + return createClusterService(threadPool, discoveryNode); + } + + public static ClusterService createClusterService(ThreadPool threadPool, DiscoveryNode localNode) { ClusterService clusterService = new ClusterService(Settings.builder().put("cluster.name", "ClusterServiceTests").build(), new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), threadPool); - clusterService.setLocalNode(new DiscoveryNode("node", LocalTransportAddress.buildUnique(), Collections.emptyMap(), - new HashSet<>(Arrays.asList(DiscoveryNode.Role.values())),Version.CURRENT)); + clusterService.setLocalNode(localNode); clusterService.setNodeConnectionsService(new NodeConnectionsService(Settings.EMPTY, null, null) { @Override public void connectToAddedNodes(ClusterChangedEvent event) {