Aggregations: Run pipeline aggregations for empty buckets added in the Range Aggregation

Closes #15471
This commit is contained in:
Colin Goodheart-Smithe 2015-12-17 17:45:10 +00:00
parent 2f97ff0925
commit 15588a4991
3 changed files with 103 additions and 12 deletions

View File

@ -391,12 +391,14 @@ public class InternalHistogram<B extends InternalHistogram.Bucket> extends Inter
return reducedBuckets;
}
private void addEmptyBuckets(List<B> list) {
private void addEmptyBuckets(List<B> list, ReduceContext reduceContext) {
B lastBucket = null;
ExtendedBounds bounds = emptyBucketInfo.bounds;
ListIterator<B> iter = list.listIterator();
// first adding all the empty buckets *before* the actual data (based on th extended_bounds.min the user requested)
InternalAggregations reducedEmptySubAggs = InternalAggregations.reduce(Collections.singletonList(emptyBucketInfo.subAggregations),
reduceContext);
if (bounds != null) {
B firstBucket = iter.hasNext() ? list.get(iter.nextIndex()) : null;
if (firstBucket == null) {
@ -404,7 +406,9 @@ public class InternalHistogram<B extends InternalHistogram.Bucket> extends Inter
long key = bounds.min;
long max = bounds.max;
while (key <= max) {
iter.add(getFactory().createBucket(key, 0, emptyBucketInfo.subAggregations, keyed, formatter));
iter.add(getFactory().createBucket(key, 0,
reducedEmptySubAggs,
keyed, formatter));
key = emptyBucketInfo.rounding.nextRoundingValue(key);
}
}
@ -413,7 +417,9 @@ public class InternalHistogram<B extends InternalHistogram.Bucket> extends Inter
long key = bounds.min;
if (key < firstBucket.key) {
while (key < firstBucket.key) {
iter.add(getFactory().createBucket(key, 0, emptyBucketInfo.subAggregations, keyed, formatter));
iter.add(getFactory().createBucket(key, 0,
reducedEmptySubAggs,
keyed, formatter));
key = emptyBucketInfo.rounding.nextRoundingValue(key);
}
}
@ -428,7 +434,9 @@ public class InternalHistogram<B extends InternalHistogram.Bucket> extends Inter
if (lastBucket != null) {
long key = emptyBucketInfo.rounding.nextRoundingValue(lastBucket.key);
while (key < nextBucket.key) {
iter.add(getFactory().createBucket(key, 0, emptyBucketInfo.subAggregations, keyed, formatter));
iter.add(getFactory().createBucket(key, 0,
reducedEmptySubAggs, keyed,
formatter));
key = emptyBucketInfo.rounding.nextRoundingValue(key);
}
assert key == nextBucket.key;
@ -441,7 +449,9 @@ public class InternalHistogram<B extends InternalHistogram.Bucket> extends Inter
long key = emptyBucketInfo.rounding.nextRoundingValue(lastBucket.key);
long max = bounds.max;
while (key <= max) {
iter.add(getFactory().createBucket(key, 0, emptyBucketInfo.subAggregations, keyed, formatter));
iter.add(getFactory().createBucket(key, 0,
reducedEmptySubAggs, keyed,
formatter));
key = emptyBucketInfo.rounding.nextRoundingValue(key);
}
}
@ -453,7 +463,7 @@ public class InternalHistogram<B extends InternalHistogram.Bucket> extends Inter
// adding empty buckets if needed
if (minDocCount == 0) {
addEmptyBuckets(reducedBuckets);
addEmptyBuckets(reducedBuckets, reduceContext);
}
if (order == InternalOrder.KEY_ASC) {

View File

@ -45,12 +45,14 @@ import java.util.Map;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.search.aggregations.AggregationBuilders.histogram;
import static org.elasticsearch.search.aggregations.AggregationBuilders.sum;
import static org.elasticsearch.search.aggregations.pipeline.PipelineAggregatorBuilders.derivative;
import static org.elasticsearch.search.aggregations.pipeline.PipelineAggregatorBuilders.having;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertSearchResponse;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.lessThan;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
@ESIntegTestCase.SuiteScopeTestCase
public class BucketSelectorTests extends ESIntegTestCase {
@ -74,6 +76,7 @@ public class BucketSelectorTests extends ESIntegTestCase {
public void setupSuiteScopeCluster() throws Exception {
createIndex("idx");
createIndex("idx_unmapped");
createIndex("idx_with_gaps");
interval = randomIntBetween(1, 50);
numDocs = randomIntBetween(10, 500);
@ -84,6 +87,10 @@ public class BucketSelectorTests extends ESIntegTestCase {
for (int docs = 0; docs < numDocs; docs++) {
builders.add(client().prepareIndex("idx", "type").setSource(newDocBuilder()));
}
builders.add(client().prepareIndex("idx_with_gaps", "type").setSource(newDocBuilder(1, 1, 0, 0)));
builders.add(client().prepareIndex("idx_with_gaps", "type").setSource(newDocBuilder(1, 2, 0, 0)));
builders.add(client().prepareIndex("idx_with_gaps", "type").setSource(newDocBuilder(3, 1, 0, 0)));
builders.add(client().prepareIndex("idx_with_gaps", "type").setSource(newDocBuilder(3, 3, 0, 0)));
client().preparePutIndexedScript().setId("my_script").setScriptLang(GroovyScriptEngineService.NAME)
.setSource("{ \"script\": \"Double.isNaN(_value0) ? false : (_value0 + _value1 > 100)\" }").get();
@ -93,12 +100,17 @@ public class BucketSelectorTests extends ESIntegTestCase {
}
private XContentBuilder newDocBuilder() throws IOException {
return newDocBuilder(randomIntBetween(minNumber, maxNumber), randomIntBetween(minNumber, maxNumber),
randomIntBetween(minNumber, maxNumber), randomIntBetween(minNumber, maxNumber));
}
private XContentBuilder newDocBuilder(int field1Value, int field2Value, int field3Value, int field4Value) throws IOException {
XContentBuilder jsonBuilder = jsonBuilder();
jsonBuilder.startObject();
jsonBuilder.field(FIELD_1_NAME, randomIntBetween(minNumber, maxNumber));
jsonBuilder.field(FIELD_2_NAME, randomIntBetween(minNumber, maxNumber));
jsonBuilder.field(FIELD_3_NAME, randomIntBetween(minNumber, maxNumber));
jsonBuilder.field(FIELD_4_NAME, randomIntBetween(minNumber, maxNumber));
jsonBuilder.field(FIELD_1_NAME, field1Value);
jsonBuilder.field(FIELD_2_NAME, field2Value);
jsonBuilder.field(FIELD_3_NAME, field3Value);
jsonBuilder.field(FIELD_4_NAME, field4Value);
jsonBuilder.endObject();
return jsonBuilder;
}
@ -451,4 +463,70 @@ public class BucketSelectorTests extends ESIntegTestCase {
assertThat(field2SumValue + field3SumValue, greaterThan(100.0));
}
}
public void testEmptyBuckets() {
SearchResponse response = client().prepareSearch("idx_with_gaps")
.addAggregation(histogram("histo").field(FIELD_1_NAME).interval(1)
.subAggregation(histogram("inner_histo").field(FIELD_1_NAME).interval(1).extendedBounds(1l, 4l).minDocCount(0)
.subAggregation(derivative("derivative").setBucketsPaths("_count").gapPolicy(GapPolicy.INSERT_ZEROS))))
.execute().actionGet();
assertSearchResponse(response);
InternalHistogram<Bucket> histo = response.getAggregations().get("histo");
assertThat(histo, notNullValue());
assertThat(histo.getName(), equalTo("histo"));
List<? extends Bucket> buckets = histo.getBuckets();
assertThat(buckets.size(), equalTo(3));
Histogram.Bucket bucket = buckets.get(0);
assertThat(bucket, notNullValue());
assertThat(bucket.getKeyAsString(), equalTo("1"));
Histogram innerHisto = bucket.getAggregations().get("inner_histo");
assertThat(innerHisto, notNullValue());
List<? extends Histogram.Bucket> innerBuckets = innerHisto.getBuckets();
assertThat(innerBuckets, notNullValue());
assertThat(innerBuckets.size(), equalTo(4));
for (int i = 0; i < innerBuckets.size(); i++) {
Histogram.Bucket innerBucket = innerBuckets.get(i);
if (i == 0) {
assertThat(innerBucket.getAggregations().get("derivative"), nullValue());
} else {
assertThat(innerBucket.getAggregations().get("derivative"), notNullValue());
}
}
bucket = buckets.get(1);
assertThat(bucket, notNullValue());
assertThat(bucket.getKeyAsString(), equalTo("2"));
innerHisto = bucket.getAggregations().get("inner_histo");
assertThat(innerHisto, notNullValue());
innerBuckets = innerHisto.getBuckets();
assertThat(innerBuckets, notNullValue());
assertThat(innerBuckets.size(), equalTo(4));
for (int i = 0; i < innerBuckets.size(); i++) {
Histogram.Bucket innerBucket = innerBuckets.get(i);
if (i == 0) {
assertThat(innerBucket.getAggregations().get("derivative"), nullValue());
} else {
assertThat(innerBucket.getAggregations().get("derivative"), notNullValue());
}
}
bucket = buckets.get(2);
assertThat(bucket, notNullValue());
assertThat(bucket.getKeyAsString(), equalTo("3"));
innerHisto = bucket.getAggregations().get("inner_histo");
assertThat(innerHisto, notNullValue());
innerBuckets = innerHisto.getBuckets();
assertThat(innerBuckets, notNullValue());
assertThat(innerBuckets.size(), equalTo(4));
for (int i = 0; i < innerBuckets.size(); i++) {
Histogram.Bucket innerBucket = innerBuckets.get(i);
if (i == 0) {
assertThat(innerBucket.getAggregations().get("derivative"), nullValue());
} else {
assertThat(innerBucket.getAggregations().get("derivative"), notNullValue());
}
}
}
}

View File

@ -58,7 +58,6 @@ import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.Matchers.sameInstance;
@ClusterScope(scope = Scope.SUITE)
@ -739,6 +738,10 @@ public class ScriptedMetricTests extends ESIntegTestCase {
ScriptedMetric scriptedMetric = bucket.getAggregations().get("scripted");
assertThat(scriptedMetric, notNullValue());
assertThat(scriptedMetric.getName(), equalTo("scripted"));
assertThat(scriptedMetric.aggregation(), nullValue());
assertThat(scriptedMetric.aggregation(), notNullValue());
assertThat(scriptedMetric.aggregation(), instanceOf(List.class));
List<Integer> aggregationResult = (List<Integer>) scriptedMetric.aggregation();
assertThat(aggregationResult.size(), equalTo(1));
assertThat(aggregationResult.get(0), equalTo(0));
}
}