Same as #59905 but for shard level metadata. Since we wnat to retain the ability to do safe+atomic writes for non-uuid shard generations this PR has to create two separate write paths for both kinds of shard generations.
This commit is contained in:
parent
204efe9387
commit
3270cb3088
|
@ -775,14 +775,14 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
|
|||
final BlobContainer shardContainer = shardContainer(indexId, finalShardId);
|
||||
final Set<String> blobs = shardContainer.listBlobs().keySet();
|
||||
final BlobStoreIndexShardSnapshots blobStoreIndexShardSnapshots;
|
||||
final String newGen;
|
||||
final long newGen;
|
||||
if (useUUIDs) {
|
||||
newGen = UUIDs.randomBase64UUID();
|
||||
newGen = -1L;
|
||||
blobStoreIndexShardSnapshots = buildBlobStoreIndexShardSnapshots(blobs, shardContainer,
|
||||
oldRepositoryData.shardGenerations().getShardGen(indexId, finalShardId)).v1();
|
||||
} else {
|
||||
Tuple<BlobStoreIndexShardSnapshots, Long> tuple = buildBlobStoreIndexShardSnapshots(blobs, shardContainer);
|
||||
newGen = Long.toString(tuple.v2() + 1);
|
||||
newGen = tuple.v2() + 1;
|
||||
blobStoreIndexShardSnapshots = tuple.v1();
|
||||
}
|
||||
allShardsListener.onResponse(deleteFromShardSnapshotMeta(survivingSnapshots, indexId, finalShardId,
|
||||
|
@ -1922,7 +1922,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
|
|||
// reference a generation that has not had all its files fully upload.
|
||||
indexGeneration = UUIDs.randomBase64UUID();
|
||||
try {
|
||||
writeShardIndexBlob(shardContainer, indexGeneration, updatedBlobStoreIndexShardSnapshots);
|
||||
INDEX_SHARD_SNAPSHOTS_FORMAT.write(updatedBlobStoreIndexShardSnapshots, shardContainer, indexGeneration, compress);
|
||||
} catch (IOException e) {
|
||||
throw new IndexShardSnapshotFailedException(shardId,
|
||||
"Failed to write shard level snapshot metadata for [" + snapshotId + "] to ["
|
||||
|
@ -1933,7 +1933,8 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
|
|||
// When not using shard generations we can only write the index-${N} blob after all other work for this shard has
|
||||
// completed.
|
||||
// Also, in case of numeric shard generations the data node has to take care of deleting old shard generations.
|
||||
indexGeneration = Long.toString(Long.parseLong(fileListGeneration) + 1);
|
||||
final long newGen = Long.parseLong(fileListGeneration) + 1;
|
||||
indexGeneration = Long.toString(newGen);
|
||||
// Delete all previous index-N blobs
|
||||
final List<String> blobsToDelete = blobs.stream().filter(blob -> blob.startsWith(SNAPSHOT_INDEX_PREFIX))
|
||||
.collect(Collectors.toList());
|
||||
|
@ -1943,7 +1944,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
|
|||
+ "] when deleting index-N blobs " + blobsToDelete;
|
||||
afterWriteSnapBlob = () -> {
|
||||
try {
|
||||
writeShardIndexBlob(shardContainer, indexGeneration, updatedBlobStoreIndexShardSnapshots);
|
||||
writeShardIndexBlobAtomic(shardContainer, newGen, updatedBlobStoreIndexShardSnapshots);
|
||||
} catch (IOException e) {
|
||||
throw new IndexShardSnapshotFailedException(shardId,
|
||||
"Failed to finalize snapshot creation [" + snapshotId + "] with shard index ["
|
||||
|
@ -2211,12 +2212,15 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
|
|||
|
||||
/**
|
||||
* Delete snapshot from shard level metadata.
|
||||
*
|
||||
* @param indexGeneration generation to write the new shard level level metadata to. If negative a uuid id shard generation should be
|
||||
* used
|
||||
*/
|
||||
private ShardSnapshotMetaDeleteResult deleteFromShardSnapshotMeta(Set<SnapshotId> survivingSnapshots, IndexId indexId,
|
||||
int snapshotShardId, Collection<SnapshotId> snapshotIds,
|
||||
BlobContainer shardContainer, Set<String> blobs,
|
||||
BlobStoreIndexShardSnapshots snapshots,
|
||||
String indexGeneration) {
|
||||
long indexGeneration) {
|
||||
// Build a list of snapshots that should be preserved
|
||||
List<SnapshotFiles> newSnapshotsList = new ArrayList<>();
|
||||
final Set<String> survivingSnapshotNames = survivingSnapshots.stream().map(SnapshotId::getName).collect(Collectors.toSet());
|
||||
|
@ -2225,29 +2229,39 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
|
|||
newSnapshotsList.add(point);
|
||||
}
|
||||
}
|
||||
String writtenGeneration = null;
|
||||
try {
|
||||
if (newSnapshotsList.isEmpty()) {
|
||||
return new ShardSnapshotMetaDeleteResult(indexId, snapshotShardId, ShardGenerations.DELETED_SHARD_GEN, blobs);
|
||||
} else {
|
||||
final BlobStoreIndexShardSnapshots updatedSnapshots = new BlobStoreIndexShardSnapshots(newSnapshotsList);
|
||||
writeShardIndexBlob(shardContainer, indexGeneration, updatedSnapshots);
|
||||
if (indexGeneration < 0L) {
|
||||
writtenGeneration = UUIDs.randomBase64UUID();
|
||||
INDEX_SHARD_SNAPSHOTS_FORMAT.write(updatedSnapshots, shardContainer, writtenGeneration, compress);
|
||||
} else {
|
||||
writtenGeneration = String.valueOf(indexGeneration);
|
||||
writeShardIndexBlobAtomic(shardContainer, indexGeneration, updatedSnapshots);
|
||||
}
|
||||
final Set<String> survivingSnapshotUUIDs = survivingSnapshots.stream().map(SnapshotId::getUUID).collect(Collectors.toSet());
|
||||
return new ShardSnapshotMetaDeleteResult(indexId, snapshotShardId, indexGeneration,
|
||||
return new ShardSnapshotMetaDeleteResult(indexId, snapshotShardId, writtenGeneration,
|
||||
unusedBlobs(blobs, survivingSnapshotUUIDs, updatedSnapshots));
|
||||
}
|
||||
} catch (IOException e) {
|
||||
throw new RepositoryException(metadata.name(), "Failed to finalize snapshot deletion " + snapshotIds +
|
||||
" with shard index [" + INDEX_SHARD_SNAPSHOTS_FORMAT.blobName(indexGeneration) + "]", e);
|
||||
" with shard index [" + INDEX_SHARD_SNAPSHOTS_FORMAT.blobName(writtenGeneration) + "]", e);
|
||||
}
|
||||
}
|
||||
|
||||
private void writeShardIndexBlob(BlobContainer shardContainer, String indexGeneration,
|
||||
BlobStoreIndexShardSnapshots updatedSnapshots) throws IOException {
|
||||
assert ShardGenerations.NEW_SHARD_GEN.equals(indexGeneration) == false;
|
||||
assert ShardGenerations.DELETED_SHARD_GEN.equals(indexGeneration) == false;
|
||||
/**
|
||||
* Utility for atomically writing shard level metadata to a numeric shard generation. This is only required for writing
|
||||
* numeric shard generations where atomic writes with fail-if-already-exists checks are useful in preventing repository corruption.
|
||||
*/
|
||||
private void writeShardIndexBlobAtomic(BlobContainer shardContainer, long indexGeneration,
|
||||
BlobStoreIndexShardSnapshots updatedSnapshots) throws IOException {
|
||||
assert indexGeneration >= 0 : "Shard generation must not be negative but saw [" + indexGeneration + "]";
|
||||
logger.trace(() -> new ParameterizedMessage("[{}] Writing shard index [{}] to [{}]", metadata.name(),
|
||||
indexGeneration, shardContainer.path()));
|
||||
final String blobName = INDEX_SHARD_SNAPSHOTS_FORMAT.blobName(indexGeneration);
|
||||
final String blobName = INDEX_SHARD_SNAPSHOTS_FORMAT.blobName(String.valueOf(indexGeneration));
|
||||
writeAtomic(shardContainer, blobName, INDEX_SHARD_SNAPSHOTS_FORMAT.serialize(updatedSnapshots, blobName, compress), true);
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue