diff --git a/core/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java b/core/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java index d603a40b5d3..6a61843c263 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java +++ b/core/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java @@ -19,14 +19,17 @@ package org.elasticsearch.index.engine; +import com.carrotsearch.hppc.ObjectIntHashMap; import org.apache.lucene.index.IndexCommit; import org.apache.lucene.index.IndexDeletionPolicy; +import org.apache.lucene.store.Directory; import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.translog.Translog; import org.elasticsearch.index.translog.TranslogDeletionPolicy; import java.io.IOException; import java.nio.file.Path; +import java.util.Collection; import java.util.List; import java.util.Map; import java.util.function.LongSupplier; @@ -42,12 +45,16 @@ public final class CombinedDeletionPolicy extends IndexDeletionPolicy { private final TranslogDeletionPolicy translogDeletionPolicy; private final EngineConfig.OpenMode openMode; private final LongSupplier globalCheckpointSupplier; + private final ObjectIntHashMap snapshottedCommits; // Number of snapshots held against each commit point. + private IndexCommit safeCommit; // the most recent safe commit point - its max_seqno at most the persisted global checkpoint. + private IndexCommit lastCommit; // the most recent commit point CombinedDeletionPolicy(EngineConfig.OpenMode openMode, TranslogDeletionPolicy translogDeletionPolicy, LongSupplier globalCheckpointSupplier) { this.openMode = openMode; this.translogDeletionPolicy = translogDeletionPolicy; this.globalCheckpointSupplier = globalCheckpointSupplier; + this.snapshottedCommits = new ObjectIntHashMap<>(); } @Override @@ -70,18 +77,22 @@ public final class CombinedDeletionPolicy extends IndexDeletionPolicy { } @Override - public void onCommit(List commits) throws IOException { + public synchronized void onCommit(List commits) throws IOException { final int keptPosition = indexOfKeptCommits(commits, globalCheckpointSupplier.getAsLong()); + lastCommit = commits.get(commits.size() - 1); + safeCommit = commits.get(keptPosition); for (int i = 0; i < keptPosition; i++) { - commits.get(i).delete(); + if (snapshottedCommits.containsKey(commits.get(i)) == false) { + commits.get(i).delete(); + } } - updateTranslogDeletionPolicy(commits.get(keptPosition), commits.get(commits.size() - 1)); + updateTranslogDeletionPolicy(); } - private void updateTranslogDeletionPolicy(final IndexCommit minRequiredCommit, final IndexCommit lastCommit) throws IOException { - assert minRequiredCommit.isDeleted() == false : "The minimum required commit must not be deleted"; - final long minRequiredGen = Long.parseLong(minRequiredCommit.getUserData().get(Translog.TRANSLOG_GENERATION_KEY)); - + private void updateTranslogDeletionPolicy() throws IOException { + assert Thread.holdsLock(this); + assert safeCommit.isDeleted() == false : "The safe commit must not be deleted"; + final long minRequiredGen = Long.parseLong(safeCommit.getUserData().get(Translog.TRANSLOG_GENERATION_KEY)); assert lastCommit.isDeleted() == false : "The last commit must not be deleted"; final long lastGen = Long.parseLong(lastCommit.getUserData().get(Translog.TRANSLOG_GENERATION_KEY)); @@ -90,6 +101,34 @@ public final class CombinedDeletionPolicy extends IndexDeletionPolicy { translogDeletionPolicy.setMinTranslogGenerationForRecovery(minRequiredGen); } + /** + * Captures the most recent commit point {@link #lastCommit} or the most recent safe commit point {@link #safeCommit}. + * Index files of the capturing commit point won't be released until the commit reference is closed. + * + * @param acquiringSafeCommit captures the most recent safe commit point if true; otherwise captures the most recent commit point. + */ + synchronized IndexCommit acquireIndexCommit(boolean acquiringSafeCommit) { + assert safeCommit != null : "Safe commit is not initialized yet"; + assert lastCommit != null : "Last commit is not initialized yet"; + final IndexCommit snapshotting = acquiringSafeCommit ? safeCommit : lastCommit; + snapshottedCommits.addTo(snapshotting, 1); // increase refCount + return new SnapshotIndexCommit(snapshotting); + } + + /** + * Releases an index commit that acquired by {@link #acquireIndexCommit(boolean)}. + */ + synchronized void releaseCommit(final IndexCommit snapshotCommit) { + final IndexCommit releasingCommit = ((SnapshotIndexCommit) snapshotCommit).delegate; + assert snapshottedCommits.containsKey(releasingCommit) : "Release non-snapshotted commit;" + + "snapshotted commits [" + snapshottedCommits + "], releasing commit [" + releasingCommit + "]"; + final int refCount = snapshottedCommits.addTo(releasingCommit, -1); // release refCount + assert refCount >= 0 : "Number of snapshots can not be negative [" + refCount + "]"; + if (refCount == 0) { + snapshottedCommits.remove(releasingCommit); + } + } + /** * Find a safe commit point from a list of existing commits based on the supplied global checkpoint. * The max sequence number of a safe commit point should be at most the global checkpoint. @@ -139,4 +178,60 @@ public final class CombinedDeletionPolicy extends IndexDeletionPolicy { */ return 0; } + + /** + * A wrapper of an index commit that prevents it from being deleted. + */ + private static class SnapshotIndexCommit extends IndexCommit { + private final IndexCommit delegate; + + SnapshotIndexCommit(IndexCommit delegate) { + this.delegate = delegate; + } + + @Override + public String getSegmentsFileName() { + return delegate.getSegmentsFileName(); + } + + @Override + public Collection getFileNames() throws IOException { + return delegate.getFileNames(); + } + + @Override + public Directory getDirectory() { + return delegate.getDirectory(); + } + + @Override + public void delete() { + throw new UnsupportedOperationException("A snapshot commit does not support deletion"); + } + + @Override + public boolean isDeleted() { + return delegate.isDeleted(); + } + + @Override + public int getSegmentCount() { + return delegate.getSegmentCount(); + } + + @Override + public long getGeneration() { + return delegate.getGeneration(); + } + + @Override + public Map getUserData() throws IOException { + return delegate.getUserData(); + } + + @Override + public String toString() { + return "SnapshotIndexCommit{" + delegate + "}"; + } + } } diff --git a/core/src/main/java/org/elasticsearch/index/engine/Engine.java b/core/src/main/java/org/elasticsearch/index/engine/Engine.java index 6d37502bd60..5de7062ab18 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -32,7 +32,6 @@ import org.apache.lucene.index.LeafReaderContext; import org.apache.lucene.index.SegmentCommitInfo; import org.apache.lucene.index.SegmentInfos; import org.apache.lucene.index.SegmentReader; -import org.apache.lucene.index.SnapshotDeletionPolicy; import org.apache.lucene.index.Term; import org.apache.lucene.search.IndexSearcher; import org.apache.lucene.search.ReferenceManager; @@ -92,7 +91,6 @@ import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.BiFunction; -import java.util.stream.Collectors; public abstract class Engine implements Closeable { @@ -568,7 +566,7 @@ public abstract class Engine implements Closeable { * @return the sequence number service */ public abstract LocalCheckpointTracker getLocalCheckpointTracker(); - + /** * Global stats on segments. */ @@ -859,9 +857,10 @@ public abstract class Engine implements Closeable { * Snapshots the index and returns a handle to it. If needed will try and "commit" the * lucene index to make sure we have a "fresh" copy of the files to snapshot. * + * @param safeCommit indicates whether the engine should acquire the most recent safe commit, or the most recent commit. * @param flushFirst indicates whether the engine should flush before returning the snapshot */ - public abstract IndexCommitRef acquireIndexCommit(boolean flushFirst) throws EngineException; + public abstract IndexCommitRef acquireIndexCommit(boolean safeCommit, boolean flushFirst) throws EngineException; /** * fail engine due to some error. the engine will also be closed. @@ -1437,9 +1436,9 @@ public abstract class Engine implements Closeable { private final CheckedRunnable onClose; private final IndexCommit indexCommit; - IndexCommitRef(SnapshotDeletionPolicy deletionPolicy) throws IOException { - indexCommit = deletionPolicy.snapshot(); - onClose = () -> deletionPolicy.release(indexCommit); + IndexCommitRef(IndexCommit indexCommit, CheckedRunnable onClose) { + this.indexCommit = indexCommit; + this.onClose = onClose; } @Override diff --git a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index a0d1fa92a2e..77b82752770 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -125,7 +125,7 @@ public class InternalEngine extends Engine { private final String uidField; - private final SnapshotDeletionPolicy snapshotDeletionPolicy; + private final CombinedDeletionPolicy combinedDeletionPolicy; // How many callers are currently requesting index throttling. Currently there are only two situations where we do this: when merges // are falling behind and when writing indexing buffer to disk is too slow. When this is 0, there is no throttling, else we throttling @@ -184,9 +184,8 @@ public class InternalEngine extends Engine { assert openMode != EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG || startingCommit != null : "Starting commit should be non-null; mode [" + openMode + "]; startingCommit [" + startingCommit + "]"; this.localCheckpointTracker = createLocalCheckpointTracker(localCheckpointTrackerSupplier, startingCommit); - this.snapshotDeletionPolicy = new SnapshotDeletionPolicy( - new CombinedDeletionPolicy(openMode, translogDeletionPolicy, translog::getLastSyncedGlobalCheckpoint) - ); + this.combinedDeletionPolicy = new CombinedDeletionPolicy(openMode, translogDeletionPolicy, + translog::getLastSyncedGlobalCheckpoint); writer = createWriter(openMode == EngineConfig.OpenMode.CREATE_INDEX_AND_TRANSLOG, startingCommit); updateMaxUnsafeAutoIdTimestampFromWriter(writer); assert engineConfig.getForceNewHistoryUUID() == false @@ -1644,7 +1643,7 @@ public class InternalEngine extends Engine { } @Override - public IndexCommitRef acquireIndexCommit(final boolean flushFirst) throws EngineException { + public IndexCommitRef acquireIndexCommit(final boolean safeCommit, final boolean flushFirst) throws EngineException { // we have to flush outside of the readlock otherwise we might have a problem upgrading // the to a write lock when we fail the engine in this operation if (flushFirst) { @@ -1652,12 +1651,8 @@ public class InternalEngine extends Engine { flush(false, true); logger.trace("finish flush for snapshot"); } - try (ReleasableLock lock = readLock.acquire()) { - logger.trace("pulling snapshot"); - return new IndexCommitRef(snapshotDeletionPolicy); - } catch (IOException e) { - throw new SnapshotFailedEngineException(shardId, e); - } + final IndexCommit snapshotCommit = combinedDeletionPolicy.acquireIndexCommit(safeCommit); + return new Engine.IndexCommitRef(snapshotCommit, () -> combinedDeletionPolicy.releaseCommit(snapshotCommit)); } private boolean failOnTragicEvent(AlreadyClosedException ex) { @@ -1828,7 +1823,7 @@ public class InternalEngine extends Engine { iwc.setCommitOnClose(false); // we by default don't commit on close iwc.setOpenMode(create ? IndexWriterConfig.OpenMode.CREATE : IndexWriterConfig.OpenMode.APPEND); iwc.setIndexCommit(startingCommit); - iwc.setIndexDeletionPolicy(snapshotDeletionPolicy); + iwc.setIndexDeletionPolicy(combinedDeletionPolicy); // with tests.verbose, lucene sets this up: plumb to align with filesystem stream boolean verbose = false; try { diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java index f5eba1b4f62..4c6c6a17c23 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -1085,13 +1085,14 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl * Creates a new {@link IndexCommit} snapshot form the currently running engine. All resources referenced by this * commit won't be freed until the commit / snapshot is closed. * + * @param safeCommit true capture the most recent safe commit point; otherwise the most recent commit point. * @param flushFirst true if the index should first be flushed to disk / a low level lucene commit should be executed */ - public Engine.IndexCommitRef acquireIndexCommit(boolean flushFirst) throws EngineException { + public Engine.IndexCommitRef acquireIndexCommit(boolean safeCommit, boolean flushFirst) throws EngineException { IndexShardState state = this.state; // one time volatile read // we allow snapshot on closed index shard, since we want to do one after we close the shard and before we close the engine if (state == IndexShardState.STARTED || state == IndexShardState.RELOCATED || state == IndexShardState.CLOSED) { - return getEngine().acquireIndexCommit(flushFirst); + return getEngine().acquireIndexCommit(safeCommit, flushFirst); } else { throw new IllegalIndexShardStateException(shardId, state, "snapshot is not allowed"); } @@ -1125,7 +1126,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl return store.getMetadata(null, true); } } - indexCommit = engine.acquireIndexCommit(false); + indexCommit = engine.acquireIndexCommit(false, false); return store.getMetadata(indexCommit.getIndexCommit()); } finally { store.decRef(); diff --git a/core/src/main/java/org/elasticsearch/index/shard/LocalShardSnapshot.java b/core/src/main/java/org/elasticsearch/index/shard/LocalShardSnapshot.java index e156e988c87..f8f92fbb5fa 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/LocalShardSnapshot.java +++ b/core/src/main/java/org/elasticsearch/index/shard/LocalShardSnapshot.java @@ -48,7 +48,7 @@ final class LocalShardSnapshot implements Closeable { store.incRef(); boolean success = false; try { - indexCommit = shard.acquireIndexCommit(true); + indexCommit = shard.acquireIndexCommit(false, true); success = true; } finally { if (success == false) { diff --git a/core/src/main/java/org/elasticsearch/index/store/Store.java b/core/src/main/java/org/elasticsearch/index/store/Store.java index 41878c46011..dab39c26a3c 100644 --- a/core/src/main/java/org/elasticsearch/index/store/Store.java +++ b/core/src/main/java/org/elasticsearch/index/store/Store.java @@ -246,7 +246,7 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref * * {@link #readMetadataSnapshot(Path, ShardId, NodeEnvironment.ShardLocker, Logger)} to read a meta data while locking * {@link IndexShard#snapshotStoreMetadata()} to safely read from an existing shard - * {@link IndexShard#acquireIndexCommit(boolean)} to get an {@link IndexCommit} which is safe to use but has to be freed + * {@link IndexShard#acquireIndexCommit(boolean, boolean)} to get an {@link IndexCommit} which is safe to use but has to be freed * @param commit the index commit to read the snapshot from or null if the latest snapshot should be read from the * directory * @throws CorruptIndexException if the lucene index is corrupted. This can be caused by a checksum mismatch or an @@ -270,7 +270,7 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref * * {@link #readMetadataSnapshot(Path, ShardId, NodeEnvironment.ShardLocker, Logger)} to read a meta data while locking * {@link IndexShard#snapshotStoreMetadata()} to safely read from an existing shard - * {@link IndexShard#acquireIndexCommit(boolean)} to get an {@link IndexCommit} which is safe to use but has to be freed + * {@link IndexShard#acquireIndexCommit(boolean, boolean)} to get an {@link IndexCommit} which is safe to use but has to be freed * * @param commit the index commit to read the snapshot from or null if the latest snapshot should be read from the * directory diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java b/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java index 4ebce1c0b4b..7afe6c977da 100644 --- a/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java +++ b/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java @@ -159,7 +159,7 @@ public class RecoverySourceHandler { } else { final Engine.IndexCommitRef phase1Snapshot; try { - phase1Snapshot = shard.acquireIndexCommit(false); + phase1Snapshot = shard.acquireIndexCommit(true, false); } catch (final Exception e) { throw new RecoveryEngineException(shard.shardId(), 1, "snapshot failed", e); } diff --git a/core/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java b/core/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java index 35e0b10fd87..7e2a7aab277 100644 --- a/core/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java +++ b/core/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java @@ -390,7 +390,7 @@ public class SnapshotShardsService extends AbstractLifecycleComponent implements final Repository repository = snapshotsService.getRepositoriesService().repository(snapshot.getRepository()); try { // we flush first to make sure we get the latest writes snapshotted - try (Engine.IndexCommitRef snapshotRef = indexShard.acquireIndexCommit(true)) { + try (Engine.IndexCommitRef snapshotRef = indexShard.acquireIndexCommit(false, true)) { repository.snapshotShard(indexShard, snapshot.getSnapshotId(), indexId, snapshotRef.getIndexCommit(), snapshotStatus); if (logger.isDebugEnabled()) { final IndexShardSnapshotStatus.Copy lastSnapshotStatus = snapshotStatus.asCopy(); diff --git a/core/src/test/java/org/elasticsearch/index/engine/CombinedDeletionPolicyTests.java b/core/src/test/java/org/elasticsearch/index/engine/CombinedDeletionPolicyTests.java index 0fc6195161a..ca75c70137b 100644 --- a/core/src/test/java/org/elasticsearch/index/engine/CombinedDeletionPolicyTests.java +++ b/core/src/test/java/org/elasticsearch/index/engine/CombinedDeletionPolicyTests.java @@ -21,7 +21,7 @@ package org.elasticsearch.index.engine; import com.carrotsearch.hppc.LongArrayList; import org.apache.lucene.index.IndexCommit; -import org.apache.lucene.index.SnapshotDeletionPolicy; +import org.apache.lucene.store.Directory; import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.translog.Translog; import org.elasticsearch.index.translog.TranslogDeletionPolicy; @@ -34,14 +34,15 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.UUID; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import static java.util.Collections.singletonList; import static org.elasticsearch.index.engine.EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG; import static org.elasticsearch.index.engine.EngineConfig.OpenMode.OPEN_INDEX_CREATE_TRANSLOG; import static org.elasticsearch.index.translog.TranslogDeletionPolicies.createTranslogDeletionPolicy; -import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.equalTo; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; @@ -88,29 +89,64 @@ public class CombinedDeletionPolicyTests extends ESTestCase { assertThat(translogPolicy.getTranslogGenerationOfLastCommit(), equalTo(lastTranslogGen)); } - public void testIgnoreSnapshottingCommits() throws Exception { + public void testAcquireIndexCommit() throws Exception { final AtomicLong globalCheckpoint = new AtomicLong(); final UUID translogUUID = UUID.randomUUID(); TranslogDeletionPolicy translogPolicy = createTranslogDeletionPolicy(); CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy(OPEN_INDEX_AND_TRANSLOG, translogPolicy, globalCheckpoint::get); - - long firstMaxSeqNo = randomLongBetween(0, Long.MAX_VALUE - 1); - long secondMaxSeqNo = randomLongBetween(firstMaxSeqNo + 1, Long.MAX_VALUE); - - long lastTranslogGen = randomNonNegativeLong(); - final IndexCommit firstCommit = mockIndexCommit(firstMaxSeqNo, translogUUID, randomLongBetween(0, lastTranslogGen)); - final IndexCommit secondCommit = mockIndexCommit(secondMaxSeqNo, translogUUID, lastTranslogGen); - SnapshotDeletionPolicy snapshotDeletionPolicy = new SnapshotDeletionPolicy(indexPolicy); - - snapshotDeletionPolicy.onInit(Arrays.asList(firstCommit)); - snapshotDeletionPolicy.snapshot(); - assertThat(snapshotDeletionPolicy.getSnapshots(), contains(firstCommit)); - - // SnapshotPolicy prevents the first commit from deleting, but CombinedPolicy does not retain its translog. - globalCheckpoint.set(randomLongBetween(secondMaxSeqNo, Long.MAX_VALUE)); - snapshotDeletionPolicy.onCommit(Arrays.asList(firstCommit, secondCommit)); - verify(firstCommit, never()).delete(); - verify(secondCommit, never()).delete(); + long lastMaxSeqNo = between(1, 1000); + long lastTranslogGen = between(1, 20); + int safeIndex = 0; + List commitList = new ArrayList<>(); + List snapshottingCommits = new ArrayList<>(); + final int iters = between(10, 100); + for (int i = 0; i < iters; i++) { + int newCommits = between(1, 10); + for (int n = 0; n < newCommits; n++) { + lastMaxSeqNo += between(1, 1000); + lastTranslogGen += between(1, 20); + commitList.add(mockIndexCommit(lastMaxSeqNo, translogUUID, lastTranslogGen)); + } + // Advance the global checkpoint to between [safeIndex, safeIndex + 1) + safeIndex = randomIntBetween(safeIndex, commitList.size() - 1); + long lower = Math.max(globalCheckpoint.get(), + Long.parseLong(commitList.get(safeIndex).getUserData().get(SequenceNumbers.MAX_SEQ_NO))); + long upper = safeIndex == commitList.size() - 1 ? lastMaxSeqNo : + Long.parseLong(commitList.get(safeIndex + 1).getUserData().get(SequenceNumbers.MAX_SEQ_NO)) - 1; + globalCheckpoint.set(randomLongBetween(lower, upper)); + indexPolicy.onCommit(commitList); + // Captures and releases some commits + int captures = between(0, 5); + for (int n = 0; n < captures; n++) { + boolean safe = randomBoolean(); + final IndexCommit snapshot = indexPolicy.acquireIndexCommit(safe); + expectThrows(UnsupportedOperationException.class, snapshot::delete); + snapshottingCommits.add(snapshot); + if (safe) { + assertThat(snapshot.getUserData(), equalTo(commitList.get(safeIndex).getUserData())); + } else { + assertThat(snapshot.getUserData(), equalTo(commitList.get(commitList.size() - 1).getUserData())); + } + } + randomSubsetOf(snapshottingCommits).forEach(snapshot -> { + snapshottingCommits.remove(snapshot); + indexPolicy.releaseCommit(snapshot); + }); + // Snapshotting commits must not be deleted. + snapshottingCommits.forEach(snapshot -> assertThat(snapshot.isDeleted(), equalTo(false))); + // We don't need to retain translog for snapshotting commits. + assertThat(translogPolicy.getMinTranslogGenerationForRecovery(), + equalTo(Long.parseLong(commitList.get(safeIndex).getUserData().get(Translog.TRANSLOG_GENERATION_KEY)))); + assertThat(translogPolicy.getTranslogGenerationOfLastCommit(), + equalTo(Long.parseLong(commitList.get(commitList.size() - 1).getUserData().get(Translog.TRANSLOG_GENERATION_KEY)))); + } + snapshottingCommits.forEach(indexPolicy::releaseCommit); + globalCheckpoint.set(randomLongBetween(lastMaxSeqNo, Long.MAX_VALUE)); + indexPolicy.onCommit(commitList); + for (int i = 0; i < commitList.size() - 1; i++) { + assertThat(commitList.get(i).isDeleted(), equalTo(true)); + } + assertThat(commitList.get(commitList.size() - 1).isDeleted(), equalTo(false)); assertThat(translogPolicy.getMinTranslogGenerationForRecovery(), equalTo(lastTranslogGen)); assertThat(translogPolicy.getTranslogGenerationOfLastCommit(), equalTo(lastTranslogGen)); } @@ -180,8 +216,16 @@ public class CombinedDeletionPolicyTests extends ESTestCase { userData.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(maxSeqNo)); userData.put(Translog.TRANSLOG_UUID_KEY, translogUUID.toString()); userData.put(Translog.TRANSLOG_GENERATION_KEY, Long.toString(translogGen)); + final AtomicBoolean deleted = new AtomicBoolean(); final IndexCommit commit = mock(IndexCommit.class); + final Directory directory = mock(Directory.class); when(commit.getUserData()).thenReturn(userData); + when(commit.getDirectory()).thenReturn(directory); + when(commit.isDeleted()).thenAnswer(args -> deleted.get()); + doAnswer(arg -> { + deleted.set(true); + return null; + }).when(commit).delete(); return commit; } diff --git a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index f20aaedaff7..9755304d2f1 100644 --- a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -167,6 +167,7 @@ import static org.hamcrest.Matchers.everyItem; import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.hasKey; +import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.lessThanOrEqualTo; import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.notNullValue; @@ -2115,7 +2116,7 @@ public class InternalEngineTests extends EngineTestCase { boolean doneIndexing; do { doneIndexing = doneLatch.await(sleepTime, TimeUnit.MILLISECONDS); - commits.add(engine.acquireIndexCommit(true)); + commits.add(engine.acquireIndexCommit(false, true)); if (commits.size() > commitLimit) { // don't keep on piling up too many commits IOUtils.close(commits.remove(randomIntBetween(0, commits.size()-1))); // we increase the wait time to make sure we eventually if things are slow wait for threads to finish. @@ -4319,4 +4320,38 @@ public class InternalEngineTests extends EngineTestCase { assertEquals(totalNumDocs, searcher.reader().numDocs()); } } + + public void testAcquireIndexCommit() throws Exception { + IOUtils.close(engine, store); + store = createStore(); + final AtomicLong globalCheckpoint = new AtomicLong(); + try (InternalEngine engine = createEngine(store, createTempDir(), globalCheckpoint::get)) { + int numDocs = between(1, 20); + for (int i = 0; i < numDocs; i++) { + index(engine, i); + } + final boolean inSync = randomBoolean(); + if (inSync) { + globalCheckpoint.set(numDocs - 1); + } + final boolean flushFirst = randomBoolean(); + final boolean safeCommit = randomBoolean(); + Engine.IndexCommitRef commit = engine.acquireIndexCommit(safeCommit, flushFirst); + int moreDocs = between(1, 20); + for (int i = 0; i < moreDocs; i++) { + index(engine, numDocs + i); + } + globalCheckpoint.set(numDocs + moreDocs - 1); + engine.flush(); + // check that we can still read the commit that we captured + try (IndexReader reader = DirectoryReader.open(commit.getIndexCommit())) { + assertThat(reader.numDocs(), equalTo(flushFirst && (safeCommit == false || inSync) ? numDocs : 0)); + } + assertThat(DirectoryReader.listCommits(engine.store.directory()), hasSize(2)); + commit.close(); + // check it's clean up + engine.flush(true, true); + assertThat(DirectoryReader.listCommits(engine.store.directory()), hasSize(1)); + } + } } diff --git a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index 972a278ba5d..48887aa4c11 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -1045,41 +1045,6 @@ public class IndexShardTests extends IndexShardTestCase { closeShards(indexShard); } - public void testAcquireIndexCommit() throws Exception { - boolean isPrimary = randomBoolean(); - final IndexShard shard = newStartedShard(isPrimary); - int numDocs = randomInt(20); - for (int i = 0; i < numDocs; i++) { - indexDoc(shard, "type", "id_" + i); - } - final boolean flushFirst = randomBoolean(); - Engine.IndexCommitRef commit = shard.acquireIndexCommit(flushFirst); - int moreDocs = randomInt(20); - for (int i = 0; i < moreDocs; i++) { - indexDoc(shard, "type", "id_" + numDocs + i); - } - flushShard(shard); - // check that we can still read the commit that we captured - try (IndexReader reader = DirectoryReader.open(commit.getIndexCommit())) { - assertThat(reader.numDocs(), equalTo(flushFirst ? numDocs : 0)); - } - commit.close(); - // Make the global checkpoint in sync with the local checkpoint. - if (isPrimary) { - final String allocationId = shard.shardRouting.allocationId().getId(); - shard.updateLocalCheckpointForShard(allocationId, numDocs + moreDocs - 1); - shard.updateGlobalCheckpointForShard(allocationId, shard.getLocalCheckpoint()); - } else { - shard.updateGlobalCheckpointOnReplica(numDocs + moreDocs - 1, "test"); - } - flushShard(shard, true); - - // check it's clean up - assertThat(DirectoryReader.listCommits(shard.store().directory()), hasSize(1)); - - closeShards(shard); - } - /*** * test one can snapshot the store at various lifecycle stages */ diff --git a/core/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java b/core/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java index cf5f24d2a6e..4963c1b74a5 100644 --- a/core/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java +++ b/core/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java @@ -396,7 +396,7 @@ public class RecoverySourceHandlerTests extends ESTestCase { when(shard.seqNoStats()).thenReturn(mock(SeqNoStats.class)); when(shard.segmentStats(anyBoolean())).thenReturn(mock(SegmentsStats.class)); when(shard.state()).thenReturn(IndexShardState.RELOCATED); - when(shard.acquireIndexCommit(anyBoolean())).thenReturn(mock(Engine.IndexCommitRef.class)); + when(shard.acquireIndexCommit(anyBoolean(), anyBoolean())).thenReturn(mock(Engine.IndexCommitRef.class)); doAnswer(invocation -> { ((ActionListener)invocation.getArguments()[0]).onResponse(() -> {}); return null; diff --git a/core/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java b/core/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java index 4a449463b5e..85dc3a5fc39 100644 --- a/core/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java +++ b/core/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java @@ -19,6 +19,8 @@ package org.elasticsearch.indices.recovery; +import org.apache.lucene.index.DirectoryReader; +import org.apache.lucene.index.IndexCommit; import org.apache.lucene.index.IndexWriter; import org.apache.lucene.index.IndexWriterConfig; import org.apache.lucene.index.NoMergePolicy; @@ -27,6 +29,7 @@ import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.bytes.BytesArray; +import org.elasticsearch.common.lucene.uid.Versions; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.xcontent.XContentType; @@ -36,11 +39,13 @@ import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.mapper.SourceToParse; import org.elasticsearch.index.replication.ESIndexLevelReplicationTestCase; import org.elasticsearch.index.replication.RecoveryDuringReplicationTests; +import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.translog.Translog; import org.elasticsearch.index.translog.TranslogConfig; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Future; @@ -48,6 +53,7 @@ import java.util.concurrent.Future; import static org.elasticsearch.index.translog.TranslogDeletionPolicies.createTranslogDeletionPolicy; import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.lessThanOrEqualTo; import static org.hamcrest.Matchers.not; public class RecoveryTests extends ESIndexLevelReplicationTestCase { @@ -241,4 +247,28 @@ public class RecoveryTests extends ESIndexLevelReplicationTestCase { assertThat(replica.getTranslog().getLastSyncedGlobalCheckpoint(), equalTo(numDocs - 1)); } } + + public void testPeerRecoverySendSafeCommitInFileBased() throws Exception { + IndexShard primaryShard = newStartedShard(true); + int numDocs = between(1, 100); + long globalCheckpoint = 0; + for (int i = 0; i < numDocs; i++) { + primaryShard.applyIndexOperationOnPrimary(Versions.MATCH_ANY, VersionType.INTERNAL, + SourceToParse.source(primaryShard.shardId().getIndexName(), "test", Integer.toString(i), new BytesArray("{}"), + XContentType.JSON), IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false, getMappingUpdater(primaryShard, "test")); + if (randomBoolean()) { + globalCheckpoint = randomLongBetween(globalCheckpoint, i); + primaryShard.updateLocalCheckpointForShard(primaryShard.routingEntry().allocationId().getId(), globalCheckpoint); + primaryShard.updateGlobalCheckpointForShard(primaryShard.routingEntry().allocationId().getId(), globalCheckpoint); + primaryShard.flush(new FlushRequest()); + } + } + IndexShard replicaShard = newShard(primaryShard.shardId(), false); + updateMappings(replicaShard, primaryShard.indexSettings().getIndexMetaData()); + recoverReplica(replicaShard, primaryShard); + List commits = DirectoryReader.listCommits(replicaShard.store().directory()); + long maxSeqNo = Long.parseLong(commits.get(0).getUserData().get(SequenceNumbers.MAX_SEQ_NO)); + assertThat(maxSeqNo, lessThanOrEqualTo(globalCheckpoint)); + closeShards(primaryShard, replicaShard); + } } diff --git a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java index 4737befa30e..cd55c1126eb 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java @@ -620,7 +620,7 @@ public abstract class IndexShardTestCase extends ESTestCase { final Snapshot snapshot, final Repository repository) throws IOException { final IndexShardSnapshotStatus snapshotStatus = IndexShardSnapshotStatus.newInitializing(); - try (Engine.IndexCommitRef indexCommitRef = shard.acquireIndexCommit(true)) { + try (Engine.IndexCommitRef indexCommitRef = shard.acquireIndexCommit(false, true)) { Index index = shard.shardId().getIndex(); IndexId indexId = new IndexId(index.getName(), index.getUUID());