Throw back replica local checkpoint on new primary

This commit causes a replica to throwback its local checkpoint to the
global checkpoint when learning of a new primary through a replica
operation.

Relates #25452
This commit is contained in:
Jason Tedor 2017-07-05 09:17:16 -04:00 committed by GitHub
parent 7c637a0bfe
commit 7dcd81b41b
5 changed files with 166 additions and 20 deletions

View File

@ -121,6 +121,19 @@ public class LocalCheckpointTracker {
}
}
/**
* Resets the checkpoint to the specified value.
*
* @param checkpoint the local checkpoint to reset this tracker to
*/
synchronized void resetCheckpoint(final long checkpoint) {
assert checkpoint != SequenceNumbersService.UNASSIGNED_SEQ_NO;
assert checkpoint <= this.checkpoint;
processedSeqNo.clear();
firstProcessedSeqNo = checkpoint + 1;
this.checkpoint = checkpoint;
}
/**
* The current checkpoint which can be advanced by {@link #markSeqNoAsCompleted(long)}.
*

View File

@ -106,6 +106,15 @@ public class SequenceNumbersService extends AbstractIndexShardComponent {
localCheckpointTracker.markSeqNoAsCompleted(seqNo);
}
/**
* Resets the local checkpoint to the specified value.
*
* @param localCheckpoint the local checkpoint to reset to
*/
public void resetLocalCheckpoint(final long localCheckpoint) {
localCheckpointTracker.resetCheckpoint(localCheckpoint);
}
/**
* The current sequence number stats.
*

View File

@ -2058,6 +2058,19 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
"shard term already update. op term [" + operationPrimaryTerm + "], shardTerm [" + primaryTerm + "]";
primaryTerm = operationPrimaryTerm;
updateGlobalCheckpointOnReplica(globalCheckpoint);
final long currentGlobalCheckpoint = getGlobalCheckpoint();
final long localCheckpoint;
if (currentGlobalCheckpoint == SequenceNumbersService.UNASSIGNED_SEQ_NO) {
localCheckpoint = SequenceNumbersService.NO_OPS_PERFORMED;
} else {
localCheckpoint = currentGlobalCheckpoint;
}
logger.trace(
"detected new primary with primary term [{}], resetting local checkpoint from [{}] to [{}]",
operationPrimaryTerm,
getLocalCheckpoint(),
localCheckpoint);
getEngine().seqNoService().resetLocalCheckpoint(localCheckpoint);
getEngine().getTranslog().rollGeneration();
});
globalCheckpointUpdated = true;

View File

@ -38,6 +38,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.isOneOf;
@ -236,4 +237,23 @@ public class LocalCheckpointTrackerTests extends ESTestCase {
thread.join();
}
public void testResetCheckpoint() {
final int operations = 1024 - scaledRandomIntBetween(0, 1024);
int maxSeqNo = Math.toIntExact(SequenceNumbersService.NO_OPS_PERFORMED);
for (int i = 0; i < operations; i++) {
if (!rarely()) {
tracker.markSeqNoAsCompleted(i);
maxSeqNo = i;
}
}
final int localCheckpoint =
randomIntBetween(Math.toIntExact(SequenceNumbersService.NO_OPS_PERFORMED), Math.toIntExact(tracker.getCheckpoint()));
tracker.resetCheckpoint(localCheckpoint);
assertThat(tracker.getCheckpoint(), equalTo((long) localCheckpoint));
assertThat(tracker.getMaxSeqNo(), equalTo((long) maxSeqNo));
assertThat(tracker.processedSeqNo, empty());
assertThat(tracker.generateSeqNo(), equalTo((long) (maxSeqNo + 1)));
}
}

View File

@ -80,6 +80,7 @@ import org.elasticsearch.index.mapper.ParseContext;
import org.elasticsearch.index.mapper.ParsedDocument;
import org.elasticsearch.index.mapper.SeqNoFieldMapper;
import org.elasticsearch.index.mapper.SourceToParse;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.seqno.SequenceNumbersService;
import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus;
import org.elasticsearch.index.store.Store;
@ -142,7 +143,6 @@ import static org.hamcrest.Matchers.hasKey;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.hasToString;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.nullValue;
/**
@ -405,26 +405,10 @@ public class IndexShardTests extends IndexShardTestCase {
// most of the time this is large enough that most of the time there will be at least one gap
final int operations = 1024 - scaledRandomIntBetween(0, 1024);
int max = Math.toIntExact(SequenceNumbersService.NO_OPS_PERFORMED);
boolean gap = false;
for (int i = 0; i < operations; i++) {
if (!rarely()) {
final String id = Integer.toString(i);
SourceToParse sourceToParse = SourceToParse.source(indexShard.shardId().getIndexName(), "test", id,
new BytesArray("{}"), XContentType.JSON);
indexShard.applyIndexOperationOnReplica(i, indexShard.getPrimaryTerm(),
1, VersionType.EXTERNAL, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false, sourceToParse,
getMappingUpdater(indexShard, sourceToParse.type()));
max = i;
} else {
gap = true;
}
}
final Result result = indexOnReplicaWithGaps(indexShard, operations, Math.toIntExact(SequenceNumbersService.NO_OPS_PERFORMED));
final int maxSeqNo = max;
if (gap) {
assertThat(indexShard.getLocalCheckpoint(), not(equalTo(maxSeqNo)));
}
final int maxSeqNo = result.maxSeqNo;
final boolean gap = result.gap;
// promote the replica
final ShardRouting replicaRouting = indexShard.routingEntry();
@ -626,6 +610,12 @@ public class IndexShardTests extends IndexShardTestCase {
}
newGlobalCheckPoint = randomIntBetween((int) indexShard.getGlobalCheckpoint(), (int) localCheckPoint);
}
final long expectedLocalCheckpoint;
if (newGlobalCheckPoint == SequenceNumbersService.UNASSIGNED_SEQ_NO) {
expectedLocalCheckpoint = SequenceNumbersService.NO_OPS_PERFORMED;
} else {
expectedLocalCheckpoint = newGlobalCheckPoint;
}
// but you can not increment with a new primary term until the operations on the older primary term complete
final Thread thread = new Thread(() -> {
try {
@ -637,6 +627,7 @@ public class IndexShardTests extends IndexShardTestCase {
@Override
public void onResponse(Releasable releasable) {
assertThat(indexShard.getPrimaryTerm(), equalTo(newPrimaryTerm));
assertThat(indexShard.getLocalCheckpoint(), equalTo(expectedLocalCheckpoint));
assertThat(indexShard.getGlobalCheckpoint(), equalTo(newGlobalCheckPoint));
onResponse.set(true);
releasable.close();
@ -697,6 +688,7 @@ public class IndexShardTests extends IndexShardTestCase {
assertTrue(onResponse.get());
assertNull(onFailure.get());
assertThat(indexShard.getTranslog().getGeneration().translogFileGeneration, equalTo(translogGen + 1));
assertThat(indexShard.getLocalCheckpoint(), equalTo(expectedLocalCheckpoint));
assertThat(indexShard.getGlobalCheckpoint(), equalTo(newGlobalCheckPoint));
}
}
@ -707,6 +699,56 @@ public class IndexShardTests extends IndexShardTestCase {
closeShards(indexShard);
}
public void testThrowBackLocalCheckpointOnReplica() throws IOException, InterruptedException {
final IndexShard indexShard = newStartedShard(false);
// most of the time this is large enough that most of the time there will be at least one gap
final int operations = 1024 - scaledRandomIntBetween(0, 1024);
indexOnReplicaWithGaps(indexShard, operations, Math.toIntExact(SequenceNumbersService.NO_OPS_PERFORMED));
final long globalCheckpointOnReplica =
randomIntBetween(
Math.toIntExact(SequenceNumbersService.UNASSIGNED_SEQ_NO),
Math.toIntExact(indexShard.getLocalCheckpoint()));
indexShard.updateGlobalCheckpointOnReplica(globalCheckpointOnReplica);
final int globalCheckpoint =
randomIntBetween(
Math.toIntExact(SequenceNumbersService.UNASSIGNED_SEQ_NO),
Math.toIntExact(indexShard.getLocalCheckpoint()));
final CountDownLatch latch = new CountDownLatch(1);
indexShard.acquireReplicaOperationPermit(
indexShard.primaryTerm + 1,
globalCheckpoint,
new ActionListener<Releasable>() {
@Override
public void onResponse(final Releasable releasable) {
releasable.close();
latch.countDown();
}
@Override
public void onFailure(final Exception e) {
}
},
ThreadPool.Names.SAME);
latch.await();
if (globalCheckpointOnReplica == SequenceNumbersService.UNASSIGNED_SEQ_NO
&& globalCheckpoint == SequenceNumbersService.UNASSIGNED_SEQ_NO) {
assertThat(indexShard.getLocalCheckpoint(), equalTo(SequenceNumbersService.NO_OPS_PERFORMED));
} else {
assertThat(indexShard.getLocalCheckpoint(), equalTo(Math.max(globalCheckpoint, globalCheckpointOnReplica)));
}
// ensure that after the local checkpoint throw back and indexing again, the local checkpoint advances
final Result result = indexOnReplicaWithGaps(indexShard, operations, Math.toIntExact(indexShard.getLocalCheckpoint()));
assertThat(indexShard.getLocalCheckpoint(), equalTo((long) result.localCheckpoint));
closeShards(indexShard);
}
public void testConcurrentTermIncreaseOnReplicaShard() throws BrokenBarrierException, InterruptedException, IOException {
final IndexShard indexShard = newStartedShard(false);
@ -1966,6 +2008,55 @@ public class IndexShardTests extends IndexShardTestCase {
closeShards(newShard);
}
class Result {
private final int localCheckpoint;
private final int maxSeqNo;
private final boolean gap;
Result(final int localCheckpoint, final int maxSeqNo, final boolean gap) {
this.localCheckpoint = localCheckpoint;
this.maxSeqNo = maxSeqNo;
this.gap = gap;
}
}
/**
* Index on the specified shard while introducing sequence number gaps.
*
* @param indexShard the shard
* @param operations the number of operations
* @param offset the starting sequence number
* @return a pair of the maximum sequence number and whether or not a gap was introduced
* @throws IOException if an I/O exception occurs while indexing on the shard
*/
private Result indexOnReplicaWithGaps(
final IndexShard indexShard,
final int operations,
final int offset) throws IOException {
int localCheckpoint = offset;
int max = offset;
boolean gap = false;
for (int i = offset + 1; i < operations; i++) {
if (!rarely()) {
final String id = Integer.toString(i);
SourceToParse sourceToParse = SourceToParse.source(indexShard.shardId().getIndexName(), "test", id,
new BytesArray("{}"), XContentType.JSON);
indexShard.applyIndexOperationOnReplica(i, indexShard.getPrimaryTerm(),
1, VersionType.EXTERNAL, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false, sourceToParse,
getMappingUpdater(indexShard, sourceToParse.type()));
if (!gap && i == localCheckpoint + 1) {
localCheckpoint++;
}
max = i;
} else {
gap = true;
}
}
assert localCheckpoint == indexShard.getLocalCheckpoint();
assert !gap || (localCheckpoint != max);
return new Result(localCheckpoint, max, gap);
}
/** A dummy repository for testing which just needs restore overridden */
private abstract static class RestoreOnlyRepository extends AbstractLifecycleComponent implements Repository {
private final String indexName;