ARTEMIS-4941 Remove lazy update after application properties only when it's paging

for regular messages it's quite obvious when the message is leaving the queue but for paged messages it becomes a challenge. We should just ignore the update for paged messages.
This commit is contained in:
Clebert Suconic 2024-07-25 10:31:10 -04:00 committed by clebertsuconic
parent 1a5c2ec51c
commit db0ba73aa4
5 changed files with 84 additions and 6 deletions

View File

@ -548,9 +548,14 @@ public abstract class AMQPMessage extends RefCountMessage implements org.apache.
if (owner != null && memoryEstimate != -1) {
// the memory has already been tracked and needs to be updated to reflect the new decoding
int addition = unmarshalledApplicationPropertiesMemoryEstimateFromData(data);
((PagingStore)owner).addSize(addition, false);
final int updatedEstimate = memoryEstimate + addition;
memoryEstimate = updatedEstimate;
// it is difficult to track the updates for paged messages
// for that reason we won't do it if paged
if (!isPaged) {
((PagingStore) owner).addSize(addition, false);
final int updatedEstimate = memoryEstimate + addition;
memoryEstimate = updatedEstimate;
}
}
}

View File

@ -188,8 +188,16 @@ public class AMQPStandardMessage extends AMQPMessage {
@Override
public int getMemoryEstimate() {
if (memoryEstimate == -1) {
memoryEstimate = memoryOffset + (data != null ? data.capacity() + unmarshalledApplicationPropertiesMemoryEstimateFromData(data) : 0);
originalEstimate = memoryEstimate;
if (isPaged) {
// When the message is paged, we don't take the unmarshalled application properties
// because it could be updated at different places.
// we just keep the estimate simple when paging
memoryEstimate = memoryOffset + (data != null ? data.capacity() : 0);
originalEstimate = memoryEstimate;
} else {
memoryEstimate = memoryOffset + (data != null ? data.capacity() + unmarshalledApplicationPropertiesMemoryEstimateFromData(data) : 0);
originalEstimate = memoryEstimate;
}
}
return memoryEstimate;

View File

@ -26,6 +26,7 @@ import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import java.lang.invoke.MethodHandles;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Date;
@ -88,9 +89,13 @@ import org.mockito.Mockito;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class AMQPMessageTest {
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private static final String TEST_TO_ADDRESS = "someAddress";
private static final String TEST_MESSAGE_ANNOTATION_KEY = "x-opt-test-annotation";
@ -307,14 +312,32 @@ public class AMQPMessageTest {
@Test
public void testGetMemoryEstimateWithDecodedApplicationProperties() {
testGetMemoryEstimateWithDecodedApplicationProperties(true);
testGetMemoryEstimateWithDecodedApplicationProperties(false);
}
private void testGetMemoryEstimateWithDecodedApplicationProperties(boolean paged) {
AMQPStandardMessage decoded = new AMQPStandardMessage(0, encodedProtonMessage, new TypedProperties(), null);
if (paged) {
decoded.setPaged();
}
logger.info("Estimated size:: {}", decoded.getMemoryEstimate());
AMQPStandardMessage decodedWithApplicationPropertiesUnmarshalled =
new AMQPStandardMessage(0, encodeMessage(createProtonMessage()), new TypedProperties(), null);
if (paged) {
decodedWithApplicationPropertiesUnmarshalled.setPaged();
}
assertEquals(decodedWithApplicationPropertiesUnmarshalled.getStringProperty(TEST_APPLICATION_PROPERTY_KEY), TEST_APPLICATION_PROPERTY_VALUE);
assertNotEquals(decodedWithApplicationPropertiesUnmarshalled.getMemoryEstimate(), decoded.getMemoryEstimate());
if (paged) {
assertEquals(decodedWithApplicationPropertiesUnmarshalled.getMemoryEstimate(), decoded.getMemoryEstimate());
} else {
assertNotEquals(decodedWithApplicationPropertiesUnmarshalled.getMemoryEstimate(), decoded.getMemoryEstimate());
}
}
//----- Test Connection ID access -----------------------------------------//

View File

@ -107,6 +107,7 @@ public class PagedMessageImpl implements PagedMessage {
this.storageManager = null;
this.queueIDs = queueIDs;
this.message = message;
this.message.setPaged();
this.storedSize = 0;
checkLargeMessage();
}
@ -171,6 +172,7 @@ public class PagedMessageImpl implements PagedMessage {
lgMessage.toMessage().usageUp();
lgMessage.setPaged();
this.message = lgMessage.toMessage();
this.message.setPaged();
largeMessageLazyData = null;
checkLargeMessage();
} else {
@ -214,6 +216,7 @@ public class PagedMessageImpl implements PagedMessage {
buffer.readBytes(largeMessageLazyData);
} else {
this.message = storageManager.createCoreLargeMessage().toMessage();
this.message.setPaged();
LargeMessagePersister.getInstance().decode(buffer, (LargeServerMessage) message, null);
((LargeServerMessage) message).setStorageManager(storageManager);
((LargeServerMessage) message).toMessage().usageUp();

View File

@ -35,9 +35,14 @@ import java.lang.invoke.MethodHandles;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.management.AddressControl;
import org.apache.activemq.artemis.api.core.management.ResourceNames;
import org.apache.activemq.artemis.core.paging.PagingStore;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.utils.DestinationUtil;
import org.apache.activemq.artemis.utils.RandomUtil;
import org.apache.activemq.artemis.utils.Wait;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.slf4j.Logger;
@ -398,4 +403,38 @@ public class JMSMessageConsumerTest extends MultiprotocolJMSClientTestSupport {
consumerConnection.close();
}
}
@Test
public void testConvertedAndPaging() throws Exception {
final int MESSAGE_COUNT = 1;
server.createQueue(QueueConfiguration.of(getQueueName()).setRoutingType(RoutingType.ANYCAST));
PagingStore store = server.getPagingManager().getPageStore(SimpleString.of(getQueueName()));
store.startPaging();
try (Connection senderConnection = createConnection(); Connection consumerConnection = createCoreConnection()) {
Session consumerSession = consumerConnection.createSession(true, Session.SESSION_TRANSACTED);
MessageConsumer consumer = consumerSession.createConsumer(consumerSession.createQueue(getQueueName()));
Session senderSession = senderConnection.createSession(true, Session.SESSION_TRANSACTED);
MessageProducer producer = senderSession.createProducer(senderSession.createQueue(getQueueName()));
for (int i = 0; i < MESSAGE_COUNT; i++) {
Message message = senderSession.createMessage();
message.setIntProperty("count", i); // test will also pass if this is removed
producer.send(message);
}
senderSession.commit();
for (int i = 0; i < MESSAGE_COUNT; i++) {
Message received = consumer.receive(1000);
assertNotNull(received);
}
consumerSession.commit();
consumer.close();
assertEquals(0, server.locateQueue(getQueueName()).getMessageCount());
Wait.assertEquals(0, store::getAddressSize, 5000);
assertEquals(0, ((AddressControl) server.getManagementService().getResource(ResourceNames.ADDRESS + getQueueName())).getAddressSize());
}
}
}