diff --git a/src/main/java/org/elasticsearch/alerts/AlertsModule.java b/src/main/java/org/elasticsearch/alerts/AlertsModule.java index f5d21632c39..67207dfa1b9 100644 --- a/src/main/java/org/elasticsearch/alerts/AlertsModule.java +++ b/src/main/java/org/elasticsearch/alerts/AlertsModule.java @@ -8,7 +8,7 @@ package org.elasticsearch.alerts; import org.elasticsearch.alerts.actions.ActionModule; import org.elasticsearch.alerts.client.AlertsClientModule; -import org.elasticsearch.alerts.history.HistoryService; +import org.elasticsearch.alerts.history.HistoryModule; import org.elasticsearch.alerts.payload.PayloadModule; import org.elasticsearch.alerts.rest.AlertsRestModule; import org.elasticsearch.alerts.scheduler.SchedulerModule; @@ -34,7 +34,8 @@ public class AlertsModule extends AbstractModule implements SpawnModules { new SchedulerModule(), new AlertsTransportModule(), new TriggerModule(), - new ActionModule()); + new ActionModule(), + new HistoryModule()); } @Override @@ -44,7 +45,6 @@ public class AlertsModule extends AbstractModule implements SpawnModules { bind(AlertsService.class).asEagerSingleton(); bind(AlertsStore.class).asEagerSingleton(); bind(TemplateUtils.class).asEagerSingleton(); - bind(HistoryService.class).asEagerSingleton(); bind(ConfigurationService.class).asEagerSingleton(); } diff --git a/src/main/java/org/elasticsearch/alerts/AlertsService.java b/src/main/java/org/elasticsearch/alerts/AlertsService.java index 7600f4169dc..0f7899fa069 100644 --- a/src/main/java/org/elasticsearch/alerts/AlertsService.java +++ b/src/main/java/org/elasticsearch/alerts/AlertsService.java @@ -10,7 +10,7 @@ import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchIllegalStateException; import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.alerts.actions.Action; -import org.elasticsearch.alerts.history.AlertRecord; +import org.elasticsearch.alerts.history.FiredAlert; import org.elasticsearch.alerts.history.HistoryService; import org.elasticsearch.alerts.scheduler.Scheduler; import org.elasticsearch.alerts.throttle.Throttler; @@ -127,7 +127,7 @@ public class AlertsService extends AbstractComponent { * This does the necessary actions, so we don't lose the fact that an alert got execute from the {@link org.elasticsearch.alerts.scheduler.Scheduler} * It writes the an entry in the alert history index with the proper status for this alert. * - * The rest of the actions happen in {@link #runAlert(org.elasticsearch.alerts.history.AlertRecord)}. + * The rest of the actions happen in {@link #runAlert(org.elasticsearch.alerts.history.FiredAlert)}. * * The reason the executing of the alert is split into two, is that we don't want to lose the fact that an alert has * fired. If we were @@ -143,7 +143,7 @@ public class AlertsService extends AbstractComponent { } try { - historyService.addAlertAction(alert, scheduledFireTime, fireTime); + historyService.alertFired(alert, scheduledFireTime, fireTime); } catch (Exception e) { logger.error("Failed to schedule alert action for [{}]", e, alert); } @@ -159,42 +159,42 @@ public class AlertsService extends AbstractComponent { * 3) If the alert has been triggered, checks if the alert should be throttled * 4) If the alert hasn't been throttled runs the configured actions */ - public AlertRun runAlert(AlertRecord entry) throws IOException { + public AlertRun runAlert(FiredAlert entry) throws IOException { ensureStarted(); - alertLock.acquire(entry.getName()); + alertLock.acquire(entry.name()); try { - Alert alert = alertsStore.getAlert(entry.getName()); + Alert alert = alertsStore.getAlert(entry.name()); if (alert == null) { throw new ElasticsearchException("Alert is not available"); } Trigger trigger = alert.trigger(); - Trigger.Result triggerResult = trigger.execute(alert, entry.getScheduledTime(), entry.getFireTime()); + Trigger.Result triggerResult = trigger.execute(alert, entry.scheduledTime(), entry.fireTime()); AlertRun alertRun = null; if (triggerResult.triggered()) { - alert.status().triggered(true, entry.getFireTime()); + alert.status().triggered(true, entry.fireTime()); Throttler.Result throttleResult = alert.throttler().throttle(alert, triggerResult); if (!throttleResult.throttle()) { - Map data = alert.payload().execute(alert, triggerResult, entry.getScheduledTime(), entry.getFireTime()); - alertRun = new AlertRun(triggerResult, data); + Map data = alert.payload().execute(alert, triggerResult, entry.scheduledTime(), entry.fireTime()); + alertRun = new AlertRun(triggerResult, throttleResult, data); for (Action action : alert.actions()){ Action.Result actionResult = action.execute(alert, data); //TODO : process action result, what to do if just one action fails or throws exception ? } - alert.status().executed(entry.getScheduledTime()); + alert.status().executed(entry.scheduledTime()); } else { - alert.status().throttled(entry.getFireTime(), throttleResult.reason()); + alert.status().throttled(entry.fireTime(), throttleResult.reason()); } } else { - alert.status().triggered(false, entry.getFireTime()); + alert.status().triggered(false, entry.fireTime()); } if (alertRun == null) { - alertRun = new AlertRun(triggerResult, null); + alertRun = new AlertRun(triggerResult, null, null); } - alert.status().ran(entry.getFireTime()); + alert.status().ran(entry.fireTime()); alertsStore.updateAlert(alert); return alertRun; } finally { - alertLock.release(entry.getName()); + alertLock.release(entry.name()); } } @@ -355,16 +355,22 @@ public class AlertsService extends AbstractComponent { public static class AlertRun { - private final Trigger.Result result; + private final Trigger.Result triggerResult; + private final Throttler.Result throttleResult; private final Map data; - public AlertRun(Trigger.Result result, Map data) { - this.result = result; + public AlertRun(Trigger.Result triggerResult, Throttler.Result throttleResult, Map data) { + this.triggerResult = triggerResult; + this.throttleResult = throttleResult; this.data = data; } public Trigger.Result triggerResult() { - return result; + return triggerResult; + } + + public Throttler.Result throttleResult() { + return throttleResult; } public Map data() { diff --git a/src/main/java/org/elasticsearch/alerts/actions/Action.java b/src/main/java/org/elasticsearch/alerts/actions/Action.java index 17cccfd8cce..5f17c6899f9 100644 --- a/src/main/java/org/elasticsearch/alerts/actions/Action.java +++ b/src/main/java/org/elasticsearch/alerts/actions/Action.java @@ -55,7 +55,7 @@ public abstract class Action implements ToXContent { - protected static abstract class Result implements ToXContent { + public static abstract class Result implements ToXContent { private final boolean success; diff --git a/src/main/java/org/elasticsearch/alerts/actions/AlertActionState.java b/src/main/java/org/elasticsearch/alerts/actions/AlertActionState.java deleted file mode 100644 index 44bdf7f11ae..00000000000 --- a/src/main/java/org/elasticsearch/alerts/actions/AlertActionState.java +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ -package org.elasticsearch.alerts.actions; - -import org.elasticsearch.ElasticsearchIllegalArgumentException; - -/** - */ -public enum AlertActionState { - - SEARCH_NEEDED, - SEARCH_UNDERWAY, - NO_ACTION_NEEDED, - ACTION_PERFORMED, - ERROR, - THROTTLED; - - @Override - public String toString(){ - switch (this) { - case SEARCH_NEEDED: - return "SEARCH_NEEDED"; - case SEARCH_UNDERWAY: - return "SEARCH_UNDERWAY"; - case NO_ACTION_NEEDED: - return "NO_ACTION_NEEDED"; - case ACTION_PERFORMED: - return "ACTION_PERFORMED"; - case ERROR: - return "ERROR"; - case THROTTLED: - return "THROTTLED"; - default: - return "NO_ACTION_NEEDED"; - } - } - - public static AlertActionState fromString(String s) { - switch(s.toUpperCase()) { - case "SEARCH_NEEDED": - return SEARCH_NEEDED; - case "SEARCH_UNDERWAY": - return SEARCH_UNDERWAY; - case "NO_ACTION_NEEDED": - return NO_ACTION_NEEDED; - case "ACTION_UNDERWAY": - return ACTION_PERFORMED; - case "ERROR": - return ERROR; - case "THROTTLED": - return THROTTLED; - default: - throw new ElasticsearchIllegalArgumentException("Unknown value [" + s + "] for AlertHistoryState" ); - } - } -} diff --git a/src/main/java/org/elasticsearch/alerts/history/AlertRecord.java b/src/main/java/org/elasticsearch/alerts/history/AlertRecord.java deleted file mode 100644 index 63f40315150..00000000000 --- a/src/main/java/org/elasticsearch/alerts/history/AlertRecord.java +++ /dev/null @@ -1,270 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ -package org.elasticsearch.alerts.history; - -import org.elasticsearch.alerts.Alert; -import org.elasticsearch.alerts.AlertsService; -import org.elasticsearch.alerts.actions.AlertAction; -import org.elasticsearch.alerts.actions.AlertActionState; -import org.elasticsearch.alerts.actions.AlertActions; -import org.elasticsearch.alerts.support.AlertUtils; -import org.elasticsearch.alerts.support.Payload; -import org.elasticsearch.alerts.trigger.Trigger; -import org.elasticsearch.common.joda.time.DateTime; -import org.elasticsearch.common.xcontent.ToXContent; -import org.elasticsearch.common.xcontent.XContentBuilder; -import org.elasticsearch.common.xcontent.XContentType; - -import java.io.IOException; -import java.util.List; -import java.util.Map; - -/** - * An alert history is an event of an alert that fired on particular moment in time. - */ -public class AlertRecord implements ToXContent { - - private String id; - private String name; - private DateTime fireTime; - private DateTime scheduledTime; - private Trigger trigger; - private AlertActions actions; - private AlertActionState state; - - - /*Optional*/ - private Trigger.Result triggerResult; - private Payload payload; - private String errorMsg; - private Map metadata; - private transient long version; - private transient XContentType contentType; - - AlertRecord() { - } - - public AlertRecord(Alert alert, DateTime scheduledTime, DateTime fireTime, AlertActionState state) throws IOException { - this.id = alert.getName() + "#" + scheduledTime.toDateTimeISO(); - this.name = alert.getName(); - this.fireTime = fireTime; - this.scheduledTime = scheduledTime; - this.trigger = alert.getTrigger(); - this.actions = alert.getActions(); - this.state = state; - this.metadata = alert.getMetadata(); - this.version = 1; - this.contentType = alert.getContentType(); - } - - /** - * @return The unique id of the alert action entry - */ - public String getId() { - return id; - } - - void setId(String id) { - this.id = id; - } - - public void execution(Alert alert, AlertsService.AlertRun alertRun) { - triggerResult = alertRun.evaluation(); - if (triggerResult.triggered()) { - if (triggerResult.throttled()) { - if (alert.getStatus() != Alert.Status.NOT_TRIGGERED) { - state = AlertActionState.THROTTLED; - } - } else if (state != AlertActionState.THROTTLED) { - state = AlertActionState.ACTION_PERFORMED; - } - payload = alertRun.payload(); - } else { - state = AlertActionState.NO_ACTION_NEEDED; - } - } - - /** - * @return The time the alert was scheduled to be triggered - */ - public DateTime getScheduledTime() { - return scheduledTime; - } - - void setScheduledTime(DateTime scheduledTime) { - this.scheduledTime = scheduledTime; - } - - /** - * @return The name of the alert that triggered - */ - public String getName() { - return name; - } - - void setName(String name) { - this.name = name; - } - - /** - * @return The time the alert actually ran. - */ - public DateTime getFireTime() { - return fireTime; - } - - void setFireTime(DateTime fireTime) { - this.fireTime = fireTime; - } - - /** - * @return The trigger that evaluated the search response - */ - public Trigger getTrigger() { - return trigger; - } - - void setTrigger(Trigger trigger) { - this.trigger = trigger; - } - - public Trigger.Result triggerEvaluation() { - return triggerResult; - } - - public void triggerEvaluation(Trigger.Result result) { - this.triggerResult = result; - } - - /** - * @return The list of actions that ran if the search response matched with the trigger - */ - public List getActions() { - return actions; - } - - void setActions(AlertActions actions) { - this.actions = actions; - } - - /** - * @return The current state of the alert event. - */ - public AlertActionState getState() { - return state; - } - - void setState(AlertActionState state) { - this.state = state; - } - - public long getVersion() { - return version; - } - - void setVersion(long version) { - this.version = version; - } - - /** - * @return xcontext type of the _source of this action entry. - */ - public XContentType getContentType() { - return contentType; - } - - void setContentType(XContentType contentType) { - this.contentType = contentType; - } - - /** - * @return The error if an error occurred otherwise null - */ - public String getErrorMsg(){ - return this.errorMsg; - } - - void setErrorMsg(String errorMsg) { - this.errorMsg = errorMsg; - } - - /** - * @return The metadata that was associated with the alert when this entry was created - */ - public Map getMetadata() { - return metadata; - } - - void setMetadata(Map metadata) { - this.metadata = metadata; - } - - public Payload payload() { - return payload; - } - - public void payload(Payload payload) { - this.payload = payload; - } - - - @Override - public XContentBuilder toXContent(XContentBuilder historyEntry, Params params) throws IOException { - historyEntry.startObject(); - historyEntry.field("alert_name", name); - historyEntry.field("triggered", triggerResult.triggered()); - historyEntry.field("fire_time", fireTime.toDateTimeISO()); - historyEntry.field(HistoryService.SCHEDULED_FIRE_TIME_FIELD, scheduledTime.toDateTimeISO()); - historyEntry.field("trigger"); - historyEntry.startObject(); - historyEntry.field(trigger.type(), trigger, params); - historyEntry.endObject(); - historyEntry.field("trigger_response", triggerResult.data()); - - - if (payload != null) { - historyEntry.field("payload_request"); - AlertUtils.writeSearchRequest(payload.request(), historyEntry, params); - historyEntry.field("payload_response", payload.data()); - } - - historyEntry.startObject("actions"); - for (AlertAction action : actions) { - historyEntry.field(action.getActionName()); - action.toXContent(historyEntry, params); - } - historyEntry.endObject(); - historyEntry.field(HistoryService.STATE, state.toString()); - - if (errorMsg != null) { - historyEntry.field("error_msg", errorMsg); - } - - if (metadata != null) { - historyEntry.field("meta", metadata); - } - - historyEntry.endObject(); - - return historyEntry; - } - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - - AlertRecord entry = (AlertRecord) o; - if (!id.equals(entry.id)) return false; - - return true; - } - - @Override - public int hashCode() { - return id.hashCode(); - } -} diff --git a/src/main/java/org/elasticsearch/alerts/history/FiredAlert.java b/src/main/java/org/elasticsearch/alerts/history/FiredAlert.java new file mode 100644 index 00000000000..e7cea346abd --- /dev/null +++ b/src/main/java/org/elasticsearch/alerts/history/FiredAlert.java @@ -0,0 +1,352 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.alerts.history; + +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.ElasticsearchIllegalArgumentException; +import org.elasticsearch.alerts.Alert; +import org.elasticsearch.alerts.AlertsService; +import org.elasticsearch.alerts.actions.ActionRegistry; +import org.elasticsearch.alerts.actions.AlertActions; +import org.elasticsearch.alerts.payload.Payload; +import org.elasticsearch.alerts.payload.PayloadRegistry; +import org.elasticsearch.alerts.trigger.Trigger; +import org.elasticsearch.alerts.trigger.TriggerRegistry; +import org.elasticsearch.common.ParseField; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.component.AbstractComponent; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.joda.time.DateTime; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentHelper; +import org.elasticsearch.common.xcontent.XContentParser; + +import java.io.IOException; +import java.util.Map; + +public class FiredAlert implements ToXContent { + + private String id; + private String name; + private DateTime fireTime; + private DateTime scheduledTime; + private Trigger trigger; + private AlertActions actions; + private State state; + + /*Optional*/ + private Payload payload; + private String errorMessage; + private Map metadata; + + // During an fired alert execution we use this and then we store it with the history, after that we don't use it. + // We store it because it may end up being useful for debug / history purposes + private transient AlertsService.AlertRun alertRun; + // Used for assertion purposes, so we can ensure/test what we have loaded in memory is the same as what is persisted. + private transient long version; + + FiredAlert() { + } + + public FiredAlert(Alert alert, DateTime scheduledTime, DateTime fireTime, State state) { + this.id = alert.name() + "#" + scheduledTime.toDateTimeISO(); + this.name = alert.name(); + this.fireTime = fireTime; + this.scheduledTime = scheduledTime; + this.trigger = alert.trigger(); + this.actions = alert.actions(); + this.state = state; + this.metadata = alert.metadata(); + this.version = 1; + } + + public String id() { + return id; + } + + public void id(String id) { + this.id = id; + } + + // TODO: Maybe pull into history service? + public void update(Alert alert, AlertsService.AlertRun alertRun) { + this.alertRun = alertRun; + if (alertRun.triggerResult().triggered()) { + if (alertRun.throttleResult().throttle()) { + if (alert.status().state() != Alert.Status.State.NOT_EXECUTED) { + state = State.THROTTLED; + } + } else if (state != State.THROTTLED) { + state = State.ACTION_PERFORMED; + } + payload = alert.payload(); + } else { + state = State.NO_ACTION_NEEDED; + } + } + + public DateTime scheduledTime() { + return scheduledTime; + } + + public void scheduledTime(DateTime scheduledTime) { + this.scheduledTime = scheduledTime; + } + + public String name() { + return name; + } + + public void name(String name) { + this.name = name; + } + + public DateTime fireTime() { + return fireTime; + } + + public void fireTime(DateTime fireTime) { + this.fireTime = fireTime; + } + + public Trigger trigger() { + return trigger; + } + + public void trigger(Trigger trigger) { + this.trigger = trigger; + } + + public AlertActions actions() { + return actions; + } + + public void actions(AlertActions actions) { + this.actions = actions; + } + + public State state() { + return state; + } + + public void state(State state) { + this.state = state; + } + + public long version() { + return version; + } + + public void version(long version) { + this.version = version; + } + + public String errorMessage(){ + return this.errorMessage; + } + + public void errorMsg(String errorMessage) { + this.errorMessage = errorMessage; + } + + public Map metadata() { + return metadata; + } + + public void metadata(Map metadata) { + this.metadata = metadata; + } + + public Payload payload() { + return payload; + } + + public void payload(Payload payload) { + this.payload = payload; + } + + @Override + public XContentBuilder toXContent(XContentBuilder historyEntry, Params params) throws IOException { + historyEntry.startObject(); + historyEntry.field(Parser.ALERT_NAME_FIELD.getPreferredName(), name); + historyEntry.field(Parser.FIRE_TIME_FIELD.getPreferredName(), fireTime.toDateTimeISO()); + historyEntry.field(Parser.SCHEDULED_FIRE_TIME_FIELD.getPreferredName(), scheduledTime.toDateTimeISO()); + historyEntry.field(Parser.TRIGGER_FIELD.getPreferredName(), trigger, params); + historyEntry.field(Parser.ACTIONS_FIELD.getPreferredName(), actions, params); + historyEntry.field(Parser.STATE_FIELD.getPreferredName(), state.toString()); + + if (payload != null) { + historyEntry.field(Parser.PAYLOAD_FIELD.getPreferredName(), payload, params); + } + if (errorMessage != null) { + historyEntry.field(Parser.ERROR_MESSAGE_FIELD.getPreferredName(), errorMessage); + } + if (metadata != null) { + historyEntry.field(Parser.METADATA_FIELD.getPreferredName(), metadata); + } + // TODO: maybe let AlertRun implement ToXContent? + if (alertRun != null) { + historyEntry.field(Parser.TRIGGER_RESPONSE.getPreferredName(), alertRun.triggerResult().data()); + historyEntry.field(Parser.PAYLOAD_RESPONSE.getPreferredName(), alertRun.data()); + } + + historyEntry.endObject(); + return historyEntry; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + FiredAlert entry = (FiredAlert) o; + if (!id.equals(entry.id)) return false; + + return true; + } + + @Override + public int hashCode() { + return id.hashCode(); + } + + @Override + public String toString() { + return id; + } + + public enum State { + + AWAITS_RUN, + RUNNING, + NO_ACTION_NEEDED, + ACTION_PERFORMED, + FAILED, + THROTTLED; + + @Override + public String toString(){ + switch (this) { + case AWAITS_RUN: + return "AWAITS_RUN"; + case RUNNING: + return "RUNNING"; + case NO_ACTION_NEEDED: + return "NO_ACTION_NEEDED"; + case ACTION_PERFORMED: + return "ACTION_PERFORMED"; + case FAILED: + return "FAILED"; + case THROTTLED: + return "THROTTLED"; + default: + return "NO_ACTION_NEEDED"; + } + } + + public static State fromString(String s) { + switch(s.toUpperCase()) { + case "AWAITS_RUN": + return AWAITS_RUN; + case "RUNNING": + return RUNNING; + case "NO_ACTION_NEEDED": + return NO_ACTION_NEEDED; + case "ACTION_UNDERWAY": + return ACTION_PERFORMED; + case "FAILED": + return FAILED; + case "THROTTLED": + return THROTTLED; + default: + throw new ElasticsearchIllegalArgumentException("Unknown value [" + s + "] for AlertHistoryState" ); + } + } + + } + + public static class Parser extends AbstractComponent { + + public static final ParseField ALERT_NAME_FIELD = new ParseField("alert_name"); + public static final ParseField FIRE_TIME_FIELD = new ParseField("fire_time"); + public static final ParseField SCHEDULED_FIRE_TIME_FIELD = new ParseField("scheduled_fire_time"); + public static final ParseField ERROR_MESSAGE_FIELD = new ParseField("error_msg"); + public static final ParseField TRIGGER_FIELD = new ParseField("trigger"); + public static final ParseField TRIGGER_RESPONSE = new ParseField("trigger_response"); + public static final ParseField PAYLOAD_FIELD = new ParseField("payload_request"); + public static final ParseField PAYLOAD_RESPONSE = new ParseField("payload_response"); + public static final ParseField ACTIONS_FIELD = new ParseField("actions"); + public static final ParseField STATE_FIELD = new ParseField("state"); + public static final ParseField METADATA_FIELD = new ParseField("meta"); + + private final TriggerRegistry triggerRegistry; + private final PayloadRegistry payloadRegistry; + private final ActionRegistry actionRegistry; + + @Inject + public Parser(Settings settings, TriggerRegistry triggerRegistry, PayloadRegistry payloadRegistry, ActionRegistry actionRegistry) { + super(settings); + this.triggerRegistry = triggerRegistry; + this.payloadRegistry = payloadRegistry; + this.actionRegistry = actionRegistry; + } + + public FiredAlert parse(BytesReference source, String historyId, long version) { + try (XContentParser parser = XContentHelper.createParser(source)) { + return parse(parser, historyId, version); + } catch (IOException e) { + throw new ElasticsearchException("Error during parsing alert record", e); + } + } + + public FiredAlert parse(XContentParser parser, String id, long version) throws IOException { + FiredAlert entry = new FiredAlert(); + entry.id(id); + entry.version(version); + + String currentFieldName = null; + XContentParser.Token token = parser.nextToken(); + assert token == XContentParser.Token.START_OBJECT; + while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { + if (token == XContentParser.Token.FIELD_NAME) { + currentFieldName = parser.currentName(); + } else if (token == XContentParser.Token.START_OBJECT) { + if (ACTIONS_FIELD.match(currentFieldName)) { + entry.actions(actionRegistry.parseActions(parser)); + } else if (TRIGGER_FIELD.match(currentFieldName)) { + entry.trigger(triggerRegistry.parse(parser)); + } else if (PAYLOAD_FIELD.match(currentFieldName)) { + entry.payload(payloadRegistry.parse(parser)); + } else if (METADATA_FIELD.match(currentFieldName)) { + entry.metadata(parser.map()); + } else { + throw new ElasticsearchIllegalArgumentException("Unexpected field [" + currentFieldName + "]"); + } + } else if (token.isValue()) { + if (ALERT_NAME_FIELD.match(currentFieldName)) { + entry.name(parser.text()); + } else if (FIRE_TIME_FIELD.match(currentFieldName)) { + entry.fireTime(DateTime.parse(parser.text())); + } else if (SCHEDULED_FIRE_TIME_FIELD.match(currentFieldName)) { + entry.scheduledTime(DateTime.parse(parser.text())); + } else if (ERROR_MESSAGE_FIELD.match(currentFieldName)) { + entry.errorMsg(parser.textOrNull()); + } else if (STATE_FIELD.match(currentFieldName)) { + entry.state(State.fromString(parser.text())); + } else { + throw new ElasticsearchIllegalArgumentException("Unexpected field [" + currentFieldName + "]"); + } + } else { + throw new ElasticsearchIllegalArgumentException("Unexpected token [" + token + "] for [" + currentFieldName + "]"); + } + } + + return entry; + } + } +} diff --git a/src/main/java/org/elasticsearch/alerts/history/HistoryException.java b/src/main/java/org/elasticsearch/alerts/history/HistoryException.java new file mode 100644 index 00000000000..17dc1810c2a --- /dev/null +++ b/src/main/java/org/elasticsearch/alerts/history/HistoryException.java @@ -0,0 +1,21 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.alerts.history; + +import org.elasticsearch.alerts.AlertsException; + +/** + */ +public class HistoryException extends AlertsException { + + public HistoryException(String msg) { + super(msg); + } + + public HistoryException(String msg, Throwable cause) { + super(msg, cause); + } +} diff --git a/src/main/java/org/elasticsearch/alerts/history/HistoryModule.java b/src/main/java/org/elasticsearch/alerts/history/HistoryModule.java new file mode 100644 index 00000000000..6b015239c7d --- /dev/null +++ b/src/main/java/org/elasticsearch/alerts/history/HistoryModule.java @@ -0,0 +1,20 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.alerts.history; + +import org.elasticsearch.common.inject.AbstractModule; + +/** + */ +public class HistoryModule extends AbstractModule { + + @Override + protected void configure() { + bind(FiredAlert.Parser.class).asEagerSingleton(); + bind(HistoryStore.class).asEagerSingleton(); + bind(HistoryService.class).asEagerSingleton(); + } +} diff --git a/src/main/java/org/elasticsearch/alerts/history/HistoryService.java b/src/main/java/org/elasticsearch/alerts/history/HistoryService.java index 006353b333c..291bb4db6d6 100644 --- a/src/main/java/org/elasticsearch/alerts/history/HistoryService.java +++ b/src/main/java/org/elasticsearch/alerts/history/HistoryService.java @@ -5,45 +5,19 @@ */ package org.elasticsearch.alerts.history; -import org.elasticsearch.ElasticsearchException; -import org.elasticsearch.ElasticsearchIllegalArgumentException; import org.elasticsearch.ElasticsearchIllegalStateException; -import org.elasticsearch.action.admin.indices.refresh.RefreshRequest; -import org.elasticsearch.action.admin.indices.refresh.RefreshResponse; -import org.elasticsearch.action.index.IndexRequest; -import org.elasticsearch.action.index.IndexResponse; -import org.elasticsearch.action.search.SearchResponse; -import org.elasticsearch.action.search.SearchType; -import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.alerts.Alert; import org.elasticsearch.alerts.AlertsPlugin; import org.elasticsearch.alerts.AlertsService; import org.elasticsearch.alerts.AlertsStore; -import org.elasticsearch.alerts.actions.ActionRegistry; -import org.elasticsearch.alerts.actions.AlertActionState; -import org.elasticsearch.alerts.support.AlertUtils; -import org.elasticsearch.alerts.support.TemplateUtils; -import org.elasticsearch.alerts.support.init.proxy.ClientProxy; -import org.elasticsearch.alerts.trigger.TriggerRegistry; import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.metadata.IndexMetaData; -import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.joda.time.DateTime; -import org.elasticsearch.common.joda.time.format.DateTimeFormat; -import org.elasticsearch.common.joda.time.format.DateTimeFormatter; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.EsExecutors; -import org.elasticsearch.common.xcontent.XContentFactory; -import org.elasticsearch.common.xcontent.XContentHelper; -import org.elasticsearch.common.xcontent.XContentParser; -import org.elasticsearch.index.query.QueryBuilders; -import org.elasticsearch.search.SearchHit; import org.elasticsearch.threadpool.ThreadPool; -import java.io.IOException; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicBoolean; @@ -53,52 +27,22 @@ import java.util.concurrent.atomic.AtomicLong; */ public class HistoryService extends AbstractComponent { - public static final String ALERT_NAME_FIELD = "alert_name"; - public static final String TRIGGERED_FIELD = "triggered"; - public static final String FIRE_TIME_FIELD = "fire_time"; - public static final String SCHEDULED_FIRE_TIME_FIELD = "scheduled_fire_time"; - public static final String ERROR_MESSAGE = "error_msg"; - public static final String TRIGGER_FIELD = "trigger"; - public static final String TRIGGER_REQUEST = "trigger_request"; - public static final String TRIGGER_RESPONSE = "trigger_response"; - public static final String PAYLOAD_REQUEST = "payload_request"; - public static final String PAYLOAD_RESPONSE = "payload_response"; - public static final String ACTIONS_FIELD = "actions"; - public static final String STATE = "state"; - public static final String METADATA = "meta"; - - public static final String ALERT_HISTORY_INDEX_PREFIX = ".alert_history_"; - public static final DateTimeFormatter alertHistoryIndexTimeFormat = DateTimeFormat.forPattern("YYYY-MM-dd"); - public static final String ALERT_HISTORY_TYPE = "alerthistory"; - - private final ClientProxy client; + private final HistoryStore historyStore; private AlertsService alertsService; private final ThreadPool threadPool; private final AlertsStore alertsStore; - private final TriggerRegistry triggerRegistry; - private final TemplateUtils templateUtils; - - private final int scrollSize; - private final TimeValue scrollTimeout; private final AtomicLong largestQueueSize = new AtomicLong(0); private final AtomicBoolean started = new AtomicBoolean(false); - private final BlockingQueue actionsToBeProcessed = new LinkedBlockingQueue<>(); + private final BlockingQueue actionsToBeProcessed = new LinkedBlockingQueue<>(); private volatile Thread queueReaderThread; @Inject - public HistoryService(Settings settings, ClientProxy client, ThreadPool threadPool, AlertsStore alertsStore, - TriggerRegistry triggerRegistry, TemplateUtils templateUtils) { + public HistoryService(Settings settings, HistoryStore historyStore, ThreadPool threadPool, AlertsStore alertsStore) { super(settings); - this.client = client; + this.historyStore = historyStore; this.threadPool = threadPool; this.alertsStore = alertsStore; - this.triggerRegistry = triggerRegistry; - this.templateUtils = templateUtils; - // Not using component settings, to let AlertsStore and AlertActionManager share the same settings - this.scrollTimeout = settings.getAsTime("alerts.scroll.timeout", TimeValue.timeValueSeconds(30)); - this.scrollSize = settings.getAsInt("alerts.scroll.size", 100); - } public void setAlertsService(AlertsService alertsService){ @@ -110,36 +54,19 @@ public class HistoryService extends AbstractComponent { return true; } - String[] indices = state.metaData().concreteIndices(IndicesOptions.lenientExpandOpen(), ALERT_HISTORY_INDEX_PREFIX + "*"); - if (indices.length == 0) { - logger.info("No previous .alerthistory index, skip loading of alert actions"); - templateUtils.ensureIndexTemplateIsLoaded(state, "alerthistory"); + assert actionsToBeProcessed.isEmpty() : "Queue should be empty, but contains " + actionsToBeProcessed.size() + " elements."; + HistoryStore.LoadResult loadResult = historyStore.loadFiredAlerts(state); + if (loadResult.succeeded()) { + if (!loadResult.notRanFiredAlerts().isEmpty()) { + actionsToBeProcessed.addAll(loadResult.notRanFiredAlerts()); + logger.debug("Loaded [{}] actions from the alert history index into actions queue", actionsToBeProcessed.size()); + largestQueueSize.set(actionsToBeProcessed.size()); + } doStart(); return true; - } - int numPrimaryShards = 0; - for (String index : indices) { - IndexMetaData indexMetaData = state.getMetaData().index(index); - if (indexMetaData != null) { - if (!state.routingTable().index(index).allPrimaryShardsActive()) { - logger.warn("Not all primary shards of the [{}] index are started. Schedule to retry alert action loading..", index); - return false; - } else { - numPrimaryShards += indexMetaData.numberOfShards(); - } - } - } - - try { - loadQueue(numPrimaryShards); - } catch (Exception e) { - logger.warn("Failed to load unfinished alert actions. Schedule to retry alert action loading...", e); - actionsToBeProcessed.clear(); + } else { return false; } - templateUtils.ensureIndexTemplateIsLoaded(state, "alerthistory"); - doStart(); - return true; } public void stop() { @@ -163,136 +90,14 @@ public class HistoryService extends AbstractComponent { return started.get(); } - /** - * Calculates the correct alert history index name for a given time using alertHistoryIndexTimeFormat - */ - public static String getAlertHistoryIndexNameForTime(DateTime time) { - return ALERT_HISTORY_INDEX_PREFIX + alertHistoryIndexTimeFormat.print(time); - } - - private void loadQueue(int numPrimaryShards) { - assert actionsToBeProcessed.isEmpty() : "Queue should be empty, but contains " + actionsToBeProcessed.size() + " elements."; - RefreshResponse refreshResponse = client.admin().indices().refresh(new RefreshRequest(ALERT_HISTORY_INDEX_PREFIX + "*")).actionGet(); - if (refreshResponse.getSuccessfulShards() < numPrimaryShards) { - throw new ElasticsearchException("Not all shards have been refreshed"); - } - - SearchResponse response = client.prepareSearch(ALERT_HISTORY_INDEX_PREFIX + "*") - .setQuery(QueryBuilders.termQuery(STATE, AlertActionState.SEARCH_NEEDED.toString())) - .setSearchType(SearchType.SCAN) - .setScroll(scrollTimeout) - .setSize(scrollSize) - .setTypes(ALERT_HISTORY_TYPE) - .setPreference("_primary") - .get(); - try { - if (response.getTotalShards() != response.getSuccessfulShards()) { - throw new ElasticsearchException("Partial response while loading alert actions"); - } - - if (response.getHits().getTotalHits() > 0) { - response = client.prepareSearchScroll(response.getScrollId()).setScroll(scrollTimeout).get(); - while (response.getHits().hits().length != 0) { - for (SearchHit sh : response.getHits()) { - String historyId = sh.getId(); - AlertRecord historyEntry = parseHistory(historyId, sh.getSourceRef(), sh.version(), actionRegistry); - assert historyEntry.getState() == AlertActionState.SEARCH_NEEDED; - logger.debug("Adding entry: [{}/{}/{}]", sh.index(), sh.type(), sh.id()); - actionsToBeProcessed.add(historyEntry); - } - response = client.prepareSearchScroll(response.getScrollId()).setScroll(scrollTimeout).get(); - } - } - } finally { - client.prepareClearScroll().addScrollId(response.getScrollId()).get(); - } - logger.info("Loaded [{}] actions from the alert history index into actions queue", actionsToBeProcessed.size()); - largestQueueSize.set(actionsToBeProcessed.size()); - } - - AlertRecord parseHistory(String historyId, BytesReference source, long version, ActionRegistry actionRegistry) { - AlertRecord entry = new AlertRecord(); - entry.setId(historyId); - entry.setVersion(version); - - try (XContentParser parser = XContentHelper.createParser(source)) { - entry.setContentType(parser.contentType()); - - String currentFieldName = null; - XContentParser.Token token = parser.nextToken(); - assert token == XContentParser.Token.START_OBJECT; - while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { - if (token == XContentParser.Token.FIELD_NAME) { - currentFieldName = parser.currentName(); - } else if (token == XContentParser.Token.START_OBJECT) { - switch (currentFieldName) { - case ACTIONS_FIELD: - entry.setActions(actionRegistry.parseActions(parser)); - break; - case TRIGGER_FIELD: - entry.setTrigger(triggerRegistry.parse(parser)); - break; - case TRIGGER_RESPONSE: - entry.setTriggerResponse(parser.map()); - break; - case PAYLOAD_REQUEST: - entry.setPayloadRequest(AlertUtils.readSearchRequest(parser)); - break; - case PAYLOAD_RESPONSE: - entry.setPayloadResponse(parser.map()); - break; - case METADATA: - entry.setMetadata(parser.map()); - default: - throw new ElasticsearchIllegalArgumentException("Unexpected field [" + currentFieldName + "]"); - } - } else if (token.isValue()) { - switch (currentFieldName) { - case ALERT_NAME_FIELD: - entry.setName(parser.text()); - break; - case TRIGGERED_FIELD: - entry.setTriggered(parser.booleanValue()); - break; - case FIRE_TIME_FIELD: - entry.setFireTime(DateTime.parse(parser.text())); - break; - case SCHEDULED_FIRE_TIME_FIELD: - entry.setScheduledTime(DateTime.parse(parser.text())); - break; - case ERROR_MESSAGE: - entry.setErrorMsg(parser.textOrNull()); - break; - case STATE: - entry.setState(AlertActionState.fromString(parser.text())); - break; - default: - throw new ElasticsearchIllegalArgumentException("Unexpected field [" + currentFieldName + "]"); - } - } else { - throw new ElasticsearchIllegalArgumentException("Unexpected token [" + token + "] for [" + currentFieldName + "]"); - } - } - } catch (IOException e) { - throw new ElasticsearchException("Error during parsing alert action", e); - } - return entry; - } - - public void addAlertAction(Alert alert, DateTime scheduledFireTime, DateTime fireTime) throws IOException { + public void alertFired(Alert alert, DateTime scheduledFireTime, DateTime fireTime) throws HistoryException { ensureStarted(); - logger.debug("Adding alert action for alert [{}]", alert.getName()); - String alertHistoryIndex = getAlertHistoryIndexNameForTime(scheduledFireTime); - AlertRecord entry = new AlertRecord(alert, scheduledFireTime, fireTime, AlertActionState.SEARCH_NEEDED); - IndexResponse response = client.prepareIndex(alertHistoryIndex, ALERT_HISTORY_TYPE, entry.getId()) - .setSource(XContentFactory.contentBuilder(alert.getContentType()).value(entry)) - .setOpType(IndexRequest.OpType.CREATE) - .get(); - entry.setVersion(response.getVersion()); - logger.debug("Added alert action for alert [{}]", alert.getName()); + FiredAlert firedAlert = new FiredAlert(alert, scheduledFireTime, fireTime, FiredAlert.State.AWAITS_RUN); + logger.debug("adding fired alert [{}]", alert.name()); + historyStore.put(firedAlert); long currentSize = actionsToBeProcessed.size() + 1; - actionsToBeProcessed.add(entry); + actionsToBeProcessed.add(firedAlert); long currentLargestQueueSize = largestQueueSize.get(); boolean done = false; while (!done) { @@ -305,23 +110,6 @@ public class HistoryService extends AbstractComponent { } } - - private void updateHistoryEntry(AlertRecord entry) throws IOException { - ensureStarted(); - logger.debug("Updating alert action [{}]", entry.getId()); - IndexResponse response = client.prepareIndex(getAlertHistoryIndexNameForTime(entry.getScheduledTime()), ALERT_HISTORY_TYPE, entry.getId()) - .setSource(XContentFactory.contentBuilder(entry.getContentType()).value(entry)) - .get(); - logger.debug("Updated alert action [{}]", entry.getId()); - entry.setVersion(response.getVersion()); - } - - - private void updateHistoryEntry(AlertRecord entry, AlertActionState actionPerformed) throws IOException { - entry.setState(actionPerformed); - updateHistoryEntry(entry); - } - public long getQueueSize() { return actionsToBeProcessed.size(); } @@ -350,37 +138,40 @@ public class HistoryService extends AbstractComponent { private class AlertHistoryRunnable implements Runnable { - private final AlertRecord entry; + private final FiredAlert alert; - private AlertHistoryRunnable(AlertRecord entry) { - this.entry = entry; + private AlertHistoryRunnable(FiredAlert alert) { + this.alert = alert; } @Override public void run() { try { - Alert alert = alertsStore.getAlert(entry.getName()); + Alert alert = alertsStore.getAlert(this.alert.name()); if (alert == null) { - entry.setErrorMsg("Alert was not found in the alerts store"); - updateHistoryEntry(entry, AlertActionState.ERROR); + this.alert.errorMsg("alert was not found in the alerts store"); + this.alert.state(FiredAlert.State.FAILED); + historyStore.update(this.alert); return; } - updateHistoryEntry(entry, AlertActionState.SEARCH_UNDERWAY); - logger.debug("Running an alert action entry for [{}]", entry.getName()); - AlertsService.AlertRun alertRun = alertsService.runAlert(entry); - entry.execution(alert, alertRun); - updateHistoryEntry(entry); + this.alert.state(FiredAlert.State.RUNNING); + historyStore.update(this.alert); + logger.debug("running an alert [{}]", this.alert.name()); + AlertsService.AlertRun alertRun = alertsService.runAlert(this.alert); + this.alert.update(alert, alertRun); + historyStore.update(this.alert); } catch (Exception e) { if (started()) { - logger.warn("Failed to execute alert action", e); + logger.warn("failed to run alert [{}]", e, alert.name()); try { - entry.setErrorMsg(e.getMessage()); - updateHistoryEntry(entry, AlertActionState.ERROR); + alert.errorMsg(e.getMessage()); + alert.state(FiredAlert.State.FAILED); + historyStore.update(alert); } catch (Exception e2) { - logger.error("Failed to update action history entry with the error message", e2); + logger.error("failed to update fired alert [{}] with the error message", e2, alert); } } else { - logger.debug("Failed to execute alert action after shutdown", e); + logger.debug("failed to execute fired alert [{}] after shutdown", e, alert); } } } @@ -391,19 +182,19 @@ public class HistoryService extends AbstractComponent { @Override public void run() { try { - logger.debug("Starting thread to read from the job queue"); + logger.debug("starting thread to read from the job queue"); while (started()) { - AlertRecord entry = actionsToBeProcessed.take(); - if (entry != null) { - threadPool.executor(AlertsPlugin.NAME).execute(new AlertHistoryRunnable(entry)); + FiredAlert alert = actionsToBeProcessed.take(); + if (alert != null) { + threadPool.executor(AlertsPlugin.NAME).execute(new AlertHistoryRunnable(alert)); } } } catch (Exception e) { if (started()) { - logger.error("Error during reader thread, restarting queue reader thread...", e); + logger.error("error during reader thread, restarting queue reader thread...", e); startQueueReaderThread(); } else { - logger.error("Error during reader thread", e); + logger.error("error during reader thread", e); } } } diff --git a/src/main/java/org/elasticsearch/alerts/history/HistoryStore.java b/src/main/java/org/elasticsearch/alerts/history/HistoryStore.java new file mode 100644 index 00000000000..6512f4edf26 --- /dev/null +++ b/src/main/java/org/elasticsearch/alerts/history/HistoryStore.java @@ -0,0 +1,174 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.alerts.history; + +import org.elasticsearch.action.admin.indices.refresh.RefreshRequest; +import org.elasticsearch.action.admin.indices.refresh.RefreshResponse; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.index.IndexResponse; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.action.search.SearchType; +import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.alerts.support.TemplateUtils; +import org.elasticsearch.alerts.support.init.proxy.ClientProxy; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.common.component.AbstractComponent; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.joda.time.DateTime; +import org.elasticsearch.common.joda.time.format.DateTimeFormat; +import org.elasticsearch.common.joda.time.format.DateTimeFormatter; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.xcontent.XContentFactory; +import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.search.SearchHit; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +/** + */ +public class HistoryStore extends AbstractComponent { + + static final DateTimeFormatter alertHistoryIndexTimeFormat = DateTimeFormat.forPattern("YYYY-MM-dd"); + static final String ALERT_HISTORY_INDEX_PREFIX = ".alert_history_"; + static final String ALERT_HISTORY_TYPE = "alerthistory"; + + private final ClientProxy client; + private final TemplateUtils templateUtils; + private final int scrollSize; + private final TimeValue scrollTimeout; + private final FiredAlert.Parser alertRecordParser; + + @Inject + public HistoryStore(Settings settings, ClientProxy client, TemplateUtils templateUtils, FiredAlert.Parser alertRecordParser) { + super(settings); + this.client = client; + this.templateUtils = templateUtils; + this.alertRecordParser = alertRecordParser; + this.scrollTimeout = settings.getAsTime("alerts.scroll.timeout", TimeValue.timeValueSeconds(30)); + this.scrollSize = settings.getAsInt("alerts.scroll.size", 100); + } + + public void put(FiredAlert firedAlert) throws HistoryException { + String alertHistoryIndex = getAlertHistoryIndexNameForTime(firedAlert.scheduledTime()); + try { + IndexResponse response = client.prepareIndex(alertHistoryIndex, ALERT_HISTORY_TYPE, firedAlert.id()) + .setSource(XContentFactory.jsonBuilder().value(firedAlert)) + .setOpType(IndexRequest.OpType.CREATE) + .get(); + firedAlert.version(response.getVersion()); + } catch (IOException e) { + throw new HistoryException("persisting new fired alert [" + firedAlert + "] failed", e); + } + } + + public void update(FiredAlert firedAlert) throws HistoryException { + logger.debug("updating fired alert [{}]", firedAlert); + try { + IndexResponse response = client.prepareIndex(getAlertHistoryIndexNameForTime(firedAlert.scheduledTime()), ALERT_HISTORY_TYPE, firedAlert.id()) + .setSource(XContentFactory.jsonBuilder().value(firedAlert)) + .get(); + firedAlert.version(response.getVersion()); + logger.debug("updated fired alert [{}]", firedAlert); + } catch (IOException e) { + throw new HistoryException("persisting fired alert [" + firedAlert + "] failed", e); + } + } + + public LoadResult loadFiredAlerts(ClusterState state) { + String[] indices = state.metaData().concreteIndices(IndicesOptions.lenientExpandOpen(), ALERT_HISTORY_INDEX_PREFIX + "*"); + if (indices.length == 0) { + logger.info("No previous .alerthistory index, skip loading of alert actions"); + templateUtils.ensureIndexTemplateIsLoaded(state, "alerthistory"); + return new LoadResult(true); + } + int numPrimaryShards = 0; + for (String index : indices) { + IndexMetaData indexMetaData = state.getMetaData().index(index); + if (indexMetaData != null) { + if (!state.routingTable().index(index).allPrimaryShardsActive()) { + logger.warn("Not all primary shards of the [{}] index are started. Schedule to retry alert action loading..", index); + return new LoadResult(false); + } else { + numPrimaryShards += indexMetaData.numberOfShards(); + } + } + } + + RefreshResponse refreshResponse = client.admin().indices().refresh(new RefreshRequest(ALERT_HISTORY_INDEX_PREFIX + "*")).actionGet(); + if (refreshResponse.getSuccessfulShards() < numPrimaryShards) { + return new LoadResult(false); + } + + SearchResponse response = client.prepareSearch(ALERT_HISTORY_INDEX_PREFIX + "*") + .setQuery(QueryBuilders.termQuery(FiredAlert.Parser.STATE_FIELD.getPreferredName(), FiredAlert.State.AWAITS_RUN.toString())) + .setSearchType(SearchType.SCAN) + .setScroll(scrollTimeout) + .setSize(scrollSize) + .setTypes(ALERT_HISTORY_TYPE) + .setPreference("_primary") + .get(); + List alerts = new ArrayList<>(); + try { + if (response.getTotalShards() != response.getSuccessfulShards()) { + return new LoadResult(false); + } + + if (response.getHits().getTotalHits() > 0) { + response = client.prepareSearchScroll(response.getScrollId()).setScroll(scrollTimeout).get(); + while (response.getHits().hits().length != 0) { + for (SearchHit sh : response.getHits()) { + String historyId = sh.getId(); + FiredAlert historyEntry = alertRecordParser.parse(sh.getSourceRef(), historyId, sh.version()); + assert historyEntry.state() == FiredAlert.State.AWAITS_RUN; + logger.debug("loaded fired alert from index [{}/{}/{}]", sh.index(), sh.type(), sh.id()); + alerts.add(historyEntry); + } + response = client.prepareSearchScroll(response.getScrollId()).setScroll(scrollTimeout).get(); + } + } + } finally { + client.prepareClearScroll().addScrollId(response.getScrollId()).get(); + } + templateUtils.ensureIndexTemplateIsLoaded(state, "alerthistory"); + return new LoadResult(true, alerts); + } + + /** + * Calculates the correct alert history index name for a given time using alertHistoryIndexTimeFormat + */ + static String getAlertHistoryIndexNameForTime(DateTime time) { + return ALERT_HISTORY_INDEX_PREFIX + alertHistoryIndexTimeFormat.print(time); + } + + public class LoadResult { + + private final boolean succeeded; + private final List notRanFiredAlerts; + + public LoadResult(boolean succeeded, List notRanFiredAlerts) { + this.succeeded = succeeded; + this.notRanFiredAlerts = notRanFiredAlerts; + } + + public LoadResult(boolean succeeded) { + this.succeeded = succeeded; + this.notRanFiredAlerts = Collections.emptyList(); + } + + public boolean succeeded() { + return succeeded; + } + + public List notRanFiredAlerts() { + return notRanFiredAlerts; + } + } +} diff --git a/src/test/java/org/elasticsearch/alerts/AbstractAlertingTests.java b/src/test/java/org/elasticsearch/alerts/AbstractAlertingTests.java index 83a7d07a0ce..f031cf4841a 100644 --- a/src/test/java/org/elasticsearch/alerts/AbstractAlertingTests.java +++ b/src/test/java/org/elasticsearch/alerts/AbstractAlertingTests.java @@ -8,8 +8,8 @@ package org.elasticsearch.alerts; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.alerts.history.FiredAlert; import org.elasticsearch.alerts.history.HistoryService; -import org.elasticsearch.alerts.actions.AlertActionState; import org.elasticsearch.alerts.client.AlertsClient; import org.elasticsearch.alerts.support.AlertUtils; import org.elasticsearch.alerts.transport.actions.stats.AlertsStatsResponse; @@ -178,7 +178,7 @@ public abstract class AbstractAlertingTests extends ElasticsearchIntegrationTest SearchResponse searchResponse = client().prepareSearch(HistoryService.ALERT_HISTORY_INDEX_PREFIX + "*") .setIndicesOptions(IndicesOptions.lenientExpandOpen()) - .setQuery(boolQuery().must(matchQuery("alert_name", alertName)).must(matchQuery("state", AlertActionState.ACTION_PERFORMED.toString()))) + .setQuery(boolQuery().must(matchQuery("alert_name", alertName)).must(matchQuery("state", FiredAlert.State.ACTION_PERFORMED.toString()))) .get(); assertThat(searchResponse.getHits().getTotalHits(), greaterThanOrEqualTo(minimumExpectedAlertActionsWithActionPerformed)); if (assertTriggerSearchMatched) { @@ -191,7 +191,7 @@ public abstract class AbstractAlertingTests extends ElasticsearchIntegrationTest protected long findNumberOfPerformedActions(String alertName) { SearchResponse searchResponse = client().prepareSearch(HistoryService.ALERT_HISTORY_INDEX_PREFIX + "*") .setIndicesOptions(IndicesOptions.lenientExpandOpen()) - .setQuery(boolQuery().must(matchQuery("alert_name", alertName)).must(matchQuery("state", AlertActionState.ACTION_PERFORMED.toString()))) + .setQuery(boolQuery().must(matchQuery("alert_name", alertName)).must(matchQuery("state", FiredAlert.State.ACTION_PERFORMED.toString()))) .get(); return searchResponse.getHits().getTotalHits(); } @@ -212,7 +212,7 @@ public abstract class AbstractAlertingTests extends ElasticsearchIntegrationTest SearchResponse searchResponse = client().prepareSearch(HistoryService.ALERT_HISTORY_INDEX_PREFIX + "*") .setIndicesOptions(IndicesOptions.lenientExpandOpen()) - .setQuery(boolQuery().must(matchQuery("alert_name", alertName)).must(matchQuery("state", AlertActionState.NO_ACTION_NEEDED.toString()))) + .setQuery(boolQuery().must(matchQuery("alert_name", alertName)).must(matchQuery("state", FiredAlert.State.NO_ACTION_NEEDED.toString()))) .get(); assertThat(searchResponse.getHits().getTotalHits(), greaterThanOrEqualTo(expectedAlertActionsWithNoActionNeeded)); } diff --git a/src/test/java/org/elasticsearch/alerts/AlertThrottleTests.java b/src/test/java/org/elasticsearch/alerts/AlertThrottleTests.java index c42502be448..84516d60428 100644 --- a/src/test/java/org/elasticsearch/alerts/AlertThrottleTests.java +++ b/src/test/java/org/elasticsearch/alerts/AlertThrottleTests.java @@ -10,14 +10,13 @@ import org.elasticsearch.action.delete.DeleteResponse; import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.SearchType; +import org.elasticsearch.alerts.history.FiredAlert; import org.elasticsearch.alerts.history.HistoryService; -import org.elasticsearch.alerts.actions.AlertActionState; import org.elasticsearch.alerts.actions.IndexAlertAction; import org.elasticsearch.alerts.client.AlertsClient; import org.elasticsearch.alerts.transport.actions.ack.AckAlertResponse; import org.elasticsearch.alerts.transport.actions.get.GetAlertResponse; import org.elasticsearch.alerts.transport.actions.put.PutAlertResponse; -import org.elasticsearch.alerts.triggers.ScriptTrigger; import org.elasticsearch.common.joda.time.DateTime; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.ToXContent; @@ -51,7 +50,7 @@ public class AlertThrottleTests extends AbstractAlertingTests { refresh(); Alert alert = new Alert(); - alert.setStatus(Alert.Status.NOT_TRIGGERED); + alert.setStatus(Alert.Status.State.NOT_TRIGGERED); alert.setTriggerSearchRequest(createTriggerSearchRequest("test-index").source(searchSource().query(matchAllQuery()))); alert.setTrigger(new ScriptTrigger("hits.total > 0", ScriptService.ScriptType.INLINE, "groovy")); @@ -106,7 +105,7 @@ public class AlertThrottleTests extends AbstractAlertingTests { CountResponse countOfThrottledActions = client() .prepareCount(HistoryService.ALERT_HISTORY_INDEX_PREFIX + "*") - .setQuery(QueryBuilders.matchQuery(HistoryService.STATE, AlertActionState.THROTTLED.toString())) + .setQuery(QueryBuilders.matchQuery(FiredAlert.Parser.STATE_FIELD.getPreferredName(), FiredAlert.State.THROTTLED.toString())) .get(); assertThat(countOfThrottledActions.getCount(), greaterThan(0L)); } @@ -178,7 +177,7 @@ public class AlertThrottleTests extends AbstractAlertingTests { CountResponse countOfThrottledActions = client() .prepareCount(HistoryService.ALERT_HISTORY_INDEX_PREFIX + "*") - .setQuery(QueryBuilders.matchQuery(HistoryService.STATE, AlertActionState.THROTTLED.toString())) + .setQuery(QueryBuilders.matchQuery(FiredAlert.Parser.STATE_FIELD.getPreferredName(), FiredAlert.State.THROTTLED.toString())) .get(); assertThat(countOfThrottledActions.getCount(), greaterThan(0L)); } diff --git a/src/test/java/org/elasticsearch/alerts/BootStrapTest.java b/src/test/java/org/elasticsearch/alerts/BootStrapTest.java index 55823f27900..016d3cbcc16 100644 --- a/src/test/java/org/elasticsearch/alerts/BootStrapTest.java +++ b/src/test/java/org/elasticsearch/alerts/BootStrapTest.java @@ -9,9 +9,9 @@ import org.elasticsearch.action.WriteConsistencyLevel; import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.alerts.actions.AlertAction; -import org.elasticsearch.alerts.history.AlertRecord; +import org.elasticsearch.alerts.history.FiredAlert; import org.elasticsearch.alerts.history.HistoryService; -import org.elasticsearch.alerts.actions.AlertActionState; +import org.elasticsearch.alerts.history.HistoryStore; import org.elasticsearch.alerts.transport.actions.stats.AlertsStatsResponse; import org.elasticsearch.alerts.triggers.ScriptTrigger; import org.elasticsearch.common.bytes.BytesReference; @@ -76,14 +76,14 @@ public class BootStrapTest extends AbstractAlertingTests { Alert.Status.NOT_ACKABLE); DateTime scheduledFireTime = new DateTime(); - AlertRecord entry = new AlertRecord(alert, scheduledFireTime, scheduledFireTime, AlertActionState.SEARCH_NEEDED); - String actionHistoryIndex = HistoryService.getAlertHistoryIndexNameForTime(scheduledFireTime); + FiredAlert entry = new FiredAlert(alert, scheduledFireTime, scheduledFireTime, FiredAlert.State.AWAITS_RUN); + String actionHistoryIndex = HistoryStore.getAlertHistoryIndexNameForTime(scheduledFireTime); createIndex(actionHistoryIndex); ensureGreen(actionHistoryIndex); logger.info("Created index {}", actionHistoryIndex); - IndexResponse indexResponse = client().prepareIndex(actionHistoryIndex, HistoryService.ALERT_HISTORY_TYPE, entry.getId()) + IndexResponse indexResponse = client().prepareIndex(actionHistoryIndex, HistoryService.ALERT_HISTORY_TYPE, entry.id()) .setConsistencyLevel(WriteConsistencyLevel.ALL) .setSource(XContentFactory.jsonBuilder().value(entry)) .get(); @@ -109,7 +109,7 @@ public class BootStrapTest extends AbstractAlertingTests { for (int i = 0; i < numberOfAlertHistoryIndices; i++) { DateTime historyIndexDate = now.minus((new TimeValue(i, TimeUnit.DAYS)).getMillis()); - String actionHistoryIndex = HistoryService.getAlertHistoryIndexNameForTime(historyIndexDate); + String actionHistoryIndex = HistoryStore.getAlertHistoryIndexNameForTime(historyIndexDate); createIndex(actionHistoryIndex); ensureGreen(actionHistoryIndex); logger.info("Created index {}", actionHistoryIndex); @@ -123,10 +123,10 @@ public class BootStrapTest extends AbstractAlertingTests { new DateTime(), 0, new TimeValue(0), - Alert.Status.NOT_ACKABLE); + Alert.Status.State.NOT_ACKABLE); - AlertRecord entry = new AlertRecord(alert, historyIndexDate, historyIndexDate, AlertActionState.SEARCH_NEEDED); - IndexResponse indexResponse = client().prepareIndex(actionHistoryIndex, HistoryService.ALERT_HISTORY_TYPE, entry.getId()) + FiredAlert entry = new FiredAlert(alert, historyIndexDate, historyIndexDate, FiredAlert.State.AWAITS_RUN); + IndexResponse indexResponse = client().prepareIndex(actionHistoryIndex, HistoryService.ALERT_HISTORY_TYPE, entry.id()) .setConsistencyLevel(WriteConsistencyLevel.ALL) .setSource(XContentFactory.jsonBuilder().value(entry)) .get(); diff --git a/src/test/java/org/elasticsearch/alerts/actions/AlertActionsTest.java b/src/test/java/org/elasticsearch/alerts/actions/AlertActionsTest.java index 1e8cd1dc0d9..ffea1664c1f 100644 --- a/src/test/java/org/elasticsearch/alerts/actions/AlertActionsTest.java +++ b/src/test/java/org/elasticsearch/alerts/actions/AlertActionsTest.java @@ -12,7 +12,7 @@ import org.elasticsearch.action.search.ShardSearchFailure; import org.elasticsearch.alerts.*; import org.elasticsearch.alerts.client.AlertsClient; import org.elasticsearch.alerts.history.HistoryService; -import org.elasticsearch.alerts.history.AlertRecord; +import org.elasticsearch.alerts.history.FiredAlert; import org.elasticsearch.alerts.support.AlertUtils; import org.elasticsearch.alerts.transport.actions.delete.DeleteAlertRequest; import org.elasticsearch.alerts.transport.actions.delete.DeleteAlertResponse; @@ -69,34 +69,34 @@ public class AlertActionsTest extends AbstractAlertingTests { XContentBuilder builder = jsonBuilder(); builder.startObject(); - builder.field(HistoryService.ALERT_NAME_FIELD, "testName"); - builder.field(HistoryService.TRIGGERED_FIELD, true); - builder.field(HistoryService.FIRE_TIME_FIELD, AlertUtils.dateTimeFormatter.printer().print(fireTime)); - builder.field(HistoryService.SCHEDULED_FIRE_TIME_FIELD, AlertUtils.dateTimeFormatter.printer().print(scheduledFireTime)); - builder.field(HistoryService.TRIGGER_FIELD, triggerMap); + builder.field(FiredAlert.Parser.ALERT_NAME_FIELD.getPreferredName(), "testName"); + builder.field(FiredAlert.Parser.TRIGGERED_FIELD, true); + builder.field(FiredAlert.Parser.FIRE_TIME_FIELD.getPreferredName(), AlertUtils.dateTimeFormatter.printer().print(fireTime)); + builder.field(FiredAlert.Parser.SCHEDULED_FIRE_TIME_FIELD, AlertUtils.dateTimeFormatter.printer().print(scheduledFireTime)); + builder.field(FiredAlert.Parser.TRIGGER_FIELD.getPreferredName(), triggerMap); SearchRequest searchRequest = new SearchRequest("test123"); - builder.field(HistoryService.TRIGGER_REQUEST); + builder.field(FiredAlert.Parser.TRIGGER_REQUEST); AlertUtils.writeSearchRequest(searchRequest, builder, ToXContent.EMPTY_PARAMS); SearchResponse searchResponse = new SearchResponse( new InternalSearchResponse(new InternalSearchHits(new InternalSearchHit[0], 10, 0), null, null, null, false, false), null, 1, 1, 0, new ShardSearchFailure[0] ); - builder.startObject(HistoryService.TRIGGER_RESPONSE); + builder.startObject(FiredAlert.Parser.TRIGGER_RESPONSE.getPreferredName()); builder.value(searchResponse); builder.endObject(); - builder.field(HistoryService.ACTIONS_FIELD, actionMap); - builder.field(HistoryService.STATE, AlertActionState.SEARCH_NEEDED.toString()); + builder.field(FiredAlert.Parser.ACTIONS_FIELD.getPreferredName(), actionMap); + builder.field(FiredAlert.Parser.STATE_FIELD.getPreferredName(), FiredAlert.State.AWAITS_RUN.toString()); builder.endObject(); final ActionRegistry alertActionRegistry = internalTestCluster().getInstance(ActionRegistry.class, internalTestCluster().getMasterName()); final HistoryService alertManager = internalTestCluster().getInstance(HistoryService.class, internalTestCluster().getMasterName()); - AlertRecord actionEntry = alertManager.parseHistory("foobar", builder.bytes(), 0, alertActionRegistry); - assertEquals(actionEntry.getVersion(), 0); - assertEquals(actionEntry.getName(), "testName"); + FiredAlert actionEntry = alertManager.parseHistory("foobar", builder.bytes(), 0, alertActionRegistry); + assertEquals(actionEntry.version(), 0); + assertEquals(actionEntry.name(), "testName"); assertEquals(actionEntry.isTriggered(), true); - assertEquals(actionEntry.getScheduledTime(), scheduledFireTime); - assertEquals(actionEntry.getFireTime(), fireTime); - assertEquals(actionEntry.getState(), AlertActionState.SEARCH_NEEDED); + assertEquals(actionEntry.scheduledTime(), scheduledFireTime); + assertEquals(actionEntry.fireTime(), fireTime); + assertEquals(actionEntry.state(), FiredAlert.State.AWAITS_RUN); assertEquals(XContentMapValues.extractValue("hits.total", actionEntry.getTriggerResponse()), 10); } @@ -134,7 +134,7 @@ public class AlertActionsTest extends AbstractAlertingTests { } }; - AlertActionRegistry alertActionRegistry = internalTestCluster().getInstance(AlertActionRegistry.class, internalTestCluster().getMasterName()); + ActionRegistry alertActionRegistry = internalTestCluster().getInstance(ActionRegistry.class, internalTestCluster().getMasterName()); alertActionRegistry.registerAction("test", new AlertActionFactory() { @Override public AlertAction createAction(XContentParser parser) throws IOException {