Remove gap skipping when opening engine

Today when opening the engine we skip gaps in the history, advancing the
local checkpoint until it is equal to the maximum sequence number
contained in the commit. This allows history to advance, but it leaves
gaps. A previous change filled these gaps when recovering from store,
but since we were skipping the gaps while opening the engine, this
change had no effect. This commit removes the gap skipping when opening
the engine allowing the gap filling to do its job.

Relates #24535
This commit is contained in:
Jason Tedor 2017-05-08 06:38:28 -04:00 committed by GitHub
parent 59dd4d288a
commit bbdaf113d4
4 changed files with 49 additions and 42 deletions

View File

@ -1421,7 +1421,7 @@ public abstract class Engine implements Closeable {
* @param primaryTerm the shards primary term this engine was created for * @param primaryTerm the shards primary term this engine was created for
* @return the number of no-ops added * @return the number of no-ops added
*/ */
public abstract int fillSequenceNumberHistory(long primaryTerm) throws IOException; public abstract int fillSeqNoGaps(long primaryTerm) throws IOException;
/** /**
* Performs recovery from the transaction log. * Performs recovery from the transaction log.

View File

@ -177,15 +177,6 @@ public class InternalEngine extends Engine {
logger.trace("recovered [{}]", seqNoStats); logger.trace("recovered [{}]", seqNoStats);
seqNoService = sequenceNumberService(shardId, engineConfig.getIndexSettings(), seqNoStats); seqNoService = sequenceNumberService(shardId, engineConfig.getIndexSettings(), seqNoStats);
updateMaxUnsafeAutoIdTimestampFromWriter(writer); updateMaxUnsafeAutoIdTimestampFromWriter(writer);
// norelease
/*
* We have no guarantees that all operations above the local checkpoint are in the Lucene commit or the translog. This means
* that we there might be operations greater than the local checkpoint that will not be replayed. Here we force the local
* checkpoint to the maximum sequence number in the commit (at the potential expense of correctness).
*/
while (seqNoService().getLocalCheckpoint() < seqNoService().getMaxSeqNo()) {
seqNoService().markSeqNoAsCompleted(seqNoService().getLocalCheckpoint() + 1);
}
indexWriter = writer; indexWriter = writer;
translog = openTranslog(engineConfig, writer, () -> seqNoService().getGlobalCheckpoint()); translog = openTranslog(engineConfig, writer, () -> seqNoService().getGlobalCheckpoint());
assert translog.getGeneration() != null; assert translog.getGeneration() != null;
@ -226,21 +217,20 @@ public class InternalEngine extends Engine {
} }
@Override @Override
public int fillSequenceNumberHistory(long primaryTerm) throws IOException { public int fillSeqNoGaps(long primaryTerm) throws IOException {
try (ReleasableLock lock = writeLock.acquire()) { try (ReleasableLock ignored = writeLock.acquire()) {
ensureOpen(); ensureOpen();
final long localCheckpoint = seqNoService.getLocalCheckpoint(); final long localCheckpoint = seqNoService().getLocalCheckpoint();
final long maxSeqId = seqNoService.getMaxSeqNo(); final long maxSeqNo = seqNoService().getMaxSeqNo();
int numNoOpsAdded = 0; int numNoOpsAdded = 0;
for (long seqNo = localCheckpoint + 1; seqNo <= maxSeqId; for (
// the local checkpoint might have been advanced so we are leap-frogging long seqNo = localCheckpoint + 1;
// to the next seq ID we need to process and create a noop for seqNo <= maxSeqNo;
seqNo = seqNoService.getLocalCheckpoint()+1) { seqNo = seqNoService().getLocalCheckpoint() + 1 /* the local checkpoint might have advanced so we leap-frog */) {
final NoOp noOp = new NoOp(seqNo, primaryTerm, Operation.Origin.PRIMARY, System.nanoTime(), "filling up seqNo history"); innerNoOp(new NoOp(seqNo, primaryTerm, Operation.Origin.PRIMARY, System.nanoTime(), "filling gaps"));
innerNoOp(noOp);
numNoOpsAdded++; numNoOpsAdded++;
assert seqNo <= seqNoService.getLocalCheckpoint() : "localCheckpoint didn't advanced used to be " + seqNo + " now it's on:" assert seqNo <= seqNoService().getLocalCheckpoint()
+ seqNoService.getLocalCheckpoint(); : "local checkpoint did not advance; was [" + seqNo + "], now [" + seqNoService().getLocalCheckpoint() + "]";
} }
return numNoOpsAdded; return numNoOpsAdded;

View File

@ -365,7 +365,7 @@ final class StoreRecovery {
} }
indexShard.performTranslogRecovery(indexShouldExists); indexShard.performTranslogRecovery(indexShouldExists);
assert indexShard.shardRouting.primary() : "only primary shards can recover from store"; assert indexShard.shardRouting.primary() : "only primary shards can recover from store";
indexShard.getEngine().fillSequenceNumberHistory(indexShard.getPrimaryTerm()); indexShard.getEngine().fillSeqNoGaps(indexShard.getPrimaryTerm());
} }
indexShard.finalizeRecovery(); indexShard.finalizeRecovery();
indexShard.postRecovery("post recovery from shard_store"); indexShard.postRecovery("post recovery from shard_store");

View File

@ -114,7 +114,6 @@ import org.elasticsearch.index.mapper.ParsedDocument;
import org.elasticsearch.index.mapper.RootObjectMapper; import org.elasticsearch.index.mapper.RootObjectMapper;
import org.elasticsearch.index.mapper.SeqNoFieldMapper; import org.elasticsearch.index.mapper.SeqNoFieldMapper;
import org.elasticsearch.index.mapper.SourceFieldMapper; import org.elasticsearch.index.mapper.SourceFieldMapper;
import org.elasticsearch.index.mapper.SourceToParse;
import org.elasticsearch.index.mapper.Uid; import org.elasticsearch.index.mapper.Uid;
import org.elasticsearch.index.mapper.UidFieldMapper; import org.elasticsearch.index.mapper.UidFieldMapper;
import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.seqno.SequenceNumbers;
@ -3580,6 +3579,8 @@ public class InternalEngineTests extends ESTestCase {
try (Engine recoveringEngine = try (Engine recoveringEngine =
new InternalEngine(copy(initialEngine.config(), EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG))) { new InternalEngine(copy(initialEngine.config(), EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG))) {
recoveringEngine.recoverFromTranslog();
recoveringEngine.fillSeqNoGaps(2);
assertThat(recoveringEngine.seqNoService().getLocalCheckpoint(), greaterThanOrEqualTo((long) (docs - 1))); assertThat(recoveringEngine.seqNoService().getLocalCheckpoint(), greaterThanOrEqualTo((long) (docs - 1)));
} }
} }
@ -3618,6 +3619,8 @@ public class InternalEngineTests extends ESTestCase {
try (Engine recoveringEngine = try (Engine recoveringEngine =
new InternalEngine(copy(initialEngine.config(), EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG))) { new InternalEngine(copy(initialEngine.config(), EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG))) {
recoveringEngine.recoverFromTranslog();
recoveringEngine.fillSeqNoGaps(1);
assertThat(recoveringEngine.seqNoService().getLocalCheckpoint(), greaterThanOrEqualTo((long) (3 * (docs - 1) + 2 - 1))); assertThat(recoveringEngine.seqNoService().getLocalCheckpoint(), greaterThanOrEqualTo((long) (3 * (docs - 1) + 2 - 1)));
} }
} }
@ -3719,21 +3722,35 @@ public class InternalEngineTests extends ESTestCase {
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();
} }
}; };
noOpEngine = createEngine(defaultSettings, store, primaryTranslogDir, newMergePolicy(), null, () -> seqNoService); noOpEngine = new InternalEngine(copy(engine.config(), EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG)) {
@Override
public SequenceNumbersService seqNoService() {
return seqNoService;
}
};
noOpEngine.recoverFromTranslog();
final long primaryTerm = randomNonNegativeLong(); final long primaryTerm = randomNonNegativeLong();
final int gapsFilled = noOpEngine.fillSeqNoGaps(primaryTerm);
final String reason = randomAlphaOfLength(16); final String reason = randomAlphaOfLength(16);
noOpEngine.noOp( noOpEngine.noOp(
new Engine.NoOp( new Engine.NoOp(
maxSeqNo + 1, maxSeqNo + 1,
primaryTerm, primaryTerm,
randomFrom(PRIMARY, REPLICA, PEER_RECOVERY, LOCAL_TRANSLOG_RECOVERY), randomFrom(PRIMARY, REPLICA, PEER_RECOVERY, LOCAL_TRANSLOG_RECOVERY),
System.nanoTime(), System.nanoTime(),
reason)); reason));
assertThat(noOpEngine.seqNoService().getLocalCheckpoint(), equalTo((long) (maxSeqNo + 1))); assertThat(noOpEngine.seqNoService().getLocalCheckpoint(), equalTo((long) (maxSeqNo + 1)));
assertThat(noOpEngine.getTranslog().totalOperations(), equalTo(1)); assertThat(noOpEngine.getTranslog().totalOperations(), equalTo(1 + gapsFilled));
final Translog.Operation op = noOpEngine.getTranslog().newSnapshot().next(); // skip to the op that we added to the translog
assertThat(op, instanceOf(Translog.NoOp.class)); Translog.Operation op;
final Translog.NoOp noOp = (Translog.NoOp) op; Translog.Operation last = null;
final Translog.Snapshot snapshot = noOpEngine.getTranslog().newSnapshot();
while ((op = snapshot.next()) != null) {
last = op;
}
assertNotNull(last);
assertThat(last, instanceOf(Translog.NoOp.class));
final Translog.NoOp noOp = (Translog.NoOp) last;
assertThat(noOp.seqNo(), equalTo((long) (maxSeqNo + 1))); assertThat(noOp.seqNo(), equalTo((long) (maxSeqNo + 1)));
assertThat(noOp.primaryTerm(), equalTo(primaryTerm)); assertThat(noOp.primaryTerm(), equalTo(primaryTerm));
assertThat(noOp.reason(), equalTo(reason)); assertThat(noOp.reason(), equalTo(reason));
@ -3846,7 +3863,7 @@ public class InternalEngineTests extends ESTestCase {
for (int i = 0; i < docs; i++) { for (int i = 0; i < docs; i++) {
final String docId = Integer.toString(i); final String docId = Integer.toString(i);
final ParsedDocument doc = final ParsedDocument doc =
testParsedDocument(docId, "test", null, testDocumentWithTextField(), SOURCE, null); testParsedDocument(docId, "test", null, testDocumentWithTextField(), SOURCE, null);
Engine.Index primaryResponse = indexForDoc(doc); Engine.Index primaryResponse = indexForDoc(doc);
Engine.IndexResult indexResult = engine.index(primaryResponse); Engine.IndexResult indexResult = engine.index(primaryResponse);
if (randomBoolean()) { if (randomBoolean()) {
@ -3864,8 +3881,8 @@ public class InternalEngineTests extends ESTestCase {
boolean flushed = false; boolean flushed = false;
Engine recoveringEngine = null; Engine recoveringEngine = null;
try { try {
assertEquals(docs-1, engine.seqNoService().getMaxSeqNo()); assertEquals(docs - 1, engine.seqNoService().getMaxSeqNo());
assertEquals(docs-1, engine.seqNoService().getLocalCheckpoint()); assertEquals(docs - 1, engine.seqNoService().getLocalCheckpoint());
assertEquals(maxSeqIDOnReplica, replicaEngine.seqNoService().getMaxSeqNo()); assertEquals(maxSeqIDOnReplica, replicaEngine.seqNoService().getMaxSeqNo());
assertEquals(checkpointOnReplica, replicaEngine.seqNoService().getLocalCheckpoint()); assertEquals(checkpointOnReplica, replicaEngine.seqNoService().getLocalCheckpoint());
recoveringEngine = new InternalEngine(copy(replicaEngine.config(), EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG)); recoveringEngine = new InternalEngine(copy(replicaEngine.config(), EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG));
@ -3873,13 +3890,13 @@ public class InternalEngineTests extends ESTestCase {
recoveringEngine.recoverFromTranslog(); recoveringEngine.recoverFromTranslog();
assertEquals(maxSeqIDOnReplica, recoveringEngine.seqNoService().getMaxSeqNo()); assertEquals(maxSeqIDOnReplica, recoveringEngine.seqNoService().getMaxSeqNo());
assertEquals(checkpointOnReplica, recoveringEngine.seqNoService().getLocalCheckpoint()); assertEquals(checkpointOnReplica, recoveringEngine.seqNoService().getLocalCheckpoint());
assertEquals((maxSeqIDOnReplica+1) - numDocsOnReplica, recoveringEngine.fillSequenceNumberHistory(2)); assertEquals((maxSeqIDOnReplica + 1) - numDocsOnReplica, recoveringEngine.fillSeqNoGaps(2));
// now snapshot the tlog and ensure the primary term is updated // now snapshot the tlog and ensure the primary term is updated
Translog.Snapshot snapshot = recoveringEngine.getTranslog().newSnapshot(); Translog.Snapshot snapshot = recoveringEngine.getTranslog().newSnapshot();
assertTrue((maxSeqIDOnReplica+1) - numDocsOnReplica <= snapshot.totalOperations()); assertTrue((maxSeqIDOnReplica + 1) - numDocsOnReplica <= snapshot.totalOperations());
Translog.Operation operation; Translog.Operation operation;
while((operation = snapshot.next()) != null) { while ((operation = snapshot.next()) != null) {
if (operation.opType() == Translog.Operation.Type.NO_OP) { if (operation.opType() == Translog.Operation.Type.NO_OP) {
assertEquals(2, operation.primaryTerm()); assertEquals(2, operation.primaryTerm());
} else { } else {
@ -3905,7 +3922,7 @@ public class InternalEngineTests extends ESTestCase {
recoveringEngine.recoverFromTranslog(); recoveringEngine.recoverFromTranslog();
assertEquals(maxSeqIDOnReplica, recoveringEngine.seqNoService().getMaxSeqNo()); assertEquals(maxSeqIDOnReplica, recoveringEngine.seqNoService().getMaxSeqNo());
assertEquals(maxSeqIDOnReplica, recoveringEngine.seqNoService().getLocalCheckpoint()); assertEquals(maxSeqIDOnReplica, recoveringEngine.seqNoService().getLocalCheckpoint());
assertEquals(0, recoveringEngine.fillSequenceNumberHistory(3)); assertEquals(0, recoveringEngine.fillSeqNoGaps(3));
assertEquals(maxSeqIDOnReplica, recoveringEngine.seqNoService().getMaxSeqNo()); assertEquals(maxSeqIDOnReplica, recoveringEngine.seqNoService().getMaxSeqNo());
assertEquals(maxSeqIDOnReplica, recoveringEngine.seqNoService().getLocalCheckpoint()); assertEquals(maxSeqIDOnReplica, recoveringEngine.seqNoService().getLocalCheckpoint());
} finally { } finally {