[ENGINE] Remove full flush / FlushType.NEW_WRITER

The `full` option and `FlushType.NEW_WRITER` only exists to allow
realtime changes to two settings (`index.codec` and `index.concurrency`).
Those settings are very expert and don't really need to be updateable
in realtime.
This commit is contained in:
Simon Willnauer 2015-02-04 12:13:40 +01:00
parent 7beaaaef62
commit 0c5599e1d1
14 changed files with 48 additions and 139 deletions

View File

@ -25,12 +25,6 @@ flush can be executed if another flush operation is already executing.
The default is `false` and will cause an exception to be thrown on The default is `false` and will cause an exception to be thrown on
the shard level if another flush operation is already running. the shard level if another flush operation is already running.
`full`:: If set to `true` a new index writer is created and settings that have
been changed related to the index writer will be refreshed. Note: if a full flush
is required for a setting to take effect this will be part of the settings update
process and it not required to be executed by the user.
(This setting can be considered as internal)
`force`:: Whether a flush should be forced even if it is not necessarily needed ie. `force`:: Whether a flush should be forced even if it is not necessarily needed ie.
if no changes will be committed to the index. This is useful if transaction log IDs if no changes will be committed to the index. This is useful if transaction log IDs
should be incremented even if no uncommitted changes are present. should be incremented even if no uncommitted changes are present.

View File

@ -16,10 +16,6 @@
"type" : "boolean", "type" : "boolean",
"description" : "Whether a flush should be forced even if it is not necessarily needed ie. if no changes will be committed to the index. This is useful if transaction log IDs should be incremented even if no uncommitted changes are present. (This setting can be considered as internal)" "description" : "Whether a flush should be forced even if it is not necessarily needed ie. if no changes will be committed to the index. This is useful if transaction log IDs should be incremented even if no uncommitted changes are present. (This setting can be considered as internal)"
}, },
"full": {
"type" : "boolean",
"description" : "If set to true a new index writer is created and settings that have been changed related to the index writer will be refreshed. Note: if a full flush is required for a setting to take effect this will be part of the settings update process and it not required to be executed by the user. (This setting can be considered as internal)"
},
"wait_if_ongoing": { "wait_if_ongoing": {
"type" : "boolean", "type" : "boolean",
"description" : "If set to true the flush operation will block until the flush can be executed if another flush operation is already executing. The default is false and will cause an exception to be thrown on the shard level if another flush operation is already running." "description" : "If set to true the flush operation will block until the flush can be executed if another flush operation is already executing. The default is false and will cause an exception to be thrown on the shard level if another flush operation is already running."

View File

@ -41,7 +41,6 @@ import java.io.IOException;
public class FlushRequest extends BroadcastOperationRequest<FlushRequest> { public class FlushRequest extends BroadcastOperationRequest<FlushRequest> {
private boolean force = false; private boolean force = false;
private boolean full = false;
private boolean waitIfOngoing = false; private boolean waitIfOngoing = false;
FlushRequest() { FlushRequest() {
@ -63,21 +62,6 @@ public class FlushRequest extends BroadcastOperationRequest<FlushRequest> {
super(indices); super(indices);
} }
/**
* Should a "full" flush be performed.
*/
public boolean full() {
return this.full;
}
/**
* Should a "full" flush be performed.
*/
public FlushRequest full(boolean full) {
this.full = full;
return this;
}
/** /**
* Returns <tt>true</tt> iff a flush should block * Returns <tt>true</tt> iff a flush should block
* if a another flush operation is already running. Otherwise <tt>false</tt> * if a another flush operation is already running. Otherwise <tt>false</tt>
@ -113,7 +97,6 @@ public class FlushRequest extends BroadcastOperationRequest<FlushRequest> {
@Override @Override
public void writeTo(StreamOutput out) throws IOException { public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out); super.writeTo(out);
out.writeBoolean(full);
out.writeBoolean(force); out.writeBoolean(force);
out.writeBoolean(waitIfOngoing); out.writeBoolean(waitIfOngoing);
} }
@ -121,7 +104,6 @@ public class FlushRequest extends BroadcastOperationRequest<FlushRequest> {
@Override @Override
public void readFrom(StreamInput in) throws IOException { public void readFrom(StreamInput in) throws IOException {
super.readFrom(in); super.readFrom(in);
full = in.readBoolean();
force = in.readBoolean(); force = in.readBoolean();
waitIfOngoing = in.readBoolean(); waitIfOngoing = in.readBoolean();
} }

View File

@ -32,11 +32,6 @@ public class FlushRequestBuilder extends BroadcastOperationRequestBuilder<FlushR
super(indicesClient, new FlushRequest()); super(indicesClient, new FlushRequest());
} }
public FlushRequestBuilder setFull(boolean full) {
request.full(full);
return this;
}
public FlushRequestBuilder setForce(boolean force) { public FlushRequestBuilder setForce(boolean force) {
request.force(force); request.force(force);
return this; return this;

View File

@ -188,10 +188,6 @@ public interface Engine extends Closeable {
} }
public static enum FlushType { public static enum FlushType {
/**
* A flush that causes a new writer to be created.
*/
NEW_WRITER,
/** /**
* A flush that just commits the writer, without cleaning the translog. * A flush that just commits the writer, without cleaning the translog.
*/ */

View File

@ -53,11 +53,11 @@ public final class EngineConfig {
private volatile boolean failOnMergeFailure = true; private volatile boolean failOnMergeFailure = true;
private volatile boolean failEngineOnCorruption = true; private volatile boolean failEngineOnCorruption = true;
private volatile ByteSizeValue indexingBufferSize; private volatile ByteSizeValue indexingBufferSize;
private volatile int indexConcurrency = IndexWriterConfig.DEFAULT_MAX_THREAD_STATES; private final int indexConcurrency;
private volatile boolean compoundOnFlush = true; private volatile boolean compoundOnFlush = true;
private long gcDeletesInMillis = DEFAULT_GC_DELETES.millis(); private long gcDeletesInMillis = DEFAULT_GC_DELETES.millis();
private volatile boolean enableGcDeletes = true; private volatile boolean enableGcDeletes = true;
private volatile String codecName = DEFAULT_CODEC_NAME; private final String codecName;
private final boolean optimizeAutoGenerateId; private final boolean optimizeAutoGenerateId;
private final ThreadPool threadPool; private final ThreadPool threadPool;
private final ShardIndexingService indexingService; private final ShardIndexingService indexingService;
@ -77,7 +77,7 @@ public final class EngineConfig {
/** /**
* Index setting for index concurrency / number of threadstates in the indexwriter. * Index setting for index concurrency / number of threadstates in the indexwriter.
* The default is depending on the number of CPUs in the system. We use a 0.65 the number of CPUs or at least {@value org.apache.lucene.index.IndexWriterConfig#DEFAULT_MAX_THREAD_STATES} * The default is depending on the number of CPUs in the system. We use a 0.65 the number of CPUs or at least {@value org.apache.lucene.index.IndexWriterConfig#DEFAULT_MAX_THREAD_STATES}
* This setting is realtime updateable * This setting is <b>not</b> realtime updateable
*/ */
public static final String INDEX_CONCURRENCY_SETTING = "index.index_concurrency"; public static final String INDEX_CONCURRENCY_SETTING = "index.index_concurrency";
@ -118,7 +118,7 @@ public final class EngineConfig {
/** /**
* Index setting to change the low level lucene codec used for writing new segments. * Index setting to change the low level lucene codec used for writing new segments.
* This setting is realtime updateable. * This setting is <b>not</b> realtime updateable.
*/ */
public static final String INDEX_CODEC_SETTING = "index.codec"; public static final String INDEX_CODEC_SETTING = "index.codec";
@ -166,15 +166,6 @@ public final class EngineConfig {
this.indexingBufferSize = indexingBufferSize; this.indexingBufferSize = indexingBufferSize;
} }
/**
* Sets the index concurrency
* @see #getIndexConcurrency()
*/
public void setIndexConcurrency(int indexConcurrency) {
this.indexConcurrency = indexConcurrency;
}
/** /**
* Enables / disables gc deletes * Enables / disables gc deletes
* *
@ -245,9 +236,7 @@ public final class EngineConfig {
/** /**
* Returns the {@link Codec} used in the engines {@link org.apache.lucene.index.IndexWriter} * Returns the {@link Codec} used in the engines {@link org.apache.lucene.index.IndexWriter}
* <p> * <p>
* Note: this settings is only read on startup and if a new writer is created. This happens either due to a * Note: this settings is only read on startup.
* settings change in the {@link org.elasticsearch.index.engine.EngineConfig.EngineSettingsListener} or if
* {@link Engine#flush(org.elasticsearch.index.engine.Engine.FlushType, boolean, boolean)} with {@link org.elasticsearch.index.engine.Engine.FlushType#NEW_WRITER} is executed.
* </p> * </p>
*/ */
public Codec getCodec() { public Codec getCodec() {
@ -412,20 +401,6 @@ public final class EngineConfig {
config.failEngineOnCorruption = failEngineOnCorruption; config.failEngineOnCorruption = failEngineOnCorruption;
change = true; change = true;
} }
int indexConcurrency = settings.getAsInt(EngineConfig.INDEX_CONCURRENCY_SETTING, config.getIndexConcurrency());
if (indexConcurrency != config.getIndexConcurrency()) {
logger.info("updating index.index_concurrency from [{}] to [{}]", config.getIndexConcurrency(), indexConcurrency);
config.setIndexConcurrency(indexConcurrency);
// we have to flush in this case, since it only applies on a new index writer
change = true;
}
final String codecName = settings.get(EngineConfig.INDEX_CODEC_SETTING, config.codecName);
if (!codecName.equals(config.codecName)) {
logger.info("updating {} from [{}] to [{}]", EngineConfig.INDEX_CODEC_SETTING, config.codecName, codecName);
config.codecName = codecName;
// we want to flush in this case, so the new codec will be reflected right away...
change = true;
}
final boolean failOnMergeFailure = settings.getAsBoolean(EngineConfig.INDEX_FAIL_ON_MERGE_FAILURE_SETTING, config.isFailOnMergeFailure()); final boolean failOnMergeFailure = settings.getAsBoolean(EngineConfig.INDEX_FAIL_ON_MERGE_FAILURE_SETTING, config.isFailOnMergeFailure());
if (failOnMergeFailure != config.isFailOnMergeFailure()) { if (failOnMergeFailure != config.isFailOnMergeFailure()) {
logger.info("updating {} from [{}] to [{}]", EngineConfig.INDEX_FAIL_ON_MERGE_FAILURE_SETTING, config.isFailOnMergeFailure(), failOnMergeFailure); logger.info("updating {} from [{}] to [{}]", EngineConfig.INDEX_FAIL_ON_MERGE_FAILURE_SETTING, config.isFailOnMergeFailure(), failOnMergeFailure);

View File

@ -266,11 +266,6 @@ public class InternalEngine implements Engine {
if (closedOrFailed == false) { if (closedOrFailed == false) {
final LiveIndexWriterConfig iwc = indexWriter.getConfig(); final LiveIndexWriterConfig iwc = indexWriter.getConfig();
iwc.setUseCompoundFile(engineConfig.isCompoundOnFlush()); iwc.setUseCompoundFile(engineConfig.isCompoundOnFlush());
final boolean concurrencyNeedsUpdate = iwc.getMaxThreadStates() != engineConfig.getIndexConcurrency();
final boolean codecNeedsUpdate = iwc.getCodec().equals(engineConfig.getCodec()) == false;
if (codecNeedsUpdate || concurrencyNeedsUpdate) {
flush(FlushType.NEW_WRITER, false, false);
}
} }
} }
@ -720,7 +715,7 @@ public class InternalEngine implements Engine {
@Override @Override
public void flush(FlushType type, boolean force, boolean waitIfOngoing) throws EngineException { public void flush(FlushType type, boolean force, boolean waitIfOngoing) throws EngineException {
ensureOpen(); ensureOpen();
if (type == FlushType.NEW_WRITER || type == FlushType.COMMIT_TRANSLOG) { if (type == FlushType.COMMIT_TRANSLOG) {
// check outside the lock as well so we can check without blocking on the write lock // check outside the lock as well so we can check without blocking on the write lock
if (onGoingRecoveries.get() > 0) { if (onGoingRecoveries.get() > 0) {
throw new FlushNotAllowedEngineException(shardId, "recovery is in progress, flush [" + type + "] is not allowed"); throw new FlushNotAllowedEngineException(shardId, "recovery is in progress, flush [" + type + "] is not allowed");
@ -734,46 +729,7 @@ public class InternalEngine implements Engine {
flushLock.lock(); flushLock.lock();
try { try {
if (type == FlushType.NEW_WRITER) { if (type == FlushType.COMMIT_TRANSLOG) {
try (InternalLock _ = writeLock.acquire()) {
if (onGoingRecoveries.get() > 0) {
throw new FlushNotAllowedEngineException(shardId, "Recovery is in progress, flush is not allowed");
}
try {
{ // commit and close the current writer - we write the current tanslog ID just in case
final long translogId = translog.currentId();
indexWriter.setCommitData(Collections.singletonMap(Translog.TRANSLOG_ID_KEY, Long.toString(translogId)));
indexWriter.commit();
indexWriter.rollback();
}
indexWriter = createWriter();
// commit on a just opened writer will commit even if there are no changes done to it
// we rely on that for the commit data translog id key
if (flushNeeded || force) {
flushNeeded = false;
long translogId = translogIdGenerator.incrementAndGet();
indexWriter.setCommitData(Collections.singletonMap(Translog.TRANSLOG_ID_KEY, Long.toString(translogId)));
indexWriter.commit();
translog.newTranslog(translogId);
}
SearcherManager current = this.searcherManager;
this.searcherManager = buildSearchManager(indexWriter);
versionMap.setManager(searcherManager);
try {
IOUtils.close(current);
} catch (Throwable t) {
logger.warn("Failed to close current SearcherManager", t);
}
maybePruneDeletedTombstones();
} catch (Throwable t) {
throw new FlushFailedEngineException(shardId, t);
}
}
} else if (type == FlushType.COMMIT_TRANSLOG) {
try (InternalLock _ = readLock.acquire()) { try (InternalLock _ = readLock.acquire()) {
final IndexWriter indexWriter = currentIndexWriter(); final IndexWriter indexWriter = currentIndexWriter();
if (onGoingRecoveries.get() > 0) { if (onGoingRecoveries.get() > 0) {

View File

@ -83,10 +83,8 @@ public class IndexDynamicSettingsModule extends AbstractModule {
indexDynamicSettings.addDynamicSetting(LogDocMergePolicyProvider.INDEX_MERGE_POLICY_MAX_MERGE_DOCS, Validator.POSITIVE_INTEGER); indexDynamicSettings.addDynamicSetting(LogDocMergePolicyProvider.INDEX_MERGE_POLICY_MAX_MERGE_DOCS, Validator.POSITIVE_INTEGER);
indexDynamicSettings.addDynamicSetting(LogDocMergePolicyProvider.INDEX_MERGE_POLICY_MERGE_FACTOR, Validator.INTEGER_GTE_2); indexDynamicSettings.addDynamicSetting(LogDocMergePolicyProvider.INDEX_MERGE_POLICY_MERGE_FACTOR, Validator.INTEGER_GTE_2);
indexDynamicSettings.addDynamicSetting(LogDocMergePolicyProvider.INDEX_COMPOUND_FORMAT); indexDynamicSettings.addDynamicSetting(LogDocMergePolicyProvider.INDEX_COMPOUND_FORMAT);
indexDynamicSettings.addDynamicSetting(EngineConfig.INDEX_CONCURRENCY_SETTING, Validator.NON_NEGATIVE_INTEGER);
indexDynamicSettings.addDynamicSetting(EngineConfig.INDEX_COMPOUND_ON_FLUSH, Validator.BOOLEAN); indexDynamicSettings.addDynamicSetting(EngineConfig.INDEX_COMPOUND_ON_FLUSH, Validator.BOOLEAN);
indexDynamicSettings.addDynamicSetting(EngineConfig.INDEX_GC_DELETES_SETTING, Validator.TIME); indexDynamicSettings.addDynamicSetting(EngineConfig.INDEX_GC_DELETES_SETTING, Validator.TIME);
indexDynamicSettings.addDynamicSetting(EngineConfig.INDEX_CODEC_SETTING);
indexDynamicSettings.addDynamicSetting(EngineConfig.INDEX_FAIL_ON_MERGE_FAILURE_SETTING); indexDynamicSettings.addDynamicSetting(EngineConfig.INDEX_FAIL_ON_MERGE_FAILURE_SETTING);
indexDynamicSettings.addDynamicSetting(EngineConfig.INDEX_FAIL_ON_CORRUPTION_SETTING); indexDynamicSettings.addDynamicSetting(EngineConfig.INDEX_FAIL_ON_CORRUPTION_SETTING);
indexDynamicSettings.addDynamicSetting(ShardSlowLogIndexingService.INDEX_INDEXING_SLOWLOG_THRESHOLD_INDEX_WARN, Validator.TIME); indexDynamicSettings.addDynamicSetting(ShardSlowLogIndexingService.INDEX_INDEXING_SLOWLOG_THRESHOLD_INDEX_WARN, Validator.TIME);

View File

@ -615,7 +615,7 @@ public class IndexShard extends AbstractIndexShardComponent {
logger.trace("flush with {}", request); logger.trace("flush with {}", request);
} }
long time = System.nanoTime(); long time = System.nanoTime();
engine().flush(request.full() ? Engine.FlushType.NEW_WRITER : Engine.FlushType.COMMIT_TRANSLOG, request.force(), request.waitIfOngoing()); engine().flush(Engine.FlushType.COMMIT_TRANSLOG, request.force(), request.waitIfOngoing());
flushMetric.inc(System.nanoTime() - time); flushMetric.inc(System.nanoTime() - time);
} }

View File

@ -55,7 +55,6 @@ public class RestFlushAction extends BaseRestHandler {
FlushRequest flushRequest = new FlushRequest(Strings.splitStringByCommaToArray(request.param("index"))); FlushRequest flushRequest = new FlushRequest(Strings.splitStringByCommaToArray(request.param("index")));
flushRequest.listenerThreaded(false); flushRequest.listenerThreaded(false);
flushRequest.indicesOptions(IndicesOptions.fromRequest(request, flushRequest.indicesOptions())); flushRequest.indicesOptions(IndicesOptions.fromRequest(request, flushRequest.indicesOptions()));
flushRequest.full(request.paramAsBoolean("full", flushRequest.full()));
flushRequest.force(request.paramAsBoolean("force", flushRequest.force())); flushRequest.force(request.paramAsBoolean("force", flushRequest.force()));
flushRequest.waitIfOngoing(request.paramAsBoolean("wait_if_ongoing", flushRequest.waitIfOngoing())); flushRequest.waitIfOngoing(request.paramAsBoolean("wait_if_ongoing", flushRequest.waitIfOngoing()));
client.admin().indices().flush(flushRequest, new RestBuilderListener<FlushResponse>(channel) { client.admin().indices().flush(flushRequest, new RestBuilderListener<FlushResponse>(channel) {

View File

@ -137,7 +137,7 @@ public class ScriptComparisonBenchmark {
} }
bulkRequest.execute().actionGet(); bulkRequest.execute().actionGet();
client.admin().indices().prepareRefresh("test").execute().actionGet(); client.admin().indices().prepareRefresh("test").execute().actionGet();
client.admin().indices().prepareFlush("test").setFull(true).execute().actionGet(); client.admin().indices().prepareFlush("test").execute().actionGet();
System.out.println("done"); System.out.println("done");
} }

View File

@ -234,7 +234,7 @@ public class BasicScriptBenchmark {
} }
bulkRequest.execute().actionGet(); bulkRequest.execute().actionGet();
client.admin().indices().prepareRefresh("test").execute().actionGet(); client.admin().indices().prepareRefresh("test").execute().actionGet();
client.admin().indices().prepareFlush("test").setFull(true).execute().actionGet(); client.admin().indices().prepareFlush("test").execute().actionGet();
System.out.println("Done indexing " + numDocs + " documents"); System.out.println("Done indexing " + numDocs + " documents");
} }

View File

@ -19,6 +19,7 @@
package org.elasticsearch.index.engine.internal; package org.elasticsearch.index.engine.internal;
import com.carrotsearch.ant.tasks.junit4.dependencies.com.google.common.collect.Sets;
import com.google.common.base.Predicate; import com.google.common.base.Predicate;
import org.apache.log4j.AppenderSkeleton; import org.apache.log4j.AppenderSkeleton;
import org.apache.log4j.Level; import org.apache.log4j.Level;
@ -26,6 +27,7 @@ import org.apache.log4j.LogManager;
import org.apache.log4j.Logger; import org.apache.log4j.Logger;
import org.apache.log4j.spi.LoggingEvent; import org.apache.log4j.spi.LoggingEvent;
import org.apache.lucene.analysis.Analyzer; import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.codecs.Codec;
import org.apache.lucene.document.Field; import org.apache.lucene.document.Field;
import org.apache.lucene.document.NumericDocValuesField; import org.apache.lucene.document.NumericDocValuesField;
import org.apache.lucene.document.TextField; import org.apache.lucene.document.TextField;
@ -35,6 +37,7 @@ import org.apache.lucene.search.TermQuery;
import org.apache.lucene.search.TopDocs; import org.apache.lucene.search.TopDocs;
import org.apache.lucene.store.Directory; import org.apache.lucene.store.Directory;
import org.apache.lucene.store.MockDirectoryWrapper; import org.apache.lucene.store.MockDirectoryWrapper;
import org.apache.lucene.util.LuceneTestCase;
import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.Version; import org.elasticsearch.Version;
import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.IndexMetaData;
@ -42,6 +45,7 @@ import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.lucene.LuceneTest;
import org.elasticsearch.common.lucene.uid.Versions; import org.elasticsearch.common.lucene.uid.Versions;
import org.elasticsearch.common.settings.ImmutableSettings; import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
@ -121,14 +125,28 @@ public class InternalEngineTests extends ElasticsearchLuceneTestCase {
private IndexSettingsService replicaSettingsService; private IndexSettingsService replicaSettingsService;
private Settings defaultSettings; private Settings defaultSettings;
private int indexConcurrency;
private String codecName;
@Before @Before
public void setUp() throws Exception { public void setUp() throws Exception {
super.setUp(); super.setUp();
CodecService codecService = new CodecService(shardId.index());
indexConcurrency = randomIntBetween(1, 20);
String name = Codec.getDefault().getName();
if (Arrays.asList(codecService.availableCodecs()).contains(name)) {
// some codecs are read only so we only take the ones that we have in the service and randomly
// selected by lucene test case.
codecName = name;
} else {
codecName = "default";
}
defaultSettings = ImmutableSettings.builder() defaultSettings = ImmutableSettings.builder()
.put(EngineConfig.INDEX_COMPOUND_ON_FLUSH, randomBoolean()) .put(EngineConfig.INDEX_COMPOUND_ON_FLUSH, randomBoolean())
.put(EngineConfig.INDEX_GC_DELETES_SETTING, "1h") // make sure this doesn't kick in on us .put(EngineConfig.INDEX_GC_DELETES_SETTING, "1h") // make sure this doesn't kick in on us
.put(EngineConfig.INDEX_FAIL_ON_CORRUPTION_SETTING, randomBoolean()) .put(EngineConfig.INDEX_FAIL_ON_CORRUPTION_SETTING, randomBoolean())
.put(EngineConfig.INDEX_CODEC_SETTING, codecName)
.put(EngineConfig.INDEX_CONCURRENCY_SETTING, indexConcurrency)
.build(); // TODO randomize more settings .build(); // TODO randomize more settings
threadPool = new ThreadPool(getClass().getName()); threadPool = new ThreadPool(getClass().getName());
store = createStore(); store = createStore();
@ -138,12 +156,20 @@ public class InternalEngineTests extends ElasticsearchLuceneTestCase {
engineSettingsService = new IndexSettingsService(shardId.index(), ImmutableSettings.builder().put(defaultSettings).put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT).build()); engineSettingsService = new IndexSettingsService(shardId.index(), ImmutableSettings.builder().put(defaultSettings).put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT).build());
translog = createTranslog(); translog = createTranslog();
engine = createEngine(engineSettingsService, store, translog); engine = createEngine(engineSettingsService, store, translog);
LiveIndexWriterConfig currentIndexWriterConfig = ((InternalEngine)engine).getCurrentIndexWriterConfig();
assertEquals(((InternalEngine)engine).config().getCodec().getName(), codecService.codec(codecName).getName());
assertEquals(currentIndexWriterConfig.getCodec().getName(), codecService.codec(codecName).getName());
if (randomBoolean()) { if (randomBoolean()) {
((InternalEngine)engine).config().setEnableGcDeletes(false); ((InternalEngine)engine).config().setEnableGcDeletes(false);
} }
replicaSettingsService = new IndexSettingsService(shardId.index(), ImmutableSettings.builder().put(defaultSettings).put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT).build()); replicaSettingsService = new IndexSettingsService(shardId.index(), ImmutableSettings.builder().put(defaultSettings).put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT).build());
replicaTranslog = createTranslogReplica(); replicaTranslog = createTranslogReplica();
replicaEngine = createEngine(replicaSettingsService, storeReplica, replicaTranslog); replicaEngine = createEngine(replicaSettingsService, storeReplica, replicaTranslog);
currentIndexWriterConfig = ((InternalEngine)replicaEngine).getCurrentIndexWriterConfig();
assertEquals(((InternalEngine)replicaEngine).config().getCodec().getName(), codecService.codec(codecName).getName());
assertEquals(currentIndexWriterConfig.getCodec().getName(), codecService.codec(codecName).getName());
if (randomBoolean()) { if (randomBoolean()) {
((InternalEngine)engine).config().setEnableGcDeletes(false); ((InternalEngine)engine).config().setEnableGcDeletes(false);
@ -1511,54 +1537,46 @@ public class InternalEngineTests extends ElasticsearchLuceneTestCase {
@Test @Test
public void testSettings() { public void testSettings() {
InternalEngine engine = (InternalEngine) this.engine;
CodecService codecService = new CodecService(shardId.index());
LiveIndexWriterConfig currentIndexWriterConfig = engine.getCurrentIndexWriterConfig();
assertEquals(engine.config().getCodec().getName(), codecService.codec(codecName).getName());
assertEquals(currentIndexWriterConfig.getCodec().getName(), codecService.codec(codecName).getName());
assertEquals(engine.config().getIndexConcurrency(), indexConcurrency);
assertEquals(currentIndexWriterConfig.getMaxThreadStates(), indexConcurrency);
IndexDynamicSettingsModule settings = new IndexDynamicSettingsModule(); IndexDynamicSettingsModule settings = new IndexDynamicSettingsModule();
assertTrue(settings.containsSetting(EngineConfig.INDEX_FAIL_ON_CORRUPTION_SETTING)); assertTrue(settings.containsSetting(EngineConfig.INDEX_FAIL_ON_CORRUPTION_SETTING));
assertTrue(settings.containsSetting(EngineConfig.INDEX_COMPOUND_ON_FLUSH)); assertTrue(settings.containsSetting(EngineConfig.INDEX_COMPOUND_ON_FLUSH));
assertTrue(settings.containsSetting(EngineConfig.INDEX_GC_DELETES_SETTING)); assertTrue(settings.containsSetting(EngineConfig.INDEX_GC_DELETES_SETTING));
assertTrue(settings.containsSetting(EngineConfig.INDEX_CODEC_SETTING));
assertTrue(settings.containsSetting(EngineConfig.INDEX_FAIL_ON_MERGE_FAILURE_SETTING)); assertTrue(settings.containsSetting(EngineConfig.INDEX_FAIL_ON_MERGE_FAILURE_SETTING));
assertTrue(settings.containsSetting(EngineConfig.INDEX_CONCURRENCY_SETTING));
InternalEngine engine = (InternalEngine) this.engine;
CodecService codecService = new CodecService(shardId.index());
final int iters = between(1, 20); final int iters = between(1, 20);
for (int i = 0; i < iters; i++) { for (int i = 0; i < iters; i++) {
boolean compoundOnFlush = randomBoolean(); boolean compoundOnFlush = randomBoolean();
boolean failOnCorruption = randomBoolean(); boolean failOnCorruption = randomBoolean();
boolean failOnMerge = randomBoolean(); boolean failOnMerge = randomBoolean();
long gcDeletes = Math.max(0, randomLong()); long gcDeletes = Math.max(0, randomLong());
int indexConcurrency = randomIntBetween(1, 20);
String codecName = randomFrom(codecService.availableCodecs());
Settings build = ImmutableSettings.builder() Settings build = ImmutableSettings.builder()
.put(EngineConfig.INDEX_FAIL_ON_CORRUPTION_SETTING, failOnCorruption) .put(EngineConfig.INDEX_FAIL_ON_CORRUPTION_SETTING, failOnCorruption)
.put(EngineConfig.INDEX_COMPOUND_ON_FLUSH, compoundOnFlush) .put(EngineConfig.INDEX_COMPOUND_ON_FLUSH, compoundOnFlush)
.put(EngineConfig.INDEX_GC_DELETES_SETTING, gcDeletes) .put(EngineConfig.INDEX_GC_DELETES_SETTING, gcDeletes)
.put(EngineConfig.INDEX_CODEC_SETTING, codecName)
.put(EngineConfig.INDEX_FAIL_ON_MERGE_FAILURE_SETTING, failOnMerge) .put(EngineConfig.INDEX_FAIL_ON_MERGE_FAILURE_SETTING, failOnMerge)
.put(EngineConfig.INDEX_CONCURRENCY_SETTING, indexConcurrency)
.build(); .build();
engineSettingsService.refreshSettings(build); engineSettingsService.refreshSettings(build);
LiveIndexWriterConfig currentIndexWriterConfig = engine.getCurrentIndexWriterConfig(); currentIndexWriterConfig = engine.getCurrentIndexWriterConfig();
assertEquals(engine.config().isCompoundOnFlush(), compoundOnFlush); assertEquals(engine.config().isCompoundOnFlush(), compoundOnFlush);
assertEquals(currentIndexWriterConfig.getUseCompoundFile(), compoundOnFlush); assertEquals(currentIndexWriterConfig.getUseCompoundFile(), compoundOnFlush);
assertEquals(engine.config().getGcDeletesInMillis(), gcDeletes); assertEquals(engine.config().getGcDeletesInMillis(), gcDeletes);
assertEquals(engine.getGcDeletesInMillis(), gcDeletes); assertEquals(engine.getGcDeletesInMillis(), gcDeletes);
assertEquals(engine.config().getCodec().getName(), codecService.codec(codecName).getName());
assertEquals(currentIndexWriterConfig.getCodec().getName(), codecService.codec(codecName).getName());
assertEquals(engine.config().isFailEngineOnCorruption(), failOnCorruption); assertEquals(engine.config().isFailEngineOnCorruption(), failOnCorruption);
assertEquals(engine.config().isFailOnMergeFailure(), failOnMerge); // only on the holder assertEquals(engine.config().isFailOnMergeFailure(), failOnMerge); // only on the holder
assertEquals(engine.config().getIndexConcurrency(), indexConcurrency);
assertEquals(currentIndexWriterConfig.getMaxThreadStates(), indexConcurrency);
} }
} }

View File

@ -165,7 +165,7 @@ public class SimpleTTLTests extends ElasticsearchIntegrationTest {
@Override @Override
public boolean apply(Object input) { public boolean apply(Object input) {
if (rarely()) { if (rarely()) {
client().admin().indices().prepareFlush("test").setFull(true).get(); client().admin().indices().prepareFlush("test").get();
} else if (rarely()) { } else if (rarely()) {
client().admin().indices().prepareOptimize("test").setMaxNumSegments(1).get(); client().admin().indices().prepareOptimize("test").setMaxNumSegments(1).get();
} }