Add moving percentiles pipeline aggregation (#55441) (#56575)

Similar to the moving function aggregation, except that it merges windows of percentile
sketches together instead of cumulatively merging final metrics.
Ignacio Vera 2020-05-12 11:35:23 +02:00 committed by GitHub
parent 7b1f978931
commit 222ee721ec
15 changed files with 1170 additions and 16 deletions

@ -287,3 +287,4 @@ include::pipeline/bucket-script-aggregation.asciidoc[]
include::pipeline/bucket-selector-aggregation.asciidoc[]
include::pipeline/bucket-sort-aggregation.asciidoc[]
include::pipeline/serial-diff-aggregation.asciidoc[]
include::pipeline/moving-percentiles-aggregation.asciidoc[]

@ -0,0 +1,162 @@
[role="xpack"]
[testenv="basic"]
[[search-aggregations-pipeline-moving-percentiles-aggregation]]
=== Moving Percentiles Aggregation
Given an ordered series of <<search-aggregations-metrics-percentile-aggregation, percentiles>>, the Moving Percentiles aggregation
slides a window across those percentiles and computes the cumulative percentiles over each window.
This is conceptually very similar to the <<search-aggregations-pipeline-movfn-aggregation, Moving Function>> pipeline aggregation,
except that it works on the percentile sketches instead of the actual bucket values.
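The difference between merging sketches and combining final percentile values can be illustrated with a toy example. This is a hypothetical sketch only: `ToySketch` keeps every value exactly, whereas the aggregation works on TDigest or HDR histogram sketches, and none of the class or method names below are part of Elasticsearch.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

class MergeVersusAverage {

    /** Hypothetical stand-in for a percentile sketch: it simply keeps every value. */
    static class ToySketch {
        private final List<Double> values = new ArrayList<>();

        static ToySketch of(double... input) {
            ToySketch sketch = new ToySketch();
            for (double value : input) {
                sketch.values.add(value);
            }
            return sketch;
        }

        /** Adds the other sketch's values to this one; the other sketch is left unchanged. */
        void merge(ToySketch other) {
            values.addAll(other.values);
        }

        /** Nearest-rank percentile over the stored values. */
        double percentile(double percent) {
            if (values.isEmpty()) {
                throw new IllegalStateException("empty sketch");
            }
            List<Double> sorted = new ArrayList<>(values);
            Collections.sort(sorted);
            int rank = (int) Math.ceil(percent / 100.0 * sorted.size());
            int index = Math.min(Math.max(rank - 1, 0), sorted.size() - 1);
            return sorted.get(index);
        }
    }

    /** Merges the sketches of a window first, then reads the percentile once. */
    static double mergedPercentile(double percent, ToySketch... window) {
        ToySketch merged = new ToySketch();
        for (ToySketch sketch : window) {
            merged.merge(sketch);
        }
        return merged.percentile(percent);
    }

    /** Reads the percentile of each bucket and averages the final values. */
    static double averagedPercentile(double percent, ToySketch... window) {
        double sum = 0;
        for (ToySketch sketch : window) {
            sum += sketch.percentile(percent);
        }
        return sum / window.length;
    }

    public static void main(String[] args) {
        ToySketch first = ToySketch.of(1, 2, 3, 4, 100);
        ToySketch second = ToySketch.of(5);
        System.out.println(mergedPercentile(50, first, second));   // prints 3.0
        System.out.println(averagedPercentile(50, first, second)); // prints 4.0
    }
}
```

Merging first weights each bucket by the number of values it holds, so the single value in the second bucket does not count as much as the five values in the first; averaging the two per-bucket medians treats both buckets equally and gives a different answer.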
==== Syntax
A `moving_percentiles` aggregation looks like this in isolation:
[source,js]
--------------------------------------------------
{
"moving_percentiles": {
"buckets_path": "the_percentile",
"window": 10
}
}
--------------------------------------------------
// NOTCONSOLE
[[moving-percentiles-params]]
.`moving_percentiles` Parameters
[options="header"]
|===
|Parameter Name |Description |Required |Default Value
|`buckets_path` |Path to the percentile of interest (see <<buckets-path-syntax, `buckets_path` Syntax>> for more details) |Required |
|`window` |The size of the window to "slide" across the histogram. |Required |
|`shift` |<<shift-parameter, Shift>> of window position. |Optional | 0
|===
`moving_percentiles` aggregations must be embedded inside of a `histogram` or `date_histogram` aggregation. They can be
embedded like any other metric aggregation:
[source,console]
--------------------------------------------------
POST /_search
{
"size": 0,
"aggs": {
"my_date_histo":{ <1>
"date_histogram":{
"field":"date",
"calendar_interval":"1M"
},
"aggs":{
"the_percentile":{ <2>
"percentiles":{
"field": "price",
"percents": [ 1.0, 99.0 ]
}
},
"the_movperc": {
"moving_percentiles": {
"buckets_path": "the_percentile", <3>
"window": 10
}
}
}
}
}
}
--------------------------------------------------
// TEST[setup:sales]
<1> A `date_histogram` named "my_date_histo" is constructed on the "date" field, with one-month intervals
<2> A `percentiles` metric is used to calculate the percentiles of a field.
<3> Finally, we specify a `moving_percentiles` aggregation which uses the "the_percentile" sketch as its input.
Moving percentiles are built by first specifying a `histogram` or `date_histogram` over a field. You then add
a `percentiles` metric inside of that histogram. Finally, the `moving_percentiles` is embedded inside the histogram.
The `buckets_path` parameter is then used to "point" at the percentiles aggregation inside of the histogram (see
<<buckets-path-syntax>> for a description of the syntax for `buckets_path`).
The response may look like this:
[source,console-result]
--------------------------------------------------
{
"took": 11,
"timed_out": false,
"_shards": ...,
"hits": ...,
"aggregations": {
"my_date_histo": {
"buckets": [
{
"key_as_string": "2015/01/01 00:00:00",
"key": 1420070400000,
"doc_count": 3,
"the_percentile": {
"values": {
"1.0": 150.0,
"99.0": 200.0
}
}
},
{
"key_as_string": "2015/02/01 00:00:00",
"key": 1422748800000,
"doc_count": 2,
"the_percentile": {
"values": {
"1.0": 10.0,
"99.0": 50.0
}
},
"the_movperc": {
"values": {
"1.0": 150.0,
"99.0": 200.0
}
}
},
{
"key_as_string": "2015/03/01 00:00:00",
"key": 1425168000000,
"doc_count": 2,
"the_percentile": {
"values": {
"1.0": 175.0,
"99.0": 200.0
}
},
"the_movperc": {
"values": {
"1.0": 10.0,
"99.0": 200.0
}
}
}
]
}
}
}
--------------------------------------------------
// TESTRESPONSE[s/"took": 11/"took": $body.took/]
// TESTRESPONSE[s/"_shards": \.\.\./"_shards": $body._shards/]
// TESTRESPONSE[s/"hits": \.\.\./"hits": $body.hits/]
The output format of the `moving_percentiles` aggregation is inherited from the format of the referenced
<<search-aggregations-metrics-percentile-aggregation,`percentiles`>> aggregation.
Moving percentiles pipeline aggregations always run with the `skip` gap policy.
[[moving-percentiles-shift-parameter]]
==== shift parameter
By default (with `shift = 0`), the window that is offered for calculation is the last `n` values excluding the current bucket.
Increasing `shift` by 1 moves the starting window position by `1` to the right.
- To include the current bucket in the window, use `shift = 1`.
- For center alignment (`n / 2` values before and after the current bucket), use `shift = window / 2`.
- For right alignment (`n` values starting at the current bucket), use `shift = window`.
If either window edge moves outside the borders of the data series, the window shrinks to include only the available values.
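The window bounds described above can be sketched as follows. This is a minimal illustration that mirrors the `clamp(index - window + shift, ...)` and `clamp(index + shift, ...)` expressions used by the aggregator in this commit; the class `MovingWindowBounds` and its `describe` helper are hypothetical names introduced only for this example.

```java
class MovingWindowBounds {

    /** Restricts an index to the range [0, length]. */
    static int clamp(int index, int length) {
        if (index < 0) {
            return 0;
        }
        if (index > length) {
            return length;
        }
        return index;
    }

    /** Inclusive start of the window for the bucket at the given index. */
    static int from(int index, int window, int shift, int length) {
        return clamp(index - window + shift, length);
    }

    /** Exclusive end of the window for the bucket at the given index. */
    static int to(int index, int shift, int length) {
        return clamp(index + shift, length);
    }

    /** Formats the window as a half-open range of bucket indices. */
    static String describe(int index, int window, int shift, int length) {
        return "[" + from(index, window, shift, length) + ", " + to(index, shift, length) + ")";
    }

    public static void main(String[] args) {
        int length = 10; // number of buckets in the series
        int window = 3;
        int index = 5;   // current bucket
        for (int shift : new int[] {0, 1, window}) {
            System.out.println("shift=" + shift + " -> " + describe(index, window, shift, length));
        }
        // The first bucket has an empty window when shift = 0.
        System.out.println("first bucket -> " + describe(0, window, 0, length));
    }
}
```

For bucket 5 this prints `[2, 5)`, `[3, 6)` and `[5, 8)` for shifts of 0, 1 and 3, and `[0, 0)` for the first bucket. An empty range means no sketches are merged, which is why the first bucket in the example response has no `the_movperc` entry.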

@ -86,7 +86,7 @@ abstract class AbstractInternalHDRPercentiles extends InternalNumericMetricsAggr
return value(Double.parseDouble(name));
}
DocValueFormat formatter() {
public DocValueFormat formatter() {
return format;
}
@ -96,10 +96,27 @@ abstract class AbstractInternalHDRPercentiles extends InternalNumericMetricsAggr
return state.getEstimatedFootprintInBytes();
}
DoubleHistogram getState() {
/**
* Return the internal {@link DoubleHistogram} sketch for this metric.
*/
public DoubleHistogram getState() {
return state;
}
/**
* Return the keys (percentiles) requested.
*/
public double[] getKeys() {
return keys;
}
/**
* Should the output be keyed.
*/
public boolean keyed() {
return keyed;
}
@Override
public AbstractInternalHDRPercentiles reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
DoubleHistogram merged = null;

@ -72,7 +72,7 @@ abstract class AbstractInternalTDigestPercentiles extends InternalNumericMetrics
public abstract double value(double key);
DocValueFormat formatter() {
public DocValueFormat formatter() {
return format;
}
@ -80,10 +80,27 @@ abstract class AbstractInternalTDigestPercentiles extends InternalNumericMetrics
return state.byteSize();
}
TDigestState getState() {
/**
* Return the internal {@link TDigestState} sketch for this metric.
*/
public TDigestState getState() {
return state;
}
/**
* Return the keys (percentiles) requested.
*/
public double[] getKeys() {
return keys;
}
/**
* Should the output be keyed.
*/
public boolean keyed() {
return keyed;
}
@Override
public AbstractInternalTDigestPercentiles reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
TDigestState merged = null;

@ -35,8 +35,8 @@ import org.elasticsearch.xpack.analytics.aggregations.metrics.AnalyticsAggregato
import org.elasticsearch.xpack.analytics.boxplot.BoxplotAggregationBuilder;
import org.elasticsearch.xpack.analytics.boxplot.InternalBoxplot;
import org.elasticsearch.xpack.analytics.cumulativecardinality.CumulativeCardinalityPipelineAggregationBuilder;
import org.elasticsearch.xpack.analytics.cumulativecardinality.CumulativeCardinalityPipelineAggregator;
import org.elasticsearch.xpack.analytics.mapper.HistogramFieldMapper;
import org.elasticsearch.xpack.analytics.movingPercentiles.MovingPercentilesPipelineAggregationBuilder;
import org.elasticsearch.xpack.analytics.stringstats.InternalStringStats;
import org.elasticsearch.xpack.analytics.stringstats.StringStatsAggregationBuilder;
import org.elasticsearch.xpack.analytics.topmetrics.InternalTopMetrics;
@ -73,14 +73,18 @@ public class AnalyticsPlugin extends Plugin implements SearchPlugin, ActionPlugi
@Override
public List<PipelineAggregationSpec> getPipelineAggregations() {
return singletonList(
new PipelineAggregationSpec(
CumulativeCardinalityPipelineAggregationBuilder.NAME,
CumulativeCardinalityPipelineAggregationBuilder::new,
CumulativeCardinalityPipelineAggregator::new,
usage.track(AnalyticsStatsAction.Item.CUMULATIVE_CARDINALITY,
checkLicense(CumulativeCardinalityPipelineAggregationBuilder.PARSER)))
);
List<PipelineAggregationSpec> pipelineAggs = new ArrayList<>();
pipelineAggs.add(new PipelineAggregationSpec(
CumulativeCardinalityPipelineAggregationBuilder.NAME,
CumulativeCardinalityPipelineAggregationBuilder::new,
usage.track(AnalyticsStatsAction.Item.CUMULATIVE_CARDINALITY,
checkLicense(CumulativeCardinalityPipelineAggregationBuilder.PARSER))));
pipelineAggs.add(new PipelineAggregationSpec(
MovingPercentilesPipelineAggregationBuilder.NAME,
MovingPercentilesPipelineAggregationBuilder::new,
usage.track(AnalyticsStatsAction.Item.MOVING_PERCENTILES,
checkLicense(MovingPercentilesPipelineAggregationBuilder.PARSER))));
return pipelineAggs;
}
@Override

@ -0,0 +1,129 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.analytics.movingPercentiles;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.search.aggregations.pipeline.AbstractPipelineAggregationBuilder;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
import java.io.IOException;
import java.util.Map;
import java.util.Objects;
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg;
public class MovingPercentilesPipelineAggregationBuilder
extends AbstractPipelineAggregationBuilder<MovingPercentilesPipelineAggregationBuilder> {
public static final String NAME = "moving_percentiles";
private static final ParseField WINDOW = new ParseField("window");
private static final ParseField SHIFT = new ParseField("shift");
public static final ConstructingObjectParser<MovingPercentilesPipelineAggregationBuilder, String> PARSER =
new ConstructingObjectParser<>(NAME, false, (args, name) -> {
return new MovingPercentilesPipelineAggregationBuilder(name, (String) args[0], (int) args[1]);
});
static {
PARSER.declareString(constructorArg(), BUCKETS_PATH_FIELD);
PARSER.declareInt(constructorArg(), WINDOW);
PARSER.declareInt(MovingPercentilesPipelineAggregationBuilder::setShift, SHIFT);
}
private final int window;
private int shift;
public MovingPercentilesPipelineAggregationBuilder(String name, String bucketsPath, int window) {
super(name, NAME, new String[] { bucketsPath });
if (window <= 0) {
throw new IllegalArgumentException("[" + WINDOW.getPreferredName() + "] must be a positive, non-zero integer.");
}
this.window = window;
}
/**
* Read from a stream.
*/
public MovingPercentilesPipelineAggregationBuilder(StreamInput in) throws IOException {
super(in, NAME);
window = in.readVInt();
shift = in.readInt();
}
@Override
protected final void doWriteTo(StreamOutput out) throws IOException {
out.writeVInt(window);
out.writeInt(shift);
}
/**
* Returns the window size for this aggregation
*/
public int getWindow() {
return window;
}
/**
* Returns the shift for this aggregation
*/
public int getShift() {
return shift;
}
/**
* Sets the shift for this aggregation
*/
public void setShift(int shift) {
this.shift = shift;
}
@Override
protected PipelineAggregator createInternal(Map<String, Object> metaData) {
return new MovingPercentilesPipelineAggregator(name, bucketsPaths, getWindow(), getShift(), metaData);
}
@Override
protected void validate(ValidationContext context) {
if (bucketsPaths.length != 1) {
context.addBucketPathValidationError("must contain a single entry for aggregation [" + name + "]");
}
context.validateParentAggSequentiallyOrdered(NAME, name);
}
@Override
protected final XContentBuilder internalXContent(XContentBuilder builder, Params params) throws IOException {
builder.field(BUCKETS_PATH_FIELD.getPreferredName(), bucketsPaths[0]);
builder.field(WINDOW.getPreferredName(), window);
builder.field(SHIFT.getPreferredName(), shift);
return builder;
}
@Override
public int hashCode() {
return Objects.hash(super.hashCode(), window, shift);
}
@Override
public boolean equals(Object obj) {
if (this == obj) return true;
if (obj == null || getClass() != obj.getClass()) return false;
if (super.equals(obj) == false) return false;
MovingPercentilesPipelineAggregationBuilder other = (MovingPercentilesPipelineAggregationBuilder) obj;
return window == other.window && shift == other.shift;
}
@Override
public String getWriteableName() {
return NAME;
}
@Override
protected boolean overrideBucketsPath() {
return true;
}
}

@ -0,0 +1,254 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.analytics.movingPercentiles;
import org.HdrHistogram.DoubleHistogram;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.AggregationExecutionException;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.InternalAggregation.ReduceContext;
import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.aggregations.InternalMultiBucketAggregation;
import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation;
import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation.Bucket;
import org.elasticsearch.search.aggregations.bucket.histogram.HistogramFactory;
import org.elasticsearch.search.aggregations.metrics.InternalHDRPercentiles;
import org.elasticsearch.search.aggregations.metrics.InternalTDigestPercentiles;
import org.elasticsearch.search.aggregations.metrics.PercentilesMethod;
import org.elasticsearch.search.aggregations.metrics.TDigestState;
import org.elasticsearch.search.aggregations.pipeline.AbstractPipelineAggregationBuilder;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
import org.elasticsearch.search.aggregations.support.AggregationPath;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
public class MovingPercentilesPipelineAggregator extends PipelineAggregator {
private final int window;
private final int shift;
MovingPercentilesPipelineAggregator(String name, String[] bucketsPaths, int window, int shift,
Map<String, Object> metadata) {
super(name, bucketsPaths, metadata);
this.window = window;
this.shift = shift;
}
@Override
public InternalAggregation reduce(InternalAggregation aggregation, ReduceContext reduceContext) {
InternalMultiBucketAggregation<? extends InternalMultiBucketAggregation, ? extends InternalMultiBucketAggregation.InternalBucket>
histo = (InternalMultiBucketAggregation<? extends InternalMultiBucketAggregation, ? extends
InternalMultiBucketAggregation.InternalBucket>) aggregation;
List<? extends InternalMultiBucketAggregation.InternalBucket> buckets = histo.getBuckets();
HistogramFactory factory = (HistogramFactory) histo;
List<Bucket> newBuckets = new ArrayList<>(buckets.size());
if (buckets.size() == 0) {
return factory.createAggregation(newBuckets);
}
PercentileConfig config = resolvePercentileConfig(histo, buckets.get(0), bucketsPaths()[0]);
switch (config.method) {
case TDIGEST:
reduceTDigest(buckets, histo, newBuckets, factory, config);
break;
case HDR:
reduceHDR(buckets, histo, newBuckets, factory, config);
break;
default:
throw new AggregationExecutionException(AbstractPipelineAggregationBuilder.BUCKETS_PATH_FIELD.getPreferredName()
+ " references an unknown percentile aggregation method: [" + config.method + "]");
}
return factory.createAggregation(newBuckets);
}
private void reduceTDigest(List<? extends InternalMultiBucketAggregation.InternalBucket> buckets,
MultiBucketsAggregation histo,
List<Bucket> newBuckets,
HistogramFactory factory,
PercentileConfig config) {
List<TDigestState> values = buckets.stream()
.map(b -> resolveTDigestBucketValue(histo, b, bucketsPaths()[0]))
.filter(v -> v != null)
.collect(Collectors.toList());
int index = 0;
for (InternalMultiBucketAggregation.InternalBucket bucket : buckets) {
// Default is to reuse existing bucket. Simplifies the rest of the logic,
// since we only change newBucket if we can add to it
MultiBucketsAggregation.Bucket newBucket = bucket;
TDigestState state = null;
int fromIndex = clamp(index - window + shift, values.size());
int toIndex = clamp(index + shift, values.size());
for (int i = fromIndex; i < toIndex; i++) {
TDigestState bucketState = values.get(i);
if (bucketState != null) {
if (state == null) {
// We have to create a new TDigest histogram because otherwise it will alter the
// existing histogram and bucket value
state = new TDigestState(bucketState.compression());
}
state.add(bucketState);
}
}
if (state != null) {
List<InternalAggregation> aggs = bucket.getAggregations().asList().stream()
.map((p) -> (InternalAggregation) p)
.collect(Collectors.toList());
aggs.add(new InternalTDigestPercentiles(name(), config.keys, state, config.keyed, config.formatter, metadata()));
newBucket = factory.createBucket(factory.getKey(bucket), bucket.getDocCount(), new InternalAggregations(aggs));
}
newBuckets.add(newBucket);
index++;
}
}
private void reduceHDR(List<? extends InternalMultiBucketAggregation.InternalBucket> buckets,
MultiBucketsAggregation histo,
List<Bucket> newBuckets,
HistogramFactory factory,
PercentileConfig config) {
List<DoubleHistogram> values = buckets.stream()
.map(b -> resolveHDRBucketValue(histo, b, bucketsPaths()[0]))
.filter(v -> v != null)
.collect(Collectors.toList());
int index = 0;
for (InternalMultiBucketAggregation.InternalBucket bucket : buckets) {
DoubleHistogram state = null;
// Default is to reuse existing bucket. Simplifies the rest of the logic,
// since we only change newBucket if we can add to it
MultiBucketsAggregation.Bucket newBucket = bucket;
int fromIndex = clamp(index - window + shift, values.size());
int toIndex = clamp(index + shift, values.size());
for (int i = fromIndex; i < toIndex; i++) {
DoubleHistogram bucketState = values.get(i);
if (bucketState != null) {
if (state == null) {
// We have to create a new HDR histogram because otherwise it will alter the
// existing histogram and bucket value
state = new DoubleHistogram(bucketState.getNumberOfSignificantValueDigits());
}
state.add(bucketState);
}
}
if (state != null) {
List<InternalAggregation> aggs = bucket.getAggregations().asList().stream()
.map((p) -> (InternalAggregation) p)
.collect(Collectors.toList());
aggs.add(new InternalHDRPercentiles(name(), config.keys, state, config.keyed, config.formatter, metadata()));
newBucket = factory.createBucket(factory.getKey(bucket), bucket.getDocCount(), new InternalAggregations(aggs));
}
newBuckets.add(newBucket);
index++;
}
}
private PercentileConfig resolvePercentileConfig(MultiBucketsAggregation agg,
InternalMultiBucketAggregation.InternalBucket bucket,
String aggPath) {
List<String> aggPathsList = AggregationPath.parse(aggPath).getPathElementsAsStringList();
Object propertyValue = bucket.getProperty(agg.getName(), aggPathsList);
if (propertyValue == null) {
throw buildResolveError(agg, aggPathsList, propertyValue, "percentiles");
}
if (propertyValue instanceof InternalTDigestPercentiles) {
InternalTDigestPercentiles internalTDigestPercentiles = ((InternalTDigestPercentiles) propertyValue);
return new PercentileConfig(PercentilesMethod.TDIGEST,
internalTDigestPercentiles.getKeys(),
internalTDigestPercentiles.keyed(),
internalTDigestPercentiles.formatter());
}
if (propertyValue instanceof InternalHDRPercentiles) {
InternalHDRPercentiles internalHDRPercentiles = ((InternalHDRPercentiles) propertyValue);
return new PercentileConfig(PercentilesMethod.HDR,
internalHDRPercentiles.getKeys(),
internalHDRPercentiles.keyed(),
internalHDRPercentiles.formatter());
}
throw buildResolveError(agg, aggPathsList, propertyValue, "percentiles");
}
private TDigestState resolveTDigestBucketValue(MultiBucketsAggregation agg,
InternalMultiBucketAggregation.InternalBucket bucket,
String aggPath) {
List<String> aggPathsList = AggregationPath.parse(aggPath).getPathElementsAsStringList();
Object propertyValue = bucket.getProperty(agg.getName(), aggPathsList);
if (propertyValue == null || (propertyValue instanceof InternalTDigestPercentiles) == false) {
throw buildResolveError(agg, aggPathsList, propertyValue, "TDigest");
}
return ((InternalTDigestPercentiles) propertyValue).getState();
}
private DoubleHistogram resolveHDRBucketValue(MultiBucketsAggregation agg,
InternalMultiBucketAggregation.InternalBucket bucket,
String aggPath) {
List<String> aggPathsList = AggregationPath.parse(aggPath).getPathElementsAsStringList();
Object propertyValue = bucket.getProperty(agg.getName(), aggPathsList);
if (propertyValue == null || (propertyValue instanceof InternalHDRPercentiles) == false) {
throw buildResolveError(agg, aggPathsList, propertyValue, "HDR");
}
return ((InternalHDRPercentiles) propertyValue).getState();
}
private IllegalArgumentException buildResolveError(MultiBucketsAggregation agg, List<String> aggPathsList,
Object propertyValue, String method) {
if (propertyValue == null) {
return new IllegalArgumentException(AbstractPipelineAggregationBuilder.BUCKETS_PATH_FIELD.getPreferredName()
+ " must reference a " + method + " percentile aggregation");
} else {
String currentAggName;
if (aggPathsList.isEmpty()) {
currentAggName = agg.getName();
} else {
currentAggName = aggPathsList.get(0);
}
return new IllegalArgumentException(AbstractPipelineAggregationBuilder.BUCKETS_PATH_FIELD.getPreferredName()
+ " must reference a " + method + " percentiles aggregation, got: ["
+ propertyValue.getClass().getSimpleName() + "] at aggregation [" + currentAggName + "]");
}
}
private int clamp(int index, int length) {
if (index < 0) {
return 0;
}
if (index > length) {
return length;
}
return index;
}
// TODO: replace this with the PercentilesConfig that's used by the percentiles builder.
// The config isn't available through the Internal objects
/** helper class to collect the percentile's configuration */
private static class PercentileConfig {
final double[] keys;
final boolean keyed;
final PercentilesMethod method;
final DocValueFormat formatter;
PercentileConfig(PercentilesMethod method, double[] keys, boolean keyed, DocValueFormat formatter) {
this.method = method;
this.keys = keys;
this.keyed = keyed;
this.formatter = formatter;
}
}
}

@ -43,6 +43,7 @@ public class AnalyticsStatsActionNodeResponseTests extends AbstractWireSerializi
assertThat(AnalyticsStatsAction.Item.STRING_STATS.ordinal(), equalTo(i++));
assertThat(AnalyticsStatsAction.Item.TOP_METRICS.ordinal(), equalTo(i++));
assertThat(AnalyticsStatsAction.Item.T_TEST.ordinal(), equalTo(i++));
assertThat(AnalyticsStatsAction.Item.MOVING_PERCENTILES.ordinal(), equalTo(i++));
// Please add tests for newly added items here
assertThat(AnalyticsStatsAction.Item.values().length, equalTo(i));
}

@ -0,0 +1,92 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.analytics.movingPercentiles;
import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.search.Query;
import org.elasticsearch.common.time.DateFormatters;
import org.elasticsearch.index.mapper.DateFieldMapper;
import org.elasticsearch.search.aggregations.AggregatorTestCase;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
import org.elasticsearch.search.aggregations.metrics.PercentilesAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.PercentilesConfig;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
public abstract class MovingPercentilesAbstractAggregatorTests extends AggregatorTestCase {
protected static final String DATE_FIELD = "date";
protected static final String INSTANT_FIELD = "instant";
protected static final String VALUE_FIELD = "value_field";
protected static final List<String> datasetTimes = Arrays.asList(
"2017-01-01T01:07:45",
"2017-01-02T03:43:34",
"2017-01-03T04:11:00",
"2017-01-04T05:11:31",
"2017-01-05T08:24:05",
"2017-01-06T13:09:32",
"2017-01-07T13:47:43",
"2017-01-08T16:14:34",
"2017-01-09T17:09:50",
"2017-01-10T22:55:46",
"2017-01-11T22:55:46",
"2017-01-12T22:55:46",
"2017-01-13T22:55:46",
"2017-01-14T22:55:46",
"2017-01-15T22:55:46",
"2017-01-16T22:55:46",
"2017-01-17T22:55:46",
"2017-01-18T22:55:46",
"2017-01-19T22:55:46",
"2017-01-20T22:55:46");
public void testMatchAllDocs() throws IOException {
check(randomIntBetween(0, 10), randomIntBetween(1, 25));
}
private void check(int shift, int window) throws IOException {
MovingPercentilesPipelineAggregationBuilder builder =
new MovingPercentilesPipelineAggregationBuilder("MovingPercentiles", "percentiles", window);
builder.setShift(shift);
Query query = new MatchAllDocsQuery();
DateHistogramAggregationBuilder aggBuilder = new DateHistogramAggregationBuilder("histo");
aggBuilder.calendarInterval(DateHistogramInterval.DAY).field(DATE_FIELD);
aggBuilder.subAggregation(new PercentilesAggregationBuilder("percentiles").field(VALUE_FIELD)
.percentilesConfig(getPercentileConfig()));
aggBuilder.subAggregation(builder);
executeTestCase(window, shift, query, aggBuilder);
}
protected abstract PercentilesConfig getPercentileConfig();
protected abstract void executeTestCase(int window, int shift, Query query,
DateHistogramAggregationBuilder aggBuilder) throws IOException;
protected int clamp(int index, int length) {
if (index < 0) {
return 0;
}
if (index > length) {
return length;
}
return index;
}
protected static long asLong(String dateTime) {
return DateFormatters.from(DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.parse(dateTime)).toInstant().toEpochMilli();
}
}

@ -0,0 +1,111 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.analytics.movingPercentiles;
import org.HdrHistogram.DoubleHistogram;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.LongPoint;
import org.apache.lucene.document.NumericDocValuesField;
import org.apache.lucene.document.SortedNumericDocValuesField;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.RandomIndexWriter;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Query;
import org.apache.lucene.store.Directory;
import org.apache.lucene.util.NumericUtils;
import org.elasticsearch.index.mapper.DateFieldMapper;
import org.elasticsearch.index.mapper.MappedFieldType;
import org.elasticsearch.index.mapper.NumberFieldMapper;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.histogram.InternalDateHistogram;
import org.elasticsearch.search.aggregations.metrics.InternalHDRPercentiles;
import org.elasticsearch.search.aggregations.metrics.PercentilesConfig;
import java.io.IOException;
public class MovingPercentilesHDRAggregatorTests extends MovingPercentilesAbstractAggregatorTests {
@Override
protected PercentilesConfig getPercentileConfig() {
return new PercentilesConfig.Hdr(1);
}
@Override
protected void executeTestCase(int window, int shift, Query query,
DateHistogramAggregationBuilder aggBuilder) throws IOException {
DoubleHistogram[] states = new DoubleHistogram[datasetTimes.size()];
try (Directory directory = newDirectory()) {
try (RandomIndexWriter indexWriter = new RandomIndexWriter(random(), directory)) {
Document document = new Document();
int counter = 0;
for (String date : datasetTimes) {
states[counter] = new DoubleHistogram(1);
final int numberDocs = randomIntBetween(5, 50);
long instant = asLong(date);
for (int i = 0; i < numberDocs; i++) {
if (frequently()) {
indexWriter.commit();
}
double value = randomDoubleBetween(0, 10, true);
states[counter].recordValue(value);
document.add(new SortedNumericDocValuesField(DATE_FIELD, instant));
document.add(new LongPoint(INSTANT_FIELD, instant));
document.add(new NumericDocValuesField(VALUE_FIELD, NumericUtils.doubleToSortableLong(value)));
indexWriter.addDocument(document);
document.clear();
}
counter++;
}
}
try (IndexReader indexReader = DirectoryReader.open(directory)) {
IndexSearcher indexSearcher = newSearcher(indexReader, true, true);
DateFieldMapper.Builder builder = new DateFieldMapper.Builder("_name");
DateFieldMapper.DateFieldType fieldType = builder.fieldType();
fieldType.setHasDocValues(true);
fieldType.setName(aggBuilder.field());
MappedFieldType valueFieldType = new NumberFieldMapper.NumberFieldType(NumberFieldMapper.NumberType.DOUBLE);
valueFieldType.setHasDocValues(true);
valueFieldType.setName("value_field");
InternalDateHistogram histogram;
histogram = searchAndReduce(indexSearcher, query, aggBuilder, 1000,
new MappedFieldType[]{fieldType, valueFieldType});
for (int i = 0; i < histogram.getBuckets().size(); i++) {
InternalDateHistogram.Bucket bucket = histogram.getBuckets().get(i);
InternalHDRPercentiles values = bucket.getAggregations().get("MovingPercentiles");
DoubleHistogram expected = reduce(i, window, shift, states);
if (values == null) {
assertNull(expected);
} else {
DoubleHistogram agg = values.getState();
assertEquals(expected.getTotalCount(), agg.getTotalCount());
assertEquals(expected.getMaxValue(), agg.getMaxValue(), 0d);
assertEquals(expected.getMinValue(), agg.getMinValue(), 0d);
}
}
}
}
}
private DoubleHistogram reduce(int index, int window, int shift, DoubleHistogram[] buckets) {
int fromIndex = clamp(index - window + shift, buckets.length);
int toIndex = clamp(index + shift, buckets.length);
if (fromIndex == toIndex) {
return null;
}
DoubleHistogram result = new DoubleHistogram(buckets[0].getNumberOfSignificantValueDigits());
for (int i = fromIndex; i < toIndex; i++) {
result.add(buckets[i]);
}
return result;
}
}

@ -0,0 +1,113 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.analytics.movingPercentiles;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.LongPoint;
import org.apache.lucene.document.NumericDocValuesField;
import org.apache.lucene.document.SortedNumericDocValuesField;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.RandomIndexWriter;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Query;
import org.apache.lucene.store.Directory;
import org.apache.lucene.util.NumericUtils;
import org.elasticsearch.index.mapper.DateFieldMapper;
import org.elasticsearch.index.mapper.MappedFieldType;
import org.elasticsearch.index.mapper.NumberFieldMapper;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.histogram.InternalDateHistogram;
import org.elasticsearch.search.aggregations.metrics.InternalTDigestPercentiles;
import org.elasticsearch.search.aggregations.metrics.PercentilesConfig;
import org.elasticsearch.search.aggregations.metrics.TDigestState;
import java.io.IOException;
public class MovingPercentilesTDigestAggregatorTests extends MovingPercentilesAbstractAggregatorTests {
@Override
protected PercentilesConfig getPercentileConfig() {
return new PercentilesConfig.TDigest(50);
}
@Override
protected void executeTestCase(int window, int shift, Query query,
DateHistogramAggregationBuilder aggBuilder) throws IOException {
TDigestState[] states = new TDigestState[datasetTimes.size()];
try (Directory directory = newDirectory()) {
try (RandomIndexWriter indexWriter = new RandomIndexWriter(random(), directory)) {
Document document = new Document();
int counter = 0;
for (String date : datasetTimes) {
states[counter] = new TDigestState(50);
final int numberDocs = randomIntBetween(5, 50);
long instant = asLong(date);
for (int i = 0; i < numberDocs; i++) {
// Commit at random points so the documents end up spread across several segments.
if (frequently()) {
indexWriter.commit();
}
double value = randomDoubleBetween(-1000, 1000, true);
states[counter].add(value);
document.add(new SortedNumericDocValuesField(DATE_FIELD, instant));
document.add(new LongPoint(INSTANT_FIELD, instant));
document.add(new NumericDocValuesField(VALUE_FIELD, NumericUtils.doubleToSortableLong(value)));
indexWriter.addDocument(document);
document.clear();
}
counter++;
}
}
try (IndexReader indexReader = DirectoryReader.open(directory)) {
IndexSearcher indexSearcher = newSearcher(indexReader, true, true);
DateFieldMapper.Builder builder = new DateFieldMapper.Builder("_name");
DateFieldMapper.DateFieldType fieldType = builder.fieldType();
fieldType.setHasDocValues(true);
fieldType.setName(aggBuilder.field());
MappedFieldType valueFieldType = new NumberFieldMapper.NumberFieldType(NumberFieldMapper.NumberType.DOUBLE);
valueFieldType.setHasDocValues(true);
valueFieldType.setName("value_field");
InternalDateHistogram histogram;
histogram = searchAndReduce(indexSearcher, query, aggBuilder, 1000,
new MappedFieldType[]{fieldType, valueFieldType});
for (int i = 0; i < histogram.getBuckets().size(); i++) {
InternalDateHistogram.Bucket bucket = histogram.getBuckets().get(i);
InternalTDigestPercentiles values = bucket.getAggregations().get("MovingPercentiles");
TDigestState expected = reduce(i, window, shift, states);
if (values == null) {
assertNull(expected);
} else {
TDigestState agg = values.getState();
assertEquals(expected.size(), agg.size());
assertEquals(expected.getMax(), agg.getMax(), 0d);
assertEquals(expected.getMin(), agg.getMin(), 0d);
}
}
}
}
}
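// Builds the expected digest for bucket `index` by merging the per-bucket digests
// in the window [index - window + shift, index + shift), clamped to the valid range.
// Returns null when the window is empty.
private TDigestState reduce(int index, int window, int shift, TDigestState[] buckets) {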
int fromIndex = clamp(index - window + shift, buckets.length);
int toIndex = clamp(index + shift, buckets.length);
if (fromIndex == toIndex) {
return null;
}
TDigestState result = new TDigestState(buckets[0].compression());
for (int i = fromIndex; i < toIndex; i++) {
result.add(buckets[i]);
}
return result;
}
}
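
The `reduce` helper above picks its window with two clamped indices. The following is a minimal, standalone sketch of that index arithmetic only. The `clamp` shown here is an assumption (the real helper lives in the abstract test class, which is not part of this section), and the class name `MovingWindowBounds` is hypothetical.

```java
// Illustrative only: mirrors the fromIndex/toIndex arithmetic used by reduce().
final class MovingWindowBounds {

    // Assumed behaviour of clamp: keep the index inside [0, length].
    static int clamp(int index, int length) {
        return Math.max(0, Math.min(index, length));
    }

    // Returns {fromIndex, toIndex}; the window covers buckets fromIndex (inclusive)
    // to toIndex (exclusive). An empty window has fromIndex == toIndex.
    static int[] bounds(int index, int window, int shift, int length) {
        int fromIndex = clamp(index - window + shift, length);
        int toIndex = clamp(index + shift, length);
        return new int[] {fromIndex, toIndex};
    }

    public static void main(String[] args) {
        // Three buckets, window = 2, shift = 1 (the settings of the TDigest REST test).
        for (int i = 0; i < 3; i++) {
            int[] b = bounds(i, 2, 1, 3);
            System.out.println("bucket " + i + " -> [" + b[0] + ", " + b[1] + ")");
        }
    }
}
```

Under these assumptions, with `window = 2` and `shift = 1` over three buckets, bucket 0 merges only itself, bucket 1 merges buckets 0 and 1, and bucket 2 merges buckets 1 and 2. With `shift = 0`, bucket 0 would get an empty window, which is the case where `reduce` returns `null`.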

@ -0,0 +1,73 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.analytics.movingPercentiles;
import org.apache.lucene.util.TestUtil;
import org.elasticsearch.plugins.SearchPlugin;
import org.elasticsearch.search.aggregations.AggregationBuilder;
import org.elasticsearch.search.aggregations.BasePipelineAggregationTestCase;
import org.elasticsearch.search.aggregations.bucket.histogram.AutoDateHistogramAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.histogram.HistogramAggregationBuilder;
import java.io.IOException;
import java.util.List;
import static java.util.Collections.emptyList;
import static java.util.Collections.singletonList;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.nullValue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class MovingPercentilesTests extends BasePipelineAggregationTestCase<MovingPercentilesPipelineAggregationBuilder> {
@Override
protected List<SearchPlugin> plugins() {
return singletonList(new SearchPlugin() {
@Override
public List<PipelineAggregationSpec> getPipelineAggregations() {
return singletonList(new PipelineAggregationSpec(
MovingPercentilesPipelineAggregationBuilder.NAME,
MovingPercentilesPipelineAggregationBuilder::new,
MovingPercentilesPipelineAggregationBuilder.PARSER));
}
});
}
@Override
protected MovingPercentilesPipelineAggregationBuilder createTestAggregatorFactory() {
String name = randomAlphaOfLengthBetween(3, 20);
String bucketsPath = randomAlphaOfLengthBetween(3, 20);
MovingPercentilesPipelineAggregationBuilder builder =
new MovingPercentilesPipelineAggregationBuilder(name, bucketsPath, TestUtil.nextInt(random(), 1, 10));
if (randomBoolean()) {
builder.setShift(randomIntBetween(0, 10));
}
return builder;
}
public void testParentValidations() throws IOException {
MovingPercentilesPipelineAggregationBuilder builder =
new MovingPercentilesPipelineAggregationBuilder("name", randomAlphaOfLength(5), TestUtil.nextInt(random(), 1, 10));
assertThat(validate(new HistogramAggregationBuilder("name"), builder), nullValue());
assertThat(validate(new DateHistogramAggregationBuilder("name"), builder), nullValue());
assertThat(validate(new AutoDateHistogramAggregationBuilder("name"), builder), nullValue());
// Mocked "test" agg, should fail validation
AggregationBuilder stubParent = mock(AggregationBuilder.class);
when(stubParent.getName()).thenReturn("name");
assertThat(validate(stubParent, builder), equalTo(
"Validation Failed: 1: moving_percentiles aggregation [name] must have a histogram, "
+ "date_histogram or auto_date_histogram as parent;"));
assertThat(validate(emptyList(), builder), equalTo(
"Validation Failed: 1: moving_percentiles aggregation [name] must have a histogram, "
+ "date_histogram or auto_date_histogram as parent but doesn't have a parent;"));
}
}

@ -43,7 +43,8 @@ public class AnalyticsStatsAction extends ActionType<AnalyticsStatsAction.Respon
CUMULATIVE_CARDINALITY,
STRING_STATS,
TOP_METRICS,
T_TEST;
T_TEST,
MOVING_PERCENTILES;
}
public static class Request extends BaseNodesRequest<Request> implements ToXContentObject {

@ -0,0 +1,142 @@
setup:
- skip:
features: headers
- do:
indices.create:
index: foo
body:
mappings:
properties:
timestamp:
type: date
histogram:
type: histogram
- do:
headers:
Authorization: "Basic eF9wYWNrX3Jlc3RfdXNlcjp4LXBhY2stdGVzdC1wYXNzd29yZA==" # run as x_pack_rest_user, i.e. the test setup superuser
bulk:
refresh: true
body:
- index:
_index: "foo"
- timestamp: "2017-01-01T05:00:00Z"
histogram:
values: [0.1, 0.5, 1, 2, 4, 10]
counts: [1, 4, 5, 4, 5, 1]
- index:
_index: "foo"
- timestamp: "2017-01-01T05:00:00Z"
histogram:
values: [0.1, 0.5, 1, 2, 4, 10]
counts: [1, 4, 5, 4, 5, 1]
- index:
_index: "foo"
- timestamp: "2017-01-01T05:00:00Z"
histogram:
values: [0.1, 0.5, 1, 2, 4, 10]
counts: [1, 4, 5, 4, 5, 1]
- index:
_index: "foo"
- timestamp: "2017-01-02T05:00:00Z"
histogram:
values: [0.1, 0.5, 1, 2, 4, 10]
counts: [1, 4, 5, 4, 5, 1]
- index:
_index: "foo"
- timestamp: "2017-01-02T05:00:00Z"
histogram:
values: [0.1, 0.5, 1, 2, 4, 10]
counts: [1, 4, 5, 4, 5, 1]
- index:
_index: "foo"
- timestamp: "2017-01-03T05:00:00Z"
histogram:
values: [0.1, 0.5, 1, 2, 4, 10]
counts: [1, 4, 5, 4, 5, 1]
---
"Basic Search TDigest":
- do:
search:
index: "foo"
body:
size: 0
aggs:
histo:
date_histogram:
field: "timestamp"
calendar_interval: "day"
aggs:
percentiles:
percentiles:
field: "histogram"
percents: [90]
keyed: false
moving_percentiles:
moving_percentiles:
buckets_path: "percentiles"
window: 2
shift: 1
- length: { aggregations.histo.buckets: 3 }
- match: { aggregations.histo.buckets.0.key_as_string: "2017-01-01T00:00:00.000Z" }
- match: { aggregations.histo.buckets.0.doc_count: 3 }
- match: { aggregations.histo.buckets.0.percentiles.values.0.value: 4.0 }
- match: { aggregations.histo.buckets.0.moving_percentiles.values.0.value: 4.0 }
- match: { aggregations.histo.buckets.1.key_as_string: "2017-01-02T00:00:00.000Z" }
- match: { aggregations.histo.buckets.1.doc_count: 2 }
- match: { aggregations.histo.buckets.1.percentiles.values.0.value: 5.0 }
- match: { aggregations.histo.buckets.1.moving_percentiles.values.0.value: 4.0 }
- match: { aggregations.histo.buckets.2.key_as_string: "2017-01-03T00:00:00.000Z" }
- match: { aggregations.histo.buckets.2.doc_count: 1 }
- match: { aggregations.histo.buckets.2.percentiles.values.0.value: 7.0 }
- match: { aggregations.histo.buckets.2.moving_percentiles.values.0.value: 4.0 }
---
"Basic Search HDR":
- do:
search:
index: "foo"
body:
size: 10
aggs:
histo:
date_histogram:
field: "timestamp"
calendar_interval: "day"
aggs:
percentiles:
percentiles:
field: "histogram"
percents: [90]
keyed: false
hdr:
number_of_significant_value_digits: 1
moving_percentiles:
moving_percentiles:
buckets_path: "percentiles"
window: 3
shift: 1
- length: { aggregations.histo.buckets: 3 }
- match: { aggregations.histo.buckets.0.key_as_string: "2017-01-01T00:00:00.000Z" }
- match: { aggregations.histo.buckets.0.doc_count: 3 }
- match: { aggregations.histo.buckets.0.percentiles.values.0.value: 4.24609375 }
- match: { aggregations.histo.buckets.0.moving_percentiles.values.0.value: 4.24609375 }
- match: { aggregations.histo.buckets.1.key_as_string: "2017-01-02T00:00:00.000Z" }
- match: { aggregations.histo.buckets.1.doc_count: 2 }
- match: { aggregations.histo.buckets.1.percentiles.values.0.value: 4.24609375 }
- match: { aggregations.histo.buckets.1.moving_percentiles.values.0.value: 4.24609375 }
- match: { aggregations.histo.buckets.2.key_as_string: "2017-01-03T00:00:00.000Z" }
- match: { aggregations.histo.buckets.2.doc_count: 1 }
- match: { aggregations.histo.buckets.2.percentiles.values.0.value: 4.24609375 }
- match: { aggregations.histo.buckets.2.moving_percentiles.values.0.value: 4.24609375 }
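
The REST tests above rely on the aggregation merging the sketches of every bucket in the window before reading a percentile from the merged result. The sketch below illustrates that idea with a plain value-to-count map standing in for a sketch and a nearest-rank percentile. Both are simplifying assumptions: TDigest interpolates between centroids and HDR quantizes values, so the numbers asserted in the tests are not expected to match this toy computation. The class and method names are hypothetical.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Illustrative only: a value -> count map stands in for a percentile sketch.
final class MergedWindowPercentile {

    // Merge all sketches in a window by summing the counts per value.
    static TreeMap<Double, Long> merge(List<TreeMap<Double, Long>> window) {
        TreeMap<Double, Long> merged = new TreeMap<>();
        for (TreeMap<Double, Long> bucket : window) {
            bucket.forEach((value, count) -> merged.merge(value, count, Long::sum));
        }
        return merged;
    }

    // Nearest-rank percentile over the merged counts (not what TDigest or HDR do).
    static double percentile(TreeMap<Double, Long> sketch, double percent) {
        long total = sketch.values().stream().mapToLong(Long::longValue).sum();
        long rank = (long) Math.ceil(percent / 100.0 * total);
        long seen = 0;
        for (Map.Entry<Double, Long> entry : sketch.entrySet()) {
            seen += entry.getValue();
            if (seen >= rank) {
                return entry.getKey();
            }
        }
        throw new IllegalArgumentException("empty sketch");
    }

    // The histogram indexed by every document in the test setup.
    static TreeMap<Double, Long> sampleDoc() {
        double[] values = {0.1, 0.5, 1, 2, 4, 10};
        long[] counts = {1, 4, 5, 4, 5, 1};
        TreeMap<Double, Long> doc = new TreeMap<>();
        for (int i = 0; i < values.length; i++) {
            doc.put(values[i], counts[i]);
        }
        return doc;
    }

    public static void main(String[] args) {
        TreeMap<Double, Long> doc = sampleDoc();
        // A window holding three copies of the same histogram.
        System.out.println(percentile(merge(List.of(doc, doc, doc)), 90)); // prints 4.0
    }
}
```

The point of the design is that the percentile is computed once, from the merged counts, rather than by combining the per-bucket percentile values.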

@ -26,6 +26,7 @@ setup:
- set: {analytics.stats.cumulative_cardinality_usage: cumulative_cardinality_usage}
- set: {analytics.stats.t_test_usage: t_test_usage}
- set: {analytics.stats.string_stats_usage: string_stats_usage}
- set: {analytics.stats.moving_percentiles_usage: moving_percentiles_usage}
# use boxplot agg
- do:
@ -50,7 +51,7 @@ setup:
- match: {analytics.stats.cumulative_cardinality_usage: $cumulative_cardinality_usage}
- match: {analytics.stats.t_test_usage: $t_test_usage}
- match: {analytics.stats.string_stats_usage: $string_stats_usage}
- match: {analytics.stats.moving_percentiles_usage: $moving_percentiles_usage}
# use top_metrics agg
- do:
@ -78,7 +79,7 @@ setup:
- match: {analytics.stats.cumulative_cardinality_usage: $cumulative_cardinality_usage}
- match: {analytics.stats.t_test_usage: $t_test_usage}
- match: {analytics.stats.string_stats_usage: $string_stats_usage}
- match: {analytics.stats.moving_percentiles_usage: $moving_percentiles_usage}
# use cumulative_cardinality agg
- do:
@ -110,6 +111,7 @@ setup:
- set: {analytics.stats.cumulative_cardinality_usage: cumulative_cardinality_usage}
- match: {analytics.stats.t_test_usage: $t_test_usage}
- match: {analytics.stats.string_stats_usage: $string_stats_usage}
- match: {analytics.stats.moving_percentiles_usage: $moving_percentiles_usage}
# use t-test agg
- do:
@ -135,6 +137,7 @@ setup:
- gt: { analytics.stats.t_test_usage: $t_test_usage }
- set: {analytics.stats.t_test_usage: t_test_usage}
- match: {analytics.stats.string_stats_usage: $string_stats_usage}
- match: {analytics.stats.moving_percentiles_usage: $moving_percentiles_usage}
- do:
search:
@ -156,3 +159,37 @@ setup:
- match: {analytics.stats.t_test_usage: $t_test_usage}
- gt: { analytics.stats.string_stats_usage: $string_stats_usage }
- set: {analytics.stats.string_stats_usage: string_stats_usage}
- match: {analytics.stats.moving_percentiles_usage: $moving_percentiles_usage}
# use moving_percentiles agg
- do:
search:
index: "test"
body:
size: 0
aggs:
histo:
date_histogram:
field: "timestamp"
calendar_interval: "day"
aggs:
percentiles:
percentiles:
field: "v1"
moving_percentiles:
moving_percentiles:
buckets_path: "percentiles"
window: 2
- length: { aggregations.histo.buckets: 1 }
- do: {xpack.usage: {}}
- match: { analytics.available: true }
- match: { analytics.enabled: true }
- match: {analytics.stats.boxplot_usage: $boxplot_usage}
- match: {analytics.stats.top_metrics_usage: $top_metrics_usage}
- match: { analytics.stats.cumulative_cardinality_usage: $cumulative_cardinality_usage }
- match: {analytics.stats.t_test_usage: $t_test_usage}
- match: {analytics.stats.string_stats_usage: $string_stats_usage}
- gt: { analytics.stats.moving_percentiles_usage: $moving_percentiles_usage }
- set: {analytics.stats.moving_percentiles_usage: moving_percentiles_usage}