diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java index dcc67662ec7..d89aee833e6 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java @@ -69,6 +69,7 @@ import org.elasticsearch.common.compress.NotXContentException; import org.elasticsearch.common.io.Streams; import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.lucene.store.InputStreamIndexInput; import org.elasticsearch.common.metrics.CounterMetric; @@ -1701,10 +1702,9 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp // in the commit with files already in the repository if (filesFromSegmentInfos == null) { indexCommitPointFiles = new ArrayList<>(); - store.incRef(); final Collection fileNames; final Store.MetadataSnapshot metadataFromStore; - try { + try (Releasable ignored = incrementStoreRef(store, snapshotStatus, shardId)) { // TODO apparently we don't use the MetadataSnapshot#.recoveryDiff(...) here but we should try { logger.trace( @@ -1714,8 +1714,6 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp } catch (IOException e) { throw new IndexShardSnapshotFailedException(shardId, "Failed to get store file metadata", e); } - } finally { - store.decRef(); } for (String fileName : fileNames) { if (snapshotStatus.isAborted()) { @@ -1840,14 +1838,11 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp executor.execute(ActionRunnable.run(filesListener, () -> { BlobStoreIndexShardSnapshot.FileInfo snapshotFileInfo = filesToSnapshot.poll(0L, TimeUnit.MILLISECONDS); if (snapshotFileInfo != null) { - store.incRef(); - try { + try (Releasable ignored = incrementStoreRef(store, snapshotStatus, shardId)) { do { snapshotFile(snapshotFileInfo, indexId, shardId, snapshotId, snapshotStatus, store); snapshotFileInfo = filesToSnapshot.poll(0L, TimeUnit.MILLISECONDS); } while (snapshotFileInfo != null); - } finally { - store.decRef(); } } })); @@ -1857,6 +1852,18 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp } } + private static Releasable incrementStoreRef(Store store, IndexShardSnapshotStatus snapshotStatus, ShardId shardId) { + if (store.tryIncRef() == false) { + if (snapshotStatus.isAborted()) { + throw new IndexShardSnapshotFailedException(shardId, "Aborted"); + } else { + assert false : "Store should not be closed concurrently unless snapshot is aborted"; + throw new IndexShardSnapshotFailedException(shardId, "Store got closed concurrently"); + } + } + return store::decRef; + } + private static boolean assertFileContentsMatchHash(BlobStoreIndexShardSnapshot.FileInfo fileInfo, Store store) { try (IndexInput indexInput = store.openVerifyingInput(fileInfo.physicalName(), IOContext.READONCE, fileInfo.metadata())) { final byte[] tmp = new byte[Math.toIntExact(fileInfo.metadata().length())];