diff --git a/client/client-benchmark-noop-api-plugin/src/main/java/org/elasticsearch/plugin/noop/action/bulk/NoopBulkAction.java b/client/client-benchmark-noop-api-plugin/src/main/java/org/elasticsearch/plugin/noop/action/bulk/NoopBulkAction.java index e6412099fee..065c249a25e 100644 --- a/client/client-benchmark-noop-api-plugin/src/main/java/org/elasticsearch/plugin/noop/action/bulk/NoopBulkAction.java +++ b/client/client-benchmark-noop-api-plugin/src/main/java/org/elasticsearch/plugin/noop/action/bulk/NoopBulkAction.java @@ -18,20 +18,15 @@ */ package org.elasticsearch.plugin.noop.action.bulk; -import org.elasticsearch.action.StreamableResponseActionType; +import org.elasticsearch.action.ActionType; import org.elasticsearch.action.bulk.BulkResponse; -public class NoopBulkAction extends StreamableResponseActionType { +public class NoopBulkAction extends ActionType { public static final String NAME = "mock:data/write/bulk"; public static final NoopBulkAction INSTANCE = new NoopBulkAction(); private NoopBulkAction() { - super(NAME); - } - - @Override - public BulkResponse newResponse() { - return new BulkResponse(null, 0); + super(NAME, BulkResponse::new); } } diff --git a/client/client-benchmark-noop-api-plugin/src/main/java/org/elasticsearch/plugin/noop/action/bulk/TransportNoopBulkAction.java b/client/client-benchmark-noop-api-plugin/src/main/java/org/elasticsearch/plugin/noop/action/bulk/TransportNoopBulkAction.java index 47302c41cd6..6580d213548 100644 --- a/client/client-benchmark-noop-api-plugin/src/main/java/org/elasticsearch/plugin/noop/action/bulk/TransportNoopBulkAction.java +++ b/client/client-benchmark-noop-api-plugin/src/main/java/org/elasticsearch/plugin/noop/action/bulk/TransportNoopBulkAction.java @@ -38,7 +38,7 @@ public class TransportNoopBulkAction extends HandledTransportAction { +public class BulkAction extends ActionType { public static final BulkAction INSTANCE = new BulkAction(); public static final String NAME = "indices:data/write/bulk"; private BulkAction() { - super(NAME); - } - - @Override - public BulkResponse newResponse() { - return new BulkResponse(); + super(NAME, BulkResponse::new); } @Override diff --git a/server/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java b/server/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java index 2b935f0f0fc..078cee647ca 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java @@ -78,7 +78,17 @@ public class BulkRequest extends ActionRequest implements CompositeIndicesReques private long sizeInBytes = 0; - public BulkRequest() { + public BulkRequest() {} + + public BulkRequest(StreamInput in) throws IOException { + super(in); + waitForActiveShards = ActiveShardCount.readFrom(in); + int size = in.readVInt(); + for (int i = 0; i < size; i++) { + requests.add(DocWriteRequest.readDocumentRequest(in)); + } + refreshPolicy = RefreshPolicy.readFrom(in); + timeout = in.readTimeValue(); } public BulkRequest(@Nullable String globalIndex) { @@ -391,14 +401,7 @@ public class BulkRequest extends ActionRequest implements CompositeIndicesReques @Override public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); - waitForActiveShards = ActiveShardCount.readFrom(in); - int size = in.readVInt(); - for (int i = 0; i < size; i++) { - requests.add(DocWriteRequest.readDocumentRequest(in)); - } - refreshPolicy = RefreshPolicy.readFrom(in); - timeout = in.readTimeValue(); + throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable"); } @Override diff --git a/server/src/main/java/org/elasticsearch/action/bulk/BulkResponse.java b/server/src/main/java/org/elasticsearch/action/bulk/BulkResponse.java index 6713db98692..74ec15df6aa 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/BulkResponse.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/BulkResponse.java @@ -56,7 +56,16 @@ public class BulkResponse extends ActionResponse implements Iterable request, - ActionFilters actionFilters, String executor) { - super(actionName, actionFilters, transportService.getTaskManager()); - transportService.registerRequestHandler(actionName, request, executor, false, true, - new TransportHandler()); - } - protected HandledTransportAction(String actionName, boolean canTripCircuitBreaker, TransportService transportService, ActionFilters actionFilters, Writeable.Reader requestReader) { diff --git a/server/src/main/java/org/elasticsearch/index/reindex/AbstractBulkByScrollRequest.java b/server/src/main/java/org/elasticsearch/index/reindex/AbstractBulkByScrollRequest.java index 21dc2fbf4d9..3413f32177c 100644 --- a/server/src/main/java/org/elasticsearch/index/reindex/AbstractBulkByScrollRequest.java +++ b/server/src/main/java/org/elasticsearch/index/reindex/AbstractBulkByScrollRequest.java @@ -115,10 +115,18 @@ public abstract class AbstractBulkByScrollRequest { + public static class Add extends ActionType { public static final Add INSTANCE = new Add(); public static final String ACTION_NAME = "indices:admin/seq_no/add_retention_lease"; private Add() { - super(ACTION_NAME); + super(ACTION_NAME, Response::new); } public static class TransportAction extends TransportRetentionLeaseAction { @@ -168,21 +168,15 @@ public class RetentionLeaseActions { } } - - @Override - public Response newResponse() { - return new Response(); - } - } - public static class Renew extends StreamableResponseActionType { + public static class Renew extends ActionType { public static final Renew INSTANCE = new Renew(); public static final String ACTION_NAME = "indices:admin/seq_no/renew_retention_lease"; private Renew() { - super(ACTION_NAME); + super(ACTION_NAME, Response::new); } public static class TransportAction extends TransportRetentionLeaseAction { @@ -214,21 +208,15 @@ public class RetentionLeaseActions { } } - - @Override - public Response newResponse() { - return new Response(); - } - } - public static class Remove extends StreamableResponseActionType { + public static class Remove extends ActionType { public static final Remove INSTANCE = new Remove(); public static final String ACTION_NAME = "indices:admin/seq_no/remove_retention_lease"; private Remove() { - super(ACTION_NAME); + super(ACTION_NAME, Response::new); } public static class TransportAction extends TransportRetentionLeaseAction { @@ -261,12 +249,6 @@ public class RetentionLeaseActions { } } - - @Override - public Response newResponse() { - return new Response(); - } - } private abstract static class Request> extends SingleShardRequest { @@ -385,8 +367,7 @@ public class RetentionLeaseActions { public static class Response extends ActionResponse { - public Response() { - } + public Response() {} Response(final StreamInput in) throws IOException { super(in); diff --git a/server/src/main/java/org/elasticsearch/persistent/CompletionPersistentTaskAction.java b/server/src/main/java/org/elasticsearch/persistent/CompletionPersistentTaskAction.java index ec268fe77f8..5b0c79c34e3 100644 --- a/server/src/main/java/org/elasticsearch/persistent/CompletionPersistentTaskAction.java +++ b/server/src/main/java/org/elasticsearch/persistent/CompletionPersistentTaskAction.java @@ -20,11 +20,11 @@ package org.elasticsearch.persistent; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRequestValidationException; -import org.elasticsearch.action.StreamableResponseActionType; +import org.elasticsearch.action.ActionType; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.master.MasterNodeOperationRequestBuilder; import org.elasticsearch.action.support.master.MasterNodeRequest; -import org.elasticsearch.action.support.master.StreamableTransportMasterNodeAction; +import org.elasticsearch.action.support.master.TransportMasterNodeAction; import org.elasticsearch.client.ElasticsearchClient; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.block.ClusterBlockException; @@ -46,18 +46,13 @@ import static org.elasticsearch.action.ValidateActions.addValidationError; * ActionType that is used by executor node to indicate that the persistent action finished or failed on the node and needs to be * removed from the cluster state in case of successful completion or restarted on some other node in case of failure. */ -public class CompletionPersistentTaskAction extends StreamableResponseActionType { +public class CompletionPersistentTaskAction extends ActionType { public static final CompletionPersistentTaskAction INSTANCE = new CompletionPersistentTaskAction(); public static final String NAME = "cluster:admin/persistent/completion"; private CompletionPersistentTaskAction() { - super(NAME); - } - - @Override - public PersistentTaskResponse newResponse() { - return new PersistentTaskResponse(); + super(NAME, PersistentTaskResponse::new); } public static class Request extends MasterNodeRequest { @@ -127,7 +122,7 @@ public class CompletionPersistentTaskAction extends StreamableResponseActionType } } - public static class TransportAction extends StreamableTransportMasterNodeAction { + public static class TransportAction extends TransportMasterNodeAction { private final PersistentTasksClusterService persistentTasksClusterService; @@ -147,8 +142,8 @@ public class CompletionPersistentTaskAction extends StreamableResponseActionType } @Override - protected PersistentTaskResponse newResponse() { - return new PersistentTaskResponse(); + protected PersistentTaskResponse read(StreamInput in) throws IOException { + return new PersistentTaskResponse(in); } @Override diff --git a/server/src/main/java/org/elasticsearch/persistent/PersistentTaskResponse.java b/server/src/main/java/org/elasticsearch/persistent/PersistentTaskResponse.java index ed8550f4a3e..0cbe0275549 100644 --- a/server/src/main/java/org/elasticsearch/persistent/PersistentTaskResponse.java +++ b/server/src/main/java/org/elasticsearch/persistent/PersistentTaskResponse.java @@ -32,8 +32,9 @@ import java.util.Objects; public class PersistentTaskResponse extends ActionResponse { private PersistentTask task; - public PersistentTaskResponse() { - super(); + public PersistentTaskResponse(StreamInput in) throws IOException { + super(in); + task = in.readOptionalWriteable(PersistentTask::new); } public PersistentTaskResponse(PersistentTask task) { @@ -42,8 +43,7 @@ public class PersistentTaskResponse extends ActionResponse { @Override public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); - task = in.readOptionalWriteable(PersistentTask::new); + throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable"); } @Override diff --git a/server/src/main/java/org/elasticsearch/persistent/RemovePersistentTaskAction.java b/server/src/main/java/org/elasticsearch/persistent/RemovePersistentTaskAction.java index 9b87a9263dc..bc66e624315 100644 --- a/server/src/main/java/org/elasticsearch/persistent/RemovePersistentTaskAction.java +++ b/server/src/main/java/org/elasticsearch/persistent/RemovePersistentTaskAction.java @@ -20,11 +20,11 @@ package org.elasticsearch.persistent; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRequestValidationException; -import org.elasticsearch.action.StreamableResponseActionType; +import org.elasticsearch.action.ActionType; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.master.MasterNodeOperationRequestBuilder; import org.elasticsearch.action.support.master.MasterNodeRequest; -import org.elasticsearch.action.support.master.StreamableTransportMasterNodeAction; +import org.elasticsearch.action.support.master.TransportMasterNodeAction; import org.elasticsearch.client.ElasticsearchClient; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.block.ClusterBlockException; @@ -40,18 +40,13 @@ import org.elasticsearch.transport.TransportService; import java.io.IOException; import java.util.Objects; -public class RemovePersistentTaskAction extends StreamableResponseActionType { +public class RemovePersistentTaskAction extends ActionType { public static final RemovePersistentTaskAction INSTANCE = new RemovePersistentTaskAction(); public static final String NAME = "cluster:admin/persistent/remove"; private RemovePersistentTaskAction() { - super(NAME); - } - - @Override - public PersistentTaskResponse newResponse() { - return new PersistentTaskResponse(); + super(NAME, PersistentTaskResponse::new); } public static class Request extends MasterNodeRequest { @@ -112,7 +107,7 @@ public class RemovePersistentTaskAction extends StreamableResponseActionType { + public static class TransportAction extends TransportMasterNodeAction { private final PersistentTasksClusterService persistentTasksClusterService; @@ -132,8 +127,8 @@ public class RemovePersistentTaskAction extends StreamableResponseActionType { +public class StartPersistentTaskAction extends ActionType { public static final StartPersistentTaskAction INSTANCE = new StartPersistentTaskAction(); public static final String NAME = "cluster:admin/persistent/start"; private StartPersistentTaskAction() { - super(NAME); - } - - @Override - public PersistentTaskResponse newResponse() { - return new PersistentTaskResponse(); + super(NAME, PersistentTaskResponse::new); } public static class Request extends MasterNodeRequest { @@ -183,7 +178,7 @@ public class StartPersistentTaskAction extends StreamableResponseActionType { + public static class TransportAction extends TransportMasterNodeAction { private final PersistentTasksClusterService persistentTasksClusterService; @@ -208,8 +203,8 @@ public class StartPersistentTaskAction extends StreamableResponseActionType { +public class UpdatePersistentTaskStatusAction extends ActionType { public static final UpdatePersistentTaskStatusAction INSTANCE = new UpdatePersistentTaskStatusAction(); public static final String NAME = "cluster:admin/persistent/update_status"; private UpdatePersistentTaskStatusAction() { - super(NAME); - } - - @Override - public PersistentTaskResponse newResponse() { - return new PersistentTaskResponse(); + super(NAME, PersistentTaskResponse::new); } public static class Request extends MasterNodeRequest { @@ -143,7 +138,7 @@ public class UpdatePersistentTaskStatusAction extends StreamableResponseActionTy } } - public static class TransportAction extends StreamableTransportMasterNodeAction { + public static class TransportAction extends TransportMasterNodeAction { private final PersistentTasksClusterService persistentTasksClusterService; @@ -163,8 +158,8 @@ public class UpdatePersistentTaskStatusAction extends StreamableResponseActionTy } @Override - protected PersistentTaskResponse newResponse() { - return new PersistentTaskResponse(); + protected PersistentTaskResponse read(StreamInput in) throws IOException { + return new PersistentTaskResponse(in); } @Override diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java index 2caeaff519d..851233019ca 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java @@ -30,7 +30,7 @@ import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.ActionResponse; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.master.MasterNodeRequest; -import org.elasticsearch.action.support.master.StreamableTransportMasterNodeAction; +import org.elasticsearch.action.support.master.TransportMasterNodeAction; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateListener; @@ -506,9 +506,7 @@ public class SnapshotShardsService extends AbstractLifecycleComponent implements new TransportResponseHandler() { @Override public UpdateIndexShardSnapshotStatusResponse read(StreamInput in) throws IOException { - final UpdateIndexShardSnapshotStatusResponse response = new UpdateIndexShardSnapshotStatusResponse(); - response.readFrom(in); - return response; + return new UpdateIndexShardSnapshotStatusResponse(in); } @Override @@ -606,12 +604,19 @@ public class SnapshotShardsService extends AbstractLifecycleComponent implements } static class UpdateIndexShardSnapshotStatusResponse extends ActionResponse { + + UpdateIndexShardSnapshotStatusResponse() {} + + UpdateIndexShardSnapshotStatusResponse(StreamInput in) throws IOException { + super(in); + } + @Override public void writeTo(StreamOutput out) throws IOException {} } private class UpdateSnapshotStatusAction - extends StreamableTransportMasterNodeAction { + extends TransportMasterNodeAction { UpdateSnapshotStatusAction(TransportService transportService, ClusterService clusterService, ThreadPool threadPool, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) { super( @@ -626,8 +631,8 @@ public class SnapshotShardsService extends AbstractLifecycleComponent implements } @Override - protected UpdateIndexShardSnapshotStatusResponse newResponse() { - return new UpdateIndexShardSnapshotStatusResponse(); + protected UpdateIndexShardSnapshotStatusResponse read(StreamInput in) throws IOException { + return new UpdateIndexShardSnapshotStatusResponse(in); } @Override diff --git a/server/src/test/java/org/elasticsearch/persistent/PersistentTasksExecutorResponseTests.java b/server/src/test/java/org/elasticsearch/persistent/PersistentTasksExecutorResponseTests.java index 342098f6867..ea0d8b2e841 100644 --- a/server/src/test/java/org/elasticsearch/persistent/PersistentTasksExecutorResponseTests.java +++ b/server/src/test/java/org/elasticsearch/persistent/PersistentTasksExecutorResponseTests.java @@ -20,14 +20,15 @@ package org.elasticsearch.persistent; import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; -import org.elasticsearch.test.AbstractStreamableTestCase; +import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask; import org.elasticsearch.persistent.TestPersistentTasksPlugin.TestPersistentTasksExecutor; +import org.elasticsearch.test.AbstractWireSerializingTestCase; import java.util.Collections; -public class PersistentTasksExecutorResponseTests extends AbstractStreamableTestCase { +public class PersistentTasksExecutorResponseTests extends AbstractWireSerializingTestCase { @Override protected PersistentTaskResponse createTestInstance() { @@ -37,13 +38,13 @@ public class PersistentTasksExecutorResponseTests extends AbstractStreamableTest new TestPersistentTasksPlugin.TestParams("test"), randomLong(), PersistentTasksCustomMetaData.INITIAL_ASSIGNMENT)); } else { - return new PersistentTaskResponse(null); + return new PersistentTaskResponse((PersistentTask) null); } } @Override - protected PersistentTaskResponse createBlankInstance() { - return new PersistentTaskResponse(); + protected Writeable.Reader instanceReader() { + return PersistentTaskResponse::new; } @Override diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/protocol/xpack/watcher/PutWatchRequest.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/protocol/xpack/watcher/PutWatchRequest.java index 7ddafa8c707..1edbe62518c 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/protocol/xpack/watcher/PutWatchRequest.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/protocol/xpack/watcher/PutWatchRequest.java @@ -42,7 +42,14 @@ public final class PutWatchRequest extends ActionRequest { public PutWatchRequest() {} public PutWatchRequest(StreamInput in) throws IOException { - readFrom(in); + super(in); + id = in.readString(); + source = in.readBytesReference(); + active = in.readBoolean(); + xContentType = in.readEnum(XContentType.class); + version = in.readZLong(); + ifSeqNo = in.readZLong(); + ifPrimaryTerm = in.readVLong(); } public PutWatchRequest(String id, BytesReference source, XContentType xContentType) { @@ -53,14 +60,7 @@ public final class PutWatchRequest extends ActionRequest { @Override public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); - id = in.readString(); - source = in.readBytesReference(); - active = in.readBoolean(); - xContentType = in.readEnum(XContentType.class); - version = in.readZLong(); - ifSeqNo = in.readZLong(); - ifPrimaryTerm = in.readVLong(); + throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable"); } @Override