Don't register SearchTransportService handlers more than once (#20468)

This utility class is used in 3 places while we only need to register
the handlers once per node. Otherwise we will see nasty `WARN` logs like:
`registered two transport handlers for action indices:data/read/search[phase/fetch/id/scroll]...`

This change will only register handlers inside the main TransportSearchAction.
This commit is contained in:
Simon Willnauer 2016-09-14 10:34:40 +02:00 committed by GitHub
parent 26dc6f1306
commit a1cd6be777
4 changed files with 72 additions and 123 deletions

View File

@ -71,36 +71,10 @@ public class SearchTransportService extends AbstractComponent {
public static final String FETCH_ID_ACTION_NAME = "indices:data/read/search[phase/fetch/id]";
private final TransportService transportService;
private final SearchService searchService;
SearchTransportService(Settings settings, TransportService transportService, SearchService searchService) {
SearchTransportService(Settings settings, TransportService transportService) {
super(settings);
this.transportService = transportService;
this.searchService = searchService;
transportService.registerRequestHandler(FREE_CONTEXT_SCROLL_ACTION_NAME, ScrollFreeContextRequest::new, ThreadPool.Names.SAME,
new FreeContextTransportHandler<>());
transportService.registerRequestHandler(FREE_CONTEXT_ACTION_NAME, SearchFreeContextRequest::new, ThreadPool.Names.SAME,
new FreeContextTransportHandler<>());
transportService.registerRequestHandler(CLEAR_SCROLL_CONTEXTS_ACTION_NAME, ClearScrollContextsRequest::new, ThreadPool.Names.SAME,
new ClearScrollContextsTransportHandler());
transportService.registerRequestHandler(DFS_ACTION_NAME, ShardSearchTransportRequest::new, ThreadPool.Names.SEARCH,
new SearchDfsTransportHandler());
transportService.registerRequestHandler(QUERY_ACTION_NAME, ShardSearchTransportRequest::new, ThreadPool.Names.SEARCH,
new SearchQueryTransportHandler());
transportService.registerRequestHandler(QUERY_ID_ACTION_NAME, QuerySearchRequest::new, ThreadPool.Names.SEARCH,
new SearchQueryByIdTransportHandler());
transportService.registerRequestHandler(QUERY_SCROLL_ACTION_NAME, InternalScrollSearchRequest::new, ThreadPool.Names.SEARCH,
new SearchQueryScrollTransportHandler());
transportService.registerRequestHandler(QUERY_FETCH_ACTION_NAME, ShardSearchTransportRequest::new, ThreadPool.Names.SEARCH,
new SearchQueryFetchTransportHandler());
transportService.registerRequestHandler(QUERY_QUERY_FETCH_ACTION_NAME, QuerySearchRequest::new, ThreadPool.Names.SEARCH,
new SearchQueryQueryFetchTransportHandler());
transportService.registerRequestHandler(QUERY_FETCH_SCROLL_ACTION_NAME, InternalScrollSearchRequest::new, ThreadPool.Names.SEARCH,
new SearchQueryFetchScrollTransportHandler());
transportService.registerRequestHandler(FETCH_ID_SCROLL_ACTION_NAME, ShardFetchRequest::new, ThreadPool.Names.SEARCH,
new FetchByIdTransportHandler<>());
transportService.registerRequestHandler(FETCH_ID_ACTION_NAME, ShardFetchSearchRequest::new, ThreadPool.Names.SEARCH,
new FetchByIdTransportHandler<>());
}
public void sendFreeContext(DiscoveryNode node, final long contextId, SearchRequest request) {
@ -124,8 +98,8 @@ public class SearchTransportService extends AbstractComponent {
}
public void sendClearAllScrollContexts(DiscoveryNode node, final ActionListener<TransportResponse> listener) {
transportService.sendRequest(node, CLEAR_SCROLL_CONTEXTS_ACTION_NAME, new ClearScrollContextsRequest(),
new ActionListenerResponseHandler<>(listener, () -> TransportResponse.Empty.INSTANCE));
transportService.sendRequest(node, CLEAR_SCROLL_CONTEXTS_ACTION_NAME, TransportRequest.Empty.INSTANCE,
new ActionListenerResponseHandler<>(listener, () -> TransportResponse.Empty.INSTANCE));
}
public void sendExecuteDfs(DiscoveryNode node, final ShardSearchTransportRequest request,
@ -278,87 +252,66 @@ public class SearchTransportService extends AbstractComponent {
}
}
class FreeContextTransportHandler<FreeContextRequest extends ScrollFreeContextRequest>
implements TransportRequestHandler<FreeContextRequest> {
@Override
public void messageReceived(FreeContextRequest request, TransportChannel channel) throws Exception {
boolean freed = searchService.freeContext(request.id());
channel.sendResponse(new SearchFreeContextResponse(freed));
}
}
static class ClearScrollContextsRequest extends TransportRequest {
}
class ClearScrollContextsTransportHandler implements TransportRequestHandler<ClearScrollContextsRequest> {
@Override
public void messageReceived(ClearScrollContextsRequest request, TransportChannel channel) throws Exception {
searchService.freeAllScrollContexts();
channel.sendResponse(TransportResponse.Empty.INSTANCE);
}
}
class SearchDfsTransportHandler implements TransportRequestHandler<ShardSearchTransportRequest> {
@Override
public void messageReceived(ShardSearchTransportRequest request, TransportChannel channel) throws Exception {
DfsSearchResult result = searchService.executeDfsPhase(request);
channel.sendResponse(result);
}
}
class SearchQueryTransportHandler implements TransportRequestHandler<ShardSearchTransportRequest> {
@Override
public void messageReceived(ShardSearchTransportRequest request, TransportChannel channel) throws Exception {
QuerySearchResultProvider result = searchService.executeQueryPhase(request);
channel.sendResponse(result);
}
}
class SearchQueryByIdTransportHandler implements TransportRequestHandler<QuerySearchRequest> {
@Override
public void messageReceived(QuerySearchRequest request, TransportChannel channel) throws Exception {
QuerySearchResult result = searchService.executeQueryPhase(request);
channel.sendResponse(result);
}
}
class SearchQueryScrollTransportHandler implements TransportRequestHandler<InternalScrollSearchRequest> {
@Override
public void messageReceived(InternalScrollSearchRequest request, TransportChannel channel) throws Exception {
ScrollQuerySearchResult result = searchService.executeQueryPhase(request);
channel.sendResponse(result);
}
}
class SearchQueryFetchTransportHandler implements TransportRequestHandler<ShardSearchTransportRequest> {
@Override
public void messageReceived(ShardSearchTransportRequest request, TransportChannel channel) throws Exception {
QueryFetchSearchResult result = searchService.executeFetchPhase(request);
channel.sendResponse(result);
}
}
class SearchQueryQueryFetchTransportHandler implements TransportRequestHandler<QuerySearchRequest> {
@Override
public void messageReceived(QuerySearchRequest request, TransportChannel channel) throws Exception {
QueryFetchSearchResult result = searchService.executeFetchPhase(request);
channel.sendResponse(result);
}
}
class FetchByIdTransportHandler<Request extends ShardFetchRequest> implements TransportRequestHandler<Request> {
@Override
public void messageReceived(Request request, TransportChannel channel) throws Exception {
FetchSearchResult result = searchService.executeFetchPhase(request);
channel.sendResponse(result);
}
}
class SearchQueryFetchScrollTransportHandler implements TransportRequestHandler<InternalScrollSearchRequest> {
@Override
public void messageReceived(InternalScrollSearchRequest request, TransportChannel channel) throws Exception {
ScrollQueryFetchSearchResult result = searchService.executeFetchPhase(request);
channel.sendResponse(result);
}
public static void registerRequestHandler(TransportService transportService, SearchService searchService) {
transportService.registerRequestHandler(FREE_CONTEXT_SCROLL_ACTION_NAME, ScrollFreeContextRequest::new, ThreadPool.Names.SAME,
((request, channel) -> {
boolean freed = searchService.freeContext(request.id());
channel.sendResponse(new SearchFreeContextResponse(freed));
}));
transportService.registerRequestHandler(FREE_CONTEXT_ACTION_NAME, SearchFreeContextRequest::new, ThreadPool.Names.SAME,
(request, channel) -> {
boolean freed = searchService.freeContext(request.id());
channel.sendResponse(new SearchFreeContextResponse(freed));
});
transportService.registerRequestHandler(CLEAR_SCROLL_CONTEXTS_ACTION_NAME, () -> TransportRequest.Empty.INSTANCE, ThreadPool.Names.SAME,
(request, channel) -> {
searchService.freeAllScrollContexts();
channel.sendResponse(TransportResponse.Empty.INSTANCE);
});
transportService.registerRequestHandler(DFS_ACTION_NAME, ShardSearchTransportRequest::new, ThreadPool.Names.SEARCH,
(request, channel) -> {
DfsSearchResult result = searchService.executeDfsPhase(request);
channel.sendResponse(result);
});
transportService.registerRequestHandler(QUERY_ACTION_NAME, ShardSearchTransportRequest::new, ThreadPool.Names.SEARCH,
(request, channel) -> {
QuerySearchResultProvider result = searchService.executeQueryPhase(request);
channel.sendResponse(result);
});
transportService.registerRequestHandler(QUERY_ID_ACTION_NAME, QuerySearchRequest::new, ThreadPool.Names.SEARCH,
(request, channel) -> {
QuerySearchResult result = searchService.executeQueryPhase(request);
channel.sendResponse(result);
});
transportService.registerRequestHandler(QUERY_SCROLL_ACTION_NAME, InternalScrollSearchRequest::new, ThreadPool.Names.SEARCH,
(request, channel) -> {
ScrollQuerySearchResult result = searchService.executeQueryPhase(request);
channel.sendResponse(result);
});
transportService.registerRequestHandler(QUERY_FETCH_ACTION_NAME, ShardSearchTransportRequest::new, ThreadPool.Names.SEARCH,
(request, channel) -> {
QueryFetchSearchResult result = searchService.executeFetchPhase(request);
channel.sendResponse(result);
});
transportService.registerRequestHandler(QUERY_QUERY_FETCH_ACTION_NAME, QuerySearchRequest::new, ThreadPool.Names.SEARCH,
(request, channel) -> {
QueryFetchSearchResult result = searchService.executeFetchPhase(request);
channel.sendResponse(result);
});
transportService.registerRequestHandler(QUERY_FETCH_SCROLL_ACTION_NAME, InternalScrollSearchRequest::new, ThreadPool.Names.SEARCH,
(request, channel) -> {
ScrollQueryFetchSearchResult result = searchService.executeFetchPhase(request);
channel.sendResponse(result);
});
transportService.registerRequestHandler(FETCH_ID_SCROLL_ACTION_NAME, ShardFetchRequest::new, ThreadPool.Names.SEARCH,
(request, channel) -> {
FetchSearchResult result = searchService.executeFetchPhase(request);
channel.sendResponse(result);
});
transportService.registerRequestHandler(FETCH_ID_ACTION_NAME, ShardFetchSearchRequest::new, ThreadPool.Names.SEARCH,
(request, channel) -> {
FetchSearchResult result = searchService.executeFetchPhase(request);
channel.sendResponse(result);
});
}
}

View File

@ -32,7 +32,6 @@ import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.CountDown;
import org.elasticsearch.search.SearchService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportService;
@ -44,8 +43,6 @@ import java.util.concurrent.atomic.AtomicReference;
import static org.elasticsearch.action.search.TransportSearchHelper.parseScrollId;
/**
*/
public class TransportClearScrollAction extends HandledTransportAction<ClearScrollRequest, ClearScrollResponse> {
private final ClusterService clusterService;
@ -53,11 +50,11 @@ public class TransportClearScrollAction extends HandledTransportAction<ClearScro
@Inject
public TransportClearScrollAction(Settings settings, TransportService transportService, ThreadPool threadPool,
ClusterService clusterService, SearchService searchService,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) {
ClusterService clusterService, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver) {
super(settings, ClearScrollAction.NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver, ClearScrollRequest::new);
this.clusterService = clusterService;
this.searchTransportService = new SearchTransportService(settings, transportService, searchService);
this.searchTransportService = new SearchTransportService(settings, transportService);
}
@Override

View File

@ -60,7 +60,8 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
indexNameExpressionResolver) {
super(settings, SearchAction.NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver, SearchRequest::new);
this.searchPhaseController = new SearchPhaseController(settings, bigArrays, scriptService, clusterService);;
this.searchTransportService = new SearchTransportService(settings, transportService, searchService);
this.searchTransportService = new SearchTransportService(settings, transportService);
SearchTransportService.registerRequestHandler(transportService, searchService);
this.clusterService = clusterService;
}

View File

@ -28,7 +28,6 @@ import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.search.SearchService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
@ -47,13 +46,12 @@ public class TransportSearchScrollAction extends HandledTransportAction<SearchSc
@Inject
public TransportSearchScrollAction(Settings settings, BigArrays bigArrays, ThreadPool threadPool, ScriptService scriptService,
TransportService transportService,
ClusterService clusterService, SearchService searchService,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) {
TransportService transportService, ClusterService clusterService, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver) {
super(settings, SearchScrollAction.NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver,
SearchScrollRequest::new);
this.clusterService = clusterService;
this.searchTransportService = new SearchTransportService(settings, transportService, searchService);
this.searchTransportService = new SearchTransportService(settings, transportService);
this.searchPhaseController = new SearchPhaseController(settings, bigArrays, scriptService, clusterService);
}