If a partial snapshot has some of its shards aborted because an index got deleted, this can lead to confusing `IllegalStateException`s when trying to increment the ref count of the already-closed `Store`. Refactored this a little to throw the same exception for aborted shards regardless of the timing of the store close, and added an assertion that a concurrent store close can only happen when the shard snapshot has already been aborted.
This commit is contained in:
parent
4511611802
commit
dde75b0f64
|
@ -69,6 +69,7 @@ import org.elasticsearch.common.compress.NotXContentException;
|
||||||
import org.elasticsearch.common.io.Streams;
|
import org.elasticsearch.common.io.Streams;
|
||||||
import org.elasticsearch.common.io.stream.BytesStreamOutput;
|
import org.elasticsearch.common.io.stream.BytesStreamOutput;
|
||||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||||
|
import org.elasticsearch.common.lease.Releasable;
|
||||||
import org.elasticsearch.common.lucene.Lucene;
|
import org.elasticsearch.common.lucene.Lucene;
|
||||||
import org.elasticsearch.common.lucene.store.InputStreamIndexInput;
|
import org.elasticsearch.common.lucene.store.InputStreamIndexInput;
|
||||||
import org.elasticsearch.common.metrics.CounterMetric;
|
import org.elasticsearch.common.metrics.CounterMetric;
|
||||||
|
@ -1701,10 +1702,9 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
|
||||||
// in the commit with files already in the repository
|
// in the commit with files already in the repository
|
||||||
if (filesFromSegmentInfos == null) {
|
if (filesFromSegmentInfos == null) {
|
||||||
indexCommitPointFiles = new ArrayList<>();
|
indexCommitPointFiles = new ArrayList<>();
|
||||||
store.incRef();
|
|
||||||
final Collection<String> fileNames;
|
final Collection<String> fileNames;
|
||||||
final Store.MetadataSnapshot metadataFromStore;
|
final Store.MetadataSnapshot metadataFromStore;
|
||||||
try {
|
try (Releasable ignored = incrementStoreRef(store, snapshotStatus, shardId)) {
|
||||||
// TODO apparently we don't use the MetadataSnapshot#.recoveryDiff(...) here but we should
|
// TODO apparently we don't use the MetadataSnapshot#.recoveryDiff(...) here but we should
|
||||||
try {
|
try {
|
||||||
logger.trace(
|
logger.trace(
|
||||||
|
@ -1714,8 +1714,6 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
throw new IndexShardSnapshotFailedException(shardId, "Failed to get store file metadata", e);
|
throw new IndexShardSnapshotFailedException(shardId, "Failed to get store file metadata", e);
|
||||||
}
|
}
|
||||||
} finally {
|
|
||||||
store.decRef();
|
|
||||||
}
|
}
|
||||||
for (String fileName : fileNames) {
|
for (String fileName : fileNames) {
|
||||||
if (snapshotStatus.isAborted()) {
|
if (snapshotStatus.isAborted()) {
|
||||||
|
@ -1840,14 +1838,11 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
|
||||||
executor.execute(ActionRunnable.run(filesListener, () -> {
|
executor.execute(ActionRunnable.run(filesListener, () -> {
|
||||||
BlobStoreIndexShardSnapshot.FileInfo snapshotFileInfo = filesToSnapshot.poll(0L, TimeUnit.MILLISECONDS);
|
BlobStoreIndexShardSnapshot.FileInfo snapshotFileInfo = filesToSnapshot.poll(0L, TimeUnit.MILLISECONDS);
|
||||||
if (snapshotFileInfo != null) {
|
if (snapshotFileInfo != null) {
|
||||||
store.incRef();
|
try (Releasable ignored = incrementStoreRef(store, snapshotStatus, shardId)) {
|
||||||
try {
|
|
||||||
do {
|
do {
|
||||||
snapshotFile(snapshotFileInfo, indexId, shardId, snapshotId, snapshotStatus, store);
|
snapshotFile(snapshotFileInfo, indexId, shardId, snapshotId, snapshotStatus, store);
|
||||||
snapshotFileInfo = filesToSnapshot.poll(0L, TimeUnit.MILLISECONDS);
|
snapshotFileInfo = filesToSnapshot.poll(0L, TimeUnit.MILLISECONDS);
|
||||||
} while (snapshotFileInfo != null);
|
} while (snapshotFileInfo != null);
|
||||||
} finally {
|
|
||||||
store.decRef();
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}));
|
}));
|
||||||
|
@ -1857,6 +1852,18 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static Releasable incrementStoreRef(Store store, IndexShardSnapshotStatus snapshotStatus, ShardId shardId) {
|
||||||
|
if (store.tryIncRef() == false) {
|
||||||
|
if (snapshotStatus.isAborted()) {
|
||||||
|
throw new IndexShardSnapshotFailedException(shardId, "Aborted");
|
||||||
|
} else {
|
||||||
|
assert false : "Store should not be closed concurrently unless snapshot is aborted";
|
||||||
|
throw new IndexShardSnapshotFailedException(shardId, "Store got closed concurrently");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return store::decRef;
|
||||||
|
}
|
||||||
|
|
||||||
private static boolean assertFileContentsMatchHash(BlobStoreIndexShardSnapshot.FileInfo fileInfo, Store store) {
|
private static boolean assertFileContentsMatchHash(BlobStoreIndexShardSnapshot.FileInfo fileInfo, Store store) {
|
||||||
try (IndexInput indexInput = store.openVerifyingInput(fileInfo.physicalName(), IOContext.READONCE, fileInfo.metadata())) {
|
try (IndexInput indexInput = store.openVerifyingInput(fileInfo.physicalName(), IOContext.READONCE, fileInfo.metadata())) {
|
||||||
final byte[] tmp = new byte[Math.toIntExact(fileInfo.metadata().length())];
|
final byte[] tmp = new byte[Math.toIntExact(fileInfo.metadata().length())];
|
||||||
|
|
Loading…
Reference in New Issue