Refactor TransportSingleShardAction to serialize Writeable responses (#41985) (#42040)

Previously, TransportSingleShardAction required constructing a new
empty response object. This response object's Streamable readFrom
was used. As part of the migration to Writeable, the interface here
was updated to leverage Writeable.Reader.

relates to #34389.
This commit is contained in:
Tal Levy 2019-05-09 22:08:31 -07:00 committed by GitHub
parent 99a50ac3b7
commit 5640197632
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
31 changed files with 280 additions and 183 deletions

View File

@ -103,7 +103,7 @@ public class PainlessExecuteAction extends Action<PainlessExecuteAction.Response
@Override
public Response newResponse() {
return new Response();
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
}
public static class Request extends SingleShardRequest<Request> implements ToXContentObject {
@ -388,20 +388,22 @@ public class PainlessExecuteAction extends Action<PainlessExecuteAction.Response
private Object result;
Response() {}
Response(Object result) {
this.result = result;
}
Response(StreamInput in) throws IOException {
super(in);
result = in.readGenericValue();
}
public Object getResult() {
return result;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
result = in.readGenericValue();
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
}
@Override
@ -476,8 +478,8 @@ public class PainlessExecuteAction extends Action<PainlessExecuteAction.Response
}
@Override
protected Response newResponse() {
return new Response();
protected Writeable.Reader<Response> getResponseReader() {
return Response::new;
}
@Override

View File

@ -18,17 +18,57 @@
*/
package org.elasticsearch.painless.action;
import org.elasticsearch.test.AbstractStreamableTestCase;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.test.AbstractSerializingTestCase;
public class PainlessExecuteResponseTests extends AbstractStreamableTestCase<PainlessExecuteAction.Response> {
import java.io.IOException;
public class PainlessExecuteResponseTests extends AbstractSerializingTestCase<PainlessExecuteAction.Response> {
@Override
protected PainlessExecuteAction.Response createBlankInstance() {
return new PainlessExecuteAction.Response();
protected Writeable.Reader<PainlessExecuteAction.Response> instanceReader() {
return PainlessExecuteAction.Response::new;
}
@Override
protected PainlessExecuteAction.Response createTestInstance() {
return new PainlessExecuteAction.Response(randomAlphaOfLength(10));
Object result;
switch (randomIntBetween(0, 2)) {
case 0:
result = randomAlphaOfLength(10);
break;
case 1:
result = randomBoolean();
break;
case 2:
result = randomDoubleBetween(-10, 10, true);
break;
default:
throw new IllegalStateException("invalid branch");
}
return new PainlessExecuteAction.Response(result);
}
@Override
protected PainlessExecuteAction.Response doParseInstance(XContentParser parser) throws IOException {
parser.nextToken(); // START-OBJECT
parser.nextToken(); // FIELD-NAME
XContentParser.Token token = parser.nextToken(); // result value
Object result;
switch (token) {
case VALUE_STRING:
result = parser.text();
break;
case VALUE_BOOLEAN:
result = parser.booleanValue();
break;
case VALUE_NUMBER:
result = parser.doubleValue();
break;
default:
throw new IOException("invalid response");
}
return new PainlessExecuteAction.Response(result);
}
}

View File

@ -71,7 +71,7 @@ public class AnalyzeResponse extends ActionResponse implements Iterable<AnalyzeR
return Objects.hash(term, startOffset, endOffset, position, positionLength, attributes, type);
}
public AnalyzeToken(String term, int position, int startOffset, int endOffset, int positionLength,
AnalyzeToken(String term, int position, int startOffset, int endOffset, int positionLength,
String type, Map<String, Object> attributes) {
this.term = term;
this.position = position;
@ -82,7 +82,7 @@ public class AnalyzeResponse extends ActionResponse implements Iterable<AnalyzeR
this.attributes = attributes;
}
public AnalyzeToken(StreamInput in) throws IOException {
AnalyzeToken(StreamInput in) throws IOException {
term = in.readString();
startOffset = in.readInt();
endOffset = in.readInt();
@ -203,7 +203,6 @@ public class AnalyzeResponse extends ActionResponse implements Iterable<AnalyzeR
}
private final DetailAnalyzeResponse detail;
private final List<AnalyzeToken> tokens;
public AnalyzeResponse(List<AnalyzeToken> tokens, DetailAnalyzeResponse detail) {

View File

@ -96,11 +96,6 @@ public class TransportAnalyzeAction extends TransportSingleShardAction<AnalyzeRe
this.environment = environment;
}
@Override
protected AnalyzeResponse newResponse() {
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
}
@Override
protected Writeable.Reader<AnalyzeResponse> getResponseReader() {
return AnalyzeResponse::new;

View File

@ -20,6 +20,7 @@
package org.elasticsearch.action.admin.indices.mapping.get;
import org.elasticsearch.action.Action;
import org.elasticsearch.common.io.stream.Writeable;
public class GetFieldMappingsAction extends Action<GetFieldMappingsResponse> {
@ -32,6 +33,11 @@ public class GetFieldMappingsAction extends Action<GetFieldMappingsResponse> {
@Override
public GetFieldMappingsResponse newResponse() {
return new GetFieldMappingsResponse();
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
}
@Override
public Writeable.Reader<GetFieldMappingsResponse> getResponseReader() {
return GetFieldMappingsResponse::new;
}
}

View File

@ -92,9 +92,33 @@ public class GetFieldMappingsResponse extends ActionResponse implements ToXConte
this.mappings = mappings;
}
GetFieldMappingsResponse() {
}
GetFieldMappingsResponse(StreamInput in) throws IOException {
super(in);
int size = in.readVInt();
Map<String, Map<String, Map<String, FieldMappingMetaData>>> indexMapBuilder = new HashMap<>(size);
for (int i = 0; i < size; i++) {
String index = in.readString();
int typesSize = in.readVInt();
Map<String, Map<String, FieldMappingMetaData>> typeMapBuilder = new HashMap<>(typesSize);
for (int j = 0; j < typesSize; j++) {
String type = in.readString();
int fieldSize = in.readVInt();
Map<String, FieldMappingMetaData> fieldMapBuilder = new HashMap<>(fieldSize);
for (int k = 0; k < fieldSize; k++) {
fieldMapBuilder.put(in.readString(), new FieldMappingMetaData(in.readString(), in.readBytesReference()));
}
typeMapBuilder.put(type, unmodifiableMap(fieldMapBuilder));
}
indexMapBuilder.put(index, unmodifiableMap(typeMapBuilder));
}
mappings = unmodifiableMap(indexMapBuilder);
}
/** returns the retrieved field mapping. The return map keys are index, type, field (as specified in the request). */
public Map<String, Map<String, Map<String, FieldMappingMetaData>>> mappings() {
return mappings;
@ -269,25 +293,7 @@ public class GetFieldMappingsResponse extends ActionResponse implements ToXConte
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
int size = in.readVInt();
Map<String, Map<String, Map<String, FieldMappingMetaData>>> indexMapBuilder = new HashMap<>(size);
for (int i = 0; i < size; i++) {
String index = in.readString();
int typesSize = in.readVInt();
Map<String, Map<String, FieldMappingMetaData>> typeMapBuilder = new HashMap<>(typesSize);
for (int j = 0; j < typesSize; j++) {
String type = in.readString();
int fieldSize = in.readVInt();
Map<String, FieldMappingMetaData> fieldMapBuilder = new HashMap<>(fieldSize);
for (int k = 0; k < fieldSize; k++) {
fieldMapBuilder.put(in.readString(), new FieldMappingMetaData(in.readString(), in.readBytesReference()));
}
typeMapBuilder.put(type, unmodifiableMap(fieldMapBuilder));
}
indexMapBuilder.put(index, unmodifiableMap(typeMapBuilder));
}
mappings = unmodifiableMap(indexMapBuilder);
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
}
@Override

View File

@ -32,6 +32,7 @@ import org.elasticsearch.cluster.routing.ShardsIterator;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentHelper;
@ -123,8 +124,8 @@ public class TransportGetFieldMappingsIndexAction
}
@Override
protected GetFieldMappingsResponse newResponse() {
return new GetFieldMappingsResponse();
protected Writeable.Reader<GetFieldMappingsResponse> getResponseReader() {
return GetFieldMappingsResponse::new;
}
@Override

View File

@ -20,6 +20,7 @@
package org.elasticsearch.action.explain;
import org.elasticsearch.action.Action;
import org.elasticsearch.common.io.stream.Writeable;
/**
* Entry point for the explain feature.
@ -35,6 +36,11 @@ public class ExplainAction extends Action<ExplainResponse> {
@Override
public ExplainResponse newResponse() {
return new ExplainResponse();
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
}
@Override
public Writeable.Reader<ExplainResponse> getResponseReader() {
return ExplainResponse::new;
}
}

View File

@ -60,6 +60,7 @@ public class ExplainResponse extends ActionResponse implements StatusToXContentO
private Explanation explanation;
private GetResult getResult;
// TODO(talevy): remove dependency on empty constructor from ExplainResponseTests
ExplainResponse() {
}
@ -80,6 +81,20 @@ public class ExplainResponse extends ActionResponse implements StatusToXContentO
this.getResult = getResult;
}
public ExplainResponse(StreamInput in) throws IOException {
super(in);
index = in.readString();
type = in.readString();
id = in.readString();
exists = in.readBoolean();
if (in.readBoolean()) {
explanation = readExplanation(in);
}
if (in.readBoolean()) {
getResult = GetResult.readGetResult(in);
}
}
public String getIndex() {
return index;
}
@ -123,17 +138,7 @@ public class ExplainResponse extends ActionResponse implements StatusToXContentO
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
index = in.readString();
type = in.readString();
id = in.readString();
exists = in.readBoolean();
if (in.readBoolean()) {
explanation = readExplanation(in);
}
if (in.readBoolean()) {
getResult = GetResult.readGetResult(in);
}
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
}
@Override

View File

@ -32,6 +32,7 @@ import org.elasticsearch.cluster.routing.ShardIterator;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.engine.Engine;
@ -152,8 +153,8 @@ public class TransportExplainAction extends TransportSingleShardAction<ExplainRe
}
@Override
protected ExplainResponse newResponse() {
return new ExplainResponse();
protected Writeable.Reader<ExplainResponse> getResponseReader() {
return ExplainResponse::new;
}
@Override

View File

@ -28,6 +28,7 @@ import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.routing.ShardsIterator;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.index.mapper.MappedFieldType;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.mapper.ObjectMapper;
@ -114,8 +115,8 @@ public class TransportFieldCapabilitiesIndexAction extends TransportSingleShardA
}
@Override
protected FieldCapabilitiesIndexResponse newResponse() {
return new FieldCapabilitiesIndexResponse();
protected Writeable.Reader<FieldCapabilitiesIndexResponse> getResponseReader() {
return FieldCapabilitiesIndexResponse::new;
}
@Override

View File

@ -20,6 +20,7 @@
package org.elasticsearch.action.get;
import org.elasticsearch.action.Action;
import org.elasticsearch.common.io.stream.Writeable;
public class GetAction extends Action<GetResponse> {
@ -32,6 +33,11 @@ public class GetAction extends Action<GetResponse> {
@Override
public GetResponse newResponse() {
return new GetResponse();
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
}
@Override
public Writeable.Reader<GetResponse> getResponseReader() {
return GetResponse::new;
}
}

View File

@ -48,7 +48,9 @@ public class GetResponse extends ActionResponse implements Iterable<DocumentFiel
GetResult getResult;
GetResponse() {
GetResponse(StreamInput in) throws IOException {
super(in);
getResult = GetResult.readGetResult(in);
}
public GetResponse(GetResult getResult) {
@ -203,8 +205,7 @@ public class GetResponse extends ActionResponse implements Iterable<DocumentFiel
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
getResult = GetResult.readGetResult(in);
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
}
@Override

View File

@ -104,8 +104,7 @@ public class MultiGetItemResponse implements Streamable {
if (in.readBoolean()) {
failure = MultiGetResponse.Failure.readFailure(in);
} else {
response = new GetResponse();
response.readFrom(in);
response = new GetResponse(in);
}
}

View File

@ -30,9 +30,9 @@ import java.util.List;
public class MultiGetShardResponse extends ActionResponse {
IntArrayList locations;
List<GetResponse> responses;
List<MultiGetResponse.Failure> failures;
final IntArrayList locations;
final List<GetResponse> responses;
final List<MultiGetResponse.Failure> failures;
MultiGetShardResponse() {
locations = new IntArrayList();
@ -40,6 +40,27 @@ public class MultiGetShardResponse extends ActionResponse {
failures = new ArrayList<>();
}
MultiGetShardResponse(StreamInput in) throws IOException {
super(in);
int size = in.readVInt();
locations = new IntArrayList(size);
responses = new ArrayList<>(size);
failures = new ArrayList<>(size);
for (int i = 0; i < size; i++) {
locations.add(in.readVInt());
if (in.readBoolean()) {
responses.add(new GetResponse(in));
} else {
responses.add(null);
}
if (in.readBoolean()) {
failures.add(MultiGetResponse.Failure.readFailure(in));
} else {
failures.add(null);
}
}
}
public void add(int location, GetResponse response) {
locations.add(location);
responses.add(response);
@ -54,26 +75,7 @@ public class MultiGetShardResponse extends ActionResponse {
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
int size = in.readVInt();
locations = new IntArrayList(size);
responses = new ArrayList<>(size);
failures = new ArrayList<>(size);
for (int i = 0; i < size; i++) {
locations.add(in.readVInt());
if (in.readBoolean()) {
GetResponse response = new GetResponse();
response.readFrom(in);
responses.add(response);
} else {
responses.add(null);
}
if (in.readBoolean()) {
failures.add(MultiGetResponse.Failure.readFailure(in));
} else {
failures.add(null);
}
}
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
}
@Override

View File

@ -28,6 +28,7 @@ import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.routing.ShardIterator;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.get.GetResult;
import org.elasticsearch.index.shard.IndexShard;
@ -108,8 +109,8 @@ public class TransportGetAction extends TransportSingleShardAction<GetRequest, G
}
@Override
protected GetResponse newResponse() {
return new GetResponse();
protected Writeable.Reader<GetResponse> getResponseReader() {
return GetResponse::new;
}
@Override

View File

@ -28,6 +28,7 @@ import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.routing.ShardIterator;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.get.GetResult;
import org.elasticsearch.index.shard.IndexShard;
@ -57,8 +58,8 @@ public class TransportShardMultiGetAction extends TransportSingleShardAction<Mul
}
@Override
protected MultiGetShardResponse newResponse() {
return new MultiGetShardResponse();
protected Writeable.Reader<MultiGetShardResponse> getResponseReader() {
return MultiGetShardResponse::new;
}
@Override

View File

@ -120,16 +120,7 @@ public abstract class TransportSingleShardAction<Request extends SingleShardRequ
});
}
@Deprecated
protected abstract Response newResponse();
protected Writeable.Reader<Response> getResponseReader() {
return in -> {
Response response = newResponse();
response.readFrom(in);
return response;
};
}
protected abstract Writeable.Reader<Response> getResponseReader();
protected abstract boolean resolveIndex(Request request);

View File

@ -105,8 +105,7 @@ public class MultiTermVectorsItemResponse implements Streamable {
if (in.readBoolean()) {
failure = MultiTermVectorsResponse.Failure.readFailure(in);
} else {
response = new TermVectorsResponse();
response.readFrom(in);
response = new TermVectorsResponse(in);
}
}

View File

@ -30,9 +30,9 @@ import java.util.List;
public class MultiTermVectorsShardResponse extends ActionResponse {
IntArrayList locations;
List<TermVectorsResponse> responses;
List<MultiTermVectorsResponse.Failure> failures;
final IntArrayList locations;
final List<TermVectorsResponse> responses;
final List<MultiTermVectorsResponse.Failure> failures;
MultiTermVectorsShardResponse() {
locations = new IntArrayList();
@ -40,6 +40,27 @@ public class MultiTermVectorsShardResponse extends ActionResponse {
failures = new ArrayList<>();
}
MultiTermVectorsShardResponse(StreamInput in) throws IOException {
super(in);
int size = in.readVInt();
locations = new IntArrayList(size);
responses = new ArrayList<>(size);
failures = new ArrayList<>(size);
for (int i = 0; i < size; i++) {
locations.add(in.readVInt());
if (in.readBoolean()) {
responses.add(new TermVectorsResponse(in));
} else {
responses.add(null);
}
if (in.readBoolean()) {
failures.add(MultiTermVectorsResponse.Failure.readFailure(in));
} else {
failures.add(null);
}
}
}
public void add(int location, TermVectorsResponse response) {
locations.add(location);
responses.add(response);
@ -54,26 +75,7 @@ public class MultiTermVectorsShardResponse extends ActionResponse {
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
int size = in.readVInt();
locations = new IntArrayList(size);
responses = new ArrayList<>(size);
failures = new ArrayList<>(size);
for (int i = 0; i < size; i++) {
locations.add(in.readVInt());
if (in.readBoolean()) {
TermVectorsResponse response = new TermVectorsResponse();
response.readFrom(in);
responses.add(response);
} else {
responses.add(null);
}
if (in.readBoolean()) {
failures.add(MultiTermVectorsResponse.Failure.readFailure(in));
} else {
failures.add(null);
}
}
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
}
@Override

View File

@ -20,6 +20,7 @@
package org.elasticsearch.action.termvectors;
import org.elasticsearch.action.Action;
import org.elasticsearch.common.io.stream.Writeable;
public class TermVectorsAction extends Action<TermVectorsResponse> {
@ -32,6 +33,11 @@ public class TermVectorsAction extends Action<TermVectorsResponse> {
@Override
public TermVectorsResponse newResponse() {
return new TermVectorsResponse();
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
}
@Override
public Writeable.Reader<TermVectorsResponse> getResponseReader() {
return TermVectorsResponse::new;
}
}

View File

@ -103,6 +103,20 @@ public class TermVectorsResponse extends ActionResponse implements ToXContentObj
TermVectorsResponse() {
}
TermVectorsResponse(StreamInput in) throws IOException {
index = in.readString();
type = in.readString();
id = in.readString();
docVersion = in.readVLong();
exists = in.readBoolean();
artificial = in.readBoolean();
tookInMillis = in.readVLong();
if (in.readBoolean()) {
headerRef = in.readBytesReference();
termVectors = in.readBytesReference();
}
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(index);
@ -127,17 +141,7 @@ public class TermVectorsResponse extends ActionResponse implements ToXContentObj
@Override
public void readFrom(StreamInput in) throws IOException {
index = in.readString();
type = in.readString();
id = in.readString();
docVersion = in.readVLong();
exists = in.readBoolean();
artificial = in.readBoolean();
tookInMillis = in.readVLong();
if (in.readBoolean()) {
headerRef = in.readBytesReference();
termVectors = in.readBytesReference();
}
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
}
public Fields getFields() throws IOException {

View File

@ -28,6 +28,7 @@ import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.routing.ShardIterator;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId;
@ -58,8 +59,8 @@ public class TransportShardMultiTermsVectorAction extends
}
@Override
protected MultiTermVectorsShardResponse newResponse() {
return new MultiTermVectorsShardResponse();
protected Writeable.Reader<MultiTermVectorsShardResponse> getResponseReader() {
return MultiTermVectorsShardResponse::new;
}
@Override

View File

@ -29,6 +29,7 @@ import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.cluster.routing.ShardIterator;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId;
@ -110,8 +111,8 @@ public class TransportTermVectorsAction extends TransportSingleShardAction<TermV
}
@Override
protected TermVectorsResponse newResponse() {
return new TermVectorsResponse();
protected Writeable.Reader<TermVectorsResponse> getResponseReader() {
return TermVectorsResponse::new;
}
@Override

View File

@ -35,6 +35,7 @@ import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.shard.IndexShard;
@ -119,8 +120,8 @@ public class RetentionLeaseActions {
abstract void doRetentionLeaseAction(IndexShard indexShard, T request, ActionListener<Response> listener);
@Override
protected Response newResponse() {
return new Response();
protected Writeable.Reader<Response> getResponseReader() {
return Response::new;
}
@Override
@ -169,6 +170,10 @@ public class RetentionLeaseActions {
ActionListener.map(listener, r -> new Response()));
}
@Override
protected Writeable.Reader<Response> getResponseReader() {
return Response::new;
}
}
@Override
@ -392,6 +397,12 @@ public class RetentionLeaseActions {
public static class Response extends ActionResponse {
public Response() {
}
Response(StreamInput in) throws IOException {
super(in);
}
}
}

View File

@ -23,11 +23,12 @@ import org.elasticsearch.action.admin.indices.mapping.get.GetFieldMappingsRespon
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.test.AbstractStreamableXContentTestCase;
import org.elasticsearch.test.AbstractSerializingTestCase;
import java.io.IOException;
import java.util.Collections;
@ -38,7 +39,7 @@ import java.util.function.Predicate;
import static org.elasticsearch.rest.BaseRestHandler.INCLUDE_TYPE_NAME_PARAMETER;
import static org.hamcrest.CoreMatchers.equalTo;
public class GetFieldMappingsResponseTests extends AbstractStreamableXContentTestCase<GetFieldMappingsResponse> {
public class GetFieldMappingsResponseTests extends AbstractSerializingTestCase<GetFieldMappingsResponse> {
public void testManualSerialization() throws IOException {
Map<String, Map<String, Map<String, FieldMappingMetaData>>> mappings = new HashMap<>();
@ -48,9 +49,8 @@ public class GetFieldMappingsResponseTests extends AbstractStreamableXContentTes
try (BytesStreamOutput out = new BytesStreamOutput()) {
response.writeTo(out);
GetFieldMappingsResponse serialized = new GetFieldMappingsResponse();
try (StreamInput in = StreamInput.wrap(out.bytes().toBytesRef().bytes)) {
serialized.readFrom(in);
GetFieldMappingsResponse serialized = new GetFieldMappingsResponse(in);
FieldMappingMetaData metaData = serialized.fieldMappings("index", "type", "field");
assertNotNull(metaData);
assertEquals(new BytesArray("{}"), metaData.getSource());
@ -106,13 +106,13 @@ public class GetFieldMappingsResponseTests extends AbstractStreamableXContentTes
}
@Override
protected GetFieldMappingsResponse createBlankInstance() {
return new GetFieldMappingsResponse();
protected GetFieldMappingsResponse createTestInstance() {
return new GetFieldMappingsResponse(randomMapping());
}
@Override
protected GetFieldMappingsResponse createTestInstance() {
return new GetFieldMappingsResponse(randomMapping());
protected Writeable.Reader<GetFieldMappingsResponse> instanceReader() {
return GetFieldMappingsResponse::new;
}
@Override

View File

@ -23,13 +23,14 @@ import org.apache.lucene.search.Explanation;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.document.DocumentField;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.get.GetResult;
import org.elasticsearch.test.AbstractStreamableXContentTestCase;
import org.elasticsearch.test.AbstractSerializingTestCase;
import org.elasticsearch.test.RandomObjects;
import java.io.IOException;
@ -42,15 +43,16 @@ import static java.util.Collections.singletonList;
import static java.util.Collections.singletonMap;
import static org.hamcrest.Matchers.equalTo;
public class ExplainResponseTests extends AbstractStreamableXContentTestCase<ExplainResponse> {
public class ExplainResponseTests extends AbstractSerializingTestCase<ExplainResponse> {
@Override
protected ExplainResponse doParseInstance(XContentParser parser) throws IOException {
return ExplainResponse.fromXContent(parser, randomBoolean());
}
@Override
protected ExplainResponse createBlankInstance() {
return new ExplainResponse();
protected Writeable.Reader<ExplainResponse> instanceReader() {
return ExplainResponse::new;
}
@Override

View File

@ -77,8 +77,7 @@ public class TermVectorsUnitTests extends ESTestCase {
// read
ByteArrayInputStream esInBuffer = new ByteArrayInputStream(outBuffer.toByteArray());
InputStreamStreamInput esBuffer = new InputStreamStreamInput(esInBuffer);
TermVectorsResponse inResponse = new TermVectorsResponse("a", "b", "c");
inResponse.readFrom(esBuffer);
TermVectorsResponse inResponse = new TermVectorsResponse(esBuffer);
// see if correct
checkIfStandardTermVector(inResponse);
@ -93,8 +92,7 @@ public class TermVectorsUnitTests extends ESTestCase {
// read
esInBuffer = new ByteArrayInputStream(outBuffer.toByteArray());
esBuffer = new InputStreamStreamInput(esInBuffer);
inResponse = new TermVectorsResponse("a", "b", "c");
inResponse.readFrom(esBuffer);
inResponse = new TermVectorsResponse(esBuffer);
assertTrue(inResponse.isExists());
}

View File

@ -22,6 +22,7 @@ import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.IndexNotFoundException;
@ -61,7 +62,12 @@ public class ShardChangesAction extends Action<ShardChangesAction.Response> {
@Override
public Response newResponse() {
return new Response();
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
}
@Override
public Writeable.Reader<Response> getResponseReader() {
return Response::new;
}
public static class Request extends SingleShardRequest<Request> {
@ -246,6 +252,17 @@ public class ShardChangesAction extends Action<ShardChangesAction.Response> {
Response() {
}
Response(StreamInput in) throws IOException {
super(in);
mappingVersion = in.readVLong();
settingsVersion = in.readVLong();
globalCheckpoint = in.readZLong();
maxSeqNo = in.readZLong();
maxSeqNoOfUpdatesOrDeletes = in.readZLong();
operations = in.readArray(Translog.Operation::readOperation, Translog.Operation[]::new);
tookInMillis = in.readVLong();
}
Response(
final long mappingVersion,
final long settingsVersion,
@ -265,15 +282,8 @@ public class ShardChangesAction extends Action<ShardChangesAction.Response> {
}
@Override
public void readFrom(final StreamInput in) throws IOException {
super.readFrom(in);
mappingVersion = in.readVLong();
settingsVersion = in.readVLong();
globalCheckpoint = in.readZLong();
maxSeqNo = in.readZLong();
maxSeqNoOfUpdatesOrDeletes = in.readZLong();
operations = in.readArray(Translog.Operation::readOperation, Translog.Operation[]::new);
tookInMillis = in.readVLong();
public void readFrom(final StreamInput in) {
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
}
@Override
@ -459,8 +469,8 @@ public class ShardChangesAction extends Action<ShardChangesAction.Response> {
}
@Override
protected Response newResponse() {
return new Response();
protected Writeable.Reader<Response> getResponseReader() {
return Response::new;
}
}

View File

@ -77,8 +77,8 @@ public class PutCcrRestoreSessionAction extends Action<PutCcrRestoreSessionActio
}
@Override
protected PutCcrRestoreSessionResponse newResponse() {
return new PutCcrRestoreSessionResponse();
protected Writeable.Reader<PutCcrRestoreSessionResponse> getResponseReader() {
return PutCcrRestoreSessionResponse::new;
}
@Override

View File

@ -5,10 +5,11 @@
*/
package org.elasticsearch.xpack.ccr.action;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.test.AbstractStreamableTestCase;
import org.elasticsearch.test.AbstractWireSerializingTestCase;
public class ShardChangesResponseTests extends AbstractStreamableTestCase<ShardChangesAction.Response> {
public class ShardChangesResponseTests extends AbstractWireSerializingTestCase<ShardChangesAction.Response> {
@Override
protected ShardChangesAction.Response createTestInstance() {
@ -34,8 +35,7 @@ public class ShardChangesResponseTests extends AbstractStreamableTestCase<ShardC
}
@Override
protected ShardChangesAction.Response createBlankInstance() {
return new ShardChangesAction.Response();
protected Writeable.Reader<ShardChangesAction.Response> instanceReader() {
return ShardChangesAction.Response::new;
}
}