From 3950169c21d97538708320ef29a7f5bab279a50f Mon Sep 17 00:00:00 2001 From: Francesco Nigro Date: Thu, 19 Oct 2017 20:17:34 +0200 Subject: [PATCH] ARTEMIS-1471 Needs Bounds Checking on writes for MappedSequentialFile The MappedSequentialFile relies on the assumption that any writers won't exceed the maximum capacity of the file, leaving the JVM to crash otherwise. This commit adds proper bounds checking on write operations (and position changes too) in order to provide recoverable effects if such scenario should occour. In addition are provided minor fixes on Mapped and Nio SequentialFile::fill behaviour to match the original contract. --- .../artemis/core/io/mapped/MappedFile.java | 144 +++--------------- .../core/io/mapped/MappedSequentialFile.java | 4 +- .../core/io/nio/NIOSequentialFile.java | 2 +- .../MappedSequentialFileFactoryTest.java | 2 +- 4 files changed, 30 insertions(+), 122 deletions(-) diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedFile.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedFile.java index 782233d032..05f8f25541 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedFile.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedFile.java @@ -80,71 +80,20 @@ final class MappedFile implements AutoCloseable { this.buffer.force(); } - /** - * Reads a sequence of bytes from this file into the given buffer. - *

- *

Bytes are read starting at this file's specified position. - */ - public int read(int position, ByteBuf dst, int dstStart, int dstLength) throws IOException { - final long srcAddress = this.address + position; - if (dst.hasMemoryAddress()) { - final long dstAddress = dst.memoryAddress() + dstStart; - PlatformDependent.copyMemory(srcAddress, dstAddress, dstLength); - } else if (dst.hasArray()) { - final byte[] dstArray = dst.array(); - PlatformDependent.copyMemory(srcAddress, dstArray, dstStart, dstLength); - } else { - throw new IllegalArgumentException("unsupported byte buffer"); + private void checkCapacity(int requiredCapacity) { + if (requiredCapacity < 0 || requiredCapacity > buffer.capacity()) { + throw new IllegalStateException("requiredCapacity must be >0 and <= " + buffer.capacity()); } - position += dstLength; - if (position > this.length) { - this.length = position; - } - return dstLength; } /** - * Reads a sequence of bytes from this file into the given buffer. - *

- *

Bytes are read starting at this file's specified position. + * It is raw because it doesn't validate capacity through {@link #checkCapacity(int)}. */ - public int read(int position, ByteBuffer dst, int dstStart, int dstLength) throws IOException { - final long srcAddress = this.address + position; - if (dst.isDirect()) { - final long dstAddress = PlatformDependent.directBufferAddress(dst) + dstStart; - PlatformDependent.copyMemory(srcAddress, dstAddress, dstLength); - } else { - final byte[] dstArray = dst.array(); - PlatformDependent.copyMemory(srcAddress, dstArray, dstStart, dstLength); - } - position += dstLength; + private void rawMovePositionAndLength(int position) { + this.position = position; if (position > this.length) { this.length = position; } - return dstLength; - } - - /** - * Reads a sequence of bytes from this file into the given buffer. - *

- *

Bytes are read starting at this file's current position, and - * then the position is updated with the number of bytes actually read. - */ - public int read(ByteBuf dst, int dstStart, int dstLength) throws IOException { - final int remaining = this.length - this.position; - final int read = Math.min(remaining, dstLength); - final long srcAddress = this.address + position; - if (dst.hasMemoryAddress()) { - final long dstAddress = dst.memoryAddress() + dstStart; - PlatformDependent.copyMemory(srcAddress, dstAddress, read); - } else if (dst.hasArray()) { - final byte[] dstArray = dst.array(); - PlatformDependent.copyMemory(srcAddress, dstArray, dstStart, read); - } else { - throw new IllegalArgumentException("unsupported byte buffer"); - } - position += read; - return read; } /** @@ -156,7 +105,7 @@ final class MappedFile implements AutoCloseable { public int read(ByteBuffer dst, int dstStart, int dstLength) throws IOException { final int remaining = this.length - this.position; final int read = Math.min(remaining, dstLength); - final long srcAddress = this.address + position; + final long srcAddress = this.address + this.position; if (dst.isDirect()) { final long dstAddress = PlatformDependent.directBufferAddress(dst) + dstStart; PlatformDependent.copyMemory(srcAddress, dstAddress, read); @@ -164,7 +113,7 @@ final class MappedFile implements AutoCloseable { final byte[] dstArray = dst.array(); PlatformDependent.copyMemory(srcAddress, dstArray, dstStart, read); } - position += read; + this.position += read; return read; } @@ -175,13 +124,12 @@ final class MappedFile implements AutoCloseable { */ public void write(EncodingSupport encodingSupport) throws IOException { final int encodedSize = encodingSupport.getEncodeSize(); + final int nextPosition = this.position + encodedSize; + checkCapacity(nextPosition); this.byteBufWrapper.setIndex(this.position, this.position); encodingSupport.encode(this.channelBufferWrapper); - position += encodedSize; - assert (byteBufWrapper.writerIndex() == position); - if (position > this.length) { - this.length = position; - } + rawMovePositionAndLength(nextPosition); + assert (byteBufWrapper.writerIndex() == this.position); } /** @@ -190,7 +138,9 @@ final class MappedFile implements AutoCloseable { *

Bytes are written starting at this file's current position, */ public void write(ByteBuf src, int srcStart, int srcLength) throws IOException { - final long destAddress = this.address + position; + final int nextPosition = this.position + srcLength; + checkCapacity(nextPosition); + final long destAddress = this.address + this.position; if (src.hasMemoryAddress()) { final long srcAddress = src.memoryAddress() + srcStart; PlatformDependent.copyMemory(srcAddress, destAddress, srcLength); @@ -200,10 +150,7 @@ final class MappedFile implements AutoCloseable { } else { throw new IllegalArgumentException("unsupported byte buffer"); } - position += srcLength; - if (position > this.length) { - this.length = position; - } + rawMovePositionAndLength(nextPosition); } /** @@ -212,7 +159,9 @@ final class MappedFile implements AutoCloseable { *

Bytes are written starting at this file's current position, */ public void write(ByteBuffer src, int srcStart, int srcLength) throws IOException { - final long destAddress = this.address + position; + final int nextPosition = this.position + srcLength; + checkCapacity(nextPosition); + final long destAddress = this.address + this.position; if (src.isDirect()) { final long srcAddress = PlatformDependent.directBufferAddress(src) + srcStart; PlatformDependent.copyMemory(srcAddress, destAddress, srcLength); @@ -220,52 +169,7 @@ final class MappedFile implements AutoCloseable { final byte[] srcArray = src.array(); PlatformDependent.copyMemory(srcArray, srcStart, destAddress, srcLength); } - position += srcLength; - if (position > this.length) { - this.length = position; - } - } - - /** - * Writes a sequence of bytes to this file from the given buffer. - *

- *

Bytes are written starting at this file's specified position, - */ - public void write(int position, ByteBuf src, int srcStart, int srcLength) throws IOException { - final long destAddress = this.address + position; - if (src.hasMemoryAddress()) { - final long srcAddress = src.memoryAddress() + srcStart; - PlatformDependent.copyMemory(srcAddress, destAddress, srcLength); - } else if (src.hasArray()) { - final byte[] srcArray = src.array(); - PlatformDependent.copyMemory(srcArray, srcStart, destAddress, srcLength); - } else { - throw new IllegalArgumentException("unsupported byte buffer"); - } - position += srcLength; - if (position > this.length) { - this.length = position; - } - } - - /** - * Writes a sequence of bytes to this file from the given buffer. - *

- *

Bytes are written starting at this file's specified position, - */ - public void write(int position, ByteBuffer src, int srcStart, int srcLength) throws IOException { - final long destAddress = this.address + position; - if (src.isDirect()) { - final long srcAddress = PlatformDependent.directBufferAddress(src) + srcStart; - PlatformDependent.copyMemory(srcAddress, destAddress, srcLength); - } else { - final byte[] srcArray = src.array(); - PlatformDependent.copyMemory(srcArray, srcStart, destAddress, srcLength); - } - position += srcLength; - if (position > this.length) { - this.length = position; - } + rawMovePositionAndLength(nextPosition); } /** @@ -274,6 +178,7 @@ final class MappedFile implements AutoCloseable { *

Bytes are written starting at this file's current position, */ public void zeros(int position, final int count) throws IOException { + checkCapacity(position + count); //zeroes memory in reverse direction in OS_PAGE_SIZE batches //to gain sympathy by the page cache LRU policy final long start = this.address + position; @@ -301,7 +206,7 @@ final class MappedFile implements AutoCloseable { if (toZeros > 0) { PlatformDependent.setMemory(start, toZeros, (byte) 0); } - + //do not move this.position: only this.length can be changed position += count; if (position > this.length) { this.length = position; @@ -309,15 +214,16 @@ final class MappedFile implements AutoCloseable { } public int position() { - return position; + return this.position; } public void position(int position) { + checkCapacity(position); this.position = position; } public long length() { - return length; + return this.length; } @Override diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedSequentialFile.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedSequentialFile.java index 7619d1a934..aec472396a 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedSequentialFile.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedSequentialFile.java @@ -135,10 +135,12 @@ final class MappedSequentialFile implements SequentialFile { public void fill(int size) throws IOException { checkIsOpen(); //the fill will give a big performance hit when done in parallel of other writings! - this.mappedFile.zeros(this.mappedFile.position(), size); + this.mappedFile.zeros(0, size); if (factory.isDatasync()) { this.mappedFile.force(); } + //set the position to 0 to match the fill contract + this.mappedFile.position(0); } @Override diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFile.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFile.java index d1e333e90a..891bd5cce2 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFile.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFile.java @@ -112,7 +112,7 @@ public final class NIOSequentialFile extends AbstractSequentialFile { } //set the position to 0 to match the fill contract channel.position(0); - fileSize = size; + fileSize = channel.size(); } finally { //return it to the factory this.factory.releaseBuffer(zeroPage); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/MappedSequentialFileFactoryTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/MappedSequentialFileFactoryTest.java index 937435ecab..4b9d321c17 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/MappedSequentialFileFactoryTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/MappedSequentialFileFactoryTest.java @@ -123,7 +123,7 @@ public class MappedSequentialFileFactoryTest extends SequentialFileFactoryTestBa SequentialFile file = factory.createSequentialFile("file.txt"); file.open(); Thread.currentThread().interrupt(); - file.fill(1024); + file.fill(fakeEncoding.getEncodeSize()); file.close(); } catch (Exception e) {