diff --git a/src/main/java/org/elasticsearch/alerts/history/HistoryService.java b/src/main/java/org/elasticsearch/alerts/history/HistoryService.java index b96aa703ed5..d0a2a0adbbf 100644 --- a/src/main/java/org/elasticsearch/alerts/history/HistoryService.java +++ b/src/main/java/org/elasticsearch/alerts/history/HistoryService.java @@ -18,17 +18,16 @@ import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateListener; -import org.elasticsearch.common.collect.ImmutableList; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.joda.time.DateTime; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor; import org.elasticsearch.threadpool.ThreadPool; import java.io.IOException; +import java.util.ArrayList; import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -46,9 +45,6 @@ public class HistoryService extends AbstractComponent { private final AtomicBoolean started = new AtomicBoolean(false); private final AtomicInteger initializationRetries = new AtomicInteger(); - // Holds fired alerts that were fired before on a different elected master node, but never had the chance to run. - private volatile ImmutableList previousFiredAlerts = ImmutableList.of(); - @Inject public HistoryService(Settings settings, HistoryStore historyStore, ThreadPool threadPool, AlertsStore alertsStore, AlertLockService alertLockService, Scheduler scheduler, @@ -74,36 +70,11 @@ public class HistoryService extends AbstractComponent { retry(callback); return; } - this.previousFiredAlerts = ImmutableList.copyOf(loadResult); - if (!previousFiredAlerts.isEmpty()) { - logger.debug("loaded [{}] actions from the alert history index into actions queue", previousFiredAlerts.size()); - } - logger.debug("starting history service"); if (started.compareAndSet(false, true)) { - if (alertsThreadPool().isShutdown()) { - logger.info("Restarting thread pool that had been shutdown"); - // this update thread pool settings work around is for restarting the alerts thread pool, - // that creates a new alerts thread pool and cleans up the existing one that has previously been shutdown. - int availableProcessors = EsExecutors.boundedNumberOfProcessors(settings); - /*** - *TODO Horrible horrible hack to make sure that settings are always different from the previous settings - * - * THIS NEEDS TO CHANGE ASAP - */ - int queueSize = alertsThreadPool().getQueue().remainingCapacity(); - if (queueSize % 2 == 0){ - queueSize = queueSize + 1; - } else { - queueSize = queueSize - 1; - } - //TODO END HORRIBLE HACK - - threadPool.updateSettings(AlertsPlugin.alertThreadPoolSettings(availableProcessors, queueSize)); - assert !alertsThreadPool().isShutdown(); - } + logger.debug("starting history service"); + executePreviouslyFiredAlerts(loadResult); logger.debug("started history service"); } - executePreviouslyFiredAlerts(); callback.onSuccess(state); } @@ -112,7 +83,8 @@ public class HistoryService extends AbstractComponent { logger.debug("stopping history service"); // We could also rely on the shutdown in #updateSettings call, but // this is a forceful shutdown that also interrupts the worker threads in the threadpool - List cancelledTasks = alertsThreadPool().shutdownNow(); + List cancelledTasks = new ArrayList<>(); + alertsThreadPool().getQueue().drainTo(cancelledTasks); logger.debug("cancelled [{}] queued tasks", cancelledTasks.size()); logger.debug("stopped history service"); } @@ -144,9 +116,6 @@ public class HistoryService extends AbstractComponent { void execute(FiredAlert firedAlert, Alert alert) { try { - if (alertsThreadPool().isShutdown()) { - throw new AlertsException("attempting to add to a shutdown thread pool"); - } alertsThreadPool().execute(new AlertExecutionTask(firedAlert, alert)); } catch (EsRejectedExecutionException e) { logger.debug("[{}] failed to execute fired alert", firedAlert.name()); @@ -155,18 +124,19 @@ public class HistoryService extends AbstractComponent { } } - void executePreviouslyFiredAlerts() { - ImmutableList firedAlerts = this.previousFiredAlerts; - if (firedAlerts != null) { - this.previousFiredAlerts = ImmutableList.of(); - for (FiredAlert firedAlert : firedAlerts) { + void executePreviouslyFiredAlerts(HistoryStore.LoadResult loadResult) { + if (loadResult != null) { + int counter = 0; + for (FiredAlert firedAlert : loadResult) { Alert alert = alertsStore.getAlert(firedAlert.name()); if (alert == null) { logger.warn("unable to find alert [{}] in alert store, perhaps it has been deleted. skipping...", firedAlert.name()); continue; } execute(firedAlert, alert); + counter++; } + logger.debug("executed [{}] not executed previous fired alerts from the alert history index ", counter); } } @@ -213,7 +183,8 @@ public class HistoryService extends AbstractComponent { @Override public void run() { if (!started.get()) { - throw new ElasticsearchIllegalStateException("not started"); + logger.debug("can't run alert execution, because history service is not started, ignoring it..."); + return; } AlertLockService.Lock lock = alertLockService.acquire(alert.name()); try { diff --git a/src/test/java/org/elasticsearch/alerts/BootStrapTest.java b/src/test/java/org/elasticsearch/alerts/BootStrapTest.java index 582875ff0ee..b2408464760 100644 --- a/src/test/java/org/elasticsearch/alerts/BootStrapTest.java +++ b/src/test/java/org/elasticsearch/alerts/BootStrapTest.java @@ -107,6 +107,9 @@ public class BootStrapTest extends AbstractAlertingTests { .get(); assertTrue(indexResponse.isCreated()); + // The executor doesn't get shutdown when alerts stops, so the largest queue size don't get reset. + long previousLargestQueueSize = response.getAlertActionManagerLargestQueueSize(); + stopAlerting(); startAlerting(); @@ -114,7 +117,7 @@ public class BootStrapTest extends AbstractAlertingTests { assertTrue(response.isAlertActionManagerStarted()); assertThat(response.getAlertManagerStarted(), equalTo(AlertsService.State.STARTED)); assertThat(response.getNumberOfRegisteredAlerts(), equalTo(1L)); - assertThat(response.getAlertActionManagerLargestQueueSize(), equalTo(1L)); + assertThat(response.getAlertActionManagerLargestQueueSize() - previousLargestQueueSize, equalTo(1L)); } @Test diff --git a/src/test/java/org/elasticsearch/alerts/NoMasterNodeTests.java b/src/test/java/org/elasticsearch/alerts/NoMasterNodeTests.java index 02b9bff28a7..f69288f6044 100644 --- a/src/test/java/org/elasticsearch/alerts/NoMasterNodeTests.java +++ b/src/test/java/org/elasticsearch/alerts/NoMasterNodeTests.java @@ -123,6 +123,7 @@ public class NoMasterNodeTests extends AbstractAlertingTests { String alertName = "alert" + i; assertAlertWithMinimumPerformedActionsCount(alertName, i); } + ensureGreen(); stopElectedMasterNodeAndWait(); startElectedMasterNodeAndWait();