Include size of snapshot in snapshot metadata (#29602)

Adds the difference in the number of files (and in file sizes) between the previous and the current snapshot. The total number/size reflects the total number/size of files in the snapshot.

Closes #18543

parent dcff63e69c
commit 81eb8ba0f0
@@ -35,7 +35,7 @@ Elasticsearch 6.x in order to be readable by Elasticsearch 7.x.
 * <<breaking_70_java_changes>>
 * <<breaking_70_settings_changes>>
 * <<breaking_70_scripting_changes>>
+* <<breaking_70_snapshotstats_changes>>
 
 include::migrate_7_0/aggregations.asciidoc[]
 include::migrate_7_0/analysis.asciidoc[]
@@ -49,3 +49,4 @@ include::migrate_7_0/api.asciidoc[]
 include::migrate_7_0/java.asciidoc[]
 include::migrate_7_0/settings.asciidoc[]
 include::migrate_7_0/scripting.asciidoc[]
+include::migrate_7_0/snapshotstats.asciidoc[]
docs/reference/migration/migrate_7_0/snapshotstats.asciidoc (new file, 13 lines)
@@ -0,0 +1,13 @@
+[[breaking_70_snapshotstats_changes]]
+=== Snapshot stats changes
+
+Snapshot stats details are provided in a new structured way:
+
+* `total` section for all the files that are referenced by the snapshot.
+* `incremental` section for those files that actually needed to be copied over as part of the incremental snapshotting.
+* In case of a snapshot that's still in progress, there's also a `processed` section for files that are in the process of being copied.
+
+==== Deprecated `number_of_files`, `processed_files`, `total_size_in_bytes` and `processed_size_in_bytes` snapshot stats properties have been removed
+
+* Properties `number_of_files` and `total_size_in_bytes` are removed and should be replaced by values of nested object `total`.
+* Properties `processed_files` and `processed_size_in_bytes` are removed and should be replaced by values of nested object `processed`.
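For clients that consume the status response as raw JSON, the migration amounts to reading the nested objects instead of the removed flat properties. A minimal sketch, assuming Jackson is used for parsing (the `SnapshotStatsReader` class and `statsNode` argument are illustrative, not part of this commit):

[source,java]
--------------------------------------------------
import com.fasterxml.jackson.databind.JsonNode;

final class SnapshotStatsReader {
    // statsNode is the "stats" object of one entry in the response's "snapshots" array.
    static void readStats(JsonNode statsNode) {
        // 5.x/6.x: statsNode.path("number_of_files"), statsNode.path("total_size_in_bytes"), ...
        // 7.x: the same information lives in nested objects.
        JsonNode total = statsNode.path("total");
        JsonNode incremental = statsNode.path("incremental");
        int referencedFiles = total.path("file_count").asInt();       // replaces number_of_files
        long referencedBytes = total.path("size_in_bytes").asLong();  // replaces total_size_in_bytes
        int copiedFiles = incremental.path("file_count").asInt();     // files this snapshot had to copy
        long copiedBytes = incremental.path("size_in_bytes").asLong();
        System.out.printf("referenced: %d files / %d bytes; copied: %d files / %d bytes%n",
            referencedFiles, referencedBytes, copiedFiles, copiedBytes);
    }
}
--------------------------------------------------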
@@ -563,6 +563,62 @@ GET /_snapshot/my_backup/snapshot_1/_status
 // CONSOLE
 // TEST[continued]
 
+The output looks similar to the following:
+
+[source,js]
+--------------------------------------------------
+{
+  "snapshots": [
+    {
+      "snapshot": "snapshot_1",
+      "repository": "my_backup",
+      "uuid": "XuBo4l4ISYiVg0nYUen9zg",
+      "state": "SUCCESS",
+      "include_global_state": true,
+      "shards_stats": {
+        "initializing": 0,
+        "started": 0,
+        "finalizing": 0,
+        "done": 5,
+        "failed": 0,
+        "total": 5
+      },
+      "stats": {
+        "incremental": {
+          "file_count": 8,
+          "size_in_bytes": 4704
+        },
+        "processed": {
+          "file_count": 7,
+          "size_in_bytes": 4254
+        },
+        "total": {
+          "file_count": 8,
+          "size_in_bytes": 4704
+        },
+        "start_time_in_millis": 1526280280355,
+        "time_in_millis": 358,
+        "number_of_files": 8,
+        "processed_files": 8,
+        "total_size_in_bytes": 4704,
+        "processed_size_in_bytes": 4704
+      }
+    }
+  ]
+}
+--------------------------------------------------
+// TESTRESPONSE
+
+The output is composed of different sections. The `stats` sub-object provides details on the number and size of files that were
+snapshotted. As snapshots are incremental, copying only the Lucene segments that are not already in the repository,
+the `stats` object contains a `total` section for all the files that are referenced by the snapshot, as well as an `incremental` section
+for those files that actually needed to be copied over as part of the incremental snapshotting. In case of a snapshot that's still
+in progress, there's also a `processed` section that contains information about the files that are in the process of being copied.
+
+_Note_: Properties `number_of_files`, `processed_files`, `total_size_in_bytes` and `processed_size_in_bytes` are used for
+backward compatibility reasons with older 5.x and 6.x versions. These fields will be removed in Elasticsearch v7.0.0.
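Programmatic consumers can read the same sections through the Java client; the calls below mirror the integration test added later in this commit, assuming a connected transport `Client` and the repository/snapshot names from the example above:

[source,java]
--------------------------------------------------
SnapshotsStatusResponse response = client.admin().cluster()
    .prepareSnapshotStatus("my_backup")
    .setSnapshots("snapshot_1")
    .get();

SnapshotStats stats = response.getSnapshots().get(0).getStats();
long referencedBytes = stats.getTotalSize();    // "total" section: everything the snapshot references
long copiedBytes = stats.getIncrementalSize();  // "incremental" section: what this snapshot had to copy
int copiedFiles = stats.getIncrementalFileCount();
--------------------------------------------------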
 
 Multiple ids are also supported:
 
 [source,sh]
@@ -11,7 +11,9 @@ setup:
 
 ---
 "Get snapshot status":
-
+  - skip:
+      version: " - 6.99.99"
+      reason: "backporting in progress: https://github.com/elastic/elasticsearch/pull/29602"
   - do:
       indices.create:
         index: test_index
@@ -32,6 +34,42 @@ setup:
         snapshot: test_snapshot
 
   - is_true: snapshots
   - match: { snapshots.0.snapshot: test_snapshot }
   - match: { snapshots.0.state: SUCCESS }
+  - gt: { snapshots.0.stats.incremental.file_count: 0 }
+  - gt: { snapshots.0.stats.incremental.size_in_bytes: 0 }
+  - gt: { snapshots.0.stats.total.file_count: 0 }
+  - is_true: snapshots.0.stats.start_time_in_millis
+  - is_true: snapshots.0.stats.time_in_millis
+
+---
+"Get snapshot status with BWC fields":
+  - do:
+      indices.create:
+        index: test_index
+        body:
+          settings:
+            number_of_shards: 1
+            number_of_replicas: 0
+
+  - do:
+      snapshot.create:
+        repository: test_repo_status_1
+        snapshot: test_snapshot_bwc
+        wait_for_completion: true
+
+  - do:
+      snapshot.status:
+        repository: test_repo_status_1
+        snapshot: test_snapshot_bwc
+
+  - is_true: snapshots
+  - match: { snapshots.0.snapshot: test_snapshot_bwc }
+  - match: { snapshots.0.state: SUCCESS }
+  - gt: { snapshots.0.stats.number_of_files: 0 }
+  - gt: { snapshots.0.stats.processed_files: 0 }
+  - gt: { snapshots.0.stats.total_size_in_bytes: 0 }
+  - gt: { snapshots.0.stats.processed_size_in_bytes: 0 }
+
 ---
 "Get missing snapshot status throws an exception":
@@ -74,8 +74,8 @@ public class SnapshotIndexShardStatus extends BroadcastShardResponse implements
             throw new IllegalArgumentException("Unknown stage type " + indexShardStatus.getStage());
         }
         this.stats = new SnapshotStats(indexShardStatus.getStartTime(), indexShardStatus.getTotalTime(),
-            indexShardStatus.getNumberOfFiles(), indexShardStatus.getProcessedFiles(),
-            indexShardStatus.getTotalSize(), indexShardStatus.getProcessedSize());
+            indexShardStatus.getIncrementalFileCount(), indexShardStatus.getTotalFileCount(), indexShardStatus.getProcessedFileCount(),
+            indexShardStatus.getIncrementalSize(), indexShardStatus.getTotalSize(), indexShardStatus.getProcessedSize());
         this.failure = indexShardStatus.getFailure();
         this.nodeId = nodeId;
     }
@@ -19,6 +19,7 @@
 
 package org.elasticsearch.action.admin.cluster.snapshots.status;
 
+import org.elasticsearch.Version;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.io.stream.Streamable;
@@ -34,19 +35,25 @@ public class SnapshotStats implements Streamable, ToXContentFragment {
 
     private long startTime;
     private long time;
-    private int numberOfFiles;
-    private int processedFiles;
+    private int incrementalFileCount;
+    private int totalFileCount;
+    private int processedFileCount;
+    private long incrementalSize;
     private long totalSize;
     private long processedSize;
 
     SnapshotStats() {
     }
 
-    SnapshotStats(long startTime, long time, int numberOfFiles, int processedFiles, long totalSize, long processedSize) {
+    SnapshotStats(long startTime, long time,
+                  int incrementalFileCount, int totalFileCount, int processedFileCount,
+                  long incrementalSize, long totalSize, long processedSize) {
         this.startTime = startTime;
         this.time = time;
-        this.numberOfFiles = numberOfFiles;
-        this.processedFiles = processedFiles;
+        this.incrementalFileCount = incrementalFileCount;
+        this.totalFileCount = totalFileCount;
+        this.processedFileCount = processedFileCount;
+        this.incrementalSize = incrementalSize;
         this.totalSize = totalSize;
         this.processedSize = processedSize;
     }
@@ -66,17 +73,31 @@ public class SnapshotStats implements Streamable, ToXContentFragment {
     }
 
     /**
-     * Returns number of files in the snapshot
+     * Returns incremental file count of the snapshot
      */
-    public int getNumberOfFiles() {
-        return numberOfFiles;
+    public int getIncrementalFileCount() {
+        return incrementalFileCount;
     }
 
+    /**
+     * Returns total number of files in the snapshot
+     */
+    public int getTotalFileCount() {
+        return totalFileCount;
+    }
+
     /**
      * Returns number of files in the snapshot that were processed so far
      */
-    public int getProcessedFiles() {
-        return processedFiles;
+    public int getProcessedFileCount() {
+        return processedFileCount;
     }
 
+    /**
+     * Return incremental files size of the snapshot
+     */
+    public long getIncrementalSize() {
+        return incrementalSize;
+    }
+
     /**
@@ -105,11 +126,16 @@ public class SnapshotStats implements Streamable, ToXContentFragment {
         out.writeVLong(startTime);
         out.writeVLong(time);
 
-        out.writeVInt(numberOfFiles);
-        out.writeVInt(processedFiles);
+        out.writeVInt(incrementalFileCount);
+        out.writeVInt(processedFileCount);
 
-        out.writeVLong(totalSize);
+        out.writeVLong(incrementalSize);
         out.writeVLong(processedSize);
+
+        if (out.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) {
+            out.writeVInt(totalFileCount);
+            out.writeVLong(totalSize);
+        }
     }
 
     @Override
@@ -117,47 +143,92 @@ public class SnapshotStats implements Streamable, ToXContentFragment {
         startTime = in.readVLong();
         time = in.readVLong();
 
-        numberOfFiles = in.readVInt();
-        processedFiles = in.readVInt();
+        incrementalFileCount = in.readVInt();
+        processedFileCount = in.readVInt();
 
-        totalSize = in.readVLong();
+        incrementalSize = in.readVLong();
         processedSize = in.readVLong();
+
+        if (in.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) {
+            totalFileCount = in.readVInt();
+            totalSize = in.readVLong();
+        } else {
+            totalFileCount = incrementalFileCount;
+            totalSize = incrementalSize;
+        }
     }
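The wire format stays readable across mixed-version clusters: the incremental values reuse the stream slots that previously carried `numberOfFiles` and `totalSize` (which old nodes populated with the copied-files numbers anyway, per the commit message), and the genuinely new totals are appended only for 7.0.0-alpha1+ peers. A condensed, comment-annotated view of the read path above, as a sketch (it assumes the surrounding `SnapshotStats` fields and is not a standalone unit):

[source,java]
--------------------------------------------------
public void readFrom(StreamInput in) throws IOException {
    startTime = in.readVLong();
    time = in.readVLong();
    incrementalFileCount = in.readVInt();  // old "numberOfFiles" slot
    processedFileCount = in.readVInt();    // old "processedFiles" slot
    incrementalSize = in.readVLong();      // old "totalSize" slot
    processedSize = in.readVLong();
    if (in.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) {
        totalFileCount = in.readVInt();    // appended fields, new peers only
        totalSize = in.readVLong();
    } else {
        totalFileCount = incrementalFileCount;  // best effort from a 6.x peer
        totalSize = incrementalSize;
    }
}
--------------------------------------------------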
 
     static final class Fields {
         static final String STATS = "stats";
-        static final String NUMBER_OF_FILES = "number_of_files";
-        static final String PROCESSED_FILES = "processed_files";
-        static final String TOTAL_SIZE_IN_BYTES = "total_size_in_bytes";
-        static final String TOTAL_SIZE = "total_size";
-        static final String PROCESSED_SIZE_IN_BYTES = "processed_size_in_bytes";
-        static final String PROCESSED_SIZE = "processed_size";
+
+        static final String INCREMENTAL = "incremental";
+        static final String PROCESSED = "processed";
+        static final String TOTAL = "total";
+
+        static final String FILE_COUNT = "file_count";
+        static final String SIZE = "size";
+        static final String SIZE_IN_BYTES = "size_in_bytes";
+
         static final String START_TIME_IN_MILLIS = "start_time_in_millis";
         static final String TIME_IN_MILLIS = "time_in_millis";
         static final String TIME = "time";
+
+        // BWC
+        static final String NUMBER_OF_FILES = "number_of_files";
+        static final String PROCESSED_FILES = "processed_files";
+        static final String TOTAL_SIZE = "total_size";
+        static final String TOTAL_SIZE_IN_BYTES = "total_size_in_bytes";
+        static final String PROCESSED_SIZE_IN_BYTES = "processed_size_in_bytes";
+        static final String PROCESSED_SIZE = "processed_size";
     }
 
     @Override
     public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException {
-        builder.startObject(Fields.STATS);
-        builder.field(Fields.NUMBER_OF_FILES, getNumberOfFiles());
-        builder.field(Fields.PROCESSED_FILES, getProcessedFiles());
-        builder.humanReadableField(Fields.TOTAL_SIZE_IN_BYTES, Fields.TOTAL_SIZE, new ByteSizeValue(getTotalSize()));
-        builder.humanReadableField(Fields.PROCESSED_SIZE_IN_BYTES, Fields.PROCESSED_SIZE, new ByteSizeValue(getProcessedSize()));
-        builder.field(Fields.START_TIME_IN_MILLIS, getStartTime());
-        builder.humanReadableField(Fields.TIME_IN_MILLIS, Fields.TIME, new TimeValue(getTime()));
-        builder.endObject();
-        return builder;
+        builder.startObject(Fields.STATS)
+            // incremental starts
+            .startObject(Fields.INCREMENTAL)
+            .field(Fields.FILE_COUNT, getIncrementalFileCount())
+            .humanReadableField(Fields.SIZE_IN_BYTES, Fields.SIZE, new ByteSizeValue(getIncrementalSize()))
+            // incremental ends
+            .endObject();
+
+        if (getProcessedFileCount() != getIncrementalFileCount()) {
+            // processed starts
+            builder.startObject(Fields.PROCESSED)
+                .field(Fields.FILE_COUNT, getProcessedFileCount())
+                .humanReadableField(Fields.SIZE_IN_BYTES, Fields.SIZE, new ByteSizeValue(getProcessedSize()))
+                // processed ends
+                .endObject();
+        }
+        // total starts
+        builder.startObject(Fields.TOTAL)
+            .field(Fields.FILE_COUNT, getTotalFileCount())
+            .humanReadableField(Fields.SIZE_IN_BYTES, Fields.SIZE, new ByteSizeValue(getTotalSize()))
+            // total ends
+            .endObject();
+        // timings stats
+        builder.field(Fields.START_TIME_IN_MILLIS, getStartTime())
+            .humanReadableField(Fields.TIME_IN_MILLIS, Fields.TIME, new TimeValue(getTime()));
+
+        // BWC part
+        return builder.field(Fields.NUMBER_OF_FILES, getIncrementalFileCount())
+            .field(Fields.PROCESSED_FILES, getProcessedFileCount())
+            .humanReadableField(Fields.TOTAL_SIZE_IN_BYTES, Fields.TOTAL_SIZE, new ByteSizeValue(getIncrementalSize()))
+            .humanReadableField(Fields.PROCESSED_SIZE_IN_BYTES, Fields.PROCESSED_SIZE, new ByteSizeValue(getProcessedSize()))
+            // BWC part ends
+            .endObject();
     }
 
     void add(SnapshotStats stats) {
-        numberOfFiles += stats.numberOfFiles;
-        processedFiles += stats.processedFiles;
+        incrementalFileCount += stats.incrementalFileCount;
+        totalFileCount += stats.totalFileCount;
+        processedFileCount += stats.processedFileCount;
 
+        incrementalSize += stats.incrementalSize;
         totalSize += stats.totalSize;
         processedSize += stats.processedSize;
 
-
         if (startTime == 0) {
             // First time here
             startTime = stats.startTime;
@@ -60,31 +60,39 @@ public class IndexShardSnapshotStatus {
 
     private final AtomicReference<Stage> stage;
     private long startTime;
     private long totalTime;
-    private int numberOfFiles;
-    private int processedFiles;
+    private int incrementalFileCount;
+    private int totalFileCount;
+    private int processedFileCount;
     private long totalSize;
+    private long incrementalSize;
     private long processedSize;
     private long indexVersion;
     private String failure;
 
     private IndexShardSnapshotStatus(final Stage stage, final long startTime, final long totalTime,
-                                     final int numberOfFiles, final int processedFiles, final long totalSize, final long processedSize,
+                                     final int incrementalFileCount, final int totalFileCount, final int processedFileCount,
+                                     final long incrementalSize, final long totalSize, final long processedSize,
                                      final long indexVersion, final String failure) {
         this.stage = new AtomicReference<>(Objects.requireNonNull(stage));
         this.startTime = startTime;
         this.totalTime = totalTime;
-        this.numberOfFiles = numberOfFiles;
-        this.processedFiles = processedFiles;
+        this.incrementalFileCount = incrementalFileCount;
+        this.totalFileCount = totalFileCount;
+        this.processedFileCount = processedFileCount;
         this.totalSize = totalSize;
         this.processedSize = processedSize;
+        this.incrementalSize = incrementalSize;
         this.indexVersion = indexVersion;
         this.failure = failure;
     }
 
-    public synchronized Copy moveToStarted(final long startTime, final int numberOfFiles, final long totalSize) {
+    public synchronized Copy moveToStarted(final long startTime, final int incrementalFileCount, final int totalFileCount,
+                                           final long incrementalSize, final long totalSize) {
         if (stage.compareAndSet(Stage.INIT, Stage.STARTED)) {
             this.startTime = startTime;
-            this.numberOfFiles = numberOfFiles;
+            this.incrementalFileCount = incrementalFileCount;
+            this.totalFileCount = totalFileCount;
+            this.incrementalSize = incrementalSize;
             this.totalSize = totalSize;
         } else {
             throw new IllegalStateException("Unable to move the shard snapshot status to [STARTED]: " +
@@ -135,7 +143,7 @@ public class IndexShardSnapshotStatus {
      * Increments number of processed files
      */
     public synchronized void addProcessedFile(long size) {
-        processedFiles++;
+        processedFileCount++;
         processedSize += size;
     }
 
@@ -146,12 +154,14 @@ public class IndexShardSnapshotStatus {
      * @return a {@link IndexShardSnapshotStatus.Copy}
      */
     public synchronized IndexShardSnapshotStatus.Copy asCopy() {
-        return new IndexShardSnapshotStatus.Copy(stage.get(), startTime, totalTime, numberOfFiles, processedFiles, totalSize, processedSize,
-            indexVersion, failure);
+        return new IndexShardSnapshotStatus.Copy(stage.get(), startTime, totalTime,
+            incrementalFileCount, totalFileCount, processedFileCount,
+            incrementalSize, totalSize, processedSize,
+            indexVersion, failure);
     }
 
     public static IndexShardSnapshotStatus newInitializing() {
-        return new IndexShardSnapshotStatus(Stage.INIT, 0L, 0L, 0, 0, 0, 0, 0, null);
+        return new IndexShardSnapshotStatus(Stage.INIT, 0L, 0L, 0, 0, 0, 0, 0, 0, 0, null);
     }
 
     public static IndexShardSnapshotStatus newFailed(final String failure) {
@@ -159,12 +169,15 @@ public class IndexShardSnapshotStatus {
         if (failure == null) {
             throw new IllegalArgumentException("A failure description is required for a failed IndexShardSnapshotStatus");
         }
-        return new IndexShardSnapshotStatus(Stage.FAILURE, 0L, 0L, 0, 0, 0, 0, 0, failure);
+        return new IndexShardSnapshotStatus(Stage.FAILURE, 0L, 0L, 0, 0, 0, 0, 0, 0, 0, failure);
    }
 
-    public static IndexShardSnapshotStatus newDone(final long startTime, final long totalTime, final int files, final long size) {
+    public static IndexShardSnapshotStatus newDone(final long startTime, final long totalTime,
+                                                   final int incrementalFileCount, final int fileCount,
+                                                   final long incrementalSize, final long size) {
         // The snapshot is done which means the number of processed files is the same as total
-        return new IndexShardSnapshotStatus(Stage.DONE, startTime, totalTime, files, files, size, size, 0, null);
+        return new IndexShardSnapshotStatus(Stage.DONE, startTime, totalTime, incrementalFileCount, fileCount, incrementalFileCount,
+            incrementalSize, size, incrementalSize, 0, null);
     }
 
     /**
@@ -175,23 +188,28 @@ public class IndexShardSnapshotStatus {
         private final Stage stage;
         private final long startTime;
         private final long totalTime;
-        private final int numberOfFiles;
-        private final int processedFiles;
+        private final int incrementalFileCount;
+        private final int totalFileCount;
+        private final int processedFileCount;
         private final long totalSize;
         private final long processedSize;
+        private final long incrementalSize;
         private final long indexVersion;
         private final String failure;
 
         public Copy(final Stage stage, final long startTime, final long totalTime,
-                    final int numberOfFiles, final int processedFiles, final long totalSize, final long processedSize,
+                    final int incrementalFileCount, final int totalFileCount, final int processedFileCount,
+                    final long incrementalSize, final long totalSize, final long processedSize,
                    final long indexVersion, final String failure) {
             this.stage = stage;
             this.startTime = startTime;
             this.totalTime = totalTime;
-            this.numberOfFiles = numberOfFiles;
-            this.processedFiles = processedFiles;
+            this.incrementalFileCount = incrementalFileCount;
+            this.totalFileCount = totalFileCount;
+            this.processedFileCount = processedFileCount;
             this.totalSize = totalSize;
             this.processedSize = processedSize;
+            this.incrementalSize = incrementalSize;
             this.indexVersion = indexVersion;
             this.failure = failure;
         }
@@ -208,12 +226,20 @@ public class IndexShardSnapshotStatus {
             return totalTime;
         }
 
-        public int getNumberOfFiles() {
-            return numberOfFiles;
+        public int getIncrementalFileCount() {
+            return incrementalFileCount;
         }
 
-        public int getProcessedFiles() {
-            return processedFiles;
+        public int getTotalFileCount() {
+            return totalFileCount;
         }
 
+        public int getProcessedFileCount() {
+            return processedFileCount;
+        }
+
+        public long getIncrementalSize() {
+            return incrementalSize;
+        }
+
         public long getTotalSize() {
@@ -238,8 +264,10 @@ public class IndexShardSnapshotStatus {
                 "stage=" + stage +
                 ", startTime=" + startTime +
                 ", totalTime=" + totalTime +
-                ", numberOfFiles=" + numberOfFiles +
-                ", processedFiles=" + processedFiles +
+                ", incrementalFileCount=" + incrementalFileCount +
+                ", totalFileCount=" + totalFileCount +
+                ", processedFileCount=" + processedFileCount +
+                ", incrementalSize=" + incrementalSize +
                 ", totalSize=" + totalSize +
                 ", processedSize=" + processedSize +
                 ", indexVersion=" + indexVersion +
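Taken together, the new signatures give the shard status a simple lifecycle. An illustrative driver using only the public methods shown above (the file counts and sizes are made up):

[source,java]
--------------------------------------------------
IndexShardSnapshotStatus status = IndexShardSnapshotStatus.newInitializing();

// Say 3 of 10 files (3,000 of 10,000 bytes) are missing from the repository.
status.moveToStarted(System.currentTimeMillis(),
    3, 10,             // incrementalFileCount, totalFileCount
    3_000L, 10_000L);  // incrementalSize, totalSize

status.addProcessedFile(1_000L);  // called once per file actually copied

IndexShardSnapshotStatus.Copy copy = status.asCopy();
assert copy.getProcessedFileCount() == 1;
assert copy.getIncrementalFileCount() == 3;
assert copy.getTotalFileCount() == 10;
--------------------------------------------------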
@@ -356,25 +356,28 @@ public class BlobStoreIndexShardSnapshot implements ToXContentFragment {
 
     private final long time;
 
-    private final int numberOfFiles;
+    private final int incrementalFileCount;
 
-    private final long totalSize;
+    private final long incrementalSize;
 
     private final List<FileInfo> indexFiles;
 
     /**
      * Constructs new shard snapshot metadata from snapshot metadata
      *
-     * @param snapshot      snapshot id
-     * @param indexVersion  index version
-     * @param indexFiles    list of files in the shard
-     * @param startTime     snapshot start time
-     * @param time          snapshot running time
-     * @param numberOfFiles number of files that where snapshotted
-     * @param totalSize     total size of all files snapshotted
+     * @param snapshot              snapshot id
+     * @param indexVersion          index version
+     * @param indexFiles            list of files in the shard
+     * @param startTime             snapshot start time
+     * @param time                  snapshot running time
+     * @param incrementalFileCount  incremental of files that were snapshotted
+     * @param incrementalSize       incremental size of snapshot
      */
-    public BlobStoreIndexShardSnapshot(String snapshot, long indexVersion, List<FileInfo> indexFiles, long startTime, long time,
-                                       int numberOfFiles, long totalSize) {
+    public BlobStoreIndexShardSnapshot(String snapshot, long indexVersion, List<FileInfo> indexFiles,
+                                       long startTime, long time,
+                                       int incrementalFileCount,
+                                       long incrementalSize
+    ) {
         assert snapshot != null;
         assert indexVersion >= 0;
         this.snapshot = snapshot;
@@ -382,8 +385,8 @@ public class BlobStoreIndexShardSnapshot implements ToXContentFragment {
         this.indexFiles = Collections.unmodifiableList(new ArrayList<>(indexFiles));
         this.startTime = startTime;
         this.time = time;
-        this.numberOfFiles = numberOfFiles;
-        this.totalSize = totalSize;
+        this.incrementalFileCount = incrementalFileCount;
+        this.incrementalSize = incrementalSize;
     }
 
     /**
@@ -395,8 +398,8 @@ public class BlobStoreIndexShardSnapshot implements ToXContentFragment {
         this.indexFiles = Collections.emptyList();
         this.startTime = 0;
         this.time = 0;
-        this.numberOfFiles = 0;
-        this.totalSize = 0;
+        this.incrementalFileCount = 0;
+        this.incrementalSize = 0;
     }
 
     /**
@@ -441,34 +444,51 @@ public class BlobStoreIndexShardSnapshot implements ToXContentFragment {
     }
 
     /**
-     * Returns number of files that where snapshotted
+     * Returns incremental of files that were snapshotted
      */
-    public int numberOfFiles() {
-        return numberOfFiles;
+    public int incrementalFileCount() {
+        return incrementalFileCount;
     }
 
+    /**
+     * Returns total number of files that are referenced by this snapshot
+     */
+    public int totalFileCount() {
+        return indexFiles.size();
+    }
+
+    /**
+     * Returns incremental of files size that were snapshotted
+     */
+    public long incrementalSize() {
+        return incrementalSize;
+    }
+
     /**
      * Returns total size of all files that where snapshotted
      */
     public long totalSize() {
-        return totalSize;
+        return indexFiles.stream().mapToLong(fi -> fi.metadata().length()).sum();
     }
 
     private static final String NAME = "name";
     private static final String INDEX_VERSION = "index_version";
     private static final String START_TIME = "start_time";
     private static final String TIME = "time";
-    private static final String NUMBER_OF_FILES = "number_of_files";
-    private static final String TOTAL_SIZE = "total_size";
+    // for the sake of BWC keep the actual property names as in 6.x
+    // + there is a constraint in #fromXContent() that leads to ElasticsearchParseException("unknown parameter [incremental_file_count]");
+    private static final String INCREMENTAL_FILE_COUNT = "number_of_files";
+    private static final String INCREMENTAL_SIZE = "total_size";
     private static final String FILES = "files";
 
-    private static final ParseField PARSE_NAME = new ParseField("name");
-    private static final ParseField PARSE_INDEX_VERSION = new ParseField("index_version", "index-version");
-    private static final ParseField PARSE_START_TIME = new ParseField("start_time");
-    private static final ParseField PARSE_TIME = new ParseField("time");
-    private static final ParseField PARSE_NUMBER_OF_FILES = new ParseField("number_of_files");
-    private static final ParseField PARSE_TOTAL_SIZE = new ParseField("total_size");
-    private static final ParseField PARSE_FILES = new ParseField("files");
+    private static final ParseField PARSE_NAME = new ParseField(NAME);
+    private static final ParseField PARSE_INDEX_VERSION = new ParseField(INDEX_VERSION, "index-version");
+    private static final ParseField PARSE_START_TIME = new ParseField(START_TIME);
+    private static final ParseField PARSE_TIME = new ParseField(TIME);
+    private static final ParseField PARSE_INCREMENTAL_FILE_COUNT = new ParseField(INCREMENTAL_FILE_COUNT);
+    private static final ParseField PARSE_INCREMENTAL_SIZE = new ParseField(INCREMENTAL_SIZE);
+    private static final ParseField PARSE_FILES = new ParseField(FILES);
 
     /**
      * Serializes shard snapshot metadata info into JSON
@@ -482,8 +502,8 @@ public class BlobStoreIndexShardSnapshot implements ToXContentFragment {
         builder.field(INDEX_VERSION, indexVersion);
         builder.field(START_TIME, startTime);
         builder.field(TIME, time);
-        builder.field(NUMBER_OF_FILES, numberOfFiles);
-        builder.field(TOTAL_SIZE, totalSize);
+        builder.field(INCREMENTAL_FILE_COUNT, incrementalFileCount);
+        builder.field(INCREMENTAL_SIZE, incrementalSize);
         builder.startArray(FILES);
         for (FileInfo fileInfo : indexFiles) {
             FileInfo.toXContent(fileInfo, builder, params);
@@ -503,8 +523,8 @@ public class BlobStoreIndexShardSnapshot implements ToXContentFragment {
         long indexVersion = -1;
         long startTime = 0;
         long time = 0;
-        int numberOfFiles = 0;
-        long totalSize = 0;
+        int incrementalFileCount = 0;
+        long incrementalSize = 0;
 
         List<FileInfo> indexFiles = new ArrayList<>();
         if (parser.currentToken() == null) { // fresh parser? move to the first token
@@ -526,10 +546,10 @@ public class BlobStoreIndexShardSnapshot implements ToXContentFragment {
                     startTime = parser.longValue();
                 } else if (PARSE_TIME.match(currentFieldName, parser.getDeprecationHandler())) {
                     time = parser.longValue();
-                } else if (PARSE_NUMBER_OF_FILES.match(currentFieldName, parser.getDeprecationHandler())) {
-                    numberOfFiles = parser.intValue();
-                } else if (PARSE_TOTAL_SIZE.match(currentFieldName, parser.getDeprecationHandler())) {
-                    totalSize = parser.longValue();
+                } else if (PARSE_INCREMENTAL_FILE_COUNT.match(currentFieldName, parser.getDeprecationHandler())) {
+                    incrementalFileCount = parser.intValue();
+                } else if (PARSE_INCREMENTAL_SIZE.match(currentFieldName, parser.getDeprecationHandler())) {
+                    incrementalSize = parser.longValue();
                 } else {
                     throw new ElasticsearchParseException("unknown parameter [{}]", currentFieldName);
                 }
@@ -549,7 +569,8 @@ public class BlobStoreIndexShardSnapshot implements ToXContentFragment {
             }
         }
 
-        return new BlobStoreIndexShardSnapshot(snapshot, indexVersion, Collections.unmodifiableList(indexFiles),
-                                               startTime, time, numberOfFiles, totalSize);
+        return new BlobStoreIndexShardSnapshot(snapshot, indexVersion, Collections.unmodifiableList(indexFiles),
+                                               startTime, time, incrementalFileCount, incrementalSize);
     }
 }
@@ -818,7 +818,9 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
     public IndexShardSnapshotStatus getShardSnapshotStatus(SnapshotId snapshotId, Version version, IndexId indexId, ShardId shardId) {
         Context context = new Context(snapshotId, version, indexId, shardId);
         BlobStoreIndexShardSnapshot snapshot = context.loadSnapshot();
-        return IndexShardSnapshotStatus.newDone(snapshot.startTime(), snapshot.time(), snapshot.numberOfFiles(), snapshot.totalSize());
+        return IndexShardSnapshotStatus.newDone(snapshot.startTime(), snapshot.time(),
+            snapshot.incrementalFileCount(), snapshot.totalFileCount(),
+            snapshot.incrementalSize(), snapshot.totalSize());
     }
 
     @Override
@@ -1139,9 +1141,11 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
             final List<BlobStoreIndexShardSnapshot.FileInfo> indexCommitPointFiles = new ArrayList<>();
 
             store.incRef();
+            int indexIncrementalFileCount = 0;
+            int indexTotalNumberOfFiles = 0;
+            long indexIncrementalSize = 0;
+            long indexTotalFileCount = 0;
             try {
-                int indexNumberOfFiles = 0;
-                long indexTotalFilesSize = 0;
                 ArrayList<BlobStoreIndexShardSnapshot.FileInfo> filesToSnapshot = new ArrayList<>();
                 final Store.MetadataSnapshot metadata;
                 // TODO apparently we don't use the MetadataSnapshot#.recoveryDiff(...) here but we should
@@ -1182,9 +1186,13 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
                         }
                     }
                 }
 
+                indexTotalFileCount += md.length();
+                indexTotalNumberOfFiles++;
+
                 if (existingFileInfo == null) {
-                    indexNumberOfFiles++;
-                    indexTotalFilesSize += md.length();
+                    indexIncrementalFileCount++;
+                    indexIncrementalSize += md.length();
                     // create a new FileInfo
                     BlobStoreIndexShardSnapshot.FileInfo snapshotFileInfo = new BlobStoreIndexShardSnapshot.FileInfo(fileNameFromGeneration(++generation), md, chunkSize());
                     indexCommitPointFiles.add(snapshotFileInfo);
@@ -1194,7 +1202,8 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
                 }
             }
 
-            snapshotStatus.moveToStarted(startTime, indexNumberOfFiles, indexTotalFilesSize);
+            snapshotStatus.moveToStarted(startTime, indexIncrementalFileCount,
+                indexTotalNumberOfFiles, indexIncrementalSize, indexTotalFileCount);
 
             for (BlobStoreIndexShardSnapshot.FileInfo snapshotFileInfo : filesToSnapshot) {
                 try {
@@ -1217,8 +1226,9 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
                 // snapshotStatus.startTime() is assigned on the same machine,
                 // so it's safe to use with VLong
                 System.currentTimeMillis() - lastSnapshotStatus.getStartTime(),
-                lastSnapshotStatus.getNumberOfFiles(),
-                lastSnapshotStatus.getTotalSize());
+                lastSnapshotStatus.getIncrementalFileCount(),
+                lastSnapshotStatus.getIncrementalSize()
+            );
 
             //TODO: The time stored in snapshot doesn't include cleanup time.
             logger.trace("[{}] [{}] writing shard snapshot file", shardId, snapshotId);
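The accounting in the loop above boils down to a set difference against the repository contents: every file in the commit point counts toward the totals, and only files missing from the repository count toward the incremental numbers. A self-contained sketch of that logic, with made-up file names and sizes:

[source,java]
--------------------------------------------------
import java.util.Map;

final class IncrementalAccounting {
    public static void main(String[] args) {
        // Files already present in the repository (from earlier snapshots).
        Map<String, Long> repository = Map.of("_0.cfs", 3000L, "segments_1", 200L);
        // Files referenced by the commit point being snapshotted.
        Map<String, Long> commitPoint = Map.of("_0.cfs", 3000L, "_1.cfs", 1500L, "segments_2", 210L);

        int totalFileCount = 0, incrementalFileCount = 0;
        long totalSize = 0, incrementalSize = 0;
        for (Map.Entry<String, Long> file : commitPoint.entrySet()) {
            totalFileCount++;                         // every file counts toward the totals
            totalSize += file.getValue();
            if (!repository.containsKey(file.getKey())) {
                incrementalFileCount++;               // only missing files have to be copied
                incrementalSize += file.getValue();
            }
        }
        // total: 3 files / 4710 bytes; incremental: 2 files / 1710 bytes
        System.out.println(totalFileCount + " / " + totalSize
            + " vs " + incrementalFileCount + " / " + incrementalSize);
    }
}
--------------------------------------------------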
@@ -65,11 +65,11 @@ public class SnapshotBlocksIT extends ESIntegTestCase {
             client().prepareIndex(OTHER_INDEX_NAME, "type").setSource("test", "init").execute().actionGet();
         }
 
         logger.info("--> register a repository");
 
         assertAcked(client().admin().cluster().preparePutRepository(REPOSITORY_NAME)
             .setType("fs")
-                .setSettings(Settings.builder().put("location", randomRepoPath())));
+            .setSettings(Settings.builder().put("location", randomRepoPath())));
 
         logger.info("--> verify the repository");
         VerifyRepositoryResponse verifyResponse = client().admin().cluster().prepareVerifyRepository(REPOSITORY_NAME).get();
@@ -91,12 +91,20 @@ public class SnapshotStatusTests extends ESTestCase {
             " \"total\" : " + totalShards + "\n" +
             " },\n" +
             " \"stats\" : {\n" +
+            " \"incremental\" : {\n" +
+            " \"file_count\" : 0,\n" +
+            " \"size_in_bytes\" : 0\n" +
+            " },\n" +
+            " \"total\" : {\n" +
+            " \"file_count\" : 0,\n" +
+            " \"size_in_bytes\" : 0\n" +
+            " },\n" +
+            " \"start_time_in_millis\" : 0,\n" +
+            " \"time_in_millis\" : 0,\n" +
             " \"number_of_files\" : 0,\n" +
             " \"processed_files\" : 0,\n" +
             " \"total_size_in_bytes\" : 0,\n" +
-            " \"processed_size_in_bytes\" : 0,\n" +
-            " \"start_time_in_millis\" : 0,\n" +
-            " \"time_in_millis\" : 0\n" +
+            " \"processed_size_in_bytes\" : 0\n" +
             " },\n" +
             " \"indices\" : {\n" +
             " \"" + indexName + "\" : {\n" +
@@ -109,23 +117,39 @@ public class SnapshotStatusTests extends ESTestCase {
             " \"total\" : " + totalShards + "\n" +
             " },\n" +
             " \"stats\" : {\n" +
+            " \"incremental\" : {\n" +
+            " \"file_count\" : 0,\n" +
+            " \"size_in_bytes\" : 0\n" +
+            " },\n" +
+            " \"total\" : {\n" +
+            " \"file_count\" : 0,\n" +
+            " \"size_in_bytes\" : 0\n" +
+            " },\n" +
+            " \"start_time_in_millis\" : 0,\n" +
+            " \"time_in_millis\" : 0,\n" +
             " \"number_of_files\" : 0,\n" +
             " \"processed_files\" : 0,\n" +
             " \"total_size_in_bytes\" : 0,\n" +
-            " \"processed_size_in_bytes\" : 0,\n" +
-            " \"start_time_in_millis\" : 0,\n" +
-            " \"time_in_millis\" : 0\n" +
+            " \"processed_size_in_bytes\" : 0\n" +
             " },\n" +
             " \"shards\" : {\n" +
             " \"" + shardId + "\" : {\n" +
             " \"stage\" : \"" + shardStage.toString() + "\",\n" +
             " \"stats\" : {\n" +
+            " \"incremental\" : {\n" +
+            " \"file_count\" : 0,\n" +
+            " \"size_in_bytes\" : 0\n" +
+            " },\n" +
+            " \"total\" : {\n" +
+            " \"file_count\" : 0,\n" +
+            " \"size_in_bytes\" : 0\n" +
+            " },\n" +
+            " \"start_time_in_millis\" : 0,\n" +
+            " \"time_in_millis\" : 0,\n" +
             " \"number_of_files\" : 0,\n" +
             " \"processed_files\" : 0,\n" +
             " \"total_size_in_bytes\" : 0,\n" +
-            " \"processed_size_in_bytes\" : 0,\n" +
-            " \"start_time_in_millis\" : 0,\n" +
-            " \"time_in_millis\" : 0\n" +
+            " \"processed_size_in_bytes\" : 0\n" +
             " }\n" +
             " }\n" +
             " }\n" +
@@ -23,10 +23,12 @@ import com.carrotsearch.hppc.IntHashSet;
 import com.carrotsearch.hppc.IntSet;
 import org.elasticsearch.action.ActionFuture;
 import org.elasticsearch.action.admin.cluster.repositories.put.PutRepositoryResponse;
 import org.elasticsearch.action.admin.cluster.repositories.verify.VerifyRepositoryResponse;
 import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
 import org.elasticsearch.action.admin.cluster.snapshots.delete.DeleteSnapshotResponse;
 import org.elasticsearch.action.admin.cluster.snapshots.get.GetSnapshotsResponse;
 import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
+import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotStats;
+import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotStatus;
 import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotsStatusResponse;
 import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
@@ -83,7 +85,12 @@ import org.elasticsearch.test.TestCustomMetaData;
 import org.elasticsearch.test.rest.FakeRestRequest;
 
 import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.nio.file.FileVisitResult;
+import java.nio.file.Files;
 import java.nio.file.Path;
+import java.nio.file.SimpleFileVisitor;
+import java.nio.file.attribute.BasicFileAttributes;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -102,6 +109,7 @@ import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.greaterThan;
 import static org.hamcrest.Matchers.greaterThanOrEqualTo;
 import static org.hamcrest.Matchers.hasSize;
 import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.lessThan;
 import static org.hamcrest.Matchers.not;
 import static org.hamcrest.Matchers.notNullValue;
@@ -1019,6 +1027,129 @@ public class DedicatedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTest
         assertThat(snapshots.get(0).getState().completed(), equalTo(true));
     }
 
+    public void testSnapshotTotalAndIncrementalSizes() throws IOException {
+        Client client = client();
+        final String indexName = "test-blocks-1";
+        final String repositoryName = "repo-" + indexName;
+        final String snapshot0 = "snapshot-0";
+        final String snapshot1 = "snapshot-1";
+
+        createIndex(indexName);
+
+        int docs = between(10, 100);
+        for (int i = 0; i < docs; i++) {
+            client.prepareIndex(indexName, "type").setSource("test", "init").execute().actionGet();
+        }
+
+        logger.info("--> register a repository");
+
+        final Path repoPath = randomRepoPath();
+        assertAcked(client.admin().cluster().preparePutRepository(repositoryName)
+            .setType("fs")
+            .setSettings(Settings.builder().put("location", repoPath)));
+
+        logger.info("--> create a snapshot");
+        client.admin().cluster().prepareCreateSnapshot(repositoryName, snapshot0)
+            .setIncludeGlobalState(true)
+            .setWaitForCompletion(true)
+            .get();
+
+        SnapshotsStatusResponse response = client.admin().cluster().prepareSnapshotStatus(repositoryName)
+            .setSnapshots(snapshot0)
+            .get();
+
+        List<SnapshotStatus> snapshots = response.getSnapshots();
+
+        List<Path> snapshot0Files = scanSnapshotFolder(repoPath);
+        assertThat(snapshots, hasSize(1));
+
+        final int snapshot0FileCount = snapshot0Files.size();
+        final long snapshot0FileSize = calculateTotalFilesSize(snapshot0Files);
+
+        SnapshotStats stats = snapshots.get(0).getStats();
+
+        assertThat(stats.getTotalFileCount(), is(snapshot0FileCount));
+        assertThat(stats.getTotalSize(), is(snapshot0FileSize));
+
+        assertThat(stats.getIncrementalFileCount(), equalTo(snapshot0FileCount));
+        assertThat(stats.getIncrementalSize(), equalTo(snapshot0FileSize));
+
+        assertThat(stats.getIncrementalFileCount(), equalTo(stats.getProcessedFileCount()));
+        assertThat(stats.getIncrementalSize(), equalTo(stats.getProcessedSize()));
+
+        // add few docs - less than initially
+        docs = between(1, 5);
+        for (int i = 0; i < docs; i++) {
+            client.prepareIndex(indexName, "type").setSource("test", "test" + i).execute().actionGet();
+        }
+
+        // create another snapshot
+        // total size has to grow and has to be equal to files on fs
+        assertThat(client.admin().cluster()
+                .prepareCreateSnapshot(repositoryName, snapshot1)
+                .setWaitForCompletion(true).get().status(),
+            equalTo(RestStatus.OK));
+
+        // drop 1st one to avoid miscalculation as snapshot reuses some files of prev snapshot
+        assertTrue(client.admin().cluster()
+            .prepareDeleteSnapshot(repositoryName, snapshot0)
+            .get().isAcknowledged());
+
+        response = client.admin().cluster().prepareSnapshotStatus(repositoryName)
+            .setSnapshots(snapshot1)
+            .get();
+
+        final List<Path> snapshot1Files = scanSnapshotFolder(repoPath);
+
+        final int snapshot1FileCount = snapshot1Files.size();
+        final long snapshot1FileSize = calculateTotalFilesSize(snapshot1Files);
+
+        snapshots = response.getSnapshots();
+
+        SnapshotStats anotherStats = snapshots.get(0).getStats();
+
+        ArrayList<Path> snapshotFilesDiff = new ArrayList<>(snapshot1Files);
+        snapshotFilesDiff.removeAll(snapshot0Files);
+
+        assertThat(anotherStats.getIncrementalFileCount(), equalTo(snapshotFilesDiff.size()));
+        assertThat(anotherStats.getIncrementalSize(), equalTo(calculateTotalFilesSize(snapshotFilesDiff)));
+
+        assertThat(anotherStats.getIncrementalFileCount(), equalTo(anotherStats.getProcessedFileCount()));
+        assertThat(anotherStats.getIncrementalSize(), equalTo(anotherStats.getProcessedSize()));
+
+        assertThat(stats.getTotalSize(), lessThan(anotherStats.getTotalSize()));
+        assertThat(stats.getTotalFileCount(), lessThan(anotherStats.getTotalFileCount()));
+
+        assertThat(anotherStats.getTotalFileCount(), is(snapshot1FileCount));
+        assertThat(anotherStats.getTotalSize(), is(snapshot1FileSize));
+    }
+
+    private long calculateTotalFilesSize(List<Path> files) {
+        return files.stream().mapToLong(f -> {
+            try {
+                return Files.size(f);
+            } catch (IOException e) {
+                throw new UncheckedIOException(e);
+            }
+        }).sum();
+    }
+
+    private List<Path> scanSnapshotFolder(Path repoPath) throws IOException {
+        List<Path> files = new ArrayList<>();
+        Files.walkFileTree(repoPath, new SimpleFileVisitor<Path>(){
+            @Override
+            public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
+                if (file.getFileName().toString().startsWith("__")){
+                    files.add(file);
+                }
+                return super.visitFile(file, attrs);
+            }
+        });
+        return files;
+    }
+
     public static class SnapshottableMetadata extends TestCustomMetaData {
         public static final String TYPE = "test_snapshottable";
@@ -2066,7 +2066,7 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
         SnapshotStatus snapshotStatus = client.admin().cluster().prepareSnapshotStatus("test-repo").setSnapshots("test").get().getSnapshots().get(0);
         List<SnapshotIndexShardStatus> shards = snapshotStatus.getShards();
         for (SnapshotIndexShardStatus status : shards) {
-            assertThat(status.getStats().getProcessedFiles(), greaterThan(1));
+            assertThat(status.getStats().getProcessedFileCount(), greaterThan(1));
         }
     }
 
@@ -2078,7 +2078,7 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
         SnapshotStatus snapshotStatus = client.admin().cluster().prepareSnapshotStatus("test-repo").setSnapshots("test-1").get().getSnapshots().get(0);
         List<SnapshotIndexShardStatus> shards = snapshotStatus.getShards();
         for (SnapshotIndexShardStatus status : shards) {
-            assertThat(status.getStats().getProcessedFiles(), equalTo(0));
+            assertThat(status.getStats().getProcessedFileCount(), equalTo(0));
         }
     }
 
@@ -2091,7 +2091,7 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
         SnapshotStatus snapshotStatus = client.admin().cluster().prepareSnapshotStatus("test-repo").setSnapshots("test-2").get().getSnapshots().get(0);
         List<SnapshotIndexShardStatus> shards = snapshotStatus.getShards();
         for (SnapshotIndexShardStatus status : shards) {
-            assertThat(status.getStats().getProcessedFiles(), equalTo(2)); // we flush before the snapshot such that we have to process the segments_N files plus the .del file
+            assertThat(status.getStats().getProcessedFileCount(), equalTo(2)); // we flush before the snapshot such that we have to process the segments_N files plus the .del file
         }
     }
 }
@@ -632,7 +632,7 @@ public abstract class IndexShardTestCase extends ESTestCase {
 
         final IndexShardSnapshotStatus.Copy lastSnapshotStatus = snapshotStatus.asCopy();
         assertEquals(IndexShardSnapshotStatus.Stage.DONE, lastSnapshotStatus.getStage());
-        assertEquals(shard.snapshotStoreMetadata().size(), lastSnapshotStatus.getNumberOfFiles());
+        assertEquals(shard.snapshotStoreMetadata().size(), lastSnapshotStatus.getTotalFileCount());
         assertNull(lastSnapshotStatus.getFailure());
     }