Alerting : Start of multiprocess coordination work

On alert claim time, query index to make sure alert is still valid and enabled and is the same alert.

Original commit: elastic/x-pack-elasticsearch@72b816b7d1
This commit is contained in:
Brian Murphy 2014-08-19 13:31:01 +01:00
parent 0eea73dd72
commit 47e1e77b58
4 changed files with 149 additions and 23 deletions

View File

@ -22,6 +22,50 @@ public class Alert implements ToXContent{
private TimeValue timePeriod;
private List<AlertAction> actions;
private String schedule;
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Alert alert = (Alert) o;
if (enabled != alert.enabled) return false;
if (simpleQuery != alert.simpleQuery) return false;
if (version != alert.version) return false;
if (actions != null ? !actions.equals(alert.actions) : alert.actions != null) return false;
if (alertName != null ? !alertName.equals(alert.alertName) : alert.alertName != null) return false;
if (indices != null ? !indices.equals(alert.indices) : alert.indices != null) return false;
if (lastRan != null ? !lastRan.equals(alert.lastRan) : alert.lastRan != null) return false;
if (queryName != null ? !queryName.equals(alert.queryName) : alert.queryName != null) return false;
if (running != null ? !running.equals(alert.running) : alert.running != null) return false;
if (schedule != null ? !schedule.equals(alert.schedule) : alert.schedule != null) return false;
if (timePeriod != null ? !timePeriod.equals(alert.timePeriod) : alert.timePeriod != null) return false;
if (timestampString != null ? !timestampString.equals(alert.timestampString) : alert.timestampString != null)
return false;
if (trigger != null ? !trigger.equals(alert.trigger) : alert.trigger != null) return false;
return true;
}
@Override
public int hashCode() {
int result = alertName != null ? alertName.hashCode() : 0;
result = 31 * result + (queryName != null ? queryName.hashCode() : 0);
result = 31 * result + (trigger != null ? trigger.hashCode() : 0);
result = 31 * result + (timePeriod != null ? timePeriod.hashCode() : 0);
result = 31 * result + (actions != null ? actions.hashCode() : 0);
result = 31 * result + (schedule != null ? schedule.hashCode() : 0);
result = 31 * result + (lastRan != null ? lastRan.hashCode() : 0);
result = 31 * result + (int) (version ^ (version >>> 32));
result = 31 * result + (running != null ? running.hashCode() : 0);
result = 31 * result + (enabled ? 1 : 0);
result = 31 * result + (simpleQuery ? 1 : 0);
result = 31 * result + (timestampString != null ? timestampString.hashCode() : 0);
result = 31 * result + (indices != null ? indices.hashCode() : 0);
return result;
}
private DateTime lastRan;
private long version;
private DateTime running;
@ -186,4 +230,23 @@ public class Alert implements ToXContent{
builder.endObject();
return builder;
}
public boolean isSameAlert(Alert otherAlert) {
if (this == otherAlert) return true;
if (enabled != otherAlert.enabled) return false;
if (simpleQuery != otherAlert.simpleQuery) return false;
if (actions != null ? !actions.equals(otherAlert.actions) : otherAlert.actions != null) return false;
if (alertName != null ? !alertName.equals(otherAlert.alertName) : otherAlert.alertName != null) return false;
if (indices != null ? !indices.equals(otherAlert.indices) : otherAlert.indices != null) return false;
if (queryName != null ? !queryName.equals(otherAlert.queryName) : otherAlert.queryName != null) return false;
if (schedule != null ? !schedule.equals(otherAlert.schedule) : otherAlert.schedule != null) return false;
if (timePeriod != null ? !timePeriod.equals(otherAlert.timePeriod) : otherAlert.timePeriod != null) return false;
if (timestampString != null ? !timestampString.equals(otherAlert.timestampString) : otherAlert.timestampString != null)
return false;
if (trigger != null ? !trigger.equals(otherAlert.trigger) : otherAlert.trigger != null) return false;
return true;
}
}

View File

@ -16,6 +16,7 @@ import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.update.UpdateRequest;
@ -153,26 +154,45 @@ public class AlertManager extends AbstractLifecycleComponent {
}
public boolean claimAlertRun(String alertName, DateTime scheduleRunTime) {
Alert alert;
Alert indexedAlert;
try {
alert = getAlertFromIndex(alertName);
if (alert.running().equals(scheduleRunTime) || alert.running().isAfter(scheduleRunTime)) {
indexedAlert = getAlertFromIndex(alertName);
synchronized (alertMap) {
Alert inMemoryAlert = alertMap.get(alertName);
if (indexedAlert == null) {
//Alert has been deleted out from underneath us
alertMap.remove(alertName);
return false;
} else if (inMemoryAlert == null) {
logger.warn("Got claim attempt for alert [{}] that alert manager does not have but is in the index.", alertName);
alertMap.put(alertName, indexedAlert); //This is an odd state to get into
} else {
if (!inMemoryAlert.isSameAlert(indexedAlert)) {
alertMap.put(alertName, indexedAlert); //Probably has been changed by another process and we missed the notification
}
}
if (!indexedAlert.enabled()) {
return false;
}
}
if (indexedAlert.running().equals(scheduleRunTime) || indexedAlert.running().isAfter(scheduleRunTime)) {
//Someone else is already running this alert or this alert time has passed
return false;
}
} catch (Throwable t) {
throw new ElasticsearchException("Unable to load alert from index",t);
}
alert.running(scheduleRunTime);
indexedAlert.running(scheduleRunTime);
UpdateRequest updateRequest = new UpdateRequest();
updateRequest.index(ALERT_INDEX);
updateRequest.type(ALERT_TYPE);
updateRequest.id(alertName);
updateRequest.version(alert.version());//Since we loaded this alert directly from the index the version should be correct
updateRequest.version(indexedAlert.version());//Since we loaded this alert directly from the index the version should be correct
XContentBuilder alertBuilder;
try {
alertBuilder = XContentFactory.jsonBuilder();
alert.toXContent(alertBuilder, ToXContent.EMPTY_PARAMS);
indexedAlert.toXContent(alertBuilder, ToXContent.EMPTY_PARAMS);
} catch (IOException ie) {
throw new ElasticsearchException("Unable to serialize alert ["+ alertName + "]", ie);
}
@ -187,7 +207,7 @@ public class AlertManager extends AbstractLifecycleComponent {
}
synchronized (alertMap) { //Update the alert map
if (alertMap.containsKey(alertName)) {
if (alertMap.containsKey(alertName)) { //Check here since it may have been deleted
alertMap.get(alertName).running(scheduleRunTime);
}
}
@ -343,21 +363,9 @@ public class AlertManager extends AbstractLifecycleComponent {
} else {
alertMap.put(alertName, alert);
scheduler.addAlert(alertName,alert);
if (persist) {
XContentBuilder builder;
try {
builder = XContentFactory.jsonBuilder();
alert.toXContent(builder, ToXContent.EMPTY_PARAMS);
IndexRequest indexRequest = new IndexRequest(ALERT_INDEX, ALERT_TYPE, alertName);
indexRequest.listenerThreaded(false);
indexRequest.operationThreaded(false);
indexRequest.refresh(true); //Always refresh after indexing an alert
indexRequest.source(builder);
indexRequest.opType(IndexRequest.OpType.CREATE);
return client.index(indexRequest).actionGet().isCreated();
} catch (IOException ie) {
throw new ElasticsearchIllegalStateException("Unable to convert alert to JSON", ie);
}
return persistAlert(alertName, alert, IndexRequest.OpType.CREATE);
} else {
return true;
}
@ -365,6 +373,51 @@ public class AlertManager extends AbstractLifecycleComponent {
}
}
public boolean disableAlert(String alertName) {
synchronized (alertMap) {
Alert alert = alertMap.get(alertName);
if (alert == null) {
throw new ElasticsearchIllegalArgumentException("Could not find an alert named [" + alertName + "]");
}
alert.enabled(false);
return persistAlert(alertName, alert, IndexRequest.OpType.INDEX);
}
}
public boolean enableAlert(String alertName) {
synchronized (alertMap) {
Alert alert = alertMap.get(alertName);
if (alert == null) {
throw new ElasticsearchIllegalArgumentException("Could not find an alert named [" + alertName + "]");
}
alert.enabled(true);
return persistAlert(alertName, alert, IndexRequest.OpType.INDEX);
}
}
private boolean persistAlert(String alertName, Alert alert, IndexRequest.OpType opType) {
XContentBuilder builder;
try {
builder = XContentFactory.jsonBuilder();
alert.toXContent(builder, ToXContent.EMPTY_PARAMS);
IndexRequest indexRequest = new IndexRequest(ALERT_INDEX, ALERT_TYPE, alertName);
indexRequest.listenerThreaded(false);
indexRequest.operationThreaded(false);
indexRequest.refresh(true); //Always refresh after indexing an alert
indexRequest.source(builder);
indexRequest.opType(opType);
IndexResponse indexResponse = client.index(indexRequest).actionGet();
//@TODO : broadcast refresh here
if (opType.equals(IndexRequest.OpType.CREATE)) {
return indexResponse.isCreated();
} else {
return true;
}
} catch (IOException ie) {
throw new ElasticsearchIllegalStateException("Unable to convert alert to JSON", ie);
}
}
private Alert parseAlert(String alertId, SearchHit sh) {
Map<String, Object> fields = sh.sourceAsMap();

View File

@ -29,12 +29,16 @@ public class AlertRestHandler implements RestHandler {
restController.registerHandler(GET, "/_alerting/_list",this);
restController.registerHandler(POST, "/_alerting/_create/{name}", this);
restController.registerHandler(DELETE, "/_alerting/_delete/{name}", this);
restController.registerHandler(GET, "/_alerting/_enable/{name}", this);
restController.registerHandler(GET, "/_alerting/_disable/{name}", this);
restController.registerHandler(POST, "/_alerting/_enable/{name}", this);
restController.registerHandler(POST, "/_alerting/_disable/{name}", this);
this.alertManager = alertManager;
}
@Override
public void handleRequest(RestRequest request, RestChannel restChannel) throws Exception {
logger.warn("GOT REST REQUEST");
try {
if (dispatchRequest(request, restChannel)) {
return;
@ -61,6 +65,10 @@ public class AlertRestHandler implements RestHandler {
XContentBuilder builder = getListOfAlerts();
restChannel.sendResponse(new BytesRestResponse(OK,builder));
return true;
} else if (request.path().contains("/_enable")) {
return alertManager.enableAlert(request.param("name"));
} else if (request.path().contains("/_disable")) {
return alertManager.disableAlert(request.param("name"));
} else if (request.method() == POST && request.path().contains("/_create")) {
//TODO : this should all be moved to an action
Alert alert;

View File

@ -86,6 +86,7 @@ public class AlertScheduler extends AbstractLifecycleComponent {
logger.warn("Another process has already run this alert.");
return;
}
alert = alertManager.getAlertForName(alertName); //The claim may have triggered a refresh
SearchRequestBuilder srb = createClampedRequest(client, jobExecutionContext, alert);
String[] indices = alert.indices().toArray(new String[0]);
@ -135,8 +136,9 @@ public class AlertScheduler extends AbstractLifecycleComponent {
filterBuilder.lt(clampEnd);
return client.prepareSearch().setQuery(new FilteredQueryBuilder(queryBuilder, filterBuilder));
} else {
//We can't just wrap the template here since it probably contains aggs or something else that doesn't play nice with FilteredQuery
Map<String,Object> fromToMap = new HashMap<>();
fromToMap.put("from", clampStart);
fromToMap.put("from", clampStart); //@TODO : make these parameters configurable ? Don't want to bloat the API too much tho
fromToMap.put("to", clampEnd);
//Go and get the search template from the script service :(
ExecutableScript script = scriptService.executable("mustache", alert.queryName(), ScriptService.ScriptType.INDEXED, fromToMap);