[Engine] `index.fail_on_corruption` is not updateable
The `index.fail_on_corruption` was not updateable via the index settings API. This commit also fixed the setting prefix to be consistent with other setting on the engine. Yet, this feature is unreleased so this won't break anything. Closes #6941
This commit is contained in:
parent
abeefbddea
commit
d65a9d63a2
|
@ -210,7 +210,7 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
|
||||||
this.optimizeAutoGenerateId = indexSettings.getAsBoolean("index.optimize_auto_generated_id", true);
|
this.optimizeAutoGenerateId = indexSettings.getAsBoolean("index.optimize_auto_generated_id", true);
|
||||||
|
|
||||||
this.indexSettingsService.addListener(applySettings);
|
this.indexSettingsService.addListener(applySettings);
|
||||||
this.failEngineOnCorruption = indexSettings.getAsBoolean(ENGINE_FAIL_ON_CORRUPTION, true);
|
this.failEngineOnCorruption = indexSettings.getAsBoolean(INDEX_FAIL_ON_CORRUPTION, true);
|
||||||
this.failOnMergeFailure = indexSettings.getAsBoolean(INDEX_FAIL_ON_MERGE_FAILURE, true);
|
this.failOnMergeFailure = indexSettings.getAsBoolean(INDEX_FAIL_ON_MERGE_FAILURE, true);
|
||||||
if (failOnMergeFailure) {
|
if (failOnMergeFailure) {
|
||||||
this.mergeScheduler.addFailureListener(new FailEngineOnMergeFailure());
|
this.mergeScheduler.addFailureListener(new FailEngineOnMergeFailure());
|
||||||
|
@ -1107,7 +1107,7 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
|
||||||
if (this.failEngineOnCorruption) {
|
if (this.failEngineOnCorruption) {
|
||||||
failEngine("corrupt file detected source: [" + source + "]", t);
|
failEngine("corrupt file detected source: [" + source + "]", t);
|
||||||
} else {
|
} else {
|
||||||
logger.warn("corrupt file detected source: [{}] but [{}] is set to [{}]", t, source, ENGINE_FAIL_ON_CORRUPTION, this.failEngineOnCorruption);
|
logger.warn("corrupt file detected source: [{}] but [{}] is set to [{}]", t, source, INDEX_FAIL_ON_CORRUPTION, this.failEngineOnCorruption);
|
||||||
}
|
}
|
||||||
}else if (ExceptionsHelper.isOOM(t)) {
|
}else if (ExceptionsHelper.isOOM(t)) {
|
||||||
failEngine("out of memory", t);
|
failEngine("out of memory", t);
|
||||||
|
@ -1265,7 +1265,7 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
|
||||||
if (failEngineOnCorruption) {
|
if (failEngineOnCorruption) {
|
||||||
failEngine("corrupt file detected source: [merge]", e);
|
failEngine("corrupt file detected source: [merge]", e);
|
||||||
} else {
|
} else {
|
||||||
logger.warn("corrupt file detected source: [merge] but [{}] is set to [{}]", e, ENGINE_FAIL_ON_CORRUPTION, failEngineOnCorruption);
|
logger.warn("corrupt file detected source: [merge] but [{}] is set to [{}]", e, INDEX_FAIL_ON_CORRUPTION, failEngineOnCorruption);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
failEngine("merge exception", e);
|
failEngine("merge exception", e);
|
||||||
|
@ -1405,7 +1405,7 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
|
||||||
public static final String INDEX_COMPOUND_ON_FLUSH = "index.compound_on_flush";
|
public static final String INDEX_COMPOUND_ON_FLUSH = "index.compound_on_flush";
|
||||||
public static final String INDEX_GC_DELETES = "index.gc_deletes";
|
public static final String INDEX_GC_DELETES = "index.gc_deletes";
|
||||||
public static final String INDEX_FAIL_ON_MERGE_FAILURE = "index.fail_on_merge_failure";
|
public static final String INDEX_FAIL_ON_MERGE_FAILURE = "index.fail_on_merge_failure";
|
||||||
public static final String ENGINE_FAIL_ON_CORRUPTION = "index.fail_on_corruption";
|
public static final String INDEX_FAIL_ON_CORRUPTION = "index.fail_on_corruption";
|
||||||
|
|
||||||
|
|
||||||
class ApplySettings implements IndexSettingsService.Listener {
|
class ApplySettings implements IndexSettingsService.Listener {
|
||||||
|
@ -1425,7 +1425,7 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
|
||||||
indexWriter.getConfig().setUseCompoundFile(compoundOnFlush);
|
indexWriter.getConfig().setUseCompoundFile(compoundOnFlush);
|
||||||
}
|
}
|
||||||
|
|
||||||
InternalEngine.this.failEngineOnCorruption = indexSettings.getAsBoolean(ENGINE_FAIL_ON_CORRUPTION, InternalEngine.this.failEngineOnCorruption);
|
InternalEngine.this.failEngineOnCorruption = settings.getAsBoolean(INDEX_FAIL_ON_CORRUPTION, InternalEngine.this.failEngineOnCorruption);
|
||||||
int indexConcurrency = settings.getAsInt(INDEX_INDEX_CONCURRENCY, InternalEngine.this.indexConcurrency);
|
int indexConcurrency = settings.getAsInt(INDEX_INDEX_CONCURRENCY, InternalEngine.this.indexConcurrency);
|
||||||
boolean failOnMergeFailure = settings.getAsBoolean(INDEX_FAIL_ON_MERGE_FAILURE, InternalEngine.this.failOnMergeFailure);
|
boolean failOnMergeFailure = settings.getAsBoolean(INDEX_FAIL_ON_MERGE_FAILURE, InternalEngine.this.failOnMergeFailure);
|
||||||
String codecName = settings.get(INDEX_CODEC, InternalEngine.this.codecName);
|
String codecName = settings.get(INDEX_CODEC, InternalEngine.this.codecName);
|
||||||
|
|
|
@ -84,6 +84,7 @@ public class IndexDynamicSettingsModule extends AbstractModule {
|
||||||
indexDynamicSettings.addDynamicSetting(InternalEngine.INDEX_GC_DELETES, Validator.TIME);
|
indexDynamicSettings.addDynamicSetting(InternalEngine.INDEX_GC_DELETES, Validator.TIME);
|
||||||
indexDynamicSettings.addDynamicSetting(InternalEngine.INDEX_CODEC);
|
indexDynamicSettings.addDynamicSetting(InternalEngine.INDEX_CODEC);
|
||||||
indexDynamicSettings.addDynamicSetting(InternalEngine.INDEX_FAIL_ON_MERGE_FAILURE);
|
indexDynamicSettings.addDynamicSetting(InternalEngine.INDEX_FAIL_ON_MERGE_FAILURE);
|
||||||
|
indexDynamicSettings.addDynamicSetting(InternalEngine.INDEX_FAIL_ON_CORRUPTION);
|
||||||
indexDynamicSettings.addDynamicSetting(ShardSlowLogIndexingService.INDEX_INDEXING_SLOWLOG_THRESHOLD_INDEX_WARN, Validator.TIME);
|
indexDynamicSettings.addDynamicSetting(ShardSlowLogIndexingService.INDEX_INDEXING_SLOWLOG_THRESHOLD_INDEX_WARN, Validator.TIME);
|
||||||
indexDynamicSettings.addDynamicSetting(ShardSlowLogIndexingService.INDEX_INDEXING_SLOWLOG_THRESHOLD_INDEX_INFO, Validator.TIME);
|
indexDynamicSettings.addDynamicSetting(ShardSlowLogIndexingService.INDEX_INDEXING_SLOWLOG_THRESHOLD_INDEX_INFO, Validator.TIME);
|
||||||
indexDynamicSettings.addDynamicSetting(ShardSlowLogIndexingService.INDEX_INDEXING_SLOWLOG_THRESHOLD_INDEX_DEBUG, Validator.TIME);
|
indexDynamicSettings.addDynamicSetting(ShardSlowLogIndexingService.INDEX_INDEXING_SLOWLOG_THRESHOLD_INDEX_DEBUG, Validator.TIME);
|
||||||
|
|
|
@ -114,7 +114,7 @@ public class InternalEngineTests extends ElasticsearchTestCase {
|
||||||
defaultSettings = ImmutableSettings.builder()
|
defaultSettings = ImmutableSettings.builder()
|
||||||
.put(InternalEngine.INDEX_COMPOUND_ON_FLUSH, getRandom().nextBoolean())
|
.put(InternalEngine.INDEX_COMPOUND_ON_FLUSH, getRandom().nextBoolean())
|
||||||
.put(InternalEngine.INDEX_GC_DELETES, "1h") // make sure this doesn't kick in on us
|
.put(InternalEngine.INDEX_GC_DELETES, "1h") // make sure this doesn't kick in on us
|
||||||
.put(InternalEngine.ENGINE_FAIL_ON_CORRUPTION, randomBoolean())
|
.put(InternalEngine.INDEX_FAIL_ON_CORRUPTION, randomBoolean())
|
||||||
.build(); // TODO randomize more settings
|
.build(); // TODO randomize more settings
|
||||||
threadPool = new ThreadPool(getClass().getName());
|
threadPool = new ThreadPool(getClass().getName());
|
||||||
store = createStore();
|
store = createStore();
|
||||||
|
@ -607,7 +607,7 @@ public class InternalEngineTests extends ElasticsearchTestCase {
|
||||||
ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocumentWithTextField(), Lucene.STANDARD_ANALYZER, B_1, false);
|
ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocumentWithTextField(), Lucene.STANDARD_ANALYZER, B_1, false);
|
||||||
engine.create(new Engine.Create(null, newUid("1"), doc));
|
engine.create(new Engine.Create(null, newUid("1"), doc));
|
||||||
engine.flush(new Engine.Flush());
|
engine.flush(new Engine.Flush());
|
||||||
final boolean failEngine = defaultSettings.getAsBoolean(InternalEngine.ENGINE_FAIL_ON_CORRUPTION, false);
|
final boolean failEngine = defaultSettings.getAsBoolean(InternalEngine.INDEX_FAIL_ON_CORRUPTION, false);
|
||||||
final int failInPhase = randomIntBetween(1,3);
|
final int failInPhase = randomIntBetween(1,3);
|
||||||
try {
|
try {
|
||||||
engine.recover(new Engine.RecoveryHandler() {
|
engine.recover(new Engine.RecoveryHandler() {
|
||||||
|
|
|
@ -107,13 +107,18 @@ public class CorruptedFileTest extends ElasticsearchIntegrationTest {
|
||||||
}
|
}
|
||||||
assertThat(cluster().numDataNodes(), greaterThanOrEqualTo(3));
|
assertThat(cluster().numDataNodes(), greaterThanOrEqualTo(3));
|
||||||
|
|
||||||
|
final boolean failOnCorruption = randomBoolean();
|
||||||
assertAcked(prepareCreate("test").setSettings(ImmutableSettings.builder()
|
assertAcked(prepareCreate("test").setSettings(ImmutableSettings.builder()
|
||||||
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, "1")
|
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, "1")
|
||||||
.put(MergePolicyModule.MERGE_POLICY_TYPE_KEY, NoMergePolicyProvider.class)
|
.put(MergePolicyModule.MERGE_POLICY_TYPE_KEY, NoMergePolicyProvider.class)
|
||||||
.put(MockFSDirectoryService.CHECK_INDEX_ON_CLOSE, false) // no checkindex - we corrupt shards on purpose
|
.put(MockFSDirectoryService.CHECK_INDEX_ON_CLOSE, false) // no checkindex - we corrupt shards on purpose
|
||||||
.put(InternalEngine.ENGINE_FAIL_ON_CORRUPTION, true)
|
.put(InternalEngine.INDEX_FAIL_ON_CORRUPTION, failOnCorruption)
|
||||||
.put("indices.recovery.concurrent_streams", 10)
|
.put("indices.recovery.concurrent_streams", 10)
|
||||||
));
|
));
|
||||||
|
if (failOnCorruption == false) { // test the dynamic setting
|
||||||
|
client().admin().indices().prepareUpdateSettings("test").setSettings(ImmutableSettings.builder()
|
||||||
|
.put(InternalEngine.INDEX_FAIL_ON_CORRUPTION, true)).get();
|
||||||
|
}
|
||||||
ensureGreen();
|
ensureGreen();
|
||||||
disableAllocation("test");
|
disableAllocation("test");
|
||||||
IndexRequestBuilder[] builders = new IndexRequestBuilder[numDocs];
|
IndexRequestBuilder[] builders = new IndexRequestBuilder[numDocs];
|
||||||
|
@ -215,7 +220,7 @@ public class CorruptedFileTest extends ElasticsearchIntegrationTest {
|
||||||
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, "0")
|
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, "0")
|
||||||
.put(MergePolicyModule.MERGE_POLICY_TYPE_KEY, NoMergePolicyProvider.class)
|
.put(MergePolicyModule.MERGE_POLICY_TYPE_KEY, NoMergePolicyProvider.class)
|
||||||
.put(MockFSDirectoryService.CHECK_INDEX_ON_CLOSE, false) // no checkindex - we corrupt shards on purpose
|
.put(MockFSDirectoryService.CHECK_INDEX_ON_CLOSE, false) // no checkindex - we corrupt shards on purpose
|
||||||
.put(InternalEngine.ENGINE_FAIL_ON_CORRUPTION, true)
|
.put(InternalEngine.INDEX_FAIL_ON_CORRUPTION, true)
|
||||||
.put("indices.recovery.concurrent_streams", 10)
|
.put("indices.recovery.concurrent_streams", 10)
|
||||||
));
|
));
|
||||||
ensureGreen();
|
ensureGreen();
|
||||||
|
@ -303,7 +308,7 @@ public class CorruptedFileTest extends ElasticsearchIntegrationTest {
|
||||||
|
|
||||||
assertAcked(prepareCreate("test").setSettings(ImmutableSettings.builder()
|
assertAcked(prepareCreate("test").setSettings(ImmutableSettings.builder()
|
||||||
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, "0")
|
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, "0")
|
||||||
.put(InternalEngine.ENGINE_FAIL_ON_CORRUPTION, true)
|
.put(InternalEngine.INDEX_FAIL_ON_CORRUPTION, true)
|
||||||
.put("index.routing.allocation.include._name", primariesNode.getNode().name())
|
.put("index.routing.allocation.include._name", primariesNode.getNode().name())
|
||||||
.put("indices.recovery.concurrent_streams", 10)
|
.put("indices.recovery.concurrent_streams", 10)
|
||||||
));
|
));
|
||||||
|
|
Loading…
Reference in New Issue