Simplify MultiSnapshot#SeqNoset (#27547)
Today, we maintain two sets in a SeqNoSet: ongoing sets and completed sets. We can remove the completed sets and use only the ongoing sets by releasing the internal bitset of a CountedBitSet when all its bits are set. This behaves like two sets but simpler. This commit also makes CountedBitSet as a drop-in replacement for BitSet. Relates #27268
This commit is contained in:
parent
a880bbd57d
commit
49df50f662
|
@ -0,0 +1,106 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.index.translog;
|
||||
|
||||
import org.apache.lucene.util.BitSet;
|
||||
import org.apache.lucene.util.FixedBitSet;
|
||||
|
||||
/**
|
||||
* A {@link CountedBitSet} wraps a {@link FixedBitSet} but automatically releases the internal bitset
|
||||
* when all bits are set to reduce memory usage. This structure can work well for sequence numbers
|
||||
* from translog as these numbers are likely to form contiguous ranges (eg. filling all bits).
|
||||
*/
|
||||
final class CountedBitSet extends BitSet {
|
||||
private short onBits; // Number of bits are set.
|
||||
private FixedBitSet bitset;
|
||||
|
||||
CountedBitSet(short numBits) {
|
||||
assert numBits > 0;
|
||||
this.onBits = 0;
|
||||
this.bitset = new FixedBitSet(numBits);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean get(int index) {
|
||||
assert 0 <= index && index < this.length();
|
||||
assert bitset == null || onBits < bitset.length() : "Bitset should be released when all bits are set";
|
||||
|
||||
return bitset == null ? true : bitset.get(index);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void set(int index) {
|
||||
assert 0 <= index && index < this.length();
|
||||
assert bitset == null || onBits < bitset.length() : "Bitset should be released when all bits are set";
|
||||
|
||||
// Ignore set when bitset is full.
|
||||
if (bitset != null) {
|
||||
boolean wasOn = bitset.getAndSet(index);
|
||||
if (wasOn == false) {
|
||||
onBits++;
|
||||
// Once all bits are set, we can simply just return YES for all indexes.
|
||||
// This allows us to clear the internal bitset and use null check as the guard.
|
||||
if (onBits == bitset.length()) {
|
||||
bitset = null;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void clear(int startIndex, int endIndex) {
|
||||
throw new UnsupportedOperationException("Not implemented yet");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void clear(int index) {
|
||||
throw new UnsupportedOperationException("Not implemented yet");
|
||||
}
|
||||
|
||||
@Override
|
||||
public int cardinality() {
|
||||
return onBits;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int length() {
|
||||
return bitset == null ? onBits : bitset.length();
|
||||
}
|
||||
|
||||
@Override
|
||||
public int prevSetBit(int index) {
|
||||
throw new UnsupportedOperationException("Not implemented yet");
|
||||
}
|
||||
|
||||
@Override
|
||||
public int nextSetBit(int index) {
|
||||
throw new UnsupportedOperationException("Not implemented yet");
|
||||
}
|
||||
|
||||
@Override
|
||||
public long ramBytesUsed() {
|
||||
throw new UnsupportedOperationException("Not implemented yet");
|
||||
}
|
||||
|
||||
// Exposed for testing
|
||||
boolean isInternalBitsetReleased() {
|
||||
return bitset == null;
|
||||
}
|
||||
}
|
|
@ -19,10 +19,8 @@
|
|||
|
||||
package org.elasticsearch.index.translog;
|
||||
|
||||
import com.carrotsearch.hppc.LongHashSet;
|
||||
import com.carrotsearch.hppc.LongObjectHashMap;
|
||||
import com.carrotsearch.hppc.LongSet;
|
||||
import org.apache.lucene.util.FixedBitSet;
|
||||
import org.apache.lucene.util.BitSet;
|
||||
import org.elasticsearch.index.seqno.SequenceNumbers;
|
||||
|
||||
import java.io.Closeable;
|
||||
|
@ -84,41 +82,9 @@ final class MultiSnapshot implements Translog.Snapshot {
|
|||
onClose.close();
|
||||
}
|
||||
|
||||
/**
|
||||
* A wrapper of {@link FixedBitSet} but allows to check if all bits are set in O(1).
|
||||
*/
|
||||
private static final class CountedBitSet {
|
||||
private short onBits;
|
||||
private final FixedBitSet bitset;
|
||||
|
||||
CountedBitSet(short numBits) {
|
||||
assert numBits > 0;
|
||||
this.onBits = 0;
|
||||
this.bitset = new FixedBitSet(numBits);
|
||||
}
|
||||
|
||||
boolean getAndSet(int index) {
|
||||
assert index >= 0;
|
||||
boolean wasOn = bitset.getAndSet(index);
|
||||
if (wasOn == false) {
|
||||
onBits++;
|
||||
}
|
||||
return wasOn;
|
||||
}
|
||||
|
||||
boolean hasAllBitsOn() {
|
||||
return onBits == bitset.length();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Sequence numbers from translog are likely to form contiguous ranges,
|
||||
* thus collapsing a completed bitset into a single entry will reduce memory usage.
|
||||
*/
|
||||
static final class SeqNoSet {
|
||||
static final short BIT_SET_SIZE = 1024;
|
||||
private final LongSet completedSets = new LongHashSet();
|
||||
private final LongObjectHashMap<CountedBitSet> ongoingSets = new LongObjectHashMap<>();
|
||||
private final LongObjectHashMap<BitSet> bitSets = new LongObjectHashMap<>();
|
||||
|
||||
/**
|
||||
* Marks this sequence number and returns <tt>true</tt> if it is seen before.
|
||||
|
@ -126,33 +92,15 @@ final class MultiSnapshot implements Translog.Snapshot {
|
|||
boolean getAndSet(long value) {
|
||||
assert value >= 0;
|
||||
final long key = value / BIT_SET_SIZE;
|
||||
|
||||
if (completedSets.contains(key)) {
|
||||
return true;
|
||||
}
|
||||
|
||||
CountedBitSet bitset = ongoingSets.get(key);
|
||||
BitSet bitset = bitSets.get(key);
|
||||
if (bitset == null) {
|
||||
bitset = new CountedBitSet(BIT_SET_SIZE);
|
||||
ongoingSets.put(key, bitset);
|
||||
}
|
||||
|
||||
final boolean wasOn = bitset.getAndSet(Math.toIntExact(value % BIT_SET_SIZE));
|
||||
if (bitset.hasAllBitsOn()) {
|
||||
ongoingSets.remove(key);
|
||||
completedSets.add(key);
|
||||
bitSets.put(key, bitset);
|
||||
}
|
||||
final int index = Math.toIntExact(value % BIT_SET_SIZE);
|
||||
final boolean wasOn = bitset.get(index);
|
||||
bitset.set(index);
|
||||
return wasOn;
|
||||
}
|
||||
|
||||
// For testing
|
||||
long completeSetsSize() {
|
||||
return completedSets.size();
|
||||
}
|
||||
|
||||
// For testing
|
||||
long ongoingSetsSize() {
|
||||
return ongoingSets.size();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,97 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.index.translog;
|
||||
|
||||
import org.apache.lucene.util.FixedBitSet;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.IntStream;
|
||||
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
|
||||
public class CountedBitSetTests extends ESTestCase {
|
||||
|
||||
public void testCompareToFixedBitset() {
|
||||
int numBits = (short) randomIntBetween(8, 4096);
|
||||
final FixedBitSet fixedBitSet = new FixedBitSet(numBits);
|
||||
final CountedBitSet countedBitSet = new CountedBitSet((short) numBits);
|
||||
|
||||
for (int i = 0; i < numBits; i++) {
|
||||
if (randomBoolean()) {
|
||||
fixedBitSet.set(i);
|
||||
countedBitSet.set(i);
|
||||
}
|
||||
assertThat(countedBitSet.cardinality(), equalTo(fixedBitSet.cardinality()));
|
||||
assertThat(countedBitSet.length(), equalTo(fixedBitSet.length()));
|
||||
}
|
||||
|
||||
for (int i = 0; i < numBits; i++) {
|
||||
assertThat(countedBitSet.get(i), equalTo(fixedBitSet.get(i)));
|
||||
}
|
||||
}
|
||||
|
||||
public void testReleaseInternalBitSet() {
|
||||
int numBits = (short) randomIntBetween(8, 4096);
|
||||
final CountedBitSet countedBitSet = new CountedBitSet((short) numBits);
|
||||
final List<Integer> values = IntStream.range(0, numBits).boxed().collect(Collectors.toList());
|
||||
|
||||
for (int i = 1; i < numBits; i++) {
|
||||
final int value = values.get(i);
|
||||
assertThat(countedBitSet.get(value), equalTo(false));
|
||||
assertThat(countedBitSet.isInternalBitsetReleased(), equalTo(false));
|
||||
|
||||
countedBitSet.set(value);
|
||||
|
||||
assertThat(countedBitSet.get(value), equalTo(true));
|
||||
assertThat(countedBitSet.isInternalBitsetReleased(), equalTo(false));
|
||||
assertThat(countedBitSet.length(), equalTo(numBits));
|
||||
assertThat(countedBitSet.cardinality(), equalTo(i));
|
||||
}
|
||||
|
||||
// The missing piece to fill all bits.
|
||||
{
|
||||
final int value = values.get(0);
|
||||
assertThat(countedBitSet.get(value), equalTo(false));
|
||||
assertThat(countedBitSet.isInternalBitsetReleased(), equalTo(false));
|
||||
|
||||
countedBitSet.set(value);
|
||||
|
||||
assertThat(countedBitSet.get(value), equalTo(true));
|
||||
assertThat(countedBitSet.isInternalBitsetReleased(), equalTo(true));
|
||||
assertThat(countedBitSet.length(), equalTo(numBits));
|
||||
assertThat(countedBitSet.cardinality(), equalTo(numBits));
|
||||
}
|
||||
|
||||
// Tests with released internal bitset.
|
||||
final int iterations = iterations(1000, 10000);
|
||||
for (int i = 0; i < iterations; i++) {
|
||||
final int value = randomInt(numBits - 1);
|
||||
assertThat(countedBitSet.get(value), equalTo(true));
|
||||
assertThat(countedBitSet.isInternalBitsetReleased(), equalTo(true));
|
||||
assertThat(countedBitSet.length(), equalTo(numBits));
|
||||
assertThat(countedBitSet.cardinality(), equalTo(numBits));
|
||||
if (frequently()) {
|
||||
assertThat(countedBitSet.get(value), equalTo(true));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -30,7 +30,6 @@ import java.util.stream.IntStream;
|
|||
import java.util.stream.LongStream;
|
||||
|
||||
import static org.hamcrest.CoreMatchers.equalTo;
|
||||
import static org.hamcrest.Matchers.lessThanOrEqualTo;
|
||||
|
||||
public class MultiSnapshotTests extends ESTestCase {
|
||||
|
||||
|
@ -40,14 +39,8 @@ public class MultiSnapshotTests extends ESTestCase {
|
|||
Randomness.shuffle(values);
|
||||
for (int i = 0; i < 1023; i++) {
|
||||
assertThat(bitSet.getAndSet(values.get(i)), equalTo(false));
|
||||
assertThat(bitSet.ongoingSetsSize(), equalTo(1L));
|
||||
assertThat(bitSet.completeSetsSize(), equalTo(0L));
|
||||
}
|
||||
|
||||
assertThat(bitSet.getAndSet(values.get(1023)), equalTo(false));
|
||||
assertThat(bitSet.ongoingSetsSize(), equalTo(0L));
|
||||
assertThat(bitSet.completeSetsSize(), equalTo(1L));
|
||||
|
||||
assertThat(bitSet.getAndSet(between(0, 1023)), equalTo(true));
|
||||
assertThat(bitSet.getAndSet(between(1024, Integer.MAX_VALUE)), equalTo(false));
|
||||
}
|
||||
|
@ -59,7 +52,6 @@ public class MultiSnapshotTests extends ESTestCase {
|
|||
long seq = between(0, 5000);
|
||||
boolean existed = normalSet.add(seq) == false;
|
||||
assertThat("SeqNoSet != Set" + seq, bitSet.getAndSet(seq), equalTo(existed));
|
||||
assertThat(bitSet.ongoingSetsSize() + bitSet.completeSetsSize(), lessThanOrEqualTo(5L));
|
||||
});
|
||||
}
|
||||
|
||||
|
@ -78,12 +70,8 @@ public class MultiSnapshotTests extends ESTestCase {
|
|||
final LongSet normalSet = new LongHashSet();
|
||||
long currentSeq = between(10_000_000, 1_000_000_000);
|
||||
final int iterations = scaledRandomIntBetween(100, 2000);
|
||||
assertThat(bitSet.completeSetsSize(), equalTo(0L));
|
||||
assertThat(bitSet.ongoingSetsSize(), equalTo(0L));
|
||||
long totalDocs = 0;
|
||||
for (long i = 0; i < iterations; i++) {
|
||||
int batchSize = between(1, 1500);
|
||||
totalDocs += batchSize;
|
||||
currentSeq -= batchSize;
|
||||
List<Long> batch = LongStream.range(currentSeq, currentSeq + batchSize)
|
||||
.boxed()
|
||||
|
@ -92,11 +80,7 @@ public class MultiSnapshotTests extends ESTestCase {
|
|||
batch.forEach(seq -> {
|
||||
boolean existed = normalSet.add(seq) == false;
|
||||
assertThat("SeqNoSet != Set", bitSet.getAndSet(seq), equalTo(existed));
|
||||
assertThat(bitSet.ongoingSetsSize(), lessThanOrEqualTo(4L));
|
||||
});
|
||||
assertThat(bitSet.ongoingSetsSize(), lessThanOrEqualTo(2L));
|
||||
}
|
||||
assertThat(bitSet.completeSetsSize(), lessThanOrEqualTo(totalDocs / 1024));
|
||||
assertThat(bitSet.ongoingSetsSize(), lessThanOrEqualTo(2L));
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue