MapperService has to be passed in as null for EnginePlugins CodecService constructor (#2177)
* MapperService has to be passed in as null for EnginePlugins CodecService constructor Signed-off-by: Andriy Redko <andriy.redko@aiven.io> * Addressing code review comments Signed-off-by: Andriy Redko <andriy.redko@aiven.io> * Delayed CodecService instantiation up to the shard initialization Signed-off-by: Andriy Redko <andriy.redko@aiven.io> * Added logger (associated with shard) to CodecServiceConfig Signed-off-by: Andriy Redko <andriy.redko@aiven.io> * Refactored the EngineConfigFactory / IndexShard instantiation of the CodecService Signed-off-by: Andriy Redko <andriy.redko@aiven.io>
This commit is contained in:
parent
a6a47e7321
commit
9c679cbbfc
|
@ -0,0 +1,45 @@
|
|||
/*
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*
|
||||
* The OpenSearch Contributors require contributions made to
|
||||
* this file be licensed under the Apache-2.0 license or a
|
||||
* compatible open source license.
|
||||
*/
|
||||
|
||||
package org.opensearch.index.codec;
|
||||
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.opensearch.common.Nullable;
|
||||
import org.opensearch.index.IndexSettings;
|
||||
import org.opensearch.index.mapper.MapperService;
|
||||
|
||||
import java.util.Objects;
|
||||
|
||||
/**
|
||||
* The configuration parameters necessary for the {@link CodecService} instance construction.
|
||||
*/
|
||||
public final class CodecServiceConfig {
|
||||
private final IndexSettings indexSettings;
|
||||
private final MapperService mapperService;
|
||||
private final Logger logger;
|
||||
|
||||
public CodecServiceConfig(IndexSettings indexSettings, @Nullable MapperService mapperService, @Nullable Logger logger) {
|
||||
this.indexSettings = Objects.requireNonNull(indexSettings);
|
||||
this.mapperService = mapperService;
|
||||
this.logger = logger;
|
||||
}
|
||||
|
||||
public IndexSettings getIndexSettings() {
|
||||
return indexSettings;
|
||||
}
|
||||
|
||||
@Nullable
|
||||
public MapperService getMapperService() {
|
||||
return mapperService;
|
||||
}
|
||||
|
||||
@Nullable
|
||||
public Logger getLogger() {
|
||||
return logger;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,22 @@
|
|||
/*
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*
|
||||
* The OpenSearch Contributors require contributions made to
|
||||
* this file be licensed under the Apache-2.0 license or a
|
||||
* compatible open source license.
|
||||
*/
|
||||
|
||||
package org.opensearch.index.codec;
|
||||
|
||||
/**
|
||||
* A factory for creating new {@link CodecService} instance
|
||||
*/
|
||||
@FunctionalInterface
|
||||
public interface CodecServiceFactory {
|
||||
/**
|
||||
* Create new {@link CodecService} instance
|
||||
* @param config code service configuration
|
||||
* @return new {@link CodecService} instance
|
||||
*/
|
||||
CodecService createCodecService(CodecServiceConfig config);
|
||||
}
|
|
@ -8,6 +8,7 @@
|
|||
|
||||
package org.opensearch.index.engine;
|
||||
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.apache.lucene.analysis.Analyzer;
|
||||
import org.apache.lucene.index.MergePolicy;
|
||||
import org.apache.lucene.search.QueryCache;
|
||||
|
@ -15,9 +16,13 @@ import org.apache.lucene.search.QueryCachingPolicy;
|
|||
import org.apache.lucene.search.ReferenceManager;
|
||||
import org.apache.lucene.search.Sort;
|
||||
import org.apache.lucene.search.similarities.Similarity;
|
||||
import org.opensearch.common.Nullable;
|
||||
import org.opensearch.common.unit.TimeValue;
|
||||
import org.opensearch.index.IndexSettings;
|
||||
import org.opensearch.index.codec.CodecService;
|
||||
import org.opensearch.index.codec.CodecServiceConfig;
|
||||
import org.opensearch.index.codec.CodecServiceFactory;
|
||||
import org.opensearch.index.mapper.MapperService;
|
||||
import org.opensearch.index.seqno.RetentionLeases;
|
||||
import org.opensearch.index.shard.ShardId;
|
||||
import org.opensearch.index.store.Store;
|
||||
|
@ -39,7 +44,7 @@ import java.util.function.Supplier;
|
|||
* A factory to create an EngineConfig based on custom plugin overrides
|
||||
*/
|
||||
public class EngineConfigFactory {
|
||||
private final CodecService codecService;
|
||||
private final CodecServiceFactory codecServiceFactory;
|
||||
private final TranslogDeletionPolicyFactory translogDeletionPolicyFactory;
|
||||
|
||||
/** default ctor primarily used for tests without plugins */
|
||||
|
@ -58,6 +63,8 @@ public class EngineConfigFactory {
|
|||
EngineConfigFactory(Collection<EnginePlugin> enginePlugins, IndexSettings idxSettings) {
|
||||
Optional<CodecService> codecService = Optional.empty();
|
||||
String codecServiceOverridingPlugin = null;
|
||||
Optional<CodecServiceFactory> codecServiceFactory = Optional.empty();
|
||||
String codecServiceFactoryOverridingPlugin = null;
|
||||
Optional<TranslogDeletionPolicyFactory> translogDeletionPolicyFactory = Optional.empty();
|
||||
String translogDeletionPolicyOverridingPlugin = null;
|
||||
for (EnginePlugin enginePlugin : enginePlugins) {
|
||||
|
@ -65,7 +72,7 @@ public class EngineConfigFactory {
|
|||
if (codecService.isPresent() == false) {
|
||||
codecService = enginePlugin.getCustomCodecService(idxSettings);
|
||||
codecServiceOverridingPlugin = enginePlugin.getClass().getName();
|
||||
} else {
|
||||
} else if (enginePlugin.getCustomCodecService(idxSettings).isPresent()) {
|
||||
throw new IllegalStateException(
|
||||
"existing codec service already overridden in: "
|
||||
+ codecServiceOverridingPlugin
|
||||
|
@ -76,7 +83,7 @@ public class EngineConfigFactory {
|
|||
if (translogDeletionPolicyFactory.isPresent() == false) {
|
||||
translogDeletionPolicyFactory = enginePlugin.getCustomTranslogDeletionPolicyFactory();
|
||||
translogDeletionPolicyOverridingPlugin = enginePlugin.getClass().getName();
|
||||
} else {
|
||||
} else if (enginePlugin.getCustomTranslogDeletionPolicyFactory().isPresent()) {
|
||||
throw new IllegalStateException(
|
||||
"existing TranslogDeletionPolicyFactory is already overridden in: "
|
||||
+ translogDeletionPolicyOverridingPlugin
|
||||
|
@ -84,12 +91,37 @@ public class EngineConfigFactory {
|
|||
+ enginePlugin.getClass().getName()
|
||||
);
|
||||
}
|
||||
// get overriding CodecServiceFactory from EnginePlugin
|
||||
if (codecServiceFactory.isPresent() == false) {
|
||||
codecServiceFactory = enginePlugin.getCustomCodecServiceFactory(idxSettings);
|
||||
codecServiceFactoryOverridingPlugin = enginePlugin.getClass().getName();
|
||||
} else if (enginePlugin.getCustomCodecServiceFactory(idxSettings).isPresent()) {
|
||||
throw new IllegalStateException(
|
||||
"existing codec service factory already overridden in: "
|
||||
+ codecServiceFactoryOverridingPlugin
|
||||
+ " attempting to override again by: "
|
||||
+ enginePlugin.getClass().getName()
|
||||
);
|
||||
}
|
||||
}
|
||||
this.codecService = codecService.orElse(null);
|
||||
|
||||
if (codecService.isPresent() && codecServiceFactory.isPresent()) {
|
||||
throw new IllegalStateException(
|
||||
"both codec service and codec service factory are present, codec service provided by: "
|
||||
+ codecServiceOverridingPlugin
|
||||
+ " conflicts with codec service factory provided by: "
|
||||
+ codecServiceFactoryOverridingPlugin
|
||||
);
|
||||
}
|
||||
|
||||
final CodecService instance = codecService.orElse(null);
|
||||
this.codecServiceFactory = (instance != null) ? (config) -> instance : codecServiceFactory.orElse(null);
|
||||
this.translogDeletionPolicyFactory = translogDeletionPolicyFactory.orElse((idxs, rtls) -> null);
|
||||
}
|
||||
|
||||
/** Instantiates a new EngineConfig from the provided custom overrides */
|
||||
/**
|
||||
* Instantiates a new EngineConfig from the provided custom overrides
|
||||
*/
|
||||
public EngineConfig newEngineConfig(
|
||||
ShardId shardId,
|
||||
ThreadPool threadPool,
|
||||
|
@ -114,6 +146,10 @@ public class EngineConfigFactory {
|
|||
LongSupplier primaryTermSupplier,
|
||||
EngineConfig.TombstoneDocSupplier tombstoneDocSupplier
|
||||
) {
|
||||
CodecService codecServiceToUse = codecService;
|
||||
if (codecService == null && this.codecServiceFactory != null) {
|
||||
codecServiceToUse = newCodecServiceOrDefault(indexSettings, null, null, null);
|
||||
}
|
||||
|
||||
return new EngineConfig(
|
||||
shardId,
|
||||
|
@ -124,7 +160,7 @@ public class EngineConfigFactory {
|
|||
mergePolicy,
|
||||
analyzer,
|
||||
similarity,
|
||||
this.codecService != null ? this.codecService : codecService,
|
||||
codecServiceToUse,
|
||||
eventListener,
|
||||
queryCache,
|
||||
queryCachingPolicy,
|
||||
|
@ -141,4 +177,15 @@ public class EngineConfigFactory {
|
|||
tombstoneDocSupplier
|
||||
);
|
||||
}
|
||||
|
||||
public CodecService newCodecServiceOrDefault(
|
||||
IndexSettings indexSettings,
|
||||
@Nullable MapperService mapperService,
|
||||
Logger logger,
|
||||
CodecService defaultCodecService
|
||||
) {
|
||||
return this.codecServiceFactory != null
|
||||
? this.codecServiceFactory.createCodecService(new CodecServiceConfig(indexSettings, mapperService, logger))
|
||||
: defaultCodecService;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -3155,6 +3155,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
|
|||
this.warmer.warm(reader);
|
||||
}
|
||||
};
|
||||
|
||||
return this.engineConfigFactory.newEngineConfig(
|
||||
shardId,
|
||||
threadPool,
|
||||
|
@ -3164,7 +3165,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
|
|||
indexSettings.getMergePolicy(),
|
||||
mapperService != null ? mapperService.indexAnalyzer() : null,
|
||||
similarityService.similarity(mapperService),
|
||||
codecService,
|
||||
engineConfigFactory.newCodecServiceOrDefault(indexSettings, mapperService, logger, codecService),
|
||||
shardEventListener,
|
||||
indexCache != null ? indexCache.query() : null,
|
||||
cachingPolicy,
|
||||
|
|
|
@ -34,6 +34,7 @@ package org.opensearch.plugins;
|
|||
|
||||
import org.opensearch.index.IndexSettings;
|
||||
import org.opensearch.index.codec.CodecService;
|
||||
import org.opensearch.index.codec.CodecServiceFactory;
|
||||
import org.opensearch.index.engine.EngineFactory;
|
||||
import org.opensearch.index.seqno.RetentionLeases;
|
||||
import org.opensearch.index.translog.TranslogDeletionPolicy;
|
||||
|
@ -63,11 +64,26 @@ public interface EnginePlugin {
|
|||
* to determine if a custom {@link CodecService} should be provided for the given index. A plugin that is not overriding
|
||||
* the {@link CodecService} through the plugin can ignore this method and the Codec specified in the {@link IndexSettings}
|
||||
* will be used.
|
||||
*
|
||||
* @deprecated Please use {@code getCustomCodecServiceFactory()} instead as it provides more context for {@link CodecService}
|
||||
* instance construction.
|
||||
*/
|
||||
@Deprecated
|
||||
default Optional<CodecService> getCustomCodecService(IndexSettings indexSettings) {
|
||||
return Optional.empty();
|
||||
}
|
||||
|
||||
/**
|
||||
* EXPERT:
|
||||
* When an index is created this method is invoked for each engine plugin. Engine plugins can inspect the index settings
|
||||
* to determine if a custom {@link CodecServiceFactory} should be provided for the given index. A plugin that is not overriding
|
||||
* the {@link CodecServiceFactory} through the plugin can ignore this method and the default Codec specified in the
|
||||
* {@link IndexSettings} will be used.
|
||||
*/
|
||||
default Optional<CodecServiceFactory> getCustomCodecServiceFactory(IndexSettings indexSettings) {
|
||||
return Optional.empty();
|
||||
}
|
||||
|
||||
/**
|
||||
* When an index is created this method is invoked for each engine plugin. Engine plugins that need to provide a
|
||||
* custom {@link TranslogDeletionPolicy} can override this method to return a function that takes the {@link IndexSettings}
|
||||
|
|
|
@ -14,6 +14,7 @@ import org.opensearch.cluster.metadata.IndexMetadata;
|
|||
import org.opensearch.common.unit.TimeValue;
|
||||
import org.opensearch.index.IndexSettings;
|
||||
import org.opensearch.index.codec.CodecService;
|
||||
import org.opensearch.index.codec.CodecServiceFactory;
|
||||
import org.opensearch.index.seqno.RetentionLeases;
|
||||
import org.opensearch.index.translog.TranslogDeletionPolicy;
|
||||
import org.opensearch.index.translog.TranslogDeletionPolicyFactory;
|
||||
|
@ -84,6 +85,18 @@ public class EngineConfigFactoryTests extends OpenSearchTestCase {
|
|||
expectThrows(IllegalStateException.class, () -> new EngineConfigFactory(plugins, indexSettings));
|
||||
}
|
||||
|
||||
public void testCreateEngineConfigFromFactoryMultipleCodecServiceAndFactoryIllegalStateException() {
|
||||
IndexMetadata meta = IndexMetadata.builder("test")
|
||||
.settings(settings(Version.CURRENT))
|
||||
.numberOfShards(1)
|
||||
.numberOfReplicas(1)
|
||||
.build();
|
||||
List<EnginePlugin> plugins = Arrays.asList(new FooEnginePlugin(), new BakEnginePlugin());
|
||||
IndexSettings indexSettings = IndexSettingsModule.newIndexSettings("test", meta.getSettings());
|
||||
|
||||
expectThrows(IllegalStateException.class, () -> new EngineConfigFactory(plugins, indexSettings));
|
||||
}
|
||||
|
||||
public void testCreateEngineConfigFromFactoryMultipleCustomTranslogDeletionPolicyFactoryIllegalStateException() {
|
||||
IndexMetadata meta = IndexMetadata.builder("test")
|
||||
.settings(settings(Version.CURRENT))
|
||||
|
@ -96,6 +109,43 @@ public class EngineConfigFactoryTests extends OpenSearchTestCase {
|
|||
expectThrows(IllegalStateException.class, () -> new EngineConfigFactory(plugins, indexSettings));
|
||||
}
|
||||
|
||||
public void testCreateCodecServiceFromFactory() {
|
||||
IndexMetadata meta = IndexMetadata.builder("test")
|
||||
.settings(settings(Version.CURRENT))
|
||||
.numberOfShards(1)
|
||||
.numberOfReplicas(1)
|
||||
.build();
|
||||
List<EnginePlugin> plugins = Arrays.asList(new BakEnginePlugin());
|
||||
IndexSettings indexSettings = IndexSettingsModule.newIndexSettings("test", meta.getSettings());
|
||||
|
||||
EngineConfigFactory factory = new EngineConfigFactory(plugins, indexSettings);
|
||||
EngineConfig config = factory.newEngineConfig(
|
||||
null,
|
||||
null,
|
||||
indexSettings,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
TimeValue.timeValueMinutes(5),
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
() -> new RetentionLeases(0, 0, Collections.emptyList()),
|
||||
null,
|
||||
null
|
||||
);
|
||||
assertNotNull(config.getCodec());
|
||||
}
|
||||
|
||||
private static class FooEnginePlugin extends Plugin implements EnginePlugin {
|
||||
@Override
|
||||
public Optional<EngineFactory> getEngineFactory(final IndexSettings indexSettings) {
|
||||
|
@ -125,6 +175,18 @@ public class EngineConfigFactoryTests extends OpenSearchTestCase {
|
|||
}
|
||||
}
|
||||
|
||||
private static class BakEnginePlugin extends Plugin implements EnginePlugin {
|
||||
@Override
|
||||
public Optional<EngineFactory> getEngineFactory(final IndexSettings indexSettings) {
|
||||
return Optional.empty();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Optional<CodecServiceFactory> getCustomCodecServiceFactory(IndexSettings indexSettings) {
|
||||
return Optional.of(config -> new CodecService(config.getMapperService(), LogManager.getLogger(getClass())));
|
||||
}
|
||||
}
|
||||
|
||||
private static class BazEnginePlugin extends Plugin implements EnginePlugin {
|
||||
@Override
|
||||
public Optional<EngineFactory> getEngineFactory(final IndexSettings indexSettings) {
|
||||
|
|
Loading…
Reference in New Issue