From fdd50612ae4ba1ecd4a704a128a8f0aa5af0673f Mon Sep 17 00:00:00 2001 From: Nik Everett Date: Mon, 15 Aug 2016 15:51:24 -0400 Subject: [PATCH] Fix reindex under the transport client The big change here is cleaning up the `TaskListResponse` so it doesn't have a breaky `toString` implementation. That was causing the reindex tests to break. Also removed `NetworkModule#registerTaskStatus` which is part of the Plugin API. Use `Plugin#getNamedWriteables` instead. --- .../tasks/cancel/CancelTasksResponse.java | 10 +- .../cancel/TransportCancelTasksAction.java | 4 +- .../node/tasks/list/ListTasksResponse.java | 123 ++++++++++-------- .../tasks/list/TransportListTasksAction.java | 7 +- .../common/network/NetworkModule.java | 19 ++- .../admin/cluster/RestCancelTasksAction.java | 8 +- .../admin/cluster/RestListTasksAction.java | 42 +++--- .../node/tasks/TransportTasksActionTests.java | 15 ++- .../common/network/NetworkModuleTests.java | 47 +------ .../tasks/ListTasksResponseTests.java | 24 ++-- .../index/reindex/ReindexPlugin.java | 15 ++- .../index/reindex/RestRethrottleAction.java | 8 +- .../reindex/TransportRethrottleAction.java | 7 +- .../index/reindex/ReindexTestCase.java | 11 +- 14 files changed, 156 insertions(+), 184 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/cancel/CancelTasksResponse.java b/core/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/cancel/CancelTasksResponse.java index 34daee1d14d..5e7c2c0f97d 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/cancel/CancelTasksResponse.java +++ b/core/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/cancel/CancelTasksResponse.java @@ -22,7 +22,6 @@ package org.elasticsearch.action.admin.cluster.node.tasks.cancel; import org.elasticsearch.action.FailedNodeException; import org.elasticsearch.action.TaskOperationFailure; import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse; -import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.tasks.TaskInfo; import java.util.List; @@ -35,13 +34,8 @@ public class CancelTasksResponse extends ListTasksResponse { public CancelTasksResponse() { } - public CancelTasksResponse(DiscoveryNodes discoveryNodes) { - super(discoveryNodes); - } - public CancelTasksResponse(List tasks, List taskFailures, List - nodeFailures, DiscoveryNodes discoveryNodes) { - super(tasks, taskFailures, nodeFailures, discoveryNodes); + nodeFailures) { + super(tasks, taskFailures, nodeFailures); } - } diff --git a/core/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/cancel/TransportCancelTasksAction.java b/core/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/cancel/TransportCancelTasksAction.java index 195cdd86f5c..dc52e4fd508 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/cancel/TransportCancelTasksAction.java +++ b/core/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/cancel/TransportCancelTasksAction.java @@ -66,7 +66,7 @@ public class TransportCancelTasksAction extends TransportTasksAction new CancelTasksResponse(clusterService.state().nodes()), + indexNameExpressionResolver, CancelTasksRequest::new, CancelTasksResponse::new, ThreadPool.Names.MANAGEMENT); transportService.registerRequestHandler(BAN_PARENT_ACTION_NAME, BanParentTaskRequest::new, ThreadPool.Names.SAME, new BanParentRequestHandler()); @@ -75,7 +75,7 @@ public class TransportCancelTasksAction extends TransportTasksAction tasks, List taskOperationFailures, List failedNodeExceptions) { - return new CancelTasksResponse(tasks, taskOperationFailures, failedNodeExceptions, clusterService.state().nodes()); + return new CancelTasksResponse(tasks, taskOperationFailures, failedNodeExceptions); } @Override diff --git a/core/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/list/ListTasksResponse.java b/core/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/list/ListTasksResponse.java index ae6f0d0855f..b33226b973b 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/list/ListTasksResponse.java +++ b/core/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/list/ListTasksResponse.java @@ -51,21 +51,14 @@ public class ListTasksResponse extends BaseTasksResponse implements ToXContent { private List groups; - private final DiscoveryNodes discoveryNodes; - public ListTasksResponse() { - this(null, null, null, null); - } - - public ListTasksResponse(DiscoveryNodes discoveryNodes) { - this(null, null, null, discoveryNodes); + this(null, null, null); } public ListTasksResponse(List tasks, List taskFailures, - List nodeFailures, DiscoveryNodes discoveryNodes) { + List nodeFailures) { super(taskFailures, nodeFailures); this.tasks = tasks == null ? Collections.emptyList() : Collections.unmodifiableList(new ArrayList<>(tasks)); - this.discoveryNodes = discoveryNodes; } @Override @@ -90,6 +83,9 @@ public class ListTasksResponse extends BaseTasksResponse implements ToXContent { return perNodeTasks; } + /** + * Get the tasks found by this request grouped by parent tasks. + */ public List getTaskGroups() { if (groups == null) { buildTaskGroups(); @@ -125,12 +121,76 @@ public class ListTasksResponse extends BaseTasksResponse implements ToXContent { this.groups = Collections.unmodifiableList(topLevelTasks.stream().map(TaskGroup.Builder::build).collect(Collectors.toList())); } + /** + * Get the tasks found by this request. + */ public List getTasks() { return tasks; } + /** + * Convert this task response to XContent grouping by executing nodes. + */ + public XContentBuilder toXContentGroupedByNode(XContentBuilder builder, Params params, DiscoveryNodes discoveryNodes) + throws IOException { + toXContentCommon(builder, params); + builder.startObject("nodes"); + for (Map.Entry> entry : getPerNodeTasks().entrySet()) { + DiscoveryNode node = discoveryNodes.get(entry.getKey()); + builder.startObject(entry.getKey()); + if (node != null) { + // If the node is no longer part of the cluster, oh well, we'll just skip it's useful information. + builder.field("name", node.getName()); + builder.field("transport_address", node.getAddress().toString()); + builder.field("host", node.getHostName()); + builder.field("ip", node.getAddress()); + + builder.startArray("roles"); + for (DiscoveryNode.Role role : node.getRoles()) { + builder.value(role.getRoleName()); + } + builder.endArray(); + + if (!node.getAttributes().isEmpty()) { + builder.startObject("attributes"); + for (Map.Entry attrEntry : node.getAttributes().entrySet()) { + builder.field(attrEntry.getKey(), attrEntry.getValue()); + } + builder.endObject(); + } + } + builder.startObject("tasks"); + for(TaskInfo task : entry.getValue()) { + builder.field(task.getTaskId().toString()); + task.toXContent(builder, params); + } + builder.endObject(); + builder.endObject(); + } + builder.endObject(); + return builder; + } + + /** + * Convert this response to XContent grouping by parent tasks. + */ + public XContentBuilder toXContentGroupedByParents(XContentBuilder builder, Params params) throws IOException { + toXContentCommon(builder, params); + builder.startObject("tasks"); + for (TaskGroup group : getTaskGroups()) { + builder.field(group.getTaskInfo().getTaskId().toString()); + group.toXContent(builder, params); + } + builder.endObject(); + return builder; + } + @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + return toXContentGroupedByParents(builder, params); + } + + private void toXContentCommon(XContentBuilder builder, Params params) throws IOException { if (getTaskFailures() != null && getTaskFailures().size() > 0) { builder.startArray("task_failures"); for (TaskOperationFailure ex : getTaskFailures()){ @@ -150,51 +210,6 @@ public class ListTasksResponse extends BaseTasksResponse implements ToXContent { } builder.endArray(); } - String groupBy = params.param("group_by", "nodes"); - if ("nodes".equals(groupBy)) { - builder.startObject("nodes"); - for (Map.Entry> entry : getPerNodeTasks().entrySet()) { - DiscoveryNode node = discoveryNodes.get(entry.getKey()); - builder.startObject(entry.getKey()); - if (node != null) { - // If the node is no longer part of the cluster, oh well, we'll just skip it's useful information. - builder.field("name", node.getName()); - builder.field("transport_address", node.getAddress().toString()); - builder.field("host", node.getHostName()); - builder.field("ip", node.getAddress()); - - builder.startArray("roles"); - for (DiscoveryNode.Role role : node.getRoles()) { - builder.value(role.getRoleName()); - } - builder.endArray(); - - if (!node.getAttributes().isEmpty()) { - builder.startObject("attributes"); - for (Map.Entry attrEntry : node.getAttributes().entrySet()) { - builder.field(attrEntry.getKey(), attrEntry.getValue()); - } - builder.endObject(); - } - } - builder.startObject("tasks"); - for(TaskInfo task : entry.getValue()) { - builder.field(task.getTaskId().toString()); - task.toXContent(builder, params); - } - builder.endObject(); - builder.endObject(); - } - builder.endObject(); - } else if ("parents".equals(groupBy)) { - builder.startObject("tasks"); - for (TaskGroup group : getTaskGroups()) { - builder.field(group.getTaskInfo().getTaskId().toString()); - group.toXContent(builder, params); - } - builder.endObject(); - } - return builder; } @Override diff --git a/core/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/list/TransportListTasksAction.java b/core/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/list/TransportListTasksAction.java index a1f84f87f9d..32c0c3c1845 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/list/TransportListTasksAction.java +++ b/core/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/list/TransportListTasksAction.java @@ -56,15 +56,14 @@ public class TransportListTasksAction extends TransportTasksAction new ListTasksResponse(clusterService.state().nodes()), - ThreadPool.Names.MANAGEMENT); + super(settings, ListTasksAction.NAME, threadPool, clusterService, transportService, actionFilters, indexNameExpressionResolver, + ListTasksRequest::new, ListTasksResponse::new, ThreadPool.Names.MANAGEMENT); } @Override protected ListTasksResponse newResponse(ListTasksRequest request, List tasks, List taskOperationFailures, List failedNodeExceptions) { - return new ListTasksResponse(tasks, taskOperationFailures, failedNodeExceptions, clusterService.state().nodes()); + return new ListTasksResponse(tasks, taskOperationFailures, failedNodeExceptions); } @Override diff --git a/core/src/main/java/org/elasticsearch/common/network/NetworkModule.java b/core/src/main/java/org/elasticsearch/common/network/NetworkModule.java index f6c2ef326f2..bb4a4bd3b30 100644 --- a/core/src/main/java/org/elasticsearch/common/network/NetworkModule.java +++ b/core/src/main/java/org/elasticsearch/common/network/NetworkModule.java @@ -19,9 +19,6 @@ package org.elasticsearch.common.network; -import java.util.ArrayList; -import java.util.List; - import org.elasticsearch.action.support.replication.ReplicationTask; import org.elasticsearch.cluster.routing.allocation.command.AllocateEmptyPrimaryAllocationCommand; import org.elasticsearch.cluster.routing.allocation.command.AllocateReplicaAllocationCommand; @@ -33,6 +30,7 @@ import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationComman import org.elasticsearch.common.ParseField; import org.elasticsearch.common.inject.AbstractModule; import org.elasticsearch.common.inject.util.Providers; +import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.io.stream.NamedWriteableRegistry.Entry; import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.settings.Setting; @@ -47,6 +45,9 @@ import org.elasticsearch.transport.Transport; import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.local.LocalTransport; +import java.util.ArrayList; +import java.util.List; + /** * A module to handle registering and binding all network related classes. */ @@ -76,11 +77,11 @@ public class NetworkModule extends AbstractModule { private final ExtensionPoint.SelectedType transportServiceTypes = new ExtensionPoint.SelectedType<>("transport_service", TransportService.class); private final ExtensionPoint.SelectedType transportTypes = new ExtensionPoint.SelectedType<>("transport", Transport.class); private final ExtensionPoint.SelectedType httpTransportTypes = new ExtensionPoint.SelectedType<>("http_transport", HttpServerTransport.class); - private final List namedWriteables = new ArrayList<>(); + private final List namedWriteables = new ArrayList<>(); /** * Creates a network module that custom networking classes can be plugged into. - * @param networkService A constructed network service object to bind. + * @param networkService A constructed network service object to bind. * @param settings The settings for the node * @param transportClient True if only transport classes should be allowed to be registered, false otherwise. */ @@ -90,8 +91,8 @@ public class NetworkModule extends AbstractModule { this.transportClient = transportClient; registerTransportService("default", TransportService.class); registerTransport(LOCAL_TRANSPORT, LocalTransport.class); - registerTaskStatus(ReplicationTask.Status.NAME, ReplicationTask.Status::new); - registerTaskStatus(RawTaskStatus.NAME, RawTaskStatus::new); + namedWriteables.add(new NamedWriteableRegistry.Entry(Task.Status.class, ReplicationTask.Status.NAME, ReplicationTask.Status::new)); + namedWriteables.add(new NamedWriteableRegistry.Entry(Task.Status.class, RawTaskStatus.NAME, RawTaskStatus::new)); registerBuiltinAllocationCommands(); } @@ -118,10 +119,6 @@ public class NetworkModule extends AbstractModule { httpTransportTypes.registerExtension(name, clazz); } - public void registerTaskStatus(String name, Writeable.Reader reader) { - namedWriteables.add(new Entry(Task.Status.class, name, reader)); - } - /** * Register an allocation command. *

diff --git a/core/src/main/java/org/elasticsearch/rest/action/admin/cluster/RestCancelTasksAction.java b/core/src/main/java/org/elasticsearch/rest/action/admin/cluster/RestCancelTasksAction.java index 65786accd81..3c558fba937 100644 --- a/core/src/main/java/org/elasticsearch/rest/action/admin/cluster/RestCancelTasksAction.java +++ b/core/src/main/java/org/elasticsearch/rest/action/admin/cluster/RestCancelTasksAction.java @@ -19,9 +19,7 @@ package org.elasticsearch.rest.action.admin.cluster; -import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest; -import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksResponse; import org.elasticsearch.client.node.NodeClient; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Strings; @@ -31,11 +29,10 @@ import org.elasticsearch.rest.BaseRestHandler; import org.elasticsearch.rest.RestChannel; import org.elasticsearch.rest.RestController; import org.elasticsearch.rest.RestRequest; -import org.elasticsearch.rest.action.RestToXContentListener; import org.elasticsearch.tasks.TaskId; import static org.elasticsearch.rest.RestRequest.Method.POST; -import static org.elasticsearch.rest.action.admin.cluster.RestListTasksAction.nodeSettingListener; +import static org.elasticsearch.rest.action.admin.cluster.RestListTasksAction.listTasksResponseListener; public class RestCancelTasksAction extends BaseRestHandler { @@ -61,8 +58,7 @@ public class RestCancelTasksAction extends BaseRestHandler { cancelTasksRequest.setNodesIds(nodesIds); cancelTasksRequest.setActions(actions); cancelTasksRequest.setParentTaskId(parentTaskId); - ActionListener listener = nodeSettingListener(clusterService, new RestToXContentListener<>(channel)); - client.admin().cluster().cancelTasks(cancelTasksRequest, listener); + client.admin().cluster().cancelTasks(cancelTasksRequest, listTasksResponseListener(clusterService, channel)); } @Override diff --git a/core/src/main/java/org/elasticsearch/rest/action/admin/cluster/RestListTasksAction.java b/core/src/main/java/org/elasticsearch/rest/action/admin/cluster/RestListTasksAction.java index 13b97cb0942..d5ff427e3d0 100644 --- a/core/src/main/java/org/elasticsearch/rest/action/admin/cluster/RestListTasksAction.java +++ b/core/src/main/java/org/elasticsearch/rest/action/admin/cluster/RestListTasksAction.java @@ -28,10 +28,15 @@ import org.elasticsearch.common.Strings; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.rest.BaseRestHandler; +import org.elasticsearch.rest.BytesRestResponse; import org.elasticsearch.rest.RestChannel; import org.elasticsearch.rest.RestController; import org.elasticsearch.rest.RestRequest; +import org.elasticsearch.rest.RestResponse; +import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.rest.action.RestBuilderListener; import org.elasticsearch.rest.action.RestToXContentListener; import org.elasticsearch.tasks.TaskId; @@ -68,27 +73,30 @@ public class RestListTasksAction extends BaseRestHandler { @Override public void handleRequest(final RestRequest request, final RestChannel channel, final NodeClient client) { - ActionListener listener = nodeSettingListener(clusterService, new RestToXContentListener<>(channel)); - client.admin().cluster().listTasks(generateListTasksRequest(request), listener); + client.admin().cluster().listTasks(generateListTasksRequest(request), listTasksResponseListener(clusterService, channel)); } /** - * Wrap the normal channel listener in one that sets the discovery nodes on the response so we can support all of it's toXContent - * formats. + * Standard listener for extensions of {@link ListTasksResponse} that supports {@code group_by=nodes}. */ - public static ActionListener nodeSettingListener(ClusterService clusterService, - ActionListener channelListener) { - return new ActionListener() { - @Override - public void onResponse(T response) { - channelListener.onResponse(response); - } - - @Override - public void onFailure(Exception e) { - channelListener.onFailure(e); - } - }; + public static ActionListener listTasksResponseListener(ClusterService clusterService, + RestChannel channel) { + String groupBy = channel.request().param("group_by", "nodes"); + if ("nodes".equals(groupBy)) { + return new RestBuilderListener(channel) { + @Override + public RestResponse buildResponse(T response, XContentBuilder builder) throws Exception { + builder.startObject(); + response.toXContentGroupedByNode(builder, channel.request(), clusterService.state().nodes()); + builder.endObject(); + return new BytesRestResponse(RestStatus.OK, builder); + } + }; + } else if ("parents".equals(groupBy)) { + return new RestToXContentListener<>(channel); + } else { + throw new IllegalArgumentException("[group_by] must be one of [nodes] or [parents] but was [" + groupBy + "]"); + } } @Override diff --git a/core/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TransportTasksActionTests.java b/core/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TransportTasksActionTests.java index 17ca7e7268b..f964eeba9f7 100644 --- a/core/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TransportTasksActionTests.java +++ b/core/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TransportTasksActionTests.java @@ -47,6 +47,7 @@ import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.rest.action.admin.cluster.RestListTasksAction; import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.TaskId; import org.elasticsearch.tasks.TaskInfo; @@ -65,6 +66,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; import static org.elasticsearch.action.support.PlainActionFuture.newFuture; import static org.hamcrest.Matchers.containsString; @@ -736,7 +738,7 @@ public class TransportTasksActionTests extends TaskManagerTestCase { ListTasksResponse response = testNodes[0].transportListTasksAction.execute(listTasksRequest).get(); assertEquals(testNodes.length + 1, response.getTasks().size()); - Map byNodes = serialize(response, new ToXContent.MapParams(Collections.singletonMap("group_by", "nodes"))); + Map byNodes = serialize(response, true); byNodes = (Map) byNodes.get("nodes"); // One element on the top level assertEquals(testNodes.length, byNodes.size()); @@ -750,7 +752,7 @@ public class TransportTasksActionTests extends TaskManagerTestCase { } // Group by parents - Map byParent = serialize(response, new ToXContent.MapParams(Collections.singletonMap("group_by", "parents"))); + Map byParent = serialize(response, false); byParent = (Map) byParent.get("tasks"); // One element on the top level assertEquals(1, byParent.size()); // Only one top level task @@ -763,10 +765,15 @@ public class TransportTasksActionTests extends TaskManagerTestCase { } } - private Map serialize(ToXContent response, ToXContent.Params params) throws IOException { + private Map serialize(ListTasksResponse response, boolean byParents) throws IOException { XContentBuilder builder = XContentFactory.contentBuilder(XContentType.JSON); builder.startObject(); - response.toXContent(builder, params); + if (byParents) { + DiscoveryNodes nodes = testNodes[0].clusterService.state().nodes(); + response.toXContentGroupedByNode(builder, ToXContent.EMPTY_PARAMS, nodes); + } else { + response.toXContentGroupedByParents(builder, ToXContent.EMPTY_PARAMS); + } builder.endObject(); builder.flush(); logger.info(builder.string()); diff --git a/core/src/test/java/org/elasticsearch/common/network/NetworkModuleTests.java b/core/src/test/java/org/elasticsearch/common/network/NetworkModuleTests.java index babc8bde346..734068347b3 100644 --- a/core/src/test/java/org/elasticsearch/common/network/NetworkModuleTests.java +++ b/core/src/test/java/org/elasticsearch/common/network/NetworkModuleTests.java @@ -19,20 +19,12 @@ package org.elasticsearch.common.network; -import java.io.IOException; -import java.util.Collections; - -import org.elasticsearch.action.support.replication.ReplicationTask; import org.elasticsearch.client.node.NodeClient; import org.elasticsearch.common.Table; import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.inject.ModuleTestCase; -import org.elasticsearch.common.io.stream.NamedWriteableRegistry; -import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.BoundTransportAddress; -import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.http.HttpInfo; import org.elasticsearch.http.HttpServerAdapter; import org.elasticsearch.http.HttpServerTransport; @@ -41,11 +33,12 @@ import org.elasticsearch.rest.BaseRestHandler; import org.elasticsearch.rest.RestChannel; import org.elasticsearch.rest.RestRequest; import org.elasticsearch.rest.action.cat.AbstractCatAction; -import org.elasticsearch.tasks.Task; import org.elasticsearch.test.transport.AssertingLocalTransport; import org.elasticsearch.transport.Transport; import org.elasticsearch.transport.TransportService; +import java.util.Collections; + public class NetworkModuleTests extends ModuleTestCase { static class FakeTransportService extends TransportService { @@ -168,40 +161,4 @@ public class NetworkModuleTests extends ModuleTestCase { assertNotBound(module, HttpServerTransport.class); assertFalse(module.isTransportClient()); } - - public void testRegisterTaskStatus() { - Settings settings = Settings.EMPTY; - NetworkModule module = new NetworkModule(new NetworkService(settings, Collections.emptyList()), settings, false); - NamedWriteableRegistry registry = new NamedWriteableRegistry(module.getNamedWriteables()); - assertFalse(module.isTransportClient()); - - // Builtin reader comes back - assertNotNull(registry.getReader(Task.Status.class, ReplicationTask.Status.NAME)); - - module.registerTaskStatus(DummyTaskStatus.NAME, DummyTaskStatus::new); - assertTrue(module.getNamedWriteables().stream().anyMatch(x -> x.name.equals(DummyTaskStatus.NAME))); - } - - private class DummyTaskStatus implements Task.Status { - public static final String NAME = "dummy"; - - public DummyTaskStatus(StreamInput in) { - throw new UnsupportedOperationException("test"); - } - - @Override - public String getWriteableName() { - return NAME; - } - - @Override - public void writeTo(StreamOutput out) throws IOException { - throw new UnsupportedOperationException(); - } - - @Override - public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { - throw new UnsupportedOperationException(); - } - } } diff --git a/core/src/test/java/org/elasticsearch/tasks/ListTasksResponseTests.java b/core/src/test/java/org/elasticsearch/tasks/ListTasksResponseTests.java index 433f3bdd16a..6643a71b096 100644 --- a/core/src/test/java/org/elasticsearch/tasks/ListTasksResponseTests.java +++ b/core/src/test/java/org/elasticsearch/tasks/ListTasksResponseTests.java @@ -20,29 +20,23 @@ package org.elasticsearch.tasks; import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse; -import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.test.ESTestCase; -import org.hamcrest.Matchers; -import java.util.Collections; +import static java.util.Collections.emptyList; +import static java.util.Collections.singletonList; public class ListTasksResponseTests extends ESTestCase { - public void testToStringNoTask() { - ListTasksResponse tasksResponse = new ListTasksResponse(); - String string = tasksResponse.toString(); - assertThat(string, Matchers.containsString("nodes")); + public void testEmptyToString() { + assertEquals("{\"tasks\":{}}", new ListTasksResponse().toString()); } - public void testToString() { + public void testNonEmptyToString() { TaskInfo info = new TaskInfo( new TaskId("node1", 1), "dummy-type", "dummy-action", "dummy-description", null, 0, 1, true, new TaskId("node1", 0)); - - DiscoveryNodes nodes = DiscoveryNodes.builder().build(); - ListTasksResponse tasksResponse = new ListTasksResponse(Collections.singletonList(info), Collections.emptyList(), - Collections.emptyList(), nodes); - - String string = tasksResponse.toString(); - assertThat(string, Matchers.containsString("\"type\":\"dummy-type\"")); + ListTasksResponse tasksResponse = new ListTasksResponse(singletonList(info), emptyList(), emptyList()); + assertEquals("{\"tasks\":{\"node1:1\":{\"node\":\"node1\",\"id\":1,\"type\":\"dummy-type\",\"action\":\"dummy-action\"," + + "\"description\":\"dummy-description\",\"start_time_in_millis\":0,\"running_time_in_nanos\":1,\"cancellable\":true," + + "\"parent_task_id\":\"node1:0\"}}}", tasksResponse.toString()); } } diff --git a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/ReindexPlugin.java b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/ReindexPlugin.java index 4f2cb2578ac..fe7bcb1f85b 100644 --- a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/ReindexPlugin.java +++ b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/ReindexPlugin.java @@ -21,11 +21,12 @@ package org.elasticsearch.index.reindex; import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.ActionResponse; -import org.elasticsearch.common.network.NetworkModule; -import org.elasticsearch.plugins.ActionPlugin; +import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.settings.Setting; +import org.elasticsearch.plugins.ActionPlugin; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.rest.RestHandler; +import org.elasticsearch.tasks.Task; import java.util.Arrays; import java.util.List; @@ -43,16 +44,18 @@ public class ReindexPlugin extends Plugin implements ActionPlugin { new ActionHandler<>(RethrottleAction.INSTANCE, TransportRethrottleAction.class)); } + @Override + public List getNamedWriteables() { + return singletonList( + new NamedWriteableRegistry.Entry(Task.Status.class, BulkByScrollTask.Status.NAME, BulkByScrollTask.Status::new)); + } + @Override public List> getRestHandlers() { return Arrays.asList(RestReindexAction.class, RestUpdateByQueryAction.class, RestDeleteByQueryAction.class, RestRethrottleAction.class); } - public void onModule(NetworkModule networkModule) { - networkModule.registerTaskStatus(BulkByScrollTask.Status.NAME, BulkByScrollTask.Status::new); - } - @Override public List> getSettings() { return singletonList(TransportReindexAction.REMOTE_CLUSTER_WHITELIST); diff --git a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/RestRethrottleAction.java b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/RestRethrottleAction.java index 06c575769f5..26c2aad8ce9 100644 --- a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/RestRethrottleAction.java +++ b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/RestRethrottleAction.java @@ -19,8 +19,6 @@ package org.elasticsearch.index.reindex; -import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse; import org.elasticsearch.client.node.NodeClient; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; @@ -29,11 +27,10 @@ import org.elasticsearch.rest.BaseRestHandler; import org.elasticsearch.rest.RestChannel; import org.elasticsearch.rest.RestController; import org.elasticsearch.rest.RestRequest; -import org.elasticsearch.rest.action.RestToXContentListener; import org.elasticsearch.tasks.TaskId; import static org.elasticsearch.rest.RestRequest.Method.POST; -import static org.elasticsearch.rest.action.admin.cluster.RestListTasksAction.nodeSettingListener; +import static org.elasticsearch.rest.action.admin.cluster.RestListTasksAction.listTasksResponseListener; public class RestRethrottleAction extends BaseRestHandler { private final ClusterService clusterService; @@ -56,7 +53,6 @@ public class RestRethrottleAction extends BaseRestHandler { throw new IllegalArgumentException("requests_per_second is a required parameter"); } internalRequest.setRequestsPerSecond(requestsPerSecond); - ActionListener listener = nodeSettingListener(clusterService, new RestToXContentListener<>(channel)); - client.execute(RethrottleAction.INSTANCE, internalRequest, listener); + client.execute(RethrottleAction.INSTANCE, internalRequest, listTasksResponseListener(clusterService, channel)); } } diff --git a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/TransportRethrottleAction.java b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/TransportRethrottleAction.java index 408204c801e..89af3bd39bd 100644 --- a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/TransportRethrottleAction.java +++ b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/TransportRethrottleAction.java @@ -40,9 +40,8 @@ public class TransportRethrottleAction extends TransportTasksAction new ListTasksResponse(clusterService.state().nodes()), - ThreadPool.Names.MANAGEMENT); + super(settings, RethrottleAction.NAME, threadPool, clusterService, transportService, actionFilters, indexNameExpressionResolver, + RethrottleRequest::new, ListTasksResponse::new, ThreadPool.Names.MANAGEMENT); } @Override @@ -60,7 +59,7 @@ public class TransportRethrottleAction extends TransportTasksAction tasks, List taskOperationFailures, List failedNodeExceptions) { - return new ListTasksResponse(tasks, taskOperationFailures, failedNodeExceptions, clusterService.state().nodes()); + return new ListTasksResponse(tasks, taskOperationFailures, failedNodeExceptions); } @Override diff --git a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/ReindexTestCase.java b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/ReindexTestCase.java index 03c3a0853bd..fcf80ea283c 100644 --- a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/ReindexTestCase.java +++ b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/ReindexTestCase.java @@ -28,14 +28,21 @@ import java.util.Collection; import static org.elasticsearch.test.ESIntegTestCase.Scope.SUITE; -@ClusterScope(scope = SUITE, transportClientRatio = 0) +/** + * Base test case for integration tests against the reindex plugin. + */ +@ClusterScope(scope = SUITE) public abstract class ReindexTestCase extends ESIntegTestCase { - @Override protected Collection> nodePlugins() { return Arrays.asList(ReindexPlugin.class); } + @Override + protected Collection> transportClientPlugins() { + return Arrays.asList(ReindexPlugin.class); + } + protected ReindexRequestBuilder reindex() { return ReindexAction.INSTANCE.newRequestBuilder(client()); }