diff --git a/core/src/main/java/org/elasticsearch/search/SearchModule.java b/core/src/main/java/org/elasticsearch/search/SearchModule.java index 49acd1163fc..b78fe06f8e9 100644 --- a/core/src/main/java/org/elasticsearch/search/SearchModule.java +++ b/core/src/main/java/org/elasticsearch/search/SearchModule.java @@ -19,7 +19,6 @@ package org.elasticsearch.search; -import org.elasticsearch.common.Classes; import org.elasticsearch.common.inject.AbstractModule; import org.elasticsearch.common.inject.multibindings.Multibinder; import org.elasticsearch.common.settings.Settings; @@ -110,6 +109,8 @@ import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.max.MaxBucke import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.max.MaxBucketPipelineAggregator; import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.min.MinBucketParser; import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.min.MinBucketPipelineAggregator; +import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.percentile.PercentilesBucketParser; +import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.percentile.PercentilesBucketPipelineAggregator; import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.sum.SumBucketParser; import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.sum.SumBucketPipelineAggregator; import org.elasticsearch.search.aggregations.pipeline.bucketscript.BucketScriptParser; @@ -143,8 +144,6 @@ import org.elasticsearch.search.highlight.HighlightPhase; import org.elasticsearch.search.highlight.Highlighter; import org.elasticsearch.search.highlight.Highlighters; import org.elasticsearch.search.query.QueryPhase; -import org.elasticsearch.search.suggest.SuggestParseElement; -import org.elasticsearch.search.suggest.SuggestPhase; import org.elasticsearch.search.suggest.Suggester; import org.elasticsearch.search.suggest.Suggesters; @@ -301,6 +300,7 @@ public class SearchModule extends AbstractModule { 
multibinderPipelineAggParser.addBinding().to(MinBucketParser.class); multibinderPipelineAggParser.addBinding().to(AvgBucketParser.class); multibinderPipelineAggParser.addBinding().to(SumBucketParser.class); + multibinderPipelineAggParser.addBinding().to(PercentilesBucketParser.class); multibinderPipelineAggParser.addBinding().to(MovAvgParser.class); multibinderPipelineAggParser.addBinding().to(CumulativeSumParser.class); multibinderPipelineAggParser.addBinding().to(BucketScriptParser.class); @@ -393,6 +393,7 @@ public class SearchModule extends AbstractModule { MinBucketPipelineAggregator.registerStreams(); AvgBucketPipelineAggregator.registerStreams(); SumBucketPipelineAggregator.registerStreams(); + PercentilesBucketPipelineAggregator.registerStreams(); MovAvgPipelineAggregator.registerStreams(); CumulativeSumPipelineAggregator.registerStreams(); BucketScriptPipelineAggregator.registerStreams(); diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/PipelineAggregatorBuilders.java b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/PipelineAggregatorBuilders.java index 7fd1fe03308..96df702072d 100644 --- a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/PipelineAggregatorBuilders.java +++ b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/PipelineAggregatorBuilders.java @@ -22,6 +22,7 @@ package org.elasticsearch.search.aggregations.pipeline; import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.avg.AvgBucketBuilder; import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.max.MaxBucketBuilder; import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.min.MinBucketBuilder; +import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.percentile.PercentilesBucketBuilder; import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.sum.SumBucketBuilder; import org.elasticsearch.search.aggregations.pipeline.cumulativesum.CumulativeSumBuilder; 
import org.elasticsearch.search.aggregations.pipeline.bucketscript.BucketScriptBuilder; @@ -55,6 +56,10 @@ public final class PipelineAggregatorBuilders { return new SumBucketBuilder(name); } + public static final PercentilesBucketBuilder percentilesBucket(String name) { + return new PercentilesBucketBuilder(name); + } + public static final MovAvgBuilder movingAvg(String name) { return new MovAvgBuilder(name); } diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/BucketMetricsBuilder.java b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/BucketMetricsBuilder.java index 755655755e7..1e5dd46eca6 100644 --- a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/BucketMetricsBuilder.java +++ b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/BucketMetricsBuilder.java @@ -61,7 +61,7 @@ public abstract class BucketMetricsBuilder> ex return builder; } - protected void doInternalXContent(XContentBuilder builder, Params params) { + protected void doInternalXContent(XContentBuilder builder, Params params) throws IOException { } } \ No newline at end of file diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/BucketMetricsParser.java b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/BucketMetricsParser.java index 533b6996cda..80b4c981d12 100644 --- a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/BucketMetricsParser.java +++ b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/BucketMetricsParser.java @@ -105,7 +105,8 @@ public abstract class BucketMetricsParser implements PipelineAggregator.Parser { protected abstract PipelineAggregatorFactory buildFactory(String pipelineAggregatorName, String[] bucketsPaths, GapPolicy gapPolicy, ValueFormatter formatter); - protected boolean doParse(String pipelineAggregatorName, 
String currentFieldName, Token token, XContentParser parser, SearchContext context) { + protected boolean doParse(String pipelineAggregatorName, String currentFieldName, Token token, + XContentParser parser, SearchContext context) throws IOException { return false; } diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/percentile/InternalPercentilesBucket.java b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/percentile/InternalPercentilesBucket.java new file mode 100644 index 00000000000..10b1481e8d6 --- /dev/null +++ b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/percentile/InternalPercentilesBucket.java @@ -0,0 +1,163 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.elasticsearch.search.aggregations.pipeline.bucketmetrics.percentile; + +import com.google.common.collect.UnmodifiableIterator; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.search.aggregations.AggregationStreams; +import org.elasticsearch.search.aggregations.InternalAggregation; +import org.elasticsearch.search.aggregations.metrics.InternalNumericMetricsAggregation; +import org.elasticsearch.search.aggregations.metrics.max.InternalMax; +import org.elasticsearch.search.aggregations.metrics.percentiles.InternalPercentile; +import org.elasticsearch.search.aggregations.metrics.percentiles.Percentile; +import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator; +import org.elasticsearch.search.aggregations.support.format.ValueFormatter; +import org.elasticsearch.search.aggregations.support.format.ValueFormatterStreams; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +public class InternalPercentilesBucket extends InternalNumericMetricsAggregation.MultiValue implements PercentilesBucket { + + public final static Type TYPE = new Type("percentiles_bucket"); + + public final static AggregationStreams.Stream STREAM = new AggregationStreams.Stream() { + @Override + public InternalPercentilesBucket readResult(StreamInput in) throws IOException { + InternalPercentilesBucket result = new InternalPercentilesBucket(); + result.readFrom(in); + return result; + } + }; + + public static void registerStreams() { + AggregationStreams.registerStream(STREAM, TYPE.stream()); + } + + private double[] percentiles; + private double[] percents; + + protected InternalPercentilesBucket() { + } // for serialization + + public InternalPercentilesBucket(String name, double[] percents, double[] percentiles, + ValueFormatter 
formatter, List pipelineAggregators, + Map metaData) { + super(name, pipelineAggregators, metaData); + this.valueFormatter = formatter; + this.percentiles = percentiles; + this.percents = percents; + } + + @Override + public double percentile(double percent) throws IllegalArgumentException { + int index = Arrays.binarySearch(percents, percent); + if (index < 0) { + throw new IllegalArgumentException("Percent requested [" + String.valueOf(percent) + "] was not" + + " one of the computed percentiles. Available keys are: " + Arrays.toString(percents)); + } + return percentiles[index]; + } + + @Override + public String percentileAsString(double percent) { + return valueFormatter.format(percentile(percent)); + } + + @Override + public Iterator iterator() { + return new Iter(percents, percentiles); + } + + @Override + public double value(String name) { + return percentile(Double.parseDouble(name)); + } + + @Override + public Type type() { + return TYPE; + } + + @Override + public InternalMax doReduce(List aggregations, ReduceContext reduceContext) { + throw new UnsupportedOperationException("Not supported"); + } + + @Override + protected void doReadFrom(StreamInput in) throws IOException { + valueFormatter = ValueFormatterStreams.readOptional(in); + percentiles = in.readDoubleArray(); + percents = in.readDoubleArray(); + } + + @Override + protected void doWriteTo(StreamOutput out) throws IOException { + ValueFormatterStreams.writeOptional(valueFormatter, out); + out.writeDoubleArray(percentiles); + out.writeDoubleArray(percents); + } + + @Override + public XContentBuilder doXContentBody(XContentBuilder builder, Params params) throws IOException { + builder.startObject("values"); + for (double percent : percents) { + double value = percentile(percent); + boolean hasValue = !(Double.isInfinite(value) || Double.isNaN(value)); + String key = String.valueOf(percent); + builder.field(key, hasValue ? 
value : null); + if (hasValue && !(valueFormatter instanceof ValueFormatter.Raw)) { + builder.field(key + "_as_string", percentileAsString(percent)); + } + } + builder.endObject(); + return builder; + } + + public static class Iter extends UnmodifiableIterator { + + private final double[] percents; + private final double[] percentiles; + private int i; + + public Iter(double[] percents, double[] percentiles) { + this.percents = percents; + this.percentiles = percentiles; + i = 0; + } + + @Override + public boolean hasNext() { + return i < percents.length; + } + + @Override + public Percentile next() { + final Percentile next = new InternalPercentile(percents[i], percentiles[i]); + ++i; + return next; + } + } +} diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/percentile/PercentilesBucket.java b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/percentile/PercentilesBucket.java new file mode 100644 index 00000000000..64424ac5abc --- /dev/null +++ b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/percentile/PercentilesBucket.java @@ -0,0 +1,25 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.elasticsearch.search.aggregations.pipeline.bucketmetrics.percentile; + +import org.elasticsearch.search.aggregations.metrics.percentiles.Percentiles; + +public interface PercentilesBucket extends Percentiles { +} diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/percentile/PercentilesBucketBuilder.java b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/percentile/PercentilesBucketBuilder.java new file mode 100644 index 00000000000..53e90808f8a --- /dev/null +++ b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/percentile/PercentilesBucketBuilder.java @@ -0,0 +1,49 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.elasticsearch.search.aggregations.pipeline.bucketmetrics.percentile; + +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.BucketMetricsBuilder; + +import java.io.IOException; + +public class PercentilesBucketBuilder extends BucketMetricsBuilder { + + Double[] percents; + + public PercentilesBucketBuilder(String name) { + super(name, PercentilesBucketPipelineAggregator.TYPE.name()); + } + + public PercentilesBucketBuilder percents(Double[] percents) { + this.percents = percents; + return this; + } + + @Override + protected void doInternalXContent(XContentBuilder builder, Params params) throws IOException { + if (percents != null) { + builder.field(PercentilesBucketParser.PERCENTS.getPreferredName(), percents); + } + } + + +} + diff --git a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/percentile/PercentilesBucketParser.java b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/percentile/PercentilesBucketParser.java new file mode 100644 index 00000000000..01a428873c2 --- /dev/null +++ b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/percentile/PercentilesBucketParser.java @@ -0,0 +1,67 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.search.aggregations.pipeline.bucketmetrics.percentile; + +import com.google.common.primitives.Doubles; +import org.elasticsearch.common.ParseField; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.search.aggregations.pipeline.PipelineAggregatorFactory; +import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.BucketMetricsParser; +import org.elasticsearch.search.aggregations.support.format.ValueFormatter; +import org.elasticsearch.search.internal.SearchContext; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import static org.elasticsearch.search.aggregations.pipeline.BucketHelpers.GapPolicy; + + +public class PercentilesBucketParser extends BucketMetricsParser { + + public static final ParseField PERCENTS = new ParseField("percents"); + double[] percents = new double[] { 1.0, 5.0, 25.0, 50.0, 75.0, 95.0, 99.0 }; + + @Override + public String type() { + return PercentilesBucketPipelineAggregator.TYPE.name(); + } + + @Override + protected PipelineAggregatorFactory buildFactory(String pipelineAggregatorName, String[] bucketsPaths, GapPolicy gapPolicy, + ValueFormatter formatter) { + return new PercentilesBucketPipelineAggregator.Factory(pipelineAggregatorName, bucketsPaths, gapPolicy, formatter, percents); + } + + @Override + protected boolean doParse(String pipelineAggregatorName, String currentFieldName, + XContentParser.Token token, XContentParser parser, SearchContext context) throws IOException { + if (context.parseFieldMatcher().match(currentFieldName, PERCENTS)) { + + List parsedPercents = new ArrayList<>(); + while ((token = parser.nextToken()) != XContentParser.Token.END_ARRAY) { + parsedPercents.add(parser.doubleValue()); + } + percents = Doubles.toArray(parsedPercents); + return true; + } + return false; + } +} diff --git 
a/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/percentile/PercentilesBucketPipelineAggregator.java b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/percentile/PercentilesBucketPipelineAggregator.java new file mode 100644 index 00000000000..92e8b01f43b --- /dev/null +++ b/core/src/main/java/org/elasticsearch/search/aggregations/pipeline/bucketmetrics/percentile/PercentilesBucketPipelineAggregator.java @@ -0,0 +1,155 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.elasticsearch.search.aggregations.pipeline.bucketmetrics.percentile; + +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.search.aggregations.AggregatorFactory; +import org.elasticsearch.search.aggregations.InternalAggregation; +import org.elasticsearch.search.aggregations.InternalAggregation.Type; +import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator; +import org.elasticsearch.search.aggregations.pipeline.PipelineAggregatorFactory; +import org.elasticsearch.search.aggregations.pipeline.PipelineAggregatorStreams; +import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.BucketMetricsPipelineAggregator; +import org.elasticsearch.search.aggregations.support.format.ValueFormatter; + +import java.io.IOException; +import java.util.*; + +import static org.elasticsearch.search.aggregations.pipeline.BucketHelpers.GapPolicy; + +public class PercentilesBucketPipelineAggregator extends BucketMetricsPipelineAggregator { + + public final static Type TYPE = new Type("percentiles_bucket"); + + public final static PipelineAggregatorStreams.Stream STREAM = new PipelineAggregatorStreams.Stream() { + @Override + public PercentilesBucketPipelineAggregator readResult(StreamInput in) throws IOException { + PercentilesBucketPipelineAggregator result = new PercentilesBucketPipelineAggregator(); + result.readFrom(in); + return result; + } + }; + + public static void registerStreams() { + PipelineAggregatorStreams.registerStream(STREAM, TYPE.stream()); + InternalPercentilesBucket.registerStreams(); + } + + private double[] percents; + private List data; + + private PercentilesBucketPipelineAggregator() { + } + + protected PercentilesBucketPipelineAggregator(String name, double[] percents, String[] bucketsPaths, GapPolicy gapPolicy, + ValueFormatter formatter, Map metaData) { + super(name, bucketsPaths, gapPolicy, formatter, metaData); + this.percents = 
percents; + } + + @Override + public Type type() { + return TYPE; + } + + @Override + protected void preCollection() { + data = new ArrayList<>(1024); + } + + @Override + protected void collectBucketValue(String bucketKey, Double bucketValue) { + data.add(bucketValue); + } + + @Override + protected InternalAggregation buildAggregation(List pipelineAggregators, Map metadata) { + + // Perform the sorting and percentile collection now that all the data + // has been collected. + Collections.sort(data); + + double[] percentiles = new double[percents.length]; + if (data.size() == 0) { + for (int i = 0; i < percents.length; i++) { + percentiles[i] = Double.NaN; + } + } else { + for (int i = 0; i < percents.length; i++) { + int index = (int)((percents[i] / 100.0) * data.size()); + percentiles[i] = data.get(index); + } + } + + // todo need postCollection() to clean up temp sorted data? + + return new InternalPercentilesBucket(name(), percents, percentiles, formatter, pipelineAggregators, metadata); + } + + @Override + public void doReadFrom(StreamInput in) throws IOException { + super.doReadFrom(in); + percents = in.readDoubleArray(); + } + + @Override + public void doWriteTo(StreamOutput out) throws IOException { + super.doWriteTo(out); + out.writeDoubleArray(percents); + } + + public static class Factory extends PipelineAggregatorFactory { + + private final ValueFormatter formatter; + private final GapPolicy gapPolicy; + private final double[] percents; + + public Factory(String name, String[] bucketsPaths, GapPolicy gapPolicy, ValueFormatter formatter, double[] percents) { + super(name, TYPE.name(), bucketsPaths); + this.gapPolicy = gapPolicy; + this.formatter = formatter; + this.percents = percents; + } + + @Override + protected PipelineAggregator createInternal(Map metaData) throws IOException { + return new PercentilesBucketPipelineAggregator(name, percents, bucketsPaths, gapPolicy, formatter, metaData); + } + + @Override + public void doValidate(AggregatorFactory 
parent, AggregatorFactory[] aggFactories, + List pipelineAggregatorFactories) { + if (bucketsPaths.length != 1) { + throw new IllegalStateException(PipelineAggregator.Parser.BUCKETS_PATH.getPreferredName() + + " must contain a single entry for aggregation [" + name + "]"); + } + + for (Double p : percents) { + if (p == null || p < 0.0 || p > 100.0) { + throw new IllegalStateException(PercentilesBucketParser.PERCENTS.getPreferredName() + + " must only contain non-null doubles from 0.0-100.0 inclusive"); + } + } + } + + } + +} diff --git a/core/src/test/java/org/elasticsearch/search/aggregations/pipeline/PercentilesBucketIT.java b/core/src/test/java/org/elasticsearch/search/aggregations/pipeline/PercentilesBucketIT.java new file mode 100644 index 00000000000..507939e6858 --- /dev/null +++ b/core/src/test/java/org/elasticsearch/search/aggregations/pipeline/PercentilesBucketIT.java @@ -0,0 +1,625 @@ +package org.elasticsearch.search.aggregations.pipeline; + +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +import org.elasticsearch.action.index.IndexRequestBuilder; +import org.elasticsearch.action.search.SearchPhaseExecutionException; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.search.SearchParseException; +import org.elasticsearch.search.aggregations.bucket.histogram.Histogram; +import org.elasticsearch.search.aggregations.bucket.terms.Terms; +import org.elasticsearch.search.aggregations.metrics.percentiles.Percentile; +import org.elasticsearch.search.aggregations.metrics.sum.Sum; +import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.percentile.PercentilesBucket; +import org.elasticsearch.test.ESIntegTestCase; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; +import static org.elasticsearch.search.aggregations.AggregationBuilders.histogram; +import static org.elasticsearch.search.aggregations.AggregationBuilders.sum; +import static org.elasticsearch.search.aggregations.AggregationBuilders.terms; +import static org.elasticsearch.search.aggregations.pipeline.PipelineAggregatorBuilders.percentilesBucket; +import static org.elasticsearch.search.aggregations.pipeline.PipelineAggregatorBuilders.sumBucket; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertSearchResponse; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.core.IsNull.notNullValue; + +@ESIntegTestCase.SuiteScopeTestCase +public class PercentilesBucketIT extends ESIntegTestCase { + + private static final String SINGLE_VALUED_FIELD_NAME = "l_value"; + private static final Double[] PERCENTS = {1.0, 25.0, 50.0, 75.0, 99.0}; + static int numDocs; + static int interval; + static int minRandomValue; + static int 
maxRandomValue; + static int numValueBuckets; + static long[] valueCounts; + + @Override + public void setupSuiteScopeCluster() throws Exception { + createIndex("idx"); + createIndex("idx_unmapped"); + + numDocs = randomIntBetween(6, 20); + interval = randomIntBetween(2, 5); + + minRandomValue = 0; + maxRandomValue = 20; + + numValueBuckets = ((maxRandomValue - minRandomValue) / interval) + 1; + valueCounts = new long[numValueBuckets]; + + List builders = new ArrayList<>(); + + for (int i = 0; i < numDocs; i++) { + int fieldValue = randomIntBetween(minRandomValue, maxRandomValue); + builders.add(client().prepareIndex("idx", "type").setSource( + jsonBuilder().startObject().field(SINGLE_VALUED_FIELD_NAME, fieldValue).field("tag", "tag" + (i % interval)) + .endObject())); + final int bucket = (fieldValue / interval); // + (fieldValue < 0 ? -1 : 0) - (minRandomValue / interval - 1); + valueCounts[bucket]++; + } + + assertAcked(prepareCreate("empty_bucket_idx").addMapping("type", SINGLE_VALUED_FIELD_NAME, "type=integer")); + for (int i = 0; i < 2; i++) { + builders.add(client().prepareIndex("empty_bucket_idx", "type", "" + i).setSource( + jsonBuilder().startObject().field(SINGLE_VALUED_FIELD_NAME, i * 2).endObject())); + } + indexRandom(true, builders); + ensureSearchable(); + } + + @Test + public void testDocCount_topLevel() throws Exception { + SearchResponse response = client().prepareSearch("idx") + .addAggregation(histogram("histo").field(SINGLE_VALUED_FIELD_NAME).interval(interval) + .extendedBounds((long) minRandomValue, (long) maxRandomValue)) + .addAggregation(percentilesBucket("percentiles_bucket") + .setBucketsPaths("histo>_count") + .percents(PERCENTS)).execute().actionGet(); + + assertSearchResponse(response); + + Histogram histo = response.getAggregations().get("histo"); + assertThat(histo, notNullValue()); + assertThat(histo.getName(), equalTo("histo")); + List buckets = histo.getBuckets(); + assertThat(buckets.size(), equalTo(numValueBuckets)); + + 
double[] values = new double[numValueBuckets]; + for (int i = 0; i < numValueBuckets; ++i) { + Histogram.Bucket bucket = buckets.get(i); + assertThat(bucket, notNullValue()); + assertThat(((Number) bucket.getKey()).longValue(), equalTo((long) i * interval)); + assertThat(bucket.getDocCount(), equalTo(valueCounts[i])); + values[i] = bucket.getDocCount(); + } + + Arrays.sort(values); + + PercentilesBucket percentilesBucketValue = response.getAggregations().get("percentiles_bucket"); + assertThat(percentilesBucketValue, notNullValue()); + assertThat(percentilesBucketValue.getName(), equalTo("percentiles_bucket")); + for (Double p : PERCENTS) { + double expected = values[(int)((p / 100) * values.length)]; + assertThat(percentilesBucketValue.percentile(p), equalTo(expected)); + } + + } + + @Test + public void testDocCount_asSubAgg() throws Exception { + SearchResponse response = client() + .prepareSearch("idx") + .addAggregation( + terms("terms") + .field("tag") + .order(Terms.Order.term(true)) + .subAggregation( + histogram("histo").field(SINGLE_VALUED_FIELD_NAME).interval(interval) + .extendedBounds((long) minRandomValue, (long) maxRandomValue)) + .subAggregation(percentilesBucket("percentiles_bucket") + .setBucketsPaths("histo>_count") + .percents(PERCENTS))).execute().actionGet(); + + assertSearchResponse(response); + + Terms terms = response.getAggregations().get("terms"); + assertThat(terms, notNullValue()); + assertThat(terms.getName(), equalTo("terms")); + List termsBuckets = terms.getBuckets(); + assertThat(termsBuckets.size(), equalTo(interval)); + + for (int i = 0; i < interval; ++i) { + Terms.Bucket termsBucket = termsBuckets.get(i); + assertThat(termsBucket, notNullValue()); + assertThat((String) termsBucket.getKey(), equalTo("tag" + (i % interval))); + + Histogram histo = termsBucket.getAggregations().get("histo"); + assertThat(histo, notNullValue()); + assertThat(histo.getName(), equalTo("histo")); + List buckets = histo.getBuckets(); + + double[] values 
= new double[numValueBuckets]; + for (int j = 0; j < numValueBuckets; ++j) { + Histogram.Bucket bucket = buckets.get(j); + assertThat(bucket, notNullValue()); + assertThat(((Number) bucket.getKey()).longValue(), equalTo((long) j * interval)); + values[j] = bucket.getDocCount(); + } + + Arrays.sort(values); + + PercentilesBucket percentilesBucketValue = termsBucket.getAggregations().get("percentiles_bucket"); + assertThat(percentilesBucketValue, notNullValue()); + assertThat(percentilesBucketValue.getName(), equalTo("percentiles_bucket")); + for (Double p : PERCENTS) { + double expected = values[(int)((p / 100) * values.length)]; + assertThat(percentilesBucketValue.percentile(p), equalTo(expected)); + } + } + } + + @Test + public void testMetric_topLevel() throws Exception { + SearchResponse response = client() + .prepareSearch("idx") + .addAggregation(terms("terms").field("tag").subAggregation(sum("sum").field(SINGLE_VALUED_FIELD_NAME))) + .addAggregation(percentilesBucket("percentiles_bucket") + .setBucketsPaths("terms>sum") + .percents(PERCENTS)).execute().actionGet(); + + assertSearchResponse(response); + + Terms terms = response.getAggregations().get("terms"); + assertThat(terms, notNullValue()); + assertThat(terms.getName(), equalTo("terms")); + List buckets = terms.getBuckets(); + assertThat(buckets.size(), equalTo(interval)); + + double[] values = new double[interval]; + for (int i = 0; i < interval; ++i) { + Terms.Bucket bucket = buckets.get(i); + assertThat(bucket, notNullValue()); + assertThat((String) bucket.getKey(), equalTo("tag" + (i % interval))); + assertThat(bucket.getDocCount(), greaterThan(0l)); + Sum sum = bucket.getAggregations().get("sum"); + assertThat(sum, notNullValue()); + values[i] = sum.value(); + } + + Arrays.sort(values); + + PercentilesBucket percentilesBucketValue = response.getAggregations().get("percentiles_bucket"); + assertThat(percentilesBucketValue, notNullValue()); + assertThat(percentilesBucketValue.getName(), 
equalTo("percentiles_bucket")); + for (Double p : PERCENTS) { + double expected = values[(int)((p / 100) * values.length)]; + assertThat(percentilesBucketValue.percentile(p), equalTo(expected)); + } + } + + @Test + public void testMetric_topLevelDefaultPercents() throws Exception { + SearchResponse response = client() + .prepareSearch("idx") + .addAggregation(terms("terms").field("tag").subAggregation(sum("sum").field(SINGLE_VALUED_FIELD_NAME))) + .addAggregation(percentilesBucket("percentiles_bucket") + .setBucketsPaths("terms>sum")).execute().actionGet(); + + assertSearchResponse(response); + + Terms terms = response.getAggregations().get("terms"); + assertThat(terms, notNullValue()); + assertThat(terms.getName(), equalTo("terms")); + List buckets = terms.getBuckets(); + assertThat(buckets.size(), equalTo(interval)); + + double[] values = new double[interval]; + for (int i = 0; i < interval; ++i) { + Terms.Bucket bucket = buckets.get(i); + assertThat(bucket, notNullValue()); + assertThat((String) bucket.getKey(), equalTo("tag" + (i % interval))); + assertThat(bucket.getDocCount(), greaterThan(0l)); + Sum sum = bucket.getAggregations().get("sum"); + assertThat(sum, notNullValue()); + values[i] = sum.value(); + } + + Arrays.sort(values); + + PercentilesBucket percentilesBucketValue = response.getAggregations().get("percentiles_bucket"); + assertThat(percentilesBucketValue, notNullValue()); + assertThat(percentilesBucketValue.getName(), equalTo("percentiles_bucket")); + for (Percentile p : percentilesBucketValue) { + double expected = values[(int)((p.getPercent() / 100) * values.length)]; + assertThat(percentilesBucketValue.percentile(p.getPercent()), equalTo(expected)); + assertThat(p.getValue(), equalTo(expected)); + } + } + + @Test + public void testMetric_asSubAgg() throws Exception { + SearchResponse response = client() + .prepareSearch("idx") + .addAggregation( + terms("terms") + .field("tag") + .order(Terms.Order.term(true)) + .subAggregation( + 
histogram("histo").field(SINGLE_VALUED_FIELD_NAME).interval(interval) + .extendedBounds((long) minRandomValue, (long) maxRandomValue) + .subAggregation(sum("sum").field(SINGLE_VALUED_FIELD_NAME))) + .subAggregation(percentilesBucket("percentiles_bucket") + .setBucketsPaths("histo>sum") + .percents(PERCENTS))).execute().actionGet(); + + assertSearchResponse(response); + + Terms terms = response.getAggregations().get("terms"); + assertThat(terms, notNullValue()); + assertThat(terms.getName(), equalTo("terms")); + List termsBuckets = terms.getBuckets(); + assertThat(termsBuckets.size(), equalTo(interval)); + + for (int i = 0; i < interval; ++i) { + Terms.Bucket termsBucket = termsBuckets.get(i); + assertThat(termsBucket, notNullValue()); + assertThat((String) termsBucket.getKey(), equalTo("tag" + (i % interval))); + + Histogram histo = termsBucket.getAggregations().get("histo"); + assertThat(histo, notNullValue()); + assertThat(histo.getName(), equalTo("histo")); + List buckets = histo.getBuckets(); + + List values = new ArrayList<>(numValueBuckets); + for (int j = 0; j < numValueBuckets; ++j) { + Histogram.Bucket bucket = buckets.get(j); + assertThat(bucket, notNullValue()); + assertThat(((Number) bucket.getKey()).longValue(), equalTo((long) j * interval)); + if (bucket.getDocCount() != 0) { + Sum sum = bucket.getAggregations().get("sum"); + assertThat(sum, notNullValue()); + values.add(sum.value()); + } + } + + Collections.sort(values); + + PercentilesBucket percentilesBucketValue = termsBucket.getAggregations().get("percentiles_bucket"); + assertThat(percentilesBucketValue, notNullValue()); + assertThat(percentilesBucketValue.getName(), equalTo("percentiles_bucket")); + for (Double p : PERCENTS) { + double expected = values.get((int) ((p / 100) * values.size())); + assertThat(percentilesBucketValue.percentile(p), equalTo(expected)); + } + } + } + + @Test + public void testMetric_asSubAggWithInsertZeros() throws Exception { + SearchResponse response = client() + 
.prepareSearch("idx") + .addAggregation( + terms("terms") + .field("tag") + .order(Terms.Order.term(true)) + .subAggregation( + histogram("histo").field(SINGLE_VALUED_FIELD_NAME).interval(interval) + .extendedBounds((long) minRandomValue, (long) maxRandomValue) + .subAggregation(sum("sum").field(SINGLE_VALUED_FIELD_NAME))) + .subAggregation(percentilesBucket("percentiles_bucket") + .setBucketsPaths("histo>sum") + .gapPolicy(BucketHelpers.GapPolicy.INSERT_ZEROS) + .percents(PERCENTS))) + .execute().actionGet(); + + assertSearchResponse(response); + + Terms terms = response.getAggregations().get("terms"); + assertThat(terms, notNullValue()); + assertThat(terms.getName(), equalTo("terms")); + List termsBuckets = terms.getBuckets(); + assertThat(termsBuckets.size(), equalTo(interval)); + + for (int i = 0; i < interval; ++i) { + Terms.Bucket termsBucket = termsBuckets.get(i); + assertThat(termsBucket, notNullValue()); + assertThat((String) termsBucket.getKey(), equalTo("tag" + (i % interval))); + + Histogram histo = termsBucket.getAggregations().get("histo"); + assertThat(histo, notNullValue()); + assertThat(histo.getName(), equalTo("histo")); + List buckets = histo.getBuckets(); + + double[] values = new double[numValueBuckets]; + for (int j = 0; j < numValueBuckets; ++j) { + Histogram.Bucket bucket = buckets.get(j); + assertThat(bucket, notNullValue()); + assertThat(((Number) bucket.getKey()).longValue(), equalTo((long) j * interval)); + Sum sum = bucket.getAggregations().get("sum"); + assertThat(sum, notNullValue()); + + values[j] = sum.value(); + } + + Arrays.sort(values); + + PercentilesBucket percentilesBucketValue = termsBucket.getAggregations().get("percentiles_bucket"); + assertThat(percentilesBucketValue, notNullValue()); + assertThat(percentilesBucketValue.getName(), equalTo("percentiles_bucket")); + for (Double p : PERCENTS) { + double expected = values[(int)((p / 100) * values.length)]; + assertThat(percentilesBucketValue.percentile(p), equalTo(expected)); 
+ } + } + } + + @Test + public void testNoBuckets() throws Exception { + SearchResponse response = client().prepareSearch("idx") + .addAggregation(terms("terms").field("tag").exclude("tag.*").subAggregation(sum("sum").field(SINGLE_VALUED_FIELD_NAME))) + .addAggregation(percentilesBucket("percentiles_bucket") + .setBucketsPaths("terms>sum") + .percents(PERCENTS)).execute().actionGet(); + + assertSearchResponse(response); + + Terms terms = response.getAggregations().get("terms"); + assertThat(terms, notNullValue()); + assertThat(terms.getName(), equalTo("terms")); + List buckets = terms.getBuckets(); + assertThat(buckets.size(), equalTo(0)); + + PercentilesBucket percentilesBucketValue = response.getAggregations().get("percentiles_bucket"); + assertThat(percentilesBucketValue, notNullValue()); + assertThat(percentilesBucketValue.getName(), equalTo("percentiles_bucket")); + for (Double p : PERCENTS) { + assertThat(percentilesBucketValue.percentile(p), equalTo(Double.NaN)); + } + } + + @Test + public void testWrongPercents() throws Exception { + SearchResponse response = client().prepareSearch("idx") + .addAggregation(terms("terms").field("tag").exclude("tag.*").subAggregation(sum("sum").field(SINGLE_VALUED_FIELD_NAME))) + .addAggregation(percentilesBucket("percentiles_bucket") + .setBucketsPaths("terms>sum") + .percents(PERCENTS)).execute().actionGet(); + + assertSearchResponse(response); + + Terms terms = response.getAggregations().get("terms"); + assertThat(terms, notNullValue()); + assertThat(terms.getName(), equalTo("terms")); + List buckets = terms.getBuckets(); + assertThat(buckets.size(), equalTo(0)); + + PercentilesBucket percentilesBucketValue = response.getAggregations().get("percentiles_bucket"); + assertThat(percentilesBucketValue, notNullValue()); + assertThat(percentilesBucketValue.getName(), equalTo("percentiles_bucket")); + + try { + percentilesBucketValue.percentile(2.0); + fail("2.0 was not a valid percent, should have thrown exception"); + } catch 
(IllegalArgumentException exception) { + // All good + } + } + + @Test + @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/13179") + public void testBadPercents() throws Exception { + Double[] badPercents = {-1.0, 110.0}; + + try { + SearchResponse response = client().prepareSearch("idx") + .addAggregation(terms("terms").field("tag").subAggregation(sum("sum").field(SINGLE_VALUED_FIELD_NAME))) + .addAggregation(percentilesBucket("percentiles_bucket") + .setBucketsPaths("terms>sum") + .percents(badPercents)).execute().actionGet(); + + assertSearchResponse(response); + + Terms terms = response.getAggregations().get("terms"); + assertThat(terms, notNullValue()); + assertThat(terms.getName(), equalTo("terms")); + List buckets = terms.getBuckets(); + assertThat(buckets.size(), equalTo(0)); + + PercentilesBucket percentilesBucketValue = response.getAggregations().get("percentiles_bucket"); + + fail("Illegal percent's were provided but no exception was thrown."); + } catch (SearchPhaseExecutionException exception) { + // All good + } + + } + + @Test + public void testBadPercents_asSubAgg() throws Exception { + Double[] badPercents = {-1.0, 110.0}; + + try { + SearchResponse response = client() + .prepareSearch("idx") + .addAggregation( + terms("terms") + .field("tag") + .order(Terms.Order.term(true)) + .subAggregation( + histogram("histo").field(SINGLE_VALUED_FIELD_NAME).interval(interval) + .extendedBounds((long) minRandomValue, (long) maxRandomValue)) + .subAggregation(percentilesBucket("percentiles_bucket") + .setBucketsPaths("histo>_count") + .percents(badPercents))).execute().actionGet(); + + PercentilesBucket percentilesBucketValue = response.getAggregations().get("percentiles_bucket"); + + fail("Illegal percent's were provided but no exception was thrown."); + } catch (SearchPhaseExecutionException exception) { + // All good + } + + } + + @Test + public void testNested() throws Exception { + SearchResponse response = client() + 
.prepareSearch("idx") + .addAggregation( + terms("terms") + .field("tag") + .order(Terms.Order.term(true)) + .subAggregation( + histogram("histo").field(SINGLE_VALUED_FIELD_NAME).interval(interval) + .extendedBounds((long) minRandomValue, (long) maxRandomValue)) + .subAggregation(percentilesBucket("percentile_histo_bucket").setBucketsPaths("histo>_count"))) + .addAggregation(percentilesBucket("percentile_terms_bucket") + .setBucketsPaths("terms>percentile_histo_bucket.50") + .percents(PERCENTS)).execute().actionGet(); + + assertSearchResponse(response); + + Terms terms = response.getAggregations().get("terms"); + assertThat(terms, notNullValue()); + assertThat(terms.getName(), equalTo("terms")); + List termsBuckets = terms.getBuckets(); + assertThat(termsBuckets.size(), equalTo(interval)); + + double[] values = new double[termsBuckets.size()]; + for (int i = 0; i < interval; ++i) { + Terms.Bucket termsBucket = termsBuckets.get(i); + assertThat(termsBucket, notNullValue()); + assertThat((String) termsBucket.getKey(), equalTo("tag" + (i % interval))); + + Histogram histo = termsBucket.getAggregations().get("histo"); + assertThat(histo, notNullValue()); + assertThat(histo.getName(), equalTo("histo")); + List buckets = histo.getBuckets(); + + double[] innerValues = new double[numValueBuckets]; + for (int j = 0; j < numValueBuckets; ++j) { + Histogram.Bucket bucket = buckets.get(j); + assertThat(bucket, notNullValue()); + assertThat(((Number) bucket.getKey()).longValue(), equalTo((long) j * interval)); + + innerValues[j] = bucket.getDocCount(); + } + Arrays.sort(innerValues); + + PercentilesBucket percentilesBucketValue = termsBucket.getAggregations().get("percentile_histo_bucket"); + assertThat(percentilesBucketValue, notNullValue()); + assertThat(percentilesBucketValue.getName(), equalTo("percentile_histo_bucket")); + for (Double p : PERCENTS) { + double expected = innerValues[(int)((p / 100) * innerValues.length)]; + assertThat(percentilesBucketValue.percentile(p), 
equalTo(expected)); + } + values[i] = percentilesBucketValue.percentile(50.0); + } + + Arrays.sort(values); + + PercentilesBucket percentilesBucketValue = response.getAggregations().get("percentile_terms_bucket"); + assertThat(percentilesBucketValue, notNullValue()); + assertThat(percentilesBucketValue.getName(), equalTo("percentile_terms_bucket")); + for (Double p : PERCENTS) { + double expected = values[(int)((p / 100) * values.length)]; + assertThat(percentilesBucketValue.percentile(p), equalTo(expected)); + } + } + + @Test + public void testNestedWithDecimal() throws Exception { + Double[] percent = {99.9}; + SearchResponse response = client() + .prepareSearch("idx") + .addAggregation( + terms("terms") + .field("tag") + .order(Terms.Order.term(true)) + .subAggregation( + histogram("histo").field(SINGLE_VALUED_FIELD_NAME).interval(interval) + .extendedBounds((long) minRandomValue, (long) maxRandomValue)) + .subAggregation(percentilesBucket("percentile_histo_bucket") + .percents(percent) + .setBucketsPaths("histo>_count"))) + .addAggregation(percentilesBucket("percentile_terms_bucket") + .setBucketsPaths("terms>percentile_histo_bucket[99.9]") + .percents(percent)).execute().actionGet(); + + assertSearchResponse(response); + + Terms terms = response.getAggregations().get("terms"); + assertThat(terms, notNullValue()); + assertThat(terms.getName(), equalTo("terms")); + List termsBuckets = terms.getBuckets(); + assertThat(termsBuckets.size(), equalTo(interval)); + + double[] values = new double[termsBuckets.size()]; + for (int i = 0; i < interval; ++i) { + Terms.Bucket termsBucket = termsBuckets.get(i); + assertThat(termsBucket, notNullValue()); + assertThat((String) termsBucket.getKey(), equalTo("tag" + (i % interval))); + + Histogram histo = termsBucket.getAggregations().get("histo"); + assertThat(histo, notNullValue()); + assertThat(histo.getName(), equalTo("histo")); + List buckets = histo.getBuckets(); + + double[] innerValues = new double[numValueBuckets]; + 
for (int j = 0; j < numValueBuckets; ++j) { + Histogram.Bucket bucket = buckets.get(j); + assertThat(bucket, notNullValue()); + assertThat(((Number) bucket.getKey()).longValue(), equalTo((long) j * interval)); + + innerValues[j] = bucket.getDocCount(); + } + Arrays.sort(innerValues); + + PercentilesBucket percentilesBucketValue = termsBucket.getAggregations().get("percentile_histo_bucket"); + assertThat(percentilesBucketValue, notNullValue()); + assertThat(percentilesBucketValue.getName(), equalTo("percentile_histo_bucket")); + for (Double p : percent) { + double expected = innerValues[(int)((p / 100) * innerValues.length)]; + assertThat(percentilesBucketValue.percentile(p), equalTo(expected)); + } + values[i] = percentilesBucketValue.percentile(99.9); + } + + Arrays.sort(values); + + PercentilesBucket percentilesBucketValue = response.getAggregations().get("percentile_terms_bucket"); + assertThat(percentilesBucketValue, notNullValue()); + assertThat(percentilesBucketValue.getName(), equalTo("percentile_terms_bucket")); + for (Double p : percent) { + double expected = values[(int)((p / 100) * values.length)]; + assertThat(percentilesBucketValue.percentile(p), equalTo(expected)); + } + } +} diff --git a/docs/reference/aggregations/pipeline.asciidoc b/docs/reference/aggregations/pipeline.asciidoc index 670ed6266b0..b6a1073156a 100644 --- a/docs/reference/aggregations/pipeline.asciidoc +++ b/docs/reference/aggregations/pipeline.asciidoc @@ -163,6 +163,7 @@ include::pipeline/derivative-aggregation.asciidoc[] include::pipeline/max-bucket-aggregation.asciidoc[] include::pipeline/min-bucket-aggregation.asciidoc[] include::pipeline/sum-bucket-aggregation.asciidoc[] +include::pipeline/percentiles-bucket-aggregation.asciidoc[] include::pipeline/movavg-aggregation.asciidoc[] include::pipeline/cumulative-sum-aggregation.asciidoc[] include::pipeline/bucket-script-aggregation.asciidoc[] diff --git a/docs/reference/aggregations/pipeline/percentiles-bucket-aggregation.asciidoc 
b/docs/reference/aggregations/pipeline/percentiles-bucket-aggregation.asciidoc new file mode 100644 index 00000000000..3e3c6d6fcff --- /dev/null +++ b/docs/reference/aggregations/pipeline/percentiles-bucket-aggregation.asciidoc @@ -0,0 +1,121 @@ +[[search-aggregations-pipeline-percentiles-bucket-aggregation]] +=== Percentiles Bucket Aggregation + +coming[2.1.0] + +experimental[] + +A sibling pipeline aggregation which calculates percentiles across all buckets of a specified metric in a sibling aggregation. +The specified metric must be numeric and the sibling aggregation must be a multi-bucket aggregation. + +==== Syntax + +A `percentiles_bucket` aggregation looks like this in isolation: + +[source,js] +-------------------------------------------------- +{ + "percentiles_bucket": { + "buckets_path": "the_sum" + } +} +-------------------------------------------------- + +.`percentiles_bucket` Parameters +|=== +|Parameter Name |Description |Required |Default Value +|`buckets_path` |The path to the buckets we wish to calculate percentiles for (see <> for more + details) |Required | +|`gap_policy` |The policy to apply when gaps are found in the data (see <> for more + details)|Optional | `skip` +|`format` |format to apply to the output value of this aggregation |Optional | `null` +|`percents` |The list of percentiles to calculate |Optional | `[ 1, 5, 25, 50, 75, 95, 99 ]` +|=== + +The following snippet calculates percentiles for the total monthly `sales` buckets: + +[source,js] +-------------------------------------------------- +{ + "aggs" : { + "sales_per_month" : { + "date_histogram" : { + "field" : "date", + "interval" : "month" + }, + "aggs": { + "sales": { + "sum": { + "field": "price" + } + } + } + }, + "percentiles_monthly_sales": { + "percentiles_bucket": { + "buckets_path": "sales_per_month>sales", <1> + "percents": [ 25.0, 50.0, 75.0 ] <2> + } + } + } +} +-------------------------------------------------- +<1> `buckets_path` instructs this percentiles_bucket aggregation that we want
to calculate percentiles for +the `sales` aggregation in the `sales_per_month` date histogram. +<2> `percents` specifies which percentiles we wish to calculate, in this case, the 25th, 50th and 75th percentiles + +And the following may be the response: + +[source,js] +-------------------------------------------------- +{ + "aggregations": { + "sales_per_month": { + "buckets": [ + { + "key_as_string": "2015/01/01 00:00:00", + "key": 1420070400000, + "doc_count": 3, + "sales": { + "value": 550 + } + }, + { + "key_as_string": "2015/02/01 00:00:00", + "key": 1422748800000, + "doc_count": 2, + "sales": { + "value": 60 + } + }, + { + "key_as_string": "2015/03/01 00:00:00", + "key": 1425168000000, + "doc_count": 2, + "sales": { + "value": 375 + } + } + ] + }, + "percentiles_monthly_sales": { + "values" : { + "25.0": 60, + "50.0": 375, + "75.0": 550 + } + } + } +} +-------------------------------------------------- + + +==== Percentiles_bucket implementation + +The Percentile Bucket returns the nearest input data point that is not greater than the requested percentile; it does not +interpolate between data points. + +The percentiles are calculated exactly and are not an approximation (unlike the Percentiles Metric). This means +the implementation maintains an in-memory, sorted list of your data to compute the percentiles, before discarding the +data. You may run into memory pressure issues if you attempt to calculate percentiles over many millions of +data-points in a single `percentiles_bucket`. \ No newline at end of file