diff --git a/src/main/java/org/elasticsearch/index/codec/CodecService.java b/src/main/java/org/elasticsearch/index/codec/CodecService.java index 907c1d0e2fd..48ff8ea6c59 100644 --- a/src/main/java/org/elasticsearch/index/codec/CodecService.java +++ b/src/main/java/org/elasticsearch/index/codec/CodecService.java @@ -99,4 +99,11 @@ public class CodecService extends AbstractIndexComponent { } return codec; } + + /** + * Returns all registered available codec names + */ + public String[] availableCodecs() { + return codecs.keySet().toArray(new String[0]); + } } diff --git a/src/main/java/org/elasticsearch/index/engine/internal/InternalEngine.java b/src/main/java/org/elasticsearch/index/engine/internal/InternalEngine.java index df40a63237b..8624e5d479c 100644 --- a/src/main/java/org/elasticsearch/index/engine/internal/InternalEngine.java +++ b/src/main/java/org/elasticsearch/index/engine/internal/InternalEngine.java @@ -1752,4 +1752,29 @@ public class InternalEngine implements Engine { // hard fail - we can't get a SegmentReader throw new ElasticsearchIllegalStateException("Can not extract segment reader from given index reader [" + reader + "]"); } + + long getGcDeletesInMillis() { + return gcDeletesInMillis; + } + + String getCodecName() { + return codecName; + } + + boolean isCompoundOnFlush() { + return compoundOnFlush; + } + + int getIndexConcurrency() { + return indexConcurrency; + } + + boolean isFailEngineOnCorruption() { + return failEngineOnCorruption; + } + + LiveIndexWriterConfig getCurrentIndexWriterConfig() { + IndexWriter writer = currentIndexWriter(); + return writer == null ? null : writer.getConfig(); + } } diff --git a/src/main/java/org/elasticsearch/index/engine/internal/InternalEngineHolder.java b/src/main/java/org/elasticsearch/index/engine/internal/InternalEngineHolder.java index c3c5f95a1ad..6410bbfb044 100644 --- a/src/main/java/org/elasticsearch/index/engine/internal/InternalEngineHolder.java +++ b/src/main/java/org/elasticsearch/index/engine/internal/InternalEngineHolder.java @@ -24,6 +24,7 @@ import org.elasticsearch.ElasticsearchException; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Preconditions; import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; @@ -64,7 +65,7 @@ public class InternalEngineHolder extends AbstractIndexShardComponent implements private final FailEngineOnMergeFailure mergeSchedulerFailureListener; private final ApplySettings settingsListener; private final MergeScheduleListener mergeSchedulerListener; - private volatile Boolean failOnMergeFailure; + protected volatile Boolean failOnMergeFailure; protected volatile boolean failEngineOnCorruption; protected volatile ByteSizeValue indexingBufferSize; protected volatile int indexConcurrency; @@ -142,7 +143,7 @@ public class InternalEngineHolder extends AbstractIndexShardComponent implements this.mergeSchedulerListener = new MergeScheduleListener(); this.mergeScheduler.addListener(mergeSchedulerListener); - this.settingsListener = new ApplySettings(); + this.settingsListener = new ApplySettings(logger, this); this.indexSettingsService.addListener(this.settingsListener); store.incRef(); } @@ -214,20 +215,19 @@ public class InternalEngineHolder extends AbstractIndexShardComponent implements @Override public synchronized void close() throws ElasticsearchException { - if (closed) { - return; - } - closed = true; - try { - InternalEngine currentEngine = this.currentEngine.getAndSet(null); - if (currentEngine != null) { - currentEngine.close(); + if (closed == false) { + closed = true; + try { + InternalEngine currentEngine = this.currentEngine.getAndSet(null); + if (currentEngine != null) { + currentEngine.close(); + } + mergeScheduler.removeFailureListener(mergeSchedulerFailureListener); + mergeScheduler.removeListener(mergeSchedulerListener); + indexSettingsService.removeListener(settingsListener); + } finally { + store.decRef(); } - mergeScheduler.removeFailureListener(mergeSchedulerFailureListener); - mergeScheduler.removeListener(mergeSchedulerListener); - indexSettingsService.removeListener(settingsListener); - } finally { - store.decRef(); } } @@ -356,55 +356,76 @@ public class InternalEngineHolder extends AbstractIndexShardComponent implements } } - class ApplySettings implements IndexSettingsService.Listener { + static class ApplySettings implements IndexSettingsService.Listener { + + private final ESLogger logger; + private final InternalEngineHolder holder; + + ApplySettings(ESLogger logger, InternalEngineHolder holder) { + this.logger = logger; + this.holder = holder; + } @Override public void onRefreshSettings(Settings settings) { - InternalEngine currentEngine = InternalEngineHolder.this.currentEngine.get(); boolean change = false; - long gcDeletesInMillis = settings.getAsTime(INDEX_GC_DELETES, TimeValue.timeValueMillis(InternalEngineHolder.this.gcDeletesInMillis)).millis(); - if (gcDeletesInMillis != InternalEngineHolder.this.gcDeletesInMillis) { - logger.info("updating index.gc_deletes from [{}] to [{}]", TimeValue.timeValueMillis(InternalEngineHolder.this.gcDeletesInMillis), TimeValue.timeValueMillis(gcDeletesInMillis)); - InternalEngineHolder.this.gcDeletesInMillis = gcDeletesInMillis; + long gcDeletesInMillis = settings.getAsTime(INDEX_GC_DELETES, TimeValue.timeValueMillis(holder.gcDeletesInMillis)).millis(); + if (gcDeletesInMillis != holder.gcDeletesInMillis) { + logger.info("updating index.gc_deletes from [{}] to [{}]", TimeValue.timeValueMillis(holder.gcDeletesInMillis), TimeValue.timeValueMillis(gcDeletesInMillis)); + holder.gcDeletesInMillis = gcDeletesInMillis; change = true; } - final boolean compoundOnFlush = settings.getAsBoolean(INDEX_COMPOUND_ON_FLUSH, InternalEngineHolder.this.compoundOnFlush); - if (compoundOnFlush != InternalEngineHolder.this.compoundOnFlush) { - logger.info("updating {} from [{}] to [{}]", INDEX_COMPOUND_ON_FLUSH, InternalEngineHolder.this.compoundOnFlush, compoundOnFlush); - InternalEngineHolder.this.compoundOnFlush = compoundOnFlush; + final boolean compoundOnFlush = settings.getAsBoolean(INDEX_COMPOUND_ON_FLUSH, holder.compoundOnFlush); + if (compoundOnFlush != holder.compoundOnFlush) { + logger.info("updating {} from [{}] to [{}]", INDEX_COMPOUND_ON_FLUSH, holder.compoundOnFlush, compoundOnFlush); + holder.compoundOnFlush = compoundOnFlush; change = true; } - final boolean failEngineOnCorruption = settings.getAsBoolean(INDEX_FAIL_ON_CORRUPTION, InternalEngineHolder.this.failEngineOnCorruption); - if (failEngineOnCorruption != InternalEngineHolder.this.failEngineOnCorruption) { - logger.info("updating {} from [{}] to [{}]", INDEX_FAIL_ON_CORRUPTION, InternalEngineHolder.this.failEngineOnCorruption, failEngineOnCorruption); - InternalEngineHolder.this.failEngineOnCorruption = failEngineOnCorruption; + final boolean failEngineOnCorruption = settings.getAsBoolean(INDEX_FAIL_ON_CORRUPTION, holder.failEngineOnCorruption); + if (failEngineOnCorruption != holder.failEngineOnCorruption) { + logger.info("updating {} from [{}] to [{}]", INDEX_FAIL_ON_CORRUPTION, holder.failEngineOnCorruption, failEngineOnCorruption); + holder.failEngineOnCorruption = failEngineOnCorruption; change = true; } - int indexConcurrency = settings.getAsInt(INDEX_INDEX_CONCURRENCY, InternalEngineHolder.this.indexConcurrency); - if (indexConcurrency != InternalEngineHolder.this.indexConcurrency) { - logger.info("updating index.index_concurrency from [{}] to [{}]", InternalEngineHolder.this.indexConcurrency, indexConcurrency); - InternalEngineHolder.this.indexConcurrency = indexConcurrency; + int indexConcurrency = settings.getAsInt(INDEX_INDEX_CONCURRENCY, holder.indexConcurrency); + if (indexConcurrency != holder.indexConcurrency) { + logger.info("updating index.index_concurrency from [{}] to [{}]", holder.indexConcurrency, indexConcurrency); + holder.indexConcurrency = indexConcurrency; // we have to flush in this case, since it only applies on a new index writer change = true; } - if (!codecName.equals(InternalEngineHolder.this.codecName)) { - logger.info("updating index.codec from [{}] to [{}]", InternalEngineHolder.this.codecName, codecName); - InternalEngineHolder.this.codecName = codecName; + final String codecName = settings.get(INDEX_CODEC, holder.codecName); + if (!codecName.equals(holder.codecName)) { + logger.info("updating index.codec from [{}] to [{}]", holder.codecName, codecName); + holder.codecName = codecName; // we want to flush in this case, so the new codec will be reflected right away... change = true; } - if (failOnMergeFailure != InternalEngineHolder.this.failOnMergeFailure) { - logger.info("updating {} from [{}] to [{}]", INDEX_FAIL_ON_MERGE_FAILURE, InternalEngineHolder.this.failOnMergeFailure, failOnMergeFailure); - InternalEngineHolder.this.failOnMergeFailure = failOnMergeFailure; + final boolean failOnMergeFailure = settings.getAsBoolean(INDEX_FAIL_ON_MERGE_FAILURE, holder.failOnMergeFailure); + if (failOnMergeFailure != holder.failOnMergeFailure) { + logger.info("updating {} from [{}] to [{}]", INDEX_FAIL_ON_MERGE_FAILURE, holder.failOnMergeFailure, failOnMergeFailure); + holder.failOnMergeFailure = failOnMergeFailure; } - if (change && currentEngine != null) { - currentEngine.updateSettings(gcDeletesInMillis, compoundOnFlush, failEngineOnCorruption, indexConcurrency, codecName); + + + if (change) { + holder.updateSettings(); } } } + synchronized void updateSettings() { + // we need to make sure that we wait for the engine to be fully initialized + // the start method sets the current engine once it's done but samples the settings + // at construction time. + final InternalEngine engine = currentEngine.get(); + if (engine != null) { + engine.updateSettings(gcDeletesInMillis, compoundOnFlush, failEngineOnCorruption, indexConcurrency, codecName); + } + } + class FailEngineOnMergeFailure implements MergeSchedulerProvider.FailureListener { @Override public void onFailedMerge(MergePolicy.MergeException e) { diff --git a/src/main/java/org/elasticsearch/index/settings/IndexDynamicSettingsModule.java b/src/main/java/org/elasticsearch/index/settings/IndexDynamicSettingsModule.java index 99f9391a916..5bbbcd8a93f 100644 --- a/src/main/java/org/elasticsearch/index/settings/IndexDynamicSettingsModule.java +++ b/src/main/java/org/elasticsearch/index/settings/IndexDynamicSettingsModule.java @@ -134,4 +134,11 @@ public class IndexDynamicSettingsModule extends AbstractModule { protected void configure() { bind(DynamicSettings.class).annotatedWith(IndexDynamicSettings.class).toInstance(indexDynamicSettings); } + + /** + * Returns true iff the given setting is in the dynamic settings map. Otherwise false. + */ + public boolean containsSetting(String setting) { + return indexDynamicSettings.hasDynamicSetting(setting); + } } diff --git a/src/test/java/org/elasticsearch/index/engine/internal/InternalEngineTests.java b/src/test/java/org/elasticsearch/index/engine/internal/InternalEngineTests.java index 341fba631f2..8f7584094a4 100644 --- a/src/test/java/org/elasticsearch/index/engine/internal/InternalEngineTests.java +++ b/src/test/java/org/elasticsearch/index/engine/internal/InternalEngineTests.java @@ -26,11 +26,13 @@ import org.apache.log4j.Logger; import org.apache.log4j.LogManager; import org.apache.log4j.spi.LoggingEvent; import org.apache.lucene.analysis.Analyzer; +import org.apache.lucene.codecs.Codec; import org.apache.lucene.document.Field; import org.apache.lucene.document.NumericDocValuesField; import org.apache.lucene.document.TextField; import org.apache.lucene.index.CorruptIndexException; import org.apache.lucene.index.IndexDeletionPolicy; +import org.apache.lucene.index.LiveIndexWriterConfig; import org.apache.lucene.index.Term; import org.apache.lucene.search.TermQuery; import org.apache.lucene.store.Directory; @@ -38,6 +40,7 @@ import org.apache.lucene.store.MockDirectoryWrapper; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.Version; import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.settings.DynamicSettings; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.lucene.Lucene; @@ -63,6 +66,8 @@ import org.elasticsearch.index.merge.policy.LogByteSizeMergePolicyProvider; import org.elasticsearch.index.merge.policy.MergePolicyProvider; import org.elasticsearch.index.merge.scheduler.ConcurrentMergeSchedulerProvider; import org.elasticsearch.index.merge.scheduler.MergeSchedulerProvider; +import org.elasticsearch.index.settings.IndexDynamicSettings; +import org.elasticsearch.index.settings.IndexDynamicSettingsModule; import org.elasticsearch.index.settings.IndexSettingsService; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardUtils; @@ -89,13 +94,12 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; -import static com.carrotsearch.randomizedtesting.RandomizedTest.randomBoolean; -import static com.carrotsearch.randomizedtesting.RandomizedTest.randomDouble; -import static com.carrotsearch.randomizedtesting.RandomizedTest.randomIntBetween; +import static com.carrotsearch.randomizedtesting.RandomizedTest.*; import static org.elasticsearch.common.settings.ImmutableSettings.Builder.EMPTY_SETTINGS; import static org.elasticsearch.index.engine.Engine.Operation.Origin.PRIMARY; import static org.elasticsearch.index.engine.Engine.Operation.Origin.REPLICA; import static org.elasticsearch.test.ElasticsearchTestCase.awaitBusy; +import static org.elasticsearch.test.ElasticsearchTestCase.randomFrom; import static org.elasticsearch.test.ElasticsearchTestCase.terminate; import static org.hamcrest.Matchers.*; @@ -1502,4 +1506,61 @@ public class InternalEngineTests extends ElasticsearchLuceneTestCase { } } } + + @Test + public void testSettings() { + final InternalEngineHolder holder = (InternalEngineHolder) engine; + IndexDynamicSettingsModule settings = new IndexDynamicSettingsModule(); + assertTrue(settings.containsSetting(InternalEngineHolder.INDEX_FAIL_ON_CORRUPTION)); + assertTrue(settings.containsSetting(InternalEngineHolder.INDEX_COMPOUND_ON_FLUSH)); + assertTrue(settings.containsSetting(InternalEngineHolder.INDEX_GC_DELETES)); + assertTrue(settings.containsSetting(InternalEngineHolder.INDEX_CODEC)); + assertTrue(settings.containsSetting(InternalEngineHolder.INDEX_FAIL_ON_MERGE_FAILURE)); + assertTrue(settings.containsSetting(InternalEngineHolder.INDEX_INDEX_CONCURRENCY)); + + final int iters = between(1, 20); + for (int i = 0; i < iters; i++) { + boolean compoundOnFlush = randomBoolean(); + boolean failOnCorruption = randomBoolean(); + boolean failOnMerge = randomBoolean(); + long gcDeletes = Math.max(0, randomLong()); + int indexConcurrency = randomIntBetween(1, 20); + String codecName = randomFrom(holder.codecService.availableCodecs()); + + Settings build = ImmutableSettings.builder() + .put(InternalEngineHolder.INDEX_FAIL_ON_CORRUPTION, failOnCorruption) + .put(InternalEngineHolder.INDEX_COMPOUND_ON_FLUSH, compoundOnFlush) + .put(InternalEngineHolder.INDEX_GC_DELETES, gcDeletes) + .put(InternalEngineHolder.INDEX_CODEC, codecName) + .put(InternalEngineHolder.INDEX_FAIL_ON_MERGE_FAILURE, failOnMerge) + .put(InternalEngineHolder.INDEX_INDEX_CONCURRENCY, indexConcurrency) + .build(); + + engineSettingsService.refreshSettings(build); + LiveIndexWriterConfig currentIndexWriterConfig = holder.engineSafe().getCurrentIndexWriterConfig(); + assertEquals(holder.compoundOnFlush, compoundOnFlush); + assertEquals(holder.engineSafe().isCompoundOnFlush(), compoundOnFlush); + assertEquals(currentIndexWriterConfig.getUseCompoundFile(), compoundOnFlush); + + + assertEquals(holder.gcDeletesInMillis, gcDeletes); + assertEquals(holder.engineSafe().getGcDeletesInMillis(), gcDeletes); + + assertEquals(holder.codecName, codecName); + assertEquals(holder.engineSafe().getCodecName(), codecName); + assertEquals(currentIndexWriterConfig.getCodec(), holder.codecService.codec(codecName)); + + + assertEquals(holder.failEngineOnCorruption, failOnCorruption); + assertEquals(holder.engineSafe().isFailEngineOnCorruption(), failOnCorruption); + + assertEquals(holder.failOnMergeFailure, failOnMerge); // only on the holder + + assertEquals(holder.indexConcurrency, indexConcurrency); + assertEquals(holder.engineSafe().getIndexConcurrency(), indexConcurrency); + assertEquals(currentIndexWriterConfig.getMaxThreadStates(), indexConcurrency); + + + } + } }