add auto sync of translog when using local gateway

This commit is contained in:
kimchy 2010-09-24 21:25:10 +02:00
parent 1c2733ebee
commit ab9aa15bb4
3 changed files with 46 additions and 6 deletions

View File

@ -23,24 +23,27 @@ import org.apache.lucene.index.IndexReader;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.InputStreamStreamInput; import org.elasticsearch.common.io.stream.InputStreamStreamInput;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.gateway.none.NoneGateway; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.gateway.IndexShardGateway; import org.elasticsearch.index.gateway.IndexShardGateway;
import org.elasticsearch.index.gateway.IndexShardGatewayRecoveryException; import org.elasticsearch.index.gateway.IndexShardGatewayRecoveryException;
import org.elasticsearch.index.gateway.RecoveryStatus; import org.elasticsearch.index.gateway.RecoveryStatus;
import org.elasticsearch.index.gateway.SnapshotStatus; import org.elasticsearch.index.gateway.SnapshotStatus;
import org.elasticsearch.index.settings.IndexSettings; import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.index.shard.AbstractIndexShardComponent; import org.elasticsearch.index.shard.AbstractIndexShardComponent;
import org.elasticsearch.index.shard.IndexShardState;
import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.service.IndexShard; import org.elasticsearch.index.shard.service.IndexShard;
import org.elasticsearch.index.shard.service.InternalIndexShard; import org.elasticsearch.index.shard.service.InternalIndexShard;
import org.elasticsearch.index.translog.Translog; import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.translog.TranslogStreams; import org.elasticsearch.index.translog.TranslogStreams;
import org.elasticsearch.index.translog.fs.FsTranslog; import org.elasticsearch.index.translog.fs.FsTranslog;
import org.elasticsearch.threadpool.ThreadPool;
import java.io.EOFException; import java.io.EOFException;
import java.io.File; import java.io.File;
import java.io.FileInputStream; import java.io.FileInputStream;
import java.io.IOException; import java.io.IOException;
import java.util.concurrent.ScheduledFuture;
/** /**
* @author kimchy (shay.banon) * @author kimchy (shay.banon)
@ -51,9 +54,22 @@ public class LocalIndexShardGateway extends AbstractIndexShardComponent implemen
private final RecoveryStatus recoveryStatus = new RecoveryStatus(); private final RecoveryStatus recoveryStatus = new RecoveryStatus();
@Inject public LocalIndexShardGateway(ShardId shardId, @IndexSettings Settings indexSettings, IndexShard indexShard) { private final ScheduledFuture flushScheduler;
@Inject public LocalIndexShardGateway(ShardId shardId, @IndexSettings Settings indexSettings, ThreadPool threadPool, IndexShard indexShard) {
super(shardId, indexSettings); super(shardId, indexSettings);
this.indexShard = (InternalIndexShard) indexShard; this.indexShard = (InternalIndexShard) indexShard;
TimeValue sync = componentSettings.getAsTime("sync", TimeValue.timeValueSeconds(1));
if (sync.millis() > 0) {
this.indexShard.translog().syncOnEachOperation(false);
flushScheduler = threadPool.scheduleWithFixedDelay(new Sync(), sync);
} else if (sync.millis() == 0) {
flushScheduler = null;
this.indexShard.translog().syncOnEachOperation(true);
} else {
flushScheduler = null;
}
} }
@Override public String toString() { @Override public String toString() {
@ -132,7 +148,7 @@ public class LocalIndexShardGateway extends AbstractIndexShardComponent implemen
} }
@Override public String type() { @Override public String type() {
return NoneGateway.TYPE; return "local";
} }
@Override public SnapshotStatus snapshot(Snapshot snapshot) { @Override public SnapshotStatus snapshot(Snapshot snapshot) {
@ -152,5 +168,16 @@ public class LocalIndexShardGateway extends AbstractIndexShardComponent implemen
} }
@Override public void close(boolean delete) { @Override public void close(boolean delete) {
if (flushScheduler != null) {
flushScheduler.cancel(false);
}
}
private class Sync implements Runnable {
@Override public void run() {
if (indexShard.state() == IndexShardState.STARTED) {
indexShard.translog().sync();
}
}
} }
} }

View File

@ -88,9 +88,11 @@ public interface Translog extends IndexShardComponent {
Snapshot snapshot(Snapshot snapshot); Snapshot snapshot(Snapshot snapshot);
/** /**
* Flushes the translog. * Sync's the translog.
*/ */
void flush(); void sync();
void syncOnEachOperation(boolean syncOnEachOperation);
/** /**
* Closes the transaction log. * Closes the transaction log.

View File

@ -48,6 +48,8 @@ public class FsTranslog extends AbstractIndexShardComponent implements Translog
private final Object mutex = new Object(); private final Object mutex = new Object();
private boolean syncOnEachOperation = false;
private volatile long id = 0; private volatile long id = 0;
private final AtomicInteger operationCounter = new AtomicInteger(); private final AtomicInteger operationCounter = new AtomicInteger();
@ -140,6 +142,9 @@ public class FsTranslog extends AbstractIndexShardComponent implements Translog
synchronized (mutex) { synchronized (mutex) {
raf.raf().write(out.unsafeByteArray(), 0, size); raf.raf().write(out.unsafeByteArray(), 0, size);
if (syncOnEachOperation) {
sync();
}
lastPosition += size; lastPosition += size;
operationCounter.incrementAndGet(); operationCounter.incrementAndGet();
} }
@ -185,7 +190,7 @@ public class FsTranslog extends AbstractIndexShardComponent implements Translog
} }
} }
@Override public void flush() { @Override public void sync() {
synchronized (mutex) { synchronized (mutex) {
if (raf != null) { if (raf != null) {
try { try {
@ -197,6 +202,12 @@ public class FsTranslog extends AbstractIndexShardComponent implements Translog
} }
} }
@Override public void syncOnEachOperation(boolean syncOnEachOperation) {
synchronized (mutex) {
this.syncOnEachOperation = syncOnEachOperation;
}
}
@Override public void close(boolean delete) { @Override public void close(boolean delete) {
synchronized (mutex) { synchronized (mutex) {
if (raf != null) { if (raf != null) {