[ENGINE] Fix updates dynamic settings in InternalEngineHolder

After the refactoring in #8784 some settings didn't get passed to the
actual engine and there exists a race if the settings are updated while
the engine is started such that the actual starting engine doesn't see
the latest settings. This commit fixes the concurrency issue as well as
adds tests to ensure the settings are reflected.
This commit is contained in:
Simon Willnauer 2014-12-10 18:43:03 +01:00
parent 788d7cb451
commit f308049a90
5 changed files with 165 additions and 44 deletions

View File

@ -99,4 +99,11 @@ public class CodecService extends AbstractIndexComponent {
} }
return codec; return codec;
} }
/**
* Returns all registered available codec names
*/
public String[] availableCodecs() {
return codecs.keySet().toArray(new String[0]);
}
} }

View File

@ -1752,4 +1752,29 @@ public class InternalEngine implements Engine {
// hard fail - we can't get a SegmentReader // hard fail - we can't get a SegmentReader
throw new ElasticsearchIllegalStateException("Can not extract segment reader from given index reader [" + reader + "]"); throw new ElasticsearchIllegalStateException("Can not extract segment reader from given index reader [" + reader + "]");
} }
long getGcDeletesInMillis() {
return gcDeletesInMillis;
}
String getCodecName() {
return codecName;
}
boolean isCompoundOnFlush() {
return compoundOnFlush;
}
int getIndexConcurrency() {
return indexConcurrency;
}
boolean isFailEngineOnCorruption() {
return failEngineOnCorruption;
}
LiveIndexWriterConfig getCurrentIndexWriterConfig() {
IndexWriter writer = currentIndexWriter();
return writer == null ? null : writer.getConfig();
}
} }

View File

@ -24,6 +24,7 @@ import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Preconditions; import org.elasticsearch.common.Preconditions;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeUnit;
@ -64,7 +65,7 @@ public class InternalEngineHolder extends AbstractIndexShardComponent implements
private final FailEngineOnMergeFailure mergeSchedulerFailureListener; private final FailEngineOnMergeFailure mergeSchedulerFailureListener;
private final ApplySettings settingsListener; private final ApplySettings settingsListener;
private final MergeScheduleListener mergeSchedulerListener; private final MergeScheduleListener mergeSchedulerListener;
private volatile Boolean failOnMergeFailure; protected volatile Boolean failOnMergeFailure;
protected volatile boolean failEngineOnCorruption; protected volatile boolean failEngineOnCorruption;
protected volatile ByteSizeValue indexingBufferSize; protected volatile ByteSizeValue indexingBufferSize;
protected volatile int indexConcurrency; protected volatile int indexConcurrency;
@ -142,7 +143,7 @@ public class InternalEngineHolder extends AbstractIndexShardComponent implements
this.mergeSchedulerListener = new MergeScheduleListener(); this.mergeSchedulerListener = new MergeScheduleListener();
this.mergeScheduler.addListener(mergeSchedulerListener); this.mergeScheduler.addListener(mergeSchedulerListener);
this.settingsListener = new ApplySettings(); this.settingsListener = new ApplySettings(logger, this);
this.indexSettingsService.addListener(this.settingsListener); this.indexSettingsService.addListener(this.settingsListener);
store.incRef(); store.incRef();
} }
@ -214,9 +215,7 @@ public class InternalEngineHolder extends AbstractIndexShardComponent implements
@Override @Override
public synchronized void close() throws ElasticsearchException { public synchronized void close() throws ElasticsearchException {
if (closed) { if (closed == false) {
return;
}
closed = true; closed = true;
try { try {
InternalEngine currentEngine = this.currentEngine.getAndSet(null); InternalEngine currentEngine = this.currentEngine.getAndSet(null);
@ -230,6 +229,7 @@ public class InternalEngineHolder extends AbstractIndexShardComponent implements
store.decRef(); store.decRef();
} }
} }
}
protected InternalEngine createEngine() { protected InternalEngine createEngine() {
return new InternalEngine(shardId, logger, codecService, threadPool, indexingService, return new InternalEngine(shardId, logger, codecService, threadPool, indexingService,
@ -356,55 +356,76 @@ public class InternalEngineHolder extends AbstractIndexShardComponent implements
} }
} }
class ApplySettings implements IndexSettingsService.Listener { static class ApplySettings implements IndexSettingsService.Listener {
private final ESLogger logger;
private final InternalEngineHolder holder;
ApplySettings(ESLogger logger, InternalEngineHolder holder) {
this.logger = logger;
this.holder = holder;
}
@Override @Override
public void onRefreshSettings(Settings settings) { public void onRefreshSettings(Settings settings) {
InternalEngine currentEngine = InternalEngineHolder.this.currentEngine.get();
boolean change = false; boolean change = false;
long gcDeletesInMillis = settings.getAsTime(INDEX_GC_DELETES, TimeValue.timeValueMillis(InternalEngineHolder.this.gcDeletesInMillis)).millis(); long gcDeletesInMillis = settings.getAsTime(INDEX_GC_DELETES, TimeValue.timeValueMillis(holder.gcDeletesInMillis)).millis();
if (gcDeletesInMillis != InternalEngineHolder.this.gcDeletesInMillis) { if (gcDeletesInMillis != holder.gcDeletesInMillis) {
logger.info("updating index.gc_deletes from [{}] to [{}]", TimeValue.timeValueMillis(InternalEngineHolder.this.gcDeletesInMillis), TimeValue.timeValueMillis(gcDeletesInMillis)); logger.info("updating index.gc_deletes from [{}] to [{}]", TimeValue.timeValueMillis(holder.gcDeletesInMillis), TimeValue.timeValueMillis(gcDeletesInMillis));
InternalEngineHolder.this.gcDeletesInMillis = gcDeletesInMillis; holder.gcDeletesInMillis = gcDeletesInMillis;
change = true; change = true;
} }
final boolean compoundOnFlush = settings.getAsBoolean(INDEX_COMPOUND_ON_FLUSH, InternalEngineHolder.this.compoundOnFlush); final boolean compoundOnFlush = settings.getAsBoolean(INDEX_COMPOUND_ON_FLUSH, holder.compoundOnFlush);
if (compoundOnFlush != InternalEngineHolder.this.compoundOnFlush) { if (compoundOnFlush != holder.compoundOnFlush) {
logger.info("updating {} from [{}] to [{}]", INDEX_COMPOUND_ON_FLUSH, InternalEngineHolder.this.compoundOnFlush, compoundOnFlush); logger.info("updating {} from [{}] to [{}]", INDEX_COMPOUND_ON_FLUSH, holder.compoundOnFlush, compoundOnFlush);
InternalEngineHolder.this.compoundOnFlush = compoundOnFlush; holder.compoundOnFlush = compoundOnFlush;
change = true; change = true;
} }
final boolean failEngineOnCorruption = settings.getAsBoolean(INDEX_FAIL_ON_CORRUPTION, InternalEngineHolder.this.failEngineOnCorruption); final boolean failEngineOnCorruption = settings.getAsBoolean(INDEX_FAIL_ON_CORRUPTION, holder.failEngineOnCorruption);
if (failEngineOnCorruption != InternalEngineHolder.this.failEngineOnCorruption) { if (failEngineOnCorruption != holder.failEngineOnCorruption) {
logger.info("updating {} from [{}] to [{}]", INDEX_FAIL_ON_CORRUPTION, InternalEngineHolder.this.failEngineOnCorruption, failEngineOnCorruption); logger.info("updating {} from [{}] to [{}]", INDEX_FAIL_ON_CORRUPTION, holder.failEngineOnCorruption, failEngineOnCorruption);
InternalEngineHolder.this.failEngineOnCorruption = failEngineOnCorruption; holder.failEngineOnCorruption = failEngineOnCorruption;
change = true; change = true;
} }
int indexConcurrency = settings.getAsInt(INDEX_INDEX_CONCURRENCY, InternalEngineHolder.this.indexConcurrency); int indexConcurrency = settings.getAsInt(INDEX_INDEX_CONCURRENCY, holder.indexConcurrency);
if (indexConcurrency != InternalEngineHolder.this.indexConcurrency) { if (indexConcurrency != holder.indexConcurrency) {
logger.info("updating index.index_concurrency from [{}] to [{}]", InternalEngineHolder.this.indexConcurrency, indexConcurrency); logger.info("updating index.index_concurrency from [{}] to [{}]", holder.indexConcurrency, indexConcurrency);
InternalEngineHolder.this.indexConcurrency = indexConcurrency; holder.indexConcurrency = indexConcurrency;
// we have to flush in this case, since it only applies on a new index writer // we have to flush in this case, since it only applies on a new index writer
change = true; change = true;
} }
if (!codecName.equals(InternalEngineHolder.this.codecName)) { final String codecName = settings.get(INDEX_CODEC, holder.codecName);
logger.info("updating index.codec from [{}] to [{}]", InternalEngineHolder.this.codecName, codecName); if (!codecName.equals(holder.codecName)) {
InternalEngineHolder.this.codecName = codecName; logger.info("updating index.codec from [{}] to [{}]", holder.codecName, codecName);
holder.codecName = codecName;
// we want to flush in this case, so the new codec will be reflected right away... // we want to flush in this case, so the new codec will be reflected right away...
change = true; change = true;
} }
if (failOnMergeFailure != InternalEngineHolder.this.failOnMergeFailure) { final boolean failOnMergeFailure = settings.getAsBoolean(INDEX_FAIL_ON_MERGE_FAILURE, holder.failOnMergeFailure);
logger.info("updating {} from [{}] to [{}]", INDEX_FAIL_ON_MERGE_FAILURE, InternalEngineHolder.this.failOnMergeFailure, failOnMergeFailure); if (failOnMergeFailure != holder.failOnMergeFailure) {
InternalEngineHolder.this.failOnMergeFailure = failOnMergeFailure; logger.info("updating {} from [{}] to [{}]", INDEX_FAIL_ON_MERGE_FAILURE, holder.failOnMergeFailure, failOnMergeFailure);
holder.failOnMergeFailure = failOnMergeFailure;
} }
if (change && currentEngine != null) {
currentEngine.updateSettings(gcDeletesInMillis, compoundOnFlush, failEngineOnCorruption, indexConcurrency, codecName);
if (change) {
holder.updateSettings();
} }
} }
} }
synchronized void updateSettings() {
// we need to make sure that we wait for the engine to be fully initialized
// the start method sets the current engine once it's done but samples the settings
// at construction time.
final InternalEngine engine = currentEngine.get();
if (engine != null) {
engine.updateSettings(gcDeletesInMillis, compoundOnFlush, failEngineOnCorruption, indexConcurrency, codecName);
}
}
class FailEngineOnMergeFailure implements MergeSchedulerProvider.FailureListener { class FailEngineOnMergeFailure implements MergeSchedulerProvider.FailureListener {
@Override @Override
public void onFailedMerge(MergePolicy.MergeException e) { public void onFailedMerge(MergePolicy.MergeException e) {

View File

@ -134,4 +134,11 @@ public class IndexDynamicSettingsModule extends AbstractModule {
protected void configure() { protected void configure() {
bind(DynamicSettings.class).annotatedWith(IndexDynamicSettings.class).toInstance(indexDynamicSettings); bind(DynamicSettings.class).annotatedWith(IndexDynamicSettings.class).toInstance(indexDynamicSettings);
} }
/**
* Returns <code>true</code> iff the given setting is in the dynamic settings map. Otherwise <code>false</code>.
*/
public boolean containsSetting(String setting) {
return indexDynamicSettings.hasDynamicSetting(setting);
}
} }

View File

@ -26,11 +26,13 @@ import org.apache.log4j.Logger;
import org.apache.log4j.LogManager; import org.apache.log4j.LogManager;
import org.apache.log4j.spi.LoggingEvent; import org.apache.log4j.spi.LoggingEvent;
import org.apache.lucene.analysis.Analyzer; import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.codecs.Codec;
import org.apache.lucene.document.Field; import org.apache.lucene.document.Field;
import org.apache.lucene.document.NumericDocValuesField; import org.apache.lucene.document.NumericDocValuesField;
import org.apache.lucene.document.TextField; import org.apache.lucene.document.TextField;
import org.apache.lucene.index.CorruptIndexException; import org.apache.lucene.index.CorruptIndexException;
import org.apache.lucene.index.IndexDeletionPolicy; import org.apache.lucene.index.IndexDeletionPolicy;
import org.apache.lucene.index.LiveIndexWriterConfig;
import org.apache.lucene.index.Term; import org.apache.lucene.index.Term;
import org.apache.lucene.search.TermQuery; import org.apache.lucene.search.TermQuery;
import org.apache.lucene.store.Directory; import org.apache.lucene.store.Directory;
@ -38,6 +40,7 @@ import org.apache.lucene.store.MockDirectoryWrapper;
import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.Version; import org.elasticsearch.Version;
import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.settings.DynamicSettings;
import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.lucene.Lucene;
@ -63,6 +66,8 @@ import org.elasticsearch.index.merge.policy.LogByteSizeMergePolicyProvider;
import org.elasticsearch.index.merge.policy.MergePolicyProvider; import org.elasticsearch.index.merge.policy.MergePolicyProvider;
import org.elasticsearch.index.merge.scheduler.ConcurrentMergeSchedulerProvider; import org.elasticsearch.index.merge.scheduler.ConcurrentMergeSchedulerProvider;
import org.elasticsearch.index.merge.scheduler.MergeSchedulerProvider; import org.elasticsearch.index.merge.scheduler.MergeSchedulerProvider;
import org.elasticsearch.index.settings.IndexDynamicSettings;
import org.elasticsearch.index.settings.IndexDynamicSettingsModule;
import org.elasticsearch.index.settings.IndexSettingsService; import org.elasticsearch.index.settings.IndexSettingsService;
import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.ShardUtils; import org.elasticsearch.index.shard.ShardUtils;
@ -89,13 +94,12 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import static com.carrotsearch.randomizedtesting.RandomizedTest.randomBoolean; import static com.carrotsearch.randomizedtesting.RandomizedTest.*;
import static com.carrotsearch.randomizedtesting.RandomizedTest.randomDouble;
import static com.carrotsearch.randomizedtesting.RandomizedTest.randomIntBetween;
import static org.elasticsearch.common.settings.ImmutableSettings.Builder.EMPTY_SETTINGS; import static org.elasticsearch.common.settings.ImmutableSettings.Builder.EMPTY_SETTINGS;
import static org.elasticsearch.index.engine.Engine.Operation.Origin.PRIMARY; import static org.elasticsearch.index.engine.Engine.Operation.Origin.PRIMARY;
import static org.elasticsearch.index.engine.Engine.Operation.Origin.REPLICA; import static org.elasticsearch.index.engine.Engine.Operation.Origin.REPLICA;
import static org.elasticsearch.test.ElasticsearchTestCase.awaitBusy; import static org.elasticsearch.test.ElasticsearchTestCase.awaitBusy;
import static org.elasticsearch.test.ElasticsearchTestCase.randomFrom;
import static org.elasticsearch.test.ElasticsearchTestCase.terminate; import static org.elasticsearch.test.ElasticsearchTestCase.terminate;
import static org.hamcrest.Matchers.*; import static org.hamcrest.Matchers.*;
@ -1502,4 +1506,61 @@ public class InternalEngineTests extends ElasticsearchLuceneTestCase {
} }
} }
} }
@Test
public void testSettings() {
final InternalEngineHolder holder = (InternalEngineHolder) engine;
IndexDynamicSettingsModule settings = new IndexDynamicSettingsModule();
assertTrue(settings.containsSetting(InternalEngineHolder.INDEX_FAIL_ON_CORRUPTION));
assertTrue(settings.containsSetting(InternalEngineHolder.INDEX_COMPOUND_ON_FLUSH));
assertTrue(settings.containsSetting(InternalEngineHolder.INDEX_GC_DELETES));
assertTrue(settings.containsSetting(InternalEngineHolder.INDEX_CODEC));
assertTrue(settings.containsSetting(InternalEngineHolder.INDEX_FAIL_ON_MERGE_FAILURE));
assertTrue(settings.containsSetting(InternalEngineHolder.INDEX_INDEX_CONCURRENCY));
final int iters = between(1, 20);
for (int i = 0; i < iters; i++) {
boolean compoundOnFlush = randomBoolean();
boolean failOnCorruption = randomBoolean();
boolean failOnMerge = randomBoolean();
long gcDeletes = Math.max(0, randomLong());
int indexConcurrency = randomIntBetween(1, 20);
String codecName = randomFrom(holder.codecService.availableCodecs());
Settings build = ImmutableSettings.builder()
.put(InternalEngineHolder.INDEX_FAIL_ON_CORRUPTION, failOnCorruption)
.put(InternalEngineHolder.INDEX_COMPOUND_ON_FLUSH, compoundOnFlush)
.put(InternalEngineHolder.INDEX_GC_DELETES, gcDeletes)
.put(InternalEngineHolder.INDEX_CODEC, codecName)
.put(InternalEngineHolder.INDEX_FAIL_ON_MERGE_FAILURE, failOnMerge)
.put(InternalEngineHolder.INDEX_INDEX_CONCURRENCY, indexConcurrency)
.build();
engineSettingsService.refreshSettings(build);
LiveIndexWriterConfig currentIndexWriterConfig = holder.engineSafe().getCurrentIndexWriterConfig();
assertEquals(holder.compoundOnFlush, compoundOnFlush);
assertEquals(holder.engineSafe().isCompoundOnFlush(), compoundOnFlush);
assertEquals(currentIndexWriterConfig.getUseCompoundFile(), compoundOnFlush);
assertEquals(holder.gcDeletesInMillis, gcDeletes);
assertEquals(holder.engineSafe().getGcDeletesInMillis(), gcDeletes);
assertEquals(holder.codecName, codecName);
assertEquals(holder.engineSafe().getCodecName(), codecName);
assertEquals(currentIndexWriterConfig.getCodec(), holder.codecService.codec(codecName));
assertEquals(holder.failEngineOnCorruption, failOnCorruption);
assertEquals(holder.engineSafe().isFailEngineOnCorruption(), failOnCorruption);
assertEquals(holder.failOnMergeFailure, failOnMerge); // only on the holder
assertEquals(holder.indexConcurrency, indexConcurrency);
assertEquals(holder.engineSafe().getIndexConcurrency(), indexConcurrency);
assertEquals(currentIndexWriterConfig.getMaxThreadStates(), indexConcurrency);
}
}
} }