Convert more classes in 'server' to Writeable. (#44600)

* Convert GetTask*.
* Convert RemoteInfo*.
* Convert GetFieldMappings*.
* Convert ValidateQueryRequest*.
* Convert MainResponse*.
* Convert MultiGet*.
* Convert Update*.
* Add a missing call to parent constructors.

Relates to #34389.
This commit is contained in:
Julie Tibshirani 2019-07-18 18:45:10 -07:00 committed by GitHub
parent 13f46aa801
commit 336364fefe
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
32 changed files with 256 additions and 258 deletions

View File

@ -226,9 +226,7 @@ public interface DocWriteRequest<T> extends IndicesRequest {
} else if (type == 1) {
docWriteRequest = new DeleteRequest(in);
} else if (type == 2) {
UpdateRequest updateRequest = new UpdateRequest();
updateRequest.readFrom(in);
docWriteRequest = updateRequest;
docWriteRequest = new UpdateRequest(in);
} else {
throw new IllegalStateException("invalid request type [" + type+ " ]");
}

View File

@ -19,23 +19,18 @@
package org.elasticsearch.action.admin.cluster.node.tasks.get;
import org.elasticsearch.action.StreamableResponseActionType;
import org.elasticsearch.action.ActionType;
/**
* ActionType for retrieving a list of currently running tasks
*/
public class GetTaskAction extends StreamableResponseActionType<GetTaskResponse> {
public class GetTaskAction extends ActionType<GetTaskResponse> {
public static final String TASKS_ORIGIN = "tasks";
public static final GetTaskAction INSTANCE = new GetTaskAction();
public static final String NAME = "cluster:monitor/task/get";
private GetTaskAction() {
super(NAME);
}
@Override
public GetTaskResponse newResponse() {
return new GetTaskResponse();
super(NAME, GetTaskResponse::new);
}
}

View File

@ -38,6 +38,15 @@ public class GetTaskRequest extends ActionRequest {
private boolean waitForCompletion = false;
private TimeValue timeout = null;
public GetTaskRequest() {}
public GetTaskRequest(StreamInput in) throws IOException {
super(in);
taskId = TaskId.readFromStream(in);
timeout = in.readOptionalTimeValue();
waitForCompletion = in.readBoolean();
}
/**
* Get the TaskId to look up.
*/
@ -103,10 +112,7 @@ public class GetTaskRequest extends ActionRequest {
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
taskId = TaskId.readFromStream(in);
timeout = in.readOptionalTimeValue();
waitForCompletion = in.readBoolean();
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
}
@Override

View File

@ -35,19 +35,20 @@ import static java.util.Objects.requireNonNull;
* Returns the list of tasks currently running on the nodes
*/
public class GetTaskResponse extends ActionResponse implements ToXContentObject {
private TaskResult task;
public GetTaskResponse() {
}
private final TaskResult task;
public GetTaskResponse(TaskResult task) {
this.task = requireNonNull(task, "task is required");
}
public GetTaskResponse(StreamInput in) throws IOException {
super(in);
task = in.readOptionalWriteable(TaskResult::new);
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
task = in.readOptionalWriteable(TaskResult::new);
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
}
@Override

View File

@ -75,7 +75,7 @@ public class TransportGetTaskAction extends HandledTransportAction<GetTaskReques
@Inject
public TransportGetTaskAction(ThreadPool threadPool, TransportService transportService, ActionFilters actionFilters,
ClusterService clusterService, Client client, NamedXContentRegistry xContentRegistry) {
super(GetTaskAction.NAME, transportService, GetTaskRequest::new, actionFilters);
super(GetTaskAction.NAME, transportService, actionFilters, GetTaskRequest::new);
this.threadPool = threadPool;
this.clusterService = clusterService;
this.transportService = transportService;
@ -121,9 +121,7 @@ public class TransportGetTaskAction extends HandledTransportAction<GetTaskReques
new TransportResponseHandler<GetTaskResponse>() {
@Override
public GetTaskResponse read(StreamInput in) throws IOException {
GetTaskResponse response = new GetTaskResponse();
response.readFrom(in);
return response;
return new GetTaskResponse(in);
}
@Override

View File

@ -19,19 +19,14 @@
package org.elasticsearch.action.admin.cluster.remote;
import org.elasticsearch.action.StreamableResponseActionType;
import org.elasticsearch.action.ActionType;
public final class RemoteInfoAction extends StreamableResponseActionType<RemoteInfoResponse> {
public final class RemoteInfoAction extends ActionType<RemoteInfoResponse> {
public static final String NAME = "cluster:monitor/remote/info";
public static final RemoteInfoAction INSTANCE = new RemoteInfoAction();
public RemoteInfoAction() {
super(NAME);
}
@Override
public RemoteInfoResponse newResponse() {
return new RemoteInfoResponse();
super(NAME, RemoteInfoResponse::new);
}
}

View File

@ -27,11 +27,9 @@ import java.io.IOException;
public final class RemoteInfoRequest extends ActionRequest {
public RemoteInfoRequest() {
public RemoteInfoRequest() {}
}
public RemoteInfoRequest(StreamInput in) throws IOException {
RemoteInfoRequest(StreamInput in) throws IOException {
super(in);
}

View File

@ -36,7 +36,9 @@ public final class RemoteInfoResponse extends ActionResponse implements ToXConte
private List<RemoteConnectionInfo> infos;
RemoteInfoResponse() {
RemoteInfoResponse(StreamInput in) throws IOException {
super(in);
infos = in.readList(RemoteConnectionInfo::new);
}
RemoteInfoResponse(Collection<RemoteConnectionInfo> infos) {
@ -54,8 +56,7 @@ public final class RemoteInfoResponse extends ActionResponse implements ToXConte
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
infos = in.readList(RemoteConnectionInfo::new);
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
}
@Override

View File

@ -37,7 +37,7 @@ public final class TransportRemoteInfoAction extends HandledTransportAction<Remo
@Inject
public TransportRemoteInfoAction(TransportService transportService, ActionFilters actionFilters,
SearchTransportService searchTransportService) {
super(RemoteInfoAction.NAME, transportService, RemoteInfoRequest::new, actionFilters);
super(RemoteInfoAction.NAME, transportService, actionFilters, RemoteInfoRequest::new);
this.remoteClusterService = searchTransportService.getRemoteClusterService();
}

View File

@ -52,6 +52,16 @@ public class GetFieldMappingsRequest extends ActionRequest implements IndicesReq
}
public GetFieldMappingsRequest(StreamInput in) throws IOException {
super(in);
indices = in.readStringArray();
types = in.readStringArray();
indicesOptions = IndicesOptions.readIndicesOptions(in);
local = in.readBoolean();
fields = in.readStringArray();
includeDefaults = in.readBoolean();
}
/**
* Indicate whether the receiving node should operate based on local index information or forward requests,
* where needed, to other nodes. If running locally, request will not raise errors if running locally &amp; missing indices.
@ -133,12 +143,6 @@ public class GetFieldMappingsRequest extends ActionRequest implements IndicesReq
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
indices = in.readStringArray();
types = in.readStringArray();
indicesOptions = IndicesOptions.readIndicesOptions(in);
local = in.readBoolean();
fields = in.readStringArray();
includeDefaults = in.readBoolean();
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
}
}

View File

@ -43,7 +43,6 @@ import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import static java.util.Collections.emptyMap;
import static java.util.Collections.unmodifiableMap;
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
@ -86,16 +85,12 @@ public class GetFieldMappingsResponse extends ActionResponse implements ToXConte
}, MAPPINGS, ObjectParser.ValueType.OBJECT);
}
private Map<String, Map<String, Map<String, FieldMappingMetaData>>> mappings = emptyMap();
private final Map<String, Map<String, Map<String, FieldMappingMetaData>>> mappings;
GetFieldMappingsResponse(Map<String, Map<String, Map<String, FieldMappingMetaData>>> mappings) {
this.mappings = mappings;
}
GetFieldMappingsResponse() {
}
GetFieldMappingsResponse(StreamInput in) throws IOException {
super(in);
int size = in.readVInt();

View File

@ -34,6 +34,7 @@ import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReferenceArray;
import static java.util.Collections.emptyMap;
import static java.util.Collections.unmodifiableMap;
public class TransportGetFieldMappingsAction extends HandledTransportAction<GetFieldMappingsRequest, GetFieldMappingsResponse> {
@ -46,7 +47,7 @@ public class TransportGetFieldMappingsAction extends HandledTransportAction<GetF
public TransportGetFieldMappingsAction(TransportService transportService, ClusterService clusterService,
TransportGetFieldMappingsIndexAction shardAction,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) {
super(GetFieldMappingsAction.NAME, transportService, GetFieldMappingsRequest::new, actionFilters);
super(GetFieldMappingsAction.NAME, transportService, actionFilters, GetFieldMappingsRequest::new);
this.clusterService = clusterService;
this.shardAction = shardAction;
this.indexNameExpressionResolver = indexNameExpressionResolver;
@ -61,7 +62,7 @@ public class TransportGetFieldMappingsAction extends HandledTransportAction<GetF
final AtomicReferenceArray<Object> indexResponses = new AtomicReferenceArray<>(concreteIndices.length);
if (concreteIndices.length == 0) {
listener.onResponse(new GetFieldMappingsResponse());
listener.onResponse(new GetFieldMappingsResponse(emptyMap()));
} else {
boolean probablySingleFieldRequest = concreteIndices.length == 1 && request.types().length == 1 && request.fields().length == 1;
for (final String index : concreteIndices) {

View File

@ -42,7 +42,21 @@ public class ShardValidateQueryRequest extends BroadcastShardRequest {
private long nowInMillis;
private AliasFilter filteringAliases;
public ShardValidateQueryRequest() {
public ShardValidateQueryRequest(StreamInput in) throws IOException {
super(in);
query = in.readNamedWriteable(QueryBuilder.class);
int typesSize = in.readVInt();
if (typesSize > 0) {
types = new String[typesSize];
for (int i = 0; i < typesSize; i++) {
types[i] = in.readString();
}
}
filteringAliases = new AliasFilter(in);
explain = in.readBoolean();
rewrite = in.readBoolean();
nowInMillis = in.readVLong();
}
public ShardValidateQueryRequest(ShardId shardId, AliasFilter filteringAliases, ValidateQueryRequest request) {
@ -81,20 +95,7 @@ public class ShardValidateQueryRequest extends BroadcastShardRequest {
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
query = in.readNamedWriteable(QueryBuilder.class);
int typesSize = in.readVInt();
if (typesSize > 0) {
types = new String[typesSize];
for (int i = 0; i < typesSize; i++) {
types[i] = in.readString();
}
}
filteringAliases = new AliasFilter(in);
explain = in.readBoolean();
rewrite = in.readBoolean();
nowInMillis = in.readVLong();
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
}
@Override

View File

@ -19,19 +19,14 @@
package org.elasticsearch.action.get;
import org.elasticsearch.action.StreamableResponseActionType;
import org.elasticsearch.action.ActionType;
public class MultiGetAction extends StreamableResponseActionType<MultiGetResponse> {
public class MultiGetAction extends ActionType<MultiGetResponse> {
public static final MultiGetAction INSTANCE = new MultiGetAction();
public static final String NAME = "indices:data/read/mget";
private MultiGetAction() {
super(NAME);
}
@Override
public MultiGetResponse newResponse() {
return new MultiGetResponse();
super(NAME, MultiGetResponse::new);
}
}

View File

@ -21,27 +21,33 @@ package org.elasticsearch.action.get;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.io.stream.Writeable;
import java.io.IOException;
/**
* A single multi get response.
*/
public class MultiGetItemResponse implements Streamable {
public class MultiGetItemResponse implements Writeable {
private GetResponse response;
private MultiGetResponse.Failure failure;
MultiGetItemResponse() {
}
private final GetResponse response;
private final MultiGetResponse.Failure failure;
public MultiGetItemResponse(GetResponse response, MultiGetResponse.Failure failure) {
this.response = response;
this.failure = failure;
}
MultiGetItemResponse(StreamInput in) throws IOException {
if (in.readBoolean()) {
failure = new MultiGetResponse.Failure(in);
response = null;
} else {
response = new GetResponse(in);
failure = null;
}
}
/**
* The index name of the document.
*/
@ -93,21 +99,6 @@ public class MultiGetItemResponse implements Streamable {
return this.failure;
}
public static MultiGetItemResponse readItemResponse(StreamInput in) throws IOException {
MultiGetItemResponse response = new MultiGetItemResponse();
response.readFrom(in);
return response;
}
@Override
public void readFrom(StreamInput in) throws IOException {
if (in.readBoolean()) {
failure = MultiGetResponse.Failure.readFailure(in);
} else {
response = new GetResponse(in);
}
}
@Override
public void writeTo(StreamOutput out) throws IOException {
if (failure != null) {

View File

@ -34,7 +34,7 @@ import org.elasticsearch.common.ParsingException;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.lucene.uid.Versions;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
@ -68,7 +68,7 @@ public class MultiGetRequest extends ActionRequest
/**
* A single get item.
*/
public static class Item implements Streamable, IndicesRequest, ToXContentObject {
public static class Item implements Writeable, IndicesRequest, ToXContentObject {
private String index;
private String type;
@ -83,6 +83,21 @@ public class MultiGetRequest extends ActionRequest
}
public Item(StreamInput in) throws IOException {
index = in.readString();
type = in.readOptionalString();
id = in.readString();
routing = in.readOptionalString();
if (in.getVersion().before(Version.V_7_0_0)) {
in.readOptionalString(); // _parent
}
storedFields = in.readOptionalStringArray();
version = in.readLong();
versionType = VersionType.fromValue(in.readByte());
fetchSourceContext = in.readOptionalWriteable(FetchSourceContext::new);
}
/**
* Constructs a single get item.
*
@ -182,28 +197,6 @@ public class MultiGetRequest extends ActionRequest
return this;
}
public static Item readItem(StreamInput in) throws IOException {
Item item = new Item();
item.readFrom(in);
return item;
}
@Override
public void readFrom(StreamInput in) throws IOException {
index = in.readString();
type = in.readOptionalString();
id = in.readString();
routing = in.readOptionalString();
if (in.getVersion().before(Version.V_7_0_0)) {
in.readOptionalString(); // _parent
}
storedFields = in.readOptionalStringArray();
version = in.readLong();
versionType = VersionType.fromValue(in.readByte());
fetchSourceContext = in.readOptionalWriteable(FetchSourceContext::new);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(index);
@ -279,6 +272,21 @@ public class MultiGetRequest extends ActionRequest
boolean refresh;
List<Item> items = new ArrayList<>();
public MultiGetRequest() {}
public MultiGetRequest(StreamInput in) throws IOException {
super(in);
preference = in.readOptionalString();
refresh = in.readBoolean();
realtime = in.readBoolean();
int size = in.readVInt();
items = new ArrayList<>(size);
for (int i = 0; i < size; i++) {
items.add(new Item(in));
}
}
public List<Item> getItems() {
return this.items;
}
@ -539,16 +547,7 @@ public class MultiGetRequest extends ActionRequest
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
preference = in.readOptionalString();
refresh = in.readBoolean();
realtime = in.readBoolean();
int size = in.readVInt();
items = new ArrayList<>(size);
for (int i = 0; i < size; i++) {
items.add(Item.readItem(in));
}
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
}
@Override

View File

@ -24,7 +24,7 @@ import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
@ -48,15 +48,12 @@ public class MultiGetResponse extends ActionResponse implements Iterable<MultiGe
/**
* Represents a failure.
*/
public static class Failure implements Streamable, ToXContentObject {
public static class Failure implements Writeable, ToXContentObject {
private String index;
private String type;
private String id;
private Exception exception;
Failure() {
}
private final String index;
private final String type;
private final String id;
private final Exception exception;
public Failure(String index, String type, String id, Exception exception) {
this.index = index;
@ -65,6 +62,13 @@ public class MultiGetResponse extends ActionResponse implements Iterable<MultiGe
this.exception = exception;
}
Failure(StreamInput in) throws IOException {
index = in.readString();
type = in.readOptionalString();
id = in.readString();
exception = in.readException();
}
/**
* The index name of the action.
*/
@ -93,20 +97,6 @@ public class MultiGetResponse extends ActionResponse implements Iterable<MultiGe
return exception != null ? exception.getMessage() : null;
}
public static Failure readFailure(StreamInput in) throws IOException {
Failure failure = new Failure();
failure.readFrom(in);
return failure;
}
@Override
public void readFrom(StreamInput in) throws IOException {
index = in.readString();
type = in.readOptionalString();
id = in.readString();
exception = in.readException();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(index);
@ -131,15 +121,20 @@ public class MultiGetResponse extends ActionResponse implements Iterable<MultiGe
}
}
private MultiGetItemResponse[] responses;
MultiGetResponse() {
}
private final MultiGetItemResponse[] responses;
public MultiGetResponse(MultiGetItemResponse[] responses) {
this.responses = responses;
}
MultiGetResponse(StreamInput in) throws IOException {
super(in);
responses = new MultiGetItemResponse[in.readVInt()];
for (int i = 0; i < responses.length; i++) {
responses[i] = new MultiGetItemResponse(in);
}
}
public MultiGetItemResponse[] getResponses() {
return this.responses;
}
@ -245,11 +240,7 @@ public class MultiGetResponse extends ActionResponse implements Iterable<MultiGe
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
responses = new MultiGetItemResponse[in.readVInt()];
for (int i = 0; i < responses.length; i++) {
responses[i] = MultiGetItemResponse.readItemResponse(in);
}
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
}
@Override

View File

@ -47,7 +47,7 @@ public class MultiGetShardRequest extends SingleShardRequest<MultiGetShardReques
for (int i = 0; i < size; i++) {
locations.add(in.readVInt());
items.add(MultiGetRequest.Item.readItem(in));
items.add(new MultiGetRequest.Item(in));
}
preference = in.readOptionalString();

View File

@ -54,7 +54,7 @@ public class MultiGetShardResponse extends ActionResponse {
responses.add(null);
}
if (in.readBoolean()) {
failures.add(MultiGetResponse.Failure.readFailure(in));
failures.add(new MultiGetResponse.Failure(in));
} else {
failures.add(null);
}

View File

@ -47,7 +47,7 @@ public class TransportMultiGetAction extends HandledTransportAction<MultiGetRequ
public TransportMultiGetAction(TransportService transportService, ClusterService clusterService,
TransportShardMultiGetAction shardAction, ActionFilters actionFilters,
IndexNameExpressionResolver resolver) {
super(MultiGetAction.NAME, transportService, MultiGetRequest::new, actionFilters);
super(MultiGetAction.NAME, transportService, actionFilters, MultiGetRequest::new);
this.clusterService = clusterService;
this.shardAction = shardAction;
this.indexNameExpressionResolver = resolver;

View File

@ -19,19 +19,15 @@
package org.elasticsearch.action.main;
import org.elasticsearch.action.StreamableResponseActionType;
import org.elasticsearch.action.ActionType;
public class MainAction extends StreamableResponseActionType<MainResponse> {
public class MainAction extends ActionType<MainResponse> {
public static final String NAME = "cluster:monitor/main";
public static final MainAction INSTANCE = new MainAction();
public MainAction() {
super(NAME);
super(NAME, MainResponse::new);
}
@Override
public MainResponse newResponse() {
return new MainResponse();
}
}

View File

@ -21,12 +21,25 @@ package org.elasticsearch.action.main;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.common.io.stream.StreamInput;
import java.io.IOException;
public class MainRequest extends ActionRequest {
public MainRequest() {}
MainRequest(StreamInput in) throws IOException {
super(in);
}
@Override
public ActionRequestValidationException validate() {
return null;
}
@Override
public void readFrom(StreamInput in) throws IOException {
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
}
}

View File

@ -42,7 +42,18 @@ public class MainResponse extends ActionResponse implements ToXContentObject {
private String clusterUuid;
private Build build;
MainResponse() {
MainResponse() {}
MainResponse(StreamInput in) throws IOException {
super(in);
nodeName = in.readString();
version = Version.readVersion(in);
clusterName = new ClusterName(in);
clusterUuid = in.readString();
build = Build.readBuild(in);
if (in.getVersion().before(Version.V_7_0_0)) {
in.readBoolean();
}
}
public MainResponse(String nodeName, Version version, ClusterName clusterName, String clusterUuid, Build build) {
@ -88,15 +99,7 @@ public class MainResponse extends ActionResponse implements ToXContentObject {
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
nodeName = in.readString();
version = Version.readVersion(in);
clusterName = new ClusterName(in);
clusterUuid = in.readString();
build = Build.readBuild(in);
if (in.getVersion().before(Version.V_7_0_0)) {
in.readBoolean();
}
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
}
@Override

View File

@ -40,7 +40,7 @@ public class TransportMainAction extends HandledTransportAction<MainRequest, Mai
@Inject
public TransportMainAction(Settings settings, TransportService transportService,
ActionFilters actionFilters, ClusterService clusterService) {
super(MainAction.NAME, transportService, MainRequest::new, actionFilters);
super(MainAction.NAME, transportService, actionFilters, MainRequest::new);
this.nodeName = Node.NODE_NAME_SETTING.get(settings);
this.clusterService = clusterService;
}

View File

@ -35,7 +35,12 @@ public abstract class BroadcastShardRequest extends TransportRequest implements
protected OriginalIndices originalIndices;
public BroadcastShardRequest() {
protected BroadcastShardRequest() {}
public BroadcastShardRequest(StreamInput in) throws IOException {
super(in);
shardId = new ShardId(in);
originalIndices = OriginalIndices.readOriginalIndices(in);
}
protected BroadcastShardRequest(ShardId shardId, BroadcastRequest<? extends BroadcastRequest<?>> request) {
@ -64,9 +69,7 @@ public abstract class BroadcastShardRequest extends TransportRequest implements
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
shardId = new ShardId(in);
originalIndices = OriginalIndices.readOriginalIndices(in);
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
}
@Override

View File

@ -49,7 +49,6 @@ import org.elasticsearch.transport.TransportService;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReferenceArray;
import java.util.function.Supplier;
public abstract class TransportBroadcastAction<
Request extends BroadcastRequest<Request>,
@ -68,7 +67,7 @@ public abstract class TransportBroadcastAction<
protected TransportBroadcastAction(String actionName, ClusterService clusterService,
TransportService transportService, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver, Writeable.Reader<Request> request,
Supplier<ShardRequest> shardRequest, String shardExecutor) {
Writeable.Reader<ShardRequest> shardRequest, String shardExecutor) {
super(actionName, transportService, actionFilters, request);
this.clusterService = clusterService;
this.transportService = transportService;
@ -76,7 +75,7 @@ public abstract class TransportBroadcastAction<
this.transportShardAction = actionName + "[s]";
this.shardExecutor = shardExecutor;
transportService.registerRequestHandler(transportShardAction, shardRequest, ThreadPool.Names.SAME, new ShardTransportHandler());
transportService.registerRequestHandler(transportShardAction, ThreadPool.Names.SAME, shardRequest, new ShardTransportHandler());
}
@Override

View File

@ -48,6 +48,19 @@ public abstract class InstanceShardOperationRequest<Request extends InstanceShar
protected InstanceShardOperationRequest() {
}
protected InstanceShardOperationRequest(StreamInput in) throws IOException {
super(in);
index = in.readString();
if (in.readBoolean()) {
shardId = new ShardId(in);
} else {
shardId = null;
}
timeout = in.readTimeValue();
concreteIndex = in.readOptionalString();
}
public InstanceShardOperationRequest(String index) {
this.index = index;
}
@ -111,15 +124,7 @@ public abstract class InstanceShardOperationRequest<Request extends InstanceShar
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
index = in.readString();
if (in.readBoolean()) {
shardId = new ShardId(in);
} else {
shardId = null;
}
timeout = in.readTimeValue();
concreteIndex = in.readOptionalString();
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
}
@Override

View File

@ -35,6 +35,7 @@ import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.node.NodeClosedException;
import org.elasticsearch.tasks.Task;
@ -48,7 +49,6 @@ import org.elasticsearch.transport.TransportResponseHandler;
import org.elasticsearch.transport.TransportService;
import java.io.IOException;
import java.util.function.Supplier;
public abstract class TransportInstanceSingleOperationAction<
Request extends InstanceShardOperationRequest<Request>,
@ -66,15 +66,15 @@ public abstract class TransportInstanceSingleOperationAction<
protected TransportInstanceSingleOperationAction(String actionName, ThreadPool threadPool,
ClusterService clusterService, TransportService transportService,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
Supplier<Request> request) {
super(actionName, transportService, request, actionFilters);
Writeable.Reader<Request> request) {
super(actionName, transportService, actionFilters, request);
this.threadPool = threadPool;
this.clusterService = clusterService;
this.transportService = transportService;
this.indexNameExpressionResolver = indexNameExpressionResolver;
this.executor = executor();
this.shardActionName = actionName + "[s]";
transportService.registerRequestHandler(shardActionName, request, executor, new ShardTransportHandler());
transportService.registerRequestHandler(shardActionName, executor, request, new ShardTransportHandler());
}
@Override

View File

@ -130,6 +130,48 @@ public class UpdateRequest extends InstanceShardOperationRequest<UpdateRequest>
}
public UpdateRequest(StreamInput in) throws IOException {
super(in);
waitForActiveShards = ActiveShardCount.readFrom(in);
type = in.readString();
id = in.readString();
routing = in.readOptionalString();
if (in.getVersion().before(Version.V_7_0_0)) {
in.readOptionalString(); // _parent
}
if (in.readBoolean()) {
script = new Script(in);
}
retryOnConflict = in.readVInt();
refreshPolicy = RefreshPolicy.readFrom(in);
if (in.readBoolean()) {
doc = new IndexRequest(in);
}
if (in.getVersion().before(Version.V_7_0_0)) {
String[] fields = in.readOptionalStringArray();
if (fields != null) {
throw new IllegalArgumentException("[fields] is no longer supported");
}
}
fetchSourceContext = in.readOptionalWriteable(FetchSourceContext::new);
if (in.readBoolean()) {
upsertRequest = new IndexRequest(in);
}
docAsUpsert = in.readBoolean();
if (in.getVersion().before(Version.V_7_0_0)) {
long version = in.readLong();
VersionType versionType = VersionType.readFromStream(in);
if (version != Versions.MATCH_ANY || versionType != VersionType.INTERNAL) {
throw new UnsupportedOperationException(
"versioned update requests have been removed in 7.0. Use if_seq_no and if_primary_term");
}
}
ifSeqNo = in.readZLong();
ifPrimaryTerm = in.readVLong();
detectNoop = in.readBoolean();
scriptedUpsert = in.readBoolean();
}
public UpdateRequest(String index, String id) {
super(index);
this.id = id;
@ -831,45 +873,7 @@ public class UpdateRequest extends InstanceShardOperationRequest<UpdateRequest>
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
waitForActiveShards = ActiveShardCount.readFrom(in);
type = in.readString();
id = in.readString();
routing = in.readOptionalString();
if (in.getVersion().before(Version.V_7_0_0)) {
in.readOptionalString(); // _parent
}
if (in.readBoolean()) {
script = new Script(in);
}
retryOnConflict = in.readVInt();
refreshPolicy = RefreshPolicy.readFrom(in);
if (in.readBoolean()) {
doc = new IndexRequest(in);
}
if (in.getVersion().before(Version.V_7_0_0)) {
String[] fields = in.readOptionalStringArray();
if (fields != null) {
throw new IllegalArgumentException("[fields] is no longer supported");
}
}
fetchSourceContext = in.readOptionalWriteable(FetchSourceContext::new);
if (in.readBoolean()) {
upsertRequest = new IndexRequest(in);
}
docAsUpsert = in.readBoolean();
if (in.getVersion().before(Version.V_7_0_0)) {
long version = in.readLong();
VersionType versionType = VersionType.readFromStream(in);
if (version != Versions.MATCH_ANY || versionType != VersionType.INTERNAL) {
throw new UnsupportedOperationException(
"versioned update requests have been removed in 7.0. Use if_seq_no and if_primary_term");
}
}
ifSeqNo = in.readZLong();
ifPrimaryTerm = in.readVLong();
detectNoop = in.readBoolean();
scriptedUpsert = in.readBoolean();
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
}
@Override

View File

@ -61,8 +61,7 @@ public class ShardValidateQueryRequestTests extends ESTestCase {
new AliasFilter(QueryBuilders.termQuery("filter_field", "value"), new String[] {"alias0", "alias1"}), validateQueryRequest);
request.writeTo(output);
try (StreamInput in = new NamedWriteableAwareStreamInput(output.bytes().streamInput(), namedWriteableRegistry)) {
ShardValidateQueryRequest readRequest = new ShardValidateQueryRequest();
readRequest.readFrom(in);
ShardValidateQueryRequest readRequest = new ShardValidateQueryRequest(in);
assertEquals(request.filteringAliases(), readRequest.filteringAliases());
assertArrayEquals(request.types(), readRequest.types());
assertEquals(request.explain(), readRequest.explain());

View File

@ -23,17 +23,18 @@ import org.elasticsearch.Build;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.test.AbstractStreamableXContentTestCase;
import org.elasticsearch.test.AbstractSerializingTestCase;
import org.elasticsearch.test.VersionUtils;
import java.io.IOException;
import java.util.Date;
public class MainResponseTests extends AbstractStreamableXContentTestCase<MainResponse> {
public class MainResponseTests extends AbstractSerializingTestCase<MainResponse> {
@Override
protected MainResponse createTestInstance() {
@ -50,8 +51,8 @@ public class MainResponseTests extends AbstractStreamableXContentTestCase<MainRe
}
@Override
protected MainResponse createBlankInstance() {
return new MainResponse();
protected Writeable.Reader<MainResponse> instanceReader() {
return MainResponse::new;
}
@Override

View File

@ -38,6 +38,7 @@ import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.rest.RestStatus;
@ -61,7 +62,6 @@ import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;
import static org.elasticsearch.test.ClusterServiceUtils.createClusterService;
import static org.elasticsearch.test.ClusterServiceUtils.setState;
@ -78,12 +78,18 @@ public class TransportInstanceSingleOperationActionTests extends ESTestCase {
private TestTransportInstanceSingleOperationAction action;
public static class Request extends InstanceShardOperationRequest<Request> {
public Request() {
public Request() {}
public Request(StreamInput in) throws IOException {
super(in);
}
}
public static class Response extends ActionResponse {
public Response() {
public Response() {}
public Response(StreamInput in) throws IOException {
super(in);
}
@Override
@ -95,7 +101,7 @@ public class TransportInstanceSingleOperationActionTests extends ESTestCase {
TestTransportInstanceSingleOperationAction(String actionName, TransportService transportService,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
Supplier<Request> request) {
Writeable.Reader<Request> request) {
super(actionName, THREAD_POOL, TransportInstanceSingleOperationActionTests.this.clusterService, transportService,
actionFilters, indexNameExpressionResolver, request);
}