Fix possible race condition during snapshot deletion
This commit is contained in:
parent
0a7c6c9288
commit
3c0cc22d36
|
@ -74,7 +74,7 @@ import static com.google.common.collect.Maps.newHashMapWithExpectedSize;
|
|||
* <li>Once shard snapshot is created data node updates state of the shard in the cluster state using the {@link #updateIndexShardSnapshotStatus(UpdateIndexShardSnapshotStatusRequest)} method</li>
|
||||
* <li>When last shard is completed master node in {@link #innerUpdateSnapshotState} method marks the snapshot as completed</li>
|
||||
* <li>After cluster state is updated, the {@link #endSnapshot(SnapshotMetaData.Entry)} finalizes snapshot in the repository,
|
||||
* notifies all {@link #snapshotCompletionListeners} that snapshot is completed, and finally calls {@link #removeSnapshotFromClusterState(SnapshotId)} to remove snapshot from cluster state</li>
|
||||
* notifies all {@link #snapshotCompletionListeners} that snapshot is completed, and finally calls {@link #removeSnapshotFromClusterState(SnapshotId, SnapshotInfo, Throwable)} to remove snapshot from cluster state</li>
|
||||
* </ul>
|
||||
*/
|
||||
public class SnapshotsService extends AbstractComponent implements ClusterStateListener {
|
||||
|
@ -681,25 +681,12 @@ public class SnapshotsService extends AbstractComponent implements ClusterStateL
|
|||
}
|
||||
}
|
||||
Snapshot snapshot = repository.finalizeSnapshot(snapshotId, null, entry.shards().size(), ImmutableList.copyOf(shardFailures));
|
||||
for (SnapshotCompletionListener listener : snapshotCompletionListeners) {
|
||||
try {
|
||||
listener.onSnapshotCompletion(snapshotId, new SnapshotInfo(snapshot));
|
||||
} catch (Throwable t) {
|
||||
logger.warn("failed to refresh settings for [{}]", t, listener);
|
||||
}
|
||||
}
|
||||
removeSnapshotFromClusterState(snapshotId, new SnapshotInfo(snapshot), null);
|
||||
} catch (Throwable t) {
|
||||
logger.warn("[{}] failed to finalize snapshot", t, snapshotId);
|
||||
for (SnapshotCompletionListener listener : snapshotCompletionListeners) {
|
||||
try {
|
||||
listener.onSnapshotFailure(snapshotId, t);
|
||||
} catch (Throwable t2) {
|
||||
logger.warn("failed to update snapshot status for [{}]", t2, listener);
|
||||
removeSnapshotFromClusterState(snapshotId, null, t);
|
||||
}
|
||||
}
|
||||
}
|
||||
removeSnapshotFromClusterState(snapshotId);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
|
@ -707,9 +694,11 @@ public class SnapshotsService extends AbstractComponent implements ClusterStateL
|
|||
* Removes record of running snapshot from cluster state
|
||||
*
|
||||
* @param snapshotId snapshot id
|
||||
* @param snapshot snapshot info if snapshot was successful
|
||||
* @param t exception if snapshot failed
|
||||
*/
|
||||
private void removeSnapshotFromClusterState(final SnapshotId snapshotId) {
|
||||
clusterService.submitStateUpdateTask("remove snapshot metadata", new ClusterStateUpdateTask() {
|
||||
private void removeSnapshotFromClusterState(final SnapshotId snapshotId, final SnapshotInfo snapshot, final Throwable t) {
|
||||
clusterService.submitStateUpdateTask("remove snapshot metadata", new ProcessedClusterStateUpdateTask() {
|
||||
@Override
|
||||
public ClusterState execute(ClusterState currentState) {
|
||||
MetaData metaData = currentState.metaData();
|
||||
|
@ -738,6 +727,22 @@ public class SnapshotsService extends AbstractComponent implements ClusterStateL
|
|||
public void onFailure(String source, Throwable t) {
|
||||
logger.warn("[{}][{}] failed to remove snapshot metadata", t, snapshotId);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
|
||||
for (SnapshotCompletionListener listener : snapshotCompletionListeners) {
|
||||
try {
|
||||
if (snapshot != null) {
|
||||
listener.onSnapshotCompletion(snapshotId, snapshot);
|
||||
} else {
|
||||
listener.onSnapshotFailure(snapshotId, t);
|
||||
}
|
||||
} catch (Throwable t) {
|
||||
logger.warn("failed to refresh settings for [{}]", t, listener);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
|
@ -787,10 +792,14 @@ public class SnapshotsService extends AbstractComponent implements ClusterStateL
|
|||
}
|
||||
}
|
||||
shards = shardsBuilder.build();
|
||||
} else {
|
||||
// snapshot hasn't started yet or already finished - just end it
|
||||
} else if (snapshot.state() == State.INIT) {
|
||||
// snapshot hasn't started yet - end it
|
||||
shards = snapshot.shards();
|
||||
endSnapshot(snapshot);
|
||||
} else {
|
||||
// snapshot is being finalized - wait for it
|
||||
logger.trace("trying to delete completed snapshot - save to delete");
|
||||
return currentState;
|
||||
}
|
||||
SnapshotMetaData.Entry newSnapshot = new SnapshotMetaData.Entry(snapshotId, snapshot.includeGlobalState(), State.ABORTED, snapshot.indices(), shards);
|
||||
snapshots = new SnapshotMetaData(newSnapshot);
|
||||
|
@ -807,20 +816,24 @@ public class SnapshotsService extends AbstractComponent implements ClusterStateL
|
|||
@Override
|
||||
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
|
||||
if (waitForSnapshot) {
|
||||
logger.trace("adding snapshot completion listener to wait for deleted snapshot to finish");
|
||||
addListener(new SnapshotCompletionListener() {
|
||||
@Override
|
||||
public void onSnapshotCompletion(SnapshotId snapshotId, SnapshotInfo snapshot) {
|
||||
logger.trace("deleted snapshot completed - deleting files");
|
||||
removeListener(this);
|
||||
deleteSnapshotFromRepository(snapshotId, listener);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onSnapshotFailure(SnapshotId snapshotId, Throwable t) {
|
||||
logger.trace("deleted snapshot failed - deleting files", t);
|
||||
removeListener(this);
|
||||
deleteSnapshotFromRepository(snapshotId, listener);
|
||||
}
|
||||
});
|
||||
} else {
|
||||
logger.trace("deleted snapshot is not running - deleting files");
|
||||
deleteSnapshotFromRepository(snapshotId, listener);
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue