diff --git a/src/main/java/org/elasticsearch/alerting/Alert.java b/src/main/java/org/elasticsearch/alerting/Alert.java index 91ee4c51649..0a1869ff3d2 100644 --- a/src/main/java/org/elasticsearch/alerting/Alert.java +++ b/src/main/java/org/elasticsearch/alerting/Alert.java @@ -111,11 +111,23 @@ public class Alert { builder.field(AlertManager.TRIGGER_FIELD.getPreferredName()); trigger.toXContent(builder); builder.field(AlertManager.ACTION_FIELD.getPreferredName()); + builder.startObject(); for (AlertAction action : actions){ builder.field(action.getActionName()); action.toXContent(builder); } + builder.endObject(); + + if (indices != null && !indices.isEmpty()) { + builder.field(AlertManager.INDICES.getPreferredName()); + builder.startArray(); + for (String index : indices){ + builder.value(index); + } + builder.endArray(); + } + builder.endObject(); return builder; } diff --git a/src/main/java/org/elasticsearch/alerting/AlertManager.java b/src/main/java/org/elasticsearch/alerting/AlertManager.java index ddf06611791..97457923030 100644 --- a/src/main/java/org/elasticsearch/alerting/AlertManager.java +++ b/src/main/java/org/elasticsearch/alerting/AlertManager.java @@ -16,8 +16,11 @@ import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.action.update.UpdateRequest; +import org.elasticsearch.action.update.UpdateResponse; import org.elasticsearch.client.Client; import org.elasticsearch.client.Requests; +import org.elasticsearch.common.Nullable; import org.elasticsearch.common.ParseField; import org.elasticsearch.common.Priority; import org.elasticsearch.common.component.AbstractLifecycleComponent; @@ -27,10 +30,13 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; +import org.elasticsearch.common.xcontent.XContentHelper; +import org.elasticsearch.index.engine.Engine; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchHitField; import java.io.IOException; +import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -42,7 +48,7 @@ public class AlertManager extends AbstractLifecycleComponent { public final String ALERT_INDEX = ".alerts"; public final String ALERT_TYPE = "alert"; - public final String QUERY_TYPE = "alertQuery"; + public final String ALERT_HISTORY_TYPE = "alertHistory"; public static final ParseField QUERY_FIELD = new ParseField("query"); public static final ParseField SCHEDULE_FIELD = new ParseField("schedule"); @@ -182,6 +188,65 @@ public class AlertManager extends AbstractLifecycleComponent { } } + public long getLastEventCount(String alertName){ + return 0; + } + + public boolean updateLastRan(String alertName, DateTime fireTime) throws Exception { + try { + synchronized (alertMap) { + Alert alert = getAlertForName(alertName); + alert.lastRan(fireTime); + XContentBuilder alertBuilder = XContentFactory.jsonBuilder().prettyPrint(); + alert.toXContent(alertBuilder); + UpdateRequest updateRequest = new UpdateRequest(); + updateRequest.id(alertName); + updateRequest.index(ALERT_INDEX); + updateRequest.type(ALERT_TYPE); + updateRequest.doc(alertBuilder); + updateRequest.refresh(true); + client.update(updateRequest).actionGet(); + return true; + } + } catch (Throwable t) { + logger.error("Failed to update alert [{}] with lastRan of [{}]",t, alertName, fireTime); + return false; + } + } + + public boolean addHistory(String alertName, boolean triggered, + DateTime fireTime, XContentBuilder triggeringQuery, + AlertTrigger trigger, long numberOfResults, + @Nullable List indices) throws Exception { + XContentBuilder historyEntry = XContentFactory.jsonBuilder(); + historyEntry.startObject(); + historyEntry.field("alertName", alertName); + historyEntry.field("triggered", triggered); + historyEntry.field("fireTime", fireTime.toDateTimeISO()); + historyEntry.field("trigger"); + trigger.toXContent(historyEntry); + historyEntry.field("queryRan", XContentHelper.convertToJson(triggeringQuery.bytes(),false,true)); + historyEntry.field("numberOfResults", numberOfResults); + if (indices != null) { + historyEntry.field("indices"); + historyEntry.startArray(); + for (String index : indices) { + historyEntry.value(index); + } + historyEntry.endArray(); + } + historyEntry.endObject(); + IndexRequest indexRequest = new IndexRequest(); + indexRequest.index(ALERT_INDEX); + indexRequest.type(ALERT_HISTORY_TYPE); + indexRequest.source(historyEntry); + indexRequest.listenerThreaded(false); + indexRequest.operationThreaded(false); + indexRequest.refresh(true); //Always refresh after indexing an alert + indexRequest.opType(IndexRequest.OpType.CREATE); + return client.index(indexRequest).get().isCreated(); + } + public boolean deleteAlert(String alertName) throws InterruptedException, ExecutionException{ synchronized (alertMap) { if (alertMap.containsKey(alertName)) { @@ -225,11 +290,12 @@ public class AlertManager extends AbstractLifecycleComponent { XContentBuilder builder; try { builder = XContentFactory.jsonBuilder(); + alert.toXContent(builder); IndexRequest indexRequest = new IndexRequest(ALERT_INDEX, ALERT_TYPE, alertName); indexRequest.listenerThreaded(false); indexRequest.operationThreaded(false); indexRequest.refresh(true); //Always refresh after indexing an alert - indexRequest.source(alert.toXContent(builder).bytes(), true); + indexRequest.source(builder); indexRequest.opType(IndexRequest.OpType.CREATE); return client.index(indexRequest).get().isCreated(); } catch (IOException ie) { @@ -278,11 +344,18 @@ public class AlertManager extends AbstractLifecycleComponent { throw new ElasticsearchException("Unable to parse actions [" + triggerObj + "]"); } - DateTime lastRan = new DateTime(fields.get("lastRan").toString()); + DateTime lastRan = new DateTime(0); + if( fields.get(LASTRAN_FIELD.getPreferredName()) != null){ + lastRan = new DateTime(fields.get(LASTRAN_FIELD.getPreferredName()).toString()); + } else if (fields.get("lastRan") != null) { + lastRan = new DateTime(fields.get("lastRan").toString()); + } - List indices = null; + List indices = new ArrayList<>(); if (fields.get(INDICES.getPreferredName()) != null && fields.get(INDICES.getPreferredName()) instanceof List){ indices = (List)fields.get(INDICES.getPreferredName()); + } else { + logger.warn("Indices : " + fields.get(INDICES.getPreferredName()) + " class " + fields.get(INDICES.getPreferredName()).getClass() ); } return new Alert(alertId, query, trigger, timePeriod, actions, schedule, lastRan, indices); diff --git a/src/main/java/org/elasticsearch/alerting/AlertRestHandler.java b/src/main/java/org/elasticsearch/alerting/AlertRestHandler.java index 2605dfc18f8..b53c8d07c59 100644 --- a/src/main/java/org/elasticsearch/alerting/AlertRestHandler.java +++ b/src/main/java/org/elasticsearch/alerting/AlertRestHandler.java @@ -5,18 +5,19 @@ */ package org.elasticsearch.alerting; -import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.ElasticsearchIllegalArgumentException; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.logging.Loggers; +import org.elasticsearch.common.xcontent.XContent; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentHelper; -import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.rest.*; -import org.omg.CORBA.NO_IMPLEMENT; +import java.io.IOException; import java.util.Map; +import java.util.concurrent.ExecutionException; import static org.elasticsearch.rest.RestStatus.*; import static org.elasticsearch.rest.RestRequest.Method.*; @@ -37,40 +38,76 @@ public class AlertRestHandler implements RestHandler { @Override public void handleRequest(RestRequest request, RestChannel restChannel) throws Exception { logger.warn("GOT REST REQUEST"); - //@TODO : change these direct calls to actions/request/response/listener once we create the java client API - if (request.method() == POST && request.path().contains("/_refresh")) { - alertManager.refreshAlerts(); - restChannel.sendResponse(new BytesRestResponse(OK)); - return; - } else if (request.method() == GET && request.path().contains("/_list")) { - Map alertMap = alertManager.getSafeAlertMap(); + try { + if (dispatchRequest(request, restChannel)) { + return; + } + } catch (Throwable t){ XContentBuilder builder = XContentFactory.jsonBuilder().prettyPrint(); builder.startObject(); - for (Map.Entry alertEntry : alertMap.entrySet()) { - builder.field(alertEntry.getKey()); - alertEntry.getValue().toXContent(builder); - } + builder.field("error", t.getMessage()); + builder.field("stack", t.getStackTrace()); builder.endObject(); - restChannel.sendResponse(new BytesRestResponse(OK,builder)); - return; - } else if (request.method() == POST && request.path().contains("/_create")) { - //TODO : this should all be moved to an action - Alert alert = alertManager.parseAlert(request.param("name"), XContentHelper.convertToMap(request.content(), request.contentUnsafe()).v2()); - boolean added = alertManager.addAlert(alert.alertName(), alert, true); - XContentBuilder builder = XContentFactory.jsonBuilder().prettyPrint(); - alert.toXContent(builder); - restChannel.sendResponse(new BytesRestResponse(OK,builder)); - return; - } else if (request.method() == DELETE) { - String alertName = request.param("name"); - alertManager.deleteAlert(alertName); - restChannel.sendResponse(new BytesRestResponse(OK)); - return; } - restChannel.sendResponse(new BytesRestResponse(NOT_IMPLEMENTED)); } + private boolean dispatchRequest(RestRequest request, RestChannel restChannel) throws IOException, InterruptedException, ExecutionException { + //@TODO : change these direct calls to actions/request/response/listener once we create the java client API + if (request.method() == POST && request.path().contains("/_refresh")) { + alertManager.refreshAlerts(); + XContentBuilder builder = getListOfAlerts(); + restChannel.sendResponse(new BytesRestResponse(OK,builder)); + return true; + } else if (request.method() == GET && request.path().contains("/_list")) { + XContentBuilder builder = getListOfAlerts(); + restChannel.sendResponse(new BytesRestResponse(OK,builder)); + return true; + } else if (request.method() == POST && request.path().contains("/_create")) { + //TODO : this should all be moved to an action + Alert alert = alertManager.parseAlert(request.param("name"), XContentHelper.convertToMap(request.content(), request.contentUnsafe()).v2()); + try { + boolean added = alertManager.addAlert(alert.alertName(), alert, true); + if (added) { + XContentBuilder builder = XContentFactory.jsonBuilder().prettyPrint(); + alert.toXContent(builder); + restChannel.sendResponse(new BytesRestResponse(OK, builder)); + } else { + restChannel.sendResponse(new BytesRestResponse(BAD_REQUEST)); + } + } catch (ElasticsearchIllegalArgumentException eia) { + XContentBuilder failed = XContentFactory.jsonBuilder().prettyPrint(); + failed.startObject(); + failed.field("ERROR", eia.getMessage()); + failed.endObject(); + restChannel.sendResponse(new BytesRestResponse(BAD_REQUEST,failed)); + } + + return true; + } else if (request.method() == DELETE) { + String alertName = request.param("name"); + logger.warn("Deleting [{}]", alertName); + boolean successful = alertManager.deleteAlert(alertName); + XContentBuilder builder = XContentFactory.jsonBuilder().prettyPrint(); + builder.field("Success", successful); + builder.field("alertName", alertName); + restChannel.sendResponse(new BytesRestResponse(OK)); + return true; + } + return false; + } + + private XContentBuilder getListOfAlerts() throws IOException { + Map alertMap = alertManager.getSafeAlertMap(); + XContentBuilder builder = XContentFactory.jsonBuilder().prettyPrint(); + builder.startObject(); + for (Map.Entry alertEntry : alertMap.entrySet()) { + builder.field(alertEntry.getKey()); + alertEntry.getValue().toXContent(builder); + } + builder.endObject(); + return builder; + } } diff --git a/src/main/java/org/elasticsearch/alerting/AlertScheduler.java b/src/main/java/org/elasticsearch/alerting/AlertScheduler.java index 4bf814a01d9..f17423b16b6 100644 --- a/src/main/java/org/elasticsearch/alerting/AlertScheduler.java +++ b/src/main/java/org/elasticsearch/alerting/AlertScheduler.java @@ -67,6 +67,7 @@ public class AlertScheduler extends AbstractLifecycleComponent { public void executeAlert(String alertName, JobExecutionContext jobExecutionContext){ logger.warn("Running [{}]",alertName); Alert alert = alertManager.getAlertForName(alertName); + //@TODO : claim alert try { XContentBuilder builder = createClampedQuery(jobExecutionContext, alert); @@ -95,19 +96,22 @@ public class AlertScheduler extends AbstractLifecycleComponent { }else{ logger.warn("We didn't trigger"); } - //@TODO write this back to the alert manager + alertManager.updateLastRan(alertName, new DateTime(jobExecutionContext.getFireTime())); + if (!alertManager.addHistory(alertName, result.isTriggered, + new DateTime(jobExecutionContext.getScheduledFireTime()), result.query, + result.trigger, result.searchResponse.getHits().getTotalHits(), alert.indices())) + { + logger.warn("Failed to store history for alert [{}]", alertName); + } } catch (Exception e) { - logger.error("Fail", e); - logger.error("Failed execute alert [{}]",alert.queryName(), e); + logger.error("Failed execute alert [{}]",e, alert.queryName()); } - } private XContentBuilder createClampedQuery(JobExecutionContext jobExecutionContext, Alert alert) throws IOException { Date scheduledFireTime = jobExecutionContext.getScheduledFireTime(); DateTime clampEnd = new DateTime(scheduledFireTime); DateTime clampStart = clampEnd.minusSeconds((int)alert.timePeriod().seconds()); - XContentBuilder builder = XContentFactory.contentBuilder(XContentType.JSON); builder.startObject(); builder.field("query");