From cc7093645ccad4af6ffacff2df8a7c5a70a9ad7a Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Wed, 15 Jul 2020 20:40:43 +0200 Subject: [PATCH] Cleanup some Serialization Code around Snapshots (#59532) (#59606) A number of obvious possible simplifications that also improve efficiency in some cases (better empty collection handling and size hint use). Also, added a shortcut for writing and reading immutable open maps that can be used to dry up additional spots. --- .../snapshots/get/GetSnapshotsResponse.java | 18 +- .../restore/RestoreSnapshotResponse.java | 2 +- .../snapshots/status/SnapshotShardsStats.java | 2 +- .../snapshots/status/SnapshotStatus.java | 29 +-- .../status/SnapshotsStatusResponse.java | 7 +- .../status/TransportNodesSnapshotsStatus.java | 31 +--- .../cluster/RestoreInProgress.java | 41 ++--- .../cluster/SnapshotsInProgress.java | 169 +++++++----------- .../metadata/RepositoriesMetadata.java | 12 +- .../cluster/metadata/RepositoryMetadata.java | 4 +- .../common/io/stream/StreamInput.java | 20 +++ .../common/io/stream/StreamOutput.java | 24 +++ .../elasticsearch/snapshots/RestoreInfo.java | 13 +- .../common/io/stream/BytesStreamsTests.java | 43 +++++ 14 files changed, 188 insertions(+), 227 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/get/GetSnapshotsResponse.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/get/GetSnapshotsResponse.java index 0a917fcc433..43501579de6 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/get/GetSnapshotsResponse.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/get/GetSnapshotsResponse.java @@ -32,7 +32,6 @@ import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.snapshots.SnapshotInfo; import java.io.IOException; -import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Objects; @@ -52,10 +51,7 @@ public class GetSnapshotsResponse extends ActionResponse implements ToXContentOb (p, c) -> SnapshotInfo.SNAPSHOT_INFO_PARSER.apply(p, c).build(), new ParseField("snapshots")); } - private List snapshots = Collections.emptyList(); - - GetSnapshotsResponse() { - } + private final List snapshots; public GetSnapshotsResponse(List snapshots) { this.snapshots = Collections.unmodifiableList(snapshots); @@ -63,12 +59,7 @@ public class GetSnapshotsResponse extends ActionResponse implements ToXContentOb GetSnapshotsResponse(StreamInput in) throws IOException { super(in); - int size = in.readVInt(); - List builder = new ArrayList<>(size); - for (int i = 0; i < size; i++) { - builder.add(new SnapshotInfo(in)); - } - snapshots = Collections.unmodifiableList(builder); + snapshots = Collections.unmodifiableList(in.readList(SnapshotInfo::new)); } /** @@ -82,10 +73,7 @@ public class GetSnapshotsResponse extends ActionResponse implements ToXContentOb @Override public void writeTo(StreamOutput out) throws IOException { - out.writeVInt(snapshots.size()); - for (SnapshotInfo snapshotInfo : snapshots) { - snapshotInfo.writeTo(out); - } + out.writeList(snapshots); } @Override diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/restore/RestoreSnapshotResponse.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/restore/RestoreSnapshotResponse.java index f48054b1488..62ccb4610f0 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/restore/RestoreSnapshotResponse.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/restore/RestoreSnapshotResponse.java @@ -43,7 +43,7 @@ import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optiona public class RestoreSnapshotResponse extends ActionResponse implements ToXContentObject { @Nullable - private RestoreInfo restoreInfo; + private final RestoreInfo restoreInfo; public RestoreSnapshotResponse(@Nullable RestoreInfo restoreInfo) { this.restoreInfo = restoreInfo; diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/SnapshotShardsStats.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/SnapshotShardsStats.java index c0ac432292d..f40506bd8bf 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/SnapshotShardsStats.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/SnapshotShardsStats.java @@ -167,7 +167,7 @@ public class SnapshotShardsStats implements ToXContentObject { PARSER.declareInt(constructorArg(), new ParseField(Fields.TOTAL)); } - public static SnapshotShardsStats fromXContent(XContentParser parser) throws IOException { + public static SnapshotShardsStats fromXContent(XContentParser parser) { return PARSER.apply(parser, null); } diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/SnapshotStatus.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/SnapshotStatus.java index 9e4318bf1e8..2b8633aca43 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/SnapshotStatus.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/SnapshotStatus.java @@ -57,11 +57,11 @@ import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optiona */ public class SnapshotStatus implements ToXContentObject, Writeable { - private Snapshot snapshot; + private final Snapshot snapshot; - private State state; + private final State state; - private List shards; + private final List shards; private Map indicesStatus; @@ -75,12 +75,7 @@ public class SnapshotStatus implements ToXContentObject, Writeable { SnapshotStatus(StreamInput in) throws IOException { snapshot = new Snapshot(in); state = State.fromValue(in.readByte()); - int size = in.readVInt(); - List builder = new ArrayList<>(); - for (int i = 0; i < size; i++) { - builder.add(new SnapshotIndexShardStatus(in)); - } - shards = Collections.unmodifiableList(builder); + shards = Collections.unmodifiableList(in.readList(SnapshotIndexShardStatus::new)); if (in.getVersion().onOrAfter(Version.V_6_2_0)) { includeGlobalState = in.readOptionalBoolean(); } @@ -185,10 +180,7 @@ public class SnapshotStatus implements ToXContentObject, Writeable { public void writeTo(StreamOutput out) throws IOException { snapshot.writeTo(out); out.writeByte(state.value()); - out.writeVInt(shards.size()); - for (SnapshotIndexShardStatus shard : shards) { - shard.writeTo(out); - } + out.writeList(shards); if (out.getVersion().onOrAfter(Version.V_6_2_0)) { out.writeOptionalBoolean(includeGlobalState); } @@ -299,14 +291,9 @@ public class SnapshotStatus implements ToXContentObject, Writeable { if (o == null || getClass() != o.getClass()) return false; SnapshotStatus that = (SnapshotStatus) o; - - if (snapshot != null ? !snapshot.equals(that.snapshot) : that.snapshot != null) return false; - if (state != that.state) return false; - if (indicesStatus != null ? !indicesStatus.equals(that.indicesStatus) : that.indicesStatus != null) - return false; - if (shardsStats != null ? !shardsStats.equals(that.shardsStats) : that.shardsStats != null) return false; - if (stats != null ? !stats.equals(that.stats) : that.stats != null) return false; - return includeGlobalState != null ? includeGlobalState.equals(that.includeGlobalState) : that.includeGlobalState == null; + return Objects.equals(snapshot, that.snapshot) && state == that.state && Objects.equals(indicesStatus, that.indicesStatus) + && Objects.equals(shardsStats, that.shardsStats) && Objects.equals(stats, that.stats) + && Objects.equals(includeGlobalState, that.includeGlobalState); } @Override diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/SnapshotsStatusResponse.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/SnapshotsStatusResponse.java index acf609b9bd1..3909b4bc99d 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/SnapshotsStatusResponse.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/SnapshotsStatusResponse.java @@ -32,6 +32,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.Objects; import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg; @@ -40,7 +41,7 @@ import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constru */ public class SnapshotsStatusResponse extends ActionResponse implements ToXContentObject { - private List snapshots = Collections.emptyList(); + private final List snapshots; public SnapshotsStatusResponse(StreamInput in) throws IOException { super(in); @@ -105,9 +106,7 @@ public class SnapshotsStatusResponse extends ActionResponse implements ToXConten if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; - SnapshotsStatusResponse response = (SnapshotsStatusResponse) o; - - return snapshots != null ? snapshots.equals(response.snapshots) : response.snapshots == null; + return Objects.equals(snapshots, ((SnapshotsStatusResponse) o).snapshots); } @Override diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/TransportNodesSnapshotsStatus.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/TransportNodesSnapshotsStatus.java index cfa0611a5d2..c3c1dcdcee6 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/TransportNodesSnapshotsStatus.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/TransportNodesSnapshotsStatus.java @@ -166,7 +166,7 @@ public class TransportNodesSnapshotsStatus extends TransportNodesAction snapshots; + private final List snapshots; public NodeRequest(StreamInput in) throws IOException { super(in); @@ -186,24 +186,12 @@ public class TransportNodesSnapshotsStatus extends TransportNodesAction> status; + private final Map> status; public NodeSnapshotStatus(StreamInput in) throws IOException { super(in); - int numberOfSnapshots = in.readVInt(); - Map> snapshotMapBuilder = new HashMap<>(numberOfSnapshots); - for (int i = 0; i < numberOfSnapshots; i++) { - Snapshot snapshot = new Snapshot(in); - int numberOfShards = in.readVInt(); - Map shardMapBuilder = new HashMap<>(numberOfShards); - for (int j = 0; j < numberOfShards; j++) { - ShardId shardId = new ShardId(in); - SnapshotIndexShardStatus status = new SnapshotIndexShardStatus(in); - shardMapBuilder.put(shardId, status); - } - snapshotMapBuilder.put(snapshot, unmodifiableMap(shardMapBuilder)); - } - status = unmodifiableMap(snapshotMapBuilder); + status = unmodifiableMap( + in.readMap(Snapshot::new, input -> unmodifiableMap(input.readMap(ShardId::new, SnapshotIndexShardStatus::new)))); } public NodeSnapshotStatus(DiscoveryNode node, Map> status) { @@ -219,15 +207,8 @@ public class TransportNodesSnapshotsStatus extends TransportNodesAction> entry : status.entrySet()) { - entry.getKey().writeTo(out); - out.writeVInt(entry.getValue().size()); - for (Map.Entry shardEntry : entry.getValue().entrySet()) { - shardEntry.getKey().writeTo(out); - shardEntry.getValue().writeTo(out); - } - } + out.writeMap(status, (o, s) -> s.writeTo(o), + (output, v) -> output.writeMap(v, (o, shardId) -> shardId.writeTo(o), (o, sis) -> sis.writeTo(o))); } else { out.writeVInt(0); } diff --git a/server/src/main/java/org/elasticsearch/cluster/RestoreInProgress.java b/server/src/main/java/org/elasticsearch/cluster/RestoreInProgress.java index 0fe0996fdde..ab2c0e9cc0b 100644 --- a/server/src/main/java/org/elasticsearch/cluster/RestoreInProgress.java +++ b/server/src/main/java/org/elasticsearch/cluster/RestoreInProgress.java @@ -27,13 +27,13 @@ import org.elasticsearch.cluster.ClusterState.Custom; import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.snapshots.Snapshot; import java.io.IOException; -import java.util.ArrayList; import java.util.Collections; import java.util.Iterator; import java.util.List; @@ -69,12 +69,7 @@ public class RestoreInProgress extends AbstractNamedDiffable implements public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; - - RestoreInProgress that = (RestoreInProgress) o; - - if (!entries.equals(that.entries)) return false; - - return true; + return entries.equals(((RestoreInProgress) o).entries); } @Override @@ -225,7 +220,7 @@ public class RestoreInProgress extends AbstractNamedDiffable implements /** * Represents status of a restored shard */ - public static class ShardRestoreStatus { + public static class ShardRestoreStatus implements Writeable { private State state; private String nodeId; private String reason; @@ -320,6 +315,7 @@ public class RestoreInProgress extends AbstractNamedDiffable implements * * @param out stream input */ + @Override public void writeTo(StreamOutput out) throws IOException { out.writeOptionalString(nodeId); out.writeByte(state.value); @@ -368,7 +364,7 @@ public class RestoreInProgress extends AbstractNamedDiffable implements */ FAILURE((byte) 3); - private byte value; + private final byte value; /** * Constructs new state @@ -448,19 +444,9 @@ public class RestoreInProgress extends AbstractNamedDiffable implements } Snapshot snapshot = new Snapshot(in); State state = State.fromValue(in.readByte()); - int indices = in.readVInt(); - List indexBuilder = new ArrayList<>(); - for (int j = 0; j < indices; j++) { - indexBuilder.add(in.readString()); - } - ImmutableOpenMap.Builder builder = ImmutableOpenMap.builder(); - int shards = in.readVInt(); - for (int j = 0; j < shards; j++) { - ShardId shardId = new ShardId(in); - ShardRestoreStatus shardState = ShardRestoreStatus.readShardRestoreStatus(in); - builder.put(shardId, shardState); - } - entriesBuilder.put(uuid, new Entry(uuid, snapshot, state, Collections.unmodifiableList(indexBuilder), builder.build())); + List indexBuilder = in.readStringList(); + entriesBuilder.put(uuid, new Entry(uuid, snapshot, state, Collections.unmodifiableList(indexBuilder), + in.readImmutableMap(ShardId::new, ShardRestoreStatus::readShardRestoreStatus))); } this.entries = entriesBuilder.build(); } @@ -475,15 +461,8 @@ public class RestoreInProgress extends AbstractNamedDiffable implements } entry.snapshot().writeTo(out); out.writeByte(entry.state().value()); - out.writeVInt(entry.indices().size()); - for (String index : entry.indices()) { - out.writeString(index); - } - out.writeVInt(entry.shards().size()); - for (ObjectObjectCursor shardEntry : entry.shards()) { - shardEntry.key.writeTo(out); - shardEntry.value.writeTo(out); - } + out.writeStringCollection(entry.indices); + out.writeMap(entry.shards); } } diff --git a/server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java b/server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java index 5ffe67f5dd1..5a291e7b01a 100644 --- a/server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java +++ b/server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java @@ -29,6 +29,7 @@ import org.elasticsearch.common.Strings; import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; @@ -39,7 +40,6 @@ import org.elasticsearch.snapshots.Snapshot; import org.elasticsearch.snapshots.SnapshotsService; import java.io.IOException; -import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -88,7 +88,7 @@ public class SnapshotsInProgress extends AbstractNamedDiffable implement return builder.append("]").toString(); } - public static class Entry implements ToXContent, RepositoryOperation { + public static class Entry implements Writeable, ToXContent, RepositoryOperation { private final State state; private final Snapshot snapshot; private final boolean includeGlobalState; @@ -122,6 +122,38 @@ public class SnapshotsInProgress extends AbstractNamedDiffable implement this.version = version; } + private Entry(StreamInput in) throws IOException { + snapshot = new Snapshot(in); + includeGlobalState = in.readBoolean(); + partial = in.readBoolean(); + state = State.fromValue(in.readByte()); + indices = in.readList(IndexId::new); + startTime = in.readLong(); + shards = in.readImmutableMap(ShardId::new, ShardSnapshotStatus::new); + repositoryStateId = in.readLong(); + failure = in.readOptionalString(); + if (in.getVersion().onOrAfter(METADATA_FIELD_INTRODUCED)) { + userMetadata = in.readMap(); + } else { + userMetadata = null; + } + if (in.getVersion().onOrAfter(VERSION_IN_SNAPSHOT_VERSION)) { + version = Version.readVersion(in); + } else if (in.getVersion().onOrAfter(SnapshotsService.SHARD_GEN_IN_REPO_DATA_VERSION)) { + // If an older master informs us that shard generations are supported we use the minimum shard generation compatible + // version. If shard generations are not supported yet we use a placeholder for a version that does not use shard + // generations. + version = in.readBoolean() ? SnapshotsService.SHARD_GEN_IN_REPO_DATA_VERSION : SnapshotsService.OLD_SNAPSHOT_FORMAT; + } else { + version = SnapshotsService.OLD_SNAPSHOT_FORMAT; + } + if (in.getVersion().onOrAfter(DATA_STREAMS_IN_SNAPSHOT)) { + dataStreams = in.readStringList(); + } else { + dataStreams = Collections.emptyList(); + } + } + private static boolean assertShardsConsistent(State state, List indices, ImmutableOpenMap shards) { if ((state == State.INIT || state == State.ABORTED) && shards.isEmpty()) { @@ -316,6 +348,30 @@ public class SnapshotsInProgress extends AbstractNamedDiffable implement return builder; } + @Override + public void writeTo(StreamOutput out) throws IOException { + snapshot.writeTo(out); + out.writeBoolean(includeGlobalState); + out.writeBoolean(partial); + out.writeByte(state.value()); + out.writeList(indices); + out.writeLong(startTime); + out.writeMap(shards); + out.writeLong(repositoryStateId); + out.writeOptionalString(failure); + if (out.getVersion().onOrAfter(METADATA_FIELD_INTRODUCED)) { + out.writeMap(userMetadata); + } + if (out.getVersion().onOrAfter(VERSION_IN_SNAPSHOT_VERSION)) { + Version.writeVersion(version, out); + } else if (out.getVersion().onOrAfter(SnapshotsService.SHARD_GEN_IN_REPO_DATA_VERSION)) { + out.writeBoolean(SnapshotsService.useShardGenerations(version)); + } + if (out.getVersion().onOrAfter(DATA_STREAMS_IN_SNAPSHOT)) { + out.writeStringCollection(dataStreams); + } + } + @Override public boolean isFragment() { return false; @@ -337,7 +393,7 @@ public class SnapshotsInProgress extends AbstractNamedDiffable implement return true; } - public static class ShardSnapshotStatus { + public static class ShardSnapshotStatus implements Writeable { /** * Shard snapshot status for shards that are waiting for another operation to finish before they can be assigned to a node. @@ -417,6 +473,7 @@ public class SnapshotsInProgress extends AbstractNamedDiffable implement return state == ShardState.INIT || state == ShardState.ABORTED || state == ShardState.WAITING; } + @Override public void writeTo(StreamOutput out) throws IOException { out.writeOptionalString(nodeId); out.writeByte(state.value); @@ -548,114 +605,12 @@ public class SnapshotsInProgress extends AbstractNamedDiffable implement } public SnapshotsInProgress(StreamInput in) throws IOException { - Entry[] entries = new Entry[in.readVInt()]; - for (int i = 0; i < entries.length; i++) { - Snapshot snapshot = new Snapshot(in); - boolean includeGlobalState = in.readBoolean(); - boolean partial = in.readBoolean(); - State state = State.fromValue(in.readByte()); - List indexBuilder = in.readList(IndexId::new); - long startTime = in.readLong(); - int shards = in.readVInt(); - ImmutableOpenMap.Builder builder = ImmutableOpenMap.builder(shards); - for (int j = 0; j < shards; j++) { - ShardId shardId = new ShardId(in); - if (in.getVersion().onOrAfter(Version.V_6_0_0_beta1)) { - builder.put(shardId, new ShardSnapshotStatus(in)); - } else { - String nodeId = in.readOptionalString(); - ShardState shardState = ShardState.fromValue(in.readByte()); - // Workaround for https://github.com/elastic/elasticsearch/issues/25878 - // Some old snapshot might still have null in shard failure reasons - String reason = shardState.failed() ? "" : null; - builder.put(shardId, new ShardSnapshotStatus(nodeId, shardState, reason)); - } - } - long repositoryStateId = in.readLong(); - final String failure; - if (in.getVersion().onOrAfter(Version.V_6_7_0)) { - failure = in.readOptionalString(); - } else { - failure = null; - } - Map userMetadata = null; - if (in.getVersion().onOrAfter(METADATA_FIELD_INTRODUCED)) { - userMetadata = in.readMap(); - } - final Version version; - if (in.getVersion().onOrAfter(VERSION_IN_SNAPSHOT_VERSION)) { - version = Version.readVersion(in); - } else if (in.getVersion().onOrAfter(SnapshotsService.SHARD_GEN_IN_REPO_DATA_VERSION)) { - // If an older master informs us that shard generations are supported we use the minimum shard generation compatible - // version. If shard generations are not supported yet we use a placeholder for a version that does not use shard - // generations. - version = in.readBoolean() ? SnapshotsService.SHARD_GEN_IN_REPO_DATA_VERSION : SnapshotsService.OLD_SNAPSHOT_FORMAT; - } else { - version = SnapshotsService.OLD_SNAPSHOT_FORMAT; - } - - List dataStreams; - if (in.getVersion().onOrAfter(DATA_STREAMS_IN_SNAPSHOT)) { - dataStreams = in.readStringList(); - } else { - dataStreams = Collections.emptyList(); - } - entries[i] = new Entry(snapshot, - includeGlobalState, - partial, - state, - Collections.unmodifiableList(indexBuilder), - dataStreams, - startTime, - repositoryStateId, - builder.build(), - failure, - userMetadata, - version - ); - } - this.entries = Arrays.asList(entries); + this.entries = in.readList(SnapshotsInProgress.Entry::new); } @Override public void writeTo(StreamOutput out) throws IOException { - out.writeVInt(entries.size()); - for (Entry entry : entries) { - entry.snapshot().writeTo(out); - out.writeBoolean(entry.includeGlobalState()); - out.writeBoolean(entry.partial()); - out.writeByte(entry.state().value()); - out.writeVInt(entry.indices().size()); - for (IndexId index : entry.indices()) { - index.writeTo(out); - } - out.writeLong(entry.startTime()); - out.writeVInt(entry.shards().size()); - for (ObjectObjectCursor shardEntry : entry.shards()) { - shardEntry.key.writeTo(out); - if (out.getVersion().onOrAfter(Version.V_6_0_0_beta1)) { - shardEntry.value.writeTo(out); - } else { - out.writeOptionalString(shardEntry.value.nodeId()); - out.writeByte(shardEntry.value.state().value); - } - } - out.writeLong(entry.repositoryStateId); - if (out.getVersion().onOrAfter(Version.V_6_7_0)) { - out.writeOptionalString(entry.failure); - } - if (out.getVersion().onOrAfter(METADATA_FIELD_INTRODUCED)) { - out.writeMap(entry.userMetadata); - } - if (out.getVersion().onOrAfter(VERSION_IN_SNAPSHOT_VERSION)) { - Version.writeVersion(entry.version, out); - } else if (out.getVersion().onOrAfter(SnapshotsService.SHARD_GEN_IN_REPO_DATA_VERSION)) { - out.writeBoolean(SnapshotsService.useShardGenerations(entry.version)); - } - if (out.getVersion().onOrAfter(DATA_STREAMS_IN_SNAPSHOT)) { - out.writeStringCollection(entry.dataStreams); - } - } + out.writeList(entries); } private static final String REPOSITORY = "repository"; diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/RepositoriesMetadata.java b/server/src/main/java/org/elasticsearch/cluster/metadata/RepositoriesMetadata.java index 2805981ed31..91150650a1a 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/RepositoriesMetadata.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/RepositoriesMetadata.java @@ -36,7 +36,6 @@ import org.elasticsearch.repositories.RepositoryData; import java.io.IOException; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collections; import java.util.EnumSet; import java.util.List; @@ -165,11 +164,7 @@ public class RepositoriesMetadata extends AbstractNamedDiffable implemen } public RepositoriesMetadata(StreamInput in) throws IOException { - RepositoryMetadata[] repository = new RepositoryMetadata[in.readVInt()]; - for (int i = 0; i < repository.length; i++) { - repository[i] = new RepositoryMetadata(in); - } - this.repositories = Collections.unmodifiableList(Arrays.asList(repository)); + this.repositories = in.readList(RepositoryMetadata::new); } public static NamedDiff readDiffFrom(StreamInput in) throws IOException { @@ -181,10 +176,7 @@ public class RepositoriesMetadata extends AbstractNamedDiffable implemen */ @Override public void writeTo(StreamOutput out) throws IOException { - out.writeVInt(repositories.size()); - for (RepositoryMetadata repository : repositories) { - repository.writeTo(out); - } + out.writeList(repositories); } public static RepositoriesMetadata fromXContent(XContentParser parser) throws IOException { diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/RepositoryMetadata.java b/server/src/main/java/org/elasticsearch/cluster/metadata/RepositoryMetadata.java index 57c54f73bc5..1c5b8a12e78 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/RepositoryMetadata.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/RepositoryMetadata.java @@ -21,6 +21,7 @@ package org.elasticsearch.cluster.metadata; import org.elasticsearch.Version; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.repositories.RepositoryData; @@ -30,7 +31,7 @@ import java.util.Objects; /** * Metadata about registered repository */ -public class RepositoryMetadata { +public class RepositoryMetadata implements Writeable { public static final Version REPO_GEN_IN_CS_VERSION = Version.V_7_6_0; @@ -142,6 +143,7 @@ public class RepositoryMetadata { * * @param out stream output */ + @Override public void writeTo(StreamOutput out) throws IOException { out.writeString(name); out.writeString(type); diff --git a/server/src/main/java/org/elasticsearch/common/io/stream/StreamInput.java b/server/src/main/java/org/elasticsearch/common/io/stream/StreamInput.java index a52a0ec9377..c975eb902cc 100644 --- a/server/src/main/java/org/elasticsearch/common/io/stream/StreamInput.java +++ b/server/src/main/java/org/elasticsearch/common/io/stream/StreamInput.java @@ -35,6 +35,7 @@ import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Strings; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.geo.GeoPoint; import org.elasticsearch.common.settings.SecureString; import org.elasticsearch.common.text.Text; @@ -678,6 +679,25 @@ public abstract class StreamInput extends InputStream { return (Map) readGenericValue(); } + /** + * Read {@link ImmutableOpenMap} using given key and value readers. + * + * @param keyReader key reader + * @param valueReader value reader + */ + public ImmutableOpenMap readImmutableMap(Writeable.Reader keyReader, Writeable.Reader valueReader) + throws IOException { + final int size = readVInt(); + if (size == 0) { + return ImmutableOpenMap.of(); + } + final ImmutableOpenMap.Builder builder = ImmutableOpenMap.builder(size); + for (int i = 0; i < size; i++) { + builder.put(keyReader.read(this), valueReader.read(this)); + } + return builder.build(); + } + /** * Reads a value of unspecified type. If a collection is read then the collection will be mutable if it contains any entry but might * be immutable if it is empty. diff --git a/server/src/main/java/org/elasticsearch/common/io/stream/StreamOutput.java b/server/src/main/java/org/elasticsearch/common/io/stream/StreamOutput.java index b08aea9c963..1085d4ac5d1 100644 --- a/server/src/main/java/org/elasticsearch/common/io/stream/StreamOutput.java +++ b/server/src/main/java/org/elasticsearch/common/io/stream/StreamOutput.java @@ -19,6 +19,7 @@ package org.elasticsearch.common.io.stream; +import com.carrotsearch.hppc.cursors.ObjectObjectCursor; import org.apache.lucene.index.CorruptIndexException; import org.apache.lucene.index.IndexFormatTooNewException; import org.apache.lucene.index.IndexFormatTooOldException; @@ -35,6 +36,7 @@ import org.elasticsearch.common.CharArrays; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.geo.GeoPoint; import org.elasticsearch.common.io.stream.Writeable.Writer; import org.elasticsearch.common.settings.SecureString; @@ -628,6 +630,28 @@ public abstract class StreamOutput extends OutputStream { } } + /** + * Write a {@link ImmutableOpenMap} of {@code K}-type keys to {@code V}-type. + * + * @param keyWriter The key writer + * @param valueWriter The value writer + */ + public final void writeMap(final ImmutableOpenMap map, final Writer keyWriter, final Writer valueWriter) + throws IOException { + writeVInt(map.size()); + for (final ObjectObjectCursor entry : map) { + keyWriter.write(this, entry.key); + valueWriter.write(this, entry.value); + } + } + + /** + * Write a {@link ImmutableOpenMap} of {@code K}-type keys to {@code V}-type. + */ + public final void writeMap(final ImmutableOpenMap map) throws IOException { + writeMap(map, (o, k) -> k.writeTo(o), (o, v) -> v.writeTo(o)); + } + /** * Writes an {@link Instant} to the stream with nanosecond resolution */ diff --git a/server/src/main/java/org/elasticsearch/snapshots/RestoreInfo.java b/server/src/main/java/org/elasticsearch/snapshots/RestoreInfo.java index 33007fc6ce5..6e94452a518 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/RestoreInfo.java +++ b/server/src/main/java/org/elasticsearch/snapshots/RestoreInfo.java @@ -30,7 +30,6 @@ import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.rest.RestStatus; import java.io.IOException; -import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Objects; @@ -61,12 +60,7 @@ public class RestoreInfo implements ToXContentObject, Writeable { public RestoreInfo(StreamInput in) throws IOException { name = in.readString(); - int size = in.readVInt(); - List indicesListBuilder = new ArrayList<>(); - for (int i = 0; i < size; i++) { - indicesListBuilder.add(in.readString()); - } - indices = Collections.unmodifiableList(indicesListBuilder); + indices = Collections.unmodifiableList(in.readStringList()); totalShards = in.readVInt(); successfulShards = in.readVInt(); } @@ -173,10 +167,7 @@ public class RestoreInfo implements ToXContentObject, Writeable { @Override public void writeTo(StreamOutput out) throws IOException { out.writeString(name); - out.writeVInt(indices.size()); - for (String index : indices) { - out.writeString(index); - } + out.writeStringCollection(indices); out.writeVInt(totalShards); out.writeVInt(successfulShards); } diff --git a/server/src/test/java/org/elasticsearch/common/io/stream/BytesStreamsTests.java b/server/src/test/java/org/elasticsearch/common/io/stream/BytesStreamsTests.java index 631ab18d8c9..e14e303b9f9 100644 --- a/server/src/test/java/org/elasticsearch/common/io/stream/BytesStreamsTests.java +++ b/server/src/test/java/org/elasticsearch/common/io/stream/BytesStreamsTests.java @@ -24,6 +24,7 @@ import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.Constants; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.geo.GeoPoint; import org.elasticsearch.common.lucene.BytesRefs; import org.elasticsearch.common.unit.TimeValue; @@ -498,6 +499,38 @@ public class BytesStreamsTests extends ESTestCase { assertThat(expected, equalTo(loaded)); } + public void testWriteImmutableMap() throws IOException { + final int size = randomIntBetween(0, 100); + final ImmutableOpenMap.Builder expectedBuilder = ImmutableOpenMap.builder(randomIntBetween(0, 100)); + for (int i = 0; i < size; ++i) { + expectedBuilder.put(randomAlphaOfLength(2), randomAlphaOfLength(5)); + } + + final ImmutableOpenMap expected = expectedBuilder.build(); + final BytesStreamOutput out = new BytesStreamOutput(); + out.writeMap(expected, StreamOutput::writeString, StreamOutput::writeString); + final StreamInput in = StreamInput.wrap(BytesReference.toBytes(out.bytes())); + final ImmutableOpenMap loaded = in.readImmutableMap(StreamInput::readString, StreamInput::readString); + + assertThat(expected, equalTo(loaded)); + } + + public void testWriteImmutableMapOfWritable() throws IOException { + final int size = randomIntBetween(0, 100); + final ImmutableOpenMap.Builder expectedBuilder = ImmutableOpenMap.builder(randomIntBetween(0, 100)); + for (int i = 0; i < size; ++i) { + expectedBuilder.put(new TestWriteable(randomBoolean()), new TestWriteable(randomBoolean())); + } + + final ImmutableOpenMap expected = expectedBuilder.build(); + final BytesStreamOutput out = new BytesStreamOutput(); + out.writeMap(expected); + final StreamInput in = StreamInput.wrap(BytesReference.toBytes(out.bytes())); + final ImmutableOpenMap loaded = in.readImmutableMap(TestWriteable::new, TestWriteable::new); + + assertThat(expected, equalTo(loaded)); + } + public void testWriteMapOfLists() throws IOException { final int size = randomIntBetween(0, 5); final Map> expected = new HashMap<>(size); @@ -628,6 +661,16 @@ public class BytesStreamsTests extends ESTestCase { public void writeTo(StreamOutput out) throws IOException { out.writeBoolean(value); } + + @Override + public boolean equals(Object o) { + return o instanceof TestWriteable && value == ((TestWriteable) o).value; + } + + @Override + public int hashCode() { + return Objects.hash(value); + } } public void testWriteMapWithConsistentOrder() throws IOException {