No need to do any switch to the `SNAPSHOT` pool here, the blob store repo handles all its writes async on the `SNAPSHOT` pool so we're just needlessly context-switching to enqueue those tasks there. Also cleaned up the source only repository (the only override to `finalizeSnapshot`) to make it clear that no IO is happening there and we don't need to run it on the `SNAPSHOT` pool either.
This commit is contained in:
parent
31be3a3645
commit
bde92fc5fc
|
@ -1074,27 +1074,24 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
|||
removeSnapshotFromClusterState(entry.snapshot(), new SnapshotException(snapshot, "Aborted on initialization"), null);
|
||||
return;
|
||||
}
|
||||
threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(new AbstractRunnable() {
|
||||
@Override
|
||||
protected void doRun() {
|
||||
final Repository repository = repositoriesService.repository(snapshot.getRepository());
|
||||
final String failure = entry.failure();
|
||||
logger.trace("[{}] finalizing snapshot in repository, state: [{}], failure[{}]", snapshot, entry.state(), failure);
|
||||
ArrayList<SnapshotShardFailure> shardFailures = new ArrayList<>();
|
||||
for (ObjectObjectCursor<ShardId, ShardSnapshotStatus> shardStatus : entry.shards()) {
|
||||
ShardId shardId = shardStatus.key;
|
||||
ShardSnapshotStatus status = shardStatus.value;
|
||||
final ShardState state = status.state();
|
||||
if (state.failed()) {
|
||||
shardFailures.add(new SnapshotShardFailure(status.nodeId(), shardId, status.reason()));
|
||||
} else if (state.completed() == false) {
|
||||
shardFailures.add(new SnapshotShardFailure(status.nodeId(), shardId, "skipped"));
|
||||
} else {
|
||||
assert state == ShardState.SUCCESS;
|
||||
}
|
||||
try {
|
||||
final String failure = entry.failure();
|
||||
logger.trace("[{}] finalizing snapshot in repository, state: [{}], failure[{}]", snapshot, entry.state(), failure);
|
||||
ArrayList<SnapshotShardFailure> shardFailures = new ArrayList<>();
|
||||
for (ObjectObjectCursor<ShardId, ShardSnapshotStatus> shardStatus : entry.shards()) {
|
||||
ShardId shardId = shardStatus.key;
|
||||
ShardSnapshotStatus status = shardStatus.value;
|
||||
final ShardState state = status.state();
|
||||
if (state.failed()) {
|
||||
shardFailures.add(new SnapshotShardFailure(status.nodeId(), shardId, status.reason()));
|
||||
} else if (state.completed() == false) {
|
||||
shardFailures.add(new SnapshotShardFailure(status.nodeId(), shardId, "skipped"));
|
||||
} else {
|
||||
assert state == ShardState.SUCCESS;
|
||||
}
|
||||
final ShardGenerations shardGenerations = buildGenerations(entry, metadata);
|
||||
repository.finalizeSnapshot(
|
||||
}
|
||||
final ShardGenerations shardGenerations = buildGenerations(entry, metadata);
|
||||
repositoriesService.repository(snapshot.getRepository()).finalizeSnapshot(
|
||||
snapshot.getSnapshotId(),
|
||||
shardGenerations,
|
||||
entry.startTime(),
|
||||
|
@ -1106,38 +1103,38 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
|||
metadataForSnapshot(entry, metadata),
|
||||
entry.userMetadata(),
|
||||
entry.version(),
|
||||
state -> stateWithoutSnapshot(state, snapshot),
|
||||
ActionListener.wrap(result -> {
|
||||
final List<ActionListener<Tuple<RepositoryData, SnapshotInfo>>> completionListeners =
|
||||
snapshotCompletionListeners.remove(snapshot);
|
||||
if (completionListeners != null) {
|
||||
try {
|
||||
ActionListener.onResponse(completionListeners, result);
|
||||
} catch (Exception e) {
|
||||
logger.warn("Failed to notify listeners", e);
|
||||
}
|
||||
state -> stateWithoutSnapshot(state, snapshot),
|
||||
ActionListener.wrap(result -> {
|
||||
final List<ActionListener<Tuple<RepositoryData, SnapshotInfo>>> completionListeners =
|
||||
snapshotCompletionListeners.remove(snapshot);
|
||||
if (completionListeners != null) {
|
||||
try {
|
||||
ActionListener.onResponse(completionListeners, result);
|
||||
} catch (Exception e) {
|
||||
logger.warn("Failed to notify listeners", e);
|
||||
}
|
||||
endingSnapshots.remove(snapshot);
|
||||
logger.info("snapshot [{}] completed with state [{}]", snapshot, result.v2().state());
|
||||
}, this::onFailure));
|
||||
}
|
||||
}
|
||||
endingSnapshots.remove(snapshot);
|
||||
logger.info("snapshot [{}] completed with state [{}]", snapshot, result.v2().state());
|
||||
}, e -> handleFinalizationFailure(e, entry)));
|
||||
} catch (Exception e) {
|
||||
handleFinalizationFailure(e, entry);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(final Exception e) {
|
||||
Snapshot snapshot = entry.snapshot();
|
||||
if (ExceptionsHelper.unwrap(e, NotMasterException.class, FailedToCommitClusterStateException.class) != null) {
|
||||
// Failure due to not being master any more, don't try to remove snapshot from cluster state the next master
|
||||
// will try ending this snapshot again
|
||||
logger.debug(() -> new ParameterizedMessage(
|
||||
"[{}] failed to update cluster state during snapshot finalization", snapshot), e);
|
||||
failSnapshotCompletionListeners(snapshot,
|
||||
new SnapshotException(snapshot, "Failed to update cluster state during snapshot finalization", e));
|
||||
} else {
|
||||
logger.warn(() -> new ParameterizedMessage("[{}] failed to finalize snapshot", snapshot), e);
|
||||
removeSnapshotFromClusterState(snapshot, e, null);
|
||||
}
|
||||
}
|
||||
});
|
||||
private void handleFinalizationFailure(Exception e, SnapshotsInProgress.Entry entry) {
|
||||
Snapshot snapshot = entry.snapshot();
|
||||
if (ExceptionsHelper.unwrap(e, NotMasterException.class, FailedToCommitClusterStateException.class) != null) {
|
||||
// Failure due to not being master any more, don't try to remove snapshot from cluster state the next master
|
||||
// will try ending this snapshot again
|
||||
logger.debug(() -> new ParameterizedMessage(
|
||||
"[{}] failed to update cluster state during snapshot finalization", snapshot), e);
|
||||
failSnapshotCompletionListeners(snapshot,
|
||||
new SnapshotException(snapshot, "Failed to update cluster state during snapshot finalization", e));
|
||||
} else {
|
||||
logger.warn(() -> new ParameterizedMessage("[{}] failed to finalize snapshot", snapshot), e);
|
||||
removeSnapshotFromClusterState(snapshot, e, null);
|
||||
}
|
||||
}
|
||||
|
||||
private static ClusterState stateWithoutSnapshot(ClusterState state, Snapshot snapshot) {
|
||||
|
|
Loading…
Reference in New Issue