ARTEMIS-3067 - track application properties in memory estimate and check for modification after potential filter execution

This commit is contained in:
gtully 2021-01-14 16:34:13 +00:00 committed by clebertsuconic
parent 03b0fcd2c2
commit d186d20406
5 changed files with 133 additions and 1 deletions

View File

@ -490,6 +490,7 @@ public abstract class AMQPMessage extends RefCountMessage implements org.apache.
protected ApplicationProperties lazyDecodeApplicationProperties(ReadableBuffer data) {
if (applicationProperties == null && applicationPropertiesPosition != VALUE_NOT_PRESENT) {
applicationProperties = scanForMessageSection(data, applicationPropertiesPosition, ApplicationProperties.class);
memoryEstimate = -1;
}
return applicationProperties;

View File

@ -124,12 +124,24 @@ public class AMQPStandardMessage extends AMQPMessage {
@Override
public int getMemoryEstimate() {
if (memoryEstimate == -1) {
memoryEstimate = memoryOffset + (data != null ? data.capacity() : 0);
memoryEstimate = memoryOffset + (data != null ? data.capacity() + unmarshalledApplicationPropertiesMemoryEstimateFromData() : 0);
}
return memoryEstimate;
}
private int unmarshalledApplicationPropertiesMemoryEstimateFromData() {
if (applicationProperties != null) {
// they have been unmarshalled, estimate memory usage based on their encoded size
if (remainingBodyPosition != VALUE_NOT_PRESENT) {
return remainingBodyPosition - applicationPropertiesPosition;
} else {
return data.capacity() - applicationPropertiesPosition;
}
}
return 0;
}
@Override
public void persist(ActiveMQBuffer targetRecord) {
ensureDataIsValid();

View File

@ -269,6 +269,19 @@ public class AMQPMessageTest {
assertNotEquals(estimate, decoded.getMemoryEstimate());
}
@Test
public void testGetMemoryEstimateWithDecodedApplicationProperties() {
AMQPStandardMessage decoded = new AMQPStandardMessage(0, encodedProtonMessage, new TypedProperties(), null);
AMQPStandardMessage decodedWithApplicationPropertiesUnmarshalled =
new AMQPStandardMessage(0, encodeMessage(createProtonMessage()), new TypedProperties(), null);
assertEquals(decodedWithApplicationPropertiesUnmarshalled.getStringProperty(TEST_APPLICATION_PROPERTY_KEY), TEST_APPLICATION_PROPERTY_VALUE);
assertNotEquals(decodedWithApplicationPropertiesUnmarshalled.getMemoryEstimate(), decoded.getMemoryEstimate());
}
//----- Test Connection ID access -----------------------------------------//

View File

@ -2935,6 +2935,9 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
MessageReference ref;
// filter evaluation or transformation may cause properties to be lazyDecoded, we need to reflect that
int existingMemoryEstimate = 0;
Consumer handledconsumer = null;
synchronized (this) {
@ -2970,6 +2973,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
ref = holder.iter.next();
} else {
ref = null;
existingMemoryEstimate = 0;
}
if (ref == null) {
noDelivery++;
@ -2988,6 +2992,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
logger.trace("Queue " + this.getName() + " is delivering reference " + ref);
}
existingMemoryEstimate = ref.getMessageMemoryEstimate();
final SimpleString groupID = extractGroupID(ref);
groupConsumer = getGroupConsumer(groupID);
@ -3062,6 +3067,10 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
if (handledconsumer != null) {
proceedDeliver(handledconsumer, ref);
}
if (existingMemoryEstimate > 0 ) {
accountForChangeInMemoryEstimate(ref, existingMemoryEstimate);
}
}
return true;
@ -3685,8 +3694,13 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
consumer = groupConsumer;
}
// filter evaluation may cause properties to be lazyDecoded
final int existingMemoryEstimate = ref.getMessageMemoryEstimate();
HandleStatus status = handle(ref, consumer);
accountForChangeInMemoryEstimate(ref, existingMemoryEstimate);
if (status == HandleStatus.HANDLED) {
final MessageReference reference;
if (redistributor == null) {
@ -3716,6 +3730,16 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
}
}
private static void accountForChangeInMemoryEstimate(final MessageReference ref, final int existingMemoryEstimate) {
final int delta = ref.getMessageMemoryEstimate() - existingMemoryEstimate;
if (delta > 0) {
PagingStore pageStore = ref.getOwner();
if (pageStore != null) {
pageStore.addSize(delta);
}
}
}
private Consumer getGroupConsumer(SimpleString groupID) {
Consumer groupConsumer = null;
if (exclusive) {

View File

@ -118,4 +118,86 @@ public class AmqpPagingTest extends AmqpClientTestSupport {
connection.close();
}
@Test(timeout = 60000)
public void testSizeCalculationsForApplicationProperties() throws Exception {
final int MSG_SIZE = 1000;
final StringBuilder builder = new StringBuilder();
for (int i = 0; i < MSG_SIZE; i++) {
builder.append('0');
}
final String data = builder.toString();
final int MSG_COUNT = 1;
AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender(getQueueName(), true);
// use selector expression that references a property to force decode of application properties
AmqpReceiver receiver = session.createReceiver(getQueueName(), "myData IS NOT NULL");
receiver.setPresettle(true);
receiver.flow(10);
Assert.assertNull("somehow the queue had messages from a previous test", receiver.receiveNoWait());
receiver.flow(0);
AmqpMessage message = new AmqpMessage();
message.setText(data);
// large message property also
message.setApplicationProperty("myData", data);
if (durable != null) {
message.setDurable(durable);
}
sender.send(message);
PagingStore pagingStore = server.getPagingManager().getPageStore(SimpleString.toSimpleString(getQueueName()));
// verify page usage reflects data + 2*application properties (encoded and decoded)
assertTrue(Wait.waitFor(() -> {
return pagingStore.getAddressSize() > 3000;
}));
receiver.flow(MSG_COUNT);
AmqpMessage receive = receiver.receive(10, TimeUnit.MINUTES);
assertNotNull("Not received anything after receive", receive);
receive.accept();
assertTrue(Wait.waitFor(() -> {
return pagingStore.getAddressSize() == 0;
}));
// send another with duplicate id property, to force early decode
message = new AmqpMessage();
message.setText(data);
// ensures application properties are referenced
message.setApplicationProperty("_AMQ_DUPL_ID", "1");
// large message property also
message.setApplicationProperty("myData", data);
if (durable != null) {
message.setDurable(durable);
}
sender.send(message);
sender.close();
// verify page usage reflects data + 2*application properties (encoded and decoded)
assertTrue(Wait.waitFor(() -> {
return pagingStore.getAddressSize() > 3000;
}));
receiver.flow(MSG_COUNT);
receive = receiver.receive(10, TimeUnit.MINUTES);
assertNotNull("Not received anything after receive", receive);
receive.accept();
receiver.close();
connection.close();
}
}