diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ByteUtil.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ByteUtil.java index bf0ac87ad2..88c6d49583 100644 --- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ByteUtil.java +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ByteUtil.java @@ -17,6 +17,8 @@ package org.apache.activemq.artemis.utils; import java.nio.ByteBuffer; +import java.nio.ReadOnlyBufferException; +import java.util.Arrays; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -342,4 +344,52 @@ public class ByteUtil { public static int intFromBytes(byte b1, byte b2, byte b3, byte b4) { return b1 << 24 | (b2 & 0xFF) << 16 | (b3 & 0xFF) << 8 | (b4 & 0xFF); } + + /** + * It zeroes the whole {@link ByteBuffer#capacity()} of the given {@code buffer}. + * + * @throws ReadOnlyBufferException if {@code buffer} is read-only + */ + public static void zeros(final ByteBuffer buffer) { + uncheckedZeros(buffer, 0, buffer.capacity()); + } + + /** + * It zeroes {@code bytes} of the given {@code buffer}, starting (inclusive) from {@code offset}. + * + * @throws IndexOutOfBoundsException if {@code offset + bytes > }{@link ByteBuffer#capacity()} or {@code offset >= }{@link ByteBuffer#capacity()} + * @throws IllegalArgumentException if {@code offset} or {@code capacity} are less then 0 + * @throws ReadOnlyBufferException if {@code buffer} is read-only + */ + public static void zeros(final ByteBuffer buffer, int offset, int bytes) { + if (offset < 0 || bytes < 0) { + throw new IllegalArgumentException(); + } + final int capacity = buffer.capacity(); + if (offset >= capacity || (offset + bytes) > capacity) { + throw new IndexOutOfBoundsException(); + } + uncheckedZeros(buffer, offset, bytes); + } + + private static void uncheckedZeros(final ByteBuffer buffer, int offset, int bytes) { + if (buffer.isReadOnly()) { + throw new ReadOnlyBufferException(); + } + final byte zero = (byte) 0; + if (buffer.isDirect() && PlatformDependent.hasUnsafe()) { + PlatformDependent.setMemory(PlatformDependent.directBufferAddress(buffer) + offset, bytes, zero); + } else if (buffer.hasArray()) { + //SIMD OPTIMIZATION + final int arrayOffset = buffer.arrayOffset(); + final int start = arrayOffset + offset; + Arrays.fill(buffer.array(), start, start + bytes, zero); + } else { + //slow path + for (int i = 0; i < bytes; i++) { + buffer.put(i + offset, zero); + } + } + } + } diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/BytesUtils.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/PowerOf2Util.java similarity index 66% rename from artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/BytesUtils.java rename to artemis-commons/src/main/java/org/apache/activemq/artemis/utils/PowerOf2Util.java index 986b69839a..fd60402759 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/BytesUtils.java +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/PowerOf2Util.java @@ -14,19 +14,23 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.activemq.artemis.core.io.mapped; +package org.apache.activemq.artemis.utils; -import java.nio.ByteBuffer; +/** + * Collection of bit-tricks for power of 2 cases. + */ +public final class PowerOf2Util { -import io.netty.util.internal.PlatformDependent; - -final class BytesUtils { - - private BytesUtils() { + private PowerOf2Util() { } - public static long align(final long value, final long alignment) { - return (value + (alignment - 1)) & ~(alignment - 1); + /** + * Fast alignment operation with power of 2 {@code alignment} and {@code value >=0} and {@code value <}{@link Integer#MAX_VALUE}.
+ * In order to be fast is up to the caller to check arguments correctness. + * Original algorithm is on https://en.wikipedia.org/wiki/Data_structure_alignment. + */ + public static int align(final int value, final int pow2alignment) { + return (value + (pow2alignment - 1)) & ~(pow2alignment - 1); } /** @@ -54,20 +58,4 @@ final class BytesUtils { return (value & (pow2alignment - 1)) == 0; } - public static void zerosDirect(final ByteBuffer buffer) { - //DANGEROUS!! erases bound-checking using directly addresses -> safe only if it use counted loops - int remaining = buffer.capacity(); - long address = PlatformDependent.directBufferAddress(buffer); - while (remaining >= 8) { - PlatformDependent.putLong(address, 0L); - address += 8; - remaining -= 8; - } - while (remaining > 0) { - PlatformDependent.putByte(address, (byte) 0); - address++; - remaining--; - } - } - } diff --git a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/ByteUtilTest.java b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/ByteUtilTest.java index 80aa7e5c98..000ae56c24 100644 --- a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/ByteUtilTest.java +++ b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/ByteUtilTest.java @@ -17,6 +17,11 @@ package org.apache.activemq.artemis.utils; import java.nio.ByteBuffer; +import java.nio.ByteBuffer; +import java.nio.ReadOnlyBufferException; +import java.util.Arrays; + +import io.netty.util.internal.PlatformDependent; import org.junit.Assert; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; @@ -87,6 +92,219 @@ public class ByteUtilTest { } } + private static byte[] duplicateRemaining(ByteBuffer buffer, int offset, int bytes) { + final int end = offset + bytes; + final int expectedRemaining = buffer.capacity() - end; + //it is handling the case of <0 just to allow from to > capacity + if (expectedRemaining <= 0) { + return null; + } + final byte[] remaining = new byte[expectedRemaining]; + final ByteBuffer duplicate = buffer.duplicate(); + duplicate.clear().position(end); + duplicate.get(remaining, 0, expectedRemaining); + return remaining; + } + + private static byte[] duplicateBefore(ByteBuffer buffer, int offset) { + if (offset <= 0) { + return null; + } + final int size = Math.min(buffer.capacity(), offset); + final byte[] remaining = new byte[size]; + final ByteBuffer duplicate = buffer.duplicate(); + duplicate.clear(); + duplicate.get(remaining, 0, size); + return remaining; + } + + private static void shouldZeroesByteBuffer(ByteBuffer buffer, int offset, int bytes) { + final byte[] originalBefore = duplicateBefore(buffer, offset); + final byte[] originalRemaining = duplicateRemaining(buffer, offset, bytes); + final int position = buffer.position(); + final int limit = buffer.limit(); + ByteUtil.zeros(buffer, offset, bytes); + Assert.assertEquals(position, buffer.position()); + Assert.assertEquals(limit, buffer.limit()); + final byte[] zeros = new byte[bytes]; + final byte[] content = new byte[bytes]; + final ByteBuffer duplicate = buffer.duplicate(); + duplicate.clear().position(offset); + duplicate.get(content, 0, bytes); + Assert.assertArrayEquals(zeros, content); + if (originalRemaining != null) { + final byte[] remaining = new byte[duplicate.remaining()]; + //duplicate position has been moved of bytes + duplicate.get(remaining); + Assert.assertArrayEquals(originalRemaining, remaining); + } + if (originalBefore != null) { + final byte[] before = new byte[offset]; + //duplicate position has been moved of bytes: need to reset it + duplicate.position(0); + duplicate.get(before); + Assert.assertArrayEquals(originalBefore, before); + } + } + + private ByteBuffer fill(ByteBuffer buffer, int offset, int length, byte value) { + for (int i = 0; i < length; i++) { + buffer.put(offset + i, value); + } + return buffer; + } + + @Test + public void shouldZeroesDirectByteBuffer() { + final byte one = (byte) 1; + final int capacity = 64; + final int bytes = 32; + final int offset = 1; + final ByteBuffer buffer = ByteBuffer.allocateDirect(capacity); + try { + fill(buffer, 0, capacity, one); + shouldZeroesByteBuffer(buffer, offset, bytes); + } finally { + if (PlatformDependent.hasUnsafe()) { + PlatformDependent.freeDirectBuffer(buffer); + } + } + } + + @Test + public void shouldZeroesLimitedDirectByteBuffer() { + final byte one = (byte) 1; + final int capacity = 64; + final int bytes = 32; + final int offset = 1; + final ByteBuffer buffer = ByteBuffer.allocateDirect(capacity); + try { + fill(buffer, 0, capacity, one); + buffer.limit(0); + shouldZeroesByteBuffer(buffer, offset, bytes); + } finally { + if (PlatformDependent.hasUnsafe()) { + PlatformDependent.freeDirectBuffer(buffer); + } + } + } + + @Test + public void shouldZeroesHeapByteBuffer() { + final byte one = (byte) 1; + final int capacity = 64; + final int bytes = 32; + final int offset = 1; + final ByteBuffer buffer = ByteBuffer.allocate(capacity); + fill(buffer, 0, capacity, one); + shouldZeroesByteBuffer(buffer, offset, bytes); + } + + @Test + public void shouldZeroesLimitedHeapByteBuffer() { + final byte one = (byte) 1; + final int capacity = 64; + final int bytes = 32; + final int offset = 1; + final ByteBuffer buffer = ByteBuffer.allocate(capacity); + fill(buffer, 0, capacity, one); + buffer.limit(0); + shouldZeroesByteBuffer(buffer, offset, bytes); + } + + @Test(expected = ReadOnlyBufferException.class) + public void shouldFailWithReadOnlyHeapByteBuffer() { + final byte one = (byte) 1; + final int capacity = 64; + final int bytes = 32; + final int offset = 1; + ByteBuffer buffer = ByteBuffer.allocate(capacity); + fill(buffer, 0, capacity, one); + buffer = buffer.asReadOnlyBuffer(); + shouldZeroesByteBuffer(buffer, offset, bytes); + } + + @Test(expected = IndexOutOfBoundsException.class) + public void shouldFailIfOffsetIsGreaterOrEqualHeapByteBufferCapacity() { + final byte one = (byte) 1; + final int capacity = 64; + final int bytes = 0; + final int offset = 64; + ByteBuffer buffer = ByteBuffer.allocate(capacity); + fill(buffer, 0, capacity, one); + try { + shouldZeroesByteBuffer(buffer, offset, bytes); + } catch (IndexOutOfBoundsException expectedEx) { + //verify that the buffer hasn't changed + final byte[] originalContent = duplicateRemaining(buffer, 0, 0); + final byte[] expectedContent = new byte[capacity]; + Arrays.fill(expectedContent, one); + Assert.assertArrayEquals(expectedContent, originalContent); + throw expectedEx; + } + } + + @Test(expected = IllegalArgumentException.class) + public void shouldFailIfOffsetIsNegative() { + final byte one = (byte) 1; + final int capacity = 64; + final int bytes = 1; + final int offset = -1; + ByteBuffer buffer = ByteBuffer.allocate(capacity); + fill(buffer, 0, capacity, one); + try { + shouldZeroesByteBuffer(buffer, offset, bytes); + } catch (IndexOutOfBoundsException expectedEx) { + //verify that the buffer hasn't changed + final byte[] originalContent = duplicateRemaining(buffer, 0, 0); + final byte[] expectedContent = new byte[capacity]; + Arrays.fill(expectedContent, one); + Assert.assertArrayEquals(expectedContent, originalContent); + throw expectedEx; + } + } + + @Test(expected = IllegalArgumentException.class) + public void shouldFailIfBytesIsNegative() { + final byte one = (byte) 1; + final int capacity = 64; + final int bytes = -1; + final int offset = 0; + ByteBuffer buffer = ByteBuffer.allocate(capacity); + fill(buffer, 0, capacity, one); + try { + shouldZeroesByteBuffer(buffer, offset, bytes); + } catch (IndexOutOfBoundsException expectedEx) { + //verify that the buffer hasn't changed + final byte[] originalContent = duplicateRemaining(buffer, 0, 0); + final byte[] expectedContent = new byte[capacity]; + Arrays.fill(expectedContent, one); + Assert.assertArrayEquals(expectedContent, originalContent); + throw expectedEx; + } + } + + @Test(expected = IndexOutOfBoundsException.class) + public void shouldFailIfExceedingHeapByteBufferCapacity() { + final byte one = (byte) 1; + final int capacity = 64; + final int bytes = 65; + final int offset = 1; + ByteBuffer buffer = ByteBuffer.allocate(capacity); + fill(buffer, 0, capacity, one); + try { + shouldZeroesByteBuffer(buffer, offset, bytes); + } catch (IndexOutOfBoundsException expectedEx) { + //verify that the buffer hasn't changed + final byte[] originalContent = duplicateRemaining(buffer, 0, 0); + final byte[] expectedContent = new byte[capacity]; + Arrays.fill(expectedContent, one); + Assert.assertArrayEquals(expectedContent, originalContent); + throw expectedEx; + } + } + + @Test public void testIntToByte() { diff --git a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/PowerOf2UtilTest.java b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/PowerOf2UtilTest.java new file mode 100644 index 0000000000..d024f2ee28 --- /dev/null +++ b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/PowerOf2UtilTest.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.utils; + +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertThat; + +import org.junit.Test; + +import static org.apache.activemq.artemis.utils.PowerOf2Util.align; + +public class PowerOf2UtilTest { + + @Test + public void shouldAlignToNextMultipleOfAlignment() { + final int alignment = 512; + assertThat(align(0, alignment), is(0)); + assertThat(align(1, alignment), is(alignment)); + assertThat(align(alignment, alignment), is(alignment)); + assertThat(align(alignment + 1, alignment), is(alignment * 2)); + + final int remainder = Integer.MAX_VALUE % alignment; + final int alignedMax = Integer.MAX_VALUE - remainder; + assertThat(align(alignedMax, alignment), is(alignedMax)); + //given that Integer.MAX_VALUE is the max value that can be represented with int + //the aligned value would be > 2^32, but (int)(2^32) = Integer.MIN_VALUE due to the sign bit + assertThat(align(Integer.MAX_VALUE, alignment), is(Integer.MIN_VALUE)); + } + +} diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFile.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFile.java index 074bebfa78..793dc701b6 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFile.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFile.java @@ -85,11 +85,7 @@ public class AIOSequentialFile extends AbstractSequentialFile { @Override public int calculateBlockStart(final int position) { - int alignment = factory.getAlignment(); - - int pos = (position / alignment + (position % alignment != 0 ? 1 : 0)) * alignment; - - return pos; + return factory.calculateBlockSize(position); } @Override diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFileFactory.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFileFactory.java index 093502680c..cc3c91c48b 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFileFactory.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFileFactory.java @@ -35,6 +35,7 @@ import org.apache.activemq.artemis.nativo.jlibaio.LibaioFile; import org.apache.activemq.artemis.nativo.jlibaio.SubmitInfo; import org.apache.activemq.artemis.nativo.jlibaio.util.CallbackCache; import org.apache.activemq.artemis.journal.ActiveMQJournalLogger; +import org.apache.activemq.artemis.utils.PowerOf2Util; import org.apache.activemq.artemis.utils.critical.CriticalAnalyzer; import org.jboss.logging.Logger; @@ -163,13 +164,10 @@ public final class AIOSequentialFileFactory extends AbstractSequentialFileFactor @Override public ByteBuffer allocateDirectBuffer(final int size) { - int blocks = size / getAlignment(); - if (size % getAlignment() != 0) { - blocks++; - } + final int alignedSize = calculateBlockSize(size); // The buffer on AIO has to be a multiple of getAlignment() - ByteBuffer buffer = LibaioContext.newAlignedBuffer(blocks * getAlignment(), getAlignment()); + ByteBuffer buffer = LibaioContext.newAlignedBuffer(alignedSize, getAlignment()); buffer.limit(size); @@ -183,11 +181,8 @@ public final class AIOSequentialFileFactory extends AbstractSequentialFileFactor @Override public ByteBuffer newBuffer(int size) { - if (size % getAlignment() != 0) { - size = (size / getAlignment() + 1) * getAlignment(); - } - - return buffersControl.newBuffer(size); + final int alignedSize = calculateBlockSize(size); + return buffersControl.newBuffer(alignedSize, true); } @Override @@ -199,22 +194,26 @@ public final class AIOSequentialFileFactory extends AbstractSequentialFileFactor @Override public int getAlignment() { if (alignment < 0) { + alignment = calculateAlignment(journalDir); + } + return alignment; + } - File checkFile = null; - - try { - journalDir.mkdirs(); - checkFile = File.createTempFile("journalCheck", ".tmp", journalDir); - checkFile.mkdirs(); - checkFile.createNewFile(); - alignment = LibaioContext.getBlockSize(checkFile); - } catch (Throwable e) { - logger.warn(e.getMessage(), e); - alignment = 512; - } finally { - if (checkFile != null) { - checkFile.delete(); - } + private static int calculateAlignment(File journalDir) { + File checkFile = null; + int alignment; + try { + journalDir.mkdirs(); + checkFile = File.createTempFile("journalCheck", ".tmp", journalDir); + checkFile.mkdirs(); + checkFile.createNewFile(); + alignment = LibaioContext.getBlockSize(checkFile); + } catch (Throwable e) { + logger.warn(e.getMessage(), e); + alignment = 512; + } finally { + if (checkFile != null) { + checkFile.delete(); } } return alignment; @@ -230,11 +229,19 @@ public final class AIOSequentialFileFactory extends AbstractSequentialFileFactor @Override public int calculateBlockSize(final int position) { - int alignment = getAlignment(); + final int alignment = getAlignment(); + if (!PowerOf2Util.isPowOf2(alignment)) { + return align(position, alignment); + } else { + return PowerOf2Util.align(position, alignment); + } + } - int pos = (position / alignment + (position % alignment != 0 ? 1 : 0)) * alignment; - - return pos; + /** + * It can be used to align {@code size} if alignment is not a power of 2: otherwise better to use {@link PowerOf2Util#align(int, int)} instead. + */ + private static int align(int size, int alignment) { + return (size / alignment + (size % alignment != 0 ? 1 : 0)) * alignment; } /* (non-Javadoc) @@ -442,7 +449,7 @@ public final class AIOSequentialFileFactory extends AbstractSequentialFileFactor return alignedBufferSize; } - public ByteBuffer newBuffer(final int size) { + public ByteBuffer newBuffer(final int size, final boolean zeroed) { // if a new buffer wasn't requested in 10 seconds, we clear the queue // This is being done this way as we don't need another Timeout Thread // just to cleanup this @@ -481,7 +488,11 @@ public final class AIOSequentialFileFactory extends AbstractSequentialFileFactor buffer.limit(calculateBlockSize(size)); } else { - clearBuffer(buffer); + if (zeroed) { + clearBuffer(buffer); + } else { + buffer.position(0); + } // set the limit of the buffer to the bufferSize being required buffer.limit(calculateBlockSize(size)); diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedFile.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedFile.java index 05f8f25541..63a38dfac3 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedFile.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedFile.java @@ -28,6 +28,7 @@ import io.netty.buffer.Unpooled; import io.netty.util.internal.PlatformDependent; import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper; import org.apache.activemq.artemis.core.journal.EncodingSupport; +import org.apache.activemq.artemis.utils.PowerOf2Util; import org.apache.activemq.artemis.utils.Env; final class MappedFile implements AutoCloseable { @@ -196,7 +197,7 @@ final class MappedFile implements AutoCloseable { } //any that will enter has lastZeroed OS page aligned while (toZeros >= OS_PAGE_SIZE) { - assert BytesUtils.isAligned(lastZeroed, OS_PAGE_SIZE);/**/ + assert PowerOf2Util.isAligned(lastZeroed, OS_PAGE_SIZE);/**/ final long startPage = lastZeroed - OS_PAGE_SIZE; PlatformDependent.setMemory(startPage, OS_PAGE_SIZE, (byte) 0); lastZeroed = startPage; diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedSequentialFileFactory.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedSequentialFileFactory.java index 2cdaba1fa5..1d7d6ba11f 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedSequentialFileFactory.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedSequentialFileFactory.java @@ -18,20 +18,21 @@ package org.apache.activemq.artemis.core.io.mapped; import java.io.File; import java.nio.ByteBuffer; -import java.util.Arrays; import io.netty.util.internal.PlatformDependent; import org.apache.activemq.artemis.core.io.AbstractSequentialFileFactory; import org.apache.activemq.artemis.core.io.IOCriticalErrorListener; import org.apache.activemq.artemis.core.io.SequentialFile; +import org.apache.activemq.artemis.core.io.util.ByteBufferPool; +import org.apache.activemq.artemis.utils.PowerOf2Util; +import org.apache.activemq.artemis.utils.ByteUtil; import org.apache.activemq.artemis.utils.Env; public final class MappedSequentialFileFactory extends AbstractSequentialFileFactory { private int capacity; private boolean bufferPooling; - //pools only the biggest one -> optimized for the common case - private final ThreadLocal bytesPool; + private final ByteBufferPool bytesPool; public MappedSequentialFileFactory(File directory, int capacity, @@ -47,7 +48,7 @@ public final class MappedSequentialFileFactory extends AbstractSequentialFileFac this.capacity = capacity; this.setDatasync(true); this.bufferPooling = true; - this.bytesPool = new ThreadLocal<>(); + this.bytesPool = ByteBufferPool.threadLocal(true); } public MappedSequentialFileFactory capacity(int capacity) { @@ -76,7 +77,7 @@ public final class MappedSequentialFileFactory extends AbstractSequentialFileFac @Override public ByteBuffer allocateDirectBuffer(final int size) { - final int requiredCapacity = (int) BytesUtils.align(size, Env.osPageSize()); + final int requiredCapacity = PowerOf2Util.align(size, Env.osPageSize()); final ByteBuffer byteBuffer = ByteBuffer.allocateDirect(requiredCapacity); byteBuffer.limit(size); return byteBuffer; @@ -98,43 +99,18 @@ public final class MappedSequentialFileFactory extends AbstractSequentialFileFac } @Override - public ByteBuffer newBuffer(final int size) { + public ByteBuffer newBuffer(int size) { if (!this.bufferPooling) { return allocateDirectBuffer(size); } else { - final int requiredCapacity = (int) BytesUtils.align(size, Env.osPageSize()); - ByteBuffer byteBuffer = bytesPool.get(); - if (byteBuffer == null || requiredCapacity > byteBuffer.capacity()) { - //do not free the old one (if any) until the new one will be released into the pool! - byteBuffer = ByteBuffer.allocateDirect(requiredCapacity); - } else { - bytesPool.set(null); - PlatformDependent.setMemory(PlatformDependent.directBufferAddress(byteBuffer), size, (byte) 0); - byteBuffer.clear(); - } - byteBuffer.limit(size); - return byteBuffer; + return bytesPool.borrow(size, true); } } @Override public void releaseBuffer(ByteBuffer buffer) { if (this.bufferPooling) { - if (buffer.isDirect()) { - final ByteBuffer byteBuffer = bytesPool.get(); - if (byteBuffer != buffer) { - //replace with the current pooled only if greater or null - if (byteBuffer == null || buffer.capacity() > byteBuffer.capacity()) { - if (byteBuffer != null) { - //free the smaller one - PlatformDependent.freeDirectBuffer(byteBuffer); - } - bytesPool.set(buffer); - } else { - PlatformDependent.freeDirectBuffer(buffer); - } - } - } + bytesPool.release(buffer); } } @@ -168,18 +144,7 @@ public final class MappedSequentialFileFactory extends AbstractSequentialFileFac @Override public void clearBuffer(final ByteBuffer buffer) { - if (buffer.isDirect()) { - BytesUtils.zerosDirect(buffer); - } else if (buffer.hasArray()) { - final byte[] array = buffer.array(); - //SIMD OPTIMIZATION - Arrays.fill(array, (byte) 0); - } else { - final int capacity = buffer.capacity(); - for (int i = 0; i < capacity; i++) { - buffer.put(i, (byte) 0); - } - } + ByteUtil.zeros(buffer); buffer.rewind(); } diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFileFactory.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFileFactory.java index c14237771a..f8f5971ab9 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFileFactory.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFileFactory.java @@ -26,6 +26,8 @@ import org.apache.activemq.artemis.ArtemisConstants; import org.apache.activemq.artemis.core.io.AbstractSequentialFileFactory; import org.apache.activemq.artemis.core.io.IOCriticalErrorListener; import org.apache.activemq.artemis.core.io.SequentialFile; +import org.apache.activemq.artemis.core.io.util.ByteBufferPool; +import org.apache.activemq.artemis.utils.PowerOf2Util; import org.apache.activemq.artemis.utils.Env; import org.apache.activemq.artemis.utils.critical.CriticalAnalyzer; @@ -35,8 +37,7 @@ public class NIOSequentialFileFactory extends AbstractSequentialFileFactory { private boolean bufferPooling; - //pools only the biggest one -> optimized for the common case - private final ThreadLocal bytesPool; + private final ByteBufferPool bytesPool; public NIOSequentialFileFactory(final File journalDir, final int maxIO) { this(journalDir, null, maxIO); @@ -76,7 +77,7 @@ public class NIOSequentialFileFactory extends AbstractSequentialFileFactory { final CriticalAnalyzer analyzer) { super(journalDir, buffered, bufferSize, bufferTimeout, maxIO, logRates, listener, analyzer); this.bufferPooling = true; - this.bytesPool = new ThreadLocal<>(); + this.bytesPool = ByteBufferPool.threadLocal(true); } public static ByteBuffer allocateDirectByteBuffer(final int size) { @@ -123,13 +124,9 @@ public class NIOSequentialFileFactory extends AbstractSequentialFileFactory { return timedBuffer != null; } - private static int align(final int value, final int pow2alignment) { - return (value + (pow2alignment - 1)) & ~(pow2alignment - 1); - } - @Override public ByteBuffer allocateDirectBuffer(final int size) { - final int requiredCapacity = align(size, DEFAULT_CAPACITY_ALIGNMENT); + final int requiredCapacity = PowerOf2Util.align(size, DEFAULT_CAPACITY_ALIGNMENT); final ByteBuffer byteBuffer = ByteBuffer.allocateDirect(requiredCapacity); byteBuffer.limit(size); return byteBuffer; @@ -141,43 +138,18 @@ public class NIOSequentialFileFactory extends AbstractSequentialFileFactory { } @Override - public ByteBuffer newBuffer(final int size) { + public ByteBuffer newBuffer(int size) { if (!this.bufferPooling) { return allocateDirectBuffer(size); } else { - final int requiredCapacity = align(size, DEFAULT_CAPACITY_ALIGNMENT); - ByteBuffer byteBuffer = bytesPool.get(); - if (byteBuffer == null || requiredCapacity > byteBuffer.capacity()) { - //do not free the old one (if any) until the new one will be released into the pool! - byteBuffer = ByteBuffer.allocateDirect(requiredCapacity); - } else { - bytesPool.set(null); - PlatformDependent.setMemory(PlatformDependent.directBufferAddress(byteBuffer), size, (byte) 0); - byteBuffer.clear(); - } - byteBuffer.limit(size); - return byteBuffer; + return bytesPool.borrow(size, true); } } @Override public void releaseBuffer(ByteBuffer buffer) { if (this.bufferPooling) { - if (buffer.isDirect()) { - final ByteBuffer byteBuffer = bytesPool.get(); - if (byteBuffer != buffer) { - //replace with the current pooled only if greater or null - if (byteBuffer == null || buffer.capacity() > byteBuffer.capacity()) { - if (byteBuffer != null) { - //free the smaller one - PlatformDependent.freeDirectBuffer(byteBuffer); - } - bytesPool.set(buffer); - } else { - PlatformDependent.freeDirectBuffer(buffer); - } - } - } + bytesPool.release(buffer); } } diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/util/ByteBufferPool.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/util/ByteBufferPool.java new file mode 100644 index 0000000000..bebb514186 --- /dev/null +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/util/ByteBufferPool.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.core.io.util; + +import java.nio.ByteBuffer; + +/** + * Object Pool that allows to borrow and release {@link ByteBuffer}s according to a specific type (direct/heap).
+ * The suggested usage pattern is: + *
{@code
+ *    ByteBuffer buffer = pool.borrow(size);
+ *    //...using buffer...
+ *    pool.release(buffer);
+ * }
+ */ +public interface ByteBufferPool { + + /** + * It returns a {@link ByteBuffer} with {@link ByteBuffer#capacity()}>={@code size}.
+ * The {@code buffer} is zeroed until {@code size} if {@code zeroed=true}, with {@link ByteBuffer#position()}=0 and {@link ByteBuffer#limit()}={@code size}. + */ + ByteBuffer borrow(int size, boolean zeroed); + + /** + * It pools or free {@code buffer} that cannot be used anymore.
+ * If {@code buffer} is of a type different from the one that the pool can borrow, it will ignore it. + */ + void release(ByteBuffer buffer); + + /** + * Factory method that creates a thread-local pool of capacity 1 of {@link ByteBuffer}s of the specified type (direct/heap). + */ + static ByteBufferPool threadLocal(boolean direct) { + return new ThreadLocalByteBufferPool(direct); + } + +} diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/util/ThreadLocalByteBufferPool.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/util/ThreadLocalByteBufferPool.java new file mode 100644 index 0000000000..eee04668bd --- /dev/null +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/util/ThreadLocalByteBufferPool.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.core.io.util; + +import java.nio.ByteBuffer; +import java.util.Objects; + +import io.netty.util.internal.PlatformDependent; +import org.apache.activemq.artemis.utils.PowerOf2Util; +import org.apache.activemq.artemis.utils.ByteUtil; +import org.apache.activemq.artemis.utils.Env; + +final class ThreadLocalByteBufferPool implements ByteBufferPool { + + private final ThreadLocal bytesPool; + private final boolean direct; + + ThreadLocalByteBufferPool(boolean direct) { + this.bytesPool = new ThreadLocal<>(); + this.direct = direct; + } + + @Override + public ByteBuffer borrow(final int size, boolean zeroed) { + final int requiredCapacity = PowerOf2Util.align(size, Env.osPageSize()); + ByteBuffer byteBuffer = bytesPool.get(); + if (byteBuffer == null || requiredCapacity > byteBuffer.capacity()) { + //do not free the old one (if any) until the new one will be released into the pool! + byteBuffer = direct ? ByteBuffer.allocateDirect(requiredCapacity) : ByteBuffer.allocate(requiredCapacity); + } else { + bytesPool.set(null); + if (zeroed) { + ByteUtil.zeros(byteBuffer, 0, size); + } + byteBuffer.clear(); + } + byteBuffer.limit(size); + return byteBuffer; + } + + @Override + public void release(ByteBuffer buffer) { + Objects.requireNonNull(buffer); + boolean directBuffer = buffer.isDirect(); + if (directBuffer == direct && !buffer.isReadOnly()) { + final ByteBuffer byteBuffer = bytesPool.get(); + if (byteBuffer != buffer) { + //replace with the current pooled only if greater or null + if (byteBuffer == null || buffer.capacity() > byteBuffer.capacity()) { + if (byteBuffer != null) { + //free the smaller one + if (directBuffer) { + PlatformDependent.freeDirectBuffer(byteBuffer); + } + } + bytesPool.set(buffer); + } else { + if (directBuffer) { + PlatformDependent.freeDirectBuffer(buffer); + } + } + } + } + } + +} diff --git a/artemis-journal/src/test/java/org/apache/activemq/artemis/core/io/util/ThreadLocalByteBufferPoolTest.java b/artemis-journal/src/test/java/org/apache/activemq/artemis/core/io/util/ThreadLocalByteBufferPoolTest.java new file mode 100644 index 0000000000..190d7de401 --- /dev/null +++ b/artemis-journal/src/test/java/org/apache/activemq/artemis/core/io/util/ThreadLocalByteBufferPoolTest.java @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.core.io.util; + +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.Collection; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +import io.netty.util.internal.PlatformDependent; +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +@RunWith(Parameterized.class) +public class ThreadLocalByteBufferPoolTest { + + //testing using heap buffers to avoid killing the test suite + private static final boolean isDirect = false; + private final ByteBufferPool pool = ByteBufferPool.threadLocal(isDirect); + private final boolean zeroed; + + public ThreadLocalByteBufferPoolTest(boolean zeroed) { + this.zeroed = zeroed; + } + + @Parameterized.Parameters(name = "zeroed={0}") + public static Collection getParams() { + return Arrays.asList(new Object[][]{{Boolean.TRUE}, {Boolean.FALSE}}); + } + + private static void assertZeroed(ByteBuffer buffer) { + ByteBuffer bb = buffer.slice(); + final byte[] content = new byte[bb.remaining()]; + bb.get(content); + final byte[] zeroed = new byte[content.length]; + Arrays.fill(zeroed, (byte) 0); + Assert.assertArrayEquals(zeroed, content); + } + + @Test + public void shouldBorrowOnlyBuffersOfTheCorrectType() { + Assert.assertEquals(isDirect, pool.borrow(0, zeroed).isDirect()); + } + + @Test + public void shouldBorrowZeroedBuffer() { + final int size = 32; + final ByteBuffer buffer = pool.borrow(size, zeroed); + Assert.assertEquals(0, buffer.position()); + Assert.assertEquals(size, buffer.limit()); + if (zeroed) { + assertZeroed(buffer); + } + } + + @Test + public void shouldBorrowTheSameBuffer() { + final int size = 32; + final ByteBuffer buffer = pool.borrow(size, zeroed); + buffer.put(0, (byte) 1); + buffer.position(1); + buffer.limit(2); + pool.release(buffer); + final int newSize = size - 1; + final ByteBuffer sameBuffer = pool.borrow(newSize, zeroed); + Assert.assertSame(buffer, sameBuffer); + Assert.assertEquals(0, sameBuffer.position()); + Assert.assertEquals(newSize, sameBuffer.limit()); + if (zeroed) { + assertZeroed(sameBuffer); + } + } + + @Test + public void shouldBorrowNewBufferIfExceedPooledCapacity() { + final int size = 32; + final ByteBuffer buffer = pool.borrow(size, zeroed); + pool.release(buffer); + final int newSize = buffer.capacity() + 1; + final ByteBuffer differentBuffer = pool.borrow(newSize, zeroed); + Assert.assertNotSame(buffer, differentBuffer); + } + + @Test + public void shouldPoolTheBiggestBuffer() { + final int size = 32; + final ByteBuffer small = pool.borrow(size, zeroed); + final ByteBuffer big = pool.borrow(small.capacity() + 1, zeroed); + pool.release(small); + big.limit(0); + pool.release(big); + Assert.assertSame(big, pool.borrow(big.capacity(), zeroed)); + } + + @Test + public void shouldNotPoolTheSmallestBuffer() { + final int size = 32; + final ByteBuffer small = pool.borrow(size, zeroed); + final ByteBuffer big = pool.borrow(small.capacity() + 1, zeroed); + big.limit(0); + pool.release(big); + pool.release(small); + Assert.assertSame(big, pool.borrow(big.capacity(), zeroed)); + } + + @Test + public void shouldNotPoolBufferOfDifferentType() { + final int size = 32; + final ByteBuffer buffer = isDirect ? ByteBuffer.allocate(size) : ByteBuffer.allocateDirect(size); + try { + pool.release(buffer); + Assert.assertNotSame(buffer, pool.borrow(size, zeroed)); + } catch (Throwable t) { + if (PlatformDependent.hasUnsafe()) { + if (buffer.isDirect()) { + PlatformDependent.freeDirectBuffer(buffer); + } + } + } + } + + @Test + public void shouldNotPoolReadOnlyBuffer() { + final int size = 32; + final ByteBuffer borrow = pool.borrow(size, zeroed); + final ByteBuffer readOnlyBuffer = borrow.asReadOnlyBuffer(); + pool.release(readOnlyBuffer); + Assert.assertNotSame(readOnlyBuffer, pool.borrow(size, zeroed)); + } + + @Test(expected = NullPointerException.class) + public void shouldFailPoolingNullBuffer() { + pool.release(null); + } + + @Test(expected = NullPointerException.class) + public void shouldFailPoolingNullBufferIfNotEmpty() { + final int size = 32; + pool.release(pool.borrow(size, zeroed)); + pool.release(null); + } + + @Test + public void shouldBorrowOnlyThreadLocalBuffers() throws ExecutionException, InterruptedException { + final int size = 32; + final ByteBuffer buffer = pool.borrow(size, zeroed); + pool.release(buffer); + final ExecutorService executor = Executors.newSingleThreadExecutor(); + try { + Assert.assertNotSame(buffer, executor.submit(() -> pool.borrow(size, zeroed)).get()); + } finally { + executor.shutdown(); + } + } + +}