diff --git a/core/src/main/java/org/elasticsearch/index/seqno/LocalCheckpointTracker.java b/core/src/main/java/org/elasticsearch/index/seqno/LocalCheckpointTracker.java index 5380a3b2b7f..54751e8958a 100644 --- a/core/src/main/java/org/elasticsearch/index/seqno/LocalCheckpointTracker.java +++ b/core/src/main/java/org/elasticsearch/index/seqno/LocalCheckpointTracker.java @@ -19,12 +19,9 @@ package org.elasticsearch.index.seqno; +import com.carrotsearch.hppc.LongObjectHashMap; import org.apache.lucene.util.FixedBitSet; import org.elasticsearch.common.SuppressForbidden; -import org.elasticsearch.common.settings.Setting; -import org.elasticsearch.index.IndexSettings; - -import java.util.LinkedList; /** * This class generates sequences numbers and keeps track of the so-called "local checkpoint" which is the highest number for which all @@ -33,21 +30,16 @@ import java.util.LinkedList; public class LocalCheckpointTracker { /** - * We keep a bit for each sequence number that is still pending. To optimize allocation, we do so in multiple arrays allocating them on - * demand and cleaning up while completed. This constant controls the size of the arrays. + * We keep a bit for each sequence number that is still pending. To optimize allocation, we do so in multiple sets allocating them on + * demand and cleaning up while completed. This constant controls the size of the sets. */ - static final int BIT_ARRAYS_SIZE = 1024; + static final int BIT_SET_SIZE = 1024; /** - * An ordered list of bit arrays representing pending sequence numbers. The list is "anchored" in {@link #firstProcessedSeqNo} which - * marks the sequence number the fist bit in the first array corresponds to. + * A collection of bit sets representing pending sequence numbers. Each sequence number is mapped to a bit set by dividing by the + * bit set size. */ - final LinkedList<FixedBitSet> processedSeqNo = new LinkedList<>(); - - /** - * The sequence number that the first bit in the first array corresponds to.
- */ - long firstProcessedSeqNo; + final LongObjectHashMap processedSeqNo = new LongObjectHashMap<>(); /** * The current local checkpoint, i.e., all sequence numbers no more than this number have been completed. @@ -77,7 +69,6 @@ public class LocalCheckpointTracker { throw new IllegalArgumentException( "max seq. no. must be non-negative or [" + SequenceNumbers.NO_OPS_PERFORMED + "] but was [" + maxSeqNo + "]"); } - firstProcessedSeqNo = localCheckpoint == SequenceNumbers.NO_OPS_PERFORMED ? 0 : localCheckpoint + 1; nextSeqNo = maxSeqNo == SequenceNumbers.NO_OPS_PERFORMED ? 0 : maxSeqNo + 1; checkpoint = localCheckpoint; } @@ -122,7 +113,6 @@ public class LocalCheckpointTracker { assert checkpoint != SequenceNumbers.UNASSIGNED_SEQ_NO; assert checkpoint <= this.checkpoint; processedSeqNo.clear(); - firstProcessedSeqNo = checkpoint + 1; this.checkpoint = checkpoint; } @@ -175,24 +165,28 @@ public class LocalCheckpointTracker { @SuppressForbidden(reason = "Object#notifyAll") private void updateCheckpoint() { assert Thread.holdsLock(this); - assert checkpoint < firstProcessedSeqNo + BIT_ARRAYS_SIZE - 1 : - "checkpoint should be below the end of the first bit set (o.w. current bit set is completed and shouldn't be there)"; - assert getBitSetForSeqNo(checkpoint + 1) == processedSeqNo.getFirst() : - "checkpoint + 1 doesn't point to the first bit set (o.w. 
current bit set is completed and shouldn't be there)"; assert getBitSetForSeqNo(checkpoint + 1).get(seqNoToBitSetOffset(checkpoint + 1)) : "updateCheckpoint is called but the bit following the checkpoint is not set"; try { // keep it simple for now, get the checkpoint one by one; in the future we can optimize and read words - FixedBitSet current = processedSeqNo.getFirst(); + long bitSetKey = getBitSetKey(checkpoint); + FixedBitSet current = processedSeqNo.get(bitSetKey); + if (current == null) { + // the bit set corresponding to the checkpoint has already been removed, set ourselves up for the next bit set + assert checkpoint % BIT_SET_SIZE == BIT_SET_SIZE - 1; + current = processedSeqNo.get(++bitSetKey); + } do { checkpoint++; - // the checkpoint always falls in the first bit set or just before. If it falls - // on the last bit of the current bit set, we can clean it. - if (checkpoint == firstProcessedSeqNo + BIT_ARRAYS_SIZE - 1) { - processedSeqNo.removeFirst(); - firstProcessedSeqNo += BIT_ARRAYS_SIZE; - assert checkpoint - firstProcessedSeqNo < BIT_ARRAYS_SIZE; - current = processedSeqNo.peekFirst(); + /* + * The checkpoint always falls in the current bit set or we have already cleaned it; if it falls on the last bit of the + * current bit set, we can clean it. + */ + if (checkpoint == lastSeqNoInBitSet(bitSetKey)) { + assert current != null; + final FixedBitSet removed = processedSeqNo.remove(bitSetKey); + assert removed == current; + current = processedSeqNo.get(++bitSetKey); } } while (current != null && current.get(seqNoToBitSetOffset(checkpoint + 1))); } finally { @@ -201,37 +195,45 @@ public class LocalCheckpointTracker { } } - /** - * Return the bit array for the provided sequence number, possibly allocating a new array if needed. 
- * - * @param seqNo the sequence number to obtain the bit array for - * @return the bit array corresponding to the provided sequence number - */ - private FixedBitSet getBitSetForSeqNo(final long seqNo) { - assert Thread.holdsLock(this); - assert seqNo >= firstProcessedSeqNo : "seqNo: " + seqNo + " firstProcessedSeqNo: " + firstProcessedSeqNo; - final long bitSetOffset = (seqNo - firstProcessedSeqNo) / BIT_ARRAYS_SIZE; - if (bitSetOffset > Integer.MAX_VALUE) { - throw new IndexOutOfBoundsException( - "sequence number too high; got [" + seqNo + "], firstProcessedSeqNo [" + firstProcessedSeqNo + "]"); - } - while (bitSetOffset >= processedSeqNo.size()) { - processedSeqNo.add(new FixedBitSet(BIT_ARRAYS_SIZE)); - } - return processedSeqNo.get((int) bitSetOffset); + private long lastSeqNoInBitSet(final long bitSetKey) { + return (1 + bitSetKey) * BIT_SET_SIZE - 1; } /** - * Obtain the position in the bit array corresponding to the provided sequence number. The bit array corresponding to the sequence - * number can be obtained via {@link #getBitSetForSeqNo(long)}. + * Return the key of the bit set that the provided sequence number maps to, i.e., the sequence number divided by the bit set size. + * + * @param seqNo the sequence number to obtain the bit set key for + * @return the key of the bit set corresponding to the provided sequence number + */ + private long getBitSetKey(final long seqNo) { + assert Thread.holdsLock(this); + return seqNo / BIT_SET_SIZE; + } + + private FixedBitSet getBitSetForSeqNo(final long seqNo) { + assert Thread.holdsLock(this); + final long bitSetKey = getBitSetKey(seqNo); + final int index = processedSeqNo.indexOf(bitSetKey); + final FixedBitSet bitSet; + if (processedSeqNo.indexExists(index)) { + bitSet = processedSeqNo.indexGet(index); + } else { + bitSet = new FixedBitSet(BIT_SET_SIZE); + processedSeqNo.indexInsert(index, bitSetKey, bitSet); + } + return bitSet; + } + + /** + * Obtain the position in the bit set corresponding to the provided sequence number.
The bit set corresponding to the sequence number + * can be obtained via {@link #getBitSetForSeqNo(long)}. * * @param seqNo the sequence number to obtain the position for - * @return the position in the bit array corresponding to the provided sequence number + * @return the position in the bit set corresponding to the provided sequence number */ private int seqNoToBitSetOffset(final long seqNo) { assert Thread.holdsLock(this); - assert seqNo >= firstProcessedSeqNo; - return ((int) (seqNo - firstProcessedSeqNo)) % BIT_ARRAYS_SIZE; + return Math.toIntExact(seqNo % BIT_SET_SIZE); } } diff --git a/core/src/test/java/org/elasticsearch/index/seqno/LocalCheckpointTrackerTests.java b/core/src/test/java/org/elasticsearch/index/seqno/LocalCheckpointTrackerTests.java index ae167bb59f0..eb62391e0b0 100644 --- a/core/src/test/java/org/elasticsearch/index/seqno/LocalCheckpointTrackerTests.java +++ b/core/src/test/java/org/elasticsearch/index/seqno/LocalCheckpointTrackerTests.java @@ -19,10 +19,14 @@ package org.elasticsearch.index.seqno; +import com.carrotsearch.hppc.LongObjectHashMap; +import org.apache.lucene.util.FixedBitSet; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.common.Randomness; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.test.ESTestCase; +import org.hamcrest.BaseMatcher; +import org.hamcrest.Description; import org.junit.Before; import java.util.ArrayList; @@ -36,8 +40,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; import java.util.stream.IntStream; -import static org.elasticsearch.index.seqno.LocalCheckpointTracker.BIT_ARRAYS_SIZE; -import static org.hamcrest.Matchers.empty; +import static org.elasticsearch.index.seqno.LocalCheckpointTracker.BIT_SET_SIZE; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.isOneOf; @@ -83,10 +86,19 @@ public class LocalCheckpointTrackerTests extends ESTestCase { 
assertThat(tracker.getCheckpoint(), equalTo(2L)); } + public void testLazyInitialization() { + /* + * Previously this would allocate the entire chain of bit sets to the one for the sequence number being marked; for very large + * sequence numbers this could lead to excessive memory usage resulting in out of memory errors. + */ + tracker.markSeqNoAsCompleted(randomNonNegativeLong()); + assertThat(tracker.processedSeqNo.size(), equalTo(1)); + } + public void testSimpleOverFlow() { List<Integer> seqNoList = new ArrayList<>(); final boolean aligned = randomBoolean(); - final int maxOps = BIT_ARRAYS_SIZE * randomIntBetween(1, 5) + (aligned ? 0 : randomIntBetween(1, BIT_ARRAYS_SIZE - 1)); + final int maxOps = BIT_SET_SIZE * randomIntBetween(1, 5) + (aligned ? 0 : randomIntBetween(1, BIT_SET_SIZE - 1)); for (int i = 0; i < maxOps; i++) { seqNoList.add(i); @@ -97,7 +109,9 @@ public class LocalCheckpointTrackerTests extends ESTestCase { } assertThat(tracker.checkpoint, equalTo(maxOps - 1L)); assertThat(tracker.processedSeqNo.size(), equalTo(aligned ?
0 : 1)); - assertThat(tracker.firstProcessedSeqNo, equalTo(((long) maxOps / BIT_ARRAYS_SIZE) * BIT_ARRAYS_SIZE)); + if (aligned == false) { + assertThat(tracker.processedSeqNo.keys().iterator().next().value, equalTo(tracker.checkpoint / BIT_SET_SIZE)); + } } public void testConcurrentPrimary() throws InterruptedException { @@ -138,7 +152,9 @@ public class LocalCheckpointTrackerTests extends ESTestCase { tracker.markSeqNoAsCompleted(unFinishedSeq); assertThat(tracker.getCheckpoint(), equalTo(maxOps - 1L)); assertThat(tracker.processedSeqNo.size(), isOneOf(0, 1)); - assertThat(tracker.firstProcessedSeqNo, equalTo(((long) maxOps / BIT_ARRAYS_SIZE) * BIT_ARRAYS_SIZE)); + if (tracker.processedSeqNo.size() == 1) { + assertThat(tracker.processedSeqNo.keys().iterator().next().value, equalTo(tracker.checkpoint / BIT_SET_SIZE)); + } } public void testConcurrentReplica() throws InterruptedException { @@ -186,7 +202,10 @@ public class LocalCheckpointTrackerTests extends ESTestCase { assertThat(tracker.getCheckpoint(), equalTo(unFinishedSeq - 1L)); tracker.markSeqNoAsCompleted(unFinishedSeq); assertThat(tracker.getCheckpoint(), equalTo(maxOps - 1L)); - assertThat(tracker.firstProcessedSeqNo, equalTo(((long) maxOps / BIT_ARRAYS_SIZE) * BIT_ARRAYS_SIZE)); + assertThat(tracker.processedSeqNo.size(), isOneOf(0, 1)); + if (tracker.processedSeqNo.size() == 1) { + assertThat(tracker.processedSeqNo.keys().iterator().next().value, equalTo(tracker.checkpoint / BIT_SET_SIZE)); + } } public void testWaitForOpsToComplete() throws BrokenBarrierException, InterruptedException { @@ -241,7 +260,17 @@ public class LocalCheckpointTrackerTests extends ESTestCase { tracker.resetCheckpoint(localCheckpoint); assertThat(tracker.getCheckpoint(), equalTo((long) localCheckpoint)); assertThat(tracker.getMaxSeqNo(), equalTo((long) maxSeqNo)); - assertThat(tracker.processedSeqNo, empty()); + assertThat(tracker.processedSeqNo, new BaseMatcher<LongObjectHashMap<FixedBitSet>>() { + @Override + public boolean matches(Object item) { + return
(item instanceof LongObjectHashMap && ((LongObjectHashMap) item).isEmpty()); + } + + @Override + public void describeTo(Description description) { + description.appendText("empty"); + } + }); assertThat(tracker.generateSeqNo(), equalTo((long) (maxSeqNo + 1))); } }