Fix concurrent issue in SearchPhaseController (#49829)

The list used by the search progress listener can be nullified
by another thread that reports a query result. This change replaces
the usage of this list with a new array that is synchronously modified.

Closes #49778
This commit is contained in:
Jim Ferenczi 2019-12-04 16:50:15 +01:00 committed by jimczi
parent 04e99ff1ee
commit 495762486d
2 changed files with 15 additions and 1 deletions

View File

@ -39,6 +39,7 @@ import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.SearchPhaseResult;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.InternalAggregation.ReduceContext;
import org.elasticsearch.search.aggregations.InternalAggregations;
@ -564,6 +565,7 @@ public final class SearchPhaseController {
* iff the buffer is exhausted.
*/
static final class QueryPhaseResultConsumer extends ArraySearchPhaseResults<SearchPhaseResult> {
private final SearchShardTarget[] processedShards;
private final InternalAggregations[] aggsBuffer;
private final TopDocs[] topDocsBuffer;
private final boolean hasAggs;
@ -600,6 +602,7 @@ public final class SearchPhaseController {
}
this.controller = controller;
this.progressListener = progressListener;
this.processedShards = new SearchShardTarget[expectedResultSize];
// no need to buffer anything if we have less expected results. in this case we don't consume any results ahead of time.
this.aggsBuffer = new InternalAggregations[hasAggs ? bufferSize : 0];
this.topDocsBuffer = new TopDocs[hasTopDocs ? bufferSize : 0];
@ -636,7 +639,7 @@ public final class SearchPhaseController {
numReducePhases++;
index = 1;
if (hasAggs) {
progressListener.notifyPartialReduce(progressListener.searchShards(results.asList()),
progressListener.notifyPartialReduce(progressListener.searchShards(processedShards),
topDocsStats.getTotalHits(), aggsBuffer[0], numReducePhases);
}
}
@ -650,6 +653,7 @@ public final class SearchPhaseController {
setShardIndex(topDocs.topDocs, querySearchResult.getShardIndex());
topDocsBuffer[i] = topDocs.topDocs;
}
processedShards[querySearchResult.getShardIndex()] = querySearchResult.getSearchShardTarget();
}
private synchronized List<InternalAggregations> getRemainingAggs() {

View File

@ -25,8 +25,10 @@ import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.search.TotalHits;
import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.search.SearchPhaseResult;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.aggregations.InternalAggregations;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
@ -171,6 +173,14 @@ abstract class SearchProgressListener {
return Collections.unmodifiableList(lst);
}
final List<SearchShard> searchShards(SearchShardTarget[] results) {
List<SearchShard> lst = Arrays.stream(results)
.filter(Objects::nonNull)
.map(e -> new SearchShard(e.getClusterAlias(), e.getShardId()))
.collect(Collectors.toList());
return Collections.unmodifiableList(lst);
}
final List<SearchShard> searchShards(GroupShardsIterator<SearchShardIterator> its) {
List<SearchShard> lst = StreamSupport.stream(its.spliterator(), false)
.map(e -> new SearchShard(e.getClusterAlias(), e.shardId()))