diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java index 38732929de..077d852546 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java @@ -16,18 +16,26 @@ */ package org.apache.activemq.artemis.core.paging.impl; +import java.io.File; +import java.io.IOException; +import java.io.RandomAccessFile; import java.nio.ByteBuffer; +import java.nio.MappedByteBuffer; +import java.nio.channels.FileChannel; import java.util.ArrayList; import java.util.List; import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; +import io.netty.util.internal.PlatformDependent; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQBuffers; import org.apache.activemq.artemis.api.core.ICoreMessage; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.io.SequentialFile; import org.apache.activemq.artemis.core.io.SequentialFileFactory; +import org.apache.activemq.artemis.core.io.mapped.MappedSequentialFileFactory; +import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory; import org.apache.activemq.artemis.core.paging.PagedMessage; import org.apache.activemq.artemis.core.paging.cursor.LivePageCache; import org.apache.activemq.artemis.core.paging.cursor.PageSubscriptionCounter; @@ -78,6 +86,8 @@ public final class Page implements Comparable { */ private Set pendingCounters; + private boolean canBeMapped; + public Page(final SimpleString storeName, final StorageManager storageManager, final SequentialFileFactory factory, @@ -88,6 +98,7 @@ public final class Page implements Comparable { fileFactory = factory; this.storageManager = storageManager; this.storeName = storeName; + this.canBeMapped = fileFactory instanceof NIOSequentialFileFactory || fileFactory instanceof MappedSequentialFileFactory; } public int getPageId() { @@ -107,9 +118,22 @@ public final class Page implements Comparable { throw ActiveMQMessageBundle.BUNDLE.invalidPageIO(); } - ArrayList messages = new ArrayList<>(); + final List messages = new ArrayList<>(); - size.set((int) file.size()); + size.lazySet((int) file.size()); + + if (this.canBeMapped) { + readFromMapped(storage, messages); + } else { + readFromSequentialFile(storage, messages); + } + + numberOfMessages.lazySet(messages.size()); + + return messages; + } + + private void readFromSequentialFile(StorageManager storage, List messages) throws Exception { // Using direct buffer, as described on https://jira.jboss.org/browse/HORNETQ-467 ByteBuffer directBuffer = storage.allocateDirectBuffer((int) file.size()); ActiveMQBuffer fileBuffer = null; @@ -122,51 +146,76 @@ public final class Page implements Comparable { fileBuffer = ActiveMQBuffers.wrappedBuffer(directBuffer); fileBuffer.writerIndex(fileBuffer.capacity()); - - while (fileBuffer.readable()) { - final int position = fileBuffer.readerIndex(); - - byte byteRead = fileBuffer.readByte(); - - if (byteRead == Page.START_BYTE) { - if (fileBuffer.readerIndex() + DataConstants.SIZE_INT < fileBuffer.capacity()) { - int messageSize = fileBuffer.readInt(); - int oldPos = fileBuffer.readerIndex(); - if (fileBuffer.readerIndex() + messageSize < fileBuffer.capacity() && fileBuffer.getByte(oldPos + messageSize) == Page.END_BYTE) { - PagedMessage msg = new PagedMessageImpl(storageManager); - msg.decode(fileBuffer); - byte b = fileBuffer.readByte(); - if (b != Page.END_BYTE) { - // Sanity Check: This would only happen if there is a bug on decode or any internal code, as - // this - // constraint was already checked - throw new IllegalStateException("Internal error, it wasn't possible to locate END_BYTE " + b); - } - msg.initMessage(storage); - if (logger.isTraceEnabled()) { - logger.trace("Reading message " + msg + " on pageId=" + this.pageId + " for address=" + storeName); - } - messages.add(msg); - } else { - markFileAsSuspect(file.getFileName(), position, messages.size()); - break; - } - } - } else { - markFileAsSuspect(file.getFileName(), position, messages.size()); - break; - } - } + read(storage, fileBuffer, messages); } finally { if (fileBuffer != null) { fileBuffer.byteBuf().unwrap().release(); } storage.freeDirectBuffer(directBuffer); } + } - numberOfMessages.set(messages.size()); + private static MappedByteBuffer mapFileForRead(File file, int fileSize) { + try (RandomAccessFile raf = new RandomAccessFile(file, "rw"); + FileChannel channel = raf.getChannel()) { + return channel.map(FileChannel.MapMode.READ_ONLY, 0, fileSize); + } catch (Exception e) { + throw new IllegalStateException(e); + } + } - return messages; + private int readFromMapped(StorageManager storage, List messages) throws IOException { + file.position(0); + //use a readonly mapped view of the file + final MappedByteBuffer mappedByteBuffer = mapFileForRead(this.file.getJavaFile(), size.get()); + try { + final ActiveMQBuffer fileBuffer = ActiveMQBuffers.wrappedBuffer(mappedByteBuffer); + fileBuffer.writerIndex(fileBuffer.capacity()); + return read(storage, fileBuffer, messages); + } finally { + //unmap the file after read it to avoid GC to take care of it + PlatformDependent.freeDirectBuffer(mappedByteBuffer); + } + } + + private int read(StorageManager storage, ActiveMQBuffer fileBuffer, List messages) { + int readMessages = 0; + while (fileBuffer.readable()) { + final int position = fileBuffer.readerIndex(); + + byte byteRead = fileBuffer.readByte(); + + if (byteRead == Page.START_BYTE) { + if (fileBuffer.readerIndex() + DataConstants.SIZE_INT < fileBuffer.capacity()) { + int messageSize = fileBuffer.readInt(); + int oldPos = fileBuffer.readerIndex(); + if (fileBuffer.readerIndex() + messageSize < fileBuffer.capacity() && fileBuffer.getByte(oldPos + messageSize) == Page.END_BYTE) { + PagedMessage msg = new PagedMessageImpl(storageManager); + msg.decode(fileBuffer); + byte b = fileBuffer.readByte(); + if (b != Page.END_BYTE) { + // Sanity Check: This would only happen if there is a bug on decode or any internal code, as + // this + // constraint was already checked + throw new IllegalStateException("Internal error, it wasn't possible to locate END_BYTE " + b); + } + msg.initMessage(storage); + if (logger.isTraceEnabled()) { + logger.trace("Reading message " + msg + " on pageId=" + this.pageId + " for address=" + storeName); + } + readMessages++; + messages.add(msg); + } else { + markFileAsSuspect(file.getFileName(), position, messages.size()); + break; + } + } + } else { + markFileAsSuspect(file.getFileName(), position, messages.size()); + break; + } + } + return readMessages; } public synchronized void write(final PagedMessage message) throws Exception {