Cleanup some Serialization Code around Snapshots (#59532) (#59606)

A number of obvious possible simplifications that also improve efficiency
in some cases (better empty collection handling and size hint use).
Also, added a shortcut for writing and reading immutable open maps that
can be used to dry up additional spots.
This commit is contained in:
Armin Braun 2020-07-15 20:40:43 +02:00 committed by GitHub
parent 776e9507fb
commit cc7093645c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 188 additions and 227 deletions

View File

@ -32,7 +32,6 @@ import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.snapshots.SnapshotInfo;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
@ -52,10 +51,7 @@ public class GetSnapshotsResponse extends ActionResponse implements ToXContentOb
(p, c) -> SnapshotInfo.SNAPSHOT_INFO_PARSER.apply(p, c).build(), new ParseField("snapshots"));
}
private List<SnapshotInfo> snapshots = Collections.emptyList();
GetSnapshotsResponse() {
}
private final List<SnapshotInfo> snapshots;
public GetSnapshotsResponse(List<SnapshotInfo> snapshots) {
this.snapshots = Collections.unmodifiableList(snapshots);
@ -63,12 +59,7 @@ public class GetSnapshotsResponse extends ActionResponse implements ToXContentOb
GetSnapshotsResponse(StreamInput in) throws IOException {
super(in);
int size = in.readVInt();
List<SnapshotInfo> builder = new ArrayList<>(size);
for (int i = 0; i < size; i++) {
builder.add(new SnapshotInfo(in));
}
snapshots = Collections.unmodifiableList(builder);
snapshots = Collections.unmodifiableList(in.readList(SnapshotInfo::new));
}
/**
@ -82,10 +73,7 @@ public class GetSnapshotsResponse extends ActionResponse implements ToXContentOb
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVInt(snapshots.size());
for (SnapshotInfo snapshotInfo : snapshots) {
snapshotInfo.writeTo(out);
}
out.writeList(snapshots);
}
@Override

View File

@ -43,7 +43,7 @@ import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optiona
public class RestoreSnapshotResponse extends ActionResponse implements ToXContentObject {
@Nullable
private RestoreInfo restoreInfo;
private final RestoreInfo restoreInfo;
public RestoreSnapshotResponse(@Nullable RestoreInfo restoreInfo) {
this.restoreInfo = restoreInfo;

View File

@ -167,7 +167,7 @@ public class SnapshotShardsStats implements ToXContentObject {
PARSER.declareInt(constructorArg(), new ParseField(Fields.TOTAL));
}
public static SnapshotShardsStats fromXContent(XContentParser parser) throws IOException {
public static SnapshotShardsStats fromXContent(XContentParser parser) {
return PARSER.apply(parser, null);
}

View File

@ -57,11 +57,11 @@ import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optiona
*/
public class SnapshotStatus implements ToXContentObject, Writeable {
private Snapshot snapshot;
private final Snapshot snapshot;
private State state;
private final State state;
private List<SnapshotIndexShardStatus> shards;
private final List<SnapshotIndexShardStatus> shards;
private Map<String, SnapshotIndexStatus> indicesStatus;
@ -75,12 +75,7 @@ public class SnapshotStatus implements ToXContentObject, Writeable {
SnapshotStatus(StreamInput in) throws IOException {
snapshot = new Snapshot(in);
state = State.fromValue(in.readByte());
int size = in.readVInt();
List<SnapshotIndexShardStatus> builder = new ArrayList<>();
for (int i = 0; i < size; i++) {
builder.add(new SnapshotIndexShardStatus(in));
}
shards = Collections.unmodifiableList(builder);
shards = Collections.unmodifiableList(in.readList(SnapshotIndexShardStatus::new));
if (in.getVersion().onOrAfter(Version.V_6_2_0)) {
includeGlobalState = in.readOptionalBoolean();
}
@ -185,10 +180,7 @@ public class SnapshotStatus implements ToXContentObject, Writeable {
public void writeTo(StreamOutput out) throws IOException {
snapshot.writeTo(out);
out.writeByte(state.value());
out.writeVInt(shards.size());
for (SnapshotIndexShardStatus shard : shards) {
shard.writeTo(out);
}
out.writeList(shards);
if (out.getVersion().onOrAfter(Version.V_6_2_0)) {
out.writeOptionalBoolean(includeGlobalState);
}
@ -299,14 +291,9 @@ public class SnapshotStatus implements ToXContentObject, Writeable {
if (o == null || getClass() != o.getClass()) return false;
SnapshotStatus that = (SnapshotStatus) o;
if (snapshot != null ? !snapshot.equals(that.snapshot) : that.snapshot != null) return false;
if (state != that.state) return false;
if (indicesStatus != null ? !indicesStatus.equals(that.indicesStatus) : that.indicesStatus != null)
return false;
if (shardsStats != null ? !shardsStats.equals(that.shardsStats) : that.shardsStats != null) return false;
if (stats != null ? !stats.equals(that.stats) : that.stats != null) return false;
return includeGlobalState != null ? includeGlobalState.equals(that.includeGlobalState) : that.includeGlobalState == null;
return Objects.equals(snapshot, that.snapshot) && state == that.state && Objects.equals(indicesStatus, that.indicesStatus)
&& Objects.equals(shardsStats, that.shardsStats) && Objects.equals(stats, that.stats)
&& Objects.equals(includeGlobalState, that.includeGlobalState);
}
@Override

View File

@ -32,6 +32,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg;
@ -40,7 +41,7 @@ import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constru
*/
public class SnapshotsStatusResponse extends ActionResponse implements ToXContentObject {
private List<SnapshotStatus> snapshots = Collections.emptyList();
private final List<SnapshotStatus> snapshots;
public SnapshotsStatusResponse(StreamInput in) throws IOException {
super(in);
@ -105,9 +106,7 @@ public class SnapshotsStatusResponse extends ActionResponse implements ToXConten
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
SnapshotsStatusResponse response = (SnapshotsStatusResponse) o;
return snapshots != null ? snapshots.equals(response.snapshots) : response.snapshots == null;
return Objects.equals(snapshots, ((SnapshotsStatusResponse) o).snapshots);
}
@Override

View File

@ -166,7 +166,7 @@ public class TransportNodesSnapshotsStatus extends TransportNodesAction<Transpor
public static class NodeRequest extends BaseNodeRequest {
private List<Snapshot> snapshots;
private final List<Snapshot> snapshots;
public NodeRequest(StreamInput in) throws IOException {
super(in);
@ -186,24 +186,12 @@ public class TransportNodesSnapshotsStatus extends TransportNodesAction<Transpor
public static class NodeSnapshotStatus extends BaseNodeResponse {
private Map<Snapshot, Map<ShardId, SnapshotIndexShardStatus>> status;
private final Map<Snapshot, Map<ShardId, SnapshotIndexShardStatus>> status;
public NodeSnapshotStatus(StreamInput in) throws IOException {
super(in);
int numberOfSnapshots = in.readVInt();
Map<Snapshot, Map<ShardId, SnapshotIndexShardStatus>> snapshotMapBuilder = new HashMap<>(numberOfSnapshots);
for (int i = 0; i < numberOfSnapshots; i++) {
Snapshot snapshot = new Snapshot(in);
int numberOfShards = in.readVInt();
Map<ShardId, SnapshotIndexShardStatus> shardMapBuilder = new HashMap<>(numberOfShards);
for (int j = 0; j < numberOfShards; j++) {
ShardId shardId = new ShardId(in);
SnapshotIndexShardStatus status = new SnapshotIndexShardStatus(in);
shardMapBuilder.put(shardId, status);
}
snapshotMapBuilder.put(snapshot, unmodifiableMap(shardMapBuilder));
}
status = unmodifiableMap(snapshotMapBuilder);
status = unmodifiableMap(
in.readMap(Snapshot::new, input -> unmodifiableMap(input.readMap(ShardId::new, SnapshotIndexShardStatus::new))));
}
public NodeSnapshotStatus(DiscoveryNode node, Map<Snapshot, Map<ShardId, SnapshotIndexShardStatus>> status) {
@ -219,15 +207,8 @@ public class TransportNodesSnapshotsStatus extends TransportNodesAction<Transpor
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
if (status != null) {
out.writeVInt(status.size());
for (Map.Entry<Snapshot, Map<ShardId, SnapshotIndexShardStatus>> entry : status.entrySet()) {
entry.getKey().writeTo(out);
out.writeVInt(entry.getValue().size());
for (Map.Entry<ShardId, SnapshotIndexShardStatus> shardEntry : entry.getValue().entrySet()) {
shardEntry.getKey().writeTo(out);
shardEntry.getValue().writeTo(out);
}
}
out.writeMap(status, (o, s) -> s.writeTo(o),
(output, v) -> output.writeMap(v, (o, shardId) -> shardId.writeTo(o), (o, sis) -> sis.writeTo(o)));
} else {
out.writeVInt(0);
}

View File

@ -27,13 +27,13 @@ import org.elasticsearch.cluster.ClusterState.Custom;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.snapshots.Snapshot;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
@ -69,12 +69,7 @@ public class RestoreInProgress extends AbstractNamedDiffable<Custom> implements
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
RestoreInProgress that = (RestoreInProgress) o;
if (!entries.equals(that.entries)) return false;
return true;
return entries.equals(((RestoreInProgress) o).entries);
}
@Override
@ -225,7 +220,7 @@ public class RestoreInProgress extends AbstractNamedDiffable<Custom> implements
/**
* Represents status of a restored shard
*/
public static class ShardRestoreStatus {
public static class ShardRestoreStatus implements Writeable {
private State state;
private String nodeId;
private String reason;
@ -320,6 +315,7 @@ public class RestoreInProgress extends AbstractNamedDiffable<Custom> implements
*
* @param out stream input
*/
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeOptionalString(nodeId);
out.writeByte(state.value);
@ -368,7 +364,7 @@ public class RestoreInProgress extends AbstractNamedDiffable<Custom> implements
*/
FAILURE((byte) 3);
private byte value;
private final byte value;
/**
* Constructs new state
@ -448,19 +444,9 @@ public class RestoreInProgress extends AbstractNamedDiffable<Custom> implements
}
Snapshot snapshot = new Snapshot(in);
State state = State.fromValue(in.readByte());
int indices = in.readVInt();
List<String> indexBuilder = new ArrayList<>();
for (int j = 0; j < indices; j++) {
indexBuilder.add(in.readString());
}
ImmutableOpenMap.Builder<ShardId, ShardRestoreStatus> builder = ImmutableOpenMap.builder();
int shards = in.readVInt();
for (int j = 0; j < shards; j++) {
ShardId shardId = new ShardId(in);
ShardRestoreStatus shardState = ShardRestoreStatus.readShardRestoreStatus(in);
builder.put(shardId, shardState);
}
entriesBuilder.put(uuid, new Entry(uuid, snapshot, state, Collections.unmodifiableList(indexBuilder), builder.build()));
List<String> indexBuilder = in.readStringList();
entriesBuilder.put(uuid, new Entry(uuid, snapshot, state, Collections.unmodifiableList(indexBuilder),
in.readImmutableMap(ShardId::new, ShardRestoreStatus::readShardRestoreStatus)));
}
this.entries = entriesBuilder.build();
}
@ -475,15 +461,8 @@ public class RestoreInProgress extends AbstractNamedDiffable<Custom> implements
}
entry.snapshot().writeTo(out);
out.writeByte(entry.state().value());
out.writeVInt(entry.indices().size());
for (String index : entry.indices()) {
out.writeString(index);
}
out.writeVInt(entry.shards().size());
for (ObjectObjectCursor<ShardId, ShardRestoreStatus> shardEntry : entry.shards()) {
shardEntry.key.writeTo(out);
shardEntry.value.writeTo(out);
}
out.writeStringCollection(entry.indices);
out.writeMap(entry.shards);
}
}

View File

@ -29,6 +29,7 @@ import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
@ -39,7 +40,6 @@ import org.elasticsearch.snapshots.Snapshot;
import org.elasticsearch.snapshots.SnapshotsService;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@ -88,7 +88,7 @@ public class SnapshotsInProgress extends AbstractNamedDiffable<Custom> implement
return builder.append("]").toString();
}
public static class Entry implements ToXContent, RepositoryOperation {
public static class Entry implements Writeable, ToXContent, RepositoryOperation {
private final State state;
private final Snapshot snapshot;
private final boolean includeGlobalState;
@ -122,6 +122,38 @@ public class SnapshotsInProgress extends AbstractNamedDiffable<Custom> implement
this.version = version;
}
private Entry(StreamInput in) throws IOException {
snapshot = new Snapshot(in);
includeGlobalState = in.readBoolean();
partial = in.readBoolean();
state = State.fromValue(in.readByte());
indices = in.readList(IndexId::new);
startTime = in.readLong();
shards = in.readImmutableMap(ShardId::new, ShardSnapshotStatus::new);
repositoryStateId = in.readLong();
failure = in.readOptionalString();
if (in.getVersion().onOrAfter(METADATA_FIELD_INTRODUCED)) {
userMetadata = in.readMap();
} else {
userMetadata = null;
}
if (in.getVersion().onOrAfter(VERSION_IN_SNAPSHOT_VERSION)) {
version = Version.readVersion(in);
} else if (in.getVersion().onOrAfter(SnapshotsService.SHARD_GEN_IN_REPO_DATA_VERSION)) {
// If an older master informs us that shard generations are supported we use the minimum shard generation compatible
// version. If shard generations are not supported yet we use a placeholder for a version that does not use shard
// generations.
version = in.readBoolean() ? SnapshotsService.SHARD_GEN_IN_REPO_DATA_VERSION : SnapshotsService.OLD_SNAPSHOT_FORMAT;
} else {
version = SnapshotsService.OLD_SNAPSHOT_FORMAT;
}
if (in.getVersion().onOrAfter(DATA_STREAMS_IN_SNAPSHOT)) {
dataStreams = in.readStringList();
} else {
dataStreams = Collections.emptyList();
}
}
private static boolean assertShardsConsistent(State state, List<IndexId> indices,
ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards) {
if ((state == State.INIT || state == State.ABORTED) && shards.isEmpty()) {
@ -316,6 +348,30 @@ public class SnapshotsInProgress extends AbstractNamedDiffable<Custom> implement
return builder;
}
@Override
public void writeTo(StreamOutput out) throws IOException {
snapshot.writeTo(out);
out.writeBoolean(includeGlobalState);
out.writeBoolean(partial);
out.writeByte(state.value());
out.writeList(indices);
out.writeLong(startTime);
out.writeMap(shards);
out.writeLong(repositoryStateId);
out.writeOptionalString(failure);
if (out.getVersion().onOrAfter(METADATA_FIELD_INTRODUCED)) {
out.writeMap(userMetadata);
}
if (out.getVersion().onOrAfter(VERSION_IN_SNAPSHOT_VERSION)) {
Version.writeVersion(version, out);
} else if (out.getVersion().onOrAfter(SnapshotsService.SHARD_GEN_IN_REPO_DATA_VERSION)) {
out.writeBoolean(SnapshotsService.useShardGenerations(version));
}
if (out.getVersion().onOrAfter(DATA_STREAMS_IN_SNAPSHOT)) {
out.writeStringCollection(dataStreams);
}
}
@Override
public boolean isFragment() {
return false;
@ -337,7 +393,7 @@ public class SnapshotsInProgress extends AbstractNamedDiffable<Custom> implement
return true;
}
public static class ShardSnapshotStatus {
public static class ShardSnapshotStatus implements Writeable {
/**
* Shard snapshot status for shards that are waiting for another operation to finish before they can be assigned to a node.
@ -417,6 +473,7 @@ public class SnapshotsInProgress extends AbstractNamedDiffable<Custom> implement
return state == ShardState.INIT || state == ShardState.ABORTED || state == ShardState.WAITING;
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeOptionalString(nodeId);
out.writeByte(state.value);
@ -548,114 +605,12 @@ public class SnapshotsInProgress extends AbstractNamedDiffable<Custom> implement
}
public SnapshotsInProgress(StreamInput in) throws IOException {
Entry[] entries = new Entry[in.readVInt()];
for (int i = 0; i < entries.length; i++) {
Snapshot snapshot = new Snapshot(in);
boolean includeGlobalState = in.readBoolean();
boolean partial = in.readBoolean();
State state = State.fromValue(in.readByte());
List<IndexId> indexBuilder = in.readList(IndexId::new);
long startTime = in.readLong();
int shards = in.readVInt();
ImmutableOpenMap.Builder<ShardId, ShardSnapshotStatus> builder = ImmutableOpenMap.builder(shards);
for (int j = 0; j < shards; j++) {
ShardId shardId = new ShardId(in);
if (in.getVersion().onOrAfter(Version.V_6_0_0_beta1)) {
builder.put(shardId, new ShardSnapshotStatus(in));
} else {
String nodeId = in.readOptionalString();
ShardState shardState = ShardState.fromValue(in.readByte());
// Workaround for https://github.com/elastic/elasticsearch/issues/25878
// Some old snapshot might still have null in shard failure reasons
String reason = shardState.failed() ? "" : null;
builder.put(shardId, new ShardSnapshotStatus(nodeId, shardState, reason));
}
}
long repositoryStateId = in.readLong();
final String failure;
if (in.getVersion().onOrAfter(Version.V_6_7_0)) {
failure = in.readOptionalString();
} else {
failure = null;
}
Map<String, Object> userMetadata = null;
if (in.getVersion().onOrAfter(METADATA_FIELD_INTRODUCED)) {
userMetadata = in.readMap();
}
final Version version;
if (in.getVersion().onOrAfter(VERSION_IN_SNAPSHOT_VERSION)) {
version = Version.readVersion(in);
} else if (in.getVersion().onOrAfter(SnapshotsService.SHARD_GEN_IN_REPO_DATA_VERSION)) {
// If an older master informs us that shard generations are supported we use the minimum shard generation compatible
// version. If shard generations are not supported yet we use a placeholder for a version that does not use shard
// generations.
version = in.readBoolean() ? SnapshotsService.SHARD_GEN_IN_REPO_DATA_VERSION : SnapshotsService.OLD_SNAPSHOT_FORMAT;
} else {
version = SnapshotsService.OLD_SNAPSHOT_FORMAT;
}
List<String> dataStreams;
if (in.getVersion().onOrAfter(DATA_STREAMS_IN_SNAPSHOT)) {
dataStreams = in.readStringList();
} else {
dataStreams = Collections.emptyList();
}
entries[i] = new Entry(snapshot,
includeGlobalState,
partial,
state,
Collections.unmodifiableList(indexBuilder),
dataStreams,
startTime,
repositoryStateId,
builder.build(),
failure,
userMetadata,
version
);
}
this.entries = Arrays.asList(entries);
this.entries = in.readList(SnapshotsInProgress.Entry::new);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVInt(entries.size());
for (Entry entry : entries) {
entry.snapshot().writeTo(out);
out.writeBoolean(entry.includeGlobalState());
out.writeBoolean(entry.partial());
out.writeByte(entry.state().value());
out.writeVInt(entry.indices().size());
for (IndexId index : entry.indices()) {
index.writeTo(out);
}
out.writeLong(entry.startTime());
out.writeVInt(entry.shards().size());
for (ObjectObjectCursor<ShardId, ShardSnapshotStatus> shardEntry : entry.shards()) {
shardEntry.key.writeTo(out);
if (out.getVersion().onOrAfter(Version.V_6_0_0_beta1)) {
shardEntry.value.writeTo(out);
} else {
out.writeOptionalString(shardEntry.value.nodeId());
out.writeByte(shardEntry.value.state().value);
}
}
out.writeLong(entry.repositoryStateId);
if (out.getVersion().onOrAfter(Version.V_6_7_0)) {
out.writeOptionalString(entry.failure);
}
if (out.getVersion().onOrAfter(METADATA_FIELD_INTRODUCED)) {
out.writeMap(entry.userMetadata);
}
if (out.getVersion().onOrAfter(VERSION_IN_SNAPSHOT_VERSION)) {
Version.writeVersion(entry.version, out);
} else if (out.getVersion().onOrAfter(SnapshotsService.SHARD_GEN_IN_REPO_DATA_VERSION)) {
out.writeBoolean(SnapshotsService.useShardGenerations(entry.version));
}
if (out.getVersion().onOrAfter(DATA_STREAMS_IN_SNAPSHOT)) {
out.writeStringCollection(entry.dataStreams);
}
}
out.writeList(entries);
}
private static final String REPOSITORY = "repository";

View File

@ -36,7 +36,6 @@ import org.elasticsearch.repositories.RepositoryData;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
@ -165,11 +164,7 @@ public class RepositoriesMetadata extends AbstractNamedDiffable<Custom> implemen
}
public RepositoriesMetadata(StreamInput in) throws IOException {
RepositoryMetadata[] repository = new RepositoryMetadata[in.readVInt()];
for (int i = 0; i < repository.length; i++) {
repository[i] = new RepositoryMetadata(in);
}
this.repositories = Collections.unmodifiableList(Arrays.asList(repository));
this.repositories = in.readList(RepositoryMetadata::new);
}
public static NamedDiff<Custom> readDiffFrom(StreamInput in) throws IOException {
@ -181,10 +176,7 @@ public class RepositoriesMetadata extends AbstractNamedDiffable<Custom> implemen
*/
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVInt(repositories.size());
for (RepositoryMetadata repository : repositories) {
repository.writeTo(out);
}
out.writeList(repositories);
}
public static RepositoriesMetadata fromXContent(XContentParser parser) throws IOException {

View File

@ -21,6 +21,7 @@ package org.elasticsearch.cluster.metadata;
import org.elasticsearch.Version;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.repositories.RepositoryData;
@ -30,7 +31,7 @@ import java.util.Objects;
/**
* Metadata about registered repository
*/
public class RepositoryMetadata {
public class RepositoryMetadata implements Writeable {
public static final Version REPO_GEN_IN_CS_VERSION = Version.V_7_6_0;
@ -142,6 +143,7 @@ public class RepositoryMetadata {
*
* @param out stream output
*/
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(name);
out.writeString(type);

View File

@ -35,6 +35,7 @@ import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.geo.GeoPoint;
import org.elasticsearch.common.settings.SecureString;
import org.elasticsearch.common.text.Text;
@ -678,6 +679,25 @@ public abstract class StreamInput extends InputStream {
return (Map<String, Object>) readGenericValue();
}
/**
* Read {@link ImmutableOpenMap} using given key and value readers.
*
* @param keyReader key reader
* @param valueReader value reader
*/
public <K, V> ImmutableOpenMap<K, V> readImmutableMap(Writeable.Reader<K> keyReader, Writeable.Reader<V> valueReader)
throws IOException {
final int size = readVInt();
if (size == 0) {
return ImmutableOpenMap.of();
}
final ImmutableOpenMap.Builder<K,V> builder = ImmutableOpenMap.builder(size);
for (int i = 0; i < size; i++) {
builder.put(keyReader.read(this), valueReader.read(this));
}
return builder.build();
}
/**
* Reads a value of unspecified type. If a collection is read then the collection will be mutable if it contains any entry but might
* be immutable if it is empty.

View File

@ -19,6 +19,7 @@
package org.elasticsearch.common.io.stream;
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
import org.apache.lucene.index.CorruptIndexException;
import org.apache.lucene.index.IndexFormatTooNewException;
import org.apache.lucene.index.IndexFormatTooOldException;
@ -35,6 +36,7 @@ import org.elasticsearch.common.CharArrays;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.geo.GeoPoint;
import org.elasticsearch.common.io.stream.Writeable.Writer;
import org.elasticsearch.common.settings.SecureString;
@ -628,6 +630,28 @@ public abstract class StreamOutput extends OutputStream {
}
}
/**
* Write a {@link ImmutableOpenMap} of {@code K}-type keys to {@code V}-type.
*
* @param keyWriter The key writer
* @param valueWriter The value writer
*/
public final <K, V> void writeMap(final ImmutableOpenMap<K, V> map, final Writer<K> keyWriter, final Writer<V> valueWriter)
throws IOException {
writeVInt(map.size());
for (final ObjectObjectCursor<K, V> entry : map) {
keyWriter.write(this, entry.key);
valueWriter.write(this, entry.value);
}
}
/**
* Write a {@link ImmutableOpenMap} of {@code K}-type keys to {@code V}-type.
*/
public final <K extends Writeable, V extends Writeable> void writeMap(final ImmutableOpenMap<K, V> map) throws IOException {
writeMap(map, (o, k) -> k.writeTo(o), (o, v) -> v.writeTo(o));
}
/**
* Writes an {@link Instant} to the stream with nanosecond resolution
*/

View File

@ -30,7 +30,6 @@ import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.rest.RestStatus;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
@ -61,12 +60,7 @@ public class RestoreInfo implements ToXContentObject, Writeable {
public RestoreInfo(StreamInput in) throws IOException {
name = in.readString();
int size = in.readVInt();
List<String> indicesListBuilder = new ArrayList<>();
for (int i = 0; i < size; i++) {
indicesListBuilder.add(in.readString());
}
indices = Collections.unmodifiableList(indicesListBuilder);
indices = Collections.unmodifiableList(in.readStringList());
totalShards = in.readVInt();
successfulShards = in.readVInt();
}
@ -173,10 +167,7 @@ public class RestoreInfo implements ToXContentObject, Writeable {
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(name);
out.writeVInt(indices.size());
for (String index : indices) {
out.writeString(index);
}
out.writeStringCollection(indices);
out.writeVInt(totalShards);
out.writeVInt(successfulShards);
}

View File

@ -24,6 +24,7 @@ import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.Constants;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.geo.GeoPoint;
import org.elasticsearch.common.lucene.BytesRefs;
import org.elasticsearch.common.unit.TimeValue;
@ -498,6 +499,38 @@ public class BytesStreamsTests extends ESTestCase {
assertThat(expected, equalTo(loaded));
}
public void testWriteImmutableMap() throws IOException {
final int size = randomIntBetween(0, 100);
final ImmutableOpenMap.Builder<String, String> expectedBuilder = ImmutableOpenMap.builder(randomIntBetween(0, 100));
for (int i = 0; i < size; ++i) {
expectedBuilder.put(randomAlphaOfLength(2), randomAlphaOfLength(5));
}
final ImmutableOpenMap<String, String> expected = expectedBuilder.build();
final BytesStreamOutput out = new BytesStreamOutput();
out.writeMap(expected, StreamOutput::writeString, StreamOutput::writeString);
final StreamInput in = StreamInput.wrap(BytesReference.toBytes(out.bytes()));
final ImmutableOpenMap<String, String> loaded = in.readImmutableMap(StreamInput::readString, StreamInput::readString);
assertThat(expected, equalTo(loaded));
}
public void testWriteImmutableMapOfWritable() throws IOException {
final int size = randomIntBetween(0, 100);
final ImmutableOpenMap.Builder<TestWriteable, TestWriteable> expectedBuilder = ImmutableOpenMap.builder(randomIntBetween(0, 100));
for (int i = 0; i < size; ++i) {
expectedBuilder.put(new TestWriteable(randomBoolean()), new TestWriteable(randomBoolean()));
}
final ImmutableOpenMap<TestWriteable, TestWriteable> expected = expectedBuilder.build();
final BytesStreamOutput out = new BytesStreamOutput();
out.writeMap(expected);
final StreamInput in = StreamInput.wrap(BytesReference.toBytes(out.bytes()));
final ImmutableOpenMap<TestWriteable, TestWriteable> loaded = in.readImmutableMap(TestWriteable::new, TestWriteable::new);
assertThat(expected, equalTo(loaded));
}
public void testWriteMapOfLists() throws IOException {
final int size = randomIntBetween(0, 5);
final Map<String, List<String>> expected = new HashMap<>(size);
@ -628,6 +661,16 @@ public class BytesStreamsTests extends ESTestCase {
public void writeTo(StreamOutput out) throws IOException {
out.writeBoolean(value);
}
@Override
public boolean equals(Object o) {
return o instanceof TestWriteable && value == ((TestWriteable) o).value;
}
@Override
public int hashCode() {
return Objects.hash(value);
}
}
public void testWriteMapWithConsistentOrder() throws IOException {