Merge pull request #16880 from javanna/enhancement/rename_search_service_transport_action
Rename SearchServiceTransportAction to SearchTransportService
This commit is contained in:
commit
d81bfe6f30
|
@ -746,7 +746,6 @@
|
|||
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]search[/\\]MultiValueMode.java" checks="LineLength" />
|
||||
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]search[/\\]SearchModule.java" checks="LineLength" />
|
||||
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]search[/\\]SearchService.java" checks="LineLength" />
|
||||
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]search[/\\]action[/\\]SearchServiceTransportAction.java" checks="LineLength" />
|
||||
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]search[/\\]aggregations[/\\]AggregatorFactories.java" checks="LineLength" />
|
||||
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]search[/\\]aggregations[/\\]AggregatorFactory.java" checks="LineLength" />
|
||||
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]search[/\\]aggregations[/\\]InternalAggregation.java" checks="LineLength" />
|
||||
|
|
|
@ -39,7 +39,7 @@ import org.elasticsearch.common.logging.ESLogger;
|
|||
import org.elasticsearch.common.util.concurrent.AtomicArray;
|
||||
import org.elasticsearch.search.SearchPhaseResult;
|
||||
import org.elasticsearch.search.SearchShardTarget;
|
||||
import org.elasticsearch.search.action.SearchServiceTransportAction;
|
||||
import org.elasticsearch.search.action.SearchTransportService;
|
||||
import org.elasticsearch.search.controller.SearchPhaseController;
|
||||
import org.elasticsearch.search.fetch.ShardFetchSearchRequest;
|
||||
import org.elasticsearch.search.internal.InternalSearchResponse;
|
||||
|
@ -58,7 +58,7 @@ import static org.elasticsearch.action.search.TransportSearchHelper.internalSear
|
|||
abstract class AbstractSearchAsyncAction<FirstResult extends SearchPhaseResult> extends AbstractAsyncAction {
|
||||
|
||||
protected final ESLogger logger;
|
||||
protected final SearchServiceTransportAction searchService;
|
||||
protected final SearchTransportService searchTransportService;
|
||||
private final IndexNameExpressionResolver indexNameExpressionResolver;
|
||||
protected final SearchPhaseController searchPhaseController;
|
||||
protected final ThreadPool threadPool;
|
||||
|
@ -76,12 +76,12 @@ abstract class AbstractSearchAsyncAction<FirstResult extends SearchPhaseResult>
|
|||
private final Object shardFailuresMutex = new Object();
|
||||
protected volatile ScoreDoc[] sortedShardList;
|
||||
|
||||
protected AbstractSearchAsyncAction(ESLogger logger, SearchServiceTransportAction searchService, ClusterService clusterService,
|
||||
protected AbstractSearchAsyncAction(ESLogger logger, SearchTransportService searchTransportService, ClusterService clusterService,
|
||||
IndexNameExpressionResolver indexNameExpressionResolver,
|
||||
SearchPhaseController searchPhaseController, ThreadPool threadPool, SearchRequest request,
|
||||
ActionListener<SearchResponse> listener) {
|
||||
this.logger = logger;
|
||||
this.searchService = searchService;
|
||||
this.searchTransportService = searchTransportService;
|
||||
this.indexNameExpressionResolver = indexNameExpressionResolver;
|
||||
this.searchPhaseController = searchPhaseController;
|
||||
this.threadPool = threadPool;
|
||||
|
@ -332,7 +332,7 @@ abstract class AbstractSearchAsyncAction<FirstResult extends SearchPhaseResult>
|
|||
|
||||
protected void sendReleaseSearchContext(long contextId, DiscoveryNode node) {
|
||||
if (node != null) {
|
||||
searchService.sendFreeContext(node, contextId, request);
|
||||
searchTransportService.sendFreeContext(node, contextId, request);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -26,7 +26,7 @@ import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
|||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.common.logging.ESLogger;
|
||||
import org.elasticsearch.common.util.concurrent.AtomicArray;
|
||||
import org.elasticsearch.search.action.SearchServiceTransportAction;
|
||||
import org.elasticsearch.search.action.SearchTransportService;
|
||||
import org.elasticsearch.search.controller.SearchPhaseController;
|
||||
import org.elasticsearch.search.dfs.AggregatedDfs;
|
||||
import org.elasticsearch.search.dfs.DfsSearchResult;
|
||||
|
@ -43,11 +43,12 @@ class SearchDfsQueryAndFetchAsyncAction extends AbstractSearchAsyncAction<DfsSea
|
|||
|
||||
private final AtomicArray<QueryFetchSearchResult> queryFetchResults;
|
||||
|
||||
SearchDfsQueryAndFetchAsyncAction(ESLogger logger, SearchServiceTransportAction searchService,
|
||||
SearchDfsQueryAndFetchAsyncAction(ESLogger logger, SearchTransportService searchTransportService,
|
||||
ClusterService clusterService, IndexNameExpressionResolver indexNameExpressionResolver,
|
||||
SearchPhaseController searchPhaseController, ThreadPool threadPool,
|
||||
SearchRequest request, ActionListener<SearchResponse> listener) {
|
||||
super(logger, searchService, clusterService, indexNameExpressionResolver, searchPhaseController, threadPool, request, listener);
|
||||
super(logger, searchTransportService, clusterService, indexNameExpressionResolver, searchPhaseController, threadPool,
|
||||
request, listener);
|
||||
queryFetchResults = new AtomicArray<>(firstResults.length());
|
||||
}
|
||||
|
||||
|
@ -59,7 +60,7 @@ class SearchDfsQueryAndFetchAsyncAction extends AbstractSearchAsyncAction<DfsSea
|
|||
@Override
|
||||
protected void sendExecuteFirstPhase(DiscoveryNode node, ShardSearchTransportRequest request,
|
||||
ActionListener<DfsSearchResult> listener) {
|
||||
searchService.sendExecuteDfs(node, request, listener);
|
||||
searchTransportService.sendExecuteDfs(node, request, listener);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -77,7 +78,7 @@ class SearchDfsQueryAndFetchAsyncAction extends AbstractSearchAsyncAction<DfsSea
|
|||
|
||||
void executeSecondPhase(final int shardIndex, final DfsSearchResult dfsResult, final AtomicInteger counter,
|
||||
final DiscoveryNode node, final QuerySearchRequest querySearchRequest) {
|
||||
searchService.sendExecuteFetch(node, querySearchRequest, new ActionListener<QueryFetchSearchResult>() {
|
||||
searchTransportService.sendExecuteFetch(node, querySearchRequest, new ActionListener<QueryFetchSearchResult>() {
|
||||
@Override
|
||||
public void onResponse(QueryFetchSearchResult result) {
|
||||
result.shardTarget(dfsResult.shardTarget());
|
||||
|
|
|
@ -29,7 +29,7 @@ import org.elasticsearch.cluster.node.DiscoveryNode;
|
|||
import org.elasticsearch.common.logging.ESLogger;
|
||||
import org.elasticsearch.common.util.concurrent.AtomicArray;
|
||||
import org.elasticsearch.search.SearchShardTarget;
|
||||
import org.elasticsearch.search.action.SearchServiceTransportAction;
|
||||
import org.elasticsearch.search.action.SearchTransportService;
|
||||
import org.elasticsearch.search.controller.SearchPhaseController;
|
||||
import org.elasticsearch.search.dfs.AggregatedDfs;
|
||||
import org.elasticsearch.search.dfs.DfsSearchResult;
|
||||
|
@ -50,11 +50,12 @@ class SearchDfsQueryThenFetchAsyncAction extends AbstractSearchAsyncAction<DfsSe
|
|||
final AtomicArray<FetchSearchResult> fetchResults;
|
||||
final AtomicArray<IntArrayList> docIdsToLoad;
|
||||
|
||||
SearchDfsQueryThenFetchAsyncAction(ESLogger logger, SearchServiceTransportAction searchService,
|
||||
SearchDfsQueryThenFetchAsyncAction(ESLogger logger, SearchTransportService searchTransportService,
|
||||
ClusterService clusterService, IndexNameExpressionResolver indexNameExpressionResolver,
|
||||
SearchPhaseController searchPhaseController, ThreadPool threadPool,
|
||||
SearchRequest request, ActionListener<SearchResponse> listener) {
|
||||
super(logger, searchService, clusterService, indexNameExpressionResolver, searchPhaseController, threadPool, request, listener);
|
||||
super(logger, searchTransportService, clusterService, indexNameExpressionResolver, searchPhaseController, threadPool,
|
||||
request, listener);
|
||||
queryResults = new AtomicArray<>(firstResults.length());
|
||||
fetchResults = new AtomicArray<>(firstResults.length());
|
||||
docIdsToLoad = new AtomicArray<>(firstResults.length());
|
||||
|
@ -68,7 +69,7 @@ class SearchDfsQueryThenFetchAsyncAction extends AbstractSearchAsyncAction<DfsSe
|
|||
@Override
|
||||
protected void sendExecuteFirstPhase(DiscoveryNode node, ShardSearchTransportRequest request,
|
||||
ActionListener<DfsSearchResult> listener) {
|
||||
searchService.sendExecuteDfs(node, request, listener);
|
||||
searchTransportService.sendExecuteDfs(node, request, listener);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -85,7 +86,7 @@ class SearchDfsQueryThenFetchAsyncAction extends AbstractSearchAsyncAction<DfsSe
|
|||
|
||||
void executeQuery(final int shardIndex, final DfsSearchResult dfsResult, final AtomicInteger counter,
|
||||
final QuerySearchRequest querySearchRequest, final DiscoveryNode node) {
|
||||
searchService.sendExecuteQuery(node, querySearchRequest, new ActionListener<QuerySearchResult>() {
|
||||
searchTransportService.sendExecuteQuery(node, querySearchRequest, new ActionListener<QuerySearchResult>() {
|
||||
@Override
|
||||
public void onResponse(QuerySearchResult result) {
|
||||
result.shardTarget(dfsResult.shardTarget());
|
||||
|
@ -157,7 +158,7 @@ class SearchDfsQueryThenFetchAsyncAction extends AbstractSearchAsyncAction<DfsSe
|
|||
|
||||
void executeFetch(final int shardIndex, final SearchShardTarget shardTarget, final AtomicInteger counter,
|
||||
final ShardFetchSearchRequest fetchSearchRequest, DiscoveryNode node) {
|
||||
searchService.sendExecuteFetch(node, fetchSearchRequest, new ActionListener<FetchSearchResult>() {
|
||||
searchTransportService.sendExecuteFetch(node, fetchSearchRequest, new ActionListener<FetchSearchResult>() {
|
||||
@Override
|
||||
public void onResponse(FetchSearchResult result) {
|
||||
result.shardTarget(shardTarget);
|
||||
|
|
|
@ -25,7 +25,7 @@ import org.elasticsearch.cluster.ClusterService;
|
|||
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.common.logging.ESLogger;
|
||||
import org.elasticsearch.search.action.SearchServiceTransportAction;
|
||||
import org.elasticsearch.search.action.SearchTransportService;
|
||||
import org.elasticsearch.search.controller.SearchPhaseController;
|
||||
import org.elasticsearch.search.fetch.QueryFetchSearchResult;
|
||||
import org.elasticsearch.search.internal.InternalSearchResponse;
|
||||
|
@ -36,11 +36,12 @@ import java.io.IOException;
|
|||
|
||||
class SearchQueryAndFetchAsyncAction extends AbstractSearchAsyncAction<QueryFetchSearchResult> {
|
||||
|
||||
SearchQueryAndFetchAsyncAction(ESLogger logger, SearchServiceTransportAction searchService,
|
||||
SearchQueryAndFetchAsyncAction(ESLogger logger, SearchTransportService searchTransportService,
|
||||
ClusterService clusterService, IndexNameExpressionResolver indexNameExpressionResolver,
|
||||
SearchPhaseController searchPhaseController, ThreadPool threadPool,
|
||||
SearchRequest request, ActionListener<SearchResponse> listener) {
|
||||
super(logger, searchService, clusterService, indexNameExpressionResolver, searchPhaseController, threadPool, request, listener);
|
||||
super(logger, searchTransportService, clusterService, indexNameExpressionResolver, searchPhaseController, threadPool,
|
||||
request, listener);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -51,7 +52,7 @@ class SearchQueryAndFetchAsyncAction extends AbstractSearchAsyncAction<QueryFetc
|
|||
@Override
|
||||
protected void sendExecuteFirstPhase(DiscoveryNode node, ShardSearchTransportRequest request,
|
||||
ActionListener<QueryFetchSearchResult> listener) {
|
||||
searchService.sendExecuteFetch(node, request, listener);
|
||||
searchTransportService.sendExecuteFetch(node, request, listener);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -29,7 +29,7 @@ import org.elasticsearch.cluster.node.DiscoveryNode;
|
|||
import org.elasticsearch.common.logging.ESLogger;
|
||||
import org.elasticsearch.common.util.concurrent.AtomicArray;
|
||||
import org.elasticsearch.search.SearchShardTarget;
|
||||
import org.elasticsearch.search.action.SearchServiceTransportAction;
|
||||
import org.elasticsearch.search.action.SearchTransportService;
|
||||
import org.elasticsearch.search.controller.SearchPhaseController;
|
||||
import org.elasticsearch.search.fetch.FetchSearchResult;
|
||||
import org.elasticsearch.search.fetch.ShardFetchSearchRequest;
|
||||
|
@ -46,7 +46,7 @@ class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction<QuerySea
|
|||
final AtomicArray<FetchSearchResult> fetchResults;
|
||||
final AtomicArray<IntArrayList> docIdsToLoad;
|
||||
|
||||
SearchQueryThenFetchAsyncAction(ESLogger logger, SearchServiceTransportAction searchService,
|
||||
SearchQueryThenFetchAsyncAction(ESLogger logger, SearchTransportService searchService,
|
||||
ClusterService clusterService, IndexNameExpressionResolver indexNameExpressionResolver,
|
||||
SearchPhaseController searchPhaseController, ThreadPool threadPool,
|
||||
SearchRequest request, ActionListener<SearchResponse> listener) {
|
||||
|
@ -63,7 +63,7 @@ class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction<QuerySea
|
|||
@Override
|
||||
protected void sendExecuteFirstPhase(DiscoveryNode node, ShardSearchTransportRequest request,
|
||||
ActionListener<QuerySearchResultProvider> listener) {
|
||||
searchService.sendExecuteQuery(node, request, listener);
|
||||
searchTransportService.sendExecuteQuery(node, request, listener);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -91,7 +91,7 @@ class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction<QuerySea
|
|||
|
||||
void executeFetch(final int shardIndex, final SearchShardTarget shardTarget, final AtomicInteger counter,
|
||||
final ShardFetchSearchRequest fetchSearchRequest, DiscoveryNode node) {
|
||||
searchService.sendExecuteFetch(node, fetchSearchRequest, new ActionListener<FetchSearchResult>() {
|
||||
searchTransportService.sendExecuteFetch(node, fetchSearchRequest, new ActionListener<FetchSearchResult>() {
|
||||
@Override
|
||||
public void onResponse(FetchSearchResult result) {
|
||||
result.shardTarget(shardTarget);
|
||||
|
|
|
@ -26,7 +26,7 @@ import org.elasticsearch.cluster.node.DiscoveryNode;
|
|||
import org.elasticsearch.cluster.node.DiscoveryNodes;
|
||||
import org.elasticsearch.common.logging.ESLogger;
|
||||
import org.elasticsearch.common.util.concurrent.AtomicArray;
|
||||
import org.elasticsearch.search.action.SearchServiceTransportAction;
|
||||
import org.elasticsearch.search.action.SearchTransportService;
|
||||
import org.elasticsearch.search.controller.SearchPhaseController;
|
||||
import org.elasticsearch.search.fetch.QueryFetchSearchResult;
|
||||
import org.elasticsearch.search.fetch.ScrollQueryFetchSearchResult;
|
||||
|
@ -42,7 +42,7 @@ class SearchScrollQueryAndFetchAsyncAction extends AbstractAsyncAction {
|
|||
|
||||
private final ESLogger logger;
|
||||
private final SearchPhaseController searchPhaseController;
|
||||
private final SearchServiceTransportAction searchService;
|
||||
private final SearchTransportService searchTransportService;
|
||||
private final SearchScrollRequest request;
|
||||
private final ActionListener<SearchResponse> listener;
|
||||
private final ParsedScrollId scrollId;
|
||||
|
@ -53,11 +53,11 @@ class SearchScrollQueryAndFetchAsyncAction extends AbstractAsyncAction {
|
|||
private final AtomicInteger counter;
|
||||
|
||||
SearchScrollQueryAndFetchAsyncAction(ESLogger logger, ClusterService clusterService,
|
||||
SearchServiceTransportAction searchService, SearchPhaseController searchPhaseController,
|
||||
SearchTransportService searchTransportService, SearchPhaseController searchPhaseController,
|
||||
SearchScrollRequest request, ParsedScrollId scrollId, ActionListener<SearchResponse> listener) {
|
||||
this.logger = logger;
|
||||
this.searchPhaseController = searchPhaseController;
|
||||
this.searchService = searchService;
|
||||
this.searchTransportService = searchTransportService;
|
||||
this.request = request;
|
||||
this.listener = listener;
|
||||
this.scrollId = scrollId;
|
||||
|
@ -128,7 +128,7 @@ class SearchScrollQueryAndFetchAsyncAction extends AbstractAsyncAction {
|
|||
|
||||
void executePhase(final int shardIndex, DiscoveryNode node, final long searchId) {
|
||||
InternalScrollSearchRequest internalRequest = internalScrollSearchRequest(searchId, request);
|
||||
searchService.sendExecuteFetch(node, internalRequest, new ActionListener<ScrollQueryFetchSearchResult>() {
|
||||
searchTransportService.sendExecuteFetch(node, internalRequest, new ActionListener<ScrollQueryFetchSearchResult>() {
|
||||
@Override
|
||||
public void onResponse(ScrollQueryFetchSearchResult result) {
|
||||
queryFetchResults.set(shardIndex, result.result());
|
||||
|
|
|
@ -27,7 +27,7 @@ import org.elasticsearch.cluster.node.DiscoveryNode;
|
|||
import org.elasticsearch.cluster.node.DiscoveryNodes;
|
||||
import org.elasticsearch.common.logging.ESLogger;
|
||||
import org.elasticsearch.common.util.concurrent.AtomicArray;
|
||||
import org.elasticsearch.search.action.SearchServiceTransportAction;
|
||||
import org.elasticsearch.search.action.SearchTransportService;
|
||||
import org.elasticsearch.search.controller.SearchPhaseController;
|
||||
import org.elasticsearch.search.fetch.FetchSearchResult;
|
||||
import org.elasticsearch.search.fetch.ShardFetchRequest;
|
||||
|
@ -44,7 +44,7 @@ import static org.elasticsearch.action.search.TransportSearchHelper.internalScro
|
|||
class SearchScrollQueryThenFetchAsyncAction extends AbstractAsyncAction {
|
||||
|
||||
private final ESLogger logger;
|
||||
private final SearchServiceTransportAction searchService;
|
||||
private final SearchTransportService searchTransportService;
|
||||
private final SearchPhaseController searchPhaseController;
|
||||
private final SearchScrollRequest request;
|
||||
private final ActionListener<SearchResponse> listener;
|
||||
|
@ -57,10 +57,10 @@ class SearchScrollQueryThenFetchAsyncAction extends AbstractAsyncAction {
|
|||
private final AtomicInteger successfulOps;
|
||||
|
||||
SearchScrollQueryThenFetchAsyncAction(ESLogger logger, ClusterService clusterService,
|
||||
SearchServiceTransportAction searchService, SearchPhaseController searchPhaseController,
|
||||
SearchTransportService searchTransportService, SearchPhaseController searchPhaseController,
|
||||
SearchScrollRequest request, ParsedScrollId scrollId, ActionListener<SearchResponse> listener) {
|
||||
this.logger = logger;
|
||||
this.searchService = searchService;
|
||||
this.searchTransportService = searchTransportService;
|
||||
this.searchPhaseController = searchPhaseController;
|
||||
this.request = request;
|
||||
this.listener = listener;
|
||||
|
@ -124,7 +124,7 @@ class SearchScrollQueryThenFetchAsyncAction extends AbstractAsyncAction {
|
|||
|
||||
private void executeQueryPhase(final int shardIndex, final AtomicInteger counter, DiscoveryNode node, final long searchId) {
|
||||
InternalScrollSearchRequest internalRequest = internalScrollSearchRequest(searchId, request);
|
||||
searchService.sendExecuteQuery(node, internalRequest, new ActionListener<ScrollQuerySearchResult>() {
|
||||
searchTransportService.sendExecuteQuery(node, internalRequest, new ActionListener<ScrollQuerySearchResult>() {
|
||||
@Override
|
||||
public void onResponse(ScrollQuerySearchResult result) {
|
||||
queryResults.set(shardIndex, result.queryResult());
|
||||
|
@ -182,7 +182,7 @@ class SearchScrollQueryThenFetchAsyncAction extends AbstractAsyncAction {
|
|||
ScoreDoc lastEmittedDoc = lastEmittedDocPerShard[entry.index];
|
||||
ShardFetchRequest shardFetchRequest = new ShardFetchRequest(querySearchResult.id(), docIds, lastEmittedDoc);
|
||||
DiscoveryNode node = nodes.get(querySearchResult.shardTarget().nodeId());
|
||||
searchService.sendExecuteFetchScroll(node, shardFetchRequest, new ActionListener<FetchSearchResult>() {
|
||||
searchTransportService.sendExecuteFetchScroll(node, shardFetchRequest, new ActionListener<FetchSearchResult>() {
|
||||
@Override
|
||||
public void onResponse(FetchSearchResult result) {
|
||||
result.shardTarget(querySearchResult.shardTarget());
|
||||
|
|
|
@ -30,7 +30,7 @@ import org.elasticsearch.cluster.node.DiscoveryNodes;
|
|||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.util.concurrent.CountDown;
|
||||
import org.elasticsearch.search.action.SearchServiceTransportAction;
|
||||
import org.elasticsearch.search.action.SearchTransportService;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.TransportResponse;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
|
@ -47,15 +47,15 @@ import static org.elasticsearch.action.search.TransportSearchHelper.parseScrollI
|
|||
public class TransportClearScrollAction extends HandledTransportAction<ClearScrollRequest, ClearScrollResponse> {
|
||||
|
||||
private final ClusterService clusterService;
|
||||
private final SearchServiceTransportAction searchServiceTransportAction;
|
||||
private final SearchTransportService searchTransportService;
|
||||
|
||||
@Inject
|
||||
public TransportClearScrollAction(Settings settings, TransportService transportService, ThreadPool threadPool,
|
||||
ClusterService clusterService, SearchServiceTransportAction searchServiceTransportAction,
|
||||
ClusterService clusterService, SearchTransportService searchTransportService,
|
||||
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) {
|
||||
super(settings, ClearScrollAction.NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver, ClearScrollRequest::new);
|
||||
this.clusterService = clusterService;
|
||||
this.searchServiceTransportAction = searchServiceTransportAction;
|
||||
this.searchTransportService = searchTransportService;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -64,10 +64,8 @@ public class TransportClearScrollAction extends HandledTransportAction<ClearScro
|
|||
}
|
||||
|
||||
private class Async {
|
||||
|
||||
final DiscoveryNodes nodes;
|
||||
final CountDown expectedOps;
|
||||
final ClearScrollRequest request;
|
||||
final List<ScrollIdForNode[]> contexts = new ArrayList<>();
|
||||
final ActionListener<ClearScrollResponse> listener;
|
||||
final AtomicReference<Throwable> expHolder;
|
||||
|
@ -85,8 +83,6 @@ public class TransportClearScrollAction extends HandledTransportAction<ClearScro
|
|||
this.contexts.add(context);
|
||||
}
|
||||
}
|
||||
|
||||
this.request = request;
|
||||
this.listener = listener;
|
||||
this.expHolder = new AtomicReference<>();
|
||||
this.expectedOps = new CountDown(expectedOps);
|
||||
|
@ -100,7 +96,7 @@ public class TransportClearScrollAction extends HandledTransportAction<ClearScro
|
|||
|
||||
if (contexts.isEmpty()) {
|
||||
for (final DiscoveryNode node : nodes) {
|
||||
searchServiceTransportAction.sendClearAllScrollContexts(node, request, new ActionListener<TransportResponse>() {
|
||||
searchTransportService.sendClearAllScrollContexts(node, new ActionListener<TransportResponse>() {
|
||||
@Override
|
||||
public void onResponse(TransportResponse response) {
|
||||
onFreedContext(true);
|
||||
|
@ -121,9 +117,9 @@ public class TransportClearScrollAction extends HandledTransportAction<ClearScro
|
|||
continue;
|
||||
}
|
||||
|
||||
searchServiceTransportAction.sendFreeContext(node, target.getScrollId(), request, new ActionListener<SearchServiceTransportAction.SearchFreeContextResponse>() {
|
||||
searchTransportService.sendFreeContext(node, target.getScrollId(), new ActionListener<SearchTransportService.SearchFreeContextResponse>() {
|
||||
@Override
|
||||
public void onResponse(SearchServiceTransportAction.SearchFreeContextResponse freed) {
|
||||
public void onResponse(SearchTransportService.SearchFreeContextResponse freed) {
|
||||
onFreedContext(freed.isFreed());
|
||||
}
|
||||
|
||||
|
|
|
@ -29,7 +29,7 @@ import org.elasticsearch.common.inject.Inject;
|
|||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.index.IndexNotFoundException;
|
||||
import org.elasticsearch.indices.IndexClosedException;
|
||||
import org.elasticsearch.search.action.SearchServiceTransportAction;
|
||||
import org.elasticsearch.search.action.SearchTransportService;
|
||||
import org.elasticsearch.search.controller.SearchPhaseController;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
|
@ -45,17 +45,17 @@ import static org.elasticsearch.action.search.SearchType.QUERY_AND_FETCH;
|
|||
public class TransportSearchAction extends HandledTransportAction<SearchRequest, SearchResponse> {
|
||||
|
||||
private final ClusterService clusterService;
|
||||
private final SearchServiceTransportAction searchService;
|
||||
private final SearchTransportService searchTransportService;
|
||||
private final SearchPhaseController searchPhaseController;
|
||||
|
||||
@Inject
|
||||
public TransportSearchAction(Settings settings, ThreadPool threadPool, SearchPhaseController searchPhaseController,
|
||||
TransportService transportService, SearchServiceTransportAction searchService,
|
||||
TransportService transportService, SearchTransportService searchTransportService,
|
||||
ClusterService clusterService, ActionFilters actionFilters, IndexNameExpressionResolver
|
||||
indexNameExpressionResolver) {
|
||||
super(settings, SearchAction.NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver, SearchRequest::new);
|
||||
this.searchPhaseController = searchPhaseController;
|
||||
this.searchService = searchService;
|
||||
this.searchTransportService = searchTransportService;
|
||||
this.clusterService = clusterService;
|
||||
}
|
||||
|
||||
|
@ -81,19 +81,19 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
|
|||
AbstractSearchAsyncAction searchAsyncAction;
|
||||
switch(searchRequest.searchType()) {
|
||||
case DFS_QUERY_THEN_FETCH:
|
||||
searchAsyncAction = new SearchDfsQueryThenFetchAsyncAction(logger, searchService, clusterService,
|
||||
searchAsyncAction = new SearchDfsQueryThenFetchAsyncAction(logger, searchTransportService, clusterService,
|
||||
indexNameExpressionResolver, searchPhaseController, threadPool, searchRequest, listener);
|
||||
break;
|
||||
case QUERY_THEN_FETCH:
|
||||
searchAsyncAction = new SearchQueryThenFetchAsyncAction(logger, searchService, clusterService,
|
||||
searchAsyncAction = new SearchQueryThenFetchAsyncAction(logger, searchTransportService, clusterService,
|
||||
indexNameExpressionResolver, searchPhaseController, threadPool, searchRequest, listener);
|
||||
break;
|
||||
case DFS_QUERY_AND_FETCH:
|
||||
searchAsyncAction = new SearchDfsQueryAndFetchAsyncAction(logger, searchService, clusterService,
|
||||
searchAsyncAction = new SearchDfsQueryAndFetchAsyncAction(logger, searchTransportService, clusterService,
|
||||
indexNameExpressionResolver, searchPhaseController, threadPool, searchRequest, listener);
|
||||
break;
|
||||
case QUERY_AND_FETCH:
|
||||
searchAsyncAction = new SearchQueryAndFetchAsyncAction(logger, searchService, clusterService,
|
||||
searchAsyncAction = new SearchQueryAndFetchAsyncAction(logger, searchTransportService, clusterService,
|
||||
indexNameExpressionResolver, searchPhaseController, threadPool, searchRequest, listener);
|
||||
break;
|
||||
default:
|
||||
|
|
|
@ -26,7 +26,7 @@ import org.elasticsearch.cluster.ClusterService;
|
|||
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.search.action.SearchServiceTransportAction;
|
||||
import org.elasticsearch.search.action.SearchTransportService;
|
||||
import org.elasticsearch.search.controller.SearchPhaseController;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
|
@ -41,18 +41,18 @@ import static org.elasticsearch.action.search.TransportSearchHelper.parseScrollI
|
|||
public class TransportSearchScrollAction extends HandledTransportAction<SearchScrollRequest, SearchResponse> {
|
||||
|
||||
private final ClusterService clusterService;
|
||||
private final SearchServiceTransportAction searchService;
|
||||
private final SearchTransportService searchTransportService;
|
||||
private final SearchPhaseController searchPhaseController;
|
||||
|
||||
@Inject
|
||||
public TransportSearchScrollAction(Settings settings, ThreadPool threadPool, TransportService transportService,
|
||||
ClusterService clusterService, SearchServiceTransportAction searchService,
|
||||
ClusterService clusterService, SearchTransportService searchTransportService,
|
||||
SearchPhaseController searchPhaseController,
|
||||
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) {
|
||||
super(settings, SearchScrollAction.NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver,
|
||||
SearchScrollRequest::new);
|
||||
this.clusterService = clusterService;
|
||||
this.searchService = searchService;
|
||||
this.searchTransportService = searchTransportService;
|
||||
this.searchPhaseController = searchPhaseController;
|
||||
}
|
||||
|
||||
|
@ -63,11 +63,11 @@ public class TransportSearchScrollAction extends HandledTransportAction<SearchSc
|
|||
AbstractAsyncAction action;
|
||||
switch (scrollId.getType()) {
|
||||
case QUERY_THEN_FETCH_TYPE:
|
||||
action = new SearchScrollQueryThenFetchAsyncAction(logger, clusterService, searchService,
|
||||
action = new SearchScrollQueryThenFetchAsyncAction(logger, clusterService, searchTransportService,
|
||||
searchPhaseController, request, scrollId, listener);
|
||||
break;
|
||||
case QUERY_AND_FETCH_TYPE:
|
||||
action = new SearchScrollQueryAndFetchAsyncAction(logger, clusterService, searchService,
|
||||
action = new SearchScrollQueryAndFetchAsyncAction(logger, clusterService, searchTransportService,
|
||||
searchPhaseController, request, scrollId, listener);
|
||||
break;
|
||||
default:
|
||||
|
|
|
@ -96,7 +96,7 @@ import org.elasticsearch.index.query.functionscore.random.RandomScoreFunctionPar
|
|||
import org.elasticsearch.index.query.functionscore.script.ScriptScoreFunctionParser;
|
||||
import org.elasticsearch.index.query.functionscore.weight.WeightBuilder;
|
||||
import org.elasticsearch.indices.query.IndicesQueriesRegistry;
|
||||
import org.elasticsearch.search.action.SearchServiceTransportAction;
|
||||
import org.elasticsearch.search.action.SearchTransportService;
|
||||
import org.elasticsearch.search.aggregations.AggregationBinaryParseElement;
|
||||
import org.elasticsearch.search.aggregations.AggregationParseElement;
|
||||
import org.elasticsearch.search.aggregations.AggregationPhase;
|
||||
|
@ -445,7 +445,7 @@ public class SearchModule extends AbstractModule {
|
|||
bind(QueryPhase.class).asEagerSingleton();
|
||||
bind(SearchPhaseController.class).asEagerSingleton();
|
||||
bind(FetchPhase.class).asEagerSingleton();
|
||||
bind(SearchServiceTransportAction.class).asEagerSingleton();
|
||||
bind(SearchTransportService.class).asEagerSingleton();
|
||||
if (searchServiceImpl == SearchService.class) {
|
||||
bind(SearchService.class).asEagerSingleton();
|
||||
} else {
|
||||
|
|
|
@ -23,7 +23,6 @@ import org.elasticsearch.action.ActionListener;
|
|||
import org.elasticsearch.action.ActionListenerResponseHandler;
|
||||
import org.elasticsearch.action.IndicesRequest;
|
||||
import org.elasticsearch.action.OriginalIndices;
|
||||
import org.elasticsearch.action.search.ClearScrollRequest;
|
||||
import org.elasticsearch.action.search.SearchRequest;
|
||||
import org.elasticsearch.action.support.IndicesOptions;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
|
@ -58,7 +57,7 @@ import java.io.IOException;
|
|||
* An encapsulation of {@link org.elasticsearch.search.SearchService} operations exposed through
|
||||
* transport.
|
||||
*/
|
||||
public class SearchServiceTransportAction extends AbstractComponent {
|
||||
public class SearchTransportService extends AbstractComponent {
|
||||
|
||||
public static final String FREE_CONTEXT_SCROLL_ACTION_NAME = "indices:data/read/search[free_context/scroll]";
|
||||
public static final String FREE_CONTEXT_ACTION_NAME = "indices:data/read/search[free_context]";
|
||||
|
@ -77,26 +76,39 @@ public class SearchServiceTransportAction extends AbstractComponent {
|
|||
private final SearchService searchService;
|
||||
|
||||
@Inject
|
||||
public SearchServiceTransportAction(Settings settings, TransportService transportService, SearchService searchService) {
|
||||
public SearchTransportService(Settings settings, TransportService transportService, SearchService searchService) {
|
||||
super(settings);
|
||||
this.transportService = transportService;
|
||||
this.searchService = searchService;
|
||||
transportService.registerRequestHandler(FREE_CONTEXT_SCROLL_ACTION_NAME, ScrollFreeContextRequest::new, ThreadPool.Names.SAME, new FreeContextTransportHandler<>());
|
||||
transportService.registerRequestHandler(FREE_CONTEXT_ACTION_NAME, SearchFreeContextRequest::new, ThreadPool.Names.SAME, new FreeContextTransportHandler<SearchFreeContextRequest>());
|
||||
transportService.registerRequestHandler(CLEAR_SCROLL_CONTEXTS_ACTION_NAME, ClearScrollContextsRequest::new, ThreadPool.Names.SAME, new ClearScrollContextsTransportHandler());
|
||||
transportService.registerRequestHandler(DFS_ACTION_NAME, ShardSearchTransportRequest::new, ThreadPool.Names.SEARCH, new SearchDfsTransportHandler());
|
||||
transportService.registerRequestHandler(QUERY_ACTION_NAME, ShardSearchTransportRequest::new, ThreadPool.Names.SEARCH, new SearchQueryTransportHandler());
|
||||
transportService.registerRequestHandler(QUERY_ID_ACTION_NAME, QuerySearchRequest::new, ThreadPool.Names.SEARCH, new SearchQueryByIdTransportHandler());
|
||||
transportService.registerRequestHandler(QUERY_SCROLL_ACTION_NAME, InternalScrollSearchRequest::new, ThreadPool.Names.SEARCH, new SearchQueryScrollTransportHandler());
|
||||
transportService.registerRequestHandler(QUERY_FETCH_ACTION_NAME, ShardSearchTransportRequest::new, ThreadPool.Names.SEARCH, new SearchQueryFetchTransportHandler());
|
||||
transportService.registerRequestHandler(QUERY_QUERY_FETCH_ACTION_NAME, QuerySearchRequest::new, ThreadPool.Names.SEARCH, new SearchQueryQueryFetchTransportHandler());
|
||||
transportService.registerRequestHandler(QUERY_FETCH_SCROLL_ACTION_NAME, InternalScrollSearchRequest::new, ThreadPool.Names.SEARCH, new SearchQueryFetchScrollTransportHandler());
|
||||
transportService.registerRequestHandler(FETCH_ID_SCROLL_ACTION_NAME, ShardFetchRequest::new, ThreadPool.Names.SEARCH, new FetchByIdTransportHandler<>());
|
||||
transportService.registerRequestHandler(FETCH_ID_ACTION_NAME, ShardFetchSearchRequest::new, ThreadPool.Names.SEARCH, new FetchByIdTransportHandler<ShardFetchSearchRequest>());
|
||||
transportService.registerRequestHandler(FREE_CONTEXT_SCROLL_ACTION_NAME, ScrollFreeContextRequest::new, ThreadPool.Names.SAME,
|
||||
new FreeContextTransportHandler<>());
|
||||
transportService.registerRequestHandler(FREE_CONTEXT_ACTION_NAME, SearchFreeContextRequest::new, ThreadPool.Names.SAME,
|
||||
new FreeContextTransportHandler<>());
|
||||
transportService.registerRequestHandler(CLEAR_SCROLL_CONTEXTS_ACTION_NAME, ClearScrollContextsRequest::new, ThreadPool.Names.SAME,
|
||||
new ClearScrollContextsTransportHandler());
|
||||
transportService.registerRequestHandler(DFS_ACTION_NAME, ShardSearchTransportRequest::new, ThreadPool.Names.SEARCH,
|
||||
new SearchDfsTransportHandler());
|
||||
transportService.registerRequestHandler(QUERY_ACTION_NAME, ShardSearchTransportRequest::new, ThreadPool.Names.SEARCH,
|
||||
new SearchQueryTransportHandler());
|
||||
transportService.registerRequestHandler(QUERY_ID_ACTION_NAME, QuerySearchRequest::new, ThreadPool.Names.SEARCH,
|
||||
new SearchQueryByIdTransportHandler());
|
||||
transportService.registerRequestHandler(QUERY_SCROLL_ACTION_NAME, InternalScrollSearchRequest::new, ThreadPool.Names.SEARCH,
|
||||
new SearchQueryScrollTransportHandler());
|
||||
transportService.registerRequestHandler(QUERY_FETCH_ACTION_NAME, ShardSearchTransportRequest::new, ThreadPool.Names.SEARCH,
|
||||
new SearchQueryFetchTransportHandler());
|
||||
transportService.registerRequestHandler(QUERY_QUERY_FETCH_ACTION_NAME, QuerySearchRequest::new, ThreadPool.Names.SEARCH,
|
||||
new SearchQueryQueryFetchTransportHandler());
|
||||
transportService.registerRequestHandler(QUERY_FETCH_SCROLL_ACTION_NAME, InternalScrollSearchRequest::new, ThreadPool.Names.SEARCH,
|
||||
new SearchQueryFetchScrollTransportHandler());
|
||||
transportService.registerRequestHandler(FETCH_ID_SCROLL_ACTION_NAME, ShardFetchRequest::new, ThreadPool.Names.SEARCH,
|
||||
new FetchByIdTransportHandler<>());
|
||||
transportService.registerRequestHandler(FETCH_ID_ACTION_NAME, ShardFetchSearchRequest::new, ThreadPool.Names.SEARCH,
|
||||
new FetchByIdTransportHandler<>());
|
||||
}
|
||||
|
||||
public void sendFreeContext(DiscoveryNode node, final long contextId, SearchRequest request) {
|
||||
transportService.sendRequest(node, FREE_CONTEXT_ACTION_NAME, new SearchFreeContextRequest(request, contextId), new ActionListenerResponseHandler<SearchFreeContextResponse>(new ActionListener<SearchFreeContextResponse>() {
|
||||
transportService.sendRequest(node, FREE_CONTEXT_ACTION_NAME, new SearchFreeContextRequest(request, contextId),
|
||||
new ActionListenerResponseHandler<SearchFreeContextResponse>(new ActionListener<SearchFreeContextResponse>() {
|
||||
@Override
|
||||
public void onResponse(SearchFreeContextResponse response) {
|
||||
// no need to respond if it was freed or not
|
||||
|
@ -114,8 +126,9 @@ public class SearchServiceTransportAction extends AbstractComponent {
|
|||
});
|
||||
}
|
||||
|
||||
public void sendFreeContext(DiscoveryNode node, long contextId, ClearScrollRequest request, final ActionListener<SearchFreeContextResponse> listener) {
|
||||
transportService.sendRequest(node, FREE_CONTEXT_SCROLL_ACTION_NAME, new ScrollFreeContextRequest(request, contextId), new ActionListenerResponseHandler<SearchFreeContextResponse>(listener) {
|
||||
public void sendFreeContext(DiscoveryNode node, long contextId, final ActionListener<SearchFreeContextResponse> listener) {
|
||||
transportService.sendRequest(node, FREE_CONTEXT_SCROLL_ACTION_NAME, new ScrollFreeContextRequest(contextId),
|
||||
new ActionListenerResponseHandler<SearchFreeContextResponse>(listener) {
|
||||
@Override
|
||||
public SearchFreeContextResponse newInstance() {
|
||||
return new SearchFreeContextResponse();
|
||||
|
@ -123,8 +136,9 @@ public class SearchServiceTransportAction extends AbstractComponent {
|
|||
});
|
||||
}
|
||||
|
||||
public void sendClearAllScrollContexts(DiscoveryNode node, ClearScrollRequest request, final ActionListener<TransportResponse> listener) {
|
||||
transportService.sendRequest(node, CLEAR_SCROLL_CONTEXTS_ACTION_NAME, new ClearScrollContextsRequest(), new ActionListenerResponseHandler<TransportResponse>(listener) {
|
||||
public void sendClearAllScrollContexts(DiscoveryNode node, final ActionListener<TransportResponse> listener) {
|
||||
transportService.sendRequest(node, CLEAR_SCROLL_CONTEXTS_ACTION_NAME, new ClearScrollContextsRequest(),
|
||||
new ActionListenerResponseHandler<TransportResponse>(listener) {
|
||||
@Override
|
||||
public TransportResponse newInstance() {
|
||||
return TransportResponse.Empty.INSTANCE;
|
||||
|
@ -132,7 +146,8 @@ public class SearchServiceTransportAction extends AbstractComponent {
|
|||
});
|
||||
}
|
||||
|
||||
public void sendExecuteDfs(DiscoveryNode node, final ShardSearchTransportRequest request, final ActionListener<DfsSearchResult> listener) {
|
||||
public void sendExecuteDfs(DiscoveryNode node, final ShardSearchTransportRequest request,
|
||||
final ActionListener<DfsSearchResult> listener) {
|
||||
transportService.sendRequest(node, DFS_ACTION_NAME, request, new ActionListenerResponseHandler<DfsSearchResult>(listener) {
|
||||
@Override
|
||||
public DfsSearchResult newInstance() {
|
||||
|
@ -141,8 +156,10 @@ public class SearchServiceTransportAction extends AbstractComponent {
|
|||
});
|
||||
}
|
||||
|
||||
public void sendExecuteQuery(DiscoveryNode node, final ShardSearchTransportRequest request, final ActionListener<QuerySearchResultProvider> listener) {
|
||||
transportService.sendRequest(node, QUERY_ACTION_NAME, request, new ActionListenerResponseHandler<QuerySearchResultProvider>(listener) {
|
||||
public void sendExecuteQuery(DiscoveryNode node, final ShardSearchTransportRequest request,
|
||||
final ActionListener<QuerySearchResultProvider> listener) {
|
||||
transportService.sendRequest(node, QUERY_ACTION_NAME, request,
|
||||
new ActionListenerResponseHandler<QuerySearchResultProvider>(listener) {
|
||||
@Override
|
||||
public QuerySearchResult newInstance() {
|
||||
return new QuerySearchResult();
|
||||
|
@ -159,8 +176,10 @@ public class SearchServiceTransportAction extends AbstractComponent {
|
|||
});
|
||||
}
|
||||
|
||||
public void sendExecuteQuery(DiscoveryNode node, final InternalScrollSearchRequest request, final ActionListener<ScrollQuerySearchResult> listener) {
|
||||
transportService.sendRequest(node, QUERY_SCROLL_ACTION_NAME, request, new ActionListenerResponseHandler<ScrollQuerySearchResult>(listener) {
|
||||
public void sendExecuteQuery(DiscoveryNode node, final InternalScrollSearchRequest request,
|
||||
final ActionListener<ScrollQuerySearchResult> listener) {
|
||||
transportService.sendRequest(node, QUERY_SCROLL_ACTION_NAME, request,
|
||||
new ActionListenerResponseHandler<ScrollQuerySearchResult>(listener) {
|
||||
@Override
|
||||
public ScrollQuerySearchResult newInstance() {
|
||||
return new ScrollQuerySearchResult();
|
||||
|
@ -168,8 +187,10 @@ public class SearchServiceTransportAction extends AbstractComponent {
|
|||
});
|
||||
}
|
||||
|
||||
public void sendExecuteFetch(DiscoveryNode node, final ShardSearchTransportRequest request, final ActionListener<QueryFetchSearchResult> listener) {
|
||||
transportService.sendRequest(node, QUERY_FETCH_ACTION_NAME, request, new ActionListenerResponseHandler<QueryFetchSearchResult>(listener) {
|
||||
public void sendExecuteFetch(DiscoveryNode node, final ShardSearchTransportRequest request,
|
||||
final ActionListener<QueryFetchSearchResult> listener) {
|
||||
transportService.sendRequest(node, QUERY_FETCH_ACTION_NAME, request,
|
||||
new ActionListenerResponseHandler<QueryFetchSearchResult>(listener) {
|
||||
@Override
|
||||
public QueryFetchSearchResult newInstance() {
|
||||
return new QueryFetchSearchResult();
|
||||
|
@ -177,8 +198,10 @@ public class SearchServiceTransportAction extends AbstractComponent {
|
|||
});
|
||||
}
|
||||
|
||||
public void sendExecuteFetch(DiscoveryNode node, final QuerySearchRequest request, final ActionListener<QueryFetchSearchResult> listener) {
|
||||
transportService.sendRequest(node, QUERY_QUERY_FETCH_ACTION_NAME, request, new ActionListenerResponseHandler<QueryFetchSearchResult>(listener) {
|
||||
public void sendExecuteFetch(DiscoveryNode node, final QuerySearchRequest request,
|
||||
final ActionListener<QueryFetchSearchResult> listener) {
|
||||
transportService.sendRequest(node, QUERY_QUERY_FETCH_ACTION_NAME, request,
|
||||
new ActionListenerResponseHandler<QueryFetchSearchResult>(listener) {
|
||||
@Override
|
||||
public QueryFetchSearchResult newInstance() {
|
||||
return new QueryFetchSearchResult();
|
||||
|
@ -186,8 +209,10 @@ public class SearchServiceTransportAction extends AbstractComponent {
|
|||
});
|
||||
}
|
||||
|
||||
public void sendExecuteFetch(DiscoveryNode node, final InternalScrollSearchRequest request, final ActionListener<ScrollQueryFetchSearchResult> listener) {
|
||||
transportService.sendRequest(node, QUERY_FETCH_SCROLL_ACTION_NAME, request, new ActionListenerResponseHandler<ScrollQueryFetchSearchResult>(listener) {
|
||||
public void sendExecuteFetch(DiscoveryNode node, final InternalScrollSearchRequest request,
|
||||
final ActionListener<ScrollQueryFetchSearchResult> listener) {
|
||||
transportService.sendRequest(node, QUERY_FETCH_SCROLL_ACTION_NAME, request,
|
||||
new ActionListenerResponseHandler<ScrollQueryFetchSearchResult>(listener) {
|
||||
@Override
|
||||
public ScrollQueryFetchSearchResult newInstance() {
|
||||
return new ScrollQueryFetchSearchResult();
|
||||
|
@ -195,15 +220,18 @@ public class SearchServiceTransportAction extends AbstractComponent {
|
|||
});
|
||||
}
|
||||
|
||||
public void sendExecuteFetch(DiscoveryNode node, final ShardFetchSearchRequest request, final ActionListener<FetchSearchResult> listener) {
|
||||
public void sendExecuteFetch(DiscoveryNode node, final ShardFetchSearchRequest request,
|
||||
final ActionListener<FetchSearchResult> listener) {
|
||||
sendExecuteFetch(node, FETCH_ID_ACTION_NAME, request, listener);
|
||||
}
|
||||
|
||||
public void sendExecuteFetchScroll(DiscoveryNode node, final ShardFetchRequest request, final ActionListener<FetchSearchResult> listener) {
|
||||
public void sendExecuteFetchScroll(DiscoveryNode node, final ShardFetchRequest request,
|
||||
final ActionListener<FetchSearchResult> listener) {
|
||||
sendExecuteFetch(node, FETCH_ID_SCROLL_ACTION_NAME, request, listener);
|
||||
}
|
||||
|
||||
private void sendExecuteFetch(DiscoveryNode node, String action, final ShardFetchRequest request, final ActionListener<FetchSearchResult> listener) {
|
||||
private void sendExecuteFetch(DiscoveryNode node, String action, final ShardFetchRequest request,
|
||||
final ActionListener<FetchSearchResult> listener) {
|
||||
transportService.sendRequest(node, action, request, new ActionListenerResponseHandler<FetchSearchResult>(listener) {
|
||||
@Override
|
||||
public FetchSearchResult newInstance() {
|
||||
|
@ -212,17 +240,13 @@ public class SearchServiceTransportAction extends AbstractComponent {
|
|||
});
|
||||
}
|
||||
|
||||
public static class ScrollFreeContextRequest extends TransportRequest {
|
||||
static class ScrollFreeContextRequest extends TransportRequest {
|
||||
private long id;
|
||||
|
||||
public ScrollFreeContextRequest() {
|
||||
ScrollFreeContextRequest() {
|
||||
}
|
||||
|
||||
ScrollFreeContextRequest(ClearScrollRequest request, long id) {
|
||||
this(id);
|
||||
}
|
||||
|
||||
private ScrollFreeContextRequest(long id) {
|
||||
ScrollFreeContextRequest(long id) {
|
||||
this.id = id;
|
||||
}
|
||||
|
||||
|
@ -243,7 +267,7 @@ public class SearchServiceTransportAction extends AbstractComponent {
|
|||
}
|
||||
}
|
||||
|
||||
public static class SearchFreeContextRequest extends ScrollFreeContextRequest implements IndicesRequest {
|
||||
static class SearchFreeContextRequest extends ScrollFreeContextRequest implements IndicesRequest {
|
||||
private OriginalIndices originalIndices;
|
||||
|
||||
public SearchFreeContextRequest() {
|
||||
|
@ -311,7 +335,8 @@ public class SearchServiceTransportAction extends AbstractComponent {
|
|||
}
|
||||
}
|
||||
|
||||
class FreeContextTransportHandler<FreeContextRequest extends ScrollFreeContextRequest> implements TransportRequestHandler<FreeContextRequest> {
|
||||
class FreeContextTransportHandler<FreeContextRequest extends ScrollFreeContextRequest>
|
||||
implements TransportRequestHandler<FreeContextRequest> {
|
||||
@Override
|
||||
public void messageReceived(FreeContextRequest request, TransportChannel channel) throws Exception {
|
||||
boolean freed = searchService.freeContext(request.id());
|
||||
|
@ -319,7 +344,7 @@ public class SearchServiceTransportAction extends AbstractComponent {
|
|||
}
|
||||
}
|
||||
|
||||
public static class ClearScrollContextsRequest extends TransportRequest {
|
||||
static class ClearScrollContextsRequest extends TransportRequest {
|
||||
}
|
||||
|
||||
class ClearScrollContextsTransportHandler implements TransportRequestHandler<ClearScrollContextsRequest> {
|
||||
|
@ -393,5 +418,4 @@ public class SearchServiceTransportAction extends AbstractComponent {
|
|||
channel.sendResponse(result);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -92,7 +92,7 @@ import org.elasticsearch.index.query.QueryBuilders;
|
|||
import org.elasticsearch.plugins.Plugin;
|
||||
import org.elasticsearch.script.Script;
|
||||
import org.elasticsearch.script.groovy.GroovyPlugin;
|
||||
import org.elasticsearch.search.action.SearchServiceTransportAction;
|
||||
import org.elasticsearch.search.action.SearchTransportService;
|
||||
import org.elasticsearch.tasks.Task;
|
||||
import org.elasticsearch.test.ESIntegTestCase;
|
||||
import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
|
||||
|
@ -580,8 +580,8 @@ public class IndicesRequestTests extends ESIntegTestCase {
|
|||
}
|
||||
|
||||
public void testSearchQueryThenFetch() throws Exception {
|
||||
interceptTransportActions(SearchServiceTransportAction.QUERY_ACTION_NAME,
|
||||
SearchServiceTransportAction.FETCH_ID_ACTION_NAME, SearchServiceTransportAction.FREE_CONTEXT_ACTION_NAME);
|
||||
interceptTransportActions(SearchTransportService.QUERY_ACTION_NAME,
|
||||
SearchTransportService.FETCH_ID_ACTION_NAME, SearchTransportService.FREE_CONTEXT_ACTION_NAME);
|
||||
|
||||
String[] randomIndicesOrAliases = randomIndicesOrAliases();
|
||||
for (int i = 0; i < randomIndicesOrAliases.length; i++) {
|
||||
|
@ -595,14 +595,14 @@ public class IndicesRequestTests extends ESIntegTestCase {
|
|||
assertThat(searchResponse.getHits().totalHits(), greaterThan(0L));
|
||||
|
||||
clearInterceptedActions();
|
||||
assertSameIndices(searchRequest, SearchServiceTransportAction.QUERY_ACTION_NAME, SearchServiceTransportAction.FETCH_ID_ACTION_NAME);
|
||||
assertSameIndices(searchRequest, SearchTransportService.QUERY_ACTION_NAME, SearchTransportService.FETCH_ID_ACTION_NAME);
|
||||
//free context messages are not necessarily sent, but if they are, check their indices
|
||||
assertSameIndicesOptionalRequests(searchRequest, SearchServiceTransportAction.FREE_CONTEXT_ACTION_NAME);
|
||||
assertSameIndicesOptionalRequests(searchRequest, SearchTransportService.FREE_CONTEXT_ACTION_NAME);
|
||||
}
|
||||
|
||||
public void testSearchDfsQueryThenFetch() throws Exception {
|
||||
interceptTransportActions(SearchServiceTransportAction.DFS_ACTION_NAME, SearchServiceTransportAction.QUERY_ID_ACTION_NAME,
|
||||
SearchServiceTransportAction.FETCH_ID_ACTION_NAME, SearchServiceTransportAction.FREE_CONTEXT_ACTION_NAME);
|
||||
interceptTransportActions(SearchTransportService.DFS_ACTION_NAME, SearchTransportService.QUERY_ID_ACTION_NAME,
|
||||
SearchTransportService.FETCH_ID_ACTION_NAME, SearchTransportService.FREE_CONTEXT_ACTION_NAME);
|
||||
|
||||
String[] randomIndicesOrAliases = randomIndicesOrAliases();
|
||||
for (int i = 0; i < randomIndicesOrAliases.length; i++) {
|
||||
|
@ -616,15 +616,15 @@ public class IndicesRequestTests extends ESIntegTestCase {
|
|||
assertThat(searchResponse.getHits().totalHits(), greaterThan(0L));
|
||||
|
||||
clearInterceptedActions();
|
||||
assertSameIndices(searchRequest, SearchServiceTransportAction.DFS_ACTION_NAME, SearchServiceTransportAction.QUERY_ID_ACTION_NAME,
|
||||
SearchServiceTransportAction.FETCH_ID_ACTION_NAME);
|
||||
assertSameIndices(searchRequest, SearchTransportService.DFS_ACTION_NAME, SearchTransportService.QUERY_ID_ACTION_NAME,
|
||||
SearchTransportService.FETCH_ID_ACTION_NAME);
|
||||
//free context messages are not necessarily sent, but if they are, check their indices
|
||||
assertSameIndicesOptionalRequests(searchRequest, SearchServiceTransportAction.FREE_CONTEXT_ACTION_NAME);
|
||||
assertSameIndicesOptionalRequests(searchRequest, SearchTransportService.FREE_CONTEXT_ACTION_NAME);
|
||||
}
|
||||
|
||||
public void testSearchQueryAndFetch() throws Exception {
|
||||
interceptTransportActions(SearchServiceTransportAction.QUERY_FETCH_ACTION_NAME,
|
||||
SearchServiceTransportAction.FREE_CONTEXT_ACTION_NAME);
|
||||
interceptTransportActions(SearchTransportService.QUERY_FETCH_ACTION_NAME,
|
||||
SearchTransportService.FREE_CONTEXT_ACTION_NAME);
|
||||
|
||||
String[] randomIndicesOrAliases = randomIndicesOrAliases();
|
||||
for (int i = 0; i < randomIndicesOrAliases.length; i++) {
|
||||
|
@ -638,14 +638,14 @@ public class IndicesRequestTests extends ESIntegTestCase {
|
|||
assertThat(searchResponse.getHits().totalHits(), greaterThan(0L));
|
||||
|
||||
clearInterceptedActions();
|
||||
assertSameIndices(searchRequest, SearchServiceTransportAction.QUERY_FETCH_ACTION_NAME);
|
||||
assertSameIndices(searchRequest, SearchTransportService.QUERY_FETCH_ACTION_NAME);
|
||||
//free context messages are not necessarily sent, but if they are, check their indices
|
||||
assertSameIndicesOptionalRequests(searchRequest, SearchServiceTransportAction.FREE_CONTEXT_ACTION_NAME);
|
||||
assertSameIndicesOptionalRequests(searchRequest, SearchTransportService.FREE_CONTEXT_ACTION_NAME);
|
||||
}
|
||||
|
||||
public void testSearchDfsQueryAndFetch() throws Exception {
|
||||
interceptTransportActions(SearchServiceTransportAction.QUERY_QUERY_FETCH_ACTION_NAME,
|
||||
SearchServiceTransportAction.FREE_CONTEXT_ACTION_NAME);
|
||||
interceptTransportActions(SearchTransportService.QUERY_QUERY_FETCH_ACTION_NAME,
|
||||
SearchTransportService.FREE_CONTEXT_ACTION_NAME);
|
||||
|
||||
String[] randomIndicesOrAliases = randomIndicesOrAliases();
|
||||
for (int i = 0; i < randomIndicesOrAliases.length; i++) {
|
||||
|
@ -659,9 +659,9 @@ public class IndicesRequestTests extends ESIntegTestCase {
|
|||
assertThat(searchResponse.getHits().totalHits(), greaterThan(0L));
|
||||
|
||||
clearInterceptedActions();
|
||||
assertSameIndices(searchRequest, SearchServiceTransportAction.QUERY_QUERY_FETCH_ACTION_NAME);
|
||||
assertSameIndices(searchRequest, SearchTransportService.QUERY_QUERY_FETCH_ACTION_NAME);
|
||||
//free context messages are not necessarily sent, but if they are, check their indices
|
||||
assertSameIndicesOptionalRequests(searchRequest, SearchServiceTransportAction.FREE_CONTEXT_ACTION_NAME);
|
||||
assertSameIndicesOptionalRequests(searchRequest, SearchTransportService.FREE_CONTEXT_ACTION_NAME);
|
||||
}
|
||||
|
||||
private static void assertSameIndices(IndicesRequest originalRequest, String... actions) {
|
||||
|
|
Loading…
Reference in New Issue