diff --git a/src/main/java/org/elasticsearch/action/admin/indices/mapping/get/TransportGetFieldMappingsAction.java b/src/main/java/org/elasticsearch/action/admin/indices/mapping/get/TransportGetFieldMappingsAction.java index 603adb38a97..4ec4b7c3a59 100644 --- a/src/main/java/org/elasticsearch/action/admin/indices/mapping/get/TransportGetFieldMappingsAction.java +++ b/src/main/java/org/elasticsearch/action/admin/indices/mapping/get/TransportGetFieldMappingsAction.java @@ -22,6 +22,7 @@ package org.elasticsearch.action.admin.indices.mapping.get; import com.google.common.collect.ImmutableMap; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.HandledTransportAction; import org.elasticsearch.action.support.TransportAction; import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterState; @@ -38,17 +39,16 @@ import java.util.concurrent.atomic.AtomicReferenceArray; /** */ -public class TransportGetFieldMappingsAction extends TransportAction { +public class TransportGetFieldMappingsAction extends HandledTransportAction { private final ClusterService clusterService; private final TransportGetFieldMappingsIndexAction shardAction; @Inject public TransportGetFieldMappingsAction(Settings settings, TransportService transportService, ClusterService clusterService, ThreadPool threadPool, TransportGetFieldMappingsIndexAction shardAction, ActionFilters actionFilters) { - super(settings, GetFieldMappingsAction.NAME, threadPool, actionFilters); + super(settings, GetFieldMappingsAction.NAME, threadPool, transportService, actionFilters); this.clusterService = clusterService; this.shardAction = shardAction; - transportService.registerHandler(actionName, new TransportHandler()); } @Override @@ -101,41 +101,8 @@ public class TransportGetFieldMappingsAction extends TransportAction { - - @Override - public GetFieldMappingsRequest newInstance() { - return new GetFieldMappingsRequest(); - } - - @Override - public String executor() { - return ThreadPool.Names.SAME; - } - - @Override - public void messageReceived(final GetFieldMappingsRequest request, final TransportChannel channel) throws Exception { - // no need for a threaded listener, since we just send a response - request.listenerThreaded(false); - execute(request, new ActionListener() { - @Override - public void onResponse(GetFieldMappingsResponse result) { - try { - channel.sendResponse(result); - } catch (Throwable e) { - onFailure(e); - } - } - - @Override - public void onFailure(Throwable e) { - try { - channel.sendResponse(e); - } catch (Exception e1) { - logger.warn("Failed to send error response for action [" + actionName + "] and request [" + request + "]", e1); - } - } - }); - } + @Override + public GetFieldMappingsRequest newRequestInstance() { + return new GetFieldMappingsRequest(); } } diff --git a/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java b/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java index 4841f8f210a..c441ad4bdf0 100644 --- a/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java +++ b/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java @@ -34,6 +34,7 @@ import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.AutoCreateIndex; +import org.elasticsearch.action.support.HandledTransportAction; import org.elasticsearch.action.support.TransportAction; import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.cluster.ClusterService; @@ -63,7 +64,7 @@ import java.util.concurrent.atomic.AtomicInteger; /** * */ -public class TransportBulkAction extends TransportAction { +public class TransportBulkAction extends HandledTransportAction { private final AutoCreateIndex autoCreateIndex; @@ -78,15 +79,18 @@ public class TransportBulkAction extends TransportAction { - - @Override - public BulkRequest newInstance() { - return new BulkRequest(); - } - - @Override - public void messageReceived(final BulkRequest request, final TransportChannel channel) throws Exception { - // no need to use threaded listener, since we just send a response - request.listenerThreaded(false); - execute(request, new ActionListener() { - @Override - public void onResponse(BulkResponse result) { - try { - channel.sendResponse(result); - } catch (Throwable e) { - onFailure(e); - } - } - - @Override - public void onFailure(Throwable e) { - try { - channel.sendResponse(e); - } catch (Exception e1) { - logger.warn("Failed to send error response for action [" + BulkAction.NAME + "] and request [" + request + "]", e1); - } - } - }); - } - - @Override - public String executor() { - return ThreadPool.Names.SAME; - } - } } diff --git a/src/main/java/org/elasticsearch/action/get/TransportMultiGetAction.java b/src/main/java/org/elasticsearch/action/get/TransportMultiGetAction.java index 72a71e131bc..26ed5c00276 100644 --- a/src/main/java/org/elasticsearch/action/get/TransportMultiGetAction.java +++ b/src/main/java/org/elasticsearch/action/get/TransportMultiGetAction.java @@ -22,6 +22,7 @@ package org.elasticsearch.action.get; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.HandledTransportAction; import org.elasticsearch.action.support.TransportAction; import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterState; @@ -39,7 +40,7 @@ import java.util.HashMap; import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; -public class TransportMultiGetAction extends TransportAction { +public class TransportMultiGetAction extends HandledTransportAction { private final ClusterService clusterService; @@ -47,11 +48,14 @@ public class TransportMultiGetAction extends TransportAction { - - @Override - public MultiGetRequest newInstance() { - return new MultiGetRequest(); - } - - @Override - public void messageReceived(final MultiGetRequest request, final TransportChannel channel) throws Exception { - // no need to use threaded listener, since we just send a response - request.listenerThreaded(false); - execute(request, new ActionListener() { - @Override - public void onResponse(MultiGetResponse response) { - try { - channel.sendResponse(response); - } catch (Throwable e) { - onFailure(e); - } - } - - @Override - public void onFailure(Throwable e) { - try { - channel.sendResponse(e); - } catch (Exception e1) { - logger.warn("Failed to send error response for action [" + MultiGetAction.NAME + "] and request [" + request + "]", e1); - } - } - }); - } - - @Override - public String executor() { - return ThreadPool.Names.SAME; - } - } } diff --git a/src/main/java/org/elasticsearch/action/mlt/TransportMoreLikeThisAction.java b/src/main/java/org/elasticsearch/action/mlt/TransportMoreLikeThisAction.java index 29599440dc0..fad9bdf0b1b 100644 --- a/src/main/java/org/elasticsearch/action/mlt/TransportMoreLikeThisAction.java +++ b/src/main/java/org/elasticsearch/action/mlt/TransportMoreLikeThisAction.java @@ -32,6 +32,7 @@ import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.TransportSearchAction; import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.HandledTransportAction; import org.elasticsearch.action.support.TransportAction; import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterState; @@ -65,7 +66,7 @@ import static org.elasticsearch.search.builder.SearchSourceBuilder.searchSource; /** * The more like this action. */ -public class TransportMoreLikeThisAction extends TransportAction { +public class TransportMoreLikeThisAction extends HandledTransportAction { private final TransportSearchAction searchAction; @@ -80,14 +81,17 @@ public class TransportMoreLikeThisAction extends TransportAction { - - @Override - public MoreLikeThisRequest newInstance() { - return new MoreLikeThisRequest(); - } - - @Override - public void messageReceived(MoreLikeThisRequest request, final TransportChannel channel) throws Exception { - // no need to have a threaded listener since we just send back a response - request.listenerThreaded(false); - execute(request, new ActionListener() { - @Override - public void onResponse(SearchResponse result) { - try { - channel.sendResponse(result); - } catch (Throwable e) { - onFailure(e); - } - } - - @Override - public void onFailure(Throwable e) { - try { - channel.sendResponse(e); - } catch (Exception e1) { - logger.warn("Failed to send response for get", e1); - } - } - }); - } - - @Override - public String executor() { - return ThreadPool.Names.SAME; - } - } } diff --git a/src/main/java/org/elasticsearch/action/percolate/TransportMultiPercolateAction.java b/src/main/java/org/elasticsearch/action/percolate/TransportMultiPercolateAction.java index 16bdf7bbd95..c22ca31bf9e 100644 --- a/src/main/java/org/elasticsearch/action/percolate/TransportMultiPercolateAction.java +++ b/src/main/java/org/elasticsearch/action/percolate/TransportMultiPercolateAction.java @@ -25,6 +25,7 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.UnavailableShardsException; import org.elasticsearch.action.get.*; import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.HandledTransportAction; import org.elasticsearch.action.support.TransportAction; import org.elasticsearch.action.support.broadcast.BroadcastShardOperationFailedException; import org.elasticsearch.cluster.ClusterService; @@ -50,7 +51,7 @@ import java.util.concurrent.atomic.AtomicReferenceArray; /** */ -public class TransportMultiPercolateAction extends TransportAction { +public class TransportMultiPercolateAction extends HandledTransportAction { private final ClusterService clusterService; private final PercolatorService percolatorService; @@ -62,13 +63,16 @@ public class TransportMultiPercolateAction extends TransportAction finalListener; @@ -322,43 +325,4 @@ public class TransportMultiPercolateAction extends TransportAction { - - @Override - public MultiPercolateRequest newInstance() { - return new MultiPercolateRequest(); - } - - @Override - public void messageReceived(final MultiPercolateRequest request, final TransportChannel channel) throws Exception { - // no need to use threaded listener, since we just send a response - request.listenerThreaded(false); - execute(request, new ActionListener() { - @Override - public void onResponse(MultiPercolateResponse response) { - try { - channel.sendResponse(response); - } catch (Throwable e) { - onFailure(e); - } - } - - @Override - public void onFailure(Throwable e) { - try { - channel.sendResponse(e); - } catch (Exception e1) { - logger.warn("Failed to send error response for action [mpercolate] and request [" + request + "]", e1); - } - } - }); - } - - @Override - public String executor() { - return ThreadPool.Names.SAME; - } - } - } diff --git a/src/main/java/org/elasticsearch/action/search/TransportClearScrollAction.java b/src/main/java/org/elasticsearch/action/search/TransportClearScrollAction.java index ba7f947383e..a7268f15634 100644 --- a/src/main/java/org/elasticsearch/action/search/TransportClearScrollAction.java +++ b/src/main/java/org/elasticsearch/action/search/TransportClearScrollAction.java @@ -21,6 +21,7 @@ package org.elasticsearch.action.search; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.HandledTransportAction; import org.elasticsearch.action.support.TransportAction; import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterState; @@ -45,17 +46,16 @@ import static org.elasticsearch.action.search.type.TransportSearchHelper.parseSc /** */ -public class TransportClearScrollAction extends TransportAction { +public class TransportClearScrollAction extends HandledTransportAction { private final ClusterService clusterService; private final SearchServiceTransportAction searchServiceTransportAction; @Inject public TransportClearScrollAction(Settings settings, TransportService transportService, ThreadPool threadPool, ClusterService clusterService, SearchServiceTransportAction searchServiceTransportAction, ActionFilters actionFilters) { - super(settings, ClearScrollAction.NAME, threadPool, actionFilters); + super(settings, ClearScrollAction.NAME, threadPool, transportService, actionFilters); this.clusterService = clusterService; this.searchServiceTransportAction = searchServiceTransportAction; - transportService.registerHandler(ClearScrollAction.NAME, new TransportHandler()); } @Override @@ -63,6 +63,11 @@ public class TransportClearScrollAction extends TransportAction { - - @Override - public ClearScrollRequest newInstance() { - return new ClearScrollRequest(); - } - - @Override - public void messageReceived(final ClearScrollRequest request, final TransportChannel channel) throws Exception { - // no need to use threaded listener, since we just send a response - request.listenerThreaded(false); - execute(request, new ActionListener() { - @Override - public void onResponse(ClearScrollResponse response) { - try { - channel.sendResponse(response); - } catch (Throwable e) { - onFailure(e); - } - } - - @Override - public void onFailure(Throwable e) { - try { - channel.sendResponse(e); - } catch (Exception e1) { - logger.warn("Failed to send error response for action [clear_sc] and request [" + request + "]", e1); - } - } - }); - } - - @Override - public String executor() { - return ThreadPool.Names.SAME; - } - } } diff --git a/src/main/java/org/elasticsearch/action/search/TransportMultiSearchAction.java b/src/main/java/org/elasticsearch/action/search/TransportMultiSearchAction.java index 23951909f06..b5ce0f6d0c1 100644 --- a/src/main/java/org/elasticsearch/action/search/TransportMultiSearchAction.java +++ b/src/main/java/org/elasticsearch/action/search/TransportMultiSearchAction.java @@ -22,6 +22,7 @@ package org.elasticsearch.action.search; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.HandledTransportAction; import org.elasticsearch.action.support.TransportAction; import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterState; @@ -38,7 +39,7 @@ import java.util.concurrent.atomic.AtomicInteger; /** */ -public class TransportMultiSearchAction extends TransportAction { +public class TransportMultiSearchAction extends HandledTransportAction { private final ClusterService clusterService; @@ -46,11 +47,9 @@ public class TransportMultiSearchAction extends TransportAction { - - @Override - public MultiSearchRequest newInstance() { - return new MultiSearchRequest(); - } - - @Override - public void messageReceived(final MultiSearchRequest request, final TransportChannel channel) throws Exception { - // no need to use threaded listener, since we just send a response - request.listenerThreaded(false); - execute(request, new ActionListener() { - @Override - public void onResponse(MultiSearchResponse response) { - try { - channel.sendResponse(response); - } catch (Throwable e) { - onFailure(e); - } - } - - @Override - public void onFailure(Throwable e) { - try { - channel.sendResponse(e); - } catch (Exception e1) { - logger.warn("Failed to send error response for action [msearch] and request [" + request + "]", e1); - } - } - }); - } - - @Override - public String executor() { - return ThreadPool.Names.SAME; - } + @Override + public MultiSearchRequest newRequestInstance() { + return new MultiSearchRequest(); } } diff --git a/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java b/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java index 8b6e8f4a4b4..40b3584c986 100644 --- a/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java +++ b/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java @@ -22,6 +22,7 @@ package org.elasticsearch.action.search; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.search.type.*; import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.HandledTransportAction; import org.elasticsearch.action.support.TransportAction; import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterState; @@ -41,7 +42,7 @@ import static org.elasticsearch.action.search.SearchType.*; /** * */ -public class TransportSearchAction extends TransportAction { +public class TransportSearchAction extends HandledTransportAction { private final ClusterService clusterService; private final TransportSearchDfsQueryThenFetchAction dfsQueryThenFetchAction; @@ -61,7 +62,7 @@ public class TransportSearchAction extends TransportAction { - - @Override - public SearchRequest newInstance() { - return new SearchRequest(); - } - - @Override - public void messageReceived(SearchRequest request, final TransportChannel channel) throws Exception { - // no need for a threaded listener - request.listenerThreaded(false); - execute(request, new ActionListener() { - @Override - public void onResponse(SearchResponse result) { - try { - channel.sendResponse(result); - } catch (Throwable e) { - onFailure(e); - } - } - - @Override - public void onFailure(Throwable e) { - try { - channel.sendResponse(e); - } catch (Exception e1) { - logger.warn("Failed to send response for search", e1); - } - } - }); - } - - @Override - public String executor() { - return ThreadPool.Names.SAME; - } + @Override + public SearchRequest newRequestInstance() { + return new SearchRequest(); } } diff --git a/src/main/java/org/elasticsearch/action/search/TransportSearchScrollAction.java b/src/main/java/org/elasticsearch/action/search/TransportSearchScrollAction.java index 181428667b9..145fed9fb55 100644 --- a/src/main/java/org/elasticsearch/action/search/TransportSearchScrollAction.java +++ b/src/main/java/org/elasticsearch/action/search/TransportSearchScrollAction.java @@ -26,6 +26,7 @@ import org.elasticsearch.action.search.type.TransportSearchScrollQueryAndFetchAc import org.elasticsearch.action.search.type.TransportSearchScrollQueryThenFetchAction; import org.elasticsearch.action.search.type.TransportSearchScrollScanAction; import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.HandledTransportAction; import org.elasticsearch.action.support.TransportAction; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; @@ -40,7 +41,7 @@ import static org.elasticsearch.action.search.type.TransportSearchHelper.parseSc /** * */ -public class TransportSearchScrollAction extends TransportAction { +public class TransportSearchScrollAction extends HandledTransportAction { private final TransportSearchScrollQueryThenFetchAction queryThenFetchAction; @@ -53,12 +54,10 @@ public class TransportSearchScrollAction extends TransportAction { - - @Override - public SearchScrollRequest newInstance() { - return new SearchScrollRequest(); - } - - @Override - public void messageReceived(SearchScrollRequest request, final TransportChannel channel) throws Exception { - // no need for a threaded listener - request.listenerThreaded(false); - execute(request, new ActionListener() { - @Override - public void onResponse(SearchResponse result) { - try { - channel.sendResponse(result); - } catch (Throwable e) { - onFailure(e); - } - } - - @Override - public void onFailure(Throwable e) { - try { - channel.sendResponse(e); - } catch (Exception e1) { - logger.warn("Failed to send response for search", e1); - } - } - }); - } - - @Override - public String executor() { - return ThreadPool.Names.SAME; - } + @Override + public SearchScrollRequest newRequestInstance() { + return new SearchScrollRequest(); } } diff --git a/src/main/java/org/elasticsearch/action/termvector/TransportMultiTermVectorsAction.java b/src/main/java/org/elasticsearch/action/termvector/TransportMultiTermVectorsAction.java index 5fd53c491e3..91452d8e770 100644 --- a/src/main/java/org/elasticsearch/action/termvector/TransportMultiTermVectorsAction.java +++ b/src/main/java/org/elasticsearch/action/termvector/TransportMultiTermVectorsAction.java @@ -22,6 +22,7 @@ package org.elasticsearch.action.termvector; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.HandledTransportAction; import org.elasticsearch.action.support.TransportAction; import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterState; @@ -39,7 +40,7 @@ import java.util.HashMap; import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; -public class TransportMultiTermVectorsAction extends TransportAction { +public class TransportMultiTermVectorsAction extends HandledTransportAction { private final ClusterService clusterService; @@ -48,11 +49,9 @@ public class TransportMultiTermVectorsAction extends TransportAction { - - @Override - public MultiTermVectorsRequest newInstance() { - return new MultiTermVectorsRequest(); - } - - @Override - public void messageReceived(final MultiTermVectorsRequest request, final TransportChannel channel) throws Exception { - // no need to use threaded listener, since we just send a response - request.listenerThreaded(false); - execute(request, new ActionListener() { - @Override - public void onResponse(MultiTermVectorsResponse response) { - try { - channel.sendResponse(response); - } catch (Throwable t) { - onFailure(t); - } - } - - @Override - public void onFailure(Throwable e) { - try { - channel.sendResponse(e); - } catch (Throwable t) { - logger.warn("Failed to send error response for action [" + MultiTermVectorsAction.NAME + "] and request [" - + request + "]", t); - } - } - }); - } - - @Override - public String executor() { - return ThreadPool.Names.SAME; - } + @Override + public MultiTermVectorsRequest newRequestInstance() { + return new MultiTermVectorsRequest(); } }