Allow engine to recover from translog upto a seqno (#33032)

This change allows an engine to recover from its local translog up to
the given seqno. The extended API can be used in these use cases:

When a replica starts following a new primary, it resets its index to
the safe commit, then replays its local translog up to the current
global checkpoint (see #32867).

When a replica starts a peer-recovery, it can initialize the
start_sequence_number to the persisted global checkpoint instead of the
local checkpoint of the safe commit. A replica will then replay its
local translog up to that global checkpoint before accepting remote
translog from the primary. This change will increase the chance of
operation-based recovery. I will make this in a follow-up.

Relates #32867
This commit is contained in:
Nhat Nguyen 2018-08-22 07:57:44 -04:00 committed by GitHub
parent b02150a5ed
commit 262d3c0783
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 217 additions and 41 deletions

View File

@ -1623,10 +1623,12 @@ public abstract class Engine implements Closeable {
public abstract int fillSeqNoGaps(long primaryTerm) throws IOException;
/**
* Performs recovery from the transaction log.
* Performs recovery from the transaction log up to {@code recoverUpToSeqNo} (inclusive).
* This operation will close the engine if the recovery fails.
*
* @param recoverUpToSeqNo the upper bound, inclusive, of sequence number to be recovered
*/
public abstract Engine recoverFromTranslog() throws IOException;
public abstract Engine recoverFromTranslog(long recoverUpToSeqNo) throws IOException;
/**
* Do not replay translog operations, but make the engine be ready.

View File

@ -364,7 +364,7 @@ public class InternalEngine extends Engine {
}
@Override
public InternalEngine recoverFromTranslog() throws IOException {
public InternalEngine recoverFromTranslog(long recoverUpToSeqNo) throws IOException {
flushLock.lock();
try (ReleasableLock lock = readLock.acquire()) {
ensureOpen();
@ -372,7 +372,7 @@ public class InternalEngine extends Engine {
throw new IllegalStateException("Engine has already been recovered");
}
try {
recoverFromTranslogInternal();
recoverFromTranslogInternal(recoverUpToSeqNo);
} catch (Exception e) {
try {
pendingTranslogRecovery.set(true); // just play safe and never allow commits on this see #ensureCanFlush
@ -394,11 +394,12 @@ public class InternalEngine extends Engine {
pendingTranslogRecovery.set(false); // we are good - now we can commit
}
private void recoverFromTranslogInternal() throws IOException {
private void recoverFromTranslogInternal(long recoverUpToSeqNo) throws IOException {
Translog.TranslogGeneration translogGeneration = translog.getGeneration();
final int opsRecovered;
final long translogGen = Long.parseLong(lastCommittedSegmentInfos.getUserData().get(Translog.TRANSLOG_GENERATION_KEY));
try (Translog.Snapshot snapshot = translog.newSnapshotFromGen(translogGen)) {
final long translogFileGen = Long.parseLong(lastCommittedSegmentInfos.getUserData().get(Translog.TRANSLOG_GENERATION_KEY));
try (Translog.Snapshot snapshot = translog.newSnapshotFromGen(
new Translog.TranslogGeneration(translog.getTranslogUUID(), translogFileGen), recoverUpToSeqNo)) {
opsRecovered = config().getTranslogRecoveryRunner().run(this, snapshot);
} catch (Exception e) {
throw new EngineException(shardId, "failed to recover from translog", e);

View File

@ -1305,7 +1305,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
**/
public void openEngineAndRecoverFromTranslog() throws IOException {
innerOpenEngineAndTranslog();
getEngine().recoverFromTranslog();
getEngine().recoverFromTranslog(Long.MAX_VALUE);
}
/**

View File

@ -577,21 +577,27 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
*/
public Snapshot newSnapshot() throws IOException {
try (ReleasableLock ignored = readLock.acquire()) {
return newSnapshotFromGen(getMinFileGeneration());
return newSnapshotFromGen(new TranslogGeneration(translogUUID, getMinFileGeneration()), Long.MAX_VALUE);
}
}
public Snapshot newSnapshotFromGen(long minGeneration) throws IOException {
public Snapshot newSnapshotFromGen(TranslogGeneration fromGeneration, long upToSeqNo) throws IOException {
try (ReleasableLock ignored = readLock.acquire()) {
ensureOpen();
if (minGeneration < getMinFileGeneration()) {
throw new IllegalArgumentException("requested snapshot generation [" + minGeneration + "] is not available. " +
final long fromFileGen = fromGeneration.translogFileGeneration;
if (fromFileGen < getMinFileGeneration()) {
throw new IllegalArgumentException("requested snapshot generation [" + fromFileGen + "] is not available. " +
"Min referenced generation is [" + getMinFileGeneration() + "]");
}
TranslogSnapshot[] snapshots = Stream.concat(readers.stream(), Stream.of(current))
.filter(reader -> reader.getGeneration() >= minGeneration)
.filter(reader -> reader.getGeneration() >= fromFileGen && reader.getCheckpoint().minSeqNo <= upToSeqNo)
.map(BaseTranslogReader::newSnapshot).toArray(TranslogSnapshot[]::new);
return newMultiSnapshot(snapshots);
final Snapshot snapshot = newMultiSnapshot(snapshots);
if (upToSeqNo == Long.MAX_VALUE) {
return snapshot;
} else {
return new SeqNoFilterSnapshot(snapshot, Long.MIN_VALUE, upToSeqNo);
}
}
}
@ -926,7 +932,59 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
* Returns the next operation in the snapshot or <code>null</code> if we reached the end.
*/
Translog.Operation next() throws IOException;
}
/**
* A filtered snapshot consisting of only operations whose sequence numbers are in the given range
* between {@code fromSeqNo} (inclusive) and {@code toSeqNo} (inclusive). This filtered snapshot
* shares the same underlying resources with the {@code delegate} snapshot, therefore we should not
* use the {@code delegate} after passing it to this filtered snapshot.
*/
static final class SeqNoFilterSnapshot implements Snapshot {
private final Snapshot delegate;
private int filteredOpsCount;
private final long fromSeqNo; // inclusive
private final long toSeqNo; // inclusive
SeqNoFilterSnapshot(Snapshot delegate, long fromSeqNo, long toSeqNo) {
assert fromSeqNo <= toSeqNo : "from_seq_no[" + fromSeqNo + "] > to_seq_no[" + toSeqNo + "]";
this.delegate = delegate;
this.fromSeqNo = fromSeqNo;
this.toSeqNo = toSeqNo;
}
@Override
public int totalOperations() {
return delegate.totalOperations();
}
@Override
public int skippedOperations() {
return filteredOpsCount + delegate.skippedOperations();
}
@Override
public int overriddenOperations() {
return delegate.overriddenOperations();
}
@Override
public Operation next() throws IOException {
Translog.Operation op;
while ((op = delegate.next()) != null) {
if (fromSeqNo <= op.seqNo() && op.seqNo() <= toSeqNo) {
return op;
} else {
filteredOpsCount++;
}
}
return null;
}
@Override
public void close() throws IOException {
delegate.close();
}
}
/**

View File

@ -649,7 +649,7 @@ public class InternalEngineTests extends EngineTestCase {
trimUnsafeCommits(engine.config());
engine = new InternalEngine(engine.config());
assertTrue(engine.isRecovering());
engine.recoverFromTranslog();
engine.recoverFromTranslog(Long.MAX_VALUE);
Engine.Searcher searcher = wrapper.wrap(engine.acquireSearcher("test"));
assertThat(counter.get(), equalTo(2));
searcher.close();
@ -666,7 +666,7 @@ public class InternalEngineTests extends EngineTestCase {
engine = new InternalEngine(engine.config());
expectThrows(IllegalStateException.class, () -> engine.flush(true, true));
assertTrue(engine.isRecovering());
engine.recoverFromTranslog();
engine.recoverFromTranslog(Long.MAX_VALUE);
assertFalse(engine.isRecovering());
doc = testParsedDocument("2", null, testDocumentWithTextField(), SOURCE, null);
engine.index(indexForDoc(doc));
@ -696,7 +696,7 @@ public class InternalEngineTests extends EngineTestCase {
}
trimUnsafeCommits(engine.config());
try (Engine recoveringEngine = new InternalEngine(engine.config())){
recoveringEngine.recoverFromTranslog();
recoveringEngine.recoverFromTranslog(Long.MAX_VALUE);
try (Engine.Searcher searcher = recoveringEngine.acquireSearcher("test")) {
final TotalHitCountCollector collector = new TotalHitCountCollector();
searcher.searcher().search(new MatchAllDocsQuery(), collector);
@ -732,7 +732,7 @@ public class InternalEngineTests extends EngineTestCase {
}
};
assertThat(getTranslog(recoveringEngine).stats().getUncommittedOperations(), equalTo(docs));
recoveringEngine.recoverFromTranslog();
recoveringEngine.recoverFromTranslog(Long.MAX_VALUE);
assertTrue(committed.get());
} finally {
IOUtils.close(recoveringEngine);
@ -766,7 +766,7 @@ public class InternalEngineTests extends EngineTestCase {
initialEngine.close();
trimUnsafeCommits(initialEngine.config());
recoveringEngine = new InternalEngine(initialEngine.config());
recoveringEngine.recoverFromTranslog();
recoveringEngine.recoverFromTranslog(Long.MAX_VALUE);
try (Engine.Searcher searcher = recoveringEngine.acquireSearcher("test")) {
TopDocs topDocs = searcher.searcher().search(new MatchAllDocsQuery(), docs);
assertEquals(docs, topDocs.totalHits);
@ -776,6 +776,43 @@ public class InternalEngineTests extends EngineTestCase {
}
}
public void testRecoveryFromTranslogUpToSeqNo() throws IOException {
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
try (Store store = createStore()) {
EngineConfig config = config(defaultSettings, store, createTempDir(), newMergePolicy(), null, null, globalCheckpoint::get);
final long maxSeqNo;
try (InternalEngine engine = createEngine(config)) {
final int docs = randomIntBetween(1, 100);
for (int i = 0; i < docs; i++) {
final String id = Integer.toString(i);
final ParsedDocument doc = testParsedDocument(id, null, testDocumentWithTextField(), SOURCE, null);
engine.index(indexForDoc(doc));
if (rarely()) {
engine.rollTranslogGeneration();
} else if (rarely()) {
engine.flush(randomBoolean(), true);
}
}
maxSeqNo = engine.getLocalCheckpointTracker().getMaxSeqNo();
globalCheckpoint.set(randomLongBetween(globalCheckpoint.get(), engine.getLocalCheckpoint()));
engine.syncTranslog();
}
trimUnsafeCommits(config);
try (InternalEngine engine = new InternalEngine(config)) {
engine.recoverFromTranslog(Long.MAX_VALUE);
assertThat(engine.getLocalCheckpoint(), equalTo(maxSeqNo));
assertThat(engine.getLocalCheckpointTracker().getMaxSeqNo(), equalTo(maxSeqNo));
}
trimUnsafeCommits(config);
try (InternalEngine engine = new InternalEngine(config)) {
long upToSeqNo = randomLongBetween(globalCheckpoint.get(), maxSeqNo);
engine.recoverFromTranslog(upToSeqNo);
assertThat(engine.getLocalCheckpoint(), equalTo(upToSeqNo));
assertThat(engine.getLocalCheckpointTracker().getMaxSeqNo(), equalTo(upToSeqNo));
}
}
}
public void testConcurrentGetAndFlush() throws Exception {
ParsedDocument doc = testParsedDocument("1", null, testDocumentWithTextField(), B_1, null);
engine.index(indexForDoc(doc));
@ -1153,7 +1190,7 @@ public class InternalEngineTests extends EngineTestCase {
}
trimUnsafeCommits(config);
engine = new InternalEngine(config);
engine.recoverFromTranslog();
engine.recoverFromTranslog(Long.MAX_VALUE);
assertEquals(engine.getLastCommittedSegmentInfos().getUserData().get(Engine.SYNC_COMMIT_ID), syncId);
}
@ -1172,7 +1209,7 @@ public class InternalEngineTests extends EngineTestCase {
engine.close();
trimUnsafeCommits(config);
engine = new InternalEngine(config);
engine.recoverFromTranslog();
engine.recoverFromTranslog(Long.MAX_VALUE);
assertNull("Sync ID must be gone since we have a document to replay", engine.getLastCommittedSegmentInfos().getUserData().get(Engine.SYNC_COMMIT_ID));
}
@ -2126,7 +2163,7 @@ public class InternalEngineTests extends EngineTestCase {
trimUnsafeCommits(initialEngine.engineConfig);
try (InternalEngine recoveringEngine = new InternalEngine(initialEngine.config())){
recoveringEngine.recoverFromTranslog();
recoveringEngine.recoverFromTranslog(Long.MAX_VALUE);
assertEquals(primarySeqNo, recoveringEngine.getSeqNoStats(-1).getMaxSeqNo());
assertThat(
@ -2447,7 +2484,7 @@ public class InternalEngineTests extends EngineTestCase {
try (InternalEngine engine = createEngine(config)) {
engine.index(firstIndexRequest);
globalCheckpoint.set(engine.getLocalCheckpoint());
expectThrows(IllegalStateException.class, () -> engine.recoverFromTranslog());
expectThrows(IllegalStateException.class, () -> engine.recoverFromTranslog(Long.MAX_VALUE));
Map<String, String> userData = engine.getLastCommittedSegmentInfos().getUserData();
assertEquals("1", userData.get(Translog.TRANSLOG_GENERATION_KEY));
assertEquals(engine.getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY));
@ -2469,7 +2506,7 @@ public class InternalEngineTests extends EngineTestCase {
assertEquals("3", userData.get(Translog.TRANSLOG_GENERATION_KEY));
}
assertEquals(engine.getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY));
engine.recoverFromTranslog();
engine.recoverFromTranslog(Long.MAX_VALUE);
userData = engine.getLastCommittedSegmentInfos().getUserData();
assertEquals("3", userData.get(Translog.TRANSLOG_GENERATION_KEY));
assertEquals(engine.getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY));
@ -2486,7 +2523,7 @@ public class InternalEngineTests extends EngineTestCase {
Map<String, String> userData = engine.getLastCommittedSegmentInfos().getUserData();
assertEquals("1", userData.get(Translog.TRANSLOG_GENERATION_KEY));
assertEquals(engine.getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY));
engine.recoverFromTranslog();
engine.recoverFromTranslog(Long.MAX_VALUE);
assertEquals(2, engine.getTranslog().currentFileGeneration());
assertEquals(0L, engine.getTranslog().stats().getUncommittedOperations());
}
@ -2500,7 +2537,7 @@ public class InternalEngineTests extends EngineTestCase {
Map<String, String> userData = engine.getLastCommittedSegmentInfos().getUserData();
assertEquals("1", userData.get(Translog.TRANSLOG_GENERATION_KEY));
assertEquals(engine.getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY));
engine.recoverFromTranslog();
engine.recoverFromTranslog(Long.MAX_VALUE);
userData = engine.getLastCommittedSegmentInfos().getUserData();
assertEquals("no changes - nothing to commit", "1", userData.get(Translog.TRANSLOG_GENERATION_KEY));
assertEquals(engine.getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY));
@ -2606,7 +2643,7 @@ public class InternalEngineTests extends EngineTestCase {
}
}
}) {
engine.recoverFromTranslog();
engine.recoverFromTranslog(Long.MAX_VALUE);
final ParsedDocument doc1 = testParsedDocument("1", null, testDocumentWithTextField(), SOURCE, null);
engine.index(indexForDoc(doc1));
globalCheckpoint.set(engine.getLocalCheckpoint());
@ -2617,7 +2654,7 @@ public class InternalEngineTests extends EngineTestCase {
try (InternalEngine engine =
new InternalEngine(config(indexSettings, store, translogPath, newMergePolicy(), null, null,
globalCheckpointSupplier))) {
engine.recoverFromTranslog();
engine.recoverFromTranslog(Long.MAX_VALUE);
assertVisibleCount(engine, 1);
final long committedGen = Long.valueOf(
engine.getLastCommittedSegmentInfos().getUserData().get(Translog.TRANSLOG_GENERATION_KEY));
@ -2683,7 +2720,7 @@ public class InternalEngineTests extends EngineTestCase {
engine.close();
trimUnsafeCommits(copy(engine.config(), inSyncGlobalCheckpointSupplier));
engine = new InternalEngine(copy(engine.config(), inSyncGlobalCheckpointSupplier)); // we need to reuse the engine config unless the parser.mappingModified won't work
engine.recoverFromTranslog();
engine.recoverFromTranslog(Long.MAX_VALUE);
assertVisibleCount(engine, numDocs, false);
parser = (TranslogHandler) engine.config().getTranslogRecoveryRunner();
@ -3384,7 +3421,7 @@ public class InternalEngineTests extends EngineTestCase {
}
try (Store store = createStore(newFSDirectory(storeDir)); Engine engine = new InternalEngine(configSupplier.apply(store))) {
assertEquals(IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, engine.segmentsStats(false).getMaxUnsafeAutoIdTimestamp());
engine.recoverFromTranslog();
engine.recoverFromTranslog(Long.MAX_VALUE);
assertEquals(timestamp1, engine.segmentsStats(false).getMaxUnsafeAutoIdTimestamp());
final ParsedDocument doc = testParsedDocument("1", null, testDocumentWithTextField(),
new BytesArray("{}".getBytes(Charset.defaultCharset())), null);
@ -3667,7 +3704,7 @@ public class InternalEngineTests extends EngineTestCase {
}
trimUnsafeCommits(initialEngine.config());
try (Engine recoveringEngine = new InternalEngine(initialEngine.config())) {
recoveringEngine.recoverFromTranslog();
recoveringEngine.recoverFromTranslog(Long.MAX_VALUE);
recoveringEngine.fillSeqNoGaps(2);
assertThat(recoveringEngine.getLocalCheckpoint(), greaterThanOrEqualTo((long) (docs - 1)));
}
@ -3774,7 +3811,7 @@ public class InternalEngineTests extends EngineTestCase {
throw new UnsupportedOperationException();
}
};
noOpEngine.recoverFromTranslog();
noOpEngine.recoverFromTranslog(Long.MAX_VALUE);
final int gapsFilled = noOpEngine.fillSeqNoGaps(primaryTerm.get());
final String reason = randomAlphaOfLength(16);
noOpEngine.noOp(new Engine.NoOp(maxSeqNo + 1, primaryTerm.get(), LOCAL_TRANSLOG_RECOVERY, System.nanoTime(), reason));
@ -3986,7 +4023,7 @@ public class InternalEngineTests extends EngineTestCase {
trimUnsafeCommits(copy(replicaEngine.config(), globalCheckpoint::get));
recoveringEngine = new InternalEngine(copy(replicaEngine.config(), globalCheckpoint::get));
assertEquals(numDocsOnReplica, getTranslog(recoveringEngine).stats().getUncommittedOperations());
recoveringEngine.recoverFromTranslog();
recoveringEngine.recoverFromTranslog(Long.MAX_VALUE);
assertEquals(maxSeqIDOnReplica, recoveringEngine.getSeqNoStats(-1).getMaxSeqNo());
assertEquals(checkpointOnReplica, recoveringEngine.getLocalCheckpoint());
assertEquals((maxSeqIDOnReplica + 1) - numDocsOnReplica, recoveringEngine.fillSeqNoGaps(2));
@ -4022,7 +4059,7 @@ public class InternalEngineTests extends EngineTestCase {
if (flushed) {
assertThat(recoveringEngine.getTranslogStats().getUncommittedOperations(), equalTo(0));
}
recoveringEngine.recoverFromTranslog();
recoveringEngine.recoverFromTranslog(Long.MAX_VALUE);
assertEquals(maxSeqIDOnReplica, recoveringEngine.getSeqNoStats(-1).getMaxSeqNo());
assertEquals(maxSeqIDOnReplica, recoveringEngine.getLocalCheckpoint());
assertEquals(0, recoveringEngine.fillSeqNoGaps(3));
@ -4215,7 +4252,7 @@ public class InternalEngineTests extends EngineTestCase {
super.commitIndexWriter(writer, translog, syncId);
}
}) {
engine.recoverFromTranslog();
engine.recoverFromTranslog(Long.MAX_VALUE);
int numDocs = scaledRandomIntBetween(10, 100);
for (int docId = 0; docId < numDocs; docId++) {
ParseContext.Document document = testDocumentWithTextField();

View File

@ -132,7 +132,7 @@ public class RefreshListenersTests extends ESTestCase {
TimeValue.timeValueMinutes(5), Collections.singletonList(listeners), Collections.emptyList(), null,
(e, s) -> 0, new NoneCircuitBreakerService(), () -> SequenceNumbers.NO_OPS_PERFORMED, () -> primaryTerm);
engine = new InternalEngine(config);
engine.recoverFromTranslog();
engine.recoverFromTranslog(Long.MAX_VALUE);
listeners.setCurrentRefreshLocationSupplier(engine::getTranslogLastWriteLocation);
}

View File

@ -360,7 +360,8 @@ public class TranslogTests extends ESTestCase {
}
markCurrentGenAsCommitted(translog);
try (Translog.Snapshot snapshot = translog.newSnapshotFromGen(firstId + 1)) {
try (Translog.Snapshot snapshot = translog.newSnapshotFromGen(
new Translog.TranslogGeneration(translog.getTranslogUUID(), firstId + 1), randomNonNegativeLong())) {
assertThat(snapshot, SnapshotMatchers.size(0));
assertThat(snapshot.totalOperations(), equalTo(0));
}
@ -645,6 +646,82 @@ public class TranslogTests extends ESTestCase {
}
}
public void testSnapshotFromMinGen() throws Exception {
Map<Long, List<Translog.Operation>> operationsByGen = new HashMap<>();
try (Translog.Snapshot snapshot = translog.newSnapshotFromGen(
new Translog.TranslogGeneration(translog.getTranslogUUID(), 1), randomNonNegativeLong())) {
assertThat(snapshot, SnapshotMatchers.size(0));
}
int iters = between(1, 10);
for (int i = 0; i < iters; i++) {
long currentGeneration = translog.currentFileGeneration();
operationsByGen.putIfAbsent(currentGeneration, new ArrayList<>());
int numOps = between(0, 20);
for (int op = 0; op < numOps; op++) {
long seqNo = randomLongBetween(0, 1000);
addToTranslogAndList(translog, operationsByGen.get(currentGeneration), new Translog.Index("test",
Long.toString(seqNo), seqNo, primaryTerm.get(), new byte[]{1}));
}
long minGen = randomLongBetween(translog.getMinFileGeneration(), translog.currentFileGeneration());
try (Translog.Snapshot snapshot = translog.newSnapshotFromGen(
new Translog.TranslogGeneration(translog.getTranslogUUID(), minGen), Long.MAX_VALUE)) {
List<Translog.Operation> expectedOps = operationsByGen.entrySet().stream()
.filter(e -> e.getKey() >= minGen)
.flatMap(e -> e.getValue().stream())
.collect(Collectors.toList());
assertThat(snapshot, SnapshotMatchers.containsOperationsInAnyOrder(expectedOps));
}
long upToSeqNo = randomLongBetween(0, 2000);
try (Translog.Snapshot snapshot = translog.newSnapshotFromGen(
new Translog.TranslogGeneration(translog.getTranslogUUID(), minGen), upToSeqNo)) {
List<Translog.Operation> expectedOps = operationsByGen.entrySet().stream()
.filter(e -> e.getKey() >= minGen)
.flatMap(e -> e.getValue().stream().filter(op -> op.seqNo() <= upToSeqNo))
.collect(Collectors.toList());
assertThat(snapshot, SnapshotMatchers.containsOperationsInAnyOrder(expectedOps));
}
translog.rollGeneration();
}
}
public void testSeqNoFilterSnapshot() throws Exception {
final int generations = between(2, 20);
for (int gen = 0; gen < generations; gen++) {
List<Long> batch = LongStream.rangeClosed(0, between(0, 100)).boxed().collect(Collectors.toList());
Randomness.shuffle(batch);
for (long seqNo : batch) {
Translog.Index op = new Translog.Index("doc", randomAlphaOfLength(10), seqNo, primaryTerm.get(), new byte[]{1});
translog.add(op);
}
translog.rollGeneration();
}
List<Translog.Operation> operations = new ArrayList<>();
try (Translog.Snapshot snapshot = translog.newSnapshot()) {
Translog.Operation op;
while ((op = snapshot.next()) != null) {
operations.add(op);
}
}
try (Translog.Snapshot snapshot = translog.newSnapshot()) {
Translog.Snapshot filter = new Translog.SeqNoFilterSnapshot(snapshot, between(200, 300), between(300, 400)); // out range
assertThat(filter, SnapshotMatchers.size(0));
assertThat(filter.totalOperations(), equalTo(snapshot.totalOperations()));
assertThat(filter.overriddenOperations(), equalTo(snapshot.overriddenOperations()));
assertThat(filter.skippedOperations(), equalTo(snapshot.totalOperations()));
}
try (Translog.Snapshot snapshot = translog.newSnapshot()) {
int fromSeqNo = between(-2, 500);
int toSeqNo = between(fromSeqNo, 500);
List<Translog.Operation> selectedOps = operations.stream()
.filter(op -> fromSeqNo <= op.seqNo() && op.seqNo() <= toSeqNo).collect(Collectors.toList());
Translog.Snapshot filter = new Translog.SeqNoFilterSnapshot(snapshot, fromSeqNo, toSeqNo);
assertThat(filter, SnapshotMatchers.containsOperationsInAnyOrder(selectedOps));
assertThat(filter.totalOperations(), equalTo(snapshot.totalOperations()));
assertThat(filter.overriddenOperations(), equalTo(snapshot.overriddenOperations()));
assertThat(filter.skippedOperations(), equalTo(snapshot.skippedOperations() + operations.size() - selectedOps.size()));
}
}
public void assertFileIsPresent(Translog translog, long id) {
if (Files.exists(translog.location().resolve(Translog.getFilename(id)))) {
return;
@ -1304,7 +1381,7 @@ public class TranslogTests extends ESTestCase {
translog = new Translog(config, translogGeneration.translogUUID, translog.getDeletionPolicy(), () -> SequenceNumbers.NO_OPS_PERFORMED, primaryTerm::get);
assertEquals("lastCommitted must be 1 less than current", translogGeneration.translogFileGeneration + 1, translog.currentFileGeneration());
assertFalse(translog.syncNeeded());
try (Translog.Snapshot snapshot = translog.newSnapshotFromGen(translogGeneration.translogFileGeneration)) {
try (Translog.Snapshot snapshot = translog.newSnapshotFromGen(translogGeneration, Long.MAX_VALUE)) {
for (int i = minUncommittedOp; i < translogOperations; i++) {
assertEquals("expected operation" + i + " to be in the previous translog but wasn't",
translog.currentFileGeneration() - 1, locations.get(i).generation);
@ -1735,7 +1812,7 @@ public class TranslogTests extends ESTestCase {
}
this.translog = new Translog(config, translogUUID, deletionPolicy, () -> SequenceNumbers.NO_OPS_PERFORMED, primaryTerm::get);
try (Translog.Snapshot snapshot = this.translog.newSnapshotFromGen(translogGeneration.translogFileGeneration)) {
try (Translog.Snapshot snapshot = this.translog.newSnapshotFromGen(translogGeneration, Long.MAX_VALUE)) {
for (int i = firstUncommitted; i < translogOperations; i++) {
Translog.Operation next = snapshot.next();
assertNotNull("" + i, next);
@ -2557,7 +2634,8 @@ public class TranslogTests extends ESTestCase {
generationUUID = Translog.createEmptyTranslog(config.getTranslogPath(), SequenceNumbers.NO_OPS_PERFORMED, shardId, primaryTerm.get());
}
try (Translog translog = new Translog(config, generationUUID, deletionPolicy, () -> SequenceNumbers.NO_OPS_PERFORMED, primaryTerm::get);
Translog.Snapshot snapshot = translog.newSnapshotFromGen(minGenForRecovery)) {
Translog.Snapshot snapshot = translog.newSnapshotFromGen(
new Translog.TranslogGeneration(generationUUID, minGenForRecovery), Long.MAX_VALUE)) {
assertEquals(syncedDocs.size(), snapshot.totalOperations());
for (int i = 0; i < syncedDocs.size(); i++) {
Translog.Operation next = snapshot.next();

View File

@ -375,7 +375,7 @@ public abstract class EngineTestCase extends ESTestCase {
}
InternalEngine internalEngine = createInternalEngine(indexWriterFactory, localCheckpointTrackerSupplier, seqNoForOperation, config);
internalEngine.recoverFromTranslog();
internalEngine.recoverFromTranslog(Long.MAX_VALUE);
return internalEngine;
}