Simplify ActionListenerResponseHandler by adding response supplier (#17752)

This commit is contained in:
Yannick Welsch 2016-04-14 15:46:55 +02:00
parent b3eef99120
commit b7a1baa801
5 changed files with 30 additions and 89 deletions

View File

@ -24,16 +24,21 @@ import org.elasticsearch.transport.BaseTransportResponseHandler;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportResponse;
import java.util.Objects;
import java.util.function.Supplier;
/**
* A simple base class for action response listeners, defaulting to using the SAME executor (as its
* very common on response handlers).
*/
public abstract class ActionListenerResponseHandler<Response extends TransportResponse> extends BaseTransportResponseHandler<Response> {
public class ActionListenerResponseHandler<Response extends TransportResponse> extends BaseTransportResponseHandler<Response> {
private final ActionListener<Response> listener;
private final Supplier<Response> responseSupplier;
public ActionListenerResponseHandler(ActionListener<Response> listener) {
this.listener = listener;
public ActionListenerResponseHandler(ActionListener<Response> listener, Supplier<Response> responseSupplier) {
this.listener = Objects.requireNonNull(listener);
this.responseSupplier = Objects.requireNonNull(responseSupplier);
}
@Override
@ -46,6 +51,11 @@ public abstract class ActionListenerResponseHandler<Response extends TransportRe
listener.onFailure(e);
}
@Override
public Response newInstance() {
return responseSupplier.get();
}
@Override
public String executor() {
return ThreadPool.Names.SAME;

View File

@ -49,11 +49,7 @@ public class TransportActionNodeProxy<Request extends ActionRequest, Response ex
listener.onFailure(validationException);
return;
}
transportService.sendRequest(node, action.name(), request, transportOptions, new ActionListenerResponseHandler<Response>(listener) {
@Override
public Response newInstance() {
return action.newResponse();
}
});
transportService.sendRequest(node, action.name(), request, transportOptions,
new ActionListenerResponseHandler<>(listener, action::newResponse));
}
}

View File

@ -83,13 +83,7 @@ public final class IngestProxyActionFilter implements ActionFilter {
@SuppressWarnings("unchecked")
private void forwardIngestRequest(Action<?, ?, ?> action, ActionRequest request, ActionListener<?> listener) {
transportService.sendRequest(randomIngestNode(), action.name(), request, new ActionListenerResponseHandler(listener) {
@Override
public TransportResponse newInstance() {
return action.newResponse();
}
});
transportService.sendRequest(randomIngestNode(), action.name(), request, new ActionListenerResponseHandler(listener, action::newResponse));
}
@Override

View File

@ -168,12 +168,7 @@ public abstract class TransportMasterNodeAction<Request extends MasterNodeReques
retry(null, MasterNodeChangePredicate.INSTANCE);
} else {
taskManager.registerChildTask(task, nodes.getMasterNode().getId());
transportService.sendRequest(nodes.getMasterNode(), actionName, request, new ActionListenerResponseHandler<Response>(listener) {
@Override
public Response newInstance() {
return newResponse();
}
transportService.sendRequest(nodes.getMasterNode(), actionName, request, new ActionListenerResponseHandler<Response>(listener, TransportMasterNodeAction.this::newResponse) {
@Override
public void handleException(final TransportException exp) {
Throwable cause = exp.unwrapCause();

View File

@ -108,7 +108,7 @@ public class SearchTransportService extends AbstractComponent {
public void sendFreeContext(DiscoveryNode node, final long contextId, SearchRequest request) {
transportService.sendRequest(node, FREE_CONTEXT_ACTION_NAME, new SearchFreeContextRequest(request, contextId),
new ActionListenerResponseHandler<SearchFreeContextResponse>(new ActionListener<SearchFreeContextResponse>() {
new ActionListenerResponseHandler<>(new ActionListener<SearchFreeContextResponse>() {
@Override
public void onResponse(SearchFreeContextResponse response) {
// no need to respond if it was freed or not
@ -118,106 +118,57 @@ public class SearchTransportService extends AbstractComponent {
public void onFailure(Throwable e) {
}
}) {
@Override
public SearchFreeContextResponse newInstance() {
return new SearchFreeContextResponse();
}
});
}, SearchFreeContextResponse::new));
}
public void sendFreeContext(DiscoveryNode node, long contextId, final ActionListener<SearchFreeContextResponse> listener) {
transportService.sendRequest(node, FREE_CONTEXT_SCROLL_ACTION_NAME, new ScrollFreeContextRequest(contextId),
new ActionListenerResponseHandler<SearchFreeContextResponse>(listener) {
@Override
public SearchFreeContextResponse newInstance() {
return new SearchFreeContextResponse();
}
});
new ActionListenerResponseHandler<>(listener, SearchFreeContextResponse::new));
}
public void sendClearAllScrollContexts(DiscoveryNode node, final ActionListener<TransportResponse> listener) {
transportService.sendRequest(node, CLEAR_SCROLL_CONTEXTS_ACTION_NAME, new ClearScrollContextsRequest(),
new ActionListenerResponseHandler<TransportResponse>(listener) {
@Override
public TransportResponse newInstance() {
return TransportResponse.Empty.INSTANCE;
}
});
new ActionListenerResponseHandler<>(listener, () -> TransportResponse.Empty.INSTANCE));
}
public void sendExecuteDfs(DiscoveryNode node, final ShardSearchTransportRequest request,
final ActionListener<DfsSearchResult> listener) {
transportService.sendRequest(node, DFS_ACTION_NAME, request, new ActionListenerResponseHandler<DfsSearchResult>(listener) {
@Override
public DfsSearchResult newInstance() {
return new DfsSearchResult();
}
});
transportService.sendRequest(node, DFS_ACTION_NAME, request, new ActionListenerResponseHandler<>(listener, DfsSearchResult::new));
}
public void sendExecuteQuery(DiscoveryNode node, final ShardSearchTransportRequest request,
final ActionListener<QuerySearchResultProvider> listener) {
transportService.sendRequest(node, QUERY_ACTION_NAME, request,
new ActionListenerResponseHandler<QuerySearchResultProvider>(listener) {
@Override
public QuerySearchResult newInstance() {
return new QuerySearchResult();
}
});
new ActionListenerResponseHandler<>(listener, QuerySearchResult::new));
}
public void sendExecuteQuery(DiscoveryNode node, final QuerySearchRequest request, final ActionListener<QuerySearchResult> listener) {
transportService.sendRequest(node, QUERY_ID_ACTION_NAME, request, new ActionListenerResponseHandler<QuerySearchResult>(listener) {
@Override
public QuerySearchResult newInstance() {
return new QuerySearchResult();
}
});
transportService.sendRequest(node, QUERY_ID_ACTION_NAME, request,
new ActionListenerResponseHandler<>(listener, QuerySearchResult::new));
}
public void sendExecuteQuery(DiscoveryNode node, final InternalScrollSearchRequest request,
final ActionListener<ScrollQuerySearchResult> listener) {
transportService.sendRequest(node, QUERY_SCROLL_ACTION_NAME, request,
new ActionListenerResponseHandler<ScrollQuerySearchResult>(listener) {
@Override
public ScrollQuerySearchResult newInstance() {
return new ScrollQuerySearchResult();
}
});
new ActionListenerResponseHandler<>(listener, ScrollQuerySearchResult::new));
}
public void sendExecuteFetch(DiscoveryNode node, final ShardSearchTransportRequest request,
final ActionListener<QueryFetchSearchResult> listener) {
transportService.sendRequest(node, QUERY_FETCH_ACTION_NAME, request,
new ActionListenerResponseHandler<QueryFetchSearchResult>(listener) {
@Override
public QueryFetchSearchResult newInstance() {
return new QueryFetchSearchResult();
}
});
new ActionListenerResponseHandler<>(listener, QueryFetchSearchResult::new));
}
public void sendExecuteFetch(DiscoveryNode node, final QuerySearchRequest request,
final ActionListener<QueryFetchSearchResult> listener) {
transportService.sendRequest(node, QUERY_QUERY_FETCH_ACTION_NAME, request,
new ActionListenerResponseHandler<QueryFetchSearchResult>(listener) {
@Override
public QueryFetchSearchResult newInstance() {
return new QueryFetchSearchResult();
}
});
new ActionListenerResponseHandler<>(listener, QueryFetchSearchResult::new));
}
public void sendExecuteFetch(DiscoveryNode node, final InternalScrollSearchRequest request,
final ActionListener<ScrollQueryFetchSearchResult> listener) {
transportService.sendRequest(node, QUERY_FETCH_SCROLL_ACTION_NAME, request,
new ActionListenerResponseHandler<ScrollQueryFetchSearchResult>(listener) {
@Override
public ScrollQueryFetchSearchResult newInstance() {
return new ScrollQueryFetchSearchResult();
}
});
new ActionListenerResponseHandler<>(listener, ScrollQueryFetchSearchResult::new));
}
public void sendExecuteFetch(DiscoveryNode node, final ShardFetchSearchRequest request,
@ -232,12 +183,7 @@ public class SearchTransportService extends AbstractComponent {
private void sendExecuteFetch(DiscoveryNode node, String action, final ShardFetchRequest request,
final ActionListener<FetchSearchResult> listener) {
transportService.sendRequest(node, action, request, new ActionListenerResponseHandler<FetchSearchResult>(listener) {
@Override
public FetchSearchResult newInstance() {
return new FetchSearchResult();
}
});
transportService.sendRequest(node, action, request, new ActionListenerResponseHandler<>(listener, FetchSearchResult::new));
}
static class ScrollFreeContextRequest extends TransportRequest {