Add storage-layer disruptions to CoordinatorTests (#34347)
Today we assume the storage layer operates perfectly in CoordinatorTests, which means we are not testing that the system's invariants are preserved if the storage layer fails for some reason. This change injects (rare) storage-layer failures during the safety phase to cover these cases.
This commit is contained in:
parent
d98199df14
commit
8b9fa55c93
|
@ -811,6 +811,7 @@ public class CoordinationStateTests extends ESTestCase {
|
||||||
}
|
}
|
||||||
|
|
||||||
public static class InMemoryPersistedState implements PersistedState {
|
public static class InMemoryPersistedState implements PersistedState {
|
||||||
|
// TODO add support and tests for behaviour with persistence-layer failures
|
||||||
|
|
||||||
private long currentTerm;
|
private long currentTerm;
|
||||||
private ClusterState acceptedState;
|
private ClusterState acceptedState;
|
||||||
|
|
|
@ -49,6 +49,8 @@ import org.elasticsearch.transport.TransportService;
|
||||||
import org.hamcrest.Matcher;
|
import org.hamcrest.Matcher;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.io.UncheckedIOException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
|
@ -522,6 +524,7 @@ public class CoordinatorTests extends ESTestCase {
|
||||||
final DeterministicTaskQueue deterministicTaskQueue = new DeterministicTaskQueue(
|
final DeterministicTaskQueue deterministicTaskQueue = new DeterministicTaskQueue(
|
||||||
// TODO does ThreadPool need a node name any more?
|
// TODO does ThreadPool need a node name any more?
|
||||||
Settings.builder().put(NODE_NAME_SETTING.getKey(), "deterministic-task-queue").build(), random());
|
Settings.builder().put(NODE_NAME_SETTING.getKey(), "deterministic-task-queue").build(), random());
|
||||||
|
private boolean disruptStorage;
|
||||||
private final VotingConfiguration initialConfiguration;
|
private final VotingConfiguration initialConfiguration;
|
||||||
|
|
||||||
private final Set<String> disconnectedNodes = new HashSet<>();
|
private final Set<String> disconnectedNodes = new HashSet<>();
|
||||||
|
@ -566,6 +569,7 @@ public class CoordinatorTests extends ESTestCase {
|
||||||
logger.info("--> start of safety phase of at least [{}] steps", randomSteps);
|
logger.info("--> start of safety phase of at least [{}] steps", randomSteps);
|
||||||
|
|
||||||
deterministicTaskQueue.setExecutionDelayVariabilityMillis(EXTREME_DELAY_VARIABILITY);
|
deterministicTaskQueue.setExecutionDelayVariabilityMillis(EXTREME_DELAY_VARIABILITY);
|
||||||
|
disruptStorage = true;
|
||||||
int step = 0;
|
int step = 0;
|
||||||
long finishTime = -1;
|
long finishTime = -1;
|
||||||
|
|
||||||
|
@ -636,7 +640,7 @@ public class CoordinatorTests extends ESTestCase {
|
||||||
// - reboot a node
|
// - reboot a node
|
||||||
// - abdicate leadership
|
// - abdicate leadership
|
||||||
|
|
||||||
} catch (CoordinationStateRejectedException ignored) {
|
} catch (CoordinationStateRejectedException | UncheckedIOException ignored) {
|
||||||
// This is ok: it just means a message couldn't currently be handled.
|
// This is ok: it just means a message couldn't currently be handled.
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -645,6 +649,7 @@ public class CoordinatorTests extends ESTestCase {
|
||||||
|
|
||||||
disconnectedNodes.clear();
|
disconnectedNodes.clear();
|
||||||
blackholedNodes.clear();
|
blackholedNodes.clear();
|
||||||
|
disruptStorage = false;
|
||||||
}
|
}
|
||||||
|
|
||||||
private void assertConsistentStates() {
|
private void assertConsistentStates() {
|
||||||
|
@ -674,6 +679,7 @@ public class CoordinatorTests extends ESTestCase {
|
||||||
void stabilise(long stabilisationDurationMillis) {
|
void stabilise(long stabilisationDurationMillis) {
|
||||||
assertThat("stabilisation requires default delay variability (and proper cleanup of raised variability)",
|
assertThat("stabilisation requires default delay variability (and proper cleanup of raised variability)",
|
||||||
deterministicTaskQueue.getExecutionDelayVariabilityMillis(), lessThanOrEqualTo(DEFAULT_DELAY_VARIABILITY));
|
deterministicTaskQueue.getExecutionDelayVariabilityMillis(), lessThanOrEqualTo(DEFAULT_DELAY_VARIABILITY));
|
||||||
|
assertFalse("stabilisation requires stable storage", disruptStorage);
|
||||||
|
|
||||||
if (clusterNodes.stream().allMatch(n -> n.coordinator.getLastAcceptedState().getLastAcceptedConfiguration().isEmpty())) {
|
if (clusterNodes.stream().allMatch(n -> n.coordinator.getLastAcceptedState().getLastAcceptedConfiguration().isEmpty())) {
|
||||||
assertThat("setting initial configuration may fail with disconnected nodes", disconnectedNodes, empty());
|
assertThat("setting initial configuration may fail with disconnected nodes", disconnectedNodes, empty());
|
||||||
|
@ -826,6 +832,37 @@ public class CoordinatorTests extends ESTestCase {
|
||||||
return getAnyNode();
|
return getAnyNode();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
class MockPersistedState extends InMemoryPersistedState {
|
||||||
|
MockPersistedState(long term, ClusterState acceptedState) {
|
||||||
|
super(term, acceptedState);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void possiblyFail(String description) {
|
||||||
|
if (disruptStorage && rarely()) {
|
||||||
|
// TODO revisit this when we've decided how PersistedState should throw exceptions
|
||||||
|
if (randomBoolean()) {
|
||||||
|
throw new UncheckedIOException(new IOException("simulated IO exception [" + description + ']'));
|
||||||
|
} else {
|
||||||
|
throw new CoordinationStateRejectedException("simulated IO exception [" + description + ']');
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void setCurrentTerm(long currentTerm) {
|
||||||
|
possiblyFail("before writing term of " + currentTerm);
|
||||||
|
super.setCurrentTerm(currentTerm);
|
||||||
|
// TODO possiblyFail() here if that's a failure mode of the storage layer
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void setLastAcceptedState(ClusterState clusterState) {
|
||||||
|
possiblyFail("before writing last-accepted state of term=" + clusterState.term() + ", version=" + clusterState.version());
|
||||||
|
super.setLastAcceptedState(clusterState);
|
||||||
|
// TODO possiblyFail() here if that's a failure mode of the storage layer
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
class ClusterNode extends AbstractComponent {
|
class ClusterNode extends AbstractComponent {
|
||||||
private final int nodeIndex;
|
private final int nodeIndex;
|
||||||
private Coordinator coordinator;
|
private Coordinator coordinator;
|
||||||
|
@ -841,7 +878,7 @@ public class CoordinatorTests extends ESTestCase {
|
||||||
super(Settings.builder().put(NODE_NAME_SETTING.getKey(), nodeIdFromIndex(nodeIndex)).build());
|
super(Settings.builder().put(NODE_NAME_SETTING.getKey(), nodeIdFromIndex(nodeIndex)).build());
|
||||||
this.nodeIndex = nodeIndex;
|
this.nodeIndex = nodeIndex;
|
||||||
localNode = createDiscoveryNode();
|
localNode = createDiscoveryNode();
|
||||||
persistedState = new InMemoryPersistedState(0L,
|
persistedState = new MockPersistedState(0L,
|
||||||
clusterState(0L, 0L, localNode, VotingConfiguration.EMPTY_CONFIG, VotingConfiguration.EMPTY_CONFIG, 0L));
|
clusterState(0L, 0L, localNode, VotingConfiguration.EMPTY_CONFIG, VotingConfiguration.EMPTY_CONFIG, 0L));
|
||||||
onNode(localNode, this::setUp).run();
|
onNode(localNode, this::setUp).run();
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue