From 701dc53c2a21e742e56b94c0cd3c1db24c1a2a53 Mon Sep 17 00:00:00 2001 From: Dimitris Athanasiou Date: Fri, 23 Jun 2017 12:36:32 +0100 Subject: [PATCH] [ML] Refactor aggregation response handling to make more flexible (elastic/x-pack-elasticsearch#1795) Currently, aggregated datafeeds construct JSON from the aggregation response by traversing all nested aggregations. In order to achieve this, multiple leaf aggregations are not supported. Also, in some scenarios it is impossible to effectively use pipeline aggregations, as the intermediate bucket aggregations are not ignored. This commit refactors AggregationToJsonProcessor in order to support the above scenarios. This is achieved by only converting the fields of interest, that is the job analysis fields. Original commit: elastic/x-pack-elasticsearch@8b575956ca1aca5de19c1b1d6ed85d17cadb93dc --- .../aggregation/AggregationDataExtractor.java | 2 +- .../AggregationDataExtractorContext.java | 7 +- .../AggregationDataExtractorFactory.java | 1 + .../AggregationToJsonProcessor.java | 87 ++++++++++----- .../xpack/ml/job/config/AnalysisConfig.java | 8 +- .../writer/AbstractDataToProcessWriter.java | 2 +- .../AggregationDataExtractorTests.java | 7 +- .../AggregationToJsonProcessorTests.java | 104 ++++++++++++++---- .../xpack/ml/job/config/JobTests.java | 5 +- 9 files changed, 159 insertions(+), 64 deletions(-) diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationDataExtractor.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationDataExtractor.java index e83fe1f2dd2..7d72bcd09e6 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationDataExtractor.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationDataExtractor.java @@ -146,7 +146,7 @@ class AggregationDataExtractor implements DataExtractor { ByteArrayOutputStream outputStream = new 
ByteArrayOutputStream(); try (AggregationToJsonProcessor processor = new AggregationToJsonProcessor( - context.timeField, context.includeDocCount, outputStream)) { + context.timeField, context.fields, context.includeDocCount, outputStream)) { while (histogramBuckets.isEmpty() == false && processor.getKeyValueCount() < BATCH_KEY_VALUE_PAIRS) { processor.process(histogramBuckets.removeFirst()); } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationDataExtractorContext.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationDataExtractorContext.java index e34fc8aa3e9..eefd32ef1fd 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationDataExtractorContext.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationDataExtractorContext.java @@ -10,11 +10,13 @@ import org.elasticsearch.search.aggregations.AggregatorFactories; import java.util.List; import java.util.Objects; +import java.util.Set; class AggregationDataExtractorContext { final String jobId; final String timeField; + final Set fields; final String[] indices; final String[] types; final QueryBuilder query; @@ -23,10 +25,11 @@ class AggregationDataExtractorContext { final long end; final boolean includeDocCount; - AggregationDataExtractorContext(String jobId, String timeField, List indices, List types, QueryBuilder query, - AggregatorFactories.Builder aggs, long start, long end, boolean includeDocCount) { + AggregationDataExtractorContext(String jobId, String timeField, Set fields, List indices, List types, + QueryBuilder query, AggregatorFactories.Builder aggs, long start, long end, boolean includeDocCount) { this.jobId = Objects.requireNonNull(jobId); this.timeField = Objects.requireNonNull(timeField); + this.fields = Objects.requireNonNull(fields); this.indices = indices.toArray(new String[indices.size()]); this.types = 
types.toArray(new String[types.size()]); this.query = Objects.requireNonNull(query); diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationDataExtractorFactory.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationDataExtractorFactory.java index 3db0485b50b..67546853fa7 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationDataExtractorFactory.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationDataExtractorFactory.java @@ -30,6 +30,7 @@ public class AggregationDataExtractorFactory implements DataExtractorFactory { AggregationDataExtractorContext dataExtractorContext = new AggregationDataExtractorContext( job.getId(), job.getDataDescription().getTimeField(), + job.getAnalysisConfig().analysisFields(), datafeedConfig.getIndices(), datafeedConfig.getTypes(), datafeedConfig.getQuery(), diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationToJsonProcessor.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationToJsonProcessor.java index 51c17c7cb36..3a59e38334f 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationToJsonProcessor.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationToJsonProcessor.java @@ -11,8 +11,8 @@ import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.json.JsonXContent; import org.elasticsearch.search.aggregations.Aggregation; import org.elasticsearch.search.aggregations.Aggregations; +import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation; import org.elasticsearch.search.aggregations.bucket.histogram.Histogram; -import org.elasticsearch.search.aggregations.bucket.terms.Terms; import 
org.elasticsearch.search.aggregations.metrics.NumericMetricsAggregation; import org.elasticsearch.search.aggregations.metrics.max.Max; import org.elasticsearch.search.aggregations.metrics.percentiles.Percentile; @@ -28,6 +28,7 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Set; /** * Processes {@link Aggregation} objects and writes flat JSON documents for each leaf aggregation. @@ -37,14 +38,25 @@ import java.util.Objects; class AggregationToJsonProcessor implements Releasable { private final String timeField; + private final Set fields; private final boolean includeDocCount; private final XContentBuilder jsonBuilder; private final Map keyValuePairs; private long keyValueWrittenCount; - AggregationToJsonProcessor(String timeField, boolean includeDocCount, OutputStream outputStream) + /** + * Constructs a processor that processes aggregations into JSON + * + * @param timeField the time field + * @param fields the fields to convert into JSON + * @param includeDocCount whether to include the doc_count + * @param outputStream the stream to write the output + * @throws IOException if an error occurs during the processing + */ + AggregationToJsonProcessor(String timeField, Set fields, boolean includeDocCount, OutputStream outputStream) throws IOException { this.timeField = Objects.requireNonNull(timeField); + this.fields = Objects.requireNonNull(fields); this.includeDocCount = includeDocCount; jsonBuilder = new XContentBuilder(JsonXContent.jsonXContent, outputStream); keyValuePairs = new LinkedHashMap<>(); @@ -55,7 +67,7 @@ class AggregationToJsonProcessor implements Releasable { * Processes a {@link Histogram.Bucket} and writes a flat JSON document for each of its leaf aggregations. * Supported sub-aggregations include: *
<ul> - * <li>{@link Terms}</li> + * <li>{@link MultiBucketsAggregation}</li> * <li>{@link NumericMetricsAggregation.SingleValue}</li> * <li>{@link Percentiles}</li> * </ul>
@@ -82,51 +94,66 @@ class AggregationToJsonProcessor implements Releasable { } private void processNestedAggs(long docCount, List aggs) throws IOException { - if (aggs.isEmpty()) { - writeJsonObject(docCount); - return; - } - if (aggs.get(0) instanceof Terms) { - if (aggs.size() > 1) { - throw new IllegalArgumentException("Multiple non-leaf nested aggregations are not supported"); - } - processTerms((Terms) aggs.get(0)); - } else { - List addedKeys = new ArrayList<>(); - for (Aggregation nestedAgg : aggs) { - if (nestedAgg instanceof NumericMetricsAggregation.SingleValue) { - addedKeys.add(processSingleValue((NumericMetricsAggregation.SingleValue) nestedAgg)); - } else if (nestedAgg instanceof Percentiles) { - addedKeys.add(processPercentiles((Percentiles) nestedAgg)); + boolean processedBucketAgg = false; + List addedLeafKeys = new ArrayList<>(); + for (Aggregation agg : aggs) { + if (fields.contains(agg.getName())) { + if (agg instanceof MultiBucketsAggregation) { + if (processedBucketAgg) { + throw new IllegalArgumentException("Multiple bucket aggregations at the same level are not supported"); + } + if (addedLeafKeys.isEmpty() == false) { + throw new IllegalArgumentException("Mixing bucket and leaf aggregations at the same level is not supported"); + } + processedBucketAgg = true; + processBucket((MultiBucketsAggregation) agg); } else { - throw new IllegalArgumentException("Unsupported aggregation type [" + nestedAgg.getName() + "]"); + if (processedBucketAgg) { + throw new IllegalArgumentException("Mixing bucket and leaf aggregations at the same level is not supported"); + } + addedLeafKeys.add(processLeaf(agg)); } } + } + if (addedLeafKeys.isEmpty() == false) { + writeJsonObject(docCount); + addedLeafKeys.forEach(k -> keyValuePairs.remove(k)); + } + + if (processedBucketAgg == false && addedLeafKeys.isEmpty()) { writeJsonObject(docCount); - addedKeys.forEach(k -> keyValuePairs.remove(k)); } } - private void processTerms(Terms termsAgg) throws IOException { - 
for (Terms.Bucket bucket : termsAgg.getBuckets()) { - keyValuePairs.put(termsAgg.getName(), bucket.getKey()); + private void processBucket(MultiBucketsAggregation bucketAgg) throws IOException { + for (MultiBucketsAggregation.Bucket bucket : bucketAgg.getBuckets()) { + keyValuePairs.put(bucketAgg.getName(), bucket.getKey()); processNestedAggs(bucket.getDocCount(), asList(bucket.getAggregations())); - keyValuePairs.remove(termsAgg.getName()); + keyValuePairs.remove(bucketAgg.getName()); } } - private String processSingleValue(NumericMetricsAggregation.SingleValue singleValue) throws IOException { - keyValuePairs.put(singleValue.getName(), singleValue.value()); - return singleValue.getName(); + private String processLeaf(Aggregation agg) throws IOException { + if (agg instanceof NumericMetricsAggregation.SingleValue) { + processSingleValue((NumericMetricsAggregation.SingleValue) agg); + } else if (agg instanceof Percentiles) { + processPercentiles((Percentiles) agg); + } else { + throw new IllegalArgumentException("Unsupported aggregation type [" + agg.getName() + "]"); + } + return agg.getName(); } - private String processPercentiles(Percentiles percentiles) throws IOException { + private void processSingleValue(NumericMetricsAggregation.SingleValue singleValue) throws IOException { + keyValuePairs.put(singleValue.getName(), singleValue.value()); + } + + private void processPercentiles(Percentiles percentiles) throws IOException { Iterator percentileIterator = percentiles.iterator(); keyValuePairs.put(percentiles.getName(), percentileIterator.next().getValue()); if (percentileIterator.hasNext()) { throw new IllegalArgumentException("Multi-percentile aggregation [" + percentiles.getName() + "] is not supported"); } - return percentiles.getName(); } private void writeJsonObject(long docCount) throws IOException { diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/config/AnalysisConfig.java 
b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/config/AnalysisConfig.java index e5a89e07546..d564687c410 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/config/AnalysisConfig.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/config/AnalysisConfig.java @@ -274,16 +274,16 @@ public class AnalysisConfig extends ToXContentToBytes implements Writeable { } /** - * Return the list of fields required by the analysis. + * Return the set of fields required by the analysis. * These are the influencer fields, metric field, partition field, * by field and over field of each detector, plus the summary count * field and the categorization field name of the job. * null and empty strings are filtered from the * config. * - * @return List of required analysis fields - never null + * @return Set of required analysis fields - never null */ - public List analysisFields() { + public Set analysisFields() { Set analysisFields = termFields(); addIfNotNull(analysisFields, categorizationFieldName); @@ -296,7 +296,7 @@ public class AnalysisConfig extends ToXContentToBytes implements Writeable { // remove empty strings analysisFields.remove(""); - return new ArrayList<>(analysisFields); + return analysisFields; } private static void addIfNotNull(Set fields, String field) { diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/AbstractDataToProcessWriter.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/AbstractDataToProcessWriter.java index e1380445783..7ddcee67603 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/AbstractDataToProcessWriter.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/AbstractDataToProcessWriter.java @@ -170,7 +170,7 @@ public abstract class AbstractDataToProcessWriter implements DataToProcessWriter * must see in the csv header */ final Collection inputFields() { - Set requiredFields = 
new HashSet<>(analysisConfig.analysisFields()); + Set requiredFields = analysisConfig.analysisFields(); requiredFields.add(dataDescription.getTimeField()); return requiredFields; diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationDataExtractorTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationDataExtractorTests.java index 1e5c1e968be..8409568ef2b 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationDataExtractorTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationDataExtractorTests.java @@ -28,8 +28,10 @@ import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.HashSet; import java.util.List; import java.util.Optional; +import java.util.Set; import java.util.stream.Collectors; import static org.elasticsearch.xpack.ml.datafeed.extractor.aggregation.AggregationTestUtils.Term; @@ -49,6 +51,7 @@ public class AggregationDataExtractorTests extends ESTestCase { private List capturedSearchRequests; private String jobId; private String timeField; + private Set fields; private List types; private List indices; private QueryBuilder query; @@ -79,6 +82,8 @@ public class AggregationDataExtractorTests extends ESTestCase { capturedSearchRequests = new ArrayList<>(); jobId = "test-job"; timeField = "time"; + fields = new HashSet<>(); + fields.addAll(Arrays.asList("time", "airline", "responsetime")); indices = Arrays.asList("index-1", "index-2"); types = Arrays.asList("type-1", "type-2"); query = QueryBuilders.matchAllQuery(); @@ -270,7 +275,7 @@ public class AggregationDataExtractorTests extends ESTestCase { } private AggregationDataExtractorContext createContext(long start, long end) { - return new AggregationDataExtractorContext(jobId, timeField, indices, types, query, aggs, start, end, 
true); + return new AggregationDataExtractorContext(jobId, timeField, fields, indices, types, query, aggs, start, end, true); } @SuppressWarnings("unchecked") diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationToJsonProcessorTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationToJsonProcessorTests.java index 63016ab65aa..5134315d3b4 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationToJsonProcessorTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationToJsonProcessorTests.java @@ -5,9 +5,12 @@ */ package org.elasticsearch.xpack.ml.datafeed.extractor.aggregation; +import org.elasticsearch.common.util.set.Sets; +import org.elasticsearch.search.aggregations.Aggregation; import org.elasticsearch.search.aggregations.Aggregations; import org.elasticsearch.search.aggregations.bucket.histogram.Histogram; import org.elasticsearch.search.aggregations.bucket.terms.Terms; +import org.elasticsearch.search.aggregations.metrics.max.Max; import org.elasticsearch.test.ESTestCase; import java.io.ByteArrayOutputStream; @@ -18,6 +21,7 @@ import java.util.Collections; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.Set; import static org.elasticsearch.xpack.ml.datafeed.extractor.aggregation.AggregationTestUtils.Term; import static org.elasticsearch.xpack.ml.datafeed.extractor.aggregation.AggregationTestUtils.createAggs; @@ -41,7 +45,8 @@ public class AggregationToJsonProcessorTests extends ESTestCase { createHistogramBucket(2000L, 5) ); - IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> aggToString("time", histogramBuckets)); + IllegalArgumentException e = expectThrows(IllegalArgumentException.class, + () -> aggToString("time", Collections.emptySet(), histogramBuckets)); assertThat(e.getMessage(), 
containsString("Missing max aggregation for time_field [time]")); } @@ -51,7 +56,8 @@ public class AggregationToJsonProcessorTests extends ESTestCase { createHistogramBucket(2000L, 5, Collections.singletonList(createTerms("time"))) ); - IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> aggToString("time", histogramBuckets)); + IllegalArgumentException e = expectThrows(IllegalArgumentException.class, + () -> aggToString("time", Collections.emptySet(), histogramBuckets)); assertThat(e.getMessage(), containsString("Missing max aggregation for time_field [time]")); } @@ -61,7 +67,7 @@ public class AggregationToJsonProcessorTests extends ESTestCase { createHistogramBucket(2000L, 5, Collections.singletonList(createMax("timestamp", 2800))) ); - String json = aggToString("timestamp", histogramBuckets); + String json = aggToString("timestamp", Collections.emptySet(), histogramBuckets); assertThat(json, equalTo("{\"timestamp\":1200,\"doc_count\":3} {\"timestamp\":2800,\"doc_count\":5}")); assertThat(keyValuePairsWritten, equalTo(4L)); @@ -73,7 +79,7 @@ public class AggregationToJsonProcessorTests extends ESTestCase { createHistogramBucket(2000L, 5, Collections.singletonList(createMax("time", 2000))) ); - String json = aggToString("time", false, histogramBuckets); + String json = aggToString("time", Collections.emptySet(), false, histogramBuckets); assertThat(json, equalTo("{\"time\":1000} {\"time\":2000}")); assertThat(keyValuePairsWritten, equalTo(2L)); @@ -87,7 +93,7 @@ public class AggregationToJsonProcessorTests extends ESTestCase { createMax("time", 2000), createSingleValue("my_value", 2.0))) ); - String json = aggToString("time", histogramBuckets); + String json = aggToString("time", Sets.newHashSet("my_value"), histogramBuckets); assertThat(json, equalTo("{\"time\":1000,\"my_value\":1.0,\"doc_count\":3} {\"time\":2000,\"my_value\":2.0,\"doc_count\":5}")); } @@ -106,7 +112,7 @@ public class AggregationToJsonProcessorTests extends 
ESTestCase { createTerms("my_field", new Term("c", 4), new Term("b", 3)))) ); - String json = aggToString("time", histogramBuckets); + String json = aggToString("time", Sets.newHashSet("time", "my_field"), histogramBuckets); assertThat(json, equalTo("{\"time\":1100,\"my_field\":\"a\",\"doc_count\":1} " + "{\"time\":1100,\"my_field\":\"b\",\"doc_count\":2} " + @@ -132,7 +138,7 @@ public class AggregationToJsonProcessorTests extends ESTestCase { createTerms("my_field", new Term("c", 4, "my_value", 41.0), new Term("b", 3, "my_value", 42.0)))) ); - String json = aggToString("time", histogramBuckets); + String json = aggToString("time", Sets.newHashSet("my_field", "my_value"), histogramBuckets); assertThat(json, equalTo("{\"time\":1000,\"my_field\":\"a\",\"my_value\":11.0,\"doc_count\":1} " + "{\"time\":1000,\"my_field\":\"b\",\"my_value\":12.0,\"doc_count\":2} " + @@ -179,7 +185,7 @@ public class AggregationToJsonProcessorTests extends ESTestCase { createTerms("my_field", new Term("c", 4, c4NumericAggs), new Term("b", 3, b4NumericAggs)))) ); - String json = aggToString("time", false, histogramBuckets); + String json = aggToString("time", Sets.newHashSet("my_field", "my_value", "my_value2"), false, histogramBuckets); assertThat(json, equalTo("{\"time\":1000,\"my_field\":\"a\",\"my_value\":111.0,\"my_value2\":112.0} " + "{\"time\":1000,\"my_field\":\"b\",\"my_value\":121.0,\"my_value2\":122.0} " + @@ -192,24 +198,78 @@ public class AggregationToJsonProcessorTests extends ESTestCase { public void testProcessGivenUnsupportedAggregationUnderHistogram() throws IOException { Histogram.Bucket histogramBucket = createHistogramBucket(1000L, 2); - Histogram anotherHistogram = mock(Histogram.class); + Aggregation anotherHistogram = mock(Aggregation.class); when(anotherHistogram.getName()).thenReturn("nested-agg"); Aggregations subAggs = createAggs(Arrays.asList(createMax("time", 1000), anotherHistogram)); when(histogramBucket.getAggregations()).thenReturn(subAggs); - 
IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> aggToString("time", histogramBucket)); + IllegalArgumentException e = expectThrows(IllegalArgumentException.class, + () -> aggToString("time", Sets.newHashSet("nested-agg"), histogramBucket)); assertThat(e.getMessage(), containsString("Unsupported aggregation type [nested-agg]")); } - public void testProcessGivenMultipleNestedAggregations() throws IOException { + public void testProcessGivenMultipleBucketAggregations() throws IOException { Histogram.Bucket histogramBucket = createHistogramBucket(1000L, 2); Terms terms1 = mock(Terms.class); + when(terms1.getName()).thenReturn("terms_1"); Terms terms2 = mock(Terms.class); + when(terms2.getName()).thenReturn("terms_2"); Aggregations subAggs = createAggs(Arrays.asList(createMax("time", 1000), terms1, terms2)); when(histogramBucket.getAggregations()).thenReturn(subAggs); - IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> aggToString("time", histogramBucket)); - assertThat(e.getMessage(), containsString("Multiple non-leaf nested aggregations are not supported")); + IllegalArgumentException e = expectThrows(IllegalArgumentException.class, + () -> aggToString("time", Sets.newHashSet("terms_1", "terms_2"), histogramBucket)); + assertThat(e.getMessage(), containsString("Multiple bucket aggregations at the same level are not supported")); + } + + public void testProcessGivenMixedBucketAndLeafAggregationsAtSameLevel_BucketFirst() throws IOException { + Histogram.Bucket histogramBucket = createHistogramBucket(1000L, 2); + Terms terms = mock(Terms.class); + when(terms.getName()).thenReturn("terms"); + Max maxAgg = createMax("max_value", 1200); + Aggregations subAggs = createAggs(Arrays.asList(createMax("time", 1000), terms, maxAgg)); + when(histogramBucket.getAggregations()).thenReturn(subAggs); + + IllegalArgumentException e = expectThrows(IllegalArgumentException.class, + () -> aggToString("time", 
Sets.newHashSet("terms", "max_value"), histogramBucket)); + assertThat(e.getMessage(), containsString("Mixing bucket and leaf aggregations at the same level is not supported")); + } + + public void testProcessGivenMixedBucketAndLeafAggregationsAtSameLevel_LeafFirst() throws IOException { + Histogram.Bucket histogramBucket = createHistogramBucket(1000L, 2); + Max maxAgg = createMax("max_value", 1200); + Terms terms = mock(Terms.class); + when(terms.getName()).thenReturn("terms"); + Aggregations subAggs = createAggs(Arrays.asList(createMax("time", 1000), maxAgg, terms)); + when(histogramBucket.getAggregations()).thenReturn(subAggs); + + IllegalArgumentException e = expectThrows(IllegalArgumentException.class, + () -> aggToString("time", Sets.newHashSet("terms", "max_value"), histogramBucket)); + assertThat(e.getMessage(), containsString("Mixing bucket and leaf aggregations at the same level is not supported")); + } + + public void testProcessGivenBucketAndLeafAggregationsButBucketNotInFields() throws IOException { + List histogramBuckets = Arrays.asList( + createHistogramBucket(1000L, 4, Arrays.asList( + createMax("time", 1100), + createMax("my_value", 1), + createTerms("my_field", new Term("a", 1), new Term("b", 2), new Term("c", 1)))), + createHistogramBucket(2000L, 5, Arrays.asList( + createMax("time", 2200), + createMax("my_value", 2), + createTerms("my_field", new Term("a", 5), new Term("b", 2)))), + createHistogramBucket(3000L, 0, Collections.singletonList(createMax("time", -1))), + createHistogramBucket(4000L, 7, Arrays.asList( + createMax("time", 4400), + createMax("my_value", 4), + createTerms("my_field", new Term("c", 4), new Term("b", 3)))) + ); + + String json = aggToString("time", Sets.newHashSet("time", "my_value"), histogramBuckets); + + assertThat(json, equalTo("{\"time\":1100,\"my_value\":1.0,\"doc_count\":4} " + + "{\"time\":2200,\"my_value\":2.0,\"doc_count\":5} " + + "{\"time\":4400,\"my_value\":4.0,\"doc_count\":7}")); } public void 
testProcessGivenSinglePercentilesPerHistogram() throws IOException { @@ -224,7 +284,7 @@ public class AggregationToJsonProcessorTests extends ESTestCase { createMax("time", 4000), createPercentiles("my_field", 4.0))) ); - String json = aggToString("time", histogramBuckets); + String json = aggToString("time", Sets.newHashSet("my_field"), histogramBuckets); assertThat(json, equalTo("{\"time\":1000,\"my_field\":1.0,\"doc_count\":4} " + "{\"time\":2000,\"my_field\":2.0,\"doc_count\":7} " + @@ -244,21 +304,23 @@ public class AggregationToJsonProcessorTests extends ESTestCase { createMax("time", 4000), createPercentiles("my_field", 4.0))) ); - IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> aggToString("time", histogramBuckets)); + IllegalArgumentException e = expectThrows(IllegalArgumentException.class, + () -> aggToString("time", Sets.newHashSet("my_field"), histogramBuckets)); assertThat(e.getMessage(), containsString("Multi-percentile aggregation [my_field] is not supported")); } - private String aggToString(String timeField, Histogram.Bucket bucket) throws IOException { - return aggToString(timeField, true, Collections.singletonList(bucket)); + private String aggToString(String timeField, Set fields, Histogram.Bucket bucket) throws IOException { + return aggToString(timeField, fields, true, Collections.singletonList(bucket)); } - private String aggToString(String timeField, List buckets) throws IOException { - return aggToString(timeField, true, buckets); + private String aggToString(String timeField, Set fields, List buckets) throws IOException { + return aggToString(timeField, fields, true, buckets); } - private String aggToString(String timeField, boolean includeDocCount, List buckets) throws IOException { + private String aggToString(String timeField, Set fields, boolean includeDocCount, List buckets) + throws IOException { ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); - try (AggregationToJsonProcessor 
processor = new AggregationToJsonProcessor(timeField, includeDocCount, outputStream)) { + try (AggregationToJsonProcessor processor = new AggregationToJsonProcessor(timeField, fields, includeDocCount, outputStream)) { for (Histogram.Bucket bucket : buckets) { processor.process(bucket); } diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/config/JobTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/config/JobTests.java index 960818d360f..22a555c387a 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/config/JobTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/config/JobTests.java @@ -24,7 +24,6 @@ import java.util.Collections; import java.util.Date; import java.util.HashMap; import java.util.HashSet; -import java.util.List; import java.util.Map; import java.util.Set; @@ -187,7 +186,7 @@ public class JobTests extends AbstractSerializingTestCase { AnalysisConfig.Builder ac = new AnalysisConfig.Builder(Arrays.asList(d1.build(), d2.build())); ac.setSummaryCountFieldName("agg"); - List analysisFields = ac.build().analysisFields(); + Set analysisFields = ac.build().analysisFields(); assertTrue(analysisFields.size() == 5); assertTrue(analysisFields.contains("agg")); @@ -199,7 +198,6 @@ public class JobTests extends AbstractSerializingTestCase { assertFalse(analysisFields.contains("max")); assertFalse(analysisFields.contains("median")); assertFalse(analysisFields.contains("")); - assertFalse(analysisFields.contains(null)); Detector.Builder d3 = new Detector.Builder("count", null); d3.setByFieldName("by2"); @@ -221,7 +219,6 @@ public class JobTests extends AbstractSerializingTestCase { assertFalse(analysisFields.contains("max")); assertFalse(analysisFields.contains("median")); assertFalse(analysisFields.contains("")); - assertFalse(analysisFields.contains(null)); } // JobConfigurationVerifierTests: