Migrate o.e.i.r.RecoveryState to Writeable (#37380)

Relates to #34389
This commit is contained in:
Like 2019-01-26 04:52:04 +08:00 committed by Nhat Nguyen
parent 9e932f4869
commit eb7bf16427
2 changed files with 156 additions and 155 deletions

View File

@ -27,6 +27,7 @@ import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ToXContentFragment;
@ -46,7 +47,7 @@ import java.util.Map;
/**
* Keeps track of state related to shard recovery.
*/
public class RecoveryState implements ToXContentFragment, Streamable {
public class RecoveryState implements ToXContentFragment, Streamable, Writeable {
public enum Stage {
INIT((byte) 0),
@ -102,20 +103,17 @@ public class RecoveryState implements ToXContentFragment, Streamable {
private Stage stage;
private final Index index = new Index();
private final Translog translog = new Translog();
private final VerifyIndex verifyIndex = new VerifyIndex();
private final Timer timer = new Timer();
private final Index index;
private final Translog translog;
private final VerifyIndex verifyIndex;
private final Timer timer;
private RecoverySource recoverySource;
private ShardId shardId;
@Nullable
private DiscoveryNode sourceNode;
private DiscoveryNode targetNode;
private boolean primary = false;
private RecoveryState() {
}
private boolean primary;
public RecoveryState(ShardRouting shardRouting, DiscoveryNode targetNode, @Nullable DiscoveryNode sourceNode) {
assert shardRouting.initializing() : "only allow initializing shard routing to be recovered: " + shardRouting;
@ -128,9 +126,40 @@ public class RecoveryState implements ToXContentFragment, Streamable {
this.sourceNode = sourceNode;
this.targetNode = targetNode;
stage = Stage.INIT;
index = new Index();
translog = new Translog();
verifyIndex = new VerifyIndex();
timer = new Timer();
timer.start();
}
public RecoveryState(StreamInput in) throws IOException {
timer = new Timer(in);
stage = Stage.fromId(in.readByte());
shardId = ShardId.readShardId(in);
recoverySource = RecoverySource.readFrom(in);
targetNode = new DiscoveryNode(in);
sourceNode = in.readOptionalWriteable(DiscoveryNode::new);
index = new Index(in);
translog = new Translog(in);
verifyIndex = new VerifyIndex(in);
primary = in.readBoolean();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
timer.writeTo(out);
out.writeByte(stage.id());
shardId.writeTo(out);
recoverySource.writeTo(out);
targetNode.writeTo(out);
out.writeOptionalWriteable(sourceNode);
index.writeTo(out);
translog.writeTo(out);
verifyIndex.writeTo(out);
out.writeBoolean(primary);
}
public ShardId getShardId() {
return shardId;
}
@ -223,37 +252,12 @@ public class RecoveryState implements ToXContentFragment, Streamable {
}
public static RecoveryState readRecoveryState(StreamInput in) throws IOException {
RecoveryState recoveryState = new RecoveryState();
recoveryState.readFrom(in);
return recoveryState;
return new RecoveryState(in);
}
@Override
public synchronized void readFrom(StreamInput in) throws IOException {
timer.readFrom(in);
stage = Stage.fromId(in.readByte());
shardId = ShardId.readShardId(in);
recoverySource = RecoverySource.readFrom(in);
targetNode = new DiscoveryNode(in);
sourceNode = in.readOptionalWriteable(DiscoveryNode::new);
index.readFrom(in);
translog.readFrom(in);
verifyIndex.readFrom(in);
primary = in.readBoolean();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
timer.writeTo(out);
out.writeByte(stage.id());
shardId.writeTo(out);
recoverySource.writeTo(out);
targetNode.writeTo(out);
out.writeOptionalWriteable(sourceNode);
index.writeTo(out);
translog.writeTo(out);
verifyIndex.writeTo(out);
out.writeBoolean(primary);
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
}
@Override
@ -347,12 +351,31 @@ public class RecoveryState implements ToXContentFragment, Streamable {
static final String TARGET_THROTTLE_TIME_IN_MILLIS = "target_throttle_time_in_millis";
}
public static class Timer implements Streamable {
public static class Timer implements Writeable {
protected long startTime = 0;
protected long startNanoTime = 0;
protected long time = -1;
protected long stopTime = 0;
public Timer() {
}
public Timer(StreamInput in) throws IOException {
startTime = in.readVLong();
startNanoTime = in.readVLong();
stopTime = in.readVLong();
time = in.readVLong();
}
@Override
public synchronized void writeTo(StreamOutput out) throws IOException {
out.writeVLong(startTime);
out.writeVLong(startNanoTime);
out.writeVLong(stopTime);
// write a snapshot of current time, which is not per se the time field
out.writeVLong(time());
}
public synchronized void start() {
assert startTime == 0 : "already started";
startTime = System.currentTimeMillis();
@ -394,29 +417,24 @@ public class RecoveryState implements ToXContentFragment, Streamable {
stopTime = 0;
}
@Override
public synchronized void readFrom(StreamInput in) throws IOException {
startTime = in.readVLong();
startNanoTime = in.readVLong();
stopTime = in.readVLong();
time = in.readVLong();
}
@Override
public synchronized void writeTo(StreamOutput out) throws IOException {
out.writeVLong(startTime);
out.writeVLong(startNanoTime);
out.writeVLong(stopTime);
// write a snapshot of current time, which is not per se the time field
out.writeVLong(time());
}
}
public static class VerifyIndex extends Timer implements ToXContentFragment, Streamable {
public static class VerifyIndex extends Timer implements ToXContentFragment, Writeable {
private volatile long checkIndexTime;
public VerifyIndex() {
}
public VerifyIndex(StreamInput in) throws IOException {
super(in);
checkIndexTime = in.readVLong();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeVLong(checkIndexTime);
}
public void reset() {
super.reset();
@ -431,18 +449,6 @@ public class RecoveryState implements ToXContentFragment, Streamable {
this.checkIndexTime = checkIndexTime;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
checkIndexTime = in.readVLong();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeVLong(checkIndexTime);
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.humanReadableField(Fields.CHECK_INDEX_TIME_IN_MILLIS, Fields.CHECK_INDEX_TIME, new TimeValue(checkIndexTime));
@ -451,13 +457,31 @@ public class RecoveryState implements ToXContentFragment, Streamable {
}
}
public static class Translog extends Timer implements ToXContentFragment, Streamable {
public static class Translog extends Timer implements ToXContentFragment, Writeable {
public static final int UNKNOWN = -1;
private int recovered;
private int total = UNKNOWN;
private int totalOnStart = UNKNOWN;
public Translog() {
}
public Translog(StreamInput in) throws IOException {
super(in);
recovered = in.readVInt();
total = in.readVInt();
totalOnStart = in.readVInt();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeVInt(recovered);
out.writeVInt(total);
out.writeVInt(totalOnStart);
}
public synchronized void reset() {
super.reset();
recovered = 0;
@ -533,22 +557,6 @@ public class RecoveryState implements ToXContentFragment, Streamable {
return recovered * 100.0f / total;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
recovered = in.readVInt();
total = in.readVInt();
totalOnStart = in.readVInt();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeVInt(recovered);
out.writeVInt(total);
out.writeVInt(totalOnStart);
}
@Override
public synchronized XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.field(Fields.RECOVERED, recovered);
@ -560,7 +568,7 @@ public class RecoveryState implements ToXContentFragment, Streamable {
}
}
public static class File implements ToXContentObject, Streamable {
public static class File implements ToXContentObject, Writeable {
private String name;
private long length;
private long recovered;
@ -576,6 +584,21 @@ public class RecoveryState implements ToXContentFragment, Streamable {
this.reused = reused;
}
public File(StreamInput in) throws IOException {
name = in.readString();
length = in.readVLong();
recovered = in.readVLong();
reused = in.readBoolean();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(name);
out.writeVLong(length);
out.writeVLong(recovered);
out.writeBoolean(reused);
}
void addRecoveredBytes(long bytes) {
assert reused == false : "file is marked as reused, can't update recovered bytes";
assert bytes >= 0 : "can't recovered negative bytes. got [" + bytes + "]";
@ -614,28 +637,6 @@ public class RecoveryState implements ToXContentFragment, Streamable {
return reused == false && length == recovered;
}
public static File readFile(StreamInput in) throws IOException {
File file = new File();
file.readFrom(in);
return file;
}
@Override
public void readFrom(StreamInput in) throws IOException {
name = in.readString();
length = in.readVLong();
recovered = in.readVLong();
reused = in.readBoolean();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(name);
out.writeVLong(length);
out.writeVLong(recovered);
out.writeBoolean(reused);
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
@ -671,7 +672,7 @@ public class RecoveryState implements ToXContentFragment, Streamable {
}
}
public static class Index extends Timer implements ToXContentFragment, Streamable {
public static class Index extends Timer implements ToXContentFragment, Writeable {
private Map<String, File> fileDetails = new HashMap<>();
@ -681,6 +682,32 @@ public class RecoveryState implements ToXContentFragment, Streamable {
private long sourceThrottlingInNanos = UNKNOWN;
private long targetThrottleTimeInNanos = UNKNOWN;
public Index() {
}
public Index(StreamInput in) throws IOException {
super(in);
int size = in.readVInt();
for (int i = 0; i < size; i++) {
File file = new File(in);
fileDetails.put(file.name, file);
}
sourceThrottlingInNanos = in.readLong();
targetThrottleTimeInNanos = in.readLong();
}
@Override
public synchronized void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
final File[] files = fileDetails.values().toArray(new File[0]);
out.writeVInt(files.length);
for (File file : files) {
file.writeTo(out);
}
out.writeLong(sourceThrottlingInNanos);
out.writeLong(targetThrottleTimeInNanos);
}
public synchronized List<File> fileDetails() {
return Collections.unmodifiableList(new ArrayList<>(fileDetails.values()));
}
@ -883,30 +910,6 @@ public class RecoveryState implements ToXContentFragment, Streamable {
this.version = version;
}
@Override
public synchronized void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
int size = in.readVInt();
for (int i = 0; i < size; i++) {
File file = File.readFile(in);
fileDetails.put(file.name, file);
}
sourceThrottlingInNanos = in.readLong();
targetThrottleTimeInNanos = in.readLong();
}
@Override
public synchronized void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
final File[] files = fileDetails.values().toArray(new File[0]);
out.writeVInt(files.length);
for (File file : files) {
file.writeTo(out);
}
out.writeLong(sourceThrottlingInNanos);
out.writeLong(targetThrottleTimeInNanos);
}
@Override
public synchronized XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
// stream size first, as it matters more and the files section can be long

View File

@ -27,7 +27,7 @@ import org.elasticsearch.cluster.routing.TestShardRouting;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.recovery.RecoveryState.File;
import org.elasticsearch.indices.recovery.RecoveryState.Index;
@ -57,7 +57,7 @@ import static org.hamcrest.Matchers.lessThan;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
public class RecoveryTargetTests extends ESTestCase {
abstract class Streamer<T extends Streamable> extends Thread {
abstract class Streamer<T extends Writeable> extends Thread {
private T lastRead;
private final AtomicBoolean shouldStop;
private final T source;
@ -93,12 +93,10 @@ public class RecoveryTargetTests extends ESTestCase {
}
protected T deserialize(StreamInput in) throws IOException {
T obj = createObj();
obj.readFrom(in);
return obj;
return createObj(in);
}
abstract T createObj();
abstract T createObj(StreamInput in) throws IOException;
@Override
public void run() {
@ -121,32 +119,32 @@ public class RecoveryTargetTests extends ESTestCase {
timer = new Timer();
streamer = new Streamer<Timer>(stop, timer) {
@Override
Timer createObj() {
return new Timer();
Timer createObj(StreamInput in) throws IOException {
return new Timer(in);
}
};
} else if (randomBoolean()) {
timer = new Index();
streamer = new Streamer<Timer>(stop, timer) {
@Override
Timer createObj() {
return new Index();
Timer createObj(StreamInput in) throws IOException {
return new Index(in);
}
};
} else if (randomBoolean()) {
timer = new VerifyIndex();
streamer = new Streamer<Timer>(stop, timer) {
@Override
Timer createObj() {
return new VerifyIndex();
Timer createObj(StreamInput in) throws IOException {
return new VerifyIndex(in);
}
};
} else {
timer = new Translog();
streamer = new Streamer<Timer>(stop, timer) {
@Override
Timer createObj() {
return new Translog();
Timer createObj(StreamInput in) throws IOException {
return new Translog(in);
}
};
}
@ -256,8 +254,8 @@ public class RecoveryTargetTests extends ESTestCase {
Streamer<Index> backgroundReader = new Streamer<RecoveryState.Index>(streamShouldStop, index) {
@Override
Index createObj() {
return new Index();
Index createObj(StreamInput in) throws IOException {
return new Index(in);
}
};
@ -381,8 +379,8 @@ public class RecoveryTargetTests extends ESTestCase {
AtomicBoolean stop = new AtomicBoolean();
Streamer<Translog> streamer = new Streamer<Translog>(stop, translog) {
@Override
Translog createObj() {
return new Translog();
Translog createObj(StreamInput in) throws IOException {
return new Translog(in);
}
};
@ -458,8 +456,8 @@ public class RecoveryTargetTests extends ESTestCase {
AtomicBoolean stop = new AtomicBoolean();
Streamer<VerifyIndex> streamer = new Streamer<VerifyIndex>(stop, verifyIndex) {
@Override
VerifyIndex createObj() {
return new VerifyIndex();
VerifyIndex createObj(StreamInput in) throws IOException {
return new VerifyIndex(in);
}
};
@ -508,8 +506,8 @@ public class RecoveryTargetTests extends ESTestCase {
final AtomicBoolean stop = new AtomicBoolean(false);
Streamer<Index> readWriteIndex = new Streamer<Index>(stop, index) {
@Override
Index createObj() {
return new Index();
Index createObj(StreamInput in) throws IOException {
return new Index(in);
}
};
Thread modifyThread = new Thread() {