Rollback primary before recovering from translog (#27804)

Today we always recover a primary from the last commit point. However,
with the new deletion policy we keep multiple commit points in the
existing store, so we have a chance to find a better starting commit
point. With a good starting commit point, we may be able to throw away
stale operations. This PR rolls back a primary to a safe starting
commit, then recovers it from the translog.
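
The selection rule can be sketched as a small standalone helper. This is illustrative only, not the PR's exact code; the "max_seq_no" commit user-data key and the oldest-commit fallback mirror what the change relies on, the rest is assumed:

import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.store.Directory;
import java.io.IOException;
import java.util.List;

final class SafeCommitSketch {
    // Pick the newest commit whose max_seq_no is at most the persisted global checkpoint;
    // if none qualifies (e.g. an index created before v6.2), fall back to the oldest commit.
    static IndexCommit pickStartingCommit(Directory dir, long globalCheckpoint) throws IOException {
        List<IndexCommit> commits = DirectoryReader.listCommits(dir); // sorted oldest to newest
        IndexCommit starting = commits.get(0);
        for (IndexCommit commit : commits) {
            long maxSeqNo = Long.parseLong(commit.getUserData().get("max_seq_no"));
            if (maxSeqNo <= globalCheckpoint) {
                starting = commit; // newer qualifying commits win
            }
        }
        return starting;
    }
}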

Relates #10708
Nhat Nguyen 2017-12-22 18:25:36 -05:00 committed by GitHub
parent adb49efe17
commit 6629f4ab0d
14 changed files with 197 additions and 72 deletions

View File

@ -26,6 +26,7 @@ import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.translog.TranslogDeletionPolicy;
import java.io.IOException;
import java.nio.file.Path;
import java.util.List;
import java.util.Map;
import java.util.function.LongSupplier;
@ -37,7 +38,7 @@ import java.util.function.LongSupplier;
* In particular, this policy will delete index commits whose max sequence number is at most
* the current global checkpoint except the index commit which has the highest max sequence number among those.
*/
final class CombinedDeletionPolicy extends IndexDeletionPolicy {
public final class CombinedDeletionPolicy extends IndexDeletionPolicy {
private final TranslogDeletionPolicy translogDeletionPolicy;
private final EngineConfig.OpenMode openMode;
private final LongSupplier globalCheckpointSupplier;
@ -70,7 +71,7 @@ final class CombinedDeletionPolicy extends IndexDeletionPolicy {
@Override
public void onCommit(List<? extends IndexCommit> commits) throws IOException {
final int keptPosition = indexOfKeptCommits(commits);
final int keptPosition = indexOfKeptCommits(commits, globalCheckpointSupplier.getAsLong());
for (int i = 0; i < keptPosition; i++) {
commits.get(i).delete();
}
@ -89,12 +90,28 @@ final class CombinedDeletionPolicy extends IndexDeletionPolicy {
translogDeletionPolicy.setMinTranslogGenerationForRecovery(minRequiredGen);
}
/**
* Find a safe commit point from a list of existing commits based on the supplied global checkpoint.
* The max sequence number of a safe commit point should be at most the global checkpoint.
* If an index was created before v6.2, and we haven't retained a safe commit yet, this method will return the oldest commit.
*
* @param commits a list of existing commit points
* @param globalCheckpoint the persisted global checkpoint from the translog, see {@link Translog#readGlobalCheckpoint(Path)}
* @return a safe commit or the oldest commit if a safe commit is not found
*/
public static IndexCommit findSafeCommitPoint(List<IndexCommit> commits, long globalCheckpoint) throws IOException {
if (commits.isEmpty()) {
throw new IllegalArgumentException("Commit list must not be empty");
}
final int keptPosition = indexOfKeptCommits(commits, globalCheckpoint);
return commits.get(keptPosition);
}
/**
* Find the highest index position of a safe index commit whose max sequence number is not greater than the global checkpoint.
* Index commits with a different translog UUID will be filtered out as they don't belong to this engine.
*/
private int indexOfKeptCommits(List<? extends IndexCommit> commits) throws IOException {
final long currentGlobalCheckpoint = globalCheckpointSupplier.getAsLong();
private static int indexOfKeptCommits(List<? extends IndexCommit> commits, long globalCheckpoint) throws IOException {
final String expectedTranslogUUID = commits.get(commits.size() - 1).getUserData().get(Translog.TRANSLOG_UUID_KEY);
// Commits are sorted by age (the 0th one is the oldest commit).
@ -109,7 +126,7 @@ final class CombinedDeletionPolicy extends IndexDeletionPolicy {
return i;
}
final long maxSeqNoFromCommit = Long.parseLong(commitUserData.get(SequenceNumbers.MAX_SEQ_NO));
if (maxSeqNoFromCommit <= currentGlobalCheckpoint) {
if (maxSeqNoFromCommit <= globalCheckpoint) {
return i;
}
}
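
Since findSafeCommitPoint is now public and static, callers outside the deletion policy (such as the test helper changed further down) can resolve the recovering commit themselves. A hedged usage sketch, assuming a store and a translogPath are in scope:

List<IndexCommit> commits = DirectoryReader.listCommits(store.directory());
long globalCheckpoint = Translog.readGlobalCheckpoint(translogPath);
IndexCommit safeCommit = CombinedDeletionPolicy.findSafeCommitPoint(commits, globalCheckpoint);
// The translog generation recorded in that commit is where replay will start.
long recoveringGen = Long.parseLong(safeCommit.getUserData().get(Translog.TRANSLOG_GENERATION_KEY));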

View File

@ -569,27 +569,6 @@ public abstract class Engine implements Closeable {
*/
public abstract LocalCheckpointTracker getLocalCheckpointTracker();
/**
* Read the last segments info from the commit pointed to by the searcher manager
*/
protected static SegmentInfos readLastCommittedSegmentInfos(final ReferenceManager<IndexSearcher> sm, final Store store) throws IOException {
IndexSearcher searcher = sm.acquire();
try {
IndexCommit latestCommit = ((DirectoryReader) searcher.getIndexReader()).getIndexCommit();
return Lucene.readSegmentInfos(latestCommit);
} catch (IOException e) {
// Fall back to reading from the store if reading from the commit fails
try {
return store.readLastCommittedSegmentsInfo();
} catch (IOException e2) {
e2.addSuppressed(e);
throw e2;
}
} finally {
sm.release(searcher);
}
}
/**
* Global stats on segments.
*/

View File

@ -21,6 +21,7 @@ package org.elasticsearch.index.engine;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.IndexFormatTooOldException;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexWriter;
@ -77,6 +78,7 @@ import org.elasticsearch.index.translog.TranslogDeletionPolicy;
import org.elasticsearch.threadpool.ThreadPool;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
@ -175,14 +177,17 @@ public class InternalEngine extends Engine {
mergeScheduler = scheduler = new EngineMergeScheduler(engineConfig.getShardId(), engineConfig.getIndexSettings());
throttle = new IndexThrottle();
try {
this.localCheckpointTracker = createLocalCheckpointTracker(localCheckpointTrackerSupplier);
translog = openTranslog(engineConfig, translogDeletionPolicy, engineConfig.getGlobalCheckpointSupplier());
assert translog.getGeneration() != null;
this.translog = translog;
final IndexCommit startingCommit = getStartingCommitPoint();
assert openMode != EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG || startingCommit != null :
"Starting commit should be non-null; mode [" + openMode + "]; startingCommit [" + startingCommit + "]";
this.localCheckpointTracker = createLocalCheckpointTracker(localCheckpointTrackerSupplier, startingCommit);
this.snapshotDeletionPolicy = new SnapshotDeletionPolicy(
new CombinedDeletionPolicy(openMode, translogDeletionPolicy, translog::getLastSyncedGlobalCheckpoint)
);
writer = createWriter(openMode == EngineConfig.OpenMode.CREATE_INDEX_AND_TRANSLOG);
writer = createWriter(openMode == EngineConfig.OpenMode.CREATE_INDEX_AND_TRANSLOG, startingCommit);
updateMaxUnsafeAutoIdTimestampFromWriter(writer);
assert engineConfig.getForceNewHistoryUUID() == false
|| openMode == EngineConfig.OpenMode.CREATE_INDEX_AND_TRANSLOG
@ -232,7 +237,7 @@ public class InternalEngine extends Engine {
}
private LocalCheckpointTracker createLocalCheckpointTracker(
BiFunction<Long, Long, LocalCheckpointTracker> localCheckpointTrackerSupplier) throws IOException {
BiFunction<Long, Long, LocalCheckpointTracker> localCheckpointTrackerSupplier, IndexCommit startingCommit) throws IOException {
final long maxSeqNo;
final long localCheckpoint;
switch (openMode) {
@ -242,7 +247,7 @@ public class InternalEngine extends Engine {
break;
case OPEN_INDEX_AND_TRANSLOG:
case OPEN_INDEX_CREATE_TRANSLOG:
final SequenceNumbers.CommitInfo seqNoStats = store.loadSeqNoInfo();
final SequenceNumbers.CommitInfo seqNoStats = store.loadSeqNoInfo(startingCommit);
maxSeqNo = seqNoStats.maxSeqNo;
localCheckpoint = seqNoStats.localCheckpoint;
logger.trace("recovered maximum sequence number [{}] and local checkpoint [{}]", maxSeqNo, localCheckpoint);
@ -397,6 +402,31 @@ public class InternalEngine extends Engine {
return this;
}
private IndexCommit getStartingCommitPoint() throws IOException {
if (openMode == EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG) {
final long lastSyncedGlobalCheckpoint = translog.getLastSyncedGlobalCheckpoint();
final long minRetainedTranslogGen = translog.getMinFileGeneration();
final List<IndexCommit> existingCommits = DirectoryReader.listCommits(store.directory());
// We may not have a safe commit if an index was created before v6.2; and if there is a snapshotted commit whose full translog
// files are not retained but max_seqno is at most the global checkpoint, we may mistakenly select it as a starting commit.
// To avoid this issue, we only select index commits whose translog files are fully retained.
if (engineConfig.getIndexSettings().getIndexVersionCreated().before(Version.V_6_2_0)) {
final List<IndexCommit> recoverableCommits = new ArrayList<>();
for (IndexCommit commit : existingCommits) {
if (minRetainedTranslogGen <= Long.parseLong(commit.getUserData().get(Translog.TRANSLOG_GENERATION_KEY))) {
recoverableCommits.add(commit);
}
}
assert recoverableCommits.isEmpty() == false : "No commit point with full translog found; " +
"commits [" + existingCommits + "], minRetainedTranslogGen [" + minRetainedTranslogGen + "]";
return CombinedDeletionPolicy.findSafeCommitPoint(recoverableCommits, lastSyncedGlobalCheckpoint);
} else {
return CombinedDeletionPolicy.findSafeCommitPoint(existingCommits, lastSyncedGlobalCheckpoint);
}
}
return null;
}
private void recoverFromTranslogInternal() throws IOException {
Translog.TranslogGeneration translogGeneration = translog.getGeneration();
final int opsRecovered;
@ -519,7 +549,9 @@ public class InternalEngine extends Engine {
final DirectoryReader directoryReader = ElasticsearchDirectoryReader.wrap(DirectoryReader.open(indexWriter), shardId);
internalSearcherManager = new SearcherManager(directoryReader,
new RamAccountingSearcherFactory(engineConfig.getCircuitBreakerService()));
lastCommittedSegmentInfos = readLastCommittedSegmentInfos(internalSearcherManager, store);
// The index commit from IndexWriterConfig is null if the engine is opened in a mode other than OPEN_INDEX_AND_TRANSLOG.
// In those cases lastCommittedSegmentInfos will be retrieved from the last commit in the store.
lastCommittedSegmentInfos = store.readCommittedSegmentsInfo(indexWriter.getConfig().getIndexCommit());
ExternalSearcherManager externalSearcherManager = new ExternalSearcherManager(internalSearcherManager,
externalSearcherFactory);
success = true;
@ -1776,9 +1808,9 @@ public class InternalEngine extends Engine {
}
}
private IndexWriter createWriter(boolean create) throws IOException {
private IndexWriter createWriter(boolean create, IndexCommit startingCommit) throws IOException {
try {
final IndexWriterConfig iwc = getIndexWriterConfig(create);
final IndexWriterConfig iwc = getIndexWriterConfig(create, startingCommit);
return createWriter(store.directory(), iwc);
} catch (LockObtainFailedException ex) {
logger.warn("could not lock IndexWriter", ex);
@ -1791,10 +1823,11 @@ public class InternalEngine extends Engine {
return new IndexWriter(directory, iwc);
}
private IndexWriterConfig getIndexWriterConfig(boolean create) {
private IndexWriterConfig getIndexWriterConfig(boolean create, IndexCommit startingCommit) {
final IndexWriterConfig iwc = new IndexWriterConfig(engineConfig.getAnalyzer());
iwc.setCommitOnClose(false); // we by default don't commit on close
iwc.setOpenMode(create ? IndexWriterConfig.OpenMode.CREATE : IndexWriterConfig.OpenMode.APPEND);
iwc.setIndexCommit(startingCommit);
iwc.setIndexDeletionPolicy(snapshotDeletionPolicy);
// with tests.verbose, lucene sets this up: plumb to align with filesystem stream
boolean verbose = false;
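
The rollback itself is delegated to Lucene: handing the starting commit to IndexWriterConfig#setIndexCommit makes the writer open on that commit, so segments belonging only to newer commits are dropped on the next commit. A minimal sketch of those mechanics, assuming analyzer, directory, snapshotDeletionPolicy and startingCommit are in scope:

IndexWriterConfig iwc = new IndexWriterConfig(analyzer);
iwc.setCommitOnClose(false);                        // the engine commits explicitly
iwc.setOpenMode(IndexWriterConfig.OpenMode.APPEND);
iwc.setIndexCommit(startingCommit);                 // null means "open on the latest commit"
iwc.setIndexDeletionPolicy(snapshotDeletionPolicy);
IndexWriter writer = new IndexWriter(directory, iwc);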

View File

@ -1291,7 +1291,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
public void openIndexAndCreateTranslog(boolean forceNewHistoryUUID, long globalCheckpoint) throws IOException {
assert recoveryState.getRecoverySource().getType() != RecoverySource.Type.EMPTY_STORE &&
recoveryState.getRecoverySource().getType() != RecoverySource.Type.EXISTING_STORE;
SequenceNumbers.CommitInfo commitInfo = store.loadSeqNoInfo();
SequenceNumbers.CommitInfo commitInfo = store.loadSeqNoInfo(null);
assert commitInfo.localCheckpoint >= globalCheckpoint :
"trying to create a shard whose local checkpoint [" + commitInfo.localCheckpoint + "] is < global checkpoint ["
+ globalCheckpoint + "]";

View File

@ -381,7 +381,7 @@ final class StoreRecovery {
recoveryState.getIndex().updateVersion(version);
if (recoveryState.getRecoverySource().getType() == RecoverySource.Type.LOCAL_SHARDS) {
assert indexShouldExists;
indexShard.openIndexAndCreateTranslog(true, store.loadSeqNoInfo().localCheckpoint);
indexShard.openIndexAndCreateTranslog(true, store.loadSeqNoInfo(null).localCheckpoint);
} else {
// since we recover from local, just fill the files and size
try {
@ -442,7 +442,7 @@ final class StoreRecovery {
final long localCheckpoint;
store.incRef();
try {
localCheckpoint = store.loadSeqNoInfo().localCheckpoint;
localCheckpoint = store.loadSeqNoInfo(null).localCheckpoint;
} finally {
store.decRef();
}

View File

@ -182,9 +182,17 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
* @throws IOException if the index is corrupted or the segments file is not present
*/
public SegmentInfos readLastCommittedSegmentsInfo() throws IOException {
return readCommittedSegmentsInfo(null);
}
/**
* Returns the committed segments info for the given commit point.
* If the commit point is not provided, this method will return the segments info of the last commit in the store.
*/
public SegmentInfos readCommittedSegmentsInfo(final IndexCommit commit) throws IOException {
failIfCorrupted();
try {
return readSegmentsInfo(null, directory());
return readSegmentsInfo(commit, directory());
} catch (CorruptIndexException ex) {
markStoreCorrupted(ex);
throw ex;
@ -212,13 +220,14 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
}
/**
* Loads the maximum sequence number and local checkpoint from the latest Lucene commit point.
* Loads the maximum sequence number and local checkpoint from the given Lucene commit point or the latest if not provided.
*
* @param commit the commit point to load seqno stats from, or the last commit in the store if the parameter is null
* @return {@link SequenceNumbers.CommitInfo} containing information about the last commit
* @throws IOException if an I/O exception occurred reading the latest Lucene commit point from disk
*/
public SequenceNumbers.CommitInfo loadSeqNoInfo() throws IOException {
final Map<String, String> userData = SegmentInfos.readLatestCommit(directory).getUserData();
public SequenceNumbers.CommitInfo loadSeqNoInfo(final IndexCommit commit) throws IOException {
final Map<String, String> userData = commit != null ? commit.getUserData() : SegmentInfos.readLatestCommit(directory).getUserData();
return SequenceNumbers.loadSeqNoInfoFromLuceneCommit(userData.entrySet());
}
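
Callers that know the starting commit can now seed sequence-number state from it instead of from the latest commit; passing null keeps the old behaviour. A hedged sketch of the engine-side call, reusing the identifiers from the InternalEngine diff above:

SequenceNumbers.CommitInfo seqNoStats = store.loadSeqNoInfo(startingCommit); // null -> latest commit
LocalCheckpointTracker tracker =
    localCheckpointTrackerSupplier.apply(seqNoStats.maxSeqNo, seqNoStats.localCheckpoint);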

View File

@ -356,7 +356,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
/**
* Returns the minimum file generation referenced by the translog
*/
long getMinFileGeneration() {
public long getMinFileGeneration() {
try (ReleasableLock ignored = readLock.acquire()) {
if (readers.isEmpty()) {
return current.getGeneration();

View File

@ -353,7 +353,7 @@ public class PeerRecoveryTargetService extends AbstractComponent implements Inde
public static long getStartingSeqNo(final RecoveryTarget recoveryTarget) {
try {
final long globalCheckpoint = Translog.readGlobalCheckpoint(recoveryTarget.translogLocation());
final SequenceNumbers.CommitInfo seqNoStats = recoveryTarget.store().loadSeqNoInfo();
final SequenceNumbers.CommitInfo seqNoStats = recoveryTarget.store().loadSeqNoInfo(null);
if (seqNoStats.maxSeqNo <= globalCheckpoint) {
assert seqNoStats.localCheckpoint <= globalCheckpoint;
/*

View File

@ -106,7 +106,6 @@ import org.elasticsearch.index.mapper.ParsedDocument;
import org.elasticsearch.index.mapper.RootObjectMapper;
import org.elasticsearch.index.mapper.SeqNoFieldMapper;
import org.elasticsearch.index.mapper.SourceFieldMapper;
import org.elasticsearch.index.mapper.Uid;
import org.elasticsearch.index.seqno.GlobalCheckpointTracker;
import org.elasticsearch.index.seqno.LocalCheckpointTracker;
import org.elasticsearch.index.seqno.SequenceNumbers;
@ -1121,9 +1120,14 @@ public class InternalEngineTests extends EngineTestCase {
}
public void testSyncedFlushSurvivesEngineRestart() throws IOException {
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.UNASSIGNED_SEQ_NO);
IOUtils.close(store, engine);
store = createStore();
engine = createEngine(store, primaryTranslogDir, globalCheckpoint::get);
final String syncId = randomUnicodeOfCodepointLengthBetween(10, 20);
ParsedDocument doc = testParsedDocument("1", null, testDocumentWithTextField(), B_1, null);
ParsedDocument doc = testParsedDocument("1", null, testDocumentWithTextField(), new BytesArray("{}"), null);
engine.index(indexForDoc(doc));
globalCheckpoint.set(0L);
final Engine.CommitId commitID = engine.flush();
assertEquals("should succeed to flush commit with right id and no pending doc", engine.syncFlush(syncId, commitID),
Engine.SyncedFlushResult.SUCCESS);
@ -1145,7 +1149,7 @@ public class InternalEngineTests extends EngineTestCase {
public void testSyncedFlushVanishesOnReplay() throws IOException {
final String syncId = randomUnicodeOfCodepointLengthBetween(10, 20);
ParsedDocument doc = testParsedDocument("1", null, testDocumentWithTextField(), B_1, null);
ParsedDocument doc = testParsedDocument("1", null, testDocumentWithTextField(), new BytesArray("{}"), null);
engine.index(indexForDoc(doc));
final Engine.CommitId commitID = engine.flush();
assertEquals("should succeed to flush commit with right id and no pending doc", engine.syncFlush(syncId, commitID),
@ -2514,6 +2518,7 @@ public class InternalEngineTests extends EngineTestCase {
}
public void testTranslogReplay() throws IOException {
final LongSupplier inSyncGlobalCheckpointSupplier = () -> this.engine.getLocalCheckpointTracker().getCheckpoint();
final int numDocs = randomIntBetween(1, 10);
for (int i = 0; i < numDocs; i++) {
ParsedDocument doc = testParsedDocument(Integer.toString(i), null, testDocument(), new BytesArray("{}"), null);
@ -2527,7 +2532,7 @@ public class InternalEngineTests extends EngineTestCase {
parser.mappingUpdate = dynamicUpdate();
engine.close();
engine = new InternalEngine(copy(engine.config(), EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG)); // we need to reuse the engine config, otherwise parser.mappingModified won't work
engine = new InternalEngine(copy(engine.config(), EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG, inSyncGlobalCheckpointSupplier)); // we need to reuse the engine config, otherwise parser.mappingModified won't work
engine.recoverFromTranslog();
assertVisibleCount(engine, numDocs, false);
@ -2541,7 +2546,7 @@ public class InternalEngineTests extends EngineTestCase {
}
engine.close();
engine = createEngine(store, primaryTranslogDir);
engine = createEngine(store, primaryTranslogDir, inSyncGlobalCheckpointSupplier);
assertVisibleCount(engine, numDocs, false);
parser = (TranslogHandler) engine.config().getTranslogRecoveryRunner();
assertEquals(0, parser.appliedOperations());
@ -2568,7 +2573,7 @@ public class InternalEngineTests extends EngineTestCase {
}
engine.close();
engine = createEngine(store, primaryTranslogDir);
engine = createEngine(store, primaryTranslogDir, inSyncGlobalCheckpointSupplier);
try (Engine.Searcher searcher = engine.acquireSearcher("test")) {
TopDocs topDocs = searcher.searcher().search(new MatchAllDocsQuery(), numDocs + 1);
assertThat(topDocs.totalHits, equalTo(numDocs + 1L));
@ -2580,7 +2585,7 @@ public class InternalEngineTests extends EngineTestCase {
engine.refresh("test");
} else {
engine.close();
engine = createEngine(store, primaryTranslogDir);
engine = createEngine(store, primaryTranslogDir, inSyncGlobalCheckpointSupplier);
}
try (Engine.Searcher searcher = engine.acquireSearcher("test")) {
TopDocs topDocs = searcher.searcher().search(new MatchAllDocsQuery(), numDocs);
@ -2809,8 +2814,9 @@ public class InternalEngineTests extends EngineTestCase {
}
public void testCurrentTranslogIDisCommitted() throws IOException {
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.UNASSIGNED_SEQ_NO);
try (Store store = createStore()) {
EngineConfig config = config(defaultSettings, store, createTempDir(), newMergePolicy(), null);
EngineConfig config = config(defaultSettings, store, createTempDir(), newMergePolicy(), null, null, globalCheckpoint::get);
// create
{
@ -2820,7 +2826,7 @@ public class InternalEngineTests extends EngineTestCase {
try (InternalEngine engine = new InternalEngine(copy(config, EngineConfig.OpenMode.CREATE_INDEX_AND_TRANSLOG))){
assertFalse(engine.isRecovering());
engine.index(firstIndexRequest);
globalCheckpoint.set(engine.getLocalCheckpointTracker().getCheckpoint());
expectThrows(IllegalStateException.class, () -> engine.recoverFromTranslog());
Map<String, String> userData = engine.getLastCommittedSegmentInfos().getUserData();
assertEquals("1", userData.get(Translog.TRANSLOG_GENERATION_KEY));

View File

@ -145,6 +145,7 @@ import static org.elasticsearch.common.xcontent.ToXContent.EMPTY_PARAMS;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.repositories.RepositoryData.EMPTY_REPO_GEN;
import static org.elasticsearch.test.hamcrest.RegexMatcher.matches;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
@ -1230,6 +1231,7 @@ public class IndexShardTests extends IndexShardTestCase {
public void testIndexingOperationsListeners() throws IOException {
IndexShard shard = newStartedShard(true);
indexDoc(shard, "test", "0", "{\"foo\" : \"bar\"}");
shard.updateLocalCheckpointForShard(shard.shardRouting.allocationId().getId(), 0);
AtomicInteger preIndex = new AtomicInteger();
AtomicInteger postIndexCreate = new AtomicInteger();
AtomicInteger postIndexUpdate = new AtomicInteger();
@ -1538,28 +1540,44 @@ public class IndexShardTests extends IndexShardTestCase {
}
public void testRecoverFromStoreWithOutOfOrderDelete() throws IOException {
/*
* The flow of this test:
* - delete #1
* - roll generation (to create gen 2)
* - index #0
* - index #3
* - flush (commit point has max_seqno 3, and local checkpoint 1 -> points at gen 2, previous commit point is maintained)
* - index #2
* - index #5
* - If we flush and then recover from the existing store, delete #1 will be removed while index #0 is still retained and replayed.
*/
final IndexShard shard = newStartedShard(false);
final Consumer<Mapping> mappingConsumer = getMappingUpdater(shard, "test");
shard.applyDeleteOperationOnReplica(1, 2, "test", "id", VersionType.EXTERNAL, mappingConsumer);
shard.getEngine().rollTranslogGeneration(); // isolate the delete in its own generation
shard.applyIndexOperationOnReplica(0, 1, VersionType.EXTERNAL, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false,
SourceToParse.source(shard.shardId().getIndexName(), "test", "id", new BytesArray("{}"), XContentType.JSON), mappingConsumer);
// index a second item into the second generation, skipping seq# 2. Local checkpoint is now 1, which will make this generation stick
// around
shard.applyIndexOperationOnReplica(3, 1, VersionType.EXTERNAL, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false,
SourceToParse.source(shard.shardId().getIndexName(), "test", "id2", new BytesArray("{}"), XContentType.JSON), mappingConsumer);
shard.applyIndexOperationOnReplica(3, 3, VersionType.EXTERNAL, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false,
SourceToParse.source(shard.shardId().getIndexName(), "test", "id-3", new BytesArray("{}"), XContentType.JSON), mappingConsumer);
// Flushing a new commit with local checkpoint=1 allows recovery to skip translog gen #1.
shard.flush(new FlushRequest().force(true).waitIfOngoing(true));
shard.applyIndexOperationOnReplica(2, 3, VersionType.EXTERNAL, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false,
SourceToParse.source(shard.shardId().getIndexName(), "test", "id-2", new BytesArray("{}"), XContentType.JSON), mappingConsumer);
shard.applyIndexOperationOnReplica(5, 1, VersionType.EXTERNAL, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false,
SourceToParse.source(shard.shardId().getIndexName(), "test", "id-5", new BytesArray("{}"), XContentType.JSON), mappingConsumer);
final int translogOps;
if (randomBoolean()) {
// Advance the global checkpoint to remove the 1st commit; this shard will recover the 2nd commit.
shard.updateGlobalCheckpointOnReplica(3, "test");
logger.info("--> flushing shard");
flushShard(shard);
translogOps = 2;
shard.flush(new FlushRequest().force(true).waitIfOngoing(true));
translogOps = 4; // delete #1 won't be replayed.
} else if (randomBoolean()) {
shard.getEngine().rollTranslogGeneration();
translogOps = 3;
translogOps = 5;
} else {
translogOps = 3;
translogOps = 5;
}
final ShardRouting replicaRouting = shard.routingEntry();
@ -1574,7 +1592,7 @@ public class IndexShardTests extends IndexShardTestCase {
assertEquals(translogOps, newShard.recoveryState().getTranslog().totalOperationsOnStart());
assertEquals(100.0f, newShard.recoveryState().getTranslog().recoveredPercent(), 0.01f);
updateRoutingEntry(newShard, ShardRoutingHelper.moveToStarted(newShard.routingEntry()));
assertDocCount(newShard, 1);
assertDocCount(newShard, 3);
closeShards(newShard);
}
@ -1586,6 +1604,7 @@ public class IndexShardTests extends IndexShardTestCase {
indexDoc(shard, "test", Integer.toString(i));
}
if (randomBoolean()) {
shard.updateLocalCheckpointForShard(shard.shardRouting.allocationId().getId(), totalOps - 1);
flushShard(shard);
translogOps = 0;
}
@ -1738,6 +1757,39 @@ public class IndexShardTests extends IndexShardTestCase {
closeShards(newShard);
}
public void testRecoverFromStoreRemoveStaleOperations() throws Exception {
final IndexShard shard = newStartedShard(false);
final String indexName = shard.shardId().getIndexName();
final Consumer<Mapping> mapping = getMappingUpdater(shard, "doc");
// Index #0, index #1
shard.applyIndexOperationOnReplica(0, 1, VersionType.EXTERNAL, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false,
SourceToParse.source(indexName, "doc", "doc-0", new BytesArray("{}"), XContentType.JSON), mapping);
flushShard(shard);
shard.updateGlobalCheckpointOnReplica(0, "test"); // stick the global checkpoint here.
shard.applyIndexOperationOnReplica(1, 1, VersionType.EXTERNAL, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false,
SourceToParse.source(indexName, "doc", "doc-1", new BytesArray("{}"), XContentType.JSON), mapping);
flushShard(shard);
assertThat(getShardDocUIDs(shard), containsInAnyOrder("doc-0", "doc-1"));
// Simulate resync (without rollback): Noop #1, index #2
shard.primaryTerm++;
shard.markSeqNoAsNoop(1, "test");
shard.applyIndexOperationOnReplica(2, 1, VersionType.EXTERNAL, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false,
SourceToParse.source(indexName, "doc", "doc-2", new BytesArray("{}"), XContentType.JSON), mapping);
flushShard(shard);
assertThat(getShardDocUIDs(shard), containsInAnyOrder("doc-0", "doc-1", "doc-2"));
// Recovering from store should discard doc #1
final ShardRouting replicaRouting = shard.routingEntry();
IndexShard newShard = reinitShard(shard,
newShardRouting(replicaRouting.shardId(), replicaRouting.currentNodeId(), true, ShardRoutingState.INITIALIZING,
RecoverySource.StoreRecoverySource.EXISTING_STORE_INSTANCE));
newShard.primaryTerm++;
DiscoveryNode localNode = new DiscoveryNode("foo", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT);
newShard.markAsRecovering("store", new RecoveryState(newShard.routingEntry(), localNode, null));
assertTrue(newShard.recoverFromStore());
assertThat(getShardDocUIDs(newShard), containsInAnyOrder("doc-0", "doc-2"));
closeShards(newShard);
}
public void testRecoveryFailsAfterMovingToRelocatedState() throws InterruptedException, IOException {
final IndexShard shard = newStartedShard(true);
ShardRouting origRouting = shard.routingEntry();

View File

@ -19,16 +19,22 @@
package org.elasticsearch.index.shard;
import org.apache.lucene.store.AlreadyClosedException;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.resync.ResyncReplicationResponse;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.io.stream.ByteBufferStreamInput;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.lucene.uid.Versions;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.mapper.SourceToParse;
import org.elasticsearch.tasks.TaskManager;
import java.io.IOException;
@ -58,7 +64,10 @@ public class PrimaryReplicaSyncerTests extends IndexShardTestCase {
int numDocs = randomInt(10);
for (int i = 0; i < numDocs; i++) {
indexDoc(shard, "test", Integer.toString(i));
// Index the doc but do not advance the local checkpoint.
shard.applyIndexOperationOnPrimary(Versions.MATCH_ANY, VersionType.INTERNAL,
SourceToParse.source(shard.shardId().getIndexName(), "test", Integer.toString(i), new BytesArray("{}"), XContentType.JSON),
IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false, getMappingUpdater(shard, "test"));
}
long globalCheckPoint = numDocs > 0 ? randomIntBetween(0, numDocs - 1) : 0;
@ -108,7 +117,10 @@ public class PrimaryReplicaSyncerTests extends IndexShardTestCase {
int numDocs = 10;
for (int i = 0; i < numDocs; i++) {
indexDoc(shard, "test", Integer.toString(i));
// Index the doc but do not advance the local checkpoint.
shard.applyIndexOperationOnPrimary(Versions.MATCH_ANY, VersionType.INTERNAL,
SourceToParse.source(shard.shardId().getIndexName(), "test", Integer.toString(i), new BytesArray("{}"), XContentType.JSON),
IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false, getMappingUpdater(shard, "test"));
}
String allocationId = shard.routingEntry().allocationId().getId();

View File

@ -25,6 +25,7 @@ import org.apache.logging.log4j.Logger;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.store.NIOFSDirectory;
import org.elasticsearch.index.engine.CombinedDeletionPolicy;
import java.io.IOException;
import java.nio.ByteBuffer;
@ -61,7 +62,7 @@ public class TestTranslog {
Set<Path> candidates = new TreeSet<>(); // TreeSet makes sure iteration order is deterministic
for (Path translogDir : translogDirs) {
if (Files.isDirectory(translogDir)) {
final long minUsedTranslogGen = minTranslogGenUsedInRecovery(translogDir.getParent().resolve("index"));
final long minUsedTranslogGen = minTranslogGenUsedInRecovery(translogDir);
logger.info("--> Translog dir [{}], minUsedTranslogGen [{}]", translogDir, minUsedTranslogGen);
try (DirectoryStream<Path> stream = Files.newDirectoryStream(translogDir)) {
for (Path item : stream) {
@ -112,11 +113,11 @@ public class TestTranslog {
/**
* Lists all existing commits in the index next to the given translog path, then reads the minimum translog generation that will be used in recoverFromTranslog.
*/
private static long minTranslogGenUsedInRecovery(Path indexPath) throws IOException {
try (NIOFSDirectory directory = new NIOFSDirectory(indexPath)) {
final List<IndexCommit> commits = DirectoryReader.listCommits(directory);
// TODO: We should call CombinedDeletionPolicy to get a correct recovering commit.
final IndexCommit recoveringCommit = commits.get(commits.size() - 1);
private static long minTranslogGenUsedInRecovery(Path translogPath) throws IOException {
try (NIOFSDirectory directory = new NIOFSDirectory(translogPath.getParent().resolve("index"))) {
List<IndexCommit> commits = DirectoryReader.listCommits(directory);
long globalCheckpoint = Translog.readGlobalCheckpoint(translogPath);
IndexCommit recoveringCommit = CombinedDeletionPolicy.findSafeCommitPoint(commits, globalCheckpoint);
return Long.parseLong(recoveringCommit.getUserData().get(Translog.TRANSLOG_GENERATION_KEY));
}
}

View File

@ -159,6 +159,15 @@ public abstract class EngineTestCase extends ESTestCase {
return copy(config, openMode, config.getAnalyzer());
}
public EngineConfig copy(EngineConfig config, EngineConfig.OpenMode openMode, LongSupplier globalCheckpointSupplier) {
return new EngineConfig(openMode, config.getShardId(), config.getAllocationId(), config.getThreadPool(), config.getIndexSettings(),
config.getWarmer(), config.getStore(), config.getMergePolicy(), config.getAnalyzer(), config.getSimilarity(),
new CodecService(null, logger), config.getEventListener(), config.getQueryCache(), config.getQueryCachingPolicy(),
config.getForceNewHistoryUUID(), config.getTranslogConfig(), config.getFlushMergesAfter(),
config.getExternalRefreshListener(), Collections.emptyList(), config.getIndexSort(), config.getTranslogRecoveryRunner(),
config.getCircuitBreakerService(), globalCheckpointSupplier);
}
public EngineConfig copy(EngineConfig config, EngineConfig.OpenMode openMode, Analyzer analyzer) {
return new EngineConfig(openMode, config.getShardId(), config.getAllocationId(), config.getThreadPool(), config.getIndexSettings(),
config.getWarmer(), config.getStore(), config.getMergePolicy(), analyzer, config.getSimilarity(),
@ -251,6 +260,10 @@ public abstract class EngineTestCase extends ESTestCase {
return createEngine(defaultSettings, store, translogPath, newMergePolicy(), null);
}
protected InternalEngine createEngine(Store store, Path translogPath, LongSupplier globalCheckpointSupplier) throws IOException {
return createEngine(defaultSettings, store, translogPath, newMergePolicy(), null, null, globalCheckpointSupplier);
}
protected InternalEngine createEngine(
Store store,
Path translogPath,

View File

@ -555,8 +555,11 @@ public abstract class IndexShardTestCase extends ESTestCase {
throws IOException {
SourceToParse sourceToParse = SourceToParse.source(shard.shardId().getIndexName(), type, id, new BytesArray(source), xContentType);
if (shard.routingEntry().primary()) {
return shard.applyIndexOperationOnPrimary(Versions.MATCH_ANY, VersionType.INTERNAL, sourceToParse,
final Engine.IndexResult result = shard.applyIndexOperationOnPrimary(Versions.MATCH_ANY, VersionType.INTERNAL, sourceToParse,
IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false, getMappingUpdater(shard, type));
shard.updateLocalCheckpointForShard(shard.routingEntry().allocationId().getId(),
shard.getEngine().getLocalCheckpointTracker().getCheckpoint());
return result;
} else {
return shard.applyIndexOperationOnReplica(shard.seqNoStats().getMaxSeqNo() + 1, 0,
VersionType.EXTERNAL, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false, sourceToParse, getMappingUpdater(shard, type));