[Engine] Verify checksums on merge
Enable lucene verification of checksums on segments before merging them. This prevents corruption from existing segments from silently slipping into newer merged segments. Closes #7360
This commit is contained in:
parent
b00424aba7
commit
1711041c57
|
@ -97,6 +97,7 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
|
||||||
private volatile ByteSizeValue indexingBufferSize;
|
private volatile ByteSizeValue indexingBufferSize;
|
||||||
private volatile int indexConcurrency;
|
private volatile int indexConcurrency;
|
||||||
private volatile boolean compoundOnFlush = true;
|
private volatile boolean compoundOnFlush = true;
|
||||||
|
private volatile boolean checksumOnMerge = true;
|
||||||
|
|
||||||
private long gcDeletesInMillis;
|
private long gcDeletesInMillis;
|
||||||
|
|
||||||
|
@ -200,6 +201,7 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
|
||||||
this.similarityService = similarityService;
|
this.similarityService = similarityService;
|
||||||
this.codecService = codecService;
|
this.codecService = codecService;
|
||||||
this.compoundOnFlush = indexSettings.getAsBoolean(INDEX_COMPOUND_ON_FLUSH, this.compoundOnFlush);
|
this.compoundOnFlush = indexSettings.getAsBoolean(INDEX_COMPOUND_ON_FLUSH, this.compoundOnFlush);
|
||||||
|
this.checksumOnMerge = indexSettings.getAsBoolean(INDEX_CHECKSUM_ON_MERGE, this.checksumOnMerge);
|
||||||
this.indexConcurrency = indexSettings.getAsInt(INDEX_INDEX_CONCURRENCY, Math.max(IndexWriterConfig.DEFAULT_MAX_THREAD_STATES, (int) (EsExecutors.boundedNumberOfProcessors(indexSettings) * 0.65)));
|
this.indexConcurrency = indexSettings.getAsInt(INDEX_INDEX_CONCURRENCY, Math.max(IndexWriterConfig.DEFAULT_MAX_THREAD_STATES, (int) (EsExecutors.boundedNumberOfProcessors(indexSettings) * 0.65)));
|
||||||
this.versionMap = new LiveVersionMap();
|
this.versionMap = new LiveVersionMap();
|
||||||
this.dirtyLocks = new Object[indexConcurrency * 50]; // we multiply it to have enough...
|
this.dirtyLocks = new Object[indexConcurrency * 50]; // we multiply it to have enough...
|
||||||
|
@ -1397,6 +1399,7 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
|
||||||
* in combination with the default writelock timeout*/
|
* in combination with the default writelock timeout*/
|
||||||
config.setWriteLockTimeout(5000);
|
config.setWriteLockTimeout(5000);
|
||||||
config.setUseCompoundFile(this.compoundOnFlush);
|
config.setUseCompoundFile(this.compoundOnFlush);
|
||||||
|
config.setCheckIntegrityAtMerge(checksumOnMerge);
|
||||||
// Warm-up hook for newly-merged segments. Warming up segments here is better since it will be performed at the end
|
// Warm-up hook for newly-merged segments. Warming up segments here is better since it will be performed at the end
|
||||||
// of the merge operation and won't slow down _refresh
|
// of the merge operation and won't slow down _refresh
|
||||||
config.setMergedSegmentWarmer(new IndexReaderWarmer() {
|
config.setMergedSegmentWarmer(new IndexReaderWarmer() {
|
||||||
|
@ -1431,6 +1434,7 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
|
||||||
|
|
||||||
public static final String INDEX_INDEX_CONCURRENCY = "index.index_concurrency";
|
public static final String INDEX_INDEX_CONCURRENCY = "index.index_concurrency";
|
||||||
public static final String INDEX_COMPOUND_ON_FLUSH = "index.compound_on_flush";
|
public static final String INDEX_COMPOUND_ON_FLUSH = "index.compound_on_flush";
|
||||||
|
public static final String INDEX_CHECKSUM_ON_MERGE = "index.checksum_on_merge";
|
||||||
public static final String INDEX_GC_DELETES = "index.gc_deletes";
|
public static final String INDEX_GC_DELETES = "index.gc_deletes";
|
||||||
public static final String INDEX_FAIL_ON_MERGE_FAILURE = "index.fail_on_merge_failure";
|
public static final String INDEX_FAIL_ON_MERGE_FAILURE = "index.fail_on_merge_failure";
|
||||||
public static final String INDEX_FAIL_ON_CORRUPTION = "index.fail_on_corruption";
|
public static final String INDEX_FAIL_ON_CORRUPTION = "index.fail_on_corruption";
|
||||||
|
@ -1453,6 +1457,13 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
|
||||||
indexWriter.getConfig().setUseCompoundFile(compoundOnFlush);
|
indexWriter.getConfig().setUseCompoundFile(compoundOnFlush);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
final boolean checksumOnMerge = settings.getAsBoolean(INDEX_CHECKSUM_ON_MERGE, InternalEngine.this.checksumOnMerge);
|
||||||
|
if (checksumOnMerge != InternalEngine.this.checksumOnMerge) {
|
||||||
|
logger.info("updating {} from [{}] to [{}]", InternalEngine.INDEX_CHECKSUM_ON_MERGE, InternalEngine.this.checksumOnMerge, checksumOnMerge);
|
||||||
|
InternalEngine.this.checksumOnMerge = checksumOnMerge;
|
||||||
|
indexWriter.getConfig().setCheckIntegrityAtMerge(checksumOnMerge);
|
||||||
|
}
|
||||||
|
|
||||||
InternalEngine.this.failEngineOnCorruption = settings.getAsBoolean(INDEX_FAIL_ON_CORRUPTION, InternalEngine.this.failEngineOnCorruption);
|
InternalEngine.this.failEngineOnCorruption = settings.getAsBoolean(INDEX_FAIL_ON_CORRUPTION, InternalEngine.this.failEngineOnCorruption);
|
||||||
int indexConcurrency = settings.getAsInt(INDEX_INDEX_CONCURRENCY, InternalEngine.this.indexConcurrency);
|
int indexConcurrency = settings.getAsInt(INDEX_INDEX_CONCURRENCY, InternalEngine.this.indexConcurrency);
|
||||||
boolean failOnMergeFailure = settings.getAsBoolean(INDEX_FAIL_ON_MERGE_FAILURE, InternalEngine.this.failOnMergeFailure);
|
boolean failOnMergeFailure = settings.getAsBoolean(INDEX_FAIL_ON_MERGE_FAILURE, InternalEngine.this.failOnMergeFailure);
|
||||||
|
|
|
@ -89,6 +89,7 @@ public class IndexDynamicSettingsModule extends AbstractModule {
|
||||||
indexDynamicSettings.addDynamicSetting(InternalEngine.INDEX_CODEC);
|
indexDynamicSettings.addDynamicSetting(InternalEngine.INDEX_CODEC);
|
||||||
indexDynamicSettings.addDynamicSetting(InternalEngine.INDEX_FAIL_ON_MERGE_FAILURE);
|
indexDynamicSettings.addDynamicSetting(InternalEngine.INDEX_FAIL_ON_MERGE_FAILURE);
|
||||||
indexDynamicSettings.addDynamicSetting(InternalEngine.INDEX_FAIL_ON_CORRUPTION);
|
indexDynamicSettings.addDynamicSetting(InternalEngine.INDEX_FAIL_ON_CORRUPTION);
|
||||||
|
indexDynamicSettings.addDynamicSetting(InternalEngine.INDEX_CHECKSUM_ON_MERGE, Validator.BOOLEAN);
|
||||||
indexDynamicSettings.addDynamicSetting(ShardSlowLogIndexingService.INDEX_INDEXING_SLOWLOG_THRESHOLD_INDEX_WARN, Validator.TIME);
|
indexDynamicSettings.addDynamicSetting(ShardSlowLogIndexingService.INDEX_INDEXING_SLOWLOG_THRESHOLD_INDEX_WARN, Validator.TIME);
|
||||||
indexDynamicSettings.addDynamicSetting(ShardSlowLogIndexingService.INDEX_INDEXING_SLOWLOG_THRESHOLD_INDEX_INFO, Validator.TIME);
|
indexDynamicSettings.addDynamicSetting(ShardSlowLogIndexingService.INDEX_INDEXING_SLOWLOG_THRESHOLD_INDEX_INFO, Validator.TIME);
|
||||||
indexDynamicSettings.addDynamicSetting(ShardSlowLogIndexingService.INDEX_INDEXING_SLOWLOG_THRESHOLD_INDEX_DEBUG, Validator.TIME);
|
indexDynamicSettings.addDynamicSetting(ShardSlowLogIndexingService.INDEX_INDEXING_SLOWLOG_THRESHOLD_INDEX_DEBUG, Validator.TIME);
|
||||||
|
|
|
@ -34,6 +34,13 @@ public class InternalEngineSettingsTest extends ElasticsearchSingleNodeTest {
|
||||||
assertThat(engine(service).currentIndexWriterConfig().getUseCompoundFile(), is(false));
|
assertThat(engine(service).currentIndexWriterConfig().getUseCompoundFile(), is(false));
|
||||||
client().admin().indices().prepareUpdateSettings("foo").setSettings(ImmutableSettings.builder().put(InternalEngine.INDEX_COMPOUND_ON_FLUSH, true).build()).get();
|
client().admin().indices().prepareUpdateSettings("foo").setSettings(ImmutableSettings.builder().put(InternalEngine.INDEX_COMPOUND_ON_FLUSH, true).build()).get();
|
||||||
assertThat(engine(service).currentIndexWriterConfig().getUseCompoundFile(), is(true));
|
assertThat(engine(service).currentIndexWriterConfig().getUseCompoundFile(), is(true));
|
||||||
|
|
||||||
|
// INDEX_CHECKSUM_ON_MERGE
|
||||||
|
assertThat(engine(service).currentIndexWriterConfig().getCheckIntegrityAtMerge(), is(true));
|
||||||
|
client().admin().indices().prepareUpdateSettings("foo").setSettings(ImmutableSettings.builder().put(InternalEngine.INDEX_CHECKSUM_ON_MERGE, false).build()).get();
|
||||||
|
assertThat(engine(service).currentIndexWriterConfig().getCheckIntegrityAtMerge(), is(false));
|
||||||
|
client().admin().indices().prepareUpdateSettings("foo").setSettings(ImmutableSettings.builder().put(InternalEngine.INDEX_CHECKSUM_ON_MERGE, true).build()).get();
|
||||||
|
assertThat(engine(service).currentIndexWriterConfig().getCheckIntegrityAtMerge(), is(true));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -115,6 +115,7 @@ public class InternalEngineTests extends ElasticsearchTestCase {
|
||||||
super.setUp();
|
super.setUp();
|
||||||
defaultSettings = ImmutableSettings.builder()
|
defaultSettings = ImmutableSettings.builder()
|
||||||
.put(InternalEngine.INDEX_COMPOUND_ON_FLUSH, getRandom().nextBoolean())
|
.put(InternalEngine.INDEX_COMPOUND_ON_FLUSH, getRandom().nextBoolean())
|
||||||
|
.put(InternalEngine.INDEX_CHECKSUM_ON_MERGE, getRandom().nextBoolean())
|
||||||
.put(InternalEngine.INDEX_GC_DELETES, "1h") // make sure this doesn't kick in on us
|
.put(InternalEngine.INDEX_GC_DELETES, "1h") // make sure this doesn't kick in on us
|
||||||
.put(InternalEngine.INDEX_FAIL_ON_CORRUPTION, randomBoolean())
|
.put(InternalEngine.INDEX_FAIL_ON_CORRUPTION, randomBoolean())
|
||||||
.build(); // TODO randomize more settings
|
.build(); // TODO randomize more settings
|
||||||
|
|
Loading…
Reference in New Issue