Alerting : Cleanup and fixes.

These changes fix the alert throttling during the time period, move alert history it's own
index.

Original commit: elastic/x-pack-elasticsearch@5130637824
This commit is contained in:
Brian Murphy 2014-08-21 16:33:11 +01:00
parent 47e1e77b58
commit 8c623534c2
4 changed files with 89 additions and 58 deletions

View File

@ -22,51 +22,8 @@ public class Alert implements ToXContent{
private TimeValue timePeriod;
private List<AlertAction> actions;
private String schedule;
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Alert alert = (Alert) o;
if (enabled != alert.enabled) return false;
if (simpleQuery != alert.simpleQuery) return false;
if (version != alert.version) return false;
if (actions != null ? !actions.equals(alert.actions) : alert.actions != null) return false;
if (alertName != null ? !alertName.equals(alert.alertName) : alert.alertName != null) return false;
if (indices != null ? !indices.equals(alert.indices) : alert.indices != null) return false;
if (lastRan != null ? !lastRan.equals(alert.lastRan) : alert.lastRan != null) return false;
if (queryName != null ? !queryName.equals(alert.queryName) : alert.queryName != null) return false;
if (running != null ? !running.equals(alert.running) : alert.running != null) return false;
if (schedule != null ? !schedule.equals(alert.schedule) : alert.schedule != null) return false;
if (timePeriod != null ? !timePeriod.equals(alert.timePeriod) : alert.timePeriod != null) return false;
if (timestampString != null ? !timestampString.equals(alert.timestampString) : alert.timestampString != null)
return false;
if (trigger != null ? !trigger.equals(alert.trigger) : alert.trigger != null) return false;
return true;
}
@Override
public int hashCode() {
int result = alertName != null ? alertName.hashCode() : 0;
result = 31 * result + (queryName != null ? queryName.hashCode() : 0);
result = 31 * result + (trigger != null ? trigger.hashCode() : 0);
result = 31 * result + (timePeriod != null ? timePeriod.hashCode() : 0);
result = 31 * result + (actions != null ? actions.hashCode() : 0);
result = 31 * result + (schedule != null ? schedule.hashCode() : 0);
result = 31 * result + (lastRan != null ? lastRan.hashCode() : 0);
result = 31 * result + (int) (version ^ (version >>> 32));
result = 31 * result + (running != null ? running.hashCode() : 0);
result = 31 * result + (enabled ? 1 : 0);
result = 31 * result + (simpleQuery ? 1 : 0);
result = 31 * result + (timestampString != null ? timestampString.hashCode() : 0);
result = 31 * result + (indices != null ? indices.hashCode() : 0);
return result;
}
private DateTime lastRan;
private DateTime lastActionFire;
private long version;
private DateTime running;
private boolean enabled;
@ -81,7 +38,13 @@ public class Alert implements ToXContent{
this.timestampString = timestampString;
}
public DateTime lastActionFire() {
return lastActionFire;
}
public void lastActionFire(DateTime lastActionFire) {
this.lastActionFire = lastActionFire;
}
public boolean simpleQuery() {
return simpleQuery;
@ -206,6 +169,7 @@ public class Alert implements ToXContent{
builder.field(AlertManager.CURRENTLY_RUNNING.getPreferredName(), running);
builder.field(AlertManager.ENABLED.getPreferredName(), enabled);
builder.field(AlertManager.SIMPLE_QUERY.getPreferredName(), simpleQuery);
builder.field(AlertManager.LAST_ACTION_FIRE.getPreferredName(), lastActionFire);
builder.field(AlertManager.TRIGGER_FIELD.getPreferredName());
trigger.toXContent(builder, params);

View File

@ -43,6 +43,7 @@ public class AlertManager extends AbstractLifecycleComponent {
public final String ALERT_INDEX = ".alerts";
public final String ALERT_TYPE = "alert";
public final String ALERT_HISTORY_INDEX = "alerthistory";
public final String ALERT_HISTORY_TYPE = "alertHistory";
public static final ParseField QUERY_FIELD = new ParseField("query");
@ -56,6 +57,7 @@ public class AlertManager extends AbstractLifecycleComponent {
public static final ParseField ENABLED = new ParseField("enabled");
public static final ParseField SIMPLE_QUERY = new ParseField("simple");
public static final ParseField TIMESTAMP_FIELD = new ParseField("timefield");
public static final ParseField LAST_ACTION_FIRE = new ParseField("lastactionfire");
private final Client client;
private AlertScheduler scheduler;
@ -76,7 +78,7 @@ public class AlertManager extends AbstractLifecycleComponent {
while (attempts < 2) {
try {
logger.warn("Sleeping [{}]", attempts);
Thread.sleep(10000);
Thread.sleep(20000);
logger.warn("Slept");
break;
} catch (InterruptedException ie) {
@ -153,6 +155,17 @@ public class AlertManager extends AbstractLifecycleComponent {
return actionGet.getStatus();
}
public DateTime timeActionLastTriggered(String alertName) {
Alert indexedAlert;
indexedAlert = getAlertFromIndex(alertName);
if (indexedAlert == null) {
return null;
} else {
return indexedAlert.lastActionFire();
}
}
public boolean claimAlertRun(String alertName, DateTime scheduleRunTime) {
Alert indexedAlert;
try {
@ -259,7 +272,6 @@ public class AlertManager extends AbstractLifecycleComponent {
} catch (ElasticsearchException e) {
logger.error("Unable to parse [{}] as an alert this alert will be skipped.",e,sh);
}
}
logger.warn("Loaded [{}] alerts from the alert index.", alertMap.size());
}
@ -269,17 +281,23 @@ public class AlertManager extends AbstractLifecycleComponent {
return 0;
}
public boolean updateLastRan(String alertName, DateTime fireTime) throws Exception {
public boolean updateLastRan(String alertName, DateTime fireTime, DateTime scheduledTime, boolean firedAction) throws Exception {
try {
synchronized (alertMap) {
Alert alert = getAlertForName(alertName);
alert.lastRan(fireTime);
XContentBuilder alertBuilder = XContentFactory.jsonBuilder().prettyPrint();
if (firedAction) {
logger.error("Fired action [{}]",firedAction);
alert.lastActionFire(scheduledTime);
}
alert.toXContent(alertBuilder, ToXContent.EMPTY_PARAMS);
logger.error(XContentHelper.convertToJson(alertBuilder.bytes(),false,true));
UpdateRequest updateRequest = new UpdateRequest();
updateRequest.id(alertName);
updateRequest.index(ALERT_INDEX);
updateRequest.type(ALERT_TYPE);
updateRequest.doc(alertBuilder);
updateRequest.refresh(true);
client.update(updateRequest).actionGet();
@ -294,6 +312,7 @@ public class AlertManager extends AbstractLifecycleComponent {
public boolean addHistory(String alertName, boolean triggered,
DateTime fireTime, SearchRequestBuilder triggeringQuery,
AlertTrigger trigger, long numberOfResults,
List<AlertAction> actions,
@Nullable List<String> indices) throws Exception {
XContentBuilder historyEntry = XContentFactory.jsonBuilder();
historyEntry.startObject();
@ -304,6 +323,12 @@ public class AlertManager extends AbstractLifecycleComponent {
trigger.toXContent(historyEntry, ToXContent.EMPTY_PARAMS);
historyEntry.field("queryRan", triggeringQuery.toString());
historyEntry.field("numberOfResults", numberOfResults);
historyEntry.field("actions");
historyEntry.startArray();
for (AlertAction action : actions) {
action.toXContent(historyEntry, ToXContent.EMPTY_PARAMS);
}
historyEntry.endArray();
if (indices != null) {
historyEntry.field("indices");
historyEntry.startArray();
@ -314,14 +339,15 @@ public class AlertManager extends AbstractLifecycleComponent {
}
historyEntry.endObject();
IndexRequest indexRequest = new IndexRequest();
indexRequest.index(ALERT_INDEX);
indexRequest.index(ALERT_HISTORY_INDEX);
indexRequest.type(ALERT_HISTORY_TYPE);
indexRequest.source(historyEntry);
indexRequest.listenerThreaded(false);
indexRequest.operationThreaded(false);
indexRequest.refresh(true); //Always refresh after indexing an alert
indexRequest.opType(IndexRequest.OpType.CREATE);
return client.index(indexRequest).actionGet().isCreated();
client.index(indexRequest).actionGet().isCreated();
return true;
}
public boolean deleteAlert(String alertName) throws InterruptedException, ExecutionException{
@ -473,6 +499,11 @@ public class AlertManager extends AbstractLifecycleComponent {
running = new DateTime(fields.get(CURRENTLY_RUNNING.getPreferredName()).toString());
}
DateTime lastActionFire = new DateTime(0);
if (fields.get(LAST_ACTION_FIRE.getPreferredName()) != null) {
lastActionFire = new DateTime(fields.get(LAST_ACTION_FIRE.getPreferredName()).toString());
}
List<String> indices = new ArrayList<>();
if (fields.get(INDICES.getPreferredName()) != null && fields.get(INDICES.getPreferredName()) instanceof List){
indices = (List<String>)fields.get(INDICES.getPreferredName());
@ -495,6 +526,7 @@ public class AlertManager extends AbstractLifecycleComponent {
}
Alert alert = new Alert(alertId, query, trigger, timePeriod, actions, schedule, lastRan, indices, running, version, enabled, simpleQuery);
alert.lastActionFire(lastActionFire);
if (fields.get(TIMESTAMP_FIELD.getPreferredName()) != null) {
alert.timestampString(fields.get(TIMESTAMP_FIELD.getPreferredName()).toString());

View File

@ -26,6 +26,7 @@ public class AlertRestHandler implements RestHandler {
@Inject
public AlertRestHandler(RestController restController, AlertManager alertManager) {
restController.registerHandler(POST, "/_alerting/_refresh",this);
restController.registerHandler(GET, "/_alerting/_refresh",this);
restController.registerHandler(GET, "/_alerting/_list",this);
restController.registerHandler(POST, "/_alerting/_create/{name}", this);
restController.registerHandler(DELETE, "/_alerting/_delete/{name}", this);
@ -56,7 +57,7 @@ public class AlertRestHandler implements RestHandler {
private boolean dispatchRequest(RestRequest request, RestChannel restChannel) throws IOException, InterruptedException, ExecutionException {
//@TODO : change these direct calls to actions/request/response/listener once we create the java client API
if (request.method() == POST && request.path().contains("/_refresh")) {
if (request.path().contains("/_refresh")) {
alertManager.refreshAlerts();
XContentBuilder builder = getListOfAlerts();
restChannel.sendResponse(new BytesRestResponse(OK,builder));
@ -66,9 +67,19 @@ public class AlertRestHandler implements RestHandler {
restChannel.sendResponse(new BytesRestResponse(OK,builder));
return true;
} else if (request.path().contains("/_enable")) {
return alertManager.enableAlert(request.param("name"));
logger.warn("Enabling [{}]", request.param("name"));
String alertName = request.param("name");
boolean enabled = alertManager.enableAlert(alertName);
XContentBuilder responseBuilder = buildEnabledResponse(alertName, enabled);
restChannel.sendResponse(new BytesRestResponse(OK,responseBuilder));
return true;
} else if (request.path().contains("/_disable")) {
return alertManager.disableAlert(request.param("name"));
logger.warn("Disabling [{}]", request.param("name"));
String alertName = request.param("name");
boolean enabled = alertManager.disableAlert(alertName);
XContentBuilder responseBuilder = buildEnabledResponse(alertName, enabled);
restChannel.sendResponse(new BytesRestResponse(OK,responseBuilder));
return true;
} else if (request.method() == POST && request.path().contains("/_create")) {
//TODO : this should all be moved to an action
Alert alert;
@ -108,6 +119,17 @@ public class AlertRestHandler implements RestHandler {
return false;
}
private XContentBuilder buildEnabledResponse(String alertName, boolean enabled) throws IOException {
XContentBuilder responseBuilder = XContentFactory.jsonBuilder().prettyPrint();
responseBuilder.startObject();
responseBuilder.field(alertName);
responseBuilder.startObject();
responseBuilder.field("enabled",enabled);
responseBuilder.endObject();
responseBuilder.endObject();
return responseBuilder;
}
private XContentBuilder getListOfAlerts() throws IOException {
Map<String, Alert> alertMap = alertManager.getSafeAlertMap();
XContentBuilder builder = XContentFactory.jsonBuilder().prettyPrint();

View File

@ -14,6 +14,7 @@ import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.joda.time.DateTime;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.*;
import org.elasticsearch.index.query.*;
import org.elasticsearch.script.ExecutableScript;
@ -97,7 +98,7 @@ public class AlertScheduler extends AbstractLifecycleComponent {
}
//if (logger.isDebugEnabled()) {
logger.warn("Running query [{}]", XContentHelper.convertToJson(srb.request().source(),false,true));
logger.warn("Running query [{}]", XContentHelper.convertToJson(srb.request().source(), false, true));
//}
SearchResponse sr = srb.execute().get();
@ -106,17 +107,29 @@ public class AlertScheduler extends AbstractLifecycleComponent {
triggerManager.isTriggered(alertName,sr), srb, indices,
new DateTime(jobExecutionContext.getScheduledFireTime()));
boolean firedAction = false;
if (result.isTriggered) {
logger.warn("We have triggered");
actionManager.doAction(alertName,result);
logger.warn("Did action !");
}else{
DateTime lastActionFire = alertManager.timeActionLastTriggered(alertName);
long msSinceLastAction = scheduledTime.getMillis() - lastActionFire.getMillis();
logger.error("last action fire [{}]", lastActionFire);
logger.error("msSinceLastAction [{}]", msSinceLastAction);
if (alert.timePeriod().getMillis() > msSinceLastAction) {
logger.warn("Not firing action because it was fired in the timePeriod");
} else {
actionManager.doAction(alertName, result);
logger.warn("Did action !");
firedAction = true;
}
} else {
logger.warn("We didn't trigger");
}
alertManager.updateLastRan(alertName, new DateTime(jobExecutionContext.getFireTime()));
alertManager.updateLastRan(alertName, new DateTime(jobExecutionContext.getFireTime()),scheduledTime,firedAction);
if (!alertManager.addHistory(alertName, result.isTriggered,
new DateTime(jobExecutionContext.getScheduledFireTime()), result.query,
result.trigger, result.searchResponse.getHits().getTotalHits(), alert.indices()))
result.trigger, result.searchResponse.getHits().getTotalHits(), alert.actions(), alert.indices()))
{
logger.warn("Failed to store history for alert [{}]", alertName);
}