Fork off a search thread before sending back fetched responses

This is just a temporary fix until #23048 is fixed. FieldCollapsing
is executing blocking calls on a network thread which causes potential deadlocks
and trips assertions.

Relates to #23048
This commit is contained in:
Simon Willnauer 2017-02-08 15:27:08 +01:00
parent ecb01c15b9
commit d45761e488
1 changed files with 19 additions and 7 deletions

View File

@ -558,13 +558,25 @@ abstract class AbstractSearchAsyncAction<FirstResult extends SearchPhaseResult>
private void sendResponse(SearchPhaseController searchPhaseController, ScoreDoc[] sortedDocs, private void sendResponse(SearchPhaseController searchPhaseController, ScoreDoc[] sortedDocs,
String scrollId, SearchPhaseController.ReducedQueryPhase reducedQueryPhase, String scrollId, SearchPhaseController.ReducedQueryPhase reducedQueryPhase,
AtomicArray<? extends QuerySearchResultProvider> fetchResultsArr) { AtomicArray<? extends QuerySearchResultProvider> fetchResultsArr) {
// this is only a temporary fix since field collapsing executes a blocking call on response
// which could be a network thread. we are fixing this but for now we just fork off again.
// this should be removed once https://github.com/elastic/elasticsearch/issues/23048 is fixed
getExecutor().execute(new ActionRunnable<SearchResponse>(listener) {
@Override
public void doRun() throws IOException {
final boolean isScrollRequest = request.scroll() != null; final boolean isScrollRequest = request.scroll() != null;
final InternalSearchResponse internalResponse = searchPhaseController.merge(isScrollRequest, sortedDocs, reducedQueryPhase, final InternalSearchResponse internalResponse = searchPhaseController.merge(isScrollRequest, sortedDocs, reducedQueryPhase,
fetchResultsArr); fetchResultsArr);
listener.onResponse(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps, successfulOps.get(), listener.onResponse(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps, successfulOps.get(),
buildTookInMillis(), buildShardFailures())); buildTookInMillis(), buildShardFailures()));
} }
@Override
public void onFailure(Exception e) {
raisePhaseFailure(new ReduceSearchPhaseException("fetch", "", e, buildShardFailures()));
}
});
} }
}
} }