Fixes updating an existing alert works as expected.

Original commit: elastic/x-pack-elasticsearch@236407367a
This commit is contained in:
Martijn van Groningen 2014-11-25 13:21:37 +01:00
parent 00bfd694af
commit 4b147b8f85
7 changed files with 117 additions and 37 deletions

View File

@ -22,7 +22,6 @@ import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateListener; import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.component.LifecycleListener; import org.elasticsearch.common.component.LifecycleListener;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
@ -93,13 +92,17 @@ public class AlertManager extends AbstractComponent {
} }
} }
public IndexResponse addAlert(String alertName, BytesReference alertSource) { public IndexResponse putAlert(String alertName, BytesReference alertSource) {
ensureStarted(); ensureStarted();
alertLock.acquire(alertName); alertLock.acquire(alertName);
try { try {
Tuple<Alert, IndexResponse> result = alertsStore.addAlert(alertName, alertSource); AlertsStore.AlertStoreModification result = alertsStore.putAlert(alertName, alertSource);
scheduler.add(alertName, result.v1()); if (result.getPrevious() == null) {
return result.v2(); scheduler.schedule(alertName, result.getCurrent().schedule());
} else if (!result.getPrevious().schedule().equals(result.getCurrent().schedule())) {
scheduler.schedule(alertName, result.getCurrent().schedule());
}
return result.getIndexResponse();
} finally { } finally {
alertLock.release(alertName); alertLock.release(alertName);
} }
@ -126,7 +129,7 @@ public class AlertManager extends AbstractComponent {
try { try {
actionManager.addAlertAction(alert, scheduledFireTime, fireTime); actionManager.addAlertAction(alert, scheduledFireTime, fireTime);
} catch (Exception e) { } catch (Exception e) {
logger.error("Failed to add alert action for [{}]", e, alert); logger.error("Failed to schedule alert action for [{}]", e, alert);
} }
} finally { } finally {
alertLock.release(alertName); alertLock.release(alertName);

View File

@ -7,7 +7,9 @@ package org.elasticsearch.alerts;
import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchIllegalArgumentException; import org.elasticsearch.ElasticsearchIllegalArgumentException;
import org.elasticsearch.ElasticsearchIllegalStateException;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest; import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse; import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.index.IndexResponse;
@ -21,7 +23,6 @@ import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.ParseField; import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.joda.time.DateTime; import org.elasticsearch.common.joda.time.DateTime;
@ -85,6 +86,7 @@ public class AlertsStore extends AbstractComponent {
* Returns the alert with the specified name otherwise <code>null</code> is returned. * Returns the alert with the specified name otherwise <code>null</code> is returned.
*/ */
public Alert getAlert(String name) { public Alert getAlert(String name) {
ensureStarted();
return alertMap.get(name); return alertMap.get(name);
} }
@ -92,29 +94,27 @@ public class AlertsStore extends AbstractComponent {
* Creates an alert with the specified name and source. If an alert with the specified name already exists it will * Creates an alert with the specified name and source. If an alert with the specified name already exists it will
* get overwritten. * get overwritten.
*/ */
public Tuple<Alert, IndexResponse> addAlert(String name, BytesReference alertSource) { public AlertStoreModification putAlert(String alertName, BytesReference alertSource) {
Alert alert = parseAlert(name, alertSource); ensureStarted();
IndexResponse response = persistAlert(name, alertSource, IndexRequest.OpType.CREATE); Alert alert = parseAlert(alertName, alertSource);
IndexRequest indexRequest = createIndexRequest(alertName, alertSource);
IndexResponse response = client.index(indexRequest).actionGet();
alert.version(response.getVersion()); alert.version(response.getVersion());
alertMap.put(name, alert); Alert previous = alertMap.put(alertName, alert);
return new Tuple<>(alert, response); return new AlertStoreModification(previous, alert, response);
} }
/** /**
* Updates the specified alert by making sure that the made changes are persisted. * Updates the specified alert by making sure that the made changes are persisted.
*/ */
public IndexResponse updateAlert(Alert alert) throws IOException { public void updateAlert(Alert alert) throws IOException {
IndexResponse response = client.prepareIndex(ALERT_INDEX, ALERT_TYPE, alert.alertName()) ensureStarted();
.setSource(jsonBuilder().value(alert)) // TODO: the content type should be based on the provided content type when the alert was initially added. // TODO: the content type should be based on the provided content type when the alert was initially added.
.setVersion(alert.version()) BytesReference source = jsonBuilder().value(alert).bytes();
.setOpType(IndexRequest.OpType.INDEX) IndexResponse response = client.index(createIndexRequest(alert.alertName(), source)).actionGet();
.get();
alert.version(response.getVersion()); alert.version(response.getVersion());
// Don't need to update the alertMap, since we are working on an instance from it.
// Don'<></> need to update the alertMap, since we are working on an instance from it.
assert verifySameInstance(alert); assert verifySameInstance(alert);
return response;
} }
private boolean verifySameInstance(Alert alert) { private boolean verifySameInstance(Alert alert) {
@ -127,14 +127,15 @@ public class AlertsStore extends AbstractComponent {
* Deletes the alert with the specified name if exists * Deletes the alert with the specified name if exists
*/ */
public DeleteResponse deleteAlert(String name) { public DeleteResponse deleteAlert(String name) {
ensureStarted();
Alert alert = alertMap.remove(name); Alert alert = alertMap.remove(name);
if (alert == null) { if (alert == null) {
return new DeleteResponse(ALERT_INDEX, ALERT_TYPE, name, Versions.MATCH_ANY, false); return new DeleteResponse(ALERT_INDEX, ALERT_TYPE, name, Versions.MATCH_ANY, false);
} }
DeleteResponse deleteResponse = client.prepareDelete(ALERT_INDEX, ALERT_TYPE, name) DeleteRequest deleteRequest = new DeleteRequest(ALERT_INDEX, ALERT_TYPE, name);
.setVersion(alert.version()) deleteRequest.version(alert.version());
.get(); DeleteResponse deleteResponse = client.delete(deleteRequest).actionGet();
assert deleteResponse.isFound(); assert deleteResponse.isFound();
return deleteResponse; return deleteResponse;
} }
@ -191,12 +192,11 @@ public class AlertsStore extends AbstractComponent {
} }
} }
private IndexResponse persistAlert(String alertName, BytesReference alertSource, IndexRequest.OpType opType) { private IndexRequest createIndexRequest(String alertName, BytesReference alertSource) {
IndexRequest indexRequest = new IndexRequest(ALERT_INDEX, ALERT_TYPE, alertName); IndexRequest indexRequest = new IndexRequest(ALERT_INDEX, ALERT_TYPE, alertName);
indexRequest.listenerThreaded(false); indexRequest.listenerThreaded(false);
indexRequest.source(alertSource, false); indexRequest.source(alertSource, false);
indexRequest.opType(opType); return indexRequest;
return client.index(indexRequest).actionGet();
} }
private void loadAlerts() { private void loadAlerts() {
@ -293,4 +293,35 @@ public class AlertsStore extends AbstractComponent {
return alert; return alert;
} }
private void ensureStarted() {
if (!started.get()) {
throw new ElasticsearchIllegalStateException("Alert store not started");
}
}
public final class AlertStoreModification {
private final Alert previous;
private final Alert current;
private final IndexResponse indexResponse;
public AlertStoreModification(Alert previous, Alert current, IndexResponse indexResponse) {
this.current = current;
this.previous = previous;
this.indexResponse = indexResponse;
}
public Alert getCurrent() {
return current;
}
public Alert getPrevious() {
return previous;
}
public IndexResponse getIndexResponse() {
return indexResponse;
}
}
} }

View File

@ -335,8 +335,8 @@ public class AlertActionManager extends AbstractComponent {
try { try {
entry.setErrorMsg(e.getMessage()); entry.setErrorMsg(e.getMessage());
updateHistoryEntry(entry, AlertActionState.ERROR); updateHistoryEntry(entry, AlertActionState.ERROR);
} catch (IOException ioe) { } catch (Exception e2) {
logger.error("Failed store error message to update action history entry", ioe); logger.error("Failed to update action history entry with the error message", e2);
} }
} else { } else {
logger.debug("Failed to execute alert action after shutdown", e); logger.debug("Failed to execute alert action after shutdown", e);

View File

@ -19,8 +19,10 @@ import org.quartz.*;
import org.quartz.impl.StdSchedulerFactory; import org.quartz.impl.StdSchedulerFactory;
import org.quartz.simpl.SimpleJobFactory; import org.quartz.simpl.SimpleJobFactory;
import java.util.HashSet;
import java.util.Map; import java.util.Map;
import java.util.Properties; import java.util.Properties;
import java.util.Set;
public class AlertScheduler extends AbstractComponent { public class AlertScheduler extends AbstractComponent {
@ -59,7 +61,7 @@ public class AlertScheduler extends AbstractComponent {
scheduler = schFactory.getScheduler(); scheduler = schFactory.getScheduler();
scheduler.setJobFactory(new SimpleJobFactory()); scheduler.setJobFactory(new SimpleJobFactory());
for (Map.Entry<String, Alert> entry : alerts.entrySet()) { for (Map.Entry<String, Alert> entry : alerts.entrySet()) {
add(entry.getKey(), entry.getValue()); schedule(entry.getKey(), entry.getValue().schedule());
} }
scheduler.start(); scheduler.start();
} catch (SchedulerException se){ } catch (SchedulerException se){
@ -111,17 +113,23 @@ public class AlertScheduler extends AbstractComponent {
} }
} }
public void add(String alertName, Alert alert) { /**
* Schedules the alert with the specified name to be fired according to the specified cron expression.
*/
public void schedule(String alertName, String cronExpression) {
JobDetail job = JobBuilder.newJob(AlertExecutorJob.class).withIdentity(alertName).build(); JobDetail job = JobBuilder.newJob(AlertExecutorJob.class).withIdentity(alertName).build();
job.getJobDataMap().put("manager", this); job.getJobDataMap().put("manager", this);
CronTrigger cronTrigger = TriggerBuilder.newTrigger() CronTrigger cronTrigger = TriggerBuilder.newTrigger()
.withSchedule(CronScheduleBuilder.cronSchedule(alert.schedule())) .withSchedule(CronScheduleBuilder.cronSchedule(cronExpression))
.build(); .build();
try { try {
logger.trace("Scheduling [{}] with schedule [{}]", alertName, alert.schedule()); logger.trace("Scheduling [{}] with schedule [{}]", alertName, cronExpression);
scheduler.scheduleJob(job, cronTrigger); Set<CronTrigger> triggers = new HashSet<>();
triggers.add(cronTrigger);
scheduler.scheduleJob(job, triggers, true);
} catch (SchedulerException se) { } catch (SchedulerException se) {
logger.error("Failed to schedule job",se); logger.error("Failed to schedule job",se);
throw new ElasticsearchException("Failed to schedule job", se);
} }
} }

View File

@ -53,7 +53,7 @@ public class TransportPutAlertAction extends TransportMasterNodeOperationAction<
@Override @Override
protected void masterOperation(PutAlertRequest request, ClusterState state, ActionListener<PutAlertResponse> listener) throws ElasticsearchException { protected void masterOperation(PutAlertRequest request, ClusterState state, ActionListener<PutAlertResponse> listener) throws ElasticsearchException {
try { try {
IndexResponse indexResponse = alertManager.addAlert(request.getAlertName(), request.getAlertSource()); IndexResponse indexResponse = alertManager.putAlert(request.getAlertName(), request.getAlertSource());
listener.onResponse(new PutAlertResponse(indexResponse)); listener.onResponse(new PutAlertResponse(indexResponse));
} catch (Exception e) { } catch (Exception e) {
listener.onFailure(e); listener.onFailure(e);

View File

@ -106,6 +106,10 @@ public abstract class AbstractAlertingTests extends ElasticsearchIntegrationTest
} }
protected void assertAlertTriggered(final String alertName, final long minimumExpectedAlertActionsWithActionPerformed) throws Exception { protected void assertAlertTriggered(final String alertName, final long minimumExpectedAlertActionsWithActionPerformed) throws Exception {
assertAlertTriggered(alertName, minimumExpectedAlertActionsWithActionPerformed, true);
}
protected void assertAlertTriggered(final String alertName, final long minimumExpectedAlertActionsWithActionPerformed, final boolean assertTriggerSearchMatched) throws Exception {
assertBusy(new Runnable() { assertBusy(new Runnable() {
@Override @Override
public void run() { public void run() {
@ -122,11 +126,21 @@ public abstract class AbstractAlertingTests extends ElasticsearchIntegrationTest
.setQuery(boolQuery().must(matchQuery("alert_name", alertName)).must(matchQuery("state", AlertActionState.ACTION_PERFORMED.toString()))) .setQuery(boolQuery().must(matchQuery("alert_name", alertName)).must(matchQuery("state", AlertActionState.ACTION_PERFORMED.toString())))
.get(); .get();
assertThat(searchResponse.getHits().getTotalHits(), greaterThanOrEqualTo(minimumExpectedAlertActionsWithActionPerformed)); assertThat(searchResponse.getHits().getTotalHits(), greaterThanOrEqualTo(minimumExpectedAlertActionsWithActionPerformed));
assertThat((Integer) XContentMapValues.extractValue("response.hits.total", searchResponse.getHits().getAt(0).sourceAsMap()), greaterThanOrEqualTo(1)); if (assertTriggerSearchMatched) {
assertThat((Integer) XContentMapValues.extractValue("response.hits.total", searchResponse.getHits().getAt(0).sourceAsMap()), greaterThanOrEqualTo(1));
}
} }
}); });
} }
protected long findNumberOfPerformedActions(String alertName) {
SearchResponse searchResponse = client().prepareSearch(AlertActionManager.ALERT_HISTORY_INDEX)
.setIndicesOptions(IndicesOptions.lenientExpandOpen())
.setQuery(boolQuery().must(matchQuery("alert_name", alertName)).must(matchQuery("state", AlertActionState.ACTION_PERFORMED.toString())))
.get();
return searchResponse.getHits().getTotalHits();
}
protected void assertNoAlertTrigger(final String alertName, final long expectedAlertActionsWithNoActionNeeded) throws Exception { protected void assertNoAlertTrigger(final String alertName, final long expectedAlertActionsWithNoActionNeeded) throws Exception {
assertBusy(new Runnable() { assertBusy(new Runnable() {
@Override @Override

View File

@ -23,6 +23,7 @@ import static org.elasticsearch.index.query.QueryBuilders.*;
import static org.elasticsearch.search.builder.SearchSourceBuilder.searchSource; import static org.elasticsearch.search.builder.SearchSourceBuilder.searchSource;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.is;
/** /**
@ -135,6 +136,29 @@ public class BasicAlertingTest extends AbstractAlertingTests {
assertAlertTriggered("my-first-alert", 1); assertAlertTriggered("my-first-alert", 1);
} }
@Test
public void testModifyAlerts() throws Exception {
SearchRequest searchRequest = createTriggerSearchRequest("my-index").source(searchSource().query(matchAllQuery()));
alertClient().preparePutAlert("1")
.setAlertSource(createAlertSource("0/5 * * * * ? *", searchRequest, "hits.total == 1"))
.get();
assertAlertTriggered("1", 0, false);
alertClient().preparePutAlert("1")
.setAlertSource(createAlertSource("0/5 * * * * ? *", searchRequest, "hits.total == 0"))
.get();
assertAlertTriggered("1", 1, false);
alertClient().preparePutAlert("1")
.setAlertSource(createAlertSource("0/5 * * * * ? 2020", searchRequest, "hits.total == 0"))
.get();
Thread.sleep(5000);
long triggered = findNumberOfPerformedActions("1");
Thread.sleep(5000);
assertThat(triggered, equalTo(findNumberOfPerformedActions("1")));
}
private final SearchSourceBuilder searchSourceBuilder = searchSource().query( private final SearchSourceBuilder searchSourceBuilder = searchSource().query(
filteredQuery(matchQuery("event_type", "a"), rangeFilter("_timestamp").from("{{SCHEDULED_FIRE_TIME}}||-30s").to("{{SCHEDULED_FIRE_TIME}}")) filteredQuery(matchQuery("event_type", "a"), rangeFilter("_timestamp").from("{{SCHEDULED_FIRE_TIME}}||-30s").to("{{SCHEDULED_FIRE_TIME}}"))
); );