From a824184bf2fddecb0ed4daa2c2deacbb66d33c30 Mon Sep 17 00:00:00 2001 From: Zachary Tong Date: Wed, 8 Apr 2015 10:15:46 -0400 Subject: [PATCH] Aggregations: Add MovAvg Reducer Allows the user to calculate a Moving Average over a histogram of buckets. Provides four different moving averages: - Simple - Linear weighted - Single Exponentially weighted (aka EWMA) - Double Exponentially weighted (aka Holt-winters) Closes #10024 --- .../aggregations/AggregationModule.java | 5 +- .../TransportAggregationModule.java | 5 +- .../reducers/ReducerBuilders.java | 5 + .../reducers/movavg/MovAvgBuilder.java | 102 ++++ .../reducers/movavg/MovAvgParser.java | 142 +++++ .../reducers/movavg/MovAvgReducer.java | 182 +++++++ .../movavg/models/DoubleExpModel.java | 194 +++++++ .../reducers/movavg/models/LinearModel.java | 93 ++++ .../reducers/movavg/models/MovAvgModel.java | 49 ++ .../movavg/models/MovAvgModelBuilder.java | 33 ++ .../movavg/models/MovAvgModelModule.java | 55 ++ .../movavg/models/MovAvgModelParser.java | 34 ++ .../models/MovAvgModelParserMapper.java | 54 ++ .../movavg/models/MovAvgModelStreams.java | 74 +++ .../reducers/movavg/models/SimpleModel.java | 86 +++ .../movavg/models/SingleExpModel.java | 133 +++++ .../models/TransportMovAvgModelModule.java | 51 ++ .../aggregations/reducers/MovAvgTests.java | 500 ++++++++++++++++++ 18 files changed, 1795 insertions(+), 2 deletions(-) create mode 100644 src/main/java/org/elasticsearch/search/aggregations/reducers/movavg/MovAvgBuilder.java create mode 100644 src/main/java/org/elasticsearch/search/aggregations/reducers/movavg/MovAvgParser.java create mode 100644 src/main/java/org/elasticsearch/search/aggregations/reducers/movavg/MovAvgReducer.java create mode 100644 src/main/java/org/elasticsearch/search/aggregations/reducers/movavg/models/DoubleExpModel.java create mode 100644 src/main/java/org/elasticsearch/search/aggregations/reducers/movavg/models/LinearModel.java create mode 100644 src/main/java/org/elasticsearch/search/aggregations/reducers/movavg/models/MovAvgModel.java create mode 100644 src/main/java/org/elasticsearch/search/aggregations/reducers/movavg/models/MovAvgModelBuilder.java create mode 100644 src/main/java/org/elasticsearch/search/aggregations/reducers/movavg/models/MovAvgModelModule.java create mode 100644 src/main/java/org/elasticsearch/search/aggregations/reducers/movavg/models/MovAvgModelParser.java create mode 100644 src/main/java/org/elasticsearch/search/aggregations/reducers/movavg/models/MovAvgModelParserMapper.java create mode 100644 src/main/java/org/elasticsearch/search/aggregations/reducers/movavg/models/MovAvgModelStreams.java create mode 100644 src/main/java/org/elasticsearch/search/aggregations/reducers/movavg/models/SimpleModel.java create mode 100644 src/main/java/org/elasticsearch/search/aggregations/reducers/movavg/models/SingleExpModel.java create mode 100644 src/main/java/org/elasticsearch/search/aggregations/reducers/movavg/models/TransportMovAvgModelModule.java create mode 100644 src/test/java/org/elasticsearch/search/aggregations/reducers/MovAvgTests.java diff --git a/src/main/java/org/elasticsearch/search/aggregations/AggregationModule.java b/src/main/java/org/elasticsearch/search/aggregations/AggregationModule.java index d1cb6d96800..a8d3895ec78 100644 --- a/src/main/java/org/elasticsearch/search/aggregations/AggregationModule.java +++ b/src/main/java/org/elasticsearch/search/aggregations/AggregationModule.java @@ -57,6 +57,8 @@ import org.elasticsearch.search.aggregations.metrics.tophits.TopHitsParser; import org.elasticsearch.search.aggregations.metrics.valuecount.ValueCountParser; import org.elasticsearch.search.aggregations.reducers.Reducer; import org.elasticsearch.search.aggregations.reducers.derivative.DerivativeParser; +import org.elasticsearch.search.aggregations.reducers.movavg.MovAvgParser; +import org.elasticsearch.search.aggregations.reducers.movavg.models.MovAvgModelModule; import java.util.List; @@ -101,6 +103,7 @@ public class AggregationModule extends AbstractModule implements SpawnModules{ aggParsers.add(ChildrenParser.class); reducerParsers.add(DerivativeParser.class); + reducerParsers.add(MovAvgParser.class); } /** @@ -129,7 +132,7 @@ public class AggregationModule extends AbstractModule implements SpawnModules{ @Override public Iterable spawnModules() { - return ImmutableList.of(new SignificantTermsHeuristicModule()); + return ImmutableList.of(new SignificantTermsHeuristicModule(), new MovAvgModelModule()); } } diff --git a/src/main/java/org/elasticsearch/search/aggregations/TransportAggregationModule.java b/src/main/java/org/elasticsearch/search/aggregations/TransportAggregationModule.java index fe4542830cc..c3d89cf4f8f 100644 --- a/src/main/java/org/elasticsearch/search/aggregations/TransportAggregationModule.java +++ b/src/main/java/org/elasticsearch/search/aggregations/TransportAggregationModule.java @@ -59,6 +59,8 @@ import org.elasticsearch.search.aggregations.metrics.tophits.InternalTopHits; import org.elasticsearch.search.aggregations.metrics.valuecount.InternalValueCount; import org.elasticsearch.search.aggregations.reducers.InternalSimpleValue; import org.elasticsearch.search.aggregations.reducers.derivative.DerivativeReducer; +import org.elasticsearch.search.aggregations.reducers.movavg.MovAvgReducer; +import org.elasticsearch.search.aggregations.reducers.movavg.models.TransportMovAvgModelModule; /** * A module that registers all the transport streams for the addAggregation @@ -108,10 +110,11 @@ public class TransportAggregationModule extends AbstractModule implements SpawnM // Reducers DerivativeReducer.registerStreams(); + MovAvgReducer.registerStreams(); } @Override public Iterable spawnModules() { - return ImmutableList.of(new TransportSignificantTermsHeuristicModule()); + return ImmutableList.of(new TransportSignificantTermsHeuristicModule(), new TransportMovAvgModelModule()); } } diff --git a/src/main/java/org/elasticsearch/search/aggregations/reducers/ReducerBuilders.java b/src/main/java/org/elasticsearch/search/aggregations/reducers/ReducerBuilders.java index 21c901af80d..0aa8be4e992 100644 --- a/src/main/java/org/elasticsearch/search/aggregations/reducers/ReducerBuilders.java +++ b/src/main/java/org/elasticsearch/search/aggregations/reducers/ReducerBuilders.java @@ -20,6 +20,7 @@ package org.elasticsearch.search.aggregations.reducers; import org.elasticsearch.search.aggregations.reducers.derivative.DerivativeBuilder; +import org.elasticsearch.search.aggregations.reducers.movavg.MovAvgBuilder; public final class ReducerBuilders { @@ -29,4 +30,8 @@ public final class ReducerBuilders { public static final DerivativeBuilder derivative(String name) { return new DerivativeBuilder(name); } + + public static final MovAvgBuilder smooth(String name) { + return new MovAvgBuilder(name); + } } diff --git a/src/main/java/org/elasticsearch/search/aggregations/reducers/movavg/MovAvgBuilder.java b/src/main/java/org/elasticsearch/search/aggregations/reducers/movavg/MovAvgBuilder.java new file mode 100644 index 00000000000..9790604197d --- /dev/null +++ b/src/main/java/org/elasticsearch/search/aggregations/reducers/movavg/MovAvgBuilder.java @@ -0,0 +1,102 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.search.aggregations.reducers.movavg; + +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.search.aggregations.reducers.ReducerBuilder; +import org.elasticsearch.search.aggregations.reducers.movavg.models.MovAvgModelBuilder; + +import java.io.IOException; + +import static org.elasticsearch.search.aggregations.reducers.BucketHelpers.GapPolicy; + +/** + * A builder to create MovingAvg reducer aggregations + */ +public class MovAvgBuilder extends ReducerBuilder { + + private String format; + private GapPolicy gapPolicy; + private MovAvgModelBuilder modelBuilder; + private Integer window; + + public MovAvgBuilder(String name) { + super(name, MovAvgReducer.TYPE.name()); + } + + public MovAvgBuilder format(String format) { + this.format = format; + return this; + } + + /** + * Defines what should be done when a gap in the series is discovered + * + * @param gapPolicy A GapPolicy enum defining the selected policy + * @return Returns the builder to continue chaining + */ + public MovAvgBuilder gapPolicy(GapPolicy gapPolicy) { + this.gapPolicy = gapPolicy; + return this; + } + + /** + * Sets a MovAvgModelBuilder for the Moving Average. The model builder is used to + * define what type of moving average you want to use on the series + * + * @param modelBuilder A MovAvgModelBuilder which has been prepopulated with settings + * @return Returns the builder to continue chaining + */ + public MovAvgBuilder modelBuilder(MovAvgModelBuilder modelBuilder) { + this.modelBuilder = modelBuilder; + return this; + } + + /** + * Sets the window size for the moving average. This window will "slide" across the + * series, and the values inside that window will be used to calculate the moving avg value + * + * @param window Size of window + * @return Returns the builder to continue chaining + */ + public MovAvgBuilder window(int window) { + this.window = window; + return this; + } + + + @Override + protected XContentBuilder internalXContent(XContentBuilder builder, Params params) throws IOException { + if (format != null) { + builder.field(MovAvgParser.FORMAT.getPreferredName(), format); + } + if (gapPolicy != null) { + builder.field(MovAvgParser.GAP_POLICY.getPreferredName(), gapPolicy.getName()); + } + if (modelBuilder != null) { + modelBuilder.toXContent(builder, params); + } + if (window != null) { + builder.field(MovAvgParser.WINDOW.getPreferredName(), window); + } + return builder; + } + +} diff --git a/src/main/java/org/elasticsearch/search/aggregations/reducers/movavg/MovAvgParser.java b/src/main/java/org/elasticsearch/search/aggregations/reducers/movavg/MovAvgParser.java new file mode 100644 index 00000000000..3f241a67b3a --- /dev/null +++ b/src/main/java/org/elasticsearch/search/aggregations/reducers/movavg/MovAvgParser.java @@ -0,0 +1,142 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.search.aggregations.reducers.movavg; + +import org.elasticsearch.common.ParseField; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.search.SearchParseException; +import org.elasticsearch.search.aggregations.reducers.Reducer; +import org.elasticsearch.search.aggregations.reducers.ReducerFactory; +import org.elasticsearch.search.aggregations.reducers.movavg.models.MovAvgModel; +import org.elasticsearch.search.aggregations.reducers.movavg.models.MovAvgModelParser; +import org.elasticsearch.search.aggregations.reducers.movavg.models.MovAvgModelParserMapper; +import org.elasticsearch.search.aggregations.support.format.ValueFormat; +import org.elasticsearch.search.aggregations.support.format.ValueFormatter; +import org.elasticsearch.search.internal.SearchContext; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import static org.elasticsearch.search.aggregations.reducers.BucketHelpers.GapPolicy; + +public class MovAvgParser implements Reducer.Parser { + + public static final ParseField FORMAT = new ParseField("format"); + public static final ParseField GAP_POLICY = new ParseField("gap_policy"); + public static final ParseField MODEL = new ParseField("model"); + public static final ParseField WINDOW = new ParseField("window"); + public static final ParseField SETTINGS = new ParseField("settings"); + + private final MovAvgModelParserMapper movAvgModelParserMapper; + + @Inject + public MovAvgParser(MovAvgModelParserMapper movAvgModelParserMapper) { + this.movAvgModelParserMapper = movAvgModelParserMapper; + } + + @Override + public String type() { + return MovAvgReducer.TYPE.name(); + } + + @Override + public ReducerFactory parse(String reducerName, XContentParser parser, SearchContext context) throws IOException { + XContentParser.Token token; + String currentFieldName = null; + String[] bucketsPaths = null; + String format = null; + GapPolicy gapPolicy = GapPolicy.IGNORE; + int window = 5; + Map settings = null; + String model = "simple"; + + while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { + if (token == XContentParser.Token.FIELD_NAME) { + currentFieldName = parser.currentName(); + } else if (token == XContentParser.Token.VALUE_NUMBER) { + if (WINDOW.match(currentFieldName)) { + window = parser.intValue(); + } else { + throw new SearchParseException(context, "Unknown key for a " + token + " in [" + reducerName + "]: [" + + currentFieldName + "]."); + } + } else if (token == XContentParser.Token.VALUE_STRING) { + if (FORMAT.match(currentFieldName)) { + format = parser.text(); + } else if (BUCKETS_PATH.match(currentFieldName)) { + bucketsPaths = new String[] { parser.text() }; + } else if (GAP_POLICY.match(currentFieldName)) { + gapPolicy = GapPolicy.parse(context, parser.text()); + } else if (MODEL.match(currentFieldName)) { + model = parser.text(); + } else { + throw new SearchParseException(context, "Unknown key for a " + token + " in [" + reducerName + "]: [" + + currentFieldName + "]."); + } + } else if (token == XContentParser.Token.START_ARRAY) { + if (BUCKETS_PATH.match(currentFieldName)) { + List paths = new ArrayList<>(); + while ((token = parser.nextToken()) != XContentParser.Token.END_ARRAY) { + String path = parser.text(); + paths.add(path); + } + bucketsPaths = paths.toArray(new String[paths.size()]); + } else { + throw new SearchParseException(context, "Unknown key for a " + token + " in [" + reducerName + "]: [" + + currentFieldName + "]."); + } + } else if (token == XContentParser.Token.START_OBJECT) { + if (SETTINGS.match(currentFieldName)) { + settings = parser.map(); + } else { + throw new SearchParseException(context, "Unknown key for a " + token + " in [" + reducerName + "]: [" + + currentFieldName + "]."); + } + } else { + throw new SearchParseException(context, "Unexpected token " + token + " in [" + reducerName + "]."); + } + } + + if (bucketsPaths == null) { + throw new SearchParseException(context, "Missing required field [" + BUCKETS_PATH.getPreferredName() + + "] for smooth aggregation [" + reducerName + "]"); + } + + ValueFormatter formatter = null; + if (format != null) { + formatter = ValueFormat.Patternable.Number.format(format).formatter(); + } + + MovAvgModelParser modelParser = movAvgModelParserMapper.get(model); + if (modelParser == null) { + throw new SearchParseException(context, "Unknown model [" + model + + "] specified. Valid options are:" + movAvgModelParserMapper.getAllNames().toString()); + } + MovAvgModel movAvgModel = modelParser.parse(settings); + + + return new MovAvgReducer.Factory(reducerName, bucketsPaths, formatter, gapPolicy, window, movAvgModel); + } + + +} diff --git a/src/main/java/org/elasticsearch/search/aggregations/reducers/movavg/MovAvgReducer.java b/src/main/java/org/elasticsearch/search/aggregations/reducers/movavg/MovAvgReducer.java new file mode 100644 index 00000000000..b339cdf487d --- /dev/null +++ b/src/main/java/org/elasticsearch/search/aggregations/reducers/movavg/MovAvgReducer.java @@ -0,0 +1,182 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.search.aggregations.reducers.movavg; + +import com.google.common.base.Function; +import com.google.common.collect.EvictingQueue; +import com.google.common.collect.Lists; +import org.elasticsearch.ElasticsearchIllegalStateException; +import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.search.aggregations.*; +import org.elasticsearch.search.aggregations.InternalAggregation.ReduceContext; +import org.elasticsearch.search.aggregations.InternalAggregation.Type; +import org.elasticsearch.search.aggregations.bucket.histogram.HistogramAggregator; +import org.elasticsearch.search.aggregations.bucket.histogram.InternalHistogram; +import org.elasticsearch.search.aggregations.reducers.*; +import org.elasticsearch.search.aggregations.reducers.movavg.models.MovAvgModel; +import org.elasticsearch.search.aggregations.reducers.movavg.models.MovAvgModelStreams; +import org.elasticsearch.search.aggregations.support.AggregationContext; +import org.elasticsearch.search.aggregations.support.format.ValueFormatter; +import org.elasticsearch.search.aggregations.support.format.ValueFormatterStreams; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import static org.elasticsearch.search.aggregations.reducers.BucketHelpers.GapPolicy; +import static org.elasticsearch.search.aggregations.reducers.BucketHelpers.resolveBucketValue; + +public class MovAvgReducer extends Reducer { + + public final static Type TYPE = new Type("moving_avg"); + + public final static ReducerStreams.Stream STREAM = new ReducerStreams.Stream() { + @Override + public MovAvgReducer readResult(StreamInput in) throws IOException { + MovAvgReducer result = new MovAvgReducer(); + result.readFrom(in); + return result; + } + }; + + public static void registerStreams() { + ReducerStreams.registerStream(STREAM, TYPE.stream()); + } + + private static final Function FUNCTION = new Function() { + @Override + public InternalAggregation apply(Aggregation input) { + return (InternalAggregation) input; + } + }; + + private ValueFormatter formatter; + private GapPolicy gapPolicy; + private int window; + private MovAvgModel model; + + public MovAvgReducer() { + } + + public MovAvgReducer(String name, String[] bucketsPaths, @Nullable ValueFormatter formatter, GapPolicy gapPolicy, + int window, MovAvgModel model, Map metadata) { + super(name, bucketsPaths, metadata); + this.formatter = formatter; + this.gapPolicy = gapPolicy; + this.window = window; + this.model = model; + } + + @Override + public Type type() { + return TYPE; + } + + @Override + public InternalAggregation reduce(InternalAggregation aggregation, ReduceContext reduceContext) { + InternalHistogram histo = (InternalHistogram) aggregation; + List buckets = histo.getBuckets(); + InternalHistogram.Factory factory = histo.getFactory(); + + List newBuckets = new ArrayList<>(); + EvictingQueue values = EvictingQueue.create(this.window); + + for (InternalHistogram.Bucket bucket : buckets) { + Double thisBucketValue = resolveBucketValue(histo, bucket, bucketsPaths()[0], gapPolicy); + if (thisBucketValue != null) { + values.offer(thisBucketValue); + + // TODO handle "edge policy" + double movavg = model.next(values); + + List aggs = new ArrayList<>(Lists.transform(bucket.getAggregations().asList(), FUNCTION)); + aggs.add(new InternalSimpleValue(name(), movavg, formatter, new ArrayList(), metaData())); + InternalHistogram.Bucket newBucket = factory.createBucket(bucket.getKey(), bucket.getDocCount(), new InternalAggregations( + aggs), bucket.getKeyed(), bucket.getFormatter()); + newBuckets.add(newBucket); + } else { + newBuckets.add(bucket); + } + } + //return factory.create(histo.getName(), newBuckets, histo); + return factory.create(newBuckets, histo); + } + + @Override + public void doReadFrom(StreamInput in) throws IOException { + formatter = ValueFormatterStreams.readOptional(in); + gapPolicy = GapPolicy.readFrom(in); + window = in.readVInt(); + model = MovAvgModelStreams.read(in); + } + + @Override + public void doWriteTo(StreamOutput out) throws IOException { + ValueFormatterStreams.writeOptional(formatter, out); + gapPolicy.writeTo(out); + out.writeVInt(window); + model.writeTo(out); + } + + public static class Factory extends ReducerFactory { + + private final ValueFormatter formatter; + private GapPolicy gapPolicy; + private int window; + private MovAvgModel model; + + public Factory(String name, String[] bucketsPaths, @Nullable ValueFormatter formatter, GapPolicy gapPolicy, + int window, MovAvgModel model) { + super(name, TYPE.name(), bucketsPaths); + this.formatter = formatter; + this.gapPolicy = gapPolicy; + this.window = window; + this.model = model; + } + + @Override + protected Reducer createInternal(AggregationContext context, Aggregator parent, boolean collectsFromSingleBucket, + Map metaData) throws IOException { + return new MovAvgReducer(name, bucketsPaths, formatter, gapPolicy, window, model, metaData); + } + + @Override + public void doValidate(AggregatorFactory parent, AggregatorFactory[] aggFactories, List reducerFactories) { + if (bucketsPaths.length != 1) { + throw new ElasticsearchIllegalStateException(Reducer.Parser.BUCKETS_PATH.getPreferredName() + + " must contain a single entry for reducer [" + name + "]"); + } + if (!(parent instanceof HistogramAggregator.Factory)) { + throw new ElasticsearchIllegalStateException("derivative reducer [" + name + + "] must have a histogram or date_histogram as parent"); + } else { + HistogramAggregator.Factory histoParent = (HistogramAggregator.Factory) parent; + if (histoParent.minDocCount() != 0) { + throw new ElasticsearchIllegalStateException("parent histogram of derivative reducer [" + name + + "] must have min_doc_count of 0"); + } + } + } + + } +} diff --git a/src/main/java/org/elasticsearch/search/aggregations/reducers/movavg/models/DoubleExpModel.java b/src/main/java/org/elasticsearch/search/aggregations/reducers/movavg/models/DoubleExpModel.java new file mode 100644 index 00000000000..907c23fd213 --- /dev/null +++ b/src/main/java/org/elasticsearch/search/aggregations/reducers/movavg/models/DoubleExpModel.java @@ -0,0 +1,194 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.search.aggregations.reducers.movavg.models; + +import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.ParseField; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.search.aggregations.reducers.movavg.MovAvgParser; + +import java.io.IOException; +import java.util.*; + +/** + * Calculate a doubly exponential weighted moving average + */ +public class DoubleExpModel extends MovAvgModel { + + protected static final ParseField NAME_FIELD = new ParseField("double_exp"); + + /** + * Controls smoothing of data. Alpha = 1 retains no memory of past values + * (e.g. random walk), while alpha = 0 retains infinite memory of past values (e.g. + * mean of the series). Useful values are somewhere in between + */ + private double alpha; + + /** + * Equivalent to alpha, but controls the smoothing of the trend instead of the data + */ + private double beta; + + public DoubleExpModel(double alpha, double beta) { + this.alpha = alpha; + this.beta = beta; + } + + + @Override + public double next(Collection values) { + return next(values, 1).get(0); + } + + /** + * Calculate a doubly exponential weighted moving average + * + * @param values Collection of values to calculate avg for + * @param numForecasts number of forecasts into the future to return + * + * @param Type T extending Number + * @return Returns a Double containing the moving avg for the window + */ + public List next(Collection values, int numForecasts) { + // Smoothed value + double s = 0; + double last_s = 0; + + // Trend value + double b = 0; + double last_b = 0; + + int counter = 0; + + //TODO bail if too few values + + T last; + for (T v : values) { + last = v; + if (counter == 1) { + s = v.doubleValue(); + b = v.doubleValue() - last.doubleValue(); + } else { + s = alpha * v.doubleValue() + (1.0d - alpha) * (last_s + last_b); + b = beta * (s - last_s) + (1 - beta) * last_b; + } + + counter += 1; + last_s = s; + last_b = b; + } + + List forecastValues = new ArrayList<>(numForecasts); + for (int i = 0; i < numForecasts; i++) { + forecastValues.add(s + (i * b)); + } + + return forecastValues; + } + + public static final MovAvgModelStreams.Stream STREAM = new MovAvgModelStreams.Stream() { + @Override + public MovAvgModel readResult(StreamInput in) throws IOException { + return new DoubleExpModel(in.readDouble(), in.readDouble()); + } + + @Override + public String getName() { + return NAME_FIELD.getPreferredName(); + } + }; + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeString(STREAM.getName()); + out.writeDouble(alpha); + out.writeDouble(beta); + } + + public static class DoubleExpModelParser implements MovAvgModelParser { + + @Override + public String getName() { + return NAME_FIELD.getPreferredName(); + } + + @Override + public MovAvgModel parse(@Nullable Map settings) { + + Double alpha; + Double beta; + + if (settings == null || (alpha = (Double)settings.get("alpha")) == null) { + alpha = 0.5; + } + + if (settings == null || (beta = (Double)settings.get("beta")) == null) { + beta = 0.5; + } + + return new DoubleExpModel(alpha, beta); + } + } + + public static class DoubleExpModelBuilder implements MovAvgModelBuilder { + + private double alpha = 0.5; + private double beta = 0.5; + + /** + * Alpha controls the smoothing of the data. Alpha = 1 retains no memory of past values + * (e.g. a random walk), while alpha = 0 retains infinite memory of past values (e.g. + * the series mean). Useful values are somewhere in between. Defaults to 0.5. + * + * @param alpha A double between 0-1 inclusive, controls data smoothing + * + * @return The builder to continue chaining + */ + public DoubleExpModelBuilder alpha(double alpha) { + this.alpha = alpha; + return this; + } + + /** + * Equivalent to alpha, but controls the smoothing of the trend instead of the data + * + * @param beta a double between 0-1 inclusive, controls trend smoothing + * + * @return The builder to continue chaining + */ + public DoubleExpModelBuilder beta(double beta) { + this.beta = beta; + return this; + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.field(MovAvgParser.MODEL.getPreferredName(), NAME_FIELD.getPreferredName()); + builder.startObject(MovAvgParser.SETTINGS.getPreferredName()); + builder.field("alpha", alpha); + builder.field("beta", beta); + builder.endObject(); + return builder; + } + } +} + diff --git a/src/main/java/org/elasticsearch/search/aggregations/reducers/movavg/models/LinearModel.java b/src/main/java/org/elasticsearch/search/aggregations/reducers/movavg/models/LinearModel.java new file mode 100644 index 00000000000..6c269590d33 --- /dev/null +++ b/src/main/java/org/elasticsearch/search/aggregations/reducers/movavg/models/LinearModel.java @@ -0,0 +1,93 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.search.aggregations.reducers.movavg.models; + + +import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.ParseField; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.search.aggregations.reducers.movavg.MovAvgParser; + +import java.io.IOException; +import java.util.Collection; +import java.util.Map; + +/** + * Calculate a linearly weighted moving average, such that older values are + * linearly less important. "Time" is determined by position in collection + */ +public class LinearModel extends MovAvgModel { + + protected static final ParseField NAME_FIELD = new ParseField("linear"); + + @Override + public double next(Collection values) { + double avg = 0; + long totalWeight = 1; + long current = 1; + + for (T v : values) { + avg += v.doubleValue() * current; + totalWeight += current; + current += 1; + } + return avg / totalWeight; + } + + public static final MovAvgModelStreams.Stream STREAM = new MovAvgModelStreams.Stream() { + @Override + public MovAvgModel readResult(StreamInput in) throws IOException { + return new LinearModel(); + } + + @Override + public String getName() { + return NAME_FIELD.getPreferredName(); + } + }; + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeString(STREAM.getName()); + } + + public static class LinearModelParser implements MovAvgModelParser { + + @Override + public String getName() { + return NAME_FIELD.getPreferredName(); + } + + @Override + public MovAvgModel parse(@Nullable Map settings) { + return new LinearModel(); + } + } + + public static class LinearModelBuilder implements MovAvgModelBuilder { + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.field(MovAvgParser.MODEL.getPreferredName(), NAME_FIELD.getPreferredName()); + return builder; + } + } +} diff --git a/src/main/java/org/elasticsearch/search/aggregations/reducers/movavg/models/MovAvgModel.java b/src/main/java/org/elasticsearch/search/aggregations/reducers/movavg/models/MovAvgModel.java new file mode 100644 index 00000000000..84f7832f893 --- /dev/null +++ b/src/main/java/org/elasticsearch/search/aggregations/reducers/movavg/models/MovAvgModel.java @@ -0,0 +1,49 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.search.aggregations.reducers.movavg.models; + +import org.elasticsearch.common.io.stream.StreamOutput; + +import java.io.IOException; +import java.util.*; + +public abstract class MovAvgModel { + + /** + * Returns the next value in the series, according to the underlying smoothing model + * + * @param values Collection of numerics to smooth, usually windowed + * @param Type of numeric + * @return Returns a double, since most smoothing methods operate on floating points + */ + public abstract double next(Collection values); + + /** + * Write the model to the output stream + * + * @param out Output stream + * @throws IOException + */ + public abstract void writeTo(StreamOutput out) throws IOException; +} + + + + diff --git a/src/main/java/org/elasticsearch/search/aggregations/reducers/movavg/models/MovAvgModelBuilder.java b/src/main/java/org/elasticsearch/search/aggregations/reducers/movavg/models/MovAvgModelBuilder.java new file mode 100644 index 00000000000..96bc9427de3 --- /dev/null +++ b/src/main/java/org/elasticsearch/search/aggregations/reducers/movavg/models/MovAvgModelBuilder.java @@ -0,0 +1,33 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.search.aggregations.reducers.movavg.models; + +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.XContentBuilder; + +import java.io.IOException; + +/** + * Represents the common interface that all moving average models share. Moving + * average models are used by the MovAvg reducer + */ +public interface MovAvgModelBuilder extends ToXContent { + public abstract XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException; +} diff --git a/src/main/java/org/elasticsearch/search/aggregations/reducers/movavg/models/MovAvgModelModule.java b/src/main/java/org/elasticsearch/search/aggregations/reducers/movavg/models/MovAvgModelModule.java new file mode 100644 index 00000000000..71ccbcb31b0 --- /dev/null +++ b/src/main/java/org/elasticsearch/search/aggregations/reducers/movavg/models/MovAvgModelModule.java @@ -0,0 +1,55 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + + +package org.elasticsearch.search.aggregations.reducers.movavg.models; + +import com.google.common.collect.Lists; +import org.elasticsearch.common.inject.AbstractModule; +import org.elasticsearch.common.inject.multibindings.Multibinder; + +import java.util.List; + +/** + * Register the various model parsers + */ +public class MovAvgModelModule extends AbstractModule { + + private List> parsers = Lists.newArrayList(); + + public MovAvgModelModule() { + registerParser(SimpleModel.SimpleModelParser.class); + registerParser(LinearModel.LinearModelParser.class); + registerParser(SingleExpModel.SingleExpModelParser.class); + registerParser(DoubleExpModel.DoubleExpModelParser.class); + } + + public void registerParser(Class parser) { + parsers.add(parser); + } + + @Override + protected void configure() { + Multibinder parserMapBinder = Multibinder.newSetBinder(binder(), MovAvgModelParser.class); + for (Class clazz : parsers) { + parserMapBinder.addBinding().to(clazz); + } + bind(MovAvgModelParserMapper.class); + } +} diff --git a/src/main/java/org/elasticsearch/search/aggregations/reducers/movavg/models/MovAvgModelParser.java b/src/main/java/org/elasticsearch/search/aggregations/reducers/movavg/models/MovAvgModelParser.java new file mode 100644 index 00000000000..d27e447baa4 --- /dev/null +++ b/src/main/java/org/elasticsearch/search/aggregations/reducers/movavg/models/MovAvgModelParser.java @@ -0,0 +1,34 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.search.aggregations.reducers.movavg.models; + + +import org.elasticsearch.common.Nullable; + +import java.util.Map; + +/** + * Common interface for parsers used by the various Moving Average models + */ +public interface MovAvgModelParser { + public MovAvgModel parse(@Nullable Map settings); + + public String getName(); +} diff --git a/src/main/java/org/elasticsearch/search/aggregations/reducers/movavg/models/MovAvgModelParserMapper.java b/src/main/java/org/elasticsearch/search/aggregations/reducers/movavg/models/MovAvgModelParserMapper.java new file mode 100644 index 00000000000..459729d8960 --- /dev/null +++ b/src/main/java/org/elasticsearch/search/aggregations/reducers/movavg/models/MovAvgModelParserMapper.java @@ -0,0 +1,54 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.search.aggregations.reducers.movavg.models; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.collect.MapBuilder; +import org.elasticsearch.common.inject.Inject; + +import java.util.Set; + +/** + * Contains a map of all concrete model parsers which can be used to build Models + */ +public class MovAvgModelParserMapper { + + protected ImmutableMap movAvgParsers; + + @Inject + public MovAvgModelParserMapper(Set parsers) { + MapBuilder builder = MapBuilder.newMapBuilder(); + for (MovAvgModelParser parser : parsers) { + builder.put(parser.getName(), parser); + } + movAvgParsers = builder.immutableMap(); + } + + public @Nullable + MovAvgModelParser get(String parserName) { + return movAvgParsers.get(parserName); + } + + public ImmutableSet getAllNames() { + return movAvgParsers.keySet(); + } +} diff --git a/src/main/java/org/elasticsearch/search/aggregations/reducers/movavg/models/MovAvgModelStreams.java b/src/main/java/org/elasticsearch/search/aggregations/reducers/movavg/models/MovAvgModelStreams.java new file mode 100644 index 00000000000..b11a3687021 --- /dev/null +++ b/src/main/java/org/elasticsearch/search/aggregations/reducers/movavg/models/MovAvgModelStreams.java @@ -0,0 +1,74 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.search.aggregations.reducers.movavg.models; + +import com.google.common.collect.ImmutableMap; +import org.elasticsearch.common.collect.MapBuilder; +import org.elasticsearch.common.io.stream.StreamInput; + +import java.io.IOException; + +/** + * A registry for all moving average models. This is needed for reading them from a stream without knowing which + * one it is. + */ +public class MovAvgModelStreams { + + private static ImmutableMap STREAMS = ImmutableMap.of(); + + public static MovAvgModel read(StreamInput in) throws IOException { + return stream(in.readString()).readResult(in); + } + + /** + * A stream that knows how to read an heuristic from the input. + */ + public static interface Stream { + + MovAvgModel readResult(StreamInput in) throws IOException; + + String getName(); + } + + /** + * Registers the given stream and associate it with the given types. + * + * @param stream The stream to register + * @param names The names associated with the streams + */ + public static synchronized void registerStream(Stream stream, String... names) { + MapBuilder uStreams = MapBuilder.newMapBuilder(STREAMS); + for (String name : names) { + uStreams.put(name, stream); + } + STREAMS = uStreams.immutableMap(); + } + + /** + * Returns the stream that is registered for the given name + * + * @param name The given name + * @return The associated stream + */ + public static Stream stream(String name) { + return STREAMS.get(name); + } + +} diff --git a/src/main/java/org/elasticsearch/search/aggregations/reducers/movavg/models/SimpleModel.java b/src/main/java/org/elasticsearch/search/aggregations/reducers/movavg/models/SimpleModel.java new file mode 100644 index 00000000000..243b022af2c --- /dev/null +++ b/src/main/java/org/elasticsearch/search/aggregations/reducers/movavg/models/SimpleModel.java @@ -0,0 +1,86 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.search.aggregations.reducers.movavg.models; + +import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.ParseField; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.search.aggregations.reducers.movavg.MovAvgParser; + +import java.io.IOException; +import java.util.Collection; +import java.util.Map; + +/** + * Calculate a simple unweighted (arithmetic) moving average + */ +public class SimpleModel extends MovAvgModel { + + protected static final ParseField NAME_FIELD = new ParseField("simple"); + + @Override + public double next(Collection values) { + double avg = 0; + for (T v : values) { + avg += v.doubleValue(); + } + return avg / values.size(); + } + + public static final MovAvgModelStreams.Stream STREAM = new MovAvgModelStreams.Stream() { + @Override + public MovAvgModel readResult(StreamInput in) throws IOException { + return new SimpleModel(); + } + + @Override + public String getName() { + return NAME_FIELD.getPreferredName(); + } + }; + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeString(STREAM.getName()); + } + + public static class SimpleModelParser implements MovAvgModelParser { + + @Override + public String getName() { + return NAME_FIELD.getPreferredName(); + } + + @Override + public MovAvgModel parse(@Nullable Map settings) { + return new SimpleModel(); + } + } + + public static class SimpleModelBuilder implements MovAvgModelBuilder { + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.field(MovAvgParser.MODEL.getPreferredName(), NAME_FIELD.getPreferredName()); + return builder; + } + } +} diff --git a/src/main/java/org/elasticsearch/search/aggregations/reducers/movavg/models/SingleExpModel.java b/src/main/java/org/elasticsearch/search/aggregations/reducers/movavg/models/SingleExpModel.java new file mode 100644 index 00000000000..f17ba68f498 --- /dev/null +++ b/src/main/java/org/elasticsearch/search/aggregations/reducers/movavg/models/SingleExpModel.java @@ -0,0 +1,133 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.search.aggregations.reducers.movavg.models; + +import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.ParseField; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.search.aggregations.reducers.movavg.MovAvgParser; + +import java.io.IOException; +import java.util.Collection; +import java.util.Map; + +/** + * Calculate a exponentially weighted moving average + */ +public class SingleExpModel extends MovAvgModel { + + protected static final ParseField NAME_FIELD = new ParseField("single_exp"); + + /** + * Controls smoothing of data. Alpha = 1 retains no memory of past values + * (e.g. random walk), while alpha = 0 retains infinite memory of past values (e.g. + * mean of the series). Useful values are somewhere in between + */ + private double alpha; + + public SingleExpModel(double alpha) { + this.alpha = alpha; + } + + + @Override + public double next(Collection values) { + double avg = 0; + boolean first = true; + + for (T v : values) { + if (first) { + avg = v.doubleValue(); + first = false; + } else { + avg = (v.doubleValue() * alpha) + (avg * (1 - alpha)); + } + } + return avg; + } + + public static final MovAvgModelStreams.Stream STREAM = new MovAvgModelStreams.Stream() { + @Override + public MovAvgModel readResult(StreamInput in) throws IOException { + return new SingleExpModel(in.readDouble()); + } + + @Override + public String getName() { + return NAME_FIELD.getPreferredName(); + } + }; + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeString(STREAM.getName()); + out.writeDouble(alpha); + } + + public static class SingleExpModelParser implements MovAvgModelParser { + + @Override + public String getName() { + return NAME_FIELD.getPreferredName(); + } + + @Override + public MovAvgModel parse(@Nullable Map settings) { + + Double alpha; + if (settings == null || (alpha = (Double)settings.get("alpha")) == null) { + alpha = 0.5; + } + + return new SingleExpModel(alpha); + } + } + + public static class SingleExpModelBuilder implements MovAvgModelBuilder { + + private double alpha = 0.5; + + /** + * Alpha controls the smoothing of the data. Alpha = 1 retains no memory of past values + * (e.g. a random walk), while alpha = 0 retains infinite memory of past values (e.g. + * the series mean). Useful values are somewhere in between. Defaults to 0.5. + * + * @param alpha A double between 0-1 inclusive, controls data smoothing + * + * @return The builder to continue chaining + */ + public SingleExpModelBuilder alpha(double alpha) { + this.alpha = alpha; + return this; + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.field(MovAvgParser.MODEL.getPreferredName(), NAME_FIELD.getPreferredName()); + builder.startObject(MovAvgParser.SETTINGS.getPreferredName()); + builder.field("alpha", alpha); + builder.endObject(); + return builder; + } + } +} + diff --git a/src/main/java/org/elasticsearch/search/aggregations/reducers/movavg/models/TransportMovAvgModelModule.java b/src/main/java/org/elasticsearch/search/aggregations/reducers/movavg/models/TransportMovAvgModelModule.java new file mode 100644 index 00000000000..bc085f6241a --- /dev/null +++ b/src/main/java/org/elasticsearch/search/aggregations/reducers/movavg/models/TransportMovAvgModelModule.java @@ -0,0 +1,51 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.search.aggregations.reducers.movavg.models; + +import com.google.common.collect.Lists; +import org.elasticsearch.common.inject.AbstractModule; + +import java.util.List; + +/** + * Register the transport streams so that models can be serialized/deserialized from the stream + */ +public class TransportMovAvgModelModule extends AbstractModule { + + private List streams = Lists.newArrayList(); + + public TransportMovAvgModelModule() { + registerStream(SimpleModel.STREAM); + registerStream(LinearModel.STREAM); + registerStream(SingleExpModel.STREAM); + registerStream(DoubleExpModel.STREAM); + } + + public void registerStream(MovAvgModelStreams.Stream stream) { + streams.add(stream); + } + + @Override + protected void configure() { + for (MovAvgModelStreams.Stream stream : streams) { + MovAvgModelStreams.registerStream(stream, stream.getName()); + } + } +} diff --git a/src/test/java/org/elasticsearch/search/aggregations/reducers/MovAvgTests.java b/src/test/java/org/elasticsearch/search/aggregations/reducers/MovAvgTests.java new file mode 100644 index 00000000000..d22656b0ad5 --- /dev/null +++ b/src/test/java/org/elasticsearch/search/aggregations/reducers/MovAvgTests.java @@ -0,0 +1,500 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.search.aggregations.reducers; + + +import com.google.common.collect.EvictingQueue; +import org.elasticsearch.action.index.IndexRequestBuilder; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.search.aggregations.bucket.histogram.Histogram; +import org.elasticsearch.search.aggregations.bucket.histogram.InternalHistogram; +import org.elasticsearch.search.aggregations.bucket.histogram.InternalHistogram.Bucket; +import org.elasticsearch.search.aggregations.reducers.movavg.models.DoubleExpModel; +import org.elasticsearch.search.aggregations.reducers.movavg.models.LinearModel; +import org.elasticsearch.search.aggregations.reducers.movavg.models.SimpleModel; +import org.elasticsearch.search.aggregations.reducers.movavg.models.SingleExpModel; +import org.elasticsearch.test.ElasticsearchIntegrationTest; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; + +import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; +import static org.elasticsearch.search.aggregations.AggregationBuilders.*; +import static org.elasticsearch.search.aggregations.reducers.ReducerBuilders.smooth; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertSearchResponse; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.core.IsNull.notNullValue; + +@ElasticsearchIntegrationTest.SuiteScopeTest +public class MovAvgTests extends ElasticsearchIntegrationTest { + + private static final String SINGLE_VALUED_FIELD_NAME = "l_value"; + private static final String SINGLE_VALUED_VALUE_FIELD_NAME = "v_value"; + + static int interval; + static int numValueBuckets; + static int numFilledValueBuckets; + static int windowSize; + static BucketHelpers.GapPolicy gapPolicy; + + static long[] docCounts; + static long[] valueCounts; + static Double[] simpleMovAvgCounts; + static Double[] linearMovAvgCounts; + static Double[] singleExpMovAvgCounts; + static Double[] doubleExpMovAvgCounts; + + static Double[] simpleMovAvgValueCounts; + static Double[] linearMovAvgValueCounts; + static Double[] singleExpMovAvgValueCounts; + static Double[] doubleExpMovAvgValueCounts; + + @Override + public void setupSuiteScopeCluster() throws Exception { + createIndex("idx"); + createIndex("idx_unmapped"); + + interval = 5; + numValueBuckets = randomIntBetween(6, 80); + numFilledValueBuckets = numValueBuckets; + windowSize = randomIntBetween(3,10); + gapPolicy = randomBoolean() ? BucketHelpers.GapPolicy.IGNORE : BucketHelpers.GapPolicy.INSERT_ZEROS; + + docCounts = new long[numValueBuckets]; + valueCounts = new long[numValueBuckets]; + for (int i = 0; i < numValueBuckets; i++) { + docCounts[i] = randomIntBetween(0, 20); + valueCounts[i] = randomIntBetween(1,20); //this will be used as a constant for all values within a bucket + } + + this.setupSimple(); + this.setupLinear(); + this.setupSingle(); + this.setupDouble(); + + + List builders = new ArrayList<>(); + for (int i = 0; i < numValueBuckets; i++) { + for (int docs = 0; docs < docCounts[i]; docs++) { + builders.add(client().prepareIndex("idx", "type").setSource(jsonBuilder().startObject() + .field(SINGLE_VALUED_FIELD_NAME, i * interval) + .field(SINGLE_VALUED_VALUE_FIELD_NAME, 1).endObject())); + } + } + + indexRandom(true, builders); + ensureSearchable(); + } + + private void setupSimple() { + simpleMovAvgCounts = new Double[numValueBuckets]; + EvictingQueue window = EvictingQueue.create(windowSize); + for (int i = 0; i < numValueBuckets; i++) { + double thisValue = docCounts[i]; + window.offer(thisValue); + + double movAvg = 0; + for (double value : window) { + movAvg += value; + } + movAvg /= window.size(); + + simpleMovAvgCounts[i] = movAvg; + } + + window.clear(); + simpleMovAvgValueCounts = new Double[numValueBuckets]; + for (int i = 0; i < numValueBuckets; i++) { + window.offer((double)docCounts[i]); + + double movAvg = 0; + for (double value : window) { + movAvg += value; + } + movAvg /= window.size(); + + simpleMovAvgValueCounts[i] = movAvg; + + } + + } + + private void setupLinear() { + EvictingQueue window = EvictingQueue.create(windowSize); + linearMovAvgCounts = new Double[numValueBuckets]; + window.clear(); + for (int i = 0; i < numValueBuckets; i++) { + double thisValue = docCounts[i]; + if (thisValue == -1) { + thisValue = 0; + } + window.offer(thisValue); + + double avg = 0; + long totalWeight = 1; + long current = 1; + + for (double value : window) { + avg += value * current; + totalWeight += current; + current += 1; + } + linearMovAvgCounts[i] = avg / totalWeight; + } + + window.clear(); + linearMovAvgValueCounts = new Double[numValueBuckets]; + + for (int i = 0; i < numValueBuckets; i++) { + double thisValue = docCounts[i]; + window.offer(thisValue); + + double avg = 0; + long totalWeight = 1; + long current = 1; + + for (double value : window) { + avg += value * current; + totalWeight += current; + current += 1; + } + linearMovAvgValueCounts[i] = avg / totalWeight; + } + } + + private void setupSingle() { + EvictingQueue window = EvictingQueue.create(windowSize); + singleExpMovAvgCounts = new Double[numValueBuckets]; + for (int i = 0; i < numValueBuckets; i++) { + double thisValue = docCounts[i]; + if (thisValue == -1) { + thisValue = 0; + } + window.offer(thisValue); + + double avg = 0; + double alpha = 0.5; + boolean first = true; + + for (double value : window) { + if (first) { + avg = value; + first = false; + } else { + avg = (value * alpha) + (avg * (1 - alpha)); + } + } + singleExpMovAvgCounts[i] = avg ; + } + + singleExpMovAvgValueCounts = new Double[numValueBuckets]; + window.clear(); + + for (int i = 0; i < numValueBuckets; i++) { + window.offer((double)docCounts[i]); + + double avg = 0; + double alpha = 0.5; + boolean first = true; + + for (double value : window) { + if (first) { + avg = value; + first = false; + } else { + avg = (value * alpha) + (avg * (1 - alpha)); + } + } + singleExpMovAvgCounts[i] = avg ; + } + + } + + private void setupDouble() { + EvictingQueue window = EvictingQueue.create(windowSize); + doubleExpMovAvgCounts = new Double[numValueBuckets]; + + for (int i = 0; i < numValueBuckets; i++) { + double thisValue = docCounts[i]; + if (thisValue == -1) { + thisValue = 0; + } + window.offer(thisValue); + + double s = 0; + double last_s = 0; + + // Trend value + double b = 0; + double last_b = 0; + + double alpha = 0.5; + double beta = 0.5; + int counter = 0; + + double last; + for (double value : window) { + last = value; + if (counter == 1) { + s = value; + b = value - last; + } else { + s = alpha * value + (1.0d - alpha) * (last_s + last_b); + b = beta * (s - last_s) + (1 - beta) * last_b; + } + + counter += 1; + last_s = s; + last_b = b; + } + + doubleExpMovAvgCounts[i] = s + (0 * b) ; + } + + doubleExpMovAvgValueCounts = new Double[numValueBuckets]; + window.clear(); + + for (int i = 0; i < numValueBuckets; i++) { + window.offer((double)docCounts[i]); + + double s = 0; + double last_s = 0; + + // Trend value + double b = 0; + double last_b = 0; + + double alpha = 0.5; + double beta = 0.5; + int counter = 0; + + double last; + for (double value : window) { + last = value; + if (counter == 1) { + s = value; + b = value - last; + } else { + s = alpha * value + (1.0d - alpha) * (last_s + last_b); + b = beta * (s - last_s) + (1 - beta) * last_b; + } + + counter += 1; + last_s = s; + last_b = b; + } + + doubleExpMovAvgValueCounts[i] = s + (0 * b) ; + } + } + + /** + * test simple moving average on single value field + */ + @Test + public void simpleSingleValuedField() { + + SearchResponse response = client() + .prepareSearch("idx") + .addAggregation( + histogram("histo").field(SINGLE_VALUED_FIELD_NAME).interval(interval).minDocCount(0) + .extendedBounds(0L, (long) (interval * (numValueBuckets - 1))) + .subAggregation(sum("the_sum").field(SINGLE_VALUED_VALUE_FIELD_NAME)) + .subAggregation(smooth("smooth") + .window(windowSize) + .modelBuilder(new SimpleModel.SimpleModelBuilder()) + .gapPolicy(gapPolicy) + .setBucketsPaths("_count")) + .subAggregation(smooth("movavg_values") + .window(windowSize) + .modelBuilder(new SimpleModel.SimpleModelBuilder()) + .gapPolicy(gapPolicy) + .setBucketsPaths("the_sum")) + ).execute().actionGet(); + + assertSearchResponse(response); + + InternalHistogram histo = response.getAggregations().get("histo"); + assertThat(histo, notNullValue()); + assertThat(histo.getName(), equalTo("histo")); + List buckets = histo.getBuckets(); + assertThat(buckets.size(), equalTo(numValueBuckets)); + + for (int i = 0; i < numValueBuckets; ++i) { + Histogram.Bucket bucket = buckets.get(i); + checkBucketKeyAndDocCount("Bucket " + i, bucket, i * interval, docCounts[i]); + SimpleValue docCountMovAvg = bucket.getAggregations().get("smooth"); + assertThat(docCountMovAvg, notNullValue()); + assertThat(docCountMovAvg.value(), equalTo(simpleMovAvgCounts[i])); + + SimpleValue valuesMovAvg = bucket.getAggregations().get("movavg_values"); + assertThat(valuesMovAvg, notNullValue()); + assertThat(valuesMovAvg.value(), equalTo(simpleMovAvgCounts[i])); + } + } + + /** + * test linear moving average on single value field + */ + @Test + public void linearSingleValuedField() { + + SearchResponse response = client() + .prepareSearch("idx") + .addAggregation( + histogram("histo").field(SINGLE_VALUED_FIELD_NAME).interval(interval).minDocCount(0) + .extendedBounds(0L, (long) (interval * (numValueBuckets - 1))) + .subAggregation(sum("the_sum").field(SINGLE_VALUED_VALUE_FIELD_NAME)) + .subAggregation(smooth("smooth") + .window(windowSize) + .modelBuilder(new LinearModel.LinearModelBuilder()) + .gapPolicy(gapPolicy) + .setBucketsPaths("_count")) + .subAggregation(smooth("movavg_values") + .window(windowSize) + .modelBuilder(new LinearModel.LinearModelBuilder()) + .gapPolicy(gapPolicy) + .setBucketsPaths("the_sum")) + ).execute().actionGet(); + + assertSearchResponse(response); + + InternalHistogram histo = response.getAggregations().get("histo"); + assertThat(histo, notNullValue()); + assertThat(histo.getName(), equalTo("histo")); + List buckets = histo.getBuckets(); + assertThat(buckets.size(), equalTo(numValueBuckets)); + + for (int i = 0; i < numValueBuckets; ++i) { + Histogram.Bucket bucket = buckets.get(i); + checkBucketKeyAndDocCount("Bucket " + i, bucket, i * interval, docCounts[i]); + SimpleValue docCountMovAvg = bucket.getAggregations().get("smooth"); + assertThat(docCountMovAvg, notNullValue()); + assertThat(docCountMovAvg.value(), equalTo(linearMovAvgCounts[i])); + + SimpleValue valuesMovAvg = bucket.getAggregations().get("movavg_values"); + assertThat(valuesMovAvg, notNullValue()); + assertThat(valuesMovAvg.value(), equalTo(linearMovAvgCounts[i])); + } + } + + /** + * test single exponential moving average on single value field + */ + @Test + public void singleExpSingleValuedField() { + + SearchResponse response = client() + .prepareSearch("idx") + .addAggregation( + histogram("histo").field(SINGLE_VALUED_FIELD_NAME).interval(interval).minDocCount(0) + .extendedBounds(0L, (long) (interval * (numValueBuckets - 1))) + .subAggregation(sum("the_sum").field(SINGLE_VALUED_VALUE_FIELD_NAME)) + .subAggregation(smooth("smooth") + .window(windowSize) + .modelBuilder(new SingleExpModel.SingleExpModelBuilder().alpha(0.5)) + .gapPolicy(gapPolicy) + .setBucketsPaths("_count")) + .subAggregation(smooth("movavg_values") + .window(windowSize) + .modelBuilder(new SingleExpModel.SingleExpModelBuilder().alpha(0.5)) + .gapPolicy(gapPolicy) + .setBucketsPaths("the_sum")) + ).execute().actionGet(); + + assertSearchResponse(response); + + InternalHistogram histo = response.getAggregations().get("histo"); + assertThat(histo, notNullValue()); + assertThat(histo.getName(), equalTo("histo")); + List buckets = histo.getBuckets(); + assertThat(buckets.size(), equalTo(numValueBuckets)); + + for (int i = 0; i < numValueBuckets; ++i) { + Histogram.Bucket bucket = buckets.get(i); + checkBucketKeyAndDocCount("Bucket " + i, bucket, i * interval, docCounts[i]); + SimpleValue docCountMovAvg = bucket.getAggregations().get("smooth"); + assertThat(docCountMovAvg, notNullValue()); + assertThat(docCountMovAvg.value(), equalTo(singleExpMovAvgCounts[i])); + + SimpleValue valuesMovAvg = bucket.getAggregations().get("movavg_values"); + assertThat(valuesMovAvg, notNullValue()); + assertThat(valuesMovAvg.value(), equalTo(singleExpMovAvgCounts[i])); + } + } + + /** + * test double exponential moving average on single value field + */ + @Test + public void doubleExpSingleValuedField() { + + SearchResponse response = client() + .prepareSearch("idx") + .addAggregation( + histogram("histo").field(SINGLE_VALUED_FIELD_NAME).interval(interval).minDocCount(0) + .extendedBounds(0L, (long) (interval * (numValueBuckets - 1))) + .subAggregation(sum("the_sum").field(SINGLE_VALUED_VALUE_FIELD_NAME)) + .subAggregation(smooth("smooth") + .window(windowSize) + .modelBuilder(new DoubleExpModel.DoubleExpModelBuilder().alpha(0.5).beta(0.5)) + .gapPolicy(gapPolicy) + .setBucketsPaths("_count")) + .subAggregation(smooth("movavg_values") + .window(windowSize) + .modelBuilder(new DoubleExpModel.DoubleExpModelBuilder().alpha(0.5).beta(0.5)) + .gapPolicy(gapPolicy) + .setBucketsPaths("the_sum")) + ).execute().actionGet(); + + assertSearchResponse(response); + + InternalHistogram histo = response.getAggregations().get("histo"); + assertThat(histo, notNullValue()); + assertThat(histo.getName(), equalTo("histo")); + List buckets = histo.getBuckets(); + assertThat(buckets.size(), equalTo(numValueBuckets)); + + for (int i = 0; i < numValueBuckets; ++i) { + Histogram.Bucket bucket = buckets.get(i); + checkBucketKeyAndDocCount("Bucket " + i, bucket, i * interval, docCounts[i]); + SimpleValue docCountMovAvg = bucket.getAggregations().get("smooth"); + assertThat(docCountMovAvg, notNullValue()); + assertThat(docCountMovAvg.value(), equalTo(doubleExpMovAvgCounts[i])); + + SimpleValue valuesMovAvg = bucket.getAggregations().get("movavg_values"); + assertThat(valuesMovAvg, notNullValue()); + assertThat(valuesMovAvg.value(), equalTo(doubleExpMovAvgCounts[i])); + } + } + + + private void checkBucketKeyAndDocCount(final String msg, final Histogram.Bucket bucket, final long expectedKey, + long expectedDocCount) { + if (expectedDocCount == -1) { + expectedDocCount = 0; + } + assertThat(msg, bucket, notNullValue()); + assertThat(msg + " key", ((Number) bucket.getKey()).longValue(), equalTo(expectedKey)); + assertThat(msg + " docCount", bucket.getDocCount(), equalTo(expectedDocCount)); + } + +}