Account trimAboveSeqNo in committed translog generation (#50205)

Today we do not consider trimAboveSeqNo when calculating the translog 
generation of an index commit. If there is no new indexing after the
primary promotion, then we won't be able to clean up the translog.
This commit is contained in:
Nhat Nguyen 2019-12-16 11:08:00 -05:00
parent be78d5cc74
commit 731bfa6614
3 changed files with 18 additions and 6 deletions

View File

@ -120,6 +120,17 @@ final class Checkpoint {
out.writeLong(trimmedAboveSeqNo);
}
/**
* Returns the maximum sequence number of operations in this checkpoint after applying {@link #trimmedAboveSeqNo}.
*/
long maxEffectiveSeqNo() {
if (trimmedAboveSeqNo == SequenceNumbers.UNASSIGNED_SEQ_NO) {
return maxSeqNo;
} else {
return Math.min(trimmedAboveSeqNo, maxSeqNo);
}
}
static Checkpoint emptyTranslogCheckpoint(final long offset, final long generation, final long globalCheckpoint,
long minTranslogGeneration) {
final long minSeqNo = SequenceNumbers.NO_OPS_PERFORMED;

View File

@ -702,11 +702,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
assert readLock.isHeldByCurrentThread() || writeLock.isHeldByCurrentThread() :
"callers of readersAboveMinSeqNo must hold a lock: readLock ["
+ readLock.isHeldByCurrentThread() + "], writeLock [" + readLock.isHeldByCurrentThread() + "]";
return Stream.concat(readers.stream(), Stream.of(current))
.filter(reader -> {
final long maxSeqNo = reader.getCheckpoint().maxSeqNo;
return maxSeqNo == SequenceNumbers.UNASSIGNED_SEQ_NO || maxSeqNo >= minSeqNo;
});
return Stream.concat(readers.stream(), Stream.of(current)).filter(reader -> minSeqNo <= reader.getCheckpoint().maxEffectiveSeqNo());
}
/**
@ -1638,7 +1634,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
*/
long minTranslogFileGeneration = this.currentFileGeneration();
for (final TranslogReader reader : readers) {
if (seqNo <= reader.getCheckpoint().maxSeqNo) {
if (seqNo <= reader.getCheckpoint().maxEffectiveSeqNo()) {
minTranslogFileGeneration = Math.min(minTranslogFileGeneration, reader.getGeneration());
}
}

View File

@ -803,6 +803,11 @@ public class RecoveryDuringReplicationTests extends ESIndexLevelReplicationTestC
shards.assertAllEqual(initDocs + inFlightOpsOnNewPrimary + moreDocsAfterRollback);
done.set(true);
thread.join();
for (IndexShard shard : shards) {
shard.flush(new FlushRequest().force(true).waitIfOngoing(true));
assertThat(shard.translogStats().getUncommittedOperations(), equalTo(0));
}
}
}