From 81fcdfcee97141a475da99680f863fdce5faebed Mon Sep 17 00:00:00 2001 From: Nik Everett Date: Thu, 30 Jun 2016 10:39:45 -0400 Subject: [PATCH] Expose task information from NodeClient This exposes a method to start an action and return a task from `NodeClient`. This allows reindex to use the injected `Client` rather than require injecting `TransportAction`s --- .../resources/checkstyle_suppressions.xml | 1 - .../action/support/TransportAction.java | 4 ++ .../elasticsearch/client/node/NodeClient.java | 46 +++++++++++++++++-- .../AbstractBaseReindexRestHandler.java | 19 ++++---- .../AbstractBulkByQueryRestHandler.java | 7 ++- ...kIndexByScrollResponseContentListener.java | 6 +-- .../reindex/RestDeleteByQueryAction.java | 8 ++-- .../index/reindex/RestReindexAction.java | 8 ++-- .../reindex/RestUpdateByQueryAction.java | 8 ++-- 9 files changed, 72 insertions(+), 35 deletions(-) diff --git a/buildSrc/src/main/resources/checkstyle_suppressions.xml b/buildSrc/src/main/resources/checkstyle_suppressions.xml index 882d12a305f..f5fc4046e66 100644 --- a/buildSrc/src/main/resources/checkstyle_suppressions.xml +++ b/buildSrc/src/main/resources/checkstyle_suppressions.xml @@ -233,7 +233,6 @@ - diff --git a/core/src/main/java/org/elasticsearch/action/support/TransportAction.java b/core/src/main/java/org/elasticsearch/action/support/TransportAction.java index 60bd6100089..03408dab77f 100644 --- a/core/src/main/java/org/elasticsearch/action/support/TransportAction.java +++ b/core/src/main/java/org/elasticsearch/action/support/TransportAction.java @@ -101,6 +101,10 @@ public abstract class TransportAction, Re return task; } + /** + * Execute the transport action on the local node, returning the {@link Task} used to track its execution and accepting a + * {@link TaskListener} which listens for the completion of the action. + */ public final Task execute(Request request, TaskListener listener) { Task task = taskManager.register("transport", actionName, request); execute(task, request, new ActionListener() { diff --git a/core/src/main/java/org/elasticsearch/client/node/NodeClient.java b/core/src/main/java/org/elasticsearch/client/node/NodeClient.java index 59909df428b..e68b902e259 100644 --- a/core/src/main/java/org/elasticsearch/client/node/NodeClient.java +++ b/core/src/main/java/org/elasticsearch/client/node/NodeClient.java @@ -26,14 +26,17 @@ import org.elasticsearch.action.ActionRequestBuilder; import org.elasticsearch.action.ActionResponse; import org.elasticsearch.action.GenericAction; import org.elasticsearch.action.support.TransportAction; +import org.elasticsearch.client.Client; import org.elasticsearch.client.support.AbstractClient; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.tasks.TaskListener; import org.elasticsearch.threadpool.ThreadPool; import java.util.Map; /** - * + * Client that executes actions on the local node. */ public class NodeClient extends AbstractClient { @@ -52,10 +55,43 @@ public class NodeClient extends AbstractClient { // nothing really to do } - @SuppressWarnings("unchecked") @Override - public , Response extends ActionResponse, RequestBuilder extends ActionRequestBuilder> void doExecute( - Action action, Request request, ActionListener listener) { + public < Request extends ActionRequest, + Response extends ActionResponse, + RequestBuilder extends ActionRequestBuilder + > void doExecute(Action action, Request request, ActionListener listener) { + // Discard the task because the Client interface doesn't use it. + executeLocally(action, request, listener); + } + + /** + * Execute an {@link Action} locally, returning that {@link Task} used to track it, and linking an {@link ActionListener}. Prefer this + * method if you don't need access to the task when listening for the response. This is the method used to implement the {@link Client} + * interface. + */ + public < Request extends ActionRequest, + Response extends ActionResponse + > Task executeLocally(GenericAction action, Request request, ActionListener listener) { + return transportAction(action).execute(request, listener); + } + + /** + * Execute an {@link Action} locally, returning that {@link Task} used to track it, and linking an {@link TaskListener}. Prefer this + * method if you need access to the task when listening for the response. + */ + public < Request extends ActionRequest, + Response extends ActionResponse + > Task executeLocally(GenericAction action, Request request, TaskListener listener) { + return transportAction(action).execute(request, listener); + } + + /** + * Get the {@link TransportAction} for an {@link Action}, throwing exceptions if the action isn't available. + */ + @SuppressWarnings("unchecked") + private < Request extends ActionRequest, + Response extends ActionResponse + > TransportAction transportAction(GenericAction action) { if (actions == null) { throw new IllegalStateException("NodeClient has not been initialized"); } @@ -63,6 +99,6 @@ public class NodeClient extends AbstractClient { if (transportAction == null) { throw new IllegalStateException("failed to find action [" + action + "] to execute"); } - transportAction.execute(request, listener); + return transportAction; } } diff --git a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/AbstractBaseReindexRestHandler.java b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/AbstractBaseReindexRestHandler.java index 048e4208fea..7cf1aed2406 100644 --- a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/AbstractBaseReindexRestHandler.java +++ b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/AbstractBaseReindexRestHandler.java @@ -20,9 +20,9 @@ package org.elasticsearch.index.reindex; import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.GenericAction; import org.elasticsearch.action.WriteConsistencyLevel; -import org.elasticsearch.action.support.TransportAction; -import org.elasticsearch.client.Client; +import org.elasticsearch.client.node.NodeClient; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentBuilder; @@ -43,18 +43,18 @@ import java.util.Map; public abstract class AbstractBaseReindexRestHandler< Request extends AbstractBulkByScrollRequest, - TA extends TransportAction + A extends GenericAction > extends BaseRestHandler { protected final IndicesQueriesRegistry indicesQueriesRegistry; protected final AggregatorParsers aggParsers; protected final Suggesters suggesters; private final ClusterService clusterService; - private final TA action; + private final A action; protected AbstractBaseReindexRestHandler(Settings settings, IndicesQueriesRegistry indicesQueriesRegistry, AggregatorParsers aggParsers, Suggesters suggesters, - ClusterService clusterService, TA action) { + ClusterService clusterService, A action) { super(settings); this.indicesQueriesRegistry = indicesQueriesRegistry; this.aggParsers = aggParsers; @@ -63,9 +63,8 @@ public abstract class AbstractBaseReindexRestHandler< this.action = action; } - public void handleRequest(RestRequest request, RestChannel channel, + protected void handleRequest(RestRequest request, RestChannel channel, NodeClient client, boolean includeCreated, boolean includeUpdated) throws IOException { - // Build the internal request Request internal = setCommonOptions(request, buildRequest(request)); @@ -75,14 +74,14 @@ public abstract class AbstractBaseReindexRestHandler< params.put(BulkByScrollTask.Status.INCLUDE_CREATED, Boolean.toString(includeCreated)); params.put(BulkByScrollTask.Status.INCLUDE_UPDATED, Boolean.toString(includeUpdated)); - action.execute(internal, new BulkIndexByScrollResponseContentListener<>(channel, params)); + client.executeLocally(action, internal, new BulkIndexByScrollResponseContentListener(channel, params)); return; } else { internal.setShouldPersistResult(true); } /* - * Lets try and validate before forking so the user gets some error. The + * Let's try and validate before forking so the user gets some error. The * task can't totally validate until it starts but this is better than * nothing. */ @@ -91,7 +90,7 @@ public abstract class AbstractBaseReindexRestHandler< channel.sendResponse(new BytesRestResponse(channel, validationException)); return; } - sendTask(channel, action.execute(internal, LoggingTaskListener.instance())); + sendTask(channel, client.executeLocally(action, internal, LoggingTaskListener.instance())); } /** diff --git a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/AbstractBulkByQueryRestHandler.java b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/AbstractBulkByQueryRestHandler.java index 9e4d8fc6d4e..41f103698d6 100644 --- a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/AbstractBulkByQueryRestHandler.java +++ b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/AbstractBulkByQueryRestHandler.java @@ -19,9 +19,8 @@ package org.elasticsearch.index.reindex; +import org.elasticsearch.action.GenericAction; import org.elasticsearch.action.search.SearchRequest; -import org.elasticsearch.action.support.TransportAction; -import org.elasticsearch.client.Client; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.collect.Tuple; @@ -48,11 +47,11 @@ import static org.elasticsearch.index.reindex.AbstractBulkByScrollRequest.SIZE_A */ public abstract class AbstractBulkByQueryRestHandler< Request extends AbstractBulkByScrollRequest, - TA extends TransportAction> extends AbstractBaseReindexRestHandler { + A extends GenericAction> extends AbstractBaseReindexRestHandler { protected AbstractBulkByQueryRestHandler(Settings settings, IndicesQueriesRegistry indicesQueriesRegistry, AggregatorParsers aggParsers, Suggesters suggesters, ClusterService clusterService, - TA action) { + A action) { super(settings, indicesQueriesRegistry, aggParsers, suggesters, clusterService, action); } diff --git a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/BulkIndexByScrollResponseContentListener.java b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/BulkIndexByScrollResponseContentListener.java index 6cfba3a302d..cc4096f9421 100644 --- a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/BulkIndexByScrollResponseContentListener.java +++ b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/BulkIndexByScrollResponseContentListener.java @@ -35,7 +35,7 @@ import java.util.Map; /** * RestBuilderListener that returns higher than 200 status if there are any failures and allows to set XContent.Params. */ -public class BulkIndexByScrollResponseContentListener extends RestBuilderListener { +public class BulkIndexByScrollResponseContentListener extends RestBuilderListener { private final Map params; @@ -45,14 +45,14 @@ public class BulkIndexByScrollResponseContentListener { +public class RestDeleteByQueryAction extends AbstractBulkByQueryRestHandler { @Inject public RestDeleteByQueryAction(Settings settings, RestController controller, IndicesQueriesRegistry indicesQueriesRegistry, AggregatorParsers aggParsers, Suggesters suggesters, - ClusterService clusterService, TransportDeleteByQueryAction action) { - super(settings, indicesQueriesRegistry, aggParsers, suggesters, clusterService, action); + ClusterService clusterService) { + super(settings, indicesQueriesRegistry, aggParsers, suggesters, clusterService, DeleteByQueryAction.INSTANCE); controller.registerHandler(POST, "/{index}/_delete_by_query", this); controller.registerHandler(POST, "/{index}/{type}/_delete_by_query", this); } @@ -55,7 +55,7 @@ public class RestDeleteByQueryAction extends AbstractBulkByQueryRestHandler { +public class RestReindexAction extends AbstractBaseReindexRestHandler { static final ObjectParser PARSER = new ObjectParser<>("reindex"); private static final Pattern HOST_PATTERN = Pattern.compile("(?[^:]+)://(?[^:]+):(?\\d+)"); @@ -113,8 +113,8 @@ public class RestReindexAction extends AbstractBaseReindexRestHandler { +public class RestUpdateByQueryAction extends AbstractBulkByQueryRestHandler { @Inject public RestUpdateByQueryAction(Settings settings, RestController controller, IndicesQueriesRegistry indicesQueriesRegistry, AggregatorParsers aggParsers, Suggesters suggesters, - ClusterService clusterService, TransportUpdateByQueryAction action) { - super(settings, indicesQueriesRegistry, aggParsers, suggesters, clusterService, action); + ClusterService clusterService) { + super(settings, indicesQueriesRegistry, aggParsers, suggesters, clusterService, UpdateByQueryAction.INSTANCE); controller.registerHandler(POST, "/{index}/_update_by_query", this); controller.registerHandler(POST, "/{index}/{type}/_update_by_query", this); } @Override public void handleRequest(RestRequest request, RestChannel channel, NodeClient client) throws Exception { - handleRequest(request, channel, false, true); + handleRequest(request, channel, client, false, true); } @Override