Reduce merge map memory overhead in the Variable Width Histogram Aggregation (#59366) (#60171)

When a document that is distant from the existing buckets is collected, the
`variable_width_histogram` creates a new bucket and then inserts it
into the ordered list of buckets.

Currently, a new merge map array is created to move this bucket. This is
very expensive as there might be thousands of buckets.

This PR adds `mergeBuckets(LongUnaryOperator mergeMap)` methods to
`BucketsAggregator` and `MergingBucketsDeferringCollector`, and updates
the `variable_width_histogram` to use them. This eliminates the need to
allocate an entire merge map array for each new bucket and reduces the
memory overhead of the algorithm.
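
For illustration only (a hedged sketch, not part of the commit), the caller-side difference looks roughly like this; `numClusters` and `index` are the locals used in the `VariableWidthHistogramAggregator` hunk further down (the bucket count and the sorted position of the new bucket):

// Before: allocate a merge map array sized to the bucket count on every insert
long[] mergeMap = new long[numClusters];
for (int i = 0; i < index; i++) {
    mergeMap[i] = i; // buckets below the insertion point stay put
}
for (int i = index; i < numClusters - 1; i++) {
    mergeMap[i] = i + 1; // buckets at or above it shift forward by one
}
mergeMap[numClusters - 1] = index; // the new bucket moves into its sorted position
mergeBuckets(mergeMap, numClusters);

// After: describe the same mapping as a LongUnaryOperator, with no per-insert allocation
mergeBuckets(numClusters, i -> {
    if (i < index) {
        return i; // buckets below the insertion point stay put
    }
    if (i == numClusters - 1) {
        return index; // the new bucket moves into its sorted position
    }
    return i + 1; // buckets at or above the insertion point shift forward by one
});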

Co-authored-by: James Dorfman <jamesdorfman@users.noreply.github.com>
Nik Everett 2020-07-27 09:23:06 -04:00 committed by GitHub
parent 95c99ca887
commit a451dd87aa
5 changed files with 394 additions and 20 deletions

BucketsAggregator.java

@@ -45,6 +45,7 @@ import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.IntConsumer;
import java.util.function.LongUnaryOperator;
import java.util.function.ToLongFunction;
public abstract class BucketsAggregator extends AggregatorBase {
@@ -105,17 +106,35 @@ public abstract class BucketsAggregator extends AggregatorBase {
* ordinals and doc ID deltas.
*
* Refer to that method for documentation about the merge map.
*
* @deprecated use {@link #mergeBuckets(long, LongUnaryOperator)}
*/
@Deprecated
public final void mergeBuckets(long[] mergeMap, long newNumBuckets) {
mergeBuckets(newNumBuckets, bucket -> mergeMap[Math.toIntExact(bucket)]);
}
/**
*
* @param mergeMap a unary operator which maps a bucket's ordinal to the ordinal it should be merged with.
* If a bucket's ordinal is mapped to -1 then the bucket is removed entirely.
*
* This only tidies up doc counts. Call {@link MergingBucketsDeferringCollector#mergeBuckets(LongUnaryOperator)} to
* merge the actual ordinals and doc ID deltas.
*/
public final void mergeBuckets(long newNumBuckets, LongUnaryOperator mergeMap){
try (IntArray oldDocCounts = docCounts) {
docCounts = bigArrays.newIntArray(newNumBuckets, true);
docCounts.fill(0, newNumBuckets, 0);
for (int i = 0; i < oldDocCounts.size(); i++) {
for (long i = 0; i < oldDocCounts.size(); i++) {
int docCount = oldDocCounts.get(i);
if(docCount == 0) continue;
// Skip any in the map which have been "removed", signified with -1
if (docCount != 0 && mergeMap[i] != -1) {
docCounts.increment(mergeMap[i], docCount);
long destinationOrdinal = mergeMap.applyAsLong(i);
if (destinationOrdinal != -1) {
docCounts.increment(destinationOrdinal, docCount);
}
}
}
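To make the -1 convention concrete: mapping a bucket's ordinal to -1 drops that bucket instead of merging it, so a caller can collapse a range of buckets and discard the rest in one pass. A sketch of such a call, mirroring the new `BucketsAggregatorTests` below (`aggregator` here stands for any `BucketsAggregator` subclass):

// Collapse buckets 5..14 into bucket 5; every other bucket maps to -1 and is removed
aggregator.mergeBuckets(10, bucket -> (5 <= bucket && bucket < 15) ? 5 : -1);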

MergingBucketsDeferringCollector.java

@@ -25,6 +25,7 @@ import org.elasticsearch.search.internal.SearchContext;
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongUnaryOperator;
/**
* A specialization of {@link BestBucketsDeferringCollector} that collects all
@@ -51,8 +52,24 @@ public class MergingBucketsDeferringCollector extends BestBucketsDeferringCollec
*
* This process rebuilds the ordinals and docDeltas according to the mergeMap, so it should
* not be called unless there are actually changes to be made, to avoid unnecessary work.
*
* @deprecated use {@link #mergeBuckets(LongUnaryOperator)}
*/
@Deprecated
public void mergeBuckets(long[] mergeMap) {
mergeBuckets(bucket -> mergeMap[Math.toIntExact(bucket)]);
}
/**
* Merges/prunes the existing bucket ordinals and docDeltas according to the provided mergeMap.
*
* @param mergeMap a unary operator which maps a bucket's ordinal to the ordinal it should be merged with.
* If a bucket's ordinal is mapped to -1 then the bucket is removed entirely.
*
* This process rebuilds the ordinals and docDeltas according to the mergeMap, so it should
* not be called unless there are actually changes to be made, to avoid unnecessary work.
*/
public void mergeBuckets(LongUnaryOperator mergeMap){
List<Entry> newEntries = new ArrayList<>(entries.size());
for (Entry sourceEntry : entries) {
PackedLongValues.Builder newBuckets = PackedLongValues.packedBuilder(PackedInts.DEFAULT);
@@ -66,7 +83,7 @@ public class MergingBucketsDeferringCollec
long delta = docDeltasItr.next();
// Only merge in the ordinal if it hasn't been "removed", signified with -1
long ordinal = mergeMap[Math.toIntExact(bucket)];
long ordinal = mergeMap.applyAsLong(bucket);
if (ordinal != -1) {
newBuckets.add(ordinal);
@@ -102,7 +119,7 @@ public class MergingBucketsDeferringCollec
long bucket = itr.next();
assert docDeltasItr.hasNext();
long delta = docDeltasItr.next();
long ordinal = mergeMap[Math.toIntExact(bucket)];
long ordinal = mergeMap.applyAsLong(bucket);
// Only merge in the ordinal if it hasn't been "removed", signified with -1
if (ordinal != -1) {

VariableWidthHistogramAggregator.java

@@ -51,6 +51,7 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.function.LongUnaryOperator;
public class VariableWidthHistogramAggregator extends DeferableBucketAggregator {
@@ -353,22 +354,23 @@ public class VariableWidthHistogramAggregator extends DeferableBucketAggregator
clusterSizes.set(index, holdSize);
// Move the underlying buckets
long[] mergeMap = new long[numClusters];
for (int i = 0; i < index; i++) {
// The clusters in range {0 ... idx - 1} don't move
mergeMap[i] = i;
}
for (int i = index; i < numClusters - 1; i++) {
// The clusters in range {index ... numClusters - 1} shift up
mergeMap[i] = i + 1;
}
// Finally, the new cluster moves to index
mergeMap[numClusters - 1] = index;
LongUnaryOperator mergeMap = new LongUnaryOperator() {
@Override
public long applyAsLong(long i) {
if(i < index) {
// The clusters in range {0 ... idx - 1} don't move
return i;
}
if(i == numClusters - 1) {
// The new cluster moves to index
return (long)index;
}
// The clusters in range {index ... numClusters - 1} shift forward
return i + 1;
}
};
// TODO: Create a moveLastCluster() method in BucketsAggregator which is like BucketsAggregator::mergeBuckets,
// except it doesn't require a merge map. This would be more efficient as there would be no need to create a
// merge map on every call.
mergeBuckets(mergeMap, numClusters);
mergeBuckets(numClusters, mergeMap);
if (deferringCollector != null) {
deferringCollector.mergeBuckets(mergeMap);
}

BucketsAggregatorTests.java

@@ -0,0 +1,131 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.aggregations.bucket;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.SortedNumericDocValuesField;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.RandomIndexWriter;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.store.Directory;
import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.index.mapper.NumberFieldMapper;
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.aggregations.AggregatorTestCase;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.LeafBucketCollector;
import org.elasticsearch.search.aggregations.MultiBucketConsumerService;
import org.elasticsearch.search.aggregations.bucket.BucketsAggregator;
import org.elasticsearch.search.internal.SearchContext;
import java.io.IOException;
import static org.elasticsearch.search.aggregations.MultiBucketConsumerService.DEFAULT_MAX_BUCKETS;
public class BucketsAggregatorTests extends AggregatorTestCase{
public BucketsAggregator buildMergeAggregator() throws IOException{
try(Directory directory = newDirectory()) {
try (RandomIndexWriter indexWriter = new RandomIndexWriter(random(), directory)) {
Document document = new Document();
document.add(new SortedNumericDocValuesField("numeric", 0));
indexWriter.addDocument(document);
}
try (IndexReader indexReader = DirectoryReader.open(directory)) {
IndexSearcher indexSearcher = new IndexSearcher(indexReader);
SearchContext searchContext = createSearchContext(
indexSearcher,
createIndexSettings(),
null,
new MultiBucketConsumerService.MultiBucketConsumer(
DEFAULT_MAX_BUCKETS,
new NoneCircuitBreakerService().getBreaker(CircuitBreaker.REQUEST)
),
new NumberFieldMapper.NumberFieldType("test", NumberFieldMapper.NumberType.INTEGER)
);
return new BucketsAggregator("test", AggregatorFactories.EMPTY, searchContext, null, null, null) {
@Override
protected LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucketCollector sub) throws IOException {
return null;
}
@Override
public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
return new InternalAggregation[0];
}
@Override
public InternalAggregation buildEmptyAggregation() {
return null;
}
};
}
}
}
public void testBucketMergeNoDelete() throws IOException{
BucketsAggregator mergeAggregator = buildMergeAggregator();
mergeAggregator.grow(10);
for(int i = 0; i < 10; i++){
mergeAggregator.incrementBucketDocCount(i, i);
}
mergeAggregator.mergeBuckets(10, bucket -> bucket % 5);
for(int i=0; i<5; i++) {
// The i'th bucket should now have all docs whose index % 5 = i
// This is buckets i and i + 5
// i + (i+5) = 2*i + 5
assertEquals(mergeAggregator.getDocCounts().get(i), (2 * i) + 5);
}
for(int i=5; i<10; i++){
assertEquals(mergeAggregator.getDocCounts().get(i), 0);
}
}
public void testBucketMergeAndDelete() throws IOException{
BucketsAggregator mergeAggregator = buildMergeAggregator();
mergeAggregator.grow(10);
int sum = 0;
for(int i = 0; i < 20; i++){
mergeAggregator.incrementBucketDocCount(i, i);
if(5 <= i && i < 15) {
sum += i;
}
}
// Put the buckets in indices 5 ... 14 into bucket 5, and delete the rest of the buckets
mergeAggregator.mergeBuckets(10, bucket -> (5 <= bucket && bucket < 15) ? 5 : -1);
assertEquals(mergeAggregator.getDocCounts().size(), 10); // Confirm that the 10 other buckets were deleted
for(int i=0; i<10; i++){
assertEquals(mergeAggregator.getDocCounts().get(i), i == 5 ? sum : 0);
}
}
}

MergingBucketsDeferringCollectorTests.java

@@ -0,0 +1,205 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.aggregations.bucket;
import org.apache.lucene.document.Document;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.ScoreMode;
import org.apache.lucene.store.Directory;
import org.elasticsearch.common.CheckedBiConsumer;
import org.elasticsearch.search.aggregations.AggregatorTestCase;
import org.elasticsearch.search.aggregations.BucketCollector;
import org.elasticsearch.search.aggregations.LeafBucketCollector;
import org.elasticsearch.search.internal.SearchContext;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;
import static org.hamcrest.Matchers.equalTo;
import static org.mockito.Mockito.when;
public class MergingBucketsDeferringCollectorTests extends AggregatorTestCase {
public void testBucketMergeNoDelete() throws Exception {
testCase((deferringCollector, delegate) -> new LeafBucketCollector() {
@Override
public void collect(int doc, long owningBucketOrd) throws IOException {
assert owningBucketOrd == 0; // Only collects at top level
delegate.collect(doc, doc);
if (doc == 7) {
deferringCollector.mergeBuckets(oldBucket -> 0);
}
}
}, (deferringCollector, finalCollector) -> {
deferringCollector.prepareSelectedBuckets(0, 8, 9);
assertThat(
finalCollector.collection,
equalTo(
org.elasticsearch.common.collect.Map.of(
0L,
org.elasticsearch.common.collect.List.of(0, 1, 2, 3, 4, 5, 6, 7),
1L,
org.elasticsearch.common.collect.List.of(8),
2L,
org.elasticsearch.common.collect.List.of(9)
)
)
);
});
}
public void testBucketMergeAndDelete() throws Exception {
testCase((deferringCollector, delegate) -> new LeafBucketCollector() {
@Override
public void collect(int doc, long owningBucketOrd) throws IOException {
assert owningBucketOrd == 0; // Only collects at top level
delegate.collect(doc, doc);
if (doc == 7) {
deferringCollector.mergeBuckets(oldBucket -> oldBucket > 3 ? 0 : -1);
}
}
}, (deferringCollector, finalCollector) -> {
deferringCollector.prepareSelectedBuckets(0, 8, 9);
assertThat(
finalCollector.collection,
equalTo(
org.elasticsearch.common.collect.Map.of(
0L,
org.elasticsearch.common.collect.List.of(4, 5, 6, 7),
1L,
org.elasticsearch.common.collect.List.of(8),
2L,
org.elasticsearch.common.collect.List.of(9)
)
)
);
});
}
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/60021")
public void testBucketMergeAndDeleteLastEntry() throws Exception {
testCase((deferringCollector, delegate) -> new LeafBucketCollector() {
@Override
public void collect(int doc, long owningBucketOrd) throws IOException {
assert owningBucketOrd == 0; // Only collects at top level
delegate.collect(doc, doc);
if (doc == 7) {
deferringCollector.mergeBuckets(oldBucket -> oldBucket <= 3 ? 0 : -1);
}
}
}, (deferringCollector, finalCollector) -> {
deferringCollector.prepareSelectedBuckets(0, 8, 9);
assertThat(
finalCollector.collection,
equalTo(
org.elasticsearch.common.collect.Map.of(
0L,
org.elasticsearch.common.collect.List.of(0, 1, 2, 3),
1L,
org.elasticsearch.common.collect.List.of(8),
2L,
org.elasticsearch.common.collect.List.of(9)
)
)
);
});
}
private void testCase(
BiFunction<MergingBucketsDeferringCollector, LeafBucketCollector, LeafBucketCollector> leafCollector,
CheckedBiConsumer<MergingBucketsDeferringCollector, CollectingBucketCollector, IOException> verify
) throws IOException {
try (Directory directory = newDirectory()) {
try (IndexWriter indexWriter = new IndexWriter(directory, new IndexWriterConfig())) {
for (int i = 0; i < 10; i++) {
indexWriter.addDocument(new Document());
}
}
try (IndexReader indexReader = DirectoryReader.open(directory)) {
IndexSearcher indexSearcher = new IndexSearcher(indexReader);
Query query = new MatchAllDocsQuery();
SearchContext searchContext = createSearchContext(indexSearcher, createIndexSettings(), query, null);
when(searchContext.query()).thenReturn(query);
MergingBucketsDeferringCollector deferringCollector = new MergingBucketsDeferringCollector(searchContext, false);
CollectingBucketCollector finalCollector = new CollectingBucketCollector();
deferringCollector.setDeferredCollector(Collections.singleton(finalCollector));
deferringCollector.preCollection();
indexSearcher.search(query, new BucketCollector() {
@Override
public ScoreMode scoreMode() {
return ScoreMode.COMPLETE_NO_SCORES;
}
@Override
public void preCollection() throws IOException {}
@Override
public void postCollection() throws IOException {}
@Override
public LeafBucketCollector getLeafCollector(LeafReaderContext ctx) throws IOException {
LeafBucketCollector delegate = deferringCollector.getLeafCollector(ctx);
return leafCollector.apply(deferringCollector, delegate);
}
});
deferringCollector.postCollection();
verify.accept(deferringCollector, finalCollector);
}
}
}
private class CollectingBucketCollector extends BucketCollector {
final Map<Long, List<Integer>> collection = new HashMap<>();
@Override
public ScoreMode scoreMode() {
return ScoreMode.COMPLETE_NO_SCORES;
}
@Override
public LeafBucketCollector getLeafCollector(LeafReaderContext ctx) throws IOException {
return new LeafBucketCollector() {
@Override
public void collect(int doc, long owningBucketOrd) throws IOException {
collection.computeIfAbsent(owningBucketOrd, k -> new ArrayList<>()).add(doc);
}
};
}
@Override
public void preCollection() throws IOException {}
@Override
public void postCollection() throws IOException {}
}
}