Prevent watcher from starting after node has shut down (elastic/x-pack-elasticsearch#3709)

Some tests seem to be pretty flaky due to concurrent watcher restarts.
This change makes sure we never restart watcher once the node is shutting down.

Original commit: elastic/x-pack-elasticsearch@f0bed7269b
This commit is contained in:
Simon Willnauer 2018-01-24 14:16:05 +01:00 committed by GitHub
parent 1ebccfcf50
commit 3b7d1f4d98
2 changed files with 43 additions and 3 deletions

View File

@ -44,6 +44,8 @@ public class WatcherLifeCycleService extends AbstractComponent implements Cluste
private final ExecutorService executor;
private AtomicReference<List<String>> previousAllocationIds = new AtomicReference<>(Collections.emptyList());
private volatile WatcherMetaData watcherMetaData;
private volatile boolean shutDown = false; // indicates that the node has been shutdown and we should never start watcher after this.
WatcherLifeCycleService(Settings settings, ThreadPool threadPool, ClusterService clusterService,
WatcherService watcherService) {
@ -56,17 +58,25 @@ public class WatcherLifeCycleService extends AbstractComponent implements Cluste
clusterService.addLifecycleListener(new LifecycleListener() {
@Override
public void beforeStop() {
stop("shutdown initiated");
shutDown();
}
});
watcherMetaData = new WatcherMetaData(!settings.getAsBoolean("xpack.watcher.start_immediately", true));
}
public void stop(String reason) {
public synchronized void stop(String reason) {
watcherService.stop(reason);
}
/**
 * Marks this service as shut down and then stops watcher with the reason "shutdown initiated".
 * Invoked from the cluster service lifecycle listener's {@code beforeStop()} callback.
 * Once the flag is set, {@code start(...)} and {@code clusterChanged(...)} return early, so watcher
 * is not started again by this service.
 */
synchronized void shutDown() {
// set the flag before stopping so that the guards in start/clusterChanged already see it
shutDown = true;
stop("shutdown initiated");
}
private synchronized void start(ClusterState state, boolean manual) {
if (shutDown) {
return;
}
WatcherState watcherState = watcherService.state();
if (watcherState != WatcherState.STOPPED) {
logger.debug("not starting watcher. watcher can only start if its current state is [{}], but its current state now is [{}]",
@ -109,7 +119,7 @@ public class WatcherLifeCycleService extends AbstractComponent implements Cluste
*/
@Override
public void clusterChanged(ClusterChangedEvent event) {
if (event.state().blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK)) {
if (event.state().blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK) || shutDown) {
// wait until the gateway has recovered from disk, otherwise we may think that we do not have a .watches and
// a .triggered_watches index, although they may just not have been restored from the cluster state on disk yet;
// also do nothing once this node has been shut down
return;

View File

@ -123,6 +123,36 @@ public class WatcherLifeCycleServiceTests extends ESTestCase {
verify(watcherService, never()).start(any(ClusterState.class));
}
/**
 * Checks that watcher is started on a cluster state change, stopped exactly once by {@code shutDown()},
 * and not started again by a later cluster state change, even though the mocked service reports
 * {@code STOPPED} at that point.
 */
public void testShutdown() throws Exception {
    // assemble the cluster state step by step: one local master node, the watch index routing table
    // and the three watcher index templates
    final IndexRoutingTable watchesRouting = IndexRoutingTable.builder(new Index(Watch.INDEX, "foo")).build();
    final ClusterState.Builder stateBuilder = ClusterState.builder(new ClusterName("my-cluster"));
    stateBuilder.nodes(new DiscoveryNodes.Builder().masterNodeId("node_1").localNodeId("node_1").add(newNode("node_1")));
    stateBuilder.routingTable(RoutingTable.builder().add(watchesRouting).build());

    final MetaData.Builder templates = MetaData.builder();
    templates.put(IndexTemplateMetaData.builder(HISTORY_TEMPLATE_NAME).patterns(randomIndexPatterns()));
    templates.put(IndexTemplateMetaData.builder(TRIGGERED_TEMPLATE_NAME).patterns(randomIndexPatterns()));
    templates.put(IndexTemplateMetaData.builder(WATCHES_TEMPLATE_NAME).patterns(randomIndexPatterns()));
    stateBuilder.metaData(templates.build());

    final ClusterState state = stateBuilder.build();

    // phase 1: a stopped watcher with a valid cluster state gets started, nothing is stopped
    when(watcherService.validate(state)).thenReturn(true);
    when(watcherService.state()).thenReturn(WatcherState.STOPPED);
    lifeCycleService.clusterChanged(new ClusterChangedEvent("foo", state, state));
    verify(watcherService, times(1)).start(any(ClusterState.class));
    verify(watcherService, never()).stop(anyString());

    // phase 2: shutting down stops the running watcher exactly once and triggers no additional start
    when(watcherService.state()).thenReturn(WatcherState.STARTED);
    lifeCycleService.shutDown();
    verify(watcherService, times(1)).start(any(ClusterState.class));
    verify(watcherService, times(1)).stop(eq("shutdown initiated"));

    // phase 3: after shutdown another cluster state change must leave the start/stop counts untouched
    when(watcherService.state()).thenReturn(WatcherState.STOPPED);
    lifeCycleService.clusterChanged(new ClusterChangedEvent("any", state, state));
    verify(watcherService, times(1)).start(any(ClusterState.class));
    verify(watcherService, times(1)).stop(eq("shutdown initiated"));
}
public void testManualStartStop() throws Exception {
IndexRoutingTable watchRoutingTable = IndexRoutingTable.builder(new Index(Watch.INDEX, "foo")).build();
ClusterState clusterState = ClusterState.builder(new ClusterName("my-cluster"))