Revert translog changes introduced for CCR (#31947)
We introduced these changes in #26708 because for CCR. However, CCR now uses Lucene instead of translog. This commit reverts these changes so that we can minimize differences between the ccr and the master branch. Relates ##26708
This commit is contained in:
parent
815faf34fc
commit
4d180175c6
|
@ -588,7 +588,7 @@ public abstract class Engine implements Closeable {
|
|||
* Creates a new translog snapshot from this engine for reading translog operations whose seq# in the provided range.
|
||||
* The caller has to close the returned snapshot after finishing the reading.
|
||||
*/
|
||||
public abstract Translog.Snapshot newTranslogSnapshotBetween(long minSeqNo, long maxSeqNo) throws IOException;
|
||||
public abstract Translog.Snapshot newSnapshotFromMinSeqNo(long minSeqNo) throws IOException;
|
||||
|
||||
public abstract TranslogStats getTranslogStats();
|
||||
|
||||
|
|
|
@ -344,7 +344,7 @@ public class InternalEngine extends Engine {
|
|||
try (ReleasableLock ignored = writeLock.acquire()) {
|
||||
ensureOpen();
|
||||
final long localCheckpoint = localCheckpointTracker.getCheckpoint();
|
||||
try (Translog.Snapshot snapshot = getTranslog().newSnapshotFrom(localCheckpoint + 1)) {
|
||||
try (Translog.Snapshot snapshot = getTranslog().newSnapshotFromMinSeqNo(localCheckpoint + 1)) {
|
||||
Translog.Operation operation;
|
||||
while ((operation = snapshot.next()) != null) {
|
||||
if (operation.seqNo() > localCheckpoint) {
|
||||
|
@ -480,8 +480,8 @@ public class InternalEngine extends Engine {
|
|||
}
|
||||
|
||||
@Override
|
||||
public Translog.Snapshot newTranslogSnapshotBetween(long minSeqNo, long maxSeqNo) throws IOException {
|
||||
return getTranslog().getSnapshotBetween(minSeqNo, maxSeqNo);
|
||||
public Translog.Snapshot newSnapshotFromMinSeqNo(long minSeqNo) throws IOException {
|
||||
return getTranslog().newSnapshotFromMinSeqNo(minSeqNo);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -493,7 +493,7 @@ public class InternalEngine extends Engine {
|
|||
if (engineConfig.getIndexSettings().isSoftDeleteEnabled()) {
|
||||
return newLuceneChangesSnapshot(source, mapperService, Math.max(0, startingSeqNo), Long.MAX_VALUE, false);
|
||||
} else {
|
||||
return getTranslog().getSnapshotBetween(startingSeqNo, Long.MAX_VALUE);
|
||||
return getTranslog().newSnapshotFromMinSeqNo(startingSeqNo);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -2483,7 +2483,7 @@ public class InternalEngine extends Engine {
|
|||
} else {
|
||||
final long currentLocalCheckpoint = getLocalCheckpointTracker().getCheckpoint();
|
||||
final LocalCheckpointTracker tracker = new LocalCheckpointTracker(startingSeqNo, startingSeqNo - 1);
|
||||
try (Translog.Snapshot snapshot = getTranslog().getSnapshotBetween(startingSeqNo, Long.MAX_VALUE)) {
|
||||
try (Translog.Snapshot snapshot = getTranslog().newSnapshotFromMinSeqNo(startingSeqNo)) {
|
||||
Translog.Operation operation;
|
||||
while ((operation = snapshot.next()) != null) {
|
||||
if (operation.seqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO) {
|
||||
|
|
|
@ -1605,7 +1605,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
|
|||
*/
|
||||
public Translog.Snapshot newTranslogSnapshotFromMinSeqNo(long minSeqNo) throws IOException {
|
||||
// TODO: Remove this method after primary-replica resync use soft-deletes
|
||||
return getEngine().newTranslogSnapshotBetween(minSeqNo, Long.MAX_VALUE);
|
||||
return getEngine().newSnapshotFromMinSeqNo(minSeqNo);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -411,9 +411,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
|
|||
public int estimateTotalOperationsFromMinSeq(long minSeqNo) {
|
||||
try (ReleasableLock ignored = readLock.acquire()) {
|
||||
ensureOpen();
|
||||
return readersBetweenMinAndMaxSeqNo(minSeqNo, Long.MAX_VALUE)
|
||||
.mapToInt(BaseTranslogReader::totalOperations)
|
||||
.sum();
|
||||
return readersAboveMinSeqNo(minSeqNo).mapToInt(BaseTranslogReader::totalOperations).sum();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -602,23 +600,11 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
|
|||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a snapshot with operations having a sequence number equal to or greater than <code>minSeqNo</code>.
|
||||
*/
|
||||
public Snapshot newSnapshotFrom(long minSeqNo) throws IOException {
|
||||
return getSnapshotBetween(minSeqNo, Long.MAX_VALUE);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a snapshot with operations having a sequence number equal to or greater than <code>minSeqNo</code> and
|
||||
* equal to or lesser than <code>maxSeqNo</code>.
|
||||
*/
|
||||
public Snapshot getSnapshotBetween(long minSeqNo, long maxSeqNo) throws IOException {
|
||||
public Snapshot newSnapshotFromMinSeqNo(long minSeqNo) throws IOException {
|
||||
try (ReleasableLock ignored = readLock.acquire()) {
|
||||
ensureOpen();
|
||||
TranslogSnapshot[] snapshots = readersBetweenMinAndMaxSeqNo(minSeqNo, maxSeqNo)
|
||||
.map(BaseTranslogReader::newSnapshot)
|
||||
.toArray(TranslogSnapshot[]::new);
|
||||
TranslogSnapshot[] snapshots = readersAboveMinSeqNo(minSeqNo).map(BaseTranslogReader::newSnapshot)
|
||||
.toArray(TranslogSnapshot[]::new);
|
||||
return newMultiSnapshot(snapshots);
|
||||
}
|
||||
}
|
||||
|
@ -644,14 +630,14 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
|
|||
}
|
||||
}
|
||||
|
||||
private Stream<? extends BaseTranslogReader> readersBetweenMinAndMaxSeqNo(long minSeqNo, long maxSeqNo) {
|
||||
assert readLock.isHeldByCurrentThread() || writeLock.isHeldByCurrentThread() ;
|
||||
|
||||
private Stream<? extends BaseTranslogReader> readersAboveMinSeqNo(long minSeqNo) {
|
||||
assert readLock.isHeldByCurrentThread() || writeLock.isHeldByCurrentThread() :
|
||||
"callers of readersAboveMinSeqNo must hold a lock: readLock ["
|
||||
+ readLock.isHeldByCurrentThread() + "], writeLock [" + readLock.isHeldByCurrentThread() + "]";
|
||||
return Stream.concat(readers.stream(), Stream.of(current))
|
||||
.filter(reader -> {
|
||||
final Checkpoint checkpoint = reader.getCheckpoint();
|
||||
return checkpoint.maxSeqNo == SequenceNumbers.UNASSIGNED_SEQ_NO ||
|
||||
checkpoint.minSeqNo <= maxSeqNo && checkpoint.maxSeqNo >= minSeqNo;
|
||||
final long maxSeqNo = reader.getCheckpoint().maxSeqNo;
|
||||
return maxSeqNo == SequenceNumbers.UNASSIGNED_SEQ_NO || maxSeqNo >= minSeqNo;
|
||||
});
|
||||
}
|
||||
|
||||
|
|
|
@ -998,7 +998,7 @@ public class TranslogTests extends ESTestCase {
|
|||
// these are what we expect the snapshot to return (and potentially some more).
|
||||
Set<Translog.Operation> expectedOps = new HashSet<>(writtenOps.keySet());
|
||||
expectedOps.removeIf(op -> op.seqNo() <= committedLocalCheckpointAtView);
|
||||
try (Translog.Snapshot snapshot = translog.newSnapshotFrom(committedLocalCheckpointAtView + 1L)) {
|
||||
try (Translog.Snapshot snapshot = translog.newSnapshotFromMinSeqNo(committedLocalCheckpointAtView + 1L)) {
|
||||
Translog.Operation op;
|
||||
while ((op = snapshot.next()) != null) {
|
||||
expectedOps.remove(op);
|
||||
|
@ -2814,7 +2814,7 @@ public class TranslogTests extends ESTestCase {
|
|||
}
|
||||
assertThat(translog.estimateTotalOperationsFromMinSeq(seqNo), equalTo(expectedSnapshotOps));
|
||||
int readFromSnapshot = 0;
|
||||
try (Translog.Snapshot snapshot = translog.newSnapshotFrom(seqNo)) {
|
||||
try (Translog.Snapshot snapshot = translog.newSnapshotFromMinSeqNo(seqNo)) {
|
||||
assertThat(snapshot.totalOperations(), equalTo(expectedSnapshotOps));
|
||||
Translog.Operation op;
|
||||
while ((op = snapshot.next()) != null) {
|
||||
|
@ -2831,38 +2831,6 @@ public class TranslogTests extends ESTestCase {
|
|||
}
|
||||
}
|
||||
|
||||
public void testGetSnapshotBetween() throws IOException {
|
||||
final int numOperations = randomIntBetween(2, 8196);
|
||||
final List<Integer> sequenceNumbers = IntStream.range(0, numOperations).boxed().collect(Collectors.toList());
|
||||
Collections.shuffle(sequenceNumbers, random());
|
||||
for (Integer sequenceNumber : sequenceNumbers) {
|
||||
translog.add(new Translog.NoOp(sequenceNumber, 0, "test"));
|
||||
if (rarely()) {
|
||||
translog.rollGeneration();
|
||||
}
|
||||
}
|
||||
translog.rollGeneration();
|
||||
|
||||
final int iters = randomIntBetween(8, 32);
|
||||
for (int iter = 0; iter < iters; iter++) {
|
||||
int min = randomIntBetween(0, numOperations - 1);
|
||||
int max = randomIntBetween(min, numOperations);
|
||||
try (Translog.Snapshot snapshot = translog.getSnapshotBetween(min, max)) {
|
||||
final List<Translog.Operation> operations = new ArrayList<>();
|
||||
for (Translog.Operation operation = snapshot.next(); operation != null; operation = snapshot.next()) {
|
||||
if (operation.seqNo() >= min && operation.seqNo() <= max) {
|
||||
operations.add(operation);
|
||||
}
|
||||
}
|
||||
operations.sort(Comparator.comparingLong(Translog.Operation::seqNo));
|
||||
Iterator<Translog.Operation> iterator = operations.iterator();
|
||||
for (long expectedSeqNo = min; expectedSeqNo < max; expectedSeqNo++) {
|
||||
assertThat(iterator.next().seqNo(), equalTo(expectedSeqNo));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public void testSimpleCommit() throws IOException {
|
||||
final int operations = randomIntBetween(1, 4096);
|
||||
long seqNo = 0;
|
||||
|
|
Loading…
Reference in New Issue