diff --git a/src/main/java/org/elasticsearch/alerts/Alert.java b/src/main/java/org/elasticsearch/alerts/Alert.java index f3e724c9ce9..c19f9f44b9a 100644 --- a/src/main/java/org/elasticsearch/alerts/Alert.java +++ b/src/main/java/org/elasticsearch/alerts/Alert.java @@ -102,6 +102,18 @@ public class Alert implements ToXContent { return status; } + /** + * Acks this alert. + * + * @return {@code true} if the status of this alert changed, {@code false} otherwise. + */ + public boolean ack() { + return status.onAck(new DateTime()); + } + + public boolean acked() { + return status.ackStatus.state == Status.AckStatus.State.ACKED; + } @Override public boolean equals(Object o) { @@ -244,24 +256,24 @@ public class Alert implements ToXContent { private DateTime lastRan; private DateTime lastTriggered; private DateTime lastExecuted; - private Ack ack; + private AckStatus ackStatus; private Throttle lastThrottle; public Status() { - this(-1, null, null, null, null, new Ack()); + this(-1, null, null, null, null, new AckStatus()); } public Status(Status other) { - this(other.version, other.lastRan, other.lastTriggered, other.lastExecuted, other.lastThrottle, other.ack); + this(other.version, other.lastRan, other.lastTriggered, other.lastExecuted, other.lastThrottle, other.ackStatus); } - private Status(long version, DateTime lastRan, DateTime lastTriggered, DateTime lastExecuted, Throttle lastThrottle, Ack ack) { + private Status(long version, DateTime lastRan, DateTime lastTriggered, DateTime lastExecuted, Throttle lastThrottle, AckStatus ackStatus) { this.version = version; this.lastRan = lastRan; this.lastTriggered = lastTriggered; this.lastExecuted = lastExecuted; this.lastThrottle = lastThrottle; - this.ack = ack; + this.ackStatus = ackStatus; } public long version() { @@ -300,18 +312,14 @@ public class Alert implements ToXContent { return lastThrottle; } - public Ack ack() { - return ack; - } - - public boolean acked() { - return ack.state == Ack.State.ACKED; + public AckStatus ackStatus() { + return ackStatus; } /** * Called whenever an alert is ran */ - public void onRun(DateTime timestamp) { + public void onExecute(DateTime timestamp) { lastRan = timestamp; } @@ -324,41 +332,41 @@ public class Alert implements ToXContent { /** * Notifies this status about the triggered event of an alert run. The state will be updated accordingly - - * if the alert is can be acked and during a run, the alert was not triggered and the current state is {@link Status.Ack.State#ACKED}, - * we then need to reset the state to {@link Status.Ack.State#AWAITS_EXECUTION} + * if the alert is can be acked and during a run, the alert was not triggered and the current state is {@link org.elasticsearch.alerts.Alert.Status.AckStatus.State#ACKED}, + * we then need to reset the state to {@link org.elasticsearch.alerts.Alert.Status.AckStatus.State#AWAITS_EXECUTION} */ public void onTrigger(boolean triggered, DateTime timestamp) { if (triggered) { lastTriggered = timestamp; - } else if (ack.state == Ack.State.ACKED) { + } else if (ackStatus.state == AckStatus.State.ACKED) { // didn't trigger now after it triggered in the past - we need to reset the ack state - ack = new Ack(Ack.State.AWAITS_EXECUTION, timestamp); + ackStatus = new AckStatus(AckStatus.State.AWAITS_EXECUTION, timestamp); } } /** - * Notifies this status that the alert was acked. If the current state is {@link Status.Ack.State#ACKABLE}, then we'll change it - * to {@link Status.Ack.State#ACKED} (when set to {@link Status.Ack.State#ACKED}, the {@link org.elasticsearch.alerts.throttle.AckThrottler} will lastThrottle the + * Notifies this status that the alert was acked. If the current state is {@link org.elasticsearch.alerts.Alert.Status.AckStatus.State#ACKABLE}, then we'll change it + * to {@link org.elasticsearch.alerts.Alert.Status.AckStatus.State#ACKED} (when set to {@link org.elasticsearch.alerts.Alert.Status.AckStatus.State#ACKED}, the {@link org.elasticsearch.alerts.throttle.AckThrottler} will lastThrottle the * execution. * * @return {@code true} if the state of changed due to the ack, {@code false} otherwise. */ - public boolean onAck(DateTime timestamp) { - if (ack.state == Ack.State.ACKABLE) { - ack = new Ack(Ack.State.ACKED, timestamp); + boolean onAck(DateTime timestamp) { + if (ackStatus.state == AckStatus.State.ACKABLE) { + ackStatus = new AckStatus(AckStatus.State.ACKED, timestamp); return true; } return false; } /** - * Notified this status that the alert was executed. If the current state is {@link Status.Ack.State#AWAITS_EXECUTION}, it will change to - * {@link Status.Ack.State#ACKABLE}. + * Notified this status that the alert was executed. If the current state is {@link org.elasticsearch.alerts.Alert.Status.AckStatus.State#AWAITS_EXECUTION}, it will change to + * {@link org.elasticsearch.alerts.Alert.Status.AckStatus.State#ACKABLE}. */ public void onExecution(DateTime timestamp) { lastExecuted = timestamp; - if (ack.state == Ack.State.AWAITS_EXECUTION) { - ack = new Ack(Ack.State.ACKABLE, timestamp); + if (ackStatus.state == AckStatus.State.AWAITS_EXECUTION) { + ackStatus = new AckStatus(AckStatus.State.ACKABLE, timestamp); } } @@ -375,8 +383,8 @@ public class Alert implements ToXContent { writeDate(out, lastThrottle.timestamp); out.writeString(lastThrottle.reason); } - out.writeString(ack.state.name()); - writeDate(out, ack.timestamp); + out.writeString(ackStatus.state.name()); + writeDate(out, ackStatus.timestamp); } @Override @@ -386,7 +394,7 @@ public class Alert implements ToXContent { lastTriggered = readOptionalDate(in); lastExecuted = readOptionalDate(in); lastThrottle = in.readBoolean() ? new Throttle(readDate(in), in.readString()) : null; - ack = new Ack(Ack.State.valueOf(in.readString()), readDate(in)); + ackStatus = new AckStatus(AckStatus.State.valueOf(in.readString()), readDate(in)); } public static Status read(StreamInput in) throws IOException { @@ -408,8 +416,8 @@ public class Alert implements ToXContent { builder.field(LAST_EXECUTED_FIELD.getPreferredName(), lastExecuted); } builder.startObject(ACK_FIELD.getPreferredName()) - .field(STATE_FIELD.getPreferredName(), ack.state.name().toLowerCase(Locale.ROOT)) - .field(TIMESTAMP_FIELD.getPreferredName(), ack.timestamp) + .field(STATE_FIELD.getPreferredName(), ackStatus.state.name().toLowerCase(Locale.ROOT)) + .field(TIMESTAMP_FIELD.getPreferredName(), ackStatus.timestamp) .endObject(); if (lastThrottle != null) { builder.startObject(LAST_THROTTLE_FIELD.getPreferredName()) @@ -426,7 +434,7 @@ public class Alert implements ToXContent { DateTime lastTriggered = null; DateTime lastExecuted = null; Throttle lastThrottle = null; - Ack ack = null; + AckStatus ackStatus = null; String currentFieldName = null; XContentParser.Token token = null; @@ -474,7 +482,7 @@ public class Alert implements ToXContent { } } else if (ACK_FIELD.match(currentFieldName)) { if (token == XContentParser.Token.START_OBJECT) { - Ack.State state = null; + AckStatus.State state = null; DateTime timestamp = null; while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { if (token == XContentParser.Token.FIELD_NAME) { @@ -483,24 +491,24 @@ public class Alert implements ToXContent { if (TIMESTAMP_FIELD.match(currentFieldName)) { timestamp = parseDate(currentFieldName, token, parser); } else if (STATE_FIELD.match(currentFieldName)) { - state = Ack.State.valueOf(parser.text().toUpperCase(Locale.ROOT)); + state = AckStatus.State.valueOf(parser.text().toUpperCase(Locale.ROOT)); } else { throw new AlertsException("unknown filed [" + currentFieldName + "] in alert status throttle entry"); } } } - ack = new Ack(state, timestamp); + ackStatus = new AckStatus(state, timestamp); } else { throw new AlertsException("expecting field [" + currentFieldName + "] to be an object, found [" + token + "] instead"); } } } - return new Status(-1, lastRan, lastTriggered, lastExecuted, lastThrottle, ack); + return new Status(-1, lastRan, lastTriggered, lastExecuted, lastThrottle, ackStatus); } - public static class Ack { + public static class AckStatus { public static enum State { AWAITS_EXECUTION, @@ -511,11 +519,11 @@ public class Alert implements ToXContent { private final State state; private final DateTime timestamp; - public Ack() { + public AckStatus() { this(State.AWAITS_EXECUTION, new DateTime()); } - public Ack(State state, DateTime timestamp) { + public AckStatus(State state, DateTime timestamp) { this.state = state; this.timestamp = timestamp; } diff --git a/src/main/java/org/elasticsearch/alerts/AlertExecution.java b/src/main/java/org/elasticsearch/alerts/AlertExecution.java new file mode 100644 index 00000000000..b5a13885475 --- /dev/null +++ b/src/main/java/org/elasticsearch/alerts/AlertExecution.java @@ -0,0 +1,147 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.alerts; + +import org.elasticsearch.alerts.actions.Action; +import org.elasticsearch.alerts.actions.ActionRegistry; +import org.elasticsearch.alerts.throttle.Throttler; +import org.elasticsearch.alerts.trigger.Trigger; +import org.elasticsearch.alerts.trigger.TriggerRegistry; +import org.elasticsearch.common.ParseField; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentParser; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +/** +* +*/ +public class AlertExecution implements ToXContent { + + private final Trigger.Result triggerResult; + private final Throttler.Result throttleResult; + private final Map actionsResults; + private final Payload payload; + + public AlertExecution(ExecutionContext context) { + this(context.triggerResult(), context.throttleResult(), context.actionsResults(), context.payload()); + } + + AlertExecution(Trigger.Result triggerResult, Throttler.Result throttleResult, Map actionsResults, Payload payload) { + this.triggerResult = triggerResult; + this.throttleResult = throttleResult; + this.actionsResults = actionsResults; + this.payload = payload; + } + + public Trigger.Result triggerResult() { + return triggerResult; + } + + public Throttler.Result throttleResult() { + return throttleResult; + } + + public Map actionsResults() { + return actionsResults; + } + + public Payload payload() { + return payload; + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + if (triggerResult != null) { + builder.startObject(Parser.TRIGGER_RESULT.getPreferredName()).field(triggerResult.type(), triggerResult).endObject(); + } + if (throttleResult != null && throttleResult.throttle()) { + builder.field(Parser.THROTTLED.getPreferredName(), throttleResult.throttle()); + if (throttleResult.reason() != null) { + builder.field(Parser.THROTTLE_REASON.getPreferredName(), throttleResult.reason()); + } + } + builder.field(Parser.PAYLOAD.getPreferredName(), payload()); + builder.startArray(Parser.ACTIONS_RESULTS.getPreferredName()); + for (Map.Entry actionResult : actionsResults.entrySet()) { + builder.startObject(); + builder.field(actionResult.getKey(), actionResult.getValue()); + builder.endObject(); + } + builder.endArray(); + builder.endObject(); + return builder; + } + + public static class Parser { + + public static final ParseField TRIGGER_RESULT = new ParseField("trigger_result"); + public static final ParseField PAYLOAD = new ParseField("payload"); + public static final ParseField ACTIONS_RESULTS = new ParseField("actions_results"); + public static final ParseField THROTTLED = new ParseField("throttled"); + public static final ParseField THROTTLE_REASON = new ParseField("throttle_reason"); + + public static AlertExecution parse(XContentParser parser, TriggerRegistry triggerRegistry, ActionRegistry actionRegistry) throws IOException { + boolean throttled = false; + String throttleReason = null; + Map actionResults = new HashMap<>(); + Trigger.Result triggerResult = null; + Payload payload = null; + + String currentFieldName = null; + XContentParser.Token token; + while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { + if (token == XContentParser.Token.FIELD_NAME) { + currentFieldName = parser.currentName(); + } else if (token.isValue()) { + if (THROTTLE_REASON.match(currentFieldName)) { + throttleReason = parser.text(); + } else if (THROTTLED.match(currentFieldName)) { + throttled = parser.booleanValue(); + } else { + throw new AlertsException("unable to parse alert run. unexpected field [" + currentFieldName + "]"); + } + } else if (token == XContentParser.Token.START_OBJECT) { + if (TRIGGER_RESULT.match(currentFieldName)) { + triggerResult = triggerRegistry.parseResult(parser); + } else if (PAYLOAD.match(currentFieldName)) { + payload = new Payload.Simple(parser.map()); //TODO fixme + } else { + throw new AlertsException("unable to parse alert run. unexpected field [" + currentFieldName + "]"); + } + } else if (token == XContentParser.Token.START_ARRAY) { + if (ACTIONS_RESULTS.match(currentFieldName)) { + actionResults = parseActionResults(parser, actionRegistry); + } else { + throw new AlertsException("unable to parse alert run. unexpected field [" + currentFieldName + "]"); + } + } else { + throw new AlertsException("unable to parse alert run. unexpected token [" + token + "]"); + } + } + + Throttler.Result throttleResult = throttled ? Throttler.Result.throttle(throttleReason) : Throttler.Result.NO; + return new AlertExecution(triggerResult, throttleResult, actionResults, payload ); + + } + + private static Map parseActionResults(XContentParser parser, ActionRegistry actionRegistry) throws IOException { + Map actionResults = new HashMap<>(); + + String currentFieldName = null; + XContentParser.Token token; + while ((token = parser.nextToken()) != XContentParser.Token.END_ARRAY) { + Action.Result actionResult = actionRegistry.parseResult(parser); + actionResults.put(actionResult.type(), actionResult); + } + return actionResults; + } + } +} diff --git a/src/main/java/org/elasticsearch/alerts/AlertLockService.java b/src/main/java/org/elasticsearch/alerts/AlertLockService.java new file mode 100644 index 00000000000..74518980639 --- /dev/null +++ b/src/main/java/org/elasticsearch/alerts/AlertLockService.java @@ -0,0 +1,66 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.alerts; + +import org.elasticsearch.common.util.concurrent.KeyedLock; + +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * + */ +public class AlertLockService { + + private final KeyedLock alertLock = new KeyedLock<>(); + + private AtomicBoolean running = new AtomicBoolean(); + + public Lock acquire(String name) { + alertLock.acquire(name); + return new Lock(name, alertLock); + } + + public void start() { + if (running.compareAndSet(false, true)) { + // init + } + } + + public void stop() { + if (running.compareAndSet(true, false)) { + // It can happen we have still ongoing operations and we wait those operations to finish to avoid + // that AlertManager or any of its components end up in a illegal state after the state as been set to stopped. + // + // For example: An alert action entry may be added while we stopping alerting if we don't wait for + // ongoing operations to complete. Resulting in once the alert service starts again that more than + // expected alert action entries are processed. + // + // Note: new operations will fail now because the state has been set to: stopping + while (alertLock.hasLockedKeys()) { + try { + Thread.sleep(100); + } catch (InterruptedException ie) { + } + } + } + } + + public static class Lock { + + private final String name; + private final KeyedLock alertLock; + + private Lock(String name, KeyedLock alertLock) { + this.name = name; + this.alertLock = alertLock; + + } + + public void release() { + alertLock.release(name); + } + } +} diff --git a/src/main/java/org/elasticsearch/alerts/AlertsModule.java b/src/main/java/org/elasticsearch/alerts/AlertsModule.java index 9535e4f4cf5..d3fc0037d44 100644 --- a/src/main/java/org/elasticsearch/alerts/AlertsModule.java +++ b/src/main/java/org/elasticsearch/alerts/AlertsModule.java @@ -42,6 +42,7 @@ public class AlertsModule extends AbstractModule implements SpawnModules { protected void configure() { bind(Alert.Parser.class).asEagerSingleton(); + bind(AlertLockService.class).asEagerSingleton(); bind(AlertsService.class).asEagerSingleton(); bind(AlertsStore.class).asEagerSingleton(); bind(TemplateUtils.class).asEagerSingleton(); diff --git a/src/main/java/org/elasticsearch/alerts/AlertsService.java b/src/main/java/org/elasticsearch/alerts/AlertsService.java index f2285b5af29..a12c76757ed 100644 --- a/src/main/java/org/elasticsearch/alerts/AlertsService.java +++ b/src/main/java/org/elasticsearch/alerts/AlertsService.java @@ -6,40 +6,24 @@ package org.elasticsearch.alerts; -import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchIllegalStateException; import org.elasticsearch.action.index.IndexResponse; -import org.elasticsearch.alerts.actions.Action; -import org.elasticsearch.alerts.actions.ActionRegistry; -import org.elasticsearch.alerts.history.FiredAlert; import org.elasticsearch.alerts.history.HistoryService; import org.elasticsearch.alerts.scheduler.Scheduler; -import org.elasticsearch.alerts.throttle.Throttler; -import org.elasticsearch.alerts.transform.Transform; -import org.elasticsearch.alerts.trigger.Trigger; -import org.elasticsearch.alerts.trigger.TriggerRegistry; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateListener; -import org.elasticsearch.common.ParseField; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.component.LifecycleListener; import org.elasticsearch.common.inject.Inject; -import org.elasticsearch.common.joda.time.DateTime; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.util.concurrent.KeyedLock; -import org.elasticsearch.common.xcontent.ToXContent; -import org.elasticsearch.common.xcontent.XContentBuilder; -import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.gateway.GatewayService; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.threadpool.ThreadPool; import java.io.IOException; -import java.util.HashMap; -import java.util.Map; import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicReference; @@ -47,27 +31,25 @@ public class AlertsService extends AbstractComponent { private final Scheduler scheduler; private final AlertsStore alertsStore; - private final HistoryService historyService; private final ThreadPool threadPool; private final ClusterService clusterService; - private final KeyedLock alertLock = new KeyedLock<>(); + private final AlertLockService alertLockService; + private final HistoryService historyService; private final AtomicReference state = new AtomicReference<>(State.STOPPED); private volatile boolean manuallyStopped; @Inject public AlertsService(Settings settings, ClusterService clusterService, Scheduler scheduler, AlertsStore alertsStore, - IndicesService indicesService, HistoryService historyService, - ThreadPool threadPool) { + IndicesService indicesService, HistoryService historyService, ThreadPool threadPool, + AlertLockService alertLockService) { super(settings); this.scheduler = scheduler; this.threadPool = threadPool; this.alertsStore = alertsStore; - this.historyService = historyService; - this.historyService.setAlertsService(this); this.clusterService = clusterService; - - scheduler.addListener(new SchedulerListener()); + this.alertLockService = alertLockService; + this.historyService = historyService; clusterService.add(new AlertsClusterStateListener()); // Close if the indices service is being stopped, so we don't run into search failures (locally) that will @@ -83,7 +65,7 @@ public class AlertsService extends AbstractComponent { public AlertsStore.AlertDelete deleteAlert(String name) throws InterruptedException, ExecutionException { ensureStarted(); - alertLock.acquire(name); + AlertLockService.Lock lock = alertLockService.acquire(name); try { AlertsStore.AlertDelete delete = alertsStore.deleteAlert(name); if (delete.deleteResponse().isFound()) { @@ -91,137 +73,47 @@ public class AlertsService extends AbstractComponent { } return delete; } finally { - alertLock.release(name); + lock.release(); } } - public IndexResponse putAlert(String alertName, BytesReference alertSource) { + public IndexResponse putAlert(String name, BytesReference alertSource) { ensureStarted(); - alertLock.acquire(alertName); + AlertLockService.Lock lock = alertLockService.acquire(name); try { - AlertsStore.AlertPut result = alertsStore.putAlert(alertName, alertSource); + AlertsStore.AlertPut result = alertsStore.putAlert(name, alertSource); if (result.previous() == null || !result.previous().schedule().equals(result.current().schedule())) { scheduler.schedule(result.current()); } return result.indexResponse(); } finally { - alertLock.release(alertName); + lock.release(); } } /** * TODO: add version, fields, etc support that the core get api has as well. */ - public Alert getAlert(String alertName) { - return alertsStore.getAlert(alertName); + public Alert getAlert(String name) { + return alertsStore.getAlert(name); } - public State getState() { + public State state() { return state.get(); } - /* - The execution of an alert is split into operations, a schedule part which just makes sure that store the fact an alert - has fired and an execute part which actually executed the alert. - - The reason this is split into two operations is that we don't want to lose the fact an alert has fired. If we - would not split the execution of an alert and many alerts fire in a small window of time then it can happen that - thread pool that receives fired jobs from the quartz scheduler is going to reject jobs and then we would never - know about jobs that have fired. By splitting the execution of fired jobs into two operations we lower the chance - we lose fired jobs signficantly. - */ - - /** - * This does the necessary actions, so we don't lose the fact that an alert got execute from the {@link org.elasticsearch.alerts.scheduler.Scheduler} - * It writes the an entry in the alert history index with the proper status for this alert. - * - * The rest of the actions happen in {@link #runAlert(org.elasticsearch.alerts.history.FiredAlert)}. - * - * The reason the executing of the alert is split into two, is that we don't want to lose the fact that an alert has - * fired. If we were - */ - void triggerAlert(String alertName, DateTime scheduledFireTime, DateTime fireTime){ - ensureStarted(); - alertLock.acquire(alertName); - try { - Alert alert = alertsStore.getAlert(alertName); - if (alert == null) { - logger.warn("unable to find [{}] in the alert store, perhaps it has been deleted", alertName); - return; - } - - try { - historyService.alertFired(alert, scheduledFireTime, fireTime); - } catch (Exception e) { - logger.error("failed to schedule alert action for [{}]", e, alert); - } - } finally { - alertLock.release(alertName); - } - } - - /** - * This actually runs the alert: - * 1) Runs the configured search request - * 2) Checks if the search request triggered (matches with the defined conditions) - * 3) If the alert has been triggered, checks if the alert should be throttled - * 4) If the alert hasn't been throttled runs the configured actions - */ - public AlertRun runAlert(FiredAlert entry) throws IOException { - ensureStarted(); - alertLock.acquire(entry.name()); - try { - Alert alert = alertsStore.getAlert(entry.name()); - if (alert == null) { - throw new ElasticsearchException("Alert is not available"); - } - - AlertContext ctx = new AlertContext(alert, entry.fireTime(), entry.scheduledTime()); - - Trigger.Result triggerResult = alert.trigger().execute(ctx); - ctx.triggerResult(triggerResult); - - if (triggerResult.triggered()) { - alert.status().onTrigger(true, entry.fireTime()); - - Throttler.Result throttleResult = alert.throttler().throttle(ctx, triggerResult); - ctx.throttleResult(throttleResult); - - if (!throttleResult.throttle()) { - Transform.Result result = alert.transform().apply(ctx, triggerResult.payload()); - ctx.transformResult(result); - - for (Action action : alert.actions()){ - Action.Result actionResult = action.execute(ctx, result.payload()); - ctx.addActionResult(actionResult); - } - alert.status().onExecution(entry.scheduledTime()); - } else { - alert.status().onThrottle(entry.fireTime(), throttleResult.reason()); - } - } else { - alert.status().onTrigger(false, entry.fireTime()); - } - alert.status().onRun(entry.fireTime()); - alertsStore.updateAlert(alert); - return new AlertRun(ctx); - } finally { - alertLock.release(entry.name()); - } - } - /** * Acks the alert if needed */ - public Alert.Status ackAlert(String alertName) { + public Alert.Status ackAlert(String name) { ensureStarted(); - alertLock.acquire(alertName); + AlertLockService.Lock lock = alertLockService.acquire(name); try { - Alert alert = alertsStore.getAlert(alertName); + Alert alert = alertsStore.getAlert(name); if (alert == null) { - throw new AlertsException("alert [" + alertName + "] does not exist"); + throw new AlertsException("alert [" + name + "] does not exist"); } - if (alert.status().onAck(new DateTime())) { + if (alert.ack()) { try { alertsStore.updateAlertStatus(alert); } catch (IOException ioe) { @@ -231,7 +123,7 @@ public class AlertsService extends AbstractComponent { // we need to create a safe copy of the status return new Alert.Status(alert.status()); } finally { - alertLock.release(alertName); + lock.release(); } } @@ -240,7 +132,6 @@ public class AlertsService extends AbstractComponent { */ public void start() { manuallyStopped = false; - logger.info("starting alert service..."); ClusterState state = clusterService.state(); internalStart(state); } @@ -256,20 +147,7 @@ public class AlertsService extends AbstractComponent { private void internalStop() { if (state.compareAndSet(State.STARTED, State.STOPPING)) { logger.info("stopping alert service..."); - while (true) { - // It can happen we have still ongoing operations and we wait those operations to finish to avoid - // that AlertManager or any of its components end up in a illegal state after the state as been set to stopped. - // - // For example: An alert action entry may be added while we stopping alerting if we don't wait for - // ongoing operations to complete. Resulting in once the alert service starts again that more than - // expected alert action entries are processed. - // - // Note: new operations will fail now because the state has been set to: stopping - if (!alertLock.hasLockedKeys()) { - break; - } - } - + alertLockService.stop(); historyService.stop(); scheduler.stop(); alertsStore.stop(); @@ -280,25 +158,19 @@ public class AlertsService extends AbstractComponent { private void internalStart(ClusterState initialState) { if (state.compareAndSet(State.STOPPED, State.STARTING)) { + logger.info("starting alert service..."); + alertLockService.start(); ClusterState clusterState = initialState; // Try to load alert store before the action service, b/c action depends on alert store - while (true) { - if (alertsStore.start(clusterState)) { - break; - } + while (!alertsStore.start(clusterState)) { clusterState = newClusterState(clusterState); } - while (true) { - if (historyService.start(clusterState)) { - break; - } + while (!historyService.start(clusterState)) { clusterState = newClusterState(clusterState); } - scheduler.start(alertsStore.getAlerts().values()); state.set(State.STARTED); - historyService.executePreviouslyFiredAlerts(); logger.info("alert service has started"); } } @@ -326,13 +198,6 @@ public class AlertsService extends AbstractComponent { } } - private class SchedulerListener implements Scheduler.Listener { - @Override - public void fire(String alertName, DateTime scheduledFireTime, DateTime fireTime) { - triggerAlert(alertName, scheduledFireTime, fireTime); - } - } - private final class AlertsClusterStateListener implements ClusterStateListener { @Override @@ -367,130 +232,6 @@ public class AlertsService extends AbstractComponent { } - public static class AlertRun implements ToXContent { - - private final Trigger.Result triggerResult; - private final Throttler.Result throttleResult; - private final Map actionsResults; - private final Payload payload; - - public AlertRun(AlertContext context) { - this(context.triggerResult(), context.throttleResult(), context.actionsResults(), context.payload()); - } - - private AlertRun(Trigger.Result triggerResult, Throttler.Result throttleResult, Map actionsResults, Payload payload) { - this.triggerResult = triggerResult; - this.throttleResult = throttleResult; - this.actionsResults = actionsResults; - this.payload = payload; - } - - - public Trigger.Result triggerResult() { - return triggerResult; - } - - public Throttler.Result throttleResult() { - return throttleResult; - } - - public Map actionsResults() { - return actionsResults; - } - - public Payload payload() { - return payload; - } - - @Override - public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { - builder.startObject(); - if (triggerResult != null) { - builder.startObject(Parser.TRIGGER_RESULT.getPreferredName()).field(triggerResult.type(), triggerResult).endObject(); - } - if (throttleResult != null) { - builder.field(Parser.THROTTLED.getPreferredName(), throttleResult.throttle()); - if (throttleResult.reason() != null) { - builder.field(Parser.THROTTLE_REASON.getPreferredName(), throttleResult.reason()); - } - } - builder.field(Parser.PAYLOAD.getPreferredName(), payload()); - builder.startArray(Parser.ACTIONS_RESULTS.getPreferredName()); - for (Map.Entry actionResult : actionsResults.entrySet()) { - builder.startObject(); - builder.field(actionResult.getKey(), actionResult.getValue()); - builder.endObject(); - } - builder.endArray(); - builder.endObject(); - return builder; - } - - public static class Parser { - public static final ParseField TRIGGER_RESULT = new ParseField("trigger_result"); - public static final ParseField PAYLOAD = new ParseField("payload"); - public static final ParseField ACTIONS_RESULTS = new ParseField("actions_results"); - public static final ParseField THROTTLED = new ParseField("throttled"); - public static final ParseField THROTTLE_REASON = new ParseField("throttle_reason"); - - public static AlertRun parse(XContentParser parser, TriggerRegistry triggerRegistry, ActionRegistry actionRegistry) throws IOException { - boolean throttled = false; - String throttleReason = null; - Map actionResults = new HashMap<>(); - Trigger.Result triggerResult = null; - Payload payload = null; - - String currentFieldName = null; - XContentParser.Token token; - while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { - if (token == XContentParser.Token.FIELD_NAME) { - currentFieldName = parser.currentName(); - } else if (token.isValue()) { - if (THROTTLE_REASON.match(currentFieldName)) { - throttleReason = parser.text(); - } else if (THROTTLED.match(currentFieldName)) { - throttled = parser.booleanValue(); - } else { - throw new AlertsException("unable to parse alert run. unexpected field [" + currentFieldName + "]"); - } - } else if (token == XContentParser.Token.START_OBJECT) { - if (TRIGGER_RESULT.match(currentFieldName)) { - triggerResult = triggerRegistry.parseResult(parser); - } else if (PAYLOAD.match(currentFieldName)) { - payload = new Payload.Simple(parser.map()); //TODO fixme - } else { - throw new AlertsException("unable to parse alert run. unexpected field [" + currentFieldName + "]"); - } - } else if (token == XContentParser.Token.START_ARRAY) { - if (ACTIONS_RESULTS.match(currentFieldName)) { - actionResults = parseActionResults(parser, actionRegistry); - } else { - throw new AlertsException("unable to parse alert run. unexpected field [" + currentFieldName + "]"); - } - } else { - throw new AlertsException("unable to parse alert run. unexpected token [" + token + "]"); - } - } - - Throttler.Result throttleResult = throttled ? Throttler.Result.throttle(throttleReason) : Throttler.NO_THROTTLE.throttle(null,null); - return new AlertRun(triggerResult, throttleResult, actionResults, payload ); - - } - - private static Map parseActionResults(XContentParser parser, ActionRegistry actionRegistry) throws IOException { - Map actionResults = new HashMap<>(); - - String currentFieldName = null; - XContentParser.Token token; - while ((token = parser.nextToken()) != XContentParser.Token.END_ARRAY) { - Action.Result actionResult = actionRegistry.parseResult(parser); - actionResults.put(actionResult.type(), actionResult); - } - return actionResults; - } - } - } - /** * Encapsulates the state of the alerts plugin. */ diff --git a/src/main/java/org/elasticsearch/alerts/AlertContext.java b/src/main/java/org/elasticsearch/alerts/ExecutionContext.java similarity index 69% rename from src/main/java/org/elasticsearch/alerts/AlertContext.java rename to src/main/java/org/elasticsearch/alerts/ExecutionContext.java index 4d2b1f22db2..99f836d415c 100644 --- a/src/main/java/org/elasticsearch/alerts/AlertContext.java +++ b/src/main/java/org/elasticsearch/alerts/ExecutionContext.java @@ -6,7 +6,6 @@ package org.elasticsearch.alerts; import org.elasticsearch.alerts.actions.Action; -import org.elasticsearch.alerts.history.FiredAlert; import org.elasticsearch.alerts.throttle.Throttler; import org.elasticsearch.alerts.transform.Transform; import org.elasticsearch.alerts.trigger.Trigger; @@ -18,28 +17,29 @@ import java.util.Map; /** * */ -public class AlertContext { +public class ExecutionContext { - private final String runId; + private final String id; private final Alert alert; private final DateTime fireTime; private final DateTime scheduledTime; - private Payload payload; private Trigger.Result triggerResult; private Throttler.Result throttleResult; private Transform.Result transformResult; private Map actionsResults = new HashMap<>(); - public AlertContext(Alert alert, DateTime fireTime, DateTime scheduledTime) { - this.runId = FiredAlert.firedAlertId(alert, scheduledTime); + private Payload payload; + + public ExecutionContext(String id, Alert alert, DateTime fireTime, DateTime scheduledTime) { + this.id = id; this.alert = alert; this.fireTime = fireTime; this.scheduledTime = scheduledTime; } - public String runId() { - return runId; + public String id() { + return id; } public Alert alert() { @@ -58,24 +58,30 @@ public class AlertContext { return payload; } - public void triggerResult(Trigger.Result triggerResult) { + public void onTriggerResult(Trigger.Result triggerResult) { this.triggerResult = triggerResult; this.payload = triggerResult.payload(); + alert.status().onTrigger(triggerResult.triggered(), fireTime); } public Trigger.Result triggerResult() { return triggerResult; } - public void throttleResult(Throttler.Result throttleResult) { + public void onThrottleResult(Throttler.Result throttleResult) { this.throttleResult = throttleResult; + if (throttleResult.throttle()) { + alert.status().onThrottle(fireTime, throttleResult.reason()); + } else { + alert.status().onExecution(fireTime); + } } public Throttler.Result throttleResult() { return throttleResult; } - public void transformResult(Transform.Result transformResult) { + public void onTransformResult(Transform.Result transformResult) { this.transformResult = transformResult; this.payload = transformResult.payload(); } @@ -84,7 +90,7 @@ public class AlertContext { return transformResult; } - public void addActionResult(Action.Result result) { + public void onActionResult(Action.Result result) { actionsResults.put(result.type(), result); } @@ -92,4 +98,9 @@ public class AlertContext { return actionsResults; } + public AlertExecution finish() { + alert.status().onExecute(fireTime); + return new AlertExecution(this); + } + } diff --git a/src/main/java/org/elasticsearch/alerts/actions/Action.java b/src/main/java/org/elasticsearch/alerts/actions/Action.java index 901cc614bcf..0ab49b673ab 100644 --- a/src/main/java/org/elasticsearch/alerts/actions/Action.java +++ b/src/main/java/org/elasticsearch/alerts/actions/Action.java @@ -5,7 +5,7 @@ */ package org.elasticsearch.alerts.actions; -import org.elasticsearch.alerts.AlertContext; +import org.elasticsearch.alerts.ExecutionContext; import org.elasticsearch.alerts.Payload; import org.elasticsearch.common.ParseField; import org.elasticsearch.common.logging.ESLogger; @@ -36,7 +36,7 @@ public abstract class Action implements ToXContent { /** * Executes this action */ - public abstract R execute(AlertContext context, Payload payload) throws IOException; + public abstract R execute(ExecutionContext context, Payload payload) throws IOException; /** @@ -60,7 +60,9 @@ public abstract class Action implements ToXContent { public static abstract class Result implements ToXContent { + public static final ParseField SUCCESS_FIELD = new ParseField("success"); + protected final String type; protected final boolean success; @@ -80,7 +82,7 @@ public abstract class Action implements ToXContent { @Override public final XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); - builder.field("success", success); + builder.field(SUCCESS_FIELD.getPreferredName(), success); xContentBody(builder, params); return builder.endObject(); } diff --git a/src/main/java/org/elasticsearch/alerts/actions/email/EmailAction.java b/src/main/java/org/elasticsearch/alerts/actions/email/EmailAction.java index 442bba803b1..029e6496437 100644 --- a/src/main/java/org/elasticsearch/alerts/actions/email/EmailAction.java +++ b/src/main/java/org/elasticsearch/alerts/actions/email/EmailAction.java @@ -5,7 +5,7 @@ */ package org.elasticsearch.alerts.actions.email; -import org.elasticsearch.alerts.AlertContext; +import org.elasticsearch.alerts.ExecutionContext; import org.elasticsearch.alerts.Payload; import org.elasticsearch.alerts.actions.Action; import org.elasticsearch.alerts.actions.email.service.*; @@ -64,8 +64,8 @@ public class EmailAction extends Action { } @Override - public Result execute(AlertContext ctx, Payload payload) throws IOException { - email.id(ctx.runId()); + public Result execute(ExecutionContext ctx, Payload payload) throws IOException { + email.id(ctx.id()); Map alertParams = new HashMap<>(); alertParams.put(Action.ALERT_NAME_VARIABLE_NAME, ctx.alert().name()); @@ -213,7 +213,7 @@ public class EmailAction extends Action { String currentFieldName = null; XContentParser.Token token; - boolean success = false; + Boolean success = null; Email email = null; String account = null; String reason = null; @@ -247,11 +247,11 @@ public class EmailAction extends Action { } } - if (success) { - return new Result.Success(new EmailService.EmailSent(account, email)); - } else { - return new Result.Failure(reason); + if (success == null) { + throw new EmailException("could not parse email result. expected field [success]"); } + + return success ? new Result.Success(new EmailService.EmailSent(account, email)) : new Result.Failure(reason); } } diff --git a/src/main/java/org/elasticsearch/alerts/actions/index/IndexAction.java b/src/main/java/org/elasticsearch/alerts/actions/index/IndexAction.java index 345669fc695..bc758b72585 100644 --- a/src/main/java/org/elasticsearch/alerts/actions/index/IndexAction.java +++ b/src/main/java/org/elasticsearch/alerts/actions/index/IndexAction.java @@ -8,7 +8,7 @@ package org.elasticsearch.alerts.actions.index; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexResponse; -import org.elasticsearch.alerts.AlertContext; +import org.elasticsearch.alerts.ExecutionContext; import org.elasticsearch.alerts.Payload; import org.elasticsearch.alerts.actions.Action; import org.elasticsearch.alerts.actions.ActionException; @@ -50,7 +50,7 @@ public class IndexAction extends Action { } @Override - public Result execute(AlertContext ctx, Payload payload) throws IOException { + public Result execute(ExecutionContext ctx, Payload payload) throws IOException { IndexRequest indexRequest = new IndexRequest(); indexRequest.index(index); indexRequest.type(type); @@ -58,7 +58,7 @@ public class IndexAction extends Action { XContentBuilder resultBuilder = XContentFactory.jsonBuilder().prettyPrint(); resultBuilder.startObject(); resultBuilder.field("data", payload.data()); - resultBuilder.field("timestamp", ctx.alert().status().lastExecuted()); + resultBuilder.field("timestamp", ctx.fireTime()); resultBuilder.endObject(); indexRequest.source(resultBuilder); } catch (IOException ioe) { @@ -149,7 +149,7 @@ public class IndexAction extends Action { public Action.Result parseResult(XContentParser parser) throws IOException { String currentFieldName = null; XContentParser.Token token; - boolean success = false; + Boolean success = null; Payload payload = null; String reason = null; @@ -178,6 +178,11 @@ public class IndexAction extends Action { throw new ActionException("could not parse index result. unexpected token [" + token + "]"); } } + + if (success == null) { + throw new ActionException("could not parse index result. expected boolean field [success]"); + } + return new Result(payload, reason, success); } } diff --git a/src/main/java/org/elasticsearch/alerts/actions/webhook/WebhookAction.java b/src/main/java/org/elasticsearch/alerts/actions/webhook/WebhookAction.java index bacdd1cd0d4..f37ab43246c 100644 --- a/src/main/java/org/elasticsearch/alerts/actions/webhook/WebhookAction.java +++ b/src/main/java/org/elasticsearch/alerts/actions/webhook/WebhookAction.java @@ -5,7 +5,7 @@ */ package org.elasticsearch.alerts.actions.webhook; -import org.elasticsearch.alerts.AlertContext; +import org.elasticsearch.alerts.ExecutionContext; import org.elasticsearch.alerts.Payload; import org.elasticsearch.alerts.actions.Action; import org.elasticsearch.alerts.actions.ActionException; @@ -59,7 +59,7 @@ public class WebhookAction extends Action { } @Override - public Result execute(AlertContext ctx, Payload payload) throws IOException { + public Result execute(ExecutionContext ctx, Payload payload) throws IOException { Map data = payload.data(); String renderedUrl = applyTemplate(templateUtils, urlTemplate, ctx.alert().name(), data); String body = applyTemplate(templateUtils, bodyTemplate != null ? bodyTemplate : DEFAULT_BODY_TEMPLATE, ctx.alert().name(), data); @@ -225,7 +225,7 @@ public class WebhookAction extends Action { public Action.Result parseResult(XContentParser parser) throws IOException { String currentFieldName = null; XContentParser.Token token; - boolean success = false; + Boolean success = null; String url = null; String body = null; String reason = null; @@ -246,7 +246,7 @@ public class WebhookAction extends Action { if (Action.Result.SUCCESS_FIELD.match(currentFieldName)) { success = parser.booleanValue(); } else { - throw new ActionException("could not parse index result. unexpected boolean field [" + currentFieldName + "]"); + throw new ActionException("could not parse webhook result. unexpected boolean field [" + currentFieldName + "]"); } } else { throw new ActionException("unable to parse webhook action result. unexpected field [" + currentFieldName + "]" ); @@ -256,12 +256,11 @@ public class WebhookAction extends Action { } } - if (success) { - return new Result.Executed(httpStatus, url, body); - } else { - return new Result.Failure(reason); + if (success == null) { + throw new ActionException("could not parse webhook result. expected boolean field [success]"); } + return success ? new Result.Executed(httpStatus, url, body) : new Result.Failure(reason); } } diff --git a/src/main/java/org/elasticsearch/alerts/history/FiredAlert.java b/src/main/java/org/elasticsearch/alerts/history/FiredAlert.java index 514733bdeac..0731e414ed6 100644 --- a/src/main/java/org/elasticsearch/alerts/history/FiredAlert.java +++ b/src/main/java/org/elasticsearch/alerts/history/FiredAlert.java @@ -8,14 +8,12 @@ package org.elasticsearch.alerts.history; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchIllegalArgumentException; import org.elasticsearch.alerts.Alert; +import org.elasticsearch.alerts.AlertExecution; import org.elasticsearch.alerts.AlertsException; -import org.elasticsearch.alerts.AlertsService; import org.elasticsearch.alerts.actions.ActionRegistry; -import org.elasticsearch.alerts.actions.Actions; -import org.elasticsearch.alerts.transform.Transform; -import org.elasticsearch.alerts.transform.TransformRegistry; import org.elasticsearch.alerts.trigger.Trigger; import org.elasticsearch.alerts.trigger.TriggerRegistry; +import org.elasticsearch.common.Nullable; import org.elasticsearch.common.ParseField; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.component.AbstractComponent; @@ -38,33 +36,27 @@ public class FiredAlert implements ToXContent { private DateTime fireTime; private DateTime scheduledTime; private Trigger trigger; - private Actions actions; private State state; + private AlertExecution execution; - /*Optional*/ - private Transform transform; - private String errorMessage; - private Map metadata; + private @Nullable String message; + private @Nullable Map metadata; - // During an fired alert execution we use this and then we store it with the history, after that we don't use it. - // We store it because it may end up being useful for debug / history purposes - private transient AlertsService.AlertRun alertRun; // Used for assertion purposes, so we can ensure/test what we have loaded in memory is the same as what is persisted. private transient long version; - private final AtomicBoolean finalized = new AtomicBoolean(false); + private final AtomicBoolean sealed = new AtomicBoolean(false); FiredAlert() { } - public FiredAlert(Alert alert, DateTime scheduledTime, DateTime fireTime, State state) { - this.id = firedAlertId(alert, scheduledTime); + public FiredAlert(Alert alert, DateTime scheduledTime, DateTime fireTime) { + this.id = alert.name() + "#" + scheduledTime.toDateTimeISO(); this.name = alert.name(); this.fireTime = fireTime; this.scheduledTime = scheduledTime; this.trigger = alert.trigger(); - this.actions = alert.actions(); - this.state = state; + this.state = State.AWAITS_EXECUTION; this.metadata = alert.metadata(); this.version = 1; } @@ -73,107 +65,59 @@ public class FiredAlert implements ToXContent { return id; } - public void id(String id) { - this.id = id; - } - - public void finalize(Alert alert, AlertsService.AlertRun alertRun) { - assert finalized.compareAndSet(false, true) : "finalizing an fired alert should only be done once"; - this.alertRun = alertRun; - if (alertRun.triggerResult().triggered()) { - if (alertRun.throttleResult().throttle()) { - state = State.THROTTLED; - } else { - state = State.ACTION_PERFORMED; - } - transform = alert.transform(); - } else { - state = State.NO_ACTION_NEEDED; - } - } - - public static String firedAlertId(Alert alert, DateTime dateTime) { - return alert.name() + "#" + dateTime.toDateTimeISO(); - } - public DateTime scheduledTime() { return scheduledTime; } - public void scheduledTime(DateTime scheduledTime) { - this.scheduledTime = scheduledTime; - } - public String name() { return name; } - public void name(String name) { - this.name = name; - } - public DateTime fireTime() { return fireTime; } - public void fireTime(DateTime fireTime) { - this.fireTime = fireTime; - } - public Trigger trigger() { return trigger; } - public void trigger(Trigger trigger) { - this.trigger = trigger; - } - - public Actions actions() { - return actions; - } - - public void actions(Actions actions) { - this.actions = actions; - } - public State state() { return state; } - public void state(State state) { - this.state = state; - } - - public long version() { - return version; - } - - public void version(long version) { - this.version = version; - } - - public String errorMessage(){ - return this.errorMessage; - } - - public void errorMessage(String errorMessage) { - this.errorMessage = errorMessage; + public String message(){ + return this.message; } public Map metadata() { return metadata; } - public void metadata(Map metadata) { - this.metadata = metadata; + public long version() { + return version; } - public Transform transform() { - return transform; + void version(long version) { + this.version = version; } - public void transform(Transform transform) { - this.transform = transform; + public void update(State state, @Nullable String message) { + this.state = state; + this.message = message; + } + + public void update(AlertExecution execution) { + assert sealed.compareAndSet(false, true) : "sealing an fired alert should only be done once"; + this.execution = execution; + if (execution.triggerResult().triggered()) { + if (execution.throttleResult().throttle()) { + state = State.THROTTLED; + } else { + state = State.ACTION_PERFORMED; + } + } else { + state = State.NO_ACTION_NEEDED; + } } @Override @@ -182,22 +126,18 @@ public class FiredAlert implements ToXContent { historyEntry.field(Parser.ALERT_NAME_FIELD.getPreferredName(), name); historyEntry.field(Parser.FIRE_TIME_FIELD.getPreferredName(), fireTime.toDateTimeISO()); historyEntry.field(Parser.SCHEDULED_FIRE_TIME_FIELD.getPreferredName(), scheduledTime.toDateTimeISO()); - historyEntry.startObject(Parser.TRIGGER_FIELD.getPreferredName()).field(trigger.type(), trigger, params).endObject(); - historyEntry.field(Parser.ACTIONS_FIELD.getPreferredName(), actions, params); + historyEntry.startObject(Alert.Parser.TRIGGER_FIELD.getPreferredName()).field(trigger.type(), trigger, params).endObject(); historyEntry.field(Parser.STATE_FIELD.getPreferredName(), state.toString()); - if (transform != null) { - historyEntry.startObject(Parser.TRANSFORM_FIELD.getPreferredName()).field(transform.type(), transform, params).endObject(); - } - if (errorMessage != null) { - historyEntry.field(Parser.ERROR_MESSAGE_FIELD.getPreferredName(), errorMessage); + if (message != null) { + historyEntry.field(Parser.MESSAGE_FIELD.getPreferredName(), message); } if (metadata != null) { historyEntry.field(Parser.METADATA_FIELD.getPreferredName(), metadata); } - if (alertRun != null) { - historyEntry.field(Parser.ALERT_RUN_FIELD.getPreferredName(), alertRun); + if (execution != null) { + historyEntry.field(Parser.ALERT_EXECUTION_FIELD.getPreferredName(), execution); } historyEntry.endObject(); @@ -227,7 +167,7 @@ public class FiredAlert implements ToXContent { public enum State { - AWAITS_RUN, + AWAITS_EXECUTION, RUNNING, NO_ACTION_NEEDED, ACTION_PERFORMED, @@ -235,10 +175,10 @@ public class FiredAlert implements ToXContent { THROTTLED; @Override - public String toString(){ + public String toString() { switch (this) { - case AWAITS_RUN: - return "AWAITS_RUN"; + case AWAITS_EXECUTION: + return "AWAITS_EXECUTION"; case RUNNING: return "RUNNING"; case NO_ACTION_NEEDED: @@ -254,10 +194,10 @@ public class FiredAlert implements ToXContent { } } - public static State fromString(String s) { - switch(s.toUpperCase()) { - case "AWAITS_RUN": - return AWAITS_RUN; + public static State fromString(String value) { + switch(value.toUpperCase()) { + case "AWAITS_EXECUTION": + return AWAITS_EXECUTION; case "RUNNING": return RUNNING; case "NO_ACTION_NEEDED": @@ -269,7 +209,7 @@ public class FiredAlert implements ToXContent { case "THROTTLED": return THROTTLED; default: - throw new ElasticsearchIllegalArgumentException("Unknown value [" + s + "] for AlertHistoryState" ); + throw new ElasticsearchIllegalArgumentException("unknown fired alert state [" + value + "]"); } } @@ -280,23 +220,18 @@ public class FiredAlert implements ToXContent { public static final ParseField ALERT_NAME_FIELD = new ParseField("alert_name"); public static final ParseField FIRE_TIME_FIELD = new ParseField("fire_time"); public static final ParseField SCHEDULED_FIRE_TIME_FIELD = new ParseField("scheduled_fire_time"); - public static final ParseField ERROR_MESSAGE_FIELD = new ParseField("error_msg"); - public static final ParseField TRIGGER_FIELD = new ParseField("trigger"); - public static final ParseField TRANSFORM_FIELD = new ParseField("transform"); - public static final ParseField ACTIONS_FIELD = new ParseField("actions"); + public static final ParseField MESSAGE_FIELD = new ParseField("message"); public static final ParseField STATE_FIELD = new ParseField("state"); public static final ParseField METADATA_FIELD = new ParseField("meta"); - public static final ParseField ALERT_RUN_FIELD = new ParseField("alert_run"); + public static final ParseField ALERT_EXECUTION_FIELD = new ParseField("alert_execution"); private final TriggerRegistry triggerRegistry; - private final TransformRegistry transformRegistry; private final ActionRegistry actionRegistry; @Inject - public Parser(Settings settings, TriggerRegistry triggerRegistry, TransformRegistry transformRegistry, ActionRegistry actionRegistry) { + public Parser(Settings settings, TriggerRegistry triggerRegistry, ActionRegistry actionRegistry) { super(settings); this.triggerRegistry = triggerRegistry; - this.transformRegistry = transformRegistry; this.actionRegistry = actionRegistry; } @@ -309,44 +244,37 @@ public class FiredAlert implements ToXContent { } public FiredAlert parse(XContentParser parser, String id, long version) throws IOException { - FiredAlert entry = new FiredAlert(); - entry.id(id); - entry.version(version); + FiredAlert alert = new FiredAlert(); + alert.id = id; + alert.version = version; + String currentFieldName = null; XContentParser.Token token = parser.nextToken(); assert token == XContentParser.Token.START_OBJECT; while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { if (token == XContentParser.Token.FIELD_NAME) { currentFieldName = parser.currentName(); - } else if (token == XContentParser.Token.START_ARRAY) { - if (ACTIONS_FIELD.match(currentFieldName)) { - entry.actions(actionRegistry.parseActions(parser)); - } else { - throw new AlertsException("unable to parse fired alert. unexpected field [" + currentFieldName + "]"); - } } else if (token == XContentParser.Token.START_OBJECT) { - if (TRIGGER_FIELD.match(currentFieldName)) { - entry.trigger(triggerRegistry.parse(parser)); - } else if (TRANSFORM_FIELD.match(currentFieldName)) { - entry.transform(transformRegistry.parse(parser)); + if (Alert.Parser.TRIGGER_FIELD.match(currentFieldName)) { + alert.trigger = triggerRegistry.parse(parser); } else if (METADATA_FIELD.match(currentFieldName)) { - entry.metadata(parser.map()); - } else if (ALERT_RUN_FIELD.match(currentFieldName)) { - entry.alertRun = AlertsService.AlertRun.Parser.parse(parser, triggerRegistry, actionRegistry); + alert.metadata = parser.map(); + } else if (ALERT_EXECUTION_FIELD.match(currentFieldName)) { + alert.execution = AlertExecution.Parser.parse(parser, triggerRegistry, actionRegistry); } else { throw new AlertsException("unable to parse fired alert. unexpected field [" + currentFieldName + "]"); } } else if (token.isValue()) { if (ALERT_NAME_FIELD.match(currentFieldName)) { - entry.name(parser.text()); + alert.name = parser.text(); } else if (FIRE_TIME_FIELD.match(currentFieldName)) { - entry.fireTime(DateTime.parse(parser.text())); + alert.fireTime = DateTime.parse(parser.text()); } else if (SCHEDULED_FIRE_TIME_FIELD.match(currentFieldName)) { - entry.scheduledTime(DateTime.parse(parser.text())); - } else if (ERROR_MESSAGE_FIELD.match(currentFieldName)) { - entry.errorMessage(parser.textOrNull()); + alert.scheduledTime = DateTime.parse(parser.text()); + } else if (MESSAGE_FIELD.match(currentFieldName)) { + alert.message = parser.textOrNull(); } else if (STATE_FIELD.match(currentFieldName)) { - entry.state(State.fromString(parser.text())); + alert.state = State.fromString(parser.text()); } else { throw new AlertsException("unable to parse fired alert. unexpected field [" + currentFieldName + "]"); } @@ -355,7 +283,7 @@ public class FiredAlert implements ToXContent { } } - return entry; + return alert; } } } diff --git a/src/main/java/org/elasticsearch/alerts/history/HistoryService.java b/src/main/java/org/elasticsearch/alerts/history/HistoryService.java index 7c73d3fc163..5ca272b505a 100644 --- a/src/main/java/org/elasticsearch/alerts/history/HistoryService.java +++ b/src/main/java/org/elasticsearch/alerts/history/HistoryService.java @@ -7,6 +7,11 @@ package org.elasticsearch.alerts.history; import org.elasticsearch.ElasticsearchIllegalStateException; import org.elasticsearch.alerts.*; +import org.elasticsearch.alerts.actions.Action; +import org.elasticsearch.alerts.scheduler.Scheduler; +import org.elasticsearch.alerts.throttle.Throttler; +import org.elasticsearch.alerts.transform.Transform; +import org.elasticsearch.alerts.trigger.Trigger; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.common.collect.ImmutableList; import org.elasticsearch.common.component.AbstractComponent; @@ -18,6 +23,7 @@ import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor; import org.elasticsearch.threadpool.ThreadPool; +import java.io.IOException; import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; @@ -28,22 +34,22 @@ public class HistoryService extends AbstractComponent { private final HistoryStore historyStore; private final ThreadPool threadPool; private final AlertsStore alertsStore; + private final AlertLockService alertLockService; + private final AtomicBoolean started = new AtomicBoolean(false); - private AlertsService alertsService; // Holds fired alerts that were fired before on a different elected master node, but never had the chance to run. - private volatile ImmutableList previousFiredAlerts; + private volatile ImmutableList previousFiredAlerts = ImmutableList.of(); @Inject - public HistoryService(Settings settings, HistoryStore historyStore, ThreadPool threadPool, AlertsStore alertsStore) { + public HistoryService(Settings settings, HistoryStore historyStore, ThreadPool threadPool, + AlertsStore alertsStore, AlertLockService alertLockService, Scheduler scheduler) { super(settings); this.historyStore = historyStore; this.threadPool = threadPool; this.alertsStore = alertsStore; - } - - public void setAlertsService(AlertsService alertsService){ - this.alertsService = alertsService; + this.alertLockService = alertLockService; + scheduler.addListener(new SchedulerListener()); } public boolean start(ClusterState state) { @@ -52,41 +58,41 @@ public class HistoryService extends AbstractComponent { } assert alertsThreadPool().getQueue().isEmpty() : "queue should be empty, but contains " + alertsThreadPool().getQueue().size() + " elements."; - HistoryStore.LoadResult loadResult = historyStore.loadFiredAlerts(state); - if (loadResult.succeeded()) { - if (!loadResult.notRanFiredAlerts().isEmpty()) { - this.previousFiredAlerts = ImmutableList.copyOf(loadResult.notRanFiredAlerts()); - logger.debug("loaded [{}] actions from the alert history index into actions queue", previousFiredAlerts.size()); - } - logger.debug("starting history service"); - if (started.compareAndSet(false, true)) { - if (alertsThreadPool().isShutdown()) { - logger.info("Restarting thread pool that had been shutdown"); - // this update threadpool settings work around is for restarting the alerts thread pool, - // that creates a new alerts thread pool and cleans up the existing one that has previously been shutdown. - int availableProcessors = EsExecutors.boundedNumberOfProcessors(settings); - /*** - *TODO Horrible horrible hack to make sure that settings are always different from the previous settings - * - * THIS NEEDS TO CHANGE ASAP - */ - int queueSize = alertsThreadPool().getQueue().remainingCapacity(); - if (queueSize % 2 == 0){ - queueSize = queueSize + 1; - } else { - queueSize = queueSize - 1; - } - //TODO END HORRIBLE HACK - - threadPool.updateSettings(AlertsPlugin.alertThreadPoolSettings(availableProcessors, queueSize)); - assert !alertsThreadPool().isShutdown(); - } - logger.debug("started history service"); - } - return true; - } else { + HistoryStore.LoadResult loadResult = historyStore.loadFiredAlerts(state, FiredAlert.State.AWAITS_EXECUTION); + if (!loadResult.succeeded()) { return false; } + this.previousFiredAlerts = ImmutableList.copyOf(loadResult); + if (!previousFiredAlerts.isEmpty()) { + logger.debug("loaded [{}] actions from the alert history index into actions queue", previousFiredAlerts.size()); + } + logger.debug("starting history service"); + if (started.compareAndSet(false, true)) { + if (alertsThreadPool().isShutdown()) { + logger.info("Restarting thread pool that had been shutdown"); + // this update thread pool settings work around is for restarting the alerts thread pool, + // that creates a new alerts thread pool and cleans up the existing one that has previously been shutdown. + int availableProcessors = EsExecutors.boundedNumberOfProcessors(settings); + /*** + *TODO Horrible horrible hack to make sure that settings are always different from the previous settings + * + * THIS NEEDS TO CHANGE ASAP + */ + int queueSize = alertsThreadPool().getQueue().remainingCapacity(); + if (queueSize % 2 == 0){ + queueSize = queueSize + 1; + } else { + queueSize = queueSize - 1; + } + //TODO END HORRIBLE HACK + + threadPool.updateSettings(AlertsPlugin.alertThreadPoolSettings(availableProcessors, queueSize)); + assert !alertsThreadPool().isShutdown(); + } + logger.debug("started history service"); + } + executePreviouslyFiredAlerts(); + return true; } public void stop() { @@ -104,31 +110,6 @@ public class HistoryService extends AbstractComponent { return started.get(); } - // We can only process previosly fired alerts if the alert service has gone into a started state, - // so we let the alert service execute this method when it gets into that state. - - // TODO: We maybe have a AlertServiceStateListener interface for component that are interrested in when the state - // of alerts changes then these components can register themselves. - public void executePreviouslyFiredAlerts() { - ImmutableList firedAlerts = this.previousFiredAlerts; - if (firedAlerts != null) { - this.previousFiredAlerts = null; - for (FiredAlert firedAlert : firedAlerts) { - innerExecute(firedAlert); - } - } - } - - public void alertFired(Alert alert, DateTime scheduledFireTime, DateTime fireTime) throws HistoryException { - if (!started.get()) { - throw new ElasticsearchIllegalStateException("not started"); - } - FiredAlert firedAlert = new FiredAlert(alert, scheduledFireTime, fireTime, FiredAlert.State.AWAITS_RUN); - logger.debug("adding fired alert [{}]", alert.name()); - historyStore.put(firedAlert); - innerExecute(firedAlert); - } - // TODO: should be removed from the stats api? This is already visible in the thread pool cat api. public long getQueueSize() { return alertsThreadPool().getQueue().size(); @@ -139,63 +120,140 @@ public class HistoryService extends AbstractComponent { return alertsThreadPool().getLargestPoolSize(); } - private void innerExecute(FiredAlert firedAlert) { + void execute(Alert alert, DateTime scheduledFireTime, DateTime fireTime) throws HistoryException { + if (!started.get()) { + throw new ElasticsearchIllegalStateException("not started"); + } + FiredAlert firedAlert = new FiredAlert(alert, scheduledFireTime, fireTime); + logger.debug("adding fired alert [{}]", alert.name()); + historyStore.put(firedAlert); + execute(firedAlert); + } + + void execute(FiredAlert firedAlert) { try { if (alertsThreadPool().isShutdown()) { throw new AlertsException("attempting to add to a shutdown thread pool"); } - alertsThreadPool().execute(new AlertHistoryRunnable(firedAlert)); + alertsThreadPool().execute(new AlertExecutionTask(firedAlert)); } catch (EsRejectedExecutionException e) { logger.debug("[{}] failed to execute fired alert", firedAlert.name()); - firedAlert.state(FiredAlert.State.FAILED); - firedAlert.errorMessage("failed to run fired alert due to thread pool capacity"); + firedAlert.update(FiredAlert.State.FAILED, "failed to run fired alert due to thread pool capacity"); historyStore.update(firedAlert); } } + void executePreviouslyFiredAlerts() { + ImmutableList firedAlerts = this.previousFiredAlerts; + if (firedAlerts != null) { + this.previousFiredAlerts = ImmutableList.of(); + for (FiredAlert firedAlert : firedAlerts) { + execute(firedAlert); + } + } + } + private EsThreadPoolExecutor alertsThreadPool() { return (EsThreadPoolExecutor) threadPool.executor(AlertsPlugin.NAME); } - private final class AlertHistoryRunnable implements Runnable { + private final class AlertExecutionTask implements Runnable { - private final FiredAlert alert; + private final FiredAlert firedAlert; - private AlertHistoryRunnable(FiredAlert alert) { - this.alert = alert; + private AlertExecutionTask(FiredAlert firedAlert) { + this.firedAlert = firedAlert; } @Override public void run() { try { - Alert alert = alertsStore.getAlert(this.alert.name()); + Alert alert = alertsStore.getAlert(firedAlert.name()); if (alert == null) { - this.alert.errorMessage("alert was not found in the alerts store"); - this.alert.state(FiredAlert.State.FAILED); - historyStore.update(this.alert); - return; + firedAlert.update(FiredAlert.State.FAILED, "alert was not found in the alerts store"); + } else { + this.firedAlert.update(FiredAlert.State.RUNNING, null); + logger.debug("executing alert [{}]", this.firedAlert.name()); + AlertExecution alertExecution = execute(alert, this.firedAlert); + this.firedAlert.update(alertExecution); } - this.alert.state(FiredAlert.State.RUNNING); - historyStore.update(this.alert); - logger.debug("running an alert [{}]", this.alert.name()); - AlertsService.AlertRun alertRun = alertsService.runAlert(this.alert); - this.alert.finalize(alert, alertRun); - historyStore.update(this.alert); + historyStore.update(this.firedAlert); } catch (Exception e) { if (started()) { - logger.warn("failed to run alert [{}]", e, alert.name()); + logger.warn("failed to run alert [{}]", e, firedAlert.name()); try { - alert.errorMessage(e.getMessage()); - alert.state(FiredAlert.State.FAILED); - historyStore.update(alert); + firedAlert.update(FiredAlert.State.FAILED, e.getMessage()); + historyStore.update(firedAlert); + } catch (Exception e2) { - logger.error("failed to update fired alert [{}] with the error message", e2, alert); + logger.error("failed to update fired alert [{}] with the error message", e2, firedAlert); } } else { - logger.debug("failed to execute fired alert [{}] after shutdown", e, alert); + logger.debug("failed to execute fired alert [{}] after shutdown", e, firedAlert); } } } + + /* + The execution of an alert is split into operations, a schedule part which just makes sure that store the fact an alert + has fired and an execute part which actually executed the alert. + + The reason this is split into two operations is that we don't want to lose the fact an alert has fired. If we + would not split the execution of an alert and many alerts fire in a small window of time then it can happen that + thread pool that receives fired jobs from the quartz scheduler is going to reject jobs and then we would never + know about jobs that have fired. By splitting the execution of fired jobs into two operations we lower the chance + we lose fired jobs signficantly. + */ + + AlertExecution execute(Alert alert, FiredAlert firedAlert) throws IOException { + if (!started.get()) { + throw new ElasticsearchIllegalStateException("not started"); + } + AlertLockService.Lock lock = alertLockService.acquire(alert.name()); + try { + ExecutionContext ctx = new ExecutionContext(firedAlert.id(), alert, firedAlert.fireTime(), firedAlert.scheduledTime()); + + Trigger.Result triggerResult = alert.trigger().execute(ctx); + ctx.onTriggerResult(triggerResult); + + if (triggerResult.triggered()) { + Throttler.Result throttleResult = alert.throttler().throttle(ctx, triggerResult); + ctx.onThrottleResult(throttleResult); + + if (!throttleResult.throttle()) { + Transform.Result result = alert.transform().apply(ctx, triggerResult.payload()); + ctx.onTransformResult(result); + + for (Action action : alert.actions()) { + Action.Result actionResult = action.execute(ctx, result.payload()); + ctx.onActionResult(actionResult); + } + } + } + return ctx.finish(); + + } finally { + lock.release(); + } + } } + private class SchedulerListener implements Scheduler.Listener { + @Override + public void fire(String name, DateTime scheduledFireTime, DateTime fireTime) { + if (!started.get()) { + throw new ElasticsearchIllegalStateException("not started"); + } + Alert alert = alertsStore.getAlert(name); + if (alert == null) { + logger.warn("unable to find [{}] in the alert store, perhaps it has been deleted", name); + return; + } + try { + execute(alert, scheduledFireTime, fireTime); + } catch (Exception e) { + logger.error("failed to fire alert [{}]", e, alert); + } + } + } } diff --git a/src/main/java/org/elasticsearch/alerts/history/HistoryStore.java b/src/main/java/org/elasticsearch/alerts/history/HistoryStore.java index 474f6b44e4b..1474bbb3b70 100644 --- a/src/main/java/org/elasticsearch/alerts/history/HistoryStore.java +++ b/src/main/java/org/elasticsearch/alerts/history/HistoryStore.java @@ -30,6 +30,7 @@ import org.elasticsearch.search.SearchHit; import java.io.IOException; import java.util.ArrayList; import java.util.Collections; +import java.util.Iterator; import java.util.List; /** @@ -37,7 +38,7 @@ import java.util.List; public class HistoryStore extends AbstractComponent { public static final String ALERT_HISTORY_INDEX_PREFIX = ".alert_history_"; - public static final String ALERT_HISTORY_TYPE = "alerthistory"; + public static final String ALERT_HISTORY_TYPE = "fired_alert"; static final DateTimeFormatter alertHistoryIndexTimeFormat = DateTimeFormat.forPattern("YYYY-MM-dd"); @@ -84,10 +85,10 @@ public class HistoryStore extends AbstractComponent { } } - public LoadResult loadFiredAlerts(ClusterState state) { + public LoadResult loadFiredAlerts(ClusterState state, FiredAlert.State firedAlertState) { String[] indices = state.metaData().concreteIndices(IndicesOptions.lenientExpandOpen(), ALERT_HISTORY_INDEX_PREFIX + "*"); if (indices.length == 0) { - logger.info("No previous .alerthistory index, skip loading of alert actions"); + logger.info("No .alert_history indices found, skip loading of alert actions"); templateUtils.ensureIndexTemplateIsLoaded(state, "alerthistory"); return new LoadResult(true); } @@ -110,7 +111,7 @@ public class HistoryStore extends AbstractComponent { } SearchResponse response = client.prepareSearch(ALERT_HISTORY_INDEX_PREFIX + "*") - .setQuery(QueryBuilders.termQuery(FiredAlert.Parser.STATE_FIELD.getPreferredName(), FiredAlert.State.AWAITS_RUN.toString())) + .setQuery(QueryBuilders.termQuery(FiredAlert.Parser.STATE_FIELD.getPreferredName(), firedAlertState.toString())) .setSearchType(SearchType.SCAN) .setScroll(scrollTimeout) .setSize(scrollSize) @@ -130,7 +131,7 @@ public class HistoryStore extends AbstractComponent { for (SearchHit sh : response.getHits()) { String historyId = sh.getId(); FiredAlert historyEntry = alertRecordParser.parse(sh.getSourceRef(), historyId, sh.version()); - assert historyEntry.state() == FiredAlert.State.AWAITS_RUN; + assert historyEntry.state() == FiredAlert.State.AWAITS_EXECUTION; logger.debug("loaded fired alert from index [{}/{}/{}]", sh.index(), sh.type(), sh.id()); alerts.add(historyEntry); } @@ -151,27 +152,28 @@ public class HistoryStore extends AbstractComponent { return ALERT_HISTORY_INDEX_PREFIX + alertHistoryIndexTimeFormat.print(time); } - public class LoadResult { + public class LoadResult implements Iterable { private final boolean succeeded; - private final List notRanFiredAlerts; + private final List alerts; - public LoadResult(boolean succeeded, List notRanFiredAlerts) { + public LoadResult(boolean succeeded, List alerts) { this.succeeded = succeeded; - this.notRanFiredAlerts = notRanFiredAlerts; + this.alerts = alerts; } public LoadResult(boolean succeeded) { this.succeeded = succeeded; - this.notRanFiredAlerts = Collections.emptyList(); + this.alerts = Collections.emptyList(); + } + + @Override + public Iterator iterator() { + return alerts.iterator(); } public boolean succeeded() { return succeeded; } - - public List notRanFiredAlerts() { - return notRanFiredAlerts; - } } } diff --git a/src/main/java/org/elasticsearch/alerts/throttle/AckThrottler.java b/src/main/java/org/elasticsearch/alerts/throttle/AckThrottler.java index 323853c637c..113b2d43c04 100644 --- a/src/main/java/org/elasticsearch/alerts/throttle/AckThrottler.java +++ b/src/main/java/org/elasticsearch/alerts/throttle/AckThrottler.java @@ -5,7 +5,7 @@ */ package org.elasticsearch.alerts.throttle; -import org.elasticsearch.alerts.AlertContext; +import org.elasticsearch.alerts.ExecutionContext; import org.elasticsearch.alerts.trigger.Trigger; import static org.elasticsearch.alerts.support.AlertsDateUtils.formatDate; @@ -16,9 +16,9 @@ import static org.elasticsearch.alerts.support.AlertsDateUtils.formatDate; public class AckThrottler implements Throttler { @Override - public Result throttle(AlertContext ctx, Trigger.Result result) { - if (ctx.alert().status().acked()) { - return Result.throttle("alert [" + ctx.alert().name() + "] was acked at [" + formatDate(ctx.alert().status().ack().timestamp()) + "]"); + public Result throttle(ExecutionContext ctx, Trigger.Result result) { + if (ctx.alert().acked()) { + return Result.throttle("alert [" + ctx.alert().name() + "] was acked at [" + formatDate(ctx.alert().status().ackStatus().timestamp()) + "]"); } return Result.NO; } diff --git a/src/main/java/org/elasticsearch/alerts/throttle/AlertThrottler.java b/src/main/java/org/elasticsearch/alerts/throttle/AlertThrottler.java index a8f212b334d..0daba1e8064 100644 --- a/src/main/java/org/elasticsearch/alerts/throttle/AlertThrottler.java +++ b/src/main/java/org/elasticsearch/alerts/throttle/AlertThrottler.java @@ -5,7 +5,7 @@ */ package org.elasticsearch.alerts.throttle; -import org.elasticsearch.alerts.AlertContext; +import org.elasticsearch.alerts.ExecutionContext; import org.elasticsearch.alerts.trigger.Trigger; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.unit.TimeValue; @@ -24,7 +24,7 @@ public class AlertThrottler implements Throttler { } @Override - public Result throttle(AlertContext ctx, Trigger.Result result) { + public Result throttle(ExecutionContext ctx, Trigger.Result result) { if (periodThrottler != null) { Result throttleResult = periodThrottler.throttle(ctx, result); if (throttleResult.throttle()) { diff --git a/src/main/java/org/elasticsearch/alerts/throttle/PeriodThrottler.java b/src/main/java/org/elasticsearch/alerts/throttle/PeriodThrottler.java index 7c70ce7e469..174f7a16ca3 100644 --- a/src/main/java/org/elasticsearch/alerts/throttle/PeriodThrottler.java +++ b/src/main/java/org/elasticsearch/alerts/throttle/PeriodThrottler.java @@ -6,7 +6,7 @@ package org.elasticsearch.alerts.throttle; import org.elasticsearch.alerts.Alert; -import org.elasticsearch.alerts.AlertContext; +import org.elasticsearch.alerts.ExecutionContext; import org.elasticsearch.alerts.trigger.Trigger; import org.elasticsearch.common.joda.time.PeriodType; import org.elasticsearch.common.unit.TimeValue; @@ -33,7 +33,7 @@ public class PeriodThrottler implements Throttler { } @Override - public Result throttle(AlertContext ctx, Trigger.Result result) { + public Result throttle(ExecutionContext ctx, Trigger.Result result) { Alert.Status status = ctx.alert().status(); if (status.lastRan() != null) { TimeValue timeElapsed = new TimeValue(System.currentTimeMillis() - status.lastExecuted().getMillis()); diff --git a/src/main/java/org/elasticsearch/alerts/throttle/Throttler.java b/src/main/java/org/elasticsearch/alerts/throttle/Throttler.java index e0c8727a7d9..a68fe573439 100644 --- a/src/main/java/org/elasticsearch/alerts/throttle/Throttler.java +++ b/src/main/java/org/elasticsearch/alerts/throttle/Throttler.java @@ -5,7 +5,7 @@ */ package org.elasticsearch.alerts.throttle; -import org.elasticsearch.alerts.AlertContext; +import org.elasticsearch.alerts.ExecutionContext; import org.elasticsearch.alerts.trigger.Trigger; import org.elasticsearch.common.ParseField; @@ -16,18 +16,19 @@ public interface Throttler { public static final Throttler NO_THROTTLE = new Throttler() { @Override - public Result throttle(AlertContext ctx, Trigger.Result result) { + public Result throttle(ExecutionContext ctx, Trigger.Result result) { return Result.NO; } }; - Result throttle(AlertContext ctx, Trigger.Result result); + Result throttle(ExecutionContext ctx, Trigger.Result result); static class Result { + public static ParseField THROTTLE_FIELD = new ParseField("throttle"); public static ParseField REASON_FIELD = new ParseField("reason"); - static final Result NO = new Result(false, null); + public static final Result NO = new Result(false, null); private final boolean throttle; private final String reason; diff --git a/src/main/java/org/elasticsearch/alerts/transform/SearchTransform.java b/src/main/java/org/elasticsearch/alerts/transform/SearchTransform.java index d284d9dd39c..98535d341e9 100644 --- a/src/main/java/org/elasticsearch/alerts/transform/SearchTransform.java +++ b/src/main/java/org/elasticsearch/alerts/transform/SearchTransform.java @@ -7,7 +7,7 @@ package org.elasticsearch.alerts.transform; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; -import org.elasticsearch.alerts.AlertContext; +import org.elasticsearch.alerts.ExecutionContext; import org.elasticsearch.alerts.Payload; import org.elasticsearch.alerts.support.AlertUtils; import org.elasticsearch.alerts.support.init.proxy.ClientProxy; @@ -58,7 +58,7 @@ public class SearchTransform implements Transform { } @Override - public Transform.Result apply(AlertContext ctx, Payload payload) throws IOException { + public Transform.Result apply(ExecutionContext ctx, Payload payload) throws IOException { SearchRequest req = createRequest(request, ctx.scheduledTime(), ctx.fireTime(), payload.data()); SearchResponse resp = client.search(req).actionGet(); return new Transform.Result(TYPE, new Payload.ActionResponse(resp)); diff --git a/src/main/java/org/elasticsearch/alerts/transform/Transform.java b/src/main/java/org/elasticsearch/alerts/transform/Transform.java index 51c2a54c8ca..924c9f98e59 100644 --- a/src/main/java/org/elasticsearch/alerts/transform/Transform.java +++ b/src/main/java/org/elasticsearch/alerts/transform/Transform.java @@ -5,7 +5,7 @@ */ package org.elasticsearch.alerts.transform; -import org.elasticsearch.alerts.AlertContext; +import org.elasticsearch.alerts.ExecutionContext; import org.elasticsearch.alerts.Payload; import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; @@ -25,7 +25,7 @@ public interface Transform extends ToXContent { } @Override - public Result apply(AlertContext context, Payload payload) throws IOException { + public Result apply(ExecutionContext context, Payload payload) throws IOException { return new Result("noop", payload); } @@ -37,7 +37,7 @@ public interface Transform extends ToXContent { String type(); - Result apply(AlertContext context, Payload payload) throws IOException; + Result apply(ExecutionContext context, Payload payload) throws IOException; static class Result { diff --git a/src/main/java/org/elasticsearch/alerts/transport/actions/stats/TransportAlertsStatsAction.java b/src/main/java/org/elasticsearch/alerts/transport/actions/stats/TransportAlertsStatsAction.java index 9df89b51112..972444f705d 100644 --- a/src/main/java/org/elasticsearch/alerts/transport/actions/stats/TransportAlertsStatsAction.java +++ b/src/main/java/org/elasticsearch/alerts/transport/actions/stats/TransportAlertsStatsAction.java @@ -57,7 +57,7 @@ public class TransportAlertsStatsAction extends TransportMasterNodeOperationActi @Override protected void masterOperation(AlertsStatsRequest request, ClusterState state, ActionListener listener) throws ElasticsearchException { AlertsStatsResponse statsResponse = new AlertsStatsResponse(); - statsResponse.setAlertManagerState(alertsService.getState()); + statsResponse.setAlertManagerState(alertsService.state()); statsResponse.setAlertActionManagerStarted(historyService.started()); statsResponse.setAlertActionManagerQueueSize(historyService.getQueueSize()); statsResponse.setNumberOfRegisteredAlerts(alertsService.getNumberOfAlerts()); diff --git a/src/main/java/org/elasticsearch/alerts/trigger/Trigger.java b/src/main/java/org/elasticsearch/alerts/trigger/Trigger.java index c20a05fc8e8..122dcd793c9 100644 --- a/src/main/java/org/elasticsearch/alerts/trigger/Trigger.java +++ b/src/main/java/org/elasticsearch/alerts/trigger/Trigger.java @@ -5,7 +5,7 @@ */ package org.elasticsearch.alerts.trigger; -import org.elasticsearch.alerts.AlertContext; +import org.elasticsearch.alerts.ExecutionContext; import org.elasticsearch.alerts.Payload; import org.elasticsearch.common.ParseField; import org.elasticsearch.common.logging.ESLogger; @@ -34,7 +34,7 @@ public abstract class Trigger implements ToXContent { /** * Executes this trigger */ - public abstract R execute(AlertContext ctx) throws IOException; + public abstract R execute(ExecutionContext ctx) throws IOException; /** diff --git a/src/main/java/org/elasticsearch/alerts/trigger/search/ScriptSearchTrigger.java b/src/main/java/org/elasticsearch/alerts/trigger/search/ScriptSearchTrigger.java index a7a4afafd66..83e1129d239 100644 --- a/src/main/java/org/elasticsearch/alerts/trigger/search/ScriptSearchTrigger.java +++ b/src/main/java/org/elasticsearch/alerts/trigger/search/ScriptSearchTrigger.java @@ -70,8 +70,7 @@ public class ScriptSearchTrigger extends SearchTrigger { builder.field(ScriptService.SCRIPT_INLINE.getPreferredName(), script); builder.field(Parser.SCRIPT_TYPE_FIELD.getPreferredName(), scriptType); builder.field(ScriptService.SCRIPT_LANG.getPreferredName(), scriptLang); - builder.endObject(); - return builder; + return builder.endObject(); } public static class Parser extends AbstractComponent implements SearchTrigger.Parser { diff --git a/src/main/java/org/elasticsearch/alerts/trigger/search/SearchTrigger.java b/src/main/java/org/elasticsearch/alerts/trigger/search/SearchTrigger.java index 94f3b6c6a9a..249966c88b4 100644 --- a/src/main/java/org/elasticsearch/alerts/trigger/search/SearchTrigger.java +++ b/src/main/java/org/elasticsearch/alerts/trigger/search/SearchTrigger.java @@ -7,7 +7,7 @@ package org.elasticsearch.alerts.trigger.search; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; -import org.elasticsearch.alerts.AlertContext; +import org.elasticsearch.alerts.ExecutionContext; import org.elasticsearch.alerts.Payload; import org.elasticsearch.alerts.support.AlertUtils; import org.elasticsearch.alerts.support.init.proxy.ClientProxy; @@ -35,7 +35,7 @@ public abstract class SearchTrigger extends Trigger { } @Override - public Result execute(AlertContext ctx) throws IOException { + public Result execute(ExecutionContext ctx) throws IOException { SearchRequest request = AlertUtils.createSearchRequestWithTimes(this.request, ctx.scheduledTime(), ctx.fireTime(), scriptService); if (logger.isTraceEnabled()) { logger.trace("running query for [{}]", ctx.alert().name(), XContentHelper.convertToJson(request.source(), false, true)); diff --git a/src/main/java/org/elasticsearch/alerts/trigger/simple/SimpleTrigger.java b/src/main/java/org/elasticsearch/alerts/trigger/simple/SimpleTrigger.java index 2ee8fced9e2..b3f64036142 100644 --- a/src/main/java/org/elasticsearch/alerts/trigger/simple/SimpleTrigger.java +++ b/src/main/java/org/elasticsearch/alerts/trigger/simple/SimpleTrigger.java @@ -5,7 +5,7 @@ */ package org.elasticsearch.alerts.trigger.simple; -import org.elasticsearch.alerts.AlertContext; +import org.elasticsearch.alerts.ExecutionContext; import org.elasticsearch.alerts.Payload; import org.elasticsearch.alerts.trigger.Trigger; import org.elasticsearch.alerts.trigger.TriggerException; @@ -38,7 +38,7 @@ public class SimpleTrigger extends Trigger { } @Override - public Result execute(AlertContext ctx) throws IOException { + public Result execute(ExecutionContext ctx) throws IOException { return new Result(payload); } diff --git a/src/main/resources/alerthistory.json b/src/main/resources/alerthistory.json index 316261f36dc..cf1d5386bfc 100644 --- a/src/main/resources/alerthistory.json +++ b/src/main/resources/alerthistory.json @@ -8,7 +8,7 @@ "index.mapper.dynamic" : false }, "mappings": { - "alerthistory": { + "fired_alert": { "dynamic" : "strict", "properties": { "alert_name": { @@ -28,7 +28,7 @@ "type": "string", "index": "not_analyzed" }, - "error_msg": { + "message": { "type": "string" }, "trigger" : { @@ -41,7 +41,7 @@ "enabled" : false, "dynamic" : true }, - "alert_run" : { + "alert_execution" : { "type" : "object", "enabled" : false, "dynamic" : true diff --git a/src/test/java/org/elasticsearch/alerts/AbstractAlertingTests.java b/src/test/java/org/elasticsearch/alerts/AbstractAlertingTests.java index 9a70ae0f9c4..d4f5f05a95f 100644 --- a/src/test/java/org/elasticsearch/alerts/AbstractAlertingTests.java +++ b/src/test/java/org/elasticsearch/alerts/AbstractAlertingTests.java @@ -126,7 +126,9 @@ public abstract class AbstractAlertingTests extends ElasticsearchIntegrationTest } builder.startObject(); - builder.startObject("schedule").field("cron", cron).endObject(); + builder.startObject("schedule") + .field("cron", cron) + .endObject(); if (metadata != null) { builder.field("meta", metadata); @@ -276,7 +278,7 @@ public abstract class AbstractAlertingTests extends ElasticsearchIntegrationTest .get(); assertThat(searchResponse.getHits().getTotalHits(), greaterThanOrEqualTo(minimumExpectedAlertActionsWithActionPerformed)); if (assertTriggerSearchMatched) { - assertThat((Integer) XContentMapValues.extractValue("alert_run.trigger_result.script.payload.hits.total", searchResponse.getHits().getAt(0).sourceAsMap()), greaterThanOrEqualTo(1)); + assertThat((Integer) XContentMapValues.extractValue("alert_execution.trigger_result.script.payload.hits.total", searchResponse.getHits().getAt(0).sourceAsMap()), greaterThanOrEqualTo(1)); } } }); @@ -425,7 +427,7 @@ public abstract class AbstractAlertingTests extends ElasticsearchIntegrationTest // on these nodes for (String node : nodes) { AlertsService alertsService = _testCluster.getInstance(AlertsService.class, node); - assertThat(alertsService.getState(), equalTo(AlertsService.State.STOPPED)); + assertThat(alertsService.state(), equalTo(AlertsService.State.STOPPED)); alertsService.stop(); // Prevents these nodes from starting alerting when new elected master node is picked. } @@ -435,7 +437,7 @@ public abstract class AbstractAlertingTests extends ElasticsearchIntegrationTest assertBusy(new Runnable() { @Override public void run() { - assertThat(alertsService.getState(), not(equalTo(AlertsService.State.STARTING))); + assertThat(alertsService.state(), not(equalTo(AlertsService.State.STARTING))); } }); } catch (Exception e) { @@ -446,7 +448,7 @@ public abstract class AbstractAlertingTests extends ElasticsearchIntegrationTest assertBusy(new Runnable() { @Override public void run() { - assertThat(alertsService.getState(), equalTo(AlertsService.State.STOPPED)); + assertThat(alertsService.state(), equalTo(AlertsService.State.STOPPED)); } }); } catch (Exception e) { diff --git a/src/test/java/org/elasticsearch/alerts/AlertSerializationTest.java b/src/test/java/org/elasticsearch/alerts/AlertSerializationTest.java index 19ce8f61236..c6fa550faec 100644 --- a/src/test/java/org/elasticsearch/alerts/AlertSerializationTest.java +++ b/src/test/java/org/elasticsearch/alerts/AlertSerializationTest.java @@ -58,7 +58,7 @@ public class AlertSerializationTest extends AbstractAlertingTests { assertEqualByGeneratedXContent(parsedAlert.schedule(), alert.schedule()); assertEqualByGeneratedXContent(parsedAlert.trigger(), alert.trigger()); assertEquals(parsedAlert.throttlePeriod().getMillis(), alert.throttlePeriod().getMillis()); - assertEquals(parsedAlert.status().ack().state(), alert.status().ack().state()); + assertEquals(parsedAlert.status().ackStatus().state(), alert.status().ackStatus().state()); assertEquals(parsedAlert.metadata().get("foo"), "bar"); } diff --git a/src/test/java/org/elasticsearch/alerts/AlertThrottleTests.java b/src/test/java/org/elasticsearch/alerts/AlertThrottleTests.java index 7b31403a6e5..2e19dfa39fc 100644 --- a/src/test/java/org/elasticsearch/alerts/AlertThrottleTests.java +++ b/src/test/java/org/elasticsearch/alerts/AlertThrottleTests.java @@ -87,7 +87,7 @@ public class AlertThrottleTests extends AbstractAlertingTests { Thread.sleep(20000); AckAlertResponse ackResponse = alertsClient.prepareAckAlert("throttled-alert").get(); - assertEquals(Alert.Status.Ack.State.ACKED, ackResponse.getStatus().ack().state()); + assertEquals(Alert.Status.AckStatus.State.ACKED, ackResponse.getStatus().ackStatus().state()); refresh(); SearchResponse searchResponse = client() @@ -123,7 +123,7 @@ public class AlertThrottleTests extends AbstractAlertingTests { Alert parsedAlert = alertParser.parse(getAlertResponse.getResponse().getId(), true, getAlertResponse.getResponse().getSourceAsBytesRef()); - assertThat(parsedAlert.status().ack().state(), equalTo(Alert.Status.Ack.State.AWAITS_EXECUTION)); + assertThat(parsedAlert.status().ackStatus().state(), equalTo(Alert.Status.AckStatus.State.AWAITS_EXECUTION)); CountResponse countOfThrottledActions = client() .prepareCount(HistoryStore.ALERT_HISTORY_INDEX_PREFIX + "*") diff --git a/src/test/java/org/elasticsearch/alerts/BootStrapTest.java b/src/test/java/org/elasticsearch/alerts/BootStrapTest.java index a218d9711b4..56b0965d849 100644 --- a/src/test/java/org/elasticsearch/alerts/BootStrapTest.java +++ b/src/test/java/org/elasticsearch/alerts/BootStrapTest.java @@ -88,7 +88,7 @@ public class BootStrapTest extends AbstractAlertingTests { ); DateTime scheduledFireTime = new DateTime(DateTimeZone.UTC); - FiredAlert entry = new FiredAlert(alert, scheduledFireTime, scheduledFireTime, FiredAlert.State.AWAITS_RUN); + FiredAlert entry = new FiredAlert(alert, scheduledFireTime, scheduledFireTime); String actionHistoryIndex = HistoryStore.getAlertHistoryIndexNameForTime(scheduledFireTime); createIndex(actionHistoryIndex); @@ -145,7 +145,7 @@ public class BootStrapTest extends AbstractAlertingTests { PutAlertResponse putAlertResponse = alertClient().preparePutAlert(alert.name()).setAlertSource(jsonBuilder.bytes()).get(); assertTrue(putAlertResponse.indexResponse().isCreated()); - FiredAlert entry = new FiredAlert(alert, historyIndexDate, historyIndexDate, FiredAlert.State.AWAITS_RUN); + FiredAlert entry = new FiredAlert(alert, historyIndexDate, historyIndexDate); IndexResponse indexResponse = client().prepareIndex(actionHistoryIndex, HistoryStore.ALERT_HISTORY_TYPE, entry.id()) .setConsistencyLevel(WriteConsistencyLevel.ALL) .setSource(XContentFactory.jsonBuilder().value(entry)) diff --git a/src/test/java/org/elasticsearch/alerts/NoMasterNodeTests.java b/src/test/java/org/elasticsearch/alerts/NoMasterNodeTests.java index 2218a5e9111..26e37e38266 100644 --- a/src/test/java/org/elasticsearch/alerts/NoMasterNodeTests.java +++ b/src/test/java/org/elasticsearch/alerts/NoMasterNodeTests.java @@ -142,7 +142,7 @@ public class NoMasterNodeTests extends AbstractAlertingTests { }), equalTo(true)); // Ensure that the alert manager doesn't run elsewhere for (AlertsService alertsService : internalTestCluster().getInstances(AlertsService.class)) { - assertThat(alertsService.getState(), is(AlertsService.State.STOPPED)); + assertThat(alertsService.state(), is(AlertsService.State.STOPPED)); } } diff --git a/src/test/java/org/elasticsearch/alerts/actions/email/EmailActionTest.java b/src/test/java/org/elasticsearch/alerts/actions/email/EmailActionTest.java index 1d2359c9744..7faa348cf19 100644 --- a/src/test/java/org/elasticsearch/alerts/actions/email/EmailActionTest.java +++ b/src/test/java/org/elasticsearch/alerts/actions/email/EmailActionTest.java @@ -6,7 +6,7 @@ package org.elasticsearch.alerts.actions.email; import org.elasticsearch.alerts.Alert; -import org.elasticsearch.alerts.AlertContext; +import org.elasticsearch.alerts.ExecutionContext; import org.elasticsearch.alerts.Payload; import org.elasticsearch.alerts.actions.email.service.Authentication; import org.elasticsearch.alerts.actions.email.service.Email; @@ -29,9 +29,11 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.watcher.ResourceWatcherService; import javax.mail.MessagingException; -import javax.mail.internet.InternetAddress; import java.io.IOException; -import java.util.*; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; /** */ @@ -41,40 +43,17 @@ public class EmailActionTest extends ElasticsearchTestCase { //createIndex("my-trigger-index"); StringTemplateUtils.Template template = new StringTemplateUtils.Template("{{alert_name}} triggered with {{response.hits.total}} hits"); - List addresses = new ArrayList<>(); - addresses.addAll(Arrays.asList(InternetAddress.parse("you@foo.com"))); Settings settings = ImmutableSettings.settingsBuilder().build(); MustacheScriptEngineService mustacheScriptEngineService = new MustacheScriptEngineService(settings); - ThreadPool tp; - tp = new ThreadPool(ThreadPool.Names.SAME); + ThreadPool threadPool = new ThreadPool(ThreadPool.Names.SAME); Set engineServiceSet = new HashSet<>(); engineServiceSet.add(mustacheScriptEngineService); - ScriptService scriptService = new ScriptService(settings, new Environment(), engineServiceSet, new ResourceWatcherService(settings, tp)); + ScriptService scriptService = new ScriptService(settings, new Environment(), engineServiceSet, new ResourceWatcherService(settings, threadPool)); StringTemplateUtils stringTemplateUtils = new StringTemplateUtils(settings, ScriptServiceProxy.of(scriptService)); - EmailService alwaysSuccessEmailService = new EmailService() { - @Override - public void start(ClusterState state) { - - } - - @Override - public void stop() { - - } - - @Override - public EmailSent send(Email email, Authentication auth, Profile profile) { - return new EmailSent(auth.username(), email); - } - - @Override - public EmailSent send(Email email, Authentication auth, Profile profile, String accountName) { - return new EmailSent(accountName, email); - } - }; + EmailService emailService = new EmailServiceMock(); Email.Address from = new Email.Address("from@test.com"); List emailAddressList = new ArrayList<>(); @@ -86,7 +65,7 @@ public class EmailActionTest extends ElasticsearchTestCase { emailBuilder.from(from); emailBuilder.to(to); - EmailAction emailAction = new EmailAction(logger, alwaysSuccessEmailService, stringTemplateUtils, emailBuilder, + EmailAction emailAction = new EmailAction(logger, emailService, stringTemplateUtils, emailBuilder, new Authentication("testname", "testpassword"), Profile.STANDARD, "testaccount", template, template, null, true); //This is ok since the execution of the action only relies on the alert name @@ -100,12 +79,10 @@ public class EmailActionTest extends ElasticsearchTestCase { null, new Alert.Status() ); - - - AlertContext ctx = new AlertContext(alert, new DateTime(), new DateTime()); + ExecutionContext ctx = new ExecutionContext("test-serialization#1", alert, new DateTime(), new DateTime()); EmailAction.Result result = emailAction.execute(ctx, new Payload.Simple()); - tp.shutdownNow(); + threadPool.shutdownNow(); assertTrue(result.success()); @@ -114,7 +91,28 @@ public class EmailActionTest extends ElasticsearchTestCase { assertArrayEquals(success.email().to().toArray(), to.toArray() ); assertEquals(success.email().from(), from); //@TODO add more here + } + static class EmailServiceMock implements EmailService { + @Override + public void start(ClusterState state) { + + } + + @Override + public void stop() { + + } + + @Override + public EmailSent send(Email email, Authentication auth, Profile profile) { + return new EmailSent(auth.username(), email); + } + + @Override + public EmailSent send(Email email, Authentication auth, Profile profile, String accountName) { + return new EmailSent(accountName, email); + } } } diff --git a/src/test/java/org/elasticsearch/alerts/history/FiredAlertTest.java b/src/test/java/org/elasticsearch/alerts/history/FiredAlertTest.java index 204242a8cbe..92fdbdf79db 100644 --- a/src/test/java/org/elasticsearch/alerts/history/FiredAlertTest.java +++ b/src/test/java/org/elasticsearch/alerts/history/FiredAlertTest.java @@ -27,7 +27,7 @@ public class FiredAlertTest extends AbstractAlertingTests { public void testFiredAlertParser() throws Exception { Alert alert = createTestAlert("fired_test"); - FiredAlert firedAlert = new FiredAlert(alert, new DateTime(), new DateTime(), FiredAlert.State.AWAITS_RUN); + FiredAlert firedAlert = new FiredAlert(alert, new DateTime(), new DateTime()); XContentBuilder jsonBuilder = XContentFactory.jsonBuilder(); firedAlert.toXContent(jsonBuilder, ToXContent.EMPTY_PARAMS); FiredAlert parsedFiredAlert = firedAlertParser().parse(jsonBuilder.bytes(), firedAlert.id(), 0); @@ -43,14 +43,14 @@ public class FiredAlertTest extends AbstractAlertingTests { @Test public void testFinalizedFiredAlertParser() throws Exception { Alert alert = createTestAlert("fired_test"); - FiredAlert firedAlert = new FiredAlert(alert, new DateTime(), new DateTime(), FiredAlert.State.AWAITS_RUN); - AlertContext ctx = new AlertContext(alert, new DateTime(), new DateTime()); - ctx.addActionResult(new EmailAction.Result.Failure("failed to send because blah")); - ctx.addActionResult(new WebhookAction.Result.Executed(300, "http://localhost:8000/alertfoo", "{'awesome' : 'us'}")); + FiredAlert firedAlert = new FiredAlert(alert, new DateTime(), new DateTime()); + ExecutionContext ctx = new ExecutionContext(firedAlert.id(), alert, new DateTime(), new DateTime()); + ctx.onActionResult(new EmailAction.Result.Failure("failed to send because blah")); + ctx.onActionResult(new WebhookAction.Result.Executed(300, "http://localhost:8000/alertfoo", "{'awesome' : 'us'}")); Trigger.Result triggerResult = new SimpleTrigger.Result(new Payload.Simple()); - ctx.throttleResult(Throttler.NO_THROTTLE.throttle(ctx, triggerResult)); - ctx.triggerResult(triggerResult); - firedAlert.finalize(alert, new AlertsService.AlertRun(ctx)); + ctx.onThrottleResult(Throttler.NO_THROTTLE.throttle(ctx, triggerResult)); + ctx.onTriggerResult(triggerResult); + firedAlert.update(new AlertExecution(ctx)); XContentBuilder jsonBuilder = XContentFactory.jsonBuilder(); firedAlert.toXContent(jsonBuilder, ToXContent.EMPTY_PARAMS); logger.error("FOO : " + jsonBuilder.bytes().toUtf8()); @@ -63,14 +63,14 @@ public class FiredAlertTest extends AbstractAlertingTests { @Test public void testFinalizedFiredAlertParserScriptSearchTrigger() throws Exception { Alert alert = createTestAlert("fired_test"); - FiredAlert firedAlert = new FiredAlert(alert, new DateTime(), new DateTime(), FiredAlert.State.AWAITS_RUN); - AlertContext ctx = new AlertContext(alert, new DateTime(), new DateTime()); - ctx.addActionResult(new EmailAction.Result.Failure("failed to send because blah")); - ctx.addActionResult(new WebhookAction.Result.Executed(300, "http://localhost:8000/alertfoo", "{'awesome' : 'us'}")); + FiredAlert firedAlert = new FiredAlert(alert, new DateTime(), new DateTime()); + ExecutionContext ctx = new ExecutionContext(firedAlert.id(), alert, new DateTime(), new DateTime()); + ctx.onActionResult(new EmailAction.Result.Failure("failed to send because blah")); + ctx.onActionResult(new WebhookAction.Result.Executed(300, "http://localhost:8000/alertfoo", "{'awesome' : 'us'}")); Trigger.Result triggerResult = new SearchTrigger.Result(ScriptSearchTrigger.TYPE, true, createTriggerSearchRequest(), new Payload.Simple()); - ctx.throttleResult(Throttler.NO_THROTTLE.throttle(ctx, triggerResult)); - ctx.triggerResult(triggerResult); - firedAlert.finalize(alert, new AlertsService.AlertRun(ctx)); + ctx.onThrottleResult(Throttler.NO_THROTTLE.throttle(ctx, triggerResult)); + ctx.onTriggerResult(triggerResult); + firedAlert.update(new AlertExecution(ctx)); XContentBuilder jsonBuilder = XContentFactory.jsonBuilder(); firedAlert.toXContent(jsonBuilder, ToXContent.EMPTY_PARAMS); FiredAlert parsedFiredAlert = firedAlertParser().parse(jsonBuilder.bytes(), firedAlert.id(), 0);