diff --git a/server/src/main/java/org/elasticsearch/index/translog/Checkpoint.java b/server/src/main/java/org/elasticsearch/index/translog/Checkpoint.java index 523e24d8c57..7170a5fe586 100644 --- a/server/src/main/java/org/elasticsearch/index/translog/Checkpoint.java +++ b/server/src/main/java/org/elasticsearch/index/translog/Checkpoint.java @@ -120,6 +120,17 @@ final class Checkpoint { out.writeLong(trimmedAboveSeqNo); } + /** + * Returns the maximum sequence number of operations in this checkpoint after applying {@link #trimmedAboveSeqNo}. + */ + long maxEffectiveSeqNo() { + if (trimmedAboveSeqNo == SequenceNumbers.UNASSIGNED_SEQ_NO) { + return maxSeqNo; + } else { + return Math.min(trimmedAboveSeqNo, maxSeqNo); + } + } + static Checkpoint emptyTranslogCheckpoint(final long offset, final long generation, final long globalCheckpoint, long minTranslogGeneration) { final long minSeqNo = SequenceNumbers.NO_OPS_PERFORMED; diff --git a/server/src/main/java/org/elasticsearch/index/translog/Translog.java b/server/src/main/java/org/elasticsearch/index/translog/Translog.java index 8b925edce79..dabd6829f54 100644 --- a/server/src/main/java/org/elasticsearch/index/translog/Translog.java +++ b/server/src/main/java/org/elasticsearch/index/translog/Translog.java @@ -702,11 +702,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC assert readLock.isHeldByCurrentThread() || writeLock.isHeldByCurrentThread() : "callers of readersAboveMinSeqNo must hold a lock: readLock [" + readLock.isHeldByCurrentThread() + "], writeLock [" + readLock.isHeldByCurrentThread() + "]"; - return Stream.concat(readers.stream(), Stream.of(current)) - .filter(reader -> { - final long maxSeqNo = reader.getCheckpoint().maxSeqNo; - return maxSeqNo == SequenceNumbers.UNASSIGNED_SEQ_NO || maxSeqNo >= minSeqNo; - }); + return Stream.concat(readers.stream(), Stream.of(current)).filter(reader -> minSeqNo <= reader.getCheckpoint().maxEffectiveSeqNo()); } /** @@ -1638,7 +1634,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC */ long minTranslogFileGeneration = this.currentFileGeneration(); for (final TranslogReader reader : readers) { - if (seqNo <= reader.getCheckpoint().maxSeqNo) { + if (seqNo <= reader.getCheckpoint().maxEffectiveSeqNo()) { minTranslogFileGeneration = Math.min(minTranslogFileGeneration, reader.getGeneration()); } } diff --git a/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java b/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java index a44c64c3bc1..8df9046a4c6 100644 --- a/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java +++ b/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java @@ -803,6 +803,11 @@ public class RecoveryDuringReplicationTests extends ESIndexLevelReplicationTestC shards.assertAllEqual(initDocs + inFlightOpsOnNewPrimary + moreDocsAfterRollback); done.set(true); thread.join(); + + for (IndexShard shard : shards) { + shard.flush(new FlushRequest().force(true).waitIfOngoing(true)); + assertThat(shard.translogStats().getUncommittedOperations(), equalTo(0)); + } } }