From 15160e41a20a2fa98f44cefc09202f59d1be5708 Mon Sep 17 00:00:00 2001 From: Dimitris Athanasiou Date: Tue, 7 Feb 2017 14:45:02 +0000 Subject: [PATCH] Fix datafeed with date_histogram aggregation (elastic/elasticsearch#876) date_histogram buckets return the key as a DateTime object. This PR checks whether the key is a Joda BaseDateTime (the base class of DateTime) and, if so, uses its epoch milliseconds as the timestamp value instead of the object itself. Fixes elastic/elasticsearch#869 Original commit: elastic/x-pack-elasticsearch@8e39760dad55e07c9f216f59ed673fb8344da202 --- .../AggregationToJsonProcessor.java | 7 ++++- .../aggregation/AggregationTestUtils.java | 8 ++++++ .../AggregationToJsonProcessorTests.java | 16 +++++++++++ .../xpack/ml/integration/DatafeedJobIT.java | 27 +++++++++++++++++-- 4 files changed, 55 insertions(+), 3 deletions(-) diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationToJsonProcessor.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationToJsonProcessor.java index 03bc67e0496..e8ff3cf2316 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationToJsonProcessor.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationToJsonProcessor.java @@ -14,6 +14,7 @@ import org.elasticsearch.search.aggregations.bucket.histogram.Histogram; import org.elasticsearch.search.aggregations.bucket.terms.Terms; import org.elasticsearch.search.aggregations.metrics.NumericMetricsAggregation; import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig; +import org.joda.time.base.BaseDateTime; import java.io.IOException; import java.io.OutputStream; @@ -51,7 +52,11 @@ class AggregationToJsonProcessor implements Releasable { private void processHistogram(Histogram histogram) throws IOException { for (Histogram.Bucket bucket : histogram.getBuckets()) { - keyValuePairs.put(histogram.getName(), bucket.getKey()); + Object timestamp = bucket.getKey(); + 
if (timestamp instanceof BaseDateTime) { + timestamp = ((BaseDateTime) timestamp).getMillis(); + } + keyValuePairs.put(histogram.getName(), timestamp); processNestedAggs(bucket.getDocCount(), bucket.getAggregations()); } } diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationTestUtils.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationTestUtils.java index 242a22bb7d6..a3848a18132 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationTestUtils.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationTestUtils.java @@ -11,6 +11,7 @@ import org.elasticsearch.search.aggregations.bucket.histogram.Histogram; import org.elasticsearch.search.aggregations.bucket.terms.StringTerms; import org.elasticsearch.search.aggregations.bucket.terms.Terms; import org.elasticsearch.search.aggregations.metrics.NumericMetricsAggregation; +import org.joda.time.DateTime; import java.util.ArrayList; import java.util.Arrays; @@ -43,6 +44,13 @@ public final class AggregationTestUtils { return bucket; } + static Histogram.Bucket createDateHistogramBucket(DateTime timestamp, long docCount) { + Histogram.Bucket bucket = mock(Histogram.Bucket.class); + when(bucket.getKey()).thenReturn(timestamp); + when(bucket.getDocCount()).thenReturn(docCount); + return bucket; + } + static NumericMetricsAggregation.SingleValue createSingleValue(String name, double value) { NumericMetricsAggregation.SingleValue singleValue = mock(NumericMetricsAggregation.SingleValue.class); when(singleValue.getName()).thenReturn(name); diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationToJsonProcessorTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationToJsonProcessorTests.java index 
3439bc29bc3..cd35d32f690 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationToJsonProcessorTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationToJsonProcessorTests.java @@ -10,6 +10,7 @@ import org.elasticsearch.search.aggregations.Aggregations; import org.elasticsearch.search.aggregations.bucket.histogram.Histogram; import org.elasticsearch.search.aggregations.bucket.terms.Terms; import org.elasticsearch.test.ESTestCase; +import org.joda.time.DateTime; import java.io.ByteArrayOutputStream; import java.io.IOException; @@ -19,6 +20,7 @@ import java.util.List; import static org.elasticsearch.xpack.ml.datafeed.extractor.aggregation.AggregationTestUtils.Term; import static org.elasticsearch.xpack.ml.datafeed.extractor.aggregation.AggregationTestUtils.createAggs; +import static org.elasticsearch.xpack.ml.datafeed.extractor.aggregation.AggregationTestUtils.createDateHistogramBucket; import static org.elasticsearch.xpack.ml.datafeed.extractor.aggregation.AggregationTestUtils.createHistogramBucket; import static org.elasticsearch.xpack.ml.datafeed.extractor.aggregation.AggregationTestUtils.createSingleValue; import static org.elasticsearch.xpack.ml.datafeed.extractor.aggregation.AggregationTestUtils.createTerms; @@ -139,6 +141,20 @@ public class AggregationToJsonProcessorTests extends ESTestCase { assertThat(e.getMessage(), containsString("Multiple nested aggregations are not supported")); } + public void testProcessGivenHistogramWithDateTimeKeys() throws IOException { + List histogramBuckets = Arrays.asList( + createDateHistogramBucket(new DateTime(1000L), 3), + createDateHistogramBucket(new DateTime(2000L), 5) + ); + Histogram histogram = mock(Histogram.class); + when(histogram.getName()).thenReturn("time"); + when(histogram.getBuckets()).thenReturn(histogramBuckets); + + String json = aggToString(histogram); + + assertThat(json, 
equalTo("{\"time\":1000,\"doc_count\":3} {\"time\":2000,\"doc_count\":5}")); + } + private String aggToString(Aggregation aggregation) throws IOException { ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); try (AggregationToJsonProcessor processor = new AggregationToJsonProcessor(outputStream)) { diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/integration/DatafeedJobIT.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/integration/DatafeedJobIT.java index e873a142425..eab0a30c2ab 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/integration/DatafeedJobIT.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/integration/DatafeedJobIT.java @@ -191,8 +191,8 @@ public class DatafeedJobIT extends ESRestTestCase { .execute(); } - public void testLookbackOnlyGivenAggregations() throws Exception { - String jobId = "aggs-job"; + public void testLookbackOnlyGivenAggregationsWithHistogram() throws Exception { + String jobId = "aggs-histogram-job"; String job = "{\"description\":\"Aggs job\",\"analysis_config\" :{\"bucket_span\":3600,\"summary_count_field_name\":\"doc_count\"," + "\"detectors\":[{\"function\":\"mean\",\"field_name\":\"responsetime\",\"by_field_name\":\"airline\"}]}," + "\"data_description\" : {\"time_field\":\"time stamp\"}" @@ -214,6 +214,29 @@ public class DatafeedJobIT extends ESRestTestCase { assertThat(jobStatsResponseAsString, containsString("\"missing_field_count\":0")); } + public void testLookbackOnlyGivenAggregationsWithDateHistogram() throws Exception { + String jobId = "aggs-date-histogram-job"; + String job = "{\"description\":\"Aggs job\",\"analysis_config\" :{\"bucket_span\":3600,\"summary_count_field_name\":\"doc_count\"," + + "\"detectors\":[{\"function\":\"mean\",\"field_name\":\"responsetime\",\"by_field_name\":\"airline\"}]}," + + "\"data_description\" : {\"time_field\":\"time stamp\"}" + + "}"; + client().performRequest("put", MlPlugin.BASE_PATH + 
"anomaly_detectors/" + jobId, Collections.emptyMap(), new StringEntity(job)); + + String datafeedId = "datafeed-" + jobId; + String aggregations = "{\"time stamp\":{\"date_histogram\":{\"field\":\"time stamp\",\"interval\":\"1h\"}," + + "\"aggregations\":{\"airline\":{\"terms\":{\"field\":\"airline\",\"size\":10}," + + "\"aggregations\":{\"responsetime\":{\"avg\":{\"field\":\"responsetime\"}}}}}}}"; + new DatafeedBuilder(datafeedId, jobId, "airline-data-aggs", "response").setAggregations(aggregations).build(); + openJob(client(), jobId); + + startDatafeedAndWaitUntilStopped(datafeedId); + Response jobStatsResponse = client().performRequest("get", MlPlugin.BASE_PATH + "anomaly_detectors/" + jobId + "/_stats"); + String jobStatsResponseAsString = responseEntityToString(jobStatsResponse); + assertThat(jobStatsResponseAsString, containsString("\"input_record_count\":4")); + assertThat(jobStatsResponseAsString, containsString("\"processed_record_count\":4")); + assertThat(jobStatsResponseAsString, containsString("\"missing_field_count\":0")); + } + public void testRealtime() throws Exception { String jobId = "job-realtime-1"; createJob(jobId);