Aggregations: Sibling Pipeline Aggregations can now be nested in SingleBucketAggregations

Closes #11379
This commit is contained in:
Colin Goodheart-Smithe 2015-05-27 16:58:29 +01:00
parent acb07c72b9
commit 95faa35853
3 changed files with 94 additions and 14 deletions

View File

@ -21,6 +21,7 @@ package org.elasticsearch.search.aggregations.bucket;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.search.aggregations.Aggregation;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
@ -63,6 +64,18 @@ public abstract class InternalSingleBucketAggregation extends InternalAggregatio
return aggregations;
}
/**
* Create a new copy of this {@link Aggregation} with the same settings as
* this {@link Aggregation} and contains the provided sub-aggregations.
*
* @param subAggregations
* the buckets to use in the new {@link Aggregation}
* @return the new {@link Aggregation}
*/
public InternalSingleBucketAggregation create(InternalAggregations subAggregations) {
return newAggregation(getName(), getDocCount(), subAggregations);
}
/**
* Create a <b>new</b> empty sub aggregation. This must be a new instance on each call.
*/

View File

@ -26,6 +26,7 @@ import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.InternalAggregation.ReduceContext;
import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.aggregations.InternalMultiBucketAggregation;
import org.elasticsearch.search.aggregations.bucket.InternalSingleBucketAggregation;
import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation.Bucket;
import java.util.ArrayList;
@ -45,20 +46,34 @@ public abstract class SiblingPipelineAggregator extends PipelineAggregator {
@SuppressWarnings("unchecked")
@Override
public InternalAggregation reduce(InternalAggregation aggregation, ReduceContext reduceContext) {
@SuppressWarnings("rawtypes")
InternalMultiBucketAggregation multiBucketsAgg = (InternalMultiBucketAggregation) aggregation;
List<? extends Bucket> buckets = multiBucketsAgg.getBuckets();
List<Bucket> newBuckets = new ArrayList<>();
for (int i = 0; i < buckets.size(); i++) {
InternalMultiBucketAggregation.InternalBucket bucket = (InternalMultiBucketAggregation.InternalBucket) buckets.get(i);
InternalAggregation aggToAdd = doReduce(bucket.getAggregations(), reduceContext);
List<InternalAggregation> aggs = new ArrayList<>(Lists.transform(bucket.getAggregations().asList(), AGGREGATION_TRANFORM_FUNCTION));
aggs.add(aggToAdd);
InternalMultiBucketAggregation.InternalBucket newBucket = multiBucketsAgg.createBucket(new InternalAggregations(aggs), bucket);
newBuckets.add(newBucket);
}
if (aggregation instanceof InternalMultiBucketAggregation) {
@SuppressWarnings("rawtypes")
InternalMultiBucketAggregation multiBucketsAgg = (InternalMultiBucketAggregation) aggregation;
List<? extends Bucket> buckets = multiBucketsAgg.getBuckets();
List<Bucket> newBuckets = new ArrayList<>();
for (int i = 0; i < buckets.size(); i++) {
InternalMultiBucketAggregation.InternalBucket bucket = (InternalMultiBucketAggregation.InternalBucket) buckets.get(i);
InternalAggregation aggToAdd = doReduce(bucket.getAggregations(), reduceContext);
List<InternalAggregation> aggs = new ArrayList<>(Lists.transform(bucket.getAggregations().asList(),
AGGREGATION_TRANFORM_FUNCTION));
aggs.add(aggToAdd);
InternalMultiBucketAggregation.InternalBucket newBucket = multiBucketsAgg.createBucket(new InternalAggregations(aggs),
bucket);
newBuckets.add(newBucket);
}
return multiBucketsAgg.create(newBuckets);
return multiBucketsAgg.create(newBuckets);
} else if (aggregation instanceof InternalSingleBucketAggregation) {
InternalSingleBucketAggregation singleBucketAgg = (InternalSingleBucketAggregation) aggregation;
InternalAggregation aggToAdd = doReduce(singleBucketAgg.getAggregations(), reduceContext);
List<InternalAggregation> aggs = new ArrayList<>(Lists.transform(singleBucketAgg.getAggregations().asList(),
AGGREGATION_TRANFORM_FUNCTION));
aggs.add(aggToAdd);
return singleBucketAgg.create(new InternalAggregations(aggs));
} else {
throw new IllegalStateException("Aggregation [" + aggregation.getName() + "] must be a bucket aggregation ["
+ aggregation.type().name() + "]");
}
}
public abstract InternalAggregation doReduce(Aggregations aggregations, ReduceContext context);

View File

@ -21,6 +21,7 @@ package org.elasticsearch.search.aggregations.pipeline;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.search.aggregations.bucket.filter.Filter;
import org.elasticsearch.search.aggregations.bucket.histogram.Histogram;
import org.elasticsearch.search.aggregations.bucket.histogram.Histogram.Bucket;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
@ -34,11 +35,13 @@ import org.junit.Test;
import java.util.ArrayList;
import java.util.List;
import static org.elasticsearch.search.aggregations.pipeline.PipelineAggregatorBuilders.maxBucket;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.index.query.QueryBuilders.termQuery;
import static org.elasticsearch.search.aggregations.AggregationBuilders.filter;
import static org.elasticsearch.search.aggregations.AggregationBuilders.histogram;
import static org.elasticsearch.search.aggregations.AggregationBuilders.sum;
import static org.elasticsearch.search.aggregations.AggregationBuilders.terms;
import static org.elasticsearch.search.aggregations.pipeline.PipelineAggregatorBuilders.maxBucket;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertSearchResponse;
import static org.hamcrest.Matchers.equalTo;
@ -282,6 +285,55 @@ public class MaxBucketTests extends ElasticsearchIntegrationTest {
}
}
@Test
public void testMetric_asSubAggOfSingleBucketAgg() throws Exception {
SearchResponse response = client()
.prepareSearch("idx")
.addAggregation(
filter("filter")
.filter(termQuery("tag", "tag0"))
.subAggregation(
histogram("histo").field(SINGLE_VALUED_FIELD_NAME).interval(interval)
.extendedBounds((long) minRandomValue, (long) maxRandomValue)
.subAggregation(sum("sum").field(SINGLE_VALUED_FIELD_NAME)))
.subAggregation(maxBucket("max_bucket").setBucketsPaths("histo>sum"))).execute().actionGet();
assertSearchResponse(response);
Filter filter = response.getAggregations().get("filter");
assertThat(filter, notNullValue());
assertThat(filter.getName(), equalTo("filter"));
Histogram histo = filter.getAggregations().get("histo");
assertThat(histo, notNullValue());
assertThat(histo.getName(), equalTo("histo"));
List<? extends Bucket> buckets = histo.getBuckets();
List<String> maxKeys = new ArrayList<>();
double maxValue = Double.NEGATIVE_INFINITY;
for (int j = 0; j < numValueBuckets; ++j) {
Histogram.Bucket bucket = buckets.get(j);
assertThat(bucket, notNullValue());
assertThat(((Number) bucket.getKey()).longValue(), equalTo((long) j * interval));
if (bucket.getDocCount() != 0) {
Sum sum = bucket.getAggregations().get("sum");
assertThat(sum, notNullValue());
if (sum.value() > maxValue) {
maxValue = sum.value();
maxKeys = new ArrayList<>();
maxKeys.add(bucket.getKeyAsString());
} else if (sum.value() == maxValue) {
maxKeys.add(bucket.getKeyAsString());
}
}
}
InternalBucketMetricValue maxBucketValue = filter.getAggregations().get("max_bucket");
assertThat(maxBucketValue, notNullValue());
assertThat(maxBucketValue.getName(), equalTo("max_bucket"));
assertThat(maxBucketValue.value(), equalTo(maxValue));
assertThat(maxBucketValue.keys(), equalTo(maxKeys.toArray(new String[maxKeys.size()])));
}
@Test
public void testMetric_asSubAggWithInsertZeros() throws Exception {
SearchResponse response = client()