[REFACTOR] TransportActions

Get rid of boilerplate code for handling transport actions.
Make these transport actions extend HandledTransportAction where this code
now lives.
This commit is contained in:
Brian Murphy 2014-07-23 16:18:34 +01:00
parent 3e30fa2089
commit ce864d4016
10 changed files with 67 additions and 406 deletions

View File

@ -22,6 +22,7 @@ package org.elasticsearch.action.admin.indices.mapping.get;
import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMap;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.action.support.TransportAction; import org.elasticsearch.action.support.TransportAction;
import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterState;
@ -38,17 +39,16 @@ import java.util.concurrent.atomic.AtomicReferenceArray;
/** /**
*/ */
public class TransportGetFieldMappingsAction extends TransportAction<GetFieldMappingsRequest, GetFieldMappingsResponse> { public class TransportGetFieldMappingsAction extends HandledTransportAction<GetFieldMappingsRequest, GetFieldMappingsResponse> {
private final ClusterService clusterService; private final ClusterService clusterService;
private final TransportGetFieldMappingsIndexAction shardAction; private final TransportGetFieldMappingsIndexAction shardAction;
@Inject @Inject
public TransportGetFieldMappingsAction(Settings settings, TransportService transportService, ClusterService clusterService, ThreadPool threadPool, TransportGetFieldMappingsIndexAction shardAction, ActionFilters actionFilters) { public TransportGetFieldMappingsAction(Settings settings, TransportService transportService, ClusterService clusterService, ThreadPool threadPool, TransportGetFieldMappingsIndexAction shardAction, ActionFilters actionFilters) {
super(settings, GetFieldMappingsAction.NAME, threadPool, actionFilters); super(settings, GetFieldMappingsAction.NAME, threadPool, transportService, actionFilters);
this.clusterService = clusterService; this.clusterService = clusterService;
this.shardAction = shardAction; this.shardAction = shardAction;
transportService.registerHandler(actionName, new TransportHandler());
} }
@Override @Override
@ -101,41 +101,8 @@ public class TransportGetFieldMappingsAction extends TransportAction<GetFieldMap
return new GetFieldMappingsResponse(mergedResponses.immutableMap()); return new GetFieldMappingsResponse(mergedResponses.immutableMap());
} }
private class TransportHandler extends BaseTransportRequestHandler<GetFieldMappingsRequest> {
@Override @Override
public GetFieldMappingsRequest newInstance() { public GetFieldMappingsRequest newRequestInstance() {
return new GetFieldMappingsRequest(); return new GetFieldMappingsRequest();
} }
@Override
public String executor() {
return ThreadPool.Names.SAME;
}
@Override
public void messageReceived(final GetFieldMappingsRequest request, final TransportChannel channel) throws Exception {
// no need for a threaded listener, since we just send a response
request.listenerThreaded(false);
execute(request, new ActionListener<GetFieldMappingsResponse>() {
@Override
public void onResponse(GetFieldMappingsResponse result) {
try {
channel.sendResponse(result);
} catch (Throwable e) {
onFailure(e);
}
}
@Override
public void onFailure(Throwable e) {
try {
channel.sendResponse(e);
} catch (Exception e1) {
logger.warn("Failed to send error response for action [" + actionName + "] and request [" + request + "]", e1);
}
}
});
}
}
} }

View File

@ -34,6 +34,7 @@ import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.AutoCreateIndex; import org.elasticsearch.action.support.AutoCreateIndex;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.action.support.TransportAction; import org.elasticsearch.action.support.TransportAction;
import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterService;
@ -63,7 +64,7 @@ import java.util.concurrent.atomic.AtomicInteger;
/** /**
* *
*/ */
public class TransportBulkAction extends TransportAction<BulkRequest, BulkResponse> { public class TransportBulkAction extends HandledTransportAction<BulkRequest, BulkResponse> {
private final AutoCreateIndex autoCreateIndex; private final AutoCreateIndex autoCreateIndex;
@ -78,15 +79,18 @@ public class TransportBulkAction extends TransportAction<BulkRequest, BulkRespon
@Inject @Inject
public TransportBulkAction(Settings settings, ThreadPool threadPool, TransportService transportService, ClusterService clusterService, public TransportBulkAction(Settings settings, ThreadPool threadPool, TransportService transportService, ClusterService clusterService,
TransportShardBulkAction shardBulkAction, TransportCreateIndexAction createIndexAction, ActionFilters actionFilters) { TransportShardBulkAction shardBulkAction, TransportCreateIndexAction createIndexAction, ActionFilters actionFilters) {
super(settings, BulkAction.NAME, threadPool, actionFilters); super(settings, BulkAction.NAME, threadPool, transportService, actionFilters);
this.clusterService = clusterService; this.clusterService = clusterService;
this.shardBulkAction = shardBulkAction; this.shardBulkAction = shardBulkAction;
this.createIndexAction = createIndexAction; this.createIndexAction = createIndexAction;
this.autoCreateIndex = new AutoCreateIndex(settings); this.autoCreateIndex = new AutoCreateIndex(settings);
this.allowIdGeneration = componentSettings.getAsBoolean("action.allow_id_generation", true); this.allowIdGeneration = componentSettings.getAsBoolean("action.allow_id_generation", true);
}
transportService.registerHandler(BulkAction.NAME, new TransportHandler()); @Override
public BulkRequest newRequestInstance(){
return new BulkRequest();
} }
@Override @Override
@ -337,42 +341,4 @@ public class TransportBulkAction extends TransportAction<BulkRequest, BulkRespon
}); });
} }
} }
class TransportHandler extends BaseTransportRequestHandler<BulkRequest> {
@Override
public BulkRequest newInstance() {
return new BulkRequest();
}
@Override
public void messageReceived(final BulkRequest request, final TransportChannel channel) throws Exception {
// no need to use threaded listener, since we just send a response
request.listenerThreaded(false);
execute(request, new ActionListener<BulkResponse>() {
@Override
public void onResponse(BulkResponse result) {
try {
channel.sendResponse(result);
} catch (Throwable e) {
onFailure(e);
}
}
@Override
public void onFailure(Throwable e) {
try {
channel.sendResponse(e);
} catch (Exception e1) {
logger.warn("Failed to send error response for action [" + BulkAction.NAME + "] and request [" + request + "]", e1);
}
}
});
}
@Override
public String executor() {
return ThreadPool.Names.SAME;
}
}
} }

View File

@ -22,6 +22,7 @@ package org.elasticsearch.action.get;
import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.action.support.TransportAction; import org.elasticsearch.action.support.TransportAction;
import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterState;
@ -39,7 +40,7 @@ import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
public class TransportMultiGetAction extends TransportAction<MultiGetRequest, MultiGetResponse> { public class TransportMultiGetAction extends HandledTransportAction<MultiGetRequest, MultiGetResponse> {
private final ClusterService clusterService; private final ClusterService clusterService;
@ -47,11 +48,14 @@ public class TransportMultiGetAction extends TransportAction<MultiGetRequest, Mu
@Inject @Inject
public TransportMultiGetAction(Settings settings, ThreadPool threadPool, TransportService transportService, ClusterService clusterService, TransportShardMultiGetAction shardAction, ActionFilters actionFilters) { public TransportMultiGetAction(Settings settings, ThreadPool threadPool, TransportService transportService, ClusterService clusterService, TransportShardMultiGetAction shardAction, ActionFilters actionFilters) {
super(settings, MultiGetAction.NAME, threadPool, actionFilters); super(settings, MultiGetAction.NAME, threadPool, transportService, actionFilters);
this.clusterService = clusterService; this.clusterService = clusterService;
this.shardAction = shardAction; this.shardAction = shardAction;
}
transportService.registerHandler(MultiGetAction.NAME, new TransportHandler()); @Override
public MultiGetRequest newRequestInstance(){
return new MultiGetRequest();
} }
@Override @Override
@ -128,42 +132,4 @@ public class TransportMultiGetAction extends TransportAction<MultiGetRequest, Mu
}); });
} }
} }
class TransportHandler extends BaseTransportRequestHandler<MultiGetRequest> {
@Override
public MultiGetRequest newInstance() {
return new MultiGetRequest();
}
@Override
public void messageReceived(final MultiGetRequest request, final TransportChannel channel) throws Exception {
// no need to use threaded listener, since we just send a response
request.listenerThreaded(false);
execute(request, new ActionListener<MultiGetResponse>() {
@Override
public void onResponse(MultiGetResponse response) {
try {
channel.sendResponse(response);
} catch (Throwable e) {
onFailure(e);
}
}
@Override
public void onFailure(Throwable e) {
try {
channel.sendResponse(e);
} catch (Exception e1) {
logger.warn("Failed to send error response for action [" + MultiGetAction.NAME + "] and request [" + request + "]", e1);
}
}
});
}
@Override
public String executor() {
return ThreadPool.Names.SAME;
}
}
} }

View File

@ -32,6 +32,7 @@ import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.TransportSearchAction; import org.elasticsearch.action.search.TransportSearchAction;
import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.action.support.TransportAction; import org.elasticsearch.action.support.TransportAction;
import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterState;
@ -65,7 +66,7 @@ import static org.elasticsearch.search.builder.SearchSourceBuilder.searchSource;
/** /**
* The more like this action. * The more like this action.
*/ */
public class TransportMoreLikeThisAction extends TransportAction<MoreLikeThisRequest, SearchResponse> { public class TransportMoreLikeThisAction extends HandledTransportAction<MoreLikeThisRequest, SearchResponse> {
private final TransportSearchAction searchAction; private final TransportSearchAction searchAction;
@ -80,14 +81,17 @@ public class TransportMoreLikeThisAction extends TransportAction<MoreLikeThisReq
@Inject @Inject
public TransportMoreLikeThisAction(Settings settings, ThreadPool threadPool, TransportSearchAction searchAction, TransportGetAction getAction, public TransportMoreLikeThisAction(Settings settings, ThreadPool threadPool, TransportSearchAction searchAction, TransportGetAction getAction,
ClusterService clusterService, IndicesService indicesService, TransportService transportService, ActionFilters actionFilters) { ClusterService clusterService, IndicesService indicesService, TransportService transportService, ActionFilters actionFilters) {
super(settings, MoreLikeThisAction.NAME, threadPool, actionFilters); super(settings, MoreLikeThisAction.NAME, threadPool, transportService, actionFilters);
this.searchAction = searchAction; this.searchAction = searchAction;
this.getAction = getAction; this.getAction = getAction;
this.indicesService = indicesService; this.indicesService = indicesService;
this.clusterService = clusterService; this.clusterService = clusterService;
this.transportService = transportService; this.transportService = transportService;
}
transportService.registerHandler(MoreLikeThisAction.NAME, new TransportHandler()); @Override
public MoreLikeThisRequest newRequestInstance(){
return new MoreLikeThisRequest();
} }
@Override @Override
@ -331,42 +335,4 @@ public class TransportMoreLikeThisAction extends TransportAction<MoreLikeThisReq
.failOnUnsupportedField(failOnUnsupportedField); .failOnUnsupportedField(failOnUnsupportedField);
boolBuilder.should(mlt); boolBuilder.should(mlt);
} }
private class TransportHandler extends BaseTransportRequestHandler<MoreLikeThisRequest> {
@Override
public MoreLikeThisRequest newInstance() {
return new MoreLikeThisRequest();
}
@Override
public void messageReceived(MoreLikeThisRequest request, final TransportChannel channel) throws Exception {
// no need to have a threaded listener since we just send back a response
request.listenerThreaded(false);
execute(request, new ActionListener<SearchResponse>() {
@Override
public void onResponse(SearchResponse result) {
try {
channel.sendResponse(result);
} catch (Throwable e) {
onFailure(e);
}
}
@Override
public void onFailure(Throwable e) {
try {
channel.sendResponse(e);
} catch (Exception e1) {
logger.warn("Failed to send response for get", e1);
}
}
});
}
@Override
public String executor() {
return ThreadPool.Names.SAME;
}
}
} }

View File

@ -25,6 +25,7 @@ import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.UnavailableShardsException; import org.elasticsearch.action.UnavailableShardsException;
import org.elasticsearch.action.get.*; import org.elasticsearch.action.get.*;
import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.action.support.TransportAction; import org.elasticsearch.action.support.TransportAction;
import org.elasticsearch.action.support.broadcast.BroadcastShardOperationFailedException; import org.elasticsearch.action.support.broadcast.BroadcastShardOperationFailedException;
import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterService;
@ -50,7 +51,7 @@ import java.util.concurrent.atomic.AtomicReferenceArray;
/** /**
*/ */
public class TransportMultiPercolateAction extends TransportAction<MultiPercolateRequest, MultiPercolateResponse> { public class TransportMultiPercolateAction extends HandledTransportAction<MultiPercolateRequest, MultiPercolateResponse> {
private final ClusterService clusterService; private final ClusterService clusterService;
private final PercolatorService percolatorService; private final PercolatorService percolatorService;
@ -62,13 +63,16 @@ public class TransportMultiPercolateAction extends TransportAction<MultiPercolat
public TransportMultiPercolateAction(Settings settings, ThreadPool threadPool, TransportShardMultiPercolateAction shardMultiPercolateAction, public TransportMultiPercolateAction(Settings settings, ThreadPool threadPool, TransportShardMultiPercolateAction shardMultiPercolateAction,
ClusterService clusterService, TransportService transportService, PercolatorService percolatorService, ClusterService clusterService, TransportService transportService, PercolatorService percolatorService,
TransportMultiGetAction multiGetAction, ActionFilters actionFilters) { TransportMultiGetAction multiGetAction, ActionFilters actionFilters) {
super(settings, MultiPercolateAction.NAME, threadPool, actionFilters); super(settings, MultiPercolateAction.NAME, threadPool, transportService, actionFilters);
this.shardMultiPercolateAction = shardMultiPercolateAction; this.shardMultiPercolateAction = shardMultiPercolateAction;
this.clusterService = clusterService; this.clusterService = clusterService;
this.percolatorService = percolatorService; this.percolatorService = percolatorService;
this.multiGetAction = multiGetAction; this.multiGetAction = multiGetAction;
}
transportService.registerHandler(MultiPercolateAction.NAME, new TransportHandler()); @Override
public MultiPercolateRequest newRequestInstance() {
return new MultiPercolateRequest();
} }
@Override @Override
@ -135,7 +139,6 @@ public class TransportMultiPercolateAction extends TransportAction<MultiPercolat
} }
private class ASyncAction { private class ASyncAction {
final ActionListener<MultiPercolateResponse> finalListener; final ActionListener<MultiPercolateResponse> finalListener;
@ -322,43 +325,4 @@ public class TransportMultiPercolateAction extends TransportAction<MultiPercolat
} }
class TransportHandler extends BaseTransportRequestHandler<MultiPercolateRequest> {
@Override
public MultiPercolateRequest newInstance() {
return new MultiPercolateRequest();
}
@Override
public void messageReceived(final MultiPercolateRequest request, final TransportChannel channel) throws Exception {
// no need to use threaded listener, since we just send a response
request.listenerThreaded(false);
execute(request, new ActionListener<MultiPercolateResponse>() {
@Override
public void onResponse(MultiPercolateResponse response) {
try {
channel.sendResponse(response);
} catch (Throwable e) {
onFailure(e);
}
}
@Override
public void onFailure(Throwable e) {
try {
channel.sendResponse(e);
} catch (Exception e1) {
logger.warn("Failed to send error response for action [mpercolate] and request [" + request + "]", e1);
}
}
});
}
@Override
public String executor() {
return ThreadPool.Names.SAME;
}
}
} }

View File

@ -21,6 +21,7 @@ package org.elasticsearch.action.search;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.action.support.TransportAction; import org.elasticsearch.action.support.TransportAction;
import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterState;
@ -45,17 +46,16 @@ import static org.elasticsearch.action.search.type.TransportSearchHelper.parseSc
/** /**
*/ */
public class TransportClearScrollAction extends TransportAction<ClearScrollRequest, ClearScrollResponse> { public class TransportClearScrollAction extends HandledTransportAction<ClearScrollRequest, ClearScrollResponse> {
private final ClusterService clusterService; private final ClusterService clusterService;
private final SearchServiceTransportAction searchServiceTransportAction; private final SearchServiceTransportAction searchServiceTransportAction;
@Inject @Inject
public TransportClearScrollAction(Settings settings, TransportService transportService, ThreadPool threadPool, ClusterService clusterService, SearchServiceTransportAction searchServiceTransportAction, ActionFilters actionFilters) { public TransportClearScrollAction(Settings settings, TransportService transportService, ThreadPool threadPool, ClusterService clusterService, SearchServiceTransportAction searchServiceTransportAction, ActionFilters actionFilters) {
super(settings, ClearScrollAction.NAME, threadPool, actionFilters); super(settings, ClearScrollAction.NAME, threadPool, transportService, actionFilters);
this.clusterService = clusterService; this.clusterService = clusterService;
this.searchServiceTransportAction = searchServiceTransportAction; this.searchServiceTransportAction = searchServiceTransportAction;
transportService.registerHandler(ClearScrollAction.NAME, new TransportHandler());
} }
@Override @Override
@ -63,6 +63,11 @@ public class TransportClearScrollAction extends TransportAction<ClearScrollReque
new Async(request, listener, clusterService.state()).run(); new Async(request, listener, clusterService.state()).run();
} }
@Override
public ClearScrollRequest newRequestInstance() {
return new ClearScrollRequest();
}
private class Async { private class Async {
final DiscoveryNodes nodes; final DiscoveryNodes nodes;
@ -158,42 +163,4 @@ public class TransportClearScrollAction extends TransportAction<ClearScrollReque
} }
class TransportHandler extends BaseTransportRequestHandler<ClearScrollRequest> {
@Override
public ClearScrollRequest newInstance() {
return new ClearScrollRequest();
}
@Override
public void messageReceived(final ClearScrollRequest request, final TransportChannel channel) throws Exception {
// no need to use threaded listener, since we just send a response
request.listenerThreaded(false);
execute(request, new ActionListener<ClearScrollResponse>() {
@Override
public void onResponse(ClearScrollResponse response) {
try {
channel.sendResponse(response);
} catch (Throwable e) {
onFailure(e);
}
}
@Override
public void onFailure(Throwable e) {
try {
channel.sendResponse(e);
} catch (Exception e1) {
logger.warn("Failed to send error response for action [clear_sc] and request [" + request + "]", e1);
}
}
});
}
@Override
public String executor() {
return ThreadPool.Names.SAME;
}
}
} }

View File

@ -22,6 +22,7 @@ package org.elasticsearch.action.search;
import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.action.support.TransportAction; import org.elasticsearch.action.support.TransportAction;
import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterState;
@ -38,7 +39,7 @@ import java.util.concurrent.atomic.AtomicInteger;
/** /**
*/ */
public class TransportMultiSearchAction extends TransportAction<MultiSearchRequest, MultiSearchResponse> { public class TransportMultiSearchAction extends HandledTransportAction<MultiSearchRequest, MultiSearchResponse> {
private final ClusterService clusterService; private final ClusterService clusterService;
@ -46,11 +47,9 @@ public class TransportMultiSearchAction extends TransportAction<MultiSearchReque
@Inject @Inject
public TransportMultiSearchAction(Settings settings, ThreadPool threadPool, TransportService transportService, ClusterService clusterService, TransportSearchAction searchAction, ActionFilters actionFilters) { public TransportMultiSearchAction(Settings settings, ThreadPool threadPool, TransportService transportService, ClusterService clusterService, TransportSearchAction searchAction, ActionFilters actionFilters) {
super(settings, MultiSearchAction.NAME, threadPool, actionFilters); super(settings, MultiSearchAction.NAME, threadPool, transportService, actionFilters);
this.clusterService = clusterService; this.clusterService = clusterService;
this.searchAction = searchAction; this.searchAction = searchAction;
transportService.registerHandler(MultiSearchAction.NAME, new TransportHandler());
} }
@Override @Override
@ -86,41 +85,8 @@ public class TransportMultiSearchAction extends TransportAction<MultiSearchReque
} }
} }
class TransportHandler extends BaseTransportRequestHandler<MultiSearchRequest> {
@Override @Override
public MultiSearchRequest newInstance() { public MultiSearchRequest newRequestInstance() {
return new MultiSearchRequest(); return new MultiSearchRequest();
} }
@Override
public void messageReceived(final MultiSearchRequest request, final TransportChannel channel) throws Exception {
// no need to use threaded listener, since we just send a response
request.listenerThreaded(false);
execute(request, new ActionListener<MultiSearchResponse>() {
@Override
public void onResponse(MultiSearchResponse response) {
try {
channel.sendResponse(response);
} catch (Throwable e) {
onFailure(e);
}
}
@Override
public void onFailure(Throwable e) {
try {
channel.sendResponse(e);
} catch (Exception e1) {
logger.warn("Failed to send error response for action [msearch] and request [" + request + "]", e1);
}
}
});
}
@Override
public String executor() {
return ThreadPool.Names.SAME;
}
}
} }

View File

@ -22,6 +22,7 @@ package org.elasticsearch.action.search;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.search.type.*; import org.elasticsearch.action.search.type.*;
import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.action.support.TransportAction; import org.elasticsearch.action.support.TransportAction;
import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterState;
@ -41,7 +42,7 @@ import static org.elasticsearch.action.search.SearchType.*;
/** /**
* *
*/ */
public class TransportSearchAction extends TransportAction<SearchRequest, SearchResponse> { public class TransportSearchAction extends HandledTransportAction<SearchRequest, SearchResponse> {
private final ClusterService clusterService; private final ClusterService clusterService;
private final TransportSearchDfsQueryThenFetchAction dfsQueryThenFetchAction; private final TransportSearchDfsQueryThenFetchAction dfsQueryThenFetchAction;
@ -61,7 +62,7 @@ public class TransportSearchAction extends TransportAction<SearchRequest, Search
TransportSearchQueryAndFetchAction queryAndFetchAction, TransportSearchQueryAndFetchAction queryAndFetchAction,
TransportSearchScanAction scanAction, TransportSearchScanAction scanAction,
TransportSearchCountAction countAction, ActionFilters actionFilters) { TransportSearchCountAction countAction, ActionFilters actionFilters) {
super(settings, SearchAction.NAME, threadPool, actionFilters); super(settings, SearchAction.NAME, threadPool, transportService, actionFilters);
this.clusterService = clusterService; this.clusterService = clusterService;
this.dfsQueryThenFetchAction = dfsQueryThenFetchAction; this.dfsQueryThenFetchAction = dfsQueryThenFetchAction;
this.queryThenFetchAction = queryThenFetchAction; this.queryThenFetchAction = queryThenFetchAction;
@ -72,7 +73,7 @@ public class TransportSearchAction extends TransportAction<SearchRequest, Search
this.optimizeSingleShard = componentSettings.getAsBoolean("optimize_single_shard", true); this.optimizeSingleShard = componentSettings.getAsBoolean("optimize_single_shard", true);
transportService.registerHandler(SearchAction.NAME, new TransportHandler());
} }
@Override @Override
@ -111,41 +112,8 @@ public class TransportSearchAction extends TransportAction<SearchRequest, Search
} }
} }
private class TransportHandler extends BaseTransportRequestHandler<SearchRequest> {
@Override @Override
public SearchRequest newInstance() { public SearchRequest newRequestInstance() {
return new SearchRequest(); return new SearchRequest();
} }
@Override
public void messageReceived(SearchRequest request, final TransportChannel channel) throws Exception {
// no need for a threaded listener
request.listenerThreaded(false);
execute(request, new ActionListener<SearchResponse>() {
@Override
public void onResponse(SearchResponse result) {
try {
channel.sendResponse(result);
} catch (Throwable e) {
onFailure(e);
}
}
@Override
public void onFailure(Throwable e) {
try {
channel.sendResponse(e);
} catch (Exception e1) {
logger.warn("Failed to send response for search", e1);
}
}
});
}
@Override
public String executor() {
return ThreadPool.Names.SAME;
}
}
} }

View File

@ -26,6 +26,7 @@ import org.elasticsearch.action.search.type.TransportSearchScrollQueryAndFetchAc
import org.elasticsearch.action.search.type.TransportSearchScrollQueryThenFetchAction; import org.elasticsearch.action.search.type.TransportSearchScrollQueryThenFetchAction;
import org.elasticsearch.action.search.type.TransportSearchScrollScanAction; import org.elasticsearch.action.search.type.TransportSearchScrollScanAction;
import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.action.support.TransportAction; import org.elasticsearch.action.support.TransportAction;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
@ -40,7 +41,7 @@ import static org.elasticsearch.action.search.type.TransportSearchHelper.parseSc
/** /**
* *
*/ */
public class TransportSearchScrollAction extends TransportAction<SearchScrollRequest, SearchResponse> { public class TransportSearchScrollAction extends HandledTransportAction<SearchScrollRequest, SearchResponse> {
private final TransportSearchScrollQueryThenFetchAction queryThenFetchAction; private final TransportSearchScrollQueryThenFetchAction queryThenFetchAction;
@ -53,12 +54,10 @@ public class TransportSearchScrollAction extends TransportAction<SearchScrollReq
TransportSearchScrollQueryThenFetchAction queryThenFetchAction, TransportSearchScrollQueryThenFetchAction queryThenFetchAction,
TransportSearchScrollQueryAndFetchAction queryAndFetchAction, TransportSearchScrollQueryAndFetchAction queryAndFetchAction,
TransportSearchScrollScanAction scanAction, ActionFilters actionFilters) { TransportSearchScrollScanAction scanAction, ActionFilters actionFilters) {
super(settings, SearchScrollAction.NAME, threadPool, actionFilters); super(settings, SearchScrollAction.NAME, threadPool, transportService, actionFilters);
this.queryThenFetchAction = queryThenFetchAction; this.queryThenFetchAction = queryThenFetchAction;
this.queryAndFetchAction = queryAndFetchAction; this.queryAndFetchAction = queryAndFetchAction;
this.scanAction = scanAction; this.scanAction = scanAction;
transportService.registerHandler(SearchScrollAction.NAME, new TransportHandler());
} }
@Override @Override
@ -79,41 +78,8 @@ public class TransportSearchScrollAction extends TransportAction<SearchScrollReq
} }
} }
private class TransportHandler extends BaseTransportRequestHandler<SearchScrollRequest> {
@Override @Override
public SearchScrollRequest newInstance() { public SearchScrollRequest newRequestInstance() {
return new SearchScrollRequest(); return new SearchScrollRequest();
} }
@Override
public void messageReceived(SearchScrollRequest request, final TransportChannel channel) throws Exception {
// no need for a threaded listener
request.listenerThreaded(false);
execute(request, new ActionListener<SearchResponse>() {
@Override
public void onResponse(SearchResponse result) {
try {
channel.sendResponse(result);
} catch (Throwable e) {
onFailure(e);
}
}
@Override
public void onFailure(Throwable e) {
try {
channel.sendResponse(e);
} catch (Exception e1) {
logger.warn("Failed to send response for search", e1);
}
}
});
}
@Override
public String executor() {
return ThreadPool.Names.SAME;
}
}
} }

View File

@ -22,6 +22,7 @@ package org.elasticsearch.action.termvector;
import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.action.support.TransportAction; import org.elasticsearch.action.support.TransportAction;
import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterState;
@ -39,7 +40,7 @@ import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
public class TransportMultiTermVectorsAction extends TransportAction<MultiTermVectorsRequest, MultiTermVectorsResponse> { public class TransportMultiTermVectorsAction extends HandledTransportAction<MultiTermVectorsRequest, MultiTermVectorsResponse> {
private final ClusterService clusterService; private final ClusterService clusterService;
@ -48,11 +49,9 @@ public class TransportMultiTermVectorsAction extends TransportAction<MultiTermVe
@Inject @Inject
public TransportMultiTermVectorsAction(Settings settings, ThreadPool threadPool, TransportService transportService, public TransportMultiTermVectorsAction(Settings settings, ThreadPool threadPool, TransportService transportService,
ClusterService clusterService, TransportSingleShardMultiTermsVectorAction shardAction, ActionFilters actionFilters) { ClusterService clusterService, TransportSingleShardMultiTermsVectorAction shardAction, ActionFilters actionFilters) {
super(settings, MultiTermVectorsAction.NAME, threadPool, actionFilters); super(settings, MultiTermVectorsAction.NAME, threadPool, transportService, actionFilters);
this.clusterService = clusterService; this.clusterService = clusterService;
this.shardAction = shardAction; this.shardAction = shardAction;
transportService.registerHandler(MultiTermVectorsAction.NAME, new TransportHandler());
} }
@Override @Override
@ -134,42 +133,8 @@ public class TransportMultiTermVectorsAction extends TransportAction<MultiTermVe
} }
} }
class TransportHandler extends BaseTransportRequestHandler<MultiTermVectorsRequest> {
@Override @Override
public MultiTermVectorsRequest newInstance() { public MultiTermVectorsRequest newRequestInstance() {
return new MultiTermVectorsRequest(); return new MultiTermVectorsRequest();
} }
@Override
public void messageReceived(final MultiTermVectorsRequest request, final TransportChannel channel) throws Exception {
// no need to use threaded listener, since we just send a response
request.listenerThreaded(false);
execute(request, new ActionListener<MultiTermVectorsResponse>() {
@Override
public void onResponse(MultiTermVectorsResponse response) {
try {
channel.sendResponse(response);
} catch (Throwable t) {
onFailure(t);
}
}
@Override
public void onFailure(Throwable e) {
try {
channel.sendResponse(e);
} catch (Throwable t) {
logger.warn("Failed to send error response for action [" + MultiTermVectorsAction.NAME + "] and request ["
+ request + "]", t);
}
}
});
}
@Override
public String executor() {
return ThreadPool.Names.SAME;
}
}
} }