Core: Retry shouldn't cause a new cluster state, but rather should be done in a forked thread.

Original commit: elastic/x-pack-elasticsearch@cb944ddce7
This commit is contained in:
Martijn van Groningen 2014-11-12 18:02:15 +01:00
parent 36dc82bcc5
commit 3bed2c92e2
1 changed files with 15 additions and 17 deletions

View File

@ -15,8 +15,9 @@ import org.elasticsearch.alerts.actions.AlertActionRegistry;
import org.elasticsearch.alerts.scheduler.AlertScheduler;
import org.elasticsearch.alerts.triggers.TriggerManager;
import org.elasticsearch.alerts.triggers.TriggerResult;
import org.elasticsearch.cluster.*;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.ImmutableList;
import org.elasticsearch.common.collect.Tuple;
@ -27,6 +28,7 @@ import org.elasticsearch.common.joda.time.DateTime;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.gateway.GatewayService;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.threadpool.ThreadPool;
import java.io.IOException;
import java.util.List;
@ -45,14 +47,15 @@ public class AlertManager extends AbstractComponent {
private final AlertActionManager actionManager;
private final AlertActionRegistry actionRegistry;
private final AtomicBoolean started = new AtomicBoolean(false);
private final ThreadPool threadPool;
@Inject
public AlertManager(Settings settings, ClusterService clusterService, AlertScheduler scheduler, AlertsStore alertsStore,
IndicesService indicesService, TriggerManager triggerManager, AlertActionManager actionManager,
AlertActionRegistry actionRegistry) {
AlertActionRegistry actionRegistry, ThreadPool threadPool) {
super(settings);
this.scheduler = scheduler;
this.threadPool = threadPool;
this.scheduler.setAlertManager(this);
this.alertsStore = alertsStore;
this.clusterService = clusterService;
@ -160,7 +163,7 @@ public class AlertManager extends AbstractComponent {
private final class AlertsClusterStateListener implements ClusterStateListener {
@Override
public void clusterChanged(ClusterChangedEvent event) {
public void clusterChanged(final ClusterChangedEvent event) {
if (!event.localNodeMaster()) {
// We're not the master
stop();
@ -181,7 +184,7 @@ public class AlertManager extends AbstractComponent {
@Override
public void onFailure() {
retry();
retry(event);
}
});
actionManager.start(event.state(), new LoadingListener() {
@ -192,7 +195,7 @@ public class AlertManager extends AbstractComponent {
@Override
public void onFailure() {
retry();
retry(event);
}
});
}
@ -206,17 +209,12 @@ public class AlertManager extends AbstractComponent {
}
}
private void retry() {
clusterService.submitStateUpdateTask("alerts-retry", new ClusterStateUpdateTask() {
private void retry(final ClusterChangedEvent event) {
threadPool.executor(ThreadPool.Names.GENERIC).execute(new Runnable() {
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
// Force a new cluster state to trigger that alerts cluster state listener gets invoked again.
return ClusterState.builder(currentState).build();
}
@Override
public void onFailure(String source, @Nullable Throwable t) {
logger.error("Error during {} ", t, source);
public void run() {
// Retry with the same event:
clusterChanged(event);
}
});
}