From 8c623534c239a372f35f4a43110c081ada49eee9 Mon Sep 17 00:00:00 2001 From: Brian Murphy Date: Thu, 21 Aug 2014 16:33:11 +0100 Subject: [PATCH] Alerting : Cleanup and fixes. These changes fix the alert throttling during the time period, move alert history it's own index. Original commit: elastic/x-pack-elasticsearch@51306378240c2095b556168384e602332717d6c1 --- .../org/elasticsearch/alerting/Alert.java | 52 +++---------------- .../elasticsearch/alerting/AlertManager.java | 42 +++++++++++++-- .../alerting/AlertRestHandler.java | 28 ++++++++-- .../alerting/AlertScheduler.java | 25 ++++++--- 4 files changed, 89 insertions(+), 58 deletions(-) diff --git a/src/main/java/org/elasticsearch/alerting/Alert.java b/src/main/java/org/elasticsearch/alerting/Alert.java index f4b7e0e7cb5..91010d766fc 100644 --- a/src/main/java/org/elasticsearch/alerting/Alert.java +++ b/src/main/java/org/elasticsearch/alerting/Alert.java @@ -22,51 +22,8 @@ public class Alert implements ToXContent{ private TimeValue timePeriod; private List actions; private String schedule; - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - - Alert alert = (Alert) o; - - if (enabled != alert.enabled) return false; - if (simpleQuery != alert.simpleQuery) return false; - if (version != alert.version) return false; - if (actions != null ? !actions.equals(alert.actions) : alert.actions != null) return false; - if (alertName != null ? !alertName.equals(alert.alertName) : alert.alertName != null) return false; - if (indices != null ? !indices.equals(alert.indices) : alert.indices != null) return false; - if (lastRan != null ? !lastRan.equals(alert.lastRan) : alert.lastRan != null) return false; - if (queryName != null ? !queryName.equals(alert.queryName) : alert.queryName != null) return false; - if (running != null ? !running.equals(alert.running) : alert.running != null) return false; - if (schedule != null ? !schedule.equals(alert.schedule) : alert.schedule != null) return false; - if (timePeriod != null ? !timePeriod.equals(alert.timePeriod) : alert.timePeriod != null) return false; - if (timestampString != null ? !timestampString.equals(alert.timestampString) : alert.timestampString != null) - return false; - if (trigger != null ? !trigger.equals(alert.trigger) : alert.trigger != null) return false; - - return true; - } - - @Override - public int hashCode() { - int result = alertName != null ? alertName.hashCode() : 0; - result = 31 * result + (queryName != null ? queryName.hashCode() : 0); - result = 31 * result + (trigger != null ? trigger.hashCode() : 0); - result = 31 * result + (timePeriod != null ? timePeriod.hashCode() : 0); - result = 31 * result + (actions != null ? actions.hashCode() : 0); - result = 31 * result + (schedule != null ? schedule.hashCode() : 0); - result = 31 * result + (lastRan != null ? lastRan.hashCode() : 0); - result = 31 * result + (int) (version ^ (version >>> 32)); - result = 31 * result + (running != null ? running.hashCode() : 0); - result = 31 * result + (enabled ? 1 : 0); - result = 31 * result + (simpleQuery ? 1 : 0); - result = 31 * result + (timestampString != null ? timestampString.hashCode() : 0); - result = 31 * result + (indices != null ? indices.hashCode() : 0); - return result; - } - private DateTime lastRan; + private DateTime lastActionFire; private long version; private DateTime running; private boolean enabled; @@ -81,7 +38,13 @@ public class Alert implements ToXContent{ this.timestampString = timestampString; } + public DateTime lastActionFire() { + return lastActionFire; + } + public void lastActionFire(DateTime lastActionFire) { + this.lastActionFire = lastActionFire; + } public boolean simpleQuery() { return simpleQuery; @@ -206,6 +169,7 @@ public class Alert implements ToXContent{ builder.field(AlertManager.CURRENTLY_RUNNING.getPreferredName(), running); builder.field(AlertManager.ENABLED.getPreferredName(), enabled); builder.field(AlertManager.SIMPLE_QUERY.getPreferredName(), simpleQuery); + builder.field(AlertManager.LAST_ACTION_FIRE.getPreferredName(), lastActionFire); builder.field(AlertManager.TRIGGER_FIELD.getPreferredName()); trigger.toXContent(builder, params); diff --git a/src/main/java/org/elasticsearch/alerting/AlertManager.java b/src/main/java/org/elasticsearch/alerting/AlertManager.java index 029e7b87809..ba3e787021e 100644 --- a/src/main/java/org/elasticsearch/alerting/AlertManager.java +++ b/src/main/java/org/elasticsearch/alerting/AlertManager.java @@ -43,6 +43,7 @@ public class AlertManager extends AbstractLifecycleComponent { public final String ALERT_INDEX = ".alerts"; public final String ALERT_TYPE = "alert"; + public final String ALERT_HISTORY_INDEX = "alerthistory"; public final String ALERT_HISTORY_TYPE = "alertHistory"; public static final ParseField QUERY_FIELD = new ParseField("query"); @@ -56,6 +57,7 @@ public class AlertManager extends AbstractLifecycleComponent { public static final ParseField ENABLED = new ParseField("enabled"); public static final ParseField SIMPLE_QUERY = new ParseField("simple"); public static final ParseField TIMESTAMP_FIELD = new ParseField("timefield"); + public static final ParseField LAST_ACTION_FIRE = new ParseField("lastactionfire"); private final Client client; private AlertScheduler scheduler; @@ -76,7 +78,7 @@ public class AlertManager extends AbstractLifecycleComponent { while (attempts < 2) { try { logger.warn("Sleeping [{}]", attempts); - Thread.sleep(10000); + Thread.sleep(20000); logger.warn("Slept"); break; } catch (InterruptedException ie) { @@ -153,6 +155,17 @@ public class AlertManager extends AbstractLifecycleComponent { return actionGet.getStatus(); } + public DateTime timeActionLastTriggered(String alertName) { + Alert indexedAlert; + indexedAlert = getAlertFromIndex(alertName); + if (indexedAlert == null) { + return null; + } else { + return indexedAlert.lastActionFire(); + } + } + + public boolean claimAlertRun(String alertName, DateTime scheduleRunTime) { Alert indexedAlert; try { @@ -259,7 +272,6 @@ public class AlertManager extends AbstractLifecycleComponent { } catch (ElasticsearchException e) { logger.error("Unable to parse [{}] as an alert this alert will be skipped.",e,sh); } - } logger.warn("Loaded [{}] alerts from the alert index.", alertMap.size()); } @@ -269,17 +281,23 @@ public class AlertManager extends AbstractLifecycleComponent { return 0; } - public boolean updateLastRan(String alertName, DateTime fireTime) throws Exception { + public boolean updateLastRan(String alertName, DateTime fireTime, DateTime scheduledTime, boolean firedAction) throws Exception { try { synchronized (alertMap) { Alert alert = getAlertForName(alertName); alert.lastRan(fireTime); XContentBuilder alertBuilder = XContentFactory.jsonBuilder().prettyPrint(); + if (firedAction) { + logger.error("Fired action [{}]",firedAction); + alert.lastActionFire(scheduledTime); + } alert.toXContent(alertBuilder, ToXContent.EMPTY_PARAMS); + logger.error(XContentHelper.convertToJson(alertBuilder.bytes(),false,true)); UpdateRequest updateRequest = new UpdateRequest(); updateRequest.id(alertName); updateRequest.index(ALERT_INDEX); updateRequest.type(ALERT_TYPE); + updateRequest.doc(alertBuilder); updateRequest.refresh(true); client.update(updateRequest).actionGet(); @@ -294,6 +312,7 @@ public class AlertManager extends AbstractLifecycleComponent { public boolean addHistory(String alertName, boolean triggered, DateTime fireTime, SearchRequestBuilder triggeringQuery, AlertTrigger trigger, long numberOfResults, + List actions, @Nullable List indices) throws Exception { XContentBuilder historyEntry = XContentFactory.jsonBuilder(); historyEntry.startObject(); @@ -304,6 +323,12 @@ public class AlertManager extends AbstractLifecycleComponent { trigger.toXContent(historyEntry, ToXContent.EMPTY_PARAMS); historyEntry.field("queryRan", triggeringQuery.toString()); historyEntry.field("numberOfResults", numberOfResults); + historyEntry.field("actions"); + historyEntry.startArray(); + for (AlertAction action : actions) { + action.toXContent(historyEntry, ToXContent.EMPTY_PARAMS); + } + historyEntry.endArray(); if (indices != null) { historyEntry.field("indices"); historyEntry.startArray(); @@ -314,14 +339,15 @@ public class AlertManager extends AbstractLifecycleComponent { } historyEntry.endObject(); IndexRequest indexRequest = new IndexRequest(); - indexRequest.index(ALERT_INDEX); + indexRequest.index(ALERT_HISTORY_INDEX); indexRequest.type(ALERT_HISTORY_TYPE); indexRequest.source(historyEntry); indexRequest.listenerThreaded(false); indexRequest.operationThreaded(false); indexRequest.refresh(true); //Always refresh after indexing an alert indexRequest.opType(IndexRequest.OpType.CREATE); - return client.index(indexRequest).actionGet().isCreated(); + client.index(indexRequest).actionGet().isCreated(); + return true; } public boolean deleteAlert(String alertName) throws InterruptedException, ExecutionException{ @@ -473,6 +499,11 @@ public class AlertManager extends AbstractLifecycleComponent { running = new DateTime(fields.get(CURRENTLY_RUNNING.getPreferredName()).toString()); } + DateTime lastActionFire = new DateTime(0); + if (fields.get(LAST_ACTION_FIRE.getPreferredName()) != null) { + lastActionFire = new DateTime(fields.get(LAST_ACTION_FIRE.getPreferredName()).toString()); + } + List indices = new ArrayList<>(); if (fields.get(INDICES.getPreferredName()) != null && fields.get(INDICES.getPreferredName()) instanceof List){ indices = (List)fields.get(INDICES.getPreferredName()); @@ -495,6 +526,7 @@ public class AlertManager extends AbstractLifecycleComponent { } Alert alert = new Alert(alertId, query, trigger, timePeriod, actions, schedule, lastRan, indices, running, version, enabled, simpleQuery); + alert.lastActionFire(lastActionFire); if (fields.get(TIMESTAMP_FIELD.getPreferredName()) != null) { alert.timestampString(fields.get(TIMESTAMP_FIELD.getPreferredName()).toString()); diff --git a/src/main/java/org/elasticsearch/alerting/AlertRestHandler.java b/src/main/java/org/elasticsearch/alerting/AlertRestHandler.java index 92804d3e1e1..96247084fe4 100644 --- a/src/main/java/org/elasticsearch/alerting/AlertRestHandler.java +++ b/src/main/java/org/elasticsearch/alerting/AlertRestHandler.java @@ -26,6 +26,7 @@ public class AlertRestHandler implements RestHandler { @Inject public AlertRestHandler(RestController restController, AlertManager alertManager) { restController.registerHandler(POST, "/_alerting/_refresh",this); + restController.registerHandler(GET, "/_alerting/_refresh",this); restController.registerHandler(GET, "/_alerting/_list",this); restController.registerHandler(POST, "/_alerting/_create/{name}", this); restController.registerHandler(DELETE, "/_alerting/_delete/{name}", this); @@ -56,7 +57,7 @@ public class AlertRestHandler implements RestHandler { private boolean dispatchRequest(RestRequest request, RestChannel restChannel) throws IOException, InterruptedException, ExecutionException { //@TODO : change these direct calls to actions/request/response/listener once we create the java client API - if (request.method() == POST && request.path().contains("/_refresh")) { + if (request.path().contains("/_refresh")) { alertManager.refreshAlerts(); XContentBuilder builder = getListOfAlerts(); restChannel.sendResponse(new BytesRestResponse(OK,builder)); @@ -66,9 +67,19 @@ public class AlertRestHandler implements RestHandler { restChannel.sendResponse(new BytesRestResponse(OK,builder)); return true; } else if (request.path().contains("/_enable")) { - return alertManager.enableAlert(request.param("name")); + logger.warn("Enabling [{}]", request.param("name")); + String alertName = request.param("name"); + boolean enabled = alertManager.enableAlert(alertName); + XContentBuilder responseBuilder = buildEnabledResponse(alertName, enabled); + restChannel.sendResponse(new BytesRestResponse(OK,responseBuilder)); + return true; } else if (request.path().contains("/_disable")) { - return alertManager.disableAlert(request.param("name")); + logger.warn("Disabling [{}]", request.param("name")); + String alertName = request.param("name"); + boolean enabled = alertManager.disableAlert(alertName); + XContentBuilder responseBuilder = buildEnabledResponse(alertName, enabled); + restChannel.sendResponse(new BytesRestResponse(OK,responseBuilder)); + return true; } else if (request.method() == POST && request.path().contains("/_create")) { //TODO : this should all be moved to an action Alert alert; @@ -108,6 +119,17 @@ public class AlertRestHandler implements RestHandler { return false; } + private XContentBuilder buildEnabledResponse(String alertName, boolean enabled) throws IOException { + XContentBuilder responseBuilder = XContentFactory.jsonBuilder().prettyPrint(); + responseBuilder.startObject(); + responseBuilder.field(alertName); + responseBuilder.startObject(); + responseBuilder.field("enabled",enabled); + responseBuilder.endObject(); + responseBuilder.endObject(); + return responseBuilder; + } + private XContentBuilder getListOfAlerts() throws IOException { Map alertMap = alertManager.getSafeAlertMap(); XContentBuilder builder = XContentFactory.jsonBuilder().prettyPrint(); diff --git a/src/main/java/org/elasticsearch/alerting/AlertScheduler.java b/src/main/java/org/elasticsearch/alerting/AlertScheduler.java index 043b5baee91..9f15865fa99 100644 --- a/src/main/java/org/elasticsearch/alerting/AlertScheduler.java +++ b/src/main/java/org/elasticsearch/alerting/AlertScheduler.java @@ -14,6 +14,7 @@ import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.joda.time.DateTime; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.*; import org.elasticsearch.index.query.*; import org.elasticsearch.script.ExecutableScript; @@ -97,7 +98,7 @@ public class AlertScheduler extends AbstractLifecycleComponent { } //if (logger.isDebugEnabled()) { - logger.warn("Running query [{}]", XContentHelper.convertToJson(srb.request().source(),false,true)); + logger.warn("Running query [{}]", XContentHelper.convertToJson(srb.request().source(), false, true)); //} SearchResponse sr = srb.execute().get(); @@ -106,17 +107,29 @@ public class AlertScheduler extends AbstractLifecycleComponent { triggerManager.isTriggered(alertName,sr), srb, indices, new DateTime(jobExecutionContext.getScheduledFireTime())); + boolean firedAction = false; if (result.isTriggered) { logger.warn("We have triggered"); - actionManager.doAction(alertName,result); - logger.warn("Did action !"); - }else{ + DateTime lastActionFire = alertManager.timeActionLastTriggered(alertName); + long msSinceLastAction = scheduledTime.getMillis() - lastActionFire.getMillis(); + logger.error("last action fire [{}]", lastActionFire); + logger.error("msSinceLastAction [{}]", msSinceLastAction); + + if (alert.timePeriod().getMillis() > msSinceLastAction) { + logger.warn("Not firing action because it was fired in the timePeriod"); + } else { + actionManager.doAction(alertName, result); + logger.warn("Did action !"); + firedAction = true; + } + + } else { logger.warn("We didn't trigger"); } - alertManager.updateLastRan(alertName, new DateTime(jobExecutionContext.getFireTime())); + alertManager.updateLastRan(alertName, new DateTime(jobExecutionContext.getFireTime()),scheduledTime,firedAction); if (!alertManager.addHistory(alertName, result.isTriggered, new DateTime(jobExecutionContext.getScheduledFireTime()), result.query, - result.trigger, result.searchResponse.getHits().getTotalHits(), alert.indices())) + result.trigger, result.searchResponse.getHits().getTotalHits(), alert.actions(), alert.indices())) { logger.warn("Failed to store history for alert [{}]", alertName); }