Correctly set doc_count when MovAvg "predicts" values on existing buckets (#24892)
If the bucket already exists, due to non-overlapping series or missing data, the MovAvg creates a merged bucket with the existing aggs + the new prediction. This fixes a small bug where the doc_count was not being set correctly. Relates to #24327
This commit is contained in:
parent
9957bdf0ad
commit
b8d7b83f8e
|
@ -161,7 +161,7 @@ public class MovAvgPipelineAggregator extends PipelineAggregator {
|
||||||
}).collect(Collectors.toList());
|
}).collect(Collectors.toList());
|
||||||
aggs.add(new InternalSimpleValue(name(), predictions[i], formatter, new ArrayList<PipelineAggregator>(), metaData()));
|
aggs.add(new InternalSimpleValue(name(), predictions[i], formatter, new ArrayList<PipelineAggregator>(), metaData()));
|
||||||
|
|
||||||
Bucket newBucket = factory.createBucket(newKey, 0, new InternalAggregations(aggs));
|
Bucket newBucket = factory.createBucket(newKey, bucket.getDocCount(), new InternalAggregations(aggs));
|
||||||
|
|
||||||
// Overwrite the existing bucket with the new version
|
// Overwrite the existing bucket with the new version
|
||||||
newBuckets.set(lastValidPosition + i + 1, newBucket);
|
newBuckets.set(lastValidPosition + i + 1, newBucket);
|
||||||
|
|
|
@ -19,9 +19,12 @@
|
||||||
|
|
||||||
package org.elasticsearch.search.aggregations.pipeline.moving.avg;
|
package org.elasticsearch.search.aggregations.pipeline.moving.avg;
|
||||||
|
|
||||||
|
import org.elasticsearch.action.bulk.BulkRequestBuilder;
|
||||||
|
import org.elasticsearch.action.bulk.BulkResponse;
|
||||||
import org.elasticsearch.action.index.IndexRequestBuilder;
|
import org.elasticsearch.action.index.IndexRequestBuilder;
|
||||||
import org.elasticsearch.action.search.SearchPhaseExecutionException;
|
import org.elasticsearch.action.search.SearchPhaseExecutionException;
|
||||||
import org.elasticsearch.action.search.SearchResponse;
|
import org.elasticsearch.action.search.SearchResponse;
|
||||||
|
import org.elasticsearch.action.support.WriteRequest;
|
||||||
import org.elasticsearch.common.collect.EvictingQueue;
|
import org.elasticsearch.common.collect.EvictingQueue;
|
||||||
import org.elasticsearch.search.aggregations.bucket.histogram.Histogram;
|
import org.elasticsearch.search.aggregations.bucket.histogram.Histogram;
|
||||||
import org.elasticsearch.search.aggregations.bucket.histogram.Histogram.Bucket;
|
import org.elasticsearch.search.aggregations.bucket.histogram.Histogram.Bucket;
|
||||||
|
@ -41,6 +44,7 @@ import org.elasticsearch.search.aggregations.support.ValuesSourceAggregationBuil
|
||||||
import org.elasticsearch.test.ESIntegTestCase;
|
import org.elasticsearch.test.ESIntegTestCase;
|
||||||
import org.hamcrest.Matchers;
|
import org.hamcrest.Matchers;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
|
@ -67,6 +71,7 @@ import static org.hamcrest.core.IsNull.nullValue;
|
||||||
public class MovAvgIT extends ESIntegTestCase {
|
public class MovAvgIT extends ESIntegTestCase {
|
||||||
private static final String INTERVAL_FIELD = "l_value";
|
private static final String INTERVAL_FIELD = "l_value";
|
||||||
private static final String VALUE_FIELD = "v_value";
|
private static final String VALUE_FIELD = "v_value";
|
||||||
|
private static final String VALUE_FIELD2 = "v_value2";
|
||||||
|
|
||||||
static int interval;
|
static int interval;
|
||||||
static int numBuckets;
|
static int numBuckets;
|
||||||
|
@ -1204,6 +1209,68 @@ public class MovAvgIT extends ESIntegTestCase {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void testPredictWithNonEmptyBuckets() throws Exception {
|
||||||
|
|
||||||
|
createIndex("predict_non_empty");
|
||||||
|
BulkRequestBuilder bulkBuilder = client().prepareBulk().setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
|
||||||
|
|
||||||
|
for (int i = 0; i < 10; i++) {
|
||||||
|
bulkBuilder.add(client().prepareIndex("predict_non_empty", "type").setSource(
|
||||||
|
jsonBuilder().startObject().field(INTERVAL_FIELD, i)
|
||||||
|
.field(VALUE_FIELD, 10)
|
||||||
|
.field(VALUE_FIELD2, 10)
|
||||||
|
.endObject()));
|
||||||
|
}
|
||||||
|
for (int i = 10; i < 20; i++) {
|
||||||
|
// Extra so there is a bucket that only has second field
|
||||||
|
bulkBuilder.add(client().prepareIndex("predict_non_empty", "type").setSource(
|
||||||
|
jsonBuilder().startObject().field(INTERVAL_FIELD, i).field(VALUE_FIELD2, 10).endObject()));
|
||||||
|
}
|
||||||
|
|
||||||
|
bulkBuilder.execute().actionGet();
|
||||||
|
ensureSearchable();
|
||||||
|
|
||||||
|
SearchResponse response = client()
|
||||||
|
.prepareSearch("predict_non_empty")
|
||||||
|
.setTypes("type")
|
||||||
|
.addAggregation(
|
||||||
|
histogram("histo")
|
||||||
|
.field(INTERVAL_FIELD)
|
||||||
|
.interval(1)
|
||||||
|
.subAggregation(max("max").field(VALUE_FIELD))
|
||||||
|
.subAggregation(max("max2").field(VALUE_FIELD2))
|
||||||
|
.subAggregation(
|
||||||
|
movingAvg("movavg_values", "max")
|
||||||
|
.window(windowSize)
|
||||||
|
.modelBuilder(new SimpleModel.SimpleModelBuilder())
|
||||||
|
.gapPolicy(BucketHelpers.GapPolicy.SKIP).predict(5))).execute().actionGet();
|
||||||
|
|
||||||
|
assertSearchResponse(response);
|
||||||
|
|
||||||
|
Histogram histo = response.getAggregations().get("histo");
|
||||||
|
assertThat(histo, notNullValue());
|
||||||
|
assertThat(histo.getName(), equalTo("histo"));
|
||||||
|
List<? extends Bucket> buckets = histo.getBuckets();
|
||||||
|
assertThat("Size of buckets array is not correct.", buckets.size(), equalTo(20));
|
||||||
|
|
||||||
|
SimpleValue current = buckets.get(0).getAggregations().get("movavg_values");
|
||||||
|
assertThat(current, nullValue());
|
||||||
|
|
||||||
|
for (int i = 1; i < 20; i++) {
|
||||||
|
Bucket bucket = buckets.get(i);
|
||||||
|
assertThat(bucket, notNullValue());
|
||||||
|
assertThat(bucket.getKey(), equalTo((double)i));
|
||||||
|
assertThat(bucket.getDocCount(), equalTo(1L));
|
||||||
|
SimpleValue movAvgAgg = bucket.getAggregations().get("movavg_values");
|
||||||
|
if (i < 15) {
|
||||||
|
assertThat(movAvgAgg, notNullValue());
|
||||||
|
assertThat(movAvgAgg.value(), equalTo(10d));
|
||||||
|
} else {
|
||||||
|
assertThat(movAvgAgg, nullValue());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private void assertValidIterators(Iterator expectedBucketIter, Iterator expectedCountsIter, Iterator expectedValuesIter) {
|
private void assertValidIterators(Iterator expectedBucketIter, Iterator expectedCountsIter, Iterator expectedValuesIter) {
|
||||||
if (!expectedBucketIter.hasNext()) {
|
if (!expectedBucketIter.hasNext()) {
|
||||||
fail("`expectedBucketIter` iterator ended before `actual` iterator, size mismatch");
|
fail("`expectedBucketIter` iterator ended before `actual` iterator, size mismatch");
|
||||||
|
|
Loading…
Reference in New Issue