diff --git a/docs/reference/aggregations/bucket/composite-aggregation.asciidoc b/docs/reference/aggregations/bucket/composite-aggregation.asciidoc index c37e84799ab..e6059358d1d 100644 --- a/docs/reference/aggregations/bucket/composite-aggregation.asciidoc +++ b/docs/reference/aggregations/bucket/composite-aggregation.asciidoc @@ -117,6 +117,7 @@ Example: -------------------------------------------------- GET /_search { + "size": 0, "aggs" : { "my_buckets": { "composite" : { @@ -135,6 +136,7 @@ Like the `terms` aggregation it is also possible to use a script to create the v -------------------------------------------------- GET /_search { + "size": 0, "aggs" : { "my_buckets": { "composite" : { @@ -170,6 +172,7 @@ Example: -------------------------------------------------- GET /_search { + "size": 0, "aggs" : { "my_buckets": { "composite" : { @@ -188,6 +191,7 @@ The values are built from a numeric field or a script that return numerical valu -------------------------------------------------- GET /_search { + "size": 0, "aggs" : { "my_buckets": { "composite" : { @@ -220,6 +224,7 @@ is specified by date/time expression: -------------------------------------------------- GET /_search { + "size": 0, "aggs" : { "my_buckets": { "composite" : { @@ -249,6 +254,7 @@ the format specified with the format parameter: -------------------------------------------------- GET /_search { + "size": 0, "aggs" : { "my_buckets": { "composite" : { @@ -291,6 +297,7 @@ For example: -------------------------------------------------- GET /_search { + "size": 0, "aggs" : { "my_buckets": { "composite" : { @@ -313,6 +320,7 @@ in the composite buckets. 
-------------------------------------------------- GET /_search { + "size": 0, "aggs" : { "my_buckets": { "composite" : { @@ -342,6 +350,7 @@ For example: -------------------------------------------------- GET /_search { + "size": 0, "aggs" : { "my_buckets": { "composite" : { @@ -368,6 +377,7 @@ It is possible to include them in the response by setting `missing_bucket` to -------------------------------------------------- GET /_search { + "size": 0, "aggs" : { "my_buckets": { "composite" : { @@ -393,7 +403,7 @@ first 10 composite buckets created from the values source. The response contains the values for each composite bucket in an array containing the values extracted from each value source. -==== After +==== Pagination If the number of composite buckets is too high (or unknown) to be returned in a single response it is possible to split the retrieval in multiple requests. @@ -407,6 +417,7 @@ For example: -------------------------------------------------- GET /_search { + "size": 0, "aggs" : { "my_buckets": { "composite" : { @@ -472,6 +483,7 @@ round of result can be retrieved with: -------------------------------------------------- GET /_search { + "size": 0, "aggs" : { "my_buckets": { "composite" : { @@ -489,6 +501,116 @@ GET /_search <1> Should restrict the aggregation to buckets that sort **after** the provided values. +==== Early termination + +For optimal performance the <<index-modules-index-sorting,index sort>> should be set on the index so that it matches +part or all of the source order in the composite aggregation. 
+For instance the following index sort: + +[source,console] +-------------------------------------------------- +PUT twitter +{ + "settings" : { + "index" : { + "sort.field" : ["user_name", "timestamp"], <1> + "sort.order" : ["asc", "desc"] <2> + } + }, + "mappings": { + "properties": { + "user_name": { + "type": "keyword", + "doc_values": true + }, + "timestamp": { + "type": "date" + } + } + } +} +-------------------------------------------------- + +<1> This index is sorted by `user_name` first then by `timestamp`. +<2> ... in ascending order for the `user_name` field and in descending order for the `timestamp` field. + +... could be used to optimize these composite aggregations: + +[source,console] +-------------------------------------------------- +GET /_search +{ + "size": 0, + "aggs" : { + "my_buckets": { + "composite" : { + "sources" : [ + { "user_name": { "terms" : { "field": "user_name" } } } <1> + ] + } + } + } +} +-------------------------------------------------- + +<1> `user_name` is a prefix of the index sort and the order matches (`asc`). + +[source,console] +-------------------------------------------------- +GET /_search +{ + "size": 0, + "aggs" : { + "my_buckets": { + "composite" : { + "sources" : [ + { "user_name": { "terms" : { "field": "user_name" } } }, <1> + { "date": { "date_histogram": { "field": "timestamp", "calendar_interval": "1d", "order": "desc" } } } <2> + ] + } + } + } +} +-------------------------------------------------- + +<1> `user_name` is a prefix of the index sort and the order matches (`asc`). +<2> `timestamp` also matches the prefix and the order matches (`desc`). + +In order to optimize the early termination it is advised to set `track_total_hits` in the request +to `false`. 
The number of total hits that match the request can be retrieved on the first request +and it would be costly to compute this number on every page: + +[source,console] +-------------------------------------------------- +GET /_search +{ + "size": 0, + "track_total_hits": false, + "aggs" : { + "my_buckets": { + "composite" : { + "sources" : [ + { "user_name": { "terms" : { "field": "user_name" } } }, + { "date": { "date_histogram": { "field": "timestamp", "calendar_interval": "1d", "order": "desc" } } } + ] + } + } + } +} +-------------------------------------------------- + +Note that the order of the sources is important: in the example above, switching the `user_name` with the `timestamp` +would deactivate the sort optimization since this configuration wouldn't match the index sort specification. +If the order of sources does not matter for your use case you can follow these simple guidelines: + + * Put the fields with the highest cardinality first. + * Make sure that the order of the fields matches the order of the index sort. + * Put multi-valued fields last since they cannot be used for early termination. + +WARNING: <<index-modules-index-sorting,index sort>> can slow down indexing, so it is very important to test index sorting +with your specific use case and dataset to ensure that it matches your requirements. If it doesn't, note that `composite` +aggregations will also try to early terminate on non-sorted indices if the query matches all documents (`match_all` query). + ==== Sub-aggregations Like any `multi-bucket` aggregations the `composite` aggregation can hold sub-aggregations. 
@@ -501,6 +623,7 @@ per composite bucket: -------------------------------------------------- GET /_search { + "size": 0, "aggs" : { "my_buckets": { "composite" : { diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeAggregationBuilder.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeAggregationBuilder.java index b3712e231fd..243d1057bbf 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeAggregationBuilder.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeAggregationBuilder.java @@ -235,7 +235,8 @@ public class CompositeAggregationBuilder extends AbstractAggregationBuilder sourceNames; private final int[] reverseMuls; private final List formats; + private final CompositeKey rawAfterKey; + private final CompositeValuesSourceConfig[] sourceConfigs; private final SingleDimensionValuesSource[] sources; private final CompositeValuesCollectorQueue queue; @@ -73,6 +86,8 @@ final class CompositeAggregator extends BucketsAggregator { private RoaringDocIdSet.Builder docIdSetBuilder; private BucketCollector deferredCollectors; + private boolean earlyTerminated; + CompositeAggregator(String name, AggregatorFactories factories, SearchContext context, Aggregator parent, List pipelineAggregators, Map metaData, int size, CompositeValuesSourceConfig[] sourceConfigs, CompositeKey rawAfterKey) throws IOException { @@ -89,11 +104,12 @@ final class CompositeAggregator extends BucketsAggregator { " to: [" + bucketLimit + "] but was [" + size + "]. 
This limit can be set by changing the [" + MAX_BUCKET_SETTING.getKey() + "] cluster level setting.", bucketLimit); } + this.sourceConfigs = sourceConfigs; for (int i = 0; i < sourceConfigs.length; i++) { this.sources[i] = createValuesSource(context.bigArrays(), context.searcher().getIndexReader(), sourceConfigs[i], size); } this.queue = new CompositeValuesCollectorQueue(context.bigArrays(), sources, size, rawAfterKey); - this.sortedDocsProducer = sources[0].createSortedDocsProducerOrNull(context.searcher().getIndexReader(), context.query()); + this.rawAfterKey = rawAfterKey; } @Override @@ -121,7 +137,6 @@ final class CompositeAggregator extends BucketsAggregator { public InternalAggregation buildAggregation(long zeroBucket) throws IOException { assert zeroBucket == 0L; consumeBucketsAndMaybeBreak(queue.size()); - if (deferredCollectors != NO_OP_COLLECTOR) { // Replay all documents that contain at least one top bucket (collected during the first pass). runDeferredCollections(); @@ -138,13 +153,13 @@ final class CompositeAggregator extends BucketsAggregator { } CompositeKey lastBucket = num > 0 ? 
buckets[num-1].getRawKey() : null; return new InternalComposite(name, size, sourceNames, formats, Arrays.asList(buckets), lastBucket, reverseMuls, - pipelineAggregators(), metaData()); + earlyTerminated, pipelineAggregators(), metaData()); } @Override public InternalAggregation buildEmptyAggregation() { return new InternalComposite(name, size, sourceNames, formats, Collections.emptyList(), null, reverseMuls, - pipelineAggregators(), metaData()); + false, pipelineAggregators(), metaData()); } private void finishLeaf() { @@ -156,58 +171,179 @@ final class CompositeAggregator extends BucketsAggregator { } } + /** Return true if the provided field may have multiple values per document in the leaf **/ + private boolean isMaybeMultivalued(LeafReaderContext context, SortField sortField) throws IOException { + SortField.Type type = IndexSortConfig.getSortFieldType(sortField); + switch (type) { + case STRING: + final SortedSetDocValues v1 = context.reader().getSortedSetDocValues(sortField.getField()); + return v1 != null && DocValues.unwrapSingleton(v1) == null; + + case DOUBLE: + case FLOAT: + case LONG: + case INT: + final SortedNumericDocValues v2 = context.reader().getSortedNumericDocValues(sortField.getField()); + return v2 != null && DocValues.unwrapSingleton(v2) == null; + + default: + // we have no clue whether the field is multi-valued or not so we assume it is. + return true; + } + } + + /** + * Returns the {@link Sort} prefix that is eligible to index sort + * optimization and null if index sort is not applicable. 
+ */ + private Sort buildIndexSortPrefix(LeafReaderContext context) throws IOException { + Sort indexSort = context.reader().getMetaData().getSort(); + if (indexSort == null) { + return null; + } + List sortFields = new ArrayList<>(); + for (int i = 0; i < indexSort.getSort().length; i++) { + CompositeValuesSourceConfig sourceConfig = sourceConfigs[i]; + SingleDimensionValuesSource source = sources[i]; + SortField indexSortField = indexSort.getSort()[i]; + if (source.fieldType == null + // TODO: can we handle missing bucket when using index sort optimization ? + || source.missingBucket + || indexSortField.getField().equals(source.fieldType.name()) == false + || isMaybeMultivalued(context, indexSortField) + || sourceConfig.hasScript()) { + break; + } + + if (indexSortField.getReverse() != (source.reverseMul == -1)) { + if (i == 0) { + // the leading index sort matches the leading source field but the order is reversed + // so we don't check the other sources. + return new Sort(indexSortField); + } + break; + } + sortFields.add(indexSortField); + } + return sortFields.isEmpty() ? null : new Sort(sortFields.toArray(new SortField[0])); + } + + /** + * Return the number of leading sources that match the index sort. + * + * @param indexSortPrefix The index sort prefix that matches the sources + * @return The length of the index sort prefix if the sort order matches + * or -1 if the leading index sort is in the reverse order of the + * leading source. A value of 0 indicates that the index sort is + * not applicable. 
+ */ + private int computeSortPrefixLen(Sort indexSortPrefix) { + if (indexSortPrefix == null) { + return 0; + } + if (indexSortPrefix.getSort()[0].getReverse() != (sources[0].reverseMul == -1)) { + assert indexSortPrefix.getSort().length == 1; + return -1; + } else { + return indexSortPrefix.getSort().length; + } + } + + private void processLeafFromQuery(LeafReaderContext ctx, Sort indexSortPrefix) throws IOException { + DocValueFormat[] formats = new DocValueFormat[indexSortPrefix.getSort().length]; + for (int i = 0; i < formats.length; i++) { + formats[i] = sources[i].format; + } + FieldDoc fieldDoc = SearchAfterBuilder.buildFieldDoc(new SortAndFormats(indexSortPrefix, formats), + Arrays.copyOfRange(rawAfterKey.values(), 0, formats.length)); + if (indexSortPrefix.getSort().length < sources.length) { + // include all docs that belong to the partial bucket + fieldDoc.doc = 0; + } + BooleanQuery newQuery = new BooleanQuery.Builder() + .add(context.query(), BooleanClause.Occur.MUST) + .add(new SearchAfterSortedDocQuery(indexSortPrefix, fieldDoc), BooleanClause.Occur.FILTER) + .build(); + Weight weight = context.searcher().createWeight(context.searcher().rewrite(newQuery), ScoreMode.COMPLETE_NO_SCORES, 1f); + Scorer scorer = weight.scorer(ctx); + if (scorer != null) { + DocIdSetIterator docIt = scorer.iterator(); + final LeafBucketCollector inner = queue.getLeafCollector(ctx, + getFirstPassCollector(docIdSetBuilder, indexSortPrefix.getSort().length)); + inner.setScorer(scorer); + while (docIt.nextDoc() != DocIdSetIterator.NO_MORE_DOCS) { + inner.collect(docIt.docID()); + } + } + } + @Override protected LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucketCollector sub) throws IOException { finishLeaf(); + boolean fillDocIdSet = deferredCollectors != NO_OP_COLLECTOR; + + Sort indexSortPrefix = buildIndexSortPrefix(ctx); + int sortPrefixLen = computeSortPrefixLen(indexSortPrefix); + + SortedDocsProducer sortedDocsProducer = sortPrefixLen == 0 ? 
+ sources[0].createSortedDocsProducerOrNull(ctx.reader(), context.query()) : null; if (sortedDocsProducer != null) { - /* - The producer will visit documents sorted by the leading source of the composite definition - and terminates when the leading source value is guaranteed to be greater than the lowest - composite bucket in the queue. - */ + // Visit documents sorted by the leading source of the composite definition and terminates + // when the leading source value is guaranteed to be greater than the lowest composite bucket + // in the queue. DocIdSet docIdSet = sortedDocsProducer.processLeaf(context.query(), queue, ctx, fillDocIdSet); if (fillDocIdSet) { entries.add(new Entry(ctx, docIdSet)); } - - /* - We can bypass search entirely for this segment, all the processing has been done in the previous call. - Throwing this exception will terminate the execution of the search for this root aggregation, - see {@link org.apache.lucene.search.MultiCollector} for more details on how we handle early termination in aggregations. - */ + // We can bypass search entirely for this segment, the processing is done in the previous call. + // Throwing this exception will terminate the execution of the search for this root aggregation, + // see {@link MultiCollector} for more details on how we handle early termination in aggregations. 
+ earlyTerminated = true; throw new CollectionTerminatedException(); } else { if (fillDocIdSet) { currentLeaf = ctx; docIdSetBuilder = new RoaringDocIdSet.Builder(ctx.reader().maxDoc()); } - final LeafBucketCollector inner = queue.getLeafCollector(ctx, getFirstPassCollector(docIdSetBuilder)); - return new LeafBucketCollector() { - @Override - public void collect(int doc, long zeroBucket) throws IOException { - assert zeroBucket == 0L; - inner.collect(doc); - } - }; + if (rawAfterKey != null && sortPrefixLen > 0) { + // We have an after key and index sort is applicable so we jump directly to the doc + // that is after the index sort prefix using the rawAfterKey and we start collecting + // document from there. + processLeafFromQuery(ctx, indexSortPrefix); + throw new CollectionTerminatedException(); + } else { + final LeafBucketCollector inner = queue.getLeafCollector(ctx, getFirstPassCollector(docIdSetBuilder, sortPrefixLen)); + return new LeafBucketCollector() { + @Override + public void collect(int doc, long zeroBucket) throws IOException { + assert zeroBucket == 0L; + inner.collect(doc); + } + }; + } } } /** * The first pass selects the top composite buckets from all matching documents. 
*/ - private LeafBucketCollector getFirstPassCollector(RoaringDocIdSet.Builder builder) { + private LeafBucketCollector getFirstPassCollector(RoaringDocIdSet.Builder builder, int indexSortPrefix) { return new LeafBucketCollector() { int lastDoc = -1; @Override public void collect(int doc, long bucket) throws IOException { - int slot = queue.addIfCompetitive(); - if (slot != -1) { - if (builder != null && lastDoc != doc) { - builder.add(doc); - lastDoc = doc; + try { + if (queue.addIfCompetitive(indexSortPrefix)) { + if (builder != null && lastDoc != doc) { + builder.add(doc); + lastDoc = doc; + } } + } catch (CollectionTerminatedException exc) { + earlyTerminated = true; + throw exc; } } }; @@ -274,7 +410,6 @@ final class CompositeAggregator extends BucketsAggregator { private SingleDimensionValuesSource createValuesSource(BigArrays bigArrays, IndexReader reader, CompositeValuesSourceConfig config, int size) { - final int reverseMul = config.reverseMul(); if (config.valuesSource() instanceof ValuesSource.Bytes.WithOrdinals && reader instanceof DirectoryReader) { ValuesSource.Bytes.WithOrdinals vs = (ValuesSource.Bytes.WithOrdinals) config.valuesSource(); diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeValuesCollectorQueue.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeValuesCollectorQueue.java index 58887d9e6a2..93511498e22 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeValuesCollectorQueue.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeValuesCollectorQueue.java @@ -20,6 +20,7 @@ package org.elasticsearch.search.aggregations.bucket.composite; import org.apache.lucene.index.LeafReaderContext; +import org.apache.lucene.search.CollectionTerminatedException; import org.apache.lucene.util.PriorityQueue; import org.elasticsearch.common.lease.Releasable; import 
org.elasticsearch.common.lease.Releasables; @@ -63,6 +64,7 @@ final class CompositeValuesCollectorQueue extends PriorityQueue impleme private final int maxSize; private final Map map; private final SingleDimensionValuesSource[] arrays; + private IntArray docCounts; private boolean afterKeyIsSet = false; @@ -153,7 +155,7 @@ final class CompositeValuesCollectorQueue extends PriorityQueue impleme cmp = arrays[i].compare(slot1, slot2); } if (cmp != 0) { - return cmp; + return cmp > 0 ? i+1 : -(i+1); } } return 0; @@ -244,27 +246,57 @@ final class CompositeValuesCollectorQueue extends PriorityQueue impleme /** * Check if the current candidate should be added in the queue. - * @return The target slot of the candidate or -1 is the candidate is not competitive. + * @return true if the candidate is competitive (added or already in the queue). */ - int addIfCompetitive() { + boolean addIfCompetitive() { + return addIfCompetitive(0); + } + + + /** + * Add or update the current composite key in the queue if the values are competitive. + * + * @param indexSortSourcePrefix 0 if the index sort is null or doesn't match any of the sources field, + * a value greater than 0 indicates the prefix len of the sources that match the index sort + * and a negative value indicates that the index sort match the source field but the order is reversed. + * @return true if the candidate is competitive (added or already in the queue). + * + * @throws CollectionTerminatedException if the current collection can be terminated early due to index sorting. 
+ */ + boolean addIfCompetitive(int indexSortSourcePrefix) { // checks if the candidate key is competitive Integer topSlot = compareCurrent(); if (topSlot != null) { // this key is already in the top N, skip it docCounts.increment(topSlot, 1); - return topSlot; + return true; } - if (afterKeyIsSet && compareCurrentWithAfter() <= 0) { - // this key is greater than the top value collected in the previous round, skip it - return -1; + if (afterKeyIsSet) { + int cmp = compareCurrentWithAfter(); + if (cmp <= 0) { + if (indexSortSourcePrefix < 0 && cmp == indexSortSourcePrefix) { + // the leading index sort is in the reverse order of the leading source + // so we can early terminate when we reach a document that is smaller + // than the after key (collected on a previous page). + throw new CollectionTerminatedException(); + } + // key was collected on a previous page, skip it (>= afterKey). + return false; + } } - if (size() >= maxSize - // the tree map is full, check if the candidate key should be kept - && compare(CANDIDATE_SLOT, top()) > 0) { - // the candidate key is not competitive, skip it - return -1; + if (size() >= maxSize) { + // the tree map is full, check if the candidate key should be kept + int cmp = compare(CANDIDATE_SLOT, top()); + if (cmp > 0) { + if (cmp <= indexSortSourcePrefix) { + // index sort guarantees that there is no key greater or equal than the + // current one in the subsequent documents so we can early terminate. + throw new CollectionTerminatedException(); + } + // the candidate key is not competitive, skip it. 
+ return false; + } } - // the candidate key is competitive final int newSlot; if (size() >= maxSize) { @@ -280,7 +312,7 @@ final class CompositeValuesCollectorQueue extends PriorityQueue impleme copyCurrent(newSlot); map.put(new Slot(newSlot), newSlot); add(newSlot); - return newSlot; + return true; } @Override diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeValuesSourceBuilder.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeValuesSourceBuilder.java index b9c92907866..bf7007a6544 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeValuesSourceBuilder.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeValuesSourceBuilder.java @@ -221,7 +221,7 @@ public abstract class CompositeValuesSourceBuildertrue an explicit null bucket will represent documents with missing values. */ @SuppressWarnings("unchecked") public AB missingBucket(boolean missingBucket) { diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeValuesSourceConfig.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeValuesSourceConfig.java index bf88285c190..5c9378d44ef 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeValuesSourceConfig.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeValuesSourceConfig.java @@ -33,23 +33,28 @@ class CompositeValuesSourceConfig { private final DocValueFormat format; private final int reverseMul; private final boolean missingBucket; + private final boolean hasScript; /** * Creates a new {@link CompositeValuesSourceConfig}. + * * @param name The name of the source. * @param fieldType The field type or null if the source is a script. * @param vs The underlying {@link ValuesSource}. 
* @param format The {@link DocValueFormat} of this source. * @param order The sort order associated with this source. + * @param missingBucket If true an explicit null bucket will represent documents with missing values. + * @param hasScript true if the source contains a script that can change the value. */ CompositeValuesSourceConfig(String name, @Nullable MappedFieldType fieldType, ValuesSource vs, DocValueFormat format, - SortOrder order, boolean missingBucket) { + SortOrder order, boolean missingBucket, boolean hasScript) { this.name = name; this.fieldType = fieldType; this.vs = vs; this.format = format; this.reverseMul = order == SortOrder.ASC ? 1 : -1; this.missingBucket = missingBucket; + this.hasScript = hasScript; } /** @@ -88,6 +93,13 @@ class CompositeValuesSourceConfig { return missingBucket; } + /** + * Returns true if the source contains a script that can change the value. + */ + boolean hasScript() { + return hasScript; + } + /** * The sort order for the values source (e.g. -1 for descending and 1 for ascending). */ diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/DateHistogramValuesSourceBuilder.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/DateHistogramValuesSourceBuilder.java index f1c1f5502df..564399d4c26 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/DateHistogramValuesSourceBuilder.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/DateHistogramValuesSourceBuilder.java @@ -228,7 +228,8 @@ public class DateHistogramValuesSourceBuilder // is specified in the builder. final DocValueFormat docValueFormat = format() == null ? DocValueFormat.RAW : config.format(); final MappedFieldType fieldType = config.fieldContext() != null ? 
config.fieldContext().fieldType() : null; - return new CompositeValuesSourceConfig(name, fieldType, vs, docValueFormat, order(), missingBucket()); + return new CompositeValuesSourceConfig(name, fieldType, vs, docValueFormat, order(), + missingBucket(), config.script() != null); } else { throw new IllegalArgumentException("invalid source, expected numeric, got " + orig.getClass().getSimpleName()); } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/GeoTileGridValuesSourceBuilder.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/GeoTileGridValuesSourceBuilder.java index 17a5b3c0e99..b6f2b2788cd 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/GeoTileGridValuesSourceBuilder.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/GeoTileGridValuesSourceBuilder.java @@ -113,7 +113,8 @@ public class GeoTileGridValuesSourceBuilder extends CompositeValuesSourceBuilder // is specified in the builder. final MappedFieldType fieldType = config.fieldContext() != null ? 
config.fieldContext().fieldType() : null; CellIdSource cellIdSource = new CellIdSource(geoPoint, precision, GeoTileUtils::longEncode); - return new CompositeValuesSourceConfig(name, fieldType, cellIdSource, DocValueFormat.GEOTILE, order(), missingBucket()); + return new CompositeValuesSourceConfig(name, fieldType, cellIdSource, DocValueFormat.GEOTILE, order(), + missingBucket(), script() != null); } else { throw new IllegalArgumentException("invalid source, expected geo_point, got " + orig.getClass().getSimpleName()); } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/HistogramValuesSourceBuilder.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/HistogramValuesSourceBuilder.java index daafa6f1441..aa15d5a6947 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/HistogramValuesSourceBuilder.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/HistogramValuesSourceBuilder.java @@ -119,7 +119,8 @@ public class HistogramValuesSourceBuilder extends CompositeValuesSourceBuilder sourceNames; private final List formats; + private final boolean earlyTerminated; + InternalComposite(String name, int size, List sourceNames, List formats, - List buckets, CompositeKey afterKey, int[] reverseMuls, + List buckets, CompositeKey afterKey, int[] reverseMuls, boolean earlyTerminated, List pipelineAggregators, Map metaData) { super(name, pipelineAggregators, metaData); this.sourceNames = sourceNames; @@ -64,6 +66,7 @@ public class InternalComposite this.afterKey = afterKey; this.size = size; this.reverseMuls = reverseMuls; + this.earlyTerminated = earlyTerminated; } public InternalComposite(StreamInput in) throws IOException { @@ -85,6 +88,7 @@ public class InternalComposite } else { this.afterKey = buckets.size() > 0 ? buckets.get(buckets.size()-1).key : null; } + this.earlyTerminated = in.getVersion().onOrAfter(Version.V_7_6_0) ? 
in.readBoolean() : false; } @Override @@ -104,6 +108,9 @@ public class InternalComposite afterKey.writeTo(out); } } + if (out.getVersion().onOrAfter(Version.V_7_6_0)) { + out.writeBoolean(earlyTerminated); + } } @Override @@ -124,7 +131,7 @@ public class InternalComposite * to be able to retrieve the next page even if all buckets have been filtered. */ return new InternalComposite(name, size, sourceNames, formats, newBuckets, afterKey, - reverseMuls, pipelineAggregators(), getMetaData()); + reverseMuls, earlyTerminated, pipelineAggregators(), getMetaData()); } @Override @@ -150,6 +157,11 @@ public class InternalComposite return null; } + // Visible for tests + boolean isTerminatedEarly() { + return earlyTerminated; + } + // Visible for tests int[] getReverseMuls() { return reverseMuls; @@ -158,8 +170,10 @@ public class InternalComposite @Override public InternalAggregation reduce(List aggregations, ReduceContext reduceContext) { PriorityQueue pq = new PriorityQueue<>(aggregations.size()); + boolean earlyTerminated = false; for (InternalAggregation agg : aggregations) { InternalComposite sortedAgg = (InternalComposite) agg; + earlyTerminated |= sortedAgg.earlyTerminated; BucketIterator it = new BucketIterator(sortedAgg.buckets); if (it.next() != null) { pq.add(it); @@ -191,7 +205,8 @@ public class InternalComposite result.add(reduceBucket); } final CompositeKey lastKey = result.size() > 0 ? 
result.get(result.size()-1).getRawKey() : null; - return new InternalComposite(name, size, sourceNames, formats, result, lastKey, reverseMuls, pipelineAggregators(), metaData); + return new InternalComposite(name, size, sourceNames, formats, result, lastKey, reverseMuls, + earlyTerminated, pipelineAggregators(), metaData); } @Override diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/SortedDocsProducer.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/SortedDocsProducer.java index 63530a4eed6..01e2c0a3ae5 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/SortedDocsProducer.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/SortedDocsProducer.java @@ -66,8 +66,7 @@ abstract class SortedDocsProducer { @Override public void collect(int doc, long bucket) throws IOException { hasCollected[0] = true; - int slot = queue.addIfCompetitive(); - if (slot != -1) { + if (queue.addIfCompetitive()) { topCompositeCollected[0]++; if (adder != null && doc != lastDoc) { if (remainingBits == 0) { diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/TermsValuesSourceBuilder.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/TermsValuesSourceBuilder.java index 8d02eb4b19d..cd88a56614e 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/TermsValuesSourceBuilder.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/TermsValuesSourceBuilder.java @@ -85,6 +85,6 @@ public class TermsValuesSourceBuilder extends CompositeValuesSourceBuilder { throw new IllegalStateException("now() is not allowed in [search_after] key"); }); case FLOAT: if (value instanceof Number) { diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeAggregatorTests.java 
b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeAggregatorTests.java index 1520dfde8a1..601154234e7 100644 --- a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeAggregatorTests.java +++ b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeAggregatorTests.java @@ -19,6 +19,7 @@ package org.elasticsearch.search.aggregations.bucket.composite; +import org.apache.lucene.analysis.MockAnalyzer; import org.apache.lucene.document.Document; import org.apache.lucene.document.DoublePoint; import org.apache.lucene.document.Field; @@ -31,17 +32,28 @@ import org.apache.lucene.document.SortedSetDocValuesField; import org.apache.lucene.document.StringField; import org.apache.lucene.index.DirectoryReader; import org.apache.lucene.index.IndexReader; +import org.apache.lucene.index.IndexWriterConfig; import org.apache.lucene.index.RandomIndexWriter; +import org.apache.lucene.index.Term; import org.apache.lucene.search.DocValuesFieldExistsQuery; import org.apache.lucene.search.IndexSearcher; import org.apache.lucene.search.MatchAllDocsQuery; import org.apache.lucene.search.Query; +import org.apache.lucene.search.Sort; +import org.apache.lucene.search.SortField; +import org.apache.lucene.search.SortedNumericSortField; +import org.apache.lucene.search.SortedSetSortField; +import org.apache.lucene.search.TermQuery; import org.apache.lucene.store.Directory; import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.NumericUtils; +import org.apache.lucene.util.TestUtil; import org.elasticsearch.ElasticsearchParseException; import org.elasticsearch.common.geo.GeoPoint; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.time.DateFormatters; +import org.elasticsearch.index.Index; +import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.mapper.ContentPath; import org.elasticsearch.index.mapper.DateFieldMapper; import 
org.elasticsearch.index.mapper.GeoPointFieldMapper; @@ -63,6 +75,7 @@ import org.elasticsearch.search.aggregations.metrics.TopHits; import org.elasticsearch.search.aggregations.metrics.TopHitsAggregationBuilder; import org.elasticsearch.search.aggregations.support.ValueType; import org.elasticsearch.search.sort.SortOrder; +import org.elasticsearch.test.IndexSettingsModule; import org.junit.After; import org.junit.Before; @@ -82,12 +95,13 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Supplier; +import java.util.stream.Collectors; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.instanceOf; -public class CompositeAggregatorTests extends AggregatorTestCase { +public class CompositeAggregatorTests extends AggregatorTestCase { private static MappedFieldType[] FIELD_TYPES; @Override @@ -109,6 +123,7 @@ public class CompositeAggregatorTests extends AggregatorTestCase { DateFieldMapper.Builder builder = new DateFieldMapper.Builder("date"); builder.docValues(true); + builder.format("yyyy-MM-dd||epoch_millis"); DateFieldMapper fieldMapper = builder.build(new Mapper.BuilderContext(createIndexSettings().getSettings(), new ContentPath(0))); FIELD_TYPES[3] = fieldMapper.fieldType(); @@ -419,7 +434,7 @@ public class CompositeAggregatorTests extends AggregatorTestCase { ); } - public void testWithKeywordDesc() throws Exception { + public void testWithKeywordDesc() throws Exception { final List>> dataset = new ArrayList<>(); dataset.addAll( Arrays.asList( @@ -485,19 +500,19 @@ public class CompositeAggregatorTests extends AggregatorTestCase { return new CompositeAggregationBuilder("name", Collections.singletonList(terms)); }, (result) -> { - assertEquals(5, result.getBuckets().size()); - assertEquals("{keyword=z}", result.afterKey().toString()); - assertEquals("{keyword=a}", 
result.getBuckets().get(0).getKeyAsString()); - assertEquals(2L, result.getBuckets().get(0).getDocCount()); - assertEquals("{keyword=b}", result.getBuckets().get(1).getKeyAsString()); - assertEquals(2L, result.getBuckets().get(1).getDocCount()); - assertEquals("{keyword=c}", result.getBuckets().get(2).getKeyAsString()); - assertEquals(1L, result.getBuckets().get(2).getDocCount()); - assertEquals("{keyword=d}", result.getBuckets().get(3).getKeyAsString()); - assertEquals(1L, result.getBuckets().get(3).getDocCount()); - assertEquals("{keyword=z}", result.getBuckets().get(4).getKeyAsString()); - assertEquals(1L, result.getBuckets().get(4).getDocCount()); - } + assertEquals(5, result.getBuckets().size()); + assertEquals("{keyword=z}", result.afterKey().toString()); + assertEquals("{keyword=a}", result.getBuckets().get(0).getKeyAsString()); + assertEquals(2L, result.getBuckets().get(0).getDocCount()); + assertEquals("{keyword=b}", result.getBuckets().get(1).getKeyAsString()); + assertEquals(2L, result.getBuckets().get(1).getDocCount()); + assertEquals("{keyword=c}", result.getBuckets().get(2).getKeyAsString()); + assertEquals(1L, result.getBuckets().get(2).getDocCount()); + assertEquals("{keyword=d}", result.getBuckets().get(3).getKeyAsString()); + assertEquals(1L, result.getBuckets().get(3).getDocCount()); + assertEquals("{keyword=z}", result.getBuckets().get(4).getKeyAsString()); + assertEquals(1L, result.getBuckets().get(4).getDocCount()); + } ); testSearchCase(Arrays.asList(new MatchAllDocsQuery(), new DocValuesFieldExistsQuery("keyword")), dataset, @@ -589,10 +604,10 @@ public class CompositeAggregatorTests extends AggregatorTestCase { ); testSearchCase(Arrays.asList(new MatchAllDocsQuery(), new DocValuesFieldExistsQuery("keyword")), dataset, () -> new CompositeAggregationBuilder("name", - Arrays.asList( - new TermsValuesSourceBuilder("keyword").field("keyword"), - new TermsValuesSourceBuilder("long").field("long") - ) + Arrays.asList( + new 
TermsValuesSourceBuilder("keyword").field("keyword"), + new TermsValuesSourceBuilder("long").field("long") + ) ), (result) -> { assertEquals(4, result.getBuckets().size()); @@ -610,11 +625,11 @@ public class CompositeAggregatorTests extends AggregatorTestCase { testSearchCase(Arrays.asList(new MatchAllDocsQuery(), new DocValuesFieldExistsQuery("keyword")), dataset, () -> new CompositeAggregationBuilder("name", - Arrays.asList( - new TermsValuesSourceBuilder("keyword").field("keyword"), - new TermsValuesSourceBuilder("long").field("long") - ) - ).aggregateAfter(createAfterKey("keyword", "a", "long", 100L) + Arrays.asList( + new TermsValuesSourceBuilder("keyword").field("keyword"), + new TermsValuesSourceBuilder("long").field("long") + ) + ).aggregateAfter(createAfterKey("keyword", "a", "long", 100L) ), (result) -> { assertEquals(2, result.getBuckets().size()); @@ -942,7 +957,7 @@ public class CompositeAggregatorTests extends AggregatorTestCase { new TermsValuesSourceBuilder("double").field("double") ) ).aggregateAfter(createAfterKey("keyword", "a", "long", 100L, "double", 0.4d)) - ,(result) -> { + , (result) -> { assertEquals(10, result.getBuckets().size()); assertEquals("{keyword=z, long=0, double=0.09}", result.afterKey().toString()); assertEquals("{keyword=b, long=100, double=0.4}", result.getBuckets().get(0).getKeyAsString()); @@ -1152,8 +1167,9 @@ public class CompositeAggregatorTests extends AggregatorTestCase { return new CompositeAggregationBuilder("name", Collections.singletonList(histo)) .aggregateAfter(createAfterKey("date", "now")); }, - (result) -> {} - )); + (result) -> { + } + )); assertThat(exc.getCause(), instanceOf(IllegalArgumentException.class)); assertThat(exc.getCause().getMessage(), containsString("now() is not supported in [after] key")); @@ -1167,7 +1183,8 @@ public class CompositeAggregatorTests extends AggregatorTestCase { return new CompositeAggregationBuilder("name", Collections.singletonList(histo)) 
.aggregateAfter(createAfterKey("date", "1474329600000")); }, - (result) -> {} + (result) -> { + } )); assertThat(exc.getMessage(), containsString("failed to parse date field [1474329600000]")); assertWarnings("[interval] on [date_histogram] is deprecated, use [fixed_interval] or [calendar_interval] in the future."); @@ -1486,7 +1503,7 @@ public class CompositeAggregatorTests extends AggregatorTestCase { new DateHistogramValuesSourceBuilder("date_histo").field("date") .dateHistogramInterval(DateHistogramInterval.days(1)) ) - ).aggregateAfter(createAfterKey("keyword","c", "date_histo", 1474329600000L)) + ).aggregateAfter(createAfterKey("keyword", "c", "date_histo", 1474329600000L)) , (result) -> { assertEquals(4, result.getBuckets().size()); assertEquals("{keyword=z, date_histo=1474329600000}", result.afterKey().toString()); @@ -1668,7 +1685,7 @@ public class CompositeAggregatorTests extends AggregatorTestCase { builders.add(new TermsValuesSourceBuilder("duplicate1").field("baz")); builders.add(new TermsValuesSourceBuilder("duplicate2").field("bar")); builders.add(new TermsValuesSourceBuilder("duplicate2").field("baz")); - new CompositeAggregationBuilder("foo", builders); + new CompositeAggregationBuilder("foo", builders); }); assertThat(e.getMessage(), equalTo("Composite source names must be unique, found duplicates: [duplicate2, duplicate1]")); } @@ -1705,7 +1722,7 @@ public class CompositeAggregatorTests extends AggregatorTestCase { List>> dataset = new ArrayList<>(); Set valuesSet = new HashSet<>(); - Map, AtomicLong> expectedDocCounts = new HashMap<> (); + Map, AtomicLong> expectedDocCounts = new HashMap<>(); for (int i = 0; i < numDocs; i++) { int numValues = randomIntBetween(1, 5); Set values = new HashSet<>(); @@ -1725,13 +1742,13 @@ public class CompositeAggregatorTests extends AggregatorTestCase { List> seen = new ArrayList<>(); AtomicBoolean finish = new AtomicBoolean(false); - int size = randomIntBetween(1, expected.size()); + int size = 
randomIntBetween(1, expected.size()); while (finish.get() == false) { testSearchCase(Arrays.asList(new MatchAllDocsQuery(), new DocValuesFieldExistsQuery(field)), dataset, () -> { Map afterKey = null; if (seen.size() > 0) { - afterKey = Collections.singletonMap(field, seen.get(seen.size()-1)); + afterKey = Collections.singletonMap(field, seen.get(seen.size() - 1)); } TermsValuesSourceBuilder source = new TermsValuesSourceBuilder(field).field(field); return new CompositeAggregationBuilder("name", Collections.singletonList(source)) @@ -1838,44 +1855,130 @@ public class CompositeAggregatorTests extends AggregatorTestCase { ); } + public void testEarlyTermination() throws Exception { + final List>> dataset = new ArrayList<>(); + dataset.addAll( + Arrays.asList( + createDocument("keyword", "a", "long", 100L, "foo", "bar"), + createDocument("keyword", "c", "long", 100L, "foo", "bar"), + createDocument("keyword", "a", "long", 0L, "foo", "bar"), + createDocument("keyword", "d", "long", 10L, "foo", "bar"), + createDocument("keyword", "b", "long", 10L, "foo", "bar"), + createDocument("keyword", "c", "long", 10L, "foo", "bar"), + createDocument("keyword", "e", "long", 100L, "foo", "bar"), + createDocument("keyword", "e", "long", 10L, "foo", "bar") + ) + ); + + executeTestCase(true, false, new TermQuery(new Term("foo", "bar")), + dataset, + () -> + new CompositeAggregationBuilder("name", + Arrays.asList( + new TermsValuesSourceBuilder("keyword").field("keyword"), + new TermsValuesSourceBuilder("long").field("long") + )).aggregateAfter(createAfterKey("keyword", "b", "long", 10L)).size(2), + (result) -> { + assertEquals(2, result.getBuckets().size()); + assertEquals("{keyword=c, long=100}", result.afterKey().toString()); + assertEquals("{keyword=c, long=10}", result.getBuckets().get(0).getKeyAsString()); + assertEquals(1L, result.getBuckets().get(0).getDocCount()); + assertEquals("{keyword=c, long=100}", result.getBuckets().get(1).getKeyAsString()); + assertEquals(1L, 
result.getBuckets().get(1).getDocCount()); + assertTrue(result.isTerminatedEarly()); + } + ); + + // source field and index sorting config have different order + executeTestCase(true, false, new TermQuery(new Term("foo", "bar")), + dataset, + () -> + new CompositeAggregationBuilder("name", + Arrays.asList( + // reverse source order + new TermsValuesSourceBuilder("keyword").field("keyword").order(SortOrder.DESC), + new TermsValuesSourceBuilder("long").field("long").order(SortOrder.DESC) + ) + ).aggregateAfter(createAfterKey("keyword", "c", "long", 10L)).size(2), + (result) -> { + assertEquals(2, result.getBuckets().size()); + assertEquals("{keyword=a, long=100}", result.afterKey().toString()); + assertEquals("{keyword=b, long=10}", result.getBuckets().get(0).getKeyAsString()); + assertEquals(1L, result.getBuckets().get(0).getDocCount()); + assertEquals("{keyword=a, long=100}", result.getBuckets().get(1).getKeyAsString()); + assertEquals(1L, result.getBuckets().get(1).getDocCount()); + assertTrue(result.isTerminatedEarly()); + } + ); + } + private void testSearchCase(List queries, List>> dataset, Supplier create, Consumer verify) throws IOException { for (Query query : queries) { - executeTestCase(false, query, dataset, create, verify); - executeTestCase(true, query, dataset, create, verify); + executeTestCase(false, false, query, dataset, create, verify); + executeTestCase(false, true, query, dataset, create, verify); + executeTestCase(true, true, query, dataset, create, verify); } } - private void executeTestCase(boolean reduced, + private void executeTestCase(boolean useIndexSort, + boolean reduced, Query query, List>> dataset, Supplier create, Consumer verify) throws IOException { + Map types = + Arrays.stream(FIELD_TYPES).collect(Collectors.toMap(MappedFieldType::name, Function.identity())); + CompositeAggregationBuilder aggregationBuilder = create.get(); + Sort indexSort = useIndexSort ? 
buildIndexSort(aggregationBuilder.sources(), types) : null; + IndexSettings indexSettings = createIndexSettings(indexSort); try (Directory directory = newDirectory()) { - try (RandomIndexWriter indexWriter = new RandomIndexWriter(random(), directory)) { + IndexWriterConfig config = newIndexWriterConfig(random(), new MockAnalyzer(random())); + if (indexSort != null) { + config.setIndexSort(indexSort); + config.setCodec(TestUtil.getDefaultCodec()); + } + try (RandomIndexWriter indexWriter = new RandomIndexWriter(random(), directory, config)) { Document document = new Document(); for (Map> fields : dataset) { addToDocument(document, fields); indexWriter.addDocument(document); document.clear(); } + if (reduced == false && randomBoolean()) { + indexWriter.forceMerge(1); + } } try (IndexReader indexReader = DirectoryReader.open(directory)) { IndexSearcher indexSearcher = new IndexSearcher(indexReader); - CompositeAggregationBuilder aggregationBuilder = create.get(); final InternalComposite composite; if (reduced) { - composite = searchAndReduce(indexSearcher, query, aggregationBuilder, FIELD_TYPES); + composite = searchAndReduce(indexSettings, indexSearcher, query, aggregationBuilder, FIELD_TYPES); } else { - composite = search(indexSearcher, query, aggregationBuilder, FIELD_TYPES); + composite = search(indexSettings, indexSearcher, query, aggregationBuilder, FIELD_TYPES); } verify.accept(composite); } } } + private static IndexSettings createIndexSettings(Sort sort) { + Settings.Builder builder = Settings.builder(); + if (sort != null) { + String[] fields = Arrays.stream(sort.getSort()) + .map(SortField::getField) + .toArray(String[]::new); + String[] orders = Arrays.stream(sort.getSort()) + .map((o) -> o.getReverse() ? 
"desc" : "asc") + .toArray(String[]::new); + builder.putList("index.sort.field", fields); + builder.putList("index.sort.order", orders); + } + return IndexSettingsModule.newIndexSettings(new Index("_index", "0"), builder.build()); + } + private void addToDocument(Document doc, Map> keys) { for (Map.Entry> entry : keys.entrySet()) { final String name = entry.getKey(); @@ -1935,4 +2038,43 @@ public class CompositeAggregatorTests extends AggregatorTestCase { private static long asLong(String dateTime) { return DateFormatters.from(DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.parse(dateTime)).toInstant().toEpochMilli(); } + + private static Sort buildIndexSort(List> sources, Map fieldTypes) { + List sortFields = new ArrayList<>(); + for (CompositeValuesSourceBuilder source : sources) { + MappedFieldType type = fieldTypes.get(source.field()); + if (type instanceof KeywordFieldMapper.KeywordFieldType) { + sortFields.add(new SortedSetSortField(type.name(), false)); + } else if (type instanceof DateFieldMapper.DateFieldType) { + sortFields.add(new SortedNumericSortField(type.name(), SortField.Type.LONG, false)); + } else if (type instanceof NumberFieldMapper.NumberFieldType) { + boolean comp = false; + switch (type.typeName()) { + case "byte": + case "short": + case "integer": + comp = true; + sortFields.add(new SortedNumericSortField(type.name(), SortField.Type.INT, false)); + break; + + case "long": + sortFields.add(new SortedNumericSortField(type.name(), SortField.Type.LONG, false)); + break; + + case "float": + case "double": + comp = true; + sortFields.add(new SortedNumericSortField(type.name(), SortField.Type.DOUBLE, false)); + break; + + default: + break; + } + if (comp == false) { + break; + } + } + } + return sortFields.size() > 0 ? 
new Sort(sortFields.toArray(new SortField[0])) : null; + } } diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeValuesCollectorQueueTests.java b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeValuesCollectorQueueTests.java index 6516309de96..ff893314e59 100644 --- a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeValuesCollectorQueueTests.java +++ b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeValuesCollectorQueueTests.java @@ -29,10 +29,16 @@ import org.apache.lucene.index.DirectoryReader; import org.apache.lucene.index.DocValues; import org.apache.lucene.index.IndexOptions; import org.apache.lucene.index.IndexReader; +import org.apache.lucene.index.IndexWriterConfig; import org.apache.lucene.index.LeafReaderContext; import org.apache.lucene.index.RandomIndexWriter; +import org.apache.lucene.search.CollectionTerminatedException; import org.apache.lucene.search.DocIdSet; import org.apache.lucene.search.MatchAllDocsQuery; +import org.apache.lucene.search.Sort; +import org.apache.lucene.search.SortField; +import org.apache.lucene.search.SortedNumericSortField; +import org.apache.lucene.search.SortedSetSortField; import org.apache.lucene.store.Directory; import org.apache.lucene.util.Bits; import org.apache.lucene.util.BytesRef; @@ -56,6 +62,7 @@ import java.util.Set; import static org.elasticsearch.index.mapper.NumberFieldMapper.NumberType.DOUBLE; import static org.elasticsearch.index.mapper.NumberFieldMapper.NumberType.LONG; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; public class CompositeValuesCollectorQueueTests extends AggregatorTestCase { static class ClassAndName { @@ -133,31 +140,47 @@ public class CompositeValuesCollectorQueueTests extends AggregatorTestCase { } private void testRandomCase(ClassAndName... 
types) throws IOException { - testRandomCase(true, true, types); - testRandomCase(true, false, types); - testRandomCase(false, true, types); - testRandomCase(false, false, types); + for (int i = 0; i < types.length; i++) { + testRandomCase(true, true, i, types); + testRandomCase(true, false, i, types); + testRandomCase(false, true, i, types); + testRandomCase(false, false, i, types); + } } - private void testRandomCase(boolean forceMerge, boolean missingBucket, ClassAndName... types) throws IOException { + private void testRandomCase(boolean forceMerge, + boolean missingBucket, + int indexSortSourcePrefix, + ClassAndName... types) throws IOException { final BigArrays bigArrays = BigArrays.NON_RECYCLING_INSTANCE; int numDocs = randomIntBetween(50, 100); List[]> possibleValues = new ArrayList<>(); - for (ClassAndName type : types) { + SortField[] indexSortFields = indexSortSourcePrefix == 0 ? null : new SortField[indexSortSourcePrefix]; + for (int i = 0; i < types.length; i++) { + ClassAndName type = types[i]; final Comparable[] values; int numValues = randomIntBetween(1, numDocs * 2); values = new Comparable[numValues]; if (type.clazz == Long.class) { - for (int i = 0; i < numValues; i++) { - values[i] = randomLong(); + if (i < indexSortSourcePrefix) { + indexSortFields[i] = new SortedNumericSortField(type.fieldType.name(), SortField.Type.LONG); + } + for (int j = 0; j < numValues; j++) { + values[j] = randomLong(); } } else if (type.clazz == Double.class) { - for (int i = 0; i < numValues; i++) { - values[i] = randomDouble(); + if (i < indexSortSourcePrefix) { + indexSortFields[i] = new SortedNumericSortField(type.fieldType.name(), SortField.Type.DOUBLE); + } + for (int j = 0; j < numValues; j++) { + values[j] = randomDouble(); } } else if (type.clazz == BytesRef.class) { - for (int i = 0; i < numValues; i++) { - values[i] = new BytesRef(randomAlphaOfLengthBetween(5, 50)); + if (i < indexSortSourcePrefix) { + indexSortFields[i] = new 
SortedSetSortField(type.fieldType.name(), false); + } + for (int j = 0; j < numValues; j++) { + values[j] = new BytesRef(randomAlphaOfLengthBetween(5, 50)); } } else { assert (false); @@ -167,13 +190,17 @@ public class CompositeValuesCollectorQueueTests extends AggregatorTestCase { Set keys = new HashSet<>(); try (Directory directory = newDirectory()) { + final IndexWriterConfig writerConfig = newIndexWriterConfig(); + if (indexSortFields != null) { + writerConfig.setIndexSort(new Sort(indexSortFields)); + } try (RandomIndexWriter indexWriter = new RandomIndexWriter(random(), directory, new KeywordAnalyzer())) { for (int i = 0; i < numDocs; i++) { Document document = new Document(); List>> docValues = new ArrayList<>(); boolean hasAllField = true; for (int j = 0; j < types.length; j++) { - int numValues = randomIntBetween(0, 5); + int numValues = indexSortSourcePrefix-1 >= j ? 1 : randomIntBetween(0, 5); List> values = new ArrayList<>(); if (numValues == 0) { hasAllField = false; @@ -212,7 +239,7 @@ public class CompositeValuesCollectorQueueTests extends AggregatorTestCase { } } IndexReader reader = DirectoryReader.open(directory); - int size = randomIntBetween(1, keys.size()); + int size = keys.size() > 1 ? 
randomIntBetween(1, keys.size()) : 1; SingleDimensionValuesSource[] sources = new SingleDimensionValuesSource[types.length]; for (int i = 0; i < types.length; i++) { final MappedFieldType fieldType = types[i].fieldType; @@ -276,21 +303,25 @@ public class CompositeValuesCollectorQueueTests extends AggregatorTestCase { new CompositeValuesCollectorQueue(BigArrays.NON_RECYCLING_INSTANCE, sources, size, last); final SortedDocsProducer docsProducer = sources[0].createSortedDocsProducerOrNull(reader, new MatchAllDocsQuery()); for (LeafReaderContext leafReaderContext : reader.leaves()) { - final LeafBucketCollector leafCollector = new LeafBucketCollector() { - @Override - public void collect(int doc, long bucket) throws IOException { - queue.addIfCompetitive(); - } - }; if (docsProducer != null && withProducer) { assertEquals(DocIdSet.EMPTY, docsProducer.processLeaf(new MatchAllDocsQuery(), queue, leafReaderContext, false)); } else { + final LeafBucketCollector leafCollector = new LeafBucketCollector() { + @Override + public void collect(int doc, long bucket) throws IOException { + queue.addIfCompetitive(indexSortSourcePrefix); + } + }; final LeafBucketCollector queueCollector = queue.getLeafCollector(leafReaderContext, leafCollector); final Bits liveDocs = leafReaderContext.reader().getLiveDocs(); for (int i = 0; i < leafReaderContext.reader().maxDoc(); i++) { if (liveDocs == null || liveDocs.get(i)) { - queueCollector.collect(i); + try { + queueCollector.collect(i); + } catch (CollectionTerminatedException exc) { + assertThat(indexSortSourcePrefix, greaterThan(0)); + } } } } diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/composite/InternalCompositeTests.java b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/composite/InternalCompositeTests.java index 725600b3551..756b6439504 100644 --- a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/composite/InternalCompositeTests.java +++ 
b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/composite/InternalCompositeTests.java @@ -170,7 +170,7 @@ public class InternalCompositeTests extends InternalMultiBucketAggregationTestCa } Collections.sort(buckets, (o1, o2) -> o1.compareKey(o2)); CompositeKey lastBucket = buckets.size() > 0 ? buckets.get(buckets.size()-1).getRawKey() : null; - return new InternalComposite(name, size, sourceNames, formats, buckets, lastBucket, reverseMuls, + return new InternalComposite(name, size, sourceNames, formats, buckets, lastBucket, reverseMuls, randomBoolean(), Collections.emptyList(), metaData); } @@ -207,7 +207,7 @@ public class InternalCompositeTests extends InternalMultiBucketAggregationTestCa } CompositeKey lastBucket = buckets.size() > 0 ? buckets.get(buckets.size()-1).getRawKey() : null; return new InternalComposite(instance.getName(), instance.getSize(), sourceNames, formats, buckets, lastBucket, reverseMuls, - instance.pipelineAggregators(), metaData); + randomBoolean(), instance.pipelineAggregators(), metaData); } @Override diff --git a/test/framework/src/main/java/org/elasticsearch/search/aggregations/AggregatorTestCase.java b/test/framework/src/main/java/org/elasticsearch/search/aggregations/AggregatorTestCase.java index 5d361fa1c5b..843ee063834 100644 --- a/test/framework/src/main/java/org/elasticsearch/search/aggregations/AggregatorTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/search/aggregations/AggregatorTestCase.java @@ -304,7 +304,15 @@ public abstract class AggregatorTestCase extends ESTestCase { Query query, AggregationBuilder builder, MappedFieldType... fieldTypes) throws IOException { - return search(searcher, query, builder, DEFAULT_MAX_BUCKETS, fieldTypes); + return search(createIndexSettings(), searcher, query, builder, DEFAULT_MAX_BUCKETS, fieldTypes); + } + + protected A search(IndexSettings indexSettings, + IndexSearcher searcher, + Query query, + AggregationBuilder builder, + MappedFieldType... 
fieldTypes) throws IOException { + return search(indexSettings, searcher, query, builder, DEFAULT_MAX_BUCKETS, fieldTypes); } protected A search(IndexSearcher searcher, @@ -312,8 +320,17 @@ public abstract class AggregatorTestCase extends ESTestCase { AggregationBuilder builder, int maxBucket, MappedFieldType... fieldTypes) throws IOException { + return search(createIndexSettings(), searcher, query, builder, maxBucket, fieldTypes); + } + + protected A search(IndexSettings indexSettings, + IndexSearcher searcher, + Query query, + AggregationBuilder builder, + int maxBucket, + MappedFieldType... fieldTypes) throws IOException { MultiBucketConsumer bucketConsumer = new MultiBucketConsumer(maxBucket); - C a = createAggregator(query, builder, searcher, bucketConsumer, fieldTypes); + C a = createAggregator(query, builder, searcher, indexSettings, bucketConsumer, fieldTypes); a.preCollection(); searcher.search(query, a); a.postCollection(); @@ -327,7 +344,23 @@ public abstract class AggregatorTestCase extends ESTestCase { Query query, AggregationBuilder builder, MappedFieldType... fieldTypes) throws IOException { - return searchAndReduce(searcher, query, builder, DEFAULT_MAX_BUCKETS, fieldTypes); + return searchAndReduce(createIndexSettings(), searcher, query, builder, DEFAULT_MAX_BUCKETS, fieldTypes); + } + + protected A searchAndReduce(IndexSettings indexSettings, + IndexSearcher searcher, + Query query, + AggregationBuilder builder, + MappedFieldType... fieldTypes) throws IOException { + return searchAndReduce(indexSettings, searcher, query, builder, DEFAULT_MAX_BUCKETS, fieldTypes); + } + + protected A searchAndReduce(IndexSearcher searcher, + Query query, + AggregationBuilder builder, + int maxBucket, + MappedFieldType... 
fieldTypes) throws IOException { + return searchAndReduce(createIndexSettings(), searcher, query, builder, maxBucket, fieldTypes); } /** @@ -335,7 +368,8 @@ public abstract class AggregatorTestCase extends ESTestCase { * builds an aggregator for each sub-searcher filtered by the provided {@link Query} and * returns the reduced {@link InternalAggregation}. */ - protected A searchAndReduce(IndexSearcher searcher, + protected A searchAndReduce(IndexSettings indexSettings, + IndexSearcher searcher, Query query, AggregationBuilder builder, int maxBucket, @@ -364,7 +398,7 @@ public abstract class AggregatorTestCase extends ESTestCase { for (ShardSearcher subSearcher : subSearchers) { MultiBucketConsumer shardBucketConsumer = new MultiBucketConsumer(maxBucket); - C a = createAggregator(query, builder, subSearcher, shardBucketConsumer, fieldTypes); + C a = createAggregator(query, builder, subSearcher, indexSettings, shardBucketConsumer, fieldTypes); a.preCollection(); subSearcher.search(weight, a); a.postCollection();