diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java index 356dcd1439b..d613200531c 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java @@ -402,7 +402,7 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit break; } final FileInfo fileToRecover; - final FileSession fileSession; + final FileSession prevFileSession; synchronized (mutex) { if (inFlightRequests.isEmpty() && remainingFiles.isEmpty()) { break; @@ -418,15 +418,17 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit } final Map.Entry minEntry = inFlightRequests.entrySet().stream().min(Comparator.comparingLong(e -> e.getValue().lastTrackedSeqNo)).get(); - fileSession = minEntry.getValue(); + prevFileSession = minEntry.getValue(); fileToRecover = minEntry.getKey(); } try { - requestSeqIdTracker.waitForOpsToComplete(fileSession.lastTrackedSeqNo); + requestSeqIdTracker.waitForOpsToComplete(prevFileSession.lastTrackedSeqNo); + final FileSession fileSession; synchronized (mutex) { + fileSession = inFlightRequests.get(fileToRecover); // if file has been removed in the mean-while, it means that restore of this file completed, so start working // on the next one - if (inFlightRequests.containsKey(fileToRecover) == false) { + if (fileSession == null) { continue; } } @@ -439,7 +441,8 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit fileToRecover.length() - fileSession.lastOffset)); final GetCcrRestoreFileChunkRequest request = new GetCcrRestoreFileChunkRequest(node, sessionUUID, fileToRecover.name(), bytesRequested); - logger.trace("[{}] [{}] fetching chunk for file [{}]", shardId, snapshotId, fileToRecover.name()); + logger.trace("[{}] [{}] fetching chunk for file [{}], expected offset: {}, size: {}", shardId, snapshotId, + fileToRecover.name(), fileSession.lastOffset, bytesRequested); remoteClient.execute(GetCcrRestoreFileChunkAction.INSTANCE, request, ActionListener.wrap( @@ -453,23 +456,32 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit @Override protected void doRun() throws Exception { final int actualChunkSize = r.getChunk().length(); + logger.trace("[{}] [{}] got response for file [{}], offset: {}, length: {}", shardId, + snapshotId, fileToRecover.name(), r.getOffset(), actualChunkSize); final long nanosPaused = ccrSettings.getRateLimiter().maybePause(actualChunkSize); throttleListener.accept(nanosPaused); final long newOffset = r.getOffset() + actualChunkSize; + + assert r.getOffset() == fileSession.lastOffset; + assert actualChunkSize == bytesRequested; assert newOffset <= fileToRecover.length(); final boolean lastChunk = newOffset >= fileToRecover.length(); multiFileWriter.writeFileChunk(fileToRecover.metadata(), r.getOffset(), r.getChunk(), lastChunk); if (lastChunk) { synchronized (mutex) { - final FileSession session = inFlightRequests.remove(fileToRecover); - assert session != null : "session disappeared for " + fileToRecover.name(); + final FileSession removed = inFlightRequests.remove(fileToRecover); + assert removed != null : "session disappeared for " + fileToRecover.name(); + assert removed.lastTrackedSeqNo == requestSeqId; + assert removed.lastOffset == fileSession.lastOffset; } } else { synchronized (mutex) { final FileSession replaced = inFlightRequests.replace(fileToRecover, new FileSession(requestSeqId, newOffset)); assert replaced != null : "session disappeared for " + fileToRecover.name(); + assert replaced.lastTrackedSeqNo == requestSeqId; + assert replaced.lastOffset == fileSession.lastOffset; } } requestSeqIdTracker.markSeqNoAsCompleted(requestSeqId); diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java index d5c3304e966..ade96b46141 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java @@ -106,6 +106,12 @@ public class IndexFollowingIT extends CcrIntegTestCase { public void testFollowIndex() throws Exception { final int numberOfPrimaryShards = randomIntBetween(1, 3); int numberOfReplicas = between(0, 1); + + followerClient().admin().cluster().prepareUpdateSettings() + .setTransientSettings(Settings.builder().put(CcrSettings.RECOVERY_CHUNK_SIZE.getKey(), + new ByteSizeValue(randomIntBetween(1, 1000), ByteSizeUnit.KB))) + .get(); + final String leaderIndexSettings = getIndexSettings(numberOfPrimaryShards, numberOfReplicas, singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true")); assertAcked(leaderClient().admin().indices().prepareCreate("index1").setSource(leaderIndexSettings, XContentType.JSON));