diff --git a/core/src/main/java/org/elasticsearch/client/node/NodeClient.java b/core/src/main/java/org/elasticsearch/client/node/NodeClient.java index 6c3aa071ba3..e4f26b15702 100644 --- a/core/src/main/java/org/elasticsearch/client/node/NodeClient.java +++ b/core/src/main/java/org/elasticsearch/client/node/NodeClient.java @@ -28,12 +28,14 @@ import org.elasticsearch.action.GenericAction; import org.elasticsearch.action.support.TransportAction; import org.elasticsearch.client.Client; import org.elasticsearch.client.support.AbstractClient; +import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.TaskListener; import org.elasticsearch.threadpool.ThreadPool; import java.util.Map; +import java.util.function.Supplier; /** * Client that executes actions on the local node. @@ -41,13 +43,19 @@ import java.util.Map; public class NodeClient extends AbstractClient { private Map actions; + /** + * The id of the local {@link DiscoveryNode}. Useful for generating task ids from tasks returned by + * {@link #executeLocally(GenericAction, ActionRequest, TaskListener)}. + */ + private Supplier localNodeId; public NodeClient(Settings settings, ThreadPool threadPool) { super(settings, threadPool); } - public void initialize(Map actions) { + public void initialize(Map actions, Supplier localNodeId) { this.actions = actions; + this.localNodeId = localNodeId; } @Override @@ -85,6 +93,14 @@ public class NodeClient extends AbstractClient { return transportAction(action).execute(request, listener); } + /** + * The id of the local {@link DiscoveryNode}. Useful for generating task ids from tasks returned by + * {@link #executeLocally(GenericAction, ActionRequest, TaskListener)}. + */ + public String getLocalNodeId() { + return localNodeId.get(); + } + /** * Get the {@link TransportAction} for an {@link Action}, throwing exceptions if the action isn't available. */ diff --git a/core/src/main/java/org/elasticsearch/node/Node.java b/core/src/main/java/org/elasticsearch/node/Node.java index f77e56705df..7033f2a9f3d 100644 --- a/core/src/main/java/org/elasticsearch/node/Node.java +++ b/core/src/main/java/org/elasticsearch/node/Node.java @@ -463,7 +463,8 @@ public class Node implements Closeable { .map(injector::getInstance).collect(Collectors.toList())); resourcesToClose.addAll(pluginLifecycleComponents); this.pluginLifecycleComponents = Collections.unmodifiableList(pluginLifecycleComponents); - client.initialize(injector.getInstance(new Key>() {})); + client.initialize(injector.getInstance(new Key>() {}), + () -> clusterService.localNode().getId()); logger.info("initialized"); diff --git a/core/src/test/java/org/elasticsearch/client/node/NodeClientHeadersTests.java b/core/src/test/java/org/elasticsearch/client/node/NodeClientHeadersTests.java index fff3b3cc3af..160c14c243c 100644 --- a/core/src/test/java/org/elasticsearch/client/node/NodeClientHeadersTests.java +++ b/core/src/test/java/org/elasticsearch/client/node/NodeClientHeadersTests.java @@ -43,7 +43,7 @@ public class NodeClientHeadersTests extends AbstractClientHeadersTestCase { Settings settings = HEADER_SETTINGS; Actions actions = new Actions(settings, threadPool, testedActions); NodeClient client = new NodeClient(settings, threadPool); - client.initialize(actions); + client.initialize(actions, () -> "test"); return client; } diff --git a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/AbstractBaseReindexRestHandler.java b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/AbstractBaseReindexRestHandler.java index 10cb31fc992..01e73192802 100644 --- a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/AbstractBaseReindexRestHandler.java +++ b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/AbstractBaseReindexRestHandler.java @@ -23,7 +23,6 @@ import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.GenericAction; import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.client.node.NodeClient; -import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.rest.BaseRestHandler; @@ -44,14 +43,11 @@ public abstract class AbstractBaseReindexRestHandler< > extends BaseRestHandler { protected final SearchRequestParsers searchRequestParsers; - private final ClusterService clusterService; private final A action; - protected AbstractBaseReindexRestHandler(Settings settings, SearchRequestParsers searchRequestParsers, ClusterService clusterService, - A action) { + protected AbstractBaseReindexRestHandler(Settings settings, SearchRequestParsers searchRequestParsers, A action) { super(settings); this.searchRequestParsers = searchRequestParsers; - this.clusterService = clusterService; this.action = action; } @@ -80,7 +76,7 @@ public abstract class AbstractBaseReindexRestHandler< if (validationException != null) { throw validationException; } - return sendTask(client.executeLocally(action, internal, LoggingTaskListener.instance())); + return sendTask(client.getLocalNodeId(), client.executeLocally(action, internal, LoggingTaskListener.instance())); } /** @@ -111,11 +107,11 @@ public abstract class AbstractBaseReindexRestHandler< return request; } - private RestChannelConsumer sendTask(Task task) throws IOException { + private RestChannelConsumer sendTask(String localNodeId, Task task) throws IOException { return channel -> { try (XContentBuilder builder = channel.newBuilder()) { builder.startObject(); - builder.field("task", clusterService.localNode().getId() + ":" + task.getId()); + builder.field("task", localNodeId + ":" + task.getId()); builder.endObject(); channel.sendResponse(new BytesRestResponse(RestStatus.OK, builder)); } diff --git a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/AbstractBulkByQueryRestHandler.java b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/AbstractBulkByQueryRestHandler.java index 7e208986aa7..79bc0f381ef 100644 --- a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/AbstractBulkByQueryRestHandler.java +++ b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/AbstractBulkByQueryRestHandler.java @@ -22,7 +22,6 @@ package org.elasticsearch.index.reindex; import org.apache.lucene.util.IOUtils; import org.elasticsearch.action.GenericAction; import org.elasticsearch.action.search.SearchRequest; -import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; @@ -44,9 +43,8 @@ public abstract class AbstractBulkByQueryRestHandler< Request extends AbstractBulkByScrollRequest, A extends GenericAction> extends AbstractBaseReindexRestHandler { - protected AbstractBulkByQueryRestHandler(Settings settings, SearchRequestParsers searchRequestParsers, ClusterService clusterService, - A action) { - super(settings, searchRequestParsers, clusterService, action); + protected AbstractBulkByQueryRestHandler(Settings settings, SearchRequestParsers searchRequestParsers, A action) { + super(settings, searchRequestParsers, action); } protected void parseInternalRequest(Request internal, RestRequest restRequest, diff --git a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/RestDeleteByQueryAction.java b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/RestDeleteByQueryAction.java index 8e8baacd2e7..0e21024eb54 100644 --- a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/RestDeleteByQueryAction.java +++ b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/RestDeleteByQueryAction.java @@ -22,7 +22,6 @@ package org.elasticsearch.index.reindex; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.client.node.NodeClient; -import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.rest.RestController; @@ -39,9 +38,8 @@ import static org.elasticsearch.rest.RestRequest.Method.POST; public class RestDeleteByQueryAction extends AbstractBulkByQueryRestHandler { @Inject - public RestDeleteByQueryAction(Settings settings, RestController controller, SearchRequestParsers searchRequestParsers, - ClusterService clusterService) { - super(settings, searchRequestParsers, clusterService, DeleteByQueryAction.INSTANCE); + public RestDeleteByQueryAction(Settings settings, RestController controller, SearchRequestParsers searchRequestParsers) { + super(settings, searchRequestParsers, DeleteByQueryAction.INSTANCE); controller.registerHandler(POST, "/{index}/_delete_by_query", this); controller.registerHandler(POST, "/{index}/{type}/_delete_by_query", this); } diff --git a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/RestReindexAction.java b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/RestReindexAction.java index c631a4c7a1b..e5295b9e4cc 100644 --- a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/RestReindexAction.java +++ b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/RestReindexAction.java @@ -22,7 +22,6 @@ package org.elasticsearch.index.reindex; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.client.node.NodeClient; -import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.ParseField; import org.elasticsearch.common.ParseFieldMatcher; import org.elasticsearch.common.ParseFieldMatcherSupplier; @@ -102,9 +101,8 @@ public class RestReindexAction extends AbstractBaseReindexRestHandler { @Inject - public RestUpdateByQueryAction(Settings settings, RestController controller, SearchRequestParsers searchRequestParsers, - ClusterService clusterService) { - super(settings, searchRequestParsers, clusterService, UpdateByQueryAction.INSTANCE); + public RestUpdateByQueryAction(Settings settings, RestController controller, SearchRequestParsers searchRequestParsers) { + super(settings, searchRequestParsers, UpdateByQueryAction.INSTANCE); controller.registerHandler(POST, "/{index}/_update_by_query", this); controller.registerHandler(POST, "/{index}/{type}/_update_by_query", this); } diff --git a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/RestReindexActionTests.java b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/RestReindexActionTests.java index 1fa3b50b3a4..32cb2d5e9d3 100644 --- a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/RestReindexActionTests.java +++ b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/RestReindexActionTests.java @@ -133,7 +133,7 @@ public class RestReindexActionTests extends ESTestCase { public void testPipelineQueryParameterIsError() throws IOException { SearchRequestParsers parsers = new SearchRequestParsers(); - RestReindexAction action = new RestReindexAction(Settings.EMPTY, mock(RestController.class), parsers, null); + RestReindexAction action = new RestReindexAction(Settings.EMPTY, mock(RestController.class), parsers); FakeRestRequest.Builder request = new FakeRestRequest.Builder(xContentRegistry()); try (XContentBuilder body = JsonXContent.contentBuilder().prettyPrint()) {