diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/BucketsAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/BucketsAggregator.java index ad364d0367d..cffcb61aa06 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/BucketsAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/BucketsAggregator.java @@ -45,6 +45,7 @@ import java.util.Map; import java.util.function.BiConsumer; import java.util.function.Function; import java.util.function.IntConsumer; +import java.util.function.LongUnaryOperator; import java.util.function.ToLongFunction; public abstract class BucketsAggregator extends AggregatorBase { @@ -105,17 +106,35 @@ public abstract class BucketsAggregator extends AggregatorBase { * ordinals and doc ID deltas. * * Refer to that method for documentation about the merge map. + * + * @deprecated use {@link mergeBuckets(long, LongUnaryOperator)} */ + @Deprecated public final void mergeBuckets(long[] mergeMap, long newNumBuckets) { + mergeBuckets(newNumBuckets, bucket -> mergeMap[Math.toIntExact(bucket)]); + } + + /** + * + * @param mergeMap a unary operator which maps a bucket's ordinal to the ordinal it should be merged with. + * If a bucket's ordinal is mapped to -1 then the bucket is removed entirely. + * + * This only tidies up doc counts. Call {@link MergingBucketsDeferringCollector#mergeBuckets(LongUnaryOperator)} to + * merge the actual ordinals and doc ID deltas. + */ + public final void mergeBuckets(long newNumBuckets, LongUnaryOperator mergeMap){ try (IntArray oldDocCounts = docCounts) { docCounts = bigArrays.newIntArray(newNumBuckets, true); docCounts.fill(0, newNumBuckets, 0); - for (int i = 0; i < oldDocCounts.size(); i++) { + for (long i = 0; i < oldDocCounts.size(); i++) { int docCount = oldDocCounts.get(i); + if(docCount == 0) continue; + // Skip any in the map which have been "removed", signified with -1 - if (docCount != 0 && mergeMap[i] != -1) { - docCounts.increment(mergeMap[i], docCount); + long destinationOrdinal = mergeMap.applyAsLong(i); + if (destinationOrdinal != -1) { + docCounts.increment(destinationOrdinal, docCount); } } } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/MergingBucketsDeferringCollector.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/MergingBucketsDeferringCollector.java index 2fd2be66133..db2a4b4df16 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/MergingBucketsDeferringCollector.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/MergingBucketsDeferringCollector.java @@ -25,6 +25,7 @@ import org.elasticsearch.search.internal.SearchContext; import java.util.ArrayList; import java.util.List; +import java.util.function.LongUnaryOperator; /** * A specialization of {@link BestBucketsDeferringCollector} that collects all @@ -51,8 +52,24 @@ public class MergingBucketsDeferringCollector extends BestBucketsDeferringCollec * * This process rebuilds the ordinals and docDeltas according to the mergeMap, so it should * not be called unless there are actually changes to be made, to avoid unnecessary work. + * + * @deprecated use {@link mergeBuckets(LongUnaryOperator)} */ + @Deprecated public void mergeBuckets(long[] mergeMap) { + mergeBuckets(bucket -> mergeMap[Math.toIntExact(bucket)]); + } + + /** + * Merges/prunes the existing bucket ordinals and docDeltas according to the provided mergeMap. + * + * @param mergeMap a unary operator which maps a bucket's ordinal to the ordinal it should be merged with. + * If a bucket's ordinal is mapped to -1 then the bucket is removed entirely. + * + * This process rebuilds the ordinals and docDeltas according to the mergeMap, so it should + * not be called unless there are actually changes to be made, to avoid unnecessary work. + */ + public void mergeBuckets(LongUnaryOperator mergeMap){ List newEntries = new ArrayList<>(entries.size()); for (Entry sourceEntry : entries) { PackedLongValues.Builder newBuckets = PackedLongValues.packedBuilder(PackedInts.DEFAULT); @@ -66,7 +83,7 @@ public class MergingBucketsDeferringCollector extends BestBucketsDeferringCollec long delta = docDeltasItr.next(); // Only merge in the ordinal if it hasn't been "removed", signified with -1 - long ordinal = mergeMap[Math.toIntExact(bucket)]; + long ordinal = mergeMap.applyAsLong(bucket); if (ordinal != -1) { newBuckets.add(ordinal); @@ -102,7 +119,7 @@ public class MergingBucketsDeferringCollector extends BestBucketsDeferringCollec long bucket = itr.next(); assert docDeltasItr.hasNext(); long delta = docDeltasItr.next(); - long ordinal = mergeMap[Math.toIntExact(bucket)]; + long ordinal = mergeMap.applyAsLong(bucket); // Only merge in the ordinal if it hasn't been "removed", signified with -1 if (ordinal != -1) { diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/VariableWidthHistogramAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/VariableWidthHistogramAggregator.java index 63e40557663..0779972ce46 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/VariableWidthHistogramAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/VariableWidthHistogramAggregator.java @@ -51,6 +51,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; import java.util.function.Function; +import java.util.function.LongUnaryOperator; public class VariableWidthHistogramAggregator extends DeferableBucketAggregator { @@ -353,22 +354,23 @@ public class VariableWidthHistogramAggregator extends DeferableBucketAggregator clusterSizes.set(index, holdSize); // Move the underlying buckets - long[] mergeMap = new long[numClusters]; - for (int i = 0; i < index; i++) { - // The clusters in range {0 ... idx - 1} don't move - mergeMap[i] = i; - } - for (int i = index; i < numClusters - 1; i++) { - // The clusters in range {index ... numClusters - 1} shift up - mergeMap[i] = i + 1; - } - // Finally, the new cluster moves to index - mergeMap[numClusters - 1] = index; + LongUnaryOperator mergeMap = new LongUnaryOperator() { + @Override + public long applyAsLong(long i) { + if(i < index) { + // The clusters in range {0 ... idx - 1} don't move + return i; + } + if(i == numClusters - 1) { + // The new cluster moves to index + return (long)index; + } + // The clusters in range {index ... numClusters - 1} shift forward + return i + 1; + } + }; - // TODO: Create a moveLastCluster() method in BucketsAggregator which is like BucketsAggregator::mergeBuckets, - // except it doesn't require a merge map. This would be more efficient as there would be no need to create a - // merge map on every call. - mergeBuckets(mergeMap, numClusters); + mergeBuckets(numClusters, mergeMap); if (deferringCollector != null) { deferringCollector.mergeBuckets(mergeMap); } diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/BucketsAggregatorTests.java b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/BucketsAggregatorTests.java new file mode 100644 index 00000000000..824e1d96d78 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/BucketsAggregatorTests.java @@ -0,0 +1,131 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.search.aggregations.bucket; + +import org.apache.lucene.document.Document; +import org.apache.lucene.document.SortedNumericDocValuesField; +import org.apache.lucene.index.DirectoryReader; +import org.apache.lucene.index.IndexReader; +import org.apache.lucene.index.LeafReaderContext; +import org.apache.lucene.index.IndexReader; +import org.apache.lucene.index.RandomIndexWriter; +import org.apache.lucene.search.IndexSearcher; +import org.apache.lucene.store.Directory; +import org.elasticsearch.common.breaker.CircuitBreaker; +import org.elasticsearch.index.mapper.NumberFieldMapper; +import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; +import org.elasticsearch.search.aggregations.AggregatorFactories; +import org.elasticsearch.search.aggregations.AggregatorTestCase; +import org.elasticsearch.search.aggregations.InternalAggregation; +import org.elasticsearch.search.aggregations.LeafBucketCollector; +import org.elasticsearch.search.aggregations.MultiBucketConsumerService; +import org.elasticsearch.search.aggregations.bucket.BucketsAggregator; +import org.elasticsearch.search.internal.SearchContext; + +import java.io.IOException; + +import static org.elasticsearch.search.aggregations.MultiBucketConsumerService.DEFAULT_MAX_BUCKETS; + +public class BucketsAggregatorTests extends AggregatorTestCase{ + + public BucketsAggregator buildMergeAggregator() throws IOException{ + try(Directory directory = newDirectory()) { + try (RandomIndexWriter indexWriter = new RandomIndexWriter(random(), directory)) { + Document document = new Document(); + document.add(new SortedNumericDocValuesField("numeric", 0)); + indexWriter.addDocument(document); + } + + try (IndexReader indexReader = DirectoryReader.open(directory)) { + IndexSearcher indexSearcher = new IndexSearcher(indexReader); + + SearchContext searchContext = createSearchContext( + indexSearcher, + createIndexSettings(), + null, + new MultiBucketConsumerService.MultiBucketConsumer( + DEFAULT_MAX_BUCKETS, + new NoneCircuitBreakerService().getBreaker(CircuitBreaker.REQUEST) + ), + new NumberFieldMapper.NumberFieldType("test", NumberFieldMapper.NumberType.INTEGER) + ); + + return new BucketsAggregator("test", AggregatorFactories.EMPTY, searchContext, null, null, null) { + @Override + protected LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucketCollector sub) throws IOException { + return null; + } + + @Override + public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException { + return new InternalAggregation[0]; + } + + @Override + public InternalAggregation buildEmptyAggregation() { + return null; + } + }; + } + } + } + + public void testBucketMergeNoDelete() throws IOException{ + BucketsAggregator mergeAggregator = buildMergeAggregator(); + + mergeAggregator.grow(10); + for(int i = 0; i < 10; i++){ + mergeAggregator.incrementBucketDocCount(i, i); + } + + mergeAggregator.mergeBuckets(10, bucket -> bucket % 5); + + for(int i=0; i<5; i++) { + // The i'th bucket should now have all docs whose index % 5 = i + // This is buckets i and i + 5 + // i + (i+5) = 2*i + 5 + assertEquals(mergeAggregator.getDocCounts().get(i), (2 * i) + 5); + } + for(int i=5; i<10; i++){ + assertEquals(mergeAggregator.getDocCounts().get(i), 0); + } + } + + public void testBucketMergeAndDelete() throws IOException{ + BucketsAggregator mergeAggregator = buildMergeAggregator(); + + mergeAggregator.grow(10); + int sum = 0; + for(int i = 0; i < 20; i++){ + mergeAggregator.incrementBucketDocCount(i, i); + if(5 <= i && i < 15) { + sum += i; + } + } + + // Put the buckets in indices 5 ... 14 into bucket 5, and delete the rest of the buckets + mergeAggregator.mergeBuckets(10, bucket -> (5 <= bucket && bucket < 15) ? 5 : -1); + + assertEquals(mergeAggregator.getDocCounts().size(), 10); // Confirm that the 10 other buckets were deleted + for(int i=0; i<10; i++){ + assertEquals(mergeAggregator.getDocCounts().get(i), i == 5 ? sum : 0); + } + } +} diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/MergingBucketsDeferringCollectorTests.java b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/MergingBucketsDeferringCollectorTests.java new file mode 100644 index 00000000000..6a0be50f8ce --- /dev/null +++ b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/MergingBucketsDeferringCollectorTests.java @@ -0,0 +1,205 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.search.aggregations.bucket; + +import org.apache.lucene.document.Document; +import org.apache.lucene.index.DirectoryReader; +import org.apache.lucene.index.IndexReader; +import org.apache.lucene.index.IndexWriter; +import org.apache.lucene.index.IndexWriterConfig; +import org.apache.lucene.index.LeafReaderContext; +import org.apache.lucene.search.IndexSearcher; +import org.apache.lucene.search.MatchAllDocsQuery; +import org.apache.lucene.search.Query; +import org.apache.lucene.search.ScoreMode; +import org.apache.lucene.store.Directory; +import org.elasticsearch.common.CheckedBiConsumer; +import org.elasticsearch.search.aggregations.AggregatorTestCase; +import org.elasticsearch.search.aggregations.BucketCollector; +import org.elasticsearch.search.aggregations.LeafBucketCollector; +import org.elasticsearch.search.internal.SearchContext; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.function.BiFunction; + +import static org.hamcrest.Matchers.equalTo; +import static org.mockito.Mockito.when; + +public class MergingBucketsDeferringCollectorTests extends AggregatorTestCase { + public void testBucketMergeNoDelete() throws Exception { + testCase((deferringCollector, delegate) -> new LeafBucketCollector() { + @Override + public void collect(int doc, long owningBucketOrd) throws IOException { + assert owningBucketOrd == 0; // Only collects at top level + delegate.collect(doc, doc); + if (doc == 7) { + deferringCollector.mergeBuckets(oldBucket -> 0); + } + } + }, (deferringCollector, finalCollector) -> { + deferringCollector.prepareSelectedBuckets(0, 8, 9); + + equalTo( + org.elasticsearch.common.collect.Map.of( + 0L, + org.elasticsearch.common.collect.List.of(0, 1, 2, 3, 4, 5, 6, 7), + 1L, + org.elasticsearch.common.collect.List.of(8), + 2L, + org.elasticsearch.common.collect.List.of(9) + ) + ); + }); + } + + public void testBucketMergeAndDelete() throws Exception { + testCase((deferringCollector, delegate) -> new LeafBucketCollector() { + @Override + public void collect(int doc, long owningBucketOrd) throws IOException { + assert owningBucketOrd == 0; // Only collects at top level + delegate.collect(doc, doc); + if (doc == 7) { + deferringCollector.mergeBuckets(oldBucket -> oldBucket > 3 ? 0 : -1); + } + } + }, (deferringCollector, finalCollector) -> { + deferringCollector.prepareSelectedBuckets(0, 8, 9); + + assertThat( + finalCollector.collection, + equalTo( + org.elasticsearch.common.collect.Map.of( + 0L, + org.elasticsearch.common.collect.List.of(4, 5, 6, 7), + 1L, + org.elasticsearch.common.collect.List.of(8), + 2L, + org.elasticsearch.common.collect.List.of(9) + ) + ) + ); + }); + } + + @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/60021") + public void testBucketMergeAndDeleteLastEntry() throws Exception { + testCase((deferringCollector, delegate) -> new LeafBucketCollector() { + @Override + public void collect(int doc, long owningBucketOrd) throws IOException { + assert owningBucketOrd == 0; // Only collects at top level + delegate.collect(doc, doc); + if (doc == 7) { + deferringCollector.mergeBuckets(oldBucket -> oldBucket <= 3 ? 0 : -1); + } + } + }, (deferringCollector, finalCollector) -> { + deferringCollector.prepareSelectedBuckets(0, 8, 9); + + assertThat( + finalCollector.collection, + equalTo( + org.elasticsearch.common.collect.Map.of( + 0L, + org.elasticsearch.common.collect.List.of(0, 1, 2, 3), + 1L, + org.elasticsearch.common.collect.List.of(8), + 2L, + org.elasticsearch.common.collect.List.of(9) + ) + ) + ); + }); + } + + private void testCase( + BiFunction leafCollector, + CheckedBiConsumer verify + ) throws IOException { + try (Directory directory = newDirectory()) { + try (IndexWriter indexWriter = new IndexWriter(directory, new IndexWriterConfig())) { + for (int i = 0; i < 10; i++) { + indexWriter.addDocument(new Document()); + } + } + + try (IndexReader indexReader = DirectoryReader.open(directory)) { + IndexSearcher indexSearcher = new IndexSearcher(indexReader); + + Query query = new MatchAllDocsQuery(); + SearchContext searchContext = createSearchContext(indexSearcher, createIndexSettings(), query, null); + when(searchContext.query()).thenReturn(query); + MergingBucketsDeferringCollector deferringCollector = new MergingBucketsDeferringCollector(searchContext, false); + + CollectingBucketCollector finalCollector = new CollectingBucketCollector(); + deferringCollector.setDeferredCollector(Collections.singleton(finalCollector)); + deferringCollector.preCollection(); + indexSearcher.search(query, new BucketCollector() { + @Override + public ScoreMode scoreMode() { + return ScoreMode.COMPLETE_NO_SCORES; + } + + @Override + public void preCollection() throws IOException {} + + @Override + public void postCollection() throws IOException {} + + @Override + public LeafBucketCollector getLeafCollector(LeafReaderContext ctx) throws IOException { + LeafBucketCollector delegate = deferringCollector.getLeafCollector(ctx); + return leafCollector.apply(deferringCollector, delegate); + } + }); + deferringCollector.postCollection(); + verify.accept(deferringCollector, finalCollector); + } + } + } + + private class CollectingBucketCollector extends BucketCollector { + final Map> collection = new HashMap<>(); + + @Override + public ScoreMode scoreMode() { + return ScoreMode.COMPLETE_NO_SCORES; + } + + @Override + public LeafBucketCollector getLeafCollector(LeafReaderContext ctx) throws IOException { + return new LeafBucketCollector() { + @Override + public void collect(int doc, long owningBucketOrd) throws IOException { + collection.computeIfAbsent(owningBucketOrd, k -> new ArrayList<>()).add(doc); + } + }; + } + + @Override + public void preCollection() throws IOException {} + + @Override + public void postCollection() throws IOException {} + } +}