From 1a5c2ec51c54465e0d845cc2bec380032cba6834 Mon Sep 17 00:00:00 2001 From: Clebert Suconic Date: Thu, 25 Jul 2024 08:54:08 -0400 Subject: [PATCH] Revert "ARTEMIS-4941 Remove lazy update after application properties as it's no longer needed" This reverts commit fb2a57f3ed25895681d3636e3f5fb9d9b9d0a053. --- .../activemq/artemis/api/core/Message.java | 6 ++ .../amqp/broker/AMQPLargeMessage.java | 1 + .../protocol/amqp/broker/AMQPMessage.java | 20 +++++ .../amqp/broker/AMQPStandardMessage.java | 1 + .../paging/cursor/PagedReferenceImpl.java | 14 ++-- .../artemis/core/server/impl/QueueImpl.java | 4 +- .../amqp/paging/AmqpPagingTest.java | 83 +++++++++++++++++++ .../multiprotocol/JMSMessageConsumerTest.java | 39 --------- 8 files changed, 122 insertions(+), 46 deletions(-) diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java index 88abfa5531..2c6beaefda 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java @@ -809,6 +809,12 @@ public interface Message { int getMemoryEstimate(); + /** The first estimate that's been calculated without any updates. */ + default int getOriginalEstimate() { + // For Core Protocol we always use the same estimate + return getMemoryEstimate(); + } + /** * This is the size of the message when persisted on disk which is used for metrics tracking * Note that even if the message itself is not persisted on disk (ie non-durable) this value is diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPLargeMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPLargeMessage.java index 19621b1c59..777f3752ec 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPLargeMessage.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPLargeMessage.java @@ -585,6 +585,7 @@ public class AMQPLargeMessage extends AMQPMessage implements LargeServerMessage public int getMemoryEstimate() { if (memoryEstimate == -1) { memoryEstimate = memoryOffset * 2 + (extraProperties != null ? extraProperties.getEncodeSize() : 0); + originalEstimate = memoryEstimate; } return memoryEstimate; } diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java index 86e6794d00..4c43401fac 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java @@ -43,6 +43,7 @@ import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.message.openmbean.CompositeDataConstants; import org.apache.activemq.artemis.core.message.openmbean.MessageOpenTypeFactory; +import org.apache.activemq.artemis.core.paging.PagingStore; import org.apache.activemq.artemis.core.persistence.CoreMessageObjectPools; import org.apache.activemq.artemis.core.persistence.Persister; import org.apache.activemq.artemis.core.server.MessageReference; @@ -203,6 +204,7 @@ public abstract class AMQPMessage extends RefCountMessage implements org.apache. protected long messageID; protected SimpleString address; protected volatile int memoryEstimate = -1; + protected volatile int originalEstimate = -1; protected long expiration; protected boolean expirationReload = false; protected long scheduledTime = -1; @@ -543,6 +545,13 @@ public abstract class AMQPMessage extends RefCountMessage implements org.apache. protected ApplicationProperties lazyDecodeApplicationProperties(ReadableBuffer data) { if (applicationProperties == null && applicationPropertiesPosition != VALUE_NOT_PRESENT) { applicationProperties = scanForMessageSection(data, applicationPropertiesPosition, ApplicationProperties.class); + if (owner != null && memoryEstimate != -1) { + // the memory has already been tracked and needs to be updated to reflect the new decoding + int addition = unmarshalledApplicationPropertiesMemoryEstimateFromData(data); + ((PagingStore)owner).addSize(addition, false); + final int updatedEstimate = memoryEstimate + addition; + memoryEstimate = updatedEstimate; + } } return applicationProperties; @@ -666,6 +675,7 @@ public abstract class AMQPMessage extends RefCountMessage implements org.apache. } encodedHeaderSize = 0; memoryEstimate = -1; + originalEstimate = -1; scheduledTime = -1; encodedDeliveryAnnotationsSize = 0; headerPosition = VALUE_NOT_PRESENT; @@ -861,6 +871,16 @@ public abstract class AMQPMessage extends RefCountMessage implements org.apache. @Override public abstract int getMemoryEstimate(); + @Override + public int getOriginalEstimate() { + if (originalEstimate < 0) { + // getMemoryEstimate should initialize originalEstimate + return getMemoryEstimate(); + } else { + return originalEstimate; + } + } + @Override public Map toPropertyMap(int valueSizeLimit) { return toPropertyMap(false, valueSizeLimit); diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPStandardMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPStandardMessage.java index 178b96f884..245a838ad7 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPStandardMessage.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPStandardMessage.java @@ -189,6 +189,7 @@ public class AMQPStandardMessage extends AMQPMessage { public int getMemoryEstimate() { if (memoryEstimate == -1) { memoryEstimate = memoryOffset + (data != null ? data.capacity() + unmarshalledApplicationPropertiesMemoryEstimateFromData(data) : 0); + originalEstimate = memoryEstimate; } return memoryEstimate; diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java index 74b5598e0a..873485b961 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java @@ -47,6 +47,8 @@ public class PagedReferenceImpl extends AbstractProtocolReference implements Pag private int persistedCount; + private int messageEstimate = -1; + // this is a cached position returned on getPosition. // just to avoid creating on object on each call PagePosition cachedPositionObject; @@ -162,12 +164,14 @@ public class PagedReferenceImpl extends AbstractProtocolReference implements Pag @Override public int getMessageMemoryEstimate() { - try { - return getMessage().getMemoryEstimate(); - } catch (Throwable e) { - ActiveMQServerLogger.LOGGER.errorCalculateMessageMemoryEstimate(e); - return 0; + if (messageEstimate <= 0) { + try { + messageEstimate = getMessage().getMemoryEstimate(); + } catch (Throwable e) { + ActiveMQServerLogger.LOGGER.errorCalculateMessageMemoryEstimate(e); + } } + return messageEstimate; } @Override diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java index f1751011ac..249c7e44a9 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java @@ -1052,7 +1052,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { // If an AMQP message parses its properties, its size might be updated and the address will receive more bytes. // However, in this case, we should always use the original estimate. // Otherwise, we might get incorrect sizes after the update. - pagingStore.addSize(messageReference.getMessage().getMemoryEstimate(), false, false); + pagingStore.addSize(messageReference.getMessage().getOriginalEstimate(), false, false); } pagingStore.refUp(messageReference.getMessage(), count); @@ -1071,7 +1071,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { // If an AMQP message parses its properties, its size might be updated and the address will receive more bytes. // However, in this case, we should always use the original estimate. // Otherwise, we might get incorrect sizes after the update. - pagingStore.addSize(-messageReference.getMessage().getMemoryEstimate(), false, false); + pagingStore.addSize(-messageReference.getMessage().getOriginalEstimate(), false, false); } pagingStore.refDown(messageReference.getMessage(), count); } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/paging/AmqpPagingTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/paging/AmqpPagingTest.java index d92ebf931a..9a41b04b62 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/paging/AmqpPagingTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/paging/AmqpPagingTest.java @@ -121,4 +121,87 @@ public class AmqpPagingTest extends AmqpClientTestSupport { connection.close(); } + + @TestTemplate + @Timeout(60) + public void testSizeCalculationsForApplicationProperties() throws Exception { + final int MSG_SIZE = 1000; + final StringBuilder builder = new StringBuilder(); + for (int i = 0; i < MSG_SIZE; i++) { + builder.append('0'); + } + final String data = builder.toString(); + final int MSG_COUNT = 1; + + AmqpClient client = createAmqpClient(); + AmqpConnection connection = addConnection(client.connect()); + AmqpSession session = connection.createSession(); + + AmqpSender sender = session.createSender(getQueueName(), true); + + // use selector expression that references a property to force decode of application properties + AmqpReceiver receiver = session.createReceiver(getQueueName(), "myData IS NOT NULL"); + receiver.setPresettle(true); + receiver.flow(10); + assertNull(receiver.receiveNoWait(), "somehow the queue had messages from a previous test"); + receiver.flow(0); + + AmqpMessage message = new AmqpMessage(); + message.setText(data); + + // large message property also + message.setApplicationProperty("myData", data); + + if (durable != null) { + message.setDurable(durable); + } + sender.send(message); + + PagingStore pagingStore = server.getPagingManager().getPageStore(SimpleString.of(getQueueName())); + + // verify page usage reflects data + 2*application properties (encoded and decoded) + assertTrue(Wait.waitFor(() -> { + return pagingStore.getAddressSize() > 3000; + })); + + receiver.flow(MSG_COUNT); + AmqpMessage receive = receiver.receive(10, TimeUnit.MINUTES); + assertNotNull(receive, "Not received anything after receive"); + receive.accept(); + + assertTrue(Wait.waitFor(() -> { + return pagingStore.getAddressSize() == 0; + })); + + // send another with duplicate id property, to force early decode + message = new AmqpMessage(); + message.setText(data); + + // ensures application properties are referenced + message.setApplicationProperty("_AMQ_DUPL_ID", "1"); + + // large message property also + message.setApplicationProperty("myData", data); + + if (durable != null) { + message.setDurable(durable); + } + sender.send(message); + + sender.close(); + + // verify page usage reflects data + 2*application properties (encoded and decoded) + assertTrue(Wait.waitFor(() -> { + return pagingStore.getAddressSize() > 3000; + })); + + receiver.flow(MSG_COUNT); + receive = receiver.receive(10, TimeUnit.MINUTES); + assertNotNull(receive, "Not received anything after receive"); + receive.accept(); + + receiver.close(); + connection.close(); + } + } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/multiprotocol/JMSMessageConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/multiprotocol/JMSMessageConsumerTest.java index bde0955955..060f8862ee 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/multiprotocol/JMSMessageConsumerTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/multiprotocol/JMSMessageConsumerTest.java @@ -35,14 +35,9 @@ import java.lang.invoke.MethodHandles; import org.apache.activemq.artemis.api.core.QueueConfiguration; import org.apache.activemq.artemis.api.core.RoutingType; -import org.apache.activemq.artemis.api.core.SimpleString; -import org.apache.activemq.artemis.api.core.management.AddressControl; -import org.apache.activemq.artemis.api.core.management.ResourceNames; -import org.apache.activemq.artemis.core.paging.PagingStore; import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.utils.DestinationUtil; import org.apache.activemq.artemis.utils.RandomUtil; -import org.apache.activemq.artemis.utils.Wait; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; import org.slf4j.Logger; @@ -403,38 +398,4 @@ public class JMSMessageConsumerTest extends MultiprotocolJMSClientTestSupport { consumerConnection.close(); } } - - - @Test - public void testConvertedAndPaging() throws Exception { - final int MESSAGE_COUNT = 1; - server.createQueue(QueueConfiguration.of(getQueueName()).setRoutingType(RoutingType.ANYCAST)); - PagingStore store = server.getPagingManager().getPageStore(SimpleString.of(getQueueName())); - store.startPaging(); - try (Connection senderConnection = createConnection(); Connection consumerConnection = createCoreConnection()) { - Session consumerSession = consumerConnection.createSession(true, Session.SESSION_TRANSACTED); - MessageConsumer consumer = consumerSession.createConsumer(consumerSession.createQueue(getQueueName())); - - Session senderSession = senderConnection.createSession(true, Session.SESSION_TRANSACTED); - MessageProducer producer = senderSession.createProducer(senderSession.createQueue(getQueueName())); - for (int i = 0; i < MESSAGE_COUNT; i++) { - Message message = senderSession.createMessage(); - message.setIntProperty("count", i); // test will also pass if this is removed - producer.send(message); - } - senderSession.commit(); - - for (int i = 0; i < MESSAGE_COUNT; i++) { - Message received = consumer.receive(1000); - assertNotNull(received); - } - consumerSession.commit(); - consumer.close(); - - assertEquals(0, server.locateQueue(getQueueName()).getMessageCount()); - Wait.assertEquals(0, store::getAddressSize, 5000); - assertEquals(0, ((AddressControl) server.getManagementService().getResource(ResourceNames.ADDRESS + getQueueName())).getAddressSize()); - } - } - }