Aggregations: add cost minimizer to moving_avg aggregation

This commit is contained in:
Zachary Tong 2015-06-22 19:46:07 -04:00
parent cbcc553912
commit 0f76e656dd
12 changed files with 810 additions and 101 deletions

View File

@ -36,6 +36,7 @@ public class MovAvgBuilder extends PipelineAggregatorBuilder<MovAvgBuilder> {
private MovAvgModelBuilder modelBuilder; private MovAvgModelBuilder modelBuilder;
private Integer window; private Integer window;
private Integer predict; private Integer predict;
private Boolean minimize;
public MovAvgBuilder(String name) { public MovAvgBuilder(String name) {
super(name, MovAvgPipelineAggregator.TYPE.name()); super(name, MovAvgPipelineAggregator.TYPE.name());
@ -94,6 +95,18 @@ public class MovAvgBuilder extends PipelineAggregatorBuilder<MovAvgBuilder> {
return this; return this;
} }
/**
* Determines if the model should be fit to the data using a cost
* minimizing algorithm.
*
* If this method is never called the <code>minimize</code> field stays unset and is
* omitted from the generated request; the parser then falls back to the
* model's own default.
*
* @param minimize If the model should be fit to the underlying data
* @return Returns the builder to continue chaining
*/
public MovAvgBuilder minimize(boolean minimize) {
this.minimize = minimize;
return this;
}
@Override @Override
protected XContentBuilder internalXContent(XContentBuilder builder, Params params) throws IOException { protected XContentBuilder internalXContent(XContentBuilder builder, Params params) throws IOException {
@ -112,6 +125,9 @@ public class MovAvgBuilder extends PipelineAggregatorBuilder<MovAvgBuilder> {
if (predict != null) { if (predict != null) {
builder.field(MovAvgParser.PREDICT.getPreferredName(), predict); builder.field(MovAvgParser.PREDICT.getPreferredName(), predict);
} }
if (minimize != null) {
builder.field(MovAvgParser.MINIMIZE.getPreferredName(), minimize);
}
return builder; return builder;
} }

View File

@ -44,6 +44,7 @@ public class MovAvgParser implements PipelineAggregator.Parser {
public static final ParseField WINDOW = new ParseField("window"); public static final ParseField WINDOW = new ParseField("window");
public static final ParseField SETTINGS = new ParseField("settings"); public static final ParseField SETTINGS = new ParseField("settings");
public static final ParseField PREDICT = new ParseField("predict"); public static final ParseField PREDICT = new ParseField("predict");
public static final ParseField MINIMIZE = new ParseField("minimize");
private final MovAvgModelParserMapper movAvgModelParserMapper; private final MovAvgModelParserMapper movAvgModelParserMapper;
@ -69,6 +70,7 @@ public class MovAvgParser implements PipelineAggregator.Parser {
Map<String, Object> settings = null; Map<String, Object> settings = null;
String model = "simple"; String model = "simple";
int predict = 0; int predict = 0;
Boolean minimize = null;
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) { if (token == XContentParser.Token.FIELD_NAME) {
@ -124,6 +126,13 @@ public class MovAvgParser implements PipelineAggregator.Parser {
throw new SearchParseException(context, "Unknown key for a " + token + " in [" + pipelineAggregatorName + "]: [" throw new SearchParseException(context, "Unknown key for a " + token + " in [" + pipelineAggregatorName + "]: ["
+ currentFieldName + "].", parser.getTokenLocation()); + currentFieldName + "].", parser.getTokenLocation());
} }
} else if (token == XContentParser.Token.VALUE_BOOLEAN) {
if (context.parseFieldMatcher().match(currentFieldName, MINIMIZE)) {
minimize = parser.booleanValue();
} else {
throw new SearchParseException(context, "Unknown key for a " + token + " in [" + pipelineAggregatorName + "]: ["
+ currentFieldName + "].", parser.getTokenLocation());
}
} else { } else {
throw new SearchParseException(context, "Unexpected token " + token + " in [" + pipelineAggregatorName + "].", throw new SearchParseException(context, "Unexpected token " + token + " in [" + pipelineAggregatorName + "].",
parser.getTokenLocation()); parser.getTokenLocation());
@ -155,9 +164,17 @@ public class MovAvgParser implements PipelineAggregator.Parser {
throw new SearchParseException(context, "Could not parse settings for model [" + model + "].", null, exception); throw new SearchParseException(context, "Could not parse settings for model [" + model + "].", null, exception);
} }
// If the user doesn't set a preference for cost minimization, ask what the model prefers
if (minimize == null) {
minimize = movAvgModel.minimizeByDefault();
} else if (minimize && !movAvgModel.canBeMinimized()) {
// If the user asks to minimize, but this model doesn't support it, throw exception
throw new SearchParseException(context, "The [" + model + "] model cannot be minimized.", null);
}
return new MovAvgPipelineAggregator.Factory(pipelineAggregatorName, bucketsPaths, formatter, gapPolicy, window, predict, return new MovAvgPipelineAggregator.Factory(pipelineAggregatorName, bucketsPaths, formatter, gapPolicy, window, predict,
movAvgModel); movAvgModel, minimize);
} }

View File

@ -48,6 +48,7 @@ import org.joda.time.DateTime;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.ListIterator;
import java.util.Map; import java.util.Map;
import static org.elasticsearch.search.aggregations.pipeline.BucketHelpers.resolveBucketValue; import static org.elasticsearch.search.aggregations.pipeline.BucketHelpers.resolveBucketValue;
@ -81,18 +82,20 @@ public class MovAvgPipelineAggregator extends PipelineAggregator {
private int window; private int window;
private MovAvgModel model; private MovAvgModel model;
private int predict; private int predict;
private boolean minimize;
public MovAvgPipelineAggregator() { public MovAvgPipelineAggregator() {
} }
public MovAvgPipelineAggregator(String name, String[] bucketsPaths, ValueFormatter formatter, GapPolicy gapPolicy, public MovAvgPipelineAggregator(String name, String[] bucketsPaths, ValueFormatter formatter, GapPolicy gapPolicy,
int window, int predict, MovAvgModel model, Map<String, Object> metadata) { int window, int predict, MovAvgModel model, boolean minimize, Map<String, Object> metadata) {
super(name, bucketsPaths, metadata); super(name, bucketsPaths, metadata);
this.formatter = formatter; this.formatter = formatter;
this.gapPolicy = gapPolicy; this.gapPolicy = gapPolicy;
this.window = window; this.window = window;
this.model = model; this.model = model;
this.predict = predict; this.predict = predict;
this.minimize = minimize;
} }
@Override @Override
@ -113,6 +116,12 @@ public class MovAvgPipelineAggregator extends PipelineAggregator {
int lastValidPosition = 0; int lastValidPosition = 0;
int counter = 0; int counter = 0;
// Do we need to fit the model parameters to the data?
if (minimize) {
assert (model.canBeMinimized());
model = minimize(buckets, histo, model);
}
for (InternalHistogram.Bucket bucket : buckets) { for (InternalHistogram.Bucket bucket : buckets) {
Double thisBucketValue = resolveBucketValue(histo, bucket, bucketsPaths()[0], gapPolicy); Double thisBucketValue = resolveBucketValue(histo, bucket, bucketsPaths()[0], gapPolicy);
@ -194,6 +203,60 @@ public class MovAvgPipelineAggregator extends PipelineAggregator {
return factory.create(newBuckets, histo); return factory.create(newBuckets, histo);
} }
/**
 * Tries to fit the model's coefficients to the tail of the series.
 *
 * The newest <code>window</code> usable bucket values become the test set and the
 * <code>window</code> usable values immediately before them become the training set.
 * If either set cannot be filled completely, the model is returned unchanged.
 *
 * @param buckets Histogram buckets, in series order
 * @param histo   Histogram the buckets belong to (needed to resolve bucket values)
 * @param model   Model carrying the starting coefficients
 * @return The model produced by the minimizer, or <code>model</code> itself when there
 *         is not enough data
 */
private MovAvgModel minimize(List<? extends InternalHistogram.Bucket> buckets, InternalHistogram histo, MovAvgModel model) {
    EvictingQueue<Double> trainingWindow = EvictingQueue.create(window);

    // Walk from the end of the series: we cannot know up front how many buckets are gaps.
    ListIterator<? extends InternalHistogram.Bucket> cursor = buckets.listIterator(buckets.size());

    // Newest values first: these are what the predictions get compared against.
    double[] test = collectPreviousValues(cursor, histo);
    if (test == null) {
        // Not enough data to minimize, keep the starting coefficients.
        return model;
    }

    // The same cursor keeps moving backwards, so the training set directly precedes the test set.
    double[] train = collectPreviousValues(cursor, histo);
    if (train == null) {
        return model;
    }

    // The array is already in series order; feed it to the queue oldest-first.
    for (int i = 0; i < train.length; i++) {
        trainingWindow.add(train[i]);
    }

    return SimulatedAnealingMinimizer.minimize(model, trainingWindow, test);
}

/**
 * Pulls up to <code>window</code> usable (non-null, non-NaN) values by stepping the iterator
 * backwards, storing them so the resulting array is in series order.
 *
 * @return the filled array, or <code>null</code> if the iterator ran out before
 *         <code>window</code> values were found
 */
private double[] collectPreviousValues(ListIterator<? extends InternalHistogram.Bucket> cursor, InternalHistogram histo) {
    double[] collected = new double[window];
    int filled = 0;

    while (filled < window && cursor.hasPrevious()) {
        Double value = resolveBucketValue(histo, cursor.previous(), bucketsPaths()[0], gapPolicy);
        if (value != null && !value.isNaN()) {
            collected[window - filled - 1] = value;
            filled++;
        }
    }

    return filled < window ? null : collected;
}
@Override @Override
public void doReadFrom(StreamInput in) throws IOException { public void doReadFrom(StreamInput in) throws IOException {
formatter = ValueFormatterStreams.readOptional(in); formatter = ValueFormatterStreams.readOptional(in);
@ -201,6 +264,7 @@ public class MovAvgPipelineAggregator extends PipelineAggregator {
window = in.readVInt(); window = in.readVInt();
predict = in.readVInt(); predict = in.readVInt();
model = MovAvgModelStreams.read(in); model = MovAvgModelStreams.read(in);
minimize = in.readBoolean();
} }
@ -211,6 +275,7 @@ public class MovAvgPipelineAggregator extends PipelineAggregator {
out.writeVInt(window); out.writeVInt(window);
out.writeVInt(predict); out.writeVInt(predict);
model.writeTo(out); model.writeTo(out);
out.writeBoolean(minimize);
} }
@ -221,20 +286,22 @@ public class MovAvgPipelineAggregator extends PipelineAggregator {
private int window; private int window;
private MovAvgModel model; private MovAvgModel model;
private int predict; private int predict;
private boolean minimize;
public Factory(String name, String[] bucketsPaths, ValueFormatter formatter, GapPolicy gapPolicy, public Factory(String name, String[] bucketsPaths, ValueFormatter formatter, GapPolicy gapPolicy,
int window, int predict, MovAvgModel model) { int window, int predict, MovAvgModel model, boolean minimize) {
super(name, TYPE.name(), bucketsPaths); super(name, TYPE.name(), bucketsPaths);
this.formatter = formatter; this.formatter = formatter;
this.gapPolicy = gapPolicy; this.gapPolicy = gapPolicy;
this.window = window; this.window = window;
this.model = model; this.model = model;
this.predict = predict; this.predict = predict;
this.minimize = minimize;
} }
@Override @Override
protected PipelineAggregator createInternal(Map<String, Object> metaData) throws IOException { protected PipelineAggregator createInternal(Map<String, Object> metaData) throws IOException {
return new MovAvgPipelineAggregator(name, bucketsPaths, formatter, gapPolicy, window, predict, model, metaData); return new MovAvgPipelineAggregator(name, bucketsPaths, formatter, gapPolicy, window, predict, model, minimize, metaData);
} }
@Override @Override

View File

@ -0,0 +1,125 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.aggregations.pipeline.movavg;
import com.google.common.collect.EvictingQueue;
import org.elasticsearch.search.aggregations.pipeline.movavg.models.MovAvgModel;
/**
 * Fits a MovAvgModel to a series by minimizing a prediction cost.
 *
 * The search is a naive simulated annealing: at every step a neighboring model is
 * requested from the current one, its predictions over the training data are compared
 * with the test data, and the summed absolute deviation is taken as its cost.
 *
 * A cheaper candidate is always accepted. A more expensive candidate is accepted with a
 * probability that shrinks as the temperature drops, which lets the search roam widely
 * early on and become increasingly greedy later, theoretically homing in on a global
 * minimum.
 */
public class SimulatedAnealingMinimizer {

    /**
     * Runs the annealing schedule and returns the cheapest model that was accepted along the way.
     *
     * @param model The model to start from; also the result if nothing cheaper is found
     * @param train Training values handed to each candidate model to generate predictions from
     * @param test  Values the predictions are compared against to derive a cost
     * @return The lowest-cost model encountered
     */
    public static MovAvgModel minimize(MovAvgModel model, EvictingQueue<Double> train, double[] test) {
        final double minTemp = 0.0001;  // the schedule stops once the temperature falls to this
        final double alpha = 0.9;       // cooling factor applied after every round
        final int iterations = 100;     // candidates tried per temperature

        MovAvgModel currentModel = model;
        double currentCost = cost(model, train, test);

        MovAvgModel bestModel = model;
        double bestCost = currentCost;

        for (double temp = 1; temp > minTemp; temp *= alpha) {
            for (int step = 0; step < iterations; step++) {
                MovAvgModel candidate = currentModel.neighboringModel();
                double candidateCost = cost(candidate, train, test);

                double ap = acceptanceProbability(currentCost, candidateCost, temp);
                if (ap > Math.random()) {
                    currentModel = candidate;
                    currentCost = candidateCost;

                    // Remember the overall winner separately: the walk may later move uphill.
                    if (candidateCost < bestCost) {
                        bestCost = candidateCost;
                        bestModel = candidate;
                    }
                }
            }
        }

        return bestModel;
    }

    /**
     * Probability of moving from the old solution to the new one.
     *
     * @param oldCost Cost of the current model
     * @param newCost Cost of the candidate model
     * @param temp    Current annealing temperature
     * @return 1.0 when the candidate is strictly cheaper, otherwise
     *         <code>exp(-(newCost - oldCost) / temp)</code>
     */
    private static double acceptanceProbability(double oldCost, double newCost, double temp) {
        if (newCost < oldCost) {
            return 1.0;
        }
        return Math.exp(-(newCost - oldCost) / temp);
    }

    /**
     * Cost of a model: the sum of absolute differences between the test values and the
     * model's predictions generated from the training values. Not scale free.
     *
     * @param model The model being evaluated
     * @param train Training values the model predicts from
     * @param test  Values the predictions are compared against
     * @return The accumulated absolute error
     */
    private static double cost(MovAvgModel model, EvictingQueue<Double> train, double[] test) {
        double[] predictions = model.predict(train, test.length);
        assert(predictions.length == test.length);

        double total = 0;
        for (int idx = 0; idx < predictions.length; idx++) {
            total += Math.abs(test[idx] - predictions[idx]);
        }
        return total;
    }
}

View File

@ -41,16 +41,33 @@ public class EwmaModel extends MovAvgModel {
protected static final ParseField NAME_FIELD = new ParseField("ewma"); protected static final ParseField NAME_FIELD = new ParseField("ewma");
/** /**
* Controls smoothing of data. Alpha = 1 retains no memory of past values * Controls smoothing of data. Also known as "level" value.
* Alpha = 1 retains no memory of past values
* (e.g. random walk), while alpha = 0 retains infinite memory of past values (e.g. * (e.g. random walk), while alpha = 0 retains infinite memory of past values (e.g.
* mean of the series). Useful values are somewhere in between * mean of the series).
*/ */
private double alpha; private final double alpha;
public EwmaModel(double alpha) { public EwmaModel(double alpha) {
this.alpha = alpha; this.alpha = alpha;
} }
/**
* EWMA has a single coefficient (alpha) that {@link #neighboringModel()} can
* re-draw, so it reports itself as minimizable.
*/
@Override
public boolean canBeMinimized() {
return true;
}
/**
 * Builds a new EWMA model whose alpha is a fresh value from {@link Math#random()};
 * this model's own alpha is not consulted.
 */
@Override
public MovAvgModel neighboringModel() {
    return new EwmaModel(Math.random());
}
/**
* Creates a new EwmaModel carrying the same alpha as this one.
*/
@Override
public MovAvgModel clone() {
return new EwmaModel(this.alpha);
}
@Override @Override
protected <T extends Number> double[] doPredict(Collection<T> values, int numPredictions) { protected <T extends Number> double[] doPredict(Collection<T> values, int numPredictions) {
double[] predictions = new double[numPredictions]; double[] predictions = new double[numPredictions];
@ -105,8 +122,7 @@ public class EwmaModel extends MovAvgModel {
@Override @Override
public MovAvgModel parse(@Nullable Map<String, Object> settings, String pipelineName, int windowSize, ParseFieldMatcher parseFieldMatcher) throws ParseException { public MovAvgModel parse(@Nullable Map<String, Object> settings, String pipelineName, int windowSize, ParseFieldMatcher parseFieldMatcher) throws ParseException {
double alpha = parseDoubleParam(settings, "alpha", 0.5); double alpha = parseDoubleParam(settings, "alpha", 0.3);
return new EwmaModel(alpha); return new EwmaModel(alpha);
} }
@ -114,7 +130,7 @@ public class EwmaModel extends MovAvgModel {
public static class EWMAModelBuilder implements MovAvgModelBuilder { public static class EWMAModelBuilder implements MovAvgModelBuilder {
private double alpha = 0.5; private Double alpha;
/** /**
* Alpha controls the smoothing of the data. Alpha = 1 retains no memory of past values * Alpha controls the smoothing of the data. Alpha = 1 retains no memory of past values
@ -134,7 +150,10 @@ public class EwmaModel extends MovAvgModel {
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.field(MovAvgParser.MODEL.getPreferredName(), NAME_FIELD.getPreferredName()); builder.field(MovAvgParser.MODEL.getPreferredName(), NAME_FIELD.getPreferredName());
builder.startObject(MovAvgParser.SETTINGS.getPreferredName()); builder.startObject(MovAvgParser.SETTINGS.getPreferredName());
if (alpha != null) {
builder.field("alpha", alpha); builder.field("alpha", alpha);
}
builder.endObject(); builder.endObject();
return builder; return builder;
} }

View File

@ -39,22 +39,50 @@ public class HoltLinearModel extends MovAvgModel {
protected static final ParseField NAME_FIELD = new ParseField("holt"); protected static final ParseField NAME_FIELD = new ParseField("holt");
/** /**
* Controls smoothing of data. Alpha = 1 retains no memory of past values * Controls smoothing of data. Also known as "level" value.
* Alpha = 1 retains no memory of past values
* (e.g. random walk), while alpha = 0 retains infinite memory of past values (e.g. * (e.g. random walk), while alpha = 0 retains infinite memory of past values (e.g.
* mean of the series). Useful values are somewhere in between * mean of the series).
*/ */
private double alpha; private final double alpha;
/** /**
* Equivalent to <code>alpha</code>, but controls the smoothing of the trend instead of the data * Controls smoothing of trend.
* Beta = 1 retains no memory of past values
* (e.g. random walk), while beta = 0 retains infinite memory of past values (e.g.
* mean of the series).
*/ */
private double beta; private final double beta;
public HoltLinearModel(double alpha, double beta) { public HoltLinearModel(double alpha, double beta) {
this.alpha = alpha; this.alpha = alpha;
this.beta = beta; this.beta = beta;
} }
/**
* Holt-linear has two coefficients (alpha and beta) that
* {@link #neighboringModel()} can mutate, so it reports itself as minimizable.
*/
@Override
public boolean canBeMinimized() {
return true;
}
/**
 * Builds a copy of this model in which exactly one coefficient, chosen at random
 * between alpha and beta, is replaced by a fresh value from {@link Math#random()}.
 */
@Override
public MovAvgModel neighboringModel() {
    // Draw order matters for reproducibility: replacement value first, then the selector.
    final double mutated = Math.random();
    final int target = (int) (Math.random() * 2);

    if (target == 1) {
        return new HoltLinearModel(this.alpha, mutated);
    }

    // Only 0 should remain; anything else falls back to mutating alpha.
    assert (target == 0): "Random value fell outside of range [0-1]";
    return new HoltLinearModel(mutated, this.beta);
}
/**
* Creates a new HoltLinearModel carrying the same alpha and beta as this one.
*/
@Override
public MovAvgModel clone() {
return new HoltLinearModel(this.alpha, this.beta);
}
/** /**
* Predicts the next `n` values in the series, using the smoothing model to generate new values. * Predicts the next `n` values in the series, using the smoothing model to generate new values.
* Unlike the other moving averages, Holt-Linear has forecasting/prediction built into the algorithm. * Unlike the other moving averages, Holt-Linear has forecasting/prediction built into the algorithm.
@ -154,16 +182,17 @@ public class HoltLinearModel extends MovAvgModel {
@Override @Override
public MovAvgModel parse(@Nullable Map<String, Object> settings, String pipelineName, int windowSize, ParseFieldMatcher parseFieldMatcher) throws ParseException { public MovAvgModel parse(@Nullable Map<String, Object> settings, String pipelineName, int windowSize, ParseFieldMatcher parseFieldMatcher) throws ParseException {
double alpha = parseDoubleParam(settings, "alpha", 0.5); double alpha = parseDoubleParam(settings, "alpha", 0.3);
double beta = parseDoubleParam(settings, "beta", 0.5); double beta = parseDoubleParam(settings, "beta", 0.1);
return new HoltLinearModel(alpha, beta); return new HoltLinearModel(alpha, beta);
} }
} }
public static class HoltLinearModelBuilder implements MovAvgModelBuilder { public static class HoltLinearModelBuilder implements MovAvgModelBuilder {
private double alpha = 0.5;
private double beta = 0.5; private Double alpha;
private Double beta;
/** /**
* Alpha controls the smoothing of the data. Alpha = 1 retains no memory of past values * Alpha controls the smoothing of the data. Alpha = 1 retains no memory of past values
@ -195,8 +224,15 @@ public class HoltLinearModel extends MovAvgModel {
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.field(MovAvgParser.MODEL.getPreferredName(), NAME_FIELD.getPreferredName()); builder.field(MovAvgParser.MODEL.getPreferredName(), NAME_FIELD.getPreferredName());
builder.startObject(MovAvgParser.SETTINGS.getPreferredName()); builder.startObject(MovAvgParser.SETTINGS.getPreferredName());
if (alpha != null) {
builder.field("alpha", alpha); builder.field("alpha", alpha);
}
if (beta != null) {
builder.field("beta", beta); builder.field("beta", beta);
}
builder.endObject(); builder.endObject();
return builder; return builder;
} }

View File

@ -41,6 +41,47 @@ public class HoltWintersModel extends MovAvgModel {
protected static final ParseField NAME_FIELD = new ParseField("holt_winters"); protected static final ParseField NAME_FIELD = new ParseField("holt_winters");
/**
* Controls smoothing of data. Also known as "level" value.
* Alpha = 1 retains no memory of past values
* (e.g. random walk), while alpha = 0 retains infinite memory of past values (e.g.
* mean of the series).
*/
private final double alpha;
/**
* Controls smoothing of trend.
* Beta = 1 retains no memory of past values
* (e.g. random walk), while beta = 0 retains infinite memory of past values (e.g.
* mean of the series).
*/
private final double beta;
/**
* Controls smoothing of seasonality.
* Gamma = 1 retains no memory of past values
* (e.g. random walk), while gamma = 0 retains infinite memory of past values (e.g.
* mean of the series).
*/
private final double gamma;
/**
* Periodicity of the data
*/
private final int period;
/**
* Whether this is a multiplicative or additive HW
*/
private final SeasonalityType seasonalityType;
/**
* Padding is used to add a very small amount to values, so that zeroes do not interfere
* with multiplicative seasonality math (e.g. division by zero)
*/
private final boolean pad;
private final double padding;
public enum SeasonalityType { public enum SeasonalityType {
ADDITIVE((byte) 0, "add"), MULTIPLICATIVE((byte) 1, "mult"); ADDITIVE((byte) 0, "add"), MULTIPLICATIVE((byte) 1, "mult");
@ -116,27 +157,6 @@ public class HoltWintersModel extends MovAvgModel {
} }
/**
* Controls smoothing of data. Alpha = 1 retains no memory of past values
* (e.g. random walk), while alpha = 0 retains infinite memory of past values (e.g.
* mean of the series). Useful values are somewhere in between
*/
private double alpha;
/**
* Equivalent to <code>alpha</code>, but controls the smoothing of the trend instead of the data
*/
private double beta;
private double gamma;
private int period;
private SeasonalityType seasonalityType;
private boolean pad;
private double padding;
public HoltWintersModel(double alpha, double beta, double gamma, int period, SeasonalityType seasonalityType, boolean pad) { public HoltWintersModel(double alpha, double beta, double gamma, int period, SeasonalityType seasonalityType, boolean pad) {
this.alpha = alpha; this.alpha = alpha;
this.beta = beta; this.beta = beta;
@ -150,11 +170,41 @@ public class HoltWintersModel extends MovAvgModel {
this.padding = seasonalityType.equals(SeasonalityType.MULTIPLICATIVE) && pad ? 0.0000000001 : 0; this.padding = seasonalityType.equals(SeasonalityType.MULTIPLICATIVE) && pad ? 0.0000000001 : 0;
} }
/**
* Holt-Winters opts in to cost minimization by default, i.e. whenever the
* request does not state a <code>minimize</code> preference.
*/
@Override
public boolean minimizeByDefault() {
return true;
}
@Override @Override
public boolean hasValue(int windowLength) { public boolean canBeMinimized() {
return true;
}
/**
 * Builds a copy of this model in which exactly one coefficient, chosen at random
 * among alpha, beta and gamma, is replaced by a fresh value from {@link Math#random()}.
 * Period, seasonality type and padding are carried over unchanged.
 */
@Override
public MovAvgModel neighboringModel() {
    // Draw order matters for reproducibility: replacement value first, then the selector.
    final double mutated = Math.random();
    final int target = (int) (Math.random() * 3);

    if (target == 1) {
        return new HoltWintersModel(alpha, mutated, gamma, period, seasonalityType, pad);
    }
    if (target == 2) {
        return new HoltWintersModel(alpha, beta, mutated, period, seasonalityType, pad);
    }

    // Only 0 should remain; anything else falls back to mutating alpha.
    assert (target == 0): "Random value fell outside of range [0-2]";
    return new HoltWintersModel(mutated, beta, gamma, period, seasonalityType, pad);
}
/**
* Creates a new HoltWintersModel carrying the same coefficients, period,
* seasonality type and pad flag as this one.
*/
@Override
public MovAvgModel clone() {
return new HoltWintersModel(alpha, beta, gamma, period, seasonalityType, pad);
}
@Override
public boolean hasValue(int valuesAvailable) {
// We need at least (period * 2) data-points (e.g. two "seasons") // We need at least (period * 2) data-points (e.g. two "seasons")
return windowLength >= period * 2; return valuesAvailable >= period * 2;
} }
/** /**
@ -198,7 +248,7 @@ public class HoltWintersModel extends MovAvgModel {
// Smoothed value // Smoothed value
double s = 0; double s = 0;
double last_s = 0; double last_s;
// Trend value // Trend value
double b = 0; double b = 0;
@ -218,12 +268,11 @@ public class HoltWintersModel extends MovAvgModel {
// Calculate the slopes between first and second season for each period // Calculate the slopes between first and second season for each period
for (int i = 0; i < period; i++) { for (int i = 0; i < period; i++) {
s += vs[i]; s += vs[i];
b += (vs[i] - vs[i + period]) / 2; b += (vs[i + period] - vs[i]) / period;
} }
s /= (double) period; s /= (double) period;
b /= (double) period; b /= (double) period;
last_s = s; last_s = s;
last_b = b;
// Calculate first seasonal // Calculate first seasonal
if (Double.compare(s, 0.0) == 0 || Double.compare(s, -0.0) == 0) { if (Double.compare(s, 0.0) == 0 || Double.compare(s, -0.0) == 0) {
@ -247,7 +296,7 @@ public class HoltWintersModel extends MovAvgModel {
if (seasonalityType.equals(SeasonalityType.MULTIPLICATIVE)) { if (seasonalityType.equals(SeasonalityType.MULTIPLICATIVE)) {
seasonal[i] = gamma * (vs[i] / (last_s + last_b )) + (1 - gamma) * seasonal[i - period]; seasonal[i] = gamma * (vs[i] / (last_s + last_b )) + (1 - gamma) * seasonal[i - period];
} else { } else {
seasonal[i] = gamma * (vs[i] - (last_s + last_b )) + (1 - gamma) * seasonal[i - period]; seasonal[i] = gamma * (vs[i] - (last_s - last_b )) + (1 - gamma) * seasonal[i - period];
} }
last_s = s; last_s = s;
@ -255,18 +304,15 @@ public class HoltWintersModel extends MovAvgModel {
} }
double[] forecastValues = new double[numForecasts]; double[] forecastValues = new double[numForecasts];
int seasonCounter = (values.size() - 1) - period; for (int i = 1; i <= numForecasts; i++) {
int idx = values.size() - period + ((i - 1) % period);
for (int i = 0; i < numForecasts; i++) {
// TODO perhaps pad out seasonal to a power of 2 and use a mask instead of modulo? // TODO perhaps pad out seasonal to a power of 2 and use a mask instead of modulo?
if (seasonalityType.equals(SeasonalityType.MULTIPLICATIVE)) { if (seasonalityType.equals(SeasonalityType.MULTIPLICATIVE)) {
forecastValues[i] = s + (i * b) * seasonal[seasonCounter % values.size()]; forecastValues[i-1] = (s + (i * b)) * seasonal[idx];
} else { } else {
forecastValues[i] = s + (i * b) + seasonal[seasonCounter % values.size()]; forecastValues[i-1] = s + (i * b) + seasonal[idx];
} }
seasonCounter += 1;
} }
return forecastValues; return forecastValues;
@ -312,9 +358,9 @@ public class HoltWintersModel extends MovAvgModel {
@Override @Override
public MovAvgModel parse(@Nullable Map<String, Object> settings, String pipelineName, int windowSize, ParseFieldMatcher parseFieldMatcher) throws ParseException { public MovAvgModel parse(@Nullable Map<String, Object> settings, String pipelineName, int windowSize, ParseFieldMatcher parseFieldMatcher) throws ParseException {
double alpha = parseDoubleParam(settings, "alpha", 0.5); double alpha = parseDoubleParam(settings, "alpha", 0.3);
double beta = parseDoubleParam(settings, "beta", 0.5); double beta = parseDoubleParam(settings, "beta", 0.1);
double gamma = parseDoubleParam(settings, "gamma", 0.5); double gamma = parseDoubleParam(settings, "gamma", 0.3);
int period = parseIntegerParam(settings, "period", 1); int period = parseIntegerParam(settings, "period", 1);
if (windowSize < 2 * period) { if (windowSize < 2 * period) {
@ -345,12 +391,12 @@ public class HoltWintersModel extends MovAvgModel {
public static class HoltWintersModelBuilder implements MovAvgModelBuilder { public static class HoltWintersModelBuilder implements MovAvgModelBuilder {
private double alpha = 0.5; private Double alpha;
private double beta = 0.5; private Double beta;
private double gamma = 0.5; private Double gamma;
private int period = 1; private Integer period;
private SeasonalityType seasonalityType = SeasonalityType.ADDITIVE; private SeasonalityType seasonalityType;
private boolean pad = true; private Boolean pad;
/** /**
* Alpha controls the smoothing of the data. Alpha = 1 retains no memory of past values * Alpha controls the smoothing of the data. Alpha = 1 retains no memory of past values
@ -402,12 +448,31 @@ public class HoltWintersModel extends MovAvgModel {
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.field(MovAvgParser.MODEL.getPreferredName(), NAME_FIELD.getPreferredName()); builder.field(MovAvgParser.MODEL.getPreferredName(), NAME_FIELD.getPreferredName());
builder.startObject(MovAvgParser.SETTINGS.getPreferredName()); builder.startObject(MovAvgParser.SETTINGS.getPreferredName());
builder.field("alpha", alpha);
builder.field("beta", beta); if (alpha != null) {
builder.field("gamma", gamma); builder.field("alpha", alpha);
builder.field("period", period); }
builder.field("type", seasonalityType.getName());
builder.field("pad", pad); if (beta != null) {
builder.field("beta", beta);
}
if (gamma != null) {
builder.field("gamma", gamma);
}
if (period != null) {
builder.field("period", period);
}
if (pad != null) {
builder.field("pad", pad);
}
if (seasonalityType != null) {
builder.field("seasonalityType", seasonalityType);
}
builder.endObject(); builder.endObject();
return builder; return builder;
} }

View File

@ -42,6 +42,22 @@ public class LinearModel extends MovAvgModel {
protected static final ParseField NAME_FIELD = new ParseField("linear"); protected static final ParseField NAME_FIELD = new ParseField("linear");
/**
 * Reports that the linear model is not eligible for cost minimization.
 *
 * @return always {@code false}
 */
@Override
public boolean canBeMinimized() {
return false;
}
/**
 * Returns a newly constructed {@link LinearModel}. Nothing is mutated here:
 * the result is simply a fresh instance built with the no-arg constructor.
 *
 * @return a new {@code LinearModel}
 */
@Override
public MovAvgModel neighboringModel() {
return new LinearModel();
}
/**
 * Produces a copy of this model by constructing a new {@link LinearModel}
 * with the no-arg constructor.
 *
 * @return a new {@code LinearModel}
 */
@Override
public MovAvgModel clone() {
return new LinearModel();
}
@Override @Override
protected <T extends Number> double[] doPredict(Collection<T> values, int numPredictions) { protected <T extends Number> double[] doPredict(Collection<T> values, int numPredictions) {
double[] predictions = new double[numPredictions]; double[] predictions = new double[numPredictions];

View File

@ -32,17 +32,42 @@ import java.util.Map;
public abstract class MovAvgModel { public abstract class MovAvgModel {
/**
 * Should this model be fit to the data via a cost minimizing algorithm by default?
 *
 * @return {@code false} in this base implementation
 */
public boolean minimizeByDefault() {
return false;
}
/**
 * Returns if the model can be cost minimized. Not all models have parameters
 * which can be tuned / optimized.
 *
 * @return {@code true} if the model can be cost minimized, {@code false} otherwise
 */
public abstract boolean canBeMinimized();
/**
 * Generates a "neighboring" model, where one of the tunable parameters has been
 * randomly mutated within the allowed range. Used for minimization
 *
 * @return the neighboring model
 */
public abstract MovAvgModel neighboringModel();
/** /**
* Checks to see this model can produce a new value, without actually running the algo. * Checks to see this model can produce a new value, without actually running the algo.
* This can be used for models that have certain preconditions that need to be met in order * This can be used for models that have certain preconditions that need to be met in order
* to short-circuit execution * to short-circuit execution
* *
* @param windowLength Length of current window * @param valuesAvailable Number of values in the current window of values
* @return Returns `true` if calling next() will produce a value, `false` otherwise * @return Returns `true` if calling next() will produce a value, `false` otherwise
*/ */
public boolean hasValue(int windowLength) { public boolean hasValue(int valuesAvailable) {
// Default implementation can always provide a next() value // Default implementation can always provide a next() value
return windowLength > 0; return valuesAvailable > 0;
} }
/** /**
@ -85,6 +110,8 @@ public abstract class MovAvgModel {
/** /**
* Returns an empty set of predictions, filled with NaNs * Returns an empty set of predictions, filled with NaNs
* @param numPredictions Number of empty predictions to generate
* @return
*/ */
protected double[] emptyPredictions(int numPredictions) { protected double[] emptyPredictions(int numPredictions) {
double[] predictions = new double[numPredictions]; double[] predictions = new double[numPredictions];
@ -100,6 +127,13 @@ public abstract class MovAvgModel {
*/ */
public abstract void writeTo(StreamOutput out) throws IOException; public abstract void writeTo(StreamOutput out) throws IOException;
/**
 * Clone the model, returning an exact copy
 *
 * @return a copy of this model
 */
public abstract MovAvgModel clone();
/** /**
* Abstract class which also provides some concrete parsing functionality. * Abstract class which also provides some concrete parsing functionality.
*/ */

View File

@ -40,6 +40,22 @@ public class SimpleModel extends MovAvgModel {
protected static final ParseField NAME_FIELD = new ParseField("simple"); protected static final ParseField NAME_FIELD = new ParseField("simple");
/**
 * Reports that the simple model is not eligible for cost minimization.
 *
 * @return always {@code false}
 */
@Override
public boolean canBeMinimized() {
return false;
}
/**
 * Returns a newly constructed {@link SimpleModel}. Nothing is mutated here:
 * the result is simply a fresh instance built with the no-arg constructor.
 *
 * @return a new {@code SimpleModel}
 */
@Override
public MovAvgModel neighboringModel() {
return new SimpleModel();
}
/**
 * Produces a copy of this model by constructing a new {@link SimpleModel}
 * with the no-arg constructor.
 *
 * @return a new {@code SimpleModel}
 */
@Override
public MovAvgModel clone() {
return new SimpleModel();
}
@Override @Override
protected <T extends Number> double[] doPredict(Collection<T> values, int numPredictions) { protected <T extends Number> double[] doPredict(Collection<T> values, int numPredictions) {
double[] predictions = new double[numPredictions]; double[] predictions = new double[numPredictions];

View File

@ -25,8 +25,7 @@ import com.google.common.collect.EvictingQueue;
import org.elasticsearch.action.index.IndexRequestBuilder; import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.search.SearchPhaseExecutionException; import org.elasticsearch.action.search.SearchPhaseExecutionException;
import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.index.query.RangeQueryBuilder; import org.elasticsearch.search.SearchParseException;
import org.elasticsearch.search.aggregations.bucket.filter.InternalFilter;
import org.elasticsearch.search.aggregations.bucket.histogram.Histogram; import org.elasticsearch.search.aggregations.bucket.histogram.Histogram;
import org.elasticsearch.search.aggregations.bucket.histogram.InternalHistogram; import org.elasticsearch.search.aggregations.bucket.histogram.InternalHistogram;
import org.elasticsearch.search.aggregations.bucket.histogram.InternalHistogram.Bucket; import org.elasticsearch.search.aggregations.bucket.histogram.InternalHistogram.Bucket;
@ -45,7 +44,6 @@ import java.util.*;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.search.aggregations.AggregationBuilders.avg; import static org.elasticsearch.search.aggregations.AggregationBuilders.avg;
import static org.elasticsearch.search.aggregations.AggregationBuilders.filter;
import static org.elasticsearch.search.aggregations.AggregationBuilders.histogram; import static org.elasticsearch.search.aggregations.AggregationBuilders.histogram;
import static org.elasticsearch.search.aggregations.AggregationBuilders.max; import static org.elasticsearch.search.aggregations.AggregationBuilders.max;
import static org.elasticsearch.search.aggregations.AggregationBuilders.min; import static org.elasticsearch.search.aggregations.AggregationBuilders.min;
@ -55,8 +53,6 @@ import static org.elasticsearch.search.aggregations.pipeline.PipelineAggregatorB
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertSearchResponse; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertSearchResponse;
import static org.hamcrest.Matchers.closeTo; import static org.hamcrest.Matchers.closeTo;
import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.hamcrest.core.IsNull.notNullValue; import static org.hamcrest.core.IsNull.notNullValue;
import static org.hamcrest.core.IsNull.nullValue; import static org.hamcrest.core.IsNull.nullValue;
@ -83,7 +79,7 @@ public class MovAvgTests extends ElasticsearchIntegrationTest {
enum MovAvgType { enum MovAvgType {
SIMPLE ("simple"), LINEAR("linear"), EWMA("ewma"), HOLT("holt"), HOLT_WINTERS("holt_winters"); SIMPLE ("simple"), LINEAR("linear"), EWMA("ewma"), HOLT("holt"), HOLT_WINTERS("holt_winters"), HOLT_BIG_MINIMIZE("holt");
private final String name; private final String name;
@ -136,7 +132,12 @@ public class MovAvgTests extends ElasticsearchIntegrationTest {
for (MovAvgType type : MovAvgType.values()) { for (MovAvgType type : MovAvgType.values()) {
for (MetricTarget target : MetricTarget.values()) { for (MetricTarget target : MetricTarget.values()) {
setupExpected(type, target); if (type.equals(MovAvgType.HOLT_BIG_MINIMIZE)) {
setupExpected(type, target, numBuckets);
} else {
setupExpected(type, target, windowSize);
}
} }
} }
@ -169,7 +170,7 @@ public class MovAvgTests extends ElasticsearchIntegrationTest {
* @param type The moving average model to use * @param type The moving average model to use
* @param target The document field "target", e.g. _count or a field value * @param target The document field "target", e.g. _count or a field value
*/ */
private void setupExpected(MovAvgType type, MetricTarget target) { private void setupExpected(MovAvgType type, MetricTarget target, int windowSize) {
ArrayList<Double> values = new ArrayList<>(numBuckets); ArrayList<Double> values = new ArrayList<>(numBuckets);
EvictingQueue<Double> window = EvictingQueue.create(windowSize); EvictingQueue<Double> window = EvictingQueue.create(windowSize);
@ -209,6 +210,9 @@ public class MovAvgTests extends ElasticsearchIntegrationTest {
case HOLT: case HOLT:
values.add(holt(window)); values.add(holt(window));
break; break;
case HOLT_BIG_MINIMIZE:
values.add(holt(window));
break;
case HOLT_WINTERS: case HOLT_WINTERS:
// HW needs at least 2 periods of data to start // HW needs at least 2 periods of data to start
if (window.size() >= period * 2) { if (window.size() >= period * 2) {
@ -226,7 +230,7 @@ public class MovAvgTests extends ElasticsearchIntegrationTest {
window.offer(metricValue); window.offer(metricValue);
} }
testValues.put(type.toString() + "_" + target.toString(), values); testValues.put(type.name() + "_" + target.name(), values);
} }
/** /**
@ -349,12 +353,11 @@ public class MovAvgTests extends ElasticsearchIntegrationTest {
// Calculate the slopes between first and second season for each period // Calculate the slopes between first and second season for each period
for (int i = 0; i < period; i++) { for (int i = 0; i < period; i++) {
s += vs[i]; s += vs[i];
b += (vs[i] - vs[i + period]) / 2; b += (vs[i + period] - vs[i]) / period;
} }
s /= (double) period; s /= (double) period;
b /= (double) period; b /= (double) period;
last_s = s; last_s = s;
last_b = b;
// Calculate first seasonal // Calculate first seasonal
if (Double.compare(s, 0.0) == 0 || Double.compare(s, -0.0) == 0) { if (Double.compare(s, 0.0) == 0 || Double.compare(s, -0.0) == 0) {
@ -377,18 +380,20 @@ public class MovAvgTests extends ElasticsearchIntegrationTest {
if (seasonalityType.equals(HoltWintersModel.SeasonalityType.MULTIPLICATIVE)) { if (seasonalityType.equals(HoltWintersModel.SeasonalityType.MULTIPLICATIVE)) {
seasonal[i] = gamma * (vs[i] / (last_s + last_b )) + (1 - gamma) * seasonal[i - period]; seasonal[i] = gamma * (vs[i] / (last_s + last_b )) + (1 - gamma) * seasonal[i - period];
} else { } else {
seasonal[i] = gamma * (vs[i] - (last_s + last_b )) + (1 - gamma) * seasonal[i - period]; seasonal[i] = gamma * (vs[i] - (last_s - last_b )) + (1 - gamma) * seasonal[i - period];
} }
last_s = s; last_s = s;
last_b = b; last_b = b;
} }
int seasonCounter = (window.size() - 1) - period; int idx = window.size() - period + (0 % period);
// TODO perhaps pad out seasonal to a power of 2 and use a mask instead of modulo?
if (seasonalityType.equals(HoltWintersModel.SeasonalityType.MULTIPLICATIVE)) { if (seasonalityType.equals(HoltWintersModel.SeasonalityType.MULTIPLICATIVE)) {
return s + (0 * b) * seasonal[seasonCounter % window.size()]; return (s + (1 * b)) * seasonal[idx];
} else { } else {
return s + (0 * b) + seasonal[seasonCounter % window.size()]; return s + (1 * b) + seasonal[idx];
} }
} }
@ -425,8 +430,8 @@ public class MovAvgTests extends ElasticsearchIntegrationTest {
List<? extends Bucket> buckets = histo.getBuckets(); List<? extends Bucket> buckets = histo.getBuckets();
assertThat("Size of buckets array is not correct.", buckets.size(), equalTo(mockHisto.size())); assertThat("Size of buckets array is not correct.", buckets.size(), equalTo(mockHisto.size()));
List<Double> expectedCounts = testValues.get(MovAvgType.SIMPLE.toString() + "_" + MetricTarget.COUNT.toString()); List<Double> expectedCounts = testValues.get(MovAvgType.SIMPLE.name() + "_" + MetricTarget.COUNT.name());
List<Double> expectedValues = testValues.get(MovAvgType.SIMPLE.toString() + "_" + MetricTarget.VALUE.toString()); List<Double> expectedValues = testValues.get(MovAvgType.SIMPLE.name() + "_" + MetricTarget.VALUE.name());
Iterator<? extends Histogram.Bucket> actualIter = buckets.iterator(); Iterator<? extends Histogram.Bucket> actualIter = buckets.iterator();
Iterator<PipelineAggregationHelperTests.MockBucket> expectedBucketIter = mockHisto.iterator(); Iterator<PipelineAggregationHelperTests.MockBucket> expectedBucketIter = mockHisto.iterator();
@ -477,8 +482,8 @@ public class MovAvgTests extends ElasticsearchIntegrationTest {
List<? extends Bucket> buckets = histo.getBuckets(); List<? extends Bucket> buckets = histo.getBuckets();
assertThat("Size of buckets array is not correct.", buckets.size(), equalTo(mockHisto.size())); assertThat("Size of buckets array is not correct.", buckets.size(), equalTo(mockHisto.size()));
List<Double> expectedCounts = testValues.get(MovAvgType.LINEAR.toString() + "_" + MetricTarget.COUNT.toString()); List<Double> expectedCounts = testValues.get(MovAvgType.LINEAR.name() + "_" + MetricTarget.COUNT.name());
List<Double> expectedValues = testValues.get(MovAvgType.LINEAR.toString() + "_" + MetricTarget.VALUE.toString()); List<Double> expectedValues = testValues.get(MovAvgType.LINEAR.name() + "_" + MetricTarget.VALUE.name());
Iterator<? extends Histogram.Bucket> actualIter = buckets.iterator(); Iterator<? extends Histogram.Bucket> actualIter = buckets.iterator();
Iterator<PipelineAggregationHelperTests.MockBucket> expectedBucketIter = mockHisto.iterator(); Iterator<PipelineAggregationHelperTests.MockBucket> expectedBucketIter = mockHisto.iterator();
@ -529,8 +534,8 @@ public class MovAvgTests extends ElasticsearchIntegrationTest {
List<? extends Bucket> buckets = histo.getBuckets(); List<? extends Bucket> buckets = histo.getBuckets();
assertThat("Size of buckets array is not correct.", buckets.size(), equalTo(mockHisto.size())); assertThat("Size of buckets array is not correct.", buckets.size(), equalTo(mockHisto.size()));
List<Double> expectedCounts = testValues.get(MovAvgType.EWMA.toString() + "_" + MetricTarget.COUNT.toString()); List<Double> expectedCounts = testValues.get(MovAvgType.EWMA.name() + "_" + MetricTarget.COUNT.name());
List<Double> expectedValues = testValues.get(MovAvgType.EWMA.toString() + "_" + MetricTarget.VALUE.toString()); List<Double> expectedValues = testValues.get(MovAvgType.EWMA.name() + "_" + MetricTarget.VALUE.name());
Iterator<? extends Histogram.Bucket> actualIter = buckets.iterator(); Iterator<? extends Histogram.Bucket> actualIter = buckets.iterator();
Iterator<PipelineAggregationHelperTests.MockBucket> expectedBucketIter = mockHisto.iterator(); Iterator<PipelineAggregationHelperTests.MockBucket> expectedBucketIter = mockHisto.iterator();
@ -581,8 +586,8 @@ public class MovAvgTests extends ElasticsearchIntegrationTest {
List<? extends Bucket> buckets = histo.getBuckets(); List<? extends Bucket> buckets = histo.getBuckets();
assertThat("Size of buckets array is not correct.", buckets.size(), equalTo(mockHisto.size())); assertThat("Size of buckets array is not correct.", buckets.size(), equalTo(mockHisto.size()));
List<Double> expectedCounts = testValues.get(MovAvgType.HOLT.toString() + "_" + MetricTarget.COUNT.toString()); List<Double> expectedCounts = testValues.get(MovAvgType.HOLT.name() + "_" + MetricTarget.COUNT.name());
List<Double> expectedValues = testValues.get(MovAvgType.HOLT.toString() + "_" + MetricTarget.VALUE.toString()); List<Double> expectedValues = testValues.get(MovAvgType.HOLT.name() + "_" + MetricTarget.VALUE.name());
Iterator<? extends Histogram.Bucket> actualIter = buckets.iterator(); Iterator<? extends Histogram.Bucket> actualIter = buckets.iterator();
Iterator<PipelineAggregationHelperTests.MockBucket> expectedBucketIter = mockHisto.iterator(); Iterator<PipelineAggregationHelperTests.MockBucket> expectedBucketIter = mockHisto.iterator();
@ -618,12 +623,14 @@ public class MovAvgTests extends ElasticsearchIntegrationTest {
.modelBuilder(new HoltWintersModel.HoltWintersModelBuilder() .modelBuilder(new HoltWintersModel.HoltWintersModelBuilder()
.alpha(alpha).beta(beta).gamma(gamma).period(period).seasonalityType(seasonalityType)) .alpha(alpha).beta(beta).gamma(gamma).period(period).seasonalityType(seasonalityType))
.gapPolicy(gapPolicy) .gapPolicy(gapPolicy)
.minimize(false)
.setBucketsPaths("_count")) .setBucketsPaths("_count"))
.subAggregation(movingAvg("movavg_values") .subAggregation(movingAvg("movavg_values")
.window(windowSize) .window(windowSize)
.modelBuilder(new HoltWintersModel.HoltWintersModelBuilder() .modelBuilder(new HoltWintersModel.HoltWintersModelBuilder()
.alpha(alpha).beta(beta).gamma(gamma).period(period).seasonalityType(seasonalityType)) .alpha(alpha).beta(beta).gamma(gamma).period(period).seasonalityType(seasonalityType))
.gapPolicy(gapPolicy) .gapPolicy(gapPolicy)
.minimize(false)
.setBucketsPaths("the_metric")) .setBucketsPaths("the_metric"))
).execute().actionGet(); ).execute().actionGet();
@ -635,8 +642,8 @@ public class MovAvgTests extends ElasticsearchIntegrationTest {
List<? extends Bucket> buckets = histo.getBuckets(); List<? extends Bucket> buckets = histo.getBuckets();
assertThat("Size of buckets array is not correct.", buckets.size(), equalTo(mockHisto.size())); assertThat("Size of buckets array is not correct.", buckets.size(), equalTo(mockHisto.size()));
List<Double> expectedCounts = testValues.get(MovAvgType.HOLT_WINTERS.toString() + "_" + MetricTarget.COUNT.toString()); List<Double> expectedCounts = testValues.get(MovAvgType.HOLT_WINTERS.name() + "_" + MetricTarget.COUNT.name());
List<Double> expectedValues = testValues.get(MovAvgType.HOLT_WINTERS.toString() + "_" + MetricTarget.VALUE.toString()); List<Double> expectedValues = testValues.get(MovAvgType.HOLT_WINTERS.name() + "_" + MetricTarget.VALUE.name());
Iterator<? extends Histogram.Bucket> actualIter = buckets.iterator(); Iterator<? extends Histogram.Bucket> actualIter = buckets.iterator();
Iterator<PipelineAggregationHelperTests.MockBucket> expectedBucketIter = mockHisto.iterator(); Iterator<PipelineAggregationHelperTests.MockBucket> expectedBucketIter = mockHisto.iterator();
@ -1038,6 +1045,230 @@ public class MovAvgTests extends ElasticsearchIntegrationTest {
} }
@Test
public void HoltWintersMinimization() {
    // Holt-Winters over both the doc count and the metric. Only period and seasonality type
    // are configured on the model builder; minimize(true) is requested on both aggregations.
    SearchResponse searchResponse = client()
            .prepareSearch("idx").setTypes("type")
            .addAggregation(
                    histogram("histo").field(INTERVAL_FIELD).interval(interval)
                            .extendedBounds(0L, (long) (interval * (numBuckets - 1)))
                            .subAggregation(metric)
                            .subAggregation(movingAvg("movavg_counts")
                                    .window(windowSize)
                                    .modelBuilder(new HoltWintersModel.HoltWintersModelBuilder()
                                            .period(period).seasonalityType(seasonalityType))
                                    .gapPolicy(gapPolicy)
                                    .minimize(true)
                                    .setBucketsPaths("_count"))
                            .subAggregation(movingAvg("movavg_values")
                                    .window(windowSize)
                                    .modelBuilder(new HoltWintersModel.HoltWintersModelBuilder()
                                            .period(period).seasonalityType(seasonalityType))
                                    .gapPolicy(gapPolicy)
                                    .minimize(true)
                                    .setBucketsPaths("the_metric"))
            ).execute().actionGet();

    assertSearchResponse(searchResponse);

    InternalHistogram<Bucket> histoAgg = searchResponse.getAggregations().get("histo");
    assertThat(histoAgg, notNullValue());
    assertThat(histoAgg.getName(), equalTo("histo"));
    List<? extends Bucket> actualBuckets = histoAgg.getBuckets();
    assertThat("Size of buckets array is not correct.", actualBuckets.size(), equalTo(mockHisto.size()));

    List<Double> mockCounts = testValues.get(MovAvgType.HOLT_WINTERS.name() + "_" + MetricTarget.COUNT.name());
    List<Double> mockValues = testValues.get(MovAvgType.HOLT_WINTERS.name() + "_" + MetricTarget.VALUE.name());

    Iterator<? extends Histogram.Bucket> bucketIter = actualBuckets.iterator();
    Iterator<PipelineAggregationHelperTests.MockBucket> mockBucketIter = mockHisto.iterator();
    Iterator<Double> mockCountsIter = mockCounts.iterator();
    Iterator<Double> mockValuesIter = mockValues.iterator();

    // The minimizer is stochastic, so the mock values are used only to decide whether a
    // bucket should carry a moving average at all; the numbers themselves are not compared.
    for (; bucketIter.hasNext(); ) {
        Histogram.Bucket actualBucket = bucketIter.next();
        PipelineAggregationHelperTests.MockBucket mockBucket = mockBucketIter.next();
        Double mockCount = mockCountsIter.next();
        Double mockValue = mockValuesIter.next();

        assertThat("keys do not match", ((Number) actualBucket.getKey()).longValue(), equalTo(mockBucket.key));
        assertThat("doc counts do not match", actualBucket.getDocCount(), equalTo((long)mockBucket.count));

        SimpleValue countMovAvg = actualBucket.getAggregations().get("movavg_counts");
        SimpleValue valuesMovAvg = actualBucket.getAggregations().get("movavg_values");

        if (mockCount != null) {
            // A value is expected here, but the mock assumes non-minimized coefficients,
            // so only presence is checked.
            assertThat(countMovAvg, notNullValue());
        } else {
            // This bucket wasn't supposed to have a value (empty, skipped, etc),
            // so the moving average must be absent as well.
            assertThat(countMovAvg, nullValue());
        }

        if (mockValue != null) {
            // Same reasoning as for the counts: presence only, never the exact number.
            assertThat(valuesMovAvg, notNullValue());
        } else {
            // No value expected for this bucket, so none may be produced.
            assertThat(valuesMovAvg, nullValue());
        }
    }
}
/**
 * With minimization switched on but too little data to minimize against, the model is
 * expected to simply run with the settings it was given. In that case the mock histogram
 * matches the real result; it would not match if the minimizer had actually changed the
 * coefficients.
 *
 * The situation is simulated by making the window as large as the whole histogram.
 */
@Test
public void minimizeNotEnoughData() {
    SearchResponse searchResponse = client()
            .prepareSearch("idx").setTypes("type")
            .addAggregation(
                    histogram("histo").field(INTERVAL_FIELD).interval(interval)
                            .extendedBounds(0L, (long) (interval * (numBuckets - 1)))
                            .subAggregation(metric)
                            .subAggregation(movingAvg("movavg_counts")
                                    .window(numBuckets)
                                    .modelBuilder(new HoltLinearModel.HoltLinearModelBuilder().alpha(alpha).beta(beta))
                                    .gapPolicy(gapPolicy)
                                    .minimize(true)
                                    .setBucketsPaths("_count"))
                            .subAggregation(movingAvg("movavg_values")
                                    .window(numBuckets)
                                    .modelBuilder(new HoltLinearModel.HoltLinearModelBuilder().alpha(alpha).beta(beta))
                                    .gapPolicy(gapPolicy)
                                    .minimize(true)
                                    .setBucketsPaths("the_metric"))
            ).execute().actionGet();

    assertSearchResponse(searchResponse);

    InternalHistogram<Bucket> histoAgg = searchResponse.getAggregations().get("histo");
    assertThat(histoAgg, notNullValue());
    assertThat(histoAgg.getName(), equalTo("histo"));
    List<? extends Bucket> actualBuckets = histoAgg.getBuckets();
    assertThat("Size of buckets array is not correct.", actualBuckets.size(), equalTo(mockHisto.size()));

    // Expected values come from the mock that was built with a window spanning every bucket.
    List<Double> mockCounts = testValues.get(MovAvgType.HOLT_BIG_MINIMIZE.name() + "_" + MetricTarget.COUNT.name());
    List<Double> mockValues = testValues.get(MovAvgType.HOLT_BIG_MINIMIZE.name() + "_" + MetricTarget.VALUE.name());

    Iterator<? extends Histogram.Bucket> bucketIter = actualBuckets.iterator();
    Iterator<PipelineAggregationHelperTests.MockBucket> mockBucketIter = mockHisto.iterator();
    Iterator<Double> mockCountsIter = mockCounts.iterator();
    Iterator<Double> mockValuesIter = mockValues.iterator();

    for (; bucketIter.hasNext(); ) {
        // Check the expected-side iterators before advancing any of them.
        assertValidIterators(mockBucketIter, mockCountsIter, mockValuesIter);

        Histogram.Bucket actualBucket = bucketIter.next();
        PipelineAggregationHelperTests.MockBucket mockBucket = mockBucketIter.next();
        Double mockCount = mockCountsIter.next();
        Double mockValue = mockValuesIter.next();

        assertThat("keys do not match", ((Number) actualBucket.getKey()).longValue(), equalTo(mockBucket.key));
        assertThat("doc counts do not match", actualBucket.getDocCount(), equalTo((long)mockBucket.count));

        assertBucketContents(actualBucket, mockCount, mockValue);
    }
}
/**
 * Only some models can be minimized, should throw exception for: simple, linear
 */
@Test
public void checkIfNonTunableCanBeMinimized() {
    expectMinimizeRejected(new SimpleModel.SimpleModelBuilder(),
            "Simple Model cannot be minimized, but an exception was not thrown");
    expectMinimizeRejected(new LinearModel.LinearModelBuilder(),
            "Linear Model cannot be minimized, but an exception was not thrown");
}

/**
 * Runs a moving average over {@code _count} with minimization enabled for the given model
 * and fails with {@code failureMessage} unless the search throws a
 * {@link SearchPhaseExecutionException}.
 */
private void expectMinimizeRejected(MovAvgModelBuilder modelBuilder, String failureMessage) {
    try {
        client()
                .prepareSearch("idx").setTypes("type")
                .addAggregation(
                        histogram("histo").field(INTERVAL_FIELD).interval(interval)
                                .extendedBounds(0L, (long) (interval * (numBuckets - 1)))
                                .subAggregation(metric)
                                .subAggregation(movingAvg("movavg_counts")
                                        .window(numBuckets)
                                        .modelBuilder(modelBuilder)
                                        .gapPolicy(gapPolicy)
                                        .minimize(true)
                                        .setBucketsPaths("_count"))
                ).execute().actionGet();
        fail(failureMessage);
    } catch (SearchPhaseExecutionException expected) {
        // Expected: the request is rejected for this model
    }
}
/**
 * These models are all minimizable, so they should not throw exceptions
 */
@Test
public void checkIfTunableCanBeMinimized() {
    MovAvgModelBuilder[] builders = new MovAvgModelBuilder[]{
            new EwmaModel.EWMAModelBuilder(),
            new HoltLinearModel.HoltLinearModelBuilder(),
            new HoltWintersModel.HoltWintersModelBuilder()
    };

    for (MovAvgModelBuilder builder : builders) {
        try {
            client()
                    .prepareSearch("idx").setTypes("type")
                    .addAggregation(
                            histogram("histo").field(INTERVAL_FIELD).interval(interval)
                                    .extendedBounds(0L, (long) (interval * (numBuckets - 1)))
                                    .subAggregation(metric)
                                    .subAggregation(movingAvg("movavg_counts")
                                            .window(numBuckets)
                                            .modelBuilder(builder)
                                            .gapPolicy(gapPolicy)
                                            .minimize(true)
                                            .setBucketsPaths("_count"))
                    ).execute().actionGet();
        } catch (SearchPhaseExecutionException e) {
            // Keep the original exception as the cause so the reason for the rejection shows up
            // in the test report, and name the builder by class so the message identifies the model.
            throw new AssertionError("Model [" + builder.getClass().getSimpleName()
                    + "] can be minimized, but an exception was thrown", e);
        }
    }
}
private void assertValidIterators(Iterator expectedBucketIter, Iterator expectedCountsIter, Iterator expectedValuesIter) { private void assertValidIterators(Iterator expectedBucketIter, Iterator expectedCountsIter, Iterator expectedValuesIter) {
if (!expectedBucketIter.hasNext()) { if (!expectedBucketIter.hasNext()) {

View File

@ -44,6 +44,8 @@ A `moving_avg` aggregation looks like this in isolation:
|`model` |The moving average weighting model that we wish to use |Optional |`simple` |`model` |The moving average weighting model that we wish to use |Optional |`simple`
|`gap_policy` |Determines what should happen when a gap in the data is encountered. |Optional |`insert_zero` |`gap_policy` |Determines what should happen when a gap in the data is encountered. |Optional |`insert_zero`
|`window` |The size of window to "slide" across the histogram. |Optional |`5` |`window` |The size of window to "slide" across the histogram. |Optional |`5`
|`minimize` |If the model should be algorithmically minimized. See <<movavg-minimizer, Minimization>> for more
details |Optional |`false` for most models
|`settings` |Model-specific settings, contents which differ depending on the model specified. |Optional | |`settings` |Model-specific settings, contents which differ depending on the model specified. |Optional |
|=== |===
@ -100,6 +102,7 @@ the values from a `simple` moving average tend to "lag" behind the real data.
"the_movavg":{ "the_movavg":{
"moving_avg":{ "moving_avg":{
"buckets_path": "the_sum", "buckets_path": "the_sum",
"window" : 30,
"model" : "simple" "model" : "simple"
} }
} }
@ -135,6 +138,7 @@ the "lag" behind the data's mean, since older points have less influence.
"the_movavg":{ "the_movavg":{
"moving_avg":{ "moving_avg":{
"buckets_path": "the_sum", "buckets_path": "the_sum",
"window" : 30,
"model" : "linear" "model" : "linear"
} }
} }
@ -165,7 +169,9 @@ setting. Small values make the weight decay slowly, which provides greater smoo
portion of the window. Larger values make the weight decay quickly, which reduces the impact of older values on the portion of the window. Larger values make the weight decay quickly, which reduces the impact of older values on the
moving average. This tends to make the moving average track the data more closely but with less smoothing. moving average. This tends to make the moving average track the data more closely but with less smoothing.
The default value of `alpha` is `0.5`, and the setting accepts any float from 0-1 inclusive. The default value of `alpha` is `0.3`, and the setting accepts any float from 0-1 inclusive.
The EWMA model can be <<movavg-minimizer, Minimized>>
[source,js] [source,js]
-------------------------------------------------- --------------------------------------------------
@ -173,6 +179,7 @@ The default value of `alpha` is `0.5`, and the setting accepts any float from 0-
"the_movavg":{ "the_movavg":{
"moving_avg":{ "moving_avg":{
"buckets_path": "the_sum", "buckets_path": "the_sum",
"window" : 30,
"model" : "ewma", "model" : "ewma",
"settings" : { "settings" : {
"alpha" : 0.5 "alpha" : 0.5
@ -204,7 +211,9 @@ smoothed data). The trend value is also exponentially weighted.
Values are produced by multiplying the level and trend components. Values are produced by multiplying the level and trend components.
The default value of `alpha` and `beta` is `0.5`, and the settings accept any float from 0-1 inclusive. The default value of `alpha` is `0.3` and `beta` is `0.1`. The settings accept any float from 0-1 inclusive.
The Holt-Linear model can be <<movavg-minimizer, Minimized>>
[source,js] [source,js]
-------------------------------------------------- --------------------------------------------------
@ -212,6 +221,7 @@ The default value of `alpha` and `beta` is `0.5`, and the settings accept any fl
"the_movavg":{ "the_movavg":{
"moving_avg":{ "moving_avg":{
"buckets_path": "the_sum", "buckets_path": "the_sum",
"window" : 30,
"model" : "holt", "model" : "holt",
"settings" : { "settings" : {
"alpha" : 0.5, "alpha" : 0.5,
@ -270,15 +280,18 @@ Additive seasonality is the default; it can also be specified by setting `"type"
when the seasonal effect is additive to your data. E.g. you could simply subtract the seasonal effect to "de-seasonalize" when the seasonal effect is additive to your data. E.g. you could simply subtract the seasonal effect to "de-seasonalize"
your data into a flat trend. your data into a flat trend.
The default value of `alpha`, `beta` and `gamma` is `0.5`, and the settings accept any float from 0-1 inclusive. The default values of `alpha` and `gamma` are `0.3` while `beta` is `0.1`. The settings accept any float from 0-1 inclusive.
The default value of `period` is `1`. The default value of `period` is `1`.
The additive Holt-Winters model can be <<movavg-minimizer, Minimized>>
[source,js] [source,js]
-------------------------------------------------- --------------------------------------------------
{ {
"the_movavg":{ "the_movavg":{
"moving_avg":{ "moving_avg":{
"buckets_path": "the_sum", "buckets_path": "the_sum",
"window" : 30,
"model" : "holt_winters", "model" : "holt_winters",
"settings" : { "settings" : {
"type" : "add", "type" : "add",
@ -301,9 +314,11 @@ image::images/pipeline_movavg/triple.png[]
Multiplicative is specified by setting `"type": "mult"`. This variety is preferred when the seasonal effect is Multiplicative is specified by setting `"type": "mult"`. This variety is preferred when the seasonal effect is
multiplied against your data. E.g. if the seasonal effect is x5 the data, rather than simply adding to it. multiplied against your data. E.g. if the seasonal effect is x5 the data, rather than simply adding to it.
The default value of `alpha`, `beta` and `gamma` is `0.5`, and the settings accept any float from 0-1 inclusive. The default values of `alpha` and `gamma` are `0.3` while `beta` is `0.1`. The settings accept any float from 0-1 inclusive.
The default value of `period` is `1`. The default value of `period` is `1`.
The multiplicative Holt-Winters model can be <<movavg-minimizer, Minimized>>
[WARNING] [WARNING]
====== ======
Multiplicative Holt-Winters works by dividing each data point by the seasonal value. This is problematic if any of Multiplicative Holt-Winters works by dividing each data point by the seasonal value. This is problematic if any of
@ -319,6 +334,7 @@ you can disable this behavior with `pad: false`
"the_movavg":{ "the_movavg":{
"moving_avg":{ "moving_avg":{
"buckets_path": "the_sum", "buckets_path": "the_sum",
"window" : 30,
"model" : "holt_winters", "model" : "holt_winters",
"settings" : { "settings" : {
"type" : "mult", "type" : "mult",
@ -347,6 +363,7 @@ as your buckets:
"the_movavg":{ "the_movavg":{
"moving_avg":{ "moving_avg":{
"buckets_path": "the_sum", "buckets_path": "the_sum",
"window" : 30,
"model" : "simple", "model" : "simple",
"predict" 10 "predict" 10
} }
@ -381,3 +398,53 @@ fluctuations into the model:
[[holt_winters_prediction_global]] [[holt_winters_prediction_global]]
.Holt-Winters moving average with window of size 120, predict = 25, alpha = 0.8, beta = 0.2, gamma = 0.7, period = 30 .Holt-Winters moving average with window of size 120, predict = 25, alpha = 0.8, beta = 0.2, gamma = 0.7, period = 30
image::images/pipeline_movavg/triple_prediction.png[] image::images/pipeline_movavg/triple_prediction.png[]
[[movavg-minimizer]]
==== Minimization
Some of the models (EWMA, Holt-Linear, Holt-Winters) require one or more parameters to be configured. Parameter choice
can be tricky and sometimes non-intuitive. Furthermore, small deviations in these parameters can sometimes have a drastic
effect on the output moving average.
For that reason, the three "tunable" models can be algorithmically *minimized*. Minimization is a process where parameters
are tweaked until the predictions generated by the model closely match the output data. Minimization is not foolproof
and can be susceptible to overfitting, but it often gives better results than hand-tuning.
Minimization is disabled by default for `ewma` and `holt_linear`, while it is enabled by default for `holt_winters`.
Minimization is most useful with Holt-Winters, since it helps improve the accuracy of the predictions. EWMA and
Holt-Linear are not great predictors and are mostly used for smoothing data, so minimization is less useful on those
models.
Minimization is enabled/disabled via the `minimize` parameter:
[source,js]
--------------------------------------------------
{
"the_movavg":{
"moving_avg":{
"buckets_path": "the_sum",
"model" : "holt_winters",
"window" : 30,
"minimize" : true, <1>
"settings" : {
"period" : 7
}
}
}
}
--------------------------------------------------
<1> Minimization is enabled with the `minimize` parameter
When enabled, minimization will find the optimal values for `alpha`, `beta` and `gamma`. The user should still provide
appropriate values for `window`, `period` and `type`.
[WARNING]
======
Minimization works by running a stochastic process called *simulated annealing*. This process will usually generate
a good solution, but is not guaranteed to find the global optimum. It also requires some amount of additional
computational power, since the model needs to be re-run multiple times as the values are tweaked. The run-time of
minimization is linear in the size of the window being processed: excessively large windows may cause latency.
Finally, minimization fits the model to the last `n` values, where `n = window`. This generally produces
better forecasts into the future, since the parameters are tuned around the end of the series. It can, however, generate
poorer fitting moving averages at the beginning of the series.
======