Include include_global_state in Snapshot status API (#26853)

This commit adds a field include_global_state to snapshot status api response. For legacy snapshot, the field is not present.

Closes #22423
This commit is contained in:
kel 2017-11-30 17:38:07 +08:00 committed by Tanguy Leroux
parent 192d1f03f8
commit efac982e35
9 changed files with 99 additions and 22 deletions

View File

@ -19,8 +19,10 @@
package org.elasticsearch.action.admin.cluster.snapshots.status;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.SnapshotsInProgress.State;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.inject.internal.Nullable;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
@ -57,10 +59,15 @@ public class SnapshotStatus implements ToXContentObject, Streamable {
private SnapshotStats stats;
SnapshotStatus(final Snapshot snapshot, final State state, final List<SnapshotIndexShardStatus> shards) {
@Nullable
private Boolean includeGlobalState;
SnapshotStatus(final Snapshot snapshot, final State state, final List<SnapshotIndexShardStatus> shards,
final Boolean includeGlobalState) {
this.snapshot = Objects.requireNonNull(snapshot);
this.state = Objects.requireNonNull(state);
this.shards = Objects.requireNonNull(shards);
this.includeGlobalState = includeGlobalState;
shardsStats = new SnapshotShardsStats(shards);
updateShardStats();
}
@ -82,6 +89,14 @@ public class SnapshotStatus implements ToXContentObject, Streamable {
return state;
}
/**
* Returns true if global state is included in the snapshot, false otherwise.
* Can be null if this information is unknown.
*/
public Boolean includeGlobalState() {
return includeGlobalState;
}
/**
* Returns list of snapshot shards
*/
@ -132,6 +147,9 @@ public class SnapshotStatus implements ToXContentObject, Streamable {
builder.add(SnapshotIndexShardStatus.readShardSnapshotStatus(in));
}
shards = Collections.unmodifiableList(builder);
if (in.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) {
includeGlobalState = in.readOptionalBoolean();
}
updateShardStats();
}
@ -143,6 +161,9 @@ public class SnapshotStatus implements ToXContentObject, Streamable {
for (SnapshotIndexShardStatus shard : shards) {
shard.writeTo(out);
}
if (out.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) {
out.writeOptionalBoolean(includeGlobalState);
}
}
/**
@ -174,6 +195,7 @@ public class SnapshotStatus implements ToXContentObject, Streamable {
private static final String UUID = "uuid";
private static final String STATE = "state";
private static final String INDICES = "indices";
private static final String INCLUDE_GLOBAL_STATE = "include_global_state";
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
@ -182,6 +204,9 @@ public class SnapshotStatus implements ToXContentObject, Streamable {
builder.field(REPOSITORY, snapshot.getRepository());
builder.field(UUID, snapshot.getSnapshotId().getUUID());
builder.field(STATE, state.name());
if (includeGlobalState != null) {
builder.field(INCLUDE_GLOBAL_STATE, includeGlobalState);
}
shardsStats.toXContent(builder, params);
stats.toXContent(builder, params);
builder.startObject(INDICES);

View File

@ -196,7 +196,8 @@ public class TransportSnapshotsStatusAction extends TransportMasterNodeAction<Sn
SnapshotIndexShardStatus shardStatus = new SnapshotIndexShardStatus(shardEntry.key, stage);
shardStatusBuilder.add(shardStatus);
}
builder.add(new SnapshotStatus(entry.snapshot(), entry.state(), Collections.unmodifiableList(shardStatusBuilder)));
builder.add(new SnapshotStatus(entry.snapshot(), entry.state(),
Collections.unmodifiableList(shardStatusBuilder), entry.includeGlobalState()));
}
}
// Now add snapshots on disk that are not currently running
@ -248,7 +249,8 @@ public class TransportSnapshotsStatusAction extends TransportMasterNodeAction<Sn
default:
throw new IllegalArgumentException("Unknown snapshot state " + snapshotInfo.state());
}
builder.add(new SnapshotStatus(new Snapshot(repositoryName, snapshotId), state, Collections.unmodifiableList(shardStatusBuilder)));
builder.add(new SnapshotStatus(new Snapshot(repositoryName, snapshotId), state,
Collections.unmodifiableList(shardStatusBuilder), snapshotInfo.includeGlobalState()));
}
}
}

View File

@ -116,10 +116,11 @@ public interface Repository extends LifecycleComponent {
* @param totalShards total number of shards
* @param shardFailures list of shard failures
* @param repositoryStateId the unique id identifying the state of the repository when the snapshot began
* @param includeGlobalState include cluster global state
* @return snapshot description
*/
SnapshotInfo finalizeSnapshot(SnapshotId snapshotId, List<IndexId> indices, long startTime, String failure, int totalShards,
List<SnapshotShardFailure> shardFailures, long repositoryStateId);
List<SnapshotShardFailure> shardFailures, long repositoryStateId, boolean includeGlobalState);
/**
* Deletes snapshot

View File

@ -468,11 +468,12 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
final String failure,
final int totalShards,
final List<SnapshotShardFailure> shardFailures,
final long repositoryStateId) {
final long repositoryStateId,
final boolean includeGlobalState) {
SnapshotInfo blobStoreSnapshot = new SnapshotInfo(snapshotId,
indices.stream().map(IndexId::getName).collect(Collectors.toList()),
startTime, failure, System.currentTimeMillis(), totalShards, shardFailures);
startTime, failure, System.currentTimeMillis(), totalShards, shardFailures,
includeGlobalState);
try {
snapshotFormat.write(blobStoreSnapshot, snapshotsBlobContainer, snapshotId.getUUID());
final RepositoryData repositoryData = getRepositoryData();

View File

@ -69,8 +69,10 @@ public final class SnapshotInfo implements Comparable<SnapshotInfo>, ToXContent,
private static final String NAME = "name";
private static final String TOTAL_SHARDS = "total_shards";
private static final String SUCCESSFUL_SHARDS = "successful_shards";
private static final String INCLUDE_GLOBAL_STATE = "include_global_state";
private static final Version VERSION_INCOMPATIBLE_INTRODUCED = Version.V_5_2_0;
private static final Version INCLUDE_GLOBAL_STATE_INTRODUCED = Version.V_7_0_0_alpha1;
public static final Version VERBOSE_INTRODUCED = Version.V_5_5_0;
private static final Comparator<SnapshotInfo> COMPARATOR =
@ -94,27 +96,32 @@ public final class SnapshotInfo implements Comparable<SnapshotInfo>, ToXContent,
private final int successfulShards;
@Nullable
private Boolean includeGlobalState;
@Nullable
private final Version version;
private final List<SnapshotShardFailure> shardFailures;
public SnapshotInfo(SnapshotId snapshotId, List<String> indices, SnapshotState state) {
this(snapshotId, indices, state, null, null, 0L, 0L, 0, 0, Collections.emptyList());
this(snapshotId, indices, state, null, null, 0L, 0L, 0, 0, Collections.emptyList(), null);
}
public SnapshotInfo(SnapshotId snapshotId, List<String> indices, long startTime) {
this(snapshotId, indices, SnapshotState.IN_PROGRESS, null, Version.CURRENT, startTime, 0L, 0, 0, Collections.emptyList());
public SnapshotInfo(SnapshotId snapshotId, List<String> indices, long startTime, Boolean includeGlobalState) {
this(snapshotId, indices, SnapshotState.IN_PROGRESS, null, Version.CURRENT, startTime, 0L, 0, 0,
Collections.emptyList(), includeGlobalState);
}
public SnapshotInfo(SnapshotId snapshotId, List<String> indices, long startTime, String reason, long endTime,
int totalShards, List<SnapshotShardFailure> shardFailures) {
int totalShards, List<SnapshotShardFailure> shardFailures, Boolean includeGlobalState) {
this(snapshotId, indices, snapshotState(reason, shardFailures), reason, Version.CURRENT,
startTime, endTime, totalShards, totalShards - shardFailures.size(), shardFailures);
startTime, endTime, totalShards, totalShards - shardFailures.size(), shardFailures, includeGlobalState);
}
private SnapshotInfo(SnapshotId snapshotId, List<String> indices, SnapshotState state, String reason, Version version,
long startTime, long endTime, int totalShards, int successfulShards, List<SnapshotShardFailure> shardFailures) {
long startTime, long endTime, int totalShards, int successfulShards, List<SnapshotShardFailure> shardFailures,
Boolean includeGlobalState) {
this.snapshotId = Objects.requireNonNull(snapshotId);
this.indices = Collections.unmodifiableList(Objects.requireNonNull(indices));
this.state = state;
@ -125,6 +132,7 @@ public final class SnapshotInfo implements Comparable<SnapshotInfo>, ToXContent,
this.totalShards = totalShards;
this.successfulShards = successfulShards;
this.shardFailures = Objects.requireNonNull(shardFailures);
this.includeGlobalState = includeGlobalState;
}
/**
@ -163,6 +171,9 @@ public final class SnapshotInfo implements Comparable<SnapshotInfo>, ToXContent,
} else {
version = in.readBoolean() ? Version.readVersion(in) : null;
}
if (in.getVersion().onOrAfter(INCLUDE_GLOBAL_STATE_INTRODUCED)) {
includeGlobalState = in.readOptionalBoolean();
}
}
/**
@ -172,7 +183,7 @@ public final class SnapshotInfo implements Comparable<SnapshotInfo>, ToXContent,
public static SnapshotInfo incompatible(SnapshotId snapshotId) {
return new SnapshotInfo(snapshotId, Collections.emptyList(), SnapshotState.INCOMPATIBLE,
"the snapshot is incompatible with the current version of Elasticsearch and its exact version is unknown",
null, 0L, 0L, 0, 0, Collections.emptyList());
null, 0L, 0L, 0, 0, Collections.emptyList(), null);
}
/**
@ -271,6 +282,10 @@ public final class SnapshotInfo implements Comparable<SnapshotInfo>, ToXContent,
return successfulShards;
}
public Boolean includeGlobalState() {
return includeGlobalState;
}
/**
* Returns shard failures; an empty list will be returned if there were no shard
* failures, or if {@link #state()} returns {@code null}.
@ -361,6 +376,9 @@ public final class SnapshotInfo implements Comparable<SnapshotInfo>, ToXContent,
builder.value(index);
}
builder.endArray();
if (includeGlobalState != null) {
builder.field(INCLUDE_GLOBAL_STATE, includeGlobalState);
}
if (verbose || state != null) {
builder.field(STATE, state);
}
@ -411,6 +429,9 @@ public final class SnapshotInfo implements Comparable<SnapshotInfo>, ToXContent,
if (reason != null) {
builder.field(REASON, reason);
}
if (includeGlobalState != null) {
builder.field(INCLUDE_GLOBAL_STATE, includeGlobalState);
}
builder.field(START_TIME, startTime);
builder.field(END_TIME, endTime);
builder.field(TOTAL_SHARDS, totalShards);
@ -442,6 +463,7 @@ public final class SnapshotInfo implements Comparable<SnapshotInfo>, ToXContent,
long endTime = 0;
int totalShards = 0;
int successfulShards = 0;
Boolean includeGlobalState = null;
List<SnapshotShardFailure> shardFailures = Collections.emptyList();
if (parser.currentToken() == null) { // fresh parser? move to the first token
parser.nextToken();
@ -476,6 +498,8 @@ public final class SnapshotInfo implements Comparable<SnapshotInfo>, ToXContent,
successfulShards = parser.intValue();
} else if (VERSION_ID.equals(currentFieldName)) {
version = Version.fromId(parser.intValue());
} else if (INCLUDE_GLOBAL_STATE.equals(currentFieldName)) {
includeGlobalState = parser.booleanValue();
}
} else if (token == XContentParser.Token.START_ARRAY) {
if (INDICES.equals(currentFieldName)) {
@ -517,7 +541,8 @@ public final class SnapshotInfo implements Comparable<SnapshotInfo>, ToXContent,
endTime,
totalShards,
successfulShards,
shardFailures);
shardFailures,
includeGlobalState);
}
@Override
@ -564,6 +589,9 @@ public final class SnapshotInfo implements Comparable<SnapshotInfo>, ToXContent,
out.writeBoolean(false);
}
}
if (out.getVersion().onOrAfter(INCLUDE_GLOBAL_STATE_INTRODUCED)) {
out.writeOptionalBoolean(includeGlobalState);
}
}
private static SnapshotState snapshotState(final String reason, final List<SnapshotShardFailure> shardFailures) {

View File

@ -496,7 +496,8 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
ExceptionsHelper.detailedMessage(exception),
0,
Collections.emptyList(),
snapshot.getRepositoryStateId());
snapshot.getRepositoryStateId(),
snapshot.includeGlobalState());
} catch (Exception inner) {
inner.addSuppressed(exception);
logger.warn((Supplier<?>) () -> new ParameterizedMessage("[{}] failed to close snapshot in repository", snapshot.snapshot()), inner);
@ -510,7 +511,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
private SnapshotInfo inProgressSnapshot(SnapshotsInProgress.Entry entry) {
return new SnapshotInfo(entry.snapshot().getSnapshotId(),
entry.indices().stream().map(IndexId::getName).collect(Collectors.toList()),
entry.startTime());
entry.startTime(), entry.includeGlobalState());
}
/**
@ -968,7 +969,8 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
failure,
entry.shards().size(),
Collections.unmodifiableList(shardFailures),
entry.getRepositoryStateId());
entry.getRepositoryStateId(),
entry.includeGlobalState());
removeSnapshotFromClusterState(snapshot, snapshotInfo, null);
} catch (Exception e) {
logger.warn((Supplier<?>) () -> new ParameterizedMessage("[{}] failed to finalize snapshot", snapshot), e);

View File

@ -46,7 +46,8 @@ public class SnapshotStatusTests extends ESTestCase {
SnapshotIndexShardStatus snapshotIndexShardStatus = new SnapshotIndexShardStatus(testShardId, shardStage);
List<SnapshotIndexShardStatus> snapshotIndexShardStatuses = new ArrayList<>();
snapshotIndexShardStatuses.add(snapshotIndexShardStatus);
SnapshotStatus status = new SnapshotStatus(snapshot, state, snapshotIndexShardStatuses);
boolean includeGlobalState = randomBoolean();
SnapshotStatus status = new SnapshotStatus(snapshot, state, snapshotIndexShardStatuses, includeGlobalState);
int initializingShards = 0;
int startedShards = 0;
@ -80,6 +81,7 @@ public class SnapshotStatusTests extends ESTestCase {
" \"repository\" : \"test-repo\",\n" +
" \"uuid\" : \"" + uuid + "\",\n" +
" \"state\" : \"" + state.toString() + "\",\n" +
" \"include_global_state\" : " + includeGlobalState + ",\n" +
" \"shards_stats\" : {\n" +
" \"initializing\" : " + initializingShards + ",\n" +
" \"started\" : " + startedShards + ",\n" +

View File

@ -2542,8 +2542,9 @@ public class IndexShardTests extends IndexShardTestCase {
}
@Override
public SnapshotInfo finalizeSnapshot(SnapshotId snapshotId, List<IndexId> indices, long startTime, String failure, int totalShards,
List<SnapshotShardFailure> shardFailures, long repositoryStateId) {
public SnapshotInfo finalizeSnapshot(SnapshotId snapshotId, List<IndexId> indices, long startTime, String failure,
int totalShards, List<SnapshotShardFailure> shardFailures, long repositoryStateId,
boolean includeGlobalState) {
return null;
}

View File

@ -590,12 +590,20 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
assertThat(createSnapshotResponse.getSnapshotInfo().totalShards(), equalTo(0));
assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), equalTo(0));
assertThat(client.admin().cluster().prepareGetSnapshots("test-repo").setSnapshots("test-snap-no-global-state").get().getSnapshots().get(0).state(), equalTo(SnapshotState.SUCCESS));
SnapshotsStatusResponse snapshotsStatusResponse = client.admin().cluster().prepareSnapshotStatus("test-repo").addSnapshots("test-snap-no-global-state").get();
assertThat(snapshotsStatusResponse.getSnapshots().size(), equalTo(1));
SnapshotStatus snapshotStatus = snapshotsStatusResponse.getSnapshots().get(0);
assertThat(snapshotStatus.includeGlobalState(), equalTo(false));
logger.info("--> snapshot with global state");
createSnapshotResponse = client.admin().cluster().prepareCreateSnapshot("test-repo", "test-snap-with-global-state").setIndices().setIncludeGlobalState(true).setWaitForCompletion(true).get();
assertThat(createSnapshotResponse.getSnapshotInfo().totalShards(), equalTo(0));
assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), equalTo(0));
assertThat(client.admin().cluster().prepareGetSnapshots("test-repo").setSnapshots("test-snap-with-global-state").get().getSnapshots().get(0).state(), equalTo(SnapshotState.SUCCESS));
snapshotsStatusResponse = client.admin().cluster().prepareSnapshotStatus("test-repo").addSnapshots("test-snap-with-global-state").get();
assertThat(snapshotsStatusResponse.getSnapshots().size(), equalTo(1));
snapshotStatus = snapshotsStatusResponse.getSnapshots().get(0);
assertThat(snapshotStatus.includeGlobalState(), equalTo(true));
if (testTemplate) {
logger.info("--> delete test template");
@ -1635,7 +1643,8 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
String blockedNode = blockNodeWithIndex("test-repo", "test-idx");
logger.info("--> snapshot");
client.admin().cluster().prepareCreateSnapshot("test-repo", "test-snap").setWaitForCompletion(false).setIndices("test-idx").get();
client.admin().cluster().prepareCreateSnapshot("test-repo", "test-snap")
.setWaitForCompletion(false).setIncludeGlobalState(false).setIndices("test-idx").get();
logger.info("--> waiting for block to kick in");
waitForBlock(blockedNode, "test-repo", TimeValue.timeValueSeconds(60));
@ -1645,6 +1654,8 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
assertThat(response.getSnapshots().size(), equalTo(1));
SnapshotStatus snapshotStatus = response.getSnapshots().get(0);
assertThat(snapshotStatus.getState(), equalTo(SnapshotsInProgress.State.STARTED));
assertThat(snapshotStatus.includeGlobalState(), equalTo(false));
// We blocked the node during data write operation, so at least one shard snapshot should be in STARTED stage
assertThat(snapshotStatus.getShardsStats().getStartedShards(), greaterThan(0));
for (SnapshotIndexShardStatus shardStatus : snapshotStatus.getIndices().get("test-idx")) {
@ -1658,6 +1669,8 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
assertThat(response.getSnapshots().size(), equalTo(1));
snapshotStatus = response.getSnapshots().get(0);
assertThat(snapshotStatus.getState(), equalTo(SnapshotsInProgress.State.STARTED));
assertThat(snapshotStatus.includeGlobalState(), equalTo(false));
// We blocked the node during data write operation, so at least one shard snapshot should be in STARTED stage
assertThat(snapshotStatus.getShardsStats().getStartedShards(), greaterThan(0));
for (SnapshotIndexShardStatus shardStatus : snapshotStatus.getIndices().get("test-idx")) {
@ -1684,6 +1697,8 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
response = client.admin().cluster().prepareSnapshotStatus("test-repo").addSnapshots("test-snap").execute().actionGet();
snapshotStatus = response.getSnapshots().get(0);
assertThat(snapshotStatus.getIndices().size(), equalTo(1));
assertThat(snapshotStatus.includeGlobalState(), equalTo(false));
SnapshotIndexStatus indexStatus = snapshotStatus.getIndices().get("test-idx");
assertThat(indexStatus, notNullValue());
assertThat(indexStatus.getShardsStats().getInitializingShards(), equalTo(0));