Shared Gateway: Allow to dynamically update the `snapshot_interval` using update settings, closes #954.

This commit is contained in:
kimchy 2011-05-21 02:31:24 +03:00
parent 6805d17fc9
commit d310038701
1 changed files with 30 additions and 4 deletions

View File

@ -29,6 +29,7 @@ import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.EngineException;
import org.elasticsearch.index.engine.SnapshotFailedEngineException;
import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.index.settings.IndexSettingsService;
import org.elasticsearch.index.shard.*;
import org.elasticsearch.index.shard.service.IndexShard;
import org.elasticsearch.index.shard.service.InternalIndexShard;
@ -48,6 +49,8 @@ public class IndexShardGatewayService extends AbstractIndexShardComponent implem
private final ThreadPool threadPool;
private final IndexSettingsService indexSettingsService;
private final InternalIndexShard indexShard;
private final IndexShardGateway shardGateway;
@ -61,7 +64,7 @@ public class IndexShardGatewayService extends AbstractIndexShardComponent implem
private volatile long lastTranslogLength;
private final TimeValue snapshotInterval;
private volatile TimeValue snapshotInterval;
private volatile ScheduledFuture snapshotScheduleFuture;
@ -69,15 +72,37 @@ public class IndexShardGatewayService extends AbstractIndexShardComponent implem
private IndexShardGateway.SnapshotLock snapshotLock;
@Inject public IndexShardGatewayService(ShardId shardId, @IndexSettings Settings indexSettings,
private final SnapshotRunnable snapshotRunnable = new SnapshotRunnable();
private final ApplySettings applySettings = new ApplySettings();
@Inject public IndexShardGatewayService(ShardId shardId, @IndexSettings Settings indexSettings, IndexSettingsService indexSettingsService,
ThreadPool threadPool, IndexShard indexShard, IndexShardGateway shardGateway) {
super(shardId, indexSettings);
this.threadPool = threadPool;
this.indexSettingsService = indexSettingsService;
this.indexShard = (InternalIndexShard) indexShard;
this.shardGateway = shardGateway;
this.snapshotOnClose = componentSettings.getAsBoolean("snapshot_on_close", true);
this.snapshotInterval = componentSettings.getAsTime("snapshot_interval", TimeValue.timeValueSeconds(10));
indexSettingsService.addListener(applySettings);
}
class ApplySettings implements IndexSettingsService.Listener {
@Override public void onRefreshSettings(Settings settings) {
TimeValue snapshotInterval = settings.getAsTime("index.gateway.snapshot_interval", IndexShardGatewayService.this.snapshotInterval);
if (!snapshotInterval.equals(IndexShardGatewayService.this.snapshotInterval)) {
logger.info("updating snapshot_interval from [{}] to [{}]", IndexShardGatewayService.this.snapshotInterval, snapshotInterval);
IndexShardGatewayService.this.snapshotInterval = snapshotInterval;
if (snapshotScheduleFuture != null) {
snapshotScheduleFuture.cancel(false);
snapshotScheduleFuture = null;
}
scheduleSnapshotIfNeeded();
}
}
}
/**
@ -280,6 +305,7 @@ public class IndexShardGatewayService extends AbstractIndexShardComponent implem
}
public synchronized void close(boolean delete) {
indexSettingsService.removeListener(applySettings);
if (snapshotScheduleFuture != null) {
snapshotScheduleFuture.cancel(true);
snapshotScheduleFuture = null;
@ -319,12 +345,12 @@ public class IndexShardGatewayService extends AbstractIndexShardComponent implem
if (logger.isDebugEnabled()) {
logger.debug("scheduling snapshot every [{}]", snapshotInterval);
}
snapshotScheduleFuture = threadPool.schedule(snapshotInterval, ThreadPool.Names.SNAPSHOT, new SnapshotRunnable());
snapshotScheduleFuture = threadPool.schedule(snapshotInterval, ThreadPool.Names.SNAPSHOT, snapshotRunnable);
}
}
private class SnapshotRunnable implements Runnable {
@Override public void run() {
@Override public synchronized void run() {
try {
snapshot("scheduled");
} catch (Throwable e) {