Expose task information from NodeClient

This exposes a method to start an action and return a task from
`NodeClient`. This allows reindex to use the injected `Client` rather
than require injecting `TransportAction`s
This commit is contained in:
Nik Everett 2016-06-30 10:39:45 -04:00
parent fe0f28965a
commit 81fcdfcee9
9 changed files with 72 additions and 35 deletions

View File

@ -233,7 +233,6 @@
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]bootstrap[/\\]Security.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]client[/\\]ElasticsearchClient.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]client[/\\]FilterClient.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]client[/\\]node[/\\]NodeClient.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]client[/\\]support[/\\]AbstractClient.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]client[/\\]transport[/\\]TransportClient.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]client[/\\]transport[/\\]support[/\\]TransportProxyClient.java" checks="LineLength" />

View File

@ -101,6 +101,10 @@ public abstract class TransportAction<Request extends ActionRequest<Request>, Re
return task;
}
/**
* Execute the transport action on the local node, returning the {@link Task} used to track its execution and accepting a
* {@link TaskListener} which listens for the completion of the action.
*/
public final Task execute(Request request, TaskListener<Response> listener) {
Task task = taskManager.register("transport", actionName, request);
execute(task, request, new ActionListener<Response>() {

View File

@ -26,14 +26,17 @@ import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.GenericAction;
import org.elasticsearch.action.support.TransportAction;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.support.AbstractClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskListener;
import org.elasticsearch.threadpool.ThreadPool;
import java.util.Map;
/**
*
* Client that executes actions on the local node.
*/
public class NodeClient extends AbstractClient {
@ -52,10 +55,43 @@ public class NodeClient extends AbstractClient {
// nothing really to do
}
@SuppressWarnings("unchecked")
@Override
public <Request extends ActionRequest<Request>, Response extends ActionResponse, RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder>> void doExecute(
Action<Request, Response, RequestBuilder> action, Request request, ActionListener<Response> listener) {
public < Request extends ActionRequest<Request>,
Response extends ActionResponse,
RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder>
> void doExecute(Action<Request, Response, RequestBuilder> action, Request request, ActionListener<Response> listener) {
// Discard the task because the Client interface doesn't use it.
executeLocally(action, request, listener);
}
/**
* Execute an {@link Action} locally, returning that {@link Task} used to track it, and linking an {@link ActionListener}. Prefer this
* method if you don't need access to the task when listening for the response. This is the method used to implement the {@link Client}
* interface.
*/
public < Request extends ActionRequest<Request>,
Response extends ActionResponse
> Task executeLocally(GenericAction<Request, Response> action, Request request, ActionListener<Response> listener) {
return transportAction(action).execute(request, listener);
}
/**
* Execute an {@link Action} locally, returning that {@link Task} used to track it, and linking an {@link TaskListener}. Prefer this
* method if you need access to the task when listening for the response.
*/
public < Request extends ActionRequest<Request>,
Response extends ActionResponse
> Task executeLocally(GenericAction<Request, Response> action, Request request, TaskListener<Response> listener) {
return transportAction(action).execute(request, listener);
}
/**
* Get the {@link TransportAction} for an {@link Action}, throwing exceptions if the action isn't available.
*/
@SuppressWarnings("unchecked")
private < Request extends ActionRequest<Request>,
Response extends ActionResponse
> TransportAction<Request, Response> transportAction(GenericAction<Request, Response> action) {
if (actions == null) {
throw new IllegalStateException("NodeClient has not been initialized");
}
@ -63,6 +99,6 @@ public class NodeClient extends AbstractClient {
if (transportAction == null) {
throw new IllegalStateException("failed to find action [" + action + "] to execute");
}
transportAction.execute(request, listener);
return transportAction;
}
}

View File

@ -20,9 +20,9 @@
package org.elasticsearch.index.reindex;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.GenericAction;
import org.elasticsearch.action.WriteConsistencyLevel;
import org.elasticsearch.action.support.TransportAction;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
@ -43,18 +43,18 @@ import java.util.Map;
public abstract class AbstractBaseReindexRestHandler<
Request extends AbstractBulkByScrollRequest<Request>,
TA extends TransportAction<Request, BulkIndexByScrollResponse>
A extends GenericAction<Request, BulkIndexByScrollResponse>
> extends BaseRestHandler {
protected final IndicesQueriesRegistry indicesQueriesRegistry;
protected final AggregatorParsers aggParsers;
protected final Suggesters suggesters;
private final ClusterService clusterService;
private final TA action;
private final A action;
protected AbstractBaseReindexRestHandler(Settings settings, IndicesQueriesRegistry indicesQueriesRegistry,
AggregatorParsers aggParsers, Suggesters suggesters,
ClusterService clusterService, TA action) {
ClusterService clusterService, A action) {
super(settings);
this.indicesQueriesRegistry = indicesQueriesRegistry;
this.aggParsers = aggParsers;
@ -63,9 +63,8 @@ public abstract class AbstractBaseReindexRestHandler<
this.action = action;
}
public void handleRequest(RestRequest request, RestChannel channel,
protected void handleRequest(RestRequest request, RestChannel channel, NodeClient client,
boolean includeCreated, boolean includeUpdated) throws IOException {
// Build the internal request
Request internal = setCommonOptions(request, buildRequest(request));
@ -75,14 +74,14 @@ public abstract class AbstractBaseReindexRestHandler<
params.put(BulkByScrollTask.Status.INCLUDE_CREATED, Boolean.toString(includeCreated));
params.put(BulkByScrollTask.Status.INCLUDE_UPDATED, Boolean.toString(includeUpdated));
action.execute(internal, new BulkIndexByScrollResponseContentListener<>(channel, params));
client.executeLocally(action, internal, new BulkIndexByScrollResponseContentListener(channel, params));
return;
} else {
internal.setShouldPersistResult(true);
}
/*
* Lets try and validate before forking so the user gets some error. The
* Let's try and validate before forking so the user gets some error. The
* task can't totally validate until it starts but this is better than
* nothing.
*/
@ -91,7 +90,7 @@ public abstract class AbstractBaseReindexRestHandler<
channel.sendResponse(new BytesRestResponse(channel, validationException));
return;
}
sendTask(channel, action.execute(internal, LoggingTaskListener.instance()));
sendTask(channel, client.executeLocally(action, internal, LoggingTaskListener.instance()));
}
/**

View File

@ -19,9 +19,8 @@
package org.elasticsearch.index.reindex;
import org.elasticsearch.action.GenericAction;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.support.TransportAction;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.Tuple;
@ -48,11 +47,11 @@ import static org.elasticsearch.index.reindex.AbstractBulkByScrollRequest.SIZE_A
*/
public abstract class AbstractBulkByQueryRestHandler<
Request extends AbstractBulkByScrollRequest<Request>,
TA extends TransportAction<Request, BulkIndexByScrollResponse>> extends AbstractBaseReindexRestHandler<Request, TA> {
A extends GenericAction<Request, BulkIndexByScrollResponse>> extends AbstractBaseReindexRestHandler<Request, A> {
protected AbstractBulkByQueryRestHandler(Settings settings, IndicesQueriesRegistry indicesQueriesRegistry,
AggregatorParsers aggParsers, Suggesters suggesters, ClusterService clusterService,
TA action) {
A action) {
super(settings, indicesQueriesRegistry, aggParsers, suggesters, clusterService, action);
}

View File

@ -35,7 +35,7 @@ import java.util.Map;
/**
* RestBuilderListener that returns higher than 200 status if there are any failures and allows to set XContent.Params.
*/
public class BulkIndexByScrollResponseContentListener<R extends BulkIndexByScrollResponse> extends RestBuilderListener<R> {
public class BulkIndexByScrollResponseContentListener extends RestBuilderListener<BulkIndexByScrollResponse> {
private final Map<String, String> params;
@ -45,14 +45,14 @@ public class BulkIndexByScrollResponseContentListener<R extends BulkIndexByScrol
}
@Override
public RestResponse buildResponse(R response, XContentBuilder builder) throws Exception {
public RestResponse buildResponse(BulkIndexByScrollResponse response, XContentBuilder builder) throws Exception {
builder.startObject();
response.toXContent(builder, new ToXContent.DelegatingMapParams(params, channel.request()));
builder.endObject();
return new BytesRestResponse(getStatus(response), builder);
}
private RestStatus getStatus(R response) {
private RestStatus getStatus(BulkIndexByScrollResponse response) {
/*
* Return the highest numbered rest status under the assumption that higher numbered statuses are "more error" and thus more
* interesting to the user.

View File

@ -39,13 +39,13 @@ import java.util.function.Consumer;
import static org.elasticsearch.rest.RestRequest.Method.POST;
public class RestDeleteByQueryAction extends AbstractBulkByQueryRestHandler<DeleteByQueryRequest, TransportDeleteByQueryAction> {
public class RestDeleteByQueryAction extends AbstractBulkByQueryRestHandler<DeleteByQueryRequest, DeleteByQueryAction> {
@Inject
public RestDeleteByQueryAction(Settings settings, RestController controller,
IndicesQueriesRegistry indicesQueriesRegistry, AggregatorParsers aggParsers, Suggesters suggesters,
ClusterService clusterService, TransportDeleteByQueryAction action) {
super(settings, indicesQueriesRegistry, aggParsers, suggesters, clusterService, action);
ClusterService clusterService) {
super(settings, indicesQueriesRegistry, aggParsers, suggesters, clusterService, DeleteByQueryAction.INSTANCE);
controller.registerHandler(POST, "/{index}/_delete_by_query", this);
controller.registerHandler(POST, "/{index}/{type}/_delete_by_query", this);
}
@ -55,7 +55,7 @@ public class RestDeleteByQueryAction extends AbstractBulkByQueryRestHandler<Dele
if (false == request.hasContent()) {
throw new ElasticsearchException("_delete_by_query requires a request body");
}
handleRequest(request, channel, false, false);
handleRequest(request, channel, client, false, false);
}
@Override

View File

@ -64,7 +64,7 @@ import static org.elasticsearch.rest.RestRequest.Method.POST;
/**
* Expose reindex over rest.
*/
public class RestReindexAction extends AbstractBaseReindexRestHandler<ReindexRequest, TransportReindexAction> {
public class RestReindexAction extends AbstractBaseReindexRestHandler<ReindexRequest, ReindexAction> {
static final ObjectParser<ReindexRequest, ReindexParseContext> PARSER = new ObjectParser<>("reindex");
private static final Pattern HOST_PATTERN = Pattern.compile("(?<scheme>[^:]+)://(?<host>[^:]+):(?<port>\\d+)");
@ -113,8 +113,8 @@ public class RestReindexAction extends AbstractBaseReindexRestHandler<ReindexReq
@Inject
public RestReindexAction(Settings settings, RestController controller,
IndicesQueriesRegistry indicesQueriesRegistry, AggregatorParsers aggParsers, Suggesters suggesters,
ClusterService clusterService, TransportReindexAction action) {
super(settings, indicesQueriesRegistry, aggParsers, suggesters, clusterService, action);
ClusterService clusterService) {
super(settings, indicesQueriesRegistry, aggParsers, suggesters, clusterService, ReindexAction.INSTANCE);
controller.registerHandler(POST, "/_reindex", this);
}
@ -123,7 +123,7 @@ public class RestReindexAction extends AbstractBaseReindexRestHandler<ReindexReq
if (false == request.hasContent()) {
throw new ElasticsearchException("_reindex requires a request body");
}
handleRequest(request, channel, true, true);
handleRequest(request, channel, client, true, true);
}
@Override

View File

@ -39,20 +39,20 @@ import java.util.function.Consumer;
import static org.elasticsearch.rest.RestRequest.Method.POST;
public class RestUpdateByQueryAction extends AbstractBulkByQueryRestHandler<UpdateByQueryRequest, TransportUpdateByQueryAction> {
public class RestUpdateByQueryAction extends AbstractBulkByQueryRestHandler<UpdateByQueryRequest, UpdateByQueryAction> {
@Inject
public RestUpdateByQueryAction(Settings settings, RestController controller,
IndicesQueriesRegistry indicesQueriesRegistry, AggregatorParsers aggParsers, Suggesters suggesters,
ClusterService clusterService, TransportUpdateByQueryAction action) {
super(settings, indicesQueriesRegistry, aggParsers, suggesters, clusterService, action);
ClusterService clusterService) {
super(settings, indicesQueriesRegistry, aggParsers, suggesters, clusterService, UpdateByQueryAction.INSTANCE);
controller.registerHandler(POST, "/{index}/_update_by_query", this);
controller.registerHandler(POST, "/{index}/{type}/_update_by_query", this);
}
@Override
public void handleRequest(RestRequest request, RestChannel channel, NodeClient client) throws Exception {
handleRequest(request, channel, false, true);
handleRequest(request, channel, client, false, true);
}
@Override