Use local checkpoint to calculate min translog gen for recovery (#51905)

Today we use the translog_generation of the safe commit as the minimum
required translog generation for recovery. This approach has a
limitation: we cannot clean up the translog unless we flush. For
example, reopening an already recovered engine creates a new empty
translog, and we leave it in place until we force a flush.

This commit removes the translog_generation commit tag and uses the
local checkpoint of the safe commit to calculate the minimum required
translog generation for recovery instead.

Closes #49970
Nhat Nguyen 2020-02-10 08:26:01 -05:00
parent b4179a8814
commit db6b9c21c7
24 changed files with 343 additions and 552 deletions
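
For orientation, here is a minimal standalone sketch of the new recovery floor (illustrative names, not the engine's classes): the minimum required translog generation is the smallest generation that still contains an operation above the safe commit's local checkpoint, mirroring Translog#getMinGenerationForSeqNo in the diff below.

    import java.util.List;

    public final class MinTranslogGenSketch {

        // A translog file generation and the highest sequence number it contains.
        record Gen(long generation, long maxEffectiveSeqNo) {}

        // The smallest generation holding any operation with seq_no >= seqNo; if none
        // does, the current writer generation suffices and older files are reclaimable.
        static long minGenerationForSeqNo(long seqNo, long currentWriterGen, List<Gen> readers) {
            long minGen = currentWriterGen;
            for (Gen reader : readers) {
                if (seqNo <= reader.maxEffectiveSeqNo()) {
                    minGen = Math.min(minGen, reader.generation());
                }
            }
            return minGen;
        }

        public static void main(String[] args) {
            long localCheckpointOfSafeCommit = 41; // read from the safe commit's user data
            List<Gen> readers = List.of(new Gen(1, 20), new Gen(2, 41), new Gen(3, 57));
            // Everything at or below seq_no 41 is durable in Lucene, so recovery only
            // needs operations from seq_no 42 onwards, i.e. generation 3.
            System.out.println(minGenerationForSeqNo(localCheckpointOfSafeCommit + 1, 4, readers)); // 3
        }
    }

Because this floor is recomputed from the checkpoint on demand, an idle but already recovered engine can shed empty translog generations without a flush, which is exactly the limitation the commit message describes.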


@ -88,6 +88,13 @@
cluster.health:
wait_for_no_initializing_shards: true
wait_for_events: languid
# Before 8.0, an empty shard has two empty translog files as we used the translog_generation commit tag as the minimum required
# translog generation for recovery. Here we force-flush to get consistent translog stats for both old and new indices.
- do:
indices.flush:
index: test
force: true
wait_if_ongoing: true
- do:
indices.stats:
metric: [ translog ]
@ -115,10 +122,9 @@
- do:
indices.stats:
metric: [ translog ]
# after flushing we have one empty translog file while an empty index before flushing has two empty translog files.
- lt: { indices.test.primaries.translog.size_in_bytes: $creation_size }
- match: { indices.test.primaries.translog.size_in_bytes: $creation_size }
- match: { indices.test.primaries.translog.operations: 0 }
- lt: { indices.test.primaries.translog.uncommitted_size_in_bytes: $creation_size }
- match: { indices.test.primaries.translog.uncommitted_size_in_bytes: $creation_size }
- match: { indices.test.primaries.translog.uncommitted_operations: 0 }
---


@ -121,16 +121,10 @@ public class CombinedDeletionPolicy extends IndexDeletionPolicy {
assert Thread.holdsLock(this);
logger.debug("Safe commit [{}], last commit [{}]", commitDescription(safeCommit), commitDescription(lastCommit));
assert safeCommit.isDeleted() == false : "The safe commit must not be deleted";
final long minRequiredGen = Long.parseLong(safeCommit.getUserData().get(Translog.TRANSLOG_GENERATION_KEY));
assert lastCommit.isDeleted() == false : "The last commit must not be deleted";
final long lastGen = Long.parseLong(lastCommit.getUserData().get(Translog.TRANSLOG_GENERATION_KEY));
assert minRequiredGen <= lastGen : "minRequiredGen must not be greater than lastGen";
translogDeletionPolicy.setTranslogGenerationOfLastCommit(lastGen);
translogDeletionPolicy.setMinTranslogGenerationForRecovery(minRequiredGen);
softDeletesPolicy.setLocalCheckpointOfSafeCommit(
Long.parseLong(safeCommit.getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)));
final long localCheckpointOfSafeCommit = Long.parseLong(safeCommit.getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY));
softDeletesPolicy.setLocalCheckpointOfSafeCommit(localCheckpointOfSafeCommit);
translogDeletionPolicy.setLocalCheckpointOfSafeCommit(localCheckpointOfSafeCommit);
}
protected int getDocCountOfCommit(IndexCommit indexCommit) throws IOException {


@ -400,7 +400,7 @@ public class InternalEngine extends Engine {
try (ReleasableLock ignored = readLock.acquire()) {
ensureOpen();
final long localCheckpoint = localCheckpointTracker.getProcessedCheckpoint();
try (Translog.Snapshot snapshot = getTranslog().newSnapshotFromMinSeqNo(localCheckpoint + 1)) {
try (Translog.Snapshot snapshot = getTranslog().newSnapshot(localCheckpoint + 1, Long.MAX_VALUE)) {
return translogRecoveryRunner.run(this, snapshot);
}
}
@ -473,23 +473,24 @@ public class InternalEngine extends Engine {
}
private void recoverFromTranslogInternal(TranslogRecoveryRunner translogRecoveryRunner, long recoverUpToSeqNo) throws IOException {
Translog.TranslogGeneration translogGeneration = translog.getGeneration();
final int opsRecovered;
final long translogFileGen = Long.parseLong(lastCommittedSegmentInfos.getUserData().get(Translog.TRANSLOG_GENERATION_KEY));
try (Translog.Snapshot snapshot = translog.newSnapshotFromGen(
new Translog.TranslogGeneration(translog.getTranslogUUID(), translogFileGen), recoverUpToSeqNo)) {
opsRecovered = translogRecoveryRunner.run(this, snapshot);
} catch (Exception e) {
throw new EngineException(shardId, "failed to recover from translog", e);
final long localCheckpoint = getProcessedLocalCheckpoint();
if (localCheckpoint < recoverUpToSeqNo) {
try (Translog.Snapshot snapshot = translog.newSnapshot(localCheckpoint + 1, recoverUpToSeqNo)) {
opsRecovered = translogRecoveryRunner.run(this, snapshot);
} catch (Exception e) {
throw new EngineException(shardId, "failed to recover from translog", e);
}
} else {
opsRecovered = 0;
}
// flush if we recovered something or if we have references to older translogs
// note: if opsRecovered == 0 and we have older translogs it means they are corrupted or 0 length.
assert pendingTranslogRecovery.get() : "translogRecovery is not pending but should be";
pendingTranslogRecovery.set(false); // we are good - now we can commit
if (opsRecovered > 0) {
logger.trace("flushing post recovery from translog. ops recovered [{}]. committed translog id [{}]. current id [{}]",
opsRecovered, translogGeneration == null ? null :
translogGeneration.translogFileGeneration, translog.currentFileGeneration());
logger.trace("flushing post recovery from translog: ops recovered [{}], current translog generation [{}]",
opsRecovered, translog.currentFileGeneration());
commitIndexWriter(indexWriter, translog, null);
refreshLastCommittedSegmentInfos();
refresh("translog_recovery");
@ -501,7 +502,8 @@ public class InternalEngine extends Engine {
LongSupplier globalCheckpointSupplier, LongConsumer persistedSequenceNumberConsumer) throws IOException {
final TranslogConfig translogConfig = engineConfig.getTranslogConfig();
final String translogUUID = loadTranslogUUIDFromLastCommit();
final Map<String, String> userData = store.readLastCommittedSegmentsInfo().getUserData();
final String translogUUID = Objects.requireNonNull(userData.get(Translog.TRANSLOG_UUID_KEY));
// We expect that this shard already exists, so it must already have an existing translog else something is badly wrong!
return new Translog(translogConfig, translogUUID, translogDeletionPolicy, globalCheckpointSupplier,
engineConfig.getPrimaryTermSupplier(), persistedSequenceNumberConsumer);
@ -549,7 +551,7 @@ public class InternalEngine extends Engine {
ensureSoftDeletesEnabled();
return newChangesSnapshot(reason, mapperService, Math.max(0, startingSeqNo), Long.MAX_VALUE, false);
} else {
return getTranslog().newSnapshotFromMinSeqNo(startingSeqNo);
return getTranslog().newSnapshot(startingSeqNo, Long.MAX_VALUE);
}
}
@ -598,18 +600,6 @@ public class InternalEngine extends Engine {
return indexWriter.getFlushingBytes() + versionMap.getRefreshingBytes();
}
/**
* Reads the current stored translog ID from the last commit data.
*/
@Nullable
private String loadTranslogUUIDFromLastCommit() throws IOException {
final Map<String, String> commitUserData = store.readLastCommittedSegmentsInfo().getUserData();
if (commitUserData.containsKey(Translog.TRANSLOG_GENERATION_KEY) == false) {
throw new IllegalStateException("commit doesn't contain translog generation id");
}
return commitUserData.get(Translog.TRANSLOG_UUID_KEY);
}
/**
* Reads the current stored history ID from the IW commit data.
*/
@ -1688,8 +1678,9 @@ public class InternalEngine extends Engine {
ensureOpen();
ensureCanFlush();
String syncId = lastCommittedSegmentInfos.getUserData().get(SYNC_COMMIT_ID);
long translogGenOfLastCommit = Long.parseLong(lastCommittedSegmentInfos.userData.get(Translog.TRANSLOG_GENERATION_KEY));
if (syncId != null && indexWriter.hasUncommittedChanges() && translog.totalOperationsByMinGen(translogGenOfLastCommit) == 0) {
long localCheckpointOfLastCommit = Long.parseLong(lastCommittedSegmentInfos.userData.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY));
if (syncId != null && indexWriter.hasUncommittedChanges() &&
translog.estimateTotalOperationsFromMinSeq(localCheckpointOfLastCommit + 1) == 0) {
logger.trace("start renewing sync commit [{}]", syncId);
commitIndexWriter(indexWriter, translog, syncId);
logger.debug("successfully sync committed. sync id [{}].", syncId);
@ -1714,8 +1705,10 @@ public class InternalEngine extends Engine {
if (shouldPeriodicallyFlushAfterBigMerge.get()) {
return true;
}
final long localCheckpointOfLastCommit =
Long.parseLong(lastCommittedSegmentInfos.userData.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY));
final long translogGenerationOfLastCommit =
Long.parseLong(lastCommittedSegmentInfos.userData.get(Translog.TRANSLOG_GENERATION_KEY));
translog.getMinGenerationForSeqNo(localCheckpointOfLastCommit + 1).translogFileGeneration;
final long flushThreshold = config().getIndexSettings().getFlushThresholdSize().getBytes();
if (translog.sizeInBytesByMinGen(translogGenerationOfLastCommit) < flushThreshold) {
return false;
@ -2423,11 +2416,6 @@ public class InternalEngine extends Engine {
ensureCanFlush();
try {
final long localCheckpoint = localCheckpointTracker.getProcessedCheckpoint();
final Translog.TranslogGeneration translogGeneration = translog.getMinGenerationForSeqNo(localCheckpoint + 1);
final String translogFileGeneration = Long.toString(translogGeneration.translogFileGeneration);
final String translogUUID = translogGeneration.translogUUID;
final String localCheckpointValue = Long.toString(localCheckpoint);
writer.setLiveCommitData(() -> {
/*
* The user data captured above (e.g. local checkpoint) contains data that must be evaluated *before* Lucene flushes
@ -2438,10 +2426,9 @@ public class InternalEngine extends Engine {
* {@link IndexWriter#commit()} call flushes all documents, we defer computation of the maximum sequence number to the time
* of invocation of the commit data iterator (which occurs after all documents have been flushed to Lucene).
*/
final Map<String, String> commitData = new HashMap<>(8);
commitData.put(Translog.TRANSLOG_GENERATION_KEY, translogFileGeneration);
commitData.put(Translog.TRANSLOG_UUID_KEY, translogUUID);
commitData.put(SequenceNumbers.LOCAL_CHECKPOINT_KEY, localCheckpointValue);
final Map<String, String> commitData = new HashMap<>(7);
commitData.put(Translog.TRANSLOG_UUID_KEY, translog.getTranslogUUID());
commitData.put(SequenceNumbers.LOCAL_CHECKPOINT_KEY, Long.toString(localCheckpoint));
if (syncId != null) {
commitData.put(Engine.SYNC_COMMIT_ID, syncId);
}
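
The comment in the hunk above describes Lucene's late-binding commit data: the Iterable passed to setLiveCommitData is consumed only inside commit(), after all documents have been flushed. A minimal standalone sketch of that pattern in plain Lucene (illustrative keys and values, not the engine code):

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.lucene.analysis.standard.StandardAnalyzer;
    import org.apache.lucene.index.IndexWriter;
    import org.apache.lucene.index.IndexWriterConfig;
    import org.apache.lucene.store.ByteBuffersDirectory;

    public final class DeferredCommitDataSketch {
        public static void main(String[] args) throws Exception {
            try (ByteBuffersDirectory dir = new ByteBuffersDirectory();
                 IndexWriter writer = new IndexWriter(dir, new IndexWriterConfig(new StandardAnalyzer()))) {
                long localCheckpoint = 41; // captured up front, like the engine does
                writer.setLiveCommitData(() -> {
                    // evaluated during commit(), after documents have been flushed
                    Map<String, String> commitData = new HashMap<>();
                    commitData.put("translog_uuid", "illustrative-uuid");
                    commitData.put("local_checkpoint", Long.toString(localCheckpoint));
                    return commitData.entrySet().iterator();
                });
                writer.commit();
            }
        }
    }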
@ -2657,7 +2644,7 @@ public class InternalEngine extends Engine {
return true;
}
final LocalCheckpointTracker tracker = new LocalCheckpointTracker(startingSeqNo, startingSeqNo - 1);
try (Translog.Snapshot snapshot = getTranslog().newSnapshotFromMinSeqNo(startingSeqNo)) {
try (Translog.Snapshot snapshot = getTranslog().newSnapshot(startingSeqNo, Long.MAX_VALUE)) {
Translog.Operation operation;
while ((operation = snapshot.next()) != null) {
if (operation.seqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO) {


@ -28,6 +28,7 @@ import org.apache.lucene.index.SegmentReader;
import org.apache.lucene.store.Directory;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.util.concurrent.ReleasableLock;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.translog.TranslogConfig;
@ -137,31 +138,23 @@ public final class NoOpEngine extends ReadOnlyEngine {
try (ReleasableLock lock = readLock.acquire()) {
ensureOpen();
final List<IndexCommit> commits = DirectoryReader.listCommits(store.directory());
if (commits.size() == 1) {
if (commits.size() == 1 && translogStats.getTranslogSizeInBytes() > translogStats.getUncommittedSizeInBytes()) {
final Map<String, String> commitUserData = getLastCommittedSegmentInfos().getUserData();
final String translogUuid = commitUserData.get(Translog.TRANSLOG_UUID_KEY);
if (translogUuid == null) {
throw new IllegalStateException("commit doesn't contain translog unique id");
}
if (commitUserData.containsKey(Translog.TRANSLOG_GENERATION_KEY) == false) {
throw new IllegalStateException("commit doesn't contain translog generation id");
}
final long lastCommitGeneration = Long.parseLong(commitUserData.get(Translog.TRANSLOG_GENERATION_KEY));
final TranslogConfig translogConfig = engineConfig.getTranslogConfig();
final long minTranslogGeneration = Translog.readMinTranslogGeneration(translogConfig.getTranslogPath(), translogUuid);
if (minTranslogGeneration < lastCommitGeneration) {
// a translog deletion policy that retains nothing but the last translog generation from safe commit
final TranslogDeletionPolicy translogDeletionPolicy = new TranslogDeletionPolicy(-1, -1, 0);
translogDeletionPolicy.setTranslogGenerationOfLastCommit(lastCommitGeneration);
translogDeletionPolicy.setMinTranslogGenerationForRecovery(lastCommitGeneration);
try (Translog translog = new Translog(translogConfig, translogUuid, translogDeletionPolicy,
engineConfig.getGlobalCheckpointSupplier(), engineConfig.getPrimaryTermSupplier(), seqNo -> {})) {
translog.trimUnreferencedReaders();
// refresh the translog stats
this.translogStats = translog.stats();
}
final long localCheckpoint = Long.parseLong(commitUserData.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY));
final TranslogDeletionPolicy translogDeletionPolicy = new TranslogDeletionPolicy(-1, -1, 0);
translogDeletionPolicy.setLocalCheckpointOfSafeCommit(localCheckpoint);
try (Translog translog = new Translog(translogConfig, translogUuid, translogDeletionPolicy,
engineConfig.getGlobalCheckpointSupplier(), engineConfig.getPrimaryTermSupplier(), seqNo -> {})) {
translog.trimUnreferencedReaders();
// refresh the translog stats
this.translogStats = translog.stats();
assert translog.currentFileGeneration() == translog.getMinFileGeneration() : "translog was not trimmed "
+ " current gen " + translog.currentFileGeneration() + " != min gen " + translog.getMinFileGeneration();
}
}
} catch (final Exception e) {


@ -223,15 +223,14 @@ public class ReadOnlyEngine extends Engine {
if (translogUuid == null) {
throw new IllegalStateException("commit doesn't contain translog unique id");
}
final long translogGenOfLastCommit = Long.parseLong(infos.getUserData().get(Translog.TRANSLOG_GENERATION_KEY));
final TranslogConfig translogConfig = config.getTranslogConfig();
final TranslogDeletionPolicy translogDeletionPolicy = new TranslogDeletionPolicy(
config.getIndexSettings().getTranslogRetentionSize().getBytes(),
config.getIndexSettings().getTranslogRetentionAge().getMillis(),
config.getIndexSettings().getTranslogRetentionTotalFiles()
);
translogDeletionPolicy.setTranslogGenerationOfLastCommit(translogGenOfLastCommit);
final long localCheckpoint = Long.parseLong(infos.getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY));
translogDeletionPolicy.setLocalCheckpointOfSafeCommit(localCheckpoint);
try (Translog translog = new Translog(translogConfig, translogUuid, translogDeletionPolicy, config.getGlobalCheckpointSupplier(),
config.getPrimaryTermSupplier(), seqNo -> {})
) {


@ -1453,10 +1453,7 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
if (translogUUID.equals(getUserData(writer).get(Translog.TRANSLOG_UUID_KEY))) {
throw new IllegalArgumentException("a new translog uuid can't be equal to existing one. got [" + translogUUID + "]");
}
final Map<String, String> map = new HashMap<>();
map.put(Translog.TRANSLOG_GENERATION_KEY, "1");
map.put(Translog.TRANSLOG_UUID_KEY, translogUUID);
updateCommitData(writer, map);
updateCommitData(writer, Collections.singletonMap(Translog.TRANSLOG_UUID_KEY, translogUUID));
} finally {
metadataLock.writeLock().unlock();
}
@ -1517,7 +1514,8 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
if (indexVersionCreated.before(org.elasticsearch.Version.V_6_2_0)) {
final List<IndexCommit> recoverableCommits = new ArrayList<>();
for (IndexCommit commit : existingCommits) {
if (minRetainedTranslogGen <= Long.parseLong(commit.getUserData().get(Translog.TRANSLOG_GENERATION_KEY))) {
final String translogGeneration = commit.getUserData().get("translog_generation");
if (translogGeneration == null || minRetainedTranslogGen <= Long.parseLong(translogGeneration)) {
recoverableCommits.add(commit);
}
}


@ -73,9 +73,7 @@ import java.util.stream.Stream;
/**
* A Translog is a per index shard component that records all non-committed index operations in a durable manner.
* In Elasticsearch there is one Translog instance per {@link org.elasticsearch.index.engine.InternalEngine}. The engine
records the current translog generation {@link Translog#getGeneration()} in its commit metadata using {@link #TRANSLOG_GENERATION_KEY}
to reference the generation that contains all operations that have not yet successfully been committed to the engine's lucene index.
* In Elasticsearch there is one Translog instance per {@link org.elasticsearch.index.engine.InternalEngine}.
* Additionally, since Elasticsearch 2.0 the engine also records a {@link #TRANSLOG_UUID_KEY} with each commit to ensure a strong
association between the lucene index and the transaction log file. This UUID is used to prevent accidental recovery from a transaction
log that belongs to a different engine.
@ -106,7 +104,6 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
* - we need to page align the last write before we sync, we can take advantage of ensureSynced for this since we might have already
* fsynced far enough
*/
public static final String TRANSLOG_GENERATION_KEY = "translog_generation";
public static final String TRANSLOG_UUID_KEY = "translog_uuid";
public static final String TRANSLOG_FILE_PREFIX = "translog-";
public static final String TRANSLOG_FILE_SUFFIX = ".tlog";
@ -222,16 +219,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
ArrayList<TranslogReader> foundTranslogs = new ArrayList<>();
try (ReleasableLock ignored = writeLock.acquire()) {
logger.debug("open uncommitted translog checkpoint {}", checkpoint);
final long minGenerationToRecoverFrom;
if (checkpoint.minTranslogGeneration < 0) {
final Version indexVersionCreated = indexSettings().getIndexVersionCreated();
assert indexVersionCreated.before(Version.V_6_0_0_beta1) :
"no minTranslogGeneration in checkpoint, but index was created with version [" + indexVersionCreated + "]";
minGenerationToRecoverFrom = deletionPolicy.getMinTranslogGenerationForRecovery();
} else {
minGenerationToRecoverFrom = checkpoint.minTranslogGeneration;
}
final long minGenerationToRecoverFrom = checkpoint.minTranslogGeneration;
// we open files in reverse order in order to validate the translog uuid before we start traversing the translog based on
the generation id we found in the lucene commit. This allows for better error messages if the wrong translog was found.
@ -608,33 +596,28 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
}
}
/**
Snapshots the current transaction log, allowing safe iteration over the snapshot.
* Snapshots are fixed in time and will not be updated with future operations.
*/
// for testing
public Snapshot newSnapshot() throws IOException {
try (ReleasableLock ignored = readLock.acquire()) {
return newSnapshotFromGen(new TranslogGeneration(translogUUID, getMinFileGeneration()), Long.MAX_VALUE);
}
return newSnapshot(0, Long.MAX_VALUE);
}
public Snapshot newSnapshotFromGen(TranslogGeneration fromGeneration, long upToSeqNo) throws IOException {
/**
* Creates a new translog snapshot containing operations from the given range.
*
* @param fromSeqNo the lower bound of the range (inclusive)
* @param toSeqNo the upper bound of the range (inclusive)
* @return the new snapshot
*/
public Snapshot newSnapshot(long fromSeqNo, long toSeqNo) throws IOException {
assert fromSeqNo <= toSeqNo : fromSeqNo + " > " + toSeqNo;
assert fromSeqNo >= 0 : "from_seq_no must be non-negative " + fromSeqNo;
try (ReleasableLock ignored = readLock.acquire()) {
ensureOpen();
final long fromFileGen = fromGeneration.translogFileGeneration;
if (fromFileGen < getMinFileGeneration()) {
throw new IllegalArgumentException("requested snapshot generation [" + fromFileGen + "] is not available. " +
"Min referenced generation is [" + getMinFileGeneration() + "]");
}
TranslogSnapshot[] snapshots = Stream.concat(readers.stream(), Stream.of(current))
.filter(reader -> reader.getGeneration() >= fromFileGen && reader.getCheckpoint().minSeqNo <= upToSeqNo)
.filter(reader -> reader.getCheckpoint().minSeqNo <= toSeqNo && fromSeqNo <= reader.getCheckpoint().maxEffectiveSeqNo())
.map(BaseTranslogReader::newSnapshot).toArray(TranslogSnapshot[]::new);
final Snapshot snapshot = newMultiSnapshot(snapshots);
if (upToSeqNo == Long.MAX_VALUE) {
return snapshot;
} else {
return new SeqNoFilterSnapshot(snapshot, Long.MIN_VALUE, upToSeqNo);
}
return new SeqNoFilterSnapshot(snapshot, fromSeqNo, toSeqNo);
}
}
@ -668,15 +651,6 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
return null;
}
public Snapshot newSnapshotFromMinSeqNo(long minSeqNo) throws IOException {
try (ReleasableLock ignored = readLock.acquire()) {
ensureOpen();
TranslogSnapshot[] snapshots = readersAboveMinSeqNo(minSeqNo).map(BaseTranslogReader::newSnapshot)
.toArray(TranslogSnapshot[]::new);
return newMultiSnapshot(snapshots);
}
}
private Snapshot newMultiSnapshot(TranslogSnapshot[] snapshots) throws IOException {
final Closeable onClose;
if (snapshots.length == 0) {
@ -866,7 +840,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
public TranslogStats stats() {
// acquire lock to make the two numbers roughly consistent (no file change half way)
try (ReleasableLock lock = readLock.acquire()) {
final long uncommittedGen = deletionPolicy.getTranslogGenerationOfLastCommit();
long uncommittedGen = getMinGenerationForSeqNo(deletionPolicy.getLocalCheckpointOfSafeCommit() + 1).translogFileGeneration;
return new TranslogStats(totalOperations(), sizeInBytes(), totalOperationsByMinGen(uncommittedGen),
sizeInBytesByMinGen(uncommittedGen), earliestLastModifiedAge());
}
@ -966,7 +940,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
* shares the same underlying resources with the {@code delegate} snapshot, therefore we should not
* use the {@code delegate} after passing it to this filtered snapshot.
*/
static final class SeqNoFilterSnapshot implements Snapshot {
private static final class SeqNoFilterSnapshot implements Snapshot {
private final Snapshot delegate;
private int filteredOpsCount;
private final long fromSeqNo; // inclusive
@ -1626,22 +1600,20 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
*/
public TranslogGeneration getMinGenerationForSeqNo(final long seqNo) {
try (ReleasableLock ignored = readLock.acquire()) {
/*
* When flushing, the engine will ask the translog for the minimum generation that could contain any sequence number after the
* local checkpoint. Immediately after flushing, there will be no such generation, so this minimum generation in this case will
* be the current translog generation as we do not need any prior generations to have a complete history up to the current local
* checkpoint.
*/
long minTranslogFileGeneration = this.currentFileGeneration();
for (final TranslogReader reader : readers) {
if (seqNo <= reader.getCheckpoint().maxEffectiveSeqNo()) {
minTranslogFileGeneration = Math.min(minTranslogFileGeneration, reader.getGeneration());
}
}
return new TranslogGeneration(translogUUID, minTranslogFileGeneration);
return new TranslogGeneration(translogUUID, minGenerationForSeqNo(seqNo, current, readers));
}
}
private static long minGenerationForSeqNo(long seqNo, TranslogWriter writer, List<TranslogReader> readers) {
long minGen = writer.generation;
for (final TranslogReader reader : readers) {
if (seqNo <= reader.getCheckpoint().maxEffectiveSeqNo()) {
minGen = Math.min(minGen, reader.getGeneration());
}
}
return minGen;
}
/**
* Roll the current translog generation into a new generation. This does not commit the
* translog.
@ -1681,7 +1653,8 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
// we're shutdown potentially on some tragic event, don't delete anything
return;
}
long minReferencedGen = deletionPolicy.minTranslogGenRequired(readers, current);
long minReferencedGen = Math.min(deletionPolicy.minTranslogGenRequired(readers, current),
minGenerationForSeqNo(deletionPolicy.getLocalCheckpointOfSafeCommit() + 1, current, readers));
assert minReferencedGen >= getMinFileGeneration() :
"deletion policy requires a minReferenceGen of [" + minReferencedGen + "] but the lowest gen available is ["
+ getMinFileGeneration() + "]";
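
The newSnapshot(fromSeqNo, toSeqNo) method introduced above selects whole translog files by seq-no overlap and then filters individual operations through SeqNoFilterSnapshot. A minimal standalone sketch of the file-selection step (illustrative names, not the engine's classes):

    import java.util.List;

    public final class SnapshotRangeSketch {

        // A translog file and the seq-no range of the operations it contains.
        record TranslogFile(long generation, long minSeqNo, long maxEffectiveSeqNo) {}

        // Keep a file only if its seq-no range overlaps [fromSeqNo, toSeqNo];
        // per-operation filtering happens later, as SeqNoFilterSnapshot does above.
        static List<TranslogFile> select(List<TranslogFile> files, long fromSeqNo, long toSeqNo) {
            return files.stream()
                .filter(f -> f.minSeqNo() <= toSeqNo && fromSeqNo <= f.maxEffectiveSeqNo())
                .toList();
        }

        public static void main(String[] args) {
            List<TranslogFile> files = List.of(
                new TranslogFile(1, 0, 20), new TranslogFile(2, 21, 41), new TranslogFile(3, 42, 57));
            // Recovering above a local checkpoint of 41 touches only generation 3.
            System.out.println(select(files, 42, Long.MAX_VALUE));
        }
    }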


@ -22,6 +22,7 @@ package org.elasticsearch.index.translog;
import org.apache.lucene.util.Counter;
import org.elasticsearch.Assertions;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.index.seqno.SequenceNumbers;
import java.io.IOException;
import java.util.HashMap;
@ -47,17 +48,7 @@ public class TranslogDeletionPolicy {
* translog generation
*/
private final Map<Long, Counter> translogRefCounts = new HashMap<>();
/**
the translog generation that is required to properly recover from the oldest non-deleted
* {@link org.apache.lucene.index.IndexCommit}.
*/
private long minTranslogGenerationForRecovery = 1;
/**
* This translog generation is used to calculate the number of uncommitted operations since the last index commit.
*/
private long translogGenerationOfLastCommit = 1;
private long localCheckpointOfSafeCommit = SequenceNumbers.NO_OPS_PERFORMED;
private long retentionSizeInBytes;
@ -76,23 +67,12 @@ public class TranslogDeletionPolicy {
}
}
public synchronized void setMinTranslogGenerationForRecovery(long newGen) {
if (newGen < minTranslogGenerationForRecovery || newGen > translogGenerationOfLastCommit) {
throw new IllegalArgumentException("Invalid minTranslogGenerationForRecovery can't go backwards; new [" + newGen + "]," +
"current [" + minTranslogGenerationForRecovery + "], lastGen [" + translogGenerationOfLastCommit + "]");
public synchronized void setLocalCheckpointOfSafeCommit(long newCheckpoint) {
if (newCheckpoint < this.localCheckpointOfSafeCommit) {
throw new IllegalArgumentException("local checkpoint of the safe commit can't go backwards: " +
"current [" + this.localCheckpointOfSafeCommit + "] new [" + newCheckpoint + "]");
}
minTranslogGenerationForRecovery = newGen;
}
/**
* Sets the translog generation of the last index commit.
*/
public synchronized void setTranslogGenerationOfLastCommit(long lastGen) {
if (lastGen < translogGenerationOfLastCommit || lastGen < minTranslogGenerationForRecovery) {
throw new IllegalArgumentException("Invalid translogGenerationOfLastCommit; new [" + lastGen + "]," +
"current [" + translogGenerationOfLastCommit + "], minRequiredGen [" + minTranslogGenerationForRecovery + "]");
}
translogGenerationOfLastCommit = lastGen;
this.localCheckpointOfSafeCommit = newCheckpoint;
}
public synchronized void setRetentionSizeInBytes(long bytes) {
@ -172,7 +152,7 @@ public class TranslogDeletionPolicy {
minByAgeAndSize = Math.max(minByAge, minBySize);
}
long minByNumFiles = getMinTranslogGenByTotalFiles(readers, writer, retentionTotalFiles);
return Math.min(Math.max(minByAgeAndSize, minByNumFiles), Math.min(minByLocks, minTranslogGenerationForRecovery));
return Math.min(Math.max(minByAgeAndSize, minByNumFiles), minByLocks);
}
static long getMinTranslogGenBySize(List<TranslogReader> readers, TranslogWriter writer, long retentionSizeInBytes) {
@ -222,16 +202,11 @@ public class TranslogDeletionPolicy {
return translogRefCounts.keySet().stream().reduce(Math::min).orElse(Long.MAX_VALUE);
}
/** returns the translog generation that will be used as a basis of a future store/peer recovery */
public synchronized long getMinTranslogGenerationForRecovery() {
return minTranslogGenerationForRecovery;
}
/**
* Returns a translog generation that will be used to calculate the number of uncommitted operations since the last index commit.
* Returns the local checkpoint of the safe commit. This value is used to calculate the min required generation for recovery.
*/
public synchronized long getTranslogGenerationOfLastCommit() {
return translogGenerationOfLastCommit;
public synchronized long getLocalCheckpointOfSafeCommit() {
return localCheckpointOfSafeCommit;
}
synchronized long getTranslogRefCount(long gen) {


@ -129,30 +129,25 @@ public class TruncateTranslogAction {
// Retrieve the generation and UUID from the existing data
commitData = commits.get(commits.size() - 1).getUserData();
final String translogGeneration = commitData.get(Translog.TRANSLOG_GENERATION_KEY);
final String translogUUID = commitData.get(Translog.TRANSLOG_UUID_KEY);
if (translogGeneration == null || translogUUID == null) {
throw new ElasticsearchException("shard must have a valid translog generation and UUID but got: [{}] and: [{}]",
translogGeneration, translogUUID);
if (translogUUID == null) {
throw new ElasticsearchException("shard must have a valid translog UUID");
}
final long globalCheckpoint = commitData.containsKey(SequenceNumbers.MAX_SEQ_NO)
? Long.parseLong(commitData.get(SequenceNumbers.MAX_SEQ_NO))
: SequenceNumbers.UNASSIGNED_SEQ_NO;
terminal.println("Translog Generation: " + translogGeneration);
terminal.println("Translog UUID : " + translogUUID);
terminal.println("History UUID : " + historyUUID);
Path tempEmptyCheckpoint = translogPath.resolve("temp-" + Translog.CHECKPOINT_FILE_NAME);
Path realEmptyCheckpoint = translogPath.resolve(Translog.CHECKPOINT_FILE_NAME);
Path tempEmptyTranslog = translogPath.resolve("temp-" + Translog.TRANSLOG_FILE_PREFIX +
translogGeneration + Translog.TRANSLOG_FILE_SUFFIX);
Path realEmptyTranslog = translogPath.resolve(Translog.TRANSLOG_FILE_PREFIX +
translogGeneration + Translog.TRANSLOG_FILE_SUFFIX);
final long gen = 1;
Path tempEmptyTranslog = translogPath.resolve("temp-" + Translog.TRANSLOG_FILE_PREFIX + gen + Translog.TRANSLOG_FILE_SUFFIX);
Path realEmptyTranslog = translogPath.resolve(Translog.TRANSLOG_FILE_PREFIX + gen + Translog.TRANSLOG_FILE_SUFFIX);
// Write empty checkpoint and translog to empty files
long gen = Long.parseLong(translogGeneration);
int translogLen = writeEmptyTranslog(tempEmptyTranslog, translogUUID);
writeEmptyCheckpoint(tempEmptyCheckpoint, translogLen, gen, globalCheckpoint);
@ -192,7 +187,7 @@ public class TruncateTranslogAction {
};
try (Translog translog = new Translog(translogConfig, translogUUID,
retainAllTranslogPolicy, () -> translogGlobalCheckpoint, () -> primaryTerm, seqNo -> {});
Translog.Snapshot snapshot = translog.newSnapshot()) {
Translog.Snapshot snapshot = translog.newSnapshot(0, Long.MAX_VALUE)) {
//noinspection StatementWithEmptyBody we are just checking that we can iterate through the whole snapshot
while (snapshot.next() != null) {
}


@ -123,7 +123,6 @@ public class IndicesStatsTests extends ESSingleNodeTestCase {
assertNotNull(commitStats);
assertThat(commitStats.getGeneration(), greaterThan(0L));
assertThat(commitStats.getId(), notNullValue());
assertThat(commitStats.getUserData(), hasKey(Translog.TRANSLOG_GENERATION_KEY));
assertThat(commitStats.getUserData(), hasKey(Translog.TRANSLOG_UUID_KEY));
}
}


@ -43,10 +43,8 @@ import org.elasticsearch.test.InternalSettingsPlugin;
import org.elasticsearch.threadpool.ThreadPool;
import java.io.IOException;
import java.nio.file.Path;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
@ -393,8 +391,6 @@ public class IndexServiceTests extends ESSingleNodeTestCase {
.build());
Translog translog = IndexShardTestCase.getTranslog(indexService.getShard(0));
final Path translogPath = translog.getConfig().getTranslogPath();
final String translogUuid = translog.getTranslogUUID();
int translogOps = 0;
final int numDocs = scaledRandomIntBetween(10, 100);
@ -415,15 +411,9 @@ public class IndexServiceTests extends ESSingleNodeTestCase {
indexService = getInstanceFromNode(IndicesService.class).indexServiceSafe(indexService.index());
assertTrue(indexService.getTrimTranslogTask().mustReschedule());
final long lastCommitedTranslogGeneration;
try (Engine.IndexCommitRef indexCommitRef = getEngine(indexService.getShard(0)).acquireLastIndexCommit(false)) {
Map<String, String> lastCommittedUserData = indexCommitRef.getIndexCommit().getUserData();
lastCommitedTranslogGeneration = Long.parseLong(lastCommittedUserData.get(Translog.TRANSLOG_GENERATION_KEY));
}
assertBusy(() -> {
long minTranslogGen = Translog.readMinTranslogGeneration(translogPath, translogUuid);
assertThat(minTranslogGen, equalTo(lastCommitedTranslogGeneration));
});
final Engine readOnlyEngine = getEngine(indexService.getShard(0));
assertBusy(() ->
assertThat(readOnlyEngine.getTranslogStats().getTranslogSizeInBytes(), equalTo((long) Translog.DEFAULT_HEADER_SIZE_IN_BYTES)));
assertAcked(client().admin().indices().prepareOpen("test").setWaitForActiveShards(ActiveShardCount.DEFAULT));


@ -58,20 +58,16 @@ public class CombinedDeletionPolicyTests extends ESTestCase {
CombinedDeletionPolicy indexPolicy = newCombinedDeletionPolicy(translogPolicy, softDeletesPolicy, globalCheckpoint);
final LongArrayList maxSeqNoList = new LongArrayList();
final LongArrayList translogGenList = new LongArrayList();
final List<IndexCommit> commitList = new ArrayList<>();
int totalCommits = between(2, 20);
long lastMaxSeqNo = 0;
long lastCheckpoint = lastMaxSeqNo;
long lastTranslogGen = 0;
final UUID translogUUID = UUID.randomUUID();
for (int i = 0; i < totalCommits; i++) {
lastMaxSeqNo += between(1, 10000);
lastCheckpoint = randomLongBetween(lastCheckpoint, lastMaxSeqNo);
lastTranslogGen += between(1, 100);
commitList.add(mockIndexCommit(lastCheckpoint, lastMaxSeqNo, translogUUID, lastTranslogGen));
commitList.add(mockIndexCommit(lastCheckpoint, lastMaxSeqNo, translogUUID));
maxSeqNoList.add(lastMaxSeqNo);
translogGenList.add(lastTranslogGen);
}
int keptIndex = randomInt(commitList.size() - 1);
@ -88,8 +84,7 @@ public class CombinedDeletionPolicyTests extends ESTestCase {
verify(commitList.get(i), never()).delete();
}
}
assertThat(translogPolicy.getMinTranslogGenerationForRecovery(), equalTo(translogGenList.get(keptIndex)));
assertThat(translogPolicy.getTranslogGenerationOfLastCommit(), equalTo(lastTranslogGen));
assertThat(translogPolicy.getLocalCheckpointOfSafeCommit(), equalTo(getLocalCheckpoint(commitList.get(keptIndex))));
assertThat(softDeletesPolicy.getMinRetainedSeqNo(),
equalTo(Math.max(NO_OPS_PERFORMED,
Math.min(getLocalCheckpoint(commitList.get(keptIndex)) + 1, globalCheckpoint.get() + 1 - extraRetainedOps))));
@ -105,7 +100,6 @@ public class CombinedDeletionPolicyTests extends ESTestCase {
CombinedDeletionPolicy indexPolicy = newCombinedDeletionPolicy(translogPolicy, softDeletesPolicy, globalCheckpoint);
long lastMaxSeqNo = between(1, 1000);
long lastCheckpoint = randomLongBetween(-1, lastMaxSeqNo);
long lastTranslogGen = between(1, 20);
int safeIndex = 0;
List<IndexCommit> commitList = new ArrayList<>();
List<IndexCommit> snapshottingCommits = new ArrayList<>();
@ -115,8 +109,7 @@ public class CombinedDeletionPolicyTests extends ESTestCase {
for (int n = 0; n < newCommits; n++) {
lastMaxSeqNo += between(1, 1000);
lastCheckpoint = randomLongBetween(lastCheckpoint, lastMaxSeqNo);
lastTranslogGen += between(1, 20);
commitList.add(mockIndexCommit(lastCheckpoint, lastMaxSeqNo, translogUUID, lastTranslogGen));
commitList.add(mockIndexCommit(lastCheckpoint, lastMaxSeqNo, translogUUID));
}
// Advance the global checkpoint to between [safeIndex, safeIndex + 1)
safeIndex = randomIntBetween(safeIndex, commitList.size() - 1);
@ -155,10 +148,7 @@ public class CombinedDeletionPolicyTests extends ESTestCase {
// Snapshotting commits must not be deleted.
snapshottingCommits.forEach(snapshot -> assertThat(snapshot.isDeleted(), equalTo(false)));
// We don't need to retain translog for snapshotting commits.
assertThat(translogPolicy.getMinTranslogGenerationForRecovery(),
equalTo(Long.parseLong(commitList.get(safeIndex).getUserData().get(Translog.TRANSLOG_GENERATION_KEY))));
assertThat(translogPolicy.getTranslogGenerationOfLastCommit(),
equalTo(Long.parseLong(commitList.get(commitList.size() - 1).getUserData().get(Translog.TRANSLOG_GENERATION_KEY))));
assertThat(translogPolicy.getLocalCheckpointOfSafeCommit(), equalTo(getLocalCheckpoint(commitList.get(safeIndex))));
assertThat(softDeletesPolicy.getMinRetainedSeqNo(), equalTo(
Math.max(NO_OPS_PERFORMED,
Math.min(getLocalCheckpoint(commitList.get(safeIndex)) + 1, globalCheckpoint.get() + 1 - extraRetainedOps))));
@ -171,8 +161,7 @@ public class CombinedDeletionPolicyTests extends ESTestCase {
assertThat(commitList.get(i).isDeleted(), equalTo(true));
}
assertThat(commitList.get(commitList.size() - 1).isDeleted(), equalTo(false));
assertThat(translogPolicy.getMinTranslogGenerationForRecovery(), equalTo(lastTranslogGen));
assertThat(translogPolicy.getTranslogGenerationOfLastCommit(), equalTo(lastTranslogGen));
assertThat(translogPolicy.getLocalCheckpointOfSafeCommit(), equalTo(getLocalCheckpoint(commitList.get(commitList.size() - 1))));
IndexCommit safeCommit = CombinedDeletionPolicy.findSafeCommitPoint(commitList, globalCheckpoint.get());
assertThat(softDeletesPolicy.getMinRetainedSeqNo(), equalTo(
Math.max(NO_OPS_PERFORMED, Math.min(getLocalCheckpoint(safeCommit) + 1, globalCheckpoint.get() + 1 - extraRetainedOps))));
@ -188,19 +177,17 @@ public class CombinedDeletionPolicyTests extends ESTestCase {
final List<IndexCommit> commitList = new ArrayList<>();
for (int i = 0; i < invalidCommits; i++) {
long maxSeqNo = randomNonNegativeLong();
commitList.add(mockIndexCommit(randomLongBetween(-1, maxSeqNo), maxSeqNo, UUID.randomUUID(), randomNonNegativeLong()));
commitList.add(mockIndexCommit(randomLongBetween(-1, maxSeqNo), maxSeqNo, UUID.randomUUID()));
}
final UUID expectedTranslogUUID = UUID.randomUUID();
long lastTranslogGen = 0;
final int validCommits = between(1, 10);
long lastMaxSeqNo = between(1, 1000);
long lastCheckpoint = randomLongBetween(-1, lastMaxSeqNo);
for (int i = 0; i < validCommits; i++) {
lastTranslogGen += between(1, 1000);
lastMaxSeqNo += between(1, 1000);
lastCheckpoint = randomLongBetween(lastCheckpoint, lastMaxSeqNo);
commitList.add(mockIndexCommit(lastCheckpoint, lastMaxSeqNo, expectedTranslogUUID, lastTranslogGen));
commitList.add(mockIndexCommit(lastCheckpoint, lastMaxSeqNo, expectedTranslogUUID));
}
// We should never keep invalid commits regardless of the value of the global checkpoint.
@ -222,12 +209,10 @@ public class CombinedDeletionPolicyTests extends ESTestCase {
int totalCommits = between(2, 20);
long lastMaxSeqNo = between(1, 1000);
long lastCheckpoint = randomLongBetween(-1, lastMaxSeqNo);
long lastTranslogGen = between(1, 50);
for (int i = 0; i < totalCommits; i++) {
lastMaxSeqNo += between(1, 10000);
lastTranslogGen += between(1, 100);
lastCheckpoint = randomLongBetween(lastCheckpoint, lastMaxSeqNo);
commitList.add(mockIndexCommit(lastCheckpoint, lastMaxSeqNo, translogUUID, lastTranslogGen));
commitList.add(mockIndexCommit(lastCheckpoint, lastMaxSeqNo, translogUUID));
}
int safeCommitIndex = randomIntBetween(0, commitList.size() - 1);
globalCheckpoint.set(Long.parseLong(commitList.get(safeCommitIndex).getUserData().get(SequenceNumbers.MAX_SEQ_NO)));
@ -236,8 +221,7 @@ public class CombinedDeletionPolicyTests extends ESTestCase {
if (safeCommitIndex == commitList.size() - 1) {
// Safe commit is the last commit - no need to clean up
assertThat(translogPolicy.getMinTranslogGenerationForRecovery(), equalTo(lastTranslogGen));
assertThat(translogPolicy.getTranslogGenerationOfLastCommit(), equalTo(lastTranslogGen));
assertThat(translogPolicy.getLocalCheckpointOfSafeCommit(), equalTo(getLocalCheckpoint(commitList.get(commitList.size() - 1))));
assertThat(indexPolicy.hasUnreferencedCommits(), equalTo(false));
} else {
// Advanced but not enough for any commit after the safe commit becomes safe
@ -254,8 +238,7 @@ public class CombinedDeletionPolicyTests extends ESTestCase {
commitList.forEach(this::resetDeletion);
indexPolicy.onCommit(commitList);
// Safe commit is the last commit - no need to clean up
assertThat(translogPolicy.getMinTranslogGenerationForRecovery(), equalTo(lastTranslogGen));
assertThat(translogPolicy.getTranslogGenerationOfLastCommit(), equalTo(lastTranslogGen));
assertThat(translogPolicy.getLocalCheckpointOfSafeCommit(), equalTo(getLocalCheckpoint(commitList.get(commitList.size() - 1))));
assertThat(indexPolicy.hasUnreferencedCommits(), equalTo(false));
}
}
@ -271,12 +254,11 @@ public class CombinedDeletionPolicyTests extends ESTestCase {
};
}
IndexCommit mockIndexCommit(long localCheckpoint, long maxSeqNo, UUID translogUUID, long translogGen) throws IOException {
IndexCommit mockIndexCommit(long localCheckpoint, long maxSeqNo, UUID translogUUID) throws IOException {
final Map<String, String> userData = new HashMap<>();
userData.put(SequenceNumbers.LOCAL_CHECKPOINT_KEY, Long.toString(localCheckpoint));
userData.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(maxSeqNo));
userData.put(Translog.TRANSLOG_UUID_KEY, translogUUID.toString());
userData.put(Translog.TRANSLOG_GENERATION_KEY, Long.toString(translogGen));
final IndexCommit commit = mock(IndexCommit.class);
final Directory directory = mock(Directory.class);
when(commit.getUserData()).thenReturn(userData);


@ -98,6 +98,7 @@ import org.elasticsearch.common.lucene.uid.Versions;
import org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver;
import org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver.DocIdAndSeqNo;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
@ -176,6 +177,7 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.IntSupplier;
import java.util.function.LongSupplier;
import java.util.function.Supplier;
import java.util.function.ToLongBiFunction;
@ -697,7 +699,6 @@ public class InternalEngineTests extends EngineTestCase {
CommitStats stats1 = engine.commitStats();
assertThat(stats1.getGeneration(), greaterThan(0L));
assertThat(stats1.getId(), notNullValue());
assertThat(stats1.getUserData(), hasKey(Translog.TRANSLOG_GENERATION_KEY));
assertThat(stats1.getUserData(), hasKey(SequenceNumbers.LOCAL_CHECKPOINT_KEY));
assertThat(
Long.parseLong(stats1.getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)),
@ -722,11 +723,7 @@ public class InternalEngineTests extends EngineTestCase {
assertThat(stats2.getGeneration(), greaterThan(stats1.getGeneration()));
assertThat(stats2.getId(), notNullValue());
assertThat(stats2.getId(), not(equalTo(stats1.getId())));
assertThat(stats2.getUserData(), hasKey(Translog.TRANSLOG_GENERATION_KEY));
assertThat(stats2.getUserData(), hasKey(Translog.TRANSLOG_UUID_KEY));
assertThat(
stats2.getUserData().get(Translog.TRANSLOG_GENERATION_KEY),
not(equalTo(stats1.getUserData().get(Translog.TRANSLOG_GENERATION_KEY))));
assertThat(stats2.getUserData().get(Translog.TRANSLOG_UUID_KEY),
equalTo(stats1.getUserData().get(Translog.TRANSLOG_UUID_KEY)));
assertThat(Long.parseLong(stats2.getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)), equalTo(localCheckpoint.get()));
@ -1157,6 +1154,7 @@ public class InternalEngineTests extends EngineTestCase {
final LongSupplier globalCheckpointSupplier = () -> globalCheckpoint.get();
engine = createEngine(config(defaultSettings, store, translogPath, newMergePolicy(), null, null,
globalCheckpointSupplier));
engine.onSettingsChanged(TimeValue.MINUS_ONE, ByteSizeValue.ZERO, randomNonNegativeLong());
ParsedDocument doc = testParsedDocument("1", null, testDocumentWithTextField(), B_1, null);
engine.index(indexForDoc(doc));
boolean inSync = randomBoolean();
@ -1167,24 +1165,20 @@ public class InternalEngineTests extends EngineTestCase {
engine.flush();
assertThat(engine.getTranslog().currentFileGeneration(), equalTo(3L));
assertThat(engine.getTranslog().getDeletionPolicy().getMinTranslogGenerationForRecovery(), equalTo(inSync ? 3L : 1L));
assertThat(engine.getTranslog().getDeletionPolicy().getTranslogGenerationOfLastCommit(), equalTo(3L));
assertThat(engine.getTranslog().getMinFileGeneration(), equalTo(inSync ? 3L : 2L));
engine.flush();
assertThat(engine.getTranslog().currentFileGeneration(), equalTo(3L));
assertThat(engine.getTranslog().getDeletionPolicy().getMinTranslogGenerationForRecovery(), equalTo(inSync ? 3L : 1L));
assertThat(engine.getTranslog().getDeletionPolicy().getTranslogGenerationOfLastCommit(), equalTo(3L));
assertThat(engine.getTranslog().getMinFileGeneration(), equalTo(inSync ? 3L : 2L));
engine.flush(true, true);
assertThat(engine.getTranslog().currentFileGeneration(), equalTo(4L));
assertThat(engine.getTranslog().getDeletionPolicy().getMinTranslogGenerationForRecovery(), equalTo(inSync ? 4L : 1L));
assertThat(engine.getTranslog().getDeletionPolicy().getTranslogGenerationOfLastCommit(), equalTo(4L));
assertThat(engine.getTranslog().getMinFileGeneration(), equalTo(inSync ? 4L : 2L));
globalCheckpoint.set(engine.getPersistedLocalCheckpoint());
engine.flush(true, true);
assertThat(engine.getTranslog().currentFileGeneration(), equalTo(5L));
assertThat(engine.getTranslog().getDeletionPolicy().getMinTranslogGenerationForRecovery(), equalTo(5L));
assertThat(engine.getTranslog().getDeletionPolicy().getTranslogGenerationOfLastCommit(), equalTo(5L));
assertThat(engine.getTranslog().getMinFileGeneration(), equalTo(5L));
}
public void testSyncedFlush() throws IOException {
@ -2817,7 +2811,7 @@ public class InternalEngineTests extends EngineTestCase {
assertEquals(currentIndexWriterConfig.getCodec().getName(), codecService.codec(codecName).getName());
}
public void testCurrentTranslogIDisCommitted() throws IOException {
public void testCurrentTranslogUUIDIsCommitted() throws IOException {
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
try (Store store = createStore()) {
EngineConfig config = config(defaultSettings, store, createTempDir(), newMergePolicy(), null, null,
@ -2842,7 +2836,6 @@ public class InternalEngineTests extends EngineTestCase {
globalCheckpoint.set(engine.getPersistedLocalCheckpoint());
expectThrows(IllegalStateException.class, () -> engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE));
Map<String, String> userData = engine.getLastCommittedSegmentInfos().getUserData();
assertEquals("1", userData.get(Translog.TRANSLOG_GENERATION_KEY));
assertEquals(engine.getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY));
}
}
@ -2852,18 +2845,9 @@ public class InternalEngineTests extends EngineTestCase {
try (InternalEngine engine = new InternalEngine(config)) {
expectThrows(IllegalStateException.class, engine::ensureCanFlush);
Map<String, String> userData = engine.getLastCommittedSegmentInfos().getUserData();
if (i == 0) {
assertEquals("1", userData.get(Translog.TRANSLOG_GENERATION_KEY));
} else {
// creating an empty index will create the first translog gen and commit it
// opening the empty index will make the second translog file but not commit it
// opening the engine again (i=0) will make the third translog file, which will then be committed
assertEquals("3", userData.get(Translog.TRANSLOG_GENERATION_KEY));
}
assertEquals(engine.getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY));
engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
userData = engine.getLastCommittedSegmentInfos().getUserData();
assertEquals("3", userData.get(Translog.TRANSLOG_GENERATION_KEY));
assertEquals(engine.getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY));
}
}
@ -2876,7 +2860,6 @@ public class InternalEngineTests extends EngineTestCase {
store.associateIndexWithNewTranslog(translogUUID);
try (InternalEngine engine = new InternalEngine(config)) {
Map<String, String> userData = engine.getLastCommittedSegmentInfos().getUserData();
assertEquals("1", userData.get(Translog.TRANSLOG_GENERATION_KEY));
assertEquals(engine.getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY));
engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
assertEquals(2, engine.getTranslog().currentFileGeneration());
@ -2889,12 +2872,9 @@ public class InternalEngineTests extends EngineTestCase {
for (int i = 0; i < 2; i++) {
try (InternalEngine engine = new InternalEngine(config)) {
Map<String, String> userData = engine.getLastCommittedSegmentInfos().getUserData();
assertEquals("1", userData.get(Translog.TRANSLOG_GENERATION_KEY));
assertEquals(engine.getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY));
engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
userData = engine.getLastCommittedSegmentInfos().getUserData();
assertEquals("no changes - nothing to commit", "1",
userData.get(Translog.TRANSLOG_GENERATION_KEY));
assertEquals(engine.getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY));
}
}
@ -3010,8 +2990,9 @@ public class InternalEngineTests extends EngineTestCase {
globalCheckpointSupplier))) {
engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
assertVisibleCount(engine, 1);
final long committedGen = Long.valueOf(
engine.getLastCommittedSegmentInfos().getUserData().get(Translog.TRANSLOG_GENERATION_KEY));
final long localCheckpoint = Long.parseLong(
engine.getLastCommittedSegmentInfos().userData.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY));
final long committedGen = engine.getTranslog().getMinGenerationForSeqNo(localCheckpoint + 1).translogFileGeneration;
for (int gen = 1; gen < committedGen; gen++) {
final Path genFile = translogPath.resolve(Translog.getFilename(gen));
assertFalse(genFile + " wasn't cleaned up", Files.exists(genFile));
@ -4518,7 +4499,6 @@ public class InternalEngineTests extends EngineTestCase {
for (final Map.Entry<Thread, CountDownLatch> entry : threads.entrySet()) {
final Map<String, String> userData = finalActualEngine.commitStats().getUserData();
assertThat(userData.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY), equalTo(Long.toString(3 * i)));
assertThat(userData.get(Translog.TRANSLOG_GENERATION_KEY), equalTo(Long.toString(i + generation)));
entry.getValue().countDown();
entry.getKey().join();
finalActualEngine.flush();
@ -4579,6 +4559,7 @@ public class InternalEngineTests extends EngineTestCase {
final EngineConfig engineConfig;
final SeqNoStats prevSeqNoStats;
final List<DocIdSeqNoAndSource> prevDocs;
final List<Translog.Operation> existingTranslog;
try (InternalEngine engine = createEngine(store, createTempDir(), globalCheckpoint::get)) {
engineConfig = engine.config();
for (final long seqNo : seqNos) {
@ -4597,6 +4578,9 @@ public class InternalEngineTests extends EngineTestCase {
engine.syncTranslog();
prevSeqNoStats = engine.getSeqNoStats(globalCheckpoint.get());
prevDocs = getDocIds(engine, true);
try (Translog.Snapshot snapshot = engine.getTranslog().newSnapshot()) {
existingTranslog = TestTranslog.drainSnapshot(snapshot, false);
}
}
try (InternalEngine engine = new InternalEngine(engineConfig)) {
final Translog.TranslogGeneration currrentTranslogGeneration = new Translog.TranslogGeneration(
@ -4607,8 +4591,10 @@ public class InternalEngineTests extends EngineTestCase {
SeqNoStats seqNoStats = engine.getSeqNoStats(globalCheckpoint.get());
assertThat(seqNoStats.getLocalCheckpoint(), equalTo(prevSeqNoStats.getLocalCheckpoint()));
assertThat(seqNoStats.getMaxSeqNo(), equalTo(prevSeqNoStats.getMaxSeqNo()));
try (Translog.Snapshot snapshot = engine.getTranslog().newSnapshotFromGen(currrentTranslogGeneration, Long.MAX_VALUE)) {
assertThat("restore from local translog must not add operations to translog", snapshot, SnapshotMatchers.size(0));
try (Translog.Snapshot snapshot = engine.getTranslog().newSnapshot()) {
assertThat("restore from local translog must not add operations to translog",
snapshot.totalOperations(), equalTo(existingTranslog.size()));
assertThat(TestTranslog.drainSnapshot(snapshot, false), equalTo(existingTranslog));
}
}
assertConsistentHistoryBetweenTranslogAndLuceneIndex(engine, createMapperService("test"));
@ -5077,6 +5063,10 @@ public class InternalEngineTests extends EngineTestCase {
assertThat("Empty engine does not need flushing", engine.shouldPeriodicallyFlush(), equalTo(false));
// A new engine may have more than one empty translog file - the test should account for this extra.
final Translog translog = engine.getTranslog();
final IntSupplier uncommittedTranslogOperationsSinceLastCommit = () -> {
long localCheckpoint = Long.parseLong(engine.getLastCommittedSegmentInfos().userData.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY));
return translog.totalOperationsByMinGen(translog.getMinGenerationForSeqNo(localCheckpoint + 1).translogFileGeneration);
};
final long extraTranslogSizeInNewEngine =
engine.getTranslog().stats().getUncommittedSizeInBytes() - Translog.DEFAULT_HEADER_SIZE_IN_BYTES;
int numDocs = between(10, 100);
@ -5098,7 +5088,7 @@ public class InternalEngineTests extends EngineTestCase {
assertThat(engine.getTranslog().stats().getUncommittedOperations(), equalTo(numDocs));
assertThat(engine.shouldPeriodicallyFlush(), equalTo(true));
engine.flush();
assertThat(engine.getTranslog().stats().getUncommittedOperations(), equalTo(0));
assertThat(uncommittedTranslogOperationsSinceLastCommit.getAsInt(), equalTo(0));
// Stale operations skipped by Lucene but added to translog - still able to flush
for (int id = 0; id < numDocs; id++) {
final ParsedDocument doc =
@ -5107,11 +5097,11 @@ public class InternalEngineTests extends EngineTestCase {
assertThat(result.isCreated(), equalTo(false));
}
SegmentInfos lastCommitInfo = engine.getLastCommittedSegmentInfos();
assertThat(engine.getTranslog().stats().getUncommittedOperations(), equalTo(numDocs));
assertThat(uncommittedTranslogOperationsSinceLastCommit.getAsInt(), equalTo(numDocs));
assertThat(engine.shouldPeriodicallyFlush(), equalTo(true));
engine.flush(false, false);
assertThat(engine.getLastCommittedSegmentInfos(), not(sameInstance(lastCommitInfo)));
assertThat(engine.getTranslog().stats().getUncommittedOperations(), equalTo(0));
assertThat(uncommittedTranslogOperationsSinceLastCommit.getAsInt(), equalTo(0));
// If the new index commit still points to the same translog generation as the current index commit,
// we should not enable the periodically flush condition; otherwise we can get into an infinite loop of flushes.
generateNewSeqNo(engine); // create a gap here
@ -6039,11 +6029,21 @@ public class InternalEngineTests extends EngineTestCase {
engine.forceMerge(randomBoolean(), 1, false, false, false);
}
}
if (randomBoolean()) {
// engine is flushed properly before shutting down.
engine.syncTranslog();
globalCheckpoint.set(engine.getPersistedLocalCheckpoint());
engine.flush();
}
docs = getDocIds(engine, true);
}
try (InternalEngine engine = new InternalEngine(config)) {
engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
assertThat(getDocIds(engine, randomBoolean()), equalTo(docs));
if (engine.getSeqNoStats(globalCheckpoint.get()).getMaxSeqNo() == globalCheckpoint.get()) {
assertThat("engine should trim all unreferenced translog after recovery",
engine.getTranslog().getMinFileGeneration(), equalTo(engine.getTranslog().currentFileGeneration()));
}
}
}
}
@ -6098,12 +6098,12 @@ public class InternalEngineTests extends EngineTestCase {
engine.rollTranslogGeneration();
engine.trimOperationsFromTranslog(primaryTerm.get(), NO_OPS_PERFORMED); // trim everything in translog
try (Translog.Snapshot snapshot = getTranslog(engine).newSnapshot()) {
assertThat(snapshot.totalOperations(), equalTo(operations.size()));
assertThat(snapshot.totalOperations(), equalTo(0));
assertNull(snapshot.next());
}
applyOperations(engine, operations);
try (Translog.Snapshot snapshot = getTranslog(engine).newSnapshot()) {
assertThat(snapshot.totalOperations(), equalTo(operations.size() * 2));
assertThat(snapshot.totalOperations(), equalTo(operations.size()));
assertThat(TestTranslog.drainSnapshot(snapshot, false).stream().map(Translog.Operation::seqNo).collect(Collectors.toSet()),
equalTo(seqNos));
}


@ -28,6 +28,8 @@ import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.TestShardRouting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.mapper.ParsedDocument;
@ -36,14 +38,12 @@ import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.shard.DocsStats;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.translog.TranslogDeletionPolicy;
import org.elasticsearch.test.IndexSettingsModule;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Path;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import static org.hamcrest.Matchers.equalTo;
@ -55,7 +55,6 @@ public class NoOpEngineTests extends EngineTestCase {
public void testNoopEngine() throws IOException {
engine.close();
final NoOpEngine engine = new NoOpEngine(noOpConfig(INDEX_SETTINGS, store, primaryTranslogDir));
expectThrows(UnsupportedOperationException.class, () -> engine.syncFlush(null, null));
assertThat(engine.refreshNeeded(), equalTo(false));
assertThat(engine.shouldPeriodicallyFlush(), equalTo(false));
engine.close();
@ -123,7 +122,7 @@ public class NoOpEngineTests extends EngineTestCase {
for (int i = 0; i < numDocs; i++) {
if (randomBoolean()) {
String delId = Integer.toString(i);
Engine.DeleteResult result = engine.delete(new Engine.Delete("test", delId, newUid(delId), primaryTerm.get()));
Engine.DeleteResult result = engine.delete(new Engine.Delete("_doc", delId, newUid(delId), primaryTerm.get()));
assertTrue(result.isFound());
engine.syncTranslog(); // advance persisted local checkpoint
globalCheckpoint.set(engine.getPersistedLocalCheckpoint());
@ -131,7 +130,7 @@ public class NoOpEngineTests extends EngineTestCase {
}
}
engine.getLocalCheckpointTracker().waitForProcessedOpsToComplete(numDocs + deletions - 1);
flushAndTrimTranslog(engine);
engine.flush(true, true);
}
final DocsStats expectedDocStats;
@ -168,52 +167,33 @@ public class NoOpEngineTests extends EngineTestCase {
IndexShardRoutingTable table = new IndexShardRoutingTable.Builder(shardId).addShard(routing).build();
tracker.updateFromMaster(1L, Collections.singleton(allocationId.getId()), table);
tracker.activatePrimaryMode(SequenceNumbers.NO_OPS_PERFORMED);
boolean softDeleteEnabled = engine.config().getIndexSettings().isSoftDeleteEnabled();
engine.onSettingsChanged(TimeValue.MINUS_ONE, ByteSizeValue.ZERO, randomNonNegativeLong());
final int numDocs = scaledRandomIntBetween(10, 3000);
int totalTranslogOps = 0;
for (int i = 0; i < numDocs; i++) {
totalTranslogOps++;
engine.index(indexForDoc(createParsedDoc(Integer.toString(i), null)));
tracker.updateLocalCheckpoint(allocationId.getId(), i);
if (rarely()) {
totalTranslogOps = 0;
engine.flush();
}
if (randomBoolean()) {
engine.rollTranslogGeneration();
}
}
// prevent the translog from trimming so we can test trimUnreferencedTranslogFiles in NoOpEngine.
final Translog.Snapshot snapshot = engine.getTranslog().newSnapshot();
engine.flush(true, true);
final String translogUuid = engine.getTranslog().getTranslogUUID();
final long minFileGeneration = engine.getTranslog().getMinFileGeneration();
final long currentFileGeneration = engine.getTranslog().currentFileGeneration();
engine.close();
final NoOpEngine noOpEngine = new NoOpEngine(noOpConfig(INDEX_SETTINGS, store, primaryTranslogDir, tracker));
final Path translogPath = noOpEngine.config().getTranslogConfig().getTranslogPath();
final long lastCommitedTranslogGeneration;
try (Engine.IndexCommitRef indexCommitRef = noOpEngine.acquireLastIndexCommit(false)) {
Map<String, String> lastCommittedUserData = indexCommitRef.getIndexCommit().getUserData();
lastCommitedTranslogGeneration = Long.parseLong(lastCommittedUserData.get(Translog.TRANSLOG_GENERATION_KEY));
assertThat(lastCommitedTranslogGeneration, equalTo(currentFileGeneration));
}
assertThat(Translog.readMinTranslogGeneration(translogPath, translogUuid), equalTo(minFileGeneration));
assertThat(noOpEngine.getTranslogStats().estimatedNumberOfOperations(), equalTo(softDeleteEnabled ? 0 : numDocs));
assertThat(noOpEngine.getTranslogStats().getUncommittedOperations(), equalTo(0));
assertThat(noOpEngine.getTranslogStats().estimatedNumberOfOperations(), equalTo(totalTranslogOps));
noOpEngine.trimUnreferencedTranslogFiles();
assertThat(Translog.readMinTranslogGeneration(translogPath, translogUuid), equalTo(lastCommitedTranslogGeneration));
assertThat(noOpEngine.getTranslogStats().estimatedNumberOfOperations(), equalTo(0));
assertThat(noOpEngine.getTranslogStats().getUncommittedOperations(), equalTo(0));
assertThat(noOpEngine.getTranslogStats().getTranslogSizeInBytes(), equalTo((long)Translog.DEFAULT_HEADER_SIZE_IN_BYTES));
snapshot.close();
noOpEngine.close();
}
private void flushAndTrimTranslog(final InternalEngine engine) {
engine.flush(true, true);
final TranslogDeletionPolicy deletionPolicy = engine.getTranslog().getDeletionPolicy();
deletionPolicy.setRetentionSizeInBytes(-1);
deletionPolicy.setRetentionAgeInMillis(-1);
deletionPolicy.setMinTranslogGenerationForRecovery(engine.getTranslog().getGeneration().translogFileGeneration);
engine.flush(true, true);
}
}
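
Taken together, the rewritten assertions say a trimmed NoOpEngine keeps exactly one empty generation. A jshell-style restatement (the 55-byte figure is inferred from the size thresholds elsewhere in this change, not quoted from the code):

    long headerSizeInBytes = 55L;                 // assumed stand-in for Translog.DEFAULT_HEADER_SIZE_IN_BYTES
    long expectedSizeInBytes = headerSizeInBytes; // a single empty translog file
    int expectedOperations = 0;                   // nothing left to replay
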

View File

@ -803,7 +803,7 @@ public class RecoveryDuringReplicationTests extends ESIndexLevelReplicationTestC
shards.assertAllEqual(initDocs + inFlightOpsOnNewPrimary + moreDocsAfterRollback);
done.set(true);
thread.join();
shards.syncGlobalCheckpoint();
for (IndexShard shard : shards) {
shard.flush(new FlushRequest().force(true).waitIfOngoing(true));
assertThat(shard.translogStats().getUncommittedOperations(), equalTo(0));

View File

@ -352,17 +352,17 @@ public class IndexShardIT extends ESSingleNodeTestCase {
assertFalse(shard.shouldPeriodicallyFlush());
client().admin().indices().prepareUpdateSettings("test").setSettings(Settings.builder()
.put(IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING.getKey(),
new ByteSizeValue(190 /* size of the operation + two generations header&footer*/, ByteSizeUnit.BYTES)).build()).get();
client().prepareIndex("test", "test", "0")
new ByteSizeValue(135 /* size of the operation + one generation header&footer*/, ByteSizeUnit.BYTES)).build()).get();
client().prepareIndex("test", "_doc").setId("0")
.setSource("{}", XContentType.JSON).setRefreshPolicy(randomBoolean() ? IMMEDIATE : NONE).get();
assertFalse(shard.shouldPeriodicallyFlush());
shard.applyIndexOperationOnPrimary(Versions.MATCH_ANY, VersionType.INTERNAL,
new SourceToParse("test", "test", "1", new BytesArray("{}"), XContentType.JSON),
new SourceToParse("test", "_doc", "1", new BytesArray("{}"), XContentType.JSON),
SequenceNumbers.UNASSIGNED_SEQ_NO, 0, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false);
assertTrue(shard.shouldPeriodicallyFlush());
final Translog translog = getTranslog(shard);
assertEquals(2, translog.stats().getUncommittedOperations());
client().prepareIndex("test", "test", "2").setSource("{}", XContentType.JSON)
client().prepareIndex("test", "_doc", "2").setSource("{}", XContentType.JSON)
.setRefreshPolicy(randomBoolean() ? IMMEDIATE : NONE).get();
assertBusy(() -> { // this is async
assertFalse(shard.shouldPeriodicallyFlush());
@ -376,7 +376,7 @@ public class IndexShardIT extends ESSingleNodeTestCase {
client().admin().indices().prepareUpdateSettings("test").setSettings(Settings.builder().put(
IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING.getKey(), new ByteSizeValue(size, ByteSizeUnit.BYTES))
.build()).get();
client().prepareDelete("test", "test", "2").get();
client().prepareDelete("test", "_doc", "2").get();
logger.info("--> translog size after delete: [{}] num_ops [{}] generation [{}]",
translog.stats().getUncommittedSizeInBytes(), translog.stats().getUncommittedOperations(), translog.getGeneration());
assertBusy(() -> { // this is async
@ -434,8 +434,8 @@ public class IndexShardIT extends ESSingleNodeTestCase {
final boolean flush = randomBoolean();
final Settings settings;
if (flush) {
// size of the operation plus two generations of overhead.
settings = Settings.builder().put("index.translog.flush_threshold_size", "180b").build();
// size of the operation plus the overhead of one generation.
settings = Settings.builder().put("index.translog.flush_threshold_size", "125b").build();
} else {
// size of the operation plus header and footer
settings = Settings.builder().put("index.translog.generation_threshold_size", "117b").build();
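
The two threshold changes are consistent with each other: 190 − 135 = 55 and 180 − 125 = 55, so dropping the second empty generation removes one file's header-and-footer overhead (about 55 bytes, inferred from these constants) from every size calculation. A jshell-style check:

    long perGenOverhead = 190 - 135;    // 55 bytes: header + footer of one generation
    assert perGenOverhead == 180 - 125; // the second pair of thresholds agrees
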

View File

@ -117,7 +117,6 @@ import org.elasticsearch.index.store.StoreUtils;
import org.elasticsearch.index.translog.TestTranslog;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.translog.TranslogStats;
import org.elasticsearch.index.translog.TranslogTests;
import org.elasticsearch.indices.IndicesQueryCache;
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCache;
@ -1405,7 +1404,7 @@ public class IndexShardTests extends IndexShardTestCase {
latch.await();
for (int i = 0; i < 10000; i++) {
semaphore.acquire();
shard.sync(TranslogTests.randomTranslogLocation(), (ex) -> semaphore.release());
shard.sync(new Translog.Location(randomLong(), randomLong(), randomInt()), (ex) -> semaphore.release());
}
} catch (Exception ex) {
throw new RuntimeException(ex);
@ -2033,17 +2032,20 @@ public class IndexShardTests extends IndexShardTestCase {
shard.sync(); // advance local checkpoint
final int translogOps;
final int replayedOps;
if (randomBoolean()) {
// Advance the global checkpoint to remove the 1st commit; this shard will recover the 2nd commit.
shard.updateGlobalCheckpointOnReplica(3, "test");
logger.info("--> flushing shard");
shard.flush(new FlushRequest().force(true).waitIfOngoing(true));
translogOps = 4; // delete #1 won't be replayed.
} else if (randomBoolean()) {
shard.getEngine().rollTranslogGeneration();
translogOps = 5;
replayedOps = 3;
} else {
if (randomBoolean()) {
shard.getEngine().rollTranslogGeneration();
}
translogOps = 5;
replayedOps = 5;
}
final ShardRouting replicaRouting = shard.routingEntry();
@ -2053,10 +2055,9 @@ public class IndexShardTests extends IndexShardTestCase {
DiscoveryNode localNode = new DiscoveryNode("foo", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT);
newShard.markAsRecovering("store", new RecoveryState(newShard.routingEntry(), localNode, null));
assertTrue(recoverFromStore(newShard));
assertEquals(translogOps, newShard.recoveryState().getTranslog().recoveredOperations());
assertEquals(replayedOps, newShard.recoveryState().getTranslog().recoveredOperations());
assertEquals(translogOps, newShard.recoveryState().getTranslog().totalOperations());
assertEquals(translogOps, newShard.recoveryState().getTranslog().totalOperationsOnStart());
assertEquals(100.0f, newShard.recoveryState().getTranslog().recoveredPercent(), 0.01f);
updateRoutingEntry(newShard, ShardRoutingHelper.moveToStarted(newShard.routingEntry()));
assertDocCount(newShard, 3);
closeShards(newShard);
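
The new replayedOps/translogOps split reflects that recovery statistics and replay now diverge: the retained translog files determine totalOperations(), while only operations above the safe commit's local checkpoint are replayed. A toy illustration with made-up values:

    // Illustrative only: count how many retained ops would actually be replayed.
    long localCheckpointOfSafeCommit = 1;   // assumed checkpoint of the safe commit
    long[] retainedSeqNos = {1, 2, 3, 4};   // ops still present in retained translog files
    long replayed = java.util.Arrays.stream(retainedSeqNos)
        .filter(seqNo -> seqNo > localCheckpointOfSafeCommit)
        .count();                           // 3 of the 4 retained ops are replayed
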

View File

@ -126,15 +126,12 @@ public class PrimaryReplicaSyncerTests extends IndexShardTestCase {
assertThat(resyncRequest.getMaxSeenAutoIdTimestampOnPrimary(), equalTo(shard.getMaxSeenAutoIdTimestamp()));
}
if (syncNeeded && globalCheckPoint < numDocs - 1) {
assertThat(resyncTask.getSkippedOperations(), equalTo(0));
assertThat(resyncTask.getResyncedOperations(), equalTo(Math.toIntExact(numDocs - 1 - globalCheckPoint)));
if (shard.indexSettings.isSoftDeleteEnabled()) {
assertThat(resyncTask.getSkippedOperations(), equalTo(0));
assertThat(resyncTask.getResyncedOperations(), equalTo(resyncTask.getTotalOperations()));
assertThat(resyncTask.getTotalOperations(), equalTo(Math.toIntExact(numDocs - 1 - globalCheckPoint)));
} else {
int skippedOps = Math.toIntExact(globalCheckPoint + 1); // everything up to global checkpoint included
assertThat(resyncTask.getSkippedOperations(), equalTo(skippedOps));
assertThat(resyncTask.getResyncedOperations(), equalTo(numDocs - skippedOps));
assertThat(resyncTask.getTotalOperations(), equalTo(globalCheckPoint == numDocs - 1 ? 0 : numDocs));
assertThat(resyncTask.getTotalOperations(), equalTo(numDocs));
}
} else {
assertThat(resyncTask.getSkippedOperations(), equalTo(0));
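
The soft-deletes branch follows directly from seqNo-bounded snapshots: the resync source starts at globalCheckpoint + 1, so nothing is fetched only to be skipped. Toy arithmetic, assuming a dense seqNo range:

    long numDocs = 10, globalCheckpoint = 3;
    long totalOps = numDocs - 1 - globalCheckpoint; // 6 ops: seqNos 4..9
    long skippedOps = 0;                            // the snapshot never sees seqNos <= 3
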

View File

@ -67,7 +67,6 @@ import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.seqno.ReplicationTracker;
import org.elasticsearch.index.seqno.RetentionLease;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.indices.store.TransportNodesListShardStoreMetaData;
import org.elasticsearch.test.DummyShardLock;
import org.elasticsearch.test.ESTestCase;
@ -845,9 +844,7 @@ public class StoreTests extends ESTestCase {
writer.addDocument(doc);
Map<String, String> commitData = new HashMap<>(2);
String syncId = "a sync id";
String translogId = "a translog id";
commitData.put(Engine.SYNC_COMMIT_ID, syncId);
commitData.put(Translog.TRANSLOG_GENERATION_KEY, translogId);
writer.setLiveCommitData(commitData.entrySet());
writer.commit();
writer.close();
@ -856,7 +853,6 @@ public class StoreTests extends ESTestCase {
assertFalse(metadata.asMap().isEmpty());
// do not check for correct files, we have enough tests for that above
assertThat(metadata.getCommitUserData().get(Engine.SYNC_COMMIT_ID), equalTo(syncId));
assertThat(metadata.getCommitUserData().get(Translog.TRANSLOG_GENERATION_KEY), equalTo(translogId));
TestUtil.checkIndex(store.directory());
assertDeleteContent(store, store.directory());
IOUtils.close(store);

View File

@ -22,12 +22,8 @@ package org.elasticsearch.index.translog;
import com.carrotsearch.randomizedtesting.generators.RandomNumbers;
import com.carrotsearch.randomizedtesting.generators.RandomPicks;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.store.NIOFSDirectory;
import org.apache.lucene.util.LuceneTestCase;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.index.engine.CombinedDeletionPolicy;
import java.io.IOException;
import java.nio.ByteBuffer;
@ -69,7 +65,7 @@ public class TestTranslog {
* See {@link TestTranslog#corruptFile(Logger, Random, Path, boolean)} for details of the corruption applied.
*/
public static void corruptRandomTranslogFile(Logger logger, Random random, Path translogDir) throws IOException {
corruptRandomTranslogFile(logger, random, translogDir, minTranslogGenUsedInRecovery(translogDir));
corruptRandomTranslogFile(logger, random, translogDir, Translog.readCheckpoint(translogDir).minTranslogGeneration);
}
/**
@ -188,19 +184,6 @@ public class TestTranslog {
}
}
/**
* Lists all existing commits in a given index path, then reads the minimum translog generation that will be used in recoverFromTranslog.
*/
private static long minTranslogGenUsedInRecovery(Path translogPath) throws IOException {
try (NIOFSDirectory directory = new NIOFSDirectory(translogPath.getParent().resolve("index"))) {
List<IndexCommit> commits = DirectoryReader.listCommits(directory);
final String translogUUID = commits.get(commits.size() - 1).getUserData().get(Translog.TRANSLOG_UUID_KEY);
long globalCheckpoint = Translog.readGlobalCheckpoint(translogPath, translogUUID);
IndexCommit recoveringCommit = CombinedDeletionPolicy.findSafeCommitPoint(commits, globalCheckpoint);
return Long.parseLong(recoveringCommit.getUserData().get(Translog.TRANSLOG_GENERATION_KEY));
}
}
/**
* Returns the primary term associated with the current translog writer of the given translog.
*/
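
With the commit tag gone, the translog's own checkpoint file is the single source of truth for the oldest referenced generation, which is why the Lucene-commit-walking helper above could be deleted. A usage sketch, assuming translogDir points at a shard's translog directory:

    // The checkpoint already records the minimum referenced generation.
    long minGen = Translog.readCheckpoint(translogDir).minTranslogGeneration;
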

View File

@ -36,6 +36,7 @@ import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import static java.lang.Math.min;
import static org.hamcrest.Matchers.equalTo;
@ -48,12 +49,7 @@ public class TranslogDeletionPolicyTests extends ESTestCase {
allGens.add(readersAndWriter.v2());
try {
TranslogDeletionPolicy deletionPolicy = new MockDeletionPolicy(now, 0, 0, 0);
assertMinGenRequired(deletionPolicy, readersAndWriter, 1L);
final int committedReader = randomIntBetween(0, allGens.size() - 1);
final long committedGen = allGens.get(committedReader).generation;
deletionPolicy.setTranslogGenerationOfLastCommit(randomLongBetween(committedGen, Long.MAX_VALUE));
deletionPolicy.setMinTranslogGenerationForRecovery(committedGen);
assertMinGenRequired(deletionPolicy, readersAndWriter, committedGen);
assertMinGenRequired(deletionPolicy, readersAndWriter, allGens.get(allGens.size() - 1).generation);
} finally {
IOUtils.close(readersAndWriter.v1());
IOUtils.close(readersAndWriter.v2());
@ -127,8 +123,6 @@ public class TranslogDeletionPolicyTests extends ESTestCase {
allGens.add(readersAndWriter.v2());
try {
TranslogDeletionPolicy deletionPolicy = new MockDeletionPolicy(now, Long.MAX_VALUE, Long.MAX_VALUE, Integer.MAX_VALUE);
deletionPolicy.setTranslogGenerationOfLastCommit(Long.MAX_VALUE);
deletionPolicy.setMinTranslogGenerationForRecovery(Long.MAX_VALUE);
int selectedReader = randomIntBetween(0, allGens.size() - 1);
final long selectedGenerationByAge = allGens.get(selectedReader).generation;
long maxAge = now - allGens.get(selectedReader).getLastModifiedTime();
@ -145,31 +139,28 @@ public class TranslogDeletionPolicyTests extends ESTestCase {
max3(selectedGenerationByAge, selectedGenerationBySize, selectedGenerationByTotalFiles));
// make a new policy as committed gen can't go backwards (for now)
deletionPolicy = new MockDeletionPolicy(now, size, maxAge, totalFiles);
long committedGen = randomFrom(allGens).generation;
deletionPolicy.setTranslogGenerationOfLastCommit(randomLongBetween(committedGen, Long.MAX_VALUE));
deletionPolicy.setMinTranslogGenerationForRecovery(committedGen);
assertMinGenRequired(deletionPolicy, readersAndWriter, Math.min(committedGen,
max3(selectedGenerationByAge, selectedGenerationBySize, selectedGenerationByTotalFiles)));
assertMinGenRequired(deletionPolicy, readersAndWriter,
max3(selectedGenerationByAge, selectedGenerationBySize, selectedGenerationByTotalFiles));
long viewGen = randomFrom(allGens).generation;
try (Releasable ignored = deletionPolicy.acquireTranslogGen(viewGen)) {
assertMinGenRequired(deletionPolicy, readersAndWriter,
min3(committedGen, viewGen, max3(selectedGenerationByAge, selectedGenerationBySize, selectedGenerationByTotalFiles)));
min(viewGen, max3(selectedGenerationByAge, selectedGenerationBySize, selectedGenerationByTotalFiles)));
// disable age
deletionPolicy.setRetentionAgeInMillis(-1);
assertMinGenRequired(deletionPolicy, readersAndWriter,
min3(committedGen, viewGen, Math.max(selectedGenerationBySize, selectedGenerationByTotalFiles)));
min(viewGen, Math.max(selectedGenerationBySize, selectedGenerationByTotalFiles)));
// disable size
deletionPolicy.setRetentionAgeInMillis(maxAge);
deletionPolicy.setRetentionSizeInBytes(-1);
assertMinGenRequired(deletionPolicy, readersAndWriter,
min3(committedGen, viewGen, Math.max(selectedGenerationByAge, selectedGenerationByTotalFiles)));
min(viewGen, Math.max(selectedGenerationByAge, selectedGenerationByTotalFiles)));
// disable age and size
deletionPolicy.setRetentionAgeInMillis(-1);
deletionPolicy.setRetentionSizeInBytes(-1);
assertMinGenRequired(deletionPolicy, readersAndWriter, Math.min(committedGen, viewGen));
assertMinGenRequired(deletionPolicy, readersAndWriter, viewGen);
// disable total files
deletionPolicy.setRetentionTotalFiles(0);
assertMinGenRequired(deletionPolicy, readersAndWriter, Math.min(committedGen, viewGen));
assertMinGenRequired(deletionPolicy, readersAndWriter, viewGen);
}
} finally {
IOUtils.close(readersAndWriter.v1());
@ -232,8 +223,4 @@ public class TranslogDeletionPolicyTests extends ESTestCase {
private static long max3(long x1, long x2, long x3) {
return Math.max(Math.max(x1, x2), x3);
}
private static long min3(long x1, long x2, long x3) {
return Math.min(Math.min(x1, x2), x3);
}
}
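
The reworked assertions reduce the deletion policy to two inputs: retention (age, size, file count) can only hold generations back longer, and an acquired snapshot pins everything from its generation onward. A minimal sketch of that combination rule, with hypothetical parameter names:

    // Sketch: the minimum generation that must be kept, absent any commit-based floor.
    static long minGenRequired(long genRequiredByAge, long genRequiredBySize,
                               long genRequiredByTotalFiles, long minAcquiredViewGen) {
        long byRetention = Math.max(Math.max(genRequiredByAge, genRequiredBySize), genRequiredByTotalFiles);
        return Math.min(minAcquiredViewGen, byRetention);
    }
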

View File

@ -166,7 +166,7 @@ public class TranslogTests extends ESTestCase {
if (translog.isOpen()) {
if (translog.currentFileGeneration() > 1) {
markCurrentGenAsCommitted(translog);
translog.getDeletionPolicy().setLocalCheckpointOfSafeCommit(Long.MAX_VALUE);
translog.trimUnreferencedReaders();
assertFileDeleted(translog, translog.currentFileGeneration() - 1);
}
@ -201,28 +201,6 @@ public class TranslogTests extends ESTestCase {
}
private void markCurrentGenAsCommitted(Translog translog) throws IOException {
long genToCommit = translog.currentFileGeneration();
long genToRetain = randomLongBetween(translog.getDeletionPolicy().getMinTranslogGenerationForRecovery(), genToCommit);
commit(translog, genToRetain, genToCommit);
}
private void rollAndCommit(Translog translog) throws IOException {
translog.rollGeneration();
markCurrentGenAsCommitted(translog);
}
private long commit(Translog translog, long genToRetain, long genToCommit) throws IOException {
final TranslogDeletionPolicy deletionPolicy = translog.getDeletionPolicy();
deletionPolicy.setTranslogGenerationOfLastCommit(genToCommit);
deletionPolicy.setMinTranslogGenerationForRecovery(genToRetain);
long minGenRequired = deletionPolicy.minTranslogGenRequired(translog.getReaders(), translog.getCurrent());
translog.trimUnreferencedReaders();
assertThat(minGenRequired, equalTo(translog.getMinFileGeneration()));
assertFilePresences(translog);
return minGenRequired;
}
@Override
@Before
public void setUp() throws Exception {
@ -356,7 +334,7 @@ public class TranslogTests extends ESTestCase {
assertThat(snapshot.totalOperations(), equalTo(ops.size()));
}
final long seqNo = randomNonNegativeLong();
final long seqNo = randomLongBetween(0, Integer.MAX_VALUE);
final String reason = randomAlphaOfLength(16);
final long noopTerm = randomLongBetween(1, primaryTerm.get());
addToTranslogAndList(translog, ops, new Translog.NoOp(seqNo, noopTerm, reason));
@ -389,9 +367,7 @@ public class TranslogTests extends ESTestCase {
assertThat(snapshot.totalOperations(), equalTo(ops.size()));
}
markCurrentGenAsCommitted(translog);
try (Translog.Snapshot snapshot = translog.newSnapshotFromGen(
new Translog.TranslogGeneration(translog.getTranslogUUID(), firstId + 1), randomNonNegativeLong())) {
try (Translog.Snapshot snapshot = translog.newSnapshot(seqNo + 1, randomLongBetween(seqNo + 1, Long.MAX_VALUE))) {
assertThat(snapshot, SnapshotMatchers.size(0));
assertThat(snapshot.totalOperations(), equalTo(0));
}
@ -450,8 +426,8 @@ public class TranslogTests extends ESTestCase {
assertThat(stats.estimatedNumberOfOperations(), equalTo(1));
assertThat(stats.getTranslogSizeInBytes(), equalTo(162L));
assertThat(stats.getUncommittedOperations(), equalTo(1));
assertThat(stats.getUncommittedSizeInBytes(), equalTo(162L));
assertThat(stats.getEarliestLastModifiedAge(), greaterThan(1L));
assertThat(stats.getUncommittedSizeInBytes(), equalTo(107L));
assertThat(stats.getEarliestLastModifiedAge(), greaterThan(0L));
}
translog.add(new Translog.Delete("test", "2", 1, primaryTerm.get(), newUid("2")));
@ -460,8 +436,8 @@ public class TranslogTests extends ESTestCase {
assertThat(stats.estimatedNumberOfOperations(), equalTo(2));
assertThat(stats.getTranslogSizeInBytes(), equalTo(210L));
assertThat(stats.getUncommittedOperations(), equalTo(2));
assertThat(stats.getUncommittedSizeInBytes(), equalTo(210L));
assertThat(stats.getEarliestLastModifiedAge(), greaterThan(1L));
assertThat(stats.getUncommittedSizeInBytes(), equalTo(155L));
assertThat(stats.getEarliestLastModifiedAge(), greaterThan(0L));
}
translog.add(new Translog.Delete("test", "3", 2, primaryTerm.get(), newUid("3")));
@ -470,8 +446,8 @@ public class TranslogTests extends ESTestCase {
assertThat(stats.estimatedNumberOfOperations(), equalTo(3));
assertThat(stats.getTranslogSizeInBytes(), equalTo(258L));
assertThat(stats.getUncommittedOperations(), equalTo(3));
assertThat(stats.getUncommittedSizeInBytes(), equalTo(258L));
assertThat(stats.getEarliestLastModifiedAge(), greaterThan(1L));
assertThat(stats.getUncommittedSizeInBytes(), equalTo(203L));
assertThat(stats.getEarliestLastModifiedAge(), greaterThan(0L));
}
translog.add(new Translog.NoOp(3, 1, randomAlphaOfLength(16)));
@ -480,19 +456,18 @@ public class TranslogTests extends ESTestCase {
assertThat(stats.estimatedNumberOfOperations(), equalTo(4));
assertThat(stats.getTranslogSizeInBytes(), equalTo(300L));
assertThat(stats.getUncommittedOperations(), equalTo(4));
assertThat(stats.getUncommittedSizeInBytes(), equalTo(300L));
assertThat(stats.getEarliestLastModifiedAge(), greaterThan(1L));
assertThat(stats.getUncommittedSizeInBytes(), equalTo(245L));
assertThat(stats.getEarliestLastModifiedAge(), greaterThan(0L));
}
final long expectedSizeInBytes = 355L;
translog.rollGeneration();
{
final TranslogStats stats = stats();
assertThat(stats.estimatedNumberOfOperations(), equalTo(4));
assertThat(stats.getTranslogSizeInBytes(), equalTo(expectedSizeInBytes));
assertThat(stats.getTranslogSizeInBytes(), equalTo(355L));
assertThat(stats.getUncommittedOperations(), equalTo(4));
assertThat(stats.getUncommittedSizeInBytes(), equalTo(expectedSizeInBytes));
assertThat(stats.getEarliestLastModifiedAge(), greaterThan(1L));
assertThat(stats.getUncommittedSizeInBytes(), equalTo(300L));
assertThat(stats.getEarliestLastModifiedAge(), greaterThan(0L));
}
{
@ -501,26 +476,26 @@ public class TranslogTests extends ESTestCase {
stats.writeTo(out);
final TranslogStats copy = new TranslogStats(out.bytes().streamInput());
assertThat(copy.estimatedNumberOfOperations(), equalTo(4));
assertThat(copy.getTranslogSizeInBytes(), equalTo(expectedSizeInBytes));
assertThat(copy.getTranslogSizeInBytes(), equalTo(355L));
try (XContentBuilder builder = XContentFactory.jsonBuilder()) {
builder.startObject();
copy.toXContent(builder, ToXContent.EMPTY_PARAMS);
builder.endObject();
assertThat(Strings.toString(builder), equalTo("{\"translog\":{\"operations\":4,\"size_in_bytes\":" + expectedSizeInBytes
+ ",\"uncommitted_operations\":4,\"uncommitted_size_in_bytes\":" + expectedSizeInBytes
assertThat(Strings.toString(builder), equalTo("{\"translog\":{\"operations\":4,\"size_in_bytes\":" + 355
+ ",\"uncommitted_operations\":4,\"uncommitted_size_in_bytes\":" + 300
+ ",\"earliest_last_modified_age\":" + stats.getEarliestLastModifiedAge() + "}}"));
}
}
markCurrentGenAsCommitted(translog);
translog.getDeletionPolicy().setLocalCheckpointOfSafeCommit(randomLongBetween(3, Long.MAX_VALUE));
translog.trimUnreferencedReaders();
{
final TranslogStats stats = stats();
assertThat(stats.estimatedNumberOfOperations(), equalTo(4));
assertThat(stats.getTranslogSizeInBytes(), equalTo(expectedSizeInBytes));
assertThat(stats.getTranslogSizeInBytes(), equalTo(355L));
assertThat(stats.getUncommittedOperations(), equalTo(0));
assertThat(stats.getUncommittedSizeInBytes(), equalTo(firstOperationPosition));
assertThat(stats.getEarliestLastModifiedAge(), greaterThan(1L));
assertThat(stats.getEarliestLastModifiedAge(), greaterThan(0L));
}
}
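
The constants in the rewritten assertions are internally consistent: each generation contributes a roughly 55-byte header, and uncommitted_size_in_bytes now excludes exactly the header of the generation holding the first uncommitted operation. A jshell-style check (sizes inferred from the assertions above, not normative):

    long totalBeforeRoll = 300, header = 55;
    long uncommittedBeforeRoll = totalBeforeRoll - header; // 245, as asserted
    long totalAfterRoll = totalBeforeRoll + header;        // 355: rolling adds a fresh header
    long uncommittedAfterRoll = totalAfterRoll - header;   // 300, as asserted
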
@ -542,7 +517,7 @@ public class TranslogTests extends ESTestCase {
}
assertThat(translog.stats().getUncommittedOperations(), equalTo(uncommittedOps));
if (frequently()) {
markCurrentGenAsCommitted(translog);
deletionPolicy.setLocalCheckpointOfSafeCommit(i);
assertThat(translog.stats().getUncommittedOperations(), equalTo(operationsInLastGen));
uncommittedOps = operationsInLastGen;
}
@ -603,7 +578,7 @@ public class TranslogTests extends ESTestCase {
assertThat(e, hasToString(containsString("earliestLastModifiedAge must be >= 0")));
}
public void testSnapshot() throws IOException {
public void testBasicSnapshot() throws IOException {
ArrayList<Translog.Operation> ops = new ArrayList<>();
try (Translog.Snapshot snapshot = translog.newSnapshot()) {
assertThat(snapshot, SnapshotMatchers.size(0));
@ -611,13 +586,13 @@ public class TranslogTests extends ESTestCase {
addToTranslogAndList(translog, ops, new Translog.Index("test", "1", 0, primaryTerm.get(), new byte[]{1}));
try (Translog.Snapshot snapshot = translog.newSnapshot()) {
try (Translog.Snapshot snapshot = translog.newSnapshot(0, Long.MAX_VALUE)) {
assertThat(snapshot, SnapshotMatchers.equalsTo(ops));
assertThat(snapshot.totalOperations(), equalTo(1));
}
try (Translog.Snapshot snapshot = translog.newSnapshot();
Translog.Snapshot snapshot1 = translog.newSnapshot()) {
try (Translog.Snapshot snapshot = translog.newSnapshot(0, randomIntBetween(0, 10));
Translog.Snapshot snapshot1 = translog.newSnapshot(0, randomIntBetween(0, 10))) {
assertThat(snapshot, SnapshotMatchers.equalsTo(ops));
assertThat(snapshot.totalOperations(), equalTo(1));
@ -660,7 +635,7 @@ public class TranslogTests extends ESTestCase {
Translog.Snapshot snapshot2 = translog.newSnapshot();
toClose.add(snapshot2);
markCurrentGenAsCommitted(translog);
translog.getDeletionPolicy().setLocalCheckpointOfSafeCommit(2);
assertThat(snapshot2, containsOperationsInAnyOrder(ops));
assertThat(snapshot2.totalOperations(), equalTo(ops.size()));
} finally {
@ -676,78 +651,62 @@ public class TranslogTests extends ESTestCase {
assertEquals(ex.getMessage(), "translog is already closed");
}
public void testSnapshotFromMinGen() throws Exception {
Map<Long, List<Translog.Operation>> operationsByGen = new HashMap<>();
try (Translog.Snapshot snapshot = translog.newSnapshotFromGen(
new Translog.TranslogGeneration(translog.getTranslogUUID(), 1), randomNonNegativeLong())) {
assertThat(snapshot, SnapshotMatchers.size(0));
}
int iters = between(1, 10);
for (int i = 0; i < iters; i++) {
long currentGeneration = translog.currentFileGeneration();
operationsByGen.putIfAbsent(currentGeneration, new ArrayList<>());
int numOps = between(0, 20);
for (int op = 0; op < numOps; op++) {
long seqNo = randomLongBetween(0, 1000);
addToTranslogAndList(translog, operationsByGen.get(currentGeneration), new Translog.Index("test",
Long.toString(seqNo), seqNo, primaryTerm.get(), new byte[]{1}));
}
long minGen = randomLongBetween(translog.getMinFileGeneration(), translog.currentFileGeneration());
try (Translog.Snapshot snapshot = translog.newSnapshotFromGen(
new Translog.TranslogGeneration(translog.getTranslogUUID(), minGen), Long.MAX_VALUE)) {
List<Translog.Operation> expectedOps = operationsByGen.entrySet().stream()
.filter(e -> e.getKey() >= minGen)
.flatMap(e -> e.getValue().stream())
.collect(Collectors.toList());
assertThat(snapshot, SnapshotMatchers.containsOperationsInAnyOrder(expectedOps));
}
long upToSeqNo = randomLongBetween(0, 2000);
try (Translog.Snapshot snapshot = translog.newSnapshotFromGen(
new Translog.TranslogGeneration(translog.getTranslogUUID(), minGen), upToSeqNo)) {
List<Translog.Operation> expectedOps = operationsByGen.entrySet().stream()
.filter(e -> e.getKey() >= minGen)
.flatMap(e -> e.getValue().stream().filter(op -> op.seqNo() <= upToSeqNo))
.collect(Collectors.toList());
assertThat(snapshot, SnapshotMatchers.containsOperationsInAnyOrder(expectedOps));
}
translog.rollGeneration();
}
}
public void testSeqNoFilterSnapshot() throws Exception {
public void testRangeSnapshot() throws Exception {
long minSeqNo = SequenceNumbers.NO_OPS_PERFORMED;
long maxSeqNo = SequenceNumbers.NO_OPS_PERFORMED;
final int generations = between(2, 20);
Map<Long, List<Translog.Operation>> operationsByGen = new HashMap<>();
for (int gen = 0; gen < generations; gen++) {
List<Long> batch = LongStream.rangeClosed(0, between(0, 100)).boxed().collect(Collectors.toList());
Randomness.shuffle(batch);
for (long seqNo : batch) {
Translog.Index op =
new Translog.Index("doc", randomAlphaOfLength(10), seqNo, primaryTerm.get(), new byte[]{1});
Set<Long> seqNos = new HashSet<>();
int numOps = randomIntBetween(1, 100);
for (int i = 0; i < numOps; i++) {
final long seqNo = randomValueOtherThanMany(seqNos::contains, () -> randomLongBetween(0, 1000));
minSeqNo = SequenceNumbers.min(minSeqNo, seqNo);
maxSeqNo = SequenceNumbers.max(maxSeqNo, seqNo);
seqNos.add(seqNo);
}
List<Translog.Operation> ops = new ArrayList<>(seqNos.size());
for (long seqNo : seqNos) {
Translog.Index op = new Translog.Index("_doc", randomAlphaOfLength(10), seqNo, primaryTerm.get(), new byte[]{randomByte()});
translog.add(op);
ops.add(op);
}
operationsByGen.put(translog.currentFileGeneration(), ops);
translog.rollGeneration();
}
List<Translog.Operation> operations = new ArrayList<>();
try (Translog.Snapshot snapshot = translog.newSnapshot()) {
Translog.Operation op;
while ((op = snapshot.next()) != null) {
operations.add(op);
if (rarely()) {
translog.rollGeneration(); // empty generation
}
}
try (Translog.Snapshot snapshot = translog.newSnapshot()) {
Translog.Snapshot filter = new Translog.SeqNoFilterSnapshot(snapshot, between(200, 300), between(300, 400)); // out of range
assertThat(filter, SnapshotMatchers.size(0));
assertThat(filter.totalOperations(), equalTo(snapshot.totalOperations()));
assertThat(filter.skippedOperations(), equalTo(snapshot.totalOperations()));
if (minSeqNo > 0) {
long fromSeqNo = randomLongBetween(0, minSeqNo - 1);
long toSeqNo = randomLongBetween(fromSeqNo, minSeqNo - 1);
try (Translog.Snapshot snapshot = translog.newSnapshot(fromSeqNo, toSeqNo)) {
assertThat(snapshot.totalOperations(), equalTo(0));
assertNull(snapshot.next());
}
}
try (Translog.Snapshot snapshot = translog.newSnapshot()) {
int fromSeqNo = between(-2, 500);
int toSeqNo = between(fromSeqNo, 500);
List<Translog.Operation> selectedOps = operations.stream()
.filter(op -> fromSeqNo <= op.seqNo() && op.seqNo() <= toSeqNo).collect(Collectors.toList());
Translog.Snapshot filter = new Translog.SeqNoFilterSnapshot(snapshot, fromSeqNo, toSeqNo);
assertThat(filter, SnapshotMatchers.containsOperationsInAnyOrder(selectedOps));
assertThat(filter.totalOperations(), equalTo(snapshot.totalOperations()));
assertThat(filter.skippedOperations(), equalTo(snapshot.skippedOperations() + operations.size() - selectedOps.size()));
long fromSeqNo = randomLongBetween(maxSeqNo + 1, Long.MAX_VALUE);
long toSeqNo = randomLongBetween(fromSeqNo, Long.MAX_VALUE);
try (Translog.Snapshot snapshot = translog.newSnapshot(fromSeqNo, toSeqNo)) {
assertThat(snapshot.totalOperations(), equalTo(0));
assertNull(snapshot.next());
}
fromSeqNo = randomLongBetween(0, 2000);
toSeqNo = randomLongBetween(fromSeqNo, 2000);
try (Translog.Snapshot snapshot = translog.newSnapshot(fromSeqNo, toSeqNo)) {
Set<Long> seenSeqNos = new HashSet<>();
List<Translog.Operation> expectedOps = new ArrayList<>();
for (long gen = translog.currentFileGeneration(); gen > 0; gen--) {
for (Translog.Operation op : operationsByGen.getOrDefault(gen, Collections.emptyList())) {
if (fromSeqNo <= op.seqNo() && op.seqNo() <= toSeqNo && seenSeqNos.add(op.seqNo())) {
expectedOps.add(op);
}
}
}
assertThat(TestTranslog.drainSnapshot(snapshot, false), equalTo(expectedOps));
}
}
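
A usage sketch of the range API this test exercises, assuming an open Translog named translog: bounds are inclusive, and when a seqNo appears in several generations the copy from the newest generation wins, matching the newest-first loop above.

    long fromSeqNo = 0, toSeqNo = 100; // any inclusive seqNo range
    try (Translog.Snapshot snapshot = translog.newSnapshot(fromSeqNo, toSeqNo)) {
        Translog.Operation op;
        while ((op = snapshot.next()) != null) {
            assert fromSeqNo <= op.seqNo() && op.seqNo() <= toSeqNo;
        }
    }
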
@ -769,6 +728,7 @@ public class TranslogTests extends ESTestCase {
for (long gen = 1; gen < translog.getMinFileGeneration(); gen++) {
assertFileDeleted(translog, gen);
}
}
static class LocationOperation implements Comparable<LocationOperation> {
@ -950,8 +910,8 @@ public class TranslogTests extends ESTestCase {
assertThat(snapshot.totalOperations(), equalTo(1));
}
translog.close();
assertFileIsPresent(translog, 1);
assertFileDeleted(translog, 1);
assertFileIsPresent(translog, 2);
}
/**
@ -1023,9 +983,7 @@ public class TranslogTests extends ESTestCase {
translog.rollGeneration();
// expose the new checkpoint (simulating a commit), before we trim the translog
lastCommittedLocalCheckpoint.set(localCheckpoint);
deletionPolicy.setTranslogGenerationOfLastCommit(translog.currentFileGeneration());
deletionPolicy.setMinTranslogGenerationForRecovery(
translog.getMinGenerationForSeqNo(localCheckpoint + 1).translogFileGeneration);
deletionPolicy.setLocalCheckpointOfSafeCommit(localCheckpoint);
translog.trimUnreferencedReaders();
}
}
@ -1094,7 +1052,7 @@ public class TranslogTests extends ESTestCase {
// these are what we expect the snapshot to return (and potentially some more).
Set<Translog.Operation> expectedOps = new HashSet<>(writtenOps.keySet());
expectedOps.removeIf(op -> op.seqNo() <= committedLocalCheckpointAtView);
try (Translog.Snapshot snapshot = translog.newSnapshotFromMinSeqNo(committedLocalCheckpointAtView + 1L)) {
try (Translog.Snapshot snapshot = translog.newSnapshot(committedLocalCheckpointAtView + 1L, Long.MAX_VALUE)) {
Translog.Operation op;
while ((op = snapshot.next()) != null) {
expectedOps.remove(op);
@ -1172,7 +1130,7 @@ public class TranslogTests extends ESTestCase {
assertTrue("we only synced a previous operation yet", translog.syncNeeded());
}
if (rarely()) {
rollAndCommit(translog);
translog.rollGeneration();
assertFalse("location is from a previous translog - already synced", translog.ensureSynced(location)); // not syncing now
assertFalse("no sync needed since no operations in current translog", translog.syncNeeded());
}
@ -1192,7 +1150,7 @@ public class TranslogTests extends ESTestCase {
ArrayList<Location> locations = new ArrayList<>();
for (int op = 0; op < translogOperations; op++) {
if (rarely()) {
rollAndCommit(translog); // do this first so that there is at least one pending tlog entry
translog.rollGeneration();
}
final Translog.Location location =
translog.add(new Translog.Index("test", "" + op, op, primaryTerm.get(),
@ -1206,7 +1164,7 @@ public class TranslogTests extends ESTestCase {
// we are the last location so everything should be synced
assertFalse("the last call to ensureSycned synced all previous ops", translog.syncNeeded());
} else if (rarely()) {
rollAndCommit(translog);
translog.rollGeneration();
// not syncing now
assertFalse("location is from a previous translog - already synced", translog.ensureSynced(locations.stream()));
assertFalse("no sync needed since no operations in current translog", translog.syncNeeded());
@ -1229,7 +1187,7 @@ public class TranslogTests extends ESTestCase {
translog.add(new Translog.Index("test", "" + op, op,
primaryTerm.get(), Integer.toString(++count).getBytes(Charset.forName("UTF-8")))));
if (rarely() && translogOperations > op + 1) {
rollAndCommit(translog);
translog.rollGeneration();
}
}
Collections.shuffle(locations, random());
@ -1412,7 +1370,7 @@ public class TranslogTests extends ESTestCase {
Integer.toString(op).getBytes(Charset.forName("UTF-8")))));
final boolean commit = commitOften ? frequently() : rarely();
if (commit && op < translogOperations - 1) {
rollAndCommit(translog);
translog.getDeletionPolicy().setLocalCheckpointOfSafeCommit(op);
minUncommittedOp = op + 1;
translogGeneration = translog.getGeneration();
}
@ -1435,7 +1393,7 @@ public class TranslogTests extends ESTestCase {
assertEquals("lastCommitted must be 1 less than current",
translogGeneration.translogFileGeneration + 1, translog.currentFileGeneration());
assertFalse(translog.syncNeeded());
try (Translog.Snapshot snapshot = translog.newSnapshotFromGen(translogGeneration, Long.MAX_VALUE)) {
try (Translog.Snapshot snapshot = translog.newSnapshot(minUncommittedOp, Long.MAX_VALUE)) {
for (int i = minUncommittedOp; i < translogOperations; i++) {
assertEquals("expected operation" + i + " to be in the previous translog but wasn't",
translog.currentFileGeneration() - 1, locations.get(i).generation);
@ -1866,7 +1824,9 @@ public class TranslogTests extends ESTestCase {
locations.add(translog.add(new Translog.Index("test", "" + op, op, primaryTerm.get(),
Integer.toString(op).getBytes(Charset.forName("UTF-8")))));
if (randomBoolean()) {
rollAndCommit(translog);
translog.rollGeneration();
translog.getDeletionPolicy().setLocalCheckpointOfSafeCommit(op);
translog.trimUnreferencedReaders();
firstUncommitted = op + 1;
}
}
@ -1887,7 +1847,7 @@ public class TranslogTests extends ESTestCase {
}
this.translog = new Translog(config, translogUUID, deletionPolicy, () -> SequenceNumbers.NO_OPS_PERFORMED, primaryTerm::get,
seqNo -> {});
try (Translog.Snapshot snapshot = this.translog.newSnapshotFromGen(translogGeneration, Long.MAX_VALUE)) {
try (Translog.Snapshot snapshot = this.translog.newSnapshot(randomLongBetween(0, firstUncommitted), Long.MAX_VALUE)) {
for (int i = firstUncommitted; i < translogOperations; i++) {
Translog.Operation next = snapshot.next();
assertNotNull("" + i, next);
@ -2068,7 +2028,7 @@ public class TranslogTests extends ESTestCase {
}
try {
rollAndCommit(translog);
translog.rollGeneration();
fail("already closed");
} catch (AlreadyClosedException ex) {
assertNotNull(ex.getCause());
@ -2246,28 +2206,29 @@ public class TranslogTests extends ESTestCase {
*/
public void testRecoveryFromAFutureGenerationCleansUp() throws IOException {
int translogOperations = randomIntBetween(10, 100);
for (int op = 0; op < translogOperations / 2; op++) {
translog.add(new Translog.Index("test", "" + op, op, primaryTerm.get(),
int op = 0;
for (; op < translogOperations / 2; op++) {
translog.add(new Translog.Index("_doc", Integer.toString(op), op, primaryTerm.get(),
Integer.toString(op).getBytes(Charset.forName("UTF-8"))));
if (rarely()) {
translog.rollGeneration();
}
}
translog.rollGeneration();
long comittedGeneration = randomLongBetween(2, translog.currentFileGeneration());
for (int op = translogOperations / 2; op < translogOperations; op++) {
translog.add(new Translog.Index("test", "" + op, op, primaryTerm.get(),
long localCheckpoint = randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, op);
for (op = translogOperations / 2; op < translogOperations; op++) {
translog.add(new Translog.Index("test", Integer.toString(op), op, primaryTerm.get(),
Integer.toString(op).getBytes(Charset.forName("UTF-8"))));
if (rarely()) {
translog.rollGeneration();
}
}
long minRetainedGen = translog.getMinGenerationForSeqNo(localCheckpoint + 1).translogFileGeneration;
// engine blows up, after committing the above generation
translog.close();
TranslogConfig config = translog.getConfig();
final TranslogDeletionPolicy deletionPolicy = new TranslogDeletionPolicy(-1, -1, 0);
deletionPolicy.setTranslogGenerationOfLastCommit(randomLongBetween(comittedGeneration, Long.MAX_VALUE));
deletionPolicy.setMinTranslogGenerationForRecovery(comittedGeneration);
deletionPolicy.setLocalCheckpointOfSafeCommit(localCheckpoint);
translog = new Translog(config, translog.getTranslogUUID(), deletionPolicy,
() -> SequenceNumbers.NO_OPS_PERFORMED, primaryTerm::get, seqNo -> {});
assertThat(translog.getMinFileGeneration(), equalTo(1L));
@ -2276,7 +2237,7 @@ public class TranslogTests extends ESTestCase {
assertFileIsPresent(translog, gen);
}
translog.trimUnreferencedReaders();
for (long gen = 1; gen < comittedGeneration; gen++) {
for (long gen = 1; gen < minRetainedGen; gen++) {
assertFileDeleted(translog, gen);
}
}
@ -2290,8 +2251,9 @@ public class TranslogTests extends ESTestCase {
final FailSwitch fail = new FailSwitch();
fail.failNever();
final TranslogConfig config = getTranslogConfig(tempDir);
final long comittedGeneration;
final long localCheckpoint;
final String translogUUID;
long minGenForRecovery = 1L;
try (Translog translog = getFailableTranslog(fail, config)) {
final TranslogDeletionPolicy deletionPolicy = translog.getDeletionPolicy();
// disable retention so we trim things
@ -2299,24 +2261,25 @@ public class TranslogTests extends ESTestCase {
deletionPolicy.setRetentionAgeInMillis(-1);
translogUUID = translog.getTranslogUUID();
int translogOperations = randomIntBetween(10, 100);
for (int op = 0; op < translogOperations / 2; op++) {
translog.add(new Translog.Index("test", "" + op, op, primaryTerm.get(),
int op = 0;
for (; op < translogOperations / 2; op++) {
translog.add(new Translog.Index("test", Integer.toString(op), op, primaryTerm.get(),
Integer.toString(op).getBytes(Charset.forName("UTF-8"))));
if (rarely()) {
translog.rollGeneration();
}
}
translog.rollGeneration();
comittedGeneration = randomLongBetween(2, translog.currentFileGeneration());
for (int op = translogOperations / 2; op < translogOperations; op++) {
translog.add(new Translog.Index("test", "" + op, op, primaryTerm.get(),
localCheckpoint = randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, op);
for (op = translogOperations / 2; op < translogOperations; op++) {
translog.add(new Translog.Index("test", Integer.toString(op), op, primaryTerm.get(),
Integer.toString(op).getBytes(Charset.forName("UTF-8"))));
if (rarely()) {
translog.rollGeneration();
}
}
deletionPolicy.setTranslogGenerationOfLastCommit(randomLongBetween(comittedGeneration, translog.currentFileGeneration()));
deletionPolicy.setMinTranslogGenerationForRecovery(comittedGeneration);
deletionPolicy.setLocalCheckpointOfSafeCommit(localCheckpoint);
minGenForRecovery = translog.getMinGenerationForSeqNo(localCheckpoint + 1).translogFileGeneration;
fail.failRandomly();
try {
translog.trimUnreferencedReaders();
@ -2325,16 +2288,16 @@ public class TranslogTests extends ESTestCase {
}
}
final TranslogDeletionPolicy deletionPolicy = new TranslogDeletionPolicy(-1, -1, 0);
deletionPolicy.setTranslogGenerationOfLastCommit(randomLongBetween(comittedGeneration, Long.MAX_VALUE));
deletionPolicy.setMinTranslogGenerationForRecovery(comittedGeneration);
deletionPolicy.setLocalCheckpointOfSafeCommit(localCheckpoint);
try (Translog translog = new Translog(config, translogUUID, deletionPolicy,
() -> SequenceNumbers.NO_OPS_PERFORMED, primaryTerm::get, seqNo -> {})) {
// we don't know when things broke exactly
assertThat(translog.getMinFileGeneration(), greaterThanOrEqualTo(1L));
assertThat(translog.getMinFileGeneration(), lessThanOrEqualTo(comittedGeneration));
assertThat(translog.getMinFileGeneration(), lessThanOrEqualTo(minGenForRecovery));
assertFilePresences(translog);
minGenForRecovery = translog.getMinGenerationForSeqNo(localCheckpoint + 1).translogFileGeneration;
translog.trimUnreferencedReaders();
assertThat(translog.getMinFileGeneration(), equalTo(comittedGeneration));
assertThat(translog.getMinFileGeneration(), equalTo(minGenForRecovery));
assertFilePresences(translog);
}
}
@ -2642,7 +2605,7 @@ public class TranslogTests extends ESTestCase {
fail.failRandomly();
TranslogConfig config = getTranslogConfig(tempDir);
final int numOps = randomIntBetween(100, 200);
long minGenForRecovery = 1;
long localCheckpointOfSafeCommit = SequenceNumbers.NO_OPS_PERFORMED;
List<String> syncedDocs = new ArrayList<>();
List<String> unsynced = new ArrayList<>();
if (randomBoolean()) {
@ -2672,8 +2635,8 @@ public class TranslogTests extends ESTestCase {
unsynced.clear();
failableTLog.rollGeneration();
committing = true;
failableTLog.getDeletionPolicy().setTranslogGenerationOfLastCommit(failableTLog.currentFileGeneration());
failableTLog.getDeletionPolicy().setMinTranslogGenerationForRecovery(failableTLog.currentFileGeneration());
failableTLog.getDeletionPolicy().setLocalCheckpointOfSafeCommit(opsAdded);
syncedDocs.clear();
failableTLog.trimUnreferencedReaders();
committing = false;
syncedDocs.clear();
@ -2705,7 +2668,7 @@ public class TranslogTests extends ESTestCase {
assertThat(unsynced, empty());
}
generationUUID = failableTLog.getTranslogUUID();
minGenForRecovery = failableTLog.getDeletionPolicy().getMinTranslogGenerationForRecovery();
localCheckpointOfSafeCommit = failableTLog.getDeletionPolicy().getLocalCheckpointOfSafeCommit();
IOUtils.closeWhileHandlingException(failableTLog);
}
} catch (TranslogException | MockDirectoryWrapper.FakeIOException ex) {
@ -2717,8 +2680,7 @@ public class TranslogTests extends ESTestCase {
if (randomBoolean()) {
try {
TranslogDeletionPolicy deletionPolicy = createTranslogDeletionPolicy();
deletionPolicy.setTranslogGenerationOfLastCommit(minGenForRecovery);
deletionPolicy.setMinTranslogGenerationForRecovery(minGenForRecovery);
deletionPolicy.setLocalCheckpointOfSafeCommit(localCheckpointOfSafeCommit);
IOUtils.close(getFailableTranslog(fail, config, randomBoolean(), false, generationUUID, deletionPolicy));
} catch (TranslogException | MockDirectoryWrapper.FakeIOException ex) {
// failed - that's ok, we didn't even create it
@ -2729,8 +2691,7 @@ public class TranslogTests extends ESTestCase {
fail.failNever(); // we don't wanna fail here but we might since we write a new checkpoint and create a new tlog file
TranslogDeletionPolicy deletionPolicy = createTranslogDeletionPolicy();
deletionPolicy.setTranslogGenerationOfLastCommit(minGenForRecovery);
deletionPolicy.setMinTranslogGenerationForRecovery(minGenForRecovery);
deletionPolicy.setLocalCheckpointOfSafeCommit(localCheckpointOfSafeCommit);
if (generationUUID == null) {
// we never managed to successfully create a translog, make it
generationUUID = Translog.createEmptyTranslog(config.getTranslogPath(),
@ -2738,8 +2699,7 @@ public class TranslogTests extends ESTestCase {
}
try (Translog translog = new Translog(config, generationUUID, deletionPolicy,
() -> SequenceNumbers.NO_OPS_PERFORMED, primaryTerm::get, seqNo -> {});
Translog.Snapshot snapshot = translog.newSnapshotFromGen(
new Translog.TranslogGeneration(generationUUID, minGenForRecovery), Long.MAX_VALUE)) {
Translog.Snapshot snapshot = translog.newSnapshot(localCheckpointOfSafeCommit + 1, Long.MAX_VALUE)) {
assertEquals(syncedDocs.size(), snapshot.totalOperations());
for (int i = 0; i < syncedDocs.size(); i++) {
Translog.Operation next = snapshot.next();
@ -2922,10 +2882,20 @@ public class TranslogTests extends ESTestCase {
.map(t -> t.getPrimaryTerm()).collect(Collectors.toList());
assertThat(storedPrimaryTerms, equalTo(primaryTerms));
}
long minGenForRecovery = randomLongBetween(generation, generation + rolls);
commit(translog, minGenForRecovery, generation + rolls);
final BaseTranslogReader minRetainedReader = randomFrom(
Stream.concat(translog.getReaders().stream(), Stream.of(translog.getCurrent()))
.filter(r -> r.getCheckpoint().minSeqNo >= 0)
.collect(Collectors.toList()));
int retainedOps = Stream.concat(translog.getReaders().stream(), Stream.of(translog.getCurrent()))
.filter(r -> r.getCheckpoint().generation >= minRetainedReader.generation)
.mapToInt(r -> r.getCheckpoint().numOps)
.sum();
deletionPolicy.setLocalCheckpointOfSafeCommit(
randomLongBetween(minRetainedReader.getCheckpoint().minSeqNo, minRetainedReader.getCheckpoint().maxSeqNo) - 1);
translog.trimUnreferencedReaders();
assertThat(translog.currentFileGeneration(), equalTo(generation + rolls));
assertThat(translog.stats().getUncommittedOperations(), equalTo(0));
assertThat(translog.stats().getUncommittedOperations(), equalTo(retainedOps));
if (longRetention) {
for (int i = 0; i <= rolls; i++) {
assertFileIsPresent(translog, generation + i);
@ -2933,17 +2903,17 @@ public class TranslogTests extends ESTestCase {
deletionPolicy.setRetentionAgeInMillis(randomBoolean() ? 100 : -1);
assertBusy(() -> {
translog.trimUnreferencedReaders();
for (long i = 0; i < minGenForRecovery; i++) {
for (long i = 0; i < minRetainedReader.generation; i++) {
assertFileDeleted(translog, i);
}
});
} else {
// immediate cleanup
for (long i = 0; i < minGenForRecovery; i++) {
for (long i = 0; i < minRetainedReader.generation; i++) {
assertFileDeleted(translog, i);
}
}
for (long i = minGenForRecovery; i < generation + rolls; i++) {
for (long i = minRetainedReader.generation; i < generation + rolls; i++) {
assertFileIsPresent(translog, i);
}
}
@ -3002,7 +2972,7 @@ public class TranslogTests extends ESTestCase {
}
assertThat(translog.estimateTotalOperationsFromMinSeq(seqNo), equalTo(expectedSnapshotOps));
int readFromSnapshot = 0;
try (Translog.Snapshot snapshot = translog.newSnapshotFromMinSeqNo(seqNo)) {
try (Translog.Snapshot snapshot = translog.newSnapshot(seqNo, Long.MAX_VALUE)) {
assertThat(snapshot.totalOperations(), equalTo(expectedSnapshotOps));
Translog.Operation op;
while ((op = snapshot.next()) != null) {
@ -3031,8 +3001,7 @@ public class TranslogTests extends ESTestCase {
translog.rollGeneration();
}
}
long lastGen = randomLongBetween(1, translog.currentFileGeneration());
commit(translog, randomLongBetween(1, lastGen), lastGen);
translog.getDeletionPolicy().setLocalCheckpointOfSafeCommit(randomLongBetween(0, operations));
}
public void testAcquiredLockIsPassedToDeletionPolicy() throws IOException {
@ -3044,9 +3013,8 @@ public class TranslogTests extends ESTestCase {
translog.rollGeneration();
}
if (rarely()) {
long lastGen = randomLongBetween(deletionPolicy.getTranslogGenerationOfLastCommit(), translog.currentFileGeneration());
long minGen = randomLongBetween(deletionPolicy.getMinTranslogGenerationForRecovery(), lastGen);
commit(translog, minGen, lastGen);
translog.getDeletionPolicy().setLocalCheckpointOfSafeCommit(
randomLongBetween(deletionPolicy.getLocalCheckpointOfSafeCommit(), i));
}
if (frequently()) {
long minGen;
@ -3069,7 +3037,7 @@ public class TranslogTests extends ESTestCase {
translog.rollGeneration();
}
}
rollAndCommit(translog);
translog.rollGeneration();
translog.close();
assertThat(Translog.readGlobalCheckpoint(translogDir, translogUUID), equalTo(globalCheckpoint.get()));
expectThrows(TranslogCorruptedException.class, () -> Translog.readGlobalCheckpoint(translogDir, UUIDs.randomBase64UUID()));
@ -3201,9 +3169,8 @@ public class TranslogTests extends ESTestCase {
translog.sync();
assertThat(translog.getMaxSeqNo(),
equalTo(maxSeqNoPerGeneration.isEmpty() ? SequenceNumbers.NO_OPS_PERFORMED : Collections.max(maxSeqNoPerGeneration.values())));
long minRetainedGen = commit(translog, randomLongBetween(1, translog.currentFileGeneration()), translog.currentFileGeneration());
long expectedMaxSeqNo = maxSeqNoPerGeneration.entrySet().stream()
.filter(e -> e.getKey() >= minRetainedGen).mapToLong(e -> e.getValue())
.filter(e -> e.getKey() >= translog.getMinFileGeneration()).mapToLong(e -> e.getValue())
.max().orElse(SequenceNumbers.NO_OPS_PERFORMED);
assertThat(translog.getMaxSeqNo(), equalTo(expectedMaxSeqNo));
}

View File

@ -36,7 +36,6 @@ import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.NoOpEngine;
import org.elasticsearch.index.mapper.SourceToParse;
import org.elasticsearch.index.seqno.SeqNoStats;
@ -177,14 +176,8 @@ public class PeerRecoveryTargetServiceTests extends IndexShardTestCase {
long globalCheckpoint = populateRandomData(shard).getGlobalCheckpoint();
Optional<SequenceNumbers.CommitInfo> safeCommit = shard.store().findSafeIndexCommit(globalCheckpoint);
assertTrue(safeCommit.isPresent());
final Translog.TranslogGeneration recoveringTranslogGeneration;
try (Engine.IndexCommitRef commitRef = shard.acquireSafeIndexCommit()) {
recoveringTranslogGeneration = new Translog.TranslogGeneration(
commitRef.getIndexCommit().getUserData().get(Translog.TRANSLOG_UUID_KEY),
Long.parseLong(commitRef.getIndexCommit().getUserData().get(Translog.TRANSLOG_GENERATION_KEY)));
}
int expectedTotalLocal = 0;
try (Translog.Snapshot snapshot = getTranslog(shard).newSnapshotFromGen(recoveringTranslogGeneration, globalCheckpoint)) {
try (Translog.Snapshot snapshot = getTranslog(shard).newSnapshot(safeCommit.get().localCheckpoint + 1, globalCheckpoint)) {
Translog.Operation op;
while ((op = snapshot.next()) != null) {
if (op.seqNo() <= globalCheckpoint) {
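
The snapshot bounds make the expected local-recovery count easy to state: exactly the operations after the safe commit, up to and including the global checkpoint. Toy arithmetic, assuming a dense seqNo range:

    long localCheckpointOfSafeCommit = 5, globalCheckpoint = 9;
    long expectedTotalLocal = globalCheckpoint - localCheckpointOfSafeCommit; // seqNos 6..9 -> 4 ops
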

View File

@ -276,21 +276,17 @@ public class RecoveryTests extends ESIndexLevelReplicationTestCase {
.setOpenMode(IndexWriterConfig.OpenMode.APPEND);
Map<String, String> userData = new HashMap<>(replica.store().readLastCommittedSegmentsInfo().getUserData());
final String translogUUIDtoUse;
final long translogGenToUse;
final String historyUUIDtoUse = UUIDs.randomBase64UUID(random());
if (randomBoolean()) {
// create a new translog
translogUUIDtoUse = Translog.createEmptyTranslog(replica.shardPath().resolveTranslog(), flushedDocs,
replica.shardId(), replica.getPendingPrimaryTerm());
translogGenToUse = 1;
} else {
translogUUIDtoUse = translogGeneration.translogUUID;
translogGenToUse = translogGeneration.translogFileGeneration;
}
try (IndexWriter writer = new IndexWriter(replica.store().directory(), iwc)) {
userData.put(Engine.HISTORY_UUID_KEY, historyUUIDtoUse);
userData.put(Translog.TRANSLOG_UUID_KEY, translogUUIDtoUse);
userData.put(Translog.TRANSLOG_GENERATION_KEY, Long.toString(translogGenToUse));
writer.setLiveCommitData(userData.entrySet());
writer.commit();
}