Watcher: Mark watcher as started only after loading watches (#30403)
Starting watcher should wait for the watcher to be started before marking the status as started, which is now done via a callback. Also, reloading watcher could set the execution service to paused. This could lead to watches not being executed, when run in tests. This fix does not change the paused flag in the execution service, just clears out the current queue and executions. Closes #30381
This commit is contained in:
parent
39623402fc
commit
71ebed6371
|
@ -110,8 +110,7 @@ public class WatcherLifeCycleService extends AbstractComponent implements Cluste
|
||||||
// if this is not a data node, we need to start it ourselves possibly
|
// if this is not a data node, we need to start it ourselves possibly
|
||||||
if (event.state().nodes().getLocalNode().isDataNode() == false &&
|
if (event.state().nodes().getLocalNode().isDataNode() == false &&
|
||||||
isWatcherStoppedManually == false && this.state.get() == WatcherState.STOPPED) {
|
isWatcherStoppedManually == false && this.state.get() == WatcherState.STOPPED) {
|
||||||
watcherService.start(event.state());
|
watcherService.start(event.state(), () -> this.state.set(WatcherState.STARTED));
|
||||||
this.state.set(WatcherState.STARTED);
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -157,8 +156,8 @@ public class WatcherLifeCycleService extends AbstractComponent implements Cluste
|
||||||
if (state.get() == WatcherState.STARTED) {
|
if (state.get() == WatcherState.STARTED) {
|
||||||
watcherService.reload(event.state(), "new local watcher shard allocation ids");
|
watcherService.reload(event.state(), "new local watcher shard allocation ids");
|
||||||
} else if (state.get() == WatcherState.STOPPED) {
|
} else if (state.get() == WatcherState.STOPPED) {
|
||||||
watcherService.start(event.state());
|
this.state.set(WatcherState.STARTING);
|
||||||
this.state.set(WatcherState.STARTED);
|
watcherService.start(event.state(), () -> this.state.set(WatcherState.STARTED));
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
clearAllocationIds();
|
clearAllocationIds();
|
||||||
|
|
|
@ -183,23 +183,40 @@ public class WatcherService extends AbstractComponent {
|
||||||
// by checking the cluster state version before and after loading the watches we can potentially just exit without applying the
|
// by checking the cluster state version before and after loading the watches we can potentially just exit without applying the
|
||||||
// changes
|
// changes
|
||||||
processedClusterStateVersion.set(state.getVersion());
|
processedClusterStateVersion.set(state.getVersion());
|
||||||
pauseExecution(reason);
|
|
||||||
triggerService.pauseExecution();
|
triggerService.pauseExecution();
|
||||||
|
int cancelledTaskCount = executionService.clearExecutionsAndQueue();
|
||||||
|
logger.info("reloading watcher, reason [{}], cancelled [{}] queued tasks", reason, cancelledTaskCount);
|
||||||
|
|
||||||
executor.execute(wrapWatcherService(() -> reloadInner(state, reason, false),
|
executor.execute(wrapWatcherService(() -> reloadInner(state, reason, false),
|
||||||
e -> logger.error("error reloading watcher", e)));
|
e -> logger.error("error reloading watcher", e)));
|
||||||
}
|
}
|
||||||
|
|
||||||
public void start(ClusterState state) {
|
/**
|
||||||
|
* start the watcher service, load watches in the background
|
||||||
|
*
|
||||||
|
* @param state the current cluster state
|
||||||
|
* @param postWatchesLoadedCallback the callback to be triggered, when watches where loaded successfully
|
||||||
|
*/
|
||||||
|
public void start(ClusterState state, Runnable postWatchesLoadedCallback) {
|
||||||
|
executionService.unPause();
|
||||||
processedClusterStateVersion.set(state.getVersion());
|
processedClusterStateVersion.set(state.getVersion());
|
||||||
executor.execute(wrapWatcherService(() -> reloadInner(state, "starting", true),
|
executor.execute(wrapWatcherService(() -> {
|
||||||
|
if (reloadInner(state, "starting", true)) {
|
||||||
|
postWatchesLoadedCallback.run();
|
||||||
|
}
|
||||||
|
},
|
||||||
e -> logger.error("error starting watcher", e)));
|
e -> logger.error("error starting watcher", e)));
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* reload the watches and start scheduling them
|
* reload watches and start scheduling them
|
||||||
|
*
|
||||||
|
* @param state the current cluster state
|
||||||
|
* @param reason the reason for reloading, will be logged
|
||||||
|
* @param loadTriggeredWatches should triggered watches be loaded in this run, not needed for reloading, only for starting
|
||||||
|
* @return true if no other loading of a newer cluster state happened in parallel, false otherwise
|
||||||
*/
|
*/
|
||||||
private synchronized void reloadInner(ClusterState state, String reason, boolean loadTriggeredWatches) {
|
private synchronized boolean reloadInner(ClusterState state, String reason, boolean loadTriggeredWatches) {
|
||||||
// exit early if another thread has come in between
|
// exit early if another thread has come in between
|
||||||
if (processedClusterStateVersion.get() != state.getVersion()) {
|
if (processedClusterStateVersion.get() != state.getVersion()) {
|
||||||
logger.debug("watch service has not been reloaded for state [{}], another reload for state [{}] in progress",
|
logger.debug("watch service has not been reloaded for state [{}], another reload for state [{}] in progress",
|
||||||
|
@ -221,9 +238,11 @@ public class WatcherService extends AbstractComponent {
|
||||||
executionService.executeTriggeredWatches(triggeredWatches);
|
executionService.executeTriggeredWatches(triggeredWatches);
|
||||||
}
|
}
|
||||||
logger.debug("watch service has been reloaded, reason [{}]", reason);
|
logger.debug("watch service has been reloaded, reason [{}]", reason);
|
||||||
|
return true;
|
||||||
} else {
|
} else {
|
||||||
logger.debug("watch service has not been reloaded for state [{}], another reload for state [{}] in progress",
|
logger.debug("watch service has not been reloaded for state [{}], another reload for state [{}] in progress",
|
||||||
state.getVersion(), processedClusterStateVersion.get());
|
state.getVersion(), processedClusterStateVersion.get());
|
||||||
|
return false;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -121,11 +121,25 @@ public class ExecutionService extends AbstractComponent {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Pause the execution of the watcher executor
|
* Pause the execution of the watcher executor, and empty the state.
|
||||||
|
* Pausing means, that no new watch executions will be done unless this pausing is explicitely unset.
|
||||||
|
* This is important when watcher is stopped, so that scheduled watches do not accidentally get executed.
|
||||||
|
* This should not be used when we need to reload watcher based on some cluster state changes, then just calling
|
||||||
|
* {@link #clearExecutionsAndQueue()} is the way to go
|
||||||
|
*
|
||||||
* @return the number of tasks that have been removed
|
* @return the number of tasks that have been removed
|
||||||
*/
|
*/
|
||||||
public int pause() {
|
public int pause() {
|
||||||
paused.set(true);
|
paused.set(true);
|
||||||
|
return clearExecutionsAndQueue();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Empty the currently queued tasks and wait for current executions to finish.
|
||||||
|
*
|
||||||
|
* @return the number of tasks that have been removed
|
||||||
|
*/
|
||||||
|
public int clearExecutionsAndQueue() {
|
||||||
int cancelledTaskCount = executor.queue().drainTo(new ArrayList<>());
|
int cancelledTaskCount = executor.queue().drainTo(new ArrayList<>());
|
||||||
this.clearExecutions();
|
this.clearExecutions();
|
||||||
return cancelledTaskCount;
|
return cancelledTaskCount;
|
||||||
|
|
|
@ -180,7 +180,7 @@ public class WatcherLifeCycleServiceTests extends ESTestCase {
|
||||||
reset(watcherService);
|
reset(watcherService);
|
||||||
when(watcherService.validate(clusterState)).thenReturn(true);
|
when(watcherService.validate(clusterState)).thenReturn(true);
|
||||||
lifeCycleService.clusterChanged(new ClusterChangedEvent("any", clusterState, stoppedClusterState));
|
lifeCycleService.clusterChanged(new ClusterChangedEvent("any", clusterState, stoppedClusterState));
|
||||||
verify(watcherService, times(1)).start(eq(clusterState));
|
verify(watcherService, times(1)).start(eq(clusterState), anyObject());
|
||||||
|
|
||||||
// no change, keep going
|
// no change, keep going
|
||||||
reset(watcherService);
|
reset(watcherService);
|
||||||
|
@ -423,7 +423,7 @@ public class WatcherLifeCycleServiceTests extends ESTestCase {
|
||||||
when(watcherService.validate(eq(state))).thenReturn(true);
|
when(watcherService.validate(eq(state))).thenReturn(true);
|
||||||
|
|
||||||
lifeCycleService.clusterChanged(new ClusterChangedEvent("any", state, state));
|
lifeCycleService.clusterChanged(new ClusterChangedEvent("any", state, state));
|
||||||
verify(watcherService, times(0)).start(any(ClusterState.class));
|
verify(watcherService, times(0)).start(any(ClusterState.class), anyObject());
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testWatcherStopsWhenMasterNodeIsMissing() {
|
public void testWatcherStopsWhenMasterNodeIsMissing() {
|
||||||
|
|
|
@ -68,6 +68,7 @@ import static org.hamcrest.Matchers.is;
|
||||||
import static org.mockito.Matchers.any;
|
import static org.mockito.Matchers.any;
|
||||||
import static org.mockito.Matchers.eq;
|
import static org.mockito.Matchers.eq;
|
||||||
import static org.mockito.Mockito.mock;
|
import static org.mockito.Mockito.mock;
|
||||||
|
import static org.mockito.Mockito.never;
|
||||||
import static org.mockito.Mockito.verify;
|
import static org.mockito.Mockito.verify;
|
||||||
import static org.mockito.Mockito.when;
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
|
@ -199,7 +200,7 @@ public class WatcherServiceTests extends ESTestCase {
|
||||||
when(client.clearScroll(any(ClearScrollRequest.class))).thenReturn(clearScrollFuture);
|
when(client.clearScroll(any(ClearScrollRequest.class))).thenReturn(clearScrollFuture);
|
||||||
clearScrollFuture.onResponse(new ClearScrollResponse(true, 1));
|
clearScrollFuture.onResponse(new ClearScrollResponse(true, 1));
|
||||||
|
|
||||||
service.start(clusterState);
|
service.start(clusterState, () -> {});
|
||||||
|
|
||||||
ArgumentCaptor<List> captor = ArgumentCaptor.forClass(List.class);
|
ArgumentCaptor<List> captor = ArgumentCaptor.forClass(List.class);
|
||||||
verify(triggerService).start(captor.capture());
|
verify(triggerService).start(captor.capture());
|
||||||
|
@ -238,6 +239,27 @@ public class WatcherServiceTests extends ESTestCase {
|
||||||
verify(triggerEngine).pauseExecution();
|
verify(triggerEngine).pauseExecution();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// if we have to reload the watcher service, the execution service should not be paused, as this might
|
||||||
|
// result in missing executions
|
||||||
|
public void testReloadingWatcherDoesNotPauseExecutionService() {
|
||||||
|
ExecutionService executionService = mock(ExecutionService.class);
|
||||||
|
TriggerService triggerService = mock(TriggerService.class);
|
||||||
|
WatcherService service = new WatcherService(Settings.EMPTY, triggerService, mock(TriggeredWatchStore.class),
|
||||||
|
executionService, mock(WatchParser.class), mock(Client.class), executorService) {
|
||||||
|
@Override
|
||||||
|
void stopExecutor() {
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
ClusterState.Builder csBuilder = new ClusterState.Builder(new ClusterName("_name"));
|
||||||
|
csBuilder.metaData(MetaData.builder());
|
||||||
|
|
||||||
|
service.reload(csBuilder.build(), "whatever");
|
||||||
|
verify(executionService).clearExecutionsAndQueue();
|
||||||
|
verify(executionService, never()).pause();
|
||||||
|
verify(triggerService).pauseExecution();
|
||||||
|
}
|
||||||
|
|
||||||
private static DiscoveryNode newNode() {
|
private static DiscoveryNode newNode() {
|
||||||
return new DiscoveryNode("node", ESTestCase.buildNewFakeTransportAddress(), Collections.emptyMap(),
|
return new DiscoveryNode("node", ESTestCase.buildNewFakeTransportAddress(), Collections.emptyMap(),
|
||||||
new HashSet<>(asList(DiscoveryNode.Role.values())), Version.CURRENT);
|
new HashSet<>(asList(DiscoveryNode.Role.values())), Version.CURRENT);
|
||||||
|
|
|
@ -23,6 +23,7 @@ import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.function.Function;
|
import java.util.function.Function;
|
||||||
|
|
||||||
|
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
|
||||||
import static org.elasticsearch.xpack.watcher.actions.ActionBuilders.loggingAction;
|
import static org.elasticsearch.xpack.watcher.actions.ActionBuilders.loggingAction;
|
||||||
import static org.elasticsearch.xpack.watcher.client.WatchSourceBuilders.watchBuilder;
|
import static org.elasticsearch.xpack.watcher.client.WatchSourceBuilders.watchBuilder;
|
||||||
import static org.elasticsearch.xpack.watcher.input.InputBuilders.simpleInput;
|
import static org.elasticsearch.xpack.watcher.input.InputBuilders.simpleInput;
|
||||||
|
@ -36,6 +37,8 @@ import static org.hamcrest.Matchers.notNullValue;
|
||||||
|
|
||||||
public class ExecutionVarsIntegrationTests extends AbstractWatcherIntegrationTestCase {
|
public class ExecutionVarsIntegrationTests extends AbstractWatcherIntegrationTestCase {
|
||||||
|
|
||||||
|
private String watchId = randomAlphaOfLength(20);
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected List<Class<? extends Plugin>> pluginTypes() {
|
protected List<Class<? extends Plugin>> pluginTypes() {
|
||||||
List<Class<? extends Plugin>> types = super.pluginTypes();
|
List<Class<? extends Plugin>> types = super.pluginTypes();
|
||||||
|
@ -107,7 +110,7 @@ public class ExecutionVarsIntegrationTests extends AbstractWatcherIntegrationTes
|
||||||
public void testVars() throws Exception {
|
public void testVars() throws Exception {
|
||||||
WatcherClient watcherClient = watcherClient();
|
WatcherClient watcherClient = watcherClient();
|
||||||
|
|
||||||
PutWatchResponse putWatchResponse = watcherClient.preparePutWatch("_id").setSource(watchBuilder()
|
PutWatchResponse putWatchResponse = watcherClient.preparePutWatch(watchId).setSource(watchBuilder()
|
||||||
.trigger(schedule(cron("0/1 * * * * ?")))
|
.trigger(schedule(cron("0/1 * * * * ?")))
|
||||||
.input(simpleInput("value", 5))
|
.input(simpleInput("value", 5))
|
||||||
.condition(new ScriptCondition(
|
.condition(new ScriptCondition(
|
||||||
|
@ -126,7 +129,7 @@ public class ExecutionVarsIntegrationTests extends AbstractWatcherIntegrationTes
|
||||||
|
|
||||||
assertThat(putWatchResponse.isCreated(), is(true));
|
assertThat(putWatchResponse.isCreated(), is(true));
|
||||||
|
|
||||||
timeWarp().trigger("_id");
|
timeWarp().trigger(watchId);
|
||||||
|
|
||||||
flush();
|
flush();
|
||||||
refresh();
|
refresh();
|
||||||
|
@ -135,11 +138,11 @@ public class ExecutionVarsIntegrationTests extends AbstractWatcherIntegrationTes
|
||||||
// defaults to match all;
|
// defaults to match all;
|
||||||
});
|
});
|
||||||
|
|
||||||
assertThat(searchResponse.getHits().getTotalHits(), is(1L));
|
assertHitCount(searchResponse, 1L);
|
||||||
|
|
||||||
Map<String, Object> source = searchResponse.getHits().getAt(0).getSourceAsMap();
|
Map<String, Object> source = searchResponse.getHits().getAt(0).getSourceAsMap();
|
||||||
|
|
||||||
assertValue(source, "watch_id", is("_id"));
|
assertValue(source, "watch_id", is(watchId));
|
||||||
assertValue(source, "state", is("executed"));
|
assertValue(source, "state", is("executed"));
|
||||||
|
|
||||||
// we don't store the computed vars in history
|
// we don't store the computed vars in history
|
||||||
|
@ -171,7 +174,7 @@ public class ExecutionVarsIntegrationTests extends AbstractWatcherIntegrationTes
|
||||||
public void testVarsManual() throws Exception {
|
public void testVarsManual() throws Exception {
|
||||||
WatcherClient watcherClient = watcherClient();
|
WatcherClient watcherClient = watcherClient();
|
||||||
|
|
||||||
PutWatchResponse putWatchResponse = watcherClient.preparePutWatch("_id").setSource(watchBuilder()
|
PutWatchResponse putWatchResponse = watcherClient.preparePutWatch(watchId).setSource(watchBuilder()
|
||||||
.trigger(schedule(cron("0/1 * * * * ? 2020")))
|
.trigger(schedule(cron("0/1 * * * * ? 2020")))
|
||||||
.input(simpleInput("value", 5))
|
.input(simpleInput("value", 5))
|
||||||
.condition(new ScriptCondition(
|
.condition(new ScriptCondition(
|
||||||
|
@ -193,13 +196,13 @@ public class ExecutionVarsIntegrationTests extends AbstractWatcherIntegrationTes
|
||||||
boolean debug = randomBoolean();
|
boolean debug = randomBoolean();
|
||||||
|
|
||||||
ExecuteWatchResponse executeWatchResponse = watcherClient
|
ExecuteWatchResponse executeWatchResponse = watcherClient
|
||||||
.prepareExecuteWatch("_id")
|
.prepareExecuteWatch(watchId)
|
||||||
.setDebug(debug)
|
.setDebug(debug)
|
||||||
.get();
|
.get();
|
||||||
assertThat(executeWatchResponse.getRecordId(), notNullValue());
|
assertThat(executeWatchResponse.getRecordId(), notNullValue());
|
||||||
XContentSource source = executeWatchResponse.getRecordSource();
|
XContentSource source = executeWatchResponse.getRecordSource();
|
||||||
|
|
||||||
assertValue(source, "watch_id", is("_id"));
|
assertValue(source, "watch_id", is(watchId));
|
||||||
assertValue(source, "state", is("executed"));
|
assertValue(source, "state", is("executed"));
|
||||||
|
|
||||||
if (debug) {
|
if (debug) {
|
||||||
|
|
Loading…
Reference in New Issue