From d16bf992a92a53aba20b51150fc1a1bbd86c393c Mon Sep 17 00:00:00 2001
From: Colin Goodheart-Smithe
Date: Thu, 30 Apr 2015 11:24:36 +0100
Subject: [PATCH] Aggregations: min_bucket aggregation

An aggregation to calculate the minimum value in a set of buckets.

Closes #9999
---
 .../search/aggregations/reducer.asciidoc | 1 +
 .../reducer/min-bucket-aggregation.asciidoc | 82 ++++
 .../aggregations/AggregationModule.java | 4 +-
 .../TransportAggregationModule.java | 4 +-
 .../reducers/ReducerBuilders.java | 7 +-
 .../{ => max}/MaxBucketBuilder.java | 2 +-
 .../{ => max}/MaxBucketParser.java | 2 +-
 .../{ => max}/MaxBucketReducer.java | 3 +-
 .../bucketmetrics/min/MinBucketBuilder.java | 59 +++
 .../bucketmetrics/min/MinBucketParser.java | 97 ++++
 .../bucketmetrics/min/MinBucketReducer.java | 151 ++++++
 .../aggregations/reducers/MinBucketTests.java | 433 ++++++++++++++++++
 12 files changed, 839 insertions(+), 6 deletions(-)
 create mode 100644 docs/reference/search/aggregations/reducer/min-bucket-aggregation.asciidoc
 rename src/main/java/org/elasticsearch/search/aggregations/reducers/bucketmetrics/{ => max}/MaxBucketBuilder.java (99%)
 rename src/main/java/org/elasticsearch/search/aggregations/reducers/bucketmetrics/{ => max}/MaxBucketParser.java (99%)
 rename src/main/java/org/elasticsearch/search/aggregations/reducers/bucketmetrics/{ => max}/MaxBucketReducer.java (98%)
 create mode 100644 src/main/java/org/elasticsearch/search/aggregations/reducers/bucketmetrics/min/MinBucketBuilder.java
 create mode 100644 src/main/java/org/elasticsearch/search/aggregations/reducers/bucketmetrics/min/MinBucketParser.java
 create mode 100644 src/main/java/org/elasticsearch/search/aggregations/reducers/bucketmetrics/min/MinBucketReducer.java
 create mode 100644 src/test/java/org/elasticsearch/search/aggregations/reducers/MinBucketTests.java

diff --git a/docs/reference/search/aggregations/reducer.asciidoc b/docs/reference/search/aggregations/reducer.asciidoc
index 11b0826e9eb..a725bc77e38 100644
--- a/docs/reference/search/aggregations/reducer.asciidoc
+++ b/docs/reference/search/aggregations/reducer.asciidoc
@@ -2,4 +2,5 @@
 include::reducer/derivative-aggregation.asciidoc[]
 include::reducer/max-bucket-aggregation.asciidoc[]
+include::reducer/min-bucket-aggregation.asciidoc[]
 include::reducer/movavg-aggregation.asciidoc[]

diff --git a/docs/reference/search/aggregations/reducer/min-bucket-aggregation.asciidoc b/docs/reference/search/aggregations/reducer/min-bucket-aggregation.asciidoc
new file mode 100644
index 00000000000..8eb69f9d683
--- /dev/null
+++ b/docs/reference/search/aggregations/reducer/min-bucket-aggregation.asciidoc
@@ -0,0 +1,82 @@
+[[search-aggregations-reducer-min-bucket-aggregation]]
+=== Min Bucket Aggregation
+
+A sibling reducer aggregation which identifies the bucket(s) with the minimum value of a specified metric in a sibling aggregation
+and outputs both the value and the key(s) of the bucket(s). The specified metric must be numeric and the sibling aggregation must
+be a multi-bucket aggregation.
+
+The following snippet calculates the minimum of the total monthly `sales`:
+
+[source,js]
+--------------------------------------------------
+{
+    "aggs" : {
+        "sales_per_month" : {
+            "date_histogram" : {
+                "field" : "date",
+                "interval" : "month"
+            },
+            "aggs": {
+                "sales": {
+                    "sum": {
+                        "field": "price"
+                    }
+                }
+            }
+        },
+        "min_monthly_sales": {
+            "min_bucket": {
+                "buckets_paths": "sales_per_month>sales" <1>
+            }
+        }
+    }
+}
+--------------------------------------------------
+
+<1> `buckets_paths` instructs this min_bucket aggregation that we want the minimum value of the `sales` aggregation in the
+`sales_per_month` date histogram.
+
+And the following may be the response:
+
+[source,js]
+--------------------------------------------------
+{
+    "aggregations": {
+        "sales_per_month": {
+            "buckets": [
+                {
+                    "key_as_string": "2015/01/01 00:00:00",
+                    "key": 1420070400000,
+                    "doc_count": 3,
+                    "sales": {
+                        "value": 550
+                    }
+                },
+                {
+                    "key_as_string": "2015/02/01 00:00:00",
+                    "key": 1422748800000,
+                    "doc_count": 2,
+                    "sales": {
+                        "value": 60
+                    }
+                },
+                {
+                    "key_as_string": "2015/03/01 00:00:00",
+                    "key": 1425168000000,
+                    "doc_count": 2,
+                    "sales": {
+                        "value": 375
+                    }
+                }
+            ]
+        },
+        "min_monthly_sales": {
+            "keys": ["2015/02/01 00:00:00"], <1>
+            "value": 60
+        }
+    }
+}
+--------------------------------------------------
+
+<1> `keys` is an array of strings since the minimum value may be present in multiple buckets
+
diff --git a/src/main/java/org/elasticsearch/search/aggregations/AggregationModule.java b/src/main/java/org/elasticsearch/search/aggregations/AggregationModule.java
index 803b52bc0bf..2f6e929071f 100644
--- a/src/main/java/org/elasticsearch/search/aggregations/AggregationModule.java
+++ b/src/main/java/org/elasticsearch/search/aggregations/AggregationModule.java
@@ -57,7 +57,8 @@ import org.elasticsearch.search.aggregations.metrics.sum.SumParser;
 import org.elasticsearch.search.aggregations.metrics.tophits.TopHitsParser;
 import org.elasticsearch.search.aggregations.metrics.valuecount.ValueCountParser;
 import org.elasticsearch.search.aggregations.reducers.Reducer;
-import org.elasticsearch.search.aggregations.reducers.bucketmetrics.MaxBucketParser;
+import org.elasticsearch.search.aggregations.reducers.bucketmetrics.max.MaxBucketParser;
+import org.elasticsearch.search.aggregations.reducers.bucketmetrics.min.MinBucketParser;
 import org.elasticsearch.search.aggregations.reducers.derivative.DerivativeParser;
 import org.elasticsearch.search.aggregations.reducers.movavg.MovAvgParser;
 import org.elasticsearch.search.aggregations.reducers.movavg.models.MovAvgModelModule;
@@ -107,6 +108,7 @@ public class AggregationModule extends AbstractModule implements SpawnModules{
         reducerParsers.add(DerivativeParser.class);
         reducerParsers.add(MaxBucketParser.class);
+        reducerParsers.add(MinBucketParser.class);
         reducerParsers.add(MovAvgParser.class);
     }

diff --git a/src/main/java/org/elasticsearch/search/aggregations/TransportAggregationModule.java b/src/main/java/org/elasticsearch/search/aggregations/TransportAggregationModule.java
index b0fe986a081..27f5cbcf0aa 100644
--- a/src/main/java/org/elasticsearch/search/aggregations/TransportAggregationModule.java
+++ b/src/main/java/org/elasticsearch/search/aggregations/TransportAggregationModule.java
@@ -61,7 +61,8 @@ import org.elasticsearch.search.aggregations.metrics.tophits.InternalTopHits;
 import org.elasticsearch.search.aggregations.metrics.valuecount.InternalValueCount;
 import org.elasticsearch.search.aggregations.reducers.InternalSimpleValue;
 import
org.elasticsearch.search.aggregations.reducers.bucketmetrics.InternalBucketMetricValue; -import org.elasticsearch.search.aggregations.reducers.bucketmetrics.MaxBucketReducer; +import org.elasticsearch.search.aggregations.reducers.bucketmetrics.max.MaxBucketReducer; +import org.elasticsearch.search.aggregations.reducers.bucketmetrics.min.MinBucketReducer; import org.elasticsearch.search.aggregations.reducers.derivative.DerivativeReducer; import org.elasticsearch.search.aggregations.reducers.movavg.MovAvgReducer; import org.elasticsearch.search.aggregations.reducers.movavg.models.TransportMovAvgModelModule; @@ -118,6 +119,7 @@ public class TransportAggregationModule extends AbstractModule implements SpawnM InternalSimpleValue.registerStreams(); InternalBucketMetricValue.registerStreams(); MaxBucketReducer.registerStreams(); + MinBucketReducer.registerStreams(); MovAvgReducer.registerStreams(); } diff --git a/src/main/java/org/elasticsearch/search/aggregations/reducers/ReducerBuilders.java b/src/main/java/org/elasticsearch/search/aggregations/reducers/ReducerBuilders.java index ba6d3ebe7c2..d2632721c64 100644 --- a/src/main/java/org/elasticsearch/search/aggregations/reducers/ReducerBuilders.java +++ b/src/main/java/org/elasticsearch/search/aggregations/reducers/ReducerBuilders.java @@ -19,7 +19,8 @@ package org.elasticsearch.search.aggregations.reducers; -import org.elasticsearch.search.aggregations.reducers.bucketmetrics.MaxBucketBuilder; +import org.elasticsearch.search.aggregations.reducers.bucketmetrics.max.MaxBucketBuilder; +import org.elasticsearch.search.aggregations.reducers.bucketmetrics.min.MinBucketBuilder; import org.elasticsearch.search.aggregations.reducers.derivative.DerivativeBuilder; import org.elasticsearch.search.aggregations.reducers.movavg.MovAvgBuilder; @@ -36,6 +37,10 @@ public final class ReducerBuilders { return new MaxBucketBuilder(name); } + public static final MinBucketBuilder minBucket(String name) { + return new MinBucketBuilder(name); + } + public static final MovAvgBuilder movingAvg(String name) { return new MovAvgBuilder(name); } diff --git a/src/main/java/org/elasticsearch/search/aggregations/reducers/bucketmetrics/MaxBucketBuilder.java b/src/main/java/org/elasticsearch/search/aggregations/reducers/bucketmetrics/max/MaxBucketBuilder.java similarity index 99% rename from src/main/java/org/elasticsearch/search/aggregations/reducers/bucketmetrics/MaxBucketBuilder.java rename to src/main/java/org/elasticsearch/search/aggregations/reducers/bucketmetrics/max/MaxBucketBuilder.java index 7fbcd54f789..31d588a6497 100644 --- a/src/main/java/org/elasticsearch/search/aggregations/reducers/bucketmetrics/MaxBucketBuilder.java +++ b/src/main/java/org/elasticsearch/search/aggregations/reducers/bucketmetrics/max/MaxBucketBuilder.java @@ -17,7 +17,7 @@ * under the License. 
*/ -package org.elasticsearch.search.aggregations.reducers.bucketmetrics; +package org.elasticsearch.search.aggregations.reducers.bucketmetrics.max; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.search.aggregations.reducers.BucketHelpers.GapPolicy; diff --git a/src/main/java/org/elasticsearch/search/aggregations/reducers/bucketmetrics/MaxBucketParser.java b/src/main/java/org/elasticsearch/search/aggregations/reducers/bucketmetrics/max/MaxBucketParser.java similarity index 99% rename from src/main/java/org/elasticsearch/search/aggregations/reducers/bucketmetrics/MaxBucketParser.java rename to src/main/java/org/elasticsearch/search/aggregations/reducers/bucketmetrics/max/MaxBucketParser.java index 28fe0110238..c8f3bad49f1 100644 --- a/src/main/java/org/elasticsearch/search/aggregations/reducers/bucketmetrics/MaxBucketParser.java +++ b/src/main/java/org/elasticsearch/search/aggregations/reducers/bucketmetrics/max/MaxBucketParser.java @@ -17,7 +17,7 @@ * under the License. */ -package org.elasticsearch.search.aggregations.reducers.bucketmetrics; +package org.elasticsearch.search.aggregations.reducers.bucketmetrics.max; import org.elasticsearch.common.ParseField; import org.elasticsearch.common.xcontent.XContentParser; diff --git a/src/main/java/org/elasticsearch/search/aggregations/reducers/bucketmetrics/MaxBucketReducer.java b/src/main/java/org/elasticsearch/search/aggregations/reducers/bucketmetrics/max/MaxBucketReducer.java similarity index 98% rename from src/main/java/org/elasticsearch/search/aggregations/reducers/bucketmetrics/MaxBucketReducer.java rename to src/main/java/org/elasticsearch/search/aggregations/reducers/bucketmetrics/max/MaxBucketReducer.java index 22bc30fd730..1d2d5c8d26c 100644 --- a/src/main/java/org/elasticsearch/search/aggregations/reducers/bucketmetrics/MaxBucketReducer.java +++ b/src/main/java/org/elasticsearch/search/aggregations/reducers/bucketmetrics/max/MaxBucketReducer.java @@ -17,7 +17,7 @@ * under the License. */ -package org.elasticsearch.search.aggregations.reducers.bucketmetrics; +package org.elasticsearch.search.aggregations.reducers.bucketmetrics.max; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.io.stream.StreamInput; @@ -32,6 +32,7 @@ import org.elasticsearch.search.aggregations.InternalMultiBucketAggregation; import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation.Bucket; import org.elasticsearch.search.aggregations.reducers.BucketHelpers; import org.elasticsearch.search.aggregations.reducers.BucketHelpers.GapPolicy; +import org.elasticsearch.search.aggregations.reducers.bucketmetrics.InternalBucketMetricValue; import org.elasticsearch.search.aggregations.reducers.Reducer; import org.elasticsearch.search.aggregations.reducers.ReducerFactory; import org.elasticsearch.search.aggregations.reducers.ReducerStreams; diff --git a/src/main/java/org/elasticsearch/search/aggregations/reducers/bucketmetrics/min/MinBucketBuilder.java b/src/main/java/org/elasticsearch/search/aggregations/reducers/bucketmetrics/min/MinBucketBuilder.java new file mode 100644 index 00000000000..b792b7bbac9 --- /dev/null +++ b/src/main/java/org/elasticsearch/search/aggregations/reducers/bucketmetrics/min/MinBucketBuilder.java @@ -0,0 +1,59 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. 
Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.search.aggregations.reducers.bucketmetrics.min; + +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.search.aggregations.reducers.BucketHelpers.GapPolicy; +import org.elasticsearch.search.aggregations.reducers.ReducerBuilder; +import org.elasticsearch.search.aggregations.reducers.derivative.DerivativeParser; + +import java.io.IOException; + +public class MinBucketBuilder extends ReducerBuilder { + + private String format; + private GapPolicy gapPolicy; + + public MinBucketBuilder(String name) { + super(name, MinBucketReducer.TYPE.name()); + } + + public MinBucketBuilder format(String format) { + this.format = format; + return this; + } + + public MinBucketBuilder gapPolicy(GapPolicy gapPolicy) { + this.gapPolicy = gapPolicy; + return this; + } + + @Override + protected XContentBuilder internalXContent(XContentBuilder builder, Params params) throws IOException { + if (format != null) { + builder.field(MinBucketParser.FORMAT.getPreferredName(), format); + } + if (gapPolicy != null) { + builder.field(DerivativeParser.GAP_POLICY.getPreferredName(), gapPolicy.getName()); + } + return builder; + } + +} diff --git a/src/main/java/org/elasticsearch/search/aggregations/reducers/bucketmetrics/min/MinBucketParser.java b/src/main/java/org/elasticsearch/search/aggregations/reducers/bucketmetrics/min/MinBucketParser.java new file mode 100644 index 00000000000..b956bdb6d79 --- /dev/null +++ b/src/main/java/org/elasticsearch/search/aggregations/reducers/bucketmetrics/min/MinBucketParser.java @@ -0,0 +1,97 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+
+package org.elasticsearch.search.aggregations.reducers.bucketmetrics.min;
+
+import org.elasticsearch.common.ParseField;
+import org.elasticsearch.common.xcontent.XContentParser;
+import org.elasticsearch.search.SearchParseException;
+import org.elasticsearch.search.aggregations.reducers.BucketHelpers.GapPolicy;
+import org.elasticsearch.search.aggregations.reducers.Reducer;
+import org.elasticsearch.search.aggregations.reducers.ReducerFactory;
+import org.elasticsearch.search.aggregations.support.format.ValueFormat;
+import org.elasticsearch.search.aggregations.support.format.ValueFormatter;
+import org.elasticsearch.search.internal.SearchContext;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+public class MinBucketParser implements Reducer.Parser {
+    public static final ParseField FORMAT = new ParseField("format");
+
+    @Override
+    public String type() {
+        return MinBucketReducer.TYPE.name();
+    }
+
+    @Override
+    public ReducerFactory parse(String reducerName, XContentParser parser, SearchContext context) throws IOException {
+        XContentParser.Token token;
+        String currentFieldName = null;
+        String[] bucketsPaths = null;
+        String format = null;
+        GapPolicy gapPolicy = GapPolicy.SKIP;
+
+        while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
+            if (token == XContentParser.Token.FIELD_NAME) {
+                currentFieldName = parser.currentName();
+            } else if (token == XContentParser.Token.VALUE_STRING) {
+                if (FORMAT.match(currentFieldName)) {
+                    format = parser.text();
+                } else if (BUCKETS_PATH.match(currentFieldName)) {
+                    bucketsPaths = new String[] { parser.text() };
+                } else if (GAP_POLICY.match(currentFieldName)) {
+                    gapPolicy = GapPolicy.parse(context, parser.text(), parser.getTokenLocation());
+                } else {
+                    throw new SearchParseException(context, "Unknown key for a " + token + " in [" + reducerName + "]: ["
+                            + currentFieldName + "].", parser.getTokenLocation());
+                }
+            } else if (token == XContentParser.Token.START_ARRAY) {
+                if (BUCKETS_PATH.match(currentFieldName)) {
+                    List<String> paths = new ArrayList<>();
+                    while ((token = parser.nextToken()) != XContentParser.Token.END_ARRAY) {
+                        String path = parser.text();
+                        paths.add(path);
+                    }
+                    bucketsPaths = paths.toArray(new String[paths.size()]);
+                } else {
+                    throw new SearchParseException(context, "Unknown key for a " + token + " in [" + reducerName + "]: ["
+                            + currentFieldName + "].", parser.getTokenLocation());
+                }
+            } else {
+                throw new SearchParseException(context, "Unexpected token " + token + " in [" + reducerName + "].",
+                        parser.getTokenLocation());
+            }
+        }
+
+        if (bucketsPaths == null) {
+            throw new SearchParseException(context, "Missing required field [" + BUCKETS_PATH.getPreferredName()
+                    + "] for min_bucket aggregation [" + reducerName + "]", parser.getTokenLocation());
+        }
+
+        ValueFormatter formatter = null;
+        if (format != null) {
+            formatter = ValueFormat.Patternable.Number.format(format).formatter();
+        }
+
+        return new MinBucketReducer.Factory(reducerName, bucketsPaths, gapPolicy, formatter);
+    }
+
+}
diff --git a/src/main/java/org/elasticsearch/search/aggregations/reducers/bucketmetrics/min/MinBucketReducer.java b/src/main/java/org/elasticsearch/search/aggregations/reducers/bucketmetrics/min/MinBucketReducer.java
new file mode 100644
index 00000000000..7ab257c9fb0
--- /dev/null
+++ b/src/main/java/org/elasticsearch/search/aggregations/reducers/bucketmetrics/min/MinBucketReducer.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license
agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.search.aggregations.reducers.bucketmetrics.min; + +import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.search.aggregations.Aggregation; +import org.elasticsearch.search.aggregations.Aggregations; +import org.elasticsearch.search.aggregations.AggregatorFactory; +import org.elasticsearch.search.aggregations.InternalAggregation; +import org.elasticsearch.search.aggregations.InternalAggregation.ReduceContext; +import org.elasticsearch.search.aggregations.InternalAggregation.Type; +import org.elasticsearch.search.aggregations.InternalMultiBucketAggregation; +import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation.Bucket; +import org.elasticsearch.search.aggregations.reducers.BucketHelpers; +import org.elasticsearch.search.aggregations.reducers.BucketHelpers.GapPolicy; +import org.elasticsearch.search.aggregations.reducers.Reducer; +import org.elasticsearch.search.aggregations.reducers.ReducerFactory; +import org.elasticsearch.search.aggregations.reducers.ReducerStreams; +import org.elasticsearch.search.aggregations.reducers.SiblingReducer; +import org.elasticsearch.search.aggregations.reducers.bucketmetrics.InternalBucketMetricValue; +import org.elasticsearch.search.aggregations.support.AggregationPath; +import org.elasticsearch.search.aggregations.support.format.ValueFormatter; +import org.elasticsearch.search.aggregations.support.format.ValueFormatterStreams; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +public class MinBucketReducer extends SiblingReducer { + + public final static Type TYPE = new Type("min_bucket"); + + public final static ReducerStreams.Stream STREAM = new ReducerStreams.Stream() { + @Override + public MinBucketReducer readResult(StreamInput in) throws IOException { + MinBucketReducer result = new MinBucketReducer(); + result.readFrom(in); + return result; + } + }; + + private ValueFormatter formatter; + private GapPolicy gapPolicy; + + public static void registerStreams() { + ReducerStreams.registerStream(STREAM, TYPE.stream()); + } + + private MinBucketReducer() { + } + + protected MinBucketReducer(String name, String[] bucketsPaths, GapPolicy gapPolicy, @Nullable ValueFormatter formatter, + Map metaData) { + super(name, bucketsPaths, metaData); + this.gapPolicy = gapPolicy; + this.formatter = formatter; + } + + @Override + public Type type() { + return TYPE; + } + + public InternalAggregation doReduce(Aggregations aggregations, ReduceContext context) { + List minBucketKeys = new ArrayList<>(); + double minValue = Double.POSITIVE_INFINITY; + List bucketsPath = 
AggregationPath.parse(bucketsPaths()[0]).getPathElementsAsStringList(); + for (Aggregation aggregation : aggregations) { + if (aggregation.getName().equals(bucketsPath.get(0))) { + bucketsPath = bucketsPath.subList(1, bucketsPath.size()); + InternalMultiBucketAggregation multiBucketsAgg = (InternalMultiBucketAggregation) aggregation; + List buckets = multiBucketsAgg.getBuckets(); + for (int i = 0; i < buckets.size(); i++) { + Bucket bucket = buckets.get(i); + Double bucketValue = BucketHelpers.resolveBucketValue(multiBucketsAgg, bucket, bucketsPath, gapPolicy); + if (bucketValue != null) { + if (bucketValue < minValue) { + minBucketKeys.clear(); + minBucketKeys.add(bucket.getKeyAsString()); + minValue = bucketValue; + } else if (bucketValue.equals(minValue)) { + minBucketKeys.add(bucket.getKeyAsString()); + } + } + } + } + } + String[] keys = minBucketKeys.toArray(new String[minBucketKeys.size()]); + return new InternalBucketMetricValue(name(), keys, minValue, formatter, Collections.EMPTY_LIST, metaData()); + } + + @Override + public void doReadFrom(StreamInput in) throws IOException { + formatter = ValueFormatterStreams.readOptional(in); + gapPolicy = GapPolicy.readFrom(in); + } + + @Override + public void doWriteTo(StreamOutput out) throws IOException { + ValueFormatterStreams.writeOptional(formatter, out); + gapPolicy.writeTo(out); + } + + public static class Factory extends ReducerFactory { + + private final ValueFormatter formatter; + private final GapPolicy gapPolicy; + + public Factory(String name, String[] bucketsPaths, GapPolicy gapPolicy, @Nullable ValueFormatter formatter) { + super(name, TYPE.name(), bucketsPaths); + this.gapPolicy = gapPolicy; + this.formatter = formatter; + } + + @Override + protected Reducer createInternal(Map metaData) throws IOException { + return new MinBucketReducer(name, bucketsPaths, gapPolicy, formatter, metaData); + } + + @Override + public void doValidate(AggregatorFactory parent, AggregatorFactory[] aggFactories, List reducerFactories) { + if (bucketsPaths.length != 1) { + throw new IllegalStateException(Reducer.Parser.BUCKETS_PATH.getPreferredName() + + " must contain a single entry for reducer [" + name + "]"); + } + } + + } + +} diff --git a/src/test/java/org/elasticsearch/search/aggregations/reducers/MinBucketTests.java b/src/test/java/org/elasticsearch/search/aggregations/reducers/MinBucketTests.java new file mode 100644 index 00000000000..b755159526d --- /dev/null +++ b/src/test/java/org/elasticsearch/search/aggregations/reducers/MinBucketTests.java @@ -0,0 +1,433 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.elasticsearch.search.aggregations.reducers; + +import org.elasticsearch.action.index.IndexRequestBuilder; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.search.aggregations.bucket.histogram.Histogram; +import org.elasticsearch.search.aggregations.bucket.histogram.Histogram.Bucket; +import org.elasticsearch.search.aggregations.bucket.terms.Terms; +import org.elasticsearch.search.aggregations.bucket.terms.Terms.Order; +import org.elasticsearch.search.aggregations.metrics.sum.Sum; +import org.elasticsearch.search.aggregations.reducers.BucketHelpers.GapPolicy; +import org.elasticsearch.search.aggregations.reducers.bucketmetrics.InternalBucketMetricValue; +import org.elasticsearch.test.ElasticsearchIntegrationTest; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; + +import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; +import static org.elasticsearch.search.aggregations.AggregationBuilders.histogram; +import static org.elasticsearch.search.aggregations.AggregationBuilders.sum; +import static org.elasticsearch.search.aggregations.AggregationBuilders.terms; +import static org.elasticsearch.search.aggregations.reducers.ReducerBuilders.minBucket; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertSearchResponse; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.core.IsNull.notNullValue; + +@ElasticsearchIntegrationTest.SuiteScopeTest +public class MinBucketTests extends ElasticsearchIntegrationTest { + + private static final String SINGLE_VALUED_FIELD_NAME = "l_value"; + + static int numDocs; + static int interval; + static int minRandomValue; + static int maxRandomValue; + static int numValueBuckets; + static long[] valueCounts; + + @Override + public void setupSuiteScopeCluster() throws Exception { + createIndex("idx"); + createIndex("idx_unmapped"); + + numDocs = randomIntBetween(6, 20); + interval = randomIntBetween(2, 5); + + minRandomValue = 0; + maxRandomValue = 20; + + numValueBuckets = ((maxRandomValue - minRandomValue) / interval) + 1; + valueCounts = new long[numValueBuckets]; + + List builders = new ArrayList<>(); + + for (int i = 0; i < numDocs; i++) { + int fieldValue = randomIntBetween(minRandomValue, maxRandomValue); + builders.add(client().prepareIndex("idx", "type").setSource( + jsonBuilder().startObject().field(SINGLE_VALUED_FIELD_NAME, fieldValue).field("tag", "tag" + (i % interval)) + .endObject())); + final int bucket = (fieldValue / interval); // + (fieldValue < 0 ? 
-1 : 0) - (minRandomValue / interval - 1); + valueCounts[bucket]++; + } + + assertAcked(prepareCreate("empty_bucket_idx").addMapping("type", SINGLE_VALUED_FIELD_NAME, "type=integer")); + for (int i = 0; i < 2; i++) { + builders.add(client().prepareIndex("empty_bucket_idx", "type", "" + i).setSource( + jsonBuilder().startObject().field(SINGLE_VALUED_FIELD_NAME, i * 2).endObject())); + } + indexRandom(true, builders); + ensureSearchable(); + } + + @Test + public void testDocCount_topLevel() throws Exception { + SearchResponse response = client().prepareSearch("idx") + .addAggregation(histogram("histo").field(SINGLE_VALUED_FIELD_NAME).interval(interval).minDocCount(0) + .extendedBounds((long) minRandomValue, (long) maxRandomValue)) + .addAggregation(minBucket("min_bucket").setBucketsPaths("histo>_count")).execute().actionGet(); + + assertSearchResponse(response); + + Histogram histo = response.getAggregations().get("histo"); + assertThat(histo, notNullValue()); + assertThat(histo.getName(), equalTo("histo")); + List buckets = histo.getBuckets(); + assertThat(buckets.size(), equalTo(numValueBuckets)); + + List minKeys = new ArrayList<>(); + double minValue = Double.POSITIVE_INFINITY; + for (int i = 0; i < numValueBuckets; ++i) { + Histogram.Bucket bucket = buckets.get(i); + assertThat(bucket, notNullValue()); + assertThat(((Number) bucket.getKey()).longValue(), equalTo((long) i * interval)); + assertThat(bucket.getDocCount(), equalTo(valueCounts[i])); + if (bucket.getDocCount() < minValue) { + minValue = bucket.getDocCount(); + minKeys = new ArrayList<>(); + minKeys.add(bucket.getKeyAsString()); + } else if (bucket.getDocCount() == minValue) { + minKeys.add(bucket.getKeyAsString()); + } + } + + InternalBucketMetricValue minBucketValue = response.getAggregations().get("min_bucket"); + assertThat(minBucketValue, notNullValue()); + assertThat(minBucketValue.getName(), equalTo("min_bucket")); + assertThat(minBucketValue.value(), equalTo(minValue)); + assertThat(minBucketValue.keys(), equalTo(minKeys.toArray(new String[minKeys.size()]))); + } + + @Test + public void testDocCount_asSubAgg() throws Exception { + SearchResponse response = client() + .prepareSearch("idx") + .addAggregation( + terms("terms") + .field("tag") + .order(Order.term(true)) + .subAggregation( + histogram("histo").field(SINGLE_VALUED_FIELD_NAME).interval(interval).minDocCount(0) + .extendedBounds((long) minRandomValue, (long) maxRandomValue)) + .subAggregation(minBucket("min_bucket").setBucketsPaths("histo>_count"))).execute().actionGet(); + + assertSearchResponse(response); + + Terms terms = response.getAggregations().get("terms"); + assertThat(terms, notNullValue()); + assertThat(terms.getName(), equalTo("terms")); + List termsBuckets = terms.getBuckets(); + assertThat(termsBuckets.size(), equalTo(interval)); + + for (int i = 0; i < interval; ++i) { + Terms.Bucket termsBucket = termsBuckets.get(i); + assertThat(termsBucket, notNullValue()); + assertThat((String) termsBucket.getKey(), equalTo("tag" + (i % interval))); + + Histogram histo = termsBucket.getAggregations().get("histo"); + assertThat(histo, notNullValue()); + assertThat(histo.getName(), equalTo("histo")); + List buckets = histo.getBuckets(); + + List minKeys = new ArrayList<>(); + double minValue = Double.POSITIVE_INFINITY; + for (int j = 0; j < numValueBuckets; ++j) { + Histogram.Bucket bucket = buckets.get(j); + assertThat(bucket, notNullValue()); + assertThat(((Number) bucket.getKey()).longValue(), equalTo((long) j * interval)); + if (bucket.getDocCount() < 
minValue) { + minValue = bucket.getDocCount(); + minKeys = new ArrayList<>(); + minKeys.add(bucket.getKeyAsString()); + } else if (bucket.getDocCount() == minValue) { + minKeys.add(bucket.getKeyAsString()); + } + } + + InternalBucketMetricValue minBucketValue = termsBucket.getAggregations().get("min_bucket"); + assertThat(minBucketValue, notNullValue()); + assertThat(minBucketValue.getName(), equalTo("min_bucket")); + assertThat(minBucketValue.value(), equalTo(minValue)); + assertThat(minBucketValue.keys(), equalTo(minKeys.toArray(new String[minKeys.size()]))); + } + } + + @Test + public void testMetric_topLevel() throws Exception { + SearchResponse response = client() + .prepareSearch("idx") + .addAggregation(terms("terms").field("tag").subAggregation(sum("sum").field(SINGLE_VALUED_FIELD_NAME))) + .addAggregation(minBucket("min_bucket").setBucketsPaths("terms>sum")).execute().actionGet(); + + assertSearchResponse(response); + + Terms terms = response.getAggregations().get("terms"); + assertThat(terms, notNullValue()); + assertThat(terms.getName(), equalTo("terms")); + List buckets = terms.getBuckets(); + assertThat(buckets.size(), equalTo(interval)); + + List minKeys = new ArrayList<>(); + double minValue = Double.POSITIVE_INFINITY; + for (int i = 0; i < interval; ++i) { + Terms.Bucket bucket = buckets.get(i); + assertThat(bucket, notNullValue()); + assertThat((String) bucket.getKey(), equalTo("tag" + (i % interval))); + assertThat(bucket.getDocCount(), greaterThan(0l)); + Sum sum = bucket.getAggregations().get("sum"); + assertThat(sum, notNullValue()); + if (sum.value() < minValue) { + minValue = sum.value(); + minKeys = new ArrayList<>(); + minKeys.add(bucket.getKeyAsString()); + } else if (sum.value() == minValue) { + minKeys.add(bucket.getKeyAsString()); + } + } + + InternalBucketMetricValue minBucketValue = response.getAggregations().get("min_bucket"); + assertThat(minBucketValue, notNullValue()); + assertThat(minBucketValue.getName(), equalTo("min_bucket")); + assertThat(minBucketValue.value(), equalTo(minValue)); + assertThat(minBucketValue.keys(), equalTo(minKeys.toArray(new String[minKeys.size()]))); + } + + @Test + public void testMetric_asSubAgg() throws Exception { + SearchResponse response = client() + .prepareSearch("idx") + .addAggregation( + terms("terms") + .field("tag") + .order(Order.term(true)) + .subAggregation( + histogram("histo").field(SINGLE_VALUED_FIELD_NAME).interval(interval).minDocCount(0) + .extendedBounds((long) minRandomValue, (long) maxRandomValue) + .subAggregation(sum("sum").field(SINGLE_VALUED_FIELD_NAME))) + .subAggregation(minBucket("min_bucket").setBucketsPaths("histo>sum"))).execute().actionGet(); + + assertSearchResponse(response); + + Terms terms = response.getAggregations().get("terms"); + assertThat(terms, notNullValue()); + assertThat(terms.getName(), equalTo("terms")); + List termsBuckets = terms.getBuckets(); + assertThat(termsBuckets.size(), equalTo(interval)); + + for (int i = 0; i < interval; ++i) { + Terms.Bucket termsBucket = termsBuckets.get(i); + assertThat(termsBucket, notNullValue()); + assertThat((String) termsBucket.getKey(), equalTo("tag" + (i % interval))); + + Histogram histo = termsBucket.getAggregations().get("histo"); + assertThat(histo, notNullValue()); + assertThat(histo.getName(), equalTo("histo")); + List buckets = histo.getBuckets(); + + List minKeys = new ArrayList<>(); + double minValue = Double.POSITIVE_INFINITY; + for (int j = 0; j < numValueBuckets; ++j) { + Histogram.Bucket bucket = buckets.get(j); + 
assertThat(bucket, notNullValue()); + assertThat(((Number) bucket.getKey()).longValue(), equalTo((long) j * interval)); + if (bucket.getDocCount() != 0) { + Sum sum = bucket.getAggregations().get("sum"); + assertThat(sum, notNullValue()); + if (sum.value() < minValue) { + minValue = sum.value(); + minKeys = new ArrayList<>(); + minKeys.add(bucket.getKeyAsString()); + } else if (sum.value() == minValue) { + minKeys.add(bucket.getKeyAsString()); + } + } + } + + InternalBucketMetricValue minBucketValue = termsBucket.getAggregations().get("min_bucket"); + assertThat(minBucketValue, notNullValue()); + assertThat(minBucketValue.getName(), equalTo("min_bucket")); + assertThat(minBucketValue.value(), equalTo(minValue)); + assertThat(minBucketValue.keys(), equalTo(minKeys.toArray(new String[minKeys.size()]))); + } + } + + @Test + public void testMetric_asSubAggWithInsertZeros() throws Exception { + SearchResponse response = client() + .prepareSearch("idx") + .addAggregation( + terms("terms") + .field("tag") + .order(Order.term(true)) + .subAggregation( + histogram("histo").field(SINGLE_VALUED_FIELD_NAME).interval(interval).minDocCount(0) + .extendedBounds((long) minRandomValue, (long) maxRandomValue) + .subAggregation(sum("sum").field(SINGLE_VALUED_FIELD_NAME))) + .subAggregation(minBucket("min_bucket").setBucketsPaths("histo>sum").gapPolicy(GapPolicy.INSERT_ZEROS))) + .execute().actionGet(); + + assertSearchResponse(response); + + Terms terms = response.getAggregations().get("terms"); + assertThat(terms, notNullValue()); + assertThat(terms.getName(), equalTo("terms")); + List termsBuckets = terms.getBuckets(); + assertThat(termsBuckets.size(), equalTo(interval)); + + for (int i = 0; i < interval; ++i) { + Terms.Bucket termsBucket = termsBuckets.get(i); + assertThat(termsBucket, notNullValue()); + assertThat((String) termsBucket.getKey(), equalTo("tag" + (i % interval))); + + Histogram histo = termsBucket.getAggregations().get("histo"); + assertThat(histo, notNullValue()); + assertThat(histo.getName(), equalTo("histo")); + List buckets = histo.getBuckets(); + + List minKeys = new ArrayList<>(); + double minValue = Double.POSITIVE_INFINITY; + for (int j = 0; j < numValueBuckets; ++j) { + Histogram.Bucket bucket = buckets.get(j); + assertThat(bucket, notNullValue()); + assertThat(((Number) bucket.getKey()).longValue(), equalTo((long) j * interval)); + Sum sum = bucket.getAggregations().get("sum"); + assertThat(sum, notNullValue()); + if (sum.value() < minValue) { + minValue = sum.value(); + minKeys = new ArrayList<>(); + minKeys.add(bucket.getKeyAsString()); + } else if (sum.value() == minValue) { + minKeys.add(bucket.getKeyAsString()); + } + } + + InternalBucketMetricValue minBucketValue = termsBucket.getAggregations().get("min_bucket"); + assertThat(minBucketValue, notNullValue()); + assertThat(minBucketValue.getName(), equalTo("min_bucket")); + assertThat(minBucketValue.value(), equalTo(minValue)); + assertThat(minBucketValue.keys(), equalTo(minKeys.toArray(new String[minKeys.size()]))); + } + } + + @Test + public void testNoBuckets() throws Exception { + SearchResponse response = client().prepareSearch("idx") + .addAggregation(terms("terms").field("tag").exclude("tag.*").subAggregation(sum("sum").field(SINGLE_VALUED_FIELD_NAME))) + .addAggregation(minBucket("min_bucket").setBucketsPaths("terms>sum")).execute().actionGet(); + + assertSearchResponse(response); + + Terms terms = response.getAggregations().get("terms"); + assertThat(terms, notNullValue()); + assertThat(terms.getName(), 
equalTo("terms")); + List buckets = terms.getBuckets(); + assertThat(buckets.size(), equalTo(0)); + + InternalBucketMetricValue minBucketValue = response.getAggregations().get("min_bucket"); + assertThat(minBucketValue, notNullValue()); + assertThat(minBucketValue.getName(), equalTo("min_bucket")); + assertThat(minBucketValue.value(), equalTo(Double.POSITIVE_INFINITY)); + assertThat(minBucketValue.keys(), equalTo(new String[0])); + } + + @Test + public void testNested() throws Exception { + SearchResponse response = client() + .prepareSearch("idx") + .addAggregation( + terms("terms") + .field("tag") + .order(Order.term(true)) + .subAggregation( + histogram("histo").field(SINGLE_VALUED_FIELD_NAME).interval(interval).minDocCount(0) + .extendedBounds((long) minRandomValue, (long) maxRandomValue)) + .subAggregation(minBucket("min_histo_bucket").setBucketsPaths("histo>_count"))) + .addAggregation(minBucket("min_terms_bucket").setBucketsPaths("terms>min_histo_bucket")).execute().actionGet(); + + assertSearchResponse(response); + + Terms terms = response.getAggregations().get("terms"); + assertThat(terms, notNullValue()); + assertThat(terms.getName(), equalTo("terms")); + List termsBuckets = terms.getBuckets(); + assertThat(termsBuckets.size(), equalTo(interval)); + + List minTermsKeys = new ArrayList<>(); + double minTermsValue = Double.POSITIVE_INFINITY; + for (int i = 0; i < interval; ++i) { + Terms.Bucket termsBucket = termsBuckets.get(i); + assertThat(termsBucket, notNullValue()); + assertThat((String) termsBucket.getKey(), equalTo("tag" + (i % interval))); + + Histogram histo = termsBucket.getAggregations().get("histo"); + assertThat(histo, notNullValue()); + assertThat(histo.getName(), equalTo("histo")); + List buckets = histo.getBuckets(); + + List minHistoKeys = new ArrayList<>(); + double minHistoValue = Double.POSITIVE_INFINITY; + for (int j = 0; j < numValueBuckets; ++j) { + Histogram.Bucket bucket = buckets.get(j); + assertThat(bucket, notNullValue()); + assertThat(((Number) bucket.getKey()).longValue(), equalTo((long) j * interval)); + if (bucket.getDocCount() < minHistoValue) { + minHistoValue = bucket.getDocCount(); + minHistoKeys = new ArrayList<>(); + minHistoKeys.add(bucket.getKeyAsString()); + } else if (bucket.getDocCount() == minHistoValue) { + minHistoKeys.add(bucket.getKeyAsString()); + } + } + + InternalBucketMetricValue minBucketValue = termsBucket.getAggregations().get("min_histo_bucket"); + assertThat(minBucketValue, notNullValue()); + assertThat(minBucketValue.getName(), equalTo("min_histo_bucket")); + assertThat(minBucketValue.value(), equalTo(minHistoValue)); + assertThat(minBucketValue.keys(), equalTo(minHistoKeys.toArray(new String[minHistoKeys.size()]))); + if (minHistoValue < minTermsValue) { + minTermsValue = minHistoValue; + minTermsKeys = new ArrayList<>(); + minTermsKeys.add(termsBucket.getKeyAsString()); + } else if (minHistoValue == minTermsValue) { + minTermsKeys.add(termsBucket.getKeyAsString()); + } + } + + InternalBucketMetricValue minBucketValue = response.getAggregations().get("min_terms_bucket"); + assertThat(minBucketValue, notNullValue()); + assertThat(minBucketValue.getName(), equalTo("min_terms_bucket")); + assertThat(minBucketValue.value(), equalTo(minTermsValue)); + assertThat(minBucketValue.keys(), equalTo(minTermsKeys.toArray(new String[minTermsKeys.size()]))); + } +}