Replace Streamable w/ Writeable in BaseTasksResponse and subclasses (#36176)

This commit replaces usages of Streamable with Writeable for the
BaseTasksResponse / TransportTasksAction classes and subclasses of
these classes.

Note that where possible response fields were made final.

Relates to #34389
This commit is contained in:
Martijn van Groningen 2018-12-05 13:14:10 +01:00 committed by GitHub
parent 59b0900174
commit 11935cd480
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
55 changed files with 362 additions and 541 deletions

View File

@ -21,6 +21,7 @@ package org.elasticsearch.index.reindex;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
import org.elasticsearch.common.io.stream.Writeable;
public class RethrottleAction extends Action<ListTasksResponse> {
public static final RethrottleAction INSTANCE = new RethrottleAction();
@ -32,6 +33,11 @@ public class RethrottleAction extends Action<ListTasksResponse> {
@Override
public ListTasksResponse newResponse() {
return new ListTasksResponse();
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
}
@Override
public Writeable.Reader<ListTasksResponse> getResponseReader() {
return ListTasksResponse::new;
}
}

View File

@ -29,13 +29,11 @@ import org.elasticsearch.action.support.tasks.TransportTasksAction;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.tasks.TaskInfo;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import java.io.IOException;
import java.util.List;
public class TransportRethrottleAction extends TransportTasksAction<BulkByScrollTask, RethrottleRequest, ListTasksResponse, TaskInfo> {
@ -45,7 +43,7 @@ public class TransportRethrottleAction extends TransportTasksAction<BulkByScroll
public TransportRethrottleAction(ClusterService clusterService, TransportService transportService,
ActionFilters actionFilters, Client client) {
super(RethrottleAction.NAME, clusterService, transportService, actionFilters,
RethrottleRequest::new, ListTasksResponse::new, ThreadPool.Names.MANAGEMENT);
RethrottleRequest::new, ListTasksResponse::new, TaskInfo::new, ThreadPool.Names.MANAGEMENT);
this.client = client;
}
@ -101,11 +99,6 @@ public class TransportRethrottleAction extends TransportTasksAction<BulkByScroll
listener.onResponse(task.taskInfo(localNodeId, true));
}
@Override
protected TaskInfo readTaskResponse(StreamInput in) throws IOException {
return new TaskInfo(in);
}
@Override
protected ListTasksResponse newResponse(RethrottleRequest request, List<TaskInfo> tasks,
List<TaskOperationFailure> taskOperationFailures, List<FailedNodeException> failedNodeExceptions) {

View File

@ -20,6 +20,7 @@
package org.elasticsearch.action.admin.cluster.node.tasks.cancel;
import org.elasticsearch.action.Action;
import org.elasticsearch.common.io.stream.Writeable;
/**
* Action for cancelling running tasks
@ -35,6 +36,11 @@ public class CancelTasksAction extends Action<CancelTasksResponse> {
@Override
public CancelTasksResponse newResponse() {
return new CancelTasksResponse();
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
}
@Override
public Writeable.Reader<CancelTasksResponse> getResponseReader() {
return CancelTasksResponse::new;
}
}

View File

@ -23,6 +23,7 @@ import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.TaskOperationFailure;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
@ -40,7 +41,8 @@ public class CancelTasksResponse extends ListTasksResponse {
private static final ConstructingObjectParser<CancelTasksResponse, Void> PARSER =
setupParser("cancel_tasks_response", CancelTasksResponse::new);
public CancelTasksResponse() {
public CancelTasksResponse(StreamInput in) throws IOException {
super(in);
}
public CancelTasksResponse(List<TaskInfo> tasks, List<TaskOperationFailure> taskFailures, List<? extends ElasticsearchException>

View File

@ -64,7 +64,7 @@ public class TransportCancelTasksAction extends TransportTasksAction<Cancellable
@Inject
public TransportCancelTasksAction(ClusterService clusterService, TransportService transportService, ActionFilters actionFilters) {
super(CancelTasksAction.NAME, clusterService, transportService, actionFilters,
CancelTasksRequest::new, CancelTasksResponse::new, ThreadPool.Names.MANAGEMENT);
CancelTasksRequest::new, CancelTasksResponse::new, TaskInfo::new, ThreadPool.Names.MANAGEMENT);
transportService.registerRequestHandler(BAN_PARENT_ACTION_NAME, ThreadPool.Names.SAME, BanParentTaskRequest::new,
new BanParentRequestHandler());
}
@ -75,11 +75,6 @@ public class TransportCancelTasksAction extends TransportTasksAction<Cancellable
return new CancelTasksResponse(tasks, taskOperationFailures, failedNodeExceptions);
}
@Override
protected TaskInfo readTaskResponse(StreamInput in) throws IOException {
return new TaskInfo(in);
}
protected void processTasks(CancelTasksRequest request, Consumer<CancellableTask> operation) {
if (request.getTaskId().isSet()) {
// we are only checking one task, we can optimize it

View File

@ -20,6 +20,7 @@
package org.elasticsearch.action.admin.cluster.node.tasks.list;
import org.elasticsearch.action.Action;
import org.elasticsearch.common.io.stream.Writeable;
/**
* Action for retrieving a list of currently running tasks
@ -35,6 +36,11 @@ public class ListTasksAction extends Action<ListTasksResponse> {
@Override
public ListTasksResponse newResponse() {
return new ListTasksResponse();
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
}
@Override
public Writeable.Reader<ListTasksResponse> getResponseReader() {
return ListTasksResponse::new;
}
}

View File

@ -52,22 +52,28 @@ import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optiona
public class ListTasksResponse extends BaseTasksResponse implements ToXContentObject {
private static final String TASKS = "tasks";
private List<TaskInfo> tasks;
private final List<TaskInfo> tasks;
private Map<String, List<TaskInfo>> perNodeTasks;
private List<TaskGroup> groups;
public ListTasksResponse() {
this(null, null, null);
}
public ListTasksResponse(List<TaskInfo> tasks, List<TaskOperationFailure> taskFailures,
List<? extends ElasticsearchException> nodeFailures) {
super(taskFailures, nodeFailures);
this.tasks = tasks == null ? Collections.emptyList() : Collections.unmodifiableList(new ArrayList<>(tasks));
}
public ListTasksResponse(StreamInput in) throws IOException {
super(in);
tasks = Collections.unmodifiableList(in.readList(TaskInfo::new));
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeList(tasks);
}
protected static <T> ConstructingObjectParser<T, Void> setupParser(String name,
TriFunction<
@ -96,18 +102,6 @@ public class ListTasksResponse extends BaseTasksResponse implements ToXContentOb
private static final ConstructingObjectParser<ListTasksResponse, Void> PARSER =
setupParser("list_tasks_response", ListTasksResponse::new);
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
tasks = Collections.unmodifiableList(in.readList(TaskInfo::new));
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeList(tasks);
}
/**
* Returns the list of tasks by node
*/

View File

@ -26,14 +26,12 @@ import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.tasks.TransportTasksAction;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskInfo;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import java.io.IOException;
import java.util.List;
import java.util.function.Consumer;
@ -52,7 +50,7 @@ public class TransportListTasksAction extends TransportTasksAction<Task, ListTas
@Inject
public TransportListTasksAction(ClusterService clusterService, TransportService transportService, ActionFilters actionFilters) {
super(ListTasksAction.NAME, clusterService, transportService, actionFilters,
ListTasksRequest::new, ListTasksResponse::new, ThreadPool.Names.MANAGEMENT);
ListTasksRequest::new, ListTasksResponse::new, TaskInfo::new, ThreadPool.Names.MANAGEMENT);
}
@Override
@ -61,11 +59,6 @@ public class TransportListTasksAction extends TransportTasksAction<Task, ListTas
return new ListTasksResponse(tasks, taskOperationFailures, failedNodeExceptions);
}
@Override
protected TaskInfo readTaskResponse(StreamInput in) throws IOException {
return new TaskInfo(in);
}
@Override
protected void taskOperation(ListTasksRequest request, Task task, ActionListener<TaskInfo> listener) {
listener.onResponse(task.taskInfo(clusterService.localNode().getId(), request.getDetailed()));

View File

@ -55,34 +55,8 @@ public class BaseTasksResponse extends ActionResponse {
this.nodeFailures = nodeFailures == null ? Collections.emptyList() : Collections.unmodifiableList(new ArrayList<>(nodeFailures));
}
/**
* The list of task failures exception.
*/
public List<TaskOperationFailure> getTaskFailures() {
return taskFailures;
}
/**
* The list of node failures exception.
*/
public List<ElasticsearchException> getNodeFailures() {
return nodeFailures;
}
/**
* Rethrow task failures if there are any.
*/
public void rethrowFailures(String operationName) {
rethrowAndSuppress(Stream.concat(
getNodeFailures().stream(),
getTaskFailures().stream().map(f -> new ElasticsearchException(
"{} of [{}] failed", f.getCause(), operationName, new TaskId(f.getNodeId(), f.getTaskId()))))
.collect(toList()));
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
public BaseTasksResponse(StreamInput in) throws IOException {
super(in);
int size = in.readVInt();
List<TaskOperationFailure> taskFailures = new ArrayList<>(size);
for (int i = 0; i < size; i++) {
@ -110,6 +84,31 @@ public class BaseTasksResponse extends ActionResponse {
}
}
/**
* The list of task failures exception.
*/
public List<TaskOperationFailure> getTaskFailures() {
return taskFailures;
}
/**
* The list of node failures exception.
*/
public List<ElasticsearchException> getNodeFailures() {
return nodeFailures;
}
/**
* Rethrow task failures if there are any.
*/
public void rethrowFailures(String operationName) {
rethrowAndSuppress(Stream.concat(
getNodeFailures().stream(),
getTaskFailures().stream().map(f -> new ElasticsearchException(
"{} of [{}] failed", f.getCause(), operationName, new TaskId(f.getNodeId(), f.getTaskId()))))
.collect(toList()));
}
protected void toXContentCommon(XContentBuilder builder, ToXContent.Params params) throws IOException {
if (getTaskFailures() != null && getTaskFailures().size() > 0) {
builder.startArray(TASK_FAILURES);

View File

@ -55,7 +55,6 @@ import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReferenceArray;
import java.util.function.Consumer;
import java.util.function.Supplier;
import static java.util.Collections.emptyList;
@ -71,20 +70,23 @@ public abstract class TransportTasksAction<
protected final ClusterService clusterService;
protected final TransportService transportService;
protected final Writeable.Reader<TasksRequest> requestSupplier;
protected final Supplier<TasksResponse> responseSupplier;
protected final Writeable.Reader<TasksRequest> requestReader;
protected final Writeable.Reader<TasksResponse> responsesReader;
protected final Writeable.Reader<TaskResponse> responseReader;
protected final String transportNodeAction;
protected TransportTasksAction(String actionName, ClusterService clusterService, TransportService transportService,
ActionFilters actionFilters, Writeable.Reader<TasksRequest> requestSupplier,
Supplier<TasksResponse> responseSupplier, String nodeExecutor) {
super(actionName, transportService, actionFilters, requestSupplier);
ActionFilters actionFilters, Writeable.Reader<TasksRequest> requestReader,
Writeable.Reader<TasksResponse> responsesReader, Writeable.Reader<TaskResponse> responseReader,
String nodeExecutor) {
super(actionName, transportService, actionFilters, requestReader);
this.clusterService = clusterService;
this.transportService = transportService;
this.transportNodeAction = actionName + "[n]";
this.requestSupplier = requestSupplier;
this.responseSupplier = responseSupplier;
this.requestReader = requestReader;
this.responsesReader = responsesReader;
this.responseReader = responseReader;
transportService.registerRequestHandler(transportNodeAction, nodeExecutor, NodeTaskRequest::new, new NodeTransportHandler());
}
@ -205,8 +207,6 @@ public abstract class TransportTasksAction<
return newResponse(request, tasks, taskOperationFailures, failedNodeExceptions);
}
protected abstract TaskResponse readTaskResponse(StreamInput in) throws IOException;
/**
* Perform the required operation on the task. It is OK start an asynchronous operation or to throw an exception but not both.
*/
@ -364,7 +364,7 @@ public abstract class TransportTasksAction<
protected NodeTaskRequest(StreamInput in) throws IOException {
super(in);
this.tasksRequest = requestSupplier.read(in);
this.tasksRequest = requestReader.read(in);
}
@Override
@ -411,7 +411,7 @@ public abstract class TransportTasksAction<
int resultsSize = in.readVInt();
results = new ArrayList<>(resultsSize);
for (; resultsSize > 0; resultsSize--) {
final TaskResponse result = in.readBoolean() ? readTaskResponse(in) : null;
final TaskResponse result = in.readBoolean() ? responseReader.read(in) : null;
results.add(result);
}
if (in.readBoolean()) {

View File

@ -413,19 +413,14 @@ public class TestTaskPlugin extends Plugin implements ActionPlugin, NetworkPlugi
private List<UnblockTestTaskResponse> tasks;
public UnblockTestTasksResponse() {
super(null, null);
}
public UnblockTestTasksResponse(List<UnblockTestTaskResponse> tasks, List<TaskOperationFailure> taskFailures, List<? extends
FailedNodeException> nodeFailures) {
super(taskFailures, nodeFailures);
this.tasks = tasks == null ? Collections.emptyList() : Collections.unmodifiableList(new ArrayList<>(tasks));
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
public UnblockTestTasksResponse(StreamInput in) throws IOException {
super(in);
int taskCount = in.readVInt();
List<UnblockTestTaskResponse> builder = new ArrayList<>();
for (int i = 0; i < taskCount; i++) {
@ -453,7 +448,7 @@ public class TestTaskPlugin extends Plugin implements ActionPlugin, NetworkPlugi
@Inject
public TransportUnblockTestTasksAction(ClusterService clusterService, TransportService transportService) {
super(UnblockTestTasksAction.NAME, clusterService, transportService, new ActionFilters(new HashSet<>()),
UnblockTestTasksRequest::new, UnblockTestTasksResponse::new, ThreadPool.Names.MANAGEMENT);
UnblockTestTasksRequest::new, UnblockTestTasksResponse::new, UnblockTestTaskResponse::new, ThreadPool.Names.MANAGEMENT);
}
@Override
@ -463,11 +458,6 @@ public class TestTaskPlugin extends Plugin implements ActionPlugin, NetworkPlugi
return new UnblockTestTasksResponse(tasks, taskOperationFailures, failedNodeExceptions);
}
@Override
protected UnblockTestTaskResponse readTaskResponse(StreamInput in) throws IOException {
return new UnblockTestTaskResponse(in);
}
@Override
protected void taskOperation(UnblockTestTasksRequest request, Task task, ActionListener<UnblockTestTaskResponse> listener) {
((TestTask) task).unblock();
@ -487,7 +477,12 @@ public class TestTaskPlugin extends Plugin implements ActionPlugin, NetworkPlugi
@Override
public UnblockTestTasksResponse newResponse() {
return new UnblockTestTasksResponse();
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
}
@Override
public Writeable.Reader<UnblockTestTasksResponse> getResponseReader() {
return UnblockTestTasksResponse::new;
}
}

View File

@ -202,19 +202,14 @@ public class TransportTasksActionTests extends TaskManagerTestCase {
private List<TestTaskResponse> tasks;
TestTasksResponse() {
super(null, null);
}
TestTasksResponse(List<TestTaskResponse> tasks, List<TaskOperationFailure> taskFailures,
List<? extends FailedNodeException> nodeFailures) {
super(taskFailures, nodeFailures);
this.tasks = tasks == null ? Collections.emptyList() : Collections.unmodifiableList(new ArrayList<>(tasks));
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
TestTasksResponse(StreamInput in) throws IOException {
super(in);
int taskCount = in.readVInt();
List<TestTaskResponse> builder = new ArrayList<>();
for (int i = 0; i < taskCount; i++) {
@ -241,8 +236,7 @@ public class TransportTasksActionTests extends TaskManagerTestCase {
protected TestTasksAction(String actionName,
ClusterService clusterService, TransportService transportService) {
super(actionName, clusterService, transportService, new ActionFilters(new HashSet<>()),
TestTasksRequest::new, TestTasksResponse::new,
ThreadPool.Names.MANAGEMENT);
TestTasksRequest::new, TestTasksResponse::new, TestTaskResponse::new, ThreadPool.Names.MANAGEMENT);
}
@Override
@ -251,11 +245,6 @@ public class TransportTasksActionTests extends TaskManagerTestCase {
return new TestTasksResponse(tasks, taskOperationFailures, failedNodeExceptions);
}
@Override
protected TestTaskResponse readTaskResponse(StreamInput in) throws IOException {
return new TestTaskResponse(in);
}
}
private ActionFuture<NodesResponse> startBlockingTestNodesAction(CountDownLatch checkLatch) throws InterruptedException {

View File

@ -295,7 +295,8 @@ public class PersistentTasksNodeServiceTests extends ESTestCase {
// That should trigger cancellation request
assertThat(capturedTaskId.get(), equalTo(localId));
// Notify successful cancellation
capturedListener.get().onResponse(new CancelTasksResponse());
capturedListener.get().onResponse(
new CancelTasksResponse(Collections.emptyList(), Collections.emptyList(), Collections.emptyList()));
// finish or fail task
if (randomBoolean()) {

View File

@ -399,7 +399,12 @@ public class TestPersistentTasksPlugin extends Plugin implements ActionPlugin, P
@Override
public TestTasksResponse newResponse() {
return new TestTasksResponse();
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
}
@Override
public Writeable.Reader<TestTasksResponse> getResponseReader() {
return TestTasksResponse::new;
}
}
@ -484,19 +489,14 @@ public class TestPersistentTasksPlugin extends Plugin implements ActionPlugin, P
private List<TestTaskResponse> tasks;
public TestTasksResponse() {
super(null, null);
}
public TestTasksResponse(List<TestTaskResponse> tasks, List<TaskOperationFailure> taskFailures,
List<? extends FailedNodeException> nodeFailures) {
super(taskFailures, nodeFailures);
this.tasks = tasks == null ? Collections.emptyList() : Collections.unmodifiableList(new ArrayList<>(tasks));
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
public TestTasksResponse(StreamInput in) throws IOException {
super(in);
tasks = in.readList(TestTaskResponse::new);
}
@ -517,7 +517,7 @@ public class TestPersistentTasksPlugin extends Plugin implements ActionPlugin, P
@Inject
public TransportTestTaskAction(ClusterService clusterService, TransportService transportService, ActionFilters actionFilters) {
super(TestTaskAction.NAME, clusterService, transportService, actionFilters,
TestTasksRequest::new, TestTasksResponse::new, ThreadPool.Names.MANAGEMENT);
TestTasksRequest::new, TestTasksResponse::new, TestTaskResponse::new, ThreadPool.Names.MANAGEMENT);
}
@Override
@ -527,11 +527,6 @@ public class TestPersistentTasksPlugin extends Plugin implements ActionPlugin, P
return new TestTasksResponse(tasks, taskOperationFailures, failedNodeExceptions);
}
@Override
protected TestTaskResponse readTaskResponse(StreamInput in) throws IOException {
return new TestTaskResponse(in);
}
@Override
protected void taskOperation(TestTasksRequest request, TestTask task, ActionListener<TestTaskResponse> listener) {
task.setOperation(request.operation);

View File

@ -46,7 +46,7 @@ public class ListTasksResponseTests extends AbstractXContentTestCase<ListTasksRe
public void testEmptyToString() {
assertEquals("{\n" +
" \"tasks\" : [ ]\n" +
"}", new ListTasksResponse().toString());
"}", new ListTasksResponse(null, null, null).toString());
}
public void testNonEmptyToString() {

View File

@ -14,7 +14,6 @@ import org.elasticsearch.action.support.tasks.TransportTasksAction;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.license.LicenseUtils;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.tasks.Task;
@ -23,7 +22,6 @@ import org.elasticsearch.xpack.ccr.Ccr;
import org.elasticsearch.xpack.ccr.CcrLicenseChecker;
import org.elasticsearch.xpack.core.ccr.action.FollowStatsAction;
import java.io.IOException;
import java.util.List;
import java.util.Objects;
import java.util.Set;
@ -50,6 +48,7 @@ public class TransportFollowStatsAction extends TransportTasksAction<
actionFilters,
FollowStatsAction.StatsRequest::new,
FollowStatsAction.StatsResponses::new,
FollowStatsAction.StatsResponse::new,
Ccr.CCR_THREAD_POOL_NAME);
this.ccrLicenseChecker = Objects.requireNonNull(ccrLicenseChecker);
}
@ -75,11 +74,6 @@ public class TransportFollowStatsAction extends TransportTasksAction<
return new FollowStatsAction.StatsResponses(taskOperationFailures, failedNodeExceptions, statsRespons);
}
@Override
protected FollowStatsAction.StatsResponse readTaskResponse(final StreamInput in) throws IOException {
return new FollowStatsAction.StatsResponse(in);
}
@Override
protected void processTasks(final FollowStatsAction.StatsRequest request, final Consumer<ShardFollowNodeTask> operation) {
final ClusterState state = clusterService.state();

View File

@ -6,7 +6,8 @@
package org.elasticsearch.xpack.ccr.action;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.test.AbstractStreamableTestCase;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.test.AbstractWireSerializingTestCase;
import org.elasticsearch.xpack.core.ccr.ShardFollowNodeTaskStatus;
import org.elasticsearch.xpack.core.ccr.action.FollowStatsAction;
@ -14,11 +15,11 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
public class StatsResponsesTests extends AbstractStreamableTestCase<FollowStatsAction.StatsResponses> {
public class StatsResponsesTests extends AbstractWireSerializingTestCase<FollowStatsAction.StatsResponses> {
@Override
protected FollowStatsAction.StatsResponses createBlankInstance() {
return new FollowStatsAction.StatsResponses();
protected Writeable.Reader<FollowStatsAction.StatsResponses> instanceReader() {
return FollowStatsAction.StatsResponses::new;
}
@Override

View File

@ -72,8 +72,7 @@ public class CcrStatsAction extends Action<CcrStatsAction.Response> {
public Response(StreamInput in) throws IOException {
super(in);
autoFollowStats = new AutoFollowStats(in);
followStats = new FollowStatsAction.StatsResponses();
followStats.readFrom(in);
followStats = new FollowStatsAction.StatsResponses(in);
}
public AutoFollowStats getAutoFollowStats() {

View File

@ -24,7 +24,6 @@ import org.elasticsearch.xpack.core.ccr.ShardFollowNodeTaskStatus;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@ -42,21 +41,22 @@ public class FollowStatsAction extends Action<FollowStatsAction.StatsResponses>
@Override
public StatsResponses newResponse() {
return new StatsResponses();
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
}
@Override
public Writeable.Reader<StatsResponses> getResponseReader() {
return StatsResponses::new;
}
public static class StatsResponses extends BaseTasksResponse implements ToXContentObject {
private List<StatsResponse> statsResponse;
private final List<StatsResponse> statsResponse;
public List<StatsResponse> getStatsResponses() {
return statsResponse;
}
public StatsResponses() {
this(Collections.emptyList(), Collections.emptyList(), Collections.emptyList());
}
public StatsResponses(
final List<TaskOperationFailure> taskFailures,
final List<? extends FailedNodeException> nodeFailures,
@ -65,6 +65,17 @@ public class FollowStatsAction extends Action<FollowStatsAction.StatsResponses>
this.statsResponse = statsResponse;
}
public StatsResponses(StreamInput in) throws IOException {
super(in);
statsResponse = in.readList(StatsResponse::new);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeList(statsResponse);
}
@Override
public XContentBuilder toXContent(final XContentBuilder builder, final Params params) throws IOException {
// sort by index name, then shard ID
@ -99,18 +110,6 @@ public class FollowStatsAction extends Action<FollowStatsAction.StatsResponses>
return builder;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
statsResponse = in.readList(StatsResponse::new);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeList(statsResponse);
}
@Override
public boolean equals(Object o) {
if (this == o) return true;

View File

@ -38,7 +38,12 @@ public class CloseJobAction extends Action<CloseJobAction.Response> {
@Override
public Response newResponse() {
return new Response();
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
}
@Override
public Writeable.Reader<Response> getResponseReader() {
return Response::new;
}
public static class Request extends BaseTasksRequest<Request> implements ToXContentObject {
@ -207,30 +212,15 @@ public class CloseJobAction extends Action<CloseJobAction.Response> {
public static class Response extends BaseTasksResponse implements Writeable, ToXContentObject {
private boolean closed;
public Response() {
super(null, null);
}
public Response(StreamInput in) throws IOException {
super(null, null);
readFrom(in);
}
private final boolean closed;
public Response(boolean closed) {
super(null, null);
this.closed = closed;
}
public boolean isClosed() {
return closed;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
public Response(StreamInput in) throws IOException {
super(in);
closed = in.readBoolean();
}
@ -240,6 +230,10 @@ public class CloseJobAction extends Action<CloseJobAction.Response> {
out.writeBoolean(closed);
}
public boolean isClosed() {
return closed;
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();

View File

@ -36,7 +36,12 @@ public class FlushJobAction extends Action<FlushJobAction.Response> {
@Override
public Response newResponse() {
return new Response();
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
}
@Override
public Writeable.Reader<Response> getResponseReader() {
return Response::new;
}
public static class Request extends JobTaskRequest<Request> implements ToXContentObject {
@ -194,27 +199,14 @@ public class FlushJobAction extends Action<FlushJobAction.Response> {
private boolean flushed;
private Date lastFinalizedBucketEnd;
public Response() {
super(null, null);
}
public Response(boolean flushed, @Nullable Date lastFinalizedBucketEnd) {
super(null, null);
this.flushed = flushed;
this.lastFinalizedBucketEnd = lastFinalizedBucketEnd;
}
public boolean isFlushed() {
return flushed;
}
public Date getLastFinalizedBucketEnd() {
return lastFinalizedBucketEnd;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
public Response(StreamInput in) throws IOException {
super(in);
flushed = in.readBoolean();
lastFinalizedBucketEnd = new Date(in.readVLong());
}
@ -226,6 +218,14 @@ public class FlushJobAction extends Action<FlushJobAction.Response> {
out.writeVLong(lastFinalizedBucketEnd.getTime());
}
public boolean isFlushed() {
return flushed;
}
public Date getLastFinalizedBucketEnd() {
return lastFinalizedBucketEnd;
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();

View File

@ -35,7 +35,12 @@ public class ForecastJobAction extends Action<ForecastJobAction.Response> {
@Override
public Response newResponse() {
return new Response();
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
}
@Override
public Writeable.Reader<Response> getResponseReader() {
return Response::new;
}
public static class Request extends JobTaskRequest<Request> implements ToXContentObject {
@ -167,27 +172,14 @@ public class ForecastJobAction extends Action<ForecastJobAction.Response> {
private boolean acknowledged;
private String forecastId;
public Response() {
super(null, null);
}
public Response(boolean acknowledged, String forecastId) {
super(null, null);
this.acknowledged = acknowledged;
this.forecastId = forecastId;
}
public boolean isAcknowledged() {
return acknowledged;
}
public String getForecastId() {
return forecastId;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
public Response(StreamInput in) throws IOException {
super(in);
acknowledged = in.readBoolean();
forecastId = in.readString();
}
@ -199,6 +191,14 @@ public class ForecastJobAction extends Action<ForecastJobAction.Response> {
out.writeString(forecastId);
}
public boolean isAcknowledged() {
return acknowledged;
}
public String getForecastId() {
return forecastId;
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();

View File

@ -56,7 +56,12 @@ public class GetJobsStatsAction extends Action<GetJobsStatsAction.Response> {
@Override
public Response newResponse() {
return new Response();
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
}
@Override
public Writeable.Reader<Response> getResponseReader() {
return Response::new;
}
public static class Request extends BaseTasksRequest<Request> {
@ -317,17 +322,8 @@ public class GetJobsStatsAction extends Action<GetJobsStatsAction.Response> {
this.jobsStats = jobsStats;
}
public Response() {
super(Collections.emptyList(), Collections.emptyList());
}
public QueryPage<JobStats> getResponse() {
return jobsStats;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
public Response(StreamInput in) throws IOException {
super(in);
jobsStats = new QueryPage<>(in, JobStats::new);
}
@ -337,6 +333,10 @@ public class GetJobsStatsAction extends Action<GetJobsStatsAction.Response> {
jobsStats.writeTo(out);
}
public QueryPage<JobStats> getResponse() {
return jobsStats;
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();

View File

@ -46,7 +46,12 @@ public class IsolateDatafeedAction extends Action<IsolateDatafeedAction.Response
@Override
public Response newResponse() {
return new Response();
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
}
@Override
public Writeable.Reader<Response> getResponseReader() {
return Response::new;
}
public static class Request extends BaseTasksRequest<Request> implements ToXContentObject {
@ -132,7 +137,7 @@ public class IsolateDatafeedAction extends Action<IsolateDatafeedAction.Response
public static class Response extends BaseTasksResponse implements Writeable {
private boolean isolated;
private final boolean isolated;
public Response(boolean isolated) {
super(null, null);
@ -140,17 +145,7 @@ public class IsolateDatafeedAction extends Action<IsolateDatafeedAction.Response
}
public Response(StreamInput in) throws IOException {
super(null, null);
readFrom(in);
}
public Response() {
super(null, null);
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
super(in);
isolated = in.readBoolean();
}

View File

@ -27,7 +27,12 @@ public class KillProcessAction extends Action<KillProcessAction.Response> {
@Override
public Response newResponse() {
return new Response();
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
}
@Override
public Writeable.Reader<Response> getResponseReader() {
return Response::new;
}
static class RequestBuilder extends ActionRequestBuilder<Request, Response> {
@ -54,15 +59,11 @@ public class KillProcessAction extends Action<KillProcessAction.Response> {
public static class Response extends BaseTasksResponse implements Writeable {
private boolean killed;
public Response() {
super(null, null);
}
private final boolean killed;
public Response(StreamInput in) throws IOException {
super(null, null);
readFrom(in);
super(in);
killed = in.readBoolean();
}
public Response(boolean killed) {
@ -74,12 +75,6 @@ public class KillProcessAction extends Action<KillProcessAction.Response> {
return killed;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
killed = in.readBoolean();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);

View File

@ -27,7 +27,12 @@ public class PersistJobAction extends Action<PersistJobAction.Response> {
@Override
public Response newResponse() {
return new Response();
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
}
@Override
public Writeable.Reader<Response> getResponseReader() {
return Response::new;
}
public static class Request extends JobTaskRequest<PersistJobAction.Request> {
@ -80,24 +85,15 @@ public class PersistJobAction extends Action<PersistJobAction.Response> {
public static class Response extends BaseTasksResponse implements Writeable {
boolean persisted;
public Response() {
super(null, null);
}
private final boolean persisted;
public Response(boolean persisted) {
super(null, null);
this.persisted = persisted;
}
public boolean isPersisted() {
return persisted;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
public Response(StreamInput in) throws IOException {
super(in);
persisted = in.readBoolean();
}
@ -107,6 +103,10 @@ public class PersistJobAction extends Action<PersistJobAction.Response> {
out.writeBoolean(persisted);
}
public boolean isPersisted() {
return persisted;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;

View File

@ -35,7 +35,12 @@ public class PostDataAction extends Action<PostDataAction.Response> {
@Override
public Response newResponse() {
return new Response();
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
}
@Override
public Writeable.Reader<Response> getResponseReader() {
return Response::new;
}
static class RequestBuilder extends ActionRequestBuilder<Request, Response> {
@ -47,29 +52,20 @@ public class PostDataAction extends Action<PostDataAction.Response> {
public static class Response extends BaseTasksResponse implements StatusToXContentObject, Writeable {
private DataCounts dataCounts;
private final DataCounts dataCounts;
public Response(String jobId) {
super(null, null);
dataCounts = new DataCounts(jobId);
}
public Response() {
super(null, null);
}
public Response(DataCounts counts) {
super(null, null);
this.dataCounts = counts;
}
public DataCounts getDataCounts() {
return dataCounts;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
public Response(StreamInput in) throws IOException {
super(in);
dataCounts = new DataCounts(in);
}
@ -79,6 +75,10 @@ public class PostDataAction extends Action<PostDataAction.Response> {
dataCounts.writeTo(out);
}
public DataCounts getDataCounts() {
return dataCounts;
}
@Override
public RestStatus status() {
return RestStatus.ACCEPTED;

View File

@ -41,7 +41,12 @@ public class StopDatafeedAction extends Action<StopDatafeedAction.Response> {
@Override
public Response newResponse() {
return new Response();
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
}
@Override
public Writeable.Reader<Response> getResponseReader() {
return Response::new;
}
public static class Request extends BaseTasksRequest<Request> implements ToXContentObject {
@ -194,7 +199,7 @@ public class StopDatafeedAction extends Action<StopDatafeedAction.Response> {
public static class Response extends BaseTasksResponse implements Writeable {
private boolean stopped;
private final boolean stopped;
public Response(boolean stopped) {
super(null, null);
@ -202,21 +207,7 @@ public class StopDatafeedAction extends Action<StopDatafeedAction.Response> {
}
public Response(StreamInput in) throws IOException {
super(null, null);
readFrom(in);
}
public Response() {
super(null, null);
}
public boolean isStopped() {
return stopped;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
super(in);
stopped = in.readBoolean();
}
@ -225,6 +216,11 @@ public class StopDatafeedAction extends Action<StopDatafeedAction.Response> {
super.writeTo(out);
out.writeBoolean(stopped);
}
public boolean isStopped() {
return stopped;
}
}
static class RequestBuilder extends ActionRequestBuilder<Request, Response> {

View File

@ -35,7 +35,12 @@ public class UpdateProcessAction extends Action<UpdateProcessAction.Response> {
@Override
public Response newResponse() {
return new Response();
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
}
@Override
public Writeable.Reader<Response> getResponseReader() {
return Response::new;
}
static class RequestBuilder extends ActionRequestBuilder<Request, Response> {
@ -47,16 +52,15 @@ public class UpdateProcessAction extends Action<UpdateProcessAction.Response> {
public static class Response extends BaseTasksResponse implements StatusToXContentObject, Writeable {
private boolean isUpdated;
private final boolean isUpdated;
public Response() {
super(null, null);
this.isUpdated = true;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
public Response(StreamInput in) throws IOException {
super(in);
isUpdated = in.readBoolean();
}

View File

@ -40,7 +40,12 @@ public class DeleteRollupJobAction extends Action<DeleteRollupJobAction.Response
@Override
public Response newResponse() {
return new Response();
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
}
@Override
public Writeable.Reader<Response> getResponseReader() {
return Response::new;
}
public static class Request extends BaseTasksRequest<Request> implements ToXContentFragment {
@ -109,12 +114,7 @@ public class DeleteRollupJobAction extends Action<DeleteRollupJobAction.Response
public static class Response extends BaseTasksResponse implements Writeable, ToXContentObject {
private boolean acknowledged;
public Response(StreamInput in) throws IOException {
super(Collections.emptyList(), Collections.emptyList());
readFrom(in);
}
private final boolean acknowledged;
public Response(boolean acknowledged, List<TaskOperationFailure> taskFailures, List<FailedNodeException> nodeFailures) {
super(taskFailures, nodeFailures);
@ -126,18 +126,8 @@ public class DeleteRollupJobAction extends Action<DeleteRollupJobAction.Response
this.acknowledged = acknowledged;
}
public Response() {
super(Collections.emptyList(), Collections.emptyList());
this.acknowledged = false;
}
public boolean isDeleted() {
return acknowledged;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
public Response(StreamInput in) throws IOException {
super(in);
acknowledged = in.readBoolean();
}
@ -147,6 +137,10 @@ public class DeleteRollupJobAction extends Action<DeleteRollupJobAction.Response
out.writeBoolean(acknowledged);
}
public boolean isDeleted() {
return acknowledged;
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();

View File

@ -50,7 +50,12 @@ public class GetRollupJobsAction extends Action<GetRollupJobsAction.Response> {
@Override
public Response newResponse() {
return new Response();
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
}
@Override
public Writeable.Reader<Response> getResponseReader() {
return Response::new;
}
public static class Request extends BaseTasksRequest<Request> implements ToXContent {
@ -133,7 +138,7 @@ public class GetRollupJobsAction extends Action<GetRollupJobsAction.Response> {
public static class Response extends BaseTasksResponse implements Writeable, ToXContentObject {
private List<JobWrapper> jobs;
private final List<JobWrapper> jobs;
public Response(List<JobWrapper> jobs) {
super(Collections.emptyList(), Collections.emptyList());
@ -145,22 +150,8 @@ public class GetRollupJobsAction extends Action<GetRollupJobsAction.Response> {
this.jobs = jobs;
}
public Response() {
super(Collections.emptyList(), Collections.emptyList());
}
public Response(StreamInput in) throws IOException {
super(Collections.emptyList(), Collections.emptyList());
readFrom(in);
}
public List<JobWrapper> getJobs() {
return jobs;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
super(in);
jobs = in.readList(JobWrapper::new);
}
@ -170,6 +161,10 @@ public class GetRollupJobsAction extends Action<GetRollupJobsAction.Response> {
out.writeList(jobs);
}
public List<JobWrapper> getJobs() {
return jobs;
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();

View File

@ -36,7 +36,12 @@ public class StartRollupJobAction extends Action<StartRollupJobAction.Response>
@Override
public Response newResponse() {
return new Response();
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
}
@Override
public Writeable.Reader<Response> getResponseReader() {
return Response::new;
}
public static class Request extends BaseTasksRequest<Request> implements ToXContent {
@ -101,30 +106,15 @@ public class StartRollupJobAction extends Action<StartRollupJobAction.Response>
public static class Response extends BaseTasksResponse implements Writeable, ToXContentObject {
private boolean started;
public Response() {
super(Collections.emptyList(), Collections.emptyList());
}
public Response(StreamInput in) throws IOException {
super(Collections.emptyList(), Collections.emptyList());
readFrom(in);
}
private final boolean started;
public Response(boolean started) {
super(Collections.emptyList(), Collections.emptyList());
this.started = started;
}
public boolean isStarted() {
return started;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
public Response(StreamInput in) throws IOException {
super(in);
started = in.readBoolean();
}
@ -134,6 +124,10 @@ public class StartRollupJobAction extends Action<StartRollupJobAction.Response>
out.writeBoolean(started);
}
public boolean isStarted() {
return started;
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();

View File

@ -43,7 +43,12 @@ public class StopRollupJobAction extends Action<StopRollupJobAction.Response> {
@Override
public Response newResponse() {
return new Response();
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
}
@Override
public Writeable.Reader<Response> getResponseReader() {
return Response::new;
}
public static class Request extends BaseTasksRequest<Request> implements ToXContent {
@ -138,30 +143,15 @@ public class StopRollupJobAction extends Action<StopRollupJobAction.Response> {
public static class Response extends BaseTasksResponse implements Writeable, ToXContentObject {
private boolean stopped;
public Response() {
super(Collections.emptyList(), Collections.emptyList());
}
public Response(StreamInput in) throws IOException {
super(Collections.emptyList(), Collections.emptyList());
readFrom(in);
}
private final boolean stopped;
public Response(boolean stopped) {
super(Collections.emptyList(), Collections.emptyList());
this.stopped = stopped;
}
public boolean isStopped() {
return stopped;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
public Response(StreamInput in) throws IOException {
super(in);
stopped = in.readBoolean();
}
@ -171,6 +161,10 @@ public class StopRollupJobAction extends Action<StopRollupJobAction.Response> {
out.writeBoolean(stopped);
}
public boolean isStopped() {
return stopped;
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();

View File

@ -5,10 +5,11 @@
*/
package org.elasticsearch.xpack.core.ml.action;
import org.elasticsearch.test.AbstractStreamableTestCase;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.test.AbstractWireSerializingTestCase;
import org.elasticsearch.xpack.core.ml.action.CloseJobAction.Response;
public class CloseJobActionResponseTests extends AbstractStreamableTestCase<Response> {
public class CloseJobActionResponseTests extends AbstractWireSerializingTestCase<Response> {
@Override
protected Response createTestInstance() {
@ -16,7 +17,7 @@ public class CloseJobActionResponseTests extends AbstractStreamableTestCase<Resp
}
@Override
protected Response createBlankInstance() {
return new Response();
protected Writeable.Reader<Response> instanceReader() {
return Response::new;
}
}
}

View File

@ -5,10 +5,11 @@
*/
package org.elasticsearch.xpack.core.ml.action;
import org.elasticsearch.test.AbstractStreamableTestCase;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.test.AbstractWireSerializingTestCase;
import org.elasticsearch.xpack.core.ml.action.ForecastJobAction.Response;
public class ForecastJobActionResponseTests extends AbstractStreamableTestCase<Response> {
public class ForecastJobActionResponseTests extends AbstractWireSerializingTestCase<Response> {
@Override
protected Response createTestInstance() {
@ -16,8 +17,7 @@ public class ForecastJobActionResponseTests extends AbstractStreamableTestCase<R
}
@Override
protected Response createBlankInstance() {
return new Response();
protected Writeable.Reader<Response> instanceReader() {
return Response::new;
}
}

View File

@ -7,9 +7,10 @@ package org.elasticsearch.xpack.core.ml.action;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.test.AbstractStreamableTestCase;
import org.elasticsearch.test.AbstractWireSerializingTestCase;
import org.elasticsearch.xpack.core.ml.action.GetJobsStatsAction.Response;
import org.elasticsearch.xpack.core.ml.action.util.QueryPage;
import org.elasticsearch.xpack.core.ml.job.config.Job;
@ -27,7 +28,7 @@ import java.util.List;
import static org.elasticsearch.common.unit.TimeValue.parseTimeValue;
public class GetJobStatsActionResponseTests extends AbstractStreamableTestCase<Response> {
public class GetJobStatsActionResponseTests extends AbstractWireSerializingTestCase<Response> {
@Override
protected Response createTestInstance() {
@ -75,8 +76,7 @@ public class GetJobStatsActionResponseTests extends AbstractStreamableTestCase<R
}
@Override
protected Response createBlankInstance() {
return new Response();
protected Writeable.Reader<Response> instanceReader() {
return Response::new;
}
}

View File

@ -5,12 +5,13 @@
*/
package org.elasticsearch.xpack.core.ml.action;
import org.elasticsearch.test.AbstractStreamableTestCase;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.test.AbstractWireSerializingTestCase;
public class PersistJobActionResponseTests extends AbstractStreamableTestCase<PersistJobAction.Response> {
public class PersistJobActionResponseTests extends AbstractWireSerializingTestCase<PersistJobAction.Response> {
@Override
protected PersistJobAction.Response createBlankInstance() {
return new PersistJobAction.Response();
protected Writeable.Reader<PersistJobAction.Response> instanceReader() {
return PersistJobAction.Response::new;
}
@Override

View File

@ -5,11 +5,12 @@
*/
package org.elasticsearch.xpack.core.ml.action;
import org.elasticsearch.test.AbstractStreamableTestCase;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.test.AbstractWireSerializingTestCase;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCountsTests;
public class PostDataActionResponseTests extends AbstractStreamableTestCase<PostDataAction.Response> {
public class PostDataActionResponseTests extends AbstractWireSerializingTestCase<PostDataAction.Response> {
@Override
protected PostDataAction.Response createTestInstance() {
@ -18,7 +19,7 @@ public class PostDataActionResponseTests extends AbstractStreamableTestCase<Post
}
@Override
protected PostDataAction.Response createBlankInstance() {
return new PostDataAction.Response("foo") ;
protected Writeable.Reader<PostDataAction.Response> instanceReader() {
return PostDataAction.Response::new;
}
}

View File

@ -5,11 +5,12 @@
*/
package org.elasticsearch.xpack.core.ml.action;
import org.elasticsearch.test.AbstractStreamableTestCase;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.test.AbstractWireSerializingTestCase;
import org.elasticsearch.xpack.core.ml.action.FlushJobAction.Response;
import org.joda.time.DateTime;
public class PostDataFlushResponseTests extends AbstractStreamableTestCase<Response> {
public class PostDataFlushResponseTests extends AbstractWireSerializingTestCase<Response> {
@Override
protected Response createTestInstance() {
@ -17,7 +18,7 @@ public class PostDataFlushResponseTests extends AbstractStreamableTestCase<Respo
}
@Override
protected Response createBlankInstance() {
return new Response();
protected Writeable.Reader<Response> instanceReader() {
return Response::new;
}
}
}

View File

@ -18,7 +18,6 @@ import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.discovery.MasterNotDiscoveredException;
@ -41,7 +40,6 @@ import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.notifications.Auditor;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
@ -69,7 +67,7 @@ public class TransportCloseJobAction extends TransportTasksAction<TransportOpenJ
PersistentTasksService persistentTasksService) {
// We fork in innerTaskOperation(...), so we can use ThreadPool.Names.SAME here:
super(CloseJobAction.NAME, clusterService, transportService, actionFilters,
CloseJobAction.Request::new, CloseJobAction.Response::new, ThreadPool.Names.SAME);
CloseJobAction.Request::new, CloseJobAction.Response::new, CloseJobAction.Response::new, ThreadPool.Names.SAME);
this.threadPool = threadPool;
this.client = client;
this.clusterService = clusterService;
@ -298,11 +296,6 @@ public class TransportCloseJobAction extends TransportTasksAction<TransportOpenJ
return new CloseJobAction.Response(tasks.stream().allMatch(CloseJobAction.Response::isClosed));
}
@Override
protected CloseJobAction.Response readTaskResponse(StreamInput in) throws IOException {
return new CloseJobAction.Response(in);
}
private void forceCloseJob(ClusterState currentState, CloseJobAction.Request request, List<String> jobIdsToForceClose,
ActionListener<CloseJobAction.Response> listener) {
PersistentTasksCustomMetaData tasks = currentState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);

View File

@ -9,7 +9,6 @@ import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.ml.action.FlushJobAction;
@ -17,8 +16,6 @@ import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManage
import org.elasticsearch.xpack.ml.job.process.autodetect.params.FlushJobParams;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.TimeRange;
import java.io.IOException;
public class TransportFlushJobAction extends TransportJobTaskAction<FlushJobAction.Request, FlushJobAction.Response> {
@Inject
@ -29,13 +26,6 @@ public class TransportFlushJobAction extends TransportJobTaskAction<FlushJobActi
// ThreadPool.Names.SAME, because operations is executed by autodetect worker thread
}
@Override
protected FlushJobAction.Response readTaskResponse(StreamInput in) throws IOException {
FlushJobAction.Response response = new FlushJobAction.Response();
response.readFrom(in);
return response;
}
@Override
protected void taskOperation(FlushJobAction.Request request, TransportOpenJobAction.JobTask task,
ActionListener<FlushJobAction.Response> listener) {

View File

@ -12,7 +12,6 @@ import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
@ -27,7 +26,6 @@ import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider;
import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.ForecastParams;
import java.io.IOException;
import java.nio.file.Path;
import java.util.List;
import java.util.function.Consumer;
@ -50,13 +48,6 @@ public class TransportForecastJobAction extends TransportJobTaskAction<ForecastJ
// ThreadPool.Names.SAME, because operations is executed by autodetect worker thread
}
@Override
protected ForecastJobAction.Response readTaskResponse(StreamInput in) throws IOException {
ForecastJobAction.Response response = new ForecastJobAction.Response();
response.readFrom(in);
return response;
}
@Override
protected void taskOperation(ForecastJobAction.Request request, TransportOpenJobAction.JobTask task,
ActionListener<ForecastJobAction.Response> listener) {

View File

@ -15,7 +15,6 @@ import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
@ -25,6 +24,7 @@ import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.ml.MlMetadata;
import org.elasticsearch.xpack.core.ml.MlTasks;
import org.elasticsearch.xpack.core.ml.action.GetJobsStatsAction;
import org.elasticsearch.xpack.core.ml.action.GetJobsStatsAction.Response.JobStats;
import org.elasticsearch.xpack.core.ml.action.util.QueryPage;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.job.config.JobState;
@ -34,7 +34,6 @@ import org.elasticsearch.xpack.core.ml.stats.ForecastStats;
import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider;
import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
@ -47,7 +46,7 @@ import java.util.function.Consumer;
import java.util.stream.Collectors;
public class TransportGetJobsStatsAction extends TransportTasksAction<TransportOpenJobAction.JobTask, GetJobsStatsAction.Request,
GetJobsStatsAction.Response, QueryPage<GetJobsStatsAction.Response.JobStats>> {
GetJobsStatsAction.Response, QueryPage<JobStats>> {
private final ClusterService clusterService;
private final AutodetectProcessManager processManager;
@ -57,7 +56,7 @@ public class TransportGetJobsStatsAction extends TransportTasksAction<TransportO
public TransportGetJobsStatsAction(TransportService transportService, ActionFilters actionFilters, ClusterService clusterService,
AutodetectProcessManager processManager, JobResultsProvider jobResultsProvider) {
super(GetJobsStatsAction.NAME, clusterService, transportService, actionFilters, GetJobsStatsAction.Request::new,
GetJobsStatsAction.Response::new, ThreadPool.Names.MANAGEMENT);
GetJobsStatsAction.Response::new, in -> new QueryPage<>(in, JobStats::new), ThreadPool.Names.MANAGEMENT);
this.clusterService = clusterService;
this.processManager = processManager;
this.jobResultsProvider = jobResultsProvider;
@ -75,25 +74,20 @@ public class TransportGetJobsStatsAction extends TransportTasksAction<TransportO
@Override
protected GetJobsStatsAction.Response newResponse(GetJobsStatsAction.Request request,
List<QueryPage<GetJobsStatsAction.Response.JobStats>> tasks,
List<QueryPage<JobStats>> tasks,
List<TaskOperationFailure> taskOperationFailures,
List<FailedNodeException> failedNodeExceptions) {
List<GetJobsStatsAction.Response.JobStats> stats = new ArrayList<>();
for (QueryPage<GetJobsStatsAction.Response.JobStats> task : tasks) {
List<JobStats> stats = new ArrayList<>();
for (QueryPage<JobStats> task : tasks) {
stats.addAll(task.results());
}
return new GetJobsStatsAction.Response(taskOperationFailures, failedNodeExceptions, new QueryPage<>(stats, stats.size(),
Job.RESULTS_FIELD));
}
@Override
protected QueryPage<GetJobsStatsAction.Response.JobStats> readTaskResponse(StreamInput in) throws IOException {
return new QueryPage<>(in, GetJobsStatsAction.Response.JobStats::new);
}
@Override
protected void taskOperation(GetJobsStatsAction.Request request, TransportOpenJobAction.JobTask task,
ActionListener<QueryPage<GetJobsStatsAction.Response.JobStats>> listener) {
ActionListener<QueryPage<JobStats>> listener) {
String jobId = task.getJobId();
logger.debug("Get stats for job [{}]", jobId);
ClusterState state = clusterService.state();
@ -106,7 +100,7 @@ public class TransportGetJobsStatsAction extends TransportTasksAction<TransportO
String assignmentExplanation = pTask.getAssignment().getExplanation();
TimeValue openTime = durationToTimeValue(processManager.jobOpenTime(task));
gatherForecastStats(jobId, forecastStats -> {
GetJobsStatsAction.Response.JobStats jobStats = new GetJobsStatsAction.Response.JobStats(jobId, stats.get().v1(),
JobStats jobStats = new JobStats(jobId, stats.get().v1(),
stats.get().v2(), forecastStats, jobState, node, assignmentExplanation, openTime);
listener.onResponse(new QueryPage<>(Collections.singletonList(jobStats), 1, Job.RESULTS_FIELD));
}, listener::onFailure);
@ -128,7 +122,7 @@ public class TransportGetJobsStatsAction extends TransportTasksAction<TransportO
}
AtomicInteger counter = new AtomicInteger(jobIds.size());
AtomicArray<GetJobsStatsAction.Response.JobStats> jobStats = new AtomicArray<>(jobIds.size());
AtomicArray<JobStats> jobStats = new AtomicArray<>(jobIds.size());
PersistentTasksCustomMetaData tasks = clusterService.state().getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
for (int i = 0; i < jobIds.size(); i++) {
int slot = i;
@ -141,10 +135,10 @@ public class TransportGetJobsStatsAction extends TransportTasksAction<TransportO
if (pTask != null) {
assignmentExplanation = pTask.getAssignment().getExplanation();
}
jobStats.set(slot, new GetJobsStatsAction.Response.JobStats(jobId, dataCounts, modelSizeStats, forecastStats, jobState,
jobStats.set(slot, new JobStats(jobId, dataCounts, modelSizeStats, forecastStats, jobState,
null, assignmentExplanation, null));
if (counter.decrementAndGet() == 0) {
List<GetJobsStatsAction.Response.JobStats> results = response.getResponse().results();
List<JobStats> results = response.getResponse().results();
results.addAll(jobStats.asList());
listener.onResponse(new GetJobsStatsAction.Response(response.getTaskFailures(), response.getNodeFailures(),
new QueryPage<>(results, results.size(), Job.RESULTS_FIELD)));
@ -177,8 +171,8 @@ public class TransportGetJobsStatsAction extends TransportTasksAction<TransportO
static List<String> determineNonDeletedJobIdsWithoutLiveStats(MlMetadata mlMetadata,
List<String> requestedJobIds,
List<GetJobsStatsAction.Response.JobStats> stats) {
Set<String> excludeJobIds = stats.stream().map(GetJobsStatsAction.Response.JobStats::getJobId).collect(Collectors.toSet());
List<JobStats> stats) {
Set<String> excludeJobIds = stats.stream().map(JobStats::getJobId).collect(Collectors.toSet());
return requestedJobIds.stream().filter(jobId -> !excludeJobIds.contains(jobId) &&
!mlMetadata.isJobDeleting(jobId)).collect(Collectors.toList());
}

View File

@ -14,7 +14,6 @@ import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.transport.TransportService;
@ -22,7 +21,6 @@ import org.elasticsearch.xpack.core.ml.MlTasks;
import org.elasticsearch.xpack.core.ml.action.IsolateDatafeedAction;
import org.elasticsearch.xpack.ml.MachineLearning;
import java.io.IOException;
import java.util.List;
public class TransportIsolateDatafeedAction extends TransportTasksAction<TransportStartDatafeedAction.DatafeedTask,
@ -31,7 +29,7 @@ public class TransportIsolateDatafeedAction extends TransportTasksAction<Transpo
@Inject
public TransportIsolateDatafeedAction(TransportService transportService, ActionFilters actionFilters, ClusterService clusterService) {
super(IsolateDatafeedAction.NAME, clusterService, transportService, actionFilters, IsolateDatafeedAction.Request::new,
IsolateDatafeedAction.Response::new, MachineLearning.UTILITY_THREAD_POOL_NAME);
IsolateDatafeedAction.Response::new, IsolateDatafeedAction.Response::new, MachineLearning.UTILITY_THREAD_POOL_NAME);
}
@Override
@ -42,7 +40,7 @@ public class TransportIsolateDatafeedAction extends TransportTasksAction<Transpo
if (datafeedTask == null || datafeedTask.getExecutorNode() == null) {
// No running datafeed task to isolate
listener.onResponse(new IsolateDatafeedAction.Response());
listener.onResponse(new IsolateDatafeedAction.Response(false));
return;
}
@ -64,7 +62,7 @@ public class TransportIsolateDatafeedAction extends TransportTasksAction<Transpo
throw org.elasticsearch.ExceptionsHelper
.convertToElastic(failedNodeExceptions.get(0));
} else {
return new IsolateDatafeedAction.Response();
return new IsolateDatafeedAction.Response(false);
}
}
@ -72,11 +70,7 @@ public class TransportIsolateDatafeedAction extends TransportTasksAction<Transpo
protected void taskOperation(IsolateDatafeedAction.Request request, TransportStartDatafeedAction.DatafeedTask datafeedTask,
ActionListener<IsolateDatafeedAction.Response> listener) {
datafeedTask.isolate();
listener.onResponse(new IsolateDatafeedAction.Response());
listener.onResponse(new IsolateDatafeedAction.Response(false));
}
@Override
protected IsolateDatafeedAction.Response readTaskResponse(StreamInput in) throws IOException {
return new IsolateDatafeedAction.Response(in);
}
}

View File

@ -24,7 +24,6 @@ import org.elasticsearch.xpack.ml.job.JobManager;
import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager;
import java.util.List;
import java.util.function.Supplier;
/**
* Base class that redirects a request to a node where the job task is running.
@ -39,10 +38,10 @@ public abstract class TransportJobTaskAction<Request extends JobTaskRequest<Requ
TransportJobTaskAction(String actionName, ClusterService clusterService,
TransportService transportService, ActionFilters actionFilters,
Writeable.Reader<Request> requestSupplier,
Supplier<Response> responseSupplier, String nodeExecutor, AutodetectProcessManager processManager) {
Writeable.Reader<Request> requestReader, Writeable.Reader<Response> responseReader,
String nodeExecutor, AutodetectProcessManager processManager) {
super(actionName, clusterService, transportService, actionFilters,
requestSupplier, responseSupplier, nodeExecutor);
requestReader, responseReader, responseReader, nodeExecutor);
this.processManager = processManager;
}

View File

@ -11,7 +11,6 @@ import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.transport.TransportService;
@ -23,8 +22,6 @@ import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager;
import org.elasticsearch.xpack.ml.notifications.Auditor;
import java.io.IOException;
public class TransportKillProcessAction extends TransportJobTaskAction<KillProcessAction.Request, KillProcessAction.Response> {
private final Auditor auditor;
@ -72,9 +69,4 @@ public class TransportKillProcessAction extends TransportJobTaskAction<KillProce
super.doExecute(task, request, listener);
}
@Override
protected KillProcessAction.Response readTaskResponse(StreamInput in) throws IOException {
return new KillProcessAction.Response(in);
}
}

View File

@ -13,7 +13,6 @@ import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
@ -23,8 +22,6 @@ import org.elasticsearch.xpack.core.ml.action.PersistJobAction;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager;
import java.io.IOException;
public class TransportPersistJobAction extends TransportJobTaskAction<PersistJobAction.Request, PersistJobAction.Response> {
@Inject
@ -35,13 +32,6 @@ public class TransportPersistJobAction extends TransportJobTaskAction<PersistJob
// ThreadPool.Names.SAME, because operations is executed by autodetect worker thread
}
@Override
protected PersistJobAction.Response readTaskResponse(StreamInput in) throws IOException {
PersistJobAction.Response response = new PersistJobAction.Response();
response.readFrom(in);
return response;
}
@Override
protected void taskOperation(PersistJobAction.Request request, TransportOpenJobAction.JobTask task,
ActionListener<PersistJobAction.Response> listener) {

View File

@ -9,7 +9,6 @@ import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.index.analysis.AnalysisRegistry;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
@ -18,7 +17,6 @@ import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManage
import org.elasticsearch.xpack.ml.job.process.autodetect.params.DataLoadParams;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.TimeRange;
import java.io.IOException;
import java.util.Optional;
public class TransportPostDataAction extends TransportJobTaskAction<PostDataAction.Request, PostDataAction.Response> {
@ -34,13 +32,6 @@ public class TransportPostDataAction extends TransportJobTaskAction<PostDataActi
this.analysisRegistry = analysisRegistry;
}
@Override
protected PostDataAction.Response readTaskResponse(StreamInput in) throws IOException {
PostDataAction.Response response = new PostDataAction.Response();
response.readFrom(in);
return response;
}
@Override
protected void taskOperation(PostDataAction.Request request, TransportOpenJobAction.JobTask task,
ActionListener<PostDataAction.Response> listener) {

View File

@ -17,7 +17,6 @@ import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.discovery.MasterNotDiscoveredException;
@ -35,7 +34,6 @@ import org.elasticsearch.xpack.core.ml.job.messages.Messages;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.ml.MachineLearning;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
@ -53,8 +51,8 @@ public class TransportStopDatafeedAction extends TransportTasksAction<TransportS
@Inject
public TransportStopDatafeedAction(TransportService transportService, ThreadPool threadPool, ActionFilters actionFilters,
ClusterService clusterService, PersistentTasksService persistentTasksService) {
super(StopDatafeedAction.NAME, clusterService, transportService, actionFilters,
StopDatafeedAction.Request::new, StopDatafeedAction.Response::new, MachineLearning.UTILITY_THREAD_POOL_NAME);
super(StopDatafeedAction.NAME, clusterService, transportService, actionFilters, StopDatafeedAction.Request::new,
StopDatafeedAction.Response::new, StopDatafeedAction.Response::new, MachineLearning.UTILITY_THREAD_POOL_NAME);
this.threadPool = threadPool;
this.persistentTasksService = persistentTasksService;
}
@ -316,9 +314,4 @@ public class TransportStopDatafeedAction extends TransportTasksAction<TransportS
return new StopDatafeedAction.Response(tasks.stream().allMatch(StopDatafeedAction.Response::isStopped));
}
@Override
protected StopDatafeedAction.Response readTaskResponse(StreamInput in) throws IOException {
return new StopDatafeedAction.Response(in);
}
}

View File

@ -9,15 +9,12 @@ import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.ml.action.UpdateProcessAction;
import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager;
import org.elasticsearch.xpack.ml.job.process.autodetect.UpdateParams;
import java.io.IOException;
public class TransportUpdateProcessAction extends TransportJobTaskAction<UpdateProcessAction.Request, UpdateProcessAction.Response> {
@Inject
@ -28,13 +25,6 @@ public class TransportUpdateProcessAction extends TransportJobTaskAction<UpdateP
// ThreadPool.Names.SAME, because operations is executed by autodetect worker thread
}
@Override
protected UpdateProcessAction.Response readTaskResponse(StreamInput in) throws IOException {
UpdateProcessAction.Response response = new UpdateProcessAction.Response();
response.readFrom(in);
return response;
}
@Override
protected void taskOperation(UpdateProcessAction.Request request, TransportOpenJobAction.JobTask task,
ActionListener<UpdateProcessAction.Response> listener) {

View File

@ -87,7 +87,7 @@ public class DatafeedJobTests extends ESTestCase {
dataDescription.setFormat(DataDescription.DataFormat.XCONTENT);
postDataFuture = mock(ActionFuture.class);
flushJobFuture = mock(ActionFuture.class);
flushJobResponse = new FlushJobAction.Response();
flushJobResponse = new FlushJobAction.Response(true, new Date());
delayedDataDetector = mock(DelayedDataDetector.class);
when(delayedDataDetector.getWindow()).thenReturn(DatafeedJob.MISSING_DATA_CHECK_INTERVAL_MS);
currentTime = 0;

View File

@ -16,7 +16,6 @@ import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.discovery.MasterNotDiscoveredException;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.tasks.Task;
@ -27,7 +26,6 @@ import org.elasticsearch.xpack.core.rollup.action.DeleteRollupJobAction;
import org.elasticsearch.xpack.core.rollup.job.RollupJobStatus;
import org.elasticsearch.xpack.rollup.job.RollupJobTask;
import java.io.IOException;
import java.util.List;
public class TransportDeleteRollupJobAction extends TransportTasksAction<RollupJobTask, DeleteRollupJobAction.Request,
@ -35,8 +33,8 @@ public class TransportDeleteRollupJobAction extends TransportTasksAction<RollupJ
@Inject
public TransportDeleteRollupJobAction(TransportService transportService, ActionFilters actionFilters, ClusterService clusterService) {
super(DeleteRollupJobAction.NAME, clusterService, transportService, actionFilters,
DeleteRollupJobAction.Request::new, DeleteRollupJobAction.Response::new, ThreadPool.Names.SAME);
super(DeleteRollupJobAction.NAME, clusterService, transportService, actionFilters, DeleteRollupJobAction.Request::new,
DeleteRollupJobAction.Response::new, DeleteRollupJobAction.Response::new, ThreadPool.Names.SAME);
}
@Override
@ -94,10 +92,4 @@ public class TransportDeleteRollupJobAction extends TransportTasksAction<RollupJ
return new DeleteRollupJobAction.Response(cancelled, taskOperationFailures, failedNodeExceptions);
}
@Override
protected DeleteRollupJobAction.Response readTaskResponse(StreamInput in) throws IOException {
DeleteRollupJobAction.Response response = new DeleteRollupJobAction.Response();
response.readFrom(in);
return response;
}
}

View File

@ -16,7 +16,6 @@ import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.discovery.MasterNotDiscoveredException;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.tasks.Task;
@ -27,7 +26,6 @@ import org.elasticsearch.xpack.core.rollup.action.GetRollupJobsAction;
import org.elasticsearch.xpack.core.rollup.job.RollupJobStatus;
import org.elasticsearch.xpack.rollup.job.RollupJobTask;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
@ -38,8 +36,8 @@ public class TransportGetRollupJobAction extends TransportTasksAction<RollupJobT
@Inject
public TransportGetRollupJobAction(TransportService transportService, ActionFilters actionFilters, ClusterService clusterService) {
super(GetRollupJobsAction.NAME, clusterService, transportService, actionFilters,
GetRollupJobsAction.Request::new, GetRollupJobsAction.Response::new, ThreadPool.Names.SAME);
super(GetRollupJobsAction.NAME, clusterService, transportService, actionFilters, GetRollupJobsAction.Request::new,
GetRollupJobsAction.Response::new, GetRollupJobsAction.Response::new, ThreadPool.Names.SAME);
}
@Override
@ -118,8 +116,4 @@ public class TransportGetRollupJobAction extends TransportTasksAction<RollupJobT
return new GetRollupJobsAction.Response(jobs, taskOperationFailures, failedNodeExceptions);
}
@Override
protected GetRollupJobsAction.Response readTaskResponse(StreamInput in) throws IOException {
return new GetRollupJobsAction.Response(in);
}
}

View File

@ -13,7 +13,6 @@ import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.tasks.TransportTasksAction;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.license.LicenseUtils;
import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.tasks.Task;
@ -23,7 +22,6 @@ import org.elasticsearch.xpack.core.XPackField;
import org.elasticsearch.xpack.core.rollup.action.StartRollupJobAction;
import org.elasticsearch.xpack.rollup.job.RollupJobTask;
import java.io.IOException;
import java.util.List;
import java.util.function.Consumer;
@ -36,7 +34,7 @@ public class TransportStartRollupAction extends TransportTasksAction<RollupJobTa
public TransportStartRollupAction(TransportService transportService, ActionFilters actionFilters, ClusterService clusterService,
XPackLicenseState licenseState) {
super(StartRollupJobAction.NAME, clusterService, transportService, actionFilters, StartRollupJobAction.Request::new,
StartRollupJobAction.Response::new, ThreadPool.Names.SAME);
StartRollupJobAction.Response::new, StartRollupJobAction.Response::new, ThreadPool.Names.SAME);
this.licenseState = licenseState;
}
@ -95,9 +93,4 @@ public class TransportStartRollupAction extends TransportTasksAction<RollupJobTa
return new StartRollupJobAction.Response(allStarted);
}
@Override
protected StartRollupJobAction.Response readTaskResponse(StreamInput in) throws IOException {
return new StartRollupJobAction.Response(in);
}
}

View File

@ -14,7 +14,6 @@ import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.tasks.TransportTasksAction;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
@ -24,7 +23,6 @@ import org.elasticsearch.xpack.core.rollup.action.StopRollupJobAction;
import org.elasticsearch.xpack.core.rollup.job.RollupJobStatus;
import org.elasticsearch.xpack.rollup.job.RollupJobTask;
import java.io.IOException;
import java.util.List;
import java.util.function.BooleanSupplier;
import java.util.function.Consumer;
@ -37,8 +35,8 @@ public class TransportStopRollupAction extends TransportTasksAction<RollupJobTas
@Inject
public TransportStopRollupAction(TransportService transportService, ActionFilters actionFilters,
ClusterService clusterService, ThreadPool threadPool) {
super(StopRollupJobAction.NAME, clusterService, transportService, actionFilters,
StopRollupJobAction.Request::new, StopRollupJobAction.Response::new, ThreadPool.Names.SAME);
super(StopRollupJobAction.NAME, clusterService, transportService, actionFilters, StopRollupJobAction.Request::new,
StopRollupJobAction.Response::new, StopRollupJobAction.Response::new, ThreadPool.Names.SAME);
this.threadPool = threadPool;
}
@ -146,9 +144,4 @@ public class TransportStopRollupAction extends TransportTasksAction<RollupJobTas
return new StopRollupJobAction.Response(allStopped);
}
@Override
protected StopRollupJobAction.Response readTaskResponse(StreamInput in) throws IOException {
return new StopRollupJobAction.Response(in);
}
}