Remove checkpoint tracker bit sets setting

We added an index-level setting for controlling the size of the bit sets
used to back the local checkpoint tracker. This setting is really only
needed to control the memory footprint of the bit sets but we do not
think this setting is going to be needed. This commit removes this
setting before it is released to the wild after which we would have to
worry about BWC implications.

Relates #27191
This commit is contained in:
Jason Tedor 2017-11-01 21:13:01 -04:00 committed by GitHub
parent ac9addd454
commit 90d6317437
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 18 additions and 39 deletions

View File

@ -120,7 +120,6 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
IndexSettings.QUERY_STRING_LENIENT_SETTING, IndexSettings.QUERY_STRING_LENIENT_SETTING,
IndexSettings.ALLOW_UNMAPPED, IndexSettings.ALLOW_UNMAPPED,
IndexSettings.INDEX_CHECK_ON_STARTUP, IndexSettings.INDEX_CHECK_ON_STARTUP,
LocalCheckpointTracker.SETTINGS_BIT_ARRAYS_SIZE,
IndexSettings.MAX_REFRESH_LISTENERS_PER_SHARD, IndexSettings.MAX_REFRESH_LISTENERS_PER_SHARD,
IndexSettings.MAX_SLICES_PER_SCROLL, IndexSettings.MAX_SLICES_PER_SCROLL,
ShardsLimitAllocationDecider.INDEX_TOTAL_SHARDS_PER_NODE_SETTING, ShardsLimitAllocationDecider.INDEX_TOTAL_SHARDS_PER_NODE_SETTING,

View File

@ -34,10 +34,9 @@ public class LocalCheckpointTracker {
/** /**
* We keep a bit for each sequence number that is still pending. To optimize allocation, we do so in multiple arrays allocating them on * We keep a bit for each sequence number that is still pending. To optimize allocation, we do so in multiple arrays allocating them on
* demand and cleaning up while completed. This setting controls the size of the arrays. * demand and cleaning up while completed. This constant controls the size of the arrays.
*/ */
public static Setting<Integer> SETTINGS_BIT_ARRAYS_SIZE = static final int BIT_ARRAYS_SIZE = 1024;
Setting.intSetting("index.seq_no.checkpoint.bit_arrays_size", 1024, 4, Setting.Property.IndexScope);
/** /**
* An ordered list of bit arrays representing pending sequence numbers. The list is "anchored" in {@link #firstProcessedSeqNo} which * An ordered list of bit arrays representing pending sequence numbers. The list is "anchored" in {@link #firstProcessedSeqNo} which
@ -45,11 +44,6 @@ public class LocalCheckpointTracker {
*/ */
final LinkedList<FixedBitSet> processedSeqNo = new LinkedList<>(); final LinkedList<FixedBitSet> processedSeqNo = new LinkedList<>();
/**
* The size of each bit set representing processed sequence numbers.
*/
private final int bitArraysSize;
/** /**
* The sequence number that the first bit in the first array corresponds to. * The sequence number that the first bit in the first array corresponds to.
*/ */
@ -70,11 +64,10 @@ public class LocalCheckpointTracker {
* {@link SequenceNumbers#NO_OPS_PERFORMED} and {@code localCheckpoint} should be set to the last known local checkpoint, * {@link SequenceNumbers#NO_OPS_PERFORMED} and {@code localCheckpoint} should be set to the last known local checkpoint,
* or {@link SequenceNumbers#NO_OPS_PERFORMED}. * or {@link SequenceNumbers#NO_OPS_PERFORMED}.
* *
* @param indexSettings the index settings
* @param maxSeqNo the last sequence number assigned, or {@link SequenceNumbers#NO_OPS_PERFORMED} * @param maxSeqNo the last sequence number assigned, or {@link SequenceNumbers#NO_OPS_PERFORMED}
* @param localCheckpoint the last known local checkpoint, or {@link SequenceNumbers#NO_OPS_PERFORMED} * @param localCheckpoint the last known local checkpoint, or {@link SequenceNumbers#NO_OPS_PERFORMED}
*/ */
public LocalCheckpointTracker(final IndexSettings indexSettings, final long maxSeqNo, final long localCheckpoint) { public LocalCheckpointTracker(final long maxSeqNo, final long localCheckpoint) {
if (localCheckpoint < 0 && localCheckpoint != SequenceNumbers.NO_OPS_PERFORMED) { if (localCheckpoint < 0 && localCheckpoint != SequenceNumbers.NO_OPS_PERFORMED) {
throw new IllegalArgumentException( throw new IllegalArgumentException(
"local checkpoint must be non-negative or [" + SequenceNumbers.NO_OPS_PERFORMED + "] " "local checkpoint must be non-negative or [" + SequenceNumbers.NO_OPS_PERFORMED + "] "
@ -84,7 +77,6 @@ public class LocalCheckpointTracker {
throw new IllegalArgumentException( throw new IllegalArgumentException(
"max seq. no. must be non-negative or [" + SequenceNumbers.NO_OPS_PERFORMED + "] but was [" + maxSeqNo + "]"); "max seq. no. must be non-negative or [" + SequenceNumbers.NO_OPS_PERFORMED + "] but was [" + maxSeqNo + "]");
} }
bitArraysSize = SETTINGS_BIT_ARRAYS_SIZE.get(indexSettings.getSettings());
firstProcessedSeqNo = localCheckpoint == SequenceNumbers.NO_OPS_PERFORMED ? 0 : localCheckpoint + 1; firstProcessedSeqNo = localCheckpoint == SequenceNumbers.NO_OPS_PERFORMED ? 0 : localCheckpoint + 1;
nextSeqNo = maxSeqNo == SequenceNumbers.NO_OPS_PERFORMED ? 0 : maxSeqNo + 1; nextSeqNo = maxSeqNo == SequenceNumbers.NO_OPS_PERFORMED ? 0 : maxSeqNo + 1;
checkpoint = localCheckpoint; checkpoint = localCheckpoint;
@ -183,7 +175,7 @@ public class LocalCheckpointTracker {
@SuppressForbidden(reason = "Object#notifyAll") @SuppressForbidden(reason = "Object#notifyAll")
private void updateCheckpoint() { private void updateCheckpoint() {
assert Thread.holdsLock(this); assert Thread.holdsLock(this);
assert checkpoint < firstProcessedSeqNo + bitArraysSize - 1 : assert checkpoint < firstProcessedSeqNo + BIT_ARRAYS_SIZE - 1 :
"checkpoint should be below the end of the first bit set (o.w. current bit set is completed and shouldn't be there)"; "checkpoint should be below the end of the first bit set (o.w. current bit set is completed and shouldn't be there)";
assert getBitSetForSeqNo(checkpoint + 1) == processedSeqNo.getFirst() : assert getBitSetForSeqNo(checkpoint + 1) == processedSeqNo.getFirst() :
"checkpoint + 1 doesn't point to the first bit set (o.w. current bit set is completed and shouldn't be there)"; "checkpoint + 1 doesn't point to the first bit set (o.w. current bit set is completed and shouldn't be there)";
@ -196,10 +188,10 @@ public class LocalCheckpointTracker {
checkpoint++; checkpoint++;
// the checkpoint always falls in the first bit set or just before. If it falls // the checkpoint always falls in the first bit set or just before. If it falls
// on the last bit of the current bit set, we can clean it. // on the last bit of the current bit set, we can clean it.
if (checkpoint == firstProcessedSeqNo + bitArraysSize - 1) { if (checkpoint == firstProcessedSeqNo + BIT_ARRAYS_SIZE - 1) {
processedSeqNo.removeFirst(); processedSeqNo.removeFirst();
firstProcessedSeqNo += bitArraysSize; firstProcessedSeqNo += BIT_ARRAYS_SIZE;
assert checkpoint - firstProcessedSeqNo < bitArraysSize; assert checkpoint - firstProcessedSeqNo < BIT_ARRAYS_SIZE;
current = processedSeqNo.peekFirst(); current = processedSeqNo.peekFirst();
} }
} while (current != null && current.get(seqNoToBitSetOffset(checkpoint + 1))); } while (current != null && current.get(seqNoToBitSetOffset(checkpoint + 1)));
@ -218,13 +210,13 @@ public class LocalCheckpointTracker {
private FixedBitSet getBitSetForSeqNo(final long seqNo) { private FixedBitSet getBitSetForSeqNo(final long seqNo) {
assert Thread.holdsLock(this); assert Thread.holdsLock(this);
assert seqNo >= firstProcessedSeqNo : "seqNo: " + seqNo + " firstProcessedSeqNo: " + firstProcessedSeqNo; assert seqNo >= firstProcessedSeqNo : "seqNo: " + seqNo + " firstProcessedSeqNo: " + firstProcessedSeqNo;
final long bitSetOffset = (seqNo - firstProcessedSeqNo) / bitArraysSize; final long bitSetOffset = (seqNo - firstProcessedSeqNo) / BIT_ARRAYS_SIZE;
if (bitSetOffset > Integer.MAX_VALUE) { if (bitSetOffset > Integer.MAX_VALUE) {
throw new IndexOutOfBoundsException( throw new IndexOutOfBoundsException(
"sequence number too high; got [" + seqNo + "], firstProcessedSeqNo [" + firstProcessedSeqNo + "]"); "sequence number too high; got [" + seqNo + "], firstProcessedSeqNo [" + firstProcessedSeqNo + "]");
} }
while (bitSetOffset >= processedSeqNo.size()) { while (bitSetOffset >= processedSeqNo.size()) {
processedSeqNo.add(new FixedBitSet(bitArraysSize)); processedSeqNo.add(new FixedBitSet(BIT_ARRAYS_SIZE));
} }
return processedSeqNo.get((int) bitSetOffset); return processedSeqNo.get((int) bitSetOffset);
} }
@ -239,7 +231,7 @@ public class LocalCheckpointTracker {
private int seqNoToBitSetOffset(final long seqNo) { private int seqNoToBitSetOffset(final long seqNo) {
assert Thread.holdsLock(this); assert Thread.holdsLock(this);
assert seqNo >= firstProcessedSeqNo; assert seqNo >= firstProcessedSeqNo;
return ((int) (seqNo - firstProcessedSeqNo)) % bitArraysSize; return ((int) (seqNo - firstProcessedSeqNo)) % BIT_ARRAYS_SIZE;
} }
} }

View File

@ -56,7 +56,7 @@ public class SequenceNumbersService extends AbstractIndexShardComponent {
final long localCheckpoint, final long localCheckpoint,
final long globalCheckpoint) { final long globalCheckpoint) {
super(shardId, indexSettings); super(shardId, indexSettings);
localCheckpointTracker = new LocalCheckpointTracker(indexSettings, maxSeqNo, localCheckpoint); localCheckpointTracker = new LocalCheckpointTracker(maxSeqNo, localCheckpoint);
globalCheckpointTracker = new GlobalCheckpointTracker(shardId, allocationId, indexSettings, globalCheckpoint); globalCheckpointTracker = new GlobalCheckpointTracker(shardId, allocationId, indexSettings, globalCheckpoint);
} }

View File

@ -243,7 +243,7 @@ public class RecoverySourceHandler {
logger.trace("all operations up to [{}] completed, checking translog content", endingSeqNo); logger.trace("all operations up to [{}] completed, checking translog content", endingSeqNo);
final LocalCheckpointTracker tracker = new LocalCheckpointTracker(shard.indexSettings(), startingSeqNo, startingSeqNo - 1); final LocalCheckpointTracker tracker = new LocalCheckpointTracker(startingSeqNo, startingSeqNo - 1);
try (Translog.Snapshot snapshot = shard.getTranslog().newSnapshotFromMinSeqNo(startingSeqNo)) { try (Translog.Snapshot snapshot = shard.getTranslog().newSnapshotFromMinSeqNo(startingSeqNo)) {
Translog.Operation operation; Translog.Operation operation;
while ((operation = snapshot.next()) != null) { while ((operation = snapshot.next()) != null) {

View File

@ -21,10 +21,8 @@ package org.elasticsearch.index.seqno;
import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.Randomness; import org.elasticsearch.common.Randomness;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.IndexSettingsModule;
import org.junit.Before; import org.junit.Before;
import java.util.ArrayList; import java.util.ArrayList;
@ -38,6 +36,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import java.util.stream.IntStream; import java.util.stream.IntStream;
import static org.elasticsearch.index.seqno.LocalCheckpointTracker.BIT_ARRAYS_SIZE;
import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.isOneOf; import static org.hamcrest.Matchers.isOneOf;
@ -46,19 +45,8 @@ public class LocalCheckpointTrackerTests extends ESTestCase {
private LocalCheckpointTracker tracker; private LocalCheckpointTracker tracker;
private static final int SMALL_CHUNK_SIZE = 4;
public static LocalCheckpointTracker createEmptyTracker() { public static LocalCheckpointTracker createEmptyTracker() {
return new LocalCheckpointTracker( return new LocalCheckpointTracker(SequenceNumbers.NO_OPS_PERFORMED, SequenceNumbers.NO_OPS_PERFORMED);
IndexSettingsModule.newIndexSettings(
"test",
Settings
.builder()
.put(LocalCheckpointTracker.SETTINGS_BIT_ARRAYS_SIZE.getKey(), SMALL_CHUNK_SIZE)
.build()),
SequenceNumbers.NO_OPS_PERFORMED,
SequenceNumbers.NO_OPS_PERFORMED
);
} }
@Override @Override
@ -98,7 +86,7 @@ public class LocalCheckpointTrackerTests extends ESTestCase {
public void testSimpleOverFlow() { public void testSimpleOverFlow() {
List<Integer> seqNoList = new ArrayList<>(); List<Integer> seqNoList = new ArrayList<>();
final boolean aligned = randomBoolean(); final boolean aligned = randomBoolean();
final int maxOps = SMALL_CHUNK_SIZE * randomIntBetween(1, 5) + (aligned ? 0 : randomIntBetween(1, SMALL_CHUNK_SIZE - 1)); final int maxOps = BIT_ARRAYS_SIZE * randomIntBetween(1, 5) + (aligned ? 0 : randomIntBetween(1, BIT_ARRAYS_SIZE - 1));
for (int i = 0; i < maxOps; i++) { for (int i = 0; i < maxOps; i++) {
seqNoList.add(i); seqNoList.add(i);
@ -109,7 +97,7 @@ public class LocalCheckpointTrackerTests extends ESTestCase {
} }
assertThat(tracker.checkpoint, equalTo(maxOps - 1L)); assertThat(tracker.checkpoint, equalTo(maxOps - 1L));
assertThat(tracker.processedSeqNo.size(), equalTo(aligned ? 0 : 1)); assertThat(tracker.processedSeqNo.size(), equalTo(aligned ? 0 : 1));
assertThat(tracker.firstProcessedSeqNo, equalTo(((long) maxOps / SMALL_CHUNK_SIZE) * SMALL_CHUNK_SIZE)); assertThat(tracker.firstProcessedSeqNo, equalTo(((long) maxOps / BIT_ARRAYS_SIZE) * BIT_ARRAYS_SIZE));
} }
public void testConcurrentPrimary() throws InterruptedException { public void testConcurrentPrimary() throws InterruptedException {
@ -150,7 +138,7 @@ public class LocalCheckpointTrackerTests extends ESTestCase {
tracker.markSeqNoAsCompleted(unFinishedSeq); tracker.markSeqNoAsCompleted(unFinishedSeq);
assertThat(tracker.getCheckpoint(), equalTo(maxOps - 1L)); assertThat(tracker.getCheckpoint(), equalTo(maxOps - 1L));
assertThat(tracker.processedSeqNo.size(), isOneOf(0, 1)); assertThat(tracker.processedSeqNo.size(), isOneOf(0, 1));
assertThat(tracker.firstProcessedSeqNo, equalTo(((long) maxOps / SMALL_CHUNK_SIZE) * SMALL_CHUNK_SIZE)); assertThat(tracker.firstProcessedSeqNo, equalTo(((long) maxOps / BIT_ARRAYS_SIZE) * BIT_ARRAYS_SIZE));
} }
public void testConcurrentReplica() throws InterruptedException { public void testConcurrentReplica() throws InterruptedException {
@ -198,7 +186,7 @@ public class LocalCheckpointTrackerTests extends ESTestCase {
assertThat(tracker.getCheckpoint(), equalTo(unFinishedSeq - 1L)); assertThat(tracker.getCheckpoint(), equalTo(unFinishedSeq - 1L));
tracker.markSeqNoAsCompleted(unFinishedSeq); tracker.markSeqNoAsCompleted(unFinishedSeq);
assertThat(tracker.getCheckpoint(), equalTo(maxOps - 1L)); assertThat(tracker.getCheckpoint(), equalTo(maxOps - 1L));
assertThat(tracker.firstProcessedSeqNo, equalTo(((long) maxOps / SMALL_CHUNK_SIZE) * SMALL_CHUNK_SIZE)); assertThat(tracker.firstProcessedSeqNo, equalTo(((long) maxOps / BIT_ARRAYS_SIZE) * BIT_ARRAYS_SIZE));
} }
public void testWaitForOpsToComplete() throws BrokenBarrierException, InterruptedException { public void testWaitForOpsToComplete() throws BrokenBarrierException, InterruptedException {