diff --git a/server/src/internalClusterTest/java/org/opensearch/index/seqno/RetentionLeaseIT.java b/server/src/internalClusterTest/java/org/opensearch/index/seqno/RetentionLeaseIT.java index 932e49b9c56..ed6074b39c8 100644 --- a/server/src/internalClusterTest/java/org/opensearch/index/seqno/RetentionLeaseIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/index/seqno/RetentionLeaseIT.java @@ -43,7 +43,6 @@ import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; import org.opensearch.index.IndexService; import org.opensearch.index.IndexSettings; -import org.opensearch.index.engine.Engine; import org.opensearch.index.shard.IndexShard; import org.opensearch.index.shard.ShardId; import org.opensearch.indices.IndicesService; @@ -122,7 +121,7 @@ public class RetentionLeaseIT extends OpenSearchIntegTestCase { final CountDownLatch latch = new CountDownLatch(1); final ActionListener listener = countDownLatchListener(latch); // simulate a peer recovery which locks the soft deletes policy on the primary - final Closeable retentionLock = randomBoolean() ? primary.acquireHistoryRetentionLock(Engine.HistorySource.INDEX) : () -> {}; + final Closeable retentionLock = randomBoolean() ? primary.acquireHistoryRetentionLock() : () -> {}; currentRetentionLeases.put(id, primary.addRetentionLease(id, retainingSequenceNumber, source, listener)); latch.await(); retentionLock.close(); @@ -175,7 +174,7 @@ public class RetentionLeaseIT extends OpenSearchIntegTestCase { final CountDownLatch latch = new CountDownLatch(1); final ActionListener listener = countDownLatchListener(latch); // simulate a peer recovery which locks the soft deletes policy on the primary - final Closeable retentionLock = randomBoolean() ? primary.acquireHistoryRetentionLock(Engine.HistorySource.INDEX) : () -> {}; + final Closeable retentionLock = randomBoolean() ? primary.acquireHistoryRetentionLock() : () -> {}; currentRetentionLeases.put(id, primary.addRetentionLease(id, retainingSequenceNumber, source, listener)); latch.await(); retentionLock.close(); @@ -186,7 +185,7 @@ public class RetentionLeaseIT extends OpenSearchIntegTestCase { final CountDownLatch latch = new CountDownLatch(1); primary.removeRetentionLease(id, countDownLatchListener(latch)); // simulate a peer recovery which locks the soft deletes policy on the primary - final Closeable retentionLock = randomBoolean() ? primary.acquireHistoryRetentionLock(Engine.HistorySource.INDEX) : () -> {}; + final Closeable retentionLock = randomBoolean() ? primary.acquireHistoryRetentionLock() : () -> {}; currentRetentionLeases.remove(id); latch.await(); retentionLock.close(); diff --git a/server/src/main/java/org/opensearch/index/engine/Engine.java b/server/src/main/java/org/opensearch/index/engine/Engine.java index 2384bcef264..2d9cba2ee09 100644 --- a/server/src/main/java/org/opensearch/index/engine/Engine.java +++ b/server/src/main/java/org/opensearch/index/engine/Engine.java @@ -730,7 +730,7 @@ public abstract class Engine implements Closeable { /** * Acquires a lock on the translog files and Lucene soft-deleted documents to prevent them from being trimmed */ - public abstract Closeable acquireHistoryRetentionLock(HistorySource historySource); + public abstract Closeable acquireHistoryRetentionLock(); /** * Creates a new history snapshot from Lucene for reading operations whose seqno in the requesting seqno range (both inclusive). @@ -744,51 +744,7 @@ public abstract class Engine implements Closeable { boolean requiredFullRange ) throws IOException; - /** - * Creates a new history snapshot from either Lucene/Translog for reading operations whose seqno in the requesting - * seqno range (both inclusive). - */ - public Translog.Snapshot newChangesSnapshot( - String source, - HistorySource historySource, - MapperService mapperService, - long fromSeqNo, - long toSeqNo, - boolean requiredFullRange - ) throws IOException { - return newChangesSnapshot(source, mapperService, fromSeqNo, toSeqNo, requiredFullRange); - } - - /** - * Creates a new history snapshot for reading operations since {@code startingSeqNo} (inclusive). - * The returned snapshot can be retrieved from either Lucene index or translog files. - */ - public abstract Translog.Snapshot readHistoryOperations( - String reason, - HistorySource historySource, - MapperService mapperService, - long startingSeqNo - ) throws IOException; - - /** - * Returns the estimated number of history operations whose seq# at least {@code startingSeqNo}(inclusive) in this engine. - */ - public abstract int estimateNumberOfHistoryOperations( - String reason, - HistorySource historySource, - MapperService mapperService, - long startingSeqNo - ) throws IOException; - - /** - * Checks if this engine has every operations since {@code startingSeqNo}(inclusive) in its history (either Lucene or translog) - */ - public abstract boolean hasCompleteOperationHistory( - String reason, - HistorySource historySource, - MapperService mapperService, - long startingSeqNo - ) throws IOException; + public abstract boolean hasCompleteOperationHistory(String reason, long startingSeqNo); /** * Gets the minimum retained sequence number for this engine. @@ -2029,12 +1985,4 @@ public abstract class Engine implements Closeable { * to advance this marker to at least the given sequence number. */ public abstract void advanceMaxSeqNoOfUpdatesOrDeletes(long maxSeqNoOfUpdatesOnPrimary); - - /** - * Whether we should read history operations from translog or Lucene index - */ - public enum HistorySource { - TRANSLOG, - INDEX - } } diff --git a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java index a6bd9590b7b..1edd0c67c33 100644 --- a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java @@ -608,45 +608,6 @@ public class InternalEngine extends Engine { revisitIndexDeletionPolicyOnTranslogSynced(); } - /** - * Creates a new history snapshot for reading operations since the provided seqno. - * The returned snapshot can be retrieved from either Lucene index or translog files. - */ - @Override - public Translog.Snapshot readHistoryOperations( - String reason, - HistorySource historySource, - MapperService mapperService, - long startingSeqNo - ) throws IOException { - if (historySource == HistorySource.INDEX) { - return newChangesSnapshot(reason, mapperService, Math.max(0, startingSeqNo), Long.MAX_VALUE, false); - } else { - return getTranslog().newSnapshot(startingSeqNo, Long.MAX_VALUE); - } - } - - /** - * Returns the estimated number of history operations whose seq# at least the provided seq# in this engine. - */ - @Override - public int estimateNumberOfHistoryOperations( - String reason, - HistorySource historySource, - MapperService mapperService, - long startingSeqNo - ) throws IOException { - if (historySource == HistorySource.INDEX) { - try ( - Translog.Snapshot snapshot = newChangesSnapshot(reason, mapperService, Math.max(0, startingSeqNo), Long.MAX_VALUE, false) - ) { - return snapshot.totalOperations(); - } - } else { - return getTranslog().estimateTotalOperationsFromMinSeq(startingSeqNo); - } - } - @Override public TranslogStats getTranslogStats() { return getTranslog().stats(); @@ -2815,22 +2776,6 @@ public class InternalEngine extends Engine { return numDocUpdates.count(); } - @Override - public Translog.Snapshot newChangesSnapshot( - String source, - HistorySource historySource, - MapperService mapperService, - long fromSeqNo, - long toSeqNo, - boolean requiredFullRange - ) throws IOException { - if (historySource == HistorySource.INDEX) { - return newChangesSnapshot(source, mapperService, fromSeqNo, toSeqNo, requiredFullRange); - } else { - return getTranslog().newSnapshot(fromSeqNo, toSeqNo, requiredFullRange); - } - } - @Override public Translog.Snapshot newChangesSnapshot( String source, @@ -2865,28 +2810,8 @@ public class InternalEngine extends Engine { } } - @Override - public boolean hasCompleteOperationHistory(String reason, HistorySource historySource, MapperService mapperService, long startingSeqNo) - throws IOException { - if (historySource == HistorySource.INDEX) { - return getMinRetainedSeqNo() <= startingSeqNo; - } else { - final long currentLocalCheckpoint = localCheckpointTracker.getProcessedCheckpoint(); - // avoid scanning translog if not necessary - if (startingSeqNo > currentLocalCheckpoint) { - return true; - } - final LocalCheckpointTracker tracker = new LocalCheckpointTracker(startingSeqNo, startingSeqNo - 1); - try (Translog.Snapshot snapshot = getTranslog().newSnapshot(startingSeqNo, Long.MAX_VALUE)) { - Translog.Operation operation; - while ((operation = snapshot.next()) != null) { - if (operation.seqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO) { - tracker.markSeqNoAsProcessed(operation.seqNo()); - } - } - } - return tracker.getProcessedCheckpoint() >= currentLocalCheckpoint; - } + public boolean hasCompleteOperationHistory(String reason, long startingSeqNo) { + return getMinRetainedSeqNo() <= startingSeqNo; } /** @@ -2897,13 +2822,8 @@ public class InternalEngine extends Engine { return softDeletesPolicy.getMinRetainedSeqNo(); } - @Override - public Closeable acquireHistoryRetentionLock(HistorySource historySource) { - if (historySource == HistorySource.INDEX) { - return softDeletesPolicy.acquireRetentionLock(); - } else { - return translog.acquireRetentionLock(); - } + public Closeable acquireHistoryRetentionLock() { + return softDeletesPolicy.acquireRetentionLock(); } /** diff --git a/server/src/main/java/org/opensearch/index/engine/ReadOnlyEngine.java b/server/src/main/java/org/opensearch/index/engine/ReadOnlyEngine.java index 0491eb0db94..d9cf8e2cd65 100644 --- a/server/src/main/java/org/opensearch/index/engine/ReadOnlyEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/ReadOnlyEngine.java @@ -320,7 +320,7 @@ public class ReadOnlyEngine extends Engine { public void syncTranslog() {} @Override - public Closeable acquireHistoryRetentionLock(HistorySource historySource) { + public Closeable acquireHistoryRetentionLock() { return () -> {}; } @@ -335,33 +335,7 @@ public class ReadOnlyEngine extends Engine { return newEmptySnapshot(); } - @Override - public Translog.Snapshot readHistoryOperations( - String reason, - HistorySource historySource, - MapperService mapperService, - long startingSeqNo - ) { - return newEmptySnapshot(); - } - - @Override - public int estimateNumberOfHistoryOperations( - String reason, - HistorySource historySource, - MapperService mapperService, - long startingSeqNo - ) { - return 0; - } - - @Override - public boolean hasCompleteOperationHistory( - String reason, - HistorySource historySource, - MapperService mapperService, - long startingSeqNo - ) { + public boolean hasCompleteOperationHistory(String reason, long startingSeqNo) { // we can do operation-based recovery if we don't have to replay any operation. return startingSeqNo > seqNoStats.getMaxSeqNo(); } diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 63238ae5ea0..b93fe71e3f7 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -2304,23 +2304,8 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl /** * Acquires a lock on the translog files and Lucene soft-deleted documents to prevent them from being trimmed */ - public Closeable acquireHistoryRetentionLock(Engine.HistorySource source) { - return getEngine().acquireHistoryRetentionLock(source); - } - - /** - * Returns the estimated number of history operations whose seq# at least the provided seq# in this shard. - */ - public int estimateNumberOfHistoryOperations(String reason, Engine.HistorySource source, long startingSeqNo) throws IOException { - return getEngine().estimateNumberOfHistoryOperations(reason, source, mapperService, startingSeqNo); - } - - /** - * Creates a new history snapshot for reading operations since the provided starting seqno (inclusive). - * The returned snapshot can be retrieved from either Lucene index or translog files. - */ - public Translog.Snapshot getHistoryOperations(String reason, Engine.HistorySource source, long startingSeqNo) throws IOException { - return getEngine().readHistoryOperations(reason, source, mapperService, startingSeqNo); + public Closeable acquireHistoryRetentionLock() { + return getEngine().acquireHistoryRetentionLock(); } /** @@ -2329,17 +2314,16 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl * the provided starting seqno (inclusive) and ending seqno (inclusive) * The returned snapshot can be retrieved from either Lucene index or translog files. */ - public Translog.Snapshot getHistoryOperations(String reason, Engine.HistorySource source, long startingSeqNo, long endSeqNo) - throws IOException { - return getEngine().newChangesSnapshot(reason, source, mapperService, startingSeqNo, endSeqNo, true); + public Translog.Snapshot getHistoryOperations(String reason, long startingSeqNo, long endSeqNo) throws IOException { + return getEngine().newChangesSnapshot(reason, mapperService, startingSeqNo, endSeqNo, true); } /** * Checks if we have a completed history of operations since the given starting seqno (inclusive). - * This method should be called after acquiring the retention lock; See {@link #acquireHistoryRetentionLock(Engine.HistorySource)} + * This method should be called after acquiring the retention lock; See {@link #acquireHistoryRetentionLock()} */ - public boolean hasCompleteHistoryOperations(String reason, Engine.HistorySource source, long startingSeqNo) throws IOException { - return getEngine().hasCompleteOperationHistory(reason, source, mapperService, startingSeqNo); + public boolean hasCompleteHistoryOperations(String reason, long startingSeqNo) { + return getEngine().hasCompleteOperationHistory(reason, startingSeqNo); } /** @@ -2529,7 +2513,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl assert assertPrimaryMode(); verifyNotClosed(); ensureSoftDeletesEnabled("retention leases"); - try (Closeable ignore = acquireHistoryRetentionLock(Engine.HistorySource.INDEX)) { + try (Closeable ignore = acquireHistoryRetentionLock()) { final long actualRetainingSequenceNumber = retainingSequenceNumber == RETAIN_ALL ? getMinRetainedSeqNo() : retainingSequenceNumber; @@ -2552,7 +2536,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl assert assertPrimaryMode(); verifyNotClosed(); ensureSoftDeletesEnabled("retention leases"); - try (Closeable ignore = acquireHistoryRetentionLock(Engine.HistorySource.INDEX)) { + try (Closeable ignore = acquireHistoryRetentionLock()) { final long actualRetainingSequenceNumber = retainingSequenceNumber == RETAIN_ALL ? getMinRetainedSeqNo() : retainingSequenceNumber; diff --git a/server/src/main/java/org/opensearch/index/shard/PrimaryReplicaSyncer.java b/server/src/main/java/org/opensearch/index/shard/PrimaryReplicaSyncer.java index b5e40881cfd..bbdf948af5c 100644 --- a/server/src/main/java/org/opensearch/index/shard/PrimaryReplicaSyncer.java +++ b/server/src/main/java/org/opensearch/index/shard/PrimaryReplicaSyncer.java @@ -49,7 +49,6 @@ import org.opensearch.common.unit.ByteSizeValue; import org.opensearch.common.util.concurrent.AbstractRunnable; import org.opensearch.common.xcontent.XContentBuilder; import org.opensearch.core.internal.io.IOUtils; -import org.opensearch.index.engine.Engine; import org.opensearch.index.seqno.SequenceNumbers; import org.opensearch.index.translog.Translog; import org.opensearch.tasks.Task; @@ -99,16 +98,13 @@ public class PrimaryReplicaSyncer { Translog.Snapshot snapshot = null; try { final long startingSeqNo = indexShard.getLastKnownGlobalCheckpoint() + 1; + assert startingSeqNo >= 0 : "startingSeqNo must be non-negative; got [" + startingSeqNo + "]"; final long maxSeqNo = indexShard.seqNoStats().getMaxSeqNo(); final ShardId shardId = indexShard.shardId(); // Wrap translog snapshot to make it synchronized as it is accessed by different threads through SnapshotSender. // Even though those calls are not concurrent, snapshot.next() uses non-synchronized state and is not multi-thread-compatible // Also fail the resync early if the shard is shutting down - snapshot = indexShard.getHistoryOperations( - "resync", - indexShard.indexSettings.isSoftDeleteEnabled() ? Engine.HistorySource.INDEX : Engine.HistorySource.TRANSLOG, - startingSeqNo - ); + snapshot = indexShard.newChangesSnapshot("resync", startingSeqNo, Long.MAX_VALUE, false); final Translog.Snapshot originalSnapshot = snapshot; final Translog.Snapshot wrappedSnapshot = new Translog.Snapshot() { @Override diff --git a/server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandler.java b/server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandler.java index 42910d314a4..dcb7024ae8c 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandler.java +++ b/server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandler.java @@ -132,6 +132,7 @@ public class RecoverySourceHandler { private final CancellableThreads cancellableThreads = new CancellableThreads(); private final List resources = new CopyOnWriteArrayList<>(); private final ListenableFuture future = new ListenableFuture<>(); + private static final String PEER_RECOVERY_NAME = "peer-recovery"; public RecoverySourceHandler( IndexShard shard, @@ -187,7 +188,6 @@ public class RecoverySourceHandler { IOUtils.closeWhileHandlingException(releaseResources, () -> future.onFailure(e)); }; - final boolean softDeletesEnabled = shard.indexSettings().isSoftDeleteEnabled(); final SetOnce retentionLeaseRef = new SetOnce<>(); runUnderPrimaryPermit(() -> { @@ -211,19 +211,13 @@ public class RecoverySourceHandler { cancellableThreads, logger ); - final Engine.HistorySource historySource; - if (softDeletesEnabled && (shard.useRetentionLeasesInPeerRecovery() || retentionLeaseRef.get() != null)) { - historySource = Engine.HistorySource.INDEX; - } else { - historySource = Engine.HistorySource.TRANSLOG; - } - final Closeable retentionLock = shard.acquireHistoryRetentionLock(historySource); + final Closeable retentionLock = shard.acquireHistoryRetentionLock(); resources.add(retentionLock); final long startingSeqNo; final boolean isSequenceNumberBasedRecovery = request.startingSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO && isTargetSameHistory() - && shard.hasCompleteHistoryOperations("peer-recovery", historySource, request.startingSeqNo()) - && (historySource == Engine.HistorySource.TRANSLOG + && shard.hasCompleteHistoryOperations(PEER_RECOVERY_NAME, request.startingSeqNo()) + && ((retentionLeaseRef.get() == null && shard.useRetentionLeasesInPeerRecovery() == false) || (retentionLeaseRef.get() != null && retentionLeaseRef.get().retainingSequenceNumber() <= request.startingSeqNo())); // NB check hasCompleteHistoryOperations when computing isSequenceNumberBasedRecovery, even if there is a retention lease, // because when doing a rolling upgrade from earlier than 7.4 we may create some leases that are initially unsatisfied. It's @@ -231,7 +225,7 @@ public class RecoverySourceHandler { // Also it's pretty cheap when soft deletes are enabled, and it'd be a disaster if we tried a sequence-number-based recovery // without having a complete history. - if (isSequenceNumberBasedRecovery && softDeletesEnabled && retentionLeaseRef.get() != null) { + if (isSequenceNumberBasedRecovery && retentionLeaseRef.get() != null) { // all the history we need is retained by an existing retention lease, so we do not need a separate retention lock retentionLock.close(); logger.trace("history is retained by {}", retentionLeaseRef.get()); @@ -274,13 +268,11 @@ public class RecoverySourceHandler { // advances and not when creating a new safe commit. In any case this is a best-effort thing since future recoveries can // always fall back to file-based ones, and only really presents a problem if this primary fails before things have settled // down. - startingSeqNo = softDeletesEnabled - ? Long.parseLong(safeCommitRef.getIndexCommit().getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)) + 1L - : 0; + startingSeqNo = Long.parseLong(safeCommitRef.getIndexCommit().getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)) + 1L; logger.trace("performing file-based recovery followed by history replay starting at [{}]", startingSeqNo); try { - final int estimateNumOps = shard.estimateNumberOfHistoryOperations("peer-recovery", historySource, startingSeqNo); + final int estimateNumOps = estimateNumberOfHistoryOperations(startingSeqNo); final Releasable releaseStore = acquireStore(shard.store()); resources.add(releaseStore); sendFileStep.whenComplete(r -> IOUtils.close(safeCommitRef, releaseStore), e -> { @@ -327,10 +319,7 @@ public class RecoverySourceHandler { sendFileStep.whenComplete(r -> { assert Transports.assertNotTransportThread(RecoverySourceHandler.this + "[prepareTargetForTranslog]"); // For a sequence based recovery, the target can keep its local translog - prepareTargetForTranslog( - shard.estimateNumberOfHistoryOperations("peer-recovery", historySource, startingSeqNo), - prepareEngineStep - ); + prepareTargetForTranslog(estimateNumberOfHistoryOperations(startingSeqNo), prepareEngineStep); }, onFailure); prepareEngineStep.whenComplete(prepareEngineTime -> { @@ -350,11 +339,10 @@ public class RecoverySourceHandler { ); final long endingSeqNo = shard.seqNoStats().getMaxSeqNo(); - logger.trace( - "snapshot translog for recovery; current size is [{}]", - shard.estimateNumberOfHistoryOperations("peer-recovery", historySource, startingSeqNo) - ); - final Translog.Snapshot phase2Snapshot = shard.getHistoryOperations("peer-recovery", historySource, startingSeqNo); + if (logger.isTraceEnabled()) { + logger.trace("snapshot translog for recovery; current size is [{}]", estimateNumberOfHistoryOperations(startingSeqNo)); + } + final Translog.Snapshot phase2Snapshot = shard.newChangesSnapshot(PEER_RECOVERY_NAME, startingSeqNo, Long.MAX_VALUE, false); resources.add(phase2Snapshot); retentionLock.close(); @@ -415,6 +403,12 @@ public class RecoverySourceHandler { return targetHistoryUUID.equals(shard.getHistoryUUID()); } + private int estimateNumberOfHistoryOperations(long startingSeqNo) throws IOException { + try (Translog.Snapshot snapshot = shard.newChangesSnapshot(PEER_RECOVERY_NAME, startingSeqNo, Long.MAX_VALUE, false)) { + return snapshot.totalOperations(); + } + } + static void runUnderPrimaryPermit( CancellableThreads.Interruptible runnable, String reason, diff --git a/server/src/main/java/org/opensearch/indices/recovery/RecoveryTarget.java b/server/src/main/java/org/opensearch/indices/recovery/RecoveryTarget.java index b4bcec32733..3ea7cad528e 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/RecoveryTarget.java +++ b/server/src/main/java/org/opensearch/indices/recovery/RecoveryTarget.java @@ -344,11 +344,11 @@ public class RecoveryTarget extends AbstractRefCounted implements RecoveryTarget private boolean hasUncommittedOperations() throws IOException { long localCheckpointOfCommit = Long.parseLong(indexShard.commitStats().getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)); - return indexShard.estimateNumberOfHistoryOperations( - "peer-recovery", - indexShard.indexSettings().isSoftDeleteEnabled() ? Engine.HistorySource.INDEX : Engine.HistorySource.TRANSLOG, - localCheckpointOfCommit + 1 - ) > 0; + try ( + Translog.Snapshot snapshot = indexShard.newChangesSnapshot("peer-recovery", localCheckpointOfCommit + 1, Long.MAX_VALUE, false) + ) { + return snapshot.totalOperations() > 0; + } } @Override diff --git a/server/src/test/java/org/opensearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/opensearch/index/engine/InternalEngineTests.java index 980ee4fd95a..359f73ff3d5 100644 --- a/server/src/test/java/org/opensearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/opensearch/index/engine/InternalEngineTests.java @@ -6163,10 +6163,8 @@ public class InternalEngineTests extends EngineTestCase { } } MapperService mapperService = createMapperService("test"); - List luceneOps = readAllOperationsBasedOnSource(engine, Engine.HistorySource.INDEX, mapperService); - List translogOps = readAllOperationsBasedOnSource(engine, Engine.HistorySource.TRANSLOG, mapperService); + List luceneOps = readAllOperationsBasedOnSource(engine, mapperService); assertThat(luceneOps.stream().map(o -> o.seqNo()).collect(Collectors.toList()), containsInAnyOrder(expectedSeqNos.toArray())); - assertThat(translogOps.stream().map(o -> o.seqNo()).collect(Collectors.toList()), containsInAnyOrder(expectedSeqNos.toArray())); } } @@ -6317,7 +6315,7 @@ public class InternalEngineTests extends EngineTestCase { if (rarely()) { engine.forceMerge(randomBoolean(), 1, false, false, false, UUIDs.randomBase64UUID()); } - try (Closeable ignored = engine.acquireHistoryRetentionLock(Engine.HistorySource.INDEX)) { + try (Closeable ignored = engine.acquireHistoryRetentionLock()) { long minRetainSeqNos = engine.getMinRetainedSeqNo(); assertThat(minRetainSeqNos, lessThanOrEqualTo(globalCheckpoint.get() + 1)); Long[] expectedOps = existingSeqNos.stream().filter(seqno -> seqno >= minRetainSeqNos).toArray(Long[]::new); diff --git a/server/src/test/java/org/opensearch/index/replication/IndexLevelReplicationTests.java b/server/src/test/java/org/opensearch/index/replication/IndexLevelReplicationTests.java index 5a366574fd3..2167bc7c2c0 100644 --- a/server/src/test/java/org/opensearch/index/replication/IndexLevelReplicationTests.java +++ b/server/src/test/java/org/opensearch/index/replication/IndexLevelReplicationTests.java @@ -508,18 +508,12 @@ public class IndexLevelReplicationTests extends OpenSearchIndexLevelReplicationT assertThat(snapshot.totalOperations(), equalTo(0)); } } - try ( - Translog.Snapshot snapshot = shard.getHistoryOperations( - "test", - shard.indexSettings().isSoftDeleteEnabled() ? Engine.HistorySource.INDEX : Engine.HistorySource.TRANSLOG, - 0 - ) - ) { + try (Translog.Snapshot snapshot = shard.newChangesSnapshot("test", 0, Long.MAX_VALUE, false)) { assertThat(snapshot, SnapshotMatchers.containsOperationsInAnyOrder(expectedTranslogOps)); } } // the failure replicated directly from the replication channel. - indexResp = shards.index(new IndexRequest(index.getName(), "type", "any").source("{}", XContentType.JSON)); + indexResp = shards.index(new IndexRequest(index.getName()).id("any").source("{}", XContentType.JSON)); assertThat(indexResp.getFailure().getCause(), equalTo(indexException)); Translog.NoOp noop2 = new Translog.NoOp(1, primaryTerm, indexException.toString()); expectedTranslogOps.add(noop2); @@ -532,13 +526,7 @@ public class IndexLevelReplicationTests extends OpenSearchIndexLevelReplicationT assertThat(snapshot, SnapshotMatchers.containsOperationsInAnyOrder(Collections.singletonList(noop2))); } } - try ( - Translog.Snapshot snapshot = shard.getHistoryOperations( - "test", - shard.indexSettings().isSoftDeleteEnabled() ? Engine.HistorySource.INDEX : Engine.HistorySource.TRANSLOG, - 0 - ) - ) { + try (Translog.Snapshot snapshot = shard.newChangesSnapshot("test", 0, Long.MAX_VALUE, false)) { assertThat(snapshot, SnapshotMatchers.containsOperationsInAnyOrder(expectedTranslogOps)); } } diff --git a/server/src/test/java/org/opensearch/index/shard/PrimaryReplicaSyncerTests.java b/server/src/test/java/org/opensearch/index/shard/PrimaryReplicaSyncerTests.java index 4a9b445c12f..1c3fa908f11 100644 --- a/server/src/test/java/org/opensearch/index/shard/PrimaryReplicaSyncerTests.java +++ b/server/src/test/java/org/opensearch/index/shard/PrimaryReplicaSyncerTests.java @@ -53,34 +53,23 @@ import org.opensearch.common.xcontent.XContentBuilder; import org.opensearch.common.xcontent.XContentFactory; import org.opensearch.common.xcontent.XContentType; import org.opensearch.index.VersionType; -import org.opensearch.index.engine.Engine; import org.opensearch.index.mapper.SourceToParse; import org.opensearch.index.seqno.SequenceNumbers; -import org.opensearch.index.translog.TestTranslog; -import org.opensearch.index.translog.Translog; import org.opensearch.tasks.Task; import org.opensearch.tasks.TaskManager; import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.stream.Collectors; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; import static org.hamcrest.core.IsInstanceOf.instanceOf; -import static org.mockito.Mockito.anyLong; -import static org.mockito.Mockito.anyString; -import static org.mockito.Mockito.eq; -import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.when; public class PrimaryReplicaSyncerTests extends IndexShardTestCase { @@ -238,41 +227,6 @@ public class PrimaryReplicaSyncerTests extends IndexShardTestCase { } } - public void testDoNotSendOperationsWithoutSequenceNumber() throws Exception { - IndexShard shard = spy(newStartedShard(true)); - when(shard.getLastKnownGlobalCheckpoint()).thenReturn(SequenceNumbers.UNASSIGNED_SEQ_NO); - int numOps = between(0, 20); - List operations = new ArrayList<>(); - for (int i = 0; i < numOps; i++) { - operations.add( - new Translog.Index( - "_doc", - Integer.toString(i), - randomBoolean() ? SequenceNumbers.UNASSIGNED_SEQ_NO : i, - primaryTerm, - new byte[] { 1 } - ) - ); - } - Engine.HistorySource source = shard.indexSettings.isSoftDeleteEnabled() - ? Engine.HistorySource.INDEX - : Engine.HistorySource.TRANSLOG; - doReturn(TestTranslog.newSnapshotFromOperations(operations)).when(shard).getHistoryOperations(anyString(), eq(source), anyLong()); - TaskManager taskManager = new TaskManager(Settings.EMPTY, threadPool, Collections.emptySet()); - List sentOperations = new ArrayList<>(); - PrimaryReplicaSyncer.SyncAction syncAction = (request, parentTask, allocationId, primaryTerm, listener) -> { - sentOperations.addAll(Arrays.asList(request.getOperations())); - listener.onResponse(new ResyncReplicationResponse()); - }; - PrimaryReplicaSyncer syncer = new PrimaryReplicaSyncer(taskManager, syncAction); - syncer.setChunkSize(new ByteSizeValue(randomIntBetween(1, 10))); - PlainActionFuture fut = new PlainActionFuture<>(); - syncer.resync(shard, fut); - fut.actionGet(); - assertThat(sentOperations, equalTo(operations.stream().filter(op -> op.seqNo() >= 0).collect(Collectors.toList()))); - closeShards(shard); - } - public void testStatusSerialization() throws IOException { PrimaryReplicaSyncer.ResyncTask.Status status = new PrimaryReplicaSyncer.ResyncTask.Status( randomAlphaOfLength(10), diff --git a/server/src/test/java/org/opensearch/indices/recovery/RecoveryTests.java b/server/src/test/java/org/opensearch/indices/recovery/RecoveryTests.java index d105da65658..0e6b959ee46 100644 --- a/server/src/test/java/org/opensearch/indices/recovery/RecoveryTests.java +++ b/server/src/test/java/org/opensearch/indices/recovery/RecoveryTests.java @@ -41,7 +41,6 @@ import org.apache.lucene.index.IndexableField; import org.apache.lucene.index.NoMergePolicy; import org.apache.lucene.store.AlreadyClosedException; import org.opensearch.ExceptionsHelper; -import org.opensearch.Version; import org.opensearch.action.ActionListener; import org.opensearch.action.admin.indices.flush.FlushRequest; import org.opensearch.action.bulk.BulkShardRequest; @@ -69,7 +68,6 @@ import org.opensearch.index.shard.IndexShard; import org.opensearch.index.store.Store; import org.opensearch.index.translog.SnapshotMatchers; import org.opensearch.index.translog.Translog; -import org.opensearch.test.VersionUtils; import java.io.IOException; import java.util.HashMap; @@ -146,108 +144,6 @@ public class RecoveryTests extends OpenSearchIndexLevelReplicationTestCase { } } - public void testRecoveryWithOutOfOrderDeleteWithTranslog() throws Exception { - /* - * The flow of this test: - * - delete #1 - * - roll generation (to create gen 2) - * - index #0 - * - index #3 - * - flush (commit point has max_seqno 3, and local checkpoint 1 -> points at gen 2, previous commit point is maintained) - * - index #2 - * - index #5 - * - If flush and the translog retention disabled, delete #1 will be removed while index #0 is still retained and replayed. - */ - Settings settings = Settings.builder() - .put(IndexMetadata.SETTING_VERSION_CREATED, VersionUtils.randomPreviousCompatibleVersion(random(), Version.V_2_0_0)) - .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), false) - .build(); - try (ReplicationGroup shards = createGroup(1, settings)) { - shards.startAll(); - // create out of order delete and index op on replica - final IndexShard orgReplica = shards.getReplicas().get(0); - final String indexName = orgReplica.shardId().getIndexName(); - final long primaryTerm = orgReplica.getOperationPrimaryTerm(); - - // delete #1 - orgReplica.advanceMaxSeqNoOfUpdatesOrDeletes(1); // manually advance msu for this delete - orgReplica.applyDeleteOperationOnReplica(1, primaryTerm, 2, "type", "id"); - getTranslog(orgReplica).rollGeneration(); // isolate the delete in it's own generation - // index #0 - orgReplica.applyIndexOperationOnReplica( - 0, - primaryTerm, - 1, - IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, - false, - new SourceToParse(indexName, "type", "id", new BytesArray("{}"), XContentType.JSON) - ); - // index #3 - orgReplica.applyIndexOperationOnReplica( - 3, - primaryTerm, - 1, - IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, - false, - new SourceToParse(indexName, "type", "id-3", new BytesArray("{}"), XContentType.JSON) - ); - // Flushing a new commit with local checkpoint=1 allows to delete the translog gen #1. - orgReplica.flush(new FlushRequest().force(true).waitIfOngoing(true)); - // index #2 - orgReplica.applyIndexOperationOnReplica( - 2, - primaryTerm, - 1, - IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, - false, - new SourceToParse(indexName, "type", "id-2", new BytesArray("{}"), XContentType.JSON) - ); - orgReplica.sync(); // advance local checkpoint - orgReplica.updateGlobalCheckpointOnReplica(3L, "test"); - // index #5 -> force NoOp #4. - orgReplica.applyIndexOperationOnReplica( - 5, - primaryTerm, - 1, - IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, - false, - new SourceToParse(indexName, "type", "id-5", new BytesArray("{}"), XContentType.JSON) - ); - - final int translogOps; - if (randomBoolean()) { - if (randomBoolean()) { - logger.info("--> flushing shard (translog will be trimmed)"); - IndexMetadata.Builder builder = IndexMetadata.builder(orgReplica.indexSettings().getIndexMetadata()); - builder.settings( - Settings.builder() - .put(orgReplica.indexSettings().getSettings()) - .put(IndexSettings.INDEX_TRANSLOG_RETENTION_AGE_SETTING.getKey(), "-1") - .put(IndexSettings.INDEX_TRANSLOG_RETENTION_SIZE_SETTING.getKey(), "-1") - ); - orgReplica.indexSettings().updateIndexMetadata(builder.build()); - orgReplica.onSettingsChanged(); - translogOps = 5; // 4 ops + seqno gaps (delete #1 is removed but index #0 will be replayed). - } else { - logger.info("--> flushing shard (translog will be retained)"); - translogOps = 6; // 5 ops + seqno gaps - } - flushShard(orgReplica); - } else { - translogOps = 6; // 5 ops + seqno gaps - } - - final IndexShard orgPrimary = shards.getPrimary(); - shards.promoteReplicaToPrimary(orgReplica).get(); // wait for primary/replica sync to make sure seq# gap is closed. - - IndexShard newReplica = shards.addReplicaWithExistingPath(orgPrimary.shardPath(), orgPrimary.routingEntry().currentNodeId()); - shards.recoverReplica(newReplica); - shards.assertAllEqual(3); - - assertThat(getTranslog(newReplica).totalOperations(), equalTo(translogOps)); - } - } - public void testRecoveryWithOutOfOrderDeleteWithSoftDeletes() throws Exception { Settings settings = Settings.builder() .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true) @@ -329,7 +225,7 @@ public class RecoveryTests extends OpenSearchIndexLevelReplicationTestCase { IndexShard newReplica = shards.addReplicaWithExistingPath(orgPrimary.shardPath(), orgPrimary.routingEntry().currentNodeId()); shards.recoverReplica(newReplica); shards.assertAllEqual(3); - try (Translog.Snapshot snapshot = newReplica.getHistoryOperations("test", Engine.HistorySource.INDEX, 0)) { + try (Translog.Snapshot snapshot = newReplica.newChangesSnapshot("test", 0, Long.MAX_VALUE, false)) { assertThat(snapshot, SnapshotMatchers.size(6)); } } diff --git a/test/framework/src/main/java/org/opensearch/index/engine/EngineTestCase.java b/test/framework/src/main/java/org/opensearch/index/engine/EngineTestCase.java index f723570272c..24d24cd9f1a 100644 --- a/test/framework/src/main/java/org/opensearch/index/engine/EngineTestCase.java +++ b/test/framework/src/main/java/org/opensearch/index/engine/EngineTestCase.java @@ -1350,13 +1350,9 @@ public abstract class EngineTestCase extends OpenSearchTestCase { /** * Reads all engine operations that have been processed by the engine from Lucene index/Translog based on source. */ - public static List readAllOperationsBasedOnSource( - Engine engine, - Engine.HistorySource historySource, - MapperService mapper - ) throws IOException { + public static List readAllOperationsBasedOnSource(Engine engine, MapperService mapper) throws IOException { final List operations = new ArrayList<>(); - try (Translog.Snapshot snapshot = engine.newChangesSnapshot("test", historySource, mapper, 0, Long.MAX_VALUE, false)) { + try (Translog.Snapshot snapshot = engine.newChangesSnapshot("test", mapper, 0, Long.MAX_VALUE, false)) { Translog.Operation op; while ((op = snapshot.next()) != null) { operations.add(op);