Introduce soft-deletes retention policy based on global checkpoint (#30335)
This commit introduces a soft-deletes retention merge policy based on the global checkpoint. Some notes on this simple retention policy: - This policy keeps all operations whose seq# is greater than the persisted global checkpoint and configurable extra operations prior to the global checkpoint. This is good enough for querying history changes. - This policy is not watertight for peer-recovery. We send the safe-commit in peer-recovery, thus we need to also send all operations after the local checkpoint of that commit. This is analog to the min translog generation for recovery. - This policy is too simple to support rollback. Relates #29530
This commit is contained in:
parent
6e0d0feca0
commit
2c73969505
|
@ -131,6 +131,7 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
|
|||
ShardsLimitAllocationDecider.INDEX_TOTAL_SHARDS_PER_NODE_SETTING,
|
||||
IndexSettings.INDEX_GC_DELETES_SETTING,
|
||||
IndexSettings.INDEX_SOFT_DELETES_SETTING,
|
||||
IndexSettings.INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING,
|
||||
IndicesRequestCache.INDEX_CACHE_REQUEST_ENABLED_SETTING,
|
||||
UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING,
|
||||
EnableAllocationDecider.INDEX_ROUTING_REBALANCE_ENABLE_SETTING,
|
||||
|
|
|
@ -242,6 +242,14 @@ public final class IndexSettings {
|
|||
*/
|
||||
public static final Setting<Boolean> INDEX_SOFT_DELETES_SETTING = Setting.boolSetting("index.soft_deletes", true, Property.IndexScope);
|
||||
|
||||
/**
|
||||
* Controls how many soft-deleted documents will be kept around before being merged away. Keeping more deleted
|
||||
* documents increases the chance of operation-based recoveries and allows querying a longer history of documents.
|
||||
* If soft-deletes is enabled, an engine by default will retain all operations up to the global checkpoint.
|
||||
**/
|
||||
public static final Setting<Long> INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING =
|
||||
Setting.longSetting("index.soft_deletes.retention.operations", 0, 0, Property.IndexScope, Property.Dynamic);
|
||||
|
||||
/**
|
||||
* The maximum number of refresh listeners allows on this shard.
|
||||
*/
|
||||
|
@ -287,6 +295,7 @@ public final class IndexSettings {
|
|||
private final IndexScopedSettings scopedSettings;
|
||||
private long gcDeletesInMillis = DEFAULT_GC_DELETES.millis();
|
||||
private final boolean softDeleteEnabled;
|
||||
private volatile long softDeleteRetentionOperations;
|
||||
private volatile boolean warmerEnabled;
|
||||
private volatile int maxResultWindow;
|
||||
private volatile int maxInnerResultWindow;
|
||||
|
@ -398,6 +407,7 @@ public final class IndexSettings {
|
|||
mergeSchedulerConfig = new MergeSchedulerConfig(this);
|
||||
gcDeletesInMillis = scopedSettings.get(INDEX_GC_DELETES_SETTING).getMillis();
|
||||
softDeleteEnabled = version.onOrAfter(Version.V_7_0_0_alpha1) && scopedSettings.get(INDEX_SOFT_DELETES_SETTING);
|
||||
softDeleteRetentionOperations = scopedSettings.get(INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING);
|
||||
warmerEnabled = scopedSettings.get(INDEX_WARMER_ENABLED_SETTING);
|
||||
maxResultWindow = scopedSettings.get(MAX_RESULT_WINDOW_SETTING);
|
||||
maxInnerResultWindow = scopedSettings.get(MAX_INNER_RESULT_WINDOW_SETTING);
|
||||
|
@ -455,6 +465,7 @@ public final class IndexSettings {
|
|||
scopedSettings.addSettingsUpdateConsumer(DEFAULT_FIELD_SETTING, this::setDefaultFields);
|
||||
scopedSettings.addSettingsUpdateConsumer(INDEX_SEARCH_IDLE_AFTER, this::setSearchIdleAfter);
|
||||
scopedSettings.addSettingsUpdateConsumer(MAX_REGEX_LENGTH_SETTING, this::setMaxRegexLength);
|
||||
scopedSettings.addSettingsUpdateConsumer(INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING, this::setSoftDeleteRetentionOperations);
|
||||
}
|
||||
|
||||
private void setSearchIdleAfter(TimeValue searchIdleAfter) { this.searchIdleAfter = searchIdleAfter; }
|
||||
|
@ -837,4 +848,15 @@ public final class IndexSettings {
|
|||
public boolean isSoftDeleteEnabled() {
|
||||
return softDeleteEnabled;
|
||||
}
|
||||
|
||||
private void setSoftDeleteRetentionOperations(long ops) {
|
||||
this.softDeleteRetentionOperations = ops;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the number of extra operations (i.e. soft-deleted documents) to be kept for recoveries and history purpose.
|
||||
*/
|
||||
public long getSoftDeleteRetentionOperations() {
|
||||
return this.softDeleteRetentionOperations;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -22,6 +22,7 @@ package org.elasticsearch.index.engine;
|
|||
import org.apache.logging.log4j.Logger;
|
||||
import org.apache.logging.log4j.message.ParameterizedMessage;
|
||||
import org.apache.lucene.document.Field;
|
||||
import org.apache.lucene.document.LongPoint;
|
||||
import org.apache.lucene.document.NumericDocValuesField;
|
||||
import org.apache.lucene.index.DirectoryReader;
|
||||
import org.apache.lucene.index.IndexCommit;
|
||||
|
@ -34,8 +35,10 @@ import org.apache.lucene.index.LiveIndexWriterConfig;
|
|||
import org.apache.lucene.index.MergePolicy;
|
||||
import org.apache.lucene.index.SegmentCommitInfo;
|
||||
import org.apache.lucene.index.SegmentInfos;
|
||||
import org.apache.lucene.index.SoftDeletesRetentionMergePolicy;
|
||||
import org.apache.lucene.index.Term;
|
||||
import org.apache.lucene.search.IndexSearcher;
|
||||
import org.apache.lucene.search.Query;
|
||||
import org.apache.lucene.search.ReferenceManager;
|
||||
import org.apache.lucene.search.SearcherFactory;
|
||||
import org.apache.lucene.search.SearcherManager;
|
||||
|
@ -2002,8 +2005,8 @@ public class InternalEngine extends Engine {
|
|||
// background merges
|
||||
MergePolicy mergePolicy = config().getMergePolicy();
|
||||
if (softDeleteEnabled) {
|
||||
// TODO: soft-delete retention policy
|
||||
iwc.setSoftDeletesField(Lucene.SOFT_DELETE_FIELD);
|
||||
mergePolicy = new SoftDeletesRetentionMergePolicy(Lucene.SOFT_DELETE_FIELD, this::softDeletesRetentionQuery, mergePolicy);
|
||||
}
|
||||
iwc.setMergePolicy(new ElasticsearchMergePolicy(mergePolicy));
|
||||
iwc.setSimilarity(engineConfig.getSimilarity());
|
||||
|
@ -2016,6 +2019,20 @@ public class InternalEngine extends Engine {
|
|||
return iwc;
|
||||
}
|
||||
|
||||
/**
|
||||
* Documents including tombstones are soft-deleted and matched this query will be retained and won't cleaned up by merges.
|
||||
*/
|
||||
private Query softDeletesRetentionQuery() {
|
||||
ensureOpen();
|
||||
// TODO: We send the safe commit in peer-recovery, thus we need to retain all operations after the local checkpoint of that commit.
|
||||
final long retainedExtraOps = engineConfig.getIndexSettings().getSoftDeleteRetentionOperations();
|
||||
// Prefer using the global checkpoint which is persisted on disk than an in-memory value.
|
||||
// If we failed to fsync checkpoint but already used a higher global checkpoint value to clean up soft-deleted ops,
|
||||
// then we may not have all required operations whose seq# greater than the global checkpoint after restarted.
|
||||
final long persistedGlobalCheckpoint = translog.getLastSyncedGlobalCheckpoint();
|
||||
return LongPoint.newRangeQuery(SeqNoFieldMapper.NAME, persistedGlobalCheckpoint + 1 - retainedExtraOps, Long.MAX_VALUE);
|
||||
}
|
||||
|
||||
/** Extended SearcherFactory that warms the segments if needed when acquiring a new searcher */
|
||||
static final class SearchFactory extends EngineSearcherFactory {
|
||||
private final Engine.Warmer warmer;
|
||||
|
|
|
@ -123,6 +123,7 @@ import org.elasticsearch.index.translog.SnapshotMatchers;
|
|||
import org.elasticsearch.index.translog.Translog;
|
||||
import org.elasticsearch.index.translog.TranslogConfig;
|
||||
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
|
||||
import org.elasticsearch.test.IndexSettingsModule;
|
||||
import org.hamcrest.MatcherAssert;
|
||||
import org.hamcrest.Matchers;
|
||||
|
||||
|
@ -178,6 +179,7 @@ import static org.hamcrest.Matchers.greaterThanOrEqualTo;
|
|||
import static org.hamcrest.Matchers.hasItem;
|
||||
import static org.hamcrest.Matchers.hasKey;
|
||||
import static org.hamcrest.Matchers.hasSize;
|
||||
import static org.hamcrest.Matchers.isIn;
|
||||
import static org.hamcrest.Matchers.lessThanOrEqualTo;
|
||||
import static org.hamcrest.Matchers.not;
|
||||
import static org.hamcrest.Matchers.notNullValue;
|
||||
|
@ -251,8 +253,9 @@ public class InternalEngineTests extends EngineTestCase {
|
|||
}
|
||||
|
||||
public void testSegments() throws Exception {
|
||||
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
|
||||
try (Store store = createStore();
|
||||
InternalEngine engine = createEngine(defaultSettings, store, createTempDir(), NoMergePolicy.INSTANCE)) {
|
||||
InternalEngine engine = createEngine(config(defaultSettings, store, createTempDir(), NoMergePolicy.INSTANCE, null, null, globalCheckpoint::get))) {
|
||||
List<Segment> segments = engine.segments(false);
|
||||
assertThat(segments.isEmpty(), equalTo(true));
|
||||
assertThat(engine.segmentsStats(false).getCount(), equalTo(0L));
|
||||
|
@ -324,6 +327,8 @@ public class InternalEngineTests extends EngineTestCase {
|
|||
|
||||
|
||||
engine.delete(new Engine.Delete("test", "1", newUid(doc), primaryTerm.get()));
|
||||
globalCheckpoint.set(engine.getLocalCheckpointTracker().getCheckpoint());
|
||||
engine.getTranslog().sync();
|
||||
engine.refresh("test");
|
||||
|
||||
segments = engine.segments(false);
|
||||
|
@ -1279,9 +1284,13 @@ public class InternalEngineTests extends EngineTestCase {
|
|||
assertThat(indexResult.getVersion(), equalTo(1L));
|
||||
}
|
||||
|
||||
public void testForceMerge() throws IOException {
|
||||
public void testForceMergeWithoutSoftDeletes() throws IOException {
|
||||
Settings settings = Settings.builder()
|
||||
.put(defaultSettings.getSettings())
|
||||
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), false).build();
|
||||
IndexMetaData indexMetaData = IndexMetaData.builder(defaultSettings.getIndexMetaData()).settings(settings).build();
|
||||
try (Store store = createStore();
|
||||
Engine engine = createEngine(config(defaultSettings, store, createTempDir(),
|
||||
Engine engine = createEngine(config(IndexSettingsModule.newIndexSettings(indexMetaData), store, createTempDir(),
|
||||
new LogByteSizeMergePolicy(), null))) { // use log MP here we test some behavior in ESMP
|
||||
int numDocs = randomIntBetween(10, 100);
|
||||
for (int i = 0; i < numDocs; i++) {
|
||||
|
@ -1322,6 +1331,66 @@ public class InternalEngineTests extends EngineTestCase {
|
|||
}
|
||||
}
|
||||
|
||||
public void testForceMergeWithSoftDeletesRetention() throws Exception {
|
||||
final long retainedExtraOps = randomLongBetween(0, 10);
|
||||
Settings.Builder settings = Settings.builder()
|
||||
.put(defaultSettings.getSettings())
|
||||
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true)
|
||||
.put(IndexSettings.INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING.getKey(), retainedExtraOps);
|
||||
final IndexMetaData indexMetaData = IndexMetaData.builder(defaultSettings.getIndexMetaData()).settings(settings).build();
|
||||
final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings(indexMetaData);
|
||||
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
|
||||
final MapperService mapperService = createMapperService("test");
|
||||
final Set<String> liveDocs = new HashSet<>();
|
||||
try (Store store = createStore();
|
||||
Engine engine = createEngine(config(indexSettings, store, createTempDir(), newMergePolicy(), null, null, globalCheckpoint::get))) {
|
||||
int numDocs = scaledRandomIntBetween(10, 100);
|
||||
for (int i = 0; i < numDocs; i++) {
|
||||
ParsedDocument doc = testParsedDocument(Integer.toString(i), null, testDocument(), B_1, null);
|
||||
engine.index(indexForDoc(doc));
|
||||
liveDocs.add(doc.id());
|
||||
}
|
||||
for (int i = 0; i < numDocs; i++) {
|
||||
ParsedDocument doc = testParsedDocument(Integer.toString(i), null, testDocument(), B_1, null);
|
||||
if (randomBoolean()) {
|
||||
engine.delete(new Engine.Delete(doc.type(), doc.id(), newUid(doc.id()), primaryTerm.get()));
|
||||
liveDocs.remove(doc.id());
|
||||
}
|
||||
if (randomBoolean()) {
|
||||
engine.index(indexForDoc(doc));
|
||||
liveDocs.add(doc.id());
|
||||
}
|
||||
}
|
||||
long localCheckpoint = engine.getLocalCheckpointTracker().getCheckpoint();
|
||||
globalCheckpoint.set(randomLongBetween(0, localCheckpoint));
|
||||
engine.getTranslog().sync();
|
||||
engine.forceMerge(true, 1, false, false, false);
|
||||
assertConsistentHistoryBetweenTranslogAndLuceneIndex(engine, mapperService);
|
||||
Map<Long, Translog.Operation> ops = readAllOperationsInLucene(engine, mapperService)
|
||||
.stream().collect(Collectors.toMap(Translog.Operation::seqNo, Function.identity()));
|
||||
for (long seqno = 0; seqno <= localCheckpoint; seqno++) {
|
||||
long keptIndex = globalCheckpoint.get() + 1 - retainedExtraOps;
|
||||
String msg = "seq# [" + seqno + "], global checkpoint [" + globalCheckpoint + "], retained-ops [" + retainedExtraOps + "]";
|
||||
if (seqno < keptIndex) {
|
||||
Translog.Operation op = ops.get(seqno);
|
||||
if (op != null) {
|
||||
assertThat(op, instanceOf(Translog.Index.class));
|
||||
assertThat(msg, ((Translog.Index) op).id(), isIn(liveDocs));
|
||||
}
|
||||
} else {
|
||||
assertThat(msg, ops.get(seqno), notNullValue());
|
||||
}
|
||||
}
|
||||
settings.put(IndexSettings.INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING.getKey(), 0);
|
||||
indexSettings.updateIndexMetaData(IndexMetaData.builder(defaultSettings.getIndexMetaData()).settings(settings).build());
|
||||
globalCheckpoint.set(localCheckpoint);
|
||||
engine.getTranslog().sync();
|
||||
engine.forceMerge(true, 1, false, false, false);
|
||||
assertConsistentHistoryBetweenTranslogAndLuceneIndex(engine, mapperService);
|
||||
assertThat(readAllOperationsInLucene(engine, mapperService), hasSize(liveDocs.size()));
|
||||
}
|
||||
}
|
||||
|
||||
public void testForceMergeAndClose() throws IOException, InterruptedException {
|
||||
int numIters = randomIntBetween(2, 10);
|
||||
for (int j = 0; j < numIters; j++) {
|
||||
|
@ -2525,14 +2594,16 @@ public class InternalEngineTests extends EngineTestCase {
|
|||
Engine.IndexResult indexResult = engine.index(firstIndexRequest);
|
||||
assertThat(indexResult.getVersion(), equalTo(1L));
|
||||
}
|
||||
EngineConfig config = engine.config();
|
||||
assertVisibleCount(engine, numDocs);
|
||||
engine.close();
|
||||
trimUnsafeCommits(engine.config());
|
||||
engine = new InternalEngine(engine.config());
|
||||
engine.skipTranslogRecovery();
|
||||
try (Engine.Searcher searcher = engine.acquireSearcher("test")) {
|
||||
TopDocs topDocs = searcher.searcher().search(new MatchAllDocsQuery(), randomIntBetween(numDocs, numDocs + 10));
|
||||
assertThat(topDocs.totalHits, equalTo(0L));
|
||||
trimUnsafeCommits(config);
|
||||
try (InternalEngine engine = new InternalEngine(config)) {
|
||||
engine.skipTranslogRecovery();
|
||||
try (Engine.Searcher searcher = engine.acquireSearcher("test")) {
|
||||
TopDocs topDocs = searcher.searcher().search(new MatchAllDocsQuery(), randomIntBetween(numDocs, numDocs + 10));
|
||||
assertThat(topDocs.totalHits, equalTo(0L));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -2350,7 +2350,16 @@ public class IndexShardTests extends IndexShardTestCase {
|
|||
deleteDoc(indexShard, "_doc", id);
|
||||
indexDoc(indexShard, "_doc", id);
|
||||
}
|
||||
|
||||
// Need to update and sync the global checkpoint as the soft-deletes retention MergePolicy depends on it.
|
||||
if (indexShard.indexSettings.isSoftDeleteEnabled()) {
|
||||
if (indexShard.routingEntry().primary()) {
|
||||
indexShard.updateGlobalCheckpointForShard(indexShard.routingEntry().allocationId().getId(),
|
||||
indexShard.getLocalCheckpoint());
|
||||
} else {
|
||||
indexShard.updateGlobalCheckpointOnReplica(indexShard.getLocalCheckpoint(), "test");
|
||||
}
|
||||
indexShard.sync();
|
||||
}
|
||||
// flush the buffered deletes
|
||||
final FlushRequest flushRequest = new FlushRequest();
|
||||
flushRequest.force(false);
|
||||
|
@ -2910,6 +2919,7 @@ public class IndexShardTests extends IndexShardTestCase {
|
|||
|
||||
// Deleting a doc causes its memory to be freed from the breaker
|
||||
deleteDoc(primary, "_doc", "0");
|
||||
primary.sync(); // need to sync global checkpoint as the soft-deletes retention MergePolicy depends on it.
|
||||
primary.refresh("force refresh");
|
||||
|
||||
ss = primary.segmentStats(randomBoolean());
|
||||
|
|
|
@ -43,6 +43,7 @@ import org.elasticsearch.common.io.stream.BytesStreamOutput;
|
|||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.xcontent.XContentType;
|
||||
import org.elasticsearch.index.IndexModule;
|
||||
import org.elasticsearch.index.IndexService;
|
||||
import org.elasticsearch.index.IndexSettings;
|
||||
import org.elasticsearch.index.MergePolicyConfig;
|
||||
import org.elasticsearch.index.MergeSchedulerConfig;
|
||||
|
@ -50,6 +51,7 @@ import org.elasticsearch.index.VersionType;
|
|||
import org.elasticsearch.index.cache.query.QueryCacheStats;
|
||||
import org.elasticsearch.index.engine.VersionConflictEngineException;
|
||||
import org.elasticsearch.index.query.QueryBuilders;
|
||||
import org.elasticsearch.index.shard.IndexShard;
|
||||
import org.elasticsearch.index.translog.Translog;
|
||||
import org.elasticsearch.indices.IndicesQueryCache;
|
||||
import org.elasticsearch.indices.IndicesRequestCache;
|
||||
|
@ -69,6 +71,7 @@ import java.util.Collections;
|
|||
import java.util.EnumSet;
|
||||
import java.util.List;
|
||||
import java.util.Random;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.BrokenBarrierException;
|
||||
import java.util.concurrent.CopyOnWriteArrayList;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
|
@ -1005,10 +1008,15 @@ public class IndexStatsIT extends ESIntegTestCase {
|
|||
}
|
||||
|
||||
public void testFilterCacheStats() throws Exception {
|
||||
assertAcked(prepareCreate("index").setSettings(Settings.builder().put(indexSettings()).put("number_of_replicas", 0).build()).get());
|
||||
indexRandom(true,
|
||||
Settings settings = Settings.builder().put(indexSettings()).put("number_of_replicas", 0).build();
|
||||
assertAcked(prepareCreate("index").setSettings(settings).get());
|
||||
indexRandom(false, true,
|
||||
client().prepareIndex("index", "type", "1").setSource("foo", "bar"),
|
||||
client().prepareIndex("index", "type", "2").setSource("foo", "baz"));
|
||||
if (IndexSettings.INDEX_SOFT_DELETES_SETTING.get(settings)) {
|
||||
persistGlobalCheckpoint("index"); // Need to persist the global checkpoint for the soft-deletes retention MP.
|
||||
}
|
||||
refresh();
|
||||
ensureGreen();
|
||||
|
||||
IndicesStatsResponse response = client().admin().indices().prepareStats("index").setQueryCache(true).get();
|
||||
|
@ -1039,6 +1047,9 @@ public class IndexStatsIT extends ESIntegTestCase {
|
|||
|
||||
assertEquals(DocWriteResponse.Result.DELETED, client().prepareDelete("index", "type", "1").get().getResult());
|
||||
assertEquals(DocWriteResponse.Result.DELETED, client().prepareDelete("index", "type", "2").get().getResult());
|
||||
if (IndexSettings.INDEX_SOFT_DELETES_SETTING.get(settings)) {
|
||||
persistGlobalCheckpoint("index"); // Need to persist the global checkpoint for the soft-deletes retention MP.
|
||||
}
|
||||
refresh();
|
||||
response = client().admin().indices().prepareStats("index").setQueryCache(true).get();
|
||||
assertCumulativeQueryCacheStats(response);
|
||||
|
@ -1172,4 +1183,21 @@ public class IndexStatsIT extends ESIntegTestCase {
|
|||
assertThat(executionFailures.get(), emptyCollectionOf(Exception.class));
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Persist the global checkpoint on all shards of the given index into disk.
|
||||
* This makes sure that the persisted global checkpoint on those shards will equal to the in-memory value.
|
||||
*/
|
||||
private void persistGlobalCheckpoint(String index) throws Exception {
|
||||
final Set<String> nodes = internalCluster().nodesInclude(index);
|
||||
for (String node : nodes) {
|
||||
final IndicesService indexServices = internalCluster().getInstance(IndicesService.class, node);
|
||||
for (IndexService indexService : indexServices) {
|
||||
for (IndexShard indexShard : indexService) {
|
||||
indexShard.sync();
|
||||
assertThat(indexShard.getLastSyncedGlobalCheckpoint(), equalTo(indexShard.getGlobalCheckpoint()));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -105,8 +105,10 @@ import java.util.List;
|
|||
import java.util.Map;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.function.BiFunction;
|
||||
import java.util.function.Function;
|
||||
import java.util.function.LongSupplier;
|
||||
import java.util.function.ToLongBiFunction;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static java.util.Collections.emptyList;
|
||||
import static java.util.Collections.shuffle;
|
||||
|
@ -114,6 +116,8 @@ import static org.elasticsearch.index.engine.Engine.Operation.Origin.PRIMARY;
|
|||
import static org.elasticsearch.index.engine.Engine.Operation.Origin.REPLICA;
|
||||
import static org.elasticsearch.index.translog.TranslogDeletionPolicies.createTranslogDeletionPolicy;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.lessThan;
|
||||
import static org.hamcrest.Matchers.notNullValue;
|
||||
|
||||
public abstract class EngineTestCase extends ESTestCase {
|
||||
|
||||
|
@ -787,12 +791,22 @@ public abstract class EngineTestCase extends ESTestCase {
|
|||
translogOps.put(op.seqNo(), op);
|
||||
}
|
||||
}
|
||||
final List<Translog.Operation> luceneOps = readAllOperationsInLucene(engine, mapper);
|
||||
for (Translog.Operation luceneOp : luceneOps) {
|
||||
Translog.Operation translogOp = translogOps.get(luceneOp.seqNo());
|
||||
if (translogOp == null) {
|
||||
continue;
|
||||
final Map<Long, Translog.Operation> luceneOps = readAllOperationsInLucene(engine, mapper).stream()
|
||||
.collect(Collectors.toMap(Translog.Operation::seqNo, Function.identity()));
|
||||
final long globalCheckpoint = engine.getTranslog().getLastSyncedGlobalCheckpoint();
|
||||
final long retainedOps = engine.config().getIndexSettings().getSoftDeleteRetentionOperations();
|
||||
final long maxSeqNo = engine.getLocalCheckpointTracker().getMaxSeqNo();
|
||||
for (Translog.Operation translogOp : translogOps.values()) {
|
||||
final Translog.Operation luceneOp = luceneOps.get(translogOp.seqNo());
|
||||
if (luceneOp == null) {
|
||||
if (globalCheckpoint + 1 - retainedOps <= translogOp.seqNo() && translogOp.seqNo() <= maxSeqNo) {
|
||||
fail("Operation not found seq# [" + translogOp.seqNo() + "], global checkpoint [" + globalCheckpoint + "], " +
|
||||
"retention policy [" + retainedOps + "], maxSeqNo [" + maxSeqNo + "], translog op [" + translogOp + "]");
|
||||
} else {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
assertThat(luceneOp, notNullValue());
|
||||
assertThat(luceneOp.primaryTerm(), equalTo(translogOp.primaryTerm()));
|
||||
assertThat(luceneOp.opType(), equalTo(translogOp.opType()));
|
||||
if (luceneOp.opType() == Translog.Operation.Type.INDEX) {
|
||||
|
|
Loading…
Reference in New Issue