Fix NPE in SnapshotsService CS Application (#58680) (#58735)

In the unlikely corner case of deleting a relocating (hence `WAITING`) primary shard's
index during a partial snapshot, we would throw an NPE when checking whether there are any
external changes to process.
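
The failure mode, as a minimal sketch in plain Java (a hypothetical map stands in for the cluster state's routing table; these are not the real Elasticsearch types): once the index has been deleted, looking it up in the routing table returns null, and the waiting-shards check then dereferences that null. The fix, shown in the SnapshotsService hunk below, treats the missing index as an external change so that the WAITING shards get failed instead of tripping the NPE.

import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in types: a plain map plays the role of the routing table lookup.
public class WaitingShardsCheckSketch {

    // index name -> (shard id -> primary state); a concurrently deleted index has no entry at all
    static final Map<String, Map<Integer, String>> routingTable = new HashMap<>();

    static boolean waitingShardsStartedOrUnassigned(String indexName, List<Integer> waitingShardIds) {
        Map<Integer, String> indexRouting = routingTable.get(indexName);
        if (indexRouting == null) {
            // Index got removed concurrently: report a change so the WAITING shards can be failed,
            // instead of dereferencing null below (the pre-fix NPE).
            return true;
        }
        for (int shardId : waitingShardIds) {
            String primaryState = indexRouting.get(shardId);
            if ("STARTED".equals(primaryState) || "UNASSIGNED".equals(primaryState)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // "test-idx" was deleted while a partial snapshot was still waiting on its relocating primary.
        System.out.println(waitingShardsStartedOrUnassigned("test-idx", List.of(0, 1))); // prints true
    }
}
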
Armin Braun 2020-06-30 15:20:49 +02:00 committed by GitHub
parent d8731853a3
commit b52a764143
2 changed files with 45 additions and 0 deletions

SharedClusterSnapshotRestoreIT.java

@@ -2127,6 +2127,46 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCase
logger.info("--> done"); logger.info("--> done");
} }
    public void testSnapshotDeleteRelocatingPrimaryIndex() throws Exception {
        final String repoName = "test-repo";
        createRepository(repoName, "fs", randomRepoPath());

        // Create index on two nodes and make sure each node has a primary by setting no replicas
        final String indexName = "test-idx";
        assertAcked(prepareCreate(indexName, 2, Settings.builder()
            .put(SETTING_NUMBER_OF_REPLICAS, 0)
            .put(SETTING_NUMBER_OF_SHARDS, between(2, 10))));
        ensureGreen(indexName);

        logger.info("--> indexing some data");
        for (int i = 0; i < 100; i++) {
            index(indexName, "_doc", Integer.toString(i), "foo", "bar" + i);
        }
        refresh();
        assertThat(client().prepareSearch(indexName).setSize(0).get().getHits().getTotalHits().value, equalTo(100L));

        logger.info("--> start relocations");
        allowNodes(indexName, 1);

        logger.info("--> wait for relocations to start");
        assertBusy(() -> assertThat(
            client().admin().cluster().prepareHealth(indexName).execute().actionGet().getRelocatingShards(), greaterThan(0)),
            1L, TimeUnit.MINUTES);

        logger.info("--> snapshot");
        client().admin().cluster().prepareCreateSnapshot(repoName, "test-snap")
            .setWaitForCompletion(false).setPartial(true).setIndices(indexName).get();

        assertAcked(client().admin().indices().prepareDelete(indexName));

        logger.info("--> wait for snapshot to complete");
        SnapshotInfo snapshotInfo = waitForCompletion(repoName, "test-snap", TimeValue.timeValueSeconds(600));
        assertThat(snapshotInfo.state(), equalTo(SnapshotState.PARTIAL));
        assertThat(snapshotInfo.shardFailures().size(), greaterThan(0));
        logger.info("--> done");
    }
    public void testSnapshotMoreThanOnce() throws ExecutionException, InterruptedException {
        Client client = client();

SnapshotsService.java

@@ -625,6 +625,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements ClusterStateApplier
                }
            }
        } catch (Exception e) {
            assert false : new AssertionError(e);
            logger.warn("Failed to update snapshot state ", e);
        }
        assert assertConsistentWithClusterState(event.state());
@@ -826,6 +827,10 @@ public class SnapshotsService extends AbstractLifecycleComponent implements ClusterStateApplier
                for (ObjectCursor<String> index : entry.waitingIndices().keys()) {
                    if (event.indexRoutingTableChanged(index.value)) {
                        IndexRoutingTable indexShardRoutingTable = event.state().getRoutingTable().index(index.value);
                        if (indexShardRoutingTable == null) {
                            // index got removed concurrently and we have to fail WAITING state shards
                            return true;
                        }
                        for (ShardId shardId : entry.waitingIndices().get(index.value)) {
                            ShardRouting shardRouting = indexShardRoutingTable.shard(shardId.id()).primaryShard();
                            if (shardRouting != null && (shardRouting.started() || shardRouting.unassigned())) {