The code here was needlessly complicated because it enqueued all file uploads up-front. Instead, we can use a cleaner worker + queue pattern, taking the maximum parallelism from the thread-pool info. Also, I slightly simplified the rethrow and listener handling (a step listener is pointless when the callback is added on the very next line), since I noticed we were needlessly rethrowing in the same code and that wasn't worth a separate PR.
This commit is contained in:
parent
b4ae207e1e
commit
8a02a5fc7d
|
@ -108,8 +108,10 @@ import java.util.Collections;
|
|||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
|
@ -999,11 +1001,10 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
|
|||
IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus, ActionListener<Void> listener) {
|
||||
final ShardId shardId = store.shardId();
|
||||
final long startTime = threadPool.absoluteTimeInMillis();
|
||||
final StepListener<Void> snapshotDoneListener = new StepListener<>();
|
||||
snapshotDoneListener.whenComplete(listener::onResponse, e -> {
|
||||
snapshotStatus.moveToFailed(threadPool.absoluteTimeInMillis(), ExceptionsHelper.detailedMessage(e));
|
||||
listener.onFailure(e instanceof IndexShardSnapshotFailedException ? (IndexShardSnapshotFailedException) e
|
||||
: new IndexShardSnapshotFailedException(store.shardId(), e));
|
||||
|
||||
final ActionListener<Void> snapshotDoneListener = ActionListener.wrap(listener::onResponse, e -> {
|
||||
snapshotStatus.moveToFailed(threadPool.absoluteTimeInMillis(), ExceptionsHelper.stackTrace(e));
|
||||
listener.onFailure(e instanceof IndexShardSnapshotFailedException ? e : new IndexShardSnapshotFailedException(shardId, e));
|
||||
});
|
||||
try {
|
||||
logger.debug("[{}] [{}] snapshot to [{}] ...", shardId, snapshotId, metadata.name());
|
||||
|
@ -1026,7 +1027,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
|
|||
}
|
||||
|
||||
final List<BlobStoreIndexShardSnapshot.FileInfo> indexCommitPointFiles = new ArrayList<>();
|
||||
ArrayList<BlobStoreIndexShardSnapshot.FileInfo> filesToSnapshot = new ArrayList<>();
|
||||
final BlockingQueue<BlobStoreIndexShardSnapshot.FileInfo> filesToSnapshot = new LinkedBlockingQueue<>();
|
||||
store.incRef();
|
||||
final Collection<String> fileNames;
|
||||
final Store.MetadataSnapshot metadataFromStore;
|
||||
|
@ -1147,42 +1148,29 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
|
|||
allFilesUploadedListener.onResponse(Collections.emptyList());
|
||||
return;
|
||||
}
|
||||
final GroupedActionListener<Void> filesListener =
|
||||
new GroupedActionListener<>(allFilesUploadedListener, indexIncrementalFileCount);
|
||||
final Executor executor = threadPool.executor(ThreadPool.Names.SNAPSHOT);
|
||||
// Flag to signal that the snapshot has been aborted/failed so we can stop any further blob uploads from starting
|
||||
final AtomicBoolean alreadyFailed = new AtomicBoolean();
|
||||
for (BlobStoreIndexShardSnapshot.FileInfo snapshotFileInfo : filesToSnapshot) {
|
||||
executor.execute(new ActionRunnable<Void>(filesListener) {
|
||||
@Override
|
||||
protected void doRun() {
|
||||
// Start as many workers as fit into the snapshot pool at once at the most
|
||||
final int workers = Math.min(threadPool.info(ThreadPool.Names.SNAPSHOT).getMax(), indexIncrementalFileCount);
|
||||
final ActionListener<Void> filesListener = ActionListener.delegateResponse(
|
||||
new GroupedActionListener<>(allFilesUploadedListener, workers), (l, e) -> {
|
||||
filesToSnapshot.clear(); // Stop uploading the remaining files if we run into any exception
|
||||
l.onFailure(e);
|
||||
});
|
||||
for (int i = 0; i < workers; ++i) {
|
||||
executor.execute(ActionRunnable.run(filesListener, () -> {
|
||||
BlobStoreIndexShardSnapshot.FileInfo snapshotFileInfo = filesToSnapshot.poll(0L, TimeUnit.MILLISECONDS);
|
||||
if (snapshotFileInfo != null) {
|
||||
store.incRef();
|
||||
try {
|
||||
if (alreadyFailed.get() == false) {
|
||||
if (store.tryIncRef()) {
|
||||
try {
|
||||
snapshotFile(snapshotFileInfo, indexId, shardId, snapshotId, snapshotStatus, store);
|
||||
} finally {
|
||||
store.decRef();
|
||||
}
|
||||
} else if (snapshotStatus.isAborted()) {
|
||||
throw new IndexShardSnapshotFailedException(shardId, "Aborted");
|
||||
} else {
|
||||
assert false : "Store was closed before aborting the snapshot";
|
||||
throw new IllegalStateException("Store is closed already");
|
||||
}
|
||||
}
|
||||
filesListener.onResponse(null);
|
||||
} catch (IOException e) {
|
||||
throw new IndexShardSnapshotFailedException(shardId, "Failed to perform snapshot (index files)", e);
|
||||
do {
|
||||
snapshotFile(snapshotFileInfo, indexId, shardId, snapshotId, snapshotStatus, store);
|
||||
snapshotFileInfo = filesToSnapshot.poll(0L, TimeUnit.MILLISECONDS);
|
||||
} while (snapshotFileInfo != null);
|
||||
} finally {
|
||||
store.decRef();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
alreadyFailed.set(true);
|
||||
super.onFailure(e);
|
||||
}
|
||||
});
|
||||
}));
|
||||
}
|
||||
} catch (Exception e) {
|
||||
snapshotDoneListener.onFailure(e);
|
||||
|
|
|
@ -138,6 +138,8 @@ public class MockEventuallyConsistentRepositoryTests extends ESTestCase {
|
|||
MockEventuallyConsistentRepository.Context blobStoreContext = new MockEventuallyConsistentRepository.Context();
|
||||
final ThreadPool threadPool = mock(ThreadPool.class);
|
||||
when(threadPool.executor(ThreadPool.Names.SNAPSHOT)).thenReturn(new SameThreadExecutorService());
|
||||
when(threadPool.info(ThreadPool.Names.SNAPSHOT)).thenReturn(
|
||||
new ThreadPool.Info(ThreadPool.Names.SNAPSHOT, ThreadPool.ThreadPoolType.FIXED, randomIntBetween(1, 10)));
|
||||
try (BlobStoreRepository repository = new MockEventuallyConsistentRepository(
|
||||
new RepositoryMetaData("testRepo", "mockEventuallyConsistent", Settings.EMPTY),
|
||||
xContentRegistry(), threadPool, blobStoreContext)) {
|
||||
|
|
|
@ -30,7 +30,9 @@ import org.elasticsearch.threadpool.ThreadPoolStats;
|
|||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Random;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.Delayed;
|
||||
|
@ -288,6 +290,8 @@ public class DeterministicTaskQueue {
|
|||
public ThreadPool getThreadPool(Function<Runnable, Runnable> runnableWrapper) {
|
||||
return new ThreadPool(settings) {
|
||||
|
||||
private final Map<String, ThreadPool.Info> infos = new HashMap<>();
|
||||
|
||||
{
|
||||
stopCachedTimeThread();
|
||||
}
|
||||
|
@ -309,7 +313,7 @@ public class DeterministicTaskQueue {
|
|||
|
||||
@Override
|
||||
public Info info(String name) {
|
||||
throw new UnsupportedOperationException();
|
||||
return infos.computeIfAbsent(name, n -> new Info(n, ThreadPoolType.FIXED, random.nextInt(10) + 1));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
Loading…
Reference in New Issue