Do not allow Sampler to allocate more than maxDoc size, better CB accounting (#39381)
The `sampler` agg creates a BestDocsDeferringCollector, which internally initializes a priority queue of size `shardSize`. This queue is populated with empty `Object` sentinels, which cost roughly 16 bytes per object. Similarly, the diversified samplers create a DiversifiedTopDocsCollector, which tracks its priority-queue slots with ScoreDocKeys, weighing in at roughly 28 bytes each. If the user sets a very abusive `shard_size`, this could easily OOM a node or cluster, since these priority queues are allocated up front without any checks.

This commit makes sure that when we create the collector it cannot be larger than maxDoc, so that we don't accidentally blow up the node: the size is capped at the overall index maxDoc. A similar treatment is applied to the `maxDocsPerValue` parameter of the diversified samplers.

For good measure, this also adds some circuit-breaker (CB) accounting to track memory usage. Finally, a redundant array creation is removed to save a bit of temporary memory.
This commit is contained in:
parent 4d2ceb5ed5
commit c72feedd74
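Read as a whole, the change amounts to two guards: the collector never sizes its priority queue past the reader's maxDoc, and the bytes that queue will pin are reported to the request circuit breaker and released again when the work is done. Before the hunks below, here is a minimal standalone sketch of that pattern. The names are hypothetical (ClampedSamplerSketch, a plain LongConsumer standing in for the breaker callback, and an assumed flat 16-byte sentinel cost), not the actual Elasticsearch classes.

    import java.util.function.LongConsumer;

    /**
     * Minimal sketch (assumptions noted above): clamp the requested shard size to the
     * index's maxDoc before sizing a priority queue, and report the up-front allocation
     * to a circuit-breaker style byte consumer so it can be released later.
     */
    public class ClampedSamplerSketch {

        // Assumed per-slot cost of an empty Object sentinel in the queue (~16 bytes).
        private static final long SENTINEL_SIZE = 16L;

        private final int shardSize;
        private final LongConsumer circuitBreakerBytes;

        ClampedSamplerSketch(int requestedShardSize, int maxDoc, LongConsumer circuitBreakerBytes) {
            // Never allocate more priority-queue slots than there are documents in the index.
            this.shardSize = Math.min(requestedShardSize, maxDoc);
            this.circuitBreakerBytes = circuitBreakerBytes;
            // Account for the sentinels the queue will hold, before actually allocating them.
            circuitBreakerBytes.accept(this.shardSize * SENTINEL_SIZE);
        }

        void close() {
            // Done with the queue: give the bytes back to the breaker.
            circuitBreakerBytes.accept(-this.shardSize * SENTINEL_SIZE);
        }

        public static void main(String[] args) {
            // A "ridiculous" shard size is harmless once clamped to a small index.
            ClampedSamplerSketch sketch = new ClampedSamplerSketch(Integer.MAX_VALUE, 10_000,
                    delta -> System.out.println("CB delta: " + delta + " bytes"));
            sketch.close();
        }
    }

In the real change, as the hunks below show, the callback is the aggregator's addRequestCircuitBreakerBytes and the per-slot size comes from RamUsageEstimator rather than a hard-coded constant.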
@ -26,6 +26,7 @@ import org.apache.lucene.search.ScoreMode;
import org.apache.lucene.search.TopDocs;
import org.apache.lucene.search.TopDocsCollector;
import org.apache.lucene.search.TopScoreDocCollector;
import org.apache.lucene.util.RamUsageEstimator;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.lease.Releasables;
@ -40,6 +41,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

/**
* A specialization of {@link DeferringBucketCollector} that collects all
@ -56,16 +58,20 @@ public class BestDocsDeferringCollector extends DeferringBucketCollector impleme
private int shardSize;
private PerSegmentCollects perSegCollector;
private final BigArrays bigArrays;
private final Consumer<Long> circuitBreakerConsumer;

private static final long SENTINEL_SIZE = RamUsageEstimator.shallowSizeOfInstance(Object.class);

/**
* Sole constructor.
*
* @param shardSize
* The number of top-scoring docs to collect for each bucket
* @param shardSize The number of top-scoring docs to collect for each bucket
* @param circuitBreakerConsumer consumer for tracking runtime bytes in request circuit breaker
*/
BestDocsDeferringCollector(int shardSize, BigArrays bigArrays) {
BestDocsDeferringCollector(int shardSize, BigArrays bigArrays, Consumer<Long> circuitBreakerConsumer) {
this.shardSize = shardSize;
this.bigArrays = bigArrays;
this.circuitBreakerConsumer = circuitBreakerConsumer;
perBucketSamples = bigArrays.newObjectArray(1);
}

@ -105,6 +111,13 @@ public class BestDocsDeferringCollector extends DeferringBucketCollector impleme
return TopScoreDocCollector.create(size, Integer.MAX_VALUE);
}

// Can be overridden by subclasses that have a different priority queue implementation
// and need different memory sizes
protected long getPriorityQueueSlotSize() {
// Generic sentinel object
return SENTINEL_SIZE;
}

@Override
public void preCollection() throws IOException {
deferred.preCollection();
@ -122,29 +135,35 @@ public class BestDocsDeferringCollector extends DeferringBucketCollector impleme
}

private void runDeferredAggs() throws IOException {
List<ScoreDoc> allDocs = new ArrayList<>(shardSize);
for (int i = 0; i < perBucketSamples.size(); i++) {
PerParentBucketSamples perBucketSample = perBucketSamples.get(i);
if (perBucketSample == null) {
continue;
}
perBucketSample.getMatches(allDocs);
}

// Sort the top matches by docID for the benefit of deferred collector
ScoreDoc[] docsArr = allDocs.toArray(new ScoreDoc[allDocs.size()]);
Arrays.sort(docsArr, (o1, o2) -> {
if(o1.doc == o2.doc){
return o1.shardIndex - o2.shardIndex;
}
return o1.doc - o2.doc;
});
// ScoreDoc is 12b ([float + int + int])
circuitBreakerConsumer.accept(12L * shardSize);
try {
for (PerSegmentCollects perSegDocs : entries) {
perSegDocs.replayRelatedMatches(docsArr);
List<ScoreDoc> allDocs = new ArrayList<>(shardSize);
for (int i = 0; i < perBucketSamples.size(); i++) {
PerParentBucketSamples perBucketSample = perBucketSamples.get(i);
if (perBucketSample == null) {
continue;
}
perBucketSample.getMatches(allDocs);
}
} catch (IOException e) {
throw new ElasticsearchException("IOException collecting best scoring results", e);

// Sort the top matches by docID for the benefit of deferred collector
allDocs.sort((o1, o2) -> {
if (o1.doc == o2.doc) {
return o1.shardIndex - o2.shardIndex;
}
return o1.doc - o2.doc;
});
try {
for (PerSegmentCollects perSegDocs : entries) {
perSegDocs.replayRelatedMatches(allDocs);
}
} catch (IOException e) {
throw new ElasticsearchException("IOException collecting best scoring results", e);
}
} finally {
// done with allDocs now, reclaim some memory
circuitBreakerConsumer.accept(-12L * shardSize);
}
deferred.postCollection();
}
@ -158,6 +177,10 @@ public class BestDocsDeferringCollector extends DeferringBucketCollector impleme
PerParentBucketSamples(long parentBucket, Scorable scorer, LeafReaderContext readerContext) {
try {
this.parentBucket = parentBucket;

// Add to CB based on the size and the implementations per-doc overhead
circuitBreakerConsumer.accept((long) shardSize * getPriorityQueueSlotSize());

tdc = createTopDocsCollector(shardSize);
currentLeafCollector = tdc.getLeafCollector(readerContext);
setScorer(scorer);
@ -230,7 +253,7 @@ public class BestDocsDeferringCollector extends DeferringBucketCollector impleme
}
}

public void replayRelatedMatches(ScoreDoc[] sd) throws IOException {
public void replayRelatedMatches(List<ScoreDoc> sd) throws IOException {
final LeafBucketCollector leafCollector = deferred.getLeafCollector(readerContext);
leafCollector.setScorer(this);

@ -251,7 +274,6 @@ public class BestDocsDeferringCollector extends DeferringBucketCollector impleme
leafCollector.collect(rebased, scoreDoc.shardIndex);
}
}

}

@Override
@ -38,6 +38,7 @@ import org.elasticsearch.search.internal.SearchContext;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

/**
* Alternative, faster implementation for converting String keys to longs but
@ -59,7 +60,7 @@ public class DiversifiedBytesHashSamplerAggregator extends SamplerAggregator {

@Override
public DeferringBucketCollector getDeferringCollector() {
bdd = new DiverseDocsDeferringCollector();
bdd = new DiverseDocsDeferringCollector(this::addRequestCircuitBreakerBytes);
return bdd;
}

@ -70,14 +71,21 @@ public class DiversifiedBytesHashSamplerAggregator extends SamplerAggregator {
*/
class DiverseDocsDeferringCollector extends BestDocsDeferringCollector {

DiverseDocsDeferringCollector() {
super(shardSize, context.bigArrays());
DiverseDocsDeferringCollector(Consumer<Long> circuitBreakerConsumer) {
super(shardSize, context.bigArrays(), circuitBreakerConsumer);
}

@Override
protected TopDocsCollector<ScoreDocKey> createTopDocsCollector(int size) {
return new ValuesDiversifiedTopDocsCollector(size, maxDocsPerValue);
// Make sure we do not allow size > maxDoc, to prevent accidental OOM
int minMaxDocsPerValue = Math.min(maxDocsPerValue, context.searcher().getIndexReader().maxDoc());
return new ValuesDiversifiedTopDocsCollector(size, minMaxDocsPerValue);
}

@Override
protected long getPriorityQueueSlotSize() {
return SCOREDOCKEY_SIZE;
}

// This class extends the DiversifiedTopDocsCollector and provides
@ -88,7 +96,6 @@ public class DiversifiedBytesHashSamplerAggregator extends SamplerAggregator {

ValuesDiversifiedTopDocsCollector(int numHits, int maxHitsPerValue) {
super(numHits, maxHitsPerValue);

}

@Override
@ -40,6 +40,7 @@ import org.elasticsearch.search.internal.SearchContext;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

public class DiversifiedMapSamplerAggregator extends SamplerAggregator {

@ -53,8 +54,8 @@ public class DiversifiedMapSamplerAggregator extends SamplerAggregator {
super(name, shardSize, factories, context, parent, pipelineAggregators, metaData);
this.valuesSource = valuesSource;
this.maxDocsPerValue = maxDocsPerValue;
bucketOrds = new BytesRefHash(shardSize, context.bigArrays());

// Need to use super class shardSize since it is limited to maxDoc
bucketOrds = new BytesRefHash(this.shardSize, context.bigArrays());
}

@Override
@ -65,7 +66,7 @@ public class DiversifiedMapSamplerAggregator extends SamplerAggregator {

@Override
public DeferringBucketCollector getDeferringCollector() {
bdd = new DiverseDocsDeferringCollector();
bdd = new DiverseDocsDeferringCollector(this::addRequestCircuitBreakerBytes);
return bdd;
}

@ -76,14 +77,20 @@ public class DiversifiedMapSamplerAggregator extends SamplerAggregator {
*/
class DiverseDocsDeferringCollector extends BestDocsDeferringCollector {

DiverseDocsDeferringCollector() {
super(shardSize, context.bigArrays());
DiverseDocsDeferringCollector(Consumer<Long> circuitBreakerConsumer) {
super(shardSize, context.bigArrays(), circuitBreakerConsumer);
}

@Override
protected TopDocsCollector<ScoreDocKey> createTopDocsCollector(int size) {
return new ValuesDiversifiedTopDocsCollector(size, maxDocsPerValue);
// Make sure we do not allow size > maxDoc, to prevent accidental OOM
int minMaxDocsPerValue = Math.min(maxDocsPerValue, context.searcher().getIndexReader().maxDoc());
return new ValuesDiversifiedTopDocsCollector(size, minMaxDocsPerValue);
}

@Override
protected long getPriorityQueueSlotSize() {
return SCOREDOCKEY_SIZE;
}

// This class extends the DiversifiedTopDocsCollector and provides
@ -94,7 +101,6 @@ public class DiversifiedMapSamplerAggregator extends SamplerAggregator {

ValuesDiversifiedTopDocsCollector(int numHits, int maxHitsPerKey) {
super(numHits, maxHitsPerKey);

}

@Override
@ -37,6 +37,7 @@ import org.elasticsearch.search.internal.SearchContext;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

public class DiversifiedNumericSamplerAggregator extends SamplerAggregator {

@ -53,7 +54,7 @@ public class DiversifiedNumericSamplerAggregator extends SamplerAggregator {

@Override
public DeferringBucketCollector getDeferringCollector() {
bdd = new DiverseDocsDeferringCollector();
bdd = new DiverseDocsDeferringCollector(this::addRequestCircuitBreakerBytes);
return bdd;
}

@ -63,13 +64,20 @@ public class DiversifiedNumericSamplerAggregator extends SamplerAggregator {
* This implementation is only for use with a single bucket aggregation.
*/
class DiverseDocsDeferringCollector extends BestDocsDeferringCollector {
DiverseDocsDeferringCollector() {
super(shardSize, context.bigArrays());
DiverseDocsDeferringCollector(Consumer<Long> circuitBreakerConsumer) {
super(shardSize, context.bigArrays(), circuitBreakerConsumer);
}

@Override
protected TopDocsCollector<ScoreDocKey> createTopDocsCollector(int size) {
return new ValuesDiversifiedTopDocsCollector(size, maxDocsPerValue);
// Make sure we do not allow size > maxDoc, to prevent accidental OOM
int minMaxDocsPerValue = Math.min(maxDocsPerValue, context.searcher().getIndexReader().maxDoc());
return new ValuesDiversifiedTopDocsCollector(size, minMaxDocsPerValue);
}

@Override
protected long getPriorityQueueSlotSize() {
return SCOREDOCKEY_SIZE;
}

// This class extends the DiversifiedTopDocsCollector and provides
@ -38,6 +38,7 @@ import org.elasticsearch.search.internal.SearchContext;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

public class DiversifiedOrdinalsSamplerAggregator extends SamplerAggregator {

@ -54,7 +55,7 @@ public class DiversifiedOrdinalsSamplerAggregator extends SamplerAggregator {

@Override
public DeferringBucketCollector getDeferringCollector() {
bdd = new DiverseDocsDeferringCollector();
bdd = new DiverseDocsDeferringCollector(this::addRequestCircuitBreakerBytes);
return bdd;
}

@ -65,13 +66,20 @@ public class DiversifiedOrdinalsSamplerAggregator extends SamplerAggregator {
*/
class DiverseDocsDeferringCollector extends BestDocsDeferringCollector {

DiverseDocsDeferringCollector() {
super(shardSize, context.bigArrays());
DiverseDocsDeferringCollector(Consumer<Long> circuitBreakerConsumer) {
super(shardSize, context.bigArrays(), circuitBreakerConsumer);
}

@Override
protected TopDocsCollector<ScoreDocKey> createTopDocsCollector(int size) {
return new ValuesDiversifiedTopDocsCollector(size, maxDocsPerValue);
// Make sure we do not allow size > maxDoc, to prevent accidental OOM
int minMaxDocsPerValue = Math.min(maxDocsPerValue, context.searcher().getIndexReader().maxDoc());
return new ValuesDiversifiedTopDocsCollector(size, minMaxDocsPerValue);
}

@Override
protected long getPriorityQueueSlotSize() {
return SCOREDOCKEY_SIZE;
}

// This class extends the DiversifiedTopDocsCollector and provides
@ -19,7 +19,9 @@
package org.elasticsearch.search.aggregations.bucket.sampler;

import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.search.DiversifiedTopDocsCollector;
import org.apache.lucene.search.ScoreMode;
import org.apache.lucene.util.RamUsageEstimator;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
@ -56,6 +58,8 @@ public class SamplerAggregator extends DeferableBucketAggregator implements Sing
public static final ParseField MAX_DOCS_PER_VALUE_FIELD = new ParseField("max_docs_per_value");
public static final ParseField EXECUTION_HINT_FIELD = new ParseField("execution_hint");

static final long SCOREDOCKEY_SIZE = RamUsageEstimator.shallowSizeOfInstance(DiversifiedTopDocsCollector.ScoreDocKey.class);

public enum ExecutionMode {

MAP(new ParseField("map")) {
@ -146,7 +150,8 @@ public class SamplerAggregator extends DeferableBucketAggregator implements Sing
SamplerAggregator(String name, int shardSize, AggregatorFactories factories, SearchContext context,
Aggregator parent, List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData) throws IOException {
super(name, factories, context, parent, pipelineAggregators, metaData);
this.shardSize = shardSize;
// Make sure we do not allow size > maxDoc, to prevent accidental OOM
this.shardSize = Math.min(shardSize, context.searcher().getIndexReader().maxDoc());
}

@Override
@ -156,7 +161,7 @@ public class SamplerAggregator extends DeferableBucketAggregator implements Sing

@Override
public DeferringBucketCollector getDeferringCollector() {
bdd = new BestDocsDeferringCollector(shardSize, context.bigArrays());
bdd = new BestDocsDeferringCollector(shardSize, context.bigArrays(), this::addRequestCircuitBreakerBytes);
return bdd;
}

@ -242,4 +242,29 @@ public class DiversifiedSamplerIT extends ESIntegTestCase {
assertNull(authors);
}

public void testRidiculousSizeDiversity() throws Exception {
int MAX_DOCS_PER_AUTHOR = 1;
DiversifiedAggregationBuilder sampleAgg = new DiversifiedAggregationBuilder("sample").shardSize(Integer.MAX_VALUE);
sampleAgg.field("author").maxDocsPerValue(MAX_DOCS_PER_AUTHOR).executionHint(randomExecutionHint());
sampleAgg.subAggregation(terms("authors").field("author"));
SearchResponse response = client().prepareSearch("test")
.setSearchType(SearchType.QUERY_THEN_FETCH)
.setQuery(new TermQueryBuilder("genre", "fantasy"))
.setFrom(0).setSize(60)
.addAggregation(sampleAgg)
.get();
assertSearchResponse(response);

sampleAgg = new DiversifiedAggregationBuilder("sample").shardSize(100);
sampleAgg.field("author").maxDocsPerValue(Integer.MAX_VALUE).executionHint(randomExecutionHint());
sampleAgg.subAggregation(terms("authors").field("author"));
response = client().prepareSearch("test")
.setSearchType(SearchType.QUERY_THEN_FETCH)
.setQuery(new TermQueryBuilder("genre", "fantasy"))
.setFrom(0).setSize(60)
.addAggregation(sampleAgg)
.get();
assertSearchResponse(response);
}

}
@ -178,4 +178,11 @@ public class SamplerIT extends ESIntegTestCase {
assertThat(authors.getBuckets().size(), greaterThan(0));
}

public void testRidiculousShardSizeSampler() throws Exception {
SamplerAggregationBuilder sampleAgg = sampler("sample").shardSize(Integer.MAX_VALUE);
sampleAgg.subAggregation(terms("authors").field("author"));
SearchResponse response = client().prepareSearch("test").setSearchType(SearchType.QUERY_THEN_FETCH)
.setQuery(new TermQueryBuilder("genre", "fantasy")).setFrom(0).setSize(60).addAggregation(sampleAgg).get();
assertSearchResponse(response);
}
}
@ -44,6 +44,7 @@ import java.io.IOException;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;

public class BestDocsDeferringCollectorTests extends AggregatorTestCase {

@ -65,8 +66,10 @@ public class BestDocsDeferringCollectorTests extends AggregatorTestCase {
TermQuery termQuery = new TermQuery(new Term("field", String.valueOf(randomInt(maxNumValues))));
TopDocs topDocs = indexSearcher.search(termQuery, numDocs);

final AtomicLong bytes = new AtomicLong(0);

BestDocsDeferringCollector collector = new BestDocsDeferringCollector(numDocs,
new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService()));
new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService()), bytes::addAndGet);
Set<Integer> deferredCollectedDocIds = new HashSet<>();
collector.setDeferredCollector(Collections.singleton(testCollector(deferredCollectedDocIds)));
collector.preCollection();
@ -47,6 +47,8 @@ import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilde
import java.io.IOException;
import java.util.function.Consumer;

import static org.hamcrest.Matchers.greaterThan;

public class DiversifiedSamplerTests extends AggregatorTestCase {

public void testDiversifiedSampler() throws Exception {
@ -117,8 +119,74 @@ public class DiversifiedSamplerTests extends AggregatorTestCase {
directory.close();
}

public void testRidiculousSize() throws Exception {
String[] data = {
// "id,cat,name,price,inStock,author_t,series_t,sequence_i,genre_s,genre_id",
"0553573403,book,A Game of Thrones,7.99,true,George R.R. Martin,A Song of Ice and Fire,1,fantasy,0",
"0553579908,book,A Clash of Kings,7.99,true,George R.R. Martin,A Song of Ice and Fire,2,fantasy,0",
"055357342X,book,A Storm of Swords,7.99,true,George R.R. Martin,A Song of Ice and Fire,3,fantasy,0",
"0553293354,book,Foundation,17.99,true,Isaac Asimov,Foundation Novels,1,scifi,1",
"0812521390,book,The Black Company,6.99,false,Glen Cook,The Chronicles of The Black Company,1,fantasy,0",
"0812550706,book,Ender's Game,6.99,true,Orson Scott Card,Ender,1,scifi,1",
"0441385532,book,Jhereg,7.95,false,Steven Brust,Vlad Taltos,1,fantasy,0",
"0380014300,book,Nine Princes In Amber,6.99,true,Roger Zelazny,the Chronicles of Amber,1,fantasy,0",
"0805080481,book,The Book of Three,5.99,true,Lloyd Alexander,The Chronicles of Prydain,1,fantasy,0",
"080508049X,book,The Black Cauldron,5.99,true,Lloyd Alexander,The Chronicles of Prydain,2,fantasy,0"
};

Directory directory = newDirectory();
RandomIndexWriter indexWriter = new RandomIndexWriter(random(), directory);
for (String entry : data) {
String[] parts = entry.split(",");
Document document = new Document();
document.add(new SortedDocValuesField("id", new BytesRef(parts[0])));
document.add(new StringField("cat", parts[1], Field.Store.NO));
document.add(new TextField("name", parts[2], Field.Store.NO));
document.add(new DoubleDocValuesField("price", Double.valueOf(parts[3])));
document.add(new StringField("inStock", parts[4], Field.Store.NO));
document.add(new StringField("author", parts[5], Field.Store.NO));
document.add(new StringField("series", parts[6], Field.Store.NO));
document.add(new StringField("sequence", parts[7], Field.Store.NO));
document.add(new SortedDocValuesField("genre", new BytesRef(parts[8])));
document.add(new NumericDocValuesField("genre_id", Long.valueOf(parts[9])));
indexWriter.addDocument(document);
}

indexWriter.close();
IndexReader indexReader = DirectoryReader.open(directory);
IndexSearcher indexSearcher = new IndexSearcher(indexReader);

MappedFieldType genreFieldType = new KeywordFieldMapper.KeywordFieldType();
genreFieldType.setName("genre");
genreFieldType.setHasDocValues(true);
Consumer<InternalSampler> verify = result -> {
Terms terms = result.getAggregations().get("terms");
assertThat(terms.getBuckets().size(), greaterThan(0));
};

try {
// huge shard_size
testCase(indexSearcher, genreFieldType, "map", verify, Integer.MAX_VALUE, 1);
testCase(indexSearcher, genreFieldType, "global_ordinals", verify, Integer.MAX_VALUE, 1);
testCase(indexSearcher, genreFieldType, "bytes_hash", verify, Integer.MAX_VALUE, 1);

// huge maxDocsPerValue
testCase(indexSearcher, genreFieldType, "map", verify, 100, Integer.MAX_VALUE);
testCase(indexSearcher, genreFieldType, "global_ordinals", verify, 100, Integer.MAX_VALUE);
testCase(indexSearcher, genreFieldType, "bytes_hash", verify, 100, Integer.MAX_VALUE);
} finally {
indexReader.close();
directory.close();
}
}

private void testCase(IndexSearcher indexSearcher, MappedFieldType genreFieldType, String executionHint,
Consumer<InternalSampler> verify) throws IOException {
testCase(indexSearcher, genreFieldType, executionHint, verify, 100, 1);
}

private void testCase(IndexSearcher indexSearcher, MappedFieldType genreFieldType, String executionHint,
Consumer<InternalSampler> verify, int shardSize, int maxDocsPerValue) throws IOException {
MappedFieldType idFieldType = new KeywordFieldMapper.KeywordFieldType();
idFieldType.setName("id");
idFieldType.setHasDocValues(true);
@ -131,6 +199,8 @@ public class DiversifiedSamplerTests extends AggregatorTestCase {
DiversifiedAggregationBuilder builder = new DiversifiedAggregationBuilder("_name")
.field(genreFieldType.name())
.executionHint(executionHint)
.maxDocsPerValue(maxDocsPerValue)
.shardSize(shardSize)
.subAggregation(new TermsAggregationBuilder("terms", null).field("id"));

InternalSampler result = search(indexSearcher, query, builder, genreFieldType, idFieldType);
@ -84,4 +84,43 @@ public class SamplerAggregatorTests extends AggregatorTestCase {
}
}
}

public void testRidiculousSize() throws IOException {
TextFieldType textFieldType = new TextFieldType();
textFieldType.setIndexAnalyzer(new NamedAnalyzer("foo", AnalyzerScope.GLOBAL, new StandardAnalyzer()));
MappedFieldType numericFieldType = new NumberFieldMapper.NumberFieldType(NumberFieldMapper.NumberType.LONG);
numericFieldType.setName("int");

IndexWriterConfig indexWriterConfig = newIndexWriterConfig();
indexWriterConfig.setMaxBufferedDocs(100);
indexWriterConfig.setRAMBufferSizeMB(100); // flush on open to have a single segment with predictable docIds
try (Directory dir = newDirectory();
IndexWriter w = new IndexWriter(dir, indexWriterConfig)) {
for (long value : new long[] {7, 3, -10, -6, 5, 50}) {
Document doc = new Document();
StringBuilder text = new StringBuilder();
for (int i = 0; i < value; i++) {
text.append("good ");
}
doc.add(new Field("text", text.toString(), textFieldType));
doc.add(new SortedNumericDocValuesField("int", value));
w.addDocument(doc);
}

// Test with an outrageously large size to ensure that the maxDoc protection works
SamplerAggregationBuilder aggBuilder = new SamplerAggregationBuilder("sampler")
.shardSize(Integer.MAX_VALUE)
.subAggregation(new MinAggregationBuilder("min")
.field("int"));
try (IndexReader reader = DirectoryReader.open(w)) {
assertEquals("test expects a single segment", 1, reader.leaves().size());
IndexSearcher searcher = new IndexSearcher(reader);
InternalSampler sampler = searchAndReduce(searcher, new TermQuery(new Term("text", "good")), aggBuilder, textFieldType,
numericFieldType);
Min min = sampler.getAggregations().get("min");
assertEquals(3.0, min.getValue(), 0);
assertTrue(AggregationInspectionHelper.hasValue(sampler));
}
}
}
}