Replacing "trigger" with "condition"

- renamed "trigger" notion to "condition"

- the main parts that make an alert are:
 - **schedule** - determines when/how often should the alert be checked
 - **condition** - determines whether the alert should execute
 - **actions** - define what the alert should do

- the lifecycle terminology of a fired alert changed as well, to the following
 - **fired** - the schedule fired an event indicating the alert should be **checked**
 - **checked** - the condition associated with the alert was checked - either it was met (indicating the the alert should be executed) or it wasn't (indicating the alert should not be executed)
 - **throttled** - although the condition was met, the system decided **not** to execute the alert after all based on the throttling logic
 - **executed** - the condition of the alert was met, and the system decided it should not throttle it, thefore the actions of the alert were executed.

- `FiredAlert.State` changed to reflect the new lifecycle (as described above)

Original commit: elastic/x-pack-elasticsearch@d67d13d982
This commit is contained in:
uboness 2015-02-17 12:26:41 +01:00
parent 61761286e0
commit 37d9fd062e
45 changed files with 719 additions and 744 deletions

View File

@ -13,8 +13,8 @@ import org.elasticsearch.alerts.throttle.AlertThrottler;
import org.elasticsearch.alerts.throttle.Throttler;
import org.elasticsearch.alerts.transform.Transform;
import org.elasticsearch.alerts.transform.TransformRegistry;
import org.elasticsearch.alerts.trigger.Trigger;
import org.elasticsearch.alerts.trigger.TriggerRegistry;
import org.elasticsearch.alerts.condition.Condition;
import org.elasticsearch.alerts.condition.ConditionRegistry;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.bytes.BytesReference;
@ -41,7 +41,7 @@ public class Alert implements ToXContent {
private final String name;
private final Schedule schedule;
private final Trigger trigger;
private final Condition condition;
private final Actions actions;
private final Throttler throttler;
private final Status status;
@ -53,10 +53,10 @@ public class Alert implements ToXContent {
@Nullable
private final Transform transform;
public Alert(String name, Schedule schedule, Trigger trigger, Transform transform, TimeValue throttlePeriod, Actions actions, Map<String, Object> metadata, Status status) {
public Alert(String name, Schedule schedule, Condition condition, Transform transform, TimeValue throttlePeriod, Actions actions, Map<String, Object> metadata, Status status) {
this.name = name;
this.schedule = schedule;
this.trigger = trigger;
this.condition = condition;
this.actions = actions;
this.status = status != null ? status : new Status();
this.throttlePeriod = throttlePeriod;
@ -74,8 +74,8 @@ public class Alert implements ToXContent {
return schedule;
}
public Trigger trigger() {
return trigger;
public Condition condition() {
return condition;
}
public Transform transform() {
@ -133,7 +133,7 @@ public class Alert implements ToXContent {
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(Parser.SCHEDULE_FIELD.getPreferredName()).startObject().field(schedule.type(), schedule).endObject();
builder.field(Parser.TRIGGER_FIELD.getPreferredName()).startObject().field(trigger.type(), trigger).endObject();
builder.field(Parser.CONDITION_FIELD.getPreferredName()).startObject().field(condition.type(), condition).endObject();
if (transform != Transform.NOOP) {
builder.field(Parser.TRANSFORM_FIELD.getPreferredName()).startObject().field(transform.type(), transform).endObject();
}
@ -152,24 +152,24 @@ public class Alert implements ToXContent {
public static class Parser extends AbstractComponent {
public static final ParseField SCHEDULE_FIELD = new ParseField("schedule");
public static final ParseField TRIGGER_FIELD = new ParseField("trigger");
public static final ParseField CONDITION_FIELD = new ParseField("condition");
public static final ParseField ACTIONS_FIELD = new ParseField("actions");
public static final ParseField TRANSFORM_FIELD = new ParseField("transform");
public static final ParseField META_FIELD = new ParseField("meta");
public static final ParseField STATUS_FIELD = new ParseField("status");
public static final ParseField THROTTLE_PERIOD_FIELD = new ParseField("throttle_period");
private final TriggerRegistry triggerRegistry;
private final ConditionRegistry conditionRegistry;
private final ScheduleRegistry scheduleRegistry;
private final TransformRegistry transformRegistry;
private final ActionRegistry actionRegistry;
@Inject
public Parser(Settings settings, TriggerRegistry triggerRegistry, ScheduleRegistry scheduleRegistry,
public Parser(Settings settings, ConditionRegistry conditionRegistry, ScheduleRegistry scheduleRegistry,
TransformRegistry transformRegistry, ActionRegistry actionRegistry) {
super(settings);
this.triggerRegistry = triggerRegistry;
this.conditionRegistry = conditionRegistry;
this.scheduleRegistry = scheduleRegistry;
this.transformRegistry = transformRegistry;
this.actionRegistry = actionRegistry;
@ -188,7 +188,7 @@ public class Alert implements ToXContent {
public Alert parse(String name, boolean includeStatus, XContentParser parser) throws IOException {
Schedule schedule = null;
Trigger trigger = null;
Condition condition = null;
Actions actions = null;
Transform transform = null;
Map<String, Object> metatdata = null;
@ -205,8 +205,8 @@ public class Alert implements ToXContent {
} else if ((token.isValue() || token == XContentParser.Token.START_OBJECT || token == XContentParser.Token.START_ARRAY) && currentFieldName !=null ) {
if (SCHEDULE_FIELD.match(currentFieldName)) {
schedule = scheduleRegistry.parse(parser);
} else if (TRIGGER_FIELD.match(currentFieldName)) {
trigger = triggerRegistry.parse(parser);
} else if (CONDITION_FIELD.match(currentFieldName)) {
condition = conditionRegistry.parse(parser);
} else if (ACTIONS_FIELD.match(currentFieldName)) {
actions = actionRegistry.parseActions(parser);
} else if (TRANSFORM_FIELD.match(currentFieldName)) {
@ -214,7 +214,7 @@ public class Alert implements ToXContent {
} else if (META_FIELD.match(currentFieldName)) {
metatdata = parser.map();
} else if (STATUS_FIELD.match(currentFieldName) && includeStatus) {
status = Status.fromXContent(parser);
status = Status.parse(parser);
} else if (THROTTLE_PERIOD_FIELD.match(currentFieldName)) {
if (token == XContentParser.Token.VALUE_STRING) {
throttlePeriod = TimeValue.parseTimeValue(parser.text(), null);
@ -229,14 +229,14 @@ public class Alert implements ToXContent {
if (schedule == null) {
throw new AlertsSettingsException("could not parse alert [" + name + "]. missing alert schedule");
}
if (trigger == null) {
throw new AlertsSettingsException("could not parse alert [" + name + "]. missing alert trigger");
if (condition == null) {
throw new AlertsSettingsException("could not parse alert [" + name + "]. missing alert condition");
}
if (actions == null) {
throw new AlertsSettingsException("could not parse alert [" + name + "]. missing alert actions");
}
return new Alert(name, schedule, trigger, transform, throttlePeriod, actions, metatdata, status);
return new Alert(name, schedule, condition, transform, throttlePeriod, actions, metatdata, status);
}
}
@ -244,33 +244,34 @@ public class Alert implements ToXContent {
public static class Status implements ToXContent, Streamable {
public static final ParseField TIMESTAMP_FIELD = new ParseField("last_throttled");
public static final ParseField LAST_RAN_FIELD = new ParseField("last_ran");
public static final ParseField LAST_TRIGGERED_FIELD = new ParseField("last_triggered");
public static final ParseField LAST_CHECKED_FIELD = new ParseField("last_checked");
public static final ParseField LAST_MET_CONDITION_FIELD = new ParseField("last_met_condition");
public static final ParseField LAST_THROTTLED_FIELD = new ParseField("last_throttled");
public static final ParseField LAST_EXECUTED_FIELD = new ParseField("last_executed");
public static final ParseField ACK_FIELD = new ParseField("ack");
public static final ParseField STATE_FIELD = new ParseField("state");
public static final ParseField LAST_THROTTLE_FIELD = new ParseField("last_throttle");
public static final ParseField REASON_FIELD = new ParseField("reason");
private transient long version;
private DateTime lastRan;
private DateTime lastTriggered;
private DateTime lastChecked;
private DateTime lastMetCondition;
private Throttle lastThrottle;
private DateTime lastExecuted;
private AckStatus ackStatus;
private Throttle lastThrottle;
public Status() {
this(-1, null, null, null, null, new AckStatus());
}
public Status(Status other) {
this(other.version, other.lastRan, other.lastTriggered, other.lastExecuted, other.lastThrottle, other.ackStatus);
this(other.version, other.lastChecked, other.lastMetCondition, other.lastExecuted, other.lastThrottle, other.ackStatus);
}
private Status(long version, DateTime lastRan, DateTime lastTriggered, DateTime lastExecuted, Throttle lastThrottle, AckStatus ackStatus) {
private Status(long version, DateTime lastChecked, DateTime lastMetCondition, DateTime lastExecuted, Throttle lastThrottle, AckStatus ackStatus) {
this.version = version;
this.lastRan = lastRan;
this.lastTriggered = lastTriggered;
this.lastChecked = lastChecked;
this.lastMetCondition = lastMetCondition;
this.lastExecuted = lastExecuted;
this.lastThrottle = lastThrottle;
this.ackStatus = ackStatus;
@ -284,20 +285,20 @@ public class Alert implements ToXContent {
this.version = version;
}
public boolean ran() {
return lastRan != null;
public boolean checked() {
return lastChecked != null;
}
public DateTime lastRan() {
return lastRan;
public DateTime lastChecked() {
return lastChecked;
}
public boolean triggered() {
return lastTriggered != null;
public boolean metCondition() {
return lastMetCondition != null;
}
public DateTime lastTriggered() {
return lastTriggered;
public DateTime lastMetCondition() {
return lastMetCondition;
}
public boolean executed() {
@ -317,10 +318,19 @@ public class Alert implements ToXContent {
}
/**
* Called whenever an alert is ran
* Called whenever an alert is checked, ie. the condition of the alert is evaluated to see if
* the alert should be executed.
*
* @param metCondition indicates whether the alert's condition was met.
*/
public void onExecute(DateTime timestamp) {
lastRan = timestamp;
public void onCheck(boolean metCondition, DateTime timestamp) {
lastChecked = timestamp;
if (metCondition) {
lastMetCondition = timestamp;
} else if (ackStatus.state == AckStatus.State.ACKED) {
// didn't meet condition now after it met it in the past - we need to reset the ack state
ackStatus = new AckStatus(AckStatus.State.AWAITS_EXECUTION, timestamp);
}
}
/**
@ -331,16 +341,13 @@ public class Alert implements ToXContent {
}
/**
* Notifies this status about the triggered event of an alert run. The state will be updated accordingly -
* if the alert is can be acked and during a run, the alert was not triggered and the current state is {@link org.elasticsearch.alerts.Alert.Status.AckStatus.State#ACKED},
* we then need to reset the state to {@link org.elasticsearch.alerts.Alert.Status.AckStatus.State#AWAITS_EXECUTION}
* Notified this status that the alert was executed. If the current state is {@link org.elasticsearch.alerts.Alert.Status.AckStatus.State#AWAITS_EXECUTION}, it will change to
* {@link org.elasticsearch.alerts.Alert.Status.AckStatus.State#ACKABLE}.
*/
public void onTrigger(boolean triggered, DateTime timestamp) {
if (triggered) {
lastTriggered = timestamp;
} else if (ackStatus.state == AckStatus.State.ACKED) {
// didn't trigger now after it triggered in the past - we need to reset the ack state
ackStatus = new AckStatus(AckStatus.State.AWAITS_EXECUTION, timestamp);
public void onExecution(DateTime timestamp) {
lastExecuted = timestamp;
if (ackStatus.state == AckStatus.State.AWAITS_EXECUTION) {
ackStatus = new AckStatus(AckStatus.State.ACKABLE, timestamp);
}
}
@ -359,22 +366,13 @@ public class Alert implements ToXContent {
return false;
}
/**
* Notified this status that the alert was executed. If the current state is {@link org.elasticsearch.alerts.Alert.Status.AckStatus.State#AWAITS_EXECUTION}, it will change to
* {@link org.elasticsearch.alerts.Alert.Status.AckStatus.State#ACKABLE}.
*/
public void onExecution(DateTime timestamp) {
lastExecuted = timestamp;
if (ackStatus.state == AckStatus.State.AWAITS_EXECUTION) {
ackStatus = new AckStatus(AckStatus.State.ACKABLE, timestamp);
}
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeLong(version);
writeOptionalDate(out, lastRan);
writeOptionalDate(out, lastTriggered);
writeOptionalDate(out, lastChecked);
writeOptionalDate(out, lastMetCondition);
writeOptionalDate(out, lastExecuted);
if (lastThrottle == null) {
out.writeBoolean(false);
@ -390,8 +388,8 @@ public class Alert implements ToXContent {
@Override
public void readFrom(StreamInput in) throws IOException {
version = in.readLong();
lastRan = readOptionalDate(in);
lastTriggered = readOptionalDate(in);
lastChecked = readOptionalDate(in);
lastMetCondition = readOptionalDate(in);
lastExecuted = readOptionalDate(in);
lastThrottle = in.readBoolean() ? new Throttle(readDate(in), in.readString()) : null;
ackStatus = new AckStatus(AckStatus.State.valueOf(in.readString()), readDate(in));
@ -406,11 +404,11 @@ public class Alert implements ToXContent {
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
if (lastRan != null) {
builder.field(LAST_RAN_FIELD.getPreferredName(), lastRan);
if (lastChecked != null) {
builder.field(LAST_CHECKED_FIELD.getPreferredName(), lastChecked);
}
if (lastTriggered != null) {
builder.field(LAST_TRIGGERED_FIELD.getPreferredName(), lastTriggered);
if (lastMetCondition != null) {
builder.field(LAST_MET_CONDITION_FIELD.getPreferredName(), lastMetCondition);
}
if (lastExecuted != null) {
builder.field(LAST_EXECUTED_FIELD.getPreferredName(), lastExecuted);
@ -420,7 +418,7 @@ public class Alert implements ToXContent {
.field(TIMESTAMP_FIELD.getPreferredName(), ackStatus.timestamp)
.endObject();
if (lastThrottle != null) {
builder.startObject(LAST_THROTTLE_FIELD.getPreferredName())
builder.startObject(LAST_THROTTLED_FIELD.getPreferredName())
.field(TIMESTAMP_FIELD.getPreferredName(), lastThrottle.timestamp)
.field(REASON_FIELD.getPreferredName(), lastThrottle.reason)
.endObject();
@ -428,12 +426,12 @@ public class Alert implements ToXContent {
return builder.endObject();
}
public static Status fromXContent(XContentParser parser) throws IOException {
public static Status parse(XContentParser parser) throws IOException {
DateTime lastRan = null;
DateTime lastTriggered = null;
DateTime lastExecuted = null;
DateTime lastChecked = null;
DateTime lastMetCondition = null;
Throttle lastThrottle = null;
DateTime lastExecuted = null;
AckStatus ackStatus = null;
String currentFieldName = null;
@ -441,15 +439,15 @@ public class Alert implements ToXContent {
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
currentFieldName = parser.currentName();
} else if (LAST_RAN_FIELD.match(currentFieldName)) {
} else if (LAST_CHECKED_FIELD.match(currentFieldName)) {
if (token.isValue()) {
lastRan = parseDate(currentFieldName, token, parser);
lastChecked = parseDate(currentFieldName, token, parser);
} else {
throw new AlertsException("expecting field [" + currentFieldName + "] to hold a date value, found [" + token + "] instead");
}
} else if (LAST_TRIGGERED_FIELD.match(currentFieldName)) {
} else if (LAST_MET_CONDITION_FIELD.match(currentFieldName)) {
if (token.isValue()) {
lastTriggered = parseDate(currentFieldName, token, parser);
lastMetCondition = parseDate(currentFieldName, token, parser);
} else {
throw new AlertsException("expecting field [" + currentFieldName + "] to hold a date value, found [" + token + "] instead");
}
@ -459,7 +457,7 @@ public class Alert implements ToXContent {
} else {
throw new AlertsException("expecting field [" + currentFieldName + "] to hold a date value, found [" + token + "] instead");
}
} else if (LAST_THROTTLE_FIELD.match(currentFieldName)) {
} else if (LAST_THROTTLED_FIELD.match(currentFieldName)) {
if (token == XContentParser.Token.START_OBJECT) {
DateTime timestamp = null;
String reason = null;
@ -504,7 +502,7 @@ public class Alert implements ToXContent {
}
}
return new Status(-1, lastRan, lastTriggered, lastExecuted, lastThrottle, ackStatus);
return new Status(-1, lastChecked, lastMetCondition, lastExecuted, lastThrottle, ackStatus);
}

View File

@ -7,9 +7,9 @@ package org.elasticsearch.alerts;
import org.elasticsearch.alerts.actions.Action;
import org.elasticsearch.alerts.actions.ActionRegistry;
import org.elasticsearch.alerts.condition.Condition;
import org.elasticsearch.alerts.condition.ConditionRegistry;
import org.elasticsearch.alerts.throttle.Throttler;
import org.elasticsearch.alerts.trigger.Trigger;
import org.elasticsearch.alerts.trigger.TriggerRegistry;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
@ -24,24 +24,24 @@ import java.util.Map;
*/
public class AlertExecution implements ToXContent {
private final Trigger.Result triggerResult;
private final Condition.Result conditionResult;
private final Throttler.Result throttleResult;
private final Map<String, Action.Result> actionsResults;
private final Payload payload;
public AlertExecution(ExecutionContext context) {
this(context.triggerResult(), context.throttleResult(), context.actionsResults(), context.payload());
this(context.conditionResult(), context.throttleResult(), context.actionsResults(), context.payload());
}
AlertExecution(Trigger.Result triggerResult, Throttler.Result throttleResult, Map<String, Action.Result> actionsResults, Payload payload) {
this.triggerResult = triggerResult;
AlertExecution(Condition.Result conditionResult, Throttler.Result throttleResult, Map<String, Action.Result> actionsResults, Payload payload) {
this.conditionResult = conditionResult;
this.throttleResult = throttleResult;
this.actionsResults = actionsResults;
this.payload = payload;
}
public Trigger.Result triggerResult() {
return triggerResult;
public Condition.Result conditionResult() {
return conditionResult;
}
public Throttler.Result throttleResult() {
@ -59,8 +59,8 @@ public class AlertExecution implements ToXContent {
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
if (triggerResult != null) {
builder.startObject(Parser.TRIGGER_RESULT.getPreferredName()).field(triggerResult.type(), triggerResult).endObject();
if (conditionResult != null) {
builder.startObject(Parser.CONDITION_RESULT.getPreferredName()).field(conditionResult.type(), conditionResult).endObject();
}
if (throttleResult != null && throttleResult.throttle()) {
builder.field(Parser.THROTTLED.getPreferredName(), throttleResult.throttle());
@ -82,17 +82,17 @@ public class AlertExecution implements ToXContent {
public static class Parser {
public static final ParseField TRIGGER_RESULT = new ParseField("trigger_result");
public static final ParseField CONDITION_RESULT = new ParseField("condition_result");
public static final ParseField PAYLOAD = new ParseField("payload");
public static final ParseField ACTIONS_RESULTS = new ParseField("actions_results");
public static final ParseField THROTTLED = new ParseField("throttled");
public static final ParseField THROTTLE_REASON = new ParseField("throttle_reason");
public static AlertExecution parse(XContentParser parser, TriggerRegistry triggerRegistry, ActionRegistry actionRegistry) throws IOException {
public static AlertExecution parse(XContentParser parser, ConditionRegistry conditionRegistry, ActionRegistry actionRegistry) throws IOException {
boolean throttled = false;
String throttleReason = null;
Map<String, Action.Result> actionResults = new HashMap<>();
Trigger.Result triggerResult = null;
Condition.Result conditionResult = null;
Payload payload = null;
String currentFieldName = null;
@ -109,10 +109,10 @@ public class AlertExecution implements ToXContent {
throw new AlertsException("unable to parse alert run. unexpected field [" + currentFieldName + "]");
}
} else if (token == XContentParser.Token.START_OBJECT) {
if (TRIGGER_RESULT.match(currentFieldName)) {
triggerResult = triggerRegistry.parseResult(parser);
if (CONDITION_RESULT.match(currentFieldName)) {
conditionResult = conditionRegistry.parseResult(parser);
} else if (PAYLOAD.match(currentFieldName)) {
payload = new Payload.Simple(parser.map()); //TODO fixme
payload = new Payload.XContent(parser);
} else {
throw new AlertsException("unable to parse alert run. unexpected field [" + currentFieldName + "]");
}
@ -128,14 +128,12 @@ public class AlertExecution implements ToXContent {
}
Throttler.Result throttleResult = throttled ? Throttler.Result.throttle(throttleReason) : Throttler.Result.NO;
return new AlertExecution(triggerResult, throttleResult, actionResults, payload );
return new AlertExecution(conditionResult, throttleResult, actionResults, payload );
}
private static Map<String, Action.Result> parseActionResults(XContentParser parser, ActionRegistry actionRegistry) throws IOException {
Map<String, Action.Result> actionResults = new HashMap<>();
String currentFieldName = null;
XContentParser.Token token;
while ((token = parser.nextToken()) != XContentParser.Token.END_ARRAY) {
Action.Result actionResult = actionRegistry.parseResult(parser);

View File

@ -15,7 +15,7 @@ import org.elasticsearch.alerts.support.TemplateUtils;
import org.elasticsearch.alerts.support.init.InitializingModule;
import org.elasticsearch.alerts.transform.TransformModule;
import org.elasticsearch.alerts.transport.AlertsTransportModule;
import org.elasticsearch.alerts.trigger.TriggerModule;
import org.elasticsearch.alerts.condition.ConditionModule;
import org.elasticsearch.common.collect.ImmutableList;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.inject.Module;
@ -33,7 +33,7 @@ public class AlertsModule extends AbstractModule implements SpawnModules {
new AlertsRestModule(),
new SchedulerModule(),
new AlertsTransportModule(),
new TriggerModule(),
new ConditionModule(),
new ActionModule(),
new HistoryModule());
}

View File

@ -61,7 +61,7 @@ public class AlertsPlugin extends AbstractPlugin {
}
public static Settings alertThreadPoolSettings(int availableProcessors, Integer queueSize) {
// Executing an alert involves a lot of wait time for networking (search, several index requests + optional trigger logic)
// Executing an alert involves a lot of wait time for networking (search, several index requests + optional condition logic)
//TODO Hack to get around threadpool issue
if (queueSize != null) {
return settingsBuilder()

View File

@ -63,6 +63,54 @@ public class AlertsService extends AbstractComponent {
manuallyStopped = !settings.getAsBoolean("alerts.start_immediately", true);
}
/**
* Manually starts alerting if not already started
*/
public void start() {
manuallyStopped = false;
ClusterState state = clusterService.state();
internalStart(state);
}
/**
* Manually stops alerting if not already stopped.
*/
public void stop() {
manuallyStopped = true;
internalStop();
}
private void internalStop() {
if (state.compareAndSet(State.STARTED, State.STOPPING)) {
logger.info("stopping alert service...");
alertLockService.stop();
historyService.stop();
scheduler.stop();
alertsStore.stop();
state.set(State.STOPPED);
logger.info("alert service has stopped");
}
}
private void internalStart(ClusterState initialState) {
if (state.compareAndSet(State.STOPPED, State.STARTING)) {
logger.info("starting alert service...");
alertLockService.start();
ClusterState clusterState = initialState;
// Try to load alert store before the action service, b/c action depends on alert store
while (!alertsStore.start(clusterState)) {
clusterState = newClusterState(clusterState);
}
while (!historyService.start(clusterState)) {
clusterState = newClusterState(clusterState);
}
scheduler.start(alertsStore.getAlerts().values());
state.set(State.STARTED);
logger.info("alert service has started");
}
}
public AlertsStore.AlertDelete deleteAlert(String name) throws InterruptedException, ExecutionException {
ensureStarted();
AlertLockService.Lock lock = alertLockService.acquire(name);
@ -127,52 +175,8 @@ public class AlertsService extends AbstractComponent {
}
}
/**
* Manually starts alerting if not already started
*/
public void start() {
manuallyStopped = false;
ClusterState state = clusterService.state();
internalStart(state);
}
/**
* Manually stops alerting if not already stopped.
*/
public void stop() {
manuallyStopped = true;
internalStop();
}
private void internalStop() {
if (state.compareAndSet(State.STARTED, State.STOPPING)) {
logger.info("stopping alert service...");
alertLockService.stop();
historyService.stop();
scheduler.stop();
alertsStore.stop();
state.set(State.STOPPED);
logger.info("alert service has stopped");
}
}
private void internalStart(ClusterState initialState) {
if (state.compareAndSet(State.STOPPED, State.STARTING)) {
logger.info("starting alert service...");
alertLockService.start();
ClusterState clusterState = initialState;
// Try to load alert store before the action service, b/c action depends on alert store
while (!alertsStore.start(clusterState)) {
clusterState = newClusterState(clusterState);
}
while (!historyService.start(clusterState)) {
clusterState = newClusterState(clusterState);
}
scheduler.start(alertsStore.getAlerts().values());
state.set(State.STARTED);
logger.info("alert service has started");
}
public long getNumberOfAlerts() {
return alertsStore.getAlerts().size();
}
private void ensureStarted() {
@ -181,10 +185,6 @@ public class AlertsService extends AbstractComponent {
}
}
public long getNumberOfAlerts() {
return alertsStore.getAlerts().size();
}
/**
* Return once a cluster state version appears that is never than the version
*/
@ -216,7 +216,7 @@ public class AlertsService extends AbstractComponent {
} else {
if (event.state().blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK)) {
// wait until the gateway has recovered from disk, otherwise we think may not have .alerts and
// a .alertshistory index, but they may not have been restored from the cluster state on disk
// a .alerts_history index, but they may not have been restored from the cluster state on disk
return;
}
if (state.get() == State.STOPPED && !manuallyStopped) {

View File

@ -8,7 +8,7 @@ package org.elasticsearch.alerts;
import org.elasticsearch.alerts.actions.Action;
import org.elasticsearch.alerts.throttle.Throttler;
import org.elasticsearch.alerts.transform.Transform;
import org.elasticsearch.alerts.trigger.Trigger;
import org.elasticsearch.alerts.condition.Condition;
import org.elasticsearch.common.joda.time.DateTime;
import java.util.HashMap;
@ -24,7 +24,7 @@ public class ExecutionContext {
private final DateTime fireTime;
private final DateTime scheduledTime;
private Trigger.Result triggerResult;
private Condition.Result conditionResult;
private Throttler.Result throttleResult;
private Transform.Result transformResult;
private Map<String, Action.Result> actionsResults = new HashMap<>();
@ -58,14 +58,14 @@ public class ExecutionContext {
return payload;
}
public void onTriggerResult(Trigger.Result triggerResult) {
this.triggerResult = triggerResult;
this.payload = triggerResult.payload();
alert.status().onTrigger(triggerResult.triggered(), fireTime);
public void onConditionResult(Condition.Result conditionResult) {
this.conditionResult = conditionResult;
this.payload = conditionResult.payload();
alert.status().onCheck(conditionResult.met(), fireTime);
}
public Trigger.Result triggerResult() {
return triggerResult;
public Condition.Result conditionResult() {
return conditionResult;
}
public void onThrottleResult(Throttler.Result throttleResult) {
@ -99,7 +99,6 @@ public class ExecutionContext {
}
public AlertExecution finish() {
alert.status().onExecute(fireTime);
return new AlertExecution(this);
}

View File

@ -42,7 +42,7 @@ public abstract class Action<R extends Action.Result> implements ToXContent {
/**
* Parses xcontent to a concrete action of the same type.
*/
protected static interface Parser<T extends Action> {
protected static interface Parser<R extends Result, T extends Action<R>> {
/**
* @return The type of the action
@ -54,7 +54,7 @@ public abstract class Action<R extends Action.Result> implements ToXContent {
*/
T parse(XContentParser parser) throws IOException;
T.Result parseResult(XContentParser parser) throws IOException;
R parseResult(XContentParser parser) throws IOException;
}

View File

@ -27,9 +27,6 @@ public class ActionRegistry {
/**
* Reads the contents of parser to create the correct Action
* @param parser The parser containing the trigger definition
* @return a new Action instance from the parser
* @throws IOException
*/
public Action parse(XContentParser parser) throws IOException {
String type = null;
@ -39,11 +36,11 @@ public class ActionRegistry {
if (token == XContentParser.Token.FIELD_NAME) {
type = parser.currentName();
} else if (token == XContentParser.Token.START_OBJECT && type != null) {
Action.Parser triggerParser = parsers.get(type);
if (triggerParser == null) {
Action.Parser actionParser = parsers.get(type);
if (actionParser == null) {
throw new ActionException("unknown action type [" + type + "]");
}
action = triggerParser.parse(parser);
action = actionParser.parse(parser);
}
}
return action;
@ -51,8 +48,9 @@ public class ActionRegistry {
/**
* Reads the contents of parser to create the correct Action.Result
* @param parser The parser containing the trigger definition
* @return a new Action.Result instance from the parser
*
* @param parser The parser containing the action definition
* @return A new Action.Result instance from the parser
* @throws IOException
*/
public Action.Result parseResult(XContentParser parser) throws IOException {
@ -63,11 +61,11 @@ public class ActionRegistry {
if (token == XContentParser.Token.FIELD_NAME) {
type = parser.currentName();
} else if (token == XContentParser.Token.START_OBJECT && type != null) {
Action.Parser triggerParser = parsers.get(type);
if (triggerParser == null) {
Action.Parser actionParser = parsers.get(type);
if (actionParser == null) {
throw new ActionException("unknown action type [" + type + "]");
}
result = triggerParser.parseResult(parser);
result = actionParser.parseResult(parser);
}
}
return result;

View File

@ -115,7 +115,7 @@ public class EmailAction extends Action<EmailAction.Result> {
return builder.endObject();
}
public static class Parser extends AbstractComponent implements Action.Parser<EmailAction> {
public static class Parser extends AbstractComponent implements Action.Parser<Result, EmailAction> {
public static final ParseField ACCOUNT_FIELD = new ParseField("account");
public static final ParseField PROFILE_FIELD = new ParseField("profile");

View File

@ -90,7 +90,7 @@ public class IndexAction extends Action<IndexAction.Result> {
return builder;
}
public static class Parser extends AbstractComponent implements Action.Parser<IndexAction> {
public static class Parser extends AbstractComponent implements Action.Parser<Result, IndexAction> {
public static final ParseField INDEX_FIELD = new ParseField("index");
public static final ParseField TYPE_FIELD = new ParseField("type");
@ -146,7 +146,7 @@ public class IndexAction extends Action<IndexAction.Result> {
}
@Override
public Action.Result parseResult(XContentParser parser) throws IOException {
public Result parseResult(XContentParser parser) throws IOException {
String currentFieldName = null;
XContentParser.Token token;
Boolean success = null;

View File

@ -157,7 +157,7 @@ public class WebhookAction extends Action<WebhookAction.Result> {
}
public static class Parser extends AbstractComponent implements Action.Parser<WebhookAction> {
public static class Parser extends AbstractComponent implements Action.Parser<Result, WebhookAction> {
public static final ParseField METHOD_FIELD = new ParseField("method");
public static final ParseField URL_TEMPLATE_FIELD = new ParseField("url_template");
@ -222,7 +222,7 @@ public class WebhookAction extends Action<WebhookAction.Result> {
}
@Override
public Action.Result parseResult(XContentParser parser) throws IOException {
public Result parseResult(XContentParser parser) throws IOException {
String currentFieldName = null;
XContentParser.Token token;
Boolean success = null;

View File

@ -3,7 +3,7 @@
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.alerts.trigger;
package org.elasticsearch.alerts.condition;
import org.elasticsearch.alerts.ExecutionContext;
import org.elasticsearch.alerts.Payload;
@ -18,58 +18,58 @@ import java.io.IOException;
/**
*
*/
public abstract class Trigger<R extends Trigger.Result> implements ToXContent {
public abstract class Condition<R extends Condition.Result> implements ToXContent {
protected final ESLogger logger;
protected Trigger(ESLogger logger) {
protected Condition(ESLogger logger) {
this.logger = logger;
}
/**
* @return the type of this trigger
* @return the type of this condition
*/
public abstract String type();
/**
* Executes this trigger
* Executes this condition
*/
public abstract R execute(ExecutionContext ctx) throws IOException;
/**
* Parses xcontent to a concrete trigger of the same type.
* Parses xcontent to a concrete condition of the same type.
*/
public static interface Parser<T extends Trigger> {
public static interface Parser<R extends Condition.Result, C extends Condition<R>> {
/**
* @return The type of the trigger
* @return The type of the condition
*/
String type();
/**
* Parses the given xcontent and creates a concrete trigger
* Parses the given xcontent and creates a concrete condition
*/
T parse(XContentParser parser) throws IOException;
C parse(XContentParser parser) throws IOException;
/**
* Parses the given xContent and creates a concrete result
*/
T.Result parseResult(XContentParser parser) throws IOException;
R parseResult(XContentParser parser) throws IOException;
}
public abstract static class Result implements ToXContent {
public static final ParseField TYPE_FIELD = new ParseField("type");
public static final ParseField TRIGGERED_FIELD = new ParseField("triggered");
public static final ParseField MET_FIELD = new ParseField("met");
public static final ParseField PAYLOAD_FIELD = new ParseField("payload");
private final String type;
private final boolean triggered;
private final boolean met;
private final Payload payload;
public Result(String type, boolean triggered, Payload payload) {
public Result(String type, boolean met, Payload payload) {
this.type = type;
this.triggered = triggered;
this.met = met;
this.payload = payload;
}
@ -77,8 +77,8 @@ public abstract class Trigger<R extends Trigger.Result> implements ToXContent {
return type;
}
public boolean triggered() {
return triggered;
public boolean met() {
return met;
}
public Payload payload() {
@ -88,9 +88,8 @@ public abstract class Trigger<R extends Trigger.Result> implements ToXContent {
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject()
.field(TYPE_FIELD.getPreferredName(), type())
.field(TRIGGERED_FIELD.getPreferredName(), triggered())
.field(PAYLOAD_FIELD.getPreferredName(), payload());
.field(MET_FIELD.getPreferredName(), met)
.field(PAYLOAD_FIELD.getPreferredName(), payload);
return toXContentBody(builder, params).endObject();
}

View File

@ -3,20 +3,20 @@
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.alerts.trigger;
package org.elasticsearch.alerts.condition;
import org.elasticsearch.alerts.AlertsException;
/**
*
*/
public class TriggerException extends AlertsException {
public class ConditionException extends AlertsException {
public TriggerException(String msg) {
public ConditionException(String msg) {
super(msg);
}
public TriggerException(String msg, Throwable cause) {
public ConditionException(String msg, Throwable cause) {
super(msg, cause);
}
}

View File

@ -0,0 +1,43 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.alerts.condition;
import org.elasticsearch.alerts.condition.search.ScriptSearchCondition;
import org.elasticsearch.alerts.condition.simple.SimpleCondition;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.inject.multibindings.MapBinder;
import java.util.HashMap;
import java.util.Map;
/**
*
*/
public class ConditionModule extends AbstractModule {
private final Map<String, Class<? extends Condition.Parser>> parsers = new HashMap<>();
public void registerCondition(String type, Class<? extends Condition.Parser> parserType) {
parsers.put(type, parserType);
}
@Override
protected void configure() {
MapBinder<String, Condition.Parser> parsersBinder = MapBinder.newMapBinder(binder(), String.class, Condition.Parser.class);
bind(ScriptSearchCondition.Parser.class).asEagerSingleton();
parsersBinder.addBinding(ScriptSearchCondition.TYPE).to(ScriptSearchCondition.Parser.class);
bind(SimpleCondition.Parser.class).asEagerSingleton();
parsersBinder.addBinding(SimpleCondition.TYPE).to(SimpleCondition.Parser.class);
for (Map.Entry<String, Class<? extends Condition.Parser>> entry : parsers.entrySet()) {
bind(entry.getValue()).asEagerSingleton();
parsersBinder.addBinding(entry.getKey()).to(entry.getValue());
}
bind(ConditionRegistry.class).asEagerSingleton();
}
}

View File

@ -3,9 +3,8 @@
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.alerts.trigger;
package org.elasticsearch.alerts.condition;
import org.elasticsearch.alerts.trigger.search.SearchTrigger;
import org.elasticsearch.common.collect.ImmutableMap;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.xcontent.XContentParser;
@ -16,55 +15,56 @@ import java.util.Map;
/**
*
*/
public class TriggerRegistry {
public class ConditionRegistry {
private final ImmutableMap<String, SearchTrigger.Parser> parsers;
private final ImmutableMap<String, Condition.Parser> parsers;
@Inject
public TriggerRegistry(Map<String, SearchTrigger.Parser> parsers) {
public ConditionRegistry(Map<String, Condition.Parser> parsers) {
this.parsers = ImmutableMap.copyOf(parsers);
}
/**
* Reads the contents of parser to create the correct Trigger
* @param parser The parser containing the trigger definition
* @return a new AlertTrigger instance from the parser
* Reads the contents of parser to create the correct Condition
*
* @param parser The parser containing the condition definition
* @return A new condition instance from the parser
* @throws IOException
*/
public Trigger parse(XContentParser parser) throws IOException {
public Condition parse(XContentParser parser) throws IOException {
String type = null;
XContentParser.Token token;
Trigger trigger = null;
Condition condition = null;
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
type = parser.currentName();
} else if (token == XContentParser.Token.START_OBJECT && type != null) {
SearchTrigger.Parser triggerParser = parsers.get(type);
if (triggerParser == null) {
throw new TriggerException("unknown trigger type [" + type + "]");
Condition.Parser conditionParser = parsers.get(type);
if (conditionParser == null) {
throw new ConditionException("unknown condition type [" + type + "]");
}
trigger = triggerParser.parse(parser);
condition = conditionParser.parse(parser);
}
}
return trigger;
return condition;
}
public Trigger.Result parseResult(XContentParser parser) throws IOException {
public Condition.Result parseResult(XContentParser parser) throws IOException {
String type = null;
XContentParser.Token token;
Trigger.Result triggerResult = null;
Condition.Result conditionResult = null;
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
type = parser.currentName();
} else if (token == XContentParser.Token.START_OBJECT && type != null) {
SearchTrigger.Parser triggerParser = parsers.get(type);
if (triggerParser == null) {
throw new TriggerException("unknown trigger type [" + type + "]");
Condition.Parser conditionParser = parsers.get(type);
if (conditionParser == null) {
throw new ConditionException("unknown condition type [" + type + "]");
}
triggerResult = triggerParser.parseResult(parser);
conditionResult = conditionParser.parseResult(parser);
}
}
return triggerResult;
return conditionResult;
}
}

View File

@ -3,16 +3,16 @@
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.alerts.trigger.search;
package org.elasticsearch.alerts.condition.search;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.alerts.Payload;
import org.elasticsearch.alerts.condition.Condition;
import org.elasticsearch.alerts.condition.ConditionException;
import org.elasticsearch.alerts.support.AlertUtils;
import org.elasticsearch.alerts.support.init.proxy.ClientProxy;
import org.elasticsearch.alerts.support.init.proxy.ScriptServiceProxy;
import org.elasticsearch.alerts.trigger.Trigger;
import org.elasticsearch.alerts.trigger.TriggerException;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
@ -29,7 +29,7 @@ import java.io.IOException;
/**
*
*/
public class ScriptSearchTrigger extends SearchTrigger {
public class ScriptSearchCondition extends SearchCondition {
public static final String TYPE = "script";
@ -37,9 +37,9 @@ public class ScriptSearchTrigger extends SearchTrigger {
private final ScriptService.ScriptType scriptType;
private final String scriptLang;
public ScriptSearchTrigger(ESLogger logger, ScriptServiceProxy scriptService, ClientProxy client,
SearchRequest request, String script, ScriptService.ScriptType scriptType,
String scriptLang) {
public ScriptSearchCondition(ESLogger logger, ScriptServiceProxy scriptService, ClientProxy client,
SearchRequest request, String script, ScriptService.ScriptType scriptType,
String scriptLang) {
super(logger, scriptService, client, request);
this.script = script;
this.scriptType = scriptType;
@ -59,7 +59,7 @@ public class ScriptSearchTrigger extends SearchTrigger {
if (value instanceof Boolean) {
return new Result(TYPE, (Boolean) value, request, payload);
}
throw new TriggerException("trigger script [" + script + "] did not return a boolean value");
throw new ConditionException("condition script [" + script + "] did not return a boolean value");
}
@Override
@ -73,7 +73,7 @@ public class ScriptSearchTrigger extends SearchTrigger {
return builder.endObject();
}
public static class Parser extends AbstractComponent implements SearchTrigger.Parser<ScriptSearchTrigger> {
public static class Parser extends AbstractComponent implements SearchCondition.Parser<Result, ScriptSearchCondition> {
public static ParseField REQUEST_FIELD = new ParseField("request");
public static ParseField SCRIPT_TYPE_FIELD = new ParseField("script_type");
@ -94,7 +94,7 @@ public class ScriptSearchTrigger extends SearchTrigger {
}
@Override
public ScriptSearchTrigger parse(XContentParser parser) throws IOException {
public ScriptSearchCondition parse(XContentParser parser) throws IOException {
SearchRequest request = null;
String scriptLang = null;
@ -108,7 +108,7 @@ public class ScriptSearchTrigger extends SearchTrigger {
currentFieldName = parser.currentName();
} else if ((token.isValue() || token == XContentParser.Token.START_OBJECT) && currentFieldName != null) {
if (REQUEST_FIELD.match(currentFieldName)) {
request = AlertUtils.readSearchRequest(parser, AlertUtils.DEFAULT_TRIGGER_SEARCH_TYPE);
request = AlertUtils.readSearchRequest(parser, DEFAULT_SEARCH_TYPE);
} else if (ScriptService.SCRIPT_ID.match(currentFieldName)) {
script = parser.text();
scriptType = ScriptService.ScriptType.INDEXED;
@ -119,58 +119,55 @@ public class ScriptSearchTrigger extends SearchTrigger {
} else if (ScriptService.SCRIPT_LANG.match(currentFieldName)) {
scriptLang = parser.text();
} else {
throw new TriggerException("could not parse script trigger. unexpected field [" + currentFieldName + "]");
throw new ConditionException("could not parse script condition. unexpected field [" + currentFieldName + "]");
}
}
}
if (request == null) {
throw new TriggerException("could not parse script trigger. missing required search request");
throw new ConditionException("could not parse script condition. missing required search request");
}
if (script == null) {
throw new TriggerException("could not parse script trigger. either [script] or [script_id] must be provided");
throw new ConditionException("could not parse script condition. either [script] or [script_id] must be provided");
}
return new ScriptSearchTrigger(logger, scriptService, client, request, script, scriptType, scriptLang);
return new ScriptSearchCondition(logger, scriptService, client, request, script, scriptType, scriptLang);
}
@Override
public ScriptSearchTrigger.Result parseResult(XContentParser parser) throws IOException {
public ScriptSearchCondition.Result parseResult(XContentParser parser) throws IOException {
String currentFieldName = null;
XContentParser.Token token;
boolean triggered = false;
boolean met = false;
Payload payload = null;
SearchRequest request = null;
String type = null;
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
currentFieldName = parser.currentName();
} else if (token.isValue()) {
if (Trigger.Result.TYPE_FIELD.match(currentFieldName)) {
type = parser.text();
} else if (token == XContentParser.Token.VALUE_BOOLEAN) {
if (Trigger.Result.TRIGGERED_FIELD.match(currentFieldName)) {
triggered = parser.booleanValue();
if (token == XContentParser.Token.VALUE_BOOLEAN) {
if (Condition.Result.MET_FIELD.match(currentFieldName)) {
met = parser.booleanValue();
} else {
throw new TriggerException("unable to parse trigger result. unexpected field [" + currentFieldName + "]");
throw new ConditionException("unable to parse condition result. unexpected field [" + currentFieldName + "]");
}
} else {
throw new TriggerException("unable to parse trigger result. unexpected field [" + currentFieldName + "]");
throw new ConditionException("unable to parse condition result. unexpected field [" + currentFieldName + "]");
}
} else if (token == XContentParser.Token.START_OBJECT) {
if (Trigger.Result.PAYLOAD_FIELD.match(currentFieldName)) {
if (Condition.Result.PAYLOAD_FIELD.match(currentFieldName)) {
payload = new Payload.Simple(parser.map()); ///@TODO FIXME
} else if (REQUEST_FIELD.match(currentFieldName)) {
request = AlertUtils.readSearchRequest(parser);
request = AlertUtils.readSearchRequest(parser, DEFAULT_SEARCH_TYPE);
} else {
throw new TriggerException("unable to parse trigger result. unexpected field [" + currentFieldName + "]");
throw new ConditionException("unable to parse condition result. unexpected field [" + currentFieldName + "]");
}
} else {
throw new TriggerException("unable to parse trigger result. unexpected token [" + token + "]");
throw new ConditionException("unable to parse condition result. unexpected token [" + token + "]");
}
}
return new Result(type, triggered, request, payload);
return new Result(TYPE, met, request, payload);
}
}

View File

@ -0,0 +1,128 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.alerts.condition.search;
import org.elasticsearch.ElasticsearchIllegalStateException;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.alerts.ExecutionContext;
import org.elasticsearch.alerts.Payload;
import org.elasticsearch.alerts.support.AlertUtils;
import org.elasticsearch.alerts.support.Variables;
import org.elasticsearch.alerts.support.init.proxy.ClientProxy;
import org.elasticsearch.alerts.support.init.proxy.ScriptServiceProxy;
import org.elasticsearch.alerts.condition.Condition;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.common.joda.time.DateTime;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.script.ExecutableScript;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.search.SearchHit;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import static org.elasticsearch.alerts.support.AlertsDateUtils.formatDate;
public abstract class SearchCondition extends Condition<SearchCondition.Result> {
public static final SearchType DEFAULT_SEARCH_TYPE = SearchType.COUNT;
protected final ScriptServiceProxy scriptService;
protected final ClientProxy client;
protected final SearchRequest request;
public SearchCondition(ESLogger logger, ScriptServiceProxy scriptService, ClientProxy client, SearchRequest request) {
super(logger);
this.scriptService = scriptService;
this.client = client;
this.request = request;
}
@Override
public Result execute(ExecutionContext ctx) throws IOException {
SearchRequest request = createSearchRequestWithTimes(this.request, ctx.scheduledTime(), ctx.fireTime(), scriptService);
if (logger.isTraceEnabled()) {
logger.trace("running query for [{}]", ctx.alert().name(), XContentHelper.convertToJson(request.source(), false, true));
}
// actionGet deals properly with InterruptedException
SearchResponse response = client.search(request).actionGet();
if (logger.isDebugEnabled()) {
logger.debug("got [{}] hits", ctx.alert().name(), response.getHits().getTotalHits());
for (SearchHit hit : response.getHits()) {
logger.debug("hit [{}]", XContentHelper.toString(hit));
}
}
return processSearchResponse(response);
}
/**
* Processes the search response and returns the appropriate condition result
*/
protected abstract Result processSearchResponse(SearchResponse response);
/**
* Creates a new search request applying the scheduledFireTime and fireTime to the original request
*/
public static SearchRequest createSearchRequestWithTimes(SearchRequest requestPrototype, DateTime scheduledFireTime, DateTime fireTime, ScriptServiceProxy scriptService) throws IOException {
SearchRequest request = new SearchRequest(requestPrototype)
.indicesOptions(requestPrototype.indicesOptions())
.indices(requestPrototype.indices());
if (Strings.hasLength(requestPrototype.source())) {
Map<String, String> templateParams = new HashMap<>();
templateParams.put(Variables.SCHEDULED_FIRE_TIME, formatDate(scheduledFireTime));
templateParams.put(Variables.FIRE_TIME, formatDate(fireTime));
String requestSource = XContentHelper.convertToJson(requestPrototype.source(), false);
ExecutableScript script = scriptService.executable("mustache", requestSource, ScriptService.ScriptType.INLINE, templateParams);
request.source((BytesReference) script.unwrap(script.run()), false);
} else if (requestPrototype.templateName() != null) {
MapBuilder<String, String> templateParams = MapBuilder.newMapBuilder(requestPrototype.templateParams())
.put(Variables.SCHEDULED_FIRE_TIME, formatDate(scheduledFireTime))
.put(Variables.FIRE_TIME, formatDate(fireTime));
request.templateParams(templateParams.map());
request.templateName(requestPrototype.templateName());
request.templateType(requestPrototype.templateType());
} else {
throw new ElasticsearchIllegalStateException("Search requests needs either source or template name");
}
return request;
}
public static class Result extends Condition.Result {
public static final ParseField REQUEST_FIELD = new ParseField("request");
private final SearchRequest request;
public Result(String type, boolean met, SearchRequest request, Payload payload) {
super(type, met, payload);
this.request = request;
}
public SearchRequest request() {
return request;
}
@Override
public XContentBuilder toXContentBody(XContentBuilder builder, Params params) throws IOException {
builder.field(REQUEST_FIELD.getPreferredName());
AlertUtils.writeSearchRequest(request(), builder, params);
return builder;
}
}
}

View File

@ -3,12 +3,12 @@
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.alerts.trigger.simple;
package org.elasticsearch.alerts.condition.simple;
import org.elasticsearch.alerts.ExecutionContext;
import org.elasticsearch.alerts.Payload;
import org.elasticsearch.alerts.trigger.Trigger;
import org.elasticsearch.alerts.trigger.TriggerException;
import org.elasticsearch.alerts.condition.Condition;
import org.elasticsearch.alerts.condition.ConditionException;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.logging.ESLogger;
@ -19,15 +19,15 @@ import org.elasticsearch.common.xcontent.XContentParser;
import java.io.IOException;
/**
* A trigger that always triggered and returns a static/fixed data
* A condition that is always met and returns a static/fixed payload
*/
public class SimpleTrigger extends Trigger<SimpleTrigger.Result> {
public class SimpleCondition extends Condition<SimpleCondition.Result> {
public static final String TYPE = "simple";
private final Payload payload;
public SimpleTrigger(ESLogger logger, Payload payload) {
public SimpleCondition(ESLogger logger, Payload payload) {
super(logger);
this.payload = payload;
}
@ -47,7 +47,7 @@ public class SimpleTrigger extends Trigger<SimpleTrigger.Result> {
return payload.toXContent(builder, params);
}
public static class Result extends Trigger.Result {
public static class Result extends Condition.Result {
public Result(Payload payload) {
super(TYPE, true, payload);
@ -59,7 +59,7 @@ public class SimpleTrigger extends Trigger<SimpleTrigger.Result> {
}
}
public static class Parser extends AbstractComponent implements Trigger.Parser<SimpleTrigger> {
public static class Parser extends AbstractComponent implements Condition.Parser<Result, SimpleCondition> {
@Inject
public Parser(Settings settings) {
@ -72,42 +72,44 @@ public class SimpleTrigger extends Trigger<SimpleTrigger.Result> {
}
@Override
public SimpleTrigger parse(XContentParser parser) throws IOException {
return new SimpleTrigger(logger, new Payload.XContent(parser));
public SimpleCondition parse(XContentParser parser) throws IOException {
return new SimpleCondition(logger, new Payload.XContent(parser));
}
@Override
public Result parseResult(XContentParser parser) throws IOException {
String currentFieldName = null;
XContentParser.Token token;
String type = null;
boolean triggered = false;
Payload payload = null;
boolean met = false;
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
currentFieldName = parser.currentName();
} else if (token.isValue()) {
if (Trigger.Result.TYPE_FIELD.match(currentFieldName)) {
type = parser.text();
} else if (token == XContentParser.Token.VALUE_BOOLEAN) {
if (Trigger.Result.TRIGGERED_FIELD.match(currentFieldName)) {
triggered = parser.booleanValue();
if (token == XContentParser.Token.VALUE_BOOLEAN) {
if (Condition.Result.MET_FIELD.match(currentFieldName)) {
met = parser.booleanValue();
} else {
throw new TriggerException("unable to parse trigger result. unexpected field [" + currentFieldName + "]");
throw new ConditionException("unable to parse simple condition result. unexpected field [" + currentFieldName + "]");
}
} else {
throw new TriggerException("unable to parse trigger result. unexpected field [" + currentFieldName + "]");
throw new ConditionException("unable to parse simple condition result. unexpected field [" + currentFieldName + "]");
}
} else if (token == XContentParser.Token.START_OBJECT) {
if (Trigger.Result.PAYLOAD_FIELD.match(currentFieldName)) {
payload = new Payload.Simple(parser.map()); ///@TODO FIXME
if (Condition.Result.PAYLOAD_FIELD.match(currentFieldName)) {
payload = new Payload.XContent(parser);
} else {
throw new TriggerException("unable to parse trigger result. unexpected field [" + currentFieldName + "]");
throw new ConditionException("unable to parse simple condition result. unexpected field [" + currentFieldName + "]");
}
} else {
throw new TriggerException("unable to parse trigger result. unexpected token [" + token + "]");
throw new ConditionException("unable to parse simple condition result. unexpected token [" + token + "]");
}
}
if (!met) {
throw new ConditionException("unable to parse simple condition result. simple condition always matches, yet [met] field is either missing or set to [false]");
}
return new Result(payload);
}
}

View File

@ -11,8 +11,8 @@ import org.elasticsearch.alerts.Alert;
import org.elasticsearch.alerts.AlertExecution;
import org.elasticsearch.alerts.AlertsException;
import org.elasticsearch.alerts.actions.ActionRegistry;
import org.elasticsearch.alerts.trigger.Trigger;
import org.elasticsearch.alerts.trigger.TriggerRegistry;
import org.elasticsearch.alerts.condition.Condition;
import org.elasticsearch.alerts.condition.ConditionRegistry;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.bytes.BytesReference;
@ -35,7 +35,7 @@ public class FiredAlert implements ToXContent {
private String name;
private DateTime fireTime;
private DateTime scheduledTime;
private Trigger trigger;
private Condition condition;
private State state;
private AlertExecution execution;
@ -55,7 +55,7 @@ public class FiredAlert implements ToXContent {
this.name = alert.name();
this.fireTime = fireTime;
this.scheduledTime = scheduledTime;
this.trigger = alert.trigger();
this.condition = alert.condition();
this.state = State.AWAITS_EXECUTION;
this.metadata = alert.metadata();
this.version = 1;
@ -77,8 +77,8 @@ public class FiredAlert implements ToXContent {
return fireTime;
}
public Trigger trigger() {
return trigger;
public Condition condition() {
return condition;
}
public State state() {
@ -109,14 +109,14 @@ public class FiredAlert implements ToXContent {
public void update(AlertExecution execution) {
assert sealed.compareAndSet(false, true) : "sealing an fired alert should only be done once";
this.execution = execution;
if (execution.triggerResult().triggered()) {
if (!execution.conditionResult().met()) {
state = State.EXECUTION_NOT_NEEDED;
} else {
if (execution.throttleResult().throttle()) {
state = State.THROTTLED;
} else {
state = State.ACTION_PERFORMED;
state = State.EXECUTED;
}
} else {
state = State.NO_ACTION_NEEDED;
}
}
@ -126,7 +126,7 @@ public class FiredAlert implements ToXContent {
historyEntry.field(Parser.ALERT_NAME_FIELD.getPreferredName(), name);
historyEntry.field(Parser.FIRE_TIME_FIELD.getPreferredName(), fireTime.toDateTimeISO());
historyEntry.field(Parser.SCHEDULED_FIRE_TIME_FIELD.getPreferredName(), scheduledTime.toDateTimeISO());
historyEntry.startObject(Alert.Parser.TRIGGER_FIELD.getPreferredName()).field(trigger.type(), trigger, params).endObject();
historyEntry.startObject(Alert.Parser.CONDITION_FIELD.getPreferredName()).field(condition.type(), condition, params).endObject();
historyEntry.field(Parser.STATE_FIELD.getPreferredName(), state.toString());
if (message != null) {
@ -168,29 +168,29 @@ public class FiredAlert implements ToXContent {
public enum State {
AWAITS_EXECUTION,
RUNNING,
NO_ACTION_NEEDED,
ACTION_PERFORMED,
FAILED,
THROTTLED;
CHECKING,
EXECUTION_NOT_NEEDED,
THROTTLED,
EXECUTED,
FAILED;
@Override
public String toString() {
switch (this) {
case AWAITS_EXECUTION:
return "AWAITS_EXECUTION";
case RUNNING:
return "RUNNING";
case NO_ACTION_NEEDED:
return "NO_ACTION_NEEDED";
case ACTION_PERFORMED:
return "ACTION_PERFORMED";
case CHECKING:
return "CHECKING";
case EXECUTION_NOT_NEEDED:
return "EXECUTION_NOT_NEEDED";
case EXECUTED:
return "EXECUTED";
case FAILED:
return "FAILED";
case THROTTLED:
return "THROTTLED";
default:
return "NO_ACTION_NEEDED";
return "EXECUTION_NOT_NEEDED";
}
}
@ -198,12 +198,12 @@ public class FiredAlert implements ToXContent {
switch(value.toUpperCase()) {
case "AWAITS_EXECUTION":
return AWAITS_EXECUTION;
case "RUNNING":
return RUNNING;
case "NO_ACTION_NEEDED":
return NO_ACTION_NEEDED;
case "ACTION_PERFORMED":
return ACTION_PERFORMED;
case "CHECKING":
return CHECKING;
case "EXECUTION_NOT_NEEDED":
return EXECUTION_NOT_NEEDED;
case "EXECUTED":
return EXECUTED;
case "FAILED":
return FAILED;
case "THROTTLED":
@ -225,13 +225,13 @@ public class FiredAlert implements ToXContent {
public static final ParseField METADATA_FIELD = new ParseField("meta");
public static final ParseField ALERT_EXECUTION_FIELD = new ParseField("alert_execution");
private final TriggerRegistry triggerRegistry;
private final ConditionRegistry conditionRegistry;
private final ActionRegistry actionRegistry;
@Inject
public Parser(Settings settings, TriggerRegistry triggerRegistry, ActionRegistry actionRegistry) {
public Parser(Settings settings, ConditionRegistry conditionRegistry, ActionRegistry actionRegistry) {
super(settings);
this.triggerRegistry = triggerRegistry;
this.conditionRegistry = conditionRegistry;
this.actionRegistry = actionRegistry;
}
@ -255,12 +255,12 @@ public class FiredAlert implements ToXContent {
if (token == XContentParser.Token.FIELD_NAME) {
currentFieldName = parser.currentName();
} else if (token == XContentParser.Token.START_OBJECT) {
if (Alert.Parser.TRIGGER_FIELD.match(currentFieldName)) {
alert.trigger = triggerRegistry.parse(parser);
if (Alert.Parser.CONDITION_FIELD.match(currentFieldName)) {
alert.condition = conditionRegistry.parse(parser);
} else if (METADATA_FIELD.match(currentFieldName)) {
alert.metadata = parser.map();
} else if (ALERT_EXECUTION_FIELD.match(currentFieldName)) {
alert.execution = AlertExecution.Parser.parse(parser, triggerRegistry, actionRegistry);
alert.execution = AlertExecution.Parser.parse(parser, conditionRegistry, actionRegistry);
} else {
throw new AlertsException("unable to parse fired alert. unexpected field [" + currentFieldName + "]");
}

View File

@ -11,7 +11,7 @@ import org.elasticsearch.alerts.actions.Action;
import org.elasticsearch.alerts.scheduler.Scheduler;
import org.elasticsearch.alerts.throttle.Throttler;
import org.elasticsearch.alerts.transform.Transform;
import org.elasticsearch.alerts.trigger.Trigger;
import org.elasticsearch.alerts.condition.Condition;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.common.collect.ImmutableList;
import org.elasticsearch.common.component.AbstractComponent;
@ -120,22 +120,22 @@ public class HistoryService extends AbstractComponent {
return alertsThreadPool().getLargestPoolSize();
}
void execute(Alert alert, DateTime scheduledFireTime, DateTime fireTime) throws HistoryException {
void fire(Alert alert, DateTime scheduledFireTime, DateTime fireTime) throws HistoryException {
if (!started.get()) {
throw new ElasticsearchIllegalStateException("not started");
}
FiredAlert firedAlert = new FiredAlert(alert, scheduledFireTime, fireTime);
logger.debug("adding fired alert [{}]", alert.name());
historyStore.put(firedAlert);
execute(firedAlert);
execute(firedAlert, alert);
}
void execute(FiredAlert firedAlert) {
void execute(FiredAlert firedAlert, Alert alert) {
try {
if (alertsThreadPool().isShutdown()) {
throw new AlertsException("attempting to add to a shutdown thread pool");
}
alertsThreadPool().execute(new AlertExecutionTask(firedAlert));
alertsThreadPool().execute(new AlertExecutionTask(firedAlert, alert));
} catch (EsRejectedExecutionException e) {
logger.debug("[{}] failed to execute fired alert", firedAlert.name());
firedAlert.update(FiredAlert.State.FAILED, "failed to run fired alert due to thread pool capacity");
@ -148,7 +148,12 @@ public class HistoryService extends AbstractComponent {
if (firedAlerts != null) {
this.previousFiredAlerts = ImmutableList.of();
for (FiredAlert firedAlert : firedAlerts) {
execute(firedAlert);
Alert alert = alertsStore.getAlert(firedAlert.name());
if (alert == null) {
logger.warn("unable to find alert [{}] in alert store, perhaps it has been deleted. skipping...", firedAlert.name());
continue;
}
execute(firedAlert, alert);
}
}
}
@ -160,24 +165,25 @@ public class HistoryService extends AbstractComponent {
private final class AlertExecutionTask implements Runnable {
private final FiredAlert firedAlert;
private final Alert alert;
private AlertExecutionTask(FiredAlert firedAlert) {
private AlertExecutionTask(FiredAlert firedAlert, Alert alert) {
this.firedAlert = firedAlert;
this.alert = alert;
}
@Override
public void run() {
if (!started.get()) {
throw new ElasticsearchIllegalStateException("not started");
}
try {
Alert alert = alertsStore.getAlert(firedAlert.name());
if (alert == null) {
firedAlert.update(FiredAlert.State.FAILED, "alert was not found in the alerts store");
} else {
this.firedAlert.update(FiredAlert.State.RUNNING, null);
logger.debug("executing alert [{}]", this.firedAlert.name());
AlertExecution alertExecution = execute(alert, this.firedAlert);
this.firedAlert.update(alertExecution);
}
historyStore.update(this.firedAlert);
firedAlert.update(FiredAlert.State.CHECKING, null);
logger.debug("checking alert [{}]", firedAlert.name());
ExecutionContext ctx = new ExecutionContext(firedAlert.id(), alert, firedAlert.fireTime(), firedAlert.scheduledTime());
AlertExecution alertExecution = execute(ctx);
firedAlert.update(alertExecution);
historyStore.update(firedAlert);
} catch (Exception e) {
if (started()) {
logger.warn("failed to run alert [{}]", e, firedAlert.name());
@ -205,23 +211,19 @@ public class HistoryService extends AbstractComponent {
we lose fired jobs signficantly.
*/
AlertExecution execute(Alert alert, FiredAlert firedAlert) throws IOException {
if (!started.get()) {
throw new ElasticsearchIllegalStateException("not started");
}
AlertExecution execute(ExecutionContext ctx) throws IOException {
AlertLockService.Lock lock = alertLockService.acquire(alert.name());
try {
ExecutionContext ctx = new ExecutionContext(firedAlert.id(), alert, firedAlert.fireTime(), firedAlert.scheduledTime());
Trigger.Result triggerResult = alert.trigger().execute(ctx);
ctx.onTriggerResult(triggerResult);
Condition.Result conditionResult = alert.condition().execute(ctx);
ctx.onConditionResult(conditionResult);
if (triggerResult.triggered()) {
Throttler.Result throttleResult = alert.throttler().throttle(ctx, triggerResult);
if (conditionResult.met()) {
Throttler.Result throttleResult = alert.throttler().throttle(ctx, conditionResult);
ctx.onThrottleResult(throttleResult);
if (!throttleResult.throttle()) {
Transform.Result result = alert.transform().apply(ctx, triggerResult.payload());
Transform.Result result = alert.transform().apply(ctx, conditionResult.payload());
ctx.onTransformResult(result);
for (Action action : alert.actions()) {
@ -250,9 +252,9 @@ public class HistoryService extends AbstractComponent {
return;
}
try {
execute(alert, scheduledFireTime, fireTime);
HistoryService.this.fire(alert, scheduledFireTime, fireTime);
} catch (Exception e) {
logger.error("failed to fire alert [{}]", e, alert);
logger.error("failed to fire alert [{}]", e, name);
}
}
}

View File

@ -6,28 +6,24 @@
package org.elasticsearch.alerts.support;
import org.elasticsearch.ElasticsearchIllegalArgumentException;
import org.elasticsearch.ElasticsearchIllegalStateException;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.alerts.AlertsException;
import org.elasticsearch.alerts.support.init.proxy.ScriptServiceProxy;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.common.joda.time.DateTime;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.script.ExecutableScript;
import org.elasticsearch.script.ScriptService;
import java.io.IOException;
import java.util.*;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import static org.elasticsearch.alerts.support.AlertsDateUtils.formatDate;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
/**
@ -35,12 +31,6 @@ import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
public final class AlertUtils {
public final static IndicesOptions DEFAULT_INDICES_OPTIONS = IndicesOptions.lenientExpandOpen();
public final static SearchType DEFAULT_TRIGGER_SEARCH_TYPE = SearchType.COUNT;
public final static SearchType DEFAULT_PAYLOAD_SEARCH_TYPE = SearchType.DFS_QUERY_AND_FETCH;
public static final String FIRE_TIME_VARIABLE_NAME = "fire_time";
public static final String SCHEDULED_FIRE_TIME_VARIABLE_NAME = "scheduled_fire_time";
private AlertUtils() {
}
@ -54,37 +44,6 @@ public final class AlertUtils {
}
}
/**
* Creates a new search request applying the scheduledFireTime and fireTime to the original request
*/
public static SearchRequest createSearchRequestWithTimes(SearchRequest request, DateTime scheduledFireTime, DateTime fireTime, ScriptServiceProxy scriptService) throws IOException {
SearchRequest triggerSearchRequest = new SearchRequest(request)
.indicesOptions(request.indicesOptions())
.indices(request.indices());
if (Strings.hasLength(request.source())) {
Map<String, String> templateParams = new HashMap<>();
templateParams.put(SCHEDULED_FIRE_TIME_VARIABLE_NAME, formatDate(scheduledFireTime));
templateParams.put(FIRE_TIME_VARIABLE_NAME, formatDate(fireTime));
String requestSource = XContentHelper.convertToJson(request.source(), false);
ExecutableScript script = scriptService.executable("mustache", requestSource, ScriptService.ScriptType.INLINE, templateParams);
triggerSearchRequest.source((BytesReference) script.unwrap(script.run()), false);
} else if (request.templateName() != null) {
MapBuilder<String, String> templateParams = MapBuilder.newMapBuilder(request.templateParams())
.put(SCHEDULED_FIRE_TIME_VARIABLE_NAME, formatDate(scheduledFireTime))
.put(FIRE_TIME_VARIABLE_NAME, formatDate(fireTime));
triggerSearchRequest.templateParams(templateParams.map());
triggerSearchRequest.templateName(request.templateName());
triggerSearchRequest.templateType(request.templateType());
} else {
throw new ElasticsearchIllegalStateException("Search requests needs either source or template name");
}
return triggerSearchRequest;
}
public static SearchRequest readSearchRequest(XContentParser parser) throws IOException {
return readSearchRequest(parser, DEFAULT_TRIGGER_SEARCH_TYPE);
}
/**
* Reads a new search request instance for the specified parser.
*/
@ -235,13 +194,13 @@ public final class AlertUtils {
builder.field("allow_no_indices", options.allowNoIndices());
builder.endObject();
}
if (searchRequest.searchType() != DEFAULT_TRIGGER_SEARCH_TYPE) {
if (searchRequest.searchType() != null) {
builder.field("search_type", searchRequest.searchType().toString().toLowerCase(Locale.ENGLISH));
}
builder.endObject();
}
private static ScriptService.ScriptType readScriptType(String value) {
static ScriptService.ScriptType readScriptType(String value) {
switch (value) {
case "indexed":
return ScriptService.ScriptType.INDEXED;
@ -254,7 +213,7 @@ public final class AlertUtils {
}
}
private static String writeScriptType(ScriptService.ScriptType value) {
static String writeScriptType(ScriptService.ScriptType value) {
switch (value) {
case INDEXED:
return "indexed";

View File

@ -26,7 +26,9 @@ import java.util.Map;
/**
*/
public class StringTemplateUtils extends AbstractComponent {
private final ScriptServiceProxy scriptService;
@Inject
public StringTemplateUtils(Settings settings, ScriptServiceProxy scriptService) {
super(settings);
@ -81,7 +83,7 @@ public class StringTemplateUtils extends AbstractComponent {
language = parser.text();
break;
case "type":
type = readScriptType(parser.text());
type = AlertUtils.readScriptType(parser.text());
break;
default:
throw new ElasticsearchIllegalArgumentException("Unexpected field [" + fieldName + "]");
@ -97,7 +99,7 @@ public class StringTemplateUtils extends AbstractComponent {
public static void writeTemplate(String objectName, Template template, XContentBuilder builder, ToXContent.Params params) throws IOException {
builder.startObject(objectName);
builder.field("script", template.getTemplate());
builder.field("type", writeScriptType(template.getScriptType()));
builder.field("type", AlertUtils.writeScriptType(template.getScriptType()));
builder.field("language", template.getLanguage());
if (template.getParams() != null && !template.getParams().isEmpty()) {
builder.field("params", template.getParams());
@ -105,30 +107,6 @@ public class StringTemplateUtils extends AbstractComponent {
builder.endObject();
}
private static ScriptService.ScriptType readScriptType(String value) {
switch (value) {
case "indexed":
return ScriptService.ScriptType.INDEXED;
case "inline":
return ScriptService.ScriptType.INLINE;
case "file":
return ScriptService.ScriptType.FILE;
default:
throw new ElasticsearchIllegalArgumentException("Unknown script_type value [" + value + "]");
}
}
private static String writeScriptType(ScriptService.ScriptType value) {
switch (value) {
case INDEXED:
return "indexed";
case INLINE:
return "inline";
case FILE:
return "file";
default:
throw new ElasticsearchIllegalArgumentException("Illegal script_type value [" + value + "]");
}
}
public static class Template {
private final String template;
private final Map<String, String> params;

View File

@ -0,0 +1,16 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.alerts.support;
/**
*
*/
public final class Variables {
public static final String FIRE_TIME = "fire_time";
public static final String SCHEDULED_FIRE_TIME = "scheduled_fire_time";
}

View File

@ -6,7 +6,7 @@
package org.elasticsearch.alerts.throttle;
import org.elasticsearch.alerts.ExecutionContext;
import org.elasticsearch.alerts.trigger.Trigger;
import org.elasticsearch.alerts.condition.Condition;
import static org.elasticsearch.alerts.support.AlertsDateUtils.formatDate;
@ -16,7 +16,7 @@ import static org.elasticsearch.alerts.support.AlertsDateUtils.formatDate;
public class AckThrottler implements Throttler {
@Override
public Result throttle(ExecutionContext ctx, Trigger.Result result) {
public Result throttle(ExecutionContext ctx, Condition.Result result) {
if (ctx.alert().acked()) {
return Result.throttle("alert [" + ctx.alert().name() + "] was acked at [" + formatDate(ctx.alert().status().ackStatus().timestamp()) + "]");
}

View File

@ -6,7 +6,7 @@
package org.elasticsearch.alerts.throttle;
import org.elasticsearch.alerts.ExecutionContext;
import org.elasticsearch.alerts.trigger.Trigger;
import org.elasticsearch.alerts.condition.Condition;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.unit.TimeValue;
@ -24,7 +24,7 @@ public class AlertThrottler implements Throttler {
}
@Override
public Result throttle(ExecutionContext ctx, Trigger.Result result) {
public Result throttle(ExecutionContext ctx, Condition.Result result) {
if (periodThrottler != null) {
Result throttleResult = periodThrottler.throttle(ctx, result);
if (throttleResult.throttle()) {

View File

@ -7,7 +7,7 @@ package org.elasticsearch.alerts.throttle;
import org.elasticsearch.alerts.Alert;
import org.elasticsearch.alerts.ExecutionContext;
import org.elasticsearch.alerts.trigger.Trigger;
import org.elasticsearch.alerts.condition.Condition;
import org.elasticsearch.common.joda.time.PeriodType;
import org.elasticsearch.common.unit.TimeValue;
@ -33,9 +33,9 @@ public class PeriodThrottler implements Throttler {
}
@Override
public Result throttle(ExecutionContext ctx, Trigger.Result result) {
public Result throttle(ExecutionContext ctx, Condition.Result result) {
Alert.Status status = ctx.alert().status();
if (status.lastRan() != null) {
if (status.lastExecuted() != null) {
TimeValue timeElapsed = new TimeValue(System.currentTimeMillis() - status.lastExecuted().getMillis());
if (timeElapsed.getMillis() <= period.getMillis()) {
return Result.throttle("throttling interval is set to [" + period.format(periodType) +

View File

@ -6,7 +6,7 @@
package org.elasticsearch.alerts.throttle;
import org.elasticsearch.alerts.ExecutionContext;
import org.elasticsearch.alerts.trigger.Trigger;
import org.elasticsearch.alerts.condition.Condition;
import org.elasticsearch.common.ParseField;
/**
@ -16,12 +16,12 @@ public interface Throttler {
public static final Throttler NO_THROTTLE = new Throttler() {
@Override
public Result throttle(ExecutionContext ctx, Trigger.Result result) {
public Result throttle(ExecutionContext ctx, Condition.Result result) {
return Result.NO;
}
};
Result throttle(ExecutionContext ctx, Trigger.Result result);
Result throttle(ExecutionContext ctx, Condition.Result result);
static class Result {

View File

@ -7,9 +7,11 @@ package org.elasticsearch.alerts.transform;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.alerts.ExecutionContext;
import org.elasticsearch.alerts.Payload;
import org.elasticsearch.alerts.support.AlertUtils;
import org.elasticsearch.alerts.support.Variables;
import org.elasticsearch.alerts.support.init.proxy.ClientProxy;
import org.elasticsearch.alerts.support.init.proxy.ScriptServiceProxy;
import org.elasticsearch.common.Strings;
@ -40,6 +42,8 @@ public class SearchTransform implements Transform {
public static final String TYPE = "search";
public static final SearchType DEFAULT_SEARCH_TYPE = SearchType.DFS_QUERY_AND_FETCH;
protected final ESLogger logger;
protected final ScriptServiceProxy scriptService;
protected final ClientProxy client;
@ -70,28 +74,28 @@ public class SearchTransform implements Transform {
return builder;
}
public SearchRequest createRequest(SearchRequest request, DateTime scheduledFireTime, DateTime fireTime, Map<String, Object> data) throws IOException {
SearchRequest triggerSearchRequest = new SearchRequest(request)
.indicesOptions(request.indicesOptions())
.indices(request.indices());
if (Strings.hasLength(request.source())) {
public SearchRequest createRequest(SearchRequest requestPrototype, DateTime scheduledFireTime, DateTime fireTime, Map<String, Object> data) throws IOException {
SearchRequest request = new SearchRequest(requestPrototype)
.indicesOptions(requestPrototype.indicesOptions())
.indices(requestPrototype.indices());
if (Strings.hasLength(requestPrototype.source())) {
Map<String, String> templateParams = new HashMap<>();
templateParams.put(AlertUtils.SCHEDULED_FIRE_TIME_VARIABLE_NAME, formatDate(scheduledFireTime));
templateParams.put(AlertUtils.FIRE_TIME_VARIABLE_NAME, formatDate(fireTime));
String requestSource = XContentHelper.convertToJson(request.source(), false);
templateParams.put(Variables.SCHEDULED_FIRE_TIME, formatDate(scheduledFireTime));
templateParams.put(Variables.FIRE_TIME, formatDate(fireTime));
String requestSource = XContentHelper.convertToJson(requestPrototype.source(), false);
ExecutableScript script = scriptService.executable("mustache", requestSource, ScriptService.ScriptType.INLINE, templateParams);
triggerSearchRequest.source((BytesReference) script.unwrap(script.run()), false);
} else if (request.templateName() != null) {
MapBuilder<String, String> templateParams = MapBuilder.newMapBuilder(request.templateParams())
.put(AlertUtils.SCHEDULED_FIRE_TIME_VARIABLE_NAME, formatDate(scheduledFireTime))
.put(AlertUtils.FIRE_TIME_VARIABLE_NAME, formatDate(fireTime));
triggerSearchRequest.templateParams(templateParams.map());
triggerSearchRequest.templateName(request.templateName());
triggerSearchRequest.templateType(request.templateType());
request.source((BytesReference) script.unwrap(script.run()), false);
} else if (requestPrototype.templateName() != null) {
MapBuilder<String, String> templateParams = MapBuilder.newMapBuilder(requestPrototype.templateParams())
.put(Variables.SCHEDULED_FIRE_TIME, formatDate(scheduledFireTime))
.put(Variables.FIRE_TIME, formatDate(fireTime));
request.templateParams(templateParams.map());
request.templateName(requestPrototype.templateName());
request.templateType(requestPrototype.templateType());
} else {
throw new TransformException("search requests needs either source or template name");
}
return triggerSearchRequest;
return request;
}
public static class Parser extends AbstractComponent implements Transform.Parser<SearchTransform> {
@ -113,7 +117,7 @@ public class SearchTransform implements Transform {
@Override
public SearchTransform parse(XContentParser parser) throws IOException {
SearchRequest request = AlertUtils.readSearchRequest(parser, AlertUtils.DEFAULT_PAYLOAD_SEARCH_TYPE);
SearchRequest request = AlertUtils.readSearchRequest(parser, DEFAULT_SEARCH_TYPE);
return new SearchTransform(logger, scriptService, client, request);
}
}

View File

@ -1,43 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.alerts.trigger;
import org.elasticsearch.alerts.trigger.search.ScriptSearchTrigger;
import org.elasticsearch.alerts.trigger.simple.SimpleTrigger;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.inject.multibindings.MapBinder;
import java.util.HashMap;
import java.util.Map;
/**
*
*/
public class TriggerModule extends AbstractModule {
private final Map<String, Class<? extends Trigger.Parser>> parsers = new HashMap<>();
public void registerTrigger(String type, Class<? extends Trigger.Parser> parserType) {
parsers.put(type, parserType);
}
@Override
protected void configure() {
MapBinder<String, Trigger.Parser> parsersBinder = MapBinder.newMapBinder(binder(), String.class, Trigger.Parser.class);
bind(ScriptSearchTrigger.Parser.class).asEagerSingleton();
parsersBinder.addBinding(ScriptSearchTrigger.TYPE).to(ScriptSearchTrigger.Parser.class);
bind(SimpleTrigger.Parser.class).asEagerSingleton();
parsersBinder.addBinding(SimpleTrigger.TYPE).to(SimpleTrigger.Parser.class);
for (Map.Entry<String, Class<? extends Trigger.Parser>> entry : parsers.entrySet()) {
bind(entry.getValue()).asEagerSingleton();
parsersBinder.addBinding(entry.getKey()).to(entry.getValue());
}
bind(TriggerRegistry.class).asEagerSingleton();
}
}

View File

@ -1,84 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.alerts.trigger.search;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.alerts.ExecutionContext;
import org.elasticsearch.alerts.Payload;
import org.elasticsearch.alerts.support.AlertUtils;
import org.elasticsearch.alerts.support.init.proxy.ClientProxy;
import org.elasticsearch.alerts.support.init.proxy.ScriptServiceProxy;
import org.elasticsearch.alerts.trigger.Trigger;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.search.SearchHit;
import java.io.IOException;
public abstract class SearchTrigger extends Trigger<SearchTrigger.Result> {
protected final ScriptServiceProxy scriptService;
protected final ClientProxy client;
protected final SearchRequest request;
public SearchTrigger(ESLogger logger, ScriptServiceProxy scriptService, ClientProxy client, SearchRequest request) {
super(logger);
this.scriptService = scriptService;
this.client = client;
this.request = request;
}
@Override
public Result execute(ExecutionContext ctx) throws IOException {
SearchRequest request = AlertUtils.createSearchRequestWithTimes(this.request, ctx.scheduledTime(), ctx.fireTime(), scriptService);
if (logger.isTraceEnabled()) {
logger.trace("running query for [{}]", ctx.alert().name(), XContentHelper.convertToJson(request.source(), false, true));
}
// actionGet deals properly with InterruptedException
SearchResponse response = client.search(request).actionGet();
if (logger.isDebugEnabled()) {
logger.debug("got [{}] hits", ctx.alert().name(), response.getHits().getTotalHits());
for (SearchHit hit : response.getHits()) {
logger.debug("hit [{}]", XContentHelper.toString(hit));
}
}
return processSearchResponse(response);
}
/**
* Processes the search response and returns the appropriate trigger result
*/
protected abstract Result processSearchResponse(SearchResponse response);
public static class Result extends Trigger.Result {
public static final ParseField REQUEST_FIELD = new ParseField("request");
private final SearchRequest request;
public Result(String type, boolean triggered, SearchRequest request, Payload payload) {
super(type, triggered, payload);
this.request = request;
}
public SearchRequest request() {
return request;
}
@Override
public XContentBuilder toXContentBody(XContentBuilder builder, Params params) throws IOException {
builder.field(REQUEST_FIELD.getPreferredName());
AlertUtils.writeSearchRequest(request(), builder, params);
return builder;
}
}
}

View File

@ -15,9 +15,6 @@
"type": "string",
"index": "not_analyzed"
},
"triggered": {
"type": "boolean"
},
"fire_time": {
"type": "date"
},
@ -31,12 +28,7 @@
"message": {
"type": "string"
},
"trigger" : {
"type" : "object",
"enabled" : false,
"dynamic" : true
},
"transform" : {
"condition" : {
"type" : "object",
"enabled" : false,
"dynamic" : true
@ -46,21 +38,6 @@
"enabled" : false,
"dynamic" : true
},
"payload" : {
"type" : "object",
"enabled" : false,
"dynamic" : true
},
"trigger_response": {
"type": "object",
"enabled" : false,
"dynamic" : true
},
"actions": {
"type" : "object",
"enabled" : false,
"dynamic" : true
},
"meta" : {
"type" : "object",
"dynamic": true

View File

@ -24,7 +24,7 @@
"throttle_period": {
"type": "string"
},
"trigger": {
"condition": {
"type": "object",
"enabled" : false,
"dynamic" : true

View File

@ -18,6 +18,8 @@ import org.elasticsearch.alerts.actions.email.service.Profile;
import org.elasticsearch.alerts.actions.webhook.HttpClient;
import org.elasticsearch.alerts.actions.webhook.WebhookAction;
import org.elasticsearch.alerts.client.AlertsClient;
import org.elasticsearch.alerts.condition.search.ScriptSearchCondition;
import org.elasticsearch.alerts.condition.search.SearchCondition;
import org.elasticsearch.alerts.history.FiredAlert;
import org.elasticsearch.alerts.history.HistoryStore;
import org.elasticsearch.alerts.scheduler.schedule.CronSchedule;
@ -27,7 +29,6 @@ import org.elasticsearch.alerts.support.init.proxy.ClientProxy;
import org.elasticsearch.alerts.support.init.proxy.ScriptServiceProxy;
import org.elasticsearch.alerts.transform.SearchTransform;
import org.elasticsearch.alerts.transport.actions.stats.AlertsStatsResponse;
import org.elasticsearch.alerts.trigger.search.ScriptSearchTrigger;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexTemplateMetaData;
@ -49,7 +50,6 @@ import org.junit.After;
import org.junit.Before;
import javax.mail.internet.AddressException;
import javax.mail.internet.InternetAddress;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.*;
@ -109,11 +109,11 @@ public abstract class AbstractAlertingTests extends ElasticsearchIntegrationTest
stopAlerting();
}
protected BytesReference createAlertSource(String cron, SearchRequest request, String scriptTrigger) throws IOException {
return createAlertSource(cron, request, scriptTrigger, null);
protected BytesReference createAlertSource(String cron, SearchRequest conditionRequest, String conditionScript) throws IOException {
return createAlertSource(cron, conditionRequest, conditionScript, null);
}
protected BytesReference createAlertSource(String cron, SearchRequest request, String scriptTrigger, Map<String,Object> metadata) throws IOException {
protected BytesReference createAlertSource(String cron, SearchRequest conditionRequest, String conditionScript, Map<String,Object> metadata) throws IOException {
XContentBuilder builder;
if (randomBoolean()) {
builder = jsonBuilder();
@ -134,11 +134,11 @@ public abstract class AbstractAlertingTests extends ElasticsearchIntegrationTest
builder.field("meta", metadata);
}
builder.startObject("trigger");
builder.startObject("condition");
builder.startObject("script");
builder.field("request");
AlertUtils.writeSearchRequest(request, builder, ToXContent.EMPTY_PARAMS);
builder.field("script", scriptTrigger);
AlertUtils.writeSearchRequest(conditionRequest, builder, ToXContent.EMPTY_PARAMS);
builder.field("script", conditionScript);
builder.endObject();
builder.endObject();
@ -156,28 +156,25 @@ public abstract class AbstractAlertingTests extends ElasticsearchIntegrationTest
return builder.bytes();
}
public static SearchRequest createTriggerSearchRequest(String... indices) {
public static SearchRequest createConditionSearchRequest(String... indices) {
SearchRequest request = new SearchRequest(indices);
request.indicesOptions(AlertUtils.DEFAULT_INDICES_OPTIONS);
request.searchType(AlertUtils.DEFAULT_TRIGGER_SEARCH_TYPE);
request.searchType(SearchCondition.DEFAULT_SEARCH_TYPE);
return request;
}
protected Alert createTestAlert(String alertName) throws AddressException {
SearchRequest triggerRequest = createTriggerSearchRequest("my-trigger-index").source(searchSource().query(matchAllQuery()));
SearchRequest transformRequest = createTriggerSearchRequest("my-payload-index").source(searchSource().query(matchAllQuery()));
transformRequest.searchType(AlertUtils.DEFAULT_PAYLOAD_SEARCH_TYPE);
triggerRequest.searchType(AlertUtils.DEFAULT_TRIGGER_SEARCH_TYPE);
SearchRequest conditionRequest = createConditionSearchRequest("my-condition-index").source(searchSource().query(matchAllQuery()));
SearchRequest transformRequest = createConditionSearchRequest("my-payload-index").source(searchSource().query(matchAllQuery()));
transformRequest.searchType(SearchTransform.DEFAULT_SEARCH_TYPE);
conditionRequest.searchType(SearchCondition.DEFAULT_SEARCH_TYPE);
List<Action> actions = new ArrayList<>();
StringTemplateUtils.Template template =
new StringTemplateUtils.Template("{{alert_name}} triggered with {{response.hits.total}} hits");
new StringTemplateUtils.Template("{{alert_name}} executed with {{response.hits.total}} hits");
actions.add(new WebhookAction(logger, stringTemplateUtils(), httpClient(), template, new StringTemplateUtils.Template("http://localhost/foobarbaz/{{alert_name}}"), HttpMethod.GET));
List<InternetAddress> addresses = new ArrayList<>();
addresses.addAll(Arrays.asList(InternetAddress.parse("you@foo.com")));
actions.add(new WebhookAction(logger, stringTemplateUtils(), httpClient(), template, new StringTemplateUtils.Template("http://localhost/foobarbaz/{{alert_name}}"), HttpMethod.GET));
Email.Address from = new Email.Address("from@test.com");
List<Email.Address> emailAddressList = new ArrayList<>();
@ -201,8 +198,8 @@ public abstract class AbstractAlertingTests extends ElasticsearchIntegrationTest
return new Alert(
alertName,
new CronSchedule("0/5 * * * * ? *"),
new ScriptSearchTrigger(logger, ScriptServiceProxy.of(scriptService()), ClientProxy.of(client()),
triggerRequest,"return true", ScriptService.ScriptType.INLINE, "groovy"),
new ScriptSearchCondition(logger, ScriptServiceProxy.of(scriptService()), ClientProxy.of(client()),
conditionRequest,"return true", ScriptService.ScriptType.INLINE, "groovy"),
new SearchTransform(logger, ScriptServiceProxy.of(scriptService()), ClientProxy.of(client()), transformRequest),
new TimeValue(0),
new Actions(actions),
@ -236,8 +233,7 @@ public abstract class AbstractAlertingTests extends ElasticsearchIntegrationTest
return internalTestCluster().getInstance(FiredAlert.Parser.class);
}
protected void assertAlertTriggeredExact(final String alertName, final long expectedAlertActionsWithActionPerformed) throws Exception {
protected void assertAlertWithExactPerformedActionsCount(final String alertName, final long expectedAlertActionsWithActionPerformed) throws Exception {
assertBusy(new Runnable() {
@Override
public void run() {
@ -255,11 +251,11 @@ public abstract class AbstractAlertingTests extends ElasticsearchIntegrationTest
});
}
protected void assertAlertTriggered(final String alertName, final long minimumExpectedAlertActionsWithActionPerformed) throws Exception {
assertAlertTriggered(alertName, minimumExpectedAlertActionsWithActionPerformed, true);
protected void assertAlertWithMinimumPerformedActionsCount(final String alertName, final long minimumExpectedAlertActionsWithActionPerformed) throws Exception {
assertAlertWithMinimumPerformedActionsCount(alertName, minimumExpectedAlertActionsWithActionPerformed, true);
}
protected void assertAlertTriggered(final String alertName, final long minimumExpectedAlertActionsWithActionPerformed, final boolean assertTriggerSearchMatched) throws Exception {
protected void assertAlertWithMinimumPerformedActionsCount(final String alertName, final long minimumExpectedAlertActionsWithActionPerformed, final boolean assertConditionMet) throws Exception {
assertBusy(new Runnable() {
@Override
public void run() {
@ -274,11 +270,11 @@ public abstract class AbstractAlertingTests extends ElasticsearchIntegrationTest
SearchResponse searchResponse = client().prepareSearch(HistoryStore.ALERT_HISTORY_INDEX_PREFIX + "*")
.setIndicesOptions(IndicesOptions.lenientExpandOpen())
.setQuery(boolQuery().must(matchQuery("alert_name", alertName)).must(matchQuery("state", FiredAlert.State.ACTION_PERFORMED.toString())))
.setQuery(boolQuery().must(matchQuery("alert_name", alertName)).must(matchQuery("state", FiredAlert.State.EXECUTED.toString())))
.get();
assertThat(searchResponse.getHits().getTotalHits(), greaterThanOrEqualTo(minimumExpectedAlertActionsWithActionPerformed));
if (assertTriggerSearchMatched) {
assertThat((Integer) XContentMapValues.extractValue("alert_execution.trigger_result.script.payload.hits.total", searchResponse.getHits().getAt(0).sourceAsMap()), greaterThanOrEqualTo(1));
if (assertConditionMet) {
assertThat((Integer) XContentMapValues.extractValue("alert_execution.condition_result.script.payload.hits.total", searchResponse.getHits().getAt(0).sourceAsMap()), greaterThanOrEqualTo(1));
}
}
});
@ -287,12 +283,12 @@ public abstract class AbstractAlertingTests extends ElasticsearchIntegrationTest
protected long findNumberOfPerformedActions(String alertName) {
SearchResponse searchResponse = client().prepareSearch(HistoryStore.ALERT_HISTORY_INDEX_PREFIX + "*")
.setIndicesOptions(IndicesOptions.lenientExpandOpen())
.setQuery(boolQuery().must(matchQuery("alert_name", alertName)).must(matchQuery("state", FiredAlert.State.ACTION_PERFORMED.toString())))
.setQuery(boolQuery().must(matchQuery("alert_name", alertName)).must(matchQuery("state", FiredAlert.State.EXECUTED.toString())))
.get();
return searchResponse.getHits().getTotalHits();
}
protected void assertNoAlertTrigger(final String alertName, final long expectedAlertActionsWithNoActionNeeded) throws Exception {
protected void assertAlertWithNoActionNeeded(final String alertName, final long expectedAlertActionsWithNoActionNeeded) throws Exception {
assertBusy(new Runnable() {
@Override
public void run() {
@ -308,7 +304,7 @@ public abstract class AbstractAlertingTests extends ElasticsearchIntegrationTest
SearchResponse searchResponse = client().prepareSearch(HistoryStore.ALERT_HISTORY_INDEX_PREFIX + "*")
.setIndicesOptions(IndicesOptions.lenientExpandOpen())
.setQuery(boolQuery().must(matchQuery("alert_name", alertName)).must(matchQuery("state", FiredAlert.State.NO_ACTION_NEEDED.toString())))
.setQuery(boolQuery().must(matchQuery("alert_name", alertName)).must(matchQuery("state", FiredAlert.State.EXECUTION_NOT_NEEDED.toString())))
.get();
assertThat(searchResponse.getHits().getTotalHits(), greaterThanOrEqualTo(expectedAlertActionsWithNoActionNeeded));
}
@ -482,7 +478,7 @@ public abstract class AbstractAlertingTests extends ElasticsearchIntegrationTest
}
private final class NoopEmailService implements EmailService {
private static class NoopEmailService implements EmailService {
@Override
public void start(ClusterState state) {
@ -502,7 +498,6 @@ public abstract class AbstractAlertingTests extends ElasticsearchIntegrationTest
public EmailSent send(Email email, Authentication auth, Profile profile, String accountName) {
return new EmailSent(accountName, email);
}
};
}
}

View File

@ -35,12 +35,12 @@ public class AlertMetadataTest extends AbstractAlertingTests {
metaList.add("test");
metadata.put("baz", metaList);
SearchRequest triggerRequest = createTriggerSearchRequest("my-index").source(searchSource().query(matchAllQuery()));
SearchRequest conditionRequest = createConditionSearchRequest("my-index").source(searchSource().query(matchAllQuery()));
alertClient().preparePutAlert("1")
.setAlertSource(createAlertSource("0/5 * * * * ? *", triggerRequest, "hits.total == 1", metadata))
.setAlertSource(createAlertSource("0/5 * * * * ? *", conditionRequest, "hits.total == 1", metadata))
.get();
// Wait for a no action entry to be added. (the trigger search request will not match, because there are no docs in my-index)
assertNoAlertTrigger("1", 1);
// Wait for a no action entry to be added. (the condition search request will not match, because there are no docs in my-index)
assertAlertWithNoActionNeeded("1", 1);
refresh();
SearchResponse searchResponse = client().prepareSearch(HistoryStore.ALERT_HISTORY_INDEX_PREFIX + "*")

View File

@ -56,7 +56,7 @@ public class AlertSerializationTest extends AbstractAlertingTests {
assertEquals(parsedAlert.status().lastExecuted().getMillis(), alert.status().lastExecuted().getMillis());
}
assertEqualByGeneratedXContent(parsedAlert.schedule(), alert.schedule());
assertEqualByGeneratedXContent(parsedAlert.trigger(), alert.trigger());
assertEqualByGeneratedXContent(parsedAlert.condition(), alert.condition());
assertEquals(parsedAlert.throttlePeriod().getMillis(), alert.throttlePeriod().getMillis());
assertEquals(parsedAlert.status().ackStatus().state(), alert.status().ackStatus().state());
assertEquals(parsedAlert.metadata().get("foo"), "bar");

View File

@ -24,7 +24,7 @@ import org.elasticsearch.alerts.transform.SearchTransform;
import org.elasticsearch.alerts.transport.actions.ack.AckAlertResponse;
import org.elasticsearch.alerts.transport.actions.get.GetAlertResponse;
import org.elasticsearch.alerts.transport.actions.put.PutAlertResponse;
import org.elasticsearch.alerts.trigger.search.ScriptSearchTrigger;
import org.elasticsearch.alerts.condition.search.ScriptSearchCondition;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
@ -60,7 +60,7 @@ public class AlertThrottleTests extends AbstractAlertingTests {
refresh();
SearchRequest request = createTriggerSearchRequest("test-index").source(searchSource().query(matchAllQuery()));
SearchRequest request = createConditionSearchRequest("test-index").source(searchSource().query(matchAllQuery()));
List<Action> actions = new ArrayList<>();
@ -69,7 +69,7 @@ public class AlertThrottleTests extends AbstractAlertingTests {
Alert alert = new Alert(
"test-serialization",
new CronSchedule("0/5 * * * * ? *"),
new ScriptSearchTrigger(logger, ScriptServiceProxy.of(scriptService()), ClientProxy.of(client()),
new ScriptSearchCondition(logger, ScriptServiceProxy.of(scriptService()), ClientProxy.of(client()),
request, "hits.total > 0", ScriptService.ScriptType.INLINE, "groovy"),
new SearchTransform(logger, ScriptServiceProxy.of(scriptService()), ClientProxy.of(client()), request),
new TimeValue(0),
@ -110,7 +110,7 @@ public class AlertThrottleTests extends AbstractAlertingTests {
long countAfterSleep = searchResponse.getHits().getTotalHits();
assertThat("There shouldn't be more entries in the index after we ack the alert", countAfterAck, equalTo(countAfterSleep));
//Now delete the event and the ack state should change to NOT_TRIGGERED
//Now delete the event and the ack state should change to AWAITS_EXECUTION
DeleteResponse response = client().prepareDelete("test-index", "test-type", dummyEventIndexResponse.getId()).get();
assertTrue(response.isFound());
@ -143,7 +143,7 @@ public class AlertThrottleTests extends AbstractAlertingTests {
assertTrue(dummyEventIndexResponse.isCreated());
refresh();
SearchRequest request = createTriggerSearchRequest("test-index").source(searchSource().query(matchAllQuery()));
SearchRequest request = createConditionSearchRequest("test-index").source(searchSource().query(matchAllQuery()));
List<Action> actions = new ArrayList<>();
@ -152,7 +152,7 @@ public class AlertThrottleTests extends AbstractAlertingTests {
Alert alert = new Alert(
"test-time-throttle",
new CronSchedule("0/5 * * * * ? *"),
new ScriptSearchTrigger(logger, ScriptServiceProxy.of(scriptService()), ClientProxy.of(client()),
new ScriptSearchCondition(logger, ScriptServiceProxy.of(scriptService()), ClientProxy.of(client()),
request, "hits.total > 0", ScriptService.ScriptType.INLINE, "groovy"),
new SearchTransform(logger, ScriptServiceProxy.of(scriptService()), ClientProxy.of(client()), request),
new TimeValue(10, TimeUnit.SECONDS),

View File

@ -9,6 +9,7 @@ import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.alerts.client.AlertsClient;
import org.elasticsearch.alerts.support.AlertUtils;
import org.elasticsearch.alerts.support.Variables;
import org.elasticsearch.alerts.transport.actions.delete.DeleteAlertRequest;
import org.elasticsearch.alerts.transport.actions.delete.DeleteAlertResponse;
import org.elasticsearch.alerts.transport.actions.get.GetAlertResponse;
@ -44,12 +45,12 @@ public class BasicAlertingTest extends AbstractAlertingTests {
createIndex("my-index");
// Have a sample document in the index, the alert is going to evaluate
client().prepareIndex("my-index", "my-type").setSource("field", "value").get();
SearchRequest searchRequest = createTriggerSearchRequest("my-index").source(searchSource().query(termQuery("field", "value")));
SearchRequest searchRequest = createConditionSearchRequest("my-index").source(searchSource().query(termQuery("field", "value")));
BytesReference alertSource = createAlertSource("0/5 * * * * ? *", searchRequest, "hits.total == 1");
alertsClient.preparePutAlert("my-first-alert")
.setAlertSource(alertSource)
.get();
assertAlertTriggered("my-first-alert", 1);
assertAlertWithMinimumPerformedActionsCount("my-first-alert", 1);
GetAlertResponse getAlertResponse = alertClient().prepareGetAlert().setAlertName("my-first-alert").get();
assertThat(getAlertResponse.getResponse().isExists(), is(true));
@ -59,18 +60,18 @@ public class BasicAlertingTest extends AbstractAlertingTests {
@Test
public void testIndexAlert_registerAlertBeforeTargetIndex() throws Exception {
AlertsClient alertsClient = alertClient();
SearchRequest searchRequest = createTriggerSearchRequest("my-index").source(searchSource().query(termQuery("field", "value")));
SearchRequest searchRequest = createConditionSearchRequest("my-index").source(searchSource().query(termQuery("field", "value")));
BytesReference alertSource = createAlertSource("0/5 * * * * ? *", searchRequest, "hits.total == 1");
alertsClient.preparePutAlert("my-first-alert")
.setAlertSource(alertSource)
.get();
// The alert can't trigger because there is no data that matches with the query
assertNoAlertTrigger("my-first-alert", 1);
// The alert's condition won't meet because there is no data that matches with the query
assertAlertWithNoActionNeeded("my-first-alert", 1);
// Index sample doc after we register the alert and the alert should get triggered
// Index sample doc after we register the alert and the alert's condition should meet
client().prepareIndex("my-index", "my-type").setSource("field", "value").get();
assertAlertTriggered("my-first-alert", 1);
assertAlertWithMinimumPerformedActionsCount("my-first-alert", 1);
}
@Test
@ -79,7 +80,7 @@ public class BasicAlertingTest extends AbstractAlertingTests {
createIndex("my-index");
// Have a sample document in the index, the alert is going to evaluate
client().prepareIndex("my-index", "my-type").setSource("field", "value").get();
SearchRequest searchRequest = createTriggerSearchRequest("my-index").source(searchSource().query(matchAllQuery()));
SearchRequest searchRequest = createConditionSearchRequest("my-index").source(searchSource().query(matchAllQuery()));
BytesReference alertSource = createAlertSource("0/5 * * * * ? *", searchRequest, "hits.total == 1");
PutAlertResponse indexResponse = alertsClient.preparePutAlert("my-first-alert")
.setAlertSource(alertSource)
@ -113,8 +114,8 @@ public class BasicAlertingTest extends AbstractAlertingTests {
alertSource.field("malformed_field", "x");
alertSource.startObject("schedule").field("cron", "0/5 * * * * ? *").endObject();
alertSource.startObject("trigger").startObject("script").field("script", "return true").field("request");
AlertUtils.writeSearchRequest(createTriggerSearchRequest(), alertSource, ToXContent.EMPTY_PARAMS);
alertSource.startObject("condition").startObject("script").field("script", "return true").field("request");
AlertUtils.writeSearchRequest(createConditionSearchRequest(), alertSource, ToXContent.EMPTY_PARAMS);
alertSource.endObject();
alertSource.endObject();
@ -142,7 +143,7 @@ public class BasicAlertingTest extends AbstractAlertingTests {
createIndex("my-index");
// Have a sample document in the index, the alert is going to evaluate
client().prepareIndex("my-index", "my-type").setSource("field", "value").get();
SearchRequest searchRequest = createTriggerSearchRequest("my-index").source(searchSource().query(matchAllQuery()));
SearchRequest searchRequest = createConditionSearchRequest("my-index").source(searchSource().query(matchAllQuery()));
searchRequest.searchType(SearchType.QUERY_THEN_FETCH);
// By accessing the actual hit we know that the fetch phase has been performed
BytesReference alertSource = createAlertSource("0/5 * * * * ? *", searchRequest, "hits?.hits[0]._score == 1.0");
@ -150,30 +151,30 @@ public class BasicAlertingTest extends AbstractAlertingTests {
.setAlertSource(alertSource)
.get();
assertThat(indexResponse.indexResponse().isCreated(), is(true));
assertAlertTriggered("my-first-alert", 1);
assertAlertWithMinimumPerformedActionsCount("my-first-alert", 1);
}
@Test
public void testModifyAlerts() throws Exception {
SearchRequest searchRequest = createTriggerSearchRequest("my-index").source(searchSource().query(matchAllQuery()));
SearchRequest searchRequest = createConditionSearchRequest("my-index").source(searchSource().query(matchAllQuery()));
alertClient().preparePutAlert("1")
.setAlertSource(createAlertSource("0/5 * * * * ? *", searchRequest, "hits.total == 1"))
.get();
assertAlertTriggered("1", 0, false);
assertAlertWithMinimumPerformedActionsCount("1", 0, false);
alertClient().preparePutAlert("1")
.setAlertSource(createAlertSource("0/5 * * * * ? *", searchRequest, "hits.total == 0"))
.get();
assertAlertTriggered("1", 1, false);
assertAlertWithMinimumPerformedActionsCount("1", 1, false);
alertClient().preparePutAlert("1")
.setAlertSource(createAlertSource("0/5 * * * * ? 2020", searchRequest, "hits.total == 0"))
.get();
Thread.sleep(5000);
long triggered = findNumberOfPerformedActions("1");
long count = findNumberOfPerformedActions("1");
Thread.sleep(5000);
assertThat(triggered, equalTo(findNumberOfPerformedActions("1")));
assertThat(count, equalTo(findNumberOfPerformedActions("1")));
}
@Test
@ -204,7 +205,7 @@ public class BasicAlertingTest extends AbstractAlertingTests {
}
assertAcked(prepareCreate("my-index").addMapping("my-type", "_timestamp", "enabled=true"));
SearchRequest searchRequest = createTriggerSearchRequest("my-index").source(
SearchRequest searchRequest = createConditionSearchRequest("my-index").source(
searchSource()
.query(QueryBuilders.constantScoreQuery(FilterBuilders.rangeFilter("_timestamp").from("{{scheduled_fire_time}}||-1m").to("{{scheduled_fire_time}}")))
.aggregation(AggregationBuilders.dateHistogram("rate").field("_timestamp").interval(DateHistogram.Interval.SECOND).order(Histogram.Order.COUNT_DESC))
@ -216,40 +217,40 @@ public class BasicAlertingTest extends AbstractAlertingTests {
indexThread.start();
indexThread.join();
assertAlertTriggeredExact("rate-alert", 0);
assertNoAlertTrigger("rate-alert", 1);
assertAlertWithExactPerformedActionsCount("rate-alert", 0);
assertAlertWithNoActionNeeded("rate-alert", 1);
indexThread = new Thread(new R(100, 60000));
indexThread.start();
indexThread.join();
assertAlertTriggered("rate-alert", 1);
assertAlertWithMinimumPerformedActionsCount("rate-alert", 1);
}
private final SearchSourceBuilder searchSourceBuilder = searchSource().query(
filteredQuery(matchQuery("event_type", "a"), rangeFilter("_timestamp").from("{{" + AlertUtils.SCHEDULED_FIRE_TIME_VARIABLE_NAME + "}}||-30s").to("{{" + AlertUtils.SCHEDULED_FIRE_TIME_VARIABLE_NAME + "}}"))
filteredQuery(matchQuery("event_type", "a"), rangeFilter("_timestamp").from("{{" + Variables.SCHEDULED_FIRE_TIME + "}}||-30s").to("{{" + Variables.SCHEDULED_FIRE_TIME + "}}"))
);
@Test
public void testTriggerSearchWithSource() throws Exception {
testTriggerSearch(
createTriggerSearchRequest("my-index").source(searchSourceBuilder)
public void testConditionSearchWithSource() throws Exception {
testConditionSearch(
createConditionSearchRequest("my-index").source(searchSourceBuilder)
);
}
@Test
public void testTriggerSearchWithIndexedTemplate() throws Exception {
public void testConditionSearchWithIndexedTemplate() throws Exception {
client().preparePutIndexedScript()
.setScriptLang("mustache")
.setId("my-template")
.setSource(jsonBuilder().startObject().field("template").value(searchSourceBuilder).endObject())
.get();
SearchRequest searchRequest = createTriggerSearchRequest("my-index");
SearchRequest searchRequest = createConditionSearchRequest("my-index");
searchRequest.templateName("my-template");
searchRequest.templateType(ScriptService.ScriptType.INDEXED);
testTriggerSearch(searchRequest);
testConditionSearch(searchRequest);
}
private void testTriggerSearch(SearchRequest request) throws Exception {
private void testConditionSearch(SearchRequest request) throws Exception {
long scheduleTimeInMs = 5000;
String alertName = "red-alert";
assertAcked(prepareCreate("my-index").addMapping("my-type", "_timestamp", "enabled=true", "event_type", "type=string"));
@ -270,7 +271,7 @@ public class BasicAlertingTest extends AbstractAlertingTests {
.get();
long timeLeft = scheduleTimeInMs - (System.currentTimeMillis() - time1);
Thread.sleep(timeLeft);
assertNoAlertTrigger(alertName, 1);
assertAlertWithNoActionNeeded(alertName, 1);
time1 = System.currentTimeMillis();
client().prepareIndex("my-index", "my-type")
@ -279,7 +280,7 @@ public class BasicAlertingTest extends AbstractAlertingTests {
.get();
timeLeft = scheduleTimeInMs - (System.currentTimeMillis() - time1);
Thread.sleep(timeLeft);
assertNoAlertTrigger(alertName, 2);
assertAlertWithNoActionNeeded(alertName, 2);
time1 = System.currentTimeMillis();
client().prepareIndex("my-index", "my-type")
@ -288,6 +289,6 @@ public class BasicAlertingTest extends AbstractAlertingTests {
.get();
timeLeft = scheduleTimeInMs - (System.currentTimeMillis() - time1);
Thread.sleep(timeLeft);
assertAlertTriggered(alertName, 1);
assertAlertWithMinimumPerformedActionsCount(alertName, 1);
}
}

View File

@ -11,6 +11,7 @@ import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.alerts.actions.Action;
import org.elasticsearch.alerts.actions.Actions;
import org.elasticsearch.alerts.condition.search.ScriptSearchCondition;
import org.elasticsearch.alerts.history.FiredAlert;
import org.elasticsearch.alerts.history.HistoryStore;
import org.elasticsearch.alerts.scheduler.schedule.CronSchedule;
@ -19,7 +20,6 @@ import org.elasticsearch.alerts.support.init.proxy.ScriptServiceProxy;
import org.elasticsearch.alerts.transform.SearchTransform;
import org.elasticsearch.alerts.transport.actions.put.PutAlertResponse;
import org.elasticsearch.alerts.transport.actions.stats.AlertsStatsResponse;
import org.elasticsearch.alerts.trigger.search.ScriptSearchTrigger;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.joda.time.DateTime;
import org.elasticsearch.common.joda.time.DateTimeZone;
@ -35,8 +35,10 @@ import org.junit.Test;
import java.util.ArrayList;
import java.util.concurrent.TimeUnit;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.index.query.QueryBuilders.termQuery;
import static org.elasticsearch.search.builder.SearchSourceBuilder.searchSource;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.core.IsEqual.equalTo;
/**
@ -47,7 +49,7 @@ public class BootStrapTest extends AbstractAlertingTests {
public void testBootStrapAlerts() throws Exception {
ensureAlertingStarted();
SearchRequest searchRequest = createTriggerSearchRequest("my-index").source(searchSource().query(termQuery("field", "value")));
SearchRequest searchRequest = createConditionSearchRequest("my-index").source(searchSource().query(termQuery("field", "value")));
BytesReference alertSource = createAlertSource("0 0/5 * * * ? *", searchRequest, "hits.total == 1");
client().prepareIndex(AlertsStore.ALERT_INDEX, AlertsStore.ALERT_TYPE, "my-first-alert")
.setSource(alertSource)
@ -66,7 +68,7 @@ public class BootStrapTest extends AbstractAlertingTests {
@Test
@TestLogging("alerts.actions:DEBUG")
public void testBootStrapHistory() throws Exception {
public void testBootstrapHistory() throws Exception {
ensureAlertingStarted();
AlertsStatsResponse response = alertClient().prepareAlertsStats().get();
@ -74,11 +76,11 @@ public class BootStrapTest extends AbstractAlertingTests {
assertThat(response.getAlertManagerStarted(), equalTo(AlertsService.State.STARTED));
assertThat(response.getNumberOfRegisteredAlerts(), equalTo(0L));
SearchRequest searchRequest = createTriggerSearchRequest("my-index").source(searchSource().query(termQuery("field", "value")));
SearchRequest searchRequest = createConditionSearchRequest("my-index").source(searchSource().query(termQuery("field", "value")));
Alert alert = new Alert(
"test-serialization",
new CronSchedule("0/5 * * * * ? 2035"),
new ScriptSearchTrigger(logger, ScriptServiceProxy.of(scriptService()), ClientProxy.of(client()),
new ScriptSearchCondition(logger, ScriptServiceProxy.of(scriptService()), ClientProxy.of(client()),
searchRequest, "return true", ScriptService.ScriptType.INLINE, "groovy"),
new SearchTransform(logger, ScriptServiceProxy.of(scriptService()), ClientProxy.of(client()), searchRequest),
new TimeValue(0),
@ -87,17 +89,24 @@ public class BootStrapTest extends AbstractAlertingTests {
new Alert.Status()
);
XContentBuilder builder = jsonBuilder().value(alert);
IndexResponse indexResponse = client().prepareIndex(AlertsStore.ALERT_INDEX, AlertsStore.ALERT_TYPE, alert.name())
.setSource(builder).get();
ensureGreen(AlertsStore.ALERT_INDEX);
refresh();
assertThat(indexResponse.isCreated(), is(true));
DateTime scheduledFireTime = new DateTime(DateTimeZone.UTC);
FiredAlert entry = new FiredAlert(alert, scheduledFireTime, scheduledFireTime);
FiredAlert firedAlert = new FiredAlert(alert, scheduledFireTime, scheduledFireTime);
String actionHistoryIndex = HistoryStore.getAlertHistoryIndexNameForTime(scheduledFireTime);
createIndex(actionHistoryIndex);
ensureGreen(actionHistoryIndex);
logger.info("Created index {}", actionHistoryIndex);
IndexResponse indexResponse = client().prepareIndex(actionHistoryIndex, HistoryStore.ALERT_HISTORY_TYPE, entry.id())
indexResponse = client().prepareIndex(actionHistoryIndex, HistoryStore.ALERT_HISTORY_TYPE, firedAlert.id())
.setConsistencyLevel(WriteConsistencyLevel.ALL)
.setSource(XContentFactory.jsonBuilder().value(entry))
.setSource(jsonBuilder().value(firedAlert))
.get();
assertTrue(indexResponse.isCreated());
@ -107,7 +116,7 @@ public class BootStrapTest extends AbstractAlertingTests {
response = alertClient().prepareAlertsStats().get();
assertTrue(response.isAlertActionManagerStarted());
assertThat(response.getAlertManagerStarted(), equalTo(AlertsService.State.STARTED));
assertThat(response.getNumberOfRegisteredAlerts(), equalTo(0L));
assertThat(response.getNumberOfRegisteredAlerts(), equalTo(1L));
assertThat(response.getAlertActionManagerLargestQueueSize(), equalTo(1L));
}
@ -117,7 +126,7 @@ public class BootStrapTest extends AbstractAlertingTests {
DateTime now = new DateTime(DateTimeZone.UTC);
long numberOfAlertHistoryIndices = randomIntBetween(2,8);
long numberOfAlertHistoryEntriesPerIndex = randomIntBetween(5,10);
SearchRequest searchRequest = createTriggerSearchRequest("my-index").source(searchSource().query(termQuery("field", "value")));
SearchRequest searchRequest = createConditionSearchRequest("my-index").source(searchSource().query(termQuery("field", "value")));
for (int i = 0; i < numberOfAlertHistoryIndices; i++) {
DateTime historyIndexDate = now.minus((new TimeValue(i, TimeUnit.DAYS)).getMillis());
@ -131,7 +140,7 @@ public class BootStrapTest extends AbstractAlertingTests {
Alert alert = new Alert(
"action-test-"+ i + " " + j,
new CronSchedule("0/5 * * * * ? 2035"), //Set a cron schedule far into the future so this alert is never scheduled
new ScriptSearchTrigger(logger, ScriptServiceProxy.of(scriptService()), ClientProxy.of(client()),
new ScriptSearchCondition(logger, ScriptServiceProxy.of(scriptService()), ClientProxy.of(client()),
searchRequest, "return true", ScriptService.ScriptType.INLINE, "groovy"),
new SearchTransform(logger, ScriptServiceProxy.of(scriptService()), ClientProxy.of(client()), searchRequest),
new TimeValue(0),
@ -139,16 +148,16 @@ public class BootStrapTest extends AbstractAlertingTests {
null,
new Alert.Status()
);
XContentBuilder jsonBuilder = XContentFactory.jsonBuilder();
XContentBuilder jsonBuilder = jsonBuilder();
alert.toXContent(jsonBuilder, ToXContent.EMPTY_PARAMS);
PutAlertResponse putAlertResponse = alertClient().preparePutAlert(alert.name()).setAlertSource(jsonBuilder.bytes()).get();
assertTrue(putAlertResponse.indexResponse().isCreated());
FiredAlert entry = new FiredAlert(alert, historyIndexDate, historyIndexDate);
IndexResponse indexResponse = client().prepareIndex(actionHistoryIndex, HistoryStore.ALERT_HISTORY_TYPE, entry.id())
FiredAlert firedAlert = new FiredAlert(alert, historyIndexDate, historyIndexDate);
IndexResponse indexResponse = client().prepareIndex(actionHistoryIndex, HistoryStore.ALERT_HISTORY_TYPE, firedAlert.id())
.setConsistencyLevel(WriteConsistencyLevel.ALL)
.setSource(XContentFactory.jsonBuilder().value(entry))
.setSource(XContentFactory.jsonBuilder().value(firedAlert))
.get();
assertTrue(indexResponse.isCreated());
}
@ -168,7 +177,7 @@ public class BootStrapTest extends AbstractAlertingTests {
public void run() {
CountResponse countResponse = client().prepareCount(HistoryStore.ALERT_HISTORY_INDEX_PREFIX + "*")
.setTypes(HistoryStore.ALERT_HISTORY_TYPE)
.setQuery(QueryBuilders.termQuery(FiredAlert.Parser.STATE_FIELD.getPreferredName(), FiredAlert.State.ACTION_PERFORMED.toString())).get();
.setQuery(QueryBuilders.termQuery(FiredAlert.Parser.STATE_FIELD.getPreferredName(), FiredAlert.State.EXECUTED.toString())).get();
assertEquals(totalHistoryEntries, countResponse.getCount());
}

View File

@ -57,12 +57,12 @@ public class NoMasterNodeTests extends AbstractAlertingTests {
// Have a sample document in the index, the alert is going to evaluate
client().prepareIndex("my-index", "my-type").setSource("field", "value").get();
SearchRequest searchRequest = createTriggerSearchRequest("my-index").source(searchSource().query(termQuery("field", "value")));
SearchRequest searchRequest = createConditionSearchRequest("my-index").source(searchSource().query(termQuery("field", "value")));
BytesReference alertSource = createAlertSource("0/5 * * * * ? *", searchRequest, "hits.total == 1");
alertClient().preparePutAlert("my-first-alert")
.setAlertSource(alertSource)
.get();
assertAlertTriggered("my-first-alert", 1);
assertAlertWithMinimumPerformedActionsCount("my-first-alert", 1);
// Stop the elected master, no new master will be elected b/c of m_m_n is set to 2
stopElectedMasterNodeAndWait();
@ -76,18 +76,18 @@ public class NoMasterNodeTests extends AbstractAlertingTests {
// Bring back the 2nd node and wait for elected master node to come back and alerting to work as expected.
startElectedMasterNodeAndWait();
// Our first alert should at least have been triggered twice
assertAlertTriggered("my-first-alert", 2);
// Our first alert's condition should at least have been met twice
assertAlertWithMinimumPerformedActionsCount("my-first-alert", 2);
// Delete the existing alert
DeleteAlertResponse response = alertClient().prepareDeleteAlert("my-first-alert").get();
assertThat(response.deleteResponse().isFound(), is(true));
// Add a new alert and wait for it get triggered
// Add a new alert and wait for its condition to be met
alertClient().preparePutAlert("my-second-alert")
.setAlertSource(alertSource)
.get();
assertAlertTriggered("my-second-alert", 1);
assertAlertWithMinimumPerformedActionsCount("my-second-alert", 1);
}
@Test
@ -104,7 +104,7 @@ public class NoMasterNodeTests extends AbstractAlertingTests {
ensureAlertingStarted();
for (int i = 1; i <= numberOfAlerts; i++) {
String alertName = "alert" + i;
SearchRequest searchRequest = createTriggerSearchRequest("my-index").source(searchSource().query(termQuery("field", "value")));
SearchRequest searchRequest = createConditionSearchRequest("my-index").source(searchSource().query(termQuery("field", "value")));
BytesReference alertSource = createAlertSource("0/5 * * * * ? *", searchRequest, "hits.total == 1");
alertClient().preparePutAlert(alertName)
.setAlertSource(alertSource)
@ -116,7 +116,7 @@ public class NoMasterNodeTests extends AbstractAlertingTests {
for (int j = 1; j < numberOfAlerts; j++) {
String alertName = "alert" + i;
assertAlertTriggered(alertName, i);
assertAlertWithMinimumPerformedActionsCount(alertName, i);
}
stopElectedMasterNodeAndWait();
startElectedMasterNodeAndWait();

View File

@ -10,13 +10,12 @@ import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.alerts.actions.Action;
import org.elasticsearch.alerts.actions.Actions;
import org.elasticsearch.alerts.actions.index.IndexAction;
import org.elasticsearch.alerts.condition.search.ScriptSearchCondition;
import org.elasticsearch.alerts.scheduler.schedule.CronSchedule;
import org.elasticsearch.alerts.support.AlertUtils;
import org.elasticsearch.alerts.support.init.proxy.ClientProxy;
import org.elasticsearch.alerts.support.init.proxy.ScriptServiceProxy;
import org.elasticsearch.alerts.transform.SearchTransform;
import org.elasticsearch.alerts.transport.actions.put.PutAlertResponse;
import org.elasticsearch.alerts.trigger.search.ScriptSearchTrigger;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
@ -40,15 +39,15 @@ public class TransformSearchTest extends AbstractAlertingTests {
@Test
public void testTransformSearchRequest() throws Exception {
createIndex("my-trigger-index", "my-payload-index", "my-payload-output");
ensureGreen("my-trigger-index", "my-payload-index", "my-payload-output");
createIndex("my-condition-index", "my-payload-index", "my-payload-output");
ensureGreen("my-condition-index", "my-payload-index", "my-payload-output");
index("my-payload-index","payload", "mytestresult");
refresh();
SearchRequest triggerRequest = createTriggerSearchRequest("my-trigger-index").source(searchSource().query(matchAllQuery()));
SearchRequest transformRequest = createTriggerSearchRequest("my-payload-index").source(searchSource().query(matchAllQuery()));
transformRequest.searchType(AlertUtils.DEFAULT_PAYLOAD_SEARCH_TYPE);
SearchRequest conditionRequest = createConditionSearchRequest("my-condition-index").source(searchSource().query(matchAllQuery()));
SearchRequest transformRequest = createConditionSearchRequest("my-payload-index").source(searchSource().query(matchAllQuery()));
transformRequest.searchType(SearchTransform.DEFAULT_SEARCH_TYPE);
List<Action> actions = new ArrayList<>();
actions.add(new IndexAction(logger, ClientProxy.of(client()), "my-payload-output","result"));
@ -60,8 +59,8 @@ public class TransformSearchTest extends AbstractAlertingTests {
Alert alert = new Alert(
"test-serialization",
new CronSchedule("0/5 * * * * ? *"),
new ScriptSearchTrigger(logger, ScriptServiceProxy.of(scriptService()), ClientProxy.of(client()),
triggerRequest,"return true", ScriptService.ScriptType.INLINE, "groovy"),
new ScriptSearchCondition(logger, ScriptServiceProxy.of(scriptService()), ClientProxy.of(client()),
conditionRequest,"return true", ScriptService.ScriptType.INLINE, "groovy"),
new SearchTransform(logger, ScriptServiceProxy.of(scriptService()), ClientProxy.of(client()), transformRequest),
new TimeValue(0),
new Actions(actions),
@ -75,7 +74,7 @@ public class TransformSearchTest extends AbstractAlertingTests {
PutAlertResponse putAlertResponse = alertClient().preparePutAlert("test-payload").setAlertSource(jsonBuilder.bytes()).get();
assertTrue(putAlertResponse.indexResponse().isCreated());
assertAlertTriggered("test-payload", 1, false);
assertAlertWithMinimumPerformedActionsCount("test-payload", 1, false);
refresh();
SearchRequest searchRequest = client().prepareSearch("my-payload-output").request();

View File

@ -19,8 +19,8 @@ import org.elasticsearch.alerts.transport.actions.get.GetAlertRequest;
import org.elasticsearch.alerts.transport.actions.get.GetAlertResponse;
import org.elasticsearch.alerts.transport.actions.put.PutAlertRequest;
import org.elasticsearch.alerts.transport.actions.put.PutAlertResponse;
import org.elasticsearch.alerts.trigger.Trigger;
import org.elasticsearch.alerts.trigger.search.ScriptSearchTrigger;
import org.elasticsearch.alerts.condition.Condition;
import org.elasticsearch.alerts.condition.search.ScriptSearchCondition;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
@ -57,15 +57,15 @@ public class ActionsTest extends AbstractAlertingTests {
final List<Action> actionList = new ArrayList<>();
actionList.add(alertAction);
Trigger alertTrigger = new ScriptSearchTrigger(logger, ScriptServiceProxy.of(scriptService()),
ClientProxy.of(client()), createTriggerSearchRequest(), "return true", ScriptService.ScriptType.INLINE, "groovy");
Condition alertCondition = new ScriptSearchCondition(logger, ScriptServiceProxy.of(scriptService()),
ClientProxy.of(client()), createConditionSearchRequest(), "return true", ScriptService.ScriptType.INLINE, "groovy");
Alert alert = new Alert(
"my-first-alert",
new CronSchedule("0/5 * * * * ? *"),
alertTrigger,
new SearchTransform(logger, ScriptServiceProxy.of(scriptService()), ClientProxy.of(client()),createTriggerSearchRequest()),
alertCondition,
new SearchTransform(logger, ScriptServiceProxy.of(scriptService()), ClientProxy.of(client()), createConditionSearchRequest()),
new TimeValue(0),
new Actions(actionList),
null,

View File

@ -51,7 +51,7 @@ public class AlertStatsTests extends AbstractAlertingTests {
assertTrue(response.isAlertActionManagerStarted());
assertThat(response.getAlertManagerStarted(), equalTo(AlertsService.State.STARTED));
SearchRequest searchRequest = createTriggerSearchRequest("my-index").source(searchSource().query(termQuery("field", "value")));
SearchRequest searchRequest = createConditionSearchRequest("my-index").source(searchSource().query(termQuery("field", "value")));
BytesReference alertSource = createAlertSource("* * * * * ? *", searchRequest, "hits.total == 1");
alertClient().preparePutAlert("testAlert")
.setAlertSource(alertSource)

View File

@ -40,9 +40,8 @@ import java.util.Set;
public class EmailActionTest extends ElasticsearchTestCase {
public void testEmailTemplateRender() throws IOException, MessagingException {
//createIndex("my-trigger-index");
StringTemplateUtils.Template template =
new StringTemplateUtils.Template("{{alert_name}} triggered with {{response.hits.total}} hits");
new StringTemplateUtils.Template("{{alert_name}} executed with {{response.hits.total}} hits");
Settings settings = ImmutableSettings.settingsBuilder().build();
MustacheScriptEngineService mustacheScriptEngineService = new MustacheScriptEngineService(settings);

View File

@ -3,7 +3,7 @@
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.alerts.trigger.search;
package org.elasticsearch.alerts.condition.search;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.search.SearchRequest;
@ -34,9 +34,9 @@ import java.util.Set;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
public class SearchTriggerUnitTest extends ElasticsearchTestCase {
public class SearchConditionTests extends ElasticsearchTestCase {
private XContentBuilder createTriggerContent(String script, String scriptLang, ScriptService.ScriptType scriptType, SearchRequest request) throws IOException {
private XContentBuilder createConditionContent(String script, String scriptLang, ScriptService.ScriptType scriptType, SearchRequest request) throws IOException {
XContentBuilder jsonBuilder = XContentFactory.jsonBuilder();
jsonBuilder.startObject();
jsonBuilder.field("script");
@ -48,14 +48,14 @@ public class SearchTriggerUnitTest extends ElasticsearchTestCase {
if (scriptType != null) {
jsonBuilder.field("script_type", scriptType.toString());
}
jsonBuilder.field(ScriptSearchTrigger.Parser.REQUEST_FIELD.getPreferredName());
jsonBuilder.field(ScriptSearchCondition.Parser.REQUEST_FIELD.getPreferredName());
AlertUtils.writeSearchRequest(request, jsonBuilder, ToXContent.EMPTY_PARAMS);
jsonBuilder.endObject();
jsonBuilder.endObject();
return jsonBuilder;
}
public void testInlineScriptTriggers() throws Exception {
public void testInlineScriptConditions() throws Exception {
Settings settings = ImmutableSettings.settingsBuilder().build();
GroovyScriptEngineService groovyScriptEngineService = new GroovyScriptEngineService(settings);
@ -64,12 +64,12 @@ public class SearchTriggerUnitTest extends ElasticsearchTestCase {
engineServiceSet.add(groovyScriptEngineService);
ScriptService scriptService = new ScriptService(settings, new Environment(), engineServiceSet, new ResourceWatcherService(settings, tp));
ScriptSearchTrigger.Parser triggerParser = new ScriptSearchTrigger.Parser(settings, null, ScriptServiceProxy.of(scriptService));
ScriptSearchCondition.Parser conditionParser = new ScriptSearchCondition.Parser(settings, null, ScriptServiceProxy.of(scriptService));
try {
XContentBuilder builder = createTriggerContent("hits.total > 1", null, null, AbstractAlertingTests.createTriggerSearchRequest());
XContentBuilder builder = createConditionContent("hits.total > 1", null, null, AbstractAlertingTests.createConditionSearchRequest());
XContentParser parser = XContentFactory.xContent(builder.bytes()).createParser(builder.bytes());
ScriptSearchTrigger trigger = triggerParser.parse(parser);
ScriptSearchCondition condition = conditionParser.parse(parser);
SearchRequest request = new SearchRequest();
request.indices("my-index");
@ -78,18 +78,18 @@ public class SearchTriggerUnitTest extends ElasticsearchTestCase {
SearchResponse response = new SearchResponse(InternalSearchResponse.empty(), "", 3, 3, 500l, new ShardSearchFailure[0]);
XContentBuilder responseBuilder = jsonBuilder().startObject().value(response).endObject();
assertFalse(trigger.processSearchResponse(response).triggered());
assertFalse(condition.processSearchResponse(response).met());
builder = createTriggerContent("return true", null, null, AbstractAlertingTests.createTriggerSearchRequest());
builder = createConditionContent("return true", null, null, AbstractAlertingTests.createConditionSearchRequest());
parser = XContentFactory.xContent(builder.bytes()).createParser(builder.bytes());
trigger = triggerParser.parse(parser);
condition = conditionParser.parse(parser);
assertTrue(trigger.processSearchResponse(response).triggered());
assertTrue(condition.processSearchResponse(response).met());
tp.shutdownNow();
} catch (IOException ioe) {
throw new ElasticsearchException("Failed to construct the trigger", ioe);
throw new ElasticsearchException("Failed to construct the condition", ioe);
}
}

View File

@ -9,10 +9,10 @@ import org.elasticsearch.alerts.*;
import org.elasticsearch.alerts.actions.email.EmailAction;
import org.elasticsearch.alerts.actions.webhook.WebhookAction;
import org.elasticsearch.alerts.throttle.Throttler;
import org.elasticsearch.alerts.trigger.Trigger;
import org.elasticsearch.alerts.trigger.search.ScriptSearchTrigger;
import org.elasticsearch.alerts.trigger.search.SearchTrigger;
import org.elasticsearch.alerts.trigger.simple.SimpleTrigger;
import org.elasticsearch.alerts.condition.Condition;
import org.elasticsearch.alerts.condition.search.ScriptSearchCondition;
import org.elasticsearch.alerts.condition.search.SearchCondition;
import org.elasticsearch.alerts.condition.simple.SimpleCondition;
import org.elasticsearch.common.joda.time.DateTime;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
@ -24,7 +24,7 @@ import org.junit.Test;
public class FiredAlertTest extends AbstractAlertingTests {
@Test
public void testFiredAlertParser() throws Exception {
public void testParser() throws Exception {
Alert alert = createTestAlert("fired_test");
FiredAlert firedAlert = new FiredAlert(alert, new DateTime(), new DateTime());
@ -41,19 +41,19 @@ public class FiredAlertTest extends AbstractAlertingTests {
}
@Test
public void testFinalizedFiredAlertParser() throws Exception {
public void testParser_WithSealedFiredAlert() throws Exception {
Alert alert = createTestAlert("fired_test");
FiredAlert firedAlert = new FiredAlert(alert, new DateTime(), new DateTime());
ExecutionContext ctx = new ExecutionContext(firedAlert.id(), alert, new DateTime(), new DateTime());
ctx.onActionResult(new EmailAction.Result.Failure("failed to send because blah"));
ctx.onActionResult(new WebhookAction.Result.Executed(300, "http://localhost:8000/alertfoo", "{'awesome' : 'us'}"));
Trigger.Result triggerResult = new SimpleTrigger.Result(new Payload.Simple());
ctx.onThrottleResult(Throttler.NO_THROTTLE.throttle(ctx, triggerResult));
ctx.onTriggerResult(triggerResult);
Condition.Result conditionResult = new SimpleCondition.Result(new Payload.Simple());
ctx.onThrottleResult(Throttler.NO_THROTTLE.throttle(ctx, conditionResult));
ctx.onConditionResult(conditionResult);
firedAlert.update(new AlertExecution(ctx));
XContentBuilder jsonBuilder = XContentFactory.jsonBuilder();
firedAlert.toXContent(jsonBuilder, ToXContent.EMPTY_PARAMS);
logger.error("FOO : " + jsonBuilder.bytes().toUtf8());
FiredAlert parsedFiredAlert = firedAlertParser().parse(jsonBuilder.bytes(), firedAlert.id(), 0);
XContentBuilder jsonBuilder2 = XContentFactory.jsonBuilder();
parsedFiredAlert.toXContent(jsonBuilder2, ToXContent.EMPTY_PARAMS);
@ -61,16 +61,17 @@ public class FiredAlertTest extends AbstractAlertingTests {
}
@Test
public void testFinalizedFiredAlertParserScriptSearchTrigger() throws Exception {
public void testParser_WithSealedFiredAlert_WithScriptSearchCondition() throws Exception {
Alert alert = createTestAlert("fired_test");
FiredAlert firedAlert = new FiredAlert(alert, new DateTime(), new DateTime());
ExecutionContext ctx = new ExecutionContext(firedAlert.id(), alert, new DateTime(), new DateTime());
ctx.onActionResult(new EmailAction.Result.Failure("failed to send because blah"));
ctx.onActionResult(new WebhookAction.Result.Executed(300, "http://localhost:8000/alertfoo", "{'awesome' : 'us'}"));
Trigger.Result triggerResult = new SearchTrigger.Result(ScriptSearchTrigger.TYPE, true, createTriggerSearchRequest(), new Payload.Simple());
ctx.onThrottleResult(Throttler.NO_THROTTLE.throttle(ctx, triggerResult));
ctx.onTriggerResult(triggerResult);
Condition.Result conditionResult = new SearchCondition.Result(ScriptSearchCondition.TYPE, true, createConditionSearchRequest(), new Payload.Simple());
ctx.onThrottleResult(Throttler.NO_THROTTLE.throttle(ctx, conditionResult));
ctx.onConditionResult(conditionResult);
firedAlert.update(new AlertExecution(ctx));
XContentBuilder jsonBuilder = XContentFactory.jsonBuilder();
firedAlert.toXContent(jsonBuilder, ToXContent.EMPTY_PARAMS);
FiredAlert parsedFiredAlert = firedAlertParser().parse(jsonBuilder.bytes(), firedAlert.id(), 0);