diff --git a/src/main/java/org/elasticsearch/index/translog/Translog.java b/src/main/java/org/elasticsearch/index/translog/Translog.java index f3a0dc20c22..8bfe42cbe44 100644 --- a/src/main/java/org/elasticsearch/index/translog/Translog.java +++ b/src/main/java/org/elasticsearch/index/translog/Translog.java @@ -126,7 +126,7 @@ public interface Translog extends IndexShardComponent, CloseableIndexComponent { /** * Sync's the translog. */ - void sync(); + void sync() throws IOException; boolean syncNeeded(); diff --git a/src/main/java/org/elasticsearch/index/translog/fs/BufferingFsTranslogFile.java b/src/main/java/org/elasticsearch/index/translog/fs/BufferingFsTranslogFile.java index 91c69c512d5..0166b85d784 100644 --- a/src/main/java/org/elasticsearch/index/translog/fs/BufferingFsTranslogFile.java +++ b/src/main/java/org/elasticsearch/index/translog/fs/BufferingFsTranslogFile.java @@ -27,6 +27,7 @@ import org.elasticsearch.index.translog.TranslogException; import java.io.IOException; import java.io.OutputStream; import java.nio.ByteBuffer; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -39,10 +40,11 @@ public class BufferingFsTranslogFile implements FsTranslogFile { private final RafReference raf; private final ReadWriteLock rwl = new ReentrantReadWriteLock(); + private final AtomicBoolean closed = new AtomicBoolean(); private volatile int operationCounter; - private long lastPosition; + private volatile long lastPosition; private volatile long lastWrittenPosition; private volatile long lastSyncPosition = 0; @@ -98,6 +100,7 @@ public class BufferingFsTranslogFile implements FsTranslogFile { } private void flushBuffer() throws IOException { + assert (((ReentrantReadWriteLock.WriteLock) rwl.writeLock()).isHeldByCurrentThread()); if (bufferCount > 0) { // we use the channel to write, since on windows, writing to the RAF might not be reflected // when reading through the channel @@ -146,40 +149,36 @@ public class BufferingFsTranslogFile implements FsTranslogFile { } @Override - public void sync() { - try { - // check if we really need to sync here... - long last = lastPosition; - if (last == lastSyncPosition) { - return; - } - lastSyncPosition = last; - rwl.writeLock().lock(); - try { - flushBuffer(); - } finally { - rwl.writeLock().unlock(); - } - raf.channel().force(false); - } catch (Exception e) { - // ignore + public void sync() throws IOException { + if (!syncNeeded()) { + return; } + rwl.writeLock().lock(); + try { + flushBuffer(); + lastSyncPosition = lastPosition; + } finally { + rwl.writeLock().unlock(); + } + raf.channel().force(false); } @Override public void close(boolean delete) { - if (!delete) { - rwl.writeLock().lock(); - try { - flushBuffer(); - sync(); - } catch (IOException e) { - throw new TranslogException(shardId, "failed to close", e); - } finally { - rwl.writeLock().unlock(); - } + if (!closed.compareAndSet(false, true)) { + return; + } + try { + if (!delete) { + try { + sync(); + } catch (Exception e) { + throw new TranslogException(shardId, "failed to sync on close", e); + } + } + } finally { + raf.decreaseRefCount(delete); } - raf.decreaseRefCount(delete); } @Override diff --git a/src/main/java/org/elasticsearch/index/translog/fs/FsTranslog.java b/src/main/java/org/elasticsearch/index/translog/fs/FsTranslog.java index 21d9e8d935a..d485e695905 100644 --- a/src/main/java/org/elasticsearch/index/translog/fs/FsTranslog.java +++ b/src/main/java/org/elasticsearch/index/translog/fs/FsTranslog.java @@ -391,12 +391,20 @@ public class FsTranslog extends AbstractIndexShardComponent implements Translog } @Override - public void sync() { + public void sync() throws IOException { FsTranslogFile current1 = this.current; if (current1 == null) { return; } - current1.sync(); + try { + current1.sync(); + } catch (IOException e) { + // if we switches translots (!=), then this failure is not relevant + // we are working on a new translog + if (this.current == current1) { + throw e; + } + } } @Override diff --git a/src/main/java/org/elasticsearch/index/translog/fs/FsTranslogFile.java b/src/main/java/org/elasticsearch/index/translog/fs/FsTranslogFile.java index 5992f9fbe37..4e573f6211c 100644 --- a/src/main/java/org/elasticsearch/index/translog/fs/FsTranslogFile.java +++ b/src/main/java/org/elasticsearch/index/translog/fs/FsTranslogFile.java @@ -74,7 +74,7 @@ public interface FsTranslogFile { void updateBufferSize(int bufferSize) throws TranslogException; - void sync(); + void sync() throws IOException; boolean syncNeeded(); } diff --git a/src/main/java/org/elasticsearch/index/translog/fs/SimpleFsTranslogFile.java b/src/main/java/org/elasticsearch/index/translog/fs/SimpleFsTranslogFile.java index 5d601aed68b..e98a931876a 100644 --- a/src/main/java/org/elasticsearch/index/translog/fs/SimpleFsTranslogFile.java +++ b/src/main/java/org/elasticsearch/index/translog/fs/SimpleFsTranslogFile.java @@ -26,6 +26,7 @@ import org.elasticsearch.index.translog.TranslogException; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -34,6 +35,7 @@ public class SimpleFsTranslogFile implements FsTranslogFile { private final long id; private final ShardId shardId; private final RafReference raf; + private final AtomicBoolean closed = new AtomicBoolean(); private final AtomicInteger operationCounter = new AtomicInteger(); @@ -76,8 +78,20 @@ public class SimpleFsTranslogFile implements FsTranslogFile { } public void close(boolean delete) { - sync(); - raf.decreaseRefCount(delete); + if (!closed.compareAndSet(false, true)) { + return; + } + try { + if (!delete) { + try { + sync(); + } catch (Exception e) { + throw new TranslogException(shardId, "failed to sync on close", e); + } + } + } finally { + raf.decreaseRefCount(delete); + } } /** @@ -99,18 +113,14 @@ public class SimpleFsTranslogFile implements FsTranslogFile { return lastWrittenPosition.get() != lastSyncPosition; } - public void sync() { - try { - // check if we really need to sync here... - long last = lastWrittenPosition.get(); - if (last == lastSyncPosition) { - return; - } - lastSyncPosition = last; - raf.channel().force(false); - } catch (Exception e) { - // ignore + public void sync() throws IOException { + // check if we really need to sync here... + long last = lastWrittenPosition.get(); + if (last == lastSyncPosition) { + return; } + lastSyncPosition = last; + raf.channel().force(false); } @Override