aggs: Changed how top_hits initialises leaf collectors
Both the TopDocsCollector and the LeafCollector were being kept around at the aggregator level. If a nested aggregator performed a post collection, docids could be pushed down to top_hits child aggregators whose LeafCollector had already moved on to the next segment, causing assertions to trip and incorrect results. By tracking the LeafCollector in a separate map at the leaf bucket level this can no longer happen, because the place holding the LeafCollector is no longer shared.

Also, LeafCollector instances for TopDocsCollectors are no longer pre-created when a new segment starts being evaluated. There is no guarantee that the TopHitsAggregator encounters a document for a particular bucket, so LeafCollector instances are now created lazily for buckets that have not been seen before.

Closes #26738
This commit is contained in:
parent 5b711c283d
commit a056c5d469
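Before the diff itself, here is a minimal, hypothetical sketch of the bookkeeping pattern the commit message describes, in plain Java with invented names (TopHitsSketch, TopCollector, LeafView are not Elasticsearch classes; the real code uses Lucene's TopDocsCollector/LeafCollector and HPPC's LongObjectHashMap, as shown in the diff below): the per-segment leaf collector lives in a map that is local to the current segment and is only created the first time a bucket actually sees a document in that segment.

import java.util.HashMap;
import java.util.Map;

public class TopHitsSketch {

    /** Invented stand-in for a top-level collector such as Lucene's TopDocsCollector. */
    static class TopCollector {
        LeafView newLeafView(String segment) {
            return new LeafView(segment);
        }
    }

    /** Invented stand-in for a per-segment LeafCollector. */
    static class LeafView {
        final String segment;
        LeafView(String segment) {
            this.segment = segment;
        }
        void collect(int docId) {
            System.out.println("segment=" + segment + " doc=" + docId);
        }
    }

    /** One top-level collector per bucket ordinal, shared across all segments. */
    private final Map<Long, TopCollector> topCollectors = new HashMap<>();

    /** Called once per segment; everything segment-scoped stays in local state. */
    public void collectSegment(String segment, long[] buckets, int[] docIds) {
        // The per-segment map is created fresh for every segment, so a late
        // (post-collection) call can never reuse a leaf view from another segment.
        Map<Long, LeafView> leafViews = new HashMap<>(1);
        for (int i = 0; i < docIds.length; i++) {
            long bucket = buckets[i];
            TopCollector top = topCollectors.computeIfAbsent(bucket, b -> new TopCollector());
            // Lazily create the per-segment view the first time this bucket is seen here;
            // there is no guarantee a bucket gets any documents in a given segment.
            LeafView leaf = leafViews.computeIfAbsent(bucket, b -> top.newLeafView(segment));
            leaf.collect(docIds[i]);
        }
    }

    public static void main(String[] args) {
        TopHitsSketch sketch = new TopHitsSketch();
        sketch.collectSegment("seg_0", new long[] {0, 1, 0}, new int[] {3, 7, 9});
        sketch.collectSegment("seg_1", new long[] {1}, new int[] {2});
    }
}

Because the map is scoped to the segment, the shared aggregator-level state no longer holds a leaf collector at all, which is what the diff below does with a LongObjectHashMap<LeafCollector> created inside getLeafCollector(...).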
@@ -19,6 +19,7 @@
 package org.elasticsearch.search.aggregations.metrics.tophits;
 
+import com.carrotsearch.hppc.LongObjectHashMap;
 import org.apache.lucene.index.LeafReaderContext;
 import org.apache.lucene.search.FieldDoc;
 import org.apache.lucene.search.LeafCollector;
@@ -54,21 +55,11 @@ import java.util.Map;
 
 public class TopHitsAggregator extends MetricsAggregator {
 
-    /** Simple wrapper around a top-level collector and the current leaf collector. */
-    private static class TopDocsAndLeafCollector {
-        final TopDocsCollector<?> topLevelCollector;
-        LeafCollector leafCollector;
-
-        TopDocsAndLeafCollector(TopDocsCollector<?> topLevelCollector) {
-            this.topLevelCollector = topLevelCollector;
-        }
-    }
-
-    final FetchPhase fetchPhase;
-    final SubSearchContext subSearchContext;
-    final LongObjectPagedHashMap<TopDocsAndLeafCollector> topDocsCollectors;
+    private final FetchPhase fetchPhase;
+    private final SubSearchContext subSearchContext;
+    private final LongObjectPagedHashMap<TopDocsCollector<?>> topDocsCollectors;
 
-    public TopHitsAggregator(FetchPhase fetchPhase, SubSearchContext subSearchContext, String name, SearchContext context,
+    TopHitsAggregator(FetchPhase fetchPhase, SubSearchContext subSearchContext, String name, SearchContext context,
             Aggregator parent, List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData) throws IOException {
         super(name, context, parent, pipelineAggregators, metaData);
         this.fetchPhase = fetchPhase;
@@ -88,9 +79,12 @@ public class TopHitsAggregator extends MetricsAggregator {
     }
 
     @Override
-    public LeafBucketCollector getLeafCollector(final LeafReaderContext ctx,
-            final LeafBucketCollector sub) throws IOException {
+    public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucketCollector sub) throws IOException {
+        // Create leaf collectors here instead of at the aggregator level. Otherwise, if this collector is invoked
+        // during post collection, the leaf readers at the aggregator level have already been replaced with the
+        // next leaf readers, and post collection would push docids of the previous segment, causing assertions
+        // to trip or incorrect top docs to be computed.
+        final LongObjectHashMap<LeafCollector> leafCollectors = new LongObjectHashMap<>(1);
         return new LeafBucketCollectorBase(sub, null) {
 
             Scorer scorer;
@@ -98,21 +92,13 @@ public class TopHitsAggregator extends MetricsAggregator {
             @Override
             public void setScorer(Scorer scorer) throws IOException {
                 this.scorer = scorer;
-                for (LongObjectPagedHashMap.Cursor<TopDocsAndLeafCollector> cursor : topDocsCollectors) {
-                    // Instantiate the leaf collector not in the getLeafCollector(...) method or in the constructor of this
-                    // anonymous class. Otherwise in the case this leaf bucket collector gets invoked with post collection
-                    // then we already have moved on to the next reader and then we may encounter assertion errors or
-                    // incorrect results.
-                    cursor.value.leafCollector = cursor.value.topLevelCollector.getLeafCollector(ctx);
-                    cursor.value.leafCollector.setScorer(scorer);
-                }
                 super.setScorer(scorer);
             }
 
             @Override
             public void collect(int docId, long bucket) throws IOException {
-                TopDocsAndLeafCollector collectors = topDocsCollectors.get(bucket);
-                if (collectors == null) {
+                TopDocsCollector<?> topDocsCollector = topDocsCollectors.get(bucket);
+                if (topDocsCollector == null) {
                     SortAndFormats sort = subSearchContext.sort();
                     int topN = subSearchContext.from() + subSearchContext.size();
                     if (sort == null) {
@@ -123,31 +109,39 @@ public class TopHitsAggregator extends MetricsAggregator {
                     // In the QueryPhase we don't need this protection, because it is build into the IndexSearcher,
                     // but here we create collectors ourselves and we need prevent OOM because of crazy an offset and size.
                     topN = Math.min(topN, subSearchContext.searcher().getIndexReader().maxDoc());
-                    TopDocsCollector<?> topLevelCollector;
                     if (sort == null) {
-                        topLevelCollector = TopScoreDocCollector.create(topN);
+                        topDocsCollector = TopScoreDocCollector.create(topN);
                     } else {
-                        topLevelCollector = TopFieldCollector.create(sort.sort, topN, true, subSearchContext.trackScores(),
-                                subSearchContext.trackScores());
+                        topDocsCollector = TopFieldCollector.create(sort.sort, topN, true, subSearchContext.trackScores(),
+                                subSearchContext.trackScores());
                     }
-                    collectors = new TopDocsAndLeafCollector(topLevelCollector);
-                    collectors.leafCollector = collectors.topLevelCollector.getLeafCollector(ctx);
-                    collectors.leafCollector.setScorer(scorer);
-                    topDocsCollectors.put(bucket, collectors);
+                    topDocsCollectors.put(bucket, topDocsCollector);
                 }
-                collectors.leafCollector.collect(docId);
+
+                final LeafCollector leafCollector;
+                final int key = leafCollectors.indexOf(bucket);
+                if (key < 0) {
+                    leafCollector = topDocsCollector.getLeafCollector(ctx);
+                    if (scorer != null) {
+                        leafCollector.setScorer(scorer);
+                    }
+                    leafCollectors.indexInsert(key, bucket, leafCollector);
+                } else {
+                    leafCollector = leafCollectors.indexGet(key);
+                }
+                leafCollector.collect(docId);
             }
         };
     }
 
     @Override
     public InternalAggregation buildAggregation(long owningBucketOrdinal) {
-        TopDocsAndLeafCollector topDocsCollector = topDocsCollectors.get(owningBucketOrdinal);
+        TopDocsCollector<?> topDocsCollector = topDocsCollectors.get(owningBucketOrdinal);
         final InternalTopHits topHits;
         if (topDocsCollector == null) {
             topHits = buildEmptyAggregation();
         } else {
-            TopDocs topDocs = topDocsCollector.topLevelCollector.topDocs();
+            TopDocs topDocs = topDocsCollector.topDocs();
             if (subSearchContext.sort() == null) {
                 for (RescoreContext ctx : context().rescore()) {
                     try {
@@ -51,7 +51,7 @@ public class TopHitsAggregatorFactory extends AggregatorFactory<TopHitsAggregato
     private final List<ScriptFieldsContext.ScriptField> scriptFields;
     private final FetchSourceContext fetchSourceContext;
 
-    public TopHitsAggregatorFactory(String name, int from, int size, boolean explain, boolean version, boolean trackScores,
+    TopHitsAggregatorFactory(String name, int from, int size, boolean explain, boolean version, boolean trackScores,
             Optional<SortAndFormats> sort, HighlightBuilder highlightBuilder, StoredFieldsContext storedFieldsContext,
             List<String> docValueFields, List<ScriptFieldsContext.ScriptField> scriptFields, FetchSourceContext fetchSourceContext,
             SearchContext context, AggregatorFactory<?> parent, AggregatorFactories.Builder subFactories, Map<String, Object> metaData)
@@ -703,7 +703,6 @@ public class TopHitsIT extends ESIntegTestCase {
         }
     }
 
-    @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/26738")
     public void testTopHitsInNestedSimple() throws Exception {
         SearchResponse searchResponse = client().prepareSearch("articles")
                 .setQuery(matchQuery("title", "title"))