diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/translog/TranslogService.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/translog/TranslogService.java index d0d6d8e0345..5828abe1dcb 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/translog/TranslogService.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/translog/TranslogService.java @@ -28,6 +28,7 @@ import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.EngineClosedException; import org.elasticsearch.index.engine.FlushNotAllowedEngineException; import org.elasticsearch.index.settings.IndexSettings; +import org.elasticsearch.index.settings.IndexSettingsService; import org.elasticsearch.index.shard.AbstractIndexShardComponent; import org.elasticsearch.index.shard.IndexShardState; import org.elasticsearch.index.shard.ShardId; @@ -45,23 +46,28 @@ public class TranslogService extends AbstractIndexShardComponent { private final ThreadPool threadPool; + private final IndexSettingsService indexSettingsService; + private final IndexShard indexShard; private final Translog translog; - private final int flushThresholdOperations; + private int flushThresholdOperations; - private final ByteSizeValue flushThresholdSize; + private ByteSizeValue flushThresholdSize; - private final TimeValue flushThresholdPeriod; + private TimeValue flushThresholdPeriod; private final TimeValue interval; private ScheduledFuture future; - @Inject public TranslogService(ShardId shardId, @IndexSettings Settings indexSettings, ThreadPool threadPool, IndexShard indexShard, Translog translog) { + private final ApplySettings applySettings = new ApplySettings(); + + @Inject public TranslogService(ShardId shardId, @IndexSettings Settings indexSettings, IndexSettingsService indexSettingsService, ThreadPool threadPool, IndexShard indexShard, Translog translog) { super(shardId, indexSettings); this.threadPool = threadPool; + this.indexSettingsService = indexSettingsService; this.indexShard = indexShard; this.translog = translog; @@ -73,13 +79,36 @@ public class TranslogService extends AbstractIndexShardComponent { logger.debug("interval [{}], flush_threshold_ops [{}], flush_threshold_size [{}], flush_threshold_period [{}]", interval, flushThresholdOperations, flushThresholdSize, flushThresholdPeriod); this.future = threadPool.schedule(interval, ThreadPool.Names.SAME, new TranslogBasedFlush()); + + indexSettingsService.addListener(applySettings); } public void close() { + indexSettingsService.removeListener(applySettings); this.future.cancel(true); } + class ApplySettings implements IndexSettingsService.Listener { + @Override public void onRefreshSettings(Settings settings) { + int flushThresholdOperations = settings.getAsInt("index.translog.flush_threshold_ops", TranslogService.this.flushThresholdOperations); + if (flushThresholdOperations != TranslogService.this.flushThresholdOperations) { + logger.info("updating flush_threshold_ops from [{}] to [{}]", TranslogService.this.flushThresholdOperations, flushThresholdOperations); + TranslogService.this.flushThresholdOperations = flushThresholdOperations; + } + ByteSizeValue flushThresholdSize = settings.getAsBytesSize("index.translog.flush_threshold_size", TranslogService.this.flushThresholdSize); + if (!flushThresholdSize.equals(TranslogService.this.flushThresholdSize)) { + logger.info("updating flush_threshold_size from [{}] to [{}]", TranslogService.this.flushThresholdSize, flushThresholdSize); + TranslogService.this.flushThresholdSize = flushThresholdSize; + } + TimeValue flushThresholdPeriod = settings.getAsTime("index.translog.flush_threshold_period", TranslogService.this.flushThresholdPeriod); + if (!flushThresholdPeriod.equals(TranslogService.this.flushThresholdPeriod)) { + logger.info("updating flush_threshold_period from [{}] to [{}]", TranslogService.this.flushThresholdPeriod, flushThresholdPeriod); + TranslogService.this.flushThresholdPeriod = flushThresholdPeriod; + } + } + } + private class TranslogBasedFlush implements Runnable { private volatile long lastFlushTime = System.currentTimeMillis();