Replace Streamable w/ Writeable in SingleShardRequest and subclasses (#43222) (#43364)

Backport of: https://github.com/elastic/elasticsearch/pull/43222

This commit replaces usages of Streamable with Writeable for the
SingleShardRequest / TransportSingleShardAction classes and subclasses of
these classes.

Note that where possible response fields were made final and default
constructors were removed.

Relates to #34389
This commit is contained in:
Martijn van Groningen 2019-06-19 16:15:09 +02:00 committed by GitHub
parent 86b58d9ff3
commit a4c45b5d70
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
21 changed files with 216 additions and 266 deletions

View File

@ -265,9 +265,9 @@ public class PainlessExecuteAction extends Action<PainlessExecuteAction.Response
}
private Script script;
private ScriptContext<?> context = PainlessTestScript.CONTEXT;
private ContextSetup contextSetup;
private final Script script;
private final ScriptContext<?> context;
private final ContextSetup contextSetup;
static Request parse(XContentParser parser) throws IOException {
return PARSER.parse(parser, null);
@ -275,16 +275,27 @@ public class PainlessExecuteAction extends Action<PainlessExecuteAction.Response
Request(Script script, String scriptContextName, ContextSetup setup) {
this.script = Objects.requireNonNull(script);
if (scriptContextName != null) {
this.context = fromScriptContextName(scriptContextName);
}
this.context = scriptContextName != null ? fromScriptContextName(scriptContextName) : PainlessTestScript.CONTEXT;
if (setup != null) {
this.contextSetup = setup;
index(contextSetup.index);
} else {
contextSetup = null;
}
}
Request() {
Request(StreamInput in) throws IOException {
super(in);
script = new Script(in);
if (in.getVersion().before(Version.V_6_4_0)) {
byte scriptContextId = in.readByte();
assert scriptContextId == 0;
context = null;
contextSetup = null;
} else {
context = fromScriptContextName(in.readString());
contextSetup = in.readOptionalWriteable(ContextSetup::new);
}
}
public Script getScript() {
@ -316,19 +327,6 @@ public class PainlessExecuteAction extends Action<PainlessExecuteAction.Response
return validationException;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
script = new Script(in);
if (in.getVersion().before(Version.V_6_4_0)) {
byte scriptContextId = in.readByte();
assert scriptContextId == 0;
} else {
context = fromScriptContextName(in.readString());
contextSetup = in.readOptionalWriteable(ContextSetup::new);
}
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);

View File

@ -21,6 +21,7 @@ package org.elasticsearch.painless.action;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
@ -35,7 +36,7 @@ import org.elasticsearch.script.Script;
import org.elasticsearch.script.ScriptContext;
import org.elasticsearch.script.ScriptType;
import org.elasticsearch.search.SearchModule;
import org.elasticsearch.test.AbstractStreamableTestCase;
import org.elasticsearch.test.AbstractWireSerializingTestCase;
import java.io.IOException;
import java.io.UncheckedIOException;
@ -43,7 +44,7 @@ import java.util.Collections;
import static org.hamcrest.Matchers.equalTo;
public class PainlessExecuteRequestTests extends AbstractStreamableTestCase<PainlessExecuteAction.Request> {
public class PainlessExecuteRequestTests extends AbstractWireSerializingTestCase<PainlessExecuteAction.Request> {
// Testing XContent serialization manually here, because the xContentType field in ContextSetup determines
// how the request needs to parse and the xcontent serialization framework randomizes that. The XContentType
@ -89,8 +90,8 @@ public class PainlessExecuteRequestTests extends AbstractStreamableTestCase<Pain
}
@Override
protected PainlessExecuteAction.Request createBlankInstance() {
return new PainlessExecuteAction.Request();
protected Writeable.Reader<PainlessExecuteAction.Request> instanceReader() {
return PainlessExecuteAction.Request::new;
}
public void testValidate() {

View File

@ -83,6 +83,19 @@ public class AnalyzeAction extends Action<AnalyzeAction.Response> {
public Request() {
}
Request(StreamInput in) throws IOException {
super(in);
text = in.readStringArray();
analyzer = in.readOptionalString();
tokenizer = in.readOptionalWriteable(NameOrDefinition::new);
tokenFilters.addAll(in.readList(NameOrDefinition::new));
charFilters.addAll(in.readList(NameOrDefinition::new));
field = in.readOptionalString();
explain = in.readBoolean();
attributes = in.readStringArray();
normalizer = in.readOptionalString();
}
/**
* Constructs a new analyzer request for the provided index.
*
@ -240,20 +253,6 @@ public class AnalyzeAction extends Action<AnalyzeAction.Response> {
return validationException;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
text = in.readStringArray();
analyzer = in.readOptionalString();
tokenizer = in.readOptionalWriteable(NameOrDefinition::new);
tokenFilters.addAll(in.readList(NameOrDefinition::new));
charFilters.addAll(in.readList(NameOrDefinition::new));
field = in.readOptionalString();
explain = in.readBoolean();
attributes = in.readStringArray();
normalizer = in.readOptionalString();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);

View File

@ -23,7 +23,6 @@ import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.OriginalIndices;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.single.shard.SingleShardRequest;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
@ -31,14 +30,20 @@ import java.io.IOException;
public class GetFieldMappingsIndexRequest extends SingleShardRequest<GetFieldMappingsIndexRequest> {
private boolean probablySingleFieldRequest;
private boolean includeDefaults;
private String[] fields = Strings.EMPTY_ARRAY;
private String[] types = Strings.EMPTY_ARRAY;
private final boolean probablySingleFieldRequest;
private final boolean includeDefaults;
private final String[] fields;
private final String[] types;
private OriginalIndices originalIndices;
public GetFieldMappingsIndexRequest() {
GetFieldMappingsIndexRequest(StreamInput in) throws IOException {
super(in);
types = in.readStringArray();
fields = in.readStringArray();
includeDefaults = in.readBoolean();
probablySingleFieldRequest = in.readBoolean();
originalIndices = OriginalIndices.readOriginalIndices(in);
}
GetFieldMappingsIndexRequest(GetFieldMappingsRequest other, String index, boolean probablySingleFieldRequest) {
@ -92,14 +97,4 @@ public class GetFieldMappingsIndexRequest extends SingleShardRequest<GetFieldMap
OriginalIndices.writeOriginalIndices(originalIndices, out);
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
types = in.readStringArray();
fields = in.readStringArray();
includeDefaults = in.readBoolean();
probablySingleFieldRequest = in.readBoolean();
originalIndices = OriginalIndices.readOriginalIndices(in);
}
}

View File

@ -74,6 +74,19 @@ public class ExplainRequest extends SingleShardRequest<ExplainRequest> implement
this.id = id;
}
ExplainRequest(StreamInput in) throws IOException {
super(in);
type = in.readString();
id = in.readString();
routing = in.readOptionalString();
preference = in.readOptionalString();
query = in.readNamedWriteable(QueryBuilder.class);
filteringAlias = new AliasFilter(in);
storedFields = in.readOptionalStringArray();
fetchSourceContext = in.readOptionalWriteable(FetchSourceContext::new);
nowInMillis = in.readVLong();
}
/**
* @deprecated Types are in the process of being removed.
*/
@ -184,20 +197,6 @@ public class ExplainRequest extends SingleShardRequest<ExplainRequest> implement
return validationException;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
type = in.readString();
id = in.readString();
routing = in.readOptionalString();
preference = in.readOptionalString();
query = in.readNamedWriteable(QueryBuilder.class);
filteringAlias = new AliasFilter(in);
storedFields = in.readOptionalStringArray();
fetchSourceContext = in.readOptionalWriteable(FetchSourceContext::new);
nowInMillis = in.readVLong();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);

View File

@ -31,11 +31,19 @@ import java.io.IOException;
public class FieldCapabilitiesIndexRequest extends SingleShardRequest<FieldCapabilitiesIndexRequest> {
private String[] fields;
private OriginalIndices originalIndices;
private final String[] fields;
private final OriginalIndices originalIndices;
// For serialization
FieldCapabilitiesIndexRequest() {}
FieldCapabilitiesIndexRequest(StreamInput in) throws IOException {
super(in);
fields = in.readStringArray();
if (in.getVersion().onOrAfter(Version.V_6_2_0)) {
originalIndices = OriginalIndices.readOriginalIndices(in);
} else {
originalIndices = OriginalIndices.NONE;
}
}
FieldCapabilitiesIndexRequest(String[] fields, String index, OriginalIndices originalIndices) {
super(index);
@ -62,17 +70,6 @@ public class FieldCapabilitiesIndexRequest extends SingleShardRequest<FieldCapab
return originalIndices.indicesOptions();
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
fields = in.readStringArray();
if (in.getVersion().onOrAfter(Version.V_6_2_0)) {
originalIndices = OriginalIndices.readOriginalIndices(in);
} else {
originalIndices = OriginalIndices.NONE;
}
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);

View File

@ -70,6 +70,24 @@ public class GetRequest extends SingleShardRequest<GetRequest> implements Realti
type = MapperService.SINGLE_MAPPING_NAME;
}
GetRequest(StreamInput in) throws IOException {
super(in);
type = in.readString();
id = in.readString();
routing = in.readOptionalString();
if (in.getVersion().before(Version.V_7_0_0)) {
in.readOptionalString();
}
preference = in.readOptionalString();
refresh = in.readBoolean();
storedFields = in.readOptionalStringArray();
realtime = in.readBoolean();
this.versionType = VersionType.fromValue(in.readByte());
this.version = in.readLong();
fetchSourceContext = in.readOptionalWriteable(FetchSourceContext::new);
}
/**
* Constructs a new get request against the specified index. The {@link #id(String)} must also be set.
*/
@ -262,25 +280,6 @@ public class GetRequest extends SingleShardRequest<GetRequest> implements Realti
return this.versionType;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
type = in.readString();
id = in.readString();
routing = in.readOptionalString();
if (in.getVersion().before(Version.V_7_0_0)) {
in.readOptionalString();
}
preference = in.readOptionalString();
refresh = in.readBoolean();
storedFields = in.readOptionalStringArray();
realtime = in.readBoolean();
this.versionType = VersionType.fromValue(in.readByte());
this.version = in.readLong();
fetchSourceContext = in.readOptionalWriteable(FetchSourceContext::new);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);

View File

@ -33,14 +33,26 @@ public class MultiGetShardRequest extends SingleShardRequest<MultiGetShardReques
private int shardId;
private String preference;
boolean realtime = true;
boolean refresh;
private boolean realtime;
private boolean refresh;
IntArrayList locations;
List<MultiGetRequest.Item> items;
public MultiGetShardRequest() {
MultiGetShardRequest(StreamInput in) throws IOException {
super(in);
int size = in.readVInt();
locations = new IntArrayList(size);
items = new ArrayList<>(size);
for (int i = 0; i < size; i++) {
locations.add(in.readVInt());
items.add(MultiGetRequest.Item.readItem(in));
}
preference = in.readOptionalString();
refresh = in.readBoolean();
realtime = in.readBoolean();
}
MultiGetShardRequest(MultiGetRequest multiGetRequest, String index, int shardId) {
@ -108,23 +120,6 @@ public class MultiGetShardRequest extends SingleShardRequest<MultiGetShardReques
return indices;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
int size = in.readVInt();
locations = new IntArrayList(size);
items = new ArrayList<>(size);
for (int i = 0; i < size; i++) {
locations.add(in.readVInt());
items.add(MultiGetRequest.Item.readItem(in));
}
preference = in.readOptionalString();
refresh = in.readBoolean();
realtime = in.readBoolean();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);

View File

@ -48,6 +48,15 @@ public abstract class SingleShardRequest<Request extends SingleShardRequest<Requ
public SingleShardRequest() {
}
public SingleShardRequest(StreamInput in) throws IOException {
super(in);
if (in.readBoolean()) {
internalShardId = new ShardId(in);
}
index = in.readOptionalString();
// no need to pass threading over the network, they are always false when coming throw a thread pool
}
protected SingleShardRequest(String index) {
this.index = index;
}
@ -93,16 +102,6 @@ public abstract class SingleShardRequest<Request extends SingleShardRequest<Requ
return INDICES_OPTIONS;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
if (in.readBoolean()) {
internalShardId = new ShardId(in);
}
index = in.readOptionalString();
// no need to pass threading over the network, they are always false when coming throw a thread pool
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);

View File

@ -51,7 +51,6 @@ import org.elasticsearch.transport.TransportResponseHandler;
import org.elasticsearch.transport.TransportService;
import java.io.IOException;
import java.util.function.Supplier;
import static org.elasticsearch.action.support.TransportActions.isShardNotAvailableException;
@ -73,7 +72,7 @@ public abstract class TransportSingleShardAction<Request extends SingleShardRequ
protected TransportSingleShardAction(String actionName, ThreadPool threadPool, ClusterService clusterService,
TransportService transportService, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver, Supplier<Request> request,
IndexNameExpressionResolver indexNameExpressionResolver, Writeable.Reader<Request> request,
String executor) {
super(actionName, actionFilters, transportService.getTaskManager());
this.threadPool = threadPool;
@ -85,9 +84,9 @@ public abstract class TransportSingleShardAction<Request extends SingleShardRequ
this.executor = executor;
if (!isSubAction()) {
transportService.registerRequestHandler(actionName, request, ThreadPool.Names.SAME, new TransportHandler());
transportService.registerRequestHandler(actionName, ThreadPool.Names.SAME, request, new TransportHandler());
}
transportService.registerRequestHandler(transportShardAction, request, ThreadPool.Names.SAME, new ShardTransportHandler());
transportService.registerRequestHandler(transportShardAction, ThreadPool.Names.SAME, request, new ShardTransportHandler());
}
/**

View File

@ -143,7 +143,7 @@ public class MultiTermVectorsRequest extends ActionRequest
int size = in.readVInt();
requests = new ArrayList<>(size);
for (int i = 0; i < size; i++) {
requests.add(TermVectorsRequest.readTermVectorsRequest(in));
requests.add(new TermVectorsRequest(in));
}
}

View File

@ -37,8 +37,17 @@ public class MultiTermVectorsShardRequest extends SingleShardRequest<MultiTermVe
IntArrayList locations;
List<TermVectorsRequest> requests;
public MultiTermVectorsShardRequest() {
MultiTermVectorsShardRequest(StreamInput in) throws IOException {
super(in);
int size = in.readVInt();
locations = new IntArrayList(size);
requests = new ArrayList<>(size);
for (int i = 0; i < size; i++) {
locations.add(in.readVInt());
requests.add(new TermVectorsRequest(in));
}
preference = in.readOptionalString();
}
MultiTermVectorsShardRequest(String index, int shardId) {
@ -86,20 +95,6 @@ public class MultiTermVectorsShardRequest extends SingleShardRequest<MultiTermVe
return indices;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
int size = in.readVInt();
locations = new IntArrayList(size);
requests = new ArrayList<>(size);
for (int i = 0; i < size; i++) {
locations.add(in.readVInt());
requests.add(TermVectorsRequest.readTermVectorsRequest(in));
}
preference = in.readOptionalString();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);

View File

@ -160,6 +160,48 @@ public class TermVectorsRequest extends SingleShardRequest<TermVectorsRequest> i
public TermVectorsRequest() {
}
TermVectorsRequest(StreamInput in) throws IOException {
super(in);
type = in.readString();
id = in.readString();
if (in.readBoolean()) {
doc = in.readBytesReference();
xContentType = in.readEnum(XContentType.class);
}
routing = in.readOptionalString();
if (in.getVersion().before(Version.V_7_0_0)) {
in.readOptionalString(); // _parent
}
preference = in.readOptionalString();
long flags = in.readVLong();
flagsEnum.clear();
for (Flag flag : Flag.values()) {
if ((flags & (1 << flag.ordinal())) != 0) {
flagsEnum.add(flag);
}
}
int numSelectedFields = in.readVInt();
if (numSelectedFields > 0) {
selectedFields = new HashSet<>();
for (int i = 0; i < numSelectedFields; i++) {
selectedFields.add(in.readString());
}
}
if (in.readBoolean()) {
perFieldAnalyzer = readPerFieldAnalyzer(in.readMap());
}
if (in.readBoolean()) {
filterSettings = new FilterSettings();
filterSettings.readFrom(in);
}
realtime = in.readBoolean();
versionType = VersionType.fromValue(in.readByte());
version = in.readLong();
}
/**
* Constructs a new term vector request for a document that will be fetch
* from the provided index. Use {@link #type(String)} and
@ -488,56 +530,6 @@ public class TermVectorsRequest extends SingleShardRequest<TermVectorsRequest> i
return validationException;
}
public static TermVectorsRequest readTermVectorsRequest(StreamInput in) throws IOException {
TermVectorsRequest termVectorsRequest = new TermVectorsRequest();
termVectorsRequest.readFrom(in);
return termVectorsRequest;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
type = in.readString();
id = in.readString();
if (in.readBoolean()) {
doc = in.readBytesReference();
xContentType = in.readEnum(XContentType.class);
}
routing = in.readOptionalString();
if (in.getVersion().before(Version.V_7_0_0)) {
in.readOptionalString(); // _parent
}
preference = in.readOptionalString();
long flags = in.readVLong();
flagsEnum.clear();
for (Flag flag : Flag.values()) {
if ((flags & (1 << flag.ordinal())) != 0) {
flagsEnum.add(flag);
}
}
int numSelectedFields = in.readVInt();
if (numSelectedFields > 0) {
selectedFields = new HashSet<>();
for (int i = 0; i < numSelectedFields; i++) {
selectedFields.add(in.readString());
}
}
if (in.readBoolean()) {
perFieldAnalyzer = readPerFieldAnalyzer(in.readMap());
}
if (in.readBoolean()) {
filterSettings = new FilterSettings();
filterSettings.readFrom(in);
}
realtime = in.readBoolean();
versionType = VersionType.fromValue(in.readByte());
version = in.readLong();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);

View File

@ -44,7 +44,6 @@ import org.elasticsearch.transport.TransportService;
import java.io.IOException;
import java.util.Objects;
import java.util.function.Supplier;
/**
* This class holds all actions related to retention leases. Note carefully that these actions are executed under a primary permit. Care is
@ -70,7 +69,7 @@ public class RetentionLeaseActions {
final ActionFilters actionFilters,
final IndexNameExpressionResolver indexNameExpressionResolver,
final IndicesService indicesService,
final Supplier<T> requestSupplier) {
final Writeable.Reader<T> requestSupplier) {
super(
name,
threadPool,
@ -272,19 +271,22 @@ public class RetentionLeaseActions {
private abstract static class Request<T extends SingleShardRequest<T>> extends SingleShardRequest<T> {
private ShardId shardId;
private final ShardId shardId;
public ShardId getShardId() {
return shardId;
}
private String id;
private final String id;
public String getId() {
return id;
}
Request() {
Request(StreamInput in) throws IOException {
super(in);
shardId = new ShardId(in);
id = in.readString();
}
Request(final ShardId shardId, final String id) {
@ -298,13 +300,6 @@ public class RetentionLeaseActions {
return null;
}
@Override
public void readFrom(final StreamInput in) throws IOException {
super.readFrom(in);
shardId = new ShardId(in);
id = in.readString();
}
@Override
public void writeTo(final StreamOutput out) throws IOException {
super.writeTo(out);
@ -316,19 +311,22 @@ public class RetentionLeaseActions {
private abstract static class AddOrRenewRequest<T extends SingleShardRequest<T>> extends Request<T> {
private long retainingSequenceNumber;
private final long retainingSequenceNumber;
public long getRetainingSequenceNumber() {
return retainingSequenceNumber;
}
private String source;
private final String source;
public String getSource() {
return source;
}
AddOrRenewRequest() {
AddOrRenewRequest(StreamInput in) throws IOException {
super(in);
retainingSequenceNumber = in.readZLong();
source = in.readString();
}
AddOrRenewRequest(final ShardId shardId, final String id, final long retainingSequenceNumber, final String source) {
@ -340,13 +338,6 @@ public class RetentionLeaseActions {
this.source = Objects.requireNonNull(source);
}
@Override
public void readFrom(final StreamInput in) throws IOException {
super.readFrom(in);
retainingSequenceNumber = in.readZLong();
source = in.readString();
}
@Override
public void writeTo(final StreamOutput out) throws IOException {
super.writeTo(out);
@ -358,7 +349,8 @@ public class RetentionLeaseActions {
public static class AddRequest extends AddOrRenewRequest<AddRequest> {
public AddRequest() {
AddRequest(StreamInput in) throws IOException {
super(in);
}
public AddRequest(final ShardId shardId, final String id, final long retainingSequenceNumber, final String source) {
@ -369,7 +361,8 @@ public class RetentionLeaseActions {
public static class RenewRequest extends AddOrRenewRequest<RenewRequest> {
public RenewRequest() {
RenewRequest(StreamInput in) throws IOException {
super(in);
}
public RenewRequest(final ShardId shardId, final String id, final long retainingSequenceNumber, final String source) {
@ -380,7 +373,8 @@ public class RetentionLeaseActions {
public static class RemoveRequest extends Request<RemoveRequest> {
public RemoveRequest() {
RemoveRequest(StreamInput in) throws IOException {
super(in);
}
public RemoveRequest(final ShardId shardId, final String id) {

View File

@ -109,8 +109,7 @@ public class AnalyzeRequestTests extends ESTestCase {
try (BytesStreamOutput output = new BytesStreamOutput()) {
request.writeTo(output);
try (StreamInput in = output.bytes().streamInput()) {
AnalyzeAction.Request serialized = new AnalyzeAction.Request();
serialized.readFrom(in);
AnalyzeAction.Request serialized = new AnalyzeAction.Request(in);
assertArrayEquals(request.text(), serialized.text());
assertEquals(request.tokenizer().name, serialized.tokenizer().name);
assertEquals(request.tokenFilters().get(0).name, serialized.tokenFilters().get(0).name);

View File

@ -64,8 +64,7 @@ public class ExplainRequestTests extends ESTestCase {
request.routing("some_routing");
request.writeTo(output);
try (StreamInput in = new NamedWriteableAwareStreamInput(output.bytes().streamInput(), namedWriteableRegistry)) {
ExplainRequest readRequest = new ExplainRequest();
readRequest.readFrom(in);
ExplainRequest readRequest = new ExplainRequest(in);
assertEquals(request.filteringAlias(), readRequest.filteringAlias());
assertArrayEquals(request.storedFields(), readRequest.storedFields());
assertEquals(request.preference(), readRequest.preference());

View File

@ -71,9 +71,7 @@ public class MultiGetShardRequestTests extends ESTestCase {
StreamInput in = out.bytes().streamInput();
in.setVersion(out.getVersion());
MultiGetShardRequest multiGetShardRequest2 = new MultiGetShardRequest();
multiGetShardRequest2.readFrom(in);
MultiGetShardRequest multiGetShardRequest2 = new MultiGetShardRequest(in);
assertThat(multiGetShardRequest2.index(), equalTo(multiGetShardRequest.index()));
assertThat(multiGetShardRequest2.preference(), equalTo(multiGetShardRequest.preference()));
assertThat(multiGetShardRequest2.realtime(), equalTo(multiGetShardRequest.realtime()));

View File

@ -243,8 +243,7 @@ public class TermVectorsUnitTests extends ESTestCase {
// read
ByteArrayInputStream esInBuffer = new ByteArrayInputStream(outBuffer.toByteArray());
InputStreamStreamInput esBuffer = new InputStreamStreamInput(esInBuffer);
TermVectorsRequest req2 = new TermVectorsRequest(null, null, null);
req2.readFrom(esBuffer);
TermVectorsRequest req2 = new TermVectorsRequest(esBuffer);
assertThat(request.offsets(), equalTo(req2.offsets()));
assertThat(request.fieldStatistics(), equalTo(req2.fieldStatistics()));

View File

@ -75,8 +75,8 @@ public class ShardChangesAction extends Action<ShardChangesAction.Response> {
private long fromSeqNo;
private int maxOperationCount;
private ShardId shardId;
private String expectedHistoryUUID;
private final ShardId shardId;
private final String expectedHistoryUUID;
private TimeValue pollTimeout = TransportResumeFollowAction.DEFAULT_READ_POLL_TIMEOUT;
private ByteSizeValue maxBatchSize = TransportResumeFollowAction.DEFAULT_MAX_READ_REQUEST_SIZE;
@ -88,7 +88,17 @@ public class ShardChangesAction extends Action<ShardChangesAction.Response> {
this.expectedHistoryUUID = expectedHistoryUUID;
}
Request() {
Request(StreamInput in) throws IOException {
super(in);
fromSeqNo = in.readVLong();
maxOperationCount = in.readVInt();
shardId = new ShardId(in);
expectedHistoryUUID = in.readString();
pollTimeout = in.readTimeValue();
maxBatchSize = new ByteSizeValue(in);
// Starting the clock in order to know how much time is spent on fetching operations:
relativeStartNanos = System.nanoTime();
}
public ShardId getShard() {
@ -148,20 +158,6 @@ public class ShardChangesAction extends Action<ShardChangesAction.Response> {
return validationException;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
fromSeqNo = in.readVLong();
maxOperationCount = in.readVInt();
shardId = new ShardId(in);
expectedHistoryUUID = in.readString();
pollTimeout = in.readTimeValue();
maxBatchSize = new ByteSizeValue(in);
// Starting the clock in order to know how much time is spent on fetching operations:
relativeStartNanos = System.nanoTime();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);

View File

@ -16,10 +16,13 @@ import java.io.IOException;
public class PutCcrRestoreSessionRequest extends SingleShardRequest<PutCcrRestoreSessionRequest> {
private String sessionUUID;
private ShardId shardId;
private final String sessionUUID;
private final ShardId shardId;
PutCcrRestoreSessionRequest() {
PutCcrRestoreSessionRequest(StreamInput in) throws IOException {
super(in);
sessionUUID = in.readString();
shardId = new ShardId(in);
}
public PutCcrRestoreSessionRequest(String sessionUUID, ShardId shardId) {
@ -33,13 +36,6 @@ public class PutCcrRestoreSessionRequest extends SingleShardRequest<PutCcrRestor
return null;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
sessionUUID = in.readString();
shardId = new ShardId(in);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);

View File

@ -5,13 +5,14 @@
*/
package org.elasticsearch.xpack.ccr.action;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.test.AbstractStreamableTestCase;
import org.elasticsearch.test.AbstractWireSerializingTestCase;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.nullValue;
public class ShardChangesRequestTests extends AbstractStreamableTestCase<ShardChangesAction.Request> {
public class ShardChangesRequestTests extends AbstractWireSerializingTestCase<ShardChangesAction.Request> {
@Override
protected ShardChangesAction.Request createTestInstance() {
@ -23,8 +24,8 @@ public class ShardChangesRequestTests extends AbstractStreamableTestCase<ShardCh
}
@Override
protected ShardChangesAction.Request createBlankInstance() {
return new ShardChangesAction.Request();
protected Writeable.Reader<ShardChangesAction.Request> instanceReader() {
return ShardChangesAction.Request::new;
}
public void testValidate() {