diff --git a/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java b/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java index 8a82c2656b7..337aac8cb74 100644 --- a/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java +++ b/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java @@ -74,7 +74,7 @@ import static com.google.common.collect.Maps.newHashMapWithExpectedSize; *
  • Once shard snapshot is created data node updates state of the shard in the cluster state using the {@link #updateIndexShardSnapshotStatus(UpdateIndexShardSnapshotStatusRequest)} method
  • *
  • When last shard is completed master node in {@link #innerUpdateSnapshotState} method marks the snapshot as completed
  • *
  • After cluster state is updated, the {@link #endSnapshot(SnapshotMetaData.Entry)} finalizes snapshot in the repository, - * notifies all {@link #snapshotCompletionListeners} that snapshot is completed, and finally calls {@link #removeSnapshotFromClusterState(SnapshotId)} to remove snapshot from cluster state
  • + * notifies all {@link #snapshotCompletionListeners} that snapshot is completed, and finally calls {@link #removeSnapshotFromClusterState(SnapshotId, SnapshotInfo, Throwable)} to remove snapshot from cluster state * */ public class SnapshotsService extends AbstractComponent implements ClusterStateListener { @@ -241,8 +241,8 @@ public class SnapshotsService extends AbstractComponent implements ClusterStateL *

    * Creates snapshot in repository and updates snapshot metadata record with list of shards that needs to be processed. * - * @param clusterState cluster state - * @param snapshot snapshot meta data + * @param clusterState cluster state + * @param snapshot snapshot meta data * @param userCreateSnapshotListener listener */ private void beginSnapshot(ClusterState clusterState, final SnapshotMetaData.Entry snapshot, final CreateSnapshotListener userCreateSnapshotListener) { @@ -681,24 +681,11 @@ public class SnapshotsService extends AbstractComponent implements ClusterStateL } } Snapshot snapshot = repository.finalizeSnapshot(snapshotId, null, entry.shards().size(), ImmutableList.copyOf(shardFailures)); - for (SnapshotCompletionListener listener : snapshotCompletionListeners) { - try { - listener.onSnapshotCompletion(snapshotId, new SnapshotInfo(snapshot)); - } catch (Throwable t) { - logger.warn("failed to refresh settings for [{}]", t, listener); - } - } + removeSnapshotFromClusterState(snapshotId, new SnapshotInfo(snapshot), null); } catch (Throwable t) { logger.warn("[{}] failed to finalize snapshot", t, snapshotId); - for (SnapshotCompletionListener listener : snapshotCompletionListeners) { - try { - listener.onSnapshotFailure(snapshotId, t); - } catch (Throwable t2) { - logger.warn("failed to update snapshot status for [{}]", t2, listener); - } - } + removeSnapshotFromClusterState(snapshotId, null, t); } - removeSnapshotFromClusterState(snapshotId); } }); } @@ -707,9 +694,11 @@ public class SnapshotsService extends AbstractComponent implements ClusterStateL * Removes record of running snapshot from cluster state * * @param snapshotId snapshot id + * @param snapshot snapshot info if snapshot was successful + * @param t exception if snapshot failed */ - private void removeSnapshotFromClusterState(final SnapshotId snapshotId) { - clusterService.submitStateUpdateTask("remove snapshot metadata", new ClusterStateUpdateTask() { + private void removeSnapshotFromClusterState(final SnapshotId snapshotId, final SnapshotInfo snapshot, final Throwable t) { + clusterService.submitStateUpdateTask("remove snapshot metadata", new ProcessedClusterStateUpdateTask() { @Override public ClusterState execute(ClusterState currentState) { MetaData metaData = currentState.metaData(); @@ -738,6 +727,22 @@ public class SnapshotsService extends AbstractComponent implements ClusterStateL public void onFailure(String source, Throwable t) { logger.warn("[{}][{}] failed to remove snapshot metadata", t, snapshotId); } + + @Override + public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { + for (SnapshotCompletionListener listener : snapshotCompletionListeners) { + try { + if (snapshot != null) { + listener.onSnapshotCompletion(snapshotId, snapshot); + } else { + listener.onSnapshotFailure(snapshotId, t); + } + } catch (Throwable t) { + logger.warn("failed to refresh settings for [{}]", t, listener); + } + } + + } }); } @@ -787,10 +792,14 @@ public class SnapshotsService extends AbstractComponent implements ClusterStateL } } shards = shardsBuilder.build(); - } else { - // snapshot hasn't started yet or already finished - just end it + } else if (snapshot.state() == State.INIT) { + // snapshot hasn't started yet - end it shards = snapshot.shards(); endSnapshot(snapshot); + } else { + // snapshot is being finalized - wait for it + logger.trace("trying to delete completed snapshot - save to delete"); + return currentState; } SnapshotMetaData.Entry newSnapshot = new SnapshotMetaData.Entry(snapshotId, snapshot.includeGlobalState(), State.ABORTED, snapshot.indices(), shards); snapshots = new SnapshotMetaData(newSnapshot); @@ -807,20 +816,24 @@ public class SnapshotsService extends AbstractComponent implements ClusterStateL @Override public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { if (waitForSnapshot) { + logger.trace("adding snapshot completion listener to wait for deleted snapshot to finish"); addListener(new SnapshotCompletionListener() { @Override public void onSnapshotCompletion(SnapshotId snapshotId, SnapshotInfo snapshot) { + logger.trace("deleted snapshot completed - deleting files"); removeListener(this); deleteSnapshotFromRepository(snapshotId, listener); } @Override public void onSnapshotFailure(SnapshotId snapshotId, Throwable t) { + logger.trace("deleted snapshot failed - deleting files", t); removeListener(this); deleteSnapshotFromRepository(snapshotId, listener); } }); } else { + logger.trace("deleted snapshot is not running - deleting files"); deleteSnapshotFromRepository(snapshotId, listener); } }