diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryState.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryState.java index 9013cfa202d..1fed238f8dd 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryState.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryState.java @@ -27,6 +27,7 @@ import org.elasticsearch.common.Strings; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Streamable; +import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.ToXContentFragment; @@ -46,7 +47,7 @@ import java.util.Map; /** * Keeps track of state related to shard recovery. */ -public class RecoveryState implements ToXContentFragment, Streamable { +public class RecoveryState implements ToXContentFragment, Streamable, Writeable { public enum Stage { INIT((byte) 0), @@ -102,20 +103,17 @@ public class RecoveryState implements ToXContentFragment, Streamable { private Stage stage; - private final Index index = new Index(); - private final Translog translog = new Translog(); - private final VerifyIndex verifyIndex = new VerifyIndex(); - private final Timer timer = new Timer(); + private final Index index; + private final Translog translog; + private final VerifyIndex verifyIndex; + private final Timer timer; private RecoverySource recoverySource; private ShardId shardId; @Nullable private DiscoveryNode sourceNode; private DiscoveryNode targetNode; - private boolean primary = false; - - private RecoveryState() { - } + private boolean primary; public RecoveryState(ShardRouting shardRouting, DiscoveryNode targetNode, @Nullable DiscoveryNode sourceNode) { assert shardRouting.initializing() : "only allow initializing shard routing to be recovered: " + shardRouting; @@ -128,9 +126,40 @@ public class RecoveryState implements ToXContentFragment, Streamable { this.sourceNode = sourceNode; this.targetNode = targetNode; stage = Stage.INIT; + index = new Index(); + translog = new Translog(); + verifyIndex = new VerifyIndex(); + timer = new Timer(); timer.start(); } + public RecoveryState(StreamInput in) throws IOException { + timer = new Timer(in); + stage = Stage.fromId(in.readByte()); + shardId = ShardId.readShardId(in); + recoverySource = RecoverySource.readFrom(in); + targetNode = new DiscoveryNode(in); + sourceNode = in.readOptionalWriteable(DiscoveryNode::new); + index = new Index(in); + translog = new Translog(in); + verifyIndex = new VerifyIndex(in); + primary = in.readBoolean(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + timer.writeTo(out); + out.writeByte(stage.id()); + shardId.writeTo(out); + recoverySource.writeTo(out); + targetNode.writeTo(out); + out.writeOptionalWriteable(sourceNode); + index.writeTo(out); + translog.writeTo(out); + verifyIndex.writeTo(out); + out.writeBoolean(primary); + } + public ShardId getShardId() { return shardId; } @@ -223,37 +252,12 @@ public class RecoveryState implements ToXContentFragment, Streamable { } public static RecoveryState readRecoveryState(StreamInput in) throws IOException { - RecoveryState recoveryState = new RecoveryState(); - recoveryState.readFrom(in); - return recoveryState; + return new RecoveryState(in); } @Override public synchronized void readFrom(StreamInput in) throws IOException { - timer.readFrom(in); - stage = Stage.fromId(in.readByte()); - shardId = ShardId.readShardId(in); - recoverySource = RecoverySource.readFrom(in); - targetNode = new DiscoveryNode(in); - sourceNode = in.readOptionalWriteable(DiscoveryNode::new); - index.readFrom(in); - translog.readFrom(in); - verifyIndex.readFrom(in); - primary = in.readBoolean(); - } - - @Override - public void writeTo(StreamOutput out) throws IOException { - timer.writeTo(out); - out.writeByte(stage.id()); - shardId.writeTo(out); - recoverySource.writeTo(out); - targetNode.writeTo(out); - out.writeOptionalWriteable(sourceNode); - index.writeTo(out); - translog.writeTo(out); - verifyIndex.writeTo(out); - out.writeBoolean(primary); + throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable"); } @Override @@ -347,12 +351,31 @@ public class RecoveryState implements ToXContentFragment, Streamable { static final String TARGET_THROTTLE_TIME_IN_MILLIS = "target_throttle_time_in_millis"; } - public static class Timer implements Streamable { + public static class Timer implements Writeable { protected long startTime = 0; protected long startNanoTime = 0; protected long time = -1; protected long stopTime = 0; + public Timer() { + } + + public Timer(StreamInput in) throws IOException { + startTime = in.readVLong(); + startNanoTime = in.readVLong(); + stopTime = in.readVLong(); + time = in.readVLong(); + } + + @Override + public synchronized void writeTo(StreamOutput out) throws IOException { + out.writeVLong(startTime); + out.writeVLong(startNanoTime); + out.writeVLong(stopTime); + // write a snapshot of current time, which is not per se the time field + out.writeVLong(time()); + } + public synchronized void start() { assert startTime == 0 : "already started"; startTime = System.currentTimeMillis(); @@ -394,29 +417,24 @@ public class RecoveryState implements ToXContentFragment, Streamable { stopTime = 0; } - - @Override - public synchronized void readFrom(StreamInput in) throws IOException { - startTime = in.readVLong(); - startNanoTime = in.readVLong(); - stopTime = in.readVLong(); - time = in.readVLong(); - } - - @Override - public synchronized void writeTo(StreamOutput out) throws IOException { - out.writeVLong(startTime); - out.writeVLong(startNanoTime); - out.writeVLong(stopTime); - // write a snapshot of current time, which is not per se the time field - out.writeVLong(time()); - } - } - public static class VerifyIndex extends Timer implements ToXContentFragment, Streamable { + public static class VerifyIndex extends Timer implements ToXContentFragment, Writeable { private volatile long checkIndexTime; + public VerifyIndex() { + } + + public VerifyIndex(StreamInput in) throws IOException { + super(in); + checkIndexTime = in.readVLong(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeVLong(checkIndexTime); + } public void reset() { super.reset(); @@ -431,18 +449,6 @@ public class RecoveryState implements ToXContentFragment, Streamable { this.checkIndexTime = checkIndexTime; } - @Override - public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); - checkIndexTime = in.readVLong(); - } - - @Override - public void writeTo(StreamOutput out) throws IOException { - super.writeTo(out); - out.writeVLong(checkIndexTime); - } - @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.humanReadableField(Fields.CHECK_INDEX_TIME_IN_MILLIS, Fields.CHECK_INDEX_TIME, new TimeValue(checkIndexTime)); @@ -451,13 +457,31 @@ public class RecoveryState implements ToXContentFragment, Streamable { } } - public static class Translog extends Timer implements ToXContentFragment, Streamable { + public static class Translog extends Timer implements ToXContentFragment, Writeable { public static final int UNKNOWN = -1; private int recovered; private int total = UNKNOWN; private int totalOnStart = UNKNOWN; + public Translog() { + } + + public Translog(StreamInput in) throws IOException { + super(in); + recovered = in.readVInt(); + total = in.readVInt(); + totalOnStart = in.readVInt(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeVInt(recovered); + out.writeVInt(total); + out.writeVInt(totalOnStart); + } + public synchronized void reset() { super.reset(); recovered = 0; @@ -533,22 +557,6 @@ public class RecoveryState implements ToXContentFragment, Streamable { return recovered * 100.0f / total; } - @Override - public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); - recovered = in.readVInt(); - total = in.readVInt(); - totalOnStart = in.readVInt(); - } - - @Override - public void writeTo(StreamOutput out) throws IOException { - super.writeTo(out); - out.writeVInt(recovered); - out.writeVInt(total); - out.writeVInt(totalOnStart); - } - @Override public synchronized XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.field(Fields.RECOVERED, recovered); @@ -560,7 +568,7 @@ public class RecoveryState implements ToXContentFragment, Streamable { } } - public static class File implements ToXContentObject, Streamable { + public static class File implements ToXContentObject, Writeable { private String name; private long length; private long recovered; @@ -576,6 +584,21 @@ public class RecoveryState implements ToXContentFragment, Streamable { this.reused = reused; } + public File(StreamInput in) throws IOException { + name = in.readString(); + length = in.readVLong(); + recovered = in.readVLong(); + reused = in.readBoolean(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeString(name); + out.writeVLong(length); + out.writeVLong(recovered); + out.writeBoolean(reused); + } + void addRecoveredBytes(long bytes) { assert reused == false : "file is marked as reused, can't update recovered bytes"; assert bytes >= 0 : "can't recovered negative bytes. got [" + bytes + "]"; @@ -614,28 +637,6 @@ public class RecoveryState implements ToXContentFragment, Streamable { return reused == false && length == recovered; } - public static File readFile(StreamInput in) throws IOException { - File file = new File(); - file.readFrom(in); - return file; - } - - @Override - public void readFrom(StreamInput in) throws IOException { - name = in.readString(); - length = in.readVLong(); - recovered = in.readVLong(); - reused = in.readBoolean(); - } - - @Override - public void writeTo(StreamOutput out) throws IOException { - out.writeString(name); - out.writeVLong(length); - out.writeVLong(recovered); - out.writeBoolean(reused); - } - @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); @@ -671,7 +672,7 @@ public class RecoveryState implements ToXContentFragment, Streamable { } } - public static class Index extends Timer implements ToXContentFragment, Streamable { + public static class Index extends Timer implements ToXContentFragment, Writeable { private Map fileDetails = new HashMap<>(); @@ -681,6 +682,32 @@ public class RecoveryState implements ToXContentFragment, Streamable { private long sourceThrottlingInNanos = UNKNOWN; private long targetThrottleTimeInNanos = UNKNOWN; + public Index() { + } + + public Index(StreamInput in) throws IOException { + super(in); + int size = in.readVInt(); + for (int i = 0; i < size; i++) { + File file = new File(in); + fileDetails.put(file.name, file); + } + sourceThrottlingInNanos = in.readLong(); + targetThrottleTimeInNanos = in.readLong(); + } + + @Override + public synchronized void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + final File[] files = fileDetails.values().toArray(new File[0]); + out.writeVInt(files.length); + for (File file : files) { + file.writeTo(out); + } + out.writeLong(sourceThrottlingInNanos); + out.writeLong(targetThrottleTimeInNanos); + } + public synchronized List fileDetails() { return Collections.unmodifiableList(new ArrayList<>(fileDetails.values())); } @@ -883,30 +910,6 @@ public class RecoveryState implements ToXContentFragment, Streamable { this.version = version; } - @Override - public synchronized void readFrom(StreamInput in) throws IOException { - super.readFrom(in); - int size = in.readVInt(); - for (int i = 0; i < size; i++) { - File file = File.readFile(in); - fileDetails.put(file.name, file); - } - sourceThrottlingInNanos = in.readLong(); - targetThrottleTimeInNanos = in.readLong(); - } - - @Override - public synchronized void writeTo(StreamOutput out) throws IOException { - super.writeTo(out); - final File[] files = fileDetails.values().toArray(new File[0]); - out.writeVInt(files.length); - for (File file : files) { - file.writeTo(out); - } - out.writeLong(sourceThrottlingInNanos); - out.writeLong(targetThrottleTimeInNanos); - } - @Override public synchronized XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { // stream size first, as it matters more and the files section can be long diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTargetTests.java b/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTargetTests.java index 7a65541cb5e..1c2b5331fef 100644 --- a/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTargetTests.java +++ b/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTargetTests.java @@ -27,7 +27,7 @@ import org.elasticsearch.cluster.routing.TestShardRouting; import org.elasticsearch.common.Strings; import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.common.io.stream.Streamable; +import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.indices.recovery.RecoveryState.File; import org.elasticsearch.indices.recovery.RecoveryState.Index; @@ -57,7 +57,7 @@ import static org.hamcrest.Matchers.lessThan; import static org.hamcrest.Matchers.lessThanOrEqualTo; public class RecoveryTargetTests extends ESTestCase { - abstract class Streamer extends Thread { + abstract class Streamer extends Thread { private T lastRead; private final AtomicBoolean shouldStop; private final T source; @@ -93,12 +93,10 @@ public class RecoveryTargetTests extends ESTestCase { } protected T deserialize(StreamInput in) throws IOException { - T obj = createObj(); - obj.readFrom(in); - return obj; + return createObj(in); } - abstract T createObj(); + abstract T createObj(StreamInput in) throws IOException; @Override public void run() { @@ -121,32 +119,32 @@ public class RecoveryTargetTests extends ESTestCase { timer = new Timer(); streamer = new Streamer(stop, timer) { @Override - Timer createObj() { - return new Timer(); + Timer createObj(StreamInput in) throws IOException { + return new Timer(in); } }; } else if (randomBoolean()) { timer = new Index(); streamer = new Streamer(stop, timer) { @Override - Timer createObj() { - return new Index(); + Timer createObj(StreamInput in) throws IOException { + return new Index(in); } }; } else if (randomBoolean()) { timer = new VerifyIndex(); streamer = new Streamer(stop, timer) { @Override - Timer createObj() { - return new VerifyIndex(); + Timer createObj(StreamInput in) throws IOException { + return new VerifyIndex(in); } }; } else { timer = new Translog(); streamer = new Streamer(stop, timer) { @Override - Timer createObj() { - return new Translog(); + Timer createObj(StreamInput in) throws IOException { + return new Translog(in); } }; } @@ -256,8 +254,8 @@ public class RecoveryTargetTests extends ESTestCase { Streamer backgroundReader = new Streamer(streamShouldStop, index) { @Override - Index createObj() { - return new Index(); + Index createObj(StreamInput in) throws IOException { + return new Index(in); } }; @@ -381,8 +379,8 @@ public class RecoveryTargetTests extends ESTestCase { AtomicBoolean stop = new AtomicBoolean(); Streamer streamer = new Streamer(stop, translog) { @Override - Translog createObj() { - return new Translog(); + Translog createObj(StreamInput in) throws IOException { + return new Translog(in); } }; @@ -458,8 +456,8 @@ public class RecoveryTargetTests extends ESTestCase { AtomicBoolean stop = new AtomicBoolean(); Streamer streamer = new Streamer(stop, verifyIndex) { @Override - VerifyIndex createObj() { - return new VerifyIndex(); + VerifyIndex createObj(StreamInput in) throws IOException { + return new VerifyIndex(in); } }; @@ -508,8 +506,8 @@ public class RecoveryTargetTests extends ESTestCase { final AtomicBoolean stop = new AtomicBoolean(false); Streamer readWriteIndex = new Streamer(stop, index) { @Override - Index createObj() { - return new Index(); + Index createObj(StreamInput in) throws IOException { + return new Index(in); } }; Thread modifyThread = new Thread() {