mirror of
synced 2025-03-24 17:09:48 +00:00
Merge pull request elastic/elasticsearch#21 from elasticsearch/alerthistory/jobqueue
Alerthistory/jobqueue Original commit: elastic/x-pack-elasticsearch@87154dca40
This commit is contained in:
@ -22,6 +22,8 @@ import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.alerts.actions.AlertAction;
import org.elasticsearch.alerts.actions.AlertActionManager;
import org.elasticsearch.alerts.actions.AlertActionRegistry;
import org.elasticsearch.alerts.actions.AlertActionEntry;
import org.elasticsearch.alerts.scheduler.AlertScheduler;
import org.elasticsearch.alerts.triggers.AlertTrigger;
import org.elasticsearch.alerts.triggers.TriggerManager;
@ -31,7 +33,6 @@ import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.collect.ImmutableMap;
@ -46,6 +47,7 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.threadpool.ThreadPool;
import java.io.IOException;
import java.util.ArrayList;
@ -59,11 +61,9 @@ import java.util.concurrent.atomic.AtomicBoolean;
public class AlertManager extends AbstractLifecycleComponent {
public static final String ALERT_INDEX = ".alerts";
public static final String ALERT_TYPE = "alert";
public static final String ALERT_HISTORY_INDEX = "alerthistory";
public static final String ALERT_HISTORY_TYPE = "alertHistory";
public static final ParseField QUERY_FIELD = new ParseField("query");
public static final ParseField SCHEDULE_FIELD = new ParseField("schedule");
public static final ParseField TRIGGER_FIELD = new ParseField("trigger");
@ -79,11 +79,16 @@ public class AlertManager extends AbstractLifecycleComponent {
private final Client client;
private AlertScheduler scheduler;
private final ThreadPool threadPool;
private final ConcurrentMap<String,Alert> alertMap;
private AtomicBoolean started = new AtomicBoolean(false);
private AtomicBoolean startActions = new AtomicBoolean(false);
private AlertActionRegistry actionRegistry;
private AlertActionManager actionManager;
final TimeValue defaultTimePeriod = new TimeValue(300*1000); //TODO : read from config
@ -93,10 +98,6 @@ public class AlertManager extends AbstractLifecycleComponent {
public void setActionManager(AlertActionManager actionManager){
this.actionManager = actionManager;
protected void doStart() throws ElasticsearchException {
@ -114,21 +115,25 @@ public class AlertManager extends AbstractLifecycleComponent {
public AlertManager(Settings settings, Client client, ClusterService clusterService) {
public AlertManager(Settings settings, Client client, ClusterService clusterService, ThreadPool threadPool,
AlertActionRegistry actionRegistry) {
logger.warn("Initing AlertManager");
this.client = client;
alertMap = ConcurrentCollections.newConcurrentMap();
clusterService.add(new AlertsClusterStateListener());
this.threadPool = threadPool;
this.actionRegistry = actionRegistry;
this.actionManager = new AlertActionManager(client, this, actionRegistry, threadPool);
public void setAlertScheduler(AlertScheduler scheduler){
this.scheduler = scheduler;
private ClusterHealthStatus createAlertsIndex() {
CreateIndexResponse cir = client.admin().indices().prepareCreate(ALERT_INDEX).addMapping(ALERT_TYPE).execute().actionGet(); //TODO FIX MAPPINGS
ClusterHealthResponse actionGet = client.admin().cluster()
return actionGet.getStatus();
@ -144,6 +149,25 @@ public class AlertManager extends AbstractLifecycleComponent {
public void doAction(Alert alert, AlertActionEntry result, DateTime scheduledTime) {
logger.warn("We have triggered");
DateTime lastActionFire = timeActionLastTriggered(alert.alertName());
long msSinceLastAction = scheduledTime.getMillis() - lastActionFire.getMillis();
logger.error("last action fire [{}]", lastActionFire);
logger.error("msSinceLastAction [{}]", msSinceLastAction);
if (alert.timePeriod().getMillis() > msSinceLastAction) {
logger.warn("Not firing action because it was fired in the timePeriod");
} else {
actionRegistry.doAction(alert, result);
logger.warn("Did action !");
persistAlert(alert.alertName(), alert, IndexRequest.OpType.INDEX);
public boolean claimAlertRun(String alertName, DateTime scheduleRunTime) {
Alert indexedAlert;
try {
@ -235,6 +259,7 @@ public class AlertManager extends AbstractLifecycleComponent {
if (!client.admin().indices().prepareExists(ALERT_INDEX).execute().actionGet().isExists()) {
SearchResponse searchResponse = client.prepareSearch().setSource(
"{ \"query\" : " +
"{ \"match_all\" : {}}," +
@ -257,15 +282,11 @@ public class AlertManager extends AbstractLifecycleComponent {
return 0;
public boolean updateLastRan(String alertName, DateTime fireTime, DateTime scheduledTime, boolean firedAction) throws Exception {
public boolean updateLastRan(String alertName, DateTime fireTime, DateTime scheduledTime) throws Exception {
try {
Alert alert = getAlertForName(alertName);
XContentBuilder alertBuilder = XContentFactory.jsonBuilder().prettyPrint();
if (firedAction) {
logger.error("Fired action [{}]",firedAction);
alert.toXContent(alertBuilder, ToXContent.EMPTY_PARAMS);
UpdateRequest updateRequest = new UpdateRequest();
@ -283,46 +304,6 @@ public class AlertManager extends AbstractLifecycleComponent {
public boolean addHistory(String alertName, boolean triggered,
DateTime fireTime, SearchRequestBuilder triggeringQuery,
AlertTrigger trigger, long numberOfResults,
List<AlertAction> actions,
@Nullable List<String> indices) throws Exception {
XContentBuilder historyEntry = XContentFactory.jsonBuilder();
historyEntry.field("alertName", alertName);
historyEntry.field("triggered", triggered);
historyEntry.field("fireTime", fireTime.toDateTimeISO());
trigger.toXContent(historyEntry, ToXContent.EMPTY_PARAMS);
historyEntry.field("queryRan", triggeringQuery.toString());
historyEntry.field("numberOfResults", numberOfResults);
for (AlertAction action : actions) {
action.toXContent(historyEntry, ToXContent.EMPTY_PARAMS);
if (indices != null) {
for (String index : indices) {
IndexRequest indexRequest = new IndexRequest();
indexRequest.refresh(true); //Always refresh after indexing an alert
return true;
public boolean deleteAlert(String alertName) throws InterruptedException, ExecutionException {
if (!started.get()) {
@ -433,12 +414,7 @@ public class AlertManager extends AbstractLifecycleComponent {
public Alert parseAlert(String alertId, Map<String, Object> fields, long version ) {
//Map<String,SearchHitField> fields = sh.getFields();
logger.warn("Parsing : [{}]", alertId);
for (String field : fields.keySet() ) {
logger.warn("Field : [{}]", field);
String query = fields.get(QUERY_FIELD.getPreferredName()).toString();
String schedule = fields.get(SCHEDULE_FIELD.getPreferredName()).toString();
Object triggerObj = fields.get(TRIGGER_FIELD.getPreferredName());
@ -457,9 +433,9 @@ public class AlertManager extends AbstractLifecycleComponent {
List<AlertAction> actions = null;
if (actionObj instanceof Map) {
Map<String, Object> actionMap = (Map<String, Object>) actionObj;
actions = actionManager.parseActionsFromMap(actionMap);
actions = actionRegistry.parseActionsFromMap(actionMap);
} else {
throw new ElasticsearchException("Unable to parse actions [" + triggerObj + "]");
throw new ElasticsearchException("Unable to parse actions [" + actionObj + "]");
DateTime lastRan = new DateTime(0);
@ -540,33 +516,65 @@ public class AlertManager extends AbstractLifecycleComponent {
return started.get();
public boolean addHistory(String alertName, boolean isTriggered, DateTime dateTime, DateTime scheduledTime,
SearchRequestBuilder srb, AlertTrigger trigger, long totalHits, List<AlertAction> actions,
List<String> indices) throws IOException{
return actionManager.addHistory(alertName, isTriggered, dateTime, scheduledTime, srb, trigger, totalHits, actions, indices);
private final class AlertsClusterStateListener implements ClusterStateListener {
public void clusterChanged(ClusterChangedEvent event) {
if (event.indicesDeleted().contains(ALERT_INDEX)) {
if (!event.localNodeMaster()) { //We are not the master
if (started.compareAndSet(false, true)) {
if (startActions.compareAndSet(false, true)) {
//If actionManager was running and we aren't the master stop
actionManager.doStop(); //Safe to call this multiple times, it's a noop if we are already stopped
if (!started.get()) {
IndexMetaData alertIndexMetaData = event.state().getMetaData().index(ALERT_INDEX);
if (alertIndexMetaData != null) {
if (event.state().routingTable().index(ALERT_INDEX).allPrimaryShardsActive()) {
// TODO: Do on a different thread and have some kind of retry mechanism?
try {
} catch (Exception e) {
logger.warn("Error during loading of alerts from an existing .alerts index... refresh the alerts manually");
threadPool.executor(ThreadPool.Names.GENERIC).execute(new AlertLoader());
if (!startActions.get()) {
IndexMetaData indexMetaData = event.state().getMetaData().index(AlertActionManager.ALERT_HISTORY_INDEX);
if (indexMetaData != null) {
if (event.state().routingTable().index(ALERT_INDEX).allPrimaryShardsActive()) {
} else {
private class AlertLoader implements Runnable {
public void run() {
// TODO: have some kind of retry mechanism?
try {
} catch (Exception e) {
logger.warn("Error during loading of alerts from an existing .alerts index... refresh the alerts manually");
@ -1,61 +0,0 @@
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
package org.elasticsearch.alerts;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.alerts.triggers.AlertTrigger;
import org.elasticsearch.common.joda.time.DateTime;
import java.util.Arrays;
public class AlertResult {
public SearchResponse searchResponse;
public AlertTrigger trigger;
public String alertName;
public DateTime fireTime;
public boolean isTriggered;
public SearchRequestBuilder query;
public String[] indices;
public AlertResult(String alertName, SearchResponse searchResponse, AlertTrigger trigger, boolean isTriggered, SearchRequestBuilder query, String[] indices, DateTime fireTime) {
this.searchResponse = searchResponse;
this.trigger = trigger;
this.isTriggered = isTriggered;
this.query = query;
this.indices = indices;
this.alertName = alertName;
this.fireTime = fireTime;
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
AlertResult that = (AlertResult) o;
if (isTriggered != that.isTriggered) return false;
if (!Arrays.equals(indices, that.indices)) return false;
if (query != null ? !query.equals(that.query) : that.query != null) return false;
if (searchResponse != null ? !searchResponse.equals(that.searchResponse) : that.searchResponse != null)
return false;
if (trigger != null ? !trigger.equals(that.trigger) : that.trigger != null) return false;
return true;
public int hashCode() {
int result = searchResponse != null ? searchResponse.hashCode() : 0;
result = 31 * result + (trigger != null ? trigger.hashCode() : 0);
result = 31 * result + (isTriggered ? 1 : 0);
result = 31 * result + (query != null ? query.hashCode() : 0);
result = 31 * result + (indices != null ? Arrays.hashCode(indices) : 0);
return result;
@ -5,7 +5,7 @@
package org.elasticsearch.alerts;
import org.elasticsearch.alerts.actions.AlertActionManager;
import org.elasticsearch.alerts.actions.AlertActionRegistry;
import org.elasticsearch.alerts.rest.AlertRestHandler;
import org.elasticsearch.alerts.scheduler.AlertScheduler;
import org.elasticsearch.alerts.triggers.TriggerManager;
@ -18,7 +18,7 @@ public class AlertingModule extends AbstractModule {
@ -5,7 +5,7 @@
package org.elasticsearch.alerts.actions;
import org.elasticsearch.alerts.AlertResult;
import org.elasticsearch.alerts.Alert;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
@ -15,5 +15,5 @@ public interface AlertAction extends ToXContent {
public String getActionName();
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException;
public boolean doAction(String alertName, AlertResult alert);
public boolean doAction(Alert alert, AlertActionEntry actionEntry);
@ -0,0 +1,251 @@
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
package org.elasticsearch.alerts.actions;
import org.elasticsearch.alerts.triggers.AlertTrigger;
import org.elasticsearch.common.joda.time.DateTime;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import java.io.IOException;
import java.util.List;
public class AlertActionEntry implements ToXContent{
private long version;
private String alertName;
private boolean triggered;
private DateTime fireTime;
private AlertTrigger trigger;
private String triggeringQuery;
private long numberOfResults;
private List<AlertAction> actions;
private List<String> indices;
private AlertActionState entryState;
private DateTime scheduledTime;
public String getId() {
return id;
public void setId(String id) {
this.id = id;
private String id;
public DateTime getScheduledTime() {
return scheduledTime;
public void setScheduledTime(DateTime scheduledTime) {
this.scheduledTime = scheduledTime;
public String getAlertName() {
return alertName;
public void setAlertName(String alertName) {
this.alertName = alertName;
public boolean isTriggered() {
return triggered;
public void setTriggered(boolean triggered) {
this.triggered = triggered;
public DateTime getFireTime() {
return fireTime;
public void setFireTime(DateTime fireTime) {
this.fireTime = fireTime;
public AlertTrigger getTrigger() {
return trigger;
public void setTrigger(AlertTrigger trigger) {
this.trigger = trigger;
public String getTriggeringQuery() {
return triggeringQuery;
public void setTriggeringQuery(String triggeringQuery) {
this.triggeringQuery = triggeringQuery;
public long getNumberOfResults() {
return numberOfResults;
public void setNumberOfResults(long numberOfResults) {
this.numberOfResults = numberOfResults;
public List<AlertAction> getActions() {
return actions;
public void setActions(List<AlertAction> actions) {
this.actions = actions;
public List<String> getIndices() {
return indices;
public void setIndices(List<String> indices) {
this.indices = indices;
public AlertActionState getEntryState() {
return entryState;
public void setEntryState(AlertActionState entryState) {
this.entryState = entryState;
public long getVersion() {
return version;
public void setVersion(long version) {
this.version = version;
protected AlertActionEntry() {
public AlertActionEntry(String id, long version, String alertName, boolean triggered, DateTime fireTime, DateTime scheduledTime, AlertTrigger trigger,
String queryRan, long numberOfResults, List<AlertAction> actions,
List<String> indices, AlertActionState state) {
this.id = id;
this.version = version;
this.alertName = alertName;
this.triggered = triggered;
this.fireTime = fireTime;
this.scheduledTime = scheduledTime;
this.trigger = trigger;
this.triggeringQuery = queryRan;
this.numberOfResults = numberOfResults;
this.actions = actions;
this.indices = indices;
this.entryState = state;
public XContentBuilder toXContent(XContentBuilder historyEntry, Params params) throws IOException {
historyEntry.field("alertName", alertName);
historyEntry.field("triggered", triggered);
historyEntry.field("fireTime", fireTime.toDateTimeISO());
historyEntry.field(AlertActionManager.SCHEDULED_FIRE_TIME_FIELD, scheduledTime.toDateTimeISO());
trigger.toXContent(historyEntry, ToXContent.EMPTY_PARAMS);
historyEntry.field("queryRan", triggeringQuery);
historyEntry.field("numberOfResults", numberOfResults);
for (AlertAction action : actions) {
action.toXContent(historyEntry, params);
if (indices != null) {
for (String index : indices) {
historyEntry.field(AlertActionState.FIELD_NAME, entryState.toString());
return historyEntry;
public String toString() {
return "AlertHistoryEntry{" +
"version=" + version +
", alertName='" + alertName + '\'' +
", triggered=" + triggered +
", fireTime=" + fireTime +
", trigger=" + trigger +
", triggeringQuery='" + triggeringQuery + '\'' +
", numberOfResults=" + numberOfResults +
", actions=" + actions +
", indices=" + indices +
", entryState=" + entryState +
", scheduledTime=" + scheduledTime +
", id='" + id + '\'' +
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
AlertActionEntry that = (AlertActionEntry) o;
if (numberOfResults != that.numberOfResults) return false;
if (triggered != that.triggered) return false;
if (version != that.version) return false;
if (actions != null ? !actions.equals(that.actions) : that.actions != null) return false;
if (alertName != null ? !alertName.equals(that.alertName) : that.alertName != null) return false;
if (entryState != that.entryState) return false;
if (fireTime != null ? !fireTime.equals(that.fireTime) : that.fireTime != null) return false;
if (id != null ? !id.equals(that.id) : that.id != null) return false;
if (indices != null ? !indices.equals(that.indices) : that.indices != null) return false;
if (scheduledTime != null ? !scheduledTime.equals(that.scheduledTime) : that.scheduledTime != null)
return false;
if (trigger != null ? !trigger.equals(that.trigger) : that.trigger != null) return false;
if (triggeringQuery != null ? !triggeringQuery.equals(that.triggeringQuery) : that.triggeringQuery != null)
return false;
return true;
public int hashCode() {
int result = (int) (version ^ (version >>> 32));
result = 31 * result + (alertName != null ? alertName.hashCode() : 0);
result = 31 * result + (triggered ? 1 : 0);
result = 31 * result + (fireTime != null ? fireTime.hashCode() : 0);
result = 31 * result + (trigger != null ? trigger.hashCode() : 0);
result = 31 * result + (triggeringQuery != null ? triggeringQuery.hashCode() : 0);
result = 31 * result + (int) (numberOfResults ^ (numberOfResults >>> 32));
result = 31 * result + (actions != null ? actions.hashCode() : 0);
result = 31 * result + (indices != null ? indices.hashCode() : 0);
result = 31 * result + (entryState != null ? entryState.hashCode() : 0);
result = 31 * result + (scheduledTime != null ? scheduledTime.hashCode() : 0);
result = 31 * result + (id != null ? id.hashCode() : 0);
return result;
@ -5,61 +5,381 @@
package org.elasticsearch.alerts.actions;
import org.elasticsearch.ElasticsearchIllegalArgumentException;
import org.elasticsearch.alerts.Alert;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.alerts.AlertManager;
import org.elasticsearch.alerts.AlertResult;
import org.elasticsearch.alerts.triggers.AlertTrigger;
import org.elasticsearch.alerts.triggers.TriggerManager;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.client.Requests;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.joda.time.DateTime;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.index.engine.DocumentAlreadyExistsException;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.threadpool.ThreadPool;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
public class AlertActionManager extends AbstractComponent {
public class AlertActionManager {
public static final String ALERT_NAME_FIELD = "alertName";
public static final String TRIGGERED_FIELD = "triggered";
public static final String FIRE_TIME_FIELD = "fireTime";
public static final String SCHEDULED_FIRE_TIME_FIELD = "scheduledFireTime";
public static final String TRIGGER_FIELD = "trigger";
public static final String QUERY_RAN_FIELD = "queryRan";
public static final String NUMBER_OF_RESULTS_FIELD = "numberOfResults";
public static final String ACTIONS_FIELD = "actions";
public static final String INDICES_FIELD = "indices";
public static final String ALERT_HISTORY_INDEX = "alerthistory";
public static final String ALERT_HISTORY_TYPE = "alerthistory";
private final Client client;
private final AlertManager alertManager;
private volatile ImmutableOpenMap<String, AlertActionFactory> actionImplemented;
private final AlertActionRegistry actionRegistry;
private final ThreadPool threadPool;
public AlertActionManager(Settings settings, AlertManager alertManager, Client client) {
this.alertManager = alertManager;
this.actionImplemented = ImmutableOpenMap.<String, AlertActionFactory>builder()
.fPut("email", new EmailAlertActionFactory())
.fPut("index", new IndexAlertActionFactory(client))
private final ESLogger logger = Loggers.getLogger(AlertActionManager.class);
private BlockingQueue<AlertActionEntry> jobsToBeProcessed = new LinkedBlockingQueue<>();
public final AtomicBoolean running = new AtomicBoolean(false);
private Executor readerExecutor;
private static AlertActionEntry END_ENTRY = new AlertActionEntry();
class AlertHistoryRunnable implements Runnable {
AlertActionEntry entry;
AlertHistoryRunnable(AlertActionEntry entry) {
this.entry = entry;
public void run() {
try {
if (claimAlertHistoryEntry(entry)) {
alertManager.doAction(alertManager.getAlertForName(entry.getAlertName()), entry, entry.getScheduledTime());
updateHistoryEntry(entry, AlertActionState.ACTION_PERFORMED);
} else {
logger.warn("Unable to claim alert history entry" + entry);
} catch (Throwable t) {
logger.error("Failed to execute alert action", t);
public void registerAction(String name, AlertActionFactory actionFactory){
actionImplemented = ImmutableOpenMap.builder(actionImplemented)
.fPut(name, actionFactory)
class QueueLoaderThread implements Runnable {
public void run() {
boolean success = false;
do {
try {
success = loadQueue();
} catch (Exception e) {
logger.error("Unable to load the job queue", e);
try {
} catch (InterruptedException ie) {
} while (!success);
public List<AlertAction> parseActionsFromMap(Map<String,Object> actionMap) {
ImmutableOpenMap<String, AlertActionFactory> actionImplemented = this.actionImplemented;
List<AlertAction> actions = new ArrayList<>();
for (Map.Entry<String, Object> actionEntry : actionMap.entrySet()) {
AlertActionFactory factory = actionImplemented.get(actionEntry.getKey());
if (factory != null) {
} else {
throw new ElasticsearchIllegalArgumentException("No action exists with the name [" + actionEntry.getKey() + "]");
class QueueReaderThread implements Runnable {
public void run() {
try {
logger.debug("Starting thread to read from the job queue");
while (running.get()) {
AlertActionEntry entry = null;
do {
try {
entry = jobsToBeProcessed.take();
} catch (InterruptedException ie) {
if (!running.get()) {
} while (entry == null);
if (!running.get() || entry == END_ENTRY) {
logger.debug("Stopping thread to read from the job queue");
.execute(new AlertHistoryRunnable(entry));
} catch (Throwable t) {
logger.error("Error during reader thread", t);
return actions;
public void doAction(String alertName, AlertResult alertResult){
Alert alert = alertManager.getAlertForName(alertName);
for (AlertAction action : alert.actions()) {
action.doAction(alertName, alertResult);
public AlertActionManager(Client client, AlertManager alertManager,
AlertActionRegistry actionRegistry,
ThreadPool threadPool) {
this.client = client;
this.alertManager = alertManager;
this.actionRegistry = actionRegistry;
this.threadPool = threadPool;
public void doStart() {
if (running.compareAndSet(false, true)) {
logger.info("Starting job queue");
readerExecutor = threadPool.executor(ThreadPool.Names.GENERIC);
readerExecutor.execute(new QueueReaderThread());
threadPool.executor(ThreadPool.Names.GENERIC).execute(new QueueLoaderThread());
public void doStop() {
public boolean loadQueue() {
if (!client.admin().indices().prepareExists(ALERT_HISTORY_INDEX).execute().actionGet().isExists()) {
//@TODO: change to scan/scroll if we get back over 100
SearchResponse searchResponse = client.prepareSearch().setSource(
"{ \"query\" : " +
"{ \"term\" : {" +
"\"" + AlertActionState.FIELD_NAME + "\" : \"" + AlertActionState.ACTION_NEEDED.toString() + "\"}}," +
"\"size\" : \"100\"" +
for (SearchHit sh : searchResponse.getHits()) {
String historyId = sh.getId();
AlertActionEntry historyEntry = parseHistory(historyId, sh, sh.version());
assert historyEntry.getEntryState() == AlertActionState.ACTION_NEEDED;
return true;
protected AlertActionEntry parseHistory(String historyId, SearchHit sh, long version) {
Map<String, Object> fields = sh.sourceAsMap();
return parseHistory(historyId, fields, version);
protected AlertActionEntry parseHistory(String historyId, Map<String,Object> fields, long version) {
return parseHistory(historyId, fields, version, actionRegistry, logger);
protected static AlertActionEntry parseHistory(String historyId, Map<String,Object> fields, long version,
AlertActionRegistry actionRegistry, ESLogger logger) {
String alertName = fields.get(ALERT_NAME_FIELD).toString();
boolean triggered = (Boolean)fields.get(TRIGGERED_FIELD);
DateTime fireTime = new DateTime(fields.get(FIRE_TIME_FIELD).toString());
DateTime scheduledFireTime = new DateTime(fields.get(SCHEDULED_FIRE_TIME_FIELD).toString());
AlertTrigger trigger = TriggerManager.parseTriggerFromMap((Map<String,Object>)fields.get(TRIGGER_FIELD));
String queryRan = fields.get(QUERY_RAN_FIELD).toString();
long numberOfResults = ((Number)fields.get(NUMBER_OF_RESULTS_FIELD)).longValue();
Object actionObj = fields.get(ACTIONS_FIELD);
List<AlertAction> actions;
if (actionObj instanceof Map) {
Map<String, Object> actionMap = (Map<String, Object>) actionObj;
actions = actionRegistry.parseActionsFromMap(actionMap);
} else {
throw new ElasticsearchException("Unable to parse actions [" + actionObj + "]");
List<String> indices = new ArrayList<>();
if (fields.get(INDICES_FIELD) != null && fields.get(INDICES_FIELD) instanceof List){
indices = (List<String>)fields.get(INDICES_FIELD);
} else {
logger.debug("Indices : " + fields.get(INDICES_FIELD) + " class " +
(fields.get(INDICES_FIELD) != null ? fields.get(INDICES_FIELD).getClass() : null ));
String stateString = fields.get(AlertActionState.FIELD_NAME).toString();
AlertActionState state = AlertActionState.fromString(stateString);
return new AlertActionEntry(historyId, version, alertName, triggered, fireTime, scheduledFireTime, trigger, queryRan,
numberOfResults, actions, indices, state);
public boolean addHistory(String alertName, boolean triggered,
DateTime fireTime, DateTime scheduledFireTime, SearchRequestBuilder triggeringQuery,
AlertTrigger trigger, long numberOfResults,
List<AlertAction> actions,
@Nullable List<String> indices) throws IOException {
if (!client.admin().indices().prepareExists(ALERT_HISTORY_INDEX).execute().actionGet().isExists()) {
ClusterHealthStatus chs = createAlertHistoryIndex();
AlertActionState state = AlertActionState.NO_ACTION_NEEDED;
if (triggered && !actions.isEmpty()) {
state = AlertActionState.ACTION_NEEDED;
AlertActionEntry entry = new AlertActionEntry(alertName + " " + scheduledFireTime.toDateTimeISO(), 1, alertName, triggered, fireTime, scheduledFireTime, trigger,
triggeringQuery.toString(), numberOfResults, actions, indices, state);
XContentBuilder historyEntry = XContentFactory.jsonBuilder();
entry.toXContent(historyEntry, ToXContent.EMPTY_PARAMS);
IndexRequest indexRequest = new IndexRequest();
indexRequest.refresh(true); //Always refresh after indexing an alert
try {
if (client.index(indexRequest).actionGet().isCreated()) {
return true;
} else {
return false;
} catch (DocumentAlreadyExistsException daee){
logger.warn("Someone has already created a history entry for this alert run");
return false;
private void stopIfRunning() {
if (running.compareAndSet(true, false)) {
logger.info("Stopping job queue");
private ClusterHealthStatus createAlertHistoryIndex() {
CreateIndexResponse cir = client.admin().indices().prepareCreate(ALERT_HISTORY_INDEX).addMapping(ALERT_HISTORY_TYPE).execute().actionGet(); //TODO FIX MAPPINGS
if (!cir.isAcknowledged()) {
logger.error("Create [{}] was not acknowledged", ALERT_HISTORY_INDEX);
ClusterHealthResponse actionGet = client.admin().cluster()
return actionGet.getStatus();
private AlertActionEntry getHistoryEntryFromIndex(String entryId) {
GetRequest getRequest = Requests.getRequest(ALERT_HISTORY_INDEX);
GetResponse getResponse = client.get(getRequest).actionGet();
if (getResponse.isExists()) {
return parseHistory(entryId, getResponse.getSourceAsMap(), getResponse.getVersion());
} else {
throw new ElasticsearchException("Unable to find [" + entryId + "] in the [" + ALERT_HISTORY_INDEX + "]" );
private void updateHistoryEntry(AlertActionEntry entry, AlertActionState actionPerformed) {
UpdateRequest updateRequest = new UpdateRequest();
XContentBuilder historyBuilder;
try {
historyBuilder = XContentFactory.jsonBuilder();
entry.toXContent(historyBuilder, ToXContent.EMPTY_PARAMS);
} catch (IOException ie) {
throw new ElasticsearchException("Unable to serialize alert history entry ["+ entry.getId() + "]", ie);
try {
} catch (ElasticsearchException ee) {
logger.error("Failed to update in claim", ee);
private boolean claimAlertHistoryEntry(AlertActionEntry entry) {
AlertActionEntry indexedHistoryEntry;
try {
indexedHistoryEntry = getHistoryEntryFromIndex(entry.getId());
if (indexedHistoryEntry.getEntryState() != AlertActionState.ACTION_NEEDED) {
//Someone else is doing or has done this action
return false;
UpdateRequest updateRequest = new UpdateRequest();
updateRequest.version(entry.getVersion());//Since we loaded this alert directly from the index the version should be correct
XContentBuilder historyBuilder;
try {
historyBuilder = XContentFactory.jsonBuilder();
entry.toXContent(historyBuilder, ToXContent.EMPTY_PARAMS);
} catch (IOException ie) {
throw new ElasticsearchException("Unable to serialize alert history entry ["+ entry.getId() + "]", ie);
try {
} catch (ElasticsearchException ee) {
logger.error("Failed to update in claim", ee);
return false;
} catch (Throwable t) {
logger.error("Failed to claim history entry " + entry, t);
return false;
return true;
@ -0,0 +1,60 @@
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
package org.elasticsearch.alerts.actions;
import org.elasticsearch.ElasticsearchIllegalArgumentException;
import org.elasticsearch.alerts.Alert;
import org.elasticsearch.alerts.AlertManager;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
public class AlertActionRegistry extends AbstractComponent {
private volatile ImmutableOpenMap<String, AlertActionFactory> actionImplemented;
public AlertActionRegistry(Settings settings, Client client) {
this.actionImplemented = ImmutableOpenMap.<String, AlertActionFactory>builder()
.fPut("email", new EmailAlertActionFactory())
.fPut("index", new IndexAlertActionFactory(client))
public void registerAction(String name, AlertActionFactory actionFactory){
actionImplemented = ImmutableOpenMap.builder(actionImplemented)
.fPut(name, actionFactory)
public List<AlertAction> parseActionsFromMap(Map<String,Object> actionMap) {
ImmutableOpenMap<String, AlertActionFactory> actionImplemented = this.actionImplemented;
List<AlertAction> actions = new ArrayList<>();
for (Map.Entry<String, Object> actionEntry : actionMap.entrySet()) {
AlertActionFactory factory = actionImplemented.get(actionEntry.getKey());
if (factory != null) {
} else {
throw new ElasticsearchIllegalArgumentException("No action exists with the name [" + actionEntry.getKey() + "]");
return actions;
public void doAction(Alert alert, AlertActionEntry actionEntry){
for (AlertAction action : alert.actions()) {
action.doAction(alert, actionEntry);
@ -0,0 +1,64 @@
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
package org.elasticsearch.alerts.actions;
import org.elasticsearch.ElasticsearchIllegalArgumentException;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import java.io.IOException;
public enum AlertActionState implements ToXContent {
public static final String FIELD_NAME = "AlertHistoryState";
public String toString(){
switch (this) {
public static AlertActionState fromString(String s) {
switch(s.toUpperCase()) {
throw new ElasticsearchIllegalArgumentException("Unknown value [" + s + "] for AlertHistoryState" );
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
return builder;
@ -6,9 +6,8 @@
package org.elasticsearch.alerts.actions;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.alerts.AlertResult;
import org.elasticsearch.alerts.Alert;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.search.SearchHit;
import java.io.IOException;
import java.util.ArrayList;
@ -28,7 +27,6 @@ public class EmailAlertAction implements AlertAction {
String server = "smtp.gmail.com";
int port = 587;
public EmailAlertAction(String ... addresses){
for (String address : addresses) {
@ -69,7 +67,7 @@ public class EmailAlertAction implements AlertAction {
public boolean doAction(String alertName, AlertResult result) {
public boolean doAction(Alert alert, AlertActionEntry result) {
Properties props = new Properties();
props.put("mail.smtp.auth", "true");
props.put("mail.smtp.starttls.enable", "true");
@ -86,19 +84,21 @@ public class EmailAlertAction implements AlertAction {
message.setFrom(new InternetAddress(from));
emailAddresses.toArray(new Address[1]));
message.setSubject("Elasticsearch Alert " + alertName + " triggered");
message.setSubject("Elasticsearch Alert " + alert.alertName() + " triggered");
StringBuffer output = new StringBuffer();
output.append("The following query triggered because " + result.trigger.toString() + "\n");
output.append("The total number of hits returned : " + result.searchResponse.getHits().getTotalHits() + "\n");
output.append("For query : " + result.query.toString());
output.append("The following query triggered because " + result.getTrigger().toString() + "\n");
output.append("The total number of hits returned : " + result.getNumberOfResults() + "\n");
output.append("For query : " + result.getTriggeringQuery());
output.append("Indices : ");
for (String index : result.indices) {
for (String index : result.getIndices()) {
if (displayField != null) {
for (SearchHit sh : result.searchResponse.getHits().getHits()) {
if (sh.sourceAsMap().containsKey(displayField)) {
@ -111,6 +111,7 @@ public class EmailAlertAction implements AlertAction {
} else {
} catch (Exception e){
@ -7,7 +7,7 @@ package org.elasticsearch.alerts.actions;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.alerts.AlertResult;
import org.elasticsearch.alerts.Alert;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
@ -45,15 +45,15 @@ public class IndexAlertAction implements AlertAction, ToXContent {
public boolean doAction(String alertName, AlertResult alertResult) {
public boolean doAction(Alert alert, AlertActionEntry alertResult) {
IndexRequest indexRequest = new IndexRequest();
try {
XContentBuilder resultBuilder = XContentFactory.jsonBuilder().prettyPrint();
resultBuilder = alertResult.searchResponse.toXContent(resultBuilder, ToXContent.EMPTY_PARAMS);
resultBuilder.field("timestamp", alertResult.fireTime);
//resultBuilder = alertResult.searchResponse.toXContent(resultBuilder, ToXContent.EMPTY_PARAMS);
resultBuilder.field("timestamp", alertResult.getFireTime());
} catch (IOException ie) {
@ -11,14 +11,19 @@ import org.elasticsearch.alerts.AlertingModule;
import org.elasticsearch.common.collect.Lists;
import org.elasticsearch.common.component.LifecycleComponent;
import org.elasticsearch.common.inject.Module;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.plugins.AbstractPlugin;
import java.util.Collection;
import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
public class AlertsPlugin extends AbstractPlugin {
public static final String NAME = "alerts";
@Override public String name() {
return "alerts";
return NAME;
@Override public String description() {
@ -39,4 +44,12 @@ public class AlertsPlugin extends AbstractPlugin {
return modules;
public Settings additionalSettings() {
return settingsBuilder()
.put("threadpool."+ NAME + ".type","cached")
@ -9,9 +9,8 @@ import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.alerts.Alert;
import org.elasticsearch.alerts.AlertManager;
import org.elasticsearch.alerts.AlertResult;
import org.elasticsearch.alerts.actions.AlertActionManager;
import org.elasticsearch.alerts.AlertManager;
import org.elasticsearch.alerts.triggers.TriggerManager;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterChangedEvent;
@ -42,21 +41,22 @@ public class AlertScheduler extends AbstractLifecycleComponent implements Cluste
private final Client client;
private final Scheduler scheduler;
private final AlertManager alertManager;
private final ScriptService scriptService;
private final TriggerManager triggerManager;
private final AlertActionManager actionManager;
private final ScriptService scriptService;
private AlertActionManager actionManager;
private final AtomicBoolean run = new AtomicBoolean(false);
public AlertScheduler(Settings settings, AlertManager alertManager, Client client,
TriggerManager triggerManager, AlertActionManager actionManager,
ScriptService scriptService, ClusterService clusterService) {
TriggerManager triggerManager, ScriptService scriptService,
ClusterService clusterService) {
this.alertManager = alertManager;
this.client = client;
this.triggerManager = triggerManager;
this.actionManager = actionManager;
this.scriptService = scriptService;
try {
SchedulerFactory schFactory = new StdSchedulerFactory();
@ -77,7 +77,7 @@ public class AlertScheduler extends AbstractLifecycleComponent implements Cluste
logger.info("Starting scheduler");
} catch (SchedulerException se){
logger.error("Failed to start quartz scheduler",se);
logger.error("Failed to start quartz scheduler", se);
} else {
@ -89,10 +89,12 @@ public class AlertScheduler extends AbstractLifecycleComponent implements Cluste
if (run.compareAndSet(true, false)) {
try {
logger.info("Stopping scheduler");
if (!scheduler.isShutdown()) {
} catch (SchedulerException se){
logger.error("Failed to stop quartz scheduler",se);
logger.error("Failed to stop quartz scheduler", se);
@ -156,36 +158,17 @@ public class AlertScheduler extends AbstractLifecycleComponent implements Cluste
SearchResponse sr = srb.execute().get();
logger.warn("Got search response hits : [{}]", sr.getHits().getTotalHits() );
AlertResult result = new AlertResult(alertName, sr, alert.trigger(),
triggerManager.isTriggered(alertName,sr), srb, indices,
new DateTime(jobExecutionContext.getScheduledFireTime()));
boolean firedAction = false;
if (result.isTriggered) {
logger.warn("We have triggered");
DateTime lastActionFire = alertManager.timeActionLastTriggered(alertName);
long msSinceLastAction = scheduledTime.getMillis() - lastActionFire.getMillis();
logger.error("last action fire [{}]", lastActionFire);
logger.error("msSinceLastAction [{}]", msSinceLastAction);
boolean isTriggered = triggerManager.isTriggered(alertName,sr);
if (alert.timePeriod().getMillis() > msSinceLastAction) {
logger.warn("Not firing action because it was fired in the timePeriod");
} else {
actionManager.doAction(alertName, result);
logger.warn("Did action !");
firedAction = true;
} else {
logger.warn("We didn't trigger");
alertManager.updateLastRan(alertName, new DateTime(jobExecutionContext.getFireTime()),scheduledTime,firedAction);
if (!alertManager.addHistory(alertName, result.isTriggered,
new DateTime(jobExecutionContext.getScheduledFireTime()), result.query,
result.trigger, result.searchResponse.getHits().getTotalHits(), alert.actions(), alert.indices()))
alertManager.updateLastRan(alertName, new DateTime(jobExecutionContext.getFireTime()),scheduledTime);
if (!alertManager.addHistory(alertName, isTriggered,
new DateTime(jobExecutionContext.getScheduledFireTime()), scheduledTime, srb,
alert.trigger(), sr.getHits().getTotalHits(), alert.actions(), alert.indices()))
logger.warn("Failed to store history for alert [{}]", alertName);
} catch (Exception e) {
logger.error("Failed execute alert [{}]", e, alertName);
@ -168,4 +168,29 @@ public class AlertTrigger implements ToXContent {
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
AlertTrigger that = (AlertTrigger) o;
if (value != that.value) return false;
if (scriptedTrigger != null ? !scriptedTrigger.equals(that.scriptedTrigger) : that.scriptedTrigger != null)
return false;
if (trigger != that.trigger) return false;
if (triggerType != that.triggerType) return false;
return true;
public int hashCode() {
int result = trigger != null ? trigger.hashCode() : 0;
result = 31 * result + (triggerType != null ? triggerType.hashCode() : 0);
result = 31 * result + value;
result = 31 * result + (scriptedTrigger != null ? scriptedTrigger.hashCode() : 0);
return result;
@ -33,8 +33,6 @@ public class TriggerManager extends AbstractComponent {
private final ScriptService scriptService;
public static AlertTrigger parseTriggerFromMap(Map<String, Object> triggerMap) {
//For now just trigger on number of events greater than 1
for (Map.Entry<String,Object> entry : triggerMap.entrySet()){
AlertTrigger.TriggerType type = AlertTrigger.TriggerType.fromString(entry.getKey());
if (type == AlertTrigger.TriggerType.SCRIPT) {
@ -48,6 +46,7 @@ public class TriggerManager extends AbstractComponent {
throw new ElasticsearchIllegalArgumentException();
private static ScriptedAlertTrigger parseScriptedTrigger(Object value) {
if (value instanceof Map) {
Map<String,Object> valueMap = (Map<String,Object>)value;
@ -8,6 +8,8 @@ package org.elasticsearch.alerts;
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse;
import org.elasticsearch.alerts.actions.AlertAction;
import org.elasticsearch.alerts.actions.AlertActionFactory;
import org.elasticsearch.alerts.actions.AlertActionRegistry;
import org.elasticsearch.alerts.actions.AlertActionEntry;
import org.elasticsearch.alerts.actions.AlertActionManager;
import org.elasticsearch.alerts.plugin.AlertsPlugin;
import org.elasticsearch.alerts.scheduler.AlertScheduler;
@ -31,7 +33,7 @@ import static org.hamcrest.core.Is.is;
@ElasticsearchIntegrationTest.ClusterScope(scope = ElasticsearchIntegrationTest.Scope.SUITE, numClientNodes = 0, transportClientRatio = 0, maxNumDataNodes = 3)
@ElasticsearchIntegrationTest.ClusterScope(scope = ElasticsearchIntegrationTest.Scope.SUITE, numClientNodes = 0, transportClientRatio = 0, maxNumDataNodes = 1, minNumDataNodes = 1, numDataNodes = 1)
public class BasicAlertingTest extends ElasticsearchIntegrationTest {
@ -48,11 +50,15 @@ public class BasicAlertingTest extends ElasticsearchIntegrationTest {
// TODO: add request, response & request builder etc.
public void testAlerSchedulerStartsProperly() throws Exception {
client().prepareIndex(ScriptService.SCRIPT_INDEX, "mustache", "query")
ensureGreen("my-index", AlertManager.ALERT_INDEX, AlertActionManager.ALERT_HISTORY_INDEX);
@ -92,14 +98,14 @@ public class BasicAlertingTest extends ElasticsearchIntegrationTest {
public boolean doAction(String alertName, AlertResult alert) {
logger.info("Alert {} invoked: {}", alertName, alert);
public boolean doAction(Alert alert, AlertActionEntry actionEntry) {
logger.info("Alert {} invoked: {}", alert.alertName(), actionEntry);
return true;
AlertActionManager alertActionManager = internalCluster().getInstance(AlertActionManager.class, internalCluster().getMasterName());
alertActionManager.registerAction("test", new AlertActionFactory() {
AlertActionRegistry alertActionRegistry = internalCluster().getInstance(AlertActionRegistry.class, internalCluster().getMasterName());
alertActionRegistry.registerAction("test", new AlertActionFactory() {
public AlertAction createAction(Object parameters) {
return alertAction;
@ -125,7 +131,7 @@ public class BasicAlertingTest extends ElasticsearchIntegrationTest {
public void run() {
assertThat(alertActionInvoked.get(), is(true));
IndicesExistsResponse indicesExistsResponse = client().admin().indices().prepareExists(AlertManager.ALERT_HISTORY_INDEX).get();
IndicesExistsResponse indicesExistsResponse = client().admin().indices().prepareExists(AlertActionManager.ALERT_HISTORY_INDEX).get();
assertThat(indicesExistsResponse.isExists(), is(true));
}, 30, TimeUnit.SECONDS);
@ -0,0 +1,61 @@
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
package org.elasticsearch.alerts.actions;
import org.elasticsearch.alerts.BasicAlertingTest;
import org.elasticsearch.alerts.triggers.AlertTrigger;
import org.elasticsearch.common.joda.time.DateTime;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.junit.Test;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ElasticsearchIntegrationTest.ClusterScope(scope = ElasticsearchIntegrationTest.Scope.SUITE, numClientNodes = 0, transportClientRatio = 0, numDataNodes = 1)
public class AlertActionsTest extends ElasticsearchIntegrationTest {
public void testAlertActionParser(){
DateTime fireTime = new DateTime();
DateTime scheduledFireTime = new DateTime();
Map<String, Object> triggerMap = new HashMap<>();
triggerMap.put("numberOfEvents", ">1");
Map<String,Object> actionMap = new HashMap<>();
Map<String,Object> emailParamMap = new HashMap<>();
List<String> addresses = new ArrayList<>();
emailParamMap.put("addresses", addresses);
actionMap.put("email", emailParamMap);
Map<String, Object> fieldMap = new HashMap<>();
fieldMap.put(AlertActionManager.ALERT_NAME_FIELD, "testName");
fieldMap.put(AlertActionManager.TRIGGERED_FIELD, true);
fieldMap.put(AlertActionManager.FIRE_TIME_FIELD, fireTime.toDateTimeISO().toString());
fieldMap.put(AlertActionManager.SCHEDULED_FIRE_TIME_FIELD, scheduledFireTime.toDateTimeISO().toString());
fieldMap.put(AlertActionManager.TRIGGER_FIELD, triggerMap);
fieldMap.put(AlertActionManager.QUERY_RAN_FIELD, "foobar");
fieldMap.put(AlertActionManager.ACTIONS_FIELD, actionMap);
fieldMap.put(AlertActionState.FIELD_NAME, AlertActionState.ACTION_NEEDED.toString());
AlertActionRegistry alertActionRegistry = internalCluster().getInstance(AlertActionRegistry.class, internalCluster().getMasterName());
AlertActionEntry actionEntry = AlertActionManager.parseHistory("foobar", fieldMap, 0, alertActionRegistry, logger);
assertEquals(actionEntry.getVersion(), 0);
assertEquals(actionEntry.getAlertName(), "testName");
assertEquals(actionEntry.isTriggered(), true);
assertEquals(actionEntry.getScheduledTime(), scheduledFireTime);
assertEquals(actionEntry.getFireTime(), fireTime);
assertEquals(actionEntry.getEntryState(), AlertActionState.ACTION_NEEDED);
assertEquals(actionEntry.getNumberOfResults(), 10);
new AlertTrigger(AlertTrigger.SimpleTrigger.GREATER_THAN, AlertTrigger.TriggerType.NUMBER_OF_EVENTS, 1));
Reference in New Issue
Block a user