Aggregations: Add MovAvg Reducer

Allows the user to calculate a Moving Average over a histogram  of buckets.  Provides four different
moving averages:
 - Simple
 - Linear weighted
 - Single Exponentially weighted (aka EWMA)
 - Double Exponentially weighted (aka Holt-winters)

Closes #10024
This commit is contained in:
Zachary Tong 2015-04-08 10:15:46 -04:00
parent caeb85ef19
commit a824184bf2
18 changed files with 1795 additions and 2 deletions

View File

@ -57,6 +57,8 @@ import org.elasticsearch.search.aggregations.metrics.tophits.TopHitsParser;
import org.elasticsearch.search.aggregations.metrics.valuecount.ValueCountParser;
import org.elasticsearch.search.aggregations.reducers.Reducer;
import org.elasticsearch.search.aggregations.reducers.derivative.DerivativeParser;
import org.elasticsearch.search.aggregations.reducers.movavg.MovAvgParser;
import org.elasticsearch.search.aggregations.reducers.movavg.models.MovAvgModelModule;
import java.util.List;
@ -101,6 +103,7 @@ public class AggregationModule extends AbstractModule implements SpawnModules{
aggParsers.add(ChildrenParser.class);
reducerParsers.add(DerivativeParser.class);
reducerParsers.add(MovAvgParser.class);
}
/**
@ -129,7 +132,7 @@ public class AggregationModule extends AbstractModule implements SpawnModules{
@Override
public Iterable<? extends Module> spawnModules() {
return ImmutableList.of(new SignificantTermsHeuristicModule());
return ImmutableList.of(new SignificantTermsHeuristicModule(), new MovAvgModelModule());
}
}

View File

@ -59,6 +59,8 @@ import org.elasticsearch.search.aggregations.metrics.tophits.InternalTopHits;
import org.elasticsearch.search.aggregations.metrics.valuecount.InternalValueCount;
import org.elasticsearch.search.aggregations.reducers.InternalSimpleValue;
import org.elasticsearch.search.aggregations.reducers.derivative.DerivativeReducer;
import org.elasticsearch.search.aggregations.reducers.movavg.MovAvgReducer;
import org.elasticsearch.search.aggregations.reducers.movavg.models.TransportMovAvgModelModule;
/**
* A module that registers all the transport streams for the addAggregation
@ -108,10 +110,11 @@ public class TransportAggregationModule extends AbstractModule implements SpawnM
// Reducers
DerivativeReducer.registerStreams();
MovAvgReducer.registerStreams();
}
@Override
public Iterable<? extends Module> spawnModules() {
return ImmutableList.of(new TransportSignificantTermsHeuristicModule());
return ImmutableList.of(new TransportSignificantTermsHeuristicModule(), new TransportMovAvgModelModule());
}
}

View File

@ -20,6 +20,7 @@
package org.elasticsearch.search.aggregations.reducers;
import org.elasticsearch.search.aggregations.reducers.derivative.DerivativeBuilder;
import org.elasticsearch.search.aggregations.reducers.movavg.MovAvgBuilder;
public final class ReducerBuilders {
@ -29,4 +30,8 @@ public final class ReducerBuilders {
public static final DerivativeBuilder derivative(String name) {
return new DerivativeBuilder(name);
}
public static final MovAvgBuilder smooth(String name) {
return new MovAvgBuilder(name);
}
}

View File

@ -0,0 +1,102 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.aggregations.reducers.movavg;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.search.aggregations.reducers.ReducerBuilder;
import org.elasticsearch.search.aggregations.reducers.movavg.models.MovAvgModelBuilder;
import java.io.IOException;
import static org.elasticsearch.search.aggregations.reducers.BucketHelpers.GapPolicy;
/**
* A builder to create MovingAvg reducer aggregations
*/
public class MovAvgBuilder extends ReducerBuilder<MovAvgBuilder> {
private String format;
private GapPolicy gapPolicy;
private MovAvgModelBuilder modelBuilder;
private Integer window;
public MovAvgBuilder(String name) {
super(name, MovAvgReducer.TYPE.name());
}
public MovAvgBuilder format(String format) {
this.format = format;
return this;
}
/**
* Defines what should be done when a gap in the series is discovered
*
* @param gapPolicy A GapPolicy enum defining the selected policy
* @return Returns the builder to continue chaining
*/
public MovAvgBuilder gapPolicy(GapPolicy gapPolicy) {
this.gapPolicy = gapPolicy;
return this;
}
/**
* Sets a MovAvgModelBuilder for the Moving Average. The model builder is used to
* define what type of moving average you want to use on the series
*
* @param modelBuilder A MovAvgModelBuilder which has been prepopulated with settings
* @return Returns the builder to continue chaining
*/
public MovAvgBuilder modelBuilder(MovAvgModelBuilder modelBuilder) {
this.modelBuilder = modelBuilder;
return this;
}
/**
* Sets the window size for the moving average. This window will "slide" across the
* series, and the values inside that window will be used to calculate the moving avg value
*
* @param window Size of window
* @return Returns the builder to continue chaining
*/
public MovAvgBuilder window(int window) {
this.window = window;
return this;
}
@Override
protected XContentBuilder internalXContent(XContentBuilder builder, Params params) throws IOException {
if (format != null) {
builder.field(MovAvgParser.FORMAT.getPreferredName(), format);
}
if (gapPolicy != null) {
builder.field(MovAvgParser.GAP_POLICY.getPreferredName(), gapPolicy.getName());
}
if (modelBuilder != null) {
modelBuilder.toXContent(builder, params);
}
if (window != null) {
builder.field(MovAvgParser.WINDOW.getPreferredName(), window);
}
return builder;
}
}

View File

@ -0,0 +1,142 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.aggregations.reducers.movavg;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.search.SearchParseException;
import org.elasticsearch.search.aggregations.reducers.Reducer;
import org.elasticsearch.search.aggregations.reducers.ReducerFactory;
import org.elasticsearch.search.aggregations.reducers.movavg.models.MovAvgModel;
import org.elasticsearch.search.aggregations.reducers.movavg.models.MovAvgModelParser;
import org.elasticsearch.search.aggregations.reducers.movavg.models.MovAvgModelParserMapper;
import org.elasticsearch.search.aggregations.support.format.ValueFormat;
import org.elasticsearch.search.aggregations.support.format.ValueFormatter;
import org.elasticsearch.search.internal.SearchContext;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import static org.elasticsearch.search.aggregations.reducers.BucketHelpers.GapPolicy;
public class MovAvgParser implements Reducer.Parser {
public static final ParseField FORMAT = new ParseField("format");
public static final ParseField GAP_POLICY = new ParseField("gap_policy");
public static final ParseField MODEL = new ParseField("model");
public static final ParseField WINDOW = new ParseField("window");
public static final ParseField SETTINGS = new ParseField("settings");
private final MovAvgModelParserMapper movAvgModelParserMapper;
@Inject
public MovAvgParser(MovAvgModelParserMapper movAvgModelParserMapper) {
this.movAvgModelParserMapper = movAvgModelParserMapper;
}
@Override
public String type() {
return MovAvgReducer.TYPE.name();
}
@Override
public ReducerFactory parse(String reducerName, XContentParser parser, SearchContext context) throws IOException {
XContentParser.Token token;
String currentFieldName = null;
String[] bucketsPaths = null;
String format = null;
GapPolicy gapPolicy = GapPolicy.IGNORE;
int window = 5;
Map<String, Object> settings = null;
String model = "simple";
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
currentFieldName = parser.currentName();
} else if (token == XContentParser.Token.VALUE_NUMBER) {
if (WINDOW.match(currentFieldName)) {
window = parser.intValue();
} else {
throw new SearchParseException(context, "Unknown key for a " + token + " in [" + reducerName + "]: ["
+ currentFieldName + "].");
}
} else if (token == XContentParser.Token.VALUE_STRING) {
if (FORMAT.match(currentFieldName)) {
format = parser.text();
} else if (BUCKETS_PATH.match(currentFieldName)) {
bucketsPaths = new String[] { parser.text() };
} else if (GAP_POLICY.match(currentFieldName)) {
gapPolicy = GapPolicy.parse(context, parser.text());
} else if (MODEL.match(currentFieldName)) {
model = parser.text();
} else {
throw new SearchParseException(context, "Unknown key for a " + token + " in [" + reducerName + "]: ["
+ currentFieldName + "].");
}
} else if (token == XContentParser.Token.START_ARRAY) {
if (BUCKETS_PATH.match(currentFieldName)) {
List<String> paths = new ArrayList<>();
while ((token = parser.nextToken()) != XContentParser.Token.END_ARRAY) {
String path = parser.text();
paths.add(path);
}
bucketsPaths = paths.toArray(new String[paths.size()]);
} else {
throw new SearchParseException(context, "Unknown key for a " + token + " in [" + reducerName + "]: ["
+ currentFieldName + "].");
}
} else if (token == XContentParser.Token.START_OBJECT) {
if (SETTINGS.match(currentFieldName)) {
settings = parser.map();
} else {
throw new SearchParseException(context, "Unknown key for a " + token + " in [" + reducerName + "]: ["
+ currentFieldName + "].");
}
} else {
throw new SearchParseException(context, "Unexpected token " + token + " in [" + reducerName + "].");
}
}
if (bucketsPaths == null) {
throw new SearchParseException(context, "Missing required field [" + BUCKETS_PATH.getPreferredName()
+ "] for smooth aggregation [" + reducerName + "]");
}
ValueFormatter formatter = null;
if (format != null) {
formatter = ValueFormat.Patternable.Number.format(format).formatter();
}
MovAvgModelParser modelParser = movAvgModelParserMapper.get(model);
if (modelParser == null) {
throw new SearchParseException(context, "Unknown model [" + model
+ "] specified. Valid options are:" + movAvgModelParserMapper.getAllNames().toString());
}
MovAvgModel movAvgModel = modelParser.parse(settings);
return new MovAvgReducer.Factory(reducerName, bucketsPaths, formatter, gapPolicy, window, movAvgModel);
}
}

View File

@ -0,0 +1,182 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.aggregations.reducers.movavg;
import com.google.common.base.Function;
import com.google.common.collect.EvictingQueue;
import com.google.common.collect.Lists;
import org.elasticsearch.ElasticsearchIllegalStateException;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.search.aggregations.*;
import org.elasticsearch.search.aggregations.InternalAggregation.ReduceContext;
import org.elasticsearch.search.aggregations.InternalAggregation.Type;
import org.elasticsearch.search.aggregations.bucket.histogram.HistogramAggregator;
import org.elasticsearch.search.aggregations.bucket.histogram.InternalHistogram;
import org.elasticsearch.search.aggregations.reducers.*;
import org.elasticsearch.search.aggregations.reducers.movavg.models.MovAvgModel;
import org.elasticsearch.search.aggregations.reducers.movavg.models.MovAvgModelStreams;
import org.elasticsearch.search.aggregations.support.AggregationContext;
import org.elasticsearch.search.aggregations.support.format.ValueFormatter;
import org.elasticsearch.search.aggregations.support.format.ValueFormatterStreams;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import static org.elasticsearch.search.aggregations.reducers.BucketHelpers.GapPolicy;
import static org.elasticsearch.search.aggregations.reducers.BucketHelpers.resolveBucketValue;
public class MovAvgReducer extends Reducer {
public final static Type TYPE = new Type("moving_avg");
public final static ReducerStreams.Stream STREAM = new ReducerStreams.Stream() {
@Override
public MovAvgReducer readResult(StreamInput in) throws IOException {
MovAvgReducer result = new MovAvgReducer();
result.readFrom(in);
return result;
}
};
public static void registerStreams() {
ReducerStreams.registerStream(STREAM, TYPE.stream());
}
private static final Function<Aggregation, InternalAggregation> FUNCTION = new Function<Aggregation, InternalAggregation>() {
@Override
public InternalAggregation apply(Aggregation input) {
return (InternalAggregation) input;
}
};
private ValueFormatter formatter;
private GapPolicy gapPolicy;
private int window;
private MovAvgModel model;
public MovAvgReducer() {
}
public MovAvgReducer(String name, String[] bucketsPaths, @Nullable ValueFormatter formatter, GapPolicy gapPolicy,
int window, MovAvgModel model, Map<String, Object> metadata) {
super(name, bucketsPaths, metadata);
this.formatter = formatter;
this.gapPolicy = gapPolicy;
this.window = window;
this.model = model;
}
@Override
public Type type() {
return TYPE;
}
@Override
public InternalAggregation reduce(InternalAggregation aggregation, ReduceContext reduceContext) {
InternalHistogram histo = (InternalHistogram) aggregation;
List<? extends InternalHistogram.Bucket> buckets = histo.getBuckets();
InternalHistogram.Factory<? extends InternalHistogram.Bucket> factory = histo.getFactory();
List newBuckets = new ArrayList<>();
EvictingQueue<Double> values = EvictingQueue.create(this.window);
for (InternalHistogram.Bucket bucket : buckets) {
Double thisBucketValue = resolveBucketValue(histo, bucket, bucketsPaths()[0], gapPolicy);
if (thisBucketValue != null) {
values.offer(thisBucketValue);
// TODO handle "edge policy"
double movavg = model.next(values);
List<InternalAggregation> aggs = new ArrayList<>(Lists.transform(bucket.getAggregations().asList(), FUNCTION));
aggs.add(new InternalSimpleValue(name(), movavg, formatter, new ArrayList<Reducer>(), metaData()));
InternalHistogram.Bucket newBucket = factory.createBucket(bucket.getKey(), bucket.getDocCount(), new InternalAggregations(
aggs), bucket.getKeyed(), bucket.getFormatter());
newBuckets.add(newBucket);
} else {
newBuckets.add(bucket);
}
}
//return factory.create(histo.getName(), newBuckets, histo);
return factory.create(newBuckets, histo);
}
@Override
public void doReadFrom(StreamInput in) throws IOException {
formatter = ValueFormatterStreams.readOptional(in);
gapPolicy = GapPolicy.readFrom(in);
window = in.readVInt();
model = MovAvgModelStreams.read(in);
}
@Override
public void doWriteTo(StreamOutput out) throws IOException {
ValueFormatterStreams.writeOptional(formatter, out);
gapPolicy.writeTo(out);
out.writeVInt(window);
model.writeTo(out);
}
public static class Factory extends ReducerFactory {
private final ValueFormatter formatter;
private GapPolicy gapPolicy;
private int window;
private MovAvgModel model;
public Factory(String name, String[] bucketsPaths, @Nullable ValueFormatter formatter, GapPolicy gapPolicy,
int window, MovAvgModel model) {
super(name, TYPE.name(), bucketsPaths);
this.formatter = formatter;
this.gapPolicy = gapPolicy;
this.window = window;
this.model = model;
}
@Override
protected Reducer createInternal(AggregationContext context, Aggregator parent, boolean collectsFromSingleBucket,
Map<String, Object> metaData) throws IOException {
return new MovAvgReducer(name, bucketsPaths, formatter, gapPolicy, window, model, metaData);
}
@Override
public void doValidate(AggregatorFactory parent, AggregatorFactory[] aggFactories, List<ReducerFactory> reducerFactories) {
if (bucketsPaths.length != 1) {
throw new ElasticsearchIllegalStateException(Reducer.Parser.BUCKETS_PATH.getPreferredName()
+ " must contain a single entry for reducer [" + name + "]");
}
if (!(parent instanceof HistogramAggregator.Factory)) {
throw new ElasticsearchIllegalStateException("derivative reducer [" + name
+ "] must have a histogram or date_histogram as parent");
} else {
HistogramAggregator.Factory histoParent = (HistogramAggregator.Factory) parent;
if (histoParent.minDocCount() != 0) {
throw new ElasticsearchIllegalStateException("parent histogram of derivative reducer [" + name
+ "] must have min_doc_count of 0");
}
}
}
}
}

View File

@ -0,0 +1,194 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.aggregations.reducers.movavg.models;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.search.aggregations.reducers.movavg.MovAvgParser;
import java.io.IOException;
import java.util.*;
/**
* Calculate a doubly exponential weighted moving average
*/
public class DoubleExpModel extends MovAvgModel {
protected static final ParseField NAME_FIELD = new ParseField("double_exp");
/**
* Controls smoothing of data. Alpha = 1 retains no memory of past values
* (e.g. random walk), while alpha = 0 retains infinite memory of past values (e.g.
* mean of the series). Useful values are somewhere in between
*/
private double alpha;
/**
* Equivalent to <code>alpha</code>, but controls the smoothing of the trend instead of the data
*/
private double beta;
public DoubleExpModel(double alpha, double beta) {
this.alpha = alpha;
this.beta = beta;
}
@Override
public <T extends Number> double next(Collection<T> values) {
return next(values, 1).get(0);
}
/**
* Calculate a doubly exponential weighted moving average
*
* @param values Collection of values to calculate avg for
* @param numForecasts number of forecasts into the future to return
*
* @param <T> Type T extending Number
* @return Returns a Double containing the moving avg for the window
*/
public <T extends Number> List<Double> next(Collection<T> values, int numForecasts) {
// Smoothed value
double s = 0;
double last_s = 0;
// Trend value
double b = 0;
double last_b = 0;
int counter = 0;
//TODO bail if too few values
T last;
for (T v : values) {
last = v;
if (counter == 1) {
s = v.doubleValue();
b = v.doubleValue() - last.doubleValue();
} else {
s = alpha * v.doubleValue() + (1.0d - alpha) * (last_s + last_b);
b = beta * (s - last_s) + (1 - beta) * last_b;
}
counter += 1;
last_s = s;
last_b = b;
}
List<Double> forecastValues = new ArrayList<>(numForecasts);
for (int i = 0; i < numForecasts; i++) {
forecastValues.add(s + (i * b));
}
return forecastValues;
}
public static final MovAvgModelStreams.Stream STREAM = new MovAvgModelStreams.Stream() {
@Override
public MovAvgModel readResult(StreamInput in) throws IOException {
return new DoubleExpModel(in.readDouble(), in.readDouble());
}
@Override
public String getName() {
return NAME_FIELD.getPreferredName();
}
};
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(STREAM.getName());
out.writeDouble(alpha);
out.writeDouble(beta);
}
public static class DoubleExpModelParser implements MovAvgModelParser {
@Override
public String getName() {
return NAME_FIELD.getPreferredName();
}
@Override
public MovAvgModel parse(@Nullable Map<String, Object> settings) {
Double alpha;
Double beta;
if (settings == null || (alpha = (Double)settings.get("alpha")) == null) {
alpha = 0.5;
}
if (settings == null || (beta = (Double)settings.get("beta")) == null) {
beta = 0.5;
}
return new DoubleExpModel(alpha, beta);
}
}
public static class DoubleExpModelBuilder implements MovAvgModelBuilder {
private double alpha = 0.5;
private double beta = 0.5;
/**
* Alpha controls the smoothing of the data. Alpha = 1 retains no memory of past values
* (e.g. a random walk), while alpha = 0 retains infinite memory of past values (e.g.
* the series mean). Useful values are somewhere in between. Defaults to 0.5.
*
* @param alpha A double between 0-1 inclusive, controls data smoothing
*
* @return The builder to continue chaining
*/
public DoubleExpModelBuilder alpha(double alpha) {
this.alpha = alpha;
return this;
}
/**
* Equivalent to <code>alpha</code>, but controls the smoothing of the trend instead of the data
*
* @param beta a double between 0-1 inclusive, controls trend smoothing
*
* @return The builder to continue chaining
*/
public DoubleExpModelBuilder beta(double beta) {
this.beta = beta;
return this;
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.field(MovAvgParser.MODEL.getPreferredName(), NAME_FIELD.getPreferredName());
builder.startObject(MovAvgParser.SETTINGS.getPreferredName());
builder.field("alpha", alpha);
builder.field("beta", beta);
builder.endObject();
return builder;
}
}
}

View File

@ -0,0 +1,93 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.aggregations.reducers.movavg.models;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.search.aggregations.reducers.movavg.MovAvgParser;
import java.io.IOException;
import java.util.Collection;
import java.util.Map;
/**
* Calculate a linearly weighted moving average, such that older values are
* linearly less important. "Time" is determined by position in collection
*/
public class LinearModel extends MovAvgModel {
protected static final ParseField NAME_FIELD = new ParseField("linear");
@Override
public <T extends Number> double next(Collection<T> values) {
double avg = 0;
long totalWeight = 1;
long current = 1;
for (T v : values) {
avg += v.doubleValue() * current;
totalWeight += current;
current += 1;
}
return avg / totalWeight;
}
public static final MovAvgModelStreams.Stream STREAM = new MovAvgModelStreams.Stream() {
@Override
public MovAvgModel readResult(StreamInput in) throws IOException {
return new LinearModel();
}
@Override
public String getName() {
return NAME_FIELD.getPreferredName();
}
};
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(STREAM.getName());
}
public static class LinearModelParser implements MovAvgModelParser {
@Override
public String getName() {
return NAME_FIELD.getPreferredName();
}
@Override
public MovAvgModel parse(@Nullable Map<String, Object> settings) {
return new LinearModel();
}
}
public static class LinearModelBuilder implements MovAvgModelBuilder {
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.field(MovAvgParser.MODEL.getPreferredName(), NAME_FIELD.getPreferredName());
return builder;
}
}
}

View File

@ -0,0 +1,49 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.aggregations.reducers.movavg.models;
import org.elasticsearch.common.io.stream.StreamOutput;
import java.io.IOException;
import java.util.*;
public abstract class MovAvgModel {
/**
* Returns the next value in the series, according to the underlying smoothing model
*
* @param values Collection of numerics to smooth, usually windowed
* @param <T> Type of numeric
* @return Returns a double, since most smoothing methods operate on floating points
*/
public abstract <T extends Number> double next(Collection<T> values);
/**
* Write the model to the output stream
*
* @param out Output stream
* @throws IOException
*/
public abstract void writeTo(StreamOutput out) throws IOException;
}

View File

@ -0,0 +1,33 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.aggregations.reducers.movavg.models;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import java.io.IOException;
/**
* Represents the common interface that all moving average models share. Moving
* average models are used by the MovAvg reducer
*/
public interface MovAvgModelBuilder extends ToXContent {
public abstract XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException;
}

View File

@ -0,0 +1,55 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.aggregations.reducers.movavg.models;
import com.google.common.collect.Lists;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.inject.multibindings.Multibinder;
import java.util.List;
/**
* Register the various model parsers
*/
public class MovAvgModelModule extends AbstractModule {
private List<Class<? extends MovAvgModelParser>> parsers = Lists.newArrayList();
public MovAvgModelModule() {
registerParser(SimpleModel.SimpleModelParser.class);
registerParser(LinearModel.LinearModelParser.class);
registerParser(SingleExpModel.SingleExpModelParser.class);
registerParser(DoubleExpModel.DoubleExpModelParser.class);
}
public void registerParser(Class<? extends MovAvgModelParser> parser) {
parsers.add(parser);
}
@Override
protected void configure() {
Multibinder<MovAvgModelParser> parserMapBinder = Multibinder.newSetBinder(binder(), MovAvgModelParser.class);
for (Class<? extends MovAvgModelParser> clazz : parsers) {
parserMapBinder.addBinding().to(clazz);
}
bind(MovAvgModelParserMapper.class);
}
}

View File

@ -0,0 +1,34 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.aggregations.reducers.movavg.models;
import org.elasticsearch.common.Nullable;
import java.util.Map;
/**
* Common interface for parsers used by the various Moving Average models
*/
public interface MovAvgModelParser {
public MovAvgModel parse(@Nullable Map<String, Object> settings);
public String getName();
}

View File

@ -0,0 +1,54 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.aggregations.reducers.movavg.models;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.common.inject.Inject;
import java.util.Set;
/**
* Contains a map of all concrete model parsers which can be used to build Models
*/
public class MovAvgModelParserMapper {
protected ImmutableMap<String, MovAvgModelParser> movAvgParsers;
@Inject
public MovAvgModelParserMapper(Set<MovAvgModelParser> parsers) {
MapBuilder<String, MovAvgModelParser> builder = MapBuilder.newMapBuilder();
for (MovAvgModelParser parser : parsers) {
builder.put(parser.getName(), parser);
}
movAvgParsers = builder.immutableMap();
}
public @Nullable
MovAvgModelParser get(String parserName) {
return movAvgParsers.get(parserName);
}
public ImmutableSet<String> getAllNames() {
return movAvgParsers.keySet();
}
}

View File

@ -0,0 +1,74 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.aggregations.reducers.movavg.models;
import com.google.common.collect.ImmutableMap;
import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.common.io.stream.StreamInput;
import java.io.IOException;
/**
* A registry for all moving average models. This is needed for reading them from a stream without knowing which
* one it is.
*/
public class MovAvgModelStreams {
private static ImmutableMap<String, Stream> STREAMS = ImmutableMap.of();
public static MovAvgModel read(StreamInput in) throws IOException {
return stream(in.readString()).readResult(in);
}
/**
* A stream that knows how to read an heuristic from the input.
*/
public static interface Stream {
MovAvgModel readResult(StreamInput in) throws IOException;
String getName();
}
/**
* Registers the given stream and associate it with the given types.
*
* @param stream The stream to register
* @param names The names associated with the streams
*/
public static synchronized void registerStream(Stream stream, String... names) {
MapBuilder<String, Stream> uStreams = MapBuilder.newMapBuilder(STREAMS);
for (String name : names) {
uStreams.put(name, stream);
}
STREAMS = uStreams.immutableMap();
}
/**
* Returns the stream that is registered for the given name
*
* @param name The given name
* @return The associated stream
*/
public static Stream stream(String name) {
return STREAMS.get(name);
}
}

View File

@ -0,0 +1,86 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.aggregations.reducers.movavg.models;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.search.aggregations.reducers.movavg.MovAvgParser;
import java.io.IOException;
import java.util.Collection;
import java.util.Map;
/**
* Calculate a simple unweighted (arithmetic) moving average
*/
public class SimpleModel extends MovAvgModel {
protected static final ParseField NAME_FIELD = new ParseField("simple");
@Override
public <T extends Number> double next(Collection<T> values) {
double avg = 0;
for (T v : values) {
avg += v.doubleValue();
}
return avg / values.size();
}
public static final MovAvgModelStreams.Stream STREAM = new MovAvgModelStreams.Stream() {
@Override
public MovAvgModel readResult(StreamInput in) throws IOException {
return new SimpleModel();
}
@Override
public String getName() {
return NAME_FIELD.getPreferredName();
}
};
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(STREAM.getName());
}
public static class SimpleModelParser implements MovAvgModelParser {
@Override
public String getName() {
return NAME_FIELD.getPreferredName();
}
@Override
public MovAvgModel parse(@Nullable Map<String, Object> settings) {
return new SimpleModel();
}
}
public static class SimpleModelBuilder implements MovAvgModelBuilder {
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.field(MovAvgParser.MODEL.getPreferredName(), NAME_FIELD.getPreferredName());
return builder;
}
}
}

View File

@ -0,0 +1,133 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.aggregations.reducers.movavg.models;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.search.aggregations.reducers.movavg.MovAvgParser;
import java.io.IOException;
import java.util.Collection;
import java.util.Map;
/**
* Calculate a exponentially weighted moving average
*/
public class SingleExpModel extends MovAvgModel {
protected static final ParseField NAME_FIELD = new ParseField("single_exp");
/**
* Controls smoothing of data. Alpha = 1 retains no memory of past values
* (e.g. random walk), while alpha = 0 retains infinite memory of past values (e.g.
* mean of the series). Useful values are somewhere in between
*/
private double alpha;
public SingleExpModel(double alpha) {
this.alpha = alpha;
}
@Override
public <T extends Number> double next(Collection<T> values) {
double avg = 0;
boolean first = true;
for (T v : values) {
if (first) {
avg = v.doubleValue();
first = false;
} else {
avg = (v.doubleValue() * alpha) + (avg * (1 - alpha));
}
}
return avg;
}
public static final MovAvgModelStreams.Stream STREAM = new MovAvgModelStreams.Stream() {
@Override
public MovAvgModel readResult(StreamInput in) throws IOException {
return new SingleExpModel(in.readDouble());
}
@Override
public String getName() {
return NAME_FIELD.getPreferredName();
}
};
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(STREAM.getName());
out.writeDouble(alpha);
}
public static class SingleExpModelParser implements MovAvgModelParser {
@Override
public String getName() {
return NAME_FIELD.getPreferredName();
}
@Override
public MovAvgModel parse(@Nullable Map<String, Object> settings) {
Double alpha;
if (settings == null || (alpha = (Double)settings.get("alpha")) == null) {
alpha = 0.5;
}
return new SingleExpModel(alpha);
}
}
public static class SingleExpModelBuilder implements MovAvgModelBuilder {
private double alpha = 0.5;
/**
* Alpha controls the smoothing of the data. Alpha = 1 retains no memory of past values
* (e.g. a random walk), while alpha = 0 retains infinite memory of past values (e.g.
* the series mean). Useful values are somewhere in between. Defaults to 0.5.
*
* @param alpha A double between 0-1 inclusive, controls data smoothing
*
* @return The builder to continue chaining
*/
public SingleExpModelBuilder alpha(double alpha) {
this.alpha = alpha;
return this;
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.field(MovAvgParser.MODEL.getPreferredName(), NAME_FIELD.getPreferredName());
builder.startObject(MovAvgParser.SETTINGS.getPreferredName());
builder.field("alpha", alpha);
builder.endObject();
return builder;
}
}
}

View File

@ -0,0 +1,51 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.aggregations.reducers.movavg.models;
import com.google.common.collect.Lists;
import org.elasticsearch.common.inject.AbstractModule;
import java.util.List;
/**
* Register the transport streams so that models can be serialized/deserialized from the stream
*/
public class TransportMovAvgModelModule extends AbstractModule {
private List<MovAvgModelStreams.Stream> streams = Lists.newArrayList();
public TransportMovAvgModelModule() {
registerStream(SimpleModel.STREAM);
registerStream(LinearModel.STREAM);
registerStream(SingleExpModel.STREAM);
registerStream(DoubleExpModel.STREAM);
}
public void registerStream(MovAvgModelStreams.Stream stream) {
streams.add(stream);
}
@Override
protected void configure() {
for (MovAvgModelStreams.Stream stream : streams) {
MovAvgModelStreams.registerStream(stream, stream.getName());
}
}
}

View File

@ -0,0 +1,500 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.aggregations.reducers;
import com.google.common.collect.EvictingQueue;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.search.aggregations.bucket.histogram.Histogram;
import org.elasticsearch.search.aggregations.bucket.histogram.InternalHistogram;
import org.elasticsearch.search.aggregations.bucket.histogram.InternalHistogram.Bucket;
import org.elasticsearch.search.aggregations.reducers.movavg.models.DoubleExpModel;
import org.elasticsearch.search.aggregations.reducers.movavg.models.LinearModel;
import org.elasticsearch.search.aggregations.reducers.movavg.models.SimpleModel;
import org.elasticsearch.search.aggregations.reducers.movavg.models.SingleExpModel;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.junit.Test;
import java.util.ArrayList;
import java.util.List;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.search.aggregations.AggregationBuilders.*;
import static org.elasticsearch.search.aggregations.reducers.ReducerBuilders.smooth;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertSearchResponse;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.core.IsNull.notNullValue;
@ElasticsearchIntegrationTest.SuiteScopeTest
public class MovAvgTests extends ElasticsearchIntegrationTest {
private static final String SINGLE_VALUED_FIELD_NAME = "l_value";
private static final String SINGLE_VALUED_VALUE_FIELD_NAME = "v_value";
static int interval;
static int numValueBuckets;
static int numFilledValueBuckets;
static int windowSize;
static BucketHelpers.GapPolicy gapPolicy;
static long[] docCounts;
static long[] valueCounts;
static Double[] simpleMovAvgCounts;
static Double[] linearMovAvgCounts;
static Double[] singleExpMovAvgCounts;
static Double[] doubleExpMovAvgCounts;
static Double[] simpleMovAvgValueCounts;
static Double[] linearMovAvgValueCounts;
static Double[] singleExpMovAvgValueCounts;
static Double[] doubleExpMovAvgValueCounts;
@Override
public void setupSuiteScopeCluster() throws Exception {
createIndex("idx");
createIndex("idx_unmapped");
interval = 5;
numValueBuckets = randomIntBetween(6, 80);
numFilledValueBuckets = numValueBuckets;
windowSize = randomIntBetween(3,10);
gapPolicy = randomBoolean() ? BucketHelpers.GapPolicy.IGNORE : BucketHelpers.GapPolicy.INSERT_ZEROS;
docCounts = new long[numValueBuckets];
valueCounts = new long[numValueBuckets];
for (int i = 0; i < numValueBuckets; i++) {
docCounts[i] = randomIntBetween(0, 20);
valueCounts[i] = randomIntBetween(1,20); //this will be used as a constant for all values within a bucket
}
this.setupSimple();
this.setupLinear();
this.setupSingle();
this.setupDouble();
List<IndexRequestBuilder> builders = new ArrayList<>();
for (int i = 0; i < numValueBuckets; i++) {
for (int docs = 0; docs < docCounts[i]; docs++) {
builders.add(client().prepareIndex("idx", "type").setSource(jsonBuilder().startObject()
.field(SINGLE_VALUED_FIELD_NAME, i * interval)
.field(SINGLE_VALUED_VALUE_FIELD_NAME, 1).endObject()));
}
}
indexRandom(true, builders);
ensureSearchable();
}
private void setupSimple() {
simpleMovAvgCounts = new Double[numValueBuckets];
EvictingQueue<Double> window = EvictingQueue.create(windowSize);
for (int i = 0; i < numValueBuckets; i++) {
double thisValue = docCounts[i];
window.offer(thisValue);
double movAvg = 0;
for (double value : window) {
movAvg += value;
}
movAvg /= window.size();
simpleMovAvgCounts[i] = movAvg;
}
window.clear();
simpleMovAvgValueCounts = new Double[numValueBuckets];
for (int i = 0; i < numValueBuckets; i++) {
window.offer((double)docCounts[i]);
double movAvg = 0;
for (double value : window) {
movAvg += value;
}
movAvg /= window.size();
simpleMovAvgValueCounts[i] = movAvg;
}
}
private void setupLinear() {
EvictingQueue<Double> window = EvictingQueue.create(windowSize);
linearMovAvgCounts = new Double[numValueBuckets];
window.clear();
for (int i = 0; i < numValueBuckets; i++) {
double thisValue = docCounts[i];
if (thisValue == -1) {
thisValue = 0;
}
window.offer(thisValue);
double avg = 0;
long totalWeight = 1;
long current = 1;
for (double value : window) {
avg += value * current;
totalWeight += current;
current += 1;
}
linearMovAvgCounts[i] = avg / totalWeight;
}
window.clear();
linearMovAvgValueCounts = new Double[numValueBuckets];
for (int i = 0; i < numValueBuckets; i++) {
double thisValue = docCounts[i];
window.offer(thisValue);
double avg = 0;
long totalWeight = 1;
long current = 1;
for (double value : window) {
avg += value * current;
totalWeight += current;
current += 1;
}
linearMovAvgValueCounts[i] = avg / totalWeight;
}
}
private void setupSingle() {
EvictingQueue<Double> window = EvictingQueue.create(windowSize);
singleExpMovAvgCounts = new Double[numValueBuckets];
for (int i = 0; i < numValueBuckets; i++) {
double thisValue = docCounts[i];
if (thisValue == -1) {
thisValue = 0;
}
window.offer(thisValue);
double avg = 0;
double alpha = 0.5;
boolean first = true;
for (double value : window) {
if (first) {
avg = value;
first = false;
} else {
avg = (value * alpha) + (avg * (1 - alpha));
}
}
singleExpMovAvgCounts[i] = avg ;
}
singleExpMovAvgValueCounts = new Double[numValueBuckets];
window.clear();
for (int i = 0; i < numValueBuckets; i++) {
window.offer((double)docCounts[i]);
double avg = 0;
double alpha = 0.5;
boolean first = true;
for (double value : window) {
if (first) {
avg = value;
first = false;
} else {
avg = (value * alpha) + (avg * (1 - alpha));
}
}
singleExpMovAvgCounts[i] = avg ;
}
}
private void setupDouble() {
EvictingQueue<Double> window = EvictingQueue.create(windowSize);
doubleExpMovAvgCounts = new Double[numValueBuckets];
for (int i = 0; i < numValueBuckets; i++) {
double thisValue = docCounts[i];
if (thisValue == -1) {
thisValue = 0;
}
window.offer(thisValue);
double s = 0;
double last_s = 0;
// Trend value
double b = 0;
double last_b = 0;
double alpha = 0.5;
double beta = 0.5;
int counter = 0;
double last;
for (double value : window) {
last = value;
if (counter == 1) {
s = value;
b = value - last;
} else {
s = alpha * value + (1.0d - alpha) * (last_s + last_b);
b = beta * (s - last_s) + (1 - beta) * last_b;
}
counter += 1;
last_s = s;
last_b = b;
}
doubleExpMovAvgCounts[i] = s + (0 * b) ;
}
doubleExpMovAvgValueCounts = new Double[numValueBuckets];
window.clear();
for (int i = 0; i < numValueBuckets; i++) {
window.offer((double)docCounts[i]);
double s = 0;
double last_s = 0;
// Trend value
double b = 0;
double last_b = 0;
double alpha = 0.5;
double beta = 0.5;
int counter = 0;
double last;
for (double value : window) {
last = value;
if (counter == 1) {
s = value;
b = value - last;
} else {
s = alpha * value + (1.0d - alpha) * (last_s + last_b);
b = beta * (s - last_s) + (1 - beta) * last_b;
}
counter += 1;
last_s = s;
last_b = b;
}
doubleExpMovAvgValueCounts[i] = s + (0 * b) ;
}
}
/**
* test simple moving average on single value field
*/
@Test
public void simpleSingleValuedField() {
SearchResponse response = client()
.prepareSearch("idx")
.addAggregation(
histogram("histo").field(SINGLE_VALUED_FIELD_NAME).interval(interval).minDocCount(0)
.extendedBounds(0L, (long) (interval * (numValueBuckets - 1)))
.subAggregation(sum("the_sum").field(SINGLE_VALUED_VALUE_FIELD_NAME))
.subAggregation(smooth("smooth")
.window(windowSize)
.modelBuilder(new SimpleModel.SimpleModelBuilder())
.gapPolicy(gapPolicy)
.setBucketsPaths("_count"))
.subAggregation(smooth("movavg_values")
.window(windowSize)
.modelBuilder(new SimpleModel.SimpleModelBuilder())
.gapPolicy(gapPolicy)
.setBucketsPaths("the_sum"))
).execute().actionGet();
assertSearchResponse(response);
InternalHistogram<Bucket> histo = response.getAggregations().get("histo");
assertThat(histo, notNullValue());
assertThat(histo.getName(), equalTo("histo"));
List<? extends Bucket> buckets = histo.getBuckets();
assertThat(buckets.size(), equalTo(numValueBuckets));
for (int i = 0; i < numValueBuckets; ++i) {
Histogram.Bucket bucket = buckets.get(i);
checkBucketKeyAndDocCount("Bucket " + i, bucket, i * interval, docCounts[i]);
SimpleValue docCountMovAvg = bucket.getAggregations().get("smooth");
assertThat(docCountMovAvg, notNullValue());
assertThat(docCountMovAvg.value(), equalTo(simpleMovAvgCounts[i]));
SimpleValue valuesMovAvg = bucket.getAggregations().get("movavg_values");
assertThat(valuesMovAvg, notNullValue());
assertThat(valuesMovAvg.value(), equalTo(simpleMovAvgCounts[i]));
}
}
/**
* test linear moving average on single value field
*/
@Test
public void linearSingleValuedField() {
SearchResponse response = client()
.prepareSearch("idx")
.addAggregation(
histogram("histo").field(SINGLE_VALUED_FIELD_NAME).interval(interval).minDocCount(0)
.extendedBounds(0L, (long) (interval * (numValueBuckets - 1)))
.subAggregation(sum("the_sum").field(SINGLE_VALUED_VALUE_FIELD_NAME))
.subAggregation(smooth("smooth")
.window(windowSize)
.modelBuilder(new LinearModel.LinearModelBuilder())
.gapPolicy(gapPolicy)
.setBucketsPaths("_count"))
.subAggregation(smooth("movavg_values")
.window(windowSize)
.modelBuilder(new LinearModel.LinearModelBuilder())
.gapPolicy(gapPolicy)
.setBucketsPaths("the_sum"))
).execute().actionGet();
assertSearchResponse(response);
InternalHistogram<Bucket> histo = response.getAggregations().get("histo");
assertThat(histo, notNullValue());
assertThat(histo.getName(), equalTo("histo"));
List<? extends Bucket> buckets = histo.getBuckets();
assertThat(buckets.size(), equalTo(numValueBuckets));
for (int i = 0; i < numValueBuckets; ++i) {
Histogram.Bucket bucket = buckets.get(i);
checkBucketKeyAndDocCount("Bucket " + i, bucket, i * interval, docCounts[i]);
SimpleValue docCountMovAvg = bucket.getAggregations().get("smooth");
assertThat(docCountMovAvg, notNullValue());
assertThat(docCountMovAvg.value(), equalTo(linearMovAvgCounts[i]));
SimpleValue valuesMovAvg = bucket.getAggregations().get("movavg_values");
assertThat(valuesMovAvg, notNullValue());
assertThat(valuesMovAvg.value(), equalTo(linearMovAvgCounts[i]));
}
}
/**
* test single exponential moving average on single value field
*/
@Test
public void singleExpSingleValuedField() {
SearchResponse response = client()
.prepareSearch("idx")
.addAggregation(
histogram("histo").field(SINGLE_VALUED_FIELD_NAME).interval(interval).minDocCount(0)
.extendedBounds(0L, (long) (interval * (numValueBuckets - 1)))
.subAggregation(sum("the_sum").field(SINGLE_VALUED_VALUE_FIELD_NAME))
.subAggregation(smooth("smooth")
.window(windowSize)
.modelBuilder(new SingleExpModel.SingleExpModelBuilder().alpha(0.5))
.gapPolicy(gapPolicy)
.setBucketsPaths("_count"))
.subAggregation(smooth("movavg_values")
.window(windowSize)
.modelBuilder(new SingleExpModel.SingleExpModelBuilder().alpha(0.5))
.gapPolicy(gapPolicy)
.setBucketsPaths("the_sum"))
).execute().actionGet();
assertSearchResponse(response);
InternalHistogram<Bucket> histo = response.getAggregations().get("histo");
assertThat(histo, notNullValue());
assertThat(histo.getName(), equalTo("histo"));
List<? extends Bucket> buckets = histo.getBuckets();
assertThat(buckets.size(), equalTo(numValueBuckets));
for (int i = 0; i < numValueBuckets; ++i) {
Histogram.Bucket bucket = buckets.get(i);
checkBucketKeyAndDocCount("Bucket " + i, bucket, i * interval, docCounts[i]);
SimpleValue docCountMovAvg = bucket.getAggregations().get("smooth");
assertThat(docCountMovAvg, notNullValue());
assertThat(docCountMovAvg.value(), equalTo(singleExpMovAvgCounts[i]));
SimpleValue valuesMovAvg = bucket.getAggregations().get("movavg_values");
assertThat(valuesMovAvg, notNullValue());
assertThat(valuesMovAvg.value(), equalTo(singleExpMovAvgCounts[i]));
}
}
/**
* test double exponential moving average on single value field
*/
@Test
public void doubleExpSingleValuedField() {
SearchResponse response = client()
.prepareSearch("idx")
.addAggregation(
histogram("histo").field(SINGLE_VALUED_FIELD_NAME).interval(interval).minDocCount(0)
.extendedBounds(0L, (long) (interval * (numValueBuckets - 1)))
.subAggregation(sum("the_sum").field(SINGLE_VALUED_VALUE_FIELD_NAME))
.subAggregation(smooth("smooth")
.window(windowSize)
.modelBuilder(new DoubleExpModel.DoubleExpModelBuilder().alpha(0.5).beta(0.5))
.gapPolicy(gapPolicy)
.setBucketsPaths("_count"))
.subAggregation(smooth("movavg_values")
.window(windowSize)
.modelBuilder(new DoubleExpModel.DoubleExpModelBuilder().alpha(0.5).beta(0.5))
.gapPolicy(gapPolicy)
.setBucketsPaths("the_sum"))
).execute().actionGet();
assertSearchResponse(response);
InternalHistogram<Bucket> histo = response.getAggregations().get("histo");
assertThat(histo, notNullValue());
assertThat(histo.getName(), equalTo("histo"));
List<? extends Bucket> buckets = histo.getBuckets();
assertThat(buckets.size(), equalTo(numValueBuckets));
for (int i = 0; i < numValueBuckets; ++i) {
Histogram.Bucket bucket = buckets.get(i);
checkBucketKeyAndDocCount("Bucket " + i, bucket, i * interval, docCounts[i]);
SimpleValue docCountMovAvg = bucket.getAggregations().get("smooth");
assertThat(docCountMovAvg, notNullValue());
assertThat(docCountMovAvg.value(), equalTo(doubleExpMovAvgCounts[i]));
SimpleValue valuesMovAvg = bucket.getAggregations().get("movavg_values");
assertThat(valuesMovAvg, notNullValue());
assertThat(valuesMovAvg.value(), equalTo(doubleExpMovAvgCounts[i]));
}
}
private void checkBucketKeyAndDocCount(final String msg, final Histogram.Bucket bucket, final long expectedKey,
long expectedDocCount) {
if (expectedDocCount == -1) {
expectedDocCount = 0;
}
assertThat(msg, bucket, notNullValue());
assertThat(msg + " key", ((Number) bucket.getKey()).longValue(), equalTo(expectedKey));
assertThat(msg + " docCount", bucket.getDocCount(), equalTo(expectedDocCount));
}
}