From 0f76e656dd05eef1dcc8eff7012a90c6070fc402 Mon Sep 17 00:00:00 2001 From: Zachary Tong Date: Mon, 22 Jun 2015 19:46:07 -0400 Subject: [PATCH] Aggregations: add cost minimizer to moving_avg aggregation --- .../pipeline/movavg/MovAvgBuilder.java | 16 + .../pipeline/movavg/MovAvgParser.java | 19 +- .../movavg/MovAvgPipelineAggregator.java | 73 ++++- .../movavg/SimulatedAnealingMinimizer.java | 125 ++++++++ .../pipeline/movavg/models/EwmaModel.java | 31 +- .../movavg/models/HoltLinearModel.java | 54 +++- .../movavg/models/HoltWintersModel.java | 163 +++++++--- .../pipeline/movavg/models/LinearModel.java | 16 + .../pipeline/movavg/models/MovAvgModel.java | 42 ++- .../pipeline/movavg/models/SimpleModel.java | 16 + .../pipeline/moving/avg/MovAvgTests.java | 281 ++++++++++++++++-- .../pipeline/movavg-aggregation.asciidoc | 75 ++++- 12 files changed, 810 insertions(+), 101 deletions(-) create mode 100644 core/src/main/java/org/elasticsearch/search/aggregations/pipeline/movavg/SimulatedAnealingMinimizer.java diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/movavg/MovAvgBuilder.java b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/movavg/MovAvgBuilder.java index ca9ca2b23ad..10adf5125a4 100644 --- a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/movavg/MovAvgBuilder.java +++ b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/movavg/MovAvgBuilder.java @@ -36,6 +36,7 @@ public class MovAvgBuilder extends PipelineAggregatorBuilder { private MovAvgModelBuilder modelBuilder; private Integer window; private Integer predict; + private Boolean minimize; public MovAvgBuilder(String name) { super(name, MovAvgPipelineAggregator.TYPE.name()); @@ -94,6 +95,18 @@ public class MovAvgBuilder extends PipelineAggregatorBuilder { return this; } + /** + * Determines if the model should be fit to the data using a cost + * minimizing algorithm. 
+ * + * @param minimize If the model should be fit to the underlying data + * @return Returns the builder to continue chaining + */ + public MovAvgBuilder minimize(boolean minimize) { + this.minimize = minimize; + return this; + } + @Override protected XContentBuilder internalXContent(XContentBuilder builder, Params params) throws IOException { @@ -112,6 +125,9 @@ public class MovAvgBuilder extends PipelineAggregatorBuilder { if (predict != null) { builder.field(MovAvgParser.PREDICT.getPreferredName(), predict); } + if (minimize != null) { + builder.field(MovAvgParser.MINIMIZE.getPreferredName(), minimize); + } return builder; } diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/movavg/MovAvgParser.java b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/movavg/MovAvgParser.java index ac297b7ce0a..58567357d56 100644 --- a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/movavg/MovAvgParser.java +++ b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/movavg/MovAvgParser.java @@ -44,6 +44,7 @@ public class MovAvgParser implements PipelineAggregator.Parser { public static final ParseField WINDOW = new ParseField("window"); public static final ParseField SETTINGS = new ParseField("settings"); public static final ParseField PREDICT = new ParseField("predict"); + public static final ParseField MINIMIZE = new ParseField("minimize"); private final MovAvgModelParserMapper movAvgModelParserMapper; @@ -69,6 +70,7 @@ public class MovAvgParser implements PipelineAggregator.Parser { Map settings = null; String model = "simple"; int predict = 0; + Boolean minimize = null; while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { if (token == XContentParser.Token.FIELD_NAME) { @@ -124,6 +126,13 @@ public class MovAvgParser implements PipelineAggregator.Parser { throw new SearchParseException(context, "Unknown key for a " + token + " in [" + pipelineAggregatorName + "]: [" + 
currentFieldName + "].", parser.getTokenLocation()); } + } else if (token == XContentParser.Token.VALUE_BOOLEAN) { + if (context.parseFieldMatcher().match(currentFieldName, MINIMIZE)) { + minimize = parser.booleanValue(); + } else { + throw new SearchParseException(context, "Unknown key for a " + token + " in [" + pipelineAggregatorName + "]: [" + + currentFieldName + "].", parser.getTokenLocation()); + } } else { throw new SearchParseException(context, "Unexpected token " + token + " in [" + pipelineAggregatorName + "].", parser.getTokenLocation()); @@ -155,9 +164,17 @@ public class MovAvgParser implements PipelineAggregator.Parser { throw new SearchParseException(context, "Could not parse settings for model [" + model + "].", null, exception); } + // If the user doesn't set a preference for cost minimization, ask what the model prefers + if (minimize == null) { + minimize = movAvgModel.minimizeByDefault(); + } else if (minimize && !movAvgModel.canBeMinimized()) { + // If the user asks to minimize, but this model doesn't support it, throw exception + throw new SearchParseException(context, "The [" + model + "] model cannot be minimized.", null); + } + return new MovAvgPipelineAggregator.Factory(pipelineAggregatorName, bucketsPaths, formatter, gapPolicy, window, predict, - movAvgModel); + movAvgModel, minimize); } diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/movavg/MovAvgPipelineAggregator.java b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/movavg/MovAvgPipelineAggregator.java index a01a242db84..5b7a8675540 100644 --- a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/movavg/MovAvgPipelineAggregator.java +++ b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/movavg/MovAvgPipelineAggregator.java @@ -48,6 +48,7 @@ import org.joda.time.DateTime; import java.io.IOException; import java.util.ArrayList; import java.util.List; +import java.util.ListIterator; import java.util.Map; 
import static org.elasticsearch.search.aggregations.pipeline.BucketHelpers.resolveBucketValue; @@ -81,18 +82,20 @@ public class MovAvgPipelineAggregator extends PipelineAggregator { private int window; private MovAvgModel model; private int predict; + private boolean minimize; public MovAvgPipelineAggregator() { } public MovAvgPipelineAggregator(String name, String[] bucketsPaths, ValueFormatter formatter, GapPolicy gapPolicy, - int window, int predict, MovAvgModel model, Map metadata) { + int window, int predict, MovAvgModel model, boolean minimize, Map metadata) { super(name, bucketsPaths, metadata); this.formatter = formatter; this.gapPolicy = gapPolicy; this.window = window; this.model = model; this.predict = predict; + this.minimize = minimize; } @Override @@ -113,6 +116,12 @@ public class MovAvgPipelineAggregator extends PipelineAggregator { int lastValidPosition = 0; int counter = 0; + // Do we need to fit the model parameters to the data? + if (minimize) { + assert (model.canBeMinimized()); + model = minimize(buckets, histo, model); + } + for (InternalHistogram.Bucket bucket : buckets) { Double thisBucketValue = resolveBucketValue(histo, bucket, bucketsPaths()[0], gapPolicy); @@ -194,6 +203,60 @@ public class MovAvgPipelineAggregator extends PipelineAggregator { return factory.create(newBuckets, histo); } + private MovAvgModel minimize(List buckets, InternalHistogram histo, MovAvgModel model) { + + int counter = 0; + EvictingQueue values = EvictingQueue.create(window); + + double[] test = new double[window]; + ListIterator iter = buckets.listIterator(buckets.size()); + + // We have to walk the iterator backwards because we don't know if/how many buckets are empty. 
+ while (iter.hasPrevious() && counter < window) { + + Double thisBucketValue = resolveBucketValue(histo, iter.previous(), bucketsPaths()[0], gapPolicy); + + if (!(thisBucketValue == null || thisBucketValue.equals(Double.NaN))) { + test[window - counter - 1] = thisBucketValue; + counter += 1; + } + } + + // If we didn't fill the test set, we don't have enough data to minimize. + // Just return the model with the starting coef + if (counter < window) { + return model; + } + + //And do it again, for the train set. Unfortunately we have to fill an array and then + //fill an evicting queue backwards :( + + counter = 0; + double[] train = new double[window]; + + while (iter.hasPrevious() && counter < window) { + + Double thisBucketValue = resolveBucketValue(histo, iter.previous(), bucketsPaths()[0], gapPolicy); + + if (!(thisBucketValue == null || thisBucketValue.equals(Double.NaN))) { + train[window - counter - 1] = thisBucketValue; + counter += 1; + } + } + + // If we didn't fill the train set, we don't have enough data to minimize. 
+ // Just return the model with the starting coef + if (counter < window) { + return model; + } + + for (double v : train) { + values.add(v); + } + + return SimulatedAnealingMinimizer.minimize(model, values, test); + } + @Override public void doReadFrom(StreamInput in) throws IOException { formatter = ValueFormatterStreams.readOptional(in); @@ -201,6 +264,7 @@ public class MovAvgPipelineAggregator extends PipelineAggregator { window = in.readVInt(); predict = in.readVInt(); model = MovAvgModelStreams.read(in); + minimize = in.readBoolean(); } @@ -211,6 +275,7 @@ public class MovAvgPipelineAggregator extends PipelineAggregator { out.writeVInt(window); out.writeVInt(predict); model.writeTo(out); + out.writeBoolean(minimize); } @@ -221,20 +286,22 @@ public class MovAvgPipelineAggregator extends PipelineAggregator { private int window; private MovAvgModel model; private int predict; + private boolean minimize; public Factory(String name, String[] bucketsPaths, ValueFormatter formatter, GapPolicy gapPolicy, - int window, int predict, MovAvgModel model) { + int window, int predict, MovAvgModel model, boolean minimize) { super(name, TYPE.name(), bucketsPaths); this.formatter = formatter; this.gapPolicy = gapPolicy; this.window = window; this.model = model; this.predict = predict; + this.minimize = minimize; } @Override protected PipelineAggregator createInternal(Map metaData) throws IOException { - return new MovAvgPipelineAggregator(name, bucketsPaths, formatter, gapPolicy, window, predict, model, metaData); + return new MovAvgPipelineAggregator(name, bucketsPaths, formatter, gapPolicy, window, predict, model, minimize, metaData); } @Override diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/movavg/SimulatedAnealingMinimizer.java b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/movavg/SimulatedAnealingMinimizer.java new file mode 100644 index 00000000000..bb04502f60c --- /dev/null +++ 
b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/movavg/SimulatedAnealingMinimizer.java @@ -0,0 +1,125 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.search.aggregations.pipeline.movavg; + +import com.google.common.collect.EvictingQueue; +import org.elasticsearch.search.aggregations.pipeline.movavg.models.MovAvgModel; + +/** + * A cost minimizer which will fit a MovAvgModel to the data. + * + * This optimizer uses naive simulated annealing. Random solutions in the problem space + * are generated, compared against the last period of data, and the least absolute deviation + * is recorded as a cost. + * + * If the new cost is better than the old cost, the new coefficients are chosen. If the new + * solution is worse, there is a temperature-dependent probability it will be randomly selected + * anyway. This allows the algo to sample the problem space widely. As iterations progress, + * the temperature decreases and the algorithm rejects poor solutions more regularly, + * theoretically honing in on a global minimum. 
+ */ +public class SimulatedAnealingMinimizer { + + /** + * Runs the simulated annealing algorithm and produces a model with new coefficients that, theoretically + * fit the data better and generalizes to future forecasts without overfitting. + * + * @param model The MovAvgModel to be optimized for + * @param train A training set provided to the model, which predictions will be + * generated from + * @param test A test set of data to compare the predictions against and derive + * a cost for the model + * @return A new, minimized model that (theoretically) better fits the data + */ + public static MovAvgModel minimize(MovAvgModel model, EvictingQueue train, double[] test) { + + double temp = 1; + double minTemp = 0.0001; + int iterations = 100; + double alpha = 0.9; + + MovAvgModel bestModel = model; + MovAvgModel oldModel = model; + + double oldCost = cost(model, train, test); + double bestCost = oldCost; + + while (temp > minTemp) { + for (int i = 0; i < iterations; i++) { + MovAvgModel newModel = oldModel.neighboringModel(); + double newCost = cost(newModel, train, test); + + double ap = acceptanceProbability(oldCost, newCost, temp); + if (ap > Math.random()) { + oldModel = newModel; + oldCost = newCost; + + if (newCost < bestCost) { + bestCost = newCost; + bestModel = newModel; + } + } + } + + temp *= alpha; + } + + return bestModel; + } + + /** + * If the new cost is better than old, return 1.0. Otherwise, return a double that increases + * as the two costs are closer to each other. + * + * @param oldCost Old model cost + * @param newCost New model cost + * @param temp Current annealing temperature + * @return The probability of accepting the new cost over the old + */ + private static double acceptanceProbability(double oldCost, double newCost, double temp) { + return newCost < oldCost ? 1.0 : Math.exp(-(newCost - oldCost) / temp); + } + + /** + * Calculates the "cost" of a model. E.g. 
when run on the training data, how closely do the predictions + * match the test data + * + * Uses Least Absolute Differences to calculate error. Note that this is not scale free, but seems + * to work fairly well in practice + * + * @param model The MovAvgModel we are fitting + * @param train A training set of data given to the model, which will then generate predictions from + * @param test A test set of data to compare against the predictions + * @return A cost, or error, of the model + */ + private static double cost(MovAvgModel model, EvictingQueue train, double[] test) { + double error = 0; + double[] predictions = model.predict(train, test.length); + + assert(predictions.length == test.length); + + for (int i = 0; i < predictions.length; i++) { + error += Math.abs(test[i] - predictions[i]) ; + } + + return error; + } + +} diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/movavg/models/EwmaModel.java b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/movavg/models/EwmaModel.java index 8c9853d9b98..728cb1697ed 100644 --- a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/movavg/models/EwmaModel.java +++ b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/movavg/models/EwmaModel.java @@ -41,16 +41,33 @@ public class EwmaModel extends MovAvgModel { protected static final ParseField NAME_FIELD = new ParseField("ewma"); /** - * Controls smoothing of data. Alpha = 1 retains no memory of past values + * Controls smoothing of data. Also known as "level" value. + * Alpha = 1 retains no memory of past values * (e.g. random walk), while alpha = 0 retains infinite memory of past values (e.g. - * mean of the series). Useful values are somewhere in between + * mean of the series). 
*/ - private double alpha; + private final double alpha; public EwmaModel(double alpha) { this.alpha = alpha; } + @Override + public boolean canBeMinimized() { + return true; + } + + @Override + public MovAvgModel neighboringModel() { + double alpha = Math.random(); + return new EwmaModel(alpha); + } + + @Override + public MovAvgModel clone() { + return new EwmaModel(this.alpha); + } + @Override protected double[] doPredict(Collection values, int numPredictions) { double[] predictions = new double[numPredictions]; @@ -105,8 +122,7 @@ public class EwmaModel extends MovAvgModel { @Override public MovAvgModel parse(@Nullable Map settings, String pipelineName, int windowSize, ParseFieldMatcher parseFieldMatcher) throws ParseException { - double alpha = parseDoubleParam(settings, "alpha", 0.5); - + double alpha = parseDoubleParam(settings, "alpha", 0.3); return new EwmaModel(alpha); } @@ -114,7 +130,7 @@ public class EwmaModel extends MovAvgModel { public static class EWMAModelBuilder implements MovAvgModelBuilder { - private double alpha = 0.5; + private Double alpha; /** * Alpha controls the smoothing of the data. 
Alpha = 1 retains no memory of past values @@ -134,7 +150,10 @@ public class EwmaModel extends MovAvgModel { public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.field(MovAvgParser.MODEL.getPreferredName(), NAME_FIELD.getPreferredName()); builder.startObject(MovAvgParser.SETTINGS.getPreferredName()); + if (alpha != null) { builder.field("alpha", alpha); + } + builder.endObject(); return builder; } diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/movavg/models/HoltLinearModel.java b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/movavg/models/HoltLinearModel.java index 4b2edc920b3..6a3b9500e94 100644 --- a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/movavg/models/HoltLinearModel.java +++ b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/movavg/models/HoltLinearModel.java @@ -39,22 +39,50 @@ public class HoltLinearModel extends MovAvgModel { protected static final ParseField NAME_FIELD = new ParseField("holt"); /** - * Controls smoothing of data. Alpha = 1 retains no memory of past values + * Controls smoothing of data. Also known as "level" value. + * Alpha = 1 retains no memory of past values * (e.g. random walk), while alpha = 0 retains infinite memory of past values (e.g. - * mean of the series). Useful values are somewhere in between + * mean of the series). */ - private double alpha; + private final double alpha; /** - * Equivalent to alpha, but controls the smoothing of the trend instead of the data + * Controls smoothing of trend. + * Beta = 1 retains no memory of past values + * (e.g. random walk), while beta = 0 retains infinite memory of past values (e.g. + * mean of the series). 
*/ - private double beta; + private final double beta; public HoltLinearModel(double alpha, double beta) { this.alpha = alpha; this.beta = beta; } + @Override + public boolean canBeMinimized() { + return true; + } + + @Override + public MovAvgModel neighboringModel() { + double newValue = Math.random(); + switch ((int) (Math.random() * 2)) { + case 0: + return new HoltLinearModel(newValue, this.beta); + case 1: + return new HoltLinearModel(this.alpha, newValue); + default: + assert (false): "Random value fell outside of range [0-1]"; + return new HoltLinearModel(newValue, this.beta); // This should never technically happen... + } + } + + @Override + public MovAvgModel clone() { + return new HoltLinearModel(this.alpha, this.beta); + } + /** * Predicts the next `n` values in the series, using the smoothing model to generate new values. * Unlike the other moving averages, Holt-Linear has forecasting/prediction built into the algorithm. @@ -154,16 +182,17 @@ public class HoltLinearModel extends MovAvgModel { @Override public MovAvgModel parse(@Nullable Map settings, String pipelineName, int windowSize, ParseFieldMatcher parseFieldMatcher) throws ParseException { - double alpha = parseDoubleParam(settings, "alpha", 0.5); - double beta = parseDoubleParam(settings, "beta", 0.5); + double alpha = parseDoubleParam(settings, "alpha", 0.3); + double beta = parseDoubleParam(settings, "beta", 0.1); return new HoltLinearModel(alpha, beta); } } public static class HoltLinearModelBuilder implements MovAvgModelBuilder { - private double alpha = 0.5; - private double beta = 0.5; + + private Double alpha; + private Double beta; /** * Alpha controls the smoothing of the data. 
Alpha = 1 retains no memory of past values @@ -195,8 +224,15 @@ public class HoltLinearModel extends MovAvgModel { public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.field(MovAvgParser.MODEL.getPreferredName(), NAME_FIELD.getPreferredName()); builder.startObject(MovAvgParser.SETTINGS.getPreferredName()); + + if (alpha != null) { builder.field("alpha", alpha); + } + + if (beta != null) { builder.field("beta", beta); + } + builder.endObject(); return builder; } diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/movavg/models/HoltWintersModel.java b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/movavg/models/HoltWintersModel.java index 5d9b869afbc..d8f2275f117 100644 --- a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/movavg/models/HoltWintersModel.java +++ b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/movavg/models/HoltWintersModel.java @@ -41,6 +41,47 @@ public class HoltWintersModel extends MovAvgModel { protected static final ParseField NAME_FIELD = new ParseField("holt_winters"); + /** + * Controls smoothing of data. Also known as "level" value. + * Alpha = 1 retains no memory of past values + * (e.g. random walk), while alpha = 0 retains infinite memory of past values (e.g. + * mean of the series). + */ + private final double alpha; + + /** + * Controls smoothing of trend. + * Beta = 1 retains no memory of past values + * (e.g. random walk), while beta = 0 retains infinite memory of past values (e.g. + * mean of the series). + */ + private final double beta; + + /** + * Controls smoothing of seasonality. + * Gamma = 1 retains no memory of past values + * (e.g. random walk), while gamma = 0 retains infinite memory of past values (e.g. + * mean of the series). 
+ */ + private final double gamma; + + /** + * Periodicity of the data + */ + private final int period; + + /** + * Whether this is a multiplicative or additive HW + */ + private final SeasonalityType seasonalityType; + + /** + * Padding is used to add a very small amount to values, so that zeroes do not interfere + * with multiplicative seasonality math (e.g. division by zero) + */ + private final boolean pad; + private final double padding; + public enum SeasonalityType { ADDITIVE((byte) 0, "add"), MULTIPLICATIVE((byte) 1, "mult"); @@ -116,27 +157,6 @@ public class HoltWintersModel extends MovAvgModel { } - /** - * Controls smoothing of data. Alpha = 1 retains no memory of past values - * (e.g. random walk), while alpha = 0 retains infinite memory of past values (e.g. - * mean of the series). Useful values are somewhere in between - */ - private double alpha; - - /** - * Equivalent to alpha, but controls the smoothing of the trend instead of the data - */ - private double beta; - - private double gamma; - - private int period; - - private SeasonalityType seasonalityType; - - private boolean pad; - private double padding; - public HoltWintersModel(double alpha, double beta, double gamma, int period, SeasonalityType seasonalityType, boolean pad) { this.alpha = alpha; this.beta = beta; @@ -150,11 +170,41 @@ public class HoltWintersModel extends MovAvgModel { this.padding = seasonalityType.equals(SeasonalityType.MULTIPLICATIVE) && pad ? 
0.0000000001 : 0; } + @Override + public boolean minimizeByDefault() { + return true; + } @Override - public boolean hasValue(int windowLength) { + public boolean canBeMinimized() { + return true; + } + + @Override + public MovAvgModel neighboringModel() { + double newValue = Math.random(); + switch ((int) (Math.random() * 3)) { + case 0: + return new HoltWintersModel(newValue, beta, gamma, period, seasonalityType, pad); + case 1: + return new HoltWintersModel(alpha, newValue, gamma, period, seasonalityType, pad); + case 2: + return new HoltWintersModel(alpha, beta, newValue, period, seasonalityType, pad); + default: + assert (false): "Random value fell outside of range [0-2]"; + return new HoltWintersModel(newValue, beta, gamma, period, seasonalityType, pad); // This should never technically happen... + } + } + + @Override + public MovAvgModel clone() { + return new HoltWintersModel(alpha, beta, gamma, period, seasonalityType, pad); + } + + @Override + public boolean hasValue(int valuesAvailable) { // We need at least (period * 2) data-points (e.g. 
two "seasons") - return windowLength >= period * 2; + return valuesAvailable >= period * 2; } /** @@ -198,7 +248,7 @@ public class HoltWintersModel extends MovAvgModel { // Smoothed value double s = 0; - double last_s = 0; + double last_s; // Trend value double b = 0; @@ -218,12 +268,11 @@ public class HoltWintersModel extends MovAvgModel { // Calculate the slopes between first and second season for each period for (int i = 0; i < period; i++) { s += vs[i]; - b += (vs[i] - vs[i + period]) / 2; + b += (vs[i + period] - vs[i]) / period; } s /= (double) period; b /= (double) period; last_s = s; - last_b = b; // Calculate first seasonal if (Double.compare(s, 0.0) == 0 || Double.compare(s, -0.0) == 0) { @@ -247,7 +296,7 @@ public class HoltWintersModel extends MovAvgModel { if (seasonalityType.equals(SeasonalityType.MULTIPLICATIVE)) { seasonal[i] = gamma * (vs[i] / (last_s + last_b )) + (1 - gamma) * seasonal[i - period]; } else { - seasonal[i] = gamma * (vs[i] - (last_s + last_b )) + (1 - gamma) * seasonal[i - period]; + seasonal[i] = gamma * (vs[i] - (last_s - last_b )) + (1 - gamma) * seasonal[i - period]; } last_s = s; @@ -255,18 +304,15 @@ public class HoltWintersModel extends MovAvgModel { } double[] forecastValues = new double[numForecasts]; - int seasonCounter = (values.size() - 1) - period; - - for (int i = 0; i < numForecasts; i++) { + for (int i = 1; i <= numForecasts; i++) { + int idx = values.size() - period + ((i - 1) % period); // TODO perhaps pad out seasonal to a power of 2 and use a mask instead of modulo? 
if (seasonalityType.equals(SeasonalityType.MULTIPLICATIVE)) { - forecastValues[i] = s + (i * b) * seasonal[seasonCounter % values.size()]; + forecastValues[i-1] = (s + (i * b)) * seasonal[idx]; } else { - forecastValues[i] = s + (i * b) + seasonal[seasonCounter % values.size()]; + forecastValues[i-1] = s + (i * b) + seasonal[idx]; } - - seasonCounter += 1; } return forecastValues; @@ -312,9 +358,9 @@ public class HoltWintersModel extends MovAvgModel { @Override public MovAvgModel parse(@Nullable Map settings, String pipelineName, int windowSize, ParseFieldMatcher parseFieldMatcher) throws ParseException { - double alpha = parseDoubleParam(settings, "alpha", 0.5); - double beta = parseDoubleParam(settings, "beta", 0.5); - double gamma = parseDoubleParam(settings, "gamma", 0.5); + double alpha = parseDoubleParam(settings, "alpha", 0.3); + double beta = parseDoubleParam(settings, "beta", 0.1); + double gamma = parseDoubleParam(settings, "gamma", 0.3); int period = parseIntegerParam(settings, "period", 1); if (windowSize < 2 * period) { @@ -345,12 +391,12 @@ public class HoltWintersModel extends MovAvgModel { public static class HoltWintersModelBuilder implements MovAvgModelBuilder { - private double alpha = 0.5; - private double beta = 0.5; - private double gamma = 0.5; - private int period = 1; - private SeasonalityType seasonalityType = SeasonalityType.ADDITIVE; - private boolean pad = true; + private Double alpha; + private Double beta; + private Double gamma; + private Integer period; + private SeasonalityType seasonalityType; + private Boolean pad; /** * Alpha controls the smoothing of the data. 
Alpha = 1 retains no memory of past values @@ -402,12 +448,31 @@ public class HoltWintersModel extends MovAvgModel { public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.field(MovAvgParser.MODEL.getPreferredName(), NAME_FIELD.getPreferredName()); builder.startObject(MovAvgParser.SETTINGS.getPreferredName()); - builder.field("alpha", alpha); - builder.field("beta", beta); - builder.field("gamma", gamma); - builder.field("period", period); - builder.field("type", seasonalityType.getName()); - builder.field("pad", pad); + + if (alpha != null) { + builder.field("alpha", alpha); + } + + if (beta != null) { + builder.field("beta", beta); + } + + if (gamma != null) { + builder.field("gamma", gamma); + } + + if (period != null) { + builder.field("period", period); + } + + if (pad != null) { + builder.field("pad", pad); + } + + if (seasonalityType != null) { + builder.field("type", seasonalityType.getName()); + } + builder.endObject(); return builder; } diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/movavg/models/LinearModel.java b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/movavg/models/LinearModel.java index 5d81f719e5d..f6c5a2be407 100644 --- a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/movavg/models/LinearModel.java +++ b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/movavg/models/LinearModel.java @@ -42,6 +42,22 @@ public class LinearModel extends MovAvgModel { protected static final ParseField NAME_FIELD = new ParseField("linear"); + + @Override + public boolean canBeMinimized() { + return false; + } + + @Override + public MovAvgModel neighboringModel() { + return new LinearModel(); + } + + @Override + public MovAvgModel clone() { + return new LinearModel(); + } + @Override protected double[] doPredict(Collection values, int numPredictions) { double[] predictions = new double[numPredictions]; diff --git 
a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/movavg/models/MovAvgModel.java b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/movavg/models/MovAvgModel.java index a29e64d5398..1a085b37620 100644 --- a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/movavg/models/MovAvgModel.java +++ b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/movavg/models/MovAvgModel.java @@ -32,17 +32,42 @@ import java.util.Map; public abstract class MovAvgModel { + /** + * Should this model be fit to the data via a cost minimizing algorithm by default? + * + * @return + */ + public boolean minimizeByDefault() { + return false; + } + + /** + * Returns if the model can be cost minimized. Not all models have parameters + * which can be tuned / optimized. + * + * @return + */ + public abstract boolean canBeMinimized(); + + /** + * Generates a "neighboring" model, where one of the tunable parameters has been + * randomly mutated within the allowed range. Used for minimization + * + * @return + */ + public abstract MovAvgModel neighboringModel(); + /** * Checks to see this model can produce a new value, without actually running the algo. 
* This can be used for models that have certain preconditions that need to be met in order * to short-circuit execution * - * @param windowLength Length of current window - * @return Returns `true` if calling next() will produce a value, `false` otherwise + * @param valuesAvailable Number of values in the current window of values + * @return Returns `true` if calling next() will produce a value, `false` otherwise */ - public boolean hasValue(int windowLength) { + public boolean hasValue(int valuesAvailable) { // Default implementation can always provide a next() value - return windowLength > 0; + return valuesAvailable > 0; } /** @@ -85,6 +110,8 @@ public abstract class MovAvgModel { /** * Returns an empty set of predictions, filled with NaNs + * @param numPredictions Number of empty predictions to generate + * @return */ protected double[] emptyPredictions(int numPredictions) { double[] predictions = new double[numPredictions]; @@ -100,6 +127,13 @@ public abstract class MovAvgModel { */ public abstract void writeTo(StreamOutput out) throws IOException; + /** + * Clone the model, returning an exact copy + * + * @return + */ + public abstract MovAvgModel clone(); + /** * Abstract class which also provides some concrete parsing functionality. 
*/ diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/movavg/models/SimpleModel.java b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/movavg/models/SimpleModel.java index 17582553c82..8d375561dd3 100644 --- a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/movavg/models/SimpleModel.java +++ b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/movavg/models/SimpleModel.java @@ -40,6 +40,22 @@ public class SimpleModel extends MovAvgModel { protected static final ParseField NAME_FIELD = new ParseField("simple"); + + @Override + public boolean canBeMinimized() { + return false; + } + + @Override + public MovAvgModel neighboringModel() { + return new SimpleModel(); + } + + @Override + public MovAvgModel clone() { + return new SimpleModel(); + } + @Override protected double[] doPredict(Collection values, int numPredictions) { double[] predictions = new double[numPredictions]; diff --git a/core/src/test/java/org/elasticsearch/search/aggregations/pipeline/moving/avg/MovAvgTests.java b/core/src/test/java/org/elasticsearch/search/aggregations/pipeline/moving/avg/MovAvgTests.java index 3359e538783..81a065455ab 100644 --- a/core/src/test/java/org/elasticsearch/search/aggregations/pipeline/moving/avg/MovAvgTests.java +++ b/core/src/test/java/org/elasticsearch/search/aggregations/pipeline/moving/avg/MovAvgTests.java @@ -25,8 +25,7 @@ import com.google.common.collect.EvictingQueue; import org.elasticsearch.action.index.IndexRequestBuilder; import org.elasticsearch.action.search.SearchPhaseExecutionException; import org.elasticsearch.action.search.SearchResponse; -import org.elasticsearch.index.query.RangeQueryBuilder; -import org.elasticsearch.search.aggregations.bucket.filter.InternalFilter; +import org.elasticsearch.search.SearchParseException; import org.elasticsearch.search.aggregations.bucket.histogram.Histogram; import org.elasticsearch.search.aggregations.bucket.histogram.InternalHistogram; 
import org.elasticsearch.search.aggregations.bucket.histogram.InternalHistogram.Bucket; @@ -45,7 +44,6 @@ import java.util.*; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; import static org.elasticsearch.search.aggregations.AggregationBuilders.avg; -import static org.elasticsearch.search.aggregations.AggregationBuilders.filter; import static org.elasticsearch.search.aggregations.AggregationBuilders.histogram; import static org.elasticsearch.search.aggregations.AggregationBuilders.max; import static org.elasticsearch.search.aggregations.AggregationBuilders.min; @@ -55,8 +53,6 @@ import static org.elasticsearch.search.aggregations.pipeline.PipelineAggregatorB import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertSearchResponse; import static org.hamcrest.Matchers.closeTo; import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.greaterThanOrEqualTo; -import static org.hamcrest.Matchers.lessThanOrEqualTo; import static org.hamcrest.core.IsNull.notNullValue; import static org.hamcrest.core.IsNull.nullValue; @@ -83,7 +79,7 @@ public class MovAvgTests extends ElasticsearchIntegrationTest { enum MovAvgType { - SIMPLE ("simple"), LINEAR("linear"), EWMA("ewma"), HOLT("holt"), HOLT_WINTERS("holt_winters"); + SIMPLE ("simple"), LINEAR("linear"), EWMA("ewma"), HOLT("holt"), HOLT_WINTERS("holt_winters"), HOLT_BIG_MINIMIZE("holt"); private final String name; @@ -136,7 +132,12 @@ public class MovAvgTests extends ElasticsearchIntegrationTest { for (MovAvgType type : MovAvgType.values()) { for (MetricTarget target : MetricTarget.values()) { - setupExpected(type, target); + if (type.equals(MovAvgType.HOLT_BIG_MINIMIZE)) { + setupExpected(type, target, numBuckets); + } else { + setupExpected(type, target, windowSize); + } + } } @@ -169,7 +170,7 @@ public class MovAvgTests extends ElasticsearchIntegrationTest { * @param type The moving average model to use * @param target The document field "target", e.g. 
_count or a field value */ - private void setupExpected(MovAvgType type, MetricTarget target) { + private void setupExpected(MovAvgType type, MetricTarget target, int windowSize) { ArrayList values = new ArrayList<>(numBuckets); EvictingQueue window = EvictingQueue.create(windowSize); @@ -209,6 +210,9 @@ public class MovAvgTests extends ElasticsearchIntegrationTest { case HOLT: values.add(holt(window)); break; + case HOLT_BIG_MINIMIZE: + values.add(holt(window)); + break; case HOLT_WINTERS: // HW needs at least 2 periods of data to start if (window.size() >= period * 2) { @@ -226,7 +230,7 @@ public class MovAvgTests extends ElasticsearchIntegrationTest { window.offer(metricValue); } - testValues.put(type.toString() + "_" + target.toString(), values); + testValues.put(type.name() + "_" + target.name(), values); } /** @@ -349,12 +353,11 @@ public class MovAvgTests extends ElasticsearchIntegrationTest { // Calculate the slopes between first and second season for each period for (int i = 0; i < period; i++) { s += vs[i]; - b += (vs[i] - vs[i + period]) / 2; + b += (vs[i + period] - vs[i]) / period; } s /= (double) period; b /= (double) period; last_s = s; - last_b = b; // Calculate first seasonal if (Double.compare(s, 0.0) == 0 || Double.compare(s, -0.0) == 0) { @@ -377,18 +380,20 @@ public class MovAvgTests extends ElasticsearchIntegrationTest { if (seasonalityType.equals(HoltWintersModel.SeasonalityType.MULTIPLICATIVE)) { seasonal[i] = gamma * (vs[i] / (last_s + last_b )) + (1 - gamma) * seasonal[i - period]; } else { - seasonal[i] = gamma * (vs[i] - (last_s + last_b )) + (1 - gamma) * seasonal[i - period]; + seasonal[i] = gamma * (vs[i] - (last_s - last_b )) + (1 - gamma) * seasonal[i - period]; } last_s = s; last_b = b; } - int seasonCounter = (window.size() - 1) - period; + int idx = window.size() - period + (0 % period); + + // TODO perhaps pad out seasonal to a power of 2 and use a mask instead of modulo? 
if (seasonalityType.equals(HoltWintersModel.SeasonalityType.MULTIPLICATIVE)) { - return s + (0 * b) * seasonal[seasonCounter % window.size()]; + return (s + (1 * b)) * seasonal[idx]; } else { - return s + (0 * b) + seasonal[seasonCounter % window.size()]; + return s + (1 * b) + seasonal[idx]; } } @@ -425,8 +430,8 @@ public class MovAvgTests extends ElasticsearchIntegrationTest { List buckets = histo.getBuckets(); assertThat("Size of buckets array is not correct.", buckets.size(), equalTo(mockHisto.size())); - List expectedCounts = testValues.get(MovAvgType.SIMPLE.toString() + "_" + MetricTarget.COUNT.toString()); - List expectedValues = testValues.get(MovAvgType.SIMPLE.toString() + "_" + MetricTarget.VALUE.toString()); + List expectedCounts = testValues.get(MovAvgType.SIMPLE.name() + "_" + MetricTarget.COUNT.name()); + List expectedValues = testValues.get(MovAvgType.SIMPLE.name() + "_" + MetricTarget.VALUE.name()); Iterator actualIter = buckets.iterator(); Iterator expectedBucketIter = mockHisto.iterator(); @@ -477,8 +482,8 @@ public class MovAvgTests extends ElasticsearchIntegrationTest { List buckets = histo.getBuckets(); assertThat("Size of buckets array is not correct.", buckets.size(), equalTo(mockHisto.size())); - List expectedCounts = testValues.get(MovAvgType.LINEAR.toString() + "_" + MetricTarget.COUNT.toString()); - List expectedValues = testValues.get(MovAvgType.LINEAR.toString() + "_" + MetricTarget.VALUE.toString()); + List expectedCounts = testValues.get(MovAvgType.LINEAR.name() + "_" + MetricTarget.COUNT.name()); + List expectedValues = testValues.get(MovAvgType.LINEAR.name() + "_" + MetricTarget.VALUE.name()); Iterator actualIter = buckets.iterator(); Iterator expectedBucketIter = mockHisto.iterator(); @@ -529,8 +534,8 @@ public class MovAvgTests extends ElasticsearchIntegrationTest { List buckets = histo.getBuckets(); assertThat("Size of buckets array is not correct.", buckets.size(), equalTo(mockHisto.size())); - List expectedCounts = 
testValues.get(MovAvgType.EWMA.toString() + "_" + MetricTarget.COUNT.toString()); - List expectedValues = testValues.get(MovAvgType.EWMA.toString() + "_" + MetricTarget.VALUE.toString()); + List expectedCounts = testValues.get(MovAvgType.EWMA.name() + "_" + MetricTarget.COUNT.name()); + List expectedValues = testValues.get(MovAvgType.EWMA.name() + "_" + MetricTarget.VALUE.name()); Iterator actualIter = buckets.iterator(); Iterator expectedBucketIter = mockHisto.iterator(); @@ -581,8 +586,8 @@ public class MovAvgTests extends ElasticsearchIntegrationTest { List buckets = histo.getBuckets(); assertThat("Size of buckets array is not correct.", buckets.size(), equalTo(mockHisto.size())); - List expectedCounts = testValues.get(MovAvgType.HOLT.toString() + "_" + MetricTarget.COUNT.toString()); - List expectedValues = testValues.get(MovAvgType.HOLT.toString() + "_" + MetricTarget.VALUE.toString()); + List expectedCounts = testValues.get(MovAvgType.HOLT.name() + "_" + MetricTarget.COUNT.name()); + List expectedValues = testValues.get(MovAvgType.HOLT.name() + "_" + MetricTarget.VALUE.name()); Iterator actualIter = buckets.iterator(); Iterator expectedBucketIter = mockHisto.iterator(); @@ -618,12 +623,14 @@ public class MovAvgTests extends ElasticsearchIntegrationTest { .modelBuilder(new HoltWintersModel.HoltWintersModelBuilder() .alpha(alpha).beta(beta).gamma(gamma).period(period).seasonalityType(seasonalityType)) .gapPolicy(gapPolicy) + .minimize(false) .setBucketsPaths("_count")) .subAggregation(movingAvg("movavg_values") .window(windowSize) .modelBuilder(new HoltWintersModel.HoltWintersModelBuilder() .alpha(alpha).beta(beta).gamma(gamma).period(period).seasonalityType(seasonalityType)) .gapPolicy(gapPolicy) + .minimize(false) .setBucketsPaths("the_metric")) ).execute().actionGet(); @@ -635,8 +642,8 @@ public class MovAvgTests extends ElasticsearchIntegrationTest { List buckets = histo.getBuckets(); assertThat("Size of buckets array is not correct.", buckets.size(), 
equalTo(mockHisto.size())); - List expectedCounts = testValues.get(MovAvgType.HOLT_WINTERS.toString() + "_" + MetricTarget.COUNT.toString()); - List expectedValues = testValues.get(MovAvgType.HOLT_WINTERS.toString() + "_" + MetricTarget.VALUE.toString()); + List expectedCounts = testValues.get(MovAvgType.HOLT_WINTERS.name() + "_" + MetricTarget.COUNT.name()); + List expectedValues = testValues.get(MovAvgType.HOLT_WINTERS.name() + "_" + MetricTarget.VALUE.name()); Iterator actualIter = buckets.iterator(); Iterator expectedBucketIter = mockHisto.iterator(); @@ -1038,6 +1045,230 @@ public class MovAvgTests extends ElasticsearchIntegrationTest { } + @Test + public void HoltWintersMinimization() { + + SearchResponse response = client() + .prepareSearch("idx").setTypes("type") + .addAggregation( + histogram("histo").field(INTERVAL_FIELD).interval(interval) + .extendedBounds(0L, (long) (interval * (numBuckets - 1))) + .subAggregation(metric) + .subAggregation(movingAvg("movavg_counts") + .window(windowSize) + .modelBuilder(new HoltWintersModel.HoltWintersModelBuilder() + .period(period).seasonalityType(seasonalityType)) + .gapPolicy(gapPolicy) + .minimize(true) + .setBucketsPaths("_count")) + .subAggregation(movingAvg("movavg_values") + .window(windowSize) + .modelBuilder(new HoltWintersModel.HoltWintersModelBuilder() + .period(period).seasonalityType(seasonalityType)) + .gapPolicy(gapPolicy) + .minimize(true) + .setBucketsPaths("the_metric")) + ).execute().actionGet(); + + assertSearchResponse(response); + + InternalHistogram histo = response.getAggregations().get("histo"); + assertThat(histo, notNullValue()); + assertThat(histo.getName(), equalTo("histo")); + List buckets = histo.getBuckets(); + assertThat("Size of buckets array is not correct.", buckets.size(), equalTo(mockHisto.size())); + + + List expectedCounts = testValues.get(MovAvgType.HOLT_WINTERS.name() + "_" + MetricTarget.COUNT.name()); + List expectedValues = testValues.get(MovAvgType.HOLT_WINTERS.name() + 
"_" + MetricTarget.VALUE.name()); + + Iterator actualIter = buckets.iterator(); + Iterator expectedBucketIter = mockHisto.iterator(); + Iterator expectedCountsIter = expectedCounts.iterator(); + Iterator expectedValueIter = expectedValues.iterator(); + + // The minimizer is stochastic, so just make sure all the values coming back aren't null + while (actualIter.hasNext()) { + + Histogram.Bucket actual = actualIter.next(); + PipelineAggregationHelperTests.MockBucket expected = expectedBucketIter.next(); + Double expectedCount = expectedCountsIter.next(); + Double expectedValue = expectedValueIter.next(); + + assertThat("keys do not match", ((Number) actual.getKey()).longValue(), equalTo(expected.key)); + assertThat("doc counts do not match", actual.getDocCount(), equalTo((long)expected.count)); + + SimpleValue countMovAvg = actual.getAggregations().get("movavg_counts"); + SimpleValue valuesMovAvg = actual.getAggregations().get("movavg_values"); + + if (expectedCount == null) { + //this bucket wasn't supposed to have a value (empty, skipped, etc), so + //movavg should be null too + assertThat(countMovAvg, nullValue()); + } else { + + // Note that we don't compare against the mock values, since those are assuming + // a non-minimized set of coefficients. Just check for not-nullness + assertThat(countMovAvg, notNullValue()); + } + + if (expectedValue == null) { + //this bucket wasn't supposed to have a value (empty, skipped, etc), so + //movavg should be null too + assertThat(valuesMovAvg, nullValue()); + } else { + + // Note that we don't compare against the mock values, since those are assuming + // a non-minimized set of coefficients. Just check for not-nullness + assertThat(valuesMovAvg, notNullValue()); + } + } + + } + + + /** + * If the minimizer is turned on, but there isn't enough data to minimize with, it will simply use + * the default settings. 
Which means our mock histo will match the generated result (which it won't + * if the minimizer is actually working, since the coefficients will be different and thus generate different + * data) + * + * We can simulate this by setting the window size == size of histo + */ + @Test + public void minimizeNotEnoughData() { + + SearchResponse response = client() + .prepareSearch("idx").setTypes("type") + .addAggregation( + histogram("histo").field(INTERVAL_FIELD).interval(interval) + .extendedBounds(0L, (long) (interval * (numBuckets - 1))) + .subAggregation(metric) + .subAggregation(movingAvg("movavg_counts") + .window(numBuckets) + .modelBuilder(new HoltLinearModel.HoltLinearModelBuilder().alpha(alpha).beta(beta)) + .gapPolicy(gapPolicy) + .minimize(true) + .setBucketsPaths("_count")) + .subAggregation(movingAvg("movavg_values") + .window(numBuckets) + .modelBuilder(new HoltLinearModel.HoltLinearModelBuilder().alpha(alpha).beta(beta)) + .gapPolicy(gapPolicy) + .minimize(true) + .setBucketsPaths("the_metric")) + ).execute().actionGet(); + + assertSearchResponse(response); + + InternalHistogram histo = response.getAggregations().get("histo"); + assertThat(histo, notNullValue()); + assertThat(histo.getName(), equalTo("histo")); + List buckets = histo.getBuckets(); + assertThat("Size of buckets array is not correct.", buckets.size(), equalTo(mockHisto.size())); + + List expectedCounts = testValues.get(MovAvgType.HOLT_BIG_MINIMIZE.name() + "_" + MetricTarget.COUNT.name()); + List expectedValues = testValues.get(MovAvgType.HOLT_BIG_MINIMIZE.name() + "_" + MetricTarget.VALUE.name()); + + Iterator actualIter = buckets.iterator(); + Iterator expectedBucketIter = mockHisto.iterator(); + Iterator expectedCountsIter = expectedCounts.iterator(); + Iterator expectedValuesIter = expectedValues.iterator(); + + while (actualIter.hasNext()) { + assertValidIterators(expectedBucketIter, expectedCountsIter, expectedValuesIter); + + Histogram.Bucket actual = actualIter.next(); + 
PipelineAggregationHelperTests.MockBucket expected = expectedBucketIter.next(); + Double expectedCount = expectedCountsIter.next(); + Double expectedValue = expectedValuesIter.next(); + + assertThat("keys do not match", ((Number) actual.getKey()).longValue(), equalTo(expected.key)); + assertThat("doc counts do not match", actual.getDocCount(), equalTo((long)expected.count)); + + assertBucketContents(actual, expectedCount, expectedValue); + } + } + + /** + * Only some models can be minimized, should throw exception for: simple, linear + */ + @Test + public void checkIfNonTunableCanBeMinimized() { + + try { + client() + .prepareSearch("idx").setTypes("type") + .addAggregation( + histogram("histo").field(INTERVAL_FIELD).interval(interval) + .extendedBounds(0L, (long) (interval * (numBuckets - 1))) + .subAggregation(metric) + .subAggregation(movingAvg("movavg_counts") + .window(numBuckets) + .modelBuilder(new SimpleModel.SimpleModelBuilder()) + .gapPolicy(gapPolicy) + .minimize(true) + .setBucketsPaths("_count")) + ).execute().actionGet(); + fail("Simple Model cannot be minimized, but an exception was not thrown"); + } catch (SearchPhaseExecutionException e) { + // All good + } + + try { + client() + .prepareSearch("idx").setTypes("type") + .addAggregation( + histogram("histo").field(INTERVAL_FIELD).interval(interval) + .extendedBounds(0L, (long) (interval * (numBuckets - 1))) + .subAggregation(metric) + .subAggregation(movingAvg("movavg_counts") + .window(numBuckets) + .modelBuilder(new LinearModel.LinearModelBuilder()) + .gapPolicy(gapPolicy) + .minimize(true) + .setBucketsPaths("_count")) + ).execute().actionGet(); + fail("Linear Model cannot be minimized, but an exception was not thrown"); + } catch (SearchPhaseExecutionException e) { + // all good + } + } + + /** + * These models are all minimizable, so they should not throw exceptions + */ + @Test + public void checkIfTunableCanBeMinimized() { + + MovAvgModelBuilder[] builders = new MovAvgModelBuilder[]{ + new 
EwmaModel.EWMAModelBuilder(), + new HoltLinearModel.HoltLinearModelBuilder(), + new HoltWintersModel.HoltWintersModelBuilder() + }; + + for (MovAvgModelBuilder builder : builders) { + try { + client() + .prepareSearch("idx").setTypes("type") + .addAggregation( + histogram("histo").field(INTERVAL_FIELD).interval(interval) + .extendedBounds(0L, (long) (interval * (numBuckets - 1))) + .subAggregation(metric) + .subAggregation(movingAvg("movavg_counts") + .window(numBuckets) + .modelBuilder(builder) + .gapPolicy(gapPolicy) + .minimize(true) + .setBucketsPaths("_count")) + ).execute().actionGet(); + } catch (SearchPhaseExecutionException e) { + fail("Model [" + builder.toString() + "] can be minimized, but an exception was thrown"); + } + } + + + } + private void assertValidIterators(Iterator expectedBucketIter, Iterator expectedCountsIter, Iterator expectedValuesIter) { if (!expectedBucketIter.hasNext()) { diff --git a/docs/reference/aggregations/pipeline/movavg-aggregation.asciidoc b/docs/reference/aggregations/pipeline/movavg-aggregation.asciidoc index d612eacfa9e..83517c4430e 100644 --- a/docs/reference/aggregations/pipeline/movavg-aggregation.asciidoc +++ b/docs/reference/aggregations/pipeline/movavg-aggregation.asciidoc @@ -44,6 +44,8 @@ A `moving_avg` aggregation looks like this in isolation: |`model` |The moving average weighting model that we wish to use |Optional |`simple` |`gap_policy` |Determines what should happen when a gap in the data is encountered. |Optional |`insert_zero` |`window` |The size of window to "slide" across the histogram. |Optional |`5` +|`minimize` |If the model should be algorithmically minimized. See <<movavg-minimizer, Minimization>> for more + details |Optional |`false` for most models |`settings` |Model-specific settings, contents which differ depending on the model specified. |Optional | |=== @@ -100,6 +102,7 @@ the values from a `simple` moving average tend to "lag" behind the real data. 
"the_movavg":{ "moving_avg":{ "buckets_path": "the_sum", + "window" : 30, "model" : "simple" } } @@ -135,6 +138,7 @@ the "lag" behind the data's mean, since older points have less influence. "the_movavg":{ "moving_avg":{ "buckets_path": "the_sum", + "window" : 30, "model" : "linear" } } @@ -165,7 +169,9 @@ setting. Small values make the weight decay slowly, which provides greater smoo portion of the window. Larger valuers make the weight decay quickly, which reduces the impact of older values on the moving average. This tends to make the moving average track the data more closely but with less smoothing. -The default value of `alpha` is `0.5`, and the setting accepts any float from 0-1 inclusive. +The default value of `alpha` is `0.3`, and the setting accepts any float from 0-1 inclusive. + +The EWMA model can be <<movavg-minimizer, minimized>>. [source,js] -------------------------------------------------- @@ -173,6 +179,7 @@ The default value of `alpha` is `0.5`, and the setting accepts any float from 0- "the_movavg":{ "moving_avg":{ "buckets_path": "the_sum", + "window" : 30, "model" : "ewma", "settings" : { "alpha" : 0.5 @@ -204,7 +211,9 @@ smoothed data). The trend value is also exponentially weighted. Values are produced by multiplying the level and trend components. -The default value of `alpha` and `beta` is `0.5`, and the settings accept any float from 0-1 inclusive. +The default value of `alpha` is `0.3` and `beta` is `0.1`. The settings accept any float from 0-1 inclusive. + +The Holt-Linear model can be <<movavg-minimizer, minimized>>. 
[source,js] -------------------------------------------------- @@ -212,6 +221,7 @@ The default value of `alpha` and `beta` is `0.5`, and the settings accept any fl "the_movavg":{ "moving_avg":{ "buckets_path": "the_sum", + "window" : 30, "model" : "holt", "settings" : { "alpha" : 0.5, @@ -270,15 +280,18 @@ Additive seasonality is the default; it can also be specified by setting `"type" when the seasonal affect is additive to your data. E.g. 
you could simply subtract the seasonal effect to "de-seasonalize" your data into a flat trend. -The default value of `alpha`, `beta` and `gamma` is `0.5`, and the settings accept any float from 0-1 inclusive. +The default values of `alpha` and `gamma` are `0.3` while `beta` is `0.1`. The settings accept any float from 0-1 inclusive. The default value of `period` is `1`. +The additive Holt-Winters model can be <<movavg-minimizer, minimized>>. + [source,js] -------------------------------------------------- { "the_movavg":{ "moving_avg":{ "buckets_path": "the_sum", + "window" : 30, "model" : "holt_winters", "settings" : { "type" : "add", @@ -301,9 +314,11 @@ image::images/pipeline_movavg/triple.png[] Multiplicative is specified by setting `"type": "mult"`. This variety is preferred when the seasonal affect is multiplied against your data. E.g. if the seasonal affect is x5 the data, rather than simply adding to it. -The default value of `alpha`, `beta` and `gamma` is `0.5`, and the settings accept any float from 0-1 inclusive. +The default values of `alpha` and `gamma` are `0.3` while `beta` is `0.1`. The settings accept any float from 0-1 inclusive. The default value of `period` is `1`. +The multiplicative Holt-Winters model can be <<movavg-minimizer, minimized>>. + [WARNING] ====== Multiplicative Holt-Winters works by dividing each data point by the seasonal value. 
This is problematic if any of @@ -319,6 +334,7 @@ you can disable this behavior with `pad: false` "the_movavg":{ "moving_avg":{ "buckets_path": "the_sum", + "window" : 30, "model" : "holt_winters", "settings" : { "type" : "mult", @@ -347,6 +363,7 @@ as your buckets: "the_movavg":{ "moving_avg":{ "buckets_path": "the_sum", + "window" : 30, "model" : "simple", "predict" 10 } @@ -381,3 +398,53 @@ fluctuations into the model: [[holt_winters_prediction_global]] .Holt-Winters moving average with window of size 120, predict = 25, alpha = 0.8, beta = 0.2, gamma = 0.7, period = 30 image::images/pipeline_movavg/triple_prediction.png[] + +[[movavg-minimizer]] +==== Minimization + +Some of the models (EWMA, Holt-Linear, Holt-Winters) require one or more parameters to be configured. Parameter choice +can be tricky and sometimes non-intuitive. Furthermore, small deviations in these parameters can sometimes have a drastic +effect on the output moving average. + +For that reason, the three "tunable" models can be algorithmically *minimized*. Minimization is a process where parameters +are tweaked until the predictions generated by the model closely match the output data. Minimization is not foolproof +and can be susceptible to overfitting, but it often gives better results than hand-tuning. + +Minimization is disabled by default for `ewma` and `holt`, while it is enabled by default for `holt_winters`. +Minimization is most useful with Holt-Winters, since it helps improve the accuracy of the predictions. EWMA and +Holt-Linear are not great predictors, and are mostly used for smoothing data, so minimization is less useful on those +models. 
+ +Minimization is enabled/disabled via the `minimize` parameter: + +[source,js] +-------------------------------------------------- +{ + "the_movavg":{ + "moving_avg":{ + "buckets_path": "the_sum", + "model" : "holt_winters", + "window" : 30, + "minimize" : true, <1> + "settings" : { + "period" : 7 + } + } +} +-------------------------------------------------- +<1> Minimization is enabled with the `minimize` parameter + +When enabled, minimization will search for the best-fitting values of `alpha`, `beta` and `gamma`. The user should still provide +appropriate values for `window`, `period` and `type`. + +[WARNING] +====== +Minimization works by running a stochastic process called *simulated annealing*. This process will usually generate +a good solution, but is not guaranteed to find the global optimum. It also requires some amount of additional +computational power, since the model needs to be re-run multiple times as the values are tweaked. The run-time of +minimization is linear in the size of the window being processed: excessively large windows may cause latency. + +Finally, minimization fits the model to the last `n` values, where `n = window`. This generally produces +better forecasts into the future, since the parameters are tuned around the end of the series. It can, however, generate +poorer-fitting moving averages at the beginning of the series. +====== \ No newline at end of file