ARTEMIS-4941 Remove lazy update after application properties as it's no longer needed

This commit is contained in:
Clebert Suconic 2024-07-22 12:25:03 -04:00 committed by clebertsuconic
parent 642a2c79fd
commit fb2a57f3ed
8 changed files with 46 additions and 122 deletions

View File

@ -809,12 +809,6 @@ public interface Message {
int getMemoryEstimate();
/** The first estimate that's been calculated without any updates. */
default int getOriginalEstimate() {
// For Core Protocol we always use the same estimate
return getMemoryEstimate();
}
/**
* This is the size of the message when persisted on disk which is used for metrics tracking
* Note that even if the message itself is not persisted on disk (ie non-durable) this value is

View File

@ -585,7 +585,6 @@ public class AMQPLargeMessage extends AMQPMessage implements LargeServerMessage
public int getMemoryEstimate() {
if (memoryEstimate == -1) {
memoryEstimate = memoryOffset * 2 + (extraProperties != null ? extraProperties.getEncodeSize() : 0);
originalEstimate = memoryEstimate;
}
return memoryEstimate;
}

View File

@ -43,7 +43,6 @@ import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.message.openmbean.CompositeDataConstants;
import org.apache.activemq.artemis.core.message.openmbean.MessageOpenTypeFactory;
import org.apache.activemq.artemis.core.paging.PagingStore;
import org.apache.activemq.artemis.core.persistence.CoreMessageObjectPools;
import org.apache.activemq.artemis.core.persistence.Persister;
import org.apache.activemq.artemis.core.server.MessageReference;
@ -204,7 +203,6 @@ public abstract class AMQPMessage extends RefCountMessage implements org.apache.
protected long messageID;
protected SimpleString address;
protected volatile int memoryEstimate = -1;
protected volatile int originalEstimate = -1;
protected long expiration;
protected boolean expirationReload = false;
protected long scheduledTime = -1;
@ -545,13 +543,6 @@ public abstract class AMQPMessage extends RefCountMessage implements org.apache.
protected ApplicationProperties lazyDecodeApplicationProperties(ReadableBuffer data) {
if (applicationProperties == null && applicationPropertiesPosition != VALUE_NOT_PRESENT) {
applicationProperties = scanForMessageSection(data, applicationPropertiesPosition, ApplicationProperties.class);
if (owner != null && memoryEstimate != -1) {
// the memory has already been tracked and needs to be updated to reflect the new decoding
int addition = unmarshalledApplicationPropertiesMemoryEstimateFromData(data);
((PagingStore)owner).addSize(addition, false);
final int updatedEstimate = memoryEstimate + addition;
memoryEstimate = updatedEstimate;
}
}
return applicationProperties;
@ -675,7 +666,6 @@ public abstract class AMQPMessage extends RefCountMessage implements org.apache.
}
encodedHeaderSize = 0;
memoryEstimate = -1;
originalEstimate = -1;
scheduledTime = -1;
encodedDeliveryAnnotationsSize = 0;
headerPosition = VALUE_NOT_PRESENT;
@ -871,16 +861,6 @@ public abstract class AMQPMessage extends RefCountMessage implements org.apache.
@Override
public abstract int getMemoryEstimate();
@Override
public int getOriginalEstimate() {
if (originalEstimate < 0) {
// getMemoryEstimate should initialize originalEstimate
return getMemoryEstimate();
} else {
return originalEstimate;
}
}
@Override
public Map<String, Object> toPropertyMap(int valueSizeLimit) {
return toPropertyMap(false, valueSizeLimit);

View File

@ -189,7 +189,6 @@ public class AMQPStandardMessage extends AMQPMessage {
public int getMemoryEstimate() {
if (memoryEstimate == -1) {
memoryEstimate = memoryOffset + (data != null ? data.capacity() + unmarshalledApplicationPropertiesMemoryEstimateFromData(data) : 0);
originalEstimate = memoryEstimate;
}
return memoryEstimate;

View File

@ -47,8 +47,6 @@ public class PagedReferenceImpl extends AbstractProtocolReference implements Pag
private int persistedCount;
private int messageEstimate = -1;
// this is a cached position returned on getPosition.
// just to avoid creating on object on each call
PagePosition cachedPositionObject;
@ -164,14 +162,12 @@ public class PagedReferenceImpl extends AbstractProtocolReference implements Pag
@Override
public int getMessageMemoryEstimate() {
if (messageEstimate <= 0) {
try {
messageEstimate = getMessage().getMemoryEstimate();
} catch (Throwable e) {
ActiveMQServerLogger.LOGGER.errorCalculateMessageMemoryEstimate(e);
}
try {
return getMessage().getMemoryEstimate();
} catch (Throwable e) {
ActiveMQServerLogger.LOGGER.errorCalculateMessageMemoryEstimate(e);
return 0;
}
return messageEstimate;
}
@Override

View File

@ -1052,7 +1052,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
// If an AMQP message parses its properties, its size might be updated and the address will receive more bytes.
// However, in this case, we should always use the original estimate.
// Otherwise, we might get incorrect sizes after the update.
pagingStore.addSize(messageReference.getMessage().getOriginalEstimate(), false, false);
pagingStore.addSize(messageReference.getMessage().getMemoryEstimate(), false, false);
}
pagingStore.refUp(messageReference.getMessage(), count);
@ -1071,7 +1071,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
// If an AMQP message parses its properties, its size might be updated and the address will receive more bytes.
// However, in this case, we should always use the original estimate.
// Otherwise, we might get incorrect sizes after the update.
pagingStore.addSize(-messageReference.getMessage().getOriginalEstimate(), false, false);
pagingStore.addSize(-messageReference.getMessage().getMemoryEstimate(), false, false);
}
pagingStore.refDown(messageReference.getMessage(), count);
}

View File

@ -121,87 +121,4 @@ public class AmqpPagingTest extends AmqpClientTestSupport {
connection.close();
}
@TestTemplate
@Timeout(60)
public void testSizeCalculationsForApplicationProperties() throws Exception {
final int MSG_SIZE = 1000;
final StringBuilder builder = new StringBuilder();
for (int i = 0; i < MSG_SIZE; i++) {
builder.append('0');
}
final String data = builder.toString();
final int MSG_COUNT = 1;
AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender(getQueueName(), true);
// use selector expression that references a property to force decode of application properties
AmqpReceiver receiver = session.createReceiver(getQueueName(), "myData IS NOT NULL");
receiver.setPresettle(true);
receiver.flow(10);
assertNull(receiver.receiveNoWait(), "somehow the queue had messages from a previous test");
receiver.flow(0);
AmqpMessage message = new AmqpMessage();
message.setText(data);
// large message property also
message.setApplicationProperty("myData", data);
if (durable != null) {
message.setDurable(durable);
}
sender.send(message);
PagingStore pagingStore = server.getPagingManager().getPageStore(SimpleString.of(getQueueName()));
// verify page usage reflects data + 2*application properties (encoded and decoded)
assertTrue(Wait.waitFor(() -> {
return pagingStore.getAddressSize() > 3000;
}));
receiver.flow(MSG_COUNT);
AmqpMessage receive = receiver.receive(10, TimeUnit.MINUTES);
assertNotNull(receive, "Not received anything after receive");
receive.accept();
assertTrue(Wait.waitFor(() -> {
return pagingStore.getAddressSize() == 0;
}));
// send another with duplicate id property, to force early decode
message = new AmqpMessage();
message.setText(data);
// ensures application properties are referenced
message.setApplicationProperty("_AMQ_DUPL_ID", "1");
// large message property also
message.setApplicationProperty("myData", data);
if (durable != null) {
message.setDurable(durable);
}
sender.send(message);
sender.close();
// verify page usage reflects data + 2*application properties (encoded and decoded)
assertTrue(Wait.waitFor(() -> {
return pagingStore.getAddressSize() > 3000;
}));
receiver.flow(MSG_COUNT);
receive = receiver.receive(10, TimeUnit.MINUTES);
assertNotNull(receive, "Not received anything after receive");
receive.accept();
receiver.close();
connection.close();
}
}

View File

@ -35,9 +35,14 @@ import java.lang.invoke.MethodHandles;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.management.AddressControl;
import org.apache.activemq.artemis.api.core.management.ResourceNames;
import org.apache.activemq.artemis.core.paging.PagingStore;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.utils.DestinationUtil;
import org.apache.activemq.artemis.utils.RandomUtil;
import org.apache.activemq.artemis.utils.Wait;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.slf4j.Logger;
@ -398,4 +403,38 @@ public class JMSMessageConsumerTest extends MultiprotocolJMSClientTestSupport {
consumerConnection.close();
}
}
@Test
public void testConvertedAndPaging() throws Exception {
final int MESSAGE_COUNT = 1;
server.createQueue(QueueConfiguration.of(getQueueName()).setRoutingType(RoutingType.ANYCAST));
PagingStore store = server.getPagingManager().getPageStore(SimpleString.of(getQueueName()));
store.startPaging();
try (Connection senderConnection = createConnection(); Connection consumerConnection = createCoreConnection()) {
Session consumerSession = consumerConnection.createSession(true, Session.SESSION_TRANSACTED);
MessageConsumer consumer = consumerSession.createConsumer(consumerSession.createQueue(getQueueName()));
Session senderSession = senderConnection.createSession(true, Session.SESSION_TRANSACTED);
MessageProducer producer = senderSession.createProducer(senderSession.createQueue(getQueueName()));
for (int i = 0; i < MESSAGE_COUNT; i++) {
Message message = senderSession.createMessage();
message.setIntProperty("count", i); // test will also pass if this is removed
producer.send(message);
}
senderSession.commit();
for (int i = 0; i < MESSAGE_COUNT; i++) {
Message received = consumer.receive(1000);
assertNotNull(received);
}
consumerSession.commit();
consumer.close();
assertEquals(0, server.locateQueue(getQueueName()).getMessageCount());
Wait.assertEquals(0, store::getAddressSize, 5000);
assertEquals(0, ((AddressControl) server.getManagementService().getResource(ResourceNames.ADDRESS + getQueueName())).getAddressSize());
}
}
}