Fix Snapshot Finalization not Waiting for Index Metadata (#47445) (#47459)

* Fix Snapshot Finalization not Waiting for Index Metadata

We were mixing up the listeners here, which caused the final listener
(the one that should only be called after all the metadata has been written)
to be called before the metadata writes had completed.
I fixed this by removing the one redundant listener and flattening
the logic out.

* Closes #47425
This commit is contained in:
Armin Braun 2019-10-02 23:26:18 +02:00 committed by GitHub
parent 6b96f53ea0
commit 0beb5263b4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 12 additions and 13 deletions

View File

@@ -719,18 +719,17 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
final Map<String, Object> userMetadata, final Map<String, Object> userMetadata,
final ActionListener<SnapshotInfo> listener) { final ActionListener<SnapshotInfo> listener) {
// We upload one meta blob for each index, one for the cluster-state and one snap-${uuid}.dat blob
// Once we're done writing all metadata, we update the index-N blob to finalize the snapshot // Once we're done writing all metadata, we update the index-N blob to finalize the snapshot
final ActionListener<SnapshotInfo> afterMetaWrites = ActionListener.wrap(snapshotInfo -> { final ActionListener<SnapshotInfo> allMetaListener = new GroupedActionListener<>(
ActionListener.wrap(snapshotInfos -> {
assert snapshotInfos.size() == 1 : "Should have only received a single SnapshotInfo but received " + snapshotInfos;
final SnapshotInfo snapshotInfo = snapshotInfos.iterator().next();
writeIndexGen(getRepositoryData().addSnapshot(snapshotId, snapshotInfo.state(), indices), repositoryStateId); writeIndexGen(getRepositoryData().addSnapshot(snapshotId, snapshotInfo.state(), indices), repositoryStateId);
listener.onResponse(snapshotInfo); listener.onResponse(snapshotInfo);
}, ex -> listener.onFailure(new SnapshotException(metadata.name(), snapshotId, "failed to update snapshot in repository", ex))); },
e -> listener.onFailure(new SnapshotException(metadata.name(), snapshotId, "failed to update snapshot in repository", e))),
// We upload one meta blob for each index, one for the cluster-state and one snap-${uuid}.dat blob 2 + indices.size());
final GroupedActionListener<SnapshotInfo> allMetaListener =
new GroupedActionListener<>(ActionListener.map(afterMetaWrites, snapshotInfos -> {
assert snapshotInfos.size() == 1 : "Should have only received a single SnapshotInfo but received " + snapshotInfos;
return snapshotInfos.iterator().next();
}), 2 + indices.size());
final Executor executor = threadPool.executor(ThreadPool.Names.SNAPSHOT); final Executor executor = threadPool.executor(ThreadPool.Names.SNAPSHOT);
// We ignore all FileAlreadyExistsException when writing metadata since otherwise a master failover while in this method will // We ignore all FileAlreadyExistsException when writing metadata since otherwise a master failover while in this method will
@@ -753,7 +752,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
})); }));
} }
executor.execute(ActionRunnable.wrap(afterMetaWrites, afterMetaListener -> { executor.execute(ActionRunnable.wrap(allMetaListener, afterMetaListener -> {
final SnapshotInfo snapshotInfo = new SnapshotInfo(snapshotId, final SnapshotInfo snapshotInfo = new SnapshotInfo(snapshotId,
indices.stream().map(IndexId::getName).collect(Collectors.toList()), indices.stream().map(IndexId::getName).collect(Collectors.toList()),
startTime, failure, threadPool.absoluteTimeInMillis(), totalShards, shardFailures, startTime, failure, threadPool.absoluteTimeInMillis(), totalShards, shardFailures,

View File

@@ -1317,7 +1317,7 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
client.admin().cluster().prepareDeleteSnapshot("test-repo", lastSnapshot).get(); client.admin().cluster().prepareDeleteSnapshot("test-repo", lastSnapshot).get();
logger.info("--> make sure that number of files is back to what it was when the first snapshot was made, " + logger.info("--> make sure that number of files is back to what it was when the first snapshot was made, " +
"plus one because one backup index-N file should remain"); "plus one because one backup index-N file should remain");
assertThat(numberOfFiles(repo), equalTo(numberOfFiles[0] + 1)); assertFileCount(repo, numberOfFiles[0] + 1);
} }
public void testDeleteSnapshotWithMissingIndexAndShardMetadata() throws Exception { public void testDeleteSnapshotWithMissingIndexAndShardMetadata() throws Exception {