Update Settings: Allow to dynamically change `index.term_index_interval` and `index.term_index_divisor, closes #762.
This commit is contained in:
parent
353d2cb21f
commit
ad0d681b6d
|
@ -307,7 +307,7 @@ public class SimpleEngineBenchmark {
|
|||
|
||||
ThreadPool threadPool = new ThreadPool();
|
||||
SnapshotDeletionPolicy deletionPolicy = new SnapshotDeletionPolicy(new KeepOnlyLastDeletionPolicy(shardId, settings));
|
||||
Engine engine = new RobinEngine(shardId, settings, store, deletionPolicy, new FsTranslog(shardId, EMPTY_SETTINGS, new File("work/fs-translog"), false), new LogByteSizeMergePolicyProvider(store, new IndexSettingsService(shardId.index(), EMPTY_SETTINGS)),
|
||||
Engine engine = new RobinEngine(shardId, settings, new IndexSettingsService(shardId.index(), settings), store, deletionPolicy, new FsTranslog(shardId, EMPTY_SETTINGS, new File("work/fs-translog"), false), new LogByteSizeMergePolicyProvider(store, new IndexSettingsService(shardId.index(), EMPTY_SETTINGS)),
|
||||
new ConcurrentMergeSchedulerProvider(shardId, settings), new AnalysisService(shardId.index()), new SimilarityService(shardId.index()), new NonBloomCache(shardId.index()));
|
||||
engine.start();
|
||||
|
||||
|
|
|
@ -48,6 +48,7 @@ import org.elasticsearch.index.merge.policy.EnableMergePolicy;
|
|||
import org.elasticsearch.index.merge.policy.MergePolicyProvider;
|
||||
import org.elasticsearch.index.merge.scheduler.MergeSchedulerProvider;
|
||||
import org.elasticsearch.index.settings.IndexSettings;
|
||||
import org.elasticsearch.index.settings.IndexSettingsService;
|
||||
import org.elasticsearch.index.shard.AbstractIndexShardComponent;
|
||||
import org.elasticsearch.index.shard.ShardId;
|
||||
import org.elasticsearch.index.similarity.SimilarityService;
|
||||
|
@ -75,14 +76,16 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
|
|||
|
||||
private final boolean compoundFormat;
|
||||
|
||||
private final int termIndexInterval;
|
||||
private volatile int termIndexInterval;
|
||||
|
||||
private final int termIndexDivisor;
|
||||
private volatile int termIndexDivisor;
|
||||
|
||||
private final ReadWriteLock rwl = new ReentrantReadWriteLock();
|
||||
|
||||
private final AtomicBoolean optimizeMutex = new AtomicBoolean();
|
||||
|
||||
private final IndexSettingsService indexSettingsService;
|
||||
|
||||
private final Store store;
|
||||
|
||||
private final SnapshotDeletionPolicy deletionPolicy;
|
||||
|
@ -124,7 +127,10 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
|
|||
|
||||
private final Object[] dirtyLocks;
|
||||
|
||||
@Inject public RobinEngine(ShardId shardId, @IndexSettings Settings indexSettings, Store store, SnapshotDeletionPolicy deletionPolicy, Translog translog,
|
||||
private final ApplySettings applySettings = new ApplySettings();
|
||||
|
||||
@Inject public RobinEngine(ShardId shardId, @IndexSettings Settings indexSettings, IndexSettingsService indexSettingsService,
|
||||
Store store, SnapshotDeletionPolicy deletionPolicy, Translog translog,
|
||||
MergePolicyProvider mergePolicyProvider, MergeSchedulerProvider mergeScheduler,
|
||||
AnalysisService analysisService, SimilarityService similarityService,
|
||||
BloomCache bloomCache) throws EngineException {
|
||||
|
@ -139,6 +145,7 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
|
|||
this.compoundFormat = indexSettings.getAsBoolean("index.compound_format", indexSettings.getAsBoolean("index.merge.policy.use_compound_file", store == null ? false : store.suggestUseCompoundFile()));
|
||||
this.asyncLoadBloomFilter = componentSettings.getAsBoolean("async_load_bloom", true); // Here for testing, should always be true
|
||||
|
||||
this.indexSettingsService = indexSettingsService;
|
||||
this.store = store;
|
||||
this.deletionPolicy = deletionPolicy;
|
||||
this.translog = translog;
|
||||
|
@ -153,6 +160,8 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
|
|||
for (int i = 0; i < dirtyLocks.length; i++) {
|
||||
dirtyLocks[i] = new Object();
|
||||
}
|
||||
|
||||
this.indexSettingsService.addListener(applySettings);
|
||||
}
|
||||
|
||||
@Override public void updateIndexingBufferSize(ByteSizeValue indexingBufferSize) {
|
||||
|
@ -794,6 +803,7 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
|
|||
if (closed) {
|
||||
return;
|
||||
}
|
||||
indexSettingsService.removeListener(applySettings);
|
||||
closed = true;
|
||||
rwl.writeLock().lock();
|
||||
this.versionMap.clear();
|
||||
|
@ -879,6 +889,31 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
|
|||
return indexWriter;
|
||||
}
|
||||
|
||||
class ApplySettings implements IndexSettingsService.Listener {
|
||||
@Override public void onRefreshSettings(Settings settings) {
|
||||
int termIndexInterval = settings.getAsInt("index.term_index_interval", RobinEngine.this.termIndexInterval);
|
||||
int termIndexDivisor = settings.getAsInt("index.term_index_divisor", RobinEngine.this.termIndexDivisor); // IndexReader#DEFAULT_TERMS_INDEX_DIVISOR
|
||||
|
||||
if (termIndexInterval != RobinEngine.this.termIndexInterval || termIndexDivisor != RobinEngine.this.termIndexDivisor) {
|
||||
rwl.readLock().lock();
|
||||
try {
|
||||
if (termIndexInterval != RobinEngine.this.termIndexInterval) {
|
||||
logger.info("updating term_index_interval from [{}] to [{}]", RobinEngine.this.termIndexInterval, termIndexInterval);
|
||||
RobinEngine.this.termIndexInterval = termIndexInterval;
|
||||
}
|
||||
if (termIndexDivisor != RobinEngine.this.termIndexDivisor) {
|
||||
logger.info("updating term_index_divisor from [{}] to [{}]", RobinEngine.this.termIndexDivisor, termIndexDivisor);
|
||||
RobinEngine.this.termIndexDivisor = termIndexDivisor;
|
||||
}
|
||||
} finally {
|
||||
rwl.readLock().unlock();
|
||||
}
|
||||
// we need to do a full flush in order to have the new settings taken affect
|
||||
flush(new Flush().full(true));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private AcquirableResource<ReaderSearcherHolder> buildNrtResource(IndexWriter indexWriter) throws IOException {
|
||||
IndexReader indexReader = indexWriter.getReader();
|
||||
ExtendedIndexSearcher indexSearcher = new ExtendedIndexSearcher(indexReader);
|
||||
|
|
|
@ -42,7 +42,8 @@ public class IndexSettingsService extends AbstractIndexComponent {
|
|||
}
|
||||
|
||||
public synchronized void refreshSettings(Settings settings) {
|
||||
if (this.settings.getAsMap().equals(settings.getAsMap())) {
|
||||
// this.settings include also the node settings
|
||||
if (this.settings.getByPrefix("index.").getAsMap().equals(settings.getByPrefix("index.").getAsMap())) {
|
||||
// nothing to update, same settings
|
||||
return;
|
||||
}
|
||||
|
|
|
@ -23,6 +23,7 @@ import org.elasticsearch.index.analysis.AnalysisService;
|
|||
import org.elasticsearch.index.cache.bloom.none.NonBloomCache;
|
||||
import org.elasticsearch.index.engine.AbstractSimpleEngineTests;
|
||||
import org.elasticsearch.index.engine.Engine;
|
||||
import org.elasticsearch.index.settings.IndexSettingsService;
|
||||
import org.elasticsearch.index.similarity.SimilarityService;
|
||||
import org.elasticsearch.index.store.Store;
|
||||
import org.elasticsearch.index.translog.Translog;
|
||||
|
@ -35,7 +36,7 @@ import static org.elasticsearch.common.settings.ImmutableSettings.Builder.*;
|
|||
public class SimpleRobinEngineTests extends AbstractSimpleEngineTests {
|
||||
|
||||
protected Engine createEngine(Store store, Translog translog) {
|
||||
return new RobinEngine(shardId, EMPTY_SETTINGS, store, createSnapshotDeletionPolicy(), translog, createMergePolicy(), createMergeScheduler(),
|
||||
return new RobinEngine(shardId, EMPTY_SETTINGS, new IndexSettingsService(shardId.index(), EMPTY_SETTINGS), store, createSnapshotDeletionPolicy(), translog, createMergePolicy(), createMergeScheduler(),
|
||||
new AnalysisService(shardId.index()), new SimilarityService(shardId.index()), new NonBloomCache(shardId.index()));
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue