Aggregations: min_bucket aggregation

An aggregation to calculate the minimum value in a set of buckets.

Closes #9999
Colin Goodheart-Smithe 2015-04-30 11:24:36 +01:00
parent 77ac4528fb
commit d16bf992a9
12 changed files with 839 additions and 6 deletions

View File: docs/reference/search/aggregations/reducer.asciidoc

@@ -2,4 +2,5 @@
 include::reducer/derivative-aggregation.asciidoc[]
 include::reducer/max-bucket-aggregation.asciidoc[]
+include::reducer/min-bucket-aggregation.asciidoc[]
 include::reducer/movavg-aggregation.asciidoc[]

View File: docs/reference/search/aggregations/reducer/min-bucket-aggregation.asciidoc (new file)

@@ -0,0 +1,82 @@
[[search-aggregations-reducer-min-bucket-aggregation]]
=== Min Bucket Aggregation
A sibling reducer aggregation which identifies the bucket(s) with the minimum value of a specified metric in a sibling aggregation
and outputs both the value and the key(s) of the bucket(s). The specified metric must be numeric and the sibling aggregation must
be a multi-bucket aggregation.
The following snippet calculates the minimum of the total monthly `sales`:
[source,js]
--------------------------------------------------
{
    "aggs" : {
        "sales_per_month" : {
            "date_histogram" : {
                "field" : "date",
                "interval" : "month"
            },
            "aggs": {
                "sales": {
                    "sum": {
                        "field": "price"
                    }
                }
            }
        },
        "min_monthly_sales": {
            "min_bucket": {
                "buckets_paths": "sales_per_month>sales" <1>
            }
        }
    }
}
--------------------------------------------------
<1> `buckets_paths` instructs this min_bucket aggregation that we want the minimum value of the `sales` aggregation in the
`sales_per_month` date histogram.
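
In addition to `buckets_paths`, the parser added in this commit accepts optional `format` and `gap_policy` parameters
(see `MinBucketParser` below; `format` takes a numeric pattern and `gap_policy` defaults to `skip`). A hedged sketch of a
fully-parameterised body — the parameter spellings follow the parser's `ParseField` definitions, and `insert_zeros` is
inferred from `GapPolicy.INSERT_ZEROS`:

[source,js]
--------------------------------------------------
{
    "min_bucket": {
        "buckets_paths": "sales_per_month>sales",
        "format": "0.00",
        "gap_policy": "insert_zeros"
    }
}
--------------------------------------------------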
The response for the original request above may look like this:
[source,js]
--------------------------------------------------
{
    "aggregations": {
        "sales_per_month": {
            "buckets": [
                {
                    "key_as_string": "2015/01/01 00:00:00",
                    "key": 1420070400000,
                    "doc_count": 3,
                    "sales": {
                        "value": 550
                    }
                },
                {
                    "key_as_string": "2015/02/01 00:00:00",
                    "key": 1422748800000,
                    "doc_count": 2,
                    "sales": {
                        "value": 60
                    }
                },
                {
                    "key_as_string": "2015/03/01 00:00:00",
                    "key": 1425168000000,
                    "doc_count": 2,
                    "sales": {
                        "value": 375
                    }
                }
            ]
        },
        "min_monthly_sales": {
            "keys": ["2015/02/01 00:00:00"], <1>
            "value": 60
        }
    }
}
--------------------------------------------------
<1> `keys` is an array of strings since the minimum value may be present in multiple buckets
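
The same pattern can be built through the Java API. Below is a minimal sketch following the style of the `MinBucketTests`
added in this commit; the index, aggregation, and field names are illustrative, and the static imports (`terms` and `sum`
from `AggregationBuilders`, `minBucket` from `ReducerBuilders`) mirror the test file:

[source,java]
--------------------------------------------------
// Sketch only: a terms bucketing stands in for the date_histogram above.
SearchResponse response = client().prepareSearch("sales")
        .addAggregation(terms("sales_per_tag").field("tag")
                .subAggregation(sum("sales").field("price")))
        .addAggregation(minBucket("min_sales").setBucketsPaths("sales_per_tag>sales"))
        .execute().actionGet();

InternalBucketMetricValue min = response.getAggregations().get("min_sales");
String[] keys = min.keys();  // key(s) of the bucket(s) holding the minimum
double value = min.value();  // the minimum value itself
--------------------------------------------------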

View File: src/main/java/org/elasticsearch/search/aggregations/AggregationModule.java

@@ -57,7 +57,8 @@ import org.elasticsearch.search.aggregations.metrics.sum.SumParser;
 import org.elasticsearch.search.aggregations.metrics.tophits.TopHitsParser;
 import org.elasticsearch.search.aggregations.metrics.valuecount.ValueCountParser;
 import org.elasticsearch.search.aggregations.reducers.Reducer;
-import org.elasticsearch.search.aggregations.reducers.bucketmetrics.MaxBucketParser;
+import org.elasticsearch.search.aggregations.reducers.bucketmetrics.max.MaxBucketParser;
+import org.elasticsearch.search.aggregations.reducers.bucketmetrics.min.MinBucketParser;
 import org.elasticsearch.search.aggregations.reducers.derivative.DerivativeParser;
 import org.elasticsearch.search.aggregations.reducers.movavg.MovAvgParser;
 import org.elasticsearch.search.aggregations.reducers.movavg.models.MovAvgModelModule;
@@ -107,6 +108,7 @@ public class AggregationModule extends AbstractModule implements SpawnModules{
         reducerParsers.add(DerivativeParser.class);
         reducerParsers.add(MaxBucketParser.class);
+        reducerParsers.add(MinBucketParser.class);
         reducerParsers.add(MovAvgParser.class);
     }

View File: src/main/java/org/elasticsearch/search/aggregations/TransportAggregationModule.java

@@ -61,7 +61,8 @@ import org.elasticsearch.search.aggregations.metrics.tophits.InternalTopHits;
 import org.elasticsearch.search.aggregations.metrics.valuecount.InternalValueCount;
 import org.elasticsearch.search.aggregations.reducers.InternalSimpleValue;
 import org.elasticsearch.search.aggregations.reducers.bucketmetrics.InternalBucketMetricValue;
-import org.elasticsearch.search.aggregations.reducers.bucketmetrics.MaxBucketReducer;
+import org.elasticsearch.search.aggregations.reducers.bucketmetrics.max.MaxBucketReducer;
+import org.elasticsearch.search.aggregations.reducers.bucketmetrics.min.MinBucketReducer;
 import org.elasticsearch.search.aggregations.reducers.derivative.DerivativeReducer;
 import org.elasticsearch.search.aggregations.reducers.movavg.MovAvgReducer;
 import org.elasticsearch.search.aggregations.reducers.movavg.models.TransportMovAvgModelModule;
@@ -118,6 +119,7 @@ public class TransportAggregationModule extends AbstractModule implements SpawnM
         InternalSimpleValue.registerStreams();
         InternalBucketMetricValue.registerStreams();
         MaxBucketReducer.registerStreams();
+        MinBucketReducer.registerStreams();
         MovAvgReducer.registerStreams();
     }

View File: src/main/java/org/elasticsearch/search/aggregations/reducers/ReducerBuilders.java

@@ -19,7 +19,8 @@
 package org.elasticsearch.search.aggregations.reducers;
 
-import org.elasticsearch.search.aggregations.reducers.bucketmetrics.MaxBucketBuilder;
+import org.elasticsearch.search.aggregations.reducers.bucketmetrics.max.MaxBucketBuilder;
+import org.elasticsearch.search.aggregations.reducers.bucketmetrics.min.MinBucketBuilder;
 import org.elasticsearch.search.aggregations.reducers.derivative.DerivativeBuilder;
 import org.elasticsearch.search.aggregations.reducers.movavg.MovAvgBuilder;
@@ -36,6 +37,10 @@ public final class ReducerBuilders {
         return new MaxBucketBuilder(name);
     }
 
+    public static final MinBucketBuilder minBucket(String name) {
+        return new MinBucketBuilder(name);
+    }
+
     public static final MovAvgBuilder movingAvg(String name) {
         return new MovAvgBuilder(name);
     }

View File: src/main/java/org/elasticsearch/search/aggregations/reducers/bucketmetrics/max/MaxBucketBuilder.java (moved from bucketmetrics/)

@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.elasticsearch.search.aggregations.reducers.bucketmetrics;
+package org.elasticsearch.search.aggregations.reducers.bucketmetrics.max;
 
 import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.search.aggregations.reducers.BucketHelpers.GapPolicy;

View File: src/main/java/org/elasticsearch/search/aggregations/reducers/bucketmetrics/max/MaxBucketParser.java (moved from bucketmetrics/)

@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.elasticsearch.search.aggregations.reducers.bucketmetrics;
+package org.elasticsearch.search.aggregations.reducers.bucketmetrics.max;
 
 import org.elasticsearch.common.ParseField;
 import org.elasticsearch.common.xcontent.XContentParser;

View File: src/main/java/org/elasticsearch/search/aggregations/reducers/bucketmetrics/max/MaxBucketReducer.java (moved from bucketmetrics/)

@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.elasticsearch.search.aggregations.reducers.bucketmetrics;
+package org.elasticsearch.search.aggregations.reducers.bucketmetrics.max;
 
 import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.io.stream.StreamInput;
@@ -32,6 +32,7 @@ import org.elasticsearch.search.aggregations.InternalMultiBucketAggregation;
 import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation.Bucket;
 import org.elasticsearch.search.aggregations.reducers.BucketHelpers;
 import org.elasticsearch.search.aggregations.reducers.BucketHelpers.GapPolicy;
+import org.elasticsearch.search.aggregations.reducers.bucketmetrics.InternalBucketMetricValue;
 import org.elasticsearch.search.aggregations.reducers.Reducer;
 import org.elasticsearch.search.aggregations.reducers.ReducerFactory;
 import org.elasticsearch.search.aggregations.reducers.ReducerStreams;

View File: src/main/java/org/elasticsearch/search/aggregations/reducers/bucketmetrics/min/MinBucketBuilder.java (new file)

@@ -0,0 +1,59 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.aggregations.reducers.bucketmetrics.min;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.search.aggregations.reducers.BucketHelpers.GapPolicy;
import org.elasticsearch.search.aggregations.reducers.ReducerBuilder;
import org.elasticsearch.search.aggregations.reducers.derivative.DerivativeParser;
import java.io.IOException;
public class MinBucketBuilder extends ReducerBuilder<MinBucketBuilder> {
private String format;
private GapPolicy gapPolicy;
public MinBucketBuilder(String name) {
super(name, MinBucketReducer.TYPE.name());
}
public MinBucketBuilder format(String format) {
this.format = format;
return this;
}
public MinBucketBuilder gapPolicy(GapPolicy gapPolicy) {
this.gapPolicy = gapPolicy;
return this;
}
@Override
protected XContentBuilder internalXContent(XContentBuilder builder, Params params) throws IOException {
if (format != null) {
builder.field(MinBucketParser.FORMAT.getPreferredName(), format);
}
if (gapPolicy != null) {
builder.field(DerivativeParser.GAP_POLICY.getPreferredName(), gapPolicy.getName());
}
return builder;
}
}
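
A hedged usage sketch for this builder via the `ReducerBuilders.minBucket(...)` factory registered above. `setBucketsPaths(...)` is assumed to come from the `ReducerBuilder` superclass, since the integration tests below call it on the factory's result; the names are illustrative:

[source,java]
--------------------------------------------------
// Sketch: names are illustrative, not part of this commit.
MinBucketBuilder minSales = ReducerBuilders.minBucket("min_monthly_sales");
minSales.setBucketsPaths("sales_per_month>sales"); // path to the metric to scan (assumed inherited setter)
minSales.format("0.00");                           // optional: serialized as the "format" field
minSales.gapPolicy(GapPolicy.INSERT_ZEROS);        // optional: serialized as the "gap_policy" field
--------------------------------------------------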

View File: src/main/java/org/elasticsearch/search/aggregations/reducers/bucketmetrics/min/MinBucketParser.java (new file)

@@ -0,0 +1,97 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.aggregations.reducers.bucketmetrics.min;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.search.SearchParseException;
import org.elasticsearch.search.aggregations.reducers.BucketHelpers.GapPolicy;
import org.elasticsearch.search.aggregations.reducers.Reducer;
import org.elasticsearch.search.aggregations.reducers.ReducerFactory;
import org.elasticsearch.search.aggregations.support.format.ValueFormat;
import org.elasticsearch.search.aggregations.support.format.ValueFormatter;
import org.elasticsearch.search.internal.SearchContext;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
public class MinBucketParser implements Reducer.Parser {
public static final ParseField FORMAT = new ParseField("format");
@Override
public String type() {
return MinBucketReducer.TYPE.name();
}
@Override
public ReducerFactory parse(String reducerName, XContentParser parser, SearchContext context) throws IOException {
XContentParser.Token token;
String currentFieldName = null;
String[] bucketsPaths = null;
String format = null;
GapPolicy gapPolicy = GapPolicy.SKIP;
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
currentFieldName = parser.currentName();
} else if (token == XContentParser.Token.VALUE_STRING) {
if (FORMAT.match(currentFieldName)) {
format = parser.text();
} else if (BUCKETS_PATH.match(currentFieldName)) {
bucketsPaths = new String[] { parser.text() };
} else if (GAP_POLICY.match(currentFieldName)) {
gapPolicy = GapPolicy.parse(context, parser.text(), parser.getTokenLocation());
} else {
throw new SearchParseException(context, "Unknown key for a " + token + " in [" + reducerName + "]: ["
+ currentFieldName + "].", parser.getTokenLocation());
}
} else if (token == XContentParser.Token.START_ARRAY) {
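// the buckets_path value may also be supplied as an array of path strings; collect each element below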
if (BUCKETS_PATH.match(currentFieldName)) {
List<String> paths = new ArrayList<>();
while ((token = parser.nextToken()) != XContentParser.Token.END_ARRAY) {
String path = parser.text();
paths.add(path);
}
bucketsPaths = paths.toArray(new String[paths.size()]);
} else {
throw new SearchParseException(context, "Unknown key for a " + token + " in [" + reducerName + "]: ["
+ currentFieldName + "].", parser.getTokenLocation());
}
} else {
throw new SearchParseException(context, "Unexpected token " + token + " in [" + reducerName + "].",
parser.getTokenLocation());
}
}
if (bucketsPaths == null) {
throw new SearchParseException(context, "Missing required field [" + BUCKETS_PATH.getPreferredName()
+ "] for min_bucket aggregation [" + reducerName + "]", parser.getTokenLocation());
}
ValueFormatter formatter = null;
if (format != null) {
formatter = ValueFormat.Patternable.Number.format(format).formatter();
}
return new MinBucketReducer.Factory(reducerName, bucketsPaths, gapPolicy, formatter);
}
}
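
The `START_ARRAY` branch above means the path may also be supplied as an array in the request body; a hedged sketch of that form, using the `buckets_paths` spelling from the documentation in this commit:

[source,js]
--------------------------------------------------
{
    "min_bucket": {
        "buckets_paths": [ "sales_per_month>sales" ]
    }
}
--------------------------------------------------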

View File: src/main/java/org/elasticsearch/search/aggregations/reducers/bucketmetrics/min/MinBucketReducer.java (new file)

@@ -0,0 +1,151 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.aggregations.reducers.bucketmetrics.min;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.search.aggregations.Aggregation;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.AggregatorFactory;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.InternalAggregation.ReduceContext;
import org.elasticsearch.search.aggregations.InternalAggregation.Type;
import org.elasticsearch.search.aggregations.InternalMultiBucketAggregation;
import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation.Bucket;
import org.elasticsearch.search.aggregations.reducers.BucketHelpers;
import org.elasticsearch.search.aggregations.reducers.BucketHelpers.GapPolicy;
import org.elasticsearch.search.aggregations.reducers.Reducer;
import org.elasticsearch.search.aggregations.reducers.ReducerFactory;
import org.elasticsearch.search.aggregations.reducers.ReducerStreams;
import org.elasticsearch.search.aggregations.reducers.SiblingReducer;
import org.elasticsearch.search.aggregations.reducers.bucketmetrics.InternalBucketMetricValue;
import org.elasticsearch.search.aggregations.support.AggregationPath;
import org.elasticsearch.search.aggregations.support.format.ValueFormatter;
import org.elasticsearch.search.aggregations.support.format.ValueFormatterStreams;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
public class MinBucketReducer extends SiblingReducer {
public final static Type TYPE = new Type("min_bucket");
public final static ReducerStreams.Stream STREAM = new ReducerStreams.Stream() {
@Override
public MinBucketReducer readResult(StreamInput in) throws IOException {
MinBucketReducer result = new MinBucketReducer();
result.readFrom(in);
return result;
}
};
private ValueFormatter formatter;
private GapPolicy gapPolicy;
public static void registerStreams() {
ReducerStreams.registerStream(STREAM, TYPE.stream());
}
private MinBucketReducer() {
}
protected MinBucketReducer(String name, String[] bucketsPaths, GapPolicy gapPolicy, @Nullable ValueFormatter formatter,
Map<String, Object> metaData) {
super(name, bucketsPaths, metaData);
this.gapPolicy = gapPolicy;
this.formatter = formatter;
}
@Override
public Type type() {
return TYPE;
}
public InternalAggregation doReduce(Aggregations aggregations, ReduceContext context) {
List<String> minBucketKeys = new ArrayList<>();
double minValue = Double.POSITIVE_INFINITY;
List<String> bucketsPath = AggregationPath.parse(bucketsPaths()[0]).getPathElementsAsStringList();
for (Aggregation aggregation : aggregations) {
if (aggregation.getName().equals(bucketsPath.get(0))) {
bucketsPath = bucketsPath.subList(1, bucketsPath.size());
InternalMultiBucketAggregation multiBucketsAgg = (InternalMultiBucketAggregation) aggregation;
List<? extends Bucket> buckets = multiBucketsAgg.getBuckets();
for (int i = 0; i < buckets.size(); i++) {
Bucket bucket = buckets.get(i);
Double bucketValue = BucketHelpers.resolveBucketValue(multiBucketsAgg, bucket, bucketsPath, gapPolicy);
if (bucketValue != null) {
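// a strictly smaller value restarts the key list; a value tying the current minimum appends its key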
if (bucketValue < minValue) {
minBucketKeys.clear();
minBucketKeys.add(bucket.getKeyAsString());
minValue = bucketValue;
} else if (bucketValue.equals(minValue)) {
minBucketKeys.add(bucket.getKeyAsString());
}
}
}
}
}
String[] keys = minBucketKeys.toArray(new String[minBucketKeys.size()]);
return new InternalBucketMetricValue(name(), keys, minValue, formatter, Collections.EMPTY_LIST, metaData());
}
@Override
public void doReadFrom(StreamInput in) throws IOException {
formatter = ValueFormatterStreams.readOptional(in);
gapPolicy = GapPolicy.readFrom(in);
}
@Override
public void doWriteTo(StreamOutput out) throws IOException {
ValueFormatterStreams.writeOptional(formatter, out);
gapPolicy.writeTo(out);
}
public static class Factory extends ReducerFactory {
private final ValueFormatter formatter;
private final GapPolicy gapPolicy;
public Factory(String name, String[] bucketsPaths, GapPolicy gapPolicy, @Nullable ValueFormatter formatter) {
super(name, TYPE.name(), bucketsPaths);
this.gapPolicy = gapPolicy;
this.formatter = formatter;
}
@Override
protected Reducer createInternal(Map<String, Object> metaData) throws IOException {
return new MinBucketReducer(name, bucketsPaths, gapPolicy, formatter, metaData);
}
@Override
public void doValidate(AggregatorFactory parent, AggregatorFactory[] aggFactories, List<ReducerFactory> reducerFactories) {
if (bucketsPaths.length != 1) {
throw new IllegalStateException(Reducer.Parser.BUCKETS_PATH.getPreferredName()
+ " must contain a single entry for reducer [" + name + "]");
}
}
}
}

View File: src/test/java/org/elasticsearch/search/aggregations/reducers/MinBucketTests.java (new file)

@@ -0,0 +1,433 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.aggregations.reducers;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.search.aggregations.bucket.histogram.Histogram;
import org.elasticsearch.search.aggregations.bucket.histogram.Histogram.Bucket;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.aggregations.bucket.terms.Terms.Order;
import org.elasticsearch.search.aggregations.metrics.sum.Sum;
import org.elasticsearch.search.aggregations.reducers.BucketHelpers.GapPolicy;
import org.elasticsearch.search.aggregations.reducers.bucketmetrics.InternalBucketMetricValue;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.junit.Test;
import java.util.ArrayList;
import java.util.List;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.search.aggregations.AggregationBuilders.histogram;
import static org.elasticsearch.search.aggregations.AggregationBuilders.sum;
import static org.elasticsearch.search.aggregations.AggregationBuilders.terms;
import static org.elasticsearch.search.aggregations.reducers.ReducerBuilders.minBucket;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertSearchResponse;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.core.IsNull.notNullValue;
@ElasticsearchIntegrationTest.SuiteScopeTest
public class MinBucketTests extends ElasticsearchIntegrationTest {
private static final String SINGLE_VALUED_FIELD_NAME = "l_value";
static int numDocs;
static int interval;
static int minRandomValue;
static int maxRandomValue;
static int numValueBuckets;
static long[] valueCounts;
@Override
public void setupSuiteScopeCluster() throws Exception {
createIndex("idx");
createIndex("idx_unmapped");
numDocs = randomIntBetween(6, 20);
interval = randomIntBetween(2, 5);
minRandomValue = 0;
maxRandomValue = 20;
numValueBuckets = ((maxRandomValue - minRandomValue) / interval) + 1;
valueCounts = new long[numValueBuckets];
List<IndexRequestBuilder> builders = new ArrayList<>();
for (int i = 0; i < numDocs; i++) {
int fieldValue = randomIntBetween(minRandomValue, maxRandomValue);
builders.add(client().prepareIndex("idx", "type").setSource(
jsonBuilder().startObject().field(SINGLE_VALUED_FIELD_NAME, fieldValue).field("tag", "tag" + (i % interval))
.endObject()));
final int bucket = (fieldValue / interval); // + (fieldValue < 0 ? -1 : 0) - (minRandomValue / interval - 1);
valueCounts[bucket]++;
}
assertAcked(prepareCreate("empty_bucket_idx").addMapping("type", SINGLE_VALUED_FIELD_NAME, "type=integer"));
for (int i = 0; i < 2; i++) {
builders.add(client().prepareIndex("empty_bucket_idx", "type", "" + i).setSource(
jsonBuilder().startObject().field(SINGLE_VALUED_FIELD_NAME, i * 2).endObject()));
}
indexRandom(true, builders);
ensureSearchable();
}
@Test
public void testDocCount_topLevel() throws Exception {
SearchResponse response = client().prepareSearch("idx")
.addAggregation(histogram("histo").field(SINGLE_VALUED_FIELD_NAME).interval(interval).minDocCount(0)
.extendedBounds((long) minRandomValue, (long) maxRandomValue))
.addAggregation(minBucket("min_bucket").setBucketsPaths("histo>_count")).execute().actionGet();
assertSearchResponse(response);
Histogram histo = response.getAggregations().get("histo");
assertThat(histo, notNullValue());
assertThat(histo.getName(), equalTo("histo"));
List<? extends Bucket> buckets = histo.getBuckets();
assertThat(buckets.size(), equalTo(numValueBuckets));
List<String> minKeys = new ArrayList<>();
double minValue = Double.POSITIVE_INFINITY;
for (int i = 0; i < numValueBuckets; ++i) {
Histogram.Bucket bucket = buckets.get(i);
assertThat(bucket, notNullValue());
assertThat(((Number) bucket.getKey()).longValue(), equalTo((long) i * interval));
assertThat(bucket.getDocCount(), equalTo(valueCounts[i]));
if (bucket.getDocCount() < minValue) {
minValue = bucket.getDocCount();
minKeys = new ArrayList<>();
minKeys.add(bucket.getKeyAsString());
} else if (bucket.getDocCount() == minValue) {
minKeys.add(bucket.getKeyAsString());
}
}
InternalBucketMetricValue minBucketValue = response.getAggregations().get("min_bucket");
assertThat(minBucketValue, notNullValue());
assertThat(minBucketValue.getName(), equalTo("min_bucket"));
assertThat(minBucketValue.value(), equalTo(minValue));
assertThat(minBucketValue.keys(), equalTo(minKeys.toArray(new String[minKeys.size()])));
}
@Test
public void testDocCount_asSubAgg() throws Exception {
SearchResponse response = client()
.prepareSearch("idx")
.addAggregation(
terms("terms")
.field("tag")
.order(Order.term(true))
.subAggregation(
histogram("histo").field(SINGLE_VALUED_FIELD_NAME).interval(interval).minDocCount(0)
.extendedBounds((long) minRandomValue, (long) maxRandomValue))
.subAggregation(minBucket("min_bucket").setBucketsPaths("histo>_count"))).execute().actionGet();
assertSearchResponse(response);
Terms terms = response.getAggregations().get("terms");
assertThat(terms, notNullValue());
assertThat(terms.getName(), equalTo("terms"));
List<Terms.Bucket> termsBuckets = terms.getBuckets();
assertThat(termsBuckets.size(), equalTo(interval));
for (int i = 0; i < interval; ++i) {
Terms.Bucket termsBucket = termsBuckets.get(i);
assertThat(termsBucket, notNullValue());
assertThat((String) termsBucket.getKey(), equalTo("tag" + (i % interval)));
Histogram histo = termsBucket.getAggregations().get("histo");
assertThat(histo, notNullValue());
assertThat(histo.getName(), equalTo("histo"));
List<? extends Bucket> buckets = histo.getBuckets();
List<String> minKeys = new ArrayList<>();
double minValue = Double.POSITIVE_INFINITY;
for (int j = 0; j < numValueBuckets; ++j) {
Histogram.Bucket bucket = buckets.get(j);
assertThat(bucket, notNullValue());
assertThat(((Number) bucket.getKey()).longValue(), equalTo((long) j * interval));
if (bucket.getDocCount() < minValue) {
minValue = bucket.getDocCount();
minKeys = new ArrayList<>();
minKeys.add(bucket.getKeyAsString());
} else if (bucket.getDocCount() == minValue) {
minKeys.add(bucket.getKeyAsString());
}
}
InternalBucketMetricValue minBucketValue = termsBucket.getAggregations().get("min_bucket");
assertThat(minBucketValue, notNullValue());
assertThat(minBucketValue.getName(), equalTo("min_bucket"));
assertThat(minBucketValue.value(), equalTo(minValue));
assertThat(minBucketValue.keys(), equalTo(minKeys.toArray(new String[minKeys.size()])));
}
}
@Test
public void testMetric_topLevel() throws Exception {
SearchResponse response = client()
.prepareSearch("idx")
.addAggregation(terms("terms").field("tag").subAggregation(sum("sum").field(SINGLE_VALUED_FIELD_NAME)))
.addAggregation(minBucket("min_bucket").setBucketsPaths("terms>sum")).execute().actionGet();
assertSearchResponse(response);
Terms terms = response.getAggregations().get("terms");
assertThat(terms, notNullValue());
assertThat(terms.getName(), equalTo("terms"));
List<Terms.Bucket> buckets = terms.getBuckets();
assertThat(buckets.size(), equalTo(interval));
List<String> minKeys = new ArrayList<>();
double minValue = Double.POSITIVE_INFINITY;
for (int i = 0; i < interval; ++i) {
Terms.Bucket bucket = buckets.get(i);
assertThat(bucket, notNullValue());
assertThat((String) bucket.getKey(), equalTo("tag" + (i % interval)));
assertThat(bucket.getDocCount(), greaterThan(0L));
Sum sum = bucket.getAggregations().get("sum");
assertThat(sum, notNullValue());
if (sum.value() < minValue) {
minValue = sum.value();
minKeys = new ArrayList<>();
minKeys.add(bucket.getKeyAsString());
} else if (sum.value() == minValue) {
minKeys.add(bucket.getKeyAsString());
}
}
InternalBucketMetricValue minBucketValue = response.getAggregations().get("min_bucket");
assertThat(minBucketValue, notNullValue());
assertThat(minBucketValue.getName(), equalTo("min_bucket"));
assertThat(minBucketValue.value(), equalTo(minValue));
assertThat(minBucketValue.keys(), equalTo(minKeys.toArray(new String[minKeys.size()])));
}
@Test
public void testMetric_asSubAgg() throws Exception {
SearchResponse response = client()
.prepareSearch("idx")
.addAggregation(
terms("terms")
.field("tag")
.order(Order.term(true))
.subAggregation(
histogram("histo").field(SINGLE_VALUED_FIELD_NAME).interval(interval).minDocCount(0)
.extendedBounds((long) minRandomValue, (long) maxRandomValue)
.subAggregation(sum("sum").field(SINGLE_VALUED_FIELD_NAME)))
.subAggregation(minBucket("min_bucket").setBucketsPaths("histo>sum"))).execute().actionGet();
assertSearchResponse(response);
Terms terms = response.getAggregations().get("terms");
assertThat(terms, notNullValue());
assertThat(terms.getName(), equalTo("terms"));
List<Terms.Bucket> termsBuckets = terms.getBuckets();
assertThat(termsBuckets.size(), equalTo(interval));
for (int i = 0; i < interval; ++i) {
Terms.Bucket termsBucket = termsBuckets.get(i);
assertThat(termsBucket, notNullValue());
assertThat((String) termsBucket.getKey(), equalTo("tag" + (i % interval)));
Histogram histo = termsBucket.getAggregations().get("histo");
assertThat(histo, notNullValue());
assertThat(histo.getName(), equalTo("histo"));
List<? extends Bucket> buckets = histo.getBuckets();
List<String> minKeys = new ArrayList<>();
double minValue = Double.POSITIVE_INFINITY;
for (int j = 0; j < numValueBuckets; ++j) {
Histogram.Bucket bucket = buckets.get(j);
assertThat(bucket, notNullValue());
assertThat(((Number) bucket.getKey()).longValue(), equalTo((long) j * interval));
if (bucket.getDocCount() != 0) {
Sum sum = bucket.getAggregations().get("sum");
assertThat(sum, notNullValue());
if (sum.value() < minValue) {
minValue = sum.value();
minKeys = new ArrayList<>();
minKeys.add(bucket.getKeyAsString());
} else if (sum.value() == minValue) {
minKeys.add(bucket.getKeyAsString());
}
}
}
InternalBucketMetricValue minBucketValue = termsBucket.getAggregations().get("min_bucket");
assertThat(minBucketValue, notNullValue());
assertThat(minBucketValue.getName(), equalTo("min_bucket"));
assertThat(minBucketValue.value(), equalTo(minValue));
assertThat(minBucketValue.keys(), equalTo(minKeys.toArray(new String[minKeys.size()])));
}
}
@Test
public void testMetric_asSubAggWithInsertZeros() throws Exception {
SearchResponse response = client()
.prepareSearch("idx")
.addAggregation(
terms("terms")
.field("tag")
.order(Order.term(true))
.subAggregation(
histogram("histo").field(SINGLE_VALUED_FIELD_NAME).interval(interval).minDocCount(0)
.extendedBounds((long) minRandomValue, (long) maxRandomValue)
.subAggregation(sum("sum").field(SINGLE_VALUED_FIELD_NAME)))
.subAggregation(minBucket("min_bucket").setBucketsPaths("histo>sum").gapPolicy(GapPolicy.INSERT_ZEROS)))
.execute().actionGet();
assertSearchResponse(response);
Terms terms = response.getAggregations().get("terms");
assertThat(terms, notNullValue());
assertThat(terms.getName(), equalTo("terms"));
List<Terms.Bucket> termsBuckets = terms.getBuckets();
assertThat(termsBuckets.size(), equalTo(interval));
for (int i = 0; i < interval; ++i) {
Terms.Bucket termsBucket = termsBuckets.get(i);
assertThat(termsBucket, notNullValue());
assertThat((String) termsBucket.getKey(), equalTo("tag" + (i % interval)));
Histogram histo = termsBucket.getAggregations().get("histo");
assertThat(histo, notNullValue());
assertThat(histo.getName(), equalTo("histo"));
List<? extends Bucket> buckets = histo.getBuckets();
List<String> minKeys = new ArrayList<>();
double minValue = Double.POSITIVE_INFINITY;
for (int j = 0; j < numValueBuckets; ++j) {
Histogram.Bucket bucket = buckets.get(j);
assertThat(bucket, notNullValue());
assertThat(((Number) bucket.getKey()).longValue(), equalTo((long) j * interval));
Sum sum = bucket.getAggregations().get("sum");
assertThat(sum, notNullValue());
if (sum.value() < minValue) {
minValue = sum.value();
minKeys = new ArrayList<>();
minKeys.add(bucket.getKeyAsString());
} else if (sum.value() == minValue) {
minKeys.add(bucket.getKeyAsString());
}
}
InternalBucketMetricValue minBucketValue = termsBucket.getAggregations().get("min_bucket");
assertThat(minBucketValue, notNullValue());
assertThat(minBucketValue.getName(), equalTo("min_bucket"));
assertThat(minBucketValue.value(), equalTo(minValue));
assertThat(minBucketValue.keys(), equalTo(minKeys.toArray(new String[minKeys.size()])));
}
}
@Test
public void testNoBuckets() throws Exception {
SearchResponse response = client().prepareSearch("idx")
.addAggregation(terms("terms").field("tag").exclude("tag.*").subAggregation(sum("sum").field(SINGLE_VALUED_FIELD_NAME)))
.addAggregation(minBucket("min_bucket").setBucketsPaths("terms>sum")).execute().actionGet();
assertSearchResponse(response);
Terms terms = response.getAggregations().get("terms");
assertThat(terms, notNullValue());
assertThat(terms.getName(), equalTo("terms"));
List<Terms.Bucket> buckets = terms.getBuckets();
assertThat(buckets.size(), equalTo(0));
InternalBucketMetricValue minBucketValue = response.getAggregations().get("min_bucket");
assertThat(minBucketValue, notNullValue());
assertThat(minBucketValue.getName(), equalTo("min_bucket"));
assertThat(minBucketValue.value(), equalTo(Double.POSITIVE_INFINITY));
assertThat(minBucketValue.keys(), equalTo(new String[0]));
}
@Test
public void testNested() throws Exception {
SearchResponse response = client()
.prepareSearch("idx")
.addAggregation(
terms("terms")
.field("tag")
.order(Order.term(true))
.subAggregation(
histogram("histo").field(SINGLE_VALUED_FIELD_NAME).interval(interval).minDocCount(0)
.extendedBounds((long) minRandomValue, (long) maxRandomValue))
.subAggregation(minBucket("min_histo_bucket").setBucketsPaths("histo>_count")))
.addAggregation(minBucket("min_terms_bucket").setBucketsPaths("terms>min_histo_bucket")).execute().actionGet();
assertSearchResponse(response);
Terms terms = response.getAggregations().get("terms");
assertThat(terms, notNullValue());
assertThat(terms.getName(), equalTo("terms"));
List<Terms.Bucket> termsBuckets = terms.getBuckets();
assertThat(termsBuckets.size(), equalTo(interval));
List<String> minTermsKeys = new ArrayList<>();
double minTermsValue = Double.POSITIVE_INFINITY;
for (int i = 0; i < interval; ++i) {
Terms.Bucket termsBucket = termsBuckets.get(i);
assertThat(termsBucket, notNullValue());
assertThat((String) termsBucket.getKey(), equalTo("tag" + (i % interval)));
Histogram histo = termsBucket.getAggregations().get("histo");
assertThat(histo, notNullValue());
assertThat(histo.getName(), equalTo("histo"));
List<? extends Bucket> buckets = histo.getBuckets();
List<String> minHistoKeys = new ArrayList<>();
double minHistoValue = Double.POSITIVE_INFINITY;
for (int j = 0; j < numValueBuckets; ++j) {
Histogram.Bucket bucket = buckets.get(j);
assertThat(bucket, notNullValue());
assertThat(((Number) bucket.getKey()).longValue(), equalTo((long) j * interval));
if (bucket.getDocCount() < minHistoValue) {
minHistoValue = bucket.getDocCount();
minHistoKeys = new ArrayList<>();
minHistoKeys.add(bucket.getKeyAsString());
} else if (bucket.getDocCount() == minHistoValue) {
minHistoKeys.add(bucket.getKeyAsString());
}
}
InternalBucketMetricValue minBucketValue = termsBucket.getAggregations().get("min_histo_bucket");
assertThat(minBucketValue, notNullValue());
assertThat(minBucketValue.getName(), equalTo("min_histo_bucket"));
assertThat(minBucketValue.value(), equalTo(minHistoValue));
assertThat(minBucketValue.keys(), equalTo(minHistoKeys.toArray(new String[minHistoKeys.size()])));
if (minHistoValue < minTermsValue) {
minTermsValue = minHistoValue;
minTermsKeys = new ArrayList<>();
minTermsKeys.add(termsBucket.getKeyAsString());
} else if (minHistoValue == minTermsValue) {
minTermsKeys.add(termsBucket.getKeyAsString());
}
}
InternalBucketMetricValue minBucketValue = response.getAggregations().get("min_terms_bucket");
assertThat(minBucketValue, notNullValue());
assertThat(minBucketValue.getName(), equalTo("min_terms_bucket"));
assertThat(minBucketValue.value(), equalTo(minTermsValue));
assertThat(minBucketValue.keys(), equalTo(minTermsKeys.toArray(new String[minTermsKeys.size()])));
}
}