Add Normalize Pipeline Aggregation (#56399) (#56792)

This aggregation performs normalization of metric values
for a given series of data in the form of bucket values.

The aggregation supports the following normalizations:

- rescale 0-1
- rescale 0-100
- percentage of sum
- mean normalization
- z-score normalization
- softmax normalization

The normalization to apply is specified in the normalize agg's `method` field.

For example:

```
{
  "normalize": {
    "buckets_path": <>,
    "normalizer": "percent"
  }
}
```
Tal Levy 2020-05-14 17:40:15 -07:00 committed by GitHub
parent a73d7d9e2b
commit 5e90ff32f7
14 changed files with 1000 additions and 2 deletions

View File

@ -288,3 +288,4 @@ include::pipeline/bucket-selector-aggregation.asciidoc[]
include::pipeline/bucket-sort-aggregation.asciidoc[]
include::pipeline/serial-diff-aggregation.asciidoc[]
include::pipeline/moving-percentiles-aggregation.asciidoc[]
include::pipeline/normalize-aggregation.asciidoc[]

View File

@ -0,0 +1,182 @@
[role="xpack"]
[testenv="basic"]
[[search-aggregations-pipeline-normalize-aggregation]]
=== Normalize Aggregation
A parent pipeline aggregation which calculates the normalized/rescaled value for a specific bucket value.
Values that cannot be normalized are skipped using the <<gap-policy, skip gap policy>>.
==== Syntax
A `normalize` aggregation looks like this in isolation:
[source,js]
--------------------------------------------------
{
"normalize": {
"buckets_path": "normalized",
"method": "percent_of_sum"
}
}
--------------------------------------------------
// NOTCONSOLE
[[normalize_pipeline-params]]
.`normalize_pipeline` Parameters
[options="header"]
|===
|Parameter Name |Description |Required |Default Value
|`buckets_path` |The path to the buckets we wish to normalize (see <<buckets-path-syntax, `buckets_path` syntax>> for more details) |Required |
|`method` | The specific <<normalize_pipeline-method, method>> to apply | Required |
|`format` |Format to apply to the output value of this aggregation |Optional |`null`
|===
==== Methods
[[normalize_pipeline-method]]
The Normalize Aggregation supports multiple methods to transform the bucket values. Each method definition below uses
the same original set of bucket values as an example: `[5, 5, 10, 50, 10, 20]`. A short Java sketch of these formulas
follows the list of methods.
_rescale_0_1_::
This method rescales the data such that the minimum number is zero, and the maximum number is 1, with the rest normalized
linearly in-between.
x' = (x - min_x) / (max_x - min_x)
[0, 0, .1111, 1, .1111, .3333]
_rescale_0_100_::
This method rescales the data such that the minimum number is zero, and the maximum number is 100, with the rest normalized
linearly in-between.
x' = 100 * (x - min_x) / (max_x - min_x)
[0, 0, 11.11, 100, 11.11, 33.33]
_percent_of_sum_::
This method normalizes each value so that it represents a percentage of the total sum of all the values.
x' = x / sum_x
[5%, 5%, 10%, 50%, 10%, 20%]
_mean_::
This method normalizes each value by how much it differs from the average, scaled by the range of the values.
x' = (x - mean_x) / (max_x - min_x)
[-0.2593, -0.2593, -0.1481, 0.7407, -0.1481, 0.0741]
_zscore_::
This method normalizes such that each value represents how far it is from the mean relative to the standard deviation
x' = (x - mean_x) / stdev_x
[-0.68, -0.68, -0.39, 1.94, -0.39, 0.19]
_softmax_::
This method normalizes such that each value is exponentiated and relative to the sum of the exponents of the original values.
x' = e^x / sum_e_x
[2.862E-20, 2.862E-20, 4.248E-18, 0.999, 4.248E-18, 9.357E-14]
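The methods above can be sketched in a few lines of plain Java. This is an illustrative, stand-alone example rather than the plugin's implementation, and it assumes the example series contains no gaps, so no NaN handling is shown:

[source,java]
--------------------------------------------------
double[] x = {5, 5, 10, 50, 10, 20};
double min  = java.util.Arrays.stream(x).min().getAsDouble(); // 5
double max  = java.util.Arrays.stream(x).max().getAsDouble(); // 50
double sum  = java.util.Arrays.stream(x).sum();               // 100
double mean = sum / x.length;                                 // 16.67

double[] rescaled01   = java.util.Arrays.stream(x).map(v -> (v - min) / (max - min)).toArray();
double[] rescaled0100 = java.util.Arrays.stream(x).map(v -> 100 * (v - min) / (max - min)).toArray();
double[] percentOfSum = java.util.Arrays.stream(x).map(v -> v / sum).toArray();
double[] meanNorm     = java.util.Arrays.stream(x).map(v -> (v - mean) / (max - min)).toArray();
double sumExp         = java.util.Arrays.stream(x).map(Math::exp).sum();
double[] softmax      = java.util.Arrays.stream(x).map(v -> Math.exp(v) / sumExp).toArray();
--------------------------------------------------
// NOTCONSOLE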
==== Example
The following snippet calculates the percent of total sales for each month:
[source,console]
--------------------------------------------------
POST /sales/_search
{
"size": 0,
"aggs" : {
"sales_per_month" : {
"date_histogram" : {
"field" : "date",
"calendar_interval" : "month"
},
"aggs": {
"sales": {
"sum": {
"field": "price"
}
},
"percent_of_total_sales": {
"normalize": {
"buckets_path": "sales", <1>
"method": "percent_of_sum", <2>
"format": "00.00%" <3>
}
}
}
}
}
}
--------------------------------------------------
// TEST[setup:sales]
<1> `buckets_path` instructs this normalize aggregation to use the output of the `sales` aggregation for rescaling
<2> `method` sets which rescaling to apply. In this case, `percent_of_sum` will calculate the sales value as a percent of all sales
in the parent bucket
<3> `format` controls how the metric is rendered as a string, using Java's `DecimalFormat` pattern. In this case, the `%`
in the pattern multiplies the value by 100 and appends a percent sign
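As a quick stand-alone illustration of that pattern (assuming standard `java.text.DecimalFormat` behaviour; this snippet is not part of the commit):

[source,java]
--------------------------------------------------
java.text.DecimalFormat fmt = new java.text.DecimalFormat("00.00%");
// The '%' suffix multiplies the value by 100 and appends a percent sign,
// while "00.00" pads/rounds to two digits on either side of the decimal point.
String rendered = fmt.format(0.5583756345177665); // "55.84%"
--------------------------------------------------
// NOTCONSOLE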
And the following may be the response:
[source,console-result]
--------------------------------------------------
{
"took": 11,
"timed_out": false,
"_shards": ...,
"hits": ...,
"aggregations": {
"sales_per_month": {
"buckets": [
{
"key_as_string": "2015/01/01 00:00:00",
"key": 1420070400000,
"doc_count": 3,
"sales": {
"value": 550.0
},
"percent_of_total_sales": {
"value": 0.5583756345177665,
"value_as_string": "55.84%"
}
},
{
"key_as_string": "2015/02/01 00:00:00",
"key": 1422748800000,
"doc_count": 2,
"sales": {
"value": 60.0
},
"percent_of_total_sales": {
"value": 0.06091370558375635,
"value_as_string": "06.09%"
}
},
{
"key_as_string": "2015/03/01 00:00:00",
"key": 1425168000000,
"doc_count": 2,
"sales": {
"value": 375.0
},
"percent_of_total_sales": {
"value": 0.38071065989847713,
"value_as_string": "38.07%"
}
}
]
}
}
}
--------------------------------------------------
// TESTRESPONSE[s/"took": 11/"took": $body.took/]
// TESTRESPONSE[s/"_shards": \.\.\./"_shards": $body._shards/]
// TESTRESPONSE[s/"hits": \.\.\./"hits": $body.hits/]

View File

@ -35,7 +35,7 @@ public class InternalSimpleValue extends InternalNumericMetricsAggregation.Singl
public static final String NAME = "simple_value";
protected final double value;
public InternalSimpleValue(String name, double value, DocValueFormat formatter, Map<String, Object> metadata) {
super(name, metadata);
this.format = formatter;
this.value = value;

View File

@ -32,6 +32,7 @@ import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.watcher.ResourceWatcherService;
import org.elasticsearch.xpack.analytics.action.TransportAnalyticsStatsAction;
import org.elasticsearch.xpack.analytics.aggregations.metrics.AnalyticsAggregatorFactory;
import org.elasticsearch.xpack.analytics.normalize.NormalizePipelineAggregationBuilder;
import org.elasticsearch.xpack.analytics.boxplot.BoxplotAggregationBuilder;
import org.elasticsearch.xpack.analytics.boxplot.InternalBoxplot;
import org.elasticsearch.xpack.analytics.cumulativecardinality.CumulativeCardinalityPipelineAggregationBuilder;
@ -84,6 +85,11 @@ public class AnalyticsPlugin extends Plugin implements SearchPlugin, ActionPlugi
MovingPercentilesPipelineAggregationBuilder::new,
usage.track(AnalyticsStatsAction.Item.MOVING_PERCENTILES,
checkLicense(MovingPercentilesPipelineAggregationBuilder.PARSER))));
pipelineAggs.add(new PipelineAggregationSpec(
NormalizePipelineAggregationBuilder.NAME,
NormalizePipelineAggregationBuilder::new,
usage.track(AnalyticsStatsAction.Item.NORMALIZE,
checkLicense(NormalizePipelineAggregationBuilder.PARSER))));
return pipelineAggs;
}

View File

@ -0,0 +1,150 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.analytics.normalize;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.pipeline.AbstractPipelineAggregationBuilder;
import org.elasticsearch.search.aggregations.pipeline.BucketMetricsParser;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.DoubleUnaryOperator;
import java.util.function.Function;
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg;
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg;
import static org.elasticsearch.search.aggregations.pipeline.PipelineAggregator.Parser.FORMAT;
import static org.elasticsearch.xpack.analytics.normalize.NormalizePipelineMethods.Mean;
import static org.elasticsearch.xpack.analytics.normalize.NormalizePipelineMethods.Percent;
import static org.elasticsearch.xpack.analytics.normalize.NormalizePipelineMethods.RescaleZeroToOne;
import static org.elasticsearch.xpack.analytics.normalize.NormalizePipelineMethods.RescaleZeroToOneHundred;
import static org.elasticsearch.xpack.analytics.normalize.NormalizePipelineMethods.Softmax;
import static org.elasticsearch.xpack.analytics.normalize.NormalizePipelineMethods.ZScore;
public class NormalizePipelineAggregationBuilder extends AbstractPipelineAggregationBuilder<NormalizePipelineAggregationBuilder> {
public static final String NAME = "normalize";
static final ParseField METHOD_FIELD = new ParseField("method");
@SuppressWarnings("unchecked")
public static final ConstructingObjectParser<NormalizePipelineAggregationBuilder, String> PARSER = new ConstructingObjectParser<>(
NAME, false, (args, name) -> new NormalizePipelineAggregationBuilder(name, (String) args[0],
(String) args[1], (List<String>) args[2]));
static {
PARSER.declareString(optionalConstructorArg(), FORMAT);
PARSER.declareString(constructorArg(), METHOD_FIELD);
PARSER.declareStringArray(constructorArg(), BUCKETS_PATH_FIELD);
}
static final Map<String, Function<double[], DoubleUnaryOperator>> NAME_MAP;
static {
NAME_MAP = new HashMap<>();
NAME_MAP.put(RescaleZeroToOne.NAME, RescaleZeroToOne::new);
NAME_MAP.put(RescaleZeroToOneHundred.NAME, RescaleZeroToOneHundred::new);
NAME_MAP.put(Mean.NAME, Mean::new);
NAME_MAP.put(ZScore.NAME, ZScore::new);
NAME_MAP.put(Percent.NAME, Percent::new);
NAME_MAP.put(Softmax.NAME, Softmax::new);
}
static String validateMethodName(String name) {
if (NAME_MAP.containsKey(name)) {
return name;
}
throw new IllegalArgumentException("invalid method [" + name + "]");
}
private final String format;
private final String method;
public NormalizePipelineAggregationBuilder(String name, String format, String method, List<String> bucketsPath) {
super(name, NAME, bucketsPath.toArray(new String[0]));
this.format = format;
this.method = validateMethodName(method);
}
/**
* Read from a stream.
*/
public NormalizePipelineAggregationBuilder(StreamInput in) throws IOException {
super(in, NAME);
format = in.readOptionalString();
method = in.readString();
}
@Override
protected final void doWriteTo(StreamOutput out) throws IOException {
out.writeOptionalString(format);
out.writeString(method);
}
/**
* Gets the format to use on the output of this aggregation.
*/
public String format() {
return format;
}
protected DocValueFormat formatter() {
if (format != null) {
return new DocValueFormat.Decimal(format);
} else {
return DocValueFormat.RAW;
}
}
@Override
protected PipelineAggregator createInternal(Map<String, Object> metadata) {
return new NormalizePipelineAggregator(name, bucketsPaths, formatter(), NAME_MAP.get(method), metadata);
}
@Override
protected void validate(ValidationContext context) {
if (bucketsPaths.length != 1) {
context.addBucketPathValidationError("must contain a single entry for aggregation [" + name + "]");
}
context.validateHasParent(NAME, name);
}
@Override
protected final XContentBuilder internalXContent(XContentBuilder builder, Params params) throws IOException {
if (format != null) {
builder.field(BucketMetricsParser.FORMAT.getPreferredName(), format);
}
builder.field(METHOD_FIELD.getPreferredName(), method);
return builder;
}
@Override
public int hashCode() {
return Objects.hash(super.hashCode(), format, method);
}
@Override
public boolean equals(Object obj) {
if (this == obj) return true;
if (obj == null || getClass() != obj.getClass()) return false;
if (super.equals(obj) == false) return false;
NormalizePipelineAggregationBuilder other = (NormalizePipelineAggregationBuilder) obj;
return Objects.equals(format, other.format) && Objects.equals(method, other.method);
}
@Override
public String getWriteableName() {
return NAME;
}
}
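As a minimal sketch (illustrative only, not part of this commit), the builder above could be constructed programmatically to mirror the `percent_of_total_sales` example from the documentation; the argument order follows the constructor defined in this file:

```
NormalizePipelineAggregationBuilder normalize = new NormalizePipelineAggregationBuilder(
    "percent_of_total_sales",                     // aggregation name
    "00.00%",                                     // optional DecimalFormat pattern, may be null
    "percent_of_sum",                             // method, validated against NAME_MAP
    java.util.Collections.singletonList("sales")  // buckets_path
);
```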

View File

@ -0,0 +1,76 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.analytics.normalize;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.InternalAggregation.ReduceContext;
import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.aggregations.InternalMultiBucketAggregation;
import org.elasticsearch.search.aggregations.pipeline.BucketHelpers.GapPolicy;
import org.elasticsearch.search.aggregations.pipeline.InternalSimpleValue;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.DoubleUnaryOperator;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import static org.elasticsearch.search.aggregations.pipeline.BucketHelpers.resolveBucketValue;
public class NormalizePipelineAggregator extends PipelineAggregator {
private final DocValueFormat formatter;
private final Function<double[], DoubleUnaryOperator> methodSupplier;
NormalizePipelineAggregator(String name, String[] bucketsPaths, DocValueFormat formatter,
Function<double[], DoubleUnaryOperator> methodSupplier,
Map<String, Object> metadata) {
super(name, bucketsPaths, metadata);
this.formatter = formatter;
this.methodSupplier = methodSupplier;
}
@Override
@SuppressWarnings("unchecked")
public InternalAggregation reduce(InternalAggregation aggregation, ReduceContext reduceContext) {
InternalMultiBucketAggregation<InternalMultiBucketAggregation, InternalMultiBucketAggregation.InternalBucket> originalAgg =
(InternalMultiBucketAggregation<InternalMultiBucketAggregation, InternalMultiBucketAggregation.InternalBucket>) aggregation;
List<? extends InternalMultiBucketAggregation.InternalBucket> buckets = originalAgg.getBuckets();
List<InternalMultiBucketAggregation.InternalBucket> newBuckets = new ArrayList<>(buckets.size());
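// First resolve every bucket's value via buckets_path (skip gap policy), so the normalization
// method can be built from the statistics of the whole series before any bucket is normalized.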
double[] values = buckets.stream()
.mapToDouble(bucket -> resolveBucketValue(originalAgg, bucket, bucketsPaths()[0], GapPolicy.SKIP)).toArray();
DoubleUnaryOperator method = methodSupplier.apply(values);
for (int i = 0; i < buckets.size(); i++) {
InternalMultiBucketAggregation.InternalBucket bucket = buckets.get(i);
final double normalizedBucketValue;
// Only account for valid values. Infinite-valued buckets were converted to NaNs by
// the time they reach here.
if (Double.isNaN(values[i])) {
normalizedBucketValue = Double.NaN;
} else {
normalizedBucketValue = method.applyAsDouble(values[i]);
}
List<InternalAggregation> aggs = StreamSupport.stream(bucket.getAggregations().spliterator(), false)
.map((p) -> (InternalAggregation) p)
.collect(Collectors.toList());
aggs.add(new InternalSimpleValue(name(), normalizedBucketValue, formatter, metadata()));
InternalMultiBucketAggregation.InternalBucket newBucket = originalAgg.createBucket(new InternalAggregations(aggs), bucket);
newBuckets.add(newBucket);
}
return originalAgg.create(newBuckets);
}
}

View File

@ -0,0 +1,142 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.analytics.normalize;
import java.util.function.DoubleUnaryOperator;
class NormalizePipelineMethods {
// never to be instantiated
private NormalizePipelineMethods() {}
static class RescaleZeroToOne extends SinglePassSimpleStatisticsMethod {
static final String NAME = "rescale_0_1";
RescaleZeroToOne(double[] values) {
super(values);
}
@Override
public double applyAsDouble(double value) {
return (value - min) / (max - min);
}
}
static class RescaleZeroToOneHundred extends SinglePassSimpleStatisticsMethod {
static final String NAME = "rescale_0_100";
RescaleZeroToOneHundred(double[] values) {
super(values);
}
@Override
public double applyAsDouble(double value) {
return 100 * (value - min) / (max - min);
}
}
static class Mean extends SinglePassSimpleStatisticsMethod {
static final String NAME = "mean";
Mean(double[] values) {
super(values);
}
@Override
public double applyAsDouble(double value) {
return (value - mean) / (max - min);
}
}
static class Percent extends SinglePassSimpleStatisticsMethod {
static final String NAME = "percent_of_sum";
Percent(double[] values) {
super(values);
}
@Override
public double applyAsDouble(double value) {
return value / sum;
}
}
static class ZScore extends SinglePassSimpleStatisticsMethod {
static final String NAME = "z-score";
private final double stdev;
ZScore(double[] values) {
super(values);
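// Population variance: squared deviations from the single-pass mean, summed over non-NaN values and divided by count.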
double variance = 0.0;
for (Double value : values) {
if (value.isNaN() == false) {
variance += Math.pow(value - mean, 2);
}
}
this.stdev = Math.sqrt(variance / count);
}
@Override
public double applyAsDouble(double value) {
return (value - mean) / stdev;
}
}
static class Softmax implements DoubleUnaryOperator {
static final String NAME = "softmax";
private double sumExp;
Softmax(double[] values) {
double sumExp = 0.0;
for (Double value : values) {
if (value.isNaN() == false) {
sumExp += Math.exp(value);
}
}
this.sumExp = sumExp;
}
@Override
public double applyAsDouble(double value) {
return Math.exp(value) / sumExp;
}
}
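// Base class computing count, min, max, sum and mean of the non-NaN values in a single pass.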
abstract static class SinglePassSimpleStatisticsMethod implements DoubleUnaryOperator {
protected final double max;
protected final double min;
protected final double sum;
protected final double mean;
protected final int count;
SinglePassSimpleStatisticsMethod(double[] values) {
int count = 0;
double sum = 0.0;
double min = Double.MAX_VALUE;
double max = Double.MIN_VALUE;
for (double value : values) {
if (Double.isNaN(value) == false) {
count += 1;
min = Math.min(value, min);
max = Math.max(value, max);
sum += value;
}
}
this.count = count;
this.min = min;
this.max = max;
this.sum = sum;
this.mean = this.count == 0 ? Double.NaN : this.sum / this.count;
}
}
}

View File

@ -44,6 +44,7 @@ public class AnalyticsStatsActionNodeResponseTests extends AbstractWireSerializi
assertThat(AnalyticsStatsAction.Item.TOP_METRICS.ordinal(), equalTo(i++));
assertThat(AnalyticsStatsAction.Item.T_TEST.ordinal(), equalTo(i++));
assertThat(AnalyticsStatsAction.Item.MOVING_PERCENTILES.ordinal(), equalTo(i++));
assertThat(AnalyticsStatsAction.Item.NORMALIZE.ordinal(), equalTo(i++));
// Please add tests for newly added items here
assertThat(AnalyticsStatsAction.Item.values().length, equalTo(i));
}

View File

@ -0,0 +1,171 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.analytics.normalize;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.NumericDocValuesField;
import org.apache.lucene.document.SortedNumericDocValuesField;
import org.apache.lucene.document.SortedSetDocValuesField;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.RandomIndexWriter;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.search.Query;
import org.apache.lucene.store.Directory;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.common.time.DateFormatters;
import org.elasticsearch.index.mapper.DateFieldMapper;
import org.elasticsearch.index.mapper.KeywordFieldMapper;
import org.elasticsearch.index.mapper.MappedFieldType;
import org.elasticsearch.index.mapper.NumberFieldMapper;
import org.elasticsearch.search.aggregations.AggregatorTestCase;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
import org.elasticsearch.search.aggregations.bucket.histogram.Histogram;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.StatsAggregationBuilder;
import org.elasticsearch.search.aggregations.pipeline.InternalSimpleValue;
import org.elasticsearch.search.aggregations.support.ValuesSourceAggregationBuilder;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.Consumer;
import static org.hamcrest.Matchers.equalTo;
public class NormalizeAggregatorTests extends AggregatorTestCase {
private static final String DATE_FIELD = "date";
private static final String TERM_FIELD = "term";
private static final String VALUE_FIELD = "value_field";
private static final List<String> datasetTimes = Arrays.asList(
"2017-01-01T01:07:45", //1
"2017-01-01T03:43:34", //1
"2017-01-03T04:11:00", //3
"2017-01-03T05:11:31", //3
"2017-01-05T08:24:05", //5
"2017-01-05T13:09:32", //5
"2017-01-07T13:47:43", //7
"2017-01-08T16:14:34", //8
"2017-01-09T17:09:50", //9
"2017-01-09T22:55:46");//9
private static final List<String> datasetTerms = Arrays.asList(
"a", //1
"a", //1
"b", //2
"b", //2
"c", //3
"c", //3
"d", //4
"e", //5
"f", //6
"f");//6
private static final List<Integer> datasetValues = Arrays.asList(1,1,42,6,5,0,2,8,30,13);
private static final List<Double> datePercentOfSum = Arrays.asList(0.2,0.0,0.2,0.0,0.2,0.0,0.1,0.1,0.2);
private static final List<Double> termPercentOfSum = Arrays.asList(0.2,0.2,0.2,0.2,0.1,0.1);
private static final List<Double> rescaleOneHundred = Arrays.asList(0.0,Double.NaN,100.0,Double.NaN,6.521739130434782,
Double.NaN,0.0,13.043478260869565,89.1304347826087);
public void testPercentOfTotalDocCount() throws IOException {
DateHistogramAggregationBuilder aggBuilder = new DateHistogramAggregationBuilder("histo");
aggBuilder.calendarInterval(DateHistogramInterval.DAY).field(DATE_FIELD);
aggBuilder.subAggregation(new NormalizePipelineAggregationBuilder("normalized", null, "percent_of_sum",
Collections.singletonList("_count")));
testCase(aggBuilder, (agg) -> {
assertEquals(9, ((Histogram) agg).getBuckets().size());
List<? extends Histogram.Bucket> buckets = ((Histogram) agg).getBuckets();
for (int i = 0; i < buckets.size(); i++) {
Histogram.Bucket bucket = buckets.get(i);
assertThat(((InternalSimpleValue) (bucket.getAggregations().get("normalized"))).value(),
equalTo(datePercentOfSum.get(i)));
}
});
}
public void testValueMean() throws IOException {
DateHistogramAggregationBuilder aggBuilder = new DateHistogramAggregationBuilder("histo");
aggBuilder.calendarInterval(DateHistogramInterval.DAY).field(DATE_FIELD);
aggBuilder.subAggregation(new StatsAggregationBuilder("stats").field(VALUE_FIELD));
aggBuilder.subAggregation(new NormalizePipelineAggregationBuilder("normalized", null, "rescale_0_100",
Collections.singletonList("stats.sum")));
testCase(aggBuilder, (agg) -> {
assertEquals(9, ((Histogram) agg).getBuckets().size());
List<? extends Histogram.Bucket> buckets = ((Histogram) agg).getBuckets();
for (int i = 0; i < buckets.size(); i++) {
Histogram.Bucket bucket = buckets.get(i);
assertThat(((InternalSimpleValue) (bucket.getAggregations().get("normalized"))).value(),
equalTo(rescaleOneHundred.get(i)));
}
});
}
public void testTermsAggParent() throws IOException {
TermsAggregationBuilder aggBuilder = new TermsAggregationBuilder("terms").field(TERM_FIELD);
aggBuilder.subAggregation(new NormalizePipelineAggregationBuilder("normalized", null, "percent_of_sum",
Collections.singletonList("_count")));
testCase(aggBuilder, (agg) -> {
assertEquals(6, ((Terms) agg).getBuckets().size());
List<? extends Terms.Bucket> buckets = ((Terms) agg).getBuckets();
for (int i = 0; i < buckets.size(); i++) {
Terms.Bucket bucket = buckets.get(i);
assertThat(((InternalSimpleValue) (bucket.getAggregations().get("normalized"))).value(),
equalTo(termPercentOfSum.get(i)));
}
});
}
private void testCase(ValuesSourceAggregationBuilder<?> aggBuilder, Consumer<InternalAggregation> aggAssertion) throws IOException {
Query query = new MatchAllDocsQuery();
// index date data
try (Directory directory = newDirectory()) {
try (RandomIndexWriter indexWriter = new RandomIndexWriter(random(), directory)) {
Document document = new Document();
for (int i = 0; i < datasetValues.size(); i++) {
if (frequently()) {
indexWriter.commit();
}
long instant = DateFormatters.from(DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.parse(datasetTimes.get(i)))
.toInstant().toEpochMilli();
document.add(new SortedNumericDocValuesField(DATE_FIELD, instant));
document.add(new NumericDocValuesField(VALUE_FIELD, datasetValues.get(i)));
document.add(new SortedSetDocValuesField(TERM_FIELD, new BytesRef(datasetTerms.get(i))));
indexWriter.addDocument(document);
document.clear();
}
}
// setup mapping
DateFieldMapper.DateFieldType dateFieldType = new DateFieldMapper.Builder("_name").fieldType();
dateFieldType.setHasDocValues(true);
dateFieldType.setName(DATE_FIELD);
MappedFieldType valueFieldType = new NumberFieldMapper.NumberFieldType(NumberFieldMapper.NumberType.LONG);
valueFieldType.setHasDocValues(true);
valueFieldType.setName(VALUE_FIELD);
MappedFieldType termFieldType = new KeywordFieldMapper.KeywordFieldType();
termFieldType.setName(TERM_FIELD);
termFieldType.setHasDocValues(true);
try (IndexReader indexReader = DirectoryReader.open(directory)) {
IndexSearcher indexSearcher = newSearcher(indexReader, true, true);
InternalAggregation internalAggregation = searchAndReduce(indexSearcher, query, aggBuilder, dateFieldType,
valueFieldType, termFieldType);
aggAssertion.accept(internalAggregation);
}
}
}
}

View File

@ -0,0 +1,86 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.analytics.normalize;
import org.elasticsearch.test.ESTestCase;
import java.util.function.DoubleUnaryOperator;
import static org.hamcrest.Matchers.equalTo;
public class NormalizePipelineMethodsTests extends ESTestCase {
private static final double[] DATA =
new double[] { 1, 50, Double.NaN, 8, 10, 4, 3, 0, 10, -10, -4};
private static final int COUNT = 10;
private static final double MIN = -10;
private static final double MAX = 50;
private static final double SUM = 72;
private static final double MEAN = SUM / COUNT;
public void testRescaleZeroToOne() {
NormalizePipelineMethods.RescaleZeroToOne normalizer = new NormalizePipelineMethods.RescaleZeroToOne(DATA);
assertSinglePassStatistics(normalizer);
double[] normalized = new double[] { 0.18333333333333332, 1.0, Double.NaN, 0.3, 0.3333333333333333, 0.23333333333333334,
0.21666666666666667, 0.16666666666666666, 0.3333333333333333, 0.0, 0.1 };
assertNormalized(normalizer, normalized);
}
public void testRescaleZeroToOneHundred() {
NormalizePipelineMethods.RescaleZeroToOneHundred normalizer = new NormalizePipelineMethods.RescaleZeroToOneHundred(DATA);
assertSinglePassStatistics(normalizer);
double[] normalized = new double[] { 18.333333333333332, 100.0, Double.NaN, 30.0, 33.333333333333336, 23.333333333333332,
21.666666666666668, 16.666666666666668, 33.333333333333336, 0.0, 10.0 };
assertNormalized(normalizer, normalized);
}
public void testMean() {
NormalizePipelineMethods.Mean normalizer = new NormalizePipelineMethods.Mean(DATA);
assertSinglePassStatistics(normalizer);
double[] normalized = new double[] { -0.10333333333333333, 0.7133333333333333, Double.NaN, 0.01333333333333333,
0.04666666666666666, -0.05333333333333334, -0.07, -0.12000000000000001, 0.04666666666666666,
-0.2866666666666667, -0.18666666666666665 };
assertNormalized(normalizer, normalized);
}
public void testZScore() {
NormalizePipelineMethods.ZScore normalizer = new NormalizePipelineMethods.ZScore(DATA);
assertSinglePassStatistics(normalizer);
double[] normalized = new double[] { -0.4012461740749068, 2.7698929436138724, Double.NaN, 0.05177369988063312,
0.18120794958221595, -0.20709479952253254, -0.27181192437332397, -0.4659632989256982, 0.18120794958221595,
-1.1131345474336123, -0.7248317983288638 };
assertNormalized(normalizer, normalized);
}
public void testSoftmax() {
NormalizePipelineMethods.Softmax normalizer = new NormalizePipelineMethods.Softmax(DATA);
double[] normalized = new double[] { 5.242885663363464E-22, 1.0, Double.NaN, 5.74952226429356E-19, 4.24835425529159E-18,
1.0530617357553813E-20, 3.8739976286871875E-21, 1.928749847963918E-22, 4.24835425529159E-18, 8.756510762696521E-27,
3.532628572200807E-24 };
assertNormalized(normalizer, normalized);
}
private void assertSinglePassStatistics(NormalizePipelineMethods.SinglePassSimpleStatisticsMethod normalizer) {
assertThat(normalizer.min, equalTo(MIN));
assertThat(normalizer.max, equalTo(MAX));
assertThat(normalizer.count, equalTo(COUNT));
assertThat(normalizer.sum, equalTo(SUM));
assertThat(normalizer.mean, equalTo(MEAN));
}
private void assertNormalized(DoubleUnaryOperator op, double[] normalizedData) {
assertThat(normalizedData.length, equalTo(DATA.length));
for (int i = 0; i < DATA.length; i++) {
if (Double.isNaN(DATA[i])) {
assertTrue(Double.isNaN(normalizedData[i]));
} else {
assertThat(op.applyAsDouble(DATA[i]), equalTo(normalizedData[i]));
}
}
}
}

View File

@ -0,0 +1,62 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.analytics.normalize;
import org.elasticsearch.plugins.SearchPlugin;
import org.elasticsearch.search.aggregations.BasePipelineAggregationTestCase;
import org.hamcrest.CoreMatchers;
import java.util.Collections;
import java.util.List;
import static java.util.Collections.emptyList;
import static java.util.Collections.singletonList;
import static org.hamcrest.Matchers.equalTo;
public class NormalizeTests extends BasePipelineAggregationTestCase<NormalizePipelineAggregationBuilder> {
@Override
protected List<SearchPlugin> plugins() {
return singletonList(new SearchPlugin() {
@Override
public List<PipelineAggregationSpec> getPipelineAggregations() {
return singletonList(new PipelineAggregationSpec(
NormalizePipelineAggregationBuilder.NAME,
NormalizePipelineAggregationBuilder::new,
NormalizePipelineAggregationBuilder.PARSER));
}
});
}
public void testInvalidNormalizer() {
NormalizePipelineAggregationBuilder builder = createTestAggregatorFactory();
String invalidNormalizer = randomFrom(NormalizePipelineAggregationBuilder.NAME_MAP.keySet()) + randomAlphaOfLength(10);
IllegalArgumentException exception = expectThrows(IllegalArgumentException.class,
() -> new NormalizePipelineAggregationBuilder(builder.getName(), builder.format(), invalidNormalizer,
org.elasticsearch.common.collect.List.of(builder.getBucketsPaths())));
assertThat(exception.getMessage(), equalTo("invalid method [" + invalidNormalizer + "]"));
}
public void testHasParentValidation() {
NormalizePipelineAggregationBuilder builder = createTestAggregatorFactory();
assertThat(validate(emptyList(), builder), CoreMatchers.equalTo(
"Validation Failed: 1: normalize aggregation [" + builder.getName() + "] must be declared inside" +
" of another aggregation;"));
}
@Override
protected NormalizePipelineAggregationBuilder createTestAggregatorFactory() {
String name = randomAlphaOfLengthBetween(3, 20);
String bucketsPath = randomAlphaOfLengthBetween(3, 20);
String format = null;
if (randomBoolean()) {
format = randomAlphaOfLengthBetween(1, 10);
}
String normalizer = randomFrom(NormalizePipelineAggregationBuilder.NAME_MAP.keySet());
return new NormalizePipelineAggregationBuilder(name, format, normalizer, Collections.singletonList(bucketsPath));
}
}

View File

@ -44,7 +44,8 @@ public class AnalyticsStatsAction extends ActionType<AnalyticsStatsAction.Respon
STRING_STATS,
TOP_METRICS,
T_TEST,
MOVING_PERCENTILES,
NORMALIZE;
}
public static class Request extends BaseNodesRequest<Request> implements ToXContentObject {

View File

@ -0,0 +1,79 @@
setup:
- skip:
features: headers
- do:
indices.create:
index: foo
body:
mappings:
properties:
timestamp:
type: date
user:
type: keyword
- do:
bulk:
refresh: true
body:
- index:
_index: "foo"
- timestamp: "2017-01-01T05:00:00Z"
user: "a"
- index:
_index: "foo"
- timestamp: "2017-01-01T05:00:00Z"
user: "b"
- index:
_index: "foo"
- timestamp: "2017-01-01T05:00:00Z"
user: "c"
- index:
_index: "foo"
- timestamp: "2017-01-02T05:00:00Z"
user: "a"
- index:
_index: "foo"
- timestamp: "2017-01-02T05:00:00Z"
user: "b"
- index:
_index: "foo"
- timestamp: "2017-01-03T05:00:00Z"
user: "d"
---
"Basic Search":
- do:
search:
index: "foo"
body:
size: 0
aggs:
users_by_day:
date_histogram:
field: "timestamp"
calendar_interval: "day"
aggs:
percent_of_total_users:
normalize:
buckets_path: "_count"
method: "percent_of_sum"
- length: { aggregations.users_by_day.buckets: 3 }
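# 3, 2 and 1 of the 6 documents fall into the three daily buckets, so percent_of_sum yields 3/6, 2/6 and 1/6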
- match: { aggregations.users_by_day.buckets.0.key_as_string: "2017-01-01T00:00:00.000Z" }
- match: { aggregations.users_by_day.buckets.0.doc_count: 3 }
- match: { aggregations.users_by_day.buckets.0.percent_of_total_users.value: 0.5 }
- match: { aggregations.users_by_day.buckets.1.key_as_string: "2017-01-02T00:00:00.000Z" }
- match: { aggregations.users_by_day.buckets.1.doc_count: 2 }
- match: { aggregations.users_by_day.buckets.1.percent_of_total_users.value: 0.3333333333333333 }
- match: { aggregations.users_by_day.buckets.2.key_as_string: "2017-01-03T00:00:00.000Z" }
- match: { aggregations.users_by_day.buckets.2.doc_count: 1 }
- match: { aggregations.users_by_day.buckets.2.percent_of_total_users.value: 0.16666666666666666 }

View File

@ -27,6 +27,7 @@ setup:
- set: {analytics.stats.t_test_usage: t_test_usage}
- set: {analytics.stats.string_stats_usage: string_stats_usage}
- set: {analytics.stats.moving_percentiles_usage: moving_percentiles_usage}
- set: { analytics.stats.normalize_usage: normalize_usage }
# use boxplot agg
- do:
@ -52,6 +53,7 @@ setup:
- match: {analytics.stats.t_test_usage: $t_test_usage}
- match: {analytics.stats.string_stats_usage: $string_stats_usage}
- match: {analytics.stats.moving_percentiles_usage: $moving_percentiles_usage}
- match: { analytics.stats.normalize_usage: $normalize_usage }
# use top_metrics agg
- do:
@ -80,6 +82,7 @@ setup:
- match: {analytics.stats.t_test_usage: $t_test_usage}
- match: {analytics.stats.string_stats_usage: $string_stats_usage}
- match: {analytics.stats.moving_percentiles_usage: $moving_percentiles_usage}
- match: { analytics.stats.normalize_usage: $normalize_usage }
# use cumulative_cardinality agg
- do:
@ -112,6 +115,7 @@ setup:
- match: {analytics.stats.t_test_usage: $t_test_usage}
- match: {analytics.stats.string_stats_usage: $string_stats_usage}
- match: {analytics.stats.moving_percentiles_usage: $moving_percentiles_usage}
- match: { analytics.stats.normalize_usage: $normalize_usage }
# use t-test agg
- do:
@ -138,6 +142,7 @@ setup:
- set: {analytics.stats.t_test_usage: t_test_usage}
- match: {analytics.stats.string_stats_usage: $string_stats_usage}
- match: {analytics.stats.moving_percentiles_usage: $moving_percentiles_usage}
- match: { analytics.stats.normalize_usage: $normalize_usage }
- do:
search:
@ -160,6 +165,7 @@ setup:
- gt: { analytics.stats.string_stats_usage: $string_stats_usage }
- set: {analytics.stats.string_stats_usage: string_stats_usage}
- match: {analytics.stats.moving_percentiles_usage: $moving_percentiles_usage}
- match: { analytics.stats.normalize_usage: $normalize_usage }
# use moving_percentile agg
- do:
@ -193,3 +199,38 @@ setup:
- match: {analytics.stats.string_stats_usage: $string_stats_usage}
- gt: { analytics.stats.moving_percentiles_usage: $moving_percentiles_usage }
- set: {analytics.stats.moving_percentiles_usage: moving_percentiles_usage}
- match: { analytics.stats.normalize_usage: $normalize_usage }
# use normalize agg
- do:
search:
index: "test"
body:
size: 0
aggs:
histo:
date_histogram:
field: "timestamp"
calendar_interval: "day"
aggs:
total_users:
sum:
field: "s"
percent_of_total_users:
normalize:
buckets_path: "total_users"
method: "percent_of_sum"
- length: { aggregations.histo.buckets: 1 }
- do: {xpack.usage: {}}
- match: { analytics.available: true }
- match: { analytics.enabled: true }
- match: {analytics.stats.boxplot_usage: $boxplot_usage}
- match: {analytics.stats.top_metrics_usage: $top_metrics_usage}
- match: {analytics.stats.cumulative_cardinality_usage: $cumulative_cardinality_usage}
- match: {analytics.stats.t_test_usage: $t_test_usage}
- match: {analytics.stats.string_stats_usage: $string_stats_usage}
- match: {analytics.stats.moving_percentiles_usage: $moving_percentiles_usage}
- gt: { analytics.stats.normalize_usage: $normalize_usage }
- set: {analytics.stats.normalize_usage: normalize_usage}