From 3bed2c92e2a0a906516b86ab8b6d06a829ed8840 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Wed, 12 Nov 2014 18:02:15 +0100 Subject: [PATCH] Core: Retry shouldn't cause a new cluster state, but rather should be done in a forked thread. Original commit: elastic/x-pack-elasticsearch@cb944ddce7b4b8b14c6a9d07e637fc1bd1c31f4d --- .../elasticsearch/alerts/AlertManager.java | 32 +++++++++---------- 1 file changed, 15 insertions(+), 17 deletions(-) diff --git a/src/main/java/org/elasticsearch/alerts/AlertManager.java b/src/main/java/org/elasticsearch/alerts/AlertManager.java index 21d5d111668..60ee083be7a 100644 --- a/src/main/java/org/elasticsearch/alerts/AlertManager.java +++ b/src/main/java/org/elasticsearch/alerts/AlertManager.java @@ -15,8 +15,9 @@ import org.elasticsearch.alerts.actions.AlertActionRegistry; import org.elasticsearch.alerts.scheduler.AlertScheduler; import org.elasticsearch.alerts.triggers.TriggerManager; import org.elasticsearch.alerts.triggers.TriggerResult; -import org.elasticsearch.cluster.*; -import org.elasticsearch.common.Nullable; +import org.elasticsearch.cluster.ClusterChangedEvent; +import org.elasticsearch.cluster.ClusterService; +import org.elasticsearch.cluster.ClusterStateListener; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.collect.ImmutableList; import org.elasticsearch.common.collect.Tuple; @@ -27,6 +28,7 @@ import org.elasticsearch.common.joda.time.DateTime; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.gateway.GatewayService; import org.elasticsearch.indices.IndicesService; +import org.elasticsearch.threadpool.ThreadPool; import java.io.IOException; import java.util.List; @@ -45,14 +47,15 @@ public class AlertManager extends AbstractComponent { private final AlertActionManager actionManager; private final AlertActionRegistry actionRegistry; private final AtomicBoolean started = new AtomicBoolean(false); - + private final ThreadPool threadPool; @Inject public AlertManager(Settings settings, ClusterService clusterService, AlertScheduler scheduler, AlertsStore alertsStore, IndicesService indicesService, TriggerManager triggerManager, AlertActionManager actionManager, - AlertActionRegistry actionRegistry) { + AlertActionRegistry actionRegistry, ThreadPool threadPool) { super(settings); this.scheduler = scheduler; + this.threadPool = threadPool; this.scheduler.setAlertManager(this); this.alertsStore = alertsStore; this.clusterService = clusterService; @@ -160,7 +163,7 @@ public class AlertManager extends AbstractComponent { private final class AlertsClusterStateListener implements ClusterStateListener { @Override - public void clusterChanged(ClusterChangedEvent event) { + public void clusterChanged(final ClusterChangedEvent event) { if (!event.localNodeMaster()) { // We're not the master stop(); @@ -181,7 +184,7 @@ public class AlertManager extends AbstractComponent { @Override public void onFailure() { - retry(); + retry(event); } }); actionManager.start(event.state(), new LoadingListener() { @@ -192,7 +195,7 @@ public class AlertManager extends AbstractComponent { @Override public void onFailure() { - retry(); + retry(event); } }); } @@ -206,17 +209,12 @@ public class AlertManager extends AbstractComponent { } } - private void retry() { - clusterService.submitStateUpdateTask("alerts-retry", new ClusterStateUpdateTask() { + private void retry(final ClusterChangedEvent event) { + threadPool.executor(ThreadPool.Names.GENERIC).execute(new Runnable() { @Override - public ClusterState execute(ClusterState currentState) throws Exception { - // Force a new cluster state to trigger that alerts cluster state listener gets invoked again. - return ClusterState.builder(currentState).build(); - } - - @Override - public void onFailure(String source, @Nullable Throwable t) { - logger.error("Error during {} ", t, source); + public void run() { + // Retry with the same event: + clusterChanged(event); } }); }