diff --git a/core/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java b/core/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java index 49392088692..d0051714b07 100644 --- a/core/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java +++ b/core/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java @@ -42,6 +42,7 @@ import java.nio.file.Files; import java.nio.file.OpenOption; import java.nio.file.Path; import java.nio.file.StandardOpenOption; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -53,35 +54,23 @@ public class TranslogWriter extends TranslogReader { public static final int VERSION = VERSION_CHECKPOINTS; private final ShardId shardId; - private final ReleasableLock readLock; - private final ReleasableLock writeLock; /* the offset in bytes that was written when the file was last synced*/ private volatile long lastSyncedOffset; /* the number of translog operations written to this file */ private volatile int operationCounter; - /* the offset in bytes written to the file */ - private volatile long writtenOffset; /* if we hit an exception that we can't recover from we assign it to this var and ship it with every AlreadyClosedException we throw */ private volatile Throwable tragedy; - - private final byte[] buffer; - private int bufferCount; - private WrapperOutputStream bufferOs = new WrapperOutputStream(); - + /* A buffered outputstream what writes to the writers channel */ + private final OutputStream outputStream; /* the total offset of this file including the bytes written to the file as well as into the buffer */ private volatile long totalOffset; - public TranslogWriter(ShardId shardId, long generation, ChannelReference channelReference, ByteSizeValue bufferSize) throws IOException { super(generation, channelReference, channelReference.getChannel().position()); this.shardId = shardId; - ReadWriteLock rwl = new ReentrantReadWriteLock(); - readLock = new ReleasableLock(rwl.readLock()); - writeLock = new ReleasableLock(rwl.writeLock()); - this.writtenOffset = channelReference.getChannel().position(); - this.totalOffset = writtenOffset; - this.buffer = new byte[bufferSize.bytesAsInt()]; + this.outputStream = new BufferedChannelOutputStream(java.nio.channels.Channels.newOutputStream(channelReference.getChannel()), bufferSize.bytesAsInt()); this.lastSyncedOffset = channelReference.getChannel().position(); + totalOffset = lastSyncedOffset; } public static TranslogWriter create(ShardId shardId, String translogUUID, long fileGeneration, Path file, Callback onClose, ChannelFactory channelFactory, ByteSizeValue bufferSize) throws IOException { @@ -116,75 +105,59 @@ public class TranslogWriter extends TranslogReader { return tragedy; } - private final void closeWithTragicEvent(Throwable throwable) throws IOException { - try (ReleasableLock lock = writeLock.acquire()) { - if (tragedy == null) { - tragedy = throwable; - } else { - tragedy.addSuppressed(throwable); - } - close(); + private synchronized final void closeWithTragicEvent(Throwable throwable) throws IOException { + if (tragedy == null) { + tragedy = throwable; + } else { + tragedy.addSuppressed(throwable); } + close(); } /** * add the given bytes to the translog and return the location they were written at */ - public Translog.Location add(BytesReference data) throws IOException { - try (ReleasableLock lock = writeLock.acquire()) { - ensureOpen(); - final long offset = totalOffset; - if (data.length() >= buffer.length) { - flush(); - // we use the channel to write, since on windows, writing to the RAF might not be reflected - // when reading through the channel - try { - data.writeTo(channel); - } catch (Throwable ex) { - closeWithTragicEvent(ex); - throw ex; - } - writtenOffset += data.length(); - totalOffset += data.length(); - } else { - if (data.length() > buffer.length - bufferCount) { - flush(); - } - data.writeTo(bufferOs); - totalOffset += data.length(); - } - operationCounter++; - return new Translog.Location(generation, offset, data.length()); + public synchronized Translog.Location add(BytesReference data) throws IOException { + ensureOpen(); + final long offset = totalOffset; + try { + data.writeTo(outputStream); + } catch (Throwable ex) { + closeWithTragicEvent(ex); + throw ex; } + totalOffset += data.length(); + operationCounter++; + return new Translog.Location(generation, offset, data.length()); } /** * write all buffered ops to disk and fsync file */ - public synchronized void sync() throws IOException { + public void sync() throws IOException { if (syncNeeded()) { - ensureOpen(); // this call gives a better exception that the incRef if we are closed by a tragic event - channelReference.incRef(); - try { - final long offsetToSync; - final int opsCounter; - try (ReleasableLock lock = writeLock.acquire()) { - flush(); + synchronized (this) { + ensureOpen(); // this call gives a better exception that the incRef if we are closed by a tragic event + channelReference.incRef(); + try { + final long offsetToSync; + final int opsCounter; + outputStream.flush(); offsetToSync = totalOffset; opsCounter = operationCounter; + // we can do this outside of the write lock but we have to protect from + // concurrent syncs + ensureOpen(); // just for kicks - the checkpoint happens or not either way + try { + checkpoint(offsetToSync, opsCounter, channelReference); + } catch (Throwable ex) { + closeWithTragicEvent(ex); + throw ex; + } + lastSyncedOffset = offsetToSync; + }finally{ + channelReference.decRef(); } - // we can do this outside of the write lock but we have to protect from - // concurrent syncs - ensureOpen(); // just for kicks - the checkpoint happens or not either way - try { - checkpoint(offsetToSync, opsCounter, channelReference); - } catch (Throwable ex) { - closeWithTragicEvent(ex); - throw ex; - } - lastSyncedOffset = offsetToSync; - } finally { - channelReference.decRef(); } } } @@ -204,28 +177,6 @@ public class TranslogWriter extends TranslogReader { return totalOffset; } - - /** - * Flushes the buffer if the translog is buffered. - */ - private final void flush() throws IOException { - assert writeLock.isHeldByCurrentThread(); - if (bufferCount > 0) { - ensureOpen(); - // we use the channel to write, since on windows, writing to the RAF might not be reflected - // when reading through the channel - final int bufferSize = bufferCount; - try { - Channels.writeToChannel(buffer, 0, bufferSize, channel); - } catch (Throwable ex) { - closeWithTragicEvent(ex); - throw ex; - } - writtenOffset += bufferSize; - bufferCount = 0; - } - } - /** * returns a new reader that follows the current writes (most importantly allows making * repeated snapshots that includes new content) @@ -235,7 +186,7 @@ public class TranslogWriter extends TranslogReader { channelReference.incRef(); boolean success = false; try { - TranslogReader reader = new InnerReader(this.generation, firstOperationOffset, channelReference); + final TranslogReader reader = new InnerReader(this.generation, firstOperationOffset, channelReference); success = true; return reader; } finally { @@ -250,16 +201,18 @@ public class TranslogWriter extends TranslogReader { */ public ImmutableTranslogReader immutableReader() throws TranslogException { if (channelReference.tryIncRef()) { - try (ReleasableLock lock = writeLock.acquire()) { - ensureOpen(); - flush(); - ImmutableTranslogReader reader = new ImmutableTranslogReader(this.generation, channelReference, firstOperationOffset, writtenOffset, operationCounter); - channelReference.incRef(); // for new reader - return reader; - } catch (Exception e) { - throw new TranslogException(shardId, "exception while creating an immutable reader", e); - } finally { - channelReference.decRef(); + synchronized (this) { + try { + ensureOpen(); + outputStream.flush(); + ImmutableTranslogReader reader = new ImmutableTranslogReader(this.generation, channelReference, firstOperationOffset, getWrittenOffset(), operationCounter); + channelReference.incRef(); // for new reader + return reader; + } catch (Exception e) { + throw new TranslogException(shardId, "exception while creating an immutable reader", e); + } finally { + channelReference.decRef(); + } } } else { throw new TranslogException(shardId, "can't increment channel [" + channelReference + "] ref count"); @@ -272,6 +225,10 @@ public class TranslogWriter extends TranslogReader { return new BytesArray(buffer.array()).equals(expectedBytes); } + private long getWrittenOffset() throws IOException { + return channelReference.getChannel().position(); + } + /** * this class is used when one wants a reference to this file which exposes all recently written operation. * as such it needs access to the internals of the current reader @@ -313,14 +270,9 @@ public class TranslogWriter extends TranslogReader { @Override protected void readBytes(ByteBuffer targetBuffer, long position) throws IOException { - try (ReleasableLock lock = readLock.acquire()) { - if (position >= writtenOffset) { - assert targetBuffer.hasArray() : "buffer must have array"; - final int sourcePosition = (int) (position - writtenOffset); - System.arraycopy(buffer, sourcePosition, - targetBuffer.array(), targetBuffer.position(), targetBuffer.limit()); - targetBuffer.position(targetBuffer.limit()); - return; + if (position+targetBuffer.limit() > getWrittenOffset()) { + synchronized (this) { + outputStream.flush(); } } // we don't have to have a read lock here because we only write ahead to the file, so all writes has been complete @@ -355,18 +307,31 @@ public class TranslogWriter extends TranslogReader { } } - class WrapperOutputStream extends OutputStream { - @Override - public void write(int b) throws IOException { - buffer[bufferCount++] = (byte) b; + private final class BufferedChannelOutputStream extends BufferedOutputStream { + + public BufferedChannelOutputStream(OutputStream out, int size) throws IOException { + super(out, size); } @Override - public void write(byte[] b, int off, int len) throws IOException { - // we do safety checked when we decide to use this stream... - System.arraycopy(b, off, buffer, bufferCount, len); - bufferCount += len; + public synchronized void flush() throws IOException { + if (count > 0) { + try { + ensureOpen(); + super.flush(); + } catch (Throwable ex) { + closeWithTragicEvent(ex); + throw ex; + } + } + } + + @Override + public void close() throws IOException { + // the stream is intentionally not closed because + // closing it will close the FileChannel + throw new IllegalStateException("never close this stream"); } } }