Smarter CCR concurrent file chunk fetching (#38841)

The previous logic for concurrent file chunk fetching did not allow multiple chunks of the same
file to be fetched in parallel; parallelism was only available across different files. This required
complex logic on the follower to track which files it was already fetching from, in order to ensure
that chunks of the same file would be fetched in sequential order. During benchmarking, this
exhibited throughput issues towards the end of recovery, when only chunks of the single largest
segment file remained and had to be fetched sequentially: with no parallelism left, throughput
dropped considerably on a high-latency network.

The new logic here follows the peer recovery model more closely: it sends multiple requests for
the same file in parallel and then reorders the results as necessary. Benchmarks show that this
leads to better overall throughput, and the implementation is also simpler.
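
For illustration, here is a minimal, self-contained sketch of that model (not the actual CCR code; the class and variable names below are made up). Chunk requests for one file are issued in offset order, a Semaphore stands in for the seqno-based throttling that caps the number of in-flight requests, and responses that arrive out of order are buffered by offset until they become contiguous, much as the follower's writer does:

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;

public class ChunkFetchSketch {

    static final int CHUNK_SIZE = 4;    // bytes per chunk request (tiny, for the demo)
    static final int MAX_IN_FLIGHT = 3; // cap on concurrent requests, like max_concurrent_file_chunks

    public static void main(String[] args) throws Exception {
        byte[] remoteFile = "the quick brown fox jumps over the lazy dog".getBytes();
        byte[] localCopy = new byte[remoteFile.length];
        ExecutorService remote = Executors.newFixedThreadPool(MAX_IN_FLIGHT); // stands in for the leader
        Semaphore inFlight = new Semaphore(MAX_IN_FLIGHT);

        // Chunks may complete out of order, so buffer them by offset and only
        // write the prefix that has become contiguous (the reordering step).
        Map<Long, byte[]> pending = new HashMap<>();
        long[] nextOffset = {0};

        for (long offset = 0; offset < remoteFile.length; offset += CHUNK_SIZE) {
            final long chunkOffset = offset;
            final int length = (int) Math.min(CHUNK_SIZE, remoteFile.length - offset);
            inFlight.acquire(); // throttle: at most MAX_IN_FLIGHT outstanding requests
            remote.execute(() -> {
                try {
                    // simulate a high-latency network with variable round-trip times
                    Thread.sleep(ThreadLocalRandom.current().nextInt(10, 50));
                    byte[] chunk = new byte[length];
                    System.arraycopy(remoteFile, (int) chunkOffset, chunk, 0, length);
                    synchronized (pending) {
                        pending.put(chunkOffset, chunk);
                        while (pending.containsKey(nextOffset[0])) { // drain contiguous chunks
                            byte[] ready = pending.remove(nextOffset[0]);
                            System.arraycopy(ready, 0, localCopy, (int) nextOffset[0], ready.length);
                            nextOffset[0] += ready.length;
                        }
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    inFlight.release();
                }
            });
        }

        remote.shutdown();
        remote.awaitTermination(1, TimeUnit.MINUTES);
        System.out.println(new String(localCopy)); // prints the reassembled file
    }
}

In the actual change below, the LocalCheckpointTracker plays the role of the semaphore (waitForOpsToComplete(requestSeqId - maxConcurrentFileChunks) blocks once too many requests are outstanding), and the MultiFileWriter performs the reordering.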
Yannick Welsch 2019-02-15 07:51:29 +01:00
parent 1f74ba2d33
commit d55e52223f
2 changed files with 24 additions and 95 deletions

CcrRepository.java

@@ -70,10 +70,8 @@ import org.elasticsearch.xpack.ccr.action.repositories.PutCcrRestoreSessionReque
 import java.io.Closeable;
 import java.io.IOException;
 import java.io.InputStream;
-import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -375,16 +373,6 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit
             restore(snapshotFiles);
         }
 
-        private static class FileSession {
-            FileSession(long lastTrackedSeqNo, long lastOffset) {
-                this.lastTrackedSeqNo = lastTrackedSeqNo;
-                this.lastOffset = lastOffset;
-            }
-
-            final long lastTrackedSeqNo;
-            final long lastOffset;
-        }
-
         @Override
         protected void restoreFiles(List<FileInfo> filesToRecover, Store store) throws IOException {
             logger.trace("[{}] starting CCR restore of {} files", shardId, filesToRecover);
@@ -393,63 +381,34 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit
             final LocalCheckpointTracker requestSeqIdTracker = new LocalCheckpointTracker(NO_OPS_PERFORMED, NO_OPS_PERFORMED);
             final AtomicReference<Tuple<StoreFileMetaData, Exception>> error = new AtomicReference<>();
 
-            final ArrayDeque<FileInfo> remainingFiles = new ArrayDeque<>(filesToRecover);
-            final Map<FileInfo, FileSession> inFlightRequests = new HashMap<>();
-            final Object mutex = new Object();
-
-            while (true) {
-                if (error.get() != null) {
-                    break;
-                }
-
-                final FileInfo fileToRecover;
-                final FileSession prevFileSession;
-                synchronized (mutex) {
-                    if (inFlightRequests.isEmpty() && remainingFiles.isEmpty()) {
-                        break;
-                    }
-                    final long maxConcurrentFileChunks = ccrSettings.getMaxConcurrentFileChunks();
-                    if (remainingFiles.isEmpty() == false && inFlightRequests.size() < maxConcurrentFileChunks) {
-                        for (int i = 0; i < maxConcurrentFileChunks; i++) {
-                            if (remainingFiles.isEmpty()) {
-                                break;
-                            }
-                            inFlightRequests.put(remainingFiles.pop(), new FileSession(NO_OPS_PERFORMED, 0));
-                        }
-                    }
-                    final Map.Entry<FileInfo, FileSession> minEntry =
-                        inFlightRequests.entrySet().stream().min(Comparator.comparingLong(e -> e.getValue().lastTrackedSeqNo)).get();
-                    prevFileSession = minEntry.getValue();
-                    fileToRecover = minEntry.getKey();
-                }
-
-                try {
-                    requestSeqIdTracker.waitForOpsToComplete(prevFileSession.lastTrackedSeqNo);
-                    final FileSession fileSession;
-                    synchronized (mutex) {
-                        fileSession = inFlightRequests.get(fileToRecover);
-                        // if file has been removed in the mean-while, it means that restore of this file completed, so start working
-                        // on the next one
-                        if (fileSession == null) {
-                            continue;
-                        }
-                    }
+            for (FileInfo fileInfo : filesToRecover) {
+                final long fileLength = fileInfo.length();
+                long offset = 0;
+
+                while (offset < fileLength && error.get() == null) {
                     final long requestSeqId = requestSeqIdTracker.generateSeqNo();
                     try {
-                        synchronized (mutex) {
-                            inFlightRequests.put(fileToRecover, new FileSession(requestSeqId, fileSession.lastOffset));
-                        }
-                        final int bytesRequested = Math.toIntExact(Math.min(ccrSettings.getChunkSize().getBytes(),
-                            fileToRecover.length() - fileSession.lastOffset));
+                        requestSeqIdTracker.waitForOpsToComplete(requestSeqId - ccrSettings.getMaxConcurrentFileChunks());
+
+                        if (error.get() != null) {
+                            requestSeqIdTracker.markSeqNoAsCompleted(requestSeqId);
+                            break;
+                        }
+
+                        final int bytesRequested = Math.toIntExact(
+                            Math.min(ccrSettings.getChunkSize().getBytes(), fileLength - offset));
+                        offset += bytesRequested;
+
                         final GetCcrRestoreFileChunkRequest request =
-                            new GetCcrRestoreFileChunkRequest(node, sessionUUID, fileToRecover.name(), bytesRequested);
+                            new GetCcrRestoreFileChunkRequest(node, sessionUUID, fileInfo.name(), bytesRequested);
                         logger.trace("[{}] [{}] fetching chunk for file [{}], expected offset: {}, size: {}", shardId, snapshotId,
-                            fileToRecover.name(), fileSession.lastOffset, bytesRequested);
+                            fileInfo.name(), offset, bytesRequested);
 
                         remoteClient.execute(GetCcrRestoreFileChunkAction.INSTANCE, request,
                             ActionListener.wrap(
                                 r -> threadPool.generic().execute(new AbstractRunnable() {
                                     @Override
                                     public void onFailure(Exception e) {
-                                        error.compareAndSet(null, Tuple.tuple(fileToRecover.metadata(), e));
+                                        error.compareAndSet(null, Tuple.tuple(fileInfo.metadata(), e));
                                         requestSeqIdTracker.markSeqNoAsCompleted(requestSeqId);
                                     }
@@ -457,52 +416,26 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit
                                     protected void doRun() throws Exception {
                                         final int actualChunkSize = r.getChunk().length();
                                         logger.trace("[{}] [{}] got response for file [{}], offset: {}, length: {}", shardId,
-                                            snapshotId, fileToRecover.name(), r.getOffset(), actualChunkSize);
+                                            snapshotId, fileInfo.name(), r.getOffset(), actualChunkSize);
                                         final long nanosPaused = ccrSettings.getRateLimiter().maybePause(actualChunkSize);
                                         throttleListener.accept(nanosPaused);
-                                        final long newOffset = r.getOffset() + actualChunkSize;
-
-                                        assert r.getOffset() == fileSession.lastOffset;
-                                        assert actualChunkSize == bytesRequested;
-                                        assert newOffset <= fileToRecover.length();
-
-                                        final boolean lastChunk = newOffset >= fileToRecover.length();
-                                        multiFileWriter.writeFileChunk(fileToRecover.metadata(), r.getOffset(), r.getChunk(),
-                                            lastChunk);
-                                        if (lastChunk) {
-                                            synchronized (mutex) {
-                                                final FileSession removed = inFlightRequests.remove(fileToRecover);
-                                                assert removed != null : "session disappeared for " + fileToRecover.name();
-                                                assert removed.lastTrackedSeqNo == requestSeqId;
-                                                assert removed.lastOffset == fileSession.lastOffset;
-                                            }
-                                        } else {
-                                            synchronized (mutex) {
-                                                final FileSession replaced = inFlightRequests.replace(fileToRecover,
-                                                    new FileSession(requestSeqId, newOffset));
-                                                assert replaced != null : "session disappeared for " + fileToRecover.name();
-                                                assert replaced.lastTrackedSeqNo == requestSeqId;
-                                                assert replaced.lastOffset == fileSession.lastOffset;
-                                            }
-                                        }
+                                        final boolean lastChunk = r.getOffset() + actualChunkSize >= fileLength;
+                                        multiFileWriter.writeFileChunk(fileInfo.metadata(), r.getOffset(), r.getChunk(), lastChunk);
                                         requestSeqIdTracker.markSeqNoAsCompleted(requestSeqId);
                                     }
                                 }),
                                 e -> {
-                                    error.compareAndSet(null, Tuple.tuple(fileToRecover.metadata(), e));
+                                    error.compareAndSet(null, Tuple.tuple(fileInfo.metadata(), e));
                                     requestSeqIdTracker.markSeqNoAsCompleted(requestSeqId);
                                 }
                             ));
                     } catch (Exception e) {
-                        error.compareAndSet(null, Tuple.tuple(fileToRecover.metadata(), e));
+                        error.compareAndSet(null, Tuple.tuple(fileInfo.metadata(), e));
                         requestSeqIdTracker.markSeqNoAsCompleted(requestSeqId);
                         throw e;
                     }
-                } catch (Exception e) {
-                    error.compareAndSet(null, Tuple.tuple(fileToRecover.metadata(), e));
-                    break;
                 }
             }
 
             try {
                 requestSeqIdTracker.waitForOpsToComplete(requestSeqIdTracker.getMaxSeqNo());
             } catch (InterruptedException e) {

CcrRestoreSourceService.java

@@ -208,11 +208,7 @@ public class CcrRestoreSourceService extends AbstractLifecycleComponent implemen
         }
 
         private long readFileBytes(String fileName, BytesReference reference) throws IOException {
-            Releasable lock = keyedLock.tryAcquire(fileName);
-            if (lock == null) {
-                throw new IllegalStateException("can't read from the same file on the same session concurrently");
-            }
-            try (Releasable releasable = lock) {
+            try (Releasable ignored = keyedLock.acquire(fileName)) {
                 final IndexInput indexInput = cachedInputs.computeIfAbsent(fileName, f -> {
                     try {
                         return commitRef.getIndexCommit().getDirectory().openInput(fileName, IOContext.READONCE);