Actually add the new TriggerManager classes.

These should have been added in the previous commit.

Original commit: elastic/x-pack-elasticsearch@9efecc7ace
This commit is contained in:
Brian Murphy 2014-11-07 17:42:43 +00:00
parent 5d8f43225a
commit 7a23074c7c
7 changed files with 471 additions and 0 deletions

View File

@ -0,0 +1,19 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.alerts.triggers;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.common.xcontent.ToXContent;
public interface AlertTrigger extends ToXContent {
/**
* Returns the name of this trigger, can be used to look up the action factory for creating this trigger
* @return
*/
public String getTriggerName();
}

View File

@ -0,0 +1,101 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.alerts.triggers;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.script.ScriptService;
import java.io.IOException;
public class ScriptedTrigger implements AlertTrigger{
private final String script;
private final ScriptService.ScriptType scriptType;
private final String scriptLang;
public ScriptedTrigger(String script, ScriptService.ScriptType scriptType, String scriptLang) {
this.script = script;
this.scriptType = scriptType;
this.scriptLang = scriptLang;
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field("script", script);
builder.field("script_type", scriptType);
builder.field("script_lang", scriptLang);
builder.endObject();
return builder;
}
@Override
public String getTriggerName() {
return "script";
}
/**
* The script to run
* @return the script as a String
*/
public String getScript() {
return script;
}
/**
* The type (INDEXED,INLINE,FILE) of the script
* @return the type
*/
public ScriptService.ScriptType getScriptType() {
return scriptType;
}
/**
* The language of the script (null for default language) as a String
* @return the langauge
*/
public String getScriptLang() {
return scriptLang;
}
@Override
public String toString() {
return "ScriptedTrigger{" +
"script='" + script + '\'' +
", scriptType=" + scriptType +
", scriptLang='" + scriptLang + '\'' +
'}';
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
ScriptedTrigger that = (ScriptedTrigger) o;
if (!script.equals(that.script)) return false;
if (scriptLang != null ? !scriptLang.equals(that.scriptLang) : that.scriptLang != null) return false;
if (scriptType != that.scriptType) return false;
return true;
}
@Override
public int hashCode() {
int result = script.hashCode();
result = 31 * result + (scriptType != null ? scriptType.hashCode() : 0);
result = 31 * result + (scriptLang != null ? scriptLang.hashCode() : 0);
return result;
}
}

View File

@ -0,0 +1,110 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.alerts.triggers;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchIllegalArgumentException;
import org.elasticsearch.ElasticsearchIllegalStateException;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.xcontent.*;
import org.elasticsearch.script.ExecutableScript;
import org.elasticsearch.script.ScriptService;
import java.io.IOException;
import java.util.Map;
public class ScriptedTriggerFactory implements TriggerFactory {
private final ScriptService scriptService;
public ScriptedTriggerFactory(ScriptService service) {
scriptService = service;
}
@Override
public AlertTrigger createTrigger(XContentParser parser) throws IOException {
String currentFieldName = null;
XContentParser.Token token;
String scriptLang = null;
String script = null;
ScriptService.ScriptType scriptType = null;
ESLogger logger = Loggers.getLogger(this.getClass());
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
currentFieldName = parser.currentName();
logger.error("FOOOBAR : [{}]", currentFieldName);
} else if (token.isValue()) {
switch (currentFieldName) {
case "script_id" :
script = parser.text();
//@TODO assert script type was null or INDEXED already
scriptType = ScriptService.ScriptType.INDEXED;
break;
case "script" :
script = parser.text();
scriptType = ScriptService.ScriptType.INLINE;
break;
case "script_type" :
ScriptService.ScriptType tmpType = ScriptService.ScriptType.valueOf(parser.text());
if (scriptType == ScriptService.ScriptType.INDEXED && tmpType != scriptType) {
throw new ElasticsearchException("Unexpected script type for script_id [" + tmpType + "]");
} else {
scriptType = tmpType;
}
break;
case "script_lang" :
scriptLang = parser.text();
break;
default:
throw new ElasticsearchIllegalArgumentException("Unexpected field [" + currentFieldName + "]");
}
}
}
if (script == null) {
throw new ElasticsearchException("Failed to parse ScriptedTrigger script:[" + script
+ "] scriptLang:[" + scriptLang + "] scriptType:[" + scriptType + "]");
}
return new ScriptedTrigger(script, scriptType, scriptLang);
}
@Override
public boolean isTriggered(AlertTrigger trigger, SearchRequest request, SearchResponse response) {
if (! (trigger instanceof ScriptedTrigger) ){
throw new ElasticsearchIllegalStateException("Failed to evaluate isTriggered expected type ["
+ ScriptedTrigger.class + "] got [" + trigger.getClass() + "]");
}
ScriptedTrigger scriptedTrigger = (ScriptedTrigger)trigger;
try {
XContentBuilder builder = XContentFactory.jsonBuilder();
builder.startObject();
builder = response.toXContent(builder, ToXContent.EMPTY_PARAMS);
builder.endObject();
Map<String, Object> responseMap = XContentHelper.convertToMap(builder.bytes(), false).v2();
ExecutableScript executable = scriptService.executable(
scriptedTrigger.getScriptLang(), scriptedTrigger.getScript(), scriptedTrigger.getScriptType(), responseMap
);
Object returnValue = executable.run();
if (returnValue instanceof Boolean) {
return (Boolean) returnValue;
} else {
throw new ElasticsearchIllegalStateException("Trigger script [" + scriptedTrigger + "] did not return a Boolean");
}
} catch (IOException e) {
throw new ElasticsearchException("Failed to execute trigger", e);
}
}
}

View File

@ -0,0 +1,35 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.alerts.triggers;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.common.xcontent.XContentParser;
import java.io.IOException;
public interface TriggerFactory {
/**
* Creates a trigger form the given parser
* @param parser The parser containing the definition of the trigger
* @return The newly created trigger
* @throws IOException
*/
AlertTrigger createTrigger(XContentParser parser) throws IOException;
/**
* Evaulates if the trigger is triggers based off of the request and response
*
* @param trigger
* @param request
* @param response
* @return
*/
boolean isTriggered(AlertTrigger trigger, SearchRequest request, SearchResponse response);
}

View File

@ -0,0 +1,157 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.alerts.triggers;
import org.elasticsearch.ElasticsearchIllegalArgumentException;
import org.elasticsearch.ElasticsearchIllegalStateException;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.alerts.Alert;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.joda.FormatDateTimeFormatter;
import org.elasticsearch.common.joda.time.DateTime;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.*;
import org.elasticsearch.index.mapper.core.DateFieldMapper;
import org.elasticsearch.script.ScriptService;
import java.io.IOException;
import java.util.Map;
public class TriggerManager extends AbstractComponent {
private static final FormatDateTimeFormatter dateTimeFormatter = DateFieldMapper.Defaults.DATE_TIME_FORMATTER;
private volatile ImmutableOpenMap<String, TriggerFactory> triggersImplemented;
private final Client client;
private final String fireTimePlaceHolder;
private final String scheduledFireTimePlaceHolder;
@Inject
public TriggerManager(Settings settings, Client client, ScriptService scriptService) {
super(settings);
this.client = client;
triggersImplemented = ImmutableOpenMap.<String, TriggerFactory>builder()
.fPut("script", new ScriptedTriggerFactory(scriptService))
.build();
this.fireTimePlaceHolder = settings.get("prefix", "<<<FIRE_TIME>>>");
this.scheduledFireTimePlaceHolder = settings.get("postfix", "<<<SCHEDULED_FIRE_TIME>>>");
}
/**
* Registers an AlertTrigger so that it can be instantiated by name
* @param name The name of the trigger
* @param actionFactory The factory responsible for trigger instantiation
*/
public void registerTrigger(String name, TriggerFactory actionFactory){
triggersImplemented = ImmutableOpenMap.builder(triggersImplemented)
.fPut(name, actionFactory)
.build();
}
/**
* Reads the contents of parser to create the correct Trigger
* @param parser The parser containing the trigger definition
* @return
* @throws IOException
*/
public AlertTrigger instantiateAlertTrigger(XContentParser parser) throws IOException {
ImmutableOpenMap<String, TriggerFactory> triggersImplemented = this.triggersImplemented;
String triggerFactoryName = null;
XContentParser.Token token;
AlertTrigger trigger = null;
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
triggerFactoryName = parser.currentName();
} else if (token == XContentParser.Token.START_OBJECT && triggerFactoryName != null) {
TriggerFactory factory = triggersImplemented.get(triggerFactoryName);
if (factory != null) {
trigger = factory.createTrigger(parser);
} else {
throw new ElasticsearchIllegalArgumentException("No trigger exists with the name [" + triggerFactoryName + "]");
}
}
}
return trigger;
}
/**
* Tests to see if an alert will trigger for a given fireTime and scheduleFire time
*
* @param alert The Alert to test
* @param scheduledFireTime The time the alert was scheduled to run
* @param fireTime The time the alert actually ran
* @return The TriggerResult representing the trigger state of the alert
* @throws IOException
*/
public TriggerResult isTriggered(Alert alert, DateTime scheduledFireTime, DateTime fireTime) throws IOException {
SearchRequest request = prepareTriggerSearch(alert, scheduledFireTime, fireTime);
if (logger.isTraceEnabled()) {
logger.trace("For alert [{}] running query for [{}]", alert.alertName(), XContentHelper.convertToJson(request.source(), false, true));
}
SearchResponse response = client.search(request).actionGet(); // actionGet deals properly with InterruptedException
logger.debug("Ran alert [{}] and got hits : [{}]", alert.alertName(), response.getHits().getTotalHits());
return isTriggered(alert.trigger(), request, response);
}
protected TriggerResult isTriggered(AlertTrigger trigger, SearchRequest request, SearchResponse response) {
TriggerFactory factory = triggersImplemented.get(trigger.getTriggerName());
if (factory == null) {
throw new ElasticsearchIllegalArgumentException("No trigger exists with the name [" + trigger.getTriggerName() + "]");
}
boolean triggered = factory.isTriggered(trigger, request,response);
return new TriggerResult(triggered, request, response, trigger);
}
private SearchRequest prepareTriggerSearch(Alert alert, DateTime scheduledFireTime, DateTime fireTime) throws IOException {
SearchRequest request = alert.getSearchRequest();
if (Strings.hasLength(request.source())) {
String requestSource = XContentHelper.convertToJson(request.source(), false);
if (requestSource.contains(fireTimePlaceHolder)) {
requestSource = requestSource.replace(fireTimePlaceHolder, dateTimeFormatter.printer().print(fireTime));
}
if (requestSource.contains(scheduledFireTimePlaceHolder)) {
requestSource = requestSource.replace(scheduledFireTimePlaceHolder, dateTimeFormatter.printer().print(scheduledFireTime));
}
request.source(requestSource);
} else if (Strings.hasLength(request.templateSource())) {
Tuple<XContentType, Map<String, Object>> tuple = XContentHelper.convertToMap(request.templateSource(), false);
Map<String, Object> templateSourceAsMap = tuple.v2();
Map<String, Object> templateObject = (Map<String, Object>) templateSourceAsMap.get("template");
if (templateObject != null) {
Map<String, Object> params = (Map<String, Object>) templateObject.get("params");
params.put("scheduled_fire_time", dateTimeFormatter.printer().print(scheduledFireTime));
params.put("fire_time", dateTimeFormatter.printer().print(fireTime));
XContentBuilder builder = XContentFactory.contentBuilder(tuple.v1());
builder.map(templateSourceAsMap);
request.templateSource(builder.bytes(), false);
}
} else if (request.templateName() != null) {
MapBuilder<String, String> templateParams = MapBuilder.newMapBuilder(request.templateParams())
.put("scheduled_fire_time", dateTimeFormatter.printer().print(scheduledFireTime))
.put("fire_time", dateTimeFormatter.printer().print(fireTime));
request.templateParams(templateParams.map());
} else {
throw new ElasticsearchIllegalStateException("Search requests needs either source, template source or template name");
}
return request;
}
}

View File

@ -0,0 +1,43 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.alerts.triggers;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
/**
*/
public class TriggerResult {
private final boolean triggered;
private final SearchRequest request;
private final SearchResponse response;
private final AlertTrigger trigger;
public TriggerResult(boolean triggered, SearchRequest request, SearchResponse response, AlertTrigger trigger) {
this.triggered = triggered;
this.request = request;
this.response = response;
this.trigger = trigger;
}
public boolean isTriggered() {
return triggered;
}
public SearchRequest getRequest() {
return request;
}
public SearchResponse getResponse() {
return response;
}
public AlertTrigger getTrigger() {
return trigger;
}
}

View File

@ -0,0 +1,6 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.alerts.triggers;