diff --git a/core/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java b/core/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java index add4a443903..2d46ae1d8a8 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java +++ b/core/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java @@ -21,43 +21,48 @@ package org.elasticsearch.index.engine; import org.apache.lucene.index.IndexCommit; import org.apache.lucene.index.IndexDeletionPolicy; -import org.apache.lucene.index.SnapshotDeletionPolicy; +import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.translog.Translog; import org.elasticsearch.index.translog.TranslogDeletionPolicy; import java.io.IOException; import java.util.List; +import java.util.Map; +import java.util.function.LongSupplier; /** * An {@link IndexDeletionPolicy} that coordinates between Lucene's commits and the retention of translog generation files, * making sure that all translog files that are needed to recover from the Lucene commit are not deleted. + *

+ * In particular, this policy will delete index commits whose max sequence number is at most + * the current global checkpoint, except for the index commit with the highest max sequence number among them. */ -class CombinedDeletionPolicy extends IndexDeletionPolicy { - +final class CombinedDeletionPolicy extends IndexDeletionPolicy { private final TranslogDeletionPolicy translogDeletionPolicy; private final EngineConfig.OpenMode openMode; + private final LongSupplier globalCheckpointSupplier; - private final SnapshotDeletionPolicy indexDeletionPolicy; - - CombinedDeletionPolicy(SnapshotDeletionPolicy indexDeletionPolicy, TranslogDeletionPolicy translogDeletionPolicy, - EngineConfig.OpenMode openMode) { - this.indexDeletionPolicy = indexDeletionPolicy; - this.translogDeletionPolicy = translogDeletionPolicy; + CombinedDeletionPolicy(EngineConfig.OpenMode openMode, TranslogDeletionPolicy translogDeletionPolicy, + LongSupplier globalCheckpointSupplier) { this.openMode = openMode; + this.translogDeletionPolicy = translogDeletionPolicy; + this.globalCheckpointSupplier = globalCheckpointSupplier; } @Override public void onInit(List commits) throws IOException { - indexDeletionPolicy.onInit(commits); switch (openMode) { case CREATE_INDEX_AND_TRANSLOG: + assert commits.isEmpty() : "index is created, but we have commits"; break; case OPEN_INDEX_CREATE_TRANSLOG: assert commits.isEmpty() == false : "index is opened, but we have no commits"; + // When an engine starts with OPEN_INDEX_CREATE_TRANSLOG, a fresh index commit will be created immediately. + // We can therefore simply skip processing here, as `onCommit` will be called right afterwards with the new commit. break; case OPEN_INDEX_AND_TRANSLOG: assert commits.isEmpty() == false : "index is opened, but we have no commits"; - setLastCommittedTranslogGeneration(commits); + onCommit(commits); break; default: throw new IllegalArgumentException("unknown openMode [" + openMode + "]"); @@ -66,24 +71,56 @@ class CombinedDeletionPolicy extends IndexDeletionPolicy { @Override public void onCommit(List commits) throws IOException { - indexDeletionPolicy.onCommit(commits); - setLastCommittedTranslogGeneration(commits); + final int keptPosition = indexOfKeptCommits(commits); + for (int i = 0; i < keptPosition; i++) { + commits.get(i).delete(); + } + updateTranslogDeletionPolicy(commits.get(keptPosition), commits.get(commits.size() - 1)); } - private void setLastCommittedTranslogGeneration(List commits) throws IOException { - // when opening an existing lucene index, we currently always open the last commit.
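As a concrete illustration of the selection rule implemented by `onCommit` above, here is a small self-contained sketch (not part of the patch; the class name and values are made up) that reduces each commit to the max-sequence-number it records in its user data:

```java
// Illustration of CombinedDeletionPolicy's commit-selection rule, reduced to plain longs.
// Commits are sorted by age: index 0 is the oldest, the last index is the newest.
public class KeptCommitDemo {

    static int indexOfKeptCommit(long[] maxSeqNos, long globalCheckpoint) {
        // Walk backwards and keep the newest commit whose max sequence number is at most the
        // global checkpoint: all of its operations are safely replicated, and recovering from
        // it requires the least translog replay of any safe commit.
        for (int i = maxSeqNos.length - 1; i >= 0; i--) {
            if (maxSeqNos[i] <= globalCheckpoint) {
                return i;
            }
        }
        // No safe commit yet (e.g. right after an upgrade or a file-based recovery):
        // fall back to retaining everything from the oldest commit onwards.
        return 0;
    }

    public static void main(String[] args) {
        long[] maxSeqNos = {10, 50, 90}; // three commits, oldest to newest
        // Global checkpoint 60: commit 1 (max_seq_no 50) is the safe commit, so commit 0
        // is deleted while commits 1 and 2 are kept.
        System.out.println(indexOfKeptCommit(maxSeqNos, 60)); // prints 1
        // Global checkpoint 95: the newest commit is already safe; only it must be kept.
        System.out.println(indexOfKeptCommit(maxSeqNos, 95)); // prints 2
    }
}
```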
- // we therefore use the translog gen as the one that will be required for recovery - final IndexCommit indexCommit = commits.get(commits.size() - 1); - assert indexCommit.isDeleted() == false : "last commit is deleted"; - long minGen = Long.parseLong(indexCommit.getUserData().get(Translog.TRANSLOG_GENERATION_KEY)); - translogDeletionPolicy.setMinTranslogGenerationForRecovery(minGen); + private void updateTranslogDeletionPolicy(final IndexCommit minRequiredCommit, final IndexCommit lastCommit) throws IOException { + assert minRequiredCommit.isDeleted() == false : "The minimum required commit must not be deleted"; + final long minRequiredGen = Long.parseLong(minRequiredCommit.getUserData().get(Translog.TRANSLOG_GENERATION_KEY)); + + assert lastCommit.isDeleted() == false : "The last commit must not be deleted"; + final long lastGen = Long.parseLong(lastCommit.getUserData().get(Translog.TRANSLOG_GENERATION_KEY)); + + assert minRequiredGen <= lastGen : "minRequiredGen must not be greater than lastGen"; + translogDeletionPolicy.setTranslogGenerationOfLastCommit(lastGen); + translogDeletionPolicy.setMinTranslogGenerationForRecovery(minRequiredGen); } - public SnapshotDeletionPolicy getIndexDeletionPolicy() { - return indexDeletionPolicy; - } + /** + * Find the highest index position of a safe index commit whose max sequence number is not greater than the global checkpoint. + * Index commits with a different translog UUID are filtered out as they don't belong to this engine. + */ + private int indexOfKeptCommits(List commits) throws IOException { + final long currentGlobalCheckpoint = globalCheckpointSupplier.getAsLong(); + final String expectedTranslogUUID = commits.get(commits.size() - 1).getUserData().get(Translog.TRANSLOG_UUID_KEY); - public TranslogDeletionPolicy getTranslogDeletionPolicy() { - return translogDeletionPolicy; + // Commits are sorted by age (the 0th one is the oldest commit). + for (int i = commits.size() - 1; i >= 0; i--) { + final Map commitUserData = commits.get(i).getUserData(); + // Ignore index commits with a different translog uuid. + if (expectedTranslogUUID.equals(commitUserData.get(Translog.TRANSLOG_UUID_KEY)) == false) { + return i + 1; + } + // 5.x commits do not contain MAX_SEQ_NO. + if (commitUserData.containsKey(SequenceNumbers.MAX_SEQ_NO) == false) { + return i; + } + final long maxSeqNoFromCommit = Long.parseLong(commitUserData.get(SequenceNumbers.MAX_SEQ_NO)); + if (maxSeqNoFromCommit <= currentGlobalCheckpoint) { + return i; + } + } + /* + * We may reach this point in the following cases: + * 1. In earlier 6.x versions, only the last commit was kept - which is likely not a safe commit if writes were in progress. + * Thus, after upgrading, we may not find a safe commit until one can be reserved. + * 2. In peer recovery, if a file-based recovery happens, a replica receives the latest commit from the primary. + * However, that commit may not be a safe commit if writes are in progress on the primary.
+ */ + return 0; } } diff --git a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 47056c3b010..e4f6a6f9b0a 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -25,7 +25,6 @@ import org.apache.lucene.index.IndexFormatTooOldException; import org.apache.lucene.index.IndexReader; import org.apache.lucene.index.IndexWriter; import org.apache.lucene.index.IndexWriterConfig; -import org.apache.lucene.index.KeepOnlyLastCommitDeletionPolicy; import org.apache.lucene.index.LeafReader; import org.apache.lucene.index.LiveIndexWriterConfig; import org.apache.lucene.index.MergePolicy; @@ -128,7 +127,7 @@ public class InternalEngine extends Engine { private final String uidField; - private final CombinedDeletionPolicy deletionPolicy; + private final SnapshotDeletionPolicy snapshotDeletionPolicy; // How many callers are currently requesting index throttling. Currently there are only two situations where we do this: when merges // are falling behind and when writing indexing buffer to disk is too slow. When this is 0, there is no throttling, else we throttling @@ -167,8 +166,6 @@ public class InternalEngine extends Engine { engineConfig.getIndexSettings().getTranslogRetentionSize().getBytes(), engineConfig.getIndexSettings().getTranslogRetentionAge().getMillis() ); - this.deletionPolicy = new CombinedDeletionPolicy( - new SnapshotDeletionPolicy(new KeepOnlyLastCommitDeletionPolicy()), translogDeletionPolicy, openMode); store.incRef(); IndexWriter writer = null; Translog translog = null; @@ -182,30 +179,19 @@ public class InternalEngine extends Engine { mergeScheduler = scheduler = new EngineMergeScheduler(engineConfig.getShardId(), engineConfig.getIndexSettings()); throttle = new IndexThrottle(); try { - final SeqNoStats seqNoStats; - switch (openMode) { - case OPEN_INDEX_AND_TRANSLOG: - writer = createWriter(false); - final long globalCheckpoint = Translog.readGlobalCheckpoint(engineConfig.getTranslogConfig().getTranslogPath()); - seqNoStats = store.loadSeqNoStats(globalCheckpoint); - break; - case OPEN_INDEX_CREATE_TRANSLOG: - writer = createWriter(false); - seqNoStats = store.loadSeqNoStats(SequenceNumbers.UNASSIGNED_SEQ_NO); - break; - case CREATE_INDEX_AND_TRANSLOG: - writer = createWriter(true); - seqNoStats = new SeqNoStats( - SequenceNumbers.NO_OPS_PERFORMED, - SequenceNumbers.NO_OPS_PERFORMED, - SequenceNumbers.UNASSIGNED_SEQ_NO); - break; - default: - throw new IllegalArgumentException(openMode.toString()); - } + final SeqNoStats seqNoStats = loadSeqNoStats(openMode); logger.trace("recovered [{}]", seqNoStats); - seqNoService = seqNoServiceSupplier.apply(engineConfig, seqNoStats); + this.seqNoService = seqNoServiceSupplier.apply(engineConfig, seqNoStats); + this.snapshotDeletionPolicy = new SnapshotDeletionPolicy( + new CombinedDeletionPolicy(openMode, translogDeletionPolicy, seqNoService::getGlobalCheckpoint) + ); + writer = createWriter(openMode == EngineConfig.OpenMode.CREATE_INDEX_AND_TRANSLOG); updateMaxUnsafeAutoIdTimestampFromWriter(writer); + assert engineConfig.getForceNewHistoryUUID() == false + || openMode == EngineConfig.OpenMode.CREATE_INDEX_AND_TRANSLOG + || openMode == EngineConfig.OpenMode.OPEN_INDEX_CREATE_TRANSLOG + : "OpenMode must be either CREATE_INDEX_AND_TRANSLOG or OPEN_INDEX_CREATE_TRANSLOG if forceNewHistoryUUID; " + + "openMode [" + openMode + "], 
forceNewHistoryUUID [" + engineConfig.getForceNewHistoryUUID() + "]"; historyUUID = loadOrGenerateHistoryUUID(writer, engineConfig.getForceNewHistoryUUID()); Objects.requireNonNull(historyUUID, "history uuid should not be null"); indexWriter = writer; @@ -380,6 +366,23 @@ public class InternalEngine extends Engine { seqNoStats.getGlobalCheckpoint()); } + private SeqNoStats loadSeqNoStats(EngineConfig.OpenMode openMode) throws IOException { + switch (openMode) { + case OPEN_INDEX_AND_TRANSLOG: + final long globalCheckpoint = Translog.readGlobalCheckpoint(engineConfig.getTranslogConfig().getTranslogPath()); + return store.loadSeqNoStats(globalCheckpoint); + case OPEN_INDEX_CREATE_TRANSLOG: + return store.loadSeqNoStats(SequenceNumbers.UNASSIGNED_SEQ_NO); + case CREATE_INDEX_AND_TRANSLOG: + return new SeqNoStats( + SequenceNumbers.NO_OPS_PERFORMED, + SequenceNumbers.NO_OPS_PERFORMED, + SequenceNumbers.UNASSIGNED_SEQ_NO); + default: + throw new IllegalArgumentException(openMode.toString()); + } + } + @Override public InternalEngine recoverFromTranslog() throws IOException { flushLock.lock(); @@ -1607,7 +1610,7 @@ public class InternalEngine extends Engine { } try (ReleasableLock lock = readLock.acquire()) { logger.trace("pulling snapshot"); - return new IndexCommitRef(deletionPolicy.getIndexDeletionPolicy()); + return new IndexCommitRef(snapshotDeletionPolicy); } catch (IOException e) { throw new SnapshotFailedEngineException(shardId, e); } @@ -1788,7 +1791,7 @@ public class InternalEngine extends Engine { final IndexWriterConfig iwc = new IndexWriterConfig(engineConfig.getAnalyzer()); iwc.setCommitOnClose(false); // we by default don't commit on close iwc.setOpenMode(create ? IndexWriterConfig.OpenMode.CREATE : IndexWriterConfig.OpenMode.APPEND); - iwc.setIndexDeletionPolicy(deletionPolicy); + iwc.setIndexDeletionPolicy(snapshotDeletionPolicy); // with tests.verbose, lucene sets this up: plumb to align with filesystem stream boolean verbose = false; try { diff --git a/core/src/main/java/org/elasticsearch/index/translog/Translog.java b/core/src/main/java/org/elasticsearch/index/translog/Translog.java index 80033833899..9e2e7ddbd06 100644 --- a/core/src/main/java/org/elasticsearch/index/translog/Translog.java +++ b/core/src/main/java/org/elasticsearch/index/translog/Translog.java @@ -372,14 +372,14 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC * Returns the number of operations in the translog files that aren't committed to lucene. */ public int uncommittedOperations() { - return totalOperations(deletionPolicy.getMinTranslogGenerationForRecovery()); + return totalOperations(deletionPolicy.getTranslogGenerationOfLastCommit()); } /** * Returns the size in bytes of the translog files that aren't committed to lucene. 
*/ public long uncommittedSizeInBytes() { - return sizeInBytesByMinGen(deletionPolicy.getMinTranslogGenerationForRecovery()); + return sizeInBytesByMinGen(deletionPolicy.getTranslogGenerationOfLastCommit()); } /** diff --git a/core/src/main/java/org/elasticsearch/index/translog/TranslogDeletionPolicy.java b/core/src/main/java/org/elasticsearch/index/translog/TranslogDeletionPolicy.java index adee4bd9fa9..5eba198378a 100644 --- a/core/src/main/java/org/elasticsearch/index/translog/TranslogDeletionPolicy.java +++ b/core/src/main/java/org/elasticsearch/index/translog/TranslogDeletionPolicy.java @@ -54,6 +54,11 @@ public class TranslogDeletionPolicy { */ private long minTranslogGenerationForRecovery = 1; + /** + * This translog generation is used to calculate the number of uncommitted operations since the last index commit. + */ + private long translogGenerationOfLastCommit = 1; + private long retentionSizeInBytes; private long retentionAgeInMillis; @@ -69,13 +74,24 @@ public class TranslogDeletionPolicy { } public synchronized void setMinTranslogGenerationForRecovery(long newGen) { - if (newGen < minTranslogGenerationForRecovery) { - throw new IllegalArgumentException("minTranslogGenerationForRecovery can't go backwards. new [" + newGen + "] current [" + - minTranslogGenerationForRecovery + "]"); + if (newGen < minTranslogGenerationForRecovery || newGen > translogGenerationOfLastCommit) { + throw new IllegalArgumentException("minTranslogGenerationForRecovery can't go backwards or beyond the last committed generation; new [" + newGen + "], " + + "current [" + minTranslogGenerationForRecovery + "], lastGen [" + translogGenerationOfLastCommit + "]"); } minTranslogGenerationForRecovery = newGen; } + /** + * Sets the translog generation of the last index commit. + */ + public synchronized void setTranslogGenerationOfLastCommit(long lastGen) { + if (lastGen < translogGenerationOfLastCommit || lastGen < minTranslogGenerationForRecovery) { + throw new IllegalArgumentException("translogGenerationOfLastCommit can't go backwards or below minTranslogGenerationForRecovery; new [" + lastGen + "], " + + "current [" + translogGenerationOfLastCommit + "], minRequiredGen [" + minTranslogGenerationForRecovery + "]"); + } + translogGenerationOfLastCommit = lastGen; + } + public synchronized void setRetentionSizeInBytes(long bytes) { retentionSizeInBytes = bytes; } @@ -193,6 +209,14 @@ public class TranslogDeletionPolicy { return minTranslogGenerationForRecovery; } + /** + * Returns a translog generation that will be used to calculate the number of uncommitted operations since the last index commit. + * See {@link Translog#uncommittedOperations()} and {@link Translog#uncommittedSizeInBytes()} + */ + public synchronized long getTranslogGenerationOfLastCommit() { + return translogGenerationOfLastCommit; + } + synchronized long getTranslogRefCount(long gen) { final Counter counter = translogRefCounts.get(gen); return counter == null ?
0 : counter.get(); diff --git a/core/src/test/java/org/elasticsearch/index/engine/CombinedDeletionPolicyTests.java b/core/src/test/java/org/elasticsearch/index/engine/CombinedDeletionPolicyTests.java index 5d4385cbd38..0fc6195161a 100644 --- a/core/src/test/java/org/elasticsearch/index/engine/CombinedDeletionPolicyTests.java +++ b/core/src/test/java/org/elasticsearch/index/engine/CombinedDeletionPolicyTests.java @@ -19,66 +19,178 @@ package org.elasticsearch.index.engine; +import com.carrotsearch.hppc.LongArrayList; import org.apache.lucene.index.IndexCommit; import org.apache.lucene.index.SnapshotDeletionPolicy; +import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.translog.Translog; import org.elasticsearch.index.translog.TranslogDeletionPolicy; import org.elasticsearch.test.ESTestCase; import java.io.IOException; import java.util.ArrayList; -import java.util.Collections; +import java.util.Arrays; +import java.util.HashMap; import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.atomic.AtomicLong; +import static java.util.Collections.singletonList; +import static org.elasticsearch.index.engine.EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG; +import static org.elasticsearch.index.engine.EngineConfig.OpenMode.OPEN_INDEX_CREATE_TRANSLOG; import static org.elasticsearch.index.translog.TranslogDeletionPolicies.createTranslogDeletionPolicy; +import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.equalTo; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; public class CombinedDeletionPolicyTests extends ESTestCase { - public void testPassThrough() throws IOException { - SnapshotDeletionPolicy indexDeletionPolicy = mock(SnapshotDeletionPolicy.class); - CombinedDeletionPolicy combinedDeletionPolicy = new CombinedDeletionPolicy(indexDeletionPolicy, createTranslogDeletionPolicy(), - EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG); - List commitList = new ArrayList<>(); - long count = randomIntBetween(1, 3); - for (int i = 0; i < count; i++) { - commitList.add(mockIndexCommitWithTranslogGen(randomNonNegativeLong())); + public void testKeepCommitsAfterGlobalCheckpoint() throws Exception { + final AtomicLong globalCheckpoint = new AtomicLong(); + TranslogDeletionPolicy translogPolicy = createTranslogDeletionPolicy(); + CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy(OPEN_INDEX_AND_TRANSLOG, translogPolicy, globalCheckpoint::get); + + final LongArrayList maxSeqNoList = new LongArrayList(); + final LongArrayList translogGenList = new LongArrayList(); + final List commitList = new ArrayList<>(); + int totalCommits = between(2, 20); + long lastMaxSeqNo = 0; + long lastTranslogGen = 0; + final UUID translogUUID = UUID.randomUUID(); + for (int i = 0; i < totalCommits; i++) { + lastMaxSeqNo += between(1, 10000); + lastTranslogGen += between(1, 100); + commitList.add(mockIndexCommit(lastMaxSeqNo, translogUUID, lastTranslogGen)); + maxSeqNoList.add(lastMaxSeqNo); + translogGenList.add(lastTranslogGen); } - combinedDeletionPolicy.onInit(commitList); - verify(indexDeletionPolicy, times(1)).onInit(commitList); - combinedDeletionPolicy.onCommit(commitList); - verify(indexDeletionPolicy, times(1)).onCommit(commitList); + + int keptIndex = randomInt(commitList.size() - 1); + final long lower = maxSeqNoList.get(keptIndex); + final long upper = 
keptIndex == commitList.size() - 1 ? + Long.MAX_VALUE : Math.max(maxSeqNoList.get(keptIndex), maxSeqNoList.get(keptIndex + 1) - 1); + globalCheckpoint.set(randomLongBetween(lower, upper)); + indexPolicy.onCommit(commitList); + + for (int i = 0; i < commitList.size(); i++) { + if (i < keptIndex) { + verify(commitList.get(i), times(1)).delete(); + } else { + verify(commitList.get(i), never()).delete(); + } + } + assertThat(translogPolicy.getMinTranslogGenerationForRecovery(), equalTo(translogGenList.get(keptIndex))); + assertThat(translogPolicy.getTranslogGenerationOfLastCommit(), equalTo(lastTranslogGen)); } - public void testSettingMinTranslogGen() throws IOException { - SnapshotDeletionPolicy indexDeletionPolicy = mock(SnapshotDeletionPolicy.class); - final TranslogDeletionPolicy translogDeletionPolicy = mock(TranslogDeletionPolicy.class); - CombinedDeletionPolicy combinedDeletionPolicy = new CombinedDeletionPolicy(indexDeletionPolicy, translogDeletionPolicy, - EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG); - List commitList = new ArrayList<>(); - long count = randomIntBetween(10, 20); - long lastGen = 0; - for (int i = 0; i < count; i++) { - lastGen += randomIntBetween(10, 20000); - commitList.add(mockIndexCommitWithTranslogGen(lastGen)); - } - combinedDeletionPolicy.onInit(commitList); - verify(translogDeletionPolicy, times(1)).setMinTranslogGenerationForRecovery(lastGen); - commitList.clear(); - for (int i = 0; i < count; i++) { - lastGen += randomIntBetween(10, 20000); - commitList.add(mockIndexCommitWithTranslogGen(lastGen)); - } - combinedDeletionPolicy.onCommit(commitList); - verify(translogDeletionPolicy, times(1)).setMinTranslogGenerationForRecovery(lastGen); + public void testIgnoreSnapshottingCommits() throws Exception { + final AtomicLong globalCheckpoint = new AtomicLong(); + final UUID translogUUID = UUID.randomUUID(); + TranslogDeletionPolicy translogPolicy = createTranslogDeletionPolicy(); + CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy(OPEN_INDEX_AND_TRANSLOG, translogPolicy, globalCheckpoint::get); + + long firstMaxSeqNo = randomLongBetween(0, Long.MAX_VALUE - 1); + long secondMaxSeqNo = randomLongBetween(firstMaxSeqNo + 1, Long.MAX_VALUE); + + long lastTranslogGen = randomNonNegativeLong(); + final IndexCommit firstCommit = mockIndexCommit(firstMaxSeqNo, translogUUID, randomLongBetween(0, lastTranslogGen)); + final IndexCommit secondCommit = mockIndexCommit(secondMaxSeqNo, translogUUID, lastTranslogGen); + SnapshotDeletionPolicy snapshotDeletionPolicy = new SnapshotDeletionPolicy(indexPolicy); + + snapshotDeletionPolicy.onInit(Arrays.asList(firstCommit)); + snapshotDeletionPolicy.snapshot(); + assertThat(snapshotDeletionPolicy.getSnapshots(), contains(firstCommit)); + + // The snapshot policy prevents the first commit from being deleted, but the combined policy does not retain its translog.
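The test wraps the combined policy in Lucene's `SnapshotDeletionPolicy`, mirroring the composition `InternalEngine` now uses. Roughly, and reusing `openMode`, `translogDeletionPolicy`, and `seqNoService` from the diff above as assumed context, the interaction looks like this sketch:

```java
// Sketch only: how the two policies compose. SnapshotDeletionPolicy delegates onInit/onCommit
// to the wrapped policy but defers the actual deletion of any commit that is snapshotted,
// so CombinedDeletionPolicy can call delete() without breaking an in-flight snapshot.
IndexDeletionPolicy combined =
    new CombinedDeletionPolicy(openMode, translogDeletionPolicy, seqNoService::getGlobalCheckpoint);
SnapshotDeletionPolicy snapshotPolicy = new SnapshotDeletionPolicy(combined);

IndexCommit pinned = snapshotPolicy.snapshot(); // pin the latest commit, e.g. for a backup
try {
    // ... read the pinned commit's files while newer commits come and go ...
} finally {
    snapshotPolicy.release(pinned); // the commit becomes deletable again on the next onCommit
}
```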
+ globalCheckpoint.set(randomLongBetween(secondMaxSeqNo, Long.MAX_VALUE)); + snapshotDeletionPolicy.onCommit(Arrays.asList(firstCommit, secondCommit)); + verify(firstCommit, never()).delete(); + verify(secondCommit, never()).delete(); + assertThat(translogPolicy.getMinTranslogGenerationForRecovery(), equalTo(lastTranslogGen)); + assertThat(translogPolicy.getTranslogGenerationOfLastCommit(), equalTo(lastTranslogGen)); } - IndexCommit mockIndexCommitWithTranslogGen(long gen) throws IOException { - IndexCommit commit = mock(IndexCommit.class); - when(commit.getUserData()).thenReturn(Collections.singletonMap(Translog.TRANSLOG_GENERATION_KEY, Long.toString(gen))); + public void testLegacyIndex() throws Exception { + final AtomicLong globalCheckpoint = new AtomicLong(); + final UUID translogUUID = UUID.randomUUID(); + + TranslogDeletionPolicy translogPolicy = createTranslogDeletionPolicy(); + CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy(OPEN_INDEX_AND_TRANSLOG, translogPolicy, globalCheckpoint::get); + + long legacyTranslogGen = randomNonNegativeLong(); + IndexCommit legacyCommit = mockLegacyIndexCommit(translogUUID, legacyTranslogGen); + indexPolicy.onInit(singletonList(legacyCommit)); + verify(legacyCommit, never()).delete(); + assertThat(translogPolicy.getMinTranslogGenerationForRecovery(), equalTo(legacyTranslogGen)); + assertThat(translogPolicy.getTranslogGenerationOfLastCommit(), equalTo(legacyTranslogGen)); + + long safeTranslogGen = randomLongBetween(legacyTranslogGen, Long.MAX_VALUE); + long maxSeqNo = randomLongBetween(1, Long.MAX_VALUE); + final IndexCommit freshCommit = mockIndexCommit(maxSeqNo, translogUUID, safeTranslogGen); + + globalCheckpoint.set(randomLongBetween(0, maxSeqNo - 1)); + indexPolicy.onCommit(Arrays.asList(legacyCommit, freshCommit)); + verify(legacyCommit, times(0)).delete(); + verify(freshCommit, times(0)).delete(); + assertThat(translogPolicy.getMinTranslogGenerationForRecovery(), equalTo(legacyTranslogGen)); + assertThat(translogPolicy.getTranslogGenerationOfLastCommit(), equalTo(safeTranslogGen)); + + // Make the fresh commit safe. + globalCheckpoint.set(randomLongBetween(maxSeqNo, Long.MAX_VALUE)); + indexPolicy.onCommit(Arrays.asList(legacyCommit, freshCommit)); + verify(legacyCommit, times(1)).delete(); + verify(freshCommit, times(0)).delete(); + assertThat(translogPolicy.getMinTranslogGenerationForRecovery(), equalTo(safeTranslogGen)); + assertThat(translogPolicy.getTranslogGenerationOfLastCommit(), equalTo(safeTranslogGen)); + } + + public void testDeleteInvalidCommits() throws Exception { + final AtomicLong globalCheckpoint = new AtomicLong(randomNonNegativeLong()); + TranslogDeletionPolicy translogPolicy = createTranslogDeletionPolicy(); + CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy(OPEN_INDEX_CREATE_TRANSLOG, translogPolicy, globalCheckpoint::get); + + final int invalidCommits = between(1, 10); + final List commitList = new ArrayList<>(); + for (int i = 0; i < invalidCommits; i++) { + commitList.add(mockIndexCommit(randomNonNegativeLong(), UUID.randomUUID(), randomNonNegativeLong())); + } + + final UUID expectedTranslogUUID = UUID.randomUUID(); + long lastTranslogGen = 0; + final int validCommits = between(1, 10); + for (int i = 0; i < validCommits; i++) { + lastTranslogGen += between(1, 1000); + commitList.add(mockIndexCommit(randomNonNegativeLong(), expectedTranslogUUID, lastTranslogGen)); + } + + // We should never keep invalid commits regardless of the value of the global checkpoint. 
+ indexPolicy.onCommit(commitList); + for (int i = 0; i < invalidCommits; i++) { + verify(commitList.get(i), times(1)).delete(); + } + } + + IndexCommit mockIndexCommit(long maxSeqNo, UUID translogUUID, long translogGen) throws IOException { + final Map userData = new HashMap<>(); + userData.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(maxSeqNo)); + userData.put(Translog.TRANSLOG_UUID_KEY, translogUUID.toString()); + userData.put(Translog.TRANSLOG_GENERATION_KEY, Long.toString(translogGen)); + final IndexCommit commit = mock(IndexCommit.class); + when(commit.getUserData()).thenReturn(userData); + return commit; + } + + IndexCommit mockLegacyIndexCommit(UUID translogUUID, long translogGen) throws IOException { + final Map userData = new HashMap<>(); + userData.put(Translog.TRANSLOG_UUID_KEY, translogUUID.toString()); + userData.put(Translog.TRANSLOG_GENERATION_KEY, Long.toString(translogGen)); + final IndexCommit commit = mock(IndexCommit.class); + when(commit.getUserData()).thenReturn(userData); return commit; } } diff --git a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index 2d772daf7cc..999b4dca563 100644 --- a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -79,6 +79,7 @@ import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.logging.Loggers; +import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader; import org.elasticsearch.common.lucene.uid.Versions; import org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver; @@ -113,6 +114,7 @@ import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardUtils; import org.elasticsearch.index.store.DirectoryUtils; import org.elasticsearch.index.store.Store; +import org.elasticsearch.index.translog.SnapshotMatchers; import org.elasticsearch.index.translog.Translog; import org.elasticsearch.index.translog.TranslogConfig; import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; @@ -165,6 +167,7 @@ import static org.hamcrest.Matchers.everyItem; import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.hasKey; +import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.lessThanOrEqualTo; import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.notNullValue; @@ -945,17 +948,47 @@ public class InternalEngineTests extends EngineTestCase { } public void testCommitAdvancesMinTranslogForRecovery() throws IOException { + IOUtils.close(engine, store); + final Path translogPath = createTempDir(); + store = createStore(); + final AtomicBoolean inSync = new AtomicBoolean(randomBoolean()); + final BiFunction seqNoServiceSupplier = (config, seqNoStats) -> + new SequenceNumbersService( + config.getShardId(), + config.getAllocationId(), + config.getIndexSettings(), + seqNoStats.getMaxSeqNo(), + seqNoStats.getLocalCheckpoint(), + seqNoStats.getGlobalCheckpoint()) { + @Override + public long getGlobalCheckpoint() { + return inSync.get() ?
getLocalCheckpoint() : SequenceNumbers.UNASSIGNED_SEQ_NO; + } + }; + engine = new InternalEngine(config(defaultSettings, store, translogPath, newMergePolicy(), null, null), seqNoServiceSupplier); ParsedDocument doc = testParsedDocument("1", null, testDocumentWithTextField(), B_1, null); engine.index(indexForDoc(doc)); + engine.flush(); assertThat(engine.getTranslog().currentFileGeneration(), equalTo(2L)); - assertThat(engine.getTranslog().getDeletionPolicy().getMinTranslogGenerationForRecovery(), equalTo(2L)); + assertThat(engine.getTranslog().getDeletionPolicy().getMinTranslogGenerationForRecovery(), equalTo(inSync.get() ? 2L : 1L)); + assertThat(engine.getTranslog().getDeletionPolicy().getTranslogGenerationOfLastCommit(), equalTo(2L)); + engine.flush(); assertThat(engine.getTranslog().currentFileGeneration(), equalTo(2L)); - assertThat(engine.getTranslog().getDeletionPolicy().getMinTranslogGenerationForRecovery(), equalTo(2L)); + assertThat(engine.getTranslog().getDeletionPolicy().getMinTranslogGenerationForRecovery(), equalTo(inSync.get() ? 2L : 1L)); + assertThat(engine.getTranslog().getDeletionPolicy().getTranslogGenerationOfLastCommit(), equalTo(2L)); + engine.flush(true, true); assertThat(engine.getTranslog().currentFileGeneration(), equalTo(3L)); - assertThat(engine.getTranslog().getDeletionPolicy().getMinTranslogGenerationForRecovery(), equalTo(3L)); + assertThat(engine.getTranslog().getDeletionPolicy().getMinTranslogGenerationForRecovery(), equalTo(inSync.get() ? 3L : 1L)); + assertThat(engine.getTranslog().getDeletionPolicy().getTranslogGenerationOfLastCommit(), equalTo(3L)); + + inSync.set(true); + engine.flush(true, true); + assertThat(engine.getTranslog().currentFileGeneration(), equalTo(4L)); + assertThat(engine.getTranslog().getDeletionPolicy().getMinTranslogGenerationForRecovery(), equalTo(4L)); + assertThat(engine.getTranslog().getDeletionPolicy().getTranslogGenerationOfLastCommit(), equalTo(4L)); } public void testSyncedFlush() throws IOException { @@ -2359,10 +2392,26 @@ public class InternalEngineTests extends EngineTestCase { ); indexSettings.updateIndexMetaData(builder.build()); + final BiFunction seqNoServiceSupplier = (config, seqNoStats) -> + new SequenceNumbersService( + config.getShardId(), + config.getAllocationId(), + config.getIndexSettings(), + seqNoStats.getMaxSeqNo(), + seqNoStats.getLocalCheckpoint(), + seqNoStats.getGlobalCheckpoint()) { + @Override + public long getGlobalCheckpoint() { + return getLocalCheckpoint(); + } + }; + try (Store store = createStore()) { AtomicBoolean throwErrorOnCommit = new AtomicBoolean(); final Path translogPath = createTempDir(); - try (InternalEngine engine = new InternalEngine(config(indexSettings, store, translogPath, newMergePolicy(), null, null)) { + try (InternalEngine engine = + new InternalEngine(config(indexSettings, store, translogPath, newMergePolicy(), null, null), seqNoServiceSupplier) { + @Override protected void commitIndexWriter(IndexWriter writer, Translog translog, String syncId) throws IOException { super.commitIndexWriter(writer, translog, syncId); @@ -2377,7 +2426,8 @@ public class InternalEngineTests extends EngineTestCase { FlushFailedEngineException e = expectThrows(FlushFailedEngineException.class, engine::flush); assertThat(e.getCause().getMessage(), equalTo("power's out")); } - try (InternalEngine engine = new InternalEngine(config(indexSettings, store, translogPath, newMergePolicy(), null, null))) { + try (InternalEngine engine = + new InternalEngine(config(indexSettings, store, translogPath, 
newMergePolicy(), null, null), seqNoServiceSupplier)) { engine.recoverFromTranslog(); assertVisibleCount(engine, 1); final long committedGen = Long.valueOf( @@ -2608,13 +2658,16 @@ public class InternalEngineTests extends EngineTestCase { EngineConfig config = engine.config(); EngineConfig newConfig = new EngineConfig( - randomBoolean() ? EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG : EngineConfig.OpenMode.OPEN_INDEX_CREATE_TRANSLOG, + randomBoolean() ? EngineConfig.OpenMode.CREATE_INDEX_AND_TRANSLOG : EngineConfig.OpenMode.OPEN_INDEX_CREATE_TRANSLOG, shardId, allocationId.getId(), threadPool, config.getIndexSettings(), null, store, newMergePolicy(), config.getAnalyzer(), config.getSimilarity(), new CodecService(null, logger), config.getEventListener(), IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), true, config.getTranslogConfig(), TimeValue.timeValueMinutes(5), config.getExternalRefreshListener(), config.getInternalRefreshListener(), null, config.getTranslogRecoveryRunner(), new NoneCircuitBreakerService()); + if (newConfig.getOpenMode() == EngineConfig.OpenMode.CREATE_INDEX_AND_TRANSLOG) { + Lucene.cleanLuceneIndex(store.directory()); + } engine = new InternalEngine(newConfig); if (newConfig.getOpenMode() == EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG) { engine.recoverFromTranslog(); @@ -4116,4 +4169,62 @@ public class InternalEngineTests extends EngineTestCase { } } + public void testKeepTranslogAfterGlobalCheckpoint() throws Exception { + IOUtils.close(engine, store); + final AtomicLong globalCheckpoint = new AtomicLong(0); + final BiFunction seqNoServiceSupplier = (config, seqNoStats) -> + new SequenceNumbersService( + config.getShardId(), + config.getAllocationId(), + config.getIndexSettings(), + seqNoStats.getMaxSeqNo(), + seqNoStats.getLocalCheckpoint(), + seqNoStats.getGlobalCheckpoint()) { + @Override + public long getGlobalCheckpoint() { + return globalCheckpoint.get(); + } + }; + + final IndexSettings indexSettings = new IndexSettings(defaultSettings.getIndexMetaData(), defaultSettings.getNodeSettings(), + defaultSettings.getScopedSettings()); + IndexMetaData.Builder builder = IndexMetaData.builder(indexSettings.getIndexMetaData()) + .settings(Settings.builder().put(indexSettings.getSettings()) + .put(IndexSettings.INDEX_TRANSLOG_RETENTION_AGE_SETTING.getKey(), randomFrom("-1", "100micros", "30m")) + .put(IndexSettings.INDEX_TRANSLOG_RETENTION_SIZE_SETTING.getKey(), randomFrom("-1", "512b", "1gb"))); + indexSettings.updateIndexMetaData(builder.build()); + + store = createStore(); + try (InternalEngine engine + = createEngine(indexSettings, store, createTempDir(), NoMergePolicy.INSTANCE, null, seqNoServiceSupplier)) { + int numDocs = scaledRandomIntBetween(10, 100); + for (int docId = 0; docId < numDocs; docId++) { + ParseContext.Document document = testDocumentWithTextField(); + document.add(new Field(SourceFieldMapper.NAME, BytesReference.toBytes(B_1), SourceFieldMapper.Defaults.FIELD_TYPE)); + engine.index(indexForDoc(testParsedDocument(Integer.toString(docId), null, document, B_1, null))); + if (frequently()) { + globalCheckpoint.set(randomIntBetween( + Math.toIntExact(engine.seqNoService().getGlobalCheckpoint()), + Math.toIntExact(engine.seqNoService().getLocalCheckpoint()))); + } + if (frequently()) { + engine.flush(randomBoolean(), true); + final List commits = DirectoryReader.listCommits(store.directory()); + // Keep only one safe commit as the oldest commit. 
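The assertions that follow verify two invariants after each flush: the oldest remaining commit is the only safe commit (its max_seq_no is at most the global checkpoint), and the translog still holds every operation above that commit's local checkpoint. A small worked example of the second invariant, with made-up values:

```java
// Hypothetical state after a flush (values are illustrative, not from the test):
long globalCheckpoint = 9;           // operations up to seqno 9 are safely replicated
long safeCommitMaxSeqNo = 8;         // safe commit: max_seq_no 8 <= global checkpoint 9
long safeCommitLocalCheckpoint = 7;  // LOCAL_CHECKPOINT_KEY recorded in that commit
long maxIndexedSeqNo = 12;           // highest seqno indexed so far

// Recovering from the safe commit replays whatever that commit may be missing, so the
// translog must retain the range (safeCommitLocalCheckpoint, maxIndexedSeqNo]:
for (long seqNo = safeCommitLocalCheckpoint + 1; seqNo <= maxIndexedSeqNo; seqNo++) {
    System.out.println("translog must retain seqno " + seqNo); // seqnos 8..12
}
```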
+ final IndexCommit safeCommit = commits.get(0); + assertThat(Long.parseLong(safeCommit.getUserData().get(SequenceNumbers.MAX_SEQ_NO)), + lessThanOrEqualTo(globalCheckpoint.get())); + for (int i = 1; i < commits.size(); i++) { + assertThat(Long.parseLong(commits.get(i).getUserData().get(SequenceNumbers.MAX_SEQ_NO)), + greaterThan(globalCheckpoint.get())); + } + // Make sure we keep all translog operations after the local checkpoint of the safe commit. + long localCheckpointFromSafeCommit = Long.parseLong(safeCommit.getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)); + try (Translog.Snapshot snapshot = engine.getTranslog().newSnapshot()) { + assertThat(snapshot, SnapshotMatchers.containsSeqNoRange(localCheckpointFromSafeCommit + 1, docId)); + } + } + } + } } diff --git a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index e4b1eb083b5..1410d4978b1 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -1051,8 +1051,9 @@ public class IndexShardTests extends IndexShardTestCase { closeShards(indexShard); } - public void testAcquireIndexCommit() throws IOException { - final IndexShard shard = newStartedShard(); + public void testAcquireIndexCommit() throws Exception { + boolean isPrimary = randomBoolean(); + final IndexShard shard = newStartedShard(isPrimary); int numDocs = randomInt(20); for (int i = 0; i < numDocs; i++) { indexDoc(shard, "type", "id_" + i); @@ -1069,6 +1070,14 @@ public class IndexShardTests extends IndexShardTestCase { assertThat(reader.numDocs(), equalTo(flushFirst ? numDocs : 0)); } commit.close(); + // Make the global checkpoint in sync with the local checkpoint. + if (isPrimary) { + final String allocationId = shard.shardRouting.allocationId().getId(); + shard.updateLocalCheckpointForShard(allocationId, numDocs + moreDocs - 1); + shard.updateGlobalCheckpointForShard(allocationId, shard.getLocalCheckpoint()); + } else { + shard.updateGlobalCheckpointOnReplica(numDocs + moreDocs - 1, "test"); + } flushShard(shard, true); // check it's clean up diff --git a/core/src/test/java/org/elasticsearch/index/translog/SnapshotMatchers.java b/core/src/test/java/org/elasticsearch/index/translog/SnapshotMatchers.java index 4ca6057bd6b..21f7dd9481c 100644 --- a/core/src/test/java/org/elasticsearch/index/translog/SnapshotMatchers.java +++ b/core/src/test/java/org/elasticsearch/index/translog/SnapshotMatchers.java @@ -19,6 +19,8 @@ package org.elasticsearch.index.translog; +import com.carrotsearch.hppc.LongHashSet; +import com.carrotsearch.hppc.LongSet; import org.elasticsearch.ElasticsearchException; import org.hamcrest.Description; import org.hamcrest.Matcher; @@ -61,6 +63,13 @@ public final class SnapshotMatchers { return new ContainingInAnyOrderMatcher(expectedOperations); } + /** + * Consumes a snapshot and makes sure that its operations include every seqno between minSeqNo (inclusive) and maxSeqNo (inclusive).
+ */ + public static Matcher containsSeqNoRange(long minSeqNo, long maxSeqNo) { + return new ContainingSeqNoRangeMatcher(minSeqNo, maxSeqNo); + } + public static class SizeMatcher extends TypeSafeMatcher { private final int size; @@ -190,4 +199,45 @@ public final class SnapshotMatchers { .appendText(" in any order."); } } + + static class ContainingSeqNoRangeMatcher extends TypeSafeMatcher { + private final long minSeqNo; + private final long maxSeqNo; + private final List notFoundSeqNo = new ArrayList<>(); + + ContainingSeqNoRangeMatcher(long minSeqNo, long maxSeqNo) { + this.minSeqNo = minSeqNo; + this.maxSeqNo = maxSeqNo; + } + + @Override + protected boolean matchesSafely(Translog.Snapshot snapshot) { + try { + final LongSet seqNoList = new LongHashSet(); + Translog.Operation op; + while ((op = snapshot.next()) != null) { + seqNoList.add(op.seqNo()); + } + for (long i = minSeqNo; i <= maxSeqNo; i++) { + if (seqNoList.contains(i) == false) { + notFoundSeqNo.add(i); + } + } + return notFoundSeqNo.isEmpty(); + } catch (IOException ex) { + throw new ElasticsearchException("failed to read snapshot content", ex); + } + } + + @Override + protected void describeMismatchSafely(Translog.Snapshot snapshot, Description mismatchDescription) { + mismatchDescription + .appendText("not found seqno ").appendValueList("[", ", ", "]", notFoundSeqNo); + } + + @Override + public void describeTo(Description description) { + description.appendText("snapshot contains all seqno from [" + minSeqNo + " to " + maxSeqNo + "]"); + } + } } diff --git a/core/src/test/java/org/elasticsearch/index/translog/TranslogDeletionPolicyTests.java b/core/src/test/java/org/elasticsearch/index/translog/TranslogDeletionPolicyTests.java index f62d292730e..39fc182623f 100644 --- a/core/src/test/java/org/elasticsearch/index/translog/TranslogDeletionPolicyTests.java +++ b/core/src/test/java/org/elasticsearch/index/translog/TranslogDeletionPolicyTests.java @@ -53,6 +53,7 @@ public class TranslogDeletionPolicyTests extends ESTestCase { assertMinGenRequired(deletionPolicy, readersAndWriter, 1L); final int committedReader = randomIntBetween(0, allGens.size() - 1); final long committedGen = allGens.get(committedReader).generation; + deletionPolicy.setTranslogGenerationOfLastCommit(randomLongBetween(committedGen, Long.MAX_VALUE)); deletionPolicy.setMinTranslogGenerationForRecovery(committedGen); assertMinGenRequired(deletionPolicy, readersAndWriter, committedGen); } finally { @@ -109,6 +110,7 @@ public class TranslogDeletionPolicyTests extends ESTestCase { allGens.add(readersAndWriter.v2()); try { TranslogDeletionPolicy deletionPolicy = new MockDeletionPolicy(now, Long.MAX_VALUE, Long.MAX_VALUE); + deletionPolicy.setTranslogGenerationOfLastCommit(Long.MAX_VALUE); deletionPolicy.setMinTranslogGenerationForRecovery(Long.MAX_VALUE); int selectedReader = randomIntBetween(0, allGens.size() - 1); final long selectedGenerationByAge = allGens.get(selectedReader).generation; @@ -122,6 +124,7 @@ public class TranslogDeletionPolicyTests extends ESTestCase { // make a new policy as committed gen can't go backwards (for now) deletionPolicy = new MockDeletionPolicy(now, size, maxAge); long committedGen = randomFrom(allGens).generation; + deletionPolicy.setTranslogGenerationOfLastCommit(randomLongBetween(committedGen, Long.MAX_VALUE)); deletionPolicy.setMinTranslogGenerationForRecovery(committedGen); assertMinGenRequired(deletionPolicy, readersAndWriter, Math.min(committedGen, Math.max(selectedGenerationByAge, selectedGenerationBySize))); diff --git 
a/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java b/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java index 593e1059215..dc050949fe3 100644 --- a/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java +++ b/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java @@ -153,17 +153,20 @@ public class TranslogTests extends ESTestCase { } private void markCurrentGenAsCommitted(Translog translog) throws IOException { - commit(translog, translog.currentFileGeneration()); + long genToCommit = translog.currentFileGeneration(); + long genToRetain = randomLongBetween(translog.getDeletionPolicy().getMinTranslogGenerationForRecovery(), genToCommit); + commit(translog, genToRetain, genToCommit); } private void rollAndCommit(Translog translog) throws IOException { translog.rollGeneration(); - commit(translog, translog.currentFileGeneration()); + markCurrentGenAsCommitted(translog); } - private void commit(Translog translog, long genToCommit) throws IOException { + private void commit(Translog translog, long genToRetain, long genToCommit) throws IOException { final TranslogDeletionPolicy deletionPolicy = translog.getDeletionPolicy(); - deletionPolicy.setMinTranslogGenerationForRecovery(genToCommit); + deletionPolicy.setTranslogGenerationOfLastCommit(genToCommit); + deletionPolicy.setMinTranslogGenerationForRecovery(genToRetain); long minGenRequired = deletionPolicy.minTranslogGenRequired(translog.getReaders(), translog.getCurrent()); translog.trimUnreferencedReaders(); assertThat(minGenRequired, equalTo(translog.getMinFileGeneration())); @@ -440,6 +443,31 @@ public class TranslogTests extends ESTestCase { } } + public void testUncommittedOperations() throws Exception { + final TranslogDeletionPolicy deletionPolicy = translog.getDeletionPolicy(); + deletionPolicy.setRetentionAgeInMillis(randomLong()); + deletionPolicy.setRetentionSizeInBytes(randomLong()); + + final int operations = scaledRandomIntBetween(10, 100); + int uncommittedOps = 0; + int operationsInLastGen = 0; + for (int i = 0; i < operations; i++) { + translog.add(new Translog.Index("test", Integer.toString(i), i, new byte[]{1})); + uncommittedOps++; + operationsInLastGen++; + if (rarely()) { + translog.rollGeneration(); + operationsInLastGen = 0; + } + assertThat(translog.uncommittedOperations(), equalTo(uncommittedOps)); + if (frequently()) { + markCurrentGenAsCommitted(translog); + assertThat(translog.uncommittedOperations(), equalTo(operationsInLastGen)); + uncommittedOps = operationsInLastGen; + } + } + } + public void testTotalTests() { final TranslogStats total = new TranslogStats(); final int n = randomIntBetween(0, 16); @@ -824,6 +852,7 @@ public class TranslogTests extends ESTestCase { translog.rollGeneration(); // expose the new checkpoint (simulating a commit), before we trim the translog lastCommittedLocalCheckpoint.set(localCheckpoint); + deletionPolicy.setTranslogGenerationOfLastCommit(translog.currentFileGeneration()); deletionPolicy.setMinTranslogGenerationForRecovery( translog.getMinGenerationForSeqNo(localCheckpoint + 1).translogFileGeneration); translog.trimUnreferencedReaders(); @@ -1822,6 +1851,7 @@ public class TranslogTests extends ESTestCase { translog.close(); TranslogConfig config = translog.getConfig(); final TranslogDeletionPolicy deletionPolicy = new TranslogDeletionPolicy(-1, -1); + deletionPolicy.setTranslogGenerationOfLastCommit(randomLongBetween(comittedGeneration, Long.MAX_VALUE)); 
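The call just added, together with the `setMinTranslogGenerationForRecovery` call that follows it, respects the ordering constraint the updated policy enforces: the last-commit generation is raised first so the recovery generation may catch up to it. A minimal sketch of the invariant, with illustrative values and the `TranslogDeletionPolicy(-1, -1)` constructor seen elsewhere in this diff:

```java
// Both generations start at 1 and may only move forward, with
// minTranslogGenerationForRecovery <= translogGenerationOfLastCommit at all times.
TranslogDeletionPolicy policy = new TranslogDeletionPolicy(-1, -1); // no size/age retention

policy.setTranslogGenerationOfLastCommit(5);   // raise the upper bound first
policy.setMinTranslogGenerationForRecovery(3); // ok: 1 <= 3 <= 5

// policy.setMinTranslogGenerationForRecovery(7); // would throw: 7 > lastCommit gen (5)
// policy.setTranslogGenerationOfLastCommit(2);   // would throw: goes backwards from 5
```

With this split, `uncommittedOperations()` counts operations from the generation of the last commit, while translog trimming is driven by the (possibly older) generation required to recover the safe commit.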
deletionPolicy.setMinTranslogGenerationForRecovery(comittedGeneration); translog = new Translog(config, translog.getTranslogUUID(), deletionPolicy, () -> SequenceNumbers.UNASSIGNED_SEQ_NO); assertThat(translog.getMinFileGeneration(), equalTo(1L)); @@ -1867,6 +1897,7 @@ public class TranslogTests extends ESTestCase { translog.rollGeneration(); } } + deletionPolicy.setTranslogGenerationOfLastCommit(randomLongBetween(comittedGeneration, translog.currentFileGeneration())); deletionPolicy.setMinTranslogGenerationForRecovery(comittedGeneration); fail.failRandomly(); try { @@ -1876,6 +1907,7 @@ public class TranslogTests extends ESTestCase { } } final TranslogDeletionPolicy deletionPolicy = new TranslogDeletionPolicy(-1, -1); + deletionPolicy.setTranslogGenerationOfLastCommit(randomLongBetween(comittedGeneration, Long.MAX_VALUE)); deletionPolicy.setMinTranslogGenerationForRecovery(comittedGeneration); try (Translog translog = new Translog(config, translogUUID, deletionPolicy, () -> SequenceNumbers.UNASSIGNED_SEQ_NO)) { // we don't know when things broke exactly @@ -2413,8 +2445,9 @@ public class TranslogTests extends ESTestCase { for (int i = 0; i <= rolls; i++) { assertFileIsPresent(translog, generation + i); } - commit(translog, generation + rolls); - assertThat(translog.currentFileGeneration(), equalTo(generation + rolls )); + long minGenForRecovery = randomLongBetween(generation, generation + rolls); + commit(translog, minGenForRecovery, generation + rolls); + assertThat(translog.currentFileGeneration(), equalTo(generation + rolls)); assertThat(translog.uncommittedOperations(), equalTo(0)); if (longRetention) { for (int i = 0; i <= rolls; i++) { @@ -2423,17 +2456,19 @@ public class TranslogTests extends ESTestCase { deletionPolicy.setRetentionAgeInMillis(randomBoolean() ? 
100 : -1); assertBusy(() -> { translog.trimUnreferencedReaders(); - for (int i = 0; i < rolls; i++) { - assertFileDeleted(translog, generation + i); + for (long i = 0; i < minGenForRecovery; i++) { + assertFileDeleted(translog, i); } }); } else { // immediate cleanup - for (int i = 0; i < rolls; i++) { - assertFileDeleted(translog, generation + i); + for (long i = 0; i < minGenForRecovery; i++) { + assertFileDeleted(translog, i); } } - assertFileIsPresent(translog, generation + rolls); + for (long i = minGenForRecovery; i < generation + rolls; i++) { + assertFileIsPresent(translog, i); + } } public void testMinSeqNoBasedAPI() throws IOException { @@ -2516,10 +2551,8 @@ public class TranslogTests extends ESTestCase { translog.rollGeneration(); } } - - final long generation = - randomIntBetween(1, Math.toIntExact(translog.currentFileGeneration())); - commit(translog, generation); + long lastGen = randomLongBetween(1, translog.currentFileGeneration()); + commit(translog, randomLongBetween(1, lastGen), lastGen); } public void testAcquiredLockIsPassedToDeletionPolicy() throws IOException { @@ -2531,7 +2564,9 @@ translog.rollGeneration(); } if (rarely()) { - commit(translog, randomLongBetween(deletionPolicy.getMinTranslogGenerationForRecovery(), translog.currentFileGeneration())); + long lastGen = randomLongBetween(deletionPolicy.getTranslogGenerationOfLastCommit(), translog.currentFileGeneration()); + long minGen = randomLongBetween(deletionPolicy.getMinTranslogGenerationForRecovery(), lastGen); + commit(translog, minGen, lastGen); } if (frequently()) { long minGen; diff --git a/core/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java b/core/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java index 55b7e22eb8a..689627d6642 100644 --- a/core/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java +++ b/core/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java @@ -22,6 +22,7 @@ package org.elasticsearch.indices.recovery; import org.apache.lucene.index.IndexWriter; import org.apache.lucene.index.IndexWriterConfig; import org.apache.lucene.index.NoMergePolicy; +import org.elasticsearch.action.admin.indices.flush.FlushRequest; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.common.UUIDs; @@ -98,20 +99,41 @@ public class RecoveryTests extends ESIndexLevelReplicationTestCase { } public void testRecoveryWithOutOfOrderDelete() throws Exception { + /* + * The flow of this test: + * - delete #1 + * - roll generation (to create gen 2) + * - index #0 + * - index #3 + * - flush (commit point has max_seqno 3, and local checkpoint 1 -> points at gen 2, previous commit point is maintained) + * - index #2 + * - index #5 + * - If we then flush and translog retention is disabled, delete #1 will be removed while index #0 is still retained and replayed.
+ */ try (ReplicationGroup shards = createGroup(1)) { shards.startAll(); // create out of order delete and index op on replica final IndexShard orgReplica = shards.getReplicas().get(0); + final String indexName = orgReplica.shardId().getIndexName(); + + // delete #1 orgReplica.applyDeleteOperationOnReplica(1, 2, "type", "id", VersionType.EXTERNAL, u -> {}); orgReplica.getTranslog().rollGeneration(); // isolate the delete in its own generation + // index #0 orgReplica.applyIndexOperationOnReplica(0, 1, VersionType.EXTERNAL, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false, - SourceToParse.source(orgReplica.shardId().getIndexName(), "type", "id", new BytesArray("{}"), XContentType.JSON), - u -> {}); - - // index a second item into the second generation, skipping seq# 2. Local checkpoint is now 1, which will make this generation - // stick around + SourceToParse.source(indexName, "type", "id", new BytesArray("{}"), XContentType.JSON), u -> {}); + // index #3 orgReplica.applyIndexOperationOnReplica(3, 1, VersionType.EXTERNAL, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false, - SourceToParse.source(orgReplica.shardId().getIndexName(), "type", "id2", new BytesArray("{}"), XContentType.JSON), u -> {}); + SourceToParse.source(indexName, "type", "id-3", new BytesArray("{}"), XContentType.JSON), u -> {}); + // Flushing a new commit with local checkpoint=1 allows translog gen #1 to be deleted. + orgReplica.flush(new FlushRequest().force(true).waitIfOngoing(true)); + // index #2 + orgReplica.applyIndexOperationOnReplica(2, 1, VersionType.EXTERNAL, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false, + SourceToParse.source(indexName, "type", "id-2", new BytesArray("{}"), XContentType.JSON), u -> {}); + orgReplica.updateGlobalCheckpointOnReplica(3L, "test"); + // index #5 -> force NoOp #4. + orgReplica.applyIndexOperationOnReplica(5, 1, VersionType.EXTERNAL, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false, + SourceToParse.source(indexName, "type", "id-5", new BytesArray("{}"), XContentType.JSON), u -> {}); final int translogOps; if (randomBoolean()) { @@ -120,18 +142,17 @@ IndexMetaData.Builder builder = IndexMetaData.builder(orgReplica.indexSettings().getIndexMetaData()); builder.settings(Settings.builder().put(orgReplica.indexSettings().getSettings()) .put(IndexSettings.INDEX_TRANSLOG_RETENTION_AGE_SETTING.getKey(), "-1") - .put(IndexSettings.INDEX_TRANSLOG_RETENTION_SIZE_SETTING.getKey(), "-1") - ); + .put(IndexSettings.INDEX_TRANSLOG_RETENTION_SIZE_SETTING.getKey(), "-1")); orgReplica.indexSettings().updateIndexMetaData(builder.build()); orgReplica.onSettingsChanged(); - translogOps = 3; // 2 ops + seqno gaps + translogOps = 5; // 4 ops + seqno gaps (delete #1 is removed but index #0 will be replayed).
} else { logger.info("--> flushing shard (translog will be retained)"); - translogOps = 4; // 3 ops + seqno gaps + translogOps = 6; // 5 ops + seqno gaps } flushShard(orgReplica); } else { - translogOps = 4; // 3 ops + seqno gaps + translogOps = 6; // 5 ops + seqno gaps } final IndexShard orgPrimary = shards.getPrimary(); @@ -139,7 +160,7 @@ public class RecoveryTests extends ESIndexLevelReplicationTestCase { IndexShard newReplica = shards.addReplicaWithExistingPath(orgPrimary.shardPath(), orgPrimary.routingEntry().currentNodeId()); shards.recoverReplica(newReplica); - shards.assertAllEqual(1); + shards.assertAllEqual(3); assertThat(newReplica.getTranslog().totalOperations(), equalTo(translogOps)); } diff --git a/test/framework/src/main/java/org/elasticsearch/test/ESTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/ESTestCase.java index 01ad6763a8c..84e750f6e28 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/ESTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/ESTestCase.java @@ -461,6 +461,13 @@ public abstract class ESTestCase extends LuceneTestCase { return RandomNumbers.randomIntBetween(random(), min, max); } + /** + * A random long number between min (inclusive) and max (inclusive). + */ + public static long randomLongBetween(long min, long max) { + return RandomNumbers.randomLongBetween(random(), min, max); + } + /** * Returns a "scaled" number of iterations for loops which can have a variable * iteration count. This method is effectively