Use CountedBitSet in LocalCheckpointTracker (#27793)

The CountedBitSet can automatically release its internal bitsets when
all bits are set to reduce memory usage. This structure can work well
for sequence numbers as these numbers are likely to form contiguous
ranges. This commit replaces FixedBitSet by CountedBitSet in
LocalCheckpointTracker.
This commit is contained in:
Nhat Nguyen 2017-12-13 11:10:57 -05:00 committed by GitHub
parent 442c3b8bcf
commit 5bc2f390a5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 18 additions and 17 deletions

View File

@ -17,7 +17,7 @@
* under the License. * under the License.
*/ */
package org.elasticsearch.index.translog; package org.elasticsearch.index.seqno;
import org.apache.lucene.util.BitSet; import org.apache.lucene.util.BitSet;
import org.apache.lucene.util.FixedBitSet; import org.apache.lucene.util.FixedBitSet;
@ -25,15 +25,15 @@ import org.apache.lucene.util.RamUsageEstimator;
/** /**
* A {@link CountedBitSet} wraps a {@link FixedBitSet} but automatically releases the internal bitset * A {@link CountedBitSet} wraps a {@link FixedBitSet} but automatically releases the internal bitset
* when all bits are set to reduce memory usage. This structure can work well for sequence numbers * when all bits are set to reduce memory usage. This structure can work well for sequence numbers as
* from translog as these numbers are likely to form contiguous ranges (eg. filling all bits). * these numbers are likely to form contiguous ranges (eg. filling all bits).
*/ */
final class CountedBitSet extends BitSet { public final class CountedBitSet extends BitSet {
static final long BASE_RAM_BYTES_USED = RamUsageEstimator.shallowSizeOfInstance(CountedBitSet.class); static final long BASE_RAM_BYTES_USED = RamUsageEstimator.shallowSizeOfInstance(CountedBitSet.class);
private short onBits; // Number of bits are set. private short onBits; // Number of bits are set.
private FixedBitSet bitset; private FixedBitSet bitset;
CountedBitSet(short numBits) { public CountedBitSet(short numBits) {
if (numBits <= 0) { if (numBits <= 0) {
throw new IllegalArgumentException("Number of bits must be positive. Given [" + numBits + "]"); throw new IllegalArgumentException("Number of bits must be positive. Given [" + numBits + "]");
} }

View File

@ -20,7 +20,7 @@
package org.elasticsearch.index.seqno; package org.elasticsearch.index.seqno;
import com.carrotsearch.hppc.LongObjectHashMap; import com.carrotsearch.hppc.LongObjectHashMap;
import org.apache.lucene.util.FixedBitSet; import org.apache.lucene.util.BitSet;
import org.elasticsearch.common.SuppressForbidden; import org.elasticsearch.common.SuppressForbidden;
/** /**
@ -33,13 +33,13 @@ public class LocalCheckpointTracker {
* We keep a bit for each sequence number that is still pending. To optimize allocation, we do so in multiple sets allocating them on * We keep a bit for each sequence number that is still pending. To optimize allocation, we do so in multiple sets allocating them on
* demand and cleaning up while completed. This constant controls the size of the sets. * demand and cleaning up while completed. This constant controls the size of the sets.
*/ */
static final int BIT_SET_SIZE = 1024; static final short BIT_SET_SIZE = 1024;
/** /**
* A collection of bit sets representing pending sequence numbers. Each sequence number is mapped to a bit set by dividing by the * A collection of bit sets representing pending sequence numbers. Each sequence number is mapped to a bit set by dividing by the
* bit set size. * bit set size.
*/ */
final LongObjectHashMap<FixedBitSet> processedSeqNo = new LongObjectHashMap<>(); final LongObjectHashMap<BitSet> processedSeqNo = new LongObjectHashMap<>();
/** /**
* The current local checkpoint, i.e., all sequence numbers no more than this number have been completed. * The current local checkpoint, i.e., all sequence numbers no more than this number have been completed.
@ -96,7 +96,7 @@ public class LocalCheckpointTracker {
// this is possible during recovery where we might replay an operation that was also replicated // this is possible during recovery where we might replay an operation that was also replicated
return; return;
} }
final FixedBitSet bitSet = getBitSetForSeqNo(seqNo); final BitSet bitSet = getBitSetForSeqNo(seqNo);
final int offset = seqNoToBitSetOffset(seqNo); final int offset = seqNoToBitSetOffset(seqNo);
bitSet.set(offset); bitSet.set(offset);
if (seqNo == checkpoint + 1) { if (seqNo == checkpoint + 1) {
@ -170,7 +170,7 @@ public class LocalCheckpointTracker {
try { try {
// keep it simple for now, get the checkpoint one by one; in the future we can optimize and read words // keep it simple for now, get the checkpoint one by one; in the future we can optimize and read words
long bitSetKey = getBitSetKey(checkpoint); long bitSetKey = getBitSetKey(checkpoint);
FixedBitSet current = processedSeqNo.get(bitSetKey); BitSet current = processedSeqNo.get(bitSetKey);
if (current == null) { if (current == null) {
// the bit set corresponding to the checkpoint has already been removed, set ourselves up for the next bit set // the bit set corresponding to the checkpoint has already been removed, set ourselves up for the next bit set
assert checkpoint % BIT_SET_SIZE == BIT_SET_SIZE - 1; assert checkpoint % BIT_SET_SIZE == BIT_SET_SIZE - 1;
@ -184,7 +184,7 @@ public class LocalCheckpointTracker {
*/ */
if (checkpoint == lastSeqNoInBitSet(bitSetKey)) { if (checkpoint == lastSeqNoInBitSet(bitSetKey)) {
assert current != null; assert current != null;
final FixedBitSet removed = processedSeqNo.remove(bitSetKey); final BitSet removed = processedSeqNo.remove(bitSetKey);
assert removed == current; assert removed == current;
current = processedSeqNo.get(++bitSetKey); current = processedSeqNo.get(++bitSetKey);
} }
@ -210,15 +210,15 @@ public class LocalCheckpointTracker {
return seqNo / BIT_SET_SIZE; return seqNo / BIT_SET_SIZE;
} }
private FixedBitSet getBitSetForSeqNo(final long seqNo) { private BitSet getBitSetForSeqNo(final long seqNo) {
assert Thread.holdsLock(this); assert Thread.holdsLock(this);
final long bitSetKey = getBitSetKey(seqNo); final long bitSetKey = getBitSetKey(seqNo);
final int index = processedSeqNo.indexOf(bitSetKey); final int index = processedSeqNo.indexOf(bitSetKey);
final FixedBitSet bitSet; final BitSet bitSet;
if (processedSeqNo.indexExists(index)) { if (processedSeqNo.indexExists(index)) {
bitSet = processedSeqNo.indexGet(index); bitSet = processedSeqNo.indexGet(index);
} else { } else {
bitSet = new FixedBitSet(BIT_SET_SIZE); bitSet = new CountedBitSet(BIT_SET_SIZE);
processedSeqNo.indexInsert(index, bitSetKey, bitSet); processedSeqNo.indexInsert(index, bitSetKey, bitSet);
} }
return bitSet; return bitSet;

View File

@ -21,6 +21,7 @@ package org.elasticsearch.index.translog;
import com.carrotsearch.hppc.LongObjectHashMap; import com.carrotsearch.hppc.LongObjectHashMap;
import org.apache.lucene.util.BitSet; import org.apache.lucene.util.BitSet;
import org.elasticsearch.index.seqno.CountedBitSet;
import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.seqno.SequenceNumbers;
import java.io.Closeable; import java.io.Closeable;

View File

@ -17,7 +17,7 @@
* under the License. * under the License.
*/ */
package org.elasticsearch.index.translog; package org.elasticsearch.index.seqno;
import org.apache.lucene.util.FixedBitSet; import org.apache.lucene.util.FixedBitSet;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;

View File

@ -20,7 +20,7 @@
package org.elasticsearch.index.seqno; package org.elasticsearch.index.seqno;
import com.carrotsearch.hppc.LongObjectHashMap; import com.carrotsearch.hppc.LongObjectHashMap;
import org.apache.lucene.util.FixedBitSet; import org.apache.lucene.util.BitSet;
import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.Randomness; import org.elasticsearch.common.Randomness;
import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.AbstractRunnable;
@ -260,7 +260,7 @@ public class LocalCheckpointTrackerTests extends ESTestCase {
tracker.resetCheckpoint(localCheckpoint); tracker.resetCheckpoint(localCheckpoint);
assertThat(tracker.getCheckpoint(), equalTo((long) localCheckpoint)); assertThat(tracker.getCheckpoint(), equalTo((long) localCheckpoint));
assertThat(tracker.getMaxSeqNo(), equalTo((long) maxSeqNo)); assertThat(tracker.getMaxSeqNo(), equalTo((long) maxSeqNo));
assertThat(tracker.processedSeqNo, new BaseMatcher<LongObjectHashMap<FixedBitSet>>() { assertThat(tracker.processedSeqNo, new BaseMatcher<LongObjectHashMap<BitSet>>() {
@Override @Override
public boolean matches(Object item) { public boolean matches(Object item) {
return (item instanceof LongObjectHashMap && ((LongObjectHashMap) item).isEmpty()); return (item instanceof LongObjectHashMap && ((LongObjectHashMap) item).isEmpty());