From 1016734b4c2950caf9ba6ef81d103787d1b1fc91 Mon Sep 17 00:00:00 2001 From: Zachary Tong Date: Fri, 28 Aug 2015 12:23:19 -0400 Subject: [PATCH] Aggregations: Add percentiles_bucket pipeline aggregations This pipeline will calculate percentiles over a set of sibling buckets. This is an exact implementation, meaning it needs to cache a copy of the series in memory and sort it to determine the percentiles. This comes with a few limitations: to prevent serializing data around, only the requested percentiles are calculated (unlike the TDigest version, which allows the java API to ask for any percentile). It also needs to store the data in-memory, resulting in some overhead if the requested series is very large. --- .../elasticsearch/search/SearchModule.java | 7 +- .../pipeline/PipelineAggregatorBuilders.java | 5 + .../bucketmetrics/BucketMetricsBuilder.java | 2 +- .../bucketmetrics/BucketMetricsParser.java | 3 +- .../percentile/InternalPercentilesBucket.java | 163 +++++ .../percentile/PercentilesBucket.java | 25 + .../percentile/PercentilesBucketBuilder.java | 49 ++ .../percentile/PercentilesBucketParser.java | 67 ++ .../PercentilesBucketPipelineAggregator.java | 155 +++++ .../pipeline/PercentilesBucketIT.java | 625 ++++++++++++++++++ docs/reference/aggregations/pipeline.asciidoc | 1 + .../percentiles-bucket-aggregation.asciidoc | 121 ++++ 12 files changed, 1218 insertions(+), 5 deletions(-) create mode 100644 core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/percentile/InternalPercentilesBucket.java create mode 100644 core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/percentile/PercentilesBucket.java create mode 100644 core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/percentile/PercentilesBucketBuilder.java create mode 100644 core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/percentile/PercentilesBucketParser.java create mode 100644 core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/percentile/PercentilesBucketPipelineAggregator.java create mode 100644 core/src/test/java/org/elasticsearch/search/aggregations/pipeline/PercentilesBucketIT.java create mode 100644 docs/reference/aggregations/pipeline/percentiles-bucket-aggregation.asciidoc diff --git a/core/src/main/java/org/elasticsearch/search/SearchModule.java b/core/src/main/java/org/elasticsearch/search/SearchModule.java index 49acd1163fc..b78fe06f8e9 100644 --- a/core/src/main/java/org/elasticsearch/search/SearchModule.java +++ b/core/src/main/java/org/elasticsearch/search/SearchModule.java @@ -19,7 +19,6 @@ package org.elasticsearch.search; -import org.elasticsearch.common.Classes; import org.elasticsearch.common.inject.AbstractModule; import org.elasticsearch.common.inject.multibindings.Multibinder; import org.elasticsearch.common.settings.Settings; @@ -110,6 +109,8 @@ import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.max.MaxBucke import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.max.MaxBucketPipelineAggregator; import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.min.MinBucketParser; import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.min.MinBucketPipelineAggregator; +import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.percentile.PercentilesBucketParser; +import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.percentile.PercentilesBucketPipelineAggregator; import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.sum.SumBucketParser; import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.sum.SumBucketPipelineAggregator; import org.elasticsearch.search.aggregations.pipeline.bucketscript.BucketScriptParser; @@ -143,8 +144,6 @@ import org.elasticsearch.search.highlight.HighlightPhase; import org.elasticsearch.search.highlight.Highlighter; import org.elasticsearch.search.highlight.Highlighters; import org.elasticsearch.search.query.QueryPhase; -import org.elasticsearch.search.suggest.SuggestParseElement; -import org.elasticsearch.search.suggest.SuggestPhase; import org.elasticsearch.search.suggest.Suggester; import org.elasticsearch.search.suggest.Suggesters; @@ -301,6 +300,7 @@ public class SearchModule extends AbstractModule { multibinderPipelineAggParser.addBinding().to(MinBucketParser.class); multibinderPipelineAggParser.addBinding().to(AvgBucketParser.class); multibinderPipelineAggParser.addBinding().to(SumBucketParser.class); + multibinderPipelineAggParser.addBinding().to(PercentilesBucketParser.class); multibinderPipelineAggParser.addBinding().to(MovAvgParser.class); multibinderPipelineAggParser.addBinding().to(CumulativeSumParser.class); multibinderPipelineAggParser.addBinding().to(BucketScriptParser.class); @@ -393,6 +393,7 @@ public class SearchModule extends AbstractModule { MinBucketPipelineAggregator.registerStreams(); AvgBucketPipelineAggregator.registerStreams(); SumBucketPipelineAggregator.registerStreams(); + PercentilesBucketPipelineAggregator.registerStreams(); MovAvgPipelineAggregator.registerStreams(); CumulativeSumPipelineAggregator.registerStreams(); BucketScriptPipelineAggregator.registerStreams(); diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/PipelineAggregatorBuilders.java b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/PipelineAggregatorBuilders.java index 7fd1fe03308..96df702072d 100644 --- a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/PipelineAggregatorBuilders.java +++ b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/PipelineAggregatorBuilders.java @@ -22,6 +22,7 @@ package org.elasticsearch.search.aggregations.pipeline; import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.avg.AvgBucketBuilder; import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.max.MaxBucketBuilder; import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.min.MinBucketBuilder; +import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.percentile.PercentilesBucketBuilder; import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.sum.SumBucketBuilder; import org.elasticsearch.search.aggregations.pipeline.cumulativesum.CumulativeSumBuilder; import org.elasticsearch.search.aggregations.pipeline.bucketscript.BucketScriptBuilder; @@ -55,6 +56,10 @@ public final class PipelineAggregatorBuilders { return new SumBucketBuilder(name); } + public static final PercentilesBucketBuilder percentilesBucket(String name) { + return new PercentilesBucketBuilder(name); + } + public static final MovAvgBuilder movingAvg(String name) { return new MovAvgBuilder(name); } diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/BucketMetricsBuilder.java b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/BucketMetricsBuilder.java index 755655755e7..1e5dd46eca6 100644 --- a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/BucketMetricsBuilder.java +++ b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/BucketMetricsBuilder.java @@ -61,7 +61,7 @@ public abstract class BucketMetricsBuilder> ex return builder; } - protected void doInternalXContent(XContentBuilder builder, Params params) { + protected void doInternalXContent(XContentBuilder builder, Params params) throws IOException { } } \ No newline at end of file diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/BucketMetricsParser.java b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/BucketMetricsParser.java index 533b6996cda..80b4c981d12 100644 --- a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/BucketMetricsParser.java +++ b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/BucketMetricsParser.java @@ -105,7 +105,8 @@ public abstract class BucketMetricsParser implements PipelineAggregator.Parser { protected abstract PipelineAggregatorFactory buildFactory(String pipelineAggregatorName, String[] bucketsPaths, GapPolicy gapPolicy, ValueFormatter formatter); - protected boolean doParse(String pipelineAggregatorName, String currentFieldName, Token token, XContentParser parser, SearchContext context) { + protected boolean doParse(String pipelineAggregatorName, String currentFieldName, Token token, + XContentParser parser, SearchContext context) throws IOException { return false; } diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/percentile/InternalPercentilesBucket.java b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/percentile/InternalPercentilesBucket.java new file mode 100644 index 00000000000..10b1481e8d6 --- /dev/null +++ b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/percentile/InternalPercentilesBucket.java @@ -0,0 +1,163 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.search.aggregations.pipeline.bucketmetrics.percentile; + +import com.google.common.collect.UnmodifiableIterator; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.search.aggregations.AggregationStreams; +import org.elasticsearch.search.aggregations.InternalAggregation; +import org.elasticsearch.search.aggregations.metrics.InternalNumericMetricsAggregation; +import org.elasticsearch.search.aggregations.metrics.max.InternalMax; +import org.elasticsearch.search.aggregations.metrics.percentiles.InternalPercentile; +import org.elasticsearch.search.aggregations.metrics.percentiles.Percentile; +import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator; +import org.elasticsearch.search.aggregations.support.format.ValueFormatter; +import org.elasticsearch.search.aggregations.support.format.ValueFormatterStreams; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +public class InternalPercentilesBucket extends InternalNumericMetricsAggregation.MultiValue implements PercentilesBucket { + + public final static Type TYPE = new Type("percentiles_bucket"); + + public final static AggregationStreams.Stream STREAM = new AggregationStreams.Stream() { + @Override + public InternalPercentilesBucket readResult(StreamInput in) throws IOException { + InternalPercentilesBucket result = new InternalPercentilesBucket(); + result.readFrom(in); + return result; + } + }; + + public static void registerStreams() { + AggregationStreams.registerStream(STREAM, TYPE.stream()); + } + + private double[] percentiles; + private double[] percents; + + protected InternalPercentilesBucket() { + } // for serialization + + public InternalPercentilesBucket(String name, double[] percents, double[] percentiles, + ValueFormatter formatter, List pipelineAggregators, + Map metaData) { + super(name, pipelineAggregators, metaData); + this.valueFormatter = formatter; + this.percentiles = percentiles; + this.percents = percents; + } + + @Override + public double percentile(double percent) throws IllegalArgumentException { + int index = Arrays.binarySearch(percents, percent); + if (index < 0) { + throw new IllegalArgumentException("Percent requested [" + String.valueOf(percent) + "] was not" + + " one of the computed percentiles. Available keys are: " + Arrays.toString(percents)); + } + return percentiles[index]; + } + + @Override + public String percentileAsString(double percent) { + return valueFormatter.format(percentile(percent)); + } + + @Override + public Iterator iterator() { + return new Iter(percents, percentiles); + } + + @Override + public double value(String name) { + return percentile(Double.parseDouble(name)); + } + + @Override + public Type type() { + return TYPE; + } + + @Override + public InternalMax doReduce(List aggregations, ReduceContext reduceContext) { + throw new UnsupportedOperationException("Not supported"); + } + + @Override + protected void doReadFrom(StreamInput in) throws IOException { + valueFormatter = ValueFormatterStreams.readOptional(in); + percentiles = in.readDoubleArray(); + percents = in.readDoubleArray(); + } + + @Override + protected void doWriteTo(StreamOutput out) throws IOException { + ValueFormatterStreams.writeOptional(valueFormatter, out); + out.writeDoubleArray(percentiles); + out.writeDoubleArray(percents); + } + + @Override + public XContentBuilder doXContentBody(XContentBuilder builder, Params params) throws IOException { + builder.startObject("values"); + for (double percent : percents) { + double value = percentile(percent); + boolean hasValue = !(Double.isInfinite(value) || Double.isNaN(value)); + String key = String.valueOf(percent); + builder.field(key, hasValue ? value : null); + if (hasValue && !(valueFormatter instanceof ValueFormatter.Raw)) { + builder.field(key + "_as_string", percentileAsString(percent)); + } + } + builder.endObject(); + return builder; + } + + public static class Iter extends UnmodifiableIterator { + + private final double[] percents; + private final double[] percentiles; + private int i; + + public Iter(double[] percents, double[] percentiles) { + this.percents = percents; + this.percentiles = percentiles; + i = 0; + } + + @Override + public boolean hasNext() { + return i < percents.length; + } + + @Override + public Percentile next() { + final Percentile next = new InternalPercentile(percents[i], percentiles[i]); + ++i; + return next; + } + } +} diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/percentile/PercentilesBucket.java b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/percentile/PercentilesBucket.java new file mode 100644 index 00000000000..64424ac5abc --- /dev/null +++ b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/percentile/PercentilesBucket.java @@ -0,0 +1,25 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.search.aggregations.pipeline.bucketmetrics.percentile; + +import org.elasticsearch.search.aggregations.metrics.percentiles.Percentiles; + +public interface PercentilesBucket extends Percentiles { +} diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/percentile/PercentilesBucketBuilder.java b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/percentile/PercentilesBucketBuilder.java new file mode 100644 index 00000000000..53e90808f8a --- /dev/null +++ b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/percentile/PercentilesBucketBuilder.java @@ -0,0 +1,49 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.search.aggregations.pipeline.bucketmetrics.percentile; + +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.BucketMetricsBuilder; + +import java.io.IOException; + +public class PercentilesBucketBuilder extends BucketMetricsBuilder { + + Double[] percents; + + public PercentilesBucketBuilder(String name) { + super(name, PercentilesBucketPipelineAggregator.TYPE.name()); + } + + public PercentilesBucketBuilder percents(Double[] percents) { + this.percents = percents; + return this; + } + + @Override + protected void doInternalXContent(XContentBuilder builder, Params params) throws IOException { + if (percents != null) { + builder.field(PercentilesBucketParser.PERCENTS.getPreferredName(), percents); + } + } + + +} + diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/percentile/PercentilesBucketParser.java b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/percentile/PercentilesBucketParser.java new file mode 100644 index 00000000000..01a428873c2 --- /dev/null +++ b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/percentile/PercentilesBucketParser.java @@ -0,0 +1,67 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.search.aggregations.pipeline.bucketmetrics.percentile; + +import com.google.common.primitives.Doubles; +import org.elasticsearch.common.ParseField; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.search.aggregations.pipeline.PipelineAggregatorFactory; +import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.BucketMetricsParser; +import org.elasticsearch.search.aggregations.support.format.ValueFormatter; +import org.elasticsearch.search.internal.SearchContext; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import static org.elasticsearch.search.aggregations.pipeline.BucketHelpers.GapPolicy; + + +public class PercentilesBucketParser extends BucketMetricsParser { + + public static final ParseField PERCENTS = new ParseField("percents"); + double[] percents = new double[] { 1.0, 5.0, 25.0, 50.0, 75.0, 95.0, 99.0 }; + + @Override + public String type() { + return PercentilesBucketPipelineAggregator.TYPE.name(); + } + + @Override + protected PipelineAggregatorFactory buildFactory(String pipelineAggregatorName, String[] bucketsPaths, GapPolicy gapPolicy, + ValueFormatter formatter) { + return new PercentilesBucketPipelineAggregator.Factory(pipelineAggregatorName, bucketsPaths, gapPolicy, formatter, percents); + } + + @Override + protected boolean doParse(String pipelineAggregatorName, String currentFieldName, + XContentParser.Token token, XContentParser parser, SearchContext context) throws IOException { + if (context.parseFieldMatcher().match(currentFieldName, PERCENTS)) { + + List parsedPercents = new ArrayList<>(); + while ((token = parser.nextToken()) != XContentParser.Token.END_ARRAY) { + parsedPercents.add(parser.doubleValue()); + } + percents = Doubles.toArray(parsedPercents); + return true; + } + return false; + } +} diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/percentile/PercentilesBucketPipelineAggregator.java b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/percentile/PercentilesBucketPipelineAggregator.java new file mode 100644 index 00000000000..92e8b01f43b --- /dev/null +++ b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/percentile/PercentilesBucketPipelineAggregator.java @@ -0,0 +1,155 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.search.aggregations.pipeline.bucketmetrics.percentile; + +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.search.aggregations.AggregatorFactory; +import org.elasticsearch.search.aggregations.InternalAggregation; +import org.elasticsearch.search.aggregations.InternalAggregation.Type; +import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator; +import org.elasticsearch.search.aggregations.pipeline.PipelineAggregatorFactory; +import org.elasticsearch.search.aggregations.pipeline.PipelineAggregatorStreams; +import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.BucketMetricsPipelineAggregator; +import org.elasticsearch.search.aggregations.support.format.ValueFormatter; + +import java.io.IOException; +import java.util.*; + +import static org.elasticsearch.search.aggregations.pipeline.BucketHelpers.GapPolicy; + +public class PercentilesBucketPipelineAggregator extends BucketMetricsPipelineAggregator { + + public final static Type TYPE = new Type("percentiles_bucket"); + + public final static PipelineAggregatorStreams.Stream STREAM = new PipelineAggregatorStreams.Stream() { + @Override + public PercentilesBucketPipelineAggregator readResult(StreamInput in) throws IOException { + PercentilesBucketPipelineAggregator result = new PercentilesBucketPipelineAggregator(); + result.readFrom(in); + return result; + } + }; + + public static void registerStreams() { + PipelineAggregatorStreams.registerStream(STREAM, TYPE.stream()); + InternalPercentilesBucket.registerStreams(); + } + + private double[] percents; + private List data; + + private PercentilesBucketPipelineAggregator() { + } + + protected PercentilesBucketPipelineAggregator(String name, double[] percents, String[] bucketsPaths, GapPolicy gapPolicy, + ValueFormatter formatter, Map metaData) { + super(name, bucketsPaths, gapPolicy, formatter, metaData); + this.percents = percents; + } + + @Override + public Type type() { + return TYPE; + } + + @Override + protected void preCollection() { + data = new ArrayList<>(1024); + } + + @Override + protected void collectBucketValue(String bucketKey, Double bucketValue) { + data.add(bucketValue); + } + + @Override + protected InternalAggregation buildAggregation(List pipelineAggregators, Map metadata) { + + // Perform the sorting and percentile collection now that all the data + // has been collected. + Collections.sort(data); + + double[] percentiles = new double[percents.length]; + if (data.size() == 0) { + for (int i = 0; i < percents.length; i++) { + percentiles[i] = Double.NaN; + } + } else { + for (int i = 0; i < percents.length; i++) { + int index = (int)((percents[i] / 100.0) * data.size()); + percentiles[i] = data.get(index); + } + } + + // todo need postCollection() to clean up temp sorted data? + + return new InternalPercentilesBucket(name(), percents, percentiles, formatter, pipelineAggregators, metadata); + } + + @Override + public void doReadFrom(StreamInput in) throws IOException { + super.doReadFrom(in); + percents = in.readDoubleArray(); + } + + @Override + public void doWriteTo(StreamOutput out) throws IOException { + super.doWriteTo(out); + out.writeDoubleArray(percents); + } + + public static class Factory extends PipelineAggregatorFactory { + + private final ValueFormatter formatter; + private final GapPolicy gapPolicy; + private final double[] percents; + + public Factory(String name, String[] bucketsPaths, GapPolicy gapPolicy, ValueFormatter formatter, double[] percents) { + super(name, TYPE.name(), bucketsPaths); + this.gapPolicy = gapPolicy; + this.formatter = formatter; + this.percents = percents; + } + + @Override + protected PipelineAggregator createInternal(Map metaData) throws IOException { + return new PercentilesBucketPipelineAggregator(name, percents, bucketsPaths, gapPolicy, formatter, metaData); + } + + @Override + public void doValidate(AggregatorFactory parent, AggregatorFactory[] aggFactories, + List pipelineAggregatorFactories) { + if (bucketsPaths.length != 1) { + throw new IllegalStateException(PipelineAggregator.Parser.BUCKETS_PATH.getPreferredName() + + " must contain a single entry for aggregation [" + name + "]"); + } + + for (Double p : percents) { + if (p == null || p < 0.0 || p > 100.0) { + throw new IllegalStateException(PercentilesBucketParser.PERCENTS.getPreferredName() + + " must only contain non-null doubles from 0.0-100.0 inclusive"); + } + } + } + + } + +} diff --git a/core/src/test/java/org/elasticsearch/search/aggregations/pipeline/PercentilesBucketIT.java b/core/src/test/java/org/elasticsearch/search/aggregations/pipeline/PercentilesBucketIT.java new file mode 100644 index 00000000000..507939e6858 --- /dev/null +++ b/core/src/test/java/org/elasticsearch/search/aggregations/pipeline/PercentilesBucketIT.java @@ -0,0 +1,625 @@ +package org.elasticsearch.search.aggregations.pipeline; + +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import org.elasticsearch.action.index.IndexRequestBuilder; +import org.elasticsearch.action.search.SearchPhaseExecutionException; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.search.SearchParseException; +import org.elasticsearch.search.aggregations.bucket.histogram.Histogram; +import org.elasticsearch.search.aggregations.bucket.terms.Terms; +import org.elasticsearch.search.aggregations.metrics.percentiles.Percentile; +import org.elasticsearch.search.aggregations.metrics.sum.Sum; +import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.percentile.PercentilesBucket; +import org.elasticsearch.test.ESIntegTestCase; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; +import static org.elasticsearch.search.aggregations.AggregationBuilders.histogram; +import static org.elasticsearch.search.aggregations.AggregationBuilders.sum; +import static org.elasticsearch.search.aggregations.AggregationBuilders.terms; +import static org.elasticsearch.search.aggregations.pipeline.PipelineAggregatorBuilders.percentilesBucket; +import static org.elasticsearch.search.aggregations.pipeline.PipelineAggregatorBuilders.sumBucket; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertSearchResponse; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.core.IsNull.notNullValue; + +@ESIntegTestCase.SuiteScopeTestCase +public class PercentilesBucketIT extends ESIntegTestCase { + + private static final String SINGLE_VALUED_FIELD_NAME = "l_value"; + private static final Double[] PERCENTS = {1.0, 25.0, 50.0, 75.0, 99.0}; + static int numDocs; + static int interval; + static int minRandomValue; + static int maxRandomValue; + static int numValueBuckets; + static long[] valueCounts; + + @Override + public void setupSuiteScopeCluster() throws Exception { + createIndex("idx"); + createIndex("idx_unmapped"); + + numDocs = randomIntBetween(6, 20); + interval = randomIntBetween(2, 5); + + minRandomValue = 0; + maxRandomValue = 20; + + numValueBuckets = ((maxRandomValue - minRandomValue) / interval) + 1; + valueCounts = new long[numValueBuckets]; + + List builders = new ArrayList<>(); + + for (int i = 0; i < numDocs; i++) { + int fieldValue = randomIntBetween(minRandomValue, maxRandomValue); + builders.add(client().prepareIndex("idx", "type").setSource( + jsonBuilder().startObject().field(SINGLE_VALUED_FIELD_NAME, fieldValue).field("tag", "tag" + (i % interval)) + .endObject())); + final int bucket = (fieldValue / interval); // + (fieldValue < 0 ? -1 : 0) - (minRandomValue / interval - 1); + valueCounts[bucket]++; + } + + assertAcked(prepareCreate("empty_bucket_idx").addMapping("type", SINGLE_VALUED_FIELD_NAME, "type=integer")); + for (int i = 0; i < 2; i++) { + builders.add(client().prepareIndex("empty_bucket_idx", "type", "" + i).setSource( + jsonBuilder().startObject().field(SINGLE_VALUED_FIELD_NAME, i * 2).endObject())); + } + indexRandom(true, builders); + ensureSearchable(); + } + + @Test + public void testDocCount_topLevel() throws Exception { + SearchResponse response = client().prepareSearch("idx") + .addAggregation(histogram("histo").field(SINGLE_VALUED_FIELD_NAME).interval(interval) + .extendedBounds((long) minRandomValue, (long) maxRandomValue)) + .addAggregation(percentilesBucket("percentiles_bucket") + .setBucketsPaths("histo>_count") + .percents(PERCENTS)).execute().actionGet(); + + assertSearchResponse(response); + + Histogram histo = response.getAggregations().get("histo"); + assertThat(histo, notNullValue()); + assertThat(histo.getName(), equalTo("histo")); + List buckets = histo.getBuckets(); + assertThat(buckets.size(), equalTo(numValueBuckets)); + + double[] values = new double[numValueBuckets]; + for (int i = 0; i < numValueBuckets; ++i) { + Histogram.Bucket bucket = buckets.get(i); + assertThat(bucket, notNullValue()); + assertThat(((Number) bucket.getKey()).longValue(), equalTo((long) i * interval)); + assertThat(bucket.getDocCount(), equalTo(valueCounts[i])); + values[i] = bucket.getDocCount(); + } + + Arrays.sort(values); + + PercentilesBucket percentilesBucketValue = response.getAggregations().get("percentiles_bucket"); + assertThat(percentilesBucketValue, notNullValue()); + assertThat(percentilesBucketValue.getName(), equalTo("percentiles_bucket")); + for (Double p : PERCENTS) { + double expected = values[(int)((p / 100) * values.length)]; + assertThat(percentilesBucketValue.percentile(p), equalTo(expected)); + } + + } + + @Test + public void testDocCount_asSubAgg() throws Exception { + SearchResponse response = client() + .prepareSearch("idx") + .addAggregation( + terms("terms") + .field("tag") + .order(Terms.Order.term(true)) + .subAggregation( + histogram("histo").field(SINGLE_VALUED_FIELD_NAME).interval(interval) + .extendedBounds((long) minRandomValue, (long) maxRandomValue)) + .subAggregation(percentilesBucket("percentiles_bucket") + .setBucketsPaths("histo>_count") + .percents(PERCENTS))).execute().actionGet(); + + assertSearchResponse(response); + + Terms terms = response.getAggregations().get("terms"); + assertThat(terms, notNullValue()); + assertThat(terms.getName(), equalTo("terms")); + List termsBuckets = terms.getBuckets(); + assertThat(termsBuckets.size(), equalTo(interval)); + + for (int i = 0; i < interval; ++i) { + Terms.Bucket termsBucket = termsBuckets.get(i); + assertThat(termsBucket, notNullValue()); + assertThat((String) termsBucket.getKey(), equalTo("tag" + (i % interval))); + + Histogram histo = termsBucket.getAggregations().get("histo"); + assertThat(histo, notNullValue()); + assertThat(histo.getName(), equalTo("histo")); + List buckets = histo.getBuckets(); + + double[] values = new double[numValueBuckets]; + for (int j = 0; j < numValueBuckets; ++j) { + Histogram.Bucket bucket = buckets.get(j); + assertThat(bucket, notNullValue()); + assertThat(((Number) bucket.getKey()).longValue(), equalTo((long) j * interval)); + values[j] = bucket.getDocCount(); + } + + Arrays.sort(values); + + PercentilesBucket percentilesBucketValue = termsBucket.getAggregations().get("percentiles_bucket"); + assertThat(percentilesBucketValue, notNullValue()); + assertThat(percentilesBucketValue.getName(), equalTo("percentiles_bucket")); + for (Double p : PERCENTS) { + double expected = values[(int)((p / 100) * values.length)]; + assertThat(percentilesBucketValue.percentile(p), equalTo(expected)); + } + } + } + + @Test + public void testMetric_topLevel() throws Exception { + SearchResponse response = client() + .prepareSearch("idx") + .addAggregation(terms("terms").field("tag").subAggregation(sum("sum").field(SINGLE_VALUED_FIELD_NAME))) + .addAggregation(percentilesBucket("percentiles_bucket") + .setBucketsPaths("terms>sum") + .percents(PERCENTS)).execute().actionGet(); + + assertSearchResponse(response); + + Terms terms = response.getAggregations().get("terms"); + assertThat(terms, notNullValue()); + assertThat(terms.getName(), equalTo("terms")); + List buckets = terms.getBuckets(); + assertThat(buckets.size(), equalTo(interval)); + + double[] values = new double[interval]; + for (int i = 0; i < interval; ++i) { + Terms.Bucket bucket = buckets.get(i); + assertThat(bucket, notNullValue()); + assertThat((String) bucket.getKey(), equalTo("tag" + (i % interval))); + assertThat(bucket.getDocCount(), greaterThan(0l)); + Sum sum = bucket.getAggregations().get("sum"); + assertThat(sum, notNullValue()); + values[i] = sum.value(); + } + + Arrays.sort(values); + + PercentilesBucket percentilesBucketValue = response.getAggregations().get("percentiles_bucket"); + assertThat(percentilesBucketValue, notNullValue()); + assertThat(percentilesBucketValue.getName(), equalTo("percentiles_bucket")); + for (Double p : PERCENTS) { + double expected = values[(int)((p / 100) * values.length)]; + assertThat(percentilesBucketValue.percentile(p), equalTo(expected)); + } + } + + @Test + public void testMetric_topLevelDefaultPercents() throws Exception { + SearchResponse response = client() + .prepareSearch("idx") + .addAggregation(terms("terms").field("tag").subAggregation(sum("sum").field(SINGLE_VALUED_FIELD_NAME))) + .addAggregation(percentilesBucket("percentiles_bucket") + .setBucketsPaths("terms>sum")).execute().actionGet(); + + assertSearchResponse(response); + + Terms terms = response.getAggregations().get("terms"); + assertThat(terms, notNullValue()); + assertThat(terms.getName(), equalTo("terms")); + List buckets = terms.getBuckets(); + assertThat(buckets.size(), equalTo(interval)); + + double[] values = new double[interval]; + for (int i = 0; i < interval; ++i) { + Terms.Bucket bucket = buckets.get(i); + assertThat(bucket, notNullValue()); + assertThat((String) bucket.getKey(), equalTo("tag" + (i % interval))); + assertThat(bucket.getDocCount(), greaterThan(0l)); + Sum sum = bucket.getAggregations().get("sum"); + assertThat(sum, notNullValue()); + values[i] = sum.value(); + } + + Arrays.sort(values); + + PercentilesBucket percentilesBucketValue = response.getAggregations().get("percentiles_bucket"); + assertThat(percentilesBucketValue, notNullValue()); + assertThat(percentilesBucketValue.getName(), equalTo("percentiles_bucket")); + for (Percentile p : percentilesBucketValue) { + double expected = values[(int)((p.getPercent() / 100) * values.length)]; + assertThat(percentilesBucketValue.percentile(p.getPercent()), equalTo(expected)); + assertThat(p.getValue(), equalTo(expected)); + } + } + + @Test + public void testMetric_asSubAgg() throws Exception { + SearchResponse response = client() + .prepareSearch("idx") + .addAggregation( + terms("terms") + .field("tag") + .order(Terms.Order.term(true)) + .subAggregation( + histogram("histo").field(SINGLE_VALUED_FIELD_NAME).interval(interval) + .extendedBounds((long) minRandomValue, (long) maxRandomValue) + .subAggregation(sum("sum").field(SINGLE_VALUED_FIELD_NAME))) + .subAggregation(percentilesBucket("percentiles_bucket") + .setBucketsPaths("histo>sum") + .percents(PERCENTS))).execute().actionGet(); + + assertSearchResponse(response); + + Terms terms = response.getAggregations().get("terms"); + assertThat(terms, notNullValue()); + assertThat(terms.getName(), equalTo("terms")); + List termsBuckets = terms.getBuckets(); + assertThat(termsBuckets.size(), equalTo(interval)); + + for (int i = 0; i < interval; ++i) { + Terms.Bucket termsBucket = termsBuckets.get(i); + assertThat(termsBucket, notNullValue()); + assertThat((String) termsBucket.getKey(), equalTo("tag" + (i % interval))); + + Histogram histo = termsBucket.getAggregations().get("histo"); + assertThat(histo, notNullValue()); + assertThat(histo.getName(), equalTo("histo")); + List buckets = histo.getBuckets(); + + List values = new ArrayList<>(numValueBuckets); + for (int j = 0; j < numValueBuckets; ++j) { + Histogram.Bucket bucket = buckets.get(j); + assertThat(bucket, notNullValue()); + assertThat(((Number) bucket.getKey()).longValue(), equalTo((long) j * interval)); + if (bucket.getDocCount() != 0) { + Sum sum = bucket.getAggregations().get("sum"); + assertThat(sum, notNullValue()); + values.add(sum.value()); + } + } + + Collections.sort(values); + + PercentilesBucket percentilesBucketValue = termsBucket.getAggregations().get("percentiles_bucket"); + assertThat(percentilesBucketValue, notNullValue()); + assertThat(percentilesBucketValue.getName(), equalTo("percentiles_bucket")); + for (Double p : PERCENTS) { + double expected = values.get((int) ((p / 100) * values.size())); + assertThat(percentilesBucketValue.percentile(p), equalTo(expected)); + } + } + } + + @Test + public void testMetric_asSubAggWithInsertZeros() throws Exception { + SearchResponse response = client() + .prepareSearch("idx") + .addAggregation( + terms("terms") + .field("tag") + .order(Terms.Order.term(true)) + .subAggregation( + histogram("histo").field(SINGLE_VALUED_FIELD_NAME).interval(interval) + .extendedBounds((long) minRandomValue, (long) maxRandomValue) + .subAggregation(sum("sum").field(SINGLE_VALUED_FIELD_NAME))) + .subAggregation(percentilesBucket("percentiles_bucket") + .setBucketsPaths("histo>sum") + .gapPolicy(BucketHelpers.GapPolicy.INSERT_ZEROS) + .percents(PERCENTS))) + .execute().actionGet(); + + assertSearchResponse(response); + + Terms terms = response.getAggregations().get("terms"); + assertThat(terms, notNullValue()); + assertThat(terms.getName(), equalTo("terms")); + List termsBuckets = terms.getBuckets(); + assertThat(termsBuckets.size(), equalTo(interval)); + + for (int i = 0; i < interval; ++i) { + Terms.Bucket termsBucket = termsBuckets.get(i); + assertThat(termsBucket, notNullValue()); + assertThat((String) termsBucket.getKey(), equalTo("tag" + (i % interval))); + + Histogram histo = termsBucket.getAggregations().get("histo"); + assertThat(histo, notNullValue()); + assertThat(histo.getName(), equalTo("histo")); + List buckets = histo.getBuckets(); + + double[] values = new double[numValueBuckets]; + for (int j = 0; j < numValueBuckets; ++j) { + Histogram.Bucket bucket = buckets.get(j); + assertThat(bucket, notNullValue()); + assertThat(((Number) bucket.getKey()).longValue(), equalTo((long) j * interval)); + Sum sum = bucket.getAggregations().get("sum"); + assertThat(sum, notNullValue()); + + values[j] = sum.value(); + } + + Arrays.sort(values); + + PercentilesBucket percentilesBucketValue = termsBucket.getAggregations().get("percentiles_bucket"); + assertThat(percentilesBucketValue, notNullValue()); + assertThat(percentilesBucketValue.getName(), equalTo("percentiles_bucket")); + for (Double p : PERCENTS) { + double expected = values[(int)((p / 100) * values.length)]; + assertThat(percentilesBucketValue.percentile(p), equalTo(expected)); + } + } + } + + @Test + public void testNoBuckets() throws Exception { + SearchResponse response = client().prepareSearch("idx") + .addAggregation(terms("terms").field("tag").exclude("tag.*").subAggregation(sum("sum").field(SINGLE_VALUED_FIELD_NAME))) + .addAggregation(percentilesBucket("percentiles_bucket") + .setBucketsPaths("terms>sum") + .percents(PERCENTS)).execute().actionGet(); + + assertSearchResponse(response); + + Terms terms = response.getAggregations().get("terms"); + assertThat(terms, notNullValue()); + assertThat(terms.getName(), equalTo("terms")); + List buckets = terms.getBuckets(); + assertThat(buckets.size(), equalTo(0)); + + PercentilesBucket percentilesBucketValue = response.getAggregations().get("percentiles_bucket"); + assertThat(percentilesBucketValue, notNullValue()); + assertThat(percentilesBucketValue.getName(), equalTo("percentiles_bucket")); + for (Double p : PERCENTS) { + assertThat(percentilesBucketValue.percentile(p), equalTo(Double.NaN)); + } + } + + @Test + public void testWrongPercents() throws Exception { + SearchResponse response = client().prepareSearch("idx") + .addAggregation(terms("terms").field("tag").exclude("tag.*").subAggregation(sum("sum").field(SINGLE_VALUED_FIELD_NAME))) + .addAggregation(percentilesBucket("percentiles_bucket") + .setBucketsPaths("terms>sum") + .percents(PERCENTS)).execute().actionGet(); + + assertSearchResponse(response); + + Terms terms = response.getAggregations().get("terms"); + assertThat(terms, notNullValue()); + assertThat(terms.getName(), equalTo("terms")); + List buckets = terms.getBuckets(); + assertThat(buckets.size(), equalTo(0)); + + PercentilesBucket percentilesBucketValue = response.getAggregations().get("percentiles_bucket"); + assertThat(percentilesBucketValue, notNullValue()); + assertThat(percentilesBucketValue.getName(), equalTo("percentiles_bucket")); + + try { + percentilesBucketValue.percentile(2.0); + fail("2.0 was not a valid percent, should have thrown exception"); + } catch (IllegalArgumentException exception) { + // All good + } + } + + @Test + @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/13179") + public void testBadPercents() throws Exception { + Double[] badPercents = {-1.0, 110.0}; + + try { + SearchResponse response = client().prepareSearch("idx") + .addAggregation(terms("terms").field("tag").subAggregation(sum("sum").field(SINGLE_VALUED_FIELD_NAME))) + .addAggregation(percentilesBucket("percentiles_bucket") + .setBucketsPaths("terms>sum") + .percents(badPercents)).execute().actionGet(); + + assertSearchResponse(response); + + Terms terms = response.getAggregations().get("terms"); + assertThat(terms, notNullValue()); + assertThat(terms.getName(), equalTo("terms")); + List buckets = terms.getBuckets(); + assertThat(buckets.size(), equalTo(0)); + + PercentilesBucket percentilesBucketValue = response.getAggregations().get("percentiles_bucket"); + + fail("Illegal percent's were provided but no exception was thrown."); + } catch (SearchPhaseExecutionException exception) { + // All good + } + + } + + @Test + public void testBadPercents_asSubAgg() throws Exception { + Double[] badPercents = {-1.0, 110.0}; + + try { + SearchResponse response = client() + .prepareSearch("idx") + .addAggregation( + terms("terms") + .field("tag") + .order(Terms.Order.term(true)) + .subAggregation( + histogram("histo").field(SINGLE_VALUED_FIELD_NAME).interval(interval) + .extendedBounds((long) minRandomValue, (long) maxRandomValue)) + .subAggregation(percentilesBucket("percentiles_bucket") + .setBucketsPaths("histo>_count") + .percents(badPercents))).execute().actionGet(); + + PercentilesBucket percentilesBucketValue = response.getAggregations().get("percentiles_bucket"); + + fail("Illegal percent's were provided but no exception was thrown."); + } catch (SearchPhaseExecutionException exception) { + // All good + } + + } + + @Test + public void testNested() throws Exception { + SearchResponse response = client() + .prepareSearch("idx") + .addAggregation( + terms("terms") + .field("tag") + .order(Terms.Order.term(true)) + .subAggregation( + histogram("histo").field(SINGLE_VALUED_FIELD_NAME).interval(interval) + .extendedBounds((long) minRandomValue, (long) maxRandomValue)) + .subAggregation(percentilesBucket("percentile_histo_bucket").setBucketsPaths("histo>_count"))) + .addAggregation(percentilesBucket("percentile_terms_bucket") + .setBucketsPaths("terms>percentile_histo_bucket.50") + .percents(PERCENTS)).execute().actionGet(); + + assertSearchResponse(response); + + Terms terms = response.getAggregations().get("terms"); + assertThat(terms, notNullValue()); + assertThat(terms.getName(), equalTo("terms")); + List termsBuckets = terms.getBuckets(); + assertThat(termsBuckets.size(), equalTo(interval)); + + double[] values = new double[termsBuckets.size()]; + for (int i = 0; i < interval; ++i) { + Terms.Bucket termsBucket = termsBuckets.get(i); + assertThat(termsBucket, notNullValue()); + assertThat((String) termsBucket.getKey(), equalTo("tag" + (i % interval))); + + Histogram histo = termsBucket.getAggregations().get("histo"); + assertThat(histo, notNullValue()); + assertThat(histo.getName(), equalTo("histo")); + List buckets = histo.getBuckets(); + + double[] innerValues = new double[numValueBuckets]; + for (int j = 0; j < numValueBuckets; ++j) { + Histogram.Bucket bucket = buckets.get(j); + assertThat(bucket, notNullValue()); + assertThat(((Number) bucket.getKey()).longValue(), equalTo((long) j * interval)); + + innerValues[j] = bucket.getDocCount(); + } + Arrays.sort(innerValues); + + PercentilesBucket percentilesBucketValue = termsBucket.getAggregations().get("percentile_histo_bucket"); + assertThat(percentilesBucketValue, notNullValue()); + assertThat(percentilesBucketValue.getName(), equalTo("percentile_histo_bucket")); + for (Double p : PERCENTS) { + double expected = innerValues[(int)((p / 100) * innerValues.length)]; + assertThat(percentilesBucketValue.percentile(p), equalTo(expected)); + } + values[i] = percentilesBucketValue.percentile(50.0); + } + + Arrays.sort(values); + + PercentilesBucket percentilesBucketValue = response.getAggregations().get("percentile_terms_bucket"); + assertThat(percentilesBucketValue, notNullValue()); + assertThat(percentilesBucketValue.getName(), equalTo("percentile_terms_bucket")); + for (Double p : PERCENTS) { + double expected = values[(int)((p / 100) * values.length)]; + assertThat(percentilesBucketValue.percentile(p), equalTo(expected)); + } + } + + @Test + public void testNestedWithDecimal() throws Exception { + Double[] percent = {99.9}; + SearchResponse response = client() + .prepareSearch("idx") + .addAggregation( + terms("terms") + .field("tag") + .order(Terms.Order.term(true)) + .subAggregation( + histogram("histo").field(SINGLE_VALUED_FIELD_NAME).interval(interval) + .extendedBounds((long) minRandomValue, (long) maxRandomValue)) + .subAggregation(percentilesBucket("percentile_histo_bucket") + .percents(percent) + .setBucketsPaths("histo>_count"))) + .addAggregation(percentilesBucket("percentile_terms_bucket") + .setBucketsPaths("terms>percentile_histo_bucket[99.9]") + .percents(percent)).execute().actionGet(); + + assertSearchResponse(response); + + Terms terms = response.getAggregations().get("terms"); + assertThat(terms, notNullValue()); + assertThat(terms.getName(), equalTo("terms")); + List termsBuckets = terms.getBuckets(); + assertThat(termsBuckets.size(), equalTo(interval)); + + double[] values = new double[termsBuckets.size()]; + for (int i = 0; i < interval; ++i) { + Terms.Bucket termsBucket = termsBuckets.get(i); + assertThat(termsBucket, notNullValue()); + assertThat((String) termsBucket.getKey(), equalTo("tag" + (i % interval))); + + Histogram histo = termsBucket.getAggregations().get("histo"); + assertThat(histo, notNullValue()); + assertThat(histo.getName(), equalTo("histo")); + List buckets = histo.getBuckets(); + + double[] innerValues = new double[numValueBuckets]; + for (int j = 0; j < numValueBuckets; ++j) { + Histogram.Bucket bucket = buckets.get(j); + assertThat(bucket, notNullValue()); + assertThat(((Number) bucket.getKey()).longValue(), equalTo((long) j * interval)); + + innerValues[j] = bucket.getDocCount(); + } + Arrays.sort(innerValues); + + PercentilesBucket percentilesBucketValue = termsBucket.getAggregations().get("percentile_histo_bucket"); + assertThat(percentilesBucketValue, notNullValue()); + assertThat(percentilesBucketValue.getName(), equalTo("percentile_histo_bucket")); + for (Double p : percent) { + double expected = innerValues[(int)((p / 100) * innerValues.length)]; + assertThat(percentilesBucketValue.percentile(p), equalTo(expected)); + } + values[i] = percentilesBucketValue.percentile(99.9); + } + + Arrays.sort(values); + + PercentilesBucket percentilesBucketValue = response.getAggregations().get("percentile_terms_bucket"); + assertThat(percentilesBucketValue, notNullValue()); + assertThat(percentilesBucketValue.getName(), equalTo("percentile_terms_bucket")); + for (Double p : percent) { + double expected = values[(int)((p / 100) * values.length)]; + assertThat(percentilesBucketValue.percentile(p), equalTo(expected)); + } + } +} diff --git a/docs/reference/aggregations/pipeline.asciidoc b/docs/reference/aggregations/pipeline.asciidoc index 670ed6266b0..b6a1073156a 100644 --- a/docs/reference/aggregations/pipeline.asciidoc +++ b/docs/reference/aggregations/pipeline.asciidoc @@ -163,6 +163,7 @@ include::pipeline/derivative-aggregation.asciidoc[] include::pipeline/max-bucket-aggregation.asciidoc[] include::pipeline/min-bucket-aggregation.asciidoc[] include::pipeline/sum-bucket-aggregation.asciidoc[] +include::pipeline/percentiles-bucket-aggregation.asciidoc[] include::pipeline/movavg-aggregation.asciidoc[] include::pipeline/cumulative-sum-aggregation.asciidoc[] include::pipeline/bucket-script-aggregation.asciidoc[] diff --git a/docs/reference/aggregations/pipeline/percentiles-bucket-aggregation.asciidoc b/docs/reference/aggregations/pipeline/percentiles-bucket-aggregation.asciidoc new file mode 100644 index 00000000000..3e3c6d6fcff --- /dev/null +++ b/docs/reference/aggregations/pipeline/percentiles-bucket-aggregation.asciidoc @@ -0,0 +1,121 @@ +[[search-aggregations-pipeline-percentiles-bucket-aggregation]] +=== Percentiles Bucket Aggregation + +coming[2.1.0] + +experimental[] + +A sibling pipeline aggregation which calculates percentiles across all bucket of a specified metric in a sibling aggregation. +The specified metric must be numeric and the sibling aggregation must be a multi-bucket aggregation. + +==== Syntax + +A `percentiles_bucket` aggregation looks like this in isolation: + +[source,js] +-------------------------------------------------- +{ + "percentiles_bucket": { + "buckets_path": "the_sum" + } +} +-------------------------------------------------- + +.`sum_bucket` Parameters +|=== +|Parameter Name |Description |Required |Default Value +|`buckets_path` |The path to the buckets we wish to find the sum for (see <> for more + details) |Required | +|`gap_policy` |The policy to apply when gaps are found in the data (see <> for more + details)|Optional | `skip` +|`format` |format to apply to the output value of this aggregation |Optional | `null` +|`percents` |The list of percentiles to calculate |Optional | `[ 1, 5, 25, 50, 75, 95, 99 ]` +|=== + +The following snippet calculates the sum of all the total monthly `sales` buckets: + +[source,js] +-------------------------------------------------- +{ + "aggs" : { + "sales_per_month" : { + "date_histogram" : { + "field" : "date", + "interval" : "month" + }, + "aggs": { + "sales": { + "sum": { + "field": "price" + } + } + } + }, + "sum_monthly_sales": { + "percentiles_bucket": { + "buckets_paths": "sales_per_month>sales", <1> + "percents": [ 25.0, 50.0, 75.0 ] <2> + } + } + } +} +-------------------------------------------------- +<1> `bucket_paths` instructs this percentiles_bucket aggregation that we want to calculate percentiles for +the `sales` aggregation in the `sales_per_month` date histogram. +<2> `percents` specifies which percentiles we wish to calculate, in this case, the 25th, 50th and 75th percentil + +And the following may be the response: + +[source,js] +-------------------------------------------------- +{ + "aggregations": { + "sales_per_month": { + "buckets": [ + { + "key_as_string": "2015/01/01 00:00:00", + "key": 1420070400000, + "doc_count": 3, + "sales": { + "value": 550 + } + }, + { + "key_as_string": "2015/02/01 00:00:00", + "key": 1422748800000, + "doc_count": 2, + "sales": { + "value": 60 + } + }, + { + "key_as_string": "2015/03/01 00:00:00", + "key": 1425168000000, + "doc_count": 2, + "sales": { + "value": 375 + } + } + ] + }, + "percentiles_monthly_sales": { + "values" : { + "25.0": 60, + "50.0": 375", + "75.0": 550 + } + } + } +} +-------------------------------------------------- + + +==== Percentiles_bucket implementation + +The Percentile Bucket returns the nearest input data point that is not greater than the requested percentile; it does not +interpolate between data points. + +The percentiles are calculated exactly and is not an approximation (unlike the Percentiles Metric). This means +the implementation maintains an in-memory, sorted list of your data to compute the percentiles, before discarding the +data. You may run into memory pressure issues if you attempt to calculate percentiles over many millions of +data-points in a single `percentiles_bucket`. \ No newline at end of file