From 57ac788bb6e46074860212c5d587bc9525ef1f6c Mon Sep 17 00:00:00 2001 From: Nick Knize Date: Wed, 26 Jan 2022 12:28:24 -0600 Subject: [PATCH] [Refactor] InternalEngine to always use soft deletes (#1933) Soft Deletes have been enabled by default since Legacy version 7.0 and made mandatory in Version 2.0.0. This commit refactors the InternalEngine to always use soft-deletes. It is a follow on to making soft deletes mandatory in 2.0.0. Signed-off-by: Nicholas Walter Knize --- .../upgrades/FullClusterRestartIT.java | 6 + .../indices/stats/IndexStatsIT.java | 34 ++--- .../index/engine/InternalEngine.java | 138 +++++++----------- .../index/engine/ReadOnlyEngine.java | 10 +- .../index/mapper/SourceFieldMapper.java | 2 +- .../index/engine/InternalEngineTests.java | 115 ++------------- .../index/shard/IndexShardTests.java | 71 ++++----- .../index/engine/EngineTestCase.java | 13 +- 8 files changed, 132 insertions(+), 257 deletions(-) diff --git a/qa/full-cluster-restart/src/test/java/org/opensearch/upgrades/FullClusterRestartIT.java b/qa/full-cluster-restart/src/test/java/org/opensearch/upgrades/FullClusterRestartIT.java index 81b6408b213..8f1ff300236 100644 --- a/qa/full-cluster-restart/src/test/java/org/opensearch/upgrades/FullClusterRestartIT.java +++ b/qa/full-cluster-restart/src/test/java/org/opensearch/upgrades/FullClusterRestartIT.java @@ -719,6 +719,12 @@ public class FullClusterRestartIT extends AbstractFullClusterRestartTestCase { * or not we have one. */ shouldHaveTranslog = randomBoolean(); + Settings.Builder settings = Settings.builder(); + if (minimumNodeVersion().before(Version.V_2_0_0) && randomBoolean()) { + settings.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), randomBoolean()); + } + createIndex(index, settings.build()); + indexRandomDocuments(count, true, true, i -> jsonBuilder().startObject().field("field", "value").endObject()); // make sure all recoveries are done diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/stats/IndexStatsIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/stats/IndexStatsIT.java index 4dd6646670b..b247c9b3f39 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/stats/IndexStatsIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/stats/IndexStatsIT.java @@ -1248,9 +1248,7 @@ public class IndexStatsIT extends OpenSearchIntegTestCase { client().prepareIndex("index", "type", "1").setSource("foo", "bar"), client().prepareIndex("index", "type", "2").setSource("foo", "baz") ); - if (IndexSettings.INDEX_SOFT_DELETES_SETTING.get(settings)) { - persistGlobalCheckpoint("index"); // Need to persist the global checkpoint for the soft-deletes retention MP. - } + persistGlobalCheckpoint("index"); // Need to persist the global checkpoint for the soft-deletes retention MP. refresh(); ensureGreen(); @@ -1287,22 +1285,20 @@ public class IndexStatsIT extends OpenSearchIntegTestCase { // Here we are testing that a fully deleted segment should be dropped and its cached is evicted. // In order to instruct the merge policy not to keep a fully deleted segment, // we need to flush and make that commit safe so that the SoftDeletesPolicy can drop everything. - if (IndexSettings.INDEX_SOFT_DELETES_SETTING.get(settings)) { - persistGlobalCheckpoint("index"); - assertBusy(() -> { - for (final ShardStats shardStats : client().admin().indices().prepareStats("index").get().getIndex("index").getShards()) { - final long maxSeqNo = shardStats.getSeqNoStats().getMaxSeqNo(); - assertTrue( - shardStats.getRetentionLeaseStats() - .retentionLeases() - .leases() - .stream() - .allMatch(retentionLease -> retentionLease.retainingSequenceNumber() == maxSeqNo + 1) - ); - } - }); - flush("index"); - } + persistGlobalCheckpoint("index"); + assertBusy(() -> { + for (final ShardStats shardStats : client().admin().indices().prepareStats("index").get().getIndex("index").getShards()) { + final long maxSeqNo = shardStats.getSeqNoStats().getMaxSeqNo(); + assertTrue( + shardStats.getRetentionLeaseStats() + .retentionLeases() + .leases() + .stream() + .allMatch(retentionLease -> retentionLease.retainingSequenceNumber() == maxSeqNo + 1) + ); + } + }); + flush("index"); logger.info("--> force merging to a single segment"); ForceMergeResponse forceMergeResponse = client().admin() .indices() diff --git a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java index 3fdeaf13ae5..3c9c2d2d591 100644 --- a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java @@ -41,6 +41,7 @@ import org.apache.lucene.index.IndexCommit; import org.apache.lucene.index.IndexReader; import org.apache.lucene.index.IndexWriter; import org.apache.lucene.index.IndexWriterConfig; +import org.apache.lucene.index.IndexableField; import org.apache.lucene.index.LeafReaderContext; import org.apache.lucene.index.LiveIndexWriterConfig; import org.apache.lucene.index.MergePolicy; @@ -184,7 +185,6 @@ public class InternalEngine extends Engine { private final CounterMetric numDocAppends = new CounterMetric(); private final CounterMetric numDocUpdates = new CounterMetric(); private final NumericDocValuesField softDeletesField = Lucene.newSoftDeletesField(); - private final boolean softDeleteEnabled; private final SoftDeletesPolicy softDeletesPolicy; private final LastRefreshedCheckpointListener lastRefreshedCheckpointListener; @@ -262,7 +262,6 @@ public class InternalEngine extends Engine { }); assert translog.getGeneration() != null; this.translog = translog; - this.softDeleteEnabled = engineConfig.getIndexSettings().isSoftDeleteEnabled(); this.softDeletesPolicy = newSoftDeletesPolicy(); this.combinedDeletionPolicy = new CombinedDeletionPolicy( logger, @@ -305,7 +304,7 @@ public class InternalEngine extends Engine { this.lastRefreshedCheckpointListener = new LastRefreshedCheckpointListener(localCheckpointTracker.getProcessedCheckpoint()); this.internalReaderManager.addListener(lastRefreshedCheckpointListener); maxSeqNoOfUpdatesOrDeletes = new AtomicLong(SequenceNumbers.max(localCheckpointTracker.getMaxSeqNo(), translog.getMaxSeqNo())); - if (softDeleteEnabled && localCheckpointTracker.getPersistedCheckpoint() < localCheckpointTracker.getMaxSeqNo()) { + if (localCheckpointTracker.getPersistedCheckpoint() < localCheckpointTracker.getMaxSeqNo()) { try (Searcher searcher = acquireSearcher("restore_version_map_and_checkpoint_tracker", SearcherScope.INTERNAL)) { restoreVersionMapAndCheckpointTracker(Lucene.wrapAllDocsLive(searcher.getDirectoryReader())); } catch (IOException e) { @@ -621,7 +620,6 @@ public class InternalEngine extends Engine { long startingSeqNo ) throws IOException { if (historySource == HistorySource.INDEX) { - ensureSoftDeletesEnabled(); return newChangesSnapshot(reason, mapperService, Math.max(0, startingSeqNo), Long.MAX_VALUE, false); } else { return getTranslog().newSnapshot(startingSeqNo, Long.MAX_VALUE); @@ -639,7 +637,6 @@ public class InternalEngine extends Engine { long startingSeqNo ) throws IOException { if (historySource == HistorySource.INDEX) { - ensureSoftDeletesEnabled(); try ( Translog.Snapshot snapshot = newChangesSnapshot(reason, mapperService, Math.max(0, startingSeqNo), Long.MAX_VALUE, false) ) { @@ -863,8 +860,10 @@ public class InternalEngine extends Engine { } else if (op.seqNo() > docAndSeqNo.seqNo) { status = OpVsLuceneDocStatus.OP_NEWER; } else if (op.seqNo() == docAndSeqNo.seqNo) { - assert localCheckpointTracker.hasProcessed(op.seqNo()) - || softDeleteEnabled == false : "local checkpoint tracker is not updated seq_no=" + op.seqNo() + " id=" + op.id(); + assert localCheckpointTracker.hasProcessed(op.seqNo()) : "local checkpoint tracker is not updated seq_no=" + + op.seqNo() + + " id=" + + op.id(); status = OpVsLuceneDocStatus.OP_STALE_OR_EQUAL; } else { status = OpVsLuceneDocStatus.OP_STALE_OR_EQUAL; @@ -1147,7 +1146,7 @@ public class InternalEngine extends Engine { versionMap.enforceSafeAccess(); final OpVsLuceneDocStatus opVsLucene = compareOpToLuceneDocBasedOnSeqNo(index); if (opVsLucene == OpVsLuceneDocStatus.OP_STALE_OR_EQUAL) { - plan = IndexingStrategy.processAsStaleOp(softDeleteEnabled, index.version()); + plan = IndexingStrategy.processAsStaleOp(index.version()); } else { plan = IndexingStrategy.processNormally(opVsLucene == OpVsLuceneDocStatus.LUCENE_DOC_NOT_FOUND, index.version(), 0); } @@ -1324,7 +1323,6 @@ public class InternalEngine extends Engine { } private void addStaleDocs(final List docs, final IndexWriter indexWriter) throws IOException { - assert softDeleteEnabled : "Add history documents but soft-deletes is disabled"; for (ParseContext.Document doc : docs) { doc.add(softDeletesField); // soft-deleted every document before adding to Lucene } @@ -1402,8 +1400,8 @@ public class InternalEngine extends Engine { return new IndexingStrategy(currentNotFoundOrDeleted, false, false, false, versionForIndexing, 0, null); } - static IndexingStrategy processAsStaleOp(boolean addStaleOpToLucene, long versionForIndexing) { - return new IndexingStrategy(false, false, false, addStaleOpToLucene, versionForIndexing, 0, null); + static IndexingStrategy processAsStaleOp(long versionForIndexing) { + return new IndexingStrategy(false, false, false, true, versionForIndexing, 0, null); } static IndexingStrategy failAsTooManyDocs(Exception e) { @@ -1437,18 +1435,10 @@ public class InternalEngine extends Engine { } private void updateDocs(final Term uid, final List docs, final IndexWriter indexWriter) throws IOException { - if (softDeleteEnabled) { - if (docs.size() > 1) { - indexWriter.softUpdateDocuments(uid, docs, softDeletesField); - } else { - indexWriter.softUpdateDocument(uid, docs.get(0), softDeletesField); - } + if (docs.size() > 1) { + indexWriter.softUpdateDocuments(uid, docs, softDeletesField); } else { - if (docs.size() > 1) { - indexWriter.updateDocuments(uid, docs); - } else { - indexWriter.updateDocument(uid, docs.get(0)); - } + indexWriter.softUpdateDocument(uid, docs.get(0), softDeletesField); } numDocUpdates.inc(docs.size()); } @@ -1495,6 +1485,18 @@ public class InternalEngine extends Engine { if (plan.deleteFromLucene || plan.addStaleOpToLucene) { deleteResult = deleteInLucene(delete, plan); + if (plan.deleteFromLucene) { + numDocDeletes.inc(); + versionMap.putDeleteUnderLock( + delete.uid().bytes(), + new DeleteVersionValue( + plan.versionOfDeletion, + delete.seqNo(), + delete.primaryTerm(), + engineConfig.getThreadPool().relativeTimeInMillis() + ) + ); + } } else { deleteResult = new DeleteResult( plan.versionOfDeletion, @@ -1577,7 +1579,7 @@ public class InternalEngine extends Engine { } else { final OpVsLuceneDocStatus opVsLucene = compareOpToLuceneDocBasedOnSeqNo(delete); if (opVsLucene == OpVsLuceneDocStatus.OP_STALE_OR_EQUAL) { - plan = DeletionStrategy.processAsStaleOp(softDeleteEnabled, delete.version()); + plan = DeletionStrategy.processAsStaleOp(delete.version()); } else { plan = DeletionStrategy.processNormally(opVsLucene == OpVsLuceneDocStatus.LUCENE_DOC_NOT_FOUND, delete.version(), 0); } @@ -1649,37 +1651,19 @@ public class InternalEngine extends Engine { private DeleteResult deleteInLucene(Delete delete, DeletionStrategy plan) throws IOException { assert assertMaxSeqNoOfUpdatesIsAdvanced(delete.uid(), delete.seqNo(), false, false); try { - if (softDeleteEnabled) { - final ParsedDocument tombstone = engineConfig.getTombstoneDocSupplier().newDeleteTombstoneDoc(delete.type(), delete.id()); - assert tombstone.docs().size() == 1 : "Tombstone doc should have single doc [" + tombstone + "]"; - tombstone.updateSeqID(delete.seqNo(), delete.primaryTerm()); - tombstone.version().setLongValue(plan.versionOfDeletion); - final ParseContext.Document doc = tombstone.docs().get(0); - assert doc.getField(SeqNoFieldMapper.TOMBSTONE_NAME) != null : "Delete tombstone document but _tombstone field is not set [" - + doc - + " ]"; - doc.add(softDeletesField); - if (plan.addStaleOpToLucene || plan.currentlyDeleted) { - indexWriter.addDocument(doc); - } else { - indexWriter.softUpdateDocument(delete.uid(), doc, softDeletesField); - } - } else if (plan.currentlyDeleted == false) { - // any exception that comes from this is a either an ACE or a fatal exception there - // can't be any document failures coming from this - indexWriter.deleteDocuments(delete.uid()); - } - if (plan.deleteFromLucene) { - numDocDeletes.inc(); - versionMap.putDeleteUnderLock( - delete.uid().bytes(), - new DeleteVersionValue( - plan.versionOfDeletion, - delete.seqNo(), - delete.primaryTerm(), - engineConfig.getThreadPool().relativeTimeInMillis() - ) - ); + final ParsedDocument tombstone = engineConfig.getTombstoneDocSupplier().newDeleteTombstoneDoc(delete.type(), delete.id()); + assert tombstone.docs().size() == 1 : "Tombstone doc should have single doc [" + tombstone + "]"; + tombstone.updateSeqID(delete.seqNo(), delete.primaryTerm()); + tombstone.version().setLongValue(plan.versionOfDeletion); + final ParseContext.Document doc = tombstone.docs().get(0); + assert doc.getField(SeqNoFieldMapper.TOMBSTONE_NAME) != null : "Delete tombstone document but _tombstone field is not set [" + + doc + + " ]"; + doc.add(softDeletesField); + if (plan.addStaleOpToLucene || plan.currentlyDeleted) { + indexWriter.addDocument(doc); + } else { + indexWriter.softUpdateDocument(delete.uid(), doc, softDeletesField); } return new DeleteResult(plan.versionOfDeletion, delete.primaryTerm(), delete.seqNo(), plan.currentlyDeleted == false); } catch (final Exception ex) { @@ -1759,8 +1743,8 @@ public class InternalEngine extends Engine { return new DeletionStrategy(false, false, currentlyDeleted, versionOfDeletion, 0, null); } - static DeletionStrategy processAsStaleOp(boolean addStaleOpToLucene, long versionOfDeletion) { - return new DeletionStrategy(false, addStaleOpToLucene, false, versionOfDeletion, 0, null); + static DeletionStrategy processAsStaleOp(long versionOfDeletion) { + return new DeletionStrategy(false, true, false, versionOfDeletion, 0, null); } static DeletionStrategy failAsTooManyDocs(Exception e) { @@ -1817,7 +1801,7 @@ public class InternalEngine extends Engine { ); } else { markSeqNoAsSeen(noOp.seqNo()); - if (softDeleteEnabled && hasBeenProcessedBefore(noOp) == false) { + if (hasBeenProcessedBefore(noOp) == false) { try { final ParsedDocument tombstone = engineConfig.getTombstoneDocSupplier().newNoopTombstoneDoc(noOp.reason()); tombstone.updateSeqID(noOp.seqNo(), noOp.primaryTerm()); @@ -2545,17 +2529,15 @@ public class InternalEngine extends Engine { MergePolicy mergePolicy = config().getMergePolicy(); // always configure soft-deletes field so an engine with soft-deletes disabled can open a Lucene index with soft-deletes. iwc.setSoftDeletesField(Lucene.SOFT_DELETES_FIELD); - if (softDeleteEnabled) { - mergePolicy = new RecoverySourcePruneMergePolicy( - SourceFieldMapper.RECOVERY_SOURCE_NAME, + mergePolicy = new RecoverySourcePruneMergePolicy( + SourceFieldMapper.RECOVERY_SOURCE_NAME, + softDeletesPolicy::getRetentionQuery, + new SoftDeletesRetentionMergePolicy( + Lucene.SOFT_DELETES_FIELD, softDeletesPolicy::getRetentionQuery, - new SoftDeletesRetentionMergePolicy( - Lucene.SOFT_DELETES_FIELD, - softDeletesPolicy::getRetentionQuery, - new PrunePostingsMergePolicy(mergePolicy, IdFieldMapper.NAME) - ) - ); - } + new PrunePostingsMergePolicy(mergePolicy, IdFieldMapper.NAME) + ) + ); boolean shuffleForcedMerge = Booleans.parseBoolean(System.getProperty("opensearch.shuffle_forced_merge", Boolean.TRUE.toString())); if (shuffleForcedMerge) { // We wrap the merge policy for all indices even though it is mostly useful for time-based indices @@ -2753,9 +2735,7 @@ public class InternalEngine extends Engine { commitData.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(localCheckpointTracker.getMaxSeqNo())); commitData.put(MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID, Long.toString(maxUnsafeAutoIdTimestamp.get())); commitData.put(HISTORY_UUID_KEY, historyUUID); - if (softDeleteEnabled) { - commitData.put(Engine.MIN_RETAINED_SEQNO, Long.toString(softDeletesPolicy.getMinRetainedSeqNo())); - } + commitData.put(Engine.MIN_RETAINED_SEQNO, Long.toString(softDeletesPolicy.getMinRetainedSeqNo())); final String currentForceMergeUUID = forceMergeUUID; if (currentForceMergeUUID != null) { commitData.put(FORCE_MERGE_UUID_KEY, currentForceMergeUUID); @@ -2922,13 +2902,6 @@ public class InternalEngine extends Engine { return numDocUpdates.count(); } - private void ensureSoftDeletesEnabled() { - if (softDeleteEnabled == false) { - assert false : "index " + shardId.getIndex() + " does not have soft-deletes enabled"; - throw new IllegalStateException("index " + shardId.getIndex() + " does not have soft-deletes enabled"); - } - } - @Override public Translog.Snapshot newChangesSnapshot( String source, @@ -2953,7 +2926,6 @@ public class InternalEngine extends Engine { long toSeqNo, boolean requiredFullRange ) throws IOException { - ensureSoftDeletesEnabled(); ensureOpen(); refreshIfNeeded(source, toSeqNo); Searcher searcher = acquireSearcher(source, SearcherScope.INTERNAL); @@ -2984,7 +2956,6 @@ public class InternalEngine extends Engine { public boolean hasCompleteOperationHistory(String reason, HistorySource historySource, MapperService mapperService, long startingSeqNo) throws IOException { if (historySource == HistorySource.INDEX) { - ensureSoftDeletesEnabled(); return getMinRetainedSeqNo() <= startingSeqNo; } else { final long currentLocalCheckpoint = localCheckpointTracker.getProcessedCheckpoint(); @@ -3010,14 +2981,12 @@ public class InternalEngine extends Engine { * Operations whose seq# are at least this value should exist in the Lucene index. */ public final long getMinRetainedSeqNo() { - ensureSoftDeletesEnabled(); return softDeletesPolicy.getMinRetainedSeqNo(); } @Override public Closeable acquireHistoryRetentionLock(HistorySource historySource) { if (historySource == HistorySource.INDEX) { - ensureSoftDeletesEnabled(); return softDeletesPolicy.acquireRetentionLock(); } else { return translog.acquireRetentionLock(); @@ -3035,15 +3004,14 @@ public class InternalEngine extends Engine { return commitData; } - private final class AssertingIndexWriter extends IndexWriter { + private static class AssertingIndexWriter extends IndexWriter { AssertingIndexWriter(Directory d, IndexWriterConfig conf) throws IOException { super(d, conf); } @Override - public long deleteDocuments(Term... terms) throws IOException { - assert softDeleteEnabled == false : "Call #deleteDocuments but soft-deletes is enabled"; - return super.deleteDocuments(terms); + public long updateDocuments(Term delTerm, Iterable> docs) { + throw new AssertionError("must not hard update documents"); } @Override diff --git a/server/src/main/java/org/opensearch/index/engine/ReadOnlyEngine.java b/server/src/main/java/org/opensearch/index/engine/ReadOnlyEngine.java index b0c05701ae0..98ad64d2e2a 100644 --- a/server/src/main/java/org/opensearch/index/engine/ReadOnlyEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/ReadOnlyEngine.java @@ -200,16 +200,13 @@ public class ReadOnlyEngine extends Engine { DirectoryReader reader, Function readerWrapperFunction ) throws IOException { - if (engineConfig.getIndexSettings().isSoftDeleteEnabled()) { - reader = new SoftDeletesDirectoryReaderWrapper(reader, Lucene.SOFT_DELETES_FIELD); - } reader = readerWrapperFunction.apply(reader); return OpenSearchDirectoryReader.wrap(reader, engineConfig.getShardId()); } protected DirectoryReader open(IndexCommit commit) throws IOException { assert Transports.assertNotTransportThread("opening index commit of a read-only engine"); - return DirectoryReader.open(commit); + return new SoftDeletesDirectoryReaderWrapper(DirectoryReader.open(commit), Lucene.SOFT_DELETES_FIELD); } @Override @@ -337,10 +334,7 @@ public class ReadOnlyEngine extends Engine { long fromSeqNo, long toSeqNo, boolean requiredFullRange - ) throws IOException { - if (engineConfig.getIndexSettings().isSoftDeleteEnabled() == false) { - throw new IllegalStateException("accessing changes snapshot requires soft-deletes enabled"); - } + ) { return newEmptySnapshot(); } diff --git a/server/src/main/java/org/opensearch/index/mapper/SourceFieldMapper.java b/server/src/main/java/org/opensearch/index/mapper/SourceFieldMapper.java index 3e2ec7bab09..54888bf2dfd 100644 --- a/server/src/main/java/org/opensearch/index/mapper/SourceFieldMapper.java +++ b/server/src/main/java/org/opensearch/index/mapper/SourceFieldMapper.java @@ -190,7 +190,7 @@ public class SourceFieldMapper extends MetadataFieldMapper { context.doc().add(new StoredField(fieldType().name(), ref.bytes, ref.offset, ref.length)); } - if (originalSource != null && adaptedSource != originalSource && context.indexSettings().isSoftDeleteEnabled()) { + if (originalSource != null && adaptedSource != originalSource) { // if we omitted source or modified it we add the _recovery_source to ensure we have it for ops based recovery BytesRef ref = originalSource.toBytesRef(); context.doc().add(new StoredField(RECOVERY_SOURCE_NAME, ref.bytes, ref.offset, ref.length)); diff --git a/server/src/test/java/org/opensearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/opensearch/index/engine/InternalEngineTests.java index 0a7dda8c603..85f325fbee2 100644 --- a/server/src/test/java/org/opensearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/opensearch/index/engine/InternalEngineTests.java @@ -446,17 +446,12 @@ public class InternalEngineTests extends EngineTestCase { } } - public void testSegmentsWithSoftDeletes() throws Exception { - Settings.Builder settings = Settings.builder() - .put(defaultSettings.getSettings()) - .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true); - final IndexMetadata indexMetadata = IndexMetadata.builder(defaultSettings.getIndexMetadata()).settings(settings).build(); - final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings(indexMetadata); + public void testSegments() throws Exception { final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); try ( Store store = createStore(); InternalEngine engine = createEngine( - config(indexSettings, store, createTempDir(), NoMergePolicy.INSTANCE, null, null, globalCheckpoint::get) + config(defaultSettings, store, createTempDir(), NoMergePolicy.INSTANCE, null, null, globalCheckpoint::get) ) ) { assertThat(engine.segments(false), empty()); @@ -1530,18 +1525,12 @@ public class InternalEngineTests extends EngineTestCase { } public void testUpdateWithFullyDeletedSegments() throws IOException { - Settings.Builder settings = Settings.builder() - .put(defaultSettings.getSettings()) - .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true) - .put(IndexSettings.INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING.getKey(), Integer.MAX_VALUE); - final IndexMetadata indexMetadata = IndexMetadata.builder(defaultSettings.getIndexMetadata()).settings(settings).build(); - final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings(indexMetadata); final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); final Set liveDocs = new HashSet<>(); try ( Store store = createStore(); InternalEngine engine = createEngine( - config(indexSettings, store, createTempDir(), newMergePolicy(), null, null, globalCheckpoint::get) + config(defaultSettings, store, createTempDir(), newMergePolicy(), null, null, globalCheckpoint::get) ) ) { int numDocs = scaledRandomIntBetween(10, 100); @@ -1563,7 +1552,6 @@ public class InternalEngineTests extends EngineTestCase { final long retainedExtraOps = randomLongBetween(0, 10); Settings.Builder settings = Settings.builder() .put(defaultSettings.getSettings()) - .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true) .put(IndexSettings.INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING.getKey(), retainedExtraOps); final IndexMetadata indexMetadata = IndexMetadata.builder(defaultSettings.getIndexMetadata()).settings(settings).build(); final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings(indexMetadata); @@ -1643,7 +1631,6 @@ public class InternalEngineTests extends EngineTestCase { final long retainedExtraOps = randomLongBetween(0, 10); Settings.Builder settings = Settings.builder() .put(defaultSettings.getSettings()) - .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true) .put(IndexSettings.INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING.getKey(), retainedExtraOps); final IndexMetadata indexMetadata = IndexMetadata.builder(defaultSettings.getIndexMetadata()).settings(settings).build(); final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings(indexMetadata); @@ -4716,15 +4703,10 @@ public class InternalEngineTests extends EngineTestCase { } } Randomness.shuffle(operations); - Settings.Builder settings = Settings.builder() - .put(defaultSettings.getSettings()) - .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true); - final IndexMetadata indexMetadata = IndexMetadata.builder(defaultSettings.getIndexMetadata()).settings(settings).build(); - final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings(indexMetadata); Map latestOps = new HashMap<>(); // id -> latest seq_no try ( Store store = createStore(); - InternalEngine engine = createEngine(config(indexSettings, store, createTempDir(), newMergePolicy(), null)) + InternalEngine engine = createEngine(config(defaultSettings, store, createTempDir(), newMergePolicy(), null)) ) { CheckedRunnable lookupAndCheck = () -> { try (Engine.Searcher searcher = engine.acquireSearcher("test", Engine.SearcherScope.INTERNAL)) { @@ -6248,7 +6230,6 @@ public class InternalEngineTests extends EngineTestCase { ); Settings.Builder settings = Settings.builder() .put(defaultSettings.getSettings()) - .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true) .put(IndexSettings.INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING.getKey(), randomLongBetween(0, 10)); final IndexMetadata indexMetadata = IndexMetadata.builder(defaultSettings.getIndexMetadata()).settings(settings).build(); final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings(indexMetadata); @@ -6318,7 +6299,6 @@ public class InternalEngineTests extends EngineTestCase { ); Settings.Builder settings = Settings.builder() .put(defaultSettings.getSettings()) - .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true) .put(IndexSettings.INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING.getKey(), randomLongBetween(0, 10)); final IndexMetadata indexMetadata = IndexMetadata.builder(defaultSettings.getIndexMetadata()).settings(settings).build(); final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings(indexMetadata); @@ -6358,7 +6338,6 @@ public class InternalEngineTests extends EngineTestCase { IOUtils.close(engine, store); Settings.Builder settings = Settings.builder() .put(defaultSettings.getSettings()) - .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true) .put(IndexSettings.INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING.getKey(), randomLongBetween(0, 10)); final IndexMetadata indexMetadata = IndexMetadata.builder(defaultSettings.getIndexMetadata()).settings(settings).build(); final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings(indexMetadata); @@ -6490,17 +6469,10 @@ public class InternalEngineTests extends EngineTestCase { final MapperService mapperService = createMapperService("test"); final long maxSeqNo = randomLongBetween(10, 50); final AtomicLong refreshCounter = new AtomicLong(); - final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings( - IndexMetadata.builder(defaultSettings.getIndexMetadata()) - .settings( - Settings.builder().put(defaultSettings.getSettings()).put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true) - ) - .build() - ); try ( Store store = createStore(); InternalEngine engine = createEngine( - config(indexSettings, store, createTempDir(), newMergePolicy(), null, new ReferenceManager.RefreshListener() { + config(defaultSettings, store, createTempDir(), newMergePolicy(), null, new ReferenceManager.RefreshListener() { @Override public void beforeRefresh() { refreshCounter.incrementAndGet(); @@ -6561,17 +6533,9 @@ public class InternalEngineTests extends EngineTestCase { public void testNoOpOnClosingEngine() throws Exception { engine.close(); - Settings settings = Settings.builder() - .put(defaultSettings.getSettings()) - .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true) - .build(); - IndexSettings indexSettings = IndexSettingsModule.newIndexSettings( - IndexMetadata.builder(defaultSettings.getIndexMetadata()).settings(settings).build() - ); - assertTrue(indexSettings.isSoftDeleteEnabled()); try ( Store store = createStore(); - InternalEngine engine = createEngine(config(indexSettings, store, createTempDir(), NoMergePolicy.INSTANCE, null)) + InternalEngine engine = createEngine(config(defaultSettings, store, createTempDir(), NoMergePolicy.INSTANCE, null)) ) { engine.close(); expectThrows( @@ -6583,17 +6547,9 @@ public class InternalEngineTests extends EngineTestCase { public void testSoftDeleteOnClosingEngine() throws Exception { engine.close(); - Settings settings = Settings.builder() - .put(defaultSettings.getSettings()) - .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true) - .build(); - IndexSettings indexSettings = IndexSettingsModule.newIndexSettings( - IndexMetadata.builder(defaultSettings.getIndexMetadata()).settings(settings).build() - ); - assertTrue(indexSettings.isSoftDeleteEnabled()); try ( Store store = createStore(); - InternalEngine engine = createEngine(config(indexSettings, store, createTempDir(), NoMergePolicy.INSTANCE, null)) + InternalEngine engine = createEngine(config(defaultSettings, store, createTempDir(), NoMergePolicy.INSTANCE, null)) ) { engine.close(); expectThrows(AlreadyClosedException.class, () -> engine.delete(replicaDeleteForDoc("test", 42, 7, System.nanoTime()))); @@ -6637,19 +6593,13 @@ public class InternalEngineTests extends EngineTestCase { } public void testRebuildLocalCheckpointTrackerAndVersionMap() throws Exception { - Settings.Builder settings = Settings.builder() - .put(defaultSettings.getSettings()) - .put(IndexSettings.INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING.getKey(), 10000) - .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true); - final IndexMetadata indexMetadata = IndexMetadata.builder(defaultSettings.getIndexMetadata()).settings(settings).build(); - final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings(indexMetadata); final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); Path translogPath = createTempDir(); List operations = generateHistoryOnReplica(between(1, 500), randomBoolean(), randomBoolean(), randomBoolean()); List> commits = new ArrayList<>(); commits.add(new ArrayList<>()); try (Store store = createStore()) { - EngineConfig config = config(indexSettings, store, translogPath, NoMergePolicy.INSTANCE, null, null, globalCheckpoint::get); + EngineConfig config = config(defaultSettings, store, translogPath, NoMergePolicy.INSTANCE, null, null, globalCheckpoint::get); final List docs; try (InternalEngine engine = createEngine(config)) { List flushedOperations = new ArrayList<>(); @@ -6851,17 +6801,10 @@ public class InternalEngineTests extends EngineTestCase { public void testPruneAwayDeletedButRetainedIds() throws Exception { IOUtils.close(engine, store); - Settings settings = Settings.builder() - .put(defaultSettings.getSettings()) - .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true) - .build(); - IndexSettings indexSettings = IndexSettingsModule.newIndexSettings( - IndexMetadata.builder(defaultSettings.getIndexMetadata()).settings(settings).build() - ); - store = createStore(indexSettings, newDirectory()); + store = createStore(defaultSettings, newDirectory()); LogDocMergePolicy policy = new LogDocMergePolicy(); policy.setMinMergeDocs(10000); - try (InternalEngine engine = createEngine(indexSettings, store, createTempDir(), policy)) { + try (InternalEngine engine = createEngine(defaultSettings, store, createTempDir(), policy)) { int numDocs = between(1, 20); for (int i = 0; i < numDocs; i++) { index(engine, i); @@ -7038,13 +6981,6 @@ public class InternalEngineTests extends EngineTestCase { public void testNoOpFailure() throws IOException { engine.close(); - final Settings settings = Settings.builder() - .put(defaultSettings.getSettings()) - .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true) - .build(); - final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings( - IndexMetadata.builder(defaultSettings.getIndexMetadata()).settings(settings).build() - ); try (Store store = createStore(); Engine engine = createEngine((dir, iwc) -> new IndexWriter(dir, iwc) { @Override @@ -7052,7 +6988,7 @@ public class InternalEngineTests extends EngineTestCase { throw new IllegalArgumentException("fatal"); } - }, null, null, config(indexSettings, store, createTempDir(), NoMergePolicy.INSTANCE, null))) { + }, null, null, config(defaultSettings, store, createTempDir(), NoMergePolicy.INSTANCE, null))) { final Engine.NoOp op = new Engine.NoOp(0, 0, PRIMARY, System.currentTimeMillis(), "test"); final IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> engine.noOp(op)); assertThat(e.getMessage(), equalTo("fatal")); @@ -7063,31 +6999,17 @@ public class InternalEngineTests extends EngineTestCase { } } - public void testDeleteFailureSoftDeletesEnabledDocAlreadyDeleted() throws IOException { - runTestDeleteFailure(true, InternalEngine::delete); + public void testDeleteFailureDocAlreadyDeleted() throws IOException { + runTestDeleteFailure(InternalEngine::delete); } - public void testDeleteFailureSoftDeletesEnabled() throws IOException { - runTestDeleteFailure(true, (engine, op) -> {}); - } - - private void runTestDeleteFailure( - final boolean softDeletesEnabled, - final CheckedBiConsumer consumer - ) throws IOException { + private void runTestDeleteFailure(final CheckedBiConsumer consumer) throws IOException { engine.close(); - final Settings settings = Settings.builder() - .put(defaultSettings.getSettings()) - .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), softDeletesEnabled) - .build(); - final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings( - IndexMetadata.builder(defaultSettings.getIndexMetadata()).settings(settings).build() - ); final AtomicReference iw = new AtomicReference<>(); try (Store store = createStore(); InternalEngine engine = createEngine((dir, iwc) -> { iw.set(new ThrowingIndexWriter(dir, iwc)); return iw.get(); - }, null, null, config(indexSettings, store, createTempDir(), NoMergePolicy.INSTANCE, null))) { + }, null, null, config(defaultSettings, store, createTempDir(), NoMergePolicy.INSTANCE, null))) { engine.index(new Engine.Index(newUid("0"), primaryTerm.get(), InternalEngineTests.createParsedDoc("0", null))); final Engine.Delete op = new Engine.Delete("_doc", "0", newUid("0"), primaryTerm.get()); consumer.accept(engine, op); @@ -7348,7 +7270,6 @@ public class InternalEngineTests extends EngineTestCase { public void testMaxDocsOnPrimary() throws Exception { engine.close(); - final boolean softDeleteEnabled = engine.config().getIndexSettings().isSoftDeleteEnabled(); int maxDocs = randomIntBetween(1, 100); IndexWriterMaxDocsChanger.setMaxDocs(maxDocs); try { @@ -7357,7 +7278,7 @@ public class InternalEngineTests extends EngineTestCase { List operations = new ArrayList<>(numDocs); for (int i = 0; i < numDocs; i++) { final String id; - if (softDeleteEnabled == false || randomBoolean()) { + if (randomBoolean()) { id = Integer.toString(randomInt(numDocs)); operations.add(indexForDoc(createParsedDoc(id, null))); } else { @@ -7390,10 +7311,6 @@ public class InternalEngineTests extends EngineTestCase { } public void testMaxDocsOnReplica() throws Exception { - assumeTrue( - "Deletes do not add documents to Lucene with soft-deletes disabled", - engine.config().getIndexSettings().isSoftDeleteEnabled() - ); engine.close(); int maxDocs = randomIntBetween(1, 100); IndexWriterMaxDocsChanger.setMaxDocs(maxDocs); diff --git a/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java index ee2cde5c572..cdb2857c858 100644 --- a/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java @@ -3258,42 +3258,37 @@ public class IndexShardTests extends IndexShardTestCase { indexDoc(indexShard, "_doc", id); } // Need to update and sync the global checkpoint and the retention leases for the soft-deletes retention MergePolicy. - if (indexShard.indexSettings.isSoftDeleteEnabled()) { - final long newGlobalCheckpoint = indexShard.getLocalCheckpoint(); - if (indexShard.routingEntry().primary()) { - indexShard.updateLocalCheckpointForShard( - indexShard.routingEntry().allocationId().getId(), - indexShard.getLocalCheckpoint() - ); - indexShard.updateGlobalCheckpointForShard( - indexShard.routingEntry().allocationId().getId(), - indexShard.getLocalCheckpoint() - ); - indexShard.syncRetentionLeases(); - } else { - indexShard.updateGlobalCheckpointOnReplica(newGlobalCheckpoint, "test"); + final long newGlobalCheckpoint = indexShard.getLocalCheckpoint(); + if (indexShard.routingEntry().primary()) { + indexShard.updateLocalCheckpointForShard(indexShard.routingEntry().allocationId().getId(), indexShard.getLocalCheckpoint()); + indexShard.updateGlobalCheckpointForShard( + indexShard.routingEntry().allocationId().getId(), + indexShard.getLocalCheckpoint() + ); + indexShard.syncRetentionLeases(); + } else { + indexShard.updateGlobalCheckpointOnReplica(newGlobalCheckpoint, "test"); - final RetentionLeases retentionLeases = indexShard.getRetentionLeases(); - indexShard.updateRetentionLeasesOnReplica( - new RetentionLeases( - retentionLeases.primaryTerm(), - retentionLeases.version() + 1, - retentionLeases.leases() - .stream() - .map( - lease -> new RetentionLease( - lease.id(), - newGlobalCheckpoint + 1, - lease.timestamp(), - ReplicationTracker.PEER_RECOVERY_RETENTION_LEASE_SOURCE - ) + final RetentionLeases retentionLeases = indexShard.getRetentionLeases(); + indexShard.updateRetentionLeasesOnReplica( + new RetentionLeases( + retentionLeases.primaryTerm(), + retentionLeases.version() + 1, + retentionLeases.leases() + .stream() + .map( + lease -> new RetentionLease( + lease.id(), + newGlobalCheckpoint + 1, + lease.timestamp(), + ReplicationTracker.PEER_RECOVERY_RETENTION_LEASE_SOURCE ) - .collect(Collectors.toList()) - ) - ); - } - indexShard.sync(); + ) + .collect(Collectors.toList()) + ) + ); } + indexShard.sync(); // flush the buffered deletes final FlushRequest flushRequest = new FlushRequest(); flushRequest.force(false); @@ -3974,12 +3969,10 @@ public class IndexShardTests extends IndexShardTestCase { // Here we are testing that a fully deleted segment should be dropped and its memory usage is freed. // In order to instruct the merge policy not to keep a fully deleted segment, // we need to flush and make that commit safe so that the SoftDeletesPolicy can drop everything. - if (IndexSettings.INDEX_SOFT_DELETES_SETTING.get(settings)) { - primary.updateGlobalCheckpointForShard(primary.routingEntry().allocationId().getId(), primary.getLastSyncedGlobalCheckpoint()); - primary.syncRetentionLeases(); - primary.sync(); - flushShard(primary); - } + primary.updateGlobalCheckpointForShard(primary.routingEntry().allocationId().getId(), primary.getLastSyncedGlobalCheckpoint()); + primary.syncRetentionLeases(); + primary.sync(); + flushShard(primary); primary.refresh("force refresh"); ss = primary.segmentStats(randomBoolean(), randomBoolean()); diff --git a/test/framework/src/main/java/org/opensearch/index/engine/EngineTestCase.java b/test/framework/src/main/java/org/opensearch/index/engine/EngineTestCase.java index ac02bf92a90..af59e775103 100644 --- a/test/framework/src/main/java/org/opensearch/index/engine/EngineTestCase.java +++ b/test/framework/src/main/java/org/opensearch/index/engine/EngineTestCase.java @@ -1372,10 +1372,7 @@ public abstract class EngineTestCase extends OpenSearchTestCase { * Asserts the provided engine has a consistent document history between translog and Lucene index. */ public static void assertConsistentHistoryBetweenTranslogAndLuceneIndex(Engine engine, MapperService mapper) throws IOException { - if (mapper == null - || mapper.documentMapper() == null - || engine.config().getIndexSettings().isSoftDeleteEnabled() == false - || (engine instanceof InternalEngine) == false) { + if (mapper == null || mapper.documentMapper() == null || (engine instanceof InternalEngine) == false) { return; } final List translogOps = new ArrayList<>(); @@ -1397,8 +1394,12 @@ public abstract class EngineTestCase extends OpenSearchTestCase { final long globalCheckpoint = EngineTestCase.getTranslog(engine).getLastSyncedGlobalCheckpoint(); final long retainedOps = engine.config().getIndexSettings().getSoftDeleteRetentionOperations(); final long seqNoForRecovery; - try (Engine.IndexCommitRef safeCommit = engine.acquireSafeIndexCommit()) { - seqNoForRecovery = Long.parseLong(safeCommit.getIndexCommit().getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)) + 1; + if (engine.config().getIndexSettings().isSoftDeleteEnabled()) { + try (Engine.IndexCommitRef safeCommit = engine.acquireSafeIndexCommit()) { + seqNoForRecovery = Long.parseLong(safeCommit.getIndexCommit().getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)) + 1; + } + } else { + seqNoForRecovery = engine.getMinRetainedSeqNo(); } final long minSeqNoToRetain = Math.min(seqNoForRecovery, globalCheckpoint + 1 - retainedOps); for (Translog.Operation translogOp : translogOps) {