Move alert execution out of scheduler thread.

This commit moves the execution of the alert trigger to the AlertActions queued thread.
Also change the states of AlertActionEntry to include SEARCH_NEEDED and ERROR and remove now uneeded states.
AlertActions now take a TriggerResult instead of an alert action entry.

Original commit: elastic/x-pack-elasticsearch@2b650ca4c1
This commit is contained in:
Brian Murphy 2014-11-06 13:34:48 +00:00
parent f86544497a
commit 89dd5e2599
14 changed files with 165 additions and 112 deletions

View File

@ -11,6 +11,7 @@ import org.elasticsearch.ElasticsearchIllegalStateException;
import org.elasticsearch.action.delete.DeleteResponse; import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.alerts.actions.AlertActionManager; import org.elasticsearch.alerts.actions.AlertActionManager;
import org.elasticsearch.alerts.actions.AlertActionRegistry;
import org.elasticsearch.alerts.scheduler.AlertScheduler; import org.elasticsearch.alerts.scheduler.AlertScheduler;
import org.elasticsearch.alerts.triggers.TriggerManager; import org.elasticsearch.alerts.triggers.TriggerManager;
import org.elasticsearch.alerts.triggers.TriggerResult; import org.elasticsearch.alerts.triggers.TriggerResult;
@ -42,17 +43,20 @@ public class AlertManager extends AbstractComponent {
private final TriggerManager triggerManager; private final TriggerManager triggerManager;
private final ClusterService clusterService; private final ClusterService clusterService;
private final AlertActionManager actionManager; private final AlertActionManager actionManager;
private final AlertActionRegistry actionRegistry;
private final AtomicBoolean started = new AtomicBoolean(false); private final AtomicBoolean started = new AtomicBoolean(false);
@Inject @Inject
public AlertManager(Settings settings, ClusterService clusterService, AlertsStore alertsStore, public AlertManager(Settings settings, ClusterService clusterService, AlertsStore alertsStore,
IndicesService indicesService, TriggerManager triggerManager, AlertActionManager actionManager) { IndicesService indicesService, TriggerManager triggerManager, AlertActionManager actionManager, AlertActionRegistry actionRegistry) {
super(settings); super(settings);
this.alertsStore = alertsStore; this.alertsStore = alertsStore;
this.clusterService = clusterService; this.clusterService = clusterService;
this.triggerManager = triggerManager; this.triggerManager = triggerManager;
this.actionManager = actionManager; this.actionManager = actionManager;
this.actionManager.setAlertManager(this);
this.actionRegistry = actionRegistry;
clusterService.add(new AlertsClusterStateListener()); clusterService.add(new AlertsClusterStateListener());
// Close if the indices service is being stopped, so we don't run into search failures (locally) that will // Close if the indices service is being stopped, so we don't run into search failures (locally) that will
// happen because we're shutting down and an alert is scheduled. // happen because we're shutting down and an alert is scheduled.
@ -106,7 +110,7 @@ public class AlertManager extends AbstractComponent {
return started.get(); return started.get();
} }
public void executeAlert(String alertName, DateTime scheduledFireTime, DateTime fireTime){ public void scheduleAlert(String alertName, DateTime scheduledFireTime, DateTime fireTime){
ensureStarted(); ensureStarted();
Alert alert = alertsStore.getAlert(alertName); Alert alert = alertsStore.getAlert(alertName);
if (alert == null) { if (alert == null) {
@ -117,14 +121,34 @@ public class AlertManager extends AbstractComponent {
logger.debug("Alert [{}] is not enabled", alert.alertName()); logger.debug("Alert [{}] is not enabled", alert.alertName());
return; return;
} }
try { try {
TriggerResult result = triggerManager.isTriggered(alert, scheduledFireTime, fireTime); actionManager.addAlertAction(alert, scheduledFireTime, fireTime);
actionManager.addAlertAction(alert, result, scheduledFireTime, fireTime); } catch (IOException ioe) {
} catch (Exception e) { logger.error("Failed to add alert action for [{}]", ioe, alert);
logger.error("Failed execute alert [{}]", e, alertName);
} }
} }
public TriggerResult executeAlert(String alertName, DateTime scheduledFireTime, DateTime fireTime) {
ensureStarted();
Alert alert = alertsStore.getAlert(alertName);
if (alert == null) {
throw new ElasticsearchException("Alert is not available");
}
try {
TriggerResult triggerResult = triggerManager.isTriggered(alert, scheduledFireTime, fireTime);
if (triggerResult.isTriggered()) {
actionRegistry.doAction(alert, triggerResult);
alert.lastActionFire(fireTime);
alertsStore.updateAlert(alert);
}
return triggerResult;
} catch (Exception e) {
throw new ElasticsearchException("Failed to execute alert [" + alert + "]", e);
}
}
public void stop() { public void stop() {
if (started.compareAndSet(true, false)) { if (started.compareAndSet(true, false)) {
scheduler.stop(); scheduler.stop();
@ -145,17 +169,6 @@ public class AlertManager extends AbstractComponent {
} }
} }
public Alert getAlert(String alertName) {
return alertsStore.getAlert(alertName);
}
public IndexResponse updateAlert(Alert alert, boolean updateMap) throws IOException {
if (!alertsStore.hasAlert(alert.alertName())) {
return null;
}
return alertsStore.updateAlert(alert, updateMap);
}
private final class AlertsClusterStateListener implements ClusterStateListener { private final class AlertsClusterStateListener implements ClusterStateListener {
@Override @Override

View File

@ -13,6 +13,8 @@ import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType; import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.alerts.actions.AlertAction; import org.elasticsearch.alerts.actions.AlertAction;
import org.elasticsearch.alerts.actions.AlertActionRegistry; import org.elasticsearch.alerts.actions.AlertActionRegistry;
import org.elasticsearch.alerts.triggers.TriggerManager; import org.elasticsearch.alerts.triggers.TriggerManager;
@ -99,26 +101,21 @@ public class AlertsStore extends AbstractComponent {
return new Tuple<>(alert, response); return new Tuple<>(alert, response);
} }
public IndexResponse updateAlert(Alert alert) throws IOException {
return updateAlert(alert, false);
}
/** /**
* Updates the specified alert by making sure that the made changes are persisted. * Updates the specified alert by making sure that the made changes are persisted.
*/ */
public IndexResponse updateAlert(Alert alert, boolean updateMap) throws IOException { public IndexResponse updateAlert(Alert alert) throws IOException {
IndexResponse response = client.prepareIndex(ALERT_INDEX, ALERT_TYPE, alert.alertName()) IndexResponse response = client.prepareIndex(ALERT_INDEX, ALERT_TYPE, alert.alertName())
.setSource(XContentFactory.jsonBuilder().value(alert)) .setSource()
.setVersion(alert.version()) .setVersion(alert.version())
.setOpType(IndexRequest.OpType.INDEX)
.get(); .get();
alert.version(response.getVersion()); alert.version(response.getVersion());
if (updateMap) { // Don'<></> need to update the alertMap, since we are working on an instance from it.
alertMap.put(alert.alertName(), alert); assert alertMap.get(alert.alertName()) == alert;
} else {
// Don'<></> need to update the alertMap, since we are working on an instance from it.
assert alertMap.get(alert.alertName()) == alert;
}
return response; return response;
} }

View File

@ -6,6 +6,7 @@
package org.elasticsearch.alerts.actions; package org.elasticsearch.alerts.actions;
import org.elasticsearch.alerts.Alert; import org.elasticsearch.alerts.Alert;
import org.elasticsearch.alerts.triggers.TriggerResult;
import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.ToXContent;
@ -22,7 +23,7 @@ public interface AlertAction extends ToXContent {
public void writeTo(StreamOutput out) throws IOException; public void writeTo(StreamOutput out) throws IOException;
public void readFrom(StreamInput in) throws IOException; public void readFrom(StreamInput in) throws IOException;
public boolean doAction(Alert alert, AlertActionEntry actionEntry); public boolean doAction(Alert alert, TriggerResult result);
} }

View File

@ -5,6 +5,7 @@
*/ */
package org.elasticsearch.alerts.actions; package org.elasticsearch.alerts.actions;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.alerts.Alert; import org.elasticsearch.alerts.Alert;
@ -28,28 +29,29 @@ public class AlertActionEntry implements ToXContent{
private String id; private String id;
private long version; private long version;
private String alertName; private String alertName;
private boolean triggered;
private DateTime fireTime; private DateTime fireTime;
private DateTime scheduledTime; private DateTime scheduledTime;
private AlertTrigger trigger; private AlertTrigger trigger;
private SearchRequest searchRequest;
private SearchResponse searchResponse;
private List<AlertAction> actions; private List<AlertAction> actions;
private AlertActionState entryState; private AlertActionState entryState;
/*Optional*/
private SearchRequest searchRequest;
private SearchResponse searchResponse;
private boolean triggered;
private String errorMsg;
AlertActionEntry() { AlertActionEntry() {
} }
public AlertActionEntry(Alert alert, TriggerResult result, DateTime scheduledTime, DateTime fireTime, AlertActionState state) throws IOException { public AlertActionEntry(Alert alert, DateTime scheduledTime, DateTime fireTime, AlertActionState state) throws IOException {
this.id = alert.alertName() + "#" + scheduledTime.toDateTimeISO(); this.id = alert.alertName() + "#" + scheduledTime.toDateTimeISO();
this.version = 1; this.version = 1;
this.alertName = alert.alertName(); this.alertName = alert.alertName();
this.triggered = result.isTriggered();
this.fireTime = fireTime; this.fireTime = fireTime;
this.scheduledTime = scheduledTime; this.scheduledTime = scheduledTime;
this.trigger = alert.trigger(); this.trigger = alert.trigger();
this.searchRequest = result.getRequest();
this.searchResponse = result.getResponse();
this.actions = alert.actions(); this.actions = alert.actions();
this.entryState = state; this.entryState = state;
} }
@ -172,6 +174,17 @@ public class AlertActionEntry implements ToXContent{
this.version = version; this.version = version;
} }
/**
* @return The error if an error occured otherwise null
*/
public String getErrorMsg(){
return this.errorMsg;
}
public void setErrorMsg(String errorMsg) {
this.errorMsg = errorMsg;
}
@Override @Override
public XContentBuilder toXContent(XContentBuilder historyEntry, Params params) throws IOException { public XContentBuilder toXContent(XContentBuilder historyEntry, Params params) throws IOException {
historyEntry.startObject(); historyEntry.startObject();
@ -181,16 +194,22 @@ public class AlertActionEntry implements ToXContent{
historyEntry.field(AlertActionManager.SCHEDULED_FIRE_TIME_FIELD, scheduledTime.toDateTimeISO()); historyEntry.field(AlertActionManager.SCHEDULED_FIRE_TIME_FIELD, scheduledTime.toDateTimeISO());
historyEntry.field("trigger", trigger, params); historyEntry.field("trigger", trigger, params);
ByteArrayOutputStream out = new ByteArrayOutputStream(); ByteArrayOutputStream out;
searchRequest.writeTo(new DataOutputStreamOutput(new DataOutputStream(out))); if (searchRequest != null) {
historyEntry.field("request_binary", out.toByteArray()); out = new ByteArrayOutputStream();
out = new ByteArrayOutputStream(); searchRequest.writeTo(new DataOutputStreamOutput(new DataOutputStream(out)));
searchResponse.writeTo(new DataOutputStreamOutput(new DataOutputStream(out))); historyEntry.field("request_binary", out.toByteArray());
historyEntry.field("response_binary", out.toByteArray()); }
// Serializing it as xcontent allows the search response to be encapsulated in a doc as a json object if (searchResponse != null) {
historyEntry.startObject("response"); out = new ByteArrayOutputStream();
searchResponse.toXContent(historyEntry, params); searchResponse.writeTo(new DataOutputStreamOutput(new DataOutputStream(out)));
historyEntry.endObject(); historyEntry.field("response_binary", out.toByteArray());
// Serializing it as xcontent allows the search response to be encapsulated in a doc as a json object
historyEntry.startObject("response");
searchResponse.toXContent(historyEntry, params);
historyEntry.endObject();
}
historyEntry.startObject("actions"); historyEntry.startObject("actions");
for (AlertAction action : actions) { for (AlertAction action : actions) {
historyEntry.field(action.getActionName()); historyEntry.field(action.getActionName());
@ -198,7 +217,13 @@ public class AlertActionEntry implements ToXContent{
} }
historyEntry.endObject(); historyEntry.endObject();
historyEntry.field(AlertActionState.FIELD_NAME, entryState.toString()); historyEntry.field(AlertActionState.FIELD_NAME, entryState.toString());
if (errorMsg != null) {
historyEntry.field("errorMsg", errorMsg);
}
historyEntry.endObject(); historyEntry.endObject();
return historyEntry; return historyEntry;
} }

View File

@ -14,6 +14,7 @@ import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType; import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.alerts.Alert; import org.elasticsearch.alerts.Alert;
import org.elasticsearch.alerts.AlertManager;
import org.elasticsearch.alerts.AlertsStore; import org.elasticsearch.alerts.AlertsStore;
import org.elasticsearch.alerts.LoadingListener; import org.elasticsearch.alerts.LoadingListener;
import org.elasticsearch.alerts.plugin.AlertsPlugin; import org.elasticsearch.alerts.plugin.AlertsPlugin;
@ -59,6 +60,7 @@ public class AlertActionManager extends AbstractComponent {
private final ThreadPool threadPool; private final ThreadPool threadPool;
private final AlertsStore alertsStore; private final AlertsStore alertsStore;
private final AlertActionRegistry actionRegistry; private final AlertActionRegistry actionRegistry;
private AlertManager alertManager;
private final BlockingQueue<AlertActionEntry> actionsToBeProcessed = new LinkedBlockingQueue<>(); private final BlockingQueue<AlertActionEntry> actionsToBeProcessed = new LinkedBlockingQueue<>();
private final AtomicReference<State> state = new AtomicReference<>(State.STOPPED); private final AtomicReference<State> state = new AtomicReference<>(State.STOPPED);
@ -68,6 +70,8 @@ public class AlertActionManager extends AbstractComponent {
private static AlertActionEntry END_ENTRY = new AlertActionEntry(); private static AlertActionEntry END_ENTRY = new AlertActionEntry();
@Inject @Inject
public AlertActionManager(Settings settings, Client client, AlertActionRegistry actionRegistry, ThreadPool threadPool, AlertsStore alertsStore) { public AlertActionManager(Settings settings, Client client, AlertActionRegistry actionRegistry, ThreadPool threadPool, AlertsStore alertsStore) {
super(settings); super(settings);
@ -80,6 +84,10 @@ public class AlertActionManager extends AbstractComponent {
this.scrollTimeout = settings.getAsTime("alerts.scroll.timeout", TimeValue.timeValueSeconds(30)); this.scrollTimeout = settings.getAsTime("alerts.scroll.timeout", TimeValue.timeValueSeconds(30));
} }
public void setAlertManager(AlertManager alertManager){
this.alertManager = alertManager;
}
public void start(ClusterState state, final LoadingListener listener) { public void start(ClusterState state, final LoadingListener listener) {
IndexMetaData indexMetaData = state.getMetaData().index(ALERT_HISTORY_INDEX); IndexMetaData indexMetaData = state.getMetaData().index(ALERT_HISTORY_INDEX);
if (indexMetaData != null) { if (indexMetaData != null) {
@ -136,7 +144,7 @@ public class AlertActionManager extends AbstractComponent {
public void loadQueue() { public void loadQueue() {
SearchResponse response = client.prepareSearch() SearchResponse response = client.prepareSearch()
.setQuery(QueryBuilders.termQuery(AlertActionState.FIELD_NAME, AlertActionState.ACTION_NEEDED)) .setQuery(QueryBuilders.termQuery(AlertActionState.FIELD_NAME, AlertActionState.SEARCH_NEEDED))
.setSearchType(SearchType.SCAN) .setSearchType(SearchType.SCAN)
.setScroll(scrollTimeout) .setScroll(scrollTimeout)
.setSize(scrollSize) .setSize(scrollSize)
@ -147,7 +155,7 @@ public class AlertActionManager extends AbstractComponent {
for (SearchHit sh : response.getHits()) { for (SearchHit sh : response.getHits()) {
String historyId = sh.getId(); String historyId = sh.getId();
AlertActionEntry historyEntry = parseHistory(historyId, sh.getSourceRef(), sh.version(), actionRegistry); AlertActionEntry historyEntry = parseHistory(historyId, sh.getSourceRef(), sh.version(), actionRegistry);
assert historyEntry.getEntryState() == AlertActionState.ACTION_NEEDED; assert historyEntry.getEntryState() == AlertActionState.SEARCH_NEEDED;
actionsToBeProcessed.add(historyEntry); actionsToBeProcessed.add(historyEntry);
} }
response = client.prepareSearchScroll(response.getScrollId()).setScroll(scrollTimeout).get(); response = client.prepareSearchScroll(response.getScrollId()).setScroll(scrollTimeout).get();
@ -224,29 +232,20 @@ public class AlertActionManager extends AbstractComponent {
return entry; return entry;
} }
public void addAlertAction(Alert alert, TriggerResult result, DateTime scheduledFireTime, DateTime fireTime) throws IOException { public void addAlertAction(Alert alert, DateTime scheduledFireTime, DateTime fireTime) throws IOException {
AlertActionState state = AlertActionState.NO_ACTION_NEEDED;
if (result.isTriggered() && !alert.actions().isEmpty()) {
state = AlertActionState.ACTION_NEEDED;
}
AlertActionEntry entry = new AlertActionEntry(alert, result, scheduledFireTime, fireTime, state);
AlertActionEntry entry = new AlertActionEntry(alert, scheduledFireTime, fireTime, AlertActionState.SEARCH_NEEDED);
IndexResponse response = client.prepareIndex(ALERT_HISTORY_INDEX, ALERT_HISTORY_TYPE, entry.getId()) IndexResponse response = client.prepareIndex(ALERT_HISTORY_INDEX, ALERT_HISTORY_TYPE, entry.getId())
.setSource(XContentFactory.jsonBuilder().value(entry)) .setSource(XContentFactory.jsonBuilder().value(entry))
.setOpType(IndexRequest.OpType.CREATE) .setOpType(IndexRequest.OpType.CREATE)
.get(); .get();
entry.setVersion(response.getVersion()); entry.setVersion(response.getVersion());
if (state != AlertActionState.NO_ACTION_NEEDED) { actionsToBeProcessed.add(entry);
actionsToBeProcessed.add(entry);
}
} }
private void updateHistoryEntry(AlertActionEntry entry, AlertActionState actionPerformed) throws IOException { private void updateHistoryEntry(AlertActionEntry entry, AlertActionState actionPerformed) throws IOException {
entry.setEntryState(actionPerformed); entry.setEntryState(actionPerformed);
UpdateRequest updateRequest = new UpdateRequest();
updateRequest.index(ALERT_HISTORY_INDEX);
updateRequest.type(ALERT_HISTORY_TYPE);
updateRequest.id(entry.getId());
IndexResponse response = client.prepareIndex(ALERT_HISTORY_INDEX, ALERT_HISTORY_TYPE, entry.getId()) IndexResponse response = client.prepareIndex(ALERT_HISTORY_INDEX, ALERT_HISTORY_TYPE, entry.getId())
.setSource(XContentFactory.jsonBuilder().value(entry)) .setSource(XContentFactory.jsonBuilder().value(entry))
.get(); .get();
@ -265,15 +264,25 @@ public class AlertActionManager extends AbstractComponent {
public void run() { public void run() {
try { try {
Alert alert = alertsStore.getAlert(entry.getAlertName()); Alert alert = alertsStore.getAlert(entry.getAlertName());
updateHistoryEntry(entry, AlertActionState.ACTION_NEEDED); if (alert == null) {
entry.setErrorMsg("Alert was not found in the alerts store");
actionRegistry.doAction(alert, entry); updateHistoryEntry(entry, AlertActionState.ERROR);
return;
alert.lastActionFire(entry.getFireTime()); }
alertsStore.updateAlert(alert); updateHistoryEntry(entry, AlertActionState.SEARCH_UNDERWAY);
updateHistoryEntry(entry, AlertActionState.ACTION_PERFORMED); TriggerResult trigger = alertManager.executeAlert(alert.alertName(), entry.getScheduledTime(), entry.getFireTime());
} catch (Throwable t) { entry.setTriggered(trigger.isTriggered());
logger.error("Failed to execute alert action", t); entry.setSearchRequest(trigger.getRequest());
entry.setSearchResponse(trigger.getResponse());
updateHistoryEntry(entry, trigger.isTriggered() ? AlertActionState.ACTION_PERFORMED : AlertActionState.NO_ACTION_NEEDED);
} catch (Exception e) {
logger.error("Failed to execute alert action", e);
try {
entry.setErrorMsg(e.getMessage());
updateHistoryEntry(entry, AlertActionState.ERROR);
} catch (IOException ioe) {
logger.error("Failed to update action history entry", ioe);
}
} }
} }
} }
@ -285,17 +294,7 @@ public class AlertActionManager extends AbstractComponent {
try { try {
logger.debug("Starting thread to read from the job queue"); logger.debug("Starting thread to read from the job queue");
while (started()) { while (started()) {
AlertActionEntry entry = null; AlertActionEntry entry = actionsToBeProcessed.take();
do {
try {
entry = actionsToBeProcessed.take();
} catch (InterruptedException ie) {
if (!started()) {
break;
}
}
} while (entry == null);
if (!started() || entry == END_ENTRY) { if (!started() || entry == END_ENTRY) {
logger.debug("Stopping thread to read from the job queue"); logger.debug("Stopping thread to read from the job queue");
return; return;
@ -304,7 +303,9 @@ public class AlertActionManager extends AbstractComponent {
} }
} catch (Exception e) { } catch (Exception e) {
logger.error("Error during reader thread, restarting queue reader thread...", e); logger.error("Error during reader thread, restarting queue reader thread...", e);
threadPool.executor(ThreadPool.Names.GENERIC).execute(new QueueReaderThread()); if (started()) {
threadPool.executor(ThreadPool.Names.GENERIC).execute(new QueueReaderThread());
}
} }
} }
} }

View File

@ -7,6 +7,7 @@ package org.elasticsearch.alerts.actions;
import org.elasticsearch.ElasticsearchIllegalArgumentException; import org.elasticsearch.ElasticsearchIllegalArgumentException;
import org.elasticsearch.alerts.Alert; import org.elasticsearch.alerts.Alert;
import org.elasticsearch.alerts.triggers.TriggerResult;
import org.elasticsearch.client.Client; import org.elasticsearch.client.Client;
import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.component.AbstractComponent;
@ -60,7 +61,7 @@ public class AlertActionRegistry extends AbstractComponent {
return actions; return actions;
} }
public void doAction(Alert alert, AlertActionEntry actionEntry){ public void doAction(Alert alert, TriggerResult actionEntry){
for (AlertAction action : alert.actions()) { for (AlertAction action : alert.actions()) {
action.doAction(alert, actionEntry); action.doAction(alert, actionEntry);
} }

View File

@ -14,10 +14,11 @@ import java.io.IOException;
/** /**
*/ */
public enum AlertActionState implements ToXContent { public enum AlertActionState implements ToXContent {
SEARCH_NEEDED,
SEARCH_UNDERWAY,
NO_ACTION_NEEDED, NO_ACTION_NEEDED,
ACTION_NEEDED, ACTION_PERFORMED,
ACTION_UNDERWAY, ERROR;
ACTION_PERFORMED;
public static final String FIELD_NAME = "state"; public static final String FIELD_NAME = "state";
@ -25,14 +26,16 @@ public enum AlertActionState implements ToXContent {
@Override @Override
public String toString(){ public String toString(){
switch (this) { switch (this) {
case SEARCH_NEEDED:
return "SEARCH_NEEDED";
case SEARCH_UNDERWAY:
return "SEARCH_UNDERWAY";
case NO_ACTION_NEEDED: case NO_ACTION_NEEDED:
return "NO_ACTION_NEEDED"; return "NO_ACTION_NEEDED";
case ACTION_NEEDED:
return "ACTION_NEEDED";
case ACTION_UNDERWAY:
return "ACTION_UNDERWAY";
case ACTION_PERFORMED: case ACTION_PERFORMED:
return "ACTION_PERFORMED"; return "ACTION_PERFORMED";
case ERROR:
return "ERROR";
default: default:
return "NO_ACTION_NEEDED"; return "NO_ACTION_NEEDED";
} }
@ -40,14 +43,16 @@ public enum AlertActionState implements ToXContent {
public static AlertActionState fromString(String s) { public static AlertActionState fromString(String s) {
switch(s.toUpperCase()) { switch(s.toUpperCase()) {
case "SEARCH_NEEDED":
return SEARCH_NEEDED;
case "SEARCH_UNDERWAY":
return SEARCH_UNDERWAY;
case "NO_ACTION_NEEDED": case "NO_ACTION_NEEDED":
return NO_ACTION_NEEDED; return NO_ACTION_NEEDED;
case "ACTION_NEEDED":
return ACTION_NEEDED;
case "ACTION_UNDERWAY": case "ACTION_UNDERWAY":
return ACTION_UNDERWAY;
case "ACTION_PERFORMED":
return ACTION_PERFORMED; return ACTION_PERFORMED;
case "ERROR":
return ERROR;
default: default:
throw new ElasticsearchIllegalArgumentException("Unknown value [" + s + "] for AlertHistoryState" ); throw new ElasticsearchIllegalArgumentException("Unknown value [" + s + "] for AlertHistoryState" );
} }

View File

@ -7,6 +7,7 @@ package org.elasticsearch.alerts.actions;
import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.alerts.Alert; import org.elasticsearch.alerts.Alert;
import org.elasticsearch.alerts.triggers.TriggerResult;
import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentBuilder;
@ -95,7 +96,7 @@ public class EmailAlertAction implements AlertAction {
} }
@Override @Override
public boolean doAction(Alert alert, AlertActionEntry result) { public boolean doAction(Alert alert, TriggerResult result) {
Properties props = new Properties(); Properties props = new Properties();
props.put("mail.smtp.auth", "true"); props.put("mail.smtp.auth", "true");
props.put("mail.smtp.starttls.enable", "true"); props.put("mail.smtp.starttls.enable", "true");
@ -115,11 +116,11 @@ public class EmailAlertAction implements AlertAction {
message.setSubject("Elasticsearch Alert " + alert.alertName() + " triggered"); message.setSubject("Elasticsearch Alert " + alert.alertName() + " triggered");
StringBuffer output = new StringBuffer(); StringBuffer output = new StringBuffer();
output.append("The following query triggered because " + result.getTrigger().toString() + "\n"); output.append("The following query triggered because " + result.getTrigger().toString() + "\n");
output.append("The total number of hits returned : " + result.getSearchResponse().getHits().getTotalHits() + "\n"); output.append("The total number of hits returned : " + result.getResponse().getHits().getTotalHits() + "\n");
output.append("For query : " + result.getSearchRequest()); output.append("For query : " + result.getRequest());
output.append("\n"); output.append("\n");
output.append("Indices : "); output.append("Indices : ");
for (String index : result.getSearchRequest().indices()) { for (String index : result.getRequest().indices()) {
output.append(index); output.append(index);
output.append("/"); output.append("/");
} }

View File

@ -8,6 +8,7 @@ package org.elasticsearch.alerts.actions;
import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.alerts.Alert; import org.elasticsearch.alerts.Alert;
import org.elasticsearch.alerts.triggers.TriggerResult;
import org.elasticsearch.client.Client; import org.elasticsearch.client.Client;
import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.StreamOutput;
@ -59,15 +60,15 @@ public class IndexAlertAction implements AlertAction, ToXContent {
} }
@Override @Override
public boolean doAction(Alert alert, AlertActionEntry alertResult) { public boolean doAction(Alert alert, TriggerResult result) {
IndexRequest indexRequest = new IndexRequest(); IndexRequest indexRequest = new IndexRequest();
indexRequest.index(index); indexRequest.index(index);
indexRequest.type(type); indexRequest.type(type);
try { try {
XContentBuilder resultBuilder = XContentFactory.jsonBuilder().prettyPrint(); XContentBuilder resultBuilder = XContentFactory.jsonBuilder().prettyPrint();
resultBuilder.startObject(); resultBuilder.startObject();
resultBuilder = alertResult.getSearchResponse().toXContent(resultBuilder, ToXContent.EMPTY_PARAMS); resultBuilder = result.getResponse().toXContent(resultBuilder, ToXContent.EMPTY_PARAMS);
resultBuilder.field("timestamp", alertResult.getFireTime()); //resultBuilder.field("timestamp", result.getFireTime()); ///@TODO FIXME
resultBuilder.endObject(); resultBuilder.endObject();
indexRequest.source(resultBuilder); indexRequest.source(resultBuilder);
} catch (IOException ie) { } catch (IOException ie) {

View File

@ -59,7 +59,7 @@ public class AlertScheduler extends AbstractComponent {
public void executeAlert(String alertName, JobExecutionContext jobExecutionContext){ public void executeAlert(String alertName, JobExecutionContext jobExecutionContext){
DateTime scheduledFireTime = new DateTime(jobExecutionContext.getScheduledFireTime()); DateTime scheduledFireTime = new DateTime(jobExecutionContext.getScheduledFireTime());
DateTime fireTime = new DateTime(jobExecutionContext.getFireTime()); DateTime fireTime = new DateTime(jobExecutionContext.getFireTime());
alertManager.executeAlert(alertName, scheduledFireTime, fireTime); alertManager.scheduleAlert(alertName, scheduledFireTime, fireTime);
} }
public boolean remove(String alertName) { public boolean remove(String alertName) {

View File

@ -138,7 +138,7 @@ public class TriggerManager extends AbstractComponent {
triggered = false; //TODO FIX THESE triggered = false; //TODO FIX THESE
break; break;
} }
return new TriggerResult(triggered, request, response); return new TriggerResult(triggered, request, response, alert.trigger());
} }
private TriggerResult doScriptTrigger(Alert alert, SearchRequest request, SearchResponse response) { private TriggerResult doScriptTrigger(Alert alert, SearchRequest request, SearchResponse response) {
@ -165,7 +165,7 @@ public class TriggerManager extends AbstractComponent {
} catch (Exception e ){ } catch (Exception e ){
logger.error("Failed to execute script trigger", e); logger.error("Failed to execute script trigger", e);
} }
return new TriggerResult(triggered, request, response); return new TriggerResult(triggered, request, response, alert.trigger());
} }
private SearchRequest prepareTriggerSearch(Alert alert, DateTime scheduledFireTime, DateTime fireTime) throws IOException { private SearchRequest prepareTriggerSearch(Alert alert, DateTime scheduledFireTime, DateTime fireTime) throws IOException {

View File

@ -15,11 +15,13 @@ public class TriggerResult {
private final boolean triggered; private final boolean triggered;
private final SearchRequest request; private final SearchRequest request;
private final SearchResponse response; private final SearchResponse response;
private final AlertTrigger trigger;
public TriggerResult(boolean triggered, SearchRequest request, SearchResponse response) { public TriggerResult(boolean triggered, SearchRequest request, SearchResponse response, AlertTrigger trigger) {
this.triggered = triggered; this.triggered = triggered;
this.request = request; this.request = request;
this.response = response; this.response = response;
this.trigger = trigger;
} }
public boolean isTriggered() { public boolean isTriggered() {
@ -33,4 +35,9 @@ public class TriggerResult {
public SearchResponse getResponse() { public SearchResponse getResponse() {
return response; return response;
} }
public AlertTrigger getTrigger() {
return trigger;
}
} }

View File

@ -16,6 +16,7 @@ import org.elasticsearch.alerts.transport.actions.delete.DeleteAlertRequest;
import org.elasticsearch.alerts.transport.actions.delete.DeleteAlertResponse; import org.elasticsearch.alerts.transport.actions.delete.DeleteAlertResponse;
import org.elasticsearch.alerts.triggers.AlertTrigger; import org.elasticsearch.alerts.triggers.AlertTrigger;
import org.elasticsearch.alerts.triggers.ScriptedAlertTrigger; import org.elasticsearch.alerts.triggers.ScriptedAlertTrigger;
import org.elasticsearch.alerts.triggers.TriggerResult;
import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.ImmutableSettings; import org.elasticsearch.common.settings.ImmutableSettings;
@ -55,7 +56,6 @@ public class BasicAlertingTest extends ElasticsearchIntegrationTest {
} }
@Test @Test
// TODO: add request, response & request builder etc.
public void testAlerSchedulerStartsProperly() throws Exception { public void testAlerSchedulerStartsProperly() throws Exception {
createIndex("my-index"); createIndex("my-index");
createIndex(AlertsStore.ALERT_INDEX); createIndex(AlertsStore.ALERT_INDEX);
@ -94,8 +94,8 @@ public class BasicAlertingTest extends ElasticsearchIntegrationTest {
} }
@Override @Override
public boolean doAction(Alert alert, AlertActionEntry actionEntry) { public boolean doAction(Alert alert, TriggerResult result) {
logger.info("Alert {} invoked: {}", alert.alertName(), actionEntry); logger.info("Alert {} invoked: {}", alert.alertName(), result);
alertActionInvoked.set(true); alertActionInvoked.set(true);
return true; return true;
} }

View File

@ -23,6 +23,7 @@ import org.elasticsearch.alerts.transport.actions.get.GetAlertRequest;
import org.elasticsearch.alerts.transport.actions.get.GetAlertResponse; import org.elasticsearch.alerts.transport.actions.get.GetAlertResponse;
import org.elasticsearch.alerts.triggers.AlertTrigger; import org.elasticsearch.alerts.triggers.AlertTrigger;
import org.elasticsearch.alerts.triggers.ScriptedAlertTrigger; import org.elasticsearch.alerts.triggers.ScriptedAlertTrigger;
import org.elasticsearch.alerts.triggers.TriggerResult;
import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.StreamOutput;
@ -110,7 +111,7 @@ public class AlertActionsTest extends ElasticsearchIntegrationTest {
searchResponse.writeTo(out); searchResponse.writeTo(out);
builder.field(AlertActionManager.RESPONSE, out.bytes()); builder.field(AlertActionManager.RESPONSE, out.bytes());
builder.field(AlertActionManager.ACTIONS_FIELD, actionMap); builder.field(AlertActionManager.ACTIONS_FIELD, actionMap);
builder.field(AlertActionState.FIELD_NAME, AlertActionState.ACTION_NEEDED.toString()); builder.field(AlertActionState.FIELD_NAME, AlertActionState.SEARCH_NEEDED.toString());
builder.endObject(); builder.endObject();
AlertActionRegistry alertActionRegistry = internalCluster().getInstance(AlertActionRegistry.class, internalCluster().getMasterName()); AlertActionRegistry alertActionRegistry = internalCluster().getInstance(AlertActionRegistry.class, internalCluster().getMasterName());
AlertActionEntry actionEntry = AlertActionManager.parseHistory("foobar", builder.bytes(), 0, alertActionRegistry); AlertActionEntry actionEntry = AlertActionManager.parseHistory("foobar", builder.bytes(), 0, alertActionRegistry);
@ -120,7 +121,7 @@ public class AlertActionsTest extends ElasticsearchIntegrationTest {
assertEquals(actionEntry.isTriggered(), true); assertEquals(actionEntry.isTriggered(), true);
assertEquals(actionEntry.getScheduledTime(), scheduledFireTime); assertEquals(actionEntry.getScheduledTime(), scheduledFireTime);
assertEquals(actionEntry.getFireTime(), fireTime); assertEquals(actionEntry.getFireTime(), fireTime);
assertEquals(actionEntry.getEntryState(), AlertActionState.ACTION_NEEDED); assertEquals(actionEntry.getEntryState(), AlertActionState.SEARCH_NEEDED);
assertEquals(actionEntry.getSearchResponse().getHits().getTotalHits(), 10); assertEquals(actionEntry.getSearchResponse().getHits().getTotalHits(), 10);
assertEquals(actionEntry.getTrigger(), assertEquals(actionEntry.getTrigger(),
new AlertTrigger(AlertTrigger.SimpleTrigger.GREATER_THAN, AlertTrigger.TriggerType.NUMBER_OF_EVENTS, 1)); new AlertTrigger(AlertTrigger.SimpleTrigger.GREATER_THAN, AlertTrigger.TriggerType.NUMBER_OF_EVENTS, 1));
@ -173,7 +174,7 @@ public class AlertActionsTest extends ElasticsearchIntegrationTest {
} }
@Override @Override
public boolean doAction(Alert alert, AlertActionEntry actionEntry) { public boolean doAction(Alert alert, TriggerResult actionEntry) {
logger.info("Alert {} invoked: {}", alert.alertName(), actionEntry); logger.info("Alert {} invoked: {}", alert.alertName(), actionEntry);
alertActionInvoked.set(true); alertActionInvoked.set(true);
return true; return true;