Extract recovery files details to its own class (#59121)

Backport of #59039
This commit is contained in:
Francisco Fernández Castaño 2020-07-07 12:35:57 +02:00 committed by GitHub
parent 5cc6457ed8
commit 1ced3f0eb3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 101 additions and 42 deletions

View File

@ -41,6 +41,7 @@ import org.elasticsearch.index.store.StoreStats;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
@ -692,10 +693,99 @@ public class RecoveryState implements ToXContentFragment, Writeable {
}
}
public static class Index extends Timer implements ToXContentFragment, Writeable {
public static class RecoveryFilesDetails implements ToXContentFragment, Writeable {
private final Map<String, File> fileDetails = new HashMap<>();
private boolean fileDetailsComplete;
private boolean complete;
RecoveryFilesDetails() {
}
RecoveryFilesDetails(StreamInput in) throws IOException {
int size = in.readVInt();
for (int i = 0; i < size; i++) {
File file = new File(in);
fileDetails.put(file.name, file);
}
if (in.getVersion().onOrAfter(StoreStats.RESERVED_BYTES_VERSION)) {
complete = in.readBoolean();
} else {
// This flag is used by disk-based allocation to decide whether the remaining bytes measurement is accurate or not; if not
// then it falls back on an estimate. There's only a very short window in which the file details are present but incomplete
// so this is a reasonable approximation, and the stats reported to the disk-based allocator don't hit this code path
// anyway since they always use IndexShard#getRecoveryState which is never transported over the wire.
complete = fileDetails.isEmpty() == false;
}
}
@Override
public void writeTo(StreamOutput out) throws IOException {
final File[] files = values().toArray(new File[0]);
out.writeVInt(files.length);
for (File file : files) {
file.writeTo(out);
}
if (out.getVersion().onOrAfter(StoreStats.RESERVED_BYTES_VERSION)) {
out.writeBoolean(complete);
}
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
if (params.paramAsBoolean("detailed", false)) {
builder.startArray(Fields.DETAILS);
for (File file : values()) {
file.toXContent(builder, params);
}
builder.endArray();
}
return builder;
}
public void addFileDetails(String name, long length, boolean reused) {
assert complete == false : "addFileDetail for [" + name + "] when file details are already complete";
File existing = fileDetails.put(name, new File(name, length, reused));
assert existing == null : "file [" + name + "] is already reported";
}
public void addRecoveredBytesToFile(String name, long bytes) {
File file = fileDetails.get(name);
assert file != null : "file [" + name + "] hasn't been reported";
file.addRecoveredBytes(bytes);
}
public File get(String name) {
return fileDetails.get(name);
}
public void setComplete() {
complete = true;
}
public int size() {
return fileDetails.size();
}
public boolean isEmpty() {
return fileDetails.isEmpty();
}
public void clear() {
fileDetails.clear();
complete = false;
}
public Collection<File> values() {
return fileDetails.values();
}
public boolean isComplete() {
return complete;
}
}
public static class Index extends Timer implements ToXContentFragment, Writeable {
private final RecoveryFilesDetails fileDetails;
public static final long UNKNOWN = -1L;
@ -703,24 +793,12 @@ public class RecoveryState implements ToXContentFragment, Writeable {
private long targetThrottleTimeInNanos = UNKNOWN;
public Index() {
this.fileDetails = new RecoveryFilesDetails();
}
public Index(StreamInput in) throws IOException {
super(in);
int size = in.readVInt();
for (int i = 0; i < size; i++) {
File file = new File(in);
fileDetails.put(file.name, file);
}
if (in.getVersion().onOrAfter(StoreStats.RESERVED_BYTES_VERSION)) {
fileDetailsComplete = in.readBoolean();
} else {
// This flag is used by disk-based allocation to decide whether the remaining bytes measurement is accurate or not; if not
// then it falls back on an estimate. There's only a very short window in which the file details are present but incomplete
// so this is a reasonable approximation, and the stats reported to the disk-based allocator don't hit this code path
// anyway since they always use IndexShard#getRecoveryState which is never transported over the wire.
fileDetailsComplete = fileDetails.isEmpty() == false;
}
fileDetails = new RecoveryFilesDetails(in);
sourceThrottlingInNanos = in.readLong();
targetThrottleTimeInNanos = in.readLong();
}
@ -728,14 +806,7 @@ public class RecoveryState implements ToXContentFragment, Writeable {
@Override
public synchronized void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
final File[] files = fileDetails.values().toArray(new File[0]);
out.writeVInt(files.length);
for (File file : files) {
file.writeTo(out);
}
if (out.getVersion().onOrAfter(StoreStats.RESERVED_BYTES_VERSION)) {
out.writeBoolean(fileDetailsComplete);
}
fileDetails.writeTo(out);
out.writeLong(sourceThrottlingInNanos);
out.writeLong(targetThrottleTimeInNanos);
}
@ -747,25 +818,20 @@ public class RecoveryState implements ToXContentFragment, Writeable {
public synchronized void reset() {
super.reset();
fileDetails.clear();
fileDetailsComplete = false;
sourceThrottlingInNanos = UNKNOWN;
targetThrottleTimeInNanos = UNKNOWN;
}
public synchronized void addFileDetail(String name, long length, boolean reused) {
assert fileDetailsComplete == false : "addFileDetail for [" + name + "] when file details are already complete";
File file = new File(name, length, reused);
File existing = fileDetails.put(name, file);
assert existing == null : "file [" + name + "] is already reported";
fileDetails.addFileDetails(name, length, reused);
}
public synchronized void setFileDetailsComplete() {
fileDetailsComplete = true;
fileDetails.setComplete();
}
public synchronized void addRecoveredBytesToFile(String name, long bytes) {
File file = fileDetails.get(name);
file.addRecoveredBytes(bytes);
fileDetails.addRecoveredBytesToFile(name, bytes);
}
public synchronized void addSourceThrottling(long timeInNanos) {
@ -812,7 +878,6 @@ public class RecoveryState implements ToXContentFragment, Writeable {
return total;
}
/**
* number of file that were recovered (excluding on ongoing files)
*/
@ -891,7 +956,7 @@ public class RecoveryState implements ToXContentFragment, Writeable {
* {@code -1} if the full set of files to recover is not yet known
*/
public synchronized long bytesStillToRecover() {
if (fileDetailsComplete == false) {
if (fileDetails.isComplete() == false) {
return -1L;
}
long total = 0L;
@ -961,13 +1026,7 @@ public class RecoveryState implements ToXContentFragment, Writeable {
builder.field(Fields.REUSED, reusedFileCount());
builder.field(Fields.RECOVERED, recoveredFileCount());
builder.field(Fields.PERCENT, String.format(Locale.ROOT, "%1.1f%%", recoveredFilesPercent()));
if (params.paramAsBoolean("detailed", false)) {
builder.startArray(Fields.DETAILS);
for (File file : fileDetails.values()) {
file.toXContent(builder, params);
}
builder.endArray();
}
fileDetails.toXContent(builder, params);
builder.endObject();
builder.humanReadableField(Fields.TOTAL_TIME_IN_MILLIS, Fields.TOTAL_TIME, new TimeValue(time()));
builder.humanReadableField(Fields.SOURCE_THROTTLE_TIME_IN_MILLIS, Fields.SOURCE_THROTTLE_TIME, sourceThrottling());