diff --git a/src/main/java/org/elasticsearch/index/engine/Engine.java b/src/main/java/org/elasticsearch/index/engine/Engine.java index 19fd06092fa..f86e203c961 100644 --- a/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -59,8 +59,8 @@ import java.util.concurrent.locks.ReentrantLock; */ public abstract class Engine implements Closeable { - private final ESLogger logger; - private final EngineConfig engineConfig; + protected final ESLogger logger; + protected final EngineConfig engineConfig; protected Engine(EngineConfig engineConfig) { Preconditions.checkNotNull(engineConfig.getStore(), "Store must be provided to the engine"); @@ -110,6 +110,10 @@ public abstract class Engine implements Closeable { return new EngineSearcher(source, searcher, manager, engineConfig.getStore(), logger); } + public final EngineConfig config() { + return engineConfig; + } + /** A throttling class that can be activated, causing the * {@code acquireThrottle} method to block on a lock when throttling * is enabled @@ -169,8 +173,6 @@ public abstract class Engine implements Closeable { } } - public abstract void updateIndexingBufferSize(ByteSizeValue indexingBufferSize); - public abstract void create(Create create) throws EngineException; public abstract void index(Index index) throws EngineException; @@ -248,8 +250,6 @@ public abstract class Engine implements Closeable { /** fail engine due to some error. the engine will also be closed. */ public abstract void failEngine(String reason, Throwable failure); - public abstract ByteSizeValue indexingBufferSize(); - public static interface FailedEngineListener { void onFailedEngine(ShardId shardId, String reason, @Nullable Throwable t); } diff --git a/src/main/java/org/elasticsearch/index/engine/EngineConfig.java b/src/main/java/org/elasticsearch/index/engine/EngineConfig.java index 7abb15424e6..e49651d0464 100644 --- a/src/main/java/org/elasticsearch/index/engine/EngineConfig.java +++ b/src/main/java/org/elasticsearch/index/engine/EngineConfig.java @@ -271,14 +271,6 @@ public final class EngineConfig { return indexingService; } - /** - * Returns an {@link org.elasticsearch.index.settings.IndexSettingsService} used to register a {@link org.elasticsearch.index.engine.EngineConfig.EngineSettingsListener} instance - * in order to get notification for realtime changeable settings exposed in this {@link org.elasticsearch.index.engine.EngineConfig}. - */ - public IndexSettingsService getIndexSettingsService() { - return indexSettingsService; - } - /** * Returns an {@link org.elasticsearch.indices.IndicesWarmer} used to warm new searchers before they are used for searching. * Note: This method might retrun null @@ -365,58 +357,30 @@ public final class EngineConfig { } /** - * Basic realtime updateable settings listener that can be used ot receive notification - * if an index setting changed. + * Sets the GC deletes cycle in milliseconds. */ - public static abstract class EngineSettingsListener implements IndexSettingsService.Listener { + public void setGcDeletesInMillis(long gcDeletesInMillis) { + this.gcDeletesInMillis = gcDeletesInMillis; + } - private final ESLogger logger; - private final EngineConfig config; + /** + * Sets if flushed segments should be written as compound file system. Defaults to true + */ + public void setCompoundOnFlush(boolean compoundOnFlush) { + this.compoundOnFlush = compoundOnFlush; + } - public EngineSettingsListener(ESLogger logger, EngineConfig config) { - this.logger = logger; - this.config = config; - } + /** + * Sets if the engine should be failed in the case of a corrupted index. Defaults to true + */ + public void setFailEngineOnCorruption(boolean failEngineOnCorruption) { + this.failEngineOnCorruption = failEngineOnCorruption; + } - @Override - public final void onRefreshSettings(Settings settings) { - boolean change = false; - long gcDeletesInMillis = settings.getAsTime(EngineConfig.INDEX_GC_DELETES_SETTING, TimeValue.timeValueMillis(config.getGcDeletesInMillis())).millis(); - if (gcDeletesInMillis != config.getGcDeletesInMillis()) { - logger.info("updating {} from [{}] to [{}]", EngineConfig.INDEX_GC_DELETES_SETTING, TimeValue.timeValueMillis(config.getGcDeletesInMillis()), TimeValue.timeValueMillis(gcDeletesInMillis)); - config.gcDeletesInMillis = gcDeletesInMillis; - change = true; - } - - final boolean compoundOnFlush = settings.getAsBoolean(EngineConfig.INDEX_COMPOUND_ON_FLUSH, config.isCompoundOnFlush()); - if (compoundOnFlush != config.isCompoundOnFlush()) { - logger.info("updating {} from [{}] to [{}]", EngineConfig.INDEX_COMPOUND_ON_FLUSH, config.isCompoundOnFlush(), compoundOnFlush); - config.compoundOnFlush = compoundOnFlush; - change = true; - } - - final boolean failEngineOnCorruption = settings.getAsBoolean(EngineConfig.INDEX_FAIL_ON_CORRUPTION_SETTING, config.isFailEngineOnCorruption()); - if (failEngineOnCorruption != config.isFailEngineOnCorruption()) { - logger.info("updating {} from [{}] to [{}]", EngineConfig.INDEX_FAIL_ON_CORRUPTION_SETTING, config.isFailEngineOnCorruption(), failEngineOnCorruption); - config.failEngineOnCorruption = failEngineOnCorruption; - change = true; - } - final boolean failOnMergeFailure = settings.getAsBoolean(EngineConfig.INDEX_FAIL_ON_MERGE_FAILURE_SETTING, config.isFailOnMergeFailure()); - if (failOnMergeFailure != config.isFailOnMergeFailure()) { - logger.info("updating {} from [{}] to [{}]", EngineConfig.INDEX_FAIL_ON_MERGE_FAILURE_SETTING, config.isFailOnMergeFailure(), failOnMergeFailure); - config.failOnMergeFailure = failOnMergeFailure; - change = true; - } - - if (change) { - onChange(); - } - } - - /** - * This method is called if any of the settings that are exposed as realtime updateble settings has changed. - * This method should be overwritten by subclasses to react on settings changes. - */ - protected abstract void onChange(); + /** + * Sets if the engine should be failed if a merge error is hit. Defaults to true + */ + public void setFailOnMergeFailure(boolean failOnMergeFailure) { + this.failOnMergeFailure = failOnMergeFailure; } } diff --git a/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 70da3cdfcfb..599299ac983 100644 --- a/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -75,12 +75,9 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; */ public class InternalEngine extends Engine { - protected final ESLogger logger; protected final ShardId shardId; - private final EngineConfig engineConfig; private final FailEngineOnMergeFailure mergeSchedulerFailureListener; private final MergeSchedulerListener mergeSchedulerListener; - private final EngineConfig.EngineSettingsListener listener; /** When we last pruned expired tombstones from versionMap.deletes: */ private volatile long lastDeleteVersionPruneTimeMSec; @@ -132,7 +129,6 @@ public class InternalEngine extends Engine { super(engineConfig); this.store = engineConfig.getStore(); this.shardId = engineConfig.getShardId(); - this.logger = Loggers.getLogger(getClass(), engineConfig.getIndexSettings(), shardId); this.versionMap = new LiveVersionMap(); store.incRef(); IndexWriter writer = null; @@ -154,15 +150,7 @@ public class InternalEngine extends Engine { this.failedEngineListener = engineConfig.getFailedEngineListener(); throttle = new IndexThrottle(); - this.engineConfig = engineConfig; this.searcherFactory = new SearchFactory(engineConfig); - listener = new EngineConfig.EngineSettingsListener(logger, engineConfig) { - @Override - protected void onChange() { - updateSettings(); - } - }; - engineConfig.getIndexSettingsService().addListener(listener); try { writer = createWriter(); } catch (IOException e) { @@ -188,31 +176,6 @@ public class InternalEngine extends Engine { } } - @Override - public void updateIndexingBufferSize(ByteSizeValue indexingBufferSize) { - ByteSizeValue preValue = engineConfig.getIndexingBufferSize(); - try (ReleasableLock _ = readLock.acquire()) { - ensureOpen(); - engineConfig.setIndexingBufferSize(indexingBufferSize); - indexWriter.getConfig().setRAMBufferSizeMB(indexingBufferSize.mbFrac()); - } - if (preValue.bytes() != indexingBufferSize.bytes()) { - // its inactive, make sure we do a refresh / full IW flush in this case, since the memory - // changes only after a "data" change has happened to the writer - // the index writer lazily allocates memory and a refresh will clean it all up. - if (indexingBufferSize == EngineConfig.INACTIVE_SHARD_INDEXING_BUFFER && preValue != EngineConfig.INACTIVE_SHARD_INDEXING_BUFFER) { - logger.debug("updating index_buffer_size from [{}] to (inactive) [{}]", preValue, indexingBufferSize); - try { - refresh("update index buffer"); - } catch (Throwable e) { - logger.warn("failed to refresh after setting shard to inactive", e); - } - } else { - logger.debug("updating index_buffer_size from [{}] to [{}]", preValue, indexingBufferSize); - } - } - } - private SearcherManager createSearcherManager() throws EngineException { boolean success = false; SearcherManager searcherManager = null; @@ -254,18 +217,16 @@ public class InternalEngine extends Engine { } } - private void updateSettings() { - if (isClosed.get() == false) { + private void updateIndexWriterSettings() { + try { final LiveIndexWriterConfig iwc = indexWriter.getConfig(); + iwc.setRAMBufferSizeMB(engineConfig.getIndexingBufferSize().mbFrac()); iwc.setUseCompoundFile(engineConfig.isCompoundOnFlush()); + } catch (AlreadyClosedException ex) { + // ignore } } - @Override - public ByteSizeValue indexingBufferSize() { - return engineConfig.getIndexingBufferSize(); - } - @Override public GetResult get(Get get) throws EngineException { try (ReleasableLock _ = readLock.acquire()) { @@ -682,6 +643,7 @@ public class InternalEngine extends Engine { // since it flushes the index as well (though, in terms of concurrency, we are allowed to do it) try (ReleasableLock _ = readLock.acquire()) { ensureOpen(); + updateIndexWriterSettings(); searcherManager.maybeRefreshBlocking(); } catch (AlreadyClosedException e) { ensureOpen(); @@ -712,6 +674,7 @@ public class InternalEngine extends Engine { private void flush(boolean commitTranslog, boolean force, boolean waitIfOngoing) throws EngineException { ensureOpen(); + updateIndexWriterSettings(); if (commitTranslog) { // check outside the lock as well so we can check without blocking on the write lock if (onGoingRecoveries.get() > 0) { @@ -1175,7 +1138,6 @@ public class InternalEngine extends Engine { store.decRef(); this.mergeScheduler.removeListener(mergeSchedulerListener); this.mergeScheduler.removeFailureListener(mergeSchedulerFailureListener); - engineConfig.getIndexSettingsService().removeListener(listener); logger.debug("engine closed [{}]", reason); } } @@ -1423,10 +1385,6 @@ public class InternalEngine extends Engine { } } - EngineConfig config() { - return engineConfig; - } - private void commitIndexWriter(IndexWriter writer) throws IOException { try { diff --git a/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/src/main/java/org/elasticsearch/index/shard/IndexShard.java index f13811b6ed7..b64fe608c29 100644 --- a/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -44,6 +44,7 @@ import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.BytesStreamOutput; +import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.metrics.MeanMetric; import org.elasticsearch.common.settings.Settings; @@ -911,20 +912,34 @@ public class IndexShard extends AbstractIndexShardComponent { } public void updateBufferSize(ByteSizeValue shardIndexingBufferSize, ByteSizeValue shardTranslogBufferSize) { - Engine engine = engine(); - engine.updateIndexingBufferSize(shardIndexingBufferSize); + ByteSizeValue preValue = config.getIndexingBufferSize(); + config.setIndexingBufferSize(shardIndexingBufferSize); + if (preValue.bytes() != shardIndexingBufferSize.bytes()) { + // its inactive, make sure we do a refresh / full IW flush in this case, since the memory + // changes only after a "data" change has happened to the writer + // the index writer lazily allocates memory and a refresh will clean it all up. + if (shardIndexingBufferSize == EngineConfig.INACTIVE_SHARD_INDEXING_BUFFER && preValue != EngineConfig.INACTIVE_SHARD_INDEXING_BUFFER) { + logger.debug("updating index_buffer_size from [{}] to (inactive) [{}]", preValue, shardIndexingBufferSize); + try { + refresh("update index buffer"); + } catch (Throwable e) { + logger.warn("failed to refresh after setting shard to inactive", e); + } + } else { + logger.debug("updating index_buffer_size from [{}] to [{}]", preValue, shardIndexingBufferSize); + } + } translog().updateBuffer(shardTranslogBufferSize); } public void markAsInactive() { - Engine engine = engine(); - engine.updateIndexingBufferSize(EngineConfig.INACTIVE_SHARD_INDEXING_BUFFER); - translog().updateBuffer(Translog.INACTIVE_SHARD_TRANSLOG_BUFFER); + updateBufferSize(EngineConfig.INACTIVE_SHARD_INDEXING_BUFFER, Translog.INACTIVE_SHARD_TRANSLOG_BUFFER); } private class ApplyRefreshSettings implements IndexSettingsService.Listener { @Override public void onRefreshSettings(Settings settings) { + boolean change = false; synchronized (mutex) { if (state == IndexShardState.CLOSED) { return; @@ -944,6 +959,36 @@ public class IndexShard extends AbstractIndexShardComponent { refreshScheduledFuture = threadPool.schedule(refreshInterval, ThreadPool.Names.SAME, new EngineRefresher()); } } + + long gcDeletesInMillis = settings.getAsTime(EngineConfig.INDEX_GC_DELETES_SETTING, TimeValue.timeValueMillis(config.getGcDeletesInMillis())).millis(); + if (gcDeletesInMillis != config.getGcDeletesInMillis()) { + logger.info("updating {} from [{}] to [{}]", EngineConfig.INDEX_GC_DELETES_SETTING, TimeValue.timeValueMillis(config.getGcDeletesInMillis()), TimeValue.timeValueMillis(gcDeletesInMillis)); + config.setGcDeletesInMillis(gcDeletesInMillis); + change = true; + } + + final boolean compoundOnFlush = settings.getAsBoolean(EngineConfig.INDEX_COMPOUND_ON_FLUSH, config.isCompoundOnFlush()); + if (compoundOnFlush != config.isCompoundOnFlush()) { + logger.info("updating {} from [{}] to [{}]", EngineConfig.INDEX_COMPOUND_ON_FLUSH, config.isCompoundOnFlush(), compoundOnFlush); + config.setCompoundOnFlush(compoundOnFlush); + change = true; + } + + final boolean failEngineOnCorruption = settings.getAsBoolean(EngineConfig.INDEX_FAIL_ON_CORRUPTION_SETTING, config.isFailEngineOnCorruption()); + if (failEngineOnCorruption != config.isFailEngineOnCorruption()) { + logger.info("updating {} from [{}] to [{}]", EngineConfig.INDEX_FAIL_ON_CORRUPTION_SETTING, config.isFailEngineOnCorruption(), failEngineOnCorruption); + config.setFailEngineOnCorruption(failEngineOnCorruption); + change = true; + } + final boolean failOnMergeFailure = settings.getAsBoolean(EngineConfig.INDEX_FAIL_ON_MERGE_FAILURE_SETTING, config.isFailOnMergeFailure()); + if (failOnMergeFailure != config.isFailOnMergeFailure()) { + logger.info("updating {} from [{}] to [{}]", EngineConfig.INDEX_FAIL_ON_MERGE_FAILURE_SETTING, config.isFailOnMergeFailure(), failOnMergeFailure); + config.setFailOnMergeFailure(failOnMergeFailure); + change = true; + } + } + if (change) { + refresh("apply settings"); } } } @@ -1103,5 +1148,4 @@ public class IndexShard extends AbstractIndexShardComponent { this.currentEngineReference.set(engineFactory.newEngine(config)); } } - } diff --git a/src/test/java/org/elasticsearch/index/engine/InternalEngineSettingsTest.java b/src/test/java/org/elasticsearch/index/engine/InternalEngineSettingsTest.java index e394b986020..ff0c025ca09 100644 --- a/src/test/java/org/elasticsearch/index/engine/InternalEngineSettingsTest.java +++ b/src/test/java/org/elasticsearch/index/engine/InternalEngineSettingsTest.java @@ -18,7 +18,9 @@ */ package org.elasticsearch.index.engine; +import org.apache.lucene.index.LiveIndexWriterConfig; import org.elasticsearch.common.settings.ImmutableSettings; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.IndexService; import org.elasticsearch.index.engine.EngineConfig; import org.elasticsearch.index.engine.InternalEngine; @@ -28,7 +30,7 @@ import static org.hamcrest.Matchers.is; public class InternalEngineSettingsTest extends ElasticsearchSingleNodeTest { - public void testLuceneSettings() { + public void testSettingsUpdate() { final IndexService service = createIndex("foo"); // INDEX_COMPOUND_ON_FLUSH InternalEngine engine = ((InternalEngine)engine(service)); @@ -37,6 +39,58 @@ public class InternalEngineSettingsTest extends ElasticsearchSingleNodeTest { assertThat(engine.getCurrentIndexWriterConfig().getUseCompoundFile(), is(false)); client().admin().indices().prepareUpdateSettings("foo").setSettings(ImmutableSettings.builder().put(EngineConfig.INDEX_COMPOUND_ON_FLUSH, true).build()).get(); assertThat(engine.getCurrentIndexWriterConfig().getUseCompoundFile(), is(true)); + + + final int iters = between(1, 20); + for (int i = 0; i < iters; i++) { + boolean compoundOnFlush = randomBoolean(); + boolean failOnCorruption = randomBoolean(); + boolean failOnMerge = randomBoolean(); + long gcDeletes = Math.max(0, randomLong()); + + Settings build = ImmutableSettings.builder() + .put(EngineConfig.INDEX_FAIL_ON_CORRUPTION_SETTING, failOnCorruption) + .put(EngineConfig.INDEX_COMPOUND_ON_FLUSH, compoundOnFlush) + .put(EngineConfig.INDEX_GC_DELETES_SETTING, gcDeletes) + .put(EngineConfig.INDEX_FAIL_ON_MERGE_FAILURE_SETTING, failOnMerge) + .build(); + + client().admin().indices().prepareUpdateSettings("foo").setSettings(build).get(); + LiveIndexWriterConfig currentIndexWriterConfig = engine.getCurrentIndexWriterConfig(); + assertEquals(engine.config().isCompoundOnFlush(), compoundOnFlush); + assertEquals(currentIndexWriterConfig.getUseCompoundFile(), compoundOnFlush); + + + assertEquals(engine.config().getGcDeletesInMillis(), gcDeletes); + assertEquals(engine.getGcDeletesInMillis(), gcDeletes); + assertEquals(engine.config().isFailEngineOnCorruption(), failOnCorruption); + assertEquals(engine.config().isFailOnMergeFailure(), failOnMerge); // only on the holder + + } + + Settings settings = ImmutableSettings.builder() + .put(EngineConfig.INDEX_GC_DELETES_SETTING, 1000) + .build(); + client().admin().indices().prepareUpdateSettings("foo").setSettings(settings).get(); + assertEquals(engine.getGcDeletesInMillis(), 1000); + assertTrue(engine.config().isEnableGcDeletes()); + + + settings = ImmutableSettings.builder() + .put(EngineConfig.INDEX_GC_DELETES_SETTING, "0ms") + .build(); + + client().admin().indices().prepareUpdateSettings("foo").setSettings(settings).get(); + assertEquals(engine.getGcDeletesInMillis(), 0); + assertTrue(engine.config().isEnableGcDeletes()); + + settings = ImmutableSettings.builder() + .put(EngineConfig.INDEX_GC_DELETES_SETTING, 1000) + .build(); + client().admin().indices().prepareUpdateSettings("foo").setSettings(settings).get(); + assertEquals(engine.getGcDeletesInMillis(), 1000); + assertTrue(engine.config().isEnableGcDeletes()); + } diff --git a/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index f55698809b9..e419db86aef 100644 --- a/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -109,10 +109,6 @@ public class InternalEngineTests extends ElasticsearchLuceneTestCase { protected Engine engine; protected Engine replicaEngine; - private IndexSettingsService engineSettingsService; - - private IndexSettingsService replicaSettingsService; - private Settings defaultSettings; private int indexConcurrency; private String codecName; @@ -142,9 +138,8 @@ public class InternalEngineTests extends ElasticsearchLuceneTestCase { store.deleteContent(); storeReplica = createStore(); storeReplica.deleteContent(); - engineSettingsService = new IndexSettingsService(shardId.index(), ImmutableSettings.builder().put(defaultSettings).put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT).build()); translog = createTranslog(); - engine = createEngine(engineSettingsService, store, translog); + engine = createEngine(store, translog); LiveIndexWriterConfig currentIndexWriterConfig = ((InternalEngine)engine).getCurrentIndexWriterConfig(); assertEquals(((InternalEngine)engine).config().getCodec().getName(), codecService.codec(codecName).getName()); @@ -152,9 +147,8 @@ public class InternalEngineTests extends ElasticsearchLuceneTestCase { if (randomBoolean()) { ((InternalEngine)engine).config().setEnableGcDeletes(false); } - replicaSettingsService = new IndexSettingsService(shardId.index(), ImmutableSettings.builder().put(defaultSettings).put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT).build()); replicaTranslog = createTranslogReplica(); - replicaEngine = createEngine(replicaSettingsService, storeReplica, replicaTranslog); + replicaEngine = createEngine(storeReplica, replicaTranslog); currentIndexWriterConfig = ((InternalEngine)replicaEngine).getCurrentIndexWriterConfig(); assertEquals(((InternalEngine)replicaEngine).config().getCodec().getName(), codecService.codec(codecName).getName()); @@ -238,11 +232,12 @@ public class InternalEngineTests extends ElasticsearchLuceneTestCase { return new ConcurrentMergeSchedulerProvider(shardId, EMPTY_SETTINGS, threadPool, indexSettingsService); } - protected Engine createEngine(IndexSettingsService indexSettingsService, Store store, Translog translog) { + protected InternalEngine createEngine(Store store, Translog translog) { + IndexSettingsService indexSettingsService = new IndexSettingsService(shardId.index(), ImmutableSettings.builder().put(defaultSettings).put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT).build()); return createEngine(indexSettingsService, store, translog, createMergeScheduler(indexSettingsService)); } - protected Engine createEngine(IndexSettingsService indexSettingsService, Store store, Translog translog, MergeSchedulerProvider mergeSchedulerProvider) { + protected InternalEngine createEngine(IndexSettingsService indexSettingsService, Store store, Translog translog, MergeSchedulerProvider mergeSchedulerProvider) { return new InternalEngine(config(indexSettingsService, store, translog, mergeSchedulerProvider)); } @@ -308,7 +303,7 @@ public class InternalEngineTests extends ElasticsearchLuceneTestCase { assertThat(segments.get(0).getDeletedDocs(), equalTo(0)); assertThat(segments.get(0).isCompound(), equalTo(defaultCompound)); - engineSettingsService.refreshSettings(ImmutableSettings.builder().put(EngineConfig.INDEX_COMPOUND_ON_FLUSH, false).build()); + ((InternalEngine)engine).config().setCompoundOnFlush(false); ParsedDocument doc3 = testParsedDocument("3", "3", "test", null, -1, -1, testDocumentWithTextField(), B_3, false); engine.create(new Engine.Create(null, newUid("3"), doc3)); @@ -356,7 +351,7 @@ public class InternalEngineTests extends ElasticsearchLuceneTestCase { assertThat(segments.get(1).getDeletedDocs(), equalTo(0)); assertThat(segments.get(1).isCompound(), equalTo(false)); - engineSettingsService.refreshSettings(ImmutableSettings.builder().put(EngineConfig.INDEX_COMPOUND_ON_FLUSH, true).build()); + ((InternalEngine)engine).config().setCompoundOnFlush(true); ParsedDocument doc4 = testParsedDocument("4", "4", "test", null, -1, -1, testDocumentWithTextField(), B_3, false); engine.create(new Engine.Create(null, newUid("4"), doc4)); engine.refresh("test"); @@ -438,7 +433,8 @@ public class InternalEngineTests extends ElasticsearchLuceneTestCase { } }); - final Engine engine = createEngine(engineSettingsService, store, createTranslog(), mergeSchedulerProvider); + IndexSettingsService indexSettingsService = new IndexSettingsService(shardId.index(), ImmutableSettings.builder().put(defaultSettings).put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT).build()); + final Engine engine = createEngine(indexSettingsService, store, createTranslog(), mergeSchedulerProvider); ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocument(), B_1, false); Engine.Index index = new Engine.Index(null, newUid("1"), doc); engine.index(index); @@ -1395,12 +1391,10 @@ public class InternalEngineTests extends ElasticsearchLuceneTestCase { Store store = createStore(); - // Make sure enableGCDeletes == false works: - Settings settings = ImmutableSettings.builder() - .put(EngineConfig.INDEX_GC_DELETES_SETTING, "0ms") - .build(); - Engine engine = new InternalEngine(config(engineSettingsService, store, createTranslog(), createMergeScheduler(engineSettingsService))); + + IndexSettingsService indexSettingsService = new IndexSettingsService(shardId.index(), ImmutableSettings.builder().put(defaultSettings).put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT).build()); + Engine engine = new InternalEngine(config(indexSettingsService, store, createTranslog(), createMergeScheduler(indexSettingsService))); ((InternalEngine)engine).config().setEnableGcDeletes(false); // Add document @@ -1489,27 +1483,22 @@ public class InternalEngineTests extends ElasticsearchLuceneTestCase { int refCount = store.refCount(); assertTrue("refCount: "+ store.refCount(), store.refCount() > 0); Translog translog = createTranslog(); - Settings build = ImmutableSettings.builder() - .put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT).build(); - IndexSettingsService indexSettingsService = new IndexSettingsService(shardId.index(), build); - Engine holder; + InternalEngine holder; try { - holder = createEngine(indexSettingsService, store, translog); + holder = createEngine(store, translog); } catch (EngineCreationFailureException ex) { assertEquals(store.refCount(), refCount); continue; } - indexSettingsService.refreshSettings(ImmutableSettings.builder() - .put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT) - .put(EngineConfig.INDEX_FAIL_ON_CORRUPTION_SETTING, true).build()); - + holder.config().setFailEngineOnCorruption(true); assertEquals(store.refCount(), refCount+1); final int numStarts = scaledRandomIntBetween(1, 5); for (int j = 0; j < numStarts; j++) { try { assertEquals(store.refCount(), refCount + 1); holder.close(); - holder = createEngine(indexSettingsService, store, translog); + holder = createEngine(store, translog); + holder.config().setFailEngineOnCorruption(true); assertEquals(store.refCount(), refCount + 1); } catch (EngineCreationFailureException ex) { // all is fine @@ -1541,32 +1530,6 @@ public class InternalEngineTests extends ElasticsearchLuceneTestCase { assertTrue(settings.containsSetting(EngineConfig.INDEX_COMPOUND_ON_FLUSH)); assertTrue(settings.containsSetting(EngineConfig.INDEX_GC_DELETES_SETTING)); assertTrue(settings.containsSetting(EngineConfig.INDEX_FAIL_ON_MERGE_FAILURE_SETTING)); - final int iters = between(1, 20); - for (int i = 0; i < iters; i++) { - boolean compoundOnFlush = randomBoolean(); - boolean failOnCorruption = randomBoolean(); - boolean failOnMerge = randomBoolean(); - long gcDeletes = Math.max(0, randomLong()); - - Settings build = ImmutableSettings.builder() - .put(EngineConfig.INDEX_FAIL_ON_CORRUPTION_SETTING, failOnCorruption) - .put(EngineConfig.INDEX_COMPOUND_ON_FLUSH, compoundOnFlush) - .put(EngineConfig.INDEX_GC_DELETES_SETTING, gcDeletes) - .put(EngineConfig.INDEX_FAIL_ON_MERGE_FAILURE_SETTING, failOnMerge) - .build(); - - engineSettingsService.refreshSettings(build); - currentIndexWriterConfig = engine.getCurrentIndexWriterConfig(); - assertEquals(engine.config().isCompoundOnFlush(), compoundOnFlush); - assertEquals(currentIndexWriterConfig.getUseCompoundFile(), compoundOnFlush); - - - assertEquals(engine.config().getGcDeletesInMillis(), gcDeletes); - assertEquals(engine.getGcDeletesInMillis(), gcDeletes); - assertEquals(engine.config().isFailEngineOnCorruption(), failOnCorruption); - assertEquals(engine.config().isFailOnMergeFailure(), failOnMerge); // only on the holder - - } } @Test diff --git a/src/test/java/org/elasticsearch/indices/memory/IndexingMemoryControllerTests.java b/src/test/java/org/elasticsearch/indices/memory/IndexingMemoryControllerTests.java index a14be120e97..a3e47bb5c18 100644 --- a/src/test/java/org/elasticsearch/indices/memory/IndexingMemoryControllerTests.java +++ b/src/test/java/org/elasticsearch/indices/memory/IndexingMemoryControllerTests.java @@ -25,7 +25,6 @@ import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.common.settings.ImmutableSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.EsExecutors; -import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.EngineConfig; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.indices.IndicesService; @@ -58,15 +57,15 @@ public class IndexingMemoryControllerTests extends ElasticsearchIntegrationTest boolean success = awaitBusy(new Predicate() { @Override public boolean apply(Object input) { - return shard1.engine().indexingBufferSize().bytes() <= expected2ShardsSize && - shard2.engine().indexingBufferSize().bytes() <= expected2ShardsSize; + return shard1.engine().config().getIndexingBufferSize().bytes() <= expected2ShardsSize && + shard2.engine().config().getIndexingBufferSize().bytes() <= expected2ShardsSize; } }); if (!success) { fail("failed to update shard indexing buffer size. expected [" + expected2ShardsSize + "] shard1 [" + - shard1.engine().indexingBufferSize().bytes() + "] shard2 [" + - shard2.engine().indexingBufferSize().bytes() + "]" + shard1.engine().config().getIndexingBufferSize().bytes() + "] shard2 [" + + shard2.engine().config().getIndexingBufferSize().bytes() + "]" ); } @@ -74,13 +73,13 @@ public class IndexingMemoryControllerTests extends ElasticsearchIntegrationTest success = awaitBusy(new Predicate() { @Override public boolean apply(Object input) { - return shard1.engine().indexingBufferSize().bytes() >= expected1ShardSize; + return shard1.engine().config().getIndexingBufferSize().bytes() >= expected1ShardSize; } }); if (!success) { fail("failed to update shard indexing buffer size after deleting shards. expected [" + expected1ShardSize + "] got [" + - shard1.engine().indexingBufferSize().bytes() + "]" + shard1.engine().config().getIndexingBufferSize().bytes() + "]" ); } @@ -99,12 +98,12 @@ public class IndexingMemoryControllerTests extends ElasticsearchIntegrationTest boolean success = awaitBusy(new Predicate() { @Override public boolean apply(Object input) { - return shard1.engine().indexingBufferSize().bytes() == EngineConfig.INACTIVE_SHARD_INDEXING_BUFFER.bytes(); + return shard1.engine().config().getIndexingBufferSize().bytes() == EngineConfig.INACTIVE_SHARD_INDEXING_BUFFER.bytes(); } }); if (!success) { fail("failed to update shard indexing buffer size due to inactive state. expected [" + EngineConfig.INACTIVE_SHARD_INDEXING_BUFFER + "] got [" + - shard1.engine().indexingBufferSize().bytes() + "]" + shard1.engine().config().getIndexingBufferSize().bytes() + "]" ); } @@ -113,12 +112,12 @@ public class IndexingMemoryControllerTests extends ElasticsearchIntegrationTest success = awaitBusy(new Predicate() { @Override public boolean apply(Object input) { - return shard1.engine().indexingBufferSize().bytes() > EngineConfig.INACTIVE_SHARD_INDEXING_BUFFER.bytes(); + return shard1.engine().config().getIndexingBufferSize().bytes() > EngineConfig.INACTIVE_SHARD_INDEXING_BUFFER.bytes(); } }); if (!success) { fail("failed to update shard indexing buffer size due to inactive state. expected something larger then [" + EngineConfig.INACTIVE_SHARD_INDEXING_BUFFER + "] got [" + - shard1.engine().indexingBufferSize().bytes() + "]" + shard1.engine().config().getIndexingBufferSize().bytes() + "]" ); } @@ -127,12 +126,12 @@ public class IndexingMemoryControllerTests extends ElasticsearchIntegrationTest success = awaitBusy(new Predicate() { @Override public boolean apply(Object input) { - return shard1.engine().indexingBufferSize().bytes() == EngineConfig.INACTIVE_SHARD_INDEXING_BUFFER.bytes(); + return shard1.engine().config().getIndexingBufferSize().bytes() == EngineConfig.INACTIVE_SHARD_INDEXING_BUFFER.bytes(); } }); if (!success) { fail("failed to update shard indexing buffer size due to inactive state. expected [" + EngineConfig.INACTIVE_SHARD_INDEXING_BUFFER + "] got [" + - shard1.engine().indexingBufferSize().bytes() + "]" + shard1.engine().config().getIndexingBufferSize().bytes() + "]" ); } }