This commit is contained in:
Clebert Suconic 2020-04-15 22:20:50 -04:00
commit ceceb6691e
4 changed files with 117 additions and 38 deletions

View File

@ -544,7 +544,7 @@ public class PageCursorProviderImpl implements PageCursorProvider {
List<PagedMessage> pgdMessagesList = null;
try {
depagedPage.open();
pgdMessagesList = depagedPage.read(storageManager);
pgdMessagesList = depagedPage.read(storageManager, true);
} finally {
try {
depagedPage.close(false, false);
@ -553,7 +553,8 @@ public class PageCursorProviderImpl implements PageCursorProvider {
storageManager.afterPageRead();
}
pgdMessages = pgdMessagesList.toArray(new PagedMessage[pgdMessagesList.size()]);
pgdMessages = pgdMessagesList.isEmpty() ? null :
pgdMessagesList.toArray(new PagedMessage[pgdMessagesList.size()]);
} else {
pgdMessages = cache.getMessages();
}

View File

@ -18,6 +18,7 @@ package org.apache.activemq.artemis.core.paging.impl;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
@ -210,6 +211,7 @@ public final class Page implements Comparable<Page> {
readFileBuffer.position(endPosition + 1);
assert readFileBuffer.get(endPosition) == Page.END_BYTE : "decoding cannot change end byte";
msg.initMessage(storageManager);
assert validateLargeMessageStorageManager(msg);
if (logger.isTraceEnabled()) {
logger.tracef("Reading message %s on pageId=%d for address=%s", msg, pageId, storeName);
}
@ -246,8 +248,13 @@ public final class Page implements Comparable<Page> {
}
public synchronized List<PagedMessage> read(StorageManager storage) throws Exception {
return read(storage, false);
}
public synchronized List<PagedMessage> read(StorageManager storage, boolean onlyLargeMessages) throws Exception {
if (logger.isDebugEnabled()) {
logger.debug("reading page " + this.pageId + " on address = " + storeName);
logger.debugf("reading page %d on address = %s onlyLargeMessages = %b", storeName, pageId,
storage, onlyLargeMessages);
}
if (!file.isOpen()) {
@ -256,9 +263,11 @@ public final class Page implements Comparable<Page> {
size.lazySet((int) file.size());
final List<PagedMessage> messages = readFromSequentialFile(storage);
final List<PagedMessage> messages = new ArrayList<>();
numberOfMessages.lazySet(messages.size());
final int totalMessageCount = readFromSequentialFile(storage, messages, onlyLargeMessages);
numberOfMessages.lazySet(totalMessageCount);
return messages;
}
@ -316,6 +325,14 @@ public final class Page implements Comparable<Page> {
return fileBuffer;
}
private static boolean validateLargeMessageStorageManager(PagedMessage msg) {
if (!(msg.getMessage() instanceof LargeServerMessage)) {
return true;
}
LargeServerMessage largeServerMessage = ((LargeServerMessage) msg.getMessage());
return largeServerMessage.getStorageManager() != null;
}
private static ChannelBufferWrapper wrapWhole(ByteBuffer fileBuffer) {
final int position = fileBuffer.position();
final int limit = fileBuffer.limit();
@ -340,13 +357,15 @@ public final class Page implements Comparable<Page> {
private static final int HEADER_SIZE = HEADER_AND_TRAILER_SIZE - 1;
private static final int MIN_CHUNK_SIZE = Env.osPageSize();
private List<PagedMessage> readFromSequentialFile(StorageManager storage) throws Exception {
final List<PagedMessage> messages = new ArrayList<>();
private int readFromSequentialFile(StorageManager storage,
List<PagedMessage> messages,
boolean onlyLargeMessages) throws Exception {
final int fileSize = (int) file.size();
file.position(0);
int processedBytes = 0;
ByteBuffer fileBuffer = null;
ChannelBufferWrapper fileBufferWrapper;
int totalMessageCount = 0;
try {
int remainingBytes = fileSize - processedBytes;
if (remainingBytes >= MINIMUM_MSG_PERSISTENT_SIZE) {
@ -376,29 +395,38 @@ public final class Page implements Comparable<Page> {
final int endPosition = fileBuffer.position() + encodedSize;
//this check must be performed upfront decoding
if (fileBuffer.remaining() >= (encodedSize + 1) && fileBuffer.get(endPosition) == Page.END_BYTE) {
final PagedMessageImpl msg = new PagedMessageImpl(encodedSize, storageManager);
fileBufferWrapper.setIndex(fileBuffer.position(), endPosition);
msg.decode(fileBufferWrapper);
fileBuffer.position(endPosition + 1);
assert fileBuffer.get(endPosition) == Page.END_BYTE : "decoding cannot change end byte";
msg.initMessage(storage);
assert msg.getMessage() instanceof LargeServerMessage && ((LargeServerMessage)msg.getMessage()).getStorageManager() != null || !(msg.getMessage() instanceof LargeServerMessage);
if (logger.isTraceEnabled()) {
logger.tracef("Reading message %s on pageId=%d for address=%s", msg, pageId, storeName);
final boolean skipMessage;
if (onlyLargeMessages) {
skipMessage = !PagedMessageImpl.isLargeMessage(fileBufferWrapper);
} else {
skipMessage = false;
}
messages.add(msg);
if (!skipMessage) {
final PagedMessageImpl msg = new PagedMessageImpl(encodedSize, storageManager);
msg.decode(fileBufferWrapper);
assert fileBuffer.get(endPosition) == Page.END_BYTE : "decoding cannot change end byte";
msg.initMessage(storage);
assert validateLargeMessageStorageManager(msg);
if (logger.isTraceEnabled()) {
logger.tracef("Reading message %s on pageId=%d for address=%s", msg, pageId, storeName);
}
messages.add(msg);
}
totalMessageCount++;
fileBuffer.position(endPosition + 1);
processedBytes = nextPosition;
} else {
markFileAsSuspect(file.getFileName(), processedBytes, messages.size());
return messages;
markFileAsSuspect(file.getFileName(), processedBytes, totalMessageCount + 1);
return totalMessageCount;
}
} else {
markFileAsSuspect(file.getFileName(), processedBytes, messages.size());
return messages;
markFileAsSuspect(file.getFileName(), processedBytes, totalMessageCount + 1);
return totalMessageCount;
}
} else {
markFileAsSuspect(file.getFileName(), processedBytes, messages.size());
return messages;
markFileAsSuspect(file.getFileName(), processedBytes, totalMessageCount + 1);
return totalMessageCount;
}
remainingBytes = fileSize - processedBytes;
}
@ -408,7 +436,7 @@ public final class Page implements Comparable<Page> {
if (logger.isTraceEnabled()) {
logger.tracef("%s has %d bytes of unknown data at position = %d", file.getFileName(), remainingBytes, processedBytes);
}
return messages;
return totalMessageCount;
} finally {
if (fileBuffer != null) {
fileFactory.releaseBuffer(fileBuffer);
@ -500,11 +528,12 @@ public final class Page implements Comparable<Page> {
}
if (logger.isDebugEnabled()) {
logger.debug("Deleting pageNr=" + pageId + " on store " + storeName);
logger.debugf("Deleting pageNr=%d on store %d", pageId, storeName);
}
List<Long> largeMessageIds = new ArrayList<>();
if (messages != null) {
final List<Long> largeMessageIds;
if (messages != null && messages.length > 0) {
largeMessageIds = new ArrayList<>();
for (PagedMessage msg : messages) {
if ((msg.getMessage()).isLargeMessage()) {
// this will trigger large message delete: no need to do it
@ -513,6 +542,8 @@ public final class Page implements Comparable<Page> {
largeMessageIds.add(msg.getMessage().getMessageID());
}
}
} else {
largeMessageIds = Collections.emptyList();
}
try {

View File

@ -35,6 +35,43 @@ import org.apache.activemq.artemis.utils.DataConstants;
*/
public class PagedMessageImpl implements PagedMessage {
// It encapsulates the logic to detect large message types
private static final class LargeMessageType {
private static final byte NONE = 0;
private static final byte CORE = 1;
private static final byte NOT_CORE = 2;
public static boolean isLargeMessage(byte encodedValue) {
switch (encodedValue) {
case LargeMessageType.NONE:
return false;
case LargeMessageType.CORE:
case LargeMessageType.NOT_CORE:
return true;
default:
throw new IllegalStateException("This largeMessageType isn't supported: " + encodedValue);
}
}
public static boolean isCoreLargeMessage(Message message) {
return message.isLargeMessage() && message instanceof ICoreMessage;
}
public static boolean isCoreLargeMessageType(byte encodedValue) {
return encodedValue == LargeMessageType.CORE;
}
public static byte valueOf(Message message) {
if (!message.isLargeMessage()) {
return NONE;
}
if (message instanceof ICoreMessage) {
return CORE;
}
return NOT_CORE;
}
}
/**
* Large messages will need to be instantiated lazily during getMessage when the StorageManager
* is available
@ -116,13 +153,21 @@ public class PagedMessageImpl implements PagedMessage {
// EncodingSupport implementation --------------------------------
/**
* This method won't move the {@link ActiveMQBuffer#readerIndex()} of {@code buffer}.
*/
public static boolean isLargeMessage(ActiveMQBuffer buffer) {
// skip transactionID
return LargeMessageType.isLargeMessage(buffer.getByte(buffer.readerIndex() + Long.BYTES));
}
@Override
public void decode(final ActiveMQBuffer buffer) {
transactionID = buffer.readLong();
boolean isLargeMessage = buffer.readBoolean();
boolean isCoreLargeMessage = LargeMessageType.isCoreLargeMessageType(buffer.readByte());
if (isLargeMessage) {
if (isCoreLargeMessage) {
int largeMessageHeaderSize = buffer.readInt();
if (storageManager == null) {
@ -155,12 +200,12 @@ public class PagedMessageImpl implements PagedMessage {
public void encode(final ActiveMQBuffer buffer) {
buffer.writeLong(transactionID);
boolean isLargeMessage = isLargeMessage();
byte largeMessageType = LargeMessageType.valueOf(message);
buffer.writeBoolean(isLargeMessage);
buffer.writeByte(largeMessageType);
if (isLargeMessage) {
buffer.writeInt(LargeMessagePersister.getInstance().getEncodeSize((LargeServerMessage)message));
if (LargeMessageType.isCoreLargeMessageType(largeMessageType)) {
buffer.writeInt(LargeMessagePersister.getInstance().getEncodeSize((LargeServerMessage) message));
LargeMessagePersister.getInstance().encode(buffer, (LargeServerMessage) message);
} else {
message.getPersister().encode(buffer, message);
@ -173,13 +218,9 @@ public class PagedMessageImpl implements PagedMessage {
}
}
public boolean isLargeMessage() {
return message instanceof ICoreMessage && ((ICoreMessage)message).isLargeMessage();
}
@Override
public int getEncodeSize() {
if (isLargeMessage()) {
if (LargeMessageType.isCoreLargeMessage(message)) {
return DataConstants.SIZE_LONG + DataConstants.SIZE_BYTE + DataConstants.SIZE_INT + LargeMessagePersister.getInstance().getEncodeSize((LargeServerMessage)message) +
DataConstants.SIZE_INT + queueIDs.length * DataConstants.SIZE_LONG;
} else {

View File

@ -150,7 +150,7 @@ public class PageTest extends ActiveMQTestBase {
file.open();
page = new Page(new SimpleString("something"), storageManager, factory, file, 10);
List<PagedMessage> msgs = page.read(storageManager);
List<PagedMessage> msgs = page.read(storageManager, largeMessages);
Assert.assertEquals(numberOfElements, msgs.size());
@ -164,6 +164,12 @@ public class PageTest extends ActiveMQTestBase {
Assert.assertEquals(largeMessages ? 1 : 0, pagedMessage.getMessage().getUsage());
}
if (!largeMessages) {
Page tmpPage = new Page(new SimpleString("something"), storageManager, factory, file, 10);
Assert.assertEquals(0, tmpPage.read(storageManager, true).size());
Assert.assertEquals(numberOfElements, tmpPage.getNumberOfMessages());
}
Assert.assertTrue(page.delete(msgs.toArray(new PagedMessage[msgs.size()])));
for (PagedMessage pagedMessage : msgs) {