Revert "ARTEMIS-4941 Remove lazy update after application properties as it's no longer needed"
This reverts commit fb2a57f3ed
.
This commit is contained in:
parent
90797e1c42
commit
1a5c2ec51c
|
@ -809,6 +809,12 @@ public interface Message {
|
|||
|
||||
int getMemoryEstimate();
|
||||
|
||||
/** The first estimate that's been calculated without any updates. */
|
||||
default int getOriginalEstimate() {
|
||||
// For Core Protocol we always use the same estimate
|
||||
return getMemoryEstimate();
|
||||
}
|
||||
|
||||
/**
|
||||
* This is the size of the message when persisted on disk which is used for metrics tracking
|
||||
* Note that even if the message itself is not persisted on disk (ie non-durable) this value is
|
||||
|
|
|
@ -585,6 +585,7 @@ public class AMQPLargeMessage extends AMQPMessage implements LargeServerMessage
|
|||
public int getMemoryEstimate() {
|
||||
if (memoryEstimate == -1) {
|
||||
memoryEstimate = memoryOffset * 2 + (extraProperties != null ? extraProperties.getEncodeSize() : 0);
|
||||
originalEstimate = memoryEstimate;
|
||||
}
|
||||
return memoryEstimate;
|
||||
}
|
||||
|
|
|
@ -43,6 +43,7 @@ import org.apache.activemq.artemis.api.core.RoutingType;
|
|||
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||
import org.apache.activemq.artemis.core.message.openmbean.CompositeDataConstants;
|
||||
import org.apache.activemq.artemis.core.message.openmbean.MessageOpenTypeFactory;
|
||||
import org.apache.activemq.artemis.core.paging.PagingStore;
|
||||
import org.apache.activemq.artemis.core.persistence.CoreMessageObjectPools;
|
||||
import org.apache.activemq.artemis.core.persistence.Persister;
|
||||
import org.apache.activemq.artemis.core.server.MessageReference;
|
||||
|
@ -203,6 +204,7 @@ public abstract class AMQPMessage extends RefCountMessage implements org.apache.
|
|||
protected long messageID;
|
||||
protected SimpleString address;
|
||||
protected volatile int memoryEstimate = -1;
|
||||
protected volatile int originalEstimate = -1;
|
||||
protected long expiration;
|
||||
protected boolean expirationReload = false;
|
||||
protected long scheduledTime = -1;
|
||||
|
@ -543,6 +545,13 @@ public abstract class AMQPMessage extends RefCountMessage implements org.apache.
|
|||
protected ApplicationProperties lazyDecodeApplicationProperties(ReadableBuffer data) {
|
||||
if (applicationProperties == null && applicationPropertiesPosition != VALUE_NOT_PRESENT) {
|
||||
applicationProperties = scanForMessageSection(data, applicationPropertiesPosition, ApplicationProperties.class);
|
||||
if (owner != null && memoryEstimate != -1) {
|
||||
// the memory has already been tracked and needs to be updated to reflect the new decoding
|
||||
int addition = unmarshalledApplicationPropertiesMemoryEstimateFromData(data);
|
||||
((PagingStore)owner).addSize(addition, false);
|
||||
final int updatedEstimate = memoryEstimate + addition;
|
||||
memoryEstimate = updatedEstimate;
|
||||
}
|
||||
}
|
||||
|
||||
return applicationProperties;
|
||||
|
@ -666,6 +675,7 @@ public abstract class AMQPMessage extends RefCountMessage implements org.apache.
|
|||
}
|
||||
encodedHeaderSize = 0;
|
||||
memoryEstimate = -1;
|
||||
originalEstimate = -1;
|
||||
scheduledTime = -1;
|
||||
encodedDeliveryAnnotationsSize = 0;
|
||||
headerPosition = VALUE_NOT_PRESENT;
|
||||
|
@ -861,6 +871,16 @@ public abstract class AMQPMessage extends RefCountMessage implements org.apache.
|
|||
@Override
|
||||
public abstract int getMemoryEstimate();
|
||||
|
||||
@Override
|
||||
public int getOriginalEstimate() {
|
||||
if (originalEstimate < 0) {
|
||||
// getMemoryEstimate should initialize originalEstimate
|
||||
return getMemoryEstimate();
|
||||
} else {
|
||||
return originalEstimate;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, Object> toPropertyMap(int valueSizeLimit) {
|
||||
return toPropertyMap(false, valueSizeLimit);
|
||||
|
|
|
@ -189,6 +189,7 @@ public class AMQPStandardMessage extends AMQPMessage {
|
|||
public int getMemoryEstimate() {
|
||||
if (memoryEstimate == -1) {
|
||||
memoryEstimate = memoryOffset + (data != null ? data.capacity() + unmarshalledApplicationPropertiesMemoryEstimateFromData(data) : 0);
|
||||
originalEstimate = memoryEstimate;
|
||||
}
|
||||
|
||||
return memoryEstimate;
|
||||
|
|
|
@ -47,6 +47,8 @@ public class PagedReferenceImpl extends AbstractProtocolReference implements Pag
|
|||
|
||||
private int persistedCount;
|
||||
|
||||
private int messageEstimate = -1;
|
||||
|
||||
// this is a cached position returned on getPosition.
|
||||
// just to avoid creating on object on each call
|
||||
PagePosition cachedPositionObject;
|
||||
|
@ -162,12 +164,14 @@ public class PagedReferenceImpl extends AbstractProtocolReference implements Pag
|
|||
|
||||
@Override
|
||||
public int getMessageMemoryEstimate() {
|
||||
try {
|
||||
return getMessage().getMemoryEstimate();
|
||||
} catch (Throwable e) {
|
||||
ActiveMQServerLogger.LOGGER.errorCalculateMessageMemoryEstimate(e);
|
||||
return 0;
|
||||
if (messageEstimate <= 0) {
|
||||
try {
|
||||
messageEstimate = getMessage().getMemoryEstimate();
|
||||
} catch (Throwable e) {
|
||||
ActiveMQServerLogger.LOGGER.errorCalculateMessageMemoryEstimate(e);
|
||||
}
|
||||
}
|
||||
return messageEstimate;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -1052,7 +1052,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
|
|||
// If an AMQP message parses its properties, its size might be updated and the address will receive more bytes.
|
||||
// However, in this case, we should always use the original estimate.
|
||||
// Otherwise, we might get incorrect sizes after the update.
|
||||
pagingStore.addSize(messageReference.getMessage().getMemoryEstimate(), false, false);
|
||||
pagingStore.addSize(messageReference.getMessage().getOriginalEstimate(), false, false);
|
||||
}
|
||||
|
||||
pagingStore.refUp(messageReference.getMessage(), count);
|
||||
|
@ -1071,7 +1071,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
|
|||
// If an AMQP message parses its properties, its size might be updated and the address will receive more bytes.
|
||||
// However, in this case, we should always use the original estimate.
|
||||
// Otherwise, we might get incorrect sizes after the update.
|
||||
pagingStore.addSize(-messageReference.getMessage().getMemoryEstimate(), false, false);
|
||||
pagingStore.addSize(-messageReference.getMessage().getOriginalEstimate(), false, false);
|
||||
}
|
||||
pagingStore.refDown(messageReference.getMessage(), count);
|
||||
}
|
||||
|
|
|
@ -121,4 +121,87 @@ public class AmqpPagingTest extends AmqpClientTestSupport {
|
|||
connection.close();
|
||||
}
|
||||
|
||||
|
||||
@TestTemplate
|
||||
@Timeout(60)
|
||||
public void testSizeCalculationsForApplicationProperties() throws Exception {
|
||||
final int MSG_SIZE = 1000;
|
||||
final StringBuilder builder = new StringBuilder();
|
||||
for (int i = 0; i < MSG_SIZE; i++) {
|
||||
builder.append('0');
|
||||
}
|
||||
final String data = builder.toString();
|
||||
final int MSG_COUNT = 1;
|
||||
|
||||
AmqpClient client = createAmqpClient();
|
||||
AmqpConnection connection = addConnection(client.connect());
|
||||
AmqpSession session = connection.createSession();
|
||||
|
||||
AmqpSender sender = session.createSender(getQueueName(), true);
|
||||
|
||||
// use selector expression that references a property to force decode of application properties
|
||||
AmqpReceiver receiver = session.createReceiver(getQueueName(), "myData IS NOT NULL");
|
||||
receiver.setPresettle(true);
|
||||
receiver.flow(10);
|
||||
assertNull(receiver.receiveNoWait(), "somehow the queue had messages from a previous test");
|
||||
receiver.flow(0);
|
||||
|
||||
AmqpMessage message = new AmqpMessage();
|
||||
message.setText(data);
|
||||
|
||||
// large message property also
|
||||
message.setApplicationProperty("myData", data);
|
||||
|
||||
if (durable != null) {
|
||||
message.setDurable(durable);
|
||||
}
|
||||
sender.send(message);
|
||||
|
||||
PagingStore pagingStore = server.getPagingManager().getPageStore(SimpleString.of(getQueueName()));
|
||||
|
||||
// verify page usage reflects data + 2*application properties (encoded and decoded)
|
||||
assertTrue(Wait.waitFor(() -> {
|
||||
return pagingStore.getAddressSize() > 3000;
|
||||
}));
|
||||
|
||||
receiver.flow(MSG_COUNT);
|
||||
AmqpMessage receive = receiver.receive(10, TimeUnit.MINUTES);
|
||||
assertNotNull(receive, "Not received anything after receive");
|
||||
receive.accept();
|
||||
|
||||
assertTrue(Wait.waitFor(() -> {
|
||||
return pagingStore.getAddressSize() == 0;
|
||||
}));
|
||||
|
||||
// send another with duplicate id property, to force early decode
|
||||
message = new AmqpMessage();
|
||||
message.setText(data);
|
||||
|
||||
// ensures application properties are referenced
|
||||
message.setApplicationProperty("_AMQ_DUPL_ID", "1");
|
||||
|
||||
// large message property also
|
||||
message.setApplicationProperty("myData", data);
|
||||
|
||||
if (durable != null) {
|
||||
message.setDurable(durable);
|
||||
}
|
||||
sender.send(message);
|
||||
|
||||
sender.close();
|
||||
|
||||
// verify page usage reflects data + 2*application properties (encoded and decoded)
|
||||
assertTrue(Wait.waitFor(() -> {
|
||||
return pagingStore.getAddressSize() > 3000;
|
||||
}));
|
||||
|
||||
receiver.flow(MSG_COUNT);
|
||||
receive = receiver.receive(10, TimeUnit.MINUTES);
|
||||
assertNotNull(receive, "Not received anything after receive");
|
||||
receive.accept();
|
||||
|
||||
receiver.close();
|
||||
connection.close();
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -35,14 +35,9 @@ import java.lang.invoke.MethodHandles;
|
|||
|
||||
import org.apache.activemq.artemis.api.core.QueueConfiguration;
|
||||
import org.apache.activemq.artemis.api.core.RoutingType;
|
||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||
import org.apache.activemq.artemis.api.core.management.AddressControl;
|
||||
import org.apache.activemq.artemis.api.core.management.ResourceNames;
|
||||
import org.apache.activemq.artemis.core.paging.PagingStore;
|
||||
import org.apache.activemq.artemis.core.server.Queue;
|
||||
import org.apache.activemq.artemis.utils.DestinationUtil;
|
||||
import org.apache.activemq.artemis.utils.RandomUtil;
|
||||
import org.apache.activemq.artemis.utils.Wait;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.Timeout;
|
||||
import org.slf4j.Logger;
|
||||
|
@ -403,38 +398,4 @@ public class JMSMessageConsumerTest extends MultiprotocolJMSClientTestSupport {
|
|||
consumerConnection.close();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testConvertedAndPaging() throws Exception {
|
||||
final int MESSAGE_COUNT = 1;
|
||||
server.createQueue(QueueConfiguration.of(getQueueName()).setRoutingType(RoutingType.ANYCAST));
|
||||
PagingStore store = server.getPagingManager().getPageStore(SimpleString.of(getQueueName()));
|
||||
store.startPaging();
|
||||
try (Connection senderConnection = createConnection(); Connection consumerConnection = createCoreConnection()) {
|
||||
Session consumerSession = consumerConnection.createSession(true, Session.SESSION_TRANSACTED);
|
||||
MessageConsumer consumer = consumerSession.createConsumer(consumerSession.createQueue(getQueueName()));
|
||||
|
||||
Session senderSession = senderConnection.createSession(true, Session.SESSION_TRANSACTED);
|
||||
MessageProducer producer = senderSession.createProducer(senderSession.createQueue(getQueueName()));
|
||||
for (int i = 0; i < MESSAGE_COUNT; i++) {
|
||||
Message message = senderSession.createMessage();
|
||||
message.setIntProperty("count", i); // test will also pass if this is removed
|
||||
producer.send(message);
|
||||
}
|
||||
senderSession.commit();
|
||||
|
||||
for (int i = 0; i < MESSAGE_COUNT; i++) {
|
||||
Message received = consumer.receive(1000);
|
||||
assertNotNull(received);
|
||||
}
|
||||
consumerSession.commit();
|
||||
consumer.close();
|
||||
|
||||
assertEquals(0, server.locateQueue(getQueueName()).getMessageCount());
|
||||
Wait.assertEquals(0, store::getAddressSize, 5000);
|
||||
assertEquals(0, ((AddressControl) server.getManagementService().getResource(ResourceNames.ADDRESS + getQueueName())).getAddressSize());
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue