AlertActionManager: Add alert action job queue

This change adds the AlertActionManager and AlertActionEntry. The old AlertActionManager has become the AlertActionRegistry.
This means that now the results of Alerts are queued up in a job queue and executed in separate threads.
The AlertActionManager is a composite member of the AlertManager.
Change the BasicTest to just run on a single node to fix the action registration if the action happens on a different node.
Threads are not directly constructed but now the threadpool is used.
The ClusterStateListener in AlertManager is responsible now for starting the job queue.

Original commit: elastic/x-pack-elasticsearch@a73c6b60f8
This commit is contained in:
Brian Murphy 2014-10-24 12:24:04 +01:00
parent a23487cd38
commit 4da8f9fec7
16 changed files with 966 additions and 236 deletions

View File

@ -22,6 +22,8 @@ import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.alerts.actions.AlertAction;
import org.elasticsearch.alerts.actions.AlertActionManager;
import org.elasticsearch.alerts.actions.AlertActionRegistry;
import org.elasticsearch.alerts.actions.AlertActionEntry;
import org.elasticsearch.alerts.scheduler.AlertScheduler;
import org.elasticsearch.alerts.triggers.AlertTrigger;
import org.elasticsearch.alerts.triggers.TriggerManager;
@ -31,7 +33,6 @@ import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.collect.ImmutableMap;
@ -46,6 +47,7 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.threadpool.ThreadPool;
import java.io.IOException;
import java.util.ArrayList;
@ -59,11 +61,9 @@ import java.util.concurrent.atomic.AtomicBoolean;
public class AlertManager extends AbstractLifecycleComponent {
public static final String ALERT_INDEX = ".alerts";
public static final String ALERT_TYPE = "alert";
public static final String ALERT_HISTORY_INDEX = "alerthistory";
public static final String ALERT_HISTORY_TYPE = "alertHistory";
public static final ParseField QUERY_FIELD = new ParseField("query");
public static final ParseField SCHEDULE_FIELD = new ParseField("schedule");
public static final ParseField TRIGGER_FIELD = new ParseField("trigger");
@ -79,11 +79,16 @@ public class AlertManager extends AbstractLifecycleComponent {
private final Client client;
private AlertScheduler scheduler;
private final ThreadPool threadPool;
private final ConcurrentMap<String,Alert> alertMap;
private AtomicBoolean started = new AtomicBoolean(false);
private AtomicBoolean startActions = new AtomicBoolean(false);
private AlertActionRegistry actionRegistry;
private AlertActionManager actionManager;
final TimeValue defaultTimePeriod = new TimeValue(300*1000); //TODO : read from config
@ -93,10 +98,6 @@ public class AlertManager extends AbstractLifecycleComponent {
}
}
public void setActionManager(AlertActionManager actionManager){
this.actionManager = actionManager;
}
@Override
protected void doStart() throws ElasticsearchException {
logger.warn("STARTING");
@ -114,21 +115,25 @@ public class AlertManager extends AbstractLifecycleComponent {
@Inject
public AlertManager(Settings settings, Client client, ClusterService clusterService) {
public AlertManager(Settings settings, Client client, ClusterService clusterService, ThreadPool threadPool,
AlertActionRegistry actionRegistry) {
super(settings);
logger.warn("Initing AlertManager");
this.client = client;
alertMap = ConcurrentCollections.newConcurrentMap();
clusterService.add(new AlertsClusterStateListener());
this.threadPool = threadPool;
this.actionRegistry = actionRegistry;
this.actionManager = new AlertActionManager(client, this, actionRegistry, threadPool);
}
public void setAlertScheduler(AlertScheduler scheduler){
this.scheduler = scheduler;
}
private ClusterHealthStatus createAlertsIndex() {
CreateIndexResponse cir = client.admin().indices().prepareCreate(ALERT_INDEX).addMapping(ALERT_TYPE).execute().actionGet(); //TODO FIX MAPPINGS
logger.warn(cir.toString());
ClusterHealthResponse actionGet = client.admin().cluster()
.health(Requests.clusterHealthRequest(ALERT_INDEX).waitForGreenStatus().waitForEvents(Priority.LANGUID).waitForRelocatingShards(0)).actionGet();
return actionGet.getStatus();
@ -144,6 +149,25 @@ public class AlertManager extends AbstractLifecycleComponent {
}
}
public void doAction(Alert alert, AlertActionEntry result, DateTime scheduledTime) {
logger.warn("We have triggered");
DateTime lastActionFire = timeActionLastTriggered(alert.alertName());
long msSinceLastAction = scheduledTime.getMillis() - lastActionFire.getMillis();
logger.error("last action fire [{}]", lastActionFire);
logger.error("msSinceLastAction [{}]", msSinceLastAction);
if (alert.timePeriod().getMillis() > msSinceLastAction) {
logger.warn("Not firing action because it was fired in the timePeriod");
} else {
actionRegistry.doAction(alert, result);
logger.warn("Did action !");
alert.lastActionFire(scheduledTime);
persistAlert(alert.alertName(), alert, IndexRequest.OpType.INDEX);
}
}
public boolean claimAlertRun(String alertName, DateTime scheduleRunTime) {
Alert indexedAlert;
try {
@ -235,6 +259,7 @@ public class AlertManager extends AbstractLifecycleComponent {
if (!client.admin().indices().prepareExists(ALERT_INDEX).execute().actionGet().isExists()) {
createAlertsIndex();
}
SearchResponse searchResponse = client.prepareSearch().setSource(
"{ \"query\" : " +
"{ \"match_all\" : {}}," +
@ -257,15 +282,11 @@ public class AlertManager extends AbstractLifecycleComponent {
return 0;
}
public boolean updateLastRan(String alertName, DateTime fireTime, DateTime scheduledTime, boolean firedAction) throws Exception {
public boolean updateLastRan(String alertName, DateTime fireTime, DateTime scheduledTime) throws Exception {
try {
Alert alert = getAlertForName(alertName);
alert.lastRan(fireTime);
XContentBuilder alertBuilder = XContentFactory.jsonBuilder().prettyPrint();
if (firedAction) {
logger.error("Fired action [{}]",firedAction);
alert.lastActionFire(scheduledTime);
}
alert.toXContent(alertBuilder, ToXContent.EMPTY_PARAMS);
logger.error(XContentHelper.convertToJson(alertBuilder.bytes(),false,true));
UpdateRequest updateRequest = new UpdateRequest();
@ -283,46 +304,6 @@ public class AlertManager extends AbstractLifecycleComponent {
}
}
public boolean addHistory(String alertName, boolean triggered,
DateTime fireTime, SearchRequestBuilder triggeringQuery,
AlertTrigger trigger, long numberOfResults,
List<AlertAction> actions,
@Nullable List<String> indices) throws Exception {
XContentBuilder historyEntry = XContentFactory.jsonBuilder();
historyEntry.startObject();
historyEntry.field("alertName", alertName);
historyEntry.field("triggered", triggered);
historyEntry.field("fireTime", fireTime.toDateTimeISO());
historyEntry.field("trigger");
trigger.toXContent(historyEntry, ToXContent.EMPTY_PARAMS);
historyEntry.field("queryRan", triggeringQuery.toString());
historyEntry.field("numberOfResults", numberOfResults);
historyEntry.field("actions");
historyEntry.startArray();
for (AlertAction action : actions) {
action.toXContent(historyEntry, ToXContent.EMPTY_PARAMS);
}
historyEntry.endArray();
if (indices != null) {
historyEntry.field("indices");
historyEntry.startArray();
for (String index : indices) {
historyEntry.value(index);
}
historyEntry.endArray();
}
historyEntry.endObject();
IndexRequest indexRequest = new IndexRequest();
indexRequest.index(ALERT_HISTORY_INDEX);
indexRequest.type(ALERT_HISTORY_TYPE);
indexRequest.source(historyEntry);
indexRequest.listenerThreaded(false);
indexRequest.operationThreaded(false);
indexRequest.refresh(true); //Always refresh after indexing an alert
indexRequest.opType(IndexRequest.OpType.CREATE);
client.index(indexRequest).actionGet().isCreated();
return true;
}
public boolean deleteAlert(String alertName) throws InterruptedException, ExecutionException {
if (!started.get()) {
@ -433,12 +414,7 @@ public class AlertManager extends AbstractLifecycleComponent {
public Alert parseAlert(String alertId, Map<String, Object> fields, long version ) {
//Map<String,SearchHitField> fields = sh.getFields();
logger.warn("Parsing : [{}]", alertId);
for (String field : fields.keySet() ) {
logger.warn("Field : [{}]", field);
}
String query = fields.get(QUERY_FIELD.getPreferredName()).toString();
String schedule = fields.get(SCHEDULE_FIELD.getPreferredName()).toString();
Object triggerObj = fields.get(TRIGGER_FIELD.getPreferredName());
@ -457,9 +433,9 @@ public class AlertManager extends AbstractLifecycleComponent {
List<AlertAction> actions = null;
if (actionObj instanceof Map) {
Map<String, Object> actionMap = (Map<String, Object>) actionObj;
actions = actionManager.parseActionsFromMap(actionMap);
actions = actionRegistry.parseActionsFromMap(actionMap);
} else {
throw new ElasticsearchException("Unable to parse actions [" + triggerObj + "]");
throw new ElasticsearchException("Unable to parse actions [" + actionObj + "]");
}
DateTime lastRan = new DateTime(0);
@ -540,19 +516,56 @@ public class AlertManager extends AbstractLifecycleComponent {
return started.get();
}
public boolean addHistory(String alertName, boolean isTriggered, DateTime dateTime, DateTime scheduledTime,
SearchRequestBuilder srb, AlertTrigger trigger, long totalHits, List<AlertAction> actions,
List<String> indices) throws IOException{
return actionManager.addHistory(alertName, isTriggered, dateTime, scheduledTime, srb, trigger, totalHits, actions, indices);
}
private final class AlertsClusterStateListener implements ClusterStateListener {
@Override
public void clusterChanged(ClusterChangedEvent event) {
if (event.indicesDeleted().contains(ALERT_INDEX)) {
if (!event.localNodeMaster()) { //We are not the master
if (started.compareAndSet(false, true)) {
scheduler.clearAlerts();
alertMap.clear();
}
if (startActions.compareAndSet(false, true)) {
//If actionManager was running and we aren't the master stop
actionManager.doStop(); //Safe to call this multiple times, it's a noop if we are already stopped
}
return;
}
if (!started.get()) {
IndexMetaData alertIndexMetaData = event.state().getMetaData().index(ALERT_INDEX);
if (alertIndexMetaData != null) {
if (event.state().routingTable().index(ALERT_INDEX).allPrimaryShardsActive()) {
// TODO: Do on a different thread and have some kind of retry mechanism?
started.set(true);
threadPool.executor(ThreadPool.Names.GENERIC).execute(new AlertLoader());
}
}
}
if (!startActions.get()) {
IndexMetaData indexMetaData = event.state().getMetaData().index(AlertActionManager.ALERT_HISTORY_INDEX);
if (indexMetaData != null) {
if (event.state().routingTable().index(ALERT_INDEX).allPrimaryShardsActive()) {
startActions.set(true);
actionManager.doStart();
}
}
}
}
}
private class AlertLoader implements Runnable {
@Override
public void run() {
// TODO: have some kind of retry mechanism?
try {
loadAlerts();
sendAlertsToScheduler();
@ -560,13 +573,8 @@ public class AlertManager extends AbstractLifecycleComponent {
logger.warn("Error during loading of alerts from an existing .alerts index... refresh the alerts manually");
}
started.set(true);
}
} else {
started.set(true);
}
}
}
}
}

View File

@ -1,61 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.alerts;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.alerts.triggers.AlertTrigger;
import org.elasticsearch.common.joda.time.DateTime;
import java.util.Arrays;
public class AlertResult {
public SearchResponse searchResponse;
public AlertTrigger trigger;
public String alertName;
public DateTime fireTime;
public boolean isTriggered;
public SearchRequestBuilder query;
public String[] indices;
public AlertResult(String alertName, SearchResponse searchResponse, AlertTrigger trigger, boolean isTriggered, SearchRequestBuilder query, String[] indices, DateTime fireTime) {
this.searchResponse = searchResponse;
this.trigger = trigger;
this.isTriggered = isTriggered;
this.query = query;
this.indices = indices;
this.alertName = alertName;
this.fireTime = fireTime;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
AlertResult that = (AlertResult) o;
if (isTriggered != that.isTriggered) return false;
if (!Arrays.equals(indices, that.indices)) return false;
if (query != null ? !query.equals(that.query) : that.query != null) return false;
if (searchResponse != null ? !searchResponse.equals(that.searchResponse) : that.searchResponse != null)
return false;
if (trigger != null ? !trigger.equals(that.trigger) : that.trigger != null) return false;
return true;
}
@Override
public int hashCode() {
int result = searchResponse != null ? searchResponse.hashCode() : 0;
result = 31 * result + (trigger != null ? trigger.hashCode() : 0);
result = 31 * result + (isTriggered ? 1 : 0);
result = 31 * result + (query != null ? query.hashCode() : 0);
result = 31 * result + (indices != null ? Arrays.hashCode(indices) : 0);
return result;
}
}

View File

@ -5,7 +5,7 @@
*/
package org.elasticsearch.alerts;
import org.elasticsearch.alerts.actions.AlertActionManager;
import org.elasticsearch.alerts.actions.AlertActionRegistry;
import org.elasticsearch.alerts.rest.AlertRestHandler;
import org.elasticsearch.alerts.scheduler.AlertScheduler;
import org.elasticsearch.alerts.triggers.TriggerManager;
@ -18,7 +18,7 @@ public class AlertingModule extends AbstractModule {
bind(AlertManager.class).asEagerSingleton();
bind(TriggerManager.class).asEagerSingleton();
bind(AlertScheduler.class).asEagerSingleton();
bind(AlertActionManager.class).asEagerSingleton();
bind(AlertActionRegistry.class).asEagerSingleton();
bind(AlertRestHandler.class).asEagerSingleton();
}

View File

@ -5,7 +5,7 @@
*/
package org.elasticsearch.alerts.actions;
import org.elasticsearch.alerts.AlertResult;
import org.elasticsearch.alerts.Alert;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
@ -15,5 +15,5 @@ public interface AlertAction extends ToXContent {
public String getActionName();
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException;
public boolean doAction(String alertName, AlertResult alert);
public boolean doAction(Alert alert, AlertActionEntry actionEntry);
}

View File

@ -0,0 +1,251 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.alerts.actions;
import org.elasticsearch.alerts.triggers.AlertTrigger;
import org.elasticsearch.common.joda.time.DateTime;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import java.io.IOException;
import java.util.List;
/**
*/
public class AlertActionEntry implements ToXContent{
private long version;
private String alertName;
private boolean triggered;
private DateTime fireTime;
private AlertTrigger trigger;
private String triggeringQuery;
private long numberOfResults;
private List<AlertAction> actions;
private List<String> indices;
private AlertActionState entryState;
private DateTime scheduledTime;
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
private String id;
public DateTime getScheduledTime() {
return scheduledTime;
}
public void setScheduledTime(DateTime scheduledTime) {
this.scheduledTime = scheduledTime;
}
public String getAlertName() {
return alertName;
}
public void setAlertName(String alertName) {
this.alertName = alertName;
}
public boolean isTriggered() {
return triggered;
}
public void setTriggered(boolean triggered) {
this.triggered = triggered;
}
public DateTime getFireTime() {
return fireTime;
}
public void setFireTime(DateTime fireTime) {
this.fireTime = fireTime;
}
public AlertTrigger getTrigger() {
return trigger;
}
public void setTrigger(AlertTrigger trigger) {
this.trigger = trigger;
}
public String getTriggeringQuery() {
return triggeringQuery;
}
public void setTriggeringQuery(String triggeringQuery) {
this.triggeringQuery = triggeringQuery;
}
public long getNumberOfResults() {
return numberOfResults;
}
public void setNumberOfResults(long numberOfResults) {
this.numberOfResults = numberOfResults;
}
public List<AlertAction> getActions() {
return actions;
}
public void setActions(List<AlertAction> actions) {
this.actions = actions;
}
public List<String> getIndices() {
return indices;
}
public void setIndices(List<String> indices) {
this.indices = indices;
}
public AlertActionState getEntryState() {
return entryState;
}
public void setEntryState(AlertActionState entryState) {
this.entryState = entryState;
}
public long getVersion() {
return version;
}
public void setVersion(long version) {
this.version = version;
}
protected AlertActionEntry() {
}
public AlertActionEntry(String id, long version, String alertName, boolean triggered, DateTime fireTime, DateTime scheduledTime, AlertTrigger trigger,
String queryRan, long numberOfResults, List<AlertAction> actions,
List<String> indices, AlertActionState state) {
this.id = id;
this.version = version;
this.alertName = alertName;
this.triggered = triggered;
this.fireTime = fireTime;
this.scheduledTime = scheduledTime;
this.trigger = trigger;
this.triggeringQuery = queryRan;
this.numberOfResults = numberOfResults;
this.actions = actions;
this.indices = indices;
this.entryState = state;
}
@Override
public XContentBuilder toXContent(XContentBuilder historyEntry, Params params) throws IOException {
historyEntry.startObject();
historyEntry.field("alertName", alertName);
historyEntry.field("triggered", triggered);
historyEntry.field("fireTime", fireTime.toDateTimeISO());
historyEntry.field(AlertActionManager.SCHEDULED_FIRE_TIME_FIELD, scheduledTime.toDateTimeISO());
historyEntry.field("trigger");
trigger.toXContent(historyEntry, ToXContent.EMPTY_PARAMS);
historyEntry.field("queryRan", triggeringQuery);
historyEntry.field("numberOfResults", numberOfResults);
historyEntry.field("actions");
historyEntry.startObject();
for (AlertAction action : actions) {
historyEntry.field(action.getActionName());
action.toXContent(historyEntry, params);
}
historyEntry.endObject();
if (indices != null) {
historyEntry.field("indices");
historyEntry.startArray();
for (String index : indices) {
historyEntry.value(index);
}
historyEntry.endArray();
}
historyEntry.field(AlertActionState.FIELD_NAME, entryState.toString());
historyEntry.endObject();
return historyEntry;
}
@Override
public String toString() {
return "AlertHistoryEntry{" +
"version=" + version +
", alertName='" + alertName + '\'' +
", triggered=" + triggered +
", fireTime=" + fireTime +
", trigger=" + trigger +
", triggeringQuery='" + triggeringQuery + '\'' +
", numberOfResults=" + numberOfResults +
", actions=" + actions +
", indices=" + indices +
", entryState=" + entryState +
", scheduledTime=" + scheduledTime +
", id='" + id + '\'' +
'}';
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
AlertActionEntry that = (AlertActionEntry) o;
if (numberOfResults != that.numberOfResults) return false;
if (triggered != that.triggered) return false;
if (version != that.version) return false;
if (actions != null ? !actions.equals(that.actions) : that.actions != null) return false;
if (alertName != null ? !alertName.equals(that.alertName) : that.alertName != null) return false;
if (entryState != that.entryState) return false;
if (fireTime != null ? !fireTime.equals(that.fireTime) : that.fireTime != null) return false;
if (id != null ? !id.equals(that.id) : that.id != null) return false;
if (indices != null ? !indices.equals(that.indices) : that.indices != null) return false;
if (scheduledTime != null ? !scheduledTime.equals(that.scheduledTime) : that.scheduledTime != null)
return false;
if (trigger != null ? !trigger.equals(that.trigger) : that.trigger != null) return false;
if (triggeringQuery != null ? !triggeringQuery.equals(that.triggeringQuery) : that.triggeringQuery != null)
return false;
return true;
}
@Override
public int hashCode() {
int result = (int) (version ^ (version >>> 32));
result = 31 * result + (alertName != null ? alertName.hashCode() : 0);
result = 31 * result + (triggered ? 1 : 0);
result = 31 * result + (fireTime != null ? fireTime.hashCode() : 0);
result = 31 * result + (trigger != null ? trigger.hashCode() : 0);
result = 31 * result + (triggeringQuery != null ? triggeringQuery.hashCode() : 0);
result = 31 * result + (int) (numberOfResults ^ (numberOfResults >>> 32));
result = 31 * result + (actions != null ? actions.hashCode() : 0);
result = 31 * result + (indices != null ? indices.hashCode() : 0);
result = 31 * result + (entryState != null ? entryState.hashCode() : 0);
result = 31 * result + (scheduledTime != null ? scheduledTime.hashCode() : 0);
result = 31 * result + (id != null ? id.hashCode() : 0);
return result;
}
}

View File

@ -5,61 +5,381 @@
*/
package org.elasticsearch.alerts.actions;
import org.elasticsearch.ElasticsearchIllegalArgumentException;
import org.elasticsearch.alerts.Alert;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.alerts.AlertManager;
import org.elasticsearch.alerts.AlertResult;
import org.elasticsearch.alerts.triggers.AlertTrigger;
import org.elasticsearch.alerts.triggers.TriggerManager;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.client.Requests;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.joda.time.DateTime;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.index.engine.DocumentAlreadyExistsException;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.threadpool.ThreadPool;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
public class AlertActionManager extends AbstractComponent {
/**
*/
public class AlertActionManager {
public static final String ALERT_NAME_FIELD = "alertName";
public static final String TRIGGERED_FIELD = "triggered";
public static final String FIRE_TIME_FIELD = "fireTime";
public static final String SCHEDULED_FIRE_TIME_FIELD = "scheduledFireTime";
public static final String TRIGGER_FIELD = "trigger";
public static final String QUERY_RAN_FIELD = "queryRan";
public static final String NUMBER_OF_RESULTS_FIELD = "numberOfResults";
public static final String ACTIONS_FIELD = "actions";
public static final String INDICES_FIELD = "indices";
public static final String ALERT_HISTORY_INDEX = "alerthistory";
public static final String ALERT_HISTORY_TYPE = "alerthistory";
private final Client client;
private final AlertManager alertManager;
private volatile ImmutableOpenMap<String, AlertActionFactory> actionImplemented;
private final AlertActionRegistry actionRegistry;
private final ThreadPool threadPool;
@Inject
public AlertActionManager(Settings settings, AlertManager alertManager, Client client) {
super(settings);
this.alertManager = alertManager;
this.actionImplemented = ImmutableOpenMap.<String, AlertActionFactory>builder()
.fPut("email", new EmailAlertActionFactory())
.fPut("index", new IndexAlertActionFactory(client))
.build();
alertManager.setActionManager(this);
private final ESLogger logger = Loggers.getLogger(AlertActionManager.class);
private BlockingQueue<AlertActionEntry> jobsToBeProcessed = new LinkedBlockingQueue<>();
public final AtomicBoolean running = new AtomicBoolean(false);
private Executor readerExecutor;
private static AlertActionEntry END_ENTRY = new AlertActionEntry();
class AlertHistoryRunnable implements Runnable {
AlertActionEntry entry;
AlertHistoryRunnable(AlertActionEntry entry) {
this.entry = entry;
}
public void registerAction(String name, AlertActionFactory actionFactory){
actionImplemented = ImmutableOpenMap.builder(actionImplemented)
.fPut(name, actionFactory)
.build();
}
public List<AlertAction> parseActionsFromMap(Map<String,Object> actionMap) {
ImmutableOpenMap<String, AlertActionFactory> actionImplemented = this.actionImplemented;
List<AlertAction> actions = new ArrayList<>();
for (Map.Entry<String, Object> actionEntry : actionMap.entrySet()) {
AlertActionFactory factory = actionImplemented.get(actionEntry.getKey());
if (factory != null) {
actions.add(factory.createAction(actionEntry.getValue()));
@Override
public void run() {
try {
if (claimAlertHistoryEntry(entry)) {
alertManager.doAction(alertManager.getAlertForName(entry.getAlertName()), entry, entry.getScheduledTime());
updateHistoryEntry(entry, AlertActionState.ACTION_PERFORMED);
} else {
throw new ElasticsearchIllegalArgumentException("No action exists with the name [" + actionEntry.getKey() + "]");
logger.warn("Unable to claim alert history entry" + entry);
}
}
return actions;
} catch (Throwable t) {
logger.error("Failed to execute alert action", t);
}
public void doAction(String alertName, AlertResult alertResult){
Alert alert = alertManager.getAlertForName(alertName);
for (AlertAction action : alert.actions()) {
action.doAction(alertName, alertResult);
}
}
class QueueLoaderThread implements Runnable {
@Override
public void run() {
boolean success = false;
do {
try {
success = loadQueue();
} catch (Exception e) {
logger.error("Unable to load the job queue", e);
try {
Thread.sleep(1000);
} catch (InterruptedException ie) {
}
}
} while (!success);
}
}
class QueueReaderThread implements Runnable {
@Override
public void run() {
try {
logger.debug("Starting thread to read from the job queue");
while (running.get()) {
AlertActionEntry entry = null;
do {
try {
entry = jobsToBeProcessed.take();
} catch (InterruptedException ie) {
if (!running.get()) {
break;
}
}
} while (entry == null);
if (!running.get() || entry == END_ENTRY) {
logger.debug("Stopping thread to read from the job queue");
}
threadPool.executor(ThreadPool.Names.MANAGEMENT)
.execute(new AlertHistoryRunnable(entry));
}
} catch (Throwable t) {
logger.error("Error during reader thread", t);
}
}
}
public AlertActionManager(Client client, AlertManager alertManager,
AlertActionRegistry actionRegistry,
ThreadPool threadPool) {
this.client = client;
this.alertManager = alertManager;
this.actionRegistry = actionRegistry;
this.threadPool = threadPool;
}
public void doStart() {
if (running.compareAndSet(false, true)) {
logger.info("Starting job queue");
readerExecutor = threadPool.executor(ThreadPool.Names.GENERIC);
readerExecutor.execute(new QueueReaderThread());
threadPool.executor(ThreadPool.Names.GENERIC).execute(new QueueLoaderThread());
}
}
public void doStop() {
stopIfRunning();
}
public boolean loadQueue() {
if (!client.admin().indices().prepareExists(ALERT_HISTORY_INDEX).execute().actionGet().isExists()) {
createAlertHistoryIndex();
}
//@TODO: change to scan/scroll if we get back over 100
SearchResponse searchResponse = client.prepareSearch().setSource(
"{ \"query\" : " +
"{ \"term\" : {" +
"\"" + AlertActionState.FIELD_NAME + "\" : \"" + AlertActionState.ACTION_NEEDED.toString() + "\"}}," +
"\"size\" : \"100\"" +
"}"
).setTypes(ALERT_HISTORY_TYPE).setIndices(ALERT_HISTORY_INDEX).setListenerThreaded(false).execute().actionGet();
for (SearchHit sh : searchResponse.getHits()) {
String historyId = sh.getId();
AlertActionEntry historyEntry = parseHistory(historyId, sh, sh.version());
assert historyEntry.getEntryState() == AlertActionState.ACTION_NEEDED;
jobsToBeProcessed.add(historyEntry);
}
return true;
}
protected AlertActionEntry parseHistory(String historyId, SearchHit sh, long version) {
Map<String, Object> fields = sh.sourceAsMap();
return parseHistory(historyId, fields, version);
}
protected AlertActionEntry parseHistory(String historyId, Map<String,Object> fields, long version) {
return parseHistory(historyId, fields, version, actionRegistry, logger);
}
protected static AlertActionEntry parseHistory(String historyId, Map<String,Object> fields, long version,
AlertActionRegistry actionRegistry, ESLogger logger) {
String alertName = fields.get(ALERT_NAME_FIELD).toString();
boolean triggered = (Boolean)fields.get(TRIGGERED_FIELD);
DateTime fireTime = new DateTime(fields.get(FIRE_TIME_FIELD).toString());
DateTime scheduledFireTime = new DateTime(fields.get(SCHEDULED_FIRE_TIME_FIELD).toString());
AlertTrigger trigger = TriggerManager.parseTriggerFromMap((Map<String,Object>)fields.get(TRIGGER_FIELD));
String queryRan = fields.get(QUERY_RAN_FIELD).toString();
long numberOfResults = ((Number)fields.get(NUMBER_OF_RESULTS_FIELD)).longValue();
Object actionObj = fields.get(ACTIONS_FIELD);
List<AlertAction> actions;
if (actionObj instanceof Map) {
Map<String, Object> actionMap = (Map<String, Object>) actionObj;
actions = actionRegistry.parseActionsFromMap(actionMap);
} else {
throw new ElasticsearchException("Unable to parse actions [" + actionObj + "]");
}
List<String> indices = new ArrayList<>();
if (fields.get(INDICES_FIELD) != null && fields.get(INDICES_FIELD) instanceof List){
indices = (List<String>)fields.get(INDICES_FIELD);
} else {
logger.debug("Indices : " + fields.get(INDICES_FIELD) + " class " +
(fields.get(INDICES_FIELD) != null ? fields.get(INDICES_FIELD).getClass() : null ));
}
String stateString = fields.get(AlertActionState.FIELD_NAME).toString();
AlertActionState state = AlertActionState.fromString(stateString);
return new AlertActionEntry(historyId, version, alertName, triggered, fireTime, scheduledFireTime, trigger, queryRan,
numberOfResults, actions, indices, state);
}
public boolean addHistory(String alertName, boolean triggered,
DateTime fireTime, DateTime scheduledFireTime, SearchRequestBuilder triggeringQuery,
AlertTrigger trigger, long numberOfResults,
List<AlertAction> actions,
@Nullable List<String> indices) throws IOException {
if (!client.admin().indices().prepareExists(ALERT_HISTORY_INDEX).execute().actionGet().isExists()) {
ClusterHealthStatus chs = createAlertHistoryIndex();
}
AlertActionState state = AlertActionState.NO_ACTION_NEEDED;
if (triggered && !actions.isEmpty()) {
state = AlertActionState.ACTION_NEEDED;
}
AlertActionEntry entry = new AlertActionEntry(alertName + " " + scheduledFireTime.toDateTimeISO(), 1, alertName, triggered, fireTime, scheduledFireTime, trigger,
triggeringQuery.toString(), numberOfResults, actions, indices, state);
XContentBuilder historyEntry = XContentFactory.jsonBuilder();
entry.toXContent(historyEntry, ToXContent.EMPTY_PARAMS);
IndexRequest indexRequest = new IndexRequest();
indexRequest.index(ALERT_HISTORY_INDEX);
indexRequest.type(ALERT_HISTORY_TYPE);
indexRequest.id(entry.getId());
indexRequest.source(historyEntry);
indexRequest.listenerThreaded(false);
indexRequest.operationThreaded(false);
indexRequest.refresh(true); //Always refresh after indexing an alert
indexRequest.opType(IndexRequest.OpType.CREATE);
try {
if (client.index(indexRequest).actionGet().isCreated()) {
jobsToBeProcessed.add(entry);
return true;
} else {
return false;
}
} catch (DocumentAlreadyExistsException daee){
logger.warn("Someone has already created a history entry for this alert run");
return false;
}
}
private void stopIfRunning() {
if (running.compareAndSet(true, false)) {
logger.info("Stopping job queue");
jobsToBeProcessed.add(END_ENTRY);
}
}
private ClusterHealthStatus createAlertHistoryIndex() {
CreateIndexResponse cir = client.admin().indices().prepareCreate(ALERT_HISTORY_INDEX).addMapping(ALERT_HISTORY_TYPE).execute().actionGet(); //TODO FIX MAPPINGS
if (!cir.isAcknowledged()) {
logger.error("Create [{}] was not acknowledged", ALERT_HISTORY_INDEX);
}
ClusterHealthResponse actionGet = client.admin().cluster()
.health(Requests.clusterHealthRequest(ALERT_HISTORY_INDEX).waitForGreenStatus().waitForEvents(Priority.LANGUID).waitForRelocatingShards(0)).actionGet();
return actionGet.getStatus();
}
private AlertActionEntry getHistoryEntryFromIndex(String entryId) {
GetRequest getRequest = Requests.getRequest(ALERT_HISTORY_INDEX);
getRequest.type(ALERT_HISTORY_TYPE);
getRequest.id(entryId);
GetResponse getResponse = client.get(getRequest).actionGet();
if (getResponse.isExists()) {
return parseHistory(entryId, getResponse.getSourceAsMap(), getResponse.getVersion());
} else {
throw new ElasticsearchException("Unable to find [" + entryId + "] in the [" + ALERT_HISTORY_INDEX + "]" );
}
}
private void updateHistoryEntry(AlertActionEntry entry, AlertActionState actionPerformed) {
entry.setEntryState(AlertActionState.ACTION_PERFORMED);
UpdateRequest updateRequest = new UpdateRequest();
updateRequest.index(ALERT_HISTORY_INDEX);
updateRequest.type(ALERT_HISTORY_TYPE);
updateRequest.id(entry.getId());
entry.setEntryState(actionPerformed);
XContentBuilder historyBuilder;
try {
historyBuilder = XContentFactory.jsonBuilder();
entry.toXContent(historyBuilder, ToXContent.EMPTY_PARAMS);
} catch (IOException ie) {
throw new ElasticsearchException("Unable to serialize alert history entry ["+ entry.getId() + "]", ie);
}
updateRequest.doc(historyBuilder);
try {
client.update(updateRequest).actionGet();
} catch (ElasticsearchException ee) {
logger.error("Failed to update in claim", ee);
}
}
private boolean claimAlertHistoryEntry(AlertActionEntry entry) {
AlertActionEntry indexedHistoryEntry;
try {
indexedHistoryEntry = getHistoryEntryFromIndex(entry.getId());
if (indexedHistoryEntry.getEntryState() != AlertActionState.ACTION_NEEDED) {
//Someone else is doing or has done this action
return false;
}
entry.setEntryState(AlertActionState.ACTION_UNDERWAY);
UpdateRequest updateRequest = new UpdateRequest();
updateRequest.index(ALERT_HISTORY_INDEX);
updateRequest.type(ALERT_HISTORY_TYPE);
updateRequest.id(entry.getId());
updateRequest.version(entry.getVersion());//Since we loaded this alert directly from the index the version should be correct
XContentBuilder historyBuilder;
try {
historyBuilder = XContentFactory.jsonBuilder();
entry.toXContent(historyBuilder, ToXContent.EMPTY_PARAMS);
} catch (IOException ie) {
throw new ElasticsearchException("Unable to serialize alert history entry ["+ entry.getId() + "]", ie);
}
updateRequest.doc(historyBuilder);
updateRequest.retryOnConflict(0);
try {
client.update(updateRequest).actionGet();
} catch (ElasticsearchException ee) {
logger.error("Failed to update in claim", ee);
return false;
}
} catch (Throwable t) {
logger.error("Failed to claim history entry " + entry, t);
return false;
}
return true;
}
}

View File

@ -0,0 +1,60 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.alerts.actions;
import org.elasticsearch.ElasticsearchIllegalArgumentException;
import org.elasticsearch.alerts.Alert;
import org.elasticsearch.alerts.AlertManager;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
public class AlertActionRegistry extends AbstractComponent {
private volatile ImmutableOpenMap<String, AlertActionFactory> actionImplemented;
@Inject
public AlertActionRegistry(Settings settings, Client client) {
super(settings);
this.actionImplemented = ImmutableOpenMap.<String, AlertActionFactory>builder()
.fPut("email", new EmailAlertActionFactory())
.fPut("index", new IndexAlertActionFactory(client))
.build();
}
public void registerAction(String name, AlertActionFactory actionFactory){
actionImplemented = ImmutableOpenMap.builder(actionImplemented)
.fPut(name, actionFactory)
.build();
}
public List<AlertAction> parseActionsFromMap(Map<String,Object> actionMap) {
ImmutableOpenMap<String, AlertActionFactory> actionImplemented = this.actionImplemented;
List<AlertAction> actions = new ArrayList<>();
for (Map.Entry<String, Object> actionEntry : actionMap.entrySet()) {
AlertActionFactory factory = actionImplemented.get(actionEntry.getKey());
if (factory != null) {
actions.add(factory.createAction(actionEntry.getValue()));
} else {
throw new ElasticsearchIllegalArgumentException("No action exists with the name [" + actionEntry.getKey() + "]");
}
}
return actions;
}
public void doAction(Alert alert, AlertActionEntry actionEntry){
for (AlertAction action : alert.actions()) {
action.doAction(alert, actionEntry);
}
}
}

View File

@ -0,0 +1,64 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.alerts.actions;
import org.elasticsearch.ElasticsearchIllegalArgumentException;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import java.io.IOException;
/**
*/
public enum AlertActionState implements ToXContent {
NO_ACTION_NEEDED,
ACTION_NEEDED,
ACTION_UNDERWAY,
ACTION_PERFORMED;
public static final String FIELD_NAME = "AlertHistoryState";
@Override
public String toString(){
switch (this) {
case NO_ACTION_NEEDED:
return "NO_ACTION_NEEDED";
case ACTION_NEEDED:
return "ACTION_NEEDED";
case ACTION_UNDERWAY:
return "ACTION_UNDERWAY";
case ACTION_PERFORMED:
return "ACTION_PERFORMED";
default:
return "NO_ACTION_NEEDED";
}
}
public static AlertActionState fromString(String s) {
switch(s.toUpperCase()) {
case "NO_ACTION_NEEDED":
return NO_ACTION_NEEDED;
case "ACTION_NEEDED":
return ACTION_NEEDED;
case "ACTION_UNDERWAY":
return ACTION_UNDERWAY;
case "ACTION_PERFORMED":
return ACTION_PERFORMED;
default:
throw new ElasticsearchIllegalArgumentException("Unknown value [" + s + "] for AlertHistoryState" );
}
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(FIELD_NAME);
builder.value(this.toString());
builder.endObject();
return builder;
}
}

View File

@ -6,9 +6,8 @@
package org.elasticsearch.alerts.actions;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.alerts.AlertResult;
import org.elasticsearch.alerts.Alert;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.search.SearchHit;
import java.io.IOException;
import java.util.ArrayList;
@ -28,7 +27,6 @@ public class EmailAlertAction implements AlertAction {
String server = "smtp.gmail.com";
int port = 587;
public EmailAlertAction(String ... addresses){
for (String address : addresses) {
addEmailAddress(address);
@ -69,7 +67,7 @@ public class EmailAlertAction implements AlertAction {
}
@Override
public boolean doAction(String alertName, AlertResult result) {
public boolean doAction(Alert alert, AlertActionEntry result) {
Properties props = new Properties();
props.put("mail.smtp.auth", "true");
props.put("mail.smtp.starttls.enable", "true");
@ -86,19 +84,21 @@ public class EmailAlertAction implements AlertAction {
message.setFrom(new InternetAddress(from));
message.setRecipients(Message.RecipientType.TO,
emailAddresses.toArray(new Address[1]));
message.setSubject("Elasticsearch Alert " + alertName + " triggered");
message.setSubject("Elasticsearch Alert " + alert.alertName() + " triggered");
StringBuffer output = new StringBuffer();
output.append("The following query triggered because " + result.trigger.toString() + "\n");
output.append("The total number of hits returned : " + result.searchResponse.getHits().getTotalHits() + "\n");
output.append("For query : " + result.query.toString());
output.append("The following query triggered because " + result.getTrigger().toString() + "\n");
output.append("The total number of hits returned : " + result.getNumberOfResults() + "\n");
output.append("For query : " + result.getTriggeringQuery());
output.append("\n");
output.append("Indices : ");
for (String index : result.indices) {
for (String index : result.getIndices()) {
output.append(index);
output.append("/");
}
output.append("\n");
output.append("\n");
/*
///@TODO: FIX THE SEARCH RESULT DISPLAY STUFF
if (displayField != null) {
for (SearchHit sh : result.searchResponse.getHits().getHits()) {
if (sh.sourceAsMap().containsKey(displayField)) {
@ -111,6 +111,7 @@ public class EmailAlertAction implements AlertAction {
} else {
output.append(result.searchResponse.toString());
}
*/
message.setText(output.toString());
Transport.send(message);
} catch (Exception e){

View File

@ -7,7 +7,7 @@ package org.elasticsearch.alerts.actions;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.alerts.AlertResult;
import org.elasticsearch.alerts.Alert;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
@ -45,15 +45,15 @@ public class IndexAlertAction implements AlertAction, ToXContent {
}
@Override
public boolean doAction(String alertName, AlertResult alertResult) {
public boolean doAction(Alert alert, AlertActionEntry alertResult) {
IndexRequest indexRequest = new IndexRequest();
indexRequest.index(index);
indexRequest.type(type);
try {
XContentBuilder resultBuilder = XContentFactory.jsonBuilder().prettyPrint();
resultBuilder.startObject();
resultBuilder = alertResult.searchResponse.toXContent(resultBuilder, ToXContent.EMPTY_PARAMS);
resultBuilder.field("timestamp", alertResult.fireTime);
//resultBuilder = alertResult.searchResponse.toXContent(resultBuilder, ToXContent.EMPTY_PARAMS);
resultBuilder.field("timestamp", alertResult.getFireTime());
resultBuilder.endObject();
indexRequest.source(resultBuilder);
} catch (IOException ie) {

View File

@ -11,14 +11,19 @@ import org.elasticsearch.alerts.AlertingModule;
import org.elasticsearch.common.collect.Lists;
import org.elasticsearch.common.component.LifecycleComponent;
import org.elasticsearch.common.inject.Module;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.plugins.AbstractPlugin;
import java.util.Collection;
import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
public class AlertsPlugin extends AbstractPlugin {
public static final String NAME = "alerts";
@Override public String name() {
return "alerts";
return NAME;
}
@Override public String description() {
@ -39,4 +44,12 @@ public class AlertsPlugin extends AbstractPlugin {
modules.add(AlertingModule.class);
return modules;
}
@Override
public Settings additionalSettings() {
return settingsBuilder()
.put("threadpool."+ NAME + ".type","cached")
.build();
}
}

View File

@ -9,9 +9,8 @@ import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.alerts.Alert;
import org.elasticsearch.alerts.AlertManager;
import org.elasticsearch.alerts.AlertResult;
import org.elasticsearch.alerts.actions.AlertActionManager;
import org.elasticsearch.alerts.AlertManager;
import org.elasticsearch.alerts.triggers.TriggerManager;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterChangedEvent;
@ -42,21 +41,22 @@ public class AlertScheduler extends AbstractLifecycleComponent implements Cluste
private final Client client;
private final Scheduler scheduler;
private final AlertManager alertManager;
private final ScriptService scriptService;
private final TriggerManager triggerManager;
private final AlertActionManager actionManager;
private final ScriptService scriptService;
private AlertActionManager actionManager;
private final AtomicBoolean run = new AtomicBoolean(false);
@Inject
public AlertScheduler(Settings settings, AlertManager alertManager, Client client,
TriggerManager triggerManager, AlertActionManager actionManager,
ScriptService scriptService, ClusterService clusterService) {
TriggerManager triggerManager, ScriptService scriptService,
ClusterService clusterService) {
super(settings);
this.alertManager = alertManager;
this.client = client;
this.triggerManager = triggerManager;
this.actionManager = actionManager;
this.scriptService = scriptService;
try {
SchedulerFactory schFactory = new StdSchedulerFactory();
@ -89,8 +89,10 @@ public class AlertScheduler extends AbstractLifecycleComponent implements Cluste
if (run.compareAndSet(true, false)) {
try {
logger.info("Stopping scheduler");
if (!scheduler.isShutdown()) {
scheduler.clear();
scheduler.shutdown(false);
}
} catch (SchedulerException se){
logger.error("Failed to stop quartz scheduler", se);
}
@ -156,36 +158,17 @@ public class AlertScheduler extends AbstractLifecycleComponent implements Cluste
SearchResponse sr = srb.execute().get();
logger.warn("Got search response hits : [{}]", sr.getHits().getTotalHits() );
AlertResult result = new AlertResult(alertName, sr, alert.trigger(),
triggerManager.isTriggered(alertName,sr), srb, indices,
new DateTime(jobExecutionContext.getScheduledFireTime()));
boolean firedAction = false;
if (result.isTriggered) {
logger.warn("We have triggered");
DateTime lastActionFire = alertManager.timeActionLastTriggered(alertName);
long msSinceLastAction = scheduledTime.getMillis() - lastActionFire.getMillis();
logger.error("last action fire [{}]", lastActionFire);
logger.error("msSinceLastAction [{}]", msSinceLastAction);
boolean isTriggered = triggerManager.isTriggered(alertName,sr);
if (alert.timePeriod().getMillis() > msSinceLastAction) {
logger.warn("Not firing action because it was fired in the timePeriod");
} else {
actionManager.doAction(alertName, result);
logger.warn("Did action !");
firedAction = true;
}
} else {
logger.warn("We didn't trigger");
}
alertManager.updateLastRan(alertName, new DateTime(jobExecutionContext.getFireTime()),scheduledTime,firedAction);
if (!alertManager.addHistory(alertName, result.isTriggered,
new DateTime(jobExecutionContext.getScheduledFireTime()), result.query,
result.trigger, result.searchResponse.getHits().getTotalHits(), alert.actions(), alert.indices()))
alertManager.updateLastRan(alertName, new DateTime(jobExecutionContext.getFireTime()),scheduledTime);
if (!alertManager.addHistory(alertName, isTriggered,
new DateTime(jobExecutionContext.getScheduledFireTime()), scheduledTime, srb,
alert.trigger(), sr.getHits().getTotalHits(), alert.actions(), alert.indices()))
{
logger.warn("Failed to store history for alert [{}]", alertName);
}
} catch (Exception e) {
logger.error("Failed execute alert [{}]", e, alertName);
}

View File

@ -168,4 +168,29 @@ public class AlertTrigger implements ToXContent {
}
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
AlertTrigger that = (AlertTrigger) o;
if (value != that.value) return false;
if (scriptedTrigger != null ? !scriptedTrigger.equals(that.scriptedTrigger) : that.scriptedTrigger != null)
return false;
if (trigger != that.trigger) return false;
if (triggerType != that.triggerType) return false;
return true;
}
@Override
public int hashCode() {
int result = trigger != null ? trigger.hashCode() : 0;
result = 31 * result + (triggerType != null ? triggerType.hashCode() : 0);
result = 31 * result + value;
result = 31 * result + (scriptedTrigger != null ? scriptedTrigger.hashCode() : 0);
return result;
}
}

View File

@ -33,8 +33,6 @@ public class TriggerManager extends AbstractComponent {
private final ScriptService scriptService;
public static AlertTrigger parseTriggerFromMap(Map<String, Object> triggerMap) {
//For now just trigger on number of events greater than 1
for (Map.Entry<String,Object> entry : triggerMap.entrySet()){
AlertTrigger.TriggerType type = AlertTrigger.TriggerType.fromString(entry.getKey());
if (type == AlertTrigger.TriggerType.SCRIPT) {
@ -48,6 +46,7 @@ public class TriggerManager extends AbstractComponent {
}
throw new ElasticsearchIllegalArgumentException();
}
private static ScriptedAlertTrigger parseScriptedTrigger(Object value) {
if (value instanceof Map) {
Map<String,Object> valueMap = (Map<String,Object>)value;

View File

@ -8,6 +8,8 @@ package org.elasticsearch.alerts;
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse;
import org.elasticsearch.alerts.actions.AlertAction;
import org.elasticsearch.alerts.actions.AlertActionFactory;
import org.elasticsearch.alerts.actions.AlertActionRegistry;
import org.elasticsearch.alerts.actions.AlertActionEntry;
import org.elasticsearch.alerts.actions.AlertActionManager;
import org.elasticsearch.alerts.plugin.AlertsPlugin;
import org.elasticsearch.alerts.scheduler.AlertScheduler;
@ -31,7 +33,7 @@ import static org.hamcrest.core.Is.is;
/**
*/
@ElasticsearchIntegrationTest.ClusterScope(scope = ElasticsearchIntegrationTest.Scope.SUITE, numClientNodes = 0, transportClientRatio = 0, maxNumDataNodes = 3)
@ElasticsearchIntegrationTest.ClusterScope(scope = ElasticsearchIntegrationTest.Scope.SUITE, numClientNodes = 0, transportClientRatio = 0, maxNumDataNodes = 1, minNumDataNodes = 1, numDataNodes = 1)
public class BasicAlertingTest extends ElasticsearchIntegrationTest {
@Override
@ -48,11 +50,15 @@ public class BasicAlertingTest extends ElasticsearchIntegrationTest {
@Test
// TODO: add request, response & request builder etc.
public void testAlerSchedulerStartsProperly() throws Exception {
createIndex("my-index");
createIndex(ScriptService.SCRIPT_INDEX);
client().admin().cluster().prepareHealth().setWaitForGreenStatus().execute().actionGet();
client().prepareIndex(ScriptService.SCRIPT_INDEX, "mustache", "query")
createIndex("my-index");
createIndex(AlertManager.ALERT_INDEX);
createIndex(AlertActionManager.ALERT_HISTORY_INDEX);
ensureGreen("my-index", AlertManager.ALERT_INDEX, AlertActionManager.ALERT_HISTORY_INDEX);
client().preparePutIndexedScript()
.setScriptLang("mustache")
.setId("query")
.setSource(jsonBuilder().startObject().startObject("template").startObject("match_all").endObject().endObject().endObject())
.get();
@ -92,14 +98,14 @@ public class BasicAlertingTest extends ElasticsearchIntegrationTest {
}
@Override
public boolean doAction(String alertName, AlertResult alert) {
logger.info("Alert {} invoked: {}", alertName, alert);
public boolean doAction(Alert alert, AlertActionEntry actionEntry) {
logger.info("Alert {} invoked: {}", alert.alertName(), actionEntry);
alertActionInvoked.set(true);
return true;
}
};
AlertActionManager alertActionManager = internalCluster().getInstance(AlertActionManager.class, internalCluster().getMasterName());
alertActionManager.registerAction("test", new AlertActionFactory() {
AlertActionRegistry alertActionRegistry = internalCluster().getInstance(AlertActionRegistry.class, internalCluster().getMasterName());
alertActionRegistry.registerAction("test", new AlertActionFactory() {
@Override
public AlertAction createAction(Object parameters) {
return alertAction;
@ -125,7 +131,7 @@ public class BasicAlertingTest extends ElasticsearchIntegrationTest {
@Override
public void run() {
assertThat(alertActionInvoked.get(), is(true));
IndicesExistsResponse indicesExistsResponse = client().admin().indices().prepareExists(AlertManager.ALERT_HISTORY_INDEX).get();
IndicesExistsResponse indicesExistsResponse = client().admin().indices().prepareExists(AlertActionManager.ALERT_HISTORY_INDEX).get();
assertThat(indicesExistsResponse.isExists(), is(true));
}
}, 30, TimeUnit.SECONDS);

View File

@ -0,0 +1,61 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.alerts.actions;
import org.elasticsearch.alerts.BasicAlertingTest;
import org.elasticsearch.alerts.triggers.AlertTrigger;
import org.elasticsearch.common.joda.time.DateTime;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.junit.Test;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
*/
@ElasticsearchIntegrationTest.ClusterScope(scope = ElasticsearchIntegrationTest.Scope.SUITE, numClientNodes = 0, transportClientRatio = 0, numDataNodes = 1)
public class AlertActionsTest extends ElasticsearchIntegrationTest {
@Test
public void testAlertActionParser(){
DateTime fireTime = new DateTime();
DateTime scheduledFireTime = new DateTime();
Map<String, Object> triggerMap = new HashMap<>();
triggerMap.put("numberOfEvents", ">1");
Map<String,Object> actionMap = new HashMap<>();
Map<String,Object> emailParamMap = new HashMap<>();
List<String> addresses = new ArrayList<>();
addresses.add("foo@bar.com");
emailParamMap.put("addresses", addresses);
actionMap.put("email", emailParamMap);
Map<String, Object> fieldMap = new HashMap<>();
fieldMap.put(AlertActionManager.ALERT_NAME_FIELD, "testName");
fieldMap.put(AlertActionManager.TRIGGERED_FIELD, true);
fieldMap.put(AlertActionManager.FIRE_TIME_FIELD, fireTime.toDateTimeISO().toString());
fieldMap.put(AlertActionManager.SCHEDULED_FIRE_TIME_FIELD, scheduledFireTime.toDateTimeISO().toString());
fieldMap.put(AlertActionManager.TRIGGER_FIELD, triggerMap);
fieldMap.put(AlertActionManager.QUERY_RAN_FIELD, "foobar");
fieldMap.put(AlertActionManager.NUMBER_OF_RESULTS_FIELD,10);
fieldMap.put(AlertActionManager.ACTIONS_FIELD, actionMap);
fieldMap.put(AlertActionState.FIELD_NAME, AlertActionState.ACTION_NEEDED.toString());
AlertActionRegistry alertActionRegistry = internalCluster().getInstance(AlertActionRegistry.class, internalCluster().getMasterName());
AlertActionEntry actionEntry = AlertActionManager.parseHistory("foobar", fieldMap, 0, alertActionRegistry, logger);
assertEquals(actionEntry.getVersion(), 0);
assertEquals(actionEntry.getAlertName(), "testName");
assertEquals(actionEntry.isTriggered(), true);
assertEquals(actionEntry.getScheduledTime(), scheduledFireTime);
assertEquals(actionEntry.getFireTime(), fireTime);
assertEquals(actionEntry.getEntryState(), AlertActionState.ACTION_NEEDED);
assertEquals(actionEntry.getNumberOfResults(), 10);
assertEquals(actionEntry.getTrigger(),
new AlertTrigger(AlertTrigger.SimpleTrigger.GREATER_THAN, AlertTrigger.TriggerType.NUMBER_OF_EVENTS, 1));
}
}