diff --git a/src/test/java/org/elasticsearch/alerts/AbstractAlertingTests.java b/src/test/java/org/elasticsearch/alerts/AbstractAlertingTests.java index e585fe7461d..891ce9125e6 100644 --- a/src/test/java/org/elasticsearch/alerts/AbstractAlertingTests.java +++ b/src/test/java/org/elasticsearch/alerts/AbstractAlertingTests.java @@ -122,6 +122,24 @@ public abstract class AbstractAlertingTests extends ElasticsearchIntegrationTest return internalTestCluster().getInstance(AlertsClient.class); } + protected void assertAlertTriggeredExact(final String alertName, final long expectedAlertActionsWithActionPerformed) throws Exception { + assertBusy(new Runnable() { + @Override + public void run() { + ClusterState state = client().admin().cluster().prepareState().get().getState(); + String[] alertHistoryIndices = state.metaData().concreteIndices(IndicesOptions.lenientExpandOpen(), AlertActionManager.ALERT_HISTORY_INDEX_PREFIX + "*"); + assertThat(alertHistoryIndices, not(emptyArray())); + for (String index : alertHistoryIndices) { + IndexRoutingTable routingTable = state.getRoutingTable().index(index); + assertThat(routingTable, notNullValue()); + assertThat(routingTable.allPrimaryShardsActive(), is(true)); + } + + assertThat(findNumberOfPerformedActions(alertName), equalTo(expectedAlertActionsWithActionPerformed)); + } + }); + } + protected void assertAlertTriggered(final String alertName, final long minimumExpectedAlertActionsWithActionPerformed) throws Exception { assertAlertTriggered(alertName, minimumExpectedAlertActionsWithActionPerformed, true); } diff --git a/src/test/java/org/elasticsearch/alerts/BasicAlertingTest.java b/src/test/java/org/elasticsearch/alerts/BasicAlertingTest.java index 7865b9276d0..166457430c8 100644 --- a/src/test/java/org/elasticsearch/alerts/BasicAlertingTest.java +++ b/src/test/java/org/elasticsearch/alerts/BasicAlertingTest.java @@ -13,7 +13,12 @@ import org.elasticsearch.alerts.transport.actions.delete.DeleteAlertRequest; import org.elasticsearch.alerts.transport.actions.delete.DeleteAlertResponse; import org.elasticsearch.alerts.transport.actions.put.PutAlertResponse; import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.index.query.FilterBuilders; +import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.script.ScriptService; +import org.elasticsearch.search.aggregations.AggregationBuilders; +import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogram; +import org.elasticsearch.search.aggregations.bucket.histogram.Histogram; import org.elasticsearch.search.builder.SearchSourceBuilder; import org.junit.Test; @@ -158,6 +163,55 @@ public class BasicAlertingTest extends AbstractAlertingTests { assertThat(triggered, equalTo(findNumberOfPerformedActions("1"))); } + @Test + public void testAggregations() throws Exception { + class R implements Runnable { + + private final long sleepTime; + private final long totalTime; + + R(long sleepTime, long totalTime) { + this.sleepTime = sleepTime; + this.totalTime = totalTime; + } + + @Override + public void run() { + long startTime = System.currentTimeMillis(); + while ((System.currentTimeMillis() - startTime) < totalTime) { + client().prepareIndex("my-index", "my-type").setCreate(true).setSource("{}").get(); + try { + Thread.sleep(sleepTime); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return; + } + } + } + } + + assertAcked(prepareCreate("my-index").addMapping("my-type", "_timestamp", "enabled=true")); + SearchRequest searchRequest = createTriggerSearchRequest("my-index").source( + searchSource() + .query(QueryBuilders.constantScoreQuery(FilterBuilders.rangeFilter("_timestamp").from("{{SCHEDULED_FIRE_TIME}}||-1m").to("{{SCHEDULED_FIRE_TIME}}"))) + .aggregation(AggregationBuilders.dateHistogram("rate").field("_timestamp").interval(DateHistogram.Interval.SECOND).order(Histogram.Order.COUNT_DESC)) + ); + BytesReference reference = createAlertSource("* 0/1 * * * ? *", searchRequest, "aggregations.rate.buckets[0]?.doc_count > 5"); + alertClient().preparePutAlert("rate-alert").setAlertSource(reference).get(); + + Thread indexThread = new Thread(new R(500, 60000)); + indexThread.start(); + indexThread.join(); + + assertAlertTriggeredExact("rate-alert", 0); + assertNoAlertTrigger("rate-alert", 1); + + indexThread = new Thread(new R(100, 60000)); + indexThread.start(); + indexThread.join(); + assertAlertTriggered("rate-alert", 1); + } + private final SearchSourceBuilder searchSourceBuilder = searchSource().query( filteredQuery(matchQuery("event_type", "a"), rangeFilter("_timestamp").from("{{SCHEDULED_FIRE_TIME}}||-30s").to("{{SCHEDULED_FIRE_TIME}}")) );