diff --git a/core/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java b/core/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java index e215669761c..84278fa92b3 100644 --- a/core/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java +++ b/core/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java @@ -39,6 +39,7 @@ import java.nio.file.OpenOption; import java.nio.file.Path; import java.nio.file.StandardOpenOption; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.ReentrantLock; public class TranslogWriter extends BaseTranslogReader implements Closeable { @@ -60,7 +61,8 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable { private volatile long totalOffset; protected final AtomicBoolean closed = new AtomicBoolean(false); - + // lock order synchronized(syncLock) -> synchronized(this) + private final Object syncLock = new Object(); public TranslogWriter(ShardId shardId, long generation, FileChannel channel, Path path, ByteSizeValue bufferSize) throws IOException { super(generation, channel, path, channel.position()); @@ -146,23 +148,7 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable { * raising the exception. */ public void sync() throws IOException { - if (syncNeeded()) { - synchronized (this) { - ensureOpen(); - final long offsetToSync; - final int opsCounter; - try { - outputStream.flush(); - offsetToSync = totalOffset; - opsCounter = operationCounter; - checkpoint(offsetToSync, opsCounter, generation, channel, path); - } catch (Throwable ex) { - closeWithTragicEvent(ex); - throw ex; - } - lastSyncedOffset = offsetToSync; - } - } + syncUpTo(Long.MAX_VALUE); } /** @@ -229,9 +215,38 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable { * @return true if this call caused an actual sync operation */ public boolean syncUpTo(long offset) throws IOException { - if (lastSyncedOffset < offset) { - sync(); - return true; + if (lastSyncedOffset < offset && syncNeeded()) { + synchronized (syncLock) { // only one sync/checkpoint should happen concurrently but we wait + if (lastSyncedOffset < offset && syncNeeded()) { + // double checked locking - we don't want to fsync unless we have to and now that we have + // the lock we should check again since if this code is busy we might have fsynced enough already + final long offsetToSync; + final int opsCounter; + synchronized (this) { + ensureOpen(); + try { + outputStream.flush(); + offsetToSync = totalOffset; + opsCounter = operationCounter; + } catch (Throwable ex) { + closeWithTragicEvent(ex); + throw ex; + } + } + // now do the actual fsync outside of the synchronized block such that + // we can continue writing to the buffer etc. + try { + channel.force(false); + writeCheckpoint(offsetToSync, opsCounter, path.getParent(), generation, StandardOpenOption.WRITE); + } catch (Throwable ex) { + closeWithTragicEvent(ex); + throw ex; + } + assert lastSyncedOffset <= offsetToSync : "illegal state: " + lastSyncedOffset + " <= " + offsetToSync; + lastSyncedOffset = offsetToSync; // write protected by syncLock + return true; + } + } } return false; } @@ -254,11 +269,6 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable { Channels.readFromFileChannelWithEofException(channel, position, targetBuffer); } - private synchronized void checkpoint(long lastSyncPosition, int operationCounter, long generation, FileChannel translogFileChannel, Path translogFilePath) throws IOException { - translogFileChannel.force(false); - writeCheckpoint(lastSyncPosition, operationCounter, translogFilePath.getParent(), generation, StandardOpenOption.WRITE); - } - private static void writeCheckpoint(long syncPosition, int numOperations, Path translogFile, long generation, OpenOption... options) throws IOException { final Path checkpointFile = translogFile.resolve(Translog.CHECKPOINT_FILE_NAME); Checkpoint checkpoint = new Checkpoint(syncPosition, numOperations, generation); @@ -269,7 +279,7 @@ public class TranslogWriter extends BaseTranslogReader implements Closeable { static final ChannelFactory DEFAULT = new ChannelFactory(); - // only for testing until we have a disk-full FileSystemt + // only for testing until we have a disk-full FileSystem public FileChannel open(Path file) throws IOException { return FileChannel.open(file, StandardOpenOption.WRITE, StandardOpenOption.READ, StandardOpenOption.CREATE_NEW); }