Add extension point for custom TranslogDeletionPolicy in EnginePlugin. (#1404)

This commit adds a method that can be used to provide a custom TranslogDeletionPolicy
from within plugins that implement the EnginePlugin interface. This enables plugins to
provide a custom deletion policy with the current limitation that only one plugin can
override the policy. An exception will be thrown if more than one plugin overrides the
policy.

Signed-off-by: Nicholas Walter Knize <nknize@apache.org>
This commit is contained in:
Rabi Panda 2021-10-22 10:06:51 -07:00 committed by Nicholas Walter Knize
parent 8f566123c1
commit c86d765e7c
No known key found for this signature in database
GPG Key ID: 51F0CC22F625308A
7 changed files with 328 additions and 13 deletions

View File

@ -52,6 +52,7 @@ import org.opensearch.index.seqno.RetentionLeases;
import org.opensearch.index.shard.ShardId;
import org.opensearch.index.store.Store;
import org.opensearch.index.translog.TranslogConfig;
import org.opensearch.index.translog.TranslogDeletionPolicyFactory;
import org.opensearch.indices.IndexingMemoryController;
import org.opensearch.indices.breaker.CircuitBreakerService;
import org.opensearch.threadpool.ThreadPool;
@ -70,6 +71,7 @@ public final class EngineConfig {
private final ShardId shardId;
private final IndexSettings indexSettings;
private final ByteSizeValue indexingBufferSize;
private final TranslogDeletionPolicyFactory translogDeletionPolicyFactory;
private volatile boolean enableGcDeletes = true;
private final TimeValue flushMergesAfter;
private final String codecName;
@ -145,9 +147,6 @@ public final class EngineConfig {
private final TranslogConfig translogConfig;
/**
* Creates a new {@link org.opensearch.index.engine.EngineConfig}
*/
public EngineConfig(
ShardId shardId,
ThreadPool threadPool,
@ -171,6 +170,61 @@ public final class EngineConfig {
Supplier<RetentionLeases> retentionLeasesSupplier,
LongSupplier primaryTermSupplier,
TombstoneDocSupplier tombstoneDocSupplier
) {
this(
shardId,
threadPool,
indexSettings,
warmer,
store,
mergePolicy,
analyzer,
similarity,
codecService,
eventListener,
queryCache,
queryCachingPolicy,
translogConfig,
null,
flushMergesAfter,
externalRefreshListener,
internalRefreshListener,
indexSort,
circuitBreakerService,
globalCheckpointSupplier,
retentionLeasesSupplier,
primaryTermSupplier,
tombstoneDocSupplier
);
}
/**
* Creates a new {@link org.opensearch.index.engine.EngineConfig}
*/
EngineConfig(
ShardId shardId,
ThreadPool threadPool,
IndexSettings indexSettings,
Engine.Warmer warmer,
Store store,
MergePolicy mergePolicy,
Analyzer analyzer,
Similarity similarity,
CodecService codecService,
Engine.EventListener eventListener,
QueryCache queryCache,
QueryCachingPolicy queryCachingPolicy,
TranslogConfig translogConfig,
TranslogDeletionPolicyFactory translogDeletionPolicyFactory,
TimeValue flushMergesAfter,
List<ReferenceManager.RefreshListener> externalRefreshListener,
List<ReferenceManager.RefreshListener> internalRefreshListener,
Sort indexSort,
CircuitBreakerService circuitBreakerService,
LongSupplier globalCheckpointSupplier,
Supplier<RetentionLeases> retentionLeasesSupplier,
LongSupplier primaryTermSupplier,
TombstoneDocSupplier tombstoneDocSupplier
) {
this.shardId = shardId;
this.indexSettings = indexSettings;
@ -200,6 +254,7 @@ public final class EngineConfig {
this.queryCache = queryCache;
this.queryCachingPolicy = queryCachingPolicy;
this.translogConfig = translogConfig;
this.translogDeletionPolicyFactory = translogDeletionPolicyFactory;
this.flushMergesAfter = flushMergesAfter;
this.externalRefreshListener = externalRefreshListener;
this.internalRefreshListener = internalRefreshListener;
@ -423,4 +478,8 @@ public final class EngineConfig {
public TombstoneDocSupplier getTombstoneDocSupplier() {
return tombstoneDocSupplier;
}
public TranslogDeletionPolicyFactory getCustomTranslogDeletionPolicyFactory() {
return translogDeletionPolicyFactory;
}
}

View File

@ -22,6 +22,7 @@ import org.opensearch.index.seqno.RetentionLeases;
import org.opensearch.index.shard.ShardId;
import org.opensearch.index.store.Store;
import org.opensearch.index.translog.TranslogConfig;
import org.opensearch.index.translog.TranslogDeletionPolicyFactory;
import org.opensearch.indices.breaker.CircuitBreakerService;
import org.opensearch.plugins.EnginePlugin;
import org.opensearch.plugins.PluginsService;
@ -38,7 +39,8 @@ import java.util.function.Supplier;
* A factory to create an EngineConfig based on custom plugin overrides
*/
public class EngineConfigFactory {
private final Optional<CodecService> codecService;
private final CodecService codecService;
private final TranslogDeletionPolicyFactory translogDeletionPolicyFactory;
/** default ctor primarily used for tests without plugins */
public EngineConfigFactory(IndexSettings idxSettings) {
@ -56,6 +58,8 @@ public class EngineConfigFactory {
EngineConfigFactory(Collection<EnginePlugin> enginePlugins, IndexSettings idxSettings) {
Optional<CodecService> codecService = Optional.empty();
String codecServiceOverridingPlugin = null;
Optional<TranslogDeletionPolicyFactory> translogDeletionPolicyFactory = Optional.empty();
String translogDeletionPolicyOverridingPlugin = null;
for (EnginePlugin enginePlugin : enginePlugins) {
// get overriding codec service from EnginePlugin
if (codecService.isPresent() == false) {
@ -69,11 +73,23 @@ public class EngineConfigFactory {
+ enginePlugin.getClass().getName()
);
}
if (translogDeletionPolicyFactory.isPresent() == false) {
translogDeletionPolicyFactory = enginePlugin.getCustomTranslogDeletionPolicyFactory();
translogDeletionPolicyOverridingPlugin = enginePlugin.getClass().getName();
} else {
throw new IllegalStateException(
"existing TranslogDeletionPolicyFactory is already overridden in: "
+ translogDeletionPolicyOverridingPlugin
+ " attempting to override again by: "
+ enginePlugin.getClass().getName()
);
}
this.codecService = codecService;
}
this.codecService = codecService.orElse(null);
this.translogDeletionPolicyFactory = translogDeletionPolicyFactory.orElse((idxs, rtls) -> null);
}
/** Insantiates a new EngineConfig from the provided custom overrides */
/** Instantiates a new EngineConfig from the provided custom overrides */
public EngineConfig newEngineConfig(
ShardId shardId,
ThreadPool threadPool,
@ -108,11 +124,12 @@ public class EngineConfigFactory {
mergePolicy,
analyzer,
similarity,
this.codecService.isPresent() == true ? this.codecService.get() : codecService,
this.codecService != null ? this.codecService : codecService,
eventListener,
queryCache,
queryCachingPolicy,
translogConfig,
translogDeletionPolicyFactory,
flushMergesAfter,
externalRefreshListener,
internalRefreshListener,

View File

@ -224,12 +224,21 @@ public class InternalEngine extends Engine {
if (engineConfig.isAutoGeneratedIDsOptimizationEnabled() == false) {
updateAutoIdTimestamp(Long.MAX_VALUE, true);
}
final TranslogDeletionPolicy translogDeletionPolicy = new TranslogDeletionPolicy(
final TranslogDeletionPolicy translogDeletionPolicy;
TranslogDeletionPolicy customTranslogDeletionPolicy = null;
if (engineConfig.getCustomTranslogDeletionPolicyFactory() != null) {
customTranslogDeletionPolicy = engineConfig.getCustomTranslogDeletionPolicyFactory()
.create(engineConfig.getIndexSettings(), engineConfig.retentionLeasesSupplier());
}
if (customTranslogDeletionPolicy != null) {
translogDeletionPolicy = customTranslogDeletionPolicy;
} else {
translogDeletionPolicy = new TranslogDeletionPolicy(
engineConfig.getIndexSettings().getTranslogRetentionSize().getBytes(),
engineConfig.getIndexSettings().getTranslogRetentionAge().getMillis(),
engineConfig.getIndexSettings().getTranslogRetentionTotalFiles(),
engineConfig.retentionLeasesSupplier()
engineConfig.getIndexSettings().getTranslogRetentionTotalFiles()
);
}
store.incRef();
IndexWriter writer = null;
Translog translog = null;

View File

@ -0,0 +1,19 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.index.translog;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.seqno.RetentionLeases;
import java.util.function.Supplier;
@FunctionalInterface
public interface TranslogDeletionPolicyFactory {
TranslogDeletionPolicy create(IndexSettings settings, Supplier<RetentionLeases> supplier);
}

View File

@ -35,8 +35,12 @@ package org.opensearch.plugins;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.codec.CodecService;
import org.opensearch.index.engine.EngineFactory;
import org.opensearch.index.seqno.RetentionLeases;
import org.opensearch.index.translog.TranslogDeletionPolicy;
import org.opensearch.index.translog.TranslogDeletionPolicyFactory;
import java.util.Optional;
import java.util.function.Supplier;
/**
* A plugin that provides alternative engine implementations.
@ -63,4 +67,17 @@ public interface EnginePlugin {
default Optional<CodecService> getCustomCodecService(IndexSettings indexSettings) {
return Optional.empty();
}
/**
* When an index is created this method is invoked for each engine plugin. Engine plugins that need to provide a
* custom {@link TranslogDeletionPolicy} can override this method to return a function that takes the {@link IndexSettings}
* and a {@link Supplier} for {@link RetentionLeases} and returns a custom {@link TranslogDeletionPolicy}.
*
* Only one of the installed Engine plugins can override this otherwise {@link IllegalStateException} will be thrown.
*
* @return a function that returns an instance of {@link TranslogDeletionPolicy}
*/
default Optional<TranslogDeletionPolicyFactory> getCustomTranslogDeletionPolicyFactory() {
return Optional.empty();
}
}

View File

@ -0,0 +1,146 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.index.engine;
import org.apache.logging.log4j.LogManager;
import org.opensearch.Version;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.codec.CodecService;
import org.opensearch.index.seqno.RetentionLeases;
import org.opensearch.index.translog.TranslogDeletionPolicy;
import org.opensearch.index.translog.TranslogDeletionPolicyFactory;
import org.opensearch.plugins.EnginePlugin;
import org.opensearch.plugins.Plugin;
import org.opensearch.test.IndexSettingsModule;
import org.opensearch.test.OpenSearchTestCase;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.function.Supplier;
public class EngineConfigFactoryTests extends OpenSearchTestCase {
public void testCreateEngineConfigFromFactory() {
IndexMetadata meta = IndexMetadata.builder("test")
.settings(settings(Version.CURRENT))
.numberOfShards(1)
.numberOfReplicas(1)
.build();
List<EnginePlugin> plugins = Collections.singletonList(new FooEnginePlugin());
IndexSettings indexSettings = IndexSettingsModule.newIndexSettings("test", meta.getSettings());
EngineConfigFactory factory = new EngineConfigFactory(plugins, indexSettings);
EngineConfig config = factory.newEngineConfig(
null,
null,
indexSettings,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
TimeValue.timeValueMinutes(5),
null,
null,
null,
null,
null,
() -> new RetentionLeases(0, 0, Collections.emptyList()),
null,
null
);
assertNotNull(config.getCodec());
assertNotNull(config.getCustomTranslogDeletionPolicyFactory());
assertTrue(config.getCustomTranslogDeletionPolicyFactory().create(indexSettings, null) instanceof CustomTranslogDeletionPolicy);
}
public void testCreateEngineConfigFromFactoryMultipleCodecServiceIllegalStateException() {
IndexMetadata meta = IndexMetadata.builder("test")
.settings(settings(Version.CURRENT))
.numberOfShards(1)
.numberOfReplicas(1)
.build();
List<EnginePlugin> plugins = Arrays.asList(new FooEnginePlugin(), new BarEnginePlugin());
IndexSettings indexSettings = IndexSettingsModule.newIndexSettings("test", meta.getSettings());
expectThrows(IllegalStateException.class, () -> new EngineConfigFactory(plugins, indexSettings));
}
public void testCreateEngineConfigFromFactoryMultipleCustomTranslogDeletionPolicyFactoryIllegalStateException() {
IndexMetadata meta = IndexMetadata.builder("test")
.settings(settings(Version.CURRENT))
.numberOfShards(1)
.numberOfReplicas(1)
.build();
List<EnginePlugin> plugins = Arrays.asList(new FooEnginePlugin(), new BazEnginePlugin());
IndexSettings indexSettings = IndexSettingsModule.newIndexSettings("test", meta.getSettings());
expectThrows(IllegalStateException.class, () -> new EngineConfigFactory(plugins, indexSettings));
}
private static class FooEnginePlugin extends Plugin implements EnginePlugin {
@Override
public Optional<EngineFactory> getEngineFactory(final IndexSettings indexSettings) {
return Optional.empty();
}
@Override
public Optional<CodecService> getCustomCodecService(IndexSettings indexSettings) {
return Optional.of(new CodecService(null, LogManager.getLogger(getClass())));
}
@Override
public Optional<TranslogDeletionPolicyFactory> getCustomTranslogDeletionPolicyFactory() {
return Optional.of(CustomTranslogDeletionPolicy::new);
}
}
private static class BarEnginePlugin extends Plugin implements EnginePlugin {
@Override
public Optional<EngineFactory> getEngineFactory(final IndexSettings indexSettings) {
return Optional.empty();
}
@Override
public Optional<CodecService> getCustomCodecService(IndexSettings indexSettings) {
return Optional.of(new CodecService(null, LogManager.getLogger(getClass())));
}
}
private static class BazEnginePlugin extends Plugin implements EnginePlugin {
@Override
public Optional<EngineFactory> getEngineFactory(final IndexSettings indexSettings) {
return Optional.empty();
}
@Override
public Optional<TranslogDeletionPolicyFactory> getCustomTranslogDeletionPolicyFactory() {
return Optional.of(CustomTranslogDeletionPolicy::new);
}
}
private static class CustomTranslogDeletionPolicy extends TranslogDeletionPolicy {
public CustomTranslogDeletionPolicy(IndexSettings indexSettings, Supplier<RetentionLeases> retentionLeasesSupplier) {
super(
indexSettings.getTranslogRetentionSize().getBytes(),
indexSettings.getTranslogRetentionAge().getMillis(),
indexSettings.getTranslogRetentionTotalFiles()
);
}
}
}

View File

@ -148,6 +148,8 @@ import org.opensearch.index.translog.SnapshotMatchers;
import org.opensearch.index.translog.TestTranslog;
import org.opensearch.index.translog.Translog;
import org.opensearch.index.translog.TranslogConfig;
import org.opensearch.index.translog.TranslogDeletionPolicy;
import org.opensearch.index.translog.TranslogDeletionPolicyFactory;
import org.opensearch.indices.breaker.NoneCircuitBreakerService;
import org.opensearch.test.IndexSettingsModule;
import org.opensearch.test.VersionUtils;
@ -3793,6 +3795,52 @@ public class InternalEngineTests extends EngineTestCase {
assertVisibleCount(engine, numDocs, true);
}
public void testEngineCreationWithCustomTranslogDeletePolicy() throws IOException {
class CustomTranslogDeletionPolicy extends TranslogDeletionPolicy {
public CustomTranslogDeletionPolicy(IndexSettings indexSettings, Supplier<RetentionLeases> retentionLeasesSupplier) {
super(
indexSettings.getTranslogRetentionSize().getBytes(),
indexSettings.getTranslogRetentionAge().getMillis(),
indexSettings.getTranslogRetentionTotalFiles()
);
}
}
TranslogDeletionPolicyFactory translogDeletionPolicyFactory = CustomTranslogDeletionPolicy::new;
try (Store store = createStore()) {
EngineConfig config = engine.config();
EngineConfig configWithCustomTranslogDeletionPolicyFactory = new EngineConfig(
config.getShardId(),
config.getThreadPool(),
config.getIndexSettings(),
config.getWarmer(),
store,
config.getMergePolicy(),
config.getAnalyzer(),
config.getSimilarity(),
new CodecService(null, logger),
config.getEventListener(),
config.getQueryCache(),
config.getQueryCachingPolicy(),
config.getTranslogConfig(),
translogDeletionPolicyFactory,
config.getFlushMergesAfter(),
config.getExternalRefreshListener(),
config.getInternalRefreshListener(),
config.getIndexSort(),
config.getCircuitBreakerService(),
config.getGlobalCheckpointSupplier(),
config.retentionLeasesSupplier(),
config.getPrimaryTermSupplier(),
config.getTombstoneDocSupplier()
);
try (InternalEngine engine = createEngine(configWithCustomTranslogDeletionPolicyFactory)) {
assertTrue(engine.getTranslog().getDeletionPolicy() instanceof CustomTranslogDeletionPolicy);
}
}
}
public void testShardNotAvailableExceptionWhenEngineClosedConcurrently() throws IOException, InterruptedException {
AtomicReference<Exception> exception = new AtomicReference<>();
String operation = randomFrom("optimize", "refresh", "flush");