Allocate memory lazily in BestBucketsDeferringCollector (#43339)

While investigating the memory consumption of deeply nested aggregations for #43091,
I found that the memory used to keep track of the doc ids and buckets in the
BestBucketsDeferringCollector was one of the main contributors. In my tests, half of the
memory held in the BestBucketsDeferringCollector is associated with segments that have no
matching docs in the selected buckets. This is expected on high-cardinality fields, since
each bucket can appear in very few segments. By allocating the builders lazily, this change
reduces the memory consumption by a factor of 2 (from 1GB to 512MB), hence reducing the
GC impact of these short-lived allocations. This commit also replaces the PackedLongValues.Builder
with a RoaringDocIdSet in order to handle very sparse buckets more efficiently.
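
To make the idea concrete, here is a minimal, self-contained sketch of the lazy-allocation pattern: the per-segment buffers are only created once the segment actually collects a document. The class and method names (`LazySegmentBuffer`, `collect`, `hasDocs`) are illustrative only, not the real Elasticsearch types used in the diff below.

````java
import java.util.ArrayList;
import java.util.List;

/**
 * Sketch of the lazy-allocation idea: a per-segment buffer only allocates its
 * backing lists when the segment collects its first document, so segments with
 * no matching docs cost nothing. Illustrative names, not the Elasticsearch API.
 */
public class LazySegmentBuffer {
    private List<Integer> docDeltas; // allocated lazily on first collect
    private List<Long> buckets;      // allocated lazily on first collect
    private int lastDoc = 0;

    public void collect(int doc, long bucket) {
        if (docDeltas == null) {
            // First matching document in this segment: allocate now.
            docDeltas = new ArrayList<>();
            buckets = new ArrayList<>();
        }
        docDeltas.add(doc - lastDoc); // store deltas, they compress better
        buckets.add(bucket);
        lastDoc = doc;
    }

    /** True if this segment collected at least one document. */
    public boolean hasDocs() {
        return docDeltas != null;
    }

    public static void main(String[] args) {
        LazySegmentBuffer emptySegment = new LazySegmentBuffer();
        LazySegmentBuffer segment = new LazySegmentBuffer();
        segment.collect(3, 0L);
        segment.collect(7, 2L);
        // Only the segment that actually matched documents allocated buffers.
        System.out.println(emptySegment.hasDocs() + " " + segment.hasDocs());
    }
}
````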

I ran all my tests on the `geonames` Rally track with the following query:

````json
{
    "size": 0,
    "aggs": {
        "country_population": {
            "terms": {
                "size": 100,
                "field": "country_code.raw"
            },
            "aggs": {
                "admin1_code": {
                    "terms": {
                        "size": 100,
                        "field": "admin1_code.raw"
                    },
                    "aggs": {
                        "admin2_code": {
                            "terms": {
                                "size": 100,
                                "field": "admin2_code.raw"
                            },
                            "aggs": {
                                "sum_population": {
                                    "sum": {
                                        "field": "population"
                                    }
                                }
                            }
                        }
                    }
                }
            }
        }
    }
}
````
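
The commit message also mentions RoaringDocIdSet. For background, here is a minimal sketch of how Lucene's RoaringDocIdSet is built from increasing doc ids and then iterated; it assumes only a Lucene dependency and is illustrative, not code taken from this patch.

````java
import java.io.IOException;

import org.apache.lucene.search.DocIdSetIterator;
import org.apache.lucene.util.RoaringDocIdSet;

/**
 * Background sketch (not code from this patch): Lucene's RoaringDocIdSet is
 * built from strictly increasing doc ids and stores sparse doc id sets compactly.
 */
public class RoaringSketch {
    public static void main(String[] args) throws IOException {
        int maxDoc = 1_000_000;
        RoaringDocIdSet docIdSet = new RoaringDocIdSet.Builder(maxDoc)
                .add(3)          // doc ids must be added in increasing order
                .add(4_096)
                .add(999_999)
                .build();

        DocIdSetIterator it = docIdSet.iterator();
        for (int doc = it.nextDoc(); doc != DocIdSetIterator.NO_MORE_DOCS; doc = it.nextDoc()) {
            System.out.println("matching doc id: " + doc);
        }
    }
}
````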
Author: Jim Ferenczi, 2019-06-19 21:25:21 +02:00 (committed by jimczi)
Parent: d1637ca476
Commit: b957aa46ce
3 changed files with 45 additions and 215 deletions

BestBucketsDeferringCollector.java

@@ -40,6 +40,7 @@ import org.elasticsearch.search.internal.SearchContext;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
/**
* A specialization of {@link DeferringBucketCollector} that collects all
@@ -48,28 +49,28 @@ import java.util.List;
* this collector.
*/
public class BestBucketsDeferringCollector extends DeferringBucketCollector {
private static class Entry {
static class Entry {
final LeafReaderContext context;
final PackedLongValues docDeltas;
final PackedLongValues buckets;
Entry(LeafReaderContext context, PackedLongValues docDeltas, PackedLongValues buckets) {
this.context = context;
this.docDeltas = docDeltas;
this.buckets = buckets;
this.context = Objects.requireNonNull(context);
this.docDeltas = Objects.requireNonNull(docDeltas);
this.buckets = Objects.requireNonNull(buckets);
}
}
final List<Entry> entries = new ArrayList<>();
BucketCollector collector;
final SearchContext searchContext;
final boolean isGlobal;
LeafReaderContext context;
PackedLongValues.Builder docDeltas;
PackedLongValues.Builder buckets;
long maxBucket = -1;
boolean finished = false;
LongHash selectedBuckets;
protected List<Entry> entries = new ArrayList<>();
protected BucketCollector collector;
protected final SearchContext searchContext;
protected final boolean isGlobal;
protected LeafReaderContext context;
protected PackedLongValues.Builder docDeltasBuilder;
protected PackedLongValues.Builder bucketsBuilder;
protected long maxBucket = -1;
protected boolean finished = false;
protected LongHash selectedBuckets;
/**
* Sole constructor.
@@ -97,28 +98,32 @@ public class BestBucketsDeferringCollector extends DeferringBucketCollector {
private void finishLeaf() {
if (context != null) {
entries.add(new Entry(context, docDeltas.build(), buckets.build()));
assert docDeltasBuilder != null && bucketsBuilder != null;
entries.add(new Entry(context, docDeltasBuilder.build(), bucketsBuilder.build()));
}
context = null;
docDeltas = null;
buckets = null;
}
@Override
public LeafBucketCollector getLeafCollector(LeafReaderContext ctx) throws IOException {
finishLeaf();
context = ctx;
docDeltas = PackedLongValues.packedBuilder(PackedInts.DEFAULT);
buckets = PackedLongValues.packedBuilder(PackedInts.DEFAULT);
context = null;
// allocates the builder lazily in case this segment doesn't contain any match
docDeltasBuilder = null;
bucketsBuilder = null;
return new LeafBucketCollector() {
int lastDoc = 0;
@Override
public void collect(int doc, long bucket) throws IOException {
docDeltas.add(doc - lastDoc);
buckets.add(bucket);
if (context == null) {
context = ctx;
docDeltasBuilder = PackedLongValues.packedBuilder(PackedInts.DEFAULT);
bucketsBuilder = PackedLongValues.packedBuilder(PackedInts.DEFAULT);
}
docDeltasBuilder.add(doc - lastDoc);
bucketsBuilder.add(bucket);
lastDoc = doc;
maxBucket = Math.max(maxBucket, bucket);
}
@@ -141,7 +146,7 @@ public class BestBucketsDeferringCollector extends DeferringBucketCollector {
*/
@Override
public void prepareSelectedBuckets(long... selectedBuckets) throws IOException {
if (!finished) {
if (finished == false) {
throw new IllegalStateException("Cannot replay yet, collection is not finished: postCollect() has not been called");
}
if (this.selectedBuckets != null) {
@@ -160,14 +165,16 @@ public class BestBucketsDeferringCollector extends DeferringBucketCollector {
Query query = isGlobal ? new MatchAllDocsQuery() : searchContext.query();
weight = searchContext.searcher().createWeight(searchContext.searcher().rewrite(query), ScoreMode.COMPLETE, 1f);
}
for (Entry entry : entries) {
assert entry.docDeltas.size() > 0 : "segment should have at least one document to replay, got 0";
final LeafBucketCollector leafCollector = collector.getLeafCollector(entry.context);
DocIdSetIterator docIt = null;
if (needsScores && entry.docDeltas.size() > 0) {
DocIdSetIterator scoreIt = null;
if (needsScores) {
Scorer scorer = weight.scorer(entry.context);
// We don't need to check if the scorer is null
// since we are sure that there are documents to replay (entry.docDeltas it not empty).
docIt = scorer.iterator();
scoreIt = scorer.iterator();
leafCollector.setScorer(scorer);
}
final PackedLongValues.Iterator docDeltaIterator = entry.docDeltas.iterator();
@@ -179,17 +186,16 @@ public class BestBucketsDeferringCollector extends DeferringBucketCollector {
final long rebasedBucket = hash.find(bucket);
if (rebasedBucket != -1) {
if (needsScores) {
if (docIt.docID() < doc) {
docIt.advance(doc);
if (scoreIt.docID() < doc) {
scoreIt.advance(doc);
}
// aggregations should only be replayed on matching documents
assert docIt.docID() == doc;
assert scoreIt.docID() == doc;
}
leafCollector.collect(doc, rebasedBucket);
}
}
}
collector.postCollection();
}

MergingBucketsDeferringCollector.java

@@ -19,98 +19,25 @@
package org.elasticsearch.search.aggregations.bucket;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.search.DocIdSetIterator;
import org.apache.lucene.search.ScoreMode;
import org.apache.lucene.search.Scorer;
import org.apache.lucene.search.Weight;
import org.apache.lucene.util.packed.PackedInts;
import org.apache.lucene.util.packed.PackedLongValues;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.LongHash;
import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.BucketCollector;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.LeafBucketCollector;
import org.elasticsearch.search.aggregations.MultiBucketCollector;
import org.elasticsearch.search.internal.SearchContext;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
/**
* A specialization of {@link DeferringBucketCollector} that collects all
* A specialization of {@link BestBucketsDeferringCollector} that collects all
* matches and then is able to replay a given subset of buckets. Exposes
* mergeBuckets, which can be invoked by the aggregator when increasing the
* rounding interval.
*/
public class MergingBucketsDeferringCollector extends DeferringBucketCollector {
List<Entry> entries = new ArrayList<>();
BucketCollector collector;
final SearchContext searchContext;
LeafReaderContext context;
PackedLongValues.Builder docDeltas;
PackedLongValues.Builder buckets;
long maxBucket = -1;
boolean finished = false;
LongHash selectedBuckets;
public MergingBucketsDeferringCollector(SearchContext context) {
this.searchContext = context;
}
@Override
public void setDeferredCollector(Iterable<BucketCollector> deferredCollectors) {
this.collector = MultiBucketCollector.wrap(deferredCollectors);
}
@Override
public ScoreMode scoreMode() {
if (collector == null) {
throw new IllegalStateException();
}
return collector.scoreMode();
}
@Override
public void preCollection() throws IOException {
collector.preCollection();
}
private void finishLeaf() {
if (context != null) {
entries.add(new Entry(context, docDeltas.build(), buckets.build()));
}
context = null;
docDeltas = null;
buckets = null;
}
@Override
public LeafBucketCollector getLeafCollector(LeafReaderContext ctx) throws IOException {
finishLeaf();
context = ctx;
docDeltas = PackedLongValues.packedBuilder(PackedInts.DEFAULT);
buckets = PackedLongValues.packedBuilder(PackedInts.DEFAULT);
return new LeafBucketCollector() {
int lastDoc = 0;
@Override
public void collect(int doc, long bucket) {
docDeltas.add(doc - lastDoc);
buckets.add(bucket);
lastDoc = doc;
maxBucket = Math.max(maxBucket, bucket);
}
};
public class MergingBucketsDeferringCollector extends BestBucketsDeferringCollector {
public MergingBucketsDeferringCollector(SearchContext context, boolean isGlobal) {
super(context, isGlobal);
}
public void mergeBuckets(long[] mergeMap) {
List<Entry> newEntries = new ArrayList<>(entries.size());
for (Entry sourceEntry : entries) {
PackedLongValues.Builder newBuckets = PackedLongValues.packedBuilder(PackedInts.DEFAULT);
@@ -124,117 +51,14 @@ public class MergingBucketsDeferringCollector extends DeferringBucketCollector {
// if there are buckets that have been collected in the current segment
// we need to update the bucket ordinals there too
if (buckets.size() > 0) {
PackedLongValues currentBuckets = buckets.build();
if (bucketsBuilder.size() > 0) {
PackedLongValues currentBuckets = bucketsBuilder.build();
PackedLongValues.Builder newBuckets = PackedLongValues.packedBuilder(PackedInts.DEFAULT);
for (PackedLongValues.Iterator itr = currentBuckets.iterator(); itr.hasNext();) {
long bucket = itr.next();
newBuckets.add(mergeMap[Math.toIntExact(bucket)]);
}
buckets = newBuckets;
bucketsBuilder = newBuckets;
}
}
@Override
public void postCollection() {
finishLeaf();
finished = true;
}
/**
* Replay the wrapped collector, but only on a selection of buckets.
*/
@Override
public void prepareSelectedBuckets(long... selectedBuckets) throws IOException {
if (finished == false) {
throw new IllegalStateException("Cannot replay yet, collection is not finished: postCollect() has not been called");
}
if (this.selectedBuckets != null) {
throw new IllegalStateException("Already been replayed");
}
final LongHash hash = new LongHash(selectedBuckets.length, BigArrays.NON_RECYCLING_INSTANCE);
for (long bucket : selectedBuckets) {
hash.add(bucket);
}
this.selectedBuckets = hash;
boolean needsScores = collector.scoreMode().needsScores();
Weight weight = null;
if (needsScores) {
weight = searchContext.searcher().createWeight(
searchContext.searcher().rewrite(searchContext.query()),
ScoreMode.COMPLETE, 1f);
}
for (Entry entry : entries) {
final LeafBucketCollector leafCollector = collector.getLeafCollector(entry.context);
DocIdSetIterator docIt = null;
if (needsScores && entry.docDeltas.size() > 0) {
Scorer scorer = weight.scorer(entry.context);
// We don't need to check if the scorer is null
// since we are sure that there are documents to replay
// (entry.docDeltas it not empty).
docIt = scorer.iterator();
leafCollector.setScorer(scorer);
}
final PackedLongValues.Iterator docDeltaIterator = entry.docDeltas.iterator();
final PackedLongValues.Iterator buckets = entry.buckets.iterator();
int doc = 0;
for (long i = 0, end = entry.docDeltas.size(); i < end; ++i) {
doc += docDeltaIterator.next();
final long bucket = buckets.next();
final long rebasedBucket = hash.find(bucket);
if (rebasedBucket != -1) {
if (needsScores) {
if (docIt.docID() < doc) {
docIt.advance(doc);
}
// aggregations should only be replayed on matching
// documents
assert docIt.docID() == doc;
}
leafCollector.collect(doc, rebasedBucket);
}
}
}
collector.postCollection();
}
/**
* Wrap the provided aggregator so that it behaves (almost) as if it had
* been collected directly.
*/
@Override
public Aggregator wrap(final Aggregator in) {
return new WrappedAggregator(in) {
@Override
public InternalAggregation buildAggregation(long bucket) throws IOException {
if (selectedBuckets == null) {
throw new IllegalStateException("Collection has not been replayed yet.");
}
final long rebasedBucket = selectedBuckets.find(bucket);
if (rebasedBucket == -1) {
throw new IllegalStateException("Cannot build for a bucket which has not been collected [" + bucket + "]");
}
return in.buildAggregation(rebasedBucket);
}
};
}
private static class Entry {
final LeafReaderContext context;
final PackedLongValues docDeltas;
final PackedLongValues buckets;
Entry(LeafReaderContext context, PackedLongValues docDeltas, PackedLongValues buckets) {
this.context = context;
this.docDeltas = docDeltas;
this.buckets = buckets;
}
}
}

AutoDateHistogramAggregator.java

@@ -93,7 +93,7 @@ class AutoDateHistogramAggregator extends DeferableBucketAggregator {
@Override
public DeferringBucketCollector getDeferringCollector() {
deferringCollector = new MergingBucketsDeferringCollector(context);
deferringCollector = new MergingBucketsDeferringCollector(context, descendsFromGlobalAggregator(parent()));
return deferringCollector;
}