[Refactor] InternalEngine to always use soft deletes (#1933)

Soft Deletes have been enabled by default since Legacy version 7.0 and made
mandatory in Version 2.0.0. This commit refactors the InternalEngine to always
use soft-deletes. It is a follow on to making soft deletes mandatory in 2.0.0.

Signed-off-by: Nicholas Walter Knize <nknize@apache.org>
This commit is contained in:
Nick Knize 2022-01-26 12:28:24 -06:00 committed by GitHub
parent 56ae7fab63
commit 57ac788bb6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 132 additions and 257 deletions

View File

@ -719,6 +719,12 @@ public class FullClusterRestartIT extends AbstractFullClusterRestartTestCase {
* or not we have one. */
shouldHaveTranslog = randomBoolean();
Settings.Builder settings = Settings.builder();
if (minimumNodeVersion().before(Version.V_2_0_0) && randomBoolean()) {
settings.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), randomBoolean());
}
createIndex(index, settings.build());
indexRandomDocuments(count, true, true, i -> jsonBuilder().startObject().field("field", "value").endObject());
// make sure all recoveries are done

View File

@ -1248,9 +1248,7 @@ public class IndexStatsIT extends OpenSearchIntegTestCase {
client().prepareIndex("index", "type", "1").setSource("foo", "bar"),
client().prepareIndex("index", "type", "2").setSource("foo", "baz")
);
if (IndexSettings.INDEX_SOFT_DELETES_SETTING.get(settings)) {
persistGlobalCheckpoint("index"); // Need to persist the global checkpoint for the soft-deletes retention MP.
}
persistGlobalCheckpoint("index"); // Need to persist the global checkpoint for the soft-deletes retention MP.
refresh();
ensureGreen();
@ -1287,22 +1285,20 @@ public class IndexStatsIT extends OpenSearchIntegTestCase {
// Here we are testing that a fully deleted segment should be dropped and its cached is evicted.
// In order to instruct the merge policy not to keep a fully deleted segment,
// we need to flush and make that commit safe so that the SoftDeletesPolicy can drop everything.
if (IndexSettings.INDEX_SOFT_DELETES_SETTING.get(settings)) {
persistGlobalCheckpoint("index");
assertBusy(() -> {
for (final ShardStats shardStats : client().admin().indices().prepareStats("index").get().getIndex("index").getShards()) {
final long maxSeqNo = shardStats.getSeqNoStats().getMaxSeqNo();
assertTrue(
shardStats.getRetentionLeaseStats()
.retentionLeases()
.leases()
.stream()
.allMatch(retentionLease -> retentionLease.retainingSequenceNumber() == maxSeqNo + 1)
);
}
});
flush("index");
}
persistGlobalCheckpoint("index");
assertBusy(() -> {
for (final ShardStats shardStats : client().admin().indices().prepareStats("index").get().getIndex("index").getShards()) {
final long maxSeqNo = shardStats.getSeqNoStats().getMaxSeqNo();
assertTrue(
shardStats.getRetentionLeaseStats()
.retentionLeases()
.leases()
.stream()
.allMatch(retentionLease -> retentionLease.retainingSequenceNumber() == maxSeqNo + 1)
);
}
});
flush("index");
logger.info("--> force merging to a single segment");
ForceMergeResponse forceMergeResponse = client().admin()
.indices()

View File

@ -41,6 +41,7 @@ import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.IndexableField;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.LiveIndexWriterConfig;
import org.apache.lucene.index.MergePolicy;
@ -184,7 +185,6 @@ public class InternalEngine extends Engine {
private final CounterMetric numDocAppends = new CounterMetric();
private final CounterMetric numDocUpdates = new CounterMetric();
private final NumericDocValuesField softDeletesField = Lucene.newSoftDeletesField();
private final boolean softDeleteEnabled;
private final SoftDeletesPolicy softDeletesPolicy;
private final LastRefreshedCheckpointListener lastRefreshedCheckpointListener;
@ -262,7 +262,6 @@ public class InternalEngine extends Engine {
});
assert translog.getGeneration() != null;
this.translog = translog;
this.softDeleteEnabled = engineConfig.getIndexSettings().isSoftDeleteEnabled();
this.softDeletesPolicy = newSoftDeletesPolicy();
this.combinedDeletionPolicy = new CombinedDeletionPolicy(
logger,
@ -305,7 +304,7 @@ public class InternalEngine extends Engine {
this.lastRefreshedCheckpointListener = new LastRefreshedCheckpointListener(localCheckpointTracker.getProcessedCheckpoint());
this.internalReaderManager.addListener(lastRefreshedCheckpointListener);
maxSeqNoOfUpdatesOrDeletes = new AtomicLong(SequenceNumbers.max(localCheckpointTracker.getMaxSeqNo(), translog.getMaxSeqNo()));
if (softDeleteEnabled && localCheckpointTracker.getPersistedCheckpoint() < localCheckpointTracker.getMaxSeqNo()) {
if (localCheckpointTracker.getPersistedCheckpoint() < localCheckpointTracker.getMaxSeqNo()) {
try (Searcher searcher = acquireSearcher("restore_version_map_and_checkpoint_tracker", SearcherScope.INTERNAL)) {
restoreVersionMapAndCheckpointTracker(Lucene.wrapAllDocsLive(searcher.getDirectoryReader()));
} catch (IOException e) {
@ -621,7 +620,6 @@ public class InternalEngine extends Engine {
long startingSeqNo
) throws IOException {
if (historySource == HistorySource.INDEX) {
ensureSoftDeletesEnabled();
return newChangesSnapshot(reason, mapperService, Math.max(0, startingSeqNo), Long.MAX_VALUE, false);
} else {
return getTranslog().newSnapshot(startingSeqNo, Long.MAX_VALUE);
@ -639,7 +637,6 @@ public class InternalEngine extends Engine {
long startingSeqNo
) throws IOException {
if (historySource == HistorySource.INDEX) {
ensureSoftDeletesEnabled();
try (
Translog.Snapshot snapshot = newChangesSnapshot(reason, mapperService, Math.max(0, startingSeqNo), Long.MAX_VALUE, false)
) {
@ -863,8 +860,10 @@ public class InternalEngine extends Engine {
} else if (op.seqNo() > docAndSeqNo.seqNo) {
status = OpVsLuceneDocStatus.OP_NEWER;
} else if (op.seqNo() == docAndSeqNo.seqNo) {
assert localCheckpointTracker.hasProcessed(op.seqNo())
|| softDeleteEnabled == false : "local checkpoint tracker is not updated seq_no=" + op.seqNo() + " id=" + op.id();
assert localCheckpointTracker.hasProcessed(op.seqNo()) : "local checkpoint tracker is not updated seq_no="
+ op.seqNo()
+ " id="
+ op.id();
status = OpVsLuceneDocStatus.OP_STALE_OR_EQUAL;
} else {
status = OpVsLuceneDocStatus.OP_STALE_OR_EQUAL;
@ -1147,7 +1146,7 @@ public class InternalEngine extends Engine {
versionMap.enforceSafeAccess();
final OpVsLuceneDocStatus opVsLucene = compareOpToLuceneDocBasedOnSeqNo(index);
if (opVsLucene == OpVsLuceneDocStatus.OP_STALE_OR_EQUAL) {
plan = IndexingStrategy.processAsStaleOp(softDeleteEnabled, index.version());
plan = IndexingStrategy.processAsStaleOp(index.version());
} else {
plan = IndexingStrategy.processNormally(opVsLucene == OpVsLuceneDocStatus.LUCENE_DOC_NOT_FOUND, index.version(), 0);
}
@ -1324,7 +1323,6 @@ public class InternalEngine extends Engine {
}
private void addStaleDocs(final List<ParseContext.Document> docs, final IndexWriter indexWriter) throws IOException {
assert softDeleteEnabled : "Add history documents but soft-deletes is disabled";
for (ParseContext.Document doc : docs) {
doc.add(softDeletesField); // soft-deleted every document before adding to Lucene
}
@ -1402,8 +1400,8 @@ public class InternalEngine extends Engine {
return new IndexingStrategy(currentNotFoundOrDeleted, false, false, false, versionForIndexing, 0, null);
}
static IndexingStrategy processAsStaleOp(boolean addStaleOpToLucene, long versionForIndexing) {
return new IndexingStrategy(false, false, false, addStaleOpToLucene, versionForIndexing, 0, null);
static IndexingStrategy processAsStaleOp(long versionForIndexing) {
return new IndexingStrategy(false, false, false, true, versionForIndexing, 0, null);
}
static IndexingStrategy failAsTooManyDocs(Exception e) {
@ -1437,18 +1435,10 @@ public class InternalEngine extends Engine {
}
private void updateDocs(final Term uid, final List<ParseContext.Document> docs, final IndexWriter indexWriter) throws IOException {
if (softDeleteEnabled) {
if (docs.size() > 1) {
indexWriter.softUpdateDocuments(uid, docs, softDeletesField);
} else {
indexWriter.softUpdateDocument(uid, docs.get(0), softDeletesField);
}
if (docs.size() > 1) {
indexWriter.softUpdateDocuments(uid, docs, softDeletesField);
} else {
if (docs.size() > 1) {
indexWriter.updateDocuments(uid, docs);
} else {
indexWriter.updateDocument(uid, docs.get(0));
}
indexWriter.softUpdateDocument(uid, docs.get(0), softDeletesField);
}
numDocUpdates.inc(docs.size());
}
@ -1495,6 +1485,18 @@ public class InternalEngine extends Engine {
if (plan.deleteFromLucene || plan.addStaleOpToLucene) {
deleteResult = deleteInLucene(delete, plan);
if (plan.deleteFromLucene) {
numDocDeletes.inc();
versionMap.putDeleteUnderLock(
delete.uid().bytes(),
new DeleteVersionValue(
plan.versionOfDeletion,
delete.seqNo(),
delete.primaryTerm(),
engineConfig.getThreadPool().relativeTimeInMillis()
)
);
}
} else {
deleteResult = new DeleteResult(
plan.versionOfDeletion,
@ -1577,7 +1579,7 @@ public class InternalEngine extends Engine {
} else {
final OpVsLuceneDocStatus opVsLucene = compareOpToLuceneDocBasedOnSeqNo(delete);
if (opVsLucene == OpVsLuceneDocStatus.OP_STALE_OR_EQUAL) {
plan = DeletionStrategy.processAsStaleOp(softDeleteEnabled, delete.version());
plan = DeletionStrategy.processAsStaleOp(delete.version());
} else {
plan = DeletionStrategy.processNormally(opVsLucene == OpVsLuceneDocStatus.LUCENE_DOC_NOT_FOUND, delete.version(), 0);
}
@ -1649,37 +1651,19 @@ public class InternalEngine extends Engine {
private DeleteResult deleteInLucene(Delete delete, DeletionStrategy plan) throws IOException {
assert assertMaxSeqNoOfUpdatesIsAdvanced(delete.uid(), delete.seqNo(), false, false);
try {
if (softDeleteEnabled) {
final ParsedDocument tombstone = engineConfig.getTombstoneDocSupplier().newDeleteTombstoneDoc(delete.type(), delete.id());
assert tombstone.docs().size() == 1 : "Tombstone doc should have single doc [" + tombstone + "]";
tombstone.updateSeqID(delete.seqNo(), delete.primaryTerm());
tombstone.version().setLongValue(plan.versionOfDeletion);
final ParseContext.Document doc = tombstone.docs().get(0);
assert doc.getField(SeqNoFieldMapper.TOMBSTONE_NAME) != null : "Delete tombstone document but _tombstone field is not set ["
+ doc
+ " ]";
doc.add(softDeletesField);
if (plan.addStaleOpToLucene || plan.currentlyDeleted) {
indexWriter.addDocument(doc);
} else {
indexWriter.softUpdateDocument(delete.uid(), doc, softDeletesField);
}
} else if (plan.currentlyDeleted == false) {
// any exception that comes from this is a either an ACE or a fatal exception there
// can't be any document failures coming from this
indexWriter.deleteDocuments(delete.uid());
}
if (plan.deleteFromLucene) {
numDocDeletes.inc();
versionMap.putDeleteUnderLock(
delete.uid().bytes(),
new DeleteVersionValue(
plan.versionOfDeletion,
delete.seqNo(),
delete.primaryTerm(),
engineConfig.getThreadPool().relativeTimeInMillis()
)
);
final ParsedDocument tombstone = engineConfig.getTombstoneDocSupplier().newDeleteTombstoneDoc(delete.type(), delete.id());
assert tombstone.docs().size() == 1 : "Tombstone doc should have single doc [" + tombstone + "]";
tombstone.updateSeqID(delete.seqNo(), delete.primaryTerm());
tombstone.version().setLongValue(plan.versionOfDeletion);
final ParseContext.Document doc = tombstone.docs().get(0);
assert doc.getField(SeqNoFieldMapper.TOMBSTONE_NAME) != null : "Delete tombstone document but _tombstone field is not set ["
+ doc
+ " ]";
doc.add(softDeletesField);
if (plan.addStaleOpToLucene || plan.currentlyDeleted) {
indexWriter.addDocument(doc);
} else {
indexWriter.softUpdateDocument(delete.uid(), doc, softDeletesField);
}
return new DeleteResult(plan.versionOfDeletion, delete.primaryTerm(), delete.seqNo(), plan.currentlyDeleted == false);
} catch (final Exception ex) {
@ -1759,8 +1743,8 @@ public class InternalEngine extends Engine {
return new DeletionStrategy(false, false, currentlyDeleted, versionOfDeletion, 0, null);
}
static DeletionStrategy processAsStaleOp(boolean addStaleOpToLucene, long versionOfDeletion) {
return new DeletionStrategy(false, addStaleOpToLucene, false, versionOfDeletion, 0, null);
static DeletionStrategy processAsStaleOp(long versionOfDeletion) {
return new DeletionStrategy(false, true, false, versionOfDeletion, 0, null);
}
static DeletionStrategy failAsTooManyDocs(Exception e) {
@ -1817,7 +1801,7 @@ public class InternalEngine extends Engine {
);
} else {
markSeqNoAsSeen(noOp.seqNo());
if (softDeleteEnabled && hasBeenProcessedBefore(noOp) == false) {
if (hasBeenProcessedBefore(noOp) == false) {
try {
final ParsedDocument tombstone = engineConfig.getTombstoneDocSupplier().newNoopTombstoneDoc(noOp.reason());
tombstone.updateSeqID(noOp.seqNo(), noOp.primaryTerm());
@ -2545,17 +2529,15 @@ public class InternalEngine extends Engine {
MergePolicy mergePolicy = config().getMergePolicy();
// always configure soft-deletes field so an engine with soft-deletes disabled can open a Lucene index with soft-deletes.
iwc.setSoftDeletesField(Lucene.SOFT_DELETES_FIELD);
if (softDeleteEnabled) {
mergePolicy = new RecoverySourcePruneMergePolicy(
SourceFieldMapper.RECOVERY_SOURCE_NAME,
mergePolicy = new RecoverySourcePruneMergePolicy(
SourceFieldMapper.RECOVERY_SOURCE_NAME,
softDeletesPolicy::getRetentionQuery,
new SoftDeletesRetentionMergePolicy(
Lucene.SOFT_DELETES_FIELD,
softDeletesPolicy::getRetentionQuery,
new SoftDeletesRetentionMergePolicy(
Lucene.SOFT_DELETES_FIELD,
softDeletesPolicy::getRetentionQuery,
new PrunePostingsMergePolicy(mergePolicy, IdFieldMapper.NAME)
)
);
}
new PrunePostingsMergePolicy(mergePolicy, IdFieldMapper.NAME)
)
);
boolean shuffleForcedMerge = Booleans.parseBoolean(System.getProperty("opensearch.shuffle_forced_merge", Boolean.TRUE.toString()));
if (shuffleForcedMerge) {
// We wrap the merge policy for all indices even though it is mostly useful for time-based indices
@ -2753,9 +2735,7 @@ public class InternalEngine extends Engine {
commitData.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(localCheckpointTracker.getMaxSeqNo()));
commitData.put(MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID, Long.toString(maxUnsafeAutoIdTimestamp.get()));
commitData.put(HISTORY_UUID_KEY, historyUUID);
if (softDeleteEnabled) {
commitData.put(Engine.MIN_RETAINED_SEQNO, Long.toString(softDeletesPolicy.getMinRetainedSeqNo()));
}
commitData.put(Engine.MIN_RETAINED_SEQNO, Long.toString(softDeletesPolicy.getMinRetainedSeqNo()));
final String currentForceMergeUUID = forceMergeUUID;
if (currentForceMergeUUID != null) {
commitData.put(FORCE_MERGE_UUID_KEY, currentForceMergeUUID);
@ -2922,13 +2902,6 @@ public class InternalEngine extends Engine {
return numDocUpdates.count();
}
private void ensureSoftDeletesEnabled() {
if (softDeleteEnabled == false) {
assert false : "index " + shardId.getIndex() + " does not have soft-deletes enabled";
throw new IllegalStateException("index " + shardId.getIndex() + " does not have soft-deletes enabled");
}
}
@Override
public Translog.Snapshot newChangesSnapshot(
String source,
@ -2953,7 +2926,6 @@ public class InternalEngine extends Engine {
long toSeqNo,
boolean requiredFullRange
) throws IOException {
ensureSoftDeletesEnabled();
ensureOpen();
refreshIfNeeded(source, toSeqNo);
Searcher searcher = acquireSearcher(source, SearcherScope.INTERNAL);
@ -2984,7 +2956,6 @@ public class InternalEngine extends Engine {
public boolean hasCompleteOperationHistory(String reason, HistorySource historySource, MapperService mapperService, long startingSeqNo)
throws IOException {
if (historySource == HistorySource.INDEX) {
ensureSoftDeletesEnabled();
return getMinRetainedSeqNo() <= startingSeqNo;
} else {
final long currentLocalCheckpoint = localCheckpointTracker.getProcessedCheckpoint();
@ -3010,14 +2981,12 @@ public class InternalEngine extends Engine {
* Operations whose seq# are at least this value should exist in the Lucene index.
*/
public final long getMinRetainedSeqNo() {
ensureSoftDeletesEnabled();
return softDeletesPolicy.getMinRetainedSeqNo();
}
@Override
public Closeable acquireHistoryRetentionLock(HistorySource historySource) {
if (historySource == HistorySource.INDEX) {
ensureSoftDeletesEnabled();
return softDeletesPolicy.acquireRetentionLock();
} else {
return translog.acquireRetentionLock();
@ -3035,15 +3004,14 @@ public class InternalEngine extends Engine {
return commitData;
}
private final class AssertingIndexWriter extends IndexWriter {
private static class AssertingIndexWriter extends IndexWriter {
AssertingIndexWriter(Directory d, IndexWriterConfig conf) throws IOException {
super(d, conf);
}
@Override
public long deleteDocuments(Term... terms) throws IOException {
assert softDeleteEnabled == false : "Call #deleteDocuments but soft-deletes is enabled";
return super.deleteDocuments(terms);
public long updateDocuments(Term delTerm, Iterable<? extends Iterable<? extends IndexableField>> docs) {
throw new AssertionError("must not hard update documents");
}
@Override

View File

@ -200,16 +200,13 @@ public class ReadOnlyEngine extends Engine {
DirectoryReader reader,
Function<DirectoryReader, DirectoryReader> readerWrapperFunction
) throws IOException {
if (engineConfig.getIndexSettings().isSoftDeleteEnabled()) {
reader = new SoftDeletesDirectoryReaderWrapper(reader, Lucene.SOFT_DELETES_FIELD);
}
reader = readerWrapperFunction.apply(reader);
return OpenSearchDirectoryReader.wrap(reader, engineConfig.getShardId());
}
protected DirectoryReader open(IndexCommit commit) throws IOException {
assert Transports.assertNotTransportThread("opening index commit of a read-only engine");
return DirectoryReader.open(commit);
return new SoftDeletesDirectoryReaderWrapper(DirectoryReader.open(commit), Lucene.SOFT_DELETES_FIELD);
}
@Override
@ -337,10 +334,7 @@ public class ReadOnlyEngine extends Engine {
long fromSeqNo,
long toSeqNo,
boolean requiredFullRange
) throws IOException {
if (engineConfig.getIndexSettings().isSoftDeleteEnabled() == false) {
throw new IllegalStateException("accessing changes snapshot requires soft-deletes enabled");
}
) {
return newEmptySnapshot();
}

View File

@ -190,7 +190,7 @@ public class SourceFieldMapper extends MetadataFieldMapper {
context.doc().add(new StoredField(fieldType().name(), ref.bytes, ref.offset, ref.length));
}
if (originalSource != null && adaptedSource != originalSource && context.indexSettings().isSoftDeleteEnabled()) {
if (originalSource != null && adaptedSource != originalSource) {
// if we omitted source or modified it we add the _recovery_source to ensure we have it for ops based recovery
BytesRef ref = originalSource.toBytesRef();
context.doc().add(new StoredField(RECOVERY_SOURCE_NAME, ref.bytes, ref.offset, ref.length));

View File

@ -446,17 +446,12 @@ public class InternalEngineTests extends EngineTestCase {
}
}
public void testSegmentsWithSoftDeletes() throws Exception {
Settings.Builder settings = Settings.builder()
.put(defaultSettings.getSettings())
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true);
final IndexMetadata indexMetadata = IndexMetadata.builder(defaultSettings.getIndexMetadata()).settings(settings).build();
final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings(indexMetadata);
public void testSegments() throws Exception {
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
try (
Store store = createStore();
InternalEngine engine = createEngine(
config(indexSettings, store, createTempDir(), NoMergePolicy.INSTANCE, null, null, globalCheckpoint::get)
config(defaultSettings, store, createTempDir(), NoMergePolicy.INSTANCE, null, null, globalCheckpoint::get)
)
) {
assertThat(engine.segments(false), empty());
@ -1530,18 +1525,12 @@ public class InternalEngineTests extends EngineTestCase {
}
public void testUpdateWithFullyDeletedSegments() throws IOException {
Settings.Builder settings = Settings.builder()
.put(defaultSettings.getSettings())
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true)
.put(IndexSettings.INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING.getKey(), Integer.MAX_VALUE);
final IndexMetadata indexMetadata = IndexMetadata.builder(defaultSettings.getIndexMetadata()).settings(settings).build();
final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings(indexMetadata);
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
final Set<String> liveDocs = new HashSet<>();
try (
Store store = createStore();
InternalEngine engine = createEngine(
config(indexSettings, store, createTempDir(), newMergePolicy(), null, null, globalCheckpoint::get)
config(defaultSettings, store, createTempDir(), newMergePolicy(), null, null, globalCheckpoint::get)
)
) {
int numDocs = scaledRandomIntBetween(10, 100);
@ -1563,7 +1552,6 @@ public class InternalEngineTests extends EngineTestCase {
final long retainedExtraOps = randomLongBetween(0, 10);
Settings.Builder settings = Settings.builder()
.put(defaultSettings.getSettings())
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true)
.put(IndexSettings.INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING.getKey(), retainedExtraOps);
final IndexMetadata indexMetadata = IndexMetadata.builder(defaultSettings.getIndexMetadata()).settings(settings).build();
final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings(indexMetadata);
@ -1643,7 +1631,6 @@ public class InternalEngineTests extends EngineTestCase {
final long retainedExtraOps = randomLongBetween(0, 10);
Settings.Builder settings = Settings.builder()
.put(defaultSettings.getSettings())
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true)
.put(IndexSettings.INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING.getKey(), retainedExtraOps);
final IndexMetadata indexMetadata = IndexMetadata.builder(defaultSettings.getIndexMetadata()).settings(settings).build();
final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings(indexMetadata);
@ -4716,15 +4703,10 @@ public class InternalEngineTests extends EngineTestCase {
}
}
Randomness.shuffle(operations);
Settings.Builder settings = Settings.builder()
.put(defaultSettings.getSettings())
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true);
final IndexMetadata indexMetadata = IndexMetadata.builder(defaultSettings.getIndexMetadata()).settings(settings).build();
final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings(indexMetadata);
Map<String, Engine.Operation> latestOps = new HashMap<>(); // id -> latest seq_no
try (
Store store = createStore();
InternalEngine engine = createEngine(config(indexSettings, store, createTempDir(), newMergePolicy(), null))
InternalEngine engine = createEngine(config(defaultSettings, store, createTempDir(), newMergePolicy(), null))
) {
CheckedRunnable<IOException> lookupAndCheck = () -> {
try (Engine.Searcher searcher = engine.acquireSearcher("test", Engine.SearcherScope.INTERNAL)) {
@ -6248,7 +6230,6 @@ public class InternalEngineTests extends EngineTestCase {
);
Settings.Builder settings = Settings.builder()
.put(defaultSettings.getSettings())
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true)
.put(IndexSettings.INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING.getKey(), randomLongBetween(0, 10));
final IndexMetadata indexMetadata = IndexMetadata.builder(defaultSettings.getIndexMetadata()).settings(settings).build();
final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings(indexMetadata);
@ -6318,7 +6299,6 @@ public class InternalEngineTests extends EngineTestCase {
);
Settings.Builder settings = Settings.builder()
.put(defaultSettings.getSettings())
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true)
.put(IndexSettings.INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING.getKey(), randomLongBetween(0, 10));
final IndexMetadata indexMetadata = IndexMetadata.builder(defaultSettings.getIndexMetadata()).settings(settings).build();
final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings(indexMetadata);
@ -6358,7 +6338,6 @@ public class InternalEngineTests extends EngineTestCase {
IOUtils.close(engine, store);
Settings.Builder settings = Settings.builder()
.put(defaultSettings.getSettings())
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true)
.put(IndexSettings.INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING.getKey(), randomLongBetween(0, 10));
final IndexMetadata indexMetadata = IndexMetadata.builder(defaultSettings.getIndexMetadata()).settings(settings).build();
final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings(indexMetadata);
@ -6490,17 +6469,10 @@ public class InternalEngineTests extends EngineTestCase {
final MapperService mapperService = createMapperService("test");
final long maxSeqNo = randomLongBetween(10, 50);
final AtomicLong refreshCounter = new AtomicLong();
final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings(
IndexMetadata.builder(defaultSettings.getIndexMetadata())
.settings(
Settings.builder().put(defaultSettings.getSettings()).put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true)
)
.build()
);
try (
Store store = createStore();
InternalEngine engine = createEngine(
config(indexSettings, store, createTempDir(), newMergePolicy(), null, new ReferenceManager.RefreshListener() {
config(defaultSettings, store, createTempDir(), newMergePolicy(), null, new ReferenceManager.RefreshListener() {
@Override
public void beforeRefresh() {
refreshCounter.incrementAndGet();
@ -6561,17 +6533,9 @@ public class InternalEngineTests extends EngineTestCase {
public void testNoOpOnClosingEngine() throws Exception {
engine.close();
Settings settings = Settings.builder()
.put(defaultSettings.getSettings())
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true)
.build();
IndexSettings indexSettings = IndexSettingsModule.newIndexSettings(
IndexMetadata.builder(defaultSettings.getIndexMetadata()).settings(settings).build()
);
assertTrue(indexSettings.isSoftDeleteEnabled());
try (
Store store = createStore();
InternalEngine engine = createEngine(config(indexSettings, store, createTempDir(), NoMergePolicy.INSTANCE, null))
InternalEngine engine = createEngine(config(defaultSettings, store, createTempDir(), NoMergePolicy.INSTANCE, null))
) {
engine.close();
expectThrows(
@ -6583,17 +6547,9 @@ public class InternalEngineTests extends EngineTestCase {
public void testSoftDeleteOnClosingEngine() throws Exception {
engine.close();
Settings settings = Settings.builder()
.put(defaultSettings.getSettings())
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true)
.build();
IndexSettings indexSettings = IndexSettingsModule.newIndexSettings(
IndexMetadata.builder(defaultSettings.getIndexMetadata()).settings(settings).build()
);
assertTrue(indexSettings.isSoftDeleteEnabled());
try (
Store store = createStore();
InternalEngine engine = createEngine(config(indexSettings, store, createTempDir(), NoMergePolicy.INSTANCE, null))
InternalEngine engine = createEngine(config(defaultSettings, store, createTempDir(), NoMergePolicy.INSTANCE, null))
) {
engine.close();
expectThrows(AlreadyClosedException.class, () -> engine.delete(replicaDeleteForDoc("test", 42, 7, System.nanoTime())));
@ -6637,19 +6593,13 @@ public class InternalEngineTests extends EngineTestCase {
}
public void testRebuildLocalCheckpointTrackerAndVersionMap() throws Exception {
Settings.Builder settings = Settings.builder()
.put(defaultSettings.getSettings())
.put(IndexSettings.INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING.getKey(), 10000)
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true);
final IndexMetadata indexMetadata = IndexMetadata.builder(defaultSettings.getIndexMetadata()).settings(settings).build();
final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings(indexMetadata);
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
Path translogPath = createTempDir();
List<Engine.Operation> operations = generateHistoryOnReplica(between(1, 500), randomBoolean(), randomBoolean(), randomBoolean());
List<List<Engine.Operation>> commits = new ArrayList<>();
commits.add(new ArrayList<>());
try (Store store = createStore()) {
EngineConfig config = config(indexSettings, store, translogPath, NoMergePolicy.INSTANCE, null, null, globalCheckpoint::get);
EngineConfig config = config(defaultSettings, store, translogPath, NoMergePolicy.INSTANCE, null, null, globalCheckpoint::get);
final List<DocIdSeqNoAndSource> docs;
try (InternalEngine engine = createEngine(config)) {
List<Engine.Operation> flushedOperations = new ArrayList<>();
@ -6851,17 +6801,10 @@ public class InternalEngineTests extends EngineTestCase {
public void testPruneAwayDeletedButRetainedIds() throws Exception {
IOUtils.close(engine, store);
Settings settings = Settings.builder()
.put(defaultSettings.getSettings())
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true)
.build();
IndexSettings indexSettings = IndexSettingsModule.newIndexSettings(
IndexMetadata.builder(defaultSettings.getIndexMetadata()).settings(settings).build()
);
store = createStore(indexSettings, newDirectory());
store = createStore(defaultSettings, newDirectory());
LogDocMergePolicy policy = new LogDocMergePolicy();
policy.setMinMergeDocs(10000);
try (InternalEngine engine = createEngine(indexSettings, store, createTempDir(), policy)) {
try (InternalEngine engine = createEngine(defaultSettings, store, createTempDir(), policy)) {
int numDocs = between(1, 20);
for (int i = 0; i < numDocs; i++) {
index(engine, i);
@ -7038,13 +6981,6 @@ public class InternalEngineTests extends EngineTestCase {
public void testNoOpFailure() throws IOException {
engine.close();
final Settings settings = Settings.builder()
.put(defaultSettings.getSettings())
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true)
.build();
final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings(
IndexMetadata.builder(defaultSettings.getIndexMetadata()).settings(settings).build()
);
try (Store store = createStore(); Engine engine = createEngine((dir, iwc) -> new IndexWriter(dir, iwc) {
@Override
@ -7052,7 +6988,7 @@ public class InternalEngineTests extends EngineTestCase {
throw new IllegalArgumentException("fatal");
}
}, null, null, config(indexSettings, store, createTempDir(), NoMergePolicy.INSTANCE, null))) {
}, null, null, config(defaultSettings, store, createTempDir(), NoMergePolicy.INSTANCE, null))) {
final Engine.NoOp op = new Engine.NoOp(0, 0, PRIMARY, System.currentTimeMillis(), "test");
final IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> engine.noOp(op));
assertThat(e.getMessage(), equalTo("fatal"));
@ -7063,31 +6999,17 @@ public class InternalEngineTests extends EngineTestCase {
}
}
public void testDeleteFailureSoftDeletesEnabledDocAlreadyDeleted() throws IOException {
runTestDeleteFailure(true, InternalEngine::delete);
public void testDeleteFailureDocAlreadyDeleted() throws IOException {
runTestDeleteFailure(InternalEngine::delete);
}
public void testDeleteFailureSoftDeletesEnabled() throws IOException {
runTestDeleteFailure(true, (engine, op) -> {});
}
private void runTestDeleteFailure(
final boolean softDeletesEnabled,
final CheckedBiConsumer<InternalEngine, Engine.Delete, IOException> consumer
) throws IOException {
private void runTestDeleteFailure(final CheckedBiConsumer<InternalEngine, Engine.Delete, IOException> consumer) throws IOException {
engine.close();
final Settings settings = Settings.builder()
.put(defaultSettings.getSettings())
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), softDeletesEnabled)
.build();
final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings(
IndexMetadata.builder(defaultSettings.getIndexMetadata()).settings(settings).build()
);
final AtomicReference<ThrowingIndexWriter> iw = new AtomicReference<>();
try (Store store = createStore(); InternalEngine engine = createEngine((dir, iwc) -> {
iw.set(new ThrowingIndexWriter(dir, iwc));
return iw.get();
}, null, null, config(indexSettings, store, createTempDir(), NoMergePolicy.INSTANCE, null))) {
}, null, null, config(defaultSettings, store, createTempDir(), NoMergePolicy.INSTANCE, null))) {
engine.index(new Engine.Index(newUid("0"), primaryTerm.get(), InternalEngineTests.createParsedDoc("0", null)));
final Engine.Delete op = new Engine.Delete("_doc", "0", newUid("0"), primaryTerm.get());
consumer.accept(engine, op);
@ -7348,7 +7270,6 @@ public class InternalEngineTests extends EngineTestCase {
public void testMaxDocsOnPrimary() throws Exception {
engine.close();
final boolean softDeleteEnabled = engine.config().getIndexSettings().isSoftDeleteEnabled();
int maxDocs = randomIntBetween(1, 100);
IndexWriterMaxDocsChanger.setMaxDocs(maxDocs);
try {
@ -7357,7 +7278,7 @@ public class InternalEngineTests extends EngineTestCase {
List<Engine.Operation> operations = new ArrayList<>(numDocs);
for (int i = 0; i < numDocs; i++) {
final String id;
if (softDeleteEnabled == false || randomBoolean()) {
if (randomBoolean()) {
id = Integer.toString(randomInt(numDocs));
operations.add(indexForDoc(createParsedDoc(id, null)));
} else {
@ -7390,10 +7311,6 @@ public class InternalEngineTests extends EngineTestCase {
}
public void testMaxDocsOnReplica() throws Exception {
assumeTrue(
"Deletes do not add documents to Lucene with soft-deletes disabled",
engine.config().getIndexSettings().isSoftDeleteEnabled()
);
engine.close();
int maxDocs = randomIntBetween(1, 100);
IndexWriterMaxDocsChanger.setMaxDocs(maxDocs);

View File

@ -3258,42 +3258,37 @@ public class IndexShardTests extends IndexShardTestCase {
indexDoc(indexShard, "_doc", id);
}
// Need to update and sync the global checkpoint and the retention leases for the soft-deletes retention MergePolicy.
if (indexShard.indexSettings.isSoftDeleteEnabled()) {
final long newGlobalCheckpoint = indexShard.getLocalCheckpoint();
if (indexShard.routingEntry().primary()) {
indexShard.updateLocalCheckpointForShard(
indexShard.routingEntry().allocationId().getId(),
indexShard.getLocalCheckpoint()
);
indexShard.updateGlobalCheckpointForShard(
indexShard.routingEntry().allocationId().getId(),
indexShard.getLocalCheckpoint()
);
indexShard.syncRetentionLeases();
} else {
indexShard.updateGlobalCheckpointOnReplica(newGlobalCheckpoint, "test");
final long newGlobalCheckpoint = indexShard.getLocalCheckpoint();
if (indexShard.routingEntry().primary()) {
indexShard.updateLocalCheckpointForShard(indexShard.routingEntry().allocationId().getId(), indexShard.getLocalCheckpoint());
indexShard.updateGlobalCheckpointForShard(
indexShard.routingEntry().allocationId().getId(),
indexShard.getLocalCheckpoint()
);
indexShard.syncRetentionLeases();
} else {
indexShard.updateGlobalCheckpointOnReplica(newGlobalCheckpoint, "test");
final RetentionLeases retentionLeases = indexShard.getRetentionLeases();
indexShard.updateRetentionLeasesOnReplica(
new RetentionLeases(
retentionLeases.primaryTerm(),
retentionLeases.version() + 1,
retentionLeases.leases()
.stream()
.map(
lease -> new RetentionLease(
lease.id(),
newGlobalCheckpoint + 1,
lease.timestamp(),
ReplicationTracker.PEER_RECOVERY_RETENTION_LEASE_SOURCE
)
final RetentionLeases retentionLeases = indexShard.getRetentionLeases();
indexShard.updateRetentionLeasesOnReplica(
new RetentionLeases(
retentionLeases.primaryTerm(),
retentionLeases.version() + 1,
retentionLeases.leases()
.stream()
.map(
lease -> new RetentionLease(
lease.id(),
newGlobalCheckpoint + 1,
lease.timestamp(),
ReplicationTracker.PEER_RECOVERY_RETENTION_LEASE_SOURCE
)
.collect(Collectors.toList())
)
);
}
indexShard.sync();
)
.collect(Collectors.toList())
)
);
}
indexShard.sync();
// flush the buffered deletes
final FlushRequest flushRequest = new FlushRequest();
flushRequest.force(false);
@ -3974,12 +3969,10 @@ public class IndexShardTests extends IndexShardTestCase {
// Here we are testing that a fully deleted segment should be dropped and its memory usage is freed.
// In order to instruct the merge policy not to keep a fully deleted segment,
// we need to flush and make that commit safe so that the SoftDeletesPolicy can drop everything.
if (IndexSettings.INDEX_SOFT_DELETES_SETTING.get(settings)) {
primary.updateGlobalCheckpointForShard(primary.routingEntry().allocationId().getId(), primary.getLastSyncedGlobalCheckpoint());
primary.syncRetentionLeases();
primary.sync();
flushShard(primary);
}
primary.updateGlobalCheckpointForShard(primary.routingEntry().allocationId().getId(), primary.getLastSyncedGlobalCheckpoint());
primary.syncRetentionLeases();
primary.sync();
flushShard(primary);
primary.refresh("force refresh");
ss = primary.segmentStats(randomBoolean(), randomBoolean());

View File

@ -1372,10 +1372,7 @@ public abstract class EngineTestCase extends OpenSearchTestCase {
* Asserts the provided engine has a consistent document history between translog and Lucene index.
*/
public static void assertConsistentHistoryBetweenTranslogAndLuceneIndex(Engine engine, MapperService mapper) throws IOException {
if (mapper == null
|| mapper.documentMapper() == null
|| engine.config().getIndexSettings().isSoftDeleteEnabled() == false
|| (engine instanceof InternalEngine) == false) {
if (mapper == null || mapper.documentMapper() == null || (engine instanceof InternalEngine) == false) {
return;
}
final List<Translog.Operation> translogOps = new ArrayList<>();
@ -1397,8 +1394,12 @@ public abstract class EngineTestCase extends OpenSearchTestCase {
final long globalCheckpoint = EngineTestCase.getTranslog(engine).getLastSyncedGlobalCheckpoint();
final long retainedOps = engine.config().getIndexSettings().getSoftDeleteRetentionOperations();
final long seqNoForRecovery;
try (Engine.IndexCommitRef safeCommit = engine.acquireSafeIndexCommit()) {
seqNoForRecovery = Long.parseLong(safeCommit.getIndexCommit().getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)) + 1;
if (engine.config().getIndexSettings().isSoftDeleteEnabled()) {
try (Engine.IndexCommitRef safeCommit = engine.acquireSafeIndexCommit()) {
seqNoForRecovery = Long.parseLong(safeCommit.getIndexCommit().getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)) + 1;
}
} else {
seqNoForRecovery = engine.getMinRetainedSeqNo();
}
final long minSeqNoToRetain = Math.min(seqNoForRecovery, globalCheckpoint + 1 - retainedOps);
for (Translog.Operation translogOp : translogOps) {