Support for translog pruning based on retention leases (#1038)

* Support for translog pruning based on retention leases

Signed-off-by: Sai Kumar <karanas@amazon.com>

* Addressed CR Comments

Signed-off-by: Sai Kumar <karanas@amazon.com>

* Addressed test case issue

Signed-off-by: Sai Kumar <karanas@amazon.com>
This commit is contained in:
Sai 2021-09-08 16:23:00 +05:30 committed by GitHub
parent 919c5e05aa
commit a9e1913bd5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 220 additions and 11 deletions

View File

@ -156,6 +156,7 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
EnableAllocationDecider.INDEX_ROUTING_REBALANCE_ENABLE_SETTING, EnableAllocationDecider.INDEX_ROUTING_REBALANCE_ENABLE_SETTING,
EnableAllocationDecider.INDEX_ROUTING_ALLOCATION_ENABLE_SETTING, EnableAllocationDecider.INDEX_ROUTING_ALLOCATION_ENABLE_SETTING,
IndexSettings.INDEX_FLUSH_AFTER_MERGE_THRESHOLD_SIZE_SETTING, IndexSettings.INDEX_FLUSH_AFTER_MERGE_THRESHOLD_SIZE_SETTING,
IndexSettings.INDEX_TRANSLOG_RETENTION_LEASE_PRUNING_ENABLED_SETTING,
IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING, IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING,
IndexSettings.INDEX_TRANSLOG_GENERATION_THRESHOLD_SIZE_SETTING, IndexSettings.INDEX_TRANSLOG_GENERATION_THRESHOLD_SIZE_SETTING,
IndexSettings.INDEX_TRANSLOG_RETENTION_AGE_SETTING, IndexSettings.INDEX_TRANSLOG_RETENTION_AGE_SETTING,

View File

@ -260,6 +260,13 @@ public final class IndexSettings {
settings -> Boolean.toString(IndexMetadata.SETTING_INDEX_VERSION_CREATED.get(settings).onOrAfter(LegacyESVersion.V_7_0_0)), settings -> Boolean.toString(IndexMetadata.SETTING_INDEX_VERSION_CREATED.get(settings).onOrAfter(LegacyESVersion.V_7_0_0)),
Property.IndexScope, Property.Final); Property.IndexScope, Property.Final);
/**
* Specifies if the index translog should prune based on retention leases.
*/
public static final Setting<Boolean> INDEX_TRANSLOG_RETENTION_LEASE_PRUNING_ENABLED_SETTING =
Setting.boolSetting("index.translog.retention_lease.pruning.enabled", false,
Property.IndexScope, Property.Dynamic);
/** /**
* Controls how many soft-deleted documents will be kept around before being merged away. Keeping more deleted * Controls how many soft-deleted documents will be kept around before being merged away. Keeping more deleted
* documents increases the chance of operation-based recoveries and allows querying a longer history of documents. * documents increases the chance of operation-based recoveries and allows querying a longer history of documents.
@ -286,9 +293,11 @@ public final class IndexSettings {
* the chance of ops based recoveries for indices with soft-deletes disabled. * the chance of ops based recoveries for indices with soft-deletes disabled.
* This setting will be ignored if soft-deletes is used in peer recoveries (default in 7.4). * This setting will be ignored if soft-deletes is used in peer recoveries (default in 7.4).
**/ **/
private static final ByteSizeValue DEFAULT_TRANSLOG_RETENTION_SIZE = new ByteSizeValue(512, ByteSizeUnit.MB);
public static final Setting<ByteSizeValue> INDEX_TRANSLOG_RETENTION_SIZE_SETTING = public static final Setting<ByteSizeValue> INDEX_TRANSLOG_RETENTION_SIZE_SETTING =
Setting.byteSizeSetting("index.translog.retention.size", Setting.byteSizeSetting("index.translog.retention.size",
settings -> shouldDisableTranslogRetention(settings) ? "-1" : "512MB", settings -> DEFAULT_TRANSLOG_RETENTION_SIZE.getStringRep(),
Property.Dynamic, Property.IndexScope); Property.Dynamic, Property.IndexScope);
/** /**
@ -389,6 +398,7 @@ public final class IndexSettings {
private final IndexScopedSettings scopedSettings; private final IndexScopedSettings scopedSettings;
private long gcDeletesInMillis = DEFAULT_GC_DELETES.millis(); private long gcDeletesInMillis = DEFAULT_GC_DELETES.millis();
private final boolean softDeleteEnabled; private final boolean softDeleteEnabled;
private volatile boolean translogPruningByRetentionLease;
private volatile long softDeleteRetentionOperations; private volatile long softDeleteRetentionOperations;
private volatile long retentionLeaseMillis; private volatile long retentionLeaseMillis;
@ -525,6 +535,9 @@ public final class IndexSettings {
mergeSchedulerConfig = new MergeSchedulerConfig(this); mergeSchedulerConfig = new MergeSchedulerConfig(this);
gcDeletesInMillis = scopedSettings.get(INDEX_GC_DELETES_SETTING).getMillis(); gcDeletesInMillis = scopedSettings.get(INDEX_GC_DELETES_SETTING).getMillis();
softDeleteEnabled = version.onOrAfter(LegacyESVersion.V_6_5_0) && scopedSettings.get(INDEX_SOFT_DELETES_SETTING); softDeleteEnabled = version.onOrAfter(LegacyESVersion.V_6_5_0) && scopedSettings.get(INDEX_SOFT_DELETES_SETTING);
translogPruningByRetentionLease = version.onOrAfter(Version.V_1_1_0) &&
softDeleteEnabled &&
scopedSettings.get(INDEX_TRANSLOG_RETENTION_LEASE_PRUNING_ENABLED_SETTING);
softDeleteRetentionOperations = scopedSettings.get(INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING); softDeleteRetentionOperations = scopedSettings.get(INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING);
retentionLeaseMillis = scopedSettings.get(INDEX_SOFT_DELETES_RETENTION_LEASE_PERIOD_SETTING).millis(); retentionLeaseMillis = scopedSettings.get(INDEX_SOFT_DELETES_RETENTION_LEASE_PERIOD_SETTING).millis();
warmerEnabled = scopedSettings.get(INDEX_WARMER_ENABLED_SETTING); warmerEnabled = scopedSettings.get(INDEX_WARMER_ENABLED_SETTING);
@ -593,6 +606,8 @@ public final class IndexSettings {
this::setGenerationThresholdSize); this::setGenerationThresholdSize);
scopedSettings.addSettingsUpdateConsumer(INDEX_TRANSLOG_RETENTION_AGE_SETTING, this::setTranslogRetentionAge); scopedSettings.addSettingsUpdateConsumer(INDEX_TRANSLOG_RETENTION_AGE_SETTING, this::setTranslogRetentionAge);
scopedSettings.addSettingsUpdateConsumer(INDEX_TRANSLOG_RETENTION_SIZE_SETTING, this::setTranslogRetentionSize); scopedSettings.addSettingsUpdateConsumer(INDEX_TRANSLOG_RETENTION_SIZE_SETTING, this::setTranslogRetentionSize);
scopedSettings.addSettingsUpdateConsumer(INDEX_TRANSLOG_RETENTION_LEASE_PRUNING_ENABLED_SETTING,
this::setTranslogPruningByRetentionLease);
scopedSettings.addSettingsUpdateConsumer(INDEX_REFRESH_INTERVAL_SETTING, this::setRefreshInterval); scopedSettings.addSettingsUpdateConsumer(INDEX_REFRESH_INTERVAL_SETTING, this::setRefreshInterval);
scopedSettings.addSettingsUpdateConsumer(MAX_REFRESH_LISTENERS_PER_SHARD, this::setMaxRefreshListeners); scopedSettings.addSettingsUpdateConsumer(MAX_REFRESH_LISTENERS_PER_SHARD, this::setMaxRefreshListeners);
scopedSettings.addSettingsUpdateConsumer(MAX_ANALYZED_OFFSET_SETTING, this::setHighlightMaxAnalyzedOffset); scopedSettings.addSettingsUpdateConsumer(MAX_ANALYZED_OFFSET_SETTING, this::setHighlightMaxAnalyzedOffset);
@ -623,8 +638,14 @@ public final class IndexSettings {
this.flushAfterMergeThresholdSize = byteSizeValue; this.flushAfterMergeThresholdSize = byteSizeValue;
} }
private void setTranslogPruningByRetentionLease(boolean enabled) {
this.translogPruningByRetentionLease = INDEX_SOFT_DELETES_SETTING.get(settings) && enabled;
}
private void setTranslogRetentionSize(ByteSizeValue byteSizeValue) { private void setTranslogRetentionSize(ByteSizeValue byteSizeValue) {
if (shouldDisableTranslogRetention(settings) && byteSizeValue.getBytes() >= 0) { if (shouldDisableTranslogRetention(settings) &&
!shouldPruneTranslogByRetentionLease(settings) &&
byteSizeValue.getBytes() >= 0) {
// ignore the translog retention settings if soft-deletes enabled // ignore the translog retention settings if soft-deletes enabled
this.translogRetentionSize = new ByteSizeValue(-1); this.translogRetentionSize = new ByteSizeValue(-1);
} else { } else {
@ -826,7 +847,12 @@ public final class IndexSettings {
* Returns the transaction log retention size which controls how much of the translog is kept around to allow for ops based recoveries * Returns the transaction log retention size which controls how much of the translog is kept around to allow for ops based recoveries
*/ */
public ByteSizeValue getTranslogRetentionSize() { public ByteSizeValue getTranslogRetentionSize() {
assert shouldDisableTranslogRetention(settings) == false || translogRetentionSize.getBytes() == -1L : translogRetentionSize; if(shouldDisableTranslogRetention(settings) && !shouldPruneTranslogByRetentionLease(settings)) {
return new ByteSizeValue(-1);
}
else if(shouldPruneTranslogByRetentionLease(settings) && translogRetentionSize.getBytes() == -1) {
return DEFAULT_TRANSLOG_RETENTION_SIZE;
}
return translogRetentionSize; return translogRetentionSize;
} }
@ -1071,6 +1097,20 @@ public final class IndexSettings {
this.requiredPipeline = requiredPipeline; this.requiredPipeline = requiredPipeline;
} }
/**
* Returns <code>true</code> if translog ops should be pruned based on retention lease
*/
public boolean shouldPruneTranslogByRetentionLease() {
return translogPruningByRetentionLease;
}
/**
* Returns <code>true</code> if translog ops should be pruned based on retention lease
*/
public static boolean shouldPruneTranslogByRetentionLease(Settings settings) {
return INDEX_TRANSLOG_RETENTION_LEASE_PRUNING_ENABLED_SETTING.get(settings);
}
/** /**
* Returns <code>true</code> if soft-delete is enabled. * Returns <code>true</code> if soft-delete is enabled.
*/ */

View File

@ -1849,6 +1849,11 @@ public abstract class Engine implements Closeable {
} }
public void onSettingsChanged(TimeValue translogRetentionAge, ByteSizeValue translogRetentionSize, long softDeletesRetentionOps) { public void onSettingsChanged(TimeValue translogRetentionAge, ByteSizeValue translogRetentionSize, long softDeletesRetentionOps) {
onSettingsChanged(translogRetentionAge, translogRetentionSize, softDeletesRetentionOps, false);
}
public void onSettingsChanged(TimeValue translogRetentionAge, ByteSizeValue translogRetentionSize,
long softDeletesRetentionOps, boolean translogPruningByRetentionLease) {
} }

View File

@ -227,7 +227,8 @@ public class InternalEngine extends Engine {
final TranslogDeletionPolicy translogDeletionPolicy = new TranslogDeletionPolicy( final TranslogDeletionPolicy translogDeletionPolicy = new TranslogDeletionPolicy(
engineConfig.getIndexSettings().getTranslogRetentionSize().getBytes(), engineConfig.getIndexSettings().getTranslogRetentionSize().getBytes(),
engineConfig.getIndexSettings().getTranslogRetentionAge().getMillis(), engineConfig.getIndexSettings().getTranslogRetentionAge().getMillis(),
engineConfig.getIndexSettings().getTranslogRetentionTotalFiles() engineConfig.getIndexSettings().getTranslogRetentionTotalFiles(),
engineConfig.retentionLeasesSupplier()
); );
store.incRef(); store.incRef();
IndexWriter writer = null; IndexWriter writer = null;
@ -2572,7 +2573,8 @@ public class InternalEngine extends Engine {
} }
@Override @Override
public void onSettingsChanged(TimeValue translogRetentionAge, ByteSizeValue translogRetentionSize, long softDeletesRetentionOps) { public void onSettingsChanged(TimeValue translogRetentionAge, ByteSizeValue translogRetentionSize,
long softDeletesRetentionOps, boolean translogPruningByRetentionLease) {
mergeScheduler.refreshConfig(); mergeScheduler.refreshConfig();
// config().isEnableGcDeletes() or config.getGcDeletesInMillis() may have changed: // config().isEnableGcDeletes() or config.getGcDeletesInMillis() may have changed:
maybePruneDeletes(); maybePruneDeletes();
@ -2585,6 +2587,7 @@ public class InternalEngine extends Engine {
final TranslogDeletionPolicy translogDeletionPolicy = translog.getDeletionPolicy(); final TranslogDeletionPolicy translogDeletionPolicy = translog.getDeletionPolicy();
translogDeletionPolicy.setRetentionAgeInMillis(translogRetentionAge.millis()); translogDeletionPolicy.setRetentionAgeInMillis(translogRetentionAge.millis());
translogDeletionPolicy.setRetentionSizeInBytes(translogRetentionSize.getBytes()); translogDeletionPolicy.setRetentionSizeInBytes(translogRetentionSize.getBytes());
translogDeletionPolicy.shouldPruneTranslogByRetentionLease(translogPruningByRetentionLease);
softDeletesPolicy.setRetentionOperations(softDeletesRetentionOps); softDeletesPolicy.setRetentionOperations(softDeletesRetentionOps);
} }

View File

@ -1961,8 +1961,10 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
final boolean disableTranslogRetention = indexSettings.isSoftDeleteEnabled() && useRetentionLeasesInPeerRecovery; final boolean disableTranslogRetention = indexSettings.isSoftDeleteEnabled() && useRetentionLeasesInPeerRecovery;
engineOrNull.onSettingsChanged( engineOrNull.onSettingsChanged(
disableTranslogRetention ? TimeValue.MINUS_ONE : indexSettings.getTranslogRetentionAge(), disableTranslogRetention ? TimeValue.MINUS_ONE : indexSettings.getTranslogRetentionAge(),
disableTranslogRetention ? new ByteSizeValue(-1) : indexSettings.getTranslogRetentionSize(), disableTranslogRetention && !indexSettings.shouldPruneTranslogByRetentionLease() ?
indexSettings.getSoftDeleteRetentionOperations() new ByteSizeValue(-1) : indexSettings.getTranslogRetentionSize(),
indexSettings.getSoftDeleteRetentionOperations(),
indexSettings.shouldPruneTranslogByRetentionLease()
); );
} }
} }

View File

@ -35,6 +35,8 @@ package org.opensearch.index.translog;
import org.apache.lucene.util.Counter; import org.apache.lucene.util.Counter;
import org.opensearch.Assertions; import org.opensearch.Assertions;
import org.opensearch.common.lease.Releasable; import org.opensearch.common.lease.Releasable;
import org.opensearch.index.seqno.RetentionLease;
import org.opensearch.index.seqno.RetentionLeases;
import org.opensearch.index.seqno.SequenceNumbers; import org.opensearch.index.seqno.SequenceNumbers;
import java.io.IOException; import java.io.IOException;
@ -43,10 +45,12 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
public class TranslogDeletionPolicy { public class TranslogDeletionPolicy {
private final Map<Object, RuntimeException> openTranslogRef; private final Map<Object, RuntimeException> openTranslogRef;
private Supplier<RetentionLeases> retentionLeasesSupplier;
public void assertNoOpenTranslogRefs() { public void assertNoOpenTranslogRefs() {
if (openTranslogRef.isEmpty() == false) { if (openTranslogRef.isEmpty() == false) {
@ -69,6 +73,8 @@ public class TranslogDeletionPolicy {
private int retentionTotalFiles; private int retentionTotalFiles;
private boolean shouldPruneTranslogByRetentionLease;
public TranslogDeletionPolicy(long retentionSizeInBytes, long retentionAgeInMillis, int retentionTotalFiles) { public TranslogDeletionPolicy(long retentionSizeInBytes, long retentionAgeInMillis, int retentionTotalFiles) {
this.retentionSizeInBytes = retentionSizeInBytes; this.retentionSizeInBytes = retentionSizeInBytes;
this.retentionAgeInMillis = retentionAgeInMillis; this.retentionAgeInMillis = retentionAgeInMillis;
@ -80,6 +86,12 @@ public class TranslogDeletionPolicy {
} }
} }
public TranslogDeletionPolicy(long retentionSizeInBytes, long retentionAgeInMillis, int retentionTotalFiles,
Supplier<RetentionLeases> retentionLeasesSupplier) {
this(retentionSizeInBytes, retentionAgeInMillis, retentionTotalFiles);
this.retentionLeasesSupplier = retentionLeasesSupplier;
}
public synchronized void setLocalCheckpointOfSafeCommit(long newCheckpoint) { public synchronized void setLocalCheckpointOfSafeCommit(long newCheckpoint) {
if (newCheckpoint < this.localCheckpointOfSafeCommit) { if (newCheckpoint < this.localCheckpointOfSafeCommit) {
throw new IllegalArgumentException("local checkpoint of the safe commit can't go backwards: " + throw new IllegalArgumentException("local checkpoint of the safe commit can't go backwards: " +
@ -100,6 +112,10 @@ public class TranslogDeletionPolicy {
this.retentionTotalFiles = retentionTotalFiles; this.retentionTotalFiles = retentionTotalFiles;
} }
public synchronized void shouldPruneTranslogByRetentionLease(boolean translogPruneByRetentionLease) {
this.shouldPruneTranslogByRetentionLease = translogPruneByRetentionLease;
}
/** /**
* acquires the basis generation for a new snapshot. Any translog generation above, and including, the returned generation * acquires the basis generation for a new snapshot. Any translog generation above, and including, the returned generation
* will not be deleted until the returned {@link Releasable} is closed. * will not be deleted until the returned {@link Releasable} is closed.
@ -157,6 +173,12 @@ public class TranslogDeletionPolicy {
long minByLocks = getMinTranslogGenRequiredByLocks(); long minByLocks = getMinTranslogGenRequiredByLocks();
long minByAge = getMinTranslogGenByAge(readers, writer, retentionAgeInMillis, currentTime()); long minByAge = getMinTranslogGenByAge(readers, writer, retentionAgeInMillis, currentTime());
long minBySize = getMinTranslogGenBySize(readers, writer, retentionSizeInBytes); long minBySize = getMinTranslogGenBySize(readers, writer, retentionSizeInBytes);
long minByRetentionLeasesAndSize = Long.MAX_VALUE;
if(shouldPruneTranslogByRetentionLease) {
// If retention size is specified, size takes precedence.
long minByRetentionLeases = getMinTranslogGenByRetentionLease(readers, writer, retentionLeasesSupplier);
minByRetentionLeasesAndSize = Math.max(minBySize, minByRetentionLeases);
}
final long minByAgeAndSize; final long minByAgeAndSize;
if (minBySize == Long.MIN_VALUE && minByAge == Long.MIN_VALUE) { if (minBySize == Long.MIN_VALUE && minByAge == Long.MIN_VALUE) {
// both size and age are disabled; // both size and age are disabled;
@ -165,7 +187,28 @@ public class TranslogDeletionPolicy {
minByAgeAndSize = Math.max(minByAge, minBySize); minByAgeAndSize = Math.max(minByAge, minBySize);
} }
long minByNumFiles = getMinTranslogGenByTotalFiles(readers, writer, retentionTotalFiles); long minByNumFiles = getMinTranslogGenByTotalFiles(readers, writer, retentionTotalFiles);
return Math.min(Math.max(minByAgeAndSize, minByNumFiles), minByLocks); long minByTranslogGenSettings = Math.min(Math.max(minByAgeAndSize, minByNumFiles), minByLocks);
return Math.min(minByTranslogGenSettings, minByRetentionLeasesAndSize);
}
static long getMinTranslogGenByRetentionLease(List<TranslogReader> readers, TranslogWriter writer,
Supplier<RetentionLeases> retentionLeasesSupplier) {
long minGen = writer.getGeneration();
final long minimumRetainingSequenceNumber = retentionLeasesSupplier.get()
.leases()
.stream()
.mapToLong(RetentionLease::retainingSequenceNumber)
.min()
.orElse(Long.MAX_VALUE);
for (int i = readers.size() - 1; i >= 0; i--) {
final TranslogReader reader = readers.get(i);
if(reader.getCheckpoint().minSeqNo <= minimumRetainingSequenceNumber &&
reader.getCheckpoint().maxSeqNo >= minimumRetainingSequenceNumber) {
minGen = Math.min(minGen, reader.getGeneration());
}
}
return minGen;
} }
static long getMinTranslogGenBySize(List<TranslogReader> readers, TranslogWriter writer, long retentionSizeInBytes) { static long getMinTranslogGenBySize(List<TranslogReader> readers, TranslogWriter writer, long retentionSizeInBytes) {

View File

@ -655,4 +655,35 @@ public class IndexSettingsTests extends OpenSearchTestCase {
assertThat(indexSettings.getTranslogRetentionAge(), equalTo(ageSetting)); assertThat(indexSettings.getTranslogRetentionAge(), equalTo(ageSetting));
assertThat(indexSettings.getTranslogRetentionSize(), equalTo(sizeSetting)); assertThat(indexSettings.getTranslogRetentionSize(), equalTo(sizeSetting));
} }
public void testTranslogPruningSettingsWithSoftDeletesEnabled() {
Settings.Builder settings = Settings.builder()
.put(IndexMetadata.SETTING_VERSION_CREATED, Version.V_1_1_0);
ByteSizeValue retentionSize = new ByteSizeValue(512, ByteSizeUnit.MB);
boolean translogPruningEnabled = randomBoolean();
settings.put(IndexSettings.INDEX_TRANSLOG_RETENTION_LEASE_PRUNING_ENABLED_SETTING.getKey(), translogPruningEnabled);
IndexMetadata metadata = newIndexMeta("index", settings.build());
IndexSettings indexSettings = new IndexSettings(metadata, Settings.EMPTY);
if(translogPruningEnabled) {
assertTrue(indexSettings.shouldPruneTranslogByRetentionLease());
assertThat(indexSettings.getTranslogRetentionSize().getBytes(), equalTo(retentionSize.getBytes()));
} else {
assertFalse(indexSettings.shouldPruneTranslogByRetentionLease());
assertThat(indexSettings.getTranslogRetentionSize().getBytes(), equalTo(-1L));
}
}
public void testTranslogPruningSettingsWithSoftDeletesDisabled() {
Settings.Builder settings = Settings.builder()
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), false)
.put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT);
boolean translogPruningEnabled = randomBoolean();
ByteSizeValue retentionSize = new ByteSizeValue(512, ByteSizeUnit.MB);
settings.put(IndexSettings.INDEX_TRANSLOG_RETENTION_LEASE_PRUNING_ENABLED_SETTING.getKey(), translogPruningEnabled);
IndexMetadata metadata = newIndexMeta("index", settings.build());
IndexSettings indexSettings = new IndexSettings(metadata, Settings.EMPTY);
assertFalse(indexSettings.shouldPruneTranslogByRetentionLease());
assertThat(indexSettings.getTranslogRetentionSize().getBytes(), equalTo(retentionSize.getBytes()));
}
} }

View File

@ -40,6 +40,8 @@ import org.opensearch.common.collect.Tuple;
import org.opensearch.common.lease.Releasable; import org.opensearch.common.lease.Releasable;
import org.opensearch.common.util.BigArrays; import org.opensearch.common.util.BigArrays;
import org.opensearch.core.internal.io.IOUtils; import org.opensearch.core.internal.io.IOUtils;
import org.opensearch.index.seqno.RetentionLease;
import org.opensearch.index.seqno.RetentionLeases;
import org.opensearch.index.shard.ShardId; import org.opensearch.index.shard.ShardId;
import org.opensearch.test.OpenSearchTestCase; import org.opensearch.test.OpenSearchTestCase;
import org.mockito.Mockito; import org.mockito.Mockito;
@ -49,7 +51,9 @@ import java.nio.channels.FileChannel;
import java.nio.file.Files; import java.nio.file.Files;
import java.nio.file.Path; import java.nio.file.Path;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.function.Supplier;
import static java.lang.Math.min; import static java.lang.Math.min;
import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.equalTo;
@ -57,6 +61,8 @@ import static org.hamcrest.Matchers.equalTo;
public class TranslogDeletionPolicyTests extends OpenSearchTestCase { public class TranslogDeletionPolicyTests extends OpenSearchTestCase {
private static long TOTAL_OPS_IN_GEN = 10L;
public void testNoRetention() throws IOException { public void testNoRetention() throws IOException {
long now = System.currentTimeMillis(); long now = System.currentTimeMillis();
Tuple<List<TranslogReader>, TranslogWriter> readersAndWriter = createReadersAndWriter(now); Tuple<List<TranslogReader>, TranslogWriter> readersAndWriter = createReadersAndWriter(now);
@ -90,6 +96,33 @@ public class TranslogDeletionPolicyTests extends OpenSearchTestCase {
} }
} }
public void testWithRetentionLease() throws IOException {
long now = System.currentTimeMillis();
Tuple<List<TranslogReader>, TranslogWriter> readersAndWriter = createReadersAndWriter(now);
List<BaseTranslogReader> allGens = new ArrayList<>(readersAndWriter.v1());
allGens.add(readersAndWriter.v2());
Supplier<RetentionLeases> retentionLeasesSupplier = createRetentionLeases(now, 0L,
readersAndWriter.v1().size() * TOTAL_OPS_IN_GEN - 1);
try {
final long minimumRetainingSequenceNumber = retentionLeasesSupplier.get()
.leases()
.stream()
.mapToLong(RetentionLease::retainingSequenceNumber)
.min()
.orElse(Long.MAX_VALUE);
final long selectedReader = (minimumRetainingSequenceNumber/TOTAL_OPS_IN_GEN);
final long selectedGen = allGens.get((int) selectedReader).generation;
assertThat(TranslogDeletionPolicy
.getMinTranslogGenByRetentionLease(readersAndWriter.v1(), readersAndWriter.v2(), retentionLeasesSupplier),
equalTo(selectedGen));
} finally {
IOUtils.close(readersAndWriter.v1());
IOUtils.close(readersAndWriter.v2());
}
}
public void testAgeRetention() throws IOException { public void testAgeRetention() throws IOException {
long now = System.currentTimeMillis(); long now = System.currentTimeMillis();
Tuple<List<TranslogReader>, TranslogWriter> readersAndWriter = createReadersAndWriter(now); Tuple<List<TranslogReader>, TranslogWriter> readersAndWriter = createReadersAndWriter(now);
@ -128,6 +161,38 @@ public class TranslogDeletionPolicyTests extends OpenSearchTestCase {
} }
} }
public void testBySizeAndRetentionLease() throws Exception {
long now = System.currentTimeMillis();
Tuple<List<TranslogReader>, TranslogWriter> readersAndWriter = createReadersAndWriter(now);
List<BaseTranslogReader> allGens = new ArrayList<>(readersAndWriter.v1());
allGens.add(readersAndWriter.v2());
try {
int selectedReader = randomIntBetween(0, allGens.size() - 1);
final long selectedGeneration = allGens.get(selectedReader).generation;
// Retaining seqno is part of lower gen
long size = allGens.stream().skip(selectedReader).map(BaseTranslogReader::sizeInBytes).reduce(Long::sum).get();
Supplier<RetentionLeases> retentionLeasesSupplier = createRetentionLeases(now, 0L,
selectedGeneration * TOTAL_OPS_IN_GEN - 1);
TranslogDeletionPolicy deletionPolicy = new MockDeletionPolicy(now, size, Integer.MAX_VALUE,
Integer.MAX_VALUE, retentionLeasesSupplier);
assertThat(deletionPolicy
.minTranslogGenRequired(readersAndWriter.v1(), readersAndWriter.v2()), equalTo(selectedGeneration));
assertThat(TranslogDeletionPolicy
.getMinTranslogGenByAge(readersAndWriter.v1(), readersAndWriter.v2(), 100L, System.currentTimeMillis()),
equalTo(readersAndWriter.v2().generation));
// Retention lease is part of higher gen
retentionLeasesSupplier = createRetentionLeases(now, selectedGeneration * TOTAL_OPS_IN_GEN,
allGens.size() * TOTAL_OPS_IN_GEN + TOTAL_OPS_IN_GEN - 1);
deletionPolicy = new MockDeletionPolicy(now, size, Long.MIN_VALUE,
Integer.MAX_VALUE, retentionLeasesSupplier);
assertThat(deletionPolicy.minTranslogGenRequired(readersAndWriter.v1(), readersAndWriter.v2()), equalTo(selectedGeneration));
} finally {
IOUtils.close(readersAndWriter.v1());
IOUtils.close(readersAndWriter.v2());
}
}
/** /**
* Tests that age trumps size but recovery trumps both. * Tests that age trumps size but recovery trumps both.
*/ */
@ -207,19 +272,32 @@ public class TranslogDeletionPolicyTests extends OpenSearchTestCase {
() -> 1L, randomNonNegativeLong(), new TragicExceptionHolder(), seqNo -> {}, BigArrays.NON_RECYCLING_INSTANCE); () -> 1L, randomNonNegativeLong(), new TragicExceptionHolder(), seqNo -> {}, BigArrays.NON_RECYCLING_INSTANCE);
writer = Mockito.spy(writer); writer = Mockito.spy(writer);
Mockito.doReturn(now - (numberOfReaders - gen + 1) * 1000).when(writer).getLastModifiedTime(); Mockito.doReturn(now - (numberOfReaders - gen + 1) * 1000).when(writer).getLastModifiedTime();
byte[] bytes = new byte[4]; byte[] bytes = new byte[4];
ByteArrayDataOutput out = new ByteArrayDataOutput(bytes); ByteArrayDataOutput out = new ByteArrayDataOutput(bytes);
for (int ops = randomIntBetween(0, 20); ops > 0; ops--) { final long startSeqNo = (gen - 1) * TOTAL_OPS_IN_GEN;
final long endSeqNo = startSeqNo + TOTAL_OPS_IN_GEN - 1;
for (long ops = endSeqNo; ops >= startSeqNo; ops--) {
out.reset(bytes); out.reset(bytes);
out.writeInt(ops); out.writeInt((int) ops);
writer.add(ReleasableBytesReference.wrap(new BytesArray(bytes)), ops); writer.add(ReleasableBytesReference.wrap(new BytesArray(bytes)), ops);
} }
} }
return new Tuple<>(readers, writer); return new Tuple<>(readers, writer);
} }
private Supplier<RetentionLeases> createRetentionLeases(final Long now, final Long lowestSeqNo,
final Long highestSeqNo) throws IOException {
LinkedList<RetentionLease> leases = new LinkedList<>();
final int numberOfLeases = randomIntBetween(1, 5);
for(int i=0 ;i<numberOfLeases; i++) {
long seqNo = randomLongBetween(lowestSeqNo, highestSeqNo);
leases.add(new RetentionLease("test_"+i, seqNo,
now - (numberOfLeases - i) * 1000, "test"));
}
return () -> new RetentionLeases(1L, 1L, leases);
}
private static class MockDeletionPolicy extends TranslogDeletionPolicy { private static class MockDeletionPolicy extends TranslogDeletionPolicy {
long now; long now;
@ -229,6 +307,12 @@ public class TranslogDeletionPolicyTests extends OpenSearchTestCase {
this.now = now; this.now = now;
} }
MockDeletionPolicy(long now, long retentionSizeInBytes, long maxRetentionAgeInMillis,
int maxRetentionTotalFiles, Supplier<RetentionLeases> retentionLeasesSupplier) {
super(retentionSizeInBytes, maxRetentionAgeInMillis, maxRetentionTotalFiles, retentionLeasesSupplier);
this.now = now;
}
@Override @Override
protected long currentTime() { protected long currentTime() {
return now; return now;