Moving RecoveryState.Index to a top-level class and renaming (#3075)

* Moving RecoveryState.Index to a top-level class and renaming

This class is a building block of replication and will be re-used between peer recovery and segment replication, so the inner class has been extracted to a top-level class and moved to the replication.common package. It has been renamed to ReplicationLuceneIndex to better reflect what it represents. Two dependent inner classes of RecoveryState have been moved along with it; they remain inner classes since they are not currently used anywhere else. The RecoveryFilesDetails class has been renamed to FilesDetails, and the FileDetail class has been renamed to FileMetadata.
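
A minimal before/after sketch of what this means for callers (illustrative caller code, not part of this commit):

// Before: nested types on RecoveryState
// RecoveryState.Index index = recoveryState.getIndex();
// for (RecoveryState.FileDetail file : index.fileDetails()) { ... }

// After: a top-level class in the replication.common package
import org.opensearch.indices.replication.common.ReplicationLuceneIndex;

ReplicationLuceneIndex index = recoveryState.getIndex();
for (ReplicationLuceneIndex.FileMetadata file : index.fileDetails()) {
    long remaining = file.length() - file.recovered(); // bytes not yet copied for this file
}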

Signed-off-by: Kartik Ganesh <gkart@amazon.com>

* Incorporate PR comments

Signed-off-by: Kartik Ganesh <gkart@amazon.com>

* Revert Project_Default.xml

Signed-off-by: Kartik Ganesh <gkart@amazon.com>

* Update REST Action test to no longer mock a final class

Instead, the test now populates dummy data.
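
A condensed sketch of the dummy-data setup (mirroring the createTestIndex helper added to RestRecoveryActionTests below; file names and sizes are arbitrary):

ReplicationLuceneIndex index = new ReplicationLuceneIndex();
index.addFileDetail("0", 512, false);     // a file that must be recovered
index.addRecoveredBytesToFile("0", 512);  // mark it fully recovered
index.addFileDetail("1", 256, true);      // a file reused from a local copy
when(state.getIndex()).thenReturn(index); // the surrounding state is still a mock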

Signed-off-by: Kartik Ganesh <gkart@amazon.com>

* Removing mocking of ReplicationLuceneIndex in RecoverySourceHandlerTests

The class has been marked final, so it can no longer be mocked. Instead, the test class sets up the Lucene index object by adding the same file metadata that is set up for the store.
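
The resulting pattern, condensed from the RecoverySourceHandlerTests changes below:

ReplicationLuceneIndex luceneIndex = new ReplicationLuceneIndex();
for (StoreFileMetadata md : store.getMetadata(null)) {
    luceneIndex.addFileDetail(md.name(), md.length(), false); // same files the store reports
}
// previously: new MultiFileWriter(targetStore, mock(RecoveryState.Index.class), "", logger, () -> {})
MultiFileWriter multiFileWriter = new MultiFileWriter(targetStore, luceneIndex, "", logger, () -> {});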

Signed-off-by: Kartik Ganesh <gkart@amazon.com>

* Configure @opensearch.internal as custom Javadoc tag

Signed-off-by: Kartik Ganesh <gkart@amazon.com>
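
For reference, the standard doclet registers custom block tags via its -tag option; a hedged sketch of the shape such a configuration takes (the actual build change is not shown in this diff, and the next commit reverts it):

javadoc -tag 'opensearch.internal:a:OpenSearch Internal:' ...

Here 'a' allows the tag in all declaration contexts.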

* Revert "Configure @opensearch.internal as custom Javadoc tag"

This reverts commit 2077d76e421fc2ceea7c1c9714b6a1906a555a31.

Signed-off-by: Kartik Ganesh <gkart@amazon.com>
Kartik Ganesh 2022-05-05 12:22:47 -07:00 committed by GitHub
parent 6003921ce7
commit 44573baff2
14 changed files with 634 additions and 559 deletions

View File

@@ -62,6 +62,7 @@ import org.opensearch.index.shard.ShardId;
import org.opensearch.index.shard.ShardPath;
import org.opensearch.indices.IndicesService;
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.indices.replication.common.ReplicationLuceneIndex;
import org.opensearch.plugins.Plugin;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.OpenSearchIntegTestCase.ClusterScope;
@@ -547,7 +548,7 @@ public class RecoveryFromGatewayIT extends OpenSearchIntegTestCase {
final Set<String> files = new HashSet<>();
for (final RecoveryState recoveryState : initialRecoveryReponse.shardRecoveryStates().get("test")) {
if (recoveryState.getTargetNode().getName().equals(replicaNode)) {
for (final RecoveryState.FileDetail file : recoveryState.getIndex().fileDetails()) {
for (final ReplicationLuceneIndex.FileMetadata file : recoveryState.getIndex().fileDetails()) {
files.add(file.name());
}
break;
@@ -607,7 +608,7 @@
long reused = 0;
int filesRecovered = 0;
int filesReused = 0;
for (final RecoveryState.FileDetail file : recoveryState.getIndex().fileDetails()) {
for (final ReplicationLuceneIndex.FileMetadata file : recoveryState.getIndex().fileDetails()) {
if (files.contains(file.name()) == false) {
recovered += file.length();
filesRecovered++;

View File

@@ -102,6 +102,7 @@ import org.opensearch.indices.IndicesService;
import org.opensearch.indices.NodeIndicesStats;
import org.opensearch.indices.analysis.AnalysisModule;
import org.opensearch.indices.recovery.RecoveryState.Stage;
import org.opensearch.indices.replication.common.ReplicationLuceneIndex;
import org.opensearch.node.NodeClosedException;
import org.opensearch.node.RecoverySettingsChunkSizePlugin;
import org.opensearch.plugins.AnalysisPlugin;
@@ -836,7 +837,7 @@ public class IndexRecoveryIT extends OpenSearchIntegTestCase {
return client().admin().indices().prepareStats(name).execute().actionGet();
}
private void validateIndexRecoveryState(RecoveryState.Index indexState) {
private void validateIndexRecoveryState(ReplicationLuceneIndex indexState) {
assertThat(indexState.time(), greaterThanOrEqualTo(0L));
assertThat(indexState.recoveredFilesPercent(), greaterThanOrEqualTo(0.0f));
assertThat(indexState.recoveredFilesPercent(), lessThanOrEqualTo(100.0f));

View File

@@ -62,6 +62,7 @@ import org.opensearch.index.snapshots.IndexShardRestoreFailedException;
import org.opensearch.index.store.Store;
import org.opensearch.index.translog.Translog;
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.indices.replication.common.ReplicationLuceneIndex;
import org.opensearch.repositories.IndexId;
import org.opensearch.repositories.Repository;
@@ -176,7 +177,7 @@ final class StoreRecovery {
}
void addIndices(
final RecoveryState.Index indexRecoveryStats,
final ReplicationLuceneIndex indexRecoveryStats,
final Directory target,
final Sort indexSort,
final Directory[] sources,
@@ -231,9 +232,9 @@ final class StoreRecovery {
* Directory wrapper that records copy process for recovery statistics
*/
static final class StatsDirectoryWrapper extends FilterDirectory {
private final RecoveryState.Index index;
private final ReplicationLuceneIndex index;
StatsDirectoryWrapper(Directory in, RecoveryState.Index indexRecoveryStats) {
StatsDirectoryWrapper(Directory in, ReplicationLuceneIndex indexRecoveryStats) {
super(in);
this.index = indexRecoveryStats;
}
@@ -354,7 +355,7 @@ final class StoreRecovery {
+ "]";
if (logger.isTraceEnabled()) {
RecoveryState.Index index = recoveryState.getIndex();
ReplicationLuceneIndex index = recoveryState.getIndex();
StringBuilder sb = new StringBuilder();
sb.append(" index : files [")
.append(index.totalFileCount())
@@ -471,7 +472,7 @@ final class StoreRecovery {
writeEmptyRetentionLeasesFile(indexShard);
}
// since we recover from local, just fill the files and size
final RecoveryState.Index index = recoveryState.getIndex();
final ReplicationLuceneIndex index = recoveryState.getIndex();
try {
if (si != null) {
addRecoveredFileDetails(si, store, index);
@@ -509,7 +510,7 @@ final class StoreRecovery {
assert indexShard.loadRetentionLeases().leases().isEmpty();
}
private void addRecoveredFileDetails(SegmentInfos si, Store store, RecoveryState.Index index) throws IOException {
private void addRecoveredFileDetails(SegmentInfos si, Store store, ReplicationLuceneIndex index) throws IOException {
final Directory directory = store.directory();
for (String name : Lucene.files(si)) {
long length = directory.fileLength(name);

View File

@@ -44,6 +44,7 @@ import org.opensearch.common.util.concurrent.AbstractRefCounted;
import org.opensearch.common.util.concurrent.ConcurrentCollections;
import org.opensearch.index.store.Store;
import org.opensearch.index.store.StoreFileMetadata;
import org.opensearch.indices.replication.common.ReplicationLuceneIndex;
import org.opensearch.transport.Transports;
import java.io.IOException;
@@ -58,7 +59,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
public class MultiFileWriter extends AbstractRefCounted implements Releasable {
public MultiFileWriter(Store store, RecoveryState.Index indexState, String tempFilePrefix, Logger logger, Runnable ensureOpen) {
public MultiFileWriter(Store store, ReplicationLuceneIndex indexState, String tempFilePrefix, Logger logger, Runnable ensureOpen) {
super("multi_file_writer");
this.store = store;
this.indexState = indexState;
@@ -71,7 +72,7 @@ public class MultiFileWriter extends AbstractRefCounted implements Releasable {
private final AtomicBoolean closed = new AtomicBoolean(false);
private final Logger logger;
private final Store store;
private final RecoveryState.Index indexState;
private final ReplicationLuceneIndex indexState;
private final String tempFilePrefix;
private final ConcurrentMap<String, IndexOutput> openIndexOutputs = ConcurrentCollections.newConcurrentMap();

View File

@@ -70,6 +70,7 @@ import org.opensearch.index.store.Store;
import org.opensearch.index.translog.Translog;
import org.opensearch.index.translog.TranslogCorruptedException;
import org.opensearch.indices.recovery.RecoveriesCollection.RecoveryRef;
import org.opensearch.indices.replication.common.ReplicationLuceneIndex;
import org.opensearch.indices.replication.common.ReplicationTimer;
import org.opensearch.tasks.Task;
import org.opensearch.threadpool.ThreadPool;
@@ -521,8 +522,8 @@ public class PeerRecoveryTargetService implements IndexEventListener {
return;
}
final RecoveryState.Index indexState = recoveryTarget.state().getIndex();
if (request.sourceThrottleTimeInNanos() != RecoveryState.Index.UNKNOWN) {
final ReplicationLuceneIndex indexState = recoveryTarget.state().getIndex();
if (request.sourceThrottleTimeInNanos() != ReplicationLuceneIndex.UNKNOWN) {
indexState.addSourceThrottling(request.sourceThrottleTimeInNanos());
}

View File

@@ -37,29 +37,19 @@ import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.routing.RecoverySource;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.common.Nullable;
import org.opensearch.common.Strings;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
import org.opensearch.common.io.stream.Writeable;
import org.opensearch.common.unit.ByteSizeValue;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.xcontent.ToXContentFragment;
import org.opensearch.common.xcontent.ToXContentObject;
import org.opensearch.common.xcontent.XContentBuilder;
import org.opensearch.common.xcontent.XContentFactory;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.shard.ShardId;
import org.opensearch.index.store.StoreStats;
import org.opensearch.indices.replication.common.ReplicationLuceneIndex;
import org.opensearch.indices.replication.common.ReplicationTimer;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
/**
* Keeps track of state related to shard recovery.
@@ -120,7 +110,7 @@ public class RecoveryState implements ToXContentFragment, Writeable {
private Stage stage;
private final Index index;
private final ReplicationLuceneIndex index;
private final Translog translog;
private final VerifyIndex verifyIndex;
private final ReplicationTimer timer;
@@ -133,10 +123,15 @@ public class RecoveryState implements ToXContentFragment, Writeable {
private boolean primary;
public RecoveryState(ShardRouting shardRouting, DiscoveryNode targetNode, @Nullable DiscoveryNode sourceNode) {
this(shardRouting, targetNode, sourceNode, new Index());
this(shardRouting, targetNode, sourceNode, new ReplicationLuceneIndex());
}
public RecoveryState(ShardRouting shardRouting, DiscoveryNode targetNode, @Nullable DiscoveryNode sourceNode, Index index) {
public RecoveryState(
ShardRouting shardRouting,
DiscoveryNode targetNode,
@Nullable DiscoveryNode sourceNode,
ReplicationLuceneIndex index
) {
assert shardRouting.initializing() : "only allow initializing shard routing to be recovered: " + shardRouting;
RecoverySource recoverySource = shardRouting.recoverySource();
assert (recoverySource.getType() == RecoverySource.Type.PEER) == (sourceNode != null)
@@ -161,7 +156,7 @@ public class RecoveryState implements ToXContentFragment, Writeable {
recoverySource = RecoverySource.readFrom(in);
targetNode = new DiscoveryNode(in);
sourceNode = in.readOptionalWriteable(DiscoveryNode::new);
index = new Index(in);
index = new ReplicationLuceneIndex(in);
translog = new Translog(in);
verifyIndex = new VerifyIndex(in);
primary = in.readBoolean();
@@ -245,7 +240,7 @@ public class RecoveryState implements ToXContentFragment, Writeable {
return this;
}
public Index getIndex() {
public ReplicationLuceneIndex getIndex() {
return index;
}
@@ -353,23 +348,10 @@ public class RecoveryState implements ToXContentFragment, Writeable {
static final String TOTAL_ON_START = "total_on_start";
static final String VERIFY_INDEX = "verify_index";
static final String RECOVERED = "recovered";
static final String RECOVERED_IN_BYTES = "recovered_in_bytes";
static final String CHECK_INDEX_TIME = "check_index_time";
static final String CHECK_INDEX_TIME_IN_MILLIS = "check_index_time_in_millis";
static final String LENGTH = "length";
static final String LENGTH_IN_BYTES = "length_in_bytes";
static final String FILES = "files";
static final String TOTAL = "total";
static final String TOTAL_IN_BYTES = "total_in_bytes";
static final String REUSED = "reused";
static final String REUSED_IN_BYTES = "reused_in_bytes";
static final String PERCENT = "percent";
static final String DETAILS = "details";
static final String SIZE = "size";
static final String SOURCE_THROTTLE_TIME = "source_throttle_time";
static final String SOURCE_THROTTLE_TIME_IN_MILLIS = "source_throttle_time_in_millis";
static final String TARGET_THROTTLE_TIME = "target_throttle_time";
static final String TARGET_THROTTLE_TIME_IN_MILLIS = "target_throttle_time_in_millis";
}
public static class VerifyIndex extends ReplicationTimer implements ToXContentFragment, Writeable {
@@ -554,466 +536,4 @@ public class RecoveryState implements ToXContentFragment, Writeable {
}
}
public static class FileDetail implements ToXContentObject, Writeable {
private String name;
private long length;
private long recovered;
private boolean reused;
public FileDetail(String name, long length, boolean reused) {
assert name != null;
this.name = name;
this.length = length;
this.reused = reused;
}
public FileDetail(StreamInput in) throws IOException {
name = in.readString();
length = in.readVLong();
recovered = in.readVLong();
reused = in.readBoolean();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(name);
out.writeVLong(length);
out.writeVLong(recovered);
out.writeBoolean(reused);
}
void addRecoveredBytes(long bytes) {
assert reused == false : "file is marked as reused, can't update recovered bytes";
assert bytes >= 0 : "can't recover negative bytes. got [" + bytes + "]";
recovered += bytes;
}
/**
* file name *
*/
public String name() {
return name;
}
/**
* file length *
*/
public long length() {
return length;
}
/**
* number of bytes recovered for this file (so far). 0 if the file is reused *
*/
public long recovered() {
return recovered;
}
/**
* returns true if the file is reused from a local copy
*/
public boolean reused() {
return reused;
}
boolean fullyRecovered() {
return reused == false && length == recovered;
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(Fields.NAME, name);
builder.humanReadableField(Fields.LENGTH_IN_BYTES, Fields.LENGTH, new ByteSizeValue(length));
builder.field(Fields.REUSED, reused);
builder.humanReadableField(Fields.RECOVERED_IN_BYTES, Fields.RECOVERED, new ByteSizeValue(recovered));
builder.endObject();
return builder;
}
@Override
public boolean equals(Object obj) {
if (obj instanceof FileDetail) {
FileDetail other = (FileDetail) obj;
return name.equals(other.name) && length == other.length() && reused == other.reused() && recovered == other.recovered();
}
return false;
}
@Override
public int hashCode() {
int result = name.hashCode();
result = 31 * result + Long.hashCode(length);
result = 31 * result + Long.hashCode(recovered);
result = 31 * result + (reused ? 1 : 0);
return result;
}
@Override
public String toString() {
return "file (name [" + name + "], reused [" + reused + "], length [" + length + "], recovered [" + recovered + "])";
}
}
public static class RecoveryFilesDetails implements ToXContentFragment, Writeable {
protected final Map<String, FileDetail> fileDetails = new HashMap<>();
protected boolean complete;
public RecoveryFilesDetails() {}
RecoveryFilesDetails(StreamInput in) throws IOException {
int size = in.readVInt();
for (int i = 0; i < size; i++) {
FileDetail file = new FileDetail(in);
fileDetails.put(file.name, file);
}
if (in.getVersion().onOrAfter(StoreStats.RESERVED_BYTES_VERSION)) {
complete = in.readBoolean();
} else {
// This flag is used by disk-based allocation to decide whether the remaining bytes measurement is accurate or not; if not
// then it falls back on an estimate. There's only a very short window in which the file details are present but incomplete
// so this is a reasonable approximation, and the stats reported to the disk-based allocator don't hit this code path
// anyway since they always use IndexShard#getRecoveryState which is never transported over the wire.
complete = fileDetails.isEmpty() == false;
}
}
@Override
public void writeTo(StreamOutput out) throws IOException {
final FileDetail[] files = values().toArray(new FileDetail[0]);
out.writeVInt(files.length);
for (FileDetail file : files) {
file.writeTo(out);
}
if (out.getVersion().onOrAfter(StoreStats.RESERVED_BYTES_VERSION)) {
out.writeBoolean(complete);
}
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
if (params.paramAsBoolean("detailed", false)) {
builder.startArray(Fields.DETAILS);
for (FileDetail file : values()) {
file.toXContent(builder, params);
}
builder.endArray();
}
return builder;
}
public void addFileDetails(String name, long length, boolean reused) {
assert complete == false : "addFileDetail for [" + name + "] when file details are already complete";
FileDetail existing = fileDetails.put(name, new FileDetail(name, length, reused));
assert existing == null : "file [" + name + "] is already reported";
}
public void addRecoveredBytesToFile(String name, long bytes) {
FileDetail file = fileDetails.get(name);
assert file != null : "file [" + name + "] hasn't been reported";
file.addRecoveredBytes(bytes);
}
public FileDetail get(String name) {
return fileDetails.get(name);
}
public void setComplete() {
complete = true;
}
public int size() {
return fileDetails.size();
}
public boolean isEmpty() {
return fileDetails.isEmpty();
}
public void clear() {
fileDetails.clear();
complete = false;
}
public Collection<FileDetail> values() {
return fileDetails.values();
}
public boolean isComplete() {
return complete;
}
}
public static class Index extends ReplicationTimer implements ToXContentFragment, Writeable {
private final RecoveryFilesDetails fileDetails;
public static final long UNKNOWN = -1L;
private long sourceThrottlingInNanos = UNKNOWN;
private long targetThrottleTimeInNanos = UNKNOWN;
public Index() {
this(new RecoveryFilesDetails());
}
public Index(RecoveryFilesDetails recoveryFilesDetails) {
this.fileDetails = recoveryFilesDetails;
}
public Index(StreamInput in) throws IOException {
super(in);
fileDetails = new RecoveryFilesDetails(in);
sourceThrottlingInNanos = in.readLong();
targetThrottleTimeInNanos = in.readLong();
}
@Override
public synchronized void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
fileDetails.writeTo(out);
out.writeLong(sourceThrottlingInNanos);
out.writeLong(targetThrottleTimeInNanos);
}
public synchronized List<FileDetail> fileDetails() {
return Collections.unmodifiableList(new ArrayList<>(fileDetails.values()));
}
public synchronized void reset() {
super.reset();
fileDetails.clear();
sourceThrottlingInNanos = UNKNOWN;
targetThrottleTimeInNanos = UNKNOWN;
}
public synchronized void addFileDetail(String name, long length, boolean reused) {
fileDetails.addFileDetails(name, length, reused);
}
public synchronized void setFileDetailsComplete() {
fileDetails.setComplete();
}
public synchronized void addRecoveredBytesToFile(String name, long bytes) {
fileDetails.addRecoveredBytesToFile(name, bytes);
}
public synchronized void addSourceThrottling(long timeInNanos) {
if (sourceThrottlingInNanos == UNKNOWN) {
sourceThrottlingInNanos = timeInNanos;
} else {
sourceThrottlingInNanos += timeInNanos;
}
}
public synchronized void addTargetThrottling(long timeInNanos) {
if (targetThrottleTimeInNanos == UNKNOWN) {
targetThrottleTimeInNanos = timeInNanos;
} else {
targetThrottleTimeInNanos += timeInNanos;
}
}
public synchronized TimeValue sourceThrottling() {
return TimeValue.timeValueNanos(sourceThrottlingInNanos);
}
public synchronized TimeValue targetThrottling() {
return TimeValue.timeValueNanos(targetThrottleTimeInNanos);
}
/**
* total number of files that are part of this recovery, both re-used and recovered
*/
public synchronized int totalFileCount() {
return fileDetails.size();
}
/**
* total number of files to be recovered (potentially not yet done)
*/
public synchronized int totalRecoverFiles() {
int total = 0;
for (FileDetail file : fileDetails.values()) {
if (file.reused() == false) {
total++;
}
}
return total;
}
/**
* number of files that were recovered (excluding ongoing files)
*/
public synchronized int recoveredFileCount() {
int count = 0;
for (FileDetail file : fileDetails.values()) {
if (file.fullyRecovered()) {
count++;
}
}
return count;
}
/**
* percent of recovered (i.e., not reused) files out of the total files to be recovered
*/
public synchronized float recoveredFilesPercent() {
int total = 0;
int recovered = 0;
for (FileDetail file : fileDetails.values()) {
if (file.reused() == false) {
total++;
if (file.fullyRecovered()) {
recovered++;
}
}
}
if (total == 0 && fileDetails.size() == 0) { // indicates we are still in init phase
return 0.0f;
}
if (total == recovered) {
return 100.0f;
} else {
float result = 100.0f * (recovered / (float) total);
return result;
}
}
/**
* total number of bytes in the shard
*/
public synchronized long totalBytes() {
long total = 0;
for (FileDetail file : fileDetails.values()) {
total += file.length();
}
return total;
}
/**
* total number of bytes recovered so far, including both existing and reused
*/
public synchronized long recoveredBytes() {
long recovered = 0;
for (FileDetail file : fileDetails.values()) {
recovered += file.recovered();
}
return recovered;
}
/**
* total bytes of files to be recovered (potentially not yet done)
*/
public synchronized long totalRecoverBytes() {
long total = 0;
for (FileDetail file : fileDetails.values()) {
if (file.reused() == false) {
total += file.length();
}
}
return total;
}
/**
* @return number of bytes still to recover, i.e. {@link Index#totalRecoverBytes()} minus {@link Index#recoveredBytes()}, or
* {@code -1} if the full set of files to recover is not yet known
*/
public synchronized long bytesStillToRecover() {
if (fileDetails.isComplete() == false) {
return -1L;
}
long total = 0L;
for (FileDetail file : fileDetails.values()) {
if (file.reused() == false) {
total += file.length() - file.recovered();
}
}
return total;
}
/**
* percent of bytes recovered out of total files bytes *to be* recovered
*/
public synchronized float recoveredBytesPercent() {
long total = 0;
long recovered = 0;
for (FileDetail file : fileDetails.values()) {
if (file.reused() == false) {
total += file.length();
recovered += file.recovered();
}
}
if (total == 0 && fileDetails.size() == 0) {
// indicates we are still in init phase
return 0.0f;
}
if (total == recovered) {
return 100.0f;
} else {
return 100.0f * recovered / total;
}
}
public synchronized int reusedFileCount() {
int reused = 0;
for (FileDetail file : fileDetails.values()) {
if (file.reused()) {
reused++;
}
}
return reused;
}
public synchronized long reusedBytes() {
long reused = 0;
for (FileDetail file : fileDetails.values()) {
if (file.reused()) {
reused += file.length();
}
}
return reused;
}
@Override
public synchronized XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
// stream size first, as it matters more and the files section can be long
builder.startObject(Fields.SIZE);
builder.humanReadableField(Fields.TOTAL_IN_BYTES, Fields.TOTAL, new ByteSizeValue(totalBytes()));
builder.humanReadableField(Fields.REUSED_IN_BYTES, Fields.REUSED, new ByteSizeValue(reusedBytes()));
builder.humanReadableField(Fields.RECOVERED_IN_BYTES, Fields.RECOVERED, new ByteSizeValue(recoveredBytes()));
builder.field(Fields.PERCENT, String.format(Locale.ROOT, "%1.1f%%", recoveredBytesPercent()));
builder.endObject();
builder.startObject(Fields.FILES);
builder.field(Fields.TOTAL, totalFileCount());
builder.field(Fields.REUSED, reusedFileCount());
builder.field(Fields.RECOVERED, recoveredFileCount());
builder.field(Fields.PERCENT, String.format(Locale.ROOT, "%1.1f%%", recoveredFilesPercent()));
fileDetails.toXContent(builder, params);
builder.endObject();
builder.humanReadableField(Fields.TOTAL_TIME_IN_MILLIS, Fields.TOTAL_TIME, new TimeValue(time()));
builder.humanReadableField(Fields.SOURCE_THROTTLE_TIME_IN_MILLIS, Fields.SOURCE_THROTTLE_TIME, sourceThrottling());
builder.humanReadableField(Fields.TARGET_THROTTLE_TIME_IN_MILLIS, Fields.TARGET_THROTTLE_TIME, targetThrottling());
return builder;
}
@Override
public synchronized String toString() {
try {
XContentBuilder builder = XContentFactory.jsonBuilder().prettyPrint();
builder.startObject();
toXContent(builder, EMPTY_PARAMS);
builder.endObject();
return Strings.toString(builder);
} catch (IOException e) {
return "{ \"error\" : \"" + e.getMessage() + "\"}";
}
}
public synchronized FileDetail getFileDetails(String dest) {
return fileDetails.get(dest);
}
}
}

View File

@@ -60,6 +60,7 @@ import org.opensearch.index.shard.ShardId;
import org.opensearch.index.store.Store;
import org.opensearch.index.store.StoreFileMetadata;
import org.opensearch.index.translog.Translog;
import org.opensearch.indices.replication.common.ReplicationLuceneIndex;
import java.io.IOException;
import java.nio.file.Path;
@@ -423,7 +424,7 @@ public class RecoveryTarget extends AbstractRefCounted implements RecoveryTarget
ActionListener.completeWith(listener, () -> {
indexShard.resetRecoveryStage();
indexShard.prepareForIndexRecovery();
final RecoveryState.Index index = state().getIndex();
final ReplicationLuceneIndex index = state().getIndex();
for (int i = 0; i < phase1ExistingFileNames.size(); i++) {
index.addFileDetail(phase1ExistingFileNames.get(i), phase1ExistingFileSizes.get(i), true);
}

View File

@@ -0,0 +1,525 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.indices.replication.common;
import org.opensearch.common.Strings;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
import org.opensearch.common.io.stream.Writeable;
import org.opensearch.common.unit.ByteSizeValue;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.xcontent.ToXContentFragment;
import org.opensearch.common.xcontent.ToXContentObject;
import org.opensearch.common.xcontent.XContentBuilder;
import org.opensearch.common.xcontent.XContentFactory;
import org.opensearch.index.store.StoreStats;
import org.opensearch.indices.recovery.RecoveryState;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
/**
* Represents the Lucene Index (set of files on a single shard) involved
* in the replication process.
*
* @opensearch.internal
*/
public final class ReplicationLuceneIndex extends ReplicationTimer implements ToXContentFragment, Writeable {
private final FilesDetails filesDetails;
public static final long UNKNOWN = -1L;
private long sourceThrottlingInNanos = UNKNOWN;
private long targetThrottleTimeInNanos = UNKNOWN;
public ReplicationLuceneIndex() {
this(new FilesDetails());
}
public ReplicationLuceneIndex(FilesDetails filesDetails) {
this.filesDetails = filesDetails;
}
public ReplicationLuceneIndex(StreamInput in) throws IOException {
super(in);
filesDetails = new FilesDetails(in);
sourceThrottlingInNanos = in.readLong();
targetThrottleTimeInNanos = in.readLong();
}
@Override
public synchronized void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
filesDetails.writeTo(out);
out.writeLong(sourceThrottlingInNanos);
out.writeLong(targetThrottleTimeInNanos);
}
public synchronized List<FileMetadata> fileDetails() {
return Collections.unmodifiableList(new ArrayList<>(filesDetails.values()));
}
public synchronized void reset() {
super.reset();
filesDetails.clear();
sourceThrottlingInNanos = UNKNOWN;
targetThrottleTimeInNanos = UNKNOWN;
}
public synchronized void addFileDetail(String name, long length, boolean reused) {
filesDetails.addFileDetails(name, length, reused);
}
public synchronized void setFileDetailsComplete() {
filesDetails.setComplete();
}
public synchronized void addRecoveredBytesToFile(String name, long bytes) {
filesDetails.addRecoveredBytesToFile(name, bytes);
}
public synchronized void addSourceThrottling(long timeInNanos) {
if (sourceThrottlingInNanos == UNKNOWN) {
sourceThrottlingInNanos = timeInNanos;
} else {
sourceThrottlingInNanos += timeInNanos;
}
}
public synchronized void addTargetThrottling(long timeInNanos) {
if (targetThrottleTimeInNanos == UNKNOWN) {
targetThrottleTimeInNanos = timeInNanos;
} else {
targetThrottleTimeInNanos += timeInNanos;
}
}
public synchronized TimeValue sourceThrottling() {
return TimeValue.timeValueNanos(sourceThrottlingInNanos);
}
public synchronized TimeValue targetThrottling() {
return TimeValue.timeValueNanos(targetThrottleTimeInNanos);
}
/**
* total number of files that are part of this recovery, both re-used and recovered
*/
public synchronized int totalFileCount() {
return filesDetails.size();
}
/**
* total number of files to be recovered (potentially not yet done)
*/
public synchronized int totalRecoverFiles() {
int total = 0;
for (FileMetadata file : filesDetails.values()) {
if (file.reused() == false) {
total++;
}
}
return total;
}
/**
* number of files that were recovered (excluding ongoing files)
*/
public synchronized int recoveredFileCount() {
int count = 0;
for (FileMetadata file : filesDetails.values()) {
if (file.fullyRecovered()) {
count++;
}
}
return count;
}
/**
* percent of recovered (i.e., not reused) files out of the total files to be recovered
*/
public synchronized float recoveredFilesPercent() {
int total = 0;
int recovered = 0;
for (FileMetadata file : filesDetails.values()) {
if (file.reused() == false) {
total++;
if (file.fullyRecovered()) {
recovered++;
}
}
}
if (total == 0 && filesDetails.size() == 0) { // indicates we are still in init phase
return 0.0f;
}
if (total == recovered) {
return 100.0f;
} else {
float result = 100.0f * (recovered / (float) total);
return result;
}
}
/**
* total number of bytes in the shard
*/
public synchronized long totalBytes() {
long total = 0;
for (FileMetadata file : filesDetails.values()) {
total += file.length();
}
return total;
}
/**
* total number of bytes recovered so far, including both existing and reused
*/
public synchronized long recoveredBytes() {
long recovered = 0;
for (FileMetadata file : filesDetails.values()) {
recovered += file.recovered();
}
return recovered;
}
/**
* total bytes of files to be recovered (potentially not yet done)
*/
public synchronized long totalRecoverBytes() {
long total = 0;
for (FileMetadata file : filesDetails.values()) {
if (file.reused() == false) {
total += file.length();
}
}
return total;
}
/**
* @return number of bytes still to recover, i.e. {@link ReplicationLuceneIndex#totalRecoverBytes()} minus {@link ReplicationLuceneIndex#recoveredBytes()}, or
* {@code -1} if the full set of files to recover is not yet known
*/
public synchronized long bytesStillToRecover() {
if (filesDetails.isComplete() == false) {
return -1L;
}
long total = 0L;
for (FileMetadata file : filesDetails.values()) {
if (file.reused() == false) {
total += file.length() - file.recovered();
}
}
return total;
}
/**
* percent of bytes recovered out of total files bytes *to be* recovered
*/
public synchronized float recoveredBytesPercent() {
long total = 0;
long recovered = 0;
for (FileMetadata file : filesDetails.values()) {
if (file.reused() == false) {
total += file.length();
recovered += file.recovered();
}
}
if (total == 0 && filesDetails.size() == 0) {
// indicates we are still in init phase
return 0.0f;
}
if (total == recovered) {
return 100.0f;
} else {
return 100.0f * recovered / total;
}
}
public synchronized int reusedFileCount() {
int reused = 0;
for (FileMetadata file : filesDetails.values()) {
if (file.reused()) {
reused++;
}
}
return reused;
}
public synchronized long reusedBytes() {
long reused = 0;
for (FileMetadata file : filesDetails.values()) {
if (file.reused()) {
reused += file.length();
}
}
return reused;
}
@Override
public synchronized XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
// stream size first, as it matters more and the files section can be long
builder.startObject(Fields.SIZE);
builder.humanReadableField(Fields.TOTAL_IN_BYTES, Fields.TOTAL, new ByteSizeValue(totalBytes()));
builder.humanReadableField(Fields.REUSED_IN_BYTES, Fields.REUSED, new ByteSizeValue(reusedBytes()));
builder.humanReadableField(Fields.RECOVERED_IN_BYTES, Fields.RECOVERED, new ByteSizeValue(recoveredBytes()));
builder.field(Fields.PERCENT, String.format(Locale.ROOT, "%1.1f%%", recoveredBytesPercent()));
builder.endObject();
builder.startObject(Fields.FILES);
builder.field(Fields.TOTAL, totalFileCount());
builder.field(Fields.REUSED, reusedFileCount());
builder.field(Fields.RECOVERED, recoveredFileCount());
builder.field(Fields.PERCENT, String.format(Locale.ROOT, "%1.1f%%", recoveredFilesPercent()));
filesDetails.toXContent(builder, params);
builder.endObject();
builder.humanReadableField(Fields.TOTAL_TIME_IN_MILLIS, Fields.TOTAL_TIME, new TimeValue(time()));
builder.humanReadableField(Fields.SOURCE_THROTTLE_TIME_IN_MILLIS, Fields.SOURCE_THROTTLE_TIME, sourceThrottling());
builder.humanReadableField(Fields.TARGET_THROTTLE_TIME_IN_MILLIS, Fields.TARGET_THROTTLE_TIME, targetThrottling());
return builder;
}
@Override
public synchronized String toString() {
try {
XContentBuilder builder = XContentFactory.jsonBuilder().prettyPrint();
builder.startObject();
toXContent(builder, EMPTY_PARAMS);
builder.endObject();
return Strings.toString(builder);
} catch (IOException e) {
return "{ \"error\" : \"" + e.getMessage() + "\"}";
}
}
public synchronized FileMetadata getFileDetails(String dest) {
return filesDetails.get(dest);
}
private static final class FilesDetails implements ToXContentFragment, Writeable {
protected final Map<String, FileMetadata> fileMetadataMap = new HashMap<>();
protected boolean complete;
public FilesDetails() {}
FilesDetails(StreamInput in) throws IOException {
int size = in.readVInt();
for (int i = 0; i < size; i++) {
FileMetadata file = new FileMetadata(in);
fileMetadataMap.put(file.name, file);
}
if (in.getVersion().onOrAfter(StoreStats.RESERVED_BYTES_VERSION)) {
complete = in.readBoolean();
} else {
// This flag is used by disk-based allocation to decide whether the remaining bytes measurement is accurate or not; if not
// then it falls back on an estimate. There's only a very short window in which the file details are present but incomplete
// so this is a reasonable approximation, and the stats reported to the disk-based allocator don't hit this code path
// anyway since they always use IndexShard#getRecoveryState which is never transported over the wire.
complete = fileMetadataMap.isEmpty() == false;
}
}
@Override
public void writeTo(StreamOutput out) throws IOException {
final FileMetadata[] files = values().toArray(new FileMetadata[0]);
out.writeVInt(files.length);
for (FileMetadata file : files) {
file.writeTo(out);
}
if (out.getVersion().onOrAfter(StoreStats.RESERVED_BYTES_VERSION)) {
out.writeBoolean(complete);
}
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
if (params.paramAsBoolean("detailed", false)) {
builder.startArray(Fields.DETAILS);
for (FileMetadata file : values()) {
file.toXContent(builder, params);
}
builder.endArray();
}
return builder;
}
public void addFileDetails(String name, long length, boolean reused) {
assert complete == false : "addFileDetail for [" + name + "] when file details are already complete";
FileMetadata existing = fileMetadataMap.put(name, new FileMetadata(name, length, reused));
assert existing == null : "file [" + name + "] is already reported";
}
public void addRecoveredBytesToFile(String name, long bytes) {
FileMetadata file = fileMetadataMap.get(name);
assert file != null : "file [" + name + "] hasn't been reported";
file.addRecoveredBytes(bytes);
}
public FileMetadata get(String name) {
return fileMetadataMap.get(name);
}
public void setComplete() {
complete = true;
}
public int size() {
return fileMetadataMap.size();
}
public boolean isEmpty() {
return fileMetadataMap.isEmpty();
}
public void clear() {
fileMetadataMap.clear();
complete = false;
}
public Collection<FileMetadata> values() {
return fileMetadataMap.values();
}
public boolean isComplete() {
return complete;
}
}
public static final class FileMetadata implements ToXContentObject, Writeable {
private String name;
private long length;
private long recovered;
private boolean reused;
public FileMetadata(String name, long length, boolean reused) {
assert name != null;
this.name = name;
this.length = length;
this.reused = reused;
}
public FileMetadata(StreamInput in) throws IOException {
name = in.readString();
length = in.readVLong();
recovered = in.readVLong();
reused = in.readBoolean();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(name);
out.writeVLong(length);
out.writeVLong(recovered);
out.writeBoolean(reused);
}
public void addRecoveredBytes(long bytes) {
assert reused == false : "file is marked as reused, can't update recovered bytes";
assert bytes >= 0 : "can't recover negative bytes. got [" + bytes + "]";
recovered += bytes;
}
/**
* file name
*/
public String name() {
return name;
}
/**
* file length
*/
public long length() {
return length;
}
/**
* number of bytes recovered for this file (so far). 0 if the file is reused
*/
public long recovered() {
return recovered;
}
/**
* returns true if the file is reused from a local copy
*/
public boolean reused() {
return reused;
}
public boolean fullyRecovered() {
return reused == false && length == recovered;
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(Fields.NAME, name);
builder.humanReadableField(Fields.LENGTH_IN_BYTES, Fields.LENGTH, new ByteSizeValue(length));
builder.field(Fields.REUSED, reused);
builder.humanReadableField(Fields.RECOVERED_IN_BYTES, Fields.RECOVERED, new ByteSizeValue(recovered));
builder.endObject();
return builder;
}
@Override
public boolean equals(Object obj) {
if (obj instanceof FileMetadata) {
FileMetadata other = (FileMetadata) obj;
return name.equals(other.name) && length == other.length() && reused == other.reused() && recovered == other.recovered();
}
return false;
}
@Override
public int hashCode() {
int result = name.hashCode();
result = 31 * result + Long.hashCode(length);
result = 31 * result + Long.hashCode(recovered);
result = 31 * result + (reused ? 1 : 0);
return result;
}
@Override
public String toString() {
return "file (name [" + name + "], reused [" + reused + "], length [" + length + "], recovered [" + recovered + "])";
}
}
/**
* Duplicates many of the field names in {@link RecoveryState}
*/
static final class Fields {
static final String TOTAL_TIME = "total_time";
static final String TOTAL_TIME_IN_MILLIS = "total_time_in_millis";
static final String NAME = "name";
static final String RECOVERED = "recovered";
static final String RECOVERED_IN_BYTES = "recovered_in_bytes";
static final String LENGTH = "length";
static final String LENGTH_IN_BYTES = "length_in_bytes";
static final String FILES = "files";
static final String TOTAL = "total";
static final String TOTAL_IN_BYTES = "total_in_bytes";
static final String REUSED = "reused";
static final String REUSED_IN_BYTES = "reused_in_bytes";
static final String PERCENT = "percent";
static final String DETAILS = "details";
static final String SIZE = "size";
static final String SOURCE_THROTTLE_TIME = "source_throttle_time";
static final String SOURCE_THROTTLE_TIME_IN_MILLIS = "source_throttle_time_in_millis";
static final String TARGET_THROTTLE_TIME = "target_throttle_time";
static final String TARGET_THROTTLE_TIME_IN_MILLIS = "target_throttle_time_in_millis";
}
}

View File

@@ -133,6 +133,7 @@ import org.opensearch.indices.breaker.NoneCircuitBreakerService;
import org.opensearch.indices.fielddata.cache.IndicesFieldDataCache;
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.indices.recovery.RecoveryTarget;
import org.opensearch.indices.replication.common.ReplicationLuceneIndex;
import org.opensearch.repositories.IndexId;
import org.opensearch.snapshots.Snapshot;
import org.opensearch.snapshots.SnapshotId;
@@ -3139,7 +3140,7 @@ public class IndexShardTests extends IndexShardTestCase {
RecoveryState recoveryState = targetShard.recoveryState();
assertEquals(RecoveryState.Stage.DONE, recoveryState.getStage());
assertTrue(recoveryState.getIndex().fileDetails().size() > 0);
for (RecoveryState.FileDetail file : recoveryState.getIndex().fileDetails()) {
for (ReplicationLuceneIndex.FileMetadata file : recoveryState.getIndex().fileDetails()) {
if (file.reused()) {
assertEquals(file.recovered(), 0);
} else {

View File

@@ -61,7 +61,7 @@ import org.opensearch.index.engine.Engine;
import org.opensearch.index.mapper.IdFieldMapper;
import org.opensearch.index.mapper.Uid;
import org.opensearch.index.seqno.SequenceNumbers;
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.indices.replication.common.ReplicationLuceneIndex;
import org.opensearch.test.OpenSearchTestCase;
import java.io.IOException;
@@ -108,7 +108,7 @@ public class StoreRecoveryTests extends OpenSearchTestCase {
writer.close();
}
StoreRecovery storeRecovery = new StoreRecovery(new ShardId("foo", "bar", 1), logger);
RecoveryState.Index indexStats = new RecoveryState.Index();
ReplicationLuceneIndex indexStats = new ReplicationLuceneIndex();
Directory target = newFSDirectory(createTempDir());
final long maxSeqNo = randomNonNegativeLong();
final long maxUnsafeAutoIdTimestamp = randomNonNegativeLong();
@@ -174,7 +174,7 @@
writer.commit();
writer.close();
StoreRecovery storeRecovery = new StoreRecovery(new ShardId("foo", "bar", 1), logger);
RecoveryState.Index indexStats = new RecoveryState.Index();
ReplicationLuceneIndex indexStats = new ReplicationLuceneIndex();
Directory target = newFSDirectory(createTempDir());
final long maxSeqNo = randomNonNegativeLong();
final long maxUnsafeAutoIdTimestamp = randomNonNegativeLong();
@@ -250,7 +250,7 @@
public void testStatsDirWrapper() throws IOException {
Directory dir = newDirectory();
Directory target = newDirectory();
RecoveryState.Index indexStats = new RecoveryState.Index();
ReplicationLuceneIndex indexStats = new ReplicationLuceneIndex();
StoreRecovery.StatsDirectoryWrapper wrapper = new StoreRecovery.StatsDirectoryWrapper(target, indexStats);
try (IndexOutput output = dir.createOutput("foo.bar", IOContext.DEFAULT)) {
CodecUtil.writeHeader(output, "foo", 0);

View File

@@ -94,6 +94,7 @@ import org.opensearch.index.shard.ShardId;
import org.opensearch.index.store.Store;
import org.opensearch.index.store.StoreFileMetadata;
import org.opensearch.index.translog.Translog;
import org.opensearch.indices.replication.common.ReplicationLuceneIndex;
import org.opensearch.test.CorruptionUtils;
import org.opensearch.test.DummyShardLock;
import org.opensearch.test.IndexSettingsModule;
@@ -189,12 +190,14 @@ public class RecoverySourceHandlerTests extends OpenSearchTestCase {
writer.close();
Store.MetadataSnapshot metadata = store.getMetadata(null);
ReplicationLuceneIndex luceneIndex = new ReplicationLuceneIndex();
List<StoreFileMetadata> metas = new ArrayList<>();
for (StoreFileMetadata md : metadata) {
metas.add(md);
luceneIndex.addFileDetail(md.name(), md.length(), false);
}
Store targetStore = newStore(createTempDir());
MultiFileWriter multiFileWriter = new MultiFileWriter(targetStore, mock(RecoveryState.Index.class), "", logger, () -> {});
MultiFileWriter multiFileWriter = new MultiFileWriter(targetStore, luceneIndex, "", logger, () -> {});
RecoveryTargetHandler target = new TestRecoveryTargetHandler() {
@Override
public void writeFileChunk(
@@ -508,10 +511,12 @@
writer.commit();
writer.close();
ReplicationLuceneIndex luceneIndex = new ReplicationLuceneIndex();
Store.MetadataSnapshot metadata = store.getMetadata(null);
List<StoreFileMetadata> metas = new ArrayList<>();
for (StoreFileMetadata md : metadata) {
metas.add(md);
luceneIndex.addFileDetail(md.name(), md.length(), false);
}
CorruptionUtils.corruptFile(
@@ -522,7 +527,7 @@
)
);
Store targetStore = newStore(createTempDir(), false);
MultiFileWriter multiFileWriter = new MultiFileWriter(targetStore, mock(RecoveryState.Index.class), "", logger, () -> {});
MultiFileWriter multiFileWriter = new MultiFileWriter(targetStore, luceneIndex, "", logger, () -> {});
RecoveryTargetHandler target = new TestRecoveryTargetHandler() {
@Override
public void writeFileChunk(

View File

@@ -41,8 +41,8 @@ import org.opensearch.common.io.stream.BytesStreamOutput;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.Writeable;
import org.opensearch.index.shard.ShardId;
import org.opensearch.indices.recovery.RecoveryState.FileDetail;
import org.opensearch.indices.recovery.RecoveryState.Index;
import org.opensearch.indices.replication.common.ReplicationLuceneIndex.FileMetadata;
import org.opensearch.indices.replication.common.ReplicationLuceneIndex;
import org.opensearch.indices.recovery.RecoveryState.Stage;
import org.opensearch.indices.recovery.RecoveryState.Translog;
import org.opensearch.indices.recovery.RecoveryState.VerifyIndex;
@@ -136,11 +136,11 @@ public class RecoveryTargetTests extends OpenSearchTestCase {
public void testIndexTimer() throws Throwable {
AtomicBoolean stop = new AtomicBoolean();
Index index = new Index();
Streamer<Index> streamer = new Streamer<>(stop, index) {
ReplicationLuceneIndex index = new ReplicationLuceneIndex();
Streamer<ReplicationLuceneIndex> streamer = new Streamer<>(stop, index) {
@Override
Index createObj(StreamInput in) throws IOException {
return new Index(in);
ReplicationLuceneIndex createObj(StreamInput in) throws IOException {
return new ReplicationLuceneIndex(in);
}
};
doTimerTest(index, streamer);
@@ -200,8 +200,8 @@ }
}
public void testIndex() throws Throwable {
FileDetail[] files = new FileDetail[randomIntBetween(1, 20)];
ArrayList<FileDetail> filesToRecover = new ArrayList<>();
FileMetadata[] files = new FileMetadata[randomIntBetween(1, 20)];
ArrayList<FileMetadata> filesToRecover = new ArrayList<>();
long totalFileBytes = 0;
long totalReusedBytes = 0;
int totalReused = 0;
@@ -209,7 +209,7 @@
final int fileLength = randomIntBetween(1, 1000);
final boolean reused = randomBoolean();
totalFileBytes += fileLength;
files[i] = new FileDetail("f_" + i, fileLength, reused);
files[i] = new FileMetadata("f_" + i, fileLength, reused);
if (reused) {
totalReused++;
totalReusedBytes += fileLength;
@@ -219,7 +219,7 @@ }
}
Collections.shuffle(Arrays.asList(files), random());
final RecoveryState.Index index = new RecoveryState.Index();
final ReplicationLuceneIndex index = new ReplicationLuceneIndex();
assertThat(index.bytesStillToRecover(), equalTo(-1L));
if (randomBoolean()) {
@@ -246,11 +246,11 @@
// before we start we must report 0
assertThat(index.recoveredFilesPercent(), equalTo((float) 0.0));
assertThat(index.recoveredBytesPercent(), equalTo((float) 0.0));
assertThat(index.sourceThrottling().nanos(), equalTo(Index.UNKNOWN));
assertThat(index.targetThrottling().nanos(), equalTo(Index.UNKNOWN));
assertThat(index.sourceThrottling().nanos(), equalTo(ReplicationLuceneIndex.UNKNOWN));
assertThat(index.targetThrottling().nanos(), equalTo(ReplicationLuceneIndex.UNKNOWN));
index.start();
for (FileDetail file : files) {
for (FileMetadata file : files) {
index.addFileDetail(file.name(), file.length(), file.reused());
}
@@ -278,24 +278,24 @@ }
}
AtomicBoolean streamShouldStop = new AtomicBoolean();
Streamer<Index> backgroundReader = new Streamer<RecoveryState.Index>(streamShouldStop, index) {
Streamer<ReplicationLuceneIndex> backgroundReader = new Streamer<ReplicationLuceneIndex>(streamShouldStop, index) {
@Override
Index createObj(StreamInput in) throws IOException {
return new Index(in);
ReplicationLuceneIndex createObj(StreamInput in) throws IOException {
return new ReplicationLuceneIndex(in);
}
};
backgroundReader.start();
long recoveredBytes = 0;
long sourceThrottling = Index.UNKNOWN;
long targetThrottling = Index.UNKNOWN;
long sourceThrottling = ReplicationLuceneIndex.UNKNOWN;
long targetThrottling = ReplicationLuceneIndex.UNKNOWN;
while (bytesToRecover > 0) {
FileDetail file = randomFrom(filesToRecover);
FileMetadata file = randomFrom(filesToRecover);
final long toRecover = Math.min(bytesToRecover, randomIntBetween(1, (int) (file.length() - file.recovered())));
final long throttledOnSource = rarely() ? randomIntBetween(10, 200) : 0;
index.addSourceThrottling(throttledOnSource);
if (sourceThrottling == Index.UNKNOWN) {
if (sourceThrottling == ReplicationLuceneIndex.UNKNOWN) {
sourceThrottling = throttledOnSource;
} else {
sourceThrottling += throttledOnSource;
@@ -303,7 +303,7 @@
index.addRecoveredBytesToFile(file.name(), toRecover);
file.addRecoveredBytes(toRecover);
final long throttledOnTarget = rarely() ? randomIntBetween(10, 200) : 0;
if (targetThrottling == Index.UNKNOWN) {
if (targetThrottling == ReplicationLuceneIndex.UNKNOWN) {
targetThrottling = throttledOnTarget;
} else {
targetThrottling += throttledOnTarget;
@@ -325,7 +325,7 @@
logger.info("testing serialized information");
streamShouldStop.set(true);
backgroundReader.join();
final Index lastRead = backgroundReader.lastRead();
final ReplicationLuceneIndex lastRead = backgroundReader.lastRead();
assertThat(lastRead.fileDetails().toArray(), arrayContainingInAnyOrder(index.fileDetails().toArray()));
assertThat(lastRead.startTime(), equalTo(index.startTime()));
if (completeRecovery) {
@@ -543,12 +543,12 @@ }
}
public void testConcurrentModificationIndexFileDetailsMap() throws InterruptedException {
final Index index = new Index();
final ReplicationLuceneIndex index = new ReplicationLuceneIndex();
final AtomicBoolean stop = new AtomicBoolean(false);
Streamer<Index> readWriteIndex = new Streamer<Index>(stop, index) {
Streamer<ReplicationLuceneIndex> readWriteIndex = new Streamer<ReplicationLuceneIndex>(stop, index) {
@Override
Index createObj(StreamInput in) throws IOException {
return new Index(in);
ReplicationLuceneIndex createObj(StreamInput in) throws IOException {
return new ReplicationLuceneIndex(in);
}
};
Thread modifyThread = new Thread() {
@@ -568,14 +568,14 @@ }
}
public void testFileHashCodeAndEquals() {
FileDetail f = new FileDetail("foo", randomIntBetween(0, 100), randomBoolean());
FileDetail anotherFile = new FileDetail(f.name(), f.length(), f.reused());
FileMetadata f = new FileMetadata("foo", randomIntBetween(0, 100), randomBoolean());
FileMetadata anotherFile = new FileMetadata(f.name(), f.length(), f.reused());
assertEquals(f, anotherFile);
assertEquals(f.hashCode(), anotherFile.hashCode());
int iters = randomIntBetween(10, 100);
for (int i = 0; i < iters; i++) {
f = new FileDetail("foo", randomIntBetween(0, 100), randomBoolean());
anotherFile = new FileDetail(f.name(), randomIntBetween(0, 100), randomBoolean());
f = new FileMetadata("foo", randomIntBetween(0, 100), randomBoolean());
anotherFile = new FileMetadata(f.name(), randomIntBetween(0, 100), randomBoolean());
if (f.equals(anotherFile)) {
assertEquals(f.hashCode(), anotherFile.hashCode());
} else if (f.hashCode() != anotherFile.hashCode()) {

View File

@@ -68,6 +68,7 @@ import org.opensearch.index.snapshots.IndexShardSnapshotStatus;
import org.opensearch.index.store.Store;
import org.opensearch.indices.recovery.RecoverySettings;
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.indices.replication.common.ReplicationLuceneIndex;
import org.opensearch.repositories.IndexId;
import org.opensearch.repositories.blobstore.BlobStoreTestUtil;
import org.opensearch.snapshots.Snapshot;
@@ -203,12 +204,12 @@
futureC.actionGet();
assertEquals(secondState.getIndex().reusedFileCount(), commitFileNames.size() - 2);
assertEquals(secondState.getIndex().recoveredFileCount(), 2);
List<RecoveryState.FileDetail> recoveredFiles = secondState.getIndex()
List<ReplicationLuceneIndex.FileMetadata> recoveredFiles = secondState.getIndex()
.fileDetails()
.stream()
.filter(f -> f.reused() == false)
.collect(Collectors.toList());
Collections.sort(recoveredFiles, Comparator.comparing(RecoveryState.FileDetail::name));
Collections.sort(recoveredFiles, Comparator.comparing(ReplicationLuceneIndex.FileMetadata::name));
assertTrue(recoveredFiles.get(0).name(), recoveredFiles.get(0).name().endsWith(".liv"));
assertTrue(recoveredFiles.get(1).name(), recoveredFiles.get(1).name().endsWith("segments_" + incIndexCommit.getGeneration()));
} finally {

View File

@@ -45,6 +45,7 @@ import org.opensearch.common.xcontent.XContentOpenSearchExtension;
import org.opensearch.index.Index;
import org.opensearch.index.shard.ShardId;
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.indices.replication.common.ReplicationLuceneIndex;
import org.opensearch.indices.replication.common.ReplicationTimer;
import org.opensearch.test.OpenSearchTestCase;
@@ -91,21 +92,7 @@
when(targetNode.getHostName()).thenReturn(randomAlphaOfLength(8));
when(state.getTargetNode()).thenReturn(targetNode);
RecoveryState.Index index = mock(RecoveryState.Index.class);
final int totalRecoveredFiles = randomIntBetween(1, 64);
when(index.totalRecoverFiles()).thenReturn(totalRecoveredFiles);
final int recoveredFileCount = randomIntBetween(0, totalRecoveredFiles);
when(index.recoveredFileCount()).thenReturn(recoveredFileCount);
when(index.recoveredFilesPercent()).thenReturn((100f * recoveredFileCount) / totalRecoveredFiles);
when(index.totalFileCount()).thenReturn(randomIntBetween(totalRecoveredFiles, 2 * totalRecoveredFiles));
final int totalRecoveredBytes = randomIntBetween(1, 1 << 24);
when(index.totalRecoverBytes()).thenReturn((long) totalRecoveredBytes);
final int recoveredBytes = randomIntBetween(0, totalRecoveredBytes);
when(index.recoveredBytes()).thenReturn((long) recoveredBytes);
when(index.recoveredBytesPercent()).thenReturn((100f * recoveredBytes) / totalRecoveredBytes);
when(index.totalRecoverBytes()).thenReturn((long) randomIntBetween(totalRecoveredBytes, 2 * totalRecoveredBytes));
ReplicationLuceneIndex index = createTestIndex();
when(state.getIndex()).thenReturn(index);
final RecoveryState.Translog translog = mock(RecoveryState.Translog.class);
@@ -214,6 +201,36 @@ }
}
}
private ReplicationLuceneIndex createTestIndex() {
ReplicationLuceneIndex index = new ReplicationLuceneIndex();
final int filesToRecoverCount = randomIntBetween(1, 64);
final int recoveredFilesCount = randomIntBetween(0, filesToRecoverCount);
addTestFileMetadata(index, 0, recoveredFilesCount, false, true);
addTestFileMetadata(index, recoveredFilesCount, filesToRecoverCount, false, false);
final int totalFilesCount = randomIntBetween(filesToRecoverCount, 2 * filesToRecoverCount);
addTestFileMetadata(index, filesToRecoverCount, totalFilesCount, true, false);
return index;
}
private void addTestFileMetadata(ReplicationLuceneIndex index, int startIndex, int endIndex, boolean reused, boolean isFullyRecovered) {
for (int i = startIndex; i < endIndex; i++) {
final int completeFileSize = randomIntBetween(1, 1024);
index.addFileDetail(String.valueOf(i), completeFileSize, reused);
if (!reused) {
final int recoveredFileSize;
if (isFullyRecovered) {
recoveredFileSize = completeFileSize;
} else {
recoveredFileSize = randomIntBetween(0, completeFileSize);
}
index.addRecoveredBytesToFile(String.valueOf(i), recoveredFileSize);
}
}
}
private static String percent(float percent) {
return String.format(Locale.ROOT, "%1.1f%%", percent);
}