diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedFile.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedFile.java index 782233d032..05f8f25541 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedFile.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedFile.java @@ -80,71 +80,20 @@ final class MappedFile implements AutoCloseable { this.buffer.force(); } - /** - * Reads a sequence of bytes from this file into the given buffer. - *
- *
Bytes are read starting at this file's specified position. - */ - public int read(int position, ByteBuf dst, int dstStart, int dstLength) throws IOException { - final long srcAddress = this.address + position; - if (dst.hasMemoryAddress()) { - final long dstAddress = dst.memoryAddress() + dstStart; - PlatformDependent.copyMemory(srcAddress, dstAddress, dstLength); - } else if (dst.hasArray()) { - final byte[] dstArray = dst.array(); - PlatformDependent.copyMemory(srcAddress, dstArray, dstStart, dstLength); - } else { - throw new IllegalArgumentException("unsupported byte buffer"); + private void checkCapacity(int requiredCapacity) { + if (requiredCapacity < 0 || requiredCapacity > buffer.capacity()) { + throw new IllegalStateException("requiredCapacity must be >0 and <= " + buffer.capacity()); } - position += dstLength; - if (position > this.length) { - this.length = position; - } - return dstLength; } /** - * Reads a sequence of bytes from this file into the given buffer. - *
- *
Bytes are read starting at this file's specified position. + * It is raw because it doesn't validate capacity through {@link #checkCapacity(int)}. */ - public int read(int position, ByteBuffer dst, int dstStart, int dstLength) throws IOException { - final long srcAddress = this.address + position; - if (dst.isDirect()) { - final long dstAddress = PlatformDependent.directBufferAddress(dst) + dstStart; - PlatformDependent.copyMemory(srcAddress, dstAddress, dstLength); - } else { - final byte[] dstArray = dst.array(); - PlatformDependent.copyMemory(srcAddress, dstArray, dstStart, dstLength); - } - position += dstLength; + private void rawMovePositionAndLength(int position) { + this.position = position; if (position > this.length) { this.length = position; } - return dstLength; - } - - /** - * Reads a sequence of bytes from this file into the given buffer. - *
- *
Bytes are read starting at this file's current position, and - * then the position is updated with the number of bytes actually read. - */ - public int read(ByteBuf dst, int dstStart, int dstLength) throws IOException { - final int remaining = this.length - this.position; - final int read = Math.min(remaining, dstLength); - final long srcAddress = this.address + position; - if (dst.hasMemoryAddress()) { - final long dstAddress = dst.memoryAddress() + dstStart; - PlatformDependent.copyMemory(srcAddress, dstAddress, read); - } else if (dst.hasArray()) { - final byte[] dstArray = dst.array(); - PlatformDependent.copyMemory(srcAddress, dstArray, dstStart, read); - } else { - throw new IllegalArgumentException("unsupported byte buffer"); - } - position += read; - return read; } /** @@ -156,7 +105,7 @@ final class MappedFile implements AutoCloseable { public int read(ByteBuffer dst, int dstStart, int dstLength) throws IOException { final int remaining = this.length - this.position; final int read = Math.min(remaining, dstLength); - final long srcAddress = this.address + position; + final long srcAddress = this.address + this.position; if (dst.isDirect()) { final long dstAddress = PlatformDependent.directBufferAddress(dst) + dstStart; PlatformDependent.copyMemory(srcAddress, dstAddress, read); @@ -164,7 +113,7 @@ final class MappedFile implements AutoCloseable { final byte[] dstArray = dst.array(); PlatformDependent.copyMemory(srcAddress, dstArray, dstStart, read); } - position += read; + this.position += read; return read; } @@ -175,13 +124,12 @@ final class MappedFile implements AutoCloseable { */ public void write(EncodingSupport encodingSupport) throws IOException { final int encodedSize = encodingSupport.getEncodeSize(); + final int nextPosition = this.position + encodedSize; + checkCapacity(nextPosition); this.byteBufWrapper.setIndex(this.position, this.position); encodingSupport.encode(this.channelBufferWrapper); - position += encodedSize; - assert (byteBufWrapper.writerIndex() == position); - if (position > this.length) { - this.length = position; - } + rawMovePositionAndLength(nextPosition); + assert (byteBufWrapper.writerIndex() == this.position); } /** @@ -190,7 +138,9 @@ final class MappedFile implements AutoCloseable { *
Bytes are written starting at this file's current position, */ public void write(ByteBuf src, int srcStart, int srcLength) throws IOException { - final long destAddress = this.address + position; + final int nextPosition = this.position + srcLength; + checkCapacity(nextPosition); + final long destAddress = this.address + this.position; if (src.hasMemoryAddress()) { final long srcAddress = src.memoryAddress() + srcStart; PlatformDependent.copyMemory(srcAddress, destAddress, srcLength); @@ -200,10 +150,7 @@ final class MappedFile implements AutoCloseable { } else { throw new IllegalArgumentException("unsupported byte buffer"); } - position += srcLength; - if (position > this.length) { - this.length = position; - } + rawMovePositionAndLength(nextPosition); } /** @@ -212,7 +159,9 @@ final class MappedFile implements AutoCloseable { *
Bytes are written starting at this file's current position, */ public void write(ByteBuffer src, int srcStart, int srcLength) throws IOException { - final long destAddress = this.address + position; + final int nextPosition = this.position + srcLength; + checkCapacity(nextPosition); + final long destAddress = this.address + this.position; if (src.isDirect()) { final long srcAddress = PlatformDependent.directBufferAddress(src) + srcStart; PlatformDependent.copyMemory(srcAddress, destAddress, srcLength); @@ -220,52 +169,7 @@ final class MappedFile implements AutoCloseable { final byte[] srcArray = src.array(); PlatformDependent.copyMemory(srcArray, srcStart, destAddress, srcLength); } - position += srcLength; - if (position > this.length) { - this.length = position; - } - } - - /** - * Writes a sequence of bytes to this file from the given buffer. - *
- *
Bytes are written starting at this file's specified position, - */ - public void write(int position, ByteBuf src, int srcStart, int srcLength) throws IOException { - final long destAddress = this.address + position; - if (src.hasMemoryAddress()) { - final long srcAddress = src.memoryAddress() + srcStart; - PlatformDependent.copyMemory(srcAddress, destAddress, srcLength); - } else if (src.hasArray()) { - final byte[] srcArray = src.array(); - PlatformDependent.copyMemory(srcArray, srcStart, destAddress, srcLength); - } else { - throw new IllegalArgumentException("unsupported byte buffer"); - } - position += srcLength; - if (position > this.length) { - this.length = position; - } - } - - /** - * Writes a sequence of bytes to this file from the given buffer. - *
- *
Bytes are written starting at this file's specified position, - */ - public void write(int position, ByteBuffer src, int srcStart, int srcLength) throws IOException { - final long destAddress = this.address + position; - if (src.isDirect()) { - final long srcAddress = PlatformDependent.directBufferAddress(src) + srcStart; - PlatformDependent.copyMemory(srcAddress, destAddress, srcLength); - } else { - final byte[] srcArray = src.array(); - PlatformDependent.copyMemory(srcArray, srcStart, destAddress, srcLength); - } - position += srcLength; - if (position > this.length) { - this.length = position; - } + rawMovePositionAndLength(nextPosition); } /** @@ -274,6 +178,7 @@ final class MappedFile implements AutoCloseable { *
Bytes are written starting at this file's current position, */ public void zeros(int position, final int count) throws IOException { + checkCapacity(position + count); //zeroes memory in reverse direction in OS_PAGE_SIZE batches //to gain sympathy by the page cache LRU policy final long start = this.address + position; @@ -301,7 +206,7 @@ final class MappedFile implements AutoCloseable { if (toZeros > 0) { PlatformDependent.setMemory(start, toZeros, (byte) 0); } - + //do not move this.position: only this.length can be changed position += count; if (position > this.length) { this.length = position; @@ -309,15 +214,16 @@ final class MappedFile implements AutoCloseable { } public int position() { - return position; + return this.position; } public void position(int position) { + checkCapacity(position); this.position = position; } public long length() { - return length; + return this.length; } @Override diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedSequentialFile.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedSequentialFile.java index 7619d1a934..aec472396a 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedSequentialFile.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedSequentialFile.java @@ -135,10 +135,12 @@ final class MappedSequentialFile implements SequentialFile { public void fill(int size) throws IOException { checkIsOpen(); //the fill will give a big performance hit when done in parallel of other writings! - this.mappedFile.zeros(this.mappedFile.position(), size); + this.mappedFile.zeros(0, size); if (factory.isDatasync()) { this.mappedFile.force(); } + //set the position to 0 to match the fill contract + this.mappedFile.position(0); } @Override diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFile.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFile.java index d1e333e90a..891bd5cce2 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFile.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFile.java @@ -112,7 +112,7 @@ public final class NIOSequentialFile extends AbstractSequentialFile { } //set the position to 0 to match the fill contract channel.position(0); - fileSize = size; + fileSize = channel.size(); } finally { //return it to the factory this.factory.releaseBuffer(zeroPage); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/MappedSequentialFileFactoryTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/MappedSequentialFileFactoryTest.java index 937435ecab..4b9d321c17 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/MappedSequentialFileFactoryTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/journal/MappedSequentialFileFactoryTest.java @@ -123,7 +123,7 @@ public class MappedSequentialFileFactoryTest extends SequentialFileFactoryTestBa SequentialFile file = factory.createSequentialFile("file.txt"); file.open(); Thread.currentThread().interrupt(); - file.fill(1024); + file.fill(fakeEncoding.getEncodeSize()); file.close(); } catch (Exception e) {