Preserve multiple translog generations

Today, when a flush is performed, the translog is committed and, if there
are no outstanding views, only the current translog generation is
preserved. Yet for the purpose of sequence numbers, we need stronger
guarantees than this. This commit changes the preservation of translog
generations to keep every generation from the minimum generation that
would be needed to recover all operations above the local checkpoint.

Relates #24015
Jason Tedor, 2017-04-17 08:51:54 -04:00, committed by GitHub
parent 8033c576b7
commit f7ebe9d18f
4 changed files with 435 additions and 100 deletions
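To make the new guarantee concrete, here is a minimal, self-contained sketch of the preservation rule this commit introduces. The names GenerationInfo and minGenerationForRecovery are hypothetical, used only for illustration, and this is not the Elasticsearch implementation: a completed generation must be kept whenever the maximum sequence number recorded in its checkpoint is at or above localCheckpoint + 1, the first sequence number that might be missing from the last Lucene commit.

import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for the checkpoint metadata of a completed translog generation.
final class GenerationInfo {
    final long generation;
    final long maxSeqNo;

    GenerationInfo(final long generation, final long maxSeqNo) {
        this.generation = generation;
        this.maxSeqNo = maxSeqNo;
    }
}

final class MinGenerationExample {

    // the minimum generation to preserve so that all operations above the local checkpoint can be recovered
    static long minGenerationForRecovery(final List<GenerationInfo> readers, final long currentGeneration, final long localCheckpoint) {
        long min = currentGeneration;
        for (final GenerationInfo reader : readers) {
            if (localCheckpoint + 1 <= reader.maxSeqNo) {
                min = Math.min(min, reader.generation);
            }
        }
        return min;
    }

    public static void main(final String[] args) {
        // three completed generations plus the current generation 4
        final List<GenerationInfo> readers = Arrays.asList(new GenerationInfo(1, 2), new GenerationInfo(2, 7), new GenerationInfo(3, 5));
        // with a local checkpoint of 4, generation 2 (maxSeqNo 7) may still hold seq# 5 and 6, so it must be kept
        System.out.println(minGenerationForRecovery(readers, 4, 4)); // prints 2
    }
}

This mirrors the check in Translog#getMinGenerationForSeqNo below, to which the engine passes localCheckpoint + 1.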

InternalEngine.java

@@ -299,7 +299,7 @@ public class InternalEngine extends Engine {
throw new IllegalStateException("no translog generation present in commit data but translog is expected to exist");
}
if (generation.translogUUID == null) {
throw new IndexFormatTooOldException("trasnlog", "translog has no generation nor a UUID - this might be an index from a previous version consider upgrading to N-1 first");
throw new IndexFormatTooOldException("translog", "translog has no generation nor a UUID - this might be an index from a previous version consider upgrading to N-1 first");
}
}
final Translog translog = new Translog(translogConfig, generation, globalCheckpointSupplier);
@@ -1233,12 +1233,12 @@ public class InternalEngine extends Engine {
try {
translog.prepareCommit();
logger.trace("starting commit for flush; commitTranslog=true");
commitIndexWriter(indexWriter, translog, null);
final long committedGeneration = commitIndexWriter(indexWriter, translog, null);
logger.trace("finished commit for flush");
// we need to refresh in order to clear older version values
refresh("version_table_flush");
// after refresh documents can be retrieved from the index so we can now commit the translog
translog.commit();
translog.commit(committedGeneration);
} catch (Exception e) {
throw new FlushFailedEngineException(shardId, e);
}
@@ -1734,55 +1734,65 @@ public class InternalEngine extends Engine {
}
}
private void commitIndexWriter(IndexWriter writer, Translog translog, String syncId) throws IOException {
/**
* Commits the specified index writer.
*
* @param writer the index writer to commit
* @param translog the translog
* @param syncId the sync flush ID ({@code null} if not committing a synced flush)
* @return the minimum translog generation for the local checkpoint committed with the specified index writer
* @throws IOException if an I/O exception occurs committing the specified writer
*/
private long commitIndexWriter(final IndexWriter writer, final Translog translog, @Nullable final String syncId) throws IOException {
ensureCanFlush();
try {
Translog.TranslogGeneration translogGeneration = translog.getGeneration();
final String translogFileGen = Long.toString(translogGeneration.translogFileGeneration);
final long localCheckpoint = seqNoService().getLocalCheckpoint();
final Translog.TranslogGeneration translogGeneration = translog.getMinGenerationForSeqNo(localCheckpoint + 1);
final String translogFileGeneration = Long.toString(translogGeneration.translogFileGeneration);
final String translogUUID = translogGeneration.translogUUID;
final String localCheckpoint = Long.toString(seqNoService().getLocalCheckpoint());
final String localCheckpointValue = Long.toString(localCheckpoint);
writer.setLiveCommitData(() -> {
/*
* The user data captured above (e.g. local checkpoint) contains data that must be evaluated *before* Lucene flushes
* segments, including the local checkpoint amongst other values. The maximum sequence number is different - we never want
* segments, including the local checkpoint amongst other values. The maximum sequence number is different, we never want
* the maximum sequence number to be less than the last sequence number to go into a Lucene commit, otherwise we run the
* risk of re-using a sequence number for two different documents when restoring from this commit point and subsequently
* writing new documents to the index. Since we only know which Lucene documents made it into the final commit after the
* {@link IndexWriter#commit()} call flushes all documents, we defer computation of the max_seq_no to the time of invocation
* of the commit data iterator (which occurs after all documents have been flushed to Lucene).
* writing new documents to the index. Since we only know which Lucene documents made it into the final commit after the
* {@link IndexWriter#commit()} call flushes all documents, we defer computation of the maximum sequence number to the time
* of invocation of the commit data iterator (which occurs after all documents have been flushed to Lucene).
*/
final Map<String, String> commitData = new HashMap<>(6);
commitData.put(Translog.TRANSLOG_GENERATION_KEY, translogFileGen);
final Map<String, String> commitData = new HashMap<>(5);
commitData.put(Translog.TRANSLOG_GENERATION_KEY, translogFileGeneration);
commitData.put(Translog.TRANSLOG_UUID_KEY, translogUUID);
commitData.put(SequenceNumbers.LOCAL_CHECKPOINT_KEY, localCheckpoint);
commitData.put(SequenceNumbers.LOCAL_CHECKPOINT_KEY, localCheckpointValue);
if (syncId != null) {
commitData.put(Engine.SYNC_COMMIT_ID, syncId);
}
commitData.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(seqNoService().getMaxSeqNo()));
if (logger.isTraceEnabled()) {
logger.trace("committing writer with commit data [{}]", commitData);
}
logger.trace("committing writer with commit data [{}]", commitData);
return commitData.entrySet().iterator();
});
writer.commit();
} catch (Exception ex) {
return translogGeneration.translogFileGeneration;
} catch (final Exception ex) {
try {
failEngine("lucene commit failed", ex);
} catch (Exception inner) {
} catch (final Exception inner) {
ex.addSuppressed(inner);
}
throw ex;
} catch (AssertionError e) {
// IndexWriter throws AssertionError on commit, if asserts are enabled, if any files don't exist, but tests that
// randomly throw FNFE/NSFE can also hit this:
} catch (final AssertionError e) {
/*
* If assertions are enabled, IndexWriter throws AssertionError on commit if any files don't exist, but tests that randomly
* throw FileNotFoundException or NoSuchFileException can also hit this.
*/
if (ExceptionsHelper.stackTrace(e).contains("org.apache.lucene.index.IndexWriter.filesExist")) {
EngineException engineException = new EngineException(shardId, "failed to commit engine", e);
final EngineException engineException = new EngineException(shardId, "failed to commit engine", e);
try {
failEngine("lucene commit failed", engineException);
} catch (Exception inner) {
} catch (final Exception inner) {
engineException.addSuppressed(inner);
}
throw engineException;
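The deferred evaluation described in the comment above can be reduced to a small sketch: because the commit data is supplied lazily, values read inside the supplier are captured when the commit data is iterated during the commit, not when it is installed. This is a toy illustration with a plain Supplier, not Lucene's API.

import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;

final class DeferredCommitDataExample {
    public static void main(final String[] args) {
        final AtomicLong maxSeqNo = new AtomicLong(7);
        // evaluated when the commit data is iterated, not when it is installed
        final Supplier<String> liveMaxSeqNo = () -> Long.toString(maxSeqNo.get());
        maxSeqNo.set(9); // more documents are flushed before the commit, advancing the maximum sequence number
        System.out.println(liveMaxSeqNo.get()); // prints 9, the value at iteration time
    }
}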
@@ -1866,7 +1876,7 @@ public class InternalEngine extends Engine {
* Gets the commit data from {@link IndexWriter} as a map.
*/
private static Map<String, String> commitDataAsMap(final IndexWriter indexWriter) {
Map<String, String> commitData = new HashMap<>(6);
Map<String, String> commitData = new HashMap<>(5);
for (Map.Entry<String, String> entry : indexWriter.getLiveCommitData()) {
commitData.put(entry.getKey(), entry.getValue());
}
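For orientation, the sketch below shows what the commit user data written by commitIndexWriter might contain for a single commit, and how the translog generation is read back when the engine is reopened. The key strings are assumed to be the values of the constants referenced above (Translog.TRANSLOG_GENERATION_KEY and friends), and the values themselves are made up.

import java.util.HashMap;
import java.util.Map;

final class CommitDataExample {
    public static void main(final String[] args) {
        final Map<String, String> commitData = new HashMap<>();
        commitData.put("translog_generation", "42"); // assumed value of Translog.TRANSLOG_GENERATION_KEY
        commitData.put("translog_uuid", "dVDLVsvLTUqvy4zqNNzDGA"); // made-up UUID
        commitData.put("local_checkpoint", "1023"); // assumed value of SequenceNumbers.LOCAL_CHECKPOINT_KEY
        commitData.put("max_seq_no", "1027"); // assumed value of SequenceNumbers.MAX_SEQ_NO

        // on engine open, the generation recorded in the commit tells the translog where recovery must start
        final long generation = Long.parseLong(commitData.get("translog_generation"));
        System.out.println(generation); // prints 42
    }
}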

Translog.java

@@ -85,14 +85,14 @@ import java.util.stream.Stream;
* When a translog is opened, the checkpoint is used to retrieve the latest translog file generation and subsequently to open the last written file to recover operations.
* The {@link org.elasticsearch.index.translog.Translog.TranslogGeneration}, given when the translog is opened / constructed, is compared against
* the latest generation, and all consecutive translog files between the given generation and the last generation in the checkpoint will be recovered and preserved until the next
* generation is committed using {@link Translog#commit()}. In the common case the translog file generation in the checkpoint and the generation passed to the translog on creation are
* the same. The only situation when they can be different is when an actual translog commit fails in between {@link Translog#prepareCommit()} and {@link Translog#commit()}. In such a case
* generation is committed using {@link Translog#commit(long)}. In the common case the translog file generation in the checkpoint and the generation passed to the translog on creation are
* the same. The only situation when they can be different is when an actual translog commit fails in between {@link Translog#prepareCommit()} and {@link Translog#commit(long)}. In such a case
* the translog file currently being committed will not be deleted since its commit was not successful. Yet, a new/current translog file is already opened at that point such that there is more than
* one translog file present. Such an uncommitted translog file always has a <tt>translog-${gen}.ckp</tt> associated with it, which is an fsynced copy of its last <tt>translog.ckp</tt>, such that in
* disaster recovery the last fsynced offsets, number of operations, etc. are still preserved.
* </p>
*/
public class Translog extends AbstractIndexShardComponent implements IndexShardComponent, Closeable, TwoPhaseCommit {
public class Translog extends AbstractIndexShardComponent implements IndexShardComponent, Closeable {
/*
* TODO
@@ -804,6 +804,8 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
long seqNo();
long primaryTerm();
/**
* Reads the type and the operation from the given stream. The operation must be written with
* {@link Operation#writeType(Operation, StreamOutput)}
@@ -953,6 +955,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
return seqNo;
}
@Override
public long primaryTerm() {
return primaryTerm;
}
@@ -1104,6 +1107,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
return seqNo;
}
@Override
public long primaryTerm() {
return primaryTerm;
}
@@ -1180,6 +1184,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
return seqNo;
}
@Override
public long primaryTerm() {
return primaryTerm;
}
@@ -1347,6 +1352,31 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
out.writeInt((int) checksum);
}
/**
* Gets the minimum generation that could contain any sequence number at or above the specified sequence number, or the current generation
* if there is no generation that could contain any such sequence number.
*
* @param seqNo the sequence number
* @return the minimum generation for the sequence number
*/
public TranslogGeneration getMinGenerationForSeqNo(final long seqNo) {
try (ReleasableLock ignored = writeLock.acquire()) {
/*
* When flushing, the engine will ask the translog for the minimum generation that could contain any sequence number after the
* local checkpoint. Immediately after flushing there is no such generation, so the minimum generation in this case will
* be the current translog generation, as we do not need any prior generations to have a complete history up to the current local
* checkpoint.
*/
long minTranslogFileGeneration = this.currentFileGeneration();
for (final TranslogReader reader : readers) {
if (seqNo <= reader.getCheckpoint().maxSeqNo) {
minTranslogFileGeneration = Math.min(minTranslogFileGeneration, reader.getGeneration());
}
}
return new TranslogGeneration(translogUUID, minTranslogFileGeneration);
}
}
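A worked example with hypothetical numbers: suppose the completed generations 1, 2, and 3 record maximum sequence numbers 9, 19, and 29 in their checkpoints, and the current generation is 4. Then getMinGenerationForSeqNo(15) returns generation 2, because generation 1 cannot contain any sequence number at or above 15 while generations 2 and 3 might. Once a flush advances the local checkpoint to 29, getMinGenerationForSeqNo(30) returns the current generation 4, exactly the post-flush case the comment above describes.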
/**
* Roll the current translog generation into a new generation. This does not commit the
* translog.
@@ -1375,27 +1405,38 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
}
}
@Override
public long prepareCommit() throws IOException {
/**
* Prepares a translog commit by setting the current committing generation and rolling the translog generation.
*
* @throws IOException if an I/O exception occurred while rolling the translog generation
*/
public void prepareCommit() throws IOException {
try (ReleasableLock ignored = writeLock.acquire()) {
ensureOpen();
if (currentCommittingGeneration != NOT_SET_GENERATION) {
final String message = String.format(
Locale.ROOT,
"already committing a translog with generation [%d]",
currentCommittingGeneration);
final String message =
String.format(Locale.ROOT, "already committing a translog with generation [%d]", currentCommittingGeneration);
throw new IllegalStateException(message);
}
currentCommittingGeneration = current.getGeneration();
rollGeneration();
}
return 0;
}
@Override
public long commit() throws IOException {
/**
* Commits the translog and sets the last committed translog generation to the specified generation. The committed generation will be
* used when trimming unreferenced translog generations such that all generations from the committed generation onwards are preserved.
*
* If {@link Translog#prepareCommit()} was not called before calling commit, it will be invoked first, causing the translog generation
* to be rolled.
*
* @param committedGeneration the minimum translog generation to preserve after trimming unreferenced generations
* @throws IOException if an I/O exception occurred while committing the translog
*/
public void commit(final long committedGeneration) throws IOException {
try (ReleasableLock ignored = writeLock.acquire()) {
ensureOpen();
assert assertCommittedGenerationIsInValidRange(committedGeneration);
if (currentCommittingGeneration == NOT_SET_GENERATION) {
prepareCommit();
}
@@ -1403,26 +1444,39 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
assert readers.stream().anyMatch(r -> r.getGeneration() == currentCommittingGeneration)
: "readers missing committing generation [" + currentCommittingGeneration + "]";
// set the last committed generation otherwise old files will not be cleaned up
lastCommittedTranslogFileGeneration = currentCommittingGeneration + 1;
lastCommittedTranslogFileGeneration = committedGeneration;
currentCommittingGeneration = NOT_SET_GENERATION;
trimUnreferencedReaders();
}
return 0;
}
private boolean assertCommittedGenerationIsInValidRange(final long committedGeneration) {
assert committedGeneration <= current.generation
: "tried to commit generation [" + committedGeneration + "] after current generation [" + current.generation + "]";
final long min = readers.stream().map(TranslogReader::getGeneration).min(Long::compareTo).orElse(Long.MIN_VALUE);
assert committedGeneration >= min
: "tried to commit generation [" + committedGeneration + "] before minimum generation [" + min + "]";
return true;
}
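The interplay of prepareCommit and commit(long) can be summarized with a small self-contained state sketch. This is an assumed paraphrase of the semantics above, not the real Translog class, and the generation numbers are made up.

final class TwoPhaseSketch {
    static final long NOT_SET_GENERATION = -1;
    long currentGeneration = 3;
    long committingGeneration = NOT_SET_GENERATION;
    long lastCommittedGeneration = 1;

    void prepareCommit() {
        if (committingGeneration != NOT_SET_GENERATION) {
            throw new IllegalStateException("already committing a translog with generation [" + committingGeneration + "]");
        }
        committingGeneration = currentGeneration;
        currentGeneration++; // roll so that new operations land in a fresh generation
    }

    void commit(final long committedGeneration) {
        if (committingGeneration == NOT_SET_GENERATION) {
            prepareCommit(); // committing without preparing rolls the generation first
        }
        lastCommittedGeneration = committedGeneration; // generations below this become trimmable
        committingGeneration = NOT_SET_GENERATION;
    }

    public static void main(final String[] args) {
        final TwoPhaseSketch translog = new TwoPhaseSketch();
        translog.prepareCommit(); // generation 3 is now committing; the current generation rolls to 4
        translog.commit(2); // preserve from generation 2 onwards; generation 1 becomes trimmable
        System.out.println(translog.lastCommittedGeneration); // prints 2
    }
}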
/**
* Trims unreferenced translog generations. The guarantee here is that translog generations will be preserved for all outstanding views
* and from the last committed translog generation defined by {@link Translog#lastCommittedTranslogFileGeneration}.
*/
void trimUnreferencedReaders() {
try (ReleasableLock ignored = writeLock.acquire()) {
if (closed.get()) {
// we're shutdown potentially on some tragic event - don't delete anything
// we're shutdown potentially on some tragic event, don't delete anything
return;
}
long minReferencedGen = outstandingViews.stream().mapToLong(View::minTranslogGeneration).min().orElse(Long.MAX_VALUE);
minReferencedGen = Math.min(lastCommittedTranslogFileGeneration, minReferencedGen);
final long finalMinReferencedGen = minReferencedGen;
List<TranslogReader> unreferenced = readers.stream().filter(r -> r.getGeneration() < finalMinReferencedGen).collect(Collectors.toList());
long minReferencedGen = Math.min(
lastCommittedTranslogFileGeneration,
outstandingViews.stream().mapToLong(View::minTranslogGeneration).min().orElse(Long.MAX_VALUE));
final List<TranslogReader> unreferenced =
readers.stream().filter(r -> r.getGeneration() < minReferencedGen).collect(Collectors.toList());
for (final TranslogReader unreferencedReader : unreferenced) {
Path translogPath = unreferencedReader.path();
logger.trace("delete translog file - not referenced and not current anymore {}", translogPath);
final Path translogPath = unreferencedReader.path();
logger.trace("delete translog file [{}], not referenced and not current anymore", translogPath);
IOUtils.closeWhileHandlingException(unreferencedReader);
IOUtils.deleteFilesIgnoringExceptions(translogPath,
translogPath.resolveSibling(getCommitCheckpointFileName(unreferencedReader.getGeneration())));
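The bound used for trimming is simply the minimum of the committed generation and the oldest generation any outstanding view still needs, as this small sketch with made-up numbers shows:

final class TrimBoundExample {
    public static void main(final String[] args) {
        final long lastCommittedTranslogFileGeneration = 7; // as set by commit(7)
        final long minViewGeneration = 5; // an outstanding view still reads from generation 5
        final long minReferencedGen = Math.min(lastCommittedTranslogFileGeneration, minViewGeneration);
        System.out.println(minReferencedGen); // prints 5: generations below 5 may be deleted, 5 and later are kept
    }
}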
@@ -1442,13 +1496,6 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
}
}
@Override
public void rollback() throws IOException {
ensureOpen();
close();
}
/**
* References a transaction log generation
*/

InternalEngineTests.java

@@ -151,6 +151,7 @@ import java.util.Base64;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
@@ -166,6 +167,8 @@ import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import java.util.function.LongSupplier;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.LongStream;
import static java.util.Collections.emptyMap;
import static java.util.Collections.shuffle;
@@ -834,6 +837,58 @@ public class InternalEngineTests extends ESTestCase {
}
}
public void testTranslogRecoveryWithMultipleGenerations() throws IOException {
final int docs = randomIntBetween(1, 4096);
final List<Long> seqNos = LongStream.range(0, docs).boxed().collect(Collectors.toList());
Randomness.shuffle(seqNos);
engine.close();
Engine initialEngine = null;
try {
final AtomicInteger counter = new AtomicInteger();
initialEngine = new InternalEngine(copy(engine.config(), EngineConfig.OpenMode.CREATE_INDEX_AND_TRANSLOG)) {
@Override
public SequenceNumbersService seqNoService() {
return new SequenceNumbersService(
engine.shardId,
engine.config().getIndexSettings(),
SequenceNumbersService.NO_OPS_PERFORMED,
SequenceNumbersService.NO_OPS_PERFORMED,
SequenceNumbersService.UNASSIGNED_SEQ_NO) {
@Override
public long generateSeqNo() {
return seqNos.get(counter.getAndIncrement());
}
};
}
};
for (int i = 0; i < docs; i++) {
final String id = Integer.toString(i);
final ParsedDocument doc = testParsedDocument(id, "test", null, testDocumentWithTextField(), SOURCE, null);
initialEngine.index(indexForDoc(doc));
if (rarely()) {
initialEngine.getTranslog().rollGeneration();
} else if (rarely()) {
initialEngine.flush();
}
}
} finally {
IOUtils.close(initialEngine);
}
Engine recoveringEngine = null;
try {
recoveringEngine = new InternalEngine(copy(initialEngine.config(), EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG));
recoveringEngine.recoverFromTranslog();
try (Engine.Searcher searcher = recoveringEngine.acquireSearcher("test")) {
TopDocs topDocs = searcher.searcher().search(new MatchAllDocsQuery(), docs);
assertEquals(docs, topDocs.totalHits);
}
} finally {
IOUtils.close(recoveringEngine);
}
}
public void testConcurrentGetAndFlush() throws Exception {
ParsedDocument doc = testParsedDocument("1", "test", null, testDocumentWithTextField(), B_1, null);
engine.index(indexForDoc(doc));
@@ -3349,48 +3404,68 @@ public class InternalEngineTests extends ESTestCase {
searchResult.close();
}
/**
* A sequence number service that will generate a sequence number and, if {@code stall} is set to {@code true}, will wait on the barrier
* and the referenced latch before returning. If the local checkpoint should advance (because {@code stall} is {@code false}), then the
* value of {@code expectedLocalCheckpoint} is set accordingly.
*
* @param latchReference to latch the thread for the purpose of stalling
* @param barrier to signal the thread has generated a new sequence number
* @param stall whether or not the thread should stall
* @param expectedLocalCheckpoint the expected local checkpoint after generating a new sequence
* number
* @return a sequence number service
*/
private SequenceNumbersService getStallingSeqNoService(
final AtomicReference<CountDownLatch> latchReference,
final CyclicBarrier barrier,
final AtomicBoolean stall,
final AtomicLong expectedLocalCheckpoint) {
return new SequenceNumbersService(
shardId,
defaultSettings,
SequenceNumbersService.NO_OPS_PERFORMED,
SequenceNumbersService.NO_OPS_PERFORMED,
SequenceNumbersService.UNASSIGNED_SEQ_NO) {
@Override
public long generateSeqNo() {
final long seqNo = super.generateSeqNo();
final CountDownLatch latch = latchReference.get();
if (stall.get()) {
try {
barrier.await();
latch.await();
} catch (BrokenBarrierException | InterruptedException e) {
throw new RuntimeException(e);
}
} else {
if (expectedLocalCheckpoint.get() + 1 == seqNo) {
expectedLocalCheckpoint.set(seqNo);
}
}
return seqNo;
}
};
}
public void testSequenceNumberAdvancesToMaxSeqOnEngineOpenOnPrimary() throws BrokenBarrierException, InterruptedException, IOException {
engine.close();
final int docs = randomIntBetween(1, 32);
InternalEngine initialEngine = null;
try {
final CountDownLatch latch = new CountDownLatch(1);
final AtomicReference<CountDownLatch> latchReference = new AtomicReference<>(new CountDownLatch(1));
final CyclicBarrier barrier = new CyclicBarrier(2);
final AtomicBoolean skip = new AtomicBoolean();
final AtomicBoolean stall = new AtomicBoolean();
final AtomicLong expectedLocalCheckpoint = new AtomicLong(SequenceNumbersService.NO_OPS_PERFORMED);
final List<Thread> threads = new ArrayList<>();
final SequenceNumbersService seqNoService =
new SequenceNumbersService(
shardId,
defaultSettings,
SequenceNumbersService.NO_OPS_PERFORMED,
SequenceNumbersService.NO_OPS_PERFORMED,
SequenceNumbersService.UNASSIGNED_SEQ_NO) {
@Override
public long generateSeqNo() {
final long seqNo = super.generateSeqNo();
if (skip.get()) {
try {
barrier.await();
latch.await();
} catch (BrokenBarrierException | InterruptedException e) {
throw new RuntimeException(e);
}
} else {
if (expectedLocalCheckpoint.get() + 1 == seqNo) {
expectedLocalCheckpoint.set(seqNo);
}
}
return seqNo;
}
};
final SequenceNumbersService seqNoService = getStallingSeqNoService(latchReference, barrier, stall, expectedLocalCheckpoint);
initialEngine = createEngine(defaultSettings, store, primaryTranslogDir, newMergePolicy(), null, () -> seqNoService);
final InternalEngine finalInitialEngine = initialEngine;
for (int i = 0; i < docs; i++) {
final String id = Integer.toString(i);
final ParsedDocument doc = testParsedDocument(id, "test", null, testDocumentWithTextField(), SOURCE, null);
skip.set(randomBoolean());
stall.set(randomBoolean());
final Thread thread = new Thread(() -> {
try {
finalInitialEngine.index(indexForDoc(doc));
@@ -3399,7 +3474,7 @@ public class InternalEngineTests extends ESTestCase {
}
});
thread.start();
if (skip.get()) {
if (stall.get()) {
threads.add(thread);
barrier.await();
} else {
@@ -3411,7 +3486,7 @@ public class InternalEngineTests extends ESTestCase {
assertThat(initialEngine.seqNoService().getMaxSeqNo(), equalTo((long) (docs - 1)));
initialEngine.flush(true, true);
latch.countDown();
latchReference.get().countDown();
for (final Thread thread : threads) {
thread.join();
}
@@ -3586,6 +3661,78 @@ public class InternalEngineTests extends ESTestCase {
}
}
public void testMinGenerationForSeqNo() throws IOException, BrokenBarrierException, InterruptedException {
engine.close();
final int numberOfTriplets = randomIntBetween(1, 32);
InternalEngine actualEngine = null;
try {
final AtomicReference<CountDownLatch> latchReference = new AtomicReference<>();
final CyclicBarrier barrier = new CyclicBarrier(2);
final AtomicBoolean stall = new AtomicBoolean();
final AtomicLong expectedLocalCheckpoint = new AtomicLong(SequenceNumbersService.NO_OPS_PERFORMED);
final Map<Thread, CountDownLatch> threads = new LinkedHashMap<>();
final SequenceNumbersService seqNoService = getStallingSeqNoService(latchReference, barrier, stall, expectedLocalCheckpoint);
actualEngine = createEngine(defaultSettings, store, primaryTranslogDir, newMergePolicy(), null, () -> seqNoService);
final InternalEngine finalActualEngine = actualEngine;
final Translog translog = finalActualEngine.getTranslog();
final long generation = finalActualEngine.getTranslog().currentFileGeneration();
for (int i = 0; i < numberOfTriplets; i++) {
/*
* Index three documents with the first and last landing in the same generation and the middle document being stalled until
* a later generation.
*/
stall.set(false);
index(finalActualEngine, 3 * i);
final CountDownLatch latch = new CountDownLatch(1);
latchReference.set(latch);
final int skipId = 3 * i + 1;
stall.set(true);
final Thread thread = new Thread(() -> {
try {
index(finalActualEngine, skipId);
} catch (IOException e) {
throw new AssertionError(e);
}
});
thread.start();
threads.put(thread, latch);
barrier.await();
stall.set(false);
index(finalActualEngine, 3 * i + 2);
finalActualEngine.flush();
/*
* This sequence number landed in the last generation, but the lower and upper bounds for an earlier generation straddle
* this sequence number.
*/
assertThat(translog.getMinGenerationForSeqNo(3 * i + 1).translogFileGeneration, equalTo(i + generation));
}
int i = 0;
for (final Map.Entry<Thread, CountDownLatch> entry : threads.entrySet()) {
final Map<String, String> userData = finalActualEngine.commitStats().getUserData();
assertThat(userData.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY), equalTo(Long.toString(3 * i)));
assertThat(userData.get(Translog.TRANSLOG_GENERATION_KEY), equalTo(Long.toString(i + generation)));
entry.getValue().countDown();
entry.getKey().join();
finalActualEngine.flush();
i++;
}
} finally {
IOUtils.close(actualEngine);
}
}
private void index(final InternalEngine engine, final int id) throws IOException {
final String docId = Integer.toString(id);
final ParsedDocument doc =
testParsedDocument(docId, "test", null, testDocumentWithTextField(), SOURCE, null);
engine.index(indexForDoc(doc));
}
/**
* Return a tuple representing the sequence ID for the given {@code Get}
* operation. The first value in the tuple is the sequence number, the

TranslogTests.java

@@ -36,8 +36,10 @@ import org.apache.lucene.util.LineFileDocs;
import org.apache.lucene.util.LuceneTestCase;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.Randomness;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.io.FileSystemUtils;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
@@ -84,6 +86,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
@@ -101,6 +104,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.LongStream;
import static org.elasticsearch.common.util.BigArrays.NON_RECYCLING_INSTANCE;
import static org.hamcrest.Matchers.containsString;
@@ -124,7 +128,7 @@ public class TranslogTests extends ESTestCase {
if (translog.isOpen()) {
if (translog.currentFileGeneration() > 1) {
translog.commit();
translog.commit(translog.currentFileGeneration());
assertFileDeleted(translog, translog.currentFileGeneration() - 1);
}
translog.close();
@@ -287,7 +291,7 @@ public class TranslogTests extends ESTestCase {
assertThat(snapshot, SnapshotMatchers.equalsTo(ops));
assertThat(snapshot.totalOperations(), equalTo(ops.size()));
translog.commit();
translog.commit(translog.currentFileGeneration());
snapshot = translog.newSnapshot();
assertThat(snapshot, SnapshotMatchers.size(0));
assertThat(snapshot.totalOperations(), equalTo(0));
@@ -373,7 +377,7 @@ public class TranslogTests extends ESTestCase {
}
}
translog.commit();
translog.commit(translog.currentFileGeneration());
{
final TranslogStats stats = stats();
assertThat(stats.estimatedNumberOfOperations(), equalTo(0L));
@@ -446,7 +450,7 @@ public class TranslogTests extends ESTestCase {
try (Translog.View view = translog.newView()) {
Translog.Snapshot snapshot2 = translog.newSnapshot();
translog.commit();
translog.commit(translog.currentFileGeneration());
assertThat(snapshot2, SnapshotMatchers.equalsTo(ops));
assertThat(snapshot2.totalOperations(), equalTo(ops.size()));
}
@@ -821,7 +825,7 @@ public class TranslogTests extends ESTestCase {
break;
}
}
translog.commit();
translog.commit(translog.currentFileGeneration());
}
} finally {
run.set(false);
@@ -858,7 +862,7 @@ public class TranslogTests extends ESTestCase {
assertTrue("we only synced a previous operation yet", translog.syncNeeded());
}
if (rarely()) {
translog.commit();
translog.commit(translog.currentFileGeneration());
assertFalse("location is from a previous translog - already synced", translog.ensureSynced(location)); // not syncing now
assertFalse("no sync needed since no operations in current translog", translog.syncNeeded());
}
@@ -878,7 +882,7 @@ public class TranslogTests extends ESTestCase {
ArrayList<Location> locations = new ArrayList<>();
for (int op = 0; op < translogOperations; op++) {
if (rarely()) {
translog.commit(); // do this first so that there is at least one pending tlog entry
translog.commit(translog.currentFileGeneration()); // do this first so that there is at least one pending tlog entry
}
final Translog.Location location = translog.add(new Translog.Index("test", "" + op, Integer.toString(++count).getBytes(Charset.forName("UTF-8"))));
locations.add(location);
@ -889,7 +893,7 @@ public class TranslogTests extends ESTestCase {
assertTrue("this operation has not been synced", translog.ensureSynced(locations.stream()));
assertFalse("the last call to ensureSycned synced all previous ops", translog.syncNeeded()); // we are the last location so everything should be synced
} else if (rarely()) {
translog.commit();
translog.commit(translog.currentFileGeneration());
assertFalse("location is from a previous translog - already synced", translog.ensureSynced(locations.stream())); // not syncing now
assertFalse("no sync needed since no operations in current translog", translog.syncNeeded());
} else {
@ -909,7 +913,7 @@ public class TranslogTests extends ESTestCase {
for (int op = 0; op < translogOperations; op++) {
locations.add(translog.add(new Translog.Index("test", "" + op, Integer.toString(++count).getBytes(Charset.forName("UTF-8")))));
if (rarely() && translogOperations > op + 1) {
translog.commit();
translog.commit(translog.currentFileGeneration());
}
}
Collections.shuffle(locations, random());
@ -1074,7 +1078,7 @@ public class TranslogTests extends ESTestCase {
locations.add(translog.add(new Translog.Index("test", "" + op, Integer.toString(op).getBytes(Charset.forName("UTF-8")))));
final boolean commit = commitOften ? frequently() : rarely();
if (commit && op < translogOperations - 1) {
translog.commit();
translog.commit(translog.currentFileGeneration());
minUncommittedOp = op + 1;
translogGeneration = translog.getGeneration();
}
@ -1300,7 +1304,7 @@ public class TranslogTests extends ESTestCase {
for (int op = 0; op < translogOperations; op++) {
locations.add(translog.add(new Translog.Index("test", "" + op, Integer.toString(op).getBytes(Charset.forName("UTF-8")))));
if (randomBoolean()) {
translog.commit();
translog.commit(translog.currentFileGeneration());
firstUncommitted = op + 1;
}
}
@ -1483,7 +1487,7 @@ public class TranslogTests extends ESTestCase {
}
try {
translog.commit();
translog.commit(translog.currentFileGeneration());
fail("already closed");
} catch (AlreadyClosedException ex) {
assertNotNull(ex.getCause());
@ -1930,7 +1934,7 @@ public class TranslogTests extends ESTestCase {
if (randomBoolean()) {
failableTLog.prepareCommit();
}
failableTLog.commit();
failableTLog.commit(translog.currentFileGeneration());
syncedDocs.clear();
}
}
@ -2110,12 +2114,13 @@ public class TranslogTests extends ESTestCase {
for (int i = 0; i <= rolls; i++) {
assertFileIsPresent(translog, generation + i);
}
translog.commit();
translog.commit(generation + rolls);
assertThat(translog.currentFileGeneration(), equalTo(generation + rolls + 1));
assertThat(translog.totalOperations(), equalTo(0));
for (int i = 0; i <= rolls; i++) {
for (int i = 0; i < rolls; i++) {
assertFileDeleted(translog, generation + i);
}
assertFileIsPresent(translog, generation + rolls);
assertFileIsPresent(translog, generation + rolls + 1);
}
@@ -2167,7 +2172,7 @@ public class TranslogTests extends ESTestCase {
}
}
translog.commit();
translog.commit(generation + rollsBefore + 1);
for (int i = 0; i <= rollsBefore; i++) {
assertFileDeleted(translog, generation + i);
@@ -2178,4 +2183,130 @@ public class TranslogTests extends ESTestCase {
}
public void testMinGenerationForSeqNo() throws IOException {
final long initialGeneration = translog.getGeneration().translogFileGeneration;
final int operations = randomIntBetween(1, 4096);
final List<Long> shuffledSeqNos = LongStream.range(0, operations).boxed().collect(Collectors.toList());
Randomness.shuffle(shuffledSeqNos);
final List<Tuple<Long, Long>> seqNos = new ArrayList<>();
final Map<Long, Long> terms = new HashMap<>();
for (final Long seqNo : shuffledSeqNos) {
seqNos.add(Tuple.tuple(seqNo, terms.computeIfAbsent(seqNo, k -> 0L)));
Long repeatingTermSeqNo = randomFrom(seqNos.stream().map(Tuple::v1).collect(Collectors.toList()));
seqNos.add(Tuple.tuple(repeatingTermSeqNo, terms.computeIfPresent(repeatingTermSeqNo, (s, t) -> t + 1)));
}
for (final Tuple<Long, Long> tuple : seqNos) {
translog.add(new Translog.NoOp(tuple.v1(), tuple.v2(), "test"));
if (rarely()) {
translog.rollGeneration();
}
}
Map<Long, Set<Tuple<Long, Long>>> generations = new HashMap<>();
translog.commit(initialGeneration);
for (long seqNo = 0; seqNo < operations; seqNo++) {
final Set<Tuple<Long, Long>> seenSeqNos = new HashSet<>();
final long generation = translog.getMinGenerationForSeqNo(seqNo).translogFileGeneration;
for (long g = generation; g < translog.currentFileGeneration(); g++) {
if (!generations.containsKey(g)) {
final Set<Tuple<Long, Long>> generationSeenSeqNos = new HashSet<>();
final Checkpoint checkpoint = Checkpoint.read(translog.location().resolve(Translog.getCommitCheckpointFileName(g)));
try (TranslogReader reader = translog.openReader(translog.location().resolve(Translog.getFilename(g)), checkpoint)) {
Translog.Snapshot snapshot = reader.newSnapshot();
Translog.Operation operation;
while ((operation = snapshot.next()) != null) {
generationSeenSeqNos.add(Tuple.tuple(operation.seqNo(), operation.primaryTerm()));
}
}
generations.put(g, generationSeenSeqNos);
}
seenSeqNos.addAll(generations.get(g));
}
final long seqNoLowerBound = seqNo;
final Set<Tuple<Long, Long>> expected = seqNos.stream().filter(t -> t.v1() >= seqNoLowerBound).collect(Collectors.toSet());
seenSeqNos.retainAll(expected);
assertThat(seenSeqNos, equalTo(expected));
}
}
public void testSimpleCommit() throws IOException {
final int operations = randomIntBetween(1, 4096);
long seqNo = 0;
for (int i = 0; i < operations; i++) {
translog.add(new Translog.NoOp(seqNo++, 0, "test'"));
if (rarely()) {
translog.rollGeneration();
}
}
final long generation =
randomIntBetween(1, Math.toIntExact(translog.currentFileGeneration()));
translog.commit(generation);
for (long i = 0; i < generation; i++) {
assertFileDeleted(translog, i);
}
for (long i = generation; i <= translog.currentFileGeneration(); i++) {
assertFileIsPresent(translog, i);
}
}
public void testPrepareCommitAndCommit() throws IOException {
final int operations = randomIntBetween(1, 4096);
long seqNo = 0;
long last = -1;
for (int i = 0; i < operations; i++) {
translog.add(new Translog.NoOp(seqNo++, 0, "test"));
if (rarely()) {
final long generation = translog.currentFileGeneration();
translog.prepareCommit();
if (rarely()) {
// simulate generation filling up and rolling between preparing the commit and committing
translog.rollGeneration();
}
final int committedGeneration = randomIntBetween(Math.max(1, Math.toIntExact(last)), Math.toIntExact(generation));
translog.commit(committedGeneration);
last = committedGeneration;
for (long g = 0; g < committedGeneration; g++) {
assertFileDeleted(translog, g);
}
for (long g = committedGeneration; g < translog.currentFileGeneration(); g++) {
assertFileIsPresent(translog, g);
}
}
}
}
public void testCommitWithOpenView() throws IOException {
final int operations = randomIntBetween(1, 4096);
long seqNo = 0;
long lastCommittedGeneration = -1;
for (int i = 0; i < operations; i++) {
translog.add(new Translog.NoOp(seqNo++, 0, "test"));
if (rarely()) {
try (Translog.View ignored = translog.newView()) {
final long viewGeneration = lastCommittedGeneration;
translog.prepareCommit();
final long committedGeneration = randomIntBetween(
Math.max(1, Math.toIntExact(lastCommittedGeneration)),
Math.toIntExact(translog.currentFileGeneration()));
translog.commit(committedGeneration);
lastCommittedGeneration = committedGeneration;
// with an open view, committing should preserve generations back to the last committed generation
for (long g = 1; g < Math.min(lastCommittedGeneration, viewGeneration); g++) {
assertFileDeleted(translog, g);
}
// the view generation could be -1 if no commit has been performed
final long max = Math.max(1, Math.min(lastCommittedGeneration, viewGeneration));
for (long g = max; g < translog.currentFileGeneration(); g++) {
assertFileIsPresent(translog, g);
}
}
}
}
}
}