diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/movavg/MovAvgPipelineAggregator.java b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/movavg/MovAvgPipelineAggregator.java index 49422995c95..196f7cca473 100644 --- a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/movavg/MovAvgPipelineAggregator.java +++ b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/movavg/MovAvgPipelineAggregator.java @@ -161,7 +161,7 @@ public class MovAvgPipelineAggregator extends PipelineAggregator { }).collect(Collectors.toList()); aggs.add(new InternalSimpleValue(name(), predictions[i], formatter, new ArrayList(), metaData())); - Bucket newBucket = factory.createBucket(newKey, 0, new InternalAggregations(aggs)); + Bucket newBucket = factory.createBucket(newKey, bucket.getDocCount(), new InternalAggregations(aggs)); // Overwrite the existing bucket with the new version newBuckets.set(lastValidPosition + i + 1, newBucket); diff --git a/core/src/test/java/org/elasticsearch/search/aggregations/pipeline/moving/avg/MovAvgIT.java b/core/src/test/java/org/elasticsearch/search/aggregations/pipeline/moving/avg/MovAvgIT.java index f24dfe42270..bbe6ecc3a4e 100644 --- a/core/src/test/java/org/elasticsearch/search/aggregations/pipeline/moving/avg/MovAvgIT.java +++ b/core/src/test/java/org/elasticsearch/search/aggregations/pipeline/moving/avg/MovAvgIT.java @@ -19,9 +19,12 @@ package org.elasticsearch.search.aggregations.pipeline.moving.avg; +import org.elasticsearch.action.bulk.BulkRequestBuilder; +import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.index.IndexRequestBuilder; import org.elasticsearch.action.search.SearchPhaseExecutionException; import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.common.collect.EvictingQueue; import org.elasticsearch.search.aggregations.bucket.histogram.Histogram; import org.elasticsearch.search.aggregations.bucket.histogram.Histogram.Bucket; @@ -41,6 +44,7 @@ import org.elasticsearch.search.aggregations.support.ValuesSourceAggregationBuil import org.elasticsearch.test.ESIntegTestCase; import org.hamcrest.Matchers; +import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -67,6 +71,7 @@ import static org.hamcrest.core.IsNull.nullValue; public class MovAvgIT extends ESIntegTestCase { private static final String INTERVAL_FIELD = "l_value"; private static final String VALUE_FIELD = "v_value"; + private static final String VALUE_FIELD2 = "v_value2"; static int interval; static int numBuckets; @@ -1204,6 +1209,68 @@ public class MovAvgIT extends ESIntegTestCase { } } + public void testPredictWithNonEmptyBuckets() throws Exception { + + createIndex("predict_non_empty"); + BulkRequestBuilder bulkBuilder = client().prepareBulk().setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); + + for (int i = 0; i < 10; i++) { + bulkBuilder.add(client().prepareIndex("predict_non_empty", "type").setSource( + jsonBuilder().startObject().field(INTERVAL_FIELD, i) + .field(VALUE_FIELD, 10) + .field(VALUE_FIELD2, 10) + .endObject())); + } + for (int i = 10; i < 20; i++) { + // Extra so there is a bucket that only has second field + bulkBuilder.add(client().prepareIndex("predict_non_empty", "type").setSource( + jsonBuilder().startObject().field(INTERVAL_FIELD, i).field(VALUE_FIELD2, 10).endObject())); + } + + bulkBuilder.execute().actionGet(); + ensureSearchable(); + + SearchResponse response = client() + .prepareSearch("predict_non_empty") + .setTypes("type") + .addAggregation( + histogram("histo") + .field(INTERVAL_FIELD) + .interval(1) + .subAggregation(max("max").field(VALUE_FIELD)) + .subAggregation(max("max2").field(VALUE_FIELD2)) + .subAggregation( + movingAvg("movavg_values", "max") + .window(windowSize) + .modelBuilder(new SimpleModel.SimpleModelBuilder()) + .gapPolicy(BucketHelpers.GapPolicy.SKIP).predict(5))).execute().actionGet(); + + assertSearchResponse(response); + + Histogram histo = response.getAggregations().get("histo"); + assertThat(histo, notNullValue()); + assertThat(histo.getName(), equalTo("histo")); + List buckets = histo.getBuckets(); + assertThat("Size of buckets array is not correct.", buckets.size(), equalTo(20)); + + SimpleValue current = buckets.get(0).getAggregations().get("movavg_values"); + assertThat(current, nullValue()); + + for (int i = 1; i < 20; i++) { + Bucket bucket = buckets.get(i); + assertThat(bucket, notNullValue()); + assertThat(bucket.getKey(), equalTo((double)i)); + assertThat(bucket.getDocCount(), equalTo(1L)); + SimpleValue movAvgAgg = bucket.getAggregations().get("movavg_values"); + if (i < 15) { + assertThat(movAvgAgg, notNullValue()); + assertThat(movAvgAgg.value(), equalTo(10d)); + } else { + assertThat(movAvgAgg, nullValue()); + } + } + } + private void assertValidIterators(Iterator expectedBucketIter, Iterator expectedCountsIter, Iterator expectedValuesIter) { if (!expectedBucketIter.hasNext()) { fail("`expectedBucketIter` iterator ended before `actual` iterator, size mismatch");