From 12cdf1cba44d84fd8a8aa3cae39e76a8b359b595 Mon Sep 17 00:00:00 2001 From: Benjamin Trent Date: Fri, 18 Jan 2019 15:08:53 -0600 Subject: [PATCH] ML: Add support for single bucket aggs in Datafeeds (#37544) Single bucket aggs are now supported in datafeed aggregation configurations. --- docs/reference/ml/aggregations.asciidoc | 47 +++++++++++++++++++ .../ml/integration/DatafeedJobsRestIT.java | 38 +++++++++++++++ .../AggregationToJsonProcessor.java | 37 ++++++++++++++- .../aggregation/AggregationTestUtils.java | 9 ++++ .../AggregationToJsonProcessorTests.java | 33 +++++++++++++ 5 files changed, 163 insertions(+), 1 deletion(-) diff --git a/docs/reference/ml/aggregations.asciidoc b/docs/reference/ml/aggregations.asciidoc index 3f09022d17e..a50016807a7 100644 --- a/docs/reference/ml/aggregations.asciidoc +++ b/docs/reference/ml/aggregations.asciidoc @@ -145,6 +145,53 @@ pipeline aggregation to find the first order derivative of the counter ---------------------------------- // NOTCONSOLE +{dfeeds-cap} not only supports multi-bucket aggregations, but also single bucket aggregations. +The following shows two `filter` aggregations, each gathering the number of unique entries for +the `error` field. 
+ +[source,js] +---------------------------------- +{ + "job_id":"servers-unique-errors", + "indices": ["logs-*"], + "aggregations": { + "buckets": { + "date_histogram": { + "field": "time", + "interval": "360s", + "time_zone": "UTC" + }, + "aggregations": { + "time": { + "max": {"field": "time"} + }, + "server1": { + "filter": {"term": {"source": "server-name-1"}}, + "aggregations": { + "server1_error_count": { + "value_count": { + "field": "error" + } + } + } + }, + "server2": { + "filter": {"term": {"source": "server-name-2"}}, + "aggregations": { + "server2_error_count": { + "value_count": { + "field": "error" + } + } + } + } + } + } + } +} +---------------------------------- +// NOTCONSOLE + When you define an aggregation in a {dfeed}, it must have the following form: [source,js] diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DatafeedJobsRestIT.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DatafeedJobsRestIT.java index 2e69702381b..b794fee3118 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DatafeedJobsRestIT.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DatafeedJobsRestIT.java @@ -894,6 +894,44 @@ public class DatafeedJobsRestIT extends ESRestTestCase { "action [indices:admin/xpack/rollup/search] is unauthorized for user [ml_admin_plus_data]\"")); } + public void testLookbackWithSingleBucketAgg() throws Exception { + String jobId = "aggs-date-histogram-with-single-bucket-agg-job"; + Request createJobRequest = new Request("PUT", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId); + createJobRequest.setJsonEntity("{\n" + + " \"description\": \"Aggs job\",\n" + + " \"analysis_config\": {\n" + + " \"bucket_span\": \"3600s\",\n" + + " \"summary_count_field_name\": \"doc_count\",\n" + + " \"detectors\": [\n" + + " {\n" 
+ + " \"function\": \"mean\",\n" + + " \"field_name\": \"responsetime\"" + + " }\n" + + " ]\n" + + " },\n" + + " \"data_description\": {\"time_field\": \"time stamp\"}\n" + + "}"); + client().performRequest(createJobRequest); + + String datafeedId = "datafeed-" + jobId; + String aggregations = "{\"time stamp\":{\"date_histogram\":{\"field\":\"time stamp\",\"interval\":\"1h\"}," + + "\"aggregations\":{" + + "\"time stamp\":{\"max\":{\"field\":\"time stamp\"}}," + + "\"airlineFilter\":{\"filter\":{\"term\": {\"airline\":\"AAA\"}}," + + " \"aggregations\":{\"responsetime\":{\"avg\":{\"field\":\"responsetime\"}}}}}}}"; + new DatafeedBuilder(datafeedId, jobId, "airline-data-aggs", "response").setAggregations(aggregations).build(); + openJob(client(), jobId); + + startDatafeedAndWaitUntilStopped(datafeedId); + waitUntilJobIsClosed(jobId); + Response jobStatsResponse = client().performRequest(new Request("GET", + MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId + "/_stats")); + String jobStatsResponseAsString = EntityUtils.toString(jobStatsResponse.getEntity()); + assertThat(jobStatsResponseAsString, containsString("\"input_record_count\":2")); + assertThat(jobStatsResponseAsString, containsString("\"processed_record_count\":2")); + assertThat(jobStatsResponseAsString, containsString("\"missing_field_count\":0")); + } + public void testRealtime() throws Exception { String jobId = "job-realtime-1"; createJob(jobId, "airline"); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationToJsonProcessor.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationToJsonProcessor.java index c934653a626..db8dea22675 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationToJsonProcessor.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationToJsonProcessor.java 
@@ -13,6 +13,7 @@ import org.elasticsearch.common.xcontent.json.JsonXContent; import org.elasticsearch.search.aggregations.Aggregation; import org.elasticsearch.search.aggregations.Aggregations; import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation; +import org.elasticsearch.search.aggregations.bucket.SingleBucketAggregation; import org.elasticsearch.search.aggregations.bucket.histogram.Histogram; import org.elasticsearch.search.aggregations.metrics.Max; import org.elasticsearch.search.aggregations.metrics.NumericMetricsAggregation; @@ -34,6 +35,7 @@ import java.util.Objects; import java.util.Set; import java.util.SortedMap; import java.util.TreeMap; +import java.util.stream.Collectors; /** * Processes {@link Aggregation} objects and writes flat JSON documents for each leaf aggregation. @@ -93,18 +95,39 @@ class AggregationToJsonProcessor { List leafAggregations = new ArrayList<>(); List bucketAggregations = new ArrayList<>(); + List singleBucketAggregations = new ArrayList<>(); // Sort into leaf and bucket aggregations. // The leaf aggregations will be processed first. 
for (Aggregation agg : aggregations) { if (agg instanceof MultiBucketsAggregation) { bucketAggregations.add((MultiBucketsAggregation)agg); + } else if (agg instanceof SingleBucketAggregation){ + // Skip a level down for single bucket aggs, if they have a sub-agg that is not + // a bucketed agg we should treat it like a leaf in this bucket + SingleBucketAggregation singleBucketAggregation = (SingleBucketAggregation)agg; + for (Aggregation subAgg : singleBucketAggregation.getAggregations()) { + if (subAgg instanceof MultiBucketsAggregation || subAgg instanceof SingleBucketAggregation) { + singleBucketAggregations.add(singleBucketAggregation); + } else { + leafAggregations.add(subAgg); + } + } } else { leafAggregations.add(agg); } } - if (bucketAggregations.size() > 1) { + // If on the current level (indicated via bucketAggregations) or one of the next levels (singleBucketAggregations) + // we have more than 1 `MultiBucketsAggregation`, we should error out. + // We need to make the check in this way as each of the items in `singleBucketAggregations` is treated as a separate branch + // in the recursive handling of this method. + int bucketAggLevelCount = Math.max(bucketAggregations.size(), (int)singleBucketAggregations.stream() + .flatMap(s -> asList(s.getAggregations()).stream()) + .filter(MultiBucketsAggregation.class::isInstance) + .count()); + + if (bucketAggLevelCount > 1) { throw new IllegalArgumentException("Multiple bucket aggregations at the same level are not supported"); } @@ -137,6 +160,18 @@ class AggregationToJsonProcessor { } } } + noMoreBucketsToProcess = singleBucketAggregations.isEmpty() && noMoreBucketsToProcess; + // we support more than one `SingleBucketAggregation` at each level + // However, we only want to recurse with multi/single bucket aggs. 
+ // Non-bucketed sub-aggregations were handled as leaf aggregations at this level + for (SingleBucketAggregation singleBucketAggregation : singleBucketAggregations) { + processAggs(singleBucketAggregation.getDocCount(), + asList(singleBucketAggregation.getAggregations()) + .stream() + .filter( + aggregation -> (aggregation instanceof MultiBucketsAggregation || aggregation instanceof SingleBucketAggregation)) + .collect(Collectors.toList())); + } // If there are no more bucket aggregations to process we've reached the end // and it's time to write the doc diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationTestUtils.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationTestUtils.java index 47d2eb828c6..38202eee0ff 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationTestUtils.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationTestUtils.java @@ -7,6 +7,7 @@ package org.elasticsearch.xpack.ml.datafeed.extractor.aggregation; import org.elasticsearch.search.aggregations.Aggregation; import org.elasticsearch.search.aggregations.Aggregations; +import org.elasticsearch.search.aggregations.bucket.SingleBucketAggregation; import org.elasticsearch.search.aggregations.bucket.histogram.Histogram; import org.elasticsearch.search.aggregations.bucket.terms.StringTerms; import org.elasticsearch.search.aggregations.bucket.terms.Terms; @@ -37,6 +38,14 @@ public final class AggregationTestUtils { return bucket; } + static SingleBucketAggregation createSingleBucketAgg(String name, long docCount, List subAggregations) { + SingleBucketAggregation singleBucketAggregation = mock(SingleBucketAggregation.class); + when(singleBucketAggregation.getName()).thenReturn(name); + when(singleBucketAggregation.getDocCount()).thenReturn(docCount); + 
when(singleBucketAggregation.getAggregations()).thenReturn(createAggs(subAggregations)); + return singleBucketAggregation; + } + static Histogram.Bucket createHistogramBucket(long timestamp, long docCount) { return createHistogramBucket(timestamp, docCount, Collections.emptyList()); } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationToJsonProcessorTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationToJsonProcessorTests.java index bf283b5be51..be79b461eeb 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationToJsonProcessorTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationToJsonProcessorTests.java @@ -31,6 +31,7 @@ import static org.elasticsearch.xpack.ml.datafeed.extractor.aggregation.Aggregat import static org.elasticsearch.xpack.ml.datafeed.extractor.aggregation.AggregationTestUtils.createHistogramBucket; import static org.elasticsearch.xpack.ml.datafeed.extractor.aggregation.AggregationTestUtils.createMax; import static org.elasticsearch.xpack.ml.datafeed.extractor.aggregation.AggregationTestUtils.createPercentiles; +import static org.elasticsearch.xpack.ml.datafeed.extractor.aggregation.AggregationTestUtils.createSingleBucketAgg; import static org.elasticsearch.xpack.ml.datafeed.extractor.aggregation.AggregationTestUtils.createSingleValue; import static org.elasticsearch.xpack.ml.datafeed.extractor.aggregation.AggregationTestUtils.createTerms; import static org.hamcrest.Matchers.containsString; @@ -439,6 +440,38 @@ public class AggregationToJsonProcessorTests extends ESTestCase { "{\"time\":4000,\"my_field\":4.0,\"doc_count\":14}")); } + public void testSingleBucketAgg() throws IOException { + List histogramBuckets = Arrays.asList( + createHistogramBucket(1000L, 4, Arrays.asList( + createMax("time", 1000), + 
createSingleBucketAgg("agg1", 3, Collections.singletonList(createMax("field1", 5.0))), + createSingleBucketAgg("agg2", 1, Collections.singletonList(createMax("field2", 3.0))))), + createHistogramBucket(2000L, 7, Arrays.asList( + createMax("time", 2000), + createSingleBucketAgg("agg2", 3, Collections.singletonList(createMax("field2", 1.0))), + createSingleBucketAgg("agg1", 4, Collections.singletonList(createMax("field1", 7.0)))))); + + String json = aggToString(Sets.newHashSet("field1", "field2"), histogramBuckets); + + assertThat(json, equalTo("{\"time\":1000,\"field1\":5.0,\"field2\":3.0,\"doc_count\":4}" + + " {\"time\":2000,\"field2\":1.0,\"field1\":7.0,\"doc_count\":7}")); + } + + public void testSingleBucketAgg_failureWithSubMultiBucket() throws IOException { + + List histogramBuckets = Collections.singletonList( + createHistogramBucket(1000L, 4, Arrays.asList( + createMax("time", 1000), + createSingleBucketAgg("agg1", 3, + Arrays.asList(createHistogramAggregation("histo", Collections.emptyList()),createMax("field1", 5.0))), + createSingleBucketAgg("agg2", 1, + Arrays.asList(createHistogramAggregation("histo", Collections.emptyList()),createMax("field1", 3.0)))))); + + + expectThrows(IllegalArgumentException.class, + () -> aggToString(Sets.newHashSet("my_field"), histogramBuckets)); + } + private String aggToString(Set fields, Histogram.Bucket bucket) throws IOException { return aggToString(fields, Collections.singletonList(bucket)); }