diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java index c8fbb3d944..03d0e672b9 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java @@ -16,27 +16,20 @@ */ package org.apache.activemq.artemis.core.paging.impl; -import java.io.File; -import java.io.IOException; import java.nio.ByteBuffer; -import java.nio.MappedByteBuffer; -import java.nio.channels.FileChannel; -import java.nio.file.StandardOpenOption; import java.util.ArrayList; import java.util.List; import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; import io.netty.buffer.Unpooled; -import io.netty.util.internal.PlatformDependent; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; +import org.apache.activemq.artemis.api.core.ActiveMQBuffers; import org.apache.activemq.artemis.api.core.ICoreMessage; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper; import org.apache.activemq.artemis.core.io.SequentialFile; import org.apache.activemq.artemis.core.io.SequentialFileFactory; -import org.apache.activemq.artemis.core.io.mapped.MappedSequentialFileFactory; -import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory; import org.apache.activemq.artemis.core.paging.PagedMessage; import org.apache.activemq.artemis.core.paging.cursor.LivePageCache; import org.apache.activemq.artemis.core.paging.cursor.PageSubscriptionCounter; @@ -45,6 +38,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.LargeServerMessage; import org.apache.activemq.artemis.utils.DataConstants; +import org.apache.activemq.artemis.utils.Env; import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet; import org.jboss.logging.Logger; @@ -87,8 +81,6 @@ public final class Page implements Comparable { */ private Set pendingCounters; - private boolean canBeMapped; - public Page(final SimpleString storeName, final StorageManager storageManager, final SequentialFileFactory factory, @@ -99,8 +91,6 @@ public final class Page implements Comparable { fileFactory = factory; this.storageManager = storageManager; this.storeName = storeName; - this.canBeMapped = fileFactory instanceof NIOSequentialFileFactory || fileFactory instanceof MappedSequentialFileFactory; - //pooled buffers to avoid allocations on hot paths } public int getPageId() { @@ -120,105 +110,133 @@ public final class Page implements Comparable { throw ActiveMQMessageBundle.BUNDLE.invalidPageIO(); } - final List messages = new ArrayList<>(); - size.lazySet((int) file.size()); - if (this.canBeMapped) { - readFromMapped(storage, messages); - // if the file is open to be written - // it needs to updated the position - file.position(file.size()); - } else { - readFromSequentialFile(storage, messages); - } + final List messages = readFromSequentialFile(storage); numberOfMessages.lazySet(messages.size()); return messages; } - private void readFromSequentialFile(StorageManager storage, List messages) throws Exception { - final int fileSize = (int) file.size(); - //doesn't need to be a direct buffer: that case is covered using the MMAP read - final ByteBuffer buffer = this.fileFactory.newBuffer(fileSize); - try { - file.position(0); - file.read(buffer); - buffer.rewind(); - assert (buffer.limit() == fileSize) : "buffer doesn't contains the whole file"; - ChannelBufferWrapper activeMQBuffer = wrapBuffer(fileSize, buffer); - read(storage, activeMQBuffer, messages); - } finally { - this.fileFactory.releaseBuffer(buffer); - } + private static void decodeInto(ByteBuffer fileBuffer, int encodedSize, PagedMessageImpl msg) { + final ActiveMQBuffer wrappedBuffer = ActiveMQBuffers.wrappedBuffer(fileBuffer); + wrappedBuffer.writerIndex(encodedSize); + msg.decode(wrappedBuffer); } - private ChannelBufferWrapper wrapBuffer(int fileSize, ByteBuffer buffer) { - ChannelBufferWrapper activeMQBuffer = new ChannelBufferWrapper(Unpooled.wrappedBuffer(buffer)); - return activeMQBuffer; + private ByteBuffer allocateAndReadIntoFileBuffer(ByteBuffer fileBuffer, int requiredBytes) throws Exception { + final ByteBuffer newFileBuffer = fileFactory.newBuffer(Math.max(requiredBytes, MIN_CHUNK_SIZE)); + newFileBuffer.put(fileBuffer); + fileFactory.releaseBuffer(fileBuffer); + fileBuffer = newFileBuffer; + //move the limit to allow reading as much as possible from the file + fileBuffer.limit(fileBuffer.capacity()); + file.read(fileBuffer); + fileBuffer.position(0); + return fileBuffer; } - private static MappedByteBuffer mapFileForRead(File file, int fileSize) { - try (FileChannel channel = FileChannel.open(file.toPath(), StandardOpenOption.CREATE, StandardOpenOption.READ, StandardOpenOption.WRITE)) { - return channel.map(FileChannel.MapMode.READ_ONLY, 0, fileSize); - } catch (Exception e) { - throw new IllegalStateException(e); - } - } - - private int readFromMapped(StorageManager storage, List messages) throws IOException { - file.position(0); - //use a readonly mapped view of the file - final int mappedSize = size.get(); - final MappedByteBuffer mappedByteBuffer = mapFileForRead(this.file.getJavaFile(), mappedSize); - ChannelBufferWrapper activeMQBuffer = wrapBuffer(mappedSize, mappedByteBuffer); - try { - return read(storage, activeMQBuffer, messages); - } finally { - //unmap the file after read it to avoid GC to take care of it - PlatformDependent.freeDirectBuffer(mappedByteBuffer); - } - } - - private int read(StorageManager storage, ActiveMQBuffer fileBuffer, List messages) { - int readMessages = 0; - while (fileBuffer.readable()) { - final int position = fileBuffer.readerIndex(); - - byte byteRead = fileBuffer.readByte(); - - if (byteRead == Page.START_BYTE) { - if (fileBuffer.readerIndex() + DataConstants.SIZE_INT < fileBuffer.capacity()) { - int messageSize = fileBuffer.readInt(); - int oldPos = fileBuffer.readerIndex(); - if (fileBuffer.readerIndex() + messageSize < fileBuffer.capacity() && fileBuffer.getByte(oldPos + messageSize) == Page.END_BYTE) { - PagedMessage msg = new PagedMessageImpl(storageManager); - msg.decode(fileBuffer); - byte b = fileBuffer.readByte(); - if (b != Page.END_BYTE) { - // Sanity Check: This would only happen if there is a bug on decode or any internal code, as - // this - // constraint was already checked - throw new IllegalStateException("Internal error, it wasn't possible to locate END_BYTE " + b); - } - msg.initMessage(storage); - if (logger.isTraceEnabled()) { - logger.trace("Reading message " + msg + " on pageId=" + this.pageId + " for address=" + storeName); - } - readMessages++; - messages.add(msg); - } else { - markFileAsSuspect(file.getFileName(), position, messages.size()); - break; - } + /** + * It returns a {@link ByteBuffer} that has {@link ByteBuffer#remaining()} bytes >= {@code requiredBytes} + * of valid data from {@link #file}. + */ + private ByteBuffer readIntoFileBufferIfNecessary(ByteBuffer fileBuffer, int requiredBytes) throws Exception { + final int remaining = fileBuffer.remaining(); + //fileBuffer::remaining is the current size of valid data + final int bytesToBeRead = requiredBytes - remaining; + if (bytesToBeRead > 0) { + final int capacity = fileBuffer.capacity(); + //fileBuffer has enough overall capacity to hold all the required bytes? + if (capacity >= requiredBytes) { + //we do not care to use the free space between + //fileBuffer::limit and fileBuffer::capacity + //to save compactions, because fileBuffer + //is very unlikely to not be completely full + //after each file::read + if (fileBuffer.limit() > 0) { + //the previous check avoid compact + //to attempt a copy of 0 bytes + fileBuffer.compact(); + } else { + //compact already set the limit == capacity + fileBuffer.limit(capacity); } + file.read(fileBuffer); + fileBuffer.position(0); } else { - markFileAsSuspect(file.getFileName(), position, messages.size()); - break; + fileBuffer = allocateAndReadIntoFileBuffer(fileBuffer, requiredBytes); + } + } + return fileBuffer; + } + + //sizeOf(START_BYTE) + sizeOf(MESSAGE LENGTH) + sizeOf(END_BYTE) + private static final int HEADER_AND_TRAILER_SIZE = DataConstants.SIZE_INT + 2; + private static final int MINIMUM_MSG_PERSISTENT_SIZE = HEADER_AND_TRAILER_SIZE; + private static final int MIN_CHUNK_SIZE = Env.osPageSize(); + + private List readFromSequentialFile(StorageManager storage) throws Exception { + final List messages = new ArrayList<>(); + final int fileSize = (int) file.size(); + file.position(0); + int processedBytes = 0; + ByteBuffer fileBuffer = null; + try { + int remainingBytes = fileSize - processedBytes; + if (remainingBytes >= MINIMUM_MSG_PERSISTENT_SIZE) { + fileBuffer = fileFactory.newBuffer(Math.min(remainingBytes, MIN_CHUNK_SIZE)); + fileBuffer.limit(0); + do { + fileBuffer = readIntoFileBufferIfNecessary(fileBuffer, MINIMUM_MSG_PERSISTENT_SIZE); + final byte startByte = fileBuffer.get(); + if (startByte == Page.START_BYTE) { + final int encodedSize = fileBuffer.getInt(); + final int nextPosition = processedBytes + HEADER_AND_TRAILER_SIZE + encodedSize; + if (nextPosition <= fileSize) { + fileBuffer = readIntoFileBufferIfNecessary(fileBuffer, encodedSize + 1); + final int endPosition = fileBuffer.position() + encodedSize; + //this check must be performed upfront decoding + if (fileBuffer.remaining() >= (encodedSize + 1) && fileBuffer.get(endPosition) == Page.END_BYTE) { + final PagedMessageImpl msg = new PagedMessageImpl(storageManager); + decodeInto(fileBuffer, encodedSize, msg); + fileBuffer.position(endPosition + 1); + assert fileBuffer.get(endPosition) == Page.END_BYTE : "decoding cannot change end byte"; + msg.initMessage(storage); + if (logger.isTraceEnabled()) { + logger.tracef("Reading message %s on pageId=%d for address=%s", msg, pageId, storeName); + } + messages.add(msg); + processedBytes = nextPosition; + } else { + markFileAsSuspect(file.getFileName(), processedBytes, messages.size()); + return messages; + } + } else { + markFileAsSuspect(file.getFileName(), processedBytes, messages.size()); + return messages; + } + } else { + markFileAsSuspect(file.getFileName(), processedBytes, messages.size()); + return messages; + } + remainingBytes = fileSize - processedBytes; + } + while (remainingBytes >= MINIMUM_MSG_PERSISTENT_SIZE); + } + //ignore incomplete messages at the end of the file + if (logger.isTraceEnabled()) { + logger.tracef("%s has %d bytes of unknown data at position = %d", file.getFileName(), remainingBytes, processedBytes); + } + return messages; + } finally { + if (fileBuffer != null) { + fileFactory.releaseBuffer(fileBuffer); + } + if (file.position() != fileSize) { + file.position(fileSize); } } - return readMessages; } public synchronized void write(final PagedMessage message) throws Exception { @@ -228,7 +246,7 @@ public final class Page implements Comparable { final int messageEncodedSize = message.getEncodeSize(); final int bufferSize = messageEncodedSize + Page.SIZE_RECORD; final ByteBuffer buffer = fileFactory.newBuffer(bufferSize); - ChannelBufferWrapper activeMQBuffer = wrapBuffer(bufferSize, buffer); + ChannelBufferWrapper activeMQBuffer = new ChannelBufferWrapper(Unpooled.wrappedBuffer(buffer)); activeMQBuffer.clear(); activeMQBuffer.writeByte(Page.START_BYTE); activeMQBuffer.writeInt(messageEncodedSize); diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java index 155831fe9c..83681776e3 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java @@ -359,7 +359,7 @@ public class FakeSequentialFileFactory implements SequentialFileFactory { throw new IllegalStateException("Is closed"); } - byte[] bytesRead = new byte[bytes.limit()]; + byte[] bytesRead = new byte[Math.min(bytes.remaining(), data.remaining())]; data.get(bytesRead);