Refactoring

- moved alert execution logic to the history service (the history service now listener to schedule events)
- introduced `AlertLockService` - used by both alerts service and history service to lock alerts across services
- the history service is now responsible for executing the previous "not yet executed" fired alerts.
- renamed `AlertContext` to `ExecutionContext`
- renamed `AlertRun` to `AlertExecution`
- improved actions result parsing logic (`success` field is mandatory)
- renamed the alert history type to `fired_alert` (used to be `alerthistory`)
- renamed fired alert `error_msg` to just `message`.

Original commit: elastic/x-pack-elasticsearch@09f26ce3cf
This commit is contained in:
uboness 2015-02-16 06:06:02 +01:00
parent 8b83d74994
commit 61761286e0
32 changed files with 656 additions and 688 deletions

View File

@ -102,6 +102,18 @@ public class Alert implements ToXContent {
return status;
}
/**
* Acks this alert.
*
* @return {@code true} if the status of this alert changed, {@code false} otherwise.
*/
public boolean ack() {
return status.onAck(new DateTime());
}
public boolean acked() {
return status.ackStatus.state == Status.AckStatus.State.ACKED;
}
@Override
public boolean equals(Object o) {
@ -244,24 +256,24 @@ public class Alert implements ToXContent {
private DateTime lastRan;
private DateTime lastTriggered;
private DateTime lastExecuted;
private Ack ack;
private AckStatus ackStatus;
private Throttle lastThrottle;
public Status() {
this(-1, null, null, null, null, new Ack());
this(-1, null, null, null, null, new AckStatus());
}
public Status(Status other) {
this(other.version, other.lastRan, other.lastTriggered, other.lastExecuted, other.lastThrottle, other.ack);
this(other.version, other.lastRan, other.lastTriggered, other.lastExecuted, other.lastThrottle, other.ackStatus);
}
private Status(long version, DateTime lastRan, DateTime lastTriggered, DateTime lastExecuted, Throttle lastThrottle, Ack ack) {
private Status(long version, DateTime lastRan, DateTime lastTriggered, DateTime lastExecuted, Throttle lastThrottle, AckStatus ackStatus) {
this.version = version;
this.lastRan = lastRan;
this.lastTriggered = lastTriggered;
this.lastExecuted = lastExecuted;
this.lastThrottle = lastThrottle;
this.ack = ack;
this.ackStatus = ackStatus;
}
public long version() {
@ -300,18 +312,14 @@ public class Alert implements ToXContent {
return lastThrottle;
}
public Ack ack() {
return ack;
}
public boolean acked() {
return ack.state == Ack.State.ACKED;
public AckStatus ackStatus() {
return ackStatus;
}
/**
* Called whenever an alert is ran
*/
public void onRun(DateTime timestamp) {
public void onExecute(DateTime timestamp) {
lastRan = timestamp;
}
@ -324,41 +332,41 @@ public class Alert implements ToXContent {
/**
* Notifies this status about the triggered event of an alert run. The state will be updated accordingly -
* if the alert is can be acked and during a run, the alert was not triggered and the current state is {@link Status.Ack.State#ACKED},
* we then need to reset the state to {@link Status.Ack.State#AWAITS_EXECUTION}
* if the alert is can be acked and during a run, the alert was not triggered and the current state is {@link org.elasticsearch.alerts.Alert.Status.AckStatus.State#ACKED},
* we then need to reset the state to {@link org.elasticsearch.alerts.Alert.Status.AckStatus.State#AWAITS_EXECUTION}
*/
public void onTrigger(boolean triggered, DateTime timestamp) {
if (triggered) {
lastTriggered = timestamp;
} else if (ack.state == Ack.State.ACKED) {
} else if (ackStatus.state == AckStatus.State.ACKED) {
// didn't trigger now after it triggered in the past - we need to reset the ack state
ack = new Ack(Ack.State.AWAITS_EXECUTION, timestamp);
ackStatus = new AckStatus(AckStatus.State.AWAITS_EXECUTION, timestamp);
}
}
/**
* Notifies this status that the alert was acked. If the current state is {@link Status.Ack.State#ACKABLE}, then we'll change it
* to {@link Status.Ack.State#ACKED} (when set to {@link Status.Ack.State#ACKED}, the {@link org.elasticsearch.alerts.throttle.AckThrottler} will lastThrottle the
* Notifies this status that the alert was acked. If the current state is {@link org.elasticsearch.alerts.Alert.Status.AckStatus.State#ACKABLE}, then we'll change it
* to {@link org.elasticsearch.alerts.Alert.Status.AckStatus.State#ACKED} (when set to {@link org.elasticsearch.alerts.Alert.Status.AckStatus.State#ACKED}, the {@link org.elasticsearch.alerts.throttle.AckThrottler} will lastThrottle the
* execution.
*
* @return {@code true} if the state of changed due to the ack, {@code false} otherwise.
*/
public boolean onAck(DateTime timestamp) {
if (ack.state == Ack.State.ACKABLE) {
ack = new Ack(Ack.State.ACKED, timestamp);
boolean onAck(DateTime timestamp) {
if (ackStatus.state == AckStatus.State.ACKABLE) {
ackStatus = new AckStatus(AckStatus.State.ACKED, timestamp);
return true;
}
return false;
}
/**
* Notified this status that the alert was executed. If the current state is {@link Status.Ack.State#AWAITS_EXECUTION}, it will change to
* {@link Status.Ack.State#ACKABLE}.
* Notified this status that the alert was executed. If the current state is {@link org.elasticsearch.alerts.Alert.Status.AckStatus.State#AWAITS_EXECUTION}, it will change to
* {@link org.elasticsearch.alerts.Alert.Status.AckStatus.State#ACKABLE}.
*/
public void onExecution(DateTime timestamp) {
lastExecuted = timestamp;
if (ack.state == Ack.State.AWAITS_EXECUTION) {
ack = new Ack(Ack.State.ACKABLE, timestamp);
if (ackStatus.state == AckStatus.State.AWAITS_EXECUTION) {
ackStatus = new AckStatus(AckStatus.State.ACKABLE, timestamp);
}
}
@ -375,8 +383,8 @@ public class Alert implements ToXContent {
writeDate(out, lastThrottle.timestamp);
out.writeString(lastThrottle.reason);
}
out.writeString(ack.state.name());
writeDate(out, ack.timestamp);
out.writeString(ackStatus.state.name());
writeDate(out, ackStatus.timestamp);
}
@Override
@ -386,7 +394,7 @@ public class Alert implements ToXContent {
lastTriggered = readOptionalDate(in);
lastExecuted = readOptionalDate(in);
lastThrottle = in.readBoolean() ? new Throttle(readDate(in), in.readString()) : null;
ack = new Ack(Ack.State.valueOf(in.readString()), readDate(in));
ackStatus = new AckStatus(AckStatus.State.valueOf(in.readString()), readDate(in));
}
public static Status read(StreamInput in) throws IOException {
@ -408,8 +416,8 @@ public class Alert implements ToXContent {
builder.field(LAST_EXECUTED_FIELD.getPreferredName(), lastExecuted);
}
builder.startObject(ACK_FIELD.getPreferredName())
.field(STATE_FIELD.getPreferredName(), ack.state.name().toLowerCase(Locale.ROOT))
.field(TIMESTAMP_FIELD.getPreferredName(), ack.timestamp)
.field(STATE_FIELD.getPreferredName(), ackStatus.state.name().toLowerCase(Locale.ROOT))
.field(TIMESTAMP_FIELD.getPreferredName(), ackStatus.timestamp)
.endObject();
if (lastThrottle != null) {
builder.startObject(LAST_THROTTLE_FIELD.getPreferredName())
@ -426,7 +434,7 @@ public class Alert implements ToXContent {
DateTime lastTriggered = null;
DateTime lastExecuted = null;
Throttle lastThrottle = null;
Ack ack = null;
AckStatus ackStatus = null;
String currentFieldName = null;
XContentParser.Token token = null;
@ -474,7 +482,7 @@ public class Alert implements ToXContent {
}
} else if (ACK_FIELD.match(currentFieldName)) {
if (token == XContentParser.Token.START_OBJECT) {
Ack.State state = null;
AckStatus.State state = null;
DateTime timestamp = null;
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
@ -483,24 +491,24 @@ public class Alert implements ToXContent {
if (TIMESTAMP_FIELD.match(currentFieldName)) {
timestamp = parseDate(currentFieldName, token, parser);
} else if (STATE_FIELD.match(currentFieldName)) {
state = Ack.State.valueOf(parser.text().toUpperCase(Locale.ROOT));
state = AckStatus.State.valueOf(parser.text().toUpperCase(Locale.ROOT));
} else {
throw new AlertsException("unknown filed [" + currentFieldName + "] in alert status throttle entry");
}
}
}
ack = new Ack(state, timestamp);
ackStatus = new AckStatus(state, timestamp);
} else {
throw new AlertsException("expecting field [" + currentFieldName + "] to be an object, found [" + token + "] instead");
}
}
}
return new Status(-1, lastRan, lastTriggered, lastExecuted, lastThrottle, ack);
return new Status(-1, lastRan, lastTriggered, lastExecuted, lastThrottle, ackStatus);
}
public static class Ack {
public static class AckStatus {
public static enum State {
AWAITS_EXECUTION,
@ -511,11 +519,11 @@ public class Alert implements ToXContent {
private final State state;
private final DateTime timestamp;
public Ack() {
public AckStatus() {
this(State.AWAITS_EXECUTION, new DateTime());
}
public Ack(State state, DateTime timestamp) {
public AckStatus(State state, DateTime timestamp) {
this.state = state;
this.timestamp = timestamp;
}

View File

@ -0,0 +1,147 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.alerts;
import org.elasticsearch.alerts.actions.Action;
import org.elasticsearch.alerts.actions.ActionRegistry;
import org.elasticsearch.alerts.throttle.Throttler;
import org.elasticsearch.alerts.trigger.Trigger;
import org.elasticsearch.alerts.trigger.TriggerRegistry;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
/**
*
*/
public class AlertExecution implements ToXContent {
private final Trigger.Result triggerResult;
private final Throttler.Result throttleResult;
private final Map<String, Action.Result> actionsResults;
private final Payload payload;
public AlertExecution(ExecutionContext context) {
this(context.triggerResult(), context.throttleResult(), context.actionsResults(), context.payload());
}
AlertExecution(Trigger.Result triggerResult, Throttler.Result throttleResult, Map<String, Action.Result> actionsResults, Payload payload) {
this.triggerResult = triggerResult;
this.throttleResult = throttleResult;
this.actionsResults = actionsResults;
this.payload = payload;
}
public Trigger.Result triggerResult() {
return triggerResult;
}
public Throttler.Result throttleResult() {
return throttleResult;
}
public Map<String, Action.Result> actionsResults() {
return actionsResults;
}
public Payload payload() {
return payload;
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
if (triggerResult != null) {
builder.startObject(Parser.TRIGGER_RESULT.getPreferredName()).field(triggerResult.type(), triggerResult).endObject();
}
if (throttleResult != null && throttleResult.throttle()) {
builder.field(Parser.THROTTLED.getPreferredName(), throttleResult.throttle());
if (throttleResult.reason() != null) {
builder.field(Parser.THROTTLE_REASON.getPreferredName(), throttleResult.reason());
}
}
builder.field(Parser.PAYLOAD.getPreferredName(), payload());
builder.startArray(Parser.ACTIONS_RESULTS.getPreferredName());
for (Map.Entry<String, Action.Result> actionResult : actionsResults.entrySet()) {
builder.startObject();
builder.field(actionResult.getKey(), actionResult.getValue());
builder.endObject();
}
builder.endArray();
builder.endObject();
return builder;
}
public static class Parser {
public static final ParseField TRIGGER_RESULT = new ParseField("trigger_result");
public static final ParseField PAYLOAD = new ParseField("payload");
public static final ParseField ACTIONS_RESULTS = new ParseField("actions_results");
public static final ParseField THROTTLED = new ParseField("throttled");
public static final ParseField THROTTLE_REASON = new ParseField("throttle_reason");
public static AlertExecution parse(XContentParser parser, TriggerRegistry triggerRegistry, ActionRegistry actionRegistry) throws IOException {
boolean throttled = false;
String throttleReason = null;
Map<String, Action.Result> actionResults = new HashMap<>();
Trigger.Result triggerResult = null;
Payload payload = null;
String currentFieldName = null;
XContentParser.Token token;
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
currentFieldName = parser.currentName();
} else if (token.isValue()) {
if (THROTTLE_REASON.match(currentFieldName)) {
throttleReason = parser.text();
} else if (THROTTLED.match(currentFieldName)) {
throttled = parser.booleanValue();
} else {
throw new AlertsException("unable to parse alert run. unexpected field [" + currentFieldName + "]");
}
} else if (token == XContentParser.Token.START_OBJECT) {
if (TRIGGER_RESULT.match(currentFieldName)) {
triggerResult = triggerRegistry.parseResult(parser);
} else if (PAYLOAD.match(currentFieldName)) {
payload = new Payload.Simple(parser.map()); //TODO fixme
} else {
throw new AlertsException("unable to parse alert run. unexpected field [" + currentFieldName + "]");
}
} else if (token == XContentParser.Token.START_ARRAY) {
if (ACTIONS_RESULTS.match(currentFieldName)) {
actionResults = parseActionResults(parser, actionRegistry);
} else {
throw new AlertsException("unable to parse alert run. unexpected field [" + currentFieldName + "]");
}
} else {
throw new AlertsException("unable to parse alert run. unexpected token [" + token + "]");
}
}
Throttler.Result throttleResult = throttled ? Throttler.Result.throttle(throttleReason) : Throttler.Result.NO;
return new AlertExecution(triggerResult, throttleResult, actionResults, payload );
}
private static Map<String, Action.Result> parseActionResults(XContentParser parser, ActionRegistry actionRegistry) throws IOException {
Map<String, Action.Result> actionResults = new HashMap<>();
String currentFieldName = null;
XContentParser.Token token;
while ((token = parser.nextToken()) != XContentParser.Token.END_ARRAY) {
Action.Result actionResult = actionRegistry.parseResult(parser);
actionResults.put(actionResult.type(), actionResult);
}
return actionResults;
}
}
}

View File

@ -0,0 +1,66 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.alerts;
import org.elasticsearch.common.util.concurrent.KeyedLock;
import java.util.concurrent.atomic.AtomicBoolean;
/**
*
*/
public class AlertLockService {
private final KeyedLock<String> alertLock = new KeyedLock<>();
private AtomicBoolean running = new AtomicBoolean();
public Lock acquire(String name) {
alertLock.acquire(name);
return new Lock(name, alertLock);
}
public void start() {
if (running.compareAndSet(false, true)) {
// init
}
}
public void stop() {
if (running.compareAndSet(true, false)) {
// It can happen we have still ongoing operations and we wait those operations to finish to avoid
// that AlertManager or any of its components end up in a illegal state after the state as been set to stopped.
//
// For example: An alert action entry may be added while we stopping alerting if we don't wait for
// ongoing operations to complete. Resulting in once the alert service starts again that more than
// expected alert action entries are processed.
//
// Note: new operations will fail now because the state has been set to: stopping
while (alertLock.hasLockedKeys()) {
try {
Thread.sleep(100);
} catch (InterruptedException ie) {
}
}
}
}
public static class Lock {
private final String name;
private final KeyedLock<String> alertLock;
private Lock(String name, KeyedLock<String> alertLock) {
this.name = name;
this.alertLock = alertLock;
}
public void release() {
alertLock.release(name);
}
}
}

View File

@ -42,6 +42,7 @@ public class AlertsModule extends AbstractModule implements SpawnModules {
protected void configure() {
bind(Alert.Parser.class).asEagerSingleton();
bind(AlertLockService.class).asEagerSingleton();
bind(AlertsService.class).asEagerSingleton();
bind(AlertsStore.class).asEagerSingleton();
bind(TemplateUtils.class).asEagerSingleton();

View File

@ -6,40 +6,24 @@
package org.elasticsearch.alerts;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchIllegalStateException;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.alerts.actions.Action;
import org.elasticsearch.alerts.actions.ActionRegistry;
import org.elasticsearch.alerts.history.FiredAlert;
import org.elasticsearch.alerts.history.HistoryService;
import org.elasticsearch.alerts.scheduler.Scheduler;
import org.elasticsearch.alerts.throttle.Throttler;
import org.elasticsearch.alerts.transform.Transform;
import org.elasticsearch.alerts.trigger.Trigger;
import org.elasticsearch.alerts.trigger.TriggerRegistry;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.component.LifecycleListener;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.joda.time.DateTime;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.KeyedLock;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.gateway.GatewayService;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.threadpool.ThreadPool;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicReference;
@ -47,27 +31,25 @@ public class AlertsService extends AbstractComponent {
private final Scheduler scheduler;
private final AlertsStore alertsStore;
private final HistoryService historyService;
private final ThreadPool threadPool;
private final ClusterService clusterService;
private final KeyedLock<String> alertLock = new KeyedLock<>();
private final AlertLockService alertLockService;
private final HistoryService historyService;
private final AtomicReference<State> state = new AtomicReference<>(State.STOPPED);
private volatile boolean manuallyStopped;
@Inject
public AlertsService(Settings settings, ClusterService clusterService, Scheduler scheduler, AlertsStore alertsStore,
IndicesService indicesService, HistoryService historyService,
ThreadPool threadPool) {
IndicesService indicesService, HistoryService historyService, ThreadPool threadPool,
AlertLockService alertLockService) {
super(settings);
this.scheduler = scheduler;
this.threadPool = threadPool;
this.alertsStore = alertsStore;
this.historyService = historyService;
this.historyService.setAlertsService(this);
this.clusterService = clusterService;
scheduler.addListener(new SchedulerListener());
this.alertLockService = alertLockService;
this.historyService = historyService;
clusterService.add(new AlertsClusterStateListener());
// Close if the indices service is being stopped, so we don't run into search failures (locally) that will
@ -83,7 +65,7 @@ public class AlertsService extends AbstractComponent {
public AlertsStore.AlertDelete deleteAlert(String name) throws InterruptedException, ExecutionException {
ensureStarted();
alertLock.acquire(name);
AlertLockService.Lock lock = alertLockService.acquire(name);
try {
AlertsStore.AlertDelete delete = alertsStore.deleteAlert(name);
if (delete.deleteResponse().isFound()) {
@ -91,137 +73,47 @@ public class AlertsService extends AbstractComponent {
}
return delete;
} finally {
alertLock.release(name);
lock.release();
}
}
public IndexResponse putAlert(String alertName, BytesReference alertSource) {
public IndexResponse putAlert(String name, BytesReference alertSource) {
ensureStarted();
alertLock.acquire(alertName);
AlertLockService.Lock lock = alertLockService.acquire(name);
try {
AlertsStore.AlertPut result = alertsStore.putAlert(alertName, alertSource);
AlertsStore.AlertPut result = alertsStore.putAlert(name, alertSource);
if (result.previous() == null || !result.previous().schedule().equals(result.current().schedule())) {
scheduler.schedule(result.current());
}
return result.indexResponse();
} finally {
alertLock.release(alertName);
lock.release();
}
}
/**
* TODO: add version, fields, etc support that the core get api has as well.
*/
public Alert getAlert(String alertName) {
return alertsStore.getAlert(alertName);
public Alert getAlert(String name) {
return alertsStore.getAlert(name);
}
public State getState() {
public State state() {
return state.get();
}
/*
The execution of an alert is split into operations, a schedule part which just makes sure that store the fact an alert
has fired and an execute part which actually executed the alert.
The reason this is split into two operations is that we don't want to lose the fact an alert has fired. If we
would not split the execution of an alert and many alerts fire in a small window of time then it can happen that
thread pool that receives fired jobs from the quartz scheduler is going to reject jobs and then we would never
know about jobs that have fired. By splitting the execution of fired jobs into two operations we lower the chance
we lose fired jobs signficantly.
*/
/**
* This does the necessary actions, so we don't lose the fact that an alert got execute from the {@link org.elasticsearch.alerts.scheduler.Scheduler}
* It writes the an entry in the alert history index with the proper status for this alert.
*
* The rest of the actions happen in {@link #runAlert(org.elasticsearch.alerts.history.FiredAlert)}.
*
* The reason the executing of the alert is split into two, is that we don't want to lose the fact that an alert has
* fired. If we were
*/
void triggerAlert(String alertName, DateTime scheduledFireTime, DateTime fireTime){
ensureStarted();
alertLock.acquire(alertName);
try {
Alert alert = alertsStore.getAlert(alertName);
if (alert == null) {
logger.warn("unable to find [{}] in the alert store, perhaps it has been deleted", alertName);
return;
}
try {
historyService.alertFired(alert, scheduledFireTime, fireTime);
} catch (Exception e) {
logger.error("failed to schedule alert action for [{}]", e, alert);
}
} finally {
alertLock.release(alertName);
}
}
/**
* This actually runs the alert:
* 1) Runs the configured search request
* 2) Checks if the search request triggered (matches with the defined conditions)
* 3) If the alert has been triggered, checks if the alert should be throttled
* 4) If the alert hasn't been throttled runs the configured actions
*/
public AlertRun runAlert(FiredAlert entry) throws IOException {
ensureStarted();
alertLock.acquire(entry.name());
try {
Alert alert = alertsStore.getAlert(entry.name());
if (alert == null) {
throw new ElasticsearchException("Alert is not available");
}
AlertContext ctx = new AlertContext(alert, entry.fireTime(), entry.scheduledTime());
Trigger.Result triggerResult = alert.trigger().execute(ctx);
ctx.triggerResult(triggerResult);
if (triggerResult.triggered()) {
alert.status().onTrigger(true, entry.fireTime());
Throttler.Result throttleResult = alert.throttler().throttle(ctx, triggerResult);
ctx.throttleResult(throttleResult);
if (!throttleResult.throttle()) {
Transform.Result result = alert.transform().apply(ctx, triggerResult.payload());
ctx.transformResult(result);
for (Action action : alert.actions()){
Action.Result actionResult = action.execute(ctx, result.payload());
ctx.addActionResult(actionResult);
}
alert.status().onExecution(entry.scheduledTime());
} else {
alert.status().onThrottle(entry.fireTime(), throttleResult.reason());
}
} else {
alert.status().onTrigger(false, entry.fireTime());
}
alert.status().onRun(entry.fireTime());
alertsStore.updateAlert(alert);
return new AlertRun(ctx);
} finally {
alertLock.release(entry.name());
}
}
/**
* Acks the alert if needed
*/
public Alert.Status ackAlert(String alertName) {
public Alert.Status ackAlert(String name) {
ensureStarted();
alertLock.acquire(alertName);
AlertLockService.Lock lock = alertLockService.acquire(name);
try {
Alert alert = alertsStore.getAlert(alertName);
Alert alert = alertsStore.getAlert(name);
if (alert == null) {
throw new AlertsException("alert [" + alertName + "] does not exist");
throw new AlertsException("alert [" + name + "] does not exist");
}
if (alert.status().onAck(new DateTime())) {
if (alert.ack()) {
try {
alertsStore.updateAlertStatus(alert);
} catch (IOException ioe) {
@ -231,7 +123,7 @@ public class AlertsService extends AbstractComponent {
// we need to create a safe copy of the status
return new Alert.Status(alert.status());
} finally {
alertLock.release(alertName);
lock.release();
}
}
@ -240,7 +132,6 @@ public class AlertsService extends AbstractComponent {
*/
public void start() {
manuallyStopped = false;
logger.info("starting alert service...");
ClusterState state = clusterService.state();
internalStart(state);
}
@ -256,20 +147,7 @@ public class AlertsService extends AbstractComponent {
private void internalStop() {
if (state.compareAndSet(State.STARTED, State.STOPPING)) {
logger.info("stopping alert service...");
while (true) {
// It can happen we have still ongoing operations and we wait those operations to finish to avoid
// that AlertManager or any of its components end up in a illegal state after the state as been set to stopped.
//
// For example: An alert action entry may be added while we stopping alerting if we don't wait for
// ongoing operations to complete. Resulting in once the alert service starts again that more than
// expected alert action entries are processed.
//
// Note: new operations will fail now because the state has been set to: stopping
if (!alertLock.hasLockedKeys()) {
break;
}
}
alertLockService.stop();
historyService.stop();
scheduler.stop();
alertsStore.stop();
@ -280,25 +158,19 @@ public class AlertsService extends AbstractComponent {
private void internalStart(ClusterState initialState) {
if (state.compareAndSet(State.STOPPED, State.STARTING)) {
logger.info("starting alert service...");
alertLockService.start();
ClusterState clusterState = initialState;
// Try to load alert store before the action service, b/c action depends on alert store
while (true) {
if (alertsStore.start(clusterState)) {
break;
}
while (!alertsStore.start(clusterState)) {
clusterState = newClusterState(clusterState);
}
while (true) {
if (historyService.start(clusterState)) {
break;
}
while (!historyService.start(clusterState)) {
clusterState = newClusterState(clusterState);
}
scheduler.start(alertsStore.getAlerts().values());
state.set(State.STARTED);
historyService.executePreviouslyFiredAlerts();
logger.info("alert service has started");
}
}
@ -326,13 +198,6 @@ public class AlertsService extends AbstractComponent {
}
}
private class SchedulerListener implements Scheduler.Listener {
@Override
public void fire(String alertName, DateTime scheduledFireTime, DateTime fireTime) {
triggerAlert(alertName, scheduledFireTime, fireTime);
}
}
private final class AlertsClusterStateListener implements ClusterStateListener {
@Override
@ -367,130 +232,6 @@ public class AlertsService extends AbstractComponent {
}
public static class AlertRun implements ToXContent {
private final Trigger.Result triggerResult;
private final Throttler.Result throttleResult;
private final Map<String, Action.Result> actionsResults;
private final Payload payload;
public AlertRun(AlertContext context) {
this(context.triggerResult(), context.throttleResult(), context.actionsResults(), context.payload());
}
private AlertRun(Trigger.Result triggerResult, Throttler.Result throttleResult, Map<String, Action.Result> actionsResults, Payload payload) {
this.triggerResult = triggerResult;
this.throttleResult = throttleResult;
this.actionsResults = actionsResults;
this.payload = payload;
}
public Trigger.Result triggerResult() {
return triggerResult;
}
public Throttler.Result throttleResult() {
return throttleResult;
}
public Map<String, Action.Result> actionsResults() {
return actionsResults;
}
public Payload payload() {
return payload;
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
if (triggerResult != null) {
builder.startObject(Parser.TRIGGER_RESULT.getPreferredName()).field(triggerResult.type(), triggerResult).endObject();
}
if (throttleResult != null) {
builder.field(Parser.THROTTLED.getPreferredName(), throttleResult.throttle());
if (throttleResult.reason() != null) {
builder.field(Parser.THROTTLE_REASON.getPreferredName(), throttleResult.reason());
}
}
builder.field(Parser.PAYLOAD.getPreferredName(), payload());
builder.startArray(Parser.ACTIONS_RESULTS.getPreferredName());
for (Map.Entry<String, Action.Result> actionResult : actionsResults.entrySet()) {
builder.startObject();
builder.field(actionResult.getKey(), actionResult.getValue());
builder.endObject();
}
builder.endArray();
builder.endObject();
return builder;
}
public static class Parser {
public static final ParseField TRIGGER_RESULT = new ParseField("trigger_result");
public static final ParseField PAYLOAD = new ParseField("payload");
public static final ParseField ACTIONS_RESULTS = new ParseField("actions_results");
public static final ParseField THROTTLED = new ParseField("throttled");
public static final ParseField THROTTLE_REASON = new ParseField("throttle_reason");
public static AlertRun parse(XContentParser parser, TriggerRegistry triggerRegistry, ActionRegistry actionRegistry) throws IOException {
boolean throttled = false;
String throttleReason = null;
Map<String, Action.Result> actionResults = new HashMap<>();
Trigger.Result triggerResult = null;
Payload payload = null;
String currentFieldName = null;
XContentParser.Token token;
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
currentFieldName = parser.currentName();
} else if (token.isValue()) {
if (THROTTLE_REASON.match(currentFieldName)) {
throttleReason = parser.text();
} else if (THROTTLED.match(currentFieldName)) {
throttled = parser.booleanValue();
} else {
throw new AlertsException("unable to parse alert run. unexpected field [" + currentFieldName + "]");
}
} else if (token == XContentParser.Token.START_OBJECT) {
if (TRIGGER_RESULT.match(currentFieldName)) {
triggerResult = triggerRegistry.parseResult(parser);
} else if (PAYLOAD.match(currentFieldName)) {
payload = new Payload.Simple(parser.map()); //TODO fixme
} else {
throw new AlertsException("unable to parse alert run. unexpected field [" + currentFieldName + "]");
}
} else if (token == XContentParser.Token.START_ARRAY) {
if (ACTIONS_RESULTS.match(currentFieldName)) {
actionResults = parseActionResults(parser, actionRegistry);
} else {
throw new AlertsException("unable to parse alert run. unexpected field [" + currentFieldName + "]");
}
} else {
throw new AlertsException("unable to parse alert run. unexpected token [" + token + "]");
}
}
Throttler.Result throttleResult = throttled ? Throttler.Result.throttle(throttleReason) : Throttler.NO_THROTTLE.throttle(null,null);
return new AlertRun(triggerResult, throttleResult, actionResults, payload );
}
private static Map<String, Action.Result> parseActionResults(XContentParser parser, ActionRegistry actionRegistry) throws IOException {
Map<String, Action.Result> actionResults = new HashMap<>();
String currentFieldName = null;
XContentParser.Token token;
while ((token = parser.nextToken()) != XContentParser.Token.END_ARRAY) {
Action.Result actionResult = actionRegistry.parseResult(parser);
actionResults.put(actionResult.type(), actionResult);
}
return actionResults;
}
}
}
/**
* Encapsulates the state of the alerts plugin.
*/

View File

@ -6,7 +6,6 @@
package org.elasticsearch.alerts;
import org.elasticsearch.alerts.actions.Action;
import org.elasticsearch.alerts.history.FiredAlert;
import org.elasticsearch.alerts.throttle.Throttler;
import org.elasticsearch.alerts.transform.Transform;
import org.elasticsearch.alerts.trigger.Trigger;
@ -18,28 +17,29 @@ import java.util.Map;
/**
*
*/
public class AlertContext {
public class ExecutionContext {
private final String runId;
private final String id;
private final Alert alert;
private final DateTime fireTime;
private final DateTime scheduledTime;
private Payload payload;
private Trigger.Result triggerResult;
private Throttler.Result throttleResult;
private Transform.Result transformResult;
private Map<String, Action.Result> actionsResults = new HashMap<>();
public AlertContext(Alert alert, DateTime fireTime, DateTime scheduledTime) {
this.runId = FiredAlert.firedAlertId(alert, scheduledTime);
private Payload payload;
public ExecutionContext(String id, Alert alert, DateTime fireTime, DateTime scheduledTime) {
this.id = id;
this.alert = alert;
this.fireTime = fireTime;
this.scheduledTime = scheduledTime;
}
public String runId() {
return runId;
public String id() {
return id;
}
public Alert alert() {
@ -58,24 +58,30 @@ public class AlertContext {
return payload;
}
public void triggerResult(Trigger.Result triggerResult) {
public void onTriggerResult(Trigger.Result triggerResult) {
this.triggerResult = triggerResult;
this.payload = triggerResult.payload();
alert.status().onTrigger(triggerResult.triggered(), fireTime);
}
public Trigger.Result triggerResult() {
return triggerResult;
}
public void throttleResult(Throttler.Result throttleResult) {
public void onThrottleResult(Throttler.Result throttleResult) {
this.throttleResult = throttleResult;
if (throttleResult.throttle()) {
alert.status().onThrottle(fireTime, throttleResult.reason());
} else {
alert.status().onExecution(fireTime);
}
}
public Throttler.Result throttleResult() {
return throttleResult;
}
public void transformResult(Transform.Result transformResult) {
public void onTransformResult(Transform.Result transformResult) {
this.transformResult = transformResult;
this.payload = transformResult.payload();
}
@ -84,7 +90,7 @@ public class AlertContext {
return transformResult;
}
public void addActionResult(Action.Result result) {
public void onActionResult(Action.Result result) {
actionsResults.put(result.type(), result);
}
@ -92,4 +98,9 @@ public class AlertContext {
return actionsResults;
}
public AlertExecution finish() {
alert.status().onExecute(fireTime);
return new AlertExecution(this);
}
}

View File

@ -5,7 +5,7 @@
*/
package org.elasticsearch.alerts.actions;
import org.elasticsearch.alerts.AlertContext;
import org.elasticsearch.alerts.ExecutionContext;
import org.elasticsearch.alerts.Payload;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.logging.ESLogger;
@ -36,7 +36,7 @@ public abstract class Action<R extends Action.Result> implements ToXContent {
/**
* Executes this action
*/
public abstract R execute(AlertContext context, Payload payload) throws IOException;
public abstract R execute(ExecutionContext context, Payload payload) throws IOException;
/**
@ -60,7 +60,9 @@ public abstract class Action<R extends Action.Result> implements ToXContent {
public static abstract class Result implements ToXContent {
public static final ParseField SUCCESS_FIELD = new ParseField("success");
protected final String type;
protected final boolean success;
@ -80,7 +82,7 @@ public abstract class Action<R extends Action.Result> implements ToXContent {
@Override
public final XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field("success", success);
builder.field(SUCCESS_FIELD.getPreferredName(), success);
xContentBody(builder, params);
return builder.endObject();
}

View File

@ -5,7 +5,7 @@
*/
package org.elasticsearch.alerts.actions.email;
import org.elasticsearch.alerts.AlertContext;
import org.elasticsearch.alerts.ExecutionContext;
import org.elasticsearch.alerts.Payload;
import org.elasticsearch.alerts.actions.Action;
import org.elasticsearch.alerts.actions.email.service.*;
@ -64,8 +64,8 @@ public class EmailAction extends Action<EmailAction.Result> {
}
@Override
public Result execute(AlertContext ctx, Payload payload) throws IOException {
email.id(ctx.runId());
public Result execute(ExecutionContext ctx, Payload payload) throws IOException {
email.id(ctx.id());
Map<String, Object> alertParams = new HashMap<>();
alertParams.put(Action.ALERT_NAME_VARIABLE_NAME, ctx.alert().name());
@ -213,7 +213,7 @@ public class EmailAction extends Action<EmailAction.Result> {
String currentFieldName = null;
XContentParser.Token token;
boolean success = false;
Boolean success = null;
Email email = null;
String account = null;
String reason = null;
@ -247,11 +247,11 @@ public class EmailAction extends Action<EmailAction.Result> {
}
}
if (success) {
return new Result.Success(new EmailService.EmailSent(account, email));
} else {
return new Result.Failure(reason);
if (success == null) {
throw new EmailException("could not parse email result. expected field [success]");
}
return success ? new Result.Success(new EmailService.EmailSent(account, email)) : new Result.Failure(reason);
}
}

View File

@ -8,7 +8,7 @@ package org.elasticsearch.alerts.actions.index;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.alerts.AlertContext;
import org.elasticsearch.alerts.ExecutionContext;
import org.elasticsearch.alerts.Payload;
import org.elasticsearch.alerts.actions.Action;
import org.elasticsearch.alerts.actions.ActionException;
@ -50,7 +50,7 @@ public class IndexAction extends Action<IndexAction.Result> {
}
@Override
public Result execute(AlertContext ctx, Payload payload) throws IOException {
public Result execute(ExecutionContext ctx, Payload payload) throws IOException {
IndexRequest indexRequest = new IndexRequest();
indexRequest.index(index);
indexRequest.type(type);
@ -58,7 +58,7 @@ public class IndexAction extends Action<IndexAction.Result> {
XContentBuilder resultBuilder = XContentFactory.jsonBuilder().prettyPrint();
resultBuilder.startObject();
resultBuilder.field("data", payload.data());
resultBuilder.field("timestamp", ctx.alert().status().lastExecuted());
resultBuilder.field("timestamp", ctx.fireTime());
resultBuilder.endObject();
indexRequest.source(resultBuilder);
} catch (IOException ioe) {
@ -149,7 +149,7 @@ public class IndexAction extends Action<IndexAction.Result> {
public Action.Result parseResult(XContentParser parser) throws IOException {
String currentFieldName = null;
XContentParser.Token token;
boolean success = false;
Boolean success = null;
Payload payload = null;
String reason = null;
@ -178,6 +178,11 @@ public class IndexAction extends Action<IndexAction.Result> {
throw new ActionException("could not parse index result. unexpected token [" + token + "]");
}
}
if (success == null) {
throw new ActionException("could not parse index result. expected boolean field [success]");
}
return new Result(payload, reason, success);
}
}

View File

@ -5,7 +5,7 @@
*/
package org.elasticsearch.alerts.actions.webhook;
import org.elasticsearch.alerts.AlertContext;
import org.elasticsearch.alerts.ExecutionContext;
import org.elasticsearch.alerts.Payload;
import org.elasticsearch.alerts.actions.Action;
import org.elasticsearch.alerts.actions.ActionException;
@ -59,7 +59,7 @@ public class WebhookAction extends Action<WebhookAction.Result> {
}
@Override
public Result execute(AlertContext ctx, Payload payload) throws IOException {
public Result execute(ExecutionContext ctx, Payload payload) throws IOException {
Map<String, Object> data = payload.data();
String renderedUrl = applyTemplate(templateUtils, urlTemplate, ctx.alert().name(), data);
String body = applyTemplate(templateUtils, bodyTemplate != null ? bodyTemplate : DEFAULT_BODY_TEMPLATE, ctx.alert().name(), data);
@ -225,7 +225,7 @@ public class WebhookAction extends Action<WebhookAction.Result> {
public Action.Result parseResult(XContentParser parser) throws IOException {
String currentFieldName = null;
XContentParser.Token token;
boolean success = false;
Boolean success = null;
String url = null;
String body = null;
String reason = null;
@ -246,7 +246,7 @@ public class WebhookAction extends Action<WebhookAction.Result> {
if (Action.Result.SUCCESS_FIELD.match(currentFieldName)) {
success = parser.booleanValue();
} else {
throw new ActionException("could not parse index result. unexpected boolean field [" + currentFieldName + "]");
throw new ActionException("could not parse webhook result. unexpected boolean field [" + currentFieldName + "]");
}
} else {
throw new ActionException("unable to parse webhook action result. unexpected field [" + currentFieldName + "]" );
@ -256,12 +256,11 @@ public class WebhookAction extends Action<WebhookAction.Result> {
}
}
if (success) {
return new Result.Executed(httpStatus, url, body);
} else {
return new Result.Failure(reason);
if (success == null) {
throw new ActionException("could not parse webhook result. expected boolean field [success]");
}
return success ? new Result.Executed(httpStatus, url, body) : new Result.Failure(reason);
}
}

View File

@ -8,14 +8,12 @@ package org.elasticsearch.alerts.history;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchIllegalArgumentException;
import org.elasticsearch.alerts.Alert;
import org.elasticsearch.alerts.AlertExecution;
import org.elasticsearch.alerts.AlertsException;
import org.elasticsearch.alerts.AlertsService;
import org.elasticsearch.alerts.actions.ActionRegistry;
import org.elasticsearch.alerts.actions.Actions;
import org.elasticsearch.alerts.transform.Transform;
import org.elasticsearch.alerts.transform.TransformRegistry;
import org.elasticsearch.alerts.trigger.Trigger;
import org.elasticsearch.alerts.trigger.TriggerRegistry;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.component.AbstractComponent;
@ -38,33 +36,27 @@ public class FiredAlert implements ToXContent {
private DateTime fireTime;
private DateTime scheduledTime;
private Trigger trigger;
private Actions actions;
private State state;
private AlertExecution execution;
/*Optional*/
private Transform transform;
private String errorMessage;
private Map<String,Object> metadata;
private @Nullable String message;
private @Nullable Map<String,Object> metadata;
// During an fired alert execution we use this and then we store it with the history, after that we don't use it.
// We store it because it may end up being useful for debug / history purposes
private transient AlertsService.AlertRun alertRun;
// Used for assertion purposes, so we can ensure/test what we have loaded in memory is the same as what is persisted.
private transient long version;
private final AtomicBoolean finalized = new AtomicBoolean(false);
private final AtomicBoolean sealed = new AtomicBoolean(false);
FiredAlert() {
}
public FiredAlert(Alert alert, DateTime scheduledTime, DateTime fireTime, State state) {
this.id = firedAlertId(alert, scheduledTime);
public FiredAlert(Alert alert, DateTime scheduledTime, DateTime fireTime) {
this.id = alert.name() + "#" + scheduledTime.toDateTimeISO();
this.name = alert.name();
this.fireTime = fireTime;
this.scheduledTime = scheduledTime;
this.trigger = alert.trigger();
this.actions = alert.actions();
this.state = state;
this.state = State.AWAITS_EXECUTION;
this.metadata = alert.metadata();
this.version = 1;
}
@ -73,107 +65,59 @@ public class FiredAlert implements ToXContent {
return id;
}
public void id(String id) {
this.id = id;
}
public void finalize(Alert alert, AlertsService.AlertRun alertRun) {
assert finalized.compareAndSet(false, true) : "finalizing an fired alert should only be done once";
this.alertRun = alertRun;
if (alertRun.triggerResult().triggered()) {
if (alertRun.throttleResult().throttle()) {
state = State.THROTTLED;
} else {
state = State.ACTION_PERFORMED;
}
transform = alert.transform();
} else {
state = State.NO_ACTION_NEEDED;
}
}
public static String firedAlertId(Alert alert, DateTime dateTime) {
return alert.name() + "#" + dateTime.toDateTimeISO();
}
public DateTime scheduledTime() {
return scheduledTime;
}
public void scheduledTime(DateTime scheduledTime) {
this.scheduledTime = scheduledTime;
}
public String name() {
return name;
}
public void name(String name) {
this.name = name;
}
public DateTime fireTime() {
return fireTime;
}
public void fireTime(DateTime fireTime) {
this.fireTime = fireTime;
}
public Trigger trigger() {
return trigger;
}
public void trigger(Trigger trigger) {
this.trigger = trigger;
}
public Actions actions() {
return actions;
}
public void actions(Actions actions) {
this.actions = actions;
}
public State state() {
return state;
}
public void state(State state) {
this.state = state;
}
public long version() {
return version;
}
public void version(long version) {
this.version = version;
}
public String errorMessage(){
return this.errorMessage;
}
public void errorMessage(String errorMessage) {
this.errorMessage = errorMessage;
public String message(){
return this.message;
}
public Map<String, Object> metadata() {
return metadata;
}
public void metadata(Map<String, Object> metadata) {
this.metadata = metadata;
public long version() {
return version;
}
public Transform transform() {
return transform;
void version(long version) {
this.version = version;
}
public void transform(Transform transform) {
this.transform = transform;
public void update(State state, @Nullable String message) {
this.state = state;
this.message = message;
}
public void update(AlertExecution execution) {
assert sealed.compareAndSet(false, true) : "sealing an fired alert should only be done once";
this.execution = execution;
if (execution.triggerResult().triggered()) {
if (execution.throttleResult().throttle()) {
state = State.THROTTLED;
} else {
state = State.ACTION_PERFORMED;
}
} else {
state = State.NO_ACTION_NEEDED;
}
}
@Override
@ -182,22 +126,18 @@ public class FiredAlert implements ToXContent {
historyEntry.field(Parser.ALERT_NAME_FIELD.getPreferredName(), name);
historyEntry.field(Parser.FIRE_TIME_FIELD.getPreferredName(), fireTime.toDateTimeISO());
historyEntry.field(Parser.SCHEDULED_FIRE_TIME_FIELD.getPreferredName(), scheduledTime.toDateTimeISO());
historyEntry.startObject(Parser.TRIGGER_FIELD.getPreferredName()).field(trigger.type(), trigger, params).endObject();
historyEntry.field(Parser.ACTIONS_FIELD.getPreferredName(), actions, params);
historyEntry.startObject(Alert.Parser.TRIGGER_FIELD.getPreferredName()).field(trigger.type(), trigger, params).endObject();
historyEntry.field(Parser.STATE_FIELD.getPreferredName(), state.toString());
if (transform != null) {
historyEntry.startObject(Parser.TRANSFORM_FIELD.getPreferredName()).field(transform.type(), transform, params).endObject();
}
if (errorMessage != null) {
historyEntry.field(Parser.ERROR_MESSAGE_FIELD.getPreferredName(), errorMessage);
if (message != null) {
historyEntry.field(Parser.MESSAGE_FIELD.getPreferredName(), message);
}
if (metadata != null) {
historyEntry.field(Parser.METADATA_FIELD.getPreferredName(), metadata);
}
if (alertRun != null) {
historyEntry.field(Parser.ALERT_RUN_FIELD.getPreferredName(), alertRun);
if (execution != null) {
historyEntry.field(Parser.ALERT_EXECUTION_FIELD.getPreferredName(), execution);
}
historyEntry.endObject();
@ -227,7 +167,7 @@ public class FiredAlert implements ToXContent {
public enum State {
AWAITS_RUN,
AWAITS_EXECUTION,
RUNNING,
NO_ACTION_NEEDED,
ACTION_PERFORMED,
@ -235,10 +175,10 @@ public class FiredAlert implements ToXContent {
THROTTLED;
@Override
public String toString(){
public String toString() {
switch (this) {
case AWAITS_RUN:
return "AWAITS_RUN";
case AWAITS_EXECUTION:
return "AWAITS_EXECUTION";
case RUNNING:
return "RUNNING";
case NO_ACTION_NEEDED:
@ -254,10 +194,10 @@ public class FiredAlert implements ToXContent {
}
}
public static State fromString(String s) {
switch(s.toUpperCase()) {
case "AWAITS_RUN":
return AWAITS_RUN;
public static State fromString(String value) {
switch(value.toUpperCase()) {
case "AWAITS_EXECUTION":
return AWAITS_EXECUTION;
case "RUNNING":
return RUNNING;
case "NO_ACTION_NEEDED":
@ -269,7 +209,7 @@ public class FiredAlert implements ToXContent {
case "THROTTLED":
return THROTTLED;
default:
throw new ElasticsearchIllegalArgumentException("Unknown value [" + s + "] for AlertHistoryState" );
throw new ElasticsearchIllegalArgumentException("unknown fired alert state [" + value + "]");
}
}
@ -280,23 +220,18 @@ public class FiredAlert implements ToXContent {
public static final ParseField ALERT_NAME_FIELD = new ParseField("alert_name");
public static final ParseField FIRE_TIME_FIELD = new ParseField("fire_time");
public static final ParseField SCHEDULED_FIRE_TIME_FIELD = new ParseField("scheduled_fire_time");
public static final ParseField ERROR_MESSAGE_FIELD = new ParseField("error_msg");
public static final ParseField TRIGGER_FIELD = new ParseField("trigger");
public static final ParseField TRANSFORM_FIELD = new ParseField("transform");
public static final ParseField ACTIONS_FIELD = new ParseField("actions");
public static final ParseField MESSAGE_FIELD = new ParseField("message");
public static final ParseField STATE_FIELD = new ParseField("state");
public static final ParseField METADATA_FIELD = new ParseField("meta");
public static final ParseField ALERT_RUN_FIELD = new ParseField("alert_run");
public static final ParseField ALERT_EXECUTION_FIELD = new ParseField("alert_execution");
private final TriggerRegistry triggerRegistry;
private final TransformRegistry transformRegistry;
private final ActionRegistry actionRegistry;
@Inject
public Parser(Settings settings, TriggerRegistry triggerRegistry, TransformRegistry transformRegistry, ActionRegistry actionRegistry) {
public Parser(Settings settings, TriggerRegistry triggerRegistry, ActionRegistry actionRegistry) {
super(settings);
this.triggerRegistry = triggerRegistry;
this.transformRegistry = transformRegistry;
this.actionRegistry = actionRegistry;
}
@ -309,44 +244,37 @@ public class FiredAlert implements ToXContent {
}
public FiredAlert parse(XContentParser parser, String id, long version) throws IOException {
FiredAlert entry = new FiredAlert();
entry.id(id);
entry.version(version);
FiredAlert alert = new FiredAlert();
alert.id = id;
alert.version = version;
String currentFieldName = null;
XContentParser.Token token = parser.nextToken();
assert token == XContentParser.Token.START_OBJECT;
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
currentFieldName = parser.currentName();
} else if (token == XContentParser.Token.START_ARRAY) {
if (ACTIONS_FIELD.match(currentFieldName)) {
entry.actions(actionRegistry.parseActions(parser));
} else {
throw new AlertsException("unable to parse fired alert. unexpected field [" + currentFieldName + "]");
}
} else if (token == XContentParser.Token.START_OBJECT) {
if (TRIGGER_FIELD.match(currentFieldName)) {
entry.trigger(triggerRegistry.parse(parser));
} else if (TRANSFORM_FIELD.match(currentFieldName)) {
entry.transform(transformRegistry.parse(parser));
if (Alert.Parser.TRIGGER_FIELD.match(currentFieldName)) {
alert.trigger = triggerRegistry.parse(parser);
} else if (METADATA_FIELD.match(currentFieldName)) {
entry.metadata(parser.map());
} else if (ALERT_RUN_FIELD.match(currentFieldName)) {
entry.alertRun = AlertsService.AlertRun.Parser.parse(parser, triggerRegistry, actionRegistry);
alert.metadata = parser.map();
} else if (ALERT_EXECUTION_FIELD.match(currentFieldName)) {
alert.execution = AlertExecution.Parser.parse(parser, triggerRegistry, actionRegistry);
} else {
throw new AlertsException("unable to parse fired alert. unexpected field [" + currentFieldName + "]");
}
} else if (token.isValue()) {
if (ALERT_NAME_FIELD.match(currentFieldName)) {
entry.name(parser.text());
alert.name = parser.text();
} else if (FIRE_TIME_FIELD.match(currentFieldName)) {
entry.fireTime(DateTime.parse(parser.text()));
alert.fireTime = DateTime.parse(parser.text());
} else if (SCHEDULED_FIRE_TIME_FIELD.match(currentFieldName)) {
entry.scheduledTime(DateTime.parse(parser.text()));
} else if (ERROR_MESSAGE_FIELD.match(currentFieldName)) {
entry.errorMessage(parser.textOrNull());
alert.scheduledTime = DateTime.parse(parser.text());
} else if (MESSAGE_FIELD.match(currentFieldName)) {
alert.message = parser.textOrNull();
} else if (STATE_FIELD.match(currentFieldName)) {
entry.state(State.fromString(parser.text()));
alert.state = State.fromString(parser.text());
} else {
throw new AlertsException("unable to parse fired alert. unexpected field [" + currentFieldName + "]");
}
@ -355,7 +283,7 @@ public class FiredAlert implements ToXContent {
}
}
return entry;
return alert;
}
}
}

View File

@ -7,6 +7,11 @@ package org.elasticsearch.alerts.history;
import org.elasticsearch.ElasticsearchIllegalStateException;
import org.elasticsearch.alerts.*;
import org.elasticsearch.alerts.actions.Action;
import org.elasticsearch.alerts.scheduler.Scheduler;
import org.elasticsearch.alerts.throttle.Throttler;
import org.elasticsearch.alerts.transform.Transform;
import org.elasticsearch.alerts.trigger.Trigger;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.common.collect.ImmutableList;
import org.elasticsearch.common.component.AbstractComponent;
@ -18,6 +23,7 @@ import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor;
import org.elasticsearch.threadpool.ThreadPool;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
@ -28,22 +34,22 @@ public class HistoryService extends AbstractComponent {
private final HistoryStore historyStore;
private final ThreadPool threadPool;
private final AlertsStore alertsStore;
private final AlertLockService alertLockService;
private final AtomicBoolean started = new AtomicBoolean(false);
private AlertsService alertsService;
// Holds fired alerts that were fired before on a different elected master node, but never had the chance to run.
private volatile ImmutableList<FiredAlert> previousFiredAlerts;
private volatile ImmutableList<FiredAlert> previousFiredAlerts = ImmutableList.of();
@Inject
public HistoryService(Settings settings, HistoryStore historyStore, ThreadPool threadPool, AlertsStore alertsStore) {
public HistoryService(Settings settings, HistoryStore historyStore, ThreadPool threadPool,
AlertsStore alertsStore, AlertLockService alertLockService, Scheduler scheduler) {
super(settings);
this.historyStore = historyStore;
this.threadPool = threadPool;
this.alertsStore = alertsStore;
}
public void setAlertsService(AlertsService alertsService){
this.alertsService = alertsService;
this.alertLockService = alertLockService;
scheduler.addListener(new SchedulerListener());
}
public boolean start(ClusterState state) {
@ -52,41 +58,41 @@ public class HistoryService extends AbstractComponent {
}
assert alertsThreadPool().getQueue().isEmpty() : "queue should be empty, but contains " + alertsThreadPool().getQueue().size() + " elements.";
HistoryStore.LoadResult loadResult = historyStore.loadFiredAlerts(state);
if (loadResult.succeeded()) {
if (!loadResult.notRanFiredAlerts().isEmpty()) {
this.previousFiredAlerts = ImmutableList.copyOf(loadResult.notRanFiredAlerts());
logger.debug("loaded [{}] actions from the alert history index into actions queue", previousFiredAlerts.size());
}
logger.debug("starting history service");
if (started.compareAndSet(false, true)) {
if (alertsThreadPool().isShutdown()) {
logger.info("Restarting thread pool that had been shutdown");
// this update threadpool settings work around is for restarting the alerts thread pool,
// that creates a new alerts thread pool and cleans up the existing one that has previously been shutdown.
int availableProcessors = EsExecutors.boundedNumberOfProcessors(settings);
/***
*TODO Horrible horrible hack to make sure that settings are always different from the previous settings
*
* THIS NEEDS TO CHANGE ASAP
*/
int queueSize = alertsThreadPool().getQueue().remainingCapacity();
if (queueSize % 2 == 0){
queueSize = queueSize + 1;
} else {
queueSize = queueSize - 1;
}
//TODO END HORRIBLE HACK
threadPool.updateSettings(AlertsPlugin.alertThreadPoolSettings(availableProcessors, queueSize));
assert !alertsThreadPool().isShutdown();
}
logger.debug("started history service");
}
return true;
} else {
HistoryStore.LoadResult loadResult = historyStore.loadFiredAlerts(state, FiredAlert.State.AWAITS_EXECUTION);
if (!loadResult.succeeded()) {
return false;
}
this.previousFiredAlerts = ImmutableList.copyOf(loadResult);
if (!previousFiredAlerts.isEmpty()) {
logger.debug("loaded [{}] actions from the alert history index into actions queue", previousFiredAlerts.size());
}
logger.debug("starting history service");
if (started.compareAndSet(false, true)) {
if (alertsThreadPool().isShutdown()) {
logger.info("Restarting thread pool that had been shutdown");
// this update thread pool settings work around is for restarting the alerts thread pool,
// that creates a new alerts thread pool and cleans up the existing one that has previously been shutdown.
int availableProcessors = EsExecutors.boundedNumberOfProcessors(settings);
/***
*TODO Horrible horrible hack to make sure that settings are always different from the previous settings
*
* THIS NEEDS TO CHANGE ASAP
*/
int queueSize = alertsThreadPool().getQueue().remainingCapacity();
if (queueSize % 2 == 0){
queueSize = queueSize + 1;
} else {
queueSize = queueSize - 1;
}
//TODO END HORRIBLE HACK
threadPool.updateSettings(AlertsPlugin.alertThreadPoolSettings(availableProcessors, queueSize));
assert !alertsThreadPool().isShutdown();
}
logger.debug("started history service");
}
executePreviouslyFiredAlerts();
return true;
}
public void stop() {
@ -104,31 +110,6 @@ public class HistoryService extends AbstractComponent {
return started.get();
}
// We can only process previosly fired alerts if the alert service has gone into a started state,
// so we let the alert service execute this method when it gets into that state.
// TODO: We maybe have a AlertServiceStateListener interface for component that are interrested in when the state
// of alerts changes then these components can register themselves.
public void executePreviouslyFiredAlerts() {
ImmutableList<FiredAlert> firedAlerts = this.previousFiredAlerts;
if (firedAlerts != null) {
this.previousFiredAlerts = null;
for (FiredAlert firedAlert : firedAlerts) {
innerExecute(firedAlert);
}
}
}
public void alertFired(Alert alert, DateTime scheduledFireTime, DateTime fireTime) throws HistoryException {
if (!started.get()) {
throw new ElasticsearchIllegalStateException("not started");
}
FiredAlert firedAlert = new FiredAlert(alert, scheduledFireTime, fireTime, FiredAlert.State.AWAITS_RUN);
logger.debug("adding fired alert [{}]", alert.name());
historyStore.put(firedAlert);
innerExecute(firedAlert);
}
// TODO: should be removed from the stats api? This is already visible in the thread pool cat api.
public long getQueueSize() {
return alertsThreadPool().getQueue().size();
@ -139,63 +120,140 @@ public class HistoryService extends AbstractComponent {
return alertsThreadPool().getLargestPoolSize();
}
private void innerExecute(FiredAlert firedAlert) {
void execute(Alert alert, DateTime scheduledFireTime, DateTime fireTime) throws HistoryException {
if (!started.get()) {
throw new ElasticsearchIllegalStateException("not started");
}
FiredAlert firedAlert = new FiredAlert(alert, scheduledFireTime, fireTime);
logger.debug("adding fired alert [{}]", alert.name());
historyStore.put(firedAlert);
execute(firedAlert);
}
void execute(FiredAlert firedAlert) {
try {
if (alertsThreadPool().isShutdown()) {
throw new AlertsException("attempting to add to a shutdown thread pool");
}
alertsThreadPool().execute(new AlertHistoryRunnable(firedAlert));
alertsThreadPool().execute(new AlertExecutionTask(firedAlert));
} catch (EsRejectedExecutionException e) {
logger.debug("[{}] failed to execute fired alert", firedAlert.name());
firedAlert.state(FiredAlert.State.FAILED);
firedAlert.errorMessage("failed to run fired alert due to thread pool capacity");
firedAlert.update(FiredAlert.State.FAILED, "failed to run fired alert due to thread pool capacity");
historyStore.update(firedAlert);
}
}
void executePreviouslyFiredAlerts() {
ImmutableList<FiredAlert> firedAlerts = this.previousFiredAlerts;
if (firedAlerts != null) {
this.previousFiredAlerts = ImmutableList.of();
for (FiredAlert firedAlert : firedAlerts) {
execute(firedAlert);
}
}
}
private EsThreadPoolExecutor alertsThreadPool() {
return (EsThreadPoolExecutor) threadPool.executor(AlertsPlugin.NAME);
}
private final class AlertHistoryRunnable implements Runnable {
private final class AlertExecutionTask implements Runnable {
private final FiredAlert alert;
private final FiredAlert firedAlert;
private AlertHistoryRunnable(FiredAlert alert) {
this.alert = alert;
private AlertExecutionTask(FiredAlert firedAlert) {
this.firedAlert = firedAlert;
}
@Override
public void run() {
try {
Alert alert = alertsStore.getAlert(this.alert.name());
Alert alert = alertsStore.getAlert(firedAlert.name());
if (alert == null) {
this.alert.errorMessage("alert was not found in the alerts store");
this.alert.state(FiredAlert.State.FAILED);
historyStore.update(this.alert);
return;
firedAlert.update(FiredAlert.State.FAILED, "alert was not found in the alerts store");
} else {
this.firedAlert.update(FiredAlert.State.RUNNING, null);
logger.debug("executing alert [{}]", this.firedAlert.name());
AlertExecution alertExecution = execute(alert, this.firedAlert);
this.firedAlert.update(alertExecution);
}
this.alert.state(FiredAlert.State.RUNNING);
historyStore.update(this.alert);
logger.debug("running an alert [{}]", this.alert.name());
AlertsService.AlertRun alertRun = alertsService.runAlert(this.alert);
this.alert.finalize(alert, alertRun);
historyStore.update(this.alert);
historyStore.update(this.firedAlert);
} catch (Exception e) {
if (started()) {
logger.warn("failed to run alert [{}]", e, alert.name());
logger.warn("failed to run alert [{}]", e, firedAlert.name());
try {
alert.errorMessage(e.getMessage());
alert.state(FiredAlert.State.FAILED);
historyStore.update(alert);
firedAlert.update(FiredAlert.State.FAILED, e.getMessage());
historyStore.update(firedAlert);
} catch (Exception e2) {
logger.error("failed to update fired alert [{}] with the error message", e2, alert);
logger.error("failed to update fired alert [{}] with the error message", e2, firedAlert);
}
} else {
logger.debug("failed to execute fired alert [{}] after shutdown", e, alert);
logger.debug("failed to execute fired alert [{}] after shutdown", e, firedAlert);
}
}
}
/*
The execution of an alert is split into operations, a schedule part which just makes sure that store the fact an alert
has fired and an execute part which actually executed the alert.
The reason this is split into two operations is that we don't want to lose the fact an alert has fired. If we
would not split the execution of an alert and many alerts fire in a small window of time then it can happen that
thread pool that receives fired jobs from the quartz scheduler is going to reject jobs and then we would never
know about jobs that have fired. By splitting the execution of fired jobs into two operations we lower the chance
we lose fired jobs signficantly.
*/
AlertExecution execute(Alert alert, FiredAlert firedAlert) throws IOException {
if (!started.get()) {
throw new ElasticsearchIllegalStateException("not started");
}
AlertLockService.Lock lock = alertLockService.acquire(alert.name());
try {
ExecutionContext ctx = new ExecutionContext(firedAlert.id(), alert, firedAlert.fireTime(), firedAlert.scheduledTime());
Trigger.Result triggerResult = alert.trigger().execute(ctx);
ctx.onTriggerResult(triggerResult);
if (triggerResult.triggered()) {
Throttler.Result throttleResult = alert.throttler().throttle(ctx, triggerResult);
ctx.onThrottleResult(throttleResult);
if (!throttleResult.throttle()) {
Transform.Result result = alert.transform().apply(ctx, triggerResult.payload());
ctx.onTransformResult(result);
for (Action action : alert.actions()) {
Action.Result actionResult = action.execute(ctx, result.payload());
ctx.onActionResult(actionResult);
}
}
}
return ctx.finish();
} finally {
lock.release();
}
}
}
private class SchedulerListener implements Scheduler.Listener {
@Override
public void fire(String name, DateTime scheduledFireTime, DateTime fireTime) {
if (!started.get()) {
throw new ElasticsearchIllegalStateException("not started");
}
Alert alert = alertsStore.getAlert(name);
if (alert == null) {
logger.warn("unable to find [{}] in the alert store, perhaps it has been deleted", name);
return;
}
try {
execute(alert, scheduledFireTime, fireTime);
} catch (Exception e) {
logger.error("failed to fire alert [{}]", e, alert);
}
}
}
}

View File

@ -30,6 +30,7 @@ import org.elasticsearch.search.SearchHit;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
/**
@ -37,7 +38,7 @@ import java.util.List;
public class HistoryStore extends AbstractComponent {
public static final String ALERT_HISTORY_INDEX_PREFIX = ".alert_history_";
public static final String ALERT_HISTORY_TYPE = "alerthistory";
public static final String ALERT_HISTORY_TYPE = "fired_alert";
static final DateTimeFormatter alertHistoryIndexTimeFormat = DateTimeFormat.forPattern("YYYY-MM-dd");
@ -84,10 +85,10 @@ public class HistoryStore extends AbstractComponent {
}
}
public LoadResult loadFiredAlerts(ClusterState state) {
public LoadResult loadFiredAlerts(ClusterState state, FiredAlert.State firedAlertState) {
String[] indices = state.metaData().concreteIndices(IndicesOptions.lenientExpandOpen(), ALERT_HISTORY_INDEX_PREFIX + "*");
if (indices.length == 0) {
logger.info("No previous .alerthistory index, skip loading of alert actions");
logger.info("No .alert_history indices found, skip loading of alert actions");
templateUtils.ensureIndexTemplateIsLoaded(state, "alerthistory");
return new LoadResult(true);
}
@ -110,7 +111,7 @@ public class HistoryStore extends AbstractComponent {
}
SearchResponse response = client.prepareSearch(ALERT_HISTORY_INDEX_PREFIX + "*")
.setQuery(QueryBuilders.termQuery(FiredAlert.Parser.STATE_FIELD.getPreferredName(), FiredAlert.State.AWAITS_RUN.toString()))
.setQuery(QueryBuilders.termQuery(FiredAlert.Parser.STATE_FIELD.getPreferredName(), firedAlertState.toString()))
.setSearchType(SearchType.SCAN)
.setScroll(scrollTimeout)
.setSize(scrollSize)
@ -130,7 +131,7 @@ public class HistoryStore extends AbstractComponent {
for (SearchHit sh : response.getHits()) {
String historyId = sh.getId();
FiredAlert historyEntry = alertRecordParser.parse(sh.getSourceRef(), historyId, sh.version());
assert historyEntry.state() == FiredAlert.State.AWAITS_RUN;
assert historyEntry.state() == FiredAlert.State.AWAITS_EXECUTION;
logger.debug("loaded fired alert from index [{}/{}/{}]", sh.index(), sh.type(), sh.id());
alerts.add(historyEntry);
}
@ -151,27 +152,28 @@ public class HistoryStore extends AbstractComponent {
return ALERT_HISTORY_INDEX_PREFIX + alertHistoryIndexTimeFormat.print(time);
}
public class LoadResult {
public class LoadResult implements Iterable<FiredAlert> {
private final boolean succeeded;
private final List<FiredAlert> notRanFiredAlerts;
private final List<FiredAlert> alerts;
public LoadResult(boolean succeeded, List<FiredAlert> notRanFiredAlerts) {
public LoadResult(boolean succeeded, List<FiredAlert> alerts) {
this.succeeded = succeeded;
this.notRanFiredAlerts = notRanFiredAlerts;
this.alerts = alerts;
}
public LoadResult(boolean succeeded) {
this.succeeded = succeeded;
this.notRanFiredAlerts = Collections.emptyList();
this.alerts = Collections.emptyList();
}
@Override
public Iterator<FiredAlert> iterator() {
return alerts.iterator();
}
public boolean succeeded() {
return succeeded;
}
public List<FiredAlert> notRanFiredAlerts() {
return notRanFiredAlerts;
}
}
}

View File

@ -5,7 +5,7 @@
*/
package org.elasticsearch.alerts.throttle;
import org.elasticsearch.alerts.AlertContext;
import org.elasticsearch.alerts.ExecutionContext;
import org.elasticsearch.alerts.trigger.Trigger;
import static org.elasticsearch.alerts.support.AlertsDateUtils.formatDate;
@ -16,9 +16,9 @@ import static org.elasticsearch.alerts.support.AlertsDateUtils.formatDate;
public class AckThrottler implements Throttler {
@Override
public Result throttle(AlertContext ctx, Trigger.Result result) {
if (ctx.alert().status().acked()) {
return Result.throttle("alert [" + ctx.alert().name() + "] was acked at [" + formatDate(ctx.alert().status().ack().timestamp()) + "]");
public Result throttle(ExecutionContext ctx, Trigger.Result result) {
if (ctx.alert().acked()) {
return Result.throttle("alert [" + ctx.alert().name() + "] was acked at [" + formatDate(ctx.alert().status().ackStatus().timestamp()) + "]");
}
return Result.NO;
}

View File

@ -5,7 +5,7 @@
*/
package org.elasticsearch.alerts.throttle;
import org.elasticsearch.alerts.AlertContext;
import org.elasticsearch.alerts.ExecutionContext;
import org.elasticsearch.alerts.trigger.Trigger;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.unit.TimeValue;
@ -24,7 +24,7 @@ public class AlertThrottler implements Throttler {
}
@Override
public Result throttle(AlertContext ctx, Trigger.Result result) {
public Result throttle(ExecutionContext ctx, Trigger.Result result) {
if (periodThrottler != null) {
Result throttleResult = periodThrottler.throttle(ctx, result);
if (throttleResult.throttle()) {

View File

@ -6,7 +6,7 @@
package org.elasticsearch.alerts.throttle;
import org.elasticsearch.alerts.Alert;
import org.elasticsearch.alerts.AlertContext;
import org.elasticsearch.alerts.ExecutionContext;
import org.elasticsearch.alerts.trigger.Trigger;
import org.elasticsearch.common.joda.time.PeriodType;
import org.elasticsearch.common.unit.TimeValue;
@ -33,7 +33,7 @@ public class PeriodThrottler implements Throttler {
}
@Override
public Result throttle(AlertContext ctx, Trigger.Result result) {
public Result throttle(ExecutionContext ctx, Trigger.Result result) {
Alert.Status status = ctx.alert().status();
if (status.lastRan() != null) {
TimeValue timeElapsed = new TimeValue(System.currentTimeMillis() - status.lastExecuted().getMillis());

View File

@ -5,7 +5,7 @@
*/
package org.elasticsearch.alerts.throttle;
import org.elasticsearch.alerts.AlertContext;
import org.elasticsearch.alerts.ExecutionContext;
import org.elasticsearch.alerts.trigger.Trigger;
import org.elasticsearch.common.ParseField;
@ -16,18 +16,19 @@ public interface Throttler {
public static final Throttler NO_THROTTLE = new Throttler() {
@Override
public Result throttle(AlertContext ctx, Trigger.Result result) {
public Result throttle(ExecutionContext ctx, Trigger.Result result) {
return Result.NO;
}
};
Result throttle(AlertContext ctx, Trigger.Result result);
Result throttle(ExecutionContext ctx, Trigger.Result result);
static class Result {
public static ParseField THROTTLE_FIELD = new ParseField("throttle");
public static ParseField REASON_FIELD = new ParseField("reason");
static final Result NO = new Result(false, null);
public static final Result NO = new Result(false, null);
private final boolean throttle;
private final String reason;

View File

@ -7,7 +7,7 @@ package org.elasticsearch.alerts.transform;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.alerts.AlertContext;
import org.elasticsearch.alerts.ExecutionContext;
import org.elasticsearch.alerts.Payload;
import org.elasticsearch.alerts.support.AlertUtils;
import org.elasticsearch.alerts.support.init.proxy.ClientProxy;
@ -58,7 +58,7 @@ public class SearchTransform implements Transform {
}
@Override
public Transform.Result apply(AlertContext ctx, Payload payload) throws IOException {
public Transform.Result apply(ExecutionContext ctx, Payload payload) throws IOException {
SearchRequest req = createRequest(request, ctx.scheduledTime(), ctx.fireTime(), payload.data());
SearchResponse resp = client.search(req).actionGet();
return new Transform.Result(TYPE, new Payload.ActionResponse(resp));

View File

@ -5,7 +5,7 @@
*/
package org.elasticsearch.alerts.transform;
import org.elasticsearch.alerts.AlertContext;
import org.elasticsearch.alerts.ExecutionContext;
import org.elasticsearch.alerts.Payload;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
@ -25,7 +25,7 @@ public interface Transform extends ToXContent {
}
@Override
public Result apply(AlertContext context, Payload payload) throws IOException {
public Result apply(ExecutionContext context, Payload payload) throws IOException {
return new Result("noop", payload);
}
@ -37,7 +37,7 @@ public interface Transform extends ToXContent {
String type();
Result apply(AlertContext context, Payload payload) throws IOException;
Result apply(ExecutionContext context, Payload payload) throws IOException;
static class Result {

View File

@ -57,7 +57,7 @@ public class TransportAlertsStatsAction extends TransportMasterNodeOperationActi
@Override
protected void masterOperation(AlertsStatsRequest request, ClusterState state, ActionListener<AlertsStatsResponse> listener) throws ElasticsearchException {
AlertsStatsResponse statsResponse = new AlertsStatsResponse();
statsResponse.setAlertManagerState(alertsService.getState());
statsResponse.setAlertManagerState(alertsService.state());
statsResponse.setAlertActionManagerStarted(historyService.started());
statsResponse.setAlertActionManagerQueueSize(historyService.getQueueSize());
statsResponse.setNumberOfRegisteredAlerts(alertsService.getNumberOfAlerts());

View File

@ -5,7 +5,7 @@
*/
package org.elasticsearch.alerts.trigger;
import org.elasticsearch.alerts.AlertContext;
import org.elasticsearch.alerts.ExecutionContext;
import org.elasticsearch.alerts.Payload;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.logging.ESLogger;
@ -34,7 +34,7 @@ public abstract class Trigger<R extends Trigger.Result> implements ToXContent {
/**
* Executes this trigger
*/
public abstract R execute(AlertContext ctx) throws IOException;
public abstract R execute(ExecutionContext ctx) throws IOException;
/**

View File

@ -70,8 +70,7 @@ public class ScriptSearchTrigger extends SearchTrigger {
builder.field(ScriptService.SCRIPT_INLINE.getPreferredName(), script);
builder.field(Parser.SCRIPT_TYPE_FIELD.getPreferredName(), scriptType);
builder.field(ScriptService.SCRIPT_LANG.getPreferredName(), scriptLang);
builder.endObject();
return builder;
return builder.endObject();
}
public static class Parser extends AbstractComponent implements SearchTrigger.Parser<ScriptSearchTrigger> {

View File

@ -7,7 +7,7 @@ package org.elasticsearch.alerts.trigger.search;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.alerts.AlertContext;
import org.elasticsearch.alerts.ExecutionContext;
import org.elasticsearch.alerts.Payload;
import org.elasticsearch.alerts.support.AlertUtils;
import org.elasticsearch.alerts.support.init.proxy.ClientProxy;
@ -35,7 +35,7 @@ public abstract class SearchTrigger extends Trigger<SearchTrigger.Result> {
}
@Override
public Result execute(AlertContext ctx) throws IOException {
public Result execute(ExecutionContext ctx) throws IOException {
SearchRequest request = AlertUtils.createSearchRequestWithTimes(this.request, ctx.scheduledTime(), ctx.fireTime(), scriptService);
if (logger.isTraceEnabled()) {
logger.trace("running query for [{}]", ctx.alert().name(), XContentHelper.convertToJson(request.source(), false, true));

View File

@ -5,7 +5,7 @@
*/
package org.elasticsearch.alerts.trigger.simple;
import org.elasticsearch.alerts.AlertContext;
import org.elasticsearch.alerts.ExecutionContext;
import org.elasticsearch.alerts.Payload;
import org.elasticsearch.alerts.trigger.Trigger;
import org.elasticsearch.alerts.trigger.TriggerException;
@ -38,7 +38,7 @@ public class SimpleTrigger extends Trigger<SimpleTrigger.Result> {
}
@Override
public Result execute(AlertContext ctx) throws IOException {
public Result execute(ExecutionContext ctx) throws IOException {
return new Result(payload);
}

View File

@ -8,7 +8,7 @@
"index.mapper.dynamic" : false
},
"mappings": {
"alerthistory": {
"fired_alert": {
"dynamic" : "strict",
"properties": {
"alert_name": {
@ -28,7 +28,7 @@
"type": "string",
"index": "not_analyzed"
},
"error_msg": {
"message": {
"type": "string"
},
"trigger" : {
@ -41,7 +41,7 @@
"enabled" : false,
"dynamic" : true
},
"alert_run" : {
"alert_execution" : {
"type" : "object",
"enabled" : false,
"dynamic" : true

View File

@ -126,7 +126,9 @@ public abstract class AbstractAlertingTests extends ElasticsearchIntegrationTest
}
builder.startObject();
builder.startObject("schedule").field("cron", cron).endObject();
builder.startObject("schedule")
.field("cron", cron)
.endObject();
if (metadata != null) {
builder.field("meta", metadata);
@ -276,7 +278,7 @@ public abstract class AbstractAlertingTests extends ElasticsearchIntegrationTest
.get();
assertThat(searchResponse.getHits().getTotalHits(), greaterThanOrEqualTo(minimumExpectedAlertActionsWithActionPerformed));
if (assertTriggerSearchMatched) {
assertThat((Integer) XContentMapValues.extractValue("alert_run.trigger_result.script.payload.hits.total", searchResponse.getHits().getAt(0).sourceAsMap()), greaterThanOrEqualTo(1));
assertThat((Integer) XContentMapValues.extractValue("alert_execution.trigger_result.script.payload.hits.total", searchResponse.getHits().getAt(0).sourceAsMap()), greaterThanOrEqualTo(1));
}
}
});
@ -425,7 +427,7 @@ public abstract class AbstractAlertingTests extends ElasticsearchIntegrationTest
// on these nodes
for (String node : nodes) {
AlertsService alertsService = _testCluster.getInstance(AlertsService.class, node);
assertThat(alertsService.getState(), equalTo(AlertsService.State.STOPPED));
assertThat(alertsService.state(), equalTo(AlertsService.State.STOPPED));
alertsService.stop(); // Prevents these nodes from starting alerting when new elected master node is picked.
}
@ -435,7 +437,7 @@ public abstract class AbstractAlertingTests extends ElasticsearchIntegrationTest
assertBusy(new Runnable() {
@Override
public void run() {
assertThat(alertsService.getState(), not(equalTo(AlertsService.State.STARTING)));
assertThat(alertsService.state(), not(equalTo(AlertsService.State.STARTING)));
}
});
} catch (Exception e) {
@ -446,7 +448,7 @@ public abstract class AbstractAlertingTests extends ElasticsearchIntegrationTest
assertBusy(new Runnable() {
@Override
public void run() {
assertThat(alertsService.getState(), equalTo(AlertsService.State.STOPPED));
assertThat(alertsService.state(), equalTo(AlertsService.State.STOPPED));
}
});
} catch (Exception e) {

View File

@ -58,7 +58,7 @@ public class AlertSerializationTest extends AbstractAlertingTests {
assertEqualByGeneratedXContent(parsedAlert.schedule(), alert.schedule());
assertEqualByGeneratedXContent(parsedAlert.trigger(), alert.trigger());
assertEquals(parsedAlert.throttlePeriod().getMillis(), alert.throttlePeriod().getMillis());
assertEquals(parsedAlert.status().ack().state(), alert.status().ack().state());
assertEquals(parsedAlert.status().ackStatus().state(), alert.status().ackStatus().state());
assertEquals(parsedAlert.metadata().get("foo"), "bar");
}

View File

@ -87,7 +87,7 @@ public class AlertThrottleTests extends AbstractAlertingTests {
Thread.sleep(20000);
AckAlertResponse ackResponse = alertsClient.prepareAckAlert("throttled-alert").get();
assertEquals(Alert.Status.Ack.State.ACKED, ackResponse.getStatus().ack().state());
assertEquals(Alert.Status.AckStatus.State.ACKED, ackResponse.getStatus().ackStatus().state());
refresh();
SearchResponse searchResponse = client()
@ -123,7 +123,7 @@ public class AlertThrottleTests extends AbstractAlertingTests {
Alert parsedAlert = alertParser.parse(getAlertResponse.getResponse().getId(), true,
getAlertResponse.getResponse().getSourceAsBytesRef());
assertThat(parsedAlert.status().ack().state(), equalTo(Alert.Status.Ack.State.AWAITS_EXECUTION));
assertThat(parsedAlert.status().ackStatus().state(), equalTo(Alert.Status.AckStatus.State.AWAITS_EXECUTION));
CountResponse countOfThrottledActions = client()
.prepareCount(HistoryStore.ALERT_HISTORY_INDEX_PREFIX + "*")

View File

@ -88,7 +88,7 @@ public class BootStrapTest extends AbstractAlertingTests {
);
DateTime scheduledFireTime = new DateTime(DateTimeZone.UTC);
FiredAlert entry = new FiredAlert(alert, scheduledFireTime, scheduledFireTime, FiredAlert.State.AWAITS_RUN);
FiredAlert entry = new FiredAlert(alert, scheduledFireTime, scheduledFireTime);
String actionHistoryIndex = HistoryStore.getAlertHistoryIndexNameForTime(scheduledFireTime);
createIndex(actionHistoryIndex);
@ -145,7 +145,7 @@ public class BootStrapTest extends AbstractAlertingTests {
PutAlertResponse putAlertResponse = alertClient().preparePutAlert(alert.name()).setAlertSource(jsonBuilder.bytes()).get();
assertTrue(putAlertResponse.indexResponse().isCreated());
FiredAlert entry = new FiredAlert(alert, historyIndexDate, historyIndexDate, FiredAlert.State.AWAITS_RUN);
FiredAlert entry = new FiredAlert(alert, historyIndexDate, historyIndexDate);
IndexResponse indexResponse = client().prepareIndex(actionHistoryIndex, HistoryStore.ALERT_HISTORY_TYPE, entry.id())
.setConsistencyLevel(WriteConsistencyLevel.ALL)
.setSource(XContentFactory.jsonBuilder().value(entry))

View File

@ -142,7 +142,7 @@ public class NoMasterNodeTests extends AbstractAlertingTests {
}), equalTo(true));
// Ensure that the alert manager doesn't run elsewhere
for (AlertsService alertsService : internalTestCluster().getInstances(AlertsService.class)) {
assertThat(alertsService.getState(), is(AlertsService.State.STOPPED));
assertThat(alertsService.state(), is(AlertsService.State.STOPPED));
}
}

View File

@ -6,7 +6,7 @@
package org.elasticsearch.alerts.actions.email;
import org.elasticsearch.alerts.Alert;
import org.elasticsearch.alerts.AlertContext;
import org.elasticsearch.alerts.ExecutionContext;
import org.elasticsearch.alerts.Payload;
import org.elasticsearch.alerts.actions.email.service.Authentication;
import org.elasticsearch.alerts.actions.email.service.Email;
@ -29,9 +29,11 @@ import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.watcher.ResourceWatcherService;
import javax.mail.MessagingException;
import javax.mail.internet.InternetAddress;
import java.io.IOException;
import java.util.*;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
/**
*/
@ -41,40 +43,17 @@ public class EmailActionTest extends ElasticsearchTestCase {
//createIndex("my-trigger-index");
StringTemplateUtils.Template template =
new StringTemplateUtils.Template("{{alert_name}} triggered with {{response.hits.total}} hits");
List<InternetAddress> addresses = new ArrayList<>();
addresses.addAll(Arrays.asList(InternetAddress.parse("you@foo.com")));
Settings settings = ImmutableSettings.settingsBuilder().build();
MustacheScriptEngineService mustacheScriptEngineService = new MustacheScriptEngineService(settings);
ThreadPool tp;
tp = new ThreadPool(ThreadPool.Names.SAME);
ThreadPool threadPool = new ThreadPool(ThreadPool.Names.SAME);
Set<ScriptEngineService> engineServiceSet = new HashSet<>();
engineServiceSet.add(mustacheScriptEngineService);
ScriptService scriptService = new ScriptService(settings, new Environment(), engineServiceSet, new ResourceWatcherService(settings, tp));
ScriptService scriptService = new ScriptService(settings, new Environment(), engineServiceSet, new ResourceWatcherService(settings, threadPool));
StringTemplateUtils stringTemplateUtils = new StringTemplateUtils(settings, ScriptServiceProxy.of(scriptService));
EmailService alwaysSuccessEmailService = new EmailService() {
@Override
public void start(ClusterState state) {
}
@Override
public void stop() {
}
@Override
public EmailSent send(Email email, Authentication auth, Profile profile) {
return new EmailSent(auth.username(), email);
}
@Override
public EmailSent send(Email email, Authentication auth, Profile profile, String accountName) {
return new EmailSent(accountName, email);
}
};
EmailService emailService = new EmailServiceMock();
Email.Address from = new Email.Address("from@test.com");
List<Email.Address> emailAddressList = new ArrayList<>();
@ -86,7 +65,7 @@ public class EmailActionTest extends ElasticsearchTestCase {
emailBuilder.from(from);
emailBuilder.to(to);
EmailAction emailAction = new EmailAction(logger, alwaysSuccessEmailService, stringTemplateUtils, emailBuilder,
EmailAction emailAction = new EmailAction(logger, emailService, stringTemplateUtils, emailBuilder,
new Authentication("testname", "testpassword"), Profile.STANDARD, "testaccount", template, template, null, true);
//This is ok since the execution of the action only relies on the alert name
@ -100,12 +79,10 @@ public class EmailActionTest extends ElasticsearchTestCase {
null,
new Alert.Status()
);
AlertContext ctx = new AlertContext(alert, new DateTime(), new DateTime());
ExecutionContext ctx = new ExecutionContext("test-serialization#1", alert, new DateTime(), new DateTime());
EmailAction.Result result = emailAction.execute(ctx, new Payload.Simple());
tp.shutdownNow();
threadPool.shutdownNow();
assertTrue(result.success());
@ -114,7 +91,28 @@ public class EmailActionTest extends ElasticsearchTestCase {
assertArrayEquals(success.email().to().toArray(), to.toArray() );
assertEquals(success.email().from(), from);
//@TODO add more here
}
static class EmailServiceMock implements EmailService {
@Override
public void start(ClusterState state) {
}
@Override
public void stop() {
}
@Override
public EmailSent send(Email email, Authentication auth, Profile profile) {
return new EmailSent(auth.username(), email);
}
@Override
public EmailSent send(Email email, Authentication auth, Profile profile, String accountName) {
return new EmailSent(accountName, email);
}
}
}

View File

@ -27,7 +27,7 @@ public class FiredAlertTest extends AbstractAlertingTests {
public void testFiredAlertParser() throws Exception {
Alert alert = createTestAlert("fired_test");
FiredAlert firedAlert = new FiredAlert(alert, new DateTime(), new DateTime(), FiredAlert.State.AWAITS_RUN);
FiredAlert firedAlert = new FiredAlert(alert, new DateTime(), new DateTime());
XContentBuilder jsonBuilder = XContentFactory.jsonBuilder();
firedAlert.toXContent(jsonBuilder, ToXContent.EMPTY_PARAMS);
FiredAlert parsedFiredAlert = firedAlertParser().parse(jsonBuilder.bytes(), firedAlert.id(), 0);
@ -43,14 +43,14 @@ public class FiredAlertTest extends AbstractAlertingTests {
@Test
public void testFinalizedFiredAlertParser() throws Exception {
Alert alert = createTestAlert("fired_test");
FiredAlert firedAlert = new FiredAlert(alert, new DateTime(), new DateTime(), FiredAlert.State.AWAITS_RUN);
AlertContext ctx = new AlertContext(alert, new DateTime(), new DateTime());
ctx.addActionResult(new EmailAction.Result.Failure("failed to send because blah"));
ctx.addActionResult(new WebhookAction.Result.Executed(300, "http://localhost:8000/alertfoo", "{'awesome' : 'us'}"));
FiredAlert firedAlert = new FiredAlert(alert, new DateTime(), new DateTime());
ExecutionContext ctx = new ExecutionContext(firedAlert.id(), alert, new DateTime(), new DateTime());
ctx.onActionResult(new EmailAction.Result.Failure("failed to send because blah"));
ctx.onActionResult(new WebhookAction.Result.Executed(300, "http://localhost:8000/alertfoo", "{'awesome' : 'us'}"));
Trigger.Result triggerResult = new SimpleTrigger.Result(new Payload.Simple());
ctx.throttleResult(Throttler.NO_THROTTLE.throttle(ctx, triggerResult));
ctx.triggerResult(triggerResult);
firedAlert.finalize(alert, new AlertsService.AlertRun(ctx));
ctx.onThrottleResult(Throttler.NO_THROTTLE.throttle(ctx, triggerResult));
ctx.onTriggerResult(triggerResult);
firedAlert.update(new AlertExecution(ctx));
XContentBuilder jsonBuilder = XContentFactory.jsonBuilder();
firedAlert.toXContent(jsonBuilder, ToXContent.EMPTY_PARAMS);
logger.error("FOO : " + jsonBuilder.bytes().toUtf8());
@ -63,14 +63,14 @@ public class FiredAlertTest extends AbstractAlertingTests {
@Test
public void testFinalizedFiredAlertParserScriptSearchTrigger() throws Exception {
Alert alert = createTestAlert("fired_test");
FiredAlert firedAlert = new FiredAlert(alert, new DateTime(), new DateTime(), FiredAlert.State.AWAITS_RUN);
AlertContext ctx = new AlertContext(alert, new DateTime(), new DateTime());
ctx.addActionResult(new EmailAction.Result.Failure("failed to send because blah"));
ctx.addActionResult(new WebhookAction.Result.Executed(300, "http://localhost:8000/alertfoo", "{'awesome' : 'us'}"));
FiredAlert firedAlert = new FiredAlert(alert, new DateTime(), new DateTime());
ExecutionContext ctx = new ExecutionContext(firedAlert.id(), alert, new DateTime(), new DateTime());
ctx.onActionResult(new EmailAction.Result.Failure("failed to send because blah"));
ctx.onActionResult(new WebhookAction.Result.Executed(300, "http://localhost:8000/alertfoo", "{'awesome' : 'us'}"));
Trigger.Result triggerResult = new SearchTrigger.Result(ScriptSearchTrigger.TYPE, true, createTriggerSearchRequest(), new Payload.Simple());
ctx.throttleResult(Throttler.NO_THROTTLE.throttle(ctx, triggerResult));
ctx.triggerResult(triggerResult);
firedAlert.finalize(alert, new AlertsService.AlertRun(ctx));
ctx.onThrottleResult(Throttler.NO_THROTTLE.throttle(ctx, triggerResult));
ctx.onTriggerResult(triggerResult);
firedAlert.update(new AlertExecution(ctx));
XContentBuilder jsonBuilder = XContentFactory.jsonBuilder();
firedAlert.toXContent(jsonBuilder, ToXContent.EMPTY_PARAMS);
FiredAlert parsedFiredAlert = firedAlertParser().parse(jsonBuilder.bytes(), firedAlert.id(), 0);