Make TranslogDeletionPolicy abstract for extension (#1456)

As part of the commit 2ebd0e04, we added a new method to the EnginePlugin to provide a 
custom TranslogDeletionPolicy. This commit makes minTranslogGenRequired method 
abstract in this class for implementation by child classes. The default implementation 
is provided by DefaultTranslogDeletionPolicy.

Signed-off-by: Rabi Panda <adnapibar@gmail.com>
This commit is contained in:
Rabi Panda 2021-11-01 08:25:30 -07:00 committed by GitHub
parent f6115ae160
commit d45f5dfcab
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 126 additions and 65 deletions

View File

@ -106,6 +106,7 @@ import org.opensearch.index.store.Store;
import org.opensearch.index.translog.Translog;
import org.opensearch.index.translog.TranslogConfig;
import org.opensearch.index.translog.TranslogCorruptedException;
import org.opensearch.index.translog.DefaultTranslogDeletionPolicy;
import org.opensearch.index.translog.TranslogDeletionPolicy;
import org.opensearch.index.translog.TranslogStats;
import org.opensearch.search.suggest.completion.CompletionStats;
@ -233,7 +234,7 @@ public class InternalEngine extends Engine {
if (customTranslogDeletionPolicy != null) {
translogDeletionPolicy = customTranslogDeletionPolicy;
} else {
translogDeletionPolicy = new TranslogDeletionPolicy(
translogDeletionPolicy = new DefaultTranslogDeletionPolicy(
engineConfig.getIndexSettings().getTranslogRetentionSize().getBytes(),
engineConfig.getIndexSettings().getTranslogRetentionAge().getMillis(),
engineConfig.getIndexSettings().getTranslogRetentionTotalFiles()

View File

@ -46,6 +46,7 @@ import org.opensearch.index.shard.DocsStats;
import org.opensearch.index.store.Store;
import org.opensearch.index.translog.Translog;
import org.opensearch.index.translog.TranslogConfig;
import org.opensearch.index.translog.DefaultTranslogDeletionPolicy;
import org.opensearch.index.translog.TranslogDeletionPolicy;
import java.io.IOException;
@ -165,7 +166,7 @@ public final class NoOpEngine extends ReadOnlyEngine {
}
final TranslogConfig translogConfig = engineConfig.getTranslogConfig();
final long localCheckpoint = Long.parseLong(commitUserData.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY));
final TranslogDeletionPolicy translogDeletionPolicy = new TranslogDeletionPolicy(-1, -1, 0);
final TranslogDeletionPolicy translogDeletionPolicy = new DefaultTranslogDeletionPolicy(-1, -1, 0);
translogDeletionPolicy.setLocalCheckpointOfSafeCommit(localCheckpoint);
try (
Translog translog = new Translog(

View File

@ -51,6 +51,7 @@ import org.opensearch.index.seqno.SequenceNumbers;
import org.opensearch.index.store.Store;
import org.opensearch.index.translog.Translog;
import org.opensearch.index.translog.TranslogConfig;
import org.opensearch.index.translog.DefaultTranslogDeletionPolicy;
import org.opensearch.index.translog.TranslogDeletionPolicy;
import org.opensearch.index.translog.TranslogStats;
import org.opensearch.search.suggest.completion.CompletionStats;
@ -238,7 +239,7 @@ public class ReadOnlyEngine extends Engine {
throw new IllegalStateException("commit doesn't contain translog unique id");
}
final TranslogConfig translogConfig = config.getTranslogConfig();
final TranslogDeletionPolicy translogDeletionPolicy = new TranslogDeletionPolicy(
final TranslogDeletionPolicy translogDeletionPolicy = new DefaultTranslogDeletionPolicy(
config.getIndexSettings().getTranslogRetentionSize().getBytes(),
config.getIndexSettings().getTranslogRetentionAge().getMillis(),
config.getIndexSettings().getTranslogRetentionTotalFiles()

View File

@ -0,0 +1,65 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.index.translog;
import java.io.IOException;
import java.util.List;
/**
* Default implementation for the {@link TranslogDeletionPolicy}. Plugins can override the default behaviour
* via the {@link org.opensearch.plugins.EnginePlugin#getCustomTranslogDeletionPolicyFactory()}.
*
* The default policy uses total number, size in bytes and maximum age for files.
*/
public class DefaultTranslogDeletionPolicy extends TranslogDeletionPolicy {
private long retentionSizeInBytes;
private long retentionAgeInMillis;
private int retentionTotalFiles;
public DefaultTranslogDeletionPolicy(long retentionSizeInBytes, long retentionAgeInMillis, int retentionTotalFiles) {
super();
this.retentionSizeInBytes = retentionSizeInBytes;
this.retentionAgeInMillis = retentionAgeInMillis;
this.retentionTotalFiles = retentionTotalFiles;
}
@Override
public synchronized long minTranslogGenRequired(List<TranslogReader> readers, TranslogWriter writer) throws IOException {
long minByLocks = getMinTranslogGenRequiredByLocks();
long minByAge = getMinTranslogGenByAge(readers, writer, retentionAgeInMillis, currentTime());
long minBySize = getMinTranslogGenBySize(readers, writer, retentionSizeInBytes);
final long minByAgeAndSize;
if (minBySize == Long.MIN_VALUE && minByAge == Long.MIN_VALUE) {
// both size and age are disabled;
minByAgeAndSize = Long.MAX_VALUE;
} else {
minByAgeAndSize = Math.max(minByAge, minBySize);
}
long minByNumFiles = getMinTranslogGenByTotalFiles(readers, writer, retentionTotalFiles);
return Math.min(Math.max(minByAgeAndSize, minByNumFiles), minByLocks);
}
@Override
public synchronized void setRetentionSizeInBytes(long bytes) {
retentionSizeInBytes = bytes;
}
@Override
public synchronized void setRetentionAgeInMillis(long ageInMillis) {
retentionAgeInMillis = ageInMillis;
}
@Override
protected synchronized void setRetentionTotalFiles(int retentionTotalFiles) {
this.retentionTotalFiles = retentionTotalFiles;
}
}

View File

@ -44,12 +44,12 @@ import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
public class TranslogDeletionPolicy {
public abstract class TranslogDeletionPolicy {
private final Map<Object, RuntimeException> openTranslogRef;
public void assertNoOpenTranslogRefs() {
if (openTranslogRef.isEmpty() == false) {
if (openTranslogRef != null && openTranslogRef.isEmpty() == false) {
AssertionError e = new AssertionError("not all translog generations have been released");
openTranslogRef.values().forEach(e::addSuppressed);
throw e;
@ -63,22 +63,7 @@ public class TranslogDeletionPolicy {
private final Map<Long, Counter> translogRefCounts = new HashMap<>();
private long localCheckpointOfSafeCommit = SequenceNumbers.NO_OPS_PERFORMED;
private long retentionSizeInBytes;
private long retentionAgeInMillis;
private int retentionTotalFiles;
/**
* @deprecated EXPERT: this variable is specific to CCR and will be moved to a plugin in the next release
*/
@Deprecated
private boolean shouldPruneTranslogByRetentionLease;
public TranslogDeletionPolicy(long retentionSizeInBytes, long retentionAgeInMillis, int retentionTotalFiles) {
this.retentionSizeInBytes = retentionSizeInBytes;
this.retentionAgeInMillis = retentionAgeInMillis;
this.retentionTotalFiles = retentionTotalFiles;
public TranslogDeletionPolicy() {
if (Assertions.ENABLED) {
openTranslogRef = new ConcurrentHashMap<>();
} else {
@ -100,17 +85,11 @@ public class TranslogDeletionPolicy {
this.localCheckpointOfSafeCommit = newCheckpoint;
}
public synchronized void setRetentionSizeInBytes(long bytes) {
retentionSizeInBytes = bytes;
}
public abstract void setRetentionSizeInBytes(long bytes);
public synchronized void setRetentionAgeInMillis(long ageInMillis) {
retentionAgeInMillis = ageInMillis;
}
public abstract void setRetentionAgeInMillis(long ageInMillis);
synchronized void setRetentionTotalFiles(int retentionTotalFiles) {
this.retentionTotalFiles = retentionTotalFiles;
}
protected abstract void setRetentionTotalFiles(int retentionTotalFiles);
/**
* acquires the basis generation for a new snapshot. Any translog generation above, and including, the returned generation
@ -165,22 +144,9 @@ public class TranslogDeletionPolicy {
* @param readers current translog readers
* @param writer current translog writer
*/
synchronized long minTranslogGenRequired(List<TranslogReader> readers, TranslogWriter writer) throws IOException {
long minByLocks = getMinTranslogGenRequiredByLocks();
long minByAge = getMinTranslogGenByAge(readers, writer, retentionAgeInMillis, currentTime());
long minBySize = getMinTranslogGenBySize(readers, writer, retentionSizeInBytes);
final long minByAgeAndSize;
if (minBySize == Long.MIN_VALUE && minByAge == Long.MIN_VALUE) {
// both size and age are disabled;
minByAgeAndSize = Long.MAX_VALUE;
} else {
minByAgeAndSize = Math.max(minByAge, minBySize);
}
long minByNumFiles = getMinTranslogGenByTotalFiles(readers, writer, retentionTotalFiles);
return Math.min(Math.max(minByAgeAndSize, minByNumFiles), minByLocks);
}
public abstract long minTranslogGenRequired(List<TranslogReader> readers, TranslogWriter writer) throws IOException;
static long getMinTranslogGenBySize(List<TranslogReader> readers, TranslogWriter writer, long retentionSizeInBytes) {
public static long getMinTranslogGenBySize(List<TranslogReader> readers, TranslogWriter writer, long retentionSizeInBytes) {
if (retentionSizeInBytes >= 0) {
long totalSize = writer.sizeInBytes();
long minGen = writer.getGeneration();
@ -195,7 +161,7 @@ public class TranslogDeletionPolicy {
}
}
static long getMinTranslogGenByAge(List<TranslogReader> readers, TranslogWriter writer, long maxRetentionAgeInMillis, long now)
public static long getMinTranslogGenByAge(List<TranslogReader> readers, TranslogWriter writer, long maxRetentionAgeInMillis, long now)
throws IOException {
if (maxRetentionAgeInMillis >= 0) {
for (TranslogReader reader : readers) {
@ -209,7 +175,7 @@ public class TranslogDeletionPolicy {
}
}
static long getMinTranslogGenByTotalFiles(List<TranslogReader> readers, TranslogWriter writer, final int maxTotalFiles) {
public static long getMinTranslogGenByTotalFiles(List<TranslogReader> readers, TranslogWriter writer, final int maxTotalFiles) {
long minGen = writer.generation;
int totalFiles = 1; // for the current writer
for (int i = readers.size() - 1; i >= 0 && totalFiles < maxTotalFiles; i--) {
@ -223,7 +189,7 @@ public class TranslogDeletionPolicy {
return System.currentTimeMillis();
}
private long getMinTranslogGenRequiredByLocks() {
protected long getMinTranslogGenRequiredByLocks() {
return translogRefCounts.keySet().stream().reduce(Math::min).orElse(Long.MAX_VALUE);
}

View File

@ -172,4 +172,12 @@ public class TranslogReader extends BaseTranslogReader implements Closeable {
throw new AlreadyClosedException(toString() + " is already closed");
}
}
public long getMinSeqNo() {
return checkpoint.minSeqNo;
}
public long getMaxSeqNo() {
return checkpoint.maxSeqNo;
}
}

View File

@ -193,13 +193,13 @@ public class TruncateTranslogAction {
);
long primaryTerm = indexSettings.getIndexMetadata().primaryTerm(shardPath.getShardId().id());
// We open translog to check for corruption, do not clean anything.
final TranslogDeletionPolicy retainAllTranslogPolicy = new TranslogDeletionPolicy(
final TranslogDeletionPolicy retainAllTranslogPolicy = new DefaultTranslogDeletionPolicy(
Long.MAX_VALUE,
Long.MAX_VALUE,
Integer.MAX_VALUE
) {
@Override
long minTranslogGenRequired(List<TranslogReader> readers, TranslogWriter writer) {
public long minTranslogGenRequired(List<TranslogReader> readers, TranslogWriter writer) {
long minGen = writer.generation;
for (TranslogReader reader : readers) {
minGen = Math.min(reader.generation, minGen);

View File

@ -17,11 +17,14 @@ import org.opensearch.index.codec.CodecService;
import org.opensearch.index.seqno.RetentionLeases;
import org.opensearch.index.translog.TranslogDeletionPolicy;
import org.opensearch.index.translog.TranslogDeletionPolicyFactory;
import org.opensearch.index.translog.TranslogReader;
import org.opensearch.index.translog.TranslogWriter;
import org.opensearch.plugins.EnginePlugin;
import org.opensearch.plugins.Plugin;
import org.opensearch.test.IndexSettingsModule;
import org.opensearch.test.OpenSearchTestCase;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
@ -136,11 +139,27 @@ public class EngineConfigFactoryTests extends OpenSearchTestCase {
private static class CustomTranslogDeletionPolicy extends TranslogDeletionPolicy {
public CustomTranslogDeletionPolicy(IndexSettings indexSettings, Supplier<RetentionLeases> retentionLeasesSupplier) {
super(
indexSettings.getTranslogRetentionSize().getBytes(),
indexSettings.getTranslogRetentionAge().getMillis(),
indexSettings.getTranslogRetentionTotalFiles()
);
super();
}
@Override
public void setRetentionSizeInBytes(long bytes) {
}
@Override
public void setRetentionAgeInMillis(long ageInMillis) {
}
@Override
protected void setRetentionTotalFiles(int retentionTotalFiles) {
}
@Override
public long minTranslogGenRequired(List<TranslogReader> readers, TranslogWriter writer) throws IOException {
return 0;
}
}
}

View File

@ -144,11 +144,11 @@ import org.opensearch.index.seqno.SequenceNumbers;
import org.opensearch.index.shard.ShardId;
import org.opensearch.index.shard.ShardUtils;
import org.opensearch.index.store.Store;
import org.opensearch.index.translog.DefaultTranslogDeletionPolicy;
import org.opensearch.index.translog.SnapshotMatchers;
import org.opensearch.index.translog.TestTranslog;
import org.opensearch.index.translog.Translog;
import org.opensearch.index.translog.TranslogConfig;
import org.opensearch.index.translog.TranslogDeletionPolicy;
import org.opensearch.index.translog.TranslogDeletionPolicyFactory;
import org.opensearch.indices.breaker.NoneCircuitBreakerService;
import org.opensearch.test.IndexSettingsModule;
@ -3796,7 +3796,7 @@ public class InternalEngineTests extends EngineTestCase {
}
public void testEngineCreationWithCustomTranslogDeletePolicy() throws IOException {
class CustomTranslogDeletionPolicy extends TranslogDeletionPolicy {
class CustomTranslogDeletionPolicy extends DefaultTranslogDeletionPolicy {
public CustomTranslogDeletionPolicy(IndexSettings indexSettings, Supplier<RetentionLeases> retentionLeasesSupplier) {
super(
indexSettings.getTranslogRetentionSize().getBytes(),

View File

@ -160,7 +160,7 @@ public class TranslogDeletionPolicyTests extends OpenSearchTestCase {
List<BaseTranslogReader> allGens = new ArrayList<>(readersAndWriter.v1());
allGens.add(readersAndWriter.v2());
try {
TranslogDeletionPolicy deletionPolicy = new MockDeletionPolicy(now, Long.MAX_VALUE, Long.MAX_VALUE, Integer.MAX_VALUE);
DefaultTranslogDeletionPolicy deletionPolicy = new MockDeletionPolicy(now, Long.MAX_VALUE, Long.MAX_VALUE, Integer.MAX_VALUE);
int selectedReader = randomIntBetween(0, allGens.size() - 1);
final long selectedGenerationByAge = allGens.get(selectedReader).generation;
long maxAge = now - allGens.get(selectedReader).getLastModifiedTime();
@ -275,7 +275,7 @@ public class TranslogDeletionPolicyTests extends OpenSearchTestCase {
return new Tuple<>(readers, writer);
}
private static class MockDeletionPolicy extends TranslogDeletionPolicy {
private static class MockDeletionPolicy extends DefaultTranslogDeletionPolicy {
long now;

View File

@ -1538,7 +1538,7 @@ public class TranslogTests extends OpenSearchTestCase {
Translog translog = new Translog(
config,
translogUUID,
new TranslogDeletionPolicy(-1, -1, 0),
new DefaultTranslogDeletionPolicy(-1, -1, 0),
() -> SequenceNumbers.NO_OPS_PERFORMED,
primaryTerm::get,
persistedSeqNos::add
@ -1653,7 +1653,7 @@ public class TranslogTests extends OpenSearchTestCase {
Translog translog = new Translog(
config,
translogUUID,
new TranslogDeletionPolicy(-1, -1, 0),
new DefaultTranslogDeletionPolicy(-1, -1, 0),
() -> SequenceNumbers.NO_OPS_PERFORMED,
primaryTerm::get,
persistedSeqNos::add
@ -2809,7 +2809,7 @@ public class TranslogTests extends OpenSearchTestCase {
// engine blows up, after committing the above generation
translog.close();
TranslogConfig config = translog.getConfig();
final TranslogDeletionPolicy deletionPolicy = new TranslogDeletionPolicy(-1, -1, 0);
final TranslogDeletionPolicy deletionPolicy = new DefaultTranslogDeletionPolicy(-1, -1, 0);
deletionPolicy.setLocalCheckpointOfSafeCommit(localCheckpoint);
translog = new Translog(
config,
@ -2889,7 +2889,7 @@ public class TranslogTests extends OpenSearchTestCase {
// expected...
}
}
final TranslogDeletionPolicy deletionPolicy = new TranslogDeletionPolicy(-1, -1, 0);
final TranslogDeletionPolicy deletionPolicy = new DefaultTranslogDeletionPolicy(-1, -1, 0);
deletionPolicy.setLocalCheckpointOfSafeCommit(localCheckpoint);
try (
Translog translog = new Translog(

View File

@ -38,7 +38,7 @@ import org.opensearch.index.IndexSettings;
public class TranslogDeletionPolicies {
public static TranslogDeletionPolicy createTranslogDeletionPolicy() {
return new TranslogDeletionPolicy(
return new DefaultTranslogDeletionPolicy(
IndexSettings.INDEX_TRANSLOG_RETENTION_SIZE_SETTING.getDefault(Settings.EMPTY).getBytes(),
IndexSettings.INDEX_TRANSLOG_RETENTION_AGE_SETTING.getDefault(Settings.EMPTY).getMillis(),
IndexSettings.INDEX_TRANSLOG_RETENTION_TOTAL_FILES_SETTING.getDefault(Settings.EMPTY)
@ -46,7 +46,7 @@ public class TranslogDeletionPolicies {
}
public static TranslogDeletionPolicy createTranslogDeletionPolicy(IndexSettings indexSettings) {
return new TranslogDeletionPolicy(
return new DefaultTranslogDeletionPolicy(
indexSettings.getTranslogRetentionSize().getBytes(),
indexSettings.getTranslogRetentionAge().getMillis(),
indexSettings.getTranslogRetentionTotalFiles()