ARTEMIS-2317 Avoid long TTSP caused by Page::read using mmap read

It implements Page::read using chunked reading of SequentialFile
instead of using memory mapped files causing long time to safepoint
issues.
This commit is contained in:
Francesco Nigro 2019-04-17 19:56:36 +02:00 committed by Clebert Suconic
parent b66adc2d8a
commit 881143252c
2 changed files with 115 additions and 97 deletions

View File

@ -16,27 +16,20 @@
*/
package org.apache.activemq.artemis.core.paging.impl;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import io.netty.buffer.Unpooled;
import io.netty.util.internal.PlatformDependent;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
import org.apache.activemq.artemis.api.core.ICoreMessage;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper;
import org.apache.activemq.artemis.core.io.SequentialFile;
import org.apache.activemq.artemis.core.io.SequentialFileFactory;
import org.apache.activemq.artemis.core.io.mapped.MappedSequentialFileFactory;
import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory;
import org.apache.activemq.artemis.core.paging.PagedMessage;
import org.apache.activemq.artemis.core.paging.cursor.LivePageCache;
import org.apache.activemq.artemis.core.paging.cursor.PageSubscriptionCounter;
@ -45,6 +38,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.LargeServerMessage;
import org.apache.activemq.artemis.utils.DataConstants;
import org.apache.activemq.artemis.utils.Env;
import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet;
import org.jboss.logging.Logger;
@ -87,8 +81,6 @@ public final class Page implements Comparable<Page> {
*/
private Set<PageSubscriptionCounter> pendingCounters;
private boolean canBeMapped;
public Page(final SimpleString storeName,
final StorageManager storageManager,
final SequentialFileFactory factory,
@ -99,8 +91,6 @@ public final class Page implements Comparable<Page> {
fileFactory = factory;
this.storageManager = storageManager;
this.storeName = storeName;
this.canBeMapped = fileFactory instanceof NIOSequentialFileFactory || fileFactory instanceof MappedSequentialFileFactory;
//pooled buffers to avoid allocations on hot paths
}
public int getPageId() {
@ -120,105 +110,133 @@ public final class Page implements Comparable<Page> {
throw ActiveMQMessageBundle.BUNDLE.invalidPageIO();
}
final List<PagedMessage> messages = new ArrayList<>();
size.lazySet((int) file.size());
if (this.canBeMapped) {
readFromMapped(storage, messages);
// if the file is open to be written
// it needs to updated the position
file.position(file.size());
} else {
readFromSequentialFile(storage, messages);
}
final List<PagedMessage> messages = readFromSequentialFile(storage);
numberOfMessages.lazySet(messages.size());
return messages;
}
private void readFromSequentialFile(StorageManager storage, List<PagedMessage> messages) throws Exception {
final int fileSize = (int) file.size();
//doesn't need to be a direct buffer: that case is covered using the MMAP read
final ByteBuffer buffer = this.fileFactory.newBuffer(fileSize);
try {
file.position(0);
file.read(buffer);
buffer.rewind();
assert (buffer.limit() == fileSize) : "buffer doesn't contains the whole file";
ChannelBufferWrapper activeMQBuffer = wrapBuffer(fileSize, buffer);
read(storage, activeMQBuffer, messages);
} finally {
this.fileFactory.releaseBuffer(buffer);
}
private static void decodeInto(ByteBuffer fileBuffer, int encodedSize, PagedMessageImpl msg) {
final ActiveMQBuffer wrappedBuffer = ActiveMQBuffers.wrappedBuffer(fileBuffer);
wrappedBuffer.writerIndex(encodedSize);
msg.decode(wrappedBuffer);
}
private ChannelBufferWrapper wrapBuffer(int fileSize, ByteBuffer buffer) {
ChannelBufferWrapper activeMQBuffer = new ChannelBufferWrapper(Unpooled.wrappedBuffer(buffer));
return activeMQBuffer;
private ByteBuffer allocateAndReadIntoFileBuffer(ByteBuffer fileBuffer, int requiredBytes) throws Exception {
final ByteBuffer newFileBuffer = fileFactory.newBuffer(Math.max(requiredBytes, MIN_CHUNK_SIZE));
newFileBuffer.put(fileBuffer);
fileFactory.releaseBuffer(fileBuffer);
fileBuffer = newFileBuffer;
//move the limit to allow reading as much as possible from the file
fileBuffer.limit(fileBuffer.capacity());
file.read(fileBuffer);
fileBuffer.position(0);
return fileBuffer;
}
private static MappedByteBuffer mapFileForRead(File file, int fileSize) {
try (FileChannel channel = FileChannel.open(file.toPath(), StandardOpenOption.CREATE, StandardOpenOption.READ, StandardOpenOption.WRITE)) {
return channel.map(FileChannel.MapMode.READ_ONLY, 0, fileSize);
} catch (Exception e) {
throw new IllegalStateException(e);
}
}
private int readFromMapped(StorageManager storage, List<PagedMessage> messages) throws IOException {
file.position(0);
//use a readonly mapped view of the file
final int mappedSize = size.get();
final MappedByteBuffer mappedByteBuffer = mapFileForRead(this.file.getJavaFile(), mappedSize);
ChannelBufferWrapper activeMQBuffer = wrapBuffer(mappedSize, mappedByteBuffer);
try {
return read(storage, activeMQBuffer, messages);
} finally {
//unmap the file after read it to avoid GC to take care of it
PlatformDependent.freeDirectBuffer(mappedByteBuffer);
}
}
private int read(StorageManager storage, ActiveMQBuffer fileBuffer, List<PagedMessage> messages) {
int readMessages = 0;
while (fileBuffer.readable()) {
final int position = fileBuffer.readerIndex();
byte byteRead = fileBuffer.readByte();
if (byteRead == Page.START_BYTE) {
if (fileBuffer.readerIndex() + DataConstants.SIZE_INT < fileBuffer.capacity()) {
int messageSize = fileBuffer.readInt();
int oldPos = fileBuffer.readerIndex();
if (fileBuffer.readerIndex() + messageSize < fileBuffer.capacity() && fileBuffer.getByte(oldPos + messageSize) == Page.END_BYTE) {
PagedMessage msg = new PagedMessageImpl(storageManager);
msg.decode(fileBuffer);
byte b = fileBuffer.readByte();
if (b != Page.END_BYTE) {
// Sanity Check: This would only happen if there is a bug on decode or any internal code, as
// this
// constraint was already checked
throw new IllegalStateException("Internal error, it wasn't possible to locate END_BYTE " + b);
}
msg.initMessage(storage);
if (logger.isTraceEnabled()) {
logger.trace("Reading message " + msg + " on pageId=" + this.pageId + " for address=" + storeName);
}
readMessages++;
messages.add(msg);
} else {
markFileAsSuspect(file.getFileName(), position, messages.size());
break;
}
/**
* It returns a {@link ByteBuffer} that has {@link ByteBuffer#remaining()} bytes >= {@code requiredBytes}
* of valid data from {@link #file}.
*/
private ByteBuffer readIntoFileBufferIfNecessary(ByteBuffer fileBuffer, int requiredBytes) throws Exception {
final int remaining = fileBuffer.remaining();
//fileBuffer::remaining is the current size of valid data
final int bytesToBeRead = requiredBytes - remaining;
if (bytesToBeRead > 0) {
final int capacity = fileBuffer.capacity();
//fileBuffer has enough overall capacity to hold all the required bytes?
if (capacity >= requiredBytes) {
//we do not care to use the free space between
//fileBuffer::limit and fileBuffer::capacity
//to save compactions, because fileBuffer
//is very unlikely to not be completely full
//after each file::read
if (fileBuffer.limit() > 0) {
//the previous check avoid compact
//to attempt a copy of 0 bytes
fileBuffer.compact();
} else {
//compact already set the limit == capacity
fileBuffer.limit(capacity);
}
file.read(fileBuffer);
fileBuffer.position(0);
} else {
markFileAsSuspect(file.getFileName(), position, messages.size());
break;
fileBuffer = allocateAndReadIntoFileBuffer(fileBuffer, requiredBytes);
}
}
return fileBuffer;
}
//sizeOf(START_BYTE) + sizeOf(MESSAGE LENGTH) + sizeOf(END_BYTE)
private static final int HEADER_AND_TRAILER_SIZE = DataConstants.SIZE_INT + 2;
private static final int MINIMUM_MSG_PERSISTENT_SIZE = HEADER_AND_TRAILER_SIZE;
private static final int MIN_CHUNK_SIZE = Env.osPageSize();
private List<PagedMessage> readFromSequentialFile(StorageManager storage) throws Exception {
final List<PagedMessage> messages = new ArrayList<>();
final int fileSize = (int) file.size();
file.position(0);
int processedBytes = 0;
ByteBuffer fileBuffer = null;
try {
int remainingBytes = fileSize - processedBytes;
if (remainingBytes >= MINIMUM_MSG_PERSISTENT_SIZE) {
fileBuffer = fileFactory.newBuffer(Math.min(remainingBytes, MIN_CHUNK_SIZE));
fileBuffer.limit(0);
do {
fileBuffer = readIntoFileBufferIfNecessary(fileBuffer, MINIMUM_MSG_PERSISTENT_SIZE);
final byte startByte = fileBuffer.get();
if (startByte == Page.START_BYTE) {
final int encodedSize = fileBuffer.getInt();
final int nextPosition = processedBytes + HEADER_AND_TRAILER_SIZE + encodedSize;
if (nextPosition <= fileSize) {
fileBuffer = readIntoFileBufferIfNecessary(fileBuffer, encodedSize + 1);
final int endPosition = fileBuffer.position() + encodedSize;
//this check must be performed upfront decoding
if (fileBuffer.remaining() >= (encodedSize + 1) && fileBuffer.get(endPosition) == Page.END_BYTE) {
final PagedMessageImpl msg = new PagedMessageImpl(storageManager);
decodeInto(fileBuffer, encodedSize, msg);
fileBuffer.position(endPosition + 1);
assert fileBuffer.get(endPosition) == Page.END_BYTE : "decoding cannot change end byte";
msg.initMessage(storage);
if (logger.isTraceEnabled()) {
logger.tracef("Reading message %s on pageId=%d for address=%s", msg, pageId, storeName);
}
messages.add(msg);
processedBytes = nextPosition;
} else {
markFileAsSuspect(file.getFileName(), processedBytes, messages.size());
return messages;
}
} else {
markFileAsSuspect(file.getFileName(), processedBytes, messages.size());
return messages;
}
} else {
markFileAsSuspect(file.getFileName(), processedBytes, messages.size());
return messages;
}
remainingBytes = fileSize - processedBytes;
}
while (remainingBytes >= MINIMUM_MSG_PERSISTENT_SIZE);
}
//ignore incomplete messages at the end of the file
if (logger.isTraceEnabled()) {
logger.tracef("%s has %d bytes of unknown data at position = %d", file.getFileName(), remainingBytes, processedBytes);
}
return messages;
} finally {
if (fileBuffer != null) {
fileFactory.releaseBuffer(fileBuffer);
}
if (file.position() != fileSize) {
file.position(fileSize);
}
}
return readMessages;
}
public synchronized void write(final PagedMessage message) throws Exception {
@ -228,7 +246,7 @@ public final class Page implements Comparable<Page> {
final int messageEncodedSize = message.getEncodeSize();
final int bufferSize = messageEncodedSize + Page.SIZE_RECORD;
final ByteBuffer buffer = fileFactory.newBuffer(bufferSize);
ChannelBufferWrapper activeMQBuffer = wrapBuffer(bufferSize, buffer);
ChannelBufferWrapper activeMQBuffer = new ChannelBufferWrapper(Unpooled.wrappedBuffer(buffer));
activeMQBuffer.clear();
activeMQBuffer.writeByte(Page.START_BYTE);
activeMQBuffer.writeInt(messageEncodedSize);

View File

@ -359,7 +359,7 @@ public class FakeSequentialFileFactory implements SequentialFileFactory {
throw new IllegalStateException("Is closed");
}
byte[] bytesRead = new byte[bytes.limit()];
byte[] bytesRead = new byte[Math.min(bytes.remaining(), data.remaining())];
data.get(bytesRead);