Open engine should keep only starting commit (#28228)

Keeping unsafe commits when opening an engine can be problematic because
these commits are not safe at the recovering time but they can suddenly
become safe in the future. The following issues can happen if unsafe
commits are kept oninit.

1. Replica can use unsafe commit in peer-recovery. This happens when a
replica with a safe commit c1 (max_seqno=1) and an unsafe commit c2
(max_seqno=2) recovers from a primary with c1(max_seqno=1). If a new
document (seqno=2) is added without flushing, the global checkpoint is
advanced to 2; and the replica recovers again, it will use the unsafe
commit c2 (max_seqno=2 <= gcp=2) as the starting commit for sequenced
based recovery even the commit c2 contains a stale operation and the
document (with seqno=2) will not be replicated to the replica.

2. Min translog gen for recovery can go backwards in peer-recovery. This
happens when a replica with a safe commit c1 (local_checkpoint=1,
recovery_translog_gen=1) and an unsafe commit c2 (local_checkpoint=2,
recovery_translog_gen=2). The replica recovers from a primary, and keeps
c2 as the last commit, then sets last_translog_gen to 2. Flushing a new
commit on the replica will cause exception as the new last commit c3
will have recovery_translog_gen=1. The recovery translog generation of a
commit is calculated based on the current local checkpoint. The local
checkpoint of c3 is 1 while the local checkpoint of c2 is 2.

3. Commit without translog can be used for recovery. An old index, which
was created before multiple-commits is introduced (v6.2), may not have a
safe commit. If that index has a snapshotted commit without translog and
an unsafe commit, the policy can consider the snapshotted commit as a
safe commit for recovery even the commit does not have translog.

These issues can be avoided if the combined deletion policy keeps only
the starting commit onInit.

Relates #27804
Relates #28181
This commit is contained in:
Nhat Nguyen 2018-01-16 08:37:42 -05:00 committed by GitHub
parent 67c1f1c856
commit 65e90079ad
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 234 additions and 52 deletions

View File

@ -45,37 +45,72 @@ public final class CombinedDeletionPolicy extends IndexDeletionPolicy {
private final TranslogDeletionPolicy translogDeletionPolicy;
private final EngineConfig.OpenMode openMode;
private final LongSupplier globalCheckpointSupplier;
private final IndexCommit startingCommit;
private final ObjectIntHashMap<IndexCommit> snapshottedCommits; // Number of snapshots held against each commit point.
private IndexCommit safeCommit; // the most recent safe commit point - its max_seqno at most the persisted global checkpoint.
private IndexCommit lastCommit; // the most recent commit point
CombinedDeletionPolicy(EngineConfig.OpenMode openMode, TranslogDeletionPolicy translogDeletionPolicy,
LongSupplier globalCheckpointSupplier) {
LongSupplier globalCheckpointSupplier, IndexCommit startingCommit) {
this.openMode = openMode;
this.translogDeletionPolicy = translogDeletionPolicy;
this.globalCheckpointSupplier = globalCheckpointSupplier;
this.startingCommit = startingCommit;
this.snapshottedCommits = new ObjectIntHashMap<>();
}
@Override
public void onInit(List<? extends IndexCommit> commits) throws IOException {
public synchronized void onInit(List<? extends IndexCommit> commits) throws IOException {
switch (openMode) {
case CREATE_INDEX_AND_TRANSLOG:
assert startingCommit == null : "CREATE_INDEX_AND_TRANSLOG must not have starting commit; commit [" + startingCommit + "]";
break;
case OPEN_INDEX_CREATE_TRANSLOG:
assert commits.isEmpty() == false : "index is opened, but we have no commits";
// When an engine starts with OPEN_INDEX_CREATE_TRANSLOG, a new fresh index commit will be created immediately.
// We therefore can simply skip processing here as `onCommit` will be called right after with a new commit.
break;
case OPEN_INDEX_AND_TRANSLOG:
assert commits.isEmpty() == false : "index is opened, but we have no commits";
onCommit(commits);
assert startingCommit != null && commits.contains(startingCommit) : "Starting commit not in the existing commit list; "
+ "startingCommit [" + startingCommit + "], commit list [" + commits + "]";
keepOnlyStartingCommitOnInit(commits);
// OPEN_INDEX_CREATE_TRANSLOG can open an index commit from other shard with a different translog history,
// We therefore should not use that index commit to update the translog deletion policy.
if (openMode == EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG) {
updateTranslogDeletionPolicy();
}
break;
default:
throw new IllegalArgumentException("unknown openMode [" + openMode + "]");
}
}
/**
* Keeping existing unsafe commits when opening an engine can be problematic because these commits are not safe
* at the recovering time but they can suddenly become safe in the future.
* The following issues can happen if unsafe commits are kept oninit.
* <p>
* 1. Replica can use unsafe commit in peer-recovery. This happens when a replica with a safe commit c1(max_seqno=1)
* and an unsafe commit c2(max_seqno=2) recovers from a primary with c1(max_seqno=1). If a new document(seqno=2)
* is added without flushing, the global checkpoint is advanced to 2; and the replica recovers again, it will use
* the unsafe commit c2(max_seqno=2 at most gcp=2) as the starting commit for sequenced-based recovery even the
* commit c2 contains a stale operation and the document(with seqno=2) will not be replicated to the replica.
* <p>
* 2. Min translog gen for recovery can go backwards in peer-recovery. This happens when are replica with a safe commit
* c1(local_checkpoint=1, recovery_translog_gen=1) and an unsafe commit c2(local_checkpoint=2, recovery_translog_gen=2).
* The replica recovers from a primary, and keeps c2 as the last commit, then sets last_translog_gen to 2. Flushing a new
* commit on the replica will cause exception as the new last commit c3 will have recovery_translog_gen=1. The recovery
* translog generation of a commit is calculated based on the current local checkpoint. The local checkpoint of c3 is 1
* while the local checkpoint of c2 is 2.
* <p>
* 3. Commit without translog can be used in recovery. An old index, which was created before multiple-commits is introduced
* (v6.2), may not have a safe commit. If that index has a snapshotted commit without translog and an unsafe commit,
* the policy can consider the snapshotted commit as a safe commit for recovery even the commit does not have translog.
*/
private void keepOnlyStartingCommitOnInit(List<? extends IndexCommit> commits) {
commits.stream().filter(commit -> startingCommit.equals(commit) == false).forEach(IndexCommit::delete);
assert startingCommit.isDeleted() == false : "Starting commit must not be deleted";
lastCommit = startingCommit;
safeCommit = startingCommit;
}
@Override
public synchronized void onCommit(List<? extends IndexCommit> commits) throws IOException {
final int keptPosition = indexOfKeptCommits(commits, globalCheckpointSupplier.getAsLong());

View File

@ -185,7 +185,7 @@ public class InternalEngine extends Engine {
"Starting commit should be non-null; mode [" + openMode + "]; startingCommit [" + startingCommit + "]";
this.localCheckpointTracker = createLocalCheckpointTracker(localCheckpointTrackerSupplier, startingCommit);
this.combinedDeletionPolicy = new CombinedDeletionPolicy(openMode, translogDeletionPolicy,
translog::getLastSyncedGlobalCheckpoint);
translog::getLastSyncedGlobalCheckpoint, startingCommit);
writer = createWriter(openMode == EngineConfig.OpenMode.CREATE_INDEX_AND_TRANSLOG, startingCommit);
updateMaxUnsafeAutoIdTimestampFromWriter(writer);
assert engineConfig.getForceNewHistoryUUID() == false
@ -411,28 +411,44 @@ public class InternalEngine extends Engine {
}
private IndexCommit getStartingCommitPoint() throws IOException {
if (openMode == EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG) {
final long lastSyncedGlobalCheckpoint = translog.getLastSyncedGlobalCheckpoint();
final long minRetainedTranslogGen = translog.getMinFileGeneration();
final List<IndexCommit> existingCommits = DirectoryReader.listCommits(store.directory());
// We may not have a safe commit if an index was create before v6.2; and if there is a snapshotted commit whose full translog
// files are not retained but max_seqno is at most the global checkpoint, we may mistakenly select it as a starting commit.
// To avoid this issue, we only select index commits whose translog files are fully retained.
if (engineConfig.getIndexSettings().getIndexVersionCreated().before(Version.V_6_2_0)) {
final List<IndexCommit> recoverableCommits = new ArrayList<>();
for (IndexCommit commit : existingCommits) {
if (minRetainedTranslogGen <= Long.parseLong(commit.getUserData().get(Translog.TRANSLOG_GENERATION_KEY))) {
recoverableCommits.add(commit);
final IndexCommit startingIndexCommit;
final List<IndexCommit> existingCommits;
switch (openMode) {
case CREATE_INDEX_AND_TRANSLOG:
startingIndexCommit = null;
break;
case OPEN_INDEX_CREATE_TRANSLOG:
// Use the last commit
existingCommits = DirectoryReader.listCommits(store.directory());
startingIndexCommit = existingCommits.get(existingCommits.size() - 1);
break;
case OPEN_INDEX_AND_TRANSLOG:
// Use the safe commit
final long lastSyncedGlobalCheckpoint = translog.getLastSyncedGlobalCheckpoint();
final long minRetainedTranslogGen = translog.getMinFileGeneration();
existingCommits = DirectoryReader.listCommits(store.directory());
// We may not have a safe commit if an index was create before v6.2; and if there is a snapshotted commit whose translog
// are not retained but max_seqno is at most the global checkpoint, we may mistakenly select it as a starting commit.
// To avoid this issue, we only select index commits whose translog are fully retained.
if (engineConfig.getIndexSettings().getIndexVersionCreated().before(Version.V_6_2_0)) {
final List<IndexCommit> recoverableCommits = new ArrayList<>();
for (IndexCommit commit : existingCommits) {
if (minRetainedTranslogGen <= Long.parseLong(commit.getUserData().get(Translog.TRANSLOG_GENERATION_KEY))) {
recoverableCommits.add(commit);
}
}
assert recoverableCommits.isEmpty() == false : "No commit point with translog found; " +
"commits [" + existingCommits + "], minRetainedTranslogGen [" + minRetainedTranslogGen + "]";
startingIndexCommit = CombinedDeletionPolicy.findSafeCommitPoint(recoverableCommits, lastSyncedGlobalCheckpoint);
} else {
// TODO: Asserts the starting commit is a safe commit once peer-recovery sets global checkpoint.
startingIndexCommit = CombinedDeletionPolicy.findSafeCommitPoint(existingCommits, lastSyncedGlobalCheckpoint);
}
assert recoverableCommits.isEmpty() == false : "No commit point with full translog found; " +
"commits [" + existingCommits + "], minRetainedTranslogGen [" + minRetainedTranslogGen + "]";
return CombinedDeletionPolicy.findSafeCommitPoint(recoverableCommits, lastSyncedGlobalCheckpoint);
} else {
return CombinedDeletionPolicy.findSafeCommitPoint(existingCommits, lastSyncedGlobalCheckpoint);
}
break;
default:
throw new IllegalArgumentException("unknown mode: " + openMode);
}
return null;
return startingIndexCommit;
}
private void recoverFromTranslogInternal() throws IOException {
@ -557,9 +573,7 @@ public class InternalEngine extends Engine {
final DirectoryReader directoryReader = ElasticsearchDirectoryReader.wrap(DirectoryReader.open(indexWriter), shardId);
internalSearcherManager = new SearcherManager(directoryReader,
new RamAccountingSearcherFactory(engineConfig.getCircuitBreakerService()));
// The index commit from IndexWriterConfig is null if the engine is open with other modes
// rather than CREATE_INDEX_AND_TRANSLOG. In those cases lastCommittedSegmentInfos will be retrieved from the last commit.
lastCommittedSegmentInfos = store.readCommittedSegmentsInfo(indexWriter.getConfig().getIndexCommit());
lastCommittedSegmentInfos = store.readLastCommittedSegmentsInfo();
ExternalSearcherManager externalSearcherManager = new ExternalSearcherManager(internalSearcherManager,
externalSearcherFactory);
success = true;

View File

@ -23,6 +23,7 @@ import com.carrotsearch.hppc.ObjectLongMap;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.index.CheckIndex;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.IndexOptions;
import org.apache.lucene.index.LeafReaderContext;
@ -1290,12 +1291,16 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
/** opens the engine on top of the existing lucene engine but creates an empty translog **/
public void openIndexAndCreateTranslog(boolean forceNewHistoryUUID, long globalCheckpoint) throws IOException {
assert recoveryState.getRecoverySource().getType() != RecoverySource.Type.EMPTY_STORE &&
recoveryState.getRecoverySource().getType() != RecoverySource.Type.EXISTING_STORE;
SequenceNumbers.CommitInfo commitInfo = store.loadSeqNoInfo(null);
assert commitInfo.localCheckpoint >= globalCheckpoint :
"trying to create a shard whose local checkpoint [" + commitInfo.localCheckpoint + "] is < global checkpoint ["
if (Assertions.ENABLED) {
assert recoveryState.getRecoverySource().getType() != RecoverySource.Type.EMPTY_STORE &&
recoveryState.getRecoverySource().getType() != RecoverySource.Type.EXISTING_STORE;
SequenceNumbers.CommitInfo commitInfo = store.loadSeqNoInfo(null);
assert commitInfo.localCheckpoint >= globalCheckpoint :
"trying to create a shard whose local checkpoint [" + commitInfo.localCheckpoint + "] is < global checkpoint ["
+ globalCheckpoint + "]";
final List<IndexCommit> existingCommits = DirectoryReader.listCommits(store.directory());
assert existingCommits.size() == 1 : "Open index create translog should have one commit, commits[" + existingCommits + "]";
}
globalCheckpointTracker.updateGlobalCheckpointOnReplica(globalCheckpoint, "opening index with a new translog");
innerOpenEngineAndTranslog(EngineConfig.OpenMode.OPEN_INDEX_CREATE_TRANSLOG, forceNewHistoryUUID);
}

View File

@ -182,17 +182,9 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
* @throws IOException if the index is corrupted or the segments file is not present
*/
public SegmentInfos readLastCommittedSegmentsInfo() throws IOException {
return readCommittedSegmentsInfo(null);
}
/**
* Returns the committed segments info for the given commit point.
* If the commit point is not provided, this method will return the segments info of the last commit in the store.
*/
public SegmentInfos readCommittedSegmentsInfo(final IndexCommit commit) throws IOException {
failIfCorrupted();
try {
return readSegmentsInfo(commit, directory());
return readSegmentsInfo(null, directory());
} catch (CorruptIndexException ex) {
markStoreCorrupted(ex);
throw ex;

View File

@ -54,7 +54,8 @@ public class CombinedDeletionPolicyTests extends ESTestCase {
public void testKeepCommitsAfterGlobalCheckpoint() throws Exception {
final AtomicLong globalCheckpoint = new AtomicLong();
TranslogDeletionPolicy translogPolicy = createTranslogDeletionPolicy();
CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy(OPEN_INDEX_AND_TRANSLOG, translogPolicy, globalCheckpoint::get);
CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy(
OPEN_INDEX_AND_TRANSLOG, translogPolicy, globalCheckpoint::get, null);
final LongArrayList maxSeqNoList = new LongArrayList();
final LongArrayList translogGenList = new LongArrayList();
@ -93,7 +94,8 @@ public class CombinedDeletionPolicyTests extends ESTestCase {
final AtomicLong globalCheckpoint = new AtomicLong();
final UUID translogUUID = UUID.randomUUID();
TranslogDeletionPolicy translogPolicy = createTranslogDeletionPolicy();
CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy(OPEN_INDEX_AND_TRANSLOG, translogPolicy, globalCheckpoint::get);
CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy(
OPEN_INDEX_AND_TRANSLOG, translogPolicy, globalCheckpoint::get, null);
long lastMaxSeqNo = between(1, 1000);
long lastTranslogGen = between(1, 20);
int safeIndex = 0;
@ -156,11 +158,12 @@ public class CombinedDeletionPolicyTests extends ESTestCase {
final UUID translogUUID = UUID.randomUUID();
TranslogDeletionPolicy translogPolicy = createTranslogDeletionPolicy();
CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy(OPEN_INDEX_AND_TRANSLOG, translogPolicy, globalCheckpoint::get);
CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy(
OPEN_INDEX_AND_TRANSLOG, translogPolicy, globalCheckpoint::get, null);
long legacyTranslogGen = randomNonNegativeLong();
IndexCommit legacyCommit = mockLegacyIndexCommit(translogUUID, legacyTranslogGen);
indexPolicy.onInit(singletonList(legacyCommit));
indexPolicy.onCommit(singletonList(legacyCommit));
verify(legacyCommit, never()).delete();
assertThat(translogPolicy.getMinTranslogGenerationForRecovery(), equalTo(legacyTranslogGen));
assertThat(translogPolicy.getTranslogGenerationOfLastCommit(), equalTo(legacyTranslogGen));
@ -188,7 +191,8 @@ public class CombinedDeletionPolicyTests extends ESTestCase {
public void testDeleteInvalidCommits() throws Exception {
final AtomicLong globalCheckpoint = new AtomicLong(randomNonNegativeLong());
TranslogDeletionPolicy translogPolicy = createTranslogDeletionPolicy();
CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy(OPEN_INDEX_CREATE_TRANSLOG, translogPolicy, globalCheckpoint::get);
CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy(
OPEN_INDEX_CREATE_TRANSLOG, translogPolicy, globalCheckpoint::get, null);
final int invalidCommits = between(1, 10);
final List<IndexCommit> commitList = new ArrayList<>();
@ -211,6 +215,35 @@ public class CombinedDeletionPolicyTests extends ESTestCase {
}
}
/**
* Keeping existing unsafe commits can be problematic because these commits are not safe at the recovering time
* but they can suddenly become safe in the future. See {@link CombinedDeletionPolicy#keepOnlyStartingCommitOnInit(List)}
*/
public void testKeepOnlyStartingCommitOnInit() throws Exception {
final AtomicLong globalCheckpoint = new AtomicLong(randomNonNegativeLong());
TranslogDeletionPolicy translogPolicy = createTranslogDeletionPolicy();
final UUID translogUUID = UUID.randomUUID();
final List<IndexCommit> commitList = new ArrayList<>();
int totalCommits = between(2, 20);
for (int i = 0; i < totalCommits; i++) {
commitList.add(mockIndexCommit(randomNonNegativeLong(), translogUUID, randomNonNegativeLong()));
}
final IndexCommit startingCommit = randomFrom(commitList);
CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy(
OPEN_INDEX_AND_TRANSLOG, translogPolicy, globalCheckpoint::get, startingCommit);
indexPolicy.onInit(commitList);
for (IndexCommit commit : commitList) {
if (commit.equals(startingCommit) == false) {
verify(commit, times(1)).delete();
}
}
verify(startingCommit, never()).delete();
assertThat(translogPolicy.getMinTranslogGenerationForRecovery(),
equalTo(Long.parseLong(startingCommit.getUserData().get(Translog.TRANSLOG_GENERATION_KEY))));
assertThat(translogPolicy.getTranslogGenerationOfLastCommit(),
equalTo(Long.parseLong(startingCommit.getUserData().get(Translog.TRANSLOG_GENERATION_KEY))));
}
IndexCommit mockIndexCommit(long maxSeqNo, UUID translogUUID, long translogGen) throws IOException {
final Map<String, String> userData = new HashMap<>();
userData.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(maxSeqNo));

View File

@ -163,6 +163,7 @@ import static org.elasticsearch.index.engine.Engine.Operation.Origin.PRIMARY;
import static org.elasticsearch.index.engine.Engine.Operation.Origin.REPLICA;
import static org.elasticsearch.index.translog.TranslogDeletionPolicies.createTranslogDeletionPolicy;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.everyItem;
import static org.hamcrest.Matchers.greaterThan;
@ -4010,13 +4011,15 @@ public class InternalEngineTests extends EngineTestCase {
boolean flushed = false;
AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.UNASSIGNED_SEQ_NO);
Engine recoveringEngine = null;
try {
assertEquals(docs - 1, engine.getLocalCheckpointTracker().getMaxSeqNo());
assertEquals(docs - 1, engine.getLocalCheckpointTracker().getCheckpoint());
assertEquals(maxSeqIDOnReplica, replicaEngine.getLocalCheckpointTracker().getMaxSeqNo());
assertEquals(checkpointOnReplica, replicaEngine.getLocalCheckpointTracker().getCheckpoint());
recoveringEngine = new InternalEngine(copy(replicaEngine.config(), EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG));
recoveringEngine = new InternalEngine(copy(
replicaEngine.config(), EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG, globalCheckpoint::get));
assertEquals(numDocsOnReplica, recoveringEngine.getTranslog().uncommittedOperations());
recoveringEngine.recoverFromTranslog();
assertEquals(maxSeqIDOnReplica, recoveringEngine.getLocalCheckpointTracker().getMaxSeqNo());
@ -4038,6 +4041,8 @@ public class InternalEngineTests extends EngineTestCase {
assertEquals(maxSeqIDOnReplica, recoveringEngine.getLocalCheckpointTracker().getMaxSeqNo());
assertEquals(maxSeqIDOnReplica, recoveringEngine.getLocalCheckpointTracker().getCheckpoint());
if ((flushed = randomBoolean())) {
globalCheckpoint.set(recoveringEngine.getLocalCheckpointTracker().getMaxSeqNo());
recoveringEngine.getTranslog().sync();
recoveringEngine.flush(true, true);
}
}
@ -4047,7 +4052,8 @@ public class InternalEngineTests extends EngineTestCase {
// now do it again to make sure we preserve values etc.
try {
recoveringEngine = new InternalEngine(copy(replicaEngine.config(), EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG));
recoveringEngine = new InternalEngine(
copy(replicaEngine.config(), EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG, globalCheckpoint::get));
if (flushed) {
assertEquals(0, recoveringEngine.getTranslog().uncommittedOperations());
}
@ -4355,4 +4361,57 @@ public class InternalEngineTests extends EngineTestCase {
assertThat(DirectoryReader.listCommits(engine.store.directory()), hasSize(1));
}
}
public void testOpenIndexAndTranslogKeepOnlySafeCommit() throws Exception {
IOUtils.close(engine);
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.UNASSIGNED_SEQ_NO);
final EngineConfig config = copy(engine.config(), EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG, globalCheckpoint::get);
final IndexCommit safeCommit;
try (InternalEngine engine = new InternalEngine(copy(config, EngineConfig.OpenMode.OPEN_INDEX_CREATE_TRANSLOG))) {
final int numDocs = between(5, 50);
for (int i = 0; i < numDocs; i++) {
index(engine, i);
if (randomBoolean()) {
engine.flush();
}
}
// Selects a starting commit and advances and persists the global checkpoint to that commit.
final List<IndexCommit> commits = DirectoryReader.listCommits(engine.store.directory());
safeCommit = randomFrom(commits);
globalCheckpoint.set(Long.parseLong(safeCommit.getUserData().get(SequenceNumbers.MAX_SEQ_NO)));
engine.getTranslog().sync();
}
try (InternalEngine engine = new InternalEngine(copy(config, EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG))) {
final List<IndexCommit> existingCommits = DirectoryReader.listCommits(engine.store.directory());
assertThat("OPEN_INDEX_AND_TRANSLOG should keep only safe commit", existingCommits, contains(safeCommit));
}
}
public void testOpenIndexCreateTranslogKeepOnlyLastCommit() throws Exception {
IOUtils.close(engine);
final EngineConfig config = copy(engine.config(), EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG);
final Map<String, String> lastCommit;
try (InternalEngine engine = new InternalEngine(copy(config, EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG))) {
engine.skipTranslogRecovery();
final int numDocs = between(5, 50);
for (int i = 0; i < numDocs; i++) {
index(engine, i);
if (randomBoolean()) {
engine.flush();
}
}
final List<IndexCommit> commits = DirectoryReader.listCommits(engine.store.directory());
lastCommit = commits.get(commits.size() - 1).getUserData();
}
try (InternalEngine engine = new InternalEngine(copy(config, EngineConfig.OpenMode.OPEN_INDEX_CREATE_TRANSLOG))) {
final List<IndexCommit> existingCommits = DirectoryReader.listCommits(engine.store.directory());
assertThat("OPEN_INDEX_CREATE_TRANSLOG should keep only last commit", existingCommits, hasSize(1));
final Map<String, String> userData = existingCommits.get(0).getUserData();
assertThat(userData.get(SequenceNumbers.MAX_SEQ_NO), equalTo(lastCommit.get(SequenceNumbers.MAX_SEQ_NO)));
assertThat(userData.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY), equalTo(lastCommit.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)));
// Translog tags should be fresh.
assertThat(userData.get(Translog.TRANSLOG_UUID_KEY), not(equalTo(lastCommit.get(Translog.TRANSLOG_UUID_KEY))));
assertThat(userData.get(Translog.TRANSLOG_GENERATION_KEY), equalTo("1"));
}
}
}

View File

@ -304,8 +304,52 @@ public class RecoveryDuringReplicationTests extends ESIndexLevelReplicationTestC
replica.store().close();
newReplica = shards.addReplicaWithExistingPath(replica.shardPath(), replica.routingEntry().currentNodeId());
shards.recoverReplica(newReplica);
shards.assertAllEqual(totalDocs);
// Make sure that flushing on a recovering shard is ok.
shards.flush();
shards.assertAllEqual(totalDocs);
}
}
public void testReplicaRollbackStaleDocumentsInPeerRecovery() throws Exception {
try (ReplicationGroup shards = createGroup(2)) {
shards.startAll();
IndexShard oldPrimary = shards.getPrimary();
IndexShard newPrimary = shards.getReplicas().get(0);
IndexShard replica = shards.getReplicas().get(1);
int goodDocs = shards.indexDocs(scaledRandomIntBetween(1, 20));
shards.flush();
// simulate docs that were inflight when primary failed, these will be rolled back
int staleDocs = scaledRandomIntBetween(1, 10);
logger.info("--> indexing {} stale docs", staleDocs);
for (int i = 0; i < staleDocs; i++) {
final IndexRequest indexRequest = new IndexRequest(index.getName(), "type", "stale_" + i)
.source("{}", XContentType.JSON);
final BulkShardRequest bulkShardRequest = indexOnPrimary(indexRequest, oldPrimary);
indexOnReplica(bulkShardRequest, replica);
}
shards.flush();
shards.promoteReplicaToPrimary(newPrimary).get();
// Recover a replica should rollback the stale documents
shards.removeReplica(replica);
replica.close("recover replica - first time", false);
replica.store().close();
replica = shards.addReplicaWithExistingPath(replica.shardPath(), replica.routingEntry().currentNodeId());
shards.recoverReplica(replica);
shards.assertAllEqual(goodDocs);
// Index more docs - move the global checkpoint >= seqno of the stale operations.
goodDocs += shards.indexDocs(scaledRandomIntBetween(staleDocs, staleDocs * 5));
shards.syncGlobalCheckpoint();
assertThat(replica.getTranslog().getLastSyncedGlobalCheckpoint(), equalTo(replica.seqNoStats().getMaxSeqNo()));
// Recover a replica again should also rollback the stale documents.
shards.removeReplica(replica);
replica.close("recover replica - second time", false);
replica.store().close();
IndexShard anotherReplica = shards.addReplicaWithExistingPath(replica.shardPath(), replica.routingEntry().currentNodeId());
shards.recoverReplica(anotherReplica);
shards.assertAllEqual(goodDocs);
shards.flush();
shards.assertAllEqual(goodDocs);
}
}