diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/search.aggregation/230_composite.yml b/rest-api-spec/src/main/resources/rest-api-spec/test/search.aggregation/230_composite.yml index 26c530551a9..325bdf8f18e 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/test/search.aggregation/230_composite.yml +++ b/rest-api-spec/src/main/resources/rest-api-spec/test/search.aggregation/230_composite.yml @@ -335,10 +335,11 @@ setup: --- "Composite aggregation and array size": - skip: - version: " - 6.3.99" - reason: starting in 6.4 the composite sources do not allocate arrays eagerly. + version: " - 6.99.99" + reason: starting in 7.0 the composite aggregation throws an exception if the provided size is greater than search.max_buckets. - do: + catch: /.*Trying to create too many buckets.*/ search: rest_total_hits_as_int: true index: test @@ -356,8 +357,3 @@ setup: } } ] - - - match: {hits.total: 6} - - length: { aggregations.test.buckets: 2 } - - length: { aggregations.test.after_key: 1 } - - match: { aggregations.test.after_key.keyword: "foo" } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/MultiBucketConsumerService.java b/server/src/main/java/org/elasticsearch/search/aggregations/MultiBucketConsumerService.java index 63ba70c6f23..b302c40c3bd 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/MultiBucketConsumerService.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/MultiBucketConsumerService.java @@ -118,6 +118,10 @@ public class MultiBucketConsumerService { public int getCount() { return count; } + + public int getLimit() { + return limit; + } } public MultiBucketConsumer create() { diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/SearchContextAggregations.java b/server/src/main/java/org/elasticsearch/search/aggregations/SearchContextAggregations.java index ab0a73d53ed..563bede42d1 100644 ---
a/server/src/main/java/org/elasticsearch/search/aggregations/SearchContextAggregations.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/SearchContextAggregations.java @@ -18,8 +18,6 @@ */ package org.elasticsearch.search.aggregations; -import java.util.function.IntConsumer; - import static org.elasticsearch.search.aggregations.MultiBucketConsumerService.MultiBucketConsumer; /** @@ -60,7 +58,7 @@ public class SearchContextAggregations { * Returns a consumer for multi bucket aggregation that checks the total number of buckets * created in the response */ - public IntConsumer multiBucketConsumer() { + public MultiBucketConsumer multiBucketConsumer() { return multiBucketConsumer; } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/BinaryValuesSource.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/BinaryValuesSource.java index 19392553bfd..21346844aac 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/BinaryValuesSource.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/BinaryValuesSource.java @@ -114,6 +114,24 @@ class BinaryValuesSource extends SingleDimensionValuesSource { return compareValues(currentValue, afterValue); } + @Override + int hashCode(int slot) { + if (missingBucket && values.get(slot) == null) { + return 0; + } else { + return values.get(slot).hashCode(); + } + } + + @Override + int hashCodeCurrent() { + if (missingBucket && currentValue == null) { + return 0; + } else { + return currentValue.hashCode(); + } + } + int compareValues(BytesRef v1, BytesRef v2) { return v1.compareTo(v2) * reverseMul; } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeAggregator.java index 3c43cf3ec1d..cd7fd6abe8c 100644 --- 
a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeAggregator.java @@ -40,6 +40,7 @@ import org.elasticsearch.search.aggregations.InternalAggregation; import org.elasticsearch.search.aggregations.InternalAggregations; import org.elasticsearch.search.aggregations.LeafBucketCollector; import org.elasticsearch.search.aggregations.MultiBucketCollector; +import org.elasticsearch.search.aggregations.MultiBucketConsumerService; import org.elasticsearch.search.aggregations.bucket.BucketsAggregator; import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator; import org.elasticsearch.search.aggregations.support.ValuesSource; @@ -54,6 +55,8 @@ import java.util.Map; import java.util.function.LongUnaryOperator; import java.util.stream.Collectors; +import static org.elasticsearch.search.aggregations.MultiBucketConsumerService.MAX_BUCKET_SETTING; + final class CompositeAggregator extends BucketsAggregator { private final int size; private final SortedDocsProducer sortedDocsProducer; @@ -78,9 +81,15 @@ final class CompositeAggregator extends BucketsAggregator { this.reverseMuls = Arrays.stream(sourceConfigs).mapToInt(CompositeValuesSourceConfig::reverseMul).toArray(); this.formats = Arrays.stream(sourceConfigs).map(CompositeValuesSourceConfig::format).collect(Collectors.toList()); this.sources = new SingleDimensionValuesSource[sourceConfigs.length]; + // check that the provided size is not greater than the search.max_buckets setting + int bucketLimit = context.aggregations().multiBucketConsumer().getLimit(); + if (size > bucketLimit) { + throw new MultiBucketConsumerService.TooManyBucketsException("Trying to create too many buckets. Must be less than or equal" + + " to: [" + bucketLimit + "] but was [" + size + "]. 
This limit can be set by changing the [" + MAX_BUCKET_SETTING.getKey() + + "] cluster level setting.", bucketLimit); + } for (int i = 0; i < sourceConfigs.length; i++) { - this.sources[i] = createValuesSource(context.bigArrays(), context.searcher().getIndexReader(), - context.query(), sourceConfigs[i], size, i); + this.sources[i] = createValuesSource(context.bigArrays(), context.searcher().getIndexReader(), sourceConfigs[i], size); } this.queue = new CompositeValuesCollectorQueue(context.bigArrays(), sources, size, rawAfterKey); this.sortedDocsProducer = sources[0].createSortedDocsProducerOrNull(context.searcher().getIndexReader(), context.query()); @@ -88,8 +97,11 @@ final class CompositeAggregator extends BucketsAggregator { @Override protected void doClose() { - Releasables.close(queue); - Releasables.close(sources); + try { + Releasables.close(queue); + } finally { + Releasables.close(sources); + } } @Override @@ -116,12 +128,12 @@ final class CompositeAggregator extends BucketsAggregator { int num = Math.min(size, queue.size()); final InternalComposite.InternalBucket[] buckets = new InternalComposite.InternalBucket[num]; - int pos = 0; - for (int slot : queue.getSortedSlot()) { + while (queue.size() > 0) { + int slot = queue.pop(); CompositeKey key = queue.toCompositeKey(slot); InternalAggregations aggs = bucketAggregations(slot); int docCount = queue.getDocCount(slot); - buckets[pos++] = new InternalComposite.InternalBucket(sourceNames, formats, key, reverseMuls, docCount, aggs); + buckets[queue.size()] = new InternalComposite.InternalBucket(sourceNames, formats, key, reverseMuls, docCount, aggs); } CompositeKey lastBucket = num > 0 ? 
buckets[num-1].getRawKey() : null; return new InternalComposite(name, size, sourceNames, formats, Arrays.asList(buckets), lastBucket, reverseMuls, @@ -259,13 +271,13 @@ final class CompositeAggregator extends BucketsAggregator { }; } - private SingleDimensionValuesSource createValuesSource(BigArrays bigArrays, IndexReader reader, Query query, - CompositeValuesSourceConfig config, int sortRank, int size) { + private SingleDimensionValuesSource createValuesSource(BigArrays bigArrays, IndexReader reader, + CompositeValuesSourceConfig config, int size) { final int reverseMul = config.reverseMul(); if (config.valuesSource() instanceof ValuesSource.Bytes.WithOrdinals && reader instanceof DirectoryReader) { ValuesSource.Bytes.WithOrdinals vs = (ValuesSource.Bytes.WithOrdinals) config.valuesSource(); - SingleDimensionValuesSource source = new GlobalOrdinalValuesSource( + return new GlobalOrdinalValuesSource( bigArrays, config.fieldType(), vs::globalOrdinalsValues, @@ -274,25 +286,6 @@ final class CompositeAggregator extends BucketsAggregator { size, reverseMul ); - - if (sortRank == 0 && source.createSortedDocsProducerOrNull(reader, query) != null) { - // this the leading source and we can optimize it with the sorted docs producer but - // we don't want to use global ordinals because the number of visited documents - // should be low and global ordinals need one lookup per visited term. 
- Releasables.close(source); - return new BinaryValuesSource( - bigArrays, - this::addRequestCircuitBreakerBytes, - config.fieldType(), - vs::bytesValues, - config.format(), - config.missingBucket(), - size, - reverseMul - ); - } else { - return source; - } } else if (config.valuesSource() instanceof ValuesSource.Bytes) { ValuesSource.Bytes vs = (ValuesSource.Bytes) config.valuesSource(); return new BinaryValuesSource( diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeKey.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeKey.java index 187612fe805..c643ee67b18 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeKey.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeKey.java @@ -77,4 +77,11 @@ class CompositeKey implements Writeable { public int hashCode() { return Arrays.hashCode(values); } + + @Override + public String toString() { + return "CompositeKey{" + + "values=" + Arrays.toString(values) + + '}'; + } } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeValuesCollectorQueue.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeValuesCollectorQueue.java index ba3633292f3..58887d9e6a2 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeValuesCollectorQueue.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeValuesCollectorQueue.java @@ -20,6 +20,7 @@ package org.elasticsearch.search.aggregations.bucket.composite; import org.apache.lucene.index.LeafReaderContext; +import org.apache.lucene.util.PriorityQueue; import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.lease.Releasables; import org.elasticsearch.common.util.BigArrays; @@ -27,19 +28,40 @@ import 
org.elasticsearch.common.util.IntArray; import org.elasticsearch.search.aggregations.LeafBucketCollector; import java.io.IOException; -import java.util.Set; -import java.util.TreeMap; +import java.util.HashMap; +import java.util.Map; /** - * A specialized queue implementation for composite buckets + * A specialized {@link PriorityQueue} implementation for composite buckets. */ -final class CompositeValuesCollectorQueue implements Releasable { +final class CompositeValuesCollectorQueue extends PriorityQueue implements Releasable { + private class Slot { + int value; + + Slot(int initial) { + this.value = initial; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + Slot slot = (Slot) o; + return CompositeValuesCollectorQueue.this.equals(value, slot.value); + } + + @Override + public int hashCode() { + return CompositeValuesCollectorQueue.this.hashCode(value); + } + } + // the slot for the current candidate private static final int CANDIDATE_SLOT = Integer.MAX_VALUE; private final BigArrays bigArrays; private final int maxSize; - private final TreeMap keys; + private final Map map; private final SingleDimensionValuesSource[] arrays; private IntArray docCounts; private boolean afterKeyIsSet = false; @@ -52,10 +74,11 @@ final class CompositeValuesCollectorQueue implements Releasable { * @param afterKey composite key */ CompositeValuesCollectorQueue(BigArrays bigArrays, SingleDimensionValuesSource[] sources, int size, CompositeKey afterKey) { + super(size); this.bigArrays = bigArrays; this.maxSize = size; this.arrays = sources; - this.keys = new TreeMap<>(this::compare); + this.map = new HashMap<>(size); if (afterKey != null) { assert afterKey.size() == sources.length; afterKeyIsSet = true; @@ -66,25 +89,16 @@ final class CompositeValuesCollectorQueue implements Releasable { this.docCounts = bigArrays.newIntArray(1, false); } - /** - * The current size of the queue. 
- */ - int size() { - return keys.size(); + @Override + protected boolean lessThan(Integer a, Integer b) { + return compare(a, b) > 0; } /** * Whether the queue is full or not. */ boolean isFull() { - return keys.size() == maxSize; - } - - /** - * Returns a sorted {@link Set} view of the slots contained in this queue. - */ - Set getSortedSlot() { - return keys.keySet(); + return size() >= maxSize; } /** @@ -92,7 +106,7 @@ final class CompositeValuesCollectorQueue implements Releasable { * the slot if the candidate is already in the queue or null if the candidate is not present. */ Integer compareCurrent() { - return keys.get(CANDIDATE_SLOT); + return map.get(new Slot(CANDIDATE_SLOT)); } /** @@ -106,7 +120,7 @@ final class CompositeValuesCollectorQueue implements Releasable { * Returns the upper value (inclusive) of the leading source. */ Comparable getUpperValueLeadSource() throws IOException { - return size() >= maxSize ? arrays[0].toComparable(keys.lastKey()) : null; + return size() >= maxSize ? arrays[0].toComparable(top()) : null; } /** * Returns the document count in slot. @@ -127,12 +141,17 @@ final class CompositeValuesCollectorQueue implements Releasable { } /** - * Compares the values in slot1 with slot2. + * Compares the values in slot1 with the values in slot2. */ int compare(int slot1, int slot2) { + assert slot2 != CANDIDATE_SLOT; for (int i = 0; i < arrays.length; i++) { - int cmp = (slot1 == CANDIDATE_SLOT) ? arrays[i].compareCurrent(slot2) : - arrays[i].compare(slot1, slot2); + final int cmp; + if (slot1 == CANDIDATE_SLOT) { + cmp = arrays[i].compareCurrent(slot2); + } else { + cmp = arrays[i].compare(slot1, slot2); + } if (cmp != 0) { return cmp; } @@ -140,6 +159,36 @@ final class CompositeValuesCollectorQueue implements Releasable { return 0; } + /** + * Returns true if the values in slot1 are equal to the values in slot2.
+ */ + boolean equals(int slot1, int slot2) { + assert slot2 != CANDIDATE_SLOT; + for (int i = 0; i < arrays.length; i++) { + final int cmp; + if (slot1 == CANDIDATE_SLOT) { + cmp = arrays[i].compareCurrent(slot2); + } else { + cmp = arrays[i].compare(slot1, slot2); + } + if (cmp != 0) { + return false; + } + } + return true; + } + + /** + * Returns a hash code value for the values in slot. + */ + int hashCode(int slot) { + int result = 1; + for (int i = 0; i < arrays.length; i++) { + result = 31 * result + (slot == CANDIDATE_SLOT ? arrays[i].hashCodeCurrent() : arrays[i].hashCode(slot)); + } + return result; + } + /** * Compares the after values with the values in slot. */ @@ -209,28 +258,28 @@ final class CompositeValuesCollectorQueue implements Releasable { // this key is greater than the top value collected in the previous round, skip it return -1; } - if (keys.size() >= maxSize) { - // the tree map is full, check if the candidate key should be kept - if (compare(CANDIDATE_SLOT, keys.lastKey()) > 0) { - // the candidate key is not competitive, skip it - return -1; - } + if (size() >= maxSize + // the queue is full, check if the candidate key should be kept + && compare(CANDIDATE_SLOT, top()) > 0) { + // the candidate key is not competitive, skip it + return -1; } // the candidate key is competitive final int newSlot; - if (keys.size() >= maxSize) { - // the tree map is full, we replace the last key with this candidate - int slot = keys.pollLastEntry().getKey(); + if (size() >= maxSize) { + // the queue is full, we replace the last key with this candidate + int slot = pop(); + map.remove(new Slot(slot)); // and we recycle the deleted slot newSlot = slot; } else { - newSlot = keys.size(); - assert newSlot < maxSize; + newSlot = size(); } // move the candidate key to its new slot copyCurrent(newSlot); - keys.put(newSlot, newSlot); + map.put(new Slot(newSlot), newSlot); + add(newSlot); return newSlot; } diff --git
a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/DoubleValuesSource.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/DoubleValuesSource.java index a8670ebe9ca..beb66398a68 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/DoubleValuesSource.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/DoubleValuesSource.java @@ -103,6 +103,24 @@ class DoubleValuesSource extends SingleDimensionValuesSource { return compareValues(currentValue, afterValue); } + @Override + int hashCode(int slot) { + if (missingBucket && bits.get(slot) == false) { + return 0; + } else { + return Double.hashCode(values.get(slot)); + } + } + + @Override + int hashCodeCurrent() { + if (missingCurrentValue) { + return 0; + } else { + return Double.hashCode(currentValue); + } + } + private int compareValues(double v1, double v2) { return Double.compare(v1, v2) * reverseMul; } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/GlobalOrdinalValuesSource.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/GlobalOrdinalValuesSource.java index 55d87aad04c..3d29aee19b1 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/GlobalOrdinalValuesSource.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/GlobalOrdinalValuesSource.java @@ -88,6 +88,16 @@ class GlobalOrdinalValuesSource extends SingleDimensionValuesSource { return cmp * reverseMul; } + @Override + int hashCode(int slot) { + return Long.hashCode(values.get(slot)); + } + + @Override + int hashCodeCurrent() { + return Long.hashCode(currentValue); + } + @Override void setAfter(Comparable value) { if (missingBucket && value == null) { diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/LongValuesSource.java 
b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/LongValuesSource.java index 29c9504a946..d71ed3c3bd9 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/LongValuesSource.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/LongValuesSource.java @@ -120,6 +120,24 @@ class LongValuesSource extends SingleDimensionValuesSource { return compareValues(currentValue, afterValue); } + @Override + int hashCode(int slot) { + if (missingBucket && bits.get(slot) == false) { + return 0; + } else { + return Long.hashCode(values.get(slot)); + } + } + + @Override + int hashCodeCurrent() { + if (missingCurrentValue) { + return 0; + } else { + return Long.hashCode(currentValue); + } + } + private int compareValues(long v1, long v2) { return Long.compare(v1, v2) * reverseMul; } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/SingleDimensionValuesSource.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/SingleDimensionValuesSource.java index 3f2539945b1..f49e20e5bd0 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/SingleDimensionValuesSource.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/SingleDimensionValuesSource.java @@ -99,6 +99,16 @@ abstract class SingleDimensionValuesSource> implements R */ abstract int compareCurrentWithAfter(); + /** + * Returns a hash code value for the provided slot. + */ + abstract int hashCode(int slot); + + /** + * Returns a hash code value for the current value. + */ + abstract int hashCodeCurrent(); + /** * Sets the after value for this source. Values that compares smaller are filtered. 
*/ diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeValuesCollectorQueueTests.java b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeValuesCollectorQueueTests.java index 006b13e2c0b..6516309de96 100644 --- a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeValuesCollectorQueueTests.java +++ b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeValuesCollectorQueueTests.java @@ -296,13 +296,16 @@ public class CompositeValuesCollectorQueueTests extends AggregatorTestCase { } } assertEquals(size, Math.min(queue.size(), expected.length - pos)); - int ptr = 0; - for (int slot : queue.getSortedSlot()) { - CompositeKey key = queue.toCompositeKey(slot); - assertThat(key, equalTo(expected[ptr++])); - last = key; - } + int ptr = pos + (queue.size() - 1); pos += queue.size(); + last = null; + while (queue.size() > pos) { + CompositeKey key = queue.toCompositeKey(queue.pop()); + if (last == null) { + last = key; + } + assertThat(key, equalTo(expected[ptr--])); + } } } reader.close();