Alerts update trigger manager.

This change changes the trigger manager to be pluggable.
Also removes the SimpleTrigger class, for now all triggers should be scripts.

Original commit: elastic/x-pack-elasticsearch@f7d0cb77e7
This commit is contained in:
Brian Murphy 2014-11-07 17:40:37 +00:00
parent a6bdb5f572
commit 5d8f43225a
17 changed files with 74 additions and 664 deletions

View File

@ -34,6 +34,7 @@ public class Alert implements ToXContent {
private long version; private long version;
private boolean enabled; private boolean enabled;
public Alert() { public Alert() {
} }
@ -69,53 +70,15 @@ public class Alert implements ToXContent {
} }
if (trigger != null) { if (trigger != null) {
builder.field(AlertsStore.TRIGGER_FIELD.getPreferredName()); builder.field(AlertsStore.TRIGGER_FIELD.getPreferredName());
builder.startObject();
builder.field(trigger.getTriggerName());
trigger.toXContent(builder, params); trigger.toXContent(builder, params);
builder.endObject();
} }
builder.endObject(); builder.endObject();
return builder; return builder;
} }
public void readFrom(StreamInput in) throws IOException {
alertName = in.readString();
searchRequest = new SearchRequest();
searchRequest.readFrom(in);
trigger = AlertTrigger.readFrom(in);
int numActions = in.readInt();
actions = new ArrayList<>(numActions);
for (int i=0; i<numActions; ++i) {
actions.add(AlertActionRegistry.readFrom(in));
}
schedule = in.readOptionalString();
if (in.readBoolean()) {
lastActionFire = new DateTime(in.readLong(), DateTimeZone.UTC);
}
version = in.readLong();
enabled = in.readBoolean();
}
public void writeTo(StreamOutput out) throws IOException {
out.writeString(alertName);
searchRequest.writeTo(out);
AlertTrigger.writeTo(trigger, out);
if (actions == null) {
out.writeInt(0);
} else {
out.writeInt(actions.size());
for (AlertAction action : actions) {
action.writeTo(out);
}
}
out.writeOptionalString(schedule);
if (lastActionFire == null) {
out.writeBoolean(false);
} else {
out.writeLong(lastActionFire.toDateTime(DateTimeZone.UTC).getMillis());
}
out.writeLong(version);
out.writeBoolean(enabled);
}
/** /**
* @return The last time this alert ran. * @return The last time this alert ran.
*/ */
@ -203,4 +166,38 @@ public class Alert implements ToXContent {
public void schedule(String schedule) { public void schedule(String schedule) {
this.schedule = schedule; this.schedule = schedule;
} }
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Alert alert = (Alert) o;
if (enabled != alert.enabled) return false;
if (version != alert.version) return false;
if (!actions.equals(alert.actions)) return false;
if (!alertName.equals(alert.alertName)) return false;
if (lastActionFire != null ? !lastActionFire.equals(alert.lastActionFire) : alert.lastActionFire != null)
return false;
if (!schedule.equals(alert.schedule)) return false;
if (!searchRequest.equals(alert.searchRequest)) return false;
if (!trigger.equals(alert.trigger)) return false;
return true;
}
@Override
public int hashCode() {
int result = alertName.hashCode();
result = 31 * result + searchRequest.hashCode();
result = 31 * result + trigger.hashCode();
result = 31 * result + actions.hashCode();
result = 31 * result + schedule.hashCode();
result = 31 * result + (lastActionFire != null ? lastActionFire.hashCode() : 0);
result = 31 * result + (int) (version ^ (version >>> 32));
result = 31 * result + (enabled ? 1 : 0);
return result;
}
} }

View File

@ -62,13 +62,15 @@ public class AlertsStore extends AbstractComponent {
private final ThreadPool threadPool; private final ThreadPool threadPool;
private final ConcurrentMap<String,Alert> alertMap; private final ConcurrentMap<String,Alert> alertMap;
private final AlertActionRegistry alertActionRegistry; private final AlertActionRegistry alertActionRegistry;
private final TriggerManager triggerManager;
private final AtomicReference<State> state = new AtomicReference<>(State.STOPPED); private final AtomicReference<State> state = new AtomicReference<>(State.STOPPED);
private final int scrollSize; private final int scrollSize;
private final TimeValue scrollTimeout; private final TimeValue scrollTimeout;
@Inject @Inject
public AlertsStore(Settings settings, Client client, ThreadPool threadPool, AlertActionRegistry alertActionRegistry) { public AlertsStore(Settings settings, Client client, ThreadPool threadPool, AlertActionRegistry alertActionRegistry,
TriggerManager triggerManager) {
super(settings); super(settings);
this.client = client; this.client = client;
this.threadPool = threadPool; this.threadPool = threadPool;
@ -77,6 +79,7 @@ public class AlertsStore extends AbstractComponent {
// Not using component settings, to let AlertsStore and AlertActionManager share the same settings // Not using component settings, to let AlertsStore and AlertActionManager share the same settings
this.scrollSize = settings.getAsInt("alerts.scroll.size", 100); this.scrollSize = settings.getAsInt("alerts.scroll.size", 100);
this.scrollTimeout = settings.getAsTime("alerts.scroll.timeout", TimeValue.timeValueSeconds(30)); this.scrollTimeout = settings.getAsTime("alerts.scroll.timeout", TimeValue.timeValueSeconds(30));
this.triggerManager = triggerManager;
} }
/** /**
@ -247,7 +250,7 @@ public class AlertsStore extends AbstractComponent {
return alert; return alert;
} }
private Alert parseAlert(String alertName, BytesReference source) { protected Alert parseAlert(String alertName, BytesReference source) {
Alert alert = new Alert(); Alert alert = new Alert();
alert.alertName(alertName); alert.alertName(alertName);
try (XContentParser parser = XContentHelper.createParser(source)) { try (XContentParser parser = XContentHelper.createParser(source)) {
@ -259,7 +262,7 @@ public class AlertsStore extends AbstractComponent {
currentFieldName = parser.currentName(); currentFieldName = parser.currentName();
} else if (token == XContentParser.Token.START_OBJECT) { } else if (token == XContentParser.Token.START_OBJECT) {
if (TRIGGER_FIELD.match(currentFieldName)) { if (TRIGGER_FIELD.match(currentFieldName)) {
alert.trigger(TriggerManager.parseTrigger(parser)); alert.trigger(triggerManager.instantiateAlertTrigger(parser));
} else if (ACTION_FIELD.match(currentFieldName)) { } else if (ACTION_FIELD.match(currentFieldName)) {
List<AlertAction> actions = alertActionRegistry.instantiateAlertActions(parser); List<AlertAction> actions = alertActionRegistry.instantiateAlertActions(parser);
alert.actions(actions); alert.actions(actions);

View File

@ -20,10 +20,5 @@ public interface AlertAction extends ToXContent {
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException; public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException;
public void writeTo(StreamOutput out) throws IOException;
public void readFrom(StreamInput in) throws IOException;
public boolean doAction(Alert alert, TriggerResult result); public boolean doAction(Alert alert, TriggerResult result);
} }

View File

@ -15,7 +15,4 @@ public interface AlertActionFactory {
AlertAction createAction(XContentParser parser) throws IOException; AlertAction createAction(XContentParser parser) throws IOException;
AlertAction readFrom(StreamInput in) throws IOException;
} }

View File

@ -62,6 +62,7 @@ public class AlertActionManager extends AbstractComponent {
private final ThreadPool threadPool; private final ThreadPool threadPool;
private final AlertsStore alertsStore; private final AlertsStore alertsStore;
private final AlertActionRegistry actionRegistry; private final AlertActionRegistry actionRegistry;
private final TriggerManager triggerManager;
private AlertManager alertManager; private AlertManager alertManager;
private final BlockingQueue<AlertActionEntry> actionsToBeProcessed = new LinkedBlockingQueue<>(); private final BlockingQueue<AlertActionEntry> actionsToBeProcessed = new LinkedBlockingQueue<>();
@ -73,12 +74,14 @@ public class AlertActionManager extends AbstractComponent {
private static AlertActionEntry END_ENTRY = new AlertActionEntry(); private static AlertActionEntry END_ENTRY = new AlertActionEntry();
@Inject @Inject
public AlertActionManager(Settings settings, Client client, AlertActionRegistry actionRegistry, ThreadPool threadPool, AlertsStore alertsStore) { public AlertActionManager(Settings settings, Client client, AlertActionRegistry actionRegistry,
ThreadPool threadPool, AlertsStore alertsStore, TriggerManager triggerManager) {
super(settings); super(settings);
this.client = client; this.client = client;
this.actionRegistry = actionRegistry; this.actionRegistry = actionRegistry;
this.threadPool = threadPool; this.threadPool = threadPool;
this.alertsStore = alertsStore; this.alertsStore = alertsStore;
this.triggerManager = triggerManager;
// Not using component settings, to let AlertsStore and AlertActionManager share the same settings // Not using component settings, to let AlertsStore and AlertActionManager share the same settings
this.scrollSize = settings.getAsInt("alerts.scroll.size", 100); this.scrollSize = settings.getAsInt("alerts.scroll.size", 100);
this.scrollTimeout = settings.getAsTime("alerts.scroll.timeout", TimeValue.timeValueSeconds(30)); this.scrollTimeout = settings.getAsTime("alerts.scroll.timeout", TimeValue.timeValueSeconds(30));
@ -167,7 +170,7 @@ public class AlertActionManager extends AbstractComponent {
logger.info("Loaded [{}] actions from the alert history index into actions queue", actionsToBeProcessed.size()); logger.info("Loaded [{}] actions from the alert history index into actions queue", actionsToBeProcessed.size());
} }
static AlertActionEntry parseHistory(String historyId, BytesReference source, long version, AlertActionRegistry actionRegistry) { AlertActionEntry parseHistory(String historyId, BytesReference source, long version, AlertActionRegistry actionRegistry) {
AlertActionEntry entry = new AlertActionEntry(); AlertActionEntry entry = new AlertActionEntry();
entry.setId(historyId); entry.setId(historyId);
entry.setVersion(version); entry.setVersion(version);
@ -179,12 +182,13 @@ public class AlertActionManager extends AbstractComponent {
if (token == XContentParser.Token.FIELD_NAME) { if (token == XContentParser.Token.FIELD_NAME) {
currentFieldName = parser.currentName(); currentFieldName = parser.currentName();
} else if (token == XContentParser.Token.START_OBJECT) { } else if (token == XContentParser.Token.START_OBJECT) {
logger.error("START_OBJECT");
switch (currentFieldName) { switch (currentFieldName) {
case ACTIONS_FIELD: case ACTIONS_FIELD:
entry.setActions(actionRegistry.instantiateAlertActions(parser)); entry.setActions(actionRegistry.instantiateAlertActions(parser));
break; break;
case TRIGGER_FIELD: case TRIGGER_FIELD:
entry.setTrigger(TriggerManager.parseTrigger(parser)); entry.setTrigger(triggerManager.instantiateAlertTrigger(parser));
break; break;
case "response": case "response":
// Ignore this, the binary form is already read // Ignore this, the binary form is already read
@ -194,6 +198,7 @@ public class AlertActionManager extends AbstractComponent {
throw new ElasticsearchIllegalArgumentException("Unexpected field [" + currentFieldName + "]"); throw new ElasticsearchIllegalArgumentException("Unexpected field [" + currentFieldName + "]");
} }
} else if (token.isValue()) { } else if (token.isValue()) {
logger.error("IS_VALUE");
switch (currentFieldName) { switch (currentFieldName) {
case ALERT_NAME_FIELD: case ALERT_NAME_FIELD:
entry.setAlertName(parser.text()); entry.setAlertName(parser.text());

View File

@ -67,19 +67,4 @@ public class AlertActionRegistry extends AbstractComponent {
} }
} }
public static void writeTo(AlertAction action, StreamOutput out) throws IOException {
out.writeString(action.getActionName());
action.writeTo(out);
}
public static AlertAction readFrom(StreamInput in) throws IOException {
String actionName = in.readString();
AlertActionFactory factory = actionImplemented.get(actionName);
if (factory != null) {
return factory.readFrom(in);
} else {
throw new ElasticsearchIllegalArgumentException("No action exists with the name [" + actionName + "]");
}
}
} }

View File

@ -71,30 +71,6 @@ public class EmailAlertAction implements AlertAction {
return builder; return builder;
} }
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeOptionalString(displayField);
out.writeInt(emailAddresses.size());
for (Address emailAddress : emailAddresses) {
out.writeString(emailAddress.toString());
}
}
@Override
public void readFrom(StreamInput in) throws IOException {
displayField = in.readOptionalString();
int numberOfEmails = in.readInt();
emailAddresses = new ArrayList<>(numberOfEmails);
for (int i=0; i<numberOfEmails; ++i) {
String address = in.readString();
try {
emailAddresses.add(InternetAddress.parse(address)[0]);
} catch (AddressException ae) {
throw new IOException("Unable to parse [" + address + "] as an email adderss", ae);
}
}
}
@Override @Override
public boolean doAction(Alert alert, TriggerResult result) { public boolean doAction(Alert alert, TriggerResult result) {
Properties props = new Properties(); Properties props = new Properties();

View File

@ -50,19 +50,4 @@ public class EmailAlertActionFactory implements AlertActionFactory {
return new EmailAlertAction(display, addresses.toArray(new String[addresses.size()])); return new EmailAlertAction(display, addresses.toArray(new String[addresses.size()]));
} }
@Override
public AlertAction readFrom(StreamInput in) throws IOException{
String displayField = in.readOptionalString();
int numberOfEmails = in.readInt();
String[] emailAddresses = new String[numberOfEmails];
for (int i=0; i<numberOfEmails; ++i) {
String address = in.readString();
emailAddresses[i] = address;
}
EmailAlertAction emailAction = new EmailAlertAction(displayField, emailAddresses);
return emailAction;
}
} }

View File

@ -47,17 +47,6 @@ public class IndexAlertAction implements AlertAction, ToXContent {
return builder; return builder;
} }
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(index);
out.writeString(type);
}
@Override
public void readFrom(StreamInput in) throws IOException {
index = in.readString();
type = in.readString();
}
@Override @Override
public boolean doAction(Alert alert, TriggerResult result) { public boolean doAction(Alert alert, TriggerResult result) {
@ -68,7 +57,7 @@ public class IndexAlertAction implements AlertAction, ToXContent {
XContentBuilder resultBuilder = XContentFactory.jsonBuilder().prettyPrint(); XContentBuilder resultBuilder = XContentFactory.jsonBuilder().prettyPrint();
resultBuilder.startObject(); resultBuilder.startObject();
resultBuilder = result.getResponse().toXContent(resultBuilder, ToXContent.EMPTY_PARAMS); resultBuilder = result.getResponse().toXContent(resultBuilder, ToXContent.EMPTY_PARAMS);
//resultBuilder.field("timestamp", result.getFireTime()); ///@TODO FIXME //resultBuilder.field("timestamp", result.); ///@TODO FIXME
resultBuilder.endObject(); resultBuilder.endObject();
indexRequest.source(resultBuilder); indexRequest.source(resultBuilder);
} catch (IOException ie) { } catch (IOException ie) {

View File

@ -49,8 +49,4 @@ public class IndexAlertActionFactory implements AlertActionFactory {
return new IndexAlertAction(index, type, client); return new IndexAlertAction(index, type, client);
} }
@Override
public AlertAction readFrom(StreamInput in) throws IOException {
return new IndexAlertAction(in.readString(), in.readString(), client);
}
} }

View File

@ -1,227 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.alerts.triggers;
import org.elasticsearch.ElasticsearchIllegalArgumentException;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.script.ScriptService;
import java.io.IOException;
public class AlertTrigger implements ToXContent {
private SimpleTrigger trigger;
private TriggerType triggerType;
private int value;
private ScriptedAlertTrigger scriptedTrigger;
public ScriptedAlertTrigger scriptedTrigger() {
return scriptedTrigger;
}
public void scriptedTrigger(ScriptedAlertTrigger scriptedTrigger) {
this.scriptedTrigger = scriptedTrigger;
}
public SimpleTrigger trigger() {
return trigger;
}
public void trigger(SimpleTrigger trigger) {
this.trigger = trigger;
}
public TriggerType triggerType() {
return triggerType;
}
public void triggerType(TriggerType triggerType) {
this.triggerType = triggerType;
}
public int value() {
return value;
}
public void value(int value) {
this.value = value;
}
public AlertTrigger(SimpleTrigger trigger, TriggerType triggerType, int value){
this.trigger = trigger;
this.triggerType = triggerType;
this.value = value;
}
public AlertTrigger(ScriptedAlertTrigger scriptedTrigger){
this.scriptedTrigger = scriptedTrigger;
this.triggerType = TriggerType.SCRIPT;
}
public String toString(){
if(triggerType != TriggerType.SCRIPT) {
return triggerType + " " + trigger + " " + value;
} else {
return scriptedTrigger.toString();
}
}
public static enum SimpleTrigger {
EQUAL,
NOT_EQUAL,
GREATER_THAN,
LESS_THAN,
RISES_BY,
FALLS_BY;
public static SimpleTrigger fromString(final String sTrigger) {
switch (sTrigger) {
case ">":
return GREATER_THAN;
case "<":
return LESS_THAN;
case "=":
case "==":
return EQUAL;
case "!=":
return NOT_EQUAL;
case "->":
return RISES_BY;
case "<-":
return FALLS_BY;
default:
throw new ElasticsearchIllegalArgumentException("Unknown AlertAction:SimpleAction [" + sTrigger + "]");
}
}
public static String asString(final SimpleTrigger trigger){
switch (trigger) {
case GREATER_THAN:
return ">";
case LESS_THAN:
return "<";
case EQUAL:
return "==";
case NOT_EQUAL:
return "!=";
case RISES_BY:
return "->";
case FALLS_BY:
return "<-";
default:
return "?";
}
}
public String toString(){
return asString(this);
}
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
if (triggerType != TriggerType.SCRIPT) {
builder.startObject();
builder.field(triggerType.toString(), trigger.toString() + value);
builder.endObject();
} else {
builder.startObject();
builder.field(triggerType.toString());
scriptedTrigger.toXContent(builder, params);
builder.endObject();
}
return builder;
}
public static enum TriggerType {
NUMBER_OF_EVENTS,
SCRIPT;
public static TriggerType fromString(final String sTriggerType) {
switch (sTriggerType) {
case "numberOfEvents":
return NUMBER_OF_EVENTS;
case "script":
return SCRIPT;
default:
throw new ElasticsearchIllegalArgumentException("Unknown AlertTrigger:TriggerType [" + sTriggerType + "]");
}
}
public static String asString(final TriggerType triggerType) {
switch (triggerType) {
case NUMBER_OF_EVENTS:
return "numberOfEvents";
case SCRIPT:
return "script";
default:
return "unknown";
}
}
public String toString(){
return asString(this);
}
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
AlertTrigger that = (AlertTrigger) o;
if (value != that.value) return false;
if (scriptedTrigger != null ? !scriptedTrigger.equals(that.scriptedTrigger) : that.scriptedTrigger != null)
return false;
if (trigger != that.trigger) return false;
if (triggerType != that.triggerType) return false;
return true;
}
@Override
public int hashCode() {
int result = trigger != null ? trigger.hashCode() : 0;
result = 31 * result + (triggerType != null ? triggerType.hashCode() : 0);
result = 31 * result + value;
result = 31 * result + (scriptedTrigger != null ? scriptedTrigger.hashCode() : 0);
return result;
}
public static void writeTo(AlertTrigger trigger, StreamOutput out) throws IOException {
out.writeString(trigger.triggerType.toString());
if (trigger.triggerType.equals(TriggerType.NUMBER_OF_EVENTS)) {
out.writeString(trigger.toString());
out.writeInt(trigger.value);
} else {
out.writeString(trigger.scriptedTrigger.scriptLang);
ScriptService.ScriptType.writeTo(trigger.scriptedTrigger.scriptType, out);
out.writeString(trigger.scriptedTrigger.script);
}
}
public static AlertTrigger readFrom(StreamInput in) throws IOException {
TriggerType triggerType = TriggerType.fromString(in.readString());
if (triggerType.equals(TriggerType.NUMBER_OF_EVENTS)) {
SimpleTrigger trigger = SimpleTrigger.fromString(in.readString());
int value = in.readInt();
return new AlertTrigger(trigger, triggerType, value);
} else {
String scriptLang = in.readString();
ScriptService.ScriptType scriptType = ScriptService.ScriptType.readFrom(in);
String script = in.readString();
ScriptedAlertTrigger scriptedTrigger = new ScriptedAlertTrigger(script, scriptType, scriptLang);
return new AlertTrigger(scriptedTrigger);
}
}
}

View File

@ -1,35 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.alerts.triggers;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.script.ScriptService;
import java.io.IOException;
public class ScriptedAlertTrigger implements ToXContent {
public String script;
public ScriptService.ScriptType scriptType;
public String scriptLang;
public ScriptedAlertTrigger(String script, ScriptService.ScriptType scriptType, String scriptLang) {
this.script = script;
this.scriptType = scriptType;
this.scriptLang = scriptLang;
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field("script",script);
builder.field("script_type", scriptType);
builder.field("script_lang", scriptLang);
builder.endObject();
return builder;
}
}

View File

@ -1,205 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.alerts.triggers;
import org.elasticsearch.ElasticsearchIllegalArgumentException;
import org.elasticsearch.ElasticsearchIllegalStateException;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.alerts.Alert;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.joda.FormatDateTimeFormatter;
import org.elasticsearch.common.joda.time.DateTime;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.*;
import org.elasticsearch.index.mapper.core.DateFieldMapper;
import org.elasticsearch.script.ExecutableScript;
import org.elasticsearch.script.ScriptService;
import java.io.IOException;
import java.util.Map;
/*
* TODO : The trigger classes need cleanup and refactoring to be similar to the AlertActions and be pluggable
*/
public class TriggerManager extends AbstractComponent {
private static final FormatDateTimeFormatter dateTimeFormatter = DateFieldMapper.Defaults.DATE_TIME_FORMATTER;
private final Client client;
private final ScriptService scriptService;
private final String fireTimePlaceHolder;
private final String scheduledFireTimePlaceHolder;
@Inject
public TriggerManager(Settings settings, Client client, ScriptService scriptService) {
super(settings);
this.client = client;
this.scriptService = scriptService;
this.fireTimePlaceHolder = settings.get("prefix", "<<<FIRE_TIME>>>");
this.scheduledFireTimePlaceHolder = settings.get("postfix", "<<<SCHEDULED_FIRE_TIME>>>");
}
public TriggerResult isTriggered(Alert alert, DateTime scheduledFireTime, DateTime fireTime) throws IOException {
SearchRequest request = prepareTriggerSearch(alert, scheduledFireTime, fireTime);
if (logger.isTraceEnabled()) {
logger.trace("For alert [{}] running query for [{}]", alert.alertName(), XContentHelper.convertToJson(request.source(), false, true));
}
SearchResponse response = client.search(request).actionGet(); // actionGet deals properly with InterruptedException
logger.debug("Ran alert [{}] and got hits : [{}]", alert.alertName(), response.getHits().getTotalHits());
switch (alert.trigger().triggerType()) {
case NUMBER_OF_EVENTS:
return doSimpleTrigger(alert, request, response);
case SCRIPT:
return doScriptTrigger(alert, request, response);
default:
throw new ElasticsearchIllegalArgumentException("Bad value for trigger.triggerType [" + alert.trigger().triggerType() + "]");
}
}
public static AlertTrigger parseTrigger(XContentParser parser) throws IOException {
AlertTrigger trigger = null;
String currentFieldName = null;
XContentParser.Token token;
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
currentFieldName = parser.currentName();
} else if (token == XContentParser.Token.START_OBJECT) {
switch (currentFieldName) {
case "script":
String script = null;
ScriptService.ScriptType scriptType = null;
String scriptLang = null;
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
currentFieldName = parser.currentName();
} else if (token.isValue()) {
switch (currentFieldName) {
case "script" :
script = parser.text();
break;
case "script_type" :
scriptType = ScriptService.ScriptType.valueOf(parser.text());
break;
case "script_lang" :
scriptLang = parser.text();
break;
default:
throw new ElasticsearchIllegalArgumentException("Unexpected field [" + currentFieldName + "]");
}
}
}
trigger = new AlertTrigger(new ScriptedAlertTrigger(script, scriptType, scriptLang));
break;
default:
break;
}
} else if (token.isValue()) {
String expression = parser.text();
AlertTrigger.SimpleTrigger simpleTrigger = AlertTrigger.SimpleTrigger.fromString(expression.substring(0, 1));
int value = Integer.valueOf(expression.substring(1));
trigger = new AlertTrigger(simpleTrigger, AlertTrigger.TriggerType.NUMBER_OF_EVENTS, value);
}
}
return trigger;
}
private TriggerResult doSimpleTrigger(Alert alert, SearchRequest request, SearchResponse response) {
boolean triggered = false;
long testValue = response.getHits().getTotalHits();
int triggerValue = alert.trigger().value();
//Move this to SimpleTrigger
switch (alert.trigger().trigger()) {
case GREATER_THAN:
triggered = testValue > triggerValue;
break;
case LESS_THAN:
triggered = testValue < triggerValue;
break;
case EQUAL:
triggered = testValue == triggerValue;
break;
case NOT_EQUAL:
triggered = testValue != triggerValue;
break;
case RISES_BY:
case FALLS_BY:
triggered = false; //TODO FIX THESE
break;
}
return new TriggerResult(triggered, request, response, alert.trigger());
}
private TriggerResult doScriptTrigger(Alert alert, SearchRequest request, SearchResponse response) {
boolean triggered = false;
try {
XContentBuilder builder = XContentFactory.jsonBuilder();
builder.startObject();
builder = response.toXContent(builder, ToXContent.EMPTY_PARAMS);
builder.endObject();
Map<String, Object> responseMap = XContentHelper.convertToMap(builder.bytes(), false).v2();
ScriptedAlertTrigger scriptTrigger = alert.trigger().scriptedTrigger();
ExecutableScript executable = scriptService.executable(
scriptTrigger.scriptLang, scriptTrigger.script, scriptTrigger.scriptType, responseMap
);
Object returnValue = executable.run();
logger.trace("Returned [{}] from script", returnValue);
if (returnValue instanceof Boolean) {
triggered = (Boolean) returnValue;
} else {
throw new ElasticsearchIllegalStateException("Trigger script [" + scriptTrigger.script + "] did not return a Boolean");
}
} catch (Exception e ){
logger.error("Failed to execute script trigger", e);
}
return new TriggerResult(triggered, request, response, alert.trigger());
}
private SearchRequest prepareTriggerSearch(Alert alert, DateTime scheduledFireTime, DateTime fireTime) throws IOException {
SearchRequest request = alert.getSearchRequest();
if (Strings.hasLength(request.source())) {
String requestSource = XContentHelper.convertToJson(request.source(), false);
if (requestSource.contains(fireTimePlaceHolder)) {
requestSource = requestSource.replace(fireTimePlaceHolder, dateTimeFormatter.printer().print(fireTime));
}
if (requestSource.contains(scheduledFireTimePlaceHolder)) {
requestSource = requestSource.replace(scheduledFireTimePlaceHolder, dateTimeFormatter.printer().print(scheduledFireTime));
}
request.source(requestSource);
} else if (Strings.hasLength(request.templateSource())) {
Tuple<XContentType, Map<String, Object>> tuple = XContentHelper.convertToMap(request.templateSource(), false);
Map<String, Object> templateSourceAsMap = tuple.v2();
Map<String, Object> templateObject = (Map<String, Object>) templateSourceAsMap.get("template");
if (templateObject != null) {
Map<String, Object> params = (Map<String, Object>) templateObject.get("params");
params.put("scheduled_fire_time", dateTimeFormatter.printer().print(scheduledFireTime));
params.put("fire_time", dateTimeFormatter.printer().print(fireTime));
XContentBuilder builder = XContentFactory.contentBuilder(tuple.v1());
builder.map(templateSourceAsMap);
request.templateSource(builder.bytes(), false);
}
} else if (request.templateName() != null) {
MapBuilder<String, String> templateParams = MapBuilder.newMapBuilder(request.templateParams())
.put("scheduled_fire_time", dateTimeFormatter.printer().print(scheduledFireTime))
.put("fire_time", dateTimeFormatter.printer().print(fireTime));
request.templateParams(templateParams.map());
} else {
throw new ElasticsearchIllegalStateException("Search requests needs either source, template source or template name");
}
return request;
}
}

View File

@ -1,43 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.alerts.triggers;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
/**
*/
public class TriggerResult {
private final boolean triggered;
private final SearchRequest request;
private final SearchResponse response;
private final AlertTrigger trigger;
public TriggerResult(boolean triggered, SearchRequest request, SearchResponse response, AlertTrigger trigger) {
this.triggered = triggered;
this.request = request;
this.response = response;
this.trigger = trigger;
}
public boolean isTriggered() {
return triggered;
}
public SearchRequest getRequest() {
return request;
}
public SearchResponse getResponse() {
return response;
}
public AlertTrigger getTrigger() {
return trigger;
}
}

View File

@ -1,6 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.alerts.triggers;

View File

@ -6,16 +6,25 @@
package org.elasticsearch.alerts; package org.elasticsearch.alerts;
import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.alerts.actions.AlertAction;
import org.elasticsearch.alerts.client.AlertsClientInterface; import org.elasticsearch.alerts.client.AlertsClientInterface;
import org.elasticsearch.alerts.transport.actions.delete.DeleteAlertRequest; import org.elasticsearch.alerts.transport.actions.delete.DeleteAlertRequest;
import org.elasticsearch.alerts.transport.actions.delete.DeleteAlertResponse; import org.elasticsearch.alerts.transport.actions.delete.DeleteAlertResponse;
import org.elasticsearch.alerts.triggers.ScriptedTrigger;
import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.joda.time.DateTime;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.test.ElasticsearchIntegrationTest; import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.junit.Test; import org.junit.Test;
import java.util.ArrayList;
import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery; import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
import static org.elasticsearch.index.query.QueryBuilders.termQuery; import static org.elasticsearch.index.query.QueryBuilders.termQuery;
import static org.elasticsearch.search.builder.SearchSourceBuilder.searchSource; import static org.elasticsearch.search.builder.SearchSourceBuilder.searchSource;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
/** /**

View File

@ -22,7 +22,7 @@ import org.elasticsearch.alerts.transport.actions.delete.DeleteAlertResponse;
import org.elasticsearch.alerts.transport.actions.get.GetAlertRequest; import org.elasticsearch.alerts.transport.actions.get.GetAlertRequest;
import org.elasticsearch.alerts.transport.actions.get.GetAlertResponse; import org.elasticsearch.alerts.transport.actions.get.GetAlertResponse;
import org.elasticsearch.alerts.triggers.AlertTrigger; import org.elasticsearch.alerts.triggers.AlertTrigger;
import org.elasticsearch.alerts.triggers.ScriptedAlertTrigger; import org.elasticsearch.alerts.triggers.ScriptedTrigger;
import org.elasticsearch.alerts.triggers.TriggerResult; import org.elasticsearch.alerts.triggers.TriggerResult;
import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamInput;
@ -84,7 +84,12 @@ public class AlertActionsTest extends ElasticsearchIntegrationTest {
DateTime scheduledFireTime = new DateTime(DateTimeZone.UTC); DateTime scheduledFireTime = new DateTime(DateTimeZone.UTC);
Map<String, Object> triggerMap = new HashMap<>(); Map<String, Object> triggerMap = new HashMap<>();
triggerMap.put("numberOfEvents", ">1"); Map<String, Object> scriptTriggerMap = new HashMap<>();
scriptTriggerMap.put("script", "hits.total>1");
scriptTriggerMap.put("script_lang", "groovy");
triggerMap.put("script", scriptTriggerMap );
Map<String,Object> actionMap = new HashMap<>(); Map<String,Object> actionMap = new HashMap<>();
Map<String,Object> emailParamMap = new HashMap<>(); Map<String,Object> emailParamMap = new HashMap<>();
List<String> addresses = new ArrayList<>(); List<String> addresses = new ArrayList<>();
@ -113,9 +118,10 @@ public class AlertActionsTest extends ElasticsearchIntegrationTest {
builder.field(AlertActionManager.ACTIONS_FIELD, actionMap); builder.field(AlertActionManager.ACTIONS_FIELD, actionMap);
builder.field(AlertActionState.FIELD_NAME, AlertActionState.SEARCH_NEEDED.toString()); builder.field(AlertActionState.FIELD_NAME, AlertActionState.SEARCH_NEEDED.toString());
builder.endObject(); builder.endObject();
AlertActionRegistry alertActionRegistry = internalCluster().getInstance(AlertActionRegistry.class, internalCluster().getMasterName()); final AlertActionRegistry alertActionRegistry = internalCluster().getInstance(AlertActionRegistry.class, internalCluster().getMasterName());
AlertActionEntry actionEntry = AlertActionManager.parseHistory("foobar", builder.bytes(), 0, alertActionRegistry); final AlertActionManager alertManager = internalCluster().getInstance(AlertActionManager.class, internalCluster().getMasterName());
AlertActionEntry actionEntry = alertManager.parseHistory("foobar", builder.bytes(), 0, alertActionRegistry);
assertEquals(actionEntry.getVersion(), 0); assertEquals(actionEntry.getVersion(), 0);
assertEquals(actionEntry.getAlertName(), "testName"); assertEquals(actionEntry.getAlertName(), "testName");
assertEquals(actionEntry.isTriggered(), true); assertEquals(actionEntry.isTriggered(), true);
@ -123,9 +129,6 @@ public class AlertActionsTest extends ElasticsearchIntegrationTest {
assertEquals(actionEntry.getFireTime(), fireTime); assertEquals(actionEntry.getFireTime(), fireTime);
assertEquals(actionEntry.getEntryState(), AlertActionState.SEARCH_NEEDED); assertEquals(actionEntry.getEntryState(), AlertActionState.SEARCH_NEEDED);
assertEquals(actionEntry.getSearchResponse().getHits().getTotalHits(), 10); assertEquals(actionEntry.getSearchResponse().getHits().getTotalHits(), 10);
assertEquals(actionEntry.getTrigger(),
new AlertTrigger(AlertTrigger.SimpleTrigger.GREATER_THAN, AlertTrigger.TriggerType.NUMBER_OF_EVENTS, 1));
} }
@Test @Test
@ -163,16 +166,6 @@ public class AlertActionsTest extends ElasticsearchIntegrationTest {
return builder; return builder;
} }
@Override
public void writeTo(StreamOutput out) throws IOException {
}
@Override
public void readFrom(StreamInput in) throws IOException {
}
@Override @Override
public boolean doAction(Alert alert, TriggerResult actionEntry) { public boolean doAction(Alert alert, TriggerResult actionEntry) {
logger.info("Alert {} invoked: {}", alert.alertName(), actionEntry); logger.info("Alert {} invoked: {}", alert.alertName(), actionEntry);
@ -187,13 +180,9 @@ public class AlertActionsTest extends ElasticsearchIntegrationTest {
parser.nextToken(); parser.nextToken();
return alertAction; return alertAction;
} }
@Override
public AlertAction readFrom(StreamInput in) throws IOException {
return alertAction;
}
}); });
AlertTrigger alertTrigger = new AlertTrigger(new ScriptedAlertTrigger("return true", ScriptService.ScriptType.INLINE, "groovy"));
AlertTrigger alertTrigger = new ScriptedTrigger("return true", ScriptService.ScriptType.INLINE, "groovy");
Alert alert = new Alert( Alert alert = new Alert(