[CORE] Move back to single EngineConfig
We need to preserve settings (yet transient) even though the engine is not yet started. This commit moves back to a single EngineConfig to simplify IndexShard and settings state. Closes #10584
This commit is contained in:
parent
67b48da15f
commit
0fcd31b6dc
|
@ -130,9 +130,8 @@ public final class EngineConfig {
|
|||
/**
|
||||
* Creates a new {@link org.elasticsearch.index.engine.EngineConfig}
|
||||
*/
|
||||
public EngineConfig(ShardId shardId, boolean optimizeAutoGenerateId, ThreadPool threadPool, ShardIndexingService indexingService, IndexSettingsService indexSettingsService, IndicesWarmer warmer, Store store, SnapshotDeletionPolicy deletionPolicy, Translog translog, MergePolicyProvider mergePolicyProvider, MergeSchedulerProvider mergeScheduler, Analyzer analyzer, Similarity similarity, CodecService codecService, Engine.FailedEngineListener failedEngineListener, TranslogRecoveryPerformer translogRecoveryPerformer) {
|
||||
public EngineConfig(ShardId shardId, ThreadPool threadPool, ShardIndexingService indexingService, IndexSettingsService indexSettingsService, IndicesWarmer warmer, Store store, SnapshotDeletionPolicy deletionPolicy, Translog translog, MergePolicyProvider mergePolicyProvider, MergeSchedulerProvider mergeScheduler, Analyzer analyzer, Similarity similarity, CodecService codecService, Engine.FailedEngineListener failedEngineListener, TranslogRecoveryPerformer translogRecoveryPerformer) {
|
||||
this.shardId = shardId;
|
||||
this.optimizeAutoGenerateId = optimizeAutoGenerateId;
|
||||
this.threadPool = threadPool;
|
||||
this.indexingService = indexingService;
|
||||
this.indexSettingsService = indexSettingsService;
|
||||
|
@ -147,6 +146,7 @@ public final class EngineConfig {
|
|||
this.codecService = codecService;
|
||||
this.failedEngineListener = failedEngineListener;
|
||||
Settings indexSettings = indexSettingsService.getSettings();
|
||||
this.optimizeAutoGenerateId = indexSettings.getAsBoolean(EngineConfig.INDEX_OPTIMIZE_AUTOGENERATED_ID_SETTING, false);
|
||||
this.compoundOnFlush = indexSettings.getAsBoolean(EngineConfig.INDEX_COMPOUND_ON_FLUSH, compoundOnFlush);
|
||||
this.indexConcurrency = indexSettings.getAsInt(EngineConfig.INDEX_CONCURRENCY_SETTING, Math.max(IndexWriterConfig.DEFAULT_MAX_THREAD_STATES, (int) (EsExecutors.boundedNumberOfProcessors(indexSettings) * 0.65)));
|
||||
codecName = indexSettings.get(EngineConfig.INDEX_CODEC_SETTING, EngineConfig.DEFAULT_CODEC_NAME);
|
||||
|
|
|
@ -164,6 +164,7 @@ public class IndexShard extends AbstractIndexShardComponent {
|
|||
private final SnapshotDeletionPolicy deletionPolicy;
|
||||
private final SimilarityService similarityService;
|
||||
private final MergePolicyProvider mergePolicyProvider;
|
||||
private final EngineConfig engineConfig;
|
||||
|
||||
private TimeValue refreshInterval;
|
||||
|
||||
|
@ -250,6 +251,7 @@ public class IndexShard extends AbstractIndexShardComponent {
|
|||
logger.debug("state: [CREATED]");
|
||||
|
||||
this.checkIndexOnStartup = indexSettings.get("index.shard.check_on_startup", "false");
|
||||
this.engineConfig = newEngineConfig();
|
||||
}
|
||||
|
||||
public Store store() {
|
||||
|
@ -792,8 +794,7 @@ public class IndexShard extends AbstractIndexShardComponent {
|
|||
if (state != IndexShardState.RECOVERING) {
|
||||
throw new IndexShardNotRecoveringException(shardId, state);
|
||||
}
|
||||
final Engine engine = engine();
|
||||
return engine.config().getTranslogRecoveryPerformer().performBatchRecovery(engine, operations);
|
||||
return engineConfig.getTranslogRecoveryPerformer().performBatchRecovery(engine(), operations);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -819,10 +820,9 @@ public class IndexShard extends AbstractIndexShardComponent {
|
|||
recoveryState.setStage(RecoveryState.Stage.TRANSLOG);
|
||||
// we disable deletes since we allow for operations to be executed against the shard while recovering
|
||||
// but we need to make sure we don't loose deletes until we are done recovering
|
||||
final EngineConfig config = newEngineConfig();
|
||||
config.setEnableGcDeletes(false);
|
||||
createNewEngine(skipTranslogRecovery, config);
|
||||
return config.getTranslogRecoveryPerformer().getRecoveredTypes();
|
||||
engineConfig.setEnableGcDeletes(false);
|
||||
createNewEngine(skipTranslogRecovery, engineConfig);
|
||||
return engineConfig.getTranslogRecoveryPerformer().getRecoveredTypes();
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -872,7 +872,7 @@ public class IndexShard extends AbstractIndexShardComponent {
|
|||
translog.clearUnreferenced();
|
||||
engine().refresh("recovery_finalization");
|
||||
startScheduledTasksIfNeeded();
|
||||
engine().config().setEnableGcDeletes(true);
|
||||
engineConfig.setEnableGcDeletes(true);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -957,7 +957,7 @@ public class IndexShard extends AbstractIndexShardComponent {
|
|||
}
|
||||
|
||||
public void updateBufferSize(ByteSizeValue shardIndexingBufferSize, ByteSizeValue shardTranslogBufferSize) {
|
||||
final EngineConfig config = engine().config();
|
||||
final EngineConfig config = engineConfig;
|
||||
final ByteSizeValue preValue = config.getIndexingBufferSize();
|
||||
config.setIndexingBufferSize(shardIndexingBufferSize);
|
||||
// update engine if it is already started.
|
||||
|
@ -1004,11 +1004,10 @@ public class IndexShard extends AbstractIndexShardComponent {
|
|||
public void onRefreshSettings(Settings settings) {
|
||||
boolean change = false;
|
||||
synchronized (mutex) {
|
||||
final Engine engine = engineUnsafe();
|
||||
if (engine == null) {
|
||||
if (state() == IndexShardState.CLOSED) { // no need to update anything if we are closed
|
||||
return;
|
||||
}
|
||||
final EngineConfig config = engine.config();
|
||||
final EngineConfig config = engineConfig;
|
||||
final boolean flushOnClose = settings.getAsBoolean(INDEX_FLUSH_ON_CLOSE, IndexShard.this.flushOnClose);
|
||||
if (flushOnClose != IndexShard.this.flushOnClose) {
|
||||
logger.info("updating {} from [{}] to [{}]", INDEX_FLUSH_ON_CLOSE, IndexShard.this.flushOnClose, flushOnClose);
|
||||
|
@ -1279,7 +1278,7 @@ public class IndexShard extends AbstractIndexShardComponent {
|
|||
return mapperService.documentMapperWithAutoCreate(type);
|
||||
}
|
||||
|
||||
protected EngineConfig newEngineConfig() {
|
||||
private final EngineConfig newEngineConfig() {
|
||||
final TranslogRecoveryPerformer translogRecoveryPerformer = new TranslogRecoveryPerformer(mapperService, mapperAnalyzer, queryParserService, indexAliasesService, indexCache) {
|
||||
@Override
|
||||
protected void operationProcessed() {
|
||||
|
@ -1288,7 +1287,6 @@ public class IndexShard extends AbstractIndexShardComponent {
|
|||
}
|
||||
};
|
||||
return new EngineConfig(shardId,
|
||||
indexSettingsService.getSettings().getAsBoolean(EngineConfig.INDEX_OPTIMIZE_AUTOGENERATED_ID_SETTING, false),
|
||||
threadPool, indexingService, indexSettingsService, warmer, store, deletionPolicy, translog, mergePolicyProvider, mergeScheduler,
|
||||
mapperAnalyzer, similarityService.similarity(), codecService, failedEngineListener, translogRecoveryPerformer);
|
||||
}
|
||||
|
|
|
@ -139,6 +139,10 @@ public class TranslogRecoveryPerformer {
|
|||
operationProcessed();
|
||||
}
|
||||
|
||||
/**
|
||||
* Called once for every processed operation by this recovery performer.
|
||||
* This can be used to get progress information on the translog execution.
|
||||
*/
|
||||
protected void operationProcessed() {
|
||||
// noop
|
||||
}
|
||||
|
|
|
@ -270,7 +270,7 @@ public class InternalEngineTests extends ElasticsearchLuceneTestCase {
|
|||
|
||||
public EngineConfig config(IndexSettingsService indexSettingsService, Store store, Translog translog, MergeSchedulerProvider mergeSchedulerProvider) {
|
||||
IndexWriterConfig iwc = newIndexWriterConfig();
|
||||
EngineConfig config = new EngineConfig(shardId, false/*per default optimization for auto generated ids is disabled*/, threadPool, new ShardIndexingService(shardId, EMPTY_SETTINGS, new ShardSlowLogIndexingService(shardId, EMPTY_SETTINGS, indexSettingsService)), indexSettingsService
|
||||
EngineConfig config = new EngineConfig(shardId, threadPool, new ShardIndexingService(shardId, EMPTY_SETTINGS, new ShardSlowLogIndexingService(shardId, EMPTY_SETTINGS, indexSettingsService)), indexSettingsService
|
||||
, null, store, createSnapshotDeletionPolicy(), translog, createMergePolicy(), mergeSchedulerProvider,
|
||||
iwc.getAnalyzer(), iwc.getSimilarity(), new CodecService(shardId.index()), new Engine.FailedEngineListener() {
|
||||
@Override
|
||||
|
|
|
@ -244,7 +244,7 @@ public class ShadowEngineTests extends ElasticsearchLuceneTestCase {
|
|||
|
||||
public EngineConfig config(IndexSettingsService indexSettingsService, Store store, Translog translog, MergeSchedulerProvider mergeSchedulerProvider) {
|
||||
IndexWriterConfig iwc = newIndexWriterConfig();
|
||||
EngineConfig config = new EngineConfig(shardId, false/*per default optimization for auto generated ids is disabled*/, threadPool, new ShardIndexingService(shardId, EMPTY_SETTINGS, new ShardSlowLogIndexingService(shardId, EMPTY_SETTINGS, indexSettingsService)), indexSettingsService
|
||||
EngineConfig config = new EngineConfig(shardId, threadPool, new ShardIndexingService(shardId, EMPTY_SETTINGS, new ShardSlowLogIndexingService(shardId, EMPTY_SETTINGS, indexSettingsService)), indexSettingsService
|
||||
, null, store, createSnapshotDeletionPolicy(), translog, createMergePolicy(), mergeSchedulerProvider,
|
||||
iwc.getAnalyzer(), iwc.getSimilarity() , new CodecService(shardId.index()), new Engine.FailedEngineListener() {
|
||||
@Override
|
||||
|
|
|
@ -20,7 +20,6 @@
|
|||
package org.elasticsearch.indices.memory;
|
||||
|
||||
import com.google.common.base.Predicate;
|
||||
import org.apache.lucene.util.LuceneTestCase;
|
||||
import org.elasticsearch.cluster.ClusterName;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.common.settings.ImmutableSettings;
|
||||
|
@ -37,7 +36,6 @@ import org.junit.Test;
|
|||
public class IndexingMemoryControllerTests extends ElasticsearchIntegrationTest {
|
||||
|
||||
@Test
|
||||
@LuceneTestCase.AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/10584")
|
||||
public void testIndexBufferSizeUpdateAfterCreationRemoval() throws InterruptedException {
|
||||
|
||||
createNode(ImmutableSettings.EMPTY);
|
||||
|
|
Loading…
Reference in New Issue