From 8b9fa55c93cf8bcd859daf285e0ec3947d1f51f3 Mon Sep 17 00:00:00 2001 From: David Turner Date: Sat, 13 Oct 2018 14:24:15 +0100 Subject: [PATCH] Add storage-layer disruptions to CoordinatorTests (#34347) Today we assume the storage layer operates perfectly in CoordinatorTests, which means we are not testing that the system's invariants are preserved if the storage layer fails for some reason. This change injects (rare) storage-layer failures during the safety phase to cover these cases. --- .../coordination/CoordinationStateTests.java | 1 + .../coordination/CoordinatorTests.java | 41 ++++++++++++++++++- 2 files changed, 40 insertions(+), 2 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinationStateTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinationStateTests.java index 0ce97e3f19f..de661d0690a 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinationStateTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinationStateTests.java @@ -811,6 +811,7 @@ public class CoordinationStateTests extends ESTestCase { } public static class InMemoryPersistedState implements PersistedState { + // TODO add support and tests for behaviour with persistence-layer failures private long currentTerm; private ClusterState acceptedState; diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java index eec4bf41ecf..5230c7e5294 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java @@ -49,6 +49,8 @@ import org.elasticsearch.transport.TransportService; import org.hamcrest.Matcher; import org.junit.Before; +import java.io.IOException; +import java.io.UncheckedIOException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -522,6 +524,7 @@ public class CoordinatorTests extends ESTestCase { final DeterministicTaskQueue deterministicTaskQueue = new DeterministicTaskQueue( // TODO does ThreadPool need a node name any more? Settings.builder().put(NODE_NAME_SETTING.getKey(), "deterministic-task-queue").build(), random()); + private boolean disruptStorage; private final VotingConfiguration initialConfiguration; private final Set disconnectedNodes = new HashSet<>(); @@ -566,6 +569,7 @@ public class CoordinatorTests extends ESTestCase { logger.info("--> start of safety phase of at least [{}] steps", randomSteps); deterministicTaskQueue.setExecutionDelayVariabilityMillis(EXTREME_DELAY_VARIABILITY); + disruptStorage = true; int step = 0; long finishTime = -1; @@ -636,7 +640,7 @@ public class CoordinatorTests extends ESTestCase { // - reboot a node // - abdicate leadership - } catch (CoordinationStateRejectedException ignored) { + } catch (CoordinationStateRejectedException | UncheckedIOException ignored) { // This is ok: it just means a message couldn't currently be handled. } @@ -645,6 +649,7 @@ public class CoordinatorTests extends ESTestCase { disconnectedNodes.clear(); blackholedNodes.clear(); + disruptStorage = false; } private void assertConsistentStates() { @@ -674,6 +679,7 @@ public class CoordinatorTests extends ESTestCase { void stabilise(long stabilisationDurationMillis) { assertThat("stabilisation requires default delay variability (and proper cleanup of raised variability)", deterministicTaskQueue.getExecutionDelayVariabilityMillis(), lessThanOrEqualTo(DEFAULT_DELAY_VARIABILITY)); + assertFalse("stabilisation requires stable storage", disruptStorage); if (clusterNodes.stream().allMatch(n -> n.coordinator.getLastAcceptedState().getLastAcceptedConfiguration().isEmpty())) { assertThat("setting initial configuration may fail with disconnected nodes", disconnectedNodes, empty()); @@ -826,6 +832,37 @@ public class CoordinatorTests extends ESTestCase { return getAnyNode(); } + class MockPersistedState extends InMemoryPersistedState { + MockPersistedState(long term, ClusterState acceptedState) { + super(term, acceptedState); + } + + private void possiblyFail(String description) { + if (disruptStorage && rarely()) { + // TODO revisit this when we've decided how PersistedState should throw exceptions + if (randomBoolean()) { + throw new UncheckedIOException(new IOException("simulated IO exception [" + description + ']')); + } else { + throw new CoordinationStateRejectedException("simulated IO exception [" + description + ']'); + } + } + } + + @Override + public void setCurrentTerm(long currentTerm) { + possiblyFail("before writing term of " + currentTerm); + super.setCurrentTerm(currentTerm); + // TODO possiblyFail() here if that's a failure mode of the storage layer + } + + @Override + public void setLastAcceptedState(ClusterState clusterState) { + possiblyFail("before writing last-accepted state of term=" + clusterState.term() + ", version=" + clusterState.version()); + super.setLastAcceptedState(clusterState); + // TODO possiblyFail() here if that's a failure mode of the storage layer + } + } + class ClusterNode extends AbstractComponent { private final int nodeIndex; private Coordinator coordinator; @@ -841,7 +878,7 @@ public class CoordinatorTests extends ESTestCase { super(Settings.builder().put(NODE_NAME_SETTING.getKey(), nodeIdFromIndex(nodeIndex)).build()); this.nodeIndex = nodeIndex; localNode = createDiscoveryNode(); - persistedState = new InMemoryPersistedState(0L, + persistedState = new MockPersistedState(0L, clusterState(0L, 0L, localNode, VotingConfiguration.EMPTY_CONFIG, VotingConfiguration.EMPTY_CONFIG, 0L)); onNode(localNode, this::setUp).run(); }