Snapshot/Restore: Ensure that shard failure reasons are correctly stored in CS (#25941)
The failure reason for snapshot shard failures might not be propagated properly if the master node changes after the errors were reported by other data nodes. This commit ensures that the snapshot shard failure reason is preserved properly and adds a workaround for reading old snapshot files where this information might not have been preserved.

Closes #25878
parent e508c277e1
commit fe46ef393b
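In essence the change version-gates the wire format and backfills a placeholder reason when reading data written by older versions, so that the "failed implies reason != null" invariant always holds. The minimal sketch below illustrates only that pattern; `ShardSnapshotStatusSketch`, the `DataInputStream`/`DataOutputStream` framing, and the `REASON_INTRODUCED` constant are simplified stand-ins and not the Elasticsearch `StreamInput`/`StreamOutput` code shown in the diff that follows.

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Sketch of a version-gated read/write of a shard snapshot status with a failure reason.
class ShardSnapshotStatusSketch {
    // Hypothetical version id standing in for Version.V_6_0_0_beta1 in the real change.
    static final int REASON_INTRODUCED = 6_000_033;

    final String nodeId;
    final boolean failed;
    final String reason; // must be non-null whenever failed == true

    ShardSnapshotStatusSketch(String nodeId, boolean failed, String reason) {
        this.nodeId = nodeId;
        this.failed = failed;
        this.reason = reason;
        // If the state is failed we have to have a reason for this failure.
        assert !failed || reason != null;
    }

    static ShardSnapshotStatusSketch readFrom(DataInputStream in, int senderVersion) throws IOException {
        String nodeId = in.readUTF();
        boolean failed = in.readBoolean();
        if (senderVersion >= REASON_INTRODUCED) {
            // Newer senders carry the reason on the wire.
            return new ShardSnapshotStatusSketch(nodeId, failed, in.readUTF());
        }
        // Older senders never wrote a reason; substitute "" for failed shards so the invariant holds.
        return new ShardSnapshotStatusSketch(nodeId, failed, failed ? "" : null);
    }

    void writeTo(DataOutputStream out, int receiverVersion) throws IOException {
        out.writeUTF(nodeId);
        out.writeBoolean(failed);
        if (receiverVersion >= REASON_INTRODUCED) {
            out.writeUTF(reason == null ? "" : reason);
        }
        // Older receivers simply get the pre-reason format.
    }
}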
@@ -253,6 +253,8 @@ public class SnapshotsInProgress extends AbstractNamedDiffable<Custom> implement
             this.nodeId = nodeId;
             this.state = state;
             this.reason = reason;
+            // If the state is failed we have to have a reason for this failure
+            assert state.failed() == false || reason != null;
         }

         public ShardSnapshotStatus(StreamInput in) throws IOException {
@@ -413,9 +415,17 @@ public class SnapshotsInProgress extends AbstractNamedDiffable<Custom> implement
            int shards = in.readVInt();
            for (int j = 0; j < shards; j++) {
                ShardId shardId = ShardId.readShardId(in);
-               String nodeId = in.readOptionalString();
-               State shardState = State.fromValue(in.readByte());
-               builder.put(shardId, new ShardSnapshotStatus(nodeId, shardState));
+               // TODO: Change this to an appropriate version when it's backported
+               if (in.getVersion().onOrAfter(Version.V_6_0_0_beta1)) {
+                   builder.put(shardId, new ShardSnapshotStatus(in));
+               } else {
+                   String nodeId = in.readOptionalString();
+                   State shardState = State.fromValue(in.readByte());
+                   // Workaround for https://github.com/elastic/elasticsearch/issues/25878
+                   // Some old snapshot might still have null in shard failure reasons
+                   String reason = shardState.failed() ? "" : null;
+                   builder.put(shardId, new ShardSnapshotStatus(nodeId, shardState, reason));
+               }
            }
            long repositoryStateId = UNDEFINED_REPOSITORY_STATE_ID;
            if (in.getVersion().onOrAfter(REPOSITORY_ID_INTRODUCED_VERSION)) {
@@ -449,8 +459,13 @@ public class SnapshotsInProgress extends AbstractNamedDiffable<Custom> implement
            out.writeVInt(entry.shards().size());
            for (ObjectObjectCursor<ShardId, ShardSnapshotStatus> shardEntry : entry.shards()) {
                shardEntry.key.writeTo(out);
-               out.writeOptionalString(shardEntry.value.nodeId());
-               out.writeByte(shardEntry.value.state().value());
+               // TODO: Change this to an appropriate version when it's backported
+               if (out.getVersion().onOrAfter(Version.V_6_0_0_beta1)) {
+                   shardEntry.value.writeTo(out);
+               } else {
+                   out.writeOptionalString(shardEntry.value.nodeId());
+                   out.writeByte(shardEntry.value.state().value());
+               }
            }
            if (out.getVersion().onOrAfter(REPOSITORY_ID_INTRODUCED_VERSION)) {
                out.writeLong(entry.repositoryStateId);
@@ -62,6 +62,7 @@ public class SnapshotShardFailure implements ShardOperationFailedException {
        this.nodeId = nodeId;
        this.shardId = shardId;
        this.reason = reason;
+       assert reason != null;
        status = RestStatus.INTERNAL_SERVER_ERROR;
    }

@@ -192,7 +193,9 @@ public class SnapshotShardFailure implements ShardOperationFailedException {
                } else if ("node_id".equals(currentFieldName)) {
                    snapshotShardFailure.nodeId = parser.text();
                } else if ("reason".equals(currentFieldName)) {
-                   snapshotShardFailure.reason = parser.text();
+                   // Workaround for https://github.com/elastic/elasticsearch/issues/25878
+                   // Some old snapshot might still have null in shard failure reasons
+                   snapshotShardFailure.reason = parser.textOrNull();
                } else if ("shard_id".equals(currentFieldName)) {
                    shardId = parser.intValue();
                } else if ("status".equals(currentFieldName)) {
@@ -215,6 +218,11 @@ public class SnapshotShardFailure implements ShardOperationFailedException {
            throw new ElasticsearchParseException("index shard was not set");
        }
        snapshotShardFailure.shardId = new ShardId(index, index_uuid, shardId);
+       // Workaround for https://github.com/elastic/elasticsearch/issues/25878
+       // Some old snapshot might still have null in shard failure reasons
+       if (snapshotShardFailure.reason == null) {
+           snapshotShardFailure.reason = "";
+       }
        return snapshotShardFailure;
    }

@@ -1128,7 +1128,8 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
                    for (ObjectObjectCursor<ShardId, ShardSnapshotStatus> shardEntry : snapshotEntry.shards()) {
                        ShardSnapshotStatus status = shardEntry.value;
                        if (!status.state().completed()) {
-                           shardsBuilder.put(shardEntry.key, new ShardSnapshotStatus(status.nodeId(), State.ABORTED));
+                           shardsBuilder.put(shardEntry.key, new ShardSnapshotStatus(status.nodeId(), State.ABORTED,
+                               "aborted by snapshot deletion"));
                        } else {
                            shardsBuilder.put(shardEntry.key, status);
                        }
@@ -57,12 +57,12 @@ public class SnapshotsInProgressTests extends ESTestCase {
        // test more than one waiting shard in an index
        shards.put(new ShardId(idx1Name, idx1UUID, 0), new ShardSnapshotStatus(randomAlphaOfLength(2), State.WAITING));
        shards.put(new ShardId(idx1Name, idx1UUID, 1), new ShardSnapshotStatus(randomAlphaOfLength(2), State.WAITING));
-       shards.put(new ShardId(idx1Name, idx1UUID, 2), new ShardSnapshotStatus(randomAlphaOfLength(2), randomNonWaitingState()));
+       shards.put(new ShardId(idx1Name, idx1UUID, 2), new ShardSnapshotStatus(randomAlphaOfLength(2), randomNonWaitingState(), ""));
        // test exactly one waiting shard in an index
        shards.put(new ShardId(idx2Name, idx2UUID, 0), new ShardSnapshotStatus(randomAlphaOfLength(2), State.WAITING));
-       shards.put(new ShardId(idx2Name, idx2UUID, 1), new ShardSnapshotStatus(randomAlphaOfLength(2), randomNonWaitingState()));
+       shards.put(new ShardId(idx2Name, idx2UUID, 1), new ShardSnapshotStatus(randomAlphaOfLength(2), randomNonWaitingState(), ""));
        // test no waiting shards in an index
-       shards.put(new ShardId(idx3Name, idx3UUID, 0), new ShardSnapshotStatus(randomAlphaOfLength(2), randomNonWaitingState()));
+       shards.put(new ShardId(idx3Name, idx3UUID, 0), new ShardSnapshotStatus(randomAlphaOfLength(2), randomNonWaitingState(), ""));
        Entry entry = new Entry(snapshot, randomBoolean(), randomBoolean(), State.INIT,
            indices, System.currentTimeMillis(), randomLong(), shards.build());

@@ -128,13 +128,20 @@ public abstract class AbstractSnapshotIntegTestCase extends ESIntegTestCase {
        return null;
    }

-   public static String blockMasterFromFinalizingSnapshot(final String repositoryName) {
+   public static String blockMasterFromFinalizingSnapshotOnIndexFile(final String repositoryName) {
        final String masterName = internalCluster().getMasterName();
        ((MockRepository)internalCluster().getInstance(RepositoriesService.class, masterName)
            .repository(repositoryName)).setBlockOnWriteIndexFile(true);
        return masterName;
    }

+   public static String blockMasterFromFinalizingSnapshotOnSnapFile(final String repositoryName) {
+       final String masterName = internalCluster().getMasterName();
+       ((MockRepository)internalCluster().getInstance(RepositoriesService.class, masterName)
+           .repository(repositoryName)).setBlockAndFailOnWriteSnapFiles(true);
+       return masterName;
+   }
+
    public static String blockNodeWithIndex(final String repositoryName, final String indexName) {
        for(String node : internalCluster().nodesInclude(indexName)) {
            ((MockRepository)internalCluster().getInstance(RepositoriesService.class, node).repository(repositoryName))
@@ -767,6 +767,67 @@ public class DedicatedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTest
        assertEquals(0, snapshotInfo.failedShards());
    }

+
+   public void testMasterAndDataShutdownDuringSnapshot() throws Exception {
+       logger.info("--> starting three master nodes and two data nodes");
+       internalCluster().startMasterOnlyNodes(3);
+       internalCluster().startDataOnlyNodes(2);
+
+       final Client client = client();
+
+       logger.info("--> creating repository");
+       assertAcked(client.admin().cluster().preparePutRepository("test-repo")
+           .setType("mock").setSettings(Settings.builder()
+               .put("location", randomRepoPath())
+               .put("compress", randomBoolean())
+               .put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES)));
+
+       assertAcked(prepareCreate("test-idx", 0, Settings.builder().put("number_of_shards", between(1, 20))
+           .put("number_of_replicas", 0)));
+       ensureGreen();
+
+       logger.info("--> indexing some data");
+       final int numdocs = randomIntBetween(10, 100);
+       IndexRequestBuilder[] builders = new IndexRequestBuilder[numdocs];
+       for (int i = 0; i < builders.length; i++) {
+           builders[i] = client().prepareIndex("test-idx", "type1", Integer.toString(i)).setSource("field1", "bar " + i);
+       }
+       indexRandom(true, builders);
+       flushAndRefresh();
+
+       final int numberOfShards = getNumShards("test-idx").numPrimaries;
+       logger.info("number of shards: {}", numberOfShards);
+
+       final String masterNode = blockMasterFromFinalizingSnapshotOnSnapFile("test-repo");
+       final String dataNode = blockNodeWithIndex("test-repo", "test-idx");
+
+       dataNodeClient().admin().cluster().prepareCreateSnapshot("test-repo", "test-snap").setWaitForCompletion(false).setIndices("test-idx").get();
+
+       logger.info("--> stopping data node {}", dataNode);
+       stopNode(dataNode);
+       logger.info("--> stopping master node {} ", masterNode);
+       internalCluster().stopCurrentMasterNode();
+
+       logger.info("--> wait until the snapshot is done");
+
+       assertBusy(() -> {
+           GetSnapshotsResponse snapshotsStatusResponse = client().admin().cluster().prepareGetSnapshots("test-repo").setSnapshots("test-snap").get();
+           SnapshotInfo snapshotInfo = snapshotsStatusResponse.getSnapshots().get(0);
+           assertTrue(snapshotInfo.state().completed());
+       }, 1, TimeUnit.MINUTES);
+
+       logger.info("--> verify that snapshot was partial");
+
+       GetSnapshotsResponse snapshotsStatusResponse = client().admin().cluster().prepareGetSnapshots("test-repo").setSnapshots("test-snap").get();
+       SnapshotInfo snapshotInfo = snapshotsStatusResponse.getSnapshots().get(0);
+       assertEquals(SnapshotState.PARTIAL, snapshotInfo.state());
+       assertNotEquals(snapshotInfo.totalShards(), snapshotInfo.successfulShards());
+       assertThat(snapshotInfo.failedShards(), greaterThan(0));
+       for (SnapshotShardFailure failure : snapshotInfo.shardFailures()) {
+           assertNotNull(failure.reason());
+       }
+   }
+
    @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/25281")
    public void testMasterShutdownDuringFailedSnapshot() throws Exception {
        logger.info("--> starting two master nodes and two data nodes");
@@ -800,7 +861,7 @@ public class DedicatedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTest
            assertEquals(ClusterHealthStatus.RED, client().admin().cluster().prepareHealth().get().getStatus()),
            30, TimeUnit.SECONDS);

-       final String masterNode = blockMasterFromFinalizingSnapshot("test-repo");
+       final String masterNode = blockMasterFromFinalizingSnapshotOnIndexFile("test-repo");

        logger.info("--> snapshot");
        client().admin().cluster().prepareCreateSnapshot("test-repo", "test-snap")
@@ -2252,9 +2252,9 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
                public ClusterState execute(ClusterState currentState) {
                    // Simulate orphan snapshot
                    ImmutableOpenMap.Builder<ShardId, ShardSnapshotStatus> shards = ImmutableOpenMap.builder();
-                   shards.put(new ShardId(idxName, "_na_", 0), new ShardSnapshotStatus("unknown-node", State.ABORTED));
-                   shards.put(new ShardId(idxName, "_na_", 1), new ShardSnapshotStatus("unknown-node", State.ABORTED));
-                   shards.put(new ShardId(idxName, "_na_", 2), new ShardSnapshotStatus("unknown-node", State.ABORTED));
+                   shards.put(new ShardId(idxName, "_na_", 0), new ShardSnapshotStatus("unknown-node", State.ABORTED, "aborted"));
+                   shards.put(new ShardId(idxName, "_na_", 1), new ShardSnapshotStatus("unknown-node", State.ABORTED, "aborted"));
+                   shards.put(new ShardId(idxName, "_na_", 2), new ShardSnapshotStatus("unknown-node", State.ABORTED, "aborted"));
                    List<Entry> entries = new ArrayList<>();
                    entries.add(new Entry(new Snapshot(repositoryName,
                        createSnapshotResponse.getSnapshotInfo().snapshotId()),
@@ -66,7 +66,8 @@ public class SnapshotsInProgressSerializationTests extends AbstractDiffableWireS
            ShardId shardId = new ShardId(new Index(randomAlphaOfLength(10), randomAlphaOfLength(10)), randomIntBetween(0, 10));
            String nodeId = randomAlphaOfLength(10);
            State shardState = randomFrom(State.values());
-           builder.put(shardId, new SnapshotsInProgress.ShardSnapshotStatus(nodeId, shardState));
+           builder.put(shardId, new SnapshotsInProgress.ShardSnapshotStatus(nodeId, shardState,
+               shardState.failed() ? randomAlphaOfLength(10) : null));
        }
        ImmutableOpenMap<ShardId, SnapshotsInProgress.ShardSnapshotStatus> shards = builder.build();
        return new Entry(snapshot, includeGlobalState, partial, state, indices, startTime, repositoryStateId, shards);
@@ -104,6 +104,9 @@ public class MockRepository extends FsRepository {
     * finalization of a snapshot, while permitting other IO operations to proceed unblocked. */
    private volatile boolean blockOnWriteIndexFile;

+   /** Allows blocking on writing the snapshot file at the end of snapshot creation to simulate a died master node */
+   private volatile boolean blockAndFailOnWriteSnapFile;
+
    private volatile boolean atomicMove;

    private volatile boolean blocked = false;
@@ -118,6 +121,7 @@ public class MockRepository extends FsRepository {
        blockOnControlFiles = metadata.settings().getAsBoolean("block_on_control", false);
        blockOnDataFiles = metadata.settings().getAsBoolean("block_on_data", false);
        blockOnInitialization = metadata.settings().getAsBoolean("block_on_init", false);
+       blockAndFailOnWriteSnapFile = metadata.settings().getAsBoolean("block_on_snap", false);
        randomPrefix = metadata.settings().get("random", "default");
        waitAfterUnblock = metadata.settings().getAsLong("wait_after_unblock", 0L);
        atomicMove = metadata.settings().getAsBoolean("atomic_move", true);
@@ -168,6 +172,7 @@ public class MockRepository extends FsRepository {
            blockOnControlFiles = false;
            blockOnInitialization = false;
            blockOnWriteIndexFile = false;
+           blockAndFailOnWriteSnapFile = false;
            this.notifyAll();
        }

@@ -175,6 +180,10 @@ public class MockRepository extends FsRepository {
        blockOnDataFiles = blocked;
    }

+   public void setBlockAndFailOnWriteSnapFiles(boolean blocked) {
+       blockAndFailOnWriteSnapFile = blocked;
+   }
+
    public void setBlockOnWriteIndexFile(boolean blocked) {
        blockOnWriteIndexFile = blocked;
    }
@@ -187,7 +196,8 @@ public class MockRepository extends FsRepository {
        logger.debug("Blocking execution");
        boolean wasBlocked = false;
        try {
-           while (blockOnDataFiles || blockOnControlFiles || blockOnInitialization || blockOnWriteIndexFile) {
+           while (blockOnDataFiles || blockOnControlFiles || blockOnInitialization || blockOnWriteIndexFile ||
+               blockAndFailOnWriteSnapFile) {
                blocked = true;
                this.wait();
                wasBlocked = true;
@@ -266,6 +276,8 @@ public class MockRepository extends FsRepository {
                    throw new IOException("Random IOException");
                } else if (blockOnControlFiles) {
                    blockExecutionAndMaybeWait(blobName);
+               } else if (blobName.startsWith("snap-") && blockAndFailOnWriteSnapFile) {
+                   blockExecutionAndFail(blobName);
                }
            }
        }
@@ -283,6 +295,15 @@ public class MockRepository extends FsRepository {
            }
        }

+       /**
+        * Blocks an I/O operation on the blob fails and throws an exception when unblocked
+        */
+       private void blockExecutionAndFail(final String blobName) throws IOException {
+           logger.info("blocking I/O operation for file [{}] at path [{}]", blobName, path());
+           blockExecution();
+           throw new IOException("exception after block");
+       }
+
        MockBlobContainer(BlobContainer delegate) {
            super(delegate);
        }