Stabilise testStaleMasterNotHijackingMajority (#40253)
This test inadvertently asserts that the election that occurs after a master failure is clean. However, messy elections are a fact of life, so we should not fail on a messy election. This change moves this test away from `AbstractDisruptionTestCase`, since it does not need the fault detector to be so enthusiastic, and weakens the assertions to merely say that we ignore states published by the old master, without saying anything about the cleanliness of the election. Closes #36556
This commit is contained in:
parent
f1eca21081
commit
707d40ce06
|
@ -19,28 +19,20 @@
|
|||
|
||||
package org.elasticsearch.discovery;
|
||||
|
||||
import org.apache.logging.log4j.message.ParameterizedMessage;
|
||||
import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
|
||||
import org.elasticsearch.action.admin.indices.stats.ShardStats;
|
||||
import org.elasticsearch.action.bulk.BulkRequestBuilder;
|
||||
import org.elasticsearch.action.bulk.BulkResponse;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.ClusterStateUpdateTask;
|
||||
import org.elasticsearch.cluster.coordination.NoMasterBlockService;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.Priority;
|
||||
import org.elasticsearch.common.collect.Tuple;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.xcontent.XContentType;
|
||||
import org.elasticsearch.discovery.zen.ZenDiscovery;
|
||||
import org.elasticsearch.monitor.jvm.HotThreads;
|
||||
import org.elasticsearch.test.ESIntegTestCase;
|
||||
import org.elasticsearch.test.disruption.BlockMasterServiceOnMaster;
|
||||
import org.elasticsearch.test.disruption.IntermittentLongGCDisruption;
|
||||
import org.elasticsearch.test.disruption.LongGCDisruption;
|
||||
import org.elasticsearch.test.disruption.NetworkDisruption;
|
||||
import org.elasticsearch.test.disruption.NetworkDisruption.TwoPartitions;
|
||||
import org.elasticsearch.test.disruption.ServiceDisruptionScheme;
|
||||
|
@ -48,21 +40,14 @@ import org.elasticsearch.test.disruption.SingleNodeDisruption;
|
|||
import org.elasticsearch.test.junit.annotations.TestLogging;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.hasSize;
|
||||
import static org.hamcrest.Matchers.not;
|
||||
import static org.hamcrest.Matchers.nullValue;
|
||||
|
||||
/**
|
||||
* Tests relating to the loss of the master.
|
||||
|
@ -71,121 +56,6 @@ import static org.hamcrest.Matchers.nullValue;
|
|||
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0, transportClientRatio = 0)
|
||||
public class MasterDisruptionIT extends AbstractDisruptionTestCase {
|
||||
|
||||
/**
 * Tests that emulates a frozen elected master node that unfreezes and pushes his cluster state to other nodes
 * that already are following another elected master node. These nodes should reject this cluster state and prevent
 * them from following the stale master.
 */
@TestLogging("_root:DEBUG,org.elasticsearch.cluster.service:TRACE,org.elasticsearch.test.disruption:TRACE")
public void testStaleMasterNotHijackingMajority() throws Exception {
    // Three nodes so the two-node majority can elect a new master while one node is frozen.
    final List<String> nodes = startCluster(3);

    // Save the current master node as old master node, because that node will get frozen
    final String oldMasterNode = internalCluster().getMasterName();
    for (String node : nodes) {
        ensureStableCluster(3, node);
    }
    assertMaster(oldMasterNode, nodes);

    // Simulating a painful gc by suspending all threads for a long time on the current elected master node.
    SingleNodeDisruption masterNodeDisruption = new LongGCDisruption(random(), oldMasterNode);

    // Save the majority side
    final List<String> majoritySide = new ArrayList<>(nodes);
    majoritySide.remove(oldMasterNode);

    // Keeps track of the previous and current master when a master node transition took place on each node on the majority side:
    // map of node name -> list of (previous master name, current master name) pairs; synchronized because the
    // cluster-state listeners below fire on applier threads.
    final Map<String, List<Tuple<String, String>>> masters = Collections.synchronizedMap(new HashMap<>());
    for (final String node : majoritySide) {
        masters.put(node, new ArrayList<>());
        internalCluster().getInstance(ClusterService.class, node).addListener(event -> {
            DiscoveryNode previousMaster = event.previousState().nodes().getMasterNode();
            DiscoveryNode currentMaster = event.state().nodes().getMasterNode();
            // Record only states in which the master actually changed, so the assertions below see
            // exactly the master transitions and not every published state.
            if (!Objects.equals(previousMaster, currentMaster)) {
                logger.info("node {} received new cluster state: {} \n and had previous cluster state: {}", node, event.state(),
                    event.previousState());
                String previousMasterNodeName = previousMaster != null ? previousMaster.getName() : null;
                String currentMasterNodeName = currentMaster != null ? currentMaster.getName() : null;
                masters.get(node).add(new Tuple<>(previousMasterNodeName, currentMasterNodeName));
            }
        });
    }

    // Latch released once the old master applies a state with no master node id, i.e. it has stepped down.
    final CountDownLatch oldMasterNodeSteppedDown = new CountDownLatch(1);
    internalCluster().getInstance(ClusterService.class, oldMasterNode).addListener(event -> {
        if (event.state().nodes().getMasterNodeId() == null) {
            oldMasterNodeSteppedDown.countDown();
        }
    });

    internalCluster().setDisruptionScheme(masterNodeDisruption);
    logger.info("freezing node [{}]", oldMasterNode);
    masterNodeDisruption.startDisrupting();

    // Wait for the majority side to get stable
    assertDifferentMaster(majoritySide.get(0), oldMasterNode);
    assertDifferentMaster(majoritySide.get(1), oldMasterNode);

    // the test is periodically tripping on the following assertion. To find out which threads are blocking the nodes from making
    // progress we print a stack dump
    boolean failed = true;
    try {
        assertDiscoveryCompleted(majoritySide);
        failed = false;
    } finally {
        if (failed) {
            logger.error("discovery failed to complete, probably caused by a blocked thread: {}",
                new HotThreads().busiestThreads(Integer.MAX_VALUE).ignoreIdleThreads(false).detect());
        }
    }

    // The old master node is frozen, but here we submit a cluster state update task that doesn't get executed,
    // but will be queued and once the old master node un-freezes it gets executed.
    // The old master node will send this update + the cluster state where he is flagged as master to the other
    // nodes that follow the new master. These nodes should ignore this update.
    internalCluster().getInstance(ClusterService.class, oldMasterNode).submitStateUpdateTask("sneaky-update", new
        ClusterStateUpdateTask(Priority.IMMEDIATE) {
        @Override
        public ClusterState execute(ClusterState currentState) {
            // A no-op state rebuild: content is irrelevant, only the stale publication matters.
            return ClusterState.builder(currentState).build();
        }

        @Override
        public void onFailure(String source, Exception e) {
            // Failure here is expected once the node discovers it is no longer master; just log it.
            logger.warn(() -> new ParameterizedMessage("failure [{}]", source), e);
        }
    });

    // Save the new elected master node
    final String newMasterNode = internalCluster().getMasterName(majoritySide.get(0));
    logger.info("new detected master node [{}]", newMasterNode);

    // Stop disruption
    logger.info("Unfreeze node [{}]", oldMasterNode);
    masterNodeDisruption.stopDisrupting();

    // NOTE(review): the await result is ignored, so a timeout here is not detected directly —
    // the subsequent assertions would fail instead. Consider asserting the returned boolean.
    oldMasterNodeSteppedDown.await(30, TimeUnit.SECONDS);
    // Make sure that the end state is consistent on all nodes:
    assertDiscoveryCompleted(nodes);
    assertMaster(newMasterNode, nodes);

    // Each majority node must have seen exactly two transitions: old master -> null -> new master.
    // (This is the over-strict "clean election" assumption that makes the test flaky on messy elections.)
    assertThat(masters.size(), equalTo(2));
    for (Map.Entry<String, List<Tuple<String, String>>> entry : masters.entrySet()) {
        String nodeName = entry.getKey();
        List<Tuple<String, String>> recordedMasterTransition = entry.getValue();
        assertThat("[" + nodeName + "] Each node should only record two master node transitions",
            recordedMasterTransition, hasSize(2));
        assertThat("[" + nodeName + "] First transition's previous master should be [" + oldMasterNode + "]",
            recordedMasterTransition.get(0).v1(), equalTo(oldMasterNode));
        assertThat("[" + nodeName + "] First transition's current master should be [null]",
            recordedMasterTransition.get(0).v2(), nullValue());
        assertThat("[" + nodeName + "] Second transition's previous master should be [null]",
            recordedMasterTransition.get(1).v1(), nullValue());
        assertThat("[" + nodeName + "] Second transition's current master should be [" + newMasterNode + "]",
            recordedMasterTransition.get(1).v2(), equalTo(newMasterNode));
    }
}
|
||||
|
||||
/**
|
||||
* Test that cluster recovers from a long GC on master that causes other nodes to elect a new one
|
||||
*/
|
||||
|
|
|
@ -18,28 +18,44 @@
|
|||
*/
|
||||
package org.elasticsearch.discovery;
|
||||
|
||||
import org.apache.logging.log4j.message.ParameterizedMessage;
|
||||
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.ClusterStateUpdateTask;
|
||||
import org.elasticsearch.cluster.coordination.Coordinator;
|
||||
import org.elasticsearch.cluster.coordination.FollowersChecker;
|
||||
import org.elasticsearch.cluster.coordination.LeaderChecker;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.Priority;
|
||||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.collect.Tuple;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.util.set.Sets;
|
||||
import org.elasticsearch.plugins.Plugin;
|
||||
import org.elasticsearch.test.ESIntegTestCase;
|
||||
import org.elasticsearch.test.disruption.LongGCDisruption;
|
||||
import org.elasticsearch.test.disruption.NetworkDisruption;
|
||||
import org.elasticsearch.test.disruption.NetworkDisruption.NetworkDisconnect;
|
||||
import org.elasticsearch.test.disruption.NetworkDisruption.NetworkLinkDisruptionType;
|
||||
import org.elasticsearch.test.disruption.NetworkDisruption.NetworkUnresponsive;
|
||||
import org.elasticsearch.test.disruption.NetworkDisruption.TwoPartitions;
|
||||
import org.elasticsearch.test.disruption.SingleNodeDisruption;
|
||||
import org.elasticsearch.test.junit.annotations.TestLogging;
|
||||
import org.elasticsearch.test.transport.MockTransportService.TestPlugin;
|
||||
import org.elasticsearch.test.transport.MockTransportService;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static java.util.Collections.singleton;
|
||||
|
@ -55,7 +71,7 @@ public class StableMasterDisruptionIT extends ESIntegTestCase {
|
|||
|
||||
@Override
|
||||
protected Collection<Class<? extends Plugin>> nodePlugins() {
|
||||
return Collections.singletonList(TestPlugin.class);
|
||||
return Collections.singletonList(MockTransportService.TestPlugin.class);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -152,4 +168,101 @@ public class StableMasterDisruptionIT extends ESIntegTestCase {
|
|||
networkDisruption.stopDisrupting();
|
||||
ensureStableCluster(3);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Tests that emulates a frozen elected master node that unfreezes and pushes its cluster state to other nodes that already are
|
||||
* following another elected master node. These nodes should reject this cluster state and prevent them from following the stale master.
|
||||
*/
|
||||
public void testStaleMasterNotHijackingMajority() throws Exception {
|
||||
final List<String> nodes = internalCluster().startNodes(3, Settings.builder()
|
||||
.put(LeaderChecker.LEADER_CHECK_TIMEOUT_SETTING.getKey(), "1s")
|
||||
.put(Coordinator.PUBLISH_TIMEOUT_SETTING.getKey(), "1s")
|
||||
.build());
|
||||
ensureStableCluster(3);
|
||||
|
||||
// Save the current master node as old master node, because that node will get frozen
|
||||
final String oldMasterNode = internalCluster().getMasterName();
|
||||
|
||||
// Simulating a painful gc by suspending all threads for a long time on the current elected master node.
|
||||
SingleNodeDisruption masterNodeDisruption = new LongGCDisruption(random(), oldMasterNode);
|
||||
|
||||
// Save the majority side
|
||||
final List<String> majoritySide = new ArrayList<>(nodes);
|
||||
majoritySide.remove(oldMasterNode);
|
||||
|
||||
// Keeps track of the previous and current master when a master node transition took place on each node on the majority side:
|
||||
final Map<String, List<Tuple<String, String>>> masters = Collections.synchronizedMap(new HashMap<>());
|
||||
for (final String node : majoritySide) {
|
||||
masters.put(node, new ArrayList<>());
|
||||
internalCluster().getInstance(ClusterService.class, node).addListener(event -> {
|
||||
DiscoveryNode previousMaster = event.previousState().nodes().getMasterNode();
|
||||
DiscoveryNode currentMaster = event.state().nodes().getMasterNode();
|
||||
if (!Objects.equals(previousMaster, currentMaster)) {
|
||||
logger.info("--> node {} received new cluster state: {} \n and had previous cluster state: {}", node, event.state(),
|
||||
event.previousState());
|
||||
String previousMasterNodeName = previousMaster != null ? previousMaster.getName() : null;
|
||||
String currentMasterNodeName = currentMaster != null ? currentMaster.getName() : null;
|
||||
masters.get(node).add(new Tuple<>(previousMasterNodeName, currentMasterNodeName));
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
final CountDownLatch oldMasterNodeSteppedDown = new CountDownLatch(1);
|
||||
internalCluster().getInstance(ClusterService.class, oldMasterNode).addListener(event -> {
|
||||
if (event.state().nodes().getMasterNodeId() == null) {
|
||||
oldMasterNodeSteppedDown.countDown();
|
||||
}
|
||||
});
|
||||
|
||||
internalCluster().setDisruptionScheme(masterNodeDisruption);
|
||||
logger.info("--> freezing node [{}]", oldMasterNode);
|
||||
masterNodeDisruption.startDisrupting();
|
||||
|
||||
// Wait for majority side to elect a new master
|
||||
assertBusy(() -> {
|
||||
for (final Map.Entry<String, List<Tuple<String, String>>> entry : masters.entrySet()) {
|
||||
final List<Tuple<String, String>> transitions = entry.getValue();
|
||||
assertTrue(entry.getKey() + ": " + transitions,
|
||||
transitions.stream().anyMatch(transition -> transition.v2() != null));
|
||||
}
|
||||
});
|
||||
|
||||
// The old master node is frozen, but here we submit a cluster state update task that doesn't get executed, but will be queued and
|
||||
// once the old master node un-freezes it gets executed. The old master node will send this update + the cluster state where it is
|
||||
// flagged as master to the other nodes that follow the new master. These nodes should ignore this update.
|
||||
internalCluster().getInstance(ClusterService.class, oldMasterNode).submitStateUpdateTask("sneaky-update", new
|
||||
ClusterStateUpdateTask(Priority.IMMEDIATE) {
|
||||
@Override
|
||||
public ClusterState execute(ClusterState currentState) {
|
||||
return ClusterState.builder(currentState).build();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(String source, Exception e) {
|
||||
logger.warn(() -> new ParameterizedMessage("failure [{}]", source), e);
|
||||
}
|
||||
});
|
||||
|
||||
// Save the new elected master node
|
||||
final String newMasterNode = internalCluster().getMasterName(majoritySide.get(0));
|
||||
logger.info("--> new detected master node [{}]", newMasterNode);
|
||||
|
||||
// Stop disruption
|
||||
logger.info("--> unfreezing node [{}]", oldMasterNode);
|
||||
masterNodeDisruption.stopDisrupting();
|
||||
|
||||
oldMasterNodeSteppedDown.await(30, TimeUnit.SECONDS);
|
||||
logger.info("--> [{}] stepped down as master", oldMasterNode);
|
||||
ensureStableCluster(3);
|
||||
|
||||
assertThat(masters.size(), equalTo(2));
|
||||
for (Map.Entry<String, List<Tuple<String, String>>> entry : masters.entrySet()) {
|
||||
String nodeName = entry.getKey();
|
||||
List<Tuple<String, String>> transitions = entry.getValue();
|
||||
assertTrue("[" + nodeName + "] should not apply state from old master [" + oldMasterNode + "] but it did: " + transitions,
|
||||
transitions.stream().noneMatch(t -> oldMasterNode.equals(t.v2())));
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue