diff --git a/server/src/main/java/org/elasticsearch/common/settings/IndexScopedSettings.java b/server/src/main/java/org/elasticsearch/common/settings/IndexScopedSettings.java index c4b7369d606..3e7ee0753df 100644 --- a/server/src/main/java/org/elasticsearch/common/settings/IndexScopedSettings.java +++ b/server/src/main/java/org/elasticsearch/common/settings/IndexScopedSettings.java @@ -131,6 +131,7 @@ public final class IndexScopedSettings extends AbstractScopedSettings { ShardsLimitAllocationDecider.INDEX_TOTAL_SHARDS_PER_NODE_SETTING, IndexSettings.INDEX_GC_DELETES_SETTING, IndexSettings.INDEX_SOFT_DELETES_SETTING, + IndexSettings.INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING, IndicesRequestCache.INDEX_CACHE_REQUEST_ENABLED_SETTING, UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING, EnableAllocationDecider.INDEX_ROUTING_REBALANCE_ENABLE_SETTING, diff --git a/server/src/main/java/org/elasticsearch/index/IndexSettings.java b/server/src/main/java/org/elasticsearch/index/IndexSettings.java index 7eb3f2eda8a..00610b20802 100644 --- a/server/src/main/java/org/elasticsearch/index/IndexSettings.java +++ b/server/src/main/java/org/elasticsearch/index/IndexSettings.java @@ -242,6 +242,14 @@ public final class IndexSettings { */ public static final Setting INDEX_SOFT_DELETES_SETTING = Setting.boolSetting("index.soft_deletes", true, Property.IndexScope); + /** + * Controls how many soft-deleted documents will be kept around before being merged away. Keeping more deleted + * documents increases the chance of operation-based recoveries and allows querying a longer history of documents. + * If soft-deletes is enabled, an engine by default will retain all operations up to the global checkpoint. + **/ + public static final Setting INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING = + Setting.longSetting("index.soft_deletes.retention.operations", 0, 0, Property.IndexScope, Property.Dynamic); + /** * The maximum number of refresh listeners allows on this shard. */ @@ -287,6 +295,7 @@ public final class IndexSettings { private final IndexScopedSettings scopedSettings; private long gcDeletesInMillis = DEFAULT_GC_DELETES.millis(); private final boolean softDeleteEnabled; + private volatile long softDeleteRetentionOperations; private volatile boolean warmerEnabled; private volatile int maxResultWindow; private volatile int maxInnerResultWindow; @@ -398,6 +407,7 @@ public final class IndexSettings { mergeSchedulerConfig = new MergeSchedulerConfig(this); gcDeletesInMillis = scopedSettings.get(INDEX_GC_DELETES_SETTING).getMillis(); softDeleteEnabled = version.onOrAfter(Version.V_7_0_0_alpha1) && scopedSettings.get(INDEX_SOFT_DELETES_SETTING); + softDeleteRetentionOperations = scopedSettings.get(INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING); warmerEnabled = scopedSettings.get(INDEX_WARMER_ENABLED_SETTING); maxResultWindow = scopedSettings.get(MAX_RESULT_WINDOW_SETTING); maxInnerResultWindow = scopedSettings.get(MAX_INNER_RESULT_WINDOW_SETTING); @@ -455,6 +465,7 @@ public final class IndexSettings { scopedSettings.addSettingsUpdateConsumer(DEFAULT_FIELD_SETTING, this::setDefaultFields); scopedSettings.addSettingsUpdateConsumer(INDEX_SEARCH_IDLE_AFTER, this::setSearchIdleAfter); scopedSettings.addSettingsUpdateConsumer(MAX_REGEX_LENGTH_SETTING, this::setMaxRegexLength); + scopedSettings.addSettingsUpdateConsumer(INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING, this::setSoftDeleteRetentionOperations); } private void setSearchIdleAfter(TimeValue searchIdleAfter) { this.searchIdleAfter = searchIdleAfter; } @@ -837,4 +848,15 @@ public final class IndexSettings { public boolean isSoftDeleteEnabled() { return softDeleteEnabled; } + + private void setSoftDeleteRetentionOperations(long ops) { + this.softDeleteRetentionOperations = ops; + } + + /** + * Returns the number of extra operations (i.e. soft-deleted documents) to be kept for recoveries and history purpose. + */ + public long getSoftDeleteRetentionOperations() { + return this.softDeleteRetentionOperations; + } } diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 8f0260f7c47..e1f8eb91f88 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -22,6 +22,7 @@ package org.elasticsearch.index.engine; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.lucene.document.Field; +import org.apache.lucene.document.LongPoint; import org.apache.lucene.document.NumericDocValuesField; import org.apache.lucene.index.DirectoryReader; import org.apache.lucene.index.IndexCommit; @@ -34,8 +35,10 @@ import org.apache.lucene.index.LiveIndexWriterConfig; import org.apache.lucene.index.MergePolicy; import org.apache.lucene.index.SegmentCommitInfo; import org.apache.lucene.index.SegmentInfos; +import org.apache.lucene.index.SoftDeletesRetentionMergePolicy; import org.apache.lucene.index.Term; import org.apache.lucene.search.IndexSearcher; +import org.apache.lucene.search.Query; import org.apache.lucene.search.ReferenceManager; import org.apache.lucene.search.SearcherFactory; import org.apache.lucene.search.SearcherManager; @@ -2002,8 +2005,8 @@ public class InternalEngine extends Engine { // background merges MergePolicy mergePolicy = config().getMergePolicy(); if (softDeleteEnabled) { - // TODO: soft-delete retention policy iwc.setSoftDeletesField(Lucene.SOFT_DELETE_FIELD); + mergePolicy = new SoftDeletesRetentionMergePolicy(Lucene.SOFT_DELETE_FIELD, this::softDeletesRetentionQuery, mergePolicy); } iwc.setMergePolicy(new ElasticsearchMergePolicy(mergePolicy)); iwc.setSimilarity(engineConfig.getSimilarity()); @@ -2016,6 +2019,20 @@ public class InternalEngine extends Engine { return iwc; } + /** + * Documents including tombstones are soft-deleted and matched this query will be retained and won't cleaned up by merges. + */ + private Query softDeletesRetentionQuery() { + ensureOpen(); + // TODO: We send the safe commit in peer-recovery, thus we need to retain all operations after the local checkpoint of that commit. + final long retainedExtraOps = engineConfig.getIndexSettings().getSoftDeleteRetentionOperations(); + // Prefer using the global checkpoint which is persisted on disk than an in-memory value. + // If we failed to fsync checkpoint but already used a higher global checkpoint value to clean up soft-deleted ops, + // then we may not have all required operations whose seq# greater than the global checkpoint after restarted. + final long persistedGlobalCheckpoint = translog.getLastSyncedGlobalCheckpoint(); + return LongPoint.newRangeQuery(SeqNoFieldMapper.NAME, persistedGlobalCheckpoint + 1 - retainedExtraOps, Long.MAX_VALUE); + } + /** Extended SearcherFactory that warms the segments if needed when acquiring a new searcher */ static final class SearchFactory extends EngineSearcherFactory { private final Engine.Warmer warmer; diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index 99b990b8b7b..350f952a1a5 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -123,6 +123,7 @@ import org.elasticsearch.index.translog.SnapshotMatchers; import org.elasticsearch.index.translog.Translog; import org.elasticsearch.index.translog.TranslogConfig; import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; +import org.elasticsearch.test.IndexSettingsModule; import org.hamcrest.MatcherAssert; import org.hamcrest.Matchers; @@ -178,6 +179,7 @@ import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.hasItem; import static org.hamcrest.Matchers.hasKey; import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.isIn; import static org.hamcrest.Matchers.lessThanOrEqualTo; import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.notNullValue; @@ -251,8 +253,9 @@ public class InternalEngineTests extends EngineTestCase { } public void testSegments() throws Exception { + final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); try (Store store = createStore(); - InternalEngine engine = createEngine(defaultSettings, store, createTempDir(), NoMergePolicy.INSTANCE)) { + InternalEngine engine = createEngine(config(defaultSettings, store, createTempDir(), NoMergePolicy.INSTANCE, null, null, globalCheckpoint::get))) { List segments = engine.segments(false); assertThat(segments.isEmpty(), equalTo(true)); assertThat(engine.segmentsStats(false).getCount(), equalTo(0L)); @@ -324,6 +327,8 @@ public class InternalEngineTests extends EngineTestCase { engine.delete(new Engine.Delete("test", "1", newUid(doc), primaryTerm.get())); + globalCheckpoint.set(engine.getLocalCheckpointTracker().getCheckpoint()); + engine.getTranslog().sync(); engine.refresh("test"); segments = engine.segments(false); @@ -1279,9 +1284,13 @@ public class InternalEngineTests extends EngineTestCase { assertThat(indexResult.getVersion(), equalTo(1L)); } - public void testForceMerge() throws IOException { + public void testForceMergeWithoutSoftDeletes() throws IOException { + Settings settings = Settings.builder() + .put(defaultSettings.getSettings()) + .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), false).build(); + IndexMetaData indexMetaData = IndexMetaData.builder(defaultSettings.getIndexMetaData()).settings(settings).build(); try (Store store = createStore(); - Engine engine = createEngine(config(defaultSettings, store, createTempDir(), + Engine engine = createEngine(config(IndexSettingsModule.newIndexSettings(indexMetaData), store, createTempDir(), new LogByteSizeMergePolicy(), null))) { // use log MP here we test some behavior in ESMP int numDocs = randomIntBetween(10, 100); for (int i = 0; i < numDocs; i++) { @@ -1322,6 +1331,66 @@ public class InternalEngineTests extends EngineTestCase { } } + public void testForceMergeWithSoftDeletesRetention() throws Exception { + final long retainedExtraOps = randomLongBetween(0, 10); + Settings.Builder settings = Settings.builder() + .put(defaultSettings.getSettings()) + .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true) + .put(IndexSettings.INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING.getKey(), retainedExtraOps); + final IndexMetaData indexMetaData = IndexMetaData.builder(defaultSettings.getIndexMetaData()).settings(settings).build(); + final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings(indexMetaData); + final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); + final MapperService mapperService = createMapperService("test"); + final Set liveDocs = new HashSet<>(); + try (Store store = createStore(); + Engine engine = createEngine(config(indexSettings, store, createTempDir(), newMergePolicy(), null, null, globalCheckpoint::get))) { + int numDocs = scaledRandomIntBetween(10, 100); + for (int i = 0; i < numDocs; i++) { + ParsedDocument doc = testParsedDocument(Integer.toString(i), null, testDocument(), B_1, null); + engine.index(indexForDoc(doc)); + liveDocs.add(doc.id()); + } + for (int i = 0; i < numDocs; i++) { + ParsedDocument doc = testParsedDocument(Integer.toString(i), null, testDocument(), B_1, null); + if (randomBoolean()) { + engine.delete(new Engine.Delete(doc.type(), doc.id(), newUid(doc.id()), primaryTerm.get())); + liveDocs.remove(doc.id()); + } + if (randomBoolean()) { + engine.index(indexForDoc(doc)); + liveDocs.add(doc.id()); + } + } + long localCheckpoint = engine.getLocalCheckpointTracker().getCheckpoint(); + globalCheckpoint.set(randomLongBetween(0, localCheckpoint)); + engine.getTranslog().sync(); + engine.forceMerge(true, 1, false, false, false); + assertConsistentHistoryBetweenTranslogAndLuceneIndex(engine, mapperService); + Map ops = readAllOperationsInLucene(engine, mapperService) + .stream().collect(Collectors.toMap(Translog.Operation::seqNo, Function.identity())); + for (long seqno = 0; seqno <= localCheckpoint; seqno++) { + long keptIndex = globalCheckpoint.get() + 1 - retainedExtraOps; + String msg = "seq# [" + seqno + "], global checkpoint [" + globalCheckpoint + "], retained-ops [" + retainedExtraOps + "]"; + if (seqno < keptIndex) { + Translog.Operation op = ops.get(seqno); + if (op != null) { + assertThat(op, instanceOf(Translog.Index.class)); + assertThat(msg, ((Translog.Index) op).id(), isIn(liveDocs)); + } + } else { + assertThat(msg, ops.get(seqno), notNullValue()); + } + } + settings.put(IndexSettings.INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING.getKey(), 0); + indexSettings.updateIndexMetaData(IndexMetaData.builder(defaultSettings.getIndexMetaData()).settings(settings).build()); + globalCheckpoint.set(localCheckpoint); + engine.getTranslog().sync(); + engine.forceMerge(true, 1, false, false, false); + assertConsistentHistoryBetweenTranslogAndLuceneIndex(engine, mapperService); + assertThat(readAllOperationsInLucene(engine, mapperService), hasSize(liveDocs.size())); + } + } + public void testForceMergeAndClose() throws IOException, InterruptedException { int numIters = randomIntBetween(2, 10); for (int j = 0; j < numIters; j++) { @@ -2525,14 +2594,16 @@ public class InternalEngineTests extends EngineTestCase { Engine.IndexResult indexResult = engine.index(firstIndexRequest); assertThat(indexResult.getVersion(), equalTo(1L)); } + EngineConfig config = engine.config(); assertVisibleCount(engine, numDocs); engine.close(); - trimUnsafeCommits(engine.config()); - engine = new InternalEngine(engine.config()); - engine.skipTranslogRecovery(); - try (Engine.Searcher searcher = engine.acquireSearcher("test")) { - TopDocs topDocs = searcher.searcher().search(new MatchAllDocsQuery(), randomIntBetween(numDocs, numDocs + 10)); - assertThat(topDocs.totalHits, equalTo(0L)); + trimUnsafeCommits(config); + try (InternalEngine engine = new InternalEngine(config)) { + engine.skipTranslogRecovery(); + try (Engine.Searcher searcher = engine.acquireSearcher("test")) { + TopDocs topDocs = searcher.searcher().search(new MatchAllDocsQuery(), randomIntBetween(numDocs, numDocs + 10)); + assertThat(topDocs.totalHits, equalTo(0L)); + } } } diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index cd8baf6fc14..dcb6569ba00 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -2350,7 +2350,16 @@ public class IndexShardTests extends IndexShardTestCase { deleteDoc(indexShard, "_doc", id); indexDoc(indexShard, "_doc", id); } - + // Need to update and sync the global checkpoint as the soft-deletes retention MergePolicy depends on it. + if (indexShard.indexSettings.isSoftDeleteEnabled()) { + if (indexShard.routingEntry().primary()) { + indexShard.updateGlobalCheckpointForShard(indexShard.routingEntry().allocationId().getId(), + indexShard.getLocalCheckpoint()); + } else { + indexShard.updateGlobalCheckpointOnReplica(indexShard.getLocalCheckpoint(), "test"); + } + indexShard.sync(); + } // flush the buffered deletes final FlushRequest flushRequest = new FlushRequest(); flushRequest.force(false); @@ -2910,6 +2919,7 @@ public class IndexShardTests extends IndexShardTestCase { // Deleting a doc causes its memory to be freed from the breaker deleteDoc(primary, "_doc", "0"); + primary.sync(); // need to sync global checkpoint as the soft-deletes retention MergePolicy depends on it. primary.refresh("force refresh"); ss = primary.segmentStats(randomBoolean()); diff --git a/server/src/test/java/org/elasticsearch/indices/stats/IndexStatsIT.java b/server/src/test/java/org/elasticsearch/indices/stats/IndexStatsIT.java index 4d0d0d6ff3f..1548e9cd64b 100644 --- a/server/src/test/java/org/elasticsearch/indices/stats/IndexStatsIT.java +++ b/server/src/test/java/org/elasticsearch/indices/stats/IndexStatsIT.java @@ -43,6 +43,7 @@ import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.IndexModule; +import org.elasticsearch.index.IndexService; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.MergePolicyConfig; import org.elasticsearch.index.MergeSchedulerConfig; @@ -50,6 +51,7 @@ import org.elasticsearch.index.VersionType; import org.elasticsearch.index.cache.query.QueryCacheStats; import org.elasticsearch.index.engine.VersionConflictEngineException; import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.translog.Translog; import org.elasticsearch.indices.IndicesQueryCache; import org.elasticsearch.indices.IndicesRequestCache; @@ -69,6 +71,7 @@ import java.util.Collections; import java.util.EnumSet; import java.util.List; import java.util.Random; +import java.util.Set; import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; @@ -1005,10 +1008,15 @@ public class IndexStatsIT extends ESIntegTestCase { } public void testFilterCacheStats() throws Exception { - assertAcked(prepareCreate("index").setSettings(Settings.builder().put(indexSettings()).put("number_of_replicas", 0).build()).get()); - indexRandom(true, + Settings settings = Settings.builder().put(indexSettings()).put("number_of_replicas", 0).build(); + assertAcked(prepareCreate("index").setSettings(settings).get()); + indexRandom(false, true, client().prepareIndex("index", "type", "1").setSource("foo", "bar"), client().prepareIndex("index", "type", "2").setSource("foo", "baz")); + if (IndexSettings.INDEX_SOFT_DELETES_SETTING.get(settings)) { + persistGlobalCheckpoint("index"); // Need to persist the global checkpoint for the soft-deletes retention MP. + } + refresh(); ensureGreen(); IndicesStatsResponse response = client().admin().indices().prepareStats("index").setQueryCache(true).get(); @@ -1039,6 +1047,9 @@ public class IndexStatsIT extends ESIntegTestCase { assertEquals(DocWriteResponse.Result.DELETED, client().prepareDelete("index", "type", "1").get().getResult()); assertEquals(DocWriteResponse.Result.DELETED, client().prepareDelete("index", "type", "2").get().getResult()); + if (IndexSettings.INDEX_SOFT_DELETES_SETTING.get(settings)) { + persistGlobalCheckpoint("index"); // Need to persist the global checkpoint for the soft-deletes retention MP. + } refresh(); response = client().admin().indices().prepareStats("index").setQueryCache(true).get(); assertCumulativeQueryCacheStats(response); @@ -1172,4 +1183,21 @@ public class IndexStatsIT extends ESIntegTestCase { assertThat(executionFailures.get(), emptyCollectionOf(Exception.class)); } + + /** + * Persist the global checkpoint on all shards of the given index into disk. + * This makes sure that the persisted global checkpoint on those shards will equal to the in-memory value. + */ + private void persistGlobalCheckpoint(String index) throws Exception { + final Set nodes = internalCluster().nodesInclude(index); + for (String node : nodes) { + final IndicesService indexServices = internalCluster().getInstance(IndicesService.class, node); + for (IndexService indexService : indexServices) { + for (IndexShard indexShard : indexService) { + indexShard.sync(); + assertThat(indexShard.getLastSyncedGlobalCheckpoint(), equalTo(indexShard.getGlobalCheckpoint())); + } + } + } + } } diff --git a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java index 76f5f7e51fc..22e533d1114 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java @@ -105,8 +105,10 @@ import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicLong; import java.util.function.BiFunction; +import java.util.function.Function; import java.util.function.LongSupplier; import java.util.function.ToLongBiFunction; +import java.util.stream.Collectors; import static java.util.Collections.emptyList; import static java.util.Collections.shuffle; @@ -114,6 +116,8 @@ import static org.elasticsearch.index.engine.Engine.Operation.Origin.PRIMARY; import static org.elasticsearch.index.engine.Engine.Operation.Origin.REPLICA; import static org.elasticsearch.index.translog.TranslogDeletionPolicies.createTranslogDeletionPolicy; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.lessThan; +import static org.hamcrest.Matchers.notNullValue; public abstract class EngineTestCase extends ESTestCase { @@ -787,12 +791,22 @@ public abstract class EngineTestCase extends ESTestCase { translogOps.put(op.seqNo(), op); } } - final List luceneOps = readAllOperationsInLucene(engine, mapper); - for (Translog.Operation luceneOp : luceneOps) { - Translog.Operation translogOp = translogOps.get(luceneOp.seqNo()); - if (translogOp == null) { - continue; + final Map luceneOps = readAllOperationsInLucene(engine, mapper).stream() + .collect(Collectors.toMap(Translog.Operation::seqNo, Function.identity())); + final long globalCheckpoint = engine.getTranslog().getLastSyncedGlobalCheckpoint(); + final long retainedOps = engine.config().getIndexSettings().getSoftDeleteRetentionOperations(); + final long maxSeqNo = engine.getLocalCheckpointTracker().getMaxSeqNo(); + for (Translog.Operation translogOp : translogOps.values()) { + final Translog.Operation luceneOp = luceneOps.get(translogOp.seqNo()); + if (luceneOp == null) { + if (globalCheckpoint + 1 - retainedOps <= translogOp.seqNo() && translogOp.seqNo() <= maxSeqNo) { + fail("Operation not found seq# [" + translogOp.seqNo() + "], global checkpoint [" + globalCheckpoint + "], " + + "retention policy [" + retainedOps + "], maxSeqNo [" + maxSeqNo + "], translog op [" + translogOp + "]"); + } else { + continue; + } } + assertThat(luceneOp, notNullValue()); assertThat(luceneOp.primaryTerm(), equalTo(translogOp.primaryTerm())); assertThat(luceneOp.opType(), equalTo(translogOp.opType())); if (luceneOp.opType() == Translog.Operation.Type.INDEX) {