Simplify TranslogWriter to always write to a stream
We used to write into an in-memory buffer and, if necessary, also allowed reading from that buffer when translog locations that had not yet been flushed to the channel needed to be read. This commit hides all writing behind a buffered output stream and, if necessary, flushes all buffered data to the channel for reading. This allows for several simplifications, like reusing Java's built-in BufferedOutputStream, and removes the need for read-write locks on the translog writer. All thread safety is now achieved using the synchronized primitive.
This commit is contained in:
parent
107859f347
commit
f69502dd04
|
@ -42,6 +42,7 @@ import java.nio.file.Files;
|
||||||
import java.nio.file.OpenOption;
|
import java.nio.file.OpenOption;
|
||||||
import java.nio.file.Path;
|
import java.nio.file.Path;
|
||||||
import java.nio.file.StandardOpenOption;
|
import java.nio.file.StandardOpenOption;
|
||||||
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
import java.util.concurrent.locks.ReadWriteLock;
|
import java.util.concurrent.locks.ReadWriteLock;
|
||||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||||
|
|
||||||
|
@ -53,35 +54,23 @@ public class TranslogWriter extends TranslogReader {
|
||||||
public static final int VERSION = VERSION_CHECKPOINTS;
|
public static final int VERSION = VERSION_CHECKPOINTS;
|
||||||
|
|
||||||
private final ShardId shardId;
|
private final ShardId shardId;
|
||||||
private final ReleasableLock readLock;
|
|
||||||
private final ReleasableLock writeLock;
|
|
||||||
/* the offset in bytes that was written when the file was last synced*/
|
/* the offset in bytes that was written when the file was last synced*/
|
||||||
private volatile long lastSyncedOffset;
|
private volatile long lastSyncedOffset;
|
||||||
/* the number of translog operations written to this file */
|
/* the number of translog operations written to this file */
|
||||||
private volatile int operationCounter;
|
private volatile int operationCounter;
|
||||||
/* the offset in bytes written to the file */
|
|
||||||
private volatile long writtenOffset;
|
|
||||||
/* if we hit an exception that we can't recover from we assign it to this var and ship it with every AlreadyClosedException we throw */
|
/* if we hit an exception that we can't recover from we assign it to this var and ship it with every AlreadyClosedException we throw */
|
||||||
private volatile Throwable tragedy;
|
private volatile Throwable tragedy;
|
||||||
|
/* A buffered output stream that writes to the writer's channel */
|
||||||
private final byte[] buffer;
|
private final OutputStream outputStream;
|
||||||
private int bufferCount;
|
|
||||||
private WrapperOutputStream bufferOs = new WrapperOutputStream();
|
|
||||||
|
|
||||||
/* the total offset of this file including the bytes written to the file as well as into the buffer */
|
/* the total offset of this file including the bytes written to the file as well as into the buffer */
|
||||||
private volatile long totalOffset;
|
private volatile long totalOffset;
|
||||||
|
|
||||||
|
|
||||||
public TranslogWriter(ShardId shardId, long generation, ChannelReference channelReference, ByteSizeValue bufferSize) throws IOException {
|
public TranslogWriter(ShardId shardId, long generation, ChannelReference channelReference, ByteSizeValue bufferSize) throws IOException {
|
||||||
super(generation, channelReference, channelReference.getChannel().position());
|
super(generation, channelReference, channelReference.getChannel().position());
|
||||||
this.shardId = shardId;
|
this.shardId = shardId;
|
||||||
ReadWriteLock rwl = new ReentrantReadWriteLock();
|
this.outputStream = new BufferedChannelOutputStream(java.nio.channels.Channels.newOutputStream(channelReference.getChannel()), bufferSize.bytesAsInt());
|
||||||
readLock = new ReleasableLock(rwl.readLock());
|
|
||||||
writeLock = new ReleasableLock(rwl.writeLock());
|
|
||||||
this.writtenOffset = channelReference.getChannel().position();
|
|
||||||
this.totalOffset = writtenOffset;
|
|
||||||
this.buffer = new byte[bufferSize.bytesAsInt()];
|
|
||||||
this.lastSyncedOffset = channelReference.getChannel().position();
|
this.lastSyncedOffset = channelReference.getChannel().position();
|
||||||
|
totalOffset = lastSyncedOffset;
|
||||||
}
|
}
|
||||||
|
|
||||||
public static TranslogWriter create(ShardId shardId, String translogUUID, long fileGeneration, Path file, Callback<ChannelReference> onClose, ChannelFactory channelFactory, ByteSizeValue bufferSize) throws IOException {
|
public static TranslogWriter create(ShardId shardId, String translogUUID, long fileGeneration, Path file, Callback<ChannelReference> onClose, ChannelFactory channelFactory, ByteSizeValue bufferSize) throws IOException {
|
||||||
|
@ -116,75 +105,59 @@ public class TranslogWriter extends TranslogReader {
|
||||||
return tragedy;
|
return tragedy;
|
||||||
}
|
}
|
||||||
|
|
||||||
private final void closeWithTragicEvent(Throwable throwable) throws IOException {
|
private synchronized final void closeWithTragicEvent(Throwable throwable) throws IOException {
|
||||||
try (ReleasableLock lock = writeLock.acquire()) {
|
if (tragedy == null) {
|
||||||
if (tragedy == null) {
|
tragedy = throwable;
|
||||||
tragedy = throwable;
|
} else {
|
||||||
} else {
|
tragedy.addSuppressed(throwable);
|
||||||
tragedy.addSuppressed(throwable);
|
|
||||||
}
|
|
||||||
close();
|
|
||||||
}
|
}
|
||||||
|
close();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* add the given bytes to the translog and return the location they were written at
|
* add the given bytes to the translog and return the location they were written at
|
||||||
*/
|
*/
|
||||||
public Translog.Location add(BytesReference data) throws IOException {
|
public synchronized Translog.Location add(BytesReference data) throws IOException {
|
||||||
try (ReleasableLock lock = writeLock.acquire()) {
|
ensureOpen();
|
||||||
ensureOpen();
|
final long offset = totalOffset;
|
||||||
final long offset = totalOffset;
|
try {
|
||||||
if (data.length() >= buffer.length) {
|
data.writeTo(outputStream);
|
||||||
flush();
|
} catch (Throwable ex) {
|
||||||
// we use the channel to write, since on windows, writing to the RAF might not be reflected
|
closeWithTragicEvent(ex);
|
||||||
// when reading through the channel
|
throw ex;
|
||||||
try {
|
|
||||||
data.writeTo(channel);
|
|
||||||
} catch (Throwable ex) {
|
|
||||||
closeWithTragicEvent(ex);
|
|
||||||
throw ex;
|
|
||||||
}
|
|
||||||
writtenOffset += data.length();
|
|
||||||
totalOffset += data.length();
|
|
||||||
} else {
|
|
||||||
if (data.length() > buffer.length - bufferCount) {
|
|
||||||
flush();
|
|
||||||
}
|
|
||||||
data.writeTo(bufferOs);
|
|
||||||
totalOffset += data.length();
|
|
||||||
}
|
|
||||||
operationCounter++;
|
|
||||||
return new Translog.Location(generation, offset, data.length());
|
|
||||||
}
|
}
|
||||||
|
totalOffset += data.length();
|
||||||
|
operationCounter++;
|
||||||
|
return new Translog.Location(generation, offset, data.length());
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* write all buffered ops to disk and fsync file
|
* write all buffered ops to disk and fsync file
|
||||||
*/
|
*/
|
||||||
public synchronized void sync() throws IOException {
|
public void sync() throws IOException {
|
||||||
if (syncNeeded()) {
|
if (syncNeeded()) {
|
||||||
ensureOpen(); // this call gives a better exception that the incRef if we are closed by a tragic event
|
synchronized (this) {
|
||||||
channelReference.incRef();
|
ensureOpen(); // this call gives a better exception that the incRef if we are closed by a tragic event
|
||||||
try {
|
channelReference.incRef();
|
||||||
final long offsetToSync;
|
try {
|
||||||
final int opsCounter;
|
final long offsetToSync;
|
||||||
try (ReleasableLock lock = writeLock.acquire()) {
|
final int opsCounter;
|
||||||
flush();
|
outputStream.flush();
|
||||||
offsetToSync = totalOffset;
|
offsetToSync = totalOffset;
|
||||||
opsCounter = operationCounter;
|
opsCounter = operationCounter;
|
||||||
|
// we can do this outside of the write lock but we have to protect from
|
||||||
|
// concurrent syncs
|
||||||
|
ensureOpen(); // just for kicks - the checkpoint happens or not either way
|
||||||
|
try {
|
||||||
|
checkpoint(offsetToSync, opsCounter, channelReference);
|
||||||
|
} catch (Throwable ex) {
|
||||||
|
closeWithTragicEvent(ex);
|
||||||
|
throw ex;
|
||||||
|
}
|
||||||
|
lastSyncedOffset = offsetToSync;
|
||||||
|
}finally{
|
||||||
|
channelReference.decRef();
|
||||||
}
|
}
|
||||||
// we can do this outside of the write lock but we have to protect from
|
|
||||||
// concurrent syncs
|
|
||||||
ensureOpen(); // just for kicks - the checkpoint happens or not either way
|
|
||||||
try {
|
|
||||||
checkpoint(offsetToSync, opsCounter, channelReference);
|
|
||||||
} catch (Throwable ex) {
|
|
||||||
closeWithTragicEvent(ex);
|
|
||||||
throw ex;
|
|
||||||
}
|
|
||||||
lastSyncedOffset = offsetToSync;
|
|
||||||
} finally {
|
|
||||||
channelReference.decRef();
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -204,28 +177,6 @@ public class TranslogWriter extends TranslogReader {
|
||||||
return totalOffset;
|
return totalOffset;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Flushes the buffer if the translog is buffered.
|
|
||||||
*/
|
|
||||||
private final void flush() throws IOException {
|
|
||||||
assert writeLock.isHeldByCurrentThread();
|
|
||||||
if (bufferCount > 0) {
|
|
||||||
ensureOpen();
|
|
||||||
// we use the channel to write, since on windows, writing to the RAF might not be reflected
|
|
||||||
// when reading through the channel
|
|
||||||
final int bufferSize = bufferCount;
|
|
||||||
try {
|
|
||||||
Channels.writeToChannel(buffer, 0, bufferSize, channel);
|
|
||||||
} catch (Throwable ex) {
|
|
||||||
closeWithTragicEvent(ex);
|
|
||||||
throw ex;
|
|
||||||
}
|
|
||||||
writtenOffset += bufferSize;
|
|
||||||
bufferCount = 0;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* returns a new reader that follows the current writes (most importantly allows making
|
* returns a new reader that follows the current writes (most importantly allows making
|
||||||
* repeated snapshots that includes new content)
|
* repeated snapshots that includes new content)
|
||||||
|
@ -235,7 +186,7 @@ public class TranslogWriter extends TranslogReader {
|
||||||
channelReference.incRef();
|
channelReference.incRef();
|
||||||
boolean success = false;
|
boolean success = false;
|
||||||
try {
|
try {
|
||||||
TranslogReader reader = new InnerReader(this.generation, firstOperationOffset, channelReference);
|
final TranslogReader reader = new InnerReader(this.generation, firstOperationOffset, channelReference);
|
||||||
success = true;
|
success = true;
|
||||||
return reader;
|
return reader;
|
||||||
} finally {
|
} finally {
|
||||||
|
@ -250,16 +201,18 @@ public class TranslogWriter extends TranslogReader {
|
||||||
*/
|
*/
|
||||||
public ImmutableTranslogReader immutableReader() throws TranslogException {
|
public ImmutableTranslogReader immutableReader() throws TranslogException {
|
||||||
if (channelReference.tryIncRef()) {
|
if (channelReference.tryIncRef()) {
|
||||||
try (ReleasableLock lock = writeLock.acquire()) {
|
synchronized (this) {
|
||||||
ensureOpen();
|
try {
|
||||||
flush();
|
ensureOpen();
|
||||||
ImmutableTranslogReader reader = new ImmutableTranslogReader(this.generation, channelReference, firstOperationOffset, writtenOffset, operationCounter);
|
outputStream.flush();
|
||||||
channelReference.incRef(); // for new reader
|
ImmutableTranslogReader reader = new ImmutableTranslogReader(this.generation, channelReference, firstOperationOffset, getWrittenOffset(), operationCounter);
|
||||||
return reader;
|
channelReference.incRef(); // for new reader
|
||||||
} catch (Exception e) {
|
return reader;
|
||||||
throw new TranslogException(shardId, "exception while creating an immutable reader", e);
|
} catch (Exception e) {
|
||||||
} finally {
|
throw new TranslogException(shardId, "exception while creating an immutable reader", e);
|
||||||
channelReference.decRef();
|
} finally {
|
||||||
|
channelReference.decRef();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
throw new TranslogException(shardId, "can't increment channel [" + channelReference + "] ref count");
|
throw new TranslogException(shardId, "can't increment channel [" + channelReference + "] ref count");
|
||||||
|
@ -272,6 +225,10 @@ public class TranslogWriter extends TranslogReader {
|
||||||
return new BytesArray(buffer.array()).equals(expectedBytes);
|
return new BytesArray(buffer.array()).equals(expectedBytes);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private long getWrittenOffset() throws IOException {
|
||||||
|
return channelReference.getChannel().position();
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* this class is used when one wants a reference to this file which exposes all recently written operation.
|
* this class is used when one wants a reference to this file which exposes all recently written operation.
|
||||||
* as such it needs access to the internals of the current reader
|
* as such it needs access to the internals of the current reader
|
||||||
|
@ -313,14 +270,9 @@ public class TranslogWriter extends TranslogReader {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void readBytes(ByteBuffer targetBuffer, long position) throws IOException {
|
protected void readBytes(ByteBuffer targetBuffer, long position) throws IOException {
|
||||||
try (ReleasableLock lock = readLock.acquire()) {
|
if (position+targetBuffer.limit() > getWrittenOffset()) {
|
||||||
if (position >= writtenOffset) {
|
synchronized (this) {
|
||||||
assert targetBuffer.hasArray() : "buffer must have array";
|
outputStream.flush();
|
||||||
final int sourcePosition = (int) (position - writtenOffset);
|
|
||||||
System.arraycopy(buffer, sourcePosition,
|
|
||||||
targetBuffer.array(), targetBuffer.position(), targetBuffer.limit());
|
|
||||||
targetBuffer.position(targetBuffer.limit());
|
|
||||||
return;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// we don't have to have a read lock here because we only write ahead to the file, so all writes has been complete
|
// we don't have to have a read lock here because we only write ahead to the file, so all writes has been complete
|
||||||
|
@ -355,18 +307,31 @@ public class TranslogWriter extends TranslogReader {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
class WrapperOutputStream extends OutputStream {
|
|
||||||
|
|
||||||
@Override
|
private final class BufferedChannelOutputStream extends BufferedOutputStream {
|
||||||
public void write(int b) throws IOException {
|
|
||||||
buffer[bufferCount++] = (byte) b;
|
public BufferedChannelOutputStream(OutputStream out, int size) throws IOException {
|
||||||
|
super(out, size);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void write(byte[] b, int off, int len) throws IOException {
|
public synchronized void flush() throws IOException {
|
||||||
// we do safety checked when we decide to use this stream...
|
if (count > 0) {
|
||||||
System.arraycopy(b, off, buffer, bufferCount, len);
|
try {
|
||||||
bufferCount += len;
|
ensureOpen();
|
||||||
|
super.flush();
|
||||||
|
} catch (Throwable ex) {
|
||||||
|
closeWithTragicEvent(ex);
|
||||||
|
throw ex;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void close() throws IOException {
|
||||||
|
// the stream is intentionally not closed because
|
||||||
|
// closing it will close the FileChannel
|
||||||
|
throw new IllegalStateException("never close this stream");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue