diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/translog/fs/FsTranslog.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/translog/fs/FsTranslog.java index 1029c0d4d81..4edd4ef7a67 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/translog/fs/FsTranslog.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/translog/fs/FsTranslog.java @@ -35,7 +35,9 @@ import org.elasticsearch.index.translog.TranslogStreams; import java.io.File; import java.io.IOException; +import java.nio.ByteBuffer; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; /** * @author kimchy (shay.banon) @@ -54,7 +56,8 @@ public class FsTranslog extends AbstractIndexShardComponent implements Translog private final AtomicInteger operationCounter = new AtomicInteger(); - private long lastPosition = 0; + private AtomicLong lastPosition = new AtomicLong(0); + private AtomicLong lastWrittenPosition = new AtomicLong(0); private RafReference raf; @@ -113,7 +116,8 @@ public class FsTranslog extends AbstractIndexShardComponent implements Translog @Override public void newTranslog() throws TranslogException { synchronized (mutex) { operationCounter.set(0); - lastPosition = 0; + lastPosition.set(0); + lastWrittenPosition.set(0); this.id = id + 1; if (raf != null) { raf.decreaseRefCount(true); @@ -131,7 +135,8 @@ public class FsTranslog extends AbstractIndexShardComponent implements Translog @Override public void newTranslog(long id) throws TranslogException { synchronized (mutex) { operationCounter.set(0); - lastPosition = 0; + lastPosition.set(0); + lastWrittenPosition.set(0); this.id = id; if (raf != null) { raf.decreaseRefCount(true); @@ -158,12 +163,15 @@ public class FsTranslog extends AbstractIndexShardComponent implements Translog out.seek(0); out.writeInt(size - 4); + long position = lastPosition.getAndAdd(size); + // use channel#write and not raf#write since it allows for 
concurrent writes + // with regards to positions + raf.channel().write(ByteBuffer.wrap(out.unsafeByteArray(), 0, size), position); + if (syncOnEachOperation) { + raf.channel().force(false); + } synchronized (mutex) { - raf.raf().write(out.unsafeByteArray(), 0, size); - if (syncOnEachOperation) { - sync(); - } - lastPosition += size; + lastWrittenPosition.getAndAdd(size); operationCounter.incrementAndGet(); } } catch (Exception e) { @@ -175,10 +183,11 @@ public class FsTranslog extends AbstractIndexShardComponent implements Translog synchronized (mutex) { try { raf.increaseRefCount(); + raf.channel().force(true); // sync here, so we make sure we read back the data? if (useStream) { - return new FsStreamSnapshot(shardId, this.id, raf, lastPosition, operationCounter.get(), operationCounter.get()); + return new FsStreamSnapshot(shardId, this.id, raf, lastWrittenPosition.get(), operationCounter.get(), operationCounter.get()); } else { - return new FsChannelSnapshot(shardId, this.id, raf, lastPosition, operationCounter.get(), operationCounter.get()); + return new FsChannelSnapshot(shardId, this.id, raf, lastWrittenPosition.get(), operationCounter.get(), operationCounter.get()); } } catch (Exception e) { throw new TranslogException(shardId, "Failed to snapshot", e); @@ -193,12 +202,13 @@ public class FsTranslog extends AbstractIndexShardComponent implements Translog } try { raf.increaseRefCount(); + raf.channel().force(true); // sync here, so we make sure we read back the data? 
if (useStream) { - FsStreamSnapshot newSnapshot = new FsStreamSnapshot(shardId, id, raf, lastPosition, operationCounter.get(), operationCounter.get() - snapshot.totalOperations()); + FsStreamSnapshot newSnapshot = new FsStreamSnapshot(shardId, id, raf, lastWrittenPosition.get(), operationCounter.get(), operationCounter.get() - snapshot.totalOperations()); newSnapshot.seekForward(snapshot.position()); return newSnapshot; } else { - FsChannelSnapshot newSnapshot = new FsChannelSnapshot(shardId, id, raf, lastPosition, operationCounter.get(), operationCounter.get() - snapshot.totalOperations()); + FsChannelSnapshot newSnapshot = new FsChannelSnapshot(shardId, id, raf, lastWrittenPosition.get(), operationCounter.get(), operationCounter.get() - snapshot.totalOperations()); newSnapshot.seekForward(snapshot.position()); return newSnapshot; } @@ -212,7 +222,7 @@ public class FsTranslog extends AbstractIndexShardComponent implements Translog synchronized (mutex) { if (raf != null) { try { - raf.raf().getFD().sync(); + raf.channel().force(true); } catch (Exception e) { // ignore } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/translog/fs/RafReference.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/translog/fs/RafReference.java index f16c668267b..4c20eedeb34 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/translog/fs/RafReference.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/translog/fs/RafReference.java @@ -25,6 +25,7 @@ import java.io.File; import java.io.FileNotFoundException; import java.io.IOException; import java.io.RandomAccessFile; +import java.nio.channels.FileChannel; import java.util.concurrent.atomic.AtomicInteger; /** @@ -36,11 +37,14 @@ public class RafReference { private final RandomAccessFile raf; + private final FileChannel channel; + private final AtomicInteger refCount = new AtomicInteger(); public RafReference(File file) throws FileNotFoundException { this.file = 
file; this.raf = new RandomAccessFile(file, "rw"); + this.channel = raf.getChannel(); this.refCount.incrementAndGet(); } @@ -48,6 +52,10 @@ public class RafReference { return this.file; } + public FileChannel channel() { + return this.channel; + } + public RandomAccessFile raf() { return this.raf; }