From b7597d7aeac275689ff10902bdf496d922fb947a Mon Sep 17 00:00:00 2001 From: javanna Date: Tue, 1 Mar 2016 11:13:17 +0100 Subject: [PATCH] Rename SearchServiceTransportAction to SearchTransportService The suffix TransportAction is misleading as it may make think that it extends TransportAction, but it does not. This class makes accessible the different search operations exposed by SearchService through the transport layer. Also resolved few compiler warnings in the class itself. --- .../resources/checkstyle_suppressions.xml | 1 - .../search/AbstractSearchAsyncAction.java | 10 +- .../SearchDfsQueryAndFetchAsyncAction.java | 11 +- .../SearchDfsQueryThenFetchAsyncAction.java | 13 +- .../SearchQueryAndFetchAsyncAction.java | 9 +- .../SearchQueryThenFetchAsyncAction.java | 8 +- .../SearchScrollQueryAndFetchAsyncAction.java | 10 +- ...SearchScrollQueryThenFetchAsyncAction.java | 12 +- .../search/TransportClearScrollAction.java | 18 ++- .../action/search/TransportSearchAction.java | 16 +-- .../search/TransportSearchScrollAction.java | 12 +- .../elasticsearch/search/SearchModule.java | 4 +- ...ction.java => SearchTransportService.java} | 114 +++++++++++------- .../messy/tests/IndicesRequestTests.java | 36 +++--- 14 files changed, 148 insertions(+), 126 deletions(-) rename core/src/main/java/org/elasticsearch/search/action/{SearchServiceTransportAction.java => SearchTransportService.java} (80%) diff --git a/buildSrc/src/main/resources/checkstyle_suppressions.xml b/buildSrc/src/main/resources/checkstyle_suppressions.xml index 494ddf5d071..60a11d951f9 100644 --- a/buildSrc/src/main/resources/checkstyle_suppressions.xml +++ b/buildSrc/src/main/resources/checkstyle_suppressions.xml @@ -746,7 +746,6 @@ - diff --git a/core/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java b/core/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java index d4ae139ee0c..830a54778e1 100644 --- a/core/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java +++ b/core/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java @@ -39,7 +39,7 @@ import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.util.concurrent.AtomicArray; import org.elasticsearch.search.SearchPhaseResult; import org.elasticsearch.search.SearchShardTarget; -import org.elasticsearch.search.action.SearchServiceTransportAction; +import org.elasticsearch.search.action.SearchTransportService; import org.elasticsearch.search.controller.SearchPhaseController; import org.elasticsearch.search.fetch.ShardFetchSearchRequest; import org.elasticsearch.search.internal.InternalSearchResponse; @@ -58,7 +58,7 @@ import static org.elasticsearch.action.search.TransportSearchHelper.internalSear abstract class AbstractSearchAsyncAction extends AbstractAsyncAction { protected final ESLogger logger; - protected final SearchServiceTransportAction searchService; + protected final SearchTransportService searchTransportService; private final IndexNameExpressionResolver indexNameExpressionResolver; protected final SearchPhaseController searchPhaseController; protected final ThreadPool threadPool; @@ -76,12 +76,12 @@ abstract class AbstractSearchAsyncAction private final Object shardFailuresMutex = new Object(); protected volatile ScoreDoc[] sortedShardList; - protected AbstractSearchAsyncAction(ESLogger logger, SearchServiceTransportAction searchService, ClusterService clusterService, + protected AbstractSearchAsyncAction(ESLogger logger, SearchTransportService searchTransportService, ClusterService clusterService, IndexNameExpressionResolver indexNameExpressionResolver, SearchPhaseController searchPhaseController, ThreadPool threadPool, SearchRequest request, ActionListener listener) { this.logger = logger; - this.searchService = searchService; + this.searchTransportService = searchTransportService; this.indexNameExpressionResolver = indexNameExpressionResolver; this.searchPhaseController = searchPhaseController; this.threadPool = threadPool; @@ -332,7 +332,7 @@ abstract class AbstractSearchAsyncAction protected void sendReleaseSearchContext(long contextId, DiscoveryNode node) { if (node != null) { - searchService.sendFreeContext(node, contextId, request); + searchTransportService.sendFreeContext(node, contextId, request); } } diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchDfsQueryAndFetchAsyncAction.java b/core/src/main/java/org/elasticsearch/action/search/SearchDfsQueryAndFetchAsyncAction.java index b04b18f735b..56d0fedd40c 100644 --- a/core/src/main/java/org/elasticsearch/action/search/SearchDfsQueryAndFetchAsyncAction.java +++ b/core/src/main/java/org/elasticsearch/action/search/SearchDfsQueryAndFetchAsyncAction.java @@ -26,7 +26,7 @@ import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.util.concurrent.AtomicArray; -import org.elasticsearch.search.action.SearchServiceTransportAction; +import org.elasticsearch.search.action.SearchTransportService; import org.elasticsearch.search.controller.SearchPhaseController; import org.elasticsearch.search.dfs.AggregatedDfs; import org.elasticsearch.search.dfs.DfsSearchResult; @@ -43,11 +43,12 @@ class SearchDfsQueryAndFetchAsyncAction extends AbstractSearchAsyncAction queryFetchResults; - SearchDfsQueryAndFetchAsyncAction(ESLogger logger, SearchServiceTransportAction searchService, + SearchDfsQueryAndFetchAsyncAction(ESLogger logger, SearchTransportService searchTransportService, ClusterService clusterService, IndexNameExpressionResolver indexNameExpressionResolver, SearchPhaseController searchPhaseController, ThreadPool threadPool, SearchRequest request, ActionListener listener) { - super(logger, searchService, clusterService, indexNameExpressionResolver, searchPhaseController, threadPool, request, listener); + super(logger, searchTransportService, clusterService, indexNameExpressionResolver, searchPhaseController, threadPool, + request, listener); queryFetchResults = new AtomicArray<>(firstResults.length()); } @@ -59,7 +60,7 @@ class SearchDfsQueryAndFetchAsyncAction extends AbstractSearchAsyncAction listener) { - searchService.sendExecuteDfs(node, request, listener); + searchTransportService.sendExecuteDfs(node, request, listener); } @Override @@ -77,7 +78,7 @@ class SearchDfsQueryAndFetchAsyncAction extends AbstractSearchAsyncAction() { + searchTransportService.sendExecuteFetch(node, querySearchRequest, new ActionListener() { @Override public void onResponse(QueryFetchSearchResult result) { result.shardTarget(dfsResult.shardTarget()); diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchDfsQueryThenFetchAsyncAction.java b/core/src/main/java/org/elasticsearch/action/search/SearchDfsQueryThenFetchAsyncAction.java index 76337334caa..f2dcefa7554 100644 --- a/core/src/main/java/org/elasticsearch/action/search/SearchDfsQueryThenFetchAsyncAction.java +++ b/core/src/main/java/org/elasticsearch/action/search/SearchDfsQueryThenFetchAsyncAction.java @@ -29,7 +29,7 @@ import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.util.concurrent.AtomicArray; import org.elasticsearch.search.SearchShardTarget; -import org.elasticsearch.search.action.SearchServiceTransportAction; +import org.elasticsearch.search.action.SearchTransportService; import org.elasticsearch.search.controller.SearchPhaseController; import org.elasticsearch.search.dfs.AggregatedDfs; import org.elasticsearch.search.dfs.DfsSearchResult; @@ -50,11 +50,12 @@ class SearchDfsQueryThenFetchAsyncAction extends AbstractSearchAsyncAction fetchResults; final AtomicArray docIdsToLoad; - SearchDfsQueryThenFetchAsyncAction(ESLogger logger, SearchServiceTransportAction searchService, + SearchDfsQueryThenFetchAsyncAction(ESLogger logger, SearchTransportService searchTransportService, ClusterService clusterService, IndexNameExpressionResolver indexNameExpressionResolver, SearchPhaseController searchPhaseController, ThreadPool threadPool, SearchRequest request, ActionListener listener) { - super(logger, searchService, clusterService, indexNameExpressionResolver, searchPhaseController, threadPool, request, listener); + super(logger, searchTransportService, clusterService, indexNameExpressionResolver, searchPhaseController, threadPool, + request, listener); queryResults = new AtomicArray<>(firstResults.length()); fetchResults = new AtomicArray<>(firstResults.length()); docIdsToLoad = new AtomicArray<>(firstResults.length()); @@ -68,7 +69,7 @@ class SearchDfsQueryThenFetchAsyncAction extends AbstractSearchAsyncAction listener) { - searchService.sendExecuteDfs(node, request, listener); + searchTransportService.sendExecuteDfs(node, request, listener); } @Override @@ -85,7 +86,7 @@ class SearchDfsQueryThenFetchAsyncAction extends AbstractSearchAsyncAction() { + searchTransportService.sendExecuteQuery(node, querySearchRequest, new ActionListener() { @Override public void onResponse(QuerySearchResult result) { result.shardTarget(dfsResult.shardTarget()); @@ -157,7 +158,7 @@ class SearchDfsQueryThenFetchAsyncAction extends AbstractSearchAsyncAction() { + searchTransportService.sendExecuteFetch(node, fetchSearchRequest, new ActionListener() { @Override public void onResponse(FetchSearchResult result) { result.shardTarget(shardTarget); diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchQueryAndFetchAsyncAction.java b/core/src/main/java/org/elasticsearch/action/search/SearchQueryAndFetchAsyncAction.java index 5187e77f0e7..dcbf9b5091f 100644 --- a/core/src/main/java/org/elasticsearch/action/search/SearchQueryAndFetchAsyncAction.java +++ b/core/src/main/java/org/elasticsearch/action/search/SearchQueryAndFetchAsyncAction.java @@ -25,7 +25,7 @@ import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.logging.ESLogger; -import org.elasticsearch.search.action.SearchServiceTransportAction; +import org.elasticsearch.search.action.SearchTransportService; import org.elasticsearch.search.controller.SearchPhaseController; import org.elasticsearch.search.fetch.QueryFetchSearchResult; import org.elasticsearch.search.internal.InternalSearchResponse; @@ -36,11 +36,12 @@ import java.io.IOException; class SearchQueryAndFetchAsyncAction extends AbstractSearchAsyncAction { - SearchQueryAndFetchAsyncAction(ESLogger logger, SearchServiceTransportAction searchService, + SearchQueryAndFetchAsyncAction(ESLogger logger, SearchTransportService searchTransportService, ClusterService clusterService, IndexNameExpressionResolver indexNameExpressionResolver, SearchPhaseController searchPhaseController, ThreadPool threadPool, SearchRequest request, ActionListener listener) { - super(logger, searchService, clusterService, indexNameExpressionResolver, searchPhaseController, threadPool, request, listener); + super(logger, searchTransportService, clusterService, indexNameExpressionResolver, searchPhaseController, threadPool, + request, listener); } @Override @@ -51,7 +52,7 @@ class SearchQueryAndFetchAsyncAction extends AbstractSearchAsyncAction listener) { - searchService.sendExecuteFetch(node, request, listener); + searchTransportService.sendExecuteFetch(node, request, listener); } @Override diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java b/core/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java index 84f93590f23..e15b9da8acb 100644 --- a/core/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java +++ b/core/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java @@ -29,7 +29,7 @@ import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.util.concurrent.AtomicArray; import org.elasticsearch.search.SearchShardTarget; -import org.elasticsearch.search.action.SearchServiceTransportAction; +import org.elasticsearch.search.action.SearchTransportService; import org.elasticsearch.search.controller.SearchPhaseController; import org.elasticsearch.search.fetch.FetchSearchResult; import org.elasticsearch.search.fetch.ShardFetchSearchRequest; @@ -46,7 +46,7 @@ class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction fetchResults; final AtomicArray docIdsToLoad; - SearchQueryThenFetchAsyncAction(ESLogger logger, SearchServiceTransportAction searchService, + SearchQueryThenFetchAsyncAction(ESLogger logger, SearchTransportService searchService, ClusterService clusterService, IndexNameExpressionResolver indexNameExpressionResolver, SearchPhaseController searchPhaseController, ThreadPool threadPool, SearchRequest request, ActionListener listener) { @@ -63,7 +63,7 @@ class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction listener) { - searchService.sendExecuteQuery(node, request, listener); + searchTransportService.sendExecuteQuery(node, request, listener); } @Override @@ -91,7 +91,7 @@ class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction() { + searchTransportService.sendExecuteFetch(node, fetchSearchRequest, new ActionListener() { @Override public void onResponse(FetchSearchResult result) { result.shardTarget(shardTarget); diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchScrollQueryAndFetchAsyncAction.java b/core/src/main/java/org/elasticsearch/action/search/SearchScrollQueryAndFetchAsyncAction.java index e8fe59cc447..b5b95dc5cbe 100644 --- a/core/src/main/java/org/elasticsearch/action/search/SearchScrollQueryAndFetchAsyncAction.java +++ b/core/src/main/java/org/elasticsearch/action/search/SearchScrollQueryAndFetchAsyncAction.java @@ -26,7 +26,7 @@ import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.util.concurrent.AtomicArray; -import org.elasticsearch.search.action.SearchServiceTransportAction; +import org.elasticsearch.search.action.SearchTransportService; import org.elasticsearch.search.controller.SearchPhaseController; import org.elasticsearch.search.fetch.QueryFetchSearchResult; import org.elasticsearch.search.fetch.ScrollQueryFetchSearchResult; @@ -42,7 +42,7 @@ class SearchScrollQueryAndFetchAsyncAction extends AbstractAsyncAction { private final ESLogger logger; private final SearchPhaseController searchPhaseController; - private final SearchServiceTransportAction searchService; + private final SearchTransportService searchTransportService; private final SearchScrollRequest request; private final ActionListener listener; private final ParsedScrollId scrollId; @@ -53,11 +53,11 @@ class SearchScrollQueryAndFetchAsyncAction extends AbstractAsyncAction { private final AtomicInteger counter; SearchScrollQueryAndFetchAsyncAction(ESLogger logger, ClusterService clusterService, - SearchServiceTransportAction searchService, SearchPhaseController searchPhaseController, + SearchTransportService searchTransportService, SearchPhaseController searchPhaseController, SearchScrollRequest request, ParsedScrollId scrollId, ActionListener listener) { this.logger = logger; this.searchPhaseController = searchPhaseController; - this.searchService = searchService; + this.searchTransportService = searchTransportService; this.request = request; this.listener = listener; this.scrollId = scrollId; @@ -128,7 +128,7 @@ class SearchScrollQueryAndFetchAsyncAction extends AbstractAsyncAction { void executePhase(final int shardIndex, DiscoveryNode node, final long searchId) { InternalScrollSearchRequest internalRequest = internalScrollSearchRequest(searchId, request); - searchService.sendExecuteFetch(node, internalRequest, new ActionListener() { + searchTransportService.sendExecuteFetch(node, internalRequest, new ActionListener() { @Override public void onResponse(ScrollQueryFetchSearchResult result) { queryFetchResults.set(shardIndex, result.result()); diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchScrollQueryThenFetchAsyncAction.java b/core/src/main/java/org/elasticsearch/action/search/SearchScrollQueryThenFetchAsyncAction.java index 0efff74524d..864f17eee2c 100644 --- a/core/src/main/java/org/elasticsearch/action/search/SearchScrollQueryThenFetchAsyncAction.java +++ b/core/src/main/java/org/elasticsearch/action/search/SearchScrollQueryThenFetchAsyncAction.java @@ -27,7 +27,7 @@ import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.util.concurrent.AtomicArray; -import org.elasticsearch.search.action.SearchServiceTransportAction; +import org.elasticsearch.search.action.SearchTransportService; import org.elasticsearch.search.controller.SearchPhaseController; import org.elasticsearch.search.fetch.FetchSearchResult; import org.elasticsearch.search.fetch.ShardFetchRequest; @@ -44,7 +44,7 @@ import static org.elasticsearch.action.search.TransportSearchHelper.internalScro class SearchScrollQueryThenFetchAsyncAction extends AbstractAsyncAction { private final ESLogger logger; - private final SearchServiceTransportAction searchService; + private final SearchTransportService searchTransportService; private final SearchPhaseController searchPhaseController; private final SearchScrollRequest request; private final ActionListener listener; @@ -57,10 +57,10 @@ class SearchScrollQueryThenFetchAsyncAction extends AbstractAsyncAction { private final AtomicInteger successfulOps; SearchScrollQueryThenFetchAsyncAction(ESLogger logger, ClusterService clusterService, - SearchServiceTransportAction searchService, SearchPhaseController searchPhaseController, + SearchTransportService searchTransportService, SearchPhaseController searchPhaseController, SearchScrollRequest request, ParsedScrollId scrollId, ActionListener listener) { this.logger = logger; - this.searchService = searchService; + this.searchTransportService = searchTransportService; this.searchPhaseController = searchPhaseController; this.request = request; this.listener = listener; @@ -124,7 +124,7 @@ class SearchScrollQueryThenFetchAsyncAction extends AbstractAsyncAction { private void executeQueryPhase(final int shardIndex, final AtomicInteger counter, DiscoveryNode node, final long searchId) { InternalScrollSearchRequest internalRequest = internalScrollSearchRequest(searchId, request); - searchService.sendExecuteQuery(node, internalRequest, new ActionListener() { + searchTransportService.sendExecuteQuery(node, internalRequest, new ActionListener() { @Override public void onResponse(ScrollQuerySearchResult result) { queryResults.set(shardIndex, result.queryResult()); @@ -182,7 +182,7 @@ class SearchScrollQueryThenFetchAsyncAction extends AbstractAsyncAction { ScoreDoc lastEmittedDoc = lastEmittedDocPerShard[entry.index]; ShardFetchRequest shardFetchRequest = new ShardFetchRequest(querySearchResult.id(), docIds, lastEmittedDoc); DiscoveryNode node = nodes.get(querySearchResult.shardTarget().nodeId()); - searchService.sendExecuteFetchScroll(node, shardFetchRequest, new ActionListener() { + searchTransportService.sendExecuteFetchScroll(node, shardFetchRequest, new ActionListener() { @Override public void onResponse(FetchSearchResult result) { result.shardTarget(querySearchResult.shardTarget()); diff --git a/core/src/main/java/org/elasticsearch/action/search/TransportClearScrollAction.java b/core/src/main/java/org/elasticsearch/action/search/TransportClearScrollAction.java index a43f9302f3a..95f0796ba4f 100644 --- a/core/src/main/java/org/elasticsearch/action/search/TransportClearScrollAction.java +++ b/core/src/main/java/org/elasticsearch/action/search/TransportClearScrollAction.java @@ -30,7 +30,7 @@ import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.CountDown; -import org.elasticsearch.search.action.SearchServiceTransportAction; +import org.elasticsearch.search.action.SearchTransportService; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportResponse; import org.elasticsearch.transport.TransportService; @@ -47,15 +47,15 @@ import static org.elasticsearch.action.search.TransportSearchHelper.parseScrollI public class TransportClearScrollAction extends HandledTransportAction { private final ClusterService clusterService; - private final SearchServiceTransportAction searchServiceTransportAction; + private final SearchTransportService searchTransportService; @Inject public TransportClearScrollAction(Settings settings, TransportService transportService, ThreadPool threadPool, - ClusterService clusterService, SearchServiceTransportAction searchServiceTransportAction, + ClusterService clusterService, SearchTransportService searchTransportService, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) { super(settings, ClearScrollAction.NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver, ClearScrollRequest::new); this.clusterService = clusterService; - this.searchServiceTransportAction = searchServiceTransportAction; + this.searchTransportService = searchTransportService; } @Override @@ -64,10 +64,8 @@ public class TransportClearScrollAction extends HandledTransportAction contexts = new ArrayList<>(); final ActionListener listener; final AtomicReference expHolder; @@ -85,8 +83,6 @@ public class TransportClearScrollAction extends HandledTransportAction(); this.expectedOps = new CountDown(expectedOps); @@ -100,7 +96,7 @@ public class TransportClearScrollAction extends HandledTransportAction() { + searchTransportService.sendClearAllScrollContexts(node, new ActionListener() { @Override public void onResponse(TransportResponse response) { onFreedContext(true); @@ -121,9 +117,9 @@ public class TransportClearScrollAction extends HandledTransportAction() { + searchTransportService.sendFreeContext(node, target.getScrollId(), new ActionListener() { @Override - public void onResponse(SearchServiceTransportAction.SearchFreeContextResponse freed) { + public void onResponse(SearchTransportService.SearchFreeContextResponse freed) { onFreedContext(freed.isFreed()); } diff --git a/core/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java b/core/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java index 8e08350b694..e87fa2a345a 100644 --- a/core/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java +++ b/core/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java @@ -29,7 +29,7 @@ import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.indices.IndexClosedException; -import org.elasticsearch.search.action.SearchServiceTransportAction; +import org.elasticsearch.search.action.SearchTransportService; import org.elasticsearch.search.controller.SearchPhaseController; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; @@ -45,17 +45,17 @@ import static org.elasticsearch.action.search.SearchType.QUERY_AND_FETCH; public class TransportSearchAction extends HandledTransportAction { private final ClusterService clusterService; - private final SearchServiceTransportAction searchService; + private final SearchTransportService searchTransportService; private final SearchPhaseController searchPhaseController; @Inject public TransportSearchAction(Settings settings, ThreadPool threadPool, SearchPhaseController searchPhaseController, - TransportService transportService, SearchServiceTransportAction searchService, + TransportService transportService, SearchTransportService searchTransportService, ClusterService clusterService, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) { super(settings, SearchAction.NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver, SearchRequest::new); this.searchPhaseController = searchPhaseController; - this.searchService = searchService; + this.searchTransportService = searchTransportService; this.clusterService = clusterService; } @@ -81,19 +81,19 @@ public class TransportSearchAction extends HandledTransportAction { private final ClusterService clusterService; - private final SearchServiceTransportAction searchService; + private final SearchTransportService searchTransportService; private final SearchPhaseController searchPhaseController; @Inject public TransportSearchScrollAction(Settings settings, ThreadPool threadPool, TransportService transportService, - ClusterService clusterService, SearchServiceTransportAction searchService, + ClusterService clusterService, SearchTransportService searchTransportService, SearchPhaseController searchPhaseController, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) { super(settings, SearchScrollAction.NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver, SearchScrollRequest::new); this.clusterService = clusterService; - this.searchService = searchService; + this.searchTransportService = searchTransportService; this.searchPhaseController = searchPhaseController; } @@ -63,11 +63,11 @@ public class TransportSearchScrollAction extends HandledTransportAction()); - transportService.registerRequestHandler(FREE_CONTEXT_ACTION_NAME, SearchFreeContextRequest::new, ThreadPool.Names.SAME, new FreeContextTransportHandler()); - transportService.registerRequestHandler(CLEAR_SCROLL_CONTEXTS_ACTION_NAME, ClearScrollContextsRequest::new, ThreadPool.Names.SAME, new ClearScrollContextsTransportHandler()); - transportService.registerRequestHandler(DFS_ACTION_NAME, ShardSearchTransportRequest::new, ThreadPool.Names.SEARCH, new SearchDfsTransportHandler()); - transportService.registerRequestHandler(QUERY_ACTION_NAME, ShardSearchTransportRequest::new, ThreadPool.Names.SEARCH, new SearchQueryTransportHandler()); - transportService.registerRequestHandler(QUERY_ID_ACTION_NAME, QuerySearchRequest::new, ThreadPool.Names.SEARCH, new SearchQueryByIdTransportHandler()); - transportService.registerRequestHandler(QUERY_SCROLL_ACTION_NAME, InternalScrollSearchRequest::new, ThreadPool.Names.SEARCH, new SearchQueryScrollTransportHandler()); - transportService.registerRequestHandler(QUERY_FETCH_ACTION_NAME, ShardSearchTransportRequest::new, ThreadPool.Names.SEARCH, new SearchQueryFetchTransportHandler()); - transportService.registerRequestHandler(QUERY_QUERY_FETCH_ACTION_NAME, QuerySearchRequest::new, ThreadPool.Names.SEARCH, new SearchQueryQueryFetchTransportHandler()); - transportService.registerRequestHandler(QUERY_FETCH_SCROLL_ACTION_NAME, InternalScrollSearchRequest::new, ThreadPool.Names.SEARCH, new SearchQueryFetchScrollTransportHandler()); - transportService.registerRequestHandler(FETCH_ID_SCROLL_ACTION_NAME, ShardFetchRequest::new, ThreadPool.Names.SEARCH, new FetchByIdTransportHandler<>()); - transportService.registerRequestHandler(FETCH_ID_ACTION_NAME, ShardFetchSearchRequest::new, ThreadPool.Names.SEARCH, new FetchByIdTransportHandler()); + transportService.registerRequestHandler(FREE_CONTEXT_SCROLL_ACTION_NAME, ScrollFreeContextRequest::new, ThreadPool.Names.SAME, + new FreeContextTransportHandler<>()); + transportService.registerRequestHandler(FREE_CONTEXT_ACTION_NAME, SearchFreeContextRequest::new, ThreadPool.Names.SAME, + new FreeContextTransportHandler<>()); + transportService.registerRequestHandler(CLEAR_SCROLL_CONTEXTS_ACTION_NAME, ClearScrollContextsRequest::new, ThreadPool.Names.SAME, + new ClearScrollContextsTransportHandler()); + transportService.registerRequestHandler(DFS_ACTION_NAME, ShardSearchTransportRequest::new, ThreadPool.Names.SEARCH, + new SearchDfsTransportHandler()); + transportService.registerRequestHandler(QUERY_ACTION_NAME, ShardSearchTransportRequest::new, ThreadPool.Names.SEARCH, + new SearchQueryTransportHandler()); + transportService.registerRequestHandler(QUERY_ID_ACTION_NAME, QuerySearchRequest::new, ThreadPool.Names.SEARCH, + new SearchQueryByIdTransportHandler()); + transportService.registerRequestHandler(QUERY_SCROLL_ACTION_NAME, InternalScrollSearchRequest::new, ThreadPool.Names.SEARCH, + new SearchQueryScrollTransportHandler()); + transportService.registerRequestHandler(QUERY_FETCH_ACTION_NAME, ShardSearchTransportRequest::new, ThreadPool.Names.SEARCH, + new SearchQueryFetchTransportHandler()); + transportService.registerRequestHandler(QUERY_QUERY_FETCH_ACTION_NAME, QuerySearchRequest::new, ThreadPool.Names.SEARCH, + new SearchQueryQueryFetchTransportHandler()); + transportService.registerRequestHandler(QUERY_FETCH_SCROLL_ACTION_NAME, InternalScrollSearchRequest::new, ThreadPool.Names.SEARCH, + new SearchQueryFetchScrollTransportHandler()); + transportService.registerRequestHandler(FETCH_ID_SCROLL_ACTION_NAME, ShardFetchRequest::new, ThreadPool.Names.SEARCH, + new FetchByIdTransportHandler<>()); + transportService.registerRequestHandler(FETCH_ID_ACTION_NAME, ShardFetchSearchRequest::new, ThreadPool.Names.SEARCH, + new FetchByIdTransportHandler<>()); } public void sendFreeContext(DiscoveryNode node, final long contextId, SearchRequest request) { - transportService.sendRequest(node, FREE_CONTEXT_ACTION_NAME, new SearchFreeContextRequest(request, contextId), new ActionListenerResponseHandler(new ActionListener() { + transportService.sendRequest(node, FREE_CONTEXT_ACTION_NAME, new SearchFreeContextRequest(request, contextId), + new ActionListenerResponseHandler(new ActionListener() { @Override public void onResponse(SearchFreeContextResponse response) { // no need to respond if it was freed or not @@ -114,8 +126,9 @@ public class SearchServiceTransportAction extends AbstractComponent { }); } - public void sendFreeContext(DiscoveryNode node, long contextId, ClearScrollRequest request, final ActionListener listener) { - transportService.sendRequest(node, FREE_CONTEXT_SCROLL_ACTION_NAME, new ScrollFreeContextRequest(request, contextId), new ActionListenerResponseHandler(listener) { + public void sendFreeContext(DiscoveryNode node, long contextId, final ActionListener listener) { + transportService.sendRequest(node, FREE_CONTEXT_SCROLL_ACTION_NAME, new ScrollFreeContextRequest(contextId), + new ActionListenerResponseHandler(listener) { @Override public SearchFreeContextResponse newInstance() { return new SearchFreeContextResponse(); @@ -123,8 +136,9 @@ public class SearchServiceTransportAction extends AbstractComponent { }); } - public void sendClearAllScrollContexts(DiscoveryNode node, ClearScrollRequest request, final ActionListener listener) { - transportService.sendRequest(node, CLEAR_SCROLL_CONTEXTS_ACTION_NAME, new ClearScrollContextsRequest(), new ActionListenerResponseHandler(listener) { + public void sendClearAllScrollContexts(DiscoveryNode node, final ActionListener listener) { + transportService.sendRequest(node, CLEAR_SCROLL_CONTEXTS_ACTION_NAME, new ClearScrollContextsRequest(), + new ActionListenerResponseHandler(listener) { @Override public TransportResponse newInstance() { return TransportResponse.Empty.INSTANCE; @@ -132,7 +146,8 @@ public class SearchServiceTransportAction extends AbstractComponent { }); } - public void sendExecuteDfs(DiscoveryNode node, final ShardSearchTransportRequest request, final ActionListener listener) { + public void sendExecuteDfs(DiscoveryNode node, final ShardSearchTransportRequest request, + final ActionListener listener) { transportService.sendRequest(node, DFS_ACTION_NAME, request, new ActionListenerResponseHandler(listener) { @Override public DfsSearchResult newInstance() { @@ -141,8 +156,10 @@ public class SearchServiceTransportAction extends AbstractComponent { }); } - public void sendExecuteQuery(DiscoveryNode node, final ShardSearchTransportRequest request, final ActionListener listener) { - transportService.sendRequest(node, QUERY_ACTION_NAME, request, new ActionListenerResponseHandler(listener) { + public void sendExecuteQuery(DiscoveryNode node, final ShardSearchTransportRequest request, + final ActionListener listener) { + transportService.sendRequest(node, QUERY_ACTION_NAME, request, + new ActionListenerResponseHandler(listener) { @Override public QuerySearchResult newInstance() { return new QuerySearchResult(); @@ -159,8 +176,10 @@ public class SearchServiceTransportAction extends AbstractComponent { }); } - public void sendExecuteQuery(DiscoveryNode node, final InternalScrollSearchRequest request, final ActionListener listener) { - transportService.sendRequest(node, QUERY_SCROLL_ACTION_NAME, request, new ActionListenerResponseHandler(listener) { + public void sendExecuteQuery(DiscoveryNode node, final InternalScrollSearchRequest request, + final ActionListener listener) { + transportService.sendRequest(node, QUERY_SCROLL_ACTION_NAME, request, + new ActionListenerResponseHandler(listener) { @Override public ScrollQuerySearchResult newInstance() { return new ScrollQuerySearchResult(); @@ -168,8 +187,10 @@ public class SearchServiceTransportAction extends AbstractComponent { }); } - public void sendExecuteFetch(DiscoveryNode node, final ShardSearchTransportRequest request, final ActionListener listener) { - transportService.sendRequest(node, QUERY_FETCH_ACTION_NAME, request, new ActionListenerResponseHandler(listener) { + public void sendExecuteFetch(DiscoveryNode node, final ShardSearchTransportRequest request, + final ActionListener listener) { + transportService.sendRequest(node, QUERY_FETCH_ACTION_NAME, request, + new ActionListenerResponseHandler(listener) { @Override public QueryFetchSearchResult newInstance() { return new QueryFetchSearchResult(); @@ -177,8 +198,10 @@ public class SearchServiceTransportAction extends AbstractComponent { }); } - public void sendExecuteFetch(DiscoveryNode node, final QuerySearchRequest request, final ActionListener listener) { - transportService.sendRequest(node, QUERY_QUERY_FETCH_ACTION_NAME, request, new ActionListenerResponseHandler(listener) { + public void sendExecuteFetch(DiscoveryNode node, final QuerySearchRequest request, + final ActionListener listener) { + transportService.sendRequest(node, QUERY_QUERY_FETCH_ACTION_NAME, request, + new ActionListenerResponseHandler(listener) { @Override public QueryFetchSearchResult newInstance() { return new QueryFetchSearchResult(); @@ -186,8 +209,10 @@ public class SearchServiceTransportAction extends AbstractComponent { }); } - public void sendExecuteFetch(DiscoveryNode node, final InternalScrollSearchRequest request, final ActionListener listener) { - transportService.sendRequest(node, QUERY_FETCH_SCROLL_ACTION_NAME, request, new ActionListenerResponseHandler(listener) { + public void sendExecuteFetch(DiscoveryNode node, final InternalScrollSearchRequest request, + final ActionListener listener) { + transportService.sendRequest(node, QUERY_FETCH_SCROLL_ACTION_NAME, request, + new ActionListenerResponseHandler(listener) { @Override public ScrollQueryFetchSearchResult newInstance() { return new ScrollQueryFetchSearchResult(); @@ -195,15 +220,18 @@ public class SearchServiceTransportAction extends AbstractComponent { }); } - public void sendExecuteFetch(DiscoveryNode node, final ShardFetchSearchRequest request, final ActionListener listener) { + public void sendExecuteFetch(DiscoveryNode node, final ShardFetchSearchRequest request, + final ActionListener listener) { sendExecuteFetch(node, FETCH_ID_ACTION_NAME, request, listener); } - public void sendExecuteFetchScroll(DiscoveryNode node, final ShardFetchRequest request, final ActionListener listener) { + public void sendExecuteFetchScroll(DiscoveryNode node, final ShardFetchRequest request, + final ActionListener listener) { sendExecuteFetch(node, FETCH_ID_SCROLL_ACTION_NAME, request, listener); } - private void sendExecuteFetch(DiscoveryNode node, String action, final ShardFetchRequest request, final ActionListener listener) { + private void sendExecuteFetch(DiscoveryNode node, String action, final ShardFetchRequest request, + final ActionListener listener) { transportService.sendRequest(node, action, request, new ActionListenerResponseHandler(listener) { @Override public FetchSearchResult newInstance() { @@ -212,17 +240,13 @@ public class SearchServiceTransportAction extends AbstractComponent { }); } - public static class ScrollFreeContextRequest extends TransportRequest { + static class ScrollFreeContextRequest extends TransportRequest { private long id; - public ScrollFreeContextRequest() { + ScrollFreeContextRequest() { } - ScrollFreeContextRequest(ClearScrollRequest request, long id) { - this(id); - } - - private ScrollFreeContextRequest(long id) { + ScrollFreeContextRequest(long id) { this.id = id; } @@ -243,7 +267,7 @@ public class SearchServiceTransportAction extends AbstractComponent { } } - public static class SearchFreeContextRequest extends ScrollFreeContextRequest implements IndicesRequest { + static class SearchFreeContextRequest extends ScrollFreeContextRequest implements IndicesRequest { private OriginalIndices originalIndices; public SearchFreeContextRequest() { @@ -311,7 +335,8 @@ public class SearchServiceTransportAction extends AbstractComponent { } } - class FreeContextTransportHandler implements TransportRequestHandler { + class FreeContextTransportHandler + implements TransportRequestHandler { @Override public void messageReceived(FreeContextRequest request, TransportChannel channel) throws Exception { boolean freed = searchService.freeContext(request.id()); @@ -319,7 +344,7 @@ public class SearchServiceTransportAction extends AbstractComponent { } } - public static class ClearScrollContextsRequest extends TransportRequest { + static class ClearScrollContextsRequest extends TransportRequest { } class ClearScrollContextsTransportHandler implements TransportRequestHandler { @@ -393,5 +418,4 @@ public class SearchServiceTransportAction extends AbstractComponent { channel.sendResponse(result); } } - } diff --git a/modules/lang-groovy/src/test/java/org/elasticsearch/messy/tests/IndicesRequestTests.java b/modules/lang-groovy/src/test/java/org/elasticsearch/messy/tests/IndicesRequestTests.java index d2e05869b94..709308994f9 100644 --- a/modules/lang-groovy/src/test/java/org/elasticsearch/messy/tests/IndicesRequestTests.java +++ b/modules/lang-groovy/src/test/java/org/elasticsearch/messy/tests/IndicesRequestTests.java @@ -92,7 +92,7 @@ import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.script.Script; import org.elasticsearch.script.groovy.GroovyPlugin; -import org.elasticsearch.search.action.SearchServiceTransportAction; +import org.elasticsearch.search.action.SearchTransportService; import org.elasticsearch.tasks.Task; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.ESIntegTestCase.ClusterScope; @@ -580,8 +580,8 @@ public class IndicesRequestTests extends ESIntegTestCase { } public void testSearchQueryThenFetch() throws Exception { - interceptTransportActions(SearchServiceTransportAction.QUERY_ACTION_NAME, - SearchServiceTransportAction.FETCH_ID_ACTION_NAME, SearchServiceTransportAction.FREE_CONTEXT_ACTION_NAME); + interceptTransportActions(SearchTransportService.QUERY_ACTION_NAME, + SearchTransportService.FETCH_ID_ACTION_NAME, SearchTransportService.FREE_CONTEXT_ACTION_NAME); String[] randomIndicesOrAliases = randomIndicesOrAliases(); for (int i = 0; i < randomIndicesOrAliases.length; i++) { @@ -595,14 +595,14 @@ public class IndicesRequestTests extends ESIntegTestCase { assertThat(searchResponse.getHits().totalHits(), greaterThan(0L)); clearInterceptedActions(); - assertSameIndices(searchRequest, SearchServiceTransportAction.QUERY_ACTION_NAME, SearchServiceTransportAction.FETCH_ID_ACTION_NAME); + assertSameIndices(searchRequest, SearchTransportService.QUERY_ACTION_NAME, SearchTransportService.FETCH_ID_ACTION_NAME); //free context messages are not necessarily sent, but if they are, check their indices - assertSameIndicesOptionalRequests(searchRequest, SearchServiceTransportAction.FREE_CONTEXT_ACTION_NAME); + assertSameIndicesOptionalRequests(searchRequest, SearchTransportService.FREE_CONTEXT_ACTION_NAME); } public void testSearchDfsQueryThenFetch() throws Exception { - interceptTransportActions(SearchServiceTransportAction.DFS_ACTION_NAME, SearchServiceTransportAction.QUERY_ID_ACTION_NAME, - SearchServiceTransportAction.FETCH_ID_ACTION_NAME, SearchServiceTransportAction.FREE_CONTEXT_ACTION_NAME); + interceptTransportActions(SearchTransportService.DFS_ACTION_NAME, SearchTransportService.QUERY_ID_ACTION_NAME, + SearchTransportService.FETCH_ID_ACTION_NAME, SearchTransportService.FREE_CONTEXT_ACTION_NAME); String[] randomIndicesOrAliases = randomIndicesOrAliases(); for (int i = 0; i < randomIndicesOrAliases.length; i++) { @@ -616,15 +616,15 @@ public class IndicesRequestTests extends ESIntegTestCase { assertThat(searchResponse.getHits().totalHits(), greaterThan(0L)); clearInterceptedActions(); - assertSameIndices(searchRequest, SearchServiceTransportAction.DFS_ACTION_NAME, SearchServiceTransportAction.QUERY_ID_ACTION_NAME, - SearchServiceTransportAction.FETCH_ID_ACTION_NAME); + assertSameIndices(searchRequest, SearchTransportService.DFS_ACTION_NAME, SearchTransportService.QUERY_ID_ACTION_NAME, + SearchTransportService.FETCH_ID_ACTION_NAME); //free context messages are not necessarily sent, but if they are, check their indices - assertSameIndicesOptionalRequests(searchRequest, SearchServiceTransportAction.FREE_CONTEXT_ACTION_NAME); + assertSameIndicesOptionalRequests(searchRequest, SearchTransportService.FREE_CONTEXT_ACTION_NAME); } public void testSearchQueryAndFetch() throws Exception { - interceptTransportActions(SearchServiceTransportAction.QUERY_FETCH_ACTION_NAME, - SearchServiceTransportAction.FREE_CONTEXT_ACTION_NAME); + interceptTransportActions(SearchTransportService.QUERY_FETCH_ACTION_NAME, + SearchTransportService.FREE_CONTEXT_ACTION_NAME); String[] randomIndicesOrAliases = randomIndicesOrAliases(); for (int i = 0; i < randomIndicesOrAliases.length; i++) { @@ -638,14 +638,14 @@ public class IndicesRequestTests extends ESIntegTestCase { assertThat(searchResponse.getHits().totalHits(), greaterThan(0L)); clearInterceptedActions(); - assertSameIndices(searchRequest, SearchServiceTransportAction.QUERY_FETCH_ACTION_NAME); + assertSameIndices(searchRequest, SearchTransportService.QUERY_FETCH_ACTION_NAME); //free context messages are not necessarily sent, but if they are, check their indices - assertSameIndicesOptionalRequests(searchRequest, SearchServiceTransportAction.FREE_CONTEXT_ACTION_NAME); + assertSameIndicesOptionalRequests(searchRequest, SearchTransportService.FREE_CONTEXT_ACTION_NAME); } public void testSearchDfsQueryAndFetch() throws Exception { - interceptTransportActions(SearchServiceTransportAction.QUERY_QUERY_FETCH_ACTION_NAME, - SearchServiceTransportAction.FREE_CONTEXT_ACTION_NAME); + interceptTransportActions(SearchTransportService.QUERY_QUERY_FETCH_ACTION_NAME, + SearchTransportService.FREE_CONTEXT_ACTION_NAME); String[] randomIndicesOrAliases = randomIndicesOrAliases(); for (int i = 0; i < randomIndicesOrAliases.length; i++) { @@ -659,9 +659,9 @@ public class IndicesRequestTests extends ESIntegTestCase { assertThat(searchResponse.getHits().totalHits(), greaterThan(0L)); clearInterceptedActions(); - assertSameIndices(searchRequest, SearchServiceTransportAction.QUERY_QUERY_FETCH_ACTION_NAME); + assertSameIndices(searchRequest, SearchTransportService.QUERY_QUERY_FETCH_ACTION_NAME); //free context messages are not necessarily sent, but if they are, check their indices - assertSameIndicesOptionalRequests(searchRequest, SearchServiceTransportAction.FREE_CONTEXT_ACTION_NAME); + assertSameIndicesOptionalRequests(searchRequest, SearchTransportService.FREE_CONTEXT_ACTION_NAME); } private static void assertSameIndices(IndicesRequest originalRequest, String... actions) {