Watcher add stopped listener (#43939) (#45670)

When Watcher is stopped and there are still outstanding watches running
Watcher will report it self as stopped. In normal cases, this is not problematic.

However, for integration tests Watcher is started and stopped between
each test to help ensure a clean slate for each test. The tests are blocking
only on the stopped state and make an implicit assumption that all watches are
finished if the Watcher is stopped. This is an incorrect assumption since
Stopped really means, "I will not accept any more watches". This can lead to
un-predictable behavior in the tests such as message : "Watch is already queued
in thread pool" and state: "not_executed_already_queued".
This can also change the .watcher-history if watches linger between tests.

This commit changes the semantics of a manual stopping watcher to now mean:
"I will not accept any more watches AND all running watches are complete".
There is now an intermediary step "Stopping" and callback to allow transition
to a "Stopped" state when all Watches have completed.

Additionally since this impacts how long the tests will block waiting for a
"Stopped" state, the timeout has been increased.

Related: #42409
This commit is contained in:
Jake Landis 2019-08-22 10:54:29 -05:00 committed by GitHub
parent bfddaaa2ae
commit 1dab73929f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 110 additions and 43 deletions

View File

@ -5,6 +5,8 @@
*/
package org.elasticsearch.xpack.watcher;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateListener;
@ -25,6 +27,7 @@ import org.elasticsearch.xpack.watcher.watch.WatchStoreUtils;
import java.util.Collections;
import java.util.Comparator;
import java.util.EnumSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
@ -35,10 +38,12 @@ import static org.elasticsearch.cluster.routing.ShardRoutingState.STARTED;
public class WatcherLifeCycleService implements ClusterStateListener {
private static final Logger logger = LogManager.getLogger(WatcherLifeCycleService.class);
private final AtomicReference<WatcherState> state = new AtomicReference<>(WatcherState.STARTED);
private final AtomicReference<List<ShardRouting>> previousShardRoutings = new AtomicReference<>(Collections.emptyList());
private volatile boolean shutDown = false; // indicates that the node has been shutdown and we should never start watcher after this.
private volatile WatcherService watcherService;
private final EnumSet<WatcherState> stopStates = EnumSet.of(WatcherState.STOPPED, WatcherState.STOPPING);
WatcherLifeCycleService(ClusterService clusterService, WatcherService watcherService) {
this.watcherService = watcherService;
@ -57,8 +62,10 @@ public class WatcherLifeCycleService implements ClusterStateListener {
this.state.set(WatcherState.STOPPING);
shutDown = true;
clearAllocationIds();
watcherService.shutDown();
this.state.set(WatcherState.STOPPED);
watcherService.shutDown(() -> {
this.state.set(WatcherState.STOPPED);
logger.info("watcher has stopped and shutdown");
});
}
/**
@ -88,9 +95,10 @@ public class WatcherLifeCycleService implements ClusterStateListener {
}
boolean isWatcherStoppedManually = isWatcherStoppedManually(event.state());
boolean isStoppedOrStopping = stopStates.contains(this.state.get());
// if this is not a data node, we need to start it ourselves possibly
if (event.state().nodes().getLocalNode().isDataNode() == false &&
isWatcherStoppedManually == false && this.state.get() == WatcherState.STOPPED) {
isWatcherStoppedManually == false && isStoppedOrStopping) {
this.state.set(WatcherState.STARTING);
watcherService.start(event.state(), () -> this.state.set(WatcherState.STARTED));
return;
@ -99,8 +107,20 @@ public class WatcherLifeCycleService implements ClusterStateListener {
if (isWatcherStoppedManually) {
if (this.state.get() == WatcherState.STARTED) {
clearAllocationIds();
watcherService.stop("watcher manually marked to shutdown by cluster state update");
this.state.set(WatcherState.STOPPED);
boolean stopping = this.state.compareAndSet(WatcherState.STARTED, WatcherState.STOPPING);
if (stopping) {
//waiting to set state to stopped until after all currently running watches are finished
watcherService.stop("watcher manually marked to shutdown by cluster state update", () -> {
//only transition from stopping -> stopped (which may not be the case if restarted quickly)
boolean stopped = state.compareAndSet(WatcherState.STOPPING, WatcherState.STOPPED);
if (stopped) {
logger.info("watcher has stopped");
} else {
logger.info("watcher has not been stopped. not currently in a stopping state, current state [{}]", state.get());
}
});
}
}
return;
}
@ -142,7 +162,7 @@ public class WatcherLifeCycleService implements ClusterStateListener {
previousShardRoutings.set(localAffectedShardRoutings);
if (state.get() == WatcherState.STARTED) {
watcherService.reload(event.state(), "new local watcher shard allocation ids");
} else if (state.get() == WatcherState.STOPPED) {
} else if (isStoppedOrStopping) {
this.state.set(WatcherState.STARTING);
watcherService.start(event.state(), () -> this.state.set(WatcherState.STARTED));
}

View File

@ -35,6 +35,7 @@ import org.elasticsearch.search.sort.SortBuilders;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.ClientHelper;
import org.elasticsearch.xpack.core.upgrade.UpgradeField;
import org.elasticsearch.xpack.core.watcher.WatcherState;
import org.elasticsearch.xpack.core.watcher.execution.TriggeredWatchStoreField;
import org.elasticsearch.xpack.core.watcher.watch.Watch;
import org.elasticsearch.xpack.watcher.execution.ExecutionService;
@ -144,24 +145,29 @@ public class WatcherService {
}
/**
* Stops the watcher service and marks its services as paused
* Stops the watcher service and marks its services as paused. Callers should set the Watcher state to {@link WatcherState#STOPPING}
* prior to calling this method.
*
* @param stoppedListener The listener that will set Watcher state to: {@link WatcherState#STOPPED}, may not be {@code null}
*/
public void stop(String reason) {
public void stop(String reason, Runnable stoppedListener) {
assert stoppedListener != null;
logger.info("stopping watch service, reason [{}]", reason);
executionService.pause();
executionService.pause(stoppedListener);
triggerService.pauseExecution();
}
/**
* shuts down the trigger service as well to make sure there are no lingering threads
* also no need to check anything, as this is final, we just can go to status STOPPED
*
* @param stoppedListener The listener that will set Watcher state to: {@link WatcherState#STOPPED}, may not be {@code null}
*/
void shutDown() {
void shutDown(Runnable stoppedListener) {
assert stoppedListener != null;
logger.info("stopping watch service, reason [shutdown initiated]");
executionService.pause();
executionService.pause(stoppedListener);
triggerService.stop();
stopExecutor();
logger.debug("watch service has stopped");
}
void stopExecutor() {
@ -185,7 +191,7 @@ public class WatcherService {
processedClusterStateVersion.set(state.getVersion());
triggerService.pauseExecution();
int cancelledTaskCount = executionService.clearExecutionsAndQueue();
int cancelledTaskCount = executionService.clearExecutionsAndQueue(() -> {});
logger.info("reloading watcher, reason [{}], cancelled [{}] queued tasks", reason, cancelledTaskCount);
executor.execute(wrapWatcherService(() -> reloadInner(state, reason, false),
@ -256,7 +262,7 @@ public class WatcherService {
*/
public void pauseExecution(String reason) {
triggerService.pauseExecution();
int cancelledTaskCount = executionService.pause();
int cancelledTaskCount = executionService.pause(() -> {});
logger.info("paused watch execution, reason [{}], cancelled [{}] queued tasks", reason, cancelledTaskCount);
}

View File

@ -5,8 +5,11 @@
*/
package org.elasticsearch.xpack.watcher.execution;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.util.SetOnce;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.xpack.core.watcher.WatcherState;
import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;
@ -19,6 +22,7 @@ import static org.elasticsearch.xpack.core.watcher.support.Exceptions.illegalSta
public final class CurrentExecutions implements Iterable<ExecutionService.WatchExecution> {
private static final Logger logger = LogManager.getLogger(CurrentExecutions.class);
private final ConcurrentMap<String, ExecutionService.WatchExecution> currentExecutions = new ConcurrentHashMap<>();
// the condition of the lock is used to wait and signal the finishing of all executions on shutdown
private final ReentrantLock lock = new ReentrantLock();
@ -63,9 +67,12 @@ public final class CurrentExecutions implements Iterable<ExecutionService.WatchE
* Calling this method makes the class stop accepting new executions and throws and exception instead.
* In addition it waits for a certain amount of time for current executions to finish before returning
*
* @param maxStopTimeout The maximum wait time to wait to current executions to finish
* @param maxStopTimeout The maximum wait time to wait to current executions to finish
* @param stoppedListener The listener that will set Watcher state to: {@link WatcherState#STOPPED}, may be a no-op assuming the
* {@link WatcherState#STOPPED} is set elsewhere or not needed to be set.
*/
void sealAndAwaitEmpty(TimeValue maxStopTimeout) {
void sealAndAwaitEmpty(TimeValue maxStopTimeout, Runnable stoppedListener) {
assert stoppedListener != null;
lock.lock();
// We may have current executions still going on.
// We should try to wait for the current executions to have completed.
@ -81,6 +88,8 @@ public final class CurrentExecutions implements Iterable<ExecutionService.WatchE
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
//fully stop Watcher after all executions are finished
stoppedListener.run();
lock.unlock();
}
}

View File

@ -40,6 +40,7 @@ import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.index.engine.DocumentMissingException;
import org.elasticsearch.index.engine.VersionConflictEngineException;
import org.elasticsearch.xpack.core.watcher.WatcherState;
import org.elasticsearch.xpack.core.watcher.actions.ActionWrapper;
import org.elasticsearch.xpack.core.watcher.actions.ActionWrapperResult;
import org.elasticsearch.xpack.core.watcher.common.stats.Counters;
@ -137,23 +138,31 @@ public class ExecutionService {
* Pausing means, that no new watch executions will be done unless this pausing is explicitly unset.
* This is important when watcher is stopped, so that scheduled watches do not accidentally get executed.
* This should not be used when we need to reload watcher based on some cluster state changes, then just calling
* {@link #clearExecutionsAndQueue()} is the way to go
* {@link #clearExecutionsAndQueue(Runnable)} is the way to go
*
* @param stoppedListener The listener that will set Watcher state to: {@link WatcherState#STOPPED}, may be a no-op assuming the
* {@link WatcherState#STOPPED} is set elsewhere or not needed to be set.
*
* @return the number of tasks that have been removed
*/
public int pause() {
public int pause(Runnable stoppedListener) {
assert stoppedListener != null;
paused.set(true);
return clearExecutionsAndQueue();
return clearExecutionsAndQueue(stoppedListener);
}
/**
* Empty the currently queued tasks and wait for current executions to finish.
*
* @param stoppedListener The listener that will set Watcher state to: {@link WatcherState#STOPPED}, may be a no-op assuming the
* {@link WatcherState#STOPPED} is set elsewhere or not needed to be set.
*
* @return the number of tasks that have been removed
*/
public int clearExecutionsAndQueue() {
public int clearExecutionsAndQueue(Runnable stoppedListener) {
assert stoppedListener != null;
int cancelledTaskCount = executor.queue().drainTo(new ArrayList<>());
this.clearExecutions();
this.clearExecutions(stoppedListener);
return cancelledTaskCount;
}
@ -280,8 +289,10 @@ public class ExecutionService {
ctx.setNodeId(clusterService.localNode().getId());
WatchRecord record = null;
final String watchId = ctx.id().watchId();
//pull this to a local reference since the class reference can be swapped, and need to ensure same object is used for put/remove
final CurrentExecutions currentExecutions = this.currentExecutions.get();
try {
boolean executionAlreadyExists = currentExecutions.get().put(watchId, new WatchExecution(ctx, Thread.currentThread()));
boolean executionAlreadyExists = currentExecutions.put(watchId, new WatchExecution(ctx, Thread.currentThread()));
if (executionAlreadyExists) {
logger.trace("not executing watch [{}] because it is already queued", watchId);
record = ctx.abortBeforeExecution(ExecutionState.NOT_EXECUTED_ALREADY_QUEUED, "Watch is already queued in thread pool");
@ -336,7 +347,7 @@ public class ExecutionService {
triggeredWatchStore.delete(ctx.id());
}
currentExecutions.get().remove(watchId);
currentExecutions.remove(watchId);
logger.debug("finished [{}]/[{}]", watchId, ctx.id());
}
return record;
@ -580,11 +591,15 @@ public class ExecutionService {
/**
* This clears out the current executions and sets new empty current executions
* This is needed, because when this method is called, watcher keeps running, so sealing executions would be a bad idea
*
* @param stoppedListener The listener that will set Watcher state to: {@link WatcherState#STOPPED}, may be a no-op assuming the
* {@link WatcherState#STOPPED} is set elsewhere or not needed to be set.
*/
private void clearExecutions() {
private void clearExecutions(Runnable stoppedListener) {
assert stoppedListener != null;
final CurrentExecutions currentExecutionsBeforeSetting = currentExecutions.getAndSet(new CurrentExecutions());
// clear old executions in background, no need to wait
genericExecutor.execute(() -> currentExecutionsBeforeSetting.sealAndAwaitEmpty(maxStopTimeout));
genericExecutor.execute(() -> currentExecutionsBeforeSetting.sealAndAwaitEmpty(maxStopTimeout, stoppedListener));
}
// the watch execution task takes another runnable as parameter

View File

@ -34,6 +34,7 @@ import org.elasticsearch.xpack.core.watcher.WatcherMetaData;
import org.elasticsearch.xpack.core.watcher.WatcherState;
import org.elasticsearch.xpack.core.watcher.watch.Watch;
import org.junit.Before;
import org.mockito.ArgumentCaptor;
import org.mockito.stubbing.Answer;
import java.util.Collections;
@ -133,8 +134,8 @@ public class WatcherLifeCycleServiceTests extends ESTestCase {
when(watcherService.validate(clusterState)).thenReturn(true);
lifeCycleService.shutDown();
verify(watcherService, never()).stop(anyString());
verify(watcherService, times(1)).shutDown();
verify(watcherService, never()).stop(anyString(), any());
verify(watcherService, times(1)).shutDown(any());
reset(watcherService);
lifeCycleService.clusterChanged(new ClusterChangedEvent("any", clusterState, clusterState));
@ -175,7 +176,12 @@ public class WatcherLifeCycleServiceTests extends ESTestCase {
.build();
lifeCycleService.clusterChanged(new ClusterChangedEvent("foo", stoppedClusterState, clusterState));
verify(watcherService, times(1)).stop(eq("watcher manually marked to shutdown by cluster state update"));
ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class);
verify(watcherService, times(1))
.stop(eq("watcher manually marked to shutdown by cluster state update"), captor.capture());
assertEquals(WatcherState.STOPPING, lifeCycleService.getState());
captor.getValue().run();
assertEquals(WatcherState.STOPPED, lifeCycleService.getState());
// Starting via cluster state update, as the watcher metadata block is removed/set to true
reset(watcherService);

View File

@ -269,8 +269,8 @@ public class WatcherServiceTests extends ESTestCase {
csBuilder.metaData(MetaData.builder());
service.reload(csBuilder.build(), "whatever");
verify(executionService).clearExecutionsAndQueue();
verify(executionService, never()).pause();
verify(executionService).clearExecutionsAndQueue(any());
verify(executionService, never()).pause(any());
verify(triggerService).pauseExecution();
}

View File

@ -538,10 +538,12 @@ public abstract class AbstractWatcherIntegrationTestCase extends ESIntegTestCase
WatcherStatsResponse watcherStatsResponse = watcherClient().prepareWatcherStats().get();
assertThat(watcherStatsResponse.hasFailures(), is(false));
List<Tuple<String, WatcherState>> currentStatesFromStatsRequest = watcherStatsResponse.getNodes().stream()
.map(response -> Tuple.tuple(response.getNode().getName(), response.getWatcherState()))
.collect(Collectors.toList());
.map(response -> Tuple.tuple(response.getNode().getName() + " (" + response.getThreadPoolQueueSize() + ")",
response.getWatcherState())).collect(Collectors.toList());
List<WatcherState> states = currentStatesFromStatsRequest.stream().map(Tuple::v2).collect(Collectors.toList());
logger.info("waiting to stop watcher, current states {}", currentStatesFromStatsRequest);
boolean isAllStateStarted = states.stream().allMatch(w -> w == WatcherState.STARTED);
@ -566,7 +568,7 @@ public abstract class AbstractWatcherIntegrationTestCase extends ESIntegTestCase
}
throw new AssertionError("unexpected state, retrying with next run");
});
}, 30, TimeUnit.SECONDS);
}
public static class NoopEmailService extends EmailService {

View File

@ -19,6 +19,7 @@ import org.junit.After;
import org.junit.Before;
import java.util.Collections;
import java.util.concurrent.TimeUnit;
import static java.util.Collections.emptyList;
import static java.util.Collections.emptyMap;
@ -108,7 +109,7 @@ public class SmokeTestWatcherWithSecurityClientYamlTestSuiteIT extends ESClientY
default:
throw new AssertionError("unknown state[" + state + "]");
}
});
}, 30, TimeUnit.SECONDS);
}
@Override

View File

@ -22,6 +22,7 @@ import org.junit.Before;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
@ -91,8 +92,7 @@ public class SmokeTestWatcherWithSecurityIT extends ESRestTestCase {
@After
public void stopWatcher() throws Exception {
adminClient().performRequest(new Request("DELETE", "/my_test_index"));
assertBusy(() -> {
try {
Response statsResponse = adminClient().performRequest(new Request("GET", "/_watcher/stats"));
@ -118,7 +118,9 @@ public class SmokeTestWatcherWithSecurityIT extends ESRestTestCase {
} catch (IOException e) {
throw new AssertionError(e);
}
});
}, 30, TimeUnit.SECONDS);
adminClient().performRequest(new Request("DELETE", "/my_test_index"));
}
@Override

View File

@ -20,6 +20,7 @@ import org.junit.Before;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
@ -92,7 +93,7 @@ public class SmokeTestWatcherTestSuiteIT extends ESRestTestCase {
default:
throw new AssertionError("unknown state[" + state + "]");
}
});
}, 30, TimeUnit.SECONDS);
}
@Override

View File

@ -14,6 +14,8 @@ import org.elasticsearch.xpack.test.rest.XPackRestTestConstants;
import org.junit.After;
import org.junit.Before;
import java.util.concurrent.TimeUnit;
import static java.util.Collections.emptyList;
import static java.util.Collections.emptyMap;
import static java.util.Collections.singletonMap;
@ -90,6 +92,6 @@ public class WatcherRestIT extends ESClientYamlSuiteTestCase {
default:
throw new AssertionError("unknown state[" + state + "]");
}
});
}, 30, TimeUnit.SECONDS);
}
}

View File

@ -17,6 +17,7 @@ import org.junit.Before;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;
import static java.util.Collections.emptyList;
import static java.util.Collections.emptyMap;
@ -70,6 +71,6 @@ public class WatcherJiraYamlTestSuiteIT extends ESClientYamlSuiteTestCase {
} catch (IOException e) {
throw new AssertionError(e);
}
});
}, 30, TimeUnit.SECONDS);
}
}

View File

@ -17,6 +17,7 @@ import org.junit.Before;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;
import static java.util.Collections.emptyList;
import static java.util.Collections.emptyMap;
@ -70,6 +71,6 @@ public class WatcherPagerDutyYamlTestSuiteIT extends ESClientYamlSuiteTestCase {
} catch (IOException e) {
throw new AssertionError(e);
}
});
}, 30, TimeUnit.SECONDS);
}
}

View File

@ -17,6 +17,7 @@ import org.junit.Before;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;
import static java.util.Collections.emptyList;
import static java.util.Collections.emptyMap;
@ -70,6 +71,6 @@ public class WatcherSlackYamlTestSuiteIT extends ESClientYamlSuiteTestCase {
} catch (IOException e) {
throw new AssertionError(e);
}
});
}, 30, TimeUnit.SECONDS);
}
}