Alerting : REST endpoints

This commit adds REST endpoints for list/delete/create and fixes refresh.

Original commit: elastic/x-pack-elasticsearch@36e2c15753
This commit is contained in:
Brian Murphy 2014-08-15 11:45:45 +01:00
parent e648cc7f82
commit 4c47c8ba9a
9 changed files with 217 additions and 26 deletions

View File

@ -7,7 +7,11 @@ package org.elasticsearch.alerting;
import org.elasticsearch.common.joda.time.DateTime;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentType;
import java.io.IOException;
import java.util.List;
/**
@ -98,7 +102,21 @@ public class Alert {
this.indices = indices;
}
public String toJSON(){
return null;
public XContentBuilder toXContent(XContentBuilder builder) throws IOException {
builder.startObject();
builder.field(AlertManager.QUERY_FIELD.getPreferredName(), queryName);
builder.field(AlertManager.SCHEDULE_FIELD.getPreferredName(), schedule);
builder.field(AlertManager.TIMEPERIOD_FIELD.getPreferredName(), timePeriod);
builder.field(AlertManager.LASTRAN_FIELD.getPreferredName(), lastRan);
builder.field(AlertManager.TRIGGER_FIELD.getPreferredName());
trigger.toXContent(builder);
builder.field(AlertManager.ACTION_FIELD.getPreferredName());
builder.startObject();
for (AlertAction action : actions){
builder.field(action.getActionName());
action.toXContent(builder);
}
builder.endObject();
return builder;
}
}

View File

@ -5,6 +5,14 @@
*/
package org.elasticsearch.alerting;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentType;
import java.io.IOException;
public interface AlertAction {
public String getActionName();
public XContentBuilder toXContent(XContentBuilder builder) throws IOException;
public boolean doAction(String alertName, AlertResult alert);
}

View File

@ -5,11 +5,9 @@
*/
package org.elasticsearch.alerting;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.search.SearchHitField;
import java.util.ArrayList;
import java.util.HashMap;
@ -26,17 +24,21 @@ public class AlertActionManager extends AbstractComponent {
super(settings);
this.alertManager = alertManager;
this.actionImplemented = new HashMap<>();
actionImplemented.put("email", new EmailAlertActionFactory());
registerAction("email", new EmailAlertActionFactory());
}
public void registerAction(String name, AlertActionFactory actionFactory){
actionImplemented.put(name, actionFactory);
synchronized (actionImplemented) {
actionImplemented.put(name, actionFactory);
}
}
public List<AlertAction> parseActionsFromMap(Map<String,Object> actionMap) {
List<AlertAction> actions = new ArrayList<>();
for (Map.Entry<String, Object> actionEntry : actionMap.entrySet() ) {
actions.add(actionImplemented.get(actionEntry.getKey()).createAction(actionEntry.getValue()));
synchronized (actionImplemented) {
for (Map.Entry<String, Object> actionEntry : actionMap.entrySet()) {
actions.add(actionImplemented.get(actionEntry.getKey()).createAction(actionEntry.getValue()));
}
}
return actions;
}
@ -47,4 +49,5 @@ public class AlertActionManager extends AbstractComponent {
action.doAction(alertName, alertResult);
}
}
}

View File

@ -7,10 +7,14 @@ package org.elasticsearch.alerting;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchIllegalArgumentException;
import org.elasticsearch.ElasticsearchIllegalStateException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.Requests;
@ -21,8 +25,12 @@ import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.joda.time.DateTime;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHitField;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -36,13 +44,13 @@ public class AlertManager extends AbstractLifecycleComponent {
public final String ALERT_TYPE = "alert";
public final String QUERY_TYPE = "alertQuery";
public final ParseField QUERY_FIELD = new ParseField("query");
public final ParseField SCHEDULE_FIELD = new ParseField("schedule");
public final ParseField TRIGGER_FIELD = new ParseField("trigger");
public final ParseField TIMEPERIOD_FIELD = new ParseField("timeperiod");
public final ParseField ACTION_FIELD = new ParseField("action");
public final ParseField LASTRAN_FIELD = new ParseField("lastRan");
public final ParseField INDICES = new ParseField("indices");
public static final ParseField QUERY_FIELD = new ParseField("query");
public static final ParseField SCHEDULE_FIELD = new ParseField("schedule");
public static final ParseField TRIGGER_FIELD = new ParseField("trigger");
public static final ParseField TIMEPERIOD_FIELD = new ParseField("timeperiod");
public static final ParseField ACTION_FIELD = new ParseField("action");
public static final ParseField LASTRAN_FIELD = new ParseField("lastRan");
public static final ParseField INDICES = new ParseField("indices");
private final Client client;
private AlertScheduler scheduler;
@ -72,13 +80,11 @@ public class AlertManager extends AbstractLifecycleComponent {
}
logger.warn("Loading alerts");
try {
loadAlerts();
refreshAlerts();
started.set(true);
} catch (Throwable t) {
logger.error("Failed to load alerts", t);
}
//Build the mapping for the scheduler
sendAlertsToScheduler();
}
}
@ -149,7 +155,6 @@ public class AlertManager extends AbstractLifecycleComponent {
alertMap.clear();
loadAlerts();
sendAlertsToScheduler();
}
} catch (Exception e){
throw new ElasticsearchException("Failed to refresh alerts",e);
@ -170,18 +175,82 @@ public class AlertManager extends AbstractLifecycleComponent {
).setTypes(ALERT_TYPE).setIndices(ALERT_INDEX).execute().get();
for (SearchHit sh : searchResponse.getHits()) {
String alertId = sh.getId();
Alert alert = parseAlert(sh, alertId);
Alert alert = parseAlert(alertId, sh);
alertMap.put(alertId, alert);
}
logger.warn("Loaded [{}] alerts from the alert index.", alertMap.size());
}
}
private Alert parseAlert(SearchHit sh, String alertId) {
logger.warn("Found : [{}]", alertId);
Map<String,Object> fields = sh.sourceAsMap();
//Map<String,SearchHitField> fields = sh.getFields();
public boolean deleteAlert(String alertName) throws InterruptedException, ExecutionException{
synchronized (alertMap) {
if (alertMap.containsKey(alertName)) {
scheduler.deleteAlertFromSchedule(alertName);
alertMap.remove(alertName);
try {
DeleteRequest deleteRequest = new DeleteRequest();
deleteRequest.id(alertName);
deleteRequest.index(ALERT_INDEX);
deleteRequest.type(ALERT_TYPE);
deleteRequest.operationThreaded(false);
deleteRequest.refresh(true);
if (client.delete(deleteRequest).actionGet().isFound()) {
return true;
} else {
logger.warn("Couldn't find [{}] in the index triggering a full refresh", alertName);
//Something went wrong refresh
refreshAlerts();
return false;
}
}
catch (Exception e){
logger.warn("Something went wrong when deleting [{}] from the index triggering a full refresh", e, alertName);
//Something went wrong refresh
refreshAlerts();
throw e;
}
}
}
return false;
}
public boolean addAlert(String alertName, Alert alert, boolean persist) throws InterruptedException, ExecutionException{
synchronized (alertMap) {
if (alertMap.containsKey(alertName)) {
throw new ElasticsearchIllegalArgumentException("There is already an alert named ["+alertName+"]");
} else {
alertMap.put(alertName, alert);
scheduler.addAlert(alertName,alert);
if (persist) {
XContentBuilder builder;
try {
builder = XContentFactory.jsonBuilder();
IndexRequest indexRequest = new IndexRequest(ALERT_INDEX, ALERT_TYPE, alertName);
indexRequest.listenerThreaded(false);
indexRequest.operationThreaded(false);
indexRequest.refresh(true); //Always refresh after indexing an alert
indexRequest.source(alert.toXContent(builder).bytes(), true);
indexRequest.opType(IndexRequest.OpType.CREATE);
return client.index(indexRequest).get().isCreated();
} catch (IOException ie) {
throw new ElasticsearchIllegalStateException("Unable to convert alert to JSON", ie);
}
} else {
return true;
}
}
}
}
private Alert parseAlert(String alertId, SearchHit sh) {
Map<String, Object> fields = sh.sourceAsMap();
return parseAlert(alertId,fields);
}
public Alert parseAlert(String alertId, Map<String, Object> fields) {
//Map<String,SearchHitField> fields = sh.getFields();
logger.warn("Parsing : [{}]", alertId);
for (String field : fields.keySet() ) {
logger.warn("Field : [{}]", field);
}
@ -219,6 +288,12 @@ public class AlertManager extends AbstractLifecycleComponent {
return new Alert(alertId, query, trigger, timePeriod, actions, schedule, lastRan, indices);
}
public Map<String,Alert> getSafeAlertMap() {
synchronized (alertMap) {
return new HashMap<>(alertMap);
}
}
public Alert getAlertForName(String alertName) {
synchronized (alertMap) {
return alertMap.get(alertName);

View File

@ -9,7 +9,14 @@ import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.rest.*;
import org.omg.CORBA.NO_IMPLEMENT;
import java.util.Map;
import static org.elasticsearch.rest.RestStatus.*;
import static org.elasticsearch.rest.RestRequest.Method.*;
@ -30,9 +37,40 @@ public class AlertRestHandler implements RestHandler {
@Override
public void handleRequest(RestRequest request, RestChannel restChannel) throws Exception {
logger.warn("GOT REST REQUEST");
if (request.method() == POST && request.path().contains("/_refresh") ) {
//@TODO : change these direct calls to actions/request/response/listener once we create the java client API
if (request.method() == POST && request.path().contains("/_refresh")) {
alertManager.refreshAlerts();
restChannel.sendResponse(new BytesRestResponse(OK));
return;
} else if (request.method() == GET && request.path().contains("/_list")) {
Map<String, Alert> alertMap = alertManager.getSafeAlertMap();
XContentBuilder builder = XContentFactory.jsonBuilder().prettyPrint();
builder.startObject();
for (Map.Entry<String, Alert> alertEntry : alertMap.entrySet()) {
builder.field(alertEntry.getKey());
alertEntry.getValue().toXContent(builder);
}
builder.endObject();
restChannel.sendResponse(new BytesRestResponse(OK,builder));
return;
} else if (request.method() == POST && request.path().contains("/_create")) {
//TODO : this should all be moved to an action
Alert alert = alertManager.parseAlert(request.param("name"), XContentHelper.convertToMap(request.content(), request.contentUnsafe()).v2());
boolean added = alertManager.addAlert(alert.alertName(), alert, true);
XContentBuilder builder = XContentFactory.jsonBuilder().prettyPrint();
alert.toXContent(builder);
restChannel.sendResponse(new BytesRestResponse(OK,builder));
return;
} else if (request.method() == DELETE) {
String alertName = request.param("name");
alertManager.deleteAlert(alertName);
restChannel.sendResponse(new BytesRestResponse(OK));
return;
}
restChannel.sendResponse(new BytesRestResponse(OK));
restChannel.sendResponse(new BytesRestResponse(NOT_IMPLEMENTED));
}
}

View File

@ -47,6 +47,15 @@ public class AlertScheduler extends AbstractLifecycleComponent {
}
}
public boolean deleteAlertFromSchedule(String alertName) {
try {
scheduler.deleteJob(new JobKey(alertName));
return true;
} catch (SchedulerException se){
throw new ElasticsearchException("Failed to remove [" + alertName + "] from the scheduler", se);
}
}
public void clearAlerts() {
try {
scheduler.clear();

View File

@ -6,6 +6,11 @@
package org.elasticsearch.alerting;
import org.elasticsearch.ElasticsearchIllegalArgumentException;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentType;
import java.io.IOException;
/**
* Created by brian on 8/12/14.
@ -102,6 +107,13 @@ public class AlertTrigger {
}
}
public XContentBuilder toXContent(XContentBuilder builder) throws IOException {
builder.startObject();
builder.field(triggerType.toString(),trigger.toString()+value);
builder.endObject();
return builder;
}
public static enum TriggerType {
NUMBER_OF_EVENTS;

View File

@ -6,10 +6,14 @@
package org.elasticsearch.alerting;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHitField;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
@ -47,6 +51,27 @@ public class EmailAlertAction implements AlertAction {
this.displayField = displayField;
}
@Override
public String getActionName() {
return "email";
}
@Override
public XContentBuilder toXContent(XContentBuilder builder) throws IOException {
builder.startObject();
builder.field("addresses");
builder.startArray();
for (Address emailAddress : emailAddresses){
builder.value(emailAddress.toString());
}
builder.endArray();
if (displayField != null) {
builder.field("display", displayField);
}
builder.endObject();
return builder;
}
@Override
public boolean doAction(String alertName, AlertResult result) {
Properties props = new Properties();

View File

@ -6,6 +6,7 @@
package org.elasticsearch.alerting;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchIllegalArgumentException;
import java.util.List;
import java.util.Map;
@ -32,6 +33,8 @@ public class EmailAlertActionFactory implements AlertActionFactory{
if (displayField != null){
action.displayField(displayField.toString());
}
} else {
throw new ElasticsearchIllegalArgumentException("Unable to parse [" + parameters + "] as an EmailAlertAction");
}
return action;
}