From a1cd6be7775ef1f239023fa88b0294e16cb9b309 Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Wed, 14 Sep 2016 10:34:40 +0200 Subject: [PATCH] Don't register SearchTransportService handlers more than once (#20468) This utility class is used in 3 places while we only need to register the handlers once per node. Otherwise we will see nasty `WARN` logs like: `registered two transport handlers for action indices:data/read/search[phase/fetch/id/scroll]...` This change will only register handlers inside the main TransportSearchAction. --- .../action/search/SearchTransportService.java | 175 +++++++----------- .../search/TransportClearScrollAction.java | 9 +- .../action/search/TransportSearchAction.java | 3 +- .../search/TransportSearchScrollAction.java | 8 +- 4 files changed, 72 insertions(+), 123 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchTransportService.java b/core/src/main/java/org/elasticsearch/action/search/SearchTransportService.java index 985cc854f05..d5a0bfad891 100644 --- a/core/src/main/java/org/elasticsearch/action/search/SearchTransportService.java +++ b/core/src/main/java/org/elasticsearch/action/search/SearchTransportService.java @@ -71,36 +71,10 @@ public class SearchTransportService extends AbstractComponent { public static final String FETCH_ID_ACTION_NAME = "indices:data/read/search[phase/fetch/id]"; private final TransportService transportService; - private final SearchService searchService; - SearchTransportService(Settings settings, TransportService transportService, SearchService searchService) { + SearchTransportService(Settings settings, TransportService transportService) { super(settings); this.transportService = transportService; - this.searchService = searchService; - transportService.registerRequestHandler(FREE_CONTEXT_SCROLL_ACTION_NAME, ScrollFreeContextRequest::new, ThreadPool.Names.SAME, - new FreeContextTransportHandler<>()); - transportService.registerRequestHandler(FREE_CONTEXT_ACTION_NAME, SearchFreeContextRequest::new, ThreadPool.Names.SAME, - new FreeContextTransportHandler<>()); - transportService.registerRequestHandler(CLEAR_SCROLL_CONTEXTS_ACTION_NAME, ClearScrollContextsRequest::new, ThreadPool.Names.SAME, - new ClearScrollContextsTransportHandler()); - transportService.registerRequestHandler(DFS_ACTION_NAME, ShardSearchTransportRequest::new, ThreadPool.Names.SEARCH, - new SearchDfsTransportHandler()); - transportService.registerRequestHandler(QUERY_ACTION_NAME, ShardSearchTransportRequest::new, ThreadPool.Names.SEARCH, - new SearchQueryTransportHandler()); - transportService.registerRequestHandler(QUERY_ID_ACTION_NAME, QuerySearchRequest::new, ThreadPool.Names.SEARCH, - new SearchQueryByIdTransportHandler()); - transportService.registerRequestHandler(QUERY_SCROLL_ACTION_NAME, InternalScrollSearchRequest::new, ThreadPool.Names.SEARCH, - new SearchQueryScrollTransportHandler()); - transportService.registerRequestHandler(QUERY_FETCH_ACTION_NAME, ShardSearchTransportRequest::new, ThreadPool.Names.SEARCH, - new SearchQueryFetchTransportHandler()); - transportService.registerRequestHandler(QUERY_QUERY_FETCH_ACTION_NAME, QuerySearchRequest::new, ThreadPool.Names.SEARCH, - new SearchQueryQueryFetchTransportHandler()); - transportService.registerRequestHandler(QUERY_FETCH_SCROLL_ACTION_NAME, InternalScrollSearchRequest::new, ThreadPool.Names.SEARCH, - new SearchQueryFetchScrollTransportHandler()); - transportService.registerRequestHandler(FETCH_ID_SCROLL_ACTION_NAME, ShardFetchRequest::new, ThreadPool.Names.SEARCH, - new FetchByIdTransportHandler<>()); - transportService.registerRequestHandler(FETCH_ID_ACTION_NAME, ShardFetchSearchRequest::new, ThreadPool.Names.SEARCH, - new FetchByIdTransportHandler<>()); } public void sendFreeContext(DiscoveryNode node, final long contextId, SearchRequest request) { @@ -124,8 +98,8 @@ public class SearchTransportService extends AbstractComponent { } public void sendClearAllScrollContexts(DiscoveryNode node, final ActionListener listener) { - transportService.sendRequest(node, CLEAR_SCROLL_CONTEXTS_ACTION_NAME, new ClearScrollContextsRequest(), - new ActionListenerResponseHandler<>(listener, () -> TransportResponse.Empty.INSTANCE)); + transportService.sendRequest(node, CLEAR_SCROLL_CONTEXTS_ACTION_NAME, TransportRequest.Empty.INSTANCE, + new ActionListenerResponseHandler<>(listener, () -> TransportResponse.Empty.INSTANCE)); } public void sendExecuteDfs(DiscoveryNode node, final ShardSearchTransportRequest request, @@ -278,87 +252,66 @@ public class SearchTransportService extends AbstractComponent { } } - class FreeContextTransportHandler - implements TransportRequestHandler { - @Override - public void messageReceived(FreeContextRequest request, TransportChannel channel) throws Exception { - boolean freed = searchService.freeContext(request.id()); - channel.sendResponse(new SearchFreeContextResponse(freed)); - } - } - - static class ClearScrollContextsRequest extends TransportRequest { - } - - class ClearScrollContextsTransportHandler implements TransportRequestHandler { - @Override - public void messageReceived(ClearScrollContextsRequest request, TransportChannel channel) throws Exception { - searchService.freeAllScrollContexts(); - channel.sendResponse(TransportResponse.Empty.INSTANCE); - } - } - - class SearchDfsTransportHandler implements TransportRequestHandler { - @Override - public void messageReceived(ShardSearchTransportRequest request, TransportChannel channel) throws Exception { - DfsSearchResult result = searchService.executeDfsPhase(request); - channel.sendResponse(result); - } - } - - class SearchQueryTransportHandler implements TransportRequestHandler { - @Override - public void messageReceived(ShardSearchTransportRequest request, TransportChannel channel) throws Exception { - QuerySearchResultProvider result = searchService.executeQueryPhase(request); - channel.sendResponse(result); - } - } - - class SearchQueryByIdTransportHandler implements TransportRequestHandler { - @Override - public void messageReceived(QuerySearchRequest request, TransportChannel channel) throws Exception { - QuerySearchResult result = searchService.executeQueryPhase(request); - channel.sendResponse(result); - } - } - - class SearchQueryScrollTransportHandler implements TransportRequestHandler { - @Override - public void messageReceived(InternalScrollSearchRequest request, TransportChannel channel) throws Exception { - ScrollQuerySearchResult result = searchService.executeQueryPhase(request); - channel.sendResponse(result); - } - } - - class SearchQueryFetchTransportHandler implements TransportRequestHandler { - @Override - public void messageReceived(ShardSearchTransportRequest request, TransportChannel channel) throws Exception { - QueryFetchSearchResult result = searchService.executeFetchPhase(request); - channel.sendResponse(result); - } - } - - class SearchQueryQueryFetchTransportHandler implements TransportRequestHandler { - @Override - public void messageReceived(QuerySearchRequest request, TransportChannel channel) throws Exception { - QueryFetchSearchResult result = searchService.executeFetchPhase(request); - channel.sendResponse(result); - } - } - - class FetchByIdTransportHandler implements TransportRequestHandler { - @Override - public void messageReceived(Request request, TransportChannel channel) throws Exception { - FetchSearchResult result = searchService.executeFetchPhase(request); - channel.sendResponse(result); - } - } - - class SearchQueryFetchScrollTransportHandler implements TransportRequestHandler { - @Override - public void messageReceived(InternalScrollSearchRequest request, TransportChannel channel) throws Exception { - ScrollQueryFetchSearchResult result = searchService.executeFetchPhase(request); - channel.sendResponse(result); - } + public static void registerRequestHandler(TransportService transportService, SearchService searchService) { + transportService.registerRequestHandler(FREE_CONTEXT_SCROLL_ACTION_NAME, ScrollFreeContextRequest::new, ThreadPool.Names.SAME, + ((request, channel) -> { + boolean freed = searchService.freeContext(request.id()); + channel.sendResponse(new SearchFreeContextResponse(freed)); + })); + transportService.registerRequestHandler(FREE_CONTEXT_ACTION_NAME, SearchFreeContextRequest::new, ThreadPool.Names.SAME, + (request, channel) -> { + boolean freed = searchService.freeContext(request.id()); + channel.sendResponse(new SearchFreeContextResponse(freed)); + }); + transportService.registerRequestHandler(CLEAR_SCROLL_CONTEXTS_ACTION_NAME, () -> TransportRequest.Empty.INSTANCE, ThreadPool.Names.SAME, + (request, channel) -> { + searchService.freeAllScrollContexts(); + channel.sendResponse(TransportResponse.Empty.INSTANCE); + }); + transportService.registerRequestHandler(DFS_ACTION_NAME, ShardSearchTransportRequest::new, ThreadPool.Names.SEARCH, + (request, channel) -> { + DfsSearchResult result = searchService.executeDfsPhase(request); + channel.sendResponse(result); + }); + transportService.registerRequestHandler(QUERY_ACTION_NAME, ShardSearchTransportRequest::new, ThreadPool.Names.SEARCH, + (request, channel) -> { + QuerySearchResultProvider result = searchService.executeQueryPhase(request); + channel.sendResponse(result); + }); + transportService.registerRequestHandler(QUERY_ID_ACTION_NAME, QuerySearchRequest::new, ThreadPool.Names.SEARCH, + (request, channel) -> { + QuerySearchResult result = searchService.executeQueryPhase(request); + channel.sendResponse(result); + }); + transportService.registerRequestHandler(QUERY_SCROLL_ACTION_NAME, InternalScrollSearchRequest::new, ThreadPool.Names.SEARCH, + (request, channel) -> { + ScrollQuerySearchResult result = searchService.executeQueryPhase(request); + channel.sendResponse(result); + }); + transportService.registerRequestHandler(QUERY_FETCH_ACTION_NAME, ShardSearchTransportRequest::new, ThreadPool.Names.SEARCH, + (request, channel) -> { + QueryFetchSearchResult result = searchService.executeFetchPhase(request); + channel.sendResponse(result); + }); + transportService.registerRequestHandler(QUERY_QUERY_FETCH_ACTION_NAME, QuerySearchRequest::new, ThreadPool.Names.SEARCH, + (request, channel) -> { + QueryFetchSearchResult result = searchService.executeFetchPhase(request); + channel.sendResponse(result); + }); + transportService.registerRequestHandler(QUERY_FETCH_SCROLL_ACTION_NAME, InternalScrollSearchRequest::new, ThreadPool.Names.SEARCH, + (request, channel) -> { + ScrollQueryFetchSearchResult result = searchService.executeFetchPhase(request); + channel.sendResponse(result); + }); + transportService.registerRequestHandler(FETCH_ID_SCROLL_ACTION_NAME, ShardFetchRequest::new, ThreadPool.Names.SEARCH, + (request, channel) -> { + FetchSearchResult result = searchService.executeFetchPhase(request); + channel.sendResponse(result); + }); + transportService.registerRequestHandler(FETCH_ID_ACTION_NAME, ShardFetchSearchRequest::new, ThreadPool.Names.SEARCH, + (request, channel) -> { + FetchSearchResult result = searchService.executeFetchPhase(request); + channel.sendResponse(result); + }); } } diff --git a/core/src/main/java/org/elasticsearch/action/search/TransportClearScrollAction.java b/core/src/main/java/org/elasticsearch/action/search/TransportClearScrollAction.java index cb7e7531d2d..ef3815b1b32 100644 --- a/core/src/main/java/org/elasticsearch/action/search/TransportClearScrollAction.java +++ b/core/src/main/java/org/elasticsearch/action/search/TransportClearScrollAction.java @@ -32,7 +32,6 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.CountDown; -import org.elasticsearch.search.SearchService; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportResponse; import org.elasticsearch.transport.TransportService; @@ -44,8 +43,6 @@ import java.util.concurrent.atomic.AtomicReference; import static org.elasticsearch.action.search.TransportSearchHelper.parseScrollId; -/** - */ public class TransportClearScrollAction extends HandledTransportAction { private final ClusterService clusterService; @@ -53,11 +50,11 @@ public class TransportClearScrollAction extends HandledTransportAction