diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationDataExtractor.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationDataExtractor.java index d74186ccb2c..318482b123a 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationDataExtractor.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationDataExtractor.java @@ -132,10 +132,6 @@ class AggregationDataExtractor implements DataExtractor { Aggregation topAgg = aggsAsList.get(0); if (topAgg instanceof Histogram) { - if (context.timeField.equals(topAgg.getName()) == false) { - throw new IllegalArgumentException("Histogram name [" + topAgg.getName() - + "] does not match time field [" + context.timeField + "]"); - } return ((Histogram) topAgg).getBuckets(); } else { throw new IllegalArgumentException("Top level aggregation should be [histogram]"); @@ -149,9 +145,10 @@ class AggregationDataExtractor implements DataExtractor { } ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); - try (AggregationToJsonProcessor processor = new AggregationToJsonProcessor(context.includeDocCount, outputStream)) { + try (AggregationToJsonProcessor processor = new AggregationToJsonProcessor( + context.timeField, context.includeDocCount, outputStream)) { while (histogramBuckets.isEmpty() == false && processor.getKeyValueCount() < BATCH_KEY_VALUE_PAIRS) { - processor.process(context.timeField, histogramBuckets.removeFirst()); + processor.process(histogramBuckets.removeFirst()); } if (histogramBuckets.isEmpty()) { hasNext = false; diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationToJsonProcessor.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationToJsonProcessor.java index 6940efe0728..51c17c7cb36 100644 --- 
a/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationToJsonProcessor.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationToJsonProcessor.java @@ -5,6 +5,7 @@ */ package org.elasticsearch.xpack.ml.datafeed.extractor.aggregation; +import org.elasticsearch.common.Nullable; import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.json.JsonXContent; @@ -13,10 +14,10 @@ import org.elasticsearch.search.aggregations.Aggregations; import org.elasticsearch.search.aggregations.bucket.histogram.Histogram; import org.elasticsearch.search.aggregations.bucket.terms.Terms; import org.elasticsearch.search.aggregations.metrics.NumericMetricsAggregation; +import org.elasticsearch.search.aggregations.metrics.max.Max; import org.elasticsearch.search.aggregations.metrics.percentiles.Percentile; import org.elasticsearch.search.aggregations.metrics.percentiles.Percentiles; import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig; -import org.joda.time.base.BaseDateTime; import java.io.IOException; import java.io.OutputStream; @@ -26,18 +27,24 @@ import java.util.Iterator; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.Objects; /** * Processes {@link Aggregation} objects and writes flat JSON documents for each leaf aggregation. + * In order to ensure that datafeeds can restart without duplicating data, we require that + * each histogram bucket has a nested max aggregation matching the time_field. 
*/ class AggregationToJsonProcessor implements Releasable { + private final String timeField; private final boolean includeDocCount; private final XContentBuilder jsonBuilder; private final Map keyValuePairs; private long keyValueWrittenCount; - AggregationToJsonProcessor(boolean includeDocCount, OutputStream outputStream) throws IOException { + AggregationToJsonProcessor(String timeField, boolean includeDocCount, OutputStream outputStream) + throws IOException { + this.timeField = Objects.requireNonNull(timeField); this.includeDocCount = includeDocCount; jsonBuilder = new XContentBuilder(JsonXContent.jsonXContent, outputStream); keyValuePairs = new LinkedHashMap<>(); @@ -53,17 +60,28 @@ class AggregationToJsonProcessor implements Releasable { *
  • {@link Percentiles}
  • * */ - public void process(String timeField, Histogram.Bucket bucket) throws IOException { - Object timestamp = bucket.getKey(); - if (timestamp instanceof BaseDateTime) { - timestamp = ((BaseDateTime) timestamp).getMillis(); + public void process(Histogram.Bucket bucket) throws IOException { + if (bucket.getDocCount() == 0) { + return; } + + Aggregations aggs = bucket.getAggregations(); + Aggregation timeAgg = aggs == null ? null : aggs.get(timeField); + if (timeAgg instanceof Max == false) { + throw new IllegalArgumentException("Missing max aggregation for time_field [" + timeField + "]"); + } + + // We want to handle the max time aggregation only at the bucket level. + // So, we add the value here and then remove the aggregation before + // processing the rest of the sub aggs. + long timestamp = (long) ((Max) timeAgg).value(); keyValuePairs.put(timeField, timestamp); - processNestedAggs(bucket.getDocCount(), bucket.getAggregations()); + List subAggs = new ArrayList<>(aggs.asList()); + subAggs.remove(timeAgg); + processNestedAggs(bucket.getDocCount(), subAggs); } - private void processNestedAggs(long docCount, Aggregations subAggs) throws IOException { - List aggs = subAggs == null ? 
Collections.emptyList() : subAggs.asList(); + private void processNestedAggs(long docCount, List aggs) throws IOException { if (aggs.isEmpty()) { writeJsonObject(docCount); return; @@ -92,7 +110,7 @@ class AggregationToJsonProcessor implements Releasable { private void processTerms(Terms termsAgg) throws IOException { for (Terms.Bucket bucket : termsAgg.getBuckets()) { keyValuePairs.put(termsAgg.getName(), bucket.getKey()); - processNestedAggs(bucket.getDocCount(), bucket.getAggregations()); + processNestedAggs(bucket.getDocCount(), asList(bucket.getAggregations())); keyValuePairs.remove(termsAgg.getName()); } } @@ -137,4 +155,8 @@ class AggregationToJsonProcessor implements Releasable { public long getKeyValueCount() { return keyValueWrittenCount; } + + private static List asList(@Nullable Aggregations aggs) { + return aggs == null ? Collections.emptyList() : aggs.asList(); + } } diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationDataExtractorTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationDataExtractorTests.java index 505a5256529..823df1541aa 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationDataExtractorTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationDataExtractorTests.java @@ -34,6 +34,7 @@ import java.util.stream.Collectors; import static org.elasticsearch.xpack.ml.datafeed.extractor.aggregation.AggregationTestUtils.Term; import static org.elasticsearch.xpack.ml.datafeed.extractor.aggregation.AggregationTestUtils.createHistogramBucket; +import static org.elasticsearch.xpack.ml.datafeed.extractor.aggregation.AggregationTestUtils.createMax; import static org.elasticsearch.xpack.ml.datafeed.extractor.aggregation.AggregationTestUtils.createTerms; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; @@ 
-90,9 +91,11 @@ public class AggregationDataExtractorTests extends ESTestCase { public void testExtraction() throws IOException { List histogramBuckets = Arrays.asList( createHistogramBucket(1000L, 3, Arrays.asList( + createMax("time", 1999), createTerms("airline", new Term("a", 1, "responsetime", 11.0), new Term("b", 2, "responsetime", 12.0)))), createHistogramBucket(2000L, 0, Arrays.asList()), createHistogramBucket(3000L, 7, Arrays.asList( + createMax("time", 3999), createTerms("airline", new Term("c", 4, "responsetime", 31.0), new Term("b", 3, "responsetime", 32.0)))) ); @@ -104,10 +107,10 @@ public class AggregationDataExtractorTests extends ESTestCase { assertThat(extractor.hasNext(), is(true)); Optional stream = extractor.next(); assertThat(stream.isPresent(), is(true)); - String expectedStream = "{\"time\":1000,\"airline\":\"a\",\"responsetime\":11.0,\"doc_count\":1} " - + "{\"time\":1000,\"airline\":\"b\",\"responsetime\":12.0,\"doc_count\":2} " - + "{\"time\":3000,\"airline\":\"c\",\"responsetime\":31.0,\"doc_count\":4} " - + "{\"time\":3000,\"airline\":\"b\",\"responsetime\":32.0,\"doc_count\":3}"; + String expectedStream = "{\"time\":1999,\"airline\":\"a\",\"responsetime\":11.0,\"doc_count\":1} " + + "{\"time\":1999,\"airline\":\"b\",\"responsetime\":12.0,\"doc_count\":2} " + + "{\"time\":3999,\"airline\":\"c\",\"responsetime\":31.0,\"doc_count\":4} " + + "{\"time\":3999,\"airline\":\"b\",\"responsetime\":32.0,\"doc_count\":3}"; assertThat(asString(stream.get()), equalTo(expectedStream)); assertThat(extractor.hasNext(), is(false)); assertThat(capturedSearchRequests.size(), equalTo(1)); @@ -128,7 +131,7 @@ public class AggregationDataExtractorTests extends ESTestCase { List histogramBuckets = new ArrayList<>(buckets); long timestamp = 1000; for (int i = 0; i < buckets; i++) { - histogramBuckets.add(createHistogramBucket(timestamp, 3)); + histogramBuckets.add(createHistogramBucket(timestamp, 3, Arrays.asList(createMax("time", timestamp)))); timestamp += 
1000L; } @@ -222,7 +225,7 @@ public class AggregationDataExtractorTests extends ESTestCase { List histogramBuckets = new ArrayList<>(buckets); long timestamp = 1000; for (int i = 0; i < buckets; i++) { - histogramBuckets.add(createHistogramBucket(timestamp, 3)); + histogramBuckets.add(createHistogramBucket(timestamp, 3, Arrays.asList(createMax("time", timestamp)))); timestamp += 1000L; } diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationTestUtils.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationTestUtils.java index 13a40bb13fb..88deaf31074 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationTestUtils.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationTestUtils.java @@ -11,6 +11,7 @@ import org.elasticsearch.search.aggregations.bucket.histogram.Histogram; import org.elasticsearch.search.aggregations.bucket.terms.StringTerms; import org.elasticsearch.search.aggregations.bucket.terms.Terms; import org.elasticsearch.search.aggregations.metrics.NumericMetricsAggregation; +import org.elasticsearch.search.aggregations.metrics.max.Max; import org.elasticsearch.search.aggregations.metrics.percentiles.Percentile; import org.elasticsearch.search.aggregations.metrics.percentiles.Percentiles; import org.joda.time.DateTime; @@ -39,6 +40,9 @@ public final class AggregationTestUtils { static Aggregations createAggs(List aggsList) { Aggregations aggs = mock(Aggregations.class); when(aggs.asList()).thenReturn(aggsList); + for (Aggregation agg: aggsList) { + when(aggs.get(agg.getName())).thenReturn(agg); + } return aggs; } @@ -56,6 +60,14 @@ public final class AggregationTestUtils { return bucket; } + static Max createMax(String name, double value) { + Max max = mock(Max.class); + when(max.getName()).thenReturn(name); + when(max.value()).thenReturn(value); + 
when(max.getValue()).thenReturn(value); + return max; + } + static NumericMetricsAggregation.SingleValue createSingleValue(String name, double value) { NumericMetricsAggregation.SingleValue singleValue = mock(NumericMetricsAggregation.SingleValue.class); when(singleValue.getName()).thenReturn(name); diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationToJsonProcessorTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationToJsonProcessorTests.java index 203ab2f1975..a73fefa4663 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationToJsonProcessorTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationToJsonProcessorTests.java @@ -9,7 +9,6 @@ import org.elasticsearch.search.aggregations.Aggregations; import org.elasticsearch.search.aggregations.bucket.histogram.Histogram; import org.elasticsearch.search.aggregations.bucket.terms.Terms; import org.elasticsearch.test.ESTestCase; -import org.joda.time.DateTime; import java.io.ByteArrayOutputStream; import java.io.IOException; @@ -22,8 +21,8 @@ import java.util.Map; import static org.elasticsearch.xpack.ml.datafeed.extractor.aggregation.AggregationTestUtils.Term; import static org.elasticsearch.xpack.ml.datafeed.extractor.aggregation.AggregationTestUtils.createAggs; -import static org.elasticsearch.xpack.ml.datafeed.extractor.aggregation.AggregationTestUtils.createDateHistogramBucket; import static org.elasticsearch.xpack.ml.datafeed.extractor.aggregation.AggregationTestUtils.createHistogramBucket; +import static org.elasticsearch.xpack.ml.datafeed.extractor.aggregation.AggregationTestUtils.createMax; import static org.elasticsearch.xpack.ml.datafeed.extractor.aggregation.AggregationTestUtils.createPercentiles; import static org.elasticsearch.xpack.ml.datafeed.extractor.aggregation.AggregationTestUtils.createSingleValue; import 
static org.elasticsearch.xpack.ml.datafeed.extractor.aggregation.AggregationTestUtils.createTerms; @@ -36,22 +35,42 @@ public class AggregationToJsonProcessorTests extends ESTestCase { private long keyValuePairsWritten = 0; - public void testProcessGivenHistogramOnly() throws IOException { + public void testProcessGivenMaxTimeIsMissing() throws IOException { List histogramBuckets = Arrays.asList( createHistogramBucket(1000L, 3), createHistogramBucket(2000L, 5) ); + IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> aggToString("time", histogramBuckets)); + assertThat(e.getMessage(), containsString("Missing max aggregation for time_field [time]")); + } + + public void testProcessGivenNonMaxTimeAgg() throws IOException { + List histogramBuckets = Arrays.asList( + createHistogramBucket(1000L, 3, Arrays.asList(createTerms("time"))), + createHistogramBucket(2000L, 5, Arrays.asList(createTerms("time"))) + ); + + IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> aggToString("time", histogramBuckets)); + assertThat(e.getMessage(), containsString("Missing max aggregation for time_field [time]")); + } + + public void testProcessGivenHistogramOnly() throws IOException { + List histogramBuckets = Arrays.asList( + createHistogramBucket(1000L, 3, Arrays.asList(createMax("timestamp", 1200))), + createHistogramBucket(2000L, 5, Arrays.asList(createMax("timestamp", 2800))) + ); + String json = aggToString("timestamp", histogramBuckets); - assertThat(json, equalTo("{\"timestamp\":1000,\"doc_count\":3} {\"timestamp\":2000,\"doc_count\":5}")); + assertThat(json, equalTo("{\"timestamp\":1200,\"doc_count\":3} {\"timestamp\":2800,\"doc_count\":5}")); assertThat(keyValuePairsWritten, equalTo(4L)); } public void testProcessGivenHistogramOnlyAndNoDocCount() throws IOException { List histogramBuckets = Arrays.asList( - createHistogramBucket(1000L, 3), - createHistogramBucket(2000L, 5) + createHistogramBucket(1000L, 3, 
Arrays.asList(createMax("time", 1000))), + createHistogramBucket(2000L, 5, Arrays.asList(createMax("time", 2000))) ); String json = aggToString("time", false, histogramBuckets); @@ -62,8 +81,10 @@ public class AggregationToJsonProcessorTests extends ESTestCase { public void testProcessGivenSingleMetricPerHistogram() throws IOException { List histogramBuckets = Arrays.asList( - createHistogramBucket(1000L, 3, Arrays.asList(createSingleValue("my_value", 1.0))), - createHistogramBucket(2000L, 5, Arrays.asList(createSingleValue("my_value", 2.0))) + createHistogramBucket(1000L, 3, Arrays.asList( + createMax("time", 1000), createSingleValue("my_value", 1.0))), + createHistogramBucket(2000L, 5, Arrays.asList( + createMax("time", 2000), createSingleValue("my_value", 2.0))) ); String json = aggToString("time", histogramBuckets); @@ -74,32 +95,41 @@ public class AggregationToJsonProcessorTests extends ESTestCase { public void testProcessGivenTermsPerHistogram() throws IOException { List histogramBuckets = Arrays.asList( createHistogramBucket(1000L, 4, Arrays.asList( + createMax("time", 1100), createTerms("my_field", new Term("a", 1), new Term("b", 2), new Term("c", 1)))), - createHistogramBucket(2000L, 5, Arrays.asList(createTerms("my_field", new Term("a", 5), new Term("b", 2)))), - createHistogramBucket(3000L, 0, Arrays.asList()), - createHistogramBucket(4000L, 7, Arrays.asList(createTerms("my_field", new Term("c", 4), new Term("b", 3)))) + createHistogramBucket(2000L, 5, Arrays.asList( + createMax("time", 2200), + createTerms("my_field", new Term("a", 5), new Term("b", 2)))), + createHistogramBucket(3000L, 0, Arrays.asList(createMax("time", -1))), + createHistogramBucket(4000L, 7, Arrays.asList( + createMax("time", 4400), + createTerms("my_field", new Term("c", 4), new Term("b", 3)))) ); String json = aggToString("time", histogramBuckets); - assertThat(json, equalTo("{\"time\":1000,\"my_field\":\"a\",\"doc_count\":1} " + - 
"{\"time\":1000,\"my_field\":\"b\",\"doc_count\":2} " + - "{\"time\":1000,\"my_field\":\"c\",\"doc_count\":1} " + - "{\"time\":2000,\"my_field\":\"a\",\"doc_count\":5} " + - "{\"time\":2000,\"my_field\":\"b\",\"doc_count\":2} " + - "{\"time\":4000,\"my_field\":\"c\",\"doc_count\":4} " + - "{\"time\":4000,\"my_field\":\"b\",\"doc_count\":3}")); + assertThat(json, equalTo("{\"time\":1100,\"my_field\":\"a\",\"doc_count\":1} " + + "{\"time\":1100,\"my_field\":\"b\",\"doc_count\":2} " + + "{\"time\":1100,\"my_field\":\"c\",\"doc_count\":1} " + + "{\"time\":2200,\"my_field\":\"a\",\"doc_count\":5} " + + "{\"time\":2200,\"my_field\":\"b\",\"doc_count\":2} " + + "{\"time\":4400,\"my_field\":\"c\",\"doc_count\":4} " + + "{\"time\":4400,\"my_field\":\"b\",\"doc_count\":3}")); } public void testProcessGivenSingleMetricPerSingleTermsPerHistogram() throws IOException { List histogramBuckets = Arrays.asList( - createHistogramBucket(1000L, 4, Arrays.asList(createTerms("my_field", - new Term("a", 1, "my_value", 11.0), new Term("b", 2, "my_value", 12.0), new Term("c", 1, "my_value", 13.0)))), - createHistogramBucket(2000L, 5, Arrays.asList(createTerms("my_field", - new Term("a", 5, "my_value", 21.0), new Term("b", 2, "my_value", 22.0)))), - createHistogramBucket(3000L, 0, Arrays.asList()), - createHistogramBucket(4000L, 7, Arrays.asList(createTerms("my_field", - new Term("c", 4, "my_value", 41.0), new Term("b", 3, "my_value", 42.0)))) + createHistogramBucket(1000L, 4, Arrays.asList( + createMax("time", 1000), + createTerms("my_field", new Term("a", 1, "my_value", 11.0), + new Term("b", 2, "my_value", 12.0), new Term("c", 1, "my_value", 13.0)))), + createHistogramBucket(2000L, 5, Arrays.asList( + createMax("time", 2000), + createTerms("my_field", new Term("a", 5, "my_value", 21.0), new Term("b", 2, "my_value", 22.0)))), + createHistogramBucket(3000L, 0, Arrays.asList(createMax("time", 3000))), + createHistogramBucket(4000L, 7, Arrays.asList( + createMax("time", 4000), + 
createTerms("my_field", new Term("c", 4, "my_value", 41.0), new Term("b", 3, "my_value", 42.0)))) ); String json = aggToString("time", histogramBuckets); @@ -136,13 +166,17 @@ public class AggregationToJsonProcessorTests extends ESTestCase { b4NumericAggs.put("my_value", 421.0); b4NumericAggs.put("my_value2", 422.0); List histogramBuckets = Arrays.asList( - createHistogramBucket(1000L, 4, Arrays.asList(createTerms("my_field", - new Term("a", 1, a1NumericAggs), new Term("b", 2, b1NumericAggs), new Term("c", 1, c1NumericAggs)))), - createHistogramBucket(2000L, 5, Arrays.asList(createTerms("my_field", - new Term("a", 5, a2NumericAggs), new Term("b", 2, b2NumericAggs)))), - createHistogramBucket(3000L, 0, Arrays.asList()), - createHistogramBucket(4000L, 7, Arrays.asList(createTerms("my_field", - new Term("c", 4, c4NumericAggs), new Term("b", 3, b4NumericAggs)))) + createHistogramBucket(1000L, 4, Arrays.asList( + createMax("time", 1000), + createTerms("my_field", new Term("a", 1, a1NumericAggs), + new Term("b", 2, b1NumericAggs), new Term("c", 1, c1NumericAggs)))), + createHistogramBucket(2000L, 5, Arrays.asList( + createMax("time", 2000), + createTerms("my_field", new Term("a", 5, a2NumericAggs), new Term("b", 2, b2NumericAggs)))), + createHistogramBucket(3000L, 0, Arrays.asList(createMax("time", 3000))), + createHistogramBucket(4000L, 7, Arrays.asList( + createMax("time", 4000), + createTerms("my_field", new Term("c", 4, c4NumericAggs), new Term("b", 3, b4NumericAggs)))) ); String json = aggToString("time", false, histogramBuckets); @@ -160,7 +194,7 @@ public class AggregationToJsonProcessorTests extends ESTestCase { Histogram.Bucket histogramBucket = createHistogramBucket(1000L, 2); Histogram anotherHistogram = mock(Histogram.class); when(anotherHistogram.getName()).thenReturn("nested-agg"); - Aggregations subAggs = createAggs(Arrays.asList(anotherHistogram)); + Aggregations subAggs = createAggs(Arrays.asList(createMax("time", 1000), anotherHistogram)); 
when(histogramBucket.getAggregations()).thenReturn(subAggs); IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> aggToString("time", histogramBucket)); @@ -171,30 +205,23 @@ public class AggregationToJsonProcessorTests extends ESTestCase { Histogram.Bucket histogramBucket = createHistogramBucket(1000L, 2); Terms terms1 = mock(Terms.class); Terms terms2 = mock(Terms.class); - Aggregations subAggs = createAggs(Arrays.asList(terms1, terms2)); + Aggregations subAggs = createAggs(Arrays.asList(createMax("time", 1000), terms1, terms2)); when(histogramBucket.getAggregations()).thenReturn(subAggs); IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> aggToString("time", histogramBucket)); assertThat(e.getMessage(), containsString("Multiple non-leaf nested aggregations are not supported")); } - public void testProcessGivenHistogramWithDateTimeKeys() throws IOException { - List histogramBuckets = Arrays.asList( - createDateHistogramBucket(new DateTime(1000L), 3), - createDateHistogramBucket(new DateTime(2000L), 5) - ); - - String json = aggToString("time", histogramBuckets); - - assertThat(json, equalTo("{\"time\":1000,\"doc_count\":3} {\"time\":2000,\"doc_count\":5}")); - } - public void testProcessGivenSinglePercentilesPerHistogram() throws IOException { List histogramBuckets = Arrays.asList( - createHistogramBucket(1000L, 4, Arrays.asList(createPercentiles("my_field", 1.0))), - createHistogramBucket(2000L, 7, Arrays.asList(createPercentiles("my_field", 2.0))), - createHistogramBucket(3000L, 10, Arrays.asList(createPercentiles("my_field", 3.0))), - createHistogramBucket(4000L, 14, Arrays.asList(createPercentiles("my_field", 4.0))) + createHistogramBucket(1000L, 4, Arrays.asList( + createMax("time", 1000), createPercentiles("my_field", 1.0))), + createHistogramBucket(2000L, 7, Arrays.asList( + createMax("time", 2000), createPercentiles("my_field", 2.0))), + createHistogramBucket(3000L, 10, Arrays.asList( + 
createMax("time", 3000), createPercentiles("my_field", 3.0))), + createHistogramBucket(4000L, 14, Arrays.asList( + createMax("time", 4000), createPercentiles("my_field", 4.0))) ); String json = aggToString("time", histogramBuckets); @@ -207,10 +234,14 @@ public class AggregationToJsonProcessorTests extends ESTestCase { public void testProcessGivenMultiplePercentilesPerHistogram() throws IOException { List histogramBuckets = Arrays.asList( - createHistogramBucket(1000L, 4, Arrays.asList(createPercentiles("my_field", 1.0))), - createHistogramBucket(2000L, 7, Arrays.asList(createPercentiles("my_field", 2.0, 5.0))), - createHistogramBucket(3000L, 10, Arrays.asList(createPercentiles("my_field", 3.0))), - createHistogramBucket(4000L, 14, Arrays.asList(createPercentiles("my_field", 4.0))) + createHistogramBucket(1000L, 4, Arrays.asList( + createMax("time", 1000), createPercentiles("my_field", 1.0))), + createHistogramBucket(2000L, 7, Arrays.asList( + createMax("time", 2000), createPercentiles("my_field", 2.0, 5.0))), + createHistogramBucket(3000L, 10, Arrays.asList( + createMax("time", 3000), createPercentiles("my_field", 3.0))), + createHistogramBucket(4000L, 14, Arrays.asList( + createMax("time", 4000), createPercentiles("my_field", 4.0))) ); IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> aggToString("time", histogramBuckets)); @@ -227,9 +258,9 @@ public class AggregationToJsonProcessorTests extends ESTestCase { private String aggToString(String timeField, boolean includeDocCount, List buckets) throws IOException { ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); - try (AggregationToJsonProcessor processor = new AggregationToJsonProcessor(includeDocCount, outputStream)) { + try (AggregationToJsonProcessor processor = new AggregationToJsonProcessor(timeField, includeDocCount, outputStream)) { for (Histogram.Bucket bucket : buckets) { - processor.process(timeField, bucket); + processor.process(bucket); } 
keyValuePairsWritten = processor.getKeyValueCount(); } diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/DatafeedJobIT.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/DatafeedJobIT.java index a848e2b455c..b45a2a9fb10 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/DatafeedJobIT.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/DatafeedJobIT.java @@ -305,9 +305,11 @@ public class DatafeedJobIT extends ESRestTestCase { new StringEntity(job, ContentType.APPLICATION_JSON)); String datafeedId = "datafeed-" + jobId; - String aggregations = "{\"time stamp\":{\"histogram\":{\"field\":\"time stamp\",\"interval\":3600000}," - + "\"aggregations\":{\"airline\":{\"terms\":{\"field\":\"airline\",\"size\":10}," - + "\"aggregations\":{\"responsetime\":{\"avg\":{\"field\":\"responsetime\"}}}}}}}"; + String aggregations = "{\"buckets\":{\"histogram\":{\"field\":\"time stamp\",\"interval\":3600000}," + + "\"aggregations\":{" + + "\"time stamp\":{\"max\":{\"field\":\"time stamp\"}}," + + "\"airline\":{\"terms\":{\"field\":\"airline\",\"size\":10}," + + " \"aggregations\":{\"responsetime\":{\"avg\":{\"field\":\"responsetime\"}}}}}}}"; new DatafeedBuilder(datafeedId, jobId, "airline-data-aggs", "response").setAggregations(aggregations).build(); openJob(client(), jobId); @@ -332,8 +334,10 @@ public class DatafeedJobIT extends ESRestTestCase { String datafeedId = "datafeed-" + jobId; String aggregations = "{\"time stamp\":{\"date_histogram\":{\"field\":\"time stamp\",\"interval\":\"1h\"}," - + "\"aggregations\":{\"airline\":{\"terms\":{\"field\":\"airline\",\"size\":10}," - + "\"aggregations\":{\"responsetime\":{\"avg\":{\"field\":\"responsetime\"}}}}}}}"; + + "\"aggregations\":{" + + "\"time stamp\":{\"max\":{\"field\":\"time stamp\"}}," + + "\"airline\":{\"terms\":{\"field\":\"airline\",\"size\":10}," + + " \"aggregations\":{\"responsetime\":{\"avg\":{\"field\":\"responsetime\"}}}}}}}"; new 
DatafeedBuilder(datafeedId, jobId, "airline-data-aggs", "response").setAggregations(aggregations).build(); openJob(client(), jobId); diff --git a/plugin/src/test/resources/rest-api-spec/test/ml/preview_datafeed.yaml b/plugin/src/test/resources/rest-api-spec/test/ml/preview_datafeed.yaml index d80e8a519d1..bfbddb2c7cb 100644 --- a/plugin/src/test/resources/rest-api-spec/test/ml/preview_datafeed.yaml +++ b/plugin/src/test/resources/rest-api-spec/test/ml/preview_datafeed.yaml @@ -142,12 +142,17 @@ setup: "indexes":"airline-data", "types":"response", "aggregations": { - "time": { + "buckets": { "histogram": { "field": "time", "interval": 3600000 }, "aggregations": { + "time": { + "max": { + "field": "time" + } + }, "airline": { "terms": { "field": "airline", @@ -170,15 +175,15 @@ setup: xpack.ml.preview_datafeed: datafeed_id: aggregation-doc-count-feed - length: { $body: 3 } - - match: { 0.time: 1.487376E12 } + - match: { 0.time: 1487377800000 } - match: { 0.airline: foo } - match: { 0.responsetime: 2.0 } - match: { 0.doc_count: 2 } - - match: { 1.time: 1.4873796E12 } + - match: { 1.time: 1487379660000 } - match: { 1.airline: bar } - match: { 1.responsetime: 42.0 } - match: { 1.doc_count: 1 } - - match: { 1.time: 1.4873796E12 } + - match: { 2.time: 1487379660000 } - match: { 2.airline: foo } - match: { 2.responsetime: 42.0 } - match: { 2.doc_count: 1 } @@ -210,12 +215,17 @@ setup: "indexes":"airline-data", "types":"response", "aggregations": { - "time": { + "buckets": { "histogram": { "field": "time", "interval": 3600000 }, "aggregations": { + "time": { + "max": { + "field": "time" + } + }, "dc_airline": { "cardinality": { "field": "airline" @@ -230,10 +240,10 @@ setup: xpack.ml.preview_datafeed: datafeed_id: aggregation-custom-single-metric-summary-feed - length: { $body: 2 } - - match: { 0.time: 1.487376E12 } + - match: { 0.time: 1487377800000 } - match: { 0.dc_airline: 1 } - is_false: 0.doc_count - - match: { 1.time: 1.4873796E12 } + - match: { 1.time: 1487379660000 
} - match: { 1.dc_airline: 2 } - is_false: 1.doc_count @@ -264,12 +274,17 @@ setup: "indexes":"airline-data", "types":"response", "aggregations": { - "time": { + "buckets": { "histogram": { "field": "time", "interval": 3600000 }, "aggregations": { + "time": { + "max": { + "field": "time" + } + }, "airline": { "terms": { "field": "airline" @@ -296,17 +311,17 @@ setup: xpack.ml.preview_datafeed: datafeed_id: aggregation-custom-multi-metric-summary-feed - length: { $body: 3 } - - match: { 0.time: 1.487376E12 } + - match: { 0.time: 1487377800000 } - match: { 0.airline: foo } - match: { 0.responsetime: 2.0 } - match: { 0.event_rate: 11 } - is_false: 0.doc_count - - match: { 1.time: 1.4873796E12 } + - match: { 1.time: 1487379660000 } - match: { 1.airline: bar } - match: { 1.responsetime: 42.0 } - match: { 1.event_rate: 8 } - is_false: 1.doc_count - - match: { 1.time: 1.4873796E12 } + - match: { 2.time: 1487379660000 } - match: { 2.airline: foo } - match: { 2.responsetime: 42.0 } - match: { 2.event_rate: 7 }