diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java index c518b885a2e..cedfcfabc73 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java @@ -903,7 +903,8 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp final Map userMetadata, Version repositoryMetaVersion, final ActionListener listener) { - + assert repositoryStateId > RepositoryData.UNKNOWN_REPO_GEN : + "Must finalize based on a valid repository generation but received [" + repositoryStateId + "]"; final Collection indices = shardGenerations.indices(); // Once we are done writing the updated index-N blob we remove the now unreferenced index-${uuid} blobs in each shard // directory if all nodes are at least at version SnapshotsService#SHARD_GEN_IN_REPO_DATA_VERSION diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java index f350c7ecf90..f7c0f41d878 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java @@ -106,9 +106,8 @@ import static org.elasticsearch.cluster.SnapshotsInProgress.completed; *
    *
  • On the master node the {@link #createSnapshot(CreateSnapshotRequest, ActionListener)} is called and makes sure that * no snapshot is currently running and registers the new snapshot in cluster state
  • - *
  • When cluster state is updated - * the {@link #beginSnapshot} method kicks in and initializes - * the snapshot in the repository and then populates list of shards that needs to be snapshotted in cluster state
  • + *
  • When the cluster state is updated, the {@link #beginSnapshot} method kicks in and populates the list of shards that need to be + * snapshotted in the cluster state
  • *
  • Each data node is watching for these shards and when new shards scheduled for snapshotting appear in the cluster state, data nodes * start processing them through {@link SnapshotShardsService#startNewSnapshots} method
  • *
  • Once shard snapshot is created data node updates state of the shard in the cluster state using @@ -1057,10 +1056,16 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus if (endingSnapshots.add(entry.snapshot()) == false) { return; } + final Snapshot snapshot = entry.snapshot(); + if (entry.repositoryStateId() == RepositoryData.UNKNOWN_REPO_GEN) { + logger.debug("[{}] was aborted before starting", snapshot); + removeSnapshotFromClusterState(entry.snapshot(), null, + new SnapshotException(snapshot, "Aborted on initialization")); + return; + } threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(new AbstractRunnable() { @Override protected void doRun() { - final Snapshot snapshot = entry.snapshot(); final Repository repository = repositoriesService.repository(snapshot.getRepository()); final String failure = entry.failure(); logger.trace("[{}] finalizing snapshot in repository, state: [{}], failure[{}]", snapshot, entry.state(), failure); @@ -1256,12 +1261,15 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus */ private void deleteSnapshot(final Snapshot snapshot, final ActionListener listener, final long repositoryStateId, final boolean immediatePriority) { - logger.info("deleting snapshot [{}]", snapshot); Priority priority = immediatePriority ? 
Priority.IMMEDIATE : Priority.NORMAL; + logger.info("deleting snapshot [{}] assuming repository generation [{}] and with priority [{}]", + snapshot, repositoryStateId, priority); clusterService.submitStateUpdateTask("delete snapshot", new ClusterStateUpdateTask(priority) { boolean waitForSnapshot = false; + boolean abortedDuringInit = false; + @Override public ClusterState execute(ClusterState currentState) { SnapshotDeletionsInProgress deletionsInProgress = currentState.custom(SnapshotDeletionsInProgress.TYPE); @@ -1321,6 +1329,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus shards = snapshotEntry.shards(); assert shards.isEmpty(); failure = "Snapshot was aborted during initialization"; + abortedDuringInit = true; } else if (state == State.STARTED) { // snapshot is started - mark every non completed shard as aborted final ImmutableOpenMap.Builder shardsBuilder = ImmutableOpenMap.builder(); @@ -1385,19 +1394,21 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus ); }, e -> { - logger.warn("deleted snapshot failed - deleting files", e); - threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(() -> { - try { - deleteSnapshot(snapshot.getRepository(), snapshot.getSnapshotId().getName(), listener, true); - } catch (SnapshotMissingException smex) { - logger.info(() -> new ParameterizedMessage( - "Tried deleting in-progress snapshot [{}], but it could not be found after failing to abort.", - smex.getSnapshotName()), e); - listener.onFailure(new SnapshotException(snapshot, - "Tried deleting in-progress snapshot [" + smex.getSnapshotName() + "], but it " + - "could not be found after failing to abort.", smex)); + if (abortedDuringInit) { + logger.debug(() -> new ParameterizedMessage("Snapshot [{}] was aborted during INIT", snapshot), e); + listener.onResponse(null); + } else { + if (ExceptionsHelper.unwrap(e, NotMasterException.class, FailedToCommitClusterStateException.class) + != null) { + 
logger.warn("master failover before deleted snapshot could complete", e); + // Just pass the exception to the transport handler as is so it is retried on the new master + listener.onFailure(e); + } else { + logger.warn("deleted snapshot failed", e); + listener.onFailure( + new SnapshotMissingException(snapshot.getRepository(), snapshot.getSnapshotId(), e)); } - }); + } } )); } else {