diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/local/LocalIndexShardGateway.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/local/LocalIndexShardGateway.java index 3d15843186b..aef2550bd1c 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/local/LocalIndexShardGateway.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/local/LocalIndexShardGateway.java @@ -23,24 +23,27 @@ import org.apache.lucene.index.IndexReader; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.InputStreamStreamInput; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.gateway.none.NoneGateway; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.gateway.IndexShardGateway; import org.elasticsearch.index.gateway.IndexShardGatewayRecoveryException; import org.elasticsearch.index.gateway.RecoveryStatus; import org.elasticsearch.index.gateway.SnapshotStatus; import org.elasticsearch.index.settings.IndexSettings; import org.elasticsearch.index.shard.AbstractIndexShardComponent; +import org.elasticsearch.index.shard.IndexShardState; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.service.IndexShard; import org.elasticsearch.index.shard.service.InternalIndexShard; import org.elasticsearch.index.translog.Translog; import org.elasticsearch.index.translog.TranslogStreams; import org.elasticsearch.index.translog.fs.FsTranslog; +import org.elasticsearch.threadpool.ThreadPool; import java.io.EOFException; import java.io.File; import java.io.FileInputStream; import java.io.IOException; +import java.util.concurrent.ScheduledFuture; /** * @author kimchy (shay.banon) @@ -51,9 +54,22 @@ public class LocalIndexShardGateway extends AbstractIndexShardComponent implemen private final RecoveryStatus recoveryStatus = new RecoveryStatus(); - @Inject public LocalIndexShardGateway(ShardId shardId, @IndexSettings Settings indexSettings, IndexShard indexShard) { + private final ScheduledFuture flushScheduler; + + @Inject public LocalIndexShardGateway(ShardId shardId, @IndexSettings Settings indexSettings, ThreadPool threadPool, IndexShard indexShard) { super(shardId, indexSettings); this.indexShard = (InternalIndexShard) indexShard; + + TimeValue sync = componentSettings.getAsTime("sync", TimeValue.timeValueSeconds(1)); + if (sync.millis() > 0) { + this.indexShard.translog().syncOnEachOperation(false); + flushScheduler = threadPool.scheduleWithFixedDelay(new Sync(), sync); + } else if (sync.millis() == 0) { + flushScheduler = null; + this.indexShard.translog().syncOnEachOperation(true); + } else { + flushScheduler = null; + } } @Override public String toString() { @@ -132,7 +148,7 @@ public class LocalIndexShardGateway extends AbstractIndexShardComponent implemen } @Override public String type() { - return NoneGateway.TYPE; + return "local"; } @Override public SnapshotStatus snapshot(Snapshot snapshot) { @@ -152,5 +168,16 @@ public class LocalIndexShardGateway extends AbstractIndexShardComponent implemen } @Override public void close(boolean delete) { + if (flushScheduler != null) { + flushScheduler.cancel(false); + } + } + + private class Sync implements Runnable { + @Override public void run() { + if (indexShard.state() == IndexShardState.STARTED) { + indexShard.translog().sync(); + } + } } } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/translog/Translog.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/translog/Translog.java index b5fab93ae4b..c29ff20cb2d 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/translog/Translog.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/translog/Translog.java @@ -88,9 +88,11 @@ public interface Translog extends IndexShardComponent { Snapshot snapshot(Snapshot snapshot); /** - * Flushes the translog. + * Sync's the translog. */ - void flush(); + void sync(); + + void syncOnEachOperation(boolean syncOnEachOperation); /** * Closes the transaction log. diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/translog/fs/FsTranslog.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/translog/fs/FsTranslog.java index 74cdc79725c..0a294eb5799 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/translog/fs/FsTranslog.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/translog/fs/FsTranslog.java @@ -48,6 +48,8 @@ public class FsTranslog extends AbstractIndexShardComponent implements Translog private final Object mutex = new Object(); + private boolean syncOnEachOperation = false; + private volatile long id = 0; private final AtomicInteger operationCounter = new AtomicInteger(); @@ -140,6 +142,9 @@ public class FsTranslog extends AbstractIndexShardComponent implements Translog synchronized (mutex) { raf.raf().write(out.unsafeByteArray(), 0, size); + if (syncOnEachOperation) { + sync(); + } lastPosition += size; operationCounter.incrementAndGet(); } @@ -185,7 +190,7 @@ public class FsTranslog extends AbstractIndexShardComponent implements Translog } } - @Override public void flush() { + @Override public void sync() { synchronized (mutex) { if (raf != null) { try { @@ -197,6 +202,12 @@ public class FsTranslog extends AbstractIndexShardComponent implements Translog } } + @Override public void syncOnEachOperation(boolean syncOnEachOperation) { + synchronized (mutex) { + this.syncOnEachOperation = syncOnEachOperation; + } + } + @Override public void close(boolean delete) { synchronized (mutex) { if (raf != null) {