If starting watcher fails, watcher should set itself back to the stopped state.

At the moment if the starting fails watcher will hang in the `starting` phase. This is bad because any subsequent start will be ignored, because the the watcher state isn't `stopped`.

Original commit: elastic/x-pack-elasticsearch@5cbc1d2a13
This commit is contained in:
Martijn van Groningen 2015-06-24 09:39:46 +02:00
parent b8e80773d9
commit 67fdad6357
4 changed files with 37 additions and 17 deletions

View File

@ -52,15 +52,20 @@ public class WatcherService extends AbstractComponent {
public void start(ClusterState clusterState) { public void start(ClusterState clusterState) {
if (state.compareAndSet(WatcherState.STOPPED, WatcherState.STARTING)) { if (state.compareAndSet(WatcherState.STOPPED, WatcherState.STARTING)) {
logger.info("starting watch service..."); try {
watchLockService.start(); logger.info("starting watch service...");
watchLockService.start();
// Try to load watch store before the execution service, b/c action depends on watch store // Try to load watch store before the execution service, b/c action depends on watch store
watchStore.start(clusterState); watchStore.start(clusterState);
executionService.start(clusterState); executionService.start(clusterState);
triggerService.start(watchStore.watches().values()); triggerService.start(watchStore.watches().values());
state.set(WatcherState.STARTED); state.set(WatcherState.STARTED);
logger.info("watch service has started"); logger.info("watch service has started");
} catch (Exception e) {
state.set(WatcherState.STOPPED);
throw e;
}
} else { } else {
logger.debug("not starting watcher, because its state is [{}] while [{}] is expected", state, WatcherState.STOPPED); logger.debug("not starting watcher, because its state is [{}] while [{}] is expected", state, WatcherState.STOPPED);
} }

View File

@ -80,13 +80,18 @@ public class ExecutionService extends AbstractComponent {
assert executor.queue().isEmpty() : "queue should be empty, but contains " + executor.queue().size() + " elements."; assert executor.queue().isEmpty() : "queue should be empty, but contains " + executor.queue().size() + " elements.";
if (started.compareAndSet(false, true)) { if (started.compareAndSet(false, true)) {
logger.debug("starting execution service"); try {
historyStore.start(); logger.debug("starting execution service");
triggeredWatchStore.start(); historyStore.start();
currentExecutions = new CurrentExecutions(); triggeredWatchStore.start();
Collection<TriggeredWatch> triggeredWatches = triggeredWatchStore.loadTriggeredWatches(state); currentExecutions = new CurrentExecutions();
executeTriggeredWatches(triggeredWatches); Collection<TriggeredWatch> triggeredWatches = triggeredWatchStore.loadTriggeredWatches(state);
logger.debug("started execution service"); executeTriggeredWatches(triggeredWatches);
logger.debug("started execution service");
} catch (Exception e) {
started.set(false);
throw e;
}
} }
} }

View File

@ -93,7 +93,12 @@ public class TriggeredWatchStore extends AbstractComponent {
public void start() { public void start() {
if (started.compareAndSet(false, true)) { if (started.compareAndSet(false, true)) {
templateUtils.putTemplate(INDEX_TEMPLATE_NAME, customIndexSettings); try {
templateUtils.putTemplate(INDEX_TEMPLATE_NAME, customIndexSettings);
} catch (Exception e) {
started.set(false);
throw e;
}
} }
} }

View File

@ -117,7 +117,12 @@ public class HistoryStore extends AbstractComponent implements NodeSettingsServi
public void start() { public void start() {
if (started.compareAndSet(false, true)) { if (started.compareAndSet(false, true)) {
templateUtils.putTemplate(INDEX_TEMPLATE_NAME, customIndexSettings); try {
templateUtils.putTemplate(INDEX_TEMPLATE_NAME, customIndexSettings);
} catch (Exception e) {
started.set(false);
throw e;
}
} }
} }