diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinationStateTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinationStateTests.java index 0ce97e3f19f..de661d0690a 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinationStateTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinationStateTests.java @@ -811,6 +811,7 @@ public class CoordinationStateTests extends ESTestCase { } public static class InMemoryPersistedState implements PersistedState { + // TODO add support and tests for behaviour with persistence-layer failures private long currentTerm; private ClusterState acceptedState; diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java index eec4bf41ecf..5230c7e5294 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java @@ -49,6 +49,8 @@ import org.elasticsearch.transport.TransportService; import org.hamcrest.Matcher; import org.junit.Before; +import java.io.IOException; +import java.io.UncheckedIOException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -522,6 +524,7 @@ public class CoordinatorTests extends ESTestCase { final DeterministicTaskQueue deterministicTaskQueue = new DeterministicTaskQueue( // TODO does ThreadPool need a node name any more? Settings.builder().put(NODE_NAME_SETTING.getKey(), "deterministic-task-queue").build(), random()); + private boolean disruptStorage; private final VotingConfiguration initialConfiguration; private final Set disconnectedNodes = new HashSet<>(); @@ -566,6 +569,7 @@ public class CoordinatorTests extends ESTestCase { logger.info("--> start of safety phase of at least [{}] steps", randomSteps); deterministicTaskQueue.setExecutionDelayVariabilityMillis(EXTREME_DELAY_VARIABILITY); + disruptStorage = true; int step = 0; long finishTime = -1; @@ -636,7 +640,7 @@ public class CoordinatorTests extends ESTestCase { // - reboot a node // - abdicate leadership - } catch (CoordinationStateRejectedException ignored) { + } catch (CoordinationStateRejectedException | UncheckedIOException ignored) { // This is ok: it just means a message couldn't currently be handled. } @@ -645,6 +649,7 @@ public class CoordinatorTests extends ESTestCase { disconnectedNodes.clear(); blackholedNodes.clear(); + disruptStorage = false; } private void assertConsistentStates() { @@ -674,6 +679,7 @@ public class CoordinatorTests extends ESTestCase { void stabilise(long stabilisationDurationMillis) { assertThat("stabilisation requires default delay variability (and proper cleanup of raised variability)", deterministicTaskQueue.getExecutionDelayVariabilityMillis(), lessThanOrEqualTo(DEFAULT_DELAY_VARIABILITY)); + assertFalse("stabilisation requires stable storage", disruptStorage); if (clusterNodes.stream().allMatch(n -> n.coordinator.getLastAcceptedState().getLastAcceptedConfiguration().isEmpty())) { assertThat("setting initial configuration may fail with disconnected nodes", disconnectedNodes, empty()); @@ -826,6 +832,37 @@ public class CoordinatorTests extends ESTestCase { return getAnyNode(); } + class MockPersistedState extends InMemoryPersistedState { + MockPersistedState(long term, ClusterState acceptedState) { + super(term, acceptedState); + } + + private void possiblyFail(String description) { + if (disruptStorage && rarely()) { + // TODO revisit this when we've decided how PersistedState should throw exceptions + if (randomBoolean()) { + throw new UncheckedIOException(new IOException("simulated IO exception [" + description + ']')); + } else { + throw new CoordinationStateRejectedException("simulated IO exception [" + description + ']'); + } + } + } + + @Override + public void setCurrentTerm(long currentTerm) { + possiblyFail("before writing term of " + currentTerm); + super.setCurrentTerm(currentTerm); + // TODO possiblyFail() here if that's a failure mode of the storage layer + } + + @Override + public void setLastAcceptedState(ClusterState clusterState) { + possiblyFail("before writing last-accepted state of term=" + clusterState.term() + ", version=" + clusterState.version()); + super.setLastAcceptedState(clusterState); + // TODO possiblyFail() here if that's a failure mode of the storage layer + } + } + class ClusterNode extends AbstractComponent { private final int nodeIndex; private Coordinator coordinator; @@ -841,7 +878,7 @@ public class CoordinatorTests extends ESTestCase { super(Settings.builder().put(NODE_NAME_SETTING.getKey(), nodeIdFromIndex(nodeIndex)).build()); this.nodeIndex = nodeIndex; localNode = createDiscoveryNode(); - persistedState = new InMemoryPersistedState(0L, + persistedState = new MockPersistedState(0L, clusterState(0L, 0L, localNode, VotingConfiguration.EMPTY_CONFIG, VotingConfiguration.EMPTY_CONFIG, 0L)); onNode(localNode, this::setUp).run(); }