Replace Streamable w/ Writeable in BaseTasksRequest and subclasses (#35854)

* Replace Streamable w/ Writeable in BaseTasksRequest and subclasses

This commit replaces usages of Streamable with Writeable for the
BaseTasksRequest / TransportTasksAction classes and subclasses of
these classes.

Relates to #34389
This commit is contained in:
Martijn van Groningen 2018-12-03 08:04:29 +01:00 committed by GitHub
parent 9c0a429709
commit 43773a32a4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
41 changed files with 392 additions and 390 deletions

View File

@ -39,6 +39,20 @@ public class RethrottleRequest extends BaseTasksRequest<RethrottleRequest> {
*/
private Float requestsPerSecond;
public RethrottleRequest() {
}
public RethrottleRequest(StreamInput in) throws IOException {
super(in);
this.requestsPerSecond = in.readFloat();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeFloat(requestsPerSecond);
}
/**
* The throttle to apply to all matching requests in sub-requests per second. 0 means set no throttle and that is the default.
*/
@ -80,15 +94,4 @@ public class RethrottleRequest extends BaseTasksRequest<RethrottleRequest> {
return validationException;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
requestsPerSecond = in.readFloat();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeFloat(requestsPerSecond);
}
}

View File

@ -184,9 +184,7 @@ public class RoundTripTests extends ESTestCase {
} else {
request.setTaskId(new TaskId(randomAlphaOfLength(5), randomLong()));
}
RethrottleRequest tripped = new RethrottleRequest();
// We use readFrom here because Rethrottle does not support the Writeable.Reader interface
tripped.readFrom(toInputByteStream(request));
RethrottleRequest tripped = new RethrottleRequest(toInputByteStream(request));
assertEquals(request.getRequestsPerSecond(), tripped.getRequestsPerSecond(), 0.00001);
assertArrayEquals(request.getActions(), tripped.getActions());
assertEquals(request.getTaskId(), tripped.getTaskId());

View File

@ -36,10 +36,11 @@ public class CancelTasksRequest extends BaseTasksRequest<CancelTasksRequest> {
private String reason = DEFAULT_REASON;
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
reason = in.readString();
public CancelTasksRequest() {}
public CancelTasksRequest(StreamInput in) throws IOException {
super(in);
this.reason = in.readString();
}
@Override

View File

@ -65,8 +65,8 @@ public class TransportCancelTasksAction extends TransportTasksAction<Cancellable
public TransportCancelTasksAction(ClusterService clusterService, TransportService transportService, ActionFilters actionFilters) {
super(CancelTasksAction.NAME, clusterService, transportService, actionFilters,
CancelTasksRequest::new, CancelTasksResponse::new, ThreadPool.Names.MANAGEMENT);
transportService.registerRequestHandler(BAN_PARENT_ACTION_NAME, BanParentTaskRequest::new, ThreadPool.Names.SAME, new
BanParentRequestHandler());
transportService.registerRequestHandler(BAN_PARENT_ACTION_NAME, ThreadPool.Names.SAME, BanParentTaskRequest::new,
new BanParentRequestHandler());
}
@Override
@ -233,11 +233,9 @@ public class TransportCancelTasksAction extends TransportTasksAction<Cancellable
private static class BanParentTaskRequest extends TransportRequest {
private TaskId parentTaskId;
private boolean ban;
private String reason;
private final TaskId parentTaskId;
private final boolean ban;
private final String reason;
static BanParentTaskRequest createSetBanParentTaskRequest(TaskId parentTaskId, String reason) {
return new BanParentTaskRequest(parentTaskId, reason);
@ -256,19 +254,14 @@ public class TransportCancelTasksAction extends TransportTasksAction<Cancellable
private BanParentTaskRequest(TaskId parentTaskId) {
this.parentTaskId = parentTaskId;
this.ban = false;
this.reason = null;
}
BanParentTaskRequest() {
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
private BanParentTaskRequest(StreamInput in) throws IOException {
super(in);
parentTaskId = TaskId.readFromStream(in);
ban = in.readBoolean();
if (ban) {
reason = in.readString();
}
reason = ban ? in.readString() : null;
}
@Override

View File

@ -33,6 +33,22 @@ public class ListTasksRequest extends BaseTasksRequest<ListTasksRequest> {
private boolean detailed = false;
private boolean waitForCompletion = false;
public ListTasksRequest() {
}
public ListTasksRequest(StreamInput in) throws IOException {
super(in);
detailed = in.readBoolean();
waitForCompletion = in.readBoolean();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeBoolean(detailed);
out.writeBoolean(waitForCompletion);
}
/**
* Should the detailed task information be returned.
*/
@ -63,17 +79,4 @@ public class ListTasksRequest extends BaseTasksRequest<ListTasksRequest> {
return this;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
detailed = in.readBoolean();
waitForCompletion = in.readBoolean();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeBoolean(detailed);
out.writeBoolean(waitForCompletion);
}
}

View File

@ -52,9 +52,30 @@ public class BaseTasksRequest<Request extends BaseTasksRequest<Request>> extends
private TaskId taskId = TaskId.EMPTY_TASK_ID;
// NOTE: This constructor is only needed, because the setters in this class,
// otherwise it can be removed and above fields can be made final.
public BaseTasksRequest() {
}
protected BaseTasksRequest(StreamInput in) throws IOException {
super(in);
taskId = TaskId.readFromStream(in);
parentTaskId = TaskId.readFromStream(in);
nodes = in.readStringArray();
actions = in.readStringArray();
timeout = in.readOptionalTimeValue();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
taskId.writeTo(out);
parentTaskId.writeTo(out);
out.writeStringArrayNullable(nodes);
out.writeStringArrayNullable(actions);
out.writeOptionalTimeValue(timeout);
}
@Override
public ActionRequestValidationException validate() {
ActionRequestValidationException validationException = null;
@ -137,26 +158,6 @@ public class BaseTasksRequest<Request extends BaseTasksRequest<Request>> extends
return (Request) this;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
taskId = TaskId.readFromStream(in);
parentTaskId = TaskId.readFromStream(in);
nodes = in.readStringArray();
actions = in.readStringArray();
timeout = in.readOptionalTimeValue();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
taskId.writeTo(out);
parentTaskId.writeTo(out);
out.writeStringArrayNullable(nodes);
out.writeStringArrayNullable(actions);
out.writeOptionalTimeValue(timeout);
}
public boolean match(Task task) {
if (getActions() != null && getActions().length > 0 && Regex.simpleMatch(getActions(), task.getAction()) == false) {
return false;

View File

@ -71,13 +71,13 @@ public abstract class TransportTasksAction<
protected final ClusterService clusterService;
protected final TransportService transportService;
protected final Supplier<TasksRequest> requestSupplier;
protected final Writeable.Reader<TasksRequest> requestSupplier;
protected final Supplier<TasksResponse> responseSupplier;
protected final String transportNodeAction;
protected TransportTasksAction(String actionName, ClusterService clusterService, TransportService transportService,
ActionFilters actionFilters, Supplier<TasksRequest> requestSupplier,
ActionFilters actionFilters, Writeable.Reader<TasksRequest> requestSupplier,
Supplier<TasksResponse> responseSupplier, String nodeExecutor) {
super(actionName, transportService, actionFilters, requestSupplier);
this.clusterService = clusterService;
@ -86,7 +86,7 @@ public abstract class TransportTasksAction<
this.requestSupplier = requestSupplier;
this.responseSupplier = responseSupplier;
transportService.registerRequestHandler(transportNodeAction, NodeTaskRequest::new, nodeExecutor, new NodeTransportHandler());
transportService.registerRequestHandler(transportNodeAction, nodeExecutor, NodeTaskRequest::new, new NodeTransportHandler());
}
@Override
@ -362,20 +362,9 @@ public abstract class TransportTasksAction<
private class NodeTaskRequest extends TransportRequest {
private TasksRequest tasksRequest;
protected NodeTaskRequest() {
super();
}
protected NodeTaskRequest(TasksRequest tasksRequest) {
super();
this.tasksRequest = tasksRequest;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
tasksRequest = requestSupplier.get();
tasksRequest.readFrom(in);
protected NodeTaskRequest(StreamInput in) throws IOException {
super(in);
this.tasksRequest = requestSupplier.read(in);
}
@Override
@ -383,6 +372,12 @@ public abstract class TransportTasksAction<
super.writeTo(out);
tasksRequest.writeTo(out);
}
protected NodeTaskRequest(TasksRequest tasksRequest) {
super();
this.tasksRequest = tasksRequest;
}
}
private class NodeTasksResponse extends TransportResponse {

View File

@ -396,6 +396,13 @@ public class TestTaskPlugin extends Plugin implements ActionPlugin, NetworkPlugi
public static class UnblockTestTasksRequest extends BaseTasksRequest<UnblockTestTasksRequest> {
UnblockTestTasksRequest() {}
UnblockTestTasksRequest(StreamInput in) throws IOException {
super(in);
}
@Override
public boolean match(Task task) {
return task instanceof TestTask && super.match(task);

View File

@ -191,6 +191,11 @@ public class TransportTasksActionTests extends TaskManagerTestCase {
static class TestTasksRequest extends BaseTasksRequest<TestTasksRequest> {
TestTasksRequest(StreamInput in) throws IOException {
super(in);
}
TestTasksRequest() {}
}
static class TestTasksResponse extends BaseTasksResponse {

View File

@ -447,9 +447,8 @@ public class TestPersistentTasksPlugin extends Plugin implements ActionPlugin, P
public TestTasksRequest() {
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
public TestTasksRequest(StreamInput in) throws IOException {
super(in);
operation = in.readOptionalString();
}

View File

@ -84,7 +84,6 @@ public class TransportFollowStatsAction extends TransportTasksAction<
protected void processTasks(final FollowStatsAction.StatsRequest request, final Consumer<ShardFollowNodeTask> operation) {
final ClusterState state = clusterService.state();
final PersistentTasksCustomMetaData persistentTasksMetaData = state.metaData().custom(PersistentTasksCustomMetaData.TYPE);
if (persistentTasksMetaData == null) {
return;
}

View File

@ -5,14 +5,15 @@
*/
package org.elasticsearch.xpack.ccr.action;
import org.elasticsearch.test.AbstractStreamableTestCase;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.test.AbstractWireSerializingTestCase;
import org.elasticsearch.xpack.core.ccr.action.FollowStatsAction;
public class StatsRequestTests extends AbstractStreamableTestCase<FollowStatsAction.StatsRequest> {
public class StatsRequestTests extends AbstractWireSerializingTestCase<FollowStatsAction.StatsRequest> {
@Override
protected FollowStatsAction.StatsRequest createBlankInstance() {
return new FollowStatsAction.StatsRequest();
protected Writeable.Reader<FollowStatsAction.StatsRequest> instanceReader() {
return FollowStatsAction.StatsRequest::new;
}
@Override

View File

@ -129,6 +129,19 @@ public class FollowStatsAction extends Action<FollowStatsAction.StatsResponses>
private String[] indices;
public StatsRequest() {}
public StatsRequest(StreamInput in) throws IOException {
super(in);
indices = in.readOptionalStringArray();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeOptionalStringArray(indices);
}
@Override
public String[] indices() {
return indices;
@ -161,18 +174,6 @@ public class FollowStatsAction extends Action<FollowStatsAction.StatsResponses>
return null;
}
@Override
public void readFrom(final StreamInput in) throws IOException {
super.readFrom(in);
indices = in.readOptionalStringArray();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeOptionalStringArray(indices);
}
@Override
public boolean equals(Object o) {
if (this == o) return true;

View File

@ -79,6 +79,31 @@ public class CloseJobAction extends Action<CloseJobAction.Response> {
openJobIds = new String[] {};
}
public Request(StreamInput in) throws IOException {
super(in);
jobId = in.readString();
timeout = in.readTimeValue();
force = in.readBoolean();
openJobIds = in.readStringArray();
local = in.readBoolean();
if (in.getVersion().onOrAfter(Version.V_6_1_0)) {
allowNoJobs = in.readBoolean();
}
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(jobId);
out.writeTimeValue(timeout);
out.writeBoolean(force);
out.writeStringArray(openJobIds);
out.writeBoolean(local);
if (out.getVersion().onOrAfter(Version.V_6_1_0)) {
out.writeBoolean(allowNoJobs);
}
}
public Request(String jobId) {
this();
this.jobId = jobId;
@ -128,32 +153,6 @@ public class CloseJobAction extends Action<CloseJobAction.Response> {
this.openJobIds = openJobIds;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
jobId = in.readString();
timeout = in.readTimeValue();
force = in.readBoolean();
openJobIds = in.readStringArray();
local = in.readBoolean();
if (in.getVersion().onOrAfter(Version.V_6_1_0)) {
allowNoJobs = in.readBoolean();
}
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(jobId);
out.writeTimeValue(timeout);
out.writeBoolean(force);
out.writeStringArray(openJobIds);
out.writeBoolean(local);
if (out.getVersion().onOrAfter(Version.V_6_1_0)) {
out.writeBoolean(allowNoJobs);
}
}
@Override
public boolean match(Task task) {
for (String id : openJobIds) {

View File

@ -75,6 +75,25 @@ public class FlushJobAction extends Action<FlushJobAction.Response> {
public Request() {
}
public Request(StreamInput in) throws IOException {
super(in);
calcInterim = in.readBoolean();
start = in.readOptionalString();
end = in.readOptionalString();
advanceTime = in.readOptionalString();
skipTime = in.readOptionalString();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeBoolean(calcInterim);
out.writeOptionalString(start);
out.writeOptionalString(end);
out.writeOptionalString(advanceTime);
out.writeOptionalString(skipTime);
}
public Request(String jobId) {
super(jobId);
}
@ -119,26 +138,6 @@ public class FlushJobAction extends Action<FlushJobAction.Response> {
this.skipTime = skipTime;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
calcInterim = in.readBoolean();
start = in.readOptionalString();
end = in.readOptionalString();
advanceTime = in.readOptionalString();
skipTime = in.readOptionalString();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeBoolean(calcInterim);
out.writeOptionalString(start);
out.writeOptionalString(end);
out.writeOptionalString(advanceTime);
out.writeOptionalString(skipTime);
}
@Override
public int hashCode() {
return Objects.hash(jobId, calcInterim, start, end, advanceTime, skipTime);

View File

@ -68,6 +68,19 @@ public class ForecastJobAction extends Action<ForecastJobAction.Response> {
public Request() {
}
public Request(StreamInput in) throws IOException {
super(in);
this.duration = in.readOptionalTimeValue();
this.expiresIn = in.readOptionalTimeValue();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeOptionalTimeValue(duration);
out.writeOptionalTimeValue(expiresIn);
}
public Request(String jobId) {
super(jobId);
}
@ -108,20 +121,6 @@ public class ForecastJobAction extends Action<ForecastJobAction.Response> {
}
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
this.duration = in.readOptionalTimeValue();
this.expiresIn = in.readOptionalTimeValue();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeOptionalTimeValue(duration);
out.writeOptionalTimeValue(expiresIn);
}
@Override
public int hashCode() {
return Objects.hash(jobId, duration, expiresIn);

View File

@ -76,6 +76,25 @@ public class GetJobsStatsAction extends Action<GetJobsStatsAction.Response> {
public Request() {}
public Request(StreamInput in) throws IOException {
super(in);
jobId = in.readString();
expandedJobsIds = in.readList(StreamInput::readString);
if (in.getVersion().onOrAfter(Version.V_6_1_0)) {
allowNoJobs = in.readBoolean();
}
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(jobId);
out.writeStringList(expandedJobsIds);
if (out.getVersion().onOrAfter(Version.V_6_1_0)) {
out.writeBoolean(allowNoJobs);
}
}
public List<String> getExpandedJobsIds() { return expandedJobsIds; }
public void setExpandedJobsIds(List<String> expandedJobsIds) { this.expandedJobsIds = expandedJobsIds; }
@ -102,26 +121,6 @@ public class GetJobsStatsAction extends Action<GetJobsStatsAction.Response> {
return null;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
jobId = in.readString();
expandedJobsIds = in.readList(StreamInput::readString);
if (in.getVersion().onOrAfter(Version.V_6_1_0)) {
allowNoJobs = in.readBoolean();
}
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(jobId);
out.writeStringList(expandedJobsIds);
if (out.getVersion().onOrAfter(Version.V_6_1_0)) {
out.writeBoolean(allowNoJobs);
}
}
@Override
public int hashCode() {
return Objects.hash(jobId, allowNoJobs);

View File

@ -78,6 +78,17 @@ public class IsolateDatafeedAction extends Action<IsolateDatafeedAction.Response
public Request() {
}
public Request(StreamInput in) throws IOException {
super(in);
datafeedId = in.readString();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(datafeedId);
}
public String getDatafeedId() {
return datafeedId;
}
@ -93,18 +104,6 @@ public class IsolateDatafeedAction extends Action<IsolateDatafeedAction.Response
return null;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
datafeedId = in.readString();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(datafeedId);
}
@Override
public int hashCode() {
return Objects.hash(datafeedId);

View File

@ -21,6 +21,17 @@ public class JobTaskRequest<R extends JobTaskRequest<R>> extends BaseTasksReques
JobTaskRequest() {
}
JobTaskRequest(StreamInput in) throws IOException {
super(in);
this.jobId = in.readString();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(jobId);
}
JobTaskRequest(String jobId) {
this.jobId = ExceptionsHelper.requireNonNull(jobId, Job.ID.getPreferredName());
}
@ -29,18 +40,6 @@ public class JobTaskRequest<R extends JobTaskRequest<R>> extends BaseTasksReques
return jobId;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
jobId = in.readString();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(jobId);
}
@Override
public boolean match(Task task) {
return OpenJobAction.JobTaskMatcher.match(task, jobId);

View File

@ -46,6 +46,10 @@ public class KillProcessAction extends Action<KillProcessAction.Response> {
public Request() {
super();
}
public Request(StreamInput in) throws IOException {
super(in);
}
}
public static class Response extends BaseTasksResponse implements Writeable {

View File

@ -35,6 +35,19 @@ public class PersistJobAction extends Action<PersistJobAction.Response> {
public Request() {
}
public Request(StreamInput in) throws IOException {
super(in);
// isBackground for fwc
in.readBoolean();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
// isBackground for fwc
out.writeBoolean(true);
}
public Request(String jobId) {
super(jobId);
}
@ -47,20 +60,6 @@ public class PersistJobAction extends Action<PersistJobAction.Response> {
return !isBackGround();
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
// isBackground for fwc
in.readBoolean();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
// isBackground for fwc
out.writeBoolean(true);
}
@Override
public int hashCode() {
return Objects.hash(jobId, isBackGround());

View File

@ -126,6 +126,31 @@ public class PostDataAction extends Action<PostDataAction.Response> {
public Request() {
}
public Request(StreamInput in) throws IOException {
super(in);
resetStart = in.readOptionalString();
resetEnd = in.readOptionalString();
dataDescription = in.readOptionalWriteable(DataDescription::new);
content = in.readBytesReference();
if (in.readBoolean()) {
xContentType = in.readEnum(XContentType.class);
}
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeOptionalString(resetStart);
out.writeOptionalString(resetEnd);
out.writeOptionalWriteable(dataDescription);
out.writeBytesReference(content);
boolean hasXContentType = xContentType != null;
out.writeBoolean(hasXContentType);
if (hasXContentType) {
out.writeEnum(xContentType);
}
}
public Request(String jobId) {
super(jobId);
}
@ -165,32 +190,6 @@ public class PostDataAction extends Action<PostDataAction.Response> {
this.xContentType = xContentType;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
resetStart = in.readOptionalString();
resetEnd = in.readOptionalString();
dataDescription = in.readOptionalWriteable(DataDescription::new);
content = in.readBytesReference();
if (in.readBoolean()) {
xContentType = in.readEnum(XContentType.class);
}
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeOptionalString(resetStart);
out.writeOptionalString(resetEnd);
out.writeOptionalWriteable(dataDescription);
out.writeBytesReference(content);
boolean hasXContentType = xContentType != null;
out.writeBoolean(hasXContentType);
if (hasXContentType) {
out.writeEnum(xContentType);
}
}
@Override
public int hashCode() {
// content stream not included

View File

@ -85,6 +85,17 @@ public class StopDatafeedAction extends Action<StopDatafeedAction.Response> {
public Request() {
}
public Request(StreamInput in) throws IOException {
super(in);
datafeedId = in.readString();
resolvedStartedDatafeedIds = in.readStringArray();
stopTimeout = in.readTimeValue();
force = in.readBoolean();
if (in.getVersion().onOrAfter(Version.V_6_1_0)) {
allowNoDatafeeds = in.readBoolean();
}
}
public String getDatafeedId() {
return datafeedId;
}
@ -137,18 +148,6 @@ public class StopDatafeedAction extends Action<StopDatafeedAction.Response> {
return null;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
datafeedId = in.readString();
resolvedStartedDatafeedIds = in.readStringArray();
stopTimeout = in.readTimeValue();
force = in.readBoolean();
if (in.getVersion().onOrAfter(Version.V_6_1_0)) {
allowNoDatafeeds = in.readBoolean();
}
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);

View File

@ -109,7 +109,33 @@ public class UpdateProcessAction extends Action<UpdateProcessAction.Response> {
private MlFilter filter;
private boolean updateScheduledEvents = false;
public Request() {
public Request() {}
public Request(StreamInput in) throws IOException {
super(in);
modelPlotConfig = in.readOptionalWriteable(ModelPlotConfig::new);
if (in.readBoolean()) {
detectorUpdates = in.readList(JobUpdate.DetectorUpdate::new);
}
if (in.getVersion().onOrAfter(Version.V_6_2_0)) {
filter = in.readOptionalWriteable(MlFilter::new);
updateScheduledEvents = in.readBoolean();
}
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeOptionalWriteable(modelPlotConfig);
boolean hasDetectorUpdates = detectorUpdates != null;
out.writeBoolean(hasDetectorUpdates);
if (hasDetectorUpdates) {
out.writeList(detectorUpdates);
}
if (out.getVersion().onOrAfter(Version.V_6_2_0)) {
out.writeOptionalWriteable(filter);
out.writeBoolean(updateScheduledEvents);
}
}
public Request(String jobId, ModelPlotConfig modelPlotConfig, List<JobUpdate.DetectorUpdate> detectorUpdates, MlFilter filter,
@ -137,34 +163,6 @@ public class UpdateProcessAction extends Action<UpdateProcessAction.Response> {
return updateScheduledEvents;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
modelPlotConfig = in.readOptionalWriteable(ModelPlotConfig::new);
if (in.readBoolean()) {
detectorUpdates = in.readList(JobUpdate.DetectorUpdate::new);
}
if (in.getVersion().onOrAfter(Version.V_6_2_0)) {
filter = in.readOptionalWriteable(MlFilter::new);
updateScheduledEvents = in.readBoolean();
}
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeOptionalWriteable(modelPlotConfig);
boolean hasDetectorUpdates = detectorUpdates != null;
out.writeBoolean(hasDetectorUpdates);
if (hasDetectorUpdates) {
out.writeList(detectorUpdates);
}
if (out.getVersion().onOrAfter(Version.V_6_2_0)) {
out.writeOptionalWriteable(filter);
out.writeBoolean(updateScheduledEvents);
}
}
@Override
public int hashCode() {
return Objects.hash(getJobId(), modelPlotConfig, detectorUpdates, filter, updateScheduledEvents);

View File

@ -52,6 +52,11 @@ public class DeleteRollupJobAction extends Action<DeleteRollupJobAction.Response
public Request() {}
public Request(StreamInput in) throws IOException {
super(in);
id = in.readString();
}
@Override
public boolean match(Task task) {
return task.getDescription().equals(RollupField.NAME + "_" + id);
@ -61,12 +66,6 @@ public class DeleteRollupJobAction extends Action<DeleteRollupJobAction.Response
return id;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
id = in.readString();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);

View File

@ -66,6 +66,20 @@ public class GetRollupJobsAction extends Action<GetRollupJobsAction.Response> {
public Request() {}
public Request(StreamInput in) throws IOException {
super(in);
id = in.readString();
if (Strings.isNullOrEmpty(id) || id.equals("*")) {
this.id = MetaData.ALL;
}
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(id);
}
@Override
public boolean match(Task task) {
// If we are retrieving all the jobs, the task description just needs to start
@ -81,21 +95,6 @@ public class GetRollupJobsAction extends Action<GetRollupJobsAction.Response> {
return id;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
id = in.readString();
if (Strings.isNullOrEmpty(id) || id.equals("*")) {
this.id = MetaData.ALL;
}
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(id);
}
@Override
public ActionRequestValidationException validate() {
return null;

View File

@ -48,13 +48,8 @@ public class StartRollupJobAction extends Action<StartRollupJobAction.Response>
public Request() {}
public String getId() {
return id;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
public Request(StreamInput in) throws IOException {
super(in);
id = in.readString();
}
@ -64,6 +59,10 @@ public class StartRollupJobAction extends Action<StartRollupJobAction.Response>
out.writeString(id);
}
public String getId() {
return id;
}
@Override
public ActionRequestValidationException validate() {
return null;

View File

@ -63,21 +63,8 @@ public class StopRollupJobAction extends Action<StopRollupJobAction.Response> {
public Request() {}
public String getId() {
return id;
}
public TimeValue timeout() {
return timeout;
}
public boolean waitForCompletion() {
return waitForCompletion;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
public Request(StreamInput in) throws IOException {
super(in);
id = in.readString();
if (in.getVersion().onOrAfter(Version.V_6_6_0)) {
waitForCompletion = in.readBoolean();
@ -95,6 +82,18 @@ public class StopRollupJobAction extends Action<StopRollupJobAction.Response> {
}
}
public String getId() {
return id;
}
public TimeValue timeout() {
return timeout;
}
public boolean waitForCompletion() {
return waitForCompletion;
}
@Override
public ActionRequestValidationException validate() {
return null;

View File

@ -5,12 +5,13 @@
*/
package org.elasticsearch.xpack.core.ml.action;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.test.AbstractStreamableXContentTestCase;
import org.elasticsearch.test.AbstractSerializingTestCase;
import org.elasticsearch.xpack.core.ml.action.CloseJobAction.Request;
public class CloseJobActionRequestTests extends AbstractStreamableXContentTestCase<Request> {
public class CloseJobActionRequestTests extends AbstractSerializingTestCase<Request> {
@Override
protected Request createTestInstance() {
@ -33,8 +34,8 @@ public class CloseJobActionRequestTests extends AbstractStreamableXContentTestCa
}
@Override
protected Request createBlankInstance() {
return new Request();
protected Writeable.Reader<Request> instanceReader() {
return Request::new;
}
@Override

View File

@ -5,14 +5,15 @@
*/
package org.elasticsearch.xpack.core.ml.action;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.test.AbstractStreamableXContentTestCase;
import org.elasticsearch.test.AbstractSerializingTestCase;
import org.elasticsearch.xpack.core.ml.action.ForecastJobAction.Request;
import static org.hamcrest.Matchers.equalTo;
public class ForecastJobActionRequestTests extends AbstractStreamableXContentTestCase<Request> {
public class ForecastJobActionRequestTests extends AbstractSerializingTestCase<Request> {
@Override
protected Request doParseInstance(XContentParser parser) {
@ -37,8 +38,8 @@ public class ForecastJobActionRequestTests extends AbstractStreamableXContentTes
}
@Override
protected Request createBlankInstance() {
return new Request();
protected Writeable.Reader<Request> instanceReader() {
return Request::new;
}
public void testSetDuration_GivenZero() {

View File

@ -6,14 +6,15 @@
package org.elasticsearch.xpack.core.ml.action;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.test.AbstractStreamableTestCase;
import org.elasticsearch.test.AbstractWireSerializingTestCase;
import org.elasticsearch.xpack.core.ml.action.GetJobsStatsAction.Request;
import static org.hamcrest.Matchers.is;
import static org.mockito.Mockito.mock;
public class GetJobStatsActionRequestTests extends AbstractStreamableTestCase<Request> {
public class GetJobStatsActionRequestTests extends AbstractWireSerializingTestCase<Request> {
@Override
protected Request createTestInstance() {
@ -23,8 +24,8 @@ public class GetJobStatsActionRequestTests extends AbstractStreamableTestCase<Re
}
@Override
protected Request createBlankInstance() {
return new Request();
protected Writeable.Reader<Request> instanceReader() {
return Request::new;
}
public void testMatch_GivenAll_FailsForNonJobTasks() {

View File

@ -5,12 +5,14 @@
*/
package org.elasticsearch.xpack.core.ml.action;
import org.elasticsearch.test.AbstractStreamableTestCase;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.test.AbstractWireSerializingTestCase;
public class PersistJobActionRequestTests extends AbstractWireSerializingTestCase<PersistJobAction.Request> {
public class PersistJobActionRequestTests extends AbstractStreamableTestCase<PersistJobAction.Request> {
@Override
protected PersistJobAction.Request createBlankInstance() {
return new PersistJobAction.Request();
protected Writeable.Reader<PersistJobAction.Request> instanceReader() {
return PersistJobAction.Request::new;
}
@Override

View File

@ -6,12 +6,13 @@
package org.elasticsearch.xpack.core.ml.action;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.test.AbstractStreamableTestCase;
import org.elasticsearch.test.AbstractWireSerializingTestCase;
import org.elasticsearch.xpack.core.ml.job.config.DataDescription;
import org.elasticsearch.xpack.core.ml.job.config.DataDescription.DataFormat;
public class PostDataActionRequestTests extends AbstractStreamableTestCase<PostDataAction.Request> {
public class PostDataActionRequestTests extends AbstractWireSerializingTestCase<PostDataAction.Request> {
@Override
protected PostDataAction.Request createTestInstance() {
PostDataAction.Request request = new PostDataAction.Request(randomAlphaOfLengthBetween(1, 20));
@ -33,7 +34,7 @@ public class PostDataActionRequestTests extends AbstractStreamableTestCase<PostD
}
@Override
protected PostDataAction.Request createBlankInstance() {
return new PostDataAction.Request();
protected Writeable.Reader<PostDataAction.Request> instanceReader() {
return PostDataAction.Request::new;
}
}

View File

@ -5,10 +5,11 @@
*/
package org.elasticsearch.xpack.core.ml.action;
import org.elasticsearch.test.AbstractStreamableTestCase;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.test.AbstractWireSerializingTestCase;
import org.elasticsearch.xpack.core.ml.action.FlushJobAction.Request;
public class PostDataFlushRequestTests extends AbstractStreamableTestCase<Request> {
public class PostDataFlushRequestTests extends AbstractWireSerializingTestCase<Request> {
@Override
protected Request createTestInstance() {
@ -27,11 +28,11 @@ public class PostDataFlushRequestTests extends AbstractStreamableTestCase<Reques
}
@Override
protected Request createBlankInstance() {
return new Request();
protected Writeable.Reader<Request> instanceReader() {
return Request::new;
}
public void testNullJobIdThrows() {
expectThrows(IllegalArgumentException.class, () -> new Request(null));
expectThrows(IllegalArgumentException.class, () -> new Request((String) null));
}
}
}

View File

@ -5,12 +5,13 @@
*/
package org.elasticsearch.xpack.core.ml.action;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.test.AbstractStreamableXContentTestCase;
import org.elasticsearch.test.AbstractSerializingTestCase;
import org.elasticsearch.xpack.core.ml.action.StopDatafeedAction.Request;
public class StopDatafeedActionRequestTests extends AbstractStreamableXContentTestCase<Request> {
public class StopDatafeedActionRequestTests extends AbstractSerializingTestCase<Request> {
@Override
protected Request createTestInstance() {
@ -36,8 +37,8 @@ public class StopDatafeedActionRequestTests extends AbstractStreamableXContentTe
}
@Override
protected Request createBlankInstance() {
return new Request();
protected Writeable.Reader<Request> instanceReader() {
return Request::new;
}
@Override

View File

@ -5,7 +5,8 @@
*/
package org.elasticsearch.xpack.core.ml.action;
import org.elasticsearch.test.AbstractStreamableTestCase;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.test.AbstractWireSerializingTestCase;
import org.elasticsearch.xpack.core.ml.job.config.JobUpdate;
import org.elasticsearch.xpack.core.ml.job.config.MlFilter;
import org.elasticsearch.xpack.core.ml.job.config.MlFilterTests;
@ -14,8 +15,7 @@ import org.elasticsearch.xpack.core.ml.job.config.ModelPlotConfig;
import java.util.ArrayList;
import java.util.List;
public class UpdateProcessActionRequestTests extends AbstractStreamableTestCase<UpdateProcessAction.Request> {
public class UpdateProcessActionRequestTests extends AbstractWireSerializingTestCase<UpdateProcessAction.Request> {
@Override
protected UpdateProcessAction.Request createTestInstance() {
@ -39,7 +39,7 @@ public class UpdateProcessActionRequestTests extends AbstractStreamableTestCase<
}
@Override
protected UpdateProcessAction.Request createBlankInstance() {
return new UpdateProcessAction.Request();
protected Writeable.Reader<UpdateProcessAction.Request> instanceReader() {
return UpdateProcessAction.Request::new;
}
}
}

View File

@ -39,7 +39,7 @@ public abstract class TransportJobTaskAction<Request extends JobTaskRequest<Requ
TransportJobTaskAction(String actionName, ClusterService clusterService,
TransportService transportService, ActionFilters actionFilters,
Supplier<Request> requestSupplier,
Writeable.Reader<Request> requestSupplier,
Supplier<Response> responseSupplier, String nodeExecutor, AutodetectProcessManager processManager) {
super(actionName, clusterService, transportService, actionFilters,
requestSupplier, responseSupplier, nodeExecutor);

View File

@ -5,12 +5,11 @@
*/
package org.elasticsearch.xpack.rollup.action;
import org.elasticsearch.test.AbstractStreamableTestCase;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.test.AbstractWireSerializingTestCase;
import org.elasticsearch.xpack.core.rollup.action.DeleteRollupJobAction.Request;
public class DeleteJobActionRequestTests extends AbstractStreamableTestCase<Request> {
public class DeleteJobActionRequestTests extends AbstractWireSerializingTestCase<Request> {
@Override
protected Request createTestInstance() {
@ -18,8 +17,8 @@ public class DeleteJobActionRequestTests extends AbstractStreamableTestCase<Requ
}
@Override
protected Request createBlankInstance() {
return new Request();
protected Writeable.Reader<Request> instanceReader() {
return Request::new;
}
}

View File

@ -5,12 +5,12 @@
*/
package org.elasticsearch.xpack.rollup.action;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.test.AbstractStreamableTestCase;
import org.elasticsearch.test.AbstractWireSerializingTestCase;
import org.elasticsearch.xpack.core.rollup.ConfigTestHelpers;
import org.elasticsearch.xpack.core.rollup.action.GetRollupJobsAction;
import org.elasticsearch.xpack.core.rollup.action.GetRollupJobsAction.Request;
@ -20,8 +20,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
public class GetJobsActionRequestTests extends AbstractStreamableTestCase<Request> {
public class GetJobsActionRequestTests extends AbstractWireSerializingTestCase<Request> {
@Override
protected Request createTestInstance() {
@ -32,8 +31,8 @@ public class GetJobsActionRequestTests extends AbstractStreamableTestCase<Reques
}
@Override
protected Request createBlankInstance() {
return new Request();
protected Writeable.Reader<Request> instanceReader() {
return Request::new;
}
public void testStateCheckNoPersistentTasks() {

View File

@ -5,10 +5,11 @@
*/
package org.elasticsearch.xpack.rollup.action;
import org.elasticsearch.test.AbstractStreamableTestCase;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.test.AbstractWireSerializingTestCase;
import org.elasticsearch.xpack.core.rollup.action.StartRollupJobAction.Request;
public class StartJobActionRequestTests extends AbstractStreamableTestCase<Request> {
public class StartJobActionRequestTests extends AbstractWireSerializingTestCase<Request> {
@Override
protected Request createTestInstance() {
@ -16,8 +17,8 @@ public class StartJobActionRequestTests extends AbstractStreamableTestCase<Reque
}
@Override
protected Request createBlankInstance() {
return new Request();
protected Writeable.Reader<Request> instanceReader() {
return Request::new;
}
}

View File

@ -5,10 +5,11 @@
*/
package org.elasticsearch.xpack.rollup.action;
import org.elasticsearch.test.AbstractStreamableTestCase;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.test.AbstractWireSerializingTestCase;
import org.elasticsearch.xpack.core.rollup.action.StopRollupJobAction.Request;
public class StopJobActionRequestTests extends AbstractStreamableTestCase<Request> {
public class StopJobActionRequestTests extends AbstractWireSerializingTestCase<Request> {
@Override
protected Request createTestInstance() {
@ -16,8 +17,7 @@ public class StopJobActionRequestTests extends AbstractStreamableTestCase<Reques
}
@Override
protected Request createBlankInstance() {
return new Request();
protected Writeable.Reader<Request> instanceReader() {
return Request::new;
}
}