Aggregations: bucket_sort pipeline aggregation (#27152)

This commit adds a parent pipeline aggregation that allows
sorting the buckets of a parent multi-bucket aggregation.

The aggregation also offers [from] and [size] parameters
in order to truncate the result as desired.

Closes #14928
This commit is contained in:
Dimitris Athanasiou 2017-11-09 17:59:57 +00:00 committed by GitHub
parent d22fd4ea58
commit 66bef26495
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 1112 additions and 1 deletions

View File

@ -202,6 +202,8 @@ import org.elasticsearch.search.aggregations.pipeline.bucketscript.BucketScriptP
import org.elasticsearch.search.aggregations.pipeline.bucketscript.BucketScriptPipelineAggregator;
import org.elasticsearch.search.aggregations.pipeline.bucketselector.BucketSelectorPipelineAggregationBuilder;
import org.elasticsearch.search.aggregations.pipeline.bucketselector.BucketSelectorPipelineAggregator;
import org.elasticsearch.search.aggregations.pipeline.bucketsort.BucketSortPipelineAggregationBuilder;
import org.elasticsearch.search.aggregations.pipeline.bucketsort.BucketSortPipelineAggregator;
import org.elasticsearch.search.aggregations.pipeline.cumulativesum.CumulativeSumPipelineAggregationBuilder;
import org.elasticsearch.search.aggregations.pipeline.cumulativesum.CumulativeSumPipelineAggregator;
import org.elasticsearch.search.aggregations.pipeline.derivative.DerivativePipelineAggregationBuilder;
@ -496,6 +498,11 @@ public class SearchModule {
BucketSelectorPipelineAggregationBuilder::new,
BucketSelectorPipelineAggregator::new,
BucketSelectorPipelineAggregationBuilder::parse));
registerPipelineAggregation(new PipelineAggregationSpec(
BucketSortPipelineAggregationBuilder.NAME,
BucketSortPipelineAggregationBuilder::new,
BucketSortPipelineAggregator::new,
BucketSortPipelineAggregationBuilder::parse));
registerPipelineAggregation(new PipelineAggregationSpec(
SerialDiffPipelineAggregationBuilder.NAME,
SerialDiffPipelineAggregationBuilder::new,

View File

@ -49,7 +49,7 @@ public class BucketHelpers {
* function.
*
* "insert_zeros": empty buckets will be filled with zeros for all metrics
* "ignore": empty buckets will simply be ignored
* "skip": empty buckets will simply be ignored
*/
public enum GapPolicy {
INSERT_ZEROS((byte) 0, "insert_zeros"), SKIP((byte) 1, "skip");

View File

@ -29,11 +29,14 @@ import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.stats.extend
import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.sum.SumBucketPipelineAggregationBuilder;
import org.elasticsearch.search.aggregations.pipeline.bucketscript.BucketScriptPipelineAggregationBuilder;
import org.elasticsearch.search.aggregations.pipeline.bucketselector.BucketSelectorPipelineAggregationBuilder;
import org.elasticsearch.search.aggregations.pipeline.bucketsort.BucketSortPipelineAggregationBuilder;
import org.elasticsearch.search.aggregations.pipeline.cumulativesum.CumulativeSumPipelineAggregationBuilder;
import org.elasticsearch.search.aggregations.pipeline.derivative.DerivativePipelineAggregationBuilder;
import org.elasticsearch.search.aggregations.pipeline.movavg.MovAvgPipelineAggregationBuilder;
import org.elasticsearch.search.aggregations.pipeline.serialdiff.SerialDiffPipelineAggregationBuilder;
import org.elasticsearch.search.sort.FieldSortBuilder;
import java.util.List;
import java.util.Map;
public final class PipelineAggregatorBuilders {
@ -99,6 +102,10 @@ public final class PipelineAggregatorBuilders {
return new BucketSelectorPipelineAggregationBuilder(name, script, bucketsPaths);
}
public static BucketSortPipelineAggregationBuilder bucketSort(String name, List<FieldSortBuilder> sorts) {
return new BucketSortPipelineAggregationBuilder(name, sorts);
}
public static CumulativeSumPipelineAggregationBuilder cumulativeSum(String name,
String bucketsPath) {
return new CumulativeSumPipelineAggregationBuilder(name, bucketsPath);

View File

@ -0,0 +1,195 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.aggregations.pipeline.bucketsort;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.search.aggregations.AggregationBuilder;
import org.elasticsearch.search.aggregations.AggregatorFactory;
import org.elasticsearch.search.aggregations.PipelineAggregationBuilder;
import org.elasticsearch.search.aggregations.pipeline.AbstractPipelineAggregationBuilder;
import org.elasticsearch.search.aggregations.pipeline.BucketHelpers.GapPolicy;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.FieldSortBuilder;
import org.elasticsearch.search.sort.SortBuilder;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg;
import static org.elasticsearch.search.aggregations.pipeline.PipelineAggregator.Parser.GAP_POLICY;
/**
* Builds a pipeline aggregation that allows sorting the buckets of its parent
* aggregation. The bucket {@code _key}, {@code _count} or sub-aggregations may be used as sort
* keys. Parameters {@code from} and {@code size} may also be set in order to truncate the
* result bucket list.
*/
public class BucketSortPipelineAggregationBuilder extends AbstractPipelineAggregationBuilder<BucketSortPipelineAggregationBuilder> {
public static final String NAME = "bucket_sort";
private static final ParseField FROM = new ParseField("from");
private static final ParseField SIZE = new ParseField("size");
public static final ConstructingObjectParser<BucketSortPipelineAggregationBuilder, String> PARSER = new ConstructingObjectParser<>(NAME,
false, (a, context) -> new BucketSortPipelineAggregationBuilder(context, (List<FieldSortBuilder>) a[0]));
static {
PARSER.declareField(optionalConstructorArg(), (p, c) -> {
List<SortBuilder<?>> sorts = SortBuilder.fromXContent(p);
List<FieldSortBuilder> fieldSorts = new ArrayList<>(sorts.size());
for (SortBuilder<?> sort : sorts) {
if (sort instanceof FieldSortBuilder == false) {
throw new IllegalArgumentException("[" + NAME + "] only supports field based sorting; incompatible sort: ["
+ sort + "]");
}
fieldSorts.add((FieldSortBuilder) sort);
}
return fieldSorts;
}, SearchSourceBuilder.SORT_FIELD,
ObjectParser.ValueType.OBJECT_ARRAY);
PARSER.declareInt(BucketSortPipelineAggregationBuilder::from, FROM);
PARSER.declareInt(BucketSortPipelineAggregationBuilder::size, SIZE);
PARSER.declareField(BucketSortPipelineAggregationBuilder::gapPolicy, p -> {
if (p.currentToken() == XContentParser.Token.VALUE_STRING) {
return GapPolicy.parse(p.text().toLowerCase(Locale.ROOT), p.getTokenLocation());
}
throw new IllegalArgumentException("Unsupported token [" + p.currentToken() + "]");
}, GAP_POLICY, ObjectParser.ValueType.STRING);
}
private List<FieldSortBuilder> sorts = Collections.emptyList();
private int from = 0;
private Integer size;
private GapPolicy gapPolicy = GapPolicy.SKIP;
public BucketSortPipelineAggregationBuilder(String name, List<FieldSortBuilder> sorts) {
super(name, NAME, sorts == null ? new String[0] : sorts.stream().map(s -> s.getFieldName()).toArray(String[]::new));
this.sorts = sorts == null ? Collections.emptyList() : sorts;
}
/**
* Read from a stream.
*/
public BucketSortPipelineAggregationBuilder(StreamInput in) throws IOException {
super(in, NAME);
sorts = in.readList(FieldSortBuilder::new);
from = in.readVInt();
size = in.readOptionalVInt();
gapPolicy = GapPolicy.readFrom(in);
}
@Override
protected void doWriteTo(StreamOutput out) throws IOException {
out.writeList(sorts);
out.writeVInt(from);
out.writeOptionalVInt(size);
gapPolicy.writeTo(out);
}
public BucketSortPipelineAggregationBuilder from(int from) {
if (from < 0) {
throw new IllegalArgumentException("[" + FROM.getPreferredName() + "] must be a non-negative integer: [" + from + "]");
}
this.from = from;
return this;
}
public BucketSortPipelineAggregationBuilder size(Integer size) {
if (size != null && size <= 0) {
throw new IllegalArgumentException("[" + SIZE.getPreferredName() + "] must be a positive integer: [" + size + "]");
}
this.size = size;
return this;
}
public BucketSortPipelineAggregationBuilder gapPolicy(GapPolicy gapPolicy) {
if (gapPolicy == null) {
throw new IllegalArgumentException("[" + GAP_POLICY.getPreferredName() + "] must not be null: [" + name + "]");
}
this.gapPolicy = gapPolicy;
return this;
}
@Override
protected PipelineAggregator createInternal(Map<String, Object> metaData) throws IOException {
return new BucketSortPipelineAggregator(name, sorts, from, size, gapPolicy, metaData);
}
@Override
public void doValidate(AggregatorFactory<?> parent, List<AggregationBuilder> aggFactories,
List<PipelineAggregationBuilder> pipelineAggregatoractories) {
if (sorts.isEmpty() && size == null && from == 0) {
throw new IllegalStateException("[" + name + "] is configured to perform nothing. Please set either of "
+ Arrays.asList(SearchSourceBuilder.SORT_FIELD.getPreferredName(), SIZE.getPreferredName(), FROM.getPreferredName())
+ " to use " + NAME);
}
}
@Override
protected XContentBuilder internalXContent(XContentBuilder builder, Params params) throws IOException {
builder.field(SearchSourceBuilder.SORT_FIELD.getPreferredName(), sorts);
builder.field(FROM.getPreferredName(), from);
if (size != null) {
builder.field(SIZE.getPreferredName(), size);
}
builder.field(GAP_POLICY.getPreferredName(), gapPolicy);
return builder;
}
public static BucketSortPipelineAggregationBuilder parse(String reducerName, XContentParser parser) throws IOException {
return PARSER.parse(parser, reducerName);
}
@Override
protected boolean overrideBucketsPath() {
return true;
}
@Override
protected int doHashCode() {
return Objects.hash(sorts, from, size, gapPolicy);
}
@Override
protected boolean doEquals(Object obj) {
BucketSortPipelineAggregationBuilder other = (BucketSortPipelineAggregationBuilder) obj;
return Objects.equals(sorts, other.sorts)
&& Objects.equals(from, other.from)
&& Objects.equals(size, other.size)
&& Objects.equals(gapPolicy, other.gapPolicy);
}
@Override
public String getWriteableName() {
return NAME;
}
}

View File

@ -0,0 +1,191 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.aggregations.pipeline.bucketsort;
import org.apache.lucene.util.PriorityQueue;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.InternalAggregation.ReduceContext;
import org.elasticsearch.search.aggregations.InternalMultiBucketAggregation;
import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation;
import org.elasticsearch.search.aggregations.pipeline.BucketHelpers;
import org.elasticsearch.search.aggregations.pipeline.BucketHelpers.GapPolicy;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
import org.elasticsearch.search.sort.FieldSortBuilder;
import org.elasticsearch.search.sort.SortOrder;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
public class BucketSortPipelineAggregator extends PipelineAggregator {
private final List<FieldSortBuilder> sorts;
private final int from;
private final Integer size;
private final GapPolicy gapPolicy;
public BucketSortPipelineAggregator(String name, List<FieldSortBuilder> sorts, int from, Integer size, GapPolicy gapPolicy,
Map<String, Object> metadata) {
super(name, sorts.stream().map(s -> s.getFieldName()).toArray(String[]::new), metadata);
this.sorts = sorts;
this.from = from;
this.size = size;
this.gapPolicy = gapPolicy;
}
/**
* Read from a stream.
*/
public BucketSortPipelineAggregator(StreamInput in) throws IOException {
super(in);
sorts = in.readList(FieldSortBuilder::new);
from = in.readVInt();
size = in.readOptionalVInt();
gapPolicy = GapPolicy.readFrom(in);
}
@Override
protected void doWriteTo(StreamOutput out) throws IOException {
out.writeList(sorts);
out.writeVInt(from);
out.writeOptionalVInt(size);
gapPolicy.writeTo(out);
}
@Override
public String getWriteableName() {
return BucketSortPipelineAggregationBuilder.NAME;
}
@Override
public InternalAggregation reduce(InternalAggregation aggregation, ReduceContext reduceContext) {
InternalMultiBucketAggregation<InternalMultiBucketAggregation, InternalMultiBucketAggregation.InternalBucket> originalAgg =
(InternalMultiBucketAggregation<InternalMultiBucketAggregation, InternalMultiBucketAggregation.InternalBucket>) aggregation;
List<? extends InternalMultiBucketAggregation.InternalBucket> buckets = originalAgg.getBuckets();
int bucketsCount = buckets.size();
int currentSize = size == null ? bucketsCount : size;
if (from >= bucketsCount) {
return originalAgg.create(Collections.emptyList());
}
// If no sorting needs to take place, we just truncate and return
if (sorts.size() == 0) {
return originalAgg.create(new ArrayList<>(buckets.subList(from, Math.min(from + currentSize, bucketsCount))));
}
int queueSize = Math.min(from + currentSize, bucketsCount);
PriorityQueue<ComparableBucket> ordered = new TopNPriorityQueue(queueSize);
for (InternalMultiBucketAggregation.InternalBucket bucket : buckets) {
ComparableBucket comparableBucket = new ComparableBucket(originalAgg, bucket);
if (comparableBucket.skip() == false) {
ordered.insertWithOverflow(new ComparableBucket(originalAgg, bucket));
}
}
int resultSize = Math.max(ordered.size() - from, 0);
// Popping from the priority queue returns the least element. The elements we want to skip due to offset would pop last.
// Thus, we just have to pop as many elements as we expect in results and store them in reverse order.
LinkedList<InternalMultiBucketAggregation.InternalBucket> newBuckets = new LinkedList<>();
for (int i = 0; i < resultSize; ++i) {
newBuckets.addFirst(ordered.pop().internalBucket);
}
return originalAgg.create(newBuckets);
}
private class ComparableBucket implements Comparable<ComparableBucket> {
private final MultiBucketsAggregation parentAgg;
private final InternalMultiBucketAggregation.InternalBucket internalBucket;
private final Map<FieldSortBuilder, Comparable<Object>> sortValues;
private ComparableBucket(MultiBucketsAggregation parentAgg, InternalMultiBucketAggregation.InternalBucket internalBucket) {
this.parentAgg = parentAgg;
this.internalBucket = internalBucket;
this.sortValues = resolveAndCacheSortValues();
}
private Map<FieldSortBuilder, Comparable<Object>> resolveAndCacheSortValues() {
Map<FieldSortBuilder, Comparable<Object>> resolved = new HashMap<>();
for (FieldSortBuilder sort : sorts) {
String sortField = sort.getFieldName();
if ("_key".equals(sortField)) {
resolved.put(sort, (Comparable<Object>) internalBucket.getKey());
} else {
Double bucketValue = BucketHelpers.resolveBucketValue(parentAgg, internalBucket, sortField, gapPolicy);
if (GapPolicy.SKIP == gapPolicy && Double.isNaN(bucketValue)) {
continue;
}
resolved.put(sort, (Comparable<Object>) (Object) bucketValue);
}
}
return resolved;
}
/**
* Whether the bucket should be skipped due to the gap policy
*/
private boolean skip() {
return sortValues.isEmpty();
}
@Override
public int compareTo(ComparableBucket that) {
int compareResult = 0;
for (FieldSortBuilder sort : sorts) {
Comparable<Object> thisValue = this.sortValues.get(sort);
Comparable<Object> thatValue = that.sortValues.get(sort);
if (thisValue == null && thatValue == null) {
continue;
} else if (thisValue == null) {
return -1;
} else if (thatValue == null) {
return 1;
} else {
compareResult = sort.order() == SortOrder.DESC ? thisValue.compareTo(thatValue) : -thisValue.compareTo(thatValue);
}
if (compareResult != 0) {
break;
}
}
return compareResult;
}
}
private static class TopNPriorityQueue extends PriorityQueue<ComparableBucket> {
private TopNPriorityQueue(int n) {
super(n, false);
}
@Override
protected boolean lessThan(ComparableBucket a, ComparableBucket b) {
return a.compareTo(b) < 0;
}
}
}

View File

@ -0,0 +1,437 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.aggregations.pipeline.bucketsort;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.search.SearchPhaseExecutionException;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.aggregations.bucket.histogram.Histogram;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.aggregations.metrics.avg.Avg;
import org.elasticsearch.search.aggregations.pipeline.BucketHelpers;
import org.elasticsearch.search.sort.FieldSortBuilder;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.test.ESIntegTestCase;
import org.joda.time.DateTime;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.search.aggregations.AggregationBuilders.avg;
import static org.elasticsearch.search.aggregations.AggregationBuilders.dateHistogram;
import static org.elasticsearch.search.aggregations.AggregationBuilders.histogram;
import static org.elasticsearch.search.aggregations.AggregationBuilders.terms;
import static org.elasticsearch.search.aggregations.pipeline.PipelineAggregatorBuilders.bucketSort;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertSearchResponse;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.hamcrest.Matchers.notNullValue;
@ESIntegTestCase.SuiteScopeTestCase
public class BucketSortIT extends ESIntegTestCase {
private static final String INDEX = "bucket-sort-it-data-index";
private static final String INDEX_WITH_GAPS = "bucket-sort-it-data-index-with-gaps";
private static final String TIME_FIELD = "time";
private static final String TERM_FIELD = "foo";
private static final String VALUE_1_FIELD = "value_1";
private static final String VALUE_2_FIELD = "value_2";
@Override
public void setupSuiteScopeCluster() throws Exception {
createIndex(INDEX, INDEX_WITH_GAPS);
client().admin().indices().preparePutMapping(INDEX)
.setType("doc")
.setSource("time", "type=date", "foo", "type=keyword", "value_1", "type=float", "value_2", "type=float")
.get();
int numTerms = 10;
List<String> terms = new ArrayList<>(numTerms);
for (int i = 0; i < numTerms; ++i) {
terms.add(randomAlphaOfLengthBetween(3, 8));
}
long now = System.currentTimeMillis();
long time = now - TimeValue.timeValueHours(24).millis();
List<IndexRequestBuilder> builders = new ArrayList<>();
while (time < now) {
for (String term : terms) {
int termCount = randomIntBetween(3, 6);
for (int i = 0; i < termCount; ++i) {
builders.add(client().prepareIndex(INDEX, "doc")
.setSource(newDocBuilder(time, term, randomIntBetween(1, 10) * randomDouble())));
}
}
time += TimeValue.timeValueHours(1).millis();
}
builders.add(client().prepareIndex(INDEX_WITH_GAPS, "doc").setSource(newDocBuilder(1, "foo", 1.0, 42.0)));
builders.add(client().prepareIndex(INDEX_WITH_GAPS, "doc").setSource(newDocBuilder(2, "foo", null, 42.0)));
builders.add(client().prepareIndex(INDEX_WITH_GAPS, "doc").setSource(newDocBuilder(3, "foo", 3.0, 42.0)));
indexRandom(true, builders);
ensureSearchable();
}
private XContentBuilder newDocBuilder(long timeMillis, String fooValue, Double value1) throws IOException {
return newDocBuilder(timeMillis, fooValue, value1, null);
}
private XContentBuilder newDocBuilder(long timeMillis, String fooValue, Double value1, Double value2) throws IOException {
XContentBuilder jsonBuilder = jsonBuilder();
jsonBuilder.startObject();
jsonBuilder.field(TIME_FIELD, timeMillis);
jsonBuilder.field(TERM_FIELD, fooValue);
if (value1 != null) {
jsonBuilder.field(VALUE_1_FIELD, value1);
}
if (value2 != null) {
jsonBuilder.field(VALUE_2_FIELD, value2);
}
jsonBuilder.endObject();
return jsonBuilder;
}
public void testEmptyBucketSort() {
SearchResponse response = client().prepareSearch(INDEX)
.setSize(0)
.addAggregation(dateHistogram("time_buckets").field(TIME_FIELD).interval(TimeValue.timeValueHours(1).millis()))
.execute().actionGet();
assertSearchResponse(response);
Histogram histogram = response.getAggregations().get("time_buckets");
assertThat(histogram, notNullValue());
// These become our baseline
List<? extends Histogram.Bucket> timeBuckets = histogram.getBuckets();
DateTime previousKey = (DateTime) timeBuckets.get(0).getKey();
for (Histogram.Bucket timeBucket : timeBuckets) {
assertThat(previousKey, lessThanOrEqualTo((DateTime) timeBucket.getKey()));
previousKey = (DateTime) timeBucket.getKey();
}
// Now let's test using size
response = client().prepareSearch(INDEX)
.setSize(0)
.addAggregation(dateHistogram("time_buckets").field(TIME_FIELD).interval(TimeValue.timeValueHours(1).millis())
.subAggregation(bucketSort("bucketSort", Collections.emptyList()).size(3)))
.execute().actionGet();
assertSearchResponse(response);
Histogram size3Histogram = response.getAggregations().get("time_buckets");
assertThat(size3Histogram, notNullValue());
List<? extends Histogram.Bucket> size3TimeBuckets = size3Histogram.getBuckets();
for (int i = 0; i < size3TimeBuckets.size(); ++i) {
assertThat(size3TimeBuckets.get(i).getKey(), equalTo(timeBuckets.get(i).getKey()));
}
// Finally, let's test using size + from
response = client().prepareSearch(INDEX)
.setSize(0)
.addAggregation(dateHistogram("time_buckets").field(TIME_FIELD).interval(TimeValue.timeValueHours(1).millis())
.subAggregation(bucketSort("bucketSort", Collections.emptyList()).size(3).from(2)))
.execute().actionGet();
assertSearchResponse(response);
Histogram size3From2Histogram = response.getAggregations().get("time_buckets");
assertThat(size3From2Histogram, notNullValue());
List<? extends Histogram.Bucket> size3From2TimeBuckets = size3From2Histogram.getBuckets();
for (int i = 0; i < size3From2TimeBuckets.size(); ++i) {
assertThat(size3From2TimeBuckets.get(i).getKey(), equalTo(timeBuckets.get(i + 2).getKey()));
}
}
public void testSortTermsOnKey() {
SearchResponse response = client().prepareSearch(INDEX)
.setSize(0)
.addAggregation(terms("foos").field(TERM_FIELD)
.subAggregation(bucketSort("bucketSort", Arrays.asList(new FieldSortBuilder("_key")))))
.execute().actionGet();
assertSearchResponse(response);
Terms terms = response.getAggregations().get("foos");
assertThat(terms, notNullValue());
List<? extends Terms.Bucket> termsBuckets = terms.getBuckets();
String previousKey = (String) termsBuckets.get(0).getKey();
for (Terms.Bucket termBucket : termsBuckets) {
assertThat(previousKey, lessThanOrEqualTo((String) termBucket.getKey()));
previousKey = (String) termBucket.getKey();
}
}
public void testSortTermsOnSubAggregation() {
SearchResponse response = client().prepareSearch(INDEX)
.setSize(0)
.addAggregation(terms("foos").field(TERM_FIELD)
.subAggregation(avg("avg_value").field(VALUE_1_FIELD))
.subAggregation(bucketSort("bucketSort", Arrays.asList(
new FieldSortBuilder("avg_value").order(SortOrder.DESC)))))
.execute().actionGet();
assertSearchResponse(response);
Terms terms = response.getAggregations().get("foos");
assertThat(terms, notNullValue());
List<? extends Terms.Bucket> termsBuckets = terms.getBuckets();
double previousAvgValue = ((Avg) termsBuckets.get(0).getAggregations().get("avg_value")).getValue();
for (Terms.Bucket termBucket : termsBuckets) {
Avg avg = termBucket.getAggregations().get("avg_value");
assertThat(avg, notNullValue());
assertThat(previousAvgValue, greaterThanOrEqualTo(avg.getValue()));
previousAvgValue = avg.getValue();
}
response = client().prepareSearch(INDEX)
.setSize(0)
.addAggregation(terms("foos").field(TERM_FIELD)
.subAggregation(avg("avg_value").field(VALUE_1_FIELD))
.subAggregation(bucketSort("bucketSort", Arrays.asList(
new FieldSortBuilder("avg_value").order(SortOrder.DESC))).size(2).from(3)))
.execute().actionGet();
assertSearchResponse(response);
Terms size2From3Terms = response.getAggregations().get("foos");
assertThat(size2From3Terms, notNullValue());
List<? extends Terms.Bucket> size2From3TermsBuckets = size2From3Terms.getBuckets();
for (int i = 0; i < size2From3TermsBuckets.size(); ++i) {
assertThat(size2From3TermsBuckets.get(i).getKey(), equalTo(termsBuckets.get(i + 3).getKey()));
}
}
public void testSortTermsOnCountWithSecondarySort() {
SearchResponse response = client().prepareSearch(INDEX)
.setSize(0)
.addAggregation(terms("foos").field(TERM_FIELD)
.subAggregation(avg("avg_value").field(VALUE_1_FIELD))
.subAggregation(bucketSort("bucketSort", Arrays.asList(
new FieldSortBuilder("_count").order(SortOrder.ASC),
new FieldSortBuilder("avg_value").order(SortOrder.DESC)))))
.execute().actionGet();
assertSearchResponse(response);
Terms terms = response.getAggregations().get("foos");
assertThat(terms, notNullValue());
List<? extends Terms.Bucket> termsBuckets = terms.getBuckets();
long previousCount = termsBuckets.get(0).getDocCount();
double previousAvgValue = ((Avg) termsBuckets.get(0).getAggregations().get("avg_value")).getValue();
for (Terms.Bucket termBucket : termsBuckets) {
Avg avg = termBucket.getAggregations().get("avg_value");
assertThat(avg, notNullValue());
assertThat(previousCount, lessThanOrEqualTo(termBucket.getDocCount()));
if (previousCount == termBucket.getDocCount()) {
assertThat(previousAvgValue, greaterThanOrEqualTo(avg.getValue()));
}
previousCount = termBucket.getDocCount();
previousAvgValue = avg.getValue();
}
}
public void testSortDateHistogramDescending() {
SearchResponse response = client().prepareSearch(INDEX)
.addAggregation(dateHistogram("time_buckets").field(TIME_FIELD).interval(TimeValue.timeValueHours(1).millis()))
.execute().actionGet();
assertSearchResponse(response);
Histogram histo = response.getAggregations().get("time_buckets");
assertThat(histo, notNullValue());
assertThat(histo.getName(), equalTo("time_buckets"));
List<? extends Histogram.Bucket> ascendingTimeBuckets = histo.getBuckets();
response = client().prepareSearch(INDEX)
.addAggregation(dateHistogram("time_buckets").field(TIME_FIELD).interval(TimeValue.timeValueHours(1).millis())
.subAggregation(bucketSort("bucketSort", Arrays.asList(
new FieldSortBuilder("_key").order(SortOrder.DESC)))))
.execute().actionGet();
assertSearchResponse(response);
histo = response.getAggregations().get("time_buckets");
assertThat(histo, notNullValue());
assertThat(histo.getName(), equalTo("time_buckets"));
List<? extends Histogram.Bucket> descendingTimeBuckets = histo.getBuckets();
assertThat(ascendingTimeBuckets.size(), equalTo(descendingTimeBuckets.size()));
int bucketCount = ascendingTimeBuckets.size();
for (int i = 0; i < bucketCount; ++i) {
assertThat(ascendingTimeBuckets.get(i).getKey(), equalTo(descendingTimeBuckets.get(bucketCount - i - 1).getKey()));
}
}
public void testSortHistogram_GivenGapsAndGapPolicyIsSkip() {
SearchResponse response = client().prepareSearch(INDEX_WITH_GAPS)
.addAggregation(histogram("time_buckets").field(TIME_FIELD).interval(1)
.subAggregation(avg("avg_value").field(VALUE_1_FIELD))
.subAggregation(bucketSort("bucketSort", Arrays.asList(
new FieldSortBuilder("avg_value").order(SortOrder.DESC))).gapPolicy(
BucketHelpers.GapPolicy.SKIP)))
.execute().actionGet();
assertSearchResponse(response);
Histogram histo = response.getAggregations().get("time_buckets");
assertThat(histo, notNullValue());
assertThat(histo.getName(), equalTo("time_buckets"));
List<? extends Histogram.Bucket> timeBuckets = histo.getBuckets();
assertThat(timeBuckets.size(), equalTo(2));
assertThat(timeBuckets.get(0).getKey(), equalTo(3.0));
assertThat(timeBuckets.get(1).getKey(), equalTo(1.0));
}
public void testSortHistogram_GivenGapsAndGapPolicyIsSkipAndSizeIsLessThanAvailableBuckets() {
SearchResponse response = client().prepareSearch(INDEX_WITH_GAPS)
.addAggregation(histogram("time_buckets").field(TIME_FIELD).interval(1)
.subAggregation(avg("avg_value").field(VALUE_1_FIELD))
.subAggregation(bucketSort("bucketSort", Arrays.asList(
new FieldSortBuilder("avg_value").order(SortOrder.DESC))).gapPolicy(
BucketHelpers.GapPolicy.SKIP).size(2)))
.execute().actionGet();
assertSearchResponse(response);
Histogram histo = response.getAggregations().get("time_buckets");
assertThat(histo, notNullValue());
assertThat(histo.getName(), equalTo("time_buckets"));
List<? extends Histogram.Bucket> timeBuckets = histo.getBuckets();
assertThat(timeBuckets.size(), equalTo(2));
assertThat(timeBuckets.get(0).getKey(), equalTo(3.0));
assertThat(timeBuckets.get(1).getKey(), equalTo(1.0));
}
public void testSortHistogram_GivenGapsAndGapPolicyIsSkipAndPrimarySortHasGaps() {
SearchResponse response = client().prepareSearch(INDEX_WITH_GAPS)
.addAggregation(histogram("time_buckets").field(TIME_FIELD).interval(1)
.subAggregation(avg("avg_value_1").field(VALUE_1_FIELD))
.subAggregation(avg("avg_value_2").field(VALUE_2_FIELD))
.subAggregation(bucketSort("bucketSort", Arrays.asList(
new FieldSortBuilder("avg_value_1").order(SortOrder.DESC),
new FieldSortBuilder("avg_value_2").order(SortOrder.DESC))).gapPolicy(
BucketHelpers.GapPolicy.SKIP)))
.execute().actionGet();
assertSearchResponse(response);
Histogram histo = response.getAggregations().get("time_buckets");
assertThat(histo, notNullValue());
assertThat(histo.getName(), equalTo("time_buckets"));
List<? extends Histogram.Bucket> timeBuckets = histo.getBuckets();
assertThat(timeBuckets.size(), equalTo(3));
assertThat(timeBuckets.get(0).getKey(), equalTo(3.0));
assertThat(timeBuckets.get(1).getKey(), equalTo(1.0));
assertThat(timeBuckets.get(2).getKey(), equalTo(2.0));
}
public void testSortHistogram_GivenGapsAndGapPolicyIsSkipAndSecondarySortHasGaps() {
SearchResponse response = client().prepareSearch(INDEX_WITH_GAPS)
.addAggregation(histogram("time_buckets").field(TIME_FIELD).interval(1)
.subAggregation(avg("avg_value_1").field(VALUE_1_FIELD))
.subAggregation(avg("avg_value_2").field(VALUE_2_FIELD))
.subAggregation(bucketSort("bucketSort", Arrays.asList(
new FieldSortBuilder("avg_value_2").order(SortOrder.DESC),
new FieldSortBuilder("avg_value_1").order(SortOrder.ASC))).gapPolicy(
BucketHelpers.GapPolicy.SKIP)))
.execute().actionGet();
assertSearchResponse(response);
Histogram histo = response.getAggregations().get("time_buckets");
assertThat(histo, notNullValue());
assertThat(histo.getName(), equalTo("time_buckets"));
List<? extends Histogram.Bucket> timeBuckets = histo.getBuckets();
assertThat(timeBuckets.size(), equalTo(3));
assertThat(timeBuckets.get(0).getKey(), equalTo(1.0));
assertThat(timeBuckets.get(1).getKey(), equalTo(3.0));
assertThat(timeBuckets.get(2).getKey(), equalTo(2.0));
}
public void testSortHistogram_GivenGapsAndGapPolicyIsInsertZeros() {
SearchResponse response = client().prepareSearch(INDEX_WITH_GAPS)
.addAggregation(histogram("time_buckets").field(TIME_FIELD).interval(1)
.subAggregation(avg("avg_value").field(VALUE_1_FIELD))
.subAggregation(bucketSort("bucketSort", Arrays.asList(
new FieldSortBuilder("avg_value").order(SortOrder.DESC))).gapPolicy(
BucketHelpers.GapPolicy.INSERT_ZEROS)))
.execute().actionGet();
assertSearchResponse(response);
Histogram histo = response.getAggregations().get("time_buckets");
assertThat(histo, notNullValue());
assertThat(histo.getName(), equalTo("time_buckets"));
List<? extends Histogram.Bucket> timeBuckets = histo.getBuckets();
assertThat(timeBuckets.size(), equalTo(3));
assertThat(timeBuckets.get(0).getKey(), equalTo(3.0));
assertThat(timeBuckets.get(1).getKey(), equalTo(1.0));
assertThat(timeBuckets.get(2).getKey(), equalTo(2.0));
}
public void testEmptyBuckets() {
SearchResponse response = client().prepareSearch(INDEX)
.setSize(0)
.setQuery(QueryBuilders.existsQuery("non-field"))
.addAggregation(terms("foos").field(TERM_FIELD)
.subAggregation(bucketSort("bucketSort", Arrays.asList(new FieldSortBuilder("_key")))))
.execute().actionGet();
assertSearchResponse(response);
Terms terms = response.getAggregations().get("foos");
assertThat(terms, notNullValue());
List<? extends Terms.Bucket> termsBuckets = terms.getBuckets();
assertThat(termsBuckets.isEmpty(), is(true));
}
public void testInvalidPath() {
SearchPhaseExecutionException e = expectThrows(SearchPhaseExecutionException.class,
() -> client().prepareSearch(INDEX)
.addAggregation(terms("foos").field(TERM_FIELD)
.subAggregation(bucketSort("bucketSort", Arrays.asList(new FieldSortBuilder("invalid")))))
.execute().actionGet());
assertThat(e.getCause().getMessage(), containsString("No aggregation found for path [invalid]"));
}
public void testNeitherSortsNorSizeSpecifiedAndFromIsDefault_ShouldThrowValidation() {
SearchPhaseExecutionException e = expectThrows(SearchPhaseExecutionException.class,
() -> client().prepareSearch(INDEX)
.addAggregation(terms("foos").field(TERM_FIELD)
.subAggregation(bucketSort("bucketSort", Collections.emptyList())))
.execute().actionGet());
assertThat(e.getCause().getMessage(), containsString("[bucketSort] is configured to perform nothing." +
" Please set either of [sort, size, from] to use bucket_sort"));
}
}

View File

@ -0,0 +1,87 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.aggregations.pipeline.bucketsort;
import org.elasticsearch.search.aggregations.BasePipelineAggregationTestCase;
import org.elasticsearch.search.aggregations.pipeline.BucketHelpers;
import org.elasticsearch.search.sort.FieldSortBuilder;
import org.elasticsearch.search.sort.SortOrder;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import static org.hamcrest.Matchers.equalTo;
public class BucketSortTests extends BasePipelineAggregationTestCase<BucketSortPipelineAggregationBuilder> {
@Override
protected BucketSortPipelineAggregationBuilder createTestAggregatorFactory() {
int sortCount = randomIntBetween(0, 5);
List<FieldSortBuilder> sorts = new ArrayList<>(sortCount);
for (int i = 0; i < sortCount; ++i) {
String sortField = randomAlphaOfLengthBetween(3, 20);
SortOrder sortOrder = randomFrom(SortOrder.values());
FieldSortBuilder fieldSortBuilder = new FieldSortBuilder(sortField);
fieldSortBuilder.order(sortOrder);
sorts.add(fieldSortBuilder);
}
BucketSortPipelineAggregationBuilder factory = new BucketSortPipelineAggregationBuilder(randomAlphaOfLengthBetween(3, 20), sorts);
Integer from = randomIntBetween(0, 20);
Integer size = randomBoolean() ? randomIntBetween(1, 1000) : null;
if (randomBoolean()) {
factory.from(from);
}
if (size != null) {
factory.size(size);
}
if (randomBoolean()) {
factory.gapPolicy(randomFrom(BucketHelpers.GapPolicy.values()));
}
// Check if the combination ended up being invalid
if (sorts.isEmpty() && size == null && from == 0) {
factory.size(42);
}
return factory;
}
public void testNegativeFrom() {
IllegalArgumentException e = expectThrows(IllegalArgumentException.class,
() -> new BucketSortPipelineAggregationBuilder("foo", Collections.emptyList()).from(-1));
assertThat(e.getMessage(), equalTo("[from] must be a non-negative integer: [-1]"));
}
public void testNegativeSize() {
IllegalArgumentException e = expectThrows(IllegalArgumentException.class,
() -> new BucketSortPipelineAggregationBuilder("foo", Collections.emptyList()).size(-1));
assertThat(e.getMessage(), equalTo("[size] must be a positive integer: [-1]"));
}
public void testZeroSize() {
IllegalArgumentException e = expectThrows(IllegalArgumentException.class,
() -> new BucketSortPipelineAggregationBuilder("foo", Collections.emptyList()).size(0));
assertThat(e.getMessage(), equalTo("[size] must be a positive integer: [0]"));
}
public void testNullGapPolicy() {
IllegalArgumentException e = expectThrows(IllegalArgumentException.class,
() -> new BucketSortPipelineAggregationBuilder("foo", Collections.emptyList()).gapPolicy(null));
assertThat(e.getMessage(), equalTo("[gap_policy] must not be null: [foo]"));
}
}

View File

@ -234,4 +234,5 @@ include::pipeline/movavg-aggregation.asciidoc[]
include::pipeline/cumulative-sum-aggregation.asciidoc[]
include::pipeline/bucket-script-aggregation.asciidoc[]
include::pipeline/bucket-selector-aggregation.asciidoc[]
include::pipeline/bucket-sort-aggregation.asciidoc[]
include::pipeline/serial-diff-aggregation.asciidoc[]

View File

@ -0,0 +1,186 @@
[[search-aggregations-pipeline-bucket-sort-aggregation]]
=== Bucket Sort Aggregation
A parent pipeline aggregation which sorts the buckets of its parent multi-bucket aggregation.
Zero or more sort fields may be specified together with the corresponding sort order.
Each bucket may be sorted based on its `_key`, `_count` or its sub-aggregations.
In addition, parameters `from` and `size` may be set in order to truncate the result buckets.
NOTE: The `bucket_sort` aggregation, like all pipeline aggregations, is executed after all other non-pipeline aggregations.
This means the sorting only applies to whatever buckets are already returned from the parent aggregation. For example,
if the parent aggregation is `terms` and its `size` is set to `10`, the `bucket_sort` will only sort over those 10
returned term buckets.
==== Syntax
A `bucket_sort` aggregation looks like this in isolation:
[source,js]
--------------------------------------------------
{
"bucket_sort": {
"sort": [
{"sort_field_1": {"order": "asc"},<1>
{"sort_field_2": {"order": "desc"},
"sort_field_3"
],
"from": 1,
"size": 3
}
}
--------------------------------------------------
// NOTCONSOLE
<1> Here, `sort_field_1` is the bucket path to the variable to be used as the primary sort and its order
is ascending.
.`bucket_sort` Parameters
|===
|Parameter Name |Description |Required |Default Value
|`sort` |The list of fields to sort on. See <<search-request-sort,`sort`>> for more details. |Optional |
|`from` |Buckets in positions prior to the set value will be truncated. |Optional | `0`
|`size` |The number of buckets to return. Defaults to all buckets of the parent aggregation. |Optional |
|`gap_policy` |The policy to apply when gaps are found in the data (see <<gap-policy>> for more
details)|Optional |`skip`
|===
The following snippet returns the buckets corresponding to the 3 months with the highest total sales in descending order:
[source,js]
--------------------------------------------------
POST /sales/_search
{
"size": 0,
"aggs" : {
"sales_per_month" : {
"date_histogram" : {
"field" : "date",
"interval" : "month"
},
"aggs": {
"total_sales": {
"sum": {
"field": "price"
}
},
"sales_bucket_sort": {
"bucket_sort": {
"sort": [
{"total_sales": {"order": "desc"}}<1>
],
"size": 3<2>
}
}
}
}
}
}
--------------------------------------------------
// CONSOLE
// TEST[setup:sales]
<1> `sort` is set to use the values of `total_sales` in descending order
<2> `size` is set to `3` meaning only the top 3 months in `total_sales` will be returned
And the following may be the response:
[source,js]
--------------------------------------------------
{
"took": 82,
"timed_out": false,
"_shards": ...,
"hits": ...,
"aggregations": {
"sales_per_month": {
"buckets": [
{
"key_as_string": "2015/01/01 00:00:00",
"key": 1420070400000,
"doc_count": 3,
"total_sales": {
"value": 550.0
}
},
{
"key_as_string": "2015/03/01 00:00:00",
"key": 1425168000000,
"doc_count": 2,
"total_sales": {
"value": 375.0
},
},
{
"key_as_string": "2015/02/01 00:00:00",
"key": 1422748800000,
"doc_count": 2,
"total_sales": {
"value": 60.0
},
}
]
}
}
}
--------------------------------------------------
// TESTRESPONSE[s/"took": 82/"took": $body.took/]
// TESTRESPONSE[s/"_shards": \.\.\./"_shards": $body._shards/]
// TESTRESPONSE[s/"hits": \.\.\./"hits": $body.hits/]
==== Truncating without sorting
It is also possible to use this aggregation in order to truncate the result buckets
without doing any sorting. To do so, just use the `from` and/or `size` parameters
without specifying `sort`.
The following example simply truncates the result so that only the second bucket is returned:
[source,js]
--------------------------------------------------
POST /sales/_search
{
"size": 0,
"aggs" : {
"sales_per_month" : {
"date_histogram" : {
"field" : "date",
"interval" : "month"
},
"aggs": {
"bucket_truncate": {
"bucket_sort": {
"from": 1,
"size": 1
}
}
}
}
}
}
--------------------------------------------------
// CONSOLE
// TEST[setup:sales]
Response:
[source,js]
--------------------------------------------------
{
"took": 11,
"timed_out": false,
"_shards": ...,
"hits": ...,
"aggregations": {
"sales_per_month": {
"buckets": [
{
"key_as_string": "2015/02/01 00:00:00",
"key": 1422748800000,
"doc_count": 2
}
]
}
}
}
--------------------------------------------------
// TESTRESPONSE[s/"took": 11/"took": $body.took/]
// TESTRESPONSE[s/"_shards": \.\.\./"_shards": $body._shards/]
// TESTRESPONSE[s/"hits": \.\.\./"hits": $body.hits/]