diff --git a/src/main/java/org/elasticsearch/alerts/AlertsService.java b/src/main/java/org/elasticsearch/alerts/AlertsService.java index a29a42d8b9c..7d0ae168c9f 100644 --- a/src/main/java/org/elasticsearch/alerts/AlertsService.java +++ b/src/main/java/org/elasticsearch/alerts/AlertsService.java @@ -10,6 +10,7 @@ import org.elasticsearch.ElasticsearchIllegalStateException; import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.alerts.history.HistoryService; import org.elasticsearch.alerts.scheduler.Scheduler; +import org.elasticsearch.alerts.support.Callback; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterState; @@ -92,22 +93,39 @@ public class AlertsService extends AbstractComponent { } } - private void internalStart(ClusterState initialState) { + private void internalStart(ClusterState clusterState) { if (state.compareAndSet(State.STOPPED, State.STARTING)) { logger.info("starting alert service..."); alertLockService.start(); - ClusterState clusterState = initialState; // Try to load alert store before the action service, b/c action depends on alert store - while (!alertsStore.start(clusterState)) { - clusterState = newClusterState(clusterState); - } - while (!historyService.start(clusterState)) { - clusterState = newClusterState(clusterState); - } - scheduler.start(alertsStore.getAlerts().values()); - state.set(State.STARTED); - logger.info("alert service has started"); + alertsStore.start(clusterState, new Callback<ClusterState>(){ + + @Override + public void onSuccess(ClusterState clusterState) { + historyService.start(clusterState, new Callback<ClusterState>() { + + @Override + public void onSuccess(ClusterState clusterState) { + scheduler.start(alertsStore.getAlerts().values()); + state.set(State.STARTED); + logger.info("alert service has started"); + } + + @Override + public void onFailure(Throwable e) { + // NOTE(review): state stays STARTING after a failure and nothing visible here resets it; confirm a later start is not blocked + logger.error("failed to start alert service", e); + } + }); + } 
+ + @Override + public void onFailure(Throwable e) { + // NOTE(review): state stays STARTING after a failure and nothing visible here resets it; confirm a later start is not blocked + logger.error("failed to start alert service", e); + } + }); } } diff --git a/src/main/java/org/elasticsearch/alerts/AlertsStore.java b/src/main/java/org/elasticsearch/alerts/AlertsStore.java index 5114edbb1e8..e8b31e2357e 100644 --- a/src/main/java/org/elasticsearch/alerts/AlertsStore.java +++ b/src/main/java/org/elasticsearch/alerts/AlertsStore.java @@ -15,9 +15,13 @@ import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.SearchType; +import org.elasticsearch.alerts.support.Callback; import org.elasticsearch.alerts.support.TemplateUtils; import org.elasticsearch.alerts.support.init.proxy.ClientProxy; +import org.elasticsearch.cluster.ClusterChangedEvent; +import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateListener; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.component.AbstractComponent; @@ -27,11 +31,13 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.xcontent.json.JsonXContent; import org.elasticsearch.search.SearchHit; +import org.elasticsearch.threadpool.ThreadPool; import java.io.IOException; import java.util.Map; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; /** */ @@ -44,28 +50,35 @@ public class AlertsStore extends AbstractComponent { private final ClientProxy client; private final TemplateUtils templateUtils; private final Alert.Parser alertParser; + private final ClusterService clusterService; + private final ThreadPool threadPool; private final ConcurrentMap alertMap; 
private final AtomicBoolean started = new AtomicBoolean(false); + private final AtomicInteger initializationRetries = new AtomicInteger(); private final int scrollSize; private final TimeValue scrollTimeout; @Inject - public AlertsStore(Settings settings, ClientProxy client, TemplateUtils templateUtils, Alert.Parser alertParser) { + public AlertsStore(Settings settings, ClientProxy client, TemplateUtils templateUtils, Alert.Parser alertParser, + ClusterService clusterService, ThreadPool threadPool) { super(settings); this.client = client; this.templateUtils = templateUtils; this.alertParser = alertParser; + this.clusterService = clusterService; + this.threadPool = threadPool; this.alertMap = ConcurrentCollections.newConcurrentMap(); this.scrollTimeout = componentSettings.getAsTime("scroll.timeout", TimeValue.timeValueSeconds(30)); this.scrollSize = componentSettings.getAsInt("scroll.size", 100); } - public boolean start(ClusterState state) { + public void start(ClusterState state, Callback<ClusterState> callback) { if (started.get()) { - return true; + callback.onSuccess(state); + return; } IndexMetaData alertIndexMetaData = state.getMetaData().index(ALERT_INDEX); @@ -73,25 +86,28 @@ public class AlertsStore extends AbstractComponent { logger.trace("alerts index [{}] was not found. skipping alerts loading...", ALERT_INDEX); templateUtils.ensureIndexTemplateIsLoaded(state, ALERT_INDEX_TEMPLATE); started.set(true); - return true; + callback.onSuccess(state); + return; } if (state.routingTable().index(ALERT_INDEX).allPrimaryShardsActive()) { logger.debug("alerts index [{}] found with all active primary shards. 
loading alerts...", ALERT_INDEX); try { int count = loadAlerts(client, scrollSize, scrollTimeout, alertIndexMetaData.numberOfShards(), alertParser, alertMap); - logger.info("loaded [{}] alerts from the alert index [{}]", count, ALERT_INDEX); + logger.debug("loaded [{}] alerts from the alert index [{}]", count, ALERT_INDEX); } catch (Exception e) { - logger.warn("failed to load alerts for alert index [{}]. scheduled to retry alert loading...", e, ALERT_INDEX); + logger.debug("failed to load alerts for alert index [{}]. scheduled to retry alert loading...", e, ALERT_INDEX); alertMap.clear(); - return false; + retry(callback); + return; } templateUtils.ensureIndexTemplateIsLoaded(state, ALERT_INDEX_TEMPLATE); started.set(true); - return true; + callback.onSuccess(state); + } else { + logger.warn("not all primary shards of the alerts index [{}] are started. scheduled to retry alert loading...", ALERT_INDEX); + retry(callback); } - logger.warn("not all primary shards of the alerts index [{}] are started. scheduled to retry alert loading...", ALERT_INDEX); - return false; } public boolean started() { @@ -176,6 +192,41 @@ public class AlertsStore extends AbstractComponent { return indexRequest; } + /** + * Registers a one-shot cluster state listener that waits until the alerts index has all of its primary + * shards active, then re-runs start(...) on the generic thread pool and reports the outcome to the callback. + */ + private void retry(final Callback<ClusterState> callback) { + ClusterStateListener clusterStateListener = new ClusterStateListener() { + @Override + public void clusterChanged(ClusterChangedEvent event) { + final ClusterState state = event.state(); + IndexMetaData alertIndexMetaData = state.getMetaData().index(ALERT_INDEX); + if (alertIndexMetaData != null) { + if (state.routingTable().index(ALERT_INDEX).allPrimaryShardsActive()) { + // Remove listener, so that it doesn't get called on the next cluster state update: + assert initializationRetries.decrementAndGet() == 0 : "Only one retry can run at the time"; + clusterService.remove(this); + // We fork into another thread, because start(...) is expensive and we can't call this from the cluster update thread. 
+ threadPool.executor(ThreadPool.Names.GENERIC).execute(new Runnable() { + @Override + public void run() { + try { + start(state, callback); + } catch (Exception e) { + callback.onFailure(e); + } + } + }); + } + } + } + }; + // Count the retry before registering the listener, so an early cluster state update cannot decrement first: + assert initializationRetries.incrementAndGet() == 1 : "Only one retry can run at the time"; + clusterService.add(clusterStateListener); + } + /** * scrolls all the alert documents in the alerts index, parses them, and loads them into * the given map. diff --git a/src/main/java/org/elasticsearch/alerts/history/HistoryService.java b/src/main/java/org/elasticsearch/alerts/history/HistoryService.java index cf454d35336..7682e2da5cd 100644 --- a/src/main/java/org/elasticsearch/alerts/history/HistoryService.java +++ b/src/main/java/org/elasticsearch/alerts/history/HistoryService.java @@ -8,11 +8,15 @@ package org.elasticsearch.alerts.history; import org.elasticsearch.ElasticsearchIllegalStateException; import org.elasticsearch.alerts.*; import org.elasticsearch.alerts.actions.Action; +import org.elasticsearch.alerts.condition.Condition; import org.elasticsearch.alerts.scheduler.Scheduler; +import org.elasticsearch.alerts.support.Callback; import org.elasticsearch.alerts.throttle.Throttler; import org.elasticsearch.alerts.transform.Transform; -import org.elasticsearch.alerts.condition.Condition; +import org.elasticsearch.cluster.ClusterChangedEvent; +import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateListener; import org.elasticsearch.common.collect.ImmutableList; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.inject.Inject; @@ -26,6 +30,7 @@ import org.elasticsearch.threadpool.ThreadPool; import java.io.IOException; import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; /** */ @@ -34,33 +39,39 @@ public class HistoryService extends AbstractComponent 
{ private final HistoryStore historyStore; private final ThreadPool threadPool; private final AlertsStore alertsStore; + private final ClusterService clusterService; private final AlertLockService alertLockService; private final AtomicBoolean started = new AtomicBoolean(false); + private final AtomicInteger initializationRetries = new AtomicInteger(); // Holds fired alerts that were fired before on a different elected master node, but never had the chance to run. private volatile ImmutableList previousFiredAlerts = ImmutableList.of(); @Inject public HistoryService(Settings settings, HistoryStore historyStore, ThreadPool threadPool, - AlertsStore alertsStore, AlertLockService alertLockService, Scheduler scheduler) { + AlertsStore alertsStore, AlertLockService alertLockService, Scheduler scheduler, + ClusterService clusterService) { super(settings); this.historyStore = historyStore; this.threadPool = threadPool; this.alertsStore = alertsStore; this.alertLockService = alertLockService; + this.clusterService = clusterService; scheduler.addListener(new SchedulerListener()); } - public boolean start(ClusterState state) { + public void start(ClusterState state, Callback<ClusterState> callback) { if (started.get()) { - return true; + callback.onSuccess(state); + return; } assert alertsThreadPool().getQueue().isEmpty() : "queue should be empty, but contains " + alertsThreadPool().getQueue().size() + " elements."; HistoryStore.LoadResult loadResult = historyStore.loadFiredAlerts(state, FiredAlert.State.AWAITS_EXECUTION); if (!loadResult.succeeded()) { - return false; + retry(callback); + return; } this.previousFiredAlerts = ImmutableList.copyOf(loadResult); if (!previousFiredAlerts.isEmpty()) { @@ -92,7 +103,7 @@ public class HistoryService extends AbstractComponent { logger.debug("started history service"); } executePreviouslyFiredAlerts(); - return true; + callback.onSuccess(state); } public void stop() { @@ -162,6 +173,32 @@ public class HistoryService extends AbstractComponent { 
return (EsThreadPoolExecutor) threadPool.executor(AlertsPlugin.NAME); } + private void retry(final Callback<ClusterState> callback) { + ClusterStateListener clusterStateListener = new ClusterStateListener() { + + @Override + public void clusterChanged(final ClusterChangedEvent event) { + // Remove listener, so that it doesn't get called on the next cluster state update: + assert initializationRetries.decrementAndGet() == 0 : "Only one retry can run at the time"; + clusterService.remove(this); + // We fork into another thread, because start(...) is expensive and we can't call this from the cluster update thread. + threadPool.executor(ThreadPool.Names.GENERIC).execute(new Runnable() { + + @Override + public void run() { + try { + start(event.state(), callback); + } catch (Exception e) { + callback.onFailure(e); + } + } + }); + } + }; + assert initializationRetries.incrementAndGet() == 1 : "Only one retry can run at the time"; + clusterService.add(clusterStateListener); + } + private final class AlertExecutionTask implements Runnable { private final FiredAlert firedAlert; diff --git a/src/main/java/org/elasticsearch/alerts/history/HistoryStore.java b/src/main/java/org/elasticsearch/alerts/history/HistoryStore.java index 28dde1499cf..45f76ff238c 100644 --- a/src/main/java/org/elasticsearch/alerts/history/HistoryStore.java +++ b/src/main/java/org/elasticsearch/alerts/history/HistoryStore.java @@ -88,7 +88,7 @@ public class HistoryStore extends AbstractComponent { public LoadResult loadFiredAlerts(ClusterState state, FiredAlert.State firedAlertState) { String[] indices = state.metaData().concreteIndices(IndicesOptions.lenientExpandOpen(), ALERT_HISTORY_INDEX_PREFIX + "*"); if (indices.length == 0) { - logger.info("No .alert_history indices found, skip loading of alert actions"); + logger.debug("No .alert_history indices found, skip loading of alert actions"); templateUtils.ensureIndexTemplateIsLoaded(state, "alerthistory"); return new LoadResult(true); } @@ -97,7 +97,7 @@ public class 
HistoryStore extends AbstractComponent { IndexMetaData indexMetaData = state.getMetaData().index(index); if (indexMetaData != null) { if (!state.routingTable().index(index).allPrimaryShardsActive()) { - logger.warn("Not all primary shards of the [{}] index are started. Schedule to retry alert action loading..", index); + logger.debug("Not all primary shards of the [{}] index are started. Schedule to retry alert action loading..", index); return new LoadResult(false); } else { numPrimaryShards += indexMetaData.numberOfShards(); diff --git a/src/main/java/org/elasticsearch/alerts/support/Callback.java b/src/main/java/org/elasticsearch/alerts/support/Callback.java new file mode 100644 index 00000000000..3b9f72d0e6a --- /dev/null +++ b/src/main/java/org/elasticsearch/alerts/support/Callback.java @@ -0,0 +1,20 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.alerts.support; + +/** + * Callback notified with the result of an operation, or with the failure that aborted it. + * + * @param <T> the type of the result handed to onSuccess + */ +public interface Callback<T> { + + /** Called with the result when the operation succeeds. */ + void onSuccess(T t); + + /** Called with the cause when the operation fails. */ + void onFailure(Throwable e); +}