ARTEMIS-2211 Refactor ByteBuffer pooling, alignment and zeroing

Refactored thread local ByteBuffer pooling, alignment
and zeroing in order to avoid duplicate code and
improve code coverage with tests.
In addition are being provided faster branchless
alignment operations and optional zeroing of
pooled ByteBuffers for both ASYNCIO and
NIO/MAPPED journal types.
This commit is contained in:
Francesco Nigro 2018-12-20 11:11:36 +01:00 committed by Clebert Suconic
parent eca3c6ccd3
commit 89c02f1cc0
12 changed files with 693 additions and 143 deletions

View File

@ -17,6 +17,8 @@
package org.apache.activemq.artemis.utils;
import java.nio.ByteBuffer;
import java.nio.ReadOnlyBufferException;
import java.util.Arrays;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@ -342,4 +344,52 @@ public class ByteUtil {
public static int intFromBytes(byte b1, byte b2, byte b3, byte b4) {
return b1 << 24 | (b2 & 0xFF) << 16 | (b3 & 0xFF) << 8 | (b4 & 0xFF);
}
/**
* It zeroes the whole {@link ByteBuffer#capacity()} of the given {@code buffer}.
*
* @throws ReadOnlyBufferException if {@code buffer} is read-only
*/
public static void zeros(final ByteBuffer buffer) {
uncheckedZeros(buffer, 0, buffer.capacity());
}
/**
* It zeroes {@code bytes} of the given {@code buffer}, starting (inclusive) from {@code offset}.
*
* @throws IndexOutOfBoundsException if {@code offset + bytes > }{@link ByteBuffer#capacity()} or {@code offset >= }{@link ByteBuffer#capacity()}
* @throws IllegalArgumentException if {@code offset} or {@code capacity} are less then 0
* @throws ReadOnlyBufferException if {@code buffer} is read-only
*/
public static void zeros(final ByteBuffer buffer, int offset, int bytes) {
if (offset < 0 || bytes < 0) {
throw new IllegalArgumentException();
}
final int capacity = buffer.capacity();
if (offset >= capacity || (offset + bytes) > capacity) {
throw new IndexOutOfBoundsException();
}
uncheckedZeros(buffer, offset, bytes);
}
private static void uncheckedZeros(final ByteBuffer buffer, int offset, int bytes) {
if (buffer.isReadOnly()) {
throw new ReadOnlyBufferException();
}
final byte zero = (byte) 0;
if (buffer.isDirect() && PlatformDependent.hasUnsafe()) {
PlatformDependent.setMemory(PlatformDependent.directBufferAddress(buffer) + offset, bytes, zero);
} else if (buffer.hasArray()) {
//SIMD OPTIMIZATION
final int arrayOffset = buffer.arrayOffset();
final int start = arrayOffset + offset;
Arrays.fill(buffer.array(), start, start + bytes, zero);
} else {
//slow path
for (int i = 0; i < bytes; i++) {
buffer.put(i + offset, zero);
}
}
}
}

View File

@ -14,19 +14,23 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.io.mapped;
package org.apache.activemq.artemis.utils;
import java.nio.ByteBuffer;
/**
* Collection of bit-tricks for power of 2 cases.
*/
public final class PowerOf2Util {
import io.netty.util.internal.PlatformDependent;
final class BytesUtils {
private BytesUtils() {
private PowerOf2Util() {
}
public static long align(final long value, final long alignment) {
return (value + (alignment - 1)) & ~(alignment - 1);
/**
* Fast alignment operation with power of 2 {@code alignment} and {@code value >=0} and {@code value <}{@link Integer#MAX_VALUE}.<br>
* In order to be fast is up to the caller to check arguments correctness.
* Original algorithm is on https://en.wikipedia.org/wiki/Data_structure_alignment.
*/
public static int align(final int value, final int pow2alignment) {
return (value + (pow2alignment - 1)) & ~(pow2alignment - 1);
}
/**
@ -54,20 +58,4 @@ final class BytesUtils {
return (value & (pow2alignment - 1)) == 0;
}
public static void zerosDirect(final ByteBuffer buffer) {
//DANGEROUS!! erases bound-checking using directly addresses -> safe only if it use counted loops
int remaining = buffer.capacity();
long address = PlatformDependent.directBufferAddress(buffer);
while (remaining >= 8) {
PlatformDependent.putLong(address, 0L);
address += 8;
remaining -= 8;
}
while (remaining > 0) {
PlatformDependent.putByte(address, (byte) 0);
address++;
remaining--;
}
}
}

View File

@ -17,6 +17,11 @@
package org.apache.activemq.artemis.utils;
import java.nio.ByteBuffer;
import java.nio.ByteBuffer;
import java.nio.ReadOnlyBufferException;
import java.util.Arrays;
import io.netty.util.internal.PlatformDependent;
import org.junit.Assert;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
@ -87,6 +92,219 @@ public class ByteUtilTest {
}
}
private static byte[] duplicateRemaining(ByteBuffer buffer, int offset, int bytes) {
final int end = offset + bytes;
final int expectedRemaining = buffer.capacity() - end;
//it is handling the case of <0 just to allow from to > capacity
if (expectedRemaining <= 0) {
return null;
}
final byte[] remaining = new byte[expectedRemaining];
final ByteBuffer duplicate = buffer.duplicate();
duplicate.clear().position(end);
duplicate.get(remaining, 0, expectedRemaining);
return remaining;
}
private static byte[] duplicateBefore(ByteBuffer buffer, int offset) {
if (offset <= 0) {
return null;
}
final int size = Math.min(buffer.capacity(), offset);
final byte[] remaining = new byte[size];
final ByteBuffer duplicate = buffer.duplicate();
duplicate.clear();
duplicate.get(remaining, 0, size);
return remaining;
}
private static void shouldZeroesByteBuffer(ByteBuffer buffer, int offset, int bytes) {
final byte[] originalBefore = duplicateBefore(buffer, offset);
final byte[] originalRemaining = duplicateRemaining(buffer, offset, bytes);
final int position = buffer.position();
final int limit = buffer.limit();
ByteUtil.zeros(buffer, offset, bytes);
Assert.assertEquals(position, buffer.position());
Assert.assertEquals(limit, buffer.limit());
final byte[] zeros = new byte[bytes];
final byte[] content = new byte[bytes];
final ByteBuffer duplicate = buffer.duplicate();
duplicate.clear().position(offset);
duplicate.get(content, 0, bytes);
Assert.assertArrayEquals(zeros, content);
if (originalRemaining != null) {
final byte[] remaining = new byte[duplicate.remaining()];
//duplicate position has been moved of bytes
duplicate.get(remaining);
Assert.assertArrayEquals(originalRemaining, remaining);
}
if (originalBefore != null) {
final byte[] before = new byte[offset];
//duplicate position has been moved of bytes: need to reset it
duplicate.position(0);
duplicate.get(before);
Assert.assertArrayEquals(originalBefore, before);
}
}
private ByteBuffer fill(ByteBuffer buffer, int offset, int length, byte value) {
for (int i = 0; i < length; i++) {
buffer.put(offset + i, value);
}
return buffer;
}
@Test
public void shouldZeroesDirectByteBuffer() {
final byte one = (byte) 1;
final int capacity = 64;
final int bytes = 32;
final int offset = 1;
final ByteBuffer buffer = ByteBuffer.allocateDirect(capacity);
try {
fill(buffer, 0, capacity, one);
shouldZeroesByteBuffer(buffer, offset, bytes);
} finally {
if (PlatformDependent.hasUnsafe()) {
PlatformDependent.freeDirectBuffer(buffer);
}
}
}
@Test
public void shouldZeroesLimitedDirectByteBuffer() {
final byte one = (byte) 1;
final int capacity = 64;
final int bytes = 32;
final int offset = 1;
final ByteBuffer buffer = ByteBuffer.allocateDirect(capacity);
try {
fill(buffer, 0, capacity, one);
buffer.limit(0);
shouldZeroesByteBuffer(buffer, offset, bytes);
} finally {
if (PlatformDependent.hasUnsafe()) {
PlatformDependent.freeDirectBuffer(buffer);
}
}
}
@Test
public void shouldZeroesHeapByteBuffer() {
final byte one = (byte) 1;
final int capacity = 64;
final int bytes = 32;
final int offset = 1;
final ByteBuffer buffer = ByteBuffer.allocate(capacity);
fill(buffer, 0, capacity, one);
shouldZeroesByteBuffer(buffer, offset, bytes);
}
@Test
public void shouldZeroesLimitedHeapByteBuffer() {
final byte one = (byte) 1;
final int capacity = 64;
final int bytes = 32;
final int offset = 1;
final ByteBuffer buffer = ByteBuffer.allocate(capacity);
fill(buffer, 0, capacity, one);
buffer.limit(0);
shouldZeroesByteBuffer(buffer, offset, bytes);
}
@Test(expected = ReadOnlyBufferException.class)
public void shouldFailWithReadOnlyHeapByteBuffer() {
final byte one = (byte) 1;
final int capacity = 64;
final int bytes = 32;
final int offset = 1;
ByteBuffer buffer = ByteBuffer.allocate(capacity);
fill(buffer, 0, capacity, one);
buffer = buffer.asReadOnlyBuffer();
shouldZeroesByteBuffer(buffer, offset, bytes);
}
@Test(expected = IndexOutOfBoundsException.class)
public void shouldFailIfOffsetIsGreaterOrEqualHeapByteBufferCapacity() {
final byte one = (byte) 1;
final int capacity = 64;
final int bytes = 0;
final int offset = 64;
ByteBuffer buffer = ByteBuffer.allocate(capacity);
fill(buffer, 0, capacity, one);
try {
shouldZeroesByteBuffer(buffer, offset, bytes);
} catch (IndexOutOfBoundsException expectedEx) {
//verify that the buffer hasn't changed
final byte[] originalContent = duplicateRemaining(buffer, 0, 0);
final byte[] expectedContent = new byte[capacity];
Arrays.fill(expectedContent, one);
Assert.assertArrayEquals(expectedContent, originalContent);
throw expectedEx;
}
}
@Test(expected = IllegalArgumentException.class)
public void shouldFailIfOffsetIsNegative() {
final byte one = (byte) 1;
final int capacity = 64;
final int bytes = 1;
final int offset = -1;
ByteBuffer buffer = ByteBuffer.allocate(capacity);
fill(buffer, 0, capacity, one);
try {
shouldZeroesByteBuffer(buffer, offset, bytes);
} catch (IndexOutOfBoundsException expectedEx) {
//verify that the buffer hasn't changed
final byte[] originalContent = duplicateRemaining(buffer, 0, 0);
final byte[] expectedContent = new byte[capacity];
Arrays.fill(expectedContent, one);
Assert.assertArrayEquals(expectedContent, originalContent);
throw expectedEx;
}
}
@Test(expected = IllegalArgumentException.class)
public void shouldFailIfBytesIsNegative() {
final byte one = (byte) 1;
final int capacity = 64;
final int bytes = -1;
final int offset = 0;
ByteBuffer buffer = ByteBuffer.allocate(capacity);
fill(buffer, 0, capacity, one);
try {
shouldZeroesByteBuffer(buffer, offset, bytes);
} catch (IndexOutOfBoundsException expectedEx) {
//verify that the buffer hasn't changed
final byte[] originalContent = duplicateRemaining(buffer, 0, 0);
final byte[] expectedContent = new byte[capacity];
Arrays.fill(expectedContent, one);
Assert.assertArrayEquals(expectedContent, originalContent);
throw expectedEx;
}
}
@Test(expected = IndexOutOfBoundsException.class)
public void shouldFailIfExceedingHeapByteBufferCapacity() {
final byte one = (byte) 1;
final int capacity = 64;
final int bytes = 65;
final int offset = 1;
ByteBuffer buffer = ByteBuffer.allocate(capacity);
fill(buffer, 0, capacity, one);
try {
shouldZeroesByteBuffer(buffer, offset, bytes);
} catch (IndexOutOfBoundsException expectedEx) {
//verify that the buffer hasn't changed
final byte[] originalContent = duplicateRemaining(buffer, 0, 0);
final byte[] expectedContent = new byte[capacity];
Arrays.fill(expectedContent, one);
Assert.assertArrayEquals(expectedContent, originalContent);
throw expectedEx;
}
}
@Test
public void testIntToByte() {

View File

@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.utils;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;
import org.junit.Test;
import static org.apache.activemq.artemis.utils.PowerOf2Util.align;
public class PowerOf2UtilTest {
@Test
public void shouldAlignToNextMultipleOfAlignment() {
final int alignment = 512;
assertThat(align(0, alignment), is(0));
assertThat(align(1, alignment), is(alignment));
assertThat(align(alignment, alignment), is(alignment));
assertThat(align(alignment + 1, alignment), is(alignment * 2));
final int remainder = Integer.MAX_VALUE % alignment;
final int alignedMax = Integer.MAX_VALUE - remainder;
assertThat(align(alignedMax, alignment), is(alignedMax));
//given that Integer.MAX_VALUE is the max value that can be represented with int
//the aligned value would be > 2^32, but (int)(2^32) = Integer.MIN_VALUE due to the sign bit
assertThat(align(Integer.MAX_VALUE, alignment), is(Integer.MIN_VALUE));
}
}

View File

@ -85,11 +85,7 @@ public class AIOSequentialFile extends AbstractSequentialFile {
@Override
public int calculateBlockStart(final int position) {
int alignment = factory.getAlignment();
int pos = (position / alignment + (position % alignment != 0 ? 1 : 0)) * alignment;
return pos;
return factory.calculateBlockSize(position);
}
@Override

View File

@ -35,6 +35,7 @@ import org.apache.activemq.artemis.nativo.jlibaio.LibaioFile;
import org.apache.activemq.artemis.nativo.jlibaio.SubmitInfo;
import org.apache.activemq.artemis.nativo.jlibaio.util.CallbackCache;
import org.apache.activemq.artemis.journal.ActiveMQJournalLogger;
import org.apache.activemq.artemis.utils.PowerOf2Util;
import org.apache.activemq.artemis.utils.critical.CriticalAnalyzer;
import org.jboss.logging.Logger;
@ -163,13 +164,10 @@ public final class AIOSequentialFileFactory extends AbstractSequentialFileFactor
@Override
public ByteBuffer allocateDirectBuffer(final int size) {
int blocks = size / getAlignment();
if (size % getAlignment() != 0) {
blocks++;
}
final int alignedSize = calculateBlockSize(size);
// The buffer on AIO has to be a multiple of getAlignment()
ByteBuffer buffer = LibaioContext.newAlignedBuffer(blocks * getAlignment(), getAlignment());
ByteBuffer buffer = LibaioContext.newAlignedBuffer(alignedSize, getAlignment());
buffer.limit(size);
@ -183,11 +181,8 @@ public final class AIOSequentialFileFactory extends AbstractSequentialFileFactor
@Override
public ByteBuffer newBuffer(int size) {
if (size % getAlignment() != 0) {
size = (size / getAlignment() + 1) * getAlignment();
}
return buffersControl.newBuffer(size);
final int alignedSize = calculateBlockSize(size);
return buffersControl.newBuffer(alignedSize, true);
}
@Override
@ -199,9 +194,14 @@ public final class AIOSequentialFileFactory extends AbstractSequentialFileFactor
@Override
public int getAlignment() {
if (alignment < 0) {
alignment = calculateAlignment(journalDir);
}
return alignment;
}
private static int calculateAlignment(File journalDir) {
File checkFile = null;
int alignment;
try {
journalDir.mkdirs();
checkFile = File.createTempFile("journalCheck", ".tmp", journalDir);
@ -216,7 +216,6 @@ public final class AIOSequentialFileFactory extends AbstractSequentialFileFactor
checkFile.delete();
}
}
}
return alignment;
}
@ -230,11 +229,19 @@ public final class AIOSequentialFileFactory extends AbstractSequentialFileFactor
@Override
public int calculateBlockSize(final int position) {
int alignment = getAlignment();
final int alignment = getAlignment();
if (!PowerOf2Util.isPowOf2(alignment)) {
return align(position, alignment);
} else {
return PowerOf2Util.align(position, alignment);
}
}
int pos = (position / alignment + (position % alignment != 0 ? 1 : 0)) * alignment;
return pos;
/**
* It can be used to align {@code size} if alignment is not a power of 2: otherwise better to use {@link PowerOf2Util#align(int, int)} instead.
*/
private static int align(int size, int alignment) {
return (size / alignment + (size % alignment != 0 ? 1 : 0)) * alignment;
}
/* (non-Javadoc)
@ -442,7 +449,7 @@ public final class AIOSequentialFileFactory extends AbstractSequentialFileFactor
return alignedBufferSize;
}
public ByteBuffer newBuffer(final int size) {
public ByteBuffer newBuffer(final int size, final boolean zeroed) {
// if a new buffer wasn't requested in 10 seconds, we clear the queue
// This is being done this way as we don't need another Timeout Thread
// just to cleanup this
@ -481,7 +488,11 @@ public final class AIOSequentialFileFactory extends AbstractSequentialFileFactor
buffer.limit(calculateBlockSize(size));
} else {
if (zeroed) {
clearBuffer(buffer);
} else {
buffer.position(0);
}
// set the limit of the buffer to the bufferSize being required
buffer.limit(calculateBlockSize(size));

View File

@ -28,6 +28,7 @@ import io.netty.buffer.Unpooled;
import io.netty.util.internal.PlatformDependent;
import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper;
import org.apache.activemq.artemis.core.journal.EncodingSupport;
import org.apache.activemq.artemis.utils.PowerOf2Util;
import org.apache.activemq.artemis.utils.Env;
final class MappedFile implements AutoCloseable {
@ -196,7 +197,7 @@ final class MappedFile implements AutoCloseable {
}
//any that will enter has lastZeroed OS page aligned
while (toZeros >= OS_PAGE_SIZE) {
assert BytesUtils.isAligned(lastZeroed, OS_PAGE_SIZE);/**/
assert PowerOf2Util.isAligned(lastZeroed, OS_PAGE_SIZE);/**/
final long startPage = lastZeroed - OS_PAGE_SIZE;
PlatformDependent.setMemory(startPage, OS_PAGE_SIZE, (byte) 0);
lastZeroed = startPage;

View File

@ -18,20 +18,21 @@ package org.apache.activemq.artemis.core.io.mapped;
import java.io.File;
import java.nio.ByteBuffer;
import java.util.Arrays;
import io.netty.util.internal.PlatformDependent;
import org.apache.activemq.artemis.core.io.AbstractSequentialFileFactory;
import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
import org.apache.activemq.artemis.core.io.SequentialFile;
import org.apache.activemq.artemis.core.io.util.ByteBufferPool;
import org.apache.activemq.artemis.utils.PowerOf2Util;
import org.apache.activemq.artemis.utils.ByteUtil;
import org.apache.activemq.artemis.utils.Env;
public final class MappedSequentialFileFactory extends AbstractSequentialFileFactory {
private int capacity;
private boolean bufferPooling;
//pools only the biggest one -> optimized for the common case
private final ThreadLocal<ByteBuffer> bytesPool;
private final ByteBufferPool bytesPool;
public MappedSequentialFileFactory(File directory,
int capacity,
@ -47,7 +48,7 @@ public final class MappedSequentialFileFactory extends AbstractSequentialFileFac
this.capacity = capacity;
this.setDatasync(true);
this.bufferPooling = true;
this.bytesPool = new ThreadLocal<>();
this.bytesPool = ByteBufferPool.threadLocal(true);
}
public MappedSequentialFileFactory capacity(int capacity) {
@ -76,7 +77,7 @@ public final class MappedSequentialFileFactory extends AbstractSequentialFileFac
@Override
public ByteBuffer allocateDirectBuffer(final int size) {
final int requiredCapacity = (int) BytesUtils.align(size, Env.osPageSize());
final int requiredCapacity = PowerOf2Util.align(size, Env.osPageSize());
final ByteBuffer byteBuffer = ByteBuffer.allocateDirect(requiredCapacity);
byteBuffer.limit(size);
return byteBuffer;
@ -98,43 +99,18 @@ public final class MappedSequentialFileFactory extends AbstractSequentialFileFac
}
@Override
public ByteBuffer newBuffer(final int size) {
public ByteBuffer newBuffer(int size) {
if (!this.bufferPooling) {
return allocateDirectBuffer(size);
} else {
final int requiredCapacity = (int) BytesUtils.align(size, Env.osPageSize());
ByteBuffer byteBuffer = bytesPool.get();
if (byteBuffer == null || requiredCapacity > byteBuffer.capacity()) {
//do not free the old one (if any) until the new one will be released into the pool!
byteBuffer = ByteBuffer.allocateDirect(requiredCapacity);
} else {
bytesPool.set(null);
PlatformDependent.setMemory(PlatformDependent.directBufferAddress(byteBuffer), size, (byte) 0);
byteBuffer.clear();
}
byteBuffer.limit(size);
return byteBuffer;
return bytesPool.borrow(size, true);
}
}
@Override
public void releaseBuffer(ByteBuffer buffer) {
if (this.bufferPooling) {
if (buffer.isDirect()) {
final ByteBuffer byteBuffer = bytesPool.get();
if (byteBuffer != buffer) {
//replace with the current pooled only if greater or null
if (byteBuffer == null || buffer.capacity() > byteBuffer.capacity()) {
if (byteBuffer != null) {
//free the smaller one
PlatformDependent.freeDirectBuffer(byteBuffer);
}
bytesPool.set(buffer);
} else {
PlatformDependent.freeDirectBuffer(buffer);
}
}
}
bytesPool.release(buffer);
}
}
@ -168,18 +144,7 @@ public final class MappedSequentialFileFactory extends AbstractSequentialFileFac
@Override
public void clearBuffer(final ByteBuffer buffer) {
if (buffer.isDirect()) {
BytesUtils.zerosDirect(buffer);
} else if (buffer.hasArray()) {
final byte[] array = buffer.array();
//SIMD OPTIMIZATION
Arrays.fill(array, (byte) 0);
} else {
final int capacity = buffer.capacity();
for (int i = 0; i < capacity; i++) {
buffer.put(i, (byte) 0);
}
}
ByteUtil.zeros(buffer);
buffer.rewind();
}

View File

@ -26,6 +26,8 @@ import org.apache.activemq.artemis.ArtemisConstants;
import org.apache.activemq.artemis.core.io.AbstractSequentialFileFactory;
import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
import org.apache.activemq.artemis.core.io.SequentialFile;
import org.apache.activemq.artemis.core.io.util.ByteBufferPool;
import org.apache.activemq.artemis.utils.PowerOf2Util;
import org.apache.activemq.artemis.utils.Env;
import org.apache.activemq.artemis.utils.critical.CriticalAnalyzer;
@ -35,8 +37,7 @@ public class NIOSequentialFileFactory extends AbstractSequentialFileFactory {
private boolean bufferPooling;
//pools only the biggest one -> optimized for the common case
private final ThreadLocal<ByteBuffer> bytesPool;
private final ByteBufferPool bytesPool;
public NIOSequentialFileFactory(final File journalDir, final int maxIO) {
this(journalDir, null, maxIO);
@ -76,7 +77,7 @@ public class NIOSequentialFileFactory extends AbstractSequentialFileFactory {
final CriticalAnalyzer analyzer) {
super(journalDir, buffered, bufferSize, bufferTimeout, maxIO, logRates, listener, analyzer);
this.bufferPooling = true;
this.bytesPool = new ThreadLocal<>();
this.bytesPool = ByteBufferPool.threadLocal(true);
}
public static ByteBuffer allocateDirectByteBuffer(final int size) {
@ -123,13 +124,9 @@ public class NIOSequentialFileFactory extends AbstractSequentialFileFactory {
return timedBuffer != null;
}
private static int align(final int value, final int pow2alignment) {
return (value + (pow2alignment - 1)) & ~(pow2alignment - 1);
}
@Override
public ByteBuffer allocateDirectBuffer(final int size) {
final int requiredCapacity = align(size, DEFAULT_CAPACITY_ALIGNMENT);
final int requiredCapacity = PowerOf2Util.align(size, DEFAULT_CAPACITY_ALIGNMENT);
final ByteBuffer byteBuffer = ByteBuffer.allocateDirect(requiredCapacity);
byteBuffer.limit(size);
return byteBuffer;
@ -141,43 +138,18 @@ public class NIOSequentialFileFactory extends AbstractSequentialFileFactory {
}
@Override
public ByteBuffer newBuffer(final int size) {
public ByteBuffer newBuffer(int size) {
if (!this.bufferPooling) {
return allocateDirectBuffer(size);
} else {
final int requiredCapacity = align(size, DEFAULT_CAPACITY_ALIGNMENT);
ByteBuffer byteBuffer = bytesPool.get();
if (byteBuffer == null || requiredCapacity > byteBuffer.capacity()) {
//do not free the old one (if any) until the new one will be released into the pool!
byteBuffer = ByteBuffer.allocateDirect(requiredCapacity);
} else {
bytesPool.set(null);
PlatformDependent.setMemory(PlatformDependent.directBufferAddress(byteBuffer), size, (byte) 0);
byteBuffer.clear();
}
byteBuffer.limit(size);
return byteBuffer;
return bytesPool.borrow(size, true);
}
}
@Override
public void releaseBuffer(ByteBuffer buffer) {
if (this.bufferPooling) {
if (buffer.isDirect()) {
final ByteBuffer byteBuffer = bytesPool.get();
if (byteBuffer != buffer) {
//replace with the current pooled only if greater or null
if (byteBuffer == null || buffer.capacity() > byteBuffer.capacity()) {
if (byteBuffer != null) {
//free the smaller one
PlatformDependent.freeDirectBuffer(byteBuffer);
}
bytesPool.set(buffer);
} else {
PlatformDependent.freeDirectBuffer(buffer);
}
}
}
bytesPool.release(buffer);
}
}

View File

@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.io.util;
import java.nio.ByteBuffer;
/**
* Object Pool that allows to borrow and release {@link ByteBuffer}s according to a specific type (direct/heap).<br>
* The suggested usage pattern is:
* <pre>{@code
* ByteBuffer buffer = pool.borrow(size);
* //...using buffer...
* pool.release(buffer);
* }</pre>
*/
public interface ByteBufferPool {
/**
* It returns a {@link ByteBuffer} with {@link ByteBuffer#capacity()}>={@code size}.<br>
* The {@code buffer} is zeroed until {@code size} if {@code zeroed=true}, with {@link ByteBuffer#position()}=0 and {@link ByteBuffer#limit()}={@code size}.
*/
ByteBuffer borrow(int size, boolean zeroed);
/**
* It pools or free {@code buffer} that cannot be used anymore.<br>
* If {@code buffer} is of a type different from the one that the pool can borrow, it will ignore it.
*/
void release(ByteBuffer buffer);
/**
* Factory method that creates a thread-local pool of capacity 1 of {@link ByteBuffer}s of the specified type (direct/heap).
*/
static ByteBufferPool threadLocal(boolean direct) {
return new ThreadLocalByteBufferPool(direct);
}
}

View File

@ -0,0 +1,80 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.io.util;
import java.nio.ByteBuffer;
import java.util.Objects;
import io.netty.util.internal.PlatformDependent;
import org.apache.activemq.artemis.utils.PowerOf2Util;
import org.apache.activemq.artemis.utils.ByteUtil;
import org.apache.activemq.artemis.utils.Env;
final class ThreadLocalByteBufferPool implements ByteBufferPool {
private final ThreadLocal<ByteBuffer> bytesPool;
private final boolean direct;
ThreadLocalByteBufferPool(boolean direct) {
this.bytesPool = new ThreadLocal<>();
this.direct = direct;
}
@Override
public ByteBuffer borrow(final int size, boolean zeroed) {
final int requiredCapacity = PowerOf2Util.align(size, Env.osPageSize());
ByteBuffer byteBuffer = bytesPool.get();
if (byteBuffer == null || requiredCapacity > byteBuffer.capacity()) {
//do not free the old one (if any) until the new one will be released into the pool!
byteBuffer = direct ? ByteBuffer.allocateDirect(requiredCapacity) : ByteBuffer.allocate(requiredCapacity);
} else {
bytesPool.set(null);
if (zeroed) {
ByteUtil.zeros(byteBuffer, 0, size);
}
byteBuffer.clear();
}
byteBuffer.limit(size);
return byteBuffer;
}
@Override
public void release(ByteBuffer buffer) {
Objects.requireNonNull(buffer);
boolean directBuffer = buffer.isDirect();
if (directBuffer == direct && !buffer.isReadOnly()) {
final ByteBuffer byteBuffer = bytesPool.get();
if (byteBuffer != buffer) {
//replace with the current pooled only if greater or null
if (byteBuffer == null || buffer.capacity() > byteBuffer.capacity()) {
if (byteBuffer != null) {
//free the smaller one
if (directBuffer) {
PlatformDependent.freeDirectBuffer(byteBuffer);
}
}
bytesPool.set(buffer);
} else {
if (directBuffer) {
PlatformDependent.freeDirectBuffer(buffer);
}
}
}
}
}
}

View File

@ -0,0 +1,174 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.io.util;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import io.netty.util.internal.PlatformDependent;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@RunWith(Parameterized.class)
public class ThreadLocalByteBufferPoolTest {
//testing using heap buffers to avoid killing the test suite
private static final boolean isDirect = false;
private final ByteBufferPool pool = ByteBufferPool.threadLocal(isDirect);
private final boolean zeroed;
public ThreadLocalByteBufferPoolTest(boolean zeroed) {
this.zeroed = zeroed;
}
@Parameterized.Parameters(name = "zeroed={0}")
public static Collection<Object[]> getParams() {
return Arrays.asList(new Object[][]{{Boolean.TRUE}, {Boolean.FALSE}});
}
private static void assertZeroed(ByteBuffer buffer) {
ByteBuffer bb = buffer.slice();
final byte[] content = new byte[bb.remaining()];
bb.get(content);
final byte[] zeroed = new byte[content.length];
Arrays.fill(zeroed, (byte) 0);
Assert.assertArrayEquals(zeroed, content);
}
@Test
public void shouldBorrowOnlyBuffersOfTheCorrectType() {
Assert.assertEquals(isDirect, pool.borrow(0, zeroed).isDirect());
}
@Test
public void shouldBorrowZeroedBuffer() {
final int size = 32;
final ByteBuffer buffer = pool.borrow(size, zeroed);
Assert.assertEquals(0, buffer.position());
Assert.assertEquals(size, buffer.limit());
if (zeroed) {
assertZeroed(buffer);
}
}
@Test
public void shouldBorrowTheSameBuffer() {
final int size = 32;
final ByteBuffer buffer = pool.borrow(size, zeroed);
buffer.put(0, (byte) 1);
buffer.position(1);
buffer.limit(2);
pool.release(buffer);
final int newSize = size - 1;
final ByteBuffer sameBuffer = pool.borrow(newSize, zeroed);
Assert.assertSame(buffer, sameBuffer);
Assert.assertEquals(0, sameBuffer.position());
Assert.assertEquals(newSize, sameBuffer.limit());
if (zeroed) {
assertZeroed(sameBuffer);
}
}
@Test
public void shouldBorrowNewBufferIfExceedPooledCapacity() {
final int size = 32;
final ByteBuffer buffer = pool.borrow(size, zeroed);
pool.release(buffer);
final int newSize = buffer.capacity() + 1;
final ByteBuffer differentBuffer = pool.borrow(newSize, zeroed);
Assert.assertNotSame(buffer, differentBuffer);
}
@Test
public void shouldPoolTheBiggestBuffer() {
final int size = 32;
final ByteBuffer small = pool.borrow(size, zeroed);
final ByteBuffer big = pool.borrow(small.capacity() + 1, zeroed);
pool.release(small);
big.limit(0);
pool.release(big);
Assert.assertSame(big, pool.borrow(big.capacity(), zeroed));
}
@Test
public void shouldNotPoolTheSmallestBuffer() {
final int size = 32;
final ByteBuffer small = pool.borrow(size, zeroed);
final ByteBuffer big = pool.borrow(small.capacity() + 1, zeroed);
big.limit(0);
pool.release(big);
pool.release(small);
Assert.assertSame(big, pool.borrow(big.capacity(), zeroed));
}
@Test
public void shouldNotPoolBufferOfDifferentType() {
final int size = 32;
final ByteBuffer buffer = isDirect ? ByteBuffer.allocate(size) : ByteBuffer.allocateDirect(size);
try {
pool.release(buffer);
Assert.assertNotSame(buffer, pool.borrow(size, zeroed));
} catch (Throwable t) {
if (PlatformDependent.hasUnsafe()) {
if (buffer.isDirect()) {
PlatformDependent.freeDirectBuffer(buffer);
}
}
}
}
@Test
public void shouldNotPoolReadOnlyBuffer() {
final int size = 32;
final ByteBuffer borrow = pool.borrow(size, zeroed);
final ByteBuffer readOnlyBuffer = borrow.asReadOnlyBuffer();
pool.release(readOnlyBuffer);
Assert.assertNotSame(readOnlyBuffer, pool.borrow(size, zeroed));
}
@Test(expected = NullPointerException.class)
public void shouldFailPoolingNullBuffer() {
pool.release(null);
}
@Test(expected = NullPointerException.class)
public void shouldFailPoolingNullBufferIfNotEmpty() {
final int size = 32;
pool.release(pool.borrow(size, zeroed));
pool.release(null);
}
@Test
public void shouldBorrowOnlyThreadLocalBuffers() throws ExecutionException, InterruptedException {
final int size = 32;
final ByteBuffer buffer = pool.borrow(size, zeroed);
pool.release(buffer);
final ExecutorService executor = Executors.newSingleThreadExecutor();
try {
Assert.assertNotSame(buffer, executor.submit(() -> pool.borrow(size, zeroed)).get());
} finally {
executor.shutdown();
}
}
}