From 69bbea6985553b64331cbad67eb9e61025f5fc75 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Tue, 17 Feb 2015 17:49:55 +0100 Subject: [PATCH] Changed the initialization retry logic of the AlertService to be event based instead of blocking (actively polling for a new cluster state). This avoids that a single thread will be busy during the time that not all primary shards of the alerts and alert history indices are started. Also the execution of alert history items that were loaded during initialization will be executed once the AlertService goes into started state, before this was executed once the AlertActionService has started, which could load to failures, because there was a small window of time where the alert manager wasn't started. Executing alert history items with the state search_needed requires the alert manager to be started and that isn't yet the case when the AlertActionService has started. Closes elastic/elasticsearch#75 Closes elastic/elasticsearch#76 Original commit: elastic/x-pack-elasticsearch@a799bc34e3ed3ac434091b6796e16a06b0d67607 --- .../elasticsearch/alerts/AlertsService.java | 38 +++++++---- .../org/elasticsearch/alerts/AlertsStore.java | 66 ++++++++++++++++--- .../alerts/history/HistoryService.java | 49 ++++++++++++-- .../alerts/history/HistoryStore.java | 4 +- .../alerts/support/Callback.java | 15 +++++ 5 files changed, 143 insertions(+), 29 deletions(-) create mode 100644 src/main/java/org/elasticsearch/alerts/support/Callback.java diff --git a/src/main/java/org/elasticsearch/alerts/AlertsService.java b/src/main/java/org/elasticsearch/alerts/AlertsService.java index a29a42d8b9c..7d0ae168c9f 100644 --- a/src/main/java/org/elasticsearch/alerts/AlertsService.java +++ b/src/main/java/org/elasticsearch/alerts/AlertsService.java @@ -10,6 +10,7 @@ import org.elasticsearch.ElasticsearchIllegalStateException; import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.alerts.history.HistoryService; import org.elasticsearch.alerts.scheduler.Scheduler; +import org.elasticsearch.alerts.support.Callback; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterState; @@ -92,22 +93,37 @@ public class AlertsService extends AbstractComponent { } } - private void internalStart(ClusterState initialState) { + private void internalStart(ClusterState clusterState) { if (state.compareAndSet(State.STOPPED, State.STARTING)) { logger.info("starting alert service..."); alertLockService.start(); - ClusterState clusterState = initialState; // Try to load alert store before the action service, b/c action depends on alert store - while (!alertsStore.start(clusterState)) { - clusterState = newClusterState(clusterState); - } - while (!historyService.start(clusterState)) { - clusterState = newClusterState(clusterState); - } - scheduler.start(alertsStore.getAlerts().values()); - state.set(State.STARTED); - logger.info("alert service has started"); + alertsStore.start(clusterState, new Callback(){ + + @Override + public void onSuccess(ClusterState clusterState) { + historyService.start(clusterState, new Callback() { + + @Override + public void onSuccess(ClusterState clusterState) { + scheduler.start(alertsStore.getAlerts().values()); + state.set(State.STARTED); + logger.info("alert service has started"); + } + + @Override + public void onFailure(Throwable e) { + logger.error("failed to start alert service", e); + } + }); + } + + @Override + public void onFailure(Throwable e) { + logger.error("failed to start alert service", e); + } + }); } } diff --git a/src/main/java/org/elasticsearch/alerts/AlertsStore.java b/src/main/java/org/elasticsearch/alerts/AlertsStore.java index 5114edbb1e8..e8b31e2357e 100644 --- a/src/main/java/org/elasticsearch/alerts/AlertsStore.java +++ b/src/main/java/org/elasticsearch/alerts/AlertsStore.java @@ -15,9 +15,13 @@ import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.SearchType; +import org.elasticsearch.alerts.support.Callback; import org.elasticsearch.alerts.support.TemplateUtils; import org.elasticsearch.alerts.support.init.proxy.ClientProxy; +import org.elasticsearch.cluster.ClusterChangedEvent; +import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateListener; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.component.AbstractComponent; @@ -27,11 +31,13 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.xcontent.json.JsonXContent; import org.elasticsearch.search.SearchHit; +import org.elasticsearch.threadpool.ThreadPool; import java.io.IOException; import java.util.Map; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; /** */ @@ -44,28 +50,35 @@ public class AlertsStore extends AbstractComponent { private final ClientProxy client; private final TemplateUtils templateUtils; private final Alert.Parser alertParser; + private final ClusterService clusterService; + private final ThreadPool threadPool; private final ConcurrentMap alertMap; private final AtomicBoolean started = new AtomicBoolean(false); + private final AtomicInteger initializationRetries = new AtomicInteger(); private final int scrollSize; private final TimeValue scrollTimeout; @Inject - public AlertsStore(Settings settings, ClientProxy client, TemplateUtils templateUtils, Alert.Parser alertParser) { + public AlertsStore(Settings settings, ClientProxy client, TemplateUtils templateUtils, Alert.Parser alertParser, + ClusterService clusterService, ThreadPool threadPool) { super(settings); this.client = client; this.templateUtils = templateUtils; this.alertParser = alertParser; + this.clusterService = clusterService; + this.threadPool = threadPool; this.alertMap = ConcurrentCollections.newConcurrentMap(); this.scrollTimeout = componentSettings.getAsTime("scroll.timeout", TimeValue.timeValueSeconds(30)); this.scrollSize = componentSettings.getAsInt("scroll.size", 100); } - public boolean start(ClusterState state) { + public void start(ClusterState state, Callback callback) { if (started.get()) { - return true; + callback.onSuccess(state); + return; } IndexMetaData alertIndexMetaData = state.getMetaData().index(ALERT_INDEX); @@ -73,25 +86,28 @@ public class AlertsStore extends AbstractComponent { logger.trace("alerts index [{}] was not found. skipping alerts loading...", ALERT_INDEX); templateUtils.ensureIndexTemplateIsLoaded(state, ALERT_INDEX_TEMPLATE); started.set(true); - return true; + callback.onSuccess(state); + return; } if (state.routingTable().index(ALERT_INDEX).allPrimaryShardsActive()) { logger.debug("alerts index [{}] found with all active primary shards. loading alerts...", ALERT_INDEX); try { int count = loadAlerts(client, scrollSize, scrollTimeout, alertIndexMetaData.numberOfShards(), alertParser, alertMap); - logger.info("loaded [{}] alerts from the alert index [{}]", count, ALERT_INDEX); + logger.debug("loaded [{}] alerts from the alert index [{}]", count, ALERT_INDEX); } catch (Exception e) { - logger.warn("failed to load alerts for alert index [{}]. scheduled to retry alert loading...", e, ALERT_INDEX); + logger.debug("failed to load alerts for alert index [{}]. scheduled to retry alert loading...", e, ALERT_INDEX); alertMap.clear(); - return false; + retry(callback); + return; } templateUtils.ensureIndexTemplateIsLoaded(state, ALERT_INDEX_TEMPLATE); started.set(true); - return true; + callback.onSuccess(state); + } else { + logger.warn("not all primary shards of the alerts index [{}] are started. scheduled to retry alert loading...", ALERT_INDEX); + retry(callback); } - logger.warn("not all primary shards of the alerts index [{}] are started. scheduled to retry alert loading...", ALERT_INDEX); - return false; } public boolean started() { @@ -176,6 +192,36 @@ public class AlertsStore extends AbstractComponent { return indexRequest; } + private void retry(final Callback callback) { + ClusterStateListener clusterStateListener = new ClusterStateListener() { + @Override + public void clusterChanged(ClusterChangedEvent event) { + final ClusterState state = event.state(); + IndexMetaData alertIndexMetaData = state.getMetaData().index(ALERT_INDEX); + if (alertIndexMetaData != null) { + if (state.routingTable().index(ALERT_INDEX).allPrimaryShardsActive()) { + // Remove listener, so that it doesn't get called on the next cluster state update: + assert initializationRetries.decrementAndGet() == 0 : "Only one retry can run at the time"; + clusterService.remove(this); + // We fork into another thread, because start(...) is expensive and we can't call this from the cluster update thread. + threadPool.executor(ThreadPool.Names.GENERIC).execute(new Runnable() { + @Override + public void run() { + try { + start(state, callback); + } catch (Exception e) { + callback.onFailure(e); + } + } + }); + } + } + } + }; + clusterService.add(clusterStateListener); + assert initializationRetries.incrementAndGet() == 1 : "Only one retry can run at the time"; + } + /** * scrolls all the alert documents in the alerts index, parses them, and loads them into * the given map. diff --git a/src/main/java/org/elasticsearch/alerts/history/HistoryService.java b/src/main/java/org/elasticsearch/alerts/history/HistoryService.java index cf454d35336..7682e2da5cd 100644 --- a/src/main/java/org/elasticsearch/alerts/history/HistoryService.java +++ b/src/main/java/org/elasticsearch/alerts/history/HistoryService.java @@ -8,11 +8,15 @@ package org.elasticsearch.alerts.history; import org.elasticsearch.ElasticsearchIllegalStateException; import org.elasticsearch.alerts.*; import org.elasticsearch.alerts.actions.Action; +import org.elasticsearch.alerts.condition.Condition; import org.elasticsearch.alerts.scheduler.Scheduler; +import org.elasticsearch.alerts.support.Callback; import org.elasticsearch.alerts.throttle.Throttler; import org.elasticsearch.alerts.transform.Transform; -import org.elasticsearch.alerts.condition.Condition; +import org.elasticsearch.cluster.ClusterChangedEvent; +import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateListener; import org.elasticsearch.common.collect.ImmutableList; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.inject.Inject; @@ -26,6 +30,7 @@ import org.elasticsearch.threadpool.ThreadPool; import java.io.IOException; import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; /** */ @@ -34,33 +39,39 @@ public class HistoryService extends AbstractComponent { private final HistoryStore historyStore; private final ThreadPool threadPool; private final AlertsStore alertsStore; + private final ClusterService clusterService; private final AlertLockService alertLockService; private final AtomicBoolean started = new AtomicBoolean(false); + private final AtomicInteger initializationRetries = new AtomicInteger(); // Holds fired alerts that were fired before on a different elected master node, but never had the chance to run. private volatile ImmutableList previousFiredAlerts = ImmutableList.of(); @Inject public HistoryService(Settings settings, HistoryStore historyStore, ThreadPool threadPool, - AlertsStore alertsStore, AlertLockService alertLockService, Scheduler scheduler) { + AlertsStore alertsStore, AlertLockService alertLockService, Scheduler scheduler, + ClusterService clusterService) { super(settings); this.historyStore = historyStore; this.threadPool = threadPool; this.alertsStore = alertsStore; this.alertLockService = alertLockService; + this.clusterService = clusterService; scheduler.addListener(new SchedulerListener()); } - public boolean start(ClusterState state) { + public void start(ClusterState state, Callback callback) { if (started.get()) { - return true; + callback.onSuccess(state); + return; } assert alertsThreadPool().getQueue().isEmpty() : "queue should be empty, but contains " + alertsThreadPool().getQueue().size() + " elements."; HistoryStore.LoadResult loadResult = historyStore.loadFiredAlerts(state, FiredAlert.State.AWAITS_EXECUTION); if (!loadResult.succeeded()) { - return false; + retry(callback); + return; } this.previousFiredAlerts = ImmutableList.copyOf(loadResult); if (!previousFiredAlerts.isEmpty()) { @@ -92,7 +103,7 @@ public class HistoryService extends AbstractComponent { logger.debug("started history service"); } executePreviouslyFiredAlerts(); - return true; + callback.onSuccess(state); } public void stop() { @@ -162,6 +173,32 @@ public class HistoryService extends AbstractComponent { return (EsThreadPoolExecutor) threadPool.executor(AlertsPlugin.NAME); } + private void retry(final Callback callback) { + ClusterStateListener clusterStateListener = new ClusterStateListener() { + + @Override + public void clusterChanged(final ClusterChangedEvent event) { + // Remove listener, so that it doesn't get called on the next cluster state update: + assert initializationRetries.decrementAndGet() == 0 : "Only one retry can run at the time"; + clusterService.remove(this); + // We fork into another thread, because start(...) is expensive and we can't call this from the cluster update thread. + threadPool.executor(ThreadPool.Names.GENERIC).execute(new Runnable() { + + @Override + public void run() { + try { + start(event.state(), callback); + } catch (Exception e) { + callback.onFailure(e); + } + } + }); + } + }; + assert initializationRetries.incrementAndGet() == 1 : "Only one retry can run at the time"; + clusterService.add(clusterStateListener); + } + private final class AlertExecutionTask implements Runnable { private final FiredAlert firedAlert; diff --git a/src/main/java/org/elasticsearch/alerts/history/HistoryStore.java b/src/main/java/org/elasticsearch/alerts/history/HistoryStore.java index 28dde1499cf..45f76ff238c 100644 --- a/src/main/java/org/elasticsearch/alerts/history/HistoryStore.java +++ b/src/main/java/org/elasticsearch/alerts/history/HistoryStore.java @@ -88,7 +88,7 @@ public class HistoryStore extends AbstractComponent { public LoadResult loadFiredAlerts(ClusterState state, FiredAlert.State firedAlertState) { String[] indices = state.metaData().concreteIndices(IndicesOptions.lenientExpandOpen(), ALERT_HISTORY_INDEX_PREFIX + "*"); if (indices.length == 0) { - logger.info("No .alert_history indices found, skip loading of alert actions"); + logger.debug("No .alert_history indices found, skip loading of alert actions"); templateUtils.ensureIndexTemplateIsLoaded(state, "alerthistory"); return new LoadResult(true); } @@ -97,7 +97,7 @@ public class HistoryStore extends AbstractComponent { IndexMetaData indexMetaData = state.getMetaData().index(index); if (indexMetaData != null) { if (!state.routingTable().index(index).allPrimaryShardsActive()) { - logger.warn("Not all primary shards of the [{}] index are started. Schedule to retry alert action loading..", index); + logger.debug("Not all primary shards of the [{}] index are started. Schedule to retry alert action loading..", index); return new LoadResult(false); } else { numPrimaryShards += indexMetaData.numberOfShards(); diff --git a/src/main/java/org/elasticsearch/alerts/support/Callback.java b/src/main/java/org/elasticsearch/alerts/support/Callback.java new file mode 100644 index 00000000000..3b9f72d0e6a --- /dev/null +++ b/src/main/java/org/elasticsearch/alerts/support/Callback.java @@ -0,0 +1,15 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.alerts.support; + +/** + */ +public interface Callback { + + void onSuccess(T t); + + void onFailure(Throwable e); +}