Add early termination support for min/max aggregations (#33375)

This commit adds support for early termination of the collection of a leaf
in the min/max aggregators. If the query matches all documents, the min and max values
for a numeric field can be retrieved efficiently from the points reader.
This change applies this optimization when possible.
This commit is contained in:
Jim Ferenczi 2018-10-03 18:33:39 +02:00 committed by GitHub
parent 8f10c771e6
commit ee21067a41
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 735 additions and 21 deletions

View File

@ -186,6 +186,11 @@ public class NumberFieldMapper extends FieldMapper {
return result;
}
/**
 * Decodes the half-float value stored at offset 0 of the given point bytes.
 *
 * @param value the encoded point bytes
 * @return the decoded value, boxed as a {@link Number}
 */
@Override
public Number parsePoint(byte[] value) {
    final float decoded = HalfFloatPoint.decodeDimension(value, 0);
    return decoded;
}
@Override
public Float parse(XContentParser parser, boolean coerce) throws IOException {
float parsed = parser.floatValue(coerce);
@ -278,6 +283,11 @@ public class NumberFieldMapper extends FieldMapper {
return result;
}
/**
 * Decodes the float value stored at offset 0 of the given point bytes.
 *
 * @param value the encoded point bytes
 * @return the decoded value, boxed as a {@link Number}
 */
@Override
public Number parsePoint(byte[] value) {
    final float decoded = FloatPoint.decodeDimension(value, 0);
    return decoded;
}
@Override
public Float parse(XContentParser parser, boolean coerce) throws IOException {
float parsed = parser.floatValue(coerce);
@ -359,6 +369,11 @@ public class NumberFieldMapper extends FieldMapper {
return parsed;
}
/**
 * Decodes the double value stored at offset 0 of the given point bytes.
 *
 * @param value the encoded point bytes
 * @return the decoded value, boxed as a {@link Number}
 */
@Override
public Number parsePoint(byte[] value) {
    final double decoded = DoublePoint.decodeDimension(value, 0);
    return decoded;
}
@Override
public Double parse(XContentParser parser, boolean coerce) throws IOException {
double parsed = parser.doubleValue(coerce);
@ -451,6 +466,11 @@ public class NumberFieldMapper extends FieldMapper {
return (byte) doubleValue;
}
/**
 * Decodes the point bytes through {@code INTEGER.parsePoint} and narrows the
 * result to a byte.
 *
 * @param value the encoded point bytes
 * @return the decoded value narrowed with {@link Number#byteValue()}
 */
@Override
public Number parsePoint(byte[] value) {
    final Number asInteger = INTEGER.parsePoint(value);
    return asInteger.byteValue();
}
@Override
public Short parse(XContentParser parser, boolean coerce) throws IOException {
int value = parser.intValue(coerce);
@ -507,6 +527,11 @@ public class NumberFieldMapper extends FieldMapper {
return (short) doubleValue;
}
/**
 * Decodes the point bytes through {@code INTEGER.parsePoint} and narrows the
 * result to a short.
 *
 * @param value the encoded point bytes
 * @return the decoded value narrowed with {@link Number#shortValue()}
 */
@Override
public Number parsePoint(byte[] value) {
    final Number asInteger = INTEGER.parsePoint(value);
    return asInteger.shortValue();
}
@Override
public Short parse(XContentParser parser, boolean coerce) throws IOException {
return parser.shortValue(coerce);
@ -559,6 +584,11 @@ public class NumberFieldMapper extends FieldMapper {
return (int) doubleValue;
}
/**
 * Decodes the int value stored at offset 0 of the given point bytes.
 *
 * @param value the encoded point bytes
 * @return the decoded value, boxed as a {@link Number}
 */
@Override
public Number parsePoint(byte[] value) {
    final int decoded = IntPoint.decodeDimension(value, 0);
    return decoded;
}
@Override
public Integer parse(XContentParser parser, boolean coerce) throws IOException {
return parser.intValue(coerce);
@ -673,6 +703,11 @@ public class NumberFieldMapper extends FieldMapper {
return Numbers.toLong(stringValue, coerce);
}
/**
 * Decodes the long value stored at offset 0 of the given point bytes.
 *
 * @param value the encoded point bytes
 * @return the decoded value, boxed as a {@link Number}
 */
@Override
public Number parsePoint(byte[] value) {
    final long decoded = LongPoint.decodeDimension(value, 0);
    return decoded;
}
@Override
public Long parse(XContentParser parser, boolean coerce) throws IOException {
return parser.longValue(coerce);
@ -789,6 +824,7 @@ public class NumberFieldMapper extends FieldMapper {
boolean hasDocValues);
public abstract Number parse(XContentParser parser, boolean coerce) throws IOException;
public abstract Number parse(Object value, boolean coerce);
/**
 * Decodes a number from the bytes of an indexed point.
 *
 * @param value the encoded point bytes
 * @return the decoded number
 */
public abstract Number parsePoint(byte[] value);
public abstract List<Field> createFields(String name, Number value, boolean indexed,
boolean docValued, boolean stored);
Number valueForSearch(Number value) {
@ -937,6 +973,10 @@ public class NumberFieldMapper extends FieldMapper {
}
}
/**
 * Decodes a number from the bytes of an indexed point by delegating to the
 * number type of this field.
 *
 * @param value the encoded point bytes
 * @return the number decoded by {@code type.parsePoint}
 */
public Number parsePoint(byte[] value) {
    final Number decoded = type.parsePoint(value);
    return decoded;
}
@Override
public boolean equals(Object o) {
if (super.equals(o) == false) {

View File

@ -18,7 +18,12 @@
*/
package org.elasticsearch.search.aggregations.metrics;
import org.apache.lucene.index.LeafReader;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.PointValues;
import org.apache.lucene.search.CollectionTerminatedException;
import org.apache.lucene.util.Bits;
import org.apache.lucene.util.FutureArrays;
import org.apache.lucene.search.ScoreMode;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.util.BigArrays;
@ -33,30 +38,45 @@ import org.elasticsearch.search.aggregations.LeafBucketCollector;
import org.elasticsearch.search.aggregations.LeafBucketCollectorBase;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
import org.elasticsearch.search.aggregations.support.ValuesSource;
import org.elasticsearch.search.aggregations.support.ValuesSourceConfig;
import org.elasticsearch.search.internal.SearchContext;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import static org.elasticsearch.search.aggregations.metrics.MinAggregator.getPointReaderOrNull;
class MaxAggregator extends NumericMetricsAggregator.SingleValue {
final ValuesSource.Numeric valuesSource;
final DocValueFormat formatter;
final String pointField;
final Function<byte[], Number> pointConverter;
DoubleArray maxes;
MaxAggregator(String name, ValuesSource.Numeric valuesSource, DocValueFormat formatter,
SearchContext context,
Aggregator parent, List<PipelineAggregator> pipelineAggregators,
Map<String, Object> metaData) throws IOException {
MaxAggregator(String name,
ValuesSourceConfig<ValuesSource.Numeric> config,
ValuesSource.Numeric valuesSource,
SearchContext context,
Aggregator parent, List<PipelineAggregator> pipelineAggregators,
Map<String, Object> metaData) throws IOException {
super(name, context, parent, pipelineAggregators, metaData);
this.valuesSource = valuesSource;
this.formatter = formatter;
if (valuesSource != null) {
maxes = context.bigArrays().newDoubleArray(1, false);
maxes.fill(0, maxes.size(), Double.NEGATIVE_INFINITY);
}
this.formatter = config.format();
this.pointConverter = getPointReaderOrNull(context, parent, config);
if (pointConverter != null) {
pointField = config.fieldContext().field();
} else {
pointField = null;
}
}
@Override
@ -68,8 +88,28 @@ class MaxAggregator extends NumericMetricsAggregator.SingleValue {
public LeafBucketCollector getLeafCollector(LeafReaderContext ctx,
final LeafBucketCollector sub) throws IOException {
if (valuesSource == null) {
return LeafBucketCollector.NO_OP_COLLECTOR;
}
if (parent != null) {
return LeafBucketCollector.NO_OP_COLLECTOR;
} else {
// we have no parent and the values source is empty so we can skip collecting hits.
throw new CollectionTerminatedException();
}
}
if (pointConverter != null) {
Number segMax = findLeafMaxValue(ctx.reader(), pointField, pointConverter);
if (segMax != null) {
/**
* There is no parent aggregator (see {@link MinAggregator#getPointReaderOrNull}
* so the ordinal for the bucket is always 0.
*/
assert maxes.size() == 1;
double max = maxes.get(0);
max = Math.max(max, segMax.doubleValue());
maxes.set(0, max);
// the maximum value has been extracted, we don't need to collect hits on this segment.
throw new CollectionTerminatedException();
}
}
final BigArrays bigArrays = context.bigArrays();
final SortedNumericDoubleValues allValues = valuesSource.doubleValues(ctx);
final NumericDoubleValues values = MultiValueMode.MAX.select(allValues);
@ -118,4 +158,48 @@ class MaxAggregator extends NumericMetricsAggregator.SingleValue {
public void doClose() {
Releasables.close(maxes);
}
/**
 * Looks up the maximum value indexed in the <code>fieldName</code> field of the given reader.
 * Returns <code>null</code> when the value cannot be inferred from the indexed {@link PointValues}:
 * either the reader has no point values for the field, or no live document was found
 * in the cells whose maximum equals the maximum of the segment.
 *
 * @param reader    the leaf reader to inspect
 * @param fieldName the name of the field holding the points
 * @param converter decodes a packed point value into a {@link Number}
 * @return the converted maximum value or <code>null</code>
 * @throws IOException if reading the point values fails
 */
static Number findLeafMaxValue(LeafReader reader, String fieldName, Function<byte[], Number> converter) throws IOException {
    final PointValues points = reader.getPointValues(fieldName);
    if (points == null) {
        return null;
    }
    final Bits live = reader.getLiveDocs();
    if (live == null) {
        // no deleted documents in this reader: the recorded maximum can be used directly.
        return converter.apply(points.getMaxPackedValue());
    }
    final int bytesPerDim = points.getBytesPerDimension();
    final byte[] segmentMax = points.getMaxPackedValue();
    // single-slot holder so that the anonymous visitor can publish its result.
    final Number[] lastLiveValue = new Number[1];
    points.intersect(new PointValues.IntersectVisitor() {
        @Override
        public PointValues.Relation compare(byte[] minPackedValue, byte[] maxPackedValue) {
            // only cells whose maximum equals the segment maximum are visited.
            final boolean holdsSegmentMax =
                FutureArrays.equals(segmentMax, 0, bytesPerDim, maxPackedValue, 0, bytesPerDim);
            return holdsSegmentMax
                ? PointValues.Relation.CELL_CROSSES_QUERY
                : PointValues.Relation.CELL_OUTSIDE_QUERY;
        }

        @Override
        public void visit(int docID, byte[] packedValue) {
            if (live.get(docID)) {
                // every value of the cell is collected (the sort is ascending), so the
                // last live doc seen is guaranteed to carry the max value for the segment.
                lastLiveValue[0] = converter.apply(packedValue);
            }
        }

        @Override
        public void visit(int docID) {
            // compare() never reports a cell as fully inside the query.
            throw new UnsupportedOperationException();
        }
    });
    return lastLiveValue[0];
}
}

View File

@ -43,13 +43,13 @@ class MaxAggregatorFactory extends ValuesSourceAggregatorFactory<ValuesSource.Nu
@Override
protected Aggregator createUnmapped(Aggregator parent,
List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData) throws IOException {
return new MaxAggregator(name, null, config.format(), context, parent, pipelineAggregators, metaData);
return new MaxAggregator(name, config, null, context, parent, pipelineAggregators, metaData);
}
@Override
protected Aggregator doCreateInternal(ValuesSource.Numeric valuesSource, Aggregator parent,
boolean collectsFromSingleBucket, List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData)
throws IOException {
return new MaxAggregator(name, valuesSource, config.format(), context, parent, pipelineAggregators, metaData);
return new MaxAggregator(name, config, valuesSource, context, parent, pipelineAggregators, metaData);
}
}

View File

@ -18,13 +18,23 @@
*/
package org.elasticsearch.search.aggregations.metrics;
import org.apache.lucene.document.LongPoint;
import org.apache.lucene.index.IndexOptions;
import org.apache.lucene.index.LeafReader;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.PointValues;
import org.apache.lucene.search.CollectionTerminatedException;
import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.util.Bits;
import org.apache.lucene.search.ScoreMode;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.DoubleArray;
import org.elasticsearch.index.fielddata.NumericDoubleValues;
import org.elasticsearch.index.fielddata.SortedNumericDoubleValues;
import org.elasticsearch.index.mapper.DateFieldMapper;
import org.elasticsearch.index.mapper.MappedFieldType;
import org.elasticsearch.index.mapper.NumberFieldMapper;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.MultiValueMode;
import org.elasticsearch.search.aggregations.Aggregator;
@ -33,29 +43,44 @@ import org.elasticsearch.search.aggregations.LeafBucketCollector;
import org.elasticsearch.search.aggregations.LeafBucketCollectorBase;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
import org.elasticsearch.search.aggregations.support.ValuesSource;
import org.elasticsearch.search.aggregations.support.ValuesSourceConfig;
import org.elasticsearch.search.internal.SearchContext;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
class MinAggregator extends NumericMetricsAggregator.SingleValue {
final ValuesSource.Numeric valuesSource;
final DocValueFormat format;
final String pointField;
final Function<byte[], Number> pointConverter;
DoubleArray mins;
MinAggregator(String name, ValuesSource.Numeric valuesSource, DocValueFormat formatter,
SearchContext context, Aggregator parent, List<PipelineAggregator> pipelineAggregators,
Map<String, Object> metaData) throws IOException {
MinAggregator(String name,
ValuesSourceConfig<ValuesSource.Numeric> config,
ValuesSource.Numeric valuesSource,
SearchContext context,
Aggregator parent,
List<PipelineAggregator> pipelineAggregators,
Map<String, Object> metaData) throws IOException {
super(name, context, parent, pipelineAggregators, metaData);
this.valuesSource = valuesSource;
if (valuesSource != null) {
mins = context.bigArrays().newDoubleArray(1, false);
mins.fill(0, mins.size(), Double.POSITIVE_INFINITY);
}
this.format = formatter;
this.format = config.format();
this.pointConverter = getPointReaderOrNull(context, parent, config);
if (pointConverter != null) {
pointField = config.fieldContext().field();
} else {
pointField = null;
}
}
@Override
@ -67,7 +92,26 @@ class MinAggregator extends NumericMetricsAggregator.SingleValue {
public LeafBucketCollector getLeafCollector(LeafReaderContext ctx,
final LeafBucketCollector sub) throws IOException {
if (valuesSource == null) {
return LeafBucketCollector.NO_OP_COLLECTOR;
if (parent != null) {
    // a parent aggregator drives the collection of this one, so only a collector
    // that ignores every hit is handed back.
    return LeafBucketCollector.NO_OP_COLLECTOR;
} else {
    // we have no parent and the values source is empty so we can skip collecting hits.
    throw new CollectionTerminatedException();
}
}
if (pointConverter != null) {
Number segMin = findLeafMinValue(ctx.reader(), pointField, pointConverter);
if (segMin != null) {
/**
* There is no parent aggregator (see {@link MinAggregator#getPointReaderOrNull}
* so the ordinal for the bucket is always 0.
*/
double min = mins.get(0);
min = Math.min(min, segMin.doubleValue());
mins.set(0, min);
// the minimum value has been extracted, we don't need to collect hits on this segment.
throw new CollectionTerminatedException();
}
}
final BigArrays bigArrays = context.bigArrays();
final SortedNumericDoubleValues allValues = valuesSource.doubleValues(ctx);
@ -117,4 +161,77 @@ class MinAggregator extends NumericMetricsAggregator.SingleValue {
public void doClose() {
Releasables.close(mins);
}
/**
 * Decides whether the min/max value can be read from the indexed points and, if so,
 * returns the function that decodes a point value. Returns <code>null</code> when
 * early termination is not applicable to the context.
 *
 * @param context The {@link SearchContext} of the aggregation.
 * @param parent The parent aggregator.
 * @param config The config for the values source metric.
 * @return a converter for point values or <code>null</code>
 */
static Function<byte[], Number> getPointReaderOrNull(SearchContext context, Aggregator parent,
                                                     ValuesSourceConfig<ValuesSource.Numeric> config) {
    // the shortcut requires a query that is absent or exactly a MatchAllDocsQuery ...
    final boolean matchesAll = context.query() == null
        || context.query().getClass() == MatchAllDocsQuery.class;
    // ... and an aggregator that has no parent.
    if (matchesAll == false || parent != null) {
        return null;
    }
    // a field must be configured and no script may be set.
    if (config.fieldContext() == null || config.script() != null) {
        return null;
    }
    final MappedFieldType fieldType = config.fieldContext().fieldType();
    if (fieldType == null || fieldType.indexOptions() == IndexOptions.NONE) {
        return null;
    }
    if (fieldType instanceof NumberFieldMapper.NumberFieldType) {
        return ((NumberFieldMapper.NumberFieldType) fieldType)::parsePoint;
    }
    if (fieldType.getClass() == DateFieldMapper.DateFieldType.class) {
        return (in) -> LongPoint.decodeDimension(in, 0);
    }
    return null;
}
/**
 * Looks up the minimum value indexed in the <code>fieldName</code> field of the given reader.
 * Returns <code>null</code> when the value cannot be inferred from the indexed {@link PointValues}:
 * either the reader has no point values for the field, or no live document was visited.
 *
 * @param reader    the leaf reader to inspect
 * @param fieldName the name of the field holding the points
 * @param converter decodes a packed point value into a {@link Number}
 * @return the converted minimum value or <code>null</code>
 * @throws IOException if reading the point values fails
 */
static Number findLeafMinValue(LeafReader reader, String fieldName, Function<byte[], Number> converter) throws IOException {
    final PointValues points = reader.getPointValues(fieldName);
    if (points == null) {
        return null;
    }
    final Bits live = reader.getLiveDocs();
    if (live == null) {
        // no deleted documents in this reader: the recorded minimum can be used directly.
        return converter.apply(points.getMinPackedValue());
    }
    // single-slot holder so that the anonymous visitor can publish its result.
    final Number[] firstLiveValue = new Number[1];
    try {
        points.intersect(new PointValues.IntersectVisitor() {
            @Override
            public PointValues.Relation compare(byte[] minPackedValue, byte[] maxPackedValue) {
                // every cell is a candidate; the visit stops at the first live doc.
                return PointValues.Relation.CELL_CROSSES_QUERY;
            }

            @Override
            public void visit(int docID, byte[] packedValue) {
                if (live.get(docID) == false) {
                    return;
                }
                firstLiveValue[0] = converter.apply(packedValue);
                // this is the first leaf with a live doc so the value is the minimum for this segment.
                throw new CollectionTerminatedException();
            }

            @Override
            public void visit(int docID) {
                // compare() never reports a cell as fully inside the query.
                throw new UnsupportedOperationException();
            }
        });
    } catch (CollectionTerminatedException ignored) {
        // thrown on purpose by the visitor above to abort the traversal once a value is found.
    }
    return firstLiveValue[0];
}
}

View File

@ -43,12 +43,12 @@ class MinAggregatorFactory extends ValuesSourceAggregatorFactory<ValuesSource.Nu
@Override
protected Aggregator createUnmapped(Aggregator parent, List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData)
throws IOException {
return new MinAggregator(name, null, config.format(), context, parent, pipelineAggregators, metaData);
return new MinAggregator(name, config, null, context, parent, pipelineAggregators, metaData);
}
@Override
protected Aggregator doCreateInternal(ValuesSource.Numeric valuesSource, Aggregator parent, boolean collectsFromSingleBucket,
List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData) throws IOException {
return new MinAggregator(name, valuesSource, config.format(), context, parent, pipelineAggregators, metaData);
return new MinAggregator(name, config, valuesSource, context, parent, pipelineAggregators, metaData);
}
}

View File

@ -45,6 +45,10 @@ public abstract class ValuesSourceAggregatorFactory<VS extends ValuesSource, AF
return config.timezone();
}
/**
 * Returns the values source configuration held by this factory.
 */
public ValuesSourceConfig<VS> getConfig() {
    return this.config;
}
@Override
public Aggregator createInternal(Aggregator parent, boolean collectsFromSingleBucket,
List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData) throws IOException {

View File

@ -21,6 +21,7 @@ package org.elasticsearch.index.mapper;
import com.carrotsearch.randomizedtesting.generators.RandomPicks;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.DoublePoint;
import org.apache.lucene.document.FloatPoint;
import org.apache.lucene.document.HalfFloatPoint;
import org.apache.lucene.document.IntPoint;
@ -53,6 +54,7 @@ import java.util.List;
import java.util.function.Supplier;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
public class NumberFieldTypeTests extends FieldTypeTestCase {
@ -530,4 +532,49 @@ public class NumberFieldTypeTests extends FieldTypeTestCase {
assertEquals(Double.valueOf(1.2),
new NumberFieldMapper.NumberFieldType(NumberFieldMapper.NumberType.DOUBLE).valueForDisplay(1.2));
}
/**
 * Encodes one value per number type with the matching Lucene point encoder and
 * checks that {@code parsePoint} of that number type decodes it back.
 */
public void testParsePoint() {
    // byte: encoded as an int point
    {
        byte[] encoded = new byte[Integer.BYTES];
        byte expected = randomByte();
        IntPoint.encodeDimension(expected, encoded, 0);
        assertThat(NumberType.BYTE.parsePoint(encoded), equalTo(expected));
    }

    // short: encoded as an int point
    {
        byte[] encoded = new byte[Integer.BYTES];
        short expected = randomShort();
        IntPoint.encodeDimension(expected, encoded, 0);
        assertThat(NumberType.SHORT.parsePoint(encoded), equalTo(expected));
    }

    // integer
    {
        byte[] encoded = new byte[Integer.BYTES];
        int expected = randomInt();
        IntPoint.encodeDimension(expected, encoded, 0);
        assertThat(NumberType.INTEGER.parsePoint(encoded), equalTo(expected));
    }

    // long
    {
        byte[] encoded = new byte[Long.BYTES];
        long expected = randomLong();
        LongPoint.encodeDimension(expected, encoded, 0);
        assertThat(NumberType.LONG.parsePoint(encoded), equalTo(expected));
    }

    // float
    {
        byte[] encoded = new byte[Float.BYTES];
        float expected = randomFloat();
        FloatPoint.encodeDimension(expected, encoded, 0);
        assertThat(NumberType.FLOAT.parsePoint(encoded), equalTo(expected));
    }

    // double
    {
        byte[] encoded = new byte[Double.BYTES];
        double expected = randomDouble();
        DoublePoint.encodeDimension(expected, encoded, 0);
        assertThat(NumberType.DOUBLE.parsePoint(encoded), equalTo(expected));
    }

    // half float: uses the fixed value 3f rather than a random one
    {
        byte[] encoded = new byte[Float.BYTES];
        float expected = 3f;
        HalfFloatPoint.encodeDimension(expected, encoded, 0);
        assertThat(NumberType.HALF_FLOAT.parsePoint(encoded), equalTo(expected));
    }
}
}

View File

@ -19,27 +19,49 @@
package org.elasticsearch.search.aggregations.metrics;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.DoublePoint;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.FloatPoint;
import org.apache.lucene.document.IntPoint;
import org.apache.lucene.document.LongPoint;
import org.apache.lucene.document.NumericDocValuesField;
import org.apache.lucene.document.SortedNumericDocValuesField;
import org.apache.lucene.document.StringField;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.NoMergePolicy;
import org.apache.lucene.index.PointValues;
import org.apache.lucene.index.RandomIndexWriter;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.DocValuesFieldExistsQuery;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.search.Query;
import org.apache.lucene.store.Directory;
import org.apache.lucene.util.Bits;
import org.apache.lucene.util.FutureArrays;
import org.elasticsearch.common.CheckedConsumer;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.index.mapper.MappedFieldType;
import org.elasticsearch.index.mapper.NumberFieldMapper;
import org.elasticsearch.search.aggregations.AggregatorTestCase;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import static java.util.Collections.singleton;
import static org.hamcrest.Matchers.equalTo;
public class MaxAggregatorTests extends AggregatorTestCase {
public void testNoDocs() throws IOException {
@ -77,7 +99,6 @@ public class MaxAggregatorTests extends AggregatorTestCase {
});
}
public void testQueryFiltering() throws IOException {
testCase(IntPoint.newRangeQuery("number", 0, 5), iw -> {
iw.addDocument(Arrays.asList(new IntPoint("number", 7), new SortedNumericDocValuesField("number", 7)));
@ -96,8 +117,9 @@ public class MaxAggregatorTests extends AggregatorTestCase {
});
}
private void testCase(Query query, CheckedConsumer<RandomIndexWriter, IOException> buildIndex, Consumer<InternalMax> verify)
throws IOException {
private void testCase(Query query,
CheckedConsumer<RandomIndexWriter, IOException> buildIndex,
Consumer<InternalMax> verify) throws IOException {
Directory directory = newDirectory();
RandomIndexWriter indexWriter = new RandomIndexWriter(random(), directory);
buildIndex.accept(indexWriter);
@ -107,10 +129,10 @@ public class MaxAggregatorTests extends AggregatorTestCase {
IndexSearcher indexSearcher = newSearcher(indexReader, true, true);
MaxAggregationBuilder aggregationBuilder = new MaxAggregationBuilder("_name").field("number");
MappedFieldType fieldType = new NumberFieldMapper.NumberFieldType(NumberFieldMapper.NumberType.LONG);
MappedFieldType fieldType = new NumberFieldMapper.NumberFieldType(NumberFieldMapper.NumberType.INTEGER);
fieldType.setName("number");
MaxAggregator aggregator = createAggregator(aggregationBuilder, indexSearcher, fieldType);
MaxAggregator aggregator = createAggregator(query, aggregationBuilder, indexSearcher, createIndexSettings(), fieldType);
aggregator.preCollection();
indexSearcher.search(query, aggregator);
aggregator.postCollection();
@ -119,4 +141,110 @@ public class MaxAggregatorTests extends AggregatorTestCase {
indexReader.close();
directory.close();
}
/**
 * Runs the max shortcut scenario once per point encoding (long, int, float, double),
 * each time with a random value supplier, a point field factory and the matching decoder.
 */
public void testMaxShortcutRandom() throws Exception {
    // long points, with values restricted to the int range
    testMaxShortcutCase(
        () -> randomLongBetween(Integer.MIN_VALUE, Integer.MAX_VALUE),
        number -> new LongPoint("number", number.longValue()),
        packed -> LongPoint.decodeDimension(packed, 0));

    // int points
    testMaxShortcutCase(
        () -> randomInt(),
        number -> new IntPoint("number", number.intValue()),
        packed -> IntPoint.decodeDimension(packed, 0));

    // float points
    testMaxShortcutCase(
        () -> randomFloat(),
        number -> new FloatPoint("number", number.floatValue()),
        packed -> FloatPoint.decodeDimension(packed, 0));

    // double points
    testMaxShortcutCase(
        () -> randomDouble(),
        number -> new DoublePoint("number", number.doubleValue()),
        packed -> DoublePoint.decodeDimension(packed, 0));
}
/**
 * Indexes documents carrying random point values in the "number" field, then deletes them
 * one by one from the largest value down and checks after each step what
 * {@link MaxAggregator#findLeafMaxValue} returns for the first leaf.
 *
 * @param randomNumber     supplies the values to index
 * @param pointFieldFunc   builds the point field for a value
 * @param pointConvertFunc decodes a packed point value, passed through to findLeafMaxValue
 */
private void testMaxShortcutCase(Supplier<Number> randomNumber,
Function<Number, Field> pointFieldFunc,
Function<byte[], Number> pointConvertFunc) throws IOException {
Directory directory = newDirectory();
// merging is disabled; presumably so that the documents stay in the leaf inspected below -- TODO confirm
IndexWriterConfig config = newIndexWriterConfig().setMergePolicy(NoMergePolicy.INSTANCE);
IndexWriter indexWriter = new IndexWriter(directory, config);
List<Document> documents = new ArrayList<>();
// (document id, indexed value) pairs, sorted by value further down
List<Tuple<Integer, Number>> values = new ArrayList<>();
int numValues = atLeast(50);
int docID = 0;
for (int i = 0; i < numValues; i++) {
// each iteration adds one to three documents, each with its own id and a freshly drawn value
int numDup = randomIntBetween(1, 3);
for (int j = 0; j < numDup; j++) {
Document document = new Document();
Number nextValue = randomNumber.get();
values.add(new Tuple<>(docID, nextValue));
// the "id" term is what the deletions below target
document.add(new StringField("id", Integer.toString(docID), Field.Store.NO));
document.add(pointFieldFunc.apply(nextValue));
documents.add(document);
docID ++;
}
}
// insert some documents without a value for the metric field.
for (int i = 0; i < 3; i++) {
Document document = new Document();
documents.add(document);
}
indexWriter.addDocuments(documents);
// ascending by value: the last entry holds the expected maximum
Collections.sort(values, Comparator.comparingDouble(t -> t.v2().doubleValue()));
// before any deletion the result must be the largest indexed value
try (IndexReader reader = DirectoryReader.open(indexWriter)) {
LeafReaderContext ctx = reader.leaves().get(0);
Number res = MaxAggregator.findLeafMaxValue(ctx.reader(), "number" , pointConvertFunc);
assertThat(res, equalTo(values.get(values.size()-1).v2()));
}
// delete from the largest value down to index 1, reopening a reader after each deletion
for (int i = values.size()-1; i > 0; i--) {
indexWriter.deleteDocuments(new Term("id", values.get(i).v1().toString()));
try (IndexReader reader = DirectoryReader.open(indexWriter)) {
LeafReaderContext ctx = reader.leaves().get(0);
Number res = MaxAggregator.findLeafMaxValue(ctx.reader(), "number" , pointConvertFunc);
if (res != null) {
// the next largest remaining value is expected
assertThat(res, equalTo(values.get(i - 1).v2()));
} else {
// a null result is only accepted when every doc in the max cells is deleted
assertAllDeleted(ctx.reader().getLiveDocs(), ctx.reader().getPointValues("number"));
}
}
}
// delete the last document that carries a value: nothing can be inferred anymore
indexWriter.deleteDocuments(new Term("id", values.get(0).v1().toString()));
try (IndexReader reader = DirectoryReader.open(indexWriter)) {
LeafReaderContext ctx = reader.leaves().get(0);
Number res = MaxAggregator.findLeafMaxValue(ctx.reader(), "number" , pointConvertFunc);
assertThat(res, equalTo(null));
}
indexWriter.close();
directory.close();
}
/**
 * Checks that documents inside the max leaves are all deleted, and that at least
 * one document was visited there.
 *
 * @param liveDocs the live docs of the reader under test
 * @param values   the point values of the field under test
 */
private void assertAllDeleted(Bits liveDocs, PointValues values) throws IOException {
    final byte[] segmentMax = values.getMaxPackedValue();
    final int bytesPerDim = values.getBytesPerDimension();
    // single-slot holder so that the anonymous visitor can report that it was called.
    final boolean[] visitedAny = new boolean[1];
    values.intersect(new PointValues.IntersectVisitor() {
        @Override
        public PointValues.Relation compare(byte[] minPackedValue, byte[] maxPackedValue) {
            // restrict the visit to the cells whose maximum equals the segment maximum.
            final boolean holdsSegmentMax =
                FutureArrays.equals(maxPackedValue, 0, bytesPerDim, segmentMax, 0, bytesPerDim);
            return holdsSegmentMax
                ? PointValues.Relation.CELL_CROSSES_QUERY
                : PointValues.Relation.CELL_OUTSIDE_QUERY;
        }

        @Override
        public void visit(int docID, byte[] packedValue) {
            assertFalse(liveDocs.get(docID));
            visitedAny[0] = true;
        }

        @Override
        public void visit(int docID) {
            throw new AssertionError();
        }
    });
    assertTrue(visitedAny[0]);
}
}

View File

@ -40,6 +40,7 @@ import java.util.Map;
import static java.util.Collections.emptyMap;
import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
import static org.elasticsearch.index.query.QueryBuilders.termQuery;
import static org.elasticsearch.search.aggregations.AggregationBuilders.count;
import static org.elasticsearch.search.aggregations.AggregationBuilders.filter;
import static org.elasticsearch.search.aggregations.AggregationBuilders.global;
import static org.elasticsearch.search.aggregations.AggregationBuilders.histogram;
@ -392,4 +393,22 @@ public class MaxIT extends AbstractNumericTestCase {
assertThat(client().admin().indices().prepareStats("cache_test_idx").setRequestCache(true).get().getTotal().getRequestCache()
.getMissCount(), equalTo(1L));
}
/**
 * Runs a match-all search on "idx" with total hit tracking disabled and both a max and a
 * value count aggregation on the "values" field, then checks the result of each aggregation.
 */
public void testEarlyTermination() throws Exception {
    SearchResponse response = client().prepareSearch("idx")
        .setTrackTotalHits(false)
        .setQuery(matchAllQuery())
        .addAggregation(max("max").field("values"))
        .addAggregation(count("count").field("values"))
        .execute()
        .actionGet();

    // the max aggregation must be present and report the expected value
    Max maxAgg = response.getAggregations().get("max");
    assertThat(maxAgg, notNullValue());
    assertThat(maxAgg.getName(), equalTo("max"));
    assertThat(maxAgg.getValue(), equalTo(12.0));

    // the value count running in the same request must still report the expected count
    ValueCount countAgg = response.getAggregations().get("count");
    assertThat(countAgg.getName(), equalTo("count"));
    assertThat(countAgg.getValue(), equalTo(20L));
}
}

View File

@ -16,20 +16,59 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.aggregations.metrics;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.DoublePoint;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.FloatPoint;
import org.apache.lucene.document.IntPoint;
import org.apache.lucene.document.LongPoint;
import org.apache.lucene.document.NumericDocValuesField;
import org.apache.lucene.document.SortedNumericDocValuesField;
import org.apache.lucene.document.StringField;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexOptions;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.NoMergePolicy;
import org.apache.lucene.index.RandomIndexWriter;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.DocValuesFieldExistsQuery;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.TermQuery;
import org.apache.lucene.store.Directory;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.index.mapper.DateFieldMapper;
import org.elasticsearch.index.mapper.MappedFieldType;
import org.elasticsearch.index.mapper.NumberFieldMapper;
import org.elasticsearch.search.aggregations.AggregationBuilder;
import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.AggregatorTestCase;
import org.elasticsearch.search.aggregations.support.FieldContext;
import org.elasticsearch.search.aggregations.support.ValuesSource;
import org.elasticsearch.search.aggregations.support.ValuesSourceConfig;
import org.elasticsearch.search.internal.SearchContext;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.function.DoubleConsumer;
import java.util.function.Function;
import java.util.function.Supplier;
import static org.hamcrest.Matchers.equalTo;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class MinAggregatorTests extends AggregatorTestCase {
@ -38,21 +77,27 @@ public class MinAggregatorTests extends AggregatorTestCase {
RandomIndexWriter indexWriter = new RandomIndexWriter(random(), directory);
Document document = new Document();
document.add(new NumericDocValuesField("number", 9));
document.add(new LongPoint("number", 9));
indexWriter.addDocument(document);
document = new Document();
document.add(new NumericDocValuesField("number", 7));
document.add(new LongPoint("number", 7));
indexWriter.addDocument(document);
document = new Document();
document.add(new NumericDocValuesField("number", 5));
document.add(new LongPoint("number", 5));
indexWriter.addDocument(document);
document = new Document();
document.add(new NumericDocValuesField("number", 3));
document.add(new LongPoint("number", 3));
indexWriter.addDocument(document);
document = new Document();
document.add(new NumericDocValuesField("number", 1));
document.add(new LongPoint("number", 1));
indexWriter.addDocument(document);
document = new Document();
document.add(new NumericDocValuesField("number", -1));
document.add(new LongPoint("number", -1));
indexWriter.addDocument(document);
indexWriter.close();
@ -63,6 +108,8 @@ public class MinAggregatorTests extends AggregatorTestCase {
MappedFieldType fieldType = new NumberFieldMapper.NumberFieldType(NumberFieldMapper.NumberType.LONG);
fieldType.setName("number");
testMinCase(indexSearcher, aggregationBuilder, fieldType, min -> assertEquals(-1.0d, min, 0));
MinAggregator aggregator = createAggregator(aggregationBuilder, indexSearcher, fieldType);
aggregator.preCollection();
indexSearcher.search(new MatchAllDocsQuery(), aggregator);
@ -80,14 +127,20 @@ public class MinAggregatorTests extends AggregatorTestCase {
Document document = new Document();
document.add(new SortedNumericDocValuesField("number", 9));
document.add(new SortedNumericDocValuesField("number", 7));
document.add(new LongPoint("number", 9));
document.add(new LongPoint("number", 7));
indexWriter.addDocument(document);
document = new Document();
document.add(new SortedNumericDocValuesField("number", 5));
document.add(new SortedNumericDocValuesField("number", 3));
document.add(new LongPoint("number", 5));
document.add(new LongPoint("number", 3));
indexWriter.addDocument(document);
document = new Document();
document.add(new SortedNumericDocValuesField("number", 1));
document.add(new SortedNumericDocValuesField("number", -1));
document.add(new LongPoint("number", 1));
document.add(new LongPoint("number", -1));
indexWriter.addDocument(document);
indexWriter.close();
@ -164,4 +217,207 @@ public class MinAggregatorTests extends AggregatorTestCase {
directory.close();
}
/**
 * Checks which combinations of search query, aggregator argument and field
 * configuration make {@link MinAggregator#getPointReaderOrNull} return a
 * point reader (shortcut usable) and which make it return {@code null}.
 *
 * The numeric and the date sections assert the same matrix of cases; the
 * aggregator argument is presumably the parent aggregator (confirm against
 * {@code MinAggregator}).
 */
public void testShortcutIsApplicable() {
    for (NumberFieldMapper.NumberType type : NumberFieldMapper.NumberType.values()) {
        // match-all query, no aggregator, indexed field: shortcut available
        assertNotNull(
            MinAggregator.getPointReaderOrNull(
                mockSearchContext(new MatchAllDocsQuery()),
                null,
                mockNumericValuesSourceConfig("number", type, true)
            )
        );
        // no query at all, no aggregator, indexed field: shortcut available
        assertNotNull(
            MinAggregator.getPointReaderOrNull(
                mockSearchContext(null),
                null,
                mockNumericValuesSourceConfig("number", type, true)
            )
        );
        // match-all query but an aggregator is supplied: no shortcut
        // (mirrors the equivalent assertion in the date section below)
        assertNull(
            MinAggregator.getPointReaderOrNull(
                mockSearchContext(new MatchAllDocsQuery()),
                mockAggregator(),
                mockNumericValuesSourceConfig("number", type, true)
            )
        );
        // query that is not match-all: no shortcut
        assertNull(
            MinAggregator.getPointReaderOrNull(
                mockSearchContext(new TermQuery(new Term("foo", "bar"))),
                null,
                mockNumericValuesSourceConfig("number", type, true)
            )
        );
        // no query but an aggregator is supplied: no shortcut
        assertNull(
            MinAggregator.getPointReaderOrNull(
                mockSearchContext(null),
                mockAggregator(),
                mockNumericValuesSourceConfig("number", type, true)
            )
        );
        // field is not indexed: no shortcut
        assertNull(
            MinAggregator.getPointReaderOrNull(
                mockSearchContext(null),
                null,
                mockNumericValuesSourceConfig("number", type, false)
            )
        );
    }

    // Same matrix for a date field.
    assertNotNull(
        MinAggregator.getPointReaderOrNull(
            mockSearchContext(new MatchAllDocsQuery()),
            null,
            mockDateValuesSourceConfig("number", true)
        )
    );
    assertNull(
        MinAggregator.getPointReaderOrNull(
            mockSearchContext(new MatchAllDocsQuery()),
            mockAggregator(),
            mockDateValuesSourceConfig("number", true)
        )
    );
    assertNull(
        MinAggregator.getPointReaderOrNull(
            mockSearchContext(new TermQuery(new Term("foo", "bar"))),
            null,
            mockDateValuesSourceConfig("number", true)
        )
    );
    assertNull(
        MinAggregator.getPointReaderOrNull(
            mockSearchContext(null),
            mockAggregator(),
            mockDateValuesSourceConfig("number", true)
        )
    );
    assertNull(
        MinAggregator.getPointReaderOrNull(
            mockSearchContext(null),
            null,
            mockDateValuesSourceConfig("number", false)
        )
    );
}
/**
 * Runs the randomized shortcut check once per point encoding: long, int,
 * float and double. Each run supplies a random value generator, a factory
 * that indexes the value as a point named "number", and the matching decoder
 * for the first dimension of the packed point value.
 */
public void testMinShortcutRandom() throws Exception {
    // long points
    testMinShortcutCase(
        () -> randomLongBetween(Integer.MIN_VALUE, Integer.MAX_VALUE),
        number -> new LongPoint("number", number.longValue()),
        packed -> LongPoint.decodeDimension(packed, 0)
    );

    // int points
    testMinShortcutCase(
        () -> randomInt(),
        number -> new IntPoint("number", number.intValue()),
        packed -> IntPoint.decodeDimension(packed, 0)
    );

    // float points
    testMinShortcutCase(
        () -> randomFloat(),
        number -> new FloatPoint("number", number.floatValue()),
        packed -> FloatPoint.decodeDimension(packed, 0)
    );

    // double points
    testMinShortcutCase(
        () -> randomDouble(),
        number -> new DoublePoint("number", number.doubleValue()),
        packed -> DoublePoint.decodeDimension(packed, 0)
    );
}
/**
 * Runs the min aggregation described by {@code aggregationBuilder} against
 * {@code searcher} once per candidate query and hands each resulting value to
 * {@code testResult}.
 *
 * Two queries are exercised: a {@link MatchAllDocsQuery} and a
 * {@link DocValuesFieldExistsQuery} on the field of {@code ft}.
 *
 * @param searcher           searcher over the index under test
 * @param aggregationBuilder builder used to create the {@link MinAggregator}
 * @param ft                 field type of the aggregated field
 * @param testResult         assertion callback receiving the computed minimum
 * @throws IOException if creating the aggregator or searching fails
 */
private void testMinCase(IndexSearcher searcher,
                         AggregationBuilder aggregationBuilder,
                         MappedFieldType ft,
                         DoubleConsumer testResult) throws IOException {
    Collection<Query> queries = Arrays.asList(new MatchAllDocsQuery(), new DocValuesFieldExistsQuery(ft.name()));
    for (Query query : queries) {
        MinAggregator aggregator = createAggregator(query, aggregationBuilder, searcher, createIndexSettings(), ft);
        aggregator.preCollection();
        // Collect with the same query the aggregator was created for, so the
        // query seen by the aggregator and the one actually executed agree.
        searcher.search(query, aggregator);
        aggregator.postCollection();
        InternalMin result = (InternalMin) aggregator.buildAggregation(0L);
        testResult.accept(result.getValue());
    }
}
/**
 * Indexes random point values, then checks that
 * {@code MinAggregator.findLeafMinValue} keeps returning the smallest value
 * that is still live while documents are deleted in ascending value order.
 *
 * The directory and the writer are managed with try-with-resources so that a
 * failing assertion does not leave them open.
 *
 * @param randomNumber     produces the random values to index
 * @param pointFieldFunc   turns a value into the point field added to a document
 * @param pointConvertFunc decodes a packed point value back into a number
 * @throws IOException if indexing or reading fails
 */
private void testMinShortcutCase(Supplier<Number> randomNumber,
                                 Function<Number, Field> pointFieldFunc,
                                 Function<byte[], Number> pointConvertFunc) throws IOException {
    try (Directory directory = newDirectory();
         IndexWriter indexWriter = new IndexWriter(directory,
             newIndexWriterConfig().setMergePolicy(NoMergePolicy.INSTANCE))) {
        List<Document> documents = new ArrayList<>();
        // (document id, indexed value) pairs, later sorted by value
        List<Tuple<Integer, Number>> values = new ArrayList<>();
        int numValues = atLeast(50);
        int docID = 0;
        for (int i = 0; i < numValues; i++) {
            int numDup = randomIntBetween(1, 3);
            for (int j = 0; j < numDup; j++) {
                Document document = new Document();
                Number nextValue = randomNumber.get();
                values.add(new Tuple<>(docID, nextValue));
                document.add(new StringField("id", Integer.toString(docID), Field.Store.NO));
                // the same point value is added twice to the document
                document.add(pointFieldFunc.apply(nextValue));
                document.add(pointFieldFunc.apply(nextValue));
                documents.add(document);
                docID++;
            }
        }
        // insert some documents without a value for the metric field;
        // they carry no "id" term, so the deletes below never remove them.
        for (int i = 0; i < 3; i++) {
            documents.add(new Document());
        }
        indexWriter.addDocuments(documents);
        values.sort(Comparator.comparingDouble(t -> t.v2().doubleValue()));

        // Nothing deleted yet: the minimum is the smallest indexed value.
        assertThat(findMinInFirstLeaf(indexWriter, pointConvertFunc), equalTo(values.get(0).v2()));

        // Delete the current minimum and expect the next value in sorted order.
        for (int i = 1; i < values.size(); i++) {
            indexWriter.deleteDocuments(new Term("id", values.get(i - 1).v1().toString()));
            assertThat(findMinInFirstLeaf(indexWriter, pointConvertFunc), equalTo(values.get(i).v2()));
        }

        // Once the last valued document is deleted there is no minimum left.
        indexWriter.deleteDocuments(new Term("id", values.get(values.size() - 1).v1().toString()));
        assertNull(findMinInFirstLeaf(indexWriter, pointConvertFunc));
    }
}

/** Opens a reader on {@code writer} and returns the min value of field "number" in its first leaf. */
private Number findMinInFirstLeaf(IndexWriter writer, Function<byte[], Number> pointConvertFunc) throws IOException {
    try (IndexReader reader = DirectoryReader.open(writer)) {
        LeafReaderContext ctx = reader.leaves().get(0);
        return MinAggregator.findLeafMinValue(ctx.reader(), "number", pointConvertFunc);
    }
}
/**
 * Builds a mocked {@link SearchContext} whose {@code query()} returns the
 * given query (which may be {@code null}).
 */
private SearchContext mockSearchContext(Query query) {
    final SearchContext context = mock(SearchContext.class);
    when(context.query()).thenReturn(query);
    return context;
}
/** Returns a fresh Mockito mock of {@link Aggregator} with no stubbing. */
private Aggregator mockAggregator() {
    final Aggregator aggregator = mock(Aggregator.class);
    return aggregator;
}
/**
 * Builds a mocked values-source config whose field context points at a frozen
 * number field type of the given kind.
 *
 * @param fieldName name assigned to the field type and the field context
 * @param numType   numeric type of the field
 * @param indexed   {@code true} to use {@code IndexOptions.DOCS}, {@code false} for {@code IndexOptions.NONE}
 */
private ValuesSourceConfig<ValuesSource.Numeric> mockNumericValuesSourceConfig(String fieldName,
                                                                               NumberFieldMapper.NumberType numType,
                                                                               boolean indexed) {
    ValuesSourceConfig<ValuesSource.Numeric> mockedConfig = mock(ValuesSourceConfig.class);

    MappedFieldType fieldType = new NumberFieldMapper.NumberFieldType(numType);
    fieldType.setName(fieldName);
    if (indexed) {
        fieldType.setIndexOptions(IndexOptions.DOCS);
    } else {
        fieldType.setIndexOptions(IndexOptions.NONE);
    }
    fieldType.freeze();

    // Build the context first so nothing runs between when(...) and thenReturn(...).
    FieldContext fieldContext = new FieldContext(fieldName, null, fieldType);
    when(mockedConfig.fieldContext()).thenReturn(fieldContext);
    return mockedConfig;
}
/**
 * Builds a mocked values-source config whose field context points at a frozen
 * date field type obtained from a {@code DateFieldMapper.Builder}.
 *
 * @param fieldName name assigned to the field type and the field context
 * @param indexed   {@code true} to use {@code IndexOptions.DOCS}, {@code false} for {@code IndexOptions.NONE}
 */
private ValuesSourceConfig<ValuesSource.Numeric> mockDateValuesSourceConfig(String fieldName, boolean indexed) {
    ValuesSourceConfig<ValuesSource.Numeric> mockedConfig = mock(ValuesSourceConfig.class);

    MappedFieldType dateType = new DateFieldMapper.Builder(fieldName).fieldType();
    dateType.setName(fieldName);
    if (indexed) {
        dateType.setIndexOptions(IndexOptions.DOCS);
    } else {
        dateType.setIndexOptions(IndexOptions.NONE);
    }
    dateType.freeze();

    // Build the context first so nothing runs between when(...) and thenReturn(...).
    FieldContext fieldContext = new FieldContext(fieldName, null, dateType);
    when(mockedConfig.fieldContext()).thenReturn(fieldContext);
    return mockedConfig;
}
}

View File

@ -40,6 +40,7 @@ import java.util.Map;
import static java.util.Collections.emptyMap;
import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
import static org.elasticsearch.index.query.QueryBuilders.termQuery;
import static org.elasticsearch.search.aggregations.AggregationBuilders.count;
import static org.elasticsearch.search.aggregations.AggregationBuilders.filter;
import static org.elasticsearch.search.aggregations.AggregationBuilders.global;
import static org.elasticsearch.search.aggregations.AggregationBuilders.histogram;
@ -404,4 +405,22 @@ public class MinIT extends AbstractNumericTestCase {
assertThat(client().admin().indices().prepareStats("cache_test_idx").setRequestCache(true).get().getTotal().getRequestCache()
.getMissCount(), equalTo(1L));
}
/**
 * Runs a match-all search on "idx" with total-hit tracking disabled and two
 * aggregations on the "values" field, then checks the min aggregation reports
 * 2.0 and the value count reports 20.
 */
public void testEarlyTermination() throws Exception {
    SearchResponse response = client().prepareSearch("idx")
        .setTrackTotalHits(false)
        .setQuery(matchAllQuery())
        .addAggregation(min("min").field("values"))
        .addAggregation(count("count").field("values"))
        .execute()
        .actionGet();

    // min aggregation
    Min minAgg = response.getAggregations().get("min");
    assertThat(minAgg, notNullValue());
    assertThat(minAgg.getName(), equalTo("min"));
    assertThat(minAgg.getValue(), equalTo(2.0));

    // value_count aggregation
    ValueCount countAgg = response.getAggregations().get("count");
    assertThat(countAgg.getName(), equalTo("count"));
    assertThat(countAgg.getValue(), equalTo(20L));
}
}