Alerting : Add support for history and fix issues around parsing lastRun.

This commit adds a history log for alerts and updates lastRan with the correct time.

Original commit: elastic/x-pack-elasticsearch@49c77f1ef8
This commit is contained in:
Brian Murphy 2014-08-15 16:46:41 +01:00
parent 4c47c8ba9a
commit 019cdb37d4
4 changed files with 165 additions and 39 deletions

View File

@ -111,11 +111,23 @@ public class Alert {
builder.field(AlertManager.TRIGGER_FIELD.getPreferredName());
trigger.toXContent(builder);
builder.field(AlertManager.ACTION_FIELD.getPreferredName());
builder.startObject();
for (AlertAction action : actions){
builder.field(action.getActionName());
action.toXContent(builder);
}
builder.endObject();
if (indices != null && !indices.isEmpty()) {
builder.field(AlertManager.INDICES.getPreferredName());
builder.startArray();
for (String index : indices){
builder.value(index);
}
builder.endArray();
}
builder.endObject();
return builder;
}

View File

@ -16,8 +16,11 @@ import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.Requests;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
@ -27,10 +30,13 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHitField;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -42,7 +48,7 @@ public class AlertManager extends AbstractLifecycleComponent {
public final String ALERT_INDEX = ".alerts";
public final String ALERT_TYPE = "alert";
public final String QUERY_TYPE = "alertQuery";
public final String ALERT_HISTORY_TYPE = "alertHistory";
public static final ParseField QUERY_FIELD = new ParseField("query");
public static final ParseField SCHEDULE_FIELD = new ParseField("schedule");
@ -182,6 +188,65 @@ public class AlertManager extends AbstractLifecycleComponent {
}
}
public long getLastEventCount(String alertName){
return 0;
}
public boolean updateLastRan(String alertName, DateTime fireTime) throws Exception {
try {
synchronized (alertMap) {
Alert alert = getAlertForName(alertName);
alert.lastRan(fireTime);
XContentBuilder alertBuilder = XContentFactory.jsonBuilder().prettyPrint();
alert.toXContent(alertBuilder);
UpdateRequest updateRequest = new UpdateRequest();
updateRequest.id(alertName);
updateRequest.index(ALERT_INDEX);
updateRequest.type(ALERT_TYPE);
updateRequest.doc(alertBuilder);
updateRequest.refresh(true);
client.update(updateRequest).actionGet();
return true;
}
} catch (Throwable t) {
logger.error("Failed to update alert [{}] with lastRan of [{}]",t, alertName, fireTime);
return false;
}
}
public boolean addHistory(String alertName, boolean triggered,
DateTime fireTime, XContentBuilder triggeringQuery,
AlertTrigger trigger, long numberOfResults,
@Nullable List<String> indices) throws Exception {
XContentBuilder historyEntry = XContentFactory.jsonBuilder();
historyEntry.startObject();
historyEntry.field("alertName", alertName);
historyEntry.field("triggered", triggered);
historyEntry.field("fireTime", fireTime.toDateTimeISO());
historyEntry.field("trigger");
trigger.toXContent(historyEntry);
historyEntry.field("queryRan", XContentHelper.convertToJson(triggeringQuery.bytes(),false,true));
historyEntry.field("numberOfResults", numberOfResults);
if (indices != null) {
historyEntry.field("indices");
historyEntry.startArray();
for (String index : indices) {
historyEntry.value(index);
}
historyEntry.endArray();
}
historyEntry.endObject();
IndexRequest indexRequest = new IndexRequest();
indexRequest.index(ALERT_INDEX);
indexRequest.type(ALERT_HISTORY_TYPE);
indexRequest.source(historyEntry);
indexRequest.listenerThreaded(false);
indexRequest.operationThreaded(false);
indexRequest.refresh(true); //Always refresh after indexing an alert
indexRequest.opType(IndexRequest.OpType.CREATE);
return client.index(indexRequest).get().isCreated();
}
public boolean deleteAlert(String alertName) throws InterruptedException, ExecutionException{
synchronized (alertMap) {
if (alertMap.containsKey(alertName)) {
@ -225,11 +290,12 @@ public class AlertManager extends AbstractLifecycleComponent {
XContentBuilder builder;
try {
builder = XContentFactory.jsonBuilder();
alert.toXContent(builder);
IndexRequest indexRequest = new IndexRequest(ALERT_INDEX, ALERT_TYPE, alertName);
indexRequest.listenerThreaded(false);
indexRequest.operationThreaded(false);
indexRequest.refresh(true); //Always refresh after indexing an alert
indexRequest.source(alert.toXContent(builder).bytes(), true);
indexRequest.source(builder);
indexRequest.opType(IndexRequest.OpType.CREATE);
return client.index(indexRequest).get().isCreated();
} catch (IOException ie) {
@ -278,11 +344,18 @@ public class AlertManager extends AbstractLifecycleComponent {
throw new ElasticsearchException("Unable to parse actions [" + triggerObj + "]");
}
DateTime lastRan = new DateTime(fields.get("lastRan").toString());
DateTime lastRan = new DateTime(0);
if( fields.get(LASTRAN_FIELD.getPreferredName()) != null){
lastRan = new DateTime(fields.get(LASTRAN_FIELD.getPreferredName()).toString());
} else if (fields.get("lastRan") != null) {
lastRan = new DateTime(fields.get("lastRan").toString());
}
List<String> indices = null;
List<String> indices = new ArrayList<>();
if (fields.get(INDICES.getPreferredName()) != null && fields.get(INDICES.getPreferredName()) instanceof List){
indices = (List<String>)fields.get(INDICES.getPreferredName());
} else {
logger.warn("Indices : " + fields.get(INDICES.getPreferredName()) + " class " + fields.get(INDICES.getPreferredName()).getClass() );
}
return new Alert(alertId, query, trigger, timePeriod, actions, schedule, lastRan, indices);

View File

@ -5,18 +5,19 @@
*/
package org.elasticsearch.alerting;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.ElasticsearchIllegalArgumentException;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.xcontent.XContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.rest.*;
import org.omg.CORBA.NO_IMPLEMENT;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import static org.elasticsearch.rest.RestStatus.*;
import static org.elasticsearch.rest.RestRequest.Method.*;
@ -37,40 +38,76 @@ public class AlertRestHandler implements RestHandler {
@Override
public void handleRequest(RestRequest request, RestChannel restChannel) throws Exception {
logger.warn("GOT REST REQUEST");
//@TODO : change these direct calls to actions/request/response/listener once we create the java client API
if (request.method() == POST && request.path().contains("/_refresh")) {
alertManager.refreshAlerts();
restChannel.sendResponse(new BytesRestResponse(OK));
return;
} else if (request.method() == GET && request.path().contains("/_list")) {
Map<String, Alert> alertMap = alertManager.getSafeAlertMap();
try {
if (dispatchRequest(request, restChannel)) {
return;
}
} catch (Throwable t){
XContentBuilder builder = XContentFactory.jsonBuilder().prettyPrint();
builder.startObject();
for (Map.Entry<String, Alert> alertEntry : alertMap.entrySet()) {
builder.field(alertEntry.getKey());
alertEntry.getValue().toXContent(builder);
}
builder.field("error", t.getMessage());
builder.field("stack", t.getStackTrace());
builder.endObject();
restChannel.sendResponse(new BytesRestResponse(OK,builder));
return;
} else if (request.method() == POST && request.path().contains("/_create")) {
//TODO : this should all be moved to an action
Alert alert = alertManager.parseAlert(request.param("name"), XContentHelper.convertToMap(request.content(), request.contentUnsafe()).v2());
boolean added = alertManager.addAlert(alert.alertName(), alert, true);
XContentBuilder builder = XContentFactory.jsonBuilder().prettyPrint();
alert.toXContent(builder);
restChannel.sendResponse(new BytesRestResponse(OK,builder));
return;
} else if (request.method() == DELETE) {
String alertName = request.param("name");
alertManager.deleteAlert(alertName);
restChannel.sendResponse(new BytesRestResponse(OK));
return;
}
restChannel.sendResponse(new BytesRestResponse(NOT_IMPLEMENTED));
}
private boolean dispatchRequest(RestRequest request, RestChannel restChannel) throws IOException, InterruptedException, ExecutionException {
//@TODO : change these direct calls to actions/request/response/listener once we create the java client API
if (request.method() == POST && request.path().contains("/_refresh")) {
alertManager.refreshAlerts();
XContentBuilder builder = getListOfAlerts();
restChannel.sendResponse(new BytesRestResponse(OK,builder));
return true;
} else if (request.method() == GET && request.path().contains("/_list")) {
XContentBuilder builder = getListOfAlerts();
restChannel.sendResponse(new BytesRestResponse(OK,builder));
return true;
} else if (request.method() == POST && request.path().contains("/_create")) {
//TODO : this should all be moved to an action
Alert alert = alertManager.parseAlert(request.param("name"), XContentHelper.convertToMap(request.content(), request.contentUnsafe()).v2());
try {
boolean added = alertManager.addAlert(alert.alertName(), alert, true);
if (added) {
XContentBuilder builder = XContentFactory.jsonBuilder().prettyPrint();
alert.toXContent(builder);
restChannel.sendResponse(new BytesRestResponse(OK, builder));
} else {
restChannel.sendResponse(new BytesRestResponse(BAD_REQUEST));
}
} catch (ElasticsearchIllegalArgumentException eia) {
XContentBuilder failed = XContentFactory.jsonBuilder().prettyPrint();
failed.startObject();
failed.field("ERROR", eia.getMessage());
failed.endObject();
restChannel.sendResponse(new BytesRestResponse(BAD_REQUEST,failed));
}
return true;
} else if (request.method() == DELETE) {
String alertName = request.param("name");
logger.warn("Deleting [{}]", alertName);
boolean successful = alertManager.deleteAlert(alertName);
XContentBuilder builder = XContentFactory.jsonBuilder().prettyPrint();
builder.field("Success", successful);
builder.field("alertName", alertName);
restChannel.sendResponse(new BytesRestResponse(OK));
return true;
}
return false;
}
private XContentBuilder getListOfAlerts() throws IOException {
Map<String, Alert> alertMap = alertManager.getSafeAlertMap();
XContentBuilder builder = XContentFactory.jsonBuilder().prettyPrint();
builder.startObject();
for (Map.Entry<String, Alert> alertEntry : alertMap.entrySet()) {
builder.field(alertEntry.getKey());
alertEntry.getValue().toXContent(builder);
}
builder.endObject();
return builder;
}
}

View File

@ -67,6 +67,7 @@ public class AlertScheduler extends AbstractLifecycleComponent {
public void executeAlert(String alertName, JobExecutionContext jobExecutionContext){
logger.warn("Running [{}]",alertName);
Alert alert = alertManager.getAlertForName(alertName);
//@TODO : claim alert
try {
XContentBuilder builder = createClampedQuery(jobExecutionContext, alert);
@ -95,19 +96,22 @@ public class AlertScheduler extends AbstractLifecycleComponent {
}else{
logger.warn("We didn't trigger");
}
//@TODO write this back to the alert manager
alertManager.updateLastRan(alertName, new DateTime(jobExecutionContext.getFireTime()));
if (!alertManager.addHistory(alertName, result.isTriggered,
new DateTime(jobExecutionContext.getScheduledFireTime()), result.query,
result.trigger, result.searchResponse.getHits().getTotalHits(), alert.indices()))
{
logger.warn("Failed to store history for alert [{}]", alertName);
}
} catch (Exception e) {
logger.error("Fail", e);
logger.error("Failed execute alert [{}]",alert.queryName(), e);
logger.error("Failed execute alert [{}]",e, alert.queryName());
}
}
private XContentBuilder createClampedQuery(JobExecutionContext jobExecutionContext, Alert alert) throws IOException {
Date scheduledFireTime = jobExecutionContext.getScheduledFireTime();
DateTime clampEnd = new DateTime(scheduledFireTime);
DateTime clampStart = clampEnd.minusSeconds((int)alert.timePeriod().seconds());
XContentBuilder builder = XContentFactory.contentBuilder(XContentType.JSON);
builder.startObject();
builder.field("query");