Merge result from rebase.
Original commit: elastic/x-pack-elasticsearch@7e091273da
This commit is contained in:
parent
e09fface02
commit
8375788839
|
@ -22,7 +22,7 @@ import java.util.Map;
|
|||
public class Alert implements ToXContent {
|
||||
|
||||
private String alertName;
|
||||
private SearchRequest searchRequest;
|
||||
private SearchRequest triggerSearchRequest;
|
||||
private AlertTrigger trigger;
|
||||
private List<AlertAction> actions;
|
||||
private String schedule;
|
||||
|
@ -30,7 +30,10 @@ public class Alert implements ToXContent {
|
|||
private TimeValue throttlePeriod = new TimeValue(0);
|
||||
private DateTime timeLastActionExecuted = null;
|
||||
private AlertAckState ackState = AlertAckState.NOT_ACKABLE;
|
||||
private Map<String,Object> metadata = null;
|
||||
|
||||
//Optional
|
||||
private Map<String,Object> metadata;
|
||||
private SearchRequest payloadSearchRequest;
|
||||
|
||||
private transient long version;
|
||||
private transient XContentType contentType;
|
||||
|
@ -40,9 +43,9 @@ public class Alert implements ToXContent {
|
|||
}
|
||||
|
||||
|
||||
public Alert(String alertName, SearchRequest searchRequest, AlertTrigger trigger, List<AlertAction> actions, String schedule, DateTime lastExecuteTime, long version, TimeValue throttlePeriod, AlertAckState ackState) {
|
||||
public Alert(String alertName, SearchRequest triggerSearchRequest, AlertTrigger trigger, List<AlertAction> actions, String schedule, DateTime lastExecuteTime, long version, TimeValue throttlePeriod, AlertAckState ackState) {
|
||||
this.alertName = alertName;
|
||||
this.searchRequest = searchRequest;
|
||||
this.triggerSearchRequest = triggerSearchRequest;
|
||||
this.trigger = trigger;
|
||||
this.actions = actions;
|
||||
this.schedule = schedule;
|
||||
|
@ -56,8 +59,14 @@ public class Alert implements ToXContent {
|
|||
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
|
||||
builder.startObject();
|
||||
builder.field(AlertsStore.SCHEDULE_FIELD.getPreferredName(), schedule);
|
||||
builder.field(AlertsStore.REQUEST_FIELD.getPreferredName());
|
||||
AlertUtils.writeSearchRequest(searchRequest, builder, params);
|
||||
builder.field(AlertsStore.TRIGGER_REQUEST_FIELD.getPreferredName());
|
||||
AlertUtils.writeSearchRequest(triggerSearchRequest, builder, params);
|
||||
|
||||
if (payloadSearchRequest != null) {
|
||||
builder.field(AlertsStore.PAYLOAD_REQUEST_FIELD.getPreferredName());
|
||||
AlertUtils.writeSearchRequest(payloadSearchRequest, builder, params);
|
||||
}
|
||||
|
||||
builder.field(AlertsStore.THROTTLE_PERIOD_FIELD.getPreferredName(), throttlePeriod.millis());
|
||||
builder.field(AlertsStore.ACK_STATE_FIELD.getPreferredName(), ackState.toString());
|
||||
|
||||
|
@ -138,12 +147,12 @@ public class Alert implements ToXContent {
|
|||
/**
|
||||
* @return The search request that runs when the alert runs by the sc
|
||||
*/
|
||||
public SearchRequest getSearchRequest() {
|
||||
return searchRequest;
|
||||
public SearchRequest getTriggerSearchRequest() {
|
||||
return triggerSearchRequest;
|
||||
}
|
||||
|
||||
void setSearchRequest(SearchRequest searchRequest) {
|
||||
this.searchRequest = searchRequest;
|
||||
public void setTriggerSearchRequest(SearchRequest triggerSearchRequest) {
|
||||
this.triggerSearchRequest = triggerSearchRequest;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -223,6 +232,17 @@ public class Alert implements ToXContent {
|
|||
this.metadata = metadata;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the search request that will be run for actions
|
||||
*/
|
||||
public SearchRequest getPayloadSearchRequest() {
|
||||
return payloadSearchRequest;
|
||||
}
|
||||
|
||||
public void setPayloadSearchRequest(SearchRequest payloadSearchRequest) {
|
||||
this.payloadSearchRequest = payloadSearchRequest;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
|
|
|
@ -10,12 +10,15 @@ import org.elasticsearch.ElasticsearchException;
|
|||
import org.elasticsearch.ElasticsearchIllegalStateException;
|
||||
import org.elasticsearch.action.delete.DeleteResponse;
|
||||
import org.elasticsearch.action.index.IndexResponse;
|
||||
import org.elasticsearch.action.search.SearchRequest;
|
||||
import org.elasticsearch.action.search.SearchResponse;
|
||||
import org.elasticsearch.alerts.actions.AlertActionEntry;
|
||||
import org.elasticsearch.alerts.actions.AlertActionManager;
|
||||
import org.elasticsearch.alerts.actions.AlertActionRegistry;
|
||||
import org.elasticsearch.alerts.scheduler.AlertScheduler;
|
||||
import org.elasticsearch.alerts.triggers.TriggerManager;
|
||||
import org.elasticsearch.alerts.triggers.TriggerResult;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.cluster.ClusterChangedEvent;
|
||||
import org.elasticsearch.cluster.ClusterService;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
|
@ -28,14 +31,20 @@ import org.elasticsearch.common.joda.time.DateTime;
|
|||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.util.concurrent.KeyedLock;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.common.xcontent.XContentHelper;
|
||||
import org.elasticsearch.gateway.GatewayService;
|
||||
import org.elasticsearch.indices.IndicesService;
|
||||
import org.elasticsearch.script.ScriptService;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
|
||||
|
||||
public class AlertManager extends AbstractComponent {
|
||||
|
||||
private final AlertScheduler scheduler;
|
||||
|
@ -45,6 +54,8 @@ public class AlertManager extends AbstractComponent {
|
|||
private final AlertActionRegistry actionRegistry;
|
||||
private final ThreadPool threadPool;
|
||||
private final ClusterService clusterService;
|
||||
private final ScriptService scriptService;
|
||||
private final Client client;
|
||||
private final KeyedLock<String> alertLock = new KeyedLock<>();
|
||||
private final AtomicReference<State> state = new AtomicReference<>(State.STOPPED);
|
||||
|
||||
|
@ -53,7 +64,7 @@ public class AlertManager extends AbstractComponent {
|
|||
@Inject
|
||||
public AlertManager(Settings settings, ClusterService clusterService, AlertScheduler scheduler, AlertsStore alertsStore,
|
||||
IndicesService indicesService, TriggerManager triggerManager, AlertActionManager actionManager,
|
||||
AlertActionRegistry actionRegistry, ThreadPool threadPool) {
|
||||
AlertActionRegistry actionRegistry, ThreadPool threadPool, ScriptService scriptService, Client client) {
|
||||
super(settings);
|
||||
this.scheduler = scheduler;
|
||||
this.threadPool = threadPool;
|
||||
|
@ -64,6 +75,8 @@ public class AlertManager extends AbstractComponent {
|
|||
this.actionManager.setAlertManager(this);
|
||||
this.actionRegistry = actionRegistry;
|
||||
this.clusterService = clusterService;
|
||||
this.scriptService = scriptService;
|
||||
this.client = client;
|
||||
clusterService.add(new AlertsClusterStateListener());
|
||||
manuallyStopped = !settings.getAsBoolean("alerts.start_immediately", true);
|
||||
// Close if the indices service is being stopped, so we don't run into search failures (locally) that will
|
||||
|
@ -140,9 +153,18 @@ public class AlertManager extends AbstractComponent {
|
|||
throw new ElasticsearchException("Alert is not available");
|
||||
}
|
||||
TriggerResult triggerResult = triggerManager.isTriggered(alert, entry.getScheduledTime(), entry.getFireTime());
|
||||
|
||||
if (triggerResult.isTriggered()) {
|
||||
triggerResult.setThrottled(isActionThrottled(alert));
|
||||
if (!triggerResult.isThrottled()) {
|
||||
if (alert.getPayloadSearchRequest() != null) {
|
||||
SearchRequest payloadRequest = AlertUtils.createSearchRequestWithTimes(alert.getPayloadSearchRequest(), entry.getScheduledTime(), entry.getFireTime(), scriptService);
|
||||
SearchResponse payloadResponse = client.search(payloadRequest).actionGet();
|
||||
triggerResult.setPayloadRequest(payloadRequest);
|
||||
XContentBuilder builder = jsonBuilder().startObject().value(payloadResponse).endObject();
|
||||
Map<String, Object> responseMap = XContentHelper.convertToMap(builder.bytes(), false).v2();
|
||||
triggerResult.setPayloadResponse(responseMap);
|
||||
}
|
||||
actionRegistry.doAction(alert, triggerResult);
|
||||
alert.setTimeLastActionExecuted(entry.getScheduledTime());
|
||||
if (alert.getAckState() == AlertAckState.NOT_TRIGGERED) {
|
||||
|
|
|
@ -6,38 +6,79 @@
|
|||
package org.elasticsearch.alerts;
|
||||
|
||||
import org.elasticsearch.ElasticsearchIllegalArgumentException;
|
||||
import org.elasticsearch.ElasticsearchIllegalStateException;
|
||||
import org.elasticsearch.action.search.SearchRequest;
|
||||
import org.elasticsearch.action.search.SearchType;
|
||||
import org.elasticsearch.action.support.IndicesOptions;
|
||||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
import org.elasticsearch.common.collect.MapBuilder;
|
||||
import org.elasticsearch.common.joda.FormatDateTimeFormatter;
|
||||
import org.elasticsearch.common.joda.time.DateTime;
|
||||
import org.elasticsearch.common.xcontent.ToXContent;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.common.xcontent.XContentHelper;
|
||||
import org.elasticsearch.common.xcontent.XContentParser;
|
||||
import org.elasticsearch.index.mapper.core.DateFieldMapper;
|
||||
import org.elasticsearch.script.ExecutableScript;
|
||||
import org.elasticsearch.script.ScriptService;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Locale;
|
||||
import java.util.*;
|
||||
|
||||
/**
|
||||
*/
|
||||
public final class AlertUtils {
|
||||
|
||||
public final static IndicesOptions DEFAULT_INDICES_OPTIONS = IndicesOptions.lenientExpandOpen();
|
||||
public final static SearchType DEFAULT_SEARCH_TYPE = SearchType.COUNT;
|
||||
public final static SearchType DEFAULT_TRIGGER_SEARCH_TYPE = SearchType.COUNT;
|
||||
public final static SearchType DEFAULT_PAYLOAD_SEARCH_TYPE = SearchType.DFS_QUERY_AND_FETCH;
|
||||
|
||||
private static final String FIRE_TIME_VARIABLE_NAME = "FIRE_TIME";
|
||||
private static final String SCHEDULED_FIRE_TIME_VARIABLE_NAME = "SCHEDULED_FIRE_TIME";
|
||||
|
||||
public static final FormatDateTimeFormatter dateTimeFormatter = DateFieldMapper.Defaults.DATE_TIME_FORMATTER;
|
||||
|
||||
private AlertUtils() {
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a new search request applying the scheduledFireTime and fireTime to the original request
|
||||
*/
|
||||
public static SearchRequest createSearchRequestWithTimes(SearchRequest request, DateTime scheduledFireTime, DateTime fireTime, ScriptService scriptService) throws IOException {
|
||||
SearchRequest triggerSearchRequest = new SearchRequest(request)
|
||||
.indicesOptions(request.indicesOptions())
|
||||
.indices(request.indices());
|
||||
if (Strings.hasLength(request.source())) {
|
||||
Map<String, String> templateParams = new HashMap<>();
|
||||
templateParams.put(SCHEDULED_FIRE_TIME_VARIABLE_NAME, dateTimeFormatter.printer().print(scheduledFireTime));
|
||||
templateParams.put(FIRE_TIME_VARIABLE_NAME, dateTimeFormatter.printer().print(fireTime));
|
||||
String requestSource = XContentHelper.convertToJson(request.source(), false);
|
||||
ExecutableScript script = scriptService.executable("mustache", requestSource, ScriptService.ScriptType.INLINE, templateParams);
|
||||
triggerSearchRequest.source((BytesReference) script.unwrap(script.run()), false);
|
||||
} else if (request.templateName() != null) {
|
||||
MapBuilder<String, String> templateParams = MapBuilder.newMapBuilder(request.templateParams())
|
||||
.put(SCHEDULED_FIRE_TIME_VARIABLE_NAME, dateTimeFormatter.printer().print(scheduledFireTime))
|
||||
.put(FIRE_TIME_VARIABLE_NAME, dateTimeFormatter.printer().print(fireTime));
|
||||
triggerSearchRequest.templateParams(templateParams.map());
|
||||
triggerSearchRequest.templateName(request.templateName());
|
||||
triggerSearchRequest.templateType(request.templateType());
|
||||
} else {
|
||||
throw new ElasticsearchIllegalStateException("Search requests needs either source or template name");
|
||||
}
|
||||
return triggerSearchRequest;
|
||||
}
|
||||
|
||||
public static SearchRequest readSearchRequest(XContentParser parser) throws IOException {
|
||||
return readSearchRequest(parser, DEFAULT_TRIGGER_SEARCH_TYPE);
|
||||
}
|
||||
|
||||
/**
|
||||
* Reads a new search request instance for the specified parser.
|
||||
*/
|
||||
public static SearchRequest readSearchRequest(XContentParser parser) throws IOException {
|
||||
public static SearchRequest readSearchRequest(XContentParser parser, SearchType searchType) throws IOException {
|
||||
SearchRequest searchRequest = new SearchRequest();
|
||||
IndicesOptions indicesOptions = DEFAULT_INDICES_OPTIONS;
|
||||
SearchType searchType = DEFAULT_SEARCH_TYPE;
|
||||
|
||||
XContentParser.Token token;
|
||||
String searchRequestFieldName = null;
|
||||
|
@ -182,7 +223,7 @@ public final class AlertUtils {
|
|||
builder.field("allow_no_indices", options.allowNoIndices());
|
||||
builder.endObject();
|
||||
}
|
||||
if (searchRequest.searchType() != DEFAULT_SEARCH_TYPE) {
|
||||
if (searchRequest.searchType() != DEFAULT_TRIGGER_SEARCH_TYPE) {
|
||||
builder.field("search_type", searchRequest.searchType().toString().toLowerCase(Locale.ENGLISH));
|
||||
}
|
||||
builder.endObject();
|
||||
|
|
|
@ -52,7 +52,8 @@ public class AlertsStore extends AbstractComponent {
|
|||
public static final ParseField TRIGGER_FIELD = new ParseField("trigger");
|
||||
public static final ParseField ACTION_FIELD = new ParseField("actions");
|
||||
public static final ParseField LAST_ACTION_FIRE = new ParseField("last_alert_executed");
|
||||
public static final ParseField REQUEST_FIELD = new ParseField("request");
|
||||
public static final ParseField TRIGGER_REQUEST_FIELD = new ParseField("trigger_request");
|
||||
public static final ParseField PAYLOAD_REQUEST_FIELD = new ParseField("payload_request");
|
||||
public static final ParseField THROTTLE_PERIOD_FIELD = new ParseField("throttle_period");
|
||||
public static final ParseField LAST_ACTION_EXECUTED_FIELD = new ParseField("last_action_executed");
|
||||
public static final ParseField ACK_STATE_FIELD = new ParseField("ack_state");
|
||||
|
@ -253,8 +254,10 @@ public class AlertsStore extends AbstractComponent {
|
|||
} else if (ACTION_FIELD.match(currentFieldName)) {
|
||||
List<AlertAction> actions = alertActionRegistry.instantiateAlertActions(parser);
|
||||
alert.setActions(actions);
|
||||
} else if (REQUEST_FIELD.match(currentFieldName)) {
|
||||
alert.setSearchRequest(AlertUtils.readSearchRequest(parser));
|
||||
} else if (TRIGGER_REQUEST_FIELD.match(currentFieldName)) {
|
||||
alert.setTriggerSearchRequest(AlertUtils.readSearchRequest(parser, AlertUtils.DEFAULT_TRIGGER_SEARCH_TYPE));
|
||||
} else if (PAYLOAD_REQUEST_FIELD.match(currentFieldName)) {
|
||||
alert.setPayloadSearchRequest(AlertUtils.readSearchRequest(parser, AlertUtils.DEFAULT_PAYLOAD_SEARCH_TYPE));
|
||||
} else if (META_FIELD.match(currentFieldName)) {
|
||||
alert.setMetadata(parser.map());
|
||||
} else {
|
||||
|
|
|
@ -31,14 +31,14 @@ public class AlertActionEntry implements ToXContent {
|
|||
private AlertTrigger trigger;
|
||||
private List<AlertAction> actions;
|
||||
private AlertActionState state;
|
||||
private SearchRequest searchRequest;
|
||||
/*Optional*/
|
||||
private Map<String, Object> searchResponse;
|
||||
private SearchRequest triggerRequest;
|
||||
|
||||
|
||||
/*Optional*/
|
||||
private Map<String, Object> triggerResponse;
|
||||
private boolean triggered;
|
||||
private String errorMsg;
|
||||
private Map<String,Object> metadata;
|
||||
|
||||
private transient long version;
|
||||
private transient XContentType contentType;
|
||||
|
||||
|
@ -53,11 +53,10 @@ public class AlertActionEntry implements ToXContent {
|
|||
this.trigger = alert.getTrigger();
|
||||
this.actions = alert.getActions();
|
||||
this.state = state;
|
||||
this.searchRequest = alert.getSearchRequest();
|
||||
this.metadata = alert.getMetadata();
|
||||
|
||||
this.version = 1;
|
||||
this.contentType = alert.getContentType();
|
||||
this.triggerRequest = alert.getTriggerSearchRequest();
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -129,23 +128,23 @@ public class AlertActionEntry implements ToXContent {
|
|||
/**
|
||||
* @return The query that ran at fire time
|
||||
*/
|
||||
public SearchRequest getSearchRequest() {
|
||||
return searchRequest;
|
||||
public SearchRequest getTriggerRequest() {
|
||||
return triggerRequest;
|
||||
}
|
||||
|
||||
void setSearchRequest(SearchRequest searchRequest) {
|
||||
this.searchRequest = searchRequest;
|
||||
public void setTriggerRequest(SearchRequest triggerRequest) {
|
||||
this.triggerRequest = triggerRequest;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return The search response that resulted at out the search request that ran.
|
||||
*/
|
||||
public Map<String, Object> getSearchResponse() {
|
||||
return searchResponse;
|
||||
public Map<String, Object> getTriggerResponse() {
|
||||
return triggerResponse;
|
||||
}
|
||||
|
||||
void setSearchResponse(Map<String, Object> searchResponse) {
|
||||
this.searchResponse = searchResponse;
|
||||
public void setTriggerResponse(Map<String, Object> triggerResponse) {
|
||||
this.triggerResponse = triggerResponse;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -223,9 +222,9 @@ public class AlertActionEntry implements ToXContent {
|
|||
historyEntry.field(trigger.getTriggerName(), trigger, params);
|
||||
historyEntry.endObject();
|
||||
historyEntry.field("request");
|
||||
AlertUtils.writeSearchRequest(searchRequest, historyEntry, params);
|
||||
if (searchResponse != null) {
|
||||
historyEntry.field("response", searchResponse);
|
||||
AlertUtils.writeSearchRequest(triggerRequest, historyEntry, params);
|
||||
if (triggerResponse != null) {
|
||||
historyEntry.field("response", triggerResponse);
|
||||
}
|
||||
|
||||
historyEntry.startObject("actions");
|
||||
|
|
|
@ -223,10 +223,10 @@ public class AlertActionManager extends AbstractComponent {
|
|||
entry.setTrigger(triggerManager.instantiateAlertTrigger(parser));
|
||||
break;
|
||||
case REQUEST:
|
||||
entry.setSearchRequest(AlertUtils.readSearchRequest(parser));
|
||||
entry.setTriggerRequest(AlertUtils.readSearchRequest(parser));
|
||||
break;
|
||||
case RESPONSE:
|
||||
entry.setSearchResponse(parser.map());
|
||||
entry.setTriggerResponse(parser.map());
|
||||
break;
|
||||
case METADATA:
|
||||
entry.setMetadata(parser.map());
|
||||
|
@ -355,7 +355,7 @@ public class AlertActionManager extends AbstractComponent {
|
|||
updateHistoryEntry(entry, AlertActionState.SEARCH_UNDERWAY);
|
||||
logger.debug("Running an alert action entry for [{}]", entry.getAlertName());
|
||||
TriggerResult result = alertManager.executeAlert(entry);
|
||||
entry.setSearchResponse(result.getResponse());
|
||||
entry.setTriggerResponse(result.getTriggerResponse());
|
||||
if (result.isTriggered()) {
|
||||
entry.setTriggered(true);
|
||||
if (result.isThrottled()) {
|
||||
|
|
|
@ -59,11 +59,11 @@ public class AlertActionRegistry extends AbstractComponent {
|
|||
return actions;
|
||||
}
|
||||
|
||||
public void doAction(Alert alert, TriggerResult actionEntry){
|
||||
public void doAction(Alert alert, TriggerResult triggerResult){
|
||||
for (AlertAction action : alert.getActions()) {
|
||||
AlertActionFactory factory = actionImplemented.get(action.getActionName());
|
||||
if (factory != null) {
|
||||
factory.doAction(action, alert, actionEntry);
|
||||
factory.doAction(action, alert, triggerResult);
|
||||
} else {
|
||||
throw new ElasticsearchIllegalArgumentException("No action exists with the name [" + action.getActionName() + "]");
|
||||
}
|
||||
|
|
|
@ -92,12 +92,12 @@ public class EmailAlertActionFactory implements AlertActionFactory {
|
|||
message.setSubject("Elasticsearch Alert " + alert.getAlertName() + " triggered");
|
||||
StringBuilder output = new StringBuilder();
|
||||
output.append("The following query triggered because ").append(result.getTrigger().toString()).append("\n");
|
||||
Object totalHits = XContentMapValues.extractValue("hits.total", result.getResponse());
|
||||
Object totalHits = XContentMapValues.extractValue("hits.total", result.getTriggerResponse());
|
||||
output.append("The total number of hits returned : ").append(totalHits).append("\n");
|
||||
output.append("For query : ").append(result.getRequest());
|
||||
output.append("For query : ").append(result.getActionRequest());
|
||||
output.append("\n");
|
||||
output.append("Indices : ");
|
||||
for (String index : result.getRequest().indices()) {
|
||||
for (String index : result.getActionRequest().indices()) {
|
||||
output.append(index);
|
||||
output.append("/");
|
||||
}
|
||||
|
@ -105,7 +105,7 @@ public class EmailAlertActionFactory implements AlertActionFactory {
|
|||
output.append("\n");
|
||||
|
||||
if (emailAlertAction.getDisplayField() != null) {
|
||||
List<Map<String, Object>> hits = (List<Map<String, Object>>) XContentMapValues.extractValue("hits.hits", result.getResponse());
|
||||
List<Map<String, Object>> hits = (List<Map<String, Object>>) XContentMapValues.extractValue("hits.hits", result.getActionResponse());
|
||||
for (Map<String, Object> hit : hits) {
|
||||
Map<String, Object> _source = (Map<String, Object>) hit.get("_source");
|
||||
if (_source.containsKey(emailAlertAction.getDisplayField())) {
|
||||
|
@ -116,7 +116,7 @@ public class EmailAlertActionFactory implements AlertActionFactory {
|
|||
output.append("\n");
|
||||
}
|
||||
} else {
|
||||
output.append(result.getResponse().toString());
|
||||
output.append(result.getActionResponse().toString());
|
||||
}
|
||||
|
||||
message.setText(output.toString());
|
||||
|
|
|
@ -70,7 +70,7 @@ public class IndexAlertActionFactory implements AlertActionFactory {
|
|||
try {
|
||||
XContentBuilder resultBuilder = XContentFactory.jsonBuilder().prettyPrint();
|
||||
resultBuilder.startObject();
|
||||
resultBuilder.field("response", result.getResponse());
|
||||
resultBuilder.field("response", result.getActionResponse());
|
||||
resultBuilder.field("timestamp", alert.getLastExecuteTime()); ///@TODO FIXME the firetime should be in the result ?
|
||||
resultBuilder.endObject();
|
||||
indexRequest.source(resultBuilder);
|
||||
|
|
|
@ -6,40 +6,29 @@
|
|||
package org.elasticsearch.alerts.triggers;
|
||||
|
||||
import org.elasticsearch.ElasticsearchIllegalArgumentException;
|
||||
import org.elasticsearch.ElasticsearchIllegalStateException;
|
||||
import org.elasticsearch.action.search.SearchRequest;
|
||||
import org.elasticsearch.action.search.SearchResponse;
|
||||
import org.elasticsearch.alerts.Alert;
|
||||
import org.elasticsearch.alerts.AlertUtils;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
import org.elasticsearch.common.collect.ImmutableOpenMap;
|
||||
import org.elasticsearch.common.collect.MapBuilder;
|
||||
import org.elasticsearch.common.component.AbstractComponent;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.joda.FormatDateTimeFormatter;
|
||||
import org.elasticsearch.common.joda.time.DateTime;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.common.xcontent.XContentHelper;
|
||||
import org.elasticsearch.common.xcontent.XContentParser;
|
||||
import org.elasticsearch.index.mapper.core.DateFieldMapper;
|
||||
import org.elasticsearch.script.ExecutableScript;
|
||||
import org.elasticsearch.script.ScriptService;
|
||||
import org.elasticsearch.search.SearchHit;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
|
||||
|
||||
public class TriggerManager extends AbstractComponent {
|
||||
|
||||
private static final String FIRE_TIME_VARIABLE_NAME = "FIRE_TIME";
|
||||
private static final String SCHEDULED_FIRE_TIME_VARIABLE_NAME = "SCHEDULED_FIRE_TIME";
|
||||
public static final FormatDateTimeFormatter dateTimeFormatter = DateFieldMapper.Defaults.DATE_TIME_FORMATTER;
|
||||
|
||||
private final Client client;
|
||||
private final ScriptService scriptService;
|
||||
private volatile ImmutableOpenMap<String, TriggerFactory> triggersImplemented;
|
||||
|
@ -101,7 +90,7 @@ public class TriggerManager extends AbstractComponent {
|
|||
* @throws IOException
|
||||
*/
|
||||
public TriggerResult isTriggered(Alert alert, DateTime scheduledFireTime, DateTime fireTime) throws IOException {
|
||||
SearchRequest request = prepareTriggerSearch(alert, scheduledFireTime, fireTime);
|
||||
SearchRequest request = AlertUtils.createSearchRequestWithTimes(alert.getTriggerSearchRequest(), scheduledFireTime, fireTime, scriptService);
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("For alert [{}] running query for [{}]", alert.getAlertName(), XContentHelper.convertToJson(request.source(), false, true));
|
||||
}
|
||||
|
@ -129,28 +118,4 @@ public class TriggerManager extends AbstractComponent {
|
|||
return new TriggerResult(triggered, request, response, trigger);
|
||||
}
|
||||
|
||||
private SearchRequest prepareTriggerSearch(Alert alert, DateTime scheduledFireTime, DateTime fireTime) throws IOException {
|
||||
SearchRequest triggerSearchRequest = new SearchRequest(alert.getSearchRequest())
|
||||
.indicesOptions(alert.getSearchRequest().indicesOptions())
|
||||
.indices(alert.getSearchRequest().indices());
|
||||
if (Strings.hasLength(alert.getSearchRequest().source())) {
|
||||
Map<String, String> templateParams = new HashMap<>();
|
||||
templateParams.put(SCHEDULED_FIRE_TIME_VARIABLE_NAME, dateTimeFormatter.printer().print(scheduledFireTime));
|
||||
templateParams.put(FIRE_TIME_VARIABLE_NAME, dateTimeFormatter.printer().print(fireTime));
|
||||
String requestSource = XContentHelper.convertToJson(alert.getSearchRequest().source(), false);
|
||||
ExecutableScript script = scriptService.executable("mustache", requestSource, ScriptService.ScriptType.INLINE, templateParams);
|
||||
triggerSearchRequest.source((BytesReference) script.unwrap(script.run()), false);
|
||||
} else if (alert.getSearchRequest().templateName() != null) {
|
||||
MapBuilder<String, String> templateParams = MapBuilder.newMapBuilder(alert.getSearchRequest().templateParams())
|
||||
.put(SCHEDULED_FIRE_TIME_VARIABLE_NAME, dateTimeFormatter.printer().print(scheduledFireTime))
|
||||
.put(FIRE_TIME_VARIABLE_NAME, dateTimeFormatter.printer().print(fireTime));
|
||||
triggerSearchRequest.templateParams(templateParams.map());
|
||||
triggerSearchRequest.templateName(alert.getSearchRequest().templateName());
|
||||
triggerSearchRequest.templateType(alert.getSearchRequest().templateType());
|
||||
} else {
|
||||
throw new ElasticsearchIllegalStateException("Search requests needs either source or template name");
|
||||
}
|
||||
return triggerSearchRequest;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -15,14 +15,17 @@ public class TriggerResult {
|
|||
|
||||
private final boolean triggered;
|
||||
private boolean throttled;
|
||||
private final SearchRequest request;
|
||||
private final Map<String, Object> response;
|
||||
private final SearchRequest triggerRequest;
|
||||
private final Map<String, Object> triggerResponse;
|
||||
private final AlertTrigger trigger;
|
||||
|
||||
public TriggerResult(boolean triggered, SearchRequest request, Map<String, Object> response, AlertTrigger trigger) {
|
||||
private SearchRequest payloadRequest = null;
|
||||
private Map<String, Object> payloadResponse = null;
|
||||
|
||||
public TriggerResult(boolean triggered, SearchRequest triggerRequest, Map<String, Object> triggerResponse, AlertTrigger trigger) {
|
||||
this.triggered = triggered;
|
||||
this.request = request;
|
||||
this.response = response;
|
||||
this.triggerRequest = triggerRequest;
|
||||
this.triggerResponse = triggerResponse;
|
||||
this.trigger = trigger;
|
||||
}
|
||||
|
||||
|
@ -38,16 +41,61 @@ public class TriggerResult {
|
|||
this.throttled = throttled;
|
||||
}
|
||||
|
||||
public SearchRequest getRequest() {
|
||||
return request;
|
||||
/**
|
||||
* Get's the request to trigger
|
||||
*/
|
||||
public SearchRequest getTriggerRequest() {
|
||||
return triggerRequest;
|
||||
}
|
||||
|
||||
public Map<String, Object> getResponse() {
|
||||
return response;
|
||||
/**
|
||||
* The response from the trigger request
|
||||
* @return
|
||||
*/
|
||||
public Map<String, Object> getTriggerResponse() {
|
||||
return triggerResponse;
|
||||
}
|
||||
|
||||
/**
|
||||
* The request to generate the payloads for the alert actions
|
||||
* @return
|
||||
*/
|
||||
public SearchRequest getPayloadRequest() {
|
||||
return payloadRequest;
|
||||
}
|
||||
|
||||
public void setPayloadRequest(SearchRequest payloadRequest) {
|
||||
this.payloadRequest = payloadRequest;
|
||||
}
|
||||
|
||||
/**
|
||||
* The response from the payload request
|
||||
* @return
|
||||
*/
|
||||
public Map<String,Object> getPayloadResponse() {
|
||||
return payloadResponse;
|
||||
}
|
||||
|
||||
public void setPayloadResponse(Map<String, Object> payloadResponse) {
|
||||
this.payloadResponse = payloadResponse;
|
||||
}
|
||||
|
||||
public AlertTrigger getTrigger() {
|
||||
return trigger;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the response the actions should use
|
||||
*/
|
||||
public Map<String, Object> getActionResponse() {
|
||||
return payloadResponse != null ? payloadResponse : triggerResponse;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the request the actions should use
|
||||
*/
|
||||
public SearchRequest getActionRequest() {
|
||||
return payloadRequest != null ? payloadRequest : triggerRequest;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -38,7 +38,12 @@
|
|||
"enabled" : false,
|
||||
"dynamic" : true
|
||||
},
|
||||
"request": {
|
||||
"trigger_request": {
|
||||
"type" : "object",
|
||||
"enabled" : false,
|
||||
"dynamic" : true
|
||||
},
|
||||
"payload_request": {
|
||||
"type" : "object",
|
||||
"enabled" : false,
|
||||
"dynamic" : true
|
||||
|
|
|
@ -106,7 +106,7 @@ public abstract class AbstractAlertingTests extends ElasticsearchIntegrationTest
|
|||
}
|
||||
builder.startObject();
|
||||
builder.field("schedule", cron);
|
||||
builder.field("request");
|
||||
builder.field("trigger_request");
|
||||
AlertUtils.writeSearchRequest(request, builder, ToXContent.EMPTY_PARAMS);
|
||||
|
||||
if (metadata != null) {
|
||||
|
@ -133,7 +133,7 @@ public abstract class AbstractAlertingTests extends ElasticsearchIntegrationTest
|
|||
protected SearchRequest createTriggerSearchRequest(String... indices) {
|
||||
SearchRequest request = new SearchRequest(indices);
|
||||
request.indicesOptions(AlertUtils.DEFAULT_INDICES_OPTIONS);
|
||||
request.searchType(AlertUtils.DEFAULT_SEARCH_TYPE);
|
||||
request.searchType(AlertUtils.DEFAULT_TRIGGER_SEARCH_TYPE);
|
||||
return request;
|
||||
}
|
||||
|
||||
|
|
|
@ -15,7 +15,6 @@ import org.elasticsearch.common.xcontent.ToXContent;
|
|||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.common.xcontent.XContentFactory;
|
||||
import org.elasticsearch.script.ScriptService;
|
||||
import org.elasticsearch.test.ElasticsearchIntegrationTest;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.ArrayList;
|
||||
|
@ -23,17 +22,21 @@ import java.util.HashMap;
|
|||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
public class AlertSerializationTest extends ElasticsearchIntegrationTest {
|
||||
import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
|
||||
import static org.elasticsearch.search.builder.SearchSourceBuilder.searchSource;
|
||||
|
||||
public class AlertSerializationTest extends AbstractAlertingTests {
|
||||
|
||||
@Test
|
||||
public void testAlertSerialization() throws Exception {
|
||||
|
||||
SearchRequest request = new SearchRequest();
|
||||
request.indices("my-index");
|
||||
SearchRequest triggerRequest = createTriggerSearchRequest("my-trigger-index").source(searchSource().query(matchAllQuery()));
|
||||
SearchRequest payloadRequest = createTriggerSearchRequest("my-payload-index").source(searchSource().query(matchAllQuery()));
|
||||
|
||||
List<AlertAction> actions = new ArrayList<>();
|
||||
actions.add(new EmailAlertAction("message", "foo@bar.com"));
|
||||
Alert alert = new Alert("test-serialization",
|
||||
request,
|
||||
triggerRequest,
|
||||
new ScriptedTrigger("return true", ScriptService.ScriptType.INLINE, "groovy"),
|
||||
actions,
|
||||
"0/5 * * * * ? *",
|
||||
|
@ -42,8 +45,9 @@ public class AlertSerializationTest extends ElasticsearchIntegrationTest {
|
|||
new TimeValue(0),
|
||||
AlertAckState.NOT_TRIGGERED);
|
||||
|
||||
alert.setPayloadSearchRequest(payloadRequest);
|
||||
Map<String, Object> metadata = new HashMap<>();
|
||||
metadata.put("foo","bar");
|
||||
metadata.put("foo", "bar");
|
||||
metadata.put("list", "baz");
|
||||
alert.setMetadata(metadata);
|
||||
|
||||
|
@ -51,14 +55,16 @@ public class AlertSerializationTest extends ElasticsearchIntegrationTest {
|
|||
alert.toXContent(jsonBuilder, ToXContent.EMPTY_PARAMS);
|
||||
|
||||
final AlertsStore alertsStore =
|
||||
internalCluster().getInstance(AlertsStore.class, internalCluster().getMasterName());
|
||||
internalTestCluster().getInstance(AlertsStore.class, internalTestCluster().getMasterName());
|
||||
|
||||
Alert parsedAlert = alertsStore.parseAlert("test-serialization", jsonBuilder.bytes());
|
||||
|
||||
assertEquals(parsedAlert.getVersion(), alert.getVersion());
|
||||
assertEquals(parsedAlert.getActions(), alert.getActions());
|
||||
assertEquals(parsedAlert.getLastExecuteTime().getMillis(), alert.getLastExecuteTime().getMillis());
|
||||
assertEquals(parsedAlert.getSchedule(), alert.getSchedule());
|
||||
assertEquals(parsedAlert.getSearchRequest().source(), alert.getSearchRequest().source());
|
||||
assertEquals(parsedAlert.getTriggerSearchRequest().indices()[0], "my-trigger-index");
|
||||
assertEquals(parsedAlert.getPayloadSearchRequest().indices()[0], "my-payload-index");
|
||||
assertEquals(parsedAlert.getTrigger(), alert.getTrigger());
|
||||
assertEquals(parsedAlert.getThrottlePeriod(), alert.getThrottlePeriod());
|
||||
if (parsedAlert.getTimeLastActionExecuted() == null) {
|
||||
|
@ -68,7 +74,4 @@ public class AlertSerializationTest extends ElasticsearchIntegrationTest {
|
|||
assertEquals(parsedAlert.getMetadata(), alert.getMetadata());
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
}
|
||||
|
|
|
@ -52,11 +52,11 @@ public class AlertThrottleTests extends AbstractAlertingTests {
|
|||
|
||||
Alert alert = new Alert();
|
||||
alert.setAckState(AlertAckState.NOT_TRIGGERED);
|
||||
alert.setSearchRequest(createTriggerSearchRequest("test-index").source(searchSource().query(matchAllQuery())));
|
||||
|
||||
alert.setTriggerSearchRequest(createTriggerSearchRequest("test-index").source(searchSource().query(matchAllQuery())));
|
||||
alert.setTrigger(new ScriptedTrigger("hits.total > 0", ScriptService.ScriptType.INLINE, "groovy"));
|
||||
alert.getActions().add(new IndexAlertAction("action-index", "action-type"));
|
||||
alert.setSchedule("0/5 * * * * ? *");
|
||||
|
||||
alert.lastExecuteTime(new DateTime());
|
||||
XContentBuilder jsonBuilder = XContentFactory.jsonBuilder();
|
||||
alert.toXContent(jsonBuilder, ToXContent.EMPTY_PARAMS);
|
||||
|
@ -124,10 +124,11 @@ public class AlertThrottleTests extends AbstractAlertingTests {
|
|||
|
||||
Alert alert = new Alert();
|
||||
alert.setAckState(AlertAckState.NOT_ACKABLE);
|
||||
alert.setSearchRequest(createTriggerSearchRequest("test-index").source(searchSource().query(matchAllQuery())));
|
||||
alert.setTriggerSearchRequest(createTriggerSearchRequest("test-index").source(searchSource().query(matchAllQuery())));
|
||||
alert.setTrigger(new ScriptedTrigger("hits.total > 0", ScriptService.ScriptType.INLINE, "groovy"));
|
||||
alert.getActions().add(new IndexAlertAction("action-index", "action-type"));
|
||||
alert.setSchedule("0/5 * * * * ? *");
|
||||
|
||||
alert.lastExecuteTime(new DateTime());
|
||||
alert.setThrottlePeriod(new TimeValue(10, TimeUnit.SECONDS));
|
||||
|
||||
|
|
|
@ -0,0 +1,75 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.alerts;
|
||||
|
||||
import org.elasticsearch.action.search.SearchRequest;
|
||||
import org.elasticsearch.action.search.SearchResponse;
|
||||
import org.elasticsearch.alerts.actions.AlertAction;
|
||||
import org.elasticsearch.alerts.actions.IndexAlertAction;
|
||||
import org.elasticsearch.alerts.transport.actions.put.PutAlertResponse;
|
||||
import org.elasticsearch.alerts.triggers.ScriptedTrigger;
|
||||
import org.elasticsearch.common.joda.time.DateTime;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.xcontent.ToXContent;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.common.xcontent.XContentFactory;
|
||||
import org.elasticsearch.script.ScriptService;
|
||||
import org.elasticsearch.search.SearchHit;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
|
||||
import static org.elasticsearch.search.builder.SearchSourceBuilder.searchSource;
|
||||
import static org.hamcrest.Matchers.greaterThan;
|
||||
|
||||
/**
|
||||
*/
|
||||
public class PayloadSearchTest extends AbstractAlertingTests {
|
||||
|
||||
@Test
|
||||
public void testPayloadSearchRequest() throws Exception {
|
||||
createIndex("my-trigger-index", "my-payload-index", "my-payload-output");
|
||||
ensureGreen("my-trigger-index", "my-payload-index", "my-payload-output");
|
||||
|
||||
index("my-payload-index","payload", "mytestresult");
|
||||
refresh();
|
||||
|
||||
SearchRequest triggerRequest = createTriggerSearchRequest("my-trigger-index").source(searchSource().query(matchAllQuery()));
|
||||
SearchRequest payloadRequest = createTriggerSearchRequest("my-payload-index").source(searchSource().query(matchAllQuery()));
|
||||
payloadRequest.searchType(AlertUtils.DEFAULT_PAYLOAD_SEARCH_TYPE);
|
||||
|
||||
List<AlertAction> actions = new ArrayList<>();
|
||||
actions.add(new IndexAlertAction("my-payload-output","result"));
|
||||
Alert alert = new Alert("test-payload",
|
||||
triggerRequest,
|
||||
new ScriptedTrigger("return true", ScriptService.ScriptType.INLINE, "groovy"),
|
||||
actions,
|
||||
"0/5 * * * * ? *",
|
||||
new DateTime(),
|
||||
0,
|
||||
new TimeValue(0),
|
||||
AlertAckState.NOT_ACKABLE);
|
||||
|
||||
alert.setPayloadSearchRequest(payloadRequest);
|
||||
XContentBuilder jsonBuilder = XContentFactory.jsonBuilder();
|
||||
alert.toXContent(jsonBuilder, ToXContent.EMPTY_PARAMS);
|
||||
PutAlertResponse putAlertResponse = alertClient().preparePutAlert("test-payload").setAlertSource(jsonBuilder.bytes()).get();
|
||||
assertTrue(putAlertResponse.indexResponse().isCreated());
|
||||
|
||||
assertAlertTriggered("test-payload", 1, false);
|
||||
refresh();
|
||||
SearchRequest searchRequest = client().prepareSearch("my-payload-output").request();
|
||||
searchRequest.source(searchSource().query(matchAllQuery()));
|
||||
SearchResponse searchResponse = client().search(searchRequest).actionGet();
|
||||
assertThat(searchResponse.getHits().getTotalHits(), greaterThan(0L));
|
||||
SearchHit hit = searchResponse.getHits().getHits()[0];
|
||||
String source = hit.getSourceRef().toUtf8();
|
||||
|
||||
assertTrue(source.contains("mytestresult"));
|
||||
}
|
||||
}
|
|
@ -19,7 +19,6 @@ import org.elasticsearch.alerts.transport.actions.put.PutAlertRequest;
|
|||
import org.elasticsearch.alerts.transport.actions.put.PutAlertResponse;
|
||||
import org.elasticsearch.alerts.triggers.AlertTrigger;
|
||||
import org.elasticsearch.alerts.triggers.ScriptedTrigger;
|
||||
import org.elasticsearch.alerts.triggers.TriggerManager;
|
||||
import org.elasticsearch.alerts.triggers.TriggerResult;
|
||||
import org.elasticsearch.common.joda.time.DateTime;
|
||||
import org.elasticsearch.common.joda.time.DateTimeZone;
|
||||
|
@ -70,8 +69,8 @@ public class AlertActionsTest extends AbstractAlertingTests {
|
|||
builder.startObject();
|
||||
builder.field(AlertActionManager.ALERT_NAME_FIELD, "testName");
|
||||
builder.field(AlertActionManager.TRIGGERED_FIELD, true);
|
||||
builder.field(AlertActionManager.FIRE_TIME_FIELD, TriggerManager.dateTimeFormatter.printer().print(fireTime));
|
||||
builder.field(AlertActionManager.SCHEDULED_FIRE_TIME_FIELD, TriggerManager.dateTimeFormatter.printer().print(scheduledFireTime));
|
||||
builder.field(AlertActionManager.FIRE_TIME_FIELD, AlertUtils.dateTimeFormatter.printer().print(fireTime));
|
||||
builder.field(AlertActionManager.SCHEDULED_FIRE_TIME_FIELD, AlertUtils.dateTimeFormatter.printer().print(scheduledFireTime));
|
||||
builder.field(AlertActionManager.TRIGGER_FIELD, triggerMap);
|
||||
SearchRequest searchRequest = new SearchRequest("test123");
|
||||
builder.field(AlertActionManager.REQUEST);
|
||||
|
@ -96,7 +95,7 @@ public class AlertActionsTest extends AbstractAlertingTests {
|
|||
assertEquals(actionEntry.getScheduledTime(), scheduledFireTime);
|
||||
assertEquals(actionEntry.getFireTime(), fireTime);
|
||||
assertEquals(actionEntry.getState(), AlertActionState.SEARCH_NEEDED);
|
||||
assertEquals(XContentMapValues.extractValue("hits.total", actionEntry.getSearchResponse()), 10);
|
||||
assertEquals(XContentMapValues.extractValue("hits.total", actionEntry.getTriggerResponse()), 10);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
Loading…
Reference in New Issue