* fix local processed checkpoint update (#2576) Signed-off-by: Poojita Raj <poojiraj@amazon.com> * separated tests + wrapper function Signed-off-by: Poojita Raj <poojiraj@amazon.com> * moved tests + compareAndSet change Signed-off-by: Poojita Raj <poojiraj@amazon.com>
This commit is contained in:
parent
3c5d997a76
commit
c4b684d3ae
|
@ -33,6 +33,7 @@
|
||||||
package org.opensearch.index.seqno;
|
package org.opensearch.index.seqno;
|
||||||
|
|
||||||
import com.carrotsearch.hppc.LongObjectHashMap;
|
import com.carrotsearch.hppc.LongObjectHashMap;
|
||||||
|
import org.opensearch.common.Nullable;
|
||||||
import org.opensearch.common.SuppressForbidden;
|
import org.opensearch.common.SuppressForbidden;
|
||||||
|
|
||||||
import java.util.concurrent.atomic.AtomicLong;
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
|
@ -116,6 +117,13 @@ public class LocalCheckpointTracker {
|
||||||
nextSeqNo.accumulateAndGet(seqNo + 1, Math::max);
|
nextSeqNo.accumulateAndGet(seqNo + 1, Math::max);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Checks that the sequence number is in an acceptable range for an update to take place.
|
||||||
|
*/
|
||||||
|
private boolean shouldUpdateSeqNo(final long seqNo, final long lowerBound, @Nullable final AtomicLong upperBound) {
|
||||||
|
return !((seqNo <= lowerBound) || (upperBound != null && seqNo > upperBound.get()));
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Marks the provided sequence number as processed and updates the processed checkpoint if possible.
|
* Marks the provided sequence number as processed and updates the processed checkpoint if possible.
|
||||||
*
|
*
|
||||||
|
@ -134,11 +142,29 @@ public class LocalCheckpointTracker {
|
||||||
markSeqNo(seqNo, persistedCheckpoint, persistedSeqNo);
|
markSeqNo(seqNo, persistedCheckpoint, persistedSeqNo);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Updates the processed sequence checkpoint to the given value.
|
||||||
|
*
|
||||||
|
* This method is only used for segment replication since indexing doesn't
|
||||||
|
* take place on the replica allowing us to avoid the check that all sequence numbers
|
||||||
|
* are consecutively processed.
|
||||||
|
*
|
||||||
|
* @param seqNo the sequence number to mark as processed
|
||||||
|
*/
|
||||||
|
public synchronized void fastForwardProcessedSeqNo(final long seqNo) {
|
||||||
|
advanceMaxSeqNo(seqNo);
|
||||||
|
final long currentProcessedCheckpoint = processedCheckpoint.get();
|
||||||
|
if (shouldUpdateSeqNo(seqNo, currentProcessedCheckpoint, persistedCheckpoint) == false) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
processedCheckpoint.compareAndSet(currentProcessedCheckpoint, seqNo);
|
||||||
|
}
|
||||||
|
|
||||||
private void markSeqNo(final long seqNo, final AtomicLong checkPoint, final LongObjectHashMap<CountedBitSet> bitSetMap) {
|
private void markSeqNo(final long seqNo, final AtomicLong checkPoint, final LongObjectHashMap<CountedBitSet> bitSetMap) {
|
||||||
assert Thread.holdsLock(this);
|
assert Thread.holdsLock(this);
|
||||||
// make sure we track highest seen sequence number
|
// make sure we track highest seen sequence number
|
||||||
advanceMaxSeqNo(seqNo);
|
advanceMaxSeqNo(seqNo);
|
||||||
if (seqNo <= checkPoint.get()) {
|
if (shouldUpdateSeqNo(seqNo, checkPoint.get(), null) == false) {
|
||||||
// this is possible during recovery where we might replay an operation that was also replicated
|
// this is possible during recovery where we might replay an operation that was also replicated
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
|
@ -331,4 +331,60 @@ public class LocalCheckpointTrackerTests extends OpenSearchTestCase {
|
||||||
final long seqNo = randomNonNegativeLong();
|
final long seqNo = randomNonNegativeLong();
|
||||||
assertThat(tracker.hasProcessed(seqNo), equalTo(seqNo <= localCheckpoint || seqNos.contains(seqNo)));
|
assertThat(tracker.hasProcessed(seqNo), equalTo(seqNo <= localCheckpoint || seqNos.contains(seqNo)));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void testFastForwardProcessedNoPersistentUpdate() {
|
||||||
|
// base case with no persistent checkpoint update
|
||||||
|
long seqNo1;
|
||||||
|
assertThat(tracker.getProcessedCheckpoint(), equalTo(SequenceNumbers.NO_OPS_PERFORMED));
|
||||||
|
seqNo1 = tracker.generateSeqNo();
|
||||||
|
assertThat(seqNo1, equalTo(0L));
|
||||||
|
tracker.fastForwardProcessedSeqNo(seqNo1);
|
||||||
|
assertThat(tracker.getProcessedCheckpoint(), equalTo(-1L));
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testFastForwardProcessedPersistentUpdate() {
|
||||||
|
// base case with persistent checkpoint update
|
||||||
|
long seqNo1;
|
||||||
|
assertThat(tracker.getProcessedCheckpoint(), equalTo(SequenceNumbers.NO_OPS_PERFORMED));
|
||||||
|
seqNo1 = tracker.generateSeqNo();
|
||||||
|
assertThat(seqNo1, equalTo(0L));
|
||||||
|
|
||||||
|
tracker.markSeqNoAsPersisted(seqNo1);
|
||||||
|
assertThat(tracker.getPersistedCheckpoint(), equalTo(0L));
|
||||||
|
tracker.fastForwardProcessedSeqNo(seqNo1);
|
||||||
|
assertThat(tracker.getProcessedCheckpoint(), equalTo(0L));
|
||||||
|
assertThat(tracker.hasProcessed(0L), equalTo(true));
|
||||||
|
assertThat(tracker.hasProcessed(atLeast(1)), equalTo(false));
|
||||||
|
|
||||||
|
// idempotent case
|
||||||
|
tracker.fastForwardProcessedSeqNo(seqNo1);
|
||||||
|
assertThat(tracker.getProcessedCheckpoint(), equalTo(0L));
|
||||||
|
assertThat(tracker.hasProcessed(0L), equalTo(true));
|
||||||
|
assertThat(tracker.hasProcessed(atLeast(1)), equalTo(false));
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testFastForwardProcessedPersistentUpdate2() {
|
||||||
|
long seqNo1, seqNo2;
|
||||||
|
assertThat(tracker.getProcessedCheckpoint(), equalTo(SequenceNumbers.NO_OPS_PERFORMED));
|
||||||
|
seqNo1 = tracker.generateSeqNo();
|
||||||
|
seqNo2 = tracker.generateSeqNo();
|
||||||
|
assertThat(seqNo1, equalTo(0L));
|
||||||
|
assertThat(seqNo2, equalTo(1L));
|
||||||
|
tracker.markSeqNoAsPersisted(seqNo1);
|
||||||
|
tracker.markSeqNoAsPersisted(seqNo2);
|
||||||
|
assertThat(tracker.getProcessedCheckpoint(), equalTo(-1L));
|
||||||
|
assertThat(tracker.getPersistedCheckpoint(), equalTo(1L));
|
||||||
|
|
||||||
|
tracker.fastForwardProcessedSeqNo(seqNo2);
|
||||||
|
assertThat(tracker.getProcessedCheckpoint(), equalTo(1L));
|
||||||
|
assertThat(tracker.hasProcessed(seqNo1), equalTo(true));
|
||||||
|
assertThat(tracker.hasProcessed(seqNo2), equalTo(true));
|
||||||
|
|
||||||
|
tracker.fastForwardProcessedSeqNo(seqNo1);
|
||||||
|
assertThat(tracker.getProcessedCheckpoint(), equalTo(1L));
|
||||||
|
assertThat(tracker.hasProcessed(between(0, 1)), equalTo(true));
|
||||||
|
assertThat(tracker.hasProcessed(atLeast(2)), equalTo(false));
|
||||||
|
assertThat(tracker.getMaxSeqNo(), equalTo(1L));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue