diff --git a/core/src/main/java/org/elasticsearch/action/ActionListenerResponseHandler.java b/core/src/main/java/org/elasticsearch/action/ActionListenerResponseHandler.java index fb4d99acab9..6cdc1c3194f 100644 --- a/core/src/main/java/org/elasticsearch/action/ActionListenerResponseHandler.java +++ b/core/src/main/java/org/elasticsearch/action/ActionListenerResponseHandler.java @@ -24,16 +24,21 @@ import org.elasticsearch.transport.BaseTransportResponseHandler; import org.elasticsearch.transport.TransportException; import org.elasticsearch.transport.TransportResponse; +import java.util.Objects; +import java.util.function.Supplier; + /** * A simple base class for action response listeners, defaulting to using the SAME executor (as its * very common on response handlers). */ -public abstract class ActionListenerResponseHandler extends BaseTransportResponseHandler { +public class ActionListenerResponseHandler extends BaseTransportResponseHandler { private final ActionListener listener; + private final Supplier responseSupplier; - public ActionListenerResponseHandler(ActionListener listener) { - this.listener = listener; + public ActionListenerResponseHandler(ActionListener listener, Supplier responseSupplier) { + this.listener = Objects.requireNonNull(listener); + this.responseSupplier = Objects.requireNonNull(responseSupplier); } @Override @@ -46,6 +51,11 @@ public abstract class ActionListenerResponseHandler(listener) { - @Override - public Response newInstance() { - return action.newResponse(); - } - }); + transportService.sendRequest(node, action.name(), request, transportOptions, + new ActionListenerResponseHandler<>(listener, action::newResponse)); } } diff --git a/core/src/main/java/org/elasticsearch/action/ingest/IngestProxyActionFilter.java b/core/src/main/java/org/elasticsearch/action/ingest/IngestProxyActionFilter.java index 62716c6dc0d..5d2aea389dc 100644 --- a/core/src/main/java/org/elasticsearch/action/ingest/IngestProxyActionFilter.java +++ b/core/src/main/java/org/elasticsearch/action/ingest/IngestProxyActionFilter.java @@ -83,13 +83,7 @@ public final class IngestProxyActionFilter implements ActionFilter { @SuppressWarnings("unchecked") private void forwardIngestRequest(Action action, ActionRequest request, ActionListener listener) { - transportService.sendRequest(randomIngestNode(), action.name(), request, new ActionListenerResponseHandler(listener) { - @Override - public TransportResponse newInstance() { - return action.newResponse(); - } - - }); + transportService.sendRequest(randomIngestNode(), action.name(), request, new ActionListenerResponseHandler(listener, action::newResponse)); } @Override diff --git a/core/src/main/java/org/elasticsearch/action/support/master/TransportMasterNodeAction.java b/core/src/main/java/org/elasticsearch/action/support/master/TransportMasterNodeAction.java index 691bcc66272..ee082610d93 100644 --- a/core/src/main/java/org/elasticsearch/action/support/master/TransportMasterNodeAction.java +++ b/core/src/main/java/org/elasticsearch/action/support/master/TransportMasterNodeAction.java @@ -168,12 +168,7 @@ public abstract class TransportMasterNodeAction(listener) { - @Override - public Response newInstance() { - return newResponse(); - } - + transportService.sendRequest(nodes.getMasterNode(), actionName, request, new ActionListenerResponseHandler(listener, TransportMasterNodeAction.this::newResponse) { @Override public void handleException(final TransportException exp) { Throwable cause = exp.unwrapCause(); diff --git a/core/src/main/java/org/elasticsearch/search/action/SearchTransportService.java b/core/src/main/java/org/elasticsearch/search/action/SearchTransportService.java index a15d40e2e02..b6ac92c42da 100644 --- a/core/src/main/java/org/elasticsearch/search/action/SearchTransportService.java +++ b/core/src/main/java/org/elasticsearch/search/action/SearchTransportService.java @@ -108,7 +108,7 @@ public class SearchTransportService extends AbstractComponent { public void sendFreeContext(DiscoveryNode node, final long contextId, SearchRequest request) { transportService.sendRequest(node, FREE_CONTEXT_ACTION_NAME, new SearchFreeContextRequest(request, contextId), - new ActionListenerResponseHandler(new ActionListener() { + new ActionListenerResponseHandler<>(new ActionListener() { @Override public void onResponse(SearchFreeContextResponse response) { // no need to respond if it was freed or not @@ -118,106 +118,57 @@ public class SearchTransportService extends AbstractComponent { public void onFailure(Throwable e) { } - }) { - @Override - public SearchFreeContextResponse newInstance() { - return new SearchFreeContextResponse(); - } - }); + }, SearchFreeContextResponse::new)); } public void sendFreeContext(DiscoveryNode node, long contextId, final ActionListener listener) { transportService.sendRequest(node, FREE_CONTEXT_SCROLL_ACTION_NAME, new ScrollFreeContextRequest(contextId), - new ActionListenerResponseHandler(listener) { - @Override - public SearchFreeContextResponse newInstance() { - return new SearchFreeContextResponse(); - } - }); + new ActionListenerResponseHandler<>(listener, SearchFreeContextResponse::new)); } public void sendClearAllScrollContexts(DiscoveryNode node, final ActionListener listener) { transportService.sendRequest(node, CLEAR_SCROLL_CONTEXTS_ACTION_NAME, new ClearScrollContextsRequest(), - new ActionListenerResponseHandler(listener) { - @Override - public TransportResponse newInstance() { - return TransportResponse.Empty.INSTANCE; - } - }); + new ActionListenerResponseHandler<>(listener, () -> TransportResponse.Empty.INSTANCE)); } public void sendExecuteDfs(DiscoveryNode node, final ShardSearchTransportRequest request, final ActionListener listener) { - transportService.sendRequest(node, DFS_ACTION_NAME, request, new ActionListenerResponseHandler(listener) { - @Override - public DfsSearchResult newInstance() { - return new DfsSearchResult(); - } - }); + transportService.sendRequest(node, DFS_ACTION_NAME, request, new ActionListenerResponseHandler<>(listener, DfsSearchResult::new)); } public void sendExecuteQuery(DiscoveryNode node, final ShardSearchTransportRequest request, final ActionListener listener) { transportService.sendRequest(node, QUERY_ACTION_NAME, request, - new ActionListenerResponseHandler(listener) { - @Override - public QuerySearchResult newInstance() { - return new QuerySearchResult(); - } - }); + new ActionListenerResponseHandler<>(listener, QuerySearchResult::new)); } public void sendExecuteQuery(DiscoveryNode node, final QuerySearchRequest request, final ActionListener listener) { - transportService.sendRequest(node, QUERY_ID_ACTION_NAME, request, new ActionListenerResponseHandler(listener) { - @Override - public QuerySearchResult newInstance() { - return new QuerySearchResult(); - } - }); + transportService.sendRequest(node, QUERY_ID_ACTION_NAME, request, + new ActionListenerResponseHandler<>(listener, QuerySearchResult::new)); } public void sendExecuteQuery(DiscoveryNode node, final InternalScrollSearchRequest request, final ActionListener listener) { transportService.sendRequest(node, QUERY_SCROLL_ACTION_NAME, request, - new ActionListenerResponseHandler(listener) { - @Override - public ScrollQuerySearchResult newInstance() { - return new ScrollQuerySearchResult(); - } - }); + new ActionListenerResponseHandler<>(listener, ScrollQuerySearchResult::new)); } public void sendExecuteFetch(DiscoveryNode node, final ShardSearchTransportRequest request, final ActionListener listener) { transportService.sendRequest(node, QUERY_FETCH_ACTION_NAME, request, - new ActionListenerResponseHandler(listener) { - @Override - public QueryFetchSearchResult newInstance() { - return new QueryFetchSearchResult(); - } - }); + new ActionListenerResponseHandler<>(listener, QueryFetchSearchResult::new)); } public void sendExecuteFetch(DiscoveryNode node, final QuerySearchRequest request, final ActionListener listener) { transportService.sendRequest(node, QUERY_QUERY_FETCH_ACTION_NAME, request, - new ActionListenerResponseHandler(listener) { - @Override - public QueryFetchSearchResult newInstance() { - return new QueryFetchSearchResult(); - } - }); + new ActionListenerResponseHandler<>(listener, QueryFetchSearchResult::new)); } public void sendExecuteFetch(DiscoveryNode node, final InternalScrollSearchRequest request, final ActionListener listener) { transportService.sendRequest(node, QUERY_FETCH_SCROLL_ACTION_NAME, request, - new ActionListenerResponseHandler(listener) { - @Override - public ScrollQueryFetchSearchResult newInstance() { - return new ScrollQueryFetchSearchResult(); - } - }); + new ActionListenerResponseHandler<>(listener, ScrollQueryFetchSearchResult::new)); } public void sendExecuteFetch(DiscoveryNode node, final ShardFetchSearchRequest request, @@ -232,12 +183,7 @@ public class SearchTransportService extends AbstractComponent { private void sendExecuteFetch(DiscoveryNode node, String action, final ShardFetchRequest request, final ActionListener listener) { - transportService.sendRequest(node, action, request, new ActionListenerResponseHandler(listener) { - @Override - public FetchSearchResult newInstance() { - return new FetchSearchResult(); - } - }); + transportService.sendRequest(node, action, request, new ActionListenerResponseHandler<>(listener, FetchSearchResult::new)); } static class ScrollFreeContextRequest extends TransportRequest {