From a9e1913bd5c9ab4906e9cac2c189ed5f797a2a53 Mon Sep 17 00:00:00 2001 From: Sai Date: Wed, 8 Sep 2021 16:23:00 +0530 Subject: [PATCH] Support for translog pruning based on retention leases (#1038) * Support for translog pruning based on retention leases Signed-off-by: Sai Kumar * Addressed CR Comments Signed-off-by: Sai Kumar * Addressed test case issue Signed-off-by: Sai Kumar --- .../common/settings/IndexScopedSettings.java | 1 + .../org/opensearch/index/IndexSettings.java | 46 +++++++++- .../org/opensearch/index/engine/Engine.java | 5 ++ .../index/engine/InternalEngine.java | 7 +- .../opensearch/index/shard/IndexShard.java | 6 +- .../translog/TranslogDeletionPolicy.java | 45 +++++++++- .../opensearch/index/IndexSettingsTests.java | 31 +++++++ .../translog/TranslogDeletionPolicyTests.java | 90 ++++++++++++++++++- 8 files changed, 220 insertions(+), 11 deletions(-) diff --git a/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java b/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java index 9e73d6fdd9c..559d0fc6293 100644 --- a/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java @@ -156,6 +156,7 @@ public final class IndexScopedSettings extends AbstractScopedSettings { EnableAllocationDecider.INDEX_ROUTING_REBALANCE_ENABLE_SETTING, EnableAllocationDecider.INDEX_ROUTING_ALLOCATION_ENABLE_SETTING, IndexSettings.INDEX_FLUSH_AFTER_MERGE_THRESHOLD_SIZE_SETTING, + IndexSettings.INDEX_TRANSLOG_RETENTION_LEASE_PRUNING_ENABLED_SETTING, IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING, IndexSettings.INDEX_TRANSLOG_GENERATION_THRESHOLD_SIZE_SETTING, IndexSettings.INDEX_TRANSLOG_RETENTION_AGE_SETTING, diff --git a/server/src/main/java/org/opensearch/index/IndexSettings.java b/server/src/main/java/org/opensearch/index/IndexSettings.java index 10aaaf821cc..f51acea5185 100644 --- a/server/src/main/java/org/opensearch/index/IndexSettings.java +++ b/server/src/main/java/org/opensearch/index/IndexSettings.java @@ -260,6 +260,13 @@ public final class IndexSettings { settings -> Boolean.toString(IndexMetadata.SETTING_INDEX_VERSION_CREATED.get(settings).onOrAfter(LegacyESVersion.V_7_0_0)), Property.IndexScope, Property.Final); + /** + * Specifies if the index translog should prune based on retention leases. + */ + public static final Setting INDEX_TRANSLOG_RETENTION_LEASE_PRUNING_ENABLED_SETTING = + Setting.boolSetting("index.translog.retention_lease.pruning.enabled", false, + Property.IndexScope, Property.Dynamic); + /** * Controls how many soft-deleted documents will be kept around before being merged away. Keeping more deleted * documents increases the chance of operation-based recoveries and allows querying a longer history of documents. @@ -286,9 +293,11 @@ public final class IndexSettings { * the chance of ops based recoveries for indices with soft-deletes disabled. * This setting will be ignored if soft-deletes is used in peer recoveries (default in 7.4). **/ + private static final ByteSizeValue DEFAULT_TRANSLOG_RETENTION_SIZE = new ByteSizeValue(512, ByteSizeUnit.MB); + public static final Setting INDEX_TRANSLOG_RETENTION_SIZE_SETTING = Setting.byteSizeSetting("index.translog.retention.size", - settings -> shouldDisableTranslogRetention(settings) ? "-1" : "512MB", + settings -> DEFAULT_TRANSLOG_RETENTION_SIZE.getStringRep(), Property.Dynamic, Property.IndexScope); /** @@ -389,6 +398,7 @@ public final class IndexSettings { private final IndexScopedSettings scopedSettings; private long gcDeletesInMillis = DEFAULT_GC_DELETES.millis(); private final boolean softDeleteEnabled; + private volatile boolean translogPruningByRetentionLease; private volatile long softDeleteRetentionOperations; private volatile long retentionLeaseMillis; @@ -525,6 +535,9 @@ public final class IndexSettings { mergeSchedulerConfig = new MergeSchedulerConfig(this); gcDeletesInMillis = scopedSettings.get(INDEX_GC_DELETES_SETTING).getMillis(); softDeleteEnabled = version.onOrAfter(LegacyESVersion.V_6_5_0) && scopedSettings.get(INDEX_SOFT_DELETES_SETTING); + translogPruningByRetentionLease = version.onOrAfter(Version.V_1_1_0) && + softDeleteEnabled && + scopedSettings.get(INDEX_TRANSLOG_RETENTION_LEASE_PRUNING_ENABLED_SETTING); softDeleteRetentionOperations = scopedSettings.get(INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING); retentionLeaseMillis = scopedSettings.get(INDEX_SOFT_DELETES_RETENTION_LEASE_PERIOD_SETTING).millis(); warmerEnabled = scopedSettings.get(INDEX_WARMER_ENABLED_SETTING); @@ -593,6 +606,8 @@ public final class IndexSettings { this::setGenerationThresholdSize); scopedSettings.addSettingsUpdateConsumer(INDEX_TRANSLOG_RETENTION_AGE_SETTING, this::setTranslogRetentionAge); scopedSettings.addSettingsUpdateConsumer(INDEX_TRANSLOG_RETENTION_SIZE_SETTING, this::setTranslogRetentionSize); + scopedSettings.addSettingsUpdateConsumer(INDEX_TRANSLOG_RETENTION_LEASE_PRUNING_ENABLED_SETTING, + this::setTranslogPruningByRetentionLease); scopedSettings.addSettingsUpdateConsumer(INDEX_REFRESH_INTERVAL_SETTING, this::setRefreshInterval); scopedSettings.addSettingsUpdateConsumer(MAX_REFRESH_LISTENERS_PER_SHARD, this::setMaxRefreshListeners); scopedSettings.addSettingsUpdateConsumer(MAX_ANALYZED_OFFSET_SETTING, this::setHighlightMaxAnalyzedOffset); @@ -623,8 +638,14 @@ public final class IndexSettings { this.flushAfterMergeThresholdSize = byteSizeValue; } + private void setTranslogPruningByRetentionLease(boolean enabled) { + this.translogPruningByRetentionLease = INDEX_SOFT_DELETES_SETTING.get(settings) && enabled; + } + private void setTranslogRetentionSize(ByteSizeValue byteSizeValue) { - if (shouldDisableTranslogRetention(settings) && byteSizeValue.getBytes() >= 0) { + if (shouldDisableTranslogRetention(settings) && + !shouldPruneTranslogByRetentionLease(settings) && + byteSizeValue.getBytes() >= 0) { // ignore the translog retention settings if soft-deletes enabled this.translogRetentionSize = new ByteSizeValue(-1); } else { @@ -826,7 +847,12 @@ public final class IndexSettings { * Returns the transaction log retention size which controls how much of the translog is kept around to allow for ops based recoveries */ public ByteSizeValue getTranslogRetentionSize() { - assert shouldDisableTranslogRetention(settings) == false || translogRetentionSize.getBytes() == -1L : translogRetentionSize; + if(shouldDisableTranslogRetention(settings) && !shouldPruneTranslogByRetentionLease(settings)) { + return new ByteSizeValue(-1); + } + else if(shouldPruneTranslogByRetentionLease(settings) && translogRetentionSize.getBytes() == -1) { + return DEFAULT_TRANSLOG_RETENTION_SIZE; + } return translogRetentionSize; } @@ -1071,6 +1097,20 @@ public final class IndexSettings { this.requiredPipeline = requiredPipeline; } + /** + * Returns true if translog ops should be pruned based on retention lease + */ + public boolean shouldPruneTranslogByRetentionLease() { + return translogPruningByRetentionLease; + } + + /** + * Returns true if translog ops should be pruned based on retention lease + */ + public static boolean shouldPruneTranslogByRetentionLease(Settings settings) { + return INDEX_TRANSLOG_RETENTION_LEASE_PRUNING_ENABLED_SETTING.get(settings); + } + /** * Returns true if soft-delete is enabled. */ diff --git a/server/src/main/java/org/opensearch/index/engine/Engine.java b/server/src/main/java/org/opensearch/index/engine/Engine.java index c4be05b779d..a6322b5cca0 100644 --- a/server/src/main/java/org/opensearch/index/engine/Engine.java +++ b/server/src/main/java/org/opensearch/index/engine/Engine.java @@ -1849,6 +1849,11 @@ public abstract class Engine implements Closeable { } public void onSettingsChanged(TimeValue translogRetentionAge, ByteSizeValue translogRetentionSize, long softDeletesRetentionOps) { + onSettingsChanged(translogRetentionAge, translogRetentionSize, softDeletesRetentionOps, false); + } + + public void onSettingsChanged(TimeValue translogRetentionAge, ByteSizeValue translogRetentionSize, + long softDeletesRetentionOps, boolean translogPruningByRetentionLease) { } diff --git a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java index 4a6b8e21715..50ad79ae27c 100644 --- a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java @@ -227,7 +227,8 @@ public class InternalEngine extends Engine { final TranslogDeletionPolicy translogDeletionPolicy = new TranslogDeletionPolicy( engineConfig.getIndexSettings().getTranslogRetentionSize().getBytes(), engineConfig.getIndexSettings().getTranslogRetentionAge().getMillis(), - engineConfig.getIndexSettings().getTranslogRetentionTotalFiles() + engineConfig.getIndexSettings().getTranslogRetentionTotalFiles(), + engineConfig.retentionLeasesSupplier() ); store.incRef(); IndexWriter writer = null; @@ -2572,7 +2573,8 @@ public class InternalEngine extends Engine { } @Override - public void onSettingsChanged(TimeValue translogRetentionAge, ByteSizeValue translogRetentionSize, long softDeletesRetentionOps) { + public void onSettingsChanged(TimeValue translogRetentionAge, ByteSizeValue translogRetentionSize, + long softDeletesRetentionOps, boolean translogPruningByRetentionLease) { mergeScheduler.refreshConfig(); // config().isEnableGcDeletes() or config.getGcDeletesInMillis() may have changed: maybePruneDeletes(); @@ -2585,6 +2587,7 @@ public class InternalEngine extends Engine { final TranslogDeletionPolicy translogDeletionPolicy = translog.getDeletionPolicy(); translogDeletionPolicy.setRetentionAgeInMillis(translogRetentionAge.millis()); translogDeletionPolicy.setRetentionSizeInBytes(translogRetentionSize.getBytes()); + translogDeletionPolicy.shouldPruneTranslogByRetentionLease(translogPruningByRetentionLease); softDeletesPolicy.setRetentionOperations(softDeletesRetentionOps); } diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 75cf919185c..0b3311f872c 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -1961,8 +1961,10 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl final boolean disableTranslogRetention = indexSettings.isSoftDeleteEnabled() && useRetentionLeasesInPeerRecovery; engineOrNull.onSettingsChanged( disableTranslogRetention ? TimeValue.MINUS_ONE : indexSettings.getTranslogRetentionAge(), - disableTranslogRetention ? new ByteSizeValue(-1) : indexSettings.getTranslogRetentionSize(), - indexSettings.getSoftDeleteRetentionOperations() + disableTranslogRetention && !indexSettings.shouldPruneTranslogByRetentionLease() ? + new ByteSizeValue(-1) : indexSettings.getTranslogRetentionSize(), + indexSettings.getSoftDeleteRetentionOperations(), + indexSettings.shouldPruneTranslogByRetentionLease() ); } } diff --git a/server/src/main/java/org/opensearch/index/translog/TranslogDeletionPolicy.java b/server/src/main/java/org/opensearch/index/translog/TranslogDeletionPolicy.java index 42f3893fd98..02d12f69f22 100644 --- a/server/src/main/java/org/opensearch/index/translog/TranslogDeletionPolicy.java +++ b/server/src/main/java/org/opensearch/index/translog/TranslogDeletionPolicy.java @@ -35,6 +35,8 @@ package org.opensearch.index.translog; import org.apache.lucene.util.Counter; import org.opensearch.Assertions; import org.opensearch.common.lease.Releasable; +import org.opensearch.index.seqno.RetentionLease; +import org.opensearch.index.seqno.RetentionLeases; import org.opensearch.index.seqno.SequenceNumbers; import java.io.IOException; @@ -43,10 +45,12 @@ import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Supplier; public class TranslogDeletionPolicy { private final Map openTranslogRef; + private Supplier retentionLeasesSupplier; public void assertNoOpenTranslogRefs() { if (openTranslogRef.isEmpty() == false) { @@ -69,6 +73,8 @@ public class TranslogDeletionPolicy { private int retentionTotalFiles; + private boolean shouldPruneTranslogByRetentionLease; + public TranslogDeletionPolicy(long retentionSizeInBytes, long retentionAgeInMillis, int retentionTotalFiles) { this.retentionSizeInBytes = retentionSizeInBytes; this.retentionAgeInMillis = retentionAgeInMillis; @@ -80,6 +86,12 @@ public class TranslogDeletionPolicy { } } + public TranslogDeletionPolicy(long retentionSizeInBytes, long retentionAgeInMillis, int retentionTotalFiles, + Supplier retentionLeasesSupplier) { + this(retentionSizeInBytes, retentionAgeInMillis, retentionTotalFiles); + this.retentionLeasesSupplier = retentionLeasesSupplier; + } + public synchronized void setLocalCheckpointOfSafeCommit(long newCheckpoint) { if (newCheckpoint < this.localCheckpointOfSafeCommit) { throw new IllegalArgumentException("local checkpoint of the safe commit can't go backwards: " + @@ -100,6 +112,10 @@ public class TranslogDeletionPolicy { this.retentionTotalFiles = retentionTotalFiles; } + public synchronized void shouldPruneTranslogByRetentionLease(boolean translogPruneByRetentionLease) { + this.shouldPruneTranslogByRetentionLease = translogPruneByRetentionLease; + } + /** * acquires the basis generation for a new snapshot. Any translog generation above, and including, the returned generation * will not be deleted until the returned {@link Releasable} is closed. @@ -157,6 +173,12 @@ public class TranslogDeletionPolicy { long minByLocks = getMinTranslogGenRequiredByLocks(); long minByAge = getMinTranslogGenByAge(readers, writer, retentionAgeInMillis, currentTime()); long minBySize = getMinTranslogGenBySize(readers, writer, retentionSizeInBytes); + long minByRetentionLeasesAndSize = Long.MAX_VALUE; + if(shouldPruneTranslogByRetentionLease) { + // If retention size is specified, size takes precedence. + long minByRetentionLeases = getMinTranslogGenByRetentionLease(readers, writer, retentionLeasesSupplier); + minByRetentionLeasesAndSize = Math.max(minBySize, minByRetentionLeases); + } final long minByAgeAndSize; if (minBySize == Long.MIN_VALUE && minByAge == Long.MIN_VALUE) { // both size and age are disabled; @@ -165,7 +187,28 @@ public class TranslogDeletionPolicy { minByAgeAndSize = Math.max(minByAge, minBySize); } long minByNumFiles = getMinTranslogGenByTotalFiles(readers, writer, retentionTotalFiles); - return Math.min(Math.max(minByAgeAndSize, minByNumFiles), minByLocks); + long minByTranslogGenSettings = Math.min(Math.max(minByAgeAndSize, minByNumFiles), minByLocks); + return Math.min(minByTranslogGenSettings, minByRetentionLeasesAndSize); + } + + static long getMinTranslogGenByRetentionLease(List readers, TranslogWriter writer, + Supplier retentionLeasesSupplier) { + long minGen = writer.getGeneration(); + final long minimumRetainingSequenceNumber = retentionLeasesSupplier.get() + .leases() + .stream() + .mapToLong(RetentionLease::retainingSequenceNumber) + .min() + .orElse(Long.MAX_VALUE); + + for (int i = readers.size() - 1; i >= 0; i--) { + final TranslogReader reader = readers.get(i); + if(reader.getCheckpoint().minSeqNo <= minimumRetainingSequenceNumber && + reader.getCheckpoint().maxSeqNo >= minimumRetainingSequenceNumber) { + minGen = Math.min(minGen, reader.getGeneration()); + } + } + return minGen; } static long getMinTranslogGenBySize(List readers, TranslogWriter writer, long retentionSizeInBytes) { diff --git a/server/src/test/java/org/opensearch/index/IndexSettingsTests.java b/server/src/test/java/org/opensearch/index/IndexSettingsTests.java index d2c285c06a5..97d85300bb3 100644 --- a/server/src/test/java/org/opensearch/index/IndexSettingsTests.java +++ b/server/src/test/java/org/opensearch/index/IndexSettingsTests.java @@ -655,4 +655,35 @@ public class IndexSettingsTests extends OpenSearchTestCase { assertThat(indexSettings.getTranslogRetentionAge(), equalTo(ageSetting)); assertThat(indexSettings.getTranslogRetentionSize(), equalTo(sizeSetting)); } + + public void testTranslogPruningSettingsWithSoftDeletesEnabled() { + Settings.Builder settings = Settings.builder() + .put(IndexMetadata.SETTING_VERSION_CREATED, Version.V_1_1_0); + + ByteSizeValue retentionSize = new ByteSizeValue(512, ByteSizeUnit.MB); + boolean translogPruningEnabled = randomBoolean(); + settings.put(IndexSettings.INDEX_TRANSLOG_RETENTION_LEASE_PRUNING_ENABLED_SETTING.getKey(), translogPruningEnabled); + IndexMetadata metadata = newIndexMeta("index", settings.build()); + IndexSettings indexSettings = new IndexSettings(metadata, Settings.EMPTY); + if(translogPruningEnabled) { + assertTrue(indexSettings.shouldPruneTranslogByRetentionLease()); + assertThat(indexSettings.getTranslogRetentionSize().getBytes(), equalTo(retentionSize.getBytes())); + } else { + assertFalse(indexSettings.shouldPruneTranslogByRetentionLease()); + assertThat(indexSettings.getTranslogRetentionSize().getBytes(), equalTo(-1L)); + } + } + + public void testTranslogPruningSettingsWithSoftDeletesDisabled() { + Settings.Builder settings = Settings.builder() + .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), false) + .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT); + boolean translogPruningEnabled = randomBoolean(); + ByteSizeValue retentionSize = new ByteSizeValue(512, ByteSizeUnit.MB); + settings.put(IndexSettings.INDEX_TRANSLOG_RETENTION_LEASE_PRUNING_ENABLED_SETTING.getKey(), translogPruningEnabled); + IndexMetadata metadata = newIndexMeta("index", settings.build()); + IndexSettings indexSettings = new IndexSettings(metadata, Settings.EMPTY); + assertFalse(indexSettings.shouldPruneTranslogByRetentionLease()); + assertThat(indexSettings.getTranslogRetentionSize().getBytes(), equalTo(retentionSize.getBytes())); + } } diff --git a/server/src/test/java/org/opensearch/index/translog/TranslogDeletionPolicyTests.java b/server/src/test/java/org/opensearch/index/translog/TranslogDeletionPolicyTests.java index 44aceab0445..75bae17a2fe 100644 --- a/server/src/test/java/org/opensearch/index/translog/TranslogDeletionPolicyTests.java +++ b/server/src/test/java/org/opensearch/index/translog/TranslogDeletionPolicyTests.java @@ -40,6 +40,8 @@ import org.opensearch.common.collect.Tuple; import org.opensearch.common.lease.Releasable; import org.opensearch.common.util.BigArrays; import org.opensearch.core.internal.io.IOUtils; +import org.opensearch.index.seqno.RetentionLease; +import org.opensearch.index.seqno.RetentionLeases; import org.opensearch.index.shard.ShardId; import org.opensearch.test.OpenSearchTestCase; import org.mockito.Mockito; @@ -49,7 +51,9 @@ import java.nio.channels.FileChannel; import java.nio.file.Files; import java.nio.file.Path; import java.util.ArrayList; +import java.util.LinkedList; import java.util.List; +import java.util.function.Supplier; import static java.lang.Math.min; import static org.hamcrest.Matchers.equalTo; @@ -57,6 +61,8 @@ import static org.hamcrest.Matchers.equalTo; public class TranslogDeletionPolicyTests extends OpenSearchTestCase { + private static long TOTAL_OPS_IN_GEN = 10L; + public void testNoRetention() throws IOException { long now = System.currentTimeMillis(); Tuple, TranslogWriter> readersAndWriter = createReadersAndWriter(now); @@ -90,6 +96,33 @@ public class TranslogDeletionPolicyTests extends OpenSearchTestCase { } } + public void testWithRetentionLease() throws IOException { + long now = System.currentTimeMillis(); + Tuple, TranslogWriter> readersAndWriter = createReadersAndWriter(now); + List allGens = new ArrayList<>(readersAndWriter.v1()); + allGens.add(readersAndWriter.v2()); + Supplier retentionLeasesSupplier = createRetentionLeases(now, 0L, + readersAndWriter.v1().size() * TOTAL_OPS_IN_GEN - 1); + try { + final long minimumRetainingSequenceNumber = retentionLeasesSupplier.get() + .leases() + .stream() + .mapToLong(RetentionLease::retainingSequenceNumber) + .min() + .orElse(Long.MAX_VALUE); + + final long selectedReader = (minimumRetainingSequenceNumber/TOTAL_OPS_IN_GEN); + final long selectedGen = allGens.get((int) selectedReader).generation; + assertThat(TranslogDeletionPolicy + .getMinTranslogGenByRetentionLease(readersAndWriter.v1(), readersAndWriter.v2(), retentionLeasesSupplier), + equalTo(selectedGen)); + + } finally { + IOUtils.close(readersAndWriter.v1()); + IOUtils.close(readersAndWriter.v2()); + } + } + public void testAgeRetention() throws IOException { long now = System.currentTimeMillis(); Tuple, TranslogWriter> readersAndWriter = createReadersAndWriter(now); @@ -128,6 +161,38 @@ public class TranslogDeletionPolicyTests extends OpenSearchTestCase { } } + public void testBySizeAndRetentionLease() throws Exception { + long now = System.currentTimeMillis(); + Tuple, TranslogWriter> readersAndWriter = createReadersAndWriter(now); + List allGens = new ArrayList<>(readersAndWriter.v1()); + allGens.add(readersAndWriter.v2()); + try { + int selectedReader = randomIntBetween(0, allGens.size() - 1); + final long selectedGeneration = allGens.get(selectedReader).generation; + // Retaining seqno is part of lower gen + long size = allGens.stream().skip(selectedReader).map(BaseTranslogReader::sizeInBytes).reduce(Long::sum).get(); + Supplier retentionLeasesSupplier = createRetentionLeases(now, 0L, + selectedGeneration * TOTAL_OPS_IN_GEN - 1); + TranslogDeletionPolicy deletionPolicy = new MockDeletionPolicy(now, size, Integer.MAX_VALUE, + Integer.MAX_VALUE, retentionLeasesSupplier); + assertThat(deletionPolicy + .minTranslogGenRequired(readersAndWriter.v1(), readersAndWriter.v2()), equalTo(selectedGeneration)); + assertThat(TranslogDeletionPolicy + .getMinTranslogGenByAge(readersAndWriter.v1(), readersAndWriter.v2(), 100L, System.currentTimeMillis()), + equalTo(readersAndWriter.v2().generation)); + + // Retention lease is part of higher gen + retentionLeasesSupplier = createRetentionLeases(now, selectedGeneration * TOTAL_OPS_IN_GEN, + allGens.size() * TOTAL_OPS_IN_GEN + TOTAL_OPS_IN_GEN - 1); + deletionPolicy = new MockDeletionPolicy(now, size, Long.MIN_VALUE, + Integer.MAX_VALUE, retentionLeasesSupplier); + assertThat(deletionPolicy.minTranslogGenRequired(readersAndWriter.v1(), readersAndWriter.v2()), equalTo(selectedGeneration)); + } finally { + IOUtils.close(readersAndWriter.v1()); + IOUtils.close(readersAndWriter.v2()); + } + } + /** * Tests that age trumps size but recovery trumps both. */ @@ -207,19 +272,32 @@ public class TranslogDeletionPolicyTests extends OpenSearchTestCase { () -> 1L, randomNonNegativeLong(), new TragicExceptionHolder(), seqNo -> {}, BigArrays.NON_RECYCLING_INSTANCE); writer = Mockito.spy(writer); Mockito.doReturn(now - (numberOfReaders - gen + 1) * 1000).when(writer).getLastModifiedTime(); - byte[] bytes = new byte[4]; ByteArrayDataOutput out = new ByteArrayDataOutput(bytes); - for (int ops = randomIntBetween(0, 20); ops > 0; ops--) { + final long startSeqNo = (gen - 1) * TOTAL_OPS_IN_GEN; + final long endSeqNo = startSeqNo + TOTAL_OPS_IN_GEN - 1; + for (long ops = endSeqNo; ops >= startSeqNo; ops--) { out.reset(bytes); - out.writeInt(ops); + out.writeInt((int) ops); writer.add(ReleasableBytesReference.wrap(new BytesArray(bytes)), ops); } } return new Tuple<>(readers, writer); } + private Supplier createRetentionLeases(final Long now, final Long lowestSeqNo, + final Long highestSeqNo) throws IOException { + LinkedList leases = new LinkedList<>(); + final int numberOfLeases = randomIntBetween(1, 5); + for(int i=0 ;i new RetentionLeases(1L, 1L, leases); + } + private static class MockDeletionPolicy extends TranslogDeletionPolicy { long now; @@ -229,6 +307,12 @@ public class TranslogDeletionPolicyTests extends OpenSearchTestCase { this.now = now; } + MockDeletionPolicy(long now, long retentionSizeInBytes, long maxRetentionAgeInMillis, + int maxRetentionTotalFiles, Supplier retentionLeasesSupplier) { + super(retentionSizeInBytes, maxRetentionAgeInMillis, maxRetentionTotalFiles, retentionLeasesSupplier); + this.now = now; + } + @Override protected long currentTime() { return now;