updated the alert service and store

Original commit: elastic/x-pack-elasticsearch@61c75d8258
This commit is contained in:
uboness 2015-02-05 11:50:33 +01:00
parent af7cf03a1c
commit 01375a320d
7 changed files with 119 additions and 122 deletions

View File

@ -237,7 +237,7 @@ public class Alert implements ToXContent {
public static class Status implements ToXContent { public static class Status implements ToXContent {
enum State { public enum State {
NOT_EXECUTED, NOT_EXECUTED,
EXECUTED, EXECUTED,
ACKED ACKED

View File

@ -40,9 +40,9 @@ public class AlertsModule extends AbstractModule implements SpawnModules {
protected void configure() { protected void configure() {
bind(Alert.Parser.class).asEagerSingleton(); bind(Alert.Parser.class).asEagerSingleton();
bind(TemplateUtils.class).asEagerSingleton();
bind(AlertsStore.class).asEagerSingleton();
bind(AlertsService.class).asEagerSingleton(); bind(AlertsService.class).asEagerSingleton();
bind(AlertsStore.class).asEagerSingleton();
bind(TemplateUtils.class).asEagerSingleton();
bind(HistoryService.class).asEagerSingleton(); bind(HistoryService.class).asEagerSingleton();
bind(AlertActionRegistry.class).asEagerSingleton(); bind(AlertActionRegistry.class).asEagerSingleton();
bind(ConfigurationService.class).asEagerSingleton(); bind(ConfigurationService.class).asEagerSingleton();

View File

@ -7,19 +7,14 @@ package org.elasticsearch.alerts;
import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchIllegalArgumentException;
import org.elasticsearch.ElasticsearchIllegalStateException; import org.elasticsearch.ElasticsearchIllegalStateException;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.alerts.actions.AlertActionRegistry; import org.elasticsearch.alerts.actions.AlertActionRegistry;
import org.elasticsearch.alerts.history.AlertRecord; import org.elasticsearch.alerts.history.AlertRecord;
import org.elasticsearch.alerts.history.HistoryService; import org.elasticsearch.alerts.history.HistoryService;
import org.elasticsearch.alerts.scheduler.Scheduler; import org.elasticsearch.alerts.scheduler.Scheduler;
import org.elasticsearch.alerts.support.init.proxy.ClientProxy;
import org.elasticsearch.alerts.support.init.proxy.ScriptServiceProxy;
import org.elasticsearch.alerts.throttle.Throttler; import org.elasticsearch.alerts.throttle.Throttler;
import org.elasticsearch.alerts.trigger.Trigger; import org.elasticsearch.alerts.trigger.Trigger;
import org.elasticsearch.alerts.trigger.TriggerRegistry;
import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterState;
@ -42,35 +37,29 @@ import java.util.concurrent.atomic.AtomicReference;
public class AlertsService extends AbstractComponent { public class AlertsService extends AbstractComponent {
private final ClientProxy client;
private final Scheduler scheduler; private final Scheduler scheduler;
private final AlertsStore alertsStore; private final AlertsStore alertsStore;
private final TriggerRegistry triggerRegistry;
private final HistoryService historyService; private final HistoryService historyService;
private final AlertActionRegistry actionRegistry; private final AlertActionRegistry actionRegistry;
private final ThreadPool threadPool; private final ThreadPool threadPool;
private final ClusterService clusterService; private final ClusterService clusterService;
private final ScriptServiceProxy scriptService;
private final KeyedLock<String> alertLock = new KeyedLock<>(); private final KeyedLock<String> alertLock = new KeyedLock<>();
private final AtomicReference<State> state = new AtomicReference<>(State.STOPPED); private final AtomicReference<State> state = new AtomicReference<>(State.STOPPED);
private volatile boolean manuallyStopped; private volatile boolean manuallyStopped;
@Inject @Inject
public AlertsService(Settings settings, ClientProxy client, ClusterService clusterService, Scheduler scheduler, AlertsStore alertsStore, public AlertsService(Settings settings, ClusterService clusterService, Scheduler scheduler, AlertsStore alertsStore,
IndicesService indicesService, TriggerRegistry triggerRegistry, HistoryService historyService, IndicesService indicesService, HistoryService historyService,
AlertActionRegistry actionRegistry, ThreadPool threadPool, ScriptServiceProxy scriptService) { AlertActionRegistry actionRegistry, ThreadPool threadPool) {
super(settings); super(settings);
this.client = client;
this.scheduler = scheduler; this.scheduler = scheduler;
this.threadPool = threadPool; this.threadPool = threadPool;
this.alertsStore = alertsStore; this.alertsStore = alertsStore;
this.triggerRegistry = triggerRegistry;
this.historyService = historyService; this.historyService = historyService;
this.historyService.setAlertsService(this); this.historyService.setAlertsService(this);
this.actionRegistry = actionRegistry; this.actionRegistry = actionRegistry;
this.clusterService = clusterService; this.clusterService = clusterService;
this.scriptService = scriptService;
scheduler.addListener(new SchedulerListener()); scheduler.addListener(new SchedulerListener());
@ -86,15 +75,15 @@ public class AlertsService extends AbstractComponent {
manuallyStopped = !settings.getAsBoolean("alerts.start_immediately", true); manuallyStopped = !settings.getAsBoolean("alerts.start_immediately", true);
} }
public DeleteResponse deleteAlert(String name) throws InterruptedException, ExecutionException { public AlertsStore.AlertDelete deleteAlert(String name) throws InterruptedException, ExecutionException {
ensureStarted(); ensureStarted();
alertLock.acquire(name); alertLock.acquire(name);
try { try {
DeleteResponse deleteResponse = alertsStore.deleteAlert(name); AlertsStore.AlertDelete delete = alertsStore.deleteAlert(name);
if (deleteResponse.isFound()) { if (delete.deleteResponse().isFound()) {
scheduler.remove(name); scheduler.remove(name);
} }
return deleteResponse; return delete;
} finally { } finally {
alertLock.release(name); alertLock.release(name);
} }
@ -104,7 +93,7 @@ public class AlertsService extends AbstractComponent {
ensureStarted(); ensureStarted();
alertLock.acquire(alertName); alertLock.acquire(alertName);
try { try {
AlertsStore.AlertStoreModification result = alertsStore.putAlert(alertName, alertSource); AlertsStore.AlertPut result = alertsStore.putAlert(alertName, alertSource);
if (result.previous() == null || !result.previous().schedule().equals(result.current().schedule())) { if (result.previous() == null || !result.previous().schedule().equals(result.current().schedule())) {
scheduler.schedule(result.current()); scheduler.schedule(result.current());
} }
@ -145,7 +134,7 @@ public class AlertsService extends AbstractComponent {
* The reason the executing of the alert is split into two, is that we don't want to lose the fact that an alert has * The reason the executing of the alert is split into two, is that we don't want to lose the fact that an alert has
* fired. If we were * fired. If we were
*/ */
public void scheduleAlert(String alertName, DateTime scheduledFireTime, DateTime fireTime){ void triggerAlert(String alertName, DateTime scheduledFireTime, DateTime fireTime){
ensureStarted(); ensureStarted();
alertLock.acquire(alertName); alertLock.acquire(alertName);
try { try {
@ -325,7 +314,7 @@ public class AlertsService extends AbstractComponent {
private class SchedulerListener implements Scheduler.Listener { private class SchedulerListener implements Scheduler.Listener {
@Override @Override
public void fire(String alertName, DateTime scheduledFireTime, DateTime fireTime) { public void fire(String alertName, DateTime scheduledFireTime, DateTime fireTime) {
triggerAlert(alertName, scheduledFireTime, fireTime);
} }
} }
@ -428,7 +417,7 @@ public class AlertsService extends AbstractComponent {
case 3: case 3:
return STOPPING; return STOPPING;
default: default:
throw new ElasticsearchIllegalArgumentException("Unknown id: " + id); throw new AlertsException("unknown id alerts service state id [" + id + "]");
} }
} }
} }

View File

@ -22,15 +22,14 @@ import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.lucene.uid.Versions;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchHit;
import java.io.IOException; import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
@ -38,8 +37,9 @@ import java.util.concurrent.atomic.AtomicBoolean;
*/ */
public class AlertsStore extends AbstractComponent { public class AlertsStore extends AbstractComponent {
public static final String ALERT_INDEX = ".alerts"; static final String ALERT_INDEX = ".alerts";
public static final String ALERT_TYPE = "alert"; static final String ALERT_INDEX_TEMPLATE = "alerts";
static final String ALERT_TYPE = "alert";
private final ClientProxy client; private final ClientProxy client;
private final TemplateUtils templateUtils; private final TemplateUtils templateUtils;
@ -63,6 +63,48 @@ public class AlertsStore extends AbstractComponent {
this.scrollSize = componentSettings.getAsInt("scroll.size", 100); this.scrollSize = componentSettings.getAsInt("scroll.size", 100);
} }
public boolean start(ClusterState state) {
if (started.get()) {
return true;
}
IndexMetaData alertIndexMetaData = state.getMetaData().index(ALERT_INDEX);
if (alertIndexMetaData == null) {
logger.trace("alerts index [{}] was not found. skipping alerts loading...", ALERT_INDEX);
templateUtils.ensureIndexTemplateIsLoaded(state, ALERT_INDEX_TEMPLATE);
started.set(true);
return true;
}
if (state.routingTable().index(ALERT_INDEX).allPrimaryShardsActive()) {
logger.debug("alerts index [{}] found with all active primary shards. loading alerts...", ALERT_INDEX);
try {
int count = loadAlerts(client, scrollSize, scrollTimeout, alertIndexMetaData.numberOfShards(), alertParser, alertMap);
logger.info("loaded [{}] alerts from the alert index [{}]", count, ALERT_INDEX);
} catch (Exception e) {
logger.warn("failed to load alerts for alert index [{}]. scheduled to retry alert loading...", e, ALERT_INDEX);
alertMap.clear();
return false;
}
templateUtils.ensureIndexTemplateIsLoaded(state, ALERT_INDEX_TEMPLATE);
started.set(true);
return true;
}
logger.warn("not all primary shards of the alerts index [{}] are started. scheduled to retry alert loading...", ALERT_INDEX);
return false;
}
public boolean started() {
return started.get();
}
public void stop() {
if (started.compareAndSet(true, false)) {
alertMap.clear();
logger.info("stopped alerts store");
}
}
/** /**
* Returns the alert with the specified name otherwise <code>null</code> is returned. * Returns the alert with the specified name otherwise <code>null</code> is returned.
*/ */
@ -75,119 +117,77 @@ public class AlertsStore extends AbstractComponent {
* Creates an alert with the specified name and source. If an alert with the specified name already exists it will * Creates an alert with the specified name and source. If an alert with the specified name already exists it will
* get overwritten. * get overwritten.
*/ */
public AlertStoreModification putAlert(String alertName, BytesReference alertSource) { public AlertPut putAlert(String alertName, BytesReference alertSource) {
ensureStarted(); ensureStarted();
Alert alert = alertParser.parse(alertName, false, alertSource); Alert alert = alertParser.parse(alertName, false, alertSource);
IndexRequest indexRequest = createIndexRequest(alertName, alertSource); IndexRequest indexRequest = createIndexRequest(alertName, alertSource);
IndexResponse response = client.index(indexRequest).actionGet(); IndexResponse response = client.index(indexRequest).actionGet();
alert.status().version(response.getVersion()); alert.status().version(response.getVersion());
Alert previous = alertMap.put(alertName, alert); Alert previous = alertMap.put(alertName, alert);
return new AlertStoreModification(previous, alert, response); return new AlertPut(previous, alert, response);
} }
/** /**
* Updates the specified alert by making sure that the made changes are persisted. * Updates and persists the status of the given alert
*/ */
public void updateAlertStatus(Alert alert) throws IOException { void updateAlertStatus(Alert alert) throws IOException {
// at the moment we store the status together with the alert,
// so we just need to update the alert itself
// TODO: consider storing the status in a different documment (alert_status doc) (must smaller docs... faster for frequent updates)
updateAlert(alert); updateAlert(alert);
} }
/** /**
* Updates the specified alert by making sure that the made changes are persisted. * Updates and persists the given alert
*/ */
public void updateAlert(Alert alert) throws IOException { void updateAlert(Alert alert) throws IOException {
ensureStarted(); ensureStarted();
BytesReference source = XContentFactory.contentBuilder(XContentType.JSON).value(alert).bytes(); assert alert == alertMap.get(alert.name()) : "update alert can only be applied to an already loaded alert";
BytesReference source = JsonXContent.contentBuilder().value(alert).bytes();
IndexResponse response = client.index(createIndexRequest(alert.name(), source)).actionGet(); IndexResponse response = client.index(createIndexRequest(alert.name(), source)).actionGet();
alert.status().version(response.getVersion()); alert.status().version(response.getVersion());
// Don't need to update the alertMap, since we are working on an instance from it. // Don't need to update the alertMap, since we are working on an instance from it.
assert verifySameInstance(alert);
}
private boolean verifySameInstance(Alert alert) {
Alert found = alertMap.get(alert.name());
assert found == alert : "expected " + alert + " but got " + found;
return true;
} }
/** /**
* Deletes the alert with the specified name if exists * Deletes the alert with the specified name if exists
*/ */
public DeleteResponse deleteAlert(String name) { public AlertDelete deleteAlert(String name) {
ensureStarted(); ensureStarted();
Alert alert = alertMap.remove(name); Alert alert = alertMap.remove(name);
if (alert == null) { // even if the alert was not found in the alert map, we should still try to delete it
return new DeleteResponse(ALERT_INDEX, ALERT_TYPE, name, Versions.MATCH_ANY, false); // from the index, just to make sure we don't leave traces of it
DeleteRequest request = new DeleteRequest(ALERT_INDEX, ALERT_TYPE, name);
if (alert != null) {
request.version(alert.status().version());
} }
DeleteResponse response = client.delete(request).actionGet();
DeleteRequest deleteRequest = new DeleteRequest(ALERT_INDEX, ALERT_TYPE, name); return new AlertDelete(response);
deleteRequest.version(alert.status().version());
DeleteResponse deleteResponse = client.delete(deleteRequest).actionGet();
assert deleteResponse.isFound();
return deleteResponse;
} }
public ConcurrentMap<String, Alert> getAlerts() { public ConcurrentMap<String, Alert> getAlerts() {
return alertMap; return alertMap;
} }
public boolean start(ClusterState state) { IndexRequest createIndexRequest(String alertName, BytesReference alertSource) {
if (started.get()) {
return true;
}
IndexMetaData alertIndexMetaData = state.getMetaData().index(ALERT_INDEX);
if (alertIndexMetaData != null) {
logger.debug("Previous alerting index");
if (state.routingTable().index(ALERT_INDEX).allPrimaryShardsActive()) {
logger.debug("Previous alerting index with active primary shards");
try {
loadAlerts(alertIndexMetaData.numberOfShards());
} catch (Exception e) {
logger.warn("Failed to load previously stored alerts. Schedule to retry alert loading...", e);
alertMap.clear();
return false;
}
templateUtils.checkAndUploadIndexTemplate(state, "alerts");
started.set(true);
return true;
} else {
logger.warn("Not all primary shards of the .alerts index are started. Schedule to retry alert loading...");
return false;
}
} else {
logger.info("No previous .alert index, skip loading of alerts");
templateUtils.checkAndUploadIndexTemplate(state, "alerts");
started.set(true);
return true;
}
}
public boolean started() {
return started.get();
}
public void stop() {
if (started.compareAndSet(true, false)) {
alertMap.clear();
logger.info("Stopped alert store");
}
}
private IndexRequest createIndexRequest(String alertName, BytesReference alertSource) {
IndexRequest indexRequest = new IndexRequest(ALERT_INDEX, ALERT_TYPE, alertName); IndexRequest indexRequest = new IndexRequest(ALERT_INDEX, ALERT_TYPE, alertName);
indexRequest.listenerThreaded(false); indexRequest.listenerThreaded(false);
indexRequest.source(alertSource, false); indexRequest.source(alertSource, false);
return indexRequest; return indexRequest;
} }
private void loadAlerts(int numPrimaryShards) { /**
assert alertMap.isEmpty() : "No alerts should reside, but there are " + alertMap.size() + " alerts."; * scrolls all the alert documents in the alerts index, parses them, and loads them into
* the given map.
*/
static int loadAlerts(ClientProxy client, int scrollSize, TimeValue scrollTimeout, int numPrimaryShards, Alert.Parser parser, Map<String, Alert> alerts) {
assert alerts.isEmpty() : "no alerts should reside, but there are [" + alerts.size() + "] alerts.";
RefreshResponse refreshResponse = client.admin().indices().refresh(new RefreshRequest(ALERT_INDEX)).actionGet(); RefreshResponse refreshResponse = client.admin().indices().refresh(new RefreshRequest(ALERT_INDEX)).actionGet();
if (refreshResponse.getSuccessfulShards() < numPrimaryShards) { if (refreshResponse.getSuccessfulShards() < numPrimaryShards) {
throw new ElasticsearchException("Not all required shards have been refreshed"); throw new AlertsException("not all required shards have been refreshed");
} }
int count = 0;
SearchResponse response = client.prepareSearch(ALERT_INDEX) SearchResponse response = client.prepareSearch(ALERT_INDEX)
.setTypes(ALERT_TYPE) .setTypes(ALERT_TYPE)
.setPreference("_primary") .setPreference("_primary")
@ -204,10 +204,12 @@ public class AlertsStore extends AbstractComponent {
if (response.getHits().getTotalHits() > 0) { if (response.getHits().getTotalHits() > 0) {
response = client.prepareSearchScroll(response.getScrollId()).setScroll(scrollTimeout).get(); response = client.prepareSearchScroll(response.getScrollId()).setScroll(scrollTimeout).get();
while (response.getHits().hits().length != 0) { while (response.getHits().hits().length != 0) {
for (SearchHit sh : response.getHits()) { for (SearchHit hit : response.getHits()) {
String alertId = sh.getId(); String name = hit.getId();
Alert alert = parseLoadedAlert(alertId, sh); Alert alert = parser.parse(name, true, hit.getSourceRef());
alertMap.put(alertId, alert); alert.status().version(hit.version());
alerts.put(name, alert);
count++;
} }
response = client.prepareSearchScroll(response.getScrollId()).setScroll(scrollTimeout).get(); response = client.prepareSearchScroll(response.getScrollId()).setScroll(scrollTimeout).get();
} }
@ -215,13 +217,7 @@ public class AlertsStore extends AbstractComponent {
} finally { } finally {
client.prepareClearScroll().addScrollId(response.getScrollId()).get(); client.prepareClearScroll().addScrollId(response.getScrollId()).get();
} }
logger.info("Loaded [{}] alerts from the alert index.", alertMap.size()); return count;
}
private Alert parseLoadedAlert(String alertId, SearchHit sh) {
Alert alert = alertParser.parse(alertId, true, sh.getSourceRef());
alert.status().version(sh.version());
return alert;
} }
private void ensureStarted() { private void ensureStarted() {
@ -230,16 +226,16 @@ public class AlertsStore extends AbstractComponent {
} }
} }
public final class AlertStoreModification { public final class AlertPut {
private final Alert previous; private final Alert previous;
private final Alert current; private final Alert current;
private final IndexResponse indexResponse; private final IndexResponse response;
public AlertStoreModification(Alert previous, Alert current, IndexResponse indexResponse) { public AlertPut(Alert previous, Alert current, IndexResponse response) {
this.current = current; this.current = current;
this.previous = previous; this.previous = previous;
this.indexResponse = indexResponse; this.response = response;
} }
public Alert current() { public Alert current() {
@ -251,7 +247,20 @@ public class AlertsStore extends AbstractComponent {
} }
public IndexResponse indexResponse() { public IndexResponse indexResponse() {
return indexResponse; return response;
}
}
public final class AlertDelete {
private final DeleteResponse response;
public AlertDelete(DeleteResponse response) {
this.response = response;
}
public DeleteResponse deleteResponse() {
return response;
} }
} }

View File

@ -114,7 +114,7 @@ public class HistoryService extends AbstractComponent {
String[] indices = state.metaData().concreteIndices(IndicesOptions.lenientExpandOpen(), ALERT_HISTORY_INDEX_PREFIX + "*"); String[] indices = state.metaData().concreteIndices(IndicesOptions.lenientExpandOpen(), ALERT_HISTORY_INDEX_PREFIX + "*");
if (indices.length == 0) { if (indices.length == 0) {
logger.info("No previous .alerthistory index, skip loading of alert actions"); logger.info("No previous .alerthistory index, skip loading of alert actions");
templateUtils.checkAndUploadIndexTemplate(state, "alerthistory"); templateUtils.ensureIndexTemplateIsLoaded(state, "alerthistory");
doStart(); doStart();
return true; return true;
} }
@ -138,7 +138,7 @@ public class HistoryService extends AbstractComponent {
actionsToBeProcessed.clear(); actionsToBeProcessed.clear();
return false; return false;
} }
templateUtils.checkAndUploadIndexTemplate(state, "alerthistory"); templateUtils.ensureIndexTemplateIsLoaded(state, "alerthistory");
doStart(); doStart();
return true; return true;
} }

View File

@ -42,7 +42,7 @@ public class TemplateUtils extends AbstractComponent {
* *
* In the the template doesn't exists this method blocks until the template has been created. * In the the template doesn't exists this method blocks until the template has been created.
*/ */
public void checkAndUploadIndexTemplate(ClusterState state, final String templateName) { public void ensureIndexTemplateIsLoaded(ClusterState state, final String templateName) {
final byte[] template; final byte[] template;
try { try {
InputStream is = AlertsStore.class.getResourceAsStream("/" + templateName + ".json"); InputStream is = AlertsStore.class.getResourceAsStream("/" + templateName + ".json");

View File

@ -17,9 +17,8 @@ public class AckThrottler implements Throttler {
@Override @Override
public Result throttle(Alert alert, Trigger.Result result) { public Result throttle(Alert alert, Trigger.Result result) {
Alert.Status.Ack ack = alert.status().ack(); if (alert.status().state() != Alert.Status.State.ACKED) {
if (ack != null) { return Result.throttle("alert [" + alert.name() + "] was acked at [" + formatDate(alert.status().lastStateChanged()) + "]");
return Result.throttle("alert [" + alert.name() + "] was acked at [" + formatDate(ack.timestamp()));
} }
return Result.NO; return Result.NO;
} }