ARTEMIS-2317 Reuse file buffer wrapper instances to reduce allocations

Page::read is allocating a new ChannelBufferWrapper on each
paged message read: to reduce the allocation rate, it could be
reused until a new wrapped ByteBuffer is created
This commit is contained in:
Francesco Nigro 2019-04-26 12:12:31 +02:00 committed by Clebert Suconic
parent d213aea96b
commit 79465f7f88
1 changed files with 37 additions and 12 deletions

View File

@ -22,9 +22,8 @@ import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
import org.apache.activemq.artemis.api.core.ICoreMessage;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper;
@ -119,12 +118,6 @@ public final class Page implements Comparable<Page> {
return messages;
}
private static void decodeInto(ByteBuffer fileBuffer, int encodedSize, PagedMessageImpl msg) {
final ActiveMQBuffer wrappedBuffer = ActiveMQBuffers.wrappedBuffer(fileBuffer);
wrappedBuffer.writerIndex(encodedSize);
msg.decode(wrappedBuffer);
}
private ByteBuffer allocateAndReadIntoFileBuffer(ByteBuffer fileBuffer, int requiredBytes) throws Exception {
final ByteBuffer newFileBuffer = fileFactory.newBuffer(Math.max(requiredBytes, MIN_CHUNK_SIZE));
newFileBuffer.put(fileBuffer);
@ -171,6 +164,24 @@ public final class Page implements Comparable<Page> {
return fileBuffer;
}
private static ChannelBufferWrapper wrapWhole(ByteBuffer fileBuffer) {
final int position = fileBuffer.position();
final int limit = fileBuffer.limit();
final int capacity = fileBuffer.capacity();
try {
fileBuffer.clear();
final ByteBuf wrappedBuffer = Unpooled.wrappedBuffer(fileBuffer);
//this check is important to avoid next ByteBuf::setIndex
//to fail due to ByteBuf::capacity == ByteBuffer::remaining bytes
assert wrappedBuffer.readableBytes() == capacity;
final ChannelBufferWrapper fileBufferWrapper = new ChannelBufferWrapper(wrappedBuffer);
return fileBufferWrapper;
} finally {
fileBuffer.position(position);
fileBuffer.limit(limit);
}
}
//sizeOf(START_BYTE) + sizeOf(MESSAGE LENGTH) + sizeOf(END_BYTE)
private static final int HEADER_AND_TRAILER_SIZE = DataConstants.SIZE_INT + 2;
private static final int MINIMUM_MSG_PERSISTENT_SIZE = HEADER_AND_TRAILER_SIZE;
@ -182,24 +193,39 @@ public final class Page implements Comparable<Page> {
file.position(0);
int processedBytes = 0;
ByteBuffer fileBuffer = null;
ChannelBufferWrapper fileBufferWrapper;
try {
int remainingBytes = fileSize - processedBytes;
if (remainingBytes >= MINIMUM_MSG_PERSISTENT_SIZE) {
fileBuffer = fileFactory.newBuffer(Math.min(remainingBytes, MIN_CHUNK_SIZE));
//the wrapper is reused to avoid unnecessary allocations
fileBufferWrapper = wrapWhole(fileBuffer);
//no content is being added yet
fileBuffer.limit(0);
do {
final ByteBuffer oldFileBuffer = fileBuffer;
fileBuffer = readIntoFileBufferIfNecessary(fileBuffer, MINIMUM_MSG_PERSISTENT_SIZE);
//change wrapper if fileBuffer has changed
if (fileBuffer != oldFileBuffer) {
fileBufferWrapper = wrapWhole(fileBuffer);
}
final byte startByte = fileBuffer.get();
if (startByte == Page.START_BYTE) {
final int encodedSize = fileBuffer.getInt();
final int nextPosition = processedBytes + HEADER_AND_TRAILER_SIZE + encodedSize;
if (nextPosition <= fileSize) {
final ByteBuffer currentFileBuffer = fileBuffer;
fileBuffer = readIntoFileBufferIfNecessary(fileBuffer, encodedSize + 1);
//change wrapper if fileBuffer has changed
if (fileBuffer != currentFileBuffer) {
fileBufferWrapper = wrapWhole(fileBuffer);
}
final int endPosition = fileBuffer.position() + encodedSize;
//this check must be performed upfront decoding
if (fileBuffer.remaining() >= (encodedSize + 1) && fileBuffer.get(endPosition) == Page.END_BYTE) {
final PagedMessageImpl msg = new PagedMessageImpl(storageManager);
decodeInto(fileBuffer, encodedSize, msg);
fileBufferWrapper.setIndex(fileBuffer.position(), endPosition);
msg.decode(fileBufferWrapper);
fileBuffer.position(endPosition + 1);
assert fileBuffer.get(endPosition) == Page.END_BYTE : "decoding cannot change end byte";
msg.initMessage(storage);
@ -415,8 +441,7 @@ public final class Page implements Comparable<Page> {
* @param pageSubscriptionCounter
*/
public void addPendingCounter(PageSubscriptionCounter pageSubscriptionCounter) {
Set<PageSubscriptionCounter> counter = getOrCreatePendingCounters();
pendingCounters.add(pageSubscriptionCounter);
getOrCreatePendingCounters().add(pageSubscriptionCounter);
}
private synchronized Set<PageSubscriptionCounter> getPendingCounters() {
@ -430,4 +455,4 @@ public final class Page implements Comparable<Page> {
return pendingCounters;
}
}
}