ARTEMIS-4952 Use getObjectPropertyForFilter when applying filters

When counting messages with provided filters and grouping use the API
getObjectPropertyForFilter which translates values from AMQP message
annotations that are otherwise missed by the filters.
This commit is contained in:
Timothy Bish 2024-07-25 13:47:10 -04:00 committed by clebertsuconic
parent fa917ae3fd
commit 284ce80618
2 changed files with 62 additions and 1 deletions

View File

@ -1161,7 +1161,7 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
if (groupByProperty == null) {
result.compute(null, (k, v) -> v == null ? 1 : ++v);
} else {
Object value = message.getObjectProperty(groupByProperty);
Object value = message.getObjectPropertyForFilter(groupByProperty);
String valueStr = value == null ? null : value.toString();
result.compute(valueStr, (k, v) -> v == null ? 1 : ++v);
}

View File

@ -22,6 +22,7 @@ import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import org.apache.activemq.artemis.api.core.JsonUtil;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.management.QueueControl;
@ -242,6 +243,66 @@ public class JMXManagementTest extends JMSClientTestSupport {
}
}
@Test
public void testCountMessagesWithOriginalQueueFilter() throws Exception {
final String queueA = getQueueName();
final String queueB = getQueueName() + "_B";
final String queueC = getQueueName() + "_C";
final QueueControl queueAControl = createManagementControl(queueA);
final QueueControl queueBControl = createManagementControl(queueB);
final QueueControl queueCControl = createManagementControl(queueC);
final int MESSAGE_COUNT = 20;
server.createQueue(QueueConfiguration.of(queueB).setAddress(queueB).setRoutingType(RoutingType.ANYCAST));
server.createQueue(QueueConfiguration.of(queueC).setAddress(queueC).setRoutingType(RoutingType.ANYCAST));
AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect());
try {
AmqpSession session = connection.createSession();
AmqpSender senderA = session.createSender(queueA.toString());
AmqpSender senderB = session.createSender(queueB.toString());
for (int i = 0; i < MESSAGE_COUNT; i++) {
final AmqpMessage message = new AmqpMessage();
message.setText("Message number: " + i);
senderA.send(message);
senderB.send(message);
}
assertEquals(MESSAGE_COUNT, queueAControl.countMessages());
assertEquals(MESSAGE_COUNT, queueBControl.countMessages());
assertEquals(0, queueCControl.countMessages());
queueAControl.moveMessages(null, queueC);
queueBControl.moveMessages(null, queueC);
assertEquals(0, queueAControl.countMessages());
assertEquals(0, queueBControl.countMessages());
assertEquals(MESSAGE_COUNT * 2, queueCControl.countMessages());
final String originalGroup = queueCControl.countMessages(null, "_AMQ_ORIG_QUEUE");
final String[] originalSplit = originalGroup.split(",");
assertEquals(2, originalSplit.length);
assertTrue(originalSplit[0].contains("" + MESSAGE_COUNT));
assertTrue(originalSplit[1].contains("" + MESSAGE_COUNT));
assertEquals("{\"" + queueA + "\":" + MESSAGE_COUNT + "}",
queueCControl.countMessages("_AMQ_ORIG_QUEUE = '" + queueA + "'", "_AMQ_ORIG_QUEUE"));
assertEquals("{\"" + queueB + "\":" + MESSAGE_COUNT + "}",
queueCControl.countMessages("_AMQ_ORIG_QUEUE = '" + queueB + "'", "_AMQ_ORIG_QUEUE"));
} finally {
connection.close();
}
}
protected QueueControl createManagementControl(final String queue) throws Exception {
return createManagementControl(SimpleString.of(queue), SimpleString.of(queue));
}
protected QueueControl createManagementControl(final SimpleString address,
final SimpleString queue) throws Exception {
QueueControl queueControl = ManagementControlHelper.createQueueControl(address, queue, RoutingType.ANYCAST, this.mBeanServer);