mirror of
synced 2025-02-08 22:14:59 +00:00
Update discovery nodes after cluster state is published (#20409)
Before, when there was a new cluster state to publish, zen discovery would first update the set of nodes to ping based on the new cluster state, then publish the new cluster state. This is problematic because if the cluster state failed to publish, then the set of nodes to ping should not have been updated. This commit fixes the issue by updating the set of nodes to ping for fault detection only *after* the new cluster state has been published.
This commit is contained in:
@ -318,7 +318,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover
if (!clusterChangedEvent.state().getNodes().isLocalNodeElectedMaster()) {
throw new IllegalStateException("Shouldn't publish state when not master");
try {
publishClusterState.publish(clusterChangedEvent, electMaster.minimumMasterNodes(), ackListener);
} catch (FailedToCommitClusterStateException t) {
@ -338,6 +338,17 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover
throw t;
// update the set of nodes to ping after the new cluster state has been published
* Gets the current set of nodes involved in the node fault detection.
* NB: for testing purposes
public Set<DiscoveryNode> getFaultDetectionNodes() {
return nodesFD.getNodes();
@ -41,6 +41,8 @@ import org.elasticsearch.transport.TransportResponseHandler;
import org.elasticsearch.transport.TransportService;
import java.io.IOException;
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
@ -91,6 +93,14 @@ public class NodesFaultDetection extends FaultDetection {
* Gets the current set of nodes involved in node fault detection.
* NB: For testing purposes.
public Set<DiscoveryNode> getNodes() {
return Collections.unmodifiableSet(nodesFD.keySet());
* make sure that nodes in clusterState are pinged. Any pinging to nodes which are not
* part of the cluster will be stopped
@ -20,25 +20,44 @@
package org.elasticsearch.discovery.zen;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNode.Role;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.LocalTransportAddress;
import org.elasticsearch.discovery.Discovery;
import org.elasticsearch.discovery.zen.elect.ElectMasterService;
import org.elasticsearch.discovery.zen.ping.ZenPing;
import org.elasticsearch.discovery.zen.ping.ZenPingService;
import org.elasticsearch.discovery.zen.publish.PublishClusterStateActionTests.AssertingAckListener;
import org.elasticsearch.discovery.zen.publish.PublishClusterStateActionTests.MockNode;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet;
import static org.elasticsearch.discovery.zen.ZenDiscovery.shouldIgnoreOrRejectNewClusterState;
import static org.elasticsearch.discovery.zen.elect.ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING;
import static org.elasticsearch.discovery.zen.publish.PublishClusterStateActionTests.createMockNode;
import static org.elasticsearch.test.ClusterServiceUtils.createClusterService;
import static org.elasticsearch.test.ClusterServiceUtils.setState;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
@ -107,7 +126,7 @@ public class ZenDiscoveryUnitTests extends ESTestCase {
ArrayList<DiscoveryNode> masterNodes = new ArrayList<>();
ArrayList<DiscoveryNode> allNodes = new ArrayList<>();
for (int i = randomIntBetween(10, 20); i >= 0; i--) {
Set<DiscoveryNode.Role> roles = new HashSet<>(randomSubsetOf(Arrays.asList(DiscoveryNode.Role.values())));
Set<Role> roles = new HashSet<>(randomSubsetOf(Arrays.asList(Role.values())));
DiscoveryNode node = new DiscoveryNode("node_" + i, "id_" + i, LocalTransportAddress.buildUnique(), Collections.emptyMap(),
roles, Version.CURRENT);
responses.add(new ZenPing.PingResponse(node, randomBoolean() ? null : node, new ClusterName("test"), randomBoolean()));
@ -127,4 +146,80 @@ public class ZenDiscoveryUnitTests extends ESTestCase {
assertThat(filteredNodes, equalTo(allNodes));
public void testNodesUpdatedAfterClusterStatePublished() throws Exception {
ThreadPool threadPool = new TestThreadPool(getClass().getName());
// randomly make minimum_master_nodes a value higher than we have nodes for, so it will force failure
int minMasterNodes = randomBoolean() ? 3 : 1;
Settings settings = Settings.builder()
.put(DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.getKey(), Integer.toString(minMasterNodes)).build();
Map<String, MockNode> nodes = new HashMap<>();
ZenDiscovery zenDiscovery = null;
ClusterService clusterService = null;
try {
Set<DiscoveryNode> expectedFDNodes = null;
// create master node and its mocked up services
MockNode master = createMockNode("master", settings, null, threadPool, logger, nodes).setAsMaster();
ClusterState state = master.clusterState; // initial cluster state
// build the zen discovery and cluster service
clusterService = createClusterService(threadPool, master.discoveryNode);
setState(clusterService, state);
zenDiscovery = buildZenDiscovery(settings, master, clusterService, threadPool);
// a new cluster state with a new discovery node (we will test if the cluster state
// was updated by the presence of this node in NodesFaultDetection)
MockNode newNode = createMockNode("new_node", settings, null, threadPool, logger, nodes);
ClusterState newState = ClusterState.builder(state).incrementVersion().nodes(
try {
// publishing a new cluster state
ClusterChangedEvent clusterChangedEvent = new ClusterChangedEvent("testing", newState, state);
AssertingAckListener listener = new AssertingAckListener(newState.nodes().getSize() - 1);
expectedFDNodes = zenDiscovery.getFaultDetectionNodes();
zenDiscovery.publish(clusterChangedEvent, listener);
listener.await(1, TimeUnit.HOURS);
// publish was a success, update expected FD nodes based on new cluster state
expectedFDNodes = fdNodesForState(newState, master.discoveryNode);
} catch (Discovery.FailedToCommitClusterStateException e) {
// not successful, so expectedFDNodes above should remain what it was originally assigned
assertEquals(3, minMasterNodes); // ensure min master nodes is the higher value, otherwise we shouldn't fail
assertEquals(expectedFDNodes, zenDiscovery.getFaultDetectionNodes());
} finally {
// clean close of transport service and publish action for each node
for (MockNode curNode : nodes.values()) {
private ZenDiscovery buildZenDiscovery(Settings settings, MockNode master, ClusterService clusterService, ThreadPool threadPool) {
ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
ZenPingService zenPingService = new ZenPingService(settings, Collections.emptySet());
ElectMasterService electMasterService = new ElectMasterService(settings);
ZenDiscovery zenDiscovery = new ZenDiscovery(settings, threadPool, master.service, clusterService,
clusterSettings, zenPingService, electMasterService);
return zenDiscovery;
private Set<DiscoveryNode> fdNodesForState(ClusterState clusterState, DiscoveryNode localNode) {
final Set<DiscoveryNode> discoveryNodes = new HashSet<>();
clusterState.getNodes().getNodes().valuesIt().forEachRemaining(discoveryNode -> {
// the local node isn't part of the nodes that are pinged (don't ping ourselves)
if (discoveryNode.getId().equals(localNode.getId()) == false) {
return discoveryNodes;
@ -145,21 +145,22 @@ public class PublishClusterStateActionTests extends ESTestCase {
public MockNode createMockNode(final String name) throws Exception {
return createMockNode(name, Settings.EMPTY);
public MockNode createMockNode(String name, Settings settings) throws Exception {
return createMockNode(name, settings, null);
return createMockNode(name, Settings.EMPTY, null);
public MockNode createMockNode(String name, final Settings basSettings, @Nullable ClusterStateListener listener) throws Exception {
return createMockNode(name, basSettings, listener, threadPool, logger, nodes);
public static MockNode createMockNode(String name, final Settings basSettings, @Nullable ClusterStateListener listener,
ThreadPool threadPool, Logger logger, Map<String, MockNode> nodes) throws Exception {
final Settings settings = Settings.builder()
.put("name", name)
.put(TransportService.TRACE_LOG_INCLUDE_SETTING.getKey(), "", TransportService.TRACE_LOG_EXCLUDE_SETTING.getKey(), "NOTHING")
MockTransportService service = buildTransportService(settings);
MockTransportService service = buildTransportService(settings, threadPool);
DiscoveryNode discoveryNode = DiscoveryNode.createLocal(settings, service.boundAddress().publishAddress(),
MockNode node = new MockNode(discoveryNode, service, listener, logger);
@ -228,14 +229,14 @@ public class PublishClusterStateActionTests extends ESTestCase {
protected MockTransportService buildTransportService(Settings settings) {
MockTransportService transportService = MockTransportService.local(Settings.EMPTY, Version.CURRENT, threadPool);
private static MockTransportService buildTransportService(Settings settings, ThreadPool threadPool) {
MockTransportService transportService = MockTransportService.local(settings, Version.CURRENT, threadPool);
return transportService;
protected MockPublishAction buildPublishClusterStateAction(
private static MockPublishAction buildPublishClusterStateAction(
Settings settings,
MockTransportService transportService,
Supplier<ClusterState> clusterStateSupplier,
@ -253,8 +254,8 @@ public class PublishClusterStateActionTests extends ESTestCase {
public void testSimpleClusterStatePublishing() throws Exception {
MockNode nodeA = createMockNode("nodeA", Settings.EMPTY).setAsMaster();
MockNode nodeB = createMockNode("nodeB", Settings.EMPTY);
MockNode nodeA = createMockNode("nodeA").setAsMaster();
MockNode nodeB = createMockNode("nodeB");
// Initial cluster state
ClusterState clusterState = nodeA.clusterState;
@ -282,7 +283,7 @@ public class PublishClusterStateActionTests extends ESTestCase {
// Adding new node - this node should get full cluster state while nodeB should still be getting diffs
MockNode nodeC = createMockNode("nodeC", Settings.EMPTY);
MockNode nodeC = createMockNode("nodeC");
// cluster state update 3 - register node C
previousClusterState = clusterState;
@ -336,7 +337,7 @@ public class PublishClusterStateActionTests extends ESTestCase {
fail("Shouldn't send cluster state to myself");
MockNode nodeB = createMockNode("nodeB", Settings.EMPTY);
MockNode nodeB = createMockNode("nodeB");
// Initial cluster state with both states - the second node still shouldn't get diff even though it's present in the previous cluster state
DiscoveryNodes discoveryNodes = DiscoveryNodes.builder(nodeA.nodes()).add(nodeB.discoveryNode).build();
@ -444,7 +445,7 @@ public class PublishClusterStateActionTests extends ESTestCase {
MockNode nodeB = createMockNode("nodeB", Settings.EMPTY);
MockNode nodeB = createMockNode("nodeB");
// Initial cluster state with both states - the second node still shouldn't get diff even though it's present in the previous cluster state
DiscoveryNodes discoveryNodes = DiscoveryNodes.builder(nodeA.nodes()).add(nodeB.discoveryNode).build();
@ -495,7 +496,7 @@ public class PublishClusterStateActionTests extends ESTestCase {
final int dataNodes = randomIntBetween(0, 5);
final Settings dataSettings = Settings.builder().put(Node.NODE_MASTER_SETTING.getKey(), false).build();
for (int i = 0; i < dataNodes; i++) {
discoveryNodesBuilder.add(createMockNode("data_" + i, dataSettings).discoveryNode);
discoveryNodesBuilder.add(createMockNode("data_" + i, dataSettings, null).discoveryNode);
DiscoveryNodes discoveryNodes = discoveryNodesBuilder.build();
@ -521,7 +522,7 @@ public class PublishClusterStateActionTests extends ESTestCase {
settings.put(DiscoverySettings.COMMIT_TIMEOUT_SETTING.getKey(), expectingToCommit == false && timeOutNodes > 0 ? "100ms" : "1h")
.put(DiscoverySettings.PUBLISH_TIMEOUT_SETTING.getKey(), "5ms"); // test is about committing
MockNode master = createMockNode("master", settings.build());
MockNode master = createMockNode("master", settings.build(), null);
// randomize things a bit
int[] nodeTypes = new int[goodNodes + errorNodes + timeOutNodes];
@ -551,7 +552,8 @@ public class PublishClusterStateActionTests extends ESTestCase {
final int dataNodes = randomIntBetween(0, 3); // data nodes don't matter
for (int i = 0; i < dataNodes; i++) {
final MockNode mockNode = createMockNode("data_" + i, Settings.builder().put(Node.NODE_MASTER_SETTING.getKey(), false).build());
final MockNode mockNode = createMockNode("data_" + i,
Settings.builder().put(Node.NODE_MASTER_SETTING.getKey(), false).build(), null);
if (randomBoolean()) {
// we really don't care - just chaos monkey
@ -726,8 +728,8 @@ public class PublishClusterStateActionTests extends ESTestCase {
Settings settings = Settings.builder()
.put(DiscoverySettings.COMMIT_TIMEOUT_SETTING.getKey(), "1ms").build(); // short but so we will sometime commit sometime timeout
MockNode master = createMockNode("master", settings);
MockNode node = createMockNode("node", settings);
MockNode master = createMockNode("master", settings, null);
MockNode node = createMockNode("node", settings, null);
ClusterState state = ClusterState.builder(master.clusterState)
@ -843,7 +845,7 @@ public class PublishClusterStateActionTests extends ESTestCase {
static class MockPublishAction extends PublishClusterStateAction {
public static class MockPublishAction extends PublishClusterStateAction {
AtomicBoolean timeoutOnSend = new AtomicBoolean();
AtomicBoolean errorOnSend = new AtomicBoolean();
@ -42,11 +42,16 @@ import static junit.framework.TestCase.fail;
public class ClusterServiceUtils {
public static ClusterService createClusterService(ThreadPool threadPool) {
DiscoveryNode discoveryNode = new DiscoveryNode("node", LocalTransportAddress.buildUnique(), Collections.emptyMap(),
new HashSet<>(Arrays.asList(DiscoveryNode.Role.values())),Version.CURRENT);
return createClusterService(threadPool, discoveryNode);
public static ClusterService createClusterService(ThreadPool threadPool, DiscoveryNode localNode) {
ClusterService clusterService = new ClusterService(Settings.builder().put("cluster.name", "ClusterServiceTests").build(),
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
clusterService.setLocalNode(new DiscoveryNode("node", LocalTransportAddress.buildUnique(), Collections.emptyMap(),
new HashSet<>(Arrays.asList(DiscoveryNode.Role.values())),Version.CURRENT));
clusterService.setNodeConnectionsService(new NodeConnectionsService(Settings.EMPTY, null, null) {
public void connectToAddedNodes(ClusterChangedEvent event) {
Reference in New Issue
Block a user