From 89c02f1cc013181b688ae3be0d9639bd25544dd7 Mon Sep 17 00:00:00 2001 From: Francesco Nigro Date: Thu, 20 Dec 2018 11:11:36 +0100 Subject: [PATCH] ARTEMIS-2211 Refactor ByteBuffer pooling, alignment and zeroing Refactored thread local ByteBuffer pooling, alignment and zeroing in order to avoid duplicate code and improve code coverage with tests. In addition are being provided faster branchless alignment operations and optional zeroing of pooled ByteBuffers for both ASYNCIO and NIO/MAPPED journal types. --- .../activemq/artemis/utils/ByteUtil.java | 50 ++++ .../activemq/artemis/utils/PowerOf2Util.java | 38 ++- .../activemq/artemis/utils/ByteUtilTest.java | 218 ++++++++++++++++++ .../artemis/utils/PowerOf2UtilTest.java | 44 ++++ .../core/io/aio/AIOSequentialFile.java | 6 +- .../core/io/aio/AIOSequentialFileFactory.java | 73 +++--- .../artemis/core/io/mapped/MappedFile.java | 3 +- .../mapped/MappedSequentialFileFactory.java | 55 +---- .../core/io/nio/NIOSequentialFileFactory.java | 44 +--- .../artemis/core/io/util/ByteBufferPool.java | 51 ++++ .../io/util/ThreadLocalByteBufferPool.java | 80 +++++++ .../util/ThreadLocalByteBufferPoolTest.java | 174 ++++++++++++++ 12 files changed, 693 insertions(+), 143 deletions(-) rename artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/BytesUtils.java => artemis-commons/src/main/java/org/apache/activemq/artemis/utils/PowerOf2Util.java (66%) create mode 100644 artemis-commons/src/test/java/org/apache/activemq/artemis/utils/PowerOf2UtilTest.java create mode 100644 artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/util/ByteBufferPool.java create mode 100644 artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/util/ThreadLocalByteBufferPool.java create mode 100644 artemis-journal/src/test/java/org/apache/activemq/artemis/core/io/util/ThreadLocalByteBufferPoolTest.java diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ByteUtil.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ByteUtil.java index bf0ac87ad2..88c6d49583 100644 --- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ByteUtil.java +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ByteUtil.java @@ -17,6 +17,8 @@ package org.apache.activemq.artemis.utils; import java.nio.ByteBuffer; +import java.nio.ReadOnlyBufferException; +import java.util.Arrays; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -342,4 +344,52 @@ public class ByteUtil { public static int intFromBytes(byte b1, byte b2, byte b3, byte b4) { return b1 << 24 | (b2 & 0xFF) << 16 | (b3 & 0xFF) << 8 | (b4 & 0xFF); } + + /** + * It zeroes the whole {@link ByteBuffer#capacity()} of the given {@code buffer}. + * + * @throws ReadOnlyBufferException if {@code buffer} is read-only + */ + public static void zeros(final ByteBuffer buffer) { + uncheckedZeros(buffer, 0, buffer.capacity()); + } + + /** + * It zeroes {@code bytes} of the given {@code buffer}, starting (inclusive) from {@code offset}. + * + * @throws IndexOutOfBoundsException if {@code offset + bytes > }{@link ByteBuffer#capacity()} or {@code offset >= }{@link ByteBuffer#capacity()} + * @throws IllegalArgumentException if {@code offset} or {@code capacity} are less then 0 + * @throws ReadOnlyBufferException if {@code buffer} is read-only + */ + public static void zeros(final ByteBuffer buffer, int offset, int bytes) { + if (offset < 0 || bytes < 0) { + throw new IllegalArgumentException(); + } + final int capacity = buffer.capacity(); + if (offset >= capacity || (offset + bytes) > capacity) { + throw new IndexOutOfBoundsException(); + } + uncheckedZeros(buffer, offset, bytes); + } + + private static void uncheckedZeros(final ByteBuffer buffer, int offset, int bytes) { + if (buffer.isReadOnly()) { + throw new ReadOnlyBufferException(); + } + final byte zero = (byte) 0; + if (buffer.isDirect() && PlatformDependent.hasUnsafe()) { + PlatformDependent.setMemory(PlatformDependent.directBufferAddress(buffer) + offset, bytes, zero); + } else if (buffer.hasArray()) { + //SIMD OPTIMIZATION + final int arrayOffset = buffer.arrayOffset(); + final int start = arrayOffset + offset; + Arrays.fill(buffer.array(), start, start + bytes, zero); + } else { + //slow path + for (int i = 0; i < bytes; i++) { + buffer.put(i + offset, zero); + } + } + } + } diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/BytesUtils.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/PowerOf2Util.java similarity index 66% rename from artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/BytesUtils.java rename to artemis-commons/src/main/java/org/apache/activemq/artemis/utils/PowerOf2Util.java index 986b69839a..fd60402759 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/BytesUtils.java +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/PowerOf2Util.java @@ -14,19 +14,23 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.activemq.artemis.core.io.mapped; +package org.apache.activemq.artemis.utils; -import java.nio.ByteBuffer; +/** + * Collection of bit-tricks for power of 2 cases. + */ +public final class PowerOf2Util { -import io.netty.util.internal.PlatformDependent; - -final class BytesUtils { - - private BytesUtils() { + private PowerOf2Util() { } - public static long align(final long value, final long alignment) { - return (value + (alignment - 1)) & ~(alignment - 1); + /** + * Fast alignment operation with power of 2 {@code alignment} and {@code value >=0} and {@code value <}{@link Integer#MAX_VALUE}.
+ * In order to be fast is up to the caller to check arguments correctness. + * Original algorithm is on https://en.wikipedia.org/wiki/Data_structure_alignment. + */ + public static int align(final int value, final int pow2alignment) { + return (value + (pow2alignment - 1)) & ~(pow2alignment - 1); } /** @@ -54,20 +58,4 @@ final class BytesUtils { return (value & (pow2alignment - 1)) == 0; } - public static void zerosDirect(final ByteBuffer buffer) { - //DANGEROUS!! erases bound-checking using directly addresses -> safe only if it use counted loops - int remaining = buffer.capacity(); - long address = PlatformDependent.directBufferAddress(buffer); - while (remaining >= 8) { - PlatformDependent.putLong(address, 0L); - address += 8; - remaining -= 8; - } - while (remaining > 0) { - PlatformDependent.putByte(address, (byte) 0); - address++; - remaining--; - } - } - } diff --git a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/ByteUtilTest.java b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/ByteUtilTest.java index 80aa7e5c98..000ae56c24 100644 --- a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/ByteUtilTest.java +++ b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/ByteUtilTest.java @@ -17,6 +17,11 @@ package org.apache.activemq.artemis.utils; import java.nio.ByteBuffer; +import java.nio.ByteBuffer; +import java.nio.ReadOnlyBufferException; +import java.util.Arrays; + +import io.netty.util.internal.PlatformDependent; import org.junit.Assert; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; @@ -87,6 +92,219 @@ public class ByteUtilTest { } } + private static byte[] duplicateRemaining(ByteBuffer buffer, int offset, int bytes) { + final int end = offset + bytes; + final int expectedRemaining = buffer.capacity() - end; + //it is handling the case of <0 just to allow from to > capacity + if (expectedRemaining <= 0) { + return null; + } + final byte[] remaining = new byte[expectedRemaining]; + final ByteBuffer duplicate = buffer.duplicate(); + duplicate.clear().position(end); + duplicate.get(remaining, 0, expectedRemaining); + return remaining; + } + + private static byte[] duplicateBefore(ByteBuffer buffer, int offset) { + if (offset <= 0) { + return null; + } + final int size = Math.min(buffer.capacity(), offset); + final byte[] remaining = new byte[size]; + final ByteBuffer duplicate = buffer.duplicate(); + duplicate.clear(); + duplicate.get(remaining, 0, size); + return remaining; + } + + private static void shouldZeroesByteBuffer(ByteBuffer buffer, int offset, int bytes) { + final byte[] originalBefore = duplicateBefore(buffer, offset); + final byte[] originalRemaining = duplicateRemaining(buffer, offset, bytes); + final int position = buffer.position(); + final int limit = buffer.limit(); + ByteUtil.zeros(buffer, offset, bytes); + Assert.assertEquals(position, buffer.position()); + Assert.assertEquals(limit, buffer.limit()); + final byte[] zeros = new byte[bytes]; + final byte[] content = new byte[bytes]; + final ByteBuffer duplicate = buffer.duplicate(); + duplicate.clear().position(offset); + duplicate.get(content, 0, bytes); + Assert.assertArrayEquals(zeros, content); + if (originalRemaining != null) { + final byte[] remaining = new byte[duplicate.remaining()]; + //duplicate position has been moved of bytes + duplicate.get(remaining); + Assert.assertArrayEquals(originalRemaining, remaining); + } + if (originalBefore != null) { + final byte[] before = new byte[offset]; + //duplicate position has been moved of bytes: need to reset it + duplicate.position(0); + duplicate.get(before); + Assert.assertArrayEquals(originalBefore, before); + } + } + + private ByteBuffer fill(ByteBuffer buffer, int offset, int length, byte value) { + for (int i = 0; i < length; i++) { + buffer.put(offset + i, value); + } + return buffer; + } + + @Test + public void shouldZeroesDirectByteBuffer() { + final byte one = (byte) 1; + final int capacity = 64; + final int bytes = 32; + final int offset = 1; + final ByteBuffer buffer = ByteBuffer.allocateDirect(capacity); + try { + fill(buffer, 0, capacity, one); + shouldZeroesByteBuffer(buffer, offset, bytes); + } finally { + if (PlatformDependent.hasUnsafe()) { + PlatformDependent.freeDirectBuffer(buffer); + } + } + } + + @Test + public void shouldZeroesLimitedDirectByteBuffer() { + final byte one = (byte) 1; + final int capacity = 64; + final int bytes = 32; + final int offset = 1; + final ByteBuffer buffer = ByteBuffer.allocateDirect(capacity); + try { + fill(buffer, 0, capacity, one); + buffer.limit(0); + shouldZeroesByteBuffer(buffer, offset, bytes); + } finally { + if (PlatformDependent.hasUnsafe()) { + PlatformDependent.freeDirectBuffer(buffer); + } + } + } + + @Test + public void shouldZeroesHeapByteBuffer() { + final byte one = (byte) 1; + final int capacity = 64; + final int bytes = 32; + final int offset = 1; + final ByteBuffer buffer = ByteBuffer.allocate(capacity); + fill(buffer, 0, capacity, one); + shouldZeroesByteBuffer(buffer, offset, bytes); + } + + @Test + public void shouldZeroesLimitedHeapByteBuffer() { + final byte one = (byte) 1; + final int capacity = 64; + final int bytes = 32; + final int offset = 1; + final ByteBuffer buffer = ByteBuffer.allocate(capacity); + fill(buffer, 0, capacity, one); + buffer.limit(0); + shouldZeroesByteBuffer(buffer, offset, bytes); + } + + @Test(expected = ReadOnlyBufferException.class) + public void shouldFailWithReadOnlyHeapByteBuffer() { + final byte one = (byte) 1; + final int capacity = 64; + final int bytes = 32; + final int offset = 1; + ByteBuffer buffer = ByteBuffer.allocate(capacity); + fill(buffer, 0, capacity, one); + buffer = buffer.asReadOnlyBuffer(); + shouldZeroesByteBuffer(buffer, offset, bytes); + } + + @Test(expected = IndexOutOfBoundsException.class) + public void shouldFailIfOffsetIsGreaterOrEqualHeapByteBufferCapacity() { + final byte one = (byte) 1; + final int capacity = 64; + final int bytes = 0; + final int offset = 64; + ByteBuffer buffer = ByteBuffer.allocate(capacity); + fill(buffer, 0, capacity, one); + try { + shouldZeroesByteBuffer(buffer, offset, bytes); + } catch (IndexOutOfBoundsException expectedEx) { + //verify that the buffer hasn't changed + final byte[] originalContent = duplicateRemaining(buffer, 0, 0); + final byte[] expectedContent = new byte[capacity]; + Arrays.fill(expectedContent, one); + Assert.assertArrayEquals(expectedContent, originalContent); + throw expectedEx; + } + } + + @Test(expected = IllegalArgumentException.class) + public void shouldFailIfOffsetIsNegative() { + final byte one = (byte) 1; + final int capacity = 64; + final int bytes = 1; + final int offset = -1; + ByteBuffer buffer = ByteBuffer.allocate(capacity); + fill(buffer, 0, capacity, one); + try { + shouldZeroesByteBuffer(buffer, offset, bytes); + } catch (IndexOutOfBoundsException expectedEx) { + //verify that the buffer hasn't changed + final byte[] originalContent = duplicateRemaining(buffer, 0, 0); + final byte[] expectedContent = new byte[capacity]; + Arrays.fill(expectedContent, one); + Assert.assertArrayEquals(expectedContent, originalContent); + throw expectedEx; + } + } + + @Test(expected = IllegalArgumentException.class) + public void shouldFailIfBytesIsNegative() { + final byte one = (byte) 1; + final int capacity = 64; + final int bytes = -1; + final int offset = 0; + ByteBuffer buffer = ByteBuffer.allocate(capacity); + fill(buffer, 0, capacity, one); + try { + shouldZeroesByteBuffer(buffer, offset, bytes); + } catch (IndexOutOfBoundsException expectedEx) { + //verify that the buffer hasn't changed + final byte[] originalContent = duplicateRemaining(buffer, 0, 0); + final byte[] expectedContent = new byte[capacity]; + Arrays.fill(expectedContent, one); + Assert.assertArrayEquals(expectedContent, originalContent); + throw expectedEx; + } + } + + @Test(expected = IndexOutOfBoundsException.class) + public void shouldFailIfExceedingHeapByteBufferCapacity() { + final byte one = (byte) 1; + final int capacity = 64; + final int bytes = 65; + final int offset = 1; + ByteBuffer buffer = ByteBuffer.allocate(capacity); + fill(buffer, 0, capacity, one); + try { + shouldZeroesByteBuffer(buffer, offset, bytes); + } catch (IndexOutOfBoundsException expectedEx) { + //verify that the buffer hasn't changed + final byte[] originalContent = duplicateRemaining(buffer, 0, 0); + final byte[] expectedContent = new byte[capacity]; + Arrays.fill(expectedContent, one); + Assert.assertArrayEquals(expectedContent, originalContent); + throw expectedEx; + } + } + + @Test public void testIntToByte() { diff --git a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/PowerOf2UtilTest.java b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/PowerOf2UtilTest.java new file mode 100644 index 0000000000..d024f2ee28 --- /dev/null +++ b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/PowerOf2UtilTest.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.utils; + +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertThat; + +import org.junit.Test; + +import static org.apache.activemq.artemis.utils.PowerOf2Util.align; + +public class PowerOf2UtilTest { + + @Test + public void shouldAlignToNextMultipleOfAlignment() { + final int alignment = 512; + assertThat(align(0, alignment), is(0)); + assertThat(align(1, alignment), is(alignment)); + assertThat(align(alignment, alignment), is(alignment)); + assertThat(align(alignment + 1, alignment), is(alignment * 2)); + + final int remainder = Integer.MAX_VALUE % alignment; + final int alignedMax = Integer.MAX_VALUE - remainder; + assertThat(align(alignedMax, alignment), is(alignedMax)); + //given that Integer.MAX_VALUE is the max value that can be represented with int + //the aligned value would be > 2^32, but (int)(2^32) = Integer.MIN_VALUE due to the sign bit + assertThat(align(Integer.MAX_VALUE, alignment), is(Integer.MIN_VALUE)); + } + +} diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFile.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFile.java index 074bebfa78..793dc701b6 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFile.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFile.java @@ -85,11 +85,7 @@ public class AIOSequentialFile extends AbstractSequentialFile { @Override public int calculateBlockStart(final int position) { - int alignment = factory.getAlignment(); - - int pos = (position / alignment + (position % alignment != 0 ? 1 : 0)) * alignment; - - return pos; + return factory.calculateBlockSize(position); } @Override diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFileFactory.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFileFactory.java index 093502680c..cc3c91c48b 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFileFactory.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/aio/AIOSequentialFileFactory.java @@ -35,6 +35,7 @@ import org.apache.activemq.artemis.nativo.jlibaio.LibaioFile; import org.apache.activemq.artemis.nativo.jlibaio.SubmitInfo; import org.apache.activemq.artemis.nativo.jlibaio.util.CallbackCache; import org.apache.activemq.artemis.journal.ActiveMQJournalLogger; +import org.apache.activemq.artemis.utils.PowerOf2Util; import org.apache.activemq.artemis.utils.critical.CriticalAnalyzer; import org.jboss.logging.Logger; @@ -163,13 +164,10 @@ public final class AIOSequentialFileFactory extends AbstractSequentialFileFactor @Override public ByteBuffer allocateDirectBuffer(final int size) { - int blocks = size / getAlignment(); - if (size % getAlignment() != 0) { - blocks++; - } + final int alignedSize = calculateBlockSize(size); // The buffer on AIO has to be a multiple of getAlignment() - ByteBuffer buffer = LibaioContext.newAlignedBuffer(blocks * getAlignment(), getAlignment()); + ByteBuffer buffer = LibaioContext.newAlignedBuffer(alignedSize, getAlignment()); buffer.limit(size); @@ -183,11 +181,8 @@ public final class AIOSequentialFileFactory extends AbstractSequentialFileFactor @Override public ByteBuffer newBuffer(int size) { - if (size % getAlignment() != 0) { - size = (size / getAlignment() + 1) * getAlignment(); - } - - return buffersControl.newBuffer(size); + final int alignedSize = calculateBlockSize(size); + return buffersControl.newBuffer(alignedSize, true); } @Override @@ -199,22 +194,26 @@ public final class AIOSequentialFileFactory extends AbstractSequentialFileFactor @Override public int getAlignment() { if (alignment < 0) { + alignment = calculateAlignment(journalDir); + } + return alignment; + } - File checkFile = null; - - try { - journalDir.mkdirs(); - checkFile = File.createTempFile("journalCheck", ".tmp", journalDir); - checkFile.mkdirs(); - checkFile.createNewFile(); - alignment = LibaioContext.getBlockSize(checkFile); - } catch (Throwable e) { - logger.warn(e.getMessage(), e); - alignment = 512; - } finally { - if (checkFile != null) { - checkFile.delete(); - } + private static int calculateAlignment(File journalDir) { + File checkFile = null; + int alignment; + try { + journalDir.mkdirs(); + checkFile = File.createTempFile("journalCheck", ".tmp", journalDir); + checkFile.mkdirs(); + checkFile.createNewFile(); + alignment = LibaioContext.getBlockSize(checkFile); + } catch (Throwable e) { + logger.warn(e.getMessage(), e); + alignment = 512; + } finally { + if (checkFile != null) { + checkFile.delete(); } } return alignment; @@ -230,11 +229,19 @@ public final class AIOSequentialFileFactory extends AbstractSequentialFileFactor @Override public int calculateBlockSize(final int position) { - int alignment = getAlignment(); + final int alignment = getAlignment(); + if (!PowerOf2Util.isPowOf2(alignment)) { + return align(position, alignment); + } else { + return PowerOf2Util.align(position, alignment); + } + } - int pos = (position / alignment + (position % alignment != 0 ? 1 : 0)) * alignment; - - return pos; + /** + * It can be used to align {@code size} if alignment is not a power of 2: otherwise better to use {@link PowerOf2Util#align(int, int)} instead. + */ + private static int align(int size, int alignment) { + return (size / alignment + (size % alignment != 0 ? 1 : 0)) * alignment; } /* (non-Javadoc) @@ -442,7 +449,7 @@ public final class AIOSequentialFileFactory extends AbstractSequentialFileFactor return alignedBufferSize; } - public ByteBuffer newBuffer(final int size) { + public ByteBuffer newBuffer(final int size, final boolean zeroed) { // if a new buffer wasn't requested in 10 seconds, we clear the queue // This is being done this way as we don't need another Timeout Thread // just to cleanup this @@ -481,7 +488,11 @@ public final class AIOSequentialFileFactory extends AbstractSequentialFileFactor buffer.limit(calculateBlockSize(size)); } else { - clearBuffer(buffer); + if (zeroed) { + clearBuffer(buffer); + } else { + buffer.position(0); + } // set the limit of the buffer to the bufferSize being required buffer.limit(calculateBlockSize(size)); diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedFile.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedFile.java index 05f8f25541..63a38dfac3 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedFile.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedFile.java @@ -28,6 +28,7 @@ import io.netty.buffer.Unpooled; import io.netty.util.internal.PlatformDependent; import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper; import org.apache.activemq.artemis.core.journal.EncodingSupport; +import org.apache.activemq.artemis.utils.PowerOf2Util; import org.apache.activemq.artemis.utils.Env; final class MappedFile implements AutoCloseable { @@ -196,7 +197,7 @@ final class MappedFile implements AutoCloseable { } //any that will enter has lastZeroed OS page aligned while (toZeros >= OS_PAGE_SIZE) { - assert BytesUtils.isAligned(lastZeroed, OS_PAGE_SIZE);/**/ + assert PowerOf2Util.isAligned(lastZeroed, OS_PAGE_SIZE);/**/ final long startPage = lastZeroed - OS_PAGE_SIZE; PlatformDependent.setMemory(startPage, OS_PAGE_SIZE, (byte) 0); lastZeroed = startPage; diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedSequentialFileFactory.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedSequentialFileFactory.java index 2cdaba1fa5..1d7d6ba11f 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedSequentialFileFactory.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/MappedSequentialFileFactory.java @@ -18,20 +18,21 @@ package org.apache.activemq.artemis.core.io.mapped; import java.io.File; import java.nio.ByteBuffer; -import java.util.Arrays; import io.netty.util.internal.PlatformDependent; import org.apache.activemq.artemis.core.io.AbstractSequentialFileFactory; import org.apache.activemq.artemis.core.io.IOCriticalErrorListener; import org.apache.activemq.artemis.core.io.SequentialFile; +import org.apache.activemq.artemis.core.io.util.ByteBufferPool; +import org.apache.activemq.artemis.utils.PowerOf2Util; +import org.apache.activemq.artemis.utils.ByteUtil; import org.apache.activemq.artemis.utils.Env; public final class MappedSequentialFileFactory extends AbstractSequentialFileFactory { private int capacity; private boolean bufferPooling; - //pools only the biggest one -> optimized for the common case - private final ThreadLocal bytesPool; + private final ByteBufferPool bytesPool; public MappedSequentialFileFactory(File directory, int capacity, @@ -47,7 +48,7 @@ public final class MappedSequentialFileFactory extends AbstractSequentialFileFac this.capacity = capacity; this.setDatasync(true); this.bufferPooling = true; - this.bytesPool = new ThreadLocal<>(); + this.bytesPool = ByteBufferPool.threadLocal(true); } public MappedSequentialFileFactory capacity(int capacity) { @@ -76,7 +77,7 @@ public final class MappedSequentialFileFactory extends AbstractSequentialFileFac @Override public ByteBuffer allocateDirectBuffer(final int size) { - final int requiredCapacity = (int) BytesUtils.align(size, Env.osPageSize()); + final int requiredCapacity = PowerOf2Util.align(size, Env.osPageSize()); final ByteBuffer byteBuffer = ByteBuffer.allocateDirect(requiredCapacity); byteBuffer.limit(size); return byteBuffer; @@ -98,43 +99,18 @@ public final class MappedSequentialFileFactory extends AbstractSequentialFileFac } @Override - public ByteBuffer newBuffer(final int size) { + public ByteBuffer newBuffer(int size) { if (!this.bufferPooling) { return allocateDirectBuffer(size); } else { - final int requiredCapacity = (int) BytesUtils.align(size, Env.osPageSize()); - ByteBuffer byteBuffer = bytesPool.get(); - if (byteBuffer == null || requiredCapacity > byteBuffer.capacity()) { - //do not free the old one (if any) until the new one will be released into the pool! - byteBuffer = ByteBuffer.allocateDirect(requiredCapacity); - } else { - bytesPool.set(null); - PlatformDependent.setMemory(PlatformDependent.directBufferAddress(byteBuffer), size, (byte) 0); - byteBuffer.clear(); - } - byteBuffer.limit(size); - return byteBuffer; + return bytesPool.borrow(size, true); } } @Override public void releaseBuffer(ByteBuffer buffer) { if (this.bufferPooling) { - if (buffer.isDirect()) { - final ByteBuffer byteBuffer = bytesPool.get(); - if (byteBuffer != buffer) { - //replace with the current pooled only if greater or null - if (byteBuffer == null || buffer.capacity() > byteBuffer.capacity()) { - if (byteBuffer != null) { - //free the smaller one - PlatformDependent.freeDirectBuffer(byteBuffer); - } - bytesPool.set(buffer); - } else { - PlatformDependent.freeDirectBuffer(buffer); - } - } - } + bytesPool.release(buffer); } } @@ -168,18 +144,7 @@ public final class MappedSequentialFileFactory extends AbstractSequentialFileFac @Override public void clearBuffer(final ByteBuffer buffer) { - if (buffer.isDirect()) { - BytesUtils.zerosDirect(buffer); - } else if (buffer.hasArray()) { - final byte[] array = buffer.array(); - //SIMD OPTIMIZATION - Arrays.fill(array, (byte) 0); - } else { - final int capacity = buffer.capacity(); - for (int i = 0; i < capacity; i++) { - buffer.put(i, (byte) 0); - } - } + ByteUtil.zeros(buffer); buffer.rewind(); } diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFileFactory.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFileFactory.java index c14237771a..f8f5971ab9 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFileFactory.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFileFactory.java @@ -26,6 +26,8 @@ import org.apache.activemq.artemis.ArtemisConstants; import org.apache.activemq.artemis.core.io.AbstractSequentialFileFactory; import org.apache.activemq.artemis.core.io.IOCriticalErrorListener; import org.apache.activemq.artemis.core.io.SequentialFile; +import org.apache.activemq.artemis.core.io.util.ByteBufferPool; +import org.apache.activemq.artemis.utils.PowerOf2Util; import org.apache.activemq.artemis.utils.Env; import org.apache.activemq.artemis.utils.critical.CriticalAnalyzer; @@ -35,8 +37,7 @@ public class NIOSequentialFileFactory extends AbstractSequentialFileFactory { private boolean bufferPooling; - //pools only the biggest one -> optimized for the common case - private final ThreadLocal bytesPool; + private final ByteBufferPool bytesPool; public NIOSequentialFileFactory(final File journalDir, final int maxIO) { this(journalDir, null, maxIO); @@ -76,7 +77,7 @@ public class NIOSequentialFileFactory extends AbstractSequentialFileFactory { final CriticalAnalyzer analyzer) { super(journalDir, buffered, bufferSize, bufferTimeout, maxIO, logRates, listener, analyzer); this.bufferPooling = true; - this.bytesPool = new ThreadLocal<>(); + this.bytesPool = ByteBufferPool.threadLocal(true); } public static ByteBuffer allocateDirectByteBuffer(final int size) { @@ -123,13 +124,9 @@ public class NIOSequentialFileFactory extends AbstractSequentialFileFactory { return timedBuffer != null; } - private static int align(final int value, final int pow2alignment) { - return (value + (pow2alignment - 1)) & ~(pow2alignment - 1); - } - @Override public ByteBuffer allocateDirectBuffer(final int size) { - final int requiredCapacity = align(size, DEFAULT_CAPACITY_ALIGNMENT); + final int requiredCapacity = PowerOf2Util.align(size, DEFAULT_CAPACITY_ALIGNMENT); final ByteBuffer byteBuffer = ByteBuffer.allocateDirect(requiredCapacity); byteBuffer.limit(size); return byteBuffer; @@ -141,43 +138,18 @@ public class NIOSequentialFileFactory extends AbstractSequentialFileFactory { } @Override - public ByteBuffer newBuffer(final int size) { + public ByteBuffer newBuffer(int size) { if (!this.bufferPooling) { return allocateDirectBuffer(size); } else { - final int requiredCapacity = align(size, DEFAULT_CAPACITY_ALIGNMENT); - ByteBuffer byteBuffer = bytesPool.get(); - if (byteBuffer == null || requiredCapacity > byteBuffer.capacity()) { - //do not free the old one (if any) until the new one will be released into the pool! - byteBuffer = ByteBuffer.allocateDirect(requiredCapacity); - } else { - bytesPool.set(null); - PlatformDependent.setMemory(PlatformDependent.directBufferAddress(byteBuffer), size, (byte) 0); - byteBuffer.clear(); - } - byteBuffer.limit(size); - return byteBuffer; + return bytesPool.borrow(size, true); } } @Override public void releaseBuffer(ByteBuffer buffer) { if (this.bufferPooling) { - if (buffer.isDirect()) { - final ByteBuffer byteBuffer = bytesPool.get(); - if (byteBuffer != buffer) { - //replace with the current pooled only if greater or null - if (byteBuffer == null || buffer.capacity() > byteBuffer.capacity()) { - if (byteBuffer != null) { - //free the smaller one - PlatformDependent.freeDirectBuffer(byteBuffer); - } - bytesPool.set(buffer); - } else { - PlatformDependent.freeDirectBuffer(buffer); - } - } - } + bytesPool.release(buffer); } } diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/util/ByteBufferPool.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/util/ByteBufferPool.java new file mode 100644 index 0000000000..bebb514186 --- /dev/null +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/util/ByteBufferPool.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.core.io.util; + +import java.nio.ByteBuffer; + +/** + * Object Pool that allows to borrow and release {@link ByteBuffer}s according to a specific type (direct/heap).
+ * The suggested usage pattern is: + *
{@code
+ *    ByteBuffer buffer = pool.borrow(size);
+ *    //...using buffer...
+ *    pool.release(buffer);
+ * }
+ */ +public interface ByteBufferPool { + + /** + * It returns a {@link ByteBuffer} with {@link ByteBuffer#capacity()}>={@code size}.
+ * The {@code buffer} is zeroed until {@code size} if {@code zeroed=true}, with {@link ByteBuffer#position()}=0 and {@link ByteBuffer#limit()}={@code size}. + */ + ByteBuffer borrow(int size, boolean zeroed); + + /** + * It pools or free {@code buffer} that cannot be used anymore.
+ * If {@code buffer} is of a type different from the one that the pool can borrow, it will ignore it. + */ + void release(ByteBuffer buffer); + + /** + * Factory method that creates a thread-local pool of capacity 1 of {@link ByteBuffer}s of the specified type (direct/heap). + */ + static ByteBufferPool threadLocal(boolean direct) { + return new ThreadLocalByteBufferPool(direct); + } + +} diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/util/ThreadLocalByteBufferPool.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/util/ThreadLocalByteBufferPool.java new file mode 100644 index 0000000000..eee04668bd --- /dev/null +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/util/ThreadLocalByteBufferPool.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.core.io.util; + +import java.nio.ByteBuffer; +import java.util.Objects; + +import io.netty.util.internal.PlatformDependent; +import org.apache.activemq.artemis.utils.PowerOf2Util; +import org.apache.activemq.artemis.utils.ByteUtil; +import org.apache.activemq.artemis.utils.Env; + +final class ThreadLocalByteBufferPool implements ByteBufferPool { + + private final ThreadLocal bytesPool; + private final boolean direct; + + ThreadLocalByteBufferPool(boolean direct) { + this.bytesPool = new ThreadLocal<>(); + this.direct = direct; + } + + @Override + public ByteBuffer borrow(final int size, boolean zeroed) { + final int requiredCapacity = PowerOf2Util.align(size, Env.osPageSize()); + ByteBuffer byteBuffer = bytesPool.get(); + if (byteBuffer == null || requiredCapacity > byteBuffer.capacity()) { + //do not free the old one (if any) until the new one will be released into the pool! + byteBuffer = direct ? ByteBuffer.allocateDirect(requiredCapacity) : ByteBuffer.allocate(requiredCapacity); + } else { + bytesPool.set(null); + if (zeroed) { + ByteUtil.zeros(byteBuffer, 0, size); + } + byteBuffer.clear(); + } + byteBuffer.limit(size); + return byteBuffer; + } + + @Override + public void release(ByteBuffer buffer) { + Objects.requireNonNull(buffer); + boolean directBuffer = buffer.isDirect(); + if (directBuffer == direct && !buffer.isReadOnly()) { + final ByteBuffer byteBuffer = bytesPool.get(); + if (byteBuffer != buffer) { + //replace with the current pooled only if greater or null + if (byteBuffer == null || buffer.capacity() > byteBuffer.capacity()) { + if (byteBuffer != null) { + //free the smaller one + if (directBuffer) { + PlatformDependent.freeDirectBuffer(byteBuffer); + } + } + bytesPool.set(buffer); + } else { + if (directBuffer) { + PlatformDependent.freeDirectBuffer(buffer); + } + } + } + } + } + +} diff --git a/artemis-journal/src/test/java/org/apache/activemq/artemis/core/io/util/ThreadLocalByteBufferPoolTest.java b/artemis-journal/src/test/java/org/apache/activemq/artemis/core/io/util/ThreadLocalByteBufferPoolTest.java new file mode 100644 index 0000000000..190d7de401 --- /dev/null +++ b/artemis-journal/src/test/java/org/apache/activemq/artemis/core/io/util/ThreadLocalByteBufferPoolTest.java @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.core.io.util; + +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.Collection; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +import io.netty.util.internal.PlatformDependent; +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +@RunWith(Parameterized.class) +public class ThreadLocalByteBufferPoolTest { + + //testing using heap buffers to avoid killing the test suite + private static final boolean isDirect = false; + private final ByteBufferPool pool = ByteBufferPool.threadLocal(isDirect); + private final boolean zeroed; + + public ThreadLocalByteBufferPoolTest(boolean zeroed) { + this.zeroed = zeroed; + } + + @Parameterized.Parameters(name = "zeroed={0}") + public static Collection getParams() { + return Arrays.asList(new Object[][]{{Boolean.TRUE}, {Boolean.FALSE}}); + } + + private static void assertZeroed(ByteBuffer buffer) { + ByteBuffer bb = buffer.slice(); + final byte[] content = new byte[bb.remaining()]; + bb.get(content); + final byte[] zeroed = new byte[content.length]; + Arrays.fill(zeroed, (byte) 0); + Assert.assertArrayEquals(zeroed, content); + } + + @Test + public void shouldBorrowOnlyBuffersOfTheCorrectType() { + Assert.assertEquals(isDirect, pool.borrow(0, zeroed).isDirect()); + } + + @Test + public void shouldBorrowZeroedBuffer() { + final int size = 32; + final ByteBuffer buffer = pool.borrow(size, zeroed); + Assert.assertEquals(0, buffer.position()); + Assert.assertEquals(size, buffer.limit()); + if (zeroed) { + assertZeroed(buffer); + } + } + + @Test + public void shouldBorrowTheSameBuffer() { + final int size = 32; + final ByteBuffer buffer = pool.borrow(size, zeroed); + buffer.put(0, (byte) 1); + buffer.position(1); + buffer.limit(2); + pool.release(buffer); + final int newSize = size - 1; + final ByteBuffer sameBuffer = pool.borrow(newSize, zeroed); + Assert.assertSame(buffer, sameBuffer); + Assert.assertEquals(0, sameBuffer.position()); + Assert.assertEquals(newSize, sameBuffer.limit()); + if (zeroed) { + assertZeroed(sameBuffer); + } + } + + @Test + public void shouldBorrowNewBufferIfExceedPooledCapacity() { + final int size = 32; + final ByteBuffer buffer = pool.borrow(size, zeroed); + pool.release(buffer); + final int newSize = buffer.capacity() + 1; + final ByteBuffer differentBuffer = pool.borrow(newSize, zeroed); + Assert.assertNotSame(buffer, differentBuffer); + } + + @Test + public void shouldPoolTheBiggestBuffer() { + final int size = 32; + final ByteBuffer small = pool.borrow(size, zeroed); + final ByteBuffer big = pool.borrow(small.capacity() + 1, zeroed); + pool.release(small); + big.limit(0); + pool.release(big); + Assert.assertSame(big, pool.borrow(big.capacity(), zeroed)); + } + + @Test + public void shouldNotPoolTheSmallestBuffer() { + final int size = 32; + final ByteBuffer small = pool.borrow(size, zeroed); + final ByteBuffer big = pool.borrow(small.capacity() + 1, zeroed); + big.limit(0); + pool.release(big); + pool.release(small); + Assert.assertSame(big, pool.borrow(big.capacity(), zeroed)); + } + + @Test + public void shouldNotPoolBufferOfDifferentType() { + final int size = 32; + final ByteBuffer buffer = isDirect ? ByteBuffer.allocate(size) : ByteBuffer.allocateDirect(size); + try { + pool.release(buffer); + Assert.assertNotSame(buffer, pool.borrow(size, zeroed)); + } catch (Throwable t) { + if (PlatformDependent.hasUnsafe()) { + if (buffer.isDirect()) { + PlatformDependent.freeDirectBuffer(buffer); + } + } + } + } + + @Test + public void shouldNotPoolReadOnlyBuffer() { + final int size = 32; + final ByteBuffer borrow = pool.borrow(size, zeroed); + final ByteBuffer readOnlyBuffer = borrow.asReadOnlyBuffer(); + pool.release(readOnlyBuffer); + Assert.assertNotSame(readOnlyBuffer, pool.borrow(size, zeroed)); + } + + @Test(expected = NullPointerException.class) + public void shouldFailPoolingNullBuffer() { + pool.release(null); + } + + @Test(expected = NullPointerException.class) + public void shouldFailPoolingNullBufferIfNotEmpty() { + final int size = 32; + pool.release(pool.borrow(size, zeroed)); + pool.release(null); + } + + @Test + public void shouldBorrowOnlyThreadLocalBuffers() throws ExecutionException, InterruptedException { + final int size = 32; + final ByteBuffer buffer = pool.borrow(size, zeroed); + pool.release(buffer); + final ExecutorService executor = Executors.newSingleThreadExecutor(); + try { + Assert.assertNotSame(buffer, executor.submit(() -> pool.borrow(size, zeroed)).get()); + } finally { + executor.shutdown(); + } + } + +}