Merge pull request #11641 from polyfractal/bugfix/movavg_predict
Aggregations: Moving average forecasts should not include current point
commit 2ebb44d56f
@@ -120,7 +120,6 @@ public class MovAvgPipelineAggregator extends PipelineAggregator {
             InternalHistogram.Bucket newBucket = bucket;
 
             if (!(thisBucketValue == null || thisBucketValue.equals(Double.NaN))) {
-                values.offer(thisBucketValue);
 
                 // Some models (e.g. HoltWinters) have certain preconditions that must be met
                 if (model.hasValue(values.size())) {
@@ -142,6 +141,8 @@ public class MovAvgPipelineAggregator extends PipelineAggregator {
                     }
                     lastValidPosition = counter;
                 }
+
+                values.offer(thisBucketValue);
             }
             counter += 1;
             newBuckets.add(newBucket);
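This reordering is the heart of the fix: the current bucket's value now enters the window only after that bucket's moving average has been computed, so each value is a forecast from prior points rather than an average that includes the current point. A stand-alone sketch of the predict-then-update order (a hypothetical class, not the aggregator's code):

import java.util.ArrayDeque;
import java.util.Deque;

// Sketch of the reordering above: the average for each point is taken from
// the window *before* that point is offered into it.
public class MovAvgSketch {

    static double avg(Deque<Double> window) {
        double sum = 0;
        for (double v : window) {
            sum += v;
        }
        return sum / window.size();
    }

    public static void main(String[] args) {
        double[] series = {10, 20, 30, 40};
        Deque<Double> window = new ArrayDeque<>();
        for (double point : series) {
            // Old order: window.offer(point) came first, so the moving average
            // for a bucket already contained that bucket's own value.
            Double movavg = window.isEmpty() ? null : avg(window);
            System.out.println("point=" + point + " movavg=" + movavg);
            window.offer(point); // new order: offer after computing
        }
    }
}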
@@ -29,6 +29,7 @@ import org.elasticsearch.search.internal.SearchContext;
 
 import java.io.IOException;
 import java.text.ParseException;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Map;
@@ -50,6 +51,15 @@ public class EwmaModel extends MovAvgModel {
         this.alpha = alpha;
     }
 
+    @Override
+    protected <T extends Number> double[] doPredict(Collection<T> values, int numPredictions) {
+        double[] predictions = new double[numPredictions];
+
+        // EWMA just emits the same final prediction repeatedly.
+        Arrays.fill(predictions, next(values));
+
+        return predictions;
+    }
+
     @Override
     public <T extends Number> double next(Collection<T> values) {
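A flat forecast is a natural choice for EWMA: once the one-step prediction p is appended to the series, smoothing again yields p * alpha + p * (1 - alpha) = p, so the old feed-the-prediction-back loop would repeat the same value anyway (ignoring window eviction). A stand-alone sketch of that fixed point (not the EwmaModel class; alpha = 0.5 keeps the arithmetic exact in doubles):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Demonstrates that the EWMA of a window is unchanged after feeding the
// forecast back into the window.
public class EwmaFixedPoint {

    static double ewma(List<Double> values, double alpha) {
        double avg = 0;
        boolean first = true;
        for (double v : values) {
            if (first) {
                avg = v;
                first = false;
            } else {
                avg = (v * alpha) + (avg * (1 - alpha));
            }
        }
        return avg;
    }

    public static void main(String[] args) {
        List<Double> window = new ArrayList<>(Arrays.asList(10.0, 12.0, 11.0));
        double p = ewma(window, 0.5);
        window.add(p); // feed the forecast back, as the old predict() loop did
        System.out.println(p == ewma(window, 0.5)); // true: the value repeats
    }
}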
@@ -67,7 +67,7 @@ public class HoltLinearModel extends MovAvgModel {
      * @return Returns an array of doubles, since most smoothing methods operate on floating points
      */
     @Override
-    public <T extends Number> double[] predict(Collection<T> values, int numPredictions) {
+    protected <T extends Number> double[] doPredict(Collection<T> values, int numPredictions) {
         return next(values, numPredictions);
     }
@@ -176,7 +176,7 @@ public class HoltWintersModel extends MovAvgModel {
      * @return Returns an array of doubles, since most smoothing methods operate on floating points
      */
     @Override
-    public <T extends Number> double[] predict(Collection<T> values, int numPredictions) {
+    protected <T extends Number> double[] doPredict(Collection<T> values, int numPredictions) {
         return next(values, numPredictions);
     }
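Unlike the simple, linear, and EWMA models, the Holt models keep a genuine multi-step doPredict by delegating to next(values, numPredictions): trend-aware smoothing extrapolates instead of repeating one value. A stand-alone sketch of double exponential smoothing showing why the k-step forecast s + k*b is not constant (alpha/beta values are illustrative, not the aggregation's defaults):

import java.util.Arrays;

// Holt (double exponential) smoothing: level s plus trend b, with the
// forecast continuing the trend into the future.
public class HoltSketch {
    public static void main(String[] args) {
        double[] values = {10, 12, 14, 16};
        double alpha = 0.5, beta = 0.5;
        double s = values[0], b = values[1] - values[0];
        for (int i = 1; i < values.length; i++) {
            double lastS = s;
            s = alpha * values[i] + (1 - alpha) * (s + b);
            b = beta * (s - lastS) + (1 - beta) * b;
        }
        double[] predictions = new double[3];
        for (int k = 0; k < predictions.length; k++) {
            predictions[k] = s + (k + 1) * b; // trend continues into the future
        }
        System.out.println(Arrays.toString(predictions)); // [18.0, 20.0, 22.0]
    }
}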
@@ -30,6 +30,7 @@ import org.elasticsearch.search.internal.SearchContext;
 
 import java.io.IOException;
 import java.text.ParseException;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Map;
@@ -41,6 +42,16 @@ public class LinearModel extends MovAvgModel {
 
     protected static final ParseField NAME_FIELD = new ParseField("linear");
 
+    @Override
+    protected <T extends Number> double[] doPredict(Collection<T> values, int numPredictions) {
+        double[] predictions = new double[numPredictions];
+
+        // EWMA just emits the same final prediction repeatedly.
+        Arrays.fill(predictions, next(values));
+
+        return predictions;
+    }
+
     @Override
     public <T extends Number> double next(Collection<T> values) {
         double avg = 0;
@@ -19,8 +19,6 @@
 
 package org.elasticsearch.search.aggregations.pipeline.movavg.models;
 
-import com.google.common.collect.EvictingQueue;
-
 import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.search.SearchParseException;
@@ -44,7 +42,7 @@ public abstract class MovAvgModel {
      */
     public boolean hasValue(int windowLength) {
         // Default implementation can always provide a next() value
-        return true;
+        return windowLength > 0;
     }
 
     /**
@@ -57,9 +55,7 @@ public abstract class MovAvgModel {
     public abstract <T extends Number> double next(Collection<T> values);
 
     /**
-     * Predicts the next `n` values in the series, using the smoothing model to generate new values.
-     * Default prediction mode is to simply continuing calling <code>next()</code> and adding the
-     * predicted value back into the windowed buffer.
+     * Predicts the next `n` values in the series.
      *
      * @param values Collection of numerics to movingAvg, usually windowed
     * @param numPredictions Number of newly generated predictions to return
@@ -67,34 +63,31 @@ public abstract class MovAvgModel {
      * @return Returns an array of doubles, since most smoothing methods operate on floating points
      */
     public <T extends Number> double[] predict(Collection<T> values, int numPredictions) {
-        double[] predictions = new double[numPredictions];
+        assert(numPredictions >= 1);
 
         // If there are no values, we can't do anything. Return an array of NaNs.
-        if (values.size() == 0) {
+        if (values.isEmpty()) {
             return emptyPredictions(numPredictions);
         }
 
-        // special case for one prediction, avoids allocation
-        if (numPredictions < 1) {
-            throw new IllegalArgumentException("numPredictions may not be less than 1.");
-        } else if (numPredictions == 1){
-            predictions[0] = next(values);
-            return predictions;
-        }
-
-        Collection<Number> predictionBuffer = EvictingQueue.create(values.size());
-        predictionBuffer.addAll(values);
-
-        for (int i = 0; i < numPredictions; i++) {
-            predictions[i] = next(predictionBuffer);
-
-            // Add the last value to the buffer, so we can keep predicting
-            predictionBuffer.add(predictions[i]);
-        }
-
-        return predictions;
+        return doPredict(values, numPredictions);
     }
 
+    /**
+     * Calls to the model-specific implementation which actually generates the predictions
+     *
+     * @param values Collection of numerics to movingAvg, usually windowed
+     * @param numPredictions Number of newly generated predictions to return
+     * @param <T> Type of numeric
+     * @return Returns an array of doubles, since most smoothing methods operate on floating points
+     */
+    protected abstract <T extends Number> double[] doPredict(Collection<T> values, int numPredictions);
+
     /**
      * Returns an empty set of predictions, filled with NaNs
      * @param numPredictions
      * @return
      */
     protected double[] emptyPredictions(int numPredictions) {
         double[] predictions = new double[numPredictions];
         Arrays.fill(predictions, Double.NaN);
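This hunk converts predict() into a template method: the base class validates input and handles the empty-window case once, and each model only implements the forecasting step. A minimal stand-alone sketch of that shape (class name illustrative, not the full MovAvgModel):

import java.util.Arrays;
import java.util.Collection;

// Template method: shared validation in predict(), model-specific
// forecasting in doPredict().
abstract class SmoothingModel {

    public <T extends Number> double[] predict(Collection<T> values, int numPredictions) {
        assert numPredictions >= 1;
        if (values.isEmpty()) {
            double[] predictions = new double[numPredictions];
            Arrays.fill(predictions, Double.NaN); // emptyPredictions(...)
            return predictions;
        }
        return doPredict(values, numPredictions); // model-specific forecasting
    }

    protected abstract <T extends Number> double[] doPredict(Collection<T> values, int numPredictions);
}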
@@ -29,6 +29,7 @@ import org.elasticsearch.search.internal.SearchContext;
 
 import java.io.IOException;
 import java.text.ParseException;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Map;
@@ -39,6 +40,16 @@ public class SimpleModel extends MovAvgModel {
 
     protected static final ParseField NAME_FIELD = new ParseField("simple");
 
+    @Override
+    protected <T extends Number> double[] doPredict(Collection<T> values, int numPredictions) {
+        double[] predictions = new double[numPredictions];
+
+        // EWMA just emits the same final prediction repeatedly.
+        Arrays.fill(predictions, next(values));
+
+        return predictions;
+    }
+
     @Override
     public <T extends Number> double next(Collection<T> values) {
         double avg = 0;
@@ -50,6 +50,7 @@ public class PipelineAggregationHelperTests extends ElasticsearchTestCase {
         ArrayList<MockBucket> values = new ArrayList<>(size);
 
         boolean lastWasGap = false;
+        boolean emptyHisto = true;
 
         for (int i = 0; i < size; i++) {
             MockBucket bucket = new MockBucket();
@@ -70,15 +71,27 @@ public class PipelineAggregationHelperTests extends ElasticsearchTestCase {
                 bucket.count = randomIntBetween(1, 50);
                 bucket.docValues = new double[bucket.count];
                 for (int j = 0; j < bucket.count; j++) {
-                    bucket.docValues[j] = randomDouble() * randomIntBetween(-20,20);
+                    bucket.docValues[j] = randomDouble() * randomIntBetween(-20, 20);
                 }
                 lastWasGap = false;
+                emptyHisto = false;
             }
 
             bucket.key = i * interval;
             values.add(bucket);
         }
 
+        if (emptyHisto) {
+            int idx = randomIntBetween(0, values.size()-1);
+            MockBucket bucket = values.get(idx);
+            bucket.count = randomIntBetween(1, 50);
+            bucket.docValues = new double[bucket.count];
+            for (int j = 0; j < bucket.count; j++) {
+                bucket.docValues[j] = randomDouble() * randomIntBetween(-20, 20);
+            }
+            values.set(idx, bucket);
+        }
+
         return values;
     }
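The new emptyHisto flag guards the randomized fixture: if every generated bucket came out as a gap, one random bucket is backfilled with data, so the histogram is never entirely empty and the moving-average assertions are never vacuous. A stand-alone sketch of the guard (MockBucket here is a hypothetical stand-in for the fixture class):

import java.util.ArrayList;
import java.util.List;
import java.util.Random;

// Backfill guard: force at least one non-gap bucket into a random histogram.
public class BackfillSketch {
    static class MockBucket { int count; }

    public static void main(String[] args) {
        Random rand = new Random();
        List<MockBucket> values = new ArrayList<>();
        boolean emptyHisto = true;
        for (int i = 0; i < 10; i++) {
            MockBucket bucket = new MockBucket();
            if (rand.nextBoolean()) { // non-gap bucket
                bucket.count = 1 + rand.nextInt(50);
                emptyHisto = false;
            }
            values.add(bucket);
        }
        if (emptyHisto) { // all gaps: promote one random bucket to real data
            values.get(rand.nextInt(values.size())).count = 1 + rand.nextInt(50);
        }
        System.out.println("buckets=" + values.size());
    }
}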
@@ -148,15 +148,6 @@ public class MovAvgTests extends ElasticsearchIntegrationTest {
             }
         }
 
-        // Used for specially crafted gap tests
-        builders.add(client().prepareIndex("idx", "gap_type").setSource(jsonBuilder().startObject()
-                .field(INTERVAL_FIELD, 0)
-                .field(GAP_FIELD, 1).endObject()));
-
-        builders.add(client().prepareIndex("idx", "gap_type").setSource(jsonBuilder().startObject()
-                .field(INTERVAL_FIELD, 49)
-                .field(GAP_FIELD, 1).endObject()));
-
         for (int i = -10; i < 10; i++) {
             builders.add(client().prepareIndex("neg_idx", "type").setSource(
                     jsonBuilder().startObject().field(INTERVAL_FIELD, i).field(VALUE_FIELD, 10).endObject()));
@@ -204,31 +195,36 @@ public class MovAvgTests extends ElasticsearchIntegrationTest {
                 metricValue = target.equals(MetricTarget.VALUE) ? PipelineAggregationHelperTests.calculateMetric(docValues, metric) : mockBucket.count;
             }
 
-            window.offer(metricValue);
-            switch (type) {
-                case SIMPLE:
-                    values.add(simple(window));
-                    break;
-                case LINEAR:
-                    values.add(linear(window));
-                    break;
-                case EWMA:
-                    values.add(ewma(window));
-                    break;
-                case HOLT:
-                    values.add(holt(window));
-                    break;
-                case HOLT_WINTERS:
-                    // HW needs at least 2 periods of data to start
-                    if (window.size() >= period * 2) {
-                        values.add(holtWinters(window));
-                    } else {
-                        values.add(null);
-                    }
+            if (window.size() > 0) {
+                switch (type) {
+                    case SIMPLE:
+                        values.add(simple(window));
+                        break;
+                    case LINEAR:
+                        values.add(linear(window));
+                        break;
+                    case EWMA:
+                        values.add(ewma(window));
+                        break;
+                    case HOLT:
+                        values.add(holt(window));
+                        break;
+                    case HOLT_WINTERS:
+                        // HW needs at least 2 periods of data to start
+                        if (window.size() >= period * 2) {
+                            values.add(holtWinters(window));
+                        } else {
+                            values.add(null);
+                        }
 
-                    break;
+                        break;
+                }
+            } else {
+                values.add(null);
             }
 
+            window.offer(metricValue);
+
         }
         testValues.put(type.toString() + "_" + target.toString(), values);
     }
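The test expectations now mirror the production ordering: the expected moving average for a bucket is computed from the window before that bucket's metric is offered, and an empty window expects null. A stand-alone sketch of the reordered expectation loop (hypothetical class, simple average only):

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Predict-then-update expectation loop: the first bucket has no prior data.
public class ExpectationSketch {
    public static void main(String[] args) {
        double[] metrics = {10, 20, 30};
        Deque<Double> window = new ArrayDeque<>();
        List<Double> expected = new ArrayList<>();
        for (double metricValue : metrics) {
            if (window.size() > 0) {
                double sum = 0;
                for (double v : window) sum += v;
                expected.add(sum / window.size());
            } else {
                expected.add(null); // first bucket: nothing to average yet
            }
            window.offer(metricValue); // current point enters afterwards
        }
        System.out.println(expected); // [null, 10.0, 15.0]
    }
}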
@@ -685,7 +681,10 @@ public class MovAvgTests extends ElasticsearchIntegrationTest {
         List<? extends Bucket> buckets = histo.getBuckets();
         assertThat("Size of buckets array is not correct.", buckets.size(), equalTo(25));
 
-        for (int i = 0; i < 20; i++) {
+        SimpleValue current = buckets.get(0).getAggregations().get("movavg_values");
+        assertThat(current, nullValue());
+
+        for (int i = 1; i < 20; i++) {
             Bucket bucket = buckets.get(i);
             assertThat(bucket, notNullValue());
             assertThat((long) bucket.getKey(), equalTo((long) i - 10));
@@ -699,7 +698,6 @@ public class MovAvgTests extends ElasticsearchIntegrationTest {
         }
 
         for (int i = 20; i < 25; i++) {
-            System.out.println(i);
             Bucket bucket = buckets.get(i);
             assertThat(bucket, notNullValue());
             assertThat((long) bucket.getKey(), equalTo((long) i - 10));
@@ -877,350 +875,6 @@ public class MovAvgTests extends ElasticsearchIntegrationTest {
         }
     }
 
-    /**
-     * This test uses the "gap" dataset, which is simply a doc at the beginning and end of
-     * the INTERVAL_FIELD range. These docs have a value of 1 in GAP_FIELD.
-     * This test verifies that large gaps don't break things, and that the mov avg roughly works
-     * in the correct manner (checks direction of change, but not actual values)
-     */
-    @Test
-    public void testGiantGap() {
-
-        SearchResponse response = client()
-                .prepareSearch("idx").setTypes("gap_type")
-                .addAggregation(
-                        histogram("histo").field(INTERVAL_FIELD).interval(1).extendedBounds(0L, 49L)
-                                .subAggregation(min("the_metric").field(GAP_FIELD))
-                                .subAggregation(movingAvg("movavg_values")
-                                        .window(windowSize)
-                                        .modelBuilder(randomModelBuilder())
-                                        .gapPolicy(gapPolicy)
-                                        .setBucketsPaths("the_metric"))
-                ).execute().actionGet();
-
-        assertSearchResponse(response);
-
-        InternalHistogram<Bucket> histo = response.getAggregations().get("histo");
-        assertThat(histo, notNullValue());
-        assertThat(histo.getName(), equalTo("histo"));
-        List<? extends Bucket> buckets = histo.getBuckets();
-        assertThat("Size of buckets array is not correct.", buckets.size(), equalTo(50));
-
-        double lastValue = ((SimpleValue)(buckets.get(0).getAggregations().get("movavg_values"))).value();
-        assertThat(Double.compare(lastValue, 0.0d), greaterThanOrEqualTo(0));
-
-        double currentValue;
-        for (int i = 1; i < 49; i++) {
-            SimpleValue current = buckets.get(i).getAggregations().get("movavg_values");
-            if (current != null) {
-                currentValue = current.value();
-
-                // Since there are only two values in this test, at the beginning and end, the moving average should
-                // decrease every step (until it reaches zero). Crude way to check that it's doing the right thing
-                // without actually verifying the computed values. Should work for all types of moving avgs and
-                // gap policies
-                assertThat(Double.compare(lastValue, currentValue), greaterThanOrEqualTo(0));
-                lastValue = currentValue;
-            }
-        }
-
-        SimpleValue current = buckets.get(49).getAggregations().get("movavg_values");
-        assertThat(current, notNullValue());
-        currentValue = current.value();
-
-        if (gapPolicy.equals(BucketHelpers.GapPolicy.SKIP)) {
-            // if we are ignoring, movavg could go up (holt) or stay the same (simple, linear, ewma)
-            assertThat(Double.compare(lastValue, currentValue), lessThanOrEqualTo(0));
-        } else if (gapPolicy.equals(BucketHelpers.GapPolicy.INSERT_ZEROS)) {
-            // If we insert zeros, this should always increase the moving avg since the last bucket has a real value
-            assertThat(Double.compare(lastValue, currentValue), equalTo(-1));
-        }
-    }
-
-    /**
-     * Big gap, but with prediction at the end.
-     */
-    @Test
-    public void testGiantGapWithPredict() {
-        int numPredictions = randomIntBetween(1, 10);
-
-        SearchResponse response = client()
-                .prepareSearch("idx").setTypes("gap_type")
-                .addAggregation(
-                        histogram("histo").field(INTERVAL_FIELD).interval(1).extendedBounds(0L, 49L)
-                                .subAggregation(min("the_metric").field(GAP_FIELD))
-                                .subAggregation(movingAvg("movavg_values")
-                                        .window(windowSize)
-                                        .modelBuilder(randomModelBuilder())
-                                        .gapPolicy(gapPolicy)
-                                        .setBucketsPaths("the_metric")
-                                        .predict(numPredictions))
-                ).execute().actionGet();
-
-        assertSearchResponse(response);
-
-        InternalHistogram<Bucket> histo = response.getAggregations().get("histo");
-        assertThat(histo, notNullValue());
-        assertThat(histo.getName(), equalTo("histo"));
-        List<? extends Bucket> buckets = histo.getBuckets();
-
-        assertThat("Size of buckets array is not correct.", buckets.size(), equalTo(50 + numPredictions));
-
-        double lastValue = ((SimpleValue)(buckets.get(0).getAggregations().get("movavg_values"))).value();
-        assertThat(Double.compare(lastValue, 0.0d), greaterThanOrEqualTo(0));
-
-        double currentValue;
-        for (int i = 1; i < 49; i++) {
-            SimpleValue current = buckets.get(i).getAggregations().get("movavg_values");
-            if (current != null) {
-                currentValue = current.value();
-
-                // Since there are only two values in this test, at the beginning and end, the moving average should
-                // decrease every step (until it reaches zero). Crude way to check that it's doing the right thing
-                // without actually verifying the computed values. Should work for all types of moving avgs and
-                // gap policies
-                assertThat(Double.compare(lastValue, currentValue), greaterThanOrEqualTo(0));
-                lastValue = currentValue;
-            }
-        }
-
-        SimpleValue current = buckets.get(49).getAggregations().get("movavg_values");
-        assertThat(current, notNullValue());
-        currentValue = current.value();
-
-        if (gapPolicy.equals(BucketHelpers.GapPolicy.SKIP)) {
-            // if we are ignoring, movavg could go up (holt) or stay the same (simple, linear, ewma)
-            assertThat(Double.compare(lastValue, currentValue), lessThanOrEqualTo(0));
-        } else if (gapPolicy.equals(BucketHelpers.GapPolicy.INSERT_ZEROS)) {
-            // If we insert zeros, this should always increase the moving avg since the last bucket has a real value
-            assertThat(Double.compare(lastValue, currentValue), equalTo(-1));
-        }
-
-        // Now check predictions
-        for (int i = 50; i < 50 + numPredictions; i++) {
-            // Unclear at this point which direction the predictions will go, just verify they are
-            // not null, and that we don't have the_metric anymore
-            assertThat((buckets.get(i).getAggregations().get("movavg_values")), notNullValue());
-            assertThat((buckets.get(i).getAggregations().get("the_metric")), nullValue());
-        }
-    }
-
-    /**
-     * This test filters the "gap" data so that the first doc is excluded. This leaves a long stretch of empty
-     * buckets until the final bucket. The moving avg should be zero up until the last bucket, and should work
-     * regardless of mov avg type or gap policy.
-     */
-    @Test
-    public void testLeftGap() {
-        SearchResponse response = client()
-                .prepareSearch("idx").setTypes("gap_type")
-                .addAggregation(
-                        filter("filtered").filter(new RangeQueryBuilder(INTERVAL_FIELD).from(1)).subAggregation(
-                                histogram("histo").field(INTERVAL_FIELD).interval(1).extendedBounds(0L, 49L)
-                                        .subAggregation(randomMetric("the_metric", GAP_FIELD))
-                                        .subAggregation(movingAvg("movavg_values")
-                                                .window(windowSize)
-                                                .modelBuilder(randomModelBuilder())
-                                                .gapPolicy(gapPolicy)
-                                                .setBucketsPaths("the_metric"))
-                        ))
-                .execute().actionGet();
-
-        assertSearchResponse(response);
-
-        InternalFilter filtered = response.getAggregations().get("filtered");
-        assertThat(filtered, notNullValue());
-        assertThat(filtered.getName(), equalTo("filtered"));
-
-        InternalHistogram<Bucket> histo = filtered.getAggregations().get("histo");
-        assertThat(histo, notNullValue());
-        assertThat(histo.getName(), equalTo("histo"));
-        List<? extends Bucket> buckets = histo.getBuckets();
-        assertThat("Size of buckets array is not correct.", buckets.size(), equalTo(50));
-
-        double lastValue = 0;
-
-        double currentValue;
-        for (int i = 0; i < 50; i++) {
-            SimpleValue current = buckets.get(i).getAggregations().get("movavg_values");
-            if (current != null) {
-                currentValue = current.value();
-
-                assertThat(Double.compare(lastValue, currentValue), lessThanOrEqualTo(0));
-                lastValue = currentValue;
-            }
-        }
-    }
-
-    @Test
-    public void testLeftGapWithPredict() {
-        int numPredictions = randomIntBetween(1, 10);
-        SearchResponse response = client()
-                .prepareSearch("idx").setTypes("gap_type")
-                .addAggregation(
-                        filter("filtered").filter(new RangeQueryBuilder(INTERVAL_FIELD).from(1)).subAggregation(
-                                histogram("histo").field(INTERVAL_FIELD).interval(1).extendedBounds(0L, 49L)
-                                        .subAggregation(randomMetric("the_metric", GAP_FIELD))
-                                        .subAggregation(movingAvg("movavg_values")
-                                                .window(windowSize)
-                                                .modelBuilder(randomModelBuilder())
-                                                .gapPolicy(gapPolicy)
-                                                .setBucketsPaths("the_metric")
-                                                .predict(numPredictions))
-                        ))
-                .execute().actionGet();
-
-        assertSearchResponse(response);
-
-        InternalFilter filtered = response.getAggregations().get("filtered");
-        assertThat(filtered, notNullValue());
-        assertThat(filtered.getName(), equalTo("filtered"));
-
-        InternalHistogram<Bucket> histo = filtered.getAggregations().get("histo");
-        assertThat(histo, notNullValue());
-        assertThat(histo.getName(), equalTo("histo"));
-        List<? extends Bucket> buckets = histo.getBuckets();
-
-        assertThat("Size of buckets array is not correct.", buckets.size(), equalTo(50 + numPredictions));
-
-        double lastValue = 0;
-
-        double currentValue;
-        for (int i = 0; i < 50; i++) {
-            SimpleValue current = buckets.get(i).getAggregations().get("movavg_values");
-            if (current != null) {
-                currentValue = current.value();
-
-                assertThat(Double.compare(lastValue, currentValue), lessThanOrEqualTo(0));
-                lastValue = currentValue;
-            }
-        }
-
-        // Now check predictions
-        for (int i = 50; i < 50 + numPredictions; i++) {
-            // Unclear at this point which direction the predictions will go, just verify they are
-            // not null, and that we don't have the_metric anymore
-            assertThat((buckets.get(i).getAggregations().get("movavg_values")), notNullValue());
-            assertThat((buckets.get(i).getAggregations().get("the_metric")), nullValue());
-        }
-    }
-
-    /**
-     * This test filters the "gap" data so that the last doc is excluded. This leaves a long stretch of empty
-     * buckets after the first bucket.
-     */
-    @Test
-    public void testRightGap() {
-        SearchResponse response = client()
-                .prepareSearch("idx").setTypes("gap_type")
-                .addAggregation(
-                        filter("filtered").filter(new RangeQueryBuilder(INTERVAL_FIELD).to(1)).subAggregation(
-                                histogram("histo").field(INTERVAL_FIELD).interval(1).extendedBounds(0L, 49L)
-                                        .subAggregation(randomMetric("the_metric", GAP_FIELD))
-                                        .subAggregation(movingAvg("movavg_values")
-                                                .window(windowSize)
-                                                .modelBuilder(randomModelBuilder())
-                                                .gapPolicy(gapPolicy)
-                                                .setBucketsPaths("the_metric"))
-                        ))
-                .execute().actionGet();
-
-        assertSearchResponse(response);
-
-        InternalFilter filtered = response.getAggregations().get("filtered");
-        assertThat(filtered, notNullValue());
-        assertThat(filtered.getName(), equalTo("filtered"));
-
-        InternalHistogram<Bucket> histo = filtered.getAggregations().get("histo");
-        assertThat(histo, notNullValue());
-        assertThat(histo.getName(), equalTo("histo"));
-        List<? extends Bucket> buckets = histo.getBuckets();
-        assertThat("Size of buckets array is not correct.", buckets.size(), equalTo(50));
-
-        SimpleValue current = buckets.get(0).getAggregations().get("movavg_values");
-        assertThat(current, notNullValue());
-
-        double lastValue = current.value();
-
-        double currentValue;
-        for (int i = 1; i < 50; i++) {
-            current = buckets.get(i).getAggregations().get("movavg_values");
-            if (current != null) {
-                currentValue = current.value();
-
-                assertThat(Double.compare(lastValue, currentValue), greaterThanOrEqualTo(0));
-                lastValue = currentValue;
-            }
-        }
-    }
-
-    @Test
-    public void testRightGapWithPredict() {
-        int numPredictions = randomIntBetween(1, 10);
-        SearchResponse response = client()
-                .prepareSearch("idx").setTypes("gap_type")
-                .addAggregation(
-                        filter("filtered").filter(new RangeQueryBuilder(INTERVAL_FIELD).to(1)).subAggregation(
-                                histogram("histo").field(INTERVAL_FIELD).interval(1).extendedBounds(0L, 49L)
-                                        .subAggregation(randomMetric("the_metric", GAP_FIELD))
-                                        .subAggregation(movingAvg("movavg_values")
-                                                .window(windowSize)
-                                                .modelBuilder(randomModelBuilder())
-                                                .gapPolicy(gapPolicy)
-                                                .setBucketsPaths("the_metric")
-                                                .predict(numPredictions))
-                        ))
-                .execute().actionGet();
-
-        assertSearchResponse(response);
-
-        InternalFilter filtered = response.getAggregations().get("filtered");
-        assertThat(filtered, notNullValue());
-        assertThat(filtered.getName(), equalTo("filtered"));
-
-        InternalHistogram<Bucket> histo = filtered.getAggregations().get("histo");
-        assertThat(histo, notNullValue());
-        assertThat(histo.getName(), equalTo("histo"));
-        List<? extends Bucket> buckets = histo.getBuckets();
-
-        // If we are skipping, there will only be predictions at the very beginning and won't append any new buckets
-        if (gapPolicy.equals(BucketHelpers.GapPolicy.SKIP)) {
-            assertThat("Size of buckets array is not correct.", buckets.size(), equalTo(50));
-        } else {
-            assertThat("Size of buckets array is not correct.", buckets.size(), equalTo(50 + numPredictions));
-        }
-
-        // Unlike left-gap tests, we cannot check the slope of prediction for right-gap. E.g. linear will
-        // converge on zero, but holt-linear may trend upwards based on the first value
-        // Just check for non-nullness
-        SimpleValue current = buckets.get(0).getAggregations().get("movavg_values");
-        assertThat(current, notNullValue());
-
-        // If we are skipping, there will only be predictions at the very beginning and won't append any new buckets
-        if (gapPolicy.equals(BucketHelpers.GapPolicy.SKIP)) {
-            // Now check predictions
-            for (int i = 1; i < 1 + numPredictions; i++) {
-                // Unclear at this point which direction the predictions will go, just verify they are
-                // not null
-                assertThat(buckets.get(i).getDocCount(), equalTo(0L));
-                assertThat((buckets.get(i).getAggregations().get("movavg_values")), notNullValue());
-            }
-        } else {
-            // Otherwise we'll have some predictions at the end
-            for (int i = 50; i < 50 + numPredictions; i++) {
-                // Unclear at this point which direction the predictions will go, just verify they are
-                // not null
-                assertThat(buckets.get(i).getDocCount(), equalTo(0L));
-                assertThat((buckets.get(i).getAggregations().get("movavg_values")), notNullValue());
-            }
-        }
-
-    }
-
     @Test
     public void testHoltWintersNotEnoughData() {
         try {
@@ -1288,8 +942,7 @@ public class MovAvgTests extends ElasticsearchIntegrationTest {
         assertThat(avgAgg.value(), equalTo(10d));
 
         SimpleValue movAvgAgg = bucket.getAggregations().get("avg_movavg");
-        assertThat(movAvgAgg, notNullValue());
-        assertThat(movAvgAgg.value(), equalTo(10d));
+        assertThat(movAvgAgg, nullValue());
 
         Derivative deriv = bucket.getAggregations().get("deriv");
         assertThat(deriv, nullValue());
@@ -1297,7 +950,28 @@ public class MovAvgTests extends ElasticsearchIntegrationTest {
         SimpleValue derivMovAvg = bucket.getAggregations().get("deriv_movavg");
         assertThat(derivMovAvg, nullValue());
 
-        for (int i = 1; i < 12; i++) {
+        // Second bucket
+        bucket = buckets.get(1);
+        assertThat(bucket, notNullValue());
+        assertThat((long) bucket.getKey(), equalTo(1L));
+        assertThat(bucket.getDocCount(), equalTo(1l));
+
+        avgAgg = bucket.getAggregations().get("avg");
+        assertThat(avgAgg, notNullValue());
+        assertThat(avgAgg.value(), equalTo(10d));
+
+        deriv = bucket.getAggregations().get("deriv");
+        assertThat(deriv, notNullValue());
+        assertThat(deriv.value(), equalTo(0d));
+
+        movAvgAgg = bucket.getAggregations().get("avg_movavg");
+        assertThat(movAvgAgg, notNullValue());
+        assertThat(movAvgAgg.value(), equalTo(10d));
+
+        derivMovAvg = bucket.getAggregations().get("deriv_movavg");
+        assertThat(derivMovAvg, Matchers.nullValue()); // still null because of movavg delay
+
+        for (int i = 2; i < 12; i++) {
             bucket = buckets.get(i);
             assertThat(bucket, notNullValue());
             assertThat((long) bucket.getKey(), equalTo((long) i));
@@ -21,7 +21,6 @@ package org.elasticsearch.search.aggregations.pipeline.moving.avg;
 
 import com.google.common.collect.EvictingQueue;
-
 import org.elasticsearch.search.SearchParseException;
 import org.elasticsearch.search.aggregations.pipeline.movavg.models.*;
 import org.elasticsearch.test.ElasticsearchTestCase;
@@ -47,7 +46,10 @@ public class MovAvgUnitTests extends ElasticsearchTestCase {
             double randValue = randomDouble();
             double expected = 0;
 
-            window.offer(randValue);
+            if (i == 0) {
+                window.offer(randValue);
+                continue;
+            }
 
             for (double value : window) {
                 expected += value;
@@ -56,6 +58,7 @@ public class MovAvgUnitTests extends ElasticsearchTestCase {
 
             double actual = model.next(window);
             assertThat(Double.compare(expected, actual), equalTo(0));
+            window.offer(randValue);
         }
     }
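The unit tests adopt the same ordering: the first value is offered and the iteration skipped, and every later assertion runs against a window that does not yet contain the current value, which is only offered after the check. A stand-alone sketch of the reordered loop (hypothetical class, simple average in place of model.next):

import java.util.ArrayDeque;
import java.util.Deque;

// Reordered test loop: assert against the prior window, then offer.
public class TestLoopSketch {
    public static void main(String[] args) {
        Deque<Double> window = new ArrayDeque<>();
        double[] series = {1.0, 2.0, 3.0};
        for (int i = 0; i < series.length; i++) {
            double randValue = series[i];
            if (i == 0) {
                window.offer(randValue);
                continue; // nothing to assert on the first value
            }
            double expected = 0;
            for (double v : window) expected += v;
            expected /= window.size(); // model.next(window) analogue
            System.out.println("expected=" + expected + " (before " + randValue + ")");
            window.offer(randValue); // current value enters afterwards
        }
    }
}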
@@ -64,7 +67,7 @@ public class MovAvgUnitTests extends ElasticsearchTestCase {
         MovAvgModel model = new SimpleModel();
 
         int windowSize = randomIntBetween(1, 50);
-        int numPredictions = randomIntBetween(1,50);
+        int numPredictions = randomIntBetween(1, 50);
 
         EvictingQueue<Double> window = EvictingQueue.create(windowSize);
         for (int i = 0; i < windowSize; i++) {
@@ -73,13 +76,12 @@ public class MovAvgUnitTests extends ElasticsearchTestCase {
         double actual[] = model.predict(window, numPredictions);
 
         double expected[] = new double[numPredictions];
-        for (int i = 0; i < numPredictions; i++) {
-            for (double value : window) {
-                expected[i] += value;
-            }
-            expected[i] /= window.size();
-            window.offer(expected[i]);
+        double t = 0;
+        for (double value : window) {
+            t += value;
         }
+        t /= window.size();
+        Arrays.fill(expected, t);
 
         for (int i = 0; i < numPredictions; i++) {
             assertThat(Double.compare(expected[i], actual[i]), equalTo(0));
@@ -96,7 +98,11 @@ public class MovAvgUnitTests extends ElasticsearchTestCase {
         EvictingQueue<Double> window = EvictingQueue.create(windowSize);
         for (int i = 0; i < numValues; i++) {
             double randValue = randomDouble();
-            window.offer(randValue);
+
+            if (i == 0) {
+                window.offer(randValue);
+                continue;
+            }
 
             double avg = 0;
             long totalWeight = 1;
@@ -110,6 +116,7 @@ public class MovAvgUnitTests extends ElasticsearchTestCase {
             double expected = avg / totalWeight;
             double actual = model.next(window);
             assertThat(Double.compare(expected, actual), equalTo(0));
+            window.offer(randValue);
         }
     }
@@ -127,19 +134,17 @@ public class MovAvgUnitTests extends ElasticsearchTestCase {
         double actual[] = model.predict(window, numPredictions);
         double expected[] = new double[numPredictions];
 
-        for (int i = 0; i < numPredictions; i++) {
-            double avg = 0;
-            long totalWeight = 1;
-            long current = 1;
+        double avg = 0;
+        long totalWeight = 1;
+        long current = 1;
 
-            for (double value : window) {
-                avg += value * current;
-                totalWeight += current;
-                current += 1;
-            }
-            expected[i] = avg / totalWeight;
-            window.offer(expected[i]);
+        for (double value : window) {
+            avg += value * current;
+            totalWeight += current;
+            current += 1;
         }
+        avg = avg / totalWeight;
+        Arrays.fill(expected, avg);
 
         for (int i = 0; i < numPredictions; i++) {
             assertThat(Double.compare(expected[i], actual[i]), equalTo(0));
@@ -157,7 +162,11 @@ public class MovAvgUnitTests extends ElasticsearchTestCase {
         EvictingQueue<Double> window = EvictingQueue.create(windowSize);
         for (int i = 0; i < numValues; i++) {
             double randValue = randomDouble();
-            window.offer(randValue);
+
+            if (i == 0) {
+                window.offer(randValue);
+                continue;
+            }
 
             double avg = 0;
             boolean first = true;
@@ -173,6 +182,7 @@ public class MovAvgUnitTests extends ElasticsearchTestCase {
             double expected = avg;
             double actual = model.next(window);
             assertThat(Double.compare(expected, actual), equalTo(0));
+            window.offer(randValue);
         }
     }
@@ -191,21 +201,18 @@ public class MovAvgUnitTests extends ElasticsearchTestCase {
         double actual[] = model.predict(window, numPredictions);
         double expected[] = new double[numPredictions];
 
-        for (int i = 0; i < numPredictions; i++) {
-            double avg = 0;
-            boolean first = true;
+        double avg = 0;
+        boolean first = true;
 
-            for (double value : window) {
-                if (first) {
-                    avg = value;
-                    first = false;
-                } else {
-                    avg = (value * alpha) + (avg * (1 - alpha));
-                }
-            }
-            expected[i] = avg;
-            window.offer(expected[i]);
+        for (double value : window) {
+            if (first) {
+                avg = value;
+                first = false;
+            } else {
+                avg = (value * alpha) + (avg * (1 - alpha));
+            }
         }
+        Arrays.fill(expected, avg);
 
         for (int i = 0; i < numPredictions; i++) {
             assertThat(Double.compare(expected[i], actual[i]), equalTo(0));
@@ -224,7 +231,11 @@ public class MovAvgUnitTests extends ElasticsearchTestCase {
         EvictingQueue<Double> window = EvictingQueue.create(windowSize);
         for (int i = 0; i < numValues; i++) {
             double randValue = randomDouble();
-            window.offer(randValue);
+
+            if (i == 0) {
+                window.offer(randValue);
+                continue;
+            }
 
             double s = 0;
             double last_s = 0;
@@ -253,6 +264,7 @@ public class MovAvgUnitTests extends ElasticsearchTestCase {
             double expected = s + (0 * b) ;
             double actual = model.next(window);
             assertThat(Double.compare(expected, actual), equalTo(0));
+            window.offer(randValue);
         }
     }