Update Settings: Allow to dynamically update `index.translog` settings, closes #765.
This commit is contained in:
parent
c2a0e0b767
commit
80a797fc4f
|
@ -28,6 +28,7 @@ import org.elasticsearch.index.engine.Engine;
|
||||||
import org.elasticsearch.index.engine.EngineClosedException;
|
import org.elasticsearch.index.engine.EngineClosedException;
|
||||||
import org.elasticsearch.index.engine.FlushNotAllowedEngineException;
|
import org.elasticsearch.index.engine.FlushNotAllowedEngineException;
|
||||||
import org.elasticsearch.index.settings.IndexSettings;
|
import org.elasticsearch.index.settings.IndexSettings;
|
||||||
|
import org.elasticsearch.index.settings.IndexSettingsService;
|
||||||
import org.elasticsearch.index.shard.AbstractIndexShardComponent;
|
import org.elasticsearch.index.shard.AbstractIndexShardComponent;
|
||||||
import org.elasticsearch.index.shard.IndexShardState;
|
import org.elasticsearch.index.shard.IndexShardState;
|
||||||
import org.elasticsearch.index.shard.ShardId;
|
import org.elasticsearch.index.shard.ShardId;
|
||||||
|
@ -45,23 +46,28 @@ public class TranslogService extends AbstractIndexShardComponent {
|
||||||
|
|
||||||
private final ThreadPool threadPool;
|
private final ThreadPool threadPool;
|
||||||
|
|
||||||
|
private final IndexSettingsService indexSettingsService;
|
||||||
|
|
||||||
private final IndexShard indexShard;
|
private final IndexShard indexShard;
|
||||||
|
|
||||||
private final Translog translog;
|
private final Translog translog;
|
||||||
|
|
||||||
private final int flushThresholdOperations;
|
private int flushThresholdOperations;
|
||||||
|
|
||||||
private final ByteSizeValue flushThresholdSize;
|
private ByteSizeValue flushThresholdSize;
|
||||||
|
|
||||||
private final TimeValue flushThresholdPeriod;
|
private TimeValue flushThresholdPeriod;
|
||||||
|
|
||||||
private final TimeValue interval;
|
private final TimeValue interval;
|
||||||
|
|
||||||
private ScheduledFuture future;
|
private ScheduledFuture future;
|
||||||
|
|
||||||
@Inject public TranslogService(ShardId shardId, @IndexSettings Settings indexSettings, ThreadPool threadPool, IndexShard indexShard, Translog translog) {
|
private final ApplySettings applySettings = new ApplySettings();
|
||||||
|
|
||||||
|
@Inject public TranslogService(ShardId shardId, @IndexSettings Settings indexSettings, IndexSettingsService indexSettingsService, ThreadPool threadPool, IndexShard indexShard, Translog translog) {
|
||||||
super(shardId, indexSettings);
|
super(shardId, indexSettings);
|
||||||
this.threadPool = threadPool;
|
this.threadPool = threadPool;
|
||||||
|
this.indexSettingsService = indexSettingsService;
|
||||||
this.indexShard = indexShard;
|
this.indexShard = indexShard;
|
||||||
this.translog = translog;
|
this.translog = translog;
|
||||||
|
|
||||||
|
@ -73,13 +79,36 @@ public class TranslogService extends AbstractIndexShardComponent {
|
||||||
logger.debug("interval [{}], flush_threshold_ops [{}], flush_threshold_size [{}], flush_threshold_period [{}]", interval, flushThresholdOperations, flushThresholdSize, flushThresholdPeriod);
|
logger.debug("interval [{}], flush_threshold_ops [{}], flush_threshold_size [{}], flush_threshold_period [{}]", interval, flushThresholdOperations, flushThresholdSize, flushThresholdPeriod);
|
||||||
|
|
||||||
this.future = threadPool.schedule(interval, ThreadPool.Names.SAME, new TranslogBasedFlush());
|
this.future = threadPool.schedule(interval, ThreadPool.Names.SAME, new TranslogBasedFlush());
|
||||||
|
|
||||||
|
indexSettingsService.addListener(applySettings);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
public void close() {
|
public void close() {
|
||||||
|
indexSettingsService.removeListener(applySettings);
|
||||||
this.future.cancel(true);
|
this.future.cancel(true);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
class ApplySettings implements IndexSettingsService.Listener {
|
||||||
|
@Override public void onRefreshSettings(Settings settings) {
|
||||||
|
int flushThresholdOperations = settings.getAsInt("index.translog.flush_threshold_ops", TranslogService.this.flushThresholdOperations);
|
||||||
|
if (flushThresholdOperations != TranslogService.this.flushThresholdOperations) {
|
||||||
|
logger.info("updating flush_threshold_ops from [{}] to [{}]", TranslogService.this.flushThresholdOperations, flushThresholdOperations);
|
||||||
|
TranslogService.this.flushThresholdOperations = flushThresholdOperations;
|
||||||
|
}
|
||||||
|
ByteSizeValue flushThresholdSize = settings.getAsBytesSize("index.translog.flush_threshold_size", TranslogService.this.flushThresholdSize);
|
||||||
|
if (!flushThresholdSize.equals(TranslogService.this.flushThresholdSize)) {
|
||||||
|
logger.info("updating flush_threshold_size from [{}] to [{}]", TranslogService.this.flushThresholdSize, flushThresholdSize);
|
||||||
|
TranslogService.this.flushThresholdSize = flushThresholdSize;
|
||||||
|
}
|
||||||
|
TimeValue flushThresholdPeriod = settings.getAsTime("index.translog.flush_threshold_period", TranslogService.this.flushThresholdPeriod);
|
||||||
|
if (!flushThresholdPeriod.equals(TranslogService.this.flushThresholdPeriod)) {
|
||||||
|
logger.info("updating flush_threshold_period from [{}] to [{}]", TranslogService.this.flushThresholdPeriod, flushThresholdPeriod);
|
||||||
|
TranslogService.this.flushThresholdPeriod = flushThresholdPeriod;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private class TranslogBasedFlush implements Runnable {
|
private class TranslogBasedFlush implements Runnable {
|
||||||
|
|
||||||
private volatile long lastFlushTime = System.currentTimeMillis();
|
private volatile long lastFlushTime = System.currentTimeMillis();
|
||||||
|
|
Loading…
Reference in New Issue