[ENGINE] Factor out settings updates from Engine

The engine is already pretty complex, it's still confulated with
code that doesn't necessarily belong there. Updateing the settings from
the settings service can be done on the level above. This commit cleans up
the settings code in the engine and moves it to the IndexShard.
This commit is contained in:
Simon Willnauer 2015-02-10 10:36:56 +01:00
parent c9893ba0c2
commit 401e6c6b06
7 changed files with 168 additions and 186 deletions

View File

@ -59,8 +59,8 @@ import java.util.concurrent.locks.ReentrantLock;
*/
public abstract class Engine implements Closeable {
private final ESLogger logger;
private final EngineConfig engineConfig;
protected final ESLogger logger;
protected final EngineConfig engineConfig;
protected Engine(EngineConfig engineConfig) {
Preconditions.checkNotNull(engineConfig.getStore(), "Store must be provided to the engine");
@ -110,6 +110,10 @@ public abstract class Engine implements Closeable {
return new EngineSearcher(source, searcher, manager, engineConfig.getStore(), logger);
}
public final EngineConfig config() {
return engineConfig;
}
/** A throttling class that can be activated, causing the
* {@code acquireThrottle} method to block on a lock when throttling
* is enabled
@ -169,8 +173,6 @@ public abstract class Engine implements Closeable {
}
}
public abstract void updateIndexingBufferSize(ByteSizeValue indexingBufferSize);
public abstract void create(Create create) throws EngineException;
public abstract void index(Index index) throws EngineException;
@ -248,8 +250,6 @@ public abstract class Engine implements Closeable {
/** fail engine due to some error. the engine will also be closed. */
public abstract void failEngine(String reason, Throwable failure);
public abstract ByteSizeValue indexingBufferSize();
public static interface FailedEngineListener {
void onFailedEngine(ShardId shardId, String reason, @Nullable Throwable t);
}

View File

@ -271,14 +271,6 @@ public final class EngineConfig {
return indexingService;
}
/**
* Returns an {@link org.elasticsearch.index.settings.IndexSettingsService} used to register a {@link org.elasticsearch.index.engine.EngineConfig.EngineSettingsListener} instance
* in order to get notification for realtime changeable settings exposed in this {@link org.elasticsearch.index.engine.EngineConfig}.
*/
public IndexSettingsService getIndexSettingsService() {
return indexSettingsService;
}
/**
* Returns an {@link org.elasticsearch.indices.IndicesWarmer} used to warm new searchers before they are used for searching.
* Note: This method might retrun <code>null</code>
@ -365,58 +357,30 @@ public final class EngineConfig {
}
/**
* Basic realtime updateable settings listener that can be used ot receive notification
* if an index setting changed.
* Sets the GC deletes cycle in milliseconds.
*/
public static abstract class EngineSettingsListener implements IndexSettingsService.Listener {
public void setGcDeletesInMillis(long gcDeletesInMillis) {
this.gcDeletesInMillis = gcDeletesInMillis;
}
private final ESLogger logger;
private final EngineConfig config;
/**
* Sets if flushed segments should be written as compound file system. Defaults to <code>true</code>
*/
public void setCompoundOnFlush(boolean compoundOnFlush) {
this.compoundOnFlush = compoundOnFlush;
}
public EngineSettingsListener(ESLogger logger, EngineConfig config) {
this.logger = logger;
this.config = config;
}
/**
* Sets if the engine should be failed in the case of a corrupted index. Defaults to <code>true</code>
*/
public void setFailEngineOnCorruption(boolean failEngineOnCorruption) {
this.failEngineOnCorruption = failEngineOnCorruption;
}
@Override
public final void onRefreshSettings(Settings settings) {
boolean change = false;
long gcDeletesInMillis = settings.getAsTime(EngineConfig.INDEX_GC_DELETES_SETTING, TimeValue.timeValueMillis(config.getGcDeletesInMillis())).millis();
if (gcDeletesInMillis != config.getGcDeletesInMillis()) {
logger.info("updating {} from [{}] to [{}]", EngineConfig.INDEX_GC_DELETES_SETTING, TimeValue.timeValueMillis(config.getGcDeletesInMillis()), TimeValue.timeValueMillis(gcDeletesInMillis));
config.gcDeletesInMillis = gcDeletesInMillis;
change = true;
}
final boolean compoundOnFlush = settings.getAsBoolean(EngineConfig.INDEX_COMPOUND_ON_FLUSH, config.isCompoundOnFlush());
if (compoundOnFlush != config.isCompoundOnFlush()) {
logger.info("updating {} from [{}] to [{}]", EngineConfig.INDEX_COMPOUND_ON_FLUSH, config.isCompoundOnFlush(), compoundOnFlush);
config.compoundOnFlush = compoundOnFlush;
change = true;
}
final boolean failEngineOnCorruption = settings.getAsBoolean(EngineConfig.INDEX_FAIL_ON_CORRUPTION_SETTING, config.isFailEngineOnCorruption());
if (failEngineOnCorruption != config.isFailEngineOnCorruption()) {
logger.info("updating {} from [{}] to [{}]", EngineConfig.INDEX_FAIL_ON_CORRUPTION_SETTING, config.isFailEngineOnCorruption(), failEngineOnCorruption);
config.failEngineOnCorruption = failEngineOnCorruption;
change = true;
}
final boolean failOnMergeFailure = settings.getAsBoolean(EngineConfig.INDEX_FAIL_ON_MERGE_FAILURE_SETTING, config.isFailOnMergeFailure());
if (failOnMergeFailure != config.isFailOnMergeFailure()) {
logger.info("updating {} from [{}] to [{}]", EngineConfig.INDEX_FAIL_ON_MERGE_FAILURE_SETTING, config.isFailOnMergeFailure(), failOnMergeFailure);
config.failOnMergeFailure = failOnMergeFailure;
change = true;
}
if (change) {
onChange();
}
}
/**
* This method is called if any of the settings that are exposed as realtime updateble settings has changed.
* This method should be overwritten by subclasses to react on settings changes.
*/
protected abstract void onChange();
/**
* Sets if the engine should be failed if a merge error is hit. Defaults to <code>true</code>
*/
public void setFailOnMergeFailure(boolean failOnMergeFailure) {
this.failOnMergeFailure = failOnMergeFailure;
}
}

View File

@ -75,12 +75,9 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
*/
public class InternalEngine extends Engine {
protected final ESLogger logger;
protected final ShardId shardId;
private final EngineConfig engineConfig;
private final FailEngineOnMergeFailure mergeSchedulerFailureListener;
private final MergeSchedulerListener mergeSchedulerListener;
private final EngineConfig.EngineSettingsListener listener;
/** When we last pruned expired tombstones from versionMap.deletes: */
private volatile long lastDeleteVersionPruneTimeMSec;
@ -132,7 +129,6 @@ public class InternalEngine extends Engine {
super(engineConfig);
this.store = engineConfig.getStore();
this.shardId = engineConfig.getShardId();
this.logger = Loggers.getLogger(getClass(), engineConfig.getIndexSettings(), shardId);
this.versionMap = new LiveVersionMap();
store.incRef();
IndexWriter writer = null;
@ -154,15 +150,7 @@ public class InternalEngine extends Engine {
this.failedEngineListener = engineConfig.getFailedEngineListener();
throttle = new IndexThrottle();
this.engineConfig = engineConfig;
this.searcherFactory = new SearchFactory(engineConfig);
listener = new EngineConfig.EngineSettingsListener(logger, engineConfig) {
@Override
protected void onChange() {
updateSettings();
}
};
engineConfig.getIndexSettingsService().addListener(listener);
try {
writer = createWriter();
} catch (IOException e) {
@ -188,31 +176,6 @@ public class InternalEngine extends Engine {
}
}
@Override
public void updateIndexingBufferSize(ByteSizeValue indexingBufferSize) {
ByteSizeValue preValue = engineConfig.getIndexingBufferSize();
try (ReleasableLock _ = readLock.acquire()) {
ensureOpen();
engineConfig.setIndexingBufferSize(indexingBufferSize);
indexWriter.getConfig().setRAMBufferSizeMB(indexingBufferSize.mbFrac());
}
if (preValue.bytes() != indexingBufferSize.bytes()) {
// its inactive, make sure we do a refresh / full IW flush in this case, since the memory
// changes only after a "data" change has happened to the writer
// the index writer lazily allocates memory and a refresh will clean it all up.
if (indexingBufferSize == EngineConfig.INACTIVE_SHARD_INDEXING_BUFFER && preValue != EngineConfig.INACTIVE_SHARD_INDEXING_BUFFER) {
logger.debug("updating index_buffer_size from [{}] to (inactive) [{}]", preValue, indexingBufferSize);
try {
refresh("update index buffer");
} catch (Throwable e) {
logger.warn("failed to refresh after setting shard to inactive", e);
}
} else {
logger.debug("updating index_buffer_size from [{}] to [{}]", preValue, indexingBufferSize);
}
}
}
private SearcherManager createSearcherManager() throws EngineException {
boolean success = false;
SearcherManager searcherManager = null;
@ -254,18 +217,16 @@ public class InternalEngine extends Engine {
}
}
private void updateSettings() {
if (isClosed.get() == false) {
private void updateIndexWriterSettings() {
try {
final LiveIndexWriterConfig iwc = indexWriter.getConfig();
iwc.setRAMBufferSizeMB(engineConfig.getIndexingBufferSize().mbFrac());
iwc.setUseCompoundFile(engineConfig.isCompoundOnFlush());
} catch (AlreadyClosedException ex) {
// ignore
}
}
@Override
public ByteSizeValue indexingBufferSize() {
return engineConfig.getIndexingBufferSize();
}
@Override
public GetResult get(Get get) throws EngineException {
try (ReleasableLock _ = readLock.acquire()) {
@ -682,6 +643,7 @@ public class InternalEngine extends Engine {
// since it flushes the index as well (though, in terms of concurrency, we are allowed to do it)
try (ReleasableLock _ = readLock.acquire()) {
ensureOpen();
updateIndexWriterSettings();
searcherManager.maybeRefreshBlocking();
} catch (AlreadyClosedException e) {
ensureOpen();
@ -712,6 +674,7 @@ public class InternalEngine extends Engine {
private void flush(boolean commitTranslog, boolean force, boolean waitIfOngoing) throws EngineException {
ensureOpen();
updateIndexWriterSettings();
if (commitTranslog) {
// check outside the lock as well so we can check without blocking on the write lock
if (onGoingRecoveries.get() > 0) {
@ -1175,7 +1138,6 @@ public class InternalEngine extends Engine {
store.decRef();
this.mergeScheduler.removeListener(mergeSchedulerListener);
this.mergeScheduler.removeFailureListener(mergeSchedulerFailureListener);
engineConfig.getIndexSettingsService().removeListener(listener);
logger.debug("engine closed [{}]", reason);
}
}
@ -1423,10 +1385,6 @@ public class InternalEngine extends Engine {
}
}
EngineConfig config() {
return engineConfig;
}
private void commitIndexWriter(IndexWriter writer) throws IOException {
try {

View File

@ -44,6 +44,7 @@ import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.metrics.MeanMetric;
import org.elasticsearch.common.settings.Settings;
@ -911,20 +912,34 @@ public class IndexShard extends AbstractIndexShardComponent {
}
public void updateBufferSize(ByteSizeValue shardIndexingBufferSize, ByteSizeValue shardTranslogBufferSize) {
Engine engine = engine();
engine.updateIndexingBufferSize(shardIndexingBufferSize);
ByteSizeValue preValue = config.getIndexingBufferSize();
config.setIndexingBufferSize(shardIndexingBufferSize);
if (preValue.bytes() != shardIndexingBufferSize.bytes()) {
// its inactive, make sure we do a refresh / full IW flush in this case, since the memory
// changes only after a "data" change has happened to the writer
// the index writer lazily allocates memory and a refresh will clean it all up.
if (shardIndexingBufferSize == EngineConfig.INACTIVE_SHARD_INDEXING_BUFFER && preValue != EngineConfig.INACTIVE_SHARD_INDEXING_BUFFER) {
logger.debug("updating index_buffer_size from [{}] to (inactive) [{}]", preValue, shardIndexingBufferSize);
try {
refresh("update index buffer");
} catch (Throwable e) {
logger.warn("failed to refresh after setting shard to inactive", e);
}
} else {
logger.debug("updating index_buffer_size from [{}] to [{}]", preValue, shardIndexingBufferSize);
}
}
translog().updateBuffer(shardTranslogBufferSize);
}
public void markAsInactive() {
Engine engine = engine();
engine.updateIndexingBufferSize(EngineConfig.INACTIVE_SHARD_INDEXING_BUFFER);
translog().updateBuffer(Translog.INACTIVE_SHARD_TRANSLOG_BUFFER);
updateBufferSize(EngineConfig.INACTIVE_SHARD_INDEXING_BUFFER, Translog.INACTIVE_SHARD_TRANSLOG_BUFFER);
}
private class ApplyRefreshSettings implements IndexSettingsService.Listener {
@Override
public void onRefreshSettings(Settings settings) {
boolean change = false;
synchronized (mutex) {
if (state == IndexShardState.CLOSED) {
return;
@ -944,6 +959,36 @@ public class IndexShard extends AbstractIndexShardComponent {
refreshScheduledFuture = threadPool.schedule(refreshInterval, ThreadPool.Names.SAME, new EngineRefresher());
}
}
long gcDeletesInMillis = settings.getAsTime(EngineConfig.INDEX_GC_DELETES_SETTING, TimeValue.timeValueMillis(config.getGcDeletesInMillis())).millis();
if (gcDeletesInMillis != config.getGcDeletesInMillis()) {
logger.info("updating {} from [{}] to [{}]", EngineConfig.INDEX_GC_DELETES_SETTING, TimeValue.timeValueMillis(config.getGcDeletesInMillis()), TimeValue.timeValueMillis(gcDeletesInMillis));
config.setGcDeletesInMillis(gcDeletesInMillis);
change = true;
}
final boolean compoundOnFlush = settings.getAsBoolean(EngineConfig.INDEX_COMPOUND_ON_FLUSH, config.isCompoundOnFlush());
if (compoundOnFlush != config.isCompoundOnFlush()) {
logger.info("updating {} from [{}] to [{}]", EngineConfig.INDEX_COMPOUND_ON_FLUSH, config.isCompoundOnFlush(), compoundOnFlush);
config.setCompoundOnFlush(compoundOnFlush);
change = true;
}
final boolean failEngineOnCorruption = settings.getAsBoolean(EngineConfig.INDEX_FAIL_ON_CORRUPTION_SETTING, config.isFailEngineOnCorruption());
if (failEngineOnCorruption != config.isFailEngineOnCorruption()) {
logger.info("updating {} from [{}] to [{}]", EngineConfig.INDEX_FAIL_ON_CORRUPTION_SETTING, config.isFailEngineOnCorruption(), failEngineOnCorruption);
config.setFailEngineOnCorruption(failEngineOnCorruption);
change = true;
}
final boolean failOnMergeFailure = settings.getAsBoolean(EngineConfig.INDEX_FAIL_ON_MERGE_FAILURE_SETTING, config.isFailOnMergeFailure());
if (failOnMergeFailure != config.isFailOnMergeFailure()) {
logger.info("updating {} from [{}] to [{}]", EngineConfig.INDEX_FAIL_ON_MERGE_FAILURE_SETTING, config.isFailOnMergeFailure(), failOnMergeFailure);
config.setFailOnMergeFailure(failOnMergeFailure);
change = true;
}
}
if (change) {
refresh("apply settings");
}
}
}
@ -1103,5 +1148,4 @@ public class IndexShard extends AbstractIndexShardComponent {
this.currentEngineReference.set(engineFactory.newEngine(config));
}
}
}

View File

@ -18,7 +18,9 @@
*/
package org.elasticsearch.index.engine;
import org.apache.lucene.index.LiveIndexWriterConfig;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.engine.EngineConfig;
import org.elasticsearch.index.engine.InternalEngine;
@ -28,7 +30,7 @@ import static org.hamcrest.Matchers.is;
public class InternalEngineSettingsTest extends ElasticsearchSingleNodeTest {
public void testLuceneSettings() {
public void testSettingsUpdate() {
final IndexService service = createIndex("foo");
// INDEX_COMPOUND_ON_FLUSH
InternalEngine engine = ((InternalEngine)engine(service));
@ -37,6 +39,58 @@ public class InternalEngineSettingsTest extends ElasticsearchSingleNodeTest {
assertThat(engine.getCurrentIndexWriterConfig().getUseCompoundFile(), is(false));
client().admin().indices().prepareUpdateSettings("foo").setSettings(ImmutableSettings.builder().put(EngineConfig.INDEX_COMPOUND_ON_FLUSH, true).build()).get();
assertThat(engine.getCurrentIndexWriterConfig().getUseCompoundFile(), is(true));
final int iters = between(1, 20);
for (int i = 0; i < iters; i++) {
boolean compoundOnFlush = randomBoolean();
boolean failOnCorruption = randomBoolean();
boolean failOnMerge = randomBoolean();
long gcDeletes = Math.max(0, randomLong());
Settings build = ImmutableSettings.builder()
.put(EngineConfig.INDEX_FAIL_ON_CORRUPTION_SETTING, failOnCorruption)
.put(EngineConfig.INDEX_COMPOUND_ON_FLUSH, compoundOnFlush)
.put(EngineConfig.INDEX_GC_DELETES_SETTING, gcDeletes)
.put(EngineConfig.INDEX_FAIL_ON_MERGE_FAILURE_SETTING, failOnMerge)
.build();
client().admin().indices().prepareUpdateSettings("foo").setSettings(build).get();
LiveIndexWriterConfig currentIndexWriterConfig = engine.getCurrentIndexWriterConfig();
assertEquals(engine.config().isCompoundOnFlush(), compoundOnFlush);
assertEquals(currentIndexWriterConfig.getUseCompoundFile(), compoundOnFlush);
assertEquals(engine.config().getGcDeletesInMillis(), gcDeletes);
assertEquals(engine.getGcDeletesInMillis(), gcDeletes);
assertEquals(engine.config().isFailEngineOnCorruption(), failOnCorruption);
assertEquals(engine.config().isFailOnMergeFailure(), failOnMerge); // only on the holder
}
Settings settings = ImmutableSettings.builder()
.put(EngineConfig.INDEX_GC_DELETES_SETTING, 1000)
.build();
client().admin().indices().prepareUpdateSettings("foo").setSettings(settings).get();
assertEquals(engine.getGcDeletesInMillis(), 1000);
assertTrue(engine.config().isEnableGcDeletes());
settings = ImmutableSettings.builder()
.put(EngineConfig.INDEX_GC_DELETES_SETTING, "0ms")
.build();
client().admin().indices().prepareUpdateSettings("foo").setSettings(settings).get();
assertEquals(engine.getGcDeletesInMillis(), 0);
assertTrue(engine.config().isEnableGcDeletes());
settings = ImmutableSettings.builder()
.put(EngineConfig.INDEX_GC_DELETES_SETTING, 1000)
.build();
client().admin().indices().prepareUpdateSettings("foo").setSettings(settings).get();
assertEquals(engine.getGcDeletesInMillis(), 1000);
assertTrue(engine.config().isEnableGcDeletes());
}

View File

@ -109,10 +109,6 @@ public class InternalEngineTests extends ElasticsearchLuceneTestCase {
protected Engine engine;
protected Engine replicaEngine;
private IndexSettingsService engineSettingsService;
private IndexSettingsService replicaSettingsService;
private Settings defaultSettings;
private int indexConcurrency;
private String codecName;
@ -142,9 +138,8 @@ public class InternalEngineTests extends ElasticsearchLuceneTestCase {
store.deleteContent();
storeReplica = createStore();
storeReplica.deleteContent();
engineSettingsService = new IndexSettingsService(shardId.index(), ImmutableSettings.builder().put(defaultSettings).put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT).build());
translog = createTranslog();
engine = createEngine(engineSettingsService, store, translog);
engine = createEngine(store, translog);
LiveIndexWriterConfig currentIndexWriterConfig = ((InternalEngine)engine).getCurrentIndexWriterConfig();
assertEquals(((InternalEngine)engine).config().getCodec().getName(), codecService.codec(codecName).getName());
@ -152,9 +147,8 @@ public class InternalEngineTests extends ElasticsearchLuceneTestCase {
if (randomBoolean()) {
((InternalEngine)engine).config().setEnableGcDeletes(false);
}
replicaSettingsService = new IndexSettingsService(shardId.index(), ImmutableSettings.builder().put(defaultSettings).put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT).build());
replicaTranslog = createTranslogReplica();
replicaEngine = createEngine(replicaSettingsService, storeReplica, replicaTranslog);
replicaEngine = createEngine(storeReplica, replicaTranslog);
currentIndexWriterConfig = ((InternalEngine)replicaEngine).getCurrentIndexWriterConfig();
assertEquals(((InternalEngine)replicaEngine).config().getCodec().getName(), codecService.codec(codecName).getName());
@ -238,11 +232,12 @@ public class InternalEngineTests extends ElasticsearchLuceneTestCase {
return new ConcurrentMergeSchedulerProvider(shardId, EMPTY_SETTINGS, threadPool, indexSettingsService);
}
protected Engine createEngine(IndexSettingsService indexSettingsService, Store store, Translog translog) {
protected InternalEngine createEngine(Store store, Translog translog) {
IndexSettingsService indexSettingsService = new IndexSettingsService(shardId.index(), ImmutableSettings.builder().put(defaultSettings).put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT).build());
return createEngine(indexSettingsService, store, translog, createMergeScheduler(indexSettingsService));
}
protected Engine createEngine(IndexSettingsService indexSettingsService, Store store, Translog translog, MergeSchedulerProvider mergeSchedulerProvider) {
protected InternalEngine createEngine(IndexSettingsService indexSettingsService, Store store, Translog translog, MergeSchedulerProvider mergeSchedulerProvider) {
return new InternalEngine(config(indexSettingsService, store, translog, mergeSchedulerProvider));
}
@ -308,7 +303,7 @@ public class InternalEngineTests extends ElasticsearchLuceneTestCase {
assertThat(segments.get(0).getDeletedDocs(), equalTo(0));
assertThat(segments.get(0).isCompound(), equalTo(defaultCompound));
engineSettingsService.refreshSettings(ImmutableSettings.builder().put(EngineConfig.INDEX_COMPOUND_ON_FLUSH, false).build());
((InternalEngine)engine).config().setCompoundOnFlush(false);
ParsedDocument doc3 = testParsedDocument("3", "3", "test", null, -1, -1, testDocumentWithTextField(), B_3, false);
engine.create(new Engine.Create(null, newUid("3"), doc3));
@ -356,7 +351,7 @@ public class InternalEngineTests extends ElasticsearchLuceneTestCase {
assertThat(segments.get(1).getDeletedDocs(), equalTo(0));
assertThat(segments.get(1).isCompound(), equalTo(false));
engineSettingsService.refreshSettings(ImmutableSettings.builder().put(EngineConfig.INDEX_COMPOUND_ON_FLUSH, true).build());
((InternalEngine)engine).config().setCompoundOnFlush(true);
ParsedDocument doc4 = testParsedDocument("4", "4", "test", null, -1, -1, testDocumentWithTextField(), B_3, false);
engine.create(new Engine.Create(null, newUid("4"), doc4));
engine.refresh("test");
@ -438,7 +433,8 @@ public class InternalEngineTests extends ElasticsearchLuceneTestCase {
}
});
final Engine engine = createEngine(engineSettingsService, store, createTranslog(), mergeSchedulerProvider);
IndexSettingsService indexSettingsService = new IndexSettingsService(shardId.index(), ImmutableSettings.builder().put(defaultSettings).put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT).build());
final Engine engine = createEngine(indexSettingsService, store, createTranslog(), mergeSchedulerProvider);
ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocument(), B_1, false);
Engine.Index index = new Engine.Index(null, newUid("1"), doc);
engine.index(index);
@ -1395,12 +1391,10 @@ public class InternalEngineTests extends ElasticsearchLuceneTestCase {
Store store = createStore();
// Make sure enableGCDeletes == false works:
Settings settings = ImmutableSettings.builder()
.put(EngineConfig.INDEX_GC_DELETES_SETTING, "0ms")
.build();
Engine engine = new InternalEngine(config(engineSettingsService, store, createTranslog(), createMergeScheduler(engineSettingsService)));
IndexSettingsService indexSettingsService = new IndexSettingsService(shardId.index(), ImmutableSettings.builder().put(defaultSettings).put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT).build());
Engine engine = new InternalEngine(config(indexSettingsService, store, createTranslog(), createMergeScheduler(indexSettingsService)));
((InternalEngine)engine).config().setEnableGcDeletes(false);
// Add document
@ -1489,27 +1483,22 @@ public class InternalEngineTests extends ElasticsearchLuceneTestCase {
int refCount = store.refCount();
assertTrue("refCount: "+ store.refCount(), store.refCount() > 0);
Translog translog = createTranslog();
Settings build = ImmutableSettings.builder()
.put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT).build();
IndexSettingsService indexSettingsService = new IndexSettingsService(shardId.index(), build);
Engine holder;
InternalEngine holder;
try {
holder = createEngine(indexSettingsService, store, translog);
holder = createEngine(store, translog);
} catch (EngineCreationFailureException ex) {
assertEquals(store.refCount(), refCount);
continue;
}
indexSettingsService.refreshSettings(ImmutableSettings.builder()
.put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
.put(EngineConfig.INDEX_FAIL_ON_CORRUPTION_SETTING, true).build());
holder.config().setFailEngineOnCorruption(true);
assertEquals(store.refCount(), refCount+1);
final int numStarts = scaledRandomIntBetween(1, 5);
for (int j = 0; j < numStarts; j++) {
try {
assertEquals(store.refCount(), refCount + 1);
holder.close();
holder = createEngine(indexSettingsService, store, translog);
holder = createEngine(store, translog);
holder.config().setFailEngineOnCorruption(true);
assertEquals(store.refCount(), refCount + 1);
} catch (EngineCreationFailureException ex) {
// all is fine
@ -1541,32 +1530,6 @@ public class InternalEngineTests extends ElasticsearchLuceneTestCase {
assertTrue(settings.containsSetting(EngineConfig.INDEX_COMPOUND_ON_FLUSH));
assertTrue(settings.containsSetting(EngineConfig.INDEX_GC_DELETES_SETTING));
assertTrue(settings.containsSetting(EngineConfig.INDEX_FAIL_ON_MERGE_FAILURE_SETTING));
final int iters = between(1, 20);
for (int i = 0; i < iters; i++) {
boolean compoundOnFlush = randomBoolean();
boolean failOnCorruption = randomBoolean();
boolean failOnMerge = randomBoolean();
long gcDeletes = Math.max(0, randomLong());
Settings build = ImmutableSettings.builder()
.put(EngineConfig.INDEX_FAIL_ON_CORRUPTION_SETTING, failOnCorruption)
.put(EngineConfig.INDEX_COMPOUND_ON_FLUSH, compoundOnFlush)
.put(EngineConfig.INDEX_GC_DELETES_SETTING, gcDeletes)
.put(EngineConfig.INDEX_FAIL_ON_MERGE_FAILURE_SETTING, failOnMerge)
.build();
engineSettingsService.refreshSettings(build);
currentIndexWriterConfig = engine.getCurrentIndexWriterConfig();
assertEquals(engine.config().isCompoundOnFlush(), compoundOnFlush);
assertEquals(currentIndexWriterConfig.getUseCompoundFile(), compoundOnFlush);
assertEquals(engine.config().getGcDeletesInMillis(), gcDeletes);
assertEquals(engine.getGcDeletesInMillis(), gcDeletes);
assertEquals(engine.config().isFailEngineOnCorruption(), failOnCorruption);
assertEquals(engine.config().isFailOnMergeFailure(), failOnMerge); // only on the holder
}
}
@Test

View File

@ -25,7 +25,6 @@ import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.EngineConfig;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.indices.IndicesService;
@ -58,15 +57,15 @@ public class IndexingMemoryControllerTests extends ElasticsearchIntegrationTest
boolean success = awaitBusy(new Predicate<Object>() {
@Override
public boolean apply(Object input) {
return shard1.engine().indexingBufferSize().bytes() <= expected2ShardsSize &&
shard2.engine().indexingBufferSize().bytes() <= expected2ShardsSize;
return shard1.engine().config().getIndexingBufferSize().bytes() <= expected2ShardsSize &&
shard2.engine().config().getIndexingBufferSize().bytes() <= expected2ShardsSize;
}
});
if (!success) {
fail("failed to update shard indexing buffer size. expected [" + expected2ShardsSize + "] shard1 [" +
shard1.engine().indexingBufferSize().bytes() + "] shard2 [" +
shard2.engine().indexingBufferSize().bytes() + "]"
shard1.engine().config().getIndexingBufferSize().bytes() + "] shard2 [" +
shard2.engine().config().getIndexingBufferSize().bytes() + "]"
);
}
@ -74,13 +73,13 @@ public class IndexingMemoryControllerTests extends ElasticsearchIntegrationTest
success = awaitBusy(new Predicate<Object>() {
@Override
public boolean apply(Object input) {
return shard1.engine().indexingBufferSize().bytes() >= expected1ShardSize;
return shard1.engine().config().getIndexingBufferSize().bytes() >= expected1ShardSize;
}
});
if (!success) {
fail("failed to update shard indexing buffer size after deleting shards. expected [" + expected1ShardSize + "] got [" +
shard1.engine().indexingBufferSize().bytes() + "]"
shard1.engine().config().getIndexingBufferSize().bytes() + "]"
);
}
@ -99,12 +98,12 @@ public class IndexingMemoryControllerTests extends ElasticsearchIntegrationTest
boolean success = awaitBusy(new Predicate<Object>() {
@Override
public boolean apply(Object input) {
return shard1.engine().indexingBufferSize().bytes() == EngineConfig.INACTIVE_SHARD_INDEXING_BUFFER.bytes();
return shard1.engine().config().getIndexingBufferSize().bytes() == EngineConfig.INACTIVE_SHARD_INDEXING_BUFFER.bytes();
}
});
if (!success) {
fail("failed to update shard indexing buffer size due to inactive state. expected [" + EngineConfig.INACTIVE_SHARD_INDEXING_BUFFER + "] got [" +
shard1.engine().indexingBufferSize().bytes() + "]"
shard1.engine().config().getIndexingBufferSize().bytes() + "]"
);
}
@ -113,12 +112,12 @@ public class IndexingMemoryControllerTests extends ElasticsearchIntegrationTest
success = awaitBusy(new Predicate<Object>() {
@Override
public boolean apply(Object input) {
return shard1.engine().indexingBufferSize().bytes() > EngineConfig.INACTIVE_SHARD_INDEXING_BUFFER.bytes();
return shard1.engine().config().getIndexingBufferSize().bytes() > EngineConfig.INACTIVE_SHARD_INDEXING_BUFFER.bytes();
}
});
if (!success) {
fail("failed to update shard indexing buffer size due to inactive state. expected something larger then [" + EngineConfig.INACTIVE_SHARD_INDEXING_BUFFER + "] got [" +
shard1.engine().indexingBufferSize().bytes() + "]"
shard1.engine().config().getIndexingBufferSize().bytes() + "]"
);
}
@ -127,12 +126,12 @@ public class IndexingMemoryControllerTests extends ElasticsearchIntegrationTest
success = awaitBusy(new Predicate<Object>() {
@Override
public boolean apply(Object input) {
return shard1.engine().indexingBufferSize().bytes() == EngineConfig.INACTIVE_SHARD_INDEXING_BUFFER.bytes();
return shard1.engine().config().getIndexingBufferSize().bytes() == EngineConfig.INACTIVE_SHARD_INDEXING_BUFFER.bytes();
}
});
if (!success) {
fail("failed to update shard indexing buffer size due to inactive state. expected [" + EngineConfig.INACTIVE_SHARD_INDEXING_BUFFER + "] got [" +
shard1.engine().indexingBufferSize().bytes() + "]"
shard1.engine().config().getIndexingBufferSize().bytes() + "]"
);
}
}