Added a state field to AlertManager that replaces the started flag.

This helps us prevent endless re-loading when a node steps down as master while we are in the process of starting the alert store and action manager.
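
A minimal sketch of the idea, separate from the actual AlertManager code (the class and method names below are invented for illustration; only the State values STOPPED, LOADING and STARTED come from this change): a single AtomicReference<State> guards the STOPPED -> LOADING -> STARTED transitions, so a stop() that races with loading flips the state back to STOPPED and any pending retry becomes a no-op.

import java.util.concurrent.atomic.AtomicReference;

// Hypothetical illustration of the startup state machine; not the real AlertManager.
class StartupStateSketch {

    private enum State { STOPPED, LOADING, STARTED }

    private final AtomicReference<State> state = new AtomicReference<>(State.STOPPED);

    // Called from the cluster state listener while this node is the master.
    void onMasterClusterChanged() {
        // Only one caller wins the STOPPED -> LOADING transition, so loading
        // of the stores is never kicked off twice.
        if (state.compareAndSet(State.STOPPED, State.LOADING)) {
            load();
        }
    }

    // Called when the node steps down as master or the component is closed.
    void stop() {
        // Stopping is legal both while loading and after a successful start.
        if (state.compareAndSet(State.LOADING, State.STOPPED)
                || state.compareAndSet(State.STARTED, State.STOPPED)) {
            // ... stop scheduler, action manager, etc. ...
        }
    }

    private void load() {
        if (doLoad()) {
            // Promote to STARTED only if nobody stopped us while loading.
            state.compareAndSet(State.LOADING, State.STARTED);
        } else if (state.get() == State.LOADING) {
            // Retry only while still loading; if the node stepped down in the
            // meantime the state is already STOPPED and the retry loop ends here.
            load();
        }
    }

    private boolean doLoad() {
        return true; // stand-in for loading the alert store and action manager
    }
}

Previously the retry re-ran clusterChanged with the stale event, so a node that had stepped down as master during loading could keep re-triggering the load; with an explicit LOADING state, stop() moves the manager back to STOPPED and the retry path checks for LOADING before scheduling another attempt.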

Original commit: elastic/x-pack-elasticsearch@e18c8215a9
Martijn van Groningen 2014-11-19 15:12:52 +01:00
parent 8615cdb6af
commit 506daca17e
1 changed file with 67 additions and 44 deletions


@@ -18,6 +18,7 @@ import org.elasticsearch.alerts.triggers.TriggerManager;
import org.elasticsearch.alerts.triggers.TriggerResult;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.Tuple;
@@ -33,7 +34,7 @@ import org.elasticsearch.threadpool.ThreadPool;
import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
public class AlertManager extends AbstractComponent {
@@ -42,9 +43,10 @@ public class AlertManager extends AbstractComponent {
private final TriggerManager triggerManager;
private final AlertActionManager actionManager;
private final AlertActionRegistry actionRegistry;
private final AtomicBoolean started = new AtomicBoolean(false);
private final ThreadPool threadPool;
private final ClusterService clusterService;
private final KeyedLock<String> alertLock = new KeyedLock<>();
private final AtomicReference<State> state = new AtomicReference<>(State.STOPPED);
@Inject
public AlertManager(Settings settings, ClusterService clusterService, AlertScheduler scheduler, AlertsStore alertsStore,
@@ -59,6 +61,7 @@ public class AlertManager extends AbstractComponent {
this.actionManager = actionManager;
this.actionManager.setAlertManager(this);
this.actionRegistry = actionRegistry;
this.clusterService = clusterService;
clusterService.add(new AlertsClusterStateListener());
// Close if the indices service is being stopped, so we don't run into search failures (locally) that will
// happen because we're shutting down and an alert is scheduled.
@@ -98,7 +101,7 @@
}
public boolean isStarted() {
return started.get();
return state.get() == State.STARTED;
}
public void scheduleAlert(String alertName, DateTime scheduledFireTime, DateTime fireTime){
@@ -149,7 +152,7 @@
public void stop() {
if (started.compareAndSet(true, false)) {
if (state.compareAndSet(State.LOADING, State.STOPPED) || state.compareAndSet(State.STARTED, State.STOPPED)) {
logger.info("Stopping alert manager...");
scheduler.stop();
actionManager.stop();
@@ -167,7 +170,7 @@
}
private void ensureStarted() {
if (!started.get()) {
if (state.get() != State.STARTED) {
throw new ElasticsearchIllegalStateException("not started");
}
}
@@ -179,62 +182,82 @@
private final class AlertsClusterStateListener implements ClusterStateListener {
@Override
public void clusterChanged(final ClusterChangedEvent event) {
public void clusterChanged(ClusterChangedEvent event) {
if (!event.localNodeMaster()) {
// We're not the master
// We're no longer the master
stop();
} else {
if (event.state().blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK)) {
return; // wait until the gateway has recovered from disk
}
if (started.get()) {
if (isStarted()) {
return; // We're already started
}
alertsStore.start(event.state(), new LoadingListener() {
@Override
public void onSuccess() {
startIfReady();
}
@Override
public void onFailure() {
retry(event);
}
});
actionManager.start(event.state(), new LoadingListener() {
@Override
public void onSuccess() {
startIfReady();
}
@Override
public void onFailure() {
retry(event);
}
});
}
}
private void startIfReady() {
if (alertsStore.started() && actionManager.started()) {
if (started.compareAndSet(false, true)) {
scheduler.start(alertsStore.getAlerts());
if (state.compareAndSet(State.STOPPED, State.LOADING)) {
initialize(event.state());
}
}
}
private void retry(final ClusterChangedEvent event) {
threadPool.executor(ThreadPool.Names.GENERIC).execute(new Runnable() {
private void initialize(final ClusterState state) {
alertsStore.start(state, new LoadingListener() {
@Override
public void run() {
// Retry with the same event:
clusterChanged(event);
public void onSuccess() {
startIfReady();
}
@Override
public void onFailure() {
retry();
}
});
actionManager.start(state, new LoadingListener() {
@Override
public void onSuccess() {
startIfReady();
}
@Override
public void onFailure() {
retry();
}
});
}
private void startIfReady() {
if (alertsStore.started() && actionManager.started()) {
if (state.compareAndSet(State.LOADING, State.STARTED)) {
scheduler.start(alertsStore.getAlerts());
} else {
logger.info("Didn't start alert manager, because it state was [{}] while [{}] was expected", state.get(), State.LOADING);
}
}
}
private void retry() {
// Only retry if our state is loading
if (state.get() == State.LOADING) {
final ClusterState newState = clusterService.state();
threadPool.executor(ThreadPool.Names.GENERIC).execute(new Runnable() {
@Override
public void run() {
// Retry with the current cluster state:
initialize(newState);
}
});
} else {
logger.info("Didn't retry to initialize the alert manager, because it state was [{}] while [{}] was expected", state.get(), State.LOADING);
}
}
}
private enum State {
STOPPED,
LOADING,
STARTED
}
}