* We don't have to calculate the start and end times form the shards for the status API, we have the start time available from the CS or the `SnapshotInfo` in the repo and can either take the end time form the `SnapshotInfo` or take the most recent time from the shard stats for in progress snapshots * Closes #43074
This commit is contained in:
parent
e490ecb7d3
commit
9e920f9612
|
@ -58,7 +58,7 @@ public class SnapshotIndexStatus implements Iterable<SnapshotIndexShardStatus>,
|
|||
stats = new SnapshotStats();
|
||||
for (SnapshotIndexShardStatus shard : shards) {
|
||||
indexShards.put(shard.getShardId().getId(), shard);
|
||||
stats.add(shard.getStats());
|
||||
stats.add(shard.getStats(), true);
|
||||
}
|
||||
shardsStats = new SnapshotShardsStats(shards);
|
||||
this.indexShards = unmodifiableMap(indexShards);
|
||||
|
|
|
@ -304,7 +304,12 @@ public class SnapshotStats implements Streamable, ToXContentObject {
|
|||
processedSize);
|
||||
}
|
||||
|
||||
void add(SnapshotStats stats) {
|
||||
/**
|
||||
* Add stats instance to the total
|
||||
* @param stats Stats instance to add
|
||||
* @param updateTimestamps Whether or not start time and duration should be updated
|
||||
*/
|
||||
void add(SnapshotStats stats, boolean updateTimestamps) {
|
||||
incrementalFileCount += stats.incrementalFileCount;
|
||||
totalFileCount += stats.totalFileCount;
|
||||
processedFileCount += stats.processedFileCount;
|
||||
|
@ -317,7 +322,7 @@ public class SnapshotStats implements Streamable, ToXContentObject {
|
|||
// First time here
|
||||
startTime = stats.startTime;
|
||||
time = stats.time;
|
||||
} else {
|
||||
} else if (updateTimestamps) {
|
||||
// The time the last snapshot ends
|
||||
long endTime = Math.max(startTime + time, stats.startTime + stats.time);
|
||||
|
||||
|
|
|
@ -22,9 +22,9 @@ package org.elasticsearch.action.admin.cluster.snapshots.status;
|
|||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.cluster.SnapshotsInProgress;
|
||||
import org.elasticsearch.cluster.SnapshotsInProgress.State;
|
||||
import org.elasticsearch.common.Nullable;
|
||||
import org.elasticsearch.common.ParseField;
|
||||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.Nullable;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.io.stream.Streamable;
|
||||
|
@ -72,14 +72,14 @@ public class SnapshotStatus implements ToXContentObject, Streamable {
|
|||
@Nullable
|
||||
private Boolean includeGlobalState;
|
||||
|
||||
SnapshotStatus(final Snapshot snapshot, final State state, final List<SnapshotIndexShardStatus> shards,
|
||||
final Boolean includeGlobalState) {
|
||||
SnapshotStatus(Snapshot snapshot, State state, List<SnapshotIndexShardStatus> shards, Boolean includeGlobalState,
|
||||
long startTime, long time) {
|
||||
this.snapshot = Objects.requireNonNull(snapshot);
|
||||
this.state = Objects.requireNonNull(state);
|
||||
this.shards = Objects.requireNonNull(shards);
|
||||
this.includeGlobalState = includeGlobalState;
|
||||
shardsStats = new SnapshotShardsStats(shards);
|
||||
updateShardStats();
|
||||
updateShardStats(startTime, time);
|
||||
}
|
||||
|
||||
private SnapshotStatus(Snapshot snapshot, State state, List<SnapshotIndexShardStatus> shards,
|
||||
|
@ -172,7 +172,16 @@ public class SnapshotStatus implements ToXContentObject, Streamable {
|
|||
if (in.getVersion().onOrAfter(Version.V_6_2_0)) {
|
||||
includeGlobalState = in.readOptionalBoolean();
|
||||
}
|
||||
updateShardStats();
|
||||
final long startTime;
|
||||
final long time;
|
||||
if (in.getVersion().onOrAfter(Version.V_7_4_0)) {
|
||||
startTime = in.readLong();
|
||||
time = in.readLong();
|
||||
} else {
|
||||
startTime = 0L;
|
||||
time = 0L;
|
||||
}
|
||||
updateShardStats(startTime, time);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -186,6 +195,10 @@ public class SnapshotStatus implements ToXContentObject, Streamable {
|
|||
if (out.getVersion().onOrAfter(Version.V_6_2_0)) {
|
||||
out.writeOptionalBoolean(includeGlobalState);
|
||||
}
|
||||
if (out.getVersion().onOrAfter(Version.V_7_4_0)) {
|
||||
out.writeLong(stats.getStartTime());
|
||||
out.writeLong(stats.getTime());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -286,11 +299,12 @@ public class SnapshotStatus implements ToXContentObject, Streamable {
|
|||
return PARSER.parse(parser, null);
|
||||
}
|
||||
|
||||
private void updateShardStats() {
|
||||
stats = new SnapshotStats();
|
||||
private void updateShardStats(long startTime, long time) {
|
||||
stats = new SnapshotStats(startTime, time, 0, 0, 0, 0, 0, 0);
|
||||
shardsStats = new SnapshotShardsStats(shards);
|
||||
for (SnapshotIndexShardStatus shard : shards) {
|
||||
stats.add(shard.getStats());
|
||||
// BWC: only update timestamps when we did not get a start time from an old node
|
||||
stats.add(shard.getStats(), startTime == 0L);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -185,7 +185,8 @@ public class TransportSnapshotsStatusAction extends TransportMasterNodeAction<Sn
|
|||
shardStatusBuilder.add(shardStatus);
|
||||
}
|
||||
builder.add(new SnapshotStatus(entry.snapshot(), entry.state(),
|
||||
Collections.unmodifiableList(shardStatusBuilder), entry.includeGlobalState()));
|
||||
Collections.unmodifiableList(shardStatusBuilder), entry.includeGlobalState(), entry.startTime(),
|
||||
Math.max(threadPool.absoluteTimeInMillis() - entry.startTime(), 0L)));
|
||||
}
|
||||
}
|
||||
// Now add snapshots on disk that are not currently running
|
||||
|
@ -236,8 +237,10 @@ public class TransportSnapshotsStatusAction extends TransportMasterNodeAction<Sn
|
|||
default:
|
||||
throw new IllegalArgumentException("Unknown snapshot state " + snapshotInfo.state());
|
||||
}
|
||||
final long startTime = snapshotInfo.startTime();
|
||||
builder.add(new SnapshotStatus(new Snapshot(repositoryName, snapshotId), state,
|
||||
Collections.unmodifiableList(shardStatusBuilder), snapshotInfo.includeGlobalState()));
|
||||
Collections.unmodifiableList(shardStatusBuilder), snapshotInfo.includeGlobalState(),
|
||||
startTime, snapshotInfo.endTime() - startTime));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -50,7 +50,7 @@ public class SnapshotStatusTests extends AbstractXContentTestCase<SnapshotStatus
|
|||
List<SnapshotIndexShardStatus> snapshotIndexShardStatuses = new ArrayList<>();
|
||||
snapshotIndexShardStatuses.add(snapshotIndexShardStatus);
|
||||
boolean includeGlobalState = randomBoolean();
|
||||
SnapshotStatus status = new SnapshotStatus(snapshot, state, snapshotIndexShardStatuses, includeGlobalState);
|
||||
SnapshotStatus status = new SnapshotStatus(snapshot, state, snapshotIndexShardStatuses, includeGlobalState, 0L, 0L);
|
||||
|
||||
int initializingShards = 0;
|
||||
int startedShards = 0;
|
||||
|
@ -166,7 +166,7 @@ public class SnapshotStatusTests extends AbstractXContentTestCase<SnapshotStatus
|
|||
snapshotIndexShardStatuses.add(snapshotIndexShardStatus);
|
||||
}
|
||||
boolean includeGlobalState = randomBoolean();
|
||||
return new SnapshotStatus(snapshot, state, snapshotIndexShardStatuses, includeGlobalState);
|
||||
return new SnapshotStatus(snapshot, state, snapshotIndexShardStatuses, includeGlobalState, 0L, 0L);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -0,0 +1,74 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package org.elasticsearch.snapshots;
|
||||
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
|
||||
import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotStatus;
|
||||
import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotsStatusRequest;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.greaterThan;
|
||||
|
||||
public class SnapshotStatusApisIT extends AbstractSnapshotIntegTestCase {
|
||||
|
||||
public void testStatusApiConsistency() {
|
||||
Client client = client();
|
||||
|
||||
logger.info("--> creating repository");
|
||||
assertAcked(client.admin().cluster().preparePutRepository("test-repo").setType("fs").setSettings(
|
||||
Settings.builder().put("location", randomRepoPath()).build()));
|
||||
|
||||
createIndex("test-idx-1", "test-idx-2", "test-idx-3");
|
||||
ensureGreen();
|
||||
|
||||
logger.info("--> indexing some data");
|
||||
for (int i = 0; i < 100; i++) {
|
||||
index("test-idx-1", "_doc", Integer.toString(i), "foo", "bar" + i);
|
||||
index("test-idx-2", "_doc", Integer.toString(i), "foo", "baz" + i);
|
||||
index("test-idx-3", "_doc", Integer.toString(i), "foo", "baz" + i);
|
||||
}
|
||||
refresh();
|
||||
|
||||
logger.info("--> snapshot");
|
||||
CreateSnapshotResponse createSnapshotResponse = client.admin().cluster().prepareCreateSnapshot("test-repo", "test-snap")
|
||||
.setWaitForCompletion(true).get();
|
||||
assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), greaterThan(0));
|
||||
assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(),
|
||||
equalTo(createSnapshotResponse.getSnapshotInfo().totalShards()));
|
||||
|
||||
List<SnapshotInfo> snapshotInfos = client.admin().cluster().prepareGetSnapshots("test-repo").get().getSnapshots();
|
||||
assertThat(snapshotInfos.size(), equalTo(1));
|
||||
SnapshotInfo snapshotInfo = snapshotInfos.get(0);
|
||||
assertThat(snapshotInfo.state(), equalTo(SnapshotState.SUCCESS));
|
||||
assertThat(snapshotInfo.version(), equalTo(Version.CURRENT));
|
||||
|
||||
final List<SnapshotStatus> snapshotStatus = client.admin().cluster().snapshotsStatus(
|
||||
new SnapshotsStatusRequest("test-repo", new String[]{"test-snap"})).actionGet().getSnapshots();
|
||||
assertThat(snapshotStatus.size(), equalTo(1));
|
||||
final SnapshotStatus snStatus = snapshotStatus.get(0);
|
||||
assertEquals(snStatus.getStats().getStartTime(), snapshotInfo.startTime());
|
||||
assertEquals(snStatus.getStats().getTime(), snapshotInfo.endTime() - snapshotInfo.startTime());
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue