From 5bc2f390a5d0e3eeef29a4f242a474bfc34af696 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Wed, 13 Dec 2017 11:10:57 -0500 Subject: [PATCH] Use CountedBitSet in LocalCheckpointTracker (#27793) The CountedBitSet can automatically release its internal bitsets when all bits are set to reduce memory usage. This structure can work well for sequence numbers as these numbers are likely to form contiguous ranges. This commit replaces FixedBitSet by CountedBitSet in LocalCheckpointTracker. --- .../{translog => seqno}/CountedBitSet.java | 10 +++++----- .../index/seqno/LocalCheckpointTracker.java | 18 +++++++++--------- .../index/translog/MultiSnapshot.java | 1 + .../CountedBitSetTests.java | 2 +- .../seqno/LocalCheckpointTrackerTests.java | 4 ++-- 5 files changed, 18 insertions(+), 17 deletions(-) rename core/src/main/java/org/elasticsearch/index/{translog => seqno}/CountedBitSet.java (93%) rename core/src/test/java/org/elasticsearch/index/{translog => seqno}/CountedBitSetTests.java (99%) diff --git a/core/src/main/java/org/elasticsearch/index/translog/CountedBitSet.java b/core/src/main/java/org/elasticsearch/index/seqno/CountedBitSet.java similarity index 93% rename from core/src/main/java/org/elasticsearch/index/translog/CountedBitSet.java rename to core/src/main/java/org/elasticsearch/index/seqno/CountedBitSet.java index ca1ae279a99..54270de1b01 100644 --- a/core/src/main/java/org/elasticsearch/index/translog/CountedBitSet.java +++ b/core/src/main/java/org/elasticsearch/index/seqno/CountedBitSet.java @@ -17,7 +17,7 @@ * under the License. */ -package org.elasticsearch.index.translog; +package org.elasticsearch.index.seqno; import org.apache.lucene.util.BitSet; import org.apache.lucene.util.FixedBitSet; @@ -25,15 +25,15 @@ import org.apache.lucene.util.RamUsageEstimator; /** * A {@link CountedBitSet} wraps a {@link FixedBitSet} but automatically releases the internal bitset - * when all bits are set to reduce memory usage. 
This structure can work well for sequence numbers - * from translog as these numbers are likely to form contiguous ranges (eg. filling all bits). + * when all bits are set to reduce memory usage. This structure can work well for sequence numbers as + * these numbers are likely to form contiguous ranges (eg. filling all bits). */ -final class CountedBitSet extends BitSet { +public final class CountedBitSet extends BitSet { static final long BASE_RAM_BYTES_USED = RamUsageEstimator.shallowSizeOfInstance(CountedBitSet.class); private short onBits; // Number of bits are set. private FixedBitSet bitset; - CountedBitSet(short numBits) { + public CountedBitSet(short numBits) { if (numBits <= 0) { throw new IllegalArgumentException("Number of bits must be positive. Given [" + numBits + "]"); } diff --git a/core/src/main/java/org/elasticsearch/index/seqno/LocalCheckpointTracker.java b/core/src/main/java/org/elasticsearch/index/seqno/LocalCheckpointTracker.java index 54751e8958a..2644dbfc32d 100644 --- a/core/src/main/java/org/elasticsearch/index/seqno/LocalCheckpointTracker.java +++ b/core/src/main/java/org/elasticsearch/index/seqno/LocalCheckpointTracker.java @@ -20,7 +20,7 @@ package org.elasticsearch.index.seqno; import com.carrotsearch.hppc.LongObjectHashMap; -import org.apache.lucene.util.FixedBitSet; +import org.apache.lucene.util.BitSet; import org.elasticsearch.common.SuppressForbidden; /** @@ -33,13 +33,13 @@ public class LocalCheckpointTracker { * We keep a bit for each sequence number that is still pending. To optimize allocation, we do so in multiple sets allocating them on * demand and cleaning up while completed. This constant controls the size of the sets. */ - static final int BIT_SET_SIZE = 1024; + static final short BIT_SET_SIZE = 1024; /** * A collection of bit sets representing pending sequence numbers. Each sequence number is mapped to a bit set by dividing by the * bit set size. 
*/ - final LongObjectHashMap<FixedBitSet> processedSeqNo = new LongObjectHashMap<>(); + final LongObjectHashMap<BitSet> processedSeqNo = new LongObjectHashMap<>(); /** * The current local checkpoint, i.e., all sequence numbers no more than this number have been completed. @@ -96,7 +96,7 @@ public class LocalCheckpointTracker { // this is possible during recovery where we might replay an operation that was also replicated return; } - final FixedBitSet bitSet = getBitSetForSeqNo(seqNo); + final BitSet bitSet = getBitSetForSeqNo(seqNo); final int offset = seqNoToBitSetOffset(seqNo); bitSet.set(offset); if (seqNo == checkpoint + 1) { @@ -170,7 +170,7 @@ public class LocalCheckpointTracker { try { // keep it simple for now, get the checkpoint one by one; in the future we can optimize and read words long bitSetKey = getBitSetKey(checkpoint); - FixedBitSet current = processedSeqNo.get(bitSetKey); + BitSet current = processedSeqNo.get(bitSetKey); if (current == null) { // the bit set corresponding to the checkpoint has already been removed, set ourselves up for the next bit set assert checkpoint % BIT_SET_SIZE == BIT_SET_SIZE - 1; @@ -184,7 +184,7 @@ public class LocalCheckpointTracker { */ if (checkpoint == lastSeqNoInBitSet(bitSetKey)) { assert current != null; - final FixedBitSet removed = processedSeqNo.remove(bitSetKey); + final BitSet removed = processedSeqNo.remove(bitSetKey); assert removed == current; current = processedSeqNo.get(++bitSetKey); } @@ -210,15 +210,15 @@ public class LocalCheckpointTracker { return seqNo / BIT_SET_SIZE; } - private FixedBitSet getBitSetForSeqNo(final long seqNo) { + private BitSet getBitSetForSeqNo(final long seqNo) { assert Thread.holdsLock(this); final long bitSetKey = getBitSetKey(seqNo); final int index = processedSeqNo.indexOf(bitSetKey); - final FixedBitSet bitSet; + final BitSet bitSet; if (processedSeqNo.indexExists(index)) { bitSet = processedSeqNo.indexGet(index); } else { - bitSet = new FixedBitSet(BIT_SET_SIZE); + bitSet = new 
CountedBitSet(BIT_SET_SIZE); processedSeqNo.indexInsert(index, bitSetKey, bitSet); } return bitSet; diff --git a/core/src/main/java/org/elasticsearch/index/translog/MultiSnapshot.java b/core/src/main/java/org/elasticsearch/index/translog/MultiSnapshot.java index e4bfbdcd42e..910d71a51a0 100644 --- a/core/src/main/java/org/elasticsearch/index/translog/MultiSnapshot.java +++ b/core/src/main/java/org/elasticsearch/index/translog/MultiSnapshot.java @@ -21,6 +21,7 @@ package org.elasticsearch.index.translog; import com.carrotsearch.hppc.LongObjectHashMap; import org.apache.lucene.util.BitSet; +import org.elasticsearch.index.seqno.CountedBitSet; import org.elasticsearch.index.seqno.SequenceNumbers; import java.io.Closeable; diff --git a/core/src/test/java/org/elasticsearch/index/translog/CountedBitSetTests.java b/core/src/test/java/org/elasticsearch/index/seqno/CountedBitSetTests.java similarity index 99% rename from core/src/test/java/org/elasticsearch/index/translog/CountedBitSetTests.java rename to core/src/test/java/org/elasticsearch/index/seqno/CountedBitSetTests.java index b68607f02d6..b014f827406 100644 --- a/core/src/test/java/org/elasticsearch/index/translog/CountedBitSetTests.java +++ b/core/src/test/java/org/elasticsearch/index/seqno/CountedBitSetTests.java @@ -17,7 +17,7 @@ * under the License. 
*/ -package org.elasticsearch.index.translog; +package org.elasticsearch.index.seqno; import org.apache.lucene.util.FixedBitSet; import org.elasticsearch.test.ESTestCase; diff --git a/core/src/test/java/org/elasticsearch/index/seqno/LocalCheckpointTrackerTests.java b/core/src/test/java/org/elasticsearch/index/seqno/LocalCheckpointTrackerTests.java index eb62391e0b0..31b8c23bf1c 100644 --- a/core/src/test/java/org/elasticsearch/index/seqno/LocalCheckpointTrackerTests.java +++ b/core/src/test/java/org/elasticsearch/index/seqno/LocalCheckpointTrackerTests.java @@ -20,7 +20,7 @@ package org.elasticsearch.index.seqno; import com.carrotsearch.hppc.LongObjectHashMap; -import org.apache.lucene.util.FixedBitSet; +import org.apache.lucene.util.BitSet; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.common.Randomness; import org.elasticsearch.common.util.concurrent.AbstractRunnable; @@ -260,7 +260,7 @@ public class LocalCheckpointTrackerTests extends ESTestCase { tracker.resetCheckpoint(localCheckpoint); assertThat(tracker.getCheckpoint(), equalTo((long) localCheckpoint)); assertThat(tracker.getMaxSeqNo(), equalTo((long) maxSeqNo)); - assertThat(tracker.processedSeqNo, new BaseMatcher<LongObjectHashMap<FixedBitSet>>() { + assertThat(tracker.processedSeqNo, new BaseMatcher<LongObjectHashMap<BitSet>>() { @Override public boolean matches(Object item) { return (item instanceof LongObjectHashMap && ((LongObjectHashMap) item).isEmpty());