From 01375a320db806f6a91f8a8211a2c9b57b37f4b9 Mon Sep 17 00:00:00 2001 From: uboness Date: Thu, 5 Feb 2015 11:50:33 +0100 Subject: [PATCH] updated the alert service and store Original commit: elastic/x-pack-elasticsearch@61c75d825897283376c718bf0bdb9a42b351ce22 --- .../java/org/elasticsearch/alerts/Alert.java | 2 +- .../elasticsearch/alerts/AlertsModule.java | 4 +- .../elasticsearch/alerts/AlertsService.java | 33 +-- .../org/elasticsearch/alerts/AlertsStore.java | 191 +++++++++--------- .../alerts/history/HistoryService.java | 4 +- .../alerts/support/TemplateUtils.java | 2 +- .../alerts/throttle/AckThrottler.java | 5 +- 7 files changed, 119 insertions(+), 122 deletions(-) diff --git a/src/main/java/org/elasticsearch/alerts/Alert.java b/src/main/java/org/elasticsearch/alerts/Alert.java index 318b91c1219..2e10e62ceac 100644 --- a/src/main/java/org/elasticsearch/alerts/Alert.java +++ b/src/main/java/org/elasticsearch/alerts/Alert.java @@ -237,7 +237,7 @@ public class Alert implements ToXContent { public static class Status implements ToXContent { - enum State { + public enum State { NOT_EXECUTED, EXECUTED, ACKED diff --git a/src/main/java/org/elasticsearch/alerts/AlertsModule.java b/src/main/java/org/elasticsearch/alerts/AlertsModule.java index 305f4fd44a3..441aa2b983b 100644 --- a/src/main/java/org/elasticsearch/alerts/AlertsModule.java +++ b/src/main/java/org/elasticsearch/alerts/AlertsModule.java @@ -40,9 +40,9 @@ public class AlertsModule extends AbstractModule implements SpawnModules { protected void configure() { bind(Alert.Parser.class).asEagerSingleton(); - bind(TemplateUtils.class).asEagerSingleton(); - bind(AlertsStore.class).asEagerSingleton(); bind(AlertsService.class).asEagerSingleton(); + bind(AlertsStore.class).asEagerSingleton(); + bind(TemplateUtils.class).asEagerSingleton(); bind(HistoryService.class).asEagerSingleton(); bind(AlertActionRegistry.class).asEagerSingleton(); bind(ConfigurationService.class).asEagerSingleton(); diff --git a/src/main/java/org/elasticsearch/alerts/AlertsService.java b/src/main/java/org/elasticsearch/alerts/AlertsService.java index f874376f193..16481928db4 100644 --- a/src/main/java/org/elasticsearch/alerts/AlertsService.java +++ b/src/main/java/org/elasticsearch/alerts/AlertsService.java @@ -7,19 +7,14 @@ package org.elasticsearch.alerts; import org.elasticsearch.ElasticsearchException; -import org.elasticsearch.ElasticsearchIllegalArgumentException; import org.elasticsearch.ElasticsearchIllegalStateException; -import org.elasticsearch.action.delete.DeleteResponse; import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.alerts.actions.AlertActionRegistry; import org.elasticsearch.alerts.history.AlertRecord; import org.elasticsearch.alerts.history.HistoryService; import org.elasticsearch.alerts.scheduler.Scheduler; -import org.elasticsearch.alerts.support.init.proxy.ClientProxy; -import org.elasticsearch.alerts.support.init.proxy.ScriptServiceProxy; import org.elasticsearch.alerts.throttle.Throttler; import org.elasticsearch.alerts.trigger.Trigger; -import org.elasticsearch.alerts.trigger.TriggerRegistry; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterState; @@ -42,35 +37,29 @@ import java.util.concurrent.atomic.AtomicReference; public class AlertsService extends AbstractComponent { - private final ClientProxy client; private final Scheduler scheduler; private final AlertsStore alertsStore; - private final TriggerRegistry triggerRegistry; private final HistoryService historyService; private final AlertActionRegistry actionRegistry; private final ThreadPool threadPool; private final ClusterService clusterService; - private final ScriptServiceProxy scriptService; private final KeyedLock alertLock = new KeyedLock<>(); private final AtomicReference state = new AtomicReference<>(State.STOPPED); private volatile boolean manuallyStopped; @Inject - public AlertsService(Settings settings, ClientProxy client, ClusterService clusterService, Scheduler scheduler, AlertsStore alertsStore, - IndicesService indicesService, TriggerRegistry triggerRegistry, HistoryService historyService, - AlertActionRegistry actionRegistry, ThreadPool threadPool, ScriptServiceProxy scriptService) { + public AlertsService(Settings settings, ClusterService clusterService, Scheduler scheduler, AlertsStore alertsStore, + IndicesService indicesService, HistoryService historyService, + AlertActionRegistry actionRegistry, ThreadPool threadPool) { super(settings); - this.client = client; this.scheduler = scheduler; this.threadPool = threadPool; this.alertsStore = alertsStore; - this.triggerRegistry = triggerRegistry; this.historyService = historyService; this.historyService.setAlertsService(this); this.actionRegistry = actionRegistry; this.clusterService = clusterService; - this.scriptService = scriptService; scheduler.addListener(new SchedulerListener()); @@ -86,15 +75,15 @@ public class AlertsService extends AbstractComponent { manuallyStopped = !settings.getAsBoolean("alerts.start_immediately", true); } - public DeleteResponse deleteAlert(String name) throws InterruptedException, ExecutionException { + public AlertsStore.AlertDelete deleteAlert(String name) throws InterruptedException, ExecutionException { ensureStarted(); alertLock.acquire(name); try { - DeleteResponse deleteResponse = alertsStore.deleteAlert(name); - if (deleteResponse.isFound()) { + AlertsStore.AlertDelete delete = alertsStore.deleteAlert(name); + if (delete.deleteResponse().isFound()) { scheduler.remove(name); } - return deleteResponse; + return delete; } finally { alertLock.release(name); } @@ -104,7 +93,7 @@ public class AlertsService extends AbstractComponent { ensureStarted(); alertLock.acquire(alertName); try { - AlertsStore.AlertStoreModification result = alertsStore.putAlert(alertName, alertSource); + AlertsStore.AlertPut result = alertsStore.putAlert(alertName, alertSource); if (result.previous() == null || !result.previous().schedule().equals(result.current().schedule())) { scheduler.schedule(result.current()); } @@ -145,7 +134,7 @@ public class AlertsService extends AbstractComponent { * The reason the executing of the alert is split into two, is that we don't want to lose the fact that an alert has * fired. If we were */ - public void scheduleAlert(String alertName, DateTime scheduledFireTime, DateTime fireTime){ + void triggerAlert(String alertName, DateTime scheduledFireTime, DateTime fireTime){ ensureStarted(); alertLock.acquire(alertName); try { @@ -325,7 +314,7 @@ public class AlertsService extends AbstractComponent { private class SchedulerListener implements Scheduler.Listener { @Override public void fire(String alertName, DateTime scheduledFireTime, DateTime fireTime) { - + triggerAlert(alertName, scheduledFireTime, fireTime); } } @@ -428,7 +417,7 @@ public class AlertsService extends AbstractComponent { case 3: return STOPPING; default: - throw new ElasticsearchIllegalArgumentException("Unknown id: " + id); + throw new AlertsException("unknown id alerts service state id [" + id + "]"); } } } diff --git a/src/main/java/org/elasticsearch/alerts/AlertsStore.java b/src/main/java/org/elasticsearch/alerts/AlertsStore.java index c1cfd08caee..0f3e0a85801 100644 --- a/src/main/java/org/elasticsearch/alerts/AlertsStore.java +++ b/src/main/java/org/elasticsearch/alerts/AlertsStore.java @@ -22,15 +22,14 @@ import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.inject.Inject; -import org.elasticsearch.common.lucene.uid.Versions; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; -import org.elasticsearch.common.xcontent.XContentFactory; -import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.common.xcontent.json.JsonXContent; import org.elasticsearch.search.SearchHit; import java.io.IOException; +import java.util.Map; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicBoolean; @@ -38,8 +37,9 @@ import java.util.concurrent.atomic.AtomicBoolean; */ public class AlertsStore extends AbstractComponent { - public static final String ALERT_INDEX = ".alerts"; - public static final String ALERT_TYPE = "alert"; + static final String ALERT_INDEX = ".alerts"; + static final String ALERT_INDEX_TEMPLATE = "alerts"; + static final String ALERT_TYPE = "alert"; private final ClientProxy client; private final TemplateUtils templateUtils; @@ -63,6 +63,48 @@ public class AlertsStore extends AbstractComponent { this.scrollSize = componentSettings.getAsInt("scroll.size", 100); } + public boolean start(ClusterState state) { + if (started.get()) { + return true; + } + + IndexMetaData alertIndexMetaData = state.getMetaData().index(ALERT_INDEX); + if (alertIndexMetaData == null) { + logger.trace("alerts index [{}] was not found. skipping alerts loading...", ALERT_INDEX); + templateUtils.ensureIndexTemplateIsLoaded(state, ALERT_INDEX_TEMPLATE); + started.set(true); + return true; + } + + if (state.routingTable().index(ALERT_INDEX).allPrimaryShardsActive()) { + logger.debug("alerts index [{}] found with all active primary shards. loading alerts...", ALERT_INDEX); + try { + int count = loadAlerts(client, scrollSize, scrollTimeout, alertIndexMetaData.numberOfShards(), alertParser, alertMap); + logger.info("loaded [{}] alerts from the alert index [{}]", count, ALERT_INDEX); + } catch (Exception e) { + logger.warn("failed to load alerts for alert index [{}]. scheduled to retry alert loading...", e, ALERT_INDEX); + alertMap.clear(); + return false; + } + templateUtils.ensureIndexTemplateIsLoaded(state, ALERT_INDEX_TEMPLATE); + started.set(true); + return true; + } + logger.warn("not all primary shards of the alerts index [{}] are started. scheduled to retry alert loading...", ALERT_INDEX); + return false; + } + + public boolean started() { + return started.get(); + } + + public void stop() { + if (started.compareAndSet(true, false)) { + alertMap.clear(); + logger.info("stopped alerts store"); + } + } + /** * Returns the alert with the specified name otherwise null is returned. */ @@ -75,119 +117,77 @@ public class AlertsStore extends AbstractComponent { * Creates an alert with the specified name and source. If an alert with the specified name already exists it will * get overwritten. */ - public AlertStoreModification putAlert(String alertName, BytesReference alertSource) { + public AlertPut putAlert(String alertName, BytesReference alertSource) { ensureStarted(); Alert alert = alertParser.parse(alertName, false, alertSource); IndexRequest indexRequest = createIndexRequest(alertName, alertSource); IndexResponse response = client.index(indexRequest).actionGet(); alert.status().version(response.getVersion()); Alert previous = alertMap.put(alertName, alert); - return new AlertStoreModification(previous, alert, response); + return new AlertPut(previous, alert, response); } /** - * Updates the specified alert by making sure that the made changes are persisted. + * Updates and persists the status of the given alert */ - public void updateAlertStatus(Alert alert) throws IOException { + void updateAlertStatus(Alert alert) throws IOException { + // at the moment we store the status together with the alert, + // so we just need to update the alert itself + // TODO: consider storing the status in a different documment (alert_status doc) (must smaller docs... faster for frequent updates) updateAlert(alert); } /** - * Updates the specified alert by making sure that the made changes are persisted. + * Updates and persists the given alert */ - public void updateAlert(Alert alert) throws IOException { + void updateAlert(Alert alert) throws IOException { ensureStarted(); - BytesReference source = XContentFactory.contentBuilder(XContentType.JSON).value(alert).bytes(); + assert alert == alertMap.get(alert.name()) : "update alert can only be applied to an already loaded alert"; + BytesReference source = JsonXContent.contentBuilder().value(alert).bytes(); IndexResponse response = client.index(createIndexRequest(alert.name(), source)).actionGet(); alert.status().version(response.getVersion()); // Don't need to update the alertMap, since we are working on an instance from it. - assert verifySameInstance(alert); - } - - private boolean verifySameInstance(Alert alert) { - Alert found = alertMap.get(alert.name()); - assert found == alert : "expected " + alert + " but got " + found; - return true; } /** * Deletes the alert with the specified name if exists */ - public DeleteResponse deleteAlert(String name) { + public AlertDelete deleteAlert(String name) { ensureStarted(); Alert alert = alertMap.remove(name); - if (alert == null) { - return new DeleteResponse(ALERT_INDEX, ALERT_TYPE, name, Versions.MATCH_ANY, false); + // even if the alert was not found in the alert map, we should still try to delete it + // from the index, just to make sure we don't leave traces of it + DeleteRequest request = new DeleteRequest(ALERT_INDEX, ALERT_TYPE, name); + if (alert != null) { + request.version(alert.status().version()); } - - DeleteRequest deleteRequest = new DeleteRequest(ALERT_INDEX, ALERT_TYPE, name); - deleteRequest.version(alert.status().version()); - DeleteResponse deleteResponse = client.delete(deleteRequest).actionGet(); - assert deleteResponse.isFound(); - return deleteResponse; + DeleteResponse response = client.delete(request).actionGet(); + return new AlertDelete(response); } public ConcurrentMap getAlerts() { return alertMap; } - public boolean start(ClusterState state) { - if (started.get()) { - return true; - } - - IndexMetaData alertIndexMetaData = state.getMetaData().index(ALERT_INDEX); - if (alertIndexMetaData != null) { - logger.debug("Previous alerting index"); - if (state.routingTable().index(ALERT_INDEX).allPrimaryShardsActive()) { - logger.debug("Previous alerting index with active primary shards"); - try { - loadAlerts(alertIndexMetaData.numberOfShards()); - } catch (Exception e) { - logger.warn("Failed to load previously stored alerts. Schedule to retry alert loading...", e); - alertMap.clear(); - return false; - } - templateUtils.checkAndUploadIndexTemplate(state, "alerts"); - started.set(true); - return true; - } else { - logger.warn("Not all primary shards of the .alerts index are started. Schedule to retry alert loading..."); - return false; - } - } else { - logger.info("No previous .alert index, skip loading of alerts"); - templateUtils.checkAndUploadIndexTemplate(state, "alerts"); - started.set(true); - return true; - } - } - - public boolean started() { - return started.get(); - } - - public void stop() { - if (started.compareAndSet(true, false)) { - alertMap.clear(); - logger.info("Stopped alert store"); - } - } - - private IndexRequest createIndexRequest(String alertName, BytesReference alertSource) { + IndexRequest createIndexRequest(String alertName, BytesReference alertSource) { IndexRequest indexRequest = new IndexRequest(ALERT_INDEX, ALERT_TYPE, alertName); indexRequest.listenerThreaded(false); indexRequest.source(alertSource, false); return indexRequest; } - private void loadAlerts(int numPrimaryShards) { - assert alertMap.isEmpty() : "No alerts should reside, but there are " + alertMap.size() + " alerts."; + /** + * scrolls all the alert documents in the alerts index, parses them, and loads them into + * the given map. + */ + static int loadAlerts(ClientProxy client, int scrollSize, TimeValue scrollTimeout, int numPrimaryShards, Alert.Parser parser, Map alerts) { + assert alerts.isEmpty() : "no alerts should reside, but there are [" + alerts.size() + "] alerts."; RefreshResponse refreshResponse = client.admin().indices().refresh(new RefreshRequest(ALERT_INDEX)).actionGet(); if (refreshResponse.getSuccessfulShards() < numPrimaryShards) { - throw new ElasticsearchException("Not all required shards have been refreshed"); + throw new AlertsException("not all required shards have been refreshed"); } + int count = 0; SearchResponse response = client.prepareSearch(ALERT_INDEX) .setTypes(ALERT_TYPE) .setPreference("_primary") @@ -204,10 +204,12 @@ public class AlertsStore extends AbstractComponent { if (response.getHits().getTotalHits() > 0) { response = client.prepareSearchScroll(response.getScrollId()).setScroll(scrollTimeout).get(); while (response.getHits().hits().length != 0) { - for (SearchHit sh : response.getHits()) { - String alertId = sh.getId(); - Alert alert = parseLoadedAlert(alertId, sh); - alertMap.put(alertId, alert); + for (SearchHit hit : response.getHits()) { + String name = hit.getId(); + Alert alert = parser.parse(name, true, hit.getSourceRef()); + alert.status().version(hit.version()); + alerts.put(name, alert); + count++; } response = client.prepareSearchScroll(response.getScrollId()).setScroll(scrollTimeout).get(); } @@ -215,13 +217,7 @@ public class AlertsStore extends AbstractComponent { } finally { client.prepareClearScroll().addScrollId(response.getScrollId()).get(); } - logger.info("Loaded [{}] alerts from the alert index.", alertMap.size()); - } - - private Alert parseLoadedAlert(String alertId, SearchHit sh) { - Alert alert = alertParser.parse(alertId, true, sh.getSourceRef()); - alert.status().version(sh.version()); - return alert; + return count; } private void ensureStarted() { @@ -230,16 +226,16 @@ public class AlertsStore extends AbstractComponent { } } - public final class AlertStoreModification { + public final class AlertPut { private final Alert previous; private final Alert current; - private final IndexResponse indexResponse; + private final IndexResponse response; - public AlertStoreModification(Alert previous, Alert current, IndexResponse indexResponse) { + public AlertPut(Alert previous, Alert current, IndexResponse response) { this.current = current; this.previous = previous; - this.indexResponse = indexResponse; + this.response = response; } public Alert current() { @@ -251,7 +247,20 @@ public class AlertsStore extends AbstractComponent { } public IndexResponse indexResponse() { - return indexResponse; + return response; + } + } + + public final class AlertDelete { + + private final DeleteResponse response; + + public AlertDelete(DeleteResponse response) { + this.response = response; + } + + public DeleteResponse deleteResponse() { + return response; } } diff --git a/src/main/java/org/elasticsearch/alerts/history/HistoryService.java b/src/main/java/org/elasticsearch/alerts/history/HistoryService.java index 28a22d30adb..4c47285068f 100644 --- a/src/main/java/org/elasticsearch/alerts/history/HistoryService.java +++ b/src/main/java/org/elasticsearch/alerts/history/HistoryService.java @@ -114,7 +114,7 @@ public class HistoryService extends AbstractComponent { String[] indices = state.metaData().concreteIndices(IndicesOptions.lenientExpandOpen(), ALERT_HISTORY_INDEX_PREFIX + "*"); if (indices.length == 0) { logger.info("No previous .alerthistory index, skip loading of alert actions"); - templateUtils.checkAndUploadIndexTemplate(state, "alerthistory"); + templateUtils.ensureIndexTemplateIsLoaded(state, "alerthistory"); doStart(); return true; } @@ -138,7 +138,7 @@ public class HistoryService extends AbstractComponent { actionsToBeProcessed.clear(); return false; } - templateUtils.checkAndUploadIndexTemplate(state, "alerthistory"); + templateUtils.ensureIndexTemplateIsLoaded(state, "alerthistory"); doStart(); return true; } diff --git a/src/main/java/org/elasticsearch/alerts/support/TemplateUtils.java b/src/main/java/org/elasticsearch/alerts/support/TemplateUtils.java index 3c110fe9ca8..62367ddc4c6 100644 --- a/src/main/java/org/elasticsearch/alerts/support/TemplateUtils.java +++ b/src/main/java/org/elasticsearch/alerts/support/TemplateUtils.java @@ -42,7 +42,7 @@ public class TemplateUtils extends AbstractComponent { * * In the the template doesn't exists this method blocks until the template has been created. */ - public void checkAndUploadIndexTemplate(ClusterState state, final String templateName) { + public void ensureIndexTemplateIsLoaded(ClusterState state, final String templateName) { final byte[] template; try { InputStream is = AlertsStore.class.getResourceAsStream("/" + templateName + ".json"); diff --git a/src/main/java/org/elasticsearch/alerts/throttle/AckThrottler.java b/src/main/java/org/elasticsearch/alerts/throttle/AckThrottler.java index 97951f701cc..f3c6cee3838 100644 --- a/src/main/java/org/elasticsearch/alerts/throttle/AckThrottler.java +++ b/src/main/java/org/elasticsearch/alerts/throttle/AckThrottler.java @@ -17,9 +17,8 @@ public class AckThrottler implements Throttler { @Override public Result throttle(Alert alert, Trigger.Result result) { - Alert.Status.Ack ack = alert.status().ack(); - if (ack != null) { - return Result.throttle("alert [" + alert.name() + "] was acked at [" + formatDate(ack.timestamp())); + if (alert.status().state() != Alert.Status.State.ACKED) { + return Result.throttle("alert [" + alert.name() + "] was acked at [" + formatDate(alert.status().lastStateChanged()) + "]"); } return Result.NO; }