ARTEMIS-3135 - track possible change in memory estimate when messages are converted to maps for JMX or UI display, follows up from ARTEMIS-3067

This commit is contained in:
gtully 2021-02-22 22:08:23 +00:00 committed by Gary Tully
parent 4de4329cb1
commit a56ade38b4
5 changed files with 90 additions and 12 deletions

View File

@ -51,6 +51,7 @@ import org.apache.activemq.artemis.core.server.Consumer;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.ServerConsumer;
import org.apache.activemq.artemis.core.server.impl.MessageReferenceImpl;
import org.apache.activemq.artemis.core.server.impl.RefsOperation;
import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
@ -792,7 +793,9 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
int i = 0;
for (MessageReference ref : refs) {
Message message = ref.getMessage();
final int currentMemoryEstimate = message.getMemoryEstimate();
messages[i++] = message.toMap();
MessageReferenceImpl.accountForChangeInMemoryEstimate(ref, currentMemoryEstimate);
}
return messages;
}
@ -853,7 +856,9 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
MessageReference ref = iterator.next();
if (filter == null || filter.match(ref.getMessage())) {
Message message = ref.getMessage();
final int currentMemoryEstimate = message.getMemoryEstimate();
messages.add(message.toMap());
MessageReferenceImpl.accountForChangeInMemoryEstimate(ref, currentMemoryEstimate);
}
}
} catch (NoSuchElementException ignored) {
@ -898,7 +903,9 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
if (iterator.hasNext()) {
MessageReference ref = iterator.next();
Message message = ref.getMessage();
final int currentMemoryEstimate = message.getMemoryEstimate();
messages.add(message.toMap());
MessageReferenceImpl.accountForChangeInMemoryEstimate(ref, currentMemoryEstimate);
}
return messages.toArray(new Map[1]);
}

View File

@ -179,6 +179,16 @@ public class MessageReferenceImpl extends LinkedListImpl.Node<MessageReferenceIm
return MessageReferenceImpl.memoryOffset;
}
public static void accountForChangeInMemoryEstimate(final MessageReference ref, final int existingMemoryEstimate) {
final int delta = ref.getMessageMemoryEstimate() - existingMemoryEstimate;
if (delta > 0) {
PagingStore pageStore = ref.getOwner();
if (pageStore != null) {
pageStore.addSize(delta);
}
}
}
@Override
public int getDeliveryCount() {
return DELIVERY_COUNT_UPDATER.get(this);

View File

@ -3072,7 +3072,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
}
if (existingMemoryEstimate > 0 ) {
accountForChangeInMemoryEstimate(ref, existingMemoryEstimate);
MessageReferenceImpl.accountForChangeInMemoryEstimate(ref, existingMemoryEstimate);
}
}
@ -3702,7 +3702,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
HandleStatus status = handle(ref, consumer);
accountForChangeInMemoryEstimate(ref, existingMemoryEstimate);
MessageReferenceImpl.accountForChangeInMemoryEstimate(ref, existingMemoryEstimate);
if (status == HandleStatus.HANDLED) {
final MessageReference reference;
@ -3733,16 +3733,6 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
}
}
private static void accountForChangeInMemoryEstimate(final MessageReference ref, final int existingMemoryEstimate) {
final int delta = ref.getMessageMemoryEstimate() - existingMemoryEstimate;
if (delta > 0) {
PagingStore pageStore = ref.getOwner();
if (pageStore != null) {
pageStore.addSize(delta);
}
}
}
private Consumer getGroupConsumer(SimpleString groupID) {
Consumer groupConsumer = null;
if (exclusive) {

View File

@ -18,7 +18,13 @@ package org.apache.activemq.artemis.tests.integration.amqp;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.management.QueueControl;
import org.apache.activemq.artemis.core.paging.PagingStore;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.tests.integration.management.ManagementControlHelper;
import org.apache.activemq.artemis.tests.util.Wait;
import org.apache.activemq.transport.amqp.client.AmqpClient;
import org.apache.activemq.transport.amqp.client.AmqpConnection;
import org.apache.activemq.transport.amqp.client.AmqpMessage;
@ -79,15 +85,26 @@ public class AmqpScheduledMessageTest extends AmqpClientTestSupport {
final Queue queueView = getProxyToQueue(getQueueName());
assertNotNull(queueView);
final SimpleString queueNameSS = SimpleString.toSimpleString(getQueueName());
PagingStore targetPagingStore = server.getPagingManager().getPageStore(queueNameSS);
assertNotNull(targetPagingStore);
QueueControl queueControl = ManagementControlHelper.createQueueControl(queueNameSS, queueNameSS, RoutingType.ANYCAST, this.mBeanServer);
assertNotNull(queueControl);
AmqpMessage message = new AmqpMessage();
long deliveryTime = System.currentTimeMillis() + 6000;
message.setMessageAnnotation("x-opt-delivery-time", deliveryTime);
message.setText("Test-Message");
message.setApplicationProperty("OneOfThose", "Please");
sender.send(message);
sender.close();
assertEquals(1, queueView.getScheduledCount());
assertTrue(targetPagingStore.getAddressSize() > 0);
assertEquals(1, queueControl.listScheduledMessages().length);
AmqpReceiver receiver = session.createReceiver(getQueueName());
receiver.flow(1);
@ -99,6 +116,8 @@ public class AmqpScheduledMessageTest extends AmqpClientTestSupport {
received = receiver.receive(10, TimeUnit.SECONDS);
assertNotNull(received);
received.accept();
Wait.assertEquals(0L, targetPagingStore::getAddressSize);
} finally {
connection.close();
}

View File

@ -16,10 +16,13 @@
*/
package org.apache.activemq.artemis.tests.integration.amqp;
import org.apache.activemq.artemis.api.core.JsonUtil;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.management.QueueControl;
import org.apache.activemq.artemis.core.paging.PagingStore;
import org.apache.activemq.artemis.tests.integration.management.ManagementControlHelper;
import org.apache.activemq.artemis.tests.util.Wait;
import org.apache.activemq.transport.amqp.client.AmqpClient;
import org.apache.activemq.transport.amqp.client.AmqpConnection;
import org.apache.activemq.transport.amqp.client.AmqpMessage;
@ -34,6 +37,8 @@ import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.json.JsonArray;
import javax.json.JsonObject;
import java.util.Map;
public class JMXManagementTest extends JMSClientTestSupport {
@ -116,6 +121,53 @@ public class JMXManagementTest extends JMSClientTestSupport {
}
}
@Test
public void testAddressSizeOnDelete() throws Exception {
AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect());
try {
AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender(getQueueName());
session.begin();
AmqpMessage message = new AmqpMessage();
message.setApplicationProperty("TEST_STRING", "TEST");
message.setTimeToLive(100);
message.setText("TEST");
// send 2 so we can verify getFirstMessage and List
sender.send(message);
sender.send(message);
session.commit();
PagingStore targetPagingStore = server.getPagingManager().getPageStore(SimpleString.toSimpleString(getQueueName()));
assertNotNull(targetPagingStore);
assertTrue(targetPagingStore.getAddressSize() > 0);
SimpleString queue = new SimpleString(getQueueName());
QueueControl queueControl = createManagementControl(queue, queue);
Assert.assertEquals(2, queueControl.getMessageCount());
JsonArray array = JsonUtil.readJsonArray(queueControl.getFirstMessageAsJSON());
JsonObject object = (JsonObject) array.get(0);
queueControl.removeMessage(object.getJsonNumber("messageID").longValue());
Wait.assertEquals(1L, queueControl::getMessageCount);
Map<String, Object>[] messages = queueControl.listMessages("");
Assert.assertEquals(1, messages.length);
queueControl.removeMessage((Long) messages[0].get("messageID"));
Assert.assertEquals(0, queueControl.getMessageCount());
Wait.assertEquals(0L, targetPagingStore::getAddressSize);
} finally {
connection.close();
}
}
protected QueueControl createManagementControl(final SimpleString address,
final SimpleString queue) throws Exception {
QueueControl queueControl = ManagementControlHelper.createQueueControl(address, queue, RoutingType.ANYCAST, this.mBeanServer);