Remove abstraction in the percentiles aggregation.

We initially added abstraction in the percentiles aggregation in order to be
able to plug in different percentiles estimators. However, only one of the 3
options that we looked into proved useful and I don't see us adding new
estimators in the future.

Moreover, because of this, we let the parser put unknown parameters into a hash
table in case these parameters would have meaning for a specific percentiles
estimator impl. But this makes parsing error-prone: for example, a user reported
that their percentiles aggregation returned extremely high values (on the order
of several millions while the maximum field value was `5`), and the reason was
that they had a typo and had written `fields` instead of `field`. As a
consequence, the percentiles aggregation used the parent values source, which
was a timestamp, hence the large values. With this change, parsing fails with an
error in case of an unknown parameter.

Close #5859
This commit is contained in:
Adrien Grand 2014-04-17 19:38:45 +02:00
parent b3e0e58094
commit cb8139a583
8 changed files with 134 additions and 445 deletions

View File

@ -19,12 +19,15 @@
package org.elasticsearch.search.aggregations.metrics.percentiles; package org.elasticsearch.search.aggregations.metrics.percentiles;
import com.google.common.collect.UnmodifiableIterator; import com.google.common.collect.UnmodifiableIterator;
import org.elasticsearch.ElasticsearchIllegalArgumentException;
import org.elasticsearch.Version;
import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.search.aggregations.AggregationStreams; import org.elasticsearch.search.aggregations.AggregationStreams;
import org.elasticsearch.search.aggregations.InternalAggregation; import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.metrics.MetricsAggregation; import org.elasticsearch.search.aggregations.metrics.MetricsAggregation;
import org.elasticsearch.search.aggregations.metrics.percentiles.tdigest.TDigestState;
import org.elasticsearch.search.aggregations.support.format.ValueFormatterStreams; import org.elasticsearch.search.aggregations.support.format.ValueFormatterStreams;
import java.io.IOException; import java.io.IOException;
@ -51,20 +54,22 @@ public class InternalPercentiles extends MetricsAggregation.MultiValue implement
AggregationStreams.registerStream(STREAM, TYPE.stream()); AggregationStreams.registerStream(STREAM, TYPE.stream());
} }
private PercentilesEstimator.Result result; private double[] percents;
private TDigestState state;
private boolean keyed; private boolean keyed;
InternalPercentiles() {} // for serialization InternalPercentiles() {} // for serialization
public InternalPercentiles(String name, PercentilesEstimator.Result result, boolean keyed) { public InternalPercentiles(String name, double[] percents, TDigestState state, boolean keyed) {
super(name); super(name);
this.result = result; this.percents = percents;
this.state = state;
this.keyed = keyed; this.keyed = keyed;
} }
@Override @Override
public double value(String name) { public double value(String name) {
return result.estimate(Double.valueOf(name)); return percentile(Double.parseDouble(name));
} }
@Override @Override
@ -74,34 +79,44 @@ public class InternalPercentiles extends MetricsAggregation.MultiValue implement
@Override @Override
public double percentile(double percent) { public double percentile(double percent) {
return result.estimate(percent); return state.quantile(percent / 100);
} }
@Override @Override
public Iterator<Percentiles.Percentile> iterator() { public Iterator<Percentiles.Percentile> iterator() {
return new Iter(result); return new Iter(percents, state);
} }
@Override @Override
public InternalPercentiles reduce(ReduceContext reduceContext) { public InternalPercentiles reduce(ReduceContext reduceContext) {
List<InternalAggregation> aggregations = reduceContext.aggregations(); List<InternalAggregation> aggregations = reduceContext.aggregations();
InternalPercentiles first = (InternalPercentiles) aggregations.get(0); InternalPercentiles merged = null;
if (aggregations.size() == 1) {
return first;
}
PercentilesEstimator.Result.Merger merger = first.result.merger(aggregations.size());
for (InternalAggregation aggregation : aggregations) { for (InternalAggregation aggregation : aggregations) {
merger.add(((InternalPercentiles) aggregation).result); final InternalPercentiles percentiles = (InternalPercentiles) aggregation;
if (merged == null) {
merged = percentiles;
} else {
merged.state.add(percentiles.state);
}
} }
first.result = merger.merge(); return merged;
return first;
} }
@Override @Override
public void readFrom(StreamInput in) throws IOException { public void readFrom(StreamInput in) throws IOException {
name = in.readString(); name = in.readString();
valueFormatter = ValueFormatterStreams.readOptional(in); valueFormatter = ValueFormatterStreams.readOptional(in);
result = PercentilesEstimator.Streams.read(in); if (in.getVersion().before(Version.V_1_2_0)) {
final byte id = in.readByte();
if (id != 0) {
throw new ElasticsearchIllegalArgumentException("Unexpected percentiles aggregator id [" + id + "]");
}
}
percents = new double[in.readInt()];
for (int i = 0; i < percents.length; ++i) {
percents[i] = in.readDouble();
}
state = TDigestState.read(in);
keyed = in.readBoolean(); keyed = in.readBoolean();
} }
@ -109,18 +124,24 @@ public class InternalPercentiles extends MetricsAggregation.MultiValue implement
public void writeTo(StreamOutput out) throws IOException { public void writeTo(StreamOutput out) throws IOException {
out.writeString(name); out.writeString(name);
ValueFormatterStreams.writeOptional(valueFormatter, out); ValueFormatterStreams.writeOptional(valueFormatter, out);
PercentilesEstimator.Streams.write(result, out); if (out.getVersion().before(Version.V_1_2_0)) {
out.writeByte((byte) 0);
}
out.writeInt(percents.length);
for (int i = 0 ; i < percents.length; ++i) {
out.writeDouble(percents[i]);
}
TDigestState.write(state, out);
out.writeBoolean(keyed); out.writeBoolean(keyed);
} }
@Override @Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
double[] percents = result.percents;
if (keyed) { if (keyed) {
builder.startObject(name); builder.startObject(name);
for(int i = 0; i < percents.length; ++i) { for(int i = 0; i < percents.length; ++i) {
String key = String.valueOf(percents[i]); String key = String.valueOf(percents[i]);
double value = result.estimate(i); double value = percentile(percents[i]);
builder.field(key, value); builder.field(key, value);
if (valueFormatter != null) { if (valueFormatter != null) {
builder.field(key + "_as_string", valueFormatter.format(value)); builder.field(key + "_as_string", valueFormatter.format(value));
@ -130,7 +151,7 @@ public class InternalPercentiles extends MetricsAggregation.MultiValue implement
} else { } else {
builder.startArray(name); builder.startArray(name);
for (int i = 0; i < percents.length; i++) { for (int i = 0; i < percents.length; i++) {
double value = result.estimate(i); double value = percentile(percents[i]);
builder.startObject(); builder.startObject();
builder.field(CommonFields.KEY, percents[i]); builder.field(CommonFields.KEY, percents[i]);
builder.field(CommonFields.VALUE, value); builder.field(CommonFields.VALUE, value);
@ -146,22 +167,24 @@ public class InternalPercentiles extends MetricsAggregation.MultiValue implement
public static class Iter extends UnmodifiableIterator<Percentiles.Percentile> { public static class Iter extends UnmodifiableIterator<Percentiles.Percentile> {
private final PercentilesEstimator.Result result; private final double[] percents;
private final TDigestState state;
private int i; private int i;
public Iter(PercentilesEstimator.Result estimator) { public Iter(double[] percents, TDigestState state) {
this.result = estimator; this.percents = percents;
this.state = state;
i = 0; i = 0;
} }
@Override @Override
public boolean hasNext() { public boolean hasNext() {
return i < result.percents.length; return i < percents.length;
} }
@Override @Override
public Percentiles.Percentile next() { public Percentiles.Percentile next() {
final Percentiles.Percentile next = new InnerPercentile(result.percents[i], result.estimate(i)); final Percentiles.Percentile next = new InnerPercentile(percents[i], state.quantile(percents[i] / 100));
++i; ++i;
return next; return next;
} }

View File

@ -18,57 +18,13 @@
*/ */
package org.elasticsearch.search.aggregations.metrics.percentiles; package org.elasticsearch.search.aggregations.metrics.percentiles;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.search.aggregations.Aggregation; import org.elasticsearch.search.aggregations.Aggregation;
import java.io.IOException;
/** /**
* *
*/ */
public interface Percentiles extends Aggregation, Iterable<Percentiles.Percentile> { public interface Percentiles extends Aggregation, Iterable<Percentiles.Percentile> {
public static abstract class Estimator {
public static TDigest tDigest() {
return new TDigest();
}
private final String type;
protected Estimator(String type) {
this.type = type;
}
public static class TDigest extends Estimator {
protected double compression = -1;
TDigest() {
super("tdigest");
}
public TDigest compression(double compression) {
this.compression = compression;
return this;
}
@Override
void paramsToXContent(XContentBuilder builder) throws IOException {
if (compression > 0) {
builder.field("compression", compression);
}
}
}
String type() {
return type;
}
abstract void paramsToXContent(XContentBuilder builder) throws IOException;
}
public static interface Percentile { public static interface Percentile {
double getPercent(); double getPercent();

View File

@ -19,10 +19,14 @@
package org.elasticsearch.search.aggregations.metrics.percentiles; package org.elasticsearch.search.aggregations.metrics.percentiles;
import org.apache.lucene.index.AtomicReaderContext; import org.apache.lucene.index.AtomicReaderContext;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.util.ArrayUtils;
import org.elasticsearch.common.util.ObjectArray;
import org.elasticsearch.index.fielddata.DoubleValues; import org.elasticsearch.index.fielddata.DoubleValues;
import org.elasticsearch.search.aggregations.Aggregator; import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.InternalAggregation; import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.metrics.MetricsAggregator; import org.elasticsearch.search.aggregations.metrics.MetricsAggregator;
import org.elasticsearch.search.aggregations.metrics.percentiles.tdigest.TDigestState;
import org.elasticsearch.search.aggregations.support.AggregationContext; import org.elasticsearch.search.aggregations.support.AggregationContext;
import org.elasticsearch.search.aggregations.support.ValuesSource; import org.elasticsearch.search.aggregations.support.ValuesSource;
import org.elasticsearch.search.aggregations.support.ValuesSourceAggregatorFactory; import org.elasticsearch.search.aggregations.support.ValuesSourceAggregatorFactory;
@ -35,19 +39,27 @@ import java.io.IOException;
*/ */
public class PercentilesAggregator extends MetricsAggregator.MultiValue { public class PercentilesAggregator extends MetricsAggregator.MultiValue {
private static int indexOfPercent(double[] percents, double percent) {
return ArrayUtils.binarySearch(percents, percent, 0.001);
}
private final double[] percents;
private final ValuesSource.Numeric valuesSource; private final ValuesSource.Numeric valuesSource;
private DoubleValues values; private DoubleValues values;
private final PercentilesEstimator estimator; private ObjectArray<TDigestState> states;
private final double compression;
private final boolean keyed; private final boolean keyed;
public PercentilesAggregator(String name, long estimatedBucketsCount, ValuesSource.Numeric valuesSource, AggregationContext context, public PercentilesAggregator(String name, long estimatedBucketsCount, ValuesSource.Numeric valuesSource, AggregationContext context,
Aggregator parent, PercentilesEstimator estimator, boolean keyed) { Aggregator parent, double[] percents, double compression, boolean keyed) {
super(name, estimatedBucketsCount, context, parent); super(name, estimatedBucketsCount, context, parent);
this.valuesSource = valuesSource; this.valuesSource = valuesSource;
this.keyed = keyed; this.keyed = keyed;
this.estimator = estimator; this.states = bigArrays.newObjectArray(estimatedBucketsCount);
this.percents = percents;
this.compression = compression;
} }
@Override @Override
@ -61,65 +73,86 @@ public class PercentilesAggregator extends MetricsAggregator.MultiValue {
} }
@Override @Override
public void collect(int doc, long owningBucketOrdinal) throws IOException { public void collect(int doc, long bucketOrd) throws IOException {
states = bigArrays.grow(states, bucketOrd + 1);
TDigestState state = states.get(bucketOrd);
if (state == null) {
state = new TDigestState(compression);
states.set(bucketOrd, state);
}
final int valueCount = values.setDocument(doc); final int valueCount = values.setDocument(doc);
for (int i = 0; i < valueCount; i++) { for (int i = 0; i < valueCount; i++) {
estimator.offer(values.nextValue(), owningBucketOrdinal); state.add(values.nextValue());
} }
} }
@Override @Override
public boolean hasMetric(String name) { public boolean hasMetric(String name) {
return PercentilesEstimator.indexOfPercent(estimator.percents, Double.parseDouble(name)) >= 0; return indexOfPercent(percents, Double.parseDouble(name)) >= 0;
}
private TDigestState getState(long bucketOrd) {
if (bucketOrd >= states.size()) {
return null;
}
final TDigestState state = states.get(bucketOrd);
return state;
} }
@Override @Override
public double metric(String name, long owningBucketOrd) { public double metric(String name, long bucketOrd) {
return estimator.result(owningBucketOrd).estimate(Double.parseDouble(name)); TDigestState state = getState(bucketOrd);
if (state == null) {
return Double.NaN;
} else {
return state.quantile(Double.parseDouble(name) / 100);
}
} }
@Override @Override
public InternalAggregation buildAggregation(long owningBucketOrdinal) { public InternalAggregation buildAggregation(long owningBucketOrdinal) {
if (valuesSource == null) { TDigestState state = getState(owningBucketOrdinal);
if (state == null) {
return buildEmptyAggregation(); return buildEmptyAggregation();
} else {
return new InternalPercentiles(name, percents, state, keyed);
} }
return new InternalPercentiles(name, estimator.result(owningBucketOrdinal), keyed);
} }
@Override @Override
public InternalAggregation buildEmptyAggregation() { public InternalAggregation buildEmptyAggregation() {
return new InternalPercentiles(name, estimator.emptyResult(), keyed); return new InternalPercentiles(name, percents, new TDigestState(compression), keyed);
} }
@Override @Override
protected void doClose() { protected void doClose() {
estimator.close(); Releasables.close(states);
} }
public static class Factory extends ValuesSourceAggregatorFactory.LeafOnly<ValuesSource.Numeric> { public static class Factory extends ValuesSourceAggregatorFactory.LeafOnly<ValuesSource.Numeric> {
private final PercentilesEstimator.Factory estimatorFactory;
private final double[] percents; private final double[] percents;
private final double compression;
private final boolean keyed; private final boolean keyed;
public Factory(String name, ValuesSourceConfig<ValuesSource.Numeric> valuesSourceConfig, public Factory(String name, ValuesSourceConfig<ValuesSource.Numeric> valuesSourceConfig,
double[] percents, PercentilesEstimator.Factory estimatorFactory, boolean keyed) { double[] percents, double compression, boolean keyed) {
super(name, InternalPercentiles.TYPE.name(), valuesSourceConfig); super(name, InternalPercentiles.TYPE.name(), valuesSourceConfig);
this.estimatorFactory = estimatorFactory;
this.percents = percents; this.percents = percents;
this.compression = compression;
this.keyed = keyed; this.keyed = keyed;
} }
@Override @Override
protected Aggregator createUnmapped(AggregationContext aggregationContext, Aggregator parent) { protected Aggregator createUnmapped(AggregationContext aggregationContext, Aggregator parent) {
return new PercentilesAggregator(name, 0, null, aggregationContext, parent, estimatorFactory.create(percents, 0, aggregationContext), keyed); return new PercentilesAggregator(name, 0, null, aggregationContext, parent, percents, compression, keyed);
} }
@Override @Override
protected Aggregator create(ValuesSource.Numeric valuesSource, long expectedBucketsCount, AggregationContext aggregationContext, Aggregator parent) { protected Aggregator create(ValuesSource.Numeric valuesSource, long expectedBucketsCount, AggregationContext aggregationContext, Aggregator parent) {
PercentilesEstimator estimator = estimatorFactory.create(percents, expectedBucketsCount, aggregationContext); return new PercentilesAggregator(name, expectedBucketsCount, valuesSource, aggregationContext, parent, percents, compression, keyed);
return new PercentilesAggregator(name, expectedBucketsCount, valuesSource, aggregationContext, parent, estimator, keyed);
} }
} }
} }

View File

@ -29,7 +29,7 @@ import java.io.IOException;
public class PercentilesBuilder extends ValuesSourceMetricsAggregationBuilder<PercentilesBuilder> { public class PercentilesBuilder extends ValuesSourceMetricsAggregationBuilder<PercentilesBuilder> {
private double[] percentiles; private double[] percentiles;
private Percentiles.Estimator estimator; private Double compression;
public PercentilesBuilder(String name) { public PercentilesBuilder(String name) {
super(name, InternalPercentiles.TYPE.name()); super(name, InternalPercentiles.TYPE.name());
@ -46,8 +46,8 @@ public class PercentilesBuilder extends ValuesSourceMetricsAggregationBuilder<Pe
return this; return this;
} }
public PercentilesBuilder estimator(Percentiles.Estimator estimator) { public PercentilesBuilder compression(double compression) {
this.estimator = estimator; this.compression = compression;
return this; return this;
} }
@ -59,9 +59,8 @@ public class PercentilesBuilder extends ValuesSourceMetricsAggregationBuilder<Pe
builder.field("percents", percentiles); builder.field("percents", percentiles);
} }
if (estimator != null) { if (compression != null) {
builder.field("estimator", estimator.type()); builder.field("compression", compression);
estimator.paramsToXContent(builder);
} }
} }
} }

View File

@ -1,123 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.aggregations.metrics.percentiles;
import org.elasticsearch.ElasticsearchIllegalArgumentException;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.util.ArrayUtils;
import org.elasticsearch.search.aggregations.metrics.percentiles.tdigest.TDigest;
import org.elasticsearch.search.aggregations.support.AggregationContext;
import java.io.IOException;
import java.util.Arrays;
/**
*
*/
public abstract class PercentilesEstimator implements Releasable {
protected double[] percents;
public PercentilesEstimator(double[] percents) {
this.percents = percents;
}
/**
* @return list of percentile intervals
*/
public double[] percents() {
return percents;
}
/**
* Offer a new value to the streaming percentile algo. May modify the current
* estimate
*
* @param value Value to stream
*/
public abstract void offer(double value, long bucketOrd);
public abstract Result result(long bucketOrd);
public abstract Result emptyResult();
static int indexOfPercent(double[] percents, double percent) {
return ArrayUtils.binarySearch(percents, percent, 0.001);
}
/**
* Responsible for merging multiple estimators into a single one.
*/
public abstract static class Result<E extends PercentilesEstimator, F extends Result> implements Streamable {
protected double[] percents;
protected Result() {} // for serialization
protected Result(double[] percents) {
this.percents = percents;
}
protected abstract byte id();
public double estimate(double percent) {
int i = indexOfPercent(percents, percent);
assert i >= 0;
return estimate(i);
}
public abstract double estimate(int index);
public abstract Merger<E, F> merger(int estimatedMerges);
public static interface Merger<E extends PercentilesEstimator, F extends Result> {
public abstract void add(F result);
public abstract Result merge();
}
}
public static interface Factory<E extends PercentilesEstimator> {
public abstract E create(double[] percents, long estimatedBucketCount, AggregationContext context);
}
static class Streams {
static Result read(StreamInput in) throws IOException {
switch (in.readByte()) {
case TDigest.ID: return TDigest.Result.read(in);
default:
throw new ElasticsearchIllegalArgumentException("Unknown percentile estimator");
}
}
static void write(Result estimator, StreamOutput out) throws IOException {
out.writeByte(estimator.id());
estimator.writeTo(out);
}
}
}

View File

@ -23,15 +23,12 @@ import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.search.SearchParseException; import org.elasticsearch.search.SearchParseException;
import org.elasticsearch.search.aggregations.Aggregator; import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.AggregatorFactory; import org.elasticsearch.search.aggregations.AggregatorFactory;
import org.elasticsearch.search.aggregations.metrics.percentiles.tdigest.TDigest;
import org.elasticsearch.search.aggregations.support.ValuesSource; import org.elasticsearch.search.aggregations.support.ValuesSource;
import org.elasticsearch.search.aggregations.support.ValuesSourceParser; import org.elasticsearch.search.aggregations.support.ValuesSourceParser;
import org.elasticsearch.search.internal.SearchContext; import org.elasticsearch.search.internal.SearchContext;
import java.io.IOException; import java.io.IOException;
import java.util.Arrays; import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
/** /**
* *
@ -58,7 +55,7 @@ public class PercentilesParser implements Aggregator.Parser {
double[] percents = DEFAULT_PERCENTS; double[] percents = DEFAULT_PERCENTS;
boolean keyed = true; boolean keyed = true;
Map<String, Object> settings = null; double compression = 100;
XContentParser.Token token; XContentParser.Token token;
String currentFieldName = null; String currentFieldName = null;
@ -84,43 +81,24 @@ public class PercentilesParser implements Aggregator.Parser {
} else { } else {
throw new SearchParseException(context, "Unknown key for a " + token + " in [" + aggregationName + "]: [" + currentFieldName + "]."); throw new SearchParseException(context, "Unknown key for a " + token + " in [" + aggregationName + "]: [" + currentFieldName + "].");
} }
} else if (token.isValue()) { } else if (token == XContentParser.Token.VALUE_BOOLEAN) {
if (token == XContentParser.Token.VALUE_BOOLEAN && "keyed".equals(currentFieldName)) { if ("keyed".equals(currentFieldName)) {
keyed = parser.booleanValue(); keyed = parser.booleanValue();
} else { } else {
if (settings == null) { throw new SearchParseException(context, "Unknown key for a " + token + " in [" + aggregationName + "]: [" + currentFieldName + "].");
settings = new HashMap<>(); }
} } else if (token == XContentParser.Token.VALUE_NUMBER) {
settings.put(currentFieldName, parser.objectText()); if ("compression".equals(currentFieldName)) {
compression = parser.doubleValue();
} else {
throw new SearchParseException(context, "Unknown key for a " + token + " in [" + aggregationName + "]: [" + currentFieldName + "].");
} }
} else { } else {
throw new SearchParseException(context, "Unexpected token " + token + " in [" + aggregationName + "]."); throw new SearchParseException(context, "Unexpected token " + token + " in [" + aggregationName + "].");
} }
} }
PercentilesEstimator.Factory estimatorFactory = EstimatorType.TDIGEST.estimatorFactory(settings); return new PercentilesAggregator.Factory(aggregationName, vsParser.config(), percents, compression, keyed);
return new PercentilesAggregator.Factory(aggregationName, vsParser.config(), percents, estimatorFactory, keyed);
} }
/**
*
*/
public static enum EstimatorType {
TDIGEST() {
@Override
public PercentilesEstimator.Factory estimatorFactory(Map<String, Object> settings) {
return new TDigest.Factory(settings);
}
};
public abstract PercentilesEstimator.Factory estimatorFactory(Map<String, Object> settings);
public static EstimatorType resolve(String name, SearchContext context) {
if (name.equals("tdigest")) {
return TDIGEST;
}
throw new SearchParseException(context, "Unknown percentile estimator [" + name + "]");
}
}
} }

View File

@ -1,174 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.aggregations.metrics.percentiles.tdigest;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchIllegalArgumentException;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.ObjectArray;
import org.elasticsearch.search.aggregations.metrics.percentiles.PercentilesEstimator;
import org.elasticsearch.search.aggregations.support.AggregationContext;
import java.io.IOException;
import java.util.Map;
public class TDigest extends PercentilesEstimator {
public final static byte ID = 0;
private final BigArrays bigArrays;
private ObjectArray<TDigestState> states;
private final double compression;
public TDigest(double[] percents, double compression, long estimatedBucketsCount, AggregationContext context) {
super(percents);
bigArrays = context.bigArrays();
states = bigArrays.newObjectArray(estimatedBucketsCount);
this.compression = compression;
}
@Override
public void close() throws ElasticsearchException {
states.close();
}
public void offer(double value, long bucketOrd) {
states = bigArrays.grow(states, bucketOrd + 1);
TDigestState state = states.get(bucketOrd);
if (state == null) {
state = new TDigestState(compression);
states.set(bucketOrd, state);
}
state.add(value);
}
@Override
public PercentilesEstimator.Result result(long bucketOrd) {
if (bucketOrd >= states.size() || states.get(bucketOrd) == null) {
return emptyResult();
}
return new Result(percents, states.get(bucketOrd));
}
@Override
public PercentilesEstimator.Result emptyResult() {
return new Result(percents, new TDigestState(compression));
}
public static class Result extends PercentilesEstimator.Result<TDigest, Result> {
private TDigestState state;
public Result() {} // for serialization
public Result(double[] percents, TDigestState state) {
super(percents);
this.state = state;
}
@Override
protected byte id() {
return ID;
}
@Override
public double estimate(int index) {
return state.quantile(percents[index] / 100);
}
@Override
public Merger merger(int estimatedMerges) {
return new Merger();
}
public static Result read(StreamInput in) throws IOException {
Result result = new Result();
result.readFrom(in);
return result;
}
@Override
public void readFrom(StreamInput in) throws IOException {
this.percents = new double[in.readInt()];
for (int i = 0; i < percents.length; i++) {
percents[i] = in.readDouble();
}
state = TDigestState.read(in);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeInt(percents.length);
for (int i = 0 ; i < percents.length; ++i) {
out.writeDouble(percents[i]);
}
TDigestState.write(state, out);
}
private class Merger implements PercentilesEstimator.Result.Merger<TDigest, Result> {
private Result merged;
@Override
public void add(Result result) {
if (merged == null || merged.state == null) {
merged = result;
return;
}
if (result.state == null || result.state.size() == 0) {
return;
}
merged.state.add(result.state);
}
@Override
public Result merge() {
return merged;
}
}
}
public static class Factory implements PercentilesEstimator.Factory {
private final double compression;
public Factory(Map<String, Object> settings) {
double compression = 100;
if (settings != null) {
Object compressionObject = settings.get("compression");
if (compressionObject != null) {
if (!(compressionObject instanceof Number)) {
throw new ElasticsearchIllegalArgumentException("tdigest compression must be number, got a " + compressionObject.getClass());
}
compression = ((Number) compressionObject).doubleValue();
}
}
this.compression = compression;
}
public TDigest create(double[] percents, long estimtedBucketCount, AggregationContext context) {
return new TDigest(percents, compression, estimtedBucketCount, context);
}
}
}

View File

@ -24,7 +24,6 @@ import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.search.aggregations.bucket.histogram.Histogram; import org.elasticsearch.search.aggregations.bucket.histogram.Histogram;
import org.elasticsearch.search.aggregations.bucket.histogram.Histogram.Order; import org.elasticsearch.search.aggregations.bucket.histogram.Histogram.Order;
import org.elasticsearch.search.aggregations.metrics.percentiles.Percentiles; import org.elasticsearch.search.aggregations.metrics.percentiles.Percentiles;
import org.elasticsearch.search.aggregations.metrics.percentiles.Percentiles.Estimator.TDigest;
import org.elasticsearch.search.aggregations.metrics.percentiles.Percentiles.Percentile; import org.elasticsearch.search.aggregations.metrics.percentiles.Percentiles.Percentile;
import org.elasticsearch.search.aggregations.metrics.percentiles.PercentilesBuilder; import org.elasticsearch.search.aggregations.metrics.percentiles.PercentilesBuilder;
import org.junit.Test; import org.junit.Test;
@ -63,11 +62,9 @@ public class PercentilesTests extends AbstractNumericTests {
return percentiles; return percentiles;
} }
private static PercentilesBuilder randomEstimator(PercentilesBuilder builder) { private static PercentilesBuilder randomCompression(PercentilesBuilder builder) {
if (randomBoolean()) { if (randomBoolean()) {
TDigest estimator = TDigest.tDigest(); builder.compression(randomIntBetween(20, 120) + randomDouble());
estimator.compression(randomIntBetween(20, 120) + randomDouble());
builder.estimator(estimator);
} }
return builder; return builder;
} }
@ -100,7 +97,7 @@ public class PercentilesTests extends AbstractNumericTests {
SearchResponse searchResponse = client().prepareSearch("empty_bucket_idx") SearchResponse searchResponse = client().prepareSearch("empty_bucket_idx")
.setQuery(matchAllQuery()) .setQuery(matchAllQuery())
.addAggregation(histogram("histo").field("value").interval(1l).minDocCount(0) .addAggregation(histogram("histo").field("value").interval(1l).minDocCount(0)
.subAggregation(randomEstimator(percentiles("percentiles")) .subAggregation(randomCompression(percentiles("percentiles"))
.percentiles(10, 15))) .percentiles(10, 15)))
.execute().actionGet(); .execute().actionGet();
@ -121,7 +118,7 @@ public class PercentilesTests extends AbstractNumericTests {
public void testUnmapped() throws Exception { public void testUnmapped() throws Exception {
SearchResponse searchResponse = client().prepareSearch("idx_unmapped") SearchResponse searchResponse = client().prepareSearch("idx_unmapped")
.setQuery(matchAllQuery()) .setQuery(matchAllQuery())
.addAggregation(randomEstimator(percentiles("percentiles")) .addAggregation(randomCompression(percentiles("percentiles"))
.field("value") .field("value")
.percentiles(0, 10, 15, 100)) .percentiles(0, 10, 15, 100))
.execute().actionGet(); .execute().actionGet();
@ -142,7 +139,7 @@ public class PercentilesTests extends AbstractNumericTests {
final double[] pcts = randomPercentiles(); final double[] pcts = randomPercentiles();
SearchResponse searchResponse = client().prepareSearch("idx") SearchResponse searchResponse = client().prepareSearch("idx")
.setQuery(matchAllQuery()) .setQuery(matchAllQuery())
.addAggregation(randomEstimator(percentiles("percentiles")) .addAggregation(randomCompression(percentiles("percentiles"))
.field("value") .field("value")
.percentiles(pcts)) .percentiles(pcts))
.execute().actionGet(); .execute().actionGet();
@ -158,7 +155,7 @@ public class PercentilesTests extends AbstractNumericTests {
final double[] pcts = randomPercentiles(); final double[] pcts = randomPercentiles();
SearchResponse searchResponse = client().prepareSearch("idx", "idx_unmapped") SearchResponse searchResponse = client().prepareSearch("idx", "idx_unmapped")
.setQuery(matchAllQuery()) .setQuery(matchAllQuery())
.addAggregation(randomEstimator(percentiles("percentiles")) .addAggregation(randomCompression(percentiles("percentiles"))
.field("value") .field("value")
.percentiles(pcts)) .percentiles(pcts))
.execute().actionGet(); .execute().actionGet();
@ -174,7 +171,7 @@ public class PercentilesTests extends AbstractNumericTests {
final double[] pcts = randomPercentiles(); final double[] pcts = randomPercentiles();
SearchResponse searchResponse = client().prepareSearch("idx") SearchResponse searchResponse = client().prepareSearch("idx")
.setQuery(matchAllQuery()) .setQuery(matchAllQuery())
.addAggregation(randomEstimator(percentiles("percentiles")) .addAggregation(randomCompression(percentiles("percentiles"))
.field("value").script("_value - 1") .field("value").script("_value - 1")
.percentiles(pcts)) .percentiles(pcts))
.execute().actionGet(); .execute().actionGet();
@ -190,7 +187,7 @@ public class PercentilesTests extends AbstractNumericTests {
final double[] pcts = randomPercentiles(); final double[] pcts = randomPercentiles();
SearchResponse searchResponse = client().prepareSearch("idx") SearchResponse searchResponse = client().prepareSearch("idx")
.setQuery(matchAllQuery()) .setQuery(matchAllQuery())
.addAggregation(randomEstimator(percentiles("percentiles")) .addAggregation(randomCompression(percentiles("percentiles"))
.field("value").script("_value - dec").param("dec", 1) .field("value").script("_value - dec").param("dec", 1)
.percentiles(pcts)) .percentiles(pcts))
.execute().actionGet(); .execute().actionGet();
@ -206,7 +203,7 @@ public class PercentilesTests extends AbstractNumericTests {
final double[] pcts = randomPercentiles(); final double[] pcts = randomPercentiles();
SearchResponse searchResponse = client().prepareSearch("idx") SearchResponse searchResponse = client().prepareSearch("idx")
.setQuery(matchAllQuery()) .setQuery(matchAllQuery())
.addAggregation(randomEstimator(percentiles("percentiles")) .addAggregation(randomCompression(percentiles("percentiles"))
.field("values") .field("values")
.percentiles(pcts)) .percentiles(pcts))
.execute().actionGet(); .execute().actionGet();
@ -222,7 +219,7 @@ public class PercentilesTests extends AbstractNumericTests {
final double[] pcts = randomPercentiles(); final double[] pcts = randomPercentiles();
SearchResponse searchResponse = client().prepareSearch("idx") SearchResponse searchResponse = client().prepareSearch("idx")
.setQuery(matchAllQuery()) .setQuery(matchAllQuery())
.addAggregation(randomEstimator(percentiles("percentiles")) .addAggregation(randomCompression(percentiles("percentiles"))
.field("values").script("_value - 1") .field("values").script("_value - 1")
.percentiles(pcts)) .percentiles(pcts))
.execute().actionGet(); .execute().actionGet();
@ -238,7 +235,7 @@ public class PercentilesTests extends AbstractNumericTests {
final double[] pcts = randomPercentiles(); final double[] pcts = randomPercentiles();
SearchResponse searchResponse = client().prepareSearch("idx") SearchResponse searchResponse = client().prepareSearch("idx")
.setQuery(matchAllQuery()) .setQuery(matchAllQuery())
.addAggregation(randomEstimator(percentiles("percentiles")) .addAggregation(randomCompression(percentiles("percentiles"))
.field("values").script("_value * -1") .field("values").script("_value * -1")
.percentiles(pcts)) .percentiles(pcts))
.execute().actionGet(); .execute().actionGet();
@ -254,7 +251,7 @@ public class PercentilesTests extends AbstractNumericTests {
final double[] pcts = randomPercentiles(); final double[] pcts = randomPercentiles();
SearchResponse searchResponse = client().prepareSearch("idx") SearchResponse searchResponse = client().prepareSearch("idx")
.setQuery(matchAllQuery()) .setQuery(matchAllQuery())
.addAggregation(randomEstimator(percentiles("percentiles")) .addAggregation(randomCompression(percentiles("percentiles"))
.field("values").script("_value - dec").param("dec", 1) .field("values").script("_value - dec").param("dec", 1)
.percentiles(pcts)) .percentiles(pcts))
.execute().actionGet(); .execute().actionGet();
@ -270,7 +267,7 @@ public class PercentilesTests extends AbstractNumericTests {
final double[] pcts = randomPercentiles(); final double[] pcts = randomPercentiles();
SearchResponse searchResponse = client().prepareSearch("idx") SearchResponse searchResponse = client().prepareSearch("idx")
.setQuery(matchAllQuery()) .setQuery(matchAllQuery())
.addAggregation(randomEstimator(percentiles("percentiles")) .addAggregation(randomCompression(percentiles("percentiles"))
.script("doc['value'].value") .script("doc['value'].value")
.percentiles(pcts)) .percentiles(pcts))
.execute().actionGet(); .execute().actionGet();
@ -286,7 +283,7 @@ public class PercentilesTests extends AbstractNumericTests {
final double[] pcts = randomPercentiles(); final double[] pcts = randomPercentiles();
SearchResponse searchResponse = client().prepareSearch("idx") SearchResponse searchResponse = client().prepareSearch("idx")
.setQuery(matchAllQuery()) .setQuery(matchAllQuery())
.addAggregation(randomEstimator(percentiles("percentiles")) .addAggregation(randomCompression(percentiles("percentiles"))
.script("doc['value'].value - dec").param("dec", 1) .script("doc['value'].value - dec").param("dec", 1)
.percentiles(pcts)) .percentiles(pcts))
.execute().actionGet(); .execute().actionGet();
@ -302,7 +299,7 @@ public class PercentilesTests extends AbstractNumericTests {
final double[] pcts = randomPercentiles(); final double[] pcts = randomPercentiles();
SearchResponse searchResponse = client().prepareSearch("idx") SearchResponse searchResponse = client().prepareSearch("idx")
.setQuery(matchAllQuery()) .setQuery(matchAllQuery())
.addAggregation(randomEstimator(percentiles("percentiles")) .addAggregation(randomCompression(percentiles("percentiles"))
.script("doc['value'].value - dec").param("dec", 1) .script("doc['value'].value - dec").param("dec", 1)
.percentiles(pcts)) .percentiles(pcts))
.execute().actionGet(); .execute().actionGet();
@ -318,7 +315,7 @@ public class PercentilesTests extends AbstractNumericTests {
final double[] pcts = randomPercentiles(); final double[] pcts = randomPercentiles();
SearchResponse searchResponse = client().prepareSearch("idx") SearchResponse searchResponse = client().prepareSearch("idx")
.setQuery(matchAllQuery()) .setQuery(matchAllQuery())
.addAggregation(randomEstimator(percentiles("percentiles")) .addAggregation(randomCompression(percentiles("percentiles"))
.script("doc['values'].values") .script("doc['values'].values")
.percentiles(pcts)) .percentiles(pcts))
.execute().actionGet(); .execute().actionGet();
@ -334,7 +331,7 @@ public class PercentilesTests extends AbstractNumericTests {
final double[] pcts = randomPercentiles(); final double[] pcts = randomPercentiles();
SearchResponse searchResponse = client().prepareSearch("idx") SearchResponse searchResponse = client().prepareSearch("idx")
.setQuery(matchAllQuery()) .setQuery(matchAllQuery())
.addAggregation(randomEstimator(percentiles("percentiles")) .addAggregation(randomCompression(percentiles("percentiles"))
.script("doc['values'].values") .script("doc['values'].values")
.percentiles(pcts)) .percentiles(pcts))
.execute().actionGet(); .execute().actionGet();
@ -350,7 +347,7 @@ public class PercentilesTests extends AbstractNumericTests {
final double[] pcts = randomPercentiles(); final double[] pcts = randomPercentiles();
SearchResponse searchResponse = client().prepareSearch("idx") SearchResponse searchResponse = client().prepareSearch("idx")
.setQuery(matchAllQuery()) .setQuery(matchAllQuery())
.addAggregation(randomEstimator(percentiles("percentiles")) .addAggregation(randomCompression(percentiles("percentiles"))
.script("List values = doc['values'].values; double[] res = new double[values.length]; for (int i = 0; i < res.length; i++) { res[i] = values.get(i) - dec; }; return res;").param("dec", 1) .script("List values = doc['values'].values; double[] res = new double[values.length]; for (int i = 0; i < res.length; i++) { res[i] = values.get(i) - dec; }; return res;").param("dec", 1)
.percentiles(pcts)) .percentiles(pcts))
.execute().actionGet(); .execute().actionGet();
@ -368,7 +365,7 @@ public class PercentilesTests extends AbstractNumericTests {
.setQuery(matchAllQuery()) .setQuery(matchAllQuery())
.addAggregation( .addAggregation(
histogram("histo").field("value").interval(2l) histogram("histo").field("value").interval(2l)
.subAggregation(randomEstimator(percentiles("percentiles").percentiles(99))) .subAggregation(randomCompression(percentiles("percentiles").percentiles(99)))
.order(Order.aggregation("percentiles", "99", asc))) .order(Order.aggregation("percentiles", "99", asc)))
.execute().actionGet(); .execute().actionGet();