Rename SearchServiceTransportAction to SearchTransportService

The suffix TransportAction is misleading as it may make think that it extends TransportAction, but it does not. This class makes accessible the different search operations exposed by SearchService through the transport layer. Also resolved few compiler warnings in the class itself.
This commit is contained in:
javanna 2016-03-01 11:13:17 +01:00 committed by Luca Cavanna
parent 9674cbbe62
commit b7597d7aea
14 changed files with 148 additions and 126 deletions

View File

@ -746,7 +746,6 @@
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]search[/\\]MultiValueMode.java" checks="LineLength" /> <suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]search[/\\]MultiValueMode.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]search[/\\]SearchModule.java" checks="LineLength" /> <suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]search[/\\]SearchModule.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]search[/\\]SearchService.java" checks="LineLength" /> <suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]search[/\\]SearchService.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]search[/\\]action[/\\]SearchServiceTransportAction.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]search[/\\]aggregations[/\\]AggregatorFactories.java" checks="LineLength" /> <suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]search[/\\]aggregations[/\\]AggregatorFactories.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]search[/\\]aggregations[/\\]AggregatorFactory.java" checks="LineLength" /> <suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]search[/\\]aggregations[/\\]AggregatorFactory.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]search[/\\]aggregations[/\\]InternalAggregation.java" checks="LineLength" /> <suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]search[/\\]aggregations[/\\]InternalAggregation.java" checks="LineLength" />

View File

@ -39,7 +39,7 @@ import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.util.concurrent.AtomicArray; import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.search.SearchPhaseResult; import org.elasticsearch.search.SearchPhaseResult;
import org.elasticsearch.search.SearchShardTarget; import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.action.SearchServiceTransportAction; import org.elasticsearch.search.action.SearchTransportService;
import org.elasticsearch.search.controller.SearchPhaseController; import org.elasticsearch.search.controller.SearchPhaseController;
import org.elasticsearch.search.fetch.ShardFetchSearchRequest; import org.elasticsearch.search.fetch.ShardFetchSearchRequest;
import org.elasticsearch.search.internal.InternalSearchResponse; import org.elasticsearch.search.internal.InternalSearchResponse;
@ -58,7 +58,7 @@ import static org.elasticsearch.action.search.TransportSearchHelper.internalSear
abstract class AbstractSearchAsyncAction<FirstResult extends SearchPhaseResult> extends AbstractAsyncAction { abstract class AbstractSearchAsyncAction<FirstResult extends SearchPhaseResult> extends AbstractAsyncAction {
protected final ESLogger logger; protected final ESLogger logger;
protected final SearchServiceTransportAction searchService; protected final SearchTransportService searchTransportService;
private final IndexNameExpressionResolver indexNameExpressionResolver; private final IndexNameExpressionResolver indexNameExpressionResolver;
protected final SearchPhaseController searchPhaseController; protected final SearchPhaseController searchPhaseController;
protected final ThreadPool threadPool; protected final ThreadPool threadPool;
@ -76,12 +76,12 @@ abstract class AbstractSearchAsyncAction<FirstResult extends SearchPhaseResult>
private final Object shardFailuresMutex = new Object(); private final Object shardFailuresMutex = new Object();
protected volatile ScoreDoc[] sortedShardList; protected volatile ScoreDoc[] sortedShardList;
protected AbstractSearchAsyncAction(ESLogger logger, SearchServiceTransportAction searchService, ClusterService clusterService, protected AbstractSearchAsyncAction(ESLogger logger, SearchTransportService searchTransportService, ClusterService clusterService,
IndexNameExpressionResolver indexNameExpressionResolver, IndexNameExpressionResolver indexNameExpressionResolver,
SearchPhaseController searchPhaseController, ThreadPool threadPool, SearchRequest request, SearchPhaseController searchPhaseController, ThreadPool threadPool, SearchRequest request,
ActionListener<SearchResponse> listener) { ActionListener<SearchResponse> listener) {
this.logger = logger; this.logger = logger;
this.searchService = searchService; this.searchTransportService = searchTransportService;
this.indexNameExpressionResolver = indexNameExpressionResolver; this.indexNameExpressionResolver = indexNameExpressionResolver;
this.searchPhaseController = searchPhaseController; this.searchPhaseController = searchPhaseController;
this.threadPool = threadPool; this.threadPool = threadPool;
@ -332,7 +332,7 @@ abstract class AbstractSearchAsyncAction<FirstResult extends SearchPhaseResult>
protected void sendReleaseSearchContext(long contextId, DiscoveryNode node) { protected void sendReleaseSearchContext(long contextId, DiscoveryNode node) {
if (node != null) { if (node != null) {
searchService.sendFreeContext(node, contextId, request); searchTransportService.sendFreeContext(node, contextId, request);
} }
} }

View File

@ -26,7 +26,7 @@ import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.util.concurrent.AtomicArray; import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.search.action.SearchServiceTransportAction; import org.elasticsearch.search.action.SearchTransportService;
import org.elasticsearch.search.controller.SearchPhaseController; import org.elasticsearch.search.controller.SearchPhaseController;
import org.elasticsearch.search.dfs.AggregatedDfs; import org.elasticsearch.search.dfs.AggregatedDfs;
import org.elasticsearch.search.dfs.DfsSearchResult; import org.elasticsearch.search.dfs.DfsSearchResult;
@ -43,11 +43,12 @@ class SearchDfsQueryAndFetchAsyncAction extends AbstractSearchAsyncAction<DfsSea
private final AtomicArray<QueryFetchSearchResult> queryFetchResults; private final AtomicArray<QueryFetchSearchResult> queryFetchResults;
SearchDfsQueryAndFetchAsyncAction(ESLogger logger, SearchServiceTransportAction searchService, SearchDfsQueryAndFetchAsyncAction(ESLogger logger, SearchTransportService searchTransportService,
ClusterService clusterService, IndexNameExpressionResolver indexNameExpressionResolver, ClusterService clusterService, IndexNameExpressionResolver indexNameExpressionResolver,
SearchPhaseController searchPhaseController, ThreadPool threadPool, SearchPhaseController searchPhaseController, ThreadPool threadPool,
SearchRequest request, ActionListener<SearchResponse> listener) { SearchRequest request, ActionListener<SearchResponse> listener) {
super(logger, searchService, clusterService, indexNameExpressionResolver, searchPhaseController, threadPool, request, listener); super(logger, searchTransportService, clusterService, indexNameExpressionResolver, searchPhaseController, threadPool,
request, listener);
queryFetchResults = new AtomicArray<>(firstResults.length()); queryFetchResults = new AtomicArray<>(firstResults.length());
} }
@ -59,7 +60,7 @@ class SearchDfsQueryAndFetchAsyncAction extends AbstractSearchAsyncAction<DfsSea
@Override @Override
protected void sendExecuteFirstPhase(DiscoveryNode node, ShardSearchTransportRequest request, protected void sendExecuteFirstPhase(DiscoveryNode node, ShardSearchTransportRequest request,
ActionListener<DfsSearchResult> listener) { ActionListener<DfsSearchResult> listener) {
searchService.sendExecuteDfs(node, request, listener); searchTransportService.sendExecuteDfs(node, request, listener);
} }
@Override @Override
@ -77,7 +78,7 @@ class SearchDfsQueryAndFetchAsyncAction extends AbstractSearchAsyncAction<DfsSea
void executeSecondPhase(final int shardIndex, final DfsSearchResult dfsResult, final AtomicInteger counter, void executeSecondPhase(final int shardIndex, final DfsSearchResult dfsResult, final AtomicInteger counter,
final DiscoveryNode node, final QuerySearchRequest querySearchRequest) { final DiscoveryNode node, final QuerySearchRequest querySearchRequest) {
searchService.sendExecuteFetch(node, querySearchRequest, new ActionListener<QueryFetchSearchResult>() { searchTransportService.sendExecuteFetch(node, querySearchRequest, new ActionListener<QueryFetchSearchResult>() {
@Override @Override
public void onResponse(QueryFetchSearchResult result) { public void onResponse(QueryFetchSearchResult result) {
result.shardTarget(dfsResult.shardTarget()); result.shardTarget(dfsResult.shardTarget());

View File

@ -29,7 +29,7 @@ import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.util.concurrent.AtomicArray; import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.search.SearchShardTarget; import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.action.SearchServiceTransportAction; import org.elasticsearch.search.action.SearchTransportService;
import org.elasticsearch.search.controller.SearchPhaseController; import org.elasticsearch.search.controller.SearchPhaseController;
import org.elasticsearch.search.dfs.AggregatedDfs; import org.elasticsearch.search.dfs.AggregatedDfs;
import org.elasticsearch.search.dfs.DfsSearchResult; import org.elasticsearch.search.dfs.DfsSearchResult;
@ -50,11 +50,12 @@ class SearchDfsQueryThenFetchAsyncAction extends AbstractSearchAsyncAction<DfsSe
final AtomicArray<FetchSearchResult> fetchResults; final AtomicArray<FetchSearchResult> fetchResults;
final AtomicArray<IntArrayList> docIdsToLoad; final AtomicArray<IntArrayList> docIdsToLoad;
SearchDfsQueryThenFetchAsyncAction(ESLogger logger, SearchServiceTransportAction searchService, SearchDfsQueryThenFetchAsyncAction(ESLogger logger, SearchTransportService searchTransportService,
ClusterService clusterService, IndexNameExpressionResolver indexNameExpressionResolver, ClusterService clusterService, IndexNameExpressionResolver indexNameExpressionResolver,
SearchPhaseController searchPhaseController, ThreadPool threadPool, SearchPhaseController searchPhaseController, ThreadPool threadPool,
SearchRequest request, ActionListener<SearchResponse> listener) { SearchRequest request, ActionListener<SearchResponse> listener) {
super(logger, searchService, clusterService, indexNameExpressionResolver, searchPhaseController, threadPool, request, listener); super(logger, searchTransportService, clusterService, indexNameExpressionResolver, searchPhaseController, threadPool,
request, listener);
queryResults = new AtomicArray<>(firstResults.length()); queryResults = new AtomicArray<>(firstResults.length());
fetchResults = new AtomicArray<>(firstResults.length()); fetchResults = new AtomicArray<>(firstResults.length());
docIdsToLoad = new AtomicArray<>(firstResults.length()); docIdsToLoad = new AtomicArray<>(firstResults.length());
@ -68,7 +69,7 @@ class SearchDfsQueryThenFetchAsyncAction extends AbstractSearchAsyncAction<DfsSe
@Override @Override
protected void sendExecuteFirstPhase(DiscoveryNode node, ShardSearchTransportRequest request, protected void sendExecuteFirstPhase(DiscoveryNode node, ShardSearchTransportRequest request,
ActionListener<DfsSearchResult> listener) { ActionListener<DfsSearchResult> listener) {
searchService.sendExecuteDfs(node, request, listener); searchTransportService.sendExecuteDfs(node, request, listener);
} }
@Override @Override
@ -85,7 +86,7 @@ class SearchDfsQueryThenFetchAsyncAction extends AbstractSearchAsyncAction<DfsSe
void executeQuery(final int shardIndex, final DfsSearchResult dfsResult, final AtomicInteger counter, void executeQuery(final int shardIndex, final DfsSearchResult dfsResult, final AtomicInteger counter,
final QuerySearchRequest querySearchRequest, final DiscoveryNode node) { final QuerySearchRequest querySearchRequest, final DiscoveryNode node) {
searchService.sendExecuteQuery(node, querySearchRequest, new ActionListener<QuerySearchResult>() { searchTransportService.sendExecuteQuery(node, querySearchRequest, new ActionListener<QuerySearchResult>() {
@Override @Override
public void onResponse(QuerySearchResult result) { public void onResponse(QuerySearchResult result) {
result.shardTarget(dfsResult.shardTarget()); result.shardTarget(dfsResult.shardTarget());
@ -157,7 +158,7 @@ class SearchDfsQueryThenFetchAsyncAction extends AbstractSearchAsyncAction<DfsSe
void executeFetch(final int shardIndex, final SearchShardTarget shardTarget, final AtomicInteger counter, void executeFetch(final int shardIndex, final SearchShardTarget shardTarget, final AtomicInteger counter,
final ShardFetchSearchRequest fetchSearchRequest, DiscoveryNode node) { final ShardFetchSearchRequest fetchSearchRequest, DiscoveryNode node) {
searchService.sendExecuteFetch(node, fetchSearchRequest, new ActionListener<FetchSearchResult>() { searchTransportService.sendExecuteFetch(node, fetchSearchRequest, new ActionListener<FetchSearchResult>() {
@Override @Override
public void onResponse(FetchSearchResult result) { public void onResponse(FetchSearchResult result) {
result.shardTarget(shardTarget); result.shardTarget(shardTarget);

View File

@ -25,7 +25,7 @@ import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.search.action.SearchServiceTransportAction; import org.elasticsearch.search.action.SearchTransportService;
import org.elasticsearch.search.controller.SearchPhaseController; import org.elasticsearch.search.controller.SearchPhaseController;
import org.elasticsearch.search.fetch.QueryFetchSearchResult; import org.elasticsearch.search.fetch.QueryFetchSearchResult;
import org.elasticsearch.search.internal.InternalSearchResponse; import org.elasticsearch.search.internal.InternalSearchResponse;
@ -36,11 +36,12 @@ import java.io.IOException;
class SearchQueryAndFetchAsyncAction extends AbstractSearchAsyncAction<QueryFetchSearchResult> { class SearchQueryAndFetchAsyncAction extends AbstractSearchAsyncAction<QueryFetchSearchResult> {
SearchQueryAndFetchAsyncAction(ESLogger logger, SearchServiceTransportAction searchService, SearchQueryAndFetchAsyncAction(ESLogger logger, SearchTransportService searchTransportService,
ClusterService clusterService, IndexNameExpressionResolver indexNameExpressionResolver, ClusterService clusterService, IndexNameExpressionResolver indexNameExpressionResolver,
SearchPhaseController searchPhaseController, ThreadPool threadPool, SearchPhaseController searchPhaseController, ThreadPool threadPool,
SearchRequest request, ActionListener<SearchResponse> listener) { SearchRequest request, ActionListener<SearchResponse> listener) {
super(logger, searchService, clusterService, indexNameExpressionResolver, searchPhaseController, threadPool, request, listener); super(logger, searchTransportService, clusterService, indexNameExpressionResolver, searchPhaseController, threadPool,
request, listener);
} }
@Override @Override
@ -51,7 +52,7 @@ class SearchQueryAndFetchAsyncAction extends AbstractSearchAsyncAction<QueryFetc
@Override @Override
protected void sendExecuteFirstPhase(DiscoveryNode node, ShardSearchTransportRequest request, protected void sendExecuteFirstPhase(DiscoveryNode node, ShardSearchTransportRequest request,
ActionListener<QueryFetchSearchResult> listener) { ActionListener<QueryFetchSearchResult> listener) {
searchService.sendExecuteFetch(node, request, listener); searchTransportService.sendExecuteFetch(node, request, listener);
} }
@Override @Override

View File

@ -29,7 +29,7 @@ import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.util.concurrent.AtomicArray; import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.search.SearchShardTarget; import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.action.SearchServiceTransportAction; import org.elasticsearch.search.action.SearchTransportService;
import org.elasticsearch.search.controller.SearchPhaseController; import org.elasticsearch.search.controller.SearchPhaseController;
import org.elasticsearch.search.fetch.FetchSearchResult; import org.elasticsearch.search.fetch.FetchSearchResult;
import org.elasticsearch.search.fetch.ShardFetchSearchRequest; import org.elasticsearch.search.fetch.ShardFetchSearchRequest;
@ -46,7 +46,7 @@ class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction<QuerySea
final AtomicArray<FetchSearchResult> fetchResults; final AtomicArray<FetchSearchResult> fetchResults;
final AtomicArray<IntArrayList> docIdsToLoad; final AtomicArray<IntArrayList> docIdsToLoad;
SearchQueryThenFetchAsyncAction(ESLogger logger, SearchServiceTransportAction searchService, SearchQueryThenFetchAsyncAction(ESLogger logger, SearchTransportService searchService,
ClusterService clusterService, IndexNameExpressionResolver indexNameExpressionResolver, ClusterService clusterService, IndexNameExpressionResolver indexNameExpressionResolver,
SearchPhaseController searchPhaseController, ThreadPool threadPool, SearchPhaseController searchPhaseController, ThreadPool threadPool,
SearchRequest request, ActionListener<SearchResponse> listener) { SearchRequest request, ActionListener<SearchResponse> listener) {
@ -63,7 +63,7 @@ class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction<QuerySea
@Override @Override
protected void sendExecuteFirstPhase(DiscoveryNode node, ShardSearchTransportRequest request, protected void sendExecuteFirstPhase(DiscoveryNode node, ShardSearchTransportRequest request,
ActionListener<QuerySearchResultProvider> listener) { ActionListener<QuerySearchResultProvider> listener) {
searchService.sendExecuteQuery(node, request, listener); searchTransportService.sendExecuteQuery(node, request, listener);
} }
@Override @Override
@ -91,7 +91,7 @@ class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction<QuerySea
void executeFetch(final int shardIndex, final SearchShardTarget shardTarget, final AtomicInteger counter, void executeFetch(final int shardIndex, final SearchShardTarget shardTarget, final AtomicInteger counter,
final ShardFetchSearchRequest fetchSearchRequest, DiscoveryNode node) { final ShardFetchSearchRequest fetchSearchRequest, DiscoveryNode node) {
searchService.sendExecuteFetch(node, fetchSearchRequest, new ActionListener<FetchSearchResult>() { searchTransportService.sendExecuteFetch(node, fetchSearchRequest, new ActionListener<FetchSearchResult>() {
@Override @Override
public void onResponse(FetchSearchResult result) { public void onResponse(FetchSearchResult result) {
result.shardTarget(shardTarget); result.shardTarget(shardTarget);

View File

@ -26,7 +26,7 @@ import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.util.concurrent.AtomicArray; import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.search.action.SearchServiceTransportAction; import org.elasticsearch.search.action.SearchTransportService;
import org.elasticsearch.search.controller.SearchPhaseController; import org.elasticsearch.search.controller.SearchPhaseController;
import org.elasticsearch.search.fetch.QueryFetchSearchResult; import org.elasticsearch.search.fetch.QueryFetchSearchResult;
import org.elasticsearch.search.fetch.ScrollQueryFetchSearchResult; import org.elasticsearch.search.fetch.ScrollQueryFetchSearchResult;
@ -42,7 +42,7 @@ class SearchScrollQueryAndFetchAsyncAction extends AbstractAsyncAction {
private final ESLogger logger; private final ESLogger logger;
private final SearchPhaseController searchPhaseController; private final SearchPhaseController searchPhaseController;
private final SearchServiceTransportAction searchService; private final SearchTransportService searchTransportService;
private final SearchScrollRequest request; private final SearchScrollRequest request;
private final ActionListener<SearchResponse> listener; private final ActionListener<SearchResponse> listener;
private final ParsedScrollId scrollId; private final ParsedScrollId scrollId;
@ -53,11 +53,11 @@ class SearchScrollQueryAndFetchAsyncAction extends AbstractAsyncAction {
private final AtomicInteger counter; private final AtomicInteger counter;
SearchScrollQueryAndFetchAsyncAction(ESLogger logger, ClusterService clusterService, SearchScrollQueryAndFetchAsyncAction(ESLogger logger, ClusterService clusterService,
SearchServiceTransportAction searchService, SearchPhaseController searchPhaseController, SearchTransportService searchTransportService, SearchPhaseController searchPhaseController,
SearchScrollRequest request, ParsedScrollId scrollId, ActionListener<SearchResponse> listener) { SearchScrollRequest request, ParsedScrollId scrollId, ActionListener<SearchResponse> listener) {
this.logger = logger; this.logger = logger;
this.searchPhaseController = searchPhaseController; this.searchPhaseController = searchPhaseController;
this.searchService = searchService; this.searchTransportService = searchTransportService;
this.request = request; this.request = request;
this.listener = listener; this.listener = listener;
this.scrollId = scrollId; this.scrollId = scrollId;
@ -128,7 +128,7 @@ class SearchScrollQueryAndFetchAsyncAction extends AbstractAsyncAction {
void executePhase(final int shardIndex, DiscoveryNode node, final long searchId) { void executePhase(final int shardIndex, DiscoveryNode node, final long searchId) {
InternalScrollSearchRequest internalRequest = internalScrollSearchRequest(searchId, request); InternalScrollSearchRequest internalRequest = internalScrollSearchRequest(searchId, request);
searchService.sendExecuteFetch(node, internalRequest, new ActionListener<ScrollQueryFetchSearchResult>() { searchTransportService.sendExecuteFetch(node, internalRequest, new ActionListener<ScrollQueryFetchSearchResult>() {
@Override @Override
public void onResponse(ScrollQueryFetchSearchResult result) { public void onResponse(ScrollQueryFetchSearchResult result) {
queryFetchResults.set(shardIndex, result.result()); queryFetchResults.set(shardIndex, result.result());

View File

@ -27,7 +27,7 @@ import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.util.concurrent.AtomicArray; import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.search.action.SearchServiceTransportAction; import org.elasticsearch.search.action.SearchTransportService;
import org.elasticsearch.search.controller.SearchPhaseController; import org.elasticsearch.search.controller.SearchPhaseController;
import org.elasticsearch.search.fetch.FetchSearchResult; import org.elasticsearch.search.fetch.FetchSearchResult;
import org.elasticsearch.search.fetch.ShardFetchRequest; import org.elasticsearch.search.fetch.ShardFetchRequest;
@ -44,7 +44,7 @@ import static org.elasticsearch.action.search.TransportSearchHelper.internalScro
class SearchScrollQueryThenFetchAsyncAction extends AbstractAsyncAction { class SearchScrollQueryThenFetchAsyncAction extends AbstractAsyncAction {
private final ESLogger logger; private final ESLogger logger;
private final SearchServiceTransportAction searchService; private final SearchTransportService searchTransportService;
private final SearchPhaseController searchPhaseController; private final SearchPhaseController searchPhaseController;
private final SearchScrollRequest request; private final SearchScrollRequest request;
private final ActionListener<SearchResponse> listener; private final ActionListener<SearchResponse> listener;
@ -57,10 +57,10 @@ class SearchScrollQueryThenFetchAsyncAction extends AbstractAsyncAction {
private final AtomicInteger successfulOps; private final AtomicInteger successfulOps;
SearchScrollQueryThenFetchAsyncAction(ESLogger logger, ClusterService clusterService, SearchScrollQueryThenFetchAsyncAction(ESLogger logger, ClusterService clusterService,
SearchServiceTransportAction searchService, SearchPhaseController searchPhaseController, SearchTransportService searchTransportService, SearchPhaseController searchPhaseController,
SearchScrollRequest request, ParsedScrollId scrollId, ActionListener<SearchResponse> listener) { SearchScrollRequest request, ParsedScrollId scrollId, ActionListener<SearchResponse> listener) {
this.logger = logger; this.logger = logger;
this.searchService = searchService; this.searchTransportService = searchTransportService;
this.searchPhaseController = searchPhaseController; this.searchPhaseController = searchPhaseController;
this.request = request; this.request = request;
this.listener = listener; this.listener = listener;
@ -124,7 +124,7 @@ class SearchScrollQueryThenFetchAsyncAction extends AbstractAsyncAction {
private void executeQueryPhase(final int shardIndex, final AtomicInteger counter, DiscoveryNode node, final long searchId) { private void executeQueryPhase(final int shardIndex, final AtomicInteger counter, DiscoveryNode node, final long searchId) {
InternalScrollSearchRequest internalRequest = internalScrollSearchRequest(searchId, request); InternalScrollSearchRequest internalRequest = internalScrollSearchRequest(searchId, request);
searchService.sendExecuteQuery(node, internalRequest, new ActionListener<ScrollQuerySearchResult>() { searchTransportService.sendExecuteQuery(node, internalRequest, new ActionListener<ScrollQuerySearchResult>() {
@Override @Override
public void onResponse(ScrollQuerySearchResult result) { public void onResponse(ScrollQuerySearchResult result) {
queryResults.set(shardIndex, result.queryResult()); queryResults.set(shardIndex, result.queryResult());
@ -182,7 +182,7 @@ class SearchScrollQueryThenFetchAsyncAction extends AbstractAsyncAction {
ScoreDoc lastEmittedDoc = lastEmittedDocPerShard[entry.index]; ScoreDoc lastEmittedDoc = lastEmittedDocPerShard[entry.index];
ShardFetchRequest shardFetchRequest = new ShardFetchRequest(querySearchResult.id(), docIds, lastEmittedDoc); ShardFetchRequest shardFetchRequest = new ShardFetchRequest(querySearchResult.id(), docIds, lastEmittedDoc);
DiscoveryNode node = nodes.get(querySearchResult.shardTarget().nodeId()); DiscoveryNode node = nodes.get(querySearchResult.shardTarget().nodeId());
searchService.sendExecuteFetchScroll(node, shardFetchRequest, new ActionListener<FetchSearchResult>() { searchTransportService.sendExecuteFetchScroll(node, shardFetchRequest, new ActionListener<FetchSearchResult>() {
@Override @Override
public void onResponse(FetchSearchResult result) { public void onResponse(FetchSearchResult result) {
result.shardTarget(querySearchResult.shardTarget()); result.shardTarget(querySearchResult.shardTarget());

View File

@ -30,7 +30,7 @@ import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.CountDown; import org.elasticsearch.common.util.concurrent.CountDown;
import org.elasticsearch.search.action.SearchServiceTransportAction; import org.elasticsearch.search.action.SearchTransportService;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportResponse; import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.TransportService;
@ -47,15 +47,15 @@ import static org.elasticsearch.action.search.TransportSearchHelper.parseScrollI
public class TransportClearScrollAction extends HandledTransportAction<ClearScrollRequest, ClearScrollResponse> { public class TransportClearScrollAction extends HandledTransportAction<ClearScrollRequest, ClearScrollResponse> {
private final ClusterService clusterService; private final ClusterService clusterService;
private final SearchServiceTransportAction searchServiceTransportAction; private final SearchTransportService searchTransportService;
@Inject @Inject
public TransportClearScrollAction(Settings settings, TransportService transportService, ThreadPool threadPool, public TransportClearScrollAction(Settings settings, TransportService transportService, ThreadPool threadPool,
ClusterService clusterService, SearchServiceTransportAction searchServiceTransportAction, ClusterService clusterService, SearchTransportService searchTransportService,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) { ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) {
super(settings, ClearScrollAction.NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver, ClearScrollRequest::new); super(settings, ClearScrollAction.NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver, ClearScrollRequest::new);
this.clusterService = clusterService; this.clusterService = clusterService;
this.searchServiceTransportAction = searchServiceTransportAction; this.searchTransportService = searchTransportService;
} }
@Override @Override
@ -64,10 +64,8 @@ public class TransportClearScrollAction extends HandledTransportAction<ClearScro
} }
private class Async { private class Async {
final DiscoveryNodes nodes; final DiscoveryNodes nodes;
final CountDown expectedOps; final CountDown expectedOps;
final ClearScrollRequest request;
final List<ScrollIdForNode[]> contexts = new ArrayList<>(); final List<ScrollIdForNode[]> contexts = new ArrayList<>();
final ActionListener<ClearScrollResponse> listener; final ActionListener<ClearScrollResponse> listener;
final AtomicReference<Throwable> expHolder; final AtomicReference<Throwable> expHolder;
@ -85,8 +83,6 @@ public class TransportClearScrollAction extends HandledTransportAction<ClearScro
this.contexts.add(context); this.contexts.add(context);
} }
} }
this.request = request;
this.listener = listener; this.listener = listener;
this.expHolder = new AtomicReference<>(); this.expHolder = new AtomicReference<>();
this.expectedOps = new CountDown(expectedOps); this.expectedOps = new CountDown(expectedOps);
@ -100,7 +96,7 @@ public class TransportClearScrollAction extends HandledTransportAction<ClearScro
if (contexts.isEmpty()) { if (contexts.isEmpty()) {
for (final DiscoveryNode node : nodes) { for (final DiscoveryNode node : nodes) {
searchServiceTransportAction.sendClearAllScrollContexts(node, request, new ActionListener<TransportResponse>() { searchTransportService.sendClearAllScrollContexts(node, new ActionListener<TransportResponse>() {
@Override @Override
public void onResponse(TransportResponse response) { public void onResponse(TransportResponse response) {
onFreedContext(true); onFreedContext(true);
@ -121,9 +117,9 @@ public class TransportClearScrollAction extends HandledTransportAction<ClearScro
continue; continue;
} }
searchServiceTransportAction.sendFreeContext(node, target.getScrollId(), request, new ActionListener<SearchServiceTransportAction.SearchFreeContextResponse>() { searchTransportService.sendFreeContext(node, target.getScrollId(), new ActionListener<SearchTransportService.SearchFreeContextResponse>() {
@Override @Override
public void onResponse(SearchServiceTransportAction.SearchFreeContextResponse freed) { public void onResponse(SearchTransportService.SearchFreeContextResponse freed) {
onFreedContext(freed.isFreed()); onFreedContext(freed.isFreed());
} }

View File

@ -29,7 +29,7 @@ import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.indices.IndexClosedException; import org.elasticsearch.indices.IndexClosedException;
import org.elasticsearch.search.action.SearchServiceTransportAction; import org.elasticsearch.search.action.SearchTransportService;
import org.elasticsearch.search.controller.SearchPhaseController; import org.elasticsearch.search.controller.SearchPhaseController;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.TransportService;
@ -45,17 +45,17 @@ import static org.elasticsearch.action.search.SearchType.QUERY_AND_FETCH;
public class TransportSearchAction extends HandledTransportAction<SearchRequest, SearchResponse> { public class TransportSearchAction extends HandledTransportAction<SearchRequest, SearchResponse> {
private final ClusterService clusterService; private final ClusterService clusterService;
private final SearchServiceTransportAction searchService; private final SearchTransportService searchTransportService;
private final SearchPhaseController searchPhaseController; private final SearchPhaseController searchPhaseController;
@Inject @Inject
public TransportSearchAction(Settings settings, ThreadPool threadPool, SearchPhaseController searchPhaseController, public TransportSearchAction(Settings settings, ThreadPool threadPool, SearchPhaseController searchPhaseController,
TransportService transportService, SearchServiceTransportAction searchService, TransportService transportService, SearchTransportService searchTransportService,
ClusterService clusterService, ActionFilters actionFilters, IndexNameExpressionResolver ClusterService clusterService, ActionFilters actionFilters, IndexNameExpressionResolver
indexNameExpressionResolver) { indexNameExpressionResolver) {
super(settings, SearchAction.NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver, SearchRequest::new); super(settings, SearchAction.NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver, SearchRequest::new);
this.searchPhaseController = searchPhaseController; this.searchPhaseController = searchPhaseController;
this.searchService = searchService; this.searchTransportService = searchTransportService;
this.clusterService = clusterService; this.clusterService = clusterService;
} }
@ -81,19 +81,19 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
AbstractSearchAsyncAction searchAsyncAction; AbstractSearchAsyncAction searchAsyncAction;
switch(searchRequest.searchType()) { switch(searchRequest.searchType()) {
case DFS_QUERY_THEN_FETCH: case DFS_QUERY_THEN_FETCH:
searchAsyncAction = new SearchDfsQueryThenFetchAsyncAction(logger, searchService, clusterService, searchAsyncAction = new SearchDfsQueryThenFetchAsyncAction(logger, searchTransportService, clusterService,
indexNameExpressionResolver, searchPhaseController, threadPool, searchRequest, listener); indexNameExpressionResolver, searchPhaseController, threadPool, searchRequest, listener);
break; break;
case QUERY_THEN_FETCH: case QUERY_THEN_FETCH:
searchAsyncAction = new SearchQueryThenFetchAsyncAction(logger, searchService, clusterService, searchAsyncAction = new SearchQueryThenFetchAsyncAction(logger, searchTransportService, clusterService,
indexNameExpressionResolver, searchPhaseController, threadPool, searchRequest, listener); indexNameExpressionResolver, searchPhaseController, threadPool, searchRequest, listener);
break; break;
case DFS_QUERY_AND_FETCH: case DFS_QUERY_AND_FETCH:
searchAsyncAction = new SearchDfsQueryAndFetchAsyncAction(logger, searchService, clusterService, searchAsyncAction = new SearchDfsQueryAndFetchAsyncAction(logger, searchTransportService, clusterService,
indexNameExpressionResolver, searchPhaseController, threadPool, searchRequest, listener); indexNameExpressionResolver, searchPhaseController, threadPool, searchRequest, listener);
break; break;
case QUERY_AND_FETCH: case QUERY_AND_FETCH:
searchAsyncAction = new SearchQueryAndFetchAsyncAction(logger, searchService, clusterService, searchAsyncAction = new SearchQueryAndFetchAsyncAction(logger, searchTransportService, clusterService,
indexNameExpressionResolver, searchPhaseController, threadPool, searchRequest, listener); indexNameExpressionResolver, searchPhaseController, threadPool, searchRequest, listener);
break; break;
default: default:

View File

@ -26,7 +26,7 @@ import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.search.action.SearchServiceTransportAction; import org.elasticsearch.search.action.SearchTransportService;
import org.elasticsearch.search.controller.SearchPhaseController; import org.elasticsearch.search.controller.SearchPhaseController;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.TransportService;
@ -41,18 +41,18 @@ import static org.elasticsearch.action.search.TransportSearchHelper.parseScrollI
public class TransportSearchScrollAction extends HandledTransportAction<SearchScrollRequest, SearchResponse> { public class TransportSearchScrollAction extends HandledTransportAction<SearchScrollRequest, SearchResponse> {
private final ClusterService clusterService; private final ClusterService clusterService;
private final SearchServiceTransportAction searchService; private final SearchTransportService searchTransportService;
private final SearchPhaseController searchPhaseController; private final SearchPhaseController searchPhaseController;
@Inject @Inject
public TransportSearchScrollAction(Settings settings, ThreadPool threadPool, TransportService transportService, public TransportSearchScrollAction(Settings settings, ThreadPool threadPool, TransportService transportService,
ClusterService clusterService, SearchServiceTransportAction searchService, ClusterService clusterService, SearchTransportService searchTransportService,
SearchPhaseController searchPhaseController, SearchPhaseController searchPhaseController,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) { ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) {
super(settings, SearchScrollAction.NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver, super(settings, SearchScrollAction.NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver,
SearchScrollRequest::new); SearchScrollRequest::new);
this.clusterService = clusterService; this.clusterService = clusterService;
this.searchService = searchService; this.searchTransportService = searchTransportService;
this.searchPhaseController = searchPhaseController; this.searchPhaseController = searchPhaseController;
} }
@ -63,11 +63,11 @@ public class TransportSearchScrollAction extends HandledTransportAction<SearchSc
AbstractAsyncAction action; AbstractAsyncAction action;
switch (scrollId.getType()) { switch (scrollId.getType()) {
case QUERY_THEN_FETCH_TYPE: case QUERY_THEN_FETCH_TYPE:
action = new SearchScrollQueryThenFetchAsyncAction(logger, clusterService, searchService, action = new SearchScrollQueryThenFetchAsyncAction(logger, clusterService, searchTransportService,
searchPhaseController, request, scrollId, listener); searchPhaseController, request, scrollId, listener);
break; break;
case QUERY_AND_FETCH_TYPE: case QUERY_AND_FETCH_TYPE:
action = new SearchScrollQueryAndFetchAsyncAction(logger, clusterService, searchService, action = new SearchScrollQueryAndFetchAsyncAction(logger, clusterService, searchTransportService,
searchPhaseController, request, scrollId, listener); searchPhaseController, request, scrollId, listener);
break; break;
default: default:

View File

@ -96,7 +96,7 @@ import org.elasticsearch.index.query.functionscore.random.RandomScoreFunctionPar
import org.elasticsearch.index.query.functionscore.script.ScriptScoreFunctionParser; import org.elasticsearch.index.query.functionscore.script.ScriptScoreFunctionParser;
import org.elasticsearch.index.query.functionscore.weight.WeightBuilder; import org.elasticsearch.index.query.functionscore.weight.WeightBuilder;
import org.elasticsearch.indices.query.IndicesQueriesRegistry; import org.elasticsearch.indices.query.IndicesQueriesRegistry;
import org.elasticsearch.search.action.SearchServiceTransportAction; import org.elasticsearch.search.action.SearchTransportService;
import org.elasticsearch.search.aggregations.AggregationBinaryParseElement; import org.elasticsearch.search.aggregations.AggregationBinaryParseElement;
import org.elasticsearch.search.aggregations.AggregationParseElement; import org.elasticsearch.search.aggregations.AggregationParseElement;
import org.elasticsearch.search.aggregations.AggregationPhase; import org.elasticsearch.search.aggregations.AggregationPhase;
@ -445,7 +445,7 @@ public class SearchModule extends AbstractModule {
bind(QueryPhase.class).asEagerSingleton(); bind(QueryPhase.class).asEagerSingleton();
bind(SearchPhaseController.class).asEagerSingleton(); bind(SearchPhaseController.class).asEagerSingleton();
bind(FetchPhase.class).asEagerSingleton(); bind(FetchPhase.class).asEagerSingleton();
bind(SearchServiceTransportAction.class).asEagerSingleton(); bind(SearchTransportService.class).asEagerSingleton();
if (searchServiceImpl == SearchService.class) { if (searchServiceImpl == SearchService.class) {
bind(SearchService.class).asEagerSingleton(); bind(SearchService.class).asEagerSingleton();
} else { } else {

View File

@ -23,7 +23,6 @@ import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionListenerResponseHandler; import org.elasticsearch.action.ActionListenerResponseHandler;
import org.elasticsearch.action.IndicesRequest; import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.OriginalIndices; import org.elasticsearch.action.OriginalIndices;
import org.elasticsearch.action.search.ClearScrollRequest;
import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNode;
@ -58,7 +57,7 @@ import java.io.IOException;
* An encapsulation of {@link org.elasticsearch.search.SearchService} operations exposed through * An encapsulation of {@link org.elasticsearch.search.SearchService} operations exposed through
* transport. * transport.
*/ */
public class SearchServiceTransportAction extends AbstractComponent { public class SearchTransportService extends AbstractComponent {
public static final String FREE_CONTEXT_SCROLL_ACTION_NAME = "indices:data/read/search[free_context/scroll]"; public static final String FREE_CONTEXT_SCROLL_ACTION_NAME = "indices:data/read/search[free_context/scroll]";
public static final String FREE_CONTEXT_ACTION_NAME = "indices:data/read/search[free_context]"; public static final String FREE_CONTEXT_ACTION_NAME = "indices:data/read/search[free_context]";
@ -77,26 +76,39 @@ public class SearchServiceTransportAction extends AbstractComponent {
private final SearchService searchService; private final SearchService searchService;
@Inject @Inject
public SearchServiceTransportAction(Settings settings, TransportService transportService, SearchService searchService) { public SearchTransportService(Settings settings, TransportService transportService, SearchService searchService) {
super(settings); super(settings);
this.transportService = transportService; this.transportService = transportService;
this.searchService = searchService; this.searchService = searchService;
transportService.registerRequestHandler(FREE_CONTEXT_SCROLL_ACTION_NAME, ScrollFreeContextRequest::new, ThreadPool.Names.SAME, new FreeContextTransportHandler<>()); transportService.registerRequestHandler(FREE_CONTEXT_SCROLL_ACTION_NAME, ScrollFreeContextRequest::new, ThreadPool.Names.SAME,
transportService.registerRequestHandler(FREE_CONTEXT_ACTION_NAME, SearchFreeContextRequest::new, ThreadPool.Names.SAME, new FreeContextTransportHandler<SearchFreeContextRequest>()); new FreeContextTransportHandler<>());
transportService.registerRequestHandler(CLEAR_SCROLL_CONTEXTS_ACTION_NAME, ClearScrollContextsRequest::new, ThreadPool.Names.SAME, new ClearScrollContextsTransportHandler()); transportService.registerRequestHandler(FREE_CONTEXT_ACTION_NAME, SearchFreeContextRequest::new, ThreadPool.Names.SAME,
transportService.registerRequestHandler(DFS_ACTION_NAME, ShardSearchTransportRequest::new, ThreadPool.Names.SEARCH, new SearchDfsTransportHandler()); new FreeContextTransportHandler<>());
transportService.registerRequestHandler(QUERY_ACTION_NAME, ShardSearchTransportRequest::new, ThreadPool.Names.SEARCH, new SearchQueryTransportHandler()); transportService.registerRequestHandler(CLEAR_SCROLL_CONTEXTS_ACTION_NAME, ClearScrollContextsRequest::new, ThreadPool.Names.SAME,
transportService.registerRequestHandler(QUERY_ID_ACTION_NAME, QuerySearchRequest::new, ThreadPool.Names.SEARCH, new SearchQueryByIdTransportHandler()); new ClearScrollContextsTransportHandler());
transportService.registerRequestHandler(QUERY_SCROLL_ACTION_NAME, InternalScrollSearchRequest::new, ThreadPool.Names.SEARCH, new SearchQueryScrollTransportHandler()); transportService.registerRequestHandler(DFS_ACTION_NAME, ShardSearchTransportRequest::new, ThreadPool.Names.SEARCH,
transportService.registerRequestHandler(QUERY_FETCH_ACTION_NAME, ShardSearchTransportRequest::new, ThreadPool.Names.SEARCH, new SearchQueryFetchTransportHandler()); new SearchDfsTransportHandler());
transportService.registerRequestHandler(QUERY_QUERY_FETCH_ACTION_NAME, QuerySearchRequest::new, ThreadPool.Names.SEARCH, new SearchQueryQueryFetchTransportHandler()); transportService.registerRequestHandler(QUERY_ACTION_NAME, ShardSearchTransportRequest::new, ThreadPool.Names.SEARCH,
transportService.registerRequestHandler(QUERY_FETCH_SCROLL_ACTION_NAME, InternalScrollSearchRequest::new, ThreadPool.Names.SEARCH, new SearchQueryFetchScrollTransportHandler()); new SearchQueryTransportHandler());
transportService.registerRequestHandler(FETCH_ID_SCROLL_ACTION_NAME, ShardFetchRequest::new, ThreadPool.Names.SEARCH, new FetchByIdTransportHandler<>()); transportService.registerRequestHandler(QUERY_ID_ACTION_NAME, QuerySearchRequest::new, ThreadPool.Names.SEARCH,
transportService.registerRequestHandler(FETCH_ID_ACTION_NAME, ShardFetchSearchRequest::new, ThreadPool.Names.SEARCH, new FetchByIdTransportHandler<ShardFetchSearchRequest>()); new SearchQueryByIdTransportHandler());
transportService.registerRequestHandler(QUERY_SCROLL_ACTION_NAME, InternalScrollSearchRequest::new, ThreadPool.Names.SEARCH,
new SearchQueryScrollTransportHandler());
transportService.registerRequestHandler(QUERY_FETCH_ACTION_NAME, ShardSearchTransportRequest::new, ThreadPool.Names.SEARCH,
new SearchQueryFetchTransportHandler());
transportService.registerRequestHandler(QUERY_QUERY_FETCH_ACTION_NAME, QuerySearchRequest::new, ThreadPool.Names.SEARCH,
new SearchQueryQueryFetchTransportHandler());
transportService.registerRequestHandler(QUERY_FETCH_SCROLL_ACTION_NAME, InternalScrollSearchRequest::new, ThreadPool.Names.SEARCH,
new SearchQueryFetchScrollTransportHandler());
transportService.registerRequestHandler(FETCH_ID_SCROLL_ACTION_NAME, ShardFetchRequest::new, ThreadPool.Names.SEARCH,
new FetchByIdTransportHandler<>());
transportService.registerRequestHandler(FETCH_ID_ACTION_NAME, ShardFetchSearchRequest::new, ThreadPool.Names.SEARCH,
new FetchByIdTransportHandler<>());
} }
public void sendFreeContext(DiscoveryNode node, final long contextId, SearchRequest request) { public void sendFreeContext(DiscoveryNode node, final long contextId, SearchRequest request) {
transportService.sendRequest(node, FREE_CONTEXT_ACTION_NAME, new SearchFreeContextRequest(request, contextId), new ActionListenerResponseHandler<SearchFreeContextResponse>(new ActionListener<SearchFreeContextResponse>() { transportService.sendRequest(node, FREE_CONTEXT_ACTION_NAME, new SearchFreeContextRequest(request, contextId),
new ActionListenerResponseHandler<SearchFreeContextResponse>(new ActionListener<SearchFreeContextResponse>() {
@Override @Override
public void onResponse(SearchFreeContextResponse response) { public void onResponse(SearchFreeContextResponse response) {
// no need to respond if it was freed or not // no need to respond if it was freed or not
@ -114,8 +126,9 @@ public class SearchServiceTransportAction extends AbstractComponent {
}); });
} }
public void sendFreeContext(DiscoveryNode node, long contextId, ClearScrollRequest request, final ActionListener<SearchFreeContextResponse> listener) { public void sendFreeContext(DiscoveryNode node, long contextId, final ActionListener<SearchFreeContextResponse> listener) {
transportService.sendRequest(node, FREE_CONTEXT_SCROLL_ACTION_NAME, new ScrollFreeContextRequest(request, contextId), new ActionListenerResponseHandler<SearchFreeContextResponse>(listener) { transportService.sendRequest(node, FREE_CONTEXT_SCROLL_ACTION_NAME, new ScrollFreeContextRequest(contextId),
new ActionListenerResponseHandler<SearchFreeContextResponse>(listener) {
@Override @Override
public SearchFreeContextResponse newInstance() { public SearchFreeContextResponse newInstance() {
return new SearchFreeContextResponse(); return new SearchFreeContextResponse();
@ -123,8 +136,9 @@ public class SearchServiceTransportAction extends AbstractComponent {
}); });
} }
public void sendClearAllScrollContexts(DiscoveryNode node, ClearScrollRequest request, final ActionListener<TransportResponse> listener) { public void sendClearAllScrollContexts(DiscoveryNode node, final ActionListener<TransportResponse> listener) {
transportService.sendRequest(node, CLEAR_SCROLL_CONTEXTS_ACTION_NAME, new ClearScrollContextsRequest(), new ActionListenerResponseHandler<TransportResponse>(listener) { transportService.sendRequest(node, CLEAR_SCROLL_CONTEXTS_ACTION_NAME, new ClearScrollContextsRequest(),
new ActionListenerResponseHandler<TransportResponse>(listener) {
@Override @Override
public TransportResponse newInstance() { public TransportResponse newInstance() {
return TransportResponse.Empty.INSTANCE; return TransportResponse.Empty.INSTANCE;
@ -132,7 +146,8 @@ public class SearchServiceTransportAction extends AbstractComponent {
}); });
} }
public void sendExecuteDfs(DiscoveryNode node, final ShardSearchTransportRequest request, final ActionListener<DfsSearchResult> listener) { public void sendExecuteDfs(DiscoveryNode node, final ShardSearchTransportRequest request,
final ActionListener<DfsSearchResult> listener) {
transportService.sendRequest(node, DFS_ACTION_NAME, request, new ActionListenerResponseHandler<DfsSearchResult>(listener) { transportService.sendRequest(node, DFS_ACTION_NAME, request, new ActionListenerResponseHandler<DfsSearchResult>(listener) {
@Override @Override
public DfsSearchResult newInstance() { public DfsSearchResult newInstance() {
@ -141,8 +156,10 @@ public class SearchServiceTransportAction extends AbstractComponent {
}); });
} }
public void sendExecuteQuery(DiscoveryNode node, final ShardSearchTransportRequest request, final ActionListener<QuerySearchResultProvider> listener) { public void sendExecuteQuery(DiscoveryNode node, final ShardSearchTransportRequest request,
transportService.sendRequest(node, QUERY_ACTION_NAME, request, new ActionListenerResponseHandler<QuerySearchResultProvider>(listener) { final ActionListener<QuerySearchResultProvider> listener) {
transportService.sendRequest(node, QUERY_ACTION_NAME, request,
new ActionListenerResponseHandler<QuerySearchResultProvider>(listener) {
@Override @Override
public QuerySearchResult newInstance() { public QuerySearchResult newInstance() {
return new QuerySearchResult(); return new QuerySearchResult();
@ -159,8 +176,10 @@ public class SearchServiceTransportAction extends AbstractComponent {
}); });
} }
public void sendExecuteQuery(DiscoveryNode node, final InternalScrollSearchRequest request, final ActionListener<ScrollQuerySearchResult> listener) { public void sendExecuteQuery(DiscoveryNode node, final InternalScrollSearchRequest request,
transportService.sendRequest(node, QUERY_SCROLL_ACTION_NAME, request, new ActionListenerResponseHandler<ScrollQuerySearchResult>(listener) { final ActionListener<ScrollQuerySearchResult> listener) {
transportService.sendRequest(node, QUERY_SCROLL_ACTION_NAME, request,
new ActionListenerResponseHandler<ScrollQuerySearchResult>(listener) {
@Override @Override
public ScrollQuerySearchResult newInstance() { public ScrollQuerySearchResult newInstance() {
return new ScrollQuerySearchResult(); return new ScrollQuerySearchResult();
@ -168,8 +187,10 @@ public class SearchServiceTransportAction extends AbstractComponent {
}); });
} }
public void sendExecuteFetch(DiscoveryNode node, final ShardSearchTransportRequest request, final ActionListener<QueryFetchSearchResult> listener) { public void sendExecuteFetch(DiscoveryNode node, final ShardSearchTransportRequest request,
transportService.sendRequest(node, QUERY_FETCH_ACTION_NAME, request, new ActionListenerResponseHandler<QueryFetchSearchResult>(listener) { final ActionListener<QueryFetchSearchResult> listener) {
transportService.sendRequest(node, QUERY_FETCH_ACTION_NAME, request,
new ActionListenerResponseHandler<QueryFetchSearchResult>(listener) {
@Override @Override
public QueryFetchSearchResult newInstance() { public QueryFetchSearchResult newInstance() {
return new QueryFetchSearchResult(); return new QueryFetchSearchResult();
@ -177,8 +198,10 @@ public class SearchServiceTransportAction extends AbstractComponent {
}); });
} }
public void sendExecuteFetch(DiscoveryNode node, final QuerySearchRequest request, final ActionListener<QueryFetchSearchResult> listener) { public void sendExecuteFetch(DiscoveryNode node, final QuerySearchRequest request,
transportService.sendRequest(node, QUERY_QUERY_FETCH_ACTION_NAME, request, new ActionListenerResponseHandler<QueryFetchSearchResult>(listener) { final ActionListener<QueryFetchSearchResult> listener) {
transportService.sendRequest(node, QUERY_QUERY_FETCH_ACTION_NAME, request,
new ActionListenerResponseHandler<QueryFetchSearchResult>(listener) {
@Override @Override
public QueryFetchSearchResult newInstance() { public QueryFetchSearchResult newInstance() {
return new QueryFetchSearchResult(); return new QueryFetchSearchResult();
@ -186,8 +209,10 @@ public class SearchServiceTransportAction extends AbstractComponent {
}); });
} }
public void sendExecuteFetch(DiscoveryNode node, final InternalScrollSearchRequest request, final ActionListener<ScrollQueryFetchSearchResult> listener) { public void sendExecuteFetch(DiscoveryNode node, final InternalScrollSearchRequest request,
transportService.sendRequest(node, QUERY_FETCH_SCROLL_ACTION_NAME, request, new ActionListenerResponseHandler<ScrollQueryFetchSearchResult>(listener) { final ActionListener<ScrollQueryFetchSearchResult> listener) {
transportService.sendRequest(node, QUERY_FETCH_SCROLL_ACTION_NAME, request,
new ActionListenerResponseHandler<ScrollQueryFetchSearchResult>(listener) {
@Override @Override
public ScrollQueryFetchSearchResult newInstance() { public ScrollQueryFetchSearchResult newInstance() {
return new ScrollQueryFetchSearchResult(); return new ScrollQueryFetchSearchResult();
@ -195,15 +220,18 @@ public class SearchServiceTransportAction extends AbstractComponent {
}); });
} }
public void sendExecuteFetch(DiscoveryNode node, final ShardFetchSearchRequest request, final ActionListener<FetchSearchResult> listener) { public void sendExecuteFetch(DiscoveryNode node, final ShardFetchSearchRequest request,
final ActionListener<FetchSearchResult> listener) {
sendExecuteFetch(node, FETCH_ID_ACTION_NAME, request, listener); sendExecuteFetch(node, FETCH_ID_ACTION_NAME, request, listener);
} }
public void sendExecuteFetchScroll(DiscoveryNode node, final ShardFetchRequest request, final ActionListener<FetchSearchResult> listener) { public void sendExecuteFetchScroll(DiscoveryNode node, final ShardFetchRequest request,
final ActionListener<FetchSearchResult> listener) {
sendExecuteFetch(node, FETCH_ID_SCROLL_ACTION_NAME, request, listener); sendExecuteFetch(node, FETCH_ID_SCROLL_ACTION_NAME, request, listener);
} }
private void sendExecuteFetch(DiscoveryNode node, String action, final ShardFetchRequest request, final ActionListener<FetchSearchResult> listener) { private void sendExecuteFetch(DiscoveryNode node, String action, final ShardFetchRequest request,
final ActionListener<FetchSearchResult> listener) {
transportService.sendRequest(node, action, request, new ActionListenerResponseHandler<FetchSearchResult>(listener) { transportService.sendRequest(node, action, request, new ActionListenerResponseHandler<FetchSearchResult>(listener) {
@Override @Override
public FetchSearchResult newInstance() { public FetchSearchResult newInstance() {
@ -212,17 +240,13 @@ public class SearchServiceTransportAction extends AbstractComponent {
}); });
} }
public static class ScrollFreeContextRequest extends TransportRequest { static class ScrollFreeContextRequest extends TransportRequest {
private long id; private long id;
public ScrollFreeContextRequest() { ScrollFreeContextRequest() {
} }
ScrollFreeContextRequest(ClearScrollRequest request, long id) { ScrollFreeContextRequest(long id) {
this(id);
}
private ScrollFreeContextRequest(long id) {
this.id = id; this.id = id;
} }
@ -243,7 +267,7 @@ public class SearchServiceTransportAction extends AbstractComponent {
} }
} }
public static class SearchFreeContextRequest extends ScrollFreeContextRequest implements IndicesRequest { static class SearchFreeContextRequest extends ScrollFreeContextRequest implements IndicesRequest {
private OriginalIndices originalIndices; private OriginalIndices originalIndices;
public SearchFreeContextRequest() { public SearchFreeContextRequest() {
@ -311,7 +335,8 @@ public class SearchServiceTransportAction extends AbstractComponent {
} }
} }
class FreeContextTransportHandler<FreeContextRequest extends ScrollFreeContextRequest> implements TransportRequestHandler<FreeContextRequest> { class FreeContextTransportHandler<FreeContextRequest extends ScrollFreeContextRequest>
implements TransportRequestHandler<FreeContextRequest> {
@Override @Override
public void messageReceived(FreeContextRequest request, TransportChannel channel) throws Exception { public void messageReceived(FreeContextRequest request, TransportChannel channel) throws Exception {
boolean freed = searchService.freeContext(request.id()); boolean freed = searchService.freeContext(request.id());
@ -319,7 +344,7 @@ public class SearchServiceTransportAction extends AbstractComponent {
} }
} }
public static class ClearScrollContextsRequest extends TransportRequest { static class ClearScrollContextsRequest extends TransportRequest {
} }
class ClearScrollContextsTransportHandler implements TransportRequestHandler<ClearScrollContextsRequest> { class ClearScrollContextsTransportHandler implements TransportRequestHandler<ClearScrollContextsRequest> {
@ -393,5 +418,4 @@ public class SearchServiceTransportAction extends AbstractComponent {
channel.sendResponse(result); channel.sendResponse(result);
} }
} }
} }

View File

@ -92,7 +92,7 @@ import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.plugins.Plugin; import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.script.Script; import org.elasticsearch.script.Script;
import org.elasticsearch.script.groovy.GroovyPlugin; import org.elasticsearch.script.groovy.GroovyPlugin;
import org.elasticsearch.search.action.SearchServiceTransportAction; import org.elasticsearch.search.action.SearchTransportService;
import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.Task;
import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.ESIntegTestCase.ClusterScope; import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
@ -580,8 +580,8 @@ public class IndicesRequestTests extends ESIntegTestCase {
} }
public void testSearchQueryThenFetch() throws Exception { public void testSearchQueryThenFetch() throws Exception {
interceptTransportActions(SearchServiceTransportAction.QUERY_ACTION_NAME, interceptTransportActions(SearchTransportService.QUERY_ACTION_NAME,
SearchServiceTransportAction.FETCH_ID_ACTION_NAME, SearchServiceTransportAction.FREE_CONTEXT_ACTION_NAME); SearchTransportService.FETCH_ID_ACTION_NAME, SearchTransportService.FREE_CONTEXT_ACTION_NAME);
String[] randomIndicesOrAliases = randomIndicesOrAliases(); String[] randomIndicesOrAliases = randomIndicesOrAliases();
for (int i = 0; i < randomIndicesOrAliases.length; i++) { for (int i = 0; i < randomIndicesOrAliases.length; i++) {
@ -595,14 +595,14 @@ public class IndicesRequestTests extends ESIntegTestCase {
assertThat(searchResponse.getHits().totalHits(), greaterThan(0L)); assertThat(searchResponse.getHits().totalHits(), greaterThan(0L));
clearInterceptedActions(); clearInterceptedActions();
assertSameIndices(searchRequest, SearchServiceTransportAction.QUERY_ACTION_NAME, SearchServiceTransportAction.FETCH_ID_ACTION_NAME); assertSameIndices(searchRequest, SearchTransportService.QUERY_ACTION_NAME, SearchTransportService.FETCH_ID_ACTION_NAME);
//free context messages are not necessarily sent, but if they are, check their indices //free context messages are not necessarily sent, but if they are, check their indices
assertSameIndicesOptionalRequests(searchRequest, SearchServiceTransportAction.FREE_CONTEXT_ACTION_NAME); assertSameIndicesOptionalRequests(searchRequest, SearchTransportService.FREE_CONTEXT_ACTION_NAME);
} }
public void testSearchDfsQueryThenFetch() throws Exception { public void testSearchDfsQueryThenFetch() throws Exception {
interceptTransportActions(SearchServiceTransportAction.DFS_ACTION_NAME, SearchServiceTransportAction.QUERY_ID_ACTION_NAME, interceptTransportActions(SearchTransportService.DFS_ACTION_NAME, SearchTransportService.QUERY_ID_ACTION_NAME,
SearchServiceTransportAction.FETCH_ID_ACTION_NAME, SearchServiceTransportAction.FREE_CONTEXT_ACTION_NAME); SearchTransportService.FETCH_ID_ACTION_NAME, SearchTransportService.FREE_CONTEXT_ACTION_NAME);
String[] randomIndicesOrAliases = randomIndicesOrAliases(); String[] randomIndicesOrAliases = randomIndicesOrAliases();
for (int i = 0; i < randomIndicesOrAliases.length; i++) { for (int i = 0; i < randomIndicesOrAliases.length; i++) {
@ -616,15 +616,15 @@ public class IndicesRequestTests extends ESIntegTestCase {
assertThat(searchResponse.getHits().totalHits(), greaterThan(0L)); assertThat(searchResponse.getHits().totalHits(), greaterThan(0L));
clearInterceptedActions(); clearInterceptedActions();
assertSameIndices(searchRequest, SearchServiceTransportAction.DFS_ACTION_NAME, SearchServiceTransportAction.QUERY_ID_ACTION_NAME, assertSameIndices(searchRequest, SearchTransportService.DFS_ACTION_NAME, SearchTransportService.QUERY_ID_ACTION_NAME,
SearchServiceTransportAction.FETCH_ID_ACTION_NAME); SearchTransportService.FETCH_ID_ACTION_NAME);
//free context messages are not necessarily sent, but if they are, check their indices //free context messages are not necessarily sent, but if they are, check their indices
assertSameIndicesOptionalRequests(searchRequest, SearchServiceTransportAction.FREE_CONTEXT_ACTION_NAME); assertSameIndicesOptionalRequests(searchRequest, SearchTransportService.FREE_CONTEXT_ACTION_NAME);
} }
public void testSearchQueryAndFetch() throws Exception { public void testSearchQueryAndFetch() throws Exception {
interceptTransportActions(SearchServiceTransportAction.QUERY_FETCH_ACTION_NAME, interceptTransportActions(SearchTransportService.QUERY_FETCH_ACTION_NAME,
SearchServiceTransportAction.FREE_CONTEXT_ACTION_NAME); SearchTransportService.FREE_CONTEXT_ACTION_NAME);
String[] randomIndicesOrAliases = randomIndicesOrAliases(); String[] randomIndicesOrAliases = randomIndicesOrAliases();
for (int i = 0; i < randomIndicesOrAliases.length; i++) { for (int i = 0; i < randomIndicesOrAliases.length; i++) {
@ -638,14 +638,14 @@ public class IndicesRequestTests extends ESIntegTestCase {
assertThat(searchResponse.getHits().totalHits(), greaterThan(0L)); assertThat(searchResponse.getHits().totalHits(), greaterThan(0L));
clearInterceptedActions(); clearInterceptedActions();
assertSameIndices(searchRequest, SearchServiceTransportAction.QUERY_FETCH_ACTION_NAME); assertSameIndices(searchRequest, SearchTransportService.QUERY_FETCH_ACTION_NAME);
//free context messages are not necessarily sent, but if they are, check their indices //free context messages are not necessarily sent, but if they are, check their indices
assertSameIndicesOptionalRequests(searchRequest, SearchServiceTransportAction.FREE_CONTEXT_ACTION_NAME); assertSameIndicesOptionalRequests(searchRequest, SearchTransportService.FREE_CONTEXT_ACTION_NAME);
} }
public void testSearchDfsQueryAndFetch() throws Exception { public void testSearchDfsQueryAndFetch() throws Exception {
interceptTransportActions(SearchServiceTransportAction.QUERY_QUERY_FETCH_ACTION_NAME, interceptTransportActions(SearchTransportService.QUERY_QUERY_FETCH_ACTION_NAME,
SearchServiceTransportAction.FREE_CONTEXT_ACTION_NAME); SearchTransportService.FREE_CONTEXT_ACTION_NAME);
String[] randomIndicesOrAliases = randomIndicesOrAliases(); String[] randomIndicesOrAliases = randomIndicesOrAliases();
for (int i = 0; i < randomIndicesOrAliases.length; i++) { for (int i = 0; i < randomIndicesOrAliases.length; i++) {
@ -659,9 +659,9 @@ public class IndicesRequestTests extends ESIntegTestCase {
assertThat(searchResponse.getHits().totalHits(), greaterThan(0L)); assertThat(searchResponse.getHits().totalHits(), greaterThan(0L));
clearInterceptedActions(); clearInterceptedActions();
assertSameIndices(searchRequest, SearchServiceTransportAction.QUERY_QUERY_FETCH_ACTION_NAME); assertSameIndices(searchRequest, SearchTransportService.QUERY_QUERY_FETCH_ACTION_NAME);
//free context messages are not necessarily sent, but if they are, check their indices //free context messages are not necessarily sent, but if they are, check their indices
assertSameIndicesOptionalRequests(searchRequest, SearchServiceTransportAction.FREE_CONTEXT_ACTION_NAME); assertSameIndicesOptionalRequests(searchRequest, SearchTransportService.FREE_CONTEXT_ACTION_NAME);
} }
private static void assertSameIndices(IndicesRequest originalRequest, String... actions) { private static void assertSameIndices(IndicesRequest originalRequest, String... actions) {