Tests: Check watcher state before starting/stopping (elastic/x-pack-elasticsearch#4362)

This changes the behaviour of AbstractWatcherIntegrationTestCase and its
startWatcher/stopWatcher methods. Instead of checking for the target
state and just starting or stopping if it does not match, the methods
now wait for certain states to be reached before starting or stopping.

This will fix test failures where a failure is started instead of
stopped or vice versa.

Original commit: elastic/x-pack-elasticsearch@f0b0954803
This commit is contained in:
Alexander Reelsen 2018-04-16 18:53:04 +02:00 committed by GitHub
parent fd7b3e4d0b
commit 25895e0a3c
1 changed files with 43 additions and 12 deletions

View File

@ -70,7 +70,6 @@ import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Locale;
import java.util.Set; import java.util.Set;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer; import java.util.function.Consumer;
@ -85,7 +84,6 @@ import static org.elasticsearch.xpack.core.watcher.support.WatcherIndexTemplateR
import static org.elasticsearch.xpack.core.watcher.support.WatcherIndexTemplateRegistryField.WATCHES_TEMPLATE_NAME; import static org.elasticsearch.xpack.core.watcher.support.WatcherIndexTemplateRegistryField.WATCHES_TEMPLATE_NAME;
import static org.hamcrest.Matchers.emptyArray; import static org.hamcrest.Matchers.emptyArray;
import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.everyItem;
import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.core.Is.is; import static org.hamcrest.core.Is.is;
@ -452,14 +450,30 @@ public abstract class AbstractWatcherIntegrationTestCase extends ESIntegTestCase
.collect(Collectors.toList()); .collect(Collectors.toList());
List<WatcherState> states = currentStatesFromStatsRequest.stream().map(Tuple::v2).collect(Collectors.toList()); List<WatcherState> states = currentStatesFromStatsRequest.stream().map(Tuple::v2).collect(Collectors.toList());
boolean isStateStarted = states.stream().allMatch(w -> w == WatcherState.STARTED); logger.info("waiting to start watcher, current states {}", currentStatesFromStatsRequest);
if (isStateStarted == false){
assertAcked(watcherClient().prepareWatchService().start().get()); boolean isAllStateStarted = states.stream().allMatch(w -> w == WatcherState.STARTED);
if (isAllStateStarted) {
return;
} }
String message = String.format(Locale.ROOT, "Expected watcher to be started, but state was %s", currentStatesFromStatsRequest); boolean isAnyStopping = states.stream().anyMatch(w -> w == WatcherState.STOPPING);
assertThat(message, states, everyItem(is(WatcherState.STARTED))); if (isAnyStopping) {
assertThat(watcherStatsResponse.watcherMetaData().manuallyStopped(), is(false)); throw new AssertionError("at least one node is in state stopping, waiting to be stopped");
}
boolean isAllStateStopped = states.stream().allMatch(w -> w == WatcherState.STOPPED);
if (isAllStateStopped) {
assertAcked(watcherClient().prepareWatchService().start().get());
throw new AssertionError("all nodes are stopped, restarting");
}
boolean isAnyStarting = states.stream().anyMatch(w -> w == WatcherState.STARTING);
if (isAnyStarting) {
throw new AssertionError("at least one node is in state starting, waiting to be stopped");
}
throw new AssertionError("unexpected state, retrying with next run");
}); });
} }
@ -481,13 +495,30 @@ public abstract class AbstractWatcherIntegrationTestCase extends ESIntegTestCase
.collect(Collectors.toList()); .collect(Collectors.toList());
List<WatcherState> states = currentStatesFromStatsRequest.stream().map(Tuple::v2).collect(Collectors.toList()); List<WatcherState> states = currentStatesFromStatsRequest.stream().map(Tuple::v2).collect(Collectors.toList());
boolean isStateStopped = states.stream().allMatch(w -> w == WatcherState.STOPPED); logger.info("waiting to stop watcher, current states {}", currentStatesFromStatsRequest);
if (isStateStopped == false){
boolean isAllStateStarted = states.stream().allMatch(w -> w == WatcherState.STARTED);
if (isAllStateStarted) {
assertAcked(watcherClient().prepareWatchService().stop().get()); assertAcked(watcherClient().prepareWatchService().stop().get());
throw new AssertionError("all nodes are started, stopping");
} }
String message = String.format(Locale.ROOT, "Expected watcher to be stopped, but state was %s", currentStatesFromStatsRequest); boolean isAnyStopping = states.stream().anyMatch(w -> w == WatcherState.STOPPING);
assertThat(message, states, everyItem(is(WatcherState.STOPPED))); if (isAnyStopping) {
throw new AssertionError("at least one node is in state stopping, waiting to be stopped");
}
boolean isAllStateStopped = states.stream().allMatch(w -> w == WatcherState.STOPPED);
if (isAllStateStopped) {
return;
}
boolean isAnyStarting = states.stream().anyMatch(w -> w == WatcherState.STARTING);
if (isAnyStarting) {
throw new AssertionError("at least one node is in state starting, waiting to be started before stopping");
}
throw new AssertionError("unexpected state, retrying with next run");
}); });
} }