From 4b147b8f855cdbe12b2aba009993ac2cc4bb0174 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Tue, 25 Nov 2014 13:21:37 +0100 Subject: [PATCH] Fixes updating an existing alert works as expected. Original commit: elastic/x-pack-elasticsearch@236407367a7805f4b7156553e089d3c86fee8d21 --- .../elasticsearch/alerts/AlertManager.java | 15 ++-- .../org/elasticsearch/alerts/AlertsStore.java | 75 +++++++++++++------ .../alerts/actions/AlertActionManager.java | 4 +- .../alerts/scheduler/AlertScheduler.java | 18 +++-- .../actions/put/TransportPutAlertAction.java | 2 +- .../alerts/AbstractAlertingTests.java | 16 +++- .../alerts/BasicAlertingTest.java | 24 ++++++ 7 files changed, 117 insertions(+), 37 deletions(-) diff --git a/src/main/java/org/elasticsearch/alerts/AlertManager.java b/src/main/java/org/elasticsearch/alerts/AlertManager.java index 59ee95342fa..a136a65784c 100644 --- a/src/main/java/org/elasticsearch/alerts/AlertManager.java +++ b/src/main/java/org/elasticsearch/alerts/AlertManager.java @@ -22,7 +22,6 @@ import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateListener; import org.elasticsearch.common.bytes.BytesReference; -import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.component.LifecycleListener; import org.elasticsearch.common.inject.Inject; @@ -93,13 +92,17 @@ public class AlertManager extends AbstractComponent { } } - public IndexResponse addAlert(String alertName, BytesReference alertSource) { + public IndexResponse putAlert(String alertName, BytesReference alertSource) { ensureStarted(); alertLock.acquire(alertName); try { - Tuple result = alertsStore.addAlert(alertName, alertSource); - scheduler.add(alertName, result.v1()); - return result.v2(); + AlertsStore.AlertStoreModification result = alertsStore.putAlert(alertName, alertSource); + if (result.getPrevious() == null) { + scheduler.schedule(alertName, result.getCurrent().schedule()); + } else if (!result.getPrevious().schedule().equals(result.getCurrent().schedule())) { + scheduler.schedule(alertName, result.getCurrent().schedule()); + } + return result.getIndexResponse(); } finally { alertLock.release(alertName); } @@ -126,7 +129,7 @@ public class AlertManager extends AbstractComponent { try { actionManager.addAlertAction(alert, scheduledFireTime, fireTime); } catch (Exception e) { - logger.error("Failed to add alert action for [{}]", e, alert); + logger.error("Failed to schedule alert action for [{}]", e, alert); } } finally { alertLock.release(alertName); diff --git a/src/main/java/org/elasticsearch/alerts/AlertsStore.java b/src/main/java/org/elasticsearch/alerts/AlertsStore.java index f8d23242d04..75e041e4264 100644 --- a/src/main/java/org/elasticsearch/alerts/AlertsStore.java +++ b/src/main/java/org/elasticsearch/alerts/AlertsStore.java @@ -7,7 +7,9 @@ package org.elasticsearch.alerts; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchIllegalArgumentException; +import org.elasticsearch.ElasticsearchIllegalStateException; import org.elasticsearch.action.admin.indices.refresh.RefreshRequest; +import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.delete.DeleteResponse; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexResponse; @@ -21,7 +23,6 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.common.ParseField; import org.elasticsearch.common.bytes.BytesReference; -import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.joda.time.DateTime; @@ -85,6 +86,7 @@ public class AlertsStore extends AbstractComponent { * Returns the alert with the specified name otherwise null is returned. */ public Alert getAlert(String name) { + ensureStarted(); return alertMap.get(name); } @@ -92,29 +94,27 @@ public class AlertsStore extends AbstractComponent { * Creates an alert with the specified name and source. If an alert with the specified name already exists it will * get overwritten. */ - public Tuple addAlert(String name, BytesReference alertSource) { - Alert alert = parseAlert(name, alertSource); - IndexResponse response = persistAlert(name, alertSource, IndexRequest.OpType.CREATE); + public AlertStoreModification putAlert(String alertName, BytesReference alertSource) { + ensureStarted(); + Alert alert = parseAlert(alertName, alertSource); + IndexRequest indexRequest = createIndexRequest(alertName, alertSource); + IndexResponse response = client.index(indexRequest).actionGet(); alert.version(response.getVersion()); - alertMap.put(name, alert); - return new Tuple<>(alert, response); + Alert previous = alertMap.put(alertName, alert); + return new AlertStoreModification(previous, alert, response); } /** * Updates the specified alert by making sure that the made changes are persisted. */ - public IndexResponse updateAlert(Alert alert) throws IOException { - IndexResponse response = client.prepareIndex(ALERT_INDEX, ALERT_TYPE, alert.alertName()) - .setSource(jsonBuilder().value(alert)) // TODO: the content type should be based on the provided content type when the alert was initially added. - .setVersion(alert.version()) - .setOpType(IndexRequest.OpType.INDEX) - .get(); + public void updateAlert(Alert alert) throws IOException { + ensureStarted(); + // TODO: the content type should be based on the provided content type when the alert was initially added. + BytesReference source = jsonBuilder().value(alert).bytes(); + IndexResponse response = client.index(createIndexRequest(alert.alertName(), source)).actionGet(); alert.version(response.getVersion()); - - // Don'<> need to update the alertMap, since we are working on an instance from it. + // Don't need to update the alertMap, since we are working on an instance from it. assert verifySameInstance(alert); - - return response; } private boolean verifySameInstance(Alert alert) { @@ -127,14 +127,15 @@ public class AlertsStore extends AbstractComponent { * Deletes the alert with the specified name if exists */ public DeleteResponse deleteAlert(String name) { + ensureStarted(); Alert alert = alertMap.remove(name); if (alert == null) { return new DeleteResponse(ALERT_INDEX, ALERT_TYPE, name, Versions.MATCH_ANY, false); } - DeleteResponse deleteResponse = client.prepareDelete(ALERT_INDEX, ALERT_TYPE, name) - .setVersion(alert.version()) - .get(); + DeleteRequest deleteRequest = new DeleteRequest(ALERT_INDEX, ALERT_TYPE, name); + deleteRequest.version(alert.version()); + DeleteResponse deleteResponse = client.delete(deleteRequest).actionGet(); assert deleteResponse.isFound(); return deleteResponse; } @@ -191,12 +192,11 @@ public class AlertsStore extends AbstractComponent { } } - private IndexResponse persistAlert(String alertName, BytesReference alertSource, IndexRequest.OpType opType) { + private IndexRequest createIndexRequest(String alertName, BytesReference alertSource) { IndexRequest indexRequest = new IndexRequest(ALERT_INDEX, ALERT_TYPE, alertName); indexRequest.listenerThreaded(false); indexRequest.source(alertSource, false); - indexRequest.opType(opType); - return client.index(indexRequest).actionGet(); + return indexRequest; } private void loadAlerts() { @@ -293,4 +293,35 @@ public class AlertsStore extends AbstractComponent { return alert; } + private void ensureStarted() { + if (!started.get()) { + throw new ElasticsearchIllegalStateException("Alert store not started"); + } + } + + public final class AlertStoreModification { + + private final Alert previous; + private final Alert current; + private final IndexResponse indexResponse; + + public AlertStoreModification(Alert previous, Alert current, IndexResponse indexResponse) { + this.current = current; + this.previous = previous; + this.indexResponse = indexResponse; + } + + public Alert getCurrent() { + return current; + } + + public Alert getPrevious() { + return previous; + } + + public IndexResponse getIndexResponse() { + return indexResponse; + } + } + } diff --git a/src/main/java/org/elasticsearch/alerts/actions/AlertActionManager.java b/src/main/java/org/elasticsearch/alerts/actions/AlertActionManager.java index 9ecb034792a..16172747401 100644 --- a/src/main/java/org/elasticsearch/alerts/actions/AlertActionManager.java +++ b/src/main/java/org/elasticsearch/alerts/actions/AlertActionManager.java @@ -335,8 +335,8 @@ public class AlertActionManager extends AbstractComponent { try { entry.setErrorMsg(e.getMessage()); updateHistoryEntry(entry, AlertActionState.ERROR); - } catch (IOException ioe) { - logger.error("Failed store error message to update action history entry", ioe); + } catch (Exception e2) { + logger.error("Failed to update action history entry with the error message", e2); } } else { logger.debug("Failed to execute alert action after shutdown", e); diff --git a/src/main/java/org/elasticsearch/alerts/scheduler/AlertScheduler.java b/src/main/java/org/elasticsearch/alerts/scheduler/AlertScheduler.java index 973981284af..d1ea9fe6512 100644 --- a/src/main/java/org/elasticsearch/alerts/scheduler/AlertScheduler.java +++ b/src/main/java/org/elasticsearch/alerts/scheduler/AlertScheduler.java @@ -19,8 +19,10 @@ import org.quartz.*; import org.quartz.impl.StdSchedulerFactory; import org.quartz.simpl.SimpleJobFactory; +import java.util.HashSet; import java.util.Map; import java.util.Properties; +import java.util.Set; public class AlertScheduler extends AbstractComponent { @@ -59,7 +61,7 @@ public class AlertScheduler extends AbstractComponent { scheduler = schFactory.getScheduler(); scheduler.setJobFactory(new SimpleJobFactory()); for (Map.Entry entry : alerts.entrySet()) { - add(entry.getKey(), entry.getValue()); + schedule(entry.getKey(), entry.getValue().schedule()); } scheduler.start(); } catch (SchedulerException se){ @@ -111,17 +113,23 @@ public class AlertScheduler extends AbstractComponent { } } - public void add(String alertName, Alert alert) { + /** + * Schedules the alert with the specified name to be fired according to the specified cron expression. + */ + public void schedule(String alertName, String cronExpression) { JobDetail job = JobBuilder.newJob(AlertExecutorJob.class).withIdentity(alertName).build(); job.getJobDataMap().put("manager", this); CronTrigger cronTrigger = TriggerBuilder.newTrigger() - .withSchedule(CronScheduleBuilder.cronSchedule(alert.schedule())) + .withSchedule(CronScheduleBuilder.cronSchedule(cronExpression)) .build(); try { - logger.trace("Scheduling [{}] with schedule [{}]", alertName, alert.schedule()); - scheduler.scheduleJob(job, cronTrigger); + logger.trace("Scheduling [{}] with schedule [{}]", alertName, cronExpression); + Set triggers = new HashSet<>(); + triggers.add(cronTrigger); + scheduler.scheduleJob(job, triggers, true); } catch (SchedulerException se) { logger.error("Failed to schedule job",se); + throw new ElasticsearchException("Failed to schedule job", se); } } diff --git a/src/main/java/org/elasticsearch/alerts/transport/actions/put/TransportPutAlertAction.java b/src/main/java/org/elasticsearch/alerts/transport/actions/put/TransportPutAlertAction.java index ef020299d9a..262408f3097 100644 --- a/src/main/java/org/elasticsearch/alerts/transport/actions/put/TransportPutAlertAction.java +++ b/src/main/java/org/elasticsearch/alerts/transport/actions/put/TransportPutAlertAction.java @@ -53,7 +53,7 @@ public class TransportPutAlertAction extends TransportMasterNodeOperationAction< @Override protected void masterOperation(PutAlertRequest request, ClusterState state, ActionListener listener) throws ElasticsearchException { try { - IndexResponse indexResponse = alertManager.addAlert(request.getAlertName(), request.getAlertSource()); + IndexResponse indexResponse = alertManager.putAlert(request.getAlertName(), request.getAlertSource()); listener.onResponse(new PutAlertResponse(indexResponse)); } catch (Exception e) { listener.onFailure(e); diff --git a/src/test/java/org/elasticsearch/alerts/AbstractAlertingTests.java b/src/test/java/org/elasticsearch/alerts/AbstractAlertingTests.java index 90655147dcb..4e70c5999c3 100644 --- a/src/test/java/org/elasticsearch/alerts/AbstractAlertingTests.java +++ b/src/test/java/org/elasticsearch/alerts/AbstractAlertingTests.java @@ -106,6 +106,10 @@ public abstract class AbstractAlertingTests extends ElasticsearchIntegrationTest } protected void assertAlertTriggered(final String alertName, final long minimumExpectedAlertActionsWithActionPerformed) throws Exception { + assertAlertTriggered(alertName, minimumExpectedAlertActionsWithActionPerformed, true); + } + + protected void assertAlertTriggered(final String alertName, final long minimumExpectedAlertActionsWithActionPerformed, final boolean assertTriggerSearchMatched) throws Exception { assertBusy(new Runnable() { @Override public void run() { @@ -122,11 +126,21 @@ public abstract class AbstractAlertingTests extends ElasticsearchIntegrationTest .setQuery(boolQuery().must(matchQuery("alert_name", alertName)).must(matchQuery("state", AlertActionState.ACTION_PERFORMED.toString()))) .get(); assertThat(searchResponse.getHits().getTotalHits(), greaterThanOrEqualTo(minimumExpectedAlertActionsWithActionPerformed)); - assertThat((Integer) XContentMapValues.extractValue("response.hits.total", searchResponse.getHits().getAt(0).sourceAsMap()), greaterThanOrEqualTo(1)); + if (assertTriggerSearchMatched) { + assertThat((Integer) XContentMapValues.extractValue("response.hits.total", searchResponse.getHits().getAt(0).sourceAsMap()), greaterThanOrEqualTo(1)); + } } }); } + protected long findNumberOfPerformedActions(String alertName) { + SearchResponse searchResponse = client().prepareSearch(AlertActionManager.ALERT_HISTORY_INDEX) + .setIndicesOptions(IndicesOptions.lenientExpandOpen()) + .setQuery(boolQuery().must(matchQuery("alert_name", alertName)).must(matchQuery("state", AlertActionState.ACTION_PERFORMED.toString()))) + .get(); + return searchResponse.getHits().getTotalHits(); + } + protected void assertNoAlertTrigger(final String alertName, final long expectedAlertActionsWithNoActionNeeded) throws Exception { assertBusy(new Runnable() { @Override diff --git a/src/test/java/org/elasticsearch/alerts/BasicAlertingTest.java b/src/test/java/org/elasticsearch/alerts/BasicAlertingTest.java index 5cde971b17b..6451b499dbf 100644 --- a/src/test/java/org/elasticsearch/alerts/BasicAlertingTest.java +++ b/src/test/java/org/elasticsearch/alerts/BasicAlertingTest.java @@ -23,6 +23,7 @@ import static org.elasticsearch.index.query.QueryBuilders.*; import static org.elasticsearch.search.builder.SearchSourceBuilder.searchSource; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; +import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; /** @@ -135,6 +136,29 @@ public class BasicAlertingTest extends AbstractAlertingTests { assertAlertTriggered("my-first-alert", 1); } + @Test + public void testModifyAlerts() throws Exception { + SearchRequest searchRequest = createTriggerSearchRequest("my-index").source(searchSource().query(matchAllQuery())); + alertClient().preparePutAlert("1") + .setAlertSource(createAlertSource("0/5 * * * * ? *", searchRequest, "hits.total == 1")) + .get(); + assertAlertTriggered("1", 0, false); + + alertClient().preparePutAlert("1") + .setAlertSource(createAlertSource("0/5 * * * * ? *", searchRequest, "hits.total == 0")) + .get(); + assertAlertTriggered("1", 1, false); + + alertClient().preparePutAlert("1") + .setAlertSource(createAlertSource("0/5 * * * * ? 2020", searchRequest, "hits.total == 0")) + .get(); + + Thread.sleep(5000); + long triggered = findNumberOfPerformedActions("1"); + Thread.sleep(5000); + assertThat(triggered, equalTo(findNumberOfPerformedActions("1"))); + } + private final SearchSourceBuilder searchSourceBuilder = searchSource().query( filteredQuery(matchQuery("event_type", "a"), rangeFilter("_timestamp").from("{{SCHEDULED_FIRE_TIME}}||-30s").to("{{SCHEDULED_FIRE_TIME}}")) );