diff --git a/server/src/main/java/org/elasticsearch/action/DocWriteRequest.java b/server/src/main/java/org/elasticsearch/action/DocWriteRequest.java index 61328a78df6..5594a21a2cc 100644 --- a/server/src/main/java/org/elasticsearch/action/DocWriteRequest.java +++ b/server/src/main/java/org/elasticsearch/action/DocWriteRequest.java @@ -226,9 +226,7 @@ public interface DocWriteRequest extends IndicesRequest { } else if (type == 1) { docWriteRequest = new DeleteRequest(in); } else if (type == 2) { - UpdateRequest updateRequest = new UpdateRequest(); - updateRequest.readFrom(in); - docWriteRequest = updateRequest; + docWriteRequest = new UpdateRequest(in); } else { throw new IllegalStateException("invalid request type [" + type+ " ]"); } diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/get/GetTaskAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/get/GetTaskAction.java index 978e07555b5..4a689a664fa 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/get/GetTaskAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/get/GetTaskAction.java @@ -19,23 +19,18 @@ package org.elasticsearch.action.admin.cluster.node.tasks.get; -import org.elasticsearch.action.StreamableResponseActionType; +import org.elasticsearch.action.ActionType; /** * ActionType for retrieving a list of currently running tasks */ -public class GetTaskAction extends StreamableResponseActionType { +public class GetTaskAction extends ActionType { public static final String TASKS_ORIGIN = "tasks"; public static final GetTaskAction INSTANCE = new GetTaskAction(); public static final String NAME = "cluster:monitor/task/get"; private GetTaskAction() { - super(NAME); - } - - @Override - public GetTaskResponse newResponse() { - return new GetTaskResponse(); + super(NAME, GetTaskResponse::new); } } diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/get/GetTaskRequest.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/get/GetTaskRequest.java index b8eb33edc82..2fc6b36788c 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/get/GetTaskRequest.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/get/GetTaskRequest.java @@ -38,6 +38,15 @@ public class GetTaskRequest extends ActionRequest { private boolean waitForCompletion = false; private TimeValue timeout = null; + public GetTaskRequest() {} + + public GetTaskRequest(StreamInput in) throws IOException { + super(in); + taskId = TaskId.readFromStream(in); + timeout = in.readOptionalTimeValue(); + waitForCompletion = in.readBoolean(); + } + /** * Get the TaskId to look up. */ @@ -103,10 +112,7 @@ public class GetTaskRequest extends ActionRequest { @Override public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); - taskId = TaskId.readFromStream(in); - timeout = in.readOptionalTimeValue(); - waitForCompletion = in.readBoolean(); + throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable"); } @Override diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/get/GetTaskResponse.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/get/GetTaskResponse.java index b1369c33f71..df3b328cb4c 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/get/GetTaskResponse.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/get/GetTaskResponse.java @@ -35,19 +35,20 @@ import static java.util.Objects.requireNonNull; * Returns the list of tasks currently running on the nodes */ public class GetTaskResponse extends ActionResponse implements ToXContentObject { - private TaskResult task; - - public GetTaskResponse() { - } + private final TaskResult task; public GetTaskResponse(TaskResult task) { this.task = requireNonNull(task, "task is required"); } + public GetTaskResponse(StreamInput in) throws IOException { + super(in); + task = in.readOptionalWriteable(TaskResult::new); + } + @Override public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); - task = in.readOptionalWriteable(TaskResult::new); + throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable"); } @Override diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/get/TransportGetTaskAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/get/TransportGetTaskAction.java index 06237ed2f74..0e3df01db52 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/get/TransportGetTaskAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/get/TransportGetTaskAction.java @@ -75,7 +75,7 @@ public class TransportGetTaskAction extends HandledTransportAction() { @Override public GetTaskResponse read(StreamInput in) throws IOException { - GetTaskResponse response = new GetTaskResponse(); - response.readFrom(in); - return response; + return new GetTaskResponse(in); } @Override diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/remote/RemoteInfoAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/remote/RemoteInfoAction.java index 6ced57bde05..9e6befcfd1d 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/remote/RemoteInfoAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/remote/RemoteInfoAction.java @@ -19,19 +19,14 @@ package org.elasticsearch.action.admin.cluster.remote; -import org.elasticsearch.action.StreamableResponseActionType; +import org.elasticsearch.action.ActionType; -public final class RemoteInfoAction extends StreamableResponseActionType { +public final class RemoteInfoAction extends ActionType { public static final String NAME = "cluster:monitor/remote/info"; public static final RemoteInfoAction INSTANCE = new RemoteInfoAction(); public RemoteInfoAction() { - super(NAME); - } - - @Override - public RemoteInfoResponse newResponse() { - return new RemoteInfoResponse(); + super(NAME, RemoteInfoResponse::new); } } diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/remote/RemoteInfoRequest.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/remote/RemoteInfoRequest.java index e13c7fc9146..95f685614c4 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/remote/RemoteInfoRequest.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/remote/RemoteInfoRequest.java @@ -27,11 +27,9 @@ import java.io.IOException; public final class RemoteInfoRequest extends ActionRequest { - public RemoteInfoRequest() { + public RemoteInfoRequest() {} - } - - public RemoteInfoRequest(StreamInput in) throws IOException { + RemoteInfoRequest(StreamInput in) throws IOException { super(in); } diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/remote/RemoteInfoResponse.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/remote/RemoteInfoResponse.java index e561e1701ec..6c76ead83dc 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/remote/RemoteInfoResponse.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/remote/RemoteInfoResponse.java @@ -36,7 +36,9 @@ public final class RemoteInfoResponse extends ActionResponse implements ToXConte private List infos; - RemoteInfoResponse() { + RemoteInfoResponse(StreamInput in) throws IOException { + super(in); + infos = in.readList(RemoteConnectionInfo::new); } RemoteInfoResponse(Collection infos) { @@ -54,8 +56,7 @@ public final class RemoteInfoResponse extends ActionResponse implements ToXConte @Override public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); - infos = in.readList(RemoteConnectionInfo::new); + throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable"); } @Override diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/remote/TransportRemoteInfoAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/remote/TransportRemoteInfoAction.java index 48688d91f2e..dd3198a003a 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/remote/TransportRemoteInfoAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/remote/TransportRemoteInfoAction.java @@ -37,7 +37,7 @@ public final class TransportRemoteInfoAction extends HandledTransportAction>> mappings = emptyMap(); + private final Map>> mappings; GetFieldMappingsResponse(Map>> mappings) { this.mappings = mappings; } - - GetFieldMappingsResponse() { - } - GetFieldMappingsResponse(StreamInput in) throws IOException { super(in); int size = in.readVInt(); diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/mapping/get/TransportGetFieldMappingsAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/mapping/get/TransportGetFieldMappingsAction.java index 399b96b8413..5d56601017f 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/mapping/get/TransportGetFieldMappingsAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/mapping/get/TransportGetFieldMappingsAction.java @@ -34,6 +34,7 @@ import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReferenceArray; +import static java.util.Collections.emptyMap; import static java.util.Collections.unmodifiableMap; public class TransportGetFieldMappingsAction extends HandledTransportAction { @@ -46,7 +47,7 @@ public class TransportGetFieldMappingsAction extends HandledTransportAction indexResponses = new AtomicReferenceArray<>(concreteIndices.length); if (concreteIndices.length == 0) { - listener.onResponse(new GetFieldMappingsResponse()); + listener.onResponse(new GetFieldMappingsResponse(emptyMap())); } else { boolean probablySingleFieldRequest = concreteIndices.length == 1 && request.types().length == 1 && request.fields().length == 1; for (final String index : concreteIndices) { diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/validate/query/ShardValidateQueryRequest.java b/server/src/main/java/org/elasticsearch/action/admin/indices/validate/query/ShardValidateQueryRequest.java index 2ccf2f1bd3e..40768bb45d3 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/validate/query/ShardValidateQueryRequest.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/validate/query/ShardValidateQueryRequest.java @@ -42,7 +42,21 @@ public class ShardValidateQueryRequest extends BroadcastShardRequest { private long nowInMillis; private AliasFilter filteringAliases; - public ShardValidateQueryRequest() { + public ShardValidateQueryRequest(StreamInput in) throws IOException { + super(in); + query = in.readNamedWriteable(QueryBuilder.class); + + int typesSize = in.readVInt(); + if (typesSize > 0) { + types = new String[typesSize]; + for (int i = 0; i < typesSize; i++) { + types[i] = in.readString(); + } + } + filteringAliases = new AliasFilter(in); + explain = in.readBoolean(); + rewrite = in.readBoolean(); + nowInMillis = in.readVLong(); } public ShardValidateQueryRequest(ShardId shardId, AliasFilter filteringAliases, ValidateQueryRequest request) { @@ -81,20 +95,7 @@ public class ShardValidateQueryRequest extends BroadcastShardRequest { @Override public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); - query = in.readNamedWriteable(QueryBuilder.class); - - int typesSize = in.readVInt(); - if (typesSize > 0) { - types = new String[typesSize]; - for (int i = 0; i < typesSize; i++) { - types[i] = in.readString(); - } - } - filteringAliases = new AliasFilter(in); - explain = in.readBoolean(); - rewrite = in.readBoolean(); - nowInMillis = in.readVLong(); + throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable"); } @Override diff --git a/server/src/main/java/org/elasticsearch/action/get/MultiGetAction.java b/server/src/main/java/org/elasticsearch/action/get/MultiGetAction.java index 9d989c5644d..d6946a16e4b 100644 --- a/server/src/main/java/org/elasticsearch/action/get/MultiGetAction.java +++ b/server/src/main/java/org/elasticsearch/action/get/MultiGetAction.java @@ -19,19 +19,14 @@ package org.elasticsearch.action.get; -import org.elasticsearch.action.StreamableResponseActionType; +import org.elasticsearch.action.ActionType; -public class MultiGetAction extends StreamableResponseActionType { +public class MultiGetAction extends ActionType { public static final MultiGetAction INSTANCE = new MultiGetAction(); public static final String NAME = "indices:data/read/mget"; private MultiGetAction() { - super(NAME); - } - - @Override - public MultiGetResponse newResponse() { - return new MultiGetResponse(); + super(NAME, MultiGetResponse::new); } } diff --git a/server/src/main/java/org/elasticsearch/action/get/MultiGetItemResponse.java b/server/src/main/java/org/elasticsearch/action/get/MultiGetItemResponse.java index e104359a65e..d2ae4be9f98 100644 --- a/server/src/main/java/org/elasticsearch/action/get/MultiGetItemResponse.java +++ b/server/src/main/java/org/elasticsearch/action/get/MultiGetItemResponse.java @@ -21,27 +21,33 @@ package org.elasticsearch.action.get; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.common.io.stream.Streamable; +import org.elasticsearch.common.io.stream.Writeable; import java.io.IOException; /** * A single multi get response. */ -public class MultiGetItemResponse implements Streamable { +public class MultiGetItemResponse implements Writeable { - private GetResponse response; - private MultiGetResponse.Failure failure; - - MultiGetItemResponse() { - - } + private final GetResponse response; + private final MultiGetResponse.Failure failure; public MultiGetItemResponse(GetResponse response, MultiGetResponse.Failure failure) { this.response = response; this.failure = failure; } + MultiGetItemResponse(StreamInput in) throws IOException { + if (in.readBoolean()) { + failure = new MultiGetResponse.Failure(in); + response = null; + } else { + response = new GetResponse(in); + failure = null; + } + } + /** * The index name of the document. */ @@ -93,21 +99,6 @@ public class MultiGetItemResponse implements Streamable { return this.failure; } - public static MultiGetItemResponse readItemResponse(StreamInput in) throws IOException { - MultiGetItemResponse response = new MultiGetItemResponse(); - response.readFrom(in); - return response; - } - - @Override - public void readFrom(StreamInput in) throws IOException { - if (in.readBoolean()) { - failure = MultiGetResponse.Failure.readFailure(in); - } else { - response = new GetResponse(in); - } - } - @Override public void writeTo(StreamOutput out) throws IOException { if (failure != null) { diff --git a/server/src/main/java/org/elasticsearch/action/get/MultiGetRequest.java b/server/src/main/java/org/elasticsearch/action/get/MultiGetRequest.java index 9bd026e9187..2ec662695cf 100644 --- a/server/src/main/java/org/elasticsearch/action/get/MultiGetRequest.java +++ b/server/src/main/java/org/elasticsearch/action/get/MultiGetRequest.java @@ -34,7 +34,7 @@ import org.elasticsearch.common.ParsingException; import org.elasticsearch.common.Strings; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.common.io.stream.Streamable; +import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.lucene.uid.Versions; import org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.common.xcontent.XContentBuilder; @@ -68,7 +68,7 @@ public class MultiGetRequest extends ActionRequest /** * A single get item. */ - public static class Item implements Streamable, IndicesRequest, ToXContentObject { + public static class Item implements Writeable, IndicesRequest, ToXContentObject { private String index; private String type; @@ -83,6 +83,21 @@ public class MultiGetRequest extends ActionRequest } + public Item(StreamInput in) throws IOException { + index = in.readString(); + type = in.readOptionalString(); + id = in.readString(); + routing = in.readOptionalString(); + if (in.getVersion().before(Version.V_7_0_0)) { + in.readOptionalString(); // _parent + } + storedFields = in.readOptionalStringArray(); + version = in.readLong(); + versionType = VersionType.fromValue(in.readByte()); + + fetchSourceContext = in.readOptionalWriteable(FetchSourceContext::new); + } + /** * Constructs a single get item. * @@ -182,28 +197,6 @@ public class MultiGetRequest extends ActionRequest return this; } - public static Item readItem(StreamInput in) throws IOException { - Item item = new Item(); - item.readFrom(in); - return item; - } - - @Override - public void readFrom(StreamInput in) throws IOException { - index = in.readString(); - type = in.readOptionalString(); - id = in.readString(); - routing = in.readOptionalString(); - if (in.getVersion().before(Version.V_7_0_0)) { - in.readOptionalString(); // _parent - } - storedFields = in.readOptionalStringArray(); - version = in.readLong(); - versionType = VersionType.fromValue(in.readByte()); - - fetchSourceContext = in.readOptionalWriteable(FetchSourceContext::new); - } - @Override public void writeTo(StreamOutput out) throws IOException { out.writeString(index); @@ -279,6 +272,21 @@ public class MultiGetRequest extends ActionRequest boolean refresh; List items = new ArrayList<>(); + public MultiGetRequest() {} + + public MultiGetRequest(StreamInput in) throws IOException { + super(in); + preference = in.readOptionalString(); + refresh = in.readBoolean(); + realtime = in.readBoolean(); + + int size = in.readVInt(); + items = new ArrayList<>(size); + for (int i = 0; i < size; i++) { + items.add(new Item(in)); + } + } + public List getItems() { return this.items; } @@ -539,16 +547,7 @@ public class MultiGetRequest extends ActionRequest @Override public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); - preference = in.readOptionalString(); - refresh = in.readBoolean(); - realtime = in.readBoolean(); - - int size = in.readVInt(); - items = new ArrayList<>(size); - for (int i = 0; i < size; i++) { - items.add(Item.readItem(in)); - } + throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable"); } @Override diff --git a/server/src/main/java/org/elasticsearch/action/get/MultiGetResponse.java b/server/src/main/java/org/elasticsearch/action/get/MultiGetResponse.java index ca0f7aac32c..ba818a6cefc 100644 --- a/server/src/main/java/org/elasticsearch/action/get/MultiGetResponse.java +++ b/server/src/main/java/org/elasticsearch/action/get/MultiGetResponse.java @@ -24,7 +24,7 @@ import org.elasticsearch.action.ActionResponse; import org.elasticsearch.common.ParseField; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.common.io.stream.Streamable; +import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParser; @@ -48,15 +48,12 @@ public class MultiGetResponse extends ActionResponse implements Iterable { +public class MainAction extends ActionType { public static final String NAME = "cluster:monitor/main"; public static final MainAction INSTANCE = new MainAction(); public MainAction() { - super(NAME); + super(NAME, MainResponse::new); } - @Override - public MainResponse newResponse() { - return new MainResponse(); - } } diff --git a/server/src/main/java/org/elasticsearch/action/main/MainRequest.java b/server/src/main/java/org/elasticsearch/action/main/MainRequest.java index 1736e56a8dc..976b845d34b 100644 --- a/server/src/main/java/org/elasticsearch/action/main/MainRequest.java +++ b/server/src/main/java/org/elasticsearch/action/main/MainRequest.java @@ -21,12 +21,25 @@ package org.elasticsearch.action.main; import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.common.io.stream.StreamInput; + +import java.io.IOException; public class MainRequest extends ActionRequest { + public MainRequest() {} + + MainRequest(StreamInput in) throws IOException { + super(in); + } + @Override public ActionRequestValidationException validate() { return null; } + @Override + public void readFrom(StreamInput in) throws IOException { + throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable"); + } } diff --git a/server/src/main/java/org/elasticsearch/action/main/MainResponse.java b/server/src/main/java/org/elasticsearch/action/main/MainResponse.java index 0e8cc43ffac..6982f824023 100644 --- a/server/src/main/java/org/elasticsearch/action/main/MainResponse.java +++ b/server/src/main/java/org/elasticsearch/action/main/MainResponse.java @@ -42,7 +42,18 @@ public class MainResponse extends ActionResponse implements ToXContentObject { private String clusterUuid; private Build build; - MainResponse() { + MainResponse() {} + + MainResponse(StreamInput in) throws IOException { + super(in); + nodeName = in.readString(); + version = Version.readVersion(in); + clusterName = new ClusterName(in); + clusterUuid = in.readString(); + build = Build.readBuild(in); + if (in.getVersion().before(Version.V_7_0_0)) { + in.readBoolean(); + } } public MainResponse(String nodeName, Version version, ClusterName clusterName, String clusterUuid, Build build) { @@ -88,15 +99,7 @@ public class MainResponse extends ActionResponse implements ToXContentObject { @Override public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); - nodeName = in.readString(); - version = Version.readVersion(in); - clusterName = new ClusterName(in); - clusterUuid = in.readString(); - build = Build.readBuild(in); - if (in.getVersion().before(Version.V_7_0_0)) { - in.readBoolean(); - } + throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable"); } @Override diff --git a/server/src/main/java/org/elasticsearch/action/main/TransportMainAction.java b/server/src/main/java/org/elasticsearch/action/main/TransportMainAction.java index 442344d33fc..f6290f58a0b 100644 --- a/server/src/main/java/org/elasticsearch/action/main/TransportMainAction.java +++ b/server/src/main/java/org/elasticsearch/action/main/TransportMainAction.java @@ -40,7 +40,7 @@ public class TransportMainAction extends HandledTransportAction> request) { @@ -64,9 +69,7 @@ public abstract class BroadcastShardRequest extends TransportRequest implements @Override public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); - shardId = new ShardId(in); - originalIndices = OriginalIndices.readOriginalIndices(in); + throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable"); } @Override diff --git a/server/src/main/java/org/elasticsearch/action/support/broadcast/TransportBroadcastAction.java b/server/src/main/java/org/elasticsearch/action/support/broadcast/TransportBroadcastAction.java index 14423ad6d93..0e774d120c4 100644 --- a/server/src/main/java/org/elasticsearch/action/support/broadcast/TransportBroadcastAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/broadcast/TransportBroadcastAction.java @@ -49,7 +49,6 @@ import org.elasticsearch.transport.TransportService; import java.io.IOException; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReferenceArray; -import java.util.function.Supplier; public abstract class TransportBroadcastAction< Request extends BroadcastRequest, @@ -68,7 +67,7 @@ public abstract class TransportBroadcastAction< protected TransportBroadcastAction(String actionName, ClusterService clusterService, TransportService transportService, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, Writeable.Reader request, - Supplier shardRequest, String shardExecutor) { + Writeable.Reader shardRequest, String shardExecutor) { super(actionName, transportService, actionFilters, request); this.clusterService = clusterService; this.transportService = transportService; @@ -76,7 +75,7 @@ public abstract class TransportBroadcastAction< this.transportShardAction = actionName + "[s]"; this.shardExecutor = shardExecutor; - transportService.registerRequestHandler(transportShardAction, shardRequest, ThreadPool.Names.SAME, new ShardTransportHandler()); + transportService.registerRequestHandler(transportShardAction, ThreadPool.Names.SAME, shardRequest, new ShardTransportHandler()); } @Override diff --git a/server/src/main/java/org/elasticsearch/action/support/single/instance/InstanceShardOperationRequest.java b/server/src/main/java/org/elasticsearch/action/support/single/instance/InstanceShardOperationRequest.java index d1a93bde83f..a378bb6580d 100644 --- a/server/src/main/java/org/elasticsearch/action/support/single/instance/InstanceShardOperationRequest.java +++ b/server/src/main/java/org/elasticsearch/action/support/single/instance/InstanceShardOperationRequest.java @@ -48,6 +48,19 @@ public abstract class InstanceShardOperationRequest, @@ -66,15 +66,15 @@ public abstract class TransportInstanceSingleOperationAction< protected TransportInstanceSingleOperationAction(String actionName, ThreadPool threadPool, ClusterService clusterService, TransportService transportService, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, - Supplier request) { - super(actionName, transportService, request, actionFilters); + Writeable.Reader request) { + super(actionName, transportService, actionFilters, request); this.threadPool = threadPool; this.clusterService = clusterService; this.transportService = transportService; this.indexNameExpressionResolver = indexNameExpressionResolver; this.executor = executor(); this.shardActionName = actionName + "[s]"; - transportService.registerRequestHandler(shardActionName, request, executor, new ShardTransportHandler()); + transportService.registerRequestHandler(shardActionName, executor, request, new ShardTransportHandler()); } @Override diff --git a/server/src/main/java/org/elasticsearch/action/update/UpdateRequest.java b/server/src/main/java/org/elasticsearch/action/update/UpdateRequest.java index bbd17ab4a72..d268ed09977 100644 --- a/server/src/main/java/org/elasticsearch/action/update/UpdateRequest.java +++ b/server/src/main/java/org/elasticsearch/action/update/UpdateRequest.java @@ -130,6 +130,48 @@ public class UpdateRequest extends InstanceShardOperationRequest } + public UpdateRequest(StreamInput in) throws IOException { + super(in); + waitForActiveShards = ActiveShardCount.readFrom(in); + type = in.readString(); + id = in.readString(); + routing = in.readOptionalString(); + if (in.getVersion().before(Version.V_7_0_0)) { + in.readOptionalString(); // _parent + } + if (in.readBoolean()) { + script = new Script(in); + } + retryOnConflict = in.readVInt(); + refreshPolicy = RefreshPolicy.readFrom(in); + if (in.readBoolean()) { + doc = new IndexRequest(in); + } + if (in.getVersion().before(Version.V_7_0_0)) { + String[] fields = in.readOptionalStringArray(); + if (fields != null) { + throw new IllegalArgumentException("[fields] is no longer supported"); + } + } + fetchSourceContext = in.readOptionalWriteable(FetchSourceContext::new); + if (in.readBoolean()) { + upsertRequest = new IndexRequest(in); + } + docAsUpsert = in.readBoolean(); + if (in.getVersion().before(Version.V_7_0_0)) { + long version = in.readLong(); + VersionType versionType = VersionType.readFromStream(in); + if (version != Versions.MATCH_ANY || versionType != VersionType.INTERNAL) { + throw new UnsupportedOperationException( + "versioned update requests have been removed in 7.0. Use if_seq_no and if_primary_term"); + } + } + ifSeqNo = in.readZLong(); + ifPrimaryTerm = in.readVLong(); + detectNoop = in.readBoolean(); + scriptedUpsert = in.readBoolean(); + } + public UpdateRequest(String index, String id) { super(index); this.id = id; @@ -831,45 +873,7 @@ public class UpdateRequest extends InstanceShardOperationRequest @Override public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); - waitForActiveShards = ActiveShardCount.readFrom(in); - type = in.readString(); - id = in.readString(); - routing = in.readOptionalString(); - if (in.getVersion().before(Version.V_7_0_0)) { - in.readOptionalString(); // _parent - } - if (in.readBoolean()) { - script = new Script(in); - } - retryOnConflict = in.readVInt(); - refreshPolicy = RefreshPolicy.readFrom(in); - if (in.readBoolean()) { - doc = new IndexRequest(in); - } - if (in.getVersion().before(Version.V_7_0_0)) { - String[] fields = in.readOptionalStringArray(); - if (fields != null) { - throw new IllegalArgumentException("[fields] is no longer supported"); - } - } - fetchSourceContext = in.readOptionalWriteable(FetchSourceContext::new); - if (in.readBoolean()) { - upsertRequest = new IndexRequest(in); - } - docAsUpsert = in.readBoolean(); - if (in.getVersion().before(Version.V_7_0_0)) { - long version = in.readLong(); - VersionType versionType = VersionType.readFromStream(in); - if (version != Versions.MATCH_ANY || versionType != VersionType.INTERNAL) { - throw new UnsupportedOperationException( - "versioned update requests have been removed in 7.0. Use if_seq_no and if_primary_term"); - } - } - ifSeqNo = in.readZLong(); - ifPrimaryTerm = in.readVLong(); - detectNoop = in.readBoolean(); - scriptedUpsert = in.readBoolean(); + throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable"); } @Override diff --git a/server/src/test/java/org/elasticsearch/action/ShardValidateQueryRequestTests.java b/server/src/test/java/org/elasticsearch/action/ShardValidateQueryRequestTests.java index c884d68a276..11745f1b19f 100644 --- a/server/src/test/java/org/elasticsearch/action/ShardValidateQueryRequestTests.java +++ b/server/src/test/java/org/elasticsearch/action/ShardValidateQueryRequestTests.java @@ -61,8 +61,7 @@ public class ShardValidateQueryRequestTests extends ESTestCase { new AliasFilter(QueryBuilders.termQuery("filter_field", "value"), new String[] {"alias0", "alias1"}), validateQueryRequest); request.writeTo(output); try (StreamInput in = new NamedWriteableAwareStreamInput(output.bytes().streamInput(), namedWriteableRegistry)) { - ShardValidateQueryRequest readRequest = new ShardValidateQueryRequest(); - readRequest.readFrom(in); + ShardValidateQueryRequest readRequest = new ShardValidateQueryRequest(in); assertEquals(request.filteringAliases(), readRequest.filteringAliases()); assertArrayEquals(request.types(), readRequest.types()); assertEquals(request.explain(), readRequest.explain()); diff --git a/server/src/test/java/org/elasticsearch/action/main/MainResponseTests.java b/server/src/test/java/org/elasticsearch/action/main/MainResponseTests.java index 1dff130fb98..9c04b1c10fc 100644 --- a/server/src/test/java/org/elasticsearch/action/main/MainResponseTests.java +++ b/server/src/test/java/org/elasticsearch/action/main/MainResponseTests.java @@ -23,17 +23,18 @@ import org.elasticsearch.Build; import org.elasticsearch.Version; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.common.Strings; +import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentParser; -import org.elasticsearch.test.AbstractStreamableXContentTestCase; +import org.elasticsearch.test.AbstractSerializingTestCase; import org.elasticsearch.test.VersionUtils; import java.io.IOException; import java.util.Date; -public class MainResponseTests extends AbstractStreamableXContentTestCase { +public class MainResponseTests extends AbstractSerializingTestCase { @Override protected MainResponse createTestInstance() { @@ -50,8 +51,8 @@ public class MainResponseTests extends AbstractStreamableXContentTestCase instanceReader() { + return MainResponse::new; } @Override diff --git a/server/src/test/java/org/elasticsearch/action/support/single/instance/TransportInstanceSingleOperationActionTests.java b/server/src/test/java/org/elasticsearch/action/support/single/instance/TransportInstanceSingleOperationActionTests.java index 9160d52098e..07afdf48020 100644 --- a/server/src/test/java/org/elasticsearch/action/support/single/instance/TransportInstanceSingleOperationActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/single/instance/TransportInstanceSingleOperationActionTests.java @@ -38,6 +38,7 @@ import org.elasticsearch.cluster.routing.ShardRoutingState; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.rest.RestStatus; @@ -61,7 +62,6 @@ import java.util.Map; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import java.util.function.Supplier; import static org.elasticsearch.test.ClusterServiceUtils.createClusterService; import static org.elasticsearch.test.ClusterServiceUtils.setState; @@ -78,12 +78,18 @@ public class TransportInstanceSingleOperationActionTests extends ESTestCase { private TestTransportInstanceSingleOperationAction action; public static class Request extends InstanceShardOperationRequest { - public Request() { + public Request() {} + + public Request(StreamInput in) throws IOException { + super(in); } } public static class Response extends ActionResponse { - public Response() { + public Response() {} + + public Response(StreamInput in) throws IOException { + super(in); } @Override @@ -95,7 +101,7 @@ public class TransportInstanceSingleOperationActionTests extends ESTestCase { TestTransportInstanceSingleOperationAction(String actionName, TransportService transportService, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, - Supplier request) { + Writeable.Reader request) { super(actionName, THREAD_POOL, TransportInstanceSingleOperationActionTests.this.clusterService, transportService, actionFilters, indexNameExpressionResolver, request); }