Allow for a fairer distribution of snapshot and restore operations to enable parallel snapshots and improve behaviour for parallel snapshot + restore. Closes #55803
This commit is contained in:
parent
b4a2cd810a
commit
9bc9d01b84
|
@ -1835,23 +1835,30 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
|
||||||
final int workers = Math.min(threadPool.info(ThreadPool.Names.SNAPSHOT).getMax(), indexIncrementalFileCount);
|
final int workers = Math.min(threadPool.info(ThreadPool.Names.SNAPSHOT).getMax(), indexIncrementalFileCount);
|
||||||
final ActionListener<Void> filesListener = fileQueueListener(filesToSnapshot, workers, allFilesUploadedListener);
|
final ActionListener<Void> filesListener = fileQueueListener(filesToSnapshot, workers, allFilesUploadedListener);
|
||||||
for (int i = 0; i < workers; ++i) {
|
for (int i = 0; i < workers; ++i) {
|
||||||
executor.execute(ActionRunnable.run(filesListener, () -> {
|
executeOneFileSnapshot(store, snapshotId, indexId, snapshotStatus, filesToSnapshot, executor, filesListener);
|
||||||
BlobStoreIndexShardSnapshot.FileInfo snapshotFileInfo = filesToSnapshot.poll(0L, TimeUnit.MILLISECONDS);
|
|
||||||
if (snapshotFileInfo != null) {
|
|
||||||
try (Releasable ignored = incrementStoreRef(store, snapshotStatus, shardId)) {
|
|
||||||
do {
|
|
||||||
snapshotFile(snapshotFileInfo, indexId, shardId, snapshotId, snapshotStatus, store);
|
|
||||||
snapshotFileInfo = filesToSnapshot.poll(0L, TimeUnit.MILLISECONDS);
|
|
||||||
} while (snapshotFileInfo != null);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}));
|
|
||||||
}
|
}
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
listener.onFailure(e);
|
listener.onFailure(e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
 * Snapshots at most one file from {@code filesToSnapshot} and then schedules itself again for the next one.
 * <p>
 * The queue is polled without waiting. If it is empty, {@code listener} is completed with a {@code null}
 * response. Otherwise a task is submitted to {@code executor} that snapshots the polled file and calls this
 * method again, so every file is handled by its own executor task instead of one task draining the whole queue.
 *
 * @param store           store of the shard being snapshotted; also supplies the {@link ShardId}
 * @param snapshotId      passed through to {@code snapshotFile}
 * @param indexId         passed through to {@code snapshotFile}
 * @param snapshotStatus  passed to {@code incrementStoreRef} and {@code snapshotFile}
 * @param filesToSnapshot queue of files still to be snapshotted; one entry is removed per invocation
 * @param executor        executor on which the per-file task is run
 * @param listener        completed with {@code null} once this chain finds the queue empty
 * @throws InterruptedException declared because the timed {@code poll} on the queue can be interrupted
 */
private void executeOneFileSnapshot(Store store, SnapshotId snapshotId, IndexId indexId, IndexShardSnapshotStatus snapshotStatus,
|
||||||
|
BlockingQueue<BlobStoreIndexShardSnapshot.FileInfo> filesToSnapshot, Executor executor,
|
||||||
|
ActionListener<Void> listener) throws InterruptedException {
|
||||||
|
final ShardId shardId = store.shardId();
|
||||||
|
// Zero timeout: take a file if one is immediately available, never block waiting for more.
final BlobStoreIndexShardSnapshot.FileInfo snapshotFileInfo = filesToSnapshot.poll(0L, TimeUnit.MILLISECONDS);
|
||||||
|
if (snapshotFileInfo == null) {
|
||||||
|
// Nothing left in the queue for this chain of tasks: report completion.
listener.onResponse(null);
|
||||||
|
} else {
|
||||||
|
// NOTE(review): ActionRunnable.wrap presumably routes exceptions thrown by the task to the listener's
// onFailure - confirm against ActionRunnable.
executor.execute(ActionRunnable.wrap(listener, l -> {
|
||||||
|
// The store reference obtained here is released when the try-with-resources block exits.
try (Releasable ignored = incrementStoreRef(store, snapshotStatus, shardId)) {
|
||||||
|
snapshotFile(snapshotFileInfo, indexId, shardId, snapshotId, snapshotStatus, store);
|
||||||
|
// Continue with the next file; this call happens while the store reference above is still held.
executeOneFileSnapshot(store, snapshotId, indexId, snapshotStatus, filesToSnapshot, executor, l);
|
||||||
|
}
|
||||||
|
}));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private static Releasable incrementStoreRef(Store store, IndexShardSnapshotStatus snapshotStatus, ShardId shardId) {
|
private static Releasable incrementStoreRef(Store store, IndexShardSnapshotStatus snapshotStatus, ShardId shardId) {
|
||||||
if (store.tryIncRef() == false) {
|
if (store.tryIncRef() == false) {
|
||||||
if (snapshotStatus.isAborted()) {
|
if (snapshotStatus.isAborted()) {
|
||||||
|
@ -1901,21 +1908,33 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
|
||||||
fileQueueListener(files, workers, ActionListener.map(listener, v -> null));
|
fileQueueListener(files, workers, ActionListener.map(listener, v -> null));
|
||||||
// restore the files from the snapshot to the Lucene store
|
// restore the files from the snapshot to the Lucene store
|
||||||
for (int i = 0; i < workers; ++i) {
|
for (int i = 0; i < workers; ++i) {
|
||||||
executor.execute(ActionRunnable.run(allFilesListener, () -> {
|
try {
|
||||||
store.incRef();
|
executeOneFileRestore(files, allFilesListener);
|
||||||
try {
|
} catch (Exception e) {
|
||||||
BlobStoreIndexShardSnapshot.FileInfo fileToRecover;
|
allFilesListener.onFailure(e);
|
||||||
while ((fileToRecover = files.poll(0L, TimeUnit.MILLISECONDS)) != null) {
|
}
|
||||||
restoreFile(fileToRecover, store);
|
|
||||||
}
|
|
||||||
} finally {
|
|
||||||
store.decRef();
|
|
||||||
}
|
|
||||||
}));
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
 * Restores at most one file from {@code files} and then schedules itself again for the next one.
 * <p>
 * The queue is polled without waiting. If it is empty, {@code allFilesListener} is completed with a
 * {@code null} response. Otherwise a task is submitted to {@code executor} that restores the polled file
 * and calls this method again, so every file is handled by its own executor task.
 * <p>
 * NOTE(review): {@code executor} and {@code store} are not parameters; they are taken from an enclosing
 * scope that is not visible in this hunk - confirm where they are defined.
 *
 * @param files            queue of files still to be restored; one entry is removed per invocation
 * @param allFilesListener completed with {@code null} once this chain finds the queue empty
 * @throws InterruptedException declared because the timed {@code poll} on the queue can be interrupted
 */
private void executeOneFileRestore(BlockingQueue<BlobStoreIndexShardSnapshot.FileInfo> files,
|
||||||
|
ActionListener<Void> allFilesListener) throws InterruptedException {
|
||||||
|
// Zero timeout: take a file if one is immediately available, never block waiting for more.
final BlobStoreIndexShardSnapshot.FileInfo fileToRecover = files.poll(0L, TimeUnit.MILLISECONDS);
|
||||||
|
if (fileToRecover == null) {
|
||||||
|
// Nothing left in the queue for this chain of tasks: report completion.
allFilesListener.onResponse(null);
|
||||||
|
} else {
|
||||||
|
// NOTE(review): ActionRunnable.wrap presumably routes exceptions thrown by the task to the listener's
// onFailure - confirm against ActionRunnable.
executor.execute(ActionRunnable.wrap(allFilesListener, filesListener -> {
|
||||||
|
// Hold a store reference only for the duration of this single file restore.
store.incRef();
|
||||||
|
try {
|
||||||
|
restoreFile(fileToRecover, store);
|
||||||
|
} finally {
|
||||||
|
store.decRef();
|
||||||
|
}
|
||||||
|
// Continue with the next file; this call happens after the store reference has been released.
executeOneFileRestore(files, filesListener);
|
||||||
|
}));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private void restoreFile(BlobStoreIndexShardSnapshot.FileInfo fileInfo, Store store) throws IOException {
|
private void restoreFile(BlobStoreIndexShardSnapshot.FileInfo fileInfo, Store store) throws IOException {
|
||||||
boolean success = false;
|
boolean success = false;
|
||||||
try (IndexOutput indexOutput =
|
try (IndexOutput indexOutput =
|
||||||
|
|
Loading…
Reference in New Issue