From 284ce806186527a29185b2ccefd1c2187ee86ac8 Mon Sep 17 00:00:00 2001 From: Timothy Bish Date: Thu, 25 Jul 2024 13:47:10 -0400 Subject: [PATCH] ARTEMIS-4952 Use getObjectPropertyForFilter when applying filters When counting messages with provided filters and grouping use the API getObjectPropertyForFilter which translates values from AMQP message annotations that are otherwise missed by the filters. --- .../management/impl/QueueControlImpl.java | 2 +- .../integration/amqp/JMXManagementTest.java | 61 +++++++++++++++++++ 2 files changed, 62 insertions(+), 1 deletion(-) diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java index 2a010017c1..e3a054a436 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java @@ -1161,7 +1161,7 @@ public class QueueControlImpl extends AbstractControl implements QueueControl { if (groupByProperty == null) { result.compute(null, (k, v) -> v == null ? 1 : ++v); } else { - Object value = message.getObjectProperty(groupByProperty); + Object value = message.getObjectPropertyForFilter(groupByProperty); String valueStr = value == null ? null : value.toString(); result.compute(valueStr, (k, v) -> v == null ? 1 : ++v); } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMXManagementTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMXManagementTest.java index 4e4606272f..7e1677baaa 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMXManagementTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMXManagementTest.java @@ -22,6 +22,7 @@ import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertTrue; import org.apache.activemq.artemis.api.core.JsonUtil; +import org.apache.activemq.artemis.api.core.QueueConfiguration; import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.management.QueueControl; @@ -242,6 +243,66 @@ public class JMXManagementTest extends JMSClientTestSupport { } } + @Test + public void testCountMessagesWithOriginalQueueFilter() throws Exception { + final String queueA = getQueueName(); + final String queueB = getQueueName() + "_B"; + final String queueC = getQueueName() + "_C"; + final QueueControl queueAControl = createManagementControl(queueA); + final QueueControl queueBControl = createManagementControl(queueB); + final QueueControl queueCControl = createManagementControl(queueC); + final int MESSAGE_COUNT = 20; + + server.createQueue(QueueConfiguration.of(queueB).setAddress(queueB).setRoutingType(RoutingType.ANYCAST)); + server.createQueue(QueueConfiguration.of(queueC).setAddress(queueC).setRoutingType(RoutingType.ANYCAST)); + + AmqpClient client = createAmqpClient(); + AmqpConnection connection = addConnection(client.connect()); + + try { + AmqpSession session = connection.createSession(); + AmqpSender senderA = session.createSender(queueA.toString()); + AmqpSender senderB = session.createSender(queueB.toString()); + + for (int i = 0; i < MESSAGE_COUNT; i++) { + final AmqpMessage message = new AmqpMessage(); + message.setText("Message number: " + i); + + senderA.send(message); + senderB.send(message); + } + + assertEquals(MESSAGE_COUNT, queueAControl.countMessages()); + assertEquals(MESSAGE_COUNT, queueBControl.countMessages()); + assertEquals(0, queueCControl.countMessages()); + + queueAControl.moveMessages(null, queueC); + queueBControl.moveMessages(null, queueC); + + assertEquals(0, queueAControl.countMessages()); + assertEquals(0, queueBControl.countMessages()); + assertEquals(MESSAGE_COUNT * 2, queueCControl.countMessages()); + + final String originalGroup = queueCControl.countMessages(null, "_AMQ_ORIG_QUEUE"); + final String[] originalSplit = originalGroup.split(","); + + assertEquals(2, originalSplit.length); + assertTrue(originalSplit[0].contains("" + MESSAGE_COUNT)); + assertTrue(originalSplit[1].contains("" + MESSAGE_COUNT)); + + assertEquals("{\"" + queueA + "\":" + MESSAGE_COUNT + "}", + queueCControl.countMessages("_AMQ_ORIG_QUEUE = '" + queueA + "'", "_AMQ_ORIG_QUEUE")); + assertEquals("{\"" + queueB + "\":" + MESSAGE_COUNT + "}", + queueCControl.countMessages("_AMQ_ORIG_QUEUE = '" + queueB + "'", "_AMQ_ORIG_QUEUE")); + } finally { + connection.close(); + } + } + + protected QueueControl createManagementControl(final String queue) throws Exception { + return createManagementControl(SimpleString.of(queue), SimpleString.of(queue)); + } + protected QueueControl createManagementControl(final SimpleString address, final SimpleString queue) throws Exception { QueueControl queueControl = ManagementControlHelper.createQueueControl(address, queue, RoutingType.ANYCAST, this.mBeanServer);