[SEARCH] Execute search reduce phase on the search threadpool
Reduce phases can be expensive, and some of them, like the aggregations reduce phase, may even execute a one-off call via an internal client. That call can deadlock when the reduce runs on the network thread that is needed to handle the one-off call's response. This commit dispatches the reduce phase to the search threadpool so that we don't block waiting for the current thread to become available. Closes #7623
parent a6e0f02220
commit aadbfa44b4
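All four search action classes get the same treatment: instead of merging shard results on whatever thread delivered the last shard response (often a network thread), finishHim() now submits the merge to the SEARCH threadpool and reports a rejected execution back to the caller's listener. The sketch below shows that dispatch pattern in isolation; it is a minimal illustration, and the executor setup, listener interface, and toy merge step are assumptions made for the example, not Elasticsearch APIs.

import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-ins for illustration only; these are not Elasticsearch types.
interface ResponseListener<T> {
    void onResponse(T response);
    void onFailure(Throwable t);
}

final class ReducePhaseDispatcher {

    private final ExecutorService searchExecutor;

    ReducePhaseDispatcher(ExecutorService searchExecutor) {
        this.searchExecutor = searchExecutor;
    }

    // Dispatch the potentially expensive reduce/merge step to the search pool
    // instead of running it on the thread that delivered the last shard result.
    void finish(final List<Integer> shardResults, final ResponseListener<Integer> listener) {
        try {
            searchExecutor.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        // The "reduce": merge per-shard results into a single response.
                        int merged = shardResults.stream().mapToInt(Integer::intValue).sum();
                        listener.onResponse(merged);
                    } catch (Throwable t) {
                        // Reduce failures are reported to the listener, never swallowed.
                        listener.onFailure(t);
                    }
                }
            });
        } catch (RejectedExecutionException e) {
            // A saturated pool rejects the task; fail the request instead of dropping it.
            listener.onFailure(e);
        }
    }

    public static void main(String[] args) {
        // Fixed-size pool with a bounded queue and abort policy, roughly like a
        // dedicated search threadpool that can reject work under load.
        ExecutorService searchPool = new ThreadPoolExecutor(
                2, 2, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<Runnable>(100),
                new ThreadPoolExecutor.AbortPolicy());

        new ReducePhaseDispatcher(searchPool).finish(List.of(1, 2, 3), new ResponseListener<Integer>() {
            @Override
            public void onResponse(Integer response) {
                System.out.println("reduced to " + response);
                searchPool.shutdown();
            }

            @Override
            public void onFailure(Throwable t) {
                t.printStackTrace();
                searchPool.shutdown();
            }
        });
    }
}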
@@ -29,6 +29,7 @@ import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.util.concurrent.AtomicArray;
+import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
 import org.elasticsearch.search.action.SearchServiceListener;
 import org.elasticsearch.search.action.SearchServiceTransportAction;
 import org.elasticsearch.search.controller.SearchPhaseController;

@@ -119,21 +120,12 @@ public class TransportSearchDfsQueryAndFetchAction extends TransportSearchTypeAction
             }
         }
 
-        void finishHim() {
+        private void finishHim() {
             try {
-                innerFinishHim();
-            } catch (Throwable e) {
-                ReduceSearchPhaseException failure = new ReduceSearchPhaseException("query_fetch", "", e, buildShardFailures());
-                if (logger.isDebugEnabled()) {
-                    logger.debug("failed to reduce search", failure);
-                }
-                listener.onFailure(failure);
-            } finally {
-                //
-            }
-        }
-
-        void innerFinishHim() throws Exception {
+                threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() {
+                    @Override
+                    public void run() {
+                        try {
                             boolean useScroll = !useSlowScroll && request.scroll() != null;
                             sortedShardList = searchPhaseController.sortDocs(useScroll, queryFetchResults);
                             final InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, queryFetchResults, queryFetchResults);

@@ -142,6 +134,19 @@ public class TransportSearchDfsQueryAndFetchAction extends TransportSearchTypeAction
                                 scrollId = TransportSearchHelper.buildScrollId(request.searchType(), firstResults, null);
                             }
                             listener.onResponse(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps, successfulOps.get(), buildTookInMillis(), buildShardFailures()));
+                        } catch (Throwable e) {
+                            ReduceSearchPhaseException failure = new ReduceSearchPhaseException("query_fetch", "", e, buildShardFailures());
+                            if (logger.isDebugEnabled()) {
+                                logger.debug("failed to reduce search", failure);
+                            }
+                            listener.onFailure(failure);
+                        }
+                    }
+                });
+            } catch (EsRejectedExecutionException ex) {
+                listener.onFailure(ex);
+            }
         }
 
     }
 }

@@ -32,6 +32,7 @@ import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.util.concurrent.AtomicArray;
+import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
 import org.elasticsearch.search.SearchShardTarget;
 import org.elasticsearch.search.action.SearchServiceListener;
 import org.elasticsearch.search.action.SearchServiceTransportAction;

@@ -191,9 +192,18 @@ public class TransportSearchDfsQueryThenFetchAction extends TransportSearchTypeAction
             }
         }
 
-        void finishHim() {
+        private void finishHim() {
             try {
-                innerFinishHim();
+                threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() {
+                    @Override
+                    public void run() {
+                        try {
+                            final InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, queryResults, fetchResults);
+                            String scrollId = null;
+                            if (request.scroll() != null) {
+                                scrollId = TransportSearchHelper.buildScrollId(request.searchType(), firstResults, null);
+                            }
+                            listener.onResponse(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps, successfulOps.get(), buildTookInMillis(), buildShardFailures()));
                         } catch (Throwable e) {
                             ReduceSearchPhaseException failure = new ReduceSearchPhaseException("merge", "", e, buildShardFailures());
                             if (logger.isDebugEnabled()) {

@@ -204,14 +214,15 @@ public class TransportSearchDfsQueryThenFetchAction extends TransportSearchTypeAction
                             releaseIrrelevantSearchContexts(queryResults, docIdsToLoad);
                         }
                     }
-
-        void innerFinishHim() throws Exception {
-            final InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, queryResults, fetchResults);
-            String scrollId = null;
-            if (request.scroll() != null) {
-                scrollId = TransportSearchHelper.buildScrollId(request.searchType(), firstResults, null);
-            }
-            listener.onResponse(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps, successfulOps.get(), buildTookInMillis(), buildShardFailures()));
-        }
+                });
+            } catch (EsRejectedExecutionException ex) {
+                try {
+                    releaseIrrelevantSearchContexts(queryResults, docIdsToLoad);
+                } finally {
+                    listener.onFailure(ex);
+                }
+            }
+        }
     }
 }

@@ -28,6 +28,7 @@ import org.elasticsearch.cluster.ClusterService;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
 import org.elasticsearch.search.action.SearchServiceListener;
 import org.elasticsearch.search.action.SearchServiceTransportAction;
 import org.elasticsearch.search.controller.SearchPhaseController;

@@ -36,8 +37,6 @@ import org.elasticsearch.search.internal.InternalSearchResponse;
 import org.elasticsearch.search.internal.ShardSearchRequest;
 import org.elasticsearch.threadpool.ThreadPool;
 
-import java.io.IOException;
-
 import static org.elasticsearch.action.search.type.TransportSearchHelper.buildScrollId;
 
 /**

@@ -75,17 +74,10 @@ public class TransportSearchQueryAndFetchAction extends TransportSearchTypeAction
         @Override
         protected void moveToSecondPhase() throws Exception {
             try {
-                innerFinishHim();
-            } catch (Throwable e) {
-                ReduceSearchPhaseException failure = new ReduceSearchPhaseException("merge", "", e, buildShardFailures());
-                if (logger.isDebugEnabled()) {
-                    logger.debug("failed to reduce search", failure);
-                }
-                listener.onFailure(failure);
-            }
-        }
-
-        private void innerFinishHim() throws IOException {
+                threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() {
+                    @Override
+                    public void run() {
+                        try {
                             boolean useScroll = !useSlowScroll && request.scroll() != null;
                             sortedShardList = searchPhaseController.sortDocs(useScroll, firstResults);
                             final InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, firstResults, firstResults);

@@ -94,6 +86,18 @@ public class TransportSearchQueryAndFetchAction extends TransportSearchTypeAction
                                 scrollId = buildScrollId(request.searchType(), firstResults, null);
                             }
                             listener.onResponse(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps, successfulOps.get(), buildTookInMillis(), buildShardFailures()));
+                        } catch (Throwable e) {
+                            ReduceSearchPhaseException failure = new ReduceSearchPhaseException("merge", "", e, buildShardFailures());
+                            if (logger.isDebugEnabled()) {
+                                logger.debug("failed to reduce search", failure);
+                            }
+                            listener.onFailure(failure);
+                        }
+                    }
+                });
+            } catch (EsRejectedExecutionException ex) {
+                listener.onFailure(ex);
+            }
         }
     }
 }

@@ -23,6 +23,7 @@ import com.carrotsearch.hppc.IntArrayList;
 import org.apache.lucene.search.ScoreDoc;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.search.ReduceSearchPhaseException;
 import org.elasticsearch.action.search.SearchPhaseExecutionException;
 import org.elasticsearch.action.search.SearchRequest;
 import org.elasticsearch.action.search.SearchResponse;
 import org.elasticsearch.action.support.ActionFilters;

@@ -31,6 +32,7 @@ import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.util.concurrent.AtomicArray;
+import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
 import org.elasticsearch.search.SearchShardTarget;
 import org.elasticsearch.search.action.SearchServiceListener;
 import org.elasticsearch.search.action.SearchServiceTransportAction;

@@ -135,9 +137,18 @@ public class TransportSearchQueryThenFetchAction extends TransportSearchTypeAction
             }
         }
 
-        void finishHim() {
+        private void finishHim() {
             try {
-                innerFinishHim();
+                threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() {
+                    @Override
+                    public void run() {
+                        try {
+                            final InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, firstResults, fetchResults);
+                            String scrollId = null;
+                            if (request.scroll() != null) {
+                                scrollId = TransportSearchHelper.buildScrollId(request.searchType(), firstResults, null);
+                            }
+                            listener.onResponse(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps, successfulOps.get(), buildTookInMillis(), buildShardFailures()));
                         } catch (Throwable e) {
                             ReduceSearchPhaseException failure = new ReduceSearchPhaseException("fetch", "", e, buildShardFailures());
                             if (logger.isDebugEnabled()) {

@@ -148,14 +159,14 @@ public class TransportSearchQueryThenFetchAction extends TransportSearchTypeAction
                             releaseIrrelevantSearchContexts(firstResults, docIdsToLoad);
                         }
                     }
-
-        void innerFinishHim() throws Exception {
-            InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, firstResults, fetchResults);
-            String scrollId = null;
-            if (request.scroll() != null) {
-                scrollId = TransportSearchHelper.buildScrollId(request.searchType(), firstResults, null);
-            }
-            listener.onResponse(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps, successfulOps.get(), buildTookInMillis(), buildShardFailures()));
-        }
+                });
+            } catch (EsRejectedExecutionException ex) {
+                try {
+                    releaseIrrelevantSearchContexts(firstResults, docIdsToLoad);
+                } finally {
+                    listener.onFailure(ex);
+                }
+            }
+        }
     }
 }

@@ -24,7 +24,6 @@ import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Maps;
-import com.google.common.util.concurrent.MoreExecutors;
 import org.elasticsearch.ElasticsearchIllegalArgumentException;
 import org.elasticsearch.Version;
 import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.component.AbstractComponent;
 import org.elasticsearch.common.inject.Inject;

@@ -34,13 +33,9 @@ import org.elasticsearch.common.io.stream.Streamable;
 import org.elasticsearch.common.settings.ImmutableSettings;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.settings.SettingsException;
-import org.elasticsearch.common.unit.SizeUnit;
 import org.elasticsearch.common.unit.SizeValue;
 import org.elasticsearch.common.unit.TimeValue;
-import org.elasticsearch.common.util.concurrent.EsAbortPolicy;
-import org.elasticsearch.common.util.concurrent.EsExecutors;
-import org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor;
-import org.elasticsearch.common.util.concurrent.XRejectedExecutionHandler;
+import org.elasticsearch.common.util.concurrent.*;
 import org.elasticsearch.common.xcontent.ToXContent;
 import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.common.xcontent.XContentBuilderString;

@@ -27,6 +27,7 @@ import org.elasticsearch.action.search.SearchType;
 import org.elasticsearch.action.search.ShardSearchFailure;
 import org.elasticsearch.common.settings.ImmutableSettings;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
 import org.elasticsearch.index.query.QueryBuilders;
 import org.elasticsearch.test.ElasticsearchIntegrationTest;
 import org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope;

@@ -102,7 +103,7 @@ public class RejectionActionTests extends ElasticsearchIntegrationTest {
                     for (ShardSearchFailure failure : e.shardFailures()) {
                         assertTrue("got unexpected reason..." + failure.reason(), failure.reason().toLowerCase(Locale.ENGLISH).contains("rejected"));
                     }
-                } else {
+                } else if ((unwrap instanceof EsRejectedExecutionException) == false) {
                     throw new AssertionError("unexpected failure", (Throwable) response);
                 }
             }