Fix CCR concurrent file chunk fetching bug (#38736)

Fixes a bug with concurrent file chunk fetching during recovery from remote where the wrong offset
was used.
This commit is contained in:
Yannick Welsch 2019-02-11 19:15:04 +01:00
parent 078da6d9bd
commit bafc709326
2 changed files with 25 additions and 7 deletions

View File

@ -402,7 +402,7 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit
break;
}
final FileInfo fileToRecover;
final FileSession fileSession;
final FileSession prevFileSession;
synchronized (mutex) {
if (inFlightRequests.isEmpty() && remainingFiles.isEmpty()) {
break;
@ -418,15 +418,17 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit
}
final Map.Entry<FileInfo, FileSession> minEntry =
inFlightRequests.entrySet().stream().min(Comparator.comparingLong(e -> e.getValue().lastTrackedSeqNo)).get();
fileSession = minEntry.getValue();
prevFileSession = minEntry.getValue();
fileToRecover = minEntry.getKey();
}
try {
requestSeqIdTracker.waitForOpsToComplete(fileSession.lastTrackedSeqNo);
requestSeqIdTracker.waitForOpsToComplete(prevFileSession.lastTrackedSeqNo);
final FileSession fileSession;
synchronized (mutex) {
fileSession = inFlightRequests.get(fileToRecover);
// if file has been removed in the mean-while, it means that restore of this file completed, so start working
// on the next one
if (inFlightRequests.containsKey(fileToRecover) == false) {
if (fileSession == null) {
continue;
}
}
@ -439,7 +441,8 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit
fileToRecover.length() - fileSession.lastOffset));
final GetCcrRestoreFileChunkRequest request =
new GetCcrRestoreFileChunkRequest(node, sessionUUID, fileToRecover.name(), bytesRequested);
logger.trace("[{}] [{}] fetching chunk for file [{}]", shardId, snapshotId, fileToRecover.name());
logger.trace("[{}] [{}] fetching chunk for file [{}], expected offset: {}, size: {}", shardId, snapshotId,
fileToRecover.name(), fileSession.lastOffset, bytesRequested);
remoteClient.execute(GetCcrRestoreFileChunkAction.INSTANCE, request,
ActionListener.wrap(
@ -453,23 +456,32 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit
@Override
protected void doRun() throws Exception {
final int actualChunkSize = r.getChunk().length();
logger.trace("[{}] [{}] got response for file [{}], offset: {}, length: {}", shardId,
snapshotId, fileToRecover.name(), r.getOffset(), actualChunkSize);
final long nanosPaused = ccrSettings.getRateLimiter().maybePause(actualChunkSize);
throttleListener.accept(nanosPaused);
final long newOffset = r.getOffset() + actualChunkSize;
assert r.getOffset() == fileSession.lastOffset;
assert actualChunkSize == bytesRequested;
assert newOffset <= fileToRecover.length();
final boolean lastChunk = newOffset >= fileToRecover.length();
multiFileWriter.writeFileChunk(fileToRecover.metadata(), r.getOffset(), r.getChunk(),
lastChunk);
if (lastChunk) {
synchronized (mutex) {
final FileSession session = inFlightRequests.remove(fileToRecover);
assert session != null : "session disappeared for " + fileToRecover.name();
final FileSession removed = inFlightRequests.remove(fileToRecover);
assert removed != null : "session disappeared for " + fileToRecover.name();
assert removed.lastTrackedSeqNo == requestSeqId;
assert removed.lastOffset == fileSession.lastOffset;
}
} else {
synchronized (mutex) {
final FileSession replaced = inFlightRequests.replace(fileToRecover,
new FileSession(requestSeqId, newOffset));
assert replaced != null : "session disappeared for " + fileToRecover.name();
assert replaced.lastTrackedSeqNo == requestSeqId;
assert replaced.lastOffset == fileSession.lastOffset;
}
}
requestSeqIdTracker.markSeqNoAsCompleted(requestSeqId);

View File

@ -106,6 +106,12 @@ public class IndexFollowingIT extends CcrIntegTestCase {
public void testFollowIndex() throws Exception {
final int numberOfPrimaryShards = randomIntBetween(1, 3);
int numberOfReplicas = between(0, 1);
followerClient().admin().cluster().prepareUpdateSettings()
.setTransientSettings(Settings.builder().put(CcrSettings.RECOVERY_CHUNK_SIZE.getKey(),
new ByteSizeValue(randomIntBetween(1, 1000), ByteSizeUnit.KB)))
.get();
final String leaderIndexSettings = getIndexSettings(numberOfPrimaryShards, numberOfReplicas,
singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true"));
assertAcked(leaderClient().admin().indices().prepareCreate("index1").setSource(leaderIndexSettings, XContentType.JSON));