diff --git a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/RethrottleAction.java b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/RethrottleAction.java index 27c1b851439..3cb2c60c623 100644 --- a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/RethrottleAction.java +++ b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/RethrottleAction.java @@ -21,6 +21,7 @@ package org.elasticsearch.index.reindex; import org.elasticsearch.action.Action; import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse; +import org.elasticsearch.common.io.stream.Writeable; public class RethrottleAction extends Action { public static final RethrottleAction INSTANCE = new RethrottleAction(); @@ -32,6 +33,11 @@ public class RethrottleAction extends Action { @Override public ListTasksResponse newResponse() { - return new ListTasksResponse(); + throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable"); + } + + @Override + public Writeable.Reader getResponseReader() { + return ListTasksResponse::new; } } diff --git a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/TransportRethrottleAction.java b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/TransportRethrottleAction.java index 99469df3db6..ee8cf863a52 100644 --- a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/TransportRethrottleAction.java +++ b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/TransportRethrottleAction.java @@ -29,13 +29,11 @@ import org.elasticsearch.action.support.tasks.TransportTasksAction; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; -import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.tasks.TaskId; import org.elasticsearch.tasks.TaskInfo; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; -import java.io.IOException; import java.util.List; public class TransportRethrottleAction extends TransportTasksAction { @@ -45,7 +43,7 @@ public class TransportRethrottleAction extends TransportTasksAction tasks, List taskOperationFailures, List failedNodeExceptions) { diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/cancel/CancelTasksAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/cancel/CancelTasksAction.java index 0ea6162e59c..23cb69cf807 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/cancel/CancelTasksAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/cancel/CancelTasksAction.java @@ -20,6 +20,7 @@ package org.elasticsearch.action.admin.cluster.node.tasks.cancel; import org.elasticsearch.action.Action; +import org.elasticsearch.common.io.stream.Writeable; /** * Action for cancelling running tasks @@ -35,6 +36,11 @@ public class CancelTasksAction extends Action { @Override public CancelTasksResponse newResponse() { - return new CancelTasksResponse(); + throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable"); + } + + @Override + public Writeable.Reader getResponseReader() { + return CancelTasksResponse::new; } } diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/cancel/CancelTasksResponse.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/cancel/CancelTasksResponse.java index cfab4f596cd..d62a6bb2c21 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/cancel/CancelTasksResponse.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/cancel/CancelTasksResponse.java @@ -23,6 +23,7 @@ import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.TaskOperationFailure; import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse; import org.elasticsearch.common.Strings; +import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.xcontent.ConstructingObjectParser; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParser; @@ -40,7 +41,8 @@ public class CancelTasksResponse extends ListTasksResponse { private static final ConstructingObjectParser PARSER = setupParser("cancel_tasks_response", CancelTasksResponse::new); - public CancelTasksResponse() { + public CancelTasksResponse(StreamInput in) throws IOException { + super(in); } public CancelTasksResponse(List tasks, List taskFailures, List diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/cancel/TransportCancelTasksAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/cancel/TransportCancelTasksAction.java index 8673000523a..ba009644dcd 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/cancel/TransportCancelTasksAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/cancel/TransportCancelTasksAction.java @@ -64,7 +64,7 @@ public class TransportCancelTasksAction extends TransportTasksAction operation) { if (request.getTaskId().isSet()) { // we are only checking one task, we can optimize it diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/list/ListTasksAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/list/ListTasksAction.java index b02d019859f..abba798c83c 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/list/ListTasksAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/list/ListTasksAction.java @@ -20,6 +20,7 @@ package org.elasticsearch.action.admin.cluster.node.tasks.list; import org.elasticsearch.action.Action; +import org.elasticsearch.common.io.stream.Writeable; /** * Action for retrieving a list of currently running tasks @@ -35,6 +36,11 @@ public class ListTasksAction extends Action { @Override public ListTasksResponse newResponse() { - return new ListTasksResponse(); + throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable"); + } + + @Override + public Writeable.Reader getResponseReader() { + return ListTasksResponse::new; } } diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/list/ListTasksResponse.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/list/ListTasksResponse.java index e6f1c52aae8..166a59f2359 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/list/ListTasksResponse.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/list/ListTasksResponse.java @@ -52,22 +52,28 @@ import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optiona public class ListTasksResponse extends BaseTasksResponse implements ToXContentObject { private static final String TASKS = "tasks"; - private List tasks; + private final List tasks; private Map> perNodeTasks; private List groups; - public ListTasksResponse() { - this(null, null, null); - } - public ListTasksResponse(List tasks, List taskFailures, List nodeFailures) { super(taskFailures, nodeFailures); this.tasks = tasks == null ? Collections.emptyList() : Collections.unmodifiableList(new ArrayList<>(tasks)); } + public ListTasksResponse(StreamInput in) throws IOException { + super(in); + tasks = Collections.unmodifiableList(in.readList(TaskInfo::new)); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeList(tasks); + } protected static ConstructingObjectParser setupParser(String name, TriFunction< @@ -96,18 +102,6 @@ public class ListTasksResponse extends BaseTasksResponse implements ToXContentOb private static final ConstructingObjectParser PARSER = setupParser("list_tasks_response", ListTasksResponse::new); - @Override - public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); - tasks = Collections.unmodifiableList(in.readList(TaskInfo::new)); - } - - @Override - public void writeTo(StreamOutput out) throws IOException { - super.writeTo(out); - out.writeList(tasks); - } - /** * Returns the list of tasks by node */ diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/list/TransportListTasksAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/list/TransportListTasksAction.java index 9932c154b25..b35688dcee5 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/list/TransportListTasksAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/list/TransportListTasksAction.java @@ -26,14 +26,12 @@ import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.tasks.TransportTasksAction; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; -import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.TaskInfo; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; -import java.io.IOException; import java.util.List; import java.util.function.Consumer; @@ -52,7 +50,7 @@ public class TransportListTasksAction extends TransportTasksAction listener) { listener.onResponse(task.taskInfo(clusterService.localNode().getId(), request.getDetailed())); diff --git a/server/src/main/java/org/elasticsearch/action/support/tasks/BaseTasksResponse.java b/server/src/main/java/org/elasticsearch/action/support/tasks/BaseTasksResponse.java index 090aaf628ac..d859959f2e6 100644 --- a/server/src/main/java/org/elasticsearch/action/support/tasks/BaseTasksResponse.java +++ b/server/src/main/java/org/elasticsearch/action/support/tasks/BaseTasksResponse.java @@ -55,34 +55,8 @@ public class BaseTasksResponse extends ActionResponse { this.nodeFailures = nodeFailures == null ? Collections.emptyList() : Collections.unmodifiableList(new ArrayList<>(nodeFailures)); } - /** - * The list of task failures exception. - */ - public List getTaskFailures() { - return taskFailures; - } - - /** - * The list of node failures exception. - */ - public List getNodeFailures() { - return nodeFailures; - } - - /** - * Rethrow task failures if there are any. - */ - public void rethrowFailures(String operationName) { - rethrowAndSuppress(Stream.concat( - getNodeFailures().stream(), - getTaskFailures().stream().map(f -> new ElasticsearchException( - "{} of [{}] failed", f.getCause(), operationName, new TaskId(f.getNodeId(), f.getTaskId())))) - .collect(toList())); - } - - @Override - public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); + public BaseTasksResponse(StreamInput in) throws IOException { + super(in); int size = in.readVInt(); List taskFailures = new ArrayList<>(size); for (int i = 0; i < size; i++) { @@ -110,6 +84,31 @@ public class BaseTasksResponse extends ActionResponse { } } + /** + * The list of task failures exception. + */ + public List getTaskFailures() { + return taskFailures; + } + + /** + * The list of node failures exception. + */ + public List getNodeFailures() { + return nodeFailures; + } + + /** + * Rethrow task failures if there are any. + */ + public void rethrowFailures(String operationName) { + rethrowAndSuppress(Stream.concat( + getNodeFailures().stream(), + getTaskFailures().stream().map(f -> new ElasticsearchException( + "{} of [{}] failed", f.getCause(), operationName, new TaskId(f.getNodeId(), f.getTaskId())))) + .collect(toList())); + } + protected void toXContentCommon(XContentBuilder builder, ToXContent.Params params) throws IOException { if (getTaskFailures() != null && getTaskFailures().size() > 0) { builder.startArray(TASK_FAILURES); diff --git a/server/src/main/java/org/elasticsearch/action/support/tasks/TransportTasksAction.java b/server/src/main/java/org/elasticsearch/action/support/tasks/TransportTasksAction.java index e23ff4baf01..4a2a9eb08e5 100644 --- a/server/src/main/java/org/elasticsearch/action/support/tasks/TransportTasksAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/tasks/TransportTasksAction.java @@ -55,7 +55,6 @@ import java.util.List; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReferenceArray; import java.util.function.Consumer; -import java.util.function.Supplier; import static java.util.Collections.emptyList; @@ -71,20 +70,23 @@ public abstract class TransportTasksAction< protected final ClusterService clusterService; protected final TransportService transportService; - protected final Writeable.Reader requestSupplier; - protected final Supplier responseSupplier; + protected final Writeable.Reader requestReader; + protected final Writeable.Reader responsesReader; + protected final Writeable.Reader responseReader; protected final String transportNodeAction; protected TransportTasksAction(String actionName, ClusterService clusterService, TransportService transportService, - ActionFilters actionFilters, Writeable.Reader requestSupplier, - Supplier responseSupplier, String nodeExecutor) { - super(actionName, transportService, actionFilters, requestSupplier); + ActionFilters actionFilters, Writeable.Reader requestReader, + Writeable.Reader responsesReader, Writeable.Reader responseReader, + String nodeExecutor) { + super(actionName, transportService, actionFilters, requestReader); this.clusterService = clusterService; this.transportService = transportService; this.transportNodeAction = actionName + "[n]"; - this.requestSupplier = requestSupplier; - this.responseSupplier = responseSupplier; + this.requestReader = requestReader; + this.responsesReader = responsesReader; + this.responseReader = responseReader; transportService.registerRequestHandler(transportNodeAction, nodeExecutor, NodeTaskRequest::new, new NodeTransportHandler()); } @@ -205,8 +207,6 @@ public abstract class TransportTasksAction< return newResponse(request, tasks, taskOperationFailures, failedNodeExceptions); } - protected abstract TaskResponse readTaskResponse(StreamInput in) throws IOException; - /** * Perform the required operation on the task. It is OK start an asynchronous operation or to throw an exception but not both. */ @@ -364,7 +364,7 @@ public abstract class TransportTasksAction< protected NodeTaskRequest(StreamInput in) throws IOException { super(in); - this.tasksRequest = requestSupplier.read(in); + this.tasksRequest = requestReader.read(in); } @Override @@ -411,7 +411,7 @@ public abstract class TransportTasksAction< int resultsSize = in.readVInt(); results = new ArrayList<>(resultsSize); for (; resultsSize > 0; resultsSize--) { - final TaskResponse result = in.readBoolean() ? readTaskResponse(in) : null; + final TaskResponse result = in.readBoolean() ? responseReader.read(in) : null; results.add(result); } if (in.readBoolean()) { diff --git a/server/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TestTaskPlugin.java b/server/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TestTaskPlugin.java index 1baef0a5af4..18e5702ed6a 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TestTaskPlugin.java +++ b/server/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TestTaskPlugin.java @@ -413,19 +413,14 @@ public class TestTaskPlugin extends Plugin implements ActionPlugin, NetworkPlugi private List tasks; - public UnblockTestTasksResponse() { - super(null, null); - } - public UnblockTestTasksResponse(List tasks, List taskFailures, List nodeFailures) { super(taskFailures, nodeFailures); this.tasks = tasks == null ? Collections.emptyList() : Collections.unmodifiableList(new ArrayList<>(tasks)); } - @Override - public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); + public UnblockTestTasksResponse(StreamInput in) throws IOException { + super(in); int taskCount = in.readVInt(); List builder = new ArrayList<>(); for (int i = 0; i < taskCount; i++) { @@ -453,7 +448,7 @@ public class TestTaskPlugin extends Plugin implements ActionPlugin, NetworkPlugi @Inject public TransportUnblockTestTasksAction(ClusterService clusterService, TransportService transportService) { super(UnblockTestTasksAction.NAME, clusterService, transportService, new ActionFilters(new HashSet<>()), - UnblockTestTasksRequest::new, UnblockTestTasksResponse::new, ThreadPool.Names.MANAGEMENT); + UnblockTestTasksRequest::new, UnblockTestTasksResponse::new, UnblockTestTaskResponse::new, ThreadPool.Names.MANAGEMENT); } @Override @@ -463,11 +458,6 @@ public class TestTaskPlugin extends Plugin implements ActionPlugin, NetworkPlugi return new UnblockTestTasksResponse(tasks, taskOperationFailures, failedNodeExceptions); } - @Override - protected UnblockTestTaskResponse readTaskResponse(StreamInput in) throws IOException { - return new UnblockTestTaskResponse(in); - } - @Override protected void taskOperation(UnblockTestTasksRequest request, Task task, ActionListener listener) { ((TestTask) task).unblock(); @@ -487,7 +477,12 @@ public class TestTaskPlugin extends Plugin implements ActionPlugin, NetworkPlugi @Override public UnblockTestTasksResponse newResponse() { - return new UnblockTestTasksResponse(); + throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable"); + } + + @Override + public Writeable.Reader getResponseReader() { + return UnblockTestTasksResponse::new; } } diff --git a/server/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TransportTasksActionTests.java b/server/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TransportTasksActionTests.java index 16e0caa526d..cbd0adad580 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TransportTasksActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TransportTasksActionTests.java @@ -202,19 +202,14 @@ public class TransportTasksActionTests extends TaskManagerTestCase { private List tasks; - TestTasksResponse() { - super(null, null); - } - TestTasksResponse(List tasks, List taskFailures, List nodeFailures) { super(taskFailures, nodeFailures); this.tasks = tasks == null ? Collections.emptyList() : Collections.unmodifiableList(new ArrayList<>(tasks)); } - @Override - public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); + TestTasksResponse(StreamInput in) throws IOException { + super(in); int taskCount = in.readVInt(); List builder = new ArrayList<>(); for (int i = 0; i < taskCount; i++) { @@ -241,8 +236,7 @@ public class TransportTasksActionTests extends TaskManagerTestCase { protected TestTasksAction(String actionName, ClusterService clusterService, TransportService transportService) { super(actionName, clusterService, transportService, new ActionFilters(new HashSet<>()), - TestTasksRequest::new, TestTasksResponse::new, - ThreadPool.Names.MANAGEMENT); + TestTasksRequest::new, TestTasksResponse::new, TestTaskResponse::new, ThreadPool.Names.MANAGEMENT); } @Override @@ -251,11 +245,6 @@ public class TransportTasksActionTests extends TaskManagerTestCase { return new TestTasksResponse(tasks, taskOperationFailures, failedNodeExceptions); } - @Override - protected TestTaskResponse readTaskResponse(StreamInput in) throws IOException { - return new TestTaskResponse(in); - } - } private ActionFuture startBlockingTestNodesAction(CountDownLatch checkLatch) throws InterruptedException { diff --git a/server/src/test/java/org/elasticsearch/persistent/PersistentTasksNodeServiceTests.java b/server/src/test/java/org/elasticsearch/persistent/PersistentTasksNodeServiceTests.java index b5d72e3aa43..683d1bc0089 100644 --- a/server/src/test/java/org/elasticsearch/persistent/PersistentTasksNodeServiceTests.java +++ b/server/src/test/java/org/elasticsearch/persistent/PersistentTasksNodeServiceTests.java @@ -295,7 +295,8 @@ public class PersistentTasksNodeServiceTests extends ESTestCase { // That should trigger cancellation request assertThat(capturedTaskId.get(), equalTo(localId)); // Notify successful cancellation - capturedListener.get().onResponse(new CancelTasksResponse()); + capturedListener.get().onResponse( + new CancelTasksResponse(Collections.emptyList(), Collections.emptyList(), Collections.emptyList())); // finish or fail task if (randomBoolean()) { diff --git a/server/src/test/java/org/elasticsearch/persistent/TestPersistentTasksPlugin.java b/server/src/test/java/org/elasticsearch/persistent/TestPersistentTasksPlugin.java index bfe5cfa3a23..352e8108c38 100644 --- a/server/src/test/java/org/elasticsearch/persistent/TestPersistentTasksPlugin.java +++ b/server/src/test/java/org/elasticsearch/persistent/TestPersistentTasksPlugin.java @@ -399,7 +399,12 @@ public class TestPersistentTasksPlugin extends Plugin implements ActionPlugin, P @Override public TestTasksResponse newResponse() { - return new TestTasksResponse(); + throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable"); + } + + @Override + public Writeable.Reader getResponseReader() { + return TestTasksResponse::new; } } @@ -484,19 +489,14 @@ public class TestPersistentTasksPlugin extends Plugin implements ActionPlugin, P private List tasks; - public TestTasksResponse() { - super(null, null); - } - public TestTasksResponse(List tasks, List taskFailures, List nodeFailures) { super(taskFailures, nodeFailures); this.tasks = tasks == null ? Collections.emptyList() : Collections.unmodifiableList(new ArrayList<>(tasks)); } - @Override - public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); + public TestTasksResponse(StreamInput in) throws IOException { + super(in); tasks = in.readList(TestTaskResponse::new); } @@ -517,7 +517,7 @@ public class TestPersistentTasksPlugin extends Plugin implements ActionPlugin, P @Inject public TransportTestTaskAction(ClusterService clusterService, TransportService transportService, ActionFilters actionFilters) { super(TestTaskAction.NAME, clusterService, transportService, actionFilters, - TestTasksRequest::new, TestTasksResponse::new, ThreadPool.Names.MANAGEMENT); + TestTasksRequest::new, TestTasksResponse::new, TestTaskResponse::new, ThreadPool.Names.MANAGEMENT); } @Override @@ -527,11 +527,6 @@ public class TestPersistentTasksPlugin extends Plugin implements ActionPlugin, P return new TestTasksResponse(tasks, taskOperationFailures, failedNodeExceptions); } - @Override - protected TestTaskResponse readTaskResponse(StreamInput in) throws IOException { - return new TestTaskResponse(in); - } - @Override protected void taskOperation(TestTasksRequest request, TestTask task, ActionListener listener) { task.setOperation(request.operation); diff --git a/server/src/test/java/org/elasticsearch/tasks/ListTasksResponseTests.java b/server/src/test/java/org/elasticsearch/tasks/ListTasksResponseTests.java index 23ce53b6c35..4a48466f00e 100644 --- a/server/src/test/java/org/elasticsearch/tasks/ListTasksResponseTests.java +++ b/server/src/test/java/org/elasticsearch/tasks/ListTasksResponseTests.java @@ -46,7 +46,7 @@ public class ListTasksResponseTests extends AbstractXContentTestCase operation) { final ClusterState state = clusterService.state(); diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/StatsResponsesTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/StatsResponsesTests.java index c1734049558..72ba0cd7067 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/StatsResponsesTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/StatsResponsesTests.java @@ -6,7 +6,8 @@ package org.elasticsearch.xpack.ccr.action; import org.elasticsearch.ElasticsearchException; -import org.elasticsearch.test.AbstractStreamableTestCase; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.test.AbstractWireSerializingTestCase; import org.elasticsearch.xpack.core.ccr.ShardFollowNodeTaskStatus; import org.elasticsearch.xpack.core.ccr.action.FollowStatsAction; @@ -14,11 +15,11 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; -public class StatsResponsesTests extends AbstractStreamableTestCase { +public class StatsResponsesTests extends AbstractWireSerializingTestCase { @Override - protected FollowStatsAction.StatsResponses createBlankInstance() { - return new FollowStatsAction.StatsResponses(); + protected Writeable.Reader instanceReader() { + return FollowStatsAction.StatsResponses::new; } @Override diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/action/CcrStatsAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/action/CcrStatsAction.java index 1ce08e5c542..5b2033443dc 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/action/CcrStatsAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/action/CcrStatsAction.java @@ -72,8 +72,7 @@ public class CcrStatsAction extends Action { public Response(StreamInput in) throws IOException { super(in); autoFollowStats = new AutoFollowStats(in); - followStats = new FollowStatsAction.StatsResponses(); - followStats.readFrom(in); + followStats = new FollowStatsAction.StatsResponses(in); } public AutoFollowStats getAutoFollowStats() { diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/action/FollowStatsAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/action/FollowStatsAction.java index d56572edca5..72353f405cf 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/action/FollowStatsAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/action/FollowStatsAction.java @@ -24,7 +24,6 @@ import org.elasticsearch.xpack.core.ccr.ShardFollowNodeTaskStatus; import java.io.IOException; import java.util.Arrays; -import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Objects; @@ -42,21 +41,22 @@ public class FollowStatsAction extends Action @Override public StatsResponses newResponse() { - return new StatsResponses(); + throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable"); + } + + @Override + public Writeable.Reader getResponseReader() { + return StatsResponses::new; } public static class StatsResponses extends BaseTasksResponse implements ToXContentObject { - private List statsResponse; + private final List statsResponse; public List getStatsResponses() { return statsResponse; } - public StatsResponses() { - this(Collections.emptyList(), Collections.emptyList(), Collections.emptyList()); - } - public StatsResponses( final List taskFailures, final List nodeFailures, @@ -65,6 +65,17 @@ public class FollowStatsAction extends Action this.statsResponse = statsResponse; } + public StatsResponses(StreamInput in) throws IOException { + super(in); + statsResponse = in.readList(StatsResponse::new); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeList(statsResponse); + } + @Override public XContentBuilder toXContent(final XContentBuilder builder, final Params params) throws IOException { // sort by index name, then shard ID @@ -99,18 +110,6 @@ public class FollowStatsAction extends Action return builder; } - @Override - public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); - statsResponse = in.readList(StatsResponse::new); - } - - @Override - public void writeTo(StreamOutput out) throws IOException { - super.writeTo(out); - out.writeList(statsResponse); - } - @Override public boolean equals(Object o) { if (this == o) return true; diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/CloseJobAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/CloseJobAction.java index fe8199cafbe..9bc413b2e22 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/CloseJobAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/CloseJobAction.java @@ -38,7 +38,12 @@ public class CloseJobAction extends Action { @Override public Response newResponse() { - return new Response(); + throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable"); + } + + @Override + public Writeable.Reader getResponseReader() { + return Response::new; } public static class Request extends BaseTasksRequest implements ToXContentObject { @@ -207,30 +212,15 @@ public class CloseJobAction extends Action { public static class Response extends BaseTasksResponse implements Writeable, ToXContentObject { - private boolean closed; - - public Response() { - super(null, null); - - } - - public Response(StreamInput in) throws IOException { - super(null, null); - readFrom(in); - } + private final boolean closed; public Response(boolean closed) { super(null, null); this.closed = closed; } - public boolean isClosed() { - return closed; - } - - @Override - public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); + public Response(StreamInput in) throws IOException { + super(in); closed = in.readBoolean(); } @@ -240,6 +230,10 @@ public class CloseJobAction extends Action { out.writeBoolean(closed); } + public boolean isClosed() { + return closed; + } + @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/FlushJobAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/FlushJobAction.java index cf01fb0a157..b73d2b502a4 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/FlushJobAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/FlushJobAction.java @@ -36,7 +36,12 @@ public class FlushJobAction extends Action { @Override public Response newResponse() { - return new Response(); + throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable"); + } + + @Override + public Writeable.Reader getResponseReader() { + return Response::new; } public static class Request extends JobTaskRequest implements ToXContentObject { @@ -194,27 +199,14 @@ public class FlushJobAction extends Action { private boolean flushed; private Date lastFinalizedBucketEnd; - public Response() { - super(null, null); - } - public Response(boolean flushed, @Nullable Date lastFinalizedBucketEnd) { super(null, null); this.flushed = flushed; this.lastFinalizedBucketEnd = lastFinalizedBucketEnd; } - public boolean isFlushed() { - return flushed; - } - - public Date getLastFinalizedBucketEnd() { - return lastFinalizedBucketEnd; - } - - @Override - public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); + public Response(StreamInput in) throws IOException { + super(in); flushed = in.readBoolean(); lastFinalizedBucketEnd = new Date(in.readVLong()); } @@ -226,6 +218,14 @@ public class FlushJobAction extends Action { out.writeVLong(lastFinalizedBucketEnd.getTime()); } + public boolean isFlushed() { + return flushed; + } + + public Date getLastFinalizedBucketEnd() { + return lastFinalizedBucketEnd; + } + @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/ForecastJobAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/ForecastJobAction.java index 08bd405ead9..0ea66ad0937 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/ForecastJobAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/ForecastJobAction.java @@ -35,7 +35,12 @@ public class ForecastJobAction extends Action { @Override public Response newResponse() { - return new Response(); + throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable"); + } + + @Override + public Writeable.Reader getResponseReader() { + return Response::new; } public static class Request extends JobTaskRequest implements ToXContentObject { @@ -167,27 +172,14 @@ public class ForecastJobAction extends Action { private boolean acknowledged; private String forecastId; - public Response() { - super(null, null); - } - public Response(boolean acknowledged, String forecastId) { super(null, null); this.acknowledged = acknowledged; this.forecastId = forecastId; } - public boolean isAcknowledged() { - return acknowledged; - } - - public String getForecastId() { - return forecastId; - } - - @Override - public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); + public Response(StreamInput in) throws IOException { + super(in); acknowledged = in.readBoolean(); forecastId = in.readString(); } @@ -199,6 +191,14 @@ public class ForecastJobAction extends Action { out.writeString(forecastId); } + public boolean isAcknowledged() { + return acknowledged; + } + + public String getForecastId() { + return forecastId; + } + @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/GetJobsStatsAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/GetJobsStatsAction.java index 0c3ff8a5061..c5dba63fcc6 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/GetJobsStatsAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/GetJobsStatsAction.java @@ -56,7 +56,12 @@ public class GetJobsStatsAction extends Action { @Override public Response newResponse() { - return new Response(); + throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable"); + } + + @Override + public Writeable.Reader getResponseReader() { + return Response::new; } public static class Request extends BaseTasksRequest { @@ -317,17 +322,8 @@ public class GetJobsStatsAction extends Action { this.jobsStats = jobsStats; } - public Response() { - super(Collections.emptyList(), Collections.emptyList()); - } - - public QueryPage getResponse() { - return jobsStats; - } - - @Override - public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); + public Response(StreamInput in) throws IOException { + super(in); jobsStats = new QueryPage<>(in, JobStats::new); } @@ -337,6 +333,10 @@ public class GetJobsStatsAction extends Action { jobsStats.writeTo(out); } + public QueryPage getResponse() { + return jobsStats; + } + @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/IsolateDatafeedAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/IsolateDatafeedAction.java index 62832001932..8f681472ee8 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/IsolateDatafeedAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/IsolateDatafeedAction.java @@ -46,7 +46,12 @@ public class IsolateDatafeedAction extends Action getResponseReader() { + return Response::new; } public static class Request extends BaseTasksRequest implements ToXContentObject { @@ -132,7 +137,7 @@ public class IsolateDatafeedAction extends Action { @Override public Response newResponse() { - return new Response(); + throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable"); + } + + @Override + public Writeable.Reader getResponseReader() { + return Response::new; } static class RequestBuilder extends ActionRequestBuilder { @@ -54,15 +59,11 @@ public class KillProcessAction extends Action { public static class Response extends BaseTasksResponse implements Writeable { - private boolean killed; - - public Response() { - super(null, null); - } + private final boolean killed; public Response(StreamInput in) throws IOException { - super(null, null); - readFrom(in); + super(in); + killed = in.readBoolean(); } public Response(boolean killed) { @@ -74,12 +75,6 @@ public class KillProcessAction extends Action { return killed; } - @Override - public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); - killed = in.readBoolean(); - } - @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/PersistJobAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/PersistJobAction.java index 81f7ebf525b..3de585efc46 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/PersistJobAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/PersistJobAction.java @@ -27,7 +27,12 @@ public class PersistJobAction extends Action { @Override public Response newResponse() { - return new Response(); + throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable"); + } + + @Override + public Writeable.Reader getResponseReader() { + return Response::new; } public static class Request extends JobTaskRequest { @@ -80,24 +85,15 @@ public class PersistJobAction extends Action { public static class Response extends BaseTasksResponse implements Writeable { - boolean persisted; - - public Response() { - super(null, null); - } + private final boolean persisted; public Response(boolean persisted) { super(null, null); this.persisted = persisted; } - public boolean isPersisted() { - return persisted; - } - - @Override - public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); + public Response(StreamInput in) throws IOException { + super(in); persisted = in.readBoolean(); } @@ -107,6 +103,10 @@ public class PersistJobAction extends Action { out.writeBoolean(persisted); } + public boolean isPersisted() { + return persisted; + } + @Override public boolean equals(Object o) { if (this == o) return true; diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/PostDataAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/PostDataAction.java index 92e23cf1e8a..0393f2c4639 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/PostDataAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/PostDataAction.java @@ -35,7 +35,12 @@ public class PostDataAction extends Action { @Override public Response newResponse() { - return new Response(); + throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable"); + } + + @Override + public Writeable.Reader getResponseReader() { + return Response::new; } static class RequestBuilder extends ActionRequestBuilder { @@ -47,29 +52,20 @@ public class PostDataAction extends Action { public static class Response extends BaseTasksResponse implements StatusToXContentObject, Writeable { - private DataCounts dataCounts; + private final DataCounts dataCounts; public Response(String jobId) { super(null, null); dataCounts = new DataCounts(jobId); } - public Response() { - super(null, null); - } - public Response(DataCounts counts) { super(null, null); this.dataCounts = counts; } - public DataCounts getDataCounts() { - return dataCounts; - } - - @Override - public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); + public Response(StreamInput in) throws IOException { + super(in); dataCounts = new DataCounts(in); } @@ -79,6 +75,10 @@ public class PostDataAction extends Action { dataCounts.writeTo(out); } + public DataCounts getDataCounts() { + return dataCounts; + } + @Override public RestStatus status() { return RestStatus.ACCEPTED; diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/StopDatafeedAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/StopDatafeedAction.java index d49fe8ed0fb..c914150173b 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/StopDatafeedAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/StopDatafeedAction.java @@ -41,7 +41,12 @@ public class StopDatafeedAction extends Action { @Override public Response newResponse() { - return new Response(); + throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable"); + } + + @Override + public Writeable.Reader getResponseReader() { + return Response::new; } public static class Request extends BaseTasksRequest implements ToXContentObject { @@ -194,7 +199,7 @@ public class StopDatafeedAction extends Action { public static class Response extends BaseTasksResponse implements Writeable { - private boolean stopped; + private final boolean stopped; public Response(boolean stopped) { super(null, null); @@ -202,21 +207,7 @@ public class StopDatafeedAction extends Action { } public Response(StreamInput in) throws IOException { - super(null, null); - readFrom(in); - } - - public Response() { - super(null, null); - } - - public boolean isStopped() { - return stopped; - } - - @Override - public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); + super(in); stopped = in.readBoolean(); } @@ -225,6 +216,11 @@ public class StopDatafeedAction extends Action { super.writeTo(out); out.writeBoolean(stopped); } + + public boolean isStopped() { + return stopped; + } + } static class RequestBuilder extends ActionRequestBuilder { diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/UpdateProcessAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/UpdateProcessAction.java index db1f2a6e422..5091ff1f968 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/UpdateProcessAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/UpdateProcessAction.java @@ -35,7 +35,12 @@ public class UpdateProcessAction extends Action { @Override public Response newResponse() { - return new Response(); + throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable"); + } + + @Override + public Writeable.Reader getResponseReader() { + return Response::new; } static class RequestBuilder extends ActionRequestBuilder { @@ -47,16 +52,15 @@ public class UpdateProcessAction extends Action { public static class Response extends BaseTasksResponse implements StatusToXContentObject, Writeable { - private boolean isUpdated; + private final boolean isUpdated; public Response() { super(null, null); this.isUpdated = true; } - @Override - public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); + public Response(StreamInput in) throws IOException { + super(in); isUpdated = in.readBoolean(); } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/action/DeleteRollupJobAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/action/DeleteRollupJobAction.java index a429f1e5b68..92a1a07ded0 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/action/DeleteRollupJobAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/action/DeleteRollupJobAction.java @@ -40,7 +40,12 @@ public class DeleteRollupJobAction extends Action getResponseReader() { + return Response::new; } public static class Request extends BaseTasksRequest implements ToXContentFragment { @@ -109,12 +114,7 @@ public class DeleteRollupJobAction extends Action taskFailures, List nodeFailures) { super(taskFailures, nodeFailures); @@ -126,18 +126,8 @@ public class DeleteRollupJobAction extends Action { @Override public Response newResponse() { - return new Response(); + throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable"); + } + + @Override + public Writeable.Reader getResponseReader() { + return Response::new; } public static class Request extends BaseTasksRequest implements ToXContent { @@ -133,7 +138,7 @@ public class GetRollupJobsAction extends Action { public static class Response extends BaseTasksResponse implements Writeable, ToXContentObject { - private List jobs; + private final List jobs; public Response(List jobs) { super(Collections.emptyList(), Collections.emptyList()); @@ -145,22 +150,8 @@ public class GetRollupJobsAction extends Action { this.jobs = jobs; } - public Response() { - super(Collections.emptyList(), Collections.emptyList()); - } - public Response(StreamInput in) throws IOException { - super(Collections.emptyList(), Collections.emptyList()); - readFrom(in); - } - - public List getJobs() { - return jobs; - } - - @Override - public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); + super(in); jobs = in.readList(JobWrapper::new); } @@ -170,6 +161,10 @@ public class GetRollupJobsAction extends Action { out.writeList(jobs); } + public List getJobs() { + return jobs; + } + @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/action/StartRollupJobAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/action/StartRollupJobAction.java index d1e2f75b014..ff803b13628 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/action/StartRollupJobAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/action/StartRollupJobAction.java @@ -36,7 +36,12 @@ public class StartRollupJobAction extends Action @Override public Response newResponse() { - return new Response(); + throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable"); + } + + @Override + public Writeable.Reader getResponseReader() { + return Response::new; } public static class Request extends BaseTasksRequest implements ToXContent { @@ -101,30 +106,15 @@ public class StartRollupJobAction extends Action public static class Response extends BaseTasksResponse implements Writeable, ToXContentObject { - private boolean started; - - public Response() { - super(Collections.emptyList(), Collections.emptyList()); - } - - public Response(StreamInput in) throws IOException { - super(Collections.emptyList(), Collections.emptyList()); - readFrom(in); - } + private final boolean started; public Response(boolean started) { super(Collections.emptyList(), Collections.emptyList()); this.started = started; } - public boolean isStarted() { - return started; - } - - - @Override - public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); + public Response(StreamInput in) throws IOException { + super(in); started = in.readBoolean(); } @@ -134,6 +124,10 @@ public class StartRollupJobAction extends Action out.writeBoolean(started); } + public boolean isStarted() { + return started; + } + @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/action/StopRollupJobAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/action/StopRollupJobAction.java index 08055716dad..dadc54726b5 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/action/StopRollupJobAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/action/StopRollupJobAction.java @@ -43,7 +43,12 @@ public class StopRollupJobAction extends Action { @Override public Response newResponse() { - return new Response(); + throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable"); + } + + @Override + public Writeable.Reader getResponseReader() { + return Response::new; } public static class Request extends BaseTasksRequest implements ToXContent { @@ -138,30 +143,15 @@ public class StopRollupJobAction extends Action { public static class Response extends BaseTasksResponse implements Writeable, ToXContentObject { - private boolean stopped; - - public Response() { - super(Collections.emptyList(), Collections.emptyList()); - } - - public Response(StreamInput in) throws IOException { - super(Collections.emptyList(), Collections.emptyList()); - readFrom(in); - } + private final boolean stopped; public Response(boolean stopped) { super(Collections.emptyList(), Collections.emptyList()); this.stopped = stopped; } - public boolean isStopped() { - return stopped; - } - - - @Override - public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); + public Response(StreamInput in) throws IOException { + super(in); stopped = in.readBoolean(); } @@ -171,6 +161,10 @@ public class StopRollupJobAction extends Action { out.writeBoolean(stopped); } + public boolean isStopped() { + return stopped; + } + @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/CloseJobActionResponseTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/CloseJobActionResponseTests.java index c558a5cba0c..5d9c01dad89 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/CloseJobActionResponseTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/CloseJobActionResponseTests.java @@ -5,10 +5,11 @@ */ package org.elasticsearch.xpack.core.ml.action; -import org.elasticsearch.test.AbstractStreamableTestCase; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.test.AbstractWireSerializingTestCase; import org.elasticsearch.xpack.core.ml.action.CloseJobAction.Response; -public class CloseJobActionResponseTests extends AbstractStreamableTestCase { +public class CloseJobActionResponseTests extends AbstractWireSerializingTestCase { @Override protected Response createTestInstance() { @@ -16,7 +17,7 @@ public class CloseJobActionResponseTests extends AbstractStreamableTestCase instanceReader() { + return Response::new; } -} \ No newline at end of file +} diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/ForecastJobActionResponseTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/ForecastJobActionResponseTests.java index b6c0759bceb..1d21e2d25f5 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/ForecastJobActionResponseTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/ForecastJobActionResponseTests.java @@ -5,10 +5,11 @@ */ package org.elasticsearch.xpack.core.ml.action; -import org.elasticsearch.test.AbstractStreamableTestCase; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.test.AbstractWireSerializingTestCase; import org.elasticsearch.xpack.core.ml.action.ForecastJobAction.Response; -public class ForecastJobActionResponseTests extends AbstractStreamableTestCase { +public class ForecastJobActionResponseTests extends AbstractWireSerializingTestCase { @Override protected Response createTestInstance() { @@ -16,8 +17,7 @@ public class ForecastJobActionResponseTests extends AbstractStreamableTestCase instanceReader() { + return Response::new; } - } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/GetJobStatsActionResponseTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/GetJobStatsActionResponseTests.java index 86a5b990728..770416b6bd3 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/GetJobStatsActionResponseTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/GetJobStatsActionResponseTests.java @@ -7,9 +7,10 @@ package org.elasticsearch.xpack.core.ml.action; import org.elasticsearch.Version; import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.test.AbstractStreamableTestCase; +import org.elasticsearch.test.AbstractWireSerializingTestCase; import org.elasticsearch.xpack.core.ml.action.GetJobsStatsAction.Response; import org.elasticsearch.xpack.core.ml.action.util.QueryPage; import org.elasticsearch.xpack.core.ml.job.config.Job; @@ -27,7 +28,7 @@ import java.util.List; import static org.elasticsearch.common.unit.TimeValue.parseTimeValue; -public class GetJobStatsActionResponseTests extends AbstractStreamableTestCase { +public class GetJobStatsActionResponseTests extends AbstractWireSerializingTestCase { @Override protected Response createTestInstance() { @@ -75,8 +76,7 @@ public class GetJobStatsActionResponseTests extends AbstractStreamableTestCase instanceReader() { + return Response::new; } - } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/PersistJobActionResponseTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/PersistJobActionResponseTests.java index 746e40445af..c4cbc9021a7 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/PersistJobActionResponseTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/PersistJobActionResponseTests.java @@ -5,12 +5,13 @@ */ package org.elasticsearch.xpack.core.ml.action; -import org.elasticsearch.test.AbstractStreamableTestCase; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.test.AbstractWireSerializingTestCase; -public class PersistJobActionResponseTests extends AbstractStreamableTestCase { +public class PersistJobActionResponseTests extends AbstractWireSerializingTestCase { @Override - protected PersistJobAction.Response createBlankInstance() { - return new PersistJobAction.Response(); + protected Writeable.Reader instanceReader() { + return PersistJobAction.Response::new; } @Override diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/PostDataActionResponseTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/PostDataActionResponseTests.java index 2f1a9d2e27d..53d67098a4f 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/PostDataActionResponseTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/PostDataActionResponseTests.java @@ -5,11 +5,12 @@ */ package org.elasticsearch.xpack.core.ml.action; -import org.elasticsearch.test.AbstractStreamableTestCase; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.test.AbstractWireSerializingTestCase; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCountsTests; -public class PostDataActionResponseTests extends AbstractStreamableTestCase { +public class PostDataActionResponseTests extends AbstractWireSerializingTestCase { @Override protected PostDataAction.Response createTestInstance() { @@ -18,7 +19,7 @@ public class PostDataActionResponseTests extends AbstractStreamableTestCase instanceReader() { + return PostDataAction.Response::new; } } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/PostDataFlushResponseTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/PostDataFlushResponseTests.java index 14f067e38ad..88826fb096a 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/PostDataFlushResponseTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/PostDataFlushResponseTests.java @@ -5,11 +5,12 @@ */ package org.elasticsearch.xpack.core.ml.action; -import org.elasticsearch.test.AbstractStreamableTestCase; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.test.AbstractWireSerializingTestCase; import org.elasticsearch.xpack.core.ml.action.FlushJobAction.Response; import org.joda.time.DateTime; -public class PostDataFlushResponseTests extends AbstractStreamableTestCase { +public class PostDataFlushResponseTests extends AbstractWireSerializingTestCase { @Override protected Response createTestInstance() { @@ -17,7 +18,7 @@ public class PostDataFlushResponseTests extends AbstractStreamableTestCase instanceReader() { + return Response::new; } -} \ No newline at end of file +} diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportCloseJobAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportCloseJobAction.java index 95000665bcc..846ba8c7d50 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportCloseJobAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportCloseJobAction.java @@ -18,7 +18,6 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; -import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.AtomicArray; import org.elasticsearch.discovery.MasterNotDiscoveredException; @@ -41,7 +40,6 @@ import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; import org.elasticsearch.xpack.ml.MachineLearning; import org.elasticsearch.xpack.ml.notifications.Auditor; -import java.io.IOException; import java.util.ArrayList; import java.util.HashSet; import java.util.List; @@ -69,7 +67,7 @@ public class TransportCloseJobAction extends TransportTasksAction jobIdsToForceClose, ActionListener listener) { PersistentTasksCustomMetaData tasks = currentState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportFlushJobAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportFlushJobAction.java index 74f84d86a48..25404d17a92 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportFlushJobAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportFlushJobAction.java @@ -9,7 +9,6 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; -import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.core.ml.action.FlushJobAction; @@ -17,8 +16,6 @@ import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManage import org.elasticsearch.xpack.ml.job.process.autodetect.params.FlushJobParams; import org.elasticsearch.xpack.ml.job.process.autodetect.params.TimeRange; -import java.io.IOException; - public class TransportFlushJobAction extends TransportJobTaskAction { @Inject @@ -29,13 +26,6 @@ public class TransportFlushJobAction extends TransportJobTaskAction listener) { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportForecastJobAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportForecastJobAction.java index c6f560997e3..92cf8f46942 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportForecastJobAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportForecastJobAction.java @@ -12,7 +12,6 @@ import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; -import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; @@ -27,7 +26,6 @@ import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider; import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager; import org.elasticsearch.xpack.ml.job.process.autodetect.params.ForecastParams; -import java.io.IOException; import java.nio.file.Path; import java.util.List; import java.util.function.Consumer; @@ -50,13 +48,6 @@ public class TransportForecastJobAction extends TransportJobTaskAction listener) { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetJobsStatsAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetJobsStatsAction.java index 83fce9326b0..0489f46657e 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetJobsStatsAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetJobsStatsAction.java @@ -15,7 +15,6 @@ import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.inject.Inject; -import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.AtomicArray; import org.elasticsearch.persistent.PersistentTasksCustomMetaData; @@ -25,6 +24,7 @@ import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.core.ml.MlMetadata; import org.elasticsearch.xpack.core.ml.MlTasks; import org.elasticsearch.xpack.core.ml.action.GetJobsStatsAction; +import org.elasticsearch.xpack.core.ml.action.GetJobsStatsAction.Response.JobStats; import org.elasticsearch.xpack.core.ml.action.util.QueryPage; import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.job.config.JobState; @@ -34,7 +34,6 @@ import org.elasticsearch.xpack.core.ml.stats.ForecastStats; import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider; import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager; -import java.io.IOException; import java.time.Duration; import java.util.ArrayList; import java.util.Collections; @@ -47,7 +46,7 @@ import java.util.function.Consumer; import java.util.stream.Collectors; public class TransportGetJobsStatsAction extends TransportTasksAction> { + GetJobsStatsAction.Response, QueryPage> { private final ClusterService clusterService; private final AutodetectProcessManager processManager; @@ -57,7 +56,7 @@ public class TransportGetJobsStatsAction extends TransportTasksAction new QueryPage<>(in, JobStats::new), ThreadPool.Names.MANAGEMENT); this.clusterService = clusterService; this.processManager = processManager; this.jobResultsProvider = jobResultsProvider; @@ -75,25 +74,20 @@ public class TransportGetJobsStatsAction extends TransportTasksAction> tasks, + List> tasks, List taskOperationFailures, List failedNodeExceptions) { - List stats = new ArrayList<>(); - for (QueryPage task : tasks) { + List stats = new ArrayList<>(); + for (QueryPage task : tasks) { stats.addAll(task.results()); } return new GetJobsStatsAction.Response(taskOperationFailures, failedNodeExceptions, new QueryPage<>(stats, stats.size(), Job.RESULTS_FIELD)); } - @Override - protected QueryPage readTaskResponse(StreamInput in) throws IOException { - return new QueryPage<>(in, GetJobsStatsAction.Response.JobStats::new); - } - @Override protected void taskOperation(GetJobsStatsAction.Request request, TransportOpenJobAction.JobTask task, - ActionListener> listener) { + ActionListener> listener) { String jobId = task.getJobId(); logger.debug("Get stats for job [{}]", jobId); ClusterState state = clusterService.state(); @@ -106,7 +100,7 @@ public class TransportGetJobsStatsAction extends TransportTasksAction { - GetJobsStatsAction.Response.JobStats jobStats = new GetJobsStatsAction.Response.JobStats(jobId, stats.get().v1(), + JobStats jobStats = new JobStats(jobId, stats.get().v1(), stats.get().v2(), forecastStats, jobState, node, assignmentExplanation, openTime); listener.onResponse(new QueryPage<>(Collections.singletonList(jobStats), 1, Job.RESULTS_FIELD)); }, listener::onFailure); @@ -128,7 +122,7 @@ public class TransportGetJobsStatsAction extends TransportTasksAction jobStats = new AtomicArray<>(jobIds.size()); + AtomicArray jobStats = new AtomicArray<>(jobIds.size()); PersistentTasksCustomMetaData tasks = clusterService.state().getMetaData().custom(PersistentTasksCustomMetaData.TYPE); for (int i = 0; i < jobIds.size(); i++) { int slot = i; @@ -141,10 +135,10 @@ public class TransportGetJobsStatsAction extends TransportTasksAction results = response.getResponse().results(); + List results = response.getResponse().results(); results.addAll(jobStats.asList()); listener.onResponse(new GetJobsStatsAction.Response(response.getTaskFailures(), response.getNodeFailures(), new QueryPage<>(results, results.size(), Job.RESULTS_FIELD))); @@ -177,8 +171,8 @@ public class TransportGetJobsStatsAction extends TransportTasksAction determineNonDeletedJobIdsWithoutLiveStats(MlMetadata mlMetadata, List requestedJobIds, - List stats) { - Set excludeJobIds = stats.stream().map(GetJobsStatsAction.Response.JobStats::getJobId).collect(Collectors.toSet()); + List stats) { + Set excludeJobIds = stats.stream().map(JobStats::getJobId).collect(Collectors.toSet()); return requestedJobIds.stream().filter(jobId -> !excludeJobIds.contains(jobId) && !mlMetadata.isJobDeleting(jobId)).collect(Collectors.toList()); } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportIsolateDatafeedAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportIsolateDatafeedAction.java index 5ff43c6dc7e..07ab911ebc3 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportIsolateDatafeedAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportIsolateDatafeedAction.java @@ -14,7 +14,6 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; -import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.tasks.Task; import org.elasticsearch.transport.TransportService; @@ -22,7 +21,6 @@ import org.elasticsearch.xpack.core.ml.MlTasks; import org.elasticsearch.xpack.core.ml.action.IsolateDatafeedAction; import org.elasticsearch.xpack.ml.MachineLearning; -import java.io.IOException; import java.util.List; public class TransportIsolateDatafeedAction extends TransportTasksAction listener) { datafeedTask.isolate(); - listener.onResponse(new IsolateDatafeedAction.Response()); + listener.onResponse(new IsolateDatafeedAction.Response(false)); } - @Override - protected IsolateDatafeedAction.Response readTaskResponse(StreamInput in) throws IOException { - return new IsolateDatafeedAction.Response(in); - } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportJobTaskAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportJobTaskAction.java index 688a0d24560..81038ac0261 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportJobTaskAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportJobTaskAction.java @@ -24,7 +24,6 @@ import org.elasticsearch.xpack.ml.job.JobManager; import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager; import java.util.List; -import java.util.function.Supplier; /** * Base class that redirects a request to a node where the job task is running. @@ -39,10 +38,10 @@ public abstract class TransportJobTaskAction requestSupplier, - Supplier responseSupplier, String nodeExecutor, AutodetectProcessManager processManager) { + Writeable.Reader requestReader, Writeable.Reader responseReader, + String nodeExecutor, AutodetectProcessManager processManager) { super(actionName, clusterService, transportService, actionFilters, - requestSupplier, responseSupplier, nodeExecutor); + requestReader, responseReader, responseReader, nodeExecutor); this.processManager = processManager; } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportKillProcessAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportKillProcessAction.java index 52899a90ade..0660965598b 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportKillProcessAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportKillProcessAction.java @@ -11,7 +11,6 @@ import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; -import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.tasks.Task; import org.elasticsearch.transport.TransportService; @@ -23,8 +22,6 @@ import org.elasticsearch.xpack.ml.MachineLearning; import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager; import org.elasticsearch.xpack.ml.notifications.Auditor; -import java.io.IOException; - public class TransportKillProcessAction extends TransportJobTaskAction { private final Auditor auditor; @@ -72,9 +69,4 @@ public class TransportKillProcessAction extends TransportJobTaskAction { @Inject @@ -35,13 +32,6 @@ public class TransportPersistJobAction extends TransportJobTaskAction listener) { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPostDataAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPostDataAction.java index b87ed078664..2206f265a7c 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPostDataAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPostDataAction.java @@ -9,7 +9,6 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; -import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.index.analysis.AnalysisRegistry; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; @@ -18,7 +17,6 @@ import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManage import org.elasticsearch.xpack.ml.job.process.autodetect.params.DataLoadParams; import org.elasticsearch.xpack.ml.job.process.autodetect.params.TimeRange; -import java.io.IOException; import java.util.Optional; public class TransportPostDataAction extends TransportJobTaskAction { @@ -34,13 +32,6 @@ public class TransportPostDataAction extends TransportJobTaskAction listener) { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStopDatafeedAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStopDatafeedAction.java index a399321ffa2..0f0fdd9f54b 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStopDatafeedAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStopDatafeedAction.java @@ -17,7 +17,6 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; -import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.AtomicArray; import org.elasticsearch.discovery.MasterNotDiscoveredException; @@ -35,7 +34,6 @@ import org.elasticsearch.xpack.core.ml.job.messages.Messages; import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; import org.elasticsearch.xpack.ml.MachineLearning; -import java.io.IOException; import java.util.ArrayList; import java.util.HashSet; import java.util.List; @@ -53,8 +51,8 @@ public class TransportStopDatafeedAction extends TransportTasksAction { @Inject @@ -28,13 +25,6 @@ public class TransportUpdateProcessAction extends TransportJobTaskAction listener) { diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobTests.java index 930817b5021..c4a462935bb 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobTests.java @@ -87,7 +87,7 @@ public class DatafeedJobTests extends ESTestCase { dataDescription.setFormat(DataDescription.DataFormat.XCONTENT); postDataFuture = mock(ActionFuture.class); flushJobFuture = mock(ActionFuture.class); - flushJobResponse = new FlushJobAction.Response(); + flushJobResponse = new FlushJobAction.Response(true, new Date()); delayedDataDetector = mock(DelayedDataDetector.class); when(delayedDataDetector.getWindow()).thenReturn(DatafeedJob.MISSING_DATA_CHECK_INTERVAL_MS); currentTime = 0; diff --git a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/action/TransportDeleteRollupJobAction.java b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/action/TransportDeleteRollupJobAction.java index 411ccd15058..f5a484a8e27 100644 --- a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/action/TransportDeleteRollupJobAction.java +++ b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/action/TransportDeleteRollupJobAction.java @@ -16,7 +16,6 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; -import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.discovery.MasterNotDiscoveredException; import org.elasticsearch.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.tasks.Task; @@ -27,7 +26,6 @@ import org.elasticsearch.xpack.core.rollup.action.DeleteRollupJobAction; import org.elasticsearch.xpack.core.rollup.job.RollupJobStatus; import org.elasticsearch.xpack.rollup.job.RollupJobTask; -import java.io.IOException; import java.util.List; public class TransportDeleteRollupJobAction extends TransportTasksAction