From 46060b89ffc3d8899c46e8d80438c50c8b76c3e7 Mon Sep 17 00:00:00 2001 From: Francesco Nigro Date: Mon, 23 Mar 2020 13:01:38 +0100 Subject: [PATCH] ARTEMIS-2676 PageCursorProviderImpl::cleanup can save decoding pages without large messages --- .../cursor/impl/PageCursorProviderImpl.java | 5 +- .../artemis/core/paging/impl/Page.java | 79 +++++++++++++------ .../core/paging/impl/PagedMessageImpl.java | 63 ++++++++++++--- .../tests/unit/core/paging/impl/PageTest.java | 8 +- 4 files changed, 117 insertions(+), 38 deletions(-) diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java index 1990633dac..0a8168d70e 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java @@ -544,7 +544,7 @@ public class PageCursorProviderImpl implements PageCursorProvider { List pgdMessagesList = null; try { depagedPage.open(); - pgdMessagesList = depagedPage.read(storageManager); + pgdMessagesList = depagedPage.read(storageManager, true); } finally { try { depagedPage.close(false, false); @@ -553,7 +553,8 @@ public class PageCursorProviderImpl implements PageCursorProvider { storageManager.afterPageRead(); } - pgdMessages = pgdMessagesList.toArray(new PagedMessage[pgdMessagesList.size()]); + pgdMessages = pgdMessagesList.isEmpty() ? null : + pgdMessagesList.toArray(new PagedMessage[pgdMessagesList.size()]); } else { pgdMessages = cache.getMessages(); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java index 3aad18460b..badfabfac1 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java @@ -18,6 +18,7 @@ package org.apache.activemq.artemis.core.paging.impl; import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; @@ -210,6 +211,7 @@ public final class Page implements Comparable { readFileBuffer.position(endPosition + 1); assert readFileBuffer.get(endPosition) == Page.END_BYTE : "decoding cannot change end byte"; msg.initMessage(storageManager); + assert validateLargeMessageStorageManager(msg); if (logger.isTraceEnabled()) { logger.tracef("Reading message %s on pageId=%d for address=%s", msg, pageId, storeName); } @@ -246,8 +248,13 @@ public final class Page implements Comparable { } public synchronized List read(StorageManager storage) throws Exception { + return read(storage, false); + } + + public synchronized List read(StorageManager storage, boolean onlyLargeMessages) throws Exception { if (logger.isDebugEnabled()) { - logger.debug("reading page " + this.pageId + " on address = " + storeName); + logger.debugf("reading page %d on address = %s onlyLargeMessages = %b", storeName, pageId, + storage, onlyLargeMessages); } if (!file.isOpen()) { @@ -256,9 +263,11 @@ public final class Page implements Comparable { size.lazySet((int) file.size()); - final List messages = readFromSequentialFile(storage); + final List messages = new ArrayList<>(); - numberOfMessages.lazySet(messages.size()); + final int totalMessageCount = readFromSequentialFile(storage, messages, onlyLargeMessages); + + numberOfMessages.lazySet(totalMessageCount); return messages; } @@ -316,6 +325,14 @@ public final class Page implements Comparable { return fileBuffer; } + private static boolean validateLargeMessageStorageManager(PagedMessage msg) { + if (!(msg.getMessage() instanceof LargeServerMessage)) { + return true; + } + LargeServerMessage largeServerMessage = ((LargeServerMessage) msg.getMessage()); + return largeServerMessage.getStorageManager() != null; + } + private static ChannelBufferWrapper wrapWhole(ByteBuffer fileBuffer) { final int position = fileBuffer.position(); final int limit = fileBuffer.limit(); @@ -340,13 +357,15 @@ public final class Page implements Comparable { private static final int HEADER_SIZE = HEADER_AND_TRAILER_SIZE - 1; private static final int MIN_CHUNK_SIZE = Env.osPageSize(); - private List readFromSequentialFile(StorageManager storage) throws Exception { - final List messages = new ArrayList<>(); + private int readFromSequentialFile(StorageManager storage, + List messages, + boolean onlyLargeMessages) throws Exception { final int fileSize = (int) file.size(); file.position(0); int processedBytes = 0; ByteBuffer fileBuffer = null; ChannelBufferWrapper fileBufferWrapper; + int totalMessageCount = 0; try { int remainingBytes = fileSize - processedBytes; if (remainingBytes >= MINIMUM_MSG_PERSISTENT_SIZE) { @@ -376,29 +395,38 @@ public final class Page implements Comparable { final int endPosition = fileBuffer.position() + encodedSize; //this check must be performed upfront decoding if (fileBuffer.remaining() >= (encodedSize + 1) && fileBuffer.get(endPosition) == Page.END_BYTE) { - final PagedMessageImpl msg = new PagedMessageImpl(encodedSize, storageManager); fileBufferWrapper.setIndex(fileBuffer.position(), endPosition); - msg.decode(fileBufferWrapper); - fileBuffer.position(endPosition + 1); - assert fileBuffer.get(endPosition) == Page.END_BYTE : "decoding cannot change end byte"; - msg.initMessage(storage); - assert msg.getMessage() instanceof LargeServerMessage && ((LargeServerMessage)msg.getMessage()).getStorageManager() != null || !(msg.getMessage() instanceof LargeServerMessage); - if (logger.isTraceEnabled()) { - logger.tracef("Reading message %s on pageId=%d for address=%s", msg, pageId, storeName); + final boolean skipMessage; + if (onlyLargeMessages) { + skipMessage = !PagedMessageImpl.isLargeMessage(fileBufferWrapper); + } else { + skipMessage = false; } - messages.add(msg); + if (!skipMessage) { + final PagedMessageImpl msg = new PagedMessageImpl(encodedSize, storageManager); + msg.decode(fileBufferWrapper); + assert fileBuffer.get(endPosition) == Page.END_BYTE : "decoding cannot change end byte"; + msg.initMessage(storage); + assert validateLargeMessageStorageManager(msg); + if (logger.isTraceEnabled()) { + logger.tracef("Reading message %s on pageId=%d for address=%s", msg, pageId, storeName); + } + messages.add(msg); + } + totalMessageCount++; + fileBuffer.position(endPosition + 1); processedBytes = nextPosition; } else { - markFileAsSuspect(file.getFileName(), processedBytes, messages.size()); - return messages; + markFileAsSuspect(file.getFileName(), processedBytes, totalMessageCount + 1); + return totalMessageCount; } } else { - markFileAsSuspect(file.getFileName(), processedBytes, messages.size()); - return messages; + markFileAsSuspect(file.getFileName(), processedBytes, totalMessageCount + 1); + return totalMessageCount; } } else { - markFileAsSuspect(file.getFileName(), processedBytes, messages.size()); - return messages; + markFileAsSuspect(file.getFileName(), processedBytes, totalMessageCount + 1); + return totalMessageCount; } remainingBytes = fileSize - processedBytes; } @@ -408,7 +436,7 @@ public final class Page implements Comparable { if (logger.isTraceEnabled()) { logger.tracef("%s has %d bytes of unknown data at position = %d", file.getFileName(), remainingBytes, processedBytes); } - return messages; + return totalMessageCount; } finally { if (fileBuffer != null) { fileFactory.releaseBuffer(fileBuffer); @@ -500,11 +528,12 @@ public final class Page implements Comparable { } if (logger.isDebugEnabled()) { - logger.debug("Deleting pageNr=" + pageId + " on store " + storeName); + logger.debugf("Deleting pageNr=%d on store %d", pageId, storeName); } - List largeMessageIds = new ArrayList<>(); - if (messages != null) { + final List largeMessageIds; + if (messages != null && messages.length > 0) { + largeMessageIds = new ArrayList<>(); for (PagedMessage msg : messages) { if ((msg.getMessage()).isLargeMessage()) { // this will trigger large message delete: no need to do it @@ -513,6 +542,8 @@ public final class Page implements Comparable { largeMessageIds.add(msg.getMessage().getMessageID()); } } + } else { + largeMessageIds = Collections.emptyList(); } try { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagedMessageImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagedMessageImpl.java index b7b49dfcd9..a6438629ff 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagedMessageImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagedMessageImpl.java @@ -35,6 +35,43 @@ import org.apache.activemq.artemis.utils.DataConstants; */ public class PagedMessageImpl implements PagedMessage { + // It encapsulates the logic to detect large message types + private static final class LargeMessageType { + + private static final byte NONE = 0; + private static final byte CORE = 1; + private static final byte NOT_CORE = 2; + + public static boolean isLargeMessage(byte encodedValue) { + switch (encodedValue) { + case LargeMessageType.NONE: + return false; + case LargeMessageType.CORE: + case LargeMessageType.NOT_CORE: + return true; + default: + throw new IllegalStateException("This largeMessageType isn't supported: " + encodedValue); + } + } + + public static boolean isCoreLargeMessage(Message message) { + return message.isLargeMessage() && message instanceof ICoreMessage; + } + + public static boolean isCoreLargeMessageType(byte encodedValue) { + return encodedValue == LargeMessageType.CORE; + } + + public static byte valueOf(Message message) { + if (!message.isLargeMessage()) { + return NONE; + } + if (message instanceof ICoreMessage) { + return CORE; + } + return NOT_CORE; + } + } /** * Large messages will need to be instantiated lazily during getMessage when the StorageManager * is available @@ -116,13 +153,21 @@ public class PagedMessageImpl implements PagedMessage { // EncodingSupport implementation -------------------------------- + /** + * This method won't move the {@link ActiveMQBuffer#readerIndex()} of {@code buffer}. + */ + public static boolean isLargeMessage(ActiveMQBuffer buffer) { + // skip transactionID + return LargeMessageType.isLargeMessage(buffer.getByte(buffer.readerIndex() + Long.BYTES)); + } + @Override public void decode(final ActiveMQBuffer buffer) { transactionID = buffer.readLong(); - boolean isLargeMessage = buffer.readBoolean(); + boolean isCoreLargeMessage = LargeMessageType.isCoreLargeMessageType(buffer.readByte()); - if (isLargeMessage) { + if (isCoreLargeMessage) { int largeMessageHeaderSize = buffer.readInt(); if (storageManager == null) { @@ -155,12 +200,12 @@ public class PagedMessageImpl implements PagedMessage { public void encode(final ActiveMQBuffer buffer) { buffer.writeLong(transactionID); - boolean isLargeMessage = isLargeMessage(); + byte largeMessageType = LargeMessageType.valueOf(message); - buffer.writeBoolean(isLargeMessage); + buffer.writeByte(largeMessageType); - if (isLargeMessage) { - buffer.writeInt(LargeMessagePersister.getInstance().getEncodeSize((LargeServerMessage)message)); + if (LargeMessageType.isCoreLargeMessageType(largeMessageType)) { + buffer.writeInt(LargeMessagePersister.getInstance().getEncodeSize((LargeServerMessage) message)); LargeMessagePersister.getInstance().encode(buffer, (LargeServerMessage) message); } else { message.getPersister().encode(buffer, message); @@ -173,13 +218,9 @@ public class PagedMessageImpl implements PagedMessage { } } - public boolean isLargeMessage() { - return message instanceof ICoreMessage && ((ICoreMessage)message).isLargeMessage(); - } - @Override public int getEncodeSize() { - if (isLargeMessage()) { + if (LargeMessageType.isCoreLargeMessage(message)) { return DataConstants.SIZE_LONG + DataConstants.SIZE_BYTE + DataConstants.SIZE_INT + LargeMessagePersister.getInstance().getEncodeSize((LargeServerMessage)message) + DataConstants.SIZE_INT + queueIDs.length * DataConstants.SIZE_LONG; } else { diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PageTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PageTest.java index 1aa28c2792..894651e2c0 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PageTest.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PageTest.java @@ -150,7 +150,7 @@ public class PageTest extends ActiveMQTestBase { file.open(); page = new Page(new SimpleString("something"), storageManager, factory, file, 10); - List msgs = page.read(storageManager); + List msgs = page.read(storageManager, largeMessages); Assert.assertEquals(numberOfElements, msgs.size()); @@ -164,6 +164,12 @@ public class PageTest extends ActiveMQTestBase { Assert.assertEquals(largeMessages ? 1 : 0, pagedMessage.getMessage().getUsage()); } + if (!largeMessages) { + Page tmpPage = new Page(new SimpleString("something"), storageManager, factory, file, 10); + Assert.assertEquals(0, tmpPage.read(storageManager, true).size()); + Assert.assertEquals(numberOfElements, tmpPage.getNumberOfMessages()); + } + Assert.assertTrue(page.delete(msgs.toArray(new PagedMessage[msgs.size()]))); for (PagedMessage pagedMessage : msgs) {