diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/AbstractHlrcStreamableXContentTestCase.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/AbstractHlrcStreamableXContentTestCase.java deleted file mode 100644 index 1bf78c26a37..00000000000 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/AbstractHlrcStreamableXContentTestCase.java +++ /dev/null @@ -1,70 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.elasticsearch.client; - -import org.elasticsearch.common.io.stream.Streamable; -import org.elasticsearch.common.xcontent.ToXContent; -import org.elasticsearch.common.xcontent.XContentParser; -import org.elasticsearch.test.AbstractStreamableXContentTestCase; - -import java.io.IOException; - -import static org.elasticsearch.test.AbstractXContentTestCase.xContentTester; - -/** - * @deprecated Use {@link AbstractResponseTestCase} instead of this class. - */ -// TODO: Remove and change subclasses to use AbstractResponseTestCase instead -@Deprecated -public abstract class AbstractHlrcStreamableXContentTestCase - extends AbstractStreamableXContentTestCase { - - /** - * Generic test that creates new instance of HLRC request/response from the test instance and checks - * both for equality and asserts equality on the two queries. - */ - public final void testHlrcFromXContent() throws IOException { - xContentTester(this::createParser, this::createTestInstance, getToXContentParams(), - p -> convertHlrcToInternal(doHlrcParseInstance(p))) - .numberOfTestRuns(NUMBER_OF_TEST_RUNS) - .supportsUnknownFields(supportsUnknownFields()) - .shuffleFieldsExceptions(getShuffleFieldsExceptions()) - .randomFieldsExcludeFilter(getRandomFieldsExcludeFilter()) - .assertEqualsConsumer(this::assertEqualInstances) - .assertToXContentEquivalence(true) - .test(); - } - - /** - * Parses to a new HLRC instance using the provided {@link XContentParser} - */ - public abstract H doHlrcParseInstance(XContentParser parser) throws IOException; - - /** - * Converts a HLRC instance to a XPack instance - */ - public abstract T convertHlrcToInternal(H instance); - - //TODO this would be final ideally: why do both responses need to parse from xcontent, only one (H) should? I think that T#fromXContent - //are only there for testing and could go away? Then the additional testHlrcFromXContent is also no longer needed. - @Override - protected T doParseInstance(XContentParser parser) throws IOException { - return convertHlrcToInternal(doHlrcParseInstance(parser)); - } -} diff --git a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/RoundTripTests.java b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/RoundTripTests.java index 3bb5b8b2eec..901c4db006f 100644 --- a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/RoundTripTests.java +++ b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/RoundTripTests.java @@ -24,7 +24,7 @@ import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.common.io.stream.Streamable; +import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.lucene.uid.Versions; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.script.Script; @@ -194,11 +194,11 @@ public class RoundTripTests extends ESTestCase { assertEquals(request.getTaskId(), tripped.getTaskId()); } - private StreamInput toInputByteStream(Streamable example) throws IOException { + private StreamInput toInputByteStream(Writeable example) throws IOException { return toInputByteStream(Version.CURRENT, example); } - private StreamInput toInputByteStream(Version version, Streamable example) throws IOException { + private StreamInput toInputByteStream(Version version, Writeable example) throws IOException { BytesStreamOutput out = new BytesStreamOutput(); out.setVersion(version); example.writeTo(out); diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/SnapshotIndexShardStatus.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/SnapshotIndexShardStatus.java index 5159f334250..1e2004005c2 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/SnapshotIndexShardStatus.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/SnapshotIndexShardStatus.java @@ -147,7 +147,7 @@ public class SnapshotIndexShardStatus extends BroadcastShardResponse implements public void readFrom(StreamInput in) throws IOException { super.readFrom(in); stage = SnapshotIndexShardStage.fromValue(in.readByte()); - stats = SnapshotStats.readSnapshotStats(in); + stats = new SnapshotStats(in); nodeId = in.readOptionalString(); failure = in.readOptionalString(); } diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/SnapshotStats.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/SnapshotStats.java index 4e69efd9ab6..16410eefbf0 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/SnapshotStats.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/SnapshotStats.java @@ -22,7 +22,7 @@ package org.elasticsearch.action.admin.cluster.snapshots.status; import org.elasticsearch.Version; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.common.io.stream.Streamable; +import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.ToXContent; @@ -33,7 +33,7 @@ import org.elasticsearch.common.xcontent.XContentParserUtils; import java.io.IOException; -public class SnapshotStats implements Streamable, ToXContentObject { +public class SnapshotStats implements Writeable, ToXContentObject { private long startTime; private long time; @@ -44,7 +44,25 @@ public class SnapshotStats implements Streamable, ToXContentObject { private long totalSize; private long processedSize; - SnapshotStats() { + SnapshotStats() {} + + SnapshotStats(StreamInput in) throws IOException { + startTime = in.readVLong(); + time = in.readVLong(); + + incrementalFileCount = in.readVInt(); + processedFileCount = in.readVInt(); + + incrementalSize = in.readVLong(); + processedSize = in.readVLong(); + + if (in.getVersion().onOrAfter(Version.V_6_4_0)) { + totalFileCount = in.readVInt(); + totalSize = in.readVLong(); + } else { + totalFileCount = incrementalFileCount; + totalSize = incrementalSize; + } } SnapshotStats(long startTime, long time, @@ -116,13 +134,6 @@ public class SnapshotStats implements Streamable, ToXContentObject { return processedSize; } - - public static SnapshotStats readSnapshotStats(StreamInput in) throws IOException { - SnapshotStats stats = new SnapshotStats(); - stats.readFrom(in); - return stats; - } - @Override public void writeTo(StreamOutput out) throws IOException { out.writeVLong(startTime); @@ -140,26 +151,6 @@ public class SnapshotStats implements Streamable, ToXContentObject { } } - @Override - public void readFrom(StreamInput in) throws IOException { - startTime = in.readVLong(); - time = in.readVLong(); - - incrementalFileCount = in.readVInt(); - processedFileCount = in.readVInt(); - - incrementalSize = in.readVLong(); - processedSize = in.readVLong(); - - if (in.getVersion().onOrAfter(Version.V_6_4_0)) { - totalFileCount = in.readVInt(); - totalSize = in.readVLong(); - } else { - totalFileCount = incrementalFileCount; - totalSize = incrementalSize; - } - } - static final class Fields { static final String STATS = "stats"; diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/SnapshotStatus.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/SnapshotStatus.java index e93857571e3..c3147ea6649 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/SnapshotStatus.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/SnapshotStatus.java @@ -27,7 +27,7 @@ import org.elasticsearch.common.ParseField; import org.elasticsearch.common.Strings; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.common.io.stream.Streamable; +import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.xcontent.ConstructingObjectParser; import org.elasticsearch.common.xcontent.ObjectParser; import org.elasticsearch.common.xcontent.ToXContentObject; @@ -55,7 +55,7 @@ import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optiona /** * Status of a snapshot */ -public class SnapshotStatus implements ToXContentObject, Streamable { +public class SnapshotStatus implements ToXContentObject, Writeable { private Snapshot snapshot; @@ -72,6 +72,30 @@ public class SnapshotStatus implements ToXContentObject, Streamable { @Nullable private Boolean includeGlobalState; + SnapshotStatus(StreamInput in) throws IOException { + snapshot = new Snapshot(in); + state = State.fromValue(in.readByte()); + int size = in.readVInt(); + List builder = new ArrayList<>(); + for (int i = 0; i < size; i++) { + builder.add(SnapshotIndexShardStatus.readShardSnapshotStatus(in)); + } + shards = Collections.unmodifiableList(builder); + if (in.getVersion().onOrAfter(Version.V_6_2_0)) { + includeGlobalState = in.readOptionalBoolean(); + } + final long startTime; + final long time; + if (in.getVersion().onOrAfter(Version.V_7_4_0)) { + startTime = in.readLong(); + time = in.readLong(); + } else { + startTime = 0L; + time = 0L; + } + updateShardStats(startTime, time); + } + SnapshotStatus(Snapshot snapshot, State state, List shards, Boolean includeGlobalState, long startTime, long time) { this.snapshot = Objects.requireNonNull(snapshot); @@ -94,9 +118,6 @@ public class SnapshotStatus implements ToXContentObject, Streamable { this.includeGlobalState = includeGlobalState; } - SnapshotStatus() { - } - /** * Returns snapshot */ @@ -159,31 +180,6 @@ public class SnapshotStatus implements ToXContentObject, Streamable { } - @Override - public void readFrom(StreamInput in) throws IOException { - snapshot = new Snapshot(in); - state = State.fromValue(in.readByte()); - int size = in.readVInt(); - List builder = new ArrayList<>(); - for (int i = 0; i < size; i++) { - builder.add(SnapshotIndexShardStatus.readShardSnapshotStatus(in)); - } - shards = Collections.unmodifiableList(builder); - if (in.getVersion().onOrAfter(Version.V_6_2_0)) { - includeGlobalState = in.readOptionalBoolean(); - } - final long startTime; - final long time; - if (in.getVersion().onOrAfter(Version.V_7_4_0)) { - startTime = in.readLong(); - time = in.readLong(); - } else { - startTime = 0L; - time = 0L; - } - updateShardStats(startTime, time); - } - @Override public void writeTo(StreamOutput out) throws IOException { snapshot.writeTo(out); @@ -201,18 +197,6 @@ public class SnapshotStatus implements ToXContentObject, Streamable { } } - /** - * Reads snapshot status from stream input - * - * @param in stream input - * @return deserialized snapshot status - */ - public static SnapshotStatus readSnapshotStatus(StreamInput in) throws IOException { - SnapshotStatus snapshotInfo = new SnapshotStatus(); - snapshotInfo.readFrom(in); - return snapshotInfo; - } - @Override public String toString() { return Strings.toString(this, true, false); diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/SnapshotsStatusResponse.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/SnapshotsStatusResponse.java index 7d6912204ab..acf609b9bd1 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/SnapshotsStatusResponse.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/SnapshotsStatusResponse.java @@ -47,7 +47,7 @@ public class SnapshotsStatusResponse extends ActionResponse implements ToXConten int size = in.readVInt(); List builder = new ArrayList<>(); for (int i = 0; i < size; i++) { - builder.add(SnapshotStatus.readSnapshotStatus(in)); + builder.add(new SnapshotStatus(in)); } snapshots = Collections.unmodifiableList(builder); } diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsNodeResponse.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsNodeResponse.java index 22bb406e4f2..b32ed7e39d2 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsNodeResponse.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsNodeResponse.java @@ -49,7 +49,7 @@ public class ClusterStatsNodeResponse extends BaseNodeResponse { int size = in.readVInt(); shardsStats = new ShardStats[size]; for (int i = 0; i < size; i++) { - shardsStats[i] = ShardStats.readShardStats(in); + shardsStats[i] = new ShardStats(in); } } diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/alias/Alias.java b/server/src/main/java/org/elasticsearch/action/admin/indices/alias/Alias.java index 0f8439643b8..2a319dd8b9c 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/alias/Alias.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/alias/Alias.java @@ -27,9 +27,8 @@ import org.elasticsearch.common.Strings; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.common.io.stream.Streamable; +import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.xcontent.ToXContent; -import org.elasticsearch.common.xcontent.ToXContent.Params; import org.elasticsearch.common.xcontent.ToXContentFragment; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; @@ -44,7 +43,7 @@ import java.util.Map; /** * Represents an alias, to be associated with an index */ -public class Alias implements Streamable, ToXContentFragment { +public class Alias implements Writeable, ToXContentFragment { private static final ParseField FILTER = new ParseField("filter"); private static final ParseField ROUTING = new ParseField("routing"); @@ -66,8 +65,16 @@ public class Alias implements Streamable, ToXContentFragment { @Nullable private Boolean writeIndex; - private Alias() { - + public Alias(StreamInput in) throws IOException { + name = in.readString(); + filter = in.readOptionalString(); + indexRouting = in.readOptionalString(); + searchRouting = in.readOptionalString(); + if (in.getVersion().onOrAfter(Version.V_6_4_0)) { + writeIndex = in.readOptionalBoolean(); + } else { + writeIndex = null; + } } public Alias(String name) { @@ -187,28 +194,6 @@ public class Alias implements Streamable, ToXContentFragment { return this; } - /** - * Allows to read an alias from the provided input stream - */ - public static Alias read(StreamInput in) throws IOException { - Alias alias = new Alias(); - alias.readFrom(in); - return alias; - } - - @Override - public void readFrom(StreamInput in) throws IOException { - name = in.readString(); - filter = in.readOptionalString(); - indexRouting = in.readOptionalString(); - searchRouting = in.readOptionalString(); - if (in.getVersion().onOrAfter(Version.V_6_4_0)) { - writeIndex = in.readOptionalBoolean(); - } else { - writeIndex = null; - } - } - @Override public void writeTo(StreamOutput out) throws IOException { out.writeString(name); diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/create/CreateIndexRequest.java b/server/src/main/java/org/elasticsearch/action/admin/indices/create/CreateIndexRequest.java index 0a232b2f0c0..5745e50df2b 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/create/CreateIndexRequest.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/create/CreateIndexRequest.java @@ -113,7 +113,7 @@ public class CreateIndexRequest extends AcknowledgedRequest } int aliasesSize = in.readVInt(); for (int i = 0; i < aliasesSize; i++) { - aliases.add(Alias.read(in)); + aliases.add(new Alias(in)); } if (in.getVersion().before(Version.V_7_0_0)) { in.readBoolean(); // updateAllTypes diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/shards/IndicesShardStoresResponse.java b/server/src/main/java/org/elasticsearch/action/admin/indices/shards/IndicesShardStoresResponse.java index ba50e26f222..ba598803911 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/shards/IndicesShardStoresResponse.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/shards/IndicesShardStoresResponse.java @@ -21,7 +21,6 @@ package org.elasticsearch.action.admin.indices.shards; import com.carrotsearch.hppc.cursors.IntObjectCursor; import com.carrotsearch.hppc.cursors.ObjectObjectCursor; - import org.elasticsearch.ElasticsearchException; import org.elasticsearch.Version; import org.elasticsearch.action.ActionResponse; @@ -31,7 +30,7 @@ import org.elasticsearch.common.collect.ImmutableOpenIntMap; import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.common.io.stream.Streamable; +import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.xcontent.ToXContentFragment; import org.elasticsearch.common.xcontent.XContentBuilder; @@ -40,8 +39,6 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; -import static org.elasticsearch.action.admin.indices.shards.IndicesShardStoresResponse.StoreStatus.readStoreStatus; - /** * Response for {@link IndicesShardStoresAction} * @@ -53,7 +50,7 @@ public class IndicesShardStoresResponse extends ActionResponse implements ToXCon /** * Shard store information from a node */ - public static class StoreStatus implements Streamable, ToXContentFragment, Comparable { + public static class StoreStatus implements Writeable, ToXContentFragment, Comparable { private DiscoveryNode node; private String allocationId; private Exception storeException; @@ -112,7 +109,17 @@ public class IndicesShardStoresResponse extends ActionResponse implements ToXCon } } - private StoreStatus() { + public StoreStatus(StreamInput in) throws IOException { + node = new DiscoveryNode(in); + if (in.getVersion().before(Version.V_6_0_0_alpha1)) { + // legacy version + in.readLong(); + } + allocationId = in.readOptionalString(); + allocationStatus = AllocationStatus.readFrom(in); + if (in.readBoolean()) { + storeException = in.readException(); + } } public StoreStatus(DiscoveryNode node, String allocationId, AllocationStatus allocationStatus, Exception storeException) { @@ -155,26 +162,6 @@ public class IndicesShardStoresResponse extends ActionResponse implements ToXCon return allocationStatus; } - public static StoreStatus readStoreStatus(StreamInput in) throws IOException { - StoreStatus storeStatus = new StoreStatus(); - storeStatus.readFrom(in); - return storeStatus; - } - - @Override - public void readFrom(StreamInput in) throws IOException { - node = new DiscoveryNode(in); - if (in.getVersion().before(Version.V_6_0_0_alpha1)) { - // legacy version - in.readLong(); - } - allocationId = in.readOptionalString(); - allocationStatus = AllocationStatus.readFrom(in); - if (in.readBoolean()) { - storeException = in.readException(); - } - } - @Override public void writeTo(StreamOutput out) throws IOException { node.writeTo(out); @@ -303,7 +290,7 @@ public class IndicesShardStoresResponse extends ActionResponse implements ToXCon int nodeEntries = in.readVInt(); List storeStatuses = new ArrayList<>(nodeEntries); for (int nodeCount = 0; nodeCount < nodeEntries; nodeCount++) { - storeStatuses.add(readStoreStatus(in)); + storeStatuses.add(new StoreStatus(in)); } shardEntries.put(shardID, storeStatuses); } diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/stats/IndexShardStats.java b/server/src/main/java/org/elasticsearch/action/admin/indices/stats/IndexShardStats.java index 6c1de5b2992..e6a3b43093c 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/stats/IndexShardStats.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/stats/IndexShardStats.java @@ -21,20 +21,27 @@ package org.elasticsearch.action.admin.indices.stats; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.common.io.stream.Streamable; +import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.index.shard.ShardId; import java.io.IOException; import java.util.Arrays; import java.util.Iterator; -public class IndexShardStats implements Iterable, Streamable { +public class IndexShardStats implements Iterable, Writeable { private ShardId shardId; private ShardStats[] shards; - private IndexShardStats() {} + public IndexShardStats(StreamInput in) throws IOException { + shardId = new ShardId(in); + int shardSize = in.readVInt(); + shards = new ShardStats[shardSize]; + for (int i = 0; i < shardSize; i++) { + shards[i] = new ShardStats(in); + } + } public IndexShardStats(ShardId shardId, ShardStats[] shards) { this.shardId = shardId; @@ -88,16 +95,6 @@ public class IndexShardStats implements Iterable, Streamable { return stats; } - @Override - public void readFrom(StreamInput in) throws IOException { - shardId = new ShardId(in); - int shardSize = in.readVInt(); - shards = new ShardStats[shardSize]; - for (int i = 0; i < shardSize; i++) { - shards[i] = ShardStats.readShardStats(in); - } - } - @Override public void writeTo(StreamOutput out) throws IOException { shardId.writeTo(out); @@ -106,11 +103,4 @@ public class IndexShardStats implements Iterable, Streamable { stats.writeTo(out); } } - - public static IndexShardStats readIndexShardStats(StreamInput in) throws IOException { - IndexShardStats indexShardStats = new IndexShardStats(); - indexShardStats.readFrom(in); - return indexShardStats; - } - } diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/stats/IndicesStatsResponse.java b/server/src/main/java/org/elasticsearch/action/admin/indices/stats/IndicesStatsResponse.java index 91a7ec5d31c..1dd093b8485 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/stats/IndicesStatsResponse.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/stats/IndicesStatsResponse.java @@ -45,7 +45,7 @@ public class IndicesStatsResponse extends BroadcastResponse { IndicesStatsResponse(StreamInput in) throws IOException { super(in); - shards = in.readArray(ShardStats::readShardStats, (size) -> new ShardStats[size]); + shards = in.readArray(ShardStats::new, (size) -> new ShardStats[size]); } IndicesStatsResponse(ShardStats[] shards, int totalShards, int successfulShards, int failedShards, diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/stats/ShardStats.java b/server/src/main/java/org/elasticsearch/action/admin/indices/stats/ShardStats.java index 1fefe3199ee..5054a626e03 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/stats/ShardStats.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/stats/ShardStats.java @@ -24,7 +24,6 @@ import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.common.io.stream.Streamable; import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.xcontent.ToXContentFragment; import org.elasticsearch.common.xcontent.XContentBuilder; @@ -35,7 +34,7 @@ import org.elasticsearch.index.shard.ShardPath; import java.io.IOException; -public class ShardStats implements Streamable, Writeable, ToXContentFragment { +public class ShardStats implements Writeable, ToXContentFragment { private ShardRouting shardRouting; private CommonStats commonStats; @@ -60,7 +59,19 @@ public class ShardStats implements Streamable, Writeable, ToXContentFragment { private String statePath; private boolean isCustomDataPath; - ShardStats() { + public ShardStats(StreamInput in) throws IOException { + shardRouting = new ShardRouting(in); + commonStats = new CommonStats(in); + commitStats = CommitStats.readOptionalCommitStatsFrom(in); + statePath = in.readString(); + dataPath = in.readString(); + isCustomDataPath = in.readBoolean(); + if (in.getVersion().onOrAfter(Version.V_6_0_0_alpha1)) { + seqNoStats = in.readOptionalWriteable(SeqNoStats::new); + } + if (in.getVersion().onOrAfter(Version.V_6_7_0)) { + retentionLeaseStats = in.readOptionalWriteable(RetentionLeaseStats::new); + } } public ShardStats( @@ -113,28 +124,6 @@ public class ShardStats implements Streamable, Writeable, ToXContentFragment { return isCustomDataPath; } - public static ShardStats readShardStats(StreamInput in) throws IOException { - ShardStats stats = new ShardStats(); - stats.readFrom(in); - return stats; - } - - @Override - public void readFrom(StreamInput in) throws IOException { - shardRouting = new ShardRouting(in); - commonStats = new CommonStats(in); - commitStats = CommitStats.readOptionalCommitStatsFrom(in); - statePath = in.readString(); - dataPath = in.readString(); - isCustomDataPath = in.readBoolean(); - if (in.getVersion().onOrAfter(Version.V_6_0_0_alpha1)) { - seqNoStats = in.readOptionalWriteable(SeqNoStats::new); - } - if (in.getVersion().onOrAfter(Version.V_6_7_0)) { - retentionLeaseStats = in.readOptionalWriteable(RetentionLeaseStats::new); - } - } - @Override public void writeTo(StreamOutput out) throws IOException { shardRouting.writeTo(out); diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/stats/TransportIndicesStatsAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/stats/TransportIndicesStatsAction.java index 80c7377087e..400358f59af 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/stats/TransportIndicesStatsAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/stats/TransportIndicesStatsAction.java @@ -77,7 +77,7 @@ public class TransportIndicesStatsAction extends TransportBroadcastByNodeAction< @Override protected ShardStats readShardResult(StreamInput in) throws IOException { - return ShardStats.readShardStats(in); + return new ShardStats(in); } @Override diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/template/put/PutIndexTemplateRequest.java b/server/src/main/java/org/elasticsearch/action/admin/indices/template/put/PutIndexTemplateRequest.java index 9c3c0b0b2ac..36b48fabf6c 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/template/put/PutIndexTemplateRequest.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/template/put/PutIndexTemplateRequest.java @@ -118,7 +118,7 @@ public class PutIndexTemplateRequest extends MasterNodeRequest 0) { queryExplanations = new ArrayList<>(size); for (int i = 0; i < size; i++) { - queryExplanations.add(readQueryExplanation(in)); + queryExplanations.add(new QueryExplanation(in)); } } } diff --git a/server/src/main/java/org/elasticsearch/action/support/WriteRequest.java b/server/src/main/java/org/elasticsearch/action/support/WriteRequest.java index 50edcd39bd1..a163f0f63d0 100644 --- a/server/src/main/java/org/elasticsearch/action/support/WriteRequest.java +++ b/server/src/main/java/org/elasticsearch/action/support/WriteRequest.java @@ -26,7 +26,6 @@ import org.elasticsearch.action.support.replication.ReplicatedWriteRequest; import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.common.io.stream.Streamable; import org.elasticsearch.common.io.stream.Writeable; import java.io.IOException; @@ -35,7 +34,7 @@ import java.io.IOException; * Interface implemented by requests that modify the documents in an index like {@link IndexRequest}, {@link UpdateRequest}, and * {@link BulkRequest}. Rather than implement this directly most implementers should extend {@link ReplicatedWriteRequest}. */ -public interface WriteRequest> extends Streamable { +public interface WriteRequest> extends Writeable { /** * Should this request trigger a refresh ({@linkplain RefreshPolicy#IMMEDIATE}), wait for a refresh ( * {@linkplain RefreshPolicy#WAIT_UNTIL}), or proceed ignore refreshes entirely ({@linkplain RefreshPolicy#NONE}, the default). diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/ReplicationResponse.java b/server/src/main/java/org/elasticsearch/action/support/replication/ReplicationResponse.java index 77c23844b52..f9c1b11d1d4 100644 --- a/server/src/main/java/org/elasticsearch/action/support/replication/ReplicationResponse.java +++ b/server/src/main/java/org/elasticsearch/action/support/replication/ReplicationResponse.java @@ -27,7 +27,7 @@ import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.common.io.stream.Streamable; +import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParser; @@ -54,7 +54,7 @@ public class ReplicationResponse extends ActionResponse { public ReplicationResponse(StreamInput in) throws IOException { super(in); - shardInfo = ReplicationResponse.ShardInfo.readShardInfo(in); + shardInfo = new ReplicationResponse.ShardInfo(in); } @Override @@ -70,7 +70,7 @@ public class ReplicationResponse extends ActionResponse { this.shardInfo = shardInfo; } - public static class ShardInfo implements Streamable, ToXContentObject { + public static class ShardInfo implements Writeable, ToXContentObject { private static final String TOTAL = "total"; private static final String SUCCESSFUL = "successful"; @@ -81,7 +81,16 @@ public class ReplicationResponse extends ActionResponse { private int successful; private Failure[] failures = EMPTY; - public ShardInfo() { + public ShardInfo() {} + + public ShardInfo(StreamInput in) throws IOException { + total = in.readVInt(); + successful = in.readVInt(); + int size = in.readVInt(); + failures = new Failure[size]; + for (int i = 0; i < size; i++) { + failures[i] = new Failure(in); + } } public ShardInfo(int total, int successful, Failure... failures) { @@ -131,17 +140,6 @@ public class ReplicationResponse extends ActionResponse { return status; } - @Override - public void readFrom(StreamInput in) throws IOException { - total = in.readVInt(); - successful = in.readVInt(); - int size = in.readVInt(); - failures = new Failure[size]; - for (int i = 0; i < size; i++) { - failures[i] = new Failure(in); - } - } - @Override public void writeTo(StreamOutput out) throws IOException { out.writeVInt(total); @@ -216,12 +214,6 @@ public class ReplicationResponse extends ActionResponse { '}'; } - static ShardInfo readShardInfo(StreamInput in) throws IOException { - ShardInfo shardInfo = new ShardInfo(); - shardInfo.readFrom(in); - return shardInfo; - } - public static class Failure extends ShardOperationFailedException implements ToXContentObject { private static final String _INDEX = "_index"; diff --git a/server/src/main/java/org/elasticsearch/action/update/UpdateHelper.java b/server/src/main/java/org/elasticsearch/action/update/UpdateHelper.java index c6e45af0e6a..126e8d2eb6e 100644 --- a/server/src/main/java/org/elasticsearch/action/update/UpdateHelper.java +++ b/server/src/main/java/org/elasticsearch/action/update/UpdateHelper.java @@ -31,6 +31,7 @@ import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.Streamable; +import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.common.xcontent.XContentType; @@ -325,7 +326,7 @@ public class UpdateHelper { } @SuppressWarnings("unchecked") - public T action() { + public T action() { return (T) action; } diff --git a/server/src/main/java/org/elasticsearch/indices/NodeIndicesStats.java b/server/src/main/java/org/elasticsearch/indices/NodeIndicesStats.java index fa52c88c02b..eb5a8df7e20 100644 --- a/server/src/main/java/org/elasticsearch/indices/NodeIndicesStats.java +++ b/server/src/main/java/org/elasticsearch/indices/NodeIndicesStats.java @@ -175,7 +175,7 @@ public class NodeIndicesStats implements Streamable, ToXContentFragment { int indexShardListSize = in.readVInt(); List indexShardStats = new ArrayList<>(indexShardListSize); for (int j = 0; j < indexShardListSize; j++) { - indexShardStats.add(IndexShardStats.readIndexShardStats(in)); + indexShardStats.add(new IndexShardStats(in)); } statsByShard.put(index, indexShardStats); } diff --git a/server/src/main/java/org/elasticsearch/transport/TransportRequest.java b/server/src/main/java/org/elasticsearch/transport/TransportRequest.java index 866a34302c5..f7fdac54a67 100644 --- a/server/src/main/java/org/elasticsearch/transport/TransportRequest.java +++ b/server/src/main/java/org/elasticsearch/transport/TransportRequest.java @@ -29,6 +29,12 @@ import java.io.IOException; public abstract class TransportRequest extends TransportMessage implements TaskAwareRequest { public static class Empty extends TransportRequest { public static final Empty INSTANCE = new Empty(); + + public Empty() {} + + public Empty(StreamInput in) throws IOException { + super(in); + } } /** diff --git a/server/src/test/java/org/elasticsearch/action/admin/indices/validate/query/QueryExplanationTests.java b/server/src/test/java/org/elasticsearch/action/admin/indices/validate/query/QueryExplanationTests.java index db167e0c766..45f0711319a 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/indices/validate/query/QueryExplanationTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/indices/validate/query/QueryExplanationTests.java @@ -19,12 +19,13 @@ package org.elasticsearch.action.admin.indices.validate.query; +import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.xcontent.XContentParser; -import org.elasticsearch.test.AbstractStreamableXContentTestCase; +import org.elasticsearch.test.AbstractSerializingTestCase; import java.io.IOException; -public class QueryExplanationTests extends AbstractStreamableXContentTestCase { +public class QueryExplanationTests extends AbstractSerializingTestCase { static QueryExplanation createRandomQueryExplanation(boolean isValid) { String index = "index_" + randomInt(1000); @@ -47,13 +48,13 @@ public class QueryExplanationTests extends AbstractStreamableXContentTestCase instanceReader() { + return QueryExplanation::new; + } } diff --git a/server/src/test/java/org/elasticsearch/action/update/UpdateRequestTests.java b/server/src/test/java/org/elasticsearch/action/update/UpdateRequestTests.java index 6549c3a8df5..8d1e785c4f4 100644 --- a/server/src/test/java/org/elasticsearch/action/update/UpdateRequestTests.java +++ b/server/src/test/java/org/elasticsearch/action/update/UpdateRequestTests.java @@ -27,7 +27,7 @@ import org.elasticsearch.action.support.replication.ReplicationRequest; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.document.DocumentField; -import org.elasticsearch.common.io.stream.Streamable; +import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; @@ -361,7 +361,7 @@ public class UpdateRequestTests extends ESTestCase { // We simulate that the document is not existing yet GetResult getResult = new GetResult("test", "type1", "2", UNASSIGNED_SEQ_NO, 0, 0, false, null, null, null); UpdateHelper.Result result = updateHelper.prepare(new ShardId("test", "_na_", 0), updateRequest, getResult, () -> nowInMillis); - Streamable action = result.action(); + Writeable action = result.action(); assertThat(action, instanceOf(IndexRequest.class)); IndexRequest indexAction = (IndexRequest) action; assertEquals(nowInMillis, indexAction.sourceAsMap().get("update_timestamp")); @@ -374,7 +374,7 @@ public class UpdateRequestTests extends ESTestCase { // We simulate that the document is not existing yet GetResult getResult = new GetResult("test", "type1", "2", 0, 1, 0, true, new BytesArray("{}"), null, null); UpdateHelper.Result result = updateHelper.prepare(new ShardId("test", "_na_", 0), updateRequest, getResult, () -> 42L); - Streamable action = result.action(); + Writeable action = result.action(); assertThat(action, instanceOf(IndexRequest.class)); } } @@ -424,7 +424,7 @@ public class UpdateRequestTests extends ESTestCase { updateRequest, getResult, ESTestCase::randomNonNegativeLong); - final Streamable action = result.action(); + final Writeable action = result.action(); assertThat(action, instanceOf(ReplicationRequest.class)); final ReplicationRequest request = (ReplicationRequest) action; assertThat(request.timeout(), equalTo(updateRequest.timeout())); diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index f220311cddb..bc015ddf795 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -1457,7 +1457,7 @@ public class IndexShardTests extends IndexShardTestCase { BytesStreamOutput out = new BytesStreamOutput(); stats.writeTo(out); StreamInput in = out.bytes().streamInput(); - stats = ShardStats.readShardStats(in); + stats = new ShardStats(in); XContentBuilder builder = jsonBuilder(); builder.startObject(); diff --git a/server/src/test/java/org/elasticsearch/transport/TransportActionProxyTests.java b/server/src/test/java/org/elasticsearch/transport/TransportActionProxyTests.java index e8927b57616..d097f1fd9dd 100644 --- a/server/src/test/java/org/elasticsearch/transport/TransportActionProxyTests.java +++ b/server/src/test/java/org/elasticsearch/transport/TransportActionProxyTests.java @@ -83,7 +83,7 @@ public class TransportActionProxyTests extends ESTestCase { public void testSendMessage() throws InterruptedException { - serviceA.registerRequestHandler("internal:test", SimpleTestRequest::new, ThreadPool.Names.SAME, + serviceA.registerRequestHandler("internal:test", ThreadPool.Names.SAME, SimpleTestRequest::new, (request, channel, task) -> { assertEquals(request.sourceNode, "TS_A"); SimpleTestResponse response = new SimpleTestResponse("TS_A"); @@ -92,7 +92,7 @@ public class TransportActionProxyTests extends ESTestCase { TransportActionProxy.registerProxyAction(serviceA, "internal:test", SimpleTestResponse::new); serviceA.connectToNode(nodeB); - serviceB.registerRequestHandler("internal:test", SimpleTestRequest::new, ThreadPool.Names.SAME, + serviceB.registerRequestHandler("internal:test", ThreadPool.Names.SAME, SimpleTestRequest::new, (request, channel, task) -> { assertEquals(request.sourceNode, "TS_A"); SimpleTestResponse response = new SimpleTestResponse("TS_B"); @@ -100,7 +100,7 @@ public class TransportActionProxyTests extends ESTestCase { }); TransportActionProxy.registerProxyAction(serviceB, "internal:test", SimpleTestResponse::new); serviceB.connectToNode(nodeC); - serviceC.registerRequestHandler("internal:test", SimpleTestRequest::new, ThreadPool.Names.SAME, + serviceC.registerRequestHandler("internal:test", ThreadPool.Names.SAME, SimpleTestRequest::new, (request, channel, task) -> { assertEquals(request.sourceNode, "TS_A"); SimpleTestResponse response = new SimpleTestResponse("TS_C"); @@ -143,7 +143,7 @@ public class TransportActionProxyTests extends ESTestCase { } public void testException() throws InterruptedException { - serviceA.registerRequestHandler("internal:test", SimpleTestRequest::new, ThreadPool.Names.SAME, + serviceA.registerRequestHandler("internal:test", ThreadPool.Names.SAME, SimpleTestRequest::new, (request, channel, task) -> { assertEquals(request.sourceNode, "TS_A"); SimpleTestResponse response = new SimpleTestResponse("TS_A"); @@ -152,7 +152,7 @@ public class TransportActionProxyTests extends ESTestCase { TransportActionProxy.registerProxyAction(serviceA, "internal:test", SimpleTestResponse::new); serviceA.connectToNode(nodeB); - serviceB.registerRequestHandler("internal:test", SimpleTestRequest::new, ThreadPool.Names.SAME, + serviceB.registerRequestHandler("internal:test", ThreadPool.Names.SAME, SimpleTestRequest::new, (request, channel, task) -> { assertEquals(request.sourceNode, "TS_A"); SimpleTestResponse response = new SimpleTestResponse("TS_B"); @@ -160,7 +160,7 @@ public class TransportActionProxyTests extends ESTestCase { }); TransportActionProxy.registerProxyAction(serviceB, "internal:test", SimpleTestResponse::new); serviceB.connectToNode(nodeC); - serviceC.registerRequestHandler("internal:test", SimpleTestRequest::new, ThreadPool.Names.SAME, + serviceC.registerRequestHandler("internal:test", ThreadPool.Names.SAME, SimpleTestRequest::new, (request, channel, task) -> { throw new ElasticsearchException("greetings from TS_C"); }); @@ -209,10 +209,14 @@ public class TransportActionProxyTests extends ESTestCase { } public SimpleTestRequest() {} + public SimpleTestRequest(StreamInput in) throws IOException { + super(in); + sourceNode = in.readString(); + } + @Override public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); - sourceNode = in.readString(); + throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable"); } @Override diff --git a/test/framework/src/main/java/org/elasticsearch/test/AbstractStreamableTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/AbstractStreamableTestCase.java deleted file mode 100644 index 15ce07e9775..00000000000 --- a/test/framework/src/main/java/org/elasticsearch/test/AbstractStreamableTestCase.java +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.elasticsearch.test; - -import org.elasticsearch.Version; -import org.elasticsearch.common.io.stream.Streamable; -import org.elasticsearch.common.io.stream.Writeable; - -import java.io.IOException; - -public abstract class AbstractStreamableTestCase extends AbstractWireTestCase { - - @Override - protected final T copyInstance(T instance, Version version) throws IOException { - return copyStreamable(instance, getNamedWriteableRegistry(), this::createBlankInstance, version); - } - - @Override - protected final Writeable.Reader instanceReader() { - return Streamable.newWriteableReader(this::createBlankInstance); - } - - /** - * Creates an empty instance to use when deserialising the - * {@link Streamable}. This usually returns an instance created using the - * zer-arg constructor - */ - protected abstract T createBlankInstance(); -} diff --git a/test/framework/src/main/java/org/elasticsearch/test/AbstractStreamableXContentTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/AbstractStreamableXContentTestCase.java deleted file mode 100644 index 90dd0256eda..00000000000 --- a/test/framework/src/main/java/org/elasticsearch/test/AbstractStreamableXContentTestCase.java +++ /dev/null @@ -1,102 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.elasticsearch.test; - -import org.elasticsearch.common.Strings; -import org.elasticsearch.common.io.stream.Streamable; -import org.elasticsearch.common.xcontent.ToXContent; -import org.elasticsearch.common.xcontent.XContentBuilder; -import org.elasticsearch.common.xcontent.XContentParser; -import org.elasticsearch.common.xcontent.XContentType; - -import java.io.IOException; -import java.util.function.Predicate; - -import static org.elasticsearch.test.AbstractXContentTestCase.xContentTester; - -public abstract class AbstractStreamableXContentTestCase extends AbstractStreamableTestCase { - - /** - * Generic test that creates new instance from the test instance and checks - * both for equality and asserts equality on the two queries. - */ - public final void testFromXContent() throws IOException { - xContentTester(this::createParser, this::createXContextTestInstance, getToXContentParams(), this::doParseInstance) - .numberOfTestRuns(NUMBER_OF_TEST_RUNS) - .supportsUnknownFields(supportsUnknownFields()) - .shuffleFieldsExceptions(getShuffleFieldsExceptions()) - .randomFieldsExcludeFilter(getRandomFieldsExcludeFilter()) - .assertEqualsConsumer(this::assertEqualInstances) - .assertToXContentEquivalence(assertToXContentEquivalence()) - .test(); - } - - /** - * Creates a random instance to use in the xcontent tests. - * Override this method if the random instance that you build - * should be aware of the {@link XContentType} used in the test. - */ - protected T createXContextTestInstance(XContentType xContentType) { - return createTestInstance(); - } - - /** - * Parses to a new instance using the provided {@link XContentParser} - */ - protected abstract T doParseInstance(XContentParser parser) throws IOException; - - /** - * Indicates whether the parser supports unknown fields or not. In case it does, such behaviour will be tested by - * inserting random fields before parsing and checking that they don't make parsing fail. - */ - protected boolean supportsUnknownFields() { - return true; - } - - /** - * Returns a predicate that given the field name indicates whether the field has to be excluded from random fields insertion or not - */ - protected Predicate getRandomFieldsExcludeFilter() { - return field -> false; - } - - /** - * Fields that have to be ignored when shuffling as part of testFromXContent - */ - protected String[] getShuffleFieldsExceptions() { - return Strings.EMPTY_ARRAY; - } - - /** - * Whether or not to assert equivalence of the {@link org.elasticsearch.common.xcontent.XContent} of the test instance and the instance - * parsed from the {@link org.elasticsearch.common.xcontent.XContent} of the test instance. - * - * @return true if equivalence should be asserted, otherwise false - */ - protected boolean assertToXContentEquivalence() { - return true; - } - - /** - * Params that have to be provided when calling calling {@link ToXContent#toXContent(XContentBuilder, ToXContent.Params)} - */ - protected ToXContent.Params getToXContentParams() { - return ToXContent.EMPTY_PARAMS; - } -} diff --git a/test/framework/src/main/java/org/elasticsearch/test/ESTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/ESTestCase.java index 0f7310c4452..2aade980ca4 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/ESTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/ESTestCase.java @@ -64,7 +64,6 @@ import org.elasticsearch.common.io.stream.NamedWriteable; import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.common.io.stream.Streamable; import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.joda.JodaDeprecationPatterns; import org.elasticsearch.common.logging.DeprecationLogger; @@ -1161,18 +1160,6 @@ public abstract class ESTestCase extends LuceneTestCase { return copyInstance(original, namedWriteableRegistry, (out, value) -> value.writeTo(out), reader, version); } - /** - * Create a copy of an original {@link Streamable} object by running it through a {@link BytesStreamOutput} and - * reading it in again using a provided {@link Writeable.Reader}. The stream that is wrapped around the {@link StreamInput} - * potentially need to use a {@link NamedWriteableRegistry}, so this needs to be provided too (although it can be - * empty if the object that is streamed doesn't contain any {@link NamedWriteable} objects itself. - */ - public static T copyStreamable(T original, NamedWriteableRegistry namedWriteableRegistry, - Supplier supplier, Version version) throws IOException { - return copyInstance(original, namedWriteableRegistry, (out, value) -> value.writeTo(out), - Streamable.newWriteableReader(supplier), version); - } - protected static T copyInstance(T original, NamedWriteableRegistry namedWriteableRegistry, Writeable.Writer writer, Writeable.Reader reader, Version version) throws IOException { try (BytesStreamOutput output = new BytesStreamOutput()) { diff --git a/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java b/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java index 47abfd5fa2b..73ba545343c 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java @@ -228,7 +228,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { public void testHelloWorld() { - serviceA.registerRequestHandler("internal:sayHello", StringMessageRequest::new, ThreadPool.Names.GENERIC, + serviceA.registerRequestHandler("internal:sayHello", ThreadPool.Names.GENERIC, StringMessageRequest::new, (request, channel, task) -> { assertThat("moshe", equalTo(request.message)); try { @@ -304,7 +304,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { public void testThreadContext() throws ExecutionException, InterruptedException { - serviceA.registerRequestHandler("internal:ping_pong", StringMessageRequest::new, ThreadPool.Names.GENERIC, + serviceA.registerRequestHandler("internal:ping_pong", ThreadPool.Names.GENERIC, StringMessageRequest::new, (request, channel, task) -> { assertEquals("ping_user", threadPool.getThreadContext().getHeader("test.ping.user")); assertNull(threadPool.getThreadContext().getTransient("my_private_context")); @@ -363,7 +363,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { // this should be a noop serviceA.disconnectFromNode(nodeA); final AtomicReference exception = new AtomicReference<>(); - serviceA.registerRequestHandler("internal:localNode", StringMessageRequest::new, ThreadPool.Names.GENERIC, + serviceA.registerRequestHandler("internal:localNode", ThreadPool.Names.GENERIC, StringMessageRequest::new, (request, channel, task) -> { try { channel.sendResponse(new StringMessageResponse(request.message)); @@ -416,9 +416,9 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { } }; final String ACTION = "internal:action"; - serviceA.registerRequestHandler(ACTION, TransportRequest.Empty::new, ThreadPool.Names.GENERIC, + serviceA.registerRequestHandler(ACTION, ThreadPool.Names.GENERIC, TransportRequest.Empty::new, requestHandler); - serviceB.registerRequestHandler(ACTION, TransportRequest.Empty::new, ThreadPool.Names.GENERIC, + serviceB.registerRequestHandler(ACTION, ThreadPool.Names.GENERIC, TransportRequest.Empty::new, requestHandler); class CountingListener implements TransportMessageListener { @@ -533,7 +533,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { serviceC.start(); serviceC.acceptIncomingRequests(); - serviceA.registerRequestHandler("internal:sayHello", TransportRequest.Empty::new, ThreadPool.Names.GENERIC, + serviceA.registerRequestHandler("internal:sayHello", ThreadPool.Names.GENERIC, TransportRequest.Empty::new, (request, channel, task) -> { try { channel.sendResponse(TransportResponse.Empty.INSTANCE); @@ -586,7 +586,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { serviceC.start(); serviceC.acceptIncomingRequests(); - serviceA.registerRequestHandler("internal:sayHello", StringMessageRequest::new, ThreadPool.Names.GENERIC, + serviceA.registerRequestHandler("internal:sayHello", ThreadPool.Names.GENERIC, StringMessageRequest::new, (request, channel, task) -> { assertThat("moshe", equalTo(request.message)); try { @@ -636,7 +636,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { } public void testErrorMessage() { - serviceA.registerRequestHandler("internal:sayHelloException", StringMessageRequest::new, ThreadPool.Names.GENERIC, + serviceA.registerRequestHandler("internal:sayHelloException", ThreadPool.Names.GENERIC, StringMessageRequest::new, (request, channel, task) -> { assertThat("moshe", equalTo(request.message)); throw new RuntimeException("bad message !!!"); @@ -694,8 +694,8 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { public void testConcurrentSendRespondAndDisconnect() throws BrokenBarrierException, InterruptedException { Set sendingErrors = ConcurrentCollections.newConcurrentSet(); Set responseErrors = ConcurrentCollections.newConcurrentSet(); - serviceA.registerRequestHandler("internal:test", TestRequest::new, - randomBoolean() ? ThreadPool.Names.SAME : ThreadPool.Names.GENERIC, (request, channel, task) -> { + serviceA.registerRequestHandler("internal:test", randomBoolean() ? ThreadPool.Names.SAME : ThreadPool.Names.GENERIC, + TestRequest::new, (request, channel, task) -> { try { channel.sendResponse(new TestResponse((String) null)); } catch (Exception e) { @@ -711,7 +711,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { logger.trace("caught exception while responding from node B", e); } }; - serviceB.registerRequestHandler("internal:test", TestRequest::new, ThreadPool.Names.SAME, ignoringRequestHandler); + serviceB.registerRequestHandler("internal:test", ThreadPool.Names.SAME, TestRequest::new, ignoringRequestHandler); int halfSenders = scaledRandomIntBetween(3, 10); final CyclicBarrier go = new CyclicBarrier(halfSenders * 2 + 1); @@ -797,7 +797,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { // simulate restart of nodeB serviceB.close(); MockTransportService newService = buildService("TS_B_" + i, version1, Settings.EMPTY); - newService.registerRequestHandler("internal:test", TestRequest::new, ThreadPool.Names.SAME, ignoringRequestHandler); + newService.registerRequestHandler("internal:test", ThreadPool.Names.SAME, TestRequest::new, ignoringRequestHandler); serviceB = newService; nodeB = newService.getLocalDiscoNode(); serviceB.connectToNode(nodeA); @@ -819,7 +819,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { final CountDownLatch latch2 = new CountDownLatch(1); final CountDownLatch latch3 = new CountDownLatch(1); try { - serviceA.registerRequestHandler("internal:foobar", StringMessageRequest::new, ThreadPool.Names.GENERIC, + serviceA.registerRequestHandler("internal:foobar", ThreadPool.Names.GENERIC, StringMessageRequest::new, (request, channel, task) -> { try { latch2.await(); @@ -848,7 +848,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { } public void testTimeoutSendExceptionWithNeverSendingBackResponse() throws Exception { - serviceA.registerRequestHandler("internal:sayHelloTimeoutNoResponse", StringMessageRequest::new, ThreadPool.Names.GENERIC, + serviceA.registerRequestHandler("internal:sayHelloTimeoutNoResponse", ThreadPool.Names.GENERIC, StringMessageRequest::new, new TransportRequestHandler() { @Override public void messageReceived(StringMessageRequest request, TransportChannel channel, Task task) { @@ -893,7 +893,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { CountDownLatch waitForever = new CountDownLatch(1); CountDownLatch doneWaitingForever = new CountDownLatch(1); Semaphore inFlight = new Semaphore(Integer.MAX_VALUE); - serviceA.registerRequestHandler("internal:sayHelloTimeoutDelayedResponse", StringMessageRequest::new, ThreadPool.Names.GENERIC, + serviceA.registerRequestHandler("internal:sayHelloTimeoutDelayedResponse", ThreadPool.Names.GENERIC, StringMessageRequest::new, new TransportRequestHandler() { @Override public void messageReceived(StringMessageRequest request, TransportChannel channel, Task task) throws InterruptedException { @@ -1031,12 +1031,12 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { } }; - serviceA.registerRequestHandler("internal:test", StringMessageRequest::new, ThreadPool.Names.SAME, handler); - serviceA.registerRequestHandler("internal:testNotSeen", StringMessageRequest::new, ThreadPool.Names.SAME, handler); - serviceA.registerRequestHandler("internal:testError", StringMessageRequest::new, ThreadPool.Names.SAME, handlerWithError); - serviceB.registerRequestHandler("internal:test", StringMessageRequest::new, ThreadPool.Names.SAME, handler); - serviceB.registerRequestHandler("internal:testNotSeen", StringMessageRequest::new, ThreadPool.Names.SAME, handler); - serviceB.registerRequestHandler("internal:testError", StringMessageRequest::new, ThreadPool.Names.SAME, handlerWithError); + serviceA.registerRequestHandler("internal:test", ThreadPool.Names.SAME, StringMessageRequest::new, handler); + serviceA.registerRequestHandler("internal:testNotSeen", ThreadPool.Names.SAME, StringMessageRequest::new, handler); + serviceA.registerRequestHandler("internal:testError", ThreadPool.Names.SAME, StringMessageRequest::new, handlerWithError); + serviceB.registerRequestHandler("internal:test", ThreadPool.Names.SAME, StringMessageRequest::new, handler); + serviceB.registerRequestHandler("internal:testNotSeen", ThreadPool.Names.SAME, StringMessageRequest::new, handler); + serviceB.registerRequestHandler("internal:testError", ThreadPool.Names.SAME, StringMessageRequest::new, handlerWithError); String includeSettings; String excludeSettings; @@ -1134,7 +1134,10 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { this.timeout = timeout; } - public StringMessageRequest() { + public StringMessageRequest(StreamInput in) throws IOException { + super(in); + message = in.readString(); + timeout = in.readLong(); } public StringMessageRequest(String message) { @@ -1147,9 +1150,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { @Override public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); - message = in.readString(); - timeout = in.readLong(); + throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable"); } @Override @@ -1185,11 +1186,16 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { int value1; + Version0Request() {} + + Version0Request(StreamInput in) throws IOException { + super(in); + value1 = in.readInt(); + } @Override - public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); - value1 = in.readInt(); + public final void readFrom(StreamInput in) throws IOException { + throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable"); } @Override @@ -1203,9 +1209,10 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { int value2; - @Override - public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); + Version1Request() {} + + Version1Request(StreamInput in) throws IOException { + super(in); if (in.getVersion().onOrAfter(version1)) { value2 = in.readInt(); } @@ -1270,7 +1277,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { } public void testVersionFrom0to1() throws Exception { - serviceB.registerRequestHandler("internal:version", Version1Request::new, ThreadPool.Names.SAME, + serviceB.registerRequestHandler("internal:version", ThreadPool.Names.SAME, Version1Request::new, new TransportRequestHandler() { @Override public void messageReceived(Version1Request request, TransportChannel channel, Task task) throws Exception { @@ -1312,7 +1319,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { } public void testVersionFrom1to0() throws Exception { - serviceA.registerRequestHandler("internal:version", Version0Request::new, ThreadPool.Names.SAME, + serviceA.registerRequestHandler("internal:version", ThreadPool.Names.SAME, Version0Request::new, new TransportRequestHandler() { @Override public void messageReceived(Version0Request request, TransportChannel channel, Task task) throws Exception { @@ -1356,7 +1363,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { } public void testVersionFrom1to1() throws Exception { - serviceB.registerRequestHandler("internal:version", Version1Request::new, ThreadPool.Names.SAME, + serviceB.registerRequestHandler("internal:version", ThreadPool.Names.SAME, Version1Request::new, (request, channel, task) -> { assertThat(request.value1, equalTo(1)); assertThat(request.value2, equalTo(2)); @@ -1398,7 +1405,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { } public void testVersionFrom0to0() throws Exception { - serviceA.registerRequestHandler("internal:version", Version0Request::new, ThreadPool.Names.SAME, + serviceA.registerRequestHandler("internal:version", ThreadPool.Names.SAME, Version0Request::new, (request, channel, task) -> { assertThat(request.value1, equalTo(1)); Version0Response response = new Version0Response(1); @@ -1436,7 +1443,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { } public void testMockFailToSendNoConnectRule() throws Exception { - serviceA.registerRequestHandler("internal:sayHello", StringMessageRequest::new, ThreadPool.Names.GENERIC, + serviceA.registerRequestHandler("internal:sayHello", ThreadPool.Names.GENERIC, StringMessageRequest::new, (request, channel, task) -> { assertThat("moshe", equalTo(request.message)); throw new RuntimeException("bad message !!!"); @@ -1493,7 +1500,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { } public void testMockUnresponsiveRule() throws IOException { - serviceA.registerRequestHandler("internal:sayHello", StringMessageRequest::new, ThreadPool.Names.GENERIC, + serviceA.registerRequestHandler("internal:sayHello", ThreadPool.Names.GENERIC, StringMessageRequest::new, (request, channel, task) -> { assertThat("moshe", equalTo(request.message)); throw new RuntimeException("bad message !!!"); @@ -1548,7 +1555,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { final CountDownLatch latch = new CountDownLatch(2); final AtomicReference addressA = new AtomicReference<>(); final AtomicReference addressB = new AtomicReference<>(); - serviceB.registerRequestHandler("internal:action1", TestRequest::new, ThreadPool.Names.SAME, (request, channel, task) -> { + serviceB.registerRequestHandler("internal:action1", ThreadPool.Names.SAME, TestRequest::new, (request, channel, task) -> { addressA.set(request.remoteAddress()); channel.sendResponse(new TestResponse((String) null)); latch.countDown(); @@ -1588,7 +1595,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { try (TransportService service = buildService("TS_TEST", version0, null, Settings.EMPTY, false, false)) { AtomicBoolean requestProcessed = new AtomicBoolean(false); - service.registerRequestHandler("internal:action", TestRequest::new, ThreadPool.Names.SAME, + service.registerRequestHandler("internal:action", ThreadPool.Names.SAME, TestRequest::new, (request, channel, task) -> { requestProcessed.set(true); channel.sendResponse(TransportResponse.Empty.INSTANCE); @@ -1638,7 +1645,12 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { String info; int resendCount; - public TestRequest() { + public TestRequest() {} + + public TestRequest(StreamInput in) throws IOException { + super(in); + info = in.readOptionalString(); + resendCount = in.readInt(); } public TestRequest(String info) { @@ -1647,9 +1659,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { @Override public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); - info = in.readOptionalString(); - resendCount = in.readInt(); + throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable"); } @Override @@ -1798,11 +1808,11 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { } } - serviceB.registerRequestHandler("internal:action1", TestRequest::new, randomFrom(ThreadPool.Names.SAME, ThreadPool.Names.GENERIC), + serviceB.registerRequestHandler("internal:action1", randomFrom(ThreadPool.Names.SAME, ThreadPool.Names.GENERIC), TestRequest::new, new TestRequestHandler(serviceB)); - serviceC.registerRequestHandler("internal:action1", TestRequest::new, randomFrom(ThreadPool.Names.SAME, ThreadPool.Names.GENERIC), + serviceC.registerRequestHandler("internal:action1", randomFrom(ThreadPool.Names.SAME, ThreadPool.Names.GENERIC), TestRequest::new, new TestRequestHandler(serviceC)); - serviceA.registerRequestHandler("internal:action1", TestRequest::new, randomFrom(ThreadPool.Names.SAME, ThreadPool.Names.GENERIC), + serviceA.registerRequestHandler("internal:action1", randomFrom(ThreadPool.Names.SAME, ThreadPool.Names.GENERIC), TestRequest::new, new TestRequestHandler(serviceA)); int iters = randomIntBetween(30, 60); CountDownLatch allRequestsDone = new CountDownLatch(iters); @@ -1866,19 +1876,20 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { } public void testRegisterHandlerTwice() { - serviceB.registerRequestHandler("internal:action1", TestRequest::new, randomFrom(ThreadPool.Names.SAME, ThreadPool.Names.GENERIC), + serviceB.registerRequestHandler("internal:action1", randomFrom(ThreadPool.Names.SAME, ThreadPool.Names.GENERIC), TestRequest::new, (request, message, task) -> { throw new AssertionError("boom"); }); expectThrows(IllegalArgumentException.class, () -> - serviceB.registerRequestHandler("internal:action1", TestRequest::new, randomFrom(ThreadPool.Names.SAME, - ThreadPool.Names.GENERIC), + serviceB.registerRequestHandler("internal:action1", randomFrom(ThreadPool.Names.SAME, ThreadPool.Names.GENERIC), + TestRequest::new, (request, message, task) -> { throw new AssertionError("boom"); }) ); - serviceA.registerRequestHandler("internal:action1", TestRequest::new, randomFrom(ThreadPool.Names.SAME, ThreadPool.Names.GENERIC), + serviceA.registerRequestHandler("internal:action1", randomFrom(ThreadPool.Names.SAME, ThreadPool.Names.GENERIC), + TestRequest::new, (request, message, task) -> { throw new AssertionError("boom"); }); @@ -2081,7 +2092,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { public void testResponseHeadersArePreserved() throws InterruptedException { List executors = new ArrayList<>(ThreadPool.THREAD_POOL_TYPES.keySet()); CollectionUtil.timSort(executors); // makes sure it's reproducible - serviceA.registerRequestHandler("internal:action", TestRequest::new, ThreadPool.Names.SAME, + serviceA.registerRequestHandler("internal:action", ThreadPool.Names.SAME, TestRequest::new, (request, channel, task) -> { threadPool.getThreadContext().putTransient("boom", new Object()); @@ -2142,7 +2153,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { List executors = new ArrayList<>(ThreadPool.THREAD_POOL_TYPES.keySet()); CollectionUtil.timSort(executors); // makes sure it's reproducible TransportService serviceC = buildService("TS_C", CURRENT_VERSION, Settings.EMPTY); - serviceC.registerRequestHandler("internal:action", TestRequest::new, ThreadPool.Names.SAME, + serviceC.registerRequestHandler("internal:action", ThreadPool.Names.SAME, TestRequest::new, (request, channel, task) -> { // do nothing }); @@ -2202,7 +2213,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { MockTransportService serviceC = buildService("TS_C", version0, Settings.EMPTY); CountDownLatch receivedLatch = new CountDownLatch(1); CountDownLatch sendResponseLatch = new CountDownLatch(1); - serviceC.registerRequestHandler("internal:action", TestRequest::new, ThreadPool.Names.SAME, + serviceC.registerRequestHandler("internal:action", ThreadPool.Names.SAME, TestRequest::new, (request, channel, task) -> { // don't block on a network thread here threadPool.generic().execute(new AbstractRunnable() { @@ -2270,7 +2281,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { MockTransportService serviceC = buildService("TS_C", version0, Settings.EMPTY); CountDownLatch receivedLatch = new CountDownLatch(1); CountDownLatch sendResponseLatch = new CountDownLatch(1); - serviceB.registerRequestHandler("internal:action", TestRequest::new, ThreadPool.Names.SAME, + serviceB.registerRequestHandler("internal:action", ThreadPool.Names.SAME, TestRequest::new, (request, channel, task) -> { // don't block on a network thread here threadPool.generic().execute(new AbstractRunnable() { @@ -2383,7 +2394,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { CountDownLatch sendResponseLatch = new CountDownLatch(1); Exception ex = new RuntimeException("boom"); ex.setStackTrace(new StackTraceElement[0]); - serviceB.registerRequestHandler("internal:action", TestRequest::new, ThreadPool.Names.SAME, + serviceB.registerRequestHandler("internal:action", ThreadPool.Names.SAME, TestRequest::new, (request, channel, task) -> { // don't block on a network thread here threadPool.generic().execute(new AbstractRunnable() { diff --git a/test/framework/src/test/java/org/elasticsearch/test/disruption/DisruptableMockTransportTests.java b/test/framework/src/test/java/org/elasticsearch/test/disruption/DisruptableMockTransportTests.java index cc85ae0bad3..af525a4cdda 100644 --- a/test/framework/src/test/java/org/elasticsearch/test/disruption/DisruptableMockTransportTests.java +++ b/test/framework/src/test/java/org/elasticsearch/test/disruption/DisruptableMockTransportTests.java @@ -256,7 +256,7 @@ public class DisruptableMockTransportTests extends ESTestCase { } private void registerRequestHandler(TransportService transportService, TransportRequestHandler handler) { - transportService.registerRequestHandler("internal:dummy", () -> TransportRequest.Empty.INSTANCE, ThreadPool.Names.GENERIC, handler); + transportService.registerRequestHandler("internal:dummy", ThreadPool.Names.GENERIC, TransportRequest.Empty::new, handler); } private void send(TransportService transportService, DiscoveryNode destinationNode,