Move trimming unsafe commits from engine ctor to store (#29260)

As a follow-up to #28245, this PR removes the logic for selecting the
right starting commit from the Engine constructor in favor of explicitly
trimming unsafe commits in the Store before the engine is opened. This
makes the Engine constructor follow standard Lucene semantics and use
the last commit.

Relates #28245
Relates #29156
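
The flow after this change, in rough terms: read the persisted global checkpoint and the minimum retained translog generation, trim unsafe commits in the Store, then open the engine on the last commit. A condensed sketch assembled from the IndexShard and test hunks below (not a verbatim excerpt):

    // trim unsafe commits in the Store *before* any Engine exists
    final String translogUUID = store.readLastCommittedSegmentsInfo().getUserData().get(Translog.TRANSLOG_UUID_KEY);
    final long globalCheckpoint = Translog.readGlobalCheckpoint(translogConfig.getTranslogPath(), translogUUID);
    final long minRetainedTranslogGen = Translog.readMinTranslogGeneration(translogConfig.getTranslogPath(), translogUUID);
    store.trimUnsafeCommits(globalCheckpoint, minRetainedTranslogGen, config.getIndexSettings().getIndexVersionCreated());
    createNewEngine(config); // the engine now opens on the last (safe) commit, per standard Lucene semantics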
Boaz Leskes 2018-03-29 19:35:57 +02:00 committed by Nhat Nguyen
parent 04d0edc8ee
commit eb8b31746a
9 changed files with 207 additions and 150 deletions

View File

@ -93,12 +93,12 @@ which returns something similar to:
{
"commit" : {
"id" : "3M3zkw2GHMo2Y4h4/KFKCg==",
"generation" : 4,
"generation" : 3,
"user_data" : {
"translog_uuid" : "hnOG3xFcTDeoI_kvvvOdNA",
"history_uuid" : "XP7KDJGiS1a2fHYiFL5TXQ",
"local_checkpoint" : "-1",
"translog_generation" : "3",
"translog_generation" : "2",
"max_seq_no" : "-1",
"sync_id" : "AVvFY-071siAOuFGEO9P", <1>
"max_unsafe_auto_id_timestamp" : "-1"

View File

@ -47,60 +47,27 @@ public final class CombinedDeletionPolicy extends IndexDeletionPolicy {
private final Logger logger;
private final TranslogDeletionPolicy translogDeletionPolicy;
private final LongSupplier globalCheckpointSupplier;
private final IndexCommit startingCommit;
private final ObjectIntHashMap<IndexCommit> snapshottedCommits; // Number of snapshots held against each commit point.
private volatile IndexCommit safeCommit; // the most recent safe commit point - its max_seqno is at most the persisted global checkpoint.
private volatile IndexCommit lastCommit; // the most recent commit point
CombinedDeletionPolicy(Logger logger, TranslogDeletionPolicy translogDeletionPolicy,
LongSupplier globalCheckpointSupplier, IndexCommit startingCommit) {
CombinedDeletionPolicy(Logger logger, TranslogDeletionPolicy translogDeletionPolicy, LongSupplier globalCheckpointSupplier) {
this.logger = logger;
this.translogDeletionPolicy = translogDeletionPolicy;
this.globalCheckpointSupplier = globalCheckpointSupplier;
this.startingCommit = startingCommit;
this.snapshottedCommits = new ObjectIntHashMap<>();
}
@Override
public synchronized void onInit(List<? extends IndexCommit> commits) throws IOException {
assert commits.isEmpty() == false : "index is opened, but we have no commits";
assert startingCommit != null && commits.contains(startingCommit) : "Starting commit not in the existing commit list; "
+ "startingCommit [" + startingCommit + "], commit list [" + commits + "]";
keepOnlyStartingCommitOnInit(commits);
updateTranslogDeletionPolicy();
onCommit(commits);
if (safeCommit != commits.get(commits.size() - 1)) {
throw new IllegalStateException("Engine is opened, but the last commit isn't safe. Global checkpoint ["
+ globalCheckpointSupplier.getAsLong() + "], seqNos in last commit ["
+ SequenceNumbers.loadSeqNoInfoFromLuceneCommit(lastCommit.getUserData().entrySet()) + "], "
+ "seqNos in safe commit [" + SequenceNumbers.loadSeqNoInfoFromLuceneCommit(safeCommit.getUserData().entrySet()) + "]");
}
/**
* Keeping existing unsafe commits when opening an engine can be problematic because these commits are not safe
* at recovery time but they can suddenly become safe in the future.
* The following issues can happen if unsafe commits are kept on init.
* <p>
* 1. A replica can use an unsafe commit in peer-recovery. This happens when a replica with a safe commit c1(max_seqno=1)
* and an unsafe commit c2(max_seqno=2) recovers from a primary with c1(max_seqno=1). If a new document(seqno=2)
* is added without flushing, the global checkpoint is advanced to 2; and when the replica recovers again, it will use
* the unsafe commit c2(max_seqno=2, at most the gcp of 2) as the starting commit for sequence-based recovery even though
* commit c2 contains a stale operation, and the document(with seqno=2) will not be replicated to the replica.
* <p>
* 2. The min translog gen for recovery can go backwards in peer-recovery. This happens when a replica has a safe commit
* c1(local_checkpoint=1, recovery_translog_gen=1) and an unsafe commit c2(local_checkpoint=2, recovery_translog_gen=2).
* The replica recovers from a primary, keeps c2 as the last commit, and sets last_translog_gen to 2. Flushing a new
* commit on the replica will then cause an exception, as the new last commit c3 will have recovery_translog_gen=1. The
* recovery translog generation of a commit is calculated based on the current local checkpoint: the local checkpoint
* of c3 is 1 while the local checkpoint of c2 is 2.
* <p>
* 3. A commit without a translog can be used in recovery. An old index, created before multiple commits were introduced
* (v6.2), may not have a safe commit. If that index has a snapshotted commit without a translog and an unsafe commit,
* the policy can consider the snapshotted commit as a safe commit for recovery even though the commit has no translog.
*/
private void keepOnlyStartingCommitOnInit(List<? extends IndexCommit> commits) throws IOException {
for (IndexCommit commit : commits) {
if (startingCommit.equals(commit) == false) {
this.deleteCommit(commit);
}
}
assert startingCommit.isDeleted() == false : "Starting commit must not be deleted";
lastCommit = startingCommit;
safeCommit = startingCommit;
}
@Override

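The contract that both the deleted constructor path and the new Store#trimUnsafeCommits rely on is CombinedDeletionPolicy#findSafeCommitPoint: among the existing commits, pick the newest one whose max_seqno does not exceed the global checkpoint. A minimal standalone sketch of that contract (hypothetical helper for illustration; the real method also handles legacy commits that lack seq_no user data):

    // hypothetical helper mirroring the findSafeCommitPoint contract:
    // commits are ordered oldest to newest; keep the newest commit whose max_seq_no
    // is at most the global checkpoint, falling back to the oldest commit otherwise
    static IndexCommit pickSafeCommit(List<? extends IndexCommit> commits, long globalCheckpoint) throws IOException {
        IndexCommit safe = commits.get(0);
        for (IndexCommit commit : commits) {
            final String maxSeqNo = commit.getUserData().get(SequenceNumbers.MAX_SEQ_NO);
            if (maxSeqNo != null && Long.parseLong(maxSeqNo) <= globalCheckpoint) {
                safe = commit; // the last match wins, i.e. the newest qualifying commit
            }
        }
        return safe;
    }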
View File

@ -41,10 +41,8 @@ import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.LockObtainFailedException;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.core.internal.io.IOUtils;
import org.apache.lucene.util.InfoStream;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.Version;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.SuppressForbidden;
@ -59,6 +57,7 @@ import org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver.DocIdAndSeqN
import org.elasticsearch.common.metrics.CounterMetric;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.ReleasableLock;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.mapper.IdFieldMapper;
@ -70,7 +69,6 @@ import org.elasticsearch.index.seqno.LocalCheckpointTracker;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.shard.ElasticsearchMergePolicy;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.translog.TranslogConfig;
import org.elasticsearch.index.translog.TranslogCorruptedException;
@ -78,8 +76,6 @@ import org.elasticsearch.index.translog.TranslogDeletionPolicy;
import org.elasticsearch.threadpool.ThreadPool;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
@ -183,12 +179,10 @@ public class InternalEngine extends Engine {
translog = openTranslog(engineConfig, translogDeletionPolicy, engineConfig.getGlobalCheckpointSupplier());
assert translog.getGeneration() != null;
this.translog = translog;
final IndexCommit startingCommit = getStartingCommitPoint();
assert startingCommit != null : "Starting commit should be non-null";
this.localCheckpointTracker = createLocalCheckpointTracker(localCheckpointTrackerSupplier, startingCommit);
this.combinedDeletionPolicy = new CombinedDeletionPolicy(logger, translogDeletionPolicy,
translog::getLastSyncedGlobalCheckpoint, startingCommit);
writer = createWriter(startingCommit);
this.localCheckpointTracker = createLocalCheckpointTracker(localCheckpointTrackerSupplier);
this.combinedDeletionPolicy =
new CombinedDeletionPolicy(logger, translogDeletionPolicy, translog::getLastSyncedGlobalCheckpoint);
writer = createWriter();
bootstrapAppendOnlyInfoFromWriter(writer);
historyUUID = loadOrGenerateHistoryUUID(writer);
Objects.requireNonNull(historyUUID, "history uuid should not be null");
@ -232,10 +226,11 @@ public class InternalEngine extends Engine {
}
private LocalCheckpointTracker createLocalCheckpointTracker(
BiFunction<Long, Long, LocalCheckpointTracker> localCheckpointTrackerSupplier, IndexCommit startingCommit) throws IOException {
BiFunction<Long, Long, LocalCheckpointTracker> localCheckpointTrackerSupplier) throws IOException {
final long maxSeqNo;
final long localCheckpoint;
final SequenceNumbers.CommitInfo seqNoStats = Store.loadSeqNoInfo(startingCommit);
final SequenceNumbers.CommitInfo seqNoStats =
SequenceNumbers.loadSeqNoInfoFromLuceneCommit(store.readLastCommittedSegmentsInfo().userData.entrySet());
maxSeqNo = seqNoStats.maxSeqNo;
localCheckpoint = seqNoStats.localCheckpoint;
logger.trace("recovered maximum sequence number [{}] and local checkpoint [{}]", maxSeqNo, localCheckpoint);
@ -395,31 +390,6 @@ public class InternalEngine extends Engine {
pendingTranslogRecovery.set(false); // we are good - now we can commit
}
private IndexCommit getStartingCommitPoint() throws IOException {
final IndexCommit startingIndexCommit;
final long lastSyncedGlobalCheckpoint = translog.getLastSyncedGlobalCheckpoint();
final long minRetainedTranslogGen = translog.getMinFileGeneration();
final List<IndexCommit> existingCommits = DirectoryReader.listCommits(store.directory());
// We may not have a safe commit if an index was created before v6.2; and if there is a snapshotted commit whose translog
// is not retained but whose max_seqno is at most the global checkpoint, we may mistakenly select it as a starting commit.
// To avoid this issue, we only select index commits whose translog is fully retained.
if (engineConfig.getIndexSettings().getIndexVersionCreated().before(Version.V_6_2_0)) {
final List<IndexCommit> recoverableCommits = new ArrayList<>();
for (IndexCommit commit : existingCommits) {
if (minRetainedTranslogGen <= Long.parseLong(commit.getUserData().get(Translog.TRANSLOG_GENERATION_KEY))) {
recoverableCommits.add(commit);
}
}
assert recoverableCommits.isEmpty() == false : "No commit point with translog found; " +
"commits [" + existingCommits + "], minRetainedTranslogGen [" + minRetainedTranslogGen + "]";
startingIndexCommit = CombinedDeletionPolicy.findSafeCommitPoint(recoverableCommits, lastSyncedGlobalCheckpoint);
} else {
// TODO: Assert that the starting commit is a safe commit once peer-recovery sets the global checkpoint.
startingIndexCommit = CombinedDeletionPolicy.findSafeCommitPoint(existingCommits, lastSyncedGlobalCheckpoint);
}
return startingIndexCommit;
}
private void recoverFromTranslogInternal() throws IOException {
Translog.TranslogGeneration translogGeneration = translog.getGeneration();
final int opsRecovered;
@ -1907,9 +1877,9 @@ public class InternalEngine extends Engine {
}
}
private IndexWriter createWriter(IndexCommit startingCommit) throws IOException {
private IndexWriter createWriter() throws IOException {
try {
final IndexWriterConfig iwc = getIndexWriterConfig(startingCommit);
final IndexWriterConfig iwc = getIndexWriterConfig();
return createWriter(store.directory(), iwc);
} catch (LockObtainFailedException ex) {
logger.warn("could not lock IndexWriter", ex);
@ -1922,11 +1892,10 @@ public class InternalEngine extends Engine {
return new IndexWriter(directory, iwc);
}
private IndexWriterConfig getIndexWriterConfig(IndexCommit startingCommit) {
private IndexWriterConfig getIndexWriterConfig() {
final IndexWriterConfig iwc = new IndexWriterConfig(engineConfig.getAnalyzer());
iwc.setCommitOnClose(false); // we by default don't commit on close
iwc.setOpenMode(IndexWriterConfig.OpenMode.APPEND);
iwc.setIndexCommit(startingCommit);
iwc.setIndexDeletionPolicy(combinedDeletionPolicy);
// with tests.verbose, lucene sets this up: plumb to align with filesystem stream
boolean verbose = false;

View File

@ -122,5 +122,13 @@ public class SequenceNumbers {
this.maxSeqNo = maxSeqNo;
this.localCheckpoint = localCheckpoint;
}
@Override
public String toString() {
return "CommitInfo{" +
"maxSeqNo=" + maxSeqNo +
", localCheckpoint=" + localCheckpoint +
'}';
}
}
}

View File

@ -1317,6 +1317,9 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
assertMaxUnsafeAutoIdInCommit();
final long minRetainedTranslogGen = Translog.readMinTranslogGeneration(translogConfig.getTranslogPath(), translogUUID);
store.trimUnsafeCommits(globalCheckpoint, minRetainedTranslogGen, config.getIndexSettings().getIndexVersionCreated());
createNewEngine(config);
verifyNotClosed();
// We set active because we are now writing operations to the engine; this way, if we go idle after some time and become inactive,

View File

@ -24,6 +24,7 @@ import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.codecs.CodecUtil;
import org.apache.lucene.index.CheckIndex;
import org.apache.lucene.index.CorruptIndexException;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.IndexFileNames;
import org.apache.lucene.index.IndexFormatTooNewException;
@ -75,6 +76,7 @@ import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.env.ShardLock;
import org.elasticsearch.env.ShardLockObtainFailedException;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.engine.CombinedDeletionPolicy;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.InternalEngine;
import org.elasticsearch.index.seqno.SequenceNumbers;
@ -1463,7 +1465,7 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
*/
public void createEmpty() throws IOException {
metadataLock.writeLock().lock();
try (IndexWriter writer = newIndexWriter(IndexWriterConfig.OpenMode.CREATE, directory)) {
try (IndexWriter writer = newIndexWriter(IndexWriterConfig.OpenMode.CREATE, directory, null)) {
final Map<String, String> map = new HashMap<>();
map.put(Engine.HISTORY_UUID_KEY, UUIDs.randomBase64UUID());
map.put(SequenceNumbers.LOCAL_CHECKPOINT_KEY, Long.toString(SequenceNumbers.NO_OPS_PERFORMED));
@ -1482,7 +1484,7 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
*/
public void bootstrapNewHistory() throws IOException {
metadataLock.writeLock().lock();
try (IndexWriter writer = newIndexWriter(IndexWriterConfig.OpenMode.APPEND, directory)) {
try (IndexWriter writer = newIndexWriter(IndexWriterConfig.OpenMode.APPEND, directory, null)) {
final Map<String, String> userData = getUserData(writer);
final long maxSeqNo = Long.parseLong(userData.get(SequenceNumbers.MAX_SEQ_NO));
final Map<String, String> map = new HashMap<>();
@ -1501,7 +1503,7 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
*/
public void associateIndexWithNewTranslog(final String translogUUID) throws IOException {
metadataLock.writeLock().lock();
try (IndexWriter writer = newIndexWriter(IndexWriterConfig.OpenMode.APPEND, directory)) {
try (IndexWriter writer = newIndexWriter(IndexWriterConfig.OpenMode.APPEND, directory, null)) {
if (translogUUID.equals(getUserData(writer).get(Translog.TRANSLOG_UUID_KEY))) {
throw new IllegalArgumentException("a new translog uuid can't be equal to existing one. got [" + translogUUID + "]");
}
@ -1520,7 +1522,7 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
*/
public void ensureIndexHasHistoryUUID() throws IOException {
metadataLock.writeLock().lock();
try (IndexWriter writer = newIndexWriter(IndexWriterConfig.OpenMode.APPEND, directory)) {
try (IndexWriter writer = newIndexWriter(IndexWriterConfig.OpenMode.APPEND, directory, null)) {
final Map<String, String> userData = getUserData(writer);
if (userData.containsKey(Engine.HISTORY_UUID_KEY) == false) {
updateCommitData(writer, Collections.singletonMap(Engine.HISTORY_UUID_KEY, UUIDs.randomBase64UUID()));
@ -1530,6 +1532,82 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
}
}
/**
* Keeping existing unsafe commits when opening an engine can be problematic because these commits are not safe
* at recovery time but they can suddenly become safe in the future.
* The following issues can happen if unsafe commits are kept on init.
* <p>
* 1. A replica can use an unsafe commit in peer-recovery. This happens when a replica with a safe commit c1(max_seqno=1)
* and an unsafe commit c2(max_seqno=2) recovers from a primary with c1(max_seqno=1). If a new document(seqno=2)
* is added without flushing, the global checkpoint is advanced to 2; and when the replica recovers again, it will use
* the unsafe commit c2(max_seqno=2, at most the gcp of 2) as the starting commit for sequence-based recovery even though
* commit c2 contains a stale operation, and the document(with seqno=2) will not be replicated to the replica.
* <p>
* 2. The min translog gen for recovery can go backwards in peer-recovery. This happens when a replica has a safe commit
* c1(local_checkpoint=1, recovery_translog_gen=1) and an unsafe commit c2(local_checkpoint=2, recovery_translog_gen=2).
* The replica recovers from a primary, keeps c2 as the last commit, and sets last_translog_gen to 2. Flushing a new
* commit on the replica will then cause an exception, as the new last commit c3 will have recovery_translog_gen=1. The
* recovery translog generation of a commit is calculated based on the current local checkpoint: the local checkpoint
* of c3 is 1 while the local checkpoint of c2 is 2.
* <p>
* 3. A commit without a translog can be used in recovery. An old index, created before multiple commits were introduced
* (v6.2), may not have a safe commit. If that index has a snapshotted commit without a translog and an unsafe commit,
* the policy can consider the snapshotted commit as a safe commit for recovery even though the commit has no translog.
*/
public void trimUnsafeCommits(final long lastSyncedGlobalCheckpoint, final long minRetainedTranslogGen,
final org.elasticsearch.Version indexVersionCreated) throws IOException {
metadataLock.writeLock().lock();
try {
final List<IndexCommit> existingCommits = DirectoryReader.listCommits(directory);
if (existingCommits.isEmpty()) {
throw new IllegalArgumentException("No index found to trim");
}
final String translogUUID = existingCommits.get(existingCommits.size() - 1).getUserData().get(Translog.TRANSLOG_UUID_KEY);
final IndexCommit startingIndexCommit;
// We may not have a safe commit if an index was created before v6.2; and if there is a snapshotted commit whose translog
// is not retained but whose max_seqno is at most the global checkpoint, we may mistakenly select it as a starting commit.
// To avoid this issue, we only select index commits whose translog is fully retained.
if (indexVersionCreated.before(org.elasticsearch.Version.V_6_2_0)) {
final List<IndexCommit> recoverableCommits = new ArrayList<>();
for (IndexCommit commit : existingCommits) {
if (minRetainedTranslogGen <= Long.parseLong(commit.getUserData().get(Translog.TRANSLOG_GENERATION_KEY))) {
recoverableCommits.add(commit);
}
}
assert recoverableCommits.isEmpty() == false : "No commit point with translog found; " +
"commits [" + existingCommits + "], minRetainedTranslogGen [" + minRetainedTranslogGen + "]";
startingIndexCommit = CombinedDeletionPolicy.findSafeCommitPoint(recoverableCommits, lastSyncedGlobalCheckpoint);
} else {
// TODO: Assert that the starting commit is a safe commit once peer-recovery sets the global checkpoint.
startingIndexCommit = CombinedDeletionPolicy.findSafeCommitPoint(existingCommits, lastSyncedGlobalCheckpoint);
}
if (translogUUID.equals(startingIndexCommit.getUserData().get(Translog.TRANSLOG_UUID_KEY)) == false) {
throw new IllegalStateException("starting commit translog uuid ["
+ startingIndexCommit.getUserData().get(Translog.TRANSLOG_UUID_KEY) + "] is not equal to last commit's translog uuid ["
+ translogUUID + "]");
}
if (startingIndexCommit.equals(existingCommits.get(existingCommits.size() - 1)) == false) {
try (IndexWriter writer = newIndexWriter(IndexWriterConfig.OpenMode.APPEND, directory, startingIndexCommit)) {
// this achieves two things:
// - by committing a new commit based on the starting commit, it makes sure the starting commit will be opened
// - it deletes any other commits (via Lucene's standard deletion policy)
//
// note that we can't just use IndexCommit.delete(), as we really want to make sure those files won't be used,
// even if a virus scanner causes the files not to be deleted.
// The new commit will use segment files from the starting commit but userData from the last commit by default.
// Thus, we need to manually set the userData from the starting commit to the new commit.
writer.setLiveCommitData(startingIndexCommit.getUserData().entrySet());
writer.commit();
}
}
} finally {
metadataLock.writeLock().unlock();
}
}
private void updateCommitData(IndexWriter writer, Map<String, String> keysToUpdate) throws IOException {
final Map<String, String> userData = getUserData(writer);
userData.putAll(keysToUpdate);
@ -1543,9 +1621,12 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
return userData;
}
private IndexWriter newIndexWriter(IndexWriterConfig.OpenMode openMode, final Directory dir) throws IOException {
private static IndexWriter newIndexWriter(final IndexWriterConfig.OpenMode openMode, final Directory dir, final IndexCommit commit)
throws IOException {
assert openMode == IndexWriterConfig.OpenMode.APPEND || commit == null : "can't specify create flag with a commit";
IndexWriterConfig iwc = new IndexWriterConfig(null)
.setCommitOnClose(false)
.setIndexCommit(commit)
// we don't want merges to happen here - we call maybe merge on the engine
// later once we started it up; otherwise we would need to wait for it here.
// we also don't specify a codec here - merges should use the engine's codec for this index
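The widened newIndexWriter signature covers three call patterns, all of which appear in the hunks above (summary sketch only):

    // fresh index: CREATE mode, commit must be null (enforced by the assert)
    newIndexWriter(IndexWriterConfig.OpenMode.CREATE, directory, null);
    // append on top of the last commit: APPEND mode with a null commit
    newIndexWriter(IndexWriterConfig.OpenMode.APPEND, directory, null);
    // open a specific historic commit, as trimUnsafeCommits does
    newIndexWriter(IndexWriterConfig.OpenMode.APPEND, directory, startingIndexCommit);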

View File

@ -22,7 +22,6 @@ package org.elasticsearch.index.translog;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.index.Term;
import org.apache.lucene.store.AlreadyClosedException;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.Version;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.common.UUIDs;
@ -38,6 +37,7 @@ import org.elasticsearch.common.lucene.uid.Versions;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.concurrent.ReleasableLock;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.engine.Engine;
@ -1705,6 +1705,11 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
* @throws TranslogCorruptedException if the translog is corrupted or mismatched with the given uuid
*/
public static long readGlobalCheckpoint(final Path location, final String expectedTranslogUUID) throws IOException {
final Checkpoint checkpoint = readCheckpoint(location, expectedTranslogUUID);
return checkpoint.globalCheckpoint;
}
private static Checkpoint readCheckpoint(Path location, String expectedTranslogUUID) throws IOException {
final Checkpoint checkpoint = readCheckpoint(location);
// We need to open at least one translog reader to validate the translogUUID.
final Path translogFile = location.resolve(getFilename(checkpoint.generation));
@ -1715,7 +1720,21 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
} catch (Exception ex) {
throw new TranslogCorruptedException("Translog at [" + location + "] is corrupted", ex);
}
return checkpoint.globalCheckpoint;
return checkpoint;
}
/**
* Returns the minimum translog generation retained by the translog at the given location.
* This ensures that the translogUUID of this translog matches the provided translogUUID.
*
* @param location the location of the translog
* @return the minimum translog generation
* @throws IOException if an I/O exception occurred reading the checkpoint
* @throws TranslogCorruptedException if the translog is corrupted or mismatched with the given uuid
*/
public static long readMinTranslogGeneration(final Path location, final String expectedTranslogUUID) throws IOException {
final Checkpoint checkpoint = readCheckpoint(location, expectedTranslogUUID);
return checkpoint.minTranslogGeneration;
}
/**

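Both public readers route through the same UUID-validating readCheckpoint, so a stale or foreign translog surfaces as an exception rather than as bogus values. A hedged usage sketch (translogPath and expectedTranslogUUID are placeholders):

    try {
        final long globalCheckpoint = Translog.readGlobalCheckpoint(translogPath, expectedTranslogUUID);
        final long minTranslogGen = Translog.readMinTranslogGeneration(translogPath, expectedTranslogUUID);
        // feed both into Store#trimUnsafeCommits, as IndexShard does above
    } catch (TranslogCorruptedException e) {
        // translog is corrupted or belongs to a different translog UUID
    }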
View File

@ -52,7 +52,7 @@ public class CombinedDeletionPolicyTests extends ESTestCase {
public void testKeepCommitsAfterGlobalCheckpoint() throws Exception {
final AtomicLong globalCheckpoint = new AtomicLong();
TranslogDeletionPolicy translogPolicy = createTranslogDeletionPolicy();
CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy(logger, translogPolicy, globalCheckpoint::get, null);
CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy(logger, translogPolicy, globalCheckpoint::get);
final LongArrayList maxSeqNoList = new LongArrayList();
final LongArrayList translogGenList = new LongArrayList();
@ -91,7 +91,7 @@ public class CombinedDeletionPolicyTests extends ESTestCase {
final AtomicLong globalCheckpoint = new AtomicLong();
final UUID translogUUID = UUID.randomUUID();
TranslogDeletionPolicy translogPolicy = createTranslogDeletionPolicy();
CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy(logger, translogPolicy, globalCheckpoint::get, null);
CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy(logger, translogPolicy, globalCheckpoint::get);
long lastMaxSeqNo = between(1, 1000);
long lastTranslogGen = between(1, 20);
int safeIndex = 0;
@ -161,7 +161,7 @@ public class CombinedDeletionPolicyTests extends ESTestCase {
final UUID translogUUID = UUID.randomUUID();
TranslogDeletionPolicy translogPolicy = createTranslogDeletionPolicy();
CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy(logger, translogPolicy, globalCheckpoint::get, null);
CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy(logger, translogPolicy, globalCheckpoint::get);
long legacyTranslogGen = randomNonNegativeLong();
IndexCommit legacyCommit = mockLegacyIndexCommit(translogUUID, legacyTranslogGen);
@ -194,7 +194,7 @@ public class CombinedDeletionPolicyTests extends ESTestCase {
public void testDeleteInvalidCommits() throws Exception {
final AtomicLong globalCheckpoint = new AtomicLong(randomNonNegativeLong());
TranslogDeletionPolicy translogPolicy = createTranslogDeletionPolicy();
CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy(logger, translogPolicy, globalCheckpoint::get, null);
CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy(logger, translogPolicy, globalCheckpoint::get);
final int invalidCommits = between(1, 10);
final List<IndexCommit> commitList = new ArrayList<>();
@ -217,39 +217,11 @@ public class CombinedDeletionPolicyTests extends ESTestCase {
}
}
/**
* Keeping existing unsafe commits can be problematic because these commits are not safe at recovery time
* but they can suddenly become safe in the future. See {@link CombinedDeletionPolicy#keepOnlyStartingCommitOnInit(List)}
*/
public void testKeepOnlyStartingCommitOnInit() throws Exception {
final AtomicLong globalCheckpoint = new AtomicLong(randomNonNegativeLong());
TranslogDeletionPolicy translogPolicy = createTranslogDeletionPolicy();
final UUID translogUUID = UUID.randomUUID();
final List<IndexCommit> commitList = new ArrayList<>();
int totalCommits = between(2, 20);
for (int i = 0; i < totalCommits; i++) {
commitList.add(mockIndexCommit(randomNonNegativeLong(), translogUUID, randomNonNegativeLong()));
}
final IndexCommit startingCommit = randomFrom(commitList);
CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy(logger, translogPolicy, globalCheckpoint::get, startingCommit);
indexPolicy.onInit(commitList);
for (IndexCommit commit : commitList) {
if (commit.equals(startingCommit) == false) {
verify(commit, times(1)).delete();
}
}
verify(startingCommit, never()).delete();
assertThat(translogPolicy.getMinTranslogGenerationForRecovery(),
equalTo(Long.parseLong(startingCommit.getUserData().get(Translog.TRANSLOG_GENERATION_KEY))));
assertThat(translogPolicy.getTranslogGenerationOfLastCommit(),
equalTo(Long.parseLong(startingCommit.getUserData().get(Translog.TRANSLOG_GENERATION_KEY))));
}
public void testCheckUnreferencedCommits() throws Exception {
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.UNASSIGNED_SEQ_NO);
final UUID translogUUID = UUID.randomUUID();
final TranslogDeletionPolicy translogPolicy = createTranslogDeletionPolicy();
CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy(logger, translogPolicy, globalCheckpoint::get, null);
CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy(logger, translogPolicy, globalCheckpoint::get);
final List<IndexCommit> commitList = new ArrayList<>();
int totalCommits = between(2, 20);
long lastMaxSeqNo = between(1, 1000);

View File

@ -763,6 +763,7 @@ public class InternalEngineTests extends EngineTestCase {
}
}
initialEngine.close();
trimUnsafeCommits(initialEngine.config());
recoveringEngine = new InternalEngine(initialEngine.config());
recoveringEngine.recoverFromTranslog();
try (Engine.Searcher searcher = recoveringEngine.acquireSearcher("test")) {
@ -1168,6 +1169,7 @@ public class InternalEngineTests extends EngineTestCase {
engine.index(indexForDoc(doc));
EngineConfig config = engine.config();
engine.close();
trimUnsafeCommits(config);
engine = new InternalEngine(config);
engine.recoverFromTranslog();
assertNull("Sync ID must be gone since we have a document to replay", engine.getLastCommittedSegmentInfos().getUserData().get(Engine.SYNC_COMMIT_ID));
@ -3581,7 +3583,7 @@ public class InternalEngineTests extends EngineTestCase {
} finally {
IOUtils.close(initialEngine);
}
trimUnsafeCommits(initialEngine.config());
try (Engine recoveringEngine = new InternalEngine(initialEngine.config())) {
recoveringEngine.recoverFromTranslog();
recoveringEngine.fillSeqNoGaps(2);
@ -3933,6 +3935,7 @@ public class InternalEngineTests extends EngineTestCase {
// now do it again to make sure we preserve values etc.
try {
trimUnsafeCommits(replicaEngine.config());
recoveringEngine = new InternalEngine(copy(replicaEngine.config(), globalCheckpoint::get));
if (flushed) {
assertThat(recoveringEngine.getTranslog().stats().getUncommittedOperations(), equalTo(0));
@ -4256,31 +4259,6 @@ public class InternalEngineTests extends EngineTestCase {
}
}
public void testOpenIndexAndTranslogKeepOnlySafeCommit() throws Exception {
IOUtils.close(engine);
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
final EngineConfig config = copy(engine.config(), globalCheckpoint::get);
final IndexCommit safeCommit;
try (InternalEngine engine = createEngine(config)) {
final int numDocs = between(5, 50);
for (int i = 0; i < numDocs; i++) {
index(engine, i);
if (randomBoolean()) {
engine.flush();
}
}
// Selects a starting commit and advances and persists the global checkpoint to that commit.
final List<IndexCommit> commits = DirectoryReader.listCommits(engine.store.directory());
safeCommit = randomFrom(commits);
globalCheckpoint.set(Long.parseLong(safeCommit.getUserData().get(SequenceNumbers.MAX_SEQ_NO)));
engine.getTranslog().sync();
}
try (InternalEngine engine = new InternalEngine(config)) {
final List<IndexCommit> existingCommits = DirectoryReader.listCommits(engine.store.directory());
assertThat("safe commit should be kept", existingCommits, contains(safeCommit));
}
}
public void testCleanUpCommitsWhenGlobalCheckpointAdvanced() throws Exception {
IOUtils.close(engine, store);
store = createStore();
@ -4615,4 +4593,64 @@ public class InternalEngineTests extends EngineTestCase {
false, randomNonNegativeLong(), localCheckpointTracker.generateSeqNo()));
assertThat(engine.getNumVersionLookups(), equalTo(lookupTimes));
}
public void testTrimUnsafeCommits() throws Exception {
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
final int maxSeqNo = 40;
final List<Long> seqNos = LongStream.rangeClosed(0, maxSeqNo).boxed().collect(Collectors.toList());
Collections.shuffle(seqNos, random());
try (Store store = createStore()) {
EngineConfig config = config(defaultSettings, store, createTempDir(), newMergePolicy(), null, null, globalCheckpoint::get);
final List<Long> commitMaxSeqNo = new ArrayList<>();
final long minTranslogGen;
try (InternalEngine engine = createEngine(config)) {
for (int i = 0; i < seqNos.size(); i++) {
ParsedDocument doc = testParsedDocument(Long.toString(seqNos.get(i)), null, testDocument(), new BytesArray("{}"), null);
Engine.Index index = new Engine.Index(newUid(doc), doc, seqNos.get(i), 0,
1, VersionType.EXTERNAL, REPLICA, System.nanoTime(), -1, false);
engine.index(index);
if (randomBoolean()) {
engine.flush();
final Long maxSeqNoInCommit = seqNos.subList(0, i + 1).stream().max(Long::compareTo).orElse(-1L);
commitMaxSeqNo.add(maxSeqNoInCommit);
}
}
globalCheckpoint.set(randomInt(maxSeqNo));
engine.syncTranslog();
minTranslogGen = engine.getTranslog().getMinFileGeneration();
}
store.trimUnsafeCommits(globalCheckpoint.get(), minTranslogGen, config.getIndexSettings().getIndexVersionCreated());
long safeMaxSeqNo =
commitMaxSeqNo.stream().filter(s -> s <= globalCheckpoint.get())
.reduce((s1, s2) -> s2) // get the last one.
.orElse(SequenceNumbers.NO_OPS_PERFORMED);
final List<IndexCommit> commits = DirectoryReader.listCommits(store.directory());
assertThat(commits, hasSize(1));
assertThat(commits.get(0).getUserData().get(SequenceNumbers.MAX_SEQ_NO), equalTo(Long.toString(safeMaxSeqNo)));
try (IndexReader reader = DirectoryReader.open(commits.get(0))) {
for (LeafReaderContext context: reader.leaves()) {
final NumericDocValues values = context.reader().getNumericDocValues(SeqNoFieldMapper.NAME);
if (values != null) {
for (int docID = 0; docID < context.reader().maxDoc(); docID++) {
if (values.advanceExact(docID) == false) {
throw new AssertionError("Document does not have a seq number: " + docID);
}
assertThat(values.longValue(), lessThanOrEqualTo(globalCheckpoint.get()));
}
}
}
}
}
}
private static void trimUnsafeCommits(EngineConfig config) throws IOException {
final Store store = config.getStore();
final TranslogConfig translogConfig = config.getTranslogConfig();
final String translogUUID = store.readLastCommittedSegmentsInfo().getUserData().get(Translog.TRANSLOG_UUID_KEY);
final long globalCheckpoint = Translog.readGlobalCheckpoint(translogConfig.getTranslogPath(), translogUUID);
final long minRetainedTranslogGen = Translog.readMinTranslogGeneration(translogConfig.getTranslogPath(), translogUUID);
store.trimUnsafeCommits(globalCheckpoint, minRetainedTranslogGen, config.getIndexSettings().getIndexVersionCreated());
}
}