diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java index bb127e8b99..f60efeb97d 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java @@ -490,6 +490,7 @@ public abstract class AMQPMessage extends RefCountMessage implements org.apache. protected ApplicationProperties lazyDecodeApplicationProperties(ReadableBuffer data) { if (applicationProperties == null && applicationPropertiesPosition != VALUE_NOT_PRESENT) { applicationProperties = scanForMessageSection(data, applicationPropertiesPosition, ApplicationProperties.class); + memoryEstimate = -1; } return applicationProperties; diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPStandardMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPStandardMessage.java index d409568290..cb2133bfc2 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPStandardMessage.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPStandardMessage.java @@ -124,12 +124,24 @@ public class AMQPStandardMessage extends AMQPMessage { @Override public int getMemoryEstimate() { if (memoryEstimate == -1) { - memoryEstimate = memoryOffset + (data != null ? data.capacity() : 0); + memoryEstimate = memoryOffset + (data != null ? data.capacity() + unmarshalledApplicationPropertiesMemoryEstimateFromData() : 0); } return memoryEstimate; } + private int unmarshalledApplicationPropertiesMemoryEstimateFromData() { + if (applicationProperties != null) { + // they have been unmarshalled, estimate memory usage based on their encoded size + if (remainingBodyPosition != VALUE_NOT_PRESENT) { + return remainingBodyPosition - applicationPropertiesPosition; + } else { + return data.capacity() - applicationPropertiesPosition; + } + } + return 0; + } + @Override public void persist(ActiveMQBuffer targetRecord) { ensureDataIsValid(); diff --git a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessageTest.java b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessageTest.java index fbef2fec20..487a19a08b 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessageTest.java +++ b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessageTest.java @@ -269,6 +269,19 @@ public class AMQPMessageTest { assertNotEquals(estimate, decoded.getMemoryEstimate()); } + + @Test + public void testGetMemoryEstimateWithDecodedApplicationProperties() { + AMQPStandardMessage decoded = new AMQPStandardMessage(0, encodedProtonMessage, new TypedProperties(), null); + + AMQPStandardMessage decodedWithApplicationPropertiesUnmarshalled = + new AMQPStandardMessage(0, encodeMessage(createProtonMessage()), new TypedProperties(), null); + + assertEquals(decodedWithApplicationPropertiesUnmarshalled.getStringProperty(TEST_APPLICATION_PROPERTY_KEY), TEST_APPLICATION_PROPERTY_VALUE); + + assertNotEquals(decodedWithApplicationPropertiesUnmarshalled.getMemoryEstimate(), decoded.getMemoryEstimate()); + } + //----- Test Connection ID access -----------------------------------------// diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java index 923eadf699..30f395fe5d 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java @@ -2935,6 +2935,9 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { MessageReference ref; + // filter evaluation or transformation may cause properties to be lazyDecoded, we need to reflect that + int existingMemoryEstimate = 0; + Consumer handledconsumer = null; synchronized (this) { @@ -2970,6 +2973,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { ref = holder.iter.next(); } else { ref = null; + existingMemoryEstimate = 0; } if (ref == null) { noDelivery++; @@ -2988,6 +2992,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { logger.trace("Queue " + this.getName() + " is delivering reference " + ref); } + existingMemoryEstimate = ref.getMessageMemoryEstimate(); final SimpleString groupID = extractGroupID(ref); groupConsumer = getGroupConsumer(groupID); @@ -3062,6 +3067,10 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { if (handledconsumer != null) { proceedDeliver(handledconsumer, ref); } + + if (existingMemoryEstimate > 0 ) { + accountForChangeInMemoryEstimate(ref, existingMemoryEstimate); + } } return true; @@ -3685,8 +3694,13 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { consumer = groupConsumer; } + // filter evaluation may cause properties to be lazyDecoded + final int existingMemoryEstimate = ref.getMessageMemoryEstimate(); + HandleStatus status = handle(ref, consumer); + accountForChangeInMemoryEstimate(ref, existingMemoryEstimate); + if (status == HandleStatus.HANDLED) { final MessageReference reference; if (redistributor == null) { @@ -3716,6 +3730,16 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { } } + private static void accountForChangeInMemoryEstimate(final MessageReference ref, final int existingMemoryEstimate) { + final int delta = ref.getMessageMemoryEstimate() - existingMemoryEstimate; + if (delta > 0) { + PagingStore pageStore = ref.getOwner(); + if (pageStore != null) { + pageStore.addSize(delta); + } + } + } + private Consumer getGroupConsumer(SimpleString groupID) { Consumer groupConsumer = null; if (exclusive) { diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/paging/AmqpPagingTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/paging/AmqpPagingTest.java index 5218a0d6b2..6360b6dce3 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/paging/AmqpPagingTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/paging/AmqpPagingTest.java @@ -118,4 +118,86 @@ public class AmqpPagingTest extends AmqpClientTestSupport { connection.close(); } + + @Test(timeout = 60000) + public void testSizeCalculationsForApplicationProperties() throws Exception { + final int MSG_SIZE = 1000; + final StringBuilder builder = new StringBuilder(); + for (int i = 0; i < MSG_SIZE; i++) { + builder.append('0'); + } + final String data = builder.toString(); + final int MSG_COUNT = 1; + + AmqpClient client = createAmqpClient(); + AmqpConnection connection = addConnection(client.connect()); + AmqpSession session = connection.createSession(); + + AmqpSender sender = session.createSender(getQueueName(), true); + + // use selector expression that references a property to force decode of application properties + AmqpReceiver receiver = session.createReceiver(getQueueName(), "myData IS NOT NULL"); + receiver.setPresettle(true); + receiver.flow(10); + Assert.assertNull("somehow the queue had messages from a previous test", receiver.receiveNoWait()); + receiver.flow(0); + + AmqpMessage message = new AmqpMessage(); + message.setText(data); + + // large message property also + message.setApplicationProperty("myData", data); + + if (durable != null) { + message.setDurable(durable); + } + sender.send(message); + + PagingStore pagingStore = server.getPagingManager().getPageStore(SimpleString.toSimpleString(getQueueName())); + + // verify page usage reflects data + 2*application properties (encoded and decoded) + assertTrue(Wait.waitFor(() -> { + return pagingStore.getAddressSize() > 3000; + })); + + receiver.flow(MSG_COUNT); + AmqpMessage receive = receiver.receive(10, TimeUnit.MINUTES); + assertNotNull("Not received anything after receive", receive); + receive.accept(); + + assertTrue(Wait.waitFor(() -> { + return pagingStore.getAddressSize() == 0; + })); + + // send another with duplicate id property, to force early decode + message = new AmqpMessage(); + message.setText(data); + + // ensures application properties are referenced + message.setApplicationProperty("_AMQ_DUPL_ID", "1"); + + // large message property also + message.setApplicationProperty("myData", data); + + if (durable != null) { + message.setDurable(durable); + } + sender.send(message); + + sender.close(); + + // verify page usage reflects data + 2*application properties (encoded and decoded) + assertTrue(Wait.waitFor(() -> { + return pagingStore.getAddressSize() > 3000; + })); + + receiver.flow(MSG_COUNT); + receive = receiver.receive(10, TimeUnit.MINUTES); + assertNotNull("Not received anything after receive", receive); + receive.accept(); + + receiver.close(); + connection.close(); + } + }