From 26e053eaedb2f4a59d6d745b01c37d4d950fca05 Mon Sep 17 00:00:00 2001 From: Brian Murphy Date: Mon, 18 Aug 2014 10:27:07 +0100 Subject: [PATCH] Alerting : Claim alert runs This commit add functionality so that multiple nodes running alert plugins won't stand on each other trying to run the same alerts at the same time. Original commit: elastic/x-pack-elasticsearch@9c350c011463277081408d3a0752eaa71a6c41a3 --- .../org/elasticsearch/alerting/Alert.java | 26 ++++- .../alerting/AlertActionManager.java | 5 +- .../elasticsearch/alerting/AlertManager.java | 98 ++++++++++++++++--- .../alerting/AlertScheduler.java | 9 +- .../alerting/IndexAlertAction.java | 8 +- .../alerting/IndexAlertActionFactory.java | 20 +++- 6 files changed, 139 insertions(+), 27 deletions(-) diff --git a/src/main/java/org/elasticsearch/alerting/Alert.java b/src/main/java/org/elasticsearch/alerting/Alert.java index 0a1869ff3d2..dbbee260bf0 100644 --- a/src/main/java/org/elasticsearch/alerting/Alert.java +++ b/src/main/java/org/elasticsearch/alerting/Alert.java @@ -25,6 +25,24 @@ public class Alert { private List actions; private String schedule; private DateTime lastRan; + private long version; + private DateTime running; + + public DateTime running() { + return running; + } + + public void running(DateTime running) { + this.running = running; + } + + public long version() { + return version; + } + + public void version(long version) { + this.version = version; + } public List indices() { return indices; @@ -36,7 +54,6 @@ public class Alert { private List indices; - public String alertName() { return alertName; } @@ -91,7 +108,7 @@ public class Alert { public Alert(String alertName, String queryName, AlertTrigger trigger, TimeValue timePeriod, List actions, String schedule, DateTime lastRan, - List indices){ + List indices, DateTime running, long version){ this.alertName = alertName; this.queryName = queryName; this.trigger = trigger; @@ -100,14 +117,19 @@ public class Alert { this.lastRan = lastRan; this.schedule = schedule; this.indices = indices; + this.version = version; + this.running = running; } public XContentBuilder toXContent(XContentBuilder builder) throws IOException { + + //Note we deliberately don't serialize the version here builder.startObject(); builder.field(AlertManager.QUERY_FIELD.getPreferredName(), queryName); builder.field(AlertManager.SCHEDULE_FIELD.getPreferredName(), schedule); builder.field(AlertManager.TIMEPERIOD_FIELD.getPreferredName(), timePeriod); builder.field(AlertManager.LASTRAN_FIELD.getPreferredName(), lastRan); + builder.field(AlertManager.CURRENTLY_RUNNING.getPreferredName(), running); builder.field(AlertManager.TRIGGER_FIELD.getPreferredName()); trigger.toXContent(builder); builder.field(AlertManager.ACTION_FIELD.getPreferredName()); diff --git a/src/main/java/org/elasticsearch/alerting/AlertActionManager.java b/src/main/java/org/elasticsearch/alerting/AlertActionManager.java index b523efdf5d8..4ad90b1ae6c 100644 --- a/src/main/java/org/elasticsearch/alerting/AlertActionManager.java +++ b/src/main/java/org/elasticsearch/alerting/AlertActionManager.java @@ -5,6 +5,7 @@ */ package org.elasticsearch.alerting; +import org.elasticsearch.client.Client; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; @@ -20,12 +21,12 @@ public class AlertActionManager extends AbstractComponent { private final Map actionImplemented; @Inject - public AlertActionManager(Settings settings, AlertManager alertManager) { + public AlertActionManager(Settings settings, AlertManager alertManager, Client client) { super(settings); this.alertManager = alertManager; this.actionImplemented = new HashMap<>(); registerAction("email", new EmailAlertActionFactory()); - registerAction("index", new IndexAlertActionFactory()); + registerAction("index", new IndexAlertActionFactory(client)); } public void registerAction(String name, AlertActionFactory actionFactory){ diff --git a/src/main/java/org/elasticsearch/alerting/AlertManager.java b/src/main/java/org/elasticsearch/alerting/AlertManager.java index 97457923030..c19c93e1435 100644 --- a/src/main/java/org/elasticsearch/alerting/AlertManager.java +++ b/src/main/java/org/elasticsearch/alerting/AlertManager.java @@ -14,6 +14,8 @@ import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus; import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; import org.elasticsearch.action.delete.DeleteRequest; +import org.elasticsearch.action.get.GetRequest; +import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.update.UpdateRequest; @@ -57,6 +59,7 @@ public class AlertManager extends AbstractLifecycleComponent { public static final ParseField ACTION_FIELD = new ParseField("action"); public static final ParseField LASTRAN_FIELD = new ParseField("lastRan"); public static final ParseField INDICES = new ParseField("indices"); + public static final ParseField CURRENTLY_RUNNING = new ParseField("running"); private final Client client; private AlertScheduler scheduler; @@ -146,14 +149,68 @@ public class AlertManager extends AbstractLifecycleComponent { this.scheduler = scheduler; } - private ClusterHealthStatus createAlertsIndex() throws InterruptedException, ExecutionException { - CreateIndexResponse cir = client.admin().indices().prepareCreate(ALERT_INDEX).addMapping(ALERT_TYPE).execute().get(); //TODO FIX MAPPINGS + private ClusterHealthStatus createAlertsIndex() { + CreateIndexResponse cir = client.admin().indices().prepareCreate(ALERT_INDEX).addMapping(ALERT_TYPE).execute().actionGet(); //TODO FIX MAPPINGS logger.warn(cir.toString()); ClusterHealthResponse actionGet = client.admin().cluster() .health(Requests.clusterHealthRequest(ALERT_INDEX).waitForGreenStatus().waitForEvents(Priority.LANGUID).waitForRelocatingShards(0)).actionGet(); return actionGet.getStatus(); } + public boolean claimAlertRun(String alertName, DateTime scheduleRunTime) { + Alert alert; + try { + alert = getAlertFromIndex(alertName); + if (alert.running().equals(scheduleRunTime) || alert.running().isAfter(scheduleRunTime)) { + //Someone else is already running this alert or this alert time has passed + return false; + } + } catch (Throwable t) { + throw new ElasticsearchException("Unable to load alert from index",t); + } + alert.running(scheduleRunTime); + UpdateRequest updateRequest = new UpdateRequest(); + updateRequest.index(ALERT_INDEX); + updateRequest.type(ALERT_TYPE); + updateRequest.id(alertName); + updateRequest.version(alert.version());//Since we loaded this alert directly from the index the version should be correct + XContentBuilder alertBuilder; + try { + alertBuilder = XContentFactory.jsonBuilder(); + alert.toXContent(alertBuilder); + } catch (IOException ie) { + throw new ElasticsearchException("Unable to serialize alert ["+ alertName + "]", ie); + } + updateRequest.doc(alertBuilder); + updateRequest.retryOnConflict(0); + + try { + client.update(updateRequest).actionGet(); + } catch (ElasticsearchException ee) { + logger.error("Failed to update in claim", ee); + return false; + } + synchronized (alertMap) { //Update the alert map + if (alertMap.containsKey(alertName)) { + alertMap.get(alertName).running(scheduleRunTime); + } + } + return true; + + } + + private Alert getAlertFromIndex(String alertName) { + GetRequest getRequest = Requests.getRequest(ALERT_INDEX); + getRequest.type(ALERT_TYPE); + getRequest.id(alertName); + GetResponse getResponse = client.get(getRequest).actionGet(); + if (getResponse.isExists()) { + return parseAlert(alertName, getResponse.getSourceAsMap(), getResponse.getVersion()); + } else { + throw new ElasticsearchException("Unable to find [" + alertName + "] in the [" +ALERT_INDEX + "]" ); + } + } + public void refreshAlerts() { try { synchronized (alertMap) { @@ -167,8 +224,8 @@ public class AlertManager extends AbstractLifecycleComponent { } } - private void loadAlerts() throws InterruptedException, ExecutionException{ - if (!client.admin().indices().prepareExists(ALERT_INDEX).execute().get().isExists()) { + private void loadAlerts() { + if (!client.admin().indices().prepareExists(ALERT_INDEX).execute().actionGet().isExists()) { createAlertsIndex(); } @@ -178,11 +235,16 @@ public class AlertManager extends AbstractLifecycleComponent { "{ \"match_all\" : {}}," + "\"size\" : \"100\"" + "}" - ).setTypes(ALERT_TYPE).setIndices(ALERT_INDEX).execute().get(); + ).setTypes(ALERT_TYPE).setIndices(ALERT_INDEX).execute().actionGet(); for (SearchHit sh : searchResponse.getHits()) { String alertId = sh.getId(); - Alert alert = parseAlert(alertId, sh); - alertMap.put(alertId, alert); + try { + Alert alert = parseAlert(alertId, sh); + alertMap.put(alertId, alert); + } catch (ElasticsearchException e) { + logger.error("Unable to parse [{}] as an alert this alert will be skipped.",e,sh); + } + } logger.warn("Loaded [{}] alerts from the alert index.", alertMap.size()); } @@ -244,7 +306,7 @@ public class AlertManager extends AbstractLifecycleComponent { indexRequest.operationThreaded(false); indexRequest.refresh(true); //Always refresh after indexing an alert indexRequest.opType(IndexRequest.OpType.CREATE); - return client.index(indexRequest).get().isCreated(); + return client.index(indexRequest).actionGet().isCreated(); } public boolean deleteAlert(String alertName) throws InterruptedException, ExecutionException{ @@ -279,7 +341,7 @@ public class AlertManager extends AbstractLifecycleComponent { return false; } - public boolean addAlert(String alertName, Alert alert, boolean persist) throws InterruptedException, ExecutionException{ + public boolean addAlert(String alertName, Alert alert, boolean persist) { synchronized (alertMap) { if (alertMap.containsKey(alertName)) { throw new ElasticsearchIllegalArgumentException("There is already an alert named ["+alertName+"]"); @@ -297,7 +359,7 @@ public class AlertManager extends AbstractLifecycleComponent { indexRequest.refresh(true); //Always refresh after indexing an alert indexRequest.source(builder); indexRequest.opType(IndexRequest.OpType.CREATE); - return client.index(indexRequest).get().isCreated(); + return client.index(indexRequest).actionGet().isCreated(); } catch (IOException ie) { throw new ElasticsearchIllegalStateException("Unable to convert alert to JSON", ie); } @@ -311,10 +373,17 @@ public class AlertManager extends AbstractLifecycleComponent { private Alert parseAlert(String alertId, SearchHit sh) { Map fields = sh.sourceAsMap(); - return parseAlert(alertId,fields); + return parseAlert(alertId, fields, sh.getVersion()); } + + public Alert parseAlert(String alertId, Map fields) { + return parseAlert(alertId, fields, -1); + } + + public Alert parseAlert(String alertId, Map fields, long version ) { + //Map fields = sh.getFields(); logger.warn("Parsing : [{}]", alertId); for (String field : fields.keySet() ) { @@ -351,6 +420,11 @@ public class AlertManager extends AbstractLifecycleComponent { lastRan = new DateTime(fields.get("lastRan").toString()); } + DateTime running = new DateTime(0); + if (fields.get(CURRENTLY_RUNNING.getPreferredName()) != null) { + running = new DateTime(fields.get(CURRENTLY_RUNNING.getPreferredName()).toString()); + } + List indices = new ArrayList<>(); if (fields.get(INDICES.getPreferredName()) != null && fields.get(INDICES.getPreferredName()) instanceof List){ indices = (List)fields.get(INDICES.getPreferredName()); @@ -358,7 +432,7 @@ public class AlertManager extends AbstractLifecycleComponent { logger.warn("Indices : " + fields.get(INDICES.getPreferredName()) + " class " + fields.get(INDICES.getPreferredName()).getClass() ); } - return new Alert(alertId, query, trigger, timePeriod, actions, schedule, lastRan, indices); + return new Alert(alertId, query, trigger, timePeriod, actions, schedule, lastRan, indices, running, version); } public Map getSafeAlertMap() { diff --git a/src/main/java/org/elasticsearch/alerting/AlertScheduler.java b/src/main/java/org/elasticsearch/alerting/AlertScheduler.java index dd1b6340f42..f077681b512 100644 --- a/src/main/java/org/elasticsearch/alerting/AlertScheduler.java +++ b/src/main/java/org/elasticsearch/alerting/AlertScheduler.java @@ -66,9 +66,12 @@ public class AlertScheduler extends AbstractLifecycleComponent { public void executeAlert(String alertName, JobExecutionContext jobExecutionContext){ logger.warn("Running [{}]",alertName); Alert alert = alertManager.getAlertForName(alertName); - - //@TODO : claim alert + DateTime scheduledTime = new DateTime(jobExecutionContext.getScheduledFireTime()); try { + if (!alertManager.claimAlertRun(alertName, scheduledTime) ){ + logger.warn("Another process has already run this alert."); + return; + } XContentBuilder builder = createClampedQuery(jobExecutionContext, alert); logger.warn("Running the following query : [{}]", builder.string()); @@ -99,7 +102,7 @@ public class AlertScheduler extends AbstractLifecycleComponent { logger.warn("Failed to store history for alert [{}]", alertName); } } catch (Exception e) { - logger.error("Failed execute alert [{}]",e, alert.queryName()); + logger.error("Failed execute alert [{}]", e, alertName); } } diff --git a/src/main/java/org/elasticsearch/alerting/IndexAlertAction.java b/src/main/java/org/elasticsearch/alerting/IndexAlertAction.java index 5c021a907bf..1780083c3d2 100644 --- a/src/main/java/org/elasticsearch/alerting/IndexAlertAction.java +++ b/src/main/java/org/elasticsearch/alerting/IndexAlertAction.java @@ -19,16 +19,14 @@ public class IndexAlertAction implements AlertAction { private final String type; private Client client = null; - public IndexAlertAction(String index, String type){ + @Inject + public IndexAlertAction(String index, String type, Client client){ this.index = index; this.type = type; - } - - @Inject - public void setClient(Client client){ this.client = client; } + @Override public String getActionName() { return "index"; diff --git a/src/main/java/org/elasticsearch/alerting/IndexAlertActionFactory.java b/src/main/java/org/elasticsearch/alerting/IndexAlertActionFactory.java index a4613d625bc..9a6ca20c89b 100644 --- a/src/main/java/org/elasticsearch/alerting/IndexAlertActionFactory.java +++ b/src/main/java/org/elasticsearch/alerting/IndexAlertActionFactory.java @@ -6,16 +6,22 @@ package org.elasticsearch.alerting; import org.elasticsearch.ElasticsearchIllegalArgumentException; +import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.client.Client; +import java.util.Locale; import java.util.Map; /** * Created by brian on 8/17/14. */ public class IndexAlertActionFactory implements AlertActionFactory { + Client client; + public IndexAlertActionFactory(Client client){ + this.client = client; + } @Override public AlertAction createAction(Object parameters) { @@ -23,13 +29,21 @@ public class IndexAlertActionFactory implements AlertActionFactory { if (parameters instanceof Map) { Map paramMap = (Map) parameters; String index = paramMap.get("index").toString(); + if (!index.toLowerCase(Locale.ROOT).equals(index)) { + throw new ElasticsearchIllegalArgumentException("Index names must be all lowercase"); + } + String type = paramMap.get("type").toString(); - return new IndexAlertAction(index, type); + if (!type.toLowerCase(Locale.ROOT).equals(type)) { + throw new ElasticsearchIllegalArgumentException("Type names must be all lowercase"); + } + + return new IndexAlertAction(index, type, client); } else { - throw new ElasticsearchIllegalArgumentException("Unable to parse [" + parameters + "] as an EmailAlertAction"); + throw new ElasticsearchIllegalArgumentException("Unable to parse [" + parameters + "] as an IndexAlertAction"); } } catch (Throwable t){ - throw new ElasticsearchIllegalArgumentException("Unable to parse [" + parameters + "] as an EmailAlertAction"); + throw new ElasticsearchIllegalArgumentException("Unable to parse [" + parameters + "] as an IndexAlertAction", t); } } }