Cleanup comments in LocalCheckpointService.java

This commit cleans up the comments in LocalCheckpointService, making
them uniform in their formatting and taking advantage of the line-length
limit of 140 characters.
This commit is contained in:
Jason Tedor 2017-01-03 09:27:53 -05:00
parent 71d6a37032
commit f086d1d3db
1 changed files with 73 additions and 44 deletions

View File

@ -16,6 +16,7 @@
* specific language governing permissions and limitations * specific language governing permissions and limitations
* under the License. * under the License.
*/ */
package org.elasticsearch.index.seqno; package org.elasticsearch.index.seqno;
import org.apache.lucene.util.FixedBitSet; import org.apache.lucene.util.FixedBitSet;
@ -27,46 +28,53 @@ import org.elasticsearch.index.shard.ShardId;
import java.util.LinkedList; import java.util.LinkedList;
/** /**
* This class generates sequences numbers and keeps track of the so called local checkpoint - the highest number for which * This class generates sequences numbers and keeps track of the so called local checkpoint - the highest number for which all previous
* all previous seqNo have been processed (including) * sequence numbers have been processed (inclusive).
*/ */
public class LocalCheckpointService extends AbstractIndexShardComponent { public class LocalCheckpointService extends AbstractIndexShardComponent {
/** /**
* we keep a bit for each seq No that is still pending. to optimize allocation, we do so in multiple arrays * We keep a bit for each sequence number that is still pending. To optimize allocation, we do so in multiple arrays allocating them on
* allocating them on demand and cleaning up while completed. This setting controls the size of the arrays * demand and cleaning up while completed. This setting controls the size of the arrays.
*/ */
public static Setting<Integer> SETTINGS_BIT_ARRAYS_SIZE = Setting.intSetting("index.seq_no.checkpoint.bit_arrays_size", 1024, public static Setting<Integer> SETTINGS_BIT_ARRAYS_SIZE =
4, Setting.Property.IndexScope); Setting.intSetting("index.seq_no.checkpoint.bit_arrays_size", 1024, 4, Setting.Property.IndexScope);
/** /**
* an ordered list of bit arrays representing pending seq nos. The list is "anchored" in {@link #firstProcessedSeqNo} * An ordered list of bit arrays representing pending sequence numbers. The list is "anchored" in {@link #firstProcessedSeqNo} which
* which marks the seqNo the fist bit in the first array corresponds to. * marks the sequence number the fist bit in the first array corresponds to.
*/
final LinkedList<FixedBitSet> processedSeqNo = new LinkedList<>();
/**
* The size of each bit set representing processed sequence numbers.
*/ */
final LinkedList<FixedBitSet> processedSeqNo;
private final int bitArraysSize; private final int bitArraysSize;
/**
* The sequence number that the first bit in the first array corresponds to.
*/
long firstProcessedSeqNo; long firstProcessedSeqNo;
/** the current local checkpoint, i.e., all seqNo lower (&lt;=) than this number have been completed */ /**
* The current local checkpoint, i.e., all sequence numbers no more than this number have been completed.
*/
volatile long checkpoint; volatile long checkpoint;
/** the next available seqNo - used for seqNo generation */ /**
* The next available sequence number.
*/
private volatile long nextSeqNo; private volatile long nextSeqNo;
/** /**
* Initialize the local checkpoint service. The {@code maxSeqNo} should be * Initialize the local checkpoint service. The {@code maxSeqNo} should be set to the last sequence number assigned by this shard, or
* set to the last sequence number assigned by this shard, or * {@link SequenceNumbersService#NO_OPS_PERFORMED} and {@code localCheckpoint} should be set to the last known local checkpoint for this
* {@link SequenceNumbersService#NO_OPS_PERFORMED} and * shard, or {@link SequenceNumbersService#NO_OPS_PERFORMED}.
* {@code localCheckpoint} should be set to the last known local checkpoint
* for this shard, or {@link SequenceNumbersService#NO_OPS_PERFORMED}.
* *
* @param shardId the shard this service is providing tracking * @param shardId the shard this service is providing tracking local checkpoints for
* local checkpoints for
* @param indexSettings the index settings * @param indexSettings the index settings
* @param maxSeqNo the last sequence number assigned by this shard, or * @param maxSeqNo the last sequence number assigned by this shard, or {@link SequenceNumbersService#NO_OPS_PERFORMED}
* {@link SequenceNumbersService#NO_OPS_PERFORMED} * @param localCheckpoint the last known local checkpoint for this shard, or {@link SequenceNumbersService#NO_OPS_PERFORMED}
* @param localCheckpoint the last known local checkpoint for this shard, or
* {@link SequenceNumbersService#NO_OPS_PERFORMED}
*/ */
LocalCheckpointService(final ShardId shardId, final IndexSettings indexSettings, final long maxSeqNo, final long localCheckpoint) { LocalCheckpointService(final ShardId shardId, final IndexSettings indexSettings, final long maxSeqNo, final long localCheckpoint) {
super(shardId, indexSettings); super(shardId, indexSettings);
@ -80,52 +88,63 @@ public class LocalCheckpointService extends AbstractIndexShardComponent {
"max seq. no. must be non-negative or [" + SequenceNumbersService.NO_OPS_PERFORMED + "] but was [" + maxSeqNo + "]"); "max seq. no. must be non-negative or [" + SequenceNumbersService.NO_OPS_PERFORMED + "] but was [" + maxSeqNo + "]");
} }
bitArraysSize = SETTINGS_BIT_ARRAYS_SIZE.get(indexSettings.getSettings()); bitArraysSize = SETTINGS_BIT_ARRAYS_SIZE.get(indexSettings.getSettings());
processedSeqNo = new LinkedList<>();
firstProcessedSeqNo = localCheckpoint == SequenceNumbersService.NO_OPS_PERFORMED ? 0 : localCheckpoint + 1; firstProcessedSeqNo = localCheckpoint == SequenceNumbersService.NO_OPS_PERFORMED ? 0 : localCheckpoint + 1;
this.nextSeqNo = maxSeqNo == SequenceNumbersService.NO_OPS_PERFORMED ? 0 : maxSeqNo + 1; nextSeqNo = maxSeqNo == SequenceNumbersService.NO_OPS_PERFORMED ? 0 : maxSeqNo + 1;
this.checkpoint = localCheckpoint; checkpoint = localCheckpoint;
} }
/** /**
* issue the next sequence number * Issue the next sequence number.
**/ *
* @return the next assigned sequence number
*/
synchronized long generateSeqNo() { synchronized long generateSeqNo() {
return nextSeqNo++; return nextSeqNo++;
} }
/** /**
* marks the processing of the given seqNo have been completed * Marks the processing of the provided sequence number as completed as updates the checkpoint if possible.
**/ *
synchronized void markSeqNoAsCompleted(long seqNo) { * @param seqNo the sequence number to mark as completed
// make sure we track highest seen seqNo */
synchronized void markSeqNoAsCompleted(final long seqNo) {
// make sure we track highest seen sequence number
if (seqNo >= nextSeqNo) { if (seqNo >= nextSeqNo) {
nextSeqNo = seqNo + 1; nextSeqNo = seqNo + 1;
} }
if (seqNo <= checkpoint) { if (seqNo <= checkpoint) {
// this is possible during recovery where we might replay an op that was also replicated // this is possible during recovery where we might replay an operation that was also replicated
return; return;
} }
FixedBitSet bitSet = getBitSetForSeqNo(seqNo); final FixedBitSet bitSet = getBitSetForSeqNo(seqNo);
int offset = seqNoToBitSetOffset(seqNo); final int offset = seqNoToBitSetOffset(seqNo);
bitSet.set(offset); bitSet.set(offset);
if (seqNo == checkpoint + 1) { if (seqNo == checkpoint + 1) {
updateCheckpoint(); updateCheckpoint();
} }
} }
/** gets the current check point */ /**
* The current checkpoint which can be advanced by {@link #markSeqNoAsCompleted(long)}.
*
* @return the current checkpoint
*/
public long getCheckpoint() { public long getCheckpoint() {
return checkpoint; return checkpoint;
} }
/** gets the maximum seqno seen so far */ /**
* The maximum sequence number issued so far.
*
* @return the maximum sequence number
*/
long getMaxSeqNo() { long getMaxSeqNo() {
return nextSeqNo - 1; return nextSeqNo - 1;
} }
/** /**
* moves the checkpoint to the last consecutively processed seqNo * Moves the checkpoint to the last consecutively processed sequence number. This method assumes that the sequence number following the
* Note: this method assumes that the seqNo following the current checkpoint is processed. * current checkpoint is processed.
*/ */
private void updateCheckpoint() { private void updateCheckpoint() {
assert Thread.holdsLock(this); assert Thread.holdsLock(this);
@ -135,7 +154,7 @@ public class LocalCheckpointService extends AbstractIndexShardComponent {
"checkpoint + 1 doesn't point to the first bit set (o.w. current bit set is completed and shouldn't be there)"; "checkpoint + 1 doesn't point to the first bit set (o.w. current bit set is completed and shouldn't be there)";
assert getBitSetForSeqNo(checkpoint + 1).get(seqNoToBitSetOffset(checkpoint + 1)) : assert getBitSetForSeqNo(checkpoint + 1).get(seqNoToBitSetOffset(checkpoint + 1)) :
"updateCheckpoint is called but the bit following the checkpoint is not set"; "updateCheckpoint is called but the bit following the checkpoint is not set";
// keep it simple for now, get the checkpoint one by one. in the future we can optimize and read words // keep it simple for now, get the checkpoint one by one; in the future we can optimize and read words
FixedBitSet current = processedSeqNo.getFirst(); FixedBitSet current = processedSeqNo.getFirst();
do { do {
checkpoint++; checkpoint++;
@ -151,14 +170,18 @@ public class LocalCheckpointService extends AbstractIndexShardComponent {
} }
/** /**
* gets the bit array for the given seqNo, allocating new ones if needed. * Return the bit array for the provided sequence number, possibly allocating a new array if needed.
*
* @param seqNo the sequence number to obtain the bit array for
* @return the bit array corresponding to the provided sequence number
*/ */
private FixedBitSet getBitSetForSeqNo(long seqNo) { private FixedBitSet getBitSetForSeqNo(final long seqNo) {
assert Thread.holdsLock(this); assert Thread.holdsLock(this);
assert seqNo >= firstProcessedSeqNo : "seqNo: " + seqNo + " firstProcessedSeqNo: " + firstProcessedSeqNo; assert seqNo >= firstProcessedSeqNo : "seqNo: " + seqNo + " firstProcessedSeqNo: " + firstProcessedSeqNo;
final long bitSetOffset = (seqNo - firstProcessedSeqNo) / bitArraysSize; final long bitSetOffset = (seqNo - firstProcessedSeqNo) / bitArraysSize;
if (bitSetOffset > Integer.MAX_VALUE) { if (bitSetOffset > Integer.MAX_VALUE) {
throw new IndexOutOfBoundsException("seqNo too high. got [" + seqNo + "], firstProcessedSeqNo [" + firstProcessedSeqNo + "]"); throw new IndexOutOfBoundsException(
"sequence number too high; got [" + seqNo + "], firstProcessedSeqNo [" + firstProcessedSeqNo + "]");
} }
while (bitSetOffset >= processedSeqNo.size()) { while (bitSetOffset >= processedSeqNo.size()) {
processedSeqNo.add(new FixedBitSet(bitArraysSize)); processedSeqNo.add(new FixedBitSet(bitArraysSize));
@ -166,8 +189,14 @@ public class LocalCheckpointService extends AbstractIndexShardComponent {
return processedSeqNo.get((int)bitSetOffset); return processedSeqNo.get((int)bitSetOffset);
} }
/** maps the given seqNo to a position in the bit set returned by {@link #getBitSetForSeqNo} */ /**
private int seqNoToBitSetOffset(long seqNo) { * Obtain the position in the bit array corresponding to the provided sequence number. The bit array corresponding to the sequence
* number can be obtained via {@link #getBitSetForSeqNo(long)}.
*
* @param seqNo the sequence number to obtain the position for
* @return the position in the bit array corresponding to the provided sequence number
*/
private int seqNoToBitSetOffset(final long seqNo) {
assert Thread.holdsLock(this); assert Thread.holdsLock(this);
assert seqNo >= firstProcessedSeqNo; assert seqNo >= firstProcessedSeqNo;
return ((int) (seqNo - firstProcessedSeqNo)) % bitArraysSize; return ((int) (seqNo - firstProcessedSeqNo)) % bitArraysSize;