diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JsonUtil.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JsonUtil.java index fdb5b52797..eed944f29b 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JsonUtil.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JsonUtil.java @@ -258,7 +258,7 @@ public final class JsonUtil { JsonObjectBuilder jsonObjectBuilder = JsonLoader.createObjectBuilder(); if (map != null) { for (Map.Entry entry : map.entrySet()) { - addToObject(entry.getKey(), entry.getValue(), jsonObjectBuilder); + addToObject(String.valueOf(entry.getKey()), entry.getValue(), jsonObjectBuilder); } } return jsonObjectBuilder.build(); diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java index 2aafcb144e..d213446c01 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java @@ -296,6 +296,15 @@ public interface QueueControl { @Operation(desc = "Returns the number of the messages in the queue", impact = MBeanOperationInfo.INFO) long countMessages() throws Exception; + /** + * Counts the number of messages in this queue matching the specified filter, grouped by the given property field. + * In case of null property will be grouped in "null" + *
+ * Using {@code null} or an empty filter will count all messages from this queue. + */ + @Operation(desc = "Returns the number of the messages in the queue matching the given filter, grouped by the given property field", impact = MBeanOperationInfo.INFO) + String countMessages(@Parameter(name = "filter", desc = "This filter separate account messages") String filter, @Parameter(name = "groupByProperty", desc = "This property to group by") String groupByProperty) throws Exception; + /** * Removes the message corresponding to the specified message ID. * diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java index 16e948339e..7377846208 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java @@ -63,6 +63,7 @@ import org.apache.activemq.artemis.utils.collections.LinkedListIterator; public class QueueControlImpl extends AbstractControl implements QueueControl { public static final int FLUSH_LIMIT = 500; + // Constants ----------------------------------------------------- // Attributes ---------------------------------------------------- @@ -682,30 +683,46 @@ public class QueueControlImpl extends AbstractControl implements QueueControl { @Override public long countMessages(final String filterStr) throws Exception { - checkStarted(); + Long value = intenalCountMessages(filterStr, null).get(null); + return value == null ? 0 : value; + } + @Override + public String countMessages(final String filterStr, final String groupByProperty) throws Exception { + return JsonUtil.toJsonObject(intenalCountMessages(filterStr, groupByProperty)).toString(); + } + + private Map intenalCountMessages(final String filterStr, final String groupByPropertyStr) throws Exception { + checkStarted(); clearIO(); + Map result = new HashMap<>(); + try { Filter filter = FilterImpl.createFilter(filterStr); - if (filter == null) { - return getMessageCount(); + SimpleString groupByProperty = SimpleString.toSimpleString(groupByPropertyStr); + if (filter == null && groupByProperty == null) { + result.put(null, getMessageCount()); } else { try (LinkedListIterator iterator = queue.browserIterator()) { - int count = 0; - try { while (iterator.hasNext()) { - MessageReference ref = iterator.next(); - if (filter.match(ref.getMessage())) { - count++; + Message message = iterator.next().getMessage(); + if (filter == null || filter.match(message)) { + if (groupByProperty == null) { + result.compute(null, (k,v) -> v == null ? 1 : ++v); + } else { + Object value = message.getObjectProperty(groupByProperty); + String valueStr = value == null ? null : value.toString(); + result.compute(valueStr, (k,v) -> v == null ? 1 : ++v); + } } } } catch (NoSuchElementException ignored) { // this could happen through paging browsing } - return count; } } + return result; } finally { blockOnIO(); } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java index 9a68badad3..5ba0b39ba9 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java @@ -1781,6 +1781,66 @@ public class QueueControlTest extends ManagementTestBase { session.deleteQueue(queue); } + @Test + public void testCountMessagesPropertyExist() throws Exception { + String key = new String("key_group"); + String valueGroup1 = "group_1"; + String valueGroup2 = "group_2"; + + SimpleString address = RandomUtil.randomSimpleString(); + SimpleString queue = RandomUtil.randomSimpleString(); + + session.createQueue(address, queue, null, false); + ClientProducer producer = session.createProducer(address); + + for (int i = 0; i < 100; i++) { + ClientMessage msg = session.createMessage(false); + if (i % 3 == 0) { + msg.putStringProperty(key, valueGroup1); + } else { + msg.putStringProperty(key, valueGroup2); + } + producer.send(msg); + } + + for (int i = 0; i < 20; i++) { + ClientMessage msg = session.createMessage(false); + producer.send(msg); + } + + QueueControl queueControl = createManagementControl(address, queue); + Assert.assertEquals(120, getMessageCount(queueControl)); + String result = queueControl.countMessages(null, key); + JsonObject jsonObject = JsonUtil.readJsonObject(result); + Assert.assertEquals(34, jsonObject.getInt(valueGroup1)); + Assert.assertEquals(66, jsonObject.getInt(valueGroup2)); + Assert.assertEquals(20, jsonObject.getInt("null")); + session.deleteQueue(queue); + } + + @Test + public void testCountMessagesPropertyWithNullFilter() throws Exception { + SimpleString address = RandomUtil.randomSimpleString(); + SimpleString queue = RandomUtil.randomSimpleString(); + + session.createQueue(address, queue, null, false); + ClientProducer producer = session.createProducer(address); + + for (int i = 0; i < 100; i++) { + ClientMessage msg = session.createMessage(false); + msg.putStringProperty(RandomUtil.randomString(), RandomUtil.randomString()); + producer.send(msg); + } + + QueueControl queueControl = createManagementControl(address, queue); + Assert.assertEquals(100, getMessageCount(queueControl)); + String result = queueControl.countMessages(null,null); + JsonObject jsonObject = JsonUtil.readJsonObject(result); + Assert.assertEquals(100, jsonObject.getInt("null")); + session.deleteQueue(queue); + } + + @Test public void testExpireMessagesWithFilter() throws Exception { SimpleString key = new SimpleString("key"); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java index dcdb2f108f..0a62334faf 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java @@ -101,6 +101,11 @@ public class QueueControlUsingCoreTest extends QueueControlTest { return (Long) proxy.invokeOperation(Long.class, "countMessages"); } + @Override + public String countMessages(final String filter, final String groupByFilter) throws Exception { + return (String) proxy.invokeOperation(String.class, "countMessages", filter, groupByFilter); + } + @Override public boolean expireMessage(final long messageID) throws Exception { return (Boolean) proxy.invokeOperation("expireMessage", messageID);