diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java index 62707eb9964..9cbf0f09a7a 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java @@ -444,6 +444,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp } final Map survivingIndices = updatedRepositoryData.getIndices(); deleteIndices( + updatedRepositoryData, Optional.ofNullable(finalSnapshotInfo) .map(info -> info.indices().stream().filter(survivingIndices::containsKey) .map(updatedRepositoryData::resolveIndexId).collect(Collectors.toList())) @@ -528,7 +529,8 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp } } - private void deleteIndices(List indices, SnapshotId snapshotId, ActionListener listener) { + private void deleteIndices(RepositoryData repositoryData, List indices, SnapshotId snapshotId, + ActionListener listener) { if (indices.isEmpty()) { listener.onResponse(null); return; @@ -550,7 +552,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp if (indexMetaData != null) { for (int shardId = 0; shardId < indexMetaData.getNumberOfShards(); shardId++) { try { - deleteShardSnapshot(indexId, new ShardId(indexMetaData.getIndex(), shardId), snapshotId); + deleteShardSnapshot(repositoryData, indexId, new ShardId(indexMetaData.getIndex(), shardId), snapshotId); } catch (SnapshotException ex) { final int finalShardId = shardId; logger.warn(() -> new ParameterizedMessage("[{}] failed to delete shard data for shard [{}][{}]", @@ -977,9 +979,29 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp for (SnapshotFiles point : snapshots) { newSnapshotsList.add(point); } - // finalize the snapshot and rewrite the snapshot index with the next sequential snapshot index - finalizeShard(newSnapshotsList, fileListGeneration, blobs, "snapshot creation [" + snapshotId + "]", shardContainer, - shardId, snapshotId); + final String indexGeneration = Long.toString(fileListGeneration + 1); + final List blobsToDelete; + try { + final BlobStoreIndexShardSnapshots updatedSnapshots = new BlobStoreIndexShardSnapshots(newSnapshotsList); + indexShardSnapshotsFormat.writeAtomic(updatedSnapshots, shardContainer, indexGeneration); + // Delete all previous index-N blobs + blobsToDelete = + blobs.keySet().stream().filter(blob -> blob.startsWith(SNAPSHOT_INDEX_PREFIX)).collect(Collectors.toList()); + assert blobsToDelete.stream().mapToLong(b -> Long.parseLong(b.replaceFirst(SNAPSHOT_INDEX_PREFIX, ""))) + .max().orElse(-1L) < Long.parseLong(indexGeneration) + : "Tried to delete an index-N blob newer than the current generation [" + indexGeneration + "] when deleting index-N" + + " blobs " + blobsToDelete; + } catch (IOException e) { + throw new IndexShardSnapshotFailedException(shardId, + "Failed to finalize snapshot creation [" + snapshotId + "] with shard index [" + + indexShardSnapshotsFormat.blobName(indexGeneration) + "]", e); + } + try { + shardContainer.deleteBlobsIgnoringIfNotExists(blobsToDelete); + } catch (IOException e) { + logger.warn(() -> new ParameterizedMessage("[{}][{}] failed to delete old index-N blobs during finalization", + snapshotId, shardId), e); + } snapshotStatus.moveToDone(threadPool.absoluteTimeInMillis()); } catch (Exception e) { snapshotStatus.moveToFailed(threadPool.absoluteTimeInMillis(), ExceptionsHelper.detailedMessage(e)); @@ -1067,7 +1089,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp /** * Delete shard snapshot */ - private void deleteShardSnapshot(IndexId indexId, ShardId snapshotShardId, SnapshotId snapshotId) { + private void deleteShardSnapshot(RepositoryData repositoryData, IndexId indexId, ShardId snapshotShardId, SnapshotId snapshotId) { final BlobContainer shardContainer = shardContainer(indexId, snapshotShardId); final Map blobs; try { @@ -1082,19 +1104,43 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp // Build a list of snapshots that should be preserved List newSnapshotsList = new ArrayList<>(); + final Set survivingSnapshotNames = + repositoryData.getSnapshots(indexId).stream().map(SnapshotId::getName).collect(Collectors.toSet()); for (SnapshotFiles point : snapshots) { - if (!point.snapshot().equals(snapshotId.getName())) { + if (survivingSnapshotNames.contains(point.snapshot())) { newSnapshotsList.add(point); } } - // finalize the snapshot and rewrite the snapshot index with the next sequential snapshot index - finalizeShard(newSnapshotsList, fileListGeneration, blobs, "snapshot deletion [" + snapshotId + "]", shardContainer, - snapshotShardId, snapshotId); - + final String indexGeneration = Long.toString(fileListGeneration + 1); try { - shardContainer.deleteBlobIgnoringIfNotExists(indexShardSnapshotFormat.blobName(snapshotId.getUUID())); + final List blobsToDelete; + if (newSnapshotsList.isEmpty()) { + // If we deleted all snapshots, we don't need to create a new index file and simply delete all the blobs we found + blobsToDelete = new ArrayList<>(blobs.keySet()); + } else { + final Set survivingSnapshotUUIDs = repositoryData.getSnapshots(indexId).stream().map(SnapshotId::getUUID) + .collect(Collectors.toSet()); + final BlobStoreIndexShardSnapshots updatedSnapshots = new BlobStoreIndexShardSnapshots(newSnapshotsList); + indexShardSnapshotsFormat.writeAtomic(updatedSnapshots, shardContainer, indexGeneration); + // Delete all previous index-N, data- and meta-blobs and that are not referenced by the new index-N and temporary blobs + blobsToDelete = blobs.keySet().stream().filter(blob -> + blob.startsWith(SNAPSHOT_INDEX_PREFIX) + || (blob.startsWith(SNAPSHOT_PREFIX) && blob.endsWith(".dat") + && survivingSnapshotUUIDs.contains( + blob.substring(SNAPSHOT_PREFIX.length(), blob.length() - ".dat".length())) == false) + || (blob.startsWith(DATA_BLOB_PREFIX) && updatedSnapshots.findNameFile(canonicalName(blob)) == null) + || FsBlobContainer.isTempBlobName(blob)).collect(Collectors.toList()); + } + try { + shardContainer.deleteBlobsIgnoringIfNotExists(blobsToDelete); + } catch (IOException e) { + logger.warn(() -> new ParameterizedMessage("[{}][{}] failed to delete blobs during finalization", + snapshotId, snapshotShardId), e); + } } catch (IOException e) { - logger.warn(new ParameterizedMessage("[{}] [{}] failed to delete shard snapshot file", snapshotShardId, snapshotId), e); + throw new IndexShardSnapshotFailedException(snapshotShardId, + "Failed to finalize snapshot deletion [" + snapshotId + "] with shard index [" + + indexShardSnapshotsFormat.blobName(indexGeneration) + "]", e); } } @@ -1110,47 +1156,6 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp } } - /** - * Writes a new index file for the shard and removes all unreferenced files from the repository. - * - * We need to be really careful in handling index files in case of failures to make sure we don't - * have index file that points to files that were deleted. - * - * @param snapshots list of active snapshots in the container - * @param fileListGeneration the generation number of the current snapshot index file - * @param blobs list of blobs in the container - * @param reason a reason explaining why the shard index file is written - */ - private void finalizeShard(List snapshots, long fileListGeneration, Map blobs, - String reason, BlobContainer shardContainer, ShardId shardId, SnapshotId snapshotId) { - final String indexGeneration = Long.toString(fileListGeneration + 1); - try { - final List blobsToDelete; - if (snapshots.isEmpty()) { - // If we deleted all snapshots, we don't need to create a new index file and simply delete all the blobs we found - blobsToDelete = new ArrayList<>(blobs.keySet()); - } else { - final BlobStoreIndexShardSnapshots updatedSnapshots = new BlobStoreIndexShardSnapshots(snapshots); - indexShardSnapshotsFormat.writeAtomic(updatedSnapshots, shardContainer, indexGeneration); - // Delete all previous index-N, data-blobs that are not referenced by the new index-N and temporary blobs - blobsToDelete = blobs.keySet().stream().filter(blob -> - blob.startsWith(SNAPSHOT_INDEX_PREFIX) - || blob.startsWith(DATA_BLOB_PREFIX) && updatedSnapshots.findNameFile(canonicalName(blob)) == null - || FsBlobContainer.isTempBlobName(blob)).collect(Collectors.toList()); - } - try { - shardContainer.deleteBlobsIgnoringIfNotExists(blobsToDelete); - } catch (IOException e) { - logger.warn(() -> new ParameterizedMessage("[{}][{}] failed to delete blobs during finalization", - snapshotId, shardId), e); - } - } catch (IOException e) { - String message = - "Failed to finalize " + reason + " with shard index [" + indexShardSnapshotsFormat.blobName(indexGeneration) + "]"; - throw new IndexShardSnapshotFailedException(shardId, message, e); - } - } - /** * Loads all available snapshots in the repository *