Componentized the history service and alert record parser.

Renamed AlertRecord to FiredAlert
Refactored the persistence part of FiredAlert out of HistoryService to HistoryStore.
Moved AlertActionState to FiredAlert.State

Original commit: elastic/x-pack-elasticsearch@595c733cfc
This commit is contained in:
Martijn van Groningen 2015-02-05 16:10:55 +01:00
parent 31a3907bed
commit f261d8aeaf
14 changed files with 674 additions and 640 deletions

View File

@ -8,7 +8,7 @@ package org.elasticsearch.alerts;
import org.elasticsearch.alerts.actions.ActionModule;
import org.elasticsearch.alerts.client.AlertsClientModule;
import org.elasticsearch.alerts.history.HistoryService;
import org.elasticsearch.alerts.history.HistoryModule;
import org.elasticsearch.alerts.payload.PayloadModule;
import org.elasticsearch.alerts.rest.AlertsRestModule;
import org.elasticsearch.alerts.scheduler.SchedulerModule;
@ -34,7 +34,8 @@ public class AlertsModule extends AbstractModule implements SpawnModules {
new SchedulerModule(),
new AlertsTransportModule(),
new TriggerModule(),
new ActionModule());
new ActionModule(),
new HistoryModule());
}
@Override
@ -44,7 +45,6 @@ public class AlertsModule extends AbstractModule implements SpawnModules {
bind(AlertsService.class).asEagerSingleton();
bind(AlertsStore.class).asEagerSingleton();
bind(TemplateUtils.class).asEagerSingleton();
bind(HistoryService.class).asEagerSingleton();
bind(ConfigurationService.class).asEagerSingleton();
}

View File

@ -10,7 +10,7 @@ import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchIllegalStateException;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.alerts.actions.Action;
import org.elasticsearch.alerts.history.AlertRecord;
import org.elasticsearch.alerts.history.FiredAlert;
import org.elasticsearch.alerts.history.HistoryService;
import org.elasticsearch.alerts.scheduler.Scheduler;
import org.elasticsearch.alerts.throttle.Throttler;
@ -127,7 +127,7 @@ public class AlertsService extends AbstractComponent {
* This does the necessary actions, so we don't lose the fact that an alert got execute from the {@link org.elasticsearch.alerts.scheduler.Scheduler}
* It writes the an entry in the alert history index with the proper status for this alert.
*
* The rest of the actions happen in {@link #runAlert(org.elasticsearch.alerts.history.AlertRecord)}.
* The rest of the actions happen in {@link #runAlert(org.elasticsearch.alerts.history.FiredAlert)}.
*
* The reason the executing of the alert is split into two, is that we don't want to lose the fact that an alert has
* fired. If we were
@ -143,7 +143,7 @@ public class AlertsService extends AbstractComponent {
}
try {
historyService.addAlertAction(alert, scheduledFireTime, fireTime);
historyService.alertFired(alert, scheduledFireTime, fireTime);
} catch (Exception e) {
logger.error("Failed to schedule alert action for [{}]", e, alert);
}
@ -159,42 +159,42 @@ public class AlertsService extends AbstractComponent {
* 3) If the alert has been triggered, checks if the alert should be throttled
* 4) If the alert hasn't been throttled runs the configured actions
*/
public AlertRun runAlert(AlertRecord entry) throws IOException {
public AlertRun runAlert(FiredAlert entry) throws IOException {
ensureStarted();
alertLock.acquire(entry.getName());
alertLock.acquire(entry.name());
try {
Alert alert = alertsStore.getAlert(entry.getName());
Alert alert = alertsStore.getAlert(entry.name());
if (alert == null) {
throw new ElasticsearchException("Alert is not available");
}
Trigger trigger = alert.trigger();
Trigger.Result triggerResult = trigger.execute(alert, entry.getScheduledTime(), entry.getFireTime());
Trigger.Result triggerResult = trigger.execute(alert, entry.scheduledTime(), entry.fireTime());
AlertRun alertRun = null;
if (triggerResult.triggered()) {
alert.status().triggered(true, entry.getFireTime());
alert.status().triggered(true, entry.fireTime());
Throttler.Result throttleResult = alert.throttler().throttle(alert, triggerResult);
if (!throttleResult.throttle()) {
Map<String, Object> data = alert.payload().execute(alert, triggerResult, entry.getScheduledTime(), entry.getFireTime());
alertRun = new AlertRun(triggerResult, data);
Map<String, Object> data = alert.payload().execute(alert, triggerResult, entry.scheduledTime(), entry.fireTime());
alertRun = new AlertRun(triggerResult, throttleResult, data);
for (Action action : alert.actions()){
Action.Result actionResult = action.execute(alert, data);
//TODO : process action result, what to do if just one action fails or throws exception ?
}
alert.status().executed(entry.getScheduledTime());
alert.status().executed(entry.scheduledTime());
} else {
alert.status().throttled(entry.getFireTime(), throttleResult.reason());
alert.status().throttled(entry.fireTime(), throttleResult.reason());
}
} else {
alert.status().triggered(false, entry.getFireTime());
alert.status().triggered(false, entry.fireTime());
}
if (alertRun == null) {
alertRun = new AlertRun(triggerResult, null);
alertRun = new AlertRun(triggerResult, null, null);
}
alert.status().ran(entry.getFireTime());
alert.status().ran(entry.fireTime());
alertsStore.updateAlert(alert);
return alertRun;
} finally {
alertLock.release(entry.getName());
alertLock.release(entry.name());
}
}
@ -355,16 +355,22 @@ public class AlertsService extends AbstractComponent {
public static class AlertRun {
private final Trigger.Result result;
private final Trigger.Result triggerResult;
private final Throttler.Result throttleResult;
private final Map<String, Object> data;
public AlertRun(Trigger.Result result, Map<String, Object> data) {
this.result = result;
public AlertRun(Trigger.Result triggerResult, Throttler.Result throttleResult, Map<String, Object> data) {
this.triggerResult = triggerResult;
this.throttleResult = throttleResult;
this.data = data;
}
public Trigger.Result triggerResult() {
return result;
return triggerResult;
}
public Throttler.Result throttleResult() {
return throttleResult;
}
public Map<String, Object> data() {

View File

@ -55,7 +55,7 @@ public abstract class Action<R extends Action.Result> implements ToXContent {
protected static abstract class Result implements ToXContent {
public static abstract class Result implements ToXContent {
private final boolean success;

View File

@ -1,59 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.alerts.actions;
import org.elasticsearch.ElasticsearchIllegalArgumentException;
/**
*/
public enum AlertActionState {
SEARCH_NEEDED,
SEARCH_UNDERWAY,
NO_ACTION_NEEDED,
ACTION_PERFORMED,
ERROR,
THROTTLED;
@Override
public String toString(){
switch (this) {
case SEARCH_NEEDED:
return "SEARCH_NEEDED";
case SEARCH_UNDERWAY:
return "SEARCH_UNDERWAY";
case NO_ACTION_NEEDED:
return "NO_ACTION_NEEDED";
case ACTION_PERFORMED:
return "ACTION_PERFORMED";
case ERROR:
return "ERROR";
case THROTTLED:
return "THROTTLED";
default:
return "NO_ACTION_NEEDED";
}
}
public static AlertActionState fromString(String s) {
switch(s.toUpperCase()) {
case "SEARCH_NEEDED":
return SEARCH_NEEDED;
case "SEARCH_UNDERWAY":
return SEARCH_UNDERWAY;
case "NO_ACTION_NEEDED":
return NO_ACTION_NEEDED;
case "ACTION_UNDERWAY":
return ACTION_PERFORMED;
case "ERROR":
return ERROR;
case "THROTTLED":
return THROTTLED;
default:
throw new ElasticsearchIllegalArgumentException("Unknown value [" + s + "] for AlertHistoryState" );
}
}
}

View File

@ -1,270 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.alerts.history;
import org.elasticsearch.alerts.Alert;
import org.elasticsearch.alerts.AlertsService;
import org.elasticsearch.alerts.actions.AlertAction;
import org.elasticsearch.alerts.actions.AlertActionState;
import org.elasticsearch.alerts.actions.AlertActions;
import org.elasticsearch.alerts.support.AlertUtils;
import org.elasticsearch.alerts.support.Payload;
import org.elasticsearch.alerts.trigger.Trigger;
import org.elasticsearch.common.joda.time.DateTime;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentType;
import java.io.IOException;
import java.util.List;
import java.util.Map;
/**
* An alert history is an event of an alert that fired on particular moment in time.
*/
public class AlertRecord implements ToXContent {
private String id;
private String name;
private DateTime fireTime;
private DateTime scheduledTime;
private Trigger trigger;
private AlertActions actions;
private AlertActionState state;
/*Optional*/
private Trigger.Result triggerResult;
private Payload payload;
private String errorMsg;
private Map<String,Object> metadata;
private transient long version;
private transient XContentType contentType;
AlertRecord() {
}
public AlertRecord(Alert alert, DateTime scheduledTime, DateTime fireTime, AlertActionState state) throws IOException {
this.id = alert.getName() + "#" + scheduledTime.toDateTimeISO();
this.name = alert.getName();
this.fireTime = fireTime;
this.scheduledTime = scheduledTime;
this.trigger = alert.getTrigger();
this.actions = alert.getActions();
this.state = state;
this.metadata = alert.getMetadata();
this.version = 1;
this.contentType = alert.getContentType();
}
/**
* @return The unique id of the alert action entry
*/
public String getId() {
return id;
}
void setId(String id) {
this.id = id;
}
public void execution(Alert alert, AlertsService.AlertRun alertRun) {
triggerResult = alertRun.evaluation();
if (triggerResult.triggered()) {
if (triggerResult.throttled()) {
if (alert.getStatus() != Alert.Status.NOT_TRIGGERED) {
state = AlertActionState.THROTTLED;
}
} else if (state != AlertActionState.THROTTLED) {
state = AlertActionState.ACTION_PERFORMED;
}
payload = alertRun.payload();
} else {
state = AlertActionState.NO_ACTION_NEEDED;
}
}
/**
* @return The time the alert was scheduled to be triggered
*/
public DateTime getScheduledTime() {
return scheduledTime;
}
void setScheduledTime(DateTime scheduledTime) {
this.scheduledTime = scheduledTime;
}
/**
* @return The name of the alert that triggered
*/
public String getName() {
return name;
}
void setName(String name) {
this.name = name;
}
/**
* @return The time the alert actually ran.
*/
public DateTime getFireTime() {
return fireTime;
}
void setFireTime(DateTime fireTime) {
this.fireTime = fireTime;
}
/**
* @return The trigger that evaluated the search response
*/
public Trigger getTrigger() {
return trigger;
}
void setTrigger(Trigger trigger) {
this.trigger = trigger;
}
public Trigger.Result triggerEvaluation() {
return triggerResult;
}
public void triggerEvaluation(Trigger.Result result) {
this.triggerResult = result;
}
/**
* @return The list of actions that ran if the search response matched with the trigger
*/
public List<AlertAction> getActions() {
return actions;
}
void setActions(AlertActions actions) {
this.actions = actions;
}
/**
* @return The current state of the alert event.
*/
public AlertActionState getState() {
return state;
}
void setState(AlertActionState state) {
this.state = state;
}
public long getVersion() {
return version;
}
void setVersion(long version) {
this.version = version;
}
/**
* @return xcontext type of the _source of this action entry.
*/
public XContentType getContentType() {
return contentType;
}
void setContentType(XContentType contentType) {
this.contentType = contentType;
}
/**
* @return The error if an error occurred otherwise null
*/
public String getErrorMsg(){
return this.errorMsg;
}
void setErrorMsg(String errorMsg) {
this.errorMsg = errorMsg;
}
/**
* @return The metadata that was associated with the alert when this entry was created
*/
public Map<String, Object> getMetadata() {
return metadata;
}
void setMetadata(Map<String, Object> metadata) {
this.metadata = metadata;
}
public Payload payload() {
return payload;
}
public void payload(Payload payload) {
this.payload = payload;
}
@Override
public XContentBuilder toXContent(XContentBuilder historyEntry, Params params) throws IOException {
historyEntry.startObject();
historyEntry.field("alert_name", name);
historyEntry.field("triggered", triggerResult.triggered());
historyEntry.field("fire_time", fireTime.toDateTimeISO());
historyEntry.field(HistoryService.SCHEDULED_FIRE_TIME_FIELD, scheduledTime.toDateTimeISO());
historyEntry.field("trigger");
historyEntry.startObject();
historyEntry.field(trigger.type(), trigger, params);
historyEntry.endObject();
historyEntry.field("trigger_response", triggerResult.data());
if (payload != null) {
historyEntry.field("payload_request");
AlertUtils.writeSearchRequest(payload.request(), historyEntry, params);
historyEntry.field("payload_response", payload.data());
}
historyEntry.startObject("actions");
for (AlertAction action : actions) {
historyEntry.field(action.getActionName());
action.toXContent(historyEntry, params);
}
historyEntry.endObject();
historyEntry.field(HistoryService.STATE, state.toString());
if (errorMsg != null) {
historyEntry.field("error_msg", errorMsg);
}
if (metadata != null) {
historyEntry.field("meta", metadata);
}
historyEntry.endObject();
return historyEntry;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
AlertRecord entry = (AlertRecord) o;
if (!id.equals(entry.id)) return false;
return true;
}
@Override
public int hashCode() {
return id.hashCode();
}
}

View File

@ -0,0 +1,352 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.alerts.history;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchIllegalArgumentException;
import org.elasticsearch.alerts.Alert;
import org.elasticsearch.alerts.AlertsService;
import org.elasticsearch.alerts.actions.ActionRegistry;
import org.elasticsearch.alerts.actions.AlertActions;
import org.elasticsearch.alerts.payload.Payload;
import org.elasticsearch.alerts.payload.PayloadRegistry;
import org.elasticsearch.alerts.trigger.Trigger;
import org.elasticsearch.alerts.trigger.TriggerRegistry;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.joda.time.DateTime;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentParser;
import java.io.IOException;
import java.util.Map;
public class FiredAlert implements ToXContent {
private String id;
private String name;
private DateTime fireTime;
private DateTime scheduledTime;
private Trigger trigger;
private AlertActions actions;
private State state;
/*Optional*/
private Payload payload;
private String errorMessage;
private Map<String,Object> metadata;
// During an fired alert execution we use this and then we store it with the history, after that we don't use it.
// We store it because it may end up being useful for debug / history purposes
private transient AlertsService.AlertRun alertRun;
// Used for assertion purposes, so we can ensure/test what we have loaded in memory is the same as what is persisted.
private transient long version;
FiredAlert() {
}
public FiredAlert(Alert alert, DateTime scheduledTime, DateTime fireTime, State state) {
this.id = alert.name() + "#" + scheduledTime.toDateTimeISO();
this.name = alert.name();
this.fireTime = fireTime;
this.scheduledTime = scheduledTime;
this.trigger = alert.trigger();
this.actions = alert.actions();
this.state = state;
this.metadata = alert.metadata();
this.version = 1;
}
public String id() {
return id;
}
public void id(String id) {
this.id = id;
}
// TODO: Maybe pull into history service?
public void update(Alert alert, AlertsService.AlertRun alertRun) {
this.alertRun = alertRun;
if (alertRun.triggerResult().triggered()) {
if (alertRun.throttleResult().throttle()) {
if (alert.status().state() != Alert.Status.State.NOT_EXECUTED) {
state = State.THROTTLED;
}
} else if (state != State.THROTTLED) {
state = State.ACTION_PERFORMED;
}
payload = alert.payload();
} else {
state = State.NO_ACTION_NEEDED;
}
}
public DateTime scheduledTime() {
return scheduledTime;
}
public void scheduledTime(DateTime scheduledTime) {
this.scheduledTime = scheduledTime;
}
public String name() {
return name;
}
public void name(String name) {
this.name = name;
}
public DateTime fireTime() {
return fireTime;
}
public void fireTime(DateTime fireTime) {
this.fireTime = fireTime;
}
public Trigger trigger() {
return trigger;
}
public void trigger(Trigger trigger) {
this.trigger = trigger;
}
public AlertActions actions() {
return actions;
}
public void actions(AlertActions actions) {
this.actions = actions;
}
public State state() {
return state;
}
public void state(State state) {
this.state = state;
}
public long version() {
return version;
}
public void version(long version) {
this.version = version;
}
public String errorMessage(){
return this.errorMessage;
}
public void errorMsg(String errorMessage) {
this.errorMessage = errorMessage;
}
public Map<String, Object> metadata() {
return metadata;
}
public void metadata(Map<String, Object> metadata) {
this.metadata = metadata;
}
public Payload payload() {
return payload;
}
public void payload(Payload payload) {
this.payload = payload;
}
@Override
public XContentBuilder toXContent(XContentBuilder historyEntry, Params params) throws IOException {
historyEntry.startObject();
historyEntry.field(Parser.ALERT_NAME_FIELD.getPreferredName(), name);
historyEntry.field(Parser.FIRE_TIME_FIELD.getPreferredName(), fireTime.toDateTimeISO());
historyEntry.field(Parser.SCHEDULED_FIRE_TIME_FIELD.getPreferredName(), scheduledTime.toDateTimeISO());
historyEntry.field(Parser.TRIGGER_FIELD.getPreferredName(), trigger, params);
historyEntry.field(Parser.ACTIONS_FIELD.getPreferredName(), actions, params);
historyEntry.field(Parser.STATE_FIELD.getPreferredName(), state.toString());
if (payload != null) {
historyEntry.field(Parser.PAYLOAD_FIELD.getPreferredName(), payload, params);
}
if (errorMessage != null) {
historyEntry.field(Parser.ERROR_MESSAGE_FIELD.getPreferredName(), errorMessage);
}
if (metadata != null) {
historyEntry.field(Parser.METADATA_FIELD.getPreferredName(), metadata);
}
// TODO: maybe let AlertRun implement ToXContent?
if (alertRun != null) {
historyEntry.field(Parser.TRIGGER_RESPONSE.getPreferredName(), alertRun.triggerResult().data());
historyEntry.field(Parser.PAYLOAD_RESPONSE.getPreferredName(), alertRun.data());
}
historyEntry.endObject();
return historyEntry;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
FiredAlert entry = (FiredAlert) o;
if (!id.equals(entry.id)) return false;
return true;
}
@Override
public int hashCode() {
return id.hashCode();
}
@Override
public String toString() {
return id;
}
public enum State {
AWAITS_RUN,
RUNNING,
NO_ACTION_NEEDED,
ACTION_PERFORMED,
FAILED,
THROTTLED;
@Override
public String toString(){
switch (this) {
case AWAITS_RUN:
return "AWAITS_RUN";
case RUNNING:
return "RUNNING";
case NO_ACTION_NEEDED:
return "NO_ACTION_NEEDED";
case ACTION_PERFORMED:
return "ACTION_PERFORMED";
case FAILED:
return "FAILED";
case THROTTLED:
return "THROTTLED";
default:
return "NO_ACTION_NEEDED";
}
}
public static State fromString(String s) {
switch(s.toUpperCase()) {
case "AWAITS_RUN":
return AWAITS_RUN;
case "RUNNING":
return RUNNING;
case "NO_ACTION_NEEDED":
return NO_ACTION_NEEDED;
case "ACTION_UNDERWAY":
return ACTION_PERFORMED;
case "FAILED":
return FAILED;
case "THROTTLED":
return THROTTLED;
default:
throw new ElasticsearchIllegalArgumentException("Unknown value [" + s + "] for AlertHistoryState" );
}
}
}
public static class Parser extends AbstractComponent {
public static final ParseField ALERT_NAME_FIELD = new ParseField("alert_name");
public static final ParseField FIRE_TIME_FIELD = new ParseField("fire_time");
public static final ParseField SCHEDULED_FIRE_TIME_FIELD = new ParseField("scheduled_fire_time");
public static final ParseField ERROR_MESSAGE_FIELD = new ParseField("error_msg");
public static final ParseField TRIGGER_FIELD = new ParseField("trigger");
public static final ParseField TRIGGER_RESPONSE = new ParseField("trigger_response");
public static final ParseField PAYLOAD_FIELD = new ParseField("payload_request");
public static final ParseField PAYLOAD_RESPONSE = new ParseField("payload_response");
public static final ParseField ACTIONS_FIELD = new ParseField("actions");
public static final ParseField STATE_FIELD = new ParseField("state");
public static final ParseField METADATA_FIELD = new ParseField("meta");
private final TriggerRegistry triggerRegistry;
private final PayloadRegistry payloadRegistry;
private final ActionRegistry actionRegistry;
@Inject
public Parser(Settings settings, TriggerRegistry triggerRegistry, PayloadRegistry payloadRegistry, ActionRegistry actionRegistry) {
super(settings);
this.triggerRegistry = triggerRegistry;
this.payloadRegistry = payloadRegistry;
this.actionRegistry = actionRegistry;
}
public FiredAlert parse(BytesReference source, String historyId, long version) {
try (XContentParser parser = XContentHelper.createParser(source)) {
return parse(parser, historyId, version);
} catch (IOException e) {
throw new ElasticsearchException("Error during parsing alert record", e);
}
}
public FiredAlert parse(XContentParser parser, String id, long version) throws IOException {
FiredAlert entry = new FiredAlert();
entry.id(id);
entry.version(version);
String currentFieldName = null;
XContentParser.Token token = parser.nextToken();
assert token == XContentParser.Token.START_OBJECT;
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
currentFieldName = parser.currentName();
} else if (token == XContentParser.Token.START_OBJECT) {
if (ACTIONS_FIELD.match(currentFieldName)) {
entry.actions(actionRegistry.parseActions(parser));
} else if (TRIGGER_FIELD.match(currentFieldName)) {
entry.trigger(triggerRegistry.parse(parser));
} else if (PAYLOAD_FIELD.match(currentFieldName)) {
entry.payload(payloadRegistry.parse(parser));
} else if (METADATA_FIELD.match(currentFieldName)) {
entry.metadata(parser.map());
} else {
throw new ElasticsearchIllegalArgumentException("Unexpected field [" + currentFieldName + "]");
}
} else if (token.isValue()) {
if (ALERT_NAME_FIELD.match(currentFieldName)) {
entry.name(parser.text());
} else if (FIRE_TIME_FIELD.match(currentFieldName)) {
entry.fireTime(DateTime.parse(parser.text()));
} else if (SCHEDULED_FIRE_TIME_FIELD.match(currentFieldName)) {
entry.scheduledTime(DateTime.parse(parser.text()));
} else if (ERROR_MESSAGE_FIELD.match(currentFieldName)) {
entry.errorMsg(parser.textOrNull());
} else if (STATE_FIELD.match(currentFieldName)) {
entry.state(State.fromString(parser.text()));
} else {
throw new ElasticsearchIllegalArgumentException("Unexpected field [" + currentFieldName + "]");
}
} else {
throw new ElasticsearchIllegalArgumentException("Unexpected token [" + token + "] for [" + currentFieldName + "]");
}
}
return entry;
}
}
}

View File

@ -0,0 +1,21 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.alerts.history;
import org.elasticsearch.alerts.AlertsException;
/**
*/
public class HistoryException extends AlertsException {
public HistoryException(String msg) {
super(msg);
}
public HistoryException(String msg, Throwable cause) {
super(msg, cause);
}
}

View File

@ -0,0 +1,20 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.alerts.history;
import org.elasticsearch.common.inject.AbstractModule;
/**
*/
public class HistoryModule extends AbstractModule {
@Override
protected void configure() {
bind(FiredAlert.Parser.class).asEagerSingleton();
bind(HistoryStore.class).asEagerSingleton();
bind(HistoryService.class).asEagerSingleton();
}
}

View File

@ -5,45 +5,19 @@
*/
package org.elasticsearch.alerts.history;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchIllegalArgumentException;
import org.elasticsearch.ElasticsearchIllegalStateException;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.action.admin.indices.refresh.RefreshResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.alerts.Alert;
import org.elasticsearch.alerts.AlertsPlugin;
import org.elasticsearch.alerts.AlertsService;
import org.elasticsearch.alerts.AlertsStore;
import org.elasticsearch.alerts.actions.ActionRegistry;
import org.elasticsearch.alerts.actions.AlertActionState;
import org.elasticsearch.alerts.support.AlertUtils;
import org.elasticsearch.alerts.support.TemplateUtils;
import org.elasticsearch.alerts.support.init.proxy.ClientProxy;
import org.elasticsearch.alerts.trigger.TriggerRegistry;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.joda.time.DateTime;
import org.elasticsearch.common.joda.time.format.DateTimeFormat;
import org.elasticsearch.common.joda.time.format.DateTimeFormatter;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.threadpool.ThreadPool;
import java.io.IOException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
@ -53,52 +27,22 @@ import java.util.concurrent.atomic.AtomicLong;
*/
public class HistoryService extends AbstractComponent {
public static final String ALERT_NAME_FIELD = "alert_name";
public static final String TRIGGERED_FIELD = "triggered";
public static final String FIRE_TIME_FIELD = "fire_time";
public static final String SCHEDULED_FIRE_TIME_FIELD = "scheduled_fire_time";
public static final String ERROR_MESSAGE = "error_msg";
public static final String TRIGGER_FIELD = "trigger";
public static final String TRIGGER_REQUEST = "trigger_request";
public static final String TRIGGER_RESPONSE = "trigger_response";
public static final String PAYLOAD_REQUEST = "payload_request";
public static final String PAYLOAD_RESPONSE = "payload_response";
public static final String ACTIONS_FIELD = "actions";
public static final String STATE = "state";
public static final String METADATA = "meta";
public static final String ALERT_HISTORY_INDEX_PREFIX = ".alert_history_";
public static final DateTimeFormatter alertHistoryIndexTimeFormat = DateTimeFormat.forPattern("YYYY-MM-dd");
public static final String ALERT_HISTORY_TYPE = "alerthistory";
private final ClientProxy client;
private final HistoryStore historyStore;
private AlertsService alertsService;
private final ThreadPool threadPool;
private final AlertsStore alertsStore;
private final TriggerRegistry triggerRegistry;
private final TemplateUtils templateUtils;
private final int scrollSize;
private final TimeValue scrollTimeout;
private final AtomicLong largestQueueSize = new AtomicLong(0);
private final AtomicBoolean started = new AtomicBoolean(false);
private final BlockingQueue<AlertRecord> actionsToBeProcessed = new LinkedBlockingQueue<>();
private final BlockingQueue<FiredAlert> actionsToBeProcessed = new LinkedBlockingQueue<>();
private volatile Thread queueReaderThread;
@Inject
public HistoryService(Settings settings, ClientProxy client, ThreadPool threadPool, AlertsStore alertsStore,
TriggerRegistry triggerRegistry, TemplateUtils templateUtils) {
public HistoryService(Settings settings, HistoryStore historyStore, ThreadPool threadPool, AlertsStore alertsStore) {
super(settings);
this.client = client;
this.historyStore = historyStore;
this.threadPool = threadPool;
this.alertsStore = alertsStore;
this.triggerRegistry = triggerRegistry;
this.templateUtils = templateUtils;
// Not using component settings, to let AlertsStore and AlertActionManager share the same settings
this.scrollTimeout = settings.getAsTime("alerts.scroll.timeout", TimeValue.timeValueSeconds(30));
this.scrollSize = settings.getAsInt("alerts.scroll.size", 100);
}
public void setAlertsService(AlertsService alertsService){
@ -110,36 +54,19 @@ public class HistoryService extends AbstractComponent {
return true;
}
String[] indices = state.metaData().concreteIndices(IndicesOptions.lenientExpandOpen(), ALERT_HISTORY_INDEX_PREFIX + "*");
if (indices.length == 0) {
logger.info("No previous .alerthistory index, skip loading of alert actions");
templateUtils.ensureIndexTemplateIsLoaded(state, "alerthistory");
assert actionsToBeProcessed.isEmpty() : "Queue should be empty, but contains " + actionsToBeProcessed.size() + " elements.";
HistoryStore.LoadResult loadResult = historyStore.loadFiredAlerts(state);
if (loadResult.succeeded()) {
if (!loadResult.notRanFiredAlerts().isEmpty()) {
actionsToBeProcessed.addAll(loadResult.notRanFiredAlerts());
logger.debug("Loaded [{}] actions from the alert history index into actions queue", actionsToBeProcessed.size());
largestQueueSize.set(actionsToBeProcessed.size());
}
doStart();
return true;
}
int numPrimaryShards = 0;
for (String index : indices) {
IndexMetaData indexMetaData = state.getMetaData().index(index);
if (indexMetaData != null) {
if (!state.routingTable().index(index).allPrimaryShardsActive()) {
logger.warn("Not all primary shards of the [{}] index are started. Schedule to retry alert action loading..", index);
return false;
} else {
numPrimaryShards += indexMetaData.numberOfShards();
}
}
}
try {
loadQueue(numPrimaryShards);
} catch (Exception e) {
logger.warn("Failed to load unfinished alert actions. Schedule to retry alert action loading...", e);
actionsToBeProcessed.clear();
return false;
}
templateUtils.ensureIndexTemplateIsLoaded(state, "alerthistory");
doStart();
return true;
}
public void stop() {
@ -163,136 +90,14 @@ public class HistoryService extends AbstractComponent {
return started.get();
}
/**
* Calculates the correct alert history index name for a given time using alertHistoryIndexTimeFormat
*/
public static String getAlertHistoryIndexNameForTime(DateTime time) {
return ALERT_HISTORY_INDEX_PREFIX + alertHistoryIndexTimeFormat.print(time);
}
private void loadQueue(int numPrimaryShards) {
assert actionsToBeProcessed.isEmpty() : "Queue should be empty, but contains " + actionsToBeProcessed.size() + " elements.";
RefreshResponse refreshResponse = client.admin().indices().refresh(new RefreshRequest(ALERT_HISTORY_INDEX_PREFIX + "*")).actionGet();
if (refreshResponse.getSuccessfulShards() < numPrimaryShards) {
throw new ElasticsearchException("Not all shards have been refreshed");
}
SearchResponse response = client.prepareSearch(ALERT_HISTORY_INDEX_PREFIX + "*")
.setQuery(QueryBuilders.termQuery(STATE, AlertActionState.SEARCH_NEEDED.toString()))
.setSearchType(SearchType.SCAN)
.setScroll(scrollTimeout)
.setSize(scrollSize)
.setTypes(ALERT_HISTORY_TYPE)
.setPreference("_primary")
.get();
try {
if (response.getTotalShards() != response.getSuccessfulShards()) {
throw new ElasticsearchException("Partial response while loading alert actions");
}
if (response.getHits().getTotalHits() > 0) {
response = client.prepareSearchScroll(response.getScrollId()).setScroll(scrollTimeout).get();
while (response.getHits().hits().length != 0) {
for (SearchHit sh : response.getHits()) {
String historyId = sh.getId();
AlertRecord historyEntry = parseHistory(historyId, sh.getSourceRef(), sh.version(), actionRegistry);
assert historyEntry.getState() == AlertActionState.SEARCH_NEEDED;
logger.debug("Adding entry: [{}/{}/{}]", sh.index(), sh.type(), sh.id());
actionsToBeProcessed.add(historyEntry);
}
response = client.prepareSearchScroll(response.getScrollId()).setScroll(scrollTimeout).get();
}
}
} finally {
client.prepareClearScroll().addScrollId(response.getScrollId()).get();
}
logger.info("Loaded [{}] actions from the alert history index into actions queue", actionsToBeProcessed.size());
largestQueueSize.set(actionsToBeProcessed.size());
}
AlertRecord parseHistory(String historyId, BytesReference source, long version, ActionRegistry actionRegistry) {
AlertRecord entry = new AlertRecord();
entry.setId(historyId);
entry.setVersion(version);
try (XContentParser parser = XContentHelper.createParser(source)) {
entry.setContentType(parser.contentType());
String currentFieldName = null;
XContentParser.Token token = parser.nextToken();
assert token == XContentParser.Token.START_OBJECT;
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
currentFieldName = parser.currentName();
} else if (token == XContentParser.Token.START_OBJECT) {
switch (currentFieldName) {
case ACTIONS_FIELD:
entry.setActions(actionRegistry.parseActions(parser));
break;
case TRIGGER_FIELD:
entry.setTrigger(triggerRegistry.parse(parser));
break;
case TRIGGER_RESPONSE:
entry.setTriggerResponse(parser.map());
break;
case PAYLOAD_REQUEST:
entry.setPayloadRequest(AlertUtils.readSearchRequest(parser));
break;
case PAYLOAD_RESPONSE:
entry.setPayloadResponse(parser.map());
break;
case METADATA:
entry.setMetadata(parser.map());
default:
throw new ElasticsearchIllegalArgumentException("Unexpected field [" + currentFieldName + "]");
}
} else if (token.isValue()) {
switch (currentFieldName) {
case ALERT_NAME_FIELD:
entry.setName(parser.text());
break;
case TRIGGERED_FIELD:
entry.setTriggered(parser.booleanValue());
break;
case FIRE_TIME_FIELD:
entry.setFireTime(DateTime.parse(parser.text()));
break;
case SCHEDULED_FIRE_TIME_FIELD:
entry.setScheduledTime(DateTime.parse(parser.text()));
break;
case ERROR_MESSAGE:
entry.setErrorMsg(parser.textOrNull());
break;
case STATE:
entry.setState(AlertActionState.fromString(parser.text()));
break;
default:
throw new ElasticsearchIllegalArgumentException("Unexpected field [" + currentFieldName + "]");
}
} else {
throw new ElasticsearchIllegalArgumentException("Unexpected token [" + token + "] for [" + currentFieldName + "]");
}
}
} catch (IOException e) {
throw new ElasticsearchException("Error during parsing alert action", e);
}
return entry;
}
public void addAlertAction(Alert alert, DateTime scheduledFireTime, DateTime fireTime) throws IOException {
public void alertFired(Alert alert, DateTime scheduledFireTime, DateTime fireTime) throws HistoryException {
ensureStarted();
logger.debug("Adding alert action for alert [{}]", alert.getName());
String alertHistoryIndex = getAlertHistoryIndexNameForTime(scheduledFireTime);
AlertRecord entry = new AlertRecord(alert, scheduledFireTime, fireTime, AlertActionState.SEARCH_NEEDED);
IndexResponse response = client.prepareIndex(alertHistoryIndex, ALERT_HISTORY_TYPE, entry.getId())
.setSource(XContentFactory.contentBuilder(alert.getContentType()).value(entry))
.setOpType(IndexRequest.OpType.CREATE)
.get();
entry.setVersion(response.getVersion());
logger.debug("Added alert action for alert [{}]", alert.getName());
FiredAlert firedAlert = new FiredAlert(alert, scheduledFireTime, fireTime, FiredAlert.State.AWAITS_RUN);
logger.debug("adding fired alert [{}]", alert.name());
historyStore.put(firedAlert);
long currentSize = actionsToBeProcessed.size() + 1;
actionsToBeProcessed.add(entry);
actionsToBeProcessed.add(firedAlert);
long currentLargestQueueSize = largestQueueSize.get();
boolean done = false;
while (!done) {
@ -305,23 +110,6 @@ public class HistoryService extends AbstractComponent {
}
}
private void updateHistoryEntry(AlertRecord entry) throws IOException {
ensureStarted();
logger.debug("Updating alert action [{}]", entry.getId());
IndexResponse response = client.prepareIndex(getAlertHistoryIndexNameForTime(entry.getScheduledTime()), ALERT_HISTORY_TYPE, entry.getId())
.setSource(XContentFactory.contentBuilder(entry.getContentType()).value(entry))
.get();
logger.debug("Updated alert action [{}]", entry.getId());
entry.setVersion(response.getVersion());
}
private void updateHistoryEntry(AlertRecord entry, AlertActionState actionPerformed) throws IOException {
entry.setState(actionPerformed);
updateHistoryEntry(entry);
}
public long getQueueSize() {
return actionsToBeProcessed.size();
}
@ -350,37 +138,40 @@ public class HistoryService extends AbstractComponent {
private class AlertHistoryRunnable implements Runnable {
private final AlertRecord entry;
private final FiredAlert alert;
private AlertHistoryRunnable(AlertRecord entry) {
this.entry = entry;
private AlertHistoryRunnable(FiredAlert alert) {
this.alert = alert;
}
@Override
public void run() {
try {
Alert alert = alertsStore.getAlert(entry.getName());
Alert alert = alertsStore.getAlert(this.alert.name());
if (alert == null) {
entry.setErrorMsg("Alert was not found in the alerts store");
updateHistoryEntry(entry, AlertActionState.ERROR);
this.alert.errorMsg("alert was not found in the alerts store");
this.alert.state(FiredAlert.State.FAILED);
historyStore.update(this.alert);
return;
}
updateHistoryEntry(entry, AlertActionState.SEARCH_UNDERWAY);
logger.debug("Running an alert action entry for [{}]", entry.getName());
AlertsService.AlertRun alertRun = alertsService.runAlert(entry);
entry.execution(alert, alertRun);
updateHistoryEntry(entry);
this.alert.state(FiredAlert.State.RUNNING);
historyStore.update(this.alert);
logger.debug("running an alert [{}]", this.alert.name());
AlertsService.AlertRun alertRun = alertsService.runAlert(this.alert);
this.alert.update(alert, alertRun);
historyStore.update(this.alert);
} catch (Exception e) {
if (started()) {
logger.warn("Failed to execute alert action", e);
logger.warn("failed to run alert [{}]", e, alert.name());
try {
entry.setErrorMsg(e.getMessage());
updateHistoryEntry(entry, AlertActionState.ERROR);
alert.errorMsg(e.getMessage());
alert.state(FiredAlert.State.FAILED);
historyStore.update(alert);
} catch (Exception e2) {
logger.error("Failed to update action history entry with the error message", e2);
logger.error("failed to update fired alert [{}] with the error message", e2, alert);
}
} else {
logger.debug("Failed to execute alert action after shutdown", e);
logger.debug("failed to execute fired alert [{}] after shutdown", e, alert);
}
}
}
@ -391,19 +182,19 @@ public class HistoryService extends AbstractComponent {
@Override
public void run() {
try {
logger.debug("Starting thread to read from the job queue");
logger.debug("starting thread to read from the job queue");
while (started()) {
AlertRecord entry = actionsToBeProcessed.take();
if (entry != null) {
threadPool.executor(AlertsPlugin.NAME).execute(new AlertHistoryRunnable(entry));
FiredAlert alert = actionsToBeProcessed.take();
if (alert != null) {
threadPool.executor(AlertsPlugin.NAME).execute(new AlertHistoryRunnable(alert));
}
}
} catch (Exception e) {
if (started()) {
logger.error("Error during reader thread, restarting queue reader thread...", e);
logger.error("error during reader thread, restarting queue reader thread...", e);
startQueueReaderThread();
} else {
logger.error("Error during reader thread", e);
logger.error("error during reader thread", e);
}
}
}

View File

@ -0,0 +1,174 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.alerts.history;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.action.admin.indices.refresh.RefreshResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.alerts.support.TemplateUtils;
import org.elasticsearch.alerts.support.init.proxy.ClientProxy;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.joda.time.DateTime;
import org.elasticsearch.common.joda.time.format.DateTimeFormat;
import org.elasticsearch.common.joda.time.format.DateTimeFormatter;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
/**
*/
public class HistoryStore extends AbstractComponent {
static final DateTimeFormatter alertHistoryIndexTimeFormat = DateTimeFormat.forPattern("YYYY-MM-dd");
static final String ALERT_HISTORY_INDEX_PREFIX = ".alert_history_";
static final String ALERT_HISTORY_TYPE = "alerthistory";
private final ClientProxy client;
private final TemplateUtils templateUtils;
private final int scrollSize;
private final TimeValue scrollTimeout;
private final FiredAlert.Parser alertRecordParser;
@Inject
public HistoryStore(Settings settings, ClientProxy client, TemplateUtils templateUtils, FiredAlert.Parser alertRecordParser) {
super(settings);
this.client = client;
this.templateUtils = templateUtils;
this.alertRecordParser = alertRecordParser;
this.scrollTimeout = settings.getAsTime("alerts.scroll.timeout", TimeValue.timeValueSeconds(30));
this.scrollSize = settings.getAsInt("alerts.scroll.size", 100);
}
public void put(FiredAlert firedAlert) throws HistoryException {
String alertHistoryIndex = getAlertHistoryIndexNameForTime(firedAlert.scheduledTime());
try {
IndexResponse response = client.prepareIndex(alertHistoryIndex, ALERT_HISTORY_TYPE, firedAlert.id())
.setSource(XContentFactory.jsonBuilder().value(firedAlert))
.setOpType(IndexRequest.OpType.CREATE)
.get();
firedAlert.version(response.getVersion());
} catch (IOException e) {
throw new HistoryException("persisting new fired alert [" + firedAlert + "] failed", e);
}
}
public void update(FiredAlert firedAlert) throws HistoryException {
logger.debug("updating fired alert [{}]", firedAlert);
try {
IndexResponse response = client.prepareIndex(getAlertHistoryIndexNameForTime(firedAlert.scheduledTime()), ALERT_HISTORY_TYPE, firedAlert.id())
.setSource(XContentFactory.jsonBuilder().value(firedAlert))
.get();
firedAlert.version(response.getVersion());
logger.debug("updated fired alert [{}]", firedAlert);
} catch (IOException e) {
throw new HistoryException("persisting fired alert [" + firedAlert + "] failed", e);
}
}
public LoadResult loadFiredAlerts(ClusterState state) {
String[] indices = state.metaData().concreteIndices(IndicesOptions.lenientExpandOpen(), ALERT_HISTORY_INDEX_PREFIX + "*");
if (indices.length == 0) {
logger.info("No previous .alerthistory index, skip loading of alert actions");
templateUtils.ensureIndexTemplateIsLoaded(state, "alerthistory");
return new LoadResult(true);
}
int numPrimaryShards = 0;
for (String index : indices) {
IndexMetaData indexMetaData = state.getMetaData().index(index);
if (indexMetaData != null) {
if (!state.routingTable().index(index).allPrimaryShardsActive()) {
logger.warn("Not all primary shards of the [{}] index are started. Schedule to retry alert action loading..", index);
return new LoadResult(false);
} else {
numPrimaryShards += indexMetaData.numberOfShards();
}
}
}
RefreshResponse refreshResponse = client.admin().indices().refresh(new RefreshRequest(ALERT_HISTORY_INDEX_PREFIX + "*")).actionGet();
if (refreshResponse.getSuccessfulShards() < numPrimaryShards) {
return new LoadResult(false);
}
SearchResponse response = client.prepareSearch(ALERT_HISTORY_INDEX_PREFIX + "*")
.setQuery(QueryBuilders.termQuery(FiredAlert.Parser.STATE_FIELD.getPreferredName(), FiredAlert.State.AWAITS_RUN.toString()))
.setSearchType(SearchType.SCAN)
.setScroll(scrollTimeout)
.setSize(scrollSize)
.setTypes(ALERT_HISTORY_TYPE)
.setPreference("_primary")
.get();
List<FiredAlert> alerts = new ArrayList<>();
try {
if (response.getTotalShards() != response.getSuccessfulShards()) {
return new LoadResult(false);
}
if (response.getHits().getTotalHits() > 0) {
response = client.prepareSearchScroll(response.getScrollId()).setScroll(scrollTimeout).get();
while (response.getHits().hits().length != 0) {
for (SearchHit sh : response.getHits()) {
String historyId = sh.getId();
FiredAlert historyEntry = alertRecordParser.parse(sh.getSourceRef(), historyId, sh.version());
assert historyEntry.state() == FiredAlert.State.AWAITS_RUN;
logger.debug("loaded fired alert from index [{}/{}/{}]", sh.index(), sh.type(), sh.id());
alerts.add(historyEntry);
}
response = client.prepareSearchScroll(response.getScrollId()).setScroll(scrollTimeout).get();
}
}
} finally {
client.prepareClearScroll().addScrollId(response.getScrollId()).get();
}
templateUtils.ensureIndexTemplateIsLoaded(state, "alerthistory");
return new LoadResult(true, alerts);
}
/**
* Calculates the correct alert history index name for a given time using alertHistoryIndexTimeFormat
*/
static String getAlertHistoryIndexNameForTime(DateTime time) {
return ALERT_HISTORY_INDEX_PREFIX + alertHistoryIndexTimeFormat.print(time);
}
public class LoadResult {
private final boolean succeeded;
private final List<FiredAlert> notRanFiredAlerts;
public LoadResult(boolean succeeded, List<FiredAlert> notRanFiredAlerts) {
this.succeeded = succeeded;
this.notRanFiredAlerts = notRanFiredAlerts;
}
public LoadResult(boolean succeeded) {
this.succeeded = succeeded;
this.notRanFiredAlerts = Collections.emptyList();
}
public boolean succeeded() {
return succeeded;
}
public List<FiredAlert> notRanFiredAlerts() {
return notRanFiredAlerts;
}
}
}

View File

@ -8,8 +8,8 @@ package org.elasticsearch.alerts;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.alerts.history.FiredAlert;
import org.elasticsearch.alerts.history.HistoryService;
import org.elasticsearch.alerts.actions.AlertActionState;
import org.elasticsearch.alerts.client.AlertsClient;
import org.elasticsearch.alerts.support.AlertUtils;
import org.elasticsearch.alerts.transport.actions.stats.AlertsStatsResponse;
@ -178,7 +178,7 @@ public abstract class AbstractAlertingTests extends ElasticsearchIntegrationTest
SearchResponse searchResponse = client().prepareSearch(HistoryService.ALERT_HISTORY_INDEX_PREFIX + "*")
.setIndicesOptions(IndicesOptions.lenientExpandOpen())
.setQuery(boolQuery().must(matchQuery("alert_name", alertName)).must(matchQuery("state", AlertActionState.ACTION_PERFORMED.toString())))
.setQuery(boolQuery().must(matchQuery("alert_name", alertName)).must(matchQuery("state", FiredAlert.State.ACTION_PERFORMED.toString())))
.get();
assertThat(searchResponse.getHits().getTotalHits(), greaterThanOrEqualTo(minimumExpectedAlertActionsWithActionPerformed));
if (assertTriggerSearchMatched) {
@ -191,7 +191,7 @@ public abstract class AbstractAlertingTests extends ElasticsearchIntegrationTest
protected long findNumberOfPerformedActions(String alertName) {
SearchResponse searchResponse = client().prepareSearch(HistoryService.ALERT_HISTORY_INDEX_PREFIX + "*")
.setIndicesOptions(IndicesOptions.lenientExpandOpen())
.setQuery(boolQuery().must(matchQuery("alert_name", alertName)).must(matchQuery("state", AlertActionState.ACTION_PERFORMED.toString())))
.setQuery(boolQuery().must(matchQuery("alert_name", alertName)).must(matchQuery("state", FiredAlert.State.ACTION_PERFORMED.toString())))
.get();
return searchResponse.getHits().getTotalHits();
}
@ -212,7 +212,7 @@ public abstract class AbstractAlertingTests extends ElasticsearchIntegrationTest
SearchResponse searchResponse = client().prepareSearch(HistoryService.ALERT_HISTORY_INDEX_PREFIX + "*")
.setIndicesOptions(IndicesOptions.lenientExpandOpen())
.setQuery(boolQuery().must(matchQuery("alert_name", alertName)).must(matchQuery("state", AlertActionState.NO_ACTION_NEEDED.toString())))
.setQuery(boolQuery().must(matchQuery("alert_name", alertName)).must(matchQuery("state", FiredAlert.State.NO_ACTION_NEEDED.toString())))
.get();
assertThat(searchResponse.getHits().getTotalHits(), greaterThanOrEqualTo(expectedAlertActionsWithNoActionNeeded));
}

View File

@ -10,14 +10,13 @@ import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.alerts.history.FiredAlert;
import org.elasticsearch.alerts.history.HistoryService;
import org.elasticsearch.alerts.actions.AlertActionState;
import org.elasticsearch.alerts.actions.IndexAlertAction;
import org.elasticsearch.alerts.client.AlertsClient;
import org.elasticsearch.alerts.transport.actions.ack.AckAlertResponse;
import org.elasticsearch.alerts.transport.actions.get.GetAlertResponse;
import org.elasticsearch.alerts.transport.actions.put.PutAlertResponse;
import org.elasticsearch.alerts.triggers.ScriptTrigger;
import org.elasticsearch.common.joda.time.DateTime;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ToXContent;
@ -51,7 +50,7 @@ public class AlertThrottleTests extends AbstractAlertingTests {
refresh();
Alert alert = new Alert();
alert.setStatus(Alert.Status.NOT_TRIGGERED);
alert.setStatus(Alert.Status.State.NOT_TRIGGERED);
alert.setTriggerSearchRequest(createTriggerSearchRequest("test-index").source(searchSource().query(matchAllQuery())));
alert.setTrigger(new ScriptTrigger("hits.total > 0", ScriptService.ScriptType.INLINE, "groovy"));
@ -106,7 +105,7 @@ public class AlertThrottleTests extends AbstractAlertingTests {
CountResponse countOfThrottledActions = client()
.prepareCount(HistoryService.ALERT_HISTORY_INDEX_PREFIX + "*")
.setQuery(QueryBuilders.matchQuery(HistoryService.STATE, AlertActionState.THROTTLED.toString()))
.setQuery(QueryBuilders.matchQuery(FiredAlert.Parser.STATE_FIELD.getPreferredName(), FiredAlert.State.THROTTLED.toString()))
.get();
assertThat(countOfThrottledActions.getCount(), greaterThan(0L));
}
@ -178,7 +177,7 @@ public class AlertThrottleTests extends AbstractAlertingTests {
CountResponse countOfThrottledActions = client()
.prepareCount(HistoryService.ALERT_HISTORY_INDEX_PREFIX + "*")
.setQuery(QueryBuilders.matchQuery(HistoryService.STATE, AlertActionState.THROTTLED.toString()))
.setQuery(QueryBuilders.matchQuery(FiredAlert.Parser.STATE_FIELD.getPreferredName(), FiredAlert.State.THROTTLED.toString()))
.get();
assertThat(countOfThrottledActions.getCount(), greaterThan(0L));
}

View File

@ -9,9 +9,9 @@ import org.elasticsearch.action.WriteConsistencyLevel;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.alerts.actions.AlertAction;
import org.elasticsearch.alerts.history.AlertRecord;
import org.elasticsearch.alerts.history.FiredAlert;
import org.elasticsearch.alerts.history.HistoryService;
import org.elasticsearch.alerts.actions.AlertActionState;
import org.elasticsearch.alerts.history.HistoryStore;
import org.elasticsearch.alerts.transport.actions.stats.AlertsStatsResponse;
import org.elasticsearch.alerts.triggers.ScriptTrigger;
import org.elasticsearch.common.bytes.BytesReference;
@ -76,14 +76,14 @@ public class BootStrapTest extends AbstractAlertingTests {
Alert.Status.NOT_ACKABLE);
DateTime scheduledFireTime = new DateTime();
AlertRecord entry = new AlertRecord(alert, scheduledFireTime, scheduledFireTime, AlertActionState.SEARCH_NEEDED);
String actionHistoryIndex = HistoryService.getAlertHistoryIndexNameForTime(scheduledFireTime);
FiredAlert entry = new FiredAlert(alert, scheduledFireTime, scheduledFireTime, FiredAlert.State.AWAITS_RUN);
String actionHistoryIndex = HistoryStore.getAlertHistoryIndexNameForTime(scheduledFireTime);
createIndex(actionHistoryIndex);
ensureGreen(actionHistoryIndex);
logger.info("Created index {}", actionHistoryIndex);
IndexResponse indexResponse = client().prepareIndex(actionHistoryIndex, HistoryService.ALERT_HISTORY_TYPE, entry.getId())
IndexResponse indexResponse = client().prepareIndex(actionHistoryIndex, HistoryService.ALERT_HISTORY_TYPE, entry.id())
.setConsistencyLevel(WriteConsistencyLevel.ALL)
.setSource(XContentFactory.jsonBuilder().value(entry))
.get();
@ -109,7 +109,7 @@ public class BootStrapTest extends AbstractAlertingTests {
for (int i = 0; i < numberOfAlertHistoryIndices; i++) {
DateTime historyIndexDate = now.minus((new TimeValue(i, TimeUnit.DAYS)).getMillis());
String actionHistoryIndex = HistoryService.getAlertHistoryIndexNameForTime(historyIndexDate);
String actionHistoryIndex = HistoryStore.getAlertHistoryIndexNameForTime(historyIndexDate);
createIndex(actionHistoryIndex);
ensureGreen(actionHistoryIndex);
logger.info("Created index {}", actionHistoryIndex);
@ -123,10 +123,10 @@ public class BootStrapTest extends AbstractAlertingTests {
new DateTime(),
0,
new TimeValue(0),
Alert.Status.NOT_ACKABLE);
Alert.Status.State.NOT_ACKABLE);
AlertRecord entry = new AlertRecord(alert, historyIndexDate, historyIndexDate, AlertActionState.SEARCH_NEEDED);
IndexResponse indexResponse = client().prepareIndex(actionHistoryIndex, HistoryService.ALERT_HISTORY_TYPE, entry.getId())
FiredAlert entry = new FiredAlert(alert, historyIndexDate, historyIndexDate, FiredAlert.State.AWAITS_RUN);
IndexResponse indexResponse = client().prepareIndex(actionHistoryIndex, HistoryService.ALERT_HISTORY_TYPE, entry.id())
.setConsistencyLevel(WriteConsistencyLevel.ALL)
.setSource(XContentFactory.jsonBuilder().value(entry))
.get();

View File

@ -12,7 +12,7 @@ import org.elasticsearch.action.search.ShardSearchFailure;
import org.elasticsearch.alerts.*;
import org.elasticsearch.alerts.client.AlertsClient;
import org.elasticsearch.alerts.history.HistoryService;
import org.elasticsearch.alerts.history.AlertRecord;
import org.elasticsearch.alerts.history.FiredAlert;
import org.elasticsearch.alerts.support.AlertUtils;
import org.elasticsearch.alerts.transport.actions.delete.DeleteAlertRequest;
import org.elasticsearch.alerts.transport.actions.delete.DeleteAlertResponse;
@ -69,34 +69,34 @@ public class AlertActionsTest extends AbstractAlertingTests {
XContentBuilder builder = jsonBuilder();
builder.startObject();
builder.field(HistoryService.ALERT_NAME_FIELD, "testName");
builder.field(HistoryService.TRIGGERED_FIELD, true);
builder.field(HistoryService.FIRE_TIME_FIELD, AlertUtils.dateTimeFormatter.printer().print(fireTime));
builder.field(HistoryService.SCHEDULED_FIRE_TIME_FIELD, AlertUtils.dateTimeFormatter.printer().print(scheduledFireTime));
builder.field(HistoryService.TRIGGER_FIELD, triggerMap);
builder.field(FiredAlert.Parser.ALERT_NAME_FIELD.getPreferredName(), "testName");
builder.field(FiredAlert.Parser.TRIGGERED_FIELD, true);
builder.field(FiredAlert.Parser.FIRE_TIME_FIELD.getPreferredName(), AlertUtils.dateTimeFormatter.printer().print(fireTime));
builder.field(FiredAlert.Parser.SCHEDULED_FIRE_TIME_FIELD, AlertUtils.dateTimeFormatter.printer().print(scheduledFireTime));
builder.field(FiredAlert.Parser.TRIGGER_FIELD.getPreferredName(), triggerMap);
SearchRequest searchRequest = new SearchRequest("test123");
builder.field(HistoryService.TRIGGER_REQUEST);
builder.field(FiredAlert.Parser.TRIGGER_REQUEST);
AlertUtils.writeSearchRequest(searchRequest, builder, ToXContent.EMPTY_PARAMS);
SearchResponse searchResponse = new SearchResponse(
new InternalSearchResponse(new InternalSearchHits(new InternalSearchHit[0], 10, 0), null, null, null, false, false),
null, 1, 1, 0, new ShardSearchFailure[0]
);
builder.startObject(HistoryService.TRIGGER_RESPONSE);
builder.startObject(FiredAlert.Parser.TRIGGER_RESPONSE.getPreferredName());
builder.value(searchResponse);
builder.endObject();
builder.field(HistoryService.ACTIONS_FIELD, actionMap);
builder.field(HistoryService.STATE, AlertActionState.SEARCH_NEEDED.toString());
builder.field(FiredAlert.Parser.ACTIONS_FIELD.getPreferredName(), actionMap);
builder.field(FiredAlert.Parser.STATE_FIELD.getPreferredName(), FiredAlert.State.AWAITS_RUN.toString());
builder.endObject();
final ActionRegistry alertActionRegistry = internalTestCluster().getInstance(ActionRegistry.class, internalTestCluster().getMasterName());
final HistoryService alertManager = internalTestCluster().getInstance(HistoryService.class, internalTestCluster().getMasterName());
AlertRecord actionEntry = alertManager.parseHistory("foobar", builder.bytes(), 0, alertActionRegistry);
assertEquals(actionEntry.getVersion(), 0);
assertEquals(actionEntry.getName(), "testName");
FiredAlert actionEntry = alertManager.parseHistory("foobar", builder.bytes(), 0, alertActionRegistry);
assertEquals(actionEntry.version(), 0);
assertEquals(actionEntry.name(), "testName");
assertEquals(actionEntry.isTriggered(), true);
assertEquals(actionEntry.getScheduledTime(), scheduledFireTime);
assertEquals(actionEntry.getFireTime(), fireTime);
assertEquals(actionEntry.getState(), AlertActionState.SEARCH_NEEDED);
assertEquals(actionEntry.scheduledTime(), scheduledFireTime);
assertEquals(actionEntry.fireTime(), fireTime);
assertEquals(actionEntry.state(), FiredAlert.State.AWAITS_RUN);
assertEquals(XContentMapValues.extractValue("hits.total", actionEntry.getTriggerResponse()), 10);
}
@ -134,7 +134,7 @@ public class AlertActionsTest extends AbstractAlertingTests {
}
};
AlertActionRegistry alertActionRegistry = internalTestCluster().getInstance(AlertActionRegistry.class, internalTestCluster().getMasterName());
ActionRegistry alertActionRegistry = internalTestCluster().getInstance(ActionRegistry.class, internalTestCluster().getMasterName());
alertActionRegistry.registerAction("test", new AlertActionFactory() {
@Override
public AlertAction createAction(XContentParser parser) throws IOException {