diff --git a/src/main/java/org/elasticsearch/alerts/AlertManager.java b/src/main/java/org/elasticsearch/alerts/AlertManager.java
index c284d52718a..8ad819fe334 100644
--- a/src/main/java/org/elasticsearch/alerts/AlertManager.java
+++ b/src/main/java/org/elasticsearch/alerts/AlertManager.java
@@ -18,6 +18,7 @@ import org.elasticsearch.alerts.triggers.TriggerManager;
 import org.elasticsearch.alerts.triggers.TriggerResult;
 import org.elasticsearch.cluster.ClusterChangedEvent;
 import org.elasticsearch.cluster.ClusterService;
+import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.ClusterStateListener;
 import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.collect.Tuple;
@@ -33,7 +34,7 @@ import org.elasticsearch.threadpool.ThreadPool;
 
 import java.io.IOException;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
 
 public class AlertManager extends AbstractComponent {
 
@@ -42,9 +43,10 @@ public class AlertManager extends AbstractComponent {
     private final TriggerManager triggerManager;
     private final AlertActionManager actionManager;
     private final AlertActionRegistry actionRegistry;
-    private final AtomicBoolean started = new AtomicBoolean(false);
     private final ThreadPool threadPool;
+    private final ClusterService clusterService;
     private final KeyedLock<String> alertLock = new KeyedLock<>();
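+    // Lifecycle of the alert manager: STOPPED -> LOADING -> STARTED (see the State enum below).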
+    private final AtomicReference<State> state = new AtomicReference<>(State.STOPPED);
 
     @Inject
     public AlertManager(Settings settings, ClusterService clusterService, AlertScheduler scheduler, AlertsStore alertsStore,
@@ -59,6 +61,7 @@ public class AlertManager extends AbstractComponent {
         this.actionManager = actionManager;
         this.actionManager.setAlertManager(this);
         this.actionRegistry = actionRegistry;
+        this.clusterService = clusterService;
         clusterService.add(new AlertsClusterStateListener());
         // Close if the indices service is being stopped, so we don't run into search failures (locally) that will
         // happen because we're shutting down and an alert is scheduled.
@@ -98,7 +101,7 @@ public class AlertManager extends AbstractComponent {
     }
 
     public boolean isStarted() {
-        return started.get();
+        return state.get() == State.STARTED;
     }
 
     public void scheduleAlert(String alertName, DateTime scheduledFireTime, DateTime fireTime){
@@ -149,7 +152,7 @@
 
     public void stop() {
-        if (started.compareAndSet(true, false)) {
+        if (state.compareAndSet(State.LOADING, State.STOPPED) || state.compareAndSet(State.STARTED, State.STOPPED)) {
             logger.info("Stopping alert manager...");
             scheduler.stop();
             actionManager.stop();
@@ -167,7 +170,7 @@
     }
 
     private void ensureStarted() {
-        if (!started.get()) {
+        if (state.get() != State.STARTED) {
             throw new ElasticsearchIllegalStateException("not started");
         }
     }
@@ -179,62 +182,82 @@ private final class AlertsClusterStateListener implements ClusterStateListener {
 
         @Override
-        public void clusterChanged(final ClusterChangedEvent event) {
+        public void clusterChanged(ClusterChangedEvent event) {
             if (!event.localNodeMaster()) {
-                // We're not the master
+                // We're no longer the master
                 stop();
             } else {
                 if (event.state().blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK)) {
                     return; // wait until the gateway has recovered from disk
                 }
-
-                if (started.get()) {
+                if (isStarted()) {
                     return; // We're already started
                 }
-
-                alertsStore.start(event.state(), new LoadingListener() {
-                    @Override
-                    public void onSuccess() {
-                        startIfReady();
-                    }
-
-                    @Override
-                    public void onFailure() {
-                        retry(event);
-                    }
-                });
-                actionManager.start(event.state(), new LoadingListener() {
-                    @Override
-                    public void onSuccess() {
-                        startIfReady();
-                    }
-
-                    @Override
-                    public void onFailure() {
-                        retry(event);
-                    }
-                });
-            }
-        }
-
-        private void startIfReady() {
-            if (alertsStore.started() && actionManager.started()) {
-                if (started.compareAndSet(false, true)) {
-                    scheduler.start(alertsStore.getAlerts());
+                if (state.compareAndSet(State.STOPPED, State.LOADING)) {
+                    initialize(event.state());
                 }
             }
         }
 
-        private void retry(final ClusterChangedEvent event) {
-            threadPool.executor(ThreadPool.Names.GENERIC).execute(new Runnable() {
+        private void initialize(final ClusterState state) {
+            alertsStore.start(state, new LoadingListener() {
                 @Override
-                public void run() {
-                    // Retry with the same event:
-                    clusterChanged(event);
+                public void onSuccess() {
+                    startIfReady();
+                }
+
+                @Override
+                public void onFailure() {
+                    retry();
+                }
+            });
+            actionManager.start(state, new LoadingListener() {
+                @Override
+                public void onSuccess() {
+                    startIfReady();
+                }
+
+                @Override
+                public void onFailure() {
+                    retry();
                 }
             });
         }
 
+        private void startIfReady() {
+            if (alertsStore.started() && actionManager.started()) {
+                if (state.compareAndSet(State.LOADING, State.STARTED)) {
+                    scheduler.start(alertsStore.getAlerts());
+                } else {
+                    logger.info("Didn't start the alert manager, because its state was [{}] while [{}] was expected", state.get(), State.LOADING);
+                }
+            }
+        }
+
+        private void retry() {
+            // Only retry if our state is loading
+            if (state.get() == State.LOADING) {
+                final ClusterState newState = clusterService.state();
+                threadPool.executor(ThreadPool.Names.GENERIC).execute(new Runnable() {
+                    @Override
+                    public void run() {
+                        // Retry with the latest cluster state:
+                        initialize(newState);
+                    }
+                });
+            } else {
+                logger.info("Didn't retry to initialize the alert manager, because its state was [{}] while [{}] was expected", state.get(), State.LOADING);
+            }
+        }
+
+    }
+
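+    /**
+     * Lifecycle states of the alert manager: it starts out STOPPED, moves to LOADING while the alerts
+     * store and action manager load from the cluster state, and becomes STARTED once both have loaded
+     * successfully. Stopping (or losing the master role) moves it back to STOPPED.
+     */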
+    private enum State {
+
+        STOPPED,
+        LOADING,
+        STARTED
+    }
 }