Fix reindex under the transport client

The big change here is cleaning up the `TaskListResponse` so it doesn't
have a breaky `toString` implementation. That was causing the reindex
tests to break.

Also removed `NetworkModule#registerTaskStatus` which is part of the
Plugin API. Use `Plugin#getNamedWriteables` instead.
This commit is contained in:
Nik Everett 2016-08-15 15:51:24 -04:00
parent 2c1b9b67db
commit fdd50612ae
14 changed files with 156 additions and 184 deletions

View File

@ -22,7 +22,6 @@ package org.elasticsearch.action.admin.cluster.node.tasks.cancel;
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.TaskOperationFailure;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.tasks.TaskInfo;
import java.util.List;
@ -35,13 +34,8 @@ public class CancelTasksResponse extends ListTasksResponse {
public CancelTasksResponse() {
}
public CancelTasksResponse(DiscoveryNodes discoveryNodes) {
super(discoveryNodes);
}
public CancelTasksResponse(List<TaskInfo> tasks, List<TaskOperationFailure> taskFailures, List<? extends FailedNodeException>
nodeFailures, DiscoveryNodes discoveryNodes) {
super(tasks, taskFailures, nodeFailures, discoveryNodes);
nodeFailures) {
super(tasks, taskFailures, nodeFailures);
}
}

View File

@ -66,7 +66,7 @@ public class TransportCancelTasksAction extends TransportTasksAction<Cancellable
TransportService transportService, ActionFilters actionFilters, IndexNameExpressionResolver
indexNameExpressionResolver) {
super(settings, CancelTasksAction.NAME, threadPool, clusterService, transportService, actionFilters,
indexNameExpressionResolver, CancelTasksRequest::new, () -> new CancelTasksResponse(clusterService.state().nodes()),
indexNameExpressionResolver, CancelTasksRequest::new, CancelTasksResponse::new,
ThreadPool.Names.MANAGEMENT);
transportService.registerRequestHandler(BAN_PARENT_ACTION_NAME, BanParentTaskRequest::new, ThreadPool.Names.SAME, new
BanParentRequestHandler());
@ -75,7 +75,7 @@ public class TransportCancelTasksAction extends TransportTasksAction<Cancellable
@Override
protected CancelTasksResponse newResponse(CancelTasksRequest request, List<TaskInfo> tasks, List<TaskOperationFailure>
taskOperationFailures, List<FailedNodeException> failedNodeExceptions) {
return new CancelTasksResponse(tasks, taskOperationFailures, failedNodeExceptions, clusterService.state().nodes());
return new CancelTasksResponse(tasks, taskOperationFailures, failedNodeExceptions);
}
@Override

View File

@ -51,21 +51,14 @@ public class ListTasksResponse extends BaseTasksResponse implements ToXContent {
private List<TaskGroup> groups;
private final DiscoveryNodes discoveryNodes;
public ListTasksResponse() {
this(null, null, null, null);
}
public ListTasksResponse(DiscoveryNodes discoveryNodes) {
this(null, null, null, discoveryNodes);
this(null, null, null);
}
public ListTasksResponse(List<TaskInfo> tasks, List<TaskOperationFailure> taskFailures,
List<? extends FailedNodeException> nodeFailures, DiscoveryNodes discoveryNodes) {
List<? extends FailedNodeException> nodeFailures) {
super(taskFailures, nodeFailures);
this.tasks = tasks == null ? Collections.emptyList() : Collections.unmodifiableList(new ArrayList<>(tasks));
this.discoveryNodes = discoveryNodes;
}
@Override
@ -90,6 +83,9 @@ public class ListTasksResponse extends BaseTasksResponse implements ToXContent {
return perNodeTasks;
}
/**
* Get the tasks found by this request grouped by parent tasks.
*/
public List<TaskGroup> getTaskGroups() {
if (groups == null) {
buildTaskGroups();
@ -125,12 +121,76 @@ public class ListTasksResponse extends BaseTasksResponse implements ToXContent {
this.groups = Collections.unmodifiableList(topLevelTasks.stream().map(TaskGroup.Builder::build).collect(Collectors.toList()));
}
/**
* Get the tasks found by this request.
*/
public List<TaskInfo> getTasks() {
return tasks;
}
/**
* Convert this task response to XContent grouping by executing nodes.
*/
public XContentBuilder toXContentGroupedByNode(XContentBuilder builder, Params params, DiscoveryNodes discoveryNodes)
throws IOException {
toXContentCommon(builder, params);
builder.startObject("nodes");
for (Map.Entry<String, List<TaskInfo>> entry : getPerNodeTasks().entrySet()) {
DiscoveryNode node = discoveryNodes.get(entry.getKey());
builder.startObject(entry.getKey());
if (node != null) {
// If the node is no longer part of the cluster, oh well, we'll just skip it's useful information.
builder.field("name", node.getName());
builder.field("transport_address", node.getAddress().toString());
builder.field("host", node.getHostName());
builder.field("ip", node.getAddress());
builder.startArray("roles");
for (DiscoveryNode.Role role : node.getRoles()) {
builder.value(role.getRoleName());
}
builder.endArray();
if (!node.getAttributes().isEmpty()) {
builder.startObject("attributes");
for (Map.Entry<String, String> attrEntry : node.getAttributes().entrySet()) {
builder.field(attrEntry.getKey(), attrEntry.getValue());
}
builder.endObject();
}
}
builder.startObject("tasks");
for(TaskInfo task : entry.getValue()) {
builder.field(task.getTaskId().toString());
task.toXContent(builder, params);
}
builder.endObject();
builder.endObject();
}
builder.endObject();
return builder;
}
/**
* Convert this response to XContent grouping by parent tasks.
*/
public XContentBuilder toXContentGroupedByParents(XContentBuilder builder, Params params) throws IOException {
toXContentCommon(builder, params);
builder.startObject("tasks");
for (TaskGroup group : getTaskGroups()) {
builder.field(group.getTaskInfo().getTaskId().toString());
group.toXContent(builder, params);
}
builder.endObject();
return builder;
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
return toXContentGroupedByParents(builder, params);
}
private void toXContentCommon(XContentBuilder builder, Params params) throws IOException {
if (getTaskFailures() != null && getTaskFailures().size() > 0) {
builder.startArray("task_failures");
for (TaskOperationFailure ex : getTaskFailures()){
@ -150,51 +210,6 @@ public class ListTasksResponse extends BaseTasksResponse implements ToXContent {
}
builder.endArray();
}
String groupBy = params.param("group_by", "nodes");
if ("nodes".equals(groupBy)) {
builder.startObject("nodes");
for (Map.Entry<String, List<TaskInfo>> entry : getPerNodeTasks().entrySet()) {
DiscoveryNode node = discoveryNodes.get(entry.getKey());
builder.startObject(entry.getKey());
if (node != null) {
// If the node is no longer part of the cluster, oh well, we'll just skip it's useful information.
builder.field("name", node.getName());
builder.field("transport_address", node.getAddress().toString());
builder.field("host", node.getHostName());
builder.field("ip", node.getAddress());
builder.startArray("roles");
for (DiscoveryNode.Role role : node.getRoles()) {
builder.value(role.getRoleName());
}
builder.endArray();
if (!node.getAttributes().isEmpty()) {
builder.startObject("attributes");
for (Map.Entry<String, String> attrEntry : node.getAttributes().entrySet()) {
builder.field(attrEntry.getKey(), attrEntry.getValue());
}
builder.endObject();
}
}
builder.startObject("tasks");
for(TaskInfo task : entry.getValue()) {
builder.field(task.getTaskId().toString());
task.toXContent(builder, params);
}
builder.endObject();
builder.endObject();
}
builder.endObject();
} else if ("parents".equals(groupBy)) {
builder.startObject("tasks");
for (TaskGroup group : getTaskGroups()) {
builder.field(group.getTaskInfo().getTaskId().toString());
group.toXContent(builder, params);
}
builder.endObject();
}
return builder;
}
@Override

View File

@ -56,15 +56,14 @@ public class TransportListTasksAction extends TransportTasksAction<Task, ListTas
@Inject
public TransportListTasksAction(Settings settings, ThreadPool threadPool, ClusterService clusterService,
TransportService transportService, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) {
super(settings, ListTasksAction.NAME, threadPool, clusterService, transportService, actionFilters,
indexNameExpressionResolver, ListTasksRequest::new, () -> new ListTasksResponse(clusterService.state().nodes()),
ThreadPool.Names.MANAGEMENT);
super(settings, ListTasksAction.NAME, threadPool, clusterService, transportService, actionFilters, indexNameExpressionResolver,
ListTasksRequest::new, ListTasksResponse::new, ThreadPool.Names.MANAGEMENT);
}
@Override
protected ListTasksResponse newResponse(ListTasksRequest request, List<TaskInfo> tasks,
List<TaskOperationFailure> taskOperationFailures, List<FailedNodeException> failedNodeExceptions) {
return new ListTasksResponse(tasks, taskOperationFailures, failedNodeExceptions, clusterService.state().nodes());
return new ListTasksResponse(tasks, taskOperationFailures, failedNodeExceptions);
}
@Override

View File

@ -19,9 +19,6 @@
package org.elasticsearch.common.network;
import java.util.ArrayList;
import java.util.List;
import org.elasticsearch.action.support.replication.ReplicationTask;
import org.elasticsearch.cluster.routing.allocation.command.AllocateEmptyPrimaryAllocationCommand;
import org.elasticsearch.cluster.routing.allocation.command.AllocateReplicaAllocationCommand;
@ -33,6 +30,7 @@ import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationComman
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.inject.util.Providers;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry.Entry;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.settings.Setting;
@ -47,6 +45,9 @@ import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.transport.local.LocalTransport;
import java.util.ArrayList;
import java.util.List;
/**
* A module to handle registering and binding all network related classes.
*/
@ -76,11 +77,11 @@ public class NetworkModule extends AbstractModule {
private final ExtensionPoint.SelectedType<TransportService> transportServiceTypes = new ExtensionPoint.SelectedType<>("transport_service", TransportService.class);
private final ExtensionPoint.SelectedType<Transport> transportTypes = new ExtensionPoint.SelectedType<>("transport", Transport.class);
private final ExtensionPoint.SelectedType<HttpServerTransport> httpTransportTypes = new ExtensionPoint.SelectedType<>("http_transport", HttpServerTransport.class);
private final List<Entry> namedWriteables = new ArrayList<>();
private final List<NamedWriteableRegistry.Entry> namedWriteables = new ArrayList<>();
/**
* Creates a network module that custom networking classes can be plugged into.
* @param networkService A constructed network service object to bind.
* @param networkService A constructed network service object to bind.
* @param settings The settings for the node
* @param transportClient True if only transport classes should be allowed to be registered, false otherwise.
*/
@ -90,8 +91,8 @@ public class NetworkModule extends AbstractModule {
this.transportClient = transportClient;
registerTransportService("default", TransportService.class);
registerTransport(LOCAL_TRANSPORT, LocalTransport.class);
registerTaskStatus(ReplicationTask.Status.NAME, ReplicationTask.Status::new);
registerTaskStatus(RawTaskStatus.NAME, RawTaskStatus::new);
namedWriteables.add(new NamedWriteableRegistry.Entry(Task.Status.class, ReplicationTask.Status.NAME, ReplicationTask.Status::new));
namedWriteables.add(new NamedWriteableRegistry.Entry(Task.Status.class, RawTaskStatus.NAME, RawTaskStatus::new));
registerBuiltinAllocationCommands();
}
@ -118,10 +119,6 @@ public class NetworkModule extends AbstractModule {
httpTransportTypes.registerExtension(name, clazz);
}
public void registerTaskStatus(String name, Writeable.Reader<? extends Task.Status> reader) {
namedWriteables.add(new Entry(Task.Status.class, name, reader));
}
/**
* Register an allocation command.
* <p>

View File

@ -19,9 +19,7 @@
package org.elasticsearch.rest.action.admin.cluster;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest;
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksResponse;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings;
@ -31,11 +29,10 @@ import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestChannel;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.action.RestToXContentListener;
import org.elasticsearch.tasks.TaskId;
import static org.elasticsearch.rest.RestRequest.Method.POST;
import static org.elasticsearch.rest.action.admin.cluster.RestListTasksAction.nodeSettingListener;
import static org.elasticsearch.rest.action.admin.cluster.RestListTasksAction.listTasksResponseListener;
public class RestCancelTasksAction extends BaseRestHandler {
@ -61,8 +58,7 @@ public class RestCancelTasksAction extends BaseRestHandler {
cancelTasksRequest.setNodesIds(nodesIds);
cancelTasksRequest.setActions(actions);
cancelTasksRequest.setParentTaskId(parentTaskId);
ActionListener<CancelTasksResponse> listener = nodeSettingListener(clusterService, new RestToXContentListener<>(channel));
client.admin().cluster().cancelTasks(cancelTasksRequest, listener);
client.admin().cluster().cancelTasks(cancelTasksRequest, listTasksResponseListener(clusterService, channel));
}
@Override

View File

@ -28,10 +28,15 @@ import org.elasticsearch.common.Strings;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.BytesRestResponse;
import org.elasticsearch.rest.RestChannel;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.RestResponse;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.rest.action.RestBuilderListener;
import org.elasticsearch.rest.action.RestToXContentListener;
import org.elasticsearch.tasks.TaskId;
@ -68,27 +73,30 @@ public class RestListTasksAction extends BaseRestHandler {
@Override
public void handleRequest(final RestRequest request, final RestChannel channel, final NodeClient client) {
ActionListener<ListTasksResponse> listener = nodeSettingListener(clusterService, new RestToXContentListener<>(channel));
client.admin().cluster().listTasks(generateListTasksRequest(request), listener);
client.admin().cluster().listTasks(generateListTasksRequest(request), listTasksResponseListener(clusterService, channel));
}
/**
* Wrap the normal channel listener in one that sets the discovery nodes on the response so we can support all of it's toXContent
* formats.
* Standard listener for extensions of {@link ListTasksResponse} that supports {@code group_by=nodes}.
*/
public static <T extends ListTasksResponse> ActionListener<T> nodeSettingListener(ClusterService clusterService,
ActionListener<T> channelListener) {
return new ActionListener<T>() {
@Override
public void onResponse(T response) {
channelListener.onResponse(response);
}
@Override
public void onFailure(Exception e) {
channelListener.onFailure(e);
}
};
public static <T extends ListTasksResponse> ActionListener<T> listTasksResponseListener(ClusterService clusterService,
RestChannel channel) {
String groupBy = channel.request().param("group_by", "nodes");
if ("nodes".equals(groupBy)) {
return new RestBuilderListener<T>(channel) {
@Override
public RestResponse buildResponse(T response, XContentBuilder builder) throws Exception {
builder.startObject();
response.toXContentGroupedByNode(builder, channel.request(), clusterService.state().nodes());
builder.endObject();
return new BytesRestResponse(RestStatus.OK, builder);
}
};
} else if ("parents".equals(groupBy)) {
return new RestToXContentListener<>(channel);
} else {
throw new IllegalArgumentException("[group_by] must be one of [nodes] or [parents] but was [" + groupBy + "]");
}
}
@Override

View File

@ -47,6 +47,7 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.rest.action.admin.cluster.RestListTasksAction;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.tasks.TaskInfo;
@ -65,6 +66,7 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import static org.elasticsearch.action.support.PlainActionFuture.newFuture;
import static org.hamcrest.Matchers.containsString;
@ -736,7 +738,7 @@ public class TransportTasksActionTests extends TaskManagerTestCase {
ListTasksResponse response = testNodes[0].transportListTasksAction.execute(listTasksRequest).get();
assertEquals(testNodes.length + 1, response.getTasks().size());
Map<String, Object> byNodes = serialize(response, new ToXContent.MapParams(Collections.singletonMap("group_by", "nodes")));
Map<String, Object> byNodes = serialize(response, true);
byNodes = (Map<String, Object>) byNodes.get("nodes");
// One element on the top level
assertEquals(testNodes.length, byNodes.size());
@ -750,7 +752,7 @@ public class TransportTasksActionTests extends TaskManagerTestCase {
}
// Group by parents
Map<String, Object> byParent = serialize(response, new ToXContent.MapParams(Collections.singletonMap("group_by", "parents")));
Map<String, Object> byParent = serialize(response, false);
byParent = (Map<String, Object>) byParent.get("tasks");
// One element on the top level
assertEquals(1, byParent.size()); // Only one top level task
@ -763,10 +765,15 @@ public class TransportTasksActionTests extends TaskManagerTestCase {
}
}
private Map<String, Object> serialize(ToXContent response, ToXContent.Params params) throws IOException {
private Map<String, Object> serialize(ListTasksResponse response, boolean byParents) throws IOException {
XContentBuilder builder = XContentFactory.contentBuilder(XContentType.JSON);
builder.startObject();
response.toXContent(builder, params);
if (byParents) {
DiscoveryNodes nodes = testNodes[0].clusterService.state().nodes();
response.toXContentGroupedByNode(builder, ToXContent.EMPTY_PARAMS, nodes);
} else {
response.toXContentGroupedByParents(builder, ToXContent.EMPTY_PARAMS);
}
builder.endObject();
builder.flush();
logger.info(builder.string());

View File

@ -19,20 +19,12 @@
package org.elasticsearch.common.network;
import java.io.IOException;
import java.util.Collections;
import org.elasticsearch.action.support.replication.ReplicationTask;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.common.Table;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.ModuleTestCase;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.BoundTransportAddress;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.http.HttpInfo;
import org.elasticsearch.http.HttpServerAdapter;
import org.elasticsearch.http.HttpServerTransport;
@ -41,11 +33,12 @@ import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestChannel;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.action.cat.AbstractCatAction;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.test.transport.AssertingLocalTransport;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportService;
import java.util.Collections;
public class NetworkModuleTests extends ModuleTestCase {
static class FakeTransportService extends TransportService {
@ -168,40 +161,4 @@ public class NetworkModuleTests extends ModuleTestCase {
assertNotBound(module, HttpServerTransport.class);
assertFalse(module.isTransportClient());
}
public void testRegisterTaskStatus() {
Settings settings = Settings.EMPTY;
NetworkModule module = new NetworkModule(new NetworkService(settings, Collections.emptyList()), settings, false);
NamedWriteableRegistry registry = new NamedWriteableRegistry(module.getNamedWriteables());
assertFalse(module.isTransportClient());
// Builtin reader comes back
assertNotNull(registry.getReader(Task.Status.class, ReplicationTask.Status.NAME));
module.registerTaskStatus(DummyTaskStatus.NAME, DummyTaskStatus::new);
assertTrue(module.getNamedWriteables().stream().anyMatch(x -> x.name.equals(DummyTaskStatus.NAME)));
}
private class DummyTaskStatus implements Task.Status {
public static final String NAME = "dummy";
public DummyTaskStatus(StreamInput in) {
throw new UnsupportedOperationException("test");
}
@Override
public String getWriteableName() {
return NAME;
}
@Override
public void writeTo(StreamOutput out) throws IOException {
throw new UnsupportedOperationException();
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
throw new UnsupportedOperationException();
}
}
}

View File

@ -20,29 +20,23 @@
package org.elasticsearch.tasks;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.test.ESTestCase;
import org.hamcrest.Matchers;
import java.util.Collections;
import static java.util.Collections.emptyList;
import static java.util.Collections.singletonList;
public class ListTasksResponseTests extends ESTestCase {
public void testToStringNoTask() {
ListTasksResponse tasksResponse = new ListTasksResponse();
String string = tasksResponse.toString();
assertThat(string, Matchers.containsString("nodes"));
public void testEmptyToString() {
assertEquals("{\"tasks\":{}}", new ListTasksResponse().toString());
}
public void testToString() {
public void testNonEmptyToString() {
TaskInfo info = new TaskInfo(
new TaskId("node1", 1), "dummy-type", "dummy-action", "dummy-description", null, 0, 1, true, new TaskId("node1", 0));
DiscoveryNodes nodes = DiscoveryNodes.builder().build();
ListTasksResponse tasksResponse = new ListTasksResponse(Collections.singletonList(info), Collections.emptyList(),
Collections.emptyList(), nodes);
String string = tasksResponse.toString();
assertThat(string, Matchers.containsString("\"type\":\"dummy-type\""));
ListTasksResponse tasksResponse = new ListTasksResponse(singletonList(info), emptyList(), emptyList());
assertEquals("{\"tasks\":{\"node1:1\":{\"node\":\"node1\",\"id\":1,\"type\":\"dummy-type\",\"action\":\"dummy-action\","
+ "\"description\":\"dummy-description\",\"start_time_in_millis\":0,\"running_time_in_nanos\":1,\"cancellable\":true,"
+ "\"parent_task_id\":\"node1:0\"}}}", tasksResponse.toString());
}
}

View File

@ -21,11 +21,12 @@ package org.elasticsearch.index.reindex;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.common.network.NetworkModule;
import org.elasticsearch.plugins.ActionPlugin;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.plugins.ActionPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.rest.RestHandler;
import org.elasticsearch.tasks.Task;
import java.util.Arrays;
import java.util.List;
@ -43,16 +44,18 @@ public class ReindexPlugin extends Plugin implements ActionPlugin {
new ActionHandler<>(RethrottleAction.INSTANCE, TransportRethrottleAction.class));
}
@Override
public List<NamedWriteableRegistry.Entry> getNamedWriteables() {
return singletonList(
new NamedWriteableRegistry.Entry(Task.Status.class, BulkByScrollTask.Status.NAME, BulkByScrollTask.Status::new));
}
@Override
public List<Class<? extends RestHandler>> getRestHandlers() {
return Arrays.asList(RestReindexAction.class, RestUpdateByQueryAction.class, RestDeleteByQueryAction.class,
RestRethrottleAction.class);
}
public void onModule(NetworkModule networkModule) {
networkModule.registerTaskStatus(BulkByScrollTask.Status.NAME, BulkByScrollTask.Status::new);
}
@Override
public List<Setting<?>> getSettings() {
return singletonList(TransportReindexAction.REMOTE_CLUSTER_WHITELIST);

View File

@ -19,8 +19,6 @@
package org.elasticsearch.index.reindex;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
@ -29,11 +27,10 @@ import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestChannel;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.action.RestToXContentListener;
import org.elasticsearch.tasks.TaskId;
import static org.elasticsearch.rest.RestRequest.Method.POST;
import static org.elasticsearch.rest.action.admin.cluster.RestListTasksAction.nodeSettingListener;
import static org.elasticsearch.rest.action.admin.cluster.RestListTasksAction.listTasksResponseListener;
public class RestRethrottleAction extends BaseRestHandler {
private final ClusterService clusterService;
@ -56,7 +53,6 @@ public class RestRethrottleAction extends BaseRestHandler {
throw new IllegalArgumentException("requests_per_second is a required parameter");
}
internalRequest.setRequestsPerSecond(requestsPerSecond);
ActionListener<ListTasksResponse> listener = nodeSettingListener(clusterService, new RestToXContentListener<>(channel));
client.execute(RethrottleAction.INSTANCE, internalRequest, listener);
client.execute(RethrottleAction.INSTANCE, internalRequest, listTasksResponseListener(clusterService, channel));
}
}

View File

@ -40,9 +40,8 @@ public class TransportRethrottleAction extends TransportTasksAction<BulkByScroll
@Inject
public TransportRethrottleAction(Settings settings, ThreadPool threadPool, ClusterService clusterService,
TransportService transportService, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) {
super(settings, RethrottleAction.NAME, threadPool, clusterService, transportService, actionFilters,
indexNameExpressionResolver, RethrottleRequest::new, () -> new ListTasksResponse(clusterService.state().nodes()),
ThreadPool.Names.MANAGEMENT);
super(settings, RethrottleAction.NAME, threadPool, clusterService, transportService, actionFilters, indexNameExpressionResolver,
RethrottleRequest::new, ListTasksResponse::new, ThreadPool.Names.MANAGEMENT);
}
@Override
@ -60,7 +59,7 @@ public class TransportRethrottleAction extends TransportTasksAction<BulkByScroll
@Override
protected ListTasksResponse newResponse(RethrottleRequest request, List<TaskInfo> tasks,
List<TaskOperationFailure> taskOperationFailures, List<FailedNodeException> failedNodeExceptions) {
return new ListTasksResponse(tasks, taskOperationFailures, failedNodeExceptions, clusterService.state().nodes());
return new ListTasksResponse(tasks, taskOperationFailures, failedNodeExceptions);
}
@Override

View File

@ -28,14 +28,21 @@ import java.util.Collection;
import static org.elasticsearch.test.ESIntegTestCase.Scope.SUITE;
@ClusterScope(scope = SUITE, transportClientRatio = 0)
/**
* Base test case for integration tests against the reindex plugin.
*/
@ClusterScope(scope = SUITE)
public abstract class ReindexTestCase extends ESIntegTestCase {
@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Arrays.asList(ReindexPlugin.class);
}
@Override
protected Collection<Class<? extends Plugin>> transportClientPlugins() {
return Arrays.asList(ReindexPlugin.class);
}
protected ReindexRequestBuilder reindex() {
return ReindexAction.INSTANCE.newRequestBuilder(client());
}