Convert direct implementations of Streamable to Writeable (#44605) (#44646)

This commit converts Streamable to Writeable for direct implementations.

relates #34389
This commit is contained in:
Ryan Ernst 2019-07-20 08:32:29 -07:00 committed by GitHub
parent 1f7fc1b497
commit f4ee2e9e91
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
31 changed files with 235 additions and 537 deletions

View File

@ -1,70 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.client;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.test.AbstractStreamableXContentTestCase;
import java.io.IOException;
import static org.elasticsearch.test.AbstractXContentTestCase.xContentTester;
/**
* @deprecated Use {@link AbstractResponseTestCase} instead of this class.
*/
// TODO: Remove and change subclasses to use AbstractResponseTestCase instead
@Deprecated
public abstract class AbstractHlrcStreamableXContentTestCase<T extends ToXContent & Streamable, H>
extends AbstractStreamableXContentTestCase<T> {
/**
* Generic test that creates new instance of HLRC request/response from the test instance and checks
* both for equality and asserts equality on the two queries.
*/
public final void testHlrcFromXContent() throws IOException {
xContentTester(this::createParser, this::createTestInstance, getToXContentParams(),
p -> convertHlrcToInternal(doHlrcParseInstance(p)))
.numberOfTestRuns(NUMBER_OF_TEST_RUNS)
.supportsUnknownFields(supportsUnknownFields())
.shuffleFieldsExceptions(getShuffleFieldsExceptions())
.randomFieldsExcludeFilter(getRandomFieldsExcludeFilter())
.assertEqualsConsumer(this::assertEqualInstances)
.assertToXContentEquivalence(true)
.test();
}
/**
* Parses to a new HLRC instance using the provided {@link XContentParser}
*/
public abstract H doHlrcParseInstance(XContentParser parser) throws IOException;
/**
* Converts a HLRC instance to a XPack instance
*/
public abstract T convertHlrcToInternal(H instance);
//TODO this would be final ideally: why do both responses need to parse from xcontent, only one (H) should? I think that T#fromXContent
//are only there for testing and could go away? Then the additional testHlrcFromXContent is also no longer needed.
@Override
protected T doParseInstance(XContentParser parser) throws IOException {
return convertHlrcToInternal(doHlrcParseInstance(parser));
}
}

View File

@ -24,7 +24,7 @@ import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.lucene.uid.Versions;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.script.Script;
@ -194,11 +194,11 @@ public class RoundTripTests extends ESTestCase {
assertEquals(request.getTaskId(), tripped.getTaskId());
}
private StreamInput toInputByteStream(Streamable example) throws IOException {
private StreamInput toInputByteStream(Writeable example) throws IOException {
return toInputByteStream(Version.CURRENT, example);
}
private StreamInput toInputByteStream(Version version, Streamable example) throws IOException {
private StreamInput toInputByteStream(Version version, Writeable example) throws IOException {
BytesStreamOutput out = new BytesStreamOutput();
out.setVersion(version);
example.writeTo(out);

View File

@ -147,7 +147,7 @@ public class SnapshotIndexShardStatus extends BroadcastShardResponse implements
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
stage = SnapshotIndexShardStage.fromValue(in.readByte());
stats = SnapshotStats.readSnapshotStats(in);
stats = new SnapshotStats(in);
nodeId = in.readOptionalString();
failure = in.readOptionalString();
}

View File

@ -22,7 +22,7 @@ package org.elasticsearch.action.admin.cluster.snapshots.status;
import org.elasticsearch.Version;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ToXContent;
@ -33,7 +33,7 @@ import org.elasticsearch.common.xcontent.XContentParserUtils;
import java.io.IOException;
public class SnapshotStats implements Streamable, ToXContentObject {
public class SnapshotStats implements Writeable, ToXContentObject {
private long startTime;
private long time;
@ -44,7 +44,25 @@ public class SnapshotStats implements Streamable, ToXContentObject {
private long totalSize;
private long processedSize;
SnapshotStats() {
SnapshotStats() {}
SnapshotStats(StreamInput in) throws IOException {
startTime = in.readVLong();
time = in.readVLong();
incrementalFileCount = in.readVInt();
processedFileCount = in.readVInt();
incrementalSize = in.readVLong();
processedSize = in.readVLong();
if (in.getVersion().onOrAfter(Version.V_6_4_0)) {
totalFileCount = in.readVInt();
totalSize = in.readVLong();
} else {
totalFileCount = incrementalFileCount;
totalSize = incrementalSize;
}
}
SnapshotStats(long startTime, long time,
@ -116,13 +134,6 @@ public class SnapshotStats implements Streamable, ToXContentObject {
return processedSize;
}
public static SnapshotStats readSnapshotStats(StreamInput in) throws IOException {
SnapshotStats stats = new SnapshotStats();
stats.readFrom(in);
return stats;
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVLong(startTime);
@ -140,26 +151,6 @@ public class SnapshotStats implements Streamable, ToXContentObject {
}
}
@Override
public void readFrom(StreamInput in) throws IOException {
startTime = in.readVLong();
time = in.readVLong();
incrementalFileCount = in.readVInt();
processedFileCount = in.readVInt();
incrementalSize = in.readVLong();
processedSize = in.readVLong();
if (in.getVersion().onOrAfter(Version.V_6_4_0)) {
totalFileCount = in.readVInt();
totalSize = in.readVLong();
} else {
totalFileCount = incrementalFileCount;
totalSize = incrementalSize;
}
}
static final class Fields {
static final String STATS = "stats";

View File

@ -27,7 +27,7 @@ import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.ToXContentObject;
@ -55,7 +55,7 @@ import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optiona
/**
* Status of a snapshot
*/
public class SnapshotStatus implements ToXContentObject, Streamable {
public class SnapshotStatus implements ToXContentObject, Writeable {
private Snapshot snapshot;
@ -72,6 +72,30 @@ public class SnapshotStatus implements ToXContentObject, Streamable {
@Nullable
private Boolean includeGlobalState;
SnapshotStatus(StreamInput in) throws IOException {
snapshot = new Snapshot(in);
state = State.fromValue(in.readByte());
int size = in.readVInt();
List<SnapshotIndexShardStatus> builder = new ArrayList<>();
for (int i = 0; i < size; i++) {
builder.add(SnapshotIndexShardStatus.readShardSnapshotStatus(in));
}
shards = Collections.unmodifiableList(builder);
if (in.getVersion().onOrAfter(Version.V_6_2_0)) {
includeGlobalState = in.readOptionalBoolean();
}
final long startTime;
final long time;
if (in.getVersion().onOrAfter(Version.V_7_4_0)) {
startTime = in.readLong();
time = in.readLong();
} else {
startTime = 0L;
time = 0L;
}
updateShardStats(startTime, time);
}
SnapshotStatus(Snapshot snapshot, State state, List<SnapshotIndexShardStatus> shards, Boolean includeGlobalState,
long startTime, long time) {
this.snapshot = Objects.requireNonNull(snapshot);
@ -94,9 +118,6 @@ public class SnapshotStatus implements ToXContentObject, Streamable {
this.includeGlobalState = includeGlobalState;
}
SnapshotStatus() {
}
/**
* Returns snapshot
*/
@ -159,31 +180,6 @@ public class SnapshotStatus implements ToXContentObject, Streamable {
}
@Override
public void readFrom(StreamInput in) throws IOException {
snapshot = new Snapshot(in);
state = State.fromValue(in.readByte());
int size = in.readVInt();
List<SnapshotIndexShardStatus> builder = new ArrayList<>();
for (int i = 0; i < size; i++) {
builder.add(SnapshotIndexShardStatus.readShardSnapshotStatus(in));
}
shards = Collections.unmodifiableList(builder);
if (in.getVersion().onOrAfter(Version.V_6_2_0)) {
includeGlobalState = in.readOptionalBoolean();
}
final long startTime;
final long time;
if (in.getVersion().onOrAfter(Version.V_7_4_0)) {
startTime = in.readLong();
time = in.readLong();
} else {
startTime = 0L;
time = 0L;
}
updateShardStats(startTime, time);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
snapshot.writeTo(out);
@ -201,18 +197,6 @@ public class SnapshotStatus implements ToXContentObject, Streamable {
}
}
/**
* Reads snapshot status from stream input
*
* @param in stream input
* @return deserialized snapshot status
*/
public static SnapshotStatus readSnapshotStatus(StreamInput in) throws IOException {
SnapshotStatus snapshotInfo = new SnapshotStatus();
snapshotInfo.readFrom(in);
return snapshotInfo;
}
@Override
public String toString() {
return Strings.toString(this, true, false);

View File

@ -47,7 +47,7 @@ public class SnapshotsStatusResponse extends ActionResponse implements ToXConten
int size = in.readVInt();
List<SnapshotStatus> builder = new ArrayList<>();
for (int i = 0; i < size; i++) {
builder.add(SnapshotStatus.readSnapshotStatus(in));
builder.add(new SnapshotStatus(in));
}
snapshots = Collections.unmodifiableList(builder);
}

View File

@ -49,7 +49,7 @@ public class ClusterStatsNodeResponse extends BaseNodeResponse {
int size = in.readVInt();
shardsStats = new ShardStats[size];
for (int i = 0; i < size; i++) {
shardsStats[i] = ShardStats.readShardStats(in);
shardsStats[i] = new ShardStats(in);
}
}

View File

@ -27,9 +27,8 @@ import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.ToXContent.Params;
import org.elasticsearch.common.xcontent.ToXContentFragment;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
@ -44,7 +43,7 @@ import java.util.Map;
/**
* Represents an alias, to be associated with an index
*/
public class Alias implements Streamable, ToXContentFragment {
public class Alias implements Writeable, ToXContentFragment {
private static final ParseField FILTER = new ParseField("filter");
private static final ParseField ROUTING = new ParseField("routing");
@ -66,8 +65,16 @@ public class Alias implements Streamable, ToXContentFragment {
@Nullable
private Boolean writeIndex;
private Alias() {
public Alias(StreamInput in) throws IOException {
name = in.readString();
filter = in.readOptionalString();
indexRouting = in.readOptionalString();
searchRouting = in.readOptionalString();
if (in.getVersion().onOrAfter(Version.V_6_4_0)) {
writeIndex = in.readOptionalBoolean();
} else {
writeIndex = null;
}
}
public Alias(String name) {
@ -187,28 +194,6 @@ public class Alias implements Streamable, ToXContentFragment {
return this;
}
/**
* Allows to read an alias from the provided input stream
*/
public static Alias read(StreamInput in) throws IOException {
Alias alias = new Alias();
alias.readFrom(in);
return alias;
}
@Override
public void readFrom(StreamInput in) throws IOException {
name = in.readString();
filter = in.readOptionalString();
indexRouting = in.readOptionalString();
searchRouting = in.readOptionalString();
if (in.getVersion().onOrAfter(Version.V_6_4_0)) {
writeIndex = in.readOptionalBoolean();
} else {
writeIndex = null;
}
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(name);

View File

@ -113,7 +113,7 @@ public class CreateIndexRequest extends AcknowledgedRequest<CreateIndexRequest>
}
int aliasesSize = in.readVInt();
for (int i = 0; i < aliasesSize; i++) {
aliases.add(Alias.read(in));
aliases.add(new Alias(in));
}
if (in.getVersion().before(Version.V_7_0_0)) {
in.readBoolean(); // updateAllTypes

View File

@ -21,7 +21,6 @@ package org.elasticsearch.action.admin.indices.shards;
import com.carrotsearch.hppc.cursors.IntObjectCursor;
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionResponse;
@ -31,7 +30,7 @@ import org.elasticsearch.common.collect.ImmutableOpenIntMap;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ToXContentFragment;
import org.elasticsearch.common.xcontent.XContentBuilder;
@ -40,8 +39,6 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import static org.elasticsearch.action.admin.indices.shards.IndicesShardStoresResponse.StoreStatus.readStoreStatus;
/**
* Response for {@link IndicesShardStoresAction}
*
@ -53,7 +50,7 @@ public class IndicesShardStoresResponse extends ActionResponse implements ToXCon
/**
* Shard store information from a node
*/
public static class StoreStatus implements Streamable, ToXContentFragment, Comparable<StoreStatus> {
public static class StoreStatus implements Writeable, ToXContentFragment, Comparable<StoreStatus> {
private DiscoveryNode node;
private String allocationId;
private Exception storeException;
@ -112,7 +109,17 @@ public class IndicesShardStoresResponse extends ActionResponse implements ToXCon
}
}
private StoreStatus() {
public StoreStatus(StreamInput in) throws IOException {
node = new DiscoveryNode(in);
if (in.getVersion().before(Version.V_6_0_0_alpha1)) {
// legacy version
in.readLong();
}
allocationId = in.readOptionalString();
allocationStatus = AllocationStatus.readFrom(in);
if (in.readBoolean()) {
storeException = in.readException();
}
}
public StoreStatus(DiscoveryNode node, String allocationId, AllocationStatus allocationStatus, Exception storeException) {
@ -155,26 +162,6 @@ public class IndicesShardStoresResponse extends ActionResponse implements ToXCon
return allocationStatus;
}
public static StoreStatus readStoreStatus(StreamInput in) throws IOException {
StoreStatus storeStatus = new StoreStatus();
storeStatus.readFrom(in);
return storeStatus;
}
@Override
public void readFrom(StreamInput in) throws IOException {
node = new DiscoveryNode(in);
if (in.getVersion().before(Version.V_6_0_0_alpha1)) {
// legacy version
in.readLong();
}
allocationId = in.readOptionalString();
allocationStatus = AllocationStatus.readFrom(in);
if (in.readBoolean()) {
storeException = in.readException();
}
}
@Override
public void writeTo(StreamOutput out) throws IOException {
node.writeTo(out);
@ -303,7 +290,7 @@ public class IndicesShardStoresResponse extends ActionResponse implements ToXCon
int nodeEntries = in.readVInt();
List<StoreStatus> storeStatuses = new ArrayList<>(nodeEntries);
for (int nodeCount = 0; nodeCount < nodeEntries; nodeCount++) {
storeStatuses.add(readStoreStatus(in));
storeStatuses.add(new StoreStatus(in));
}
shardEntries.put(shardID, storeStatuses);
}

View File

@ -21,20 +21,27 @@ package org.elasticsearch.action.admin.indices.stats;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.index.shard.ShardId;
import java.io.IOException;
import java.util.Arrays;
import java.util.Iterator;
public class IndexShardStats implements Iterable<ShardStats>, Streamable {
public class IndexShardStats implements Iterable<ShardStats>, Writeable {
private ShardId shardId;
private ShardStats[] shards;
private IndexShardStats() {}
public IndexShardStats(StreamInput in) throws IOException {
shardId = new ShardId(in);
int shardSize = in.readVInt();
shards = new ShardStats[shardSize];
for (int i = 0; i < shardSize; i++) {
shards[i] = new ShardStats(in);
}
}
public IndexShardStats(ShardId shardId, ShardStats[] shards) {
this.shardId = shardId;
@ -88,16 +95,6 @@ public class IndexShardStats implements Iterable<ShardStats>, Streamable {
return stats;
}
@Override
public void readFrom(StreamInput in) throws IOException {
shardId = new ShardId(in);
int shardSize = in.readVInt();
shards = new ShardStats[shardSize];
for (int i = 0; i < shardSize; i++) {
shards[i] = ShardStats.readShardStats(in);
}
}
@Override
public void writeTo(StreamOutput out) throws IOException {
shardId.writeTo(out);
@ -106,11 +103,4 @@ public class IndexShardStats implements Iterable<ShardStats>, Streamable {
stats.writeTo(out);
}
}
public static IndexShardStats readIndexShardStats(StreamInput in) throws IOException {
IndexShardStats indexShardStats = new IndexShardStats();
indexShardStats.readFrom(in);
return indexShardStats;
}
}

View File

@ -45,7 +45,7 @@ public class IndicesStatsResponse extends BroadcastResponse {
IndicesStatsResponse(StreamInput in) throws IOException {
super(in);
shards = in.readArray(ShardStats::readShardStats, (size) -> new ShardStats[size]);
shards = in.readArray(ShardStats::new, (size) -> new ShardStats[size]);
}
IndicesStatsResponse(ShardStats[] shards, int totalShards, int successfulShards, int failedShards,

View File

@ -24,7 +24,6 @@ import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ToXContentFragment;
import org.elasticsearch.common.xcontent.XContentBuilder;
@ -35,7 +34,7 @@ import org.elasticsearch.index.shard.ShardPath;
import java.io.IOException;
public class ShardStats implements Streamable, Writeable, ToXContentFragment {
public class ShardStats implements Writeable, ToXContentFragment {
private ShardRouting shardRouting;
private CommonStats commonStats;
@ -60,7 +59,19 @@ public class ShardStats implements Streamable, Writeable, ToXContentFragment {
private String statePath;
private boolean isCustomDataPath;
ShardStats() {
public ShardStats(StreamInput in) throws IOException {
shardRouting = new ShardRouting(in);
commonStats = new CommonStats(in);
commitStats = CommitStats.readOptionalCommitStatsFrom(in);
statePath = in.readString();
dataPath = in.readString();
isCustomDataPath = in.readBoolean();
if (in.getVersion().onOrAfter(Version.V_6_0_0_alpha1)) {
seqNoStats = in.readOptionalWriteable(SeqNoStats::new);
}
if (in.getVersion().onOrAfter(Version.V_6_7_0)) {
retentionLeaseStats = in.readOptionalWriteable(RetentionLeaseStats::new);
}
}
public ShardStats(
@ -113,28 +124,6 @@ public class ShardStats implements Streamable, Writeable, ToXContentFragment {
return isCustomDataPath;
}
public static ShardStats readShardStats(StreamInput in) throws IOException {
ShardStats stats = new ShardStats();
stats.readFrom(in);
return stats;
}
@Override
public void readFrom(StreamInput in) throws IOException {
shardRouting = new ShardRouting(in);
commonStats = new CommonStats(in);
commitStats = CommitStats.readOptionalCommitStatsFrom(in);
statePath = in.readString();
dataPath = in.readString();
isCustomDataPath = in.readBoolean();
if (in.getVersion().onOrAfter(Version.V_6_0_0_alpha1)) {
seqNoStats = in.readOptionalWriteable(SeqNoStats::new);
}
if (in.getVersion().onOrAfter(Version.V_6_7_0)) {
retentionLeaseStats = in.readOptionalWriteable(RetentionLeaseStats::new);
}
}
@Override
public void writeTo(StreamOutput out) throws IOException {
shardRouting.writeTo(out);

View File

@ -77,7 +77,7 @@ public class TransportIndicesStatsAction extends TransportBroadcastByNodeAction<
@Override
protected ShardStats readShardResult(StreamInput in) throws IOException {
return ShardStats.readShardStats(in);
return new ShardStats(in);
}
@Override

View File

@ -118,7 +118,7 @@ public class PutIndexTemplateRequest extends MasterNodeRequest<PutIndexTemplateR
}
int aliasesSize = in.readVInt();
for (int i = 0; i < aliasesSize; i++) {
aliases.add(Alias.read(in));
aliases.add(new Alias(in));
}
version = in.readOptionalVInt();
}

View File

@ -23,7 +23,7 @@ import org.elasticsearch.Version;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.ToXContentFragment;
import org.elasticsearch.common.xcontent.XContentBuilder;
@ -35,7 +35,7 @@ import java.util.Objects;
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg;
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg;
public class QueryExplanation implements Streamable, ToXContentFragment {
public class QueryExplanation implements Writeable, ToXContentFragment {
public static final String INDEX_FIELD = "index";
public static final String SHARD_FIELD = "shard";
@ -80,8 +80,16 @@ public class QueryExplanation implements Streamable, ToXContentFragment {
private String error;
QueryExplanation() {
public QueryExplanation(StreamInput in) throws IOException {
if (in.getVersion().onOrAfter(Version.V_6_4_0)) {
index = in.readOptionalString();
} else {
index = in.readString();
}
shard = in.readInt();
valid = in.readBoolean();
explanation = in.readOptionalString();
error = in.readOptionalString();
}
public QueryExplanation(String index, int shard, boolean valid, String explanation,
@ -113,19 +121,6 @@ public class QueryExplanation implements Streamable, ToXContentFragment {
return this.explanation;
}
@Override
public void readFrom(StreamInput in) throws IOException {
if (in.getVersion().onOrAfter(Version.V_6_4_0)) {
index = in.readOptionalString();
} else {
index = in.readString();
}
shard = in.readInt();
valid = in.readBoolean();
explanation = in.readOptionalString();
error = in.readOptionalString();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
if (out.getVersion().onOrAfter(Version.V_6_4_0)) {
@ -139,12 +134,6 @@ public class QueryExplanation implements Streamable, ToXContentFragment {
out.writeOptionalString(error);
}
public static QueryExplanation readQueryExplanation(StreamInput in) throws IOException {
QueryExplanation exp = new QueryExplanation();
exp.readFrom(in);
return exp;
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
if (getIndex() != null) {

View File

@ -34,7 +34,6 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import static org.elasticsearch.action.admin.indices.validate.query.QueryExplanation.readQueryExplanation;
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg;
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg;
@ -84,7 +83,7 @@ public class ValidateQueryResponse extends BroadcastResponse {
if (size > 0) {
queryExplanations = new ArrayList<>(size);
for (int i = 0; i < size; i++) {
queryExplanations.add(readQueryExplanation(in));
queryExplanations.add(new QueryExplanation(in));
}
}
}

View File

@ -26,7 +26,6 @@ import org.elasticsearch.action.support.replication.ReplicatedWriteRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.io.stream.Writeable;
import java.io.IOException;
@ -35,7 +34,7 @@ import java.io.IOException;
* Interface implemented by requests that modify the documents in an index like {@link IndexRequest}, {@link UpdateRequest}, and
* {@link BulkRequest}. Rather than implement this directly most implementers should extend {@link ReplicatedWriteRequest}.
*/
public interface WriteRequest<R extends WriteRequest<R>> extends Streamable {
public interface WriteRequest<R extends WriteRequest<R>> extends Writeable {
/**
* Should this request trigger a refresh ({@linkplain RefreshPolicy#IMMEDIATE}), wait for a refresh (
* {@linkplain RefreshPolicy#WAIT_UNTIL}), or proceed ignore refreshes entirely ({@linkplain RefreshPolicy#NONE}, the default).

View File

@ -27,7 +27,7 @@ import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
@ -54,7 +54,7 @@ public class ReplicationResponse extends ActionResponse {
public ReplicationResponse(StreamInput in) throws IOException {
super(in);
shardInfo = ReplicationResponse.ShardInfo.readShardInfo(in);
shardInfo = new ReplicationResponse.ShardInfo(in);
}
@Override
@ -70,7 +70,7 @@ public class ReplicationResponse extends ActionResponse {
this.shardInfo = shardInfo;
}
public static class ShardInfo implements Streamable, ToXContentObject {
public static class ShardInfo implements Writeable, ToXContentObject {
private static final String TOTAL = "total";
private static final String SUCCESSFUL = "successful";
@ -81,7 +81,16 @@ public class ReplicationResponse extends ActionResponse {
private int successful;
private Failure[] failures = EMPTY;
public ShardInfo() {
public ShardInfo() {}
public ShardInfo(StreamInput in) throws IOException {
total = in.readVInt();
successful = in.readVInt();
int size = in.readVInt();
failures = new Failure[size];
for (int i = 0; i < size; i++) {
failures[i] = new Failure(in);
}
}
public ShardInfo(int total, int successful, Failure... failures) {
@ -131,17 +140,6 @@ public class ReplicationResponse extends ActionResponse {
return status;
}
@Override
public void readFrom(StreamInput in) throws IOException {
total = in.readVInt();
successful = in.readVInt();
int size = in.readVInt();
failures = new Failure[size];
for (int i = 0; i < size; i++) {
failures[i] = new Failure(in);
}
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVInt(total);
@ -216,12 +214,6 @@ public class ReplicationResponse extends ActionResponse {
'}';
}
static ShardInfo readShardInfo(StreamInput in) throws IOException {
ShardInfo shardInfo = new ShardInfo();
shardInfo.readFrom(in);
return shardInfo;
}
public static class Failure extends ShardOperationFailedException implements ToXContentObject {
private static final String _INDEX = "_index";

View File

@ -31,6 +31,7 @@ import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentType;
@ -325,7 +326,7 @@ public class UpdateHelper {
}
@SuppressWarnings("unchecked")
public <T extends Streamable> T action() {
public <T extends Writeable> T action() {
return (T) action;
}

View File

@ -175,7 +175,7 @@ public class NodeIndicesStats implements Streamable, ToXContentFragment {
int indexShardListSize = in.readVInt();
List<IndexShardStats> indexShardStats = new ArrayList<>(indexShardListSize);
for (int j = 0; j < indexShardListSize; j++) {
indexShardStats.add(IndexShardStats.readIndexShardStats(in));
indexShardStats.add(new IndexShardStats(in));
}
statsByShard.put(index, indexShardStats);
}

View File

@ -29,6 +29,12 @@ import java.io.IOException;
public abstract class TransportRequest extends TransportMessage implements TaskAwareRequest {
public static class Empty extends TransportRequest {
public static final Empty INSTANCE = new Empty();
public Empty() {}
public Empty(StreamInput in) throws IOException {
super(in);
}
}
/**

View File

@ -19,12 +19,13 @@
package org.elasticsearch.action.admin.indices.validate.query;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.test.AbstractStreamableXContentTestCase;
import org.elasticsearch.test.AbstractSerializingTestCase;
import java.io.IOException;
public class QueryExplanationTests extends AbstractStreamableXContentTestCase<QueryExplanation> {
public class QueryExplanationTests extends AbstractSerializingTestCase<QueryExplanation> {
static QueryExplanation createRandomQueryExplanation(boolean isValid) {
String index = "index_" + randomInt(1000);
@ -47,13 +48,13 @@ public class QueryExplanationTests extends AbstractStreamableXContentTestCase<Qu
return QueryExplanation.fromXContent(parser);
}
@Override
protected QueryExplanation createBlankInstance() {
return new QueryExplanation();
}
@Override
protected QueryExplanation createTestInstance() {
return createRandomQueryExplanation();
}
@Override
protected Writeable.Reader<QueryExplanation> instanceReader() {
return QueryExplanation::new;
}
}

View File

@ -27,7 +27,7 @@ import org.elasticsearch.action.support.replication.ReplicationRequest;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.document.DocumentField;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
@ -361,7 +361,7 @@ public class UpdateRequestTests extends ESTestCase {
// We simulate that the document is not existing yet
GetResult getResult = new GetResult("test", "type1", "2", UNASSIGNED_SEQ_NO, 0, 0, false, null, null, null);
UpdateHelper.Result result = updateHelper.prepare(new ShardId("test", "_na_", 0), updateRequest, getResult, () -> nowInMillis);
Streamable action = result.action();
Writeable action = result.action();
assertThat(action, instanceOf(IndexRequest.class));
IndexRequest indexAction = (IndexRequest) action;
assertEquals(nowInMillis, indexAction.sourceAsMap().get("update_timestamp"));
@ -374,7 +374,7 @@ public class UpdateRequestTests extends ESTestCase {
// We simulate that the document is not existing yet
GetResult getResult = new GetResult("test", "type1", "2", 0, 1, 0, true, new BytesArray("{}"), null, null);
UpdateHelper.Result result = updateHelper.prepare(new ShardId("test", "_na_", 0), updateRequest, getResult, () -> 42L);
Streamable action = result.action();
Writeable action = result.action();
assertThat(action, instanceOf(IndexRequest.class));
}
}
@ -424,7 +424,7 @@ public class UpdateRequestTests extends ESTestCase {
updateRequest,
getResult,
ESTestCase::randomNonNegativeLong);
final Streamable action = result.action();
final Writeable action = result.action();
assertThat(action, instanceOf(ReplicationRequest.class));
final ReplicationRequest<?> request = (ReplicationRequest<?>) action;
assertThat(request.timeout(), equalTo(updateRequest.timeout()));

View File

@ -1457,7 +1457,7 @@ public class IndexShardTests extends IndexShardTestCase {
BytesStreamOutput out = new BytesStreamOutput();
stats.writeTo(out);
StreamInput in = out.bytes().streamInput();
stats = ShardStats.readShardStats(in);
stats = new ShardStats(in);
XContentBuilder builder = jsonBuilder();
builder.startObject();

View File

@ -83,7 +83,7 @@ public class TransportActionProxyTests extends ESTestCase {
public void testSendMessage() throws InterruptedException {
serviceA.registerRequestHandler("internal:test", SimpleTestRequest::new, ThreadPool.Names.SAME,
serviceA.registerRequestHandler("internal:test", ThreadPool.Names.SAME, SimpleTestRequest::new,
(request, channel, task) -> {
assertEquals(request.sourceNode, "TS_A");
SimpleTestResponse response = new SimpleTestResponse("TS_A");
@ -92,7 +92,7 @@ public class TransportActionProxyTests extends ESTestCase {
TransportActionProxy.registerProxyAction(serviceA, "internal:test", SimpleTestResponse::new);
serviceA.connectToNode(nodeB);
serviceB.registerRequestHandler("internal:test", SimpleTestRequest::new, ThreadPool.Names.SAME,
serviceB.registerRequestHandler("internal:test", ThreadPool.Names.SAME, SimpleTestRequest::new,
(request, channel, task) -> {
assertEquals(request.sourceNode, "TS_A");
SimpleTestResponse response = new SimpleTestResponse("TS_B");
@ -100,7 +100,7 @@ public class TransportActionProxyTests extends ESTestCase {
});
TransportActionProxy.registerProxyAction(serviceB, "internal:test", SimpleTestResponse::new);
serviceB.connectToNode(nodeC);
serviceC.registerRequestHandler("internal:test", SimpleTestRequest::new, ThreadPool.Names.SAME,
serviceC.registerRequestHandler("internal:test", ThreadPool.Names.SAME, SimpleTestRequest::new,
(request, channel, task) -> {
assertEquals(request.sourceNode, "TS_A");
SimpleTestResponse response = new SimpleTestResponse("TS_C");
@ -143,7 +143,7 @@ public class TransportActionProxyTests extends ESTestCase {
}
public void testException() throws InterruptedException {
serviceA.registerRequestHandler("internal:test", SimpleTestRequest::new, ThreadPool.Names.SAME,
serviceA.registerRequestHandler("internal:test", ThreadPool.Names.SAME, SimpleTestRequest::new,
(request, channel, task) -> {
assertEquals(request.sourceNode, "TS_A");
SimpleTestResponse response = new SimpleTestResponse("TS_A");
@ -152,7 +152,7 @@ public class TransportActionProxyTests extends ESTestCase {
TransportActionProxy.registerProxyAction(serviceA, "internal:test", SimpleTestResponse::new);
serviceA.connectToNode(nodeB);
serviceB.registerRequestHandler("internal:test", SimpleTestRequest::new, ThreadPool.Names.SAME,
serviceB.registerRequestHandler("internal:test", ThreadPool.Names.SAME, SimpleTestRequest::new,
(request, channel, task) -> {
assertEquals(request.sourceNode, "TS_A");
SimpleTestResponse response = new SimpleTestResponse("TS_B");
@ -160,7 +160,7 @@ public class TransportActionProxyTests extends ESTestCase {
});
TransportActionProxy.registerProxyAction(serviceB, "internal:test", SimpleTestResponse::new);
serviceB.connectToNode(nodeC);
serviceC.registerRequestHandler("internal:test", SimpleTestRequest::new, ThreadPool.Names.SAME,
serviceC.registerRequestHandler("internal:test", ThreadPool.Names.SAME, SimpleTestRequest::new,
(request, channel, task) -> {
throw new ElasticsearchException("greetings from TS_C");
});
@ -209,10 +209,14 @@ public class TransportActionProxyTests extends ESTestCase {
}
public SimpleTestRequest() {}
public SimpleTestRequest(StreamInput in) throws IOException {
super(in);
sourceNode = in.readString();
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
sourceNode = in.readString();
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
}
@Override

View File

@ -1,45 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.test;
import org.elasticsearch.Version;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.io.stream.Writeable;
import java.io.IOException;
public abstract class AbstractStreamableTestCase<T extends Streamable> extends AbstractWireTestCase<T> {
@Override
protected final T copyInstance(T instance, Version version) throws IOException {
return copyStreamable(instance, getNamedWriteableRegistry(), this::createBlankInstance, version);
}
@Override
protected final Writeable.Reader<T> instanceReader() {
return Streamable.newWriteableReader(this::createBlankInstance);
}
/**
* Creates an empty instance to use when deserialising the
* {@link Streamable}. This usually returns an instance created using the
* zer-arg constructor
*/
protected abstract T createBlankInstance();
}

View File

@ -1,102 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.test;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import java.io.IOException;
import java.util.function.Predicate;
import static org.elasticsearch.test.AbstractXContentTestCase.xContentTester;
public abstract class AbstractStreamableXContentTestCase<T extends ToXContent & Streamable> extends AbstractStreamableTestCase<T> {
/**
* Generic test that creates new instance from the test instance and checks
* both for equality and asserts equality on the two queries.
*/
public final void testFromXContent() throws IOException {
xContentTester(this::createParser, this::createXContextTestInstance, getToXContentParams(), this::doParseInstance)
.numberOfTestRuns(NUMBER_OF_TEST_RUNS)
.supportsUnknownFields(supportsUnknownFields())
.shuffleFieldsExceptions(getShuffleFieldsExceptions())
.randomFieldsExcludeFilter(getRandomFieldsExcludeFilter())
.assertEqualsConsumer(this::assertEqualInstances)
.assertToXContentEquivalence(assertToXContentEquivalence())
.test();
}
/**
* Creates a random instance to use in the xcontent tests.
* Override this method if the random instance that you build
* should be aware of the {@link XContentType} used in the test.
*/
protected T createXContextTestInstance(XContentType xContentType) {
return createTestInstance();
}
/**
* Parses to a new instance using the provided {@link XContentParser}
*/
protected abstract T doParseInstance(XContentParser parser) throws IOException;
/**
* Indicates whether the parser supports unknown fields or not. In case it does, such behaviour will be tested by
* inserting random fields before parsing and checking that they don't make parsing fail.
*/
protected boolean supportsUnknownFields() {
return true;
}
/**
* Returns a predicate that given the field name indicates whether the field has to be excluded from random fields insertion or not
*/
protected Predicate<String> getRandomFieldsExcludeFilter() {
return field -> false;
}
/**
* Fields that have to be ignored when shuffling as part of testFromXContent
*/
protected String[] getShuffleFieldsExceptions() {
return Strings.EMPTY_ARRAY;
}
/**
* Whether or not to assert equivalence of the {@link org.elasticsearch.common.xcontent.XContent} of the test instance and the instance
* parsed from the {@link org.elasticsearch.common.xcontent.XContent} of the test instance.
*
* @return true if equivalence should be asserted, otherwise false
*/
protected boolean assertToXContentEquivalence() {
return true;
}
/**
* Params that have to be provided when calling calling {@link ToXContent#toXContent(XContentBuilder, ToXContent.Params)}
*/
protected ToXContent.Params getToXContentParams() {
return ToXContent.EMPTY_PARAMS;
}
}

View File

@ -64,7 +64,6 @@ import org.elasticsearch.common.io.stream.NamedWriteable;
import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.joda.JodaDeprecationPatterns;
import org.elasticsearch.common.logging.DeprecationLogger;
@ -1161,18 +1160,6 @@ public abstract class ESTestCase extends LuceneTestCase {
return copyInstance(original, namedWriteableRegistry, (out, value) -> value.writeTo(out), reader, version);
}
/**
* Create a copy of an original {@link Streamable} object by running it through a {@link BytesStreamOutput} and
* reading it in again using a provided {@link Writeable.Reader}. The stream that is wrapped around the {@link StreamInput}
* potentially need to use a {@link NamedWriteableRegistry}, so this needs to be provided too (although it can be
* empty if the object that is streamed doesn't contain any {@link NamedWriteable} objects itself.
*/
public static <T extends Streamable> T copyStreamable(T original, NamedWriteableRegistry namedWriteableRegistry,
Supplier<T> supplier, Version version) throws IOException {
return copyInstance(original, namedWriteableRegistry, (out, value) -> value.writeTo(out),
Streamable.newWriteableReader(supplier), version);
}
protected static <T> T copyInstance(T original, NamedWriteableRegistry namedWriteableRegistry, Writeable.Writer<T> writer,
Writeable.Reader<T> reader, Version version) throws IOException {
try (BytesStreamOutput output = new BytesStreamOutput()) {

View File

@ -228,7 +228,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
public void testHelloWorld() {
serviceA.registerRequestHandler("internal:sayHello", StringMessageRequest::new, ThreadPool.Names.GENERIC,
serviceA.registerRequestHandler("internal:sayHello", ThreadPool.Names.GENERIC, StringMessageRequest::new,
(request, channel, task) -> {
assertThat("moshe", equalTo(request.message));
try {
@ -304,7 +304,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
public void testThreadContext() throws ExecutionException, InterruptedException {
serviceA.registerRequestHandler("internal:ping_pong", StringMessageRequest::new, ThreadPool.Names.GENERIC,
serviceA.registerRequestHandler("internal:ping_pong", ThreadPool.Names.GENERIC, StringMessageRequest::new,
(request, channel, task) -> {
assertEquals("ping_user", threadPool.getThreadContext().getHeader("test.ping.user"));
assertNull(threadPool.getThreadContext().getTransient("my_private_context"));
@ -363,7 +363,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
// this should be a noop
serviceA.disconnectFromNode(nodeA);
final AtomicReference<Exception> exception = new AtomicReference<>();
serviceA.registerRequestHandler("internal:localNode", StringMessageRequest::new, ThreadPool.Names.GENERIC,
serviceA.registerRequestHandler("internal:localNode", ThreadPool.Names.GENERIC, StringMessageRequest::new,
(request, channel, task) -> {
try {
channel.sendResponse(new StringMessageResponse(request.message));
@ -416,9 +416,9 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
}
};
final String ACTION = "internal:action";
serviceA.registerRequestHandler(ACTION, TransportRequest.Empty::new, ThreadPool.Names.GENERIC,
serviceA.registerRequestHandler(ACTION, ThreadPool.Names.GENERIC, TransportRequest.Empty::new,
requestHandler);
serviceB.registerRequestHandler(ACTION, TransportRequest.Empty::new, ThreadPool.Names.GENERIC,
serviceB.registerRequestHandler(ACTION, ThreadPool.Names.GENERIC, TransportRequest.Empty::new,
requestHandler);
class CountingListener implements TransportMessageListener {
@ -533,7 +533,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
serviceC.start();
serviceC.acceptIncomingRequests();
serviceA.registerRequestHandler("internal:sayHello", TransportRequest.Empty::new, ThreadPool.Names.GENERIC,
serviceA.registerRequestHandler("internal:sayHello", ThreadPool.Names.GENERIC, TransportRequest.Empty::new,
(request, channel, task) -> {
try {
channel.sendResponse(TransportResponse.Empty.INSTANCE);
@ -586,7 +586,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
serviceC.start();
serviceC.acceptIncomingRequests();
serviceA.registerRequestHandler("internal:sayHello", StringMessageRequest::new, ThreadPool.Names.GENERIC,
serviceA.registerRequestHandler("internal:sayHello", ThreadPool.Names.GENERIC, StringMessageRequest::new,
(request, channel, task) -> {
assertThat("moshe", equalTo(request.message));
try {
@ -636,7 +636,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
}
public void testErrorMessage() {
serviceA.registerRequestHandler("internal:sayHelloException", StringMessageRequest::new, ThreadPool.Names.GENERIC,
serviceA.registerRequestHandler("internal:sayHelloException", ThreadPool.Names.GENERIC, StringMessageRequest::new,
(request, channel, task) -> {
assertThat("moshe", equalTo(request.message));
throw new RuntimeException("bad message !!!");
@ -694,8 +694,8 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
public void testConcurrentSendRespondAndDisconnect() throws BrokenBarrierException, InterruptedException {
Set<Exception> sendingErrors = ConcurrentCollections.newConcurrentSet();
Set<Exception> responseErrors = ConcurrentCollections.newConcurrentSet();
serviceA.registerRequestHandler("internal:test", TestRequest::new,
randomBoolean() ? ThreadPool.Names.SAME : ThreadPool.Names.GENERIC, (request, channel, task) -> {
serviceA.registerRequestHandler("internal:test", randomBoolean() ? ThreadPool.Names.SAME : ThreadPool.Names.GENERIC,
TestRequest::new, (request, channel, task) -> {
try {
channel.sendResponse(new TestResponse((String) null));
} catch (Exception e) {
@ -711,7 +711,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
logger.trace("caught exception while responding from node B", e);
}
};
serviceB.registerRequestHandler("internal:test", TestRequest::new, ThreadPool.Names.SAME, ignoringRequestHandler);
serviceB.registerRequestHandler("internal:test", ThreadPool.Names.SAME, TestRequest::new, ignoringRequestHandler);
int halfSenders = scaledRandomIntBetween(3, 10);
final CyclicBarrier go = new CyclicBarrier(halfSenders * 2 + 1);
@ -797,7 +797,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
// simulate restart of nodeB
serviceB.close();
MockTransportService newService = buildService("TS_B_" + i, version1, Settings.EMPTY);
newService.registerRequestHandler("internal:test", TestRequest::new, ThreadPool.Names.SAME, ignoringRequestHandler);
newService.registerRequestHandler("internal:test", ThreadPool.Names.SAME, TestRequest::new, ignoringRequestHandler);
serviceB = newService;
nodeB = newService.getLocalDiscoNode();
serviceB.connectToNode(nodeA);
@ -819,7 +819,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
final CountDownLatch latch2 = new CountDownLatch(1);
final CountDownLatch latch3 = new CountDownLatch(1);
try {
serviceA.registerRequestHandler("internal:foobar", StringMessageRequest::new, ThreadPool.Names.GENERIC,
serviceA.registerRequestHandler("internal:foobar", ThreadPool.Names.GENERIC, StringMessageRequest::new,
(request, channel, task) -> {
try {
latch2.await();
@ -848,7 +848,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
}
public void testTimeoutSendExceptionWithNeverSendingBackResponse() throws Exception {
serviceA.registerRequestHandler("internal:sayHelloTimeoutNoResponse", StringMessageRequest::new, ThreadPool.Names.GENERIC,
serviceA.registerRequestHandler("internal:sayHelloTimeoutNoResponse", ThreadPool.Names.GENERIC, StringMessageRequest::new,
new TransportRequestHandler<StringMessageRequest>() {
@Override
public void messageReceived(StringMessageRequest request, TransportChannel channel, Task task) {
@ -893,7 +893,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
CountDownLatch waitForever = new CountDownLatch(1);
CountDownLatch doneWaitingForever = new CountDownLatch(1);
Semaphore inFlight = new Semaphore(Integer.MAX_VALUE);
serviceA.registerRequestHandler("internal:sayHelloTimeoutDelayedResponse", StringMessageRequest::new, ThreadPool.Names.GENERIC,
serviceA.registerRequestHandler("internal:sayHelloTimeoutDelayedResponse", ThreadPool.Names.GENERIC, StringMessageRequest::new,
new TransportRequestHandler<StringMessageRequest>() {
@Override
public void messageReceived(StringMessageRequest request, TransportChannel channel, Task task) throws InterruptedException {
@ -1031,12 +1031,12 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
}
};
serviceA.registerRequestHandler("internal:test", StringMessageRequest::new, ThreadPool.Names.SAME, handler);
serviceA.registerRequestHandler("internal:testNotSeen", StringMessageRequest::new, ThreadPool.Names.SAME, handler);
serviceA.registerRequestHandler("internal:testError", StringMessageRequest::new, ThreadPool.Names.SAME, handlerWithError);
serviceB.registerRequestHandler("internal:test", StringMessageRequest::new, ThreadPool.Names.SAME, handler);
serviceB.registerRequestHandler("internal:testNotSeen", StringMessageRequest::new, ThreadPool.Names.SAME, handler);
serviceB.registerRequestHandler("internal:testError", StringMessageRequest::new, ThreadPool.Names.SAME, handlerWithError);
serviceA.registerRequestHandler("internal:test", ThreadPool.Names.SAME, StringMessageRequest::new, handler);
serviceA.registerRequestHandler("internal:testNotSeen", ThreadPool.Names.SAME, StringMessageRequest::new, handler);
serviceA.registerRequestHandler("internal:testError", ThreadPool.Names.SAME, StringMessageRequest::new, handlerWithError);
serviceB.registerRequestHandler("internal:test", ThreadPool.Names.SAME, StringMessageRequest::new, handler);
serviceB.registerRequestHandler("internal:testNotSeen", ThreadPool.Names.SAME, StringMessageRequest::new, handler);
serviceB.registerRequestHandler("internal:testError", ThreadPool.Names.SAME, StringMessageRequest::new, handlerWithError);
String includeSettings;
String excludeSettings;
@ -1134,7 +1134,10 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
this.timeout = timeout;
}
public StringMessageRequest() {
public StringMessageRequest(StreamInput in) throws IOException {
super(in);
message = in.readString();
timeout = in.readLong();
}
public StringMessageRequest(String message) {
@ -1147,9 +1150,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
message = in.readString();
timeout = in.readLong();
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
}
@Override
@ -1185,11 +1186,16 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
int value1;
Version0Request() {}
Version0Request(StreamInput in) throws IOException {
super(in);
value1 = in.readInt();
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
value1 = in.readInt();
public final void readFrom(StreamInput in) throws IOException {
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
}
@Override
@ -1203,9 +1209,10 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
int value2;
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
Version1Request() {}
Version1Request(StreamInput in) throws IOException {
super(in);
if (in.getVersion().onOrAfter(version1)) {
value2 = in.readInt();
}
@ -1270,7 +1277,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
}
public void testVersionFrom0to1() throws Exception {
serviceB.registerRequestHandler("internal:version", Version1Request::new, ThreadPool.Names.SAME,
serviceB.registerRequestHandler("internal:version", ThreadPool.Names.SAME, Version1Request::new,
new TransportRequestHandler<Version1Request>() {
@Override
public void messageReceived(Version1Request request, TransportChannel channel, Task task) throws Exception {
@ -1312,7 +1319,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
}
public void testVersionFrom1to0() throws Exception {
serviceA.registerRequestHandler("internal:version", Version0Request::new, ThreadPool.Names.SAME,
serviceA.registerRequestHandler("internal:version", ThreadPool.Names.SAME, Version0Request::new,
new TransportRequestHandler<Version0Request>() {
@Override
public void messageReceived(Version0Request request, TransportChannel channel, Task task) throws Exception {
@ -1356,7 +1363,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
}
public void testVersionFrom1to1() throws Exception {
serviceB.registerRequestHandler("internal:version", Version1Request::new, ThreadPool.Names.SAME,
serviceB.registerRequestHandler("internal:version", ThreadPool.Names.SAME, Version1Request::new,
(request, channel, task) -> {
assertThat(request.value1, equalTo(1));
assertThat(request.value2, equalTo(2));
@ -1398,7 +1405,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
}
public void testVersionFrom0to0() throws Exception {
serviceA.registerRequestHandler("internal:version", Version0Request::new, ThreadPool.Names.SAME,
serviceA.registerRequestHandler("internal:version", ThreadPool.Names.SAME, Version0Request::new,
(request, channel, task) -> {
assertThat(request.value1, equalTo(1));
Version0Response response = new Version0Response(1);
@ -1436,7 +1443,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
}
public void testMockFailToSendNoConnectRule() throws Exception {
serviceA.registerRequestHandler("internal:sayHello", StringMessageRequest::new, ThreadPool.Names.GENERIC,
serviceA.registerRequestHandler("internal:sayHello", ThreadPool.Names.GENERIC, StringMessageRequest::new,
(request, channel, task) -> {
assertThat("moshe", equalTo(request.message));
throw new RuntimeException("bad message !!!");
@ -1493,7 +1500,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
}
public void testMockUnresponsiveRule() throws IOException {
serviceA.registerRequestHandler("internal:sayHello", StringMessageRequest::new, ThreadPool.Names.GENERIC,
serviceA.registerRequestHandler("internal:sayHello", ThreadPool.Names.GENERIC, StringMessageRequest::new,
(request, channel, task) -> {
assertThat("moshe", equalTo(request.message));
throw new RuntimeException("bad message !!!");
@ -1548,7 +1555,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
final CountDownLatch latch = new CountDownLatch(2);
final AtomicReference<TransportAddress> addressA = new AtomicReference<>();
final AtomicReference<TransportAddress> addressB = new AtomicReference<>();
serviceB.registerRequestHandler("internal:action1", TestRequest::new, ThreadPool.Names.SAME, (request, channel, task) -> {
serviceB.registerRequestHandler("internal:action1", ThreadPool.Names.SAME, TestRequest::new, (request, channel, task) -> {
addressA.set(request.remoteAddress());
channel.sendResponse(new TestResponse((String) null));
latch.countDown();
@ -1588,7 +1595,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
try (TransportService service = buildService("TS_TEST", version0, null,
Settings.EMPTY, false, false)) {
AtomicBoolean requestProcessed = new AtomicBoolean(false);
service.registerRequestHandler("internal:action", TestRequest::new, ThreadPool.Names.SAME,
service.registerRequestHandler("internal:action", ThreadPool.Names.SAME, TestRequest::new,
(request, channel, task) -> {
requestProcessed.set(true);
channel.sendResponse(TransportResponse.Empty.INSTANCE);
@ -1638,7 +1645,12 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
String info;
int resendCount;
public TestRequest() {
public TestRequest() {}
public TestRequest(StreamInput in) throws IOException {
super(in);
info = in.readOptionalString();
resendCount = in.readInt();
}
public TestRequest(String info) {
@ -1647,9 +1659,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
info = in.readOptionalString();
resendCount = in.readInt();
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
}
@Override
@ -1798,11 +1808,11 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
}
}
serviceB.registerRequestHandler("internal:action1", TestRequest::new, randomFrom(ThreadPool.Names.SAME, ThreadPool.Names.GENERIC),
serviceB.registerRequestHandler("internal:action1", randomFrom(ThreadPool.Names.SAME, ThreadPool.Names.GENERIC), TestRequest::new,
new TestRequestHandler(serviceB));
serviceC.registerRequestHandler("internal:action1", TestRequest::new, randomFrom(ThreadPool.Names.SAME, ThreadPool.Names.GENERIC),
serviceC.registerRequestHandler("internal:action1", randomFrom(ThreadPool.Names.SAME, ThreadPool.Names.GENERIC), TestRequest::new,
new TestRequestHandler(serviceC));
serviceA.registerRequestHandler("internal:action1", TestRequest::new, randomFrom(ThreadPool.Names.SAME, ThreadPool.Names.GENERIC),
serviceA.registerRequestHandler("internal:action1", randomFrom(ThreadPool.Names.SAME, ThreadPool.Names.GENERIC), TestRequest::new,
new TestRequestHandler(serviceA));
int iters = randomIntBetween(30, 60);
CountDownLatch allRequestsDone = new CountDownLatch(iters);
@ -1866,19 +1876,20 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
}
public void testRegisterHandlerTwice() {
serviceB.registerRequestHandler("internal:action1", TestRequest::new, randomFrom(ThreadPool.Names.SAME, ThreadPool.Names.GENERIC),
serviceB.registerRequestHandler("internal:action1", randomFrom(ThreadPool.Names.SAME, ThreadPool.Names.GENERIC), TestRequest::new,
(request, message, task) -> {
throw new AssertionError("boom");
});
expectThrows(IllegalArgumentException.class, () ->
serviceB.registerRequestHandler("internal:action1", TestRequest::new, randomFrom(ThreadPool.Names.SAME,
ThreadPool.Names.GENERIC),
serviceB.registerRequestHandler("internal:action1", randomFrom(ThreadPool.Names.SAME, ThreadPool.Names.GENERIC),
TestRequest::new,
(request, message, task) -> {
throw new AssertionError("boom");
})
);
serviceA.registerRequestHandler("internal:action1", TestRequest::new, randomFrom(ThreadPool.Names.SAME, ThreadPool.Names.GENERIC),
serviceA.registerRequestHandler("internal:action1", randomFrom(ThreadPool.Names.SAME, ThreadPool.Names.GENERIC),
TestRequest::new,
(request, message, task) -> {
throw new AssertionError("boom");
});
@ -2081,7 +2092,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
public void testResponseHeadersArePreserved() throws InterruptedException {
List<String> executors = new ArrayList<>(ThreadPool.THREAD_POOL_TYPES.keySet());
CollectionUtil.timSort(executors); // makes sure it's reproducible
serviceA.registerRequestHandler("internal:action", TestRequest::new, ThreadPool.Names.SAME,
serviceA.registerRequestHandler("internal:action", ThreadPool.Names.SAME, TestRequest::new,
(request, channel, task) -> {
threadPool.getThreadContext().putTransient("boom", new Object());
@ -2142,7 +2153,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
List<String> executors = new ArrayList<>(ThreadPool.THREAD_POOL_TYPES.keySet());
CollectionUtil.timSort(executors); // makes sure it's reproducible
TransportService serviceC = buildService("TS_C", CURRENT_VERSION, Settings.EMPTY);
serviceC.registerRequestHandler("internal:action", TestRequest::new, ThreadPool.Names.SAME,
serviceC.registerRequestHandler("internal:action", ThreadPool.Names.SAME, TestRequest::new,
(request, channel, task) -> {
// do nothing
});
@ -2202,7 +2213,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
MockTransportService serviceC = buildService("TS_C", version0, Settings.EMPTY);
CountDownLatch receivedLatch = new CountDownLatch(1);
CountDownLatch sendResponseLatch = new CountDownLatch(1);
serviceC.registerRequestHandler("internal:action", TestRequest::new, ThreadPool.Names.SAME,
serviceC.registerRequestHandler("internal:action", ThreadPool.Names.SAME, TestRequest::new,
(request, channel, task) -> {
// don't block on a network thread here
threadPool.generic().execute(new AbstractRunnable() {
@ -2270,7 +2281,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
MockTransportService serviceC = buildService("TS_C", version0, Settings.EMPTY);
CountDownLatch receivedLatch = new CountDownLatch(1);
CountDownLatch sendResponseLatch = new CountDownLatch(1);
serviceB.registerRequestHandler("internal:action", TestRequest::new, ThreadPool.Names.SAME,
serviceB.registerRequestHandler("internal:action", ThreadPool.Names.SAME, TestRequest::new,
(request, channel, task) -> {
// don't block on a network thread here
threadPool.generic().execute(new AbstractRunnable() {
@ -2383,7 +2394,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
CountDownLatch sendResponseLatch = new CountDownLatch(1);
Exception ex = new RuntimeException("boom");
ex.setStackTrace(new StackTraceElement[0]);
serviceB.registerRequestHandler("internal:action", TestRequest::new, ThreadPool.Names.SAME,
serviceB.registerRequestHandler("internal:action", ThreadPool.Names.SAME, TestRequest::new,
(request, channel, task) -> {
// don't block on a network thread here
threadPool.generic().execute(new AbstractRunnable() {

View File

@ -256,7 +256,7 @@ public class DisruptableMockTransportTests extends ESTestCase {
}
private void registerRequestHandler(TransportService transportService, TransportRequestHandler<TransportRequest.Empty> handler) {
transportService.registerRequestHandler("internal:dummy", () -> TransportRequest.Empty.INSTANCE, ThreadPool.Names.GENERIC, handler);
transportService.registerRequestHandler("internal:dummy", ThreadPool.Names.GENERIC, TransportRequest.Empty::new, handler);
}
private void send(TransportService transportService, DiscoveryNode destinationNode,