This commit is contained in:
Clebert Suconic 2017-06-23 09:37:46 -04:00
commit b640a62417
1 changed files with 53 additions and 46 deletions

View File

@ -27,11 +27,12 @@ import java.util.List;
import java.util.Set; import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import io.netty.buffer.UnpooledUnsafeDirectByteBufWrapper;
import io.netty.util.internal.PlatformDependent; import io.netty.util.internal.PlatformDependent;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
import org.apache.activemq.artemis.api.core.ICoreMessage; import org.apache.activemq.artemis.api.core.ICoreMessage;
import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper;
import org.apache.activemq.artemis.core.io.SequentialFile; import org.apache.activemq.artemis.core.io.SequentialFile;
import org.apache.activemq.artemis.core.io.SequentialFileFactory; import org.apache.activemq.artemis.core.io.SequentialFileFactory;
import org.apache.activemq.artemis.core.io.mapped.MappedSequentialFileFactory; import org.apache.activemq.artemis.core.io.mapped.MappedSequentialFileFactory;
@ -88,6 +89,9 @@ public final class Page implements Comparable<Page> {
private boolean canBeMapped; private boolean canBeMapped;
private final ActiveMQBuffer activeMQBuffer;
private final UnpooledUnsafeDirectByteBufWrapper unsafeByteBufWrapper;
public Page(final SimpleString storeName, public Page(final SimpleString storeName,
final StorageManager storageManager, final StorageManager storageManager,
final SequentialFileFactory factory, final SequentialFileFactory factory,
@ -99,6 +103,9 @@ public final class Page implements Comparable<Page> {
this.storageManager = storageManager; this.storageManager = storageManager;
this.storeName = storeName; this.storeName = storeName;
this.canBeMapped = fileFactory instanceof NIOSequentialFileFactory || fileFactory instanceof MappedSequentialFileFactory; this.canBeMapped = fileFactory instanceof NIOSequentialFileFactory || fileFactory instanceof MappedSequentialFileFactory;
//pooled buffers to avoid allocations on hot paths
this.unsafeByteBufWrapper = new UnpooledUnsafeDirectByteBufWrapper();
this.activeMQBuffer = new ChannelBufferWrapper(this.unsafeByteBufWrapper);
} }
public int getPageId() { public int getPageId() {
@ -134,24 +141,24 @@ public final class Page implements Comparable<Page> {
} }
private void readFromSequentialFile(StorageManager storage, List<PagedMessage> messages) throws Exception { private void readFromSequentialFile(StorageManager storage, List<PagedMessage> messages) throws Exception {
// Using direct buffer, as described on https://jira.jboss.org/browse/HORNETQ-467 final int fileSize = (int) file.size();
ByteBuffer directBuffer = storage.allocateDirectBuffer((int) file.size()); //doesn't need to be a direct buffer: that case is covered using the MMAP read
ActiveMQBuffer fileBuffer = null; final ByteBuffer buffer = this.fileFactory.newBuffer(fileSize);
try { try {
file.position(0); file.position(0);
file.read(directBuffer); file.read(buffer);
buffer.rewind();
directBuffer.rewind(); assert (buffer.limit() == fileSize) : "buffer doesn't contains the whole file";
this.unsafeByteBufWrapper.wrap(buffer, 0, fileSize);
fileBuffer = ActiveMQBuffers.wrappedBuffer(directBuffer); try {
fileBuffer.writerIndex(fileBuffer.capacity()); this.activeMQBuffer.clear();
read(storage, fileBuffer, messages); this.activeMQBuffer.writerIndex(fileSize);
} finally { read(storage, this.activeMQBuffer, messages);
if (fileBuffer != null) { } finally {
fileBuffer.byteBuf().unwrap().release(); this.unsafeByteBufWrapper.reset();
} }
storage.freeDirectBuffer(directBuffer); } finally {
this.fileFactory.releaseBuffer(buffer);
} }
} }
@ -167,12 +174,15 @@ public final class Page implements Comparable<Page> {
private int readFromMapped(StorageManager storage, List<PagedMessage> messages) throws IOException { private int readFromMapped(StorageManager storage, List<PagedMessage> messages) throws IOException {
file.position(0); file.position(0);
//use a readonly mapped view of the file //use a readonly mapped view of the file
final MappedByteBuffer mappedByteBuffer = mapFileForRead(this.file.getJavaFile(), size.get()); final int mappedSize = size.get();
final MappedByteBuffer mappedByteBuffer = mapFileForRead(this.file.getJavaFile(), mappedSize);
this.unsafeByteBufWrapper.wrap(mappedByteBuffer, 0, mappedSize);
try { try {
final ActiveMQBuffer fileBuffer = ActiveMQBuffers.wrappedBuffer(mappedByteBuffer); this.activeMQBuffer.clear();
fileBuffer.writerIndex(fileBuffer.capacity()); this.activeMQBuffer.writerIndex(mappedSize);
return read(storage, fileBuffer, messages); return read(storage, this.activeMQBuffer, messages);
} finally { } finally {
this.unsafeByteBufWrapper.reset();
//unmap the file after read it to avoid GC to take care of it //unmap the file after read it to avoid GC to take care of it
PlatformDependent.freeDirectBuffer(mappedByteBuffer); PlatformDependent.freeDirectBuffer(mappedByteBuffer);
} }
@ -220,35 +230,32 @@ public final class Page implements Comparable<Page> {
public synchronized void write(final PagedMessage message) throws Exception { public synchronized void write(final PagedMessage message) throws Exception {
if (!file.isOpen()) { if (!file.isOpen()) {
return; return;
} }
final int messageEncodedSize = message.getEncodeSize();
ByteBuffer buffer = fileFactory.newBuffer(message.getEncodeSize() + Page.SIZE_RECORD); final int bufferSize = messageEncodedSize + Page.SIZE_RECORD;
final ByteBuffer buffer = fileFactory.newBuffer(bufferSize);
ActiveMQBuffer wrap = ActiveMQBuffers.wrappedBuffer(buffer); this.unsafeByteBufWrapper.wrap(buffer, 0, bufferSize);
wrap.clear(); try {
this.activeMQBuffer.clear();
wrap.writeByte(Page.START_BYTE); this.activeMQBuffer.writeByte(Page.START_BYTE);
wrap.writeInt(0); this.activeMQBuffer.writeInt(messageEncodedSize);
int startIndex = wrap.writerIndex(); message.encode(this.activeMQBuffer);
message.encode(wrap); this.activeMQBuffer.writeByte(Page.END_BYTE);
int endIndex = wrap.writerIndex(); assert (this.activeMQBuffer.readableBytes() == bufferSize) : "messageEncodedSize is different from expected";
wrap.setInt(1, endIndex - startIndex); // The encoded length //buffer limit and position are the same
wrap.writeByte(Page.END_BYTE); assert (buffer.remaining() == bufferSize) : "buffer position or limit are changed";
file.writeDirect(buffer, false);
buffer.rewind(); if (pageCache != null) {
pageCache.addLiveMessage(message);
file.writeDirect(buffer, false); }
//lighter than addAndGet when single writer
if (pageCache != null) { numberOfMessages.lazySet(numberOfMessages.get() + 1);
pageCache.addLiveMessage(message); size.lazySet(size.get() + bufferSize);
storageManager.pageWrite(message, pageId);
} finally {
this.unsafeByteBufWrapper.reset();
} }
numberOfMessages.incrementAndGet();
size.addAndGet(buffer.limit());
storageManager.pageWrite(message, pageId);
} }
public void sync() throws Exception { public void sync() throws Exception {