Add a MovingFunction pipeline aggregation, deprecate MovingAvg agg (#29594)

This pipeline aggregation gives the user the ability to script functions that "move" across a window
of data, instead of single data points. It is the scripted version of the MovingAvg pipeline agg.

Through custom script contexts, we expose a number of convenience methods:

 - MovingFunctions.max()
 - MovingFunctions.min()
 - MovingFunctions.sum()
 - MovingFunctions.unweightedAvg()
 - MovingFunctions.linearWeightedAvg()
 - MovingFunctions.ewma()
 - MovingFunctions.holt()
 - MovingFunctions.holtWinters()
 - MovingFunctions.stdDev()

The user can also define any arbitrary logic via their own scripting, or combine with the above methods.
Zachary Tong 2018-05-16 10:57:00 -04:00 committed by GitHub
parent fa43aacd06
commit df853c49c0
31 changed files with 2876 additions and 177 deletions

View File

@@ -72,6 +72,7 @@ POST /_search
}
--------------------------------------------------
// CONSOLE
// TEST[warning:The moving_avg aggregation has been deprecated in favor of the moving_fn aggregation.]
<1> The metric is called `"the_sum"`
<2> The `buckets_path` refers to the metric via a relative path `"the_sum"`
@@ -136,6 +137,7 @@ POST /_search
}
--------------------------------------------------
// CONSOLE
// TEST[warning:The moving_avg aggregation has been deprecated in favor of the moving_fn aggregation.]
<1> By using `_count` instead of a metric name, we can calculate the moving average of document counts in the histogram
The `buckets_path` can also use `"_bucket_count"` and path to a multi-bucket aggregation to use the number of buckets
@@ -231,6 +233,7 @@ include::pipeline/stats-bucket-aggregation.asciidoc[]
include::pipeline/extended-stats-bucket-aggregation.asciidoc[]
include::pipeline/percentiles-bucket-aggregation.asciidoc[]
include::pipeline/movavg-aggregation.asciidoc[]
include::pipeline/movfn-aggregation.asciidoc[]
include::pipeline/cumulative-sum-aggregation.asciidoc[]
include::pipeline/bucket-script-aggregation.asciidoc[]
include::pipeline/bucket-selector-aggregation.asciidoc[]

View File

@@ -1,6 +1,10 @@
[[search-aggregations-pipeline-movavg-aggregation]]
=== Moving Average Aggregation
deprecated[6.4.0, The Moving Average aggregation has been deprecated in favor of the more general
<<search-aggregations-pipeline-movfn-aggregation,Moving Function Aggregation>>. The new Moving Function aggregation provides
all the same functionality as the Moving Average aggregation, but also provides more flexibility.]
Given an ordered series of data, the Moving Average aggregation will slide a window across the data and emit the average
value of that window. For example, given the data `[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]`, we can calculate a simple moving
average with a window size of `5` as follows:
@@ -74,6 +78,7 @@ POST /_search
--------------------------------------------------
// CONSOLE
// TEST[setup:sales]
// TEST[warning:The moving_avg aggregation has been deprecated in favor of the moving_fn aggregation.]
<1> A `date_histogram` named "my_date_histo" is constructed on the "timestamp" field, with one-day intervals
<2> A `sum` metric is used to calculate the sum of a field. This could be any metric (sum, min, max, etc)
@@ -180,6 +185,7 @@ POST /_search
--------------------------------------------------
// CONSOLE
// TEST[setup:sales]
// TEST[warning:The moving_avg aggregation has been deprecated in favor of the moving_fn aggregation.]
A `simple` model has no special settings to configure
@@ -233,6 +239,7 @@ POST /_search
--------------------------------------------------
// CONSOLE
// TEST[setup:sales]
// TEST[warning:The moving_avg aggregation has been deprecated in favor of the moving_fn aggregation.]
A `linear` model has no special settings to configure
@@ -295,7 +302,7 @@ POST /_search
--------------------------------------------------
// CONSOLE
// TEST[setup:sales]
// TEST[warning:The moving_avg aggregation has been deprecated in favor of the moving_fn aggregation.]
[[single_0.2alpha]]
.EWMA with window of size 10, alpha = 0.2
@@ -355,6 +362,7 @@ POST /_search
--------------------------------------------------
// CONSOLE
// TEST[setup:sales]
// TEST[warning:The moving_avg aggregation has been deprecated in favor of the moving_fn aggregation.]
In practice, the `alpha` value behaves very similarly in `holt` as `ewma`: small values produce more smoothing
and more lag, while larger values produce closer tracking and less lag. The value of `beta` is often difficult
@@ -446,7 +454,7 @@ POST /_search
--------------------------------------------------
// CONSOLE
// TEST[setup:sales]
// TEST[warning:The moving_avg aggregation has been deprecated in favor of the moving_fn aggregation.]
[[holt_winters_add]]
.Holt-Winters moving average with window of size 120, alpha = 0.5, beta = 0.7, gamma = 0.3, period = 30
@@ -508,6 +516,7 @@ POST /_search
--------------------------------------------------
// CONSOLE
// TEST[setup:sales]
// TEST[warning:The moving_avg aggregation has been deprecated in favor of the moving_fn aggregation.]
==== Prediction
@@ -550,6 +559,7 @@ POST /_search
--------------------------------------------------
// CONSOLE
// TEST[setup:sales]
// TEST[warning:The moving_avg aggregation has been deprecated in favor of the moving_fn aggregation.]
The `simple`, `linear` and `ewma` models all produce "flat" predictions: they essentially converge on the mean
of the last value in the series, producing a flat:
@@ -631,6 +641,7 @@ POST /_search
--------------------------------------------------
// CONSOLE
// TEST[setup:sales]
// TEST[warning:The moving_avg aggregation has been deprecated in favor of the moving_fn aggregation.]
<1> Minimization is enabled with the `minimize` parameter

View File

@@ -0,0 +1,633 @@
[[search-aggregations-pipeline-movfn-aggregation]]
=== Moving Function Aggregation
Given an ordered series of data, the Moving Function aggregation will slide a window across the data and allow the user to specify a custom
script that is executed on each window of data. For convenience, a number of common functions are predefined such as min/max, moving averages,
etc.
This is conceptually very similar to the <<search-aggregations-pipeline-movavg-aggregation, Moving Average>> pipeline aggregation, except
it provides more functionality.
==== Syntax
A `moving_fn` aggregation looks like this in isolation:
[source,js]
--------------------------------------------------
{
"moving_fn": {
"buckets_path": "the_sum",
"window": 10,
"script": "MovingFunctions.min(values)"
}
}
--------------------------------------------------
// NOTCONSOLE
.`moving_fn` Parameters
|===
|Parameter Name |Description |Required |Default Value
|`buckets_path` |Path to the metric of interest (see <<buckets-path-syntax, `buckets_path` Syntax>> for more details) |Required |
|`window` |The size of window to "slide" across the histogram. |Required |
|`script` |The script that should be executed on each window of data |Required |
|===
`moving_fn` aggregations must be embedded inside of a `histogram` or `date_histogram` aggregation. They can be
embedded like any other metric aggregation:
[source,js]
--------------------------------------------------
POST /_search
{
"size": 0,
"aggs": {
"my_date_histo":{ <1>
"date_histogram":{
"field":"date",
"interval":"1M"
},
"aggs":{
"the_sum":{
"sum":{ "field": "price" } <2>
},
"the_movfn": {
"moving_fn": {
"buckets_path": "the_sum", <3>
"window": 10,
"script": "MovingFunctions.unweightedAvg(values)"
}
}
}
}
}
}
--------------------------------------------------
// CONSOLE
// TEST[setup:sales]
<1> A `date_histogram` named "my_date_histo" is constructed on the "date" field, with one-month intervals
<2> A `sum` metric is used to calculate the sum of a field. This could be any numeric metric (sum, min, max, etc)
<3> Finally, we specify a `moving_fn` aggregation which uses "the_sum" metric as its input.
Moving averages are built by first specifying a `histogram` or `date_histogram` over a field. You can then optionally
add numeric metrics, such as a `sum`, inside of that histogram. Finally, the `moving_fn` is embedded inside the histogram.
The `buckets_path` parameter is then used to "point" at one of the sibling metrics inside of the histogram (see
<<buckets-path-syntax>> for a description of the syntax for `buckets_path`).
An example response from the above aggregation may look like:
[source,js]
--------------------------------------------------
{
"took": 11,
"timed_out": false,
"_shards": ...,
"hits": ...,
"aggregations": {
"my_date_histo": {
"buckets": [
{
"key_as_string": "2015/01/01 00:00:00",
"key": 1420070400000,
"doc_count": 3,
"the_sum": {
"value": 550.0
},
"the_movfn": {
"value": null
}
},
{
"key_as_string": "2015/02/01 00:00:00",
"key": 1422748800000,
"doc_count": 2,
"the_sum": {
"value": 60.0
},
"the_movfn": {
"value": 550.0
}
},
{
"key_as_string": "2015/03/01 00:00:00",
"key": 1425168000000,
"doc_count": 2,
"the_sum": {
"value": 375.0
},
"the_movfn": {
"value": 305.0
}
}
]
}
}
}
--------------------------------------------------
// TESTRESPONSE[s/"took": 11/"took": $body.took/]
// TESTRESPONSE[s/"_shards": \.\.\./"_shards": $body._shards/]
// TESTRESPONSE[s/"hits": \.\.\./"hits": $body.hits/]
==== Custom user scripting
The Moving Function aggregation allows the user to specify any arbitrary script to define custom logic. The script is invoked each time a
new window of data is collected. These values are provided to the script in the `values` variable. The script should then perform some
kind of calculation and emit a single `double` as the result. Emitting `null` is not permitted, although `NaN` and +/- `Inf` are allowed.
For example, this script will simply return the first value from the window, or `NaN` if no values are available:
[source,js]
--------------------------------------------------
POST /_search
{
"size": 0,
"aggs": {
"my_date_histo":{
"date_histogram":{
"field":"date",
"interval":"1M"
},
"aggs":{
"the_sum":{
"sum":{ "field": "price" }
},
"the_movavg": {
"moving_fn": {
"buckets_path": "the_sum",
"window": 10,
"script": "return values.length > 0 ? values[0] : Double.NaN"
}
}
}
}
}
}
--------------------------------------------------
// CONSOLE
// TEST[setup:sales]
==== Pre-built Functions
For convenience, a number of functions have been prebuilt and are available inside the `moving_fn` script context:
- `max()`
- `min()`
- `sum()`
- `stdDev()`
- `unweightedAvg()`
- `linearWeightedAvg()`
- `ewma()`
- `holt()`
- `holtWinters()`
The functions are available from the `MovingFunctions` namespace. E.g. `MovingFunctions.max()`
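Because the script body is ordinary Painless, the prebuilt functions can also be combined with custom logic. As a purely
illustrative sketch (the threshold and fallback rule here are hypothetical, not part of the API), a script could emit the
window's maximum only when it spikes well above the average:

[source,painless]
--------------------------------------------------
// illustrative only: fall back to the unweighted average unless the max spikes
double avg = MovingFunctions.unweightedAvg(values);
double max = MovingFunctions.max(values);
// emit the max when it exceeds twice the average, otherwise emit the average
return max > 2 * avg ? max : avg;
--------------------------------------------------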
===== max Function
This function accepts a collection of doubles and returns the maximum value in that window. `null` and `NaN` values are ignored; the maximum
is only calculated over the real values. If the window is empty, or all values are `null`/`NaN`, `NaN` is returned as the result.
.`max(double[] values)` Parameters
|===
|Parameter Name |Description
|`values` |The window of values to find the maximum
|===
[source,js]
--------------------------------------------------
POST /_search
{
"size": 0,
"aggs": {
"my_date_histo":{
"date_histogram":{
"field":"date",
"interval":"1M"
},
"aggs":{
"the_sum":{
"sum":{ "field": "price" }
},
"the_moving_max": {
"moving_fn": {
"buckets_path": "the_sum",
"window": 10,
"script": "MovingFunctions.max(values)"
}
}
}
}
}
}
--------------------------------------------------
// CONSOLE
// TEST[setup:sales]
===== min Function
This function accepts a collection of doubles and returns the minimum value in that window. `null` and `NaN` values are ignored; the minimum
is only calculated over the real values. If the window is empty, or all values are `null`/`NaN`, `NaN` is returned as the result.
.`min(double[] values)` Parameters
|===
|Parameter Name |Description
|`values` |The window of values to find the minimum
|===
[source,js]
--------------------------------------------------
POST /_search
{
"size": 0,
"aggs": {
"my_date_histo":{
"date_histogram":{
"field":"date",
"interval":"1M"
},
"aggs":{
"the_sum":{
"sum":{ "field": "price" }
},
"the_moving_min": {
"moving_fn": {
"buckets_path": "the_sum",
"window": 10,
"script": "MovingFunctions.min(values)"
}
}
}
}
}
}
--------------------------------------------------
// CONSOLE
// TEST[setup:sales]
===== sum Function
This function accepts a collection of doubles and returns the sum of the values in that window. `null` and `NaN` values are ignored;
the sum is only calculated over the real values. If the window is empty, or all values are `null`/`NaN`, `0.0` is returned as the result.
.`sum(double[] values)` Parameters
|===
|Parameter Name |Description
|`values` |The window of values to find the sum of
|===
[source,js]
--------------------------------------------------
POST /_search
{
"size": 0,
"aggs": {
"my_date_histo":{
"date_histogram":{
"field":"date",
"interval":"1M"
},
"aggs":{
"the_sum":{
"sum":{ "field": "price" }
},
"the_moving_sum": {
"moving_fn": {
"buckets_path": "the_sum",
"window": 10,
"script": "MovingFunctions.sum(values)"
}
}
}
}
}
}
--------------------------------------------------
// CONSOLE
// TEST[setup:sales]
===== stdDev Function
This function accepts a collection of doubles and an average, then returns the standard deviation of the values in that window.
`null` and `NaN` values are ignored; the standard deviation is only calculated over the real values. If the window is empty, or all values are
`null`/`NaN`, `0.0` is returned as the result.
.`stdDev(double[] values, double avg)` Parameters
|===
|Parameter Name |Description
|`values` |The window of values to find the standard deviation of
|`avg` |The average of the window
|===
[source,js]
--------------------------------------------------
POST /_search
{
"size": 0,
"aggs": {
"my_date_histo":{
"date_histogram":{
"field":"date",
"interval":"1M"
},
"aggs":{
"the_sum":{
"sum":{ "field": "price" }
},
"the_moving_sum": {
"moving_fn": {
"buckets_path": "the_sum",
"window": 10,
"script": "MovingFunctions.stdDev(values, MovingFunctions.unweightedAvg(values))"
}
}
}
}
}
}
--------------------------------------------------
// CONSOLE
// TEST[setup:sales]
The `avg` parameter must be provided to the standard deviation function because different styles of averages can be computed on the window
(simple, linearly weighted, etc). The various moving averages that are detailed below can be used to calculate the average for the
standard deviation function.
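For example, to measure deviation around an exponentially weighted average instead of the simple mean, the functions can
simply be composed (the `0.3` alpha here is only an illustrative value):

[source,painless]
--------------------------------------------------
// standard deviation measured around an EWMA rather than the simple mean
MovingFunctions.stdDev(values, MovingFunctions.ewma(values, 0.3))
--------------------------------------------------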
===== unweightedAvg Function
The `unweightedAvg` function calculates the sum of all values in the window, then divides by the size of the window. It is effectively
a simple arithmetic mean of the window. The simple moving average does not perform any time-dependent weighting, which means
the values from a `simple` moving average tend to "lag" behind the real data.
`null` and `NaN` values are ignored; the average is only calculated over the real values. If the window is empty, or all values are
`null`/`NaN`, `NaN` is returned as the result. This means that the count used in the average calculation is the count of
non-`null`, non-`NaN` values.
.`unweightedAvg(double[] values)` Parameters
|===
|Parameter Name |Description
|`values` |The window of values to average
|===
[source,js]
--------------------------------------------------
POST /_search
{
"size": 0,
"aggs": {
"my_date_histo":{
"date_histogram":{
"field":"date",
"interval":"1M"
},
"aggs":{
"the_sum":{
"sum":{ "field": "price" }
},
"the_movavg": {
"moving_fn": {
"buckets_path": "the_sum",
"window": 10,
"script": "MovingFunctions.unweightedAvg(values)"
}
}
}
}
}
}
--------------------------------------------------
// CONSOLE
// TEST[setup:sales]
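For reference, the semantics described above (ignore `null`/`NaN`, divide by the count of real values, and return `NaN`
for an empty window) could be approximated with a custom script along these lines; this is an illustrative sketch, not
the actual implementation:

[source,painless]
--------------------------------------------------
double sum = 0;
int count = 0;
for (def v : values) {
  // skip gaps (null) and NaN values
  if (v != null && !Double.isNaN((double) v)) {
    sum += (double) v;
    count += 1;
  }
}
// an empty window yields NaN rather than a divide-by-zero
return count > 0 ? sum / count : Double.NaN;
--------------------------------------------------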
===== linearWeightedAvg Function
The `linearWeightedAvg` function assigns a linear weighting to points in the series, such that "older" datapoints (e.g. those at
the beginning of the window) contribute linearly less to the total average. The linear weighting helps reduce
the "lag" behind the data's mean, since older points have less influence.
If the window is empty, or all values are `null`/`NaN`, `NaN` is returned as the result.
.`linearWeightedAvg(double[] values)` Parameters
|===
|Parameter Name |Description
|`values` |The window of values to average
|===
[source,js]
--------------------------------------------------
POST /_search
{
"size": 0,
"aggs": {
"my_date_histo":{
"date_histogram":{
"field":"date",
"interval":"1M"
},
"aggs":{
"the_sum":{
"sum":{ "field": "price" }
},
"the_movavg": {
"moving_fn": {
"buckets_path": "the_sum",
"window": 10,
"script": "MovingFunctions.linearWeightedAvg(values)"
}
}
}
}
}
}
--------------------------------------------------
// CONSOLE
// TEST[setup:sales]
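The weighting itself is simple: the oldest point in the window receives a weight of `1`, the next `2`, and so on, and the
weighted sum is divided by the total weight. A rough sketch of that calculation (illustrative, not the actual implementation):

[source,painless]
--------------------------------------------------
double avg = 0;
double totalWeight = 0;
int weight = 1;
for (def v : values) {
  // newer points receive linearly larger weights
  avg += (double) v * weight;
  totalWeight += weight;
  weight += 1;
}
return totalWeight > 0 ? avg / totalWeight : Double.NaN;
--------------------------------------------------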
===== ewma Function
The `ewma` function (aka "single-exponential") is similar to the `linearWeightedAvg` function,
except older data-points become exponentially less important,
rather than linearly less important. The speed at which the importance decays can be controlled with an `alpha`
setting. Small values make the weight decay slowly, which provides greater smoothing and takes into account a larger
portion of the window. Larger values make the weight decay quickly, which reduces the impact of older values on the
moving average. This tends to make the moving average track the data more closely but with less smoothing.
`null` and `NaN` values are ignored; the average is only calculated over the real values. If the window is empty, or all values are
`null`/`NaN`, `NaN` is returned as the result. This means that the count used in the average calculation is the count of
non-`null`, non-`NaN` values.
.`ewma(double[] values, double alpha)` Parameters
|===
|Parameter Name |Description
|`values` |The window of values to average
|`alpha` |Exponential decay
|===
[source,js]
--------------------------------------------------
POST /_search
{
"size": 0,
"aggs": {
"my_date_histo":{
"date_histogram":{
"field":"date",
"interval":"1M"
},
"aggs":{
"the_sum":{
"sum":{ "field": "price" }
},
"the_movavg": {
"moving_fn": {
"buckets_path": "the_sum",
"window": 10,
"script": "MovingFunctions.ewma(values, 0.3)"
}
}
}
}
}
}
--------------------------------------------------
// CONSOLE
// TEST[setup:sales]
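Conceptually, `ewma` applies the recurrence `avg = (value * alpha) + (avg * (1 - alpha))` across the window, seeded with the
first value. A rough sketch of that recurrence (illustrative; the `null`/`NaN` handling described above is omitted for brevity):

[source,painless]
--------------------------------------------------
double alpha = 0.3;  // illustrative decay value
double avg = 0;
boolean first = true;
for (def v : values) {
  if (first) {
    avg = (double) v;  // seed with the first value
    first = false;
  } else {
    // exponentially decay the weight of older values
    avg = ((double) v * alpha) + (avg * (1 - alpha));
  }
}
return avg;
--------------------------------------------------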
===== holt Function
The `holt` function (aka "double exponential") incorporates a second exponential term which
tracks the data's trend. Single exponential does not perform well when the data has an underlying linear trend. The
double exponential model calculates two values internally: a "level" and a "trend".
The level calculation is similar to `ewma`, and is an exponentially weighted view of the data. The difference is
that the previously smoothed value is used instead of the raw value, which allows it to stay close to the original series.
The trend calculation looks at the difference between the current and last value (e.g. the slope, or trend, of the
smoothed data). The trend value is also exponentially weighted.
Values are produced by adding the level and trend components.
`null` and `NaN` values are ignored; the average is only calculated over the real values. If the window is empty, or all values are
`null`/`NaN`, `NaN` is returned as the result. This means that the count used in the average calculation is the count of
non-`null`, non-`NaN` values.
.`holt(double[] values, double alpha, double beta)` Parameters
|===
|Parameter Name |Description
|`values` |The window of values to smooth
|`alpha` |Level decay value
|`beta` |Trend decay value
|===
[source,js]
--------------------------------------------------
POST /_search
{
"size": 0,
"aggs": {
"my_date_histo":{
"date_histogram":{
"field":"date",
"interval":"1M"
},
"aggs":{
"the_sum":{
"sum":{ "field": "price" }
},
"the_movavg": {
"moving_fn": {
"buckets_path": "the_sum",
"window": 10,
"script": "MovingFunctions.holt(values, 0.3, 0.1)"
}
}
}
}
}
}
--------------------------------------------------
// CONSOLE
// TEST[setup:sales]
In practice, the `alpha` value behaves very similarly in `holt` as in `ewma`: small values produce more smoothing
and more lag, while larger values produce closer tracking and less lag. The value of `beta` is often difficult
to see. Small values emphasize long-term trends (such as a constant linear trend in the whole series), while larger
values emphasize short-term trends.
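To make the level and trend mechanics concrete, here is a rough Painless sketch of the recurrence described above (the
`alpha` and `beta` values are illustrative, and the `null`/`NaN` handling is omitted for brevity):

[source,painless]
--------------------------------------------------
double alpha = 0.3;  // level decay (illustrative)
double beta = 0.1;   // trend decay (illustrative)
double s = 0;        // level
double b = 0;        // trend
double lastS = 0;
double lastB = 0;
int counter = 0;
for (def v : values) {
  double val = (double) v;
  if (counter == 0) {
    s = val;  // seed the level with the first value
  } else {
    // smooth the level against the previous level + trend
    s = alpha * val + (1 - alpha) * (lastS + lastB);
    // smooth the trend against the change in level
    b = beta * (s - lastS) + (1 - beta) * lastB;
  }
  counter += 1;
  lastS = s;
  lastB = b;
}
// the one-step forecast is the level plus the trend
return s + b;
--------------------------------------------------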
===== holtWinters Function
The `holtWinters` function (aka "triple exponential") incorporates a third exponential term which
tracks the seasonal aspect of your data. This aggregation therefore smooths based on three components: "level", "trend"
and "seasonality".
The level and trend calculation is identical to `holt`. The seasonal calculation looks at the difference between
the current point, and the point one period earlier.
Holt-Winters requires a little more handholding than the other moving averages. You need to specify the "periodicity"
of your data: e.g. if your data has cyclic trends every 7 days, you would set `period = 7`. Similarly if there was
a monthly trend, you would set it to `30`. There is currently no periodicity detection, although that is planned
for future enhancements.
`null` and `NaN` values are ignored; the average is only calculated over the real values. If the window is empty, or all values are
`null`/`NaN`, `NaN` is returned as the result. This means that the count used in the average calculation is the count of
non-`null`, non-`NaN` values.
.`holtWinters(double[] values, double alpha, double beta, double gamma, int period, boolean multiplicative)` Parameters
|===
|Parameter Name |Description
|`values` |The window of values to smooth
|`alpha` |Level decay value
|`beta` |Trend decay value
|`gamma` |Seasonality decay value
|`period` |The periodicity of the data
|`multiplicative` |True if you wish to use multiplicative holt-winters, false to use additive
|===
[source,js]
--------------------------------------------------
POST /_search
{
"size": 0,
"aggs": {
"my_date_histo":{
"date_histogram":{
"field":"date",
"interval":"1M"
},
"aggs":{
"the_sum":{
"sum":{ "field": "price" }
},
"the_movavg": {
"moving_fn": {
"buckets_path": "the_sum",
"window": 10,
"script": "if (values.length > 5*2) {MovingFunctions.holtWinters(values, 0.3, 0.1, 0.1, 5, false)}"
}
}
}
}
}
}
--------------------------------------------------
// CONSOLE
// TEST[setup:sales]
[WARNING]
======
Multiplicative Holt-Winters works by dividing each data point by the seasonal value. This is problematic if any of
your data is zero, or if there are gaps in the data (since this results in a divide-by-zero). To combat this, the
`mult` Holt-Winters pads all values by a very small amount (1*10^-10^) so that all values are non-zero. This affects
the result, but only minimally. If your data is non-zero, or you prefer to see `NaN` when zeros are encountered,
you can disable this behavior with `pad: false`.
======
===== "Cold Start"
Unfortunately, due to the nature of Holt-Winters, it requires two periods of data to "bootstrap" the algorithm. This
means that your `window` must always be *at least* twice the size of your period. An exception will be thrown if it
isn't. It also means that Holt-Winters will not emit a value for the first `2 * period` buckets; the current algorithm
does not backcast.
You'll notice in the above example we have an `if ()` statement checking the size of values. This is checking to make sure
we have two periods' worth of data (`5 * 2`, where 5 is the period specified in the `holtWinters` function) before calling
the holt-winters function.
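Because emitting `null` is not permitted, a variant of that guard which returns `NaN` explicitly during the cold-start
buckets might look like the following sketch (using the same illustrative period of `5`):

[source,painless]
--------------------------------------------------
// return NaN until two full periods of data are available
if (values.length < 5 * 2) {
  return Double.NaN;
}
return MovingFunctions.holtWinters(values, 0.3, 0.1, 0.1, 5, false);
--------------------------------------------------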

View File

@@ -32,6 +32,7 @@ import org.elasticsearch.common.settings.SettingsFilter;
import org.elasticsearch.painless.spi.PainlessExtension;
import org.elasticsearch.painless.spi.Whitelist;
import org.elasticsearch.plugins.ActionPlugin;
import org.elasticsearch.painless.spi.WhitelistLoader;
import org.elasticsearch.plugins.ExtensiblePlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.plugins.ScriptPlugin;
@@ -39,6 +40,7 @@ import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestHandler;
import org.elasticsearch.script.ScriptContext;
import org.elasticsearch.script.ScriptEngine;
import org.elasticsearch.search.aggregations.pipeline.movfn.MovingFunctionScript;
import java.util.ArrayList;
import java.util.Arrays;
@@ -55,18 +57,34 @@ import java.util.function.Supplier;
*/
public final class PainlessPlugin extends Plugin implements ScriptPlugin, ExtensiblePlugin, ActionPlugin {
private final Map<ScriptContext<?>, List<Whitelist>> extendedWhitelists = new HashMap<>();
private static final Map<ScriptContext<?>, List<Whitelist>> whitelists;
/*
* Contexts from Core that need custom whitelists can add them to the map below.
* Whitelist resources should be added as appropriately named, separate files
* under Painless' resources
*/
static {
Map<ScriptContext<?>, List<Whitelist>> map = new HashMap<>();
// Moving Function Pipeline Agg
List<Whitelist> movFn = new ArrayList<>(Whitelist.BASE_WHITELISTS);
movFn.add(WhitelistLoader.loadFromResourceFiles(Whitelist.class, "org.elasticsearch.aggs.movfn.txt"));
map.put(MovingFunctionScript.CONTEXT, movFn);
whitelists = map;
}
@Override
public ScriptEngine getScriptEngine(Settings settings, Collection<ScriptContext<?>> contexts) {
Map<ScriptContext<?>, List<Whitelist>> contextsWithWhitelists = new HashMap<>();
for (ScriptContext<?> context : contexts) {
// we might have a context that only uses the base whitelists, so would not have been filled in by reloadSPI
List<Whitelist> whitelists = extendedWhitelists.get(context);
if (whitelists == null) {
whitelists = new ArrayList<>(Whitelist.BASE_WHITELISTS);
List<Whitelist> contextWhitelists = whitelists.get(context);
if (contextWhitelists == null) {
contextWhitelists = new ArrayList<>(Whitelist.BASE_WHITELISTS);
}
contextsWithWhitelists.put(context, whitelists);
contextsWithWhitelists.put(context, contextWhitelists);
}
return new PainlessScriptEngine(settings, contextsWithWhitelists);
}
@@ -80,7 +98,7 @@ public final class PainlessPlugin extends Plugin implements ScriptPlugin, Extens
public void reloadSPI(ClassLoader loader) {
for (PainlessExtension extension : ServiceLoader.load(PainlessExtension.class, loader)) {
for (Map.Entry<ScriptContext<?>, List<Whitelist>> entry : extension.getContextWhitelists().entrySet()) {
List<Whitelist> existing = extendedWhitelists.computeIfAbsent(entry.getKey(),
List<Whitelist> existing = whitelists.computeIfAbsent(entry.getKey(),
c -> new ArrayList<>(Whitelist.BASE_WHITELISTS));
existing.addAll(entry.getValue());
}

View File

@@ -0,0 +1,32 @@
#
# Licensed to Elasticsearch under one or more contributor
# license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright
# ownership. Elasticsearch licenses this file to you under
# the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
# This file contains a whitelist for the Moving Function pipeline aggregator in core
class org.elasticsearch.search.aggregations.pipeline.movfn.MovingFunctions {
double max(double[])
double min(double[])
double sum(double[])
double stdDev(double[], double)
double unweightedAvg(double[])
double linearWeightedAvg(double[])
double ewma(double[], double)
double holt(double[], double, double)
double holtWinters(double[], double, double, double, int, boolean)
}

View File

@@ -0,0 +1,315 @@
# Sanity integration test to make sure the custom context and whitelist work for moving_fn pipeline agg
#
setup:
- skip:
version: " - 6.4.0"
reason: "moving_fn added in 6.4.0"
- do:
indices.create:
index: test
body:
mappings:
_doc:
properties:
value_field:
type: integer
date:
type: date
- do:
bulk:
refresh: true
body:
- index:
_index: test
_type: _doc
_id: 1
- date: "2017-01-01T00:00:00"
value_field: 1
- index:
_index: test
_type: _doc
_id: 2
- date: "2017-01-02T00:00:00"
value_field: 2
- index:
_index: test
_type: _doc
_id: 3
- date: "2017-01-03T00:00:00"
value_field: 3
- index:
_index: test
_type: _doc
_id: 4
- date: "2017-01-04T00:00:00"
value_field: 4
- index:
_index: test
_type: _doc
_id: 5
- date: "2017-01-05T00:00:00"
value_field: 5
- index:
_index: test
_type: _doc
_id: 6
- date: "2017-01-06T00:00:00"
value_field: 6
- do:
indices.refresh:
index: [test]
---
"max":
- do:
search:
body:
size: 0
aggs:
the_histo:
date_histogram:
field: "date"
interval: "1d"
aggs:
the_avg:
avg:
field: "value_field"
the_mov_fn:
moving_fn:
buckets_path: "the_avg"
window: 3
script: "MovingFunctions.max(values)"
- match: { hits.total: 6 }
- length: { hits.hits: 0 }
- is_false: aggregations.the_histo.buckets.0.the_mov_fn.value
- match: { aggregations.the_histo.buckets.1.the_mov_fn.value: 1.0 }
- match: { aggregations.the_histo.buckets.2.the_mov_fn.value: 2.0 }
- match: { aggregations.the_histo.buckets.3.the_mov_fn.value: 3.0 }
- match: { aggregations.the_histo.buckets.4.the_mov_fn.value: 4.0 }
- match: { aggregations.the_histo.buckets.5.the_mov_fn.value: 5.0 }
---
"min":
- do:
search:
body:
size: 0
aggs:
the_histo:
date_histogram:
field: "date"
interval: "1d"
aggs:
the_avg:
avg:
field: "value_field"
the_mov_fn:
moving_fn:
buckets_path: "the_avg"
window: 3
script: "MovingFunctions.min(values)"
- match: { hits.total: 6 }
- length: { hits.hits: 0 }
- is_false: aggregations.the_histo.buckets.0.the_mov_fn.value
- match: { aggregations.the_histo.buckets.1.the_mov_fn.value: 1.0 }
- match: { aggregations.the_histo.buckets.2.the_mov_fn.value: 1.0 }
- match: { aggregations.the_histo.buckets.3.the_mov_fn.value: 1.0 }
- match: { aggregations.the_histo.buckets.4.the_mov_fn.value: 2.0 }
- match: { aggregations.the_histo.buckets.5.the_mov_fn.value: 3.0 }
---
"sum":
- do:
search:
body:
size: 0
aggs:
the_histo:
date_histogram:
field: "date"
interval: "1d"
aggs:
the_avg:
avg:
field: "value_field"
the_mov_fn:
moving_fn:
buckets_path: "the_avg"
window: 3
script: "MovingFunctions.sum(values)"
- match: { hits.total: 6 }
- length: { hits.hits: 0 }
- match: { aggregations.the_histo.buckets.0.the_mov_fn.value: 0.0 }
- match: { aggregations.the_histo.buckets.1.the_mov_fn.value: 1.0 }
- match: { aggregations.the_histo.buckets.2.the_mov_fn.value: 3.0 }
- match: { aggregations.the_histo.buckets.3.the_mov_fn.value: 6.0 }
- match: { aggregations.the_histo.buckets.4.the_mov_fn.value: 9.0 }
- match: { aggregations.the_histo.buckets.5.the_mov_fn.value: 12.0 }
---
"unweightedAvg":
- do:
search:
body:
size: 0
aggs:
the_histo:
date_histogram:
field: "date"
interval: "1d"
aggs:
the_avg:
avg:
field: "value_field"
the_mov_fn:
moving_fn:
buckets_path: "the_avg"
window: 3
script: "MovingFunctions.unweightedAvg(values)"
- match: { hits.total: 6 }
- length: { hits.hits: 0 }
---
"linearWeightedAvg":
- do:
search:
body:
size: 0
aggs:
the_histo:
date_histogram:
field: "date"
interval: "1d"
aggs:
the_avg:
avg:
field: "value_field"
the_mov_fn:
moving_fn:
buckets_path: "the_avg"
window: 3
script: "MovingFunctions.linearWeightedAvg(values)"
- match: { hits.total: 6 }
- length: { hits.hits: 0 }
---
"ewma":
- do:
search:
body:
size: 0
aggs:
the_histo:
date_histogram:
field: "date"
interval: "1d"
aggs:
the_avg:
avg:
field: "value_field"
the_mov_fn:
moving_fn:
buckets_path: "the_avg"
window: 3
script: "MovingFunctions.ewma(values, 0.1)"
- match: { hits.total: 6 }
- length: { hits.hits: 0 }
---
"holt":
- do:
search:
body:
size: 0
aggs:
the_histo:
date_histogram:
field: "date"
interval: "1d"
aggs:
the_avg:
avg:
field: "value_field"
the_mov_fn:
moving_fn:
buckets_path: "the_avg"
window: 3
script: "MovingFunctions.holt(values, 0.1, 0.1)"
- match: { hits.total: 6 }
- length: { hits.hits: 0 }
---
"holtWinters":
- do:
search:
body:
size: 0
aggs:
the_histo:
date_histogram:
field: "date"
interval: "1d"
aggs:
the_avg:
avg:
field: "value_field"
the_mov_fn:
moving_fn:
buckets_path: "the_avg"
window: 1
script: "if (values.length > 1) { MovingFunctions.holtWinters(values, 0.1, 0.1, 0.1, 1, true)}"
- match: { hits.total: 6 }
- length: { hits.hits: 0 }
---
"stdDev":
- do:
search:
body:
size: 0
aggs:
the_histo:
date_histogram:
field: "date"
interval: "1d"
aggs:
the_avg:
avg:
field: "value_field"
the_mov_fn:
moving_fn:
buckets_path: "the_avg"
window: 3
script: "MovingFunctions.stdDev(values, MovingFunctions.unweightedAvg(values))"
- match: { hits.total: 6 }
- length: { hits.hits: 0 }

View File

@@ -0,0 +1,46 @@
setup:
- skip:
version: " - 6.4.0"
reason: "moving_fn added in 6.4.0"
---
"Bad window":
- do:
catch: /\[window\] must be a positive, non-zero integer\./
search:
body:
size: 0
aggs:
the_histo:
date_histogram:
field: "date"
interval: "1d"
aggs:
the_avg:
avg:
field: "value_field"
the_mov_fn:
moving_fn:
buckets_path: "the_avg"
window: -1
script: "MovingFunctions.windowMax(values)"
---
"Not under date_histo":
- do:
catch: /\[window\] must be a positive, non-zero integer\./
search:
body:
size: 0
aggs:
the_avg:
avg:
field: "value_field"
the_mov_fn:
moving_fn:
buckets_path: "the_avg"
window: -1
script: "MovingFunctions.windowMax(values)"

View File

@@ -195,7 +195,13 @@ setup:
---
"Test typed keys parameter for date_histogram aggregation and max_bucket pipeline aggregation":
- skip:
features: warnings
version: " - 6.4.0"
reason: "deprecation added in 6.4.0"
- do:
warnings:
- 'The moving_avg aggregation has been deprecated in favor of the moving_fn aggregation.'
search:
typed_keys: true
body:

View File

@@ -30,6 +30,7 @@ import java.util.stream.Stream;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.plugins.ScriptPlugin;
import org.elasticsearch.search.aggregations.pipeline.movfn.MovingFunctionScript;
/**
* Manages building {@link ScriptService}.
@@ -48,7 +49,8 @@ public class ScriptModule {
FilterScript.CONTEXT,
SimilarityScript.CONTEXT,
SimilarityWeightScript.CONTEXT,
TemplateScript.CONTEXT
TemplateScript.CONTEXT,
MovingFunctionScript.CONTEXT
).collect(Collectors.toMap(c -> c.name, Function.identity()));
}

View File

@@ -220,6 +220,8 @@ import org.elasticsearch.search.aggregations.pipeline.movavg.models.HoltWintersM
import org.elasticsearch.search.aggregations.pipeline.movavg.models.LinearModel;
import org.elasticsearch.search.aggregations.pipeline.movavg.models.MovAvgModel;
import org.elasticsearch.search.aggregations.pipeline.movavg.models.SimpleModel;
import org.elasticsearch.search.aggregations.pipeline.movfn.MovFnPipelineAggregationBuilder;
import org.elasticsearch.search.aggregations.pipeline.movfn.MovFnPipelineAggregator;
import org.elasticsearch.search.aggregations.pipeline.serialdiff.SerialDiffPipelineAggregationBuilder;
import org.elasticsearch.search.aggregations.pipeline.serialdiff.SerialDiffPipelineAggregator;
import org.elasticsearch.search.fetch.FetchPhase;
@@ -514,6 +516,11 @@ public class SearchModule {
SerialDiffPipelineAggregationBuilder::new,
SerialDiffPipelineAggregator::new,
SerialDiffPipelineAggregationBuilder::parse));
registerPipelineAggregation(new PipelineAggregationSpec(
MovFnPipelineAggregationBuilder.NAME,
MovFnPipelineAggregationBuilder::new,
MovFnPipelineAggregator::new,
MovFnPipelineAggregationBuilder::parse));
registerFromPlugin(plugins, SearchPlugin::getPipelineAggregations, this::registerPipelineAggregation);
}

View File

@@ -33,6 +33,7 @@ import org.elasticsearch.search.aggregations.pipeline.bucketsort.BucketSortPipel
import org.elasticsearch.search.aggregations.pipeline.cumulativesum.CumulativeSumPipelineAggregationBuilder;
import org.elasticsearch.search.aggregations.pipeline.derivative.DerivativePipelineAggregationBuilder;
import org.elasticsearch.search.aggregations.pipeline.movavg.MovAvgPipelineAggregationBuilder;
import org.elasticsearch.search.aggregations.pipeline.movfn.MovFnPipelineAggregationBuilder;
import org.elasticsearch.search.aggregations.pipeline.serialdiff.SerialDiffPipelineAggregationBuilder;
import org.elasticsearch.search.sort.FieldSortBuilder;
@@ -78,6 +79,10 @@ public final class PipelineAggregatorBuilders {
return new PercentilesBucketPipelineAggregationBuilder(name, bucketsPath);
}
/**
* @deprecated use {@link #movingFunction(String, Script, String, int)} instead
*/
@Deprecated
public static MovAvgPipelineAggregationBuilder movingAvg(String name, String bucketsPath) {
return new MovAvgPipelineAggregationBuilder(name, bucketsPath);
}
@@ -114,4 +119,9 @@
public static SerialDiffPipelineAggregationBuilder diff(String name, String bucketsPath) {
return new SerialDiffPipelineAggregationBuilder(name, bucketsPath);
}
public static MovFnPipelineAggregationBuilder movingFunction(String name, Script script,
String bucketsPaths, int window) {
return new MovFnPipelineAggregationBuilder(name, bucketsPaths, script, window);
}
}

View File

@@ -23,6 +23,8 @@ import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.ParsingException;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.logging.DeprecationLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
import org.elasticsearch.common.xcontent.ParseFieldRegistry;
import org.elasticsearch.common.xcontent.XContentBuilder;
@@ -59,6 +61,8 @@ public class MovAvgPipelineAggregationBuilder extends AbstractPipelineAggregatio
public static final ParseField SETTINGS = new ParseField("settings");
private static final ParseField PREDICT = new ParseField("predict");
private static final ParseField MINIMIZE = new ParseField("minimize");
private static final DeprecationLogger DEPRECATION_LOGGER
= new DeprecationLogger(Loggers.getLogger(MovAvgPipelineAggregationBuilder.class));
private String format;
private GapPolicy gapPolicy = GapPolicy.SKIP;
@@ -318,6 +322,8 @@
Integer predict = null;
Boolean minimize = null;
DEPRECATION_LOGGER.deprecated("The moving_avg aggregation has been deprecated in favor of the moving_fn aggregation.");
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
currentFieldName = parser.currentName();

View File

@@ -24,6 +24,7 @@ import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.search.aggregations.pipeline.movavg.MovAvgPipelineAggregationBuilder;
import org.elasticsearch.search.aggregations.pipeline.movfn.MovingFunctions;
import java.io.IOException;
import java.text.ParseException;
@@ -90,7 +91,7 @@ public class EwmaModel extends MovAvgModel {
}
@Override
protected <T extends Number> double[] doPredict(Collection<T> values, int numPredictions) {
protected double[] doPredict(Collection<Double> values, int numPredictions) {
double[] predictions = new double[numPredictions];
// EWMA just emits the same final prediction repeatedly.
@@ -100,19 +101,8 @@ }
}
@Override
public <T extends Number> double next(Collection<T> values) {
double avg = 0;
boolean first = true;
for (T v : values) {
if (first) {
avg = v.doubleValue();
first = false;
} else {
avg = (v.doubleValue() * alpha) + (avg * (1 - alpha));
}
}
return avg;
public double next(Collection<Double> values) {
return MovingFunctions.ewma(values.stream().mapToDouble(Double::doubleValue).toArray(), alpha);
}
@Override

View File

@@ -24,6 +24,7 @@ import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.search.aggregations.pipeline.movavg.MovAvgPipelineAggregationBuilder;
import org.elasticsearch.search.aggregations.pipeline.movfn.MovingFunctions;
import java.io.IOException;
import java.text.ParseException;
@@ -116,16 +117,15 @@ public class HoltLinearModel extends MovAvgModel {
*
* @param values Collection of numerics to movingAvg, usually windowed
* @param numPredictions Number of newly generated predictions to return
* @param <T> Type of numeric
* @return Returns an array of doubles, since most smoothing methods operate on floating points
*/
@Override
protected <T extends Number> double[] doPredict(Collection<T> values, int numPredictions) {
protected double[] doPredict(Collection<Double> values, int numPredictions) {
return next(values, numPredictions);
}
@Override
public <T extends Number> double next(Collection<T> values) {
public double next(Collection<Double> values) {
return next(values, 1)[0];
}
@@ -135,47 +135,13 @@
* @param values Collection of values to calculate avg for
* @param numForecasts number of forecasts into the future to return
*
* @param <T> Type T extending Number
* @return Returns a Double containing the moving avg for the window
*/
public <T extends Number> double[] next(Collection<T> values, int numForecasts) {
public double[] next(Collection<Double> values, int numForecasts) {
if (values.size() == 0) {
return emptyPredictions(numForecasts);
}
// Smoothed value
double s = 0;
double last_s = 0;
// Trend value
double b = 0;
double last_b = 0;
int counter = 0;
T last;
for (T v : values) {
last = v;
if (counter == 1) {
s = v.doubleValue();
b = v.doubleValue() - last.doubleValue();
} else {
s = alpha * v.doubleValue() + (1.0d - alpha) * (last_s + last_b);
b = beta * (s - last_s) + (1 - beta) * last_b;
}
counter += 1;
last_s = s;
last_b = b;
}
double[] forecastValues = new double[numForecasts];
for (int i = 0; i < numForecasts; i++) {
forecastValues[i] = s + (i * b);
}
return forecastValues;
return MovingFunctions.holtForecast(values.stream().mapToDouble(Double::doubleValue).toArray(), alpha, beta, numForecasts);
}
@Override

View File

@@ -29,6 +29,7 @@ import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.search.aggregations.AggregationExecutionException;
import org.elasticsearch.search.aggregations.pipeline.movavg.MovAvgPipelineAggregationBuilder;
import org.elasticsearch.search.aggregations.pipeline.movfn.MovingFunctions;
import java.io.IOException;
import java.text.ParseException;
@@ -259,16 +260,15 @@ public class HoltWintersModel extends MovAvgModel {
*
* @param values Collection of numerics to movingAvg, usually windowed
* @param numPredictions Number of newly generated predictions to return
* @param <T> Type of numeric
* @return Returns an array of doubles, since most smoothing methods operate on floating points
*/
@Override
protected <T extends Number> double[] doPredict(Collection<T> values, int numPredictions) {
protected double[] doPredict(Collection<Double> values, int numPredictions) {
return next(values, numPredictions);
}
@Override
public <T extends Number> double next(Collection<T> values) {
public double next(Collection<Double> values) {
return next(values, 1)[0];
}
@@ -278,88 +278,11 @@
* @param values Collection of values to calculate avg for
* @param numForecasts number of forecasts into the future to return
*
* @param <T> Type T extending Number
* @return Returns a Double containing the moving avg for the window
*/
public <T extends Number> double[] next(Collection<T> values, int numForecasts) {
if (values.size() < period * 2) {
// We need at least two full "seasons" to use HW
// This should have been caught earlier, we can't do anything now...bail
throw new AggregationExecutionException("Holt-Winters aggregation requires at least (2 * period == 2 * "
+ period + " == "+(2 * period)+") data-points to function. Only [" + values.size() + "] were provided.");
}
// Smoothed value
double s = 0;
double last_s;
// Trend value
double b = 0;
double last_b = 0;
// Seasonal value
double[] seasonal = new double[values.size()];
int counter = 0;
double[] vs = new double[values.size()];
for (T v : values) {
vs[counter] = v.doubleValue() + padding;
counter += 1;
}
// Initial level value is average of first season
// Calculate the slopes between first and second season for each period
for (int i = 0; i < period; i++) {
s += vs[i];
b += (vs[i + period] - vs[i]) / period;
}
s /= period;
b /= period;
last_s = s;
// Calculate first seasonal
if (Double.compare(s, 0.0) == 0 || Double.compare(s, -0.0) == 0) {
Arrays.fill(seasonal, 0.0);
} else {
for (int i = 0; i < period; i++) {
seasonal[i] = vs[i] / s;
}
}
for (int i = period; i < vs.length; i++) {
// TODO if perf is a problem, we can specialize a subclass to avoid conditionals on each iteration
if (seasonalityType.equals(SeasonalityType.MULTIPLICATIVE)) {
s = alpha * (vs[i] / seasonal[i - period]) + (1.0d - alpha) * (last_s + last_b);
} else {
s = alpha * (vs[i] - seasonal[i - period]) + (1.0d - alpha) * (last_s + last_b);
}
b = beta * (s - last_s) + (1 - beta) * last_b;
if (seasonalityType.equals(SeasonalityType.MULTIPLICATIVE)) {
seasonal[i] = gamma * (vs[i] / (last_s + last_b )) + (1 - gamma) * seasonal[i - period];
} else {
seasonal[i] = gamma * (vs[i] - (last_s - last_b )) + (1 - gamma) * seasonal[i - period];
}
last_s = s;
last_b = b;
}
double[] forecastValues = new double[numForecasts];
for (int i = 1; i <= numForecasts; i++) {
int idx = values.size() - period + ((i - 1) % period);
// TODO perhaps pad out seasonal to a power of 2 and use a mask instead of modulo?
if (seasonalityType.equals(SeasonalityType.MULTIPLICATIVE)) {
forecastValues[i-1] = (s + (i * b)) * seasonal[idx];
} else {
forecastValues[i-1] = s + (i * b) + seasonal[idx];
}
}
return forecastValues;
public double[] next(Collection<Double> values, int numForecasts) {
return MovingFunctions.holtWintersForecast(values.stream().mapToDouble(Double::doubleValue).toArray(),
alpha, beta, gamma, period, padding, seasonalityType.equals(SeasonalityType.MULTIPLICATIVE), numForecasts);
}
@Override

View File

@@ -25,6 +25,7 @@ import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.search.aggregations.pipeline.movavg.MovAvgPipelineAggregationBuilder;
import org.elasticsearch.search.aggregations.pipeline.movfn.MovingFunctions;
import java.io.IOException;
import java.text.ParseException;
@@ -74,7 +75,7 @@ public class LinearModel extends MovAvgModel {
}
@Override
protected <T extends Number> double[] doPredict(Collection<T> values, int numPredictions) {
protected double[] doPredict(Collection<Double> values, int numPredictions) {
double[] predictions = new double[numPredictions];
// EWMA just emits the same final prediction repeatedly.
@@ -84,17 +85,8 @@ }
}
@Override
public <T extends Number> double next(Collection<T> values) {
double avg = 0;
long totalWeight = 1;
long current = 1;
for (T v : values) {
avg += v.doubleValue() * current;
totalWeight += current;
current += 1;
}
return avg / totalWeight;
public double next(Collection<Double> values) {
return MovingFunctions.linearWeightedAvg(values.stream().mapToDouble(Double::doubleValue).toArray());
}
@Override

View File

@@ -68,20 +68,18 @@ public abstract class MovAvgModel implements NamedWriteable, ToXContentFragment
* Returns the next value in the series, according to the underlying smoothing model
*
* @param values Collection of numerics to movingAvg, usually windowed
* @param <T> Type of numeric
* @return Returns a double, since most smoothing methods operate on floating points
*/
public abstract <T extends Number> double next(Collection<T> values);
public abstract double next(Collection<Double> values);
/**
* Predicts the next `n` values in the series.
*
* @param values Collection of numerics to movingAvg, usually windowed
* @param numPredictions Number of newly generated predictions to return
* @param <T> Type of numeric
* @return Returns an array of doubles, since most smoothing methods operate on floating points
*/
public <T extends Number> double[] predict(Collection<T> values, int numPredictions) {
public double[] predict(Collection<Double> values, int numPredictions) {
assert(numPredictions >= 1);
// If there are no values, we can't do anything. Return an array of NaNs.
@@ -97,10 +95,9 @@
*
* @param values Collection of numerics to movingAvg, usually windowed
* @param numPredictions Number of newly generated predictions to return
* @param <T> Type of numeric
* @return Returns an array of doubles, since most smoothing methods operate on floating points
*/
protected abstract <T extends Number> double[] doPredict(Collection<T> values, int numPredictions);
protected abstract double[] doPredict(Collection<Double> values, int numPredictions);
/**
* Returns an empty set of predictions, filled with NaNs

View File

@ -24,6 +24,7 @@ import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.search.aggregations.pipeline.movavg.MovAvgPipelineAggregationBuilder;
import org.elasticsearch.search.aggregations.pipeline.movfn.MovingFunctions;
import java.io.IOException;
import java.text.ParseException;
@@ -72,7 +73,7 @@ public class SimpleModel extends MovAvgModel {
}
@Override
protected <T extends Number> double[] doPredict(Collection<T> values, int numPredictions) {
protected double[] doPredict(Collection<Double> values, int numPredictions) {
double[] predictions = new double[numPredictions];
// Simple just emits the same final prediction repeatedly.
@@ -82,12 +83,8 @@ }
}
@Override
public <T extends Number> double next(Collection<T> values) {
double avg = 0;
for (T v : values) {
avg += v.doubleValue();
}
return avg / values.size();
public double next(Collection<Double> values) {
return MovingFunctions.unweightedAvg(values.stream().mapToDouble(Double::doubleValue).toArray());
}
@Override

View File

@@ -0,0 +1,264 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.aggregations.pipeline.movfn;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.script.Script;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.AggregationBuilder;
import org.elasticsearch.search.aggregations.AggregatorFactory;
import org.elasticsearch.search.aggregations.PipelineAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramAggregatorFactory;
import org.elasticsearch.search.aggregations.bucket.histogram.HistogramAggregatorFactory;
import org.elasticsearch.search.aggregations.pipeline.AbstractPipelineAggregationBuilder;
import org.elasticsearch.search.aggregations.pipeline.BucketHelpers.GapPolicy;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
import java.io.IOException;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;
import static org.elasticsearch.search.aggregations.pipeline.PipelineAggregator.Parser.BUCKETS_PATH;
import static org.elasticsearch.search.aggregations.pipeline.PipelineAggregator.Parser.FORMAT;
import static org.elasticsearch.search.aggregations.pipeline.PipelineAggregator.Parser.GAP_POLICY;
public class MovFnPipelineAggregationBuilder extends AbstractPipelineAggregationBuilder<MovFnPipelineAggregationBuilder> {
public static final String NAME = "moving_fn";
private static final ParseField WINDOW = new ParseField("window");
private final Script script;
private final String bucketsPathString;
private String format = null;
private GapPolicy gapPolicy = GapPolicy.SKIP;
private int window;
private static final Function<String, ConstructingObjectParser<MovFnPipelineAggregationBuilder, Void>> PARSER
= name -> {
@SuppressWarnings("unchecked")
ConstructingObjectParser<MovFnPipelineAggregationBuilder, Void> parser = new ConstructingObjectParser<>(
MovFnPipelineAggregationBuilder.NAME,
false,
o -> new MovFnPipelineAggregationBuilder(name, (String) o[0], (Script) o[1], (int)o[2]));
parser.declareString(ConstructingObjectParser.constructorArg(), BUCKETS_PATH_FIELD);
parser.declareField(ConstructingObjectParser.constructorArg(),
(p, c) -> Script.parse(p), Script.SCRIPT_PARSE_FIELD, ObjectParser.ValueType.OBJECT_OR_STRING);
parser.declareInt(ConstructingObjectParser.constructorArg(), WINDOW);
parser.declareString(MovFnPipelineAggregationBuilder::format, FORMAT);
parser.declareField(MovFnPipelineAggregationBuilder::gapPolicy, p -> {
if (p.currentToken() == XContentParser.Token.VALUE_STRING) {
return GapPolicy.parse(p.text().toLowerCase(Locale.ROOT), p.getTokenLocation());
}
throw new IllegalArgumentException("Unsupported token [" + p.currentToken() + "]");
}, GAP_POLICY, ObjectParser.ValueType.STRING);
return parser;
};
public MovFnPipelineAggregationBuilder(String name, String bucketsPath, Script script, int window) {
super(name, NAME, new String[]{bucketsPath});
this.bucketsPathString = bucketsPath;
this.script = script;
if (window <= 0) {
throw new IllegalArgumentException("[" + WINDOW.getPreferredName() + "] must be a positive, non-zero integer.");
}
this.window = window;
}
public MovFnPipelineAggregationBuilder(StreamInput in) throws IOException {
super(in, NAME);
bucketsPathString = in.readString();
script = new Script(in);
format = in.readOptionalString();
gapPolicy = GapPolicy.readFrom(in);
window = in.readInt();
}
@Override
protected void doWriteTo(StreamOutput out) throws IOException {
out.writeString(bucketsPathString);
script.writeTo(out);
out.writeOptionalString(format);
gapPolicy.writeTo(out);
out.writeInt(window);
}
/**
* Sets the format to use on the output of this aggregation.
*/
public MovFnPipelineAggregationBuilder format(String format) {
if (Strings.isNullOrEmpty(format)) {
throw new IllegalArgumentException("[" + FORMAT.getPreferredName() + "] must not be null or an empty string.");
}
this.format = format;
return this;
}
/**
* Gets the format to use on the output of this aggregation.
*/
public String format() {
return format;
}
protected DocValueFormat formatter() {
if (format != null) {
return new DocValueFormat.Decimal(format);
}
return DocValueFormat.RAW;
}
/**
* Sets the gap policy to use for this aggregation.
*/
public MovFnPipelineAggregationBuilder gapPolicy(GapPolicy gapPolicy) {
if (gapPolicy == null) {
throw new IllegalArgumentException("[" + GAP_POLICY.getPreferredName() + "] must not be null.");
}
this.gapPolicy = gapPolicy;
return this;
}
/**
* Gets the gap policy to use for this aggregation.
*/
public GapPolicy gapPolicy() {
return gapPolicy;
}
/**
* Returns the window size for this aggregation
*/
public int getWindow() {
return window;
}
/**
* Sets the window size for this aggregation
*/
public void setWindow(int window) {
if (window <= 0) {
throw new IllegalArgumentException("[" + WINDOW.getPreferredName() + "] must be a positive, non-zero integer.");
}
this.window = window;
}
@Override
public void doValidate(AggregatorFactory<?> parent, List<AggregationBuilder> aggFactories,
List<PipelineAggregationBuilder> pipelineAggregatoractories) {
if (window <= 0) {
throw new IllegalArgumentException("[" + WINDOW.getPreferredName() + "] must be a positive, non-zero integer.");
}
if (parent instanceof HistogramAggregatorFactory) {
HistogramAggregatorFactory histoParent = (HistogramAggregatorFactory) parent;
if (histoParent.minDocCount() != 0) {
throw new IllegalStateException("parent histogram of moving_function aggregation [" + name
+ "] must have min_doc_count of 0");
}
} else if (parent instanceof DateHistogramAggregatorFactory) {
DateHistogramAggregatorFactory histoParent = (DateHistogramAggregatorFactory) parent;
if (histoParent.minDocCount() != 0) {
throw new IllegalStateException("parent histogram of moving_function aggregation [" + name
+ "] must have min_doc_count of 0");
}
} else {
throw new IllegalStateException("moving_function aggregation [" + name
+ "] must have a histogram or date_histogram as parent");
}
}
@Override
protected PipelineAggregator createInternal(Map<String, Object> metaData) throws IOException {
return new MovFnPipelineAggregator(name, bucketsPathString, script, window, formatter(), gapPolicy, metaData);
}
@Override
protected XContentBuilder internalXContent(XContentBuilder builder, Params params) throws IOException {
builder.field(BUCKETS_PATH.getPreferredName(), bucketsPathString);
builder.field(Script.SCRIPT_PARSE_FIELD.getPreferredName(), script);
if (format != null) {
builder.field(FORMAT.getPreferredName(), format);
}
builder.field(GAP_POLICY.getPreferredName(), gapPolicy.getName());
builder.field(WINDOW.getPreferredName(), window);
return builder;
}
public static MovFnPipelineAggregationBuilder parse(String aggName, XContentParser parser) {
return PARSER.apply(aggName).apply(parser, null);
}
/**
* Used for serialization testing, since pipeline aggs serialize themselves as a named object but are parsed
* as a regular object with the name passed in.
*/
static MovFnPipelineAggregationBuilder parse(XContentParser parser) throws IOException {
parser.nextToken();
if (parser.currentToken().equals(XContentParser.Token.START_OBJECT)) {
parser.nextToken();
if (parser.currentToken().equals(XContentParser.Token.FIELD_NAME)) {
String aggName = parser.currentName();
parser.nextToken(); // "moving_fn"
parser.nextToken(); // start_object
return PARSER.apply(aggName).apply(parser, null);
}
}
throw new IllegalStateException("Expected aggregation name but none found");
}
@Override
protected boolean overrideBucketsPath() {
return true;
}
@Override
protected int doHashCode() {
return Objects.hash(bucketsPathString, script, format, gapPolicy, window);
}
@Override
protected boolean doEquals(Object obj) {
MovFnPipelineAggregationBuilder other = (MovFnPipelineAggregationBuilder) obj;
return Objects.equals(bucketsPathString, other.bucketsPathString)
&& Objects.equals(script, other.script)
&& Objects.equals(format, other.format)
&& Objects.equals(gapPolicy, other.gapPolicy)
&& Objects.equals(window, other.window);
}
@Override
public String getWriteableName() {
return NAME;
}
}
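A hand-written sketch (not part of this commit) of how the builder above might be wired under a date histogram via the Java API; the names "my_date_histo", "the_sum", "the_movfn" and the Painless source are illustrative only:

DateHistogramAggregationBuilder histo = new DateHistogramAggregationBuilder("my_date_histo");
histo.dateHistogramInterval(DateHistogramInterval.DAY).field("timestamp");
histo.subAggregation(new SumAggregationBuilder("the_sum").field("price"));
// window of 10 buckets; the script receives the window contents as `values`
Script script = new Script("MovingFunctions.unweightedAvg(values)");
histo.subAggregation(new MovFnPipelineAggregationBuilder("the_movfn", "the_sum", script, 10));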

View File

@ -0,0 +1,149 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.aggregations.pipeline.movfn;
import org.elasticsearch.common.collect.EvictingQueue;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.script.Script;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.aggregations.InternalMultiBucketAggregation;
import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation;
import org.elasticsearch.search.aggregations.bucket.histogram.HistogramFactory;
import org.elasticsearch.search.aggregations.pipeline.BucketHelpers;
import org.elasticsearch.search.aggregations.pipeline.InternalSimpleValue;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import static org.elasticsearch.search.aggregations.pipeline.BucketHelpers.resolveBucketValue;
/**
* This pipeline aggregation gives the user the ability to script functions that "move" across a window
* of data, instead of single data points. It is the scripted version of MovingAvg pipeline agg.
*
* Through custom script contexts, we expose a number of convenience methods:
*
* - max
* - min
* - sum
* - unweightedAvg
* - linearWeightedAvg
* - ewma
* - holt
 * - holtWinters
 * - stdDev
*
* The user can also define any arbitrary logic via their own scripting, or combine with the above methods.
*/
public class MovFnPipelineAggregator extends PipelineAggregator {
private final DocValueFormat formatter;
private final BucketHelpers.GapPolicy gapPolicy;
private final Script script;
private final String bucketsPath;
private final int window;
MovFnPipelineAggregator(String name, String bucketsPath, Script script, int window, DocValueFormat formatter,
BucketHelpers.GapPolicy gapPolicy, Map<String, Object> metadata) {
super(name, new String[]{bucketsPath}, metadata);
this.bucketsPath = bucketsPath;
this.script = script;
this.formatter = formatter;
this.gapPolicy = gapPolicy;
this.window = window;
}
public MovFnPipelineAggregator(StreamInput in) throws IOException {
super(in);
script = new Script(in);
formatter = in.readNamedWriteable(DocValueFormat.class);
gapPolicy = BucketHelpers.GapPolicy.readFrom(in);
bucketsPath = in.readString();
window = in.readInt();
}
@Override
protected void doWriteTo(StreamOutput out) throws IOException {
script.writeTo(out);
out.writeNamedWriteable(formatter);
gapPolicy.writeTo(out);
out.writeString(bucketsPath);
out.writeInt(window);
}
@Override
public String getWriteableName() {
return MovFnPipelineAggregationBuilder.NAME;
}
@Override
public InternalAggregation reduce(InternalAggregation aggregation, InternalAggregation.ReduceContext reduceContext) {
InternalMultiBucketAggregation<? extends InternalMultiBucketAggregation, ? extends InternalMultiBucketAggregation.InternalBucket>
histo = (InternalMultiBucketAggregation<? extends InternalMultiBucketAggregation, ? extends
InternalMultiBucketAggregation.InternalBucket>) aggregation;
List<? extends InternalMultiBucketAggregation.InternalBucket> buckets = histo.getBuckets();
HistogramFactory factory = (HistogramFactory) histo;
List<MultiBucketsAggregation.Bucket> newBuckets = new ArrayList<>();
EvictingQueue<Double> values = new EvictingQueue<>(this.window);
// Initialize the script
MovingFunctionScript.Factory scriptFactory = reduceContext.scriptService().compile(script, MovingFunctionScript.CONTEXT);
Map<String, Object> vars = new HashMap<>();
if (script.getParams() != null) {
vars.putAll(script.getParams());
}
MovingFunctionScript executableScript = scriptFactory.newInstance();
for (InternalMultiBucketAggregation.InternalBucket bucket : buckets) {
Double thisBucketValue = resolveBucketValue(histo, bucket, bucketsPaths()[0], gapPolicy);
// Default is to reuse existing bucket. Simplifies the rest of the logic,
// since we only change newBucket if we can add to it
MultiBucketsAggregation.Bucket newBucket = bucket;
if (thisBucketValue != null && thisBucketValue.equals(Double.NaN) == false) {
// The custom context mandates that the script returns a double (not Double) so we
// don't need null checks, etc.
double movavg = executableScript.execute(vars, values.stream().mapToDouble(Double::doubleValue).toArray());
List<InternalAggregation> aggs = StreamSupport
.stream(bucket.getAggregations().spliterator(), false)
.map(InternalAggregation.class::cast)
.collect(Collectors.toList());
aggs.add(new InternalSimpleValue(name(), movavg, formatter, new ArrayList<>(), metaData()));
newBucket = factory.createBucket(factory.getKey(bucket), bucket.getDocCount(), new InternalAggregations(aggs));
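                // Note: the current bucket's value is offered only after the script runs, so the
                // window handed to the script contains just the values of the preceding buckets.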
values.offer(thisBucketValue);
}
newBuckets.add(newBucket);
}
return factory.createAggregation(newBuckets);
}
}

View File

@ -0,0 +1,45 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.aggregations.pipeline.movfn;
import org.elasticsearch.script.ScriptContext;
import java.util.Map;
/**
* This class provides a custom script context for the Moving Function pipeline aggregation,
* so that we can expose a number of pre-baked moving functions like min, max, movavg, etc
*/
public abstract class MovingFunctionScript {
/**
* @param params The user-provided parameters
* @param values The values in the window that we are moving a function across
* @return A double representing the value from this particular window
*/
public abstract double execute(Map<String, Object> params, double[] values);
public interface Factory {
MovingFunctionScript newInstance();
}
public static final String[] PARAMETERS = new String[] {"params", "values"};
public static final ScriptContext<Factory> CONTEXT = new ScriptContext<>("moving-function", Factory.class);
}
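For illustration only, a hand-rolled implementation of this contract (in practice the script engine, e.g. Painless, generates one from the user's source; compare the MockMovingFunctionScript later in this diff):

public class WindowMaxFunction extends MovingFunctionScript {
    @Override
    public double execute(Map<String, Object> params, double[] values) {
        // `values` is the current window; delegate to a whitelisted helper
        return MovingFunctions.max(values);
    }
}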

View File

@ -0,0 +1,359 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.aggregations.pipeline.movfn;
import java.util.Arrays;
/**
* Provides a collection of static utility methods that can be referenced from MovingFunction script contexts
*/
public class MovingFunctions {
/**
* Find the maximum value in a window of values.
* If all values are missing/null/NaN, the return value will be NaN
*/
public static double max(double[] values) {
return Arrays.stream(values).max().orElse(Double.NaN);
}
/**
* Find the minimum value in a window of values
* If all values are missing/null/NaN, the return value will be NaN
*/
public static double min(double[] values) {
return Arrays.stream(values).min().orElse(Double.NaN);
}
/**
* Find the sum of a window of values
* If all values are missing/null/NaN, the return value will be 0.0
*/
public static double sum(double[] values) {
if (values.length == 0) {
return 0.0;
}
return Arrays.stream(values).map(value -> {
if (Double.isNaN(value) == false) {
return value;
}
return 0.0;
}).sum();
}
/**
* Calculate a simple unweighted (arithmetic) moving average.
*
* Only finite values are averaged. NaN or null are ignored.
* If all values are missing/null/NaN, the return value will be NaN.
* The average is based on the count of non-null, non-NaN values.
*/
public static double unweightedAvg(double[] values) {
double avg = 0.0;
long count = 0;
for (double v : values) {
if (Double.isNaN(v) == false) {
avg += v;
count += 1;
}
}
return count == 0 ? Double.NaN : avg / count;
}
/**
* Calculate a standard deviation over the values using the provided average.
*
 * Only finite values are included. NaN or null are ignored.
 * If all values are missing/null/NaN, the return value will be NaN.
 * The deviation is based on the count of non-null, non-NaN values.
 */
public static double stdDev(double[] values, double avg) {
if (Double.isNaN(avg)) {
return Double.NaN;
} else {
long count = 0;
double squaredMean = 0;
for (double v : values) {
if (Double.isNaN(v) == false) {
squaredMean += Math.pow(v - avg, 2);
count += 1;
}
}
return Math.sqrt(squaredMean / count);
}
}
/**
* Calculate a linearly weighted moving average, such that older values are
* linearly less important. "Time" is determined by position in collection
*
* Only finite values are averaged. NaN or null are ignored.
* If all values are missing/null/NaN, the return value will be NaN
* The average is based on the count of non-null, non-NaN values.
*/
public static double linearWeightedAvg(double[] values) {
double avg = 0;
long totalWeight = 1;
long current = 1;
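// totalWeight is seeded with 1, so an all-NaN window is detectable below
// (totalWeight == 1) and the divisor ends up one higher than the sum of the weights.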
for (double v : values) {
if (Double.isNaN(v) == false) {
avg += v * current;
totalWeight += current;
current += 1;
}
}
return totalWeight == 1 ? Double.NaN : avg / totalWeight;
}
/**
 * Calculate an exponentially weighted moving average.
*
* Alpha controls the smoothing of the data. Alpha = 1 retains no memory of past values
* (e.g. a random walk), while alpha = 0 retains infinite memory of past values (e.g.
* the series mean). Useful values are somewhere in between. Defaults to 0.5.
*
* Only finite values are averaged. NaN or null are ignored.
* If all values are missing/null/NaN, the return value will be NaN
* The average is based on the count of non-null, non-NaN values.
*
* @param alpha A double between 0-1 inclusive, controls data smoothing
*/
public static double ewma(double[] values, double alpha) {
double avg = Double.NaN;
boolean first = true;
for (double v : values) {
if (Double.isNaN(v) == false) {
if (first) {
avg = v;
first = false;
} else {
avg = (v * alpha) + (avg * (1 - alpha));
}
}
}
return avg;
}
/**
* Calculate a doubly exponential weighted moving average
*
* Alpha controls the smoothing of the data. Alpha = 1 retains no memory of past values
* (e.g. a random walk), while alpha = 0 retains infinite memory of past values (e.g.
* the series mean). Useful values are somewhere in between. Defaults to 0.5.
*
* Beta is equivalent to alpha, but controls the smoothing of the trend instead of the data
*
* Only finite values are averaged. NaN or null are ignored.
* If all values are missing/null/NaN, the return value will be NaN
* The average is based on the count of non-null, non-NaN values.
*
* @param alpha A double between 0-1 inclusive, controls data smoothing
 * @param beta A double between 0-1 inclusive, controls trend smoothing
*/
public static double holt(double[] values, double alpha, double beta) {
if (values.length == 0) {
return Double.NaN;
}
return holtForecast(values, alpha, beta, 1)[0];
}
/**
 * Version of holt that can "forecast". It is not exposed as a whitelisted function for moving_fn scripts,
 * but is kept here for compatibility/code sharing with the existing moving_avg agg. Can be removed when moving_avg is gone.
*/
public static double[] holtForecast(double[] values, double alpha, double beta, int numForecasts) {
// Smoothed value
double s = 0;
double last_s = 0;
// Trend value
double b = 0;
double last_b = 0;
int counter = 0;
double last;
for (double v : values) {
if (Double.isNaN(v) == false) {
last = v;
if (counter == 0) {
s = v;
b = v - last;
} else {
s = alpha * v + (1.0d - alpha) * (last_s + last_b);
b = beta * (s - last_s) + (1 - beta) * last_b;
}
counter += 1;
last_s = s;
last_b = b;
}
}
if (counter == 0) {
return emptyPredictions(numForecasts);
}
double[] forecastValues = new double[numForecasts];
for (int i = 0; i < numForecasts; i++) {
forecastValues[i] = s + (i * b);
}
return forecastValues;
}
/**
* Calculate a triple exponential weighted moving average
*
* Alpha controls the smoothing of the data. Alpha = 1 retains no memory of past values
* (e.g. a random walk), while alpha = 0 retains infinite memory of past values (e.g.
* the series mean). Useful values are somewhere in between. Defaults to 0.5.
*
* Beta is equivalent to alpha, but controls the smoothing of the trend instead of the data.
* Gamma is equivalent to alpha, but controls the smoothing of the seasonality instead of the data
*
* Only finite values are averaged. NaN or null are ignored.
* If all values are missing/null/NaN, the return value will be NaN
* The average is based on the count of non-null, non-NaN values.
*
* @param alpha A double between 0-1 inclusive, controls data smoothing
 * @param beta A double between 0-1 inclusive, controls trend smoothing
 * @param gamma A double between 0-1 inclusive, controls seasonality smoothing
* @param period the expected periodicity of the data
* @param multiplicative true if multiplicative HW should be used. False for additive
*/
public static double holtWinters(double[] values, double alpha, double beta, double gamma,
int period, boolean multiplicative) {
if (values.length == 0) {
return Double.NaN;
}
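// Multiplicative Holt-Winters divides by level and seasonal components, so the
// values are padded slightly to keep zeros in the series from causing a divide-by-zero.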
double padding = multiplicative ? 0.0000000001 : 0.0;
return holtWintersForecast(values, alpha, beta, gamma, period, padding, multiplicative, 1)[0];
}
/**
 * Version of holt-winters that can "forecast". It is not exposed as a whitelisted function for moving_fn scripts,
 * but is kept here for compatibility/code sharing with the existing moving_avg agg. Can be removed when moving_avg is gone.
*/
public static double[] holtWintersForecast(double[] values, double alpha, double beta, double gamma,
int period, double padding, boolean multiplicative, int numForecasts) {
if (values.length < period * 2) {
// We need at least two full "seasons" to use HW
// This should have been caught earlier, we can't do anything now...bail
throw new IllegalArgumentException("Holt-Winters aggregation requires at least (2 * period == 2 * "
+ period + " == "+(2 * period)+") data-points to function. Only [" + values.length + "] were provided.");
}
// Smoothed value
double s = 0;
double last_s;
// Trend value
double b = 0;
double last_b = 0;
// Seasonal value
double[] seasonal = new double[values.length];
int counter = 0;
double[] vs = new double[values.length];
for (double v : values) {
if (Double.isNaN(v) == false) {
vs[counter] = v + padding;
counter += 1;
}
}
if (counter == 0) {
return emptyPredictions(numForecasts);
}
// Initial level value is average of first season
// Calculate the slopes between first and second season for each period
for (int i = 0; i < period; i++) {
s += vs[i];
b += (vs[i + period] - vs[i]) / period;
}
s /= period;
b /= period;
last_s = s;
// Calculate first seasonal
if (Double.compare(s, 0.0) == 0 || Double.compare(s, -0.0) == 0) {
Arrays.fill(seasonal, 0.0);
} else {
for (int i = 0; i < period; i++) {
seasonal[i] = vs[i] / s;
}
}
for (int i = period; i < vs.length; i++) {
// TODO if perf is a problem, we can specialize a subclass to avoid conditionals on each iteration
if (multiplicative) {
s = alpha * (vs[i] / seasonal[i - period]) + (1.0d - alpha) * (last_s + last_b);
} else {
s = alpha * (vs[i] - seasonal[i - period]) + (1.0d - alpha) * (last_s + last_b);
}
b = beta * (s - last_s) + (1 - beta) * last_b;
if (multiplicative) {
seasonal[i] = gamma * (vs[i] / (last_s + last_b )) + (1 - gamma) * seasonal[i - period];
} else {
seasonal[i] = gamma * (vs[i] - (last_s - last_b )) + (1 - gamma) * seasonal[i - period];
}
last_s = s;
last_b = b;
}
double[] forecastValues = new double[numForecasts];
for (int i = 1; i <= numForecasts; i++) {
int idx = values.length - period + ((i - 1) % period);
// TODO perhaps pad out seasonal to a power of 2 and use a mask instead of modulo?
if (multiplicative) {
forecastValues[i-1] = (s + (i * b)) * seasonal[idx];
} else {
forecastValues[i-1] = s + (i * b) + seasonal[idx];
}
}
return forecastValues;
}
/**
* Returns an empty set of predictions, filled with NaNs
* @param numPredictions Number of empty predictions to generate
*/
private static double[] emptyPredictions(int numPredictions) {
double[] predictions = new double[numPredictions];
Arrays.fill(predictions, Double.NaN);
return predictions;
}
}
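As a quick, hand-computed sanity check on the helpers above (not part of this commit), the all-finite window [1, 2, 3] yields:

double[] window = new double[] {1.0, 2.0, 3.0};
double max  = MovingFunctions.max(window);           // 3.0
double sum  = MovingFunctions.sum(window);           // 6.0
double avg  = MovingFunctions.unweightedAvg(window); // 2.0
double ewma = MovingFunctions.ewma(window, 0.5);     // 0.5*3 + 0.5*(0.5*2 + 0.5*1) = 2.25
double std  = MovingFunctions.stdDev(window, avg);   // sqrt(((1-2)^2 + 0 + (3-2)^2) / 3) ~= 0.816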

View File

@ -446,7 +446,7 @@ public class DateHistogramAggregatorTests extends AggregatorTestCase {
InternalDateHistogram histogram;
if (reduced) {
histogram = searchAndReduce(indexSearcher, query, aggregationBuilder, maxBucket, fieldType);
histogram = searchAndReduce(indexSearcher, query, aggregationBuilder, maxBucket, null, fieldType);
} else {
histogram = search(indexSearcher, query, aggregationBuilder, maxBucket, fieldType);
}

View File

@ -0,0 +1,51 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.aggregations.pipeline.movfn;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.script.Script;
import org.elasticsearch.test.AbstractSerializingTestCase;
import java.io.IOException;
public class MovFnPipelineAggregationBuilderSerializationTests extends AbstractSerializingTestCase<MovFnPipelineAggregationBuilder> {
@Override
protected MovFnPipelineAggregationBuilder createTestInstance() {
return new MovFnPipelineAggregationBuilder(randomAlphaOfLength(10), "foo", new Script("foo"), randomIntBetween(1, 10));
}
@Override
protected Writeable.Reader<MovFnPipelineAggregationBuilder> instanceReader() {
return MovFnPipelineAggregationBuilder::new;
}
@Override
protected MovFnPipelineAggregationBuilder doParseInstance(XContentParser parser) throws IOException {
return MovFnPipelineAggregationBuilder.parse(parser);
}
@Override
protected boolean supportsUnknownFields() {
return false;
}
}

View File

@ -0,0 +1,164 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.aggregations.pipeline.movfn;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.LongPoint;
import org.apache.lucene.document.NumericDocValuesField;
import org.apache.lucene.document.SortedNumericDocValuesField;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.RandomIndexWriter;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.search.Query;
import org.apache.lucene.store.Directory;
import org.elasticsearch.index.mapper.DateFieldMapper;
import org.elasticsearch.index.mapper.MappedFieldType;
import org.elasticsearch.index.mapper.NumberFieldMapper;
import org.elasticsearch.script.Script;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.search.aggregations.AggregatorTestCase;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
import org.elasticsearch.search.aggregations.bucket.histogram.Histogram;
import org.elasticsearch.search.aggregations.bucket.histogram.InternalDateHistogram;
import org.elasticsearch.search.aggregations.metrics.avg.AvgAggregationBuilder;
import org.elasticsearch.search.aggregations.pipeline.InternalSimpleValue;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import static org.hamcrest.Matchers.equalTo;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class MovFnUnitTests extends AggregatorTestCase {
private static final String DATE_FIELD = "date";
private static final String INSTANT_FIELD = "instant";
private static final String VALUE_FIELD = "value_field";
private static final List<String> datasetTimes = Arrays.asList(
"2017-01-01T01:07:45",
"2017-01-02T03:43:34",
"2017-01-03T04:11:00",
"2017-01-04T05:11:31",
"2017-01-05T08:24:05",
"2017-01-06T13:09:32",
"2017-01-07T13:47:43",
"2017-01-08T16:14:34",
"2017-01-09T17:09:50",
"2017-01-10T22:55:46");
private static final List<Integer> datasetValues = Arrays.asList(1,2,3,4,5,6,7,8,9,10);
public void testMatchAllDocs() throws IOException {
Query query = new MatchAllDocsQuery();
Script script = new Script(Script.DEFAULT_SCRIPT_TYPE, "painless", "test", Collections.emptyMap());
DateHistogramAggregationBuilder aggBuilder = new DateHistogramAggregationBuilder("histo");
aggBuilder.dateHistogramInterval(DateHistogramInterval.DAY).field(DATE_FIELD);
aggBuilder.subAggregation(new AvgAggregationBuilder("avg").field(VALUE_FIELD));
aggBuilder.subAggregation(new MovFnPipelineAggregationBuilder("mov_fn", "avg", script, 3));
executeTestCase(query, aggBuilder, histogram -> {
assertEquals(10, histogram.getBuckets().size());
List<? extends Histogram.Bucket> buckets = histogram.getBuckets();
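            // The window excludes the current bucket: bucket 0 sees an empty window (NaN),
            // and each later bucket i sees the values of the preceding buckets, whose max is i.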
for (int i = 0; i < buckets.size(); i++) {
if (i == 0) {
assertThat(((InternalSimpleValue)(buckets.get(i).getAggregations().get("mov_fn"))).value(), equalTo(Double.NaN));
} else {
assertThat(((InternalSimpleValue)(buckets.get(i).getAggregations().get("mov_fn"))).value(), equalTo(((double) i)));
}
}
}, 1000, script);
}
@SuppressWarnings("unchecked")
private void executeTestCase(Query query,
DateHistogramAggregationBuilder aggBuilder,
Consumer<Histogram> verify,
int maxBucket, Script script) throws IOException {
try (Directory directory = newDirectory()) {
try (RandomIndexWriter indexWriter = new RandomIndexWriter(random(), directory)) {
Document document = new Document();
int counter = 0;
for (String date : datasetTimes) {
if (frequently()) {
indexWriter.commit();
}
long instant = asLong(date);
document.add(new SortedNumericDocValuesField(DATE_FIELD, instant));
document.add(new LongPoint(INSTANT_FIELD, instant));
document.add(new NumericDocValuesField(VALUE_FIELD, datasetValues.get(counter)));
indexWriter.addDocument(document);
document.clear();
counter += 1;
}
}
ScriptService scriptService = mock(ScriptService.class);
MovingFunctionScript.Factory factory = mock(MovingFunctionScript.Factory.class);
when(scriptService.compile(script, MovingFunctionScript.CONTEXT)).thenReturn(factory);
MovingFunctionScript scriptInstance = new MovingFunctionScript() {
@Override
public double execute(Map<String, Object> params, double[] values) {
assertNotNull(values);
return MovingFunctions.max(values);
}
};
when(factory.newInstance()).thenReturn(scriptInstance);
try (IndexReader indexReader = DirectoryReader.open(directory)) {
IndexSearcher indexSearcher = newSearcher(indexReader, true, true);
DateFieldMapper.Builder builder = new DateFieldMapper.Builder("_name");
DateFieldMapper.DateFieldType fieldType = builder.fieldType();
fieldType.setHasDocValues(true);
fieldType.setName(aggBuilder.field());
MappedFieldType valueFieldType = new NumberFieldMapper.NumberFieldType(NumberFieldMapper.NumberType.LONG);
valueFieldType.setHasDocValues(true);
valueFieldType.setName("value_field");
InternalDateHistogram histogram;
histogram = searchAndReduce(indexSearcher, query, aggBuilder, maxBucket, scriptService,
new MappedFieldType[]{fieldType, valueFieldType});
verify.accept(histogram);
}
}
}
private static long asLong(String dateTime) {
return DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.parser().parseDateTime(dateTime).getMillis();
}
}

View File

@ -0,0 +1,684 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.aggregations.pipeline.movfn;
import org.elasticsearch.common.collect.EvictingQueue;
import org.elasticsearch.test.ESTestCase;
import java.util.Arrays;
import static org.hamcrest.Matchers.equalTo;
public class MovFnWhitelistedFunctionTests extends ESTestCase {
public void testWindowMax() {
int numValues = randomIntBetween(1, 100);
int windowSize = randomIntBetween(1, 50);
EvictingQueue<Double> window = new EvictingQueue<>(windowSize);
for (int i = 0; i < numValues; i++) {
double randValue = randomDouble();
double expected = -Double.MAX_VALUE;
if (i == 0) {
window.offer(randValue);
continue;
}
for (double value : window) {
expected = Math.max(expected, value);
}
double actual = MovingFunctions.max(window.stream().mapToDouble(Double::doubleValue).toArray());
assertEquals(expected, actual, 0.01 * Math.abs(expected));
window.offer(randValue);
}
}
public void testNullWindowMax() {
int numValues = randomIntBetween(1, 100);
int windowSize = randomIntBetween(1, 50);
EvictingQueue<Double> window = new EvictingQueue<>(windowSize);
for (int i = 0; i < numValues; i++) {
Double randValue = randomBoolean() ? Double.NaN : null;
if (i == 0) {
if (randValue != null) {
window.offer(randValue);
}
continue;
}
double actual = MovingFunctions.max(window.stream().mapToDouble(Double::doubleValue).toArray());
assertThat(actual, equalTo(Double.NaN));
if (randValue != null) {
window.offer(randValue);
}
}
}
public void testEmptyWindowMax() {
EvictingQueue<Double> window = new EvictingQueue<>(0);
double actual = MovingFunctions.max(window.stream().mapToDouble(Double::doubleValue).toArray());
assertThat(actual, equalTo(Double.NaN));
}
public void testWindowMin() {
int numValues = randomIntBetween(1, 100);
int windowSize = randomIntBetween(1, 50);
EvictingQueue<Double> window = new EvictingQueue<>(windowSize);
for (int i = 0; i < numValues; i++) {
double randValue = randomDouble();
double expected = Double.MAX_VALUE;
if (i == 0) {
window.offer(randValue);
continue;
}
for (double value : window) {
expected = Math.min(expected, value);
}
double actual = MovingFunctions.min(window.stream().mapToDouble(Double::doubleValue).toArray());
assertEquals(expected, actual, 0.01 * Math.abs(expected));
window.offer(randValue);
}
}
public void testNullWindowMin() {
int numValues = randomIntBetween(1, 100);
int windowSize = randomIntBetween(1, 50);
EvictingQueue<Double> window = new EvictingQueue<>(windowSize);
for (int i = 0; i < numValues; i++) {
Double randValue = randomBoolean() ? Double.NaN : null;
if (i == 0) {
if (randValue != null) {
window.offer(randValue);
}
continue;
}
double actual = MovingFunctions.min(window.stream().mapToDouble(Double::doubleValue).toArray());
assertThat(actual, equalTo(Double.NaN));
if (randValue != null) {
window.offer(randValue);
}
}
}
public void testEmptyWindowMin() {
EvictingQueue<Double> window = new EvictingQueue<>(0);
double actual = MovingFunctions.min(window.stream().mapToDouble(Double::doubleValue).toArray());
assertThat(actual, equalTo(Double.NaN));
}
public void testWindowSum() {
int numValues = randomIntBetween(1, 100);
int windowSize = randomIntBetween(1, 50);
EvictingQueue<Double> window = new EvictingQueue<>(windowSize);
for (int i = 0; i < numValues; i++) {
double randValue = randomDouble();
double expected = 0;
if (i == 0) {
window.offer(randValue);
continue;
}
for (double value : window) {
expected += value;
}
double actual = MovingFunctions.sum(window.stream().mapToDouble(Double::doubleValue).toArray());
assertEquals(expected, actual, 0.01 * Math.abs(expected));
window.offer(randValue);
}
}
public void testNullWindowSum() {
int numValues = randomIntBetween(1, 100);
int windowSize = randomIntBetween(1, 50);
EvictingQueue<Double> window = new EvictingQueue<>(windowSize);
for (int i = 0; i < numValues; i++) {
Double randValue = randomBoolean() ? Double.NaN : null;
if (i == 0) {
if (randValue != null) {
window.offer(randValue);
}
continue;
}
double actual = MovingFunctions.sum(window.stream().mapToDouble(Double::doubleValue).toArray());
assertThat(actual, equalTo(0.0));
if (randValue != null) {
window.offer(randValue);
}
}
}
public void testEmptyWindowSum() {
EvictingQueue<Double> window = new EvictingQueue<>(0);
double actual = MovingFunctions.sum(window.stream().mapToDouble(Double::doubleValue).toArray());
assertThat(actual, equalTo(0.0));
}
public void testSimpleMovAvg() {
int numValues = randomIntBetween(1, 100);
int windowSize = randomIntBetween(1, 50);
EvictingQueue<Double> window = new EvictingQueue<>(windowSize);
for (int i = 0; i < numValues; i++) {
double randValue = randomDouble();
double expected = 0;
if (i == 0) {
window.offer(randValue);
continue;
}
for (double value : window) {
expected += value;
}
expected /= window.size();
double actual = MovingFunctions.unweightedAvg(window.stream().mapToDouble(Double::doubleValue).toArray());
assertEquals(expected, actual, 0.01 * Math.abs(expected));
window.offer(randValue);
}
}
public void testNullSimpleMovAvg() {
int numValues = randomIntBetween(1, 100);
int windowSize = randomIntBetween(1, 50);
EvictingQueue<Double> window = new EvictingQueue<>(windowSize);
for (int i = 0; i < numValues; i++) {
Double randValue = randomBoolean() ? Double.NaN : null;
if (i == 0) {
if (randValue != null) {
window.offer(randValue);
}
continue;
}
double actual = MovingFunctions.unweightedAvg(window.stream().mapToDouble(Double::doubleValue).toArray());
assertThat(actual, equalTo(Double.NaN));
if (randValue != null) {
window.offer(randValue);
}
}
}
public void testEmptySimpleMovAvg() {
EvictingQueue<Double> window = new EvictingQueue<>(0);
double actual = MovingFunctions.unweightedAvg(window.stream().mapToDouble(Double::doubleValue).toArray());
assertThat(actual, equalTo(Double.NaN));
}
public void testSimpleMovStdDev() {
int numValues = randomIntBetween(1, 100);
int windowSize = randomIntBetween(1, 50);
EvictingQueue<Double> window = new EvictingQueue<>(windowSize);
for (int i = 0; i < numValues; i++) {
double randValue = randomDouble();
double mean = 0;
if (i == 0) {
window.offer(randValue);
continue;
}
for (double value : window) {
mean += value;
}
mean /= window.size();
double expected = 0.0;
for (double value : window) {
expected += Math.pow(value - mean, 2);
}
expected = Math.sqrt(expected / window.size());
double actual = MovingFunctions.stdDev(window.stream().mapToDouble(Double::doubleValue).toArray(), mean);
assertEquals(expected, actual, 0.01 * Math.abs(expected));
window.offer(randValue);
}
}
public void testNullSimpleStdDev() {
int numValues = randomIntBetween(1, 100);
int windowSize = randomIntBetween(1, 50);
EvictingQueue<Double> window = new EvictingQueue<>(windowSize);
for (int i = 0; i < numValues; i++) {
Double randValue = randomBoolean() ? Double.NaN : null;
if (i == 0) {
if (randValue != null) {
window.offer(randValue);
}
continue;
}
double actual = MovingFunctions.stdDev(window.stream().mapToDouble(Double::doubleValue).toArray(),
MovingFunctions.unweightedAvg(window.stream().mapToDouble(Double::doubleValue).toArray()));
assertThat(actual, equalTo(Double.NaN));
if (randValue != null) {
window.offer(randValue);
}
}
}
public void testEmptySimpleStdDev() {
EvictingQueue<Double> window = new EvictingQueue<>(0);
double actual = MovingFunctions.stdDev(window.stream().mapToDouble(Double::doubleValue).toArray(),
MovingFunctions.unweightedAvg(window.stream().mapToDouble(Double::doubleValue).toArray()));
assertThat(actual, equalTo(Double.NaN));
}
public void testLinearMovAvg() {
int numValues = randomIntBetween(1, 100);
int windowSize = randomIntBetween(1, 50);
EvictingQueue<Double> window = new EvictingQueue<>(windowSize);
for (int i = 0; i < numValues; i++) {
double randValue = randomDouble();
if (i == 0) {
window.offer(randValue);
continue;
}
double avg = 0;
long totalWeight = 1;
long current = 1;
for (double value : window) {
avg += value * current;
totalWeight += current;
current += 1;
}
double expected = avg / totalWeight;
double actual = MovingFunctions.linearWeightedAvg(window.stream().mapToDouble(Double::doubleValue).toArray());
assertEquals(expected, actual, 0.01 * Math.abs(expected));
window.offer(randValue);
}
}
public void testNullLinearMovAvg() {
int numValues = randomIntBetween(1, 100);
int windowSize = randomIntBetween(1, 50);
EvictingQueue<Double> window = new EvictingQueue<>(windowSize);
for (int i = 0; i < numValues; i++) {
Double randValue = randomBoolean() ? Double.NaN : null;
if (i == 0) {
if (randValue != null) {
window.offer(randValue);
}
continue;
}
double actual = MovingFunctions.linearWeightedAvg(window.stream().mapToDouble(Double::doubleValue).toArray());
assertThat(actual, equalTo(Double.NaN));
if (randValue != null) {
window.offer(randValue);
}
}
}
public void testEmptyLinearMovAvg() {
EvictingQueue<Double> window = new EvictingQueue<>(0);
double actual = MovingFunctions.linearWeightedAvg(window.stream().mapToDouble(Double::doubleValue).toArray());
assertThat(actual, equalTo(Double.NaN));
}
public void testEWMAMovAvg() {
double alpha = randomDouble();
int numValues = randomIntBetween(1, 100);
int windowSize = randomIntBetween(1, 50);
EvictingQueue<Double> window = new EvictingQueue<>(windowSize);
for (int i = 0; i < numValues; i++) {
double randValue = randomDouble();
if (i == 0) {
window.offer(randValue);
continue;
}
double avg = 0;
boolean first = true;
for (double value : window) {
if (first) {
avg = value;
first = false;
} else {
avg = (value * alpha) + (avg * (1 - alpha));
}
}
double expected = avg;
double actual = MovingFunctions.ewma(window.stream().mapToDouble(Double::doubleValue).toArray(), alpha);
assertEquals(expected, actual, 0.01 * Math.abs(expected));
window.offer(randValue);
}
}
public void testNullEwmaMovAvg() {
double alpha = randomDouble();
int numValues = randomIntBetween(1, 100);
int windowSize = randomIntBetween(1, 50);
EvictingQueue<Double> window = new EvictingQueue<>(windowSize);
for (int i = 0; i < numValues; i++) {
Double randValue = randomBoolean() ? Double.NaN : null;
if (i == 0) {
if (randValue != null) {
window.offer(randValue);
}
continue;
}
double actual = MovingFunctions.ewma(window.stream().mapToDouble(Double::doubleValue).toArray(), alpha);
assertThat(actual, equalTo(Double.NaN));
if (randValue != null) {
window.offer(randValue);
}
}
}
public void testEmptyEwmaMovAvg() {
double alpha = randomDouble();
EvictingQueue<Double> window = new EvictingQueue<>(0);
double actual = MovingFunctions.ewma(window.stream().mapToDouble(Double::doubleValue).toArray(), alpha);
assertThat(actual, equalTo(Double.NaN));
}
public void testHoltLinearMovAvg() {
double alpha = randomDouble();
double beta = randomDouble();
int numValues = randomIntBetween(1, 100);
int windowSize = randomIntBetween(1, 50);
EvictingQueue<Double> window = new EvictingQueue<>(windowSize);
for (int i = 0; i < numValues; i++) {
double randValue = randomDouble();
if (i == 0) {
window.offer(randValue);
continue;
}
double s = 0;
double last_s = 0;
// Trend value
double b = 0;
double last_b = 0;
int counter = 0;
double last;
for (double value : window) {
last = value;
if (counter == 0) {
s = value;
b = value - last;
} else {
s = alpha * value + (1.0d - alpha) * (last_s + last_b);
b = beta * (s - last_s) + (1 - beta) * last_b;
}
counter += 1;
last_s = s;
last_b = b;
}
double expected = s + (0 * b);
double actual = MovingFunctions.holt(window.stream().mapToDouble(Double::doubleValue).toArray(), alpha, beta);
assertEquals(expected, actual, 0.01 * Math.abs(expected));
window.offer(randValue);
}
}
public void testNullHoltMovAvg() {
double alpha = randomDouble();
double beta = randomDouble();
int numValues = randomIntBetween(1, 100);
int windowSize = randomIntBetween(1, 50);
EvictingQueue<Double> window = new EvictingQueue<>(windowSize);
for (int i = 0; i < numValues; i++) {
Double randValue = randomBoolean() ? Double.NaN : null;
if (i == 0) {
if (randValue != null) {
window.offer(randValue);
}
continue;
}
double actual = MovingFunctions.holt(window.stream().mapToDouble(Double::doubleValue).toArray(), alpha, beta);
assertThat(actual, equalTo(Double.NaN));
if (randValue != null) {
window.offer(randValue);
}
}
}
public void testEmptyHoltMovAvg() {
double alpha = randomDouble();
double beta = randomDouble();
EvictingQueue<Double> window = new EvictingQueue<>(0);
double actual = MovingFunctions.holt(window.stream().mapToDouble(Double::doubleValue).toArray(), alpha, beta);
assertThat(actual, equalTo(Double.NaN));
}
public void testHoltWintersMultiplicative() {
double alpha = randomDouble();
double beta = randomDouble();
double gamma = randomDouble();
int period = randomIntBetween(1,10);
int windowSize = randomIntBetween(period * 2, 50); // HW requires at least two periods of data
EvictingQueue<Double> window = new EvictingQueue<>(windowSize);
for (int i = 0; i < windowSize; i++) {
window.offer(randomDouble());
}
// Smoothed value
double s = 0;
double last_s = 0;
// Trend value
double b = 0;
double last_b = 0;
// Seasonal value
double[] seasonal = new double[windowSize];
int counter = 0;
double[] vs = new double[windowSize];
for (double v : window) {
vs[counter] = v + 0.0000000001;
counter += 1;
}
// Initial level value is average of first season
// Calculate the slopes between first and second season for each period
for (int i = 0; i < period; i++) {
s += vs[i];
b += (vs[i + period] - vs[i]) / period;
}
s /= period;
b /= period;
last_s = s;
// Calculate first seasonal
if (Double.compare(s, 0.0) == 0 || Double.compare(s, -0.0) == 0) {
Arrays.fill(seasonal, 0.0);
} else {
for (int i = 0; i < period; i++) {
seasonal[i] = vs[i] / s;
}
}
for (int i = period; i < vs.length; i++) {
s = alpha * (vs[i] / seasonal[i - period]) + (1.0d - alpha) * (last_s + last_b);
b = beta * (s - last_s) + (1 - beta) * last_b;
seasonal[i] = gamma * (vs[i] / (last_s + last_b )) + (1 - gamma) * seasonal[i - period];
last_s = s;
last_b = b;
}
int idx = window.size() - period + (0 % period);
double expected = (s + (1 * b)) * seasonal[idx];
double actual = MovingFunctions.holtWinters(window.stream().mapToDouble(Double::doubleValue).toArray(),
alpha, beta, gamma, period, true);
assertEquals(expected, actual, 0.01 * Math.abs(expected));
}
public void testNullHoltWintersMovAvg() {
double alpha = randomDouble();
double beta = randomDouble();
double gamma = randomDouble();
int period = randomIntBetween(1,10);
int numValues = randomIntBetween(1, 100);
int windowSize = randomIntBetween(period * 2, 50); // HW requires at least two periods of data
EvictingQueue<Double> window = new EvictingQueue<>(windowSize);
for (int i = 0; i < windowSize; i++) {
window.offer(Double.NaN);
}
for (int i = 0; i < numValues; i++) {
double actual = MovingFunctions.holtWinters(window.stream().mapToDouble(Double::doubleValue).toArray(),
alpha, beta, gamma, period, false);
assertThat(actual, equalTo(Double.NaN));
}
}
public void testEmptyHoltWintersMovAvg() {
double alpha = randomDouble();
double beta = randomDouble();
double gamma = randomDouble();
int period = randomIntBetween(1,10);
EvictingQueue<Double> window = new EvictingQueue<>(0);
double actual = MovingFunctions.holtWinters(window.stream().mapToDouble(Double::doubleValue).toArray(),
alpha, beta, gamma, period, false);
assertThat(actual, equalTo(Double.NaN));
}
public void testHoltWintersAdditive() {
double alpha = randomDouble();
double beta = randomDouble();
double gamma = randomDouble();
int period = randomIntBetween(1,10);
int windowSize = randomIntBetween(period * 2, 50); // HW requires at least two periods of data
EvictingQueue<Double> window = new EvictingQueue<>(windowSize);
for (int i = 0; i < windowSize; i++) {
window.offer(randomDouble());
}
// Smoothed value
double s = 0;
double last_s = 0;
// Trend value
double b = 0;
double last_b = 0;
// Seasonal value
double[] seasonal = new double[windowSize];
int counter = 0;
double[] vs = new double[windowSize];
for (double v : window) {
vs[counter] = v;
counter += 1;
}
// Initial level value is average of first season
// Calculate the slopes between first and second season for each period
for (int i = 0; i < period; i++) {
s += vs[i];
b += (vs[i + period] - vs[i]) / period;
}
s /= period;
b /= period;
last_s = s;
// Calculate first seasonal
if (Double.compare(s, 0.0) == 0 || Double.compare(s, -0.0) == 0) {
Arrays.fill(seasonal, 0.0);
} else {
for (int i = 0; i < period; i++) {
seasonal[i] = vs[i] / s;
}
}
for (int i = period; i < vs.length; i++) {
s = alpha * (vs[i] - seasonal[i - period]) + (1.0d - alpha) * (last_s + last_b);
b = beta * (s - last_s) + (1 - beta) * last_b;
seasonal[i] = gamma * (vs[i] - (last_s - last_b )) + (1 - gamma) * seasonal[i - period];
last_s = s;
last_b = b;
}
int idx = window.size() - period + (0 % period);
double expected = s + (1 * b) + seasonal[idx];
double actual = MovingFunctions.holtWinters(window.stream().mapToDouble(Double::doubleValue).toArray(),
alpha, beta, gamma, period, false);
assertEquals(expected, actual, 0.01 * Math.abs(expected));
}
}

View File

@ -312,7 +312,7 @@ public class MovAvgIT extends ESIntegTestCase {
double last;
for (double value : window) {
last = value;
if (counter == 1) {
if (counter == 0) {
s = value;
b = value - last;
} else {

View File

@ -31,6 +31,8 @@ import org.elasticsearch.search.aggregations.pipeline.movavg.models.HoltWintersM
import org.elasticsearch.search.aggregations.pipeline.movavg.models.LinearModel;
import org.elasticsearch.search.aggregations.pipeline.movavg.models.SimpleModel;
import java.io.IOException;
public class MovAvgTests extends BasePipelineAggregationTestCase<MovAvgPipelineAggregationBuilder> {
@Override
@ -94,6 +96,12 @@ public class MovAvgTests extends BasePipelineAggregationTestCase<MovAvgPipelineA
return factory;
}
@Override
public void testFromXContent() throws IOException {
super.testFromXContent();
assertWarnings("The moving_avg aggregation has been deprecated in favor of the moving_fn aggregation.");
}
public void testDefaultParsing() throws Exception {
MovAvgPipelineAggregationBuilder expected = new MovAvgPipelineAggregationBuilder("commits_moving_avg", "commits");
String json = "{" +
@ -104,6 +112,7 @@ public class MovAvgTests extends BasePipelineAggregationTestCase<MovAvgPipelineA
" }" +
"}";
PipelineAggregationBuilder newAgg = parse(createParser(JsonXContent.jsonXContent, json));
assertWarnings("The moving_avg aggregation has been deprecated in favor of the moving_fn aggregation.");
assertNotSame(newAgg, expected);
assertEquals(expected, newAgg);
assertEquals(expected.hashCode(), newAgg.hashCode());

View File

@ -246,7 +246,7 @@ public class MovAvgUnitTests extends ESTestCase {
double last;
for (double value : window) {
last = value;
if (counter == 1) {
if (counter == 0) {
s = value;
b = value - last;
} else {
@ -292,7 +292,7 @@ public class MovAvgUnitTests extends ESTestCase {
double last;
for (double value : window) {
last = value;
if (counter == 1) {
if (counter == 0) {
s = value;
b = value - last;
} else {

View File

@ -26,6 +26,8 @@ import org.elasticsearch.index.similarity.ScriptedSimilarity.Field;
import org.elasticsearch.index.similarity.ScriptedSimilarity.Query;
import org.elasticsearch.index.similarity.ScriptedSimilarity.Term;
import org.elasticsearch.index.similarity.SimilarityService;
import org.elasticsearch.search.aggregations.pipeline.movfn.MovingFunctionScript;
import org.elasticsearch.search.aggregations.pipeline.movfn.MovingFunctions;
import org.elasticsearch.search.lookup.LeafSearchLookup;
import org.elasticsearch.search.lookup.SearchLookup;
@ -109,6 +111,9 @@ public class MockScriptEngine implements ScriptEngine {
} else if (context.instanceClazz.equals(SimilarityWeightScript.class)) {
SimilarityWeightScript.Factory factory = mockCompiled::createSimilarityWeightScript;
return context.factoryClazz.cast(factory);
} else if (context.instanceClazz.equals(MovingFunctionScript.class)) {
MovingFunctionScript.Factory factory = mockCompiled::createMovingFunctionScript;
return context.factoryClazz.cast(factory);
}
throw new IllegalArgumentException("mock script engine does not know how to handle context [" + context.name + "]");
}
@ -169,6 +174,10 @@ public class MockScriptEngine implements ScriptEngine {
public SimilarityWeightScript createSimilarityWeightScript() {
return new MockSimilarityWeightScript(script != null ? script : ctx -> 42d);
}
public MovingFunctionScript createMovingFunctionScript() {
return new MockMovingFunctionScript();
}
}
public class MockExecutableScript implements ExecutableScript {
@ -327,4 +336,11 @@ public class MockScriptEngine implements ScriptEngine {
return new Script(ScriptType.INLINE, "mock", script, emptyMap());
}
public class MockMovingFunctionScript extends MovingFunctionScript {
@Override
public double execute(Map<String, Object> params, double[] values) {
return MovingFunctions.unweightedAvg(values);
}
}
}

View File

@ -61,8 +61,11 @@ import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCache;
import org.elasticsearch.mock.orig.Mockito;
import org.elasticsearch.search.aggregations.MultiBucketConsumerService.MultiBucketConsumer;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregatorBuilders;
import org.elasticsearch.search.aggregations.pipeline.SiblingPipelineAggregator;
import org.elasticsearch.search.fetch.FetchPhase;
import org.elasticsearch.search.fetch.subphase.DocValueFieldsFetchSubPhase;
import org.elasticsearch.search.fetch.subphase.FetchSourceSubPhase;
@ -302,7 +305,7 @@ public abstract class AggregatorTestCase extends ESTestCase {
Query query,
AggregationBuilder builder,
MappedFieldType... fieldTypes) throws IOException {
return searchAndReduce(searcher, query, builder, DEFAULT_MAX_BUCKETS, fieldTypes);
return searchAndReduce(searcher, query, builder, DEFAULT_MAX_BUCKETS, null, fieldTypes);
}
/**
@ -314,6 +317,7 @@ public abstract class AggregatorTestCase extends ESTestCase {
Query query,
AggregationBuilder builder,
int maxBucket,
ScriptService scriptService,
MappedFieldType... fieldTypes) throws IOException {
final IndexReaderContext ctx = searcher.getTopReaderContext();
@ -368,7 +372,7 @@ public abstract class AggregatorTestCase extends ESTestCase {
// now do the final reduce
MultiBucketConsumer reduceBucketConsumer = new MultiBucketConsumer(maxBucket);
InternalAggregation.ReduceContext context =
new InternalAggregation.ReduceContext(root.context().bigArrays(), null, reduceBucketConsumer, true);
new InternalAggregation.ReduceContext(root.context().bigArrays(), scriptService, reduceBucketConsumer, true);
@SuppressWarnings("unchecked")
A internalAgg = (A) aggs.get(0).doReduce(aggs, context);