Lazy initialize checkpoint tracker bit sets

This local checkpoint tracker uses collections of bit sets to track
which sequence numbers are complete, eventually removing these bit sets
when the local checkpoint advances. However, these bit sets were eagerly
allocated so that if a sequence number far ahead of the checkpoint was
marked as completed, all bit sets between the "last" bit set and the bit
set needed to track the marked sequence number were allocated. If this
sequence number was too far ahead, the memory requirements could be
excessive. This commit opts for a different strategy for holding on to
these bit sets and enables them to be lazily allocated.

Relates #27179
This commit is contained in:
Jason Tedor 2017-11-01 21:26:52 -04:00 committed by GitHub
parent 90d6317437
commit 59657ad1cb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 90 additions and 59 deletions

View File

@ -19,12 +19,9 @@
package org.elasticsearch.index.seqno;
import com.carrotsearch.hppc.LongObjectHashMap;
import org.apache.lucene.util.FixedBitSet;
import org.elasticsearch.common.SuppressForbidden;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.index.IndexSettings;
import java.util.LinkedList;
/**
* This class generates sequences numbers and keeps track of the so-called "local checkpoint" which is the highest number for which all
@ -33,21 +30,16 @@ import java.util.LinkedList;
public class LocalCheckpointTracker {
/**
* We keep a bit for each sequence number that is still pending. To optimize allocation, we do so in multiple arrays allocating them on
* demand and cleaning up while completed. This constant controls the size of the arrays.
* We keep a bit for each sequence number that is still pending. To optimize allocation, we do so in multiple sets allocating them on
* demand and cleaning up while completed. This constant controls the size of the sets.
*/
static final int BIT_ARRAYS_SIZE = 1024;
static final int BIT_SET_SIZE = 1024;
/**
* An ordered list of bit arrays representing pending sequence numbers. The list is "anchored" in {@link #firstProcessedSeqNo} which
* marks the sequence number the fist bit in the first array corresponds to.
* A collection of bit sets representing pending sequence numbers. Each sequence number is mapped to a bit set by dividing by the
* bit set size.
*/
final LinkedList<FixedBitSet> processedSeqNo = new LinkedList<>();
/**
* The sequence number that the first bit in the first array corresponds to.
*/
long firstProcessedSeqNo;
final LongObjectHashMap<FixedBitSet> processedSeqNo = new LongObjectHashMap<>();
/**
* The current local checkpoint, i.e., all sequence numbers no more than this number have been completed.
@ -77,7 +69,6 @@ public class LocalCheckpointTracker {
throw new IllegalArgumentException(
"max seq. no. must be non-negative or [" + SequenceNumbers.NO_OPS_PERFORMED + "] but was [" + maxSeqNo + "]");
}
firstProcessedSeqNo = localCheckpoint == SequenceNumbers.NO_OPS_PERFORMED ? 0 : localCheckpoint + 1;
nextSeqNo = maxSeqNo == SequenceNumbers.NO_OPS_PERFORMED ? 0 : maxSeqNo + 1;
checkpoint = localCheckpoint;
}
@ -122,7 +113,6 @@ public class LocalCheckpointTracker {
assert checkpoint != SequenceNumbers.UNASSIGNED_SEQ_NO;
assert checkpoint <= this.checkpoint;
processedSeqNo.clear();
firstProcessedSeqNo = checkpoint + 1;
this.checkpoint = checkpoint;
}
@ -175,24 +165,28 @@ public class LocalCheckpointTracker {
@SuppressForbidden(reason = "Object#notifyAll")
private void updateCheckpoint() {
assert Thread.holdsLock(this);
assert checkpoint < firstProcessedSeqNo + BIT_ARRAYS_SIZE - 1 :
"checkpoint should be below the end of the first bit set (o.w. current bit set is completed and shouldn't be there)";
assert getBitSetForSeqNo(checkpoint + 1) == processedSeqNo.getFirst() :
"checkpoint + 1 doesn't point to the first bit set (o.w. current bit set is completed and shouldn't be there)";
assert getBitSetForSeqNo(checkpoint + 1).get(seqNoToBitSetOffset(checkpoint + 1)) :
"updateCheckpoint is called but the bit following the checkpoint is not set";
try {
// keep it simple for now, get the checkpoint one by one; in the future we can optimize and read words
FixedBitSet current = processedSeqNo.getFirst();
long bitSetKey = getBitSetKey(checkpoint);
FixedBitSet current = processedSeqNo.get(bitSetKey);
if (current == null) {
// the bit set corresponding to the checkpoint has already been removed, set ourselves up for the next bit set
assert checkpoint % BIT_SET_SIZE == BIT_SET_SIZE - 1;
current = processedSeqNo.get(++bitSetKey);
}
do {
checkpoint++;
// the checkpoint always falls in the first bit set or just before. If it falls
// on the last bit of the current bit set, we can clean it.
if (checkpoint == firstProcessedSeqNo + BIT_ARRAYS_SIZE - 1) {
processedSeqNo.removeFirst();
firstProcessedSeqNo += BIT_ARRAYS_SIZE;
assert checkpoint - firstProcessedSeqNo < BIT_ARRAYS_SIZE;
current = processedSeqNo.peekFirst();
/*
* The checkpoint always falls in the current bit set or we have already cleaned it; if it falls on the last bit of the
* current bit set, we can clean it.
*/
if (checkpoint == lastSeqNoInBitSet(bitSetKey)) {
assert current != null;
final FixedBitSet removed = processedSeqNo.remove(bitSetKey);
assert removed == current;
current = processedSeqNo.get(++bitSetKey);
}
} while (current != null && current.get(seqNoToBitSetOffset(checkpoint + 1)));
} finally {
@ -201,37 +195,45 @@ public class LocalCheckpointTracker {
}
}
/**
* Return the bit array for the provided sequence number, possibly allocating a new array if needed.
*
* @param seqNo the sequence number to obtain the bit array for
* @return the bit array corresponding to the provided sequence number
*/
private FixedBitSet getBitSetForSeqNo(final long seqNo) {
assert Thread.holdsLock(this);
assert seqNo >= firstProcessedSeqNo : "seqNo: " + seqNo + " firstProcessedSeqNo: " + firstProcessedSeqNo;
final long bitSetOffset = (seqNo - firstProcessedSeqNo) / BIT_ARRAYS_SIZE;
if (bitSetOffset > Integer.MAX_VALUE) {
throw new IndexOutOfBoundsException(
"sequence number too high; got [" + seqNo + "], firstProcessedSeqNo [" + firstProcessedSeqNo + "]");
}
while (bitSetOffset >= processedSeqNo.size()) {
processedSeqNo.add(new FixedBitSet(BIT_ARRAYS_SIZE));
}
return processedSeqNo.get((int) bitSetOffset);
private long lastSeqNoInBitSet(final long bitSetKey) {
return (1 + bitSetKey) * BIT_SET_SIZE - 1;
}
/**
* Obtain the position in the bit array corresponding to the provided sequence number. The bit array corresponding to the sequence
* number can be obtained via {@link #getBitSetForSeqNo(long)}.
* Return the bit set for the provided sequence number, possibly allocating a new set if needed.
*
* @param seqNo the sequence number to obtain the bit set for
* @return the bit set corresponding to the provided sequence number
*/
private long getBitSetKey(final long seqNo) {
assert Thread.holdsLock(this);
return seqNo / BIT_SET_SIZE;
}
private FixedBitSet getBitSetForSeqNo(final long seqNo) {
assert Thread.holdsLock(this);
final long bitSetKey = getBitSetKey(seqNo);
final int index = processedSeqNo.indexOf(bitSetKey);
final FixedBitSet bitSet;
if (processedSeqNo.indexExists(index)) {
bitSet = processedSeqNo.indexGet(index);
} else {
bitSet = new FixedBitSet(BIT_SET_SIZE);
processedSeqNo.indexInsert(index, bitSetKey, bitSet);
}
return bitSet;
}
/**
* Obtain the position in the bit set corresponding to the provided sequence number. The bit set corresponding to the sequence number
* can be obtained via {@link #getBitSetForSeqNo(long)}.
*
* @param seqNo the sequence number to obtain the position for
* @return the position in the bit array corresponding to the provided sequence number
* @return the position in the bit set corresponding to the provided sequence number
*/
private int seqNoToBitSetOffset(final long seqNo) {
assert Thread.holdsLock(this);
assert seqNo >= firstProcessedSeqNo;
return ((int) (seqNo - firstProcessedSeqNo)) % BIT_ARRAYS_SIZE;
return Math.toIntExact(seqNo % BIT_SET_SIZE);
}
}

View File

@ -19,10 +19,14 @@
package org.elasticsearch.index.seqno;
import com.carrotsearch.hppc.LongObjectHashMap;
import org.apache.lucene.util.FixedBitSet;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.Randomness;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.test.ESTestCase;
import org.hamcrest.BaseMatcher;
import org.hamcrest.Description;
import org.junit.Before;
import java.util.ArrayList;
@ -36,8 +40,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import static org.elasticsearch.index.seqno.LocalCheckpointTracker.BIT_ARRAYS_SIZE;
import static org.hamcrest.Matchers.empty;
import static org.elasticsearch.index.seqno.LocalCheckpointTracker.BIT_SET_SIZE;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.isOneOf;
@ -83,10 +86,19 @@ public class LocalCheckpointTrackerTests extends ESTestCase {
assertThat(tracker.getCheckpoint(), equalTo(2L));
}
public void testLazyInitialization() {
/*
* Previously this would allocate the entire chain of bit sets to the one for the sequence number being marked; for very large
* sequence numbers this could lead to excessive memory usage resulting in out of memory errors.
*/
tracker.markSeqNoAsCompleted(randomNonNegativeLong());
assertThat(tracker.processedSeqNo.size(), equalTo(1));
}
public void testSimpleOverFlow() {
List<Integer> seqNoList = new ArrayList<>();
final boolean aligned = randomBoolean();
final int maxOps = BIT_ARRAYS_SIZE * randomIntBetween(1, 5) + (aligned ? 0 : randomIntBetween(1, BIT_ARRAYS_SIZE - 1));
final int maxOps = BIT_SET_SIZE * randomIntBetween(1, 5) + (aligned ? 0 : randomIntBetween(1, BIT_SET_SIZE - 1));
for (int i = 0; i < maxOps; i++) {
seqNoList.add(i);
@ -97,7 +109,9 @@ public class LocalCheckpointTrackerTests extends ESTestCase {
}
assertThat(tracker.checkpoint, equalTo(maxOps - 1L));
assertThat(tracker.processedSeqNo.size(), equalTo(aligned ? 0 : 1));
assertThat(tracker.firstProcessedSeqNo, equalTo(((long) maxOps / BIT_ARRAYS_SIZE) * BIT_ARRAYS_SIZE));
if (aligned == false) {
assertThat(tracker.processedSeqNo.keys().iterator().next().value, equalTo(tracker.checkpoint / BIT_SET_SIZE));
}
}
public void testConcurrentPrimary() throws InterruptedException {
@ -138,7 +152,9 @@ public class LocalCheckpointTrackerTests extends ESTestCase {
tracker.markSeqNoAsCompleted(unFinishedSeq);
assertThat(tracker.getCheckpoint(), equalTo(maxOps - 1L));
assertThat(tracker.processedSeqNo.size(), isOneOf(0, 1));
assertThat(tracker.firstProcessedSeqNo, equalTo(((long) maxOps / BIT_ARRAYS_SIZE) * BIT_ARRAYS_SIZE));
if (tracker.processedSeqNo.size() == 1) {
assertThat(tracker.processedSeqNo.keys().iterator().next().value, equalTo(tracker.checkpoint / BIT_SET_SIZE));
}
}
public void testConcurrentReplica() throws InterruptedException {
@ -186,7 +202,10 @@ public class LocalCheckpointTrackerTests extends ESTestCase {
assertThat(tracker.getCheckpoint(), equalTo(unFinishedSeq - 1L));
tracker.markSeqNoAsCompleted(unFinishedSeq);
assertThat(tracker.getCheckpoint(), equalTo(maxOps - 1L));
assertThat(tracker.firstProcessedSeqNo, equalTo(((long) maxOps / BIT_ARRAYS_SIZE) * BIT_ARRAYS_SIZE));
assertThat(tracker.processedSeqNo.size(), isOneOf(0, 1));
if (tracker.processedSeqNo.size() == 1) {
assertThat(tracker.processedSeqNo.keys().iterator().next().value, equalTo(tracker.checkpoint / BIT_SET_SIZE));
}
}
public void testWaitForOpsToComplete() throws BrokenBarrierException, InterruptedException {
@ -241,7 +260,17 @@ public class LocalCheckpointTrackerTests extends ESTestCase {
tracker.resetCheckpoint(localCheckpoint);
assertThat(tracker.getCheckpoint(), equalTo((long) localCheckpoint));
assertThat(tracker.getMaxSeqNo(), equalTo((long) maxSeqNo));
assertThat(tracker.processedSeqNo, empty());
assertThat(tracker.processedSeqNo, new BaseMatcher<LongObjectHashMap<FixedBitSet>>() {
@Override
public boolean matches(Object item) {
return (item instanceof LongObjectHashMap && ((LongObjectHashMap) item).isEmpty());
}
@Override
public void describeTo(Description description) {
description.appendText("empty");
}
});
assertThat(tracker.generateSeqNo(), equalTo((long) (maxSeqNo + 1)));
}
}