Aggregations: Add percentiles_bucket pipeline aggregations

This pipeline will calculate percentiles over a set of sibling buckets.  This is an exact
implementation, meaning it needs to cache a copy of the series in memory and sort it to determine
the percentiles.

This comes with a few limitations: to prevent serializing data around, only the requested percentiles
are calculated (unlike the TDigest version, which allows the java API to ask for any percentile).
It also needs to store the data in-memory, resulting in some overhead if the requested series is
very large.
This commit is contained in:
Zachary Tong 2015-08-28 12:23:19 -04:00
parent 17460ae92d
commit 1016734b4c
12 changed files with 1218 additions and 5 deletions

View File

@ -19,7 +19,6 @@
package org.elasticsearch.search;
import org.elasticsearch.common.Classes;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.inject.multibindings.Multibinder;
import org.elasticsearch.common.settings.Settings;
@ -110,6 +109,8 @@ import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.max.MaxBucke
import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.max.MaxBucketPipelineAggregator;
import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.min.MinBucketParser;
import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.min.MinBucketPipelineAggregator;
import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.percentile.PercentilesBucketParser;
import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.percentile.PercentilesBucketPipelineAggregator;
import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.sum.SumBucketParser;
import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.sum.SumBucketPipelineAggregator;
import org.elasticsearch.search.aggregations.pipeline.bucketscript.BucketScriptParser;
@ -143,8 +144,6 @@ import org.elasticsearch.search.highlight.HighlightPhase;
import org.elasticsearch.search.highlight.Highlighter;
import org.elasticsearch.search.highlight.Highlighters;
import org.elasticsearch.search.query.QueryPhase;
import org.elasticsearch.search.suggest.SuggestParseElement;
import org.elasticsearch.search.suggest.SuggestPhase;
import org.elasticsearch.search.suggest.Suggester;
import org.elasticsearch.search.suggest.Suggesters;
@ -301,6 +300,7 @@ public class SearchModule extends AbstractModule {
multibinderPipelineAggParser.addBinding().to(MinBucketParser.class);
multibinderPipelineAggParser.addBinding().to(AvgBucketParser.class);
multibinderPipelineAggParser.addBinding().to(SumBucketParser.class);
multibinderPipelineAggParser.addBinding().to(PercentilesBucketParser.class);
multibinderPipelineAggParser.addBinding().to(MovAvgParser.class);
multibinderPipelineAggParser.addBinding().to(CumulativeSumParser.class);
multibinderPipelineAggParser.addBinding().to(BucketScriptParser.class);
@ -393,6 +393,7 @@ public class SearchModule extends AbstractModule {
MinBucketPipelineAggregator.registerStreams();
AvgBucketPipelineAggregator.registerStreams();
SumBucketPipelineAggregator.registerStreams();
PercentilesBucketPipelineAggregator.registerStreams();
MovAvgPipelineAggregator.registerStreams();
CumulativeSumPipelineAggregator.registerStreams();
BucketScriptPipelineAggregator.registerStreams();

View File

@ -22,6 +22,7 @@ package org.elasticsearch.search.aggregations.pipeline;
import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.avg.AvgBucketBuilder;
import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.max.MaxBucketBuilder;
import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.min.MinBucketBuilder;
import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.percentile.PercentilesBucketBuilder;
import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.sum.SumBucketBuilder;
import org.elasticsearch.search.aggregations.pipeline.cumulativesum.CumulativeSumBuilder;
import org.elasticsearch.search.aggregations.pipeline.bucketscript.BucketScriptBuilder;
@ -55,6 +56,10 @@ public final class PipelineAggregatorBuilders {
return new SumBucketBuilder(name);
}
public static final PercentilesBucketBuilder percentilesBucket(String name) {
return new PercentilesBucketBuilder(name);
}
public static final MovAvgBuilder movingAvg(String name) {
return new MovAvgBuilder(name);
}

View File

@ -61,7 +61,7 @@ public abstract class BucketMetricsBuilder<B extends BucketMetricsBuilder<B>> ex
return builder;
}
protected void doInternalXContent(XContentBuilder builder, Params params) {
protected void doInternalXContent(XContentBuilder builder, Params params) throws IOException {
}
}

View File

@ -105,7 +105,8 @@ public abstract class BucketMetricsParser implements PipelineAggregator.Parser {
protected abstract PipelineAggregatorFactory buildFactory(String pipelineAggregatorName, String[] bucketsPaths, GapPolicy gapPolicy,
ValueFormatter formatter);
protected boolean doParse(String pipelineAggregatorName, String currentFieldName, Token token, XContentParser parser, SearchContext context) {
protected boolean doParse(String pipelineAggregatorName, String currentFieldName, Token token,
XContentParser parser, SearchContext context) throws IOException {
return false;
}

View File

@ -0,0 +1,163 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.aggregations.pipeline.bucketmetrics.percentile;
import com.google.common.collect.UnmodifiableIterator;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.search.aggregations.AggregationStreams;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.metrics.InternalNumericMetricsAggregation;
import org.elasticsearch.search.aggregations.metrics.max.InternalMax;
import org.elasticsearch.search.aggregations.metrics.percentiles.InternalPercentile;
import org.elasticsearch.search.aggregations.metrics.percentiles.Percentile;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
import org.elasticsearch.search.aggregations.support.format.ValueFormatter;
import org.elasticsearch.search.aggregations.support.format.ValueFormatterStreams;
import java.io.IOException;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
public class InternalPercentilesBucket extends InternalNumericMetricsAggregation.MultiValue implements PercentilesBucket {
public final static Type TYPE = new Type("percentiles_bucket");
public final static AggregationStreams.Stream STREAM = new AggregationStreams.Stream() {
@Override
public InternalPercentilesBucket readResult(StreamInput in) throws IOException {
InternalPercentilesBucket result = new InternalPercentilesBucket();
result.readFrom(in);
return result;
}
};
public static void registerStreams() {
AggregationStreams.registerStream(STREAM, TYPE.stream());
}
private double[] percentiles;
private double[] percents;
protected InternalPercentilesBucket() {
} // for serialization
public InternalPercentilesBucket(String name, double[] percents, double[] percentiles,
ValueFormatter formatter, List<PipelineAggregator> pipelineAggregators,
Map<String, Object> metaData) {
super(name, pipelineAggregators, metaData);
this.valueFormatter = formatter;
this.percentiles = percentiles;
this.percents = percents;
}
@Override
public double percentile(double percent) throws IllegalArgumentException {
int index = Arrays.binarySearch(percents, percent);
if (index < 0) {
throw new IllegalArgumentException("Percent requested [" + String.valueOf(percent) + "] was not" +
" one of the computed percentiles. Available keys are: " + Arrays.toString(percents));
}
return percentiles[index];
}
@Override
public String percentileAsString(double percent) {
return valueFormatter.format(percentile(percent));
}
@Override
public Iterator<Percentile> iterator() {
return new Iter(percents, percentiles);
}
@Override
public double value(String name) {
return percentile(Double.parseDouble(name));
}
@Override
public Type type() {
return TYPE;
}
@Override
public InternalMax doReduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
throw new UnsupportedOperationException("Not supported");
}
@Override
protected void doReadFrom(StreamInput in) throws IOException {
valueFormatter = ValueFormatterStreams.readOptional(in);
percentiles = in.readDoubleArray();
percents = in.readDoubleArray();
}
@Override
protected void doWriteTo(StreamOutput out) throws IOException {
ValueFormatterStreams.writeOptional(valueFormatter, out);
out.writeDoubleArray(percentiles);
out.writeDoubleArray(percents);
}
@Override
public XContentBuilder doXContentBody(XContentBuilder builder, Params params) throws IOException {
builder.startObject("values");
for (double percent : percents) {
double value = percentile(percent);
boolean hasValue = !(Double.isInfinite(value) || Double.isNaN(value));
String key = String.valueOf(percent);
builder.field(key, hasValue ? value : null);
if (hasValue && !(valueFormatter instanceof ValueFormatter.Raw)) {
builder.field(key + "_as_string", percentileAsString(percent));
}
}
builder.endObject();
return builder;
}
public static class Iter extends UnmodifiableIterator<Percentile> {
private final double[] percents;
private final double[] percentiles;
private int i;
public Iter(double[] percents, double[] percentiles) {
this.percents = percents;
this.percentiles = percentiles;
i = 0;
}
@Override
public boolean hasNext() {
return i < percents.length;
}
@Override
public Percentile next() {
final Percentile next = new InternalPercentile(percents[i], percentiles[i]);
++i;
return next;
}
}
}

View File

@ -0,0 +1,25 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.aggregations.pipeline.bucketmetrics.percentile;
import org.elasticsearch.search.aggregations.metrics.percentiles.Percentiles;
public interface PercentilesBucket extends Percentiles {
}

View File

@ -0,0 +1,49 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.aggregations.pipeline.bucketmetrics.percentile;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.BucketMetricsBuilder;
import java.io.IOException;
public class PercentilesBucketBuilder extends BucketMetricsBuilder<PercentilesBucketBuilder> {
Double[] percents;
public PercentilesBucketBuilder(String name) {
super(name, PercentilesBucketPipelineAggregator.TYPE.name());
}
public PercentilesBucketBuilder percents(Double[] percents) {
this.percents = percents;
return this;
}
@Override
protected void doInternalXContent(XContentBuilder builder, Params params) throws IOException {
if (percents != null) {
builder.field(PercentilesBucketParser.PERCENTS.getPreferredName(), percents);
}
}
}

View File

@ -0,0 +1,67 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.aggregations.pipeline.bucketmetrics.percentile;
import com.google.common.primitives.Doubles;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregatorFactory;
import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.BucketMetricsParser;
import org.elasticsearch.search.aggregations.support.format.ValueFormatter;
import org.elasticsearch.search.internal.SearchContext;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import static org.elasticsearch.search.aggregations.pipeline.BucketHelpers.GapPolicy;
public class PercentilesBucketParser extends BucketMetricsParser {
public static final ParseField PERCENTS = new ParseField("percents");
double[] percents = new double[] { 1.0, 5.0, 25.0, 50.0, 75.0, 95.0, 99.0 };
@Override
public String type() {
return PercentilesBucketPipelineAggregator.TYPE.name();
}
@Override
protected PipelineAggregatorFactory buildFactory(String pipelineAggregatorName, String[] bucketsPaths, GapPolicy gapPolicy,
ValueFormatter formatter) {
return new PercentilesBucketPipelineAggregator.Factory(pipelineAggregatorName, bucketsPaths, gapPolicy, formatter, percents);
}
@Override
protected boolean doParse(String pipelineAggregatorName, String currentFieldName,
XContentParser.Token token, XContentParser parser, SearchContext context) throws IOException {
if (context.parseFieldMatcher().match(currentFieldName, PERCENTS)) {
List<Double> parsedPercents = new ArrayList<>();
while ((token = parser.nextToken()) != XContentParser.Token.END_ARRAY) {
parsedPercents.add(parser.doubleValue());
}
percents = Doubles.toArray(parsedPercents);
return true;
}
return false;
}
}

View File

@ -0,0 +1,155 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.aggregations.pipeline.bucketmetrics.percentile;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.search.aggregations.AggregatorFactory;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.InternalAggregation.Type;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregatorFactory;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregatorStreams;
import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.BucketMetricsPipelineAggregator;
import org.elasticsearch.search.aggregations.support.format.ValueFormatter;
import java.io.IOException;
import java.util.*;
import static org.elasticsearch.search.aggregations.pipeline.BucketHelpers.GapPolicy;
public class PercentilesBucketPipelineAggregator extends BucketMetricsPipelineAggregator {
public final static Type TYPE = new Type("percentiles_bucket");
public final static PipelineAggregatorStreams.Stream STREAM = new PipelineAggregatorStreams.Stream() {
@Override
public PercentilesBucketPipelineAggregator readResult(StreamInput in) throws IOException {
PercentilesBucketPipelineAggregator result = new PercentilesBucketPipelineAggregator();
result.readFrom(in);
return result;
}
};
public static void registerStreams() {
PipelineAggregatorStreams.registerStream(STREAM, TYPE.stream());
InternalPercentilesBucket.registerStreams();
}
private double[] percents;
private List<Double> data;
private PercentilesBucketPipelineAggregator() {
}
protected PercentilesBucketPipelineAggregator(String name, double[] percents, String[] bucketsPaths, GapPolicy gapPolicy,
ValueFormatter formatter, Map<String, Object> metaData) {
super(name, bucketsPaths, gapPolicy, formatter, metaData);
this.percents = percents;
}
@Override
public Type type() {
return TYPE;
}
@Override
protected void preCollection() {
data = new ArrayList<>(1024);
}
@Override
protected void collectBucketValue(String bucketKey, Double bucketValue) {
data.add(bucketValue);
}
@Override
protected InternalAggregation buildAggregation(List<PipelineAggregator> pipelineAggregators, Map<String, Object> metadata) {
// Perform the sorting and percentile collection now that all the data
// has been collected.
Collections.sort(data);
double[] percentiles = new double[percents.length];
if (data.size() == 0) {
for (int i = 0; i < percents.length; i++) {
percentiles[i] = Double.NaN;
}
} else {
for (int i = 0; i < percents.length; i++) {
int index = (int)((percents[i] / 100.0) * data.size());
percentiles[i] = data.get(index);
}
}
// todo need postCollection() to clean up temp sorted data?
return new InternalPercentilesBucket(name(), percents, percentiles, formatter, pipelineAggregators, metadata);
}
@Override
public void doReadFrom(StreamInput in) throws IOException {
super.doReadFrom(in);
percents = in.readDoubleArray();
}
@Override
public void doWriteTo(StreamOutput out) throws IOException {
super.doWriteTo(out);
out.writeDoubleArray(percents);
}
public static class Factory extends PipelineAggregatorFactory {
private final ValueFormatter formatter;
private final GapPolicy gapPolicy;
private final double[] percents;
public Factory(String name, String[] bucketsPaths, GapPolicy gapPolicy, ValueFormatter formatter, double[] percents) {
super(name, TYPE.name(), bucketsPaths);
this.gapPolicy = gapPolicy;
this.formatter = formatter;
this.percents = percents;
}
@Override
protected PipelineAggregator createInternal(Map<String, Object> metaData) throws IOException {
return new PercentilesBucketPipelineAggregator(name, percents, bucketsPaths, gapPolicy, formatter, metaData);
}
@Override
public void doValidate(AggregatorFactory parent, AggregatorFactory[] aggFactories,
List<PipelineAggregatorFactory> pipelineAggregatorFactories) {
if (bucketsPaths.length != 1) {
throw new IllegalStateException(PipelineAggregator.Parser.BUCKETS_PATH.getPreferredName()
+ " must contain a single entry for aggregation [" + name + "]");
}
for (Double p : percents) {
if (p == null || p < 0.0 || p > 100.0) {
throw new IllegalStateException(PercentilesBucketParser.PERCENTS.getPreferredName()
+ " must only contain non-null doubles from 0.0-100.0 inclusive");
}
}
}
}
}

View File

@ -0,0 +1,625 @@
package org.elasticsearch.search.aggregations.pipeline;
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.search.SearchPhaseExecutionException;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.search.SearchParseException;
import org.elasticsearch.search.aggregations.bucket.histogram.Histogram;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.aggregations.metrics.percentiles.Percentile;
import org.elasticsearch.search.aggregations.metrics.sum.Sum;
import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.percentile.PercentilesBucket;
import org.elasticsearch.test.ESIntegTestCase;
import org.junit.Test;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.search.aggregations.AggregationBuilders.histogram;
import static org.elasticsearch.search.aggregations.AggregationBuilders.sum;
import static org.elasticsearch.search.aggregations.AggregationBuilders.terms;
import static org.elasticsearch.search.aggregations.pipeline.PipelineAggregatorBuilders.percentilesBucket;
import static org.elasticsearch.search.aggregations.pipeline.PipelineAggregatorBuilders.sumBucket;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertSearchResponse;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.core.IsNull.notNullValue;
@ESIntegTestCase.SuiteScopeTestCase
public class PercentilesBucketIT extends ESIntegTestCase {
private static final String SINGLE_VALUED_FIELD_NAME = "l_value";
private static final Double[] PERCENTS = {1.0, 25.0, 50.0, 75.0, 99.0};
static int numDocs;
static int interval;
static int minRandomValue;
static int maxRandomValue;
static int numValueBuckets;
static long[] valueCounts;
@Override
public void setupSuiteScopeCluster() throws Exception {
createIndex("idx");
createIndex("idx_unmapped");
numDocs = randomIntBetween(6, 20);
interval = randomIntBetween(2, 5);
minRandomValue = 0;
maxRandomValue = 20;
numValueBuckets = ((maxRandomValue - minRandomValue) / interval) + 1;
valueCounts = new long[numValueBuckets];
List<IndexRequestBuilder> builders = new ArrayList<>();
for (int i = 0; i < numDocs; i++) {
int fieldValue = randomIntBetween(minRandomValue, maxRandomValue);
builders.add(client().prepareIndex("idx", "type").setSource(
jsonBuilder().startObject().field(SINGLE_VALUED_FIELD_NAME, fieldValue).field("tag", "tag" + (i % interval))
.endObject()));
final int bucket = (fieldValue / interval); // + (fieldValue < 0 ? -1 : 0) - (minRandomValue / interval - 1);
valueCounts[bucket]++;
}
assertAcked(prepareCreate("empty_bucket_idx").addMapping("type", SINGLE_VALUED_FIELD_NAME, "type=integer"));
for (int i = 0; i < 2; i++) {
builders.add(client().prepareIndex("empty_bucket_idx", "type", "" + i).setSource(
jsonBuilder().startObject().field(SINGLE_VALUED_FIELD_NAME, i * 2).endObject()));
}
indexRandom(true, builders);
ensureSearchable();
}
@Test
public void testDocCount_topLevel() throws Exception {
SearchResponse response = client().prepareSearch("idx")
.addAggregation(histogram("histo").field(SINGLE_VALUED_FIELD_NAME).interval(interval)
.extendedBounds((long) minRandomValue, (long) maxRandomValue))
.addAggregation(percentilesBucket("percentiles_bucket")
.setBucketsPaths("histo>_count")
.percents(PERCENTS)).execute().actionGet();
assertSearchResponse(response);
Histogram histo = response.getAggregations().get("histo");
assertThat(histo, notNullValue());
assertThat(histo.getName(), equalTo("histo"));
List<? extends Histogram.Bucket> buckets = histo.getBuckets();
assertThat(buckets.size(), equalTo(numValueBuckets));
double[] values = new double[numValueBuckets];
for (int i = 0; i < numValueBuckets; ++i) {
Histogram.Bucket bucket = buckets.get(i);
assertThat(bucket, notNullValue());
assertThat(((Number) bucket.getKey()).longValue(), equalTo((long) i * interval));
assertThat(bucket.getDocCount(), equalTo(valueCounts[i]));
values[i] = bucket.getDocCount();
}
Arrays.sort(values);
PercentilesBucket percentilesBucketValue = response.getAggregations().get("percentiles_bucket");
assertThat(percentilesBucketValue, notNullValue());
assertThat(percentilesBucketValue.getName(), equalTo("percentiles_bucket"));
for (Double p : PERCENTS) {
double expected = values[(int)((p / 100) * values.length)];
assertThat(percentilesBucketValue.percentile(p), equalTo(expected));
}
}
@Test
public void testDocCount_asSubAgg() throws Exception {
SearchResponse response = client()
.prepareSearch("idx")
.addAggregation(
terms("terms")
.field("tag")
.order(Terms.Order.term(true))
.subAggregation(
histogram("histo").field(SINGLE_VALUED_FIELD_NAME).interval(interval)
.extendedBounds((long) minRandomValue, (long) maxRandomValue))
.subAggregation(percentilesBucket("percentiles_bucket")
.setBucketsPaths("histo>_count")
.percents(PERCENTS))).execute().actionGet();
assertSearchResponse(response);
Terms terms = response.getAggregations().get("terms");
assertThat(terms, notNullValue());
assertThat(terms.getName(), equalTo("terms"));
List<Terms.Bucket> termsBuckets = terms.getBuckets();
assertThat(termsBuckets.size(), equalTo(interval));
for (int i = 0; i < interval; ++i) {
Terms.Bucket termsBucket = termsBuckets.get(i);
assertThat(termsBucket, notNullValue());
assertThat((String) termsBucket.getKey(), equalTo("tag" + (i % interval)));
Histogram histo = termsBucket.getAggregations().get("histo");
assertThat(histo, notNullValue());
assertThat(histo.getName(), equalTo("histo"));
List<? extends Histogram.Bucket> buckets = histo.getBuckets();
double[] values = new double[numValueBuckets];
for (int j = 0; j < numValueBuckets; ++j) {
Histogram.Bucket bucket = buckets.get(j);
assertThat(bucket, notNullValue());
assertThat(((Number) bucket.getKey()).longValue(), equalTo((long) j * interval));
values[j] = bucket.getDocCount();
}
Arrays.sort(values);
PercentilesBucket percentilesBucketValue = termsBucket.getAggregations().get("percentiles_bucket");
assertThat(percentilesBucketValue, notNullValue());
assertThat(percentilesBucketValue.getName(), equalTo("percentiles_bucket"));
for (Double p : PERCENTS) {
double expected = values[(int)((p / 100) * values.length)];
assertThat(percentilesBucketValue.percentile(p), equalTo(expected));
}
}
}
@Test
public void testMetric_topLevel() throws Exception {
SearchResponse response = client()
.prepareSearch("idx")
.addAggregation(terms("terms").field("tag").subAggregation(sum("sum").field(SINGLE_VALUED_FIELD_NAME)))
.addAggregation(percentilesBucket("percentiles_bucket")
.setBucketsPaths("terms>sum")
.percents(PERCENTS)).execute().actionGet();
assertSearchResponse(response);
Terms terms = response.getAggregations().get("terms");
assertThat(terms, notNullValue());
assertThat(terms.getName(), equalTo("terms"));
List<Terms.Bucket> buckets = terms.getBuckets();
assertThat(buckets.size(), equalTo(interval));
double[] values = new double[interval];
for (int i = 0; i < interval; ++i) {
Terms.Bucket bucket = buckets.get(i);
assertThat(bucket, notNullValue());
assertThat((String) bucket.getKey(), equalTo("tag" + (i % interval)));
assertThat(bucket.getDocCount(), greaterThan(0l));
Sum sum = bucket.getAggregations().get("sum");
assertThat(sum, notNullValue());
values[i] = sum.value();
}
Arrays.sort(values);
PercentilesBucket percentilesBucketValue = response.getAggregations().get("percentiles_bucket");
assertThat(percentilesBucketValue, notNullValue());
assertThat(percentilesBucketValue.getName(), equalTo("percentiles_bucket"));
for (Double p : PERCENTS) {
double expected = values[(int)((p / 100) * values.length)];
assertThat(percentilesBucketValue.percentile(p), equalTo(expected));
}
}
@Test
public void testMetric_topLevelDefaultPercents() throws Exception {
SearchResponse response = client()
.prepareSearch("idx")
.addAggregation(terms("terms").field("tag").subAggregation(sum("sum").field(SINGLE_VALUED_FIELD_NAME)))
.addAggregation(percentilesBucket("percentiles_bucket")
.setBucketsPaths("terms>sum")).execute().actionGet();
assertSearchResponse(response);
Terms terms = response.getAggregations().get("terms");
assertThat(terms, notNullValue());
assertThat(terms.getName(), equalTo("terms"));
List<Terms.Bucket> buckets = terms.getBuckets();
assertThat(buckets.size(), equalTo(interval));
double[] values = new double[interval];
for (int i = 0; i < interval; ++i) {
Terms.Bucket bucket = buckets.get(i);
assertThat(bucket, notNullValue());
assertThat((String) bucket.getKey(), equalTo("tag" + (i % interval)));
assertThat(bucket.getDocCount(), greaterThan(0l));
Sum sum = bucket.getAggregations().get("sum");
assertThat(sum, notNullValue());
values[i] = sum.value();
}
Arrays.sort(values);
PercentilesBucket percentilesBucketValue = response.getAggregations().get("percentiles_bucket");
assertThat(percentilesBucketValue, notNullValue());
assertThat(percentilesBucketValue.getName(), equalTo("percentiles_bucket"));
for (Percentile p : percentilesBucketValue) {
double expected = values[(int)((p.getPercent() / 100) * values.length)];
assertThat(percentilesBucketValue.percentile(p.getPercent()), equalTo(expected));
assertThat(p.getValue(), equalTo(expected));
}
}
@Test
public void testMetric_asSubAgg() throws Exception {
SearchResponse response = client()
.prepareSearch("idx")
.addAggregation(
terms("terms")
.field("tag")
.order(Terms.Order.term(true))
.subAggregation(
histogram("histo").field(SINGLE_VALUED_FIELD_NAME).interval(interval)
.extendedBounds((long) minRandomValue, (long) maxRandomValue)
.subAggregation(sum("sum").field(SINGLE_VALUED_FIELD_NAME)))
.subAggregation(percentilesBucket("percentiles_bucket")
.setBucketsPaths("histo>sum")
.percents(PERCENTS))).execute().actionGet();
assertSearchResponse(response);
Terms terms = response.getAggregations().get("terms");
assertThat(terms, notNullValue());
assertThat(terms.getName(), equalTo("terms"));
List<Terms.Bucket> termsBuckets = terms.getBuckets();
assertThat(termsBuckets.size(), equalTo(interval));
for (int i = 0; i < interval; ++i) {
Terms.Bucket termsBucket = termsBuckets.get(i);
assertThat(termsBucket, notNullValue());
assertThat((String) termsBucket.getKey(), equalTo("tag" + (i % interval)));
Histogram histo = termsBucket.getAggregations().get("histo");
assertThat(histo, notNullValue());
assertThat(histo.getName(), equalTo("histo"));
List<? extends Histogram.Bucket> buckets = histo.getBuckets();
List<Double> values = new ArrayList<>(numValueBuckets);
for (int j = 0; j < numValueBuckets; ++j) {
Histogram.Bucket bucket = buckets.get(j);
assertThat(bucket, notNullValue());
assertThat(((Number) bucket.getKey()).longValue(), equalTo((long) j * interval));
if (bucket.getDocCount() != 0) {
Sum sum = bucket.getAggregations().get("sum");
assertThat(sum, notNullValue());
values.add(sum.value());
}
}
Collections.sort(values);
PercentilesBucket percentilesBucketValue = termsBucket.getAggregations().get("percentiles_bucket");
assertThat(percentilesBucketValue, notNullValue());
assertThat(percentilesBucketValue.getName(), equalTo("percentiles_bucket"));
for (Double p : PERCENTS) {
double expected = values.get((int) ((p / 100) * values.size()));
assertThat(percentilesBucketValue.percentile(p), equalTo(expected));
}
}
}
@Test
public void testMetric_asSubAggWithInsertZeros() throws Exception {
SearchResponse response = client()
.prepareSearch("idx")
.addAggregation(
terms("terms")
.field("tag")
.order(Terms.Order.term(true))
.subAggregation(
histogram("histo").field(SINGLE_VALUED_FIELD_NAME).interval(interval)
.extendedBounds((long) minRandomValue, (long) maxRandomValue)
.subAggregation(sum("sum").field(SINGLE_VALUED_FIELD_NAME)))
.subAggregation(percentilesBucket("percentiles_bucket")
.setBucketsPaths("histo>sum")
.gapPolicy(BucketHelpers.GapPolicy.INSERT_ZEROS)
.percents(PERCENTS)))
.execute().actionGet();
assertSearchResponse(response);
Terms terms = response.getAggregations().get("terms");
assertThat(terms, notNullValue());
assertThat(terms.getName(), equalTo("terms"));
List<Terms.Bucket> termsBuckets = terms.getBuckets();
assertThat(termsBuckets.size(), equalTo(interval));
for (int i = 0; i < interval; ++i) {
Terms.Bucket termsBucket = termsBuckets.get(i);
assertThat(termsBucket, notNullValue());
assertThat((String) termsBucket.getKey(), equalTo("tag" + (i % interval)));
Histogram histo = termsBucket.getAggregations().get("histo");
assertThat(histo, notNullValue());
assertThat(histo.getName(), equalTo("histo"));
List<? extends Histogram.Bucket> buckets = histo.getBuckets();
double[] values = new double[numValueBuckets];
for (int j = 0; j < numValueBuckets; ++j) {
Histogram.Bucket bucket = buckets.get(j);
assertThat(bucket, notNullValue());
assertThat(((Number) bucket.getKey()).longValue(), equalTo((long) j * interval));
Sum sum = bucket.getAggregations().get("sum");
assertThat(sum, notNullValue());
values[j] = sum.value();
}
Arrays.sort(values);
PercentilesBucket percentilesBucketValue = termsBucket.getAggregations().get("percentiles_bucket");
assertThat(percentilesBucketValue, notNullValue());
assertThat(percentilesBucketValue.getName(), equalTo("percentiles_bucket"));
for (Double p : PERCENTS) {
double expected = values[(int)((p / 100) * values.length)];
assertThat(percentilesBucketValue.percentile(p), equalTo(expected));
}
}
}
@Test
public void testNoBuckets() throws Exception {
SearchResponse response = client().prepareSearch("idx")
.addAggregation(terms("terms").field("tag").exclude("tag.*").subAggregation(sum("sum").field(SINGLE_VALUED_FIELD_NAME)))
.addAggregation(percentilesBucket("percentiles_bucket")
.setBucketsPaths("terms>sum")
.percents(PERCENTS)).execute().actionGet();
assertSearchResponse(response);
Terms terms = response.getAggregations().get("terms");
assertThat(terms, notNullValue());
assertThat(terms.getName(), equalTo("terms"));
List<Terms.Bucket> buckets = terms.getBuckets();
assertThat(buckets.size(), equalTo(0));
PercentilesBucket percentilesBucketValue = response.getAggregations().get("percentiles_bucket");
assertThat(percentilesBucketValue, notNullValue());
assertThat(percentilesBucketValue.getName(), equalTo("percentiles_bucket"));
for (Double p : PERCENTS) {
assertThat(percentilesBucketValue.percentile(p), equalTo(Double.NaN));
}
}
@Test
public void testWrongPercents() throws Exception {
SearchResponse response = client().prepareSearch("idx")
.addAggregation(terms("terms").field("tag").exclude("tag.*").subAggregation(sum("sum").field(SINGLE_VALUED_FIELD_NAME)))
.addAggregation(percentilesBucket("percentiles_bucket")
.setBucketsPaths("terms>sum")
.percents(PERCENTS)).execute().actionGet();
assertSearchResponse(response);
Terms terms = response.getAggregations().get("terms");
assertThat(terms, notNullValue());
assertThat(terms.getName(), equalTo("terms"));
List<Terms.Bucket> buckets = terms.getBuckets();
assertThat(buckets.size(), equalTo(0));
PercentilesBucket percentilesBucketValue = response.getAggregations().get("percentiles_bucket");
assertThat(percentilesBucketValue, notNullValue());
assertThat(percentilesBucketValue.getName(), equalTo("percentiles_bucket"));
try {
percentilesBucketValue.percentile(2.0);
fail("2.0 was not a valid percent, should have thrown exception");
} catch (IllegalArgumentException exception) {
// All good
}
}
@Test
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/13179")
public void testBadPercents() throws Exception {
Double[] badPercents = {-1.0, 110.0};
try {
SearchResponse response = client().prepareSearch("idx")
.addAggregation(terms("terms").field("tag").subAggregation(sum("sum").field(SINGLE_VALUED_FIELD_NAME)))
.addAggregation(percentilesBucket("percentiles_bucket")
.setBucketsPaths("terms>sum")
.percents(badPercents)).execute().actionGet();
assertSearchResponse(response);
Terms terms = response.getAggregations().get("terms");
assertThat(terms, notNullValue());
assertThat(terms.getName(), equalTo("terms"));
List<Terms.Bucket> buckets = terms.getBuckets();
assertThat(buckets.size(), equalTo(0));
PercentilesBucket percentilesBucketValue = response.getAggregations().get("percentiles_bucket");
fail("Illegal percent's were provided but no exception was thrown.");
} catch (SearchPhaseExecutionException exception) {
// All good
}
}
@Test
public void testBadPercents_asSubAgg() throws Exception {
Double[] badPercents = {-1.0, 110.0};
try {
SearchResponse response = client()
.prepareSearch("idx")
.addAggregation(
terms("terms")
.field("tag")
.order(Terms.Order.term(true))
.subAggregation(
histogram("histo").field(SINGLE_VALUED_FIELD_NAME).interval(interval)
.extendedBounds((long) minRandomValue, (long) maxRandomValue))
.subAggregation(percentilesBucket("percentiles_bucket")
.setBucketsPaths("histo>_count")
.percents(badPercents))).execute().actionGet();
PercentilesBucket percentilesBucketValue = response.getAggregations().get("percentiles_bucket");
fail("Illegal percent's were provided but no exception was thrown.");
} catch (SearchPhaseExecutionException exception) {
// All good
}
}
@Test
public void testNested() throws Exception {
SearchResponse response = client()
.prepareSearch("idx")
.addAggregation(
terms("terms")
.field("tag")
.order(Terms.Order.term(true))
.subAggregation(
histogram("histo").field(SINGLE_VALUED_FIELD_NAME).interval(interval)
.extendedBounds((long) minRandomValue, (long) maxRandomValue))
.subAggregation(percentilesBucket("percentile_histo_bucket").setBucketsPaths("histo>_count")))
.addAggregation(percentilesBucket("percentile_terms_bucket")
.setBucketsPaths("terms>percentile_histo_bucket.50")
.percents(PERCENTS)).execute().actionGet();
assertSearchResponse(response);
Terms terms = response.getAggregations().get("terms");
assertThat(terms, notNullValue());
assertThat(terms.getName(), equalTo("terms"));
List<Terms.Bucket> termsBuckets = terms.getBuckets();
assertThat(termsBuckets.size(), equalTo(interval));
double[] values = new double[termsBuckets.size()];
for (int i = 0; i < interval; ++i) {
Terms.Bucket termsBucket = termsBuckets.get(i);
assertThat(termsBucket, notNullValue());
assertThat((String) termsBucket.getKey(), equalTo("tag" + (i % interval)));
Histogram histo = termsBucket.getAggregations().get("histo");
assertThat(histo, notNullValue());
assertThat(histo.getName(), equalTo("histo"));
List<? extends Histogram.Bucket> buckets = histo.getBuckets();
double[] innerValues = new double[numValueBuckets];
for (int j = 0; j < numValueBuckets; ++j) {
Histogram.Bucket bucket = buckets.get(j);
assertThat(bucket, notNullValue());
assertThat(((Number) bucket.getKey()).longValue(), equalTo((long) j * interval));
innerValues[j] = bucket.getDocCount();
}
Arrays.sort(innerValues);
PercentilesBucket percentilesBucketValue = termsBucket.getAggregations().get("percentile_histo_bucket");
assertThat(percentilesBucketValue, notNullValue());
assertThat(percentilesBucketValue.getName(), equalTo("percentile_histo_bucket"));
for (Double p : PERCENTS) {
double expected = innerValues[(int)((p / 100) * innerValues.length)];
assertThat(percentilesBucketValue.percentile(p), equalTo(expected));
}
values[i] = percentilesBucketValue.percentile(50.0);
}
Arrays.sort(values);
PercentilesBucket percentilesBucketValue = response.getAggregations().get("percentile_terms_bucket");
assertThat(percentilesBucketValue, notNullValue());
assertThat(percentilesBucketValue.getName(), equalTo("percentile_terms_bucket"));
for (Double p : PERCENTS) {
double expected = values[(int)((p / 100) * values.length)];
assertThat(percentilesBucketValue.percentile(p), equalTo(expected));
}
}
@Test
public void testNestedWithDecimal() throws Exception {
Double[] percent = {99.9};
SearchResponse response = client()
.prepareSearch("idx")
.addAggregation(
terms("terms")
.field("tag")
.order(Terms.Order.term(true))
.subAggregation(
histogram("histo").field(SINGLE_VALUED_FIELD_NAME).interval(interval)
.extendedBounds((long) minRandomValue, (long) maxRandomValue))
.subAggregation(percentilesBucket("percentile_histo_bucket")
.percents(percent)
.setBucketsPaths("histo>_count")))
.addAggregation(percentilesBucket("percentile_terms_bucket")
.setBucketsPaths("terms>percentile_histo_bucket[99.9]")
.percents(percent)).execute().actionGet();
assertSearchResponse(response);
Terms terms = response.getAggregations().get("terms");
assertThat(terms, notNullValue());
assertThat(terms.getName(), equalTo("terms"));
List<Terms.Bucket> termsBuckets = terms.getBuckets();
assertThat(termsBuckets.size(), equalTo(interval));
double[] values = new double[termsBuckets.size()];
for (int i = 0; i < interval; ++i) {
Terms.Bucket termsBucket = termsBuckets.get(i);
assertThat(termsBucket, notNullValue());
assertThat((String) termsBucket.getKey(), equalTo("tag" + (i % interval)));
Histogram histo = termsBucket.getAggregations().get("histo");
assertThat(histo, notNullValue());
assertThat(histo.getName(), equalTo("histo"));
List<? extends Histogram.Bucket> buckets = histo.getBuckets();
double[] innerValues = new double[numValueBuckets];
for (int j = 0; j < numValueBuckets; ++j) {
Histogram.Bucket bucket = buckets.get(j);
assertThat(bucket, notNullValue());
assertThat(((Number) bucket.getKey()).longValue(), equalTo((long) j * interval));
innerValues[j] = bucket.getDocCount();
}
Arrays.sort(innerValues);
PercentilesBucket percentilesBucketValue = termsBucket.getAggregations().get("percentile_histo_bucket");
assertThat(percentilesBucketValue, notNullValue());
assertThat(percentilesBucketValue.getName(), equalTo("percentile_histo_bucket"));
for (Double p : percent) {
double expected = innerValues[(int)((p / 100) * innerValues.length)];
assertThat(percentilesBucketValue.percentile(p), equalTo(expected));
}
values[i] = percentilesBucketValue.percentile(99.9);
}
Arrays.sort(values);
PercentilesBucket percentilesBucketValue = response.getAggregations().get("percentile_terms_bucket");
assertThat(percentilesBucketValue, notNullValue());
assertThat(percentilesBucketValue.getName(), equalTo("percentile_terms_bucket"));
for (Double p : percent) {
double expected = values[(int)((p / 100) * values.length)];
assertThat(percentilesBucketValue.percentile(p), equalTo(expected));
}
}
}

View File

@ -163,6 +163,7 @@ include::pipeline/derivative-aggregation.asciidoc[]
include::pipeline/max-bucket-aggregation.asciidoc[]
include::pipeline/min-bucket-aggregation.asciidoc[]
include::pipeline/sum-bucket-aggregation.asciidoc[]
include::pipeline/percentiles-bucket-aggregation.asciidoc[]
include::pipeline/movavg-aggregation.asciidoc[]
include::pipeline/cumulative-sum-aggregation.asciidoc[]
include::pipeline/bucket-script-aggregation.asciidoc[]

View File

@ -0,0 +1,121 @@
[[search-aggregations-pipeline-percentiles-bucket-aggregation]]
=== Percentiles Bucket Aggregation
coming[2.1.0]
experimental[]
A sibling pipeline aggregation which calculates percentiles across all bucket of a specified metric in a sibling aggregation.
The specified metric must be numeric and the sibling aggregation must be a multi-bucket aggregation.
==== Syntax
A `percentiles_bucket` aggregation looks like this in isolation:
[source,js]
--------------------------------------------------
{
"percentiles_bucket": {
"buckets_path": "the_sum"
}
}
--------------------------------------------------
.`sum_bucket` Parameters
|===
|Parameter Name |Description |Required |Default Value
|`buckets_path` |The path to the buckets we wish to find the sum for (see <<bucket-path-syntax>> for more
details) |Required |
|`gap_policy` |The policy to apply when gaps are found in the data (see <<gap-policy>> for more
details)|Optional | `skip`
|`format` |format to apply to the output value of this aggregation |Optional | `null`
|`percents` |The list of percentiles to calculate |Optional | `[ 1, 5, 25, 50, 75, 95, 99 ]`
|===
The following snippet calculates the sum of all the total monthly `sales` buckets:
[source,js]
--------------------------------------------------
{
"aggs" : {
"sales_per_month" : {
"date_histogram" : {
"field" : "date",
"interval" : "month"
},
"aggs": {
"sales": {
"sum": {
"field": "price"
}
}
}
},
"sum_monthly_sales": {
"percentiles_bucket": {
"buckets_paths": "sales_per_month>sales", <1>
"percents": [ 25.0, 50.0, 75.0 ] <2>
}
}
}
}
--------------------------------------------------
<1> `bucket_paths` instructs this percentiles_bucket aggregation that we want to calculate percentiles for
the `sales` aggregation in the `sales_per_month` date histogram.
<2> `percents` specifies which percentiles we wish to calculate, in this case, the 25th, 50th and 75th percentil
And the following may be the response:
[source,js]
--------------------------------------------------
{
"aggregations": {
"sales_per_month": {
"buckets": [
{
"key_as_string": "2015/01/01 00:00:00",
"key": 1420070400000,
"doc_count": 3,
"sales": {
"value": 550
}
},
{
"key_as_string": "2015/02/01 00:00:00",
"key": 1422748800000,
"doc_count": 2,
"sales": {
"value": 60
}
},
{
"key_as_string": "2015/03/01 00:00:00",
"key": 1425168000000,
"doc_count": 2,
"sales": {
"value": 375
}
}
]
},
"percentiles_monthly_sales": {
"values" : {
"25.0": 60,
"50.0": 375",
"75.0": 550
}
}
}
}
--------------------------------------------------
==== Percentiles_bucket implementation
The Percentile Bucket returns the nearest input data point that is not greater than the requested percentile; it does not
interpolate between data points.
The percentiles are calculated exactly and is not an approximation (unlike the Percentiles Metric). This means
the implementation maintains an in-memory, sorted list of your data to compute the percentiles, before discarding the
data. You may run into memory pressure issues if you attempt to calculate percentiles over many millions of
data-points in a single `percentiles_bucket`.