This commit is contained in:
Clebert Suconic 2019-03-12 18:36:01 -04:00
commit 896142b7a4
12 changed files with 693 additions and 143 deletions

View File

@ -17,6 +17,8 @@
package org.apache.activemq.artemis.utils;
import java.nio.ByteBuffer;
import java.nio.ReadOnlyBufferException;
import java.util.Arrays;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@ -342,4 +344,52 @@ public class ByteUtil {
public static int intFromBytes(byte b1, byte b2, byte b3, byte b4) {
return b1 << 24 | (b2 & 0xFF) << 16 | (b3 & 0xFF) << 8 | (b4 & 0xFF);
}
/**
* It zeroes the whole {@link ByteBuffer#capacity()} of the given {@code buffer}.
*
* @throws ReadOnlyBufferException if {@code buffer} is read-only
*/
public static void zeros(final ByteBuffer buffer) {
uncheckedZeros(buffer, 0, buffer.capacity());
}
/**
* It zeroes {@code bytes} of the given {@code buffer}, starting (inclusive) from {@code offset}.
*
* @throws IndexOutOfBoundsException if {@code offset + bytes > }{@link ByteBuffer#capacity()} or {@code offset >= }{@link ByteBuffer#capacity()}
* @throws IllegalArgumentException if {@code offset} or {@code capacity} are less then 0
* @throws ReadOnlyBufferException if {@code buffer} is read-only
*/
public static void zeros(final ByteBuffer buffer, int offset, int bytes) {
if (offset < 0 || bytes < 0) {
throw new IllegalArgumentException();
}
final int capacity = buffer.capacity();
if (offset >= capacity || (offset + bytes) > capacity) {
throw new IndexOutOfBoundsException();
}
uncheckedZeros(buffer, offset, bytes);
}
private static void uncheckedZeros(final ByteBuffer buffer, int offset, int bytes) {
if (buffer.isReadOnly()) {
throw new ReadOnlyBufferException();
}
final byte zero = (byte) 0;
if (buffer.isDirect() && PlatformDependent.hasUnsafe()) {
PlatformDependent.setMemory(PlatformDependent.directBufferAddress(buffer) + offset, bytes, zero);
} else if (buffer.hasArray()) {
//SIMD OPTIMIZATION
final int arrayOffset = buffer.arrayOffset();
final int start = arrayOffset + offset;
Arrays.fill(buffer.array(), start, start + bytes, zero);
} else {
//slow path
for (int i = 0; i < bytes; i++) {
buffer.put(i + offset, zero);
}
}
}
}

View File

@ -14,19 +14,23 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.io.mapped;
package org.apache.activemq.artemis.utils;
import java.nio.ByteBuffer;
/**
* Collection of bit-tricks for power of 2 cases.
*/
public final class PowerOf2Util {
import io.netty.util.internal.PlatformDependent;
final class BytesUtils {
private BytesUtils() {
private PowerOf2Util() {
}
public static long align(final long value, final long alignment) {
return (value + (alignment - 1)) & ~(alignment - 1);
/**
* Fast alignment operation with power of 2 {@code alignment} and {@code value >=0} and {@code value <}{@link Integer#MAX_VALUE}.<br>
* In order to be fast is up to the caller to check arguments correctness.
* Original algorithm is on https://en.wikipedia.org/wiki/Data_structure_alignment.
*/
public static int align(final int value, final int pow2alignment) {
return (value + (pow2alignment - 1)) & ~(pow2alignment - 1);
}
/**
@ -54,20 +58,4 @@ final class BytesUtils {
return (value & (pow2alignment - 1)) == 0;
}
public static void zerosDirect(final ByteBuffer buffer) {
//DANGEROUS!! erases bound-checking using directly addresses -> safe only if it use counted loops
int remaining = buffer.capacity();
long address = PlatformDependent.directBufferAddress(buffer);
while (remaining >= 8) {
PlatformDependent.putLong(address, 0L);
address += 8;
remaining -= 8;
}
while (remaining > 0) {
PlatformDependent.putByte(address, (byte) 0);
address++;
remaining--;
}
}
}

View File

@ -17,6 +17,11 @@
package org.apache.activemq.artemis.utils;
import java.nio.ByteBuffer;
import java.nio.ByteBuffer;
import java.nio.ReadOnlyBufferException;
import java.util.Arrays;
import io.netty.util.internal.PlatformDependent;
import org.junit.Assert;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
@ -87,6 +92,219 @@ public class ByteUtilTest {
}
}
private static byte[] duplicateRemaining(ByteBuffer buffer, int offset, int bytes) {
final int end = offset + bytes;
final int expectedRemaining = buffer.capacity() - end;
//it is handling the case of <0 just to allow from to > capacity
if (expectedRemaining <= 0) {
return null;
}
final byte[] remaining = new byte[expectedRemaining];
final ByteBuffer duplicate = buffer.duplicate();
duplicate.clear().position(end);
duplicate.get(remaining, 0, expectedRemaining);
return remaining;
}
private static byte[] duplicateBefore(ByteBuffer buffer, int offset) {
if (offset <= 0) {
return null;
}
final int size = Math.min(buffer.capacity(), offset);
final byte[] remaining = new byte[size];
final ByteBuffer duplicate = buffer.duplicate();
duplicate.clear();
duplicate.get(remaining, 0, size);
return remaining;
}
private static void shouldZeroesByteBuffer(ByteBuffer buffer, int offset, int bytes) {
final byte[] originalBefore = duplicateBefore(buffer, offset);
final byte[] originalRemaining = duplicateRemaining(buffer, offset, bytes);
final int position = buffer.position();
final int limit = buffer.limit();
ByteUtil.zeros(buffer, offset, bytes);
Assert.assertEquals(position, buffer.position());
Assert.assertEquals(limit, buffer.limit());
final byte[] zeros = new byte[bytes];
final byte[] content = new byte[bytes];
final ByteBuffer duplicate = buffer.duplicate();
duplicate.clear().position(offset);
duplicate.get(content, 0, bytes);
Assert.assertArrayEquals(zeros, content);
if (originalRemaining != null) {
final byte[] remaining = new byte[duplicate.remaining()];
//duplicate position has been moved of bytes
duplicate.get(remaining);
Assert.assertArrayEquals(originalRemaining, remaining);
}
if (originalBefore != null) {
final byte[] before = new byte[offset];
//duplicate position has been moved of bytes: need to reset it
duplicate.position(0);
duplicate.get(before);
Assert.assertArrayEquals(originalBefore, before);
}
}
private ByteBuffer fill(ByteBuffer buffer, int offset, int length, byte value) {
for (int i = 0; i < length; i++) {
buffer.put(offset + i, value);
}
return buffer;
}
@Test
public void shouldZeroesDirectByteBuffer() {
final byte one = (byte) 1;
final int capacity = 64;
final int bytes = 32;
final int offset = 1;
final ByteBuffer buffer = ByteBuffer.allocateDirect(capacity);
try {
fill(buffer, 0, capacity, one);
shouldZeroesByteBuffer(buffer, offset, bytes);
} finally {
if (PlatformDependent.hasUnsafe()) {
PlatformDependent.freeDirectBuffer(buffer);
}
}
}
@Test
public void shouldZeroesLimitedDirectByteBuffer() {
final byte one = (byte) 1;
final int capacity = 64;
final int bytes = 32;
final int offset = 1;
final ByteBuffer buffer = ByteBuffer.allocateDirect(capacity);
try {
fill(buffer, 0, capacity, one);
buffer.limit(0);
shouldZeroesByteBuffer(buffer, offset, bytes);
} finally {
if (PlatformDependent.hasUnsafe()) {
PlatformDependent.freeDirectBuffer(buffer);
}
}
}
@Test
public void shouldZeroesHeapByteBuffer() {
final byte one = (byte) 1;
final int capacity = 64;
final int bytes = 32;
final int offset = 1;
final ByteBuffer buffer = ByteBuffer.allocate(capacity);
fill(buffer, 0, capacity, one);
shouldZeroesByteBuffer(buffer, offset, bytes);
}
@Test
public void shouldZeroesLimitedHeapByteBuffer() {
final byte one = (byte) 1;
final int capacity = 64;
final int bytes = 32;
final int offset = 1;
final ByteBuffer buffer = ByteBuffer.allocate(capacity);
fill(buffer, 0, capacity, one);
buffer.limit(0);
shouldZeroesByteBuffer(buffer, offset, bytes);
}
@Test(expected = ReadOnlyBufferException.class)
public void shouldFailWithReadOnlyHeapByteBuffer() {
final byte one = (byte) 1;
final int capacity = 64;
final int bytes = 32;
final int offset = 1;
ByteBuffer buffer = ByteBuffer.allocate(capacity);
fill(buffer, 0, capacity, one);
buffer = buffer.asReadOnlyBuffer();
shouldZeroesByteBuffer(buffer, offset, bytes);
}
@Test(expected = IndexOutOfBoundsException.class)
public void shouldFailIfOffsetIsGreaterOrEqualHeapByteBufferCapacity() {
final byte one = (byte) 1;
final int capacity = 64;
final int bytes = 0;
final int offset = 64;
ByteBuffer buffer = ByteBuffer.allocate(capacity);
fill(buffer, 0, capacity, one);
try {
shouldZeroesByteBuffer(buffer, offset, bytes);
} catch (IndexOutOfBoundsException expectedEx) {
//verify that the buffer hasn't changed
final byte[] originalContent = duplicateRemaining(buffer, 0, 0);
final byte[] expectedContent = new byte[capacity];
Arrays.fill(expectedContent, one);
Assert.assertArrayEquals(expectedContent, originalContent);
throw expectedEx;
}
}
@Test(expected = IllegalArgumentException.class)
public void shouldFailIfOffsetIsNegative() {
final byte one = (byte) 1;
final int capacity = 64;
final int bytes = 1;
final int offset = -1;
ByteBuffer buffer = ByteBuffer.allocate(capacity);
fill(buffer, 0, capacity, one);
try {
shouldZeroesByteBuffer(buffer, offset, bytes);
} catch (IndexOutOfBoundsException expectedEx) {
//verify that the buffer hasn't changed
final byte[] originalContent = duplicateRemaining(buffer, 0, 0);
final byte[] expectedContent = new byte[capacity];
Arrays.fill(expectedContent, one);
Assert.assertArrayEquals(expectedContent, originalContent);
throw expectedEx;
}
}
@Test(expected = IllegalArgumentException.class)
public void shouldFailIfBytesIsNegative() {
final byte one = (byte) 1;
final int capacity = 64;
final int bytes = -1;
final int offset = 0;
ByteBuffer buffer = ByteBuffer.allocate(capacity);
fill(buffer, 0, capacity, one);
try {
shouldZeroesByteBuffer(buffer, offset, bytes);
} catch (IndexOutOfBoundsException expectedEx) {
//verify that the buffer hasn't changed
final byte[] originalContent = duplicateRemaining(buffer, 0, 0);
final byte[] expectedContent = new byte[capacity];
Arrays.fill(expectedContent, one);
Assert.assertArrayEquals(expectedContent, originalContent);
throw expectedEx;
}
}
@Test(expected = IndexOutOfBoundsException.class)
public void shouldFailIfExceedingHeapByteBufferCapacity() {
final byte one = (byte) 1;
final int capacity = 64;
final int bytes = 65;
final int offset = 1;
ByteBuffer buffer = ByteBuffer.allocate(capacity);
fill(buffer, 0, capacity, one);
try {
shouldZeroesByteBuffer(buffer, offset, bytes);
} catch (IndexOutOfBoundsException expectedEx) {
//verify that the buffer hasn't changed
final byte[] originalContent = duplicateRemaining(buffer, 0, 0);
final byte[] expectedContent = new byte[capacity];
Arrays.fill(expectedContent, one);
Assert.assertArrayEquals(expectedContent, originalContent);
throw expectedEx;
}
}
@Test
public void testIntToByte() {

View File

@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.utils;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;
import org.junit.Test;
import static org.apache.activemq.artemis.utils.PowerOf2Util.align;
public class PowerOf2UtilTest {
@Test
public void shouldAlignToNextMultipleOfAlignment() {
final int alignment = 512;
assertThat(align(0, alignment), is(0));
assertThat(align(1, alignment), is(alignment));
assertThat(align(alignment, alignment), is(alignment));
assertThat(align(alignment + 1, alignment), is(alignment * 2));
final int remainder = Integer.MAX_VALUE % alignment;
final int alignedMax = Integer.MAX_VALUE - remainder;
assertThat(align(alignedMax, alignment), is(alignedMax));
//given that Integer.MAX_VALUE is the max value that can be represented with int
//the aligned value would be > 2^32, but (int)(2^32) = Integer.MIN_VALUE due to the sign bit
assertThat(align(Integer.MAX_VALUE, alignment), is(Integer.MIN_VALUE));
}
}

View File

@ -85,11 +85,7 @@ public class AIOSequentialFile extends AbstractSequentialFile {
@Override
public int calculateBlockStart(final int position) {
int alignment = factory.getAlignment();
int pos = (position / alignment + (position % alignment != 0 ? 1 : 0)) * alignment;
return pos;
return factory.calculateBlockSize(position);
}
@Override

View File

@ -35,6 +35,7 @@ import org.apache.activemq.artemis.nativo.jlibaio.LibaioFile;
import org.apache.activemq.artemis.nativo.jlibaio.SubmitInfo;
import org.apache.activemq.artemis.nativo.jlibaio.util.CallbackCache;
import org.apache.activemq.artemis.journal.ActiveMQJournalLogger;
import org.apache.activemq.artemis.utils.PowerOf2Util;
import org.apache.activemq.artemis.utils.critical.CriticalAnalyzer;
import org.jboss.logging.Logger;
@ -163,13 +164,10 @@ public final class AIOSequentialFileFactory extends AbstractSequentialFileFactor
@Override
public ByteBuffer allocateDirectBuffer(final int size) {
int blocks = size / getAlignment();
if (size % getAlignment() != 0) {
blocks++;
}
final int alignedSize = calculateBlockSize(size);
// The buffer on AIO has to be a multiple of getAlignment()
ByteBuffer buffer = LibaioContext.newAlignedBuffer(blocks * getAlignment(), getAlignment());
ByteBuffer buffer = LibaioContext.newAlignedBuffer(alignedSize, getAlignment());
buffer.limit(size);
@ -183,11 +181,8 @@ public final class AIOSequentialFileFactory extends AbstractSequentialFileFactor
@Override
public ByteBuffer newBuffer(int size) {
if (size % getAlignment() != 0) {
size = (size / getAlignment() + 1) * getAlignment();
}
return buffersControl.newBuffer(size);
final int alignedSize = calculateBlockSize(size);
return buffersControl.newBuffer(alignedSize, true);
}
@Override
@ -199,22 +194,26 @@ public final class AIOSequentialFileFactory extends AbstractSequentialFileFactor
@Override
public int getAlignment() {
if (alignment < 0) {
alignment = calculateAlignment(journalDir);
}
return alignment;
}
File checkFile = null;
try {
journalDir.mkdirs();
checkFile = File.createTempFile("journalCheck", ".tmp", journalDir);
checkFile.mkdirs();
checkFile.createNewFile();
alignment = LibaioContext.getBlockSize(checkFile);
} catch (Throwable e) {
logger.warn(e.getMessage(), e);
alignment = 512;
} finally {
if (checkFile != null) {
checkFile.delete();
}
private static int calculateAlignment(File journalDir) {
File checkFile = null;
int alignment;
try {
journalDir.mkdirs();
checkFile = File.createTempFile("journalCheck", ".tmp", journalDir);
checkFile.mkdirs();
checkFile.createNewFile();
alignment = LibaioContext.getBlockSize(checkFile);
} catch (Throwable e) {
logger.warn(e.getMessage(), e);
alignment = 512;
} finally {
if (checkFile != null) {
checkFile.delete();
}
}
return alignment;
@ -230,11 +229,19 @@ public final class AIOSequentialFileFactory extends AbstractSequentialFileFactor
@Override
public int calculateBlockSize(final int position) {
int alignment = getAlignment();
final int alignment = getAlignment();
if (!PowerOf2Util.isPowOf2(alignment)) {
return align(position, alignment);
} else {
return PowerOf2Util.align(position, alignment);
}
}
int pos = (position / alignment + (position % alignment != 0 ? 1 : 0)) * alignment;
return pos;
/**
* It can be used to align {@code size} if alignment is not a power of 2: otherwise better to use {@link PowerOf2Util#align(int, int)} instead.
*/
private static int align(int size, int alignment) {
return (size / alignment + (size % alignment != 0 ? 1 : 0)) * alignment;
}
/* (non-Javadoc)
@ -442,7 +449,7 @@ public final class AIOSequentialFileFactory extends AbstractSequentialFileFactor
return alignedBufferSize;
}
public ByteBuffer newBuffer(final int size) {
public ByteBuffer newBuffer(final int size, final boolean zeroed) {
// if a new buffer wasn't requested in 10 seconds, we clear the queue
// This is being done this way as we don't need another Timeout Thread
// just to cleanup this
@ -481,7 +488,11 @@ public final class AIOSequentialFileFactory extends AbstractSequentialFileFactor
buffer.limit(calculateBlockSize(size));
} else {
clearBuffer(buffer);
if (zeroed) {
clearBuffer(buffer);
} else {
buffer.position(0);
}
// set the limit of the buffer to the bufferSize being required
buffer.limit(calculateBlockSize(size));

View File

@ -28,6 +28,7 @@ import io.netty.buffer.Unpooled;
import io.netty.util.internal.PlatformDependent;
import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper;
import org.apache.activemq.artemis.core.journal.EncodingSupport;
import org.apache.activemq.artemis.utils.PowerOf2Util;
import org.apache.activemq.artemis.utils.Env;
final class MappedFile implements AutoCloseable {
@ -196,7 +197,7 @@ final class MappedFile implements AutoCloseable {
}
//any that will enter has lastZeroed OS page aligned
while (toZeros >= OS_PAGE_SIZE) {
assert BytesUtils.isAligned(lastZeroed, OS_PAGE_SIZE);/**/
assert PowerOf2Util.isAligned(lastZeroed, OS_PAGE_SIZE);/**/
final long startPage = lastZeroed - OS_PAGE_SIZE;
PlatformDependent.setMemory(startPage, OS_PAGE_SIZE, (byte) 0);
lastZeroed = startPage;

View File

@ -18,20 +18,21 @@ package org.apache.activemq.artemis.core.io.mapped;
import java.io.File;
import java.nio.ByteBuffer;
import java.util.Arrays;
import io.netty.util.internal.PlatformDependent;
import org.apache.activemq.artemis.core.io.AbstractSequentialFileFactory;
import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
import org.apache.activemq.artemis.core.io.SequentialFile;
import org.apache.activemq.artemis.core.io.util.ByteBufferPool;
import org.apache.activemq.artemis.utils.PowerOf2Util;
import org.apache.activemq.artemis.utils.ByteUtil;
import org.apache.activemq.artemis.utils.Env;
public final class MappedSequentialFileFactory extends AbstractSequentialFileFactory {
private int capacity;
private boolean bufferPooling;
//pools only the biggest one -> optimized for the common case
private final ThreadLocal<ByteBuffer> bytesPool;
private final ByteBufferPool bytesPool;
public MappedSequentialFileFactory(File directory,
int capacity,
@ -47,7 +48,7 @@ public final class MappedSequentialFileFactory extends AbstractSequentialFileFac
this.capacity = capacity;
this.setDatasync(true);
this.bufferPooling = true;
this.bytesPool = new ThreadLocal<>();
this.bytesPool = ByteBufferPool.threadLocal(true);
}
public MappedSequentialFileFactory capacity(int capacity) {
@ -76,7 +77,7 @@ public final class MappedSequentialFileFactory extends AbstractSequentialFileFac
@Override
public ByteBuffer allocateDirectBuffer(final int size) {
final int requiredCapacity = (int) BytesUtils.align(size, Env.osPageSize());
final int requiredCapacity = PowerOf2Util.align(size, Env.osPageSize());
final ByteBuffer byteBuffer = ByteBuffer.allocateDirect(requiredCapacity);
byteBuffer.limit(size);
return byteBuffer;
@ -98,43 +99,18 @@ public final class MappedSequentialFileFactory extends AbstractSequentialFileFac
}
@Override
public ByteBuffer newBuffer(final int size) {
public ByteBuffer newBuffer(int size) {
if (!this.bufferPooling) {
return allocateDirectBuffer(size);
} else {
final int requiredCapacity = (int) BytesUtils.align(size, Env.osPageSize());
ByteBuffer byteBuffer = bytesPool.get();
if (byteBuffer == null || requiredCapacity > byteBuffer.capacity()) {
//do not free the old one (if any) until the new one will be released into the pool!
byteBuffer = ByteBuffer.allocateDirect(requiredCapacity);
} else {
bytesPool.set(null);
PlatformDependent.setMemory(PlatformDependent.directBufferAddress(byteBuffer), size, (byte) 0);
byteBuffer.clear();
}
byteBuffer.limit(size);
return byteBuffer;
return bytesPool.borrow(size, true);
}
}
@Override
public void releaseBuffer(ByteBuffer buffer) {
if (this.bufferPooling) {
if (buffer.isDirect()) {
final ByteBuffer byteBuffer = bytesPool.get();
if (byteBuffer != buffer) {
//replace with the current pooled only if greater or null
if (byteBuffer == null || buffer.capacity() > byteBuffer.capacity()) {
if (byteBuffer != null) {
//free the smaller one
PlatformDependent.freeDirectBuffer(byteBuffer);
}
bytesPool.set(buffer);
} else {
PlatformDependent.freeDirectBuffer(buffer);
}
}
}
bytesPool.release(buffer);
}
}
@ -168,18 +144,7 @@ public final class MappedSequentialFileFactory extends AbstractSequentialFileFac
@Override
public void clearBuffer(final ByteBuffer buffer) {
if (buffer.isDirect()) {
BytesUtils.zerosDirect(buffer);
} else if (buffer.hasArray()) {
final byte[] array = buffer.array();
//SIMD OPTIMIZATION
Arrays.fill(array, (byte) 0);
} else {
final int capacity = buffer.capacity();
for (int i = 0; i < capacity; i++) {
buffer.put(i, (byte) 0);
}
}
ByteUtil.zeros(buffer);
buffer.rewind();
}

View File

@ -26,6 +26,8 @@ import org.apache.activemq.artemis.ArtemisConstants;
import org.apache.activemq.artemis.core.io.AbstractSequentialFileFactory;
import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
import org.apache.activemq.artemis.core.io.SequentialFile;
import org.apache.activemq.artemis.core.io.util.ByteBufferPool;
import org.apache.activemq.artemis.utils.PowerOf2Util;
import org.apache.activemq.artemis.utils.Env;
import org.apache.activemq.artemis.utils.critical.CriticalAnalyzer;
@ -35,8 +37,7 @@ public class NIOSequentialFileFactory extends AbstractSequentialFileFactory {
private boolean bufferPooling;
//pools only the biggest one -> optimized for the common case
private final ThreadLocal<ByteBuffer> bytesPool;
private final ByteBufferPool bytesPool;
public NIOSequentialFileFactory(final File journalDir, final int maxIO) {
this(journalDir, null, maxIO);
@ -76,7 +77,7 @@ public class NIOSequentialFileFactory extends AbstractSequentialFileFactory {
final CriticalAnalyzer analyzer) {
super(journalDir, buffered, bufferSize, bufferTimeout, maxIO, logRates, listener, analyzer);
this.bufferPooling = true;
this.bytesPool = new ThreadLocal<>();
this.bytesPool = ByteBufferPool.threadLocal(true);
}
public static ByteBuffer allocateDirectByteBuffer(final int size) {
@ -123,13 +124,9 @@ public class NIOSequentialFileFactory extends AbstractSequentialFileFactory {
return timedBuffer != null;
}
private static int align(final int value, final int pow2alignment) {
return (value + (pow2alignment - 1)) & ~(pow2alignment - 1);
}
@Override
public ByteBuffer allocateDirectBuffer(final int size) {
final int requiredCapacity = align(size, DEFAULT_CAPACITY_ALIGNMENT);
final int requiredCapacity = PowerOf2Util.align(size, DEFAULT_CAPACITY_ALIGNMENT);
final ByteBuffer byteBuffer = ByteBuffer.allocateDirect(requiredCapacity);
byteBuffer.limit(size);
return byteBuffer;
@ -141,43 +138,18 @@ public class NIOSequentialFileFactory extends AbstractSequentialFileFactory {
}
@Override
public ByteBuffer newBuffer(final int size) {
public ByteBuffer newBuffer(int size) {
if (!this.bufferPooling) {
return allocateDirectBuffer(size);
} else {
final int requiredCapacity = align(size, DEFAULT_CAPACITY_ALIGNMENT);
ByteBuffer byteBuffer = bytesPool.get();
if (byteBuffer == null || requiredCapacity > byteBuffer.capacity()) {
//do not free the old one (if any) until the new one will be released into the pool!
byteBuffer = ByteBuffer.allocateDirect(requiredCapacity);
} else {
bytesPool.set(null);
PlatformDependent.setMemory(PlatformDependent.directBufferAddress(byteBuffer), size, (byte) 0);
byteBuffer.clear();
}
byteBuffer.limit(size);
return byteBuffer;
return bytesPool.borrow(size, true);
}
}
@Override
public void releaseBuffer(ByteBuffer buffer) {
if (this.bufferPooling) {
if (buffer.isDirect()) {
final ByteBuffer byteBuffer = bytesPool.get();
if (byteBuffer != buffer) {
//replace with the current pooled only if greater or null
if (byteBuffer == null || buffer.capacity() > byteBuffer.capacity()) {
if (byteBuffer != null) {
//free the smaller one
PlatformDependent.freeDirectBuffer(byteBuffer);
}
bytesPool.set(buffer);
} else {
PlatformDependent.freeDirectBuffer(buffer);
}
}
}
bytesPool.release(buffer);
}
}

View File

@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.io.util;
import java.nio.ByteBuffer;
/**
* Object Pool that allows to borrow and release {@link ByteBuffer}s according to a specific type (direct/heap).<br>
* The suggested usage pattern is:
* <pre>{@code
* ByteBuffer buffer = pool.borrow(size);
* //...using buffer...
* pool.release(buffer);
* }</pre>
*/
public interface ByteBufferPool {
/**
* It returns a {@link ByteBuffer} with {@link ByteBuffer#capacity()}>={@code size}.<br>
* The {@code buffer} is zeroed until {@code size} if {@code zeroed=true}, with {@link ByteBuffer#position()}=0 and {@link ByteBuffer#limit()}={@code size}.
*/
ByteBuffer borrow(int size, boolean zeroed);
/**
* It pools or free {@code buffer} that cannot be used anymore.<br>
* If {@code buffer} is of a type different from the one that the pool can borrow, it will ignore it.
*/
void release(ByteBuffer buffer);
/**
* Factory method that creates a thread-local pool of capacity 1 of {@link ByteBuffer}s of the specified type (direct/heap).
*/
static ByteBufferPool threadLocal(boolean direct) {
return new ThreadLocalByteBufferPool(direct);
}
}

View File

@ -0,0 +1,80 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.io.util;
import java.nio.ByteBuffer;
import java.util.Objects;
import io.netty.util.internal.PlatformDependent;
import org.apache.activemq.artemis.utils.PowerOf2Util;
import org.apache.activemq.artemis.utils.ByteUtil;
import org.apache.activemq.artemis.utils.Env;
final class ThreadLocalByteBufferPool implements ByteBufferPool {
private final ThreadLocal<ByteBuffer> bytesPool;
private final boolean direct;
ThreadLocalByteBufferPool(boolean direct) {
this.bytesPool = new ThreadLocal<>();
this.direct = direct;
}
@Override
public ByteBuffer borrow(final int size, boolean zeroed) {
final int requiredCapacity = PowerOf2Util.align(size, Env.osPageSize());
ByteBuffer byteBuffer = bytesPool.get();
if (byteBuffer == null || requiredCapacity > byteBuffer.capacity()) {
//do not free the old one (if any) until the new one will be released into the pool!
byteBuffer = direct ? ByteBuffer.allocateDirect(requiredCapacity) : ByteBuffer.allocate(requiredCapacity);
} else {
bytesPool.set(null);
if (zeroed) {
ByteUtil.zeros(byteBuffer, 0, size);
}
byteBuffer.clear();
}
byteBuffer.limit(size);
return byteBuffer;
}
@Override
public void release(ByteBuffer buffer) {
Objects.requireNonNull(buffer);
boolean directBuffer = buffer.isDirect();
if (directBuffer == direct && !buffer.isReadOnly()) {
final ByteBuffer byteBuffer = bytesPool.get();
if (byteBuffer != buffer) {
//replace with the current pooled only if greater or null
if (byteBuffer == null || buffer.capacity() > byteBuffer.capacity()) {
if (byteBuffer != null) {
//free the smaller one
if (directBuffer) {
PlatformDependent.freeDirectBuffer(byteBuffer);
}
}
bytesPool.set(buffer);
} else {
if (directBuffer) {
PlatformDependent.freeDirectBuffer(buffer);
}
}
}
}
}
}

View File

@ -0,0 +1,174 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.io.util;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import io.netty.util.internal.PlatformDependent;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@RunWith(Parameterized.class)
public class ThreadLocalByteBufferPoolTest {
//testing using heap buffers to avoid killing the test suite
private static final boolean isDirect = false;
private final ByteBufferPool pool = ByteBufferPool.threadLocal(isDirect);
private final boolean zeroed;
public ThreadLocalByteBufferPoolTest(boolean zeroed) {
this.zeroed = zeroed;
}
@Parameterized.Parameters(name = "zeroed={0}")
public static Collection<Object[]> getParams() {
return Arrays.asList(new Object[][]{{Boolean.TRUE}, {Boolean.FALSE}});
}
private static void assertZeroed(ByteBuffer buffer) {
ByteBuffer bb = buffer.slice();
final byte[] content = new byte[bb.remaining()];
bb.get(content);
final byte[] zeroed = new byte[content.length];
Arrays.fill(zeroed, (byte) 0);
Assert.assertArrayEquals(zeroed, content);
}
@Test
public void shouldBorrowOnlyBuffersOfTheCorrectType() {
Assert.assertEquals(isDirect, pool.borrow(0, zeroed).isDirect());
}
@Test
public void shouldBorrowZeroedBuffer() {
final int size = 32;
final ByteBuffer buffer = pool.borrow(size, zeroed);
Assert.assertEquals(0, buffer.position());
Assert.assertEquals(size, buffer.limit());
if (zeroed) {
assertZeroed(buffer);
}
}
@Test
public void shouldBorrowTheSameBuffer() {
final int size = 32;
final ByteBuffer buffer = pool.borrow(size, zeroed);
buffer.put(0, (byte) 1);
buffer.position(1);
buffer.limit(2);
pool.release(buffer);
final int newSize = size - 1;
final ByteBuffer sameBuffer = pool.borrow(newSize, zeroed);
Assert.assertSame(buffer, sameBuffer);
Assert.assertEquals(0, sameBuffer.position());
Assert.assertEquals(newSize, sameBuffer.limit());
if (zeroed) {
assertZeroed(sameBuffer);
}
}
@Test
public void shouldBorrowNewBufferIfExceedPooledCapacity() {
final int size = 32;
final ByteBuffer buffer = pool.borrow(size, zeroed);
pool.release(buffer);
final int newSize = buffer.capacity() + 1;
final ByteBuffer differentBuffer = pool.borrow(newSize, zeroed);
Assert.assertNotSame(buffer, differentBuffer);
}
@Test
public void shouldPoolTheBiggestBuffer() {
final int size = 32;
final ByteBuffer small = pool.borrow(size, zeroed);
final ByteBuffer big = pool.borrow(small.capacity() + 1, zeroed);
pool.release(small);
big.limit(0);
pool.release(big);
Assert.assertSame(big, pool.borrow(big.capacity(), zeroed));
}
@Test
public void shouldNotPoolTheSmallestBuffer() {
final int size = 32;
final ByteBuffer small = pool.borrow(size, zeroed);
final ByteBuffer big = pool.borrow(small.capacity() + 1, zeroed);
big.limit(0);
pool.release(big);
pool.release(small);
Assert.assertSame(big, pool.borrow(big.capacity(), zeroed));
}
@Test
public void shouldNotPoolBufferOfDifferentType() {
final int size = 32;
final ByteBuffer buffer = isDirect ? ByteBuffer.allocate(size) : ByteBuffer.allocateDirect(size);
try {
pool.release(buffer);
Assert.assertNotSame(buffer, pool.borrow(size, zeroed));
} catch (Throwable t) {
if (PlatformDependent.hasUnsafe()) {
if (buffer.isDirect()) {
PlatformDependent.freeDirectBuffer(buffer);
}
}
}
}
@Test
public void shouldNotPoolReadOnlyBuffer() {
final int size = 32;
final ByteBuffer borrow = pool.borrow(size, zeroed);
final ByteBuffer readOnlyBuffer = borrow.asReadOnlyBuffer();
pool.release(readOnlyBuffer);
Assert.assertNotSame(readOnlyBuffer, pool.borrow(size, zeroed));
}
@Test(expected = NullPointerException.class)
public void shouldFailPoolingNullBuffer() {
pool.release(null);
}
@Test(expected = NullPointerException.class)
public void shouldFailPoolingNullBufferIfNotEmpty() {
final int size = 32;
pool.release(pool.borrow(size, zeroed));
pool.release(null);
}
@Test
public void shouldBorrowOnlyThreadLocalBuffers() throws ExecutionException, InterruptedException {
final int size = 32;
final ByteBuffer buffer = pool.borrow(size, zeroed);
pool.release(buffer);
final ExecutorService executor = Executors.newSingleThreadExecutor();
try {
Assert.assertNotSame(buffer, executor.submit(() -> pool.borrow(size, zeroed)).get());
} finally {
executor.shutdown();
}
}
}