Alerting : Claim alert runs

This commit add functionality so that multiple nodes running alert plugins won't stand on
each other trying to run the same alerts at the same time.

Original commit: elastic/x-pack-elasticsearch@9c350c0114
This commit is contained in:
Brian Murphy 2014-08-18 10:27:07 +01:00
parent 83287e009a
commit 26e053eaed
6 changed files with 139 additions and 27 deletions

View File

@ -25,6 +25,24 @@ public class Alert {
private List<AlertAction> actions;
private String schedule;
private DateTime lastRan;
private long version;
private DateTime running;
public DateTime running() {
return running;
}
public void running(DateTime running) {
this.running = running;
}
public long version() {
return version;
}
public void version(long version) {
this.version = version;
}
public List<String> indices() {
return indices;
@ -36,7 +54,6 @@ public class Alert {
private List<String> indices;
public String alertName() {
return alertName;
}
@ -91,7 +108,7 @@ public class Alert {
public Alert(String alertName, String queryName, AlertTrigger trigger,
TimeValue timePeriod, List<AlertAction> actions, String schedule, DateTime lastRan,
List<String> indices){
List<String> indices, DateTime running, long version){
this.alertName = alertName;
this.queryName = queryName;
this.trigger = trigger;
@ -100,14 +117,19 @@ public class Alert {
this.lastRan = lastRan;
this.schedule = schedule;
this.indices = indices;
this.version = version;
this.running = running;
}
public XContentBuilder toXContent(XContentBuilder builder) throws IOException {
//Note we deliberately don't serialize the version here
builder.startObject();
builder.field(AlertManager.QUERY_FIELD.getPreferredName(), queryName);
builder.field(AlertManager.SCHEDULE_FIELD.getPreferredName(), schedule);
builder.field(AlertManager.TIMEPERIOD_FIELD.getPreferredName(), timePeriod);
builder.field(AlertManager.LASTRAN_FIELD.getPreferredName(), lastRan);
builder.field(AlertManager.CURRENTLY_RUNNING.getPreferredName(), running);
builder.field(AlertManager.TRIGGER_FIELD.getPreferredName());
trigger.toXContent(builder);
builder.field(AlertManager.ACTION_FIELD.getPreferredName());

View File

@ -5,6 +5,7 @@
*/
package org.elasticsearch.alerting;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
@ -20,12 +21,12 @@ public class AlertActionManager extends AbstractComponent {
private final Map<String, AlertActionFactory> actionImplemented;
@Inject
public AlertActionManager(Settings settings, AlertManager alertManager) {
public AlertActionManager(Settings settings, AlertManager alertManager, Client client) {
super(settings);
this.alertManager = alertManager;
this.actionImplemented = new HashMap<>();
registerAction("email", new EmailAlertActionFactory());
registerAction("index", new IndexAlertActionFactory());
registerAction("index", new IndexAlertActionFactory(client));
}
public void registerAction(String name, AlertActionFactory actionFactory){

View File

@ -14,6 +14,8 @@ import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.update.UpdateRequest;
@ -57,6 +59,7 @@ public class AlertManager extends AbstractLifecycleComponent {
public static final ParseField ACTION_FIELD = new ParseField("action");
public static final ParseField LASTRAN_FIELD = new ParseField("lastRan");
public static final ParseField INDICES = new ParseField("indices");
public static final ParseField CURRENTLY_RUNNING = new ParseField("running");
private final Client client;
private AlertScheduler scheduler;
@ -146,14 +149,68 @@ public class AlertManager extends AbstractLifecycleComponent {
this.scheduler = scheduler;
}
private ClusterHealthStatus createAlertsIndex() throws InterruptedException, ExecutionException {
CreateIndexResponse cir = client.admin().indices().prepareCreate(ALERT_INDEX).addMapping(ALERT_TYPE).execute().get(); //TODO FIX MAPPINGS
private ClusterHealthStatus createAlertsIndex() {
CreateIndexResponse cir = client.admin().indices().prepareCreate(ALERT_INDEX).addMapping(ALERT_TYPE).execute().actionGet(); //TODO FIX MAPPINGS
logger.warn(cir.toString());
ClusterHealthResponse actionGet = client.admin().cluster()
.health(Requests.clusterHealthRequest(ALERT_INDEX).waitForGreenStatus().waitForEvents(Priority.LANGUID).waitForRelocatingShards(0)).actionGet();
return actionGet.getStatus();
}
public boolean claimAlertRun(String alertName, DateTime scheduleRunTime) {
Alert alert;
try {
alert = getAlertFromIndex(alertName);
if (alert.running().equals(scheduleRunTime) || alert.running().isAfter(scheduleRunTime)) {
//Someone else is already running this alert or this alert time has passed
return false;
}
} catch (Throwable t) {
throw new ElasticsearchException("Unable to load alert from index",t);
}
alert.running(scheduleRunTime);
UpdateRequest updateRequest = new UpdateRequest();
updateRequest.index(ALERT_INDEX);
updateRequest.type(ALERT_TYPE);
updateRequest.id(alertName);
updateRequest.version(alert.version());//Since we loaded this alert directly from the index the version should be correct
XContentBuilder alertBuilder;
try {
alertBuilder = XContentFactory.jsonBuilder();
alert.toXContent(alertBuilder);
} catch (IOException ie) {
throw new ElasticsearchException("Unable to serialize alert ["+ alertName + "]", ie);
}
updateRequest.doc(alertBuilder);
updateRequest.retryOnConflict(0);
try {
client.update(updateRequest).actionGet();
} catch (ElasticsearchException ee) {
logger.error("Failed to update in claim", ee);
return false;
}
synchronized (alertMap) { //Update the alert map
if (alertMap.containsKey(alertName)) {
alertMap.get(alertName).running(scheduleRunTime);
}
}
return true;
}
private Alert getAlertFromIndex(String alertName) {
GetRequest getRequest = Requests.getRequest(ALERT_INDEX);
getRequest.type(ALERT_TYPE);
getRequest.id(alertName);
GetResponse getResponse = client.get(getRequest).actionGet();
if (getResponse.isExists()) {
return parseAlert(alertName, getResponse.getSourceAsMap(), getResponse.getVersion());
} else {
throw new ElasticsearchException("Unable to find [" + alertName + "] in the [" +ALERT_INDEX + "]" );
}
}
public void refreshAlerts() {
try {
synchronized (alertMap) {
@ -167,8 +224,8 @@ public class AlertManager extends AbstractLifecycleComponent {
}
}
private void loadAlerts() throws InterruptedException, ExecutionException{
if (!client.admin().indices().prepareExists(ALERT_INDEX).execute().get().isExists()) {
private void loadAlerts() {
if (!client.admin().indices().prepareExists(ALERT_INDEX).execute().actionGet().isExists()) {
createAlertsIndex();
}
@ -178,11 +235,16 @@ public class AlertManager extends AbstractLifecycleComponent {
"{ \"match_all\" : {}}," +
"\"size\" : \"100\"" +
"}"
).setTypes(ALERT_TYPE).setIndices(ALERT_INDEX).execute().get();
).setTypes(ALERT_TYPE).setIndices(ALERT_INDEX).execute().actionGet();
for (SearchHit sh : searchResponse.getHits()) {
String alertId = sh.getId();
Alert alert = parseAlert(alertId, sh);
alertMap.put(alertId, alert);
try {
Alert alert = parseAlert(alertId, sh);
alertMap.put(alertId, alert);
} catch (ElasticsearchException e) {
logger.error("Unable to parse [{}] as an alert this alert will be skipped.",e,sh);
}
}
logger.warn("Loaded [{}] alerts from the alert index.", alertMap.size());
}
@ -244,7 +306,7 @@ public class AlertManager extends AbstractLifecycleComponent {
indexRequest.operationThreaded(false);
indexRequest.refresh(true); //Always refresh after indexing an alert
indexRequest.opType(IndexRequest.OpType.CREATE);
return client.index(indexRequest).get().isCreated();
return client.index(indexRequest).actionGet().isCreated();
}
public boolean deleteAlert(String alertName) throws InterruptedException, ExecutionException{
@ -279,7 +341,7 @@ public class AlertManager extends AbstractLifecycleComponent {
return false;
}
public boolean addAlert(String alertName, Alert alert, boolean persist) throws InterruptedException, ExecutionException{
public boolean addAlert(String alertName, Alert alert, boolean persist) {
synchronized (alertMap) {
if (alertMap.containsKey(alertName)) {
throw new ElasticsearchIllegalArgumentException("There is already an alert named ["+alertName+"]");
@ -297,7 +359,7 @@ public class AlertManager extends AbstractLifecycleComponent {
indexRequest.refresh(true); //Always refresh after indexing an alert
indexRequest.source(builder);
indexRequest.opType(IndexRequest.OpType.CREATE);
return client.index(indexRequest).get().isCreated();
return client.index(indexRequest).actionGet().isCreated();
} catch (IOException ie) {
throw new ElasticsearchIllegalStateException("Unable to convert alert to JSON", ie);
}
@ -311,10 +373,17 @@ public class AlertManager extends AbstractLifecycleComponent {
private Alert parseAlert(String alertId, SearchHit sh) {
Map<String, Object> fields = sh.sourceAsMap();
return parseAlert(alertId,fields);
return parseAlert(alertId, fields, sh.getVersion());
}
public Alert parseAlert(String alertId, Map<String, Object> fields) {
return parseAlert(alertId, fields, -1);
}
public Alert parseAlert(String alertId, Map<String, Object> fields, long version ) {
//Map<String,SearchHitField> fields = sh.getFields();
logger.warn("Parsing : [{}]", alertId);
for (String field : fields.keySet() ) {
@ -351,6 +420,11 @@ public class AlertManager extends AbstractLifecycleComponent {
lastRan = new DateTime(fields.get("lastRan").toString());
}
DateTime running = new DateTime(0);
if (fields.get(CURRENTLY_RUNNING.getPreferredName()) != null) {
running = new DateTime(fields.get(CURRENTLY_RUNNING.getPreferredName()).toString());
}
List<String> indices = new ArrayList<>();
if (fields.get(INDICES.getPreferredName()) != null && fields.get(INDICES.getPreferredName()) instanceof List){
indices = (List<String>)fields.get(INDICES.getPreferredName());
@ -358,7 +432,7 @@ public class AlertManager extends AbstractLifecycleComponent {
logger.warn("Indices : " + fields.get(INDICES.getPreferredName()) + " class " + fields.get(INDICES.getPreferredName()).getClass() );
}
return new Alert(alertId, query, trigger, timePeriod, actions, schedule, lastRan, indices);
return new Alert(alertId, query, trigger, timePeriod, actions, schedule, lastRan, indices, running, version);
}
public Map<String,Alert> getSafeAlertMap() {

View File

@ -66,9 +66,12 @@ public class AlertScheduler extends AbstractLifecycleComponent {
public void executeAlert(String alertName, JobExecutionContext jobExecutionContext){
logger.warn("Running [{}]",alertName);
Alert alert = alertManager.getAlertForName(alertName);
//@TODO : claim alert
DateTime scheduledTime = new DateTime(jobExecutionContext.getScheduledFireTime());
try {
if (!alertManager.claimAlertRun(alertName, scheduledTime) ){
logger.warn("Another process has already run this alert.");
return;
}
XContentBuilder builder = createClampedQuery(jobExecutionContext, alert);
logger.warn("Running the following query : [{}]", builder.string());
@ -99,7 +102,7 @@ public class AlertScheduler extends AbstractLifecycleComponent {
logger.warn("Failed to store history for alert [{}]", alertName);
}
} catch (Exception e) {
logger.error("Failed execute alert [{}]",e, alert.queryName());
logger.error("Failed execute alert [{}]", e, alertName);
}
}

View File

@ -19,16 +19,14 @@ public class IndexAlertAction implements AlertAction {
private final String type;
private Client client = null;
public IndexAlertAction(String index, String type){
@Inject
public IndexAlertAction(String index, String type, Client client){
this.index = index;
this.type = type;
}
@Inject
public void setClient(Client client){
this.client = client;
}
@Override
public String getActionName() {
return "index";

View File

@ -6,16 +6,22 @@
package org.elasticsearch.alerting;
import org.elasticsearch.ElasticsearchIllegalArgumentException;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Client;
import java.util.Locale;
import java.util.Map;
/**
* Created by brian on 8/17/14.
*/
public class IndexAlertActionFactory implements AlertActionFactory {
Client client;
public IndexAlertActionFactory(Client client){
this.client = client;
}
@Override
public AlertAction createAction(Object parameters) {
@ -23,13 +29,21 @@ public class IndexAlertActionFactory implements AlertActionFactory {
if (parameters instanceof Map) {
Map<String, Object> paramMap = (Map<String, Object>) parameters;
String index = paramMap.get("index").toString();
if (!index.toLowerCase(Locale.ROOT).equals(index)) {
throw new ElasticsearchIllegalArgumentException("Index names must be all lowercase");
}
String type = paramMap.get("type").toString();
return new IndexAlertAction(index, type);
if (!type.toLowerCase(Locale.ROOT).equals(type)) {
throw new ElasticsearchIllegalArgumentException("Type names must be all lowercase");
}
return new IndexAlertAction(index, type, client);
} else {
throw new ElasticsearchIllegalArgumentException("Unable to parse [" + parameters + "] as an EmailAlertAction");
throw new ElasticsearchIllegalArgumentException("Unable to parse [" + parameters + "] as an IndexAlertAction");
}
} catch (Throwable t){
throw new ElasticsearchIllegalArgumentException("Unable to parse [" + parameters + "] as an EmailAlertAction");
throw new ElasticsearchIllegalArgumentException("Unable to parse [" + parameters + "] as an IndexAlertAction", t);
}
}
}