Add CoordinatorTests for empty unicast hosts list (#38209)

Today we have DiscoveryDisruptionIT tests for checking that discovery can still
work once the cluster has formed, even if the cluster is misconfigured and only
has a single master-eligible node in its unicast hosts list. In fact with Zen2
we can go one better: we do not need any nodes in the unicast hosts list,
because nodes also use the contents of the last-committed cluster state for
discovery. Additionally, the DiscoveryDisruptionIT tests were failing due to
the overenthusiastic fault-detection timeouts.
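
Conceptually the fallback works like the following minimal sketch (a hypothetical helper for
illustration only, using standard Elasticsearch types; it is not the production peer-finder code):
the set of addresses to probe is the configured hosts list, which may be empty, plus the nodes
recorded in the last-committed cluster state.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.transport.TransportAddress;

// Illustration only: even with an empty configured hosts list, the last-committed
// cluster state still yields peers to probe during discovery.
class DiscoveryCandidates {
    static List<TransportAddress> candidateAddresses(List<TransportAddress> configuredHosts,
                                                     ClusterState lastCommittedState) {
        final Set<TransportAddress> addresses = new LinkedHashSet<>(configuredHosts);
        for (DiscoveryNode node : lastCommittedState.nodes()) {
            addresses.add(node.getAddress());
        }
        return new ArrayList<>(addresses);
    }
}
```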

This commit replaces these tests with deterministic `CoordinatorTests` that
verify the same behaviour. It also removes some duplication by extracting a
shared test method, `testFollowerCheckerAfterMasterReelection()`.
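
In outline, the new deterministic test (shown in full in the `CoordinatorTests` hunk below)
exercises exactly this scenario; `Cluster`, `ClusterNode` and `setEmptyUnicastHostsList()` are
part of the test harness introduced or extended by this commit, not production APIs:

```java
public void testDiscoveryUsesNodesFromLastClusterState() {
    final Cluster cluster = new Cluster(randomIntBetween(3, 5));
    cluster.runRandomly();
    cluster.stabilise();

    // cut one node off from the rest of the cluster
    final ClusterNode partitionedNode = cluster.getAnyNode();
    if (randomBoolean()) {
        partitionedNode.blackhole();
    } else {
        partitionedNode.disconnect();
    }

    // with no unicast hosts configured, rediscovery must rely on the last-committed cluster state
    cluster.setEmptyUnicastHostsList();
    cluster.stabilise();

    // heal the partition and check that the cluster re-forms (no reboots during this phase)
    partitionedNode.heal();
    cluster.runRandomly(false);
    cluster.stabilise();
}
```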

Closes #37687
David Turner, 2019-02-02 07:54:56 +00:00, committed by GitHub
parent 80d3092292
commit c311062476
8 changed files with 107 additions and 150 deletions

@ -341,10 +341,10 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
// Bump our term. However if there is a publication in flight then doing so would cancel the publication, so don't do that
// since we check whether a term bump is needed at the end of the publication too.
if (publicationInProgress()) {
logger.debug("updateMaxTermSeen: maxTermSeen = {} > currentTerm = {}, enqueueing term bump",
maxTermSeen, currentTerm);
logger.debug("updateMaxTermSeen: maxTermSeen = {} > currentTerm = {}, enqueueing term bump", maxTermSeen, currentTerm);
} else {
try {
logger.debug("updateMaxTermSeen: maxTermSeen = {} > currentTerm = {}, bumping term", maxTermSeen, currentTerm);
ensureTermAtLeast(getLocalNode(), maxTermSeen);
startElection();
} catch (Exception e) {

@ -953,9 +953,18 @@ public class TransportService extends AbstractLifecycleComponent implements Tran
responseHandlers.prune(h -> h.connection().getCacheKey().equals(connection.getCacheKey()));
// callback that an exception happened, but on a different thread since we don't
// want handlers to worry about stack overflows
getExecutorService().execute(() -> {
getExecutorService().execute(new Runnable() {
@Override
public void run() {
for (Transport.ResponseContext holderToNotify : pruned) {
holderToNotify.handler().handleException(new NodeDisconnectedException(connection.getNode(), holderToNotify.action()));
holderToNotify.handler().handleException(
new NodeDisconnectedException(connection.getNode(), holderToNotify.action()));
}
}
@Override
public String toString() {
return "onConnectionClosed(" + connection.getNode() + ")";
}
});
} catch (EsRejectedExecutionException ex) {

@ -44,6 +44,7 @@ import org.elasticsearch.cluster.node.DiscoveryNode.Role;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterApplierService;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Randomness;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
@ -98,6 +99,7 @@ import java.util.function.UnaryOperator;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static java.util.Collections.emptyList;
import static java.util.Collections.emptySet;
import static org.elasticsearch.cluster.coordination.ClusterBootstrapService.BOOTSTRAP_PLACEHOLDER_PREFIX;
import static org.elasticsearch.cluster.coordination.CoordinationStateTests.clusterState;
@ -1046,7 +1048,7 @@ public class CoordinatorTests extends ESTestCase {
Loggers.removeAppender(joinLogger, mockAppender);
mockAppender.stop();
}
assertTrue(newNode.getLastAppliedClusterState().version() == 0);
assertEquals(0, newNode.getLastAppliedClusterState().version());
final ClusterNode detachedNode = newNode.restartedNode(
metaData -> DetachClusterCommand.updateMetaData(metaData),
@ -1055,6 +1057,27 @@ public class CoordinatorTests extends ESTestCase {
cluster1.stabilise();
}
public void testDiscoveryUsesNodesFromLastClusterState() {
final Cluster cluster = new Cluster(randomIntBetween(3, 5));
cluster.runRandomly();
cluster.stabilise();
final ClusterNode partitionedNode = cluster.getAnyNode();
if (randomBoolean()) {
logger.info("--> blackholing {}", partitionedNode);
partitionedNode.blackhole();
} else {
logger.info("--> disconnecting {}", partitionedNode);
partitionedNode.disconnect();
}
cluster.setEmptyUnicastHostsList();
cluster.stabilise();
partitionedNode.heal();
cluster.runRandomly(false);
cluster.stabilise();
}
private static long defaultMillis(Setting<TimeValue> setting) {
return setting.get(Settings.EMPTY).millis() + Cluster.DEFAULT_DELAY_VARIABILITY;
}
@ -1094,6 +1117,8 @@ public class CoordinatorTests extends ESTestCase {
* defaultInt(LEADER_CHECK_RETRY_COUNT_SETTING)
// then wait for a follower to be promoted to leader
+ DEFAULT_ELECTION_DELAY
// perhaps there is an election collision requiring another publication (which times out) and a term bump
+ defaultMillis(PUBLISH_TIMEOUT_SETTING) + DEFAULT_ELECTION_DELAY
// then wait for the new leader to notice that the old leader is unresponsive
+ (defaultMillis(FOLLOWER_CHECK_INTERVAL_SETTING) + defaultMillis(FOLLOWER_CHECK_TIMEOUT_SETTING))
* defaultInt(FOLLOWER_CHECK_RETRY_COUNT_SETTING)
@ -1110,14 +1135,17 @@ public class CoordinatorTests extends ESTestCase {
// TODO does ThreadPool need a node name any more?
Settings.builder().put(NODE_NAME_SETTING.getKey(), "deterministic-task-queue").build(), random());
private boolean disruptStorage;
private final VotingConfiguration initialConfiguration;
private final Set<String> disconnectedNodes = new HashSet<>();
private final Set<String> blackholedNodes = new HashSet<>();
private final Map<Long, ClusterState> committedStatesByVersion = new HashMap<>();
private final Function<DiscoveryNode, MockPersistedState> defaultPersistedStateSupplier =
localNode -> new MockPersistedState(localNode);
private final Function<DiscoveryNode, MockPersistedState> defaultPersistedStateSupplier = MockPersistedState::new;
@Nullable // null means construct a list from all the current nodes
private List<TransportAddress> unicastHostsList;
Cluster(int initialNodeCount) {
this(initialNodeCount, true);
@ -1177,6 +1205,10 @@ public class CoordinatorTests extends ESTestCase {
}
void runRandomly() {
runRandomly(true);
}
void runRandomly(boolean allowReboots) {
// TODO supporting (preserving?) existing disruptions needs implementing if needed, for now we just forbid it
assertThat("may reconnect disconnected nodes, probably unexpected", disconnectedNodes, empty());
@ -1223,7 +1255,7 @@ public class CoordinatorTests extends ESTestCase {
thisStep, autoShrinkVotingConfiguration, clusterNode.getId());
clusterNode.submitSetAutoShrinkVotingConfiguration(autoShrinkVotingConfiguration);
}).run();
} else if (rarely()) {
} else if (allowReboots && rarely()) {
// reboot random node
final ClusterNode clusterNode = getAnyNode();
logger.debug("----> [runRandomly {}] rebooting [{}]", thisStep, clusterNode.getId());
@ -1504,6 +1536,10 @@ public class CoordinatorTests extends ESTestCase {
return getAnyNode();
}
void setEmptyUnicastHostsList() {
unicastHostsList = emptyList();
}
class MockPersistedState implements PersistedState {
private final PersistedState delegate;
private final NodeEnvironment nodeEnvironment;
@ -1678,6 +1714,7 @@ public class CoordinatorTests extends ESTestCase {
}
void close() {
onNode(() -> {
logger.trace("taking down [{}]", localNode);
coordinator.stop();
clusterService.stop();
@ -1685,6 +1722,7 @@ public class CoordinatorTests extends ESTestCase {
clusterService.close();
coordinator.close();
//transportService.close(); // does blocking stuff :/
});
}
ClusterNode restartedNode() {
@ -1866,7 +1904,8 @@ public class CoordinatorTests extends ESTestCase {
}
private List<TransportAddress> provideUnicastHosts(HostsResolver ignored) {
return clusterNodes.stream().map(ClusterNode::getLocalNode).map(DiscoveryNode::getAddress).collect(Collectors.toList());
return unicastHostsList != null ? unicastHostsList
: clusterNodes.stream().map(ClusterNode::getLocalNode).map(DiscoveryNode::getAddress).collect(Collectors.toList());
}
}

@ -32,7 +32,6 @@ import org.elasticsearch.discovery.zen.ZenDiscovery;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.disruption.NetworkDisruption;
import org.elasticsearch.test.disruption.NetworkDisruption.NetworkDisconnect;
import org.elasticsearch.test.disruption.NetworkDisruption.TwoPartitions;
import org.elasticsearch.test.disruption.ServiceDisruptionScheme;
import org.elasticsearch.test.disruption.SlowClusterStateProcessing;
import org.elasticsearch.test.junit.annotations.TestLogging;
@ -40,10 +39,8 @@ import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportService;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
@ -58,77 +55,6 @@ import static org.hamcrest.Matchers.greaterThanOrEqualTo;
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0, transportClientRatio = 0)
public class DiscoveryDisruptionIT extends AbstractDisruptionTestCase {
public void testIsolatedUnicastNodes() throws Exception {
internalCluster().setHostsListContainsOnlyFirstNode(true);
List<String> nodes = startCluster(4);
// Figure out what is the elected master node
final String unicastTarget = nodes.get(0);
Set<String> unicastTargetSide = new HashSet<>();
unicastTargetSide.add(unicastTarget);
Set<String> restOfClusterSide = new HashSet<>();
restOfClusterSide.addAll(nodes);
restOfClusterSide.remove(unicastTarget);
// Forcefully clean temporal response lists on all nodes. Otherwise the node in the unicast host list
// includes all the other nodes that have pinged it and the issue doesn't manifest
clearTemporalResponses();
// Simulate a network issue between the unicast target node and the rest of the cluster
NetworkDisruption networkDisconnect = new NetworkDisruption(new TwoPartitions(unicastTargetSide, restOfClusterSide),
new NetworkDisconnect());
setDisruptionScheme(networkDisconnect);
networkDisconnect.startDisrupting();
// Wait until elected master has removed that the unlucky node...
ensureStableCluster(3, nodes.get(1));
// The isolate master node must report no master, so it starts with pinging
assertNoMaster(unicastTarget);
networkDisconnect.stopDisrupting();
// Wait until the master node sees all 3 nodes again.
ensureStableCluster(4);
}
/**
* A 4 node cluster with m_m_n set to 3 and each node has one unicast endpoint. One node partitions from the master node.
* The temporal unicast responses is empty. When partition is solved the one ping response contains a master node.
* The rejoining node should take this master node and connect.
*/
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/37687")
public void testUnicastSinglePingResponseContainsMaster() throws Exception {
internalCluster().setHostsListContainsOnlyFirstNode(true);
List<String> nodes = startCluster(4);
// Figure out what is the elected master node
final String masterNode = internalCluster().getMasterName();
logger.info("---> legit elected master node={}", masterNode);
List<String> otherNodes = new ArrayList<>(nodes);
otherNodes.remove(masterNode);
otherNodes.remove(nodes.get(0)); // <-- Don't isolate the node that is in the unicast endpoint for all the other nodes.
final String isolatedNode = otherNodes.get(0);
// Forcefully clean temporal response lists on all nodes. Otherwise the node in the unicast host list
// includes all the other nodes that have pinged it and the issue doesn't manifest
clearTemporalResponses();
// Simulate a network issue between the unlucky node and elected master node in both directions.
NetworkDisruption networkDisconnect = new NetworkDisruption(new TwoPartitions(masterNode, isolatedNode),
new NetworkDisconnect());
setDisruptionScheme(networkDisconnect);
networkDisconnect.startDisrupting();
// Wait until elected master has removed that the unlucky node...
ensureStableCluster(3, masterNode);
// The isolate master node must report no master, so it starts with pinging
assertNoMaster(isolatedNode);
networkDisconnect.stopDisrupting();
// Wait until the master node sees all 4 nodes again.
ensureStableCluster(4);
// The elected master shouldn't have changed, since the isolated node never could have elected himself as
// master since m_m_n of 3 could never be satisfied.
assertMaster(masterNode, nodes);
}
/**
* Test cluster join with issues in cluster state publishing *
*/
@ -187,7 +113,7 @@ public class DiscoveryDisruptionIT extends AbstractDisruptionTestCase {
internalCluster().stopRandomNonMasterNode();
}
public void testClusterFormingWithASlowNode() throws Exception {
public void testClusterFormingWithASlowNode() {
SlowClusterStateProcessing disruption = new SlowClusterStateProcessing(random(), 0, 0, 1000, 2000);

@ -28,7 +28,9 @@ import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.disruption.NetworkDisruption;
import org.elasticsearch.test.disruption.NetworkDisruption.NetworkDisconnect;
import org.elasticsearch.test.disruption.NetworkDisruption.NetworkLinkDisruptionType;
import org.elasticsearch.test.disruption.NetworkDisruption.NetworkUnresponsive;
import org.elasticsearch.test.disruption.NetworkDisruption.TwoPartitions;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.elasticsearch.test.transport.MockTransportService.TestPlugin;
@ -39,7 +41,6 @@ import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static java.util.Collections.singleton;
import static org.hamcrest.Matchers.equalTo;
@ -87,8 +88,7 @@ public class StableMasterDisruptionIT extends ESIntegTestCase {
// The unlucky node must report *no* master node, since it can't connect to master and in fact it should
// continuously ping until network failures have been resolved. However
// It may a take a bit before the node detects it has been cut off from the elected master
assertBusy(() -> assertNull(client(unluckyNode).admin().cluster().state(
new ClusterStateRequest().local(true)).get().getState().nodes().getMasterNode()));
ensureNoMaster(unluckyNode);
networkDisconnect.stopDisrupting();
@ -99,48 +99,32 @@ public class StableMasterDisruptionIT extends ESIntegTestCase {
assertThat(internalCluster().getMasterName(), equalTo(masterNode));
}
/**
* Verify that nodes fault detection works after master (re) election
*/
public void testFollowerCheckerDetectsUnresponsiveNodeAfterMasterReelection() throws Exception {
internalCluster().startNodes(4,
Settings.builder()
.put(LeaderChecker.LEADER_CHECK_TIMEOUT_SETTING.getKey(), "1s")
.put(LeaderChecker.LEADER_CHECK_RETRY_COUNT_SETTING.getKey(), "10")
.put(FollowersChecker.FOLLOWER_CHECK_TIMEOUT_SETTING.getKey(), "1s")
.put(FollowersChecker.FOLLOWER_CHECK_RETRY_COUNT_SETTING.getKey(), 1).build());
ensureStableCluster(4);
logger.info("--> stopping current master");
internalCluster().stopCurrentMasterNode();
ensureStableCluster(3);
final String master = internalCluster().getMasterName();
final List<String> nonMasters = Arrays.stream(internalCluster().getNodeNames()).filter(n -> master.equals(n) == false)
.collect(Collectors.toList());
final String isolatedNode = randomFrom(nonMasters);
final String otherNode = nonMasters.get(nonMasters.get(0).equals(isolatedNode) ? 1 : 0);
logger.info("--> isolating [{}]", isolatedNode);
final NetworkDisruption networkDisruption = new NetworkDisruption(new NetworkDisruption.TwoPartitions(
singleton(isolatedNode), Sets.newHashSet(master, otherNode)), new NetworkUnresponsive());
setDisruptionScheme(networkDisruption);
networkDisruption.startDisrupting();
logger.info("--> waiting for master to remove it");
ensureStableCluster(2, master);
networkDisruption.stopDisrupting();
ensureStableCluster(3);
private void ensureNoMaster(String node) throws Exception {
assertBusy(() -> assertNull(client(node).admin().cluster().state(
new ClusterStateRequest().local(true)).get().getState().nodes().getMasterNode()));
}
/**
* Verify that nodes fault detection works after master (re) election
* Verify that nodes fault detection detects a disconnected node after master reelection
*/
public void testFollowerCheckerDetectsDisconnectedNodeAfterMasterReelection() throws Exception {
internalCluster().startNodes(4);
testFollowerCheckerAfterMasterReelection(new NetworkDisconnect(), Settings.EMPTY);
}
/**
* Verify that nodes fault detection detects an unresponsive node after master reelection
*/
public void testFollowerCheckerDetectsUnresponsiveNodeAfterMasterReelection() throws Exception {
testFollowerCheckerAfterMasterReelection(new NetworkUnresponsive(), Settings.builder()
.put(LeaderChecker.LEADER_CHECK_TIMEOUT_SETTING.getKey(), "1s")
.put(LeaderChecker.LEADER_CHECK_RETRY_COUNT_SETTING.getKey(), "4")
.put(FollowersChecker.FOLLOWER_CHECK_TIMEOUT_SETTING.getKey(), "1s")
.put(FollowersChecker.FOLLOWER_CHECK_RETRY_COUNT_SETTING.getKey(), 1).build());
}
private void testFollowerCheckerAfterMasterReelection(NetworkLinkDisruptionType networkLinkDisruptionType,
Settings settings) throws Exception {
internalCluster().startNodes(4, settings);
ensureStableCluster(4);
logger.info("--> stopping current master");
@ -156,13 +140,14 @@ public class StableMasterDisruptionIT extends ESIntegTestCase {
logger.info("--> isolating [{}]", isolatedNode);
final NetworkDisruption networkDisruption = new NetworkDisruption(new NetworkDisruption.TwoPartitions(
singleton(isolatedNode), Stream.of(master, otherNode).collect(Collectors.toSet())), new NetworkDisconnect());
final NetworkDisruption networkDisruption = new NetworkDisruption(new TwoPartitions(
singleton(isolatedNode), Sets.newHashSet(master, otherNode)), networkLinkDisruptionType);
setDisruptionScheme(networkDisruption);
networkDisruption.startDisrupting();
logger.info("--> waiting for master to remove it");
ensureStableCluster(2, master);
ensureNoMaster(isolatedNode);
networkDisruption.stopDisrupting();
ensureStableCluster(3);

@ -34,12 +34,20 @@ public class MockSinglePrioritizingExecutor extends PrioritizedEsThreadPoolExecu
r -> new Thread() {
@Override
public void start() {
deterministicTaskQueue.scheduleNow(() -> {
deterministicTaskQueue.scheduleNow(new Runnable() {
@Override
public void run() {
try {
r.run();
} catch (KillWorkerError kwe) {
// hacks everywhere
}
}
@Override
public String toString() {
return r.toString();
}
});
}
},

@ -241,9 +241,6 @@ public final class InternalTestCluster extends TestCluster {
private ServiceDisruptionScheme activeDisruptionScheme;
private Function<Client, Client> clientWrapper;
// If set to true only the first node in the cluster will be made a unicast node
private boolean hostsListContainsOnlyFirstNode;
private int bootstrapMasterNodeIndex = -1;
public InternalTestCluster(
@ -1667,9 +1664,6 @@ public final class InternalTestCluster extends TestCluster {
synchronized (discoveryFileMutex) {
try {
Stream<NodeAndClient> unicastHosts = Stream.concat(nodes.values().stream(), newNodes.stream());
if (hostsListContainsOnlyFirstNode) {
unicastHosts = unicastHosts.limit(1L);
}
List<String> discoveryFileContents = unicastHosts.map(
nac -> nac.node.injector().getInstance(TransportService.class)
).filter(Objects::nonNull)
@ -2207,10 +2201,6 @@ public final class InternalTestCluster extends TestCluster {
return filterNodes(nodes, NodeAndClient::isMasterEligible).size();
}
public void setHostsListContainsOnlyFirstNode(boolean hostsListContainsOnlyFirstNode) {
this.hostsListContainsOnlyFirstNode = hostsListContainsOnlyFirstNode;
}
public void setDisruptionScheme(ServiceDisruptionScheme scheme) {
assert activeDisruptionScheme == null :
"there is already and active disruption [" + activeDisruptionScheme + "]. call clearDisruptionScheme first";

@ -113,7 +113,7 @@ public abstract class DisruptableMockTransport extends MockTransport {
assert destinationTransport.getLocalNode().equals(getLocalNode()) == false :
"non-local message from " + getLocalNode() + " to itself";
execute(action, new Runnable() {
destinationTransport.execute(action, new Runnable() {
@Override
public void run() {
switch (getConnectionStatus(destinationTransport.getLocalNode())) {