ARTEMIS-1471 Needs Bounds Checking on writes for MappedSequentialFile

The MappedSequentialFile relies on the assumption that any writers
won't exceed the maximum capacity of the file, leaving the JVM to crash otherwise.
This commit adds proper bounds checking on write operations (and position changes too)
in order to provide recoverable effects if such scenario should occour.
In addition are provided minor fixes on Mapped and Nio SequentialFile::fill behaviour
to match the original contract.
This commit is contained in:
Francesco Nigro 2017-10-19 20:17:34 +02:00 committed by Clebert Suconic
parent f7ec00b845
commit 3950169c21
4 changed files with 30 additions and 122 deletions

View File

@ -80,71 +80,20 @@ final class MappedFile implements AutoCloseable {
this.buffer.force();
}
/**
* Reads a sequence of bytes from this file into the given buffer.
* <p>
* <p> Bytes are read starting at this file's specified position.
*/
public int read(int position, ByteBuf dst, int dstStart, int dstLength) throws IOException {
final long srcAddress = this.address + position;
if (dst.hasMemoryAddress()) {
final long dstAddress = dst.memoryAddress() + dstStart;
PlatformDependent.copyMemory(srcAddress, dstAddress, dstLength);
} else if (dst.hasArray()) {
final byte[] dstArray = dst.array();
PlatformDependent.copyMemory(srcAddress, dstArray, dstStart, dstLength);
} else {
throw new IllegalArgumentException("unsupported byte buffer");
private void checkCapacity(int requiredCapacity) {
if (requiredCapacity < 0 || requiredCapacity > buffer.capacity()) {
throw new IllegalStateException("requiredCapacity must be >0 and <= " + buffer.capacity());
}
position += dstLength;
if (position > this.length) {
this.length = position;
}
return dstLength;
}
/**
* Reads a sequence of bytes from this file into the given buffer.
* <p>
* <p> Bytes are read starting at this file's specified position.
* It is raw because it doesn't validate capacity through {@link #checkCapacity(int)}.
*/
public int read(int position, ByteBuffer dst, int dstStart, int dstLength) throws IOException {
final long srcAddress = this.address + position;
if (dst.isDirect()) {
final long dstAddress = PlatformDependent.directBufferAddress(dst) + dstStart;
PlatformDependent.copyMemory(srcAddress, dstAddress, dstLength);
} else {
final byte[] dstArray = dst.array();
PlatformDependent.copyMemory(srcAddress, dstArray, dstStart, dstLength);
}
position += dstLength;
private void rawMovePositionAndLength(int position) {
this.position = position;
if (position > this.length) {
this.length = position;
}
return dstLength;
}
/**
* Reads a sequence of bytes from this file into the given buffer.
* <p>
* <p> Bytes are read starting at this file's current position, and
* then the position is updated with the number of bytes actually read.
*/
public int read(ByteBuf dst, int dstStart, int dstLength) throws IOException {
final int remaining = this.length - this.position;
final int read = Math.min(remaining, dstLength);
final long srcAddress = this.address + position;
if (dst.hasMemoryAddress()) {
final long dstAddress = dst.memoryAddress() + dstStart;
PlatformDependent.copyMemory(srcAddress, dstAddress, read);
} else if (dst.hasArray()) {
final byte[] dstArray = dst.array();
PlatformDependent.copyMemory(srcAddress, dstArray, dstStart, read);
} else {
throw new IllegalArgumentException("unsupported byte buffer");
}
position += read;
return read;
}
/**
@ -156,7 +105,7 @@ final class MappedFile implements AutoCloseable {
public int read(ByteBuffer dst, int dstStart, int dstLength) throws IOException {
final int remaining = this.length - this.position;
final int read = Math.min(remaining, dstLength);
final long srcAddress = this.address + position;
final long srcAddress = this.address + this.position;
if (dst.isDirect()) {
final long dstAddress = PlatformDependent.directBufferAddress(dst) + dstStart;
PlatformDependent.copyMemory(srcAddress, dstAddress, read);
@ -164,7 +113,7 @@ final class MappedFile implements AutoCloseable {
final byte[] dstArray = dst.array();
PlatformDependent.copyMemory(srcAddress, dstArray, dstStart, read);
}
position += read;
this.position += read;
return read;
}
@ -175,13 +124,12 @@ final class MappedFile implements AutoCloseable {
*/
public void write(EncodingSupport encodingSupport) throws IOException {
final int encodedSize = encodingSupport.getEncodeSize();
final int nextPosition = this.position + encodedSize;
checkCapacity(nextPosition);
this.byteBufWrapper.setIndex(this.position, this.position);
encodingSupport.encode(this.channelBufferWrapper);
position += encodedSize;
assert (byteBufWrapper.writerIndex() == position);
if (position > this.length) {
this.length = position;
}
rawMovePositionAndLength(nextPosition);
assert (byteBufWrapper.writerIndex() == this.position);
}
/**
@ -190,7 +138,9 @@ final class MappedFile implements AutoCloseable {
* <p> Bytes are written starting at this file's current position,
*/
public void write(ByteBuf src, int srcStart, int srcLength) throws IOException {
final long destAddress = this.address + position;
final int nextPosition = this.position + srcLength;
checkCapacity(nextPosition);
final long destAddress = this.address + this.position;
if (src.hasMemoryAddress()) {
final long srcAddress = src.memoryAddress() + srcStart;
PlatformDependent.copyMemory(srcAddress, destAddress, srcLength);
@ -200,10 +150,7 @@ final class MappedFile implements AutoCloseable {
} else {
throw new IllegalArgumentException("unsupported byte buffer");
}
position += srcLength;
if (position > this.length) {
this.length = position;
}
rawMovePositionAndLength(nextPosition);
}
/**
@ -212,7 +159,9 @@ final class MappedFile implements AutoCloseable {
* <p> Bytes are written starting at this file's current position,
*/
public void write(ByteBuffer src, int srcStart, int srcLength) throws IOException {
final long destAddress = this.address + position;
final int nextPosition = this.position + srcLength;
checkCapacity(nextPosition);
final long destAddress = this.address + this.position;
if (src.isDirect()) {
final long srcAddress = PlatformDependent.directBufferAddress(src) + srcStart;
PlatformDependent.copyMemory(srcAddress, destAddress, srcLength);
@ -220,52 +169,7 @@ final class MappedFile implements AutoCloseable {
final byte[] srcArray = src.array();
PlatformDependent.copyMemory(srcArray, srcStart, destAddress, srcLength);
}
position += srcLength;
if (position > this.length) {
this.length = position;
}
}
/**
* Writes a sequence of bytes to this file from the given buffer.
* <p>
* <p> Bytes are written starting at this file's specified position,
*/
public void write(int position, ByteBuf src, int srcStart, int srcLength) throws IOException {
final long destAddress = this.address + position;
if (src.hasMemoryAddress()) {
final long srcAddress = src.memoryAddress() + srcStart;
PlatformDependent.copyMemory(srcAddress, destAddress, srcLength);
} else if (src.hasArray()) {
final byte[] srcArray = src.array();
PlatformDependent.copyMemory(srcArray, srcStart, destAddress, srcLength);
} else {
throw new IllegalArgumentException("unsupported byte buffer");
}
position += srcLength;
if (position > this.length) {
this.length = position;
}
}
/**
* Writes a sequence of bytes to this file from the given buffer.
* <p>
* <p> Bytes are written starting at this file's specified position,
*/
public void write(int position, ByteBuffer src, int srcStart, int srcLength) throws IOException {
final long destAddress = this.address + position;
if (src.isDirect()) {
final long srcAddress = PlatformDependent.directBufferAddress(src) + srcStart;
PlatformDependent.copyMemory(srcAddress, destAddress, srcLength);
} else {
final byte[] srcArray = src.array();
PlatformDependent.copyMemory(srcArray, srcStart, destAddress, srcLength);
}
position += srcLength;
if (position > this.length) {
this.length = position;
}
rawMovePositionAndLength(nextPosition);
}
/**
@ -274,6 +178,7 @@ final class MappedFile implements AutoCloseable {
* <p> Bytes are written starting at this file's current position,
*/
public void zeros(int position, final int count) throws IOException {
checkCapacity(position + count);
//zeroes memory in reverse direction in OS_PAGE_SIZE batches
//to gain sympathy by the page cache LRU policy
final long start = this.address + position;
@ -301,7 +206,7 @@ final class MappedFile implements AutoCloseable {
if (toZeros > 0) {
PlatformDependent.setMemory(start, toZeros, (byte) 0);
}
//do not move this.position: only this.length can be changed
position += count;
if (position > this.length) {
this.length = position;
@ -309,15 +214,16 @@ final class MappedFile implements AutoCloseable {
}
public int position() {
return position;
return this.position;
}
public void position(int position) {
checkCapacity(position);
this.position = position;
}
public long length() {
return length;
return this.length;
}
@Override

View File

@ -135,10 +135,12 @@ final class MappedSequentialFile implements SequentialFile {
public void fill(int size) throws IOException {
checkIsOpen();
//the fill will give a big performance hit when done in parallel of other writings!
this.mappedFile.zeros(this.mappedFile.position(), size);
this.mappedFile.zeros(0, size);
if (factory.isDatasync()) {
this.mappedFile.force();
}
//set the position to 0 to match the fill contract
this.mappedFile.position(0);
}
@Override

View File

@ -112,7 +112,7 @@ public final class NIOSequentialFile extends AbstractSequentialFile {
}
//set the position to 0 to match the fill contract
channel.position(0);
fileSize = size;
fileSize = channel.size();
} finally {
//return it to the factory
this.factory.releaseBuffer(zeroPage);

View File

@ -123,7 +123,7 @@ public class MappedSequentialFileFactoryTest extends SequentialFileFactoryTestBa
SequentialFile file = factory.createSequentialFile("file.txt");
file.open();
Thread.currentThread().interrupt();
file.fill(1024);
file.fill(fakeEncoding.getEncodeSize());
file.close();
} catch (Exception e) {